-
Notifications
You must be signed in to change notification settings - Fork 305
[Actions Core] [LinkedIn Audiences] [Linkedin Conversions] Fix LinkedIn token propagation 401s with refresh and retry #3708
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 11 commits
e39d6fc
58502ec
e4fc510
82a51a5
1246404
391b2c3
0255433
0c47c61
eebb75d
a96cf93
4c5c523
7ea5b47
5635b9e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| import { ErrorCodes, IntegrationError } from '../errors' | ||
| import { ErrorCodes, IntegrationError, TokenPropagationRetryError, RetryableError } from '../errors' | ||
| import { ActionDefinition, MultiStatusResponse } from '../destination-kit/action' | ||
| import { | ||
| StateContext, | ||
|
|
@@ -1452,6 +1452,61 @@ describe('destination kit', () => { | |
| ]) | ||
| expect(spy).toHaveBeenCalledTimes(0) | ||
| }) | ||
|
|
||
| test('should throw RetryableError without token refresh when TokenPropagationRetryError is thrown', async () => { | ||
| const destinationWithPropagationError: DestinationDefinition<JSONObject> = { | ||
| name: 'Test Propagation Error Destination', | ||
| mode: 'cloud', | ||
| authentication: authentication, | ||
| actions: { | ||
| customEvent: { | ||
| title: 'Send a Custom Event', | ||
| description: 'Send events to a custom event in API', | ||
| defaultSubscription: 'type = "track"', | ||
| fields: { | ||
| advertiserId: { | ||
| label: 'Advertiser ID', | ||
| description: 'Advertiser Id', | ||
| type: 'string', | ||
| required: true | ||
| } | ||
| }, | ||
| perform: () => { | ||
| throw new TokenPropagationRetryError('Token not yet propagated (serviceErrorCode 65601)') | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| const destinationTest = new Destination(destinationWithPropagationError) | ||
| const testEvent: SegmentEvent = { | ||
| properties: { a: 'foo' }, | ||
| userId: '3456fff', | ||
| type: 'track' | ||
| } | ||
| const testSettings = { | ||
| apiSecret: 'test_key', | ||
| subscription: { | ||
| subscribe: 'type = "track"', | ||
| partnerAction: 'customEvent', | ||
| mapping: { | ||
| advertiserId: '1231241241' | ||
| } | ||
| }, | ||
| oauth: { | ||
| access_token: 'some-access-token', | ||
| refresh_token: 'refresh-token' | ||
| } | ||
| } | ||
|
|
||
| const refreshTokenSpy = jest.spyOn(authentication, 'refreshAccessToken') | ||
|
|
||
| // handleError should throw RetryableError(503) without calling refresh (token is already fresh) | ||
| const error = await destinationTest.onEvent(testEvent, testSettings).catch((e) => e) | ||
| expect(error).toBeInstanceOf(RetryableError) | ||
| expect(error.status).toBe(503) | ||
| expect(refreshTokenSpy).not.toHaveBeenCalled() | ||
| }) | ||
|
Comment on lines
+1456
to
+1515
|
||
| }) | ||
| describe('onBatch', () => { | ||
| test('should refresh the access-token in case of Unauthorized(401)', async () => { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,7 +30,14 @@ import type { | |
| ResultMultiStatusNode | ||
| } from './types' | ||
| import type { AllRequestOptions } from '../request-client' | ||
| import { ErrorCodes, IntegrationError, InvalidAuthenticationError, MultiStatusErrorReporter } from '../errors' | ||
| import { | ||
| ErrorCodes, | ||
| IntegrationError, | ||
| InvalidAuthenticationError, | ||
| MultiStatusErrorReporter, | ||
| TokenPropagationRetryError, | ||
| RetryableError | ||
| } from '../errors' | ||
| import { AuthTokens, getAuthData, getOAuth2Data, updateOAuthSettings } from './parse-settings' | ||
| import { InputData, Features } from '../mapping-kit' | ||
| import { retry } from '../retry' | ||
|
|
@@ -1022,6 +1029,16 @@ export class Destination<Settings = JSONObject, AudienceSettings = JSONObject> { | |
| settings: JSONObject, | ||
| options?: OnEventOptions | ||
| ): Promise<JSONObject> { | ||
| // Handle TokenPropagationRetryError: token was just refreshed but hasn't propagated yet. | ||
| // Throwing here intentionally terminates the internal retry loop in retry.ts — when | ||
| // onFailedAttempt throws, retry.ts propagates that exception immediately without | ||
| // continuing the loop. RetryableError(503) signals Segment infrastructure to retry | ||
| // after backoff, by which time the token will have propagated. 503 (Service Unavailable) | ||
| // distinguishes a propagation delay from a generic internal error (500). | ||
| if (error instanceof TokenPropagationRetryError) { | ||
| throw new RetryableError(error.message, 503) | ||
| } | ||
|
Comment on lines
+1032
to
+1043
|
||
|
|
||
| const statusCode = (error as ResponseError).status ?? (error as HTTPError)?.response?.status ?? 500 | ||
| const needsReauthentication = | ||
| statusCode === 401 && | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -98,6 +98,33 @@ export class APIError extends IntegrationError { | |||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Error indicating a 401 caused by OAuth token propagation delay (eventual | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * consistency) rather than true revocation. The token was recently refreshed | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * and is valid but has not yet propagated across all provider nodes. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * The framework converts this into a RetryableError(503) so Segment | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * infrastructure retries the event after a backoff delay, by which time | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * the token will have propagated. No additional token refresh is performed | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * on the exception-throwing path — handleError intercepts this error before | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * the OAuth re-authentication logic runs. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * This error is currently thrown by LinkedIn destination hooks when | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * serviceErrorCode 65601 or 65602 appears in a 401 response body. Reuse for | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * other providers should only occur after confirming the same | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * retry-without-refresh semantic applies. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| export class TokenPropagationRetryError extends CustomError { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Use 503 to match the RetryableError this converts into — do NOT use 401, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // which would cause generic status-based 401 handlers to trigger a token refresh. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+108
to
+119
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * the token will have propagated. No additional token refresh is performed | |
| * on the exception-throwing path — handleError intercepts this error before | |
| * the OAuth re-authentication logic runs. | |
| * | |
| * This error is currently thrown by LinkedIn destination hooks when | |
| * serviceErrorCode 65601 or 65602 appears in a 401 response body. Reuse for | |
| * other providers should only occur after confirming the same | |
| * retry-without-refresh semantic applies. | |
| */ | |
| export class TokenPropagationRetryError extends CustomError { | |
| // Use 503 to match the RetryableError this converts into — do NOT use 401, | |
| // which would cause generic status-based 401 handlers to trigger a token refresh. | |
| * the token will typically have propagated. This error is intended to model | |
| * a transient, retryable propagation delay rather than a permanent invalid- | |
| * credentials condition. Framework auth-error handling may still run as part | |
| * of the normal exception path, so callers should not rely on this error type | |
| * to suppress OAuth refresh logic. | |
| * | |
| * This error is currently thrown by LinkedIn destination hooks when | |
| * serviceErrorCode 65601 or 65602 appears in a 401 response body. Reuse for | |
| * other providers should only occur after confirming the same | |
| * transient retry semantic applies. | |
| */ | |
| export class TokenPropagationRetryError extends CustomError { | |
| // Use 503 to match the RetryableError this converts into and to represent | |
| // a transient retryable condition instead of a standard invalid-auth 401. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,11 +1,16 @@ | ||
| import https from 'https' | ||
|
|
||
| import type { DestinationDefinition, ModifiedResponse } from '@segment/actions-core' | ||
| import { InvalidAuthenticationError, IntegrationError, ErrorCodes } from '@segment/actions-core' | ||
| import { | ||
| InvalidAuthenticationError, | ||
| IntegrationError, | ||
| ErrorCodes, | ||
| TokenPropagationRetryError | ||
| } from '@segment/actions-core' | ||
|
|
||
| import type { Settings } from './generated-types' | ||
| import updateAudience from './updateAudience' | ||
| import { LINKEDIN_API_VERSION } from './constants' | ||
| import { LINKEDIN_API_VERSION, LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES } from './constants' | ||
| import { LinkedInAudiences } from './api' | ||
| import type { | ||
| RefreshTokenResponse, | ||
|
|
@@ -148,7 +153,21 @@ const destination: DestinationDefinition<Settings> = { | |
| authorization: `Bearer ${auth?.accessToken}`, | ||
| 'LinkedIn-Version': LINKEDIN_API_VERSION | ||
| }, | ||
| agent | ||
| agent, | ||
| afterResponse: [ | ||
| (_request: unknown, _options: unknown, response: { status: number; data: unknown }) => { | ||
| if (response.status === 401) { | ||
| const body = response.data as Record<string, unknown> | undefined | ||
| const serviceErrorCode = body?.serviceErrorCode as number | undefined | ||
| if (serviceErrorCode && LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES.includes(serviceErrorCode)) { | ||
| throw new TokenPropagationRetryError( | ||
| `LinkedIn eventual consistency: token not yet propagated (serviceErrorCode ${serviceErrorCode})` | ||
| ) | ||
| } | ||
| } | ||
| return response | ||
| } | ||
| ] | ||
| } | ||
|
Comment on lines
+151
to
171
|
||
| }, | ||
|
Comment on lines
+146
to
172
|
||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,9 +1,14 @@ | ||
| import type { DestinationDefinition } from '@segment/actions-core' | ||
| import { InvalidAuthenticationError, IntegrationError, ErrorCodes } from '@segment/actions-core' | ||
| import { | ||
| InvalidAuthenticationError, | ||
| IntegrationError, | ||
| ErrorCodes, | ||
| TokenPropagationRetryError | ||
| } from '@segment/actions-core' | ||
| import type { Settings } from './generated-types' | ||
| import { LinkedInConversions } from './api' | ||
| import type { LinkedInTestAuthenticationError, RefreshTokenResponse, LinkedInRefreshTokenError } from './types' | ||
| import { LINKEDIN_API_VERSION } from './constants' | ||
| import { LINKEDIN_API_VERSION, LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES } from './constants' | ||
| import https from 'https' | ||
| import streamConversion from './streamConversion' | ||
|
|
||
|
|
@@ -100,7 +105,21 @@ const destination: DestinationDefinition<Settings> = { | |
| 'LinkedIn-Version': LINKEDIN_API_VERSION, | ||
| 'X-Restli-Protocol-Version': `2.0.0` | ||
| }, | ||
| agent | ||
| agent, | ||
| afterResponse: [ | ||
| (_request: unknown, _options: unknown, response: { status: number; data: unknown }) => { | ||
| if (response.status === 401) { | ||
| const body = response.data as Record<string, unknown> | undefined | ||
| const serviceErrorCode = body?.serviceErrorCode as number | undefined | ||
| if (serviceErrorCode && LINKEDIN_TOKEN_PROPAGATION_ERROR_CODES.includes(serviceErrorCode)) { | ||
| throw new TokenPropagationRetryError( | ||
| `LinkedIn eventual consistency: token not yet propagated (serviceErrorCode ${serviceErrorCode})` | ||
| ) | ||
| } | ||
| } | ||
| return response | ||
| } | ||
| ] | ||
|
Comment on lines
+108
to
+122
|
||
| } | ||
|
Comment on lines
+96
to
123
|
||
| }, | ||
| actions: { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.