diff --git a/packages/api/src/client/BaseSubstrateClient.ts b/packages/api/src/client/BaseSubstrateClient.ts index 0ab5bbbc..26de33fe 100644 --- a/packages/api/src/client/BaseSubstrateClient.ts +++ b/packages/api/src/client/BaseSubstrateClient.ts @@ -8,7 +8,7 @@ import { RuntimeVersion, StorageDataLike, } from '@dedot/codecs'; -import { type JsonRpcProvider, WsProvider } from '@dedot/providers'; +import { type JsonRpcProvider, MaxRetryAttemptedError, WsProvider } from '@dedot/providers'; import { type IStorage, LocalStorage } from '@dedot/storage'; import { Callback, @@ -30,6 +30,7 @@ import { Deferred, ensurePresence as _ensurePresence, HexString, + JsonRpcV2NotSupportedError, LRUCache, noop, u8aToHex, @@ -346,12 +347,26 @@ export abstract class BaseSubstrateClient< this.on('disconnected', this.onDisconnected); return new Promise((resolve, reject) => { + // Only reject on initialization errors (via the internal 'initError' event) + // or when the provider gives up reconnecting (MaxRetryAttemptedError). + // Transient provider errors (e.g., socket errors while the provider is retrying + // another endpoint) should not reject the pending connect() promise — terminal + // provider errors already reject through provider.connect() in JsonRpcClient. + const doReject = (err: unknown) => { + offInitError(); + offError(); + reject(err instanceof Error ? err : new Error(String(err))); + }; + + // @ts-ignore + const offInitError = this.on('initError', doReject); // @ts-ignore const offError = this.on('error', (err: unknown) => { - reject(err instanceof Error ? err : new Error(String(err))); + if (err instanceof MaxRetryAttemptedError) doReject(err); }); // @ts-ignore this.once('ready', () => { + offInitError(); offError(); resolve(this); }); @@ -361,12 +376,32 @@ export abstract class BaseSubstrateClient< protected onConnected = async () => { try { await this.initialize(); - } catch (e) { - // @ts-ignore — surface init failure so the pending connect() promise can reject + } catch (e: any) { + // @ts-ignore — keep emitting 'error' for external observability this.emit('error', e); + + // If the endpoint misbehaves during initialization and the provider supports + // automatic retry, switch over to a different endpoint and re-initialize there + // instead of giving up on the current one + if (this.shouldSwitchEndpointOnInitError(e)) { + console.warn('Failed to initialize the client, switching to a different endpoint...', e); + (this.provider as WsProvider).disconnect(true).catch(noop); + return; + } + + // @ts-ignore — surface init failure so the pending connect() promise can reject + this.emit('initError', e); } }; + protected shouldSwitchEndpointOnInitError(e: any): boolean { + // JSON-RPC v2 not supported is a terminal error, switching endpoint won't help + // and DedotClient relies on it to fall back to legacy JSON-RPC + if (e instanceof JsonRpcV2NotSupportedError) return false; + + return this.provider instanceof WsProvider && this.provider.retryEnabled; + } + protected onDisconnected = async () => {}; protected async initialize() { diff --git a/packages/api/src/client/LegacyClient.ts b/packages/api/src/client/LegacyClient.ts index 14726f1a..12f1fe67 100644 --- a/packages/api/src/client/LegacyClient.ts +++ b/packages/api/src/client/LegacyClient.ts @@ -244,6 +244,10 @@ export class LegacyClient / if (!stalingDetectionFn) return; this.block.best(stalingDetectionFn); + + // Arm the watchdog immediately after initialization so an endpoint that stops + // producing blocks right after connecting is still detected (don't wait for the next block) + stalingDetectionFn(); } #subscribeUpdates() { diff --git a/packages/api/src/client/V2Client.ts b/packages/api/src/client/V2Client.ts index e7ab2484..a5d8517d 100644 --- a/packages/api/src/client/V2Client.ts +++ b/packages/api/src/client/V2Client.ts @@ -260,6 +260,10 @@ export class V2Client // pr if (!stalingDetectionFn) return; this.chainHead.on('bestBlock', stalingDetectionFn); + + // Arm the watchdog immediately after initialization so an endpoint that stops + // producing blocks right after connecting is still detected (don't wait for the next block) + stalingDetectionFn(); } protected override async beforeDisconnect(): Promise { diff --git a/packages/api/src/client/__tests__/BaseSubstrateClient.spec.ts b/packages/api/src/client/__tests__/BaseSubstrateClient.spec.ts index 7c87afca..e834c39f 100644 --- a/packages/api/src/client/__tests__/BaseSubstrateClient.spec.ts +++ b/packages/api/src/client/__tests__/BaseSubstrateClient.spec.ts @@ -1,12 +1,43 @@ +import { type JsonRpcProvider, MaxRetryAttemptedError, WsProvider } from '@dedot/providers'; import { type IStorage } from '@dedot/storage'; +import { JsonRpcV2NotSupportedError } from '@dedot/utils'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { BaseSubstrateClient } from '../BaseSubstrateClient.js'; import MockProvider from './MockProvider.js'; +// A WsProvider that never opens a real socket - used to test endpoint switching behavior +class MockWsProvider extends WsProvider { + disconnectCalls: boolean[] = []; + failOnSwitch: boolean = false; + + constructor() { + super('ws://127.0.0.1:1'); + } + + async connect(): Promise { + this._setStatus('connected'); + return this; + } + + async disconnect(switchEndpoint?: boolean): Promise { + this.disconnectCalls.push(!!switchEndpoint); + + if (this.failOnSwitch) { + this.emit('error', new MaxRetryAttemptedError('Cannot reconnect to network after 3 retry attempts')); + this._setStatus('disconnected'); + return; + } + + // Simulate an automatic reconnection to a different endpoint + this._setStatus('disconnected'); + setTimeout(() => this._setStatus('connected'), 10); + } +} + // Create a mock implementation of BaseSubstrateClient for testing class MockBaseSubstrateClient extends BaseSubstrateClient { - constructor() { - super('v2', new MockProvider()); + constructor(provider: JsonRpcProvider = new MockProvider()) { + super('v2', provider); } // Expose protected methods for testing @@ -374,4 +405,65 @@ describe('BaseSubstrateClient', () => { expect(apiAtCacheClearSpy).toHaveBeenCalledTimes(1); }); }); + + describe('connect() error semantics', () => { + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('does not reject connect() on transient provider errors before connection', async () => { + const client = new MockBaseSubstrateClient(); + const provider = client.provider as MockProvider; + + // Simulate a provider that hits a transient socket error first, + // then successfully connects (e.g., retrying a different endpoint) + vi.spyOn(provider, 'connect').mockImplementation(async () => { + provider.emit('error', new Error('transient socket error')); + setTimeout(() => provider.setStatus('connected'), 10); + return provider; + }); + + await expect(client.connect()).resolves.toBe(client); + }); + + it('rejects connect() when initialization fails', async () => { + const client = new MockBaseSubstrateClient(); + vi.spyOn(client as any, 'initialize').mockRejectedValue(new Error('init failed')); + + await expect(client.connect()).rejects.toThrow('init failed'); + }); + + it('switches endpoint when initialization fails and provider retry is enabled', async () => { + const provider = new MockWsProvider(); + const client = new MockBaseSubstrateClient(provider); + + const doInitSpy = vi + .spyOn(client as any, 'doInitialize') + .mockRejectedValueOnce(new Error('weird endpoint')); + + await expect(client.connect()).resolves.toBe(client); + expect(provider.disconnectCalls).toEqual([true]); + expect(doInitSpy).toHaveBeenCalledTimes(2); + }); + + it('does not switch endpoint when JSON-RPC v2 is not supported', async () => { + const provider = new MockWsProvider(); + const client = new MockBaseSubstrateClient(provider); + + vi.spyOn(client as any, 'doInitialize').mockRejectedValue(new JsonRpcV2NotSupportedError('v2 not supported')); + + await expect(client.connect()).rejects.toThrow(JsonRpcV2NotSupportedError); + expect(provider.disconnectCalls).toEqual([]); + }); + + it('rejects connect() when max retry attempts exhausted while switching endpoints', async () => { + const provider = new MockWsProvider(); + provider.failOnSwitch = true; + const client = new MockBaseSubstrateClient(provider); + + vi.spyOn(client as any, 'doInitialize').mockRejectedValue(new Error('weird endpoint')); + + await expect(client.connect()).rejects.toThrow(MaxRetryAttemptedError); + }); + }); }); diff --git a/packages/api/src/client/__tests__/V2Client.spec.ts b/packages/api/src/client/__tests__/V2Client.spec.ts index cd218371..fe2edd07 100644 --- a/packages/api/src/client/__tests__/V2Client.spec.ts +++ b/packages/api/src/client/__tests__/V2Client.spec.ts @@ -82,6 +82,19 @@ describe( simulator && (await simulator.cleanup()); }); + it('arms staling detection immediately after initialization', async () => { + const api = new V2Client({ provider }); + const armFn = vi.fn(); + vi.spyOn(api as any, 'getStalingDetectionFn').mockReturnValue(armFn); + + await api.connect(); + + // The watchdog must be armed right after init, not only when the next block arrives + expect(armFn).toHaveBeenCalled(); + + await api.disconnect(); + }); + describe('cache disabled', () => { let api: V2Client; beforeEach(async () => { diff --git a/packages/api/src/json-rpc/group/ChainHead/ChainHead.ts b/packages/api/src/json-rpc/group/ChainHead/ChainHead.ts index a26fa993..c216b80a 100644 --- a/packages/api/src/json-rpc/group/ChainHead/ChainHead.ts +++ b/packages/api/src/json-rpc/group/ChainHead/ChainHead.ts @@ -1,5 +1,5 @@ import { $Header, BlockHash, Header, Option } from '@dedot/codecs'; -import { type JsonRpcSubscription, NetworkDisconnectedError } from '@dedot/providers'; +import { type JsonRpcSubscription, NetworkDisconnectedError, WsProvider } from '@dedot/providers'; import type { AsyncMethod, Unsub } from '@dedot/types'; import type { ArchiveStorageResult, @@ -72,6 +72,12 @@ const MIN_FINALIZED_QUEUE_SIZE = 10; // finalized queue size const CHAINHEAD_CACHE_CAPACITY = 256; const CHAINHEAD_CACHE_TTL = 30_000; // 30 seconds +// Number of times to attempt re-following the chain head after a `stop` event +// before giving up and switching to a different endpoint (if possible) +const STOP_RECOVERY_MAX_ATTEMPTS = 3; +// Delay between re-follow attempts after a `stop` event +const STOP_RECOVERY_RETRY_DELAY_MS = 1_000; + export class ChainHead extends JsonRpcGroup { #unsub?: Unsub; #subscriptionId?: string; @@ -282,6 +288,45 @@ export class ChainHead extends JsonRpcGroup { } } + /** + * Attempt to re-follow the chain head with a bounded number of retries. + * Used to recover from a `stop` event - transient failures get a few chances + * before we give up and let the caller escalate (e.g. switch endpoint). + */ + async #reFollow(maxAttempts: number = STOP_RECOVERY_MAX_ATTEMPTS): Promise { + let lastError: any; + + for (let attempt = 1; attempt <= maxAttempts; attempt += 1) { + try { + await this.#doFollow(); + return; + } catch (e: any) { + lastError = e; + console.error(`ChainHead re-follow attempt ${attempt}/${maxAttempts} failed`, e); + + if (attempt < maxAttempts) { + await waitFor(STOP_RECOVERY_RETRY_DELAY_MS); + } + } + } + + throw lastError ?? new ChainHeadError('Cannot recover from stop event!'); + } + + /** + * Escalate an unrecoverable `stop` event by switching to a different endpoint. + * Only effective when backed by a WsProvider with retry enabled - the provider + * will reconnect to a different endpoint and the client will re-initialize there. + */ + protected switchEndpoint(): void { + const provider = this.client.provider; + + if (provider instanceof WsProvider && provider.retryEnabled) { + console.warn('Cannot recover ChainHead after a stop event, switching to a different endpoint...'); + provider.disconnect(true).catch(noop); + } + } + /** * Retry pending requests that were queued during network disconnection */ @@ -488,9 +533,13 @@ export class ChainHead extends JsonRpcGroup { // So any requests/operations coming to the chainHead should be put on waiting // for the #recovering promise to resolve this.#recovering = deferred(); + // Guard against an unhandled rejection: on recovery failure this promise is + // rejected, but it only has a consumer when an operation is in flight (#ensureFollowed). + // The no-op handler is harmless for real awaiters, who still receive the rejection. + this.#recovering.promise.catch(noop); - // 2. Attempt to re-follow the chainHead - this.#doFollow() + // 2. Attempt to re-follow the chainHead (with bounded retries) + this.#reFollow() .then(() => { // 3. Resolve the recovering promise // This means to continue all pending requests while the chainHead started recovering mode at step 1. @@ -510,13 +559,16 @@ export class ChainHead extends JsonRpcGroup { }) .catch((e: any) => { console.error(e); - // TODO we should retry a few attempts this.#recovering?.reject(new ChainHeadError('Cannot recover from stop event!')); Object.values(this.#handlers).forEach(({ defer, operationId }) => { defer.reject(new ChainHeadError('Cannot recover from stop event!')); delete this.#handlers[operationId]; }); + + // Re-follow exhausted all attempts on the current endpoint; + // escalate by switching to a different endpoint (if supported) + this.switchEndpoint(); }) .finally(() => { // cleaning up diff --git a/packages/api/src/json-rpc/group/__tests__/ChainHead.spec.ts b/packages/api/src/json-rpc/group/__tests__/ChainHead.spec.ts index a10401f0..40a298e4 100644 --- a/packages/api/src/json-rpc/group/__tests__/ChainHead.spec.ts +++ b/packages/api/src/json-rpc/group/__tests__/ChainHead.spec.ts @@ -1445,6 +1445,71 @@ describe('ChainHead', () => { 'storage02', ]); }); + + it('retries re-follow and switches endpoint after exhausting attempts', async () => { + const switchSpy = vi.spyOn(chainHead as any, 'switchEndpoint').mockImplementation(() => {}); + const errorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}); + + // Every re-follow attempt fails + provider.setRpcRequests({ + chainHead_v1_follow: () => { + throw new Error('follow failed'); + }, + }); + + // Only count follow calls triggered by the stop recovery (ignore the initial follow) + providerSend.mockClear(); + + // Trigger a stop event on the current follow subscription + notify(simulator.subscriptionId, { event: 'stop' }); + + // Wait long enough for all bounded retry attempts to be exhausted + await waitFor(3_500); + + const followCalls = providerSend.mock.calls.filter(([method]) => method === 'chainHead_v1_follow'); + expect(followCalls.length).toBe(3); + expect(switchSpy).toHaveBeenCalledTimes(1); + + switchSpy.mockRestore(); + errorSpy.mockRestore(); + }, 10_000); + }); + + describe('switchEndpoint', () => { + it('calls provider.disconnect(true) for a WsProvider with retry enabled', async () => { + const { WsProvider } = await import('@dedot/providers'); + const wsProvider = new WsProvider({ endpoint: 'ws://127.0.0.1:1', retryDelayMs: 2_500 }); + const disconnectSpy = vi.spyOn(wsProvider, 'disconnect').mockResolvedValue(undefined); + + const wsClient = new JsonRpcClient({ provider: wsProvider }); + const wsChainHead = new ChainHead(wsClient); + + (wsChainHead as any).switchEndpoint(); + + expect(disconnectSpy).toHaveBeenCalledTimes(1); + expect(disconnectSpy).toHaveBeenCalledWith(true); + }); + + it('does not switch endpoint for a WsProvider with retry disabled', async () => { + const { WsProvider } = await import('@dedot/providers'); + const wsProvider = new WsProvider({ endpoint: 'ws://127.0.0.1:1', retryDelayMs: -1 }); + const disconnectSpy = vi.spyOn(wsProvider, 'disconnect').mockResolvedValue(undefined); + + const wsClient = new JsonRpcClient({ provider: wsProvider }); + const wsChainHead = new ChainHead(wsClient); + + (wsChainHead as any).switchEndpoint(); + + expect(disconnectSpy).not.toHaveBeenCalled(); + }); + + it('does not switch endpoint for a non-WsProvider', async () => { + const disconnectSpy = vi.spyOn(provider, 'disconnect').mockResolvedValue(undefined); + + (chainHead as any).switchEndpoint(); + + expect(disconnectSpy).not.toHaveBeenCalled(); + }); }); describe('caching', () => { diff --git a/packages/providers/src/error.ts b/packages/providers/src/error.ts index 068750e8..cff2fd44 100644 --- a/packages/providers/src/error.ts +++ b/packages/providers/src/error.ts @@ -25,3 +25,15 @@ export class JsonRpcError extends Error { export class MaxRetryAttemptedError extends DedotError {} export class NetworkDisconnectedError extends DedotError {} + +export class WsConnectionError extends DedotError { + /** + * The original error event emitted by the websocket, if any + */ + cause?: any; + + constructor(message: string, cause?: any) { + super(message); + this.cause = cause; + } +} diff --git a/packages/providers/src/ws/WsProvider.ts b/packages/providers/src/ws/WsProvider.ts index a34ccab3..99e90734 100644 --- a/packages/providers/src/ws/WsProvider.ts +++ b/packages/providers/src/ws/WsProvider.ts @@ -1,9 +1,9 @@ import { WebSocket } from '@polkadot/x-ws'; import { assert, DedotError, deferred, Deferred } from '@dedot/utils'; import { SubscriptionProvider } from '../base/index.js'; -import { MaxRetryAttemptedError, NetworkDisconnectedError } from '../error.js'; +import { MaxRetryAttemptedError, NetworkDisconnectedError, WsConnectionError } from '../error.js'; import { JsonRpcRequest } from '../types.js'; -import { pickRandomItem, validateEndpoint } from '../utils.js'; +import { validateEndpoint } from '../utils.js'; export interface WsConnectionState { /** @@ -62,16 +62,31 @@ export interface WsProviderOptions { * @default 30000 */ timeout?: number; + /** + * Timeout in milliseconds for establishing the websocket connection (handshake). + * If the socket does not open within this time, it is force-closed so the next + * reconnection attempt can rotate to a different endpoint. + * If the value is <= 0, the connect timeout is disabled. + * + * @default 30000 + */ + connectTimeoutMs?: number; } const DEFAULT_OPTIONS: Partial = { retryDelayMs: 2500, timeout: 30_000, + connectTimeoutMs: 30_000, }; // No resubscribe subscriptions these prefixes on reconnect const NO_RESUBSCRIBE_PREFIXES = ['author_', 'chainHead_', 'transactionWatch_']; +// How long a connection must stay open before it is considered healthy and the +// retry/failure-memory state is reset. A connection that drops before this window +// (or before receiving any message) does not reset the attempt counter. +const HEALTHY_CONNECTION_THRESHOLD_MS = 10_000; + /** * @name WsProvider * @description A JSON-RPC provider that connects to WebSocket endpoints with support for @@ -128,12 +143,18 @@ export class WsProvider extends SubscriptionProvider { #options: Required; #ws?: WebSocket; #timeoutTimer?: ReturnType; + #connectTimer?: ReturnType; + #healthyTimer?: ReturnType; // Connection state tracking #attempt: number = 0; #currentEndpoint?: string; #initialized: boolean = false; + // Endpoints that have failed since the last healthy connection (for array failover). + // Excluded from selection until exhausted or a healthy connection is established. + #failedEndpoints: Set = new Set(); + // Recovering promise for request queueing during reconnection #recovering?: Deferred; @@ -181,6 +202,13 @@ export class WsProvider extends SubscriptionProvider { return this.#options.retryDelayMs > 0; } + /** + * Whether automatic reconnection/retry is enabled (retryDelayMs > 0) + */ + get retryEnabled(): boolean { + return this.#retryEnabled; + } + get #canRetry() { if (!this.#retryEnabled) return false; @@ -212,41 +240,71 @@ export class WsProvider extends SubscriptionProvider { ); } - // If endpoint is an array, this should not happen as arrays are converted to functions in #normalizeOptions - // But we add this check for type safety if (Array.isArray(endpoint)) { - throw new DedotError('Endpoint array should have been converted to a selector function'); + return this.#pickEndpointFromArray(endpoint); } return endpoint; } + /** + * Pick an endpoint from the array, preferring ones that haven't failed since the + * last healthy connection. Excludes the current endpoint and any remembered + * failed endpoints. When every endpoint has been excluded, the failure memory is + * reset so the full list becomes available again. + */ + #pickEndpointFromArray(endpoints: string[]): string { + const excluded = new Set(this.#failedEndpoints); + if (this.#currentEndpoint) excluded.add(this.#currentEndpoint); + + let candidates = endpoints.filter((e) => !excluded.has(e)); + + // Everything has been excluded - reset failure memory and retry the whole list, + // still preferring an endpoint different from the current one when possible. + if (candidates.length === 0) { + this.#failedEndpoints.clear(); + + candidates = this.#currentEndpoint ? endpoints.filter((e) => e !== this.#currentEndpoint) : endpoints; + if (candidates.length === 0) candidates = endpoints; + } + + return candidates[Math.floor(Math.random() * candidates.length)]; + } + + /** + * Create the underlying WebSocket instance. Extracted so it can be overridden in tests. + */ + protected createWebSocket(endpoint: string): WebSocket { + return new WebSocket(endpoint); + } + async #doConnect() { assert(!this.#ws, 'Websocket connection already exists'); try { this.#currentEndpoint = await this.#getEndpoint(); - this.#ws = new WebSocket(this.#currentEndpoint); + this.#ws = this.createWebSocket(this.#currentEndpoint); this.#ws.onopen = this.#onSocketOpen; this.#ws.onclose = this.#onSocketClose; this.#ws.onmessage = this.#onSocketMessage; this.#ws.onerror = this.#onSocketError; + this.#setupConnectTimeoutHandler(); this.#setupRequestTimeoutHandler(); } catch (e: any) { console.error('Error connecting to websocket', e); + // The endpoint failed to connect - remember it so array failover avoids it + if (this.#currentEndpoint) this.#failedEndpoints.add(this.#currentEndpoint); this.emit('error', e); throw e; } } - async #connectAndRetry(skipTrackAttempts?: boolean) { + async #connectAndRetry() { assert(!this.#ws, 'Websocket connection already exists'); - if (!skipTrackAttempts) { - this.#attempt += 1; - } + this.#attempt += 1; try { await this.#doConnect(); @@ -266,7 +324,7 @@ export class WsProvider extends SubscriptionProvider { setTimeout( () => { this._setStatus('reconnecting'); - this.#connectAndRetry(immediate).catch(console.error); + this.#connectAndRetry().catch(console.error); }, immediate ? 0 : this.#options.retryDelayMs, ); @@ -286,9 +344,15 @@ export class WsProvider extends SubscriptionProvider { } } - #onSocketOpen = async (event: Event) => { - // Connection successful - reset attempt counter - this.#attempt = 0; + #onSocketOpen = async () => { + // The handshake completed - cancel the connect timeout + this.#clearConnectTimeoutHandler(); + + // Arm the healthy-connection timer. The attempt counter / failure memory are only + // reset once the connection has proven healthy (stayed open long enough or received + // a message), not merely because the socket opened - an endpoint that opens then + // immediately drops must keep accumulating attempts. + this.#armHealthyConnectionTimer(); this._setStatus('connected', this.#currentEndpoint); @@ -383,11 +447,15 @@ export class WsProvider extends SubscriptionProvider { super._cleanUp(); this.#clearWs(); this.#clearTimeoutHandler(); + this.#clearConnectTimeoutHandler(); + this.#clearHealthyConnectionTimer(); } #onSocketClose = (event: CloseEvent) => { this.#clearWs(); this.#clearTimeoutHandler(); + this.#clearConnectTimeoutHandler(); + this.#clearHealthyConnectionTimer(); // Keep _handlers intact for retry on reconnect, they will be processed in #onSocketOpen this._pendingNotifications = {}; @@ -397,6 +465,9 @@ export class WsProvider extends SubscriptionProvider { // attempt to reconnect if the connection was not closed manually (via .disconnect()) const normalClosure = event.code === 1000; if (!normalClosure) { + // Remember the endpoint that just dropped so array failover prefers a different one + if (this.#currentEndpoint) this.#failedEndpoints.add(this.#currentEndpoint); + // Initialize recovering promise to queue incoming requests during reconnection this.#recovering = deferred(); @@ -408,13 +479,65 @@ export class WsProvider extends SubscriptionProvider { }; #onSocketError = (error: Event) => { - this.emit('error', error); + this.emit('error', new WsConnectionError(`Websocket connection error from ${this.#currentEndpoint}`, error)); }; #onSocketMessage = (message: MessageEvent) => { + // Receiving a message proves the endpoint is responsive - mark the connection healthy + this.#onHealthyConnection(); this._onReceiveResponse(message.data); }; + #armHealthyConnectionTimer() { + this.#clearHealthyConnectionTimer(); + + const threshold = HEALTHY_CONNECTION_THRESHOLD_MS; + if (threshold <= 0) { + this.#onHealthyConnection(); + return; + } + + this.#healthyTimer = setTimeout(() => this.#onHealthyConnection(), threshold); + } + + #clearHealthyConnectionTimer() { + if (!this.#healthyTimer) return; + + clearTimeout(this.#healthyTimer); + this.#healthyTimer = undefined; + } + + /** + * Called once a connection has proven healthy. Resets the retry attempt counter and + * clears the failed-endpoint memory so the full endpoint list is available again. + */ + #onHealthyConnection() { + this.#clearHealthyConnectionTimer(); + this.#attempt = 0; + this.#failedEndpoints.clear(); + } + + #setupConnectTimeoutHandler() { + const connectTimeout = this.#options.connectTimeoutMs; + if (connectTimeout <= 0) return; + + this.#clearConnectTimeoutHandler(); + + this.#connectTimer = setTimeout(() => { + console.warn(`Connection to ${this.#currentEndpoint} timed out after ${connectTimeout}ms, reconnecting...`); + // Force-close the half-open socket; the close handler will trigger a retry + // that rotates to a different endpoint. + this.#ws?.close(); + }, connectTimeout); + } + + #clearConnectTimeoutHandler() { + if (!this.#connectTimer) return; + + clearTimeout(this.#connectTimer); + this.#connectTimer = undefined; + } + #normalizeOptions(options: WsProviderOptions | string | string[] | WsEndpointSelector): Required { const normalizedOptions = typeof options === 'string' || typeof options === 'function' || Array.isArray(options) @@ -438,10 +561,7 @@ export class WsProvider extends SubscriptionProvider { } endpoint.forEach(validateEndpoint); - - normalizedOptions.endpoint = (info: WsConnectionState) => { - return pickRandomItem(endpoint, info.currentEndpoint); - }; + // Kept as an array - endpoint selection (with failure memory) happens in #getEndpoint } return normalizedOptions as Required; diff --git a/packages/providers/src/ws/__tests__/WsProvider.spec.ts b/packages/providers/src/ws/__tests__/WsProvider.spec.ts index d4bcaced..653df31a 100644 --- a/packages/providers/src/ws/__tests__/WsProvider.spec.ts +++ b/packages/providers/src/ws/__tests__/WsProvider.spec.ts @@ -1,4 +1,4 @@ -import { JsonRpcRequest, JsonRpcResponse } from '@dedot/providers'; +import { JsonRpcRequest, JsonRpcResponse, MaxRetryAttemptedError } from '@dedot/providers'; import { Client, Server } from 'mock-socket'; import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'; import { WsEndpointSelector, WsProvider, WsProviderOptions } from '../WsProvider.js'; @@ -221,6 +221,163 @@ describe('WsProvider', () => { } }, 20000); // Increase test timeout to 20 seconds + describe('Socket Error Handling', () => { + it('emits proper Error instances for socket-level errors', async () => { + // No server is listening on this port + const badEndpoint = 'ws://127.0.0.1:9999'; + const provider = new WsProvider({ endpoint: badEndpoint, retryDelayMs: -1 }); + + const errors: any[] = []; + provider.on('error', (e) => errors.push(e)); + + await expect(provider.connect()).rejects.toThrow(); + + expect(errors.length).toBeGreaterThan(0); + errors.forEach((e) => { + expect(e).toBeInstanceOf(Error); + expect(e.message).toContain(badEndpoint); + }); + }); + }); + + describe('Endpoint Failure Memory', () => { + // Servers that accept the connection then immediately drop it with an abnormal + // close code, simulating endpoints that are up but misbehaving. + const createFlakyServer = (url: string): Server => { + const server = new Server(url); + server.on('connection', (socket) => { + setTimeout(() => socket.close({ code: 3000, reason: 'flaky', wasClean: false }), 0); + }); + return server; + }; + + it('rotates through all endpoints without getting stuck on failing ones', async () => { + // Two flaky endpoints + one healthy endpoint. + // Math.random pinned to 0 always picks the first available candidate, so without + // failure memory the selection would bounce between the two flaky endpoints forever + // and never reach the healthy one. + const flaky1 = 'ws://127.0.0.1:9990'; + const flaky2 = 'ws://127.0.0.1:9991'; + const healthy = FAKE_WS_URL; + + const server1 = createFlakyServer(flaky1); + const server2 = createFlakyServer(flaky2); + + const mockRandom = vi.spyOn(Math, 'random').mockReturnValue(0); + + const provider = new WsProvider({ + endpoint: [flaky1, flaky2, healthy], + retryDelayMs: 30, + }); + provider.on('error', () => {}); + + const connectedEndpoints: (string | undefined)[] = []; + provider.on('connected', (url) => connectedEndpoints.push(url)); + + try { + await provider.connect(); + + // Wait for the failover cascade to settle + await new Promise((resolve) => setTimeout(resolve, 600)); + + expect(provider.status).toBe('connected'); + // It must end up on the healthy endpoint, not bouncing between the flaky ones + expect(connectedEndpoints[connectedEndpoints.length - 1]).toBe(healthy); + } finally { + mockRandom.mockRestore(); + server1.stop(); + server2.stop(); + await provider.disconnect().catch(() => {}); + } + }, 15_000); + }); + + describe('Retry Attempt Counting', () => { + const createDroppingServer = (url: string, closeCode: number): Server => { + const server = new Server(url); + server.on('connection', (socket) => { + // Drop the connection right after it opens, before any message is exchanged + setTimeout(() => socket.close({ code: closeCode, reason: 'flaky', wasClean: false }), 0); + }); + return server; + }; + + // The socket opens (so connect() resolves) but immediately drops on every attempt. + // If `open` reset the attempt counter, maxRetryAttempts would never be reached and + // no MaxRetryAttemptedError would ever be emitted. + const expectMaxRetryEmitted = async (url: string, closeCode: number) => { + const server = createDroppingServer(url, closeCode); + const provider = new WsProvider({ endpoint: url, retryDelayMs: 20, maxRetryAttempts: 3 }); + + const maxRetryEmitted = new Promise((resolve) => { + provider.on('error', (e: any) => { + if (e instanceof MaxRetryAttemptedError) resolve(e); + }); + }); + + try { + await provider.connect(); // resolves on the first successful open + await expect(maxRetryEmitted).resolves.toBeInstanceOf(MaxRetryAttemptedError); + } finally { + server.stop(); + await provider.disconnect().catch(() => {}); + } + }; + + it('counts immediate retries (close code 1005) toward maxRetryAttempts', async () => { + await expectMaxRetryEmitted('ws://127.0.0.1:9953', 1005); + }, 15_000); + + it('counts delayed retries (close code 1006) toward maxRetryAttempts without resetting on open', async () => { + await expectMaxRetryEmitted('ws://127.0.0.1:9954', 1006); + }, 15_000); + }); + + describe('Connect Timeout', () => { + class FakeSocket { + onopen: any = null; + onclose: any = null; + onmessage: any = null; + onerror: any = null; + readyState = 0; // CONNECTING + close = vi.fn((code?: number) => { + this.readyState = 3; + this.onclose?.({ code: code ?? 1006, reason: 'forced', wasClean: false }); + }); + send = vi.fn(); + constructor(public url: string) {} + } + + class TimeoutTestProvider extends WsProvider { + public sockets: FakeSocket[] = []; + protected override createWebSocket(endpoint: string): any { + const socket = new FakeSocket(endpoint); + this.sockets.push(socket); + return socket; + } + } + + it('force-closes a socket that never finishes the handshake within connectTimeoutMs', async () => { + const provider = new TimeoutTestProvider({ + endpoint: FAKE_WS_URL, + connectTimeoutMs: 80, + retryDelayMs: 50, + maxRetryAttempts: 1, + }); + provider.on('error', () => {}); + + provider.connect().catch(() => {}); + + // Wait past the connect timeout + await new Promise((resolve) => setTimeout(resolve, 200)); + + // The first socket never opened, so the connect-timeout should have force-closed it + expect(provider.sockets[0].close).toHaveBeenCalled(); + + await provider.disconnect().catch(() => {}); + }, 15_000); + }); + describe('Array Endpoints', () => { const FAKE_WS_URL_3 = 'ws://127.0.0.1:9946'; const FAKE_WS_URL_4 = 'ws://127.0.0.1:9947';