Add optional Boost.Asio coroutine/networking backend (with multi-threading and vsock bridging) #84
Introduce a SUBSPACE_CORO_BACKEND build switch (Bazel string_flag + CMake option) selecting between the existing `co` coroutine library and boost.asio, and a new common/async abstraction layer (Context, AsyncRuntime, WaitReadable/WaitEither/Sleep, and a StreamSocket/UDPSocket facade) that compiles against either backend. Port the server to route all coroutines, spawns, waits, sleeps, and bridge socket I/O through the abstraction instead of co::self / scheduler_ directly. The co backend remains the default with identical behavior; the asio socket facade and runtime are unit-tested on both backends.
Implement async::UnixSocket for the asio backend with SCM_RIGHTS fd passing, replicating toolbelt's wire framing exactly so an asio client interoperates with a co server. Add a loopback message + fd-passing unit test that passes under both backends. Port the client additively: the co backend is unchanged (async::UnixSocket aliases toolbelt::UnixSocket on co, and socket I/O routes through a new SocketContext() helper returning co_). The asio backend gets an additional ClientImpl(async::Context) constructor and asio branches in the Wait* methods. Existing code using a co coroutine pointer still compiles and behaves identically.
Add additive Server(boost::asio::io_context&, ...) constructors guarded to the asio backend; the existing Server(co::CoroutineScheduler&, ...) constructors are unchanged so code that builds a Server with a scheduler keeps compiling and working on co. Make the UDS listener and ClientHandler backend-agnostic: socket_ is now async::UnixSocket (a toolbelt alias on co) and ClientHandler::Run takes the Context that drives its socket I/O (equal to co::self on co). Thread the Context through TransmitDiscovery and SendSubscribeMessage; on co these keep their blocking, non-interleaving sends, on asio they use cooperative non-blocking sends. The retirement fd read uses WaitReadable + raw read on asio. co server/client/bridge tests pass unchanged.
Builds on the additive Boost.Asio backend with the remaining server/client work and a new vsock bridge transport:
- AsyncRuntime: multi-thread the io_context (--num_asio_threads), confine client handlers, the UDS listener, and shared-state coroutines to a strand (no mutexes), and add graceful shutdown via re-emitted cancellation signals (the asio analogue of the co backend's interrupt fd).
- Client: allow asio clients with no yield_context to run in blocking mode so existing non-coroutine client code is unchanged.
- Tests: backend-agnostic test harness (TestCoroMachine/RuntimeEngine), port client_test/stress_test, add asio_client_test, and add a multi-threaded bridge stress test with self-validating payloads.
- vsock bridging: add --vsock/--vsock_cid flags and Server::SetVsockBridging, carry the address family in the discovery protocol (ChannelAddress.family) so bridge endpoints are self-describing, bind/connect bridge and retirement sockets over AF_VSOCK, and add a Linux-gated vsock loopback bridge test.
- Docs: document the asio backend, multi-threading, and vsock enablement.

Discovery still runs over IP; only the per-channel bridge data connections move to vsock. The co backend is unchanged and remains the default.
The vsock bridge integration test probed for vsock loopback by binding (VMADDR_CID_ANY, VMADDR_PORT_ANY), an unprivileged ephemeral port that succeeds even for non-root processes (e.g. GitHub Actions runners). The server's bridge listener, however, binds (VMADDR_CID_ANY, port 0), and vsock port 0 is privileged, so the server's bind fails with EPERM for an unprivileged user. The probe therefore reported "available" while the server could not actually bind, and the test timed out waiting for the bridge. Make the probe bind port 0 exactly like the server so it skips precisely when the server would be unable to establish the bridge, and runs only where it actually works (root / VM).
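The difference between the two probes can be sketched as a single helper that takes the port as a parameter. This is a minimal illustration, not the test's actual probe: `ProbeVsockBind` and `kVsockPortAny` are hypothetical names, and on a platform without `<linux/vm_sockets.h>` the helper simply reports `EAFNOSUPPORT`.

```cpp
#include <cerrno>
#include <cstdint>
#include <sys/socket.h>
#include <unistd.h>

#if defined(__linux__) && defined(__has_include)
#if __has_include(<linux/vm_sockets.h>)
#include <linux/vm_sockets.h>
#define SKETCH_HAVE_VSOCK 1
#endif
#endif

// Same value as VMADDR_PORT_ANY (-1U); spelled out so the sketch also
// compiles where the vsock header is missing.
constexpr uint32_t kVsockPortAny = 0xFFFFFFFFu;

// Hypothetical helper: tries to bind a vsock stream socket to
// (VMADDR_CID_ANY, port). Returns 0 on success, otherwise the errno value.
int ProbeVsockBind(uint32_t port) {
#ifdef SKETCH_HAVE_VSOCK
  int fd = ::socket(AF_VSOCK, SOCK_STREAM, 0);
  if (fd < 0) return errno;
  sockaddr_vm addr{};
  addr.svm_family = AF_VSOCK;
  addr.svm_cid = VMADDR_CID_ANY;
  addr.svm_port = port;
  int rc = ::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
  int err = (rc == 0) ? 0 : errno;
  ::close(fd);
  return err;
#else
  (void)port;
  return EAFNOSUPPORT;  // no vsock support on this platform
#endif
}
```

Under the behaviour described above, an unprivileged process could see `ProbeVsockBind(kVsockPortAny)` succeed while `ProbeVsockBind(0)` fails with `EPERM`; a probe that should predict the server's outcome has to use the second form.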
server.cc and client.cc call subspace::async::WaitReadable/WaitEither/Sleep, whose co-backend definitions live in the subspace_async library (common/async/wait.cc). Neither subspace_client nor libserver linked it, so the CMake build failed with undefined references in subspace_server, client_test, stress_test, and latency_test. (Bazel was unaffected since its deps were already correct.) Add subspace_async as a PUBLIC dependency of both libraries so all downstream targets pick up the async symbols transitively.
Publisher::Wait / Subscriber::Wait (and the underlying ClientImpl WaitForReliablePublisher / WaitForSubscriber) only accepted a co::Coroutine*, so an asio caller had no way to cooperatively wait on a caller-supplied yield_context the way a co caller passes a coroutine. Add async::Context overloads of these methods. On the asio backend they wait cooperatively on the supplied context via async::WaitReadable / WaitEither (matching the existing zero-timeout = wait-forever and no-timeout-for-fd-pair semantics). They are guarded for the asio backend only, since on the co backend async::Context is co::Coroutine* and the overloads would collide with the existing ones.
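Why the new overloads must be guarded can be seen in a small stand-alone sketch. Everything here is a hypothetical stand-in: `asio_like::YieldContext`, the `SKETCH_BACKEND_ASIO` macro, and the string return values are illustrative only and are not the project's real types or build flag.

```cpp
#include <cassert>
#include <string>
#include <type_traits>

// Hypothetical stand-ins for the real coroutine and context types.
namespace co { struct Coroutine {}; }
namespace asio_like { struct YieldContext {}; }

namespace async {
#if defined(SKETCH_BACKEND_ASIO)
using Context = asio_like::YieldContext*;  // a distinct type on asio
#else
using Context = co::Coroutine*;            // a plain alias on co
static_assert(std::is_same<Context, co::Coroutine*>::value,
              "on co, Context and co::Coroutine* are the same type");
#endif
}  // namespace async

struct Subscriber {
  // Existing overload, present on both backends.
  std::string Wait(co::Coroutine* c) {
    assert(c != nullptr);
    return "co";
  }
#if defined(SKETCH_BACKEND_ASIO)
  // Only legal when Context is a distinct type. Without the guard, the co
  // build would see a second Wait(co::Coroutine*) and fail to compile.
  std::string Wait(async::Context) { return "asio"; }
#endif
};
```

Because a type alias does not create a new type, the unguarded second declaration would be a redefinition rather than an overload on the co backend.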
Previously the only asio-backend CI coverage was building/running //common/async:async_test on Linux and macOS; the full client/server suite never ran in asio mode, and Android only ran the co backend.
- Expand the asio-backend job (Linux + macOS) to also run split_buffer_test, client_test, bridge_test, and server_test under --//:coro_backend=asio, exercising the real client/server port end to end.
- Add an android-asio job that cross-compiles the backend-dependent targets for android_x86_64 with the asio backend and runs them on the emulator via the new android-asio-test.sh script.
Under the asio backend with >1 io_context thread, the bridge transmitter and receiver coroutines run on the parallel io_context but were touching strand-confined server state directly: channels_.find() and RemoveBridgedAddress() (which read or mutate the channels_ / bridged_publishers_ absl::flat_hash_maps owned by the discovery coroutines on the client strand). Concurrent hash-map access tripped Abseil's reentrancy assertion (raw_hash_set.h) and corrupted the maps, manifesting as wedged bridges, dropped messages and hangs. Single-threaded runs were immune because the strand and io_context never truly overlap with one thread.

Keep the bridge data plane parallel (subscriber shared-memory reads, socket I/O and lock-free GetCounters are all thread-safe) and serialize only the server-container bookkeeping onto the strand:
- Add AsyncRuntime::RunOnStrand(fn): post fn to the client strand (asio) or run it inline (co, single-threaded).
- Snapshot the channel metadata the bridge transmitter needs into a BridgeChannelInfo on the strand at spawn time, so the transmitter no longer dereferences the ServerChannel for slot size, retirement fds, split-buffer options, etc.
- Compute the receiver's local split-buffer preference on the strand and pass it in, removing the off-strand channels_.find().
- Post the teardown RemoveBridgedAddress (transmitter and receiver BridgeGuard) back onto the strand, looking the channel up by name.

Also includes the wait-primitive cancellation-safety fixes (notifier steady_timer + poll fast-path in wait.cc/socket.cc, and the SpawnTracked completion handler running inline on the strand) that share runtime.h.
WaitFdReady (socket.cc) and WaitReadable/WaitEither (wait.cc) suspend the coroutine on a sentinel steady_timer (`notifier`) and arm the real descriptor wait with a completion handler that wakes the notifier via notifier.cancel(). That handler was unbound, so under a multi-threaded io_context the reactor could run it on another thread the instant the fd became ready, before the coroutine had even registered notifier.async_wait(). The cancel then cancelled nothing and the coroutine suspended forever on the never-expiring notifier: a missed wakeup that hung whatever was waiting (observed as a client's CreatePublisher blocking in recvfrom while the server sat idle). Single-threaded runs were immune because the reactor cannot run a completion until the coroutine yields.

Fix:
- Bind the descriptor (and timeout-timer) completions to the coroutine's own executor (boost::asio::bind_executor(ctx.get_executor(), ...)) so the cancel is serialized with the async_wait registration and can never be lost. This requires the coroutine to run on a strand.
- Client handlers, the listener and the discovery coroutines already run on the shared client strand. Give each bridge transmitter/receiver and the retirement coroutines their own private strand via a new AsyncRuntime::SpawnOnNewStrand(): they still run in parallel across io_context threads (bridge load spreads across cores) but each one's internal fd-wait handshake is serialized.

Also make the multi-threaded bridge stress test deterministic: size the channel so the 1000-message burst fits without the ring buffer wrapping. A small buffer let a transmitter that was starved for the whole burst under heavy contention miss an entire channel and deliver zero messages. That is legitimate unreliable behaviour, but it made the per-channel liveness check flaky. The checksum / ordering / cross-channel correctness checks (the actual point of the test) are unchanged and still exercised.

MultiThreadedBridging now passes 40/40 under asio; bridge_test and client_test pass under both asio and co backends.
IncomingSubscribe registered the bridged-publisher entry keyed on the discovery `sender` address, but the bridge transmitter removed it using the `subscriber` (bridge data-receiver) address. The keys never matched, so the remove was a silent no-op and the channel stayed permanently flagged as bridged, blocking re-establishment after teardown. Thread the discovery `sender` into BridgeTransmitterCoroutine and remove by `sender` (matching the add). Replace the manual end-of-stream teardown with a BridgeGuard (mirroring the receiver) so every exit path - normal end-of-stream and the early returns on connect/handshake/bind failures - cleans up the entry; previously those early returns leaked it entirely. The erase is still posted to the client strand and looks the channel up by name.
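The guard-based cleanup can be sketched in isolation. This is an illustration of the RAII pattern only, under simplifying assumptions: `ScopeGuard` and `RunTransmitter` are hypothetical names, the bridged set is a plain `std::set`, and the erase runs inline rather than being posted to the client strand as the real code does.

```cpp
#include <cassert>
#include <functional>
#include <set>
#include <string>
#include <utility>

// Hypothetical scope guard: runs the cleanup exactly once, on destruction.
class ScopeGuard {
 public:
  explicit ScopeGuard(std::function<void()> cleanup)
      : cleanup_(std::move(cleanup)) {
    assert(cleanup_ != nullptr);
  }
  ~ScopeGuard() { cleanup_(); }
  ScopeGuard(const ScopeGuard&) = delete;
  ScopeGuard& operator=(const ScopeGuard&) = delete;

 private:
  std::function<void()> cleanup_;
};

// Hypothetical transmitter body. The entry was added under `sender`, so it
// is removed under `sender` too, and on every exit path.
bool RunTransmitter(std::set<std::string>& bridged, const std::string& sender,
                    bool connect_ok) {
  ScopeGuard guard([&bridged, &sender] { bridged.erase(sender); });
  if (!connect_ok) {
    return false;  // early return: the guard still erases the entry
  }
  // ... stream messages until end-of-stream ...
  return true;  // normal exit: the guard erases the entry here as well
}
```

Keying the removal on the same value used for the insert is what makes the erase effective; the guard only ensures it is not skipped.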
MultiThreadedBridging sizes its channels to hold the whole burst (kNumSlots ~= kNumMessages = 1024) so the multi-threaded data plane can be exercised without ring-buffer wrap. Split buffers allocate one shared memory file per slot, so under the CI's --use_split_buffers=true variant kNumChannels x kNumSlots files exhausted the process fd limit (the macOS runners default to a low limit), which surfaced as "Too many open files" when creating slot ~960, followed by a cascade of bind / fd-read failures in unrelated bridge tests sharing the process. Explicitly opt this stress test out of split buffers (regular channels use a single shared-memory region for all slots). The test targets the threaded bridge data plane, not split buffers, which are covered by the dedicated BridgeSplitSource/BridgeSplitRemote tests.
The android-asio lane runs the asio backend on a GitHub-hosted 2-core
swiftshader x86_64 emulator. Two environmental failure modes showed up
that are unrelated to the backend logic (which passes the full asio
bridge_test on the real Linux/macOS runners and locally):
- timing flakes: the thread-multiplexed asio backend is far more
sensitive to the emulator's scheduling latency than the cooperative
co backend, so timing-sensitive tests occasionally miss a deadline;
- transient infra: a corrupted Android system-image download once left
the emulator unable to boot.
Wrap each emulator test invocation in a small retry (3 attempts) to
absorb both, and exclude the multi-threaded bridge stress test on the
emulator - it spawns 4 io-context threads x 8 channels x 1000 messages,
which is ill-matched to 2 cores and the heaviest flake source here; its
multi-core coverage runs on the real asio-backend runners.
The wait primitives suspended the coroutine on a sentinel steady_timer whose operation was bound to the coroutine's cancellation slot. During graceful shutdown EmitCancellations() re-emits each coroutine's signal every 2ms; emitting onto that timer operation invoked asio's per-op timer cancellation, which races the timer's own natural completion and corrupts the timer queue (an intermittent SIGSEGV in timer_queue::pop() seen during BridgeTest.MultipleRetirement shutdown). Route shutdown cancellation through a single idempotent wake() instead: each wait now installs its own handler on the coroutine's cancellation slot that pokes the sentinel timer exactly once, and waits on the timer with an unconnected slot so asio never installs per-operation timer cancellation. Re-emitting is now harmless - it only ever triggers the idempotent waker. Applied to WaitReadable, WaitEither, Sleep (common/async/wait.cc) and WaitFdReady (common/async/socket.cc). EmitCancellations is correspondingly simplified back to a plain post-to-executor emit (the inflight-gating workaround is no longer needed). Verified: 600x MultipleRetirement(2), 40x multi-threaded bridge stress, and full bridge_test/client_test green under asio; bridge_test green under co.
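The "idempotent waker" property, that re-emitting the cancellation signal any number of times pokes the sentinel at most once, can be modelled without asio. The sketch below is only a model of that property: `OnceWaker` is a hypothetical name, and the `poke` callback stands in for whatever wakes the sentinel timer in the real wait primitives.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical once-only waker. Wake() may be called repeatedly and from
// any thread; only the first call invokes the poke callback.
class OnceWaker {
 public:
  explicit OnceWaker(std::function<void()> poke) : poke_(std::move(poke)) {
    assert(poke_ != nullptr);
  }

  void Wake() {
    // exchange() returns the previous value, so exactly one caller sees false.
    if (!fired_.exchange(true, std::memory_order_acq_rel)) {
      poke_();
    }
  }

  bool fired() const { return fired_.load(std::memory_order_acquire); }

 private:
  std::function<void()> poke_;
  std::atomic<bool> fired_{false};
};
```

With this shape, a shutdown loop that re-emits every few milliseconds only ever reaches the cheap flag check after the first wake.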
The asio branch of RetirementCoroutine did a blocking ::read() of the retirement fd on the io_context thread after WaitReadable returned. If WaitReadable ever reports readiness that this reader cannot satisfy with a full record - a spurious epoll edge, or a second retirement coroutine spawned for the same channel after the bridge re-establishes consuming the byte first - the blocking read freezes the entire single-threaded server. Every client handler, discovery and bridge coroutine then stalls behind it, which manifested as an intermittent ~300s hang during client teardown (BridgeTest.MultipleRetirement) on the Linux/Android CI runners. Make the retirement fd non-blocking and treat EAGAIN as "wait again", exactly like the socket facade, so the io_context thread is never blocked. Reproduced on an arm64 Linux container (hung on the first attempt); with this fix it survives 60 back-to-back runs and the 100x bazel stress run no longer times out. co backend unchanged (300x still green).
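The "treat EAGAIN as wait again" loop can be sketched with plain POSIX calls. This is an illustrative stand-in rather than the server's code: `SetNonBlocking`, `WaitReadableBlocking`, and `ReadRecord` are hypothetical helpers, and `poll()` takes the place of the cooperative `WaitReadable`, which would yield the coroutine instead of blocking the thread.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <fcntl.h>
#include <poll.h>
#include <sys/types.h>
#include <unistd.h>

// Puts fd into non-blocking mode; returns false on failure.
bool SetNonBlocking(int fd) {
  int flags = ::fcntl(fd, F_GETFL, 0);
  return flags >= 0 && ::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0;
}

// Stand-in for a cooperative wait: returns true once fd reports an event,
// false on timeout or poll failure.
bool WaitReadableBlocking(int fd, int timeout_ms) {
  pollfd p{fd, POLLIN, 0};
  return ::poll(&p, 1, timeout_ms) > 0;
}

// Reads exactly n bytes from a non-blocking fd.
// Returns n on success, 0 on EOF or timeout, -1 on any other error.
ssize_t ReadRecord(int fd, void* buf, size_t n, int timeout_ms) {
  assert(buf != nullptr);
  size_t got = 0;
  while (got < n) {
    ssize_t r = ::read(fd, static_cast<char*>(buf) + got, n - got);
    if (r > 0) {
      got += static_cast<size_t>(r);
      continue;
    }
    if (r == 0) return 0;          // writer closed the fd
    if (errno == EINTR) continue;  // interrupted: retry the read
    if (errno == EAGAIN || errno == EWOULDBLOCK) {
      // Readiness was spurious or another reader consumed the data:
      // wait again instead of blocking inside read().
      if (!WaitReadableBlocking(fd, timeout_ms)) return 0;
      continue;
    }
    return -1;
  }
  return static_cast<ssize_t>(got);
}
```

The key point is that `read()` itself never blocks; all waiting happens in the wait primitive, which in a coroutine runtime lets other work proceed on the same thread.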
Five BridgeTest cases (Basic, TwoSubs, BasicRetirement, MultipleRetirement,
MultipleRetirement2) all shared the channel name "/bridged_channel", while
every other bridge test already uses a unique name. Because the servers
persist across the whole suite, one test's bridge/channel teardown could
overlap the next test's setup on the same channel. Under the asio backend's
parallel, loaded scheduling this races in two ways:
* slot ids shift (residual slots from a prior test push the publisher into
unexpected slots), breaking MultipleRetirement's hard-coded slot
expectations - ~4/100 under load; and
* a new bridge subscriber tries to open shared memory that the previous
test is unlinking ("No such file or directory"), the bridge dies, and the
test blocks forever in sub->Wait() - a rare full-suite hang.
The co backend's deterministic, single-threaded scheduling hid both (300x
green). Give each of the five tests its own channel, matching the
convention of the rest of the file. Verified on an arm64 Linux container
under the asio backend: 200x and 400x bazel stress plus concurrent-load runs
all pass with max 0.2s (previously flaked / hung); co backend still green.
SubspaceTestBase started the in-process server with Run() (single thread), so the StressTest and client test suites never exercised the multi-threaded asio server path. Run the io_context on 4 threads under the asio backend so CI continuously verifies the multi-threaded server: client handlers stay serialized on the client strand while the remaining coroutines spread across threads. The co backend is unchanged (guarded, and it ignores the count). Verified under asio with 4 threads: full stress suite passes 4x (~231-235s each) and client_test passes; co backend unaffected.
Summary
Adds a build-time-selectable Boost.Asio backend for the server (and the client paths it relies on) alongside the existing `co` backend, behind a common `subspace::async` abstraction. The `co` backend remains the default and is unchanged; a `co` server and an `asio` server interoperate, and existing client code compiles unmodified.

Select the backend with `--//:coro_backend=asio` (Bazel) or `-DSUBSPACE_CORO_BACKEND=asio` (CMake).

Highlights:
- Abstraction layer (`common/async/`): `Context`, `AsyncRuntime`, `WaitReadable`/`WaitEither`/`Sleep`, and a `StreamSocket`/`UDPSocket`/`UnixSocket` facade (toolbelt for `co`, Boost.Asio for `asio`) with identical wire framing and SCM_RIGHTS fd passing.
- `--num_asio_threads` runs the `io_context` on N threads. Client handlers, the UDS listener, and all shared-state coroutines are confined to a strand (no mutexes); bridge data-plane coroutines spread across threads.
- Graceful shutdown: re-emitted `cancellation_signal`s drain coroutines cleanly (the analogue of the co backend's interrupt fd).
- asio clients with no `yield_context` run in blocking mode, so non-coroutine client code needs no changes.
- vsock bridging: `--vsock`/`--vsock_cid` (and `Server::SetVsockBridging`) carry per-channel bridge + retirement connections over `AF_VSOCK`. The discovery protocol's `ChannelAddress` now has a self-describing `family` field (defaults to INET for backward compatibility). Discovery still runs over IP; only bridge data connections move to vsock.
- Tests: `client_test`/`stress_test` ported; new `asio_client_test`; a multi-threaded bridge stress test with self-validating payloads; and a Linux-gated vsock loopback bridge test.
- Docs: `docs/asio-backend.md` covering backend selection, multi-threading, and vsock enablement.

Test plan
- `bazelisk test //client:bridge_test //client:client_test //client:stress_test //server:server_test` pass under both `--//:coro_backend=co` and `--//:coro_backend=asio` (macOS).
- `bazelisk test //client:bridge_test --test_filter='VsockBridgeTest.*' --test_output=all` (and with `--//:coro_backend=asio`). Requires the `vsock_loopback` kernel module.
- … `-DSUBSPACE_CORO_BACKEND=asio`).