Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 62 additions & 1 deletion packages/core/src/__tests__/destination-kit.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ErrorCodes, IntegrationError } from '../errors'
import { ErrorCodes, IntegrationError, TokenPropagationRetryError, RetryableError } from '../errors'
import { ActionDefinition, MultiStatusResponse } from '../destination-kit/action'
import {
StateContext,
Expand Down Expand Up @@ -1452,6 +1452,67 @@ describe('destination kit', () => {
])
expect(spy).toHaveBeenCalledTimes(0)
})

test('should refresh token and throw RetryableError(503) when TokenPropagationRetryError is thrown', async () => {
const refreshAccessToken = jest.fn().mockResolvedValue({ accessToken: 'fresh-token' })
const onTokenRefresh = jest.fn()

const destinationWithPropagationError: DestinationDefinition<JSONObject> = {
name: 'Test Propagation Error Destination',
mode: 'cloud',
authentication: {
...authentication,
refreshAccessToken
},
actions: {
customEvent: {
title: 'Send a Custom Event',
description: 'Send events to a custom event in API',
defaultSubscription: 'type = "track"',
fields: {
advertiserId: {
label: 'Advertiser ID',
description: 'Advertiser Id',
type: 'string',
required: true
}
},
perform: () => {
throw new TokenPropagationRetryError('Token not yet propagated (serviceErrorCode 65601)')
}
}
}
}

const destinationTest = new Destination(destinationWithPropagationError)
const testEvent: SegmentEvent = {
properties: { a: 'foo' },
userId: '3456fff',
type: 'track'
}
const testSettings = {
apiSecret: 'test_key',
subscription: {
subscribe: 'type = "track"',
partnerAction: 'customEvent',
mapping: {
advertiserId: '1231241241'
}
},
oauth: {
access_token: 'some-access-token',
refresh_token: 'refresh-token'
}
}

// handleError should refresh the token (expired tokens also return 65601) then
// throw RetryableError(503) so the retry uses a fresh token
const error = await destinationTest.onEvent(testEvent, testSettings, { onTokenRefresh }).catch((e) => e)
expect(error).toBeInstanceOf(RetryableError)
expect(error.status).toBe(503)
expect(refreshAccessToken).toHaveBeenCalledTimes(1)
expect(onTokenRefresh).toHaveBeenCalledWith({ accessToken: 'fresh-token' })
})
Comment on lines +1456 to +1515
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new test asserts refreshAccessToken is called when TokenPropagationRetryError is thrown, which conflicts with the PR description's test plan claim that refreshAccessToken is NOT called. Please reconcile the expected behavior: either adjust the test and implementation to avoid refresh on token-propagation retries, or update the PR description/test plan to reflect that a refresh is performed.

Copilot uses AI. Check for mistakes.
})
describe('onBatch', () => {
test('should refresh the access-token in case of Unauthorized(401)', async () => {
Expand Down
22 changes: 21 additions & 1 deletion packages/core/src/destination-kit/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,14 @@ import type {
ResultMultiStatusNode
} from './types'
import type { AllRequestOptions } from '../request-client'
import { ErrorCodes, IntegrationError, InvalidAuthenticationError, MultiStatusErrorReporter } from '../errors'
import {
ErrorCodes,
IntegrationError,
InvalidAuthenticationError,
MultiStatusErrorReporter,
TokenPropagationRetryError,
RetryableError
} from '../errors'
import { AuthTokens, getAuthData, getOAuth2Data, updateOAuthSettings } from './parse-settings'
import { InputData, Features } from '../mapping-kit'
import { retry } from '../retry'
Expand Down Expand Up @@ -1022,6 +1029,19 @@ export class Destination<Settings = JSONObject, AudienceSettings = JSONObject> {
settings: JSONObject,
options?: OnEventOptions
): Promise<JSONObject> {
// Handle TokenPropagationRetryError: LinkedIn returns serviceErrorCode 65601 for both
// token propagation delays and revoked/expired tokens. Refresh the token first so the
// retry always uses a fresh token. If the refresh itself fails (e.g. the refresh token
// is also expired), that error propagates instead of retrying forever with a dead token.
// RetryableError(503) signals Segment infrastructure to retry after backoff.
if (error instanceof TokenPropagationRetryError) {
const isOAuth = this.authentication?.scheme === 'oauth2' || this.authentication?.scheme === 'oauth-managed'
if (isOAuth) {
await this.handleAuthError(settings, options)
}
throw new RetryableError(error.message, 503)
}
Comment on lines +1032 to +1043
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR description says TokenPropagationRetryError is converted into RetryableError(503) without refreshing tokens, but handleError currently calls handleAuthError (refresh) before throwing RetryableError. Please align the implementation + tests with the intended behavior (either remove the refresh here, or update the PR description and the TokenPropagationRetryError docs to reflect the refresh-and-retry semantics).

Copilot uses AI. Check for mistakes.

const statusCode = (error as ResponseError).status ?? (error as HTTPError)?.response?.status ?? 500
const needsReauthentication =
statusCode === 401 &&
Expand Down
29 changes: 29 additions & 0 deletions packages/core/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,33 @@ export class APIError extends IntegrationError {
}
}

/**
* Error indicating a 401 caused by OAuth token propagation delay (eventual
* consistency) rather than true revocation. The token was recently refreshed
* and is valid but has not yet propagated across all provider nodes.
*
* The framework converts this into a RetryableError(503) so Segment
* infrastructure retries the event after a backoff delay, by which time
* the token will have propagated. No additional token refresh is performed
* on the exception-throwing path — handleError intercepts this error before
* the OAuth re-authentication logic runs.
*
* This error is currently thrown by LinkedIn destination hooks when
* serviceErrorCode 65601 or 65602 appears in a 401 response body. Reuse for
* other providers should only occur after confirming the same
* retry-without-refresh semantic applies.
*/
export class TokenPropagationRetryError extends CustomError {
// Use 503 to match the RetryableError this converts into — do NOT use 401,
// which would cause generic status-based 401 handlers to trigger a token refresh.
Comment on lines +108 to +119
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring says no additional token refresh is performed and that handleError intercepts this error before OAuth refresh logic runs, but Destination.handleError now explicitly calls handleAuthError (refresh) when it sees TokenPropagationRetryError. Please update this documentation (or adjust the implementation) so the behavior matches what the comment describes.

Suggested change
* the token will have propagated. No additional token refresh is performed
* on the exception-throwing path handleError intercepts this error before
* the OAuth re-authentication logic runs.
*
* This error is currently thrown by LinkedIn destination hooks when
* serviceErrorCode 65601 or 65602 appears in a 401 response body. Reuse for
* other providers should only occur after confirming the same
* retry-without-refresh semantic applies.
*/
export class TokenPropagationRetryError extends CustomError {
// Use 503 to match the RetryableError this converts into — do NOT use 401,
// which would cause generic status-based 401 handlers to trigger a token refresh.
* the token will typically have propagated. This error is intended to model
* a transient, retryable propagation delay rather than a permanent invalid-
* credentials condition. Framework auth-error handling may still run as part
* of the normal exception path, so callers should not rely on this error type
* to suppress OAuth refresh logic.
*
* This error is currently thrown by LinkedIn destination hooks when
* serviceErrorCode 65601 or 65602 appears in a 401 response body. Reuse for
* other providers should only occur after confirming the same
* transient retry semantic applies.
*/
export class TokenPropagationRetryError extends CustomError {
// Use 503 to match the RetryableError this converts into and to represent
// a transient retryable condition instead of a standard invalid-auth 401.

Copilot uses AI. Check for mistakes.
status = 503
code = ErrorCodes.TOKEN_PROPAGATION_RETRY

constructor(message = 'Token not yet propagated, retry later') {
super(message)
}
}
Comment thread
harsh-joshi99 marked this conversation as resolved.

/**
* Error to indicate the destination has gone over its allotted execution time
* and is self-terminating.
Expand Down Expand Up @@ -207,6 +234,8 @@ export enum CustomErrorCodes {
GET_AUDIENCE_FAILED = 'GET_AUDIENCE_FAILED',
// When the RETL onMappingSave hook fails
RETL_ON_MAPPING_SAVE_FAILED = 'RETL_ON_MAPPING_SAVE_FAILED',
// Retry later due to OAuth token propagation delay
TOKEN_PROPAGATION_RETRY = 'TOKEN_PROPAGATION_RETRY',
// Fallback error code if no other error code matches
UNKNOWN_ERROR = 'UNKNOWN_ERROR'
}
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export {
RetryableError,
PayloadValidationError,
SelfTimeoutError,
TokenPropagationRetryError,
APIError,
ErrorCodes,
HttpErrorCodes,
Expand Down
4 changes: 4 additions & 0 deletions packages/core/src/retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ export async function retry<T>(
return response
} catch (error) {
if (options?.onFailedAttempt) {
// IMPORTANT: if onFailedAttempt throws, that exception propagates immediately
// and the retry loop terminates — no further attempts are made. handleError relies
// on this behavior to convert TokenPropagationRetryError into a RetryableError
// that escapes to Segment infrastructure rather than being retried in-process.
await options.onFailedAttempt(error, attemptCount)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@ import { LINKEDIN_AUDIENCES_API_VERSION } from './versioning-info'
export const LINKEDIN_API_VERSION = LINKEDIN_AUDIENCES_API_VERSION
export const BASE_URL = 'https://api.linkedin.com/rest'
export const LINKEDIN_SOURCE_PLATFORM = 'SEGMENT'

// LinkedIn service error codes that indicate token propagation delays (eventual consistency).
// These 401s are not true revocations — the token is valid but not yet propagated.
export const LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES = [65601, 65602]
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import https from 'https'

import type { DestinationDefinition, ModifiedResponse } from '@segment/actions-core'
import { InvalidAuthenticationError, IntegrationError, ErrorCodes } from '@segment/actions-core'
import {
InvalidAuthenticationError,
IntegrationError,
ErrorCodes,
TokenPropagationRetryError
} from '@segment/actions-core'

import type { Settings } from './generated-types'
import updateAudience from './updateAudience'
import { LINKEDIN_API_VERSION } from './constants'
import { LINKEDIN_API_VERSION, LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES } from './constants'
import { LinkedInAudiences } from './api'
import type {
RefreshTokenResponse,
Expand Down Expand Up @@ -148,7 +153,21 @@ const destination: DestinationDefinition<Settings> = {
authorization: `Bearer ${auth?.accessToken}`,
'LinkedIn-Version': LINKEDIN_API_VERSION
},
agent
agent,
afterResponse: [
(_request: unknown, _options: unknown, response: { status: number; data: unknown }) => {
if (response.status === 401) {
const body = response.data as Record<string, unknown> | undefined
const serviceErrorCode = body?.serviceErrorCode as number | undefined
if (serviceErrorCode && LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES.includes(serviceErrorCode)) {
throw new TokenPropagationRetryError(
`LinkedIn eventual consistency: token not yet propagated (serviceErrorCode ${serviceErrorCode})`
)
}
}
return response
}
]
}
},

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import nock from 'nock'
import { createTestEvent, createTestIntegration } from '@segment/actions-core'
import {
createTestEvent,
createTestIntegration,
TokenPropagationRetryError,
RetryableError
} from '@segment/actions-core'
import Destination from '../../index'
import { BASE_URL, LINKEDIN_SOURCE_PLATFORM } from '../../constants'

Expand Down Expand Up @@ -864,5 +869,101 @@ describe('LinkedinAudiences.updateAudience', () => {
})
).rejects.toThrow('The value of `source_segment_id` and `personas_audience_key` must match.')
})

it('should throw TokenPropagationRetryError when LinkedIn returns 401 with token propagation error code 65601', async () => {
nock(`${BASE_URL}/dmpSegments`)
.get(/.*/)
.query(() => true)
.reply(200, { elements: [{ id: 'dmp_segment_id' }] })
nock(`${BASE_URL}/dmpSegments/dmp_segment_id/users`).post(/.*/, updateUsersRequestBody).reply(401, {
serviceErrorCode: 65601,
message: 'Unable to verify access token'
})

await expect(
testDestination.testAction('updateAudience', {
event,
settings: {
ad_account_id: '123',
send_email: true,
send_google_advertising_id: true
},
useDefaultMappings: true,
auth,
mapping: {
personas_audience_key: 'personas_test_audience'
}
})
).rejects.toThrow(TokenPropagationRetryError)
})

it('should throw TokenPropagationRetryError when LinkedIn returns 401 with token propagation error code 65602', async () => {
nock(`${BASE_URL}/dmpSegments`)
.get(/.*/)
.query(() => true)
.reply(200, { elements: [{ id: 'dmp_segment_id' }] })
nock(`${BASE_URL}/dmpSegments/dmp_segment_id/users`).post(/.*/, updateUsersRequestBody).reply(401, {
serviceErrorCode: 65602,
message: 'Unable to verify access token'
})

await expect(
testDestination.testAction('updateAudience', {
event,
settings: {
ad_account_id: '123',
send_email: true,
send_google_advertising_id: true
},
useDefaultMappings: true,
auth,
mapping: {
personas_audience_key: 'personas_test_audience'
}
})
).rejects.toThrow(TokenPropagationRetryError)
})

it('should refresh token and throw RetryableError when LinkedIn returns 401+65601', async () => {
// LinkedIn returns 65601 for both propagation delays and revoked/expired tokens.
// The framework refreshes the token before throwing RetryableError so the retry
// always uses a fresh token regardless of which case triggered the 401.
nock(`${BASE_URL}/dmpSegments`)
.get(/.*/)
.query(() => true)
.reply(200, { elements: [{ id: 'dmp_segment_id' }] })
nock(`${BASE_URL}/dmpSegments/dmp_segment_id/users`).post(/.*/).reply(401, {
serviceErrorCode: 65601,
message: 'Unable to verify access token'
})
nock('https://www.linkedin.com')
.post('/oauth/v2/accessToken')
.reply(200, { access_token: 'fresh-token', expires_in: 5183944 })

await expect(
testDestination.onEvent(event, {
ad_account_id: '123',
send_email: true,
send_google_advertising_id: true,
subscription: {
subscribe: 'event = "Audience Entered" or event = "Audience Exited"',
partnerAction: 'updateAudience',
mapping: {
personas_audience_key: 'personas_test_audience',
source_segment_id: { '@path': '$.properties.audience_key' },
email: { '@path': '$.context.traits.email' },
google_advertising_id: { '@path': '$.context.device.advertisingId' },
dmp_user_action: 'AUTO'
}
},
oauth: {
access_token: 'old-token',
refresh_token: 'refresh-token',
clientId: 'test-client-id',
clientSecret: 'test-client-secret'
}
})
).rejects.toThrow(RetryableError)
})
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ export const LINKEDIN_API_VERSION = LINKEDIN_CONVERSIONS_API_VERSION
export const BASE_URL = 'https://api.linkedin.com/rest'
export const LINKEDIN_SOURCE_PLATFORM = 'SEGMENT'

// LinkedIn service error codes that indicate token propagation delays (eventual consistency).
// These 401s are not true revocations — the token is valid but not yet propagated.
export const LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES = [65601, 65602]

interface Choice {
value: string | number
label: string
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import type { DestinationDefinition } from '@segment/actions-core'
import { InvalidAuthenticationError, IntegrationError, ErrorCodes } from '@segment/actions-core'
import {
InvalidAuthenticationError,
IntegrationError,
ErrorCodes,
TokenPropagationRetryError
} from '@segment/actions-core'
import type { Settings } from './generated-types'
import { LinkedInConversions } from './api'
import type { LinkedInTestAuthenticationError, RefreshTokenResponse, LinkedInRefreshTokenError } from './types'
import { LINKEDIN_API_VERSION } from './constants'
import { LINKEDIN_API_VERSION, LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES } from './constants'
import https from 'https'
import streamConversion from './streamConversion'

Expand Down Expand Up @@ -100,7 +105,21 @@ const destination: DestinationDefinition<Settings> = {
'LinkedIn-Version': LINKEDIN_API_VERSION,
'X-Restli-Protocol-Version': `2.0.0`
},
agent
agent,
afterResponse: [
(_request: unknown, _options: unknown, response: { status: number; data: unknown }) => {
if (response.status === 401) {
const body = response.data as Record<string, unknown> | undefined
const serviceErrorCode = body?.serviceErrorCode as number | undefined
if (serviceErrorCode && LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES.includes(serviceErrorCode)) {
throw new TokenPropagationRetryError(
`LinkedIn eventual consistency: token not yet propagated (serviceErrorCode ${serviceErrorCode})`
)
}
}
return response
}
]
Comment on lines +108 to +122
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extendRequest is typed to return RequestOptions (see packages/core/src/destination-kit/types.ts:348), but RequestOptions does not include afterResponse. Adding afterResponse here will fail TypeScript compilation (and is currently contrary to create-request-client.ts’s note about not exposing hooks). Consider either (a) updating the core types to allow afterResponse in RequestExtension (e.g., return AllRequestOptions), or (b) moving this logic into the action/API layer by disabling throwHttpErrors and throwing TokenPropagationRetryError after inspecting the response in the calling code.

Copilot uses AI. Check for mistakes.
}
},
actions: {
Expand Down
Loading
Loading