Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 39 additions & 4 deletions packages/api/src/client/BaseSubstrateClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
RuntimeVersion,
StorageDataLike,
} from '@dedot/codecs';
import { type JsonRpcProvider, WsProvider } from '@dedot/providers';
import { type JsonRpcProvider, MaxRetryAttemptedError, WsProvider } from '@dedot/providers';
import { type IStorage, LocalStorage } from '@dedot/storage';
import {
Callback,
Expand All @@ -30,6 +30,7 @@ import {
Deferred,
ensurePresence as _ensurePresence,
HexString,
JsonRpcV2NotSupportedError,
LRUCache,
noop,
u8aToHex,
Expand Down Expand Up @@ -346,12 +347,26 @@ export abstract class BaseSubstrateClient<
this.on('disconnected', this.onDisconnected);

return new Promise<this>((resolve, reject) => {
// Only reject on initialization errors (via the internal 'initError' event)
// or when the provider gives up reconnecting (MaxRetryAttemptedError).
// Transient provider errors (e.g., socket errors while the provider is retrying
// another endpoint) should not reject the pending connect() promise — terminal
// provider errors already reject through provider.connect() in JsonRpcClient.
const doReject = (err: unknown) => {
offInitError();
offError();
reject(err instanceof Error ? err : new Error(String(err)));
};

// @ts-ignore
const offInitError = this.on('initError', doReject);
// @ts-ignore
const offError = this.on('error', (err: unknown) => {
reject(err instanceof Error ? err : new Error(String(err)));
if (err instanceof MaxRetryAttemptedError) doReject(err);
});
// @ts-ignore
this.once('ready', () => {
offInitError();
offError();
resolve(this);
});
Expand All @@ -361,12 +376,32 @@ export abstract class BaseSubstrateClient<
protected onConnected = async () => {
try {
await this.initialize();
} catch (e) {
// @ts-ignore — surface init failure so the pending connect() promise can reject
} catch (e: any) {
// @ts-ignore — keep emitting 'error' for external observability
this.emit('error', e);

// If the endpoint misbehaves during initialization and the provider supports
// automatic retry, switch over to a different endpoint and re-initialize there
// instead of giving up on the current one
if (this.shouldSwitchEndpointOnInitError(e)) {
console.warn('Failed to initialize the client, switching to a different endpoint...', e);
(this.provider as WsProvider).disconnect(true).catch(noop);
return;
}

// @ts-ignore — surface init failure so the pending connect() promise can reject
this.emit('initError', e);
}
};

protected shouldSwitchEndpointOnInitError(e: any): boolean {
// JSON-RPC v2 not supported is a terminal error, switching endpoint won't help
// and DedotClient relies on it to fall back to legacy JSON-RPC
if (e instanceof JsonRpcV2NotSupportedError) return false;

return this.provider instanceof WsProvider && this.provider.retryEnabled;
}

protected onDisconnected = async () => {};

protected async initialize() {
Expand Down
4 changes: 4 additions & 0 deletions packages/api/src/client/LegacyClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ export class LegacyClient<ChainApi extends GenericSubstrateApi = SubstrateApi> /
if (!stalingDetectionFn) return;

this.block.best(stalingDetectionFn);

// Arm the watchdog immediately after initialization so an endpoint that stops
// producing blocks right after connecting is still detected (don't wait for the next block)
stalingDetectionFn();
}

#subscribeUpdates() {
Expand Down
4 changes: 4 additions & 0 deletions packages/api/src/client/V2Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ export class V2Client<ChainApi extends GenericSubstrateApi = SubstrateApi> // pr
if (!stalingDetectionFn) return;

this.chainHead.on('bestBlock', stalingDetectionFn);

// Arm the watchdog immediately after initialization so an endpoint that stops
// producing blocks right after connecting is still detected (don't wait for the next block)
stalingDetectionFn();
}

protected override async beforeDisconnect(): Promise<void> {
Expand Down
96 changes: 94 additions & 2 deletions packages/api/src/client/__tests__/BaseSubstrateClient.spec.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,43 @@
import { type JsonRpcProvider, MaxRetryAttemptedError, WsProvider } from '@dedot/providers';
import { type IStorage } from '@dedot/storage';
import { JsonRpcV2NotSupportedError } from '@dedot/utils';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { BaseSubstrateClient } from '../BaseSubstrateClient.js';
import MockProvider from './MockProvider.js';

// A WsProvider that never opens a real socket - used to test endpoint switching behavior
class MockWsProvider extends WsProvider {
disconnectCalls: boolean[] = [];
failOnSwitch: boolean = false;

constructor() {
super('ws://127.0.0.1:1');
}

async connect(): Promise<this> {
this._setStatus('connected');
return this;
}

async disconnect(switchEndpoint?: boolean): Promise<void> {
this.disconnectCalls.push(!!switchEndpoint);

if (this.failOnSwitch) {
this.emit('error', new MaxRetryAttemptedError('Cannot reconnect to network after 3 retry attempts'));
this._setStatus('disconnected');
return;
}

// Simulate an automatic reconnection to a different endpoint
this._setStatus('disconnected');
setTimeout(() => this._setStatus('connected'), 10);
}
}

// Create a mock implementation of BaseSubstrateClient for testing
class MockBaseSubstrateClient extends BaseSubstrateClient {
constructor() {
super('v2', new MockProvider());
constructor(provider: JsonRpcProvider = new MockProvider()) {
super('v2', provider);
}

// Expose protected methods for testing
Expand Down Expand Up @@ -374,4 +405,65 @@ describe('BaseSubstrateClient', () => {
expect(apiAtCacheClearSpy).toHaveBeenCalledTimes(1);
});
});

describe('connect() error semantics', () => {
afterEach(() => {
vi.restoreAllMocks();
});

it('does not reject connect() on transient provider errors before connection', async () => {
const client = new MockBaseSubstrateClient();
const provider = client.provider as MockProvider;

// Simulate a provider that hits a transient socket error first,
// then successfully connects (e.g., retrying a different endpoint)
vi.spyOn(provider, 'connect').mockImplementation(async () => {
provider.emit('error', new Error('transient socket error'));
setTimeout(() => provider.setStatus('connected'), 10);
return provider;
});

await expect(client.connect()).resolves.toBe(client);
});

it('rejects connect() when initialization fails', async () => {
const client = new MockBaseSubstrateClient();
vi.spyOn(client as any, 'initialize').mockRejectedValue(new Error('init failed'));

await expect(client.connect()).rejects.toThrow('init failed');
});

it('switches endpoint when initialization fails and provider retry is enabled', async () => {
const provider = new MockWsProvider();
const client = new MockBaseSubstrateClient(provider);

const doInitSpy = vi
.spyOn(client as any, 'doInitialize')
.mockRejectedValueOnce(new Error('weird endpoint'));

await expect(client.connect()).resolves.toBe(client);
expect(provider.disconnectCalls).toEqual([true]);
expect(doInitSpy).toHaveBeenCalledTimes(2);
});

it('does not switch endpoint when JSON-RPC v2 is not supported', async () => {
const provider = new MockWsProvider();
const client = new MockBaseSubstrateClient(provider);

vi.spyOn(client as any, 'doInitialize').mockRejectedValue(new JsonRpcV2NotSupportedError('v2 not supported'));

await expect(client.connect()).rejects.toThrow(JsonRpcV2NotSupportedError);
expect(provider.disconnectCalls).toEqual([]);
});

it('rejects connect() when max retry attempts exhausted while switching endpoints', async () => {
const provider = new MockWsProvider();
provider.failOnSwitch = true;
const client = new MockBaseSubstrateClient(provider);

vi.spyOn(client as any, 'doInitialize').mockRejectedValue(new Error('weird endpoint'));

await expect(client.connect()).rejects.toThrow(MaxRetryAttemptedError);
});
});
});
13 changes: 13 additions & 0 deletions packages/api/src/client/__tests__/V2Client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,19 @@ describe(
simulator && (await simulator.cleanup());
});

it('arms staling detection immediately after initialization', async () => {
const api = new V2Client({ provider });
const armFn = vi.fn();
vi.spyOn(api as any, 'getStalingDetectionFn').mockReturnValue(armFn);

await api.connect();

// The watchdog must be armed right after init, not only when the next block arrives
expect(armFn).toHaveBeenCalled();

await api.disconnect();
});

describe('cache disabled', () => {
let api: V2Client;
beforeEach(async () => {
Expand Down
60 changes: 56 additions & 4 deletions packages/api/src/json-rpc/group/ChainHead/ChainHead.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { $Header, BlockHash, Header, Option } from '@dedot/codecs';
import { type JsonRpcSubscription, NetworkDisconnectedError } from '@dedot/providers';
import { type JsonRpcSubscription, NetworkDisconnectedError, WsProvider } from '@dedot/providers';
import type { AsyncMethod, Unsub } from '@dedot/types';
import type {
ArchiveStorageResult,
Expand Down Expand Up @@ -72,6 +72,12 @@ const MIN_FINALIZED_QUEUE_SIZE = 10; // finalized queue size
const CHAINHEAD_CACHE_CAPACITY = 256;
const CHAINHEAD_CACHE_TTL = 30_000; // 30 seconds

// Number of times to attempt re-following the chain head after a `stop` event
// before giving up and switching to a different endpoint (if possible)
const STOP_RECOVERY_MAX_ATTEMPTS = 3;
// Delay between re-follow attempts after a `stop` event
const STOP_RECOVERY_RETRY_DELAY_MS = 1_000;

export class ChainHead extends JsonRpcGroup<ChainHeadEvent> {
#unsub?: Unsub;
#subscriptionId?: string;
Expand Down Expand Up @@ -282,6 +288,45 @@ export class ChainHead extends JsonRpcGroup<ChainHeadEvent> {
}
}

/**
* Attempt to re-follow the chain head with a bounded number of retries.
* Used to recover from a `stop` event - transient failures get a few chances
* before we give up and let the caller escalate (e.g. switch endpoint).
*/
async #reFollow(maxAttempts: number = STOP_RECOVERY_MAX_ATTEMPTS): Promise<void> {
let lastError: any;

for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
try {
await this.#doFollow();
return;
} catch (e: any) {
lastError = e;
console.error(`ChainHead re-follow attempt ${attempt}/${maxAttempts} failed`, e);

if (attempt < maxAttempts) {
await waitFor(STOP_RECOVERY_RETRY_DELAY_MS);
}
}
}

throw lastError ?? new ChainHeadError('Cannot recover from stop event!');
}

/**
* Escalate an unrecoverable `stop` event by switching to a different endpoint.
* Only effective when backed by a WsProvider with retry enabled - the provider
* will reconnect to a different endpoint and the client will re-initialize there.
*/
protected switchEndpoint(): void {
const provider = this.client.provider;

if (provider instanceof WsProvider && provider.retryEnabled) {
console.warn('Cannot recover ChainHead after a stop event, switching to a different endpoint...');
provider.disconnect(true).catch(noop);
}
}

/**
* Retry pending requests that were queued during network disconnection
*/
Expand Down Expand Up @@ -488,9 +533,13 @@ export class ChainHead extends JsonRpcGroup<ChainHeadEvent> {
// So any requests/operations coming to the chainHead should be put on waiting
// for the #recovering promise to resolve
this.#recovering = deferred<void>();
// Guard against an unhandled rejection: on recovery failure this promise is
// rejected, but it only has a consumer when an operation is in flight (#ensureFollowed).
// The no-op handler is harmless for real awaiters, who still receive the rejection.
this.#recovering.promise.catch(noop);

// 2. Attempt to re-follow the chainHead
this.#doFollow()
// 2. Attempt to re-follow the chainHead (with bounded retries)
this.#reFollow()
.then(() => {
// 3. Resolve the recovering promise
// This means to continue all pending requests while the chainHead started recovering mode at step 1.
Expand All @@ -510,13 +559,16 @@ export class ChainHead extends JsonRpcGroup<ChainHeadEvent> {
})
.catch((e: any) => {
console.error(e);
// TODO we should retry a few attempts
this.#recovering?.reject(new ChainHeadError('Cannot recover from stop event!'));

Object.values(this.#handlers).forEach(({ defer, operationId }) => {
defer.reject(new ChainHeadError('Cannot recover from stop event!'));
delete this.#handlers[operationId];
});

// Re-follow exhausted all attempts on the current endpoint;
// escalate by switching to a different endpoint (if supported)
this.switchEndpoint();
})
.finally(() => {
// cleaning up
Expand Down
Loading
Loading