diff --git a/packages/core/src/__tests__/destination-kit.test.ts b/packages/core/src/__tests__/destination-kit.test.ts index d72b51e4943..63fdd30f7f9 100644 --- a/packages/core/src/__tests__/destination-kit.test.ts +++ b/packages/core/src/__tests__/destination-kit.test.ts @@ -1,4 +1,4 @@ -import { ErrorCodes, IntegrationError } from '../errors' +import { ErrorCodes, IntegrationError, TokenPropagationRetryError, RetryableError } from '../errors' import { ActionDefinition, MultiStatusResponse } from '../destination-kit/action' import { StateContext, @@ -1452,6 +1452,67 @@ describe('destination kit', () => { ]) expect(spy).toHaveBeenCalledTimes(0) }) + + test('should refresh token and throw RetryableError(503) when TokenPropagationRetryError is thrown', async () => { + const refreshAccessToken = jest.fn().mockResolvedValue({ accessToken: 'fresh-token' }) + const onTokenRefresh = jest.fn() + + const destinationWithPropagationError: DestinationDefinition = { + name: 'Test Propagation Error Destination', + mode: 'cloud', + authentication: { + ...authentication, + refreshAccessToken + }, + actions: { + customEvent: { + title: 'Send a Custom Event', + description: 'Send events to a custom event in API', + defaultSubscription: 'type = "track"', + fields: { + advertiserId: { + label: 'Advertiser ID', + description: 'Advertiser Id', + type: 'string', + required: true + } + }, + perform: () => { + throw new TokenPropagationRetryError('Token not yet propagated (serviceErrorCode 65601)') + } + } + } + } + + const destinationTest = new Destination(destinationWithPropagationError) + const testEvent: SegmentEvent = { + properties: { a: 'foo' }, + userId: '3456fff', + type: 'track' + } + const testSettings = { + apiSecret: 'test_key', + subscription: { + subscribe: 'type = "track"', + partnerAction: 'customEvent', + mapping: { + advertiserId: '1231241241' + } + }, + oauth: { + access_token: 'some-access-token', + refresh_token: 'refresh-token' + } + } + + // handleError should refresh the token (expired tokens also return 65601) then + // throw RetryableError(503) so the retry uses a fresh token + const error = await destinationTest.onEvent(testEvent, testSettings, { onTokenRefresh }).catch((e) => e) + expect(error).toBeInstanceOf(RetryableError) + expect(error.status).toBe(503) + expect(refreshAccessToken).toHaveBeenCalledTimes(1) + expect(onTokenRefresh).toHaveBeenCalledWith({ accessToken: 'fresh-token' }) + }) }) describe('onBatch', () => { test('should refresh the access-token in case of Unauthorized(401)', async () => { diff --git a/packages/core/src/destination-kit/index.ts b/packages/core/src/destination-kit/index.ts index 45927603b6f..ed9c8d7ecd6 100644 --- a/packages/core/src/destination-kit/index.ts +++ b/packages/core/src/destination-kit/index.ts @@ -30,7 +30,14 @@ import type { ResultMultiStatusNode } from './types' import type { AllRequestOptions } from '../request-client' -import { ErrorCodes, IntegrationError, InvalidAuthenticationError, MultiStatusErrorReporter } from '../errors' +import { + ErrorCodes, + IntegrationError, + InvalidAuthenticationError, + MultiStatusErrorReporter, + TokenPropagationRetryError, + RetryableError +} from '../errors' import { AuthTokens, getAuthData, getOAuth2Data, updateOAuthSettings } from './parse-settings' import { InputData, Features } from '../mapping-kit' import { retry } from '../retry' @@ -1022,6 +1029,19 @@ export class Destination { settings: JSONObject, options?: OnEventOptions ): Promise { + // Handle TokenPropagationRetryError: LinkedIn returns serviceErrorCode 65601 for both + // token propagation delays and revoked/expired tokens. Refresh the token first so the + // retry always uses a fresh token. If the refresh itself fails (e.g. the refresh token + // is also expired), that error propagates instead of retrying forever with a dead token. + // RetryableError(503) signals Segment infrastructure to retry after backoff. + if (error instanceof TokenPropagationRetryError) { + const isOAuth = this.authentication?.scheme === 'oauth2' || this.authentication?.scheme === 'oauth-managed' + if (isOAuth) { + await this.handleAuthError(settings, options) + } + throw new RetryableError(error.message, 503) + } + const statusCode = (error as ResponseError).status ?? (error as HTTPError)?.response?.status ?? 500 const needsReauthentication = statusCode === 401 && diff --git a/packages/core/src/errors.ts b/packages/core/src/errors.ts index 42a50654a26..8d597703ca4 100644 --- a/packages/core/src/errors.ts +++ b/packages/core/src/errors.ts @@ -98,6 +98,33 @@ export class APIError extends IntegrationError { } } +/** + * Error indicating a 401 caused by OAuth token propagation delay (eventual + * consistency) rather than true revocation. The token was recently refreshed + * and is valid but has not yet propagated across all provider nodes. + * + * The framework converts this into a RetryableError(503) so Segment + * infrastructure retries the event after a backoff delay, by which time + * the token will have propagated. No additional token refresh is performed + * on the exception-throwing path — handleError intercepts this error before + * the OAuth re-authentication logic runs. + * + * This error is currently thrown by LinkedIn destination hooks when + * serviceErrorCode 65601 or 65602 appears in a 401 response body. Reuse for + * other providers should only occur after confirming the same + * retry-without-refresh semantic applies. + */ +export class TokenPropagationRetryError extends CustomError { + // Use 503 to match the RetryableError this converts into — do NOT use 401, + // which would cause generic status-based 401 handlers to trigger a token refresh. + status = 503 + code = ErrorCodes.TOKEN_PROPAGATION_RETRY + + constructor(message = 'Token not yet propagated, retry later') { + super(message) + } +} + /** * Error to indicate the destination has gone over its allotted execution time * and is self-terminating. @@ -207,6 +234,8 @@ export enum CustomErrorCodes { GET_AUDIENCE_FAILED = 'GET_AUDIENCE_FAILED', // When the RETL onMappingSave hook fails RETL_ON_MAPPING_SAVE_FAILED = 'RETL_ON_MAPPING_SAVE_FAILED', + // Retry later due to OAuth token propagation delay + TOKEN_PROPAGATION_RETRY = 'TOKEN_PROPAGATION_RETRY', // Fallback error code if no other error code matches UNKNOWN_ERROR = 'UNKNOWN_ERROR' } diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index ce662378f41..53c02aacfb9 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -38,6 +38,7 @@ export { RetryableError, PayloadValidationError, SelfTimeoutError, + TokenPropagationRetryError, APIError, ErrorCodes, HttpErrorCodes, diff --git a/packages/core/src/retry.ts b/packages/core/src/retry.ts index 23123a907ba..6e3dbbd529e 100644 --- a/packages/core/src/retry.ts +++ b/packages/core/src/retry.ts @@ -26,6 +26,10 @@ export async function retry( return response } catch (error) { if (options?.onFailedAttempt) { + // IMPORTANT: if onFailedAttempt throws, that exception propagates immediately + // and the retry loop terminates — no further attempts are made. handleError relies + // on this behavior to convert TokenPropagationRetryError into a RetryableError + // that escapes to Segment infrastructure rather than being retried in-process. await options.onFailedAttempt(error, attemptCount) } diff --git a/packages/destination-actions/src/destinations/linkedin-audiences/constants.ts b/packages/destination-actions/src/destinations/linkedin-audiences/constants.ts index 0a9d9f654ae..4efe2d5f03f 100644 --- a/packages/destination-actions/src/destinations/linkedin-audiences/constants.ts +++ b/packages/destination-actions/src/destinations/linkedin-audiences/constants.ts @@ -3,3 +3,7 @@ import { LINKEDIN_AUDIENCES_API_VERSION } from './versioning-info' export const LINKEDIN_API_VERSION = LINKEDIN_AUDIENCES_API_VERSION export const BASE_URL = 'https://api.linkedin.com/rest' export const LINKEDIN_SOURCE_PLATFORM = 'SEGMENT' + +// LinkedIn service error codes that indicate token propagation delays (eventual consistency). +// These 401s are not true revocations — the token is valid but not yet propagated. +export const LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES = [65601, 65602] diff --git a/packages/destination-actions/src/destinations/linkedin-audiences/index.ts b/packages/destination-actions/src/destinations/linkedin-audiences/index.ts index 8bcabbb8069..21f85474277 100644 --- a/packages/destination-actions/src/destinations/linkedin-audiences/index.ts +++ b/packages/destination-actions/src/destinations/linkedin-audiences/index.ts @@ -1,11 +1,16 @@ import https from 'https' import type { DestinationDefinition, ModifiedResponse } from '@segment/actions-core' -import { InvalidAuthenticationError, IntegrationError, ErrorCodes } from '@segment/actions-core' +import { + InvalidAuthenticationError, + IntegrationError, + ErrorCodes, + TokenPropagationRetryError +} from '@segment/actions-core' import type { Settings } from './generated-types' import updateAudience from './updateAudience' -import { LINKEDIN_API_VERSION } from './constants' +import { LINKEDIN_API_VERSION, LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES } from './constants' import { LinkedInAudiences } from './api' import type { RefreshTokenResponse, @@ -148,7 +153,21 @@ const destination: DestinationDefinition = { authorization: `Bearer ${auth?.accessToken}`, 'LinkedIn-Version': LINKEDIN_API_VERSION }, - agent + agent, + afterResponse: [ + (_request: unknown, _options: unknown, response: { status: number; data: unknown }) => { + if (response.status === 401) { + const body = response.data as Record | undefined + const serviceErrorCode = body?.serviceErrorCode as number | undefined + if (serviceErrorCode && LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES.includes(serviceErrorCode)) { + throw new TokenPropagationRetryError( + `LinkedIn eventual consistency: token not yet propagated (serviceErrorCode ${serviceErrorCode})` + ) + } + } + return response + } + ] } }, diff --git a/packages/destination-actions/src/destinations/linkedin-audiences/updateAudience/__tests__/index.test.ts b/packages/destination-actions/src/destinations/linkedin-audiences/updateAudience/__tests__/index.test.ts index 79d7da30cbd..d1c10160e8c 100644 --- a/packages/destination-actions/src/destinations/linkedin-audiences/updateAudience/__tests__/index.test.ts +++ b/packages/destination-actions/src/destinations/linkedin-audiences/updateAudience/__tests__/index.test.ts @@ -1,5 +1,10 @@ import nock from 'nock' -import { createTestEvent, createTestIntegration } from '@segment/actions-core' +import { + createTestEvent, + createTestIntegration, + TokenPropagationRetryError, + RetryableError +} from '@segment/actions-core' import Destination from '../../index' import { BASE_URL, LINKEDIN_SOURCE_PLATFORM } from '../../constants' @@ -864,5 +869,101 @@ describe('LinkedinAudiences.updateAudience', () => { }) ).rejects.toThrow('The value of `source_segment_id` and `personas_audience_key` must match.') }) + + it('should throw TokenPropagationRetryError when LinkedIn returns 401 with token propagation error code 65601', async () => { + nock(`${BASE_URL}/dmpSegments`) + .get(/.*/) + .query(() => true) + .reply(200, { elements: [{ id: 'dmp_segment_id' }] }) + nock(`${BASE_URL}/dmpSegments/dmp_segment_id/users`).post(/.*/, updateUsersRequestBody).reply(401, { + serviceErrorCode: 65601, + message: 'Unable to verify access token' + }) + + await expect( + testDestination.testAction('updateAudience', { + event, + settings: { + ad_account_id: '123', + send_email: true, + send_google_advertising_id: true + }, + useDefaultMappings: true, + auth, + mapping: { + personas_audience_key: 'personas_test_audience' + } + }) + ).rejects.toThrow(TokenPropagationRetryError) + }) + + it('should throw TokenPropagationRetryError when LinkedIn returns 401 with token propagation error code 65602', async () => { + nock(`${BASE_URL}/dmpSegments`) + .get(/.*/) + .query(() => true) + .reply(200, { elements: [{ id: 'dmp_segment_id' }] }) + nock(`${BASE_URL}/dmpSegments/dmp_segment_id/users`).post(/.*/, updateUsersRequestBody).reply(401, { + serviceErrorCode: 65602, + message: 'Unable to verify access token' + }) + + await expect( + testDestination.testAction('updateAudience', { + event, + settings: { + ad_account_id: '123', + send_email: true, + send_google_advertising_id: true + }, + useDefaultMappings: true, + auth, + mapping: { + personas_audience_key: 'personas_test_audience' + } + }) + ).rejects.toThrow(TokenPropagationRetryError) + }) + + it('should refresh token and throw RetryableError when LinkedIn returns 401+65601', async () => { + // LinkedIn returns 65601 for both propagation delays and revoked/expired tokens. + // The framework refreshes the token before throwing RetryableError so the retry + // always uses a fresh token regardless of which case triggered the 401. + nock(`${BASE_URL}/dmpSegments`) + .get(/.*/) + .query(() => true) + .reply(200, { elements: [{ id: 'dmp_segment_id' }] }) + nock(`${BASE_URL}/dmpSegments/dmp_segment_id/users`).post(/.*/).reply(401, { + serviceErrorCode: 65601, + message: 'Unable to verify access token' + }) + nock('https://www.linkedin.com') + .post('/oauth/v2/accessToken') + .reply(200, { access_token: 'fresh-token', expires_in: 5183944 }) + + await expect( + testDestination.onEvent(event, { + ad_account_id: '123', + send_email: true, + send_google_advertising_id: true, + subscription: { + subscribe: 'event = "Audience Entered" or event = "Audience Exited"', + partnerAction: 'updateAudience', + mapping: { + personas_audience_key: 'personas_test_audience', + source_segment_id: { '@path': '$.properties.audience_key' }, + email: { '@path': '$.context.traits.email' }, + google_advertising_id: { '@path': '$.context.device.advertisingId' }, + dmp_user_action: 'AUTO' + } + }, + oauth: { + access_token: 'old-token', + refresh_token: 'refresh-token', + clientId: 'test-client-id', + clientSecret: 'test-client-secret' + } + }) + ).rejects.toThrow(RetryableError) + }) }) }) diff --git a/packages/destination-actions/src/destinations/linkedin-conversions/constants.ts b/packages/destination-actions/src/destinations/linkedin-conversions/constants.ts index c7832faef55..0dd520ba6e0 100644 --- a/packages/destination-actions/src/destinations/linkedin-conversions/constants.ts +++ b/packages/destination-actions/src/destinations/linkedin-conversions/constants.ts @@ -5,6 +5,10 @@ export const LINKEDIN_API_VERSION = LINKEDIN_CONVERSIONS_API_VERSION export const BASE_URL = 'https://api.linkedin.com/rest' export const LINKEDIN_SOURCE_PLATFORM = 'SEGMENT' +// LinkedIn service error codes that indicate token propagation delays (eventual consistency). +// These 401s are not true revocations — the token is valid but not yet propagated. +export const LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES = [65601, 65602] + interface Choice { value: string | number label: string diff --git a/packages/destination-actions/src/destinations/linkedin-conversions/index.ts b/packages/destination-actions/src/destinations/linkedin-conversions/index.ts index b8040cc20b9..23158eead6a 100644 --- a/packages/destination-actions/src/destinations/linkedin-conversions/index.ts +++ b/packages/destination-actions/src/destinations/linkedin-conversions/index.ts @@ -1,9 +1,14 @@ import type { DestinationDefinition } from '@segment/actions-core' -import { InvalidAuthenticationError, IntegrationError, ErrorCodes } from '@segment/actions-core' +import { + InvalidAuthenticationError, + IntegrationError, + ErrorCodes, + TokenPropagationRetryError +} from '@segment/actions-core' import type { Settings } from './generated-types' import { LinkedInConversions } from './api' import type { LinkedInTestAuthenticationError, RefreshTokenResponse, LinkedInRefreshTokenError } from './types' -import { LINKEDIN_API_VERSION } from './constants' +import { LINKEDIN_API_VERSION, LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES } from './constants' import https from 'https' import streamConversion from './streamConversion' @@ -100,7 +105,21 @@ const destination: DestinationDefinition = { 'LinkedIn-Version': LINKEDIN_API_VERSION, 'X-Restli-Protocol-Version': `2.0.0` }, - agent + agent, + afterResponse: [ + (_request: unknown, _options: unknown, response: { status: number; data: unknown }) => { + if (response.status === 401) { + const body = response.data as Record | undefined + const serviceErrorCode = body?.serviceErrorCode as number | undefined + if (serviceErrorCode && LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES.includes(serviceErrorCode)) { + throw new TokenPropagationRetryError( + `LinkedIn eventual consistency: token not yet propagated (serviceErrorCode ${serviceErrorCode})` + ) + } + } + return response + } + ] } }, actions: { diff --git a/packages/destination-actions/src/destinations/linkedin-conversions/streamConversion/__tests__/index.test.ts b/packages/destination-actions/src/destinations/linkedin-conversions/streamConversion/__tests__/index.test.ts index d6cf990dc40..589f41df192 100644 --- a/packages/destination-actions/src/destinations/linkedin-conversions/streamConversion/__tests__/index.test.ts +++ b/packages/destination-actions/src/destinations/linkedin-conversions/streamConversion/__tests__/index.test.ts @@ -1,5 +1,10 @@ import nock from 'nock' -import { createTestEvent, createTestIntegration } from '@segment/actions-core' +import { + createTestEvent, + createTestIntegration, + TokenPropagationRetryError, + RetryableError +} from '@segment/actions-core' import { DynamicFieldResponse } from '@segment/actions-core' import { BASE_URL } from '../../constants' import Destination from '../../index' @@ -612,6 +617,138 @@ describe('LinkedinConversions.streamConversion', () => { ).rejects.toThrowError("User Info is missing the required field 'lastName'.") }) + it('should throw TokenPropagationRetryError when LinkedIn returns 401 with token propagation error code 65601', async () => { + nock(`${BASE_URL}/conversionEvents`).post(/.*/).reply(401, { + serviceErrorCode: 65601, + message: 'Unable to verify access token' + }) + + await expect( + testDestination.testAction('streamConversion', { + event, + settings, + mapping: { + email: { '@path': '$.context.traits.email' }, + conversionHappenedAt: { + '@path': '$.timestamp' + }, + onMappingSave: { + inputs: {}, + outputs: { + id: 789123 + } + }, + enable_batching: true, + batch_size: 5000 + } + }) + ).rejects.toThrow(TokenPropagationRetryError) + }) + + it('should throw TokenPropagationRetryError when LinkedIn returns 401 with token propagation error code 65602', async () => { + nock(`${BASE_URL}/conversionEvents`).post(/.*/).reply(401, { + serviceErrorCode: 65602, + message: 'Unable to verify access token' + }) + + await expect( + testDestination.testAction('streamConversion', { + event, + settings, + mapping: { + email: { '@path': '$.context.traits.email' }, + conversionHappenedAt: { + '@path': '$.timestamp' + }, + onMappingSave: { + inputs: {}, + outputs: { + id: 789123 + } + }, + enable_batching: true, + batch_size: 5000 + } + }) + ).rejects.toThrow(TokenPropagationRetryError) + }) + + it('should not throw TokenPropagationRetryError when LinkedIn returns 401 without a propagation error code', async () => { + nock(`${BASE_URL}/conversionEvents`).post(/.*/).reply(401, { + serviceErrorCode: 99999, + message: 'Unauthorized' + }) + + const error = await testDestination + .testAction('streamConversion', { + event, + settings, + mapping: { + email: { '@path': '$.context.traits.email' }, + conversionHappenedAt: { + '@path': '$.timestamp' + }, + onMappingSave: { + inputs: {}, + outputs: { + id: 789123 + } + }, + enable_batching: true, + batch_size: 5000 + } + }) + .catch((e) => e) + + expect(error).not.toBeInstanceOf(TokenPropagationRetryError) + }) + + it('should refresh token and throw RetryableError when LinkedIn returns 401+65601', async () => { + // LinkedIn returns 65601 for both propagation delays and revoked/expired tokens. + // The framework refreshes the token before throwing RetryableError so the retry + // always uses a fresh token regardless of which case triggered the 401. + nock(`${BASE_URL}/conversionEvents`).post(/.*/).reply(401, { + serviceErrorCode: 65601, + message: 'Unable to verify access token' + }) + nock('https://www.linkedin.com') + .post('/oauth/v2/accessToken') + .reply(200, { access_token: 'fresh-token', expires_in: 5183944 }) + + const originalClientId = process.env.ACTIONS_LINKEDIN_CONVERSIONS_CLIENT_ID + const originalClientSecret = process.env.ACTIONS_LINKEDIN_CONVERSIONS_CLIENT_SECRET + process.env.ACTIONS_LINKEDIN_CONVERSIONS_CLIENT_ID = 'test-client-id' + process.env.ACTIONS_LINKEDIN_CONVERSIONS_CLIENT_SECRET = 'test-client-secret' + + try { + await expect( + testDestination.onEvent(event, { + subscription: { + subscribe: 'type = "track"', + partnerAction: 'streamConversion', + mapping: { + email: { '@path': '$.context.traits.email' }, + conversionHappenedAt: { '@path': '$.timestamp' }, + onMappingSave: { + inputs: {}, + outputs: { id: 789123 } + }, + enable_batching: false, + batch_size: 5000 + } + }, + oauth: { + access_token: 'old-token', + refresh_token: 'refresh-token' + } + }) + ).rejects.toThrow(RetryableError) + } finally { + process.env.ACTIONS_LINKEDIN_CONVERSIONS_CLIENT_ID = originalClientId + process.env.ACTIONS_LINKEDIN_CONVERSIONS_CLIENT_SECRET = originalClientSecret + } + }) + it('should detect hashed email if feature flag for smart hashing is passed', async () => { nock(`${BASE_URL}/conversionEvents`) .post('', {