TypeScript SDK for integrating human-in-the-loop checkpoints into AI agent workflows.
Datashift enables AI agents to submit tasks for human review before committing changes. Create review queues, define review types (approval, classification, labeling, multiple_choice, scoring, augmentation), and integrate human oversight into any AI workflow — via REST API or MCP.
npm install @datashift/sdk
import {DatashiftRestClient} from '@datashift/sdk';
const datashift = new DatashiftRestClient({
apiKey: process.env.DATASHIFT_API_KEY!,
});
// AI agent enriches company data from external sources
const enrichedData = {
companyId: 'acme_corp_123',
original: {name: 'Acme Corp', industry: 'Unknown'},
enriched: {
name: 'Acme Corporation',
industry: 'Manufacturing',
employees: 5200,
revenue: '$1.2B',
linkedIn: 'https://linkedin.com/company/acme',
},
sources: ['LinkedIn', 'Crunchbase', 'SEC Filings'],
};
// Submit enrichment for human verification before updating CRM
const task = await datashift.task.submit({
queueKey: 'data-enrichment',
data: enrichedData,
summary: 'Verify Acme Corp enrichment before CRM update',
});
// Wait for reviewer approval
const result = await datashift.task.waitForReview(task.id, {
timeout: 300000, // 5 minutes
pollInterval: 5000, // Check every 5 seconds
});
// Apply verified changes to CRM
if (result.result.includes('approved')) {
await updateCRM(enrichedData.companyId, enrichedData.enriched);
}
Create an API key in the Datashift Console.
const datashift = new DatashiftRestClient({
apiKey: 'sk_live_...'
});
// Submit a task
const task = await datashift.task.submit({
queueKey: 'approvals',
data: {...},
summary: 'Short description',
});
// Get task by ID (named `fetched` so it does not redeclare `task` from the submit example above)
const fetched = await datashift.task.get(taskId);
// List tasks — both filters are optional and may be omitted
const tasks = await datashift.task.list({
  queueId, // optional: string
  state: 'pending', // optional: 'pending' | 'queued' | 'reviewed'
});
// Wait for review
const reviewed = await datashift.task.waitForReview(taskId, {
timeout: 300000, // 5 minutes
pollInterval: 2000, // Start at 2s
maxPollInterval: 30000 // Back off to 30s
});
// List queues
const queues = await datashift.queue.list();
// Get queue config
const queue = await datashift.queue.get('refund-approvals');
// List reviews with filters
const reviews = await datashift.review.list({
queueKey: 'approvals',
reviewerType: 'human',
from: new Date('2024-01-01'),
limit: 50,
});
// Get review by ID
const review = await datashift.review.get(reviewId);
// Create webhook
const webhook = await datashift.webhook.create({
url: 'https://your-service.com/webhook',
events: ['task.reviewed'],
});
// Save webhook.secret securely!
// List webhooks
const webhooks = await datashift.webhook.list();
// Update webhook
const updated = await datashift.webhook.update(webhookId, {
url: 'https://new-url.com/webhook',
events: ['task.reviewed', 'task.created'],
active: true,
});
// Rotate webhook secret
const rotated = await datashift.webhook.rotateSecret(webhookId);
// Save rotated.secret securely!
// Send test event
await datashift.webhook.test(webhookId);
// Delete webhook
await datashift.webhook.delete(webhookId);
Webhooks provide asynchronous notifications when tasks are created or reviewed.
import express from 'express';
import {verifyWebhookSignature, parseWebhookEvent} from '@datashift/sdk';
import type {WebhookEvent} from '@datashift/sdk';
const app = express();
// IMPORTANT: Use raw body for signature verification
app.post('/webhook/datashift',
express.raw({type: 'application/json'}),
(req, res) => {
const signature = req.headers['x-datashift-signature'] as string;
// Verify signature
if (!verifyWebhookSignature(req.body, signature, WEBHOOK_SECRET)) {
return res.status(401).send('Invalid signature');
}
// Parse event
const event = parseWebhookEvent<WebhookEvent>(req.body);
if (event.event === 'task.reviewed') {
const {task, queue, reviews} = event.data;
console.log(`Task ${task.id} in queue ${queue.key} reviewed`);
console.log(`Result: ${reviews[0].result}`);
console.log(`Reviewer: ${reviews[0].reviewer.name}`);
} else if (event.event === 'task.created') {
const {task, queue} = event.data;
console.log(`Task ${task.id} created in queue ${queue.key}`);
}
res.status(200).send('OK');
}
);
| Event | Description |
|---|---|
| `task.created` | A new task was submitted for review |
| `task.reviewed` | A task has been reviewed (includes all reviews) |
/**
 * Payload of the `task.created` webhook event, delivered when a new task
 * has been submitted for review.
 */
interface TaskCreatedWebhookEvent {
  /** Literal tag identifying this event; use it to narrow `WebhookEvent`. */
  event: 'task.created';
  /** Event timestamp, delivered as a string. */
  timestamp: string;
  data: {
    /** The task that was created. */
    task: WebhookTaskData;
    /** The queue the task was created in. */
    queue: WebhookQueueData;
  };
}
/**
 * Payload of the `task.reviewed` webhook event, delivered when a task has
 * been reviewed.
 */
interface TaskReviewedWebhookEvent {
  /** Literal tag identifying this event; use it to narrow `WebhookEvent`. */
  event: 'task.reviewed';
  /** Event timestamp, delivered as a string. */
  timestamp: string;
  data: {
    /** The task that was reviewed. */
    task: WebhookTaskData;
    /** The queue the task belongs to. */
    queue: WebhookQueueData;
    /** All reviews for the task (supports two-step workflows). */
    reviews: WebhookReviewData[];
  };
}
/** Task fields included in the `data.task` member of webhook events. */
interface WebhookTaskData {
  id: string;
  external_id: string | null;
  state: string;
  summary: string | null;
  data: Record<string, unknown>;
  metadata: Record<string, unknown>;
  /** Per-task candidate list; multiple_choice queues only. */
  choices: TaskChoice[] | null;
  /** Per-task override of the selection limit; multiple_choice only. */
  max_selections: number | null;
  sla_deadline: string | null;
  reviewed_at: string | null;
  created_at: string;
}
/** Queue fields included in the `data.queue` member of webhook events. */
interface WebhookQueueData {
  /** Queue key, e.g. the `queueKey` passed when submitting a task. */
  key: string;
  name: string;
  review_type: string;
}
/** A single review as delivered in the `reviews` array of `task.reviewed` events. */
interface WebhookReviewData {
  /** Selected result values; a list because a review may carry several. */
  result: string[];
  data: Record<string, unknown>;
  feedback: string | null;
  /** Who produced the review. */
  reviewer: {
    name: string;
    type: string;
  };
  created_at: string;
}
/** Union of all webhook events; narrow on the `event` field to pick a variant. */
type WebhookEvent = TaskCreatedWebhookEvent | TaskReviewedWebhookEvent;

/** A task submitted to a queue for review. */
interface Task {
  id: string;
  queue_id: string;
  external_id: string | null;
  state: 'pending' | 'queued' | 'reviewed';
  data: Record<string, unknown>;
  context: Record<string, unknown>;
  metadata: Record<string, unknown>;
  /** Per-task candidate list; multiple_choice queues only. */
  choices: TaskChoice[] | null;
  /** Per-task override of the selection limit; multiple_choice only. */
  max_selections: number | null;
  summary: string | null;
  sla_deadline: string | null;
  reviewed_at: string | null;
  created_at: string;
  updated_at: string;
  /** Optional; not every task object carries its reviews. */
  reviews?: Review[];
}
interface TaskChoice {
key: string;
label: string;
description?: string | null;
// Structured per-choice fields the viewer can render alongside the label
// (e.g., address/EIN for an entity-match candidate).
data?: Record<string, unknown> | null;
}
interface Queue {
id: string;
key: string;
name: string;
description: string | null;
review_type:
| 'approval'
| 'labeling'
| 'classification'
| 'multiple_choice'
| 'scoring'
| 'augmentation';
review_options: ReviewOption[];
// Selection primitives. review_type sets sensible defaults; explicit
// values override.
max_selections: number | null; // 1 = single-select, null = unbounded
result_required: boolean; // if false, empty selection is valid
feedback_required: 'never' | 'always' | 'when_empty';
assignment: 'manual' | 'round_robin' | 'ai_first';
sla_minutes: number | null;
deleted_at: string | null;
}
Use `review_type: 'multiple_choice'` when the candidate list varies per
task (e.g., entity matching, output ranking, picking the best of N
generated drafts). Ship the candidates with the task instead of
configuring them on the queue:
await datashift.task.submit({
queueKey: 'entity-match',
summary: 'Verify business identity',
data: { name: 'ACME Plumbing LLC', ein: '12-3456789' },
choices: [
{
key: 'cand_1',
label: 'ACME Plumbing LLC',
description: 'EIN 12-3456789',
data: { address: '100 Main St, Austin TX', confidence: 0.92 },
},
{
key: 'cand_2',
label: 'Acme Plumbing Inc',
description: 'EIN 11-2233445',
data: { address: '220 Oak St, Houston TX', confidence: 0.71 },
},
],
});
If none of the candidates is a real match, the reviewer can submit an
empty result. Configure the queue with `result_required: false` and
`feedback_required: 'when_empty'` to require an explanation in that case;
no reserved `__none__` key is needed.
interface Review {
id: string;
task_id: string;
reviewer_id: string | null;
result: string[];
data: Record<string, unknown>;
feedback: string | null;
created_at: string;
}
interface ListReviewsFilters {
taskId?: string;
reviewerId?: string;
queueKey?: string;
reviewerType?: 'ai' | 'human';
from?: Date | string;
to?: Date | string;
limit?: number;
offset?: number;
}
interface ReviewResult {
task_id: string;
result: string[];
data: Record<string, unknown>;
reviewed_at: string;
review: Review;
}
The SDK provides typed error classes for common error scenarios.
import {
DatashiftError,
AuthenticationError,
NotFoundError,
ValidationError,
TimeoutError,
RateLimitError,
ServerError,
} from '@datashift/sdk';
try {
const result = await datashift.task.waitForReview(taskId, {timeout: 60000});
} catch (error) {
if (error instanceof TimeoutError) {
console.log('Review not finished in time');
} else if (error instanceof AuthenticationError) {
console.log('Invalid credentials');
} else if (error instanceof NotFoundError) {
console.log('Task not found');
} else if (error instanceof RateLimitError) {
console.log(`Rate limited. Retry after ${error.retryAfter}s`);
} else if (error instanceof DatashiftError) {
console.log(`API error: ${error.message}`);
}
}
interface RestClientConfig {
apiKey: string; // Required
baseUrl?: string; // Default: 'https://api.datashift.io'
timeout?: number; // Request timeout (default: 30000ms)
retries?: number; // Retry count (default: 3)
retryDelay?: number; // Initial retry delay (default: 1000ms)
}
interface WaitOptions {
timeout?: number; // Max wait time (default: 300000ms)
pollInterval?: number; // Poll interval (default: 2000ms)
maxPollInterval?: number; // Max poll interval for backoff (default: 30000ms)
}
import {DatashiftRestClient, verifyWebhookSignature, parseWebhookEvent} from '@datashift/sdk';
import type {WebhookEvent} from '@datashift/sdk';
import express from 'express';
const datashift = new DatashiftRestClient({
apiKey: process.env.DATASHIFT_API_KEY!,
});
// Store pending tasks (use Redis/DB in production)
const pendingTasks = new Map<string, (result: any) => void>();
/**
 * Submits `data` to the 'reviews' queue and resolves with the new task's id
 * as soon as the submit call returns; it does not wait for the review itself.
 */
async function submitForReview(data: any): Promise<string> {
  const {id} = await datashift.task.submit({queueKey: 'reviews', data});
  return id;
}
// Webhook handler: verifies the signature, then resolves the pending task
// that matches a `task.reviewed` event.
const app = express();
app.post(
  '/webhook/datashift',
  // The raw (unparsed) body is required for signature verification.
  express.raw({type: 'application/json'}),
  (req, res) => {
    // A header value can be missing (undefined) or repeated (string[]), so
    // narrow it at runtime instead of asserting `as string`.
    const signature = req.headers['x-datashift-signature'];
    if (
      typeof signature !== 'string' ||
      !verifyWebhookSignature(req.body, signature, process.env.WEBHOOK_SECRET!)
    ) {
      return res.status(401).send('Invalid signature');
    }
    const event = parseWebhookEvent<WebhookEvent>(req.body);
    if (event.event === 'task.reviewed') {
      const taskId = event.data.task.id;
      const resolver = pendingTasks.get(taskId);
      if (resolver) {
        // Hand the review payload to whoever is waiting, then forget the task.
        resolver(event.data);
        pendingTasks.delete(taskId);
      }
    }
    // Acknowledge every verified event, including ones nobody is waiting on.
    res.status(200).send('OK');
  }
);
app.listen(3000);
- Node.js 18+
- TypeScript 5.0+ (for type definitions)
- Datashift Website — Product overview and sign up
- Documentation — Guides, API reference, and examples
- MCP Server — Connect AI agents via Model Context Protocol
- Console — Manage queues, reviewers, and API keys
- GitHub — SDKs and sample projects
MIT