Python 3.8+ MIT License Zero Dependencies Beta

FlowPilot

A lightweight, durable workflow orchestration engine delivered as a CLI tool
Pure Python, zero external dependencies

🎉 Project Introduction

FlowPilot is a lightweight, durable workflow orchestration engine delivered as a CLI tool. Built entirely on the Python standard library, it has zero external dependencies and works out of the box.

Whether you need to orchestrate data-processing pipelines, automate ops tasks, or build CI/CD flows, FlowPilot provides reliable workflow execution at minimal integration cost.

💡 Core philosophy: simple, reliable, recoverable. Workflow orchestration back to basics, without the burden of complex dependencies and configuration.

🏗️ Architecture Overview

┌─────────────────────────────────────────────────┐
│                    CLI / TUI                    │
│            flowpilot run / dashboard            │
├─────────────────────────────────────────────────┤
│              HTTP REST API Server               │
│                 flowpilot serve                 │
├─────────────────────────────────────────────────┤
│                 Workflow Engine                 │
│  ┌──────────┬──────────┬──────────┬──────────┐  │
│  │  Router  │ Executor │  Retry   │   DLQ    │  │
│  └──────────┴──────────┴──────────┴──────────┘  │
├─────────────────────────────────────────────────┤
│            SQLite Persistent Storage            │
├─────────────────────────────────────────────────┤
│           Plugin System (Extensible)            │
└─────────────────────────────────────────────────┘

✨ Core Features

Feature Description
📦 Zero Dependencies Pure Python standard library; no third-party packages to install
📝 YAML Workflow Definitions Intuitive, readable YAML files describe workflows
🔧 9 Built-in Step Types shell, http_request, sleep, transform, condition, parallel, foreach, sub_workflow, notify
📬 Dead Letter Queue (DLQ) Failed steps are captured for inspection and retry
🔄 Exponential Backoff Retry Exponential backoff with jitter
⚡ Parallel Execution ThreadPoolExecutor-based concurrent step execution
🔀 Conditional Routing Dynamic branching with multiple comparison operators
💾 SQLite Persistent Storage Zero-config state persistence out of the box
🌐 Built-in HTTP REST API Lightweight HTTP server for remote management
📊 TUI Dashboard Real-time terminal monitoring built on curses
🔌 Plugin System Custom step handlers for flexible extension
🛡️ Crash Recovery Interrupted workflows resume automatically
📐 Template Variable Resolution Reference upstream step outputs with ${steps.step_name.output}
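The template-variable feature above can be illustrated with a small resolver. This is a hypothetical sketch of the idea, not FlowPilot's actual implementation; the `resolve` helper and the context layout are assumptions:

```python
import re

# Matches ${dotted.path} placeholders such as ${steps.fetch-data.output}
_PATTERN = re.compile(r"\$\{([A-Za-z0-9_.\-]+)\}")

def resolve(template: str, context: dict) -> str:
    """Replace each ${a.b.c} placeholder with the value at context[a][b][c]."""
    def lookup(match: re.Match) -> str:
        value = context
        for part in match.group(1).split("."):
            value = value[part]  # walk the dotted path
        return str(value)
    return _PATTERN.sub(lookup, template)

context = {
    "steps": {"fetch-data": {"output": '{"id": 1}'}},
    "input": {"token": "abc123"},
}
print(resolve("Bearer ${input.token}", context))       # Bearer abc123
print(resolve("${steps.fetch-data.output}", context))  # {"id": 1}
```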

🚀 Quick Start

📋 Requirements

  • Python 3.8+ (nothing else required)

🔧 Installation

Option 1: pip install (recommended)

# Clone the repository
git clone https://github.com/your-repo/flowpilot.git
cd flowpilot

# Install in development mode
pip install -e .

Option 2: Direct usage

# Add the project to PYTHONPATH
export PYTHONPATH=/path/to/flowpilot:$PYTHONPATH

# Run via the Python module
python -m flowpilot --version

Option 3: Docker

FROM python:3.11-slim
WORKDIR /app
COPY . .
RUN pip install -e .
ENTRYPOINT ["flowpilot"]

docker build -t flowpilot .
docker run -v $(pwd)/workflows:/workflows flowpilot run /workflows/my-pipeline.yaml

🏃 Run Your First Workflow

Create a simple workflow file, hello.yaml:

name: hello-flowpilot
version: "1.0.0"
description: "My first FlowPilot workflow"
tags: [demo, getting-started]

steps:
  - name: greet
    type: shell
    config:
      command: "echo 'Hello, FlowPilot! 🎉'"
    timeout: 10

  - name: get-time
    type: shell
    config:
      command: "date '+%Y-%m-%d %H:%M:%S'"
    timeout: 10

  - name: done
    type: notify
    config:
      message: "Workflow completed!"
      level: info

Run it:

# Execute the workflow and wait for the result
flowpilot run hello.yaml --wait

# Run asynchronously
flowpilot run hello.yaml

# Run with input parameters
flowpilot run hello.yaml --input name=Alice --input env=production

Example output:

Execution exec_abc123 finished: completed
  [+] greet: completed (45ms)
      Output: Hello, FlowPilot! 🎉
  [+] get-time: completed (12ms)
      Output: 2026-05-07 14:30:00
  [+] done: completed (1ms)

📖 Detailed Usage Guide

📝 Workflow YAML Format

A complete FlowPilot workflow definition:

name: my-workflow          # Workflow name (required)
version: "1.0.0"           # Version (required)
description: "Workflow description"  # Description
tags: [tag1, tag2]         # Tags (optional)

steps:                     # Step list (required)
  - name: step-one         # Step name (unique identifier)
    type: shell            # Step type
    config:                # Step configuration
      command: "echo 'hi'"
    retry:                 # Retry policy (optional)
      max_retries: 3
      backoff_base: 2.0
      backoff_max: 30.0
      jitter: true
    timeout: 30            # Timeout in seconds
    on_failure: dlq        # Failure policy: dlq / skip / abort (default: abort)
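The retry block maps to a standard exponential-backoff-with-jitter calculation. The exact formula FlowPilot uses is not documented here, so treat the following as an illustrative sketch of how such delays are typically computed:

```python
import random

def backoff_delay(attempt: int, base: float = 2.0,
                  max_delay: float = 30.0, jitter: bool = True) -> float:
    """Delay before retry attempt N (1-based): base**N capped at max_delay,
    with optional 'full jitter' to spread out retry storms."""
    delay = min(base ** attempt, max_delay)
    if jitter:
        # Full jitter: pick uniformly in [0, delay]
        delay = random.uniform(0, delay)
    return delay

# With base=2.0 and max=30.0, the un-jittered delays are 2, 4, 8, 16, 30, 30, ...
print([backoff_delay(n, jitter=False) for n in range(1, 7)])
```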

🔧 Step Types in Detail

1. shell - Execute a shell command

- name: run-script
  type: shell
  config:
    command: "python script.py --arg value"
    cwd: "/path/to/project"       # Working directory
    env:                           # Environment variables
      API_KEY: "${input.api_key}"
    shell: true                    # Use a shell (default: true)
    timeout: 60                    # Timeout in seconds

2. http_request - Send an HTTP request

- name: fetch-data
  type: http_request
  config:
    url: "https://api.example.com/data"
    method: "GET"                  # GET / POST / PUT / DELETE
    headers:
      Content-Type: "application/json"
      Authorization: "Bearer ${input.token}"
    body: '{"key": "value"}'       # Request body
    timeout: 30

3. sleep - Wait / delay

- name: wait-a-bit
  type: sleep
  config:
    seconds: 5                     # Seconds to wait

4. transform - Data transformation

- name: parse-json
  type: transform
  config:
    operation: "json_parse"        # json_parse / json_extract / template / base64_encode / base64_decode
    input: "${steps.fetch-data.output}"

5. condition - Conditional routing

- name: check-env
  type: shell
  config:
    command: "echo 'production'"

- name: route
  type: condition
  config:
    expression: "equals"           # equals / not_equals / contains / greater_than / less_than / regex / is_empty / is_not_empty
    left: "${steps.check-env.output}"
    right: "production"
  branches:
    true: [prod-deploy]            # Steps to run when the condition is true
    false: [dev-deploy]            # Steps to run when the condition is false

- name: prod-deploy
  type: shell
  config:
    command: "echo 'Deploying to production...'"

- name: dev-deploy
  type: shell
  config:
    command: "echo 'Deploying to development...'"

6. parallel - Parallel execution

- name: parallel-tasks
  type: parallel
  config:
    max_workers: 4                 # Maximum concurrency
    steps:
      - name: task-a
        type: shell
        config:
          command: "echo 'Task A'"
      - name: task-b
        type: shell
        config:
          command: "echo 'Task B'"
      - name: task-c
        type: shell
        config:
          command: "sleep 3 && echo 'Task C'"

7. foreach - Loop over items

- name: process-items
  type: foreach
  config:
    items: ["apple", "banana", "cherry"]   # Can also reference a variable: ${steps.get-list.output}
    item_var: item                          # Loop variable name
    steps:
      - name: process
        type: shell
        config:
          command: "echo 'Processing: ${item}'"

8. sub_workflow - Sub-workflow

- name: run-sub
  type: sub_workflow
  config:
    workflow: "path/to/another-workflow.yaml"
    input:                                # Parameters passed to the sub-workflow
      parent_id: "${execution.id}"
      data: "${steps.prepare.output}"

9. notify - Notification

- name: send-notification
  type: notify
  config:
    message: "Deployment complete! Build: ${steps.build.output}"
    level: info                           # info / warning / error
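The operators accepted by the condition step (step 5 above) behave like ordinary comparisons. A sketch of how such an evaluator could look; the real engine's internals may differ:

```python
import re

# One predicate per documented operator name
OPERATORS = {
    "equals":       lambda l, r: l == r,
    "not_equals":   lambda l, r: l != r,
    "contains":     lambda l, r: r in l,
    "greater_than": lambda l, r: float(l) > float(r),
    "less_than":    lambda l, r: float(l) < float(r),
    "regex":        lambda l, r: re.search(r, l) is not None,
    "is_empty":     lambda l, r: not l,
    "is_not_empty": lambda l, r: bool(l),
}

def evaluate(expression: str, left: str, right: str = "") -> bool:
    """Evaluate a condition after template variables have been resolved."""
    return OPERATORS[expression](left, right)

print(evaluate("equals", "production", "production"))  # True
```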

🖥️ CLI Command Reference

# Run a workflow
flowpilot run <workflow.yaml> [--input key=value...] [--wait]

# List executions
flowpilot list [--status STATUS] [--workflow NAME] [--limit N]

# Show execution status
flowpilot status <execution_id>

# Cancel an execution
flowpilot cancel <execution_id>

# Retry a failed execution
flowpilot retry <execution_id> [--step STEP_NAME]

# Manage the Dead Letter Queue
flowpilot dlq [--list] [--retry ID] [--purge] [--limit N]

# Start the HTTP API server
flowpilot serve [--host HOST] [--port PORT]

# Open the TUI dashboard
flowpilot dashboard

# Purge execution records
flowpilot purge [--all] [--completed] [--before DATE]

# List saved workflows
flowpilot workflows

# Show version
flowpilot --version

# Show help
flowpilot --help

Global options:

Flag Description Default
--db SQLite database path .flowpilot.db
--plugins-dir Plugin directory path ./plugins
--log-level Logging level INFO

🌐 HTTP REST API

Start the API server:

flowpilot serve --host 0.0.0.0 --port 8080

Key API endpoints:

Method Path Description
POST /api/v1/workflows/run Submit a workflow execution
GET /api/v1/executions List executions
GET /api/v1/executions/:id Get execution details
POST /api/v1/executions/:id/cancel Cancel an execution
POST /api/v1/executions/:id/retry Retry an execution
GET /api/v1/dlq List Dead Letter Queue items
POST /api/v1/dlq/:id/retry Retry a DLQ item
DELETE /api/v1/dlq Purge the Dead Letter Queue
GET /api/v1/workflows List registered workflows

Example: submit a workflow via the API

curl -X POST http://localhost:8080/api/v1/workflows/run \
  -H "Content-Type: application/json" \
  -d '{
    "workflow": "hello.yaml",
    "input": {"name": "FlowPilot"}
  }'
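The same call can be made from Python using only the standard library, in keeping with the zero-dependency theme. The endpoint and payload come from the documentation above; the shape of the JSON response is an assumption:

```python
import json
from urllib import request

def build_run_request(base_url: str, workflow: str, input_data: dict) -> request.Request:
    """Build the POST request for /api/v1/workflows/run."""
    payload = json.dumps({"workflow": workflow, "input": input_data}).encode("utf-8")
    return request.Request(
        f"{base_url}/api/v1/workflows/run",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

def submit_workflow(base_url: str, workflow: str, input_data: dict) -> dict:
    """Submit a workflow and return the parsed JSON response."""
    req = build_run_request(base_url, workflow, input_data)
    with request.urlopen(req, timeout=30) as resp:
        return json.load(resp)

# Example (requires a running server):
# submit_workflow("http://localhost:8080", "hello.yaml", {"name": "FlowPilot"})
```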

📊 TUI Dashboard

Launch the terminal dashboard to monitor workflow executions in real time:

flowpilot dashboard

Features:

  • 📋 Live execution list with status display
  • 🔄 Automatic refresh of running workflows
  • 📈 Step-level execution details
  • Q to quit the dashboard

🔌 Plugin System

FlowPilot supports custom step types through its plugin system.

Create a plugin:

Create a Python file in the plugins/ directory, e.g. plugins/my_steps.py:

"""Custom step handler plugin."""

def register(handlers):
    """Register custom step types."""

    def ssh_exec(config, context):
        """Run a remote SSH command."""
        host = config.get("host")
        cmd = config.get("command")
        # Implement your SSH logic here...
        return {"output": f"Executed on {host}: {cmd}"}

    def sql_query(config, context):
        """Run a SQL query."""
        db_url = config.get("database")
        query = config.get("query")
        # Implement your query logic here...
        return {"output": "Query results..."}

    # Register the step handlers
    handlers["ssh_exec"] = ssh_exec
    handlers["sql_query"] = sql_query

Use the custom steps:

steps:
  - name: remote-deploy
    type: ssh_exec
    config:
      host: "deploy.example.com"
      command: "docker-compose up -d"

Specify the plugin directory:

flowpilot run workflow.yaml --plugins-dir /path/to/my/plugins
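For reference, a loader for the register(handlers) contract shown above fits in a few lines of stdlib code. This is a sketch of the mechanism, not FlowPilot's actual loader:

```python
import importlib.util
from pathlib import Path

def load_plugins(plugins_dir: str) -> dict:
    """Import every .py file in plugins_dir and let each module that
    exports register(handlers) add its step handlers to the registry."""
    handlers: dict = {}
    for path in sorted(Path(plugins_dir).glob("*.py")):
        spec = importlib.util.spec_from_file_location(path.stem, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)  # run the plugin module
        if hasattr(module, "register"):
            module.register(handlers)
    return handlers
```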

💡 Design Rationale and Roadmap

🎯 Design Philosophy

  1. Zero dependencies - Python standard library only, eliminating version conflicts and environment headaches
  2. Persistence first - All state is written to SQLite in real time, so a crashed run can be fully recovered
  3. Convention over configuration - Sensible defaults that work out of the box, with as few knobs as possible
  4. Progressive complexity - Simple syntax for simple cases, with enough power for complex ones
  5. Observability - Rich CLI output, a TUI dashboard, and a REST API give full visibility into execution state
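The persistence-first principle (item 2) can be made concrete with a few lines of sqlite3: each step result is upserted as it happens, so recovery reduces to skipping rows already marked completed. A minimal illustration with a hypothetical schema, not FlowPilot's actual tables:

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # a real engine would use a file path
conn.execute("""
    CREATE TABLE IF NOT EXISTS step_results (
        execution_id TEXT,
        step_name    TEXT,
        status       TEXT,
        output       TEXT,
        PRIMARY KEY (execution_id, step_name)
    )
""")

def record_step(execution_id, step_name, status, output=""):
    # Upsert keyed on (execution_id, step_name): after a crash, a replayed
    # run can query this table and skip steps already marked completed.
    conn.execute(
        "INSERT OR REPLACE INTO step_results VALUES (?, ?, ?, ?)",
        (execution_id, step_name, status, output),
    )
    conn.commit()

record_step("exec_abc123", "greet", "completed", "Hello")
row = conn.execute(
    "SELECT status FROM step_results WHERE execution_id=? AND step_name=?",
    ("exec_abc123", "greet"),
).fetchone()
print(row[0])  # completed
```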

🗺️ Roadmap

  • v0.1 - Core engine: step execution, state persistence, crash recovery
  • v0.5 - Retries: exponential backoff, jitter, Dead Letter Queue
  • v0.8 - Control flow: conditional routing, parallel execution, loops
  • v1.0 - Full feature set: HTTP API, TUI dashboard, plugin system
  • v1.1 - Workflow visualization: execution graph generation
  • v1.2 - Distributed mode: multi-node task dispatch
  • v1.5 - Web UI: browser-based management interface
  • v2.0 - Cloud native: Kubernetes Operator, event-driven architecture

📦 Packaging and Deployment

📌 Installing as a Python Package

# Install from source
git clone https://github.com/your-repo/flowpilot.git
cd flowpilot
pip install -e .

# Verify the installation
flowpilot --version

setup.py configures a console_scripts entry point, so the flowpilot command is registered on your PATH after installation.

🐳 Docker Deployment

Dockerfile:

FROM python:3.11-slim

WORKDIR /app

COPY . .
RUN pip install -e .

# Create working directories
RUN mkdir -p /data/workflows /data/plugins

ENV PYTHONPATH=/app
ENV PYTHONUNBUFFERED=1

ENTRYPOINT ["flowpilot"]
CMD ["serve", "--host", "0.0.0.0", "--port", "8080"]

Build and run:

# Build the image
docker build -t flowpilot:latest .

# Run the API server
docker run -d \
  --name flowpilot \
  -p 8080:8080 \
  -v $(pwd)/workflows:/data/workflows \
  -v flowpilot-data:/app \
  flowpilot:latest

# Run a workflow
docker run --rm \
  -v $(pwd)/workflows:/data/workflows \
  flowpilot:latest run /data/workflows/pipeline.yaml --wait

🔧 Using FlowPilot as a Library

FlowPilot can also be called directly from Python code:

from flowpilot.engine import WorkflowEngine

# Create an engine instance
engine = WorkflowEngine(db_path="my_workflows.db")
engine.start()

# Submit a workflow
execution = engine.submit(
    "path/to/workflow.yaml",
    input_data={"key": "value"},
    wait=True,
)

print(f"Status: {execution.status.value}")
print(f"Step results: {execution.step_results}")

# Query execution records
records = engine.list_executions(status="completed", limit=10)

# Clean up
engine.stop()

⚙️ Production Recommendations

  • Use --db to point at a durable SQLite path, and make sure the data directory has enough disk space
  • Use --log-level DEBUG for troubleshooting; in production, prefer INFO or WARNING
  • Manage the flowpilot serve process with systemd or supervisor so the service restarts automatically
  • Periodically run flowpilot purge --completed to clean up finished execution records
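The systemd suggestion can look like the unit below. Everything here other than the documented serve and --db flags (paths, user, install target) is a placeholder to adapt to your environment:

```ini
[Unit]
Description=FlowPilot workflow API server
After=network.target

[Service]
ExecStart=/usr/local/bin/flowpilot serve --host 0.0.0.0 --port 8080 --db /var/lib/flowpilot/flowpilot.db
WorkingDirectory=/var/lib/flowpilot
User=flowpilot
Restart=on-failure

[Install]
WantedBy=multi-user.target
```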

🤝 Contributing

All contributions are welcome and appreciated, whether bug reports, documentation improvements, or code.

🔄 Contribution Workflow

  1. Fork the repository
  2. Create a feature branch: git checkout -b feature/my-new-feature
  3. Write code and add tests
  4. Make sure the tests pass: python -m pytest tests/
  5. Commit your changes: git commit -m 'feat: add my new feature'
  6. Push the branch: git push origin feature/my-new-feature
  7. Open a Pull Request

📏 Code Style

  • Follow PEP 8
  • All public methods need docstrings
  • New features need accompanying unit tests
  • Commit messages should follow the Conventional Commits format

🐛 Reporting Issues

Please file bug reports and feature requests via GitHub Issues, including where possible:

  • A description of the problem and steps to reproduce
  • The workflow YAML file (if relevant)
  • Environment details (Python version, operating system)
  • Relevant log output

📄 License

This project is released under the MIT License.

MIT License

Copyright (c) 2026 FlowPilot Contributors

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

You are free to use, modify, and distribute this software, provided the copyright and permission notices are retained.



English

🎉 Project Introduction

FlowPilot is a lightweight durable workflow orchestration engine delivered as a CLI tool. Built entirely with the Python standard library, it has zero external dependencies and works out of the box.

Whether you need to orchestrate data processing pipelines, automate DevOps tasks, or build CI/CD workflows, FlowPilot provides reliable workflow execution with minimal setup.

💡 Core Philosophy: Simple, reliable, recoverable. Let workflow orchestration get back to basics -- no more wrestling with complex dependencies and configurations.

🏗️ Architecture Overview

┌─────────────────────────────────────────────────┐
│                   CLI / TUI                      │
│            flowpilot run / dashboard             │
├─────────────────────────────────────────────────┤
│              HTTP REST API Server                │
│              flowpilot serve                     │
├─────────────────────────────────────────────────┤
│              Workflow Engine                     │
│  ┌──────────┬──────────┬──────────┬───────────┐ │
│  │  Router   │ Executor │  Retry   │    DLQ     │ │
│  └──────────┴──────────┴──────────┴───────────┘ │
├─────────────────────────────────────────────────┤
│           SQLite Persistent Storage              │
├─────────────────────────────────────────────────┤
│           Plugin System (Extensible)             │
└─────────────────────────────────────────────────┘

✨ Core Features

Feature Description
📦 Zero Dependencies Pure Python standard library -- no third-party packages needed
📝 YAML Workflow Definitions Intuitive YAML files for clear, readable workflow descriptions
🔧 9 Built-in Step Types shell, http_request, sleep, transform, condition, parallel, foreach, sub_workflow, notify
🔄 Exponential Backoff Retry Configurable retry with jitter for resilient execution
📬 Dead Letter Queue (DLQ) Failed steps are captured for inspection and retry
Parallel Execution ThreadPoolExecutor-based concurrent step execution
🔀 Conditional Routing Dynamic workflow branching with multiple operators
💾 SQLite Persistent Storage Zero-config state persistence out of the box
🌐 Built-in HTTP REST API Lightweight HTTP server for remote management
📊 TUI Dashboard Real-time terminal monitoring with curses
🔌 Plugin System Extend with custom step handlers
🛡️ Crash Recovery Interrupted workflows resume automatically
📐 Template Variable Resolution Reference upstream step outputs with ${steps.step_name.output}

🚀 Quick Start

📋 Requirements

  • Python 3.8+ (nothing else required)

🔧 Installation

Option 1: pip install (recommended)

# Clone the repository
git clone https://github.com/your-repo/flowpilot.git
cd flowpilot

# Install in development mode
pip install -e .

Option 2: Direct usage

# Add the project to PYTHONPATH
export PYTHONPATH=/path/to/flowpilot:$PYTHONPATH

# Run via Python module
python -m flowpilot --version

Option 3: Docker

FROM python:3.11-slim
WORKDIR /app
COPY . .
RUN pip install -e .
ENTRYPOINT ["flowpilot"]
docker build -t flowpilot .
docker run -v $(pwd)/workflows:/workflows flowpilot run /workflows/my-pipeline.yaml

🏃 Run Your First Workflow

Create a workflow file hello.yaml:

name: hello-flowpilot
version: "1.0.0"
description: "My first FlowPilot workflow"
tags: [demo, getting-started]

steps:
  - name: greet
    type: shell
    config:
      command: "echo 'Hello, FlowPilot! 🎉'"
    timeout: 10

  - name: get-time
    type: shell
    config:
      command: "date '+%Y-%m-%d %H:%M:%S'"
    timeout: 10

  - name: done
    type: notify
    config:
      message: "Workflow completed successfully!"
      level: info

Run it:

# Execute and wait for results
flowpilot run hello.yaml --wait

# Run asynchronously
flowpilot run hello.yaml

# Run with input parameters
flowpilot run hello.yaml --input name=Alice --input env=production

Example output:

Execution exec_abc123 finished: completed
  [+] greet: completed (45ms)
      Output: Hello, FlowPilot! 🎉
  [+] get-time: completed (12ms)
      Output: 2026-05-07 14:30:00
  [+] done: completed (1ms)

📖 Detailed Usage Guide

📝 Workflow YAML Format

A complete FlowPilot workflow definition:

name: my-workflow          # Workflow name (required)
version: "1.0.0"           # Version (required)
description: "Workflow description"  # Description
tags: [tag1, tag2]         # Tags (optional)

steps:                     # Step list (required)
  - name: step-one         # Step name (unique identifier)
    type: shell            # Step type
    config:                # Step configuration
      command: "echo 'hi'"
    retry:                 # Retry strategy (optional)
      max_retries: 3
      backoff_base: 2.0
      backoff_max: 30.0
      jitter: true
    timeout: 30            # Timeout in seconds
    on_failure: dlq        # Failure policy: dlq / skip / abort (default: abort)

🔧 Step Types

1. shell - Execute Shell Commands
- name: run-script
  type: shell
  config:
    command: "python script.py --arg value"
    cwd: "/path/to/project"       # Working directory
    env:                           # Environment variables
      API_KEY: "${input.api_key}"
    shell: true                    # Use shell (default: true)
    timeout: 60                    # Timeout in seconds
2. http_request - Send HTTP Requests
- name: fetch-data
  type: http_request
  config:
    url: "https://api.example.com/data"
    method: "GET"                  # GET / POST / PUT / DELETE
    headers:
      Content-Type: "application/json"
      Authorization: "Bearer ${input.token}"
    body: '{"key": "value"}'       # Request body
    timeout: 30
3. sleep - Wait / Delay
- name: wait-a-bit
  type: sleep
  config:
    seconds: 5                     # Seconds to wait
4. transform - Data Transformation
- name: parse-json
  type: transform
  config:
    operation: "json_parse"        # json_parse / json_extract / template / base64_encode / base64_decode
    input: "${steps.fetch-data.output}"
5. condition - Conditional Routing
- name: check-env
  type: shell
  config:
    command: "echo 'production'"

- name: route
  type: condition
  config:
    expression: "equals"           # equals / not_equals / contains / greater_than / less_than / regex / is_empty / is_not_empty
    left: "${steps.check-env.output}"
    right: "production"
  branches:
    true: [prod-deploy]            # Steps to run when condition is true
    false: [dev-deploy]            # Steps to run when condition is false

- name: prod-deploy
  type: shell
  config:
    command: "echo 'Deploying to production...'"

- name: dev-deploy
  type: shell
  config:
    command: "echo 'Deploying to development...'"
6. parallel - Parallel Execution
- name: parallel-tasks
  type: parallel
  config:
    max_workers: 4                 # Maximum concurrency
    steps:
      - name: task-a
        type: shell
        config:
          command: "echo 'Task A'"
      - name: task-b
        type: shell
        config:
          command: "echo 'Task B'"
      - name: task-c
        type: shell
        config:
          command: "sleep 3 && echo 'Task C'"
7. foreach - Loop Iteration
- name: process-items
  type: foreach
  config:
    items: ["apple", "banana", "cherry"]   # Can also reference a variable: ${steps.get-list.output}
    item_var: item                          # Loop variable name
    steps:
      - name: process
        type: shell
        config:
          command: "echo 'Processing: ${item}'"
8. sub_workflow - Sub-workflow
- name: run-sub
  type: sub_workflow
  config:
    workflow: "path/to/another-workflow.yaml"
    input:                                # Parameters passed to the sub-workflow
      parent_id: "${execution.id}"
      data: "${steps.prepare.output}"
9. notify - Notification
- name: send-notification
  type: notify
  config:
    message: "Deployment complete! Build: ${steps.build.output}"
    level: info                           # info / warning / error
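References like `${steps.build.output}` in the message above are template variables. A hypothetical resolver that walks a nested context dict is sketched here; the real engine may resolve additional scopes such as `${execution.id}`:

```python
import re

def resolve_templates(text, context):
    """Replace ${dotted.path} placeholders with values looked up in a
    nested dict. Hypothetical sketch of template resolution."""
    def lookup(match):
        value = context
        for key in match.group(1).split("."):
            value = value[key]
        return str(value)
    return re.sub(r"\$\{([^}]+)\}", lookup, text)
```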

🖥️ CLI Command Reference

# Run a workflow
flowpilot run <workflow.yaml> [--input key=value...] [--wait]

# List executions
flowpilot list [--status STATUS] [--workflow NAME] [--limit N]

# Check execution status
flowpilot status <execution_id>

# Cancel an execution
flowpilot cancel <execution_id>

# Retry a failed execution
flowpilot retry <execution_id> [--step STEP_NAME]

# Dead Letter Queue management
flowpilot dlq [--list] [--retry ID] [--purge] [--limit N]

# Start HTTP API server
flowpilot serve [--host HOST] [--port PORT]

# Open TUI dashboard
flowpilot dashboard

# Purge execution records
flowpilot purge [--all] [--completed] [--before DATE]

# List saved workflows
flowpilot workflows

# Show version
flowpilot --version

# Show help
flowpilot --help

Global Options:

| Flag | Description | Default |
| --- | --- | --- |
| `--db` | SQLite database path | `.flowpilot.db` |
| `--plugins-dir` | Plugin directory path | `./plugins` |
| `--log-level` | Logging level | `INFO` |

🌐 HTTP REST API

Start the API server:

flowpilot serve --host 0.0.0.0 --port 8080

Key API endpoints:

| Method | Path | Description |
| --- | --- | --- |
| POST | `/api/v1/workflows/run` | Submit a workflow execution |
| GET | `/api/v1/executions` | List executions |
| GET | `/api/v1/executions/:id` | Get execution details |
| POST | `/api/v1/executions/:id/cancel` | Cancel an execution |
| POST | `/api/v1/executions/:id/retry` | Retry an execution |
| GET | `/api/v1/dlq` | List Dead Letter Queue items |
| POST | `/api/v1/dlq/:id/retry` | Retry a DLQ item |
| DELETE | `/api/v1/dlq` | Purge the Dead Letter Queue |
| GET | `/api/v1/workflows` | List registered workflows |

Example: Submit a workflow via API

curl -X POST http://localhost:8080/api/v1/workflows/run \
  -H "Content-Type: application/json" \
  -d '{
    "workflow": "hello.yaml",
    "input": {"name": "FlowPilot"}
  }'
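In keeping with the zero-dependency theme, the same call can be made from Python using only `urllib`. The endpoint path comes from the table above; everything else here (helper names, response shape) is an illustrative assumption:

```python
import json
import urllib.request

def build_run_request(base_url, workflow, input_data):
    """Build the POST request for /api/v1/workflows/run."""
    body = json.dumps({"workflow": workflow, "input": input_data}).encode()
    return urllib.request.Request(
        f"{base_url}/api/v1/workflows/run",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

def submit_workflow(base_url, workflow, input_data):
    """Send the request and return the decoded JSON response."""
    with urllib.request.urlopen(build_run_request(base_url, workflow, input_data)) as resp:
        return json.load(resp)
```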

📊 TUI Dashboard

Launch the terminal dashboard for real-time workflow monitoring:

flowpilot dashboard

Features:

  • 📋 Live execution list with status display
  • 🔄 Auto-refresh for running workflows
  • 📈 Step-level execution details
  • Press Q to quit

🔌 Plugin System

FlowPilot can be extended with custom step types through its plugin system.

Creating a plugin:

Create a Python file in the plugins/ directory, e.g. plugins/my_steps.py:

"""Custom step handler plugin."""

def register(handlers):
    """Register custom step types."""

    def ssh_exec(config, context):
        """Execute remote SSH commands."""
        host = config.get("host")
        cmd = config.get("command")
        # Implement your SSH logic...
        return {"output": f"Executed on {host}: {cmd}"}

    def sql_query(config, context):
        """Execute SQL queries."""
        db_url = config.get("database")
        query = config.get("query")
        # Implement your query logic...
        return {"output": "Query results..."}

    # Register step handlers
    handlers["ssh_exec"] = ssh_exec
    handlers["sql_query"] = sql_query

Using custom steps:

steps:
  - name: remote-deploy
    type: ssh_exec
    config:
      host: "deploy.example.com"
      command: "docker-compose up -d"

Specifying the plugin directory:

flowpilot run workflow.yaml --plugins-dir /path/to/my/plugins
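Under the hood, a plugin directory like this can be scanned with `importlib`. The loader below is a hypothetical sketch of that mechanism, not FlowPilot's actual code:

```python
import importlib.util
from pathlib import Path

def load_plugins(plugins_dir, handlers=None):
    """Import every *.py file in plugins_dir and call its register(handlers).
    Hypothetical sketch of a plugin loader."""
    handlers = {} if handlers is None else handlers
    for path in sorted(Path(plugins_dir).glob("*.py")):
        spec = importlib.util.spec_from_file_location(path.stem, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        if hasattr(module, "register"):
            module.register(handlers)
    return handlers
```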

💡 Design Philosophy & Roadmap

🎯 Design Philosophy

  1. Zero Dependencies -- Only the Python standard library is used, eliminating version conflicts and environment configuration headaches
  2. Durability First -- All state is written to SQLite in real time, ensuring complete recovery after crashes
  3. Convention over Configuration -- Sensible defaults work out of the box, minimizing unnecessary config
  4. Progressive Complexity -- Simple syntax for simple cases, with enough power for complex scenarios
  5. Observability -- Rich CLI output, TUI dashboard, and REST API for full execution visibility

🗺️ Roadmap

  • v0.1 - Core engine: step execution, state persistence, crash recovery
  • v0.5 - Retry mechanism: exponential backoff, jitter, dead letter queue
  • v0.8 - Control flow: conditional routing, parallel execution, loop iteration
  • v1.0 - Full feature set: HTTP API, TUI dashboard, plugin system
  • v1.1 - Workflow visualization: execution flow graph generation
  • v1.2 - Distributed mode: multi-node task distribution
  • v1.5 - Web UI: browser-based management interface
  • v2.0 - Cloud native: Kubernetes Operator, event-driven architecture
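The exponential backoff with jitter from the v0.5 milestone (and the retry feature above) fits in a few lines; the base and cap values here are illustrative defaults, not FlowPilot's actual ones:

```python
import random

def backoff_delay(attempt, base=1.0, cap=60.0):
    """Exponential backoff with full jitter: the delay ceiling doubles
    each attempt (capped), and a uniform random delay below it is drawn
    so simultaneous retries spread out instead of stampeding."""
    return random.uniform(0, min(cap, base * 2 ** attempt))
```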

📦 Packaging & Deployment Guide

📌 Install as a Python Package

# Install from source
git clone https://github.com/your-repo/flowpilot.git
cd flowpilot
pip install -e .

# Verify installation
flowpilot --version

The setup.py includes a console_scripts entry point, so the flowpilot command is automatically registered to your system PATH after installation.

🐳 Docker Deployment

Dockerfile:

FROM python:3.11-slim

WORKDIR /app

COPY . .
RUN pip install -e .

# Create working directories
RUN mkdir -p /data/workflows /data/plugins

ENV PYTHONPATH=/app
ENV PYTHONUNBUFFERED=1

ENTRYPOINT ["flowpilot"]
CMD ["serve", "--host", "0.0.0.0", "--port", "8080"]

Build and run:

# Build the image
docker build -t flowpilot:latest .

# Run the API server (persist state under /data; mounting a volume over /app
# would shadow the installed package)
docker run -d \
  --name flowpilot \
  -p 8080:8080 \
  -v $(pwd)/workflows:/data/workflows \
  -v flowpilot-data:/data \
  flowpilot:latest serve --host 0.0.0.0 --port 8080 --db /data/flowpilot.db

# Execute a workflow
docker run --rm \
  -v $(pwd)/workflows:/data/workflows \
  flowpilot:latest run /data/workflows/pipeline.yaml --wait

🔧 Use as a Library

FlowPilot can also be used directly as a Python library:

from flowpilot.engine import WorkflowEngine

# Create an engine instance
engine = WorkflowEngine(db_path="my_workflows.db")
engine.start()

# Submit a workflow
execution = engine.submit(
    "path/to/workflow.yaml",
    input_data={"key": "value"},
    wait=True,
)

print(f"Status: {execution.status.value}")
print(f"Step results: {execution.step_results}")

# Query execution records
records = engine.list_executions(status="completed", limit=10)

# Cleanup
engine.stop()

⚙️ Production Recommendations

  • Use --db to specify a persistent SQLite path; ensure the data directory has sufficient disk space
  • Use --log-level DEBUG for troubleshooting; INFO or WARNING is recommended for production
  • Manage the flowpilot serve process with systemd or supervisor for automatic restarts
  • Periodically run flowpilot purge --completed to clean up completed execution records
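For the systemd suggestion, a minimal unit file might look like the following. The binary path, user, and data directory are assumptions; adjust them to your install:

```ini
[Unit]
Description=FlowPilot HTTP API server
After=network.target

[Service]
# Paths and user below are assumptions; point them at your actual install.
ExecStart=/usr/local/bin/flowpilot serve --host 127.0.0.1 --port 8080 --db /var/lib/flowpilot/flowpilot.db
WorkingDirectory=/var/lib/flowpilot
User=flowpilot
Restart=on-failure

[Install]
WantedBy=multi-user.target
```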

🤝 Contributing Guide

We welcome and appreciate contributions of all kinds -- bug reports, documentation improvements, and code patches alike.

🔄 Contribution Workflow

  1. Fork this repository
  2. Create a feature branch: git checkout -b feature/my-new-feature
  3. Write code and add tests
  4. Ensure tests pass: python -m pytest tests/
  5. Commit your changes: git commit -m 'feat: add my new feature'
  6. Push the branch: git push origin feature/my-new-feature
  7. Submit a Pull Request

📏 Code Standards

  • Follow PEP 8 coding conventions
  • All public methods must include docstrings
  • New features must include corresponding unit tests
  • Commit messages should follow the Conventional Commits format

🐛 Reporting Issues

Please submit bug reports or feature requests via GitHub Issues and include:

  • Problem description and reproduction steps
  • Workflow YAML file (if relevant)
  • Runtime environment (Python version, OS)
  • Relevant log output

📄 License

This project is released under the MIT License.

MIT License

Copyright (c) 2026 FlowPilot Contributors

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

You are free to use, modify, and distribute this software. Please retain the copyright notice and license text.


Made with ❤️ by FlowPilot Contributors
