From a36f1ab522aae65fc2962d0c0e3ff142b207eba0 Mon Sep 17 00:00:00 2001 From: "Dr.J" Date: Tue, 10 Mar 2026 10:31:45 -0700 Subject: [PATCH 1/8] Parse Customer.io Track API batch multi-status responses --- .../customerio/__tests__/utils.test.ts | 129 +++++++++++++++++- .../src/destinations/customerio/utils.ts | 128 ++++++++++++++++- 2 files changed, 253 insertions(+), 4 deletions(-) diff --git a/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts b/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts index b510e62e8df..0840a3fead5 100644 --- a/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts +++ b/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts @@ -1,4 +1,5 @@ -import { resolveIdentifiers, isIsoDate } from '../utils' +import { MultiStatusResponse } from '@segment/actions-core' +import { isIsoDate, parseTrackApiErrors, parseTrackApiMultiStatusResponse, resolveIdentifiers, sendBatch } from '../utils' describe('isIsoDate', () => { it('should return true for valid ISO date with fractional seconds from 1-9 digits', () => { @@ -79,3 +80,129 @@ describe('resolveIdentifiers', () => { expect(resolveIdentifiers({})).toBeUndefined() }) }) + +describe('sendBatch', () => { + it('should parse 207 multi-status Track API responses', async () => { + const request = jest.fn().mockResolvedValue({ + status: 207, + data: { + errors: [ + { + batch_index: 1, + reason: 'invalid', + message: 'Attribute value too long' + } + ] + } + }) + + const response = await sendBatch(request, [ + { + type: 'person', + action: 'event', + settings: {}, + payload: { person_id: 'user-1', name: 'First' } + }, + { + type: 'person', + action: 'event', + settings: {}, + payload: { person_id: 'user-2', name: 'Second' } + } + ]) + + expect(response).toBeInstanceOf(MultiStatusResponse) + expect((response as MultiStatusResponse).length()).toBe(2) + expect((response as MultiStatusResponse).getResponseAtIndex(0).value()).toEqual({ + status: 200, + body: {}, + sent: {} + }) + expect((response as MultiStatusResponse).getResponseAtIndex(1).value()).toEqual({ + status: 400, + errormessage: 'Attribute value too long', + errortype: 'PAYLOAD_VALIDATION_FAILED', + body: { + batch_index: 1, + reason: 'invalid', + message: 'Attribute value too long' + } + }) + }) + + it('should parse 200 Track API responses that still contain batch errors', async () => { + const request = jest.fn().mockResolvedValue({ + status: 200, + data: { + errors: [ + { + batch_index: 0, + reason: 'required', + field: 'name', + message: 'Name is required' + } + ] + } + }) + + const response = await sendBatch(request, [ + { + type: 'person', + action: 'event', + settings: {}, + payload: { person_id: 'user-1', name: 'First' } + } + ]) + + expect(response).toBeInstanceOf(MultiStatusResponse) + expect((response as MultiStatusResponse).getResponseAtIndex(0).value()).toEqual({ + status: 400, + errormessage: 'Name is required', + errortype: 'PAYLOAD_VALIDATION_FAILED', + body: { + batch_index: 0, + reason: 'required', + field: 'name', + message: 'Name is required' + } + }) + }) +}) + +describe('parseTrackApiErrors', () => { + it('should fill success entries for items without errors', () => { + const response = parseTrackApiErrors( + [ + { + batch_index: 1, + reason: 'required', + field: 'name', + message: 'Name is required' + } + ], + 3 + ) + + expect(response.getAllResponses().map((result) => result.value())).toEqual([ + { status: 200, body: {}, sent: {} }, + { + status: 400, + errormessage: 'Name is required', + errortype: 'PAYLOAD_VALIDATION_FAILED', + body: { + batch_index: 1, + reason: 'required', + field: 'name', + message: 'Name is required' + } + }, + { status: 200, body: {}, sent: {} } + ]) + }) +}) + +describe('parseTrackApiMultiStatusResponse', () => { + it('should return null for non-Track API response bodies', () => { + expect(parseTrackApiMultiStatusResponse({ ok: true }, 1)).toBeNull() + }) +}) diff --git a/packages/destination-actions/src/destinations/customerio/utils.ts b/packages/destination-actions/src/destinations/customerio/utils.ts index 5d490f473e4..39d452251ae 100644 --- a/packages/destination-actions/src/destinations/customerio/utils.ts +++ b/packages/destination-actions/src/destinations/customerio/utils.ts @@ -1,6 +1,7 @@ import dayjs from '../../lib/dayjs' import isPlainObject from 'lodash/isPlainObject' import { fullFormats } from 'ajv-formats/dist/formats' +import { ErrorCodes, MultiStatusResponse, type RequestClient } from '@segment/actions-core' import { CUSTOMERIO_TRACK_API_VERSION } from './versioning-info' const isEmail = (value: string): boolean => { @@ -196,7 +197,10 @@ export const resolveIdentifiers = ({ } } -export const sendBatch = (request: Function, options: RequestPayload[]) => { +export const sendBatch = async ( + request: RequestClient, + options: RequestPayload[] +) => { if (!options?.length) { return } @@ -204,15 +208,133 @@ export const sendBatch = (request: Function, option const [{ settings }] = options const batch = options.map((opts) => buildPayload(opts)) - return request(`${trackApiEndpoint(settings)}/api/${CUSTOMERIO_TRACK_API_VERSION}/batch`, { + const response = await request(`${trackApiEndpoint(settings)}/api/${CUSTOMERIO_TRACK_API_VERSION}/batch`, { method: 'post', json: { batch } }) + + const responseBody = getResponseBody(response) + + if (response?.status === 207 && responseBody) { + const parsedResults = parseTrackApiMultiStatusResponse(responseBody, batch.length) + if (parsedResults) { + return parsedResults + } + } + + if (response?.status === 200 && responseBody) { + const parsedResults = parseTrackApiMultiStatusResponse(responseBody, batch.length) + if (parsedResults) { + return parsedResults + } + } + + return response +} + +interface TrackApiError { + batch_index?: number + reason?: string + field?: string + message?: string +} + +interface TrackApiResponse { + errors?: TrackApiError[] +} + +interface RequestResponse { + status?: number + data?: unknown + content?: unknown + body?: unknown +} + +function mapTrackApiReasonToErrorCode(reason: string | undefined) { + if (!reason) { + return undefined + } + + switch (reason.toLowerCase()) { + case 'invalid': + case 'required': + return ErrorCodes.PAYLOAD_VALIDATION_FAILED + default: + return undefined + } +} + +function getResponseBody(response: RequestResponse): unknown { + const body = response.data ?? response.content ?? response.body + + if (typeof body !== 'string') { + return body + } + + try { + return JSON.parse(body) + } catch { + try { + const decoded = Buffer.from(body, 'base64').toString('utf-8') + return JSON.parse(decoded) + } catch { + return body + } + } +} + +export function parseTrackApiErrors(errors: TrackApiError[], totalItems: number): MultiStatusResponse { + const multiStatusResponse = new MultiStatusResponse() + const errorMap = new Map() + + for (const error of errors) { + if (typeof error.batch_index === 'number') { + errorMap.set(error.batch_index, error) + } + } + + for (let i = 0; i < totalItems; i++) { + const error = errorMap.get(i) + + if (!error) { + multiStatusResponse.setSuccessResponseAtIndex(i, { + status: 200, + body: {}, + sent: {} + }) + continue + } + + multiStatusResponse.setErrorResponseAtIndex(i, { + status: 400, + errormessage: error.message || `${error.reason || 'ERROR'}: ${error.field || 'unknown field'}`, + errortype: mapTrackApiReasonToErrorCode(error.reason), + body: error + }) + } + + return multiStatusResponse +} + +export function parseTrackApiMultiStatusResponse( + responseBody: unknown, + totalItems: number +): MultiStatusResponse | null { + if (!isRecord(responseBody)) { + return null + } + + const { errors } = responseBody as TrackApiResponse + if (!Array.isArray(errors) || errors.length === 0) { + return null + } + + return parseTrackApiErrors(errors, totalItems) } -export const sendSingle = (request: Function, options: RequestPayload) => { +export const sendSingle = (request: RequestClient, options: RequestPayload) => { const json = buildPayload(options) return request(`${trackApiEndpoint(options.settings)}/api/${CUSTOMERIO_TRACK_API_VERSION}/entity`, { method: 'post', From 8be62adaed9719721183d3494d6b13f654a704df Mon Sep 17 00:00:00 2001 From: "Dr.J" Date: Mon, 30 Mar 2026 14:03:00 -0700 Subject: [PATCH 2/8] Address PR review: add types, populate body/sent, and wrap request in try/catch - Collapse duplicate 200/207 status blocks into single OR condition - Rename TrackApiResponse to CustomerIOBatchResponse and add as request generic - Add return type to getResponseBody - Thread original payloads and built batch through to populate body/sent in multi-status responses - Wrap batch request in try/catch to handle full payload failures --- .../customerio/__tests__/utils.test.ts | 65 +++++++++------ .../src/destinations/customerio/utils.ts | 81 ++++++++++++------- 2 files changed, 91 insertions(+), 55 deletions(-) diff --git a/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts b/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts index 0840a3fead5..6ace8eb981d 100644 --- a/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts +++ b/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts @@ -1,5 +1,11 @@ import { MultiStatusResponse } from '@segment/actions-core' -import { isIsoDate, parseTrackApiErrors, parseTrackApiMultiStatusResponse, resolveIdentifiers, sendBatch } from '../utils' +import { + isIsoDate, + parseTrackApiErrors, + parseTrackApiMultiStatusResponse, + resolveIdentifiers, + sendBatch +} from '../utils' describe('isIsoDate', () => { it('should return true for valid ISO date with fractional seconds from 1-9 digits', () => { @@ -115,18 +121,15 @@ describe('sendBatch', () => { expect((response as MultiStatusResponse).length()).toBe(2) expect((response as MultiStatusResponse).getResponseAtIndex(0).value()).toEqual({ status: 200, - body: {}, - sent: {} + body: { person_id: 'user-1', name: 'First' }, + sent: { type: 'person', action: 'event', identifiers: { id: 'user-1' }, name: 'First' } }) expect((response as MultiStatusResponse).getResponseAtIndex(1).value()).toEqual({ status: 400, errormessage: 'Attribute value too long', errortype: 'PAYLOAD_VALIDATION_FAILED', - body: { - batch_index: 1, - reason: 'invalid', - message: 'Attribute value too long' - } + body: { person_id: 'user-2', name: 'Second' }, + sent: { type: 'person', action: 'event', identifiers: { id: 'user-2' }, name: 'Second' } }) }) @@ -159,18 +162,25 @@ describe('sendBatch', () => { status: 400, errormessage: 'Name is required', errortype: 'PAYLOAD_VALIDATION_FAILED', - body: { - batch_index: 0, - reason: 'required', - field: 'name', - message: 'Name is required' - } + body: { person_id: 'user-1', name: 'First' }, + sent: { type: 'person', action: 'event', identifiers: { id: 'user-1' }, name: 'First' } }) }) }) describe('parseTrackApiErrors', () => { it('should fill success entries for items without errors', () => { + const options = [ + { type: 'person', action: 'event', settings: {}, payload: { person_id: 'user-0' } }, + { type: 'person', action: 'event', settings: {}, payload: { person_id: 'user-1' } }, + { type: 'person', action: 'event', settings: {}, payload: { person_id: 'user-2' } } + ] + const batch = [ + { type: 'person', action: 'event', identifiers: { id: 'user-0' } }, + { type: 'person', action: 'event', identifiers: { id: 'user-1' } }, + { type: 'person', action: 'event', identifiers: { id: 'user-2' } } + ] + const response = parseTrackApiErrors( [ { @@ -180,29 +190,36 @@ describe('parseTrackApiErrors', () => { message: 'Name is required' } ], - 3 + options, + batch ) expect(response.getAllResponses().map((result) => result.value())).toEqual([ - { status: 200, body: {}, sent: {} }, + { + status: 200, + body: { person_id: 'user-0' }, + sent: { type: 'person', action: 'event', identifiers: { id: 'user-0' } } + }, { status: 400, errormessage: 'Name is required', errortype: 'PAYLOAD_VALIDATION_FAILED', - body: { - batch_index: 1, - reason: 'required', - field: 'name', - message: 'Name is required' - } + body: { person_id: 'user-1' }, + sent: { type: 'person', action: 'event', identifiers: { id: 'user-1' } } }, - { status: 200, body: {}, sent: {} } + { + status: 200, + body: { person_id: 'user-2' }, + sent: { type: 'person', action: 'event', identifiers: { id: 'user-2' } } + } ]) }) }) describe('parseTrackApiMultiStatusResponse', () => { it('should return null for non-Track API response bodies', () => { - expect(parseTrackApiMultiStatusResponse({ ok: true }, 1)).toBeNull() + const options = [{ type: 'person', action: 'event', settings: {}, payload: { person_id: 'user-0' } }] + const batch = [{ type: 'person', action: 'event', identifiers: { id: 'user-0' } }] + expect(parseTrackApiMultiStatusResponse({ ok: true }, options, batch)).toBeNull() }) }) diff --git a/packages/destination-actions/src/destinations/customerio/utils.ts b/packages/destination-actions/src/destinations/customerio/utils.ts index 39d452251ae..4219a645c91 100644 --- a/packages/destination-actions/src/destinations/customerio/utils.ts +++ b/packages/destination-actions/src/destinations/customerio/utils.ts @@ -1,7 +1,7 @@ import dayjs from '../../lib/dayjs' import isPlainObject from 'lodash/isPlainObject' import { fullFormats } from 'ajv-formats/dist/formats' -import { ErrorCodes, MultiStatusResponse, type RequestClient } from '@segment/actions-core' +import { ErrorCodes, MultiStatusResponse, RequestClient } from '@segment/actions-core' import { CUSTOMERIO_TRACK_API_VERSION } from './versioning-info' const isEmail = (value: string): boolean => { @@ -208,30 +208,43 @@ export const sendBatch = async ( const [{ settings }] = options const batch = options.map((opts) => buildPayload(opts)) - const response = await request(`${trackApiEndpoint(settings)}/api/${CUSTOMERIO_TRACK_API_VERSION}/batch`, { - method: 'post', - json: { - batch - } - }) + try { + const response = await request( + `${trackApiEndpoint(settings)}/api/${CUSTOMERIO_TRACK_API_VERSION}/batch`, + { + method: 'post', + json: { + batch + } + } + ) - const responseBody = getResponseBody(response) + const responseBody = getResponseBody(response) - if (response?.status === 207 && responseBody) { - const parsedResults = parseTrackApiMultiStatusResponse(responseBody, batch.length) - if (parsedResults) { - return parsedResults + if ((response?.status === 200 || response?.status === 207) && responseBody) { + const parsedResults = parseTrackApiMultiStatusResponse(responseBody, options, batch) + if (parsedResults) { + return parsedResults + } } - } - if (response?.status === 200 && responseBody) { - const parsedResults = parseTrackApiMultiStatusResponse(responseBody, batch.length) - if (parsedResults) { - return parsedResults + return response + } catch (err) { + const error = err as { message?: string; response?: { status?: number } } + const status = error.response?.status ?? 500 + const message = error.message ?? 'Unknown error' + + const multiStatusResponse = new MultiStatusResponse() + for (let i = 0; i < options.length; i++) { + multiStatusResponse.setErrorResponseAtIndex(i, { + status, + errormessage: message, + body: options[i].payload, + sent: batch[i] + }) } + return multiStatusResponse } - - return response } interface TrackApiError { @@ -241,7 +254,7 @@ interface TrackApiError { message?: string } -interface TrackApiResponse { +interface CustomerIOBatchResponse { errors?: TrackApiError[] } @@ -266,7 +279,7 @@ function mapTrackApiReasonToErrorCode(reason: string | undefined) { } } -function getResponseBody(response: RequestResponse): unknown { +function getResponseBody(response: RequestResponse): CustomerIOBatchResponse | string | undefined { const body = response.data ?? response.content ?? response.body if (typeof body !== 'string') { @@ -285,7 +298,11 @@ function getResponseBody(response: RequestResponse): unknown { } } -export function parseTrackApiErrors(errors: TrackApiError[], totalItems: number): MultiStatusResponse { +export function parseTrackApiErrors( + errors: TrackApiError[], + options: RequestPayload[], + batch: Record[] +): MultiStatusResponse { const multiStatusResponse = new MultiStatusResponse() const errorMap = new Map() @@ -295,14 +312,14 @@ export function parseTrackApiErrors(errors: TrackApiError[], totalItems: number) } } - for (let i = 0; i < totalItems; i++) { + for (let i = 0; i < options.length; i++) { const error = errorMap.get(i) if (!error) { multiStatusResponse.setSuccessResponseAtIndex(i, { status: 200, - body: {}, - sent: {} + body: options[i].payload, + sent: batch[i] }) continue } @@ -311,27 +328,29 @@ export function parseTrackApiErrors(errors: TrackApiError[], totalItems: number) status: 400, errormessage: error.message || `${error.reason || 'ERROR'}: ${error.field || 'unknown field'}`, errortype: mapTrackApiReasonToErrorCode(error.reason), - body: error + body: options[i].payload, + sent: batch[i] }) } return multiStatusResponse } -export function parseTrackApiMultiStatusResponse( - responseBody: unknown, - totalItems: number +export function parseTrackApiMultiStatusResponse( + responseBody: CustomerIOBatchResponse | string | undefined, + options: RequestPayload[], + batch: Record[] ): MultiStatusResponse | null { if (!isRecord(responseBody)) { return null } - const { errors } = responseBody as TrackApiResponse + const { errors } = responseBody as CustomerIOBatchResponse if (!Array.isArray(errors) || errors.length === 0) { return null } - return parseTrackApiErrors(errors, totalItems) + return parseTrackApiErrors(errors, options, batch) } export const sendSingle = (request: RequestClient, options: RequestPayload) => { From 7c9ad7fc26f6b1dd220611bf560b9f9588b4f731 Mon Sep 17 00:00:00 2001 From: "Dr.J" Date: Mon, 20 Apr 2026 10:35:42 -0700 Subject: [PATCH 3/8] Fix Customer.io batch multistatus handling --- .../customerio/__tests__/utils.test.ts | 130 +++++++++++++++++- .../src/destinations/customerio/utils.ts | 117 +++++++--------- 2 files changed, 178 insertions(+), 69 deletions(-) diff --git a/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts b/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts index 6ace8eb981d..6ceb114f23d 100644 --- a/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts +++ b/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts @@ -1,4 +1,4 @@ -import { MultiStatusResponse } from '@segment/actions-core' +import { HTTPError, MultiStatusResponse } from '@segment/actions-core' import { isIsoDate, parseTrackApiErrors, @@ -118,13 +118,13 @@ describe('sendBatch', () => { ]) expect(response).toBeInstanceOf(MultiStatusResponse) - expect((response as MultiStatusResponse).length()).toBe(2) - expect((response as MultiStatusResponse).getResponseAtIndex(0).value()).toEqual({ + expect(response.length()).toBe(2) + expect(response.getResponseAtIndex(0).value()).toEqual({ status: 200, body: { person_id: 'user-1', name: 'First' }, sent: { type: 'person', action: 'event', identifiers: { id: 'user-1' }, name: 'First' } }) - expect((response as MultiStatusResponse).getResponseAtIndex(1).value()).toEqual({ + expect(response.getResponseAtIndex(1).value()).toEqual({ status: 400, errormessage: 'Attribute value too long', errortype: 'PAYLOAD_VALIDATION_FAILED', @@ -158,7 +158,7 @@ describe('sendBatch', () => { ]) expect(response).toBeInstanceOf(MultiStatusResponse) - expect((response as MultiStatusResponse).getResponseAtIndex(0).value()).toEqual({ + expect(response.getResponseAtIndex(0).value()).toEqual({ status: 400, errormessage: 'Name is required', errortype: 'PAYLOAD_VALIDATION_FAILED', @@ -166,6 +166,80 @@ describe('sendBatch', () => { sent: { type: 'person', action: 'event', identifiers: { id: 'user-1' }, name: 'First' } }) }) + + it('should return all-success responses when the Track API reports an empty errors array', async () => { + const request = jest.fn().mockResolvedValue({ + status: 207, + data: { + errors: [] + } + }) + + const response = await sendBatch(request, [ + { + type: 'person', + action: 'event', + settings: {}, + payload: { person_id: 'user-1', name: 'First' } + }, + { + type: 'person', + action: 'event', + settings: {}, + payload: { person_id: 'user-2', name: 'Second' } + } + ]) + + expect(response).toBeInstanceOf(MultiStatusResponse) + expect(response.getAllResponses().map((result) => result.value())).toEqual([ + { + status: 200, + body: { person_id: 'user-1', name: 'First' }, + sent: { type: 'person', action: 'event', identifiers: { id: 'user-1' }, name: 'First' } + }, + { + status: 200, + body: { person_id: 'user-2', name: 'Second' }, + sent: { type: 'person', action: 'event', identifiers: { id: 'user-2' }, name: 'Second' } + } + ]) + }) + + it('should rethrow retryable HTTP errors so the framework can retry them', async () => { + const error = new HTTPError({ status: 429, statusText: 'Too Many Requests' } as any, {} as any, {} as any) + const request = jest.fn().mockRejectedValue(error) + + await expect( + sendBatch(request, [ + { + type: 'person', + action: 'event', + settings: {}, + payload: { person_id: 'user-1', name: 'First' } + } + ]) + ).rejects.toBe(error) + }) + + it('should throw when the batch endpoint returns an unexpected response shape', async () => { + const request = jest.fn().mockResolvedValue({ + status: 200, + data: { + ok: true + } + }) + + await expect( + sendBatch(request, [ + { + type: 'person', + action: 'event', + settings: {}, + payload: { person_id: 'user-1', name: 'First' } + } + ]) + ).rejects.toThrow('Customer.io Track API batch response did not include an errors array') + }) }) describe('parseTrackApiErrors', () => { @@ -214,6 +288,38 @@ describe('parseTrackApiErrors', () => { } ]) }) + + it('should preserve multiple errors for the same batch index and default unknown reasons', () => { + const options = [{ type: 'person', action: 'event', settings: {}, payload: { person_id: 'user-1' } }] + const batch = [{ type: 'person', action: 'event', identifiers: { id: 'user-1' } }] + + const response = parseTrackApiErrors( + [ + { + batch_index: 0, + reason: 'duplicate', + field: 'email', + message: 'Email already exists' + }, + { + batch_index: 0, + reason: 'duplicate', + field: 'id', + message: 'ID already exists' + } + ], + options, + batch + ) + + expect(response.getResponseAtIndex(0).value()).toEqual({ + status: 400, + errormessage: 'Email already exists; ID already exists', + errortype: 'UNKNOWN_ERROR', + body: { person_id: 'user-1' }, + sent: { type: 'person', action: 'event', identifiers: { id: 'user-1' } } + }) + }) }) describe('parseTrackApiMultiStatusResponse', () => { @@ -222,4 +328,18 @@ describe('parseTrackApiMultiStatusResponse', () => { const batch = [{ type: 'person', action: 'event', identifiers: { id: 'user-0' } }] expect(parseTrackApiMultiStatusResponse({ ok: true }, options, batch)).toBeNull() }) + + it('should treat an empty Track API errors array as an all-success response', () => { + const options = [{ type: 'person', action: 'event', settings: {}, payload: { person_id: 'user-0' } }] + const batch = [{ type: 'person', action: 'event', identifiers: { id: 'user-0' } }] + + const response = parseTrackApiMultiStatusResponse({ errors: [] }, options, batch) + + expect(response).toBeInstanceOf(MultiStatusResponse) + expect((response as MultiStatusResponse).getResponseAtIndex(0).value()).toEqual({ + status: 200, + body: { person_id: 'user-0' }, + sent: { type: 'person', action: 'event', identifiers: { id: 'user-0' } } + }) + }) }) diff --git a/packages/destination-actions/src/destinations/customerio/utils.ts b/packages/destination-actions/src/destinations/customerio/utils.ts index 4219a645c91..29ac56a958f 100644 --- a/packages/destination-actions/src/destinations/customerio/utils.ts +++ b/packages/destination-actions/src/destinations/customerio/utils.ts @@ -1,7 +1,7 @@ import dayjs from '../../lib/dayjs' import isPlainObject from 'lodash/isPlainObject' import { fullFormats } from 'ajv-formats/dist/formats' -import { ErrorCodes, MultiStatusResponse, RequestClient } from '@segment/actions-core' +import { ErrorCodes, HTTPError, MultiStatusResponse, RequestClient } from '@segment/actions-core' import { CUSTOMERIO_TRACK_API_VERSION } from './versioning-info' const isEmail = (value: string): boolean => { @@ -200,9 +200,9 @@ export const resolveIdentifiers = ({ export const sendBatch = async ( request: RequestClient, options: RequestPayload[] -) => { +): Promise => { if (!options?.length) { - return + return new MultiStatusResponse() } const [{ settings }] = options @@ -219,31 +219,37 @@ export const sendBatch = async ( } ) - const responseBody = getResponseBody(response) - - if ((response?.status === 200 || response?.status === 207) && responseBody) { - const parsedResults = parseTrackApiMultiStatusResponse(responseBody, options, batch) - if (parsedResults) { - return parsedResults - } + const parsedResults = parseTrackApiMultiStatusResponse(response.data, options, batch) + if (parsedResults) { + return parsedResults } - return response + throw new Error('Customer.io Track API batch response did not include an errors array') } catch (err) { - const error = err as { message?: string; response?: { status?: number } } - const status = error.response?.status ?? 500 - const message = error.message ?? 'Unknown error' - - const multiStatusResponse = new MultiStatusResponse() - for (let i = 0; i < options.length; i++) { - multiStatusResponse.setErrorResponseAtIndex(i, { - status, - errormessage: message, - body: options[i].payload, - sent: batch[i] - }) + // Retryable HTTP errors (408 Request Timeout, 429 Too Many Requests, 5xx Server Errors) + // and unexpected non-HTTP errors should be rethrown so the framework's retry wrapper + // can handle them. Only convert to per-item errors for non-retryable HTTP failures. + if (err instanceof HTTPError) { + const status = err.response.status + if (status === 408 || status === 429 || status >= 500) { + throw err + } + + const message = err.message ?? 'Unknown error' + const multiStatusResponse = new MultiStatusResponse() + for (let i = 0; i < options.length; i++) { + multiStatusResponse.setErrorResponseAtIndex(i, { + status, + errormessage: message, + body: options[i].payload, + sent: batch[i] + }) + } + return multiStatusResponse } - return multiStatusResponse + + // Non-HTTP errors are unexpected - rethrow so the framework can handle/retry them + throw err } } @@ -258,43 +264,13 @@ interface CustomerIOBatchResponse { errors?: TrackApiError[] } -interface RequestResponse { - status?: number - data?: unknown - content?: unknown - body?: unknown -} - function mapTrackApiReasonToErrorCode(reason: string | undefined) { - if (!reason) { - return undefined - } - - switch (reason.toLowerCase()) { + switch (reason?.toLowerCase()) { case 'invalid': case 'required': return ErrorCodes.PAYLOAD_VALIDATION_FAILED default: - return undefined - } -} - -function getResponseBody(response: RequestResponse): CustomerIOBatchResponse | string | undefined { - const body = response.data ?? response.content ?? response.body - - if (typeof body !== 'string') { - return body - } - - try { - return JSON.parse(body) - } catch { - try { - const decoded = Buffer.from(body, 'base64').toString('utf-8') - return JSON.parse(decoded) - } catch { - return body - } + return ErrorCodes.UNKNOWN_ERROR } } @@ -304,18 +280,23 @@ export function parseTrackApiErrors( batch: Record[] ): MultiStatusResponse { const multiStatusResponse = new MultiStatusResponse() - const errorMap = new Map() + const errorMap = new Map() for (const error of errors) { if (typeof error.batch_index === 'number') { - errorMap.set(error.batch_index, error) + const existing = errorMap.get(error.batch_index) + if (existing) { + existing.push(error) + } else { + errorMap.set(error.batch_index, [error]) + } } } for (let i = 0; i < options.length; i++) { - const error = errorMap.get(i) + const indexErrors = errorMap.get(i) - if (!error) { + if (!indexErrors) { multiStatusResponse.setSuccessResponseAtIndex(i, { status: 200, body: options[i].payload, @@ -324,10 +305,18 @@ export function parseTrackApiErrors( continue } + const errormessage = indexErrors + .map((e) => e.message || `${e.reason || 'ERROR'}: ${e.field || 'unknown field'}`) + .join('; ') + + // Use the reason from the first error for error code mapping (all errors for the same + // batch_index are expected to share the same underlying reason class) + const errortype = mapTrackApiReasonToErrorCode(indexErrors[0].reason) + multiStatusResponse.setErrorResponseAtIndex(i, { status: 400, - errormessage: error.message || `${error.reason || 'ERROR'}: ${error.field || 'unknown field'}`, - errortype: mapTrackApiReasonToErrorCode(error.reason), + errormessage, + errortype, body: options[i].payload, sent: batch[i] }) @@ -337,7 +326,7 @@ export function parseTrackApiErrors( } export function parseTrackApiMultiStatusResponse( - responseBody: CustomerIOBatchResponse | string | undefined, + responseBody: CustomerIOBatchResponse | undefined, options: RequestPayload[], batch: Record[] ): MultiStatusResponse | null { @@ -346,7 +335,7 @@ export function parseTrackApiMultiStatusResponse( } const { errors } = responseBody as CustomerIOBatchResponse - if (!Array.isArray(errors) || errors.length === 0) { + if (!Array.isArray(errors)) { return null } From 26a251db99f9c74b111060074823408750f2f8fe Mon Sep 17 00:00:00 2001 From: Sydney Collins Date: Mon, 20 Apr 2026 17:14:10 -0400 Subject: [PATCH 4/8] fix: add errortype to batch error path, use IntegrationError for invalid responses, improve error message extraction, add 500 retry test --- .../customerio/__tests__/utils.test.ts | 22 ++++++++++++++++--- .../src/destinations/customerio/utils.ts | 12 +++++++--- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts b/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts index 6ceb114f23d..51215f0a40a 100644 --- a/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts +++ b/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts @@ -1,4 +1,4 @@ -import { HTTPError, MultiStatusResponse } from '@segment/actions-core' +import { HTTPError, IntegrationError, MultiStatusResponse } from '@segment/actions-core' import { isIsoDate, parseTrackApiErrors, @@ -205,7 +205,7 @@ describe('sendBatch', () => { ]) }) - it('should rethrow retryable HTTP errors so the framework can retry them', async () => { + it('should rethrow retryable HTTP errors (429) so the framework can retry them', async () => { const error = new HTTPError({ status: 429, statusText: 'Too Many Requests' } as any, {} as any, {} as any) const request = jest.fn().mockRejectedValue(error) @@ -221,6 +221,22 @@ describe('sendBatch', () => { ).rejects.toBe(error) }) + it('should rethrow retryable HTTP errors (500) so the framework can retry them', async () => { + const error = new HTTPError({ status: 500, statusText: 'Internal Server Error' } as any, {} as any, {} as any) + const request = jest.fn().mockRejectedValue(error) + + await expect( + sendBatch(request, [ + { + type: 'person', + action: 'event', + settings: {}, + payload: { person_id: 'user-1', name: 'First' } + } + ]) + ).rejects.toBe(error) + }) + it('should throw when the batch endpoint returns an unexpected response shape', async () => { const request = jest.fn().mockResolvedValue({ status: 200, @@ -238,7 +254,7 @@ describe('sendBatch', () => { payload: { person_id: 'user-1', name: 'First' } } ]) - ).rejects.toThrow('Customer.io Track API batch response did not include an errors array') + ).rejects.toThrow(IntegrationError) }) }) diff --git a/packages/destination-actions/src/destinations/customerio/utils.ts b/packages/destination-actions/src/destinations/customerio/utils.ts index 29ac56a958f..06979b1b9f3 100644 --- a/packages/destination-actions/src/destinations/customerio/utils.ts +++ b/packages/destination-actions/src/destinations/customerio/utils.ts @@ -1,7 +1,7 @@ import dayjs from '../../lib/dayjs' import isPlainObject from 'lodash/isPlainObject' import { fullFormats } from 'ajv-formats/dist/formats' -import { ErrorCodes, HTTPError, MultiStatusResponse, RequestClient } from '@segment/actions-core' +import { ErrorCodes, HTTPError, IntegrationError, MultiStatusResponse, RequestClient } from '@segment/actions-core' import { CUSTOMERIO_TRACK_API_VERSION } from './versioning-info' const isEmail = (value: string): boolean => { @@ -224,7 +224,11 @@ export const sendBatch = async ( return parsedResults } - throw new Error('Customer.io Track API batch response did not include an errors array') + throw new IntegrationError( + 'Customer.io Track API batch response did not include an errors array', + 'INVALID_RESPONSE', + 400 + ) } catch (err) { // Retryable HTTP errors (408 Request Timeout, 429 Too Many Requests, 5xx Server Errors) // and unexpected non-HTTP errors should be rethrown so the framework's retry wrapper @@ -235,11 +239,13 @@ export const sendBatch = async ( throw err } - const message = err.message ?? 'Unknown error' + const responseBody = err.response?.data as { message?: string } | undefined + const message = responseBody?.message ?? err.message ?? 'Unknown error' const multiStatusResponse = new MultiStatusResponse() for (let i = 0; i < options.length; i++) { multiStatusResponse.setErrorResponseAtIndex(i, { status, + errortype: ErrorCodes.INTEGRATION_ERROR, errormessage: message, body: options[i].payload, sent: batch[i] From f1eadd3ad2ed2eda94b1e1c78db3d76d224451cc Mon Sep 17 00:00:00 2001 From: Sydney Collins Date: Mon, 20 Apr 2026 17:21:23 -0400 Subject: [PATCH 5/8] fix: map errortype from HTTP status code, add non-retryable HTTPError test --- .../customerio/__tests__/utils.test.ts | 37 +++++++++++++++++++ .../src/destinations/customerio/utils.ts | 8 +++- 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts b/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts index 51215f0a40a..fc58cb0ce9d 100644 --- a/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts +++ b/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts @@ -237,6 +237,43 @@ describe('sendBatch', () => { ).rejects.toBe(error) }) + it('should convert non-retryable HTTP errors into per-item MultiStatusResponse entries', async () => { + const error = new HTTPError( + { status: 400, statusText: 'Bad Request' } as any, + { url: 'https://track.customer.io/api/v2/batch' } as any, + {} as any + ) + const request = jest.fn().mockRejectedValue(error) + + const response = await sendBatch(request, [ + { + type: 'person', + action: 'event', + settings: {}, + payload: { person_id: 'user-1', name: 'First' } + }, + { + type: 'person', + action: 'event', + settings: {}, + payload: { person_id: 'user-2', name: 'Second' } + } + ]) + + expect(response).toBeInstanceOf(MultiStatusResponse) + expect(response.length()).toBe(2) + expect(response.getResponseAtIndex(0).value()).toMatchObject({ + status: 400, + errortype: 'PAYLOAD_VALIDATION_FAILED', + body: { person_id: 'user-1', name: 'First' } + }) + expect(response.getResponseAtIndex(1).value()).toMatchObject({ + status: 400, + errortype: 'PAYLOAD_VALIDATION_FAILED', + body: { person_id: 'user-2', name: 'Second' } + }) + }) + it('should throw when the batch endpoint returns an unexpected response shape', async () => { const request = jest.fn().mockResolvedValue({ status: 200, diff --git a/packages/destination-actions/src/destinations/customerio/utils.ts b/packages/destination-actions/src/destinations/customerio/utils.ts index 06979b1b9f3..08964d03445 100644 --- a/packages/destination-actions/src/destinations/customerio/utils.ts +++ b/packages/destination-actions/src/destinations/customerio/utils.ts @@ -241,11 +241,12 @@ export const sendBatch = async ( const responseBody = err.response?.data as { message?: string } | undefined const message = responseBody?.message ?? err.message ?? 'Unknown error' + const errortype = mapHttpStatusToErrorCode(status) const multiStatusResponse = new MultiStatusResponse() for (let i = 0; i < options.length; i++) { multiStatusResponse.setErrorResponseAtIndex(i, { status, - errortype: ErrorCodes.INTEGRATION_ERROR, + errortype, errormessage: message, body: options[i].payload, sent: batch[i] @@ -270,6 +271,11 @@ interface CustomerIOBatchResponse { errors?: TrackApiError[] } +function mapHttpStatusToErrorCode(status: number): ErrorCodes { + if (status === 400) return ErrorCodes.PAYLOAD_VALIDATION_FAILED + return ErrorCodes.INTEGRATION_ERROR +} + function mapTrackApiReasonToErrorCode(reason: string | undefined) { switch (reason?.toLowerCase()) { case 'invalid': From edfd72e13591f36546a6ef5d56e53065253c5bb1 Mon Sep 17 00:00:00 2001 From: Sydney Collins Date: Tue, 21 Apr 2026 15:34:14 -0400 Subject: [PATCH 6/8] use getErrorCodeFromHttpStatus for flat HTTP errors, add 401 test, revert UNKNOWN_ERROR default --- .../customerio/__tests__/utils.test.ts | 29 +++++++++++++++++-- .../src/destinations/customerio/utils.ts | 7 ++--- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts b/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts index fc58cb0ce9d..137bd9c49cd 100644 --- a/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts +++ b/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts @@ -264,16 +264,41 @@ describe('sendBatch', () => { expect(response.length()).toBe(2) expect(response.getResponseAtIndex(0).value()).toMatchObject({ status: 400, - errortype: 'PAYLOAD_VALIDATION_FAILED', + errortype: 'BAD_REQUEST', body: { person_id: 'user-1', name: 'First' } }) expect(response.getResponseAtIndex(1).value()).toMatchObject({ status: 400, - errortype: 'PAYLOAD_VALIDATION_FAILED', + errortype: 'BAD_REQUEST', body: { person_id: 'user-2', name: 'Second' } }) }) + it('should convert non-retryable auth errors (401) into per-item INTEGRATION_ERROR entries', async () => { + const error = new HTTPError( + { status: 401, statusText: 'Unauthorized' } as any, + { url: 'https://track.customer.io/api/v2/batch' } as any, + {} as any + ) + const request = jest.fn().mockRejectedValue(error) + + const response = await sendBatch(request, [ + { + type: 'person', + action: 'event', + settings: {}, + payload: { person_id: 'user-1', name: 'First' } + } + ]) + + expect(response).toBeInstanceOf(MultiStatusResponse) + expect(response.getResponseAtIndex(0).value()).toMatchObject({ + status: 401, + errortype: 'UNAUTHORIZED', + body: { person_id: 'user-1', name: 'First' } + }) + }) + it('should throw when the batch endpoint returns an unexpected response shape', async () => { const request = jest.fn().mockResolvedValue({ status: 200, diff --git a/packages/destination-actions/src/destinations/customerio/utils.ts b/packages/destination-actions/src/destinations/customerio/utils.ts index 08964d03445..bae4e848768 100644 --- a/packages/destination-actions/src/destinations/customerio/utils.ts +++ b/packages/destination-actions/src/destinations/customerio/utils.ts @@ -1,7 +1,7 @@ import dayjs from '../../lib/dayjs' import isPlainObject from 'lodash/isPlainObject' import { fullFormats } from 'ajv-formats/dist/formats' -import { ErrorCodes, HTTPError, IntegrationError, MultiStatusResponse, RequestClient } from '@segment/actions-core' +import { ErrorCodes, getErrorCodeFromHttpStatus, HTTPError, IntegrationError, MultiStatusResponse, RequestClient } from '@segment/actions-core' import { CUSTOMERIO_TRACK_API_VERSION } from './versioning-info' const isEmail = (value: string): boolean => { @@ -271,9 +271,8 @@ interface CustomerIOBatchResponse { errors?: TrackApiError[] } -function mapHttpStatusToErrorCode(status: number): ErrorCodes { - if (status === 400) return ErrorCodes.PAYLOAD_VALIDATION_FAILED - return ErrorCodes.INTEGRATION_ERROR +function mapHttpStatusToErrorCode(status: number): string { + return getErrorCodeFromHttpStatus(status) } function mapTrackApiReasonToErrorCode(reason: string | undefined) { From 0b4d8278570b39ffe1730d3b350af09e00d10ba6 Mon Sep 17 00:00:00 2001 From: Sydney Collins Date: Tue, 21 Apr 2026 17:55:51 -0400 Subject: [PATCH 7/8] fix: 502 for invalid response shape, surface unindexable batch errors, fix return type --- .../customerio/__tests__/utils.test.ts | 22 +++++++++++ .../src/destinations/customerio/utils.ts | 39 ++++++++++++++----- 2 files changed, 52 insertions(+), 9 deletions(-) diff --git a/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts b/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts index 137bd9c49cd..c75a376d8d3 100644 --- a/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts +++ b/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts @@ -321,6 +321,28 @@ describe('sendBatch', () => { }) describe('parseTrackApiErrors', () => { + it('should throw when errors contain an unindexable batch_index', () => { + const options = [ + { type: 'person', action: 'event', settings: {}, payload: { person_id: 'user-0' } } + ] + const batch = [{ type: 'person', action: 'event', identifiers: { id: 'user-0' } }] + + expect(() => + parseTrackApiErrors( + [{ reason: 'invalid', message: 'some error' }], // no batch_index + options, + batch + ) + ).toThrow(IntegrationError) + + expect(() => + parseTrackApiErrors( + [{ batch_index: 99, reason: 'invalid', message: 'out of range' }], // out of range + options, + batch + ) + ).toThrow(IntegrationError) + }) it('should fill success entries for items without errors', () => { const options = [ { type: 'person', action: 'event', settings: {}, payload: { person_id: 'user-0' } }, diff --git a/packages/destination-actions/src/destinations/customerio/utils.ts b/packages/destination-actions/src/destinations/customerio/utils.ts index bae4e848768..d8fa300ecdd 100644 --- a/packages/destination-actions/src/destinations/customerio/utils.ts +++ b/packages/destination-actions/src/destinations/customerio/utils.ts @@ -227,7 +227,7 @@ export const sendBatch = async ( throw new IntegrationError( 'Customer.io Track API batch response did not include an errors array', 'INVALID_RESPONSE', - 400 + 502 ) } catch (err) { // Retryable HTTP errors (408 Request Timeout, 429 Too Many Requests, 5xx Server Errors) @@ -271,7 +271,7 @@ interface CustomerIOBatchResponse { errors?: TrackApiError[] } -function mapHttpStatusToErrorCode(status: number): string { +function mapHttpStatusToErrorCode(status: number): keyof typeof ErrorCodes { return getErrorCodeFromHttpStatus(status) } @@ -292,16 +292,37 @@ export function parseTrackApiErrors( ): MultiStatusResponse { const multiStatusResponse = new MultiStatusResponse() const errorMap = new Map() + const unindexableErrors: TrackApiError[] = [] for (const error of errors) { - if (typeof error.batch_index === 'number') { - const existing = errorMap.get(error.batch_index) - if (existing) { - existing.push(error) - } else { - errorMap.set(error.batch_index, [error]) - } + const batchIndex = error.batch_index + + if (!Number.isInteger(batchIndex) || (batchIndex as number) < 0 || (batchIndex as number) >= options.length) { + unindexableErrors.push(error) + continue } + + const existing = errorMap.get(batchIndex as number) + if (existing) { + existing.push(error) + } else { + errorMap.set(batchIndex as number, [error]) + } + } + + if (unindexableErrors.length > 0) { + const errormessage = unindexableErrors + .map((e) => { + const indexDesc = typeof e.batch_index === 'number' ? `batch_index ${e.batch_index}` : 'missing batch_index' + return e.message || `${e.reason || 'ERROR'}: ${e.field || 'unknown field'} (${indexDesc})` + }) + .join('; ') + + throw new IntegrationError( + `Customer.io returned batch errors that could not be mapped to request items: ${errormessage}`, + 'INVALID_RESPONSE', + 502 + ) } for (let i = 0; i < options.length; i++) { From 7567e54544c24fc1e91504966eb134cb7399c929 Mon Sep 17 00:00:00 2001 From: sydneycollins-cio <149523950+sydneycollins-cio@users.noreply.github.com> Date: Fri, 1 May 2026 12:47:17 -0400 Subject: [PATCH 8/8] test: add assertion to empty resolveIdentifiers test case --- .../src/destinations/customerio/__tests__/utils.test.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts b/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts index 7a4ba0018e1..72607521045 100644 --- a/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts +++ b/packages/destination-actions/src/destinations/customerio/__tests__/utils.test.ts @@ -83,7 +83,9 @@ describe('resolveIdentifiers', () => { expect(resolveIdentifiers(identifiers)).toEqual({ anonymous_id: '123' }) }) - it('should return undefined if no identifiers are provided', () => {}) + it('should return undefined if no identifiers are provided', () => { + expect(resolveIdentifiers({})).toBeUndefined() + }) }) describe('sendBatch', () => { @@ -446,3 +448,4 @@ describe('convertValidTimestamp', () => { expect(convertValidTimestamp('1712345678.123')).toBe('1712345678.123') }) }) +