Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
import { resolveIdentifiers, isIsoDate } from '../utils'
import { MultiStatusResponse } from '@segment/actions-core'
import {
isIsoDate,
parseTrackApiErrors,
parseTrackApiMultiStatusResponse,
resolveIdentifiers,
sendBatch
} from '../utils'

describe('isIsoDate', () => {
it('should return true for valid ISO date with fractional seconds from 1-9 digits', () => {
Expand Down Expand Up @@ -79,3 +86,140 @@ describe('resolveIdentifiers', () => {
expect(resolveIdentifiers({})).toBeUndefined()
})
})

describe('sendBatch', () => {
it('should parse 207 multi-status Track API responses', async () => {
const request = jest.fn().mockResolvedValue({
status: 207,
data: {
errors: [
{
batch_index: 1,
reason: 'invalid',
message: 'Attribute value too long'
}
]
}
})

const response = await sendBatch(request, [
{
type: 'person',
action: 'event',
settings: {},
payload: { person_id: 'user-1', name: 'First' }
},
{
type: 'person',
action: 'event',
settings: {},
payload: { person_id: 'user-2', name: 'Second' }
}
])

expect(response).toBeInstanceOf(MultiStatusResponse)
expect((response as MultiStatusResponse).length()).toBe(2)
expect((response as MultiStatusResponse).getResponseAtIndex(0).value()).toEqual({
status: 200,
body: { person_id: 'user-1', name: 'First' },
sent: { type: 'person', action: 'event', identifiers: { id: 'user-1' }, name: 'First' }
})
expect((response as MultiStatusResponse).getResponseAtIndex(1).value()).toEqual({
status: 400,
errormessage: 'Attribute value too long',
errortype: 'PAYLOAD_VALIDATION_FAILED',
body: { person_id: 'user-2', name: 'Second' },
sent: { type: 'person', action: 'event', identifiers: { id: 'user-2' }, name: 'Second' }
})
})

it('should parse 200 Track API responses that still contain batch errors', async () => {
const request = jest.fn().mockResolvedValue({
status: 200,
data: {
errors: [
{
batch_index: 0,
reason: 'required',
field: 'name',
message: 'Name is required'
}
]
}
})

const response = await sendBatch(request, [
{
type: 'person',
action: 'event',
settings: {},
payload: { person_id: 'user-1', name: 'First' }
}
])

expect(response).toBeInstanceOf(MultiStatusResponse)
expect((response as MultiStatusResponse).getResponseAtIndex(0).value()).toEqual({
status: 400,
errormessage: 'Name is required',
errortype: 'PAYLOAD_VALIDATION_FAILED',
body: { person_id: 'user-1', name: 'First' },
sent: { type: 'person', action: 'event', identifiers: { id: 'user-1' }, name: 'First' }
})
})
})

describe('parseTrackApiErrors', () => {
it('should fill success entries for items without errors', () => {
const options = [
{ type: 'person', action: 'event', settings: {}, payload: { person_id: 'user-0' } },
{ type: 'person', action: 'event', settings: {}, payload: { person_id: 'user-1' } },
{ type: 'person', action: 'event', settings: {}, payload: { person_id: 'user-2' } }
]
const batch = [
{ type: 'person', action: 'event', identifiers: { id: 'user-0' } },
{ type: 'person', action: 'event', identifiers: { id: 'user-1' } },
{ type: 'person', action: 'event', identifiers: { id: 'user-2' } }
]

const response = parseTrackApiErrors(
[
{
batch_index: 1,
reason: 'required',
field: 'name',
message: 'Name is required'
}
],
options,
batch
)

expect(response.getAllResponses().map((result) => result.value())).toEqual([
{
status: 200,
body: { person_id: 'user-0' },
sent: { type: 'person', action: 'event', identifiers: { id: 'user-0' } }
},
{
status: 400,
errormessage: 'Name is required',
errortype: 'PAYLOAD_VALIDATION_FAILED',
body: { person_id: 'user-1' },
sent: { type: 'person', action: 'event', identifiers: { id: 'user-1' } }
},
{
status: 200,
body: { person_id: 'user-2' },
sent: { type: 'person', action: 'event', identifiers: { id: 'user-2' } }
}
])
})
})

describe('parseTrackApiMultiStatusResponse', () => {
it('should return null for non-Track API response bodies', () => {
const options = [{ type: 'person', action: 'event', settings: {}, payload: { person_id: 'user-0' } }]
const batch = [{ type: 'person', action: 'event', identifiers: { id: 'user-0' } }]
expect(parseTrackApiMultiStatusResponse({ ok: true }, options, batch)).toBeNull()
})
})
155 changes: 148 additions & 7 deletions packages/destination-actions/src/destinations/customerio/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import dayjs from '../../lib/dayjs'
import isPlainObject from 'lodash/isPlainObject'
import { fullFormats } from 'ajv-formats/dist/formats'
import { ErrorCodes, MultiStatusResponse, RequestClient } from '@segment/actions-core'
import { CUSTOMERIO_TRACK_API_VERSION } from './versioning-info'

const isEmail = (value: string): boolean => {
Expand Down Expand Up @@ -196,23 +197,163 @@ export const resolveIdentifiers = ({
}
}

export const sendBatch = <Payload extends BasePayload>(request: Function, options: RequestPayload<Payload>[]) => {
export const sendBatch = async <Payload extends BasePayload>(
request: RequestClient,
options: RequestPayload<Payload>[]
) => {
if (!options?.length) {
return
}

const [{ settings }] = options
const batch = options.map((opts) => buildPayload(opts))

return request(`${trackApiEndpoint(settings)}/api/${CUSTOMERIO_TRACK_API_VERSION}/batch`, {
method: 'post',
json: {
batch
try {
const response = await request<CustomerIOBatchResponse>(
`${trackApiEndpoint(settings)}/api/${CUSTOMERIO_TRACK_API_VERSION}/batch`,
{
method: 'post',
json: {
batch
}
}
)

const responseBody = getResponseBody(response)

if ((response?.status === 200 || response?.status === 207) && responseBody) {
const parsedResults = parseTrackApiMultiStatusResponse(responseBody, options, batch)
if (parsedResults) {
return parsedResults
}
}
Comment on lines +240 to +264
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sendBatch adds new behavior to convert non-retryable HTTPError responses into a per-item MultiStatusResponse, but the test suite only covers retryable HTTP errors (429/5xx) and unexpected response shapes. Add a unit test that simulates a non-retryable HTTPError (e.g., 400/401) and asserts the returned MultiStatusResponse contains per-item errors with the expected status/message (and desired error typing).

Copilot generated this review using guidance from repository custom instructions.
})

return response
} catch (err) {
const error = err as { message?: string; response?: { status?: number } }
const status = error.response?.status ?? 500
const message = error.message ?? 'Unknown error'

const multiStatusResponse = new MultiStatusResponse()
for (let i = 0; i < options.length; i++) {
multiStatusResponse.setErrorResponseAtIndex(i, {
status,
errormessage: message,
body: options[i].payload,
sent: batch[i]
})
}
return multiStatusResponse
}
Comment thread
joe-ayoub-segment marked this conversation as resolved.
}

interface TrackApiError {
batch_index?: number
reason?: string
field?: string
message?: string
}

interface CustomerIOBatchResponse {
errors?: TrackApiError[]
}

interface RequestResponse {
status?: number
data?: unknown
content?: unknown
body?: unknown
}

function mapTrackApiReasonToErrorCode(reason: string | undefined) {
if (!reason) {
return undefined
}

switch (reason.toLowerCase()) {
case 'invalid':
case 'required':
return ErrorCodes.PAYLOAD_VALIDATION_FAILED
default:
return undefined
}
}

function getResponseBody(response: RequestResponse): CustomerIOBatchResponse | string | undefined {
const body = response.data ?? response.content ?? response.body

if (typeof body !== 'string') {
return body
}

try {
return JSON.parse(body)
} catch {
try {
const decoded = Buffer.from(body, 'base64').toString('utf-8')
return JSON.parse(decoded)
} catch {
return body
}
}
}

export function parseTrackApiErrors<Payload extends BasePayload>(
errors: TrackApiError[],
options: RequestPayload<Payload>[],
batch: Record<string, unknown>[]
): MultiStatusResponse {
const multiStatusResponse = new MultiStatusResponse()
const errorMap = new Map<number, TrackApiError>()

for (const error of errors) {
if (typeof error.batch_index === 'number') {
errorMap.set(error.batch_index, error)
}
}
Comment on lines +301 to +334
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parseTrackApiErrors stores errors in a Map<number, TrackApiError>, so if the Track API returns multiple error entries for the same batch_index, earlier errors will be overwritten and lost. Consider grouping errors per index (e.g., Map<number, TrackApiError[]>) and combining messages/fields so the per-item response preserves all reported issues.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot apply changes based on this feedback


Comment on lines +304 to +335
Copy link

Copilot AI Apr 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parseTrackApiErrors currently drops any error objects that don't include a numeric batch_index. If Customer.io returns an error entry without batch_index (or with an out-of-range index), the code will silently treat all items as success, which can hide real failures. Consider detecting unindexable errors and surfacing them (e.g., throw an IntegrationError, or apply a per-item error for the whole batch) rather than ignoring them.

Suggested change
for (const error of errors) {
if (typeof error.batch_index === 'number') {
const existing = errorMap.get(error.batch_index)
if (existing) {
existing.push(error)
} else {
errorMap.set(error.batch_index, [error])
}
}
}
const unindexableErrors: TrackApiError[] = []
for (const error of errors) {
const batchIndex = error.batch_index
if (!Number.isInteger(batchIndex) || batchIndex < 0 || batchIndex >= options.length) {
unindexableErrors.push(error)
continue
}
const existing = errorMap.get(batchIndex)
if (existing) {
existing.push(error)
} else {
errorMap.set(batchIndex, [error])
}
}
if (unindexableErrors.length > 0) {
const errormessage = unindexableErrors
.map((error) => {
const indexDescription =
typeof error.batch_index === 'number' ? `batch_index ${error.batch_index}` : 'missing batch_index'
return error.message || `${error.reason || 'ERROR'}: ${error.field || 'unknown field'} (${indexDescription})`
})
.join('; ')
throw new IntegrationError(`Customer.io returned batch errors that could not be mapped to request items: ${errormessage}`)
}

Copilot uses AI. Check for mistakes.
for (let i = 0; i < options.length; i++) {
const error = errorMap.get(i)

if (!error) {
multiStatusResponse.setSuccessResponseAtIndex(i, {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The contents of each multiStatusResponse item is passed to the UI for observability purposes.
So if possible we should populate body and sent values correctly:

    msResponse.setSuccessResponseAtIndex(index, {
      status: 200,
      body: // this should be the original payload object sent to the perform function,
      sent: // this should be the actual JSON which was posted to your platform
    })

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. Threaded the original options array and the built batch array through to parseTrackApiErrors. Success responses now get body: options[i].payload (original payload from perform) and sent: batch[i] (the transformed JSON we posted).

status: 200,
body: options[i].payload,
sent: batch[i]
})
continue
}

multiStatusResponse.setErrorResponseAtIndex(i, {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly to successful responses, we should try and populate error responses with correct details.

msResponse.setErrorResponseAtIndex(index, {
  status: 400,
  errortype,
  errormessage,
  body: // this should be the original payload object sent to the perform function,
  sent: // this should be the actual JSON which was posted to your platform
})

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same fix, error responses now also get body: options[i].payload and sent: batch[i].

status: 400,
errormessage: error.message || `${error.reason || 'ERROR'}: ${error.field || 'unknown field'}`,
errortype: mapTrackApiReasonToErrorCode(error.reason),
body: options[i].payload,
sent: batch[i]
})
}

return multiStatusResponse
}

export function parseTrackApiMultiStatusResponse<Payload extends BasePayload>(
responseBody: CustomerIOBatchResponse | string | undefined,
options: RequestPayload<Payload>[],
batch: Record<string, unknown>[]
): MultiStatusResponse | null {
if (!isRecord(responseBody)) {
return null
}

const { errors } = responseBody as CustomerIOBatchResponse
if (!Array.isArray(errors) || errors.length === 0) {
return null
}

return parseTrackApiErrors(errors, options, batch)
}

export const sendSingle = <Payload extends BasePayload>(request: Function, options: RequestPayload<Payload>) => {
export const sendSingle = <Payload extends BasePayload>(request: RequestClient, options: RequestPayload<Payload>) => {
const json = buildPayload(options)
return request(`${trackApiEndpoint(options.settings)}/api/${CUSTOMERIO_TRACK_API_VERSION}/entity`, {
method: 'post',
Expand Down
Loading