Add makePool method on ThreadMap#283
Conversation
|
The following sections might be updated with supplementary metadata relevant to reviewers and maintainers. ReviewsSee the guideline for information on the review process.
If your review is incorrectly listed, please copy-paste ConflictsReviewers, this pull request conflicts with the following ones:
If you consider this pull request important, please also help to review the conflicting pull requests. Ideally, start with the one that should be merged first. |
makePool method on ThreadMapmakePool method on ThreadMap
d56e637 to
158f30d
Compare
makePool method on ThreadMapmakePool method on ThreadMap
8050527 to
0ac739b
Compare
|
Concept ACK. This is a good idea that should make development of rust & python clients easier, while making the behavior opt-in and not affecting c++ clients or existing clients using current behavior. With this feature, we may also want to add a way to mark certain methods that can take long time before returning to not use the thread pool, so they do not block other work. These methods could create their own threads to run on, or require threads to be specified.
Actually I think this is not an issue. If a server method is called with a callbackThread value it can still pass the same value back to the client when making the callback, no matter where the server thread is running.
Yes I need to look more closely at current implementation, but would seem better to have a single queue of waiting tasks instead of giving each thread its own queue. Otherwise execution order could be very unpredictable and if a single request is slow, it could create a backlog of requests assigned to the same thread. It seems like it should be possible to have a shared queue without too much complexity, so probably worth looking into. |
|
Concept ACK. This feature significantly reduces integration burden for clients written in other programming languages. I just glanced at it, planning to review it deeply soon. One thing to note: This PR will definitely need release notes. |
|
Would be nice to have a before and after usage example (with |
|
Should I take a hack at implementing proper work delegation throughout the pool or would reviewers prefer I keep it as is for now? |
|
re: #283 (comment)
On my side, I'd love to see a more elaborate pool. I was thinking about something like https://github.com/bitcoin/bitcoin/blob/master/src/util/threadpool.h as reference |
|
Just tested 0ac739b on peer-observer's ipc-extractor with a It's working as expected and, notably, reduces the overhead of creating a |
0ac739b to
acce408
Compare
|
Re-implemented as a "shortest queue" pool, where the thread with the lowest count of pending jobs is selected, tiebreaker by lowest index. I'd say this is more than sufficient for now (I would imagine users would not have more outstanding jobs than allocated threads in the current mining use case). |
7973f8d to
a828771
Compare
ViniciusCestarii
left a comment
There was a problem hiding this comment.
Shortest-queue selection is looking good
a828771 to
baab0fe
Compare
|
Nice feedback, updated baab0fe |
xyzconstant
left a comment
There was a problem hiding this comment.
Thanks for the changes @rustaceanrob
I exercised this test again: #283 (comment) and downstream IPC tests, everything is working as expected.
I am still reviewing but I have left a couple of comments.
Note that doc/design.md needs to be updated too to reflect this new feature
| } else { | ||
| MP_LOG(loop, Log::Error) | ||
| << "IPC server error request #" << req << ", pool thread not found"; | ||
| throw std::runtime_error("pool thread not found"); |
There was a problem hiding this comment.
I'm wondering how this block could be exercised. A test for this would be great.
There was a problem hiding this comment.
This can only happen if a worker is removed from m_threads and not m_thread_pool, which only be possible if we added a shrink/resize method. I could add something like this, but for now I'm not sure if it's necessary. FWIW the previous else path also seems untested.
diff --git a/test/mp/test/test.cpp b/test/mp/test/test.cpp
index 9be8742..cd5d5f4 100644
--- a/test/mp/test/test.cpp
+++ b/test/mp/test/test.cpp
@@ -598,5 +598,38 @@ KJ_TEST("Call async IPC method without thread or pool errors correctly")
KJ_EXPECT(error_thrown);
}
+KJ_TEST("Pool dispatch with torn-down slot surfaces error")
+{
+ TestSetup setup;
+ ProxyClient<messages::FooInterface>* foo = setup.client.get();
+ setup.server->m_impl->m_fn = [] {};
+
+ std::promise<void> done;
+ bool error_thrown{false};
+ foo->m_context.loop->sync([&] {
+ foo->m_context.connection->m_thread_pool.push_back(
+ {Thread::Client{KJ_EXCEPTION(FAILED, "pool slot torn down")}});
+
+ auto request{foo->m_client.callFnAsyncRequest()};
+ request.initContext();
+ foo->m_context.loop->m_task_set->add(
+ request.send().then(
+ [&](auto&&) { done.set_value(); },
+ [&](kj::Exception&& e) {
+ error_thrown = true;
+ KJ_EXPECT(std::string_view{e.getDescription().cStr()}.find(
+ "pool thread not found") != std::string_view::npos);
+ done.set_value();
+ }));
+ });
+ done.get_future().get();
+ KJ_EXPECT(error_thrown);
+}
+| if (pool[i].depth < slot->depth) slot = &pool[i]; | ||
| } | ||
| ++slot->depth; | ||
| auto guard = kj::defer([slot] { --slot->depth; }); |
There was a problem hiding this comment.
nit: KJ_DEFER macro would be more consistent IMO.
There was a problem hiding this comment.
KJ_DEFER doesn't support move assignment
There was a problem hiding this comment.
I actually meant something like this:
diff --git a/include/mp/type-context.h b/include/mp/type-context.h
index d241d95..e31ceee 100644
--- a/include/mp/type-context.h
+++ b/include/mp/type-context.h
@@ -223,9 +223,9 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
if (pool[i].depth < slot->depth) slot = &pool[i];
}
++slot->depth;
- auto guard = kj::defer([slot] { --slot->depth; });
return connection->m_threads.getLocalServer(slot->client)
- .then([&loop, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& pool_perhaps) mutable {
+ .then([&loop, invoke = kj::mv(invoke), req, slot](const kj::Maybe<Thread::Server&>& pool_perhaps) mutable {
+ KJ_DEFER(--slot->depth);
KJ_IF_MAYBE (pt, pool_perhaps) {
auto& pool_thread = static_cast<ProxyServer<Thread>&>(*pt);
MP_LOG(loop, Log::Debug)
@@ -236,8 +236,7 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
<< "IPC server error request #" << req << ", pool thread not found";
throw std::runtime_error("pool thread not found");
}
- })
- .attach(kj::mv(guard));
+ });
}
}, [&loop, req](::kj::Exception&& e) -> kj::Promise<typename ServerContext::CallContext> {
// If you see the error "(remote):0: failed: remote exception:However, this is a nit
There was a problem hiding this comment.
The diff above compiles and passes the tests, but so does this other:
diff --git a/include/mp/type-context.h b/include/mp/type-context.h
index d241d95..0a99610 100644
--- a/include/mp/type-context.h
+++ b/include/mp/type-context.h
@@ -223,7 +223,6 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
if (pool[i].depth < slot->depth) slot = &pool[i];
}
++slot->depth;
- auto guard = kj::defer([slot] { --slot->depth; });
return connection->m_threads.getLocalServer(slot->client)
.then([&loop, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& pool_perhaps) mutable {
KJ_IF_MAYBE (pt, pool_perhaps) {
@@ -236,8 +235,7 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
<< "IPC server error request #" << req << ", pool thread not found";
throw std::runtime_error("pool thread not found");
}
- })
- .attach(kj::mv(guard));
+ });
}
}, [&loop, req](::kj::Exception&& e) -> kj::Promise<typename ServerContext::CallContext> {
// If you see the error "(remote):0: failed: remote exception:baab0fe to
ac910d2
Compare
Perhaps as a follow up? Or at least I'd like to get a few approach ACKs so we know this design won't change |
ac910d2 to
baab0fe
Compare
Definite approach ACK for baab0fe & sorry I haven't reviewed this more closely yet. This is a simple and elegant way of adding a thread pool while changing existing code as little as possible. My one concern about this is I don't think scheduling algorithm is very robust. But it's probably good enough for practical purposes, and it can be improved later. An example of a problem would be that if that if there are 10 threads, and a client makes 100 method calls, and 99 of the method calls return instantly, but one call waits for a new block to be connected and takes minutes to return. In that case, some of the 99 method calls which could complete instantaneously will be stuck queued behind the one slow call, even though there are threads sitting idle which could execute them. To fix this, it would be best to just add all incoming requests to a single queue, and have threads read from the common queue, instead of giving each thread its own individual queue. I asked claude to implement this to get an idea of what it would look like bd1f80c (branch) but it's a more complicated approach and completely untested and I'd be happy to save something like this for a followup. Other more minor things:
Otherwise this looks very good as far as I can tell and I do want to get this reviewed and merged. |
baab0fe to
dd05eea
Compare
|
dd05eea removes |
|
@ryanofsky, r.e. second comment, is this what you mean? diff --git a/include/mp/type-context.h b/include/mp/type-context.h
index d241d95..9751de7 100644
--- a/include/mp/type-context.h
+++ b/include/mp/type-context.h
@@ -202,7 +202,10 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
Context::Reader context_arg = Accessor::get(params);
auto thread_client = context_arg.getThread();
auto* connection = server.m_context.connection;
- auto result = connection->m_threads.getLocalServer(thread_client)
+ auto thread_promise = context_arg.hasThread()
+ ? connection->m_threads.getLocalServer(thread_client)
+ : kj::Promise<kj::Maybe<Thread::Server&>>(nullptr);
+ auto result = thread_promise
.then([&loop, invoke = kj::mv(invoke), req, connection](const kj::Maybe<Thread::Server&>& perhaps) mutable {
// If the client specified a thread, dispatch to it directly.
KJ_IF_MAYBE (thread_server, perhaps) { |
That's close but not exactly what I mean. I'd like to preserve the current "invalid thread handle" error in the case where So idea would be more like |
This patch introduces a pool of threads to the `Connection` class, and allows this pool to be populated with the thread map via `makePool`. When a client thread is not set in a request context, it is delegated to the pool. The pool is implemented as shortest-queue, where the thread with the shortest list of pending work handles the request. Tiebreaking is by lowest index. This was raised to me by Rust users, as they did not particularly care where work is executed on the server-side, but they have to set the thread regardless. ref: https://github.com/2140-dev/bitcoin-capnp-types/blob/master/tests/util/bitcoin_core.rs#L149
dd05eea to
d77e897
Compare
|
Unfortunately a larger diff but all existing code moved into the |
This patch introduces a pool of threads to the
Connectionclass, and allows this pool to be populated with the thread map viamakePool. When a client thread is not set in a request context, it is delegated to the pool. This is unable to handle the guarentees with server-invoked callbacks that the current API offers, but these callbacks are not yet present in the interface.The pool is implemented as
round-robin as it is simplestshortest queue, but perhaps the pool could be a queue of requests with work-stealing for threads that are available.This was raised to me by Rust users, as they did not particularly care where work is executed on the server-side, but they have to set the thread regardless.
Tests in Rust can be seen here: 2140-dev/bitcoin-capnp-types#24
ref: https://github.com/2140-dev/bitcoin-capnp-types/blob/master/tests/util/bitcoin_core.rs#L149
ref: #281