Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 90 additions & 7 deletions packages/core/src/destination-kit/action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import type {
ActionDestinationSuccessResponseType,
ActionDestinationErrorResponseType,
ResultMultiStatusNode,
AudienceMembership
AudienceMembership,
AsyncPollResponseType
} from './types'
import { syncModeTypes } from './types'
import { HTTPError, NormalizedOptions } from '../request-client'
Expand All @@ -41,7 +42,14 @@ type MaybePromise<T> = T | Promise<T>
type RequestClient = ReturnType<typeof createRequestClient>

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type RequestFn<Settings, Payload, Return = any, AudienceSettings = any, ActionHookInputs = any, AudienceMembershipType = AudienceMembership | AudienceMembership[]> = (
export type RequestFn<
Settings,
Payload,
Return = any,
AudienceSettings = any,
ActionHookInputs = any,
AudienceMembershipType = AudienceMembership | AudienceMembership[]
> = (
request: RequestClient,
data: ExecuteInput<Settings, Payload, AudienceSettings, ActionHookInputs, any, AudienceMembershipType>
) => MaybePromise<Return>
Expand Down Expand Up @@ -84,6 +92,13 @@ export interface BaseActionDefinition {
* The fields used to perform the action. These fields should match what the partner API expects.
*/
fields: ActionFields

/**
* The fields used specifically for polling async operations. These are typically minimal fields
* containing only identifiers needed to check operation status (e.g., operationId).
* REQUIRED when defining a poll method - ensures security and performance by validating only essential polling data.
*/
pollFields?: ActionFields
}

type HookValueTypes = string | boolean | number | Array<string | boolean | number>
Expand All @@ -105,7 +120,9 @@ export interface ActionDefinition<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
GeneratedActionHookInputs = any,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
GeneratedActionHookOutputs = any
GeneratedActionHookOutputs = any,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
PollPayload = any
> extends BaseActionDefinition {
/**
* A way to "register" dynamic fields.
Expand Down Expand Up @@ -141,6 +158,9 @@ export interface ActionDefinition<
/** The operation to perform when this action is triggered for a batch of events */
performBatch?: RequestFn<Settings, Payload[], PerformBatchResponse, AudienceSettings, any, AudienceMembership[]>

/** The operation to poll the status of asynchronous actions */
pollStatus?: RequestFn<Settings, PollPayload, AsyncPollResponseType, AudienceSettings>

/** Hooks are triggered at some point in a mappings lifecycle. They may perform a request with the
* destination using the provided inputs and return a response. The response may then optionally be stored
* in the mapping for later use in the action.
Expand Down Expand Up @@ -255,20 +275,27 @@ const isSyncMode = (value: unknown): value is SyncMode => {
* Action is the beginning step for all partner actions. Entrypoints always start with the
* MapAndValidateInput step.
*/
export class Action<Settings, Payload extends JSONLikeObject, AudienceSettings = any> extends EventEmitter {
readonly definition: ActionDefinition<Settings, Payload, AudienceSettings>
export class Action<
Settings,
Payload extends JSONLikeObject,
AudienceSettings = any,
PollPayload = unknown
> extends EventEmitter {
readonly definition: ActionDefinition<Settings, Payload, AudienceSettings, unknown, unknown, PollPayload>
readonly destinationName: string
readonly schema?: JSONSchema4
readonly pollSchema?: JSONSchema4
readonly hookSchemas?: Record<string, JSONSchema4>
readonly hasBatchSupport: boolean
readonly hasHookSupport: boolean
readonly hasPollSupport: boolean
// Payloads may be any type so we use `any` explicitly here.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
private extendRequest: RequestExtension<Settings, any> | undefined

constructor(
destinationName: string,
definition: ActionDefinition<Settings, Payload, AudienceSettings>,
definition: ActionDefinition<Settings, Payload, AudienceSettings, unknown, unknown, PollPayload>,
// Payloads may be any type so we use `any` explicitly here.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
extendRequest?: RequestExtension<Settings, any>
Expand All @@ -279,10 +306,17 @@ export class Action<Settings, Payload extends JSONLikeObject, AudienceSettings =
this.extendRequest = extendRequest
this.hasBatchSupport = typeof definition.performBatch === 'function'
this.hasHookSupport = definition.hooks !== undefined
this.hasPollSupport = typeof definition.pollStatus === 'function'
// Generate json schema based on the field definitions
if (Object.keys(definition.fields ?? {}).length) {
this.schema = fieldsToJsonSchema(definition.fields)
}

// Generate json schema for poll fields if they are defined
if (Object.keys(definition.pollFields ?? {}).length) {
this.pollSchema = fieldsToJsonSchema(definition.pollFields)
}

// Generate a json schema for each defined hook based on the field definitions
if (definition.hooks) {
for (const hookName in definition.hooks) {
Expand Down Expand Up @@ -362,7 +396,7 @@ export class Action<Settings, Payload extends JSONLikeObject, AudienceSettings =
rawMapping: bundle.mapping,
settings: bundle.settings,
payload,
...( typeof audienceMembership === 'boolean' ? { audienceMembership } : {}),
...(typeof audienceMembership === 'boolean' ? { audienceMembership } : {}),
auth: bundle.auth,
features: bundle.features,
statsContext: bundle.statsContext,
Expand Down Expand Up @@ -593,6 +627,55 @@ export class Action<Settings, Payload extends JSONLikeObject, AudienceSettings =
return multiStatusResponse
}

async executePoll(
bundle: ExecuteBundle<Settings, InputData | undefined, AudienceSettings>
): Promise<AsyncPollResponseType> {
if (!this.hasPollSupport || !this.definition.pollStatus) {
throw new IntegrationError('This action does not support polling.', 'NotImplemented', 501)
}

const payload = bundle.data as PollPayload
// Remove empty values and validate using poll schema (required for polling operations)
if (!this.pollSchema) {
throw new IntegrationError('Poll fields must be defined for polling operations.', 'NotImplemented', 501)
}
const validationSchema = this.pollSchema
// Cast to PollPayload as the removeEmptyValues pipeline produces a valid poll payload
// This represents the PollPayload type defined in the ActionDefinition (e.g., { operationId: string })
const pollPayload = removeEmptyValues(payload, validationSchema, true) as PollPayload
Comment on lines +637 to +645
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pollFields are converted into pollSchema, but executePoll never applies the mapping transform step (unlike execute, which uses transform(bundle.mapping, bundle.data, ...)). As a result, polling ignores the provided mapping and effectively requires callers to pass an already-shaped poll payload, which makes pollFields misleading and breaks consistency with other action entrypoints. Consider transforming bundle.mapping + bundle.data into a poll payload (using pollFields) before removeEmptyValues/validation.

Suggested change
const payload = bundle.data as PollPayload
// Remove empty values and validate using poll schema (required for polling operations)
if (!this.pollSchema) {
throw new IntegrationError('Poll fields must be defined for polling operations.', 'NotImplemented', 501)
}
const validationSchema = this.pollSchema
// Cast to PollPayload as the removeEmptyValues pipeline produces a valid poll payload
// This represents the PollPayload type defined in the ActionDefinition (e.g., { operationId: string })
const pollPayload = removeEmptyValues(payload, validationSchema, true) as PollPayload
// Remove empty values and validate using poll schema (required for polling operations)
if (!this.pollSchema) {
throw new IntegrationError('Poll fields must be defined for polling operations.', 'NotImplemented', 501)
}
const validationSchema = this.pollSchema
const resolvedPayload =
bundle.mapping && this.definition.pollFields
? ((await transform(bundle.mapping, bundle.data, this.definition.pollFields)) as PollPayload)
: (bundle.data as PollPayload)
// Cast to PollPayload as the removeEmptyValues pipeline produces a valid poll payload
// This represents the PollPayload type defined in the ActionDefinition (e.g., { operationId: string })
const pollPayload = removeEmptyValues(resolvedPayload, validationSchema, true) as PollPayload

Copilot uses AI. Check for mistakes.
// Validate the resolved payload against the poll schema
const schemaKey = `${this.destinationName}:${this.definition.title}:poll`
validateSchema(pollPayload, validationSchema, {
schemaKey,
statsContext: bundle.statsContext,
exempt: ['dynamicAuthSettings']
})
Comment on lines +630 to +652
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

executePoll can throw a runtime TypeError when bundle.data is undefined: removeEmptyValues will return undefined, and then validateSchema spreads the object ({ ...(obj as Record<string, unknown>) }), which crashes for undefined. Add an explicit guard (and a clearer IntegrationError/validation error) when the poll input is missing or not an object before calling removeEmptyValues/validateSchema.

Copilot uses AI. Check for mistakes.

// Construct the data bundle to send to the poll action
const dataBundle = {
rawData: bundle.data,
rawMapping: bundle.mapping,
settings: bundle.settings,
payload: pollPayload,
auth: bundle.auth,
features: bundle.features,
statsContext: bundle.statsContext,
logger: bundle.logger,
engageDestinationCache: bundle.engageDestinationCache,
transactionContext: bundle.transactionContext,
stateContext: bundle.stateContext,
audienceSettings: bundle.audienceSettings,
subscriptionMetadata: bundle.subscriptionMetadata,
signal: bundle?.signal
}

// Construct the request client and perform the poll operation
const requestClient = this.createRequestClient(dataBundle)
const pollResponse = await this.definition.pollStatus(requestClient, dataBundle)

return pollResponse
}
Comment on lines +630 to +677
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New polling entrypoints (Destination.executePoll / Action.executePoll) add significant framework behavior (schema generation, validation, request execution) but there are no corresponding unit tests alongside the existing destination-kit tests. Add tests that cover: action without pollStatus, missing pollFields, successful poll execution, and invalid poll payload validation.

Copilot generated this review using guidance from repository custom instructions.

/*
* Extract the dynamic field context and handler path from a field string. Examples:
* - "structured.first_name" => { dynamicHandlerPath: "structured.first_name" }
Expand Down
49 changes: 47 additions & 2 deletions packages/core/src/destination-kit/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import type {
Deletion,
DeletionPayload,
DynamicFieldResponse,
ResultMultiStatusNode
ResultMultiStatusNode,
AsyncPollResponseType
} from './types'
import type { AllRequestOptions } from '../request-client'
import { ErrorCodes, IntegrationError, InvalidAuthenticationError, MultiStatusErrorReporter } from '../errors'
Expand All @@ -44,7 +45,8 @@ export type {
ActionHookType,
ExecuteInput,
RequestFn,
Result
Result,
AsyncPollResponseType
}
export { hookTypeStrings }
export type { MinimalInputField }
Expand Down Expand Up @@ -716,6 +718,49 @@ export class Destination<Settings = JSONObject, AudienceSettings = JSONObject> {
return action.executeDynamicField(fieldKey, data, dynamicFn)
}

public async executePoll(
actionSlug: string,
{
event,
mapping,
subscriptionMetadata,
settings,
features,
statsContext,
logger,
engageDestinationCache,
transactionContext,
stateContext,
signal
}: EventInput<Settings>
): Promise<AsyncPollResponseType> {
const action = this.actions[actionSlug]
if (!action) {
throw new IntegrationError(`Action ${actionSlug} not found`, 'NotImplemented', 404)
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the action slug is missing, executePoll throws an IntegrationError, but executeBatch/executeDynamicField/executeAction return an empty result for unknown actions. This inconsistency makes the public API harder to use. Consider aligning executePoll behavior with the other entrypoints (e.g., return an empty response or a consistent sentinel) unless there’s a strong reason to throw.

Suggested change
throw new IntegrationError(`Action ${actionSlug} not found`, 'NotImplemented', 404)
return [] as AsyncPollResponseType

Copilot uses AI. Check for mistakes.
}

let audienceSettings = {} as AudienceSettings
if (event.context?.personas) {
audienceSettings = event.context?.personas?.audience_settings as AudienceSettings
}
const authData = getAuthData(settings as JSONObject)
return action.executePoll({
Comment on lines +742 to +747
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Destination.executePoll ignores the optional auth provided in EventInput and always derives auth via getAuthData(settings), which only covers OAuth tokens. This can break non-OAuth schemes and also surprises callers that explicitly pass auth. Prefer using the passed auth when provided, and only fallback to deriving it from settings if it is missing.

Copilot uses AI. Check for mistakes.
mapping,
data: event as unknown as InputData,
settings,
audienceSettings,
auth: authData,
features,
statsContext,
logger,
engageDestinationCache,
transactionContext,
stateContext,
subscriptionMetadata,
signal
})
}

private async onSubscription(
subscription: Subscription,
events: SegmentEvent | SegmentEvent[],
Expand Down
25 changes: 25 additions & 0 deletions packages/core/src/destination-kit/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,31 @@ export type ActionDestinationErrorResponseType = {
body?: JSONLikeObject | string
}

export type AsyncOperationResult = {
/** The current status of this operation */
status: 'pending' | 'completed' | 'failed'
/** Message about current state */
message?: string
/** Final result data when status is 'completed' */
result?: JSONLikeObject
/** Error information when status is 'failed' */
error?: {
code: string
message: string
}
/** Original context for this operation */
context?: JSONLikeObject
}

export type AsyncPollResponseType = {
/** Array of operation results - single element for individual operations, multiple for batch */
results: AsyncOperationResult[]
/** Overall status - completed when all operations are done */
overallStatus: 'pending' | 'completed' | 'failed' | 'partial'
/** Summary message */
message?: string
}

export type ResultMultiStatusNode =
| ActionDestinationSuccessResponseType
| (ActionDestinationErrorResponseType & {
Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ export type {
StatsContext,
Logger,
Preset,
Result
Result,
AsyncPollResponseType
} from './destination-kit'

export type {
Expand Down
Loading