Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { resolveIdentifiers, isIsoDate } from '../utils'
import { MultiStatusResponse } from '@segment/actions-core'
import { isIsoDate, parseTrackApiErrors, parseTrackApiMultiStatusResponse, resolveIdentifiers, sendBatch } from '../utils'

describe('isIsoDate', () => {
it('should return true for valid ISO date with fractional seconds from 1-9 digits', () => {
Expand Down Expand Up @@ -79,3 +80,129 @@ describe('resolveIdentifiers', () => {
expect(resolveIdentifiers({})).toBeUndefined()
})
})

describe('sendBatch', () => {
it('should parse 207 multi-status Track API responses', async () => {
const request = jest.fn().mockResolvedValue({
status: 207,
data: {
errors: [
{
batch_index: 1,
reason: 'invalid',
message: 'Attribute value too long'
}
]
}
})

const response = await sendBatch(request, [
{
type: 'person',
action: 'event',
settings: {},
payload: { person_id: 'user-1', name: 'First' }
},
{
type: 'person',
action: 'event',
settings: {},
payload: { person_id: 'user-2', name: 'Second' }
}
])

expect(response).toBeInstanceOf(MultiStatusResponse)
expect((response as MultiStatusResponse).length()).toBe(2)
expect((response as MultiStatusResponse).getResponseAtIndex(0).value()).toEqual({
status: 200,
body: {},
sent: {}
})
expect((response as MultiStatusResponse).getResponseAtIndex(1).value()).toEqual({
status: 400,
errormessage: 'Attribute value too long',
errortype: 'PAYLOAD_VALIDATION_FAILED',
body: {
batch_index: 1,
reason: 'invalid',
message: 'Attribute value too long'
}
})
})

it('should parse 200 Track API responses that still contain batch errors', async () => {
const request = jest.fn().mockResolvedValue({
status: 200,
data: {
errors: [
{
batch_index: 0,
reason: 'required',
field: 'name',
message: 'Name is required'
}
]
}
})

const response = await sendBatch(request, [
{
type: 'person',
action: 'event',
settings: {},
payload: { person_id: 'user-1', name: 'First' }
}
])

expect(response).toBeInstanceOf(MultiStatusResponse)
expect((response as MultiStatusResponse).getResponseAtIndex(0).value()).toEqual({
status: 400,
errormessage: 'Name is required',
errortype: 'PAYLOAD_VALIDATION_FAILED',
body: {
batch_index: 0,
reason: 'required',
field: 'name',
message: 'Name is required'
}
})
})
})

describe('parseTrackApiErrors', () => {
it('should fill success entries for items without errors', () => {
const response = parseTrackApiErrors(
[
{
batch_index: 1,
reason: 'required',
field: 'name',
message: 'Name is required'
}
],
3
)

expect(response.getAllResponses().map((result) => result.value())).toEqual([
{ status: 200, body: {}, sent: {} },
{
status: 400,
errormessage: 'Name is required',
errortype: 'PAYLOAD_VALIDATION_FAILED',
body: {
batch_index: 1,
reason: 'required',
field: 'name',
message: 'Name is required'
}
},
{ status: 200, body: {}, sent: {} }
])
})
})

describe('parseTrackApiMultiStatusResponse', () => {
it('should return null for non-Track API response bodies', () => {
expect(parseTrackApiMultiStatusResponse({ ok: true }, 1)).toBeNull()
})
})
128 changes: 125 additions & 3 deletions packages/destination-actions/src/destinations/customerio/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import dayjs from '../../lib/dayjs'
import isPlainObject from 'lodash/isPlainObject'
import { fullFormats } from 'ajv-formats/dist/formats'
import { ErrorCodes, MultiStatusResponse, type RequestClient } from '@segment/actions-core'
import { CUSTOMERIO_TRACK_API_VERSION } from './versioning-info'

const isEmail = (value: string): boolean => {
Expand Down Expand Up @@ -196,23 +197,144 @@ export const resolveIdentifiers = ({
}
}

export const sendBatch = <Payload extends BasePayload>(request: Function, options: RequestPayload<Payload>[]) => {
export const sendBatch = async <Payload extends BasePayload>(
request: RequestClient,
options: RequestPayload<Payload>[]
) => {
if (!options?.length) {
return
}

const [{ settings }] = options
const batch = options.map((opts) => buildPayload(opts))

return request(`${trackApiEndpoint(settings)}/api/${CUSTOMERIO_TRACK_API_VERSION}/batch`, {
const response = await request(`${trackApiEndpoint(settings)}/api/${CUSTOMERIO_TRACK_API_VERSION}/batch`, {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add an expected response type to this?
For example:

    const response = await request<CustomerIOBatchResponse>(`${trackApiEndpoint(settings)}/api/${CUSTOMERIO_TRACK_API_VERSION}/batch`, {
      method: 'POST',
      json { batch }
    })

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, consider wrapping this request in a try catch so that you can properly populate errors in the multistatusResponse when the entire payload fails.

try {
const response = await request(${trackApiEndpoint(settings)}/api/${CUSTOMERIO_TRACK_API_VERSION}/batch, {
method: 'POST',
json { batch }
})
}
catch(err) {
const error as CustomerIOErrorResponse
// for each payload in the batch, do setErrorResponseAtIndex
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added. Renamed TrackApiResponse to CustomerIOBatchResponse and passed it as the generic: request<CustomerIOBatchResponse>(...). Also wrapped the whole thing in a try/catch per your other comment so failed requests populate every index with the error details.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added. The catch block extracts the status and message from the error, then iterates over every item in the batch and calls setErrorResponseAtIndex with the original payload and sent JSON. If the whole request blows up, every item in the multi-status response reflects the failure.

method: 'post',
json: {
batch
}
})

const responseBody = getResponseBody(response)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a type to this too please?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Covered by the same rename. getResponseBody now returns CustomerIOBatchResponse | string | undefined and that flows through to parseTrackApiMultiStatusResponse.


if (response?.status === 207 && responseBody) {
const parsedResults = parseTrackApiMultiStatusResponse(responseBody, batch.length)
if (parsedResults) {
return parsedResults
}
}

if (response?.status === 200 && responseBody) {
const parsedResults = parseTrackApiMultiStatusResponse(responseBody, batch.length)
if (parsedResults) {
return parsedResults
}
Comment on lines +240 to +264
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sendBatch adds new behavior to convert non-retryable HTTPError responses into a per-item MultiStatusResponse, but the test suite only covers retryable HTTP errors (429/5xx) and unexpected response shapes. Add a unit test that simulates a non-retryable HTTPError (e.g., 400/401) and asserts the returned MultiStatusResponse contains per-item errors with the expected status/message (and desired error typing).

Copilot generated this review using guidance from repository custom instructions.
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Can we reduce the redundant code here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, collapsed the two blocks into a single OR condition.

Comment thread
joe-ayoub-segment marked this conversation as resolved.

return response
}

interface TrackApiError {
batch_index?: number
reason?: string
field?: string
message?: string
}

interface TrackApiResponse {
errors?: TrackApiError[]
}

interface RequestResponse {
status?: number
data?: unknown
content?: unknown
body?: unknown
}

function mapTrackApiReasonToErrorCode(reason: string | undefined) {
if (!reason) {
return undefined
}

switch (reason.toLowerCase()) {
case 'invalid':
case 'required':
return ErrorCodes.PAYLOAD_VALIDATION_FAILED
default:
return undefined
}
}

function getResponseBody(response: RequestResponse): unknown {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please define a proper response type for this function.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Returns CustomerIOBatchResponse | string | undefined now.

const body = response.data ?? response.content ?? response.body

if (typeof body !== 'string') {
return body
}

try {
return JSON.parse(body)
} catch {
try {
const decoded = Buffer.from(body, 'base64').toString('utf-8')
return JSON.parse(decoded)
} catch {
return body
}
}
}

export function parseTrackApiErrors(errors: TrackApiError[], totalItems: number): MultiStatusResponse {
const multiStatusResponse = new MultiStatusResponse()
const errorMap = new Map<number, TrackApiError>()

for (const error of errors) {
if (typeof error.batch_index === 'number') {
errorMap.set(error.batch_index, error)
}
}
Comment on lines +301 to +334
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parseTrackApiErrors stores errors in a Map<number, TrackApiError>, so if the Track API returns multiple error entries for the same batch_index, earlier errors will be overwritten and lost. Consider grouping errors per index (e.g., Map<number, TrackApiError[]>) and combining messages/fields so the per-item response preserves all reported issues.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot apply changes based on this feedback


Comment on lines +304 to +335
Copy link

Copilot AI Apr 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parseTrackApiErrors currently drops any error objects that don't include a numeric batch_index. If Customer.io returns an error entry without batch_index (or with an out-of-range index), the code will silently treat all items as success, which can hide real failures. Consider detecting unindexable errors and surfacing them (e.g., throw an IntegrationError, or apply a per-item error for the whole batch) rather than ignoring them.

Suggested change
for (const error of errors) {
if (typeof error.batch_index === 'number') {
const existing = errorMap.get(error.batch_index)
if (existing) {
existing.push(error)
} else {
errorMap.set(error.batch_index, [error])
}
}
}
const unindexableErrors: TrackApiError[] = []
for (const error of errors) {
const batchIndex = error.batch_index
if (!Number.isInteger(batchIndex) || batchIndex < 0 || batchIndex >= options.length) {
unindexableErrors.push(error)
continue
}
const existing = errorMap.get(batchIndex)
if (existing) {
existing.push(error)
} else {
errorMap.set(batchIndex, [error])
}
}
if (unindexableErrors.length > 0) {
const errormessage = unindexableErrors
.map((error) => {
const indexDescription =
typeof error.batch_index === 'number' ? `batch_index ${error.batch_index}` : 'missing batch_index'
return error.message || `${error.reason || 'ERROR'}: ${error.field || 'unknown field'} (${indexDescription})`
})
.join('; ')
throw new IntegrationError(`Customer.io returned batch errors that could not be mapped to request items: ${errormessage}`)
}

Copilot uses AI. Check for mistakes.
for (let i = 0; i < totalItems; i++) {
const error = errorMap.get(i)

if (!error) {
multiStatusResponse.setSuccessResponseAtIndex(i, {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The contents of each multiStatusResponse item is passed to the UI for observability purposes.
So if possible we should populate body and sent values correctly:

    msResponse.setSuccessResponseAtIndex(index, {
      status: 200,
      body: // this should be the original payload object sent to the perform function,
      sent: // this should be the actual JSON which was posted to your platform
    })

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. Threaded the original options array and the built batch array through to parseTrackApiErrors. Success responses now get body: options[i].payload (original payload from perform) and sent: batch[i] (the transformed JSON we posted).

status: 200,
body: {},
sent: {}
})
continue
}

multiStatusResponse.setErrorResponseAtIndex(i, {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly to successful responses, we should try and populate error responses with correct details.

msResponse.setErrorResponseAtIndex(index, {
  status: 400,
  errortype,
  errormessage,
  body: // this should be the original payload object sent to the perform function,
  sent: // this should be the actual JSON which was posted to your platform
})

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same fix, error responses now also get body: options[i].payload and sent: batch[i].

status: 400,
errormessage: error.message || `${error.reason || 'ERROR'}: ${error.field || 'unknown field'}`,
errortype: mapTrackApiReasonToErrorCode(error.reason),
body: error
})
}

return multiStatusResponse
}

export function parseTrackApiMultiStatusResponse(
responseBody: unknown,
totalItems: number
): MultiStatusResponse | null {
if (!isRecord(responseBody)) {
return null
}

const { errors } = responseBody as TrackApiResponse
if (!Array.isArray(errors) || errors.length === 0) {
return null
}

return parseTrackApiErrors(errors, totalItems)
}

export const sendSingle = <Payload extends BasePayload>(request: Function, options: RequestPayload<Payload>) => {
export const sendSingle = <Payload extends BasePayload>(request: RequestClient, options: RequestPayload<Payload>) => {
const json = buildPayload(options)
return request(`${trackApiEndpoint(options.settings)}/api/${CUSTOMERIO_TRACK_API_VERSION}/entity`, {
method: 'post',
Expand Down