Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 62 additions & 1 deletion packages/core/src/__tests__/destination-kit.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ErrorCodes, IntegrationError } from '../errors'
import { ErrorCodes, IntegrationError, RefreshTokenAndRetryError, RetryableError } from '../errors'
import { ActionDefinition, MultiStatusResponse } from '../destination-kit/action'
import {
StateContext,
Expand Down Expand Up @@ -1452,6 +1452,67 @@ describe('destination kit', () => {
])
expect(spy).toHaveBeenCalledTimes(0)
})

test('should refresh token and throw RetryableError when RefreshTokenAndRetryError is thrown', async () => {
const destinationWithPropagationError: DestinationDefinition<JSONObject> = {
name: 'Test Propagation Error Destination',
Comment thread
harsh-joshi99 marked this conversation as resolved.
Outdated
mode: 'cloud',
authentication: authentication,
actions: {
customEvent: {
title: 'Send a Custom Event',
description: 'Send events to a custom event in API',
defaultSubscription: 'type = "track"',
fields: {
advertiserId: {
label: 'Advertiser ID',
description: 'Advertiser Id',
type: 'string',
required: true
}
},
perform: () => {
throw new RefreshTokenAndRetryError('Token not yet propagated (serviceErrorCode 65601)')
}
}
}
}

const destinationTest = new Destination(destinationWithPropagationError)
const testEvent: SegmentEvent = {
properties: { a: 'foo' },
userId: '3456fff',
type: 'track'
}
const testSettings = {
apiSecret: 'test_key',
subscription: {
subscribe: 'type = "track"',
partnerAction: 'customEvent',
mapping: {
advertiserId: '1231241241'
}
},
oauth: {
access_token: 'some-access-token',
refresh_token: 'refresh-token'
}
}
const eventOptions = {
onTokenRefresh: async (_tokens: RefreshAccessTokenResult) => {
jest.fn(() => Promise.resolve())
}
Comment thread
harsh-joshi99 marked this conversation as resolved.
Outdated
}

const refreshTokenSpy = jest.spyOn(authentication, 'refreshAccessToken')
const onTokenRefreshSpy = jest.spyOn(eventOptions, 'onTokenRefresh')

// handleError should refresh the token and then throw RetryableError
await expect(destinationTest.onEvent(testEvent, testSettings, eventOptions)).rejects.toThrow(RetryableError)
// The perform was called once, then handleError refreshed the token and threw RetryableError
expect(refreshTokenSpy).toHaveBeenCalledTimes(1)
expect(onTokenRefreshSpy).toHaveBeenCalledTimes(1)
})
Comment thread
harsh-joshi99 marked this conversation as resolved.
})
describe('onBatch', () => {
test('should refresh the access-token in case of Unauthorized(401)', async () => {
Expand Down
16 changes: 15 additions & 1 deletion packages/core/src/destination-kit/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,14 @@ import type {
ResultMultiStatusNode
} from './types'
import type { AllRequestOptions } from '../request-client'
import { ErrorCodes, IntegrationError, InvalidAuthenticationError, MultiStatusErrorReporter } from '../errors'
import {
ErrorCodes,
IntegrationError,
InvalidAuthenticationError,
MultiStatusErrorReporter,
RefreshTokenAndRetryError,
RetryableError
} from '../errors'
import { AuthTokens, getAuthData, getOAuth2Data, updateOAuthSettings } from './parse-settings'
import { InputData, Features } from '../mapping-kit'
import { retry } from '../retry'
Expand Down Expand Up @@ -1022,6 +1029,13 @@ export class Destination<Settings = JSONObject, AudienceSettings = JSONObject> {
settings: JSONObject,
options?: OnEventOptions
): Promise<JSONObject> {
// Handle RefreshTokenAndRetryError: refresh the token, then throw RetryableError
// so Segment infrastructure retries later (after token propagation)
if (error instanceof RefreshTokenAndRetryError) {
await this.handleAuthError(settings, options)
throw new RetryableError(error.message, 503)
}
Comment thread
harsh-joshi99 marked this conversation as resolved.

const statusCode = (error as ResponseError).status ?? (error as HTTPError)?.response?.status ?? 500
const needsReauthentication =
statusCode === 401 &&
Expand Down
18 changes: 18 additions & 0 deletions packages/core/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,22 @@ export class APIError extends IntegrationError {
}
}

/**
* Error that signals the framework to refresh the OAuth token AND then
* throw a RetryableError. Use this when a provider's 401 may be caused
* by eventual consistency (token not yet propagated) rather than true
* revocation. The token refresh covers the revocation case; the retry
* covers the propagation delay case.
Comment thread
harsh-joshi99 marked this conversation as resolved.
Outdated
*/
export class RefreshTokenAndRetryError extends CustomError {
status = 401
code = ErrorCodes.REFRESH_AND_RETRY

constructor(message = 'Token refresh required with retry') {
super(message)
}
}
Comment thread
harsh-joshi99 marked this conversation as resolved.

/**
* Error to indicate the destination has gone over its allotted execution time
* and is self-terminating.
Expand Down Expand Up @@ -207,6 +223,8 @@ export enum CustomErrorCodes {
GET_AUDIENCE_FAILED = 'GET_AUDIENCE_FAILED',
// When the RETL onMappingSave hook fails
RETL_ON_MAPPING_SAVE_FAILED = 'RETL_ON_MAPPING_SAVE_FAILED',
// Refresh the OAuth token and then retry via Segment infrastructure
Comment thread
harsh-joshi99 marked this conversation as resolved.
Outdated
REFRESH_AND_RETRY = 'REFRESH_AND_RETRY',
// Fallback error code if no other error code matches
UNKNOWN_ERROR = 'UNKNOWN_ERROR'
}
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export {
RetryableError,
PayloadValidationError,
SelfTimeoutError,
RefreshTokenAndRetryError,
APIError,
ErrorCodes,
HttpErrorCodes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@ import { LINKEDIN_AUDIENCES_API_VERSION } from './versioning-info'
export const LINKEDIN_API_VERSION = LINKEDIN_AUDIENCES_API_VERSION
export const BASE_URL = 'https://api.linkedin.com/rest'
export const LINKEDIN_SOURCE_PLATFORM = 'SEGMENT'

// LinkedIn service error codes that indicate token propagation delays (eventual consistency).
// These 401s are not true revocations — the token is valid but not yet propagated.
export const LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES = [65601, 65602]
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import https from 'https'

import type { DestinationDefinition, ModifiedResponse } from '@segment/actions-core'
import { InvalidAuthenticationError, IntegrationError, ErrorCodes } from '@segment/actions-core'
import {
InvalidAuthenticationError,
IntegrationError,
ErrorCodes,
RefreshTokenAndRetryError
} from '@segment/actions-core'

import type { Settings } from './generated-types'
import updateAudience from './updateAudience'
import { LINKEDIN_API_VERSION } from './constants'
import { LINKEDIN_API_VERSION, LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES } from './constants'
import { LinkedInAudiences } from './api'
import type {
RefreshTokenResponse,
Expand Down Expand Up @@ -148,7 +153,21 @@ const destination: DestinationDefinition<Settings> = {
authorization: `Bearer ${auth?.accessToken}`,
'LinkedIn-Version': LINKEDIN_API_VERSION
},
agent
agent,
afterResponse: [
(_request: unknown, _options: unknown, response: { status: number; data: unknown }) => {
if (response.status === 401) {
const body = response.data as Record<string, unknown> | undefined
const serviceErrorCode = body?.serviceErrorCode as number | undefined
if (serviceErrorCode && LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES.includes(serviceErrorCode)) {
throw new RefreshTokenAndRetryError(
`LinkedIn eventual consistency: token not yet propagated (serviceErrorCode ${serviceErrorCode})`
)
}
}
return response
}
]
}
Comment thread
harsh-joshi99 marked this conversation as resolved.
},
Comment thread
harsh-joshi99 marked this conversation as resolved.

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import nock from 'nock'
import { createTestEvent, createTestIntegration } from '@segment/actions-core'
import { createTestEvent, createTestIntegration, RefreshTokenAndRetryError } from '@segment/actions-core'
import Destination from '../../index'
import { BASE_URL, LINKEDIN_SOURCE_PLATFORM } from '../../constants'

Expand Down Expand Up @@ -864,5 +864,32 @@ describe('LinkedinAudiences.updateAudience', () => {
})
).rejects.toThrow('The value of `source_segment_id` and `personas_audience_key` must match.')
})

it('should throw RefreshTokenAndRetryError when LinkedIn returns 401 with token propagation error code', async () => {
nock(`${BASE_URL}/dmpSegments`)
.get(/.*/)
.query(() => true)
.reply(200, { elements: [{ id: 'dmp_segment_id' }] })
nock(`${BASE_URL}/dmpSegments/dmp_segment_id/users`).post(/.*/, updateUsersRequestBody).reply(401, {
serviceErrorCode: 65601,
message: 'Unable to verify access token'
})

await expect(
testDestination.testAction('updateAudience', {
event,
settings: {
ad_account_id: '123',
send_email: true,
send_google_advertising_id: true
},
useDefaultMappings: true,
auth,
mapping: {
personas_audience_key: 'personas_test_audience'
}
})
).rejects.toThrow(RefreshTokenAndRetryError)
})
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ export const LINKEDIN_API_VERSION = LINKEDIN_CONVERSIONS_API_VERSION
export const BASE_URL = 'https://api.linkedin.com/rest'
export const LINKEDIN_SOURCE_PLATFORM = 'SEGMENT'

// LinkedIn service error codes that indicate token propagation delays (eventual consistency).
// These 401s are not true revocations — the token is valid but not yet propagated.
export const LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES = [65601, 65602]

interface Choice {
value: string | number
label: string
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import type { DestinationDefinition } from '@segment/actions-core'
import { InvalidAuthenticationError, IntegrationError, ErrorCodes } from '@segment/actions-core'
import {
InvalidAuthenticationError,
IntegrationError,
ErrorCodes,
RefreshTokenAndRetryError
} from '@segment/actions-core'
import type { Settings } from './generated-types'
import { LinkedInConversions } from './api'
import type { LinkedInTestAuthenticationError, RefreshTokenResponse, LinkedInRefreshTokenError } from './types'
import { LINKEDIN_API_VERSION } from './constants'
import { LINKEDIN_API_VERSION, LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES } from './constants'
import https from 'https'
import streamConversion from './streamConversion'

Expand Down Expand Up @@ -100,7 +105,21 @@ const destination: DestinationDefinition<Settings> = {
'LinkedIn-Version': LINKEDIN_API_VERSION,
'X-Restli-Protocol-Version': `2.0.0`
},
agent
agent,
afterResponse: [
(_request: unknown, _options: unknown, response: { status: number; data: unknown }) => {
if (response.status === 401) {
const body = response.data as Record<string, unknown> | undefined
const serviceErrorCode = body?.serviceErrorCode as number | undefined
if (serviceErrorCode && LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES.includes(serviceErrorCode)) {
throw new RefreshTokenAndRetryError(
`LinkedIn eventual consistency: token not yet propagated (serviceErrorCode ${serviceErrorCode})`
)
}
}
return response
}
]
Comment thread
harsh-joshi99 marked this conversation as resolved.
}
Comment thread
harsh-joshi99 marked this conversation as resolved.
Outdated
},
actions: {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import nock from 'nock'
import { createTestEvent, createTestIntegration } from '@segment/actions-core'
import {
createTestEvent,
createTestIntegration,
RefreshTokenAndRetryError,
RetryableError
} from '@segment/actions-core'
import { DynamicFieldResponse } from '@segment/actions-core'
import { BASE_URL } from '../../constants'
import Destination from '../../index'
Expand Down Expand Up @@ -612,6 +617,83 @@ describe('LinkedinConversions.streamConversion', () => {
).rejects.toThrowError("User Info is missing the required field 'lastName'.")
})

it('should throw RefreshTokenAndRetryError when LinkedIn returns 401 with token propagation error code', async () => {
nock(`${BASE_URL}/conversionEvents`).post(/.*/).reply(401, {
serviceErrorCode: 65601,
message: 'Unable to verify access token'
})

await expect(
testDestination.testAction('streamConversion', {
event,
settings,
mapping: {
email: { '@path': '$.context.traits.email' },
conversionHappenedAt: {
'@path': '$.timestamp'
},
onMappingSave: {
inputs: {},
outputs: {
id: 789123
}
},
enable_batching: true,
batch_size: 5000
}
})
).rejects.toThrow(RefreshTokenAndRetryError)
})

it('should refresh token and throw RetryableError for the full propagation-delay flow', async () => {
// Simulate a fresh token that hasn't propagated yet:
// the conversion call returns 401+65601, the framework then refreshes the token
// and throws RetryableError so Segment infrastructure retries later.
process.env.ACTIONS_LINKEDIN_CONVERSIONS_CLIENT_ID = 'test-client-id'
process.env.ACTIONS_LINKEDIN_CONVERSIONS_CLIENT_SECRET = 'test-client-secret'

nock(`${BASE_URL}/conversionEvents`).post(/.*/).reply(401, {
serviceErrorCode: 65601,
message: 'Unable to verify access token'
})

Comment on lines +710 to +717
Copy link

Copilot AI Apr 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test name says it refreshes the token, but the assertions only check that a RetryableError is thrown. As written, the test would still pass even if the refresh call never happens (the LinkedIn OAuth nock is not asserted). Capture the OAuth nock scope and assert it was consumed (e.g., scope.isDone()/scope.done()), and/or assert the thrown RetryableError has status 503 to validate the intended behavior.

Copilot uses AI. Check for mistakes.
nock('https://www.linkedin.com').post('/oauth/v2/accessToken').reply(200, {
access_token: 'new-propagated-token',
expires_in: 3600
})

const onTokenRefresh = jest.fn().mockResolvedValue(undefined)

await expect(
testDestination.onEvent(
event,
{
subscription: {
subscribe: 'type = "track"',
partnerAction: 'streamConversion',
mapping: {
email: { '@path': '$.context.traits.email' },
conversionHappenedAt: { '@path': '$.timestamp' },
onMappingSave: {
inputs: {},
outputs: { id: 789123 }
},
enable_batching: false,
batch_size: 5000
}
},
oauth: {
access_token: 'old-not-yet-propagated-token',
refresh_token: 'refresh-token'
}
},
{ onTokenRefresh }
)
).rejects.toThrow(RetryableError)

expect(onTokenRefresh).toHaveBeenCalledWith({ accessToken: 'new-propagated-token' })
Comment thread
harsh-joshi99 marked this conversation as resolved.
Outdated
})

it('should detect hashed email if feature flag for smart hashing is passed', async () => {
nock(`${BASE_URL}/conversionEvents`)
.post('', {
Expand Down
Loading