Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion packages/core/src/__tests__/destination-kit.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ErrorCodes, IntegrationError } from '../errors'
import { ErrorCodes, IntegrationError, RefreshTokenAndRetryError, RetryableError } from '../errors'
import { ActionDefinition, MultiStatusResponse } from '../destination-kit/action'
import {
StateContext,
Expand Down Expand Up @@ -1452,6 +1452,55 @@ describe('destination kit', () => {
])
expect(spy).toHaveBeenCalledTimes(0)
})

test('should throw RetryableError without token refresh when RefreshTokenAndRetryError is thrown', async () => {
const destinationWithPropagationError: DestinationDefinition<JSONObject> = {
name: 'Test Propagation Error Destination',
mode: 'cloud',
authentication: authentication,
actions: {
customEvent: {
title: 'Send a Custom Event',
description: 'Send events to a custom event in API',
defaultSubscription: 'type = "track"',
fields: {
advertiserId: {
label: 'Advertiser ID',
description: 'Advertiser Id',
type: 'string',
required: true
}
},
perform: () => {
throw new RefreshTokenAndRetryError('Token not yet propagated (serviceErrorCode 65601)')
}
}
}
}

const destinationTest = new Destination(destinationWithPropagationError)
const testEvent: SegmentEvent = {
properties: { a: 'foo' },
userId: '3456fff',
type: 'track'
}
const testSettings = {
apiSecret: 'test_key',
subscription: {
subscribe: 'type = "track"',
partnerAction: 'customEvent',
mapping: {
advertiserId: '1231241241'
}
},
oauth: {
access_token: 'some-access-token',
refresh_token: 'refresh-token'
}
}
// handleError should throw RetryableError without calling refresh (token is already fresh)
Comment thread
harsh-joshi99 marked this conversation as resolved.
Outdated
await expect(destinationTest.onEvent(testEvent, testSettings)).rejects.toThrow(RetryableError)
})
Comment on lines +1456 to +1515
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new test asserts refreshAccessToken is called when TokenPropagationRetryError is thrown, which conflicts with the PR description's test plan claim that refreshAccessToken is NOT called. Please reconcile the expected behavior: either adjust the test and implementation to avoid refresh on token-propagation retries, or update the PR description/test plan to reflect that a refresh is performed.

Copilot uses AI. Check for mistakes.
})
describe('onBatch', () => {
test('should refresh the access-token in case of Unauthorized(401)', async () => {
Expand Down
15 changes: 14 additions & 1 deletion packages/core/src/destination-kit/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,14 @@ import type {
ResultMultiStatusNode
} from './types'
import type { AllRequestOptions } from '../request-client'
import { ErrorCodes, IntegrationError, InvalidAuthenticationError, MultiStatusErrorReporter } from '../errors'
import {
ErrorCodes,
IntegrationError,
InvalidAuthenticationError,
MultiStatusErrorReporter,
RefreshTokenAndRetryError,
RetryableError
} from '../errors'
import { AuthTokens, getAuthData, getOAuth2Data, updateOAuthSettings } from './parse-settings'
import { InputData, Features } from '../mapping-kit'
import { retry } from '../retry'
Expand Down Expand Up @@ -1022,6 +1029,12 @@ export class Destination<Settings = JSONObject, AudienceSettings = JSONObject> {
settings: JSONObject,
options?: OnEventOptions
): Promise<JSONObject> {
// Handle RefreshTokenAndRetryError: token was just refreshed but hasn't propagated yet.
// Just retry later — no need to refresh again.
if (error instanceof RefreshTokenAndRetryError) {
throw new RetryableError(error.message, 503)
}
Comment thread
harsh-joshi99 marked this conversation as resolved.
Outdated
Comment on lines +1032 to +1043
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR description says TokenPropagationRetryError is converted into RetryableError(503) without refreshing tokens, but handleError currently calls handleAuthError (refresh) before throwing RetryableError. Please align the implementation + tests with the intended behavior (either remove the refresh here, or update the PR description and the TokenPropagationRetryError docs to reflect the refresh-and-retry semantics).

Copilot uses AI. Check for mistakes.

const statusCode = (error as ResponseError).status ?? (error as HTTPError)?.response?.status ?? 500
const needsReauthentication =
statusCode === 401 &&
Expand Down
21 changes: 21 additions & 0 deletions packages/core/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,25 @@ export class APIError extends IntegrationError {
}
}

/**
* Error indicating a 401 caused by OAuth token propagation delay (eventual
* consistency) rather than true revocation. The token was recently refreshed
* and is valid but has not yet propagated across all provider nodes.
*
* The framework converts this into a RetryableError(503) so Segment
* infrastructure retries the event after a backoff delay, by which time
* the token will have propagated. No additional token refresh is performed
* since the token is already fresh.
*/
export class RefreshTokenAndRetryError extends CustomError {
status = 401
code = ErrorCodes.REFRESH_AND_RETRY

constructor(message = 'Token refresh required with retry') {
super(message)
}
}
Comment thread
harsh-joshi99 marked this conversation as resolved.

/**
* Error to indicate the destination has gone over its allotted execution time
* and is self-terminating.
Expand Down Expand Up @@ -207,6 +226,8 @@ export enum CustomErrorCodes {
GET_AUDIENCE_FAILED = 'GET_AUDIENCE_FAILED',
// When the RETL onMappingSave hook fails
RETL_ON_MAPPING_SAVE_FAILED = 'RETL_ON_MAPPING_SAVE_FAILED',
// Refresh the OAuth token and then retry via Segment infrastructure
Comment thread
harsh-joshi99 marked this conversation as resolved.
Outdated
REFRESH_AND_RETRY = 'REFRESH_AND_RETRY',
// Fallback error code if no other error code matches
UNKNOWN_ERROR = 'UNKNOWN_ERROR'
}
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export {
RetryableError,
PayloadValidationError,
SelfTimeoutError,
RefreshTokenAndRetryError,
APIError,
ErrorCodes,
HttpErrorCodes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@ import { LINKEDIN_AUDIENCES_API_VERSION } from './versioning-info'
export const LINKEDIN_API_VERSION = LINKEDIN_AUDIENCES_API_VERSION
export const BASE_URL = 'https://api.linkedin.com/rest'
export const LINKEDIN_SOURCE_PLATFORM = 'SEGMENT'

// LinkedIn service error codes that indicate token propagation delays (eventual consistency).
// These 401s are not true revocations — the token is valid but not yet propagated.
export const LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES = [65601, 65602]
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import https from 'https'

import type { DestinationDefinition, ModifiedResponse } from '@segment/actions-core'
import { InvalidAuthenticationError, IntegrationError, ErrorCodes } from '@segment/actions-core'
import {
InvalidAuthenticationError,
IntegrationError,
ErrorCodes,
RefreshTokenAndRetryError
} from '@segment/actions-core'

import type { Settings } from './generated-types'
import updateAudience from './updateAudience'
import { LINKEDIN_API_VERSION } from './constants'
import { LINKEDIN_API_VERSION, LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES } from './constants'
import { LinkedInAudiences } from './api'
import type {
RefreshTokenResponse,
Expand Down Expand Up @@ -148,7 +153,21 @@ const destination: DestinationDefinition<Settings> = {
authorization: `Bearer ${auth?.accessToken}`,
'LinkedIn-Version': LINKEDIN_API_VERSION
},
agent
agent,
afterResponse: [
(_request: unknown, _options: unknown, response: { status: number; data: unknown }) => {
if (response.status === 401) {
const body = response.data as Record<string, unknown> | undefined
const serviceErrorCode = body?.serviceErrorCode as number | undefined
if (serviceErrorCode && LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES.includes(serviceErrorCode)) {
throw new RefreshTokenAndRetryError(
`LinkedIn eventual consistency: token not yet propagated (serviceErrorCode ${serviceErrorCode})`
)
}
}
return response
}
]
}
Comment on lines +151 to 171
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extendRequest currently returns RequestOptions, which does not support an afterResponse hook. This will not type-check and likely won’t build. Either adjust the core typing/behavior to accept afterResponse (return AllRequestOptions) or implement the 401+serviceErrorCode detection where you already have a ModifiedResponse (e.g., in the API wrapper methods) without relying on request-client hooks.

Copilot uses AI. Check for mistakes.
},
Comment on lines +146 to 172
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extendRequest is typed to return RequestOptions (via RequestExtension in core), which does not include afterResponse. Adding afterResponse here will fail TypeScript excess-property checks. Either update the core type to allow AllRequestOptions/hooks for extendRequest, or relocate this response-inspection logic to a supported layer.

Copilot uses AI. Check for mistakes.

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import nock from 'nock'
import { createTestEvent, createTestIntegration } from '@segment/actions-core'
import { createTestEvent, createTestIntegration, RefreshTokenAndRetryError } from '@segment/actions-core'
import Destination from '../../index'
import { BASE_URL, LINKEDIN_SOURCE_PLATFORM } from '../../constants'

Expand Down Expand Up @@ -864,5 +864,32 @@ describe('LinkedinAudiences.updateAudience', () => {
})
).rejects.toThrow('The value of `source_segment_id` and `personas_audience_key` must match.')
})

it('should throw RefreshTokenAndRetryError when LinkedIn returns 401 with token propagation error code', async () => {
nock(`${BASE_URL}/dmpSegments`)
.get(/.*/)
.query(() => true)
.reply(200, { elements: [{ id: 'dmp_segment_id' }] })
nock(`${BASE_URL}/dmpSegments/dmp_segment_id/users`).post(/.*/, updateUsersRequestBody).reply(401, {
serviceErrorCode: 65601,
message: 'Unable to verify access token'
})

await expect(
testDestination.testAction('updateAudience', {
event,
settings: {
ad_account_id: '123',
send_email: true,
send_google_advertising_id: true
},
useDefaultMappings: true,
auth,
mapping: {
personas_audience_key: 'personas_test_audience'
}
})
).rejects.toThrow(RefreshTokenAndRetryError)
})
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ export const LINKEDIN_API_VERSION = LINKEDIN_CONVERSIONS_API_VERSION
export const BASE_URL = 'https://api.linkedin.com/rest'
export const LINKEDIN_SOURCE_PLATFORM = 'SEGMENT'

// LinkedIn service error codes that indicate token propagation delays (eventual consistency).
// These 401s are not true revocations — the token is valid but not yet propagated.
export const LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES = [65601, 65602]

interface Choice {
value: string | number
label: string
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import type { DestinationDefinition } from '@segment/actions-core'
import { InvalidAuthenticationError, IntegrationError, ErrorCodes } from '@segment/actions-core'
import {
InvalidAuthenticationError,
IntegrationError,
ErrorCodes,
RefreshTokenAndRetryError
} from '@segment/actions-core'
import type { Settings } from './generated-types'
import { LinkedInConversions } from './api'
import type { LinkedInTestAuthenticationError, RefreshTokenResponse, LinkedInRefreshTokenError } from './types'
import { LINKEDIN_API_VERSION } from './constants'
import { LINKEDIN_API_VERSION, LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES } from './constants'
import https from 'https'
import streamConversion from './streamConversion'

Expand Down Expand Up @@ -100,7 +105,21 @@ const destination: DestinationDefinition<Settings> = {
'LinkedIn-Version': LINKEDIN_API_VERSION,
'X-Restli-Protocol-Version': `2.0.0`
},
agent
agent,
afterResponse: [
(_request: unknown, _options: unknown, response: { status: number; data: unknown }) => {
if (response.status === 401) {
const body = response.data as Record<string, unknown> | undefined
const serviceErrorCode = body?.serviceErrorCode as number | undefined
if (serviceErrorCode && LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES.includes(serviceErrorCode)) {
throw new RefreshTokenAndRetryError(
`LinkedIn eventual consistency: token not yet propagated (serviceErrorCode ${serviceErrorCode})`
)
}
}
return response
}
]
Comment on lines +108 to +122
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extendRequest is typed to return RequestOptions (see packages/core/src/destination-kit/types.ts:348), but RequestOptions does not include afterResponse. Adding afterResponse here will fail TypeScript compilation (and is currently contrary to create-request-client.ts’s note about not exposing hooks). Consider either (a) updating the core types to allow afterResponse in RequestExtension (e.g., return AllRequestOptions), or (b) moving this logic into the action/API layer by disabling throwHttpErrors and throwing TokenPropagationRetryError after inspecting the response in the calling code.

Copilot uses AI. Check for mistakes.
}
Comment on lines +96 to 123
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extendRequest is contextually typed to return RequestOptions (see packages/core/src/destination-kit/types.ts), which does not include an afterResponse field. Returning an object literal with afterResponse here will fail TypeScript excess-property checking. To fix, update the core RequestExtension/extendRequest return type to allow AllRequestOptions (or a variant that includes hook arrays), or move this logic into a supported hook point (e.g., action-level request handling).

Copilot uses AI. Check for mistakes.
},
actions: {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import nock from 'nock'
import { createTestEvent, createTestIntegration } from '@segment/actions-core'
import {
createTestEvent,
createTestIntegration,
RefreshTokenAndRetryError,
RetryableError
} from '@segment/actions-core'
import { DynamicFieldResponse } from '@segment/actions-core'
import { BASE_URL } from '../../constants'
import Destination from '../../index'
Expand Down Expand Up @@ -612,6 +617,67 @@ describe('LinkedinConversions.streamConversion', () => {
).rejects.toThrowError("User Info is missing the required field 'lastName'.")
})

it('should throw RefreshTokenAndRetryError when LinkedIn returns 401 with token propagation error code', async () => {
nock(`${BASE_URL}/conversionEvents`).post(/.*/).reply(401, {
serviceErrorCode: 65601,
message: 'Unable to verify access token'
})

await expect(
testDestination.testAction('streamConversion', {
event,
settings,
mapping: {
email: { '@path': '$.context.traits.email' },
conversionHappenedAt: {
'@path': '$.timestamp'
},
onMappingSave: {
inputs: {},
outputs: {
id: 789123
}
},
enable_batching: true,
batch_size: 5000
}
})
).rejects.toThrow(RefreshTokenAndRetryError)
})

it('should throw RetryableError for the full propagation-delay flow without refreshing token', async () => {
// Simulate a fresh token that hasn't propagated yet:
// the conversion call returns 401+65601, the framework throws RetryableError
// so Segment infrastructure retries later — no token refresh needed.
nock(`${BASE_URL}/conversionEvents`).post(/.*/).reply(401, {
serviceErrorCode: 65601,
message: 'Unable to verify access token'
})

await expect(
testDestination.onEvent(event, {
subscription: {
subscribe: 'type = "track"',
partnerAction: 'streamConversion',
mapping: {
email: { '@path': '$.context.traits.email' },
conversionHappenedAt: { '@path': '$.timestamp' },
onMappingSave: {
inputs: {},
outputs: { id: 789123 }
},
enable_batching: false,
batch_size: 5000
}
},
oauth: {
access_token: 'old-not-yet-propagated-token',
refresh_token: 'refresh-token'
}
})
).rejects.toThrow(RetryableError)
})

it('should detect hashed email if feature flag for smart hashing is passed', async () => {
nock(`${BASE_URL}/conversionEvents`)
.post('', {
Expand Down
Loading