The Problem π΄
The error occurs intermittently because of a race condition on the m_lastPipe field:
- Poller receives subscription message β m_lastPipe is set to the subscriber's pipe (XPub.cs:398)
- OnPubSocketOnReceiveReady event fires and your code calls Subscribe(topic)
- BUT: While you're in Subscribe(), another subscription message arrives on a different subscriber
- Poller processes it β calls XRecv() again β sets m_lastPipe to the NEW pipe
- Then clears it β m_lastPipe = null (XPub.cs:271)
- Your Subscribe() call checks β if (m_manual && m_lastPipe != null) β fails because it's now null
- Falls through to generic Options.SetSocketOption() which doesn't know about Subscribe
- Throws the error!
Why It's Intermittent π²
- High message rate: More likely to have multiple messages queued
- System load: Affects thread scheduling and timing
- Pure timing: Race condition depends on which thread runs first
Key Code Locations:
βββββββββββββββββββ¬ββββββββββ¬ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β File β Lines β Issue β
βββββββββββββββββββΌββββββββββΌββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β XPub.cs β 265-274 β Subscribe handler - requires m_lastPipe to be non-null β
βββββββββββββββββββΌββββββββββΌββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β XPub.cs β 271 β m_lastPipe = null; - race condition here β
βββββββββββββββββββΌββββββββββΌββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β XPub.cs β 398 β XRecv() sets m_lastPipe from new messages β
βββββββββββββββββββΌββββββββββΌββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β SocketBase.cs β 336-340 β SetSocketOption calls XSetSocketOption, falls back if false β
βββββββββββββββββββΌββββββββββΌββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β Options.cs β 582 β Throws error when Subscribe reaches here β
βββββββββββββββββββ΄ββββββββββ΄ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Evidence in the Code:
Even the test file acknowledges this (XPubSubTests.cs:479):
// NB Identity must be set before pub.Subscribe/Unsubscribe/Send,
// because these operations clear a private field with last subscriber
Test Objectives:
- Simulates random connect/disconnect timing between EventPublisher and 2 subscribers (ALPHA, BETA)
- Verifies EventPublisher can handle various startup orderings
- Validates that clients receive messages even if they connect later than the publisher
Current Test Parameters (Modified):
βββββββββββββββββββββββββ¬ββββββββββ¬ββββββββββββββββββββββββββββββββββββββββββββββββββ
β Parameter β Value β Notes β
βββββββββββββββββββββββββΌββββββββββΌββββββββββββββββββββββββββββββββββββββββββββββββββ€
β iterations β 1 β Reduced from original 1,000 for faster feedback β
βββββββββββββββββββββββββΌββββββββββΌββββββββββββββββββββββββββββββββββββββββββββββββββ€
β minDelayMs β 0 β Startup delay can begin at 0 ms β
βββββββββββββββββββββββββΌββββββββββΌββββββββββββββββββββββββββββββββββββββββββββββββββ€
β maxDelayMs β 1_000 β Max 1 second random delay (original: 10s) β
βββββββββββββββββββββββββΌββββββββββΌββββββββββββββββββββββββββββββββββββββββββββββββββ€
β publishIntervalMs β 100 β Publishes every 100 ms (original: 1,000 ms) β
βββββββββββββββββββββββββΌββββββββββΌββββββββββββββββββββββββββββββββββββββββββββββββββ€
β iterationTimeout β 1.1s β Maximum wait for messages to arrive β
βββββββββββββββββββββββββ΄ββββββββββ΄ββββββββββββββββββββββββββββββββββββββββββββββββββ
Test Execution Flow (Per Iteration):
- Initialization:
- Find an available TCP port
- Generate 3 independent random delays (0-1,000 ms):
- Publisher startup delay
- ALPHA client startup delay
- BETA client startup delay
- Create TaskCompletionSource instances (alphaReceived, betaReceived)
- Parallel Startup (3 Async Tasks):
Publisher Task:
- Waits for publisherDelayMs
- Creates EventPublisher instance (with Moq-ed Test Logger)
- Binds to ZeroMQ socket
- Publishes "General" topic messages every 100 ms
- Runs until CancellationToken is triggered
ALPHA Client Task:
- Waits for alphaDelayMs
- Creates ClientPublishChannel instance
- Event Handler: when "General" message arrives β alphaReceived.TrySetResult(true)
- Subscribes to "General" topic
- Starts listening loop in background Task
BETA Client Task:
- Same as ALPHA, with separate topic subscription and event
- Wait & Cleanup:
allGot = Task.WhenAll(alphaReceived.Task, betaReceived.Task)
.Wait(iterationTimeout); // Max 1.1 seconds
- If both clients received at least 1 "General" message β iteration passes
- Finally block: cancel token, wait for task completion, dispose resources
The Problem π΄
The error occurs intermittently because of a race condition on the m_lastPipe field:
Why It's Intermittent π²
Key Code Locations:
βββββββββββββββββββ¬ββββββββββ¬ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β File β Lines β Issue β
βββββββββββββββββββΌββββββββββΌββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β XPub.cs β 265-274 β Subscribe handler - requires m_lastPipe to be non-null β
βββββββββββββββββββΌββββββββββΌββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β XPub.cs β 271 β m_lastPipe = null; - race condition here β
βββββββββββββββββββΌββββββββββΌββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β XPub.cs β 398 β XRecv() sets m_lastPipe from new messages β
βββββββββββββββββββΌββββββββββΌββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β SocketBase.cs β 336-340 β SetSocketOption calls XSetSocketOption, falls back if false β
βββββββββββββββββββΌββββββββββΌββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β Options.cs β 582 β Throws error when Subscribe reaches here β
βββββββββββββββββββ΄ββββββββββ΄ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Evidence in the Code:
Even the test file acknowledges this (XPubSubTests.cs:479):
// NB Identity must be set before pub.Subscribe/Unsubscribe/Send,
// because these operations clear a private field with last subscriber
Test Objectives:
Current Test Parameters (Modified):
βββββββββββββββββββββββββ¬ββββββββββ¬ββββββββββββββββββββββββββββββββββββββββββββββββββ
β Parameter β Value β Notes β
βββββββββββββββββββββββββΌββββββββββΌββββββββββββββββββββββββββββββββββββββββββββββββββ€
β iterations β 1 β Reduced from original 1,000 for faster feedback β
βββββββββββββββββββββββββΌββββββββββΌββββββββββββββββββββββββββββββββββββββββββββββββββ€
β minDelayMs β 0 β Startup delay can begin at 0 ms β
βββββββββββββββββββββββββΌββββββββββΌββββββββββββββββββββββββββββββββββββββββββββββββββ€
β maxDelayMs β 1_000 β Max 1 second random delay (original: 10s) β
βββββββββββββββββββββββββΌββββββββββΌββββββββββββββββββββββββββββββββββββββββββββββββββ€
β publishIntervalMs β 100 β Publishes every 100 ms (original: 1,000 ms) β
βββββββββββββββββββββββββΌββββββββββΌββββββββββββββββββββββββββββββββββββββββββββββββββ€
β iterationTimeout β 1.1s β Maximum wait for messages to arrive β
βββββββββββββββββββββββββ΄ββββββββββ΄ββββββββββββββββββββββββββββββββββββββββββββββββββ
Test Execution Flow (Per Iteration):
- Publisher startup delay
- ALPHA client startup delay
- BETA client startup delay
Publisher Task:
ALPHA Client Task:
BETA Client Task:
allGot = Task.WhenAll(alphaReceived.Task, betaReceived.Task)
.Wait(iterationTimeout); // Max 1.1 seconds