Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/GraphCtrl/GraphElement/GElement.h
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ class GElement : public GElementObject,
friend class GRegion;
friend class GCondition;
friend class GMutable;
template<CInt> friend class GSome;
friend class GSome;
template<GMultiConditionType> friend class GMultiCondition;
friend class GPipeline;
friend class GElementManager;
Expand Down
2 changes: 1 addition & 1 deletion src/GraphCtrl/GraphElement/GGroup/GGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class GGroup : public GElement {
friend class GElementRepository;
friend class GStorage;
template<GMultiConditionType> friend class GMultiCondition;
template<CInt> friend class GSome;
friend class GSome;

public:
CStatus __addGElements_4py(const GElementPtrArr& elements);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,21 @@
@Desc:
***************************/

#ifndef CGRAPH_GSOME_INL
#define CGRAPH_GSOME_INL
#ifndef CGRAPH_GSOME_CPP
#define CGRAPH_GSOME_CPP

#include "GSome.h"

CGRAPH_NAMESPACE_BEGIN

template<CInt TriggerNum>
GSome<TriggerNum>::GSome() {
GSome::GSome() {
element_type_ = GElementType::SOME;
binding_index_ = CGRAPH_TRIGGER_ALL_THREAD_STRATEGY;
session_ = URandom<>::generateSession(CGRAPH_STR_SOME);
}


template<CInt TriggerNum>
CStatus GSome<TriggerNum>::addElementEx(GElementPtr element) {
CStatus GSome::addElementEx(GElementPtr element) {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT(false)
CGRAPH_ASSERT_NOT_NULL(element)
Expand All @@ -39,11 +37,12 @@ CStatus GSome<TriggerNum>::addElementEx(GElementPtr element) {
}


template<CInt TriggerNum>
CStatus GSome<TriggerNum>::run() {
CStatus GSome::run() {
CGRAPH_FUNCTION_BEGIN

left_num_ = TriggerNum; // 还剩n个,就完成当前GSome的执行逻辑
wait_num_ = static_cast<CInt>(getWaitNum());
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(wait_num_ > children_.size(),
"num is bigger than elements size.");
cur_status_.reset();

/**
Expand All @@ -53,20 +52,21 @@ CStatus GSome<TriggerNum>::run() {
* 4. 赋返回值
*/
for (auto* element : children_) {
thread_pool_->commit([this, element] {
thread_pool_->execute([this, element] {
{
const auto& curStatus = element->fatProcessor(CFunctionType::RUN);
CGRAPH_UNIQUE_LOCK lock(lock_);
cur_status_ += curStatus;
left_num_--;
if (--wait_num_ <= 0 || cur_status_.isErr()) {
cv_.notify_one();
}
}
cv_.notify_one();
}, CGRAPH_POOL_TASK_STRATEGY);
}, binding_index_);
}

CGRAPH_UNIQUE_LOCK lock(lock_);
cv_.wait(lock, [this] {
return left_num_ <= 0 || cur_status_.isErr();
return wait_num_ <= 0 || cur_status_.isErr();
});

for (auto* element : children_) {
Expand All @@ -79,14 +79,12 @@ CStatus GSome<TriggerNum>::run() {
}


template<CInt TriggerNum>
CBool GSome<TriggerNum>::isSerializable() const {
CBool GSome::isSerializable() const {
return false; // 情况较为复杂,默认不可以
}


template<CInt TriggerNum>
CVoid GSome<TriggerNum>::dump(std::ostream& oss) {
CVoid GSome::dump(std::ostream& oss) {
dumpElement(oss);
dumpGroupLabelBegin(oss);
oss << 'p' << this << "[shape=point height=0];\n";
Expand All @@ -104,23 +102,18 @@ CVoid GSome<TriggerNum>::dump(std::ostream& oss) {
}


template<CInt TriggerNum>
CBool GSome<TriggerNum>::isHold() {
CBool GSome::isHold() {
// 这里固定是不可以 hold的
return false;
}


template<CInt TriggerNum>
CStatus GSome<TriggerNum>::checkSuitable() {
CStatus GSome::checkSuitable() {
CGRAPH_FUNCTION_BEGIN
status = GElement::checkSuitable();
CGRAPH_FUNCTION_CHECK_STATUS

CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION((CGRAPH_DEFAULT_LOOP_TIMES != loop_), "GSome cannot set loop > 1.")
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION((0 >= TriggerNum), "trigger num must bigger than 0.")
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION((children_.size() < TriggerNum), \
"this GSome need at least [" + std::to_string(TriggerNum) + "] element.")
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(std::any_of(children_.begin(), children_.end(), [](GElementPtr ptr) {
return !ptr->isAsync();
}), "GSome contains async node only.")
Expand All @@ -130,4 +123,4 @@ CStatus GSome<TriggerNum>::checkSuitable() {

CGRAPH_NAMESPACE_END

#endif //CGRAPH_GSOME_INL
#endif //CGRAPH_GSOME_CPP
17 changes: 10 additions & 7 deletions src/GraphCtrl/GraphElement/GGroup/GSome/GSome.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,22 @@
#ifndef CGRAPH_GSOME_H
#define CGRAPH_GSOME_H

#include <memory>
#include <mutex>
#include <condition_variable>

#include "../GGroup.h"

CGRAPH_NAMESPACE_BEGIN

template<CInt TriggerNum = 1>
class GSome : public GGroup {
protected:
/**
* 设定 wait_num 个数
* 当前 group 执行完成 wait_num 个后,就可以继续执行
* @return
*/
virtual CSize getWaitNum() = 0;

protected:
explicit GSome();

Expand All @@ -34,11 +40,10 @@ class GSome : public GGroup {

CGRAPH_NO_ALLOWED_COPY(GSome)

private:
CStatus addElementEx(GElementPtr element) final;
CStatus addElementEx(GElementPtr element) override;

private:
CInt left_num_ = 0; // 还剩的触发结束的个数
CInt wait_num_ {0}; // 还剩的触发结束的个数
CStatus cur_status_ ; // 记录异步时刻的当前状态信息

std::mutex lock_;
Expand All @@ -52,6 +57,4 @@ class GSome : public GGroup {

CGRAPH_NAMESPACE_END

#include "GSome.inl"

#endif //CGRAPH_GSOME_H
1 change: 1 addition & 0 deletions src/UtilsCtrl/ThreadPool/UThreadPool.inl
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ auto UThreadPool::commitWithPriority(const FunctionType& func, int priority)
template<typename FunctionType>
CVoid UThreadPool::execute(FunctionType&& task, const CIndex index) {
const CIndex realIndex = dispatch(index);

if (likely(realIndex >= 0 && realIndex < config_.default_thread_size_)) {
primary_threads_[realIndex]->pushTask(std::forward<FunctionType>(task));
} else if (CGRAPH_LONG_TIME_TASK_STRATEGY == realIndex) {
Expand Down
25 changes: 25 additions & 0 deletions tutorial/MyGSome/MySome.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/***************************
@Author: Chunel
@Contact: [email protected]
@File: MySome.h
@Time: 2026/6/24 23:23
@Desc:
***************************/

#ifndef CGRAPH_MYSOME_H
#define CGRAPH_MYSOME_H

#include "CGraph.h"

class MySome : public CGraph::GSome {
protected:
CSize getWaitNum() override {
/**
* 执行完 1个之后,当前的 GSome 就继续往后执行
* 其余的 element,会在 pipeline 结束之前,执行完成。
*/
return 1;
}
};

#endif //CGRAPH_MYSOME_H
5 changes: 3 additions & 2 deletions tutorial/T23-Some.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "MyGNode/MyNode1.h"
#include "MyGNode/MyNode2.h"
#include "MyGSome/MySome.h"

using namespace CGraph;

Expand All @@ -16,13 +17,13 @@ void tutorial_some() {
GElementPtr a, b_some, c, d = nullptr;

/**
* 创建一个类型为GSome的组,其中注入3个异步节点,其中有[1]个执行完成,则GSome执行结束
* 创建一个类型为GSome的组,其中有个别执行结束就执行完成
* 也就是说,这个 GSome 会执行1s后,就往下继续执行。效果类似增强版本的弱依赖
* 需要注意的是,当前GSome执行完成,并且今后后续的element后,
* 内部的节点仍会继续执行,故如果在节点内做参数处理,请注意考虑后续影响
* 当前pipeline的run()方法,会等到内部的内部节点全部执行完成后,才结束
*/
b_some = pipeline->createGGroup<GSome<1>>({
b_some = pipeline->createGGroup<MySome>({
pipeline->createGNode<MyNode1>(GNodeInfo("nodeB1")),
pipeline->createGNode<MyNode2>(GNodeInfo("nodeB2")),
pipeline->createGNode<MyNode2>(GNodeInfo("nodeB3"))
Expand Down
Loading