fix(dash-spv): split peer TcpStream and add per-peer writer task #720

Draft
xdustinface wants to merge 1 commit into v0.42-dev from fix/peer-concurrent-read-write

Conversation


@xdustinface xdustinface commented May 4, 2026

`Peer` now splits its `TcpStream` into independent read and write halves via `tokio::io::split`. The read half lives behind its own `Mutex<ReadState>` so the reader can frame inbound bytes without contending with senders. The write half is owned by a dedicated per-peer writer task that drains an `mpsc::Sender<NetworkMessage>` onto the socket. `send_message` now just queues into that channel and returns immediately.

This removes the single `Mutex<ConnectionState>` that previously serialised the reader, the maintenance ping, every distributed send, and every broadcast through the same critical section, so a long inbound `Headers2` decompression no longer holds outbound pings off the wire. The reader loop also no longer has anything to gain from holding a mutating lock to call `receive_message`.

The `Peer` API is adjusted accordingly: `send_message`, `receive_message`, and `handle_ping` are now `&self` since the only state they touch lives behind `Arc`s. `bytes_sent` becomes an `AtomicU64` shared with the writer task. `Peer::connect_instance` is retained for compatibility but routes through the same path.
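The shape of the change can be sketched with a blocking std-library analogue of the async design (all names here are illustrative, not the actual `dash-spv` API; `try_clone` plays the role of `tokio::io::split`, and `std::sync::mpsc` stands in for tokio's bounded channel):

```rust
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;

// Returns (bytes_sent counter, what the "remote peer" actually received).
fn run_demo() -> (u64, String) {
    // Local listener standing in for the remote peer.
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();
    let server = thread::spawn(move || {
        let (mut sock, _) = listener.accept().unwrap();
        let mut buf = Vec::new();
        sock.read_to_end(&mut buf).unwrap(); // blocks until the client closes
        buf
    });

    let stream = TcpStream::connect(addr).unwrap();
    // try_clone() gives a second independent handle onto the same socket,
    // so reads never contend with writes -- the std analogue of splitting.
    let mut write_half = stream.try_clone().unwrap();

    let bytes_sent = Arc::new(AtomicU64::new(0));
    let counter = Arc::clone(&bytes_sent);

    // Dedicated per-peer writer "task" draining the outbound queue.
    let (out_tx, out_rx) = mpsc::channel::<Vec<u8>>();
    let writer = thread::spawn(move || {
        for msg in out_rx {
            write_half.write_all(&msg).unwrap();
            counter.fetch_add(msg.len() as u64, Ordering::Relaxed);
        }
        // All senders dropped -> loop ends -> writer exits, as in tear-down.
    });

    // send_message just enqueues and returns immediately.
    out_tx.send(b"ping".to_vec()).unwrap();
    out_tx.send(b"pong".to_vec()).unwrap();

    drop(out_tx);           // closing the channel...
    writer.join().unwrap(); // ...lets the writer drain and exit
    drop(stream);           // close our last handle so the server's read ends

    let received = server.join().unwrap();
    (bytes_sent.load(Ordering::Relaxed), String::from_utf8(received).unwrap())
}

fn main() {
    let (sent, received) = run_demo();
    println!("sent {sent} bytes, peer received {received:?}");
}
```

Note how dropping every sender is what terminates the writer, and how `bytes_sent` is shared with the writer via `Arc<AtomicU64>` rather than a lock.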

Summary by CodeRabbit

  • Refactor
    • Restructured peer communication and connection management to support enhanced concurrent message handling throughout the network layer.


coderabbitai Bot commented May 4, 2026

📝 Walkthrough

This PR refactors the Dash SPV network layer to decouple inbound and outbound message handling. The peer architecture shifts from a single shared socket mutex to separate read-state locking and an mpsc-bounded outbound queue with a dedicated writer task. The peer reader loop now uses read locks for message acquisition and explicitly marks peers disconnected on fatal errors.

Changes

Peer Concurrency Architecture & Manager Integration

| Layer | File(s) | Summary |
|---|---|---|
| Constants | `dash-spv/src/network/constants.rs` | Removed the `MESSAGE_POLL_INTERVAL` constant, eliminating the timeout-based polling fallback. |
| Peer Data Shape | `dash-spv/src/network/peer.rs` | `Peer` struct refactored to replace the single `state: Option<Arc<Mutex<ConnectionState>>>` with separate `read_state: Option<Arc<Mutex<ReadState>>>` (inbound) and `out_tx: Option<mpsc::Sender<NetworkMessage>>` (outbound queue). `bytes_sent` upgraded from a `u64` scalar to `Arc<AtomicU64>` for writer-task updates. `is_connected()` now checks both fields. |
| Peer Core I/O & Writer Task | `dash-spv/src/network/peer.rs` | Refactored stream setup into `install_stream()` with TCP stream splitting, bounded mpsc channel creation, and background writer-task spawning. Added `send_message(&self)` to enqueue into `out_tx`; `receive_message(&self)` now locks only `read_state` with strengthened framing validation (payload length against `MAX_MSG_SIZE`, overflow checks). Added `tear_down_connection()` and public `mark_disconnected()` semantics tied to the writer-task lifecycle. Changed method receivers from `&mut self` to `&self` for `send_message`, `receive_message`, `handle_ping`. |
| Reader Loop & Lock Strategy | `dash-spv/src/network/manager.rs` | Reader loop now acquires a read lock (`peer.read().await`) for message acquisition via `receive_message`, eliminating write-lock-based polling. `GetAddr` and `Ping` handling switch to read locks for response sending; ping failures explicitly call `peer.write().await.mark_disconnected()` before breaking. Reader termination paths (`PeerDisconnected`, serialization errors) now mark the peer disconnected before exiting. |
| Outbound Message Sending | `dash-spv/src/network/manager.rs` | `send_message_to_peer` and `broadcast` updated to use read locks (`peer.read().await`) instead of write locks when calling `send_message`. |
| Tests & API Adaptation | `dash-spv/src/network/peer.rs`, `masternode-seeds-fetcher/src/probe.rs` | `Peer` tests expanded with async Tokio tests validating concurrent send/receive, writer-task draining, and socket-close signaling. Probe code updated to bind the peer immutably (`let peer = ...`) to match the new API. |
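The "strengthened framing validation" row can be illustrated with a minimal sketch. The constant values and function name below are hypothetical (the real ones live in `dash-spv`'s `peer.rs`); the point is checking the declared payload length against a cap and guarding the header + payload sum with `checked_add` before allocating or reading:

```rust
// Hypothetical values, for illustration only.
const MAX_MSG_SIZE: usize = 32 * 1024 * 1024; // illustrative cap
const HEADER_LEN: usize = 24; // magic(4) + command(12) + length(4) + checksum(4)

/// Validates a frame's declared payload length before reading the body,
/// returning the total frame size on success.
fn frame_total_len(payload_len: u32) -> Result<usize, String> {
    let payload_len = payload_len as usize;
    if payload_len > MAX_MSG_SIZE {
        return Err(format!("payload of {payload_len} bytes exceeds MAX_MSG_SIZE"));
    }
    // checked_add guards the header + payload sum against usize overflow.
    HEADER_LEN
        .checked_add(payload_len)
        .ok_or_else(|| "frame length overflow".to_string())
}

fn main() {
    assert_eq!(frame_total_len(100), Ok(124));
    assert!(frame_total_len(u32::MAX).is_err());
    println!("framing checks passed");
}
```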

Sequence Diagram

```mermaid
sequenceDiagram
    actor Mgr as Manager/Reader
    participant P as Peer
    participant Ch as Out Channel
    participant WT as Writer Task
    participant Net as Network Socket

    Mgr->>P: send_message(msg)
    P->>Ch: enqueue NetworkMessage
    Ch-->>WT: message available

    par Concurrent Paths
        Mgr->>P: receive_message()
        P->>P: lock read_state
        P->>Net: read from socket
        Net-->>P: raw bytes
        P->>P: decode, validate checksum
        P-->>Mgr: Message
        P->>P: unlock read_state
    and
        WT->>Ch: dequeue NetworkMessage
        WT->>WT: encode RawNetworkMessage
        WT->>Net: write + flush
        Net-->>WT: acked
        WT->>P: increment bytes_sent (atomic)
    end

    Mgr->>P: disconnect()
    P->>P: tear_down_connection()
    P->>Ch: drop out_tx
    Ch-->>WT: channel closed
    WT->>WT: exit writer task
```
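The lock strategy the diagram implies, where `send_message` runs under a shared lock while `mark_disconnected` needs the exclusive one, can be sketched with `std::sync::RwLock` standing in for tokio's async `RwLock` (the `Peer` struct and method bodies here are illustrative, not the real `dash-spv` code):

```rust
use std::sync::{mpsc, Arc, RwLock};

struct Peer {
    out_tx: Option<mpsc::Sender<String>>,
}

impl Peer {
    // &self: enqueueing only touches the channel sender, so the manager can
    // call this while holding a shared (read) lock.
    fn send_message(&self, msg: &str) -> Result<(), String> {
        match &self.out_tx {
            Some(tx) => tx.send(msg.to_string()).map_err(|e| e.to_string()),
            None => Err("peer disconnected".into()),
        }
    }

    // &mut self: a real state change, so callers need the exclusive lock,
    // mirroring the manager's `peer.write().await.mark_disconnected()` path.
    fn mark_disconnected(&mut self) {
        self.out_tx = None; // dropping the sender closes the outbound channel
    }
}

fn run_demo() -> (Vec<String>, bool) {
    let (tx, rx) = mpsc::channel();
    let peer = Arc::new(RwLock::new(Peer { out_tx: Some(tx) }));

    // Multiple read-lock holders may send concurrently.
    peer.read().unwrap().send_message("ping").unwrap();
    peer.read().unwrap().send_message("getaddr").unwrap();

    // Disconnect under the write lock; later sends fail fast.
    peer.write().unwrap().mark_disconnected();
    let late_send_failed = peer.read().unwrap().send_message("late").is_err();

    // rx.iter() terminates once every sender is dropped (channel closed),
    // which is exactly the writer task's exit condition.
    (rx.iter().collect(), late_send_failed)
}

fn main() {
    let (drained, failed) = run_demo();
    println!("drained {drained:?}, late send failed: {failed}");
}
```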

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐇 Async writes now hop their own lane,
while readers hold locks—no polling pain!
Atoms tick bytes, channels enqueue with grace,
one socket, two paths, concurrent embrace!

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit's high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately describes the main architectural change: splitting peer TcpStream and adding a per-peer writer task. It matches the primary objective of refactoring concurrent read/write handling. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00%, which is sufficient. The required threshold is 80.00%. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |


@xdustinface xdustinface changed the title refactor(dash-spv): split peer TcpStream and add per-peer writer task fix(dash-spv): split peer TcpStream and add per-peer writer task May 4, 2026

@coderabbitai coderabbitai Bot left a comment


🧹 Nitpick comments (1)
dash-spv/src/network/peer.rs (1)

345-350: Remove unreachable error handling branches for WouldBlock and TimedOut on async reads.

With tokio::io::AsyncReadExt on ReadHalf<TcpStream>, neither WouldBlock nor TimedOut errors surface to the caller. Tokio's async runtime handles non-blocking I/O internally, and socket timeouts are managed via tokio::time::timeout() (which returns Elapsed, not TimedOut). No socket-level timeout configuration exists in the codebase. Remove lines 345–350 for clarity.
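A hypothetical sketch of what the simplified handling would look like after the cleanup (the function and messages are illustrative, not the actual `peer.rs` code): only the `Ok(0)` / `Ok(n)` / `Err(_)` cases remain once the unreachable `WouldBlock` and `TimedOut` arms are deleted, since tokio surfaces read timeouts via `tokio::time::timeout`'s `Elapsed` rather than through the `io::Error` kind.

```rust
use std::io;

// Classify an async read result with only the reachable cases.
fn classify(read_result: io::Result<usize>) -> &'static str {
    match read_result {
        Ok(0) => "peer closed connection",
        Ok(_) => "frame bytes available",
        Err(_) => "fatal I/O error: mark peer disconnected",
    }
}

fn main() {
    assert_eq!(classify(Ok(0)), "peer closed connection");
    assert_eq!(classify(Ok(512)), "frame bytes available");
    let err = io::Error::new(io::ErrorKind::ConnectionReset, "reset");
    assert_eq!(classify(Err(err)), "fatal I/O error: mark peer disconnected");
    println!("ok");
}
```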

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@dash-spv/src/network/peer.rs` around lines 345 - 350, The match arms handling
Err(e) where e.kind() == std::io::ErrorKind::WouldBlock and ErrorKind::TimedOut
are unreachable for async reads using tokio::io::AsyncReadExt on
ReadHalf<TcpStream>; remove those branches from the error handling in the read
routine (the function handling reads from ReadHalf<TcpStream> / the code that
matches on the read result) and rely on tokio timeouts via tokio::time::timeout
(which returns Elapsed) or propagate the error instead; update the match to only
handle real IO errors and the Ok/None cases, deleting the WouldBlock and
TimedOut arms to keep the logic correct and clear.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 5c0736c7-28fb-4719-a120-a8c9fcd3d143

📥 Commits

Reviewing files that changed from the base of the PR and between fcaf66e and 406eba4.

📒 Files selected for processing (4)
  • dash-spv/src/network/constants.rs
  • dash-spv/src/network/manager.rs
  • dash-spv/src/network/peer.rs
  • masternode-seeds-fetcher/src/probe.rs
💤 Files with no reviewable changes (1)
  • dash-spv/src/network/constants.rs

@xdustinface xdustinface marked this pull request as draft May 5, 2026 00:29
