diff --git a/packages/javascript-api/src/lib/services/graphql/__fixtures__/graphql-subscriptions-fixture.ts b/packages/javascript-api/src/lib/services/graphql/__fixtures__/graphql-subscriptions-fixture.ts index ded2e3e7..42969500 100644 --- a/packages/javascript-api/src/lib/services/graphql/__fixtures__/graphql-subscriptions-fixture.ts +++ b/packages/javascript-api/src/lib/services/graphql/__fixtures__/graphql-subscriptions-fixture.ts @@ -1,9 +1,10 @@ /* eslint-env jest */ -import { DocumentNode } from 'graphql'; +import { DocumentNode, print } from 'graphql'; import WS from 'jest-websocket-mock'; import { DeserializedMessage } from 'jest-websocket-mock/lib/websocket'; -import { lastValueFrom, Observer, Subscription, take } from 'rxjs'; +import { lastValueFrom, Observer, Subscriber, Subscription, take } from 'rxjs'; + import { ConnectionStatus } from '../../../model/connection-status'; import { GraphqlService } from '../graphql.service'; @@ -23,7 +24,7 @@ export class GraphQLSubscriptionsFixture { .mockReturnValue(SERVER_URL); jest - .spyOn(this.graphqlService as any, 'fetchTemporaryApiKey') + .spyOn(this.graphqlService as any, 'getTemporaryApiKey') .mockResolvedValue(DUMMY_API_KEY); } @@ -40,12 +41,8 @@ export class GraphQLSubscriptionsFixture { return (this.graphqlService as any).subscriptions.length; } - getGraphqlServiceSubscriptionObserverMapSize(): number { - return Object.keys(this.getGraphqlServiceSubscriptionObserverMap()).length; - } - - getGraphqlServiceSubscriptionObserverMap(): Record> { - return (this.graphqlService as any).subscriptionObserverMap; + getMessagesSubscribers(): Map>> { + return this.graphqlService['messagesSubscribers']; } async waitForConnection() { @@ -99,11 +96,12 @@ export class GraphQLSubscriptionsFixture { async consumeSubscribeMessage( query: DocumentNode | string = 'subscription { baba }', + { id }: { readonly id: string } = { id: '1' }, ) { expect(await this.server.nextMessage).toEqual({ - id: '1', + id, type: 'start', - payload: { query }, + payload: { query: typeof query === 'string' ? query : print(query) }, }); } diff --git a/packages/javascript-api/src/lib/services/graphql/__tests__/graphql-subscriptions.spec.ts b/packages/javascript-api/src/lib/services/graphql/__tests__/graphql-subscriptions.spec.ts index 88ed8840..9715bd5e 100644 --- a/packages/javascript-api/src/lib/services/graphql/__tests__/graphql-subscriptions.spec.ts +++ b/packages/javascript-api/src/lib/services/graphql/__tests__/graphql-subscriptions.spec.ts @@ -1,7 +1,8 @@ import gql from 'graphql-tag'; import fetchMock from 'jest-fetch-mock'; import { WebSocket } from 'mock-socket'; -import { Subscriber } from 'rxjs'; +import { firstValueFrom } from 'rxjs'; + import { ConnectionStatus } from '../../../model/connection-status'; import { GraphQLSubscriptionsFixture } from '../__fixtures__/graphql-subscriptions-fixture'; import { QminderGraphQLError } from '../graphql.service'; @@ -96,24 +97,23 @@ describe('GraphQL subscriptions', () => { }); it('cleans up internal state when unsubscribing', async () => { - // start the test with an empty observer-map - expect(fixture.getGraphqlServiceSubscriptionObserverMapSize()).toBe(0); + expect(fixture.getMessagesSubscribers().size).toBe(0); + const subscription = fixture.triggerSubscription(); await fixture.handleConnectionInit(); await fixture.consumeSubscribeMessage(); - // the observer map should equal { "1": Subscriber => spy } - expect(fixture.getGraphqlServiceSubscriptionObserverMap()).toEqual({ - '1': expect.any(Subscriber), - }); - // unsubscribing should clean up + expect([...fixture.getMessagesSubscribers().keys()]).toEqual(['1']); + subscription.unsubscribe(); await fixture.consumeAnyMessage(); - expect(fixture.getGraphqlServiceSubscriptionObserverMapSize()).toBe(0); + + expect(fixture.getMessagesSubscribers().size).toBe(0); }); it('when receiving a published message for a subscription that does not exist anymore, it does not throw', async () => { - expect(fixture.getGraphqlServiceSubscriptionObserverMapSize()).toBe(0); + expect(fixture.getMessagesSubscribers().size).toBe(0); + const subscription = fixture.triggerSubscription(); await fixture.handleConnectionInit(); @@ -150,7 +150,7 @@ describe('GraphQL subscriptions', () => { it('when the server does not reply to ping message, reconnects', async () => { const reconnectSpy = jest.spyOn( fixture.graphqlService as any, - 'handleConnectionDropWithThisBound', + 'handleConnectionDrop', ); jest.useFakeTimers(); const subscription = fixture.triggerSubscription(); @@ -175,7 +175,7 @@ describe('GraphQL subscriptions', () => { it('handles multiple consecutive connect/ping/timeout cycles gracefully', async () => { const reconnectSpy = jest.spyOn( fixture.graphqlService as any, - 'handleConnectionDropWithThisBound', + 'handleConnectionDrop', ); jest.useFakeTimers(); const subscription = fixture.triggerSubscription(); @@ -221,7 +221,7 @@ describe('GraphQL subscriptions', () => { it('when the server replies to ping message, does not reconnect', async () => { const reconnectSpy = jest.spyOn( fixture.graphqlService as any, - 'handleConnectionDropWithThisBound', + 'handleConnectionDrop', ); jest.useFakeTimers(); const subscription = fixture.triggerSubscription(); @@ -335,45 +335,901 @@ describe('GraphQL subscriptions', () => { subscription.unsubscribe(); }); - it('error messages are propagated to the subscriber', async () => { - const ERRORS: QminderGraphQLError[] = [ - { - message: - "Invalid Syntax : offending token 'createdTickets' at line 1 column 1", - sourcePreview: - 'createdTickets(locationId: 673) {\n' + - ' id\n' + - ' firstName\n' + - ' lastName\n', - offendingToken: 'createdTickets', - locations: [], - errorType: 'InvalidSyntax', - extensions: null, - path: null, + it(`should send GQL_STOP for retryable errored subscription if it's unsubscribed`, async () => { + const query = gql` + subscription { + name + } + `; + + const subscription = fixture.triggerSubscription(query); + + await fixture.handleConnectionInit(); + await fixture.consumeSubscribeMessage(query); + + fixture.server.send({ + id: '1', + type: 'error', + payload: { + data: null, + errors: [ + { + message: 'The maximum subscription limit of 100 has been reached', + }, + ] satisfies QminderGraphQLError[], }, - ]; - const errorSpy = jest.fn(); - const subscription = fixture.triggerSubscription('subscription { baba }', { - error: errorSpy, }); + + subscription.unsubscribe(); + + expect(await fixture.getNextMessage()).toEqual( + expect.objectContaining({ + id: '1', + type: 'stop', + }), + ); + }); + + it(`should not send GQL_STOP for subscription if it has been cleaned up`, async () => { + const subscription = fixture.triggerSubscription(gql` + subscription { + name + } + `); + + await fixture.handleConnectionInit(); + + fixture.server.send({ + id: '1', + type: 'complete', + }); + + expect(fixture.server.messagesToConsume.pendingItems).toHaveLength(0); + + subscription.unsubscribe(); + }); + + it(`should clean up retryable errored subscription if it's unsubscribed`, async () => { + const subscription = fixture.triggerSubscription(gql` + subscription { + name + } + `); + await fixture.handleConnectionInit(); - await fixture.consumeSubscribeMessage(); + fixture.server.send({ id: '1', type: 'error', payload: { data: null, - errors: ERRORS, + errors: [ + { + message: 'The maximum subscription limit of 100 has been reached', + }, + ] satisfies QminderGraphQLError[], }, }); - expect(errorSpy).toHaveBeenCalledWith(ERRORS); - // Cleans up as well - expect( - (fixture.graphqlService as any).subscriptionObserverMap['1'], - ).toBeUndefined(); + expect([...fixture.getMessagesSubscribers().keys()]).toEqual(['1']); subscription.unsubscribe(); + + expect(fixture.getMessagesSubscribers().size).toBe(0); + }); + + describe('GQL_CONNECTION_ACK', () => { + it('should retry retryable errored subscriptions', async () => { + const subscription = fixture.triggerSubscription(gql` + subscription { + name + } + `); + + const subscription2 = fixture.triggerSubscription(gql` + subscription { + name2 + } + `); + + await fixture.handleConnectionInit(); + + [{ messageId: '1' }, { messageId: '2' }].forEach(({ messageId }) => { + fixture.server.send({ + id: messageId, + type: 'error', + payload: { + data: null, + errors: [ + { + message: + 'The maximum subscription limit of 100 has been reached', + }, + ] satisfies QminderGraphQLError[], + }, + }); + }); + + await fixture.closeWithCode(1001); + + fixture.openServer(); + await fixture.handleConnectionInit(); + + expect(await fixture.getNextMessage()).toEqual( + expect.objectContaining({ + id: '1', + type: 'start', + }), + ); + + expect(await fixture.getNextMessage()).toEqual( + expect.objectContaining({ + id: '2', + type: 'start', + }), + ); + + subscription.unsubscribe(); + subscription2.unsubscribe(); + }); + }); + + describe('GQL_DATA', () => { + it('should send data to subscriber', async () => { + const subscriptionNextSpy = jest.fn(); + + const subscription = fixture.triggerSubscription( + gql` + subscription { + name + } + `, + { next: subscriptionNextSpy }, + ); + + await fixture.handleConnectionInit(); + + fixture.sendMessageToClient({ + id: '1', + type: 'data', + payload: { data: { id: '1' } }, + }); + + expect(subscriptionNextSpy).toHaveBeenCalledWith({ id: '1' }); + + subscription.unsubscribe(); + }); + }); + + describe('GQL_COMPLETE', () => { + it('should complete subscription', async () => { + const subscriptionCompleteSpy = jest.fn(); + + const subscription = fixture.triggerSubscription( + gql` + subscription { + name + } + `, + { complete: subscriptionCompleteSpy }, + ); + + await fixture.handleConnectionInit(); + + fixture.sendMessageToClient({ + id: '1', + type: 'complete', + }); + + expect(subscriptionCompleteSpy).toHaveBeenCalledTimes(1); + + subscription.unsubscribe(); + }); + + it('should clean up subscription', async () => { + const subscription = fixture.triggerSubscription(gql` + subscription { + name + } + `); + + await fixture.handleConnectionInit(); + + fixture.sendMessageToClient({ + id: '1', + type: 'complete', + }); + + expect(fixture.getMessagesSubscribers().size).toBe(0); + + subscription.unsubscribe(); + }); + + it('should not send GQL_STOP if subscription completes', async () => { + const subscription = fixture.triggerSubscription(gql` + subscription { + name + } + `); + + await fixture.handleConnectionInit(); + + fixture.sendMessageToClient({ + id: '1', + type: 'complete', + }); + + expect(fixture.server.messagesToConsume.pendingItems).toHaveLength(0); + + subscription.unsubscribe(); + }); + }); + + describe('GQL_ERROR', () => { + it.each([ + 'BAD_REQUEST', + 'FIELD_NOT_FOUND', + 'INVALID_ARGUMENT', + 'InvalidSyntax', + 'NOT_FOUND', + 'PERMISSION_DENIED', + 'ValidationError', + ])( + 'should immediately error non-retryable subscriptions (errorType: %s)', + async (errorType) => { + const subscriptionErrorSpy = jest.fn(); + + const subscription = fixture.triggerSubscription( + gql` + subscription { + name + } + `, + { error: subscriptionErrorSpy }, + ); + + await fixture.handleConnectionInit(); + + fixture.server.send({ + id: '1', + type: 'error', + payload: { + errors: [ + { + message: 'error', + errorType, + }, + ] satisfies QminderGraphQLError[], + }, + }); + + expect(subscriptionErrorSpy).toHaveBeenCalledWith([ + { + message: 'error', + errorType, + }, + ]); + + subscription.unsubscribe(); + }, + ); + + it.each([ + 'BAD_REQUEST', + 'FIELD_NOT_FOUND', + 'INVALID_ARGUMENT', + 'InvalidSyntax', + 'NOT_FOUND', + 'PERMISSION_DENIED', + 'ValidationError', + ])( + 'should clean up non-retryable subscriptions (errorType: %s)', + async (errorType) => { + const subscription = fixture.triggerSubscription( + gql` + subscription { + name + } + `, + { error: () => {} }, + ); + + await fixture.handleConnectionInit(); + + fixture.server.send({ + id: '1', + type: 'error', + payload: { + errors: [ + { + message: 'error', + errorType, + }, + ] satisfies QminderGraphQLError[], + }, + }); + + expect(fixture.getMessagesSubscribers().size).toBe(0); + + subscription.unsubscribe(); + }, + ); + + it('should not immediately error retryable subscriptions', async () => { + const subscriptionErrorSpy = jest.fn(); + + const subscription = fixture.triggerSubscription( + gql` + subscription { + name + } + `, + { error: subscriptionErrorSpy }, + ); + + await fixture.handleConnectionInit(); + + fixture.server.send({ + id: '1', + type: 'error', + payload: { + data: null, + errors: [ + { + message: 'The maximum subscription limit of 100 has been reached', + }, + ] satisfies QminderGraphQLError[], + }, + }); + + expect(subscriptionErrorSpy).not.toHaveBeenCalled(); + + subscription.unsubscribe(); + }); + + it('should not clean up subscriptions before retrying', async () => { + const subscription = fixture.triggerSubscription(gql` + subscription { + name + } + `); + + await fixture.handleConnectionInit(); + + fixture.server.send({ + id: '1', + type: 'error', + payload: { + data: null, + errors: [ + { + message: 'The maximum subscription limit of 100 has been reached', + }, + ] satisfies QminderGraphQLError[], + }, + }); + + expect([...fixture.getMessagesSubscribers().keys()]).toEqual(['1']); + + subscription.unsubscribe(); + }); + + it('should not drop socket connection', async () => { + const connectionDropSpy = jest.spyOn( + fixture.graphqlService as any, + 'handleConnectionDrop', + ); + + const subscription = fixture.triggerSubscription(gql` + subscription { + name + } + `); + + await fixture.handleConnectionInit(); + + fixture.server.send({ + id: '1', + type: 'error', + payload: { + data: null, + errors: [ + { + message: 'The maximum subscription limit of 100 has been reached', + }, + ] satisfies QminderGraphQLError[], + }, + }); + + expect(connectionDropSpy).not.toHaveBeenCalled(); + + subscription.unsubscribe(); + }); + + it('should retry retryable errored subscriptions after delay', async () => { + jest.useFakeTimers(); + + const query = gql` + subscription { + name + } + `; + + const subscription = fixture.triggerSubscription(query); + + const query2 = gql` + subscription { + name2 + } + `; + + const subscription2 = fixture.triggerSubscription(query2); + + // Wait for temporary api key + await jest.runAllTimersAsync(); + + await fixture.handleConnectionInit(); + + // Send subscriptions start messages + await jest.runOnlyPendingTimersAsync(); + + await fixture.consumeSubscribeMessage(query, { id: '1' }); + await fixture.consumeSubscribeMessage(query2, { id: '2' }); + + // Send ping message + await jest.advanceTimersToNextTimerAsync(); + + await fixture.consumePingMessage(); + fixture.sendMessageToClient({ type: 'pong' }); + + [{ messageId: '1' }, { messageId: '2' }].forEach(({ messageId }) => { + fixture.server.send({ + id: messageId, + type: 'error', + payload: { + data: null, + errors: [ + { + message: + 'The maximum subscription limit of 100 has been reached', + }, + ] satisfies QminderGraphQLError[], + }, + }); + }); + + // Wait for retry + await jest.advanceTimersByTimeAsync(7_000); + + expect(await fixture.getNextMessage()).toEqual( + expect.objectContaining({ + id: '1', + type: 'start', + }), + ); + + expect(await fixture.getNextMessage()).toEqual( + expect.objectContaining({ + id: '2', + type: 'start', + }), + ); + + subscription.unsubscribe(); + subscription2.unsubscribe(); + + jest.useRealTimers(); + }); + + it('should error retryable subscriptions after 5 failed retries', async () => { + jest.useFakeTimers(); + + const subscriptionErrorSpy = jest.fn(); + + const query = gql` + subscription { + name + } + `; + + const subscription = fixture.triggerSubscription(query, { + error: subscriptionErrorSpy, + }); + + const query2 = gql` + subscription { + name2 + } + `; + + const subscription2ErrorSpy = jest.fn(); + + const subscription2 = fixture.triggerSubscription(query2, { + error: subscription2ErrorSpy, + }); + + // Wait for temporary api key + await jest.runAllTimersAsync(); + + await fixture.handleConnectionInit(); + + // Send subscriptions start messages + await jest.runOnlyPendingTimersAsync(); + + await fixture.consumeSubscribeMessage(query, { id: '1' }); + await fixture.consumeSubscribeMessage(query2, { id: '2' }); + + // Send ping message + await jest.advanceTimersToNextTimerAsync(); + + await fixture.consumePingMessage(); + fixture.sendMessageToClient({ type: 'pong' }); + + const sendErrorMessages = (): void => { + [{ messageId: '1' }, { messageId: '2' }].forEach(({ messageId }) => { + fixture.server.send({ + id: messageId, + type: 'error', + payload: { + data: null, + errors: [ + { + message: + 'The maximum subscription limit of 100 has been reached', + }, + ] satisfies QminderGraphQLError[], + }, + }); + }); + }; + + sendErrorMessages(); + + for (let retryCount = 0; retryCount < 5; retryCount++) { + // Wait for retry + await jest.advanceTimersToNextTimerAsync(); + + // mock-socket delivers client→server messages via setTimeout(4) + await jest.advanceTimersByTimeAsync(10); + + sendErrorMessages(); + } + + expect(subscriptionErrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: 'Subscription failed after 5 retries', + }), + ); + + expect(subscription2ErrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: 'Subscription failed after 5 retries', + }), + ); + + subscription.unsubscribe(); + subscription2.unsubscribe(); + + jest.useRealTimers(); + }); + + it('should clean up retryable errored subscriptions after 5 failed retries', async () => { + jest.useFakeTimers(); + + const query = gql` + subscription { + name + } + `; + + const subscription = fixture.triggerSubscription(query, { + error: () => {}, + }); + + const query2 = gql` + subscription { + name2 + } + `; + + const subscription2 = fixture.triggerSubscription(query2, { + error: () => {}, + }); + + // Wait for temporary api key + await jest.runAllTimersAsync(); + + await fixture.handleConnectionInit(); + + // Send subscriptions start messages + await jest.runOnlyPendingTimersAsync(); + + await fixture.consumeSubscribeMessage(query, { id: '1' }); + await fixture.consumeSubscribeMessage(query2, { id: '2' }); + + // Send ping message + await jest.advanceTimersToNextTimerAsync(); + + await fixture.consumePingMessage(); + fixture.sendMessageToClient({ type: 'pong' }); + + const sendErrorMessages = (): void => { + [{ messageId: '1' }, { messageId: '2' }].forEach(({ messageId }) => { + fixture.server.send({ + id: messageId, + type: 'error', + payload: { + data: null, + errors: [ + { + message: + 'The maximum subscription limit of 100 has been reached', + }, + ] satisfies QminderGraphQLError[], + }, + }); + }); + }; + + sendErrorMessages(); + + expect([...fixture.getMessagesSubscribers().keys()]).toEqual(['1', '2']); + + for (let retryCount = 0; retryCount < 5; retryCount++) { + // Wait for retry + await jest.advanceTimersToNextTimerAsync(); + + // mock-socket delivers client → server messages via setTimeout(4) + await jest.advanceTimersByTimeAsync(10); + + sendErrorMessages(); + } + + expect(fixture.getMessagesSubscribers().size).toBe(0); + + subscription.unsubscribe(); + subscription2.unsubscribe(); + + jest.useRealTimers(); + }); + + it(`should error subscription if server sends unknown message (has data)`, async () => { + const subscriptionErrorSpy = jest.fn(); + + const subscription = fixture.triggerSubscription( + gql` + subscription { + name + } + `, + { error: subscriptionErrorSpy }, + ); + + await fixture.handleConnectionInit(); + + fixture.sendMessageToClient({ + id: '1', + type: 'unknown', + payload: { data: { unknown: 'unknown' } }, + }); + + expect(subscriptionErrorSpy).toHaveBeenCalledWith({ unknown: 'unknown' }); + + subscription.unsubscribe(); + }); + + it(`should clean up subscription if server sends unknown message (has data)`, async () => { + const subscription = fixture.triggerSubscription( + gql` + subscription { + name + } + `, + { error: () => {} }, + ); + + await fixture.handleConnectionInit(); + + fixture.sendMessageToClient({ + id: '1', + type: 'unknown', + payload: { data: { unknown: 'unknown' } }, + }); + + expect(fixture.getMessagesSubscribers().size).toBe(0); + + subscription.unsubscribe(); + }); + + it(`should error subscription if server sends unknown message (has errors)`, async () => { + const subscriptionErrorSpy = jest.fn(); + + const subscription = fixture.triggerSubscription( + gql` + subscription { + name + } + `, + { error: subscriptionErrorSpy }, + ); + + await fixture.handleConnectionInit(); + + fixture.sendMessageToClient({ + id: '1', + type: 'unknown', + payload: { + errors: [{ message: 'error' }] satisfies QminderGraphQLError[], + }, + }); + + expect(subscriptionErrorSpy).toHaveBeenCalledWith([{ message: 'error' }]); + + subscription.unsubscribe(); + }); + + it(`should clean up subscription if server sends unknown message (has errors)`, async () => { + const subscription = fixture.triggerSubscription( + gql` + subscription { + name + } + `, + { error: () => {} }, + ); + + await fixture.handleConnectionInit(); + + fixture.sendMessageToClient({ + id: '1', + type: 'unknown', + payload: { + errors: [{ message: 'error' }] satisfies QminderGraphQLError[], + }, + }); + + expect(fixture.getMessagesSubscribers().size).toBe(0); + + subscription.unsubscribe(); + }); + }); + + describe('haveAnyRetryableSubscriptionsErrored', () => { + it(`should emit 'true' if any retryable subscriptions have errored`, async () => { + const subscription = fixture.triggerSubscription(gql` + subscription { + name + } + `); + + await fixture.handleConnectionInit(); + + fixture.server.send({ + id: '1', + type: 'error', + payload: { + data: null, + errors: [ + { + message: 'The maximum subscription limit of 100 has been reached', + }, + ] satisfies QminderGraphQLError[], + }, + }); + + const haveAnySubscriptionsErrored = await firstValueFrom( + fixture.graphqlService.haveAnyRetryableSubscriptionsErrored(), + ); + + expect(haveAnySubscriptionsErrored).toBe(true); + + subscription.unsubscribe(); + }); + + it('should clear retryable errored subscriptions with a delay after successful batch retry', async () => { + jest.useFakeTimers(); + + const query = gql` + subscription { + name + } + `; + + const subscription = fixture.triggerSubscription(query); + + // Wait for temporary api key + await jest.runAllTimersAsync(); + + await fixture.handleConnectionInit(); + + // Send subscriptions start messages + await jest.runOnlyPendingTimersAsync(); + + await fixture.consumeSubscribeMessage(query); + + // Send ping message + await jest.advanceTimersToNextTimerAsync(); + + await fixture.consumePingMessage(); + fixture.sendMessageToClient({ type: 'pong' }); + + fixture.server.send({ + id: '1', + type: 'error', + payload: { + data: null, + errors: [ + { + message: 'The maximum subscription limit of 100 has been reached', + }, + ] satisfies QminderGraphQLError[], + }, + }); + + // Get latest haveAnySubscriptionsErrored state + await jest.advanceTimersByTimeAsync(0); + + const haveAnySubscriptionsErroredBeforeRetry = await firstValueFrom( + fixture.graphqlService.haveAnyRetryableSubscriptionsErrored(), + ); + + expect(haveAnySubscriptionsErroredBeforeRetry).toBe(true); + + // Wait for retry + await jest.advanceTimersByTimeAsync(7_000); + + const haveAnySubscriptionsErroredAfterRetry = await firstValueFrom( + fixture.graphqlService.haveAnyRetryableSubscriptionsErrored(), + ); + + expect(haveAnySubscriptionsErroredAfterRetry).toBe(false); + + subscription.unsubscribe(); + + jest.useRealTimers(); + }); + + it(`should emit 'true' if there are retryable errored subscriptions but socket reconnects`, async () => { + const subscription = fixture.triggerSubscription(gql` + subscription { + name + } + `); + + await fixture.handleConnectionInit(); + + fixture.server.send({ + id: '1', + type: 'error', + payload: { + data: null, + errors: [ + { + message: 'The maximum subscription limit of 100 has been reached', + }, + ] satisfies QminderGraphQLError[], + }, + }); + + const haveAnySubscriptionsErroredBeforeReconnect = await firstValueFrom( + fixture.graphqlService.haveAnyRetryableSubscriptionsErrored(), + ); + + expect(haveAnySubscriptionsErroredBeforeReconnect).toBe(true); + + await fixture.closeWithCode(1001); + + fixture.openServer(); + await fixture.handleConnectionInit(); + + const haveAnySubscriptionsErroredAfterReconnect = await firstValueFrom( + fixture.graphqlService.haveAnyRetryableSubscriptionsErrored(), + ); + + expect(haveAnySubscriptionsErroredAfterReconnect).toBe(false); + + subscription.unsubscribe(); + }); }); describe('WebSocket readyState guards', () => { diff --git a/packages/javascript-api/src/lib/services/graphql/__tests__/graphql.service.spec.ts b/packages/javascript-api/src/lib/services/graphql/__tests__/graphql.service.spec.ts index abb2e9ff..3858ee89 100644 --- a/packages/javascript-api/src/lib/services/graphql/__tests__/graphql.service.spec.ts +++ b/packages/javascript-api/src/lib/services/graphql/__tests__/graphql.service.spec.ts @@ -42,7 +42,7 @@ describe('GraphQL service', function () { requestStub = sinon.stub(Qminder.ApiBase, 'queryGraph'); temporaryApiKeySpy = jest - .spyOn(graphqlService as any, 'fetchTemporaryApiKey') + .spyOn(graphqlService as any, 'getTemporaryApiKey') .mockResolvedValue(keyValue); }); @@ -155,15 +155,7 @@ describe('GraphQL service', function () { expect(WebSocket).toHaveBeenCalledTimes(1); }); }); - describe('.generateOperationId', () => { - it('returns an incrementing string', () => { - expect((graphqlService as any).generateOperationId()).toBe('1'); - expect((graphqlService as any).generateOperationId()).toBe('2'); - expect((graphqlService as any).generateOperationId()).toBe('3'); - expect((graphqlService as any).generateOperationId()).toBe('4'); - expect((graphqlService as any).generateOperationId()).toBe('5'); - }); - }); + afterEach(function () { requestStub.restore(); }); diff --git a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts index 87458fff..c20de039 100644 --- a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts +++ b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts @@ -8,28 +8,27 @@ import WebSocket, { CloseEvent } from 'isomorphic-ws'; import { BehaviorSubject, distinctUntilChanged, + map, Observable, - Observer, + scan, shareReplay, + startWith, + Subject, + Subscriber, + take, } from 'rxjs'; import { ConnectionStatus } from '../../model/connection-status.js'; +import { Logger } from '../../util/logger/logger.js'; import { calculateRandomizedExponentialBackoffTime } from '../../util/randomized-exponential-backoff/randomized-exponential-backoff.js'; import { sleepMs } from '../../util/sleep-ms/sleep-ms.js'; import { ApiBase, GraphqlQuery } from '../api-base/api-base.js'; import { TemporaryApiKeyService } from '../temporary-api-key/temporary-api-key.service.js'; -import { Logger } from '../../util/logger/logger.js'; - -type QueryOrDocument = string | DocumentNode; -function queryToString(query: QueryOrDocument): string { - if (typeof query === 'string') { - return query; - } - if (query.kind === 'Document') { - return print(query); - } - throw new Error('queryToString: query must be a string or a DocumentNode'); +function parseQuery(queryOrDocumentNode: string | DocumentNode): string { + return typeof queryOrDocumentNode === 'string' + ? queryOrDocumentNode + : print(queryOrDocumentNode); } export interface QminderGraphQLError { @@ -42,23 +41,18 @@ export interface QminderGraphQLError { path?: (string | number)[] | null; } -interface OperationMessage { - id?: string; - type: MessageType; - payload?: { - data?: T | null; - errors?: QminderGraphQLError[]; +interface Message { + readonly id?: string; + readonly type: MessageType; + readonly payload?: { + readonly data?: Record | null; + readonly errors?: QminderGraphQLError[]; }; } -class Subscription { - id: string; - query: string; - - constructor(id: string, query: string) { - this.id = id; - this.query = query; - } +interface Subscription { + readonly messageId: string; + readonly query: string; } enum MessageType { @@ -77,8 +71,23 @@ enum MessageType { GQL_ERROR = 'error', } -const PONG_TIMEOUT_IN_MS = 12000; -const PING_PONG_INTERVAL_IN_MS = 20000; +const NON_RETRYABLE_SUBSCRIPTION_ERROR_TYPES = [ + 'BAD_REQUEST', + 'FIELD_NOT_FOUND', + 'INVALID_ARGUMENT', + 'InvalidSyntax', + 'NOT_FOUND', + 'PERMISSION_DENIED', + 'ValidationError', +] as const; + +const RETRYABLE_ERRORED_SUBSCRIPTIONS_RETRY_LIMIT = 5; + +// To avoid haveAnySubscriptionsErrored returning 'false' temporarily if retrying errored subscriptions fails. +const RETRYABLE_ERRORED_SUBSCRIPTIONS_SUCCEEDED_DELAY_MS = 1_000; + +const PONG_TIMEOUT_IN_MS = 12_000; +const PING_PONG_INTERVAL_IN_MS = 20_000; // https://www.w3.org/TR/websockets/#concept-websocket-close-fail const CLIENT_SIDE_CLOSE_EVENT = 1000; @@ -102,22 +111,70 @@ export class GraphqlService { ConnectionStatus.DISCONNECTED, ); - private nextSubscriptionId = 1; + private subscriptionsCount = 0; private subscriptions: Subscription[] = []; - private readonly subscriptionObserverMap: { [id: string]: Observer } = - {}; + private readonly messagesSubscribers = new Map< + string, + Subscriber> + >(); private readonly subscriptionConnection$: Observable; + + private readonly retryableErroredSubscriptionsAction$ = new Subject< + | { + readonly type: 'add'; + readonly messageId: string; + } + | { + readonly type: 'remove'; + readonly messageId: string; + } + | { + readonly type: 'clear'; + } + >(); + + private readonly retryableErroredSubscriptionsMessageIds$ = + this.retryableErroredSubscriptionsAction$.pipe( + scan((messageIds, action) => { + const result = new Set(messageIds); + + switch (action.type) { + case 'add': + return result.add(action.messageId); + case 'remove': + result.delete(action.messageId); + return result; + case 'clear': + return new Set(); + } + }, new Set()), + startWith(new Set()), + shareReplay(1), + ); + + private readonly haveAnyRetryableSubscriptionsErrored$ = + this.retryableErroredSubscriptionsMessageIds$.pipe( + map(({ size }) => !!size), + distinctUntilChanged(), + ); + + private retryableErroredSubscriptionsRetryTimeout: ReturnType< + typeof setTimeout + > | null = null; + + private retryableErroredSubscriptionsSuccessTimeout: ReturnType< + typeof setTimeout + > | null = null; + + private retryableErroredSubscriptionsRetryCount = 0; + private temporaryApiKeyService: TemporaryApiKeyService | undefined; private pongTimeout: any; private pingPongInterval: any; - private readonly sendPingWithThisBound = this.sendPing.bind(this); - - private readonly handleConnectionDropWithThisBound = - this.handleConnectionDrop.bind(this); private connectionAttemptsCount = 0; @@ -128,6 +185,8 @@ export class GraphqlService { distinctUntilChanged(), shareReplay(1), ); + + this.retryableErroredSubscriptionsMessageIds$.subscribe(); } /** @@ -182,41 +241,77 @@ export class GraphqlService { /** * Subscribe to Qminder Events API using GraphQL. * - * For example + * @example + * + * Be notified of any created tickets * * ```javascript * import { Qminder } from 'qminder-api'; - * // 1. Be notified of any created tickets - * try { - * const observable = Qminder.GraphQL.subscribe("subscription { createdTickets(locationId: 123) { id firstName } }") * - * observable.subscribe(data => console.log(data)); - * // => { createdTickets: { id: '12', firstName: 'Marta' } } + * try { + * Qminder.GraphQL.subscribe(` + * subscription { + * createdTickets(locationId: 123) { + * id + * firstName + * } + * } + * `).subscribe((data) => { + * console.log(data); // { createdTickets: { id: '12', firstName: 'Marta' } } + * }); * } catch (error) { * console.error(error); * } * ``` * - * @param queryDocument required: the GraphQL query to send, for example `"subscription { createdTickets(locationId: 123) { id firstName } }"` - * @returns an RxJS Observable that will push data as - * @throws when the 'query' argument is undefined or an empty string + * @param queryOrDocumentNode the GraphQL query to send, for example `"subscription { createdTickets(locationId: 123) { id firstName } }"` + * @returns a RxJS Observable that will push data + * @throws when the `queryDocument` argument is an empty string + * + * Retries retryable errored subscriptions up to 5 times. Afterwards throws an error. + * + * To get notified when any retryable subscriptions have errored, use the {@link haveAnyRetryableSubscriptionsErrored} method. + * + * @see {@link NON_RETRYABLE_SUBSCRIPTION_ERROR_TYPES | non-retryable subscription error types} */ - subscribe(queryDocument: QueryOrDocument): Observable { - const query = queryToString(queryDocument); - - if (!query || query.length === 0) { + subscribe>( + queryOrDocumentNode: string | DocumentNode, + ): Observable { + const query = parseQuery(queryOrDocumentNode); + if (!query) { throw new Error( 'GraphQLService query expects a GraphQL query as its first argument', ); } - return new Observable((observer: Observer) => { - const id = this.generateOperationId(); - this.subscriptions.push(new Subscription(id, query)); - this.sendMessage(id, MessageType.GQL_START, { query }); - this.subscriptionObserverMap[id] = observer; + return new Observable((subscriber) => { + const messageId = `${++this.subscriptionsCount}`; + this.subscriptions.push({ messageId, query }); + + this.sendMessage(messageId, MessageType.GQL_START, { query }).catch( + (error: Error) => { + this.logger.error('Failed to start subscription: ', error); + }, + ); + + this.messagesSubscribers.set(messageId, subscriber); - return () => this.stopSubscription(id); + return () => { + if (this.messagesSubscribers.has(messageId)) { + this.sendMessage(messageId, MessageType.GQL_STOP, null).catch( + (error) => { + this.logger.error('Failed to stop subscription: ', error); + }, + ); + + this.retryableErroredSubscriptionsAction$.next({ + type: 'remove', + messageId, + }); + } + + this.cleanUpSubscription(messageId); + }; }); } @@ -227,13 +322,13 @@ export class GraphqlService { * There is no need to call this method in order for data transfer to work. The `subscribe()` method also initializes * a websocket connection before proceeding. */ - openPendingWebSocket(): void { + async openPendingWebSocket(): Promise { if ( ![ConnectionStatus.CONNECTING, ConnectionStatus.CONNECTED].includes( this.connectionStatus, ) ) { - this.openSocket(); + await this.openSocket(); } } @@ -243,7 +338,7 @@ export class GraphqlService { * This method is automatically called when doing Qminder.setKey(). * @hidden */ - setKey(apiKey: string) { + setKey(apiKey: string): void { this.temporaryApiKeyService = new TemporaryApiKeyService( this.apiServer, apiKey, @@ -258,27 +353,34 @@ export class GraphqlService { return this.subscriptionConnection$; } + /** + * Have any retryable GraphQL subscriptions been rejected by the server. + * + * Emits `false` if all retryable errored subscriptions have been successfully retried. + * + * @see {@link NON_RETRYABLE_SUBSCRIPTION_ERROR_TYPES | non-retryable subscription error types} + */ + haveAnyRetryableSubscriptionsErrored(): Observable { + return this.haveAnyRetryableSubscriptionsErrored$; + } + /** * Set the WebSocket hostname the GraphQL service uses. * @hidden */ - setServer(apiServer: string) { + setServer(apiServer: string): void { this.apiServer = apiServer; } - private stopSubscription(id: string) { - this.sendMessage(id, MessageType.GQL_STOP, null); - this.cleanupSubscription(id); - } + private cleanUpSubscription(messageId: string): void { + this.messagesSubscribers.delete(messageId); - private cleanupSubscription(id: string) { - delete this.subscriptionObserverMap[id]; - this.subscriptions = this.subscriptions.filter((sub) => { - return sub.id !== id; - }); + this.subscriptions = this.subscriptions.filter( + (subscription) => subscription.messageId !== messageId, + ); } - private openSocket() { + private async openSocket(): Promise { if ( [ConnectionStatus.CONNECTING, ConnectionStatus.CONNECTED].includes( this.connectionStatus, @@ -286,26 +388,23 @@ export class GraphqlService { ) { return; } + this.setConnectionStatus(ConnectionStatus.CONNECTING); this.logger.info('Connecting to websocket'); - this.fetchTemporaryApiKey() - .then((temporaryApiKey: string) => { - this.createSocketConnection(temporaryApiKey); - }) - .catch((e) => { - throw e; - }); + + const temporaryApiKey = await this.getTemporaryApiKey(); + this.createSocketConnection(temporaryApiKey); } - private async fetchTemporaryApiKey(): Promise { - return this.temporaryApiKeyService.fetchTemporaryApiKey(); + private async getTemporaryApiKey(): Promise { + return await this.temporaryApiKeyService.fetchTemporaryApiKey(); } private getServerUrl(temporaryApiKey: string): string { return `wss://${this.apiServer}:443/graphql/subscription?rest-api-key=${temporaryApiKey}`; } - private createSocketConnection(temporaryApiKey: string) { + private createSocketConnection(temporaryApiKey: string): void { if (this.socket) { this.socket.onclose = null; this.socket.onmessage = null; @@ -314,10 +413,10 @@ export class GraphqlService { this.socket.close(); this.socket = null; } + this.socket = new WebSocket(this.getServerUrl(temporaryApiKey)); - const socket = this.socket; - socket.onopen = () => { + this.socket.onopen = () => { this.sendRawMessage( JSON.stringify({ id: undefined, @@ -327,7 +426,7 @@ export class GraphqlService { ); }; - socket.onclose = (event: CloseEvent) => { + this.socket.onclose = (event) => { this.logger.warn('WebSocket connection closed:', { code: event.code, reason: event.reason, @@ -335,19 +434,23 @@ export class GraphqlService { this.setConnectionStatus(ConnectionStatus.DISCONNECTED); this.socket = null; - this.clearPingMonitoring(); + if (this.shouldRetry(event)) { const timer = calculateRandomizedExponentialBackoffTime( this.connectionAttemptsCount, ); - this.logger.info( - `Waiting for ${timer.toFixed(1)}ms before reconnecting`, - ); - sleepMs(timer).then(() => { - this.connectionAttemptsCount += 1; - this.openSocket(); - }); + + this.logger.info(`Reconnect socket in ${timer.toFixed(0)}ms`); + + sleepMs(timer) + .then(async () => { + this.connectionAttemptsCount++; + return await this.openSocket(); + }) + .catch((error: Error) => { + this.logger.error('Failed to reconnect socket: ', error); + }); } if (this.connectionStatus === ConnectionStatus.CONNECTING) { @@ -357,105 +460,177 @@ export class GraphqlService { } }; - socket.onerror = () => { - const message = 'Websocket error occurred!'; + this.socket.onerror = () => { if (this.isBrowserOnline()) { - this.logger.error(message); + this.logger.error('Websocket error occurred!'); } else { - this.logger.info(message); + this.logger.info('Websocket error occurred!'); } }; - socket.onmessage = (rawMessage: { data: WebSocket.Data }) => { - if (typeof rawMessage.data === 'string') { - const message: OperationMessage = JSON.parse(rawMessage.data); + this.socket.onmessage = (event) => { + if (typeof event.data !== 'string') { + return; + } - switch (message.type) { - case MessageType.GQL_CONNECTION_KEEP_ALIVE: - break; + const message: Message = JSON.parse(event.data); - case MessageType.GQL_CONNECTION_ACK: { - this.connectionAttemptsCount = 0; - this.setConnectionStatus(ConnectionStatus.CONNECTED); - this.logger.info('Connected to websocket'); - this.startConnectionMonitoring(); - let resubscriptionFailed = false; - for (const subscription of this.subscriptions) { - const payload = { query: subscription.query }; - const msg = JSON.stringify({ - id: subscription.id, - type: MessageType.GQL_START, - payload, - }); - if (!this.sendRawMessage(msg)) { - this.logger.warn( - `Failed to re-subscribe ${this.subscriptions.length} subscription(s): WebSocket not open`, - ); - resubscriptionFailed = true; - break; - } - } - if (resubscriptionFailed) { - this.handleConnectionDrop(); + switch (message.type) { + case MessageType.GQL_CONNECTION_KEEP_ALIVE: + break; + + case MessageType.GQL_CONNECTION_ACK: { + this.connectionAttemptsCount = 0; + + this.clearErroredSubscriptionsTimeouts(); + this.retryableErroredSubscriptionsRetryCount = 0; + this.retryableErroredSubscriptionsAction$.next({ type: 'clear' }); + + this.setConnectionStatus(ConnectionStatus.CONNECTED); + this.logger.info('Connected to websocket'); + this.startConnectionMonitoring(); + + let resubscriptionFailed = false; + + for (const { messageId, query } of this.subscriptions) { + const msg = JSON.stringify({ + id: messageId, + type: MessageType.GQL_START, + payload: { query }, + }); + + if (!this.sendRawMessage(msg)) { + this.logger.warn( + `Failed to re-subscribe ${this.subscriptions.length} subscription(s): WebSocket not open`, + ); + + resubscriptionFailed = true; + break; } - break; } - case MessageType.GQL_DATA: - this.subscriptionObserverMap[message.id]?.next( - message.payload.data, - ); - break; + if (resubscriptionFailed) { + this.handleConnectionDrop().catch((error: Error) => { + this.logger.error( + 'Failed to handle connection drop after resubscription failure: ', + error, + ); + }); + } - case MessageType.GQL_COMPLETE: - this.subscriptionObserverMap[message.id]?.complete(); - break; + break; + } - case MessageType.GQL_PONG: - clearTimeout(this.pongTimeout); - break; + case MessageType.GQL_DATA: + this.retryableErroredSubscriptionsAction$.next({ + type: 'remove', + messageId: message.id, + }); + + this.messagesSubscribers.get(message.id)?.next(message.payload.data); + break; + + case MessageType.GQL_COMPLETE: { + this.retryableErroredSubscriptionsAction$.next({ + type: 'remove', + messageId: message.id, + }); + + const subscriber = this.messagesSubscribers.get(message.id); + this.cleanUpSubscription(message.id); + subscriber?.complete(); + + break; + } + + case MessageType.GQL_PONG: + clearTimeout(this.pongTimeout); + break; + + case MessageType.GQL_ERROR: { + const errors = message.payload?.errors ?? []; - case MessageType.GQL_ERROR: - this.subscriptionObserverMap[message.id]?.error( - message.payload.errors, + if (this.isAnySubscriptionErrorNonRetryable(errors)) { + this.logger.error( + `Non-retryable GraphQL subscription error: ${JSON.stringify( + message, + )}`, ); - this.cleanupSubscription(message.id); + + // May have been retryable before + this.retryableErroredSubscriptionsAction$.next({ + type: 'remove', + messageId: message.id, + }); + + const subscriber = this.messagesSubscribers.get(message.id); + this.cleanUpSubscription(message.id); + subscriber?.error(errors); + break; + } - default: - if (message.payload && message.payload.data) { - this.subscriptionObserverMap[message.id]?.error( - message.payload.data, - ); - } else if ( - message.payload.errors && - message.payload.errors.length > 0 - ) { - this.subscriptionObserverMap[message.id]?.error( - message.payload.errors, - ); - } + this.logger.warn( + `Retryable GraphQL subscription error: ${JSON.stringify(message)}`, + ); + + this.clearErroredSubscriptionsSuccessTimeout(); + + this.retryableErroredSubscriptionsAction$.next({ + type: 'add', + messageId: message.id, + }); + + if ( + this.retryableErroredSubscriptionsRetryCount < + RETRYABLE_ERRORED_SUBSCRIPTIONS_RETRY_LIMIT && + !this.retryableErroredSubscriptionsRetryTimeout + ) { + this.scheduleErroredSubscriptionsRetry(); + } else if (!this.retryableErroredSubscriptionsRetryTimeout) { + this.failErroredSubscriptions(); + } + + break; + } + + default: { + const subscriber = this.messagesSubscribers.get(message.id); + if (!subscriber) { + return; + } + + if (message.payload?.data) { + this.cleanUpSubscription(message.id); + subscriber.error(message.payload.data); + } else if (message.payload?.errors?.length) { + this.cleanUpSubscription(message.id); + subscriber.error(message.payload.errors); + } } } }; } - private shouldRetry(event: CloseEvent) { - if (event.code !== CLIENT_SIDE_CLOSE_EVENT) { - return true; - } - - return Object.entries(this.subscriptionObserverMap).length > 0; + private shouldRetry(event: CloseEvent): boolean { + return ( + event.code !== CLIENT_SIDE_CLOSE_EVENT || !!this.messagesSubscribers.size + ); } - private sendMessage(id: string, type: MessageType, payload: any) { - if (this.connectionStatus === ConnectionStatus.CONNECTED) { - if (!this.sendRawMessage(JSON.stringify({ id, type, payload }))) { - this.logger.warn('Message dropped: WebSocket is not in OPEN state'); - this.handleConnectionDrop(); - } - } else { - this.openSocket(); + private async sendMessage( + id: string, + type: MessageType, + payload: Record | null, + ): Promise { + if (this.connectionStatus !== ConnectionStatus.CONNECTED) { + await this.openSocket(); + return; + } + + if (!this.sendRawMessage(JSON.stringify({ id, type, payload }))) { + this.logger.warn('Message dropped: WebSocket is not in OPEN state'); + await this.handleConnectionDrop(); } } @@ -464,16 +639,11 @@ export class GraphqlService { this.socket.send(message); return true; } - return false; - } - private generateOperationId(): string { - const currentId = `${this.nextSubscriptionId}`; - this.nextSubscriptionId += 1; - return currentId; + return false; } - private setConnectionStatus(status: ConnectionStatus) { + private setConnectionStatus(status: ConnectionStatus): void { this.connectionStatus = status; this.connectionStatus$.next(status); } @@ -484,40 +654,48 @@ export class GraphqlService { } private monitorWithPingPong(): void { - this.pingPongInterval = setInterval( - this.sendPingWithThisBound, - PING_PONG_INTERVAL_IN_MS, - ); + this.pingPongInterval = setInterval(() => { + this.sendPing(); + }, PING_PONG_INTERVAL_IN_MS); } private monitorWithOfflineEvent(): void { if (typeof window !== 'undefined') { - window.removeEventListener('offline', this.sendPingWithThisBound); - window.addEventListener('offline', this.sendPingWithThisBound); + window.removeEventListener('offline', () => { + this.sendPing(); + }); + + window.addEventListener('offline', () => { + this.sendPing(); + }); } } private sendPing(): void { - this.pongTimeout = setTimeout( - this.handleConnectionDropWithThisBound, - PONG_TIMEOUT_IN_MS, - ); + this.pongTimeout = setTimeout(() => { + this.handleConnectionDrop().catch((error: Error) => { + this.logger.error('Failed to handle pong connection drop: ', error); + }); + }, PONG_TIMEOUT_IN_MS); + this.sendRawMessage(JSON.stringify({ type: MessageType.GQL_PING })); } - private handleConnectionDrop(): void { + private async handleConnectionDrop(): Promise { if (this.connectionStatus === ConnectionStatus.CONNECTING) { return; } + if (this.isBrowserOnline()) { this.logger.warn(`Websocket connection dropped!`); } else { this.logger.info(`Websocket connection dropped. We are offline.`); } + this.setConnectionStatus(ConnectionStatus.DISCONNECTED); this.clearPingMonitoring(); - this.openSocket(); + await this.openSocket(); } private clearPingMonitoring(): void { @@ -525,14 +703,100 @@ export class GraphqlService { clearInterval(this.pingPongInterval); } + private clearErroredSubscriptionsTimeouts(): void { + clearTimeout(this.retryableErroredSubscriptionsRetryTimeout ?? undefined); + this.retryableErroredSubscriptionsRetryTimeout = null; + + this.clearErroredSubscriptionsSuccessTimeout(); + } + + private clearErroredSubscriptionsSuccessTimeout(): void { + clearTimeout(this.retryableErroredSubscriptionsSuccessTimeout ?? undefined); + this.retryableErroredSubscriptionsSuccessTimeout = null; + } + + private isAnySubscriptionErrorNonRetryable( + errors: QminderGraphQLError[], + ): boolean { + return errors + .filter((error) => error.errorType) + .some(({ errorType }) => + ( + NON_RETRYABLE_SUBSCRIPTION_ERROR_TYPES as unknown as string[] + ).includes(errorType), + ); + } + + private scheduleErroredSubscriptionsRetry(): void { + const retryCount = this.retryableErroredSubscriptionsRetryCount + 1; + const delay = calculateRandomizedExponentialBackoffTime(retryCount); + + this.logger.info( + `Retry (${retryCount}) errored subscriptions in ${delay.toFixed(0)}ms`, + ); + + this.retryableErroredSubscriptionsRetryTimeout = setTimeout(() => { + this.retryErroredSubscriptions(); + this.retryableErroredSubscriptionsRetryCount = retryCount; + this.retryableErroredSubscriptionsRetryTimeout = null; + }, delay); + } + + private failErroredSubscriptions(): void { + this.logger.error( + `Errored subscriptions retry limit (${RETRYABLE_ERRORED_SUBSCRIPTIONS_RETRY_LIMIT}) reached, giving up`, + ); + + this.retryableErroredSubscriptionsMessageIds$ + .pipe(take(1)) + .subscribe((messageIds) => { + for (const messageId of messageIds) { + const subscriber = this.messagesSubscribers.get(messageId); + this.cleanUpSubscription(messageId); + + subscriber?.error( + new Error( + `Subscription failed after ${this.retryableErroredSubscriptionsRetryCount} retries`, + ), + ); + } + }); + } + + private retryErroredSubscriptions(): void { + this.retryableErroredSubscriptionsMessageIds$ + .pipe(take(1)) + .subscribe((messageIds) => { + for (const messageId of messageIds) { + const subscription = this.subscriptions.find( + (subscription) => subscription.messageId === messageId, + ); + + if (!subscription) { + continue; + } + + this.sendRawMessage( + JSON.stringify({ + id: subscription.messageId, + type: MessageType.GQL_START, + payload: { query: subscription.query }, + }), + ); + } + + this.retryableErroredSubscriptionsSuccessTimeout = setTimeout(() => { + this.retryableErroredSubscriptionsAction$.next({ type: 'clear' }); + this.retryableErroredSubscriptionsRetryCount = 0; + this.retryableErroredSubscriptionsSuccessTimeout = null; + }, RETRYABLE_ERRORED_SUBSCRIPTIONS_SUCCEEDED_DELAY_MS); + }); + } + /** - * Returns the online status of the browser. - * In the non-browser environment (NodeJS) this always returns true. + * In a non-browser environment (NodeJS) returns `true`. */ private isBrowserOnline(): boolean { - if (typeof navigator === 'undefined') { - return true; - } - return navigator.onLine; + return typeof navigator === 'undefined' || navigator.onLine; } }