From 2d438adf0b9b4c8ef0f35daa42fda6d004fcfa5a Mon Sep 17 00:00:00 2001 From: yafengzhang2025 <516730181@qq.com> Date: Mon, 12 Jan 2026 17:16:53 +0800 Subject: [PATCH 1/3] feature: support data intelligence orchestation front --- frontend/package-lock.json | 29 +- frontend/package.json | 2 +- .../Orchestration/SmartOrchestration.tsx | 1800 +++++++++++++++++ frontend/src/routes/routes.ts | 4 +- 4 files changed, 1826 insertions(+), 9 deletions(-) create mode 100644 frontend/src/pages/Orchestration/SmartOrchestration.tsx diff --git a/frontend/package-lock.json b/frontend/package-lock.json index f48ffc742..3649d2a5e 100644 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -8,7 +8,7 @@ "name": "edatamate", "version": "0.0.0", "dependencies": { - "@reduxjs/toolkit": "^2.11.0", + "@reduxjs/toolkit": "^2.11.2", "@xyflow/react": "^12.8.3", "antd": "^5.27.0", "jssha": "^3.3.1", @@ -188,6 +188,7 @@ "integrity": "sha512-UlLAnTPrFdNGoFtbSXwcGFQBtQZJCNjaN6hQNP3UPvuNXT1i82N26KL3dZeIpNalWywr9IuQuncaAfUaS1g6sQ==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@ampproject/remapping": "^2.2.0", "@babel/code-frame": "^7.27.1", @@ -1373,9 +1374,9 @@ } }, "node_modules/@reduxjs/toolkit": { - "version": "2.11.0", - "resolved": "https://registry.npmjs.org/@reduxjs/toolkit/-/toolkit-2.11.0.tgz", - "integrity": "sha512-hBjYg0aaRL1O2Z0IqWhnTLytnjDIxekmRxm1snsHjHaKVmIF1HiImWqsq+PuEbn6zdMlkIj9WofK1vR8jjx+Xw==", + "version": "2.11.2", + "resolved": "https://registry.npmjs.org/@reduxjs/toolkit/-/toolkit-2.11.2.tgz", + "integrity": "sha512-Kd6kAHTA6/nUpp8mySPqj3en3dm0tdMIgbttnQ1xFMVpufoj+ADi8pXLBsd4xzTRHQa7t/Jv8W5UnCuW4kuWMQ==", "license": "MIT", "dependencies": { "@standard-schema/spec": "^1.0.0", @@ -2136,6 +2137,7 @@ "integrity": "sha512-DRh5K+ka5eJic8CjH7td8QpYEV6Zo10gfRkjHCO3weqZHWDtAaSTFtl4+VMqOJ4N5jcuhZ9/l+yy8rVgw7BQeQ==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "undici-types": "~7.10.0" } @@ -2153,6 +2155,7 @@ "integrity": "sha512-/LDXMQh55EzZQ0uVAZmKKhfENivEvWz6E+EYzh+/MCjMhNsotd+ZHhBGIjFDTi6+fz0OhQQQLbTgdQIxxCsC0w==", "devOptional": true, "license": "MIT", + "peer": true, "dependencies": { "@types/prop-types": "*", "csstype": "^3.0.2" @@ -2220,6 +2223,7 @@ "integrity": "sha512-pUXGCuHnnKw6PyYq93lLRiZm3vjuslIy7tus1lIQTYVK9bL8XBgJnCWm8a0KcTtHC84Yya1Q6rtll+duSMj0dg==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@typescript-eslint/scope-manager": "8.39.1", "@typescript-eslint/types": "8.39.1", @@ -2518,6 +2522,7 @@ "integrity": "sha512-NZyJarBfL7nWwIq+FDL6Zp/yHEhePMNnnJ0y3qfieCrmNvYct8uvtiV41UvlSe6apAfk0fY1FbWx+NwfmpvtTg==", "dev": true, "license": "MIT", + "peer": true, "bin": { "acorn": "bin/acorn" }, @@ -2766,6 +2771,7 @@ } ], "license": "MIT", + "peer": true, "dependencies": { "caniuse-lite": "^1.0.30001733", "electron-to-chromium": "^1.5.199", @@ -3162,6 +3168,7 @@ "resolved": "https://registry.npmjs.org/d3-selection/-/d3-selection-3.0.0.tgz", "integrity": "sha512-fmTRWbNMmsmWq6xJV8D19U/gw/bwrHfNXxrIN+HfZgnzqTHp9jOmKMhsTUjXOJnZOdZY9Q28y4yebKzqDKlxlQ==", "license": "ISC", + "peer": true, "engines": { "node": ">=12" } @@ -3250,7 +3257,8 @@ "version": "1.11.13", "resolved": "https://registry.npmjs.org/dayjs/-/dayjs-1.11.13.tgz", "integrity": "sha512-oaMBel6gjolK862uaPQOVTA7q3TZhuSvuMQAAglQDOWYO9A91IrAOUJEyKVlqJlHE0vq5p5UXxzdPfMH/x6xNg==", - "license": "MIT" + "license": "MIT", + "peer": true }, "node_modules/debug": { "version": "4.4.1", @@ -3477,6 +3485,7 @@ "integrity": "sha512-TS9bTNIryDzStCpJN93aC5VRSW3uTx9sClUn4B87pwiCaJh220otoI0X8mJKr+VcPtniMdN8GKjlwgWGUv5ZKA==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@eslint-community/eslint-utils": "^4.2.0", "@eslint-community/regexpp": "^4.12.1", @@ -6015,6 +6024,7 @@ "resolved": "https://registry.npmjs.org/react/-/react-18.3.1.tgz", "integrity": "sha512-wS+hAgJShR0KhEvPJArfuPVN1+Hz1t0Y6n5jLrGQbkb4urgPE/0Rve+1kMB1v/oWgHgm4WIcV+i7F2pTVj+2iQ==", "license": "MIT", + "peer": true, "dependencies": { "loose-envify": "^1.1.0" }, @@ -6027,6 +6037,7 @@ "resolved": "https://registry.npmjs.org/react-dom/-/react-dom-18.3.1.tgz", "integrity": "sha512-5m4nQKp+rZRb09LNH59GM4BxTh9251/ylbKIbpe7TpGxfJ+9kv6BLkLBXIjjspbgbnIBNqlI23tRnTWT0snUIw==", "license": "MIT", + "peer": true, "dependencies": { "loose-envify": "^1.1.0", "scheduler": "^0.23.2" @@ -6046,6 +6057,7 @@ "resolved": "https://registry.npmjs.org/react-redux/-/react-redux-9.2.0.tgz", "integrity": "sha512-ROY9fvHhwOD9ySfrF0wmvu//bKCQ6AeZZq1nJNtbDC+kk5DuSuNX/n6YWYF/SYy7bSba4D4FSz8DJeKY/S/r+g==", "license": "MIT", + "peer": true, "dependencies": { "@types/use-sync-external-store": "^0.0.6", "use-sync-external-store": "^1.4.0" @@ -6176,7 +6188,8 @@ "version": "5.0.1", "resolved": "https://registry.npmjs.org/redux/-/redux-5.0.1.tgz", "integrity": "sha512-M9/ELqF6fy8FwmkpnF0S3YKOqMyoWJ4+CS5Efg2ct3oY9daQvd/Pc71FpGZsVsbl3Cpb+IIcjBDUnnyBdQbq4w==", - "license": "MIT" + "license": "MIT", + "peer": true }, "node_modules/redux-thunk": { "version": "3.1.0", @@ -6760,6 +6773,7 @@ "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "dev": true, "license": "MIT", + "peer": true, "engines": { "node": ">=12" }, @@ -6863,6 +6877,7 @@ "integrity": "sha512-p1diW6TqL9L07nNxvRMM7hMMw4c5XOo/1ibL4aAIGmSAt9slTE1Xgw5KWuof2uTOvCg9BY7ZRi+GaF+7sfgPeQ==", "dev": true, "license": "Apache-2.0", + "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -7030,6 +7045,7 @@ "integrity": "sha512-J0SQBPlQiEXAF7tajiH+rUooJPo0l8KQgyg4/aMunNtrOa7bwuZJsJbDWzeljqQpgftxuq5yNJxQ91O9ts29UQ==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "esbuild": "^0.25.0", "fdir": "^6.4.6", @@ -7120,6 +7136,7 @@ "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "dev": true, "license": "MIT", + "peer": true, "engines": { "node": ">=12" }, diff --git a/frontend/package.json b/frontend/package.json index 5d44f3bae..d781ac0be 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -11,7 +11,7 @@ "preview": "vite preview" }, "dependencies": { - "@reduxjs/toolkit": "^2.11.0", + "@reduxjs/toolkit": "^2.11.2", "@xyflow/react": "^12.8.3", "antd": "^5.27.0", "jssha": "^3.3.1", diff --git a/frontend/src/pages/Orchestration/SmartOrchestration.tsx b/frontend/src/pages/Orchestration/SmartOrchestration.tsx new file mode 100644 index 000000000..59f91d9b5 --- /dev/null +++ b/frontend/src/pages/Orchestration/SmartOrchestration.tsx @@ -0,0 +1,1800 @@ +"use client" + +import type React from "react" +import { useState, useCallback } from "react" +import { + ReactFlow, + MiniMap, + Controls, + Background, + useNodesState, + useEdgesState, + addEdge, + Handle, + Position, + type Connection, + type Node, + type NodeTypes, + BackgroundVariant, +} from "@xyflow/react" +import "@xyflow/react/dist/style.css" +import { Card, Button, Input, Badge, Tabs, Switch, Divider, Select } from "antd" +import TextArea from "antd/es/input/TextArea" +import { + Play, + Save, + ArrowLeft, + Database, + Trash2, + Copy, + Search, + Plus, + Clock, + Zap, + BarChart3, + Brain, + FileText, + Target, + Sparkles, + Settings, + Filter, + BookOpen, + TrendingUp, + GitBranch, + Eye, + CheckCircle, + AlertCircle, + PlayCircle, + Code, +} from "lucide-react" + +interface NodeConfig { + name: string + description: string + pythonCode: string + // Removed schedule from NodeConfig interface +} + +// Removed all specific config interfaces +// Removed DataCollectionConfig, DataCleaningConfig, DataRatioConfig, KnowledgeGenerationConfig, ModelTrainingConfig, ModelReportConfig + +const SmartOrchestrationNode = ({ data, selected }: { data: any; selected: boolean }) => { + const [isHovered, setIsHovered] = useState(false) + + const getNodeIcon = (type: string) => { + const icons: Record = { + "data-collection": , + "data-cleaning": , + "data-ratio": , + "knowledge-generation": , + "model-training": , + "model-report": , + } + return icons[type] || + } + + const getNodeColor = (type: string) => { + const colors: Record = { + "data-collection": { bg: "bg-blue-50", border: "border-blue-300", icon: "text-blue-600" }, + "data-cleaning": { bg: "bg-green-50", border: "border-green-300", icon: "text-green-600" }, + "data-ratio": { bg: "bg-purple-50", border: "border-purple-300", icon: "text-purple-600" }, + "knowledge-generation": { bg: "bg-orange-50", border: "border-orange-300", icon: "text-orange-600" }, + "model-training": { bg: "bg-red-50", border: "border-red-300", icon: "text-red-600" }, + "model-report": { bg: "bg-cyan-50", border: "border-cyan-300", icon: "text-cyan-600" }, + } + return colors[type] || { bg: "bg-gray-50", border: "border-gray-300", icon: "text-gray-600" } + } + + const nodeColor = getNodeColor(data.type) + + return ( +
setIsHovered(true)} onMouseLeave={() => setIsHovered(false)}> + {/* Connection handles */} + {(selected || isHovered) && ( + <> + + + + )} + + +
+
+
+
+
{getNodeIcon(data.type)}
+
+
+

{data.name}

+

{data.description}

+
+
+
+
+
+
+
+
+
+ 状态 + + {data.config?.enabled ? "已启用" : "已禁用"} + +
+
+ 点击节点查看详细配置 +
+
+
+
+
+ ) +} + +const nodeTypes: NodeTypes = { + smartNode: SmartOrchestrationNode, +} + +const nodeTemplates = [ + { + type: "data-collection", + name: "数据归集", + description: "从多种数据源收集和同步数据", + icon: Database, + category: "数据源", + // Removed defaultConfig, replaced by defaultConfigs object later + }, + { + type: "data-cleaning", + name: "数据清洗", + description: "清洗和预处理数据", + icon: Filter, + category: "数据处理", + // Removed defaultConfig + }, + { + type: "data-ratio", + name: "数据配比", + description: "划分训练集、验证集和测试集", + icon: BarChart3, + category: "数据处理", + // Removed defaultConfig + }, + { + type: "knowledge-generation", + name: "知识生成", + description: "生成知识库和向量化", + icon: Brain, + category: "知识处理", + // Removed defaultConfig + }, + { + type: "model-training", + name: "模型训练", + description: "训练机器学习模型", + icon: Target, + category: "模型", + // Removed defaultConfig + }, + { + type: "model-report", + name: "模型报告", + description: "生成模型评估报告", + icon: FileText, + category: "输出", + // Removed defaultConfig + }, +] + +const workflowTemplates = [ + { + id: "data-flywheel", + name: "数据飞轮 - 持续学习流程", + description: "自动化数据收集、清洗、训练的持续学习循环", + category: "智能学习", + icon: , + schedule: "0 0 */6 * * ?", // Every 6 hours + nodes: [ + { type: "data-collection", position: { x: 100, y: 200 } }, + { type: "data-cleaning", position: { x: 450, y: 200 } }, + { type: "data-ratio", position: { x: 800, y: 200 } }, + { type: "model-training", position: { x: 1150, y: 200 } }, + { type: "model-report", position: { x: 1500, y: 200 } }, + ], + }, + { + id: "knowledge-pipeline", + name: "知识库构建流程", + description: "秒级知识库检索和更新流程", + category: "知识管理", + icon: , + schedule: "*/30 * * * * ?", // Every 30 seconds + nodes: [ + { type: "data-collection", position: { x: 100, y: 200 } }, + { type: "data-cleaning", position: { x: 450, y: 200 } }, + { type: "knowledge-generation", position: { x: 800, y: 200 } }, + ], + }, + { + id: "full-pipeline", + name: "完整AI流程", + description: "从数据收集到模型报告的完整流程", + category: "综合", + icon: , + schedule: "0 0 0 * * ?", // Daily at midnight + nodes: [ + { type: "data-collection", position: { x: 100, y: 150 } }, + { type: "data-cleaning", position: { x: 450, y: 150 } }, + { type: "data-ratio", position: { x: 800, y: 50 } }, + { type: "knowledge-generation", position: { x: 800, y: 250 } }, + { type: "model-training", position: { x: 1150, y: 150 } }, + { type: "model-report", position: { x: 1500, y: 150 } }, + ], + }, +] + +const mockExecutions = [ + { + id: 1, + workflowName: "数据飞轮 - 持续学习流程", + status: "running", + startTime: "2025-11-26 14:30:00", + progress: 60, + currentNode: "model-training", + }, + { + id: 2, + workflowName: "知识库构建流程", + status: "success", + startTime: "2025-11-26 14:28:30", + endTime: "2025-11-26 14:29:15", + duration: "45s", + }, + { + id: 3, + workflowName: "完整AI流程", + status: "failed", + startTime: "2025-11-26 12:00:00", + endTime: "2025-11-26 12:15:32", + duration: "15m 32s", + error: "数据清洗节点执行失败: 字段格式错误", + }, +] + +interface SmartOrchestrationProps { + onBack?: () => void +} + +export default function SmartOrchestrationPage({ onBack }: SmartOrchestrationProps) { + const [view, setView] = useState<"list" | "editor" | "execution">("list") + const [selectedExecutionId, setSelectedExecutionId] = useState(null) + const [workflow, setWorkflow] = useState({ + id: Date.now(), + name: "新建流程", + description: "", + category: "自定义", + schedule: { + enabled: false, + cronExpression: "0 0 * * * ?", + timezone: "Asia/Shanghai", + triggerType: "manual" as "manual" | "schedule" | "api", + }, + }) + + const [nodes, setNodes, onNodesChange] = useNodesState([]) + const [edges, setEdges, onEdgesChange] = useEdgesState([]) + const [selectedNodeId, setSelectedNodeId] = useState(null) + const [searchTerm, setSearchTerm] = useState("") + + const filteredTemplates = nodeTemplates.filter( + (t) => + t.name.toLowerCase().includes(searchTerm.toLowerCase()) || + t.description.toLowerCase().includes(searchTerm.toLowerCase()), + ) + + const onConnect = useCallback( + (params: Connection) => { + if (params.source === params.target) return + + const newEdge = { + ...params, + id: `edge-${params.source}-${params.target}-${Date.now()}`, + type: "smoothstep", + animated: true, + style: { stroke: "#3b82f6", strokeWidth: 2 }, + markerEnd: { type: "arrowclosed" as const, color: "#3b82f6" }, + } + + setEdges((eds) => addEdge(newEdge, eds)) + }, + [setEdges], + ) + + const deleteNode = useCallback( + (nodeId: string) => { + setNodes((nds) => nds.filter((node) => node.id !== nodeId)) + setEdges((eds) => eds.filter((edge) => edge.source !== nodeId && edge.target !== nodeId)) + }, + [setNodes, setEdges], + ) + + const duplicateNode = useCallback( + (nodeId: string) => { + const node = nodes.find((n) => n.id === nodeId) + if (!node) return + + const newNode: Node = { + ...node, + id: `${node.data.type}_${Date.now()}`, + position: { x: node.position.x + 50, y: node.position.y + 50 }, + data: { ...node.data, id: `${node.data.type}_${Date.now()}` }, + } + + setNodes((nds) => [...nds, newNode]) + }, + [nodes, setNodes], + ) + + const onDragStart = (event: React.DragEvent, nodeType: string) => { + event.dataTransfer.setData("application/reactflow", nodeType) + event.dataTransfer.effectAllowed = "move" + } + + const onDragOver = useCallback((event: React.DragEvent) => { + event.preventDefault() + event.dataTransfer.dropEffect = "move" + }, []) + + const onDrop = useCallback( + (event: React.DragEvent) => { + event.preventDefault() + + const type = event.dataTransfer.getData("application/reactflow") + if (!type) return + + const position = { x: event.clientX - 400, y: event.clientY - 100 } + const template = nodeTemplates.find((t) => t.type === type) + if (!template) return + + // Find the default config from the new defaultConfigs object + const defaultConfig = defaultConfigs[type] + if (!defaultConfig) return // Should not happen if template exists + + const newNode: Node = { + id: `${type}_${Date.now()}`, + type: "smartNode", + position, + data: { + id: `${type}_${Date.now()}`, + type, + name: template.name, + description: template.description, + config: { ...defaultConfig }, // Use default config from the new object + onDelete: deleteNode, + onDuplicate: duplicateNode, + }, + } + + setNodes((nds) => [...nds, newNode]) + }, + [setNodes, deleteNode, duplicateNode], + ) + + const loadTemplate = (templateId: string) => { + const template = workflowTemplates.find((t) => t.id === templateId) + if (!template) return + + const newNodes: Node[] = template.nodes + .map((nodeSpec, index) => { + const nodeTemplate = nodeTemplates.find((t) => t.type === nodeSpec.type) + if (!nodeTemplate) return null + + // Find the default config from the new defaultConfigs object + const defaultConfig = defaultConfigs[nodeSpec.type] + if (!defaultConfig) return null // Should not happen + + return { + id: `${nodeSpec.type}_${Date.now()}_${index}`, + type: "smartNode", + position: nodeSpec.position, + data: { + id: `${nodeSpec.type}_${Date.now()}_${index}`, + type: nodeSpec.type, + name: nodeTemplate.name, + description: nodeTemplate.description, + config: { ...defaultConfig }, // Use default config from the new object + onDelete: deleteNode, + onDuplicate: duplicateNode, + }, + } + }) + .filter(Boolean) as Node[] + + // Connect nodes sequentially + const newEdges = newNodes.slice(0, -1).map((node, index) => ({ + id: `edge-${node.id}-${newNodes[index + 1].id}`, + source: node.id, + target: newNodes[index + 1].id, + type: "smoothstep", + animated: true, + style: { stroke: "#3b82f6", strokeWidth: 2 }, + markerEnd: { type: "arrowclosed" as const, color: "#3b82f6" }, + })) + + setNodes(newNodes) + setEdges(newEdges) + setWorkflow((prev) => ({ + ...prev, + name: template.name, + description: template.description, + category: template.category, + schedule: { + ...prev.schedule, + enabled: true, + cronExpression: template.schedule, + triggerType: "schedule", + }, + })) + setView("editor") + } + + const handleSave = () => { + console.log("[v0] Saving workflow:", { workflow, nodes, edges }) + // Save logic here + setView("list") + } + + const handleBack = () => { + if (view === "editor" || view === "execution") { + setView("list") + } else { + onBack?.() + } + } + + const handleRunWorkflow = () => { + console.log("[v0] Running workflow:", { workflow, nodes, edges }) + // Here you would trigger the workflow execution + } + + const selectedNode = selectedNodeId ? nodes.find((n) => n.id === selectedNodeId) : null + + if (view === "execution") { + return + } + + if (view === "list") { + return ( +
+ {/* Header */} +
+
+

数据智能编排

+

可视化设计和管理数据处理流程,支持定时调度和实时触发

+
+
+ {onBack && ( + + )} + +
+
+ + {/* Quick Stats */} +
+ +
+
+ +
+
+

12

+

活跃流程

+
+
+
+ +
+
+ +
+
+

156

+

成功执行

+
+
+
+ +
+
+ +
+
+

3

+

运行中

+
+
+
+ +
+
+ +
+
+

8

+

定时任务

+
+
+
+
+ + {/* Workflow Templates */} + +
+

+ + 流程模板 +

+

快速开始,使用预定义的流程模板

+
+
+
+ {workflowTemplates.map((template) => ( + +
+
+
+
+ {template.icon} +
+
+

{template.name}

+ + {template.category} + +
+
+

{template.description}

+
+ + {template.schedule} +
+
+ {template.nodes.length} 个节点 + +
+
+
+
+ ))} +
+
+
+ + +
+

+ + 执行历史 +

+

查看流程执行记录和运行状态

+
+
+
+ {mockExecutions.map((execution) => ( + { + setSelectedExecutionId(execution.id) + setView("execution") + }} + > +
+
+
+
+

{execution.workflowName}

+ + {execution.status === "success" + ? "成功" + : execution.status === "running" + ? "运行中" + : "失败"} + +
+
+ 开始时间: {execution.startTime} + {execution.duration && 耗时: {execution.duration}} +
+ {execution.status === "running" && ( +
+
+ 进度: {execution.progress}% + 当前: {execution.currentNode} +
+
+
+
+
+ )} + {execution.error && ( +
+ + {execution.error} +
+ )} +
+ +
+
+ + ))} +
+
+ +
+ ) + } + + return ( +
+ {/* Header */} +
+
+
+ + +
+ setWorkflow((prev) => ({ ...prev, name: e.target.value }))} + className="text-lg font-semibold border-none p-0 h-auto bg-transparent focus-visible:ring-0" + /> + setWorkflow((prev) => ({ ...prev, description: e.target.value }))} + className="text-sm text-gray-600 border-none p-0 h-auto bg-transparent focus-visible:ring-0 mt-1" + placeholder="流程描述" + /> +
+
+
+
+ + 定时调度 + + setWorkflow((prev) => ({ ...prev, schedule: { ...prev.schedule, enabled: checked } })) + } + /> +
+ + +
+
+ + {workflow.schedule.enabled && ( +
+
+
+ + +
+ {workflow.schedule.triggerType === "schedule" && ( + <> +
+ + + setWorkflow((prev) => ({ + ...prev, + schedule: { ...prev.schedule, cronExpression: e.target.value }, + })) + } + className="mt-1 font-mono text-sm" + placeholder="0 0 * * * ?" + /> +
+
+ + +
+ + )} +
+ {workflow.schedule.triggerType === "schedule" && ( +
+

常用Cron示例:

+

• */30 * * * * ? - 每30秒执行 (秒级知识库检索)

+

• 0 */5 * * * ? - 每5分钟执行

+

• 0 0 */6 * * ? - 每6小时执行 (数据飞轮)

+

• 0 0 0 * * ? - 每天午夜执行

+
+ )} +
+ )} +
+ + {/* Node Library Sidebar */} +
+
+
+ + setSearchTerm(e.target.value)} + className="pl-10" + /> +
+
+
+
+ {filteredTemplates.map((template) => ( + onDragStart(e, template.type)} + > +
+
+
+ +
+
+

{template.name}

+

{template.description}

+ + {template.category} + +
+
+
+
+ ))} +
+
+
+ + {/* Canvas */} +
+ setSelectedNodeId(node.id)} + onPaneClick={() => setSelectedNodeId(null)} + onDrop={onDrop} + onDragOver={onDragOver} + nodeTypes={nodeTypes} + fitView + className="bg-gray-50" + > + + + + +
+ + {/* Configuration Panel */} + {selectedNode && ( +
+
+

节点配置

+

{String(selectedNode.data.name || '')}

+
+
+
+ + +
+ +
+
+
+
+
+
+ )} +
+ ) +} + +function NodeConfigPanel({ node, setNodes }: any) { + const config = node.data.config as NodeConfig + + const getCodeTemplate = (nodeType: string) => { + const templates: Record = { + "data-collection": `# 数据归集 PythonOperator +from airflow.operators.python import PythonOperator +import pandas as pd +from sqlalchemy import create_engine + +def collect_data(**context): + """ + 从数据源采集数据 + """ + # 数据库连接配置 + db_config = { + 'host': 'localhost', + 'port': 3306, + 'user': 'root', + 'password': 'password', + 'database': 'mydb' + } + + # 创建数据库连接 + engine = create_engine( + f"mysql+pymysql://{db_config['user']}:{db_config['password']}@" + f"{db_config['host']}:{db_config['port']}/{db_config['database']}" + ) + + # 增量查询 + execution_date = context['execution_date'] + query = f""" + SELECT * FROM users + WHERE updated_at > '{execution_date}' + ORDER BY updated_at + """ + + # 读取数据 + df = pd.read_sql(query, engine) + + # 保存到XCom + context['ti'].xcom_push(key='collected_data', value=df.to_json()) + + print(f"[v0] 采集完成,共 {len(df)} 条记录") + return len(df) + +# 创建PythonOperator +data_collection_task = PythonOperator( + task_id='data_collection', + python_callable=collect_data, + provide_context=True, +)`, + + "data-cleaning": `# 数据清洗 PythonOperator +from airflow.operators.python import PythonOperator +import pandas as pd +import json + +def clean_data(**context): + """ + 清洗和预处理数据 + """ + # 从上游节点获取数据 + ti = context['ti'] + data_json = ti.xcom_pull(key='collected_data') + df = pd.read_json(data_json) + + # 移除空值 + df = df.dropna(subset=['important_column']) + + # 移除重复 + df = df.drop_duplicates(subset=['id']) + + # 文本标准化 + df['name'] = df['name'].str.strip().str.lower() + + # 数据类型转换 + df['age'] = pd.to_numeric(df['age'], errors='coerce') + + # 异常值处理 + df = df[(df['age'] > 0) & (df['age'] < 120)] + + # 保存清洗后的数据 + ti.xcom_push(key='cleaned_data', value=df.to_json()) + + print(f"[v0] 清洗完成,保留 {len(df)} 条有效记录") + return len(df) + +# 创建PythonOperator +data_cleaning_task = PythonOperator( + task_id='data_cleaning', + python_callable=clean_data, + provide_context=True, +)`, + + "data-ratio": `# 数据配比 PythonOperator +from airflow.operators.python import PythonOperator +import pandas as pd +from sklearn.model_selection import train_test_split + +def split_data(**context): + """ + 按比例划分训练集、验证集、测试集 + """ + # 从上游节点获取数据 + ti = context['ti'] + data_json = ti.xcom_pull(key='cleaned_data') + df = pd.read_json(data_json) + + # 配置比例 + train_ratio = 0.7 + val_ratio = 0.15 + test_ratio = 0.15 + + # 是否分层抽样 + stratify_column = 'label' # 设置为None则不分层 + + # 第一次切分:训练集 vs (验证集+测试集) + train_df, temp_df = train_test_split( + df, + test_size=(1 - train_ratio), + stratify=df[stratify_column] if stratify_column else None, + random_state=42 + ) + + # 第二次切分:验证集 vs 测试集 + val_df, test_df = train_test_split( + temp_df, + test_size=test_ratio / (val_ratio + test_ratio), + stratify=temp_df[stratify_column] if stratify_column else None, + random_state=42 + ) + + # 保存划分后的数据集 + ti.xcom_push(key='train_data', value=train_df.to_json()) + ti.xcom_push(key='val_data', value=val_df.to_json()) + ti.xcom_push(key='test_data', value=test_df.to_json()) + + print(f"[v0] 数据划分完成:") + print(f" 训练集: {len(train_df)} 条 ({train_ratio*100}%)") + print(f" 验证集: {len(val_df)} 条 ({val_ratio*100}%)") + print(f" 测试集: {len(test_df)} 条 ({test_ratio*100}%)") + + return { + 'train': len(train_df), + 'val': len(val_df), + 'test': len(test_df) + } + +# 创建PythonOperator +data_ratio_task = PythonOperator( + task_id='data_ratio', + python_callable=split_data, + provide_context=True, +)`, + + "knowledge-generation": `# 知识生成 PythonOperator +from airflow.operators.python import PythonOperator +import pandas as pd +from langchain.text_splitter import RecursiveCharacterTextSplitter +from langchain.embeddings import OpenAIEmbeddings +from langchain.vectorstores import Chroma + +def generate_knowledge(**context): + """ + 生成知识库,包括文档切片和向量化 + """ + # 从上游节点获取数据 + ti = context['ti'] + data_json = ti.xcom_pull(key='cleaned_data') + df = pd.read_json(data_json) + + # 文档切片配置 + text_splitter = RecursiveCharacterTextSplitter( + chunk_size=1000, + chunk_overlap=200, + length_function=len, + separators=["\\n\\n", "\\n", "。", "!", "?", ",", " ", ""] + ) + + # 处理每个文档 + documents = [] + for idx, row in df.iterrows(): + text = row['content'] + chunks = text_splitter.split_text(text) + + for chunk in chunks: + documents.append({ + 'text': chunk, + 'metadata': { + 'source_id': row['id'], + 'title': row.get('title', ''), + 'chunk_index': chunks.index(chunk) + } + }) + + # 创建嵌入模型 + embeddings = OpenAIEmbeddings( + model="text-embedding-3-small", + openai_api_key="your-api-key" + ) + + # 创建向量数据库 + vectorstore = Chroma.from_texts( + texts=[doc['text'] for doc in documents], + embedding=embeddings, + metadatas=[doc['metadata'] for doc in documents], + persist_directory="./chroma_db" + ) + + vectorstore.persist() + + # 保存知识库信息 + ti.xcom_push(key='knowledge_base', value={ + 'total_chunks': len(documents), + 'vectorstore_path': './chroma_db' + }) + + print(f"[v0] 知识库生成完成,共 {len(documents)} 个文档块") + return len(documents) + +# 创建PythonOperator +knowledge_generation_task = PythonOperator( + task_id='knowledge_generation', + python_callable=generate_knowledge, + provide_context=True, +)`, + + "model-training": `# 模型训练 PythonOperator +from airflow.operators.python import PythonOperator +import pandas as pd +import json +from sklearn.ensemble import RandomForestClassifier +from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score +import joblib + +def train_model(**context): + """ + 训练机器学习模型 + """ + # 从上游节点获取数据 + ti = context['ti'] + train_json = ti.xcom_pull(key='train_data') + val_json = ti.xcom_pull(key='val_data') + + train_df = pd.read_json(train_json) + val_df = pd.read_json(val_json) + + # 准备特征和标签 + feature_columns = ['feature1', 'feature2', 'feature3'] + label_column = 'label' + + X_train = train_df[feature_columns] + y_train = train_df[label_column] + X_val = val_df[feature_columns] + y_val = val_df[label_column] + + # 创建并训练模型 + model = RandomForestClassifier( + n_estimators=100, + max_depth=10, + random_state=42, + n_jobs=-1 + ) + + print("[v0] 开始训练模型...") + model.fit(X_train, y_train) + + # 验证集评估 + y_pred = model.predict(X_val) + metrics = { + 'accuracy': accuracy_score(y_val, y_pred), + 'precision': precision_score(y_val, y_pred, average='weighted'), + 'recall': recall_score(y_val, y_pred, average='weighted'), + 'f1': f1_score(y_val, y_pred, average='weighted') + } + + # 保存模型 + run_id = context['run_id'] + model_path = f"./models/model_{run_id}.pkl" + joblib.dump(model, model_path) + + # 保存结果 + ti.xcom_push(key='model_path', value=model_path) + ti.xcom_push(key='metrics', value=json.dumps(metrics)) + + print(f"[v0] 模型训练完成:") + for metric, value in metrics.items(): + print(f" {metric}: {value:.4f}") + + return metrics + +# 创建PythonOperator +model_training_task = PythonOperator( + task_id='model_training', + python_callable=train_model, + provide_context=True, +)`, + + "model-report": `# 模型报告 PythonOperator +from airflow.operators.python import PythonOperator +import pandas as pd +import json +import matplotlib.pyplot as plt +from sklearn.metrics import confusion_matrix, classification_report +import seaborn as sns +from reportlab.lib.pagesizes import letter +from reportlab.pdfgen import canvas +import joblib + +def generate_report(**context): + """ + 生成模型评估报告 + """ + # 从上游节点获取数据 + ti = context['ti'] + model_path = ti.xcom_pull(key='model_path') + metrics_json = ti.xcom_pull(key='metrics') + test_json = ti.xcom_pull(key='test_data') + + metrics = json.loads(metrics_json) + test_df = pd.read_json(test_json) + + # 加载模型 + model = joblib.load(model_path) + + # 准备测试数据 + feature_columns = ['feature1', 'feature2', 'feature3'] + label_column = 'label' + + X_test = test_df[feature_columns] + y_test = test_df[label_column] + y_pred = model.predict(X_test) + + # 生成混淆矩阵 + cm = confusion_matrix(y_test, y_pred) + plt.figure(figsize=(10, 8)) + sns.heatmap(cm, annot=True, fmt='d', cmap='Blues') + plt.title('Confusion Matrix') + plt.ylabel('True Label') + plt.xlabel('Predicted Label') + plt.savefig('./reports/confusion_matrix.png') + plt.close() + + # 生成分类报告 + report = classification_report(y_test, y_pred, output_dict=True) + + # 生成PDF报告 + run_id = context['run_id'] + report_path = f"./reports/model_report_{run_id}.pdf" + c = canvas.Canvas(report_path, pagesize=letter) + + # 写入报告标题 + c.setFont("Helvetica-Bold", 16) + c.drawString(100, 750, "Model Evaluation Report") + + # 写入指标 + c.setFont("Helvetica", 12) + y_position = 700 + for metric, value in metrics.items(): + c.drawString(100, y_position, f"{metric.upper()}: {value:.4f}") + y_position -= 30 + + c.save() + + # 保存报告路径 + ti.xcom_push(key='report_path', value=report_path) + + print(f"[v0] 报告生成完成: {report_path}") + print("\\n详细指标:") + print(json.dumps(report, indent=2)) + + return { + 'report_path': report_path, + 'metrics': metrics + } + +# 创建PythonOperator +model_report_task = PythonOperator( + task_id='model_report', + python_callable=generate_report, + provide_context=True, +)`, + } + + return ( + templates[nodeType] || + `# PythonOperator 示例 +from airflow.operators.python import PythonOperator + +def execute(**context): + """ + 在这里编写你的代码 + """ + # 获取上游数据 + ti = context['ti'] + data = ti.xcom_pull(key='your_key') + + # 处理逻辑 + result = process(data) + + # 推送结果到XCom + ti.xcom_push(key='result', value=result) + + return result + +# 创建PythonOperator +task = PythonOperator( + task_id='your_task', + python_callable=execute, + provide_context=True, +)` + ) + } + + return ( +
+
+ + { + setNodes((nds: any) => + nds.map((n: any) => + n.id === node.id ? { ...n, data: { ...n.data, config: { ...config, name: e.target.value } } } : n, + ), + ) + }} + placeholder="输入节点名称" + className="mt-1" + /> +
+ +
+ +