Ruby-Leung/p-flow

p-flow

Tiny, zero-dependency concurrency orchestrator for Promises. Concurrency limit · rate limiting · retry with backoff & jitter · timeout · priority queue · adaptive backpressure.

English · 中文


Why p-flow?

p-limit, p-queue, and p-retry are great — but you usually need several of them at once, wired together by hand. p-flow bundles concurrency, rate limiting, retries, timeouts, and a priority queue behind one intuitive API — and adds something none of them have:

🌟 Adaptive backpressure — when the remote starts pushing back (HTTP 429/503 or Retry-After), p-flow automatically dials concurrency down, then ramps it back up once things recover (AIMD, the algorithm behind TCP congestion control). Push maximum throughput without getting banned.

  • 🪶 Zero dependencies, tiny, fully typed.
  • 🌍 Runs anywhere: Node, Bun, Deno, edge, browsers.
  • 🧩 One object instead of four libraries.
  • 🛡️ Honors Retry-After, supports AbortSignal, per-task overrides.

npm install @rubyleung/p-flow

Quick start

import { PFlow } from '@rubyleung/p-flow'

const flow = new PFlow({
  concurrency: 8,             // at most 8 at a time
  interval: 1000,             // ...and
  intervalCap: 20,            // at most 20 started per second
  timeout: 10_000,            // 10s per task
  retry: { retries: 3, jitter: true, respectRetryAfter: true },
  adaptive: true,             // auto-throttle on 429/503
})

// run a single task
const me = await flow.run(({ signal }) => fetch('/api/me', { signal }).then((r) => r.json()))

// map over many — order preserved, limits enforced.
// throw on bad responses so retry / respectRetryAfter kick in
// (fetch never rejects on 4xx/5xx, so retries only fire if you throw)
const pages = await flow.map(urls, async (url, _i, { signal }) => {
  const res = await fetch(url, { signal })
  if (!res.ok) throw Object.assign(new Error(`HTTP ${res.status}`), { status: res.status, headers: res.headers })
  return res.text()
})

The problem it solves

Naive:   Promise.all(urls.map(fetch))   →  1000 sockets at once  →  💥 banned / OOM
Serial:  for (const u of urls) await…    →  safe but painfully slow
p-flow:  flow.map(urls, fetch)           →  capped, rate-limited, retried,
                                            self-throttling on 429  →  ✅ fast & safe
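
For intuition about what "capped" means in the comparison above, here is a minimal sketch of a concurrency-limited map. It is not p-flow's implementation: `mapWithLimit` and its parameters are hypothetical names, and the sketch enforces only the concurrency cap (no rate limit, retries, or backpressure).

```typescript
// Hypothetical helper, not part of p-flow: run `fn` over `items` with at most
// `limit` calls in flight, returning results in input order.
async function mapWithLimit<T, R>(
  items: readonly T[],
  limit: number,
  fn: (item: T, index: number) => Promise<R>,
): Promise<R[]> {
  if (!(limit >= 1)) throw new RangeError('limit must be >= 1')
  const results = new Array<R>(items.length)
  let next = 0 // index of the next unclaimed item

  // Each worker claims the next index, waits for it, then claims another.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const i = next++
      // Store by index so output order matches input order.
      await fn(items[i], i).then((value) => {
        results[i] = value
      })
    }
  }

  const workerCount = Math.min(limit, items.length)
  await Promise.all(Array.from({ length: workerCount }, () => worker()))
  return results
}
```

With `limit` set to 2 and five items, at most two calls to `fn` are pending at any moment, and the returned array still lines up with the input.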

Recipes

Polite, ban-resistant scraping

const flow = new PFlow({
  concurrency: 10,
  interval: 1000, intervalCap: 30,
  retry: { retries: 5, backoff: 'exponential', jitter: 'full', respectRetryAfter: true },
  adaptive: { min: 2 },              // never drop below 2, ramp back to 10
})

flow.on('throttle', ({ concurrency }) => console.warn('backing off →', concurrency))
flow.on('retry', ({ attempt, delay }) => console.log(`retry #${attempt} in ${delay}ms`))

const results = await flow.map(urls, async (url, _i, { signal }) => {
  const res = await fetch(url, { signal })
  // throw on any non-2xx response (!res.ok already covers 429 and 503)
  // so retry + respectRetryAfter (reads res.headers) engage
  if (!res.ok) {
    throw Object.assign(new Error(`HTTP ${res.status}`), { status: res.status, headers: res.headers })
  }
  return res.json()
})
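
The recipe above attaches `res.headers` to the thrown error so a Retry-After value can be honored. As an illustration of what reading that header involves, here is a small sketch that converts a Retry-After value into a delay. `parseRetryAfter` is a hypothetical helper, not p-flow's API; it assumes the two standard forms of the header, a number of seconds or an HTTP date.

```typescript
// Hypothetical helper, not p-flow's API: convert a Retry-After header value
// into a delay in milliseconds, or undefined if it can't be parsed.
function parseRetryAfter(value: string | null, now: number = Date.now()): number | undefined {
  if (value === null) return undefined
  const trimmed = value.trim()
  // Form 1: delay in seconds, a non-negative integer such as "120".
  if (/^\d+$/.test(trimmed)) return Number(trimmed) * 1000
  // Form 2: an HTTP date such as "Wed, 21 Oct 2015 07:28:00 GMT".
  const when = Date.parse(trimmed)
  if (Number.isNaN(when)) return undefined
  return Math.max(0, when - now) // a date in the past means "retry now"
}
```

With fetch, the input would typically come from `res.headers.get('retry-after')`, which returns `null` when the header is absent.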

Prioritize some work

flow.run(() => criticalJob(), { priority: 10 })   // jumps the queue
flow.run(() => backgroundJob(), { priority: 0 })
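
To show what "jumps the queue" implies, here is a minimal priority queue sketch. It is not p-flow's internal data structure: the `PriorityQueue` class is hypothetical, and keeping first-in, first-out order among tasks of equal priority is an assumption this README does not state.

```typescript
// Hypothetical sketch, not p-flow's internals: dequeue the highest priority
// first and keep insertion order among entries with equal priority.
class PriorityQueue<T> {
  private items: { value: T; priority: number }[] = []

  enqueue(value: T, priority = 0): void {
    // Insert before the first entry with a strictly lower priority, so
    // equal-priority entries stay in first-in, first-out order.
    let i = this.items.findIndex((item) => item.priority < priority)
    if (i === -1) i = this.items.length
    this.items.splice(i, 0, { value, priority })
  }

  dequeue(): T | undefined {
    return this.items.shift()?.value
  }

  get size(): number {
    return this.items.length
  }
}
```

A sorted array keeps the sketch short; a binary heap would be the usual choice if the queue grows large.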

Wait for everything to finish

for (const job of jobs) flow.run(job).catch(console.error)   // fire-and-forget, but don't leave rejections unhandled
await flow.onIdle()                     // resolves when fully drained

Gotchas

  • Retries fire only on throw. fetch resolves with a Response even for 429/503, so p-flow won't retry unless your task throws on bad responses (see the examples). Adaptive backpressure does react to a returned 429/503 response, but retry and respectRetryAfter need a thrown error.
  • timeout and signal are cooperative. On timeout the task's signal is aborted and run() rejects, but JavaScript can't force-cancel a Promise. A task that ignores signal keeps running in the background, so the number of task bodies actually executing can briefly exceed concurrency. Always forward ctx.signal to fetch/IO.
  • clear() rejects pending tasks with AbortError so that awaiters aren't left hanging. Catch those rejections, or they surface as unhandled rejections.
  • Two kinds of attempt counter. ctx.attempt is 0-based (0 = first try). The retry event's attempt and retryOn(error, attempt) are 1-based (1 = first retry).
  • Invalid config fails fast. concurrency and intervalCap must each be a finite number >= 1, or Infinity; otherwise the constructor throws RangeError rather than silently deadlocking.
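
The "cooperative" point in the list above is easier to see in code. The following is a minimal sketch of a per-task timeout built on AbortController. It is not p-flow's implementation: `withTimeout` and `cooperative` are hypothetical names, and the error message is made up for the example.

```typescript
// Hypothetical sketch, not p-flow's implementation: reject after `ms` and abort
// the signal handed to the task. The task stops only if it honors the signal.
function withTimeout<T>(
  task: (ctx: { signal: AbortSignal }) => Promise<T>,
  ms: number,
): Promise<T> {
  const controller = new AbortController()
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => {
      reject(new Error(`timed out after ${ms}ms`)) // the caller sees the timeout
      controller.abort() // ask the task to stop; it may ignore this
    }, ms)
    task({ signal: controller.signal }).then(
      (value) => {
        clearTimeout(timer)
        resolve(value) // a no-op if the timeout already rejected
      },
      (error) => {
        clearTimeout(timer)
        reject(error)
      },
    )
  })
}

// A cooperative task: it cleans up and rejects as soon as the signal aborts.
const cooperative = ({ signal }: { signal: AbortSignal }) =>
  new Promise<string>((resolve, reject) => {
    const t = setTimeout(() => resolve('done'), 1000)
    signal.addEventListener('abort', () => {
      clearTimeout(t)
      reject(new Error('aborted'))
    })
  })
```

A task that never looks at `signal` would still be rejected from the caller's point of view, but its body would run to completion in the background, which is the situation the gotcha warns about.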

API

new PFlow(options)

| option | type | default | description |
| --- | --- | --- | --- |
| concurrency | number | Infinity | Max tasks running at once. |
| interval | number | | Rate-limit window (ms). Pair with intervalCap. |
| intervalCap | number | Infinity | Max tasks started per interval. |
| timeout | number | 0 | Per-task timeout (ms). Aborts the task's signal. |
| retry | number \| RetryOptions | 0 | Retry policy (number = retries). |
| adaptive | boolean \| AdaptiveOptions | false | AIMD auto-throttling on push-back. |
| autoStart | boolean | true | Start processing immediately. |
| throttleOn | (info) => boolean | | Custom throttle detection. |
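
To illustrate how interval and intervalCap work as a pair, here is a sketch of a fixed-window limiter. Whether p-flow uses a fixed window, a sliding window, or something else is not stated here, so treat the windowing strategy as an assumption; `WindowLimiter` and `tryStart` are hypothetical names.

```typescript
// Hypothetical sketch, not p-flow's internals: a fixed-window limiter that
// allows at most `intervalCap` starts per `interval` milliseconds.
class WindowLimiter {
  private readonly interval: number
  private readonly intervalCap: number
  private windowStart = -Infinity
  private started = 0

  constructor(interval: number, intervalCap: number) {
    this.interval = interval
    this.intervalCap = intervalCap
  }

  // Returns 0 if a task may start at time `now`, otherwise the ms to wait.
  tryStart(now: number): number {
    if (now - this.windowStart >= this.interval) {
      this.windowStart = now // open a fresh window
      this.started = 0
    }
    if (this.started < this.intervalCap) {
      this.started++
      return 0
    }
    return this.windowStart + this.interval - now
  }
}
```

Passing the clock in as `now` keeps the sketch deterministic; a real scheduler would read `Date.now()` and set a timer for the returned wait.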

RetryOptions: retries, backoff ('exponential' | 'linear' | 'fixed'), factor, minDelay, maxDelay, jitter (true | 'full' | 'equal'), respectRetryAfter, retryOn(error, attempt).
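
The retry options above map onto a delay calculation along the following lines. This is a sketch under stated assumptions, not p-flow's documented behavior: the formulas, the default values, and the choice to treat `jitter: true` as 'full' are all guesses made for illustration, and `retryDelay` is a hypothetical function.

```typescript
type Backoff = 'exponential' | 'linear' | 'fixed'
type Jitter = boolean | 'full' | 'equal'

// Hypothetical sketch: delay in ms before retry number `attempt` (1 = first retry).
// Formulas and defaults are assumptions, not p-flow's documented behavior.
function retryDelay(
  attempt: number,
  opts: { backoff?: Backoff; factor?: number; minDelay?: number; maxDelay?: number; jitter?: Jitter } = {},
  random: () => number = Math.random,
): number {
  const { backoff = 'exponential', factor = 2, minDelay = 100, maxDelay = 30_000, jitter = false } = opts

  let base = minDelay // 'fixed': the same delay every time
  if (backoff === 'exponential') base = minDelay * factor ** (attempt - 1)
  else if (backoff === 'linear') base = minDelay * attempt
  const capped = Math.min(base, maxDelay)

  // 'equal' jitter keeps half the delay and randomizes the other half.
  if (jitter === 'equal') return capped / 2 + random() * (capped / 2)
  // 'full' jitter picks anywhere between 0 and the capped delay.
  if (jitter === true || jitter === 'full') return random() * capped
  return capped
}
```

Jitter spreads retries from many clients over time so they don't all hit the server at the same instant after a shared failure.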

AdaptiveOptions: min, max, decreaseFactor (default 0.5), increaseStep (default 1), successesToIncrease (default 10).
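
The AdaptiveOptions names above suggest an AIMD loop like the one sketched here: shrink the limit multiplicatively on push-back, grow it additively after a run of successes. `AimdController` is hypothetical, and the default of 1 for min, the rounding with `Math.floor`, and the rule for resetting the success counter are assumptions; p-flow's real controller may differ.

```typescript
// Hypothetical sketch of AIMD using the option names above; rounding and
// counter resets are assumptions, not p-flow's confirmed behavior.
class AimdController {
  private limit: number
  private streak = 0 // consecutive successes since the last change
  private readonly min: number
  private readonly max: number
  private readonly decreaseFactor: number
  private readonly increaseStep: number
  private readonly successesToIncrease: number

  constructor(opts: {
    max: number
    min?: number
    decreaseFactor?: number
    increaseStep?: number
    successesToIncrease?: number
  }) {
    this.max = opts.max
    this.min = opts.min ?? 1 // assumed default
    this.decreaseFactor = opts.decreaseFactor ?? 0.5
    this.increaseStep = opts.increaseStep ?? 1
    this.successesToIncrease = opts.successesToIncrease ?? 10
    this.limit = this.max // start at full concurrency
  }

  get concurrency(): number {
    return this.limit
  }

  // Additive increase: after enough consecutive successes, add a fixed step.
  onSuccess(): void {
    if (++this.streak >= this.successesToIncrease) {
      this.limit = Math.min(this.max, this.limit + this.increaseStep)
      this.streak = 0
    }
  }

  // Multiplicative decrease: on push-back (such as 429/503), scale the limit down.
  onPushback(): void {
    this.limit = Math.max(this.min, Math.floor(this.limit * this.decreaseFactor))
    this.streak = 0
  }
}
```

With `{ min: 2, max: 10 }`, one push-back takes the limit from 10 to 5, a second takes it to the floor of 2, and each run of 10 successes then adds 1 back until the limit reaches 10 again.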

Methods

  • run(fn, options?) → Promise<T>: enqueue a task. fn receives { signal, attempt }. Options: priority, signal, timeout, retry.
  • map(items, fn, options?) → Promise<R[]>: map with limits, order preserved. fn receives (item, index, ctx).
  • pause() / start() / clear() / onIdle()
  • getters: size, pending, concurrency, isPaused

Events

flow.on(event, cb) (returns an unsubscribe fn): retry {error, attempt, delay} · throttle {concurrency} · concurrency (n) · idle · active · resolve · reject.
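
The "returns an unsubscribe fn" shape described above can be sketched as follows. This is not p-flow's source: the `Emitter` class is hypothetical and only mirrors the `on(event, cb)` pattern, using the throttle payload from the event list as the example.

```typescript
type Listener = (payload: unknown) => void

// Hypothetical sketch, not p-flow's source: a tiny typed emitter whose `on`
// returns a function that removes exactly the callback it registered.
class Emitter<E extends Record<string, unknown>> {
  private listeners = new Map<keyof E, Set<Listener>>()

  on<K extends keyof E>(event: K, cb: (payload: E[K]) => void): () => void {
    let set = this.listeners.get(event)
    if (!set) {
      set = new Set<Listener>()
      this.listeners.set(event, set)
    }
    const listener = cb as Listener
    set.add(listener)
    return () => {
      this.listeners.get(event)?.delete(listener)
    }
  }

  emit<K extends keyof E>(event: K, payload: E[K]): void {
    this.listeners.get(event)?.forEach((cb) => cb(payload))
  }
}

const events = new Emitter<{ throttle: { concurrency: number } }>()
const off = events.on('throttle', ({ concurrency }) => console.warn('backing off to', concurrency))
events.emit('throttle', { concurrency: 4 }) // the listener runs
off()
events.emit('throttle', { concurrency: 2 }) // no listener left, nothing happens
```

Returning the unsubscribe function avoids a separate `off(event, cb)` call and the need to keep a reference to the original callback.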

vs. p-limit / p-queue

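| | p-limit | p-queue | p-flow |
| --- | --- | --- | --- |
| Concurrency limit | ✅ | ✅ | ✅ |
| Rate limiting | ❌ | ✅ | ✅ |
| Priority queue | ❌ | ✅ | ✅ |
| Retry + backoff + jitter | ❌ | ❌ | ✅ |
| Per-task timeout | ❌ | ✅ | ✅ |
| Adaptive backpressure | ❌ | ❌ | ✅ |
| Dependencies | 1 | 1 | 0 |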

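中文 (Chinese)

p-limit, p-queue, and p-retry are all great, but you often need several of them at once and end up wiring them together by hand. p-flow folds concurrency control, rate limiting, retries, timeouts, and a priority queue into one intuitive API, plus one capability none of them have:

🌟 Adaptive backpressure: when the remote side starts rate-limiting you (HTTP 429/503 or Retry-After), p-flow automatically lowers concurrency, then gradually raises it again once things stabilize (AIMD, the algorithm used in TCP congestion control). You get full throughput without getting banned.

  • 🪶 Zero dependencies, tiny, complete TypeScript types.
  • 🌍 Runs everywhere: Node, Bun, Deno, edge, browsers.
  • 🧩 One object in place of four libraries.
  • 🛡️ Honors Retry-After, supports AbortSignal, allows per-task config overrides.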
npm install @rubyleung/p-flow
import { PFlow } from '@rubyleung/p-flow'

const flow = new PFlow({
  concurrency: 8,            // at most 8 at once
  interval: 1000,            // per second...
  intervalCap: 20,           // ...start at most 20
  timeout: 10_000,           // 10-second timeout per task
  retry: { retries: 3, jitter: true, respectRetryAfter: true },
  adaptive: true,            // slow down automatically on 429/503
})

const data = await flow.run(() => fetch(url).then((r) => r.json()))   // single task
const all  = await flow.map(urls, (u) => fetch(u))                     // batch, order preserved

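What problem it solves

Naive:   Promise.all(urls.map(fetch))  →  1000 connections at once  →  💥 banned / out of memory
Serial:  for (const u of urls) await…  →  safe, but painfully slow
p-flow:  flow.map(urls, fetch)         →  concurrency cap + rate limit + retries + auto slow-down on 429  →  ✅ fast and stable

The API, events, and options are the same as in the tables above. For complete examples, see examples/ and the English Recipes above.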

License

MIT © Ruby Leung
