From cd4dc79e741a17c31058345c25a876f2baa3612e Mon Sep 17 00:00:00 2001 From: Chunel Date: Thu, 25 Jun 2026 00:40:22 +0800 Subject: [PATCH] [chg] change GSome function --- src/GraphCtrl/GraphElement/GElement.h | 2 +- src/GraphCtrl/GraphElement/GGroup/GGroup.h | 2 +- .../GGroup/GSome/{GSome.inl => GSome.cpp} | 45 ++++++++----------- .../GraphElement/GGroup/GSome/GSome.h | 17 ++++--- src/UtilsCtrl/ThreadPool/UThreadPool.inl | 1 + tutorial/MyGSome/MySome.h | 25 +++++++++++ tutorial/T23-Some.cpp | 5 ++- 7 files changed, 60 insertions(+), 37 deletions(-) rename src/GraphCtrl/GraphElement/GGroup/GSome/{GSome.inl => GSome.cpp} (69%) create mode 100644 tutorial/MyGSome/MySome.h diff --git a/src/GraphCtrl/GraphElement/GElement.h b/src/GraphCtrl/GraphElement/GElement.h index 38c8bfaa..1741d1a2 100644 --- a/src/GraphCtrl/GraphElement/GElement.h +++ b/src/GraphCtrl/GraphElement/GElement.h @@ -478,7 +478,7 @@ class GElement : public GElementObject, friend class GRegion; friend class GCondition; friend class GMutable; - template friend class GSome; + friend class GSome; template friend class GMultiCondition; friend class GPipeline; friend class GElementManager; diff --git a/src/GraphCtrl/GraphElement/GGroup/GGroup.h b/src/GraphCtrl/GraphElement/GGroup/GGroup.h index 1d81413e..6a1f50cd 100644 --- a/src/GraphCtrl/GraphElement/GGroup/GGroup.h +++ b/src/GraphCtrl/GraphElement/GGroup/GGroup.h @@ -77,7 +77,7 @@ class GGroup : public GElement { friend class GElementRepository; friend class GStorage; template friend class GMultiCondition; - template friend class GSome; + friend class GSome; public: CStatus __addGElements_4py(const GElementPtrArr& elements); diff --git a/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.inl b/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.cpp similarity index 69% rename from src/GraphCtrl/GraphElement/GGroup/GSome/GSome.inl rename to src/GraphCtrl/GraphElement/GGroup/GSome/GSome.cpp index d0fb5ad7..a39d2846 100644 --- a/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.inl +++ b/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.cpp @@ -6,23 +6,21 @@ @Desc: ***************************/ -#ifndef CGRAPH_GSOME_INL -#define CGRAPH_GSOME_INL +#ifndef CGRAPH_GSOME_CPP +#define CGRAPH_GSOME_CPP #include "GSome.h" CGRAPH_NAMESPACE_BEGIN -template -GSome::GSome() { +GSome::GSome() { element_type_ = GElementType::SOME; binding_index_ = CGRAPH_TRIGGER_ALL_THREAD_STRATEGY; session_ = URandom<>::generateSession(CGRAPH_STR_SOME); } -template -CStatus GSome::addElementEx(GElementPtr element) { +CStatus GSome::addElementEx(GElementPtr element) { CGRAPH_FUNCTION_BEGIN CGRAPH_ASSERT_INIT(false) CGRAPH_ASSERT_NOT_NULL(element) @@ -39,11 +37,12 @@ CStatus GSome::addElementEx(GElementPtr element) { } -template -CStatus GSome::run() { +CStatus GSome::run() { CGRAPH_FUNCTION_BEGIN - left_num_ = TriggerNum; // 还剩n个,就完成当前GSome的执行逻辑 + wait_num_ = static_cast(getWaitNum()); + CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(wait_num_ > children_.size(), + "num is bigger than elements size."); cur_status_.reset(); /** @@ -53,20 +52,21 @@ CStatus GSome::run() { * 4. 赋返回值 */ for (auto* element : children_) { - thread_pool_->commit([this, element] { + thread_pool_->execute([this, element] { { const auto& curStatus = element->fatProcessor(CFunctionType::RUN); CGRAPH_UNIQUE_LOCK lock(lock_); cur_status_ += curStatus; - left_num_--; + if (--wait_num_ <= 0 || cur_status_.isErr()) { + cv_.notify_one(); + } } - cv_.notify_one(); - }, CGRAPH_POOL_TASK_STRATEGY); + }, binding_index_); } CGRAPH_UNIQUE_LOCK lock(lock_); cv_.wait(lock, [this] { - return left_num_ <= 0 || cur_status_.isErr(); + return wait_num_ <= 0 || cur_status_.isErr(); }); for (auto* element : children_) { @@ -79,14 +79,12 @@ CStatus GSome::run() { } -template -CBool GSome::isSerializable() const { +CBool GSome::isSerializable() const { return false; // 情况较为复杂,默认不可以 } -template -CVoid GSome::dump(std::ostream& oss) { +CVoid GSome::dump(std::ostream& oss) { dumpElement(oss); dumpGroupLabelBegin(oss); oss << 'p' << this << "[shape=point height=0];\n"; @@ -104,23 +102,18 @@ CVoid GSome::dump(std::ostream& oss) { } -template -CBool GSome::isHold() { +CBool GSome::isHold() { // 这里固定是不可以 hold的 return false; } -template -CStatus GSome::checkSuitable() { +CStatus GSome::checkSuitable() { CGRAPH_FUNCTION_BEGIN status = GElement::checkSuitable(); CGRAPH_FUNCTION_CHECK_STATUS CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION((CGRAPH_DEFAULT_LOOP_TIMES != loop_), "GSome cannot set loop > 1.") - CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION((0 >= TriggerNum), "trigger num must bigger than 0.") - CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION((children_.size() < TriggerNum), \ - "this GSome need at least [" + std::to_string(TriggerNum) + "] element.") CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(std::any_of(children_.begin(), children_.end(), [](GElementPtr ptr) { return !ptr->isAsync(); }), "GSome contains async node only.") @@ -130,4 +123,4 @@ CStatus GSome::checkSuitable() { CGRAPH_NAMESPACE_END -#endif //CGRAPH_GSOME_INL \ No newline at end of file +#endif //CGRAPH_GSOME_CPP \ No newline at end of file diff --git a/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.h b/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.h index 64542e69..f8c8b0b5 100644 --- a/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.h +++ b/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.h @@ -9,7 +9,6 @@ #ifndef CGRAPH_GSOME_H #define CGRAPH_GSOME_H -#include #include #include @@ -17,8 +16,15 @@ CGRAPH_NAMESPACE_BEGIN -template class GSome : public GGroup { +protected: + /** + * 设定 wait_num 个数 + * 当前 group 执行完成 wait_num 个后,就可以继续执行 + * @return + */ + virtual CSize getWaitNum() = 0; + protected: explicit GSome(); @@ -34,11 +40,10 @@ class GSome : public GGroup { CGRAPH_NO_ALLOWED_COPY(GSome) -private: - CStatus addElementEx(GElementPtr element) final; + CStatus addElementEx(GElementPtr element) override; private: - CInt left_num_ = 0; // 还剩的触发结束的个数 + CInt wait_num_ {0}; // 还剩的触发结束的个数 CStatus cur_status_ ; // 记录异步时刻的当前状态信息 std::mutex lock_; @@ -52,6 +57,4 @@ class GSome : public GGroup { CGRAPH_NAMESPACE_END -#include "GSome.inl" - #endif //CGRAPH_GSOME_H diff --git a/src/UtilsCtrl/ThreadPool/UThreadPool.inl b/src/UtilsCtrl/ThreadPool/UThreadPool.inl index 6b4e9ad1..850b6f89 100644 --- a/src/UtilsCtrl/ThreadPool/UThreadPool.inl +++ b/src/UtilsCtrl/ThreadPool/UThreadPool.inl @@ -57,6 +57,7 @@ auto UThreadPool::commitWithPriority(const FunctionType& func, int priority) template CVoid UThreadPool::execute(FunctionType&& task, const CIndex index) { const CIndex realIndex = dispatch(index); + if (likely(realIndex >= 0 && realIndex < config_.default_thread_size_)) { primary_threads_[realIndex]->pushTask(std::forward(task)); } else if (CGRAPH_LONG_TIME_TASK_STRATEGY == realIndex) { diff --git a/tutorial/MyGSome/MySome.h b/tutorial/MyGSome/MySome.h new file mode 100644 index 00000000..66eae2d2 --- /dev/null +++ b/tutorial/MyGSome/MySome.h @@ -0,0 +1,25 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: MySome.h +@Time: 2026/6/24 23:23 +@Desc: +***************************/ + +#ifndef CGRAPH_MYSOME_H +#define CGRAPH_MYSOME_H + +#include "CGraph.h" + +class MySome : public CGraph::GSome { +protected: + CSize getWaitNum() override { + /** + * 执行完 1个之后,当前的 GSome 就继续往后执行 + * 其余的 element,会在 pipeline 结束之前,执行完成。 + */ + return 1; + } +}; + +#endif //CGRAPH_MYSOME_H diff --git a/tutorial/T23-Some.cpp b/tutorial/T23-Some.cpp index 2b5f5488..1253e6f6 100644 --- a/tutorial/T23-Some.cpp +++ b/tutorial/T23-Some.cpp @@ -8,6 +8,7 @@ #include "MyGNode/MyNode1.h" #include "MyGNode/MyNode2.h" +#include "MyGSome/MySome.h" using namespace CGraph; @@ -16,13 +17,13 @@ void tutorial_some() { GElementPtr a, b_some, c, d = nullptr; /** - * 创建一个类型为GSome的组,其中注入3个异步节点,其中有[1]个执行完成,则GSome执行结束 + * 创建一个类型为GSome的组,其中有个别执行结束就执行完成 * 也就是说,这个 GSome 会执行1s后,就往下继续执行。效果类似增强版本的弱依赖 * 需要注意的是,当前GSome执行完成,并且今后后续的element后, * 内部的节点仍会继续执行,故如果在节点内做参数处理,请注意考虑后续影响 * 当前pipeline的run()方法,会等到内部的内部节点全部执行完成后,才结束 */ - b_some = pipeline->createGGroup>({ + b_some = pipeline->createGGroup({ pipeline->createGNode(GNodeInfo("nodeB1")), pipeline->createGNode(GNodeInfo("nodeB2")), pipeline->createGNode(GNodeInfo("nodeB3"))