diff --git a/Makefile b/Makefile index 5b63ea78..6c8cbc2e 100644 --- a/Makefile +++ b/Makefile @@ -382,8 +382,20 @@ VALID_K8S_TARGETS := datamate deer-flow milvus label-studio data-juicer mineru m cp runtime/deer-flow/conf.yaml deployment/helm/deer-flow/charts/public/conf.yaml; \ helm upgrade deer-flow deployment/helm/deer-flow -n $(NAMESPACE) --install --set global.image.repository=$(REGISTRY); \ elif [ "$*" = "milvus" ]; then \ - kubectl apply -f deployment/kubernetes/sealed-secrets/milvus.yaml; \ - helm upgrade milvus deployment/helm/milvus -n $(NAMESPACE) --install; \ + kubectl apply -f deployment/kubernetes/sealed-secrets/milvus.yaml 2>/dev/null || true; \ + ACCESSKEY=$$(kubectl get secret milvus-minio-secret -n $(NAMESPACE) -o jsonpath='{.data.accessKey}' 2>/dev/null | base64 -d 2>/dev/null || echo ""); \ + SECRETKEY=$$(kubectl get secret milvus-minio-secret -n $(NAMESPACE) -o jsonpath='{.data.secretKey}' 2>/dev/null | base64 -d 2>/dev/null || echo ""); \ + if [ -n "$$ACCESSKEY" ] && [ -n "$$SECRETKEY" ]; then \ + helm upgrade milvus deployment/helm/milvus -n $(NAMESPACE) --install \ + --set minio.accessKey=$$ACCESSKEY \ + --set minio.secretKey=$$SECRETKEY; \ + else \ + echo "[ERROR] milvus-minio-secret not found or empty in namespace $(NAMESPACE)"; \ + echo " Please ensure Sealed Secrets Controller is running and the secret was decrypted."; \ + echo " For local dev: kubectl create secret generic milvus-minio-secret \\"; \ + echo " --from-literal=accessKey= --from-literal=secretKey= -n $(NAMESPACE)"; \ + exit 1; \ + fi; \ elif [ "$*" = "data-juicer" ] || [ "$*" = "dj" ]; then \ kubectl apply -f deployment/kubernetes/data-juicer/deploy.yaml -n $(NAMESPACE); \ fi diff --git a/deployment/helm/milvus/values.yaml b/deployment/helm/milvus/values.yaml index 626248e4..7d4396c5 100644 --- a/deployment/helm/milvus/values.yaml +++ b/deployment/helm/milvus/values.yaml @@ -4,6 +4,14 @@ nameOverride: "" ## Default fully qualified app name fullnameOverride: "" +## Define toleration for node isolation +## This anchor can be referenced throughout the configuration +nodeIsolationTolerations: &nodeIsolationTolerations + - key: "node-role.kubernetes.io/datamate" + operator: "Equal" + value: "true" + effect: "NoSchedule" + ## Enable or disable Milvus Cluster mode cluster: enabled: false @@ -31,7 +39,7 @@ nodeSelector: {} # Global tolerations # If set, this will apply to all milvus components # Individual components can be set to a different tolerations -tolerations: [] +tolerations: *nodeIsolationTolerations # Global affinity # If set, this will apply to all milvus components @@ -234,7 +242,7 @@ standalone: # ephemeral-storage: 100Gi nodeSelector: {} affinity: {} - tolerations: [] + tolerations: *nodeIsolationTolerations securityContext: {} containerSecurityContext: {} topologySpreadConstraints: [] # Component specific topologySpreadConstraints @@ -653,6 +661,10 @@ minio: type: ClusterIP port: 9000 + nodeSelector: {} + tolerations: *nodeIsolationTolerations + affinity: {} + persistence: enabled: true existingClaim: "" @@ -722,6 +734,10 @@ etcd: storagePath: storageNode: + nodeSelector: {} + tolerations: *nodeIsolationTolerations + affinity: {} + ## Change default timeout periods to mitigate zoobie probe process livenessProbe: enabled: true diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_scheduler.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_scheduler.py index 3efa7ae6..779dd910 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_scheduler.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_scheduler.py @@ -1,3 +1,4 @@ +import asyncio from sqlalchemy.ext.asyncio import AsyncSession from app.core.logging import get_logger from app.module.cleaning.repository import CleaningTaskRepository @@ -12,6 +13,7 @@ class CleaningTaskScheduler: def __init__(self, task_repo: CleaningTaskRepository, runtime_client: RuntimeClient): self.task_repo = task_repo self.runtime_client = runtime_client + self._polling_tasks: dict[str, asyncio.Task] = {} async def execute_task(self, db: AsyncSession, task_id: str, retry_count: int) -> bool: """Execute cleaning task""" @@ -25,12 +27,23 @@ async def execute_task(self, db: AsyncSession, task_id: str, retry_count: int) - task.retry_count = retry_count await self.task_repo.update_task(db, task) - return await self.runtime_client.submit_task(task_id, retry_count) + submitted = await self.runtime_client.submit_task(task_id, retry_count) + + if submitted: + # Start background polling to sync task status from runtime + self._start_status_polling(task_id) + + return submitted async def stop_task(self, db: AsyncSession, task_id: str) -> bool: """Stop cleaning task""" from app.module.cleaning.schema import CleaningTaskDto, CleaningTaskStatus + # Cancel background polling + if task_id in self._polling_tasks: + self._polling_tasks[task_id].cancel() + del self._polling_tasks[task_id] + await self.runtime_client.stop_task(task_id) task = CleaningTaskDto() @@ -39,3 +52,61 @@ async def stop_task(self, db: AsyncSession, task_id: str) -> bool: await self.task_repo.update_task(db, task) return True + + def _start_status_polling(self, task_id: str): + """Start background task to poll runtime for task status""" + + async def _poll_loop(): + from app.module.cleaning.schema import CleaningTaskDto, CleaningTaskStatus + from app.db.session import AsyncSessionLocal + from datetime import datetime + + logger.info(f"[Polling] Starting status polling for task {task_id}") + await asyncio.sleep(5) + + terminal_statuses = {"completed", "failed", "cancelled", "stopped"} + max_polls = 1800 # Max 1 hour (2s interval) + poll_count = 0 + + while poll_count < max_polls: + try: + status_data = await self.runtime_client.get_task_status(task_id) + + if status_data is None: + poll_count += 1 + await asyncio.sleep(2) + continue + + current_status = (status_data.get("status", "") or "").lower() + logger.debug(f"[Polling] Task {task_id} status: {current_status}") + + if current_status in terminal_statuses: + async with AsyncSessionLocal() as db: + task = CleaningTaskDto() + task.id = task_id + if current_status == "completed": + task.status = CleaningTaskStatus.COMPLETED + else: + task.status = CleaningTaskStatus.FAILED + task.finished_at = datetime.now() + await self.task_repo.update_task(db, task) + await db.commit() + logger.info( + f"[Polling] Task {task_id} finished: {current_status}" + ) + break + + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"[Polling] Error polling task {task_id}: {e}") + + poll_count += 1 + await asyncio.sleep(2) + else: + logger.warning(f"[Polling] Task {task_id} timed out") + + self._polling_tasks.pop(task_id, None) + + task = asyncio.create_task(_poll_loop()) + self._polling_tasks[task_id] = task diff --git a/runtime/python-executor/datamate/common/error_code.py b/runtime/python-executor/datamate/common/error_code.py index d62906e2..11fd542a 100644 --- a/runtime/python-executor/datamate/common/error_code.py +++ b/runtime/python-executor/datamate/common/error_code.py @@ -93,6 +93,7 @@ class ErrorCode(Enum): # 通用错误 SUCCESS = (0, "Success") UNKNOWN_ERROR = (1, "Unknown error") + PARAM_ERROR = (2, "Invalid parameter") FILE_NOT_FOUND_ERROR = (1000, "File not found!") SUBMIT_TASK_ERROR = (1001, "Task submitted Failed!") CANCEL_TASK_ERROR = (1002, "Task canceled Failed!") \ No newline at end of file diff --git a/runtime/python-executor/datamate/operator_runtime.py b/runtime/python-executor/datamate/operator_runtime.py index 9963a1d5..ce814d53 100644 --- a/runtime/python-executor/datamate/operator_runtime.py +++ b/runtime/python-executor/datamate/operator_runtime.py @@ -1,4 +1,5 @@ import os +import re from typing import Optional, Dict, Any, List import uvicorn @@ -13,6 +14,7 @@ from datamate.common.error_code import ErrorCode from datamate.scheduler import cmd_scheduler from datamate.scheduler import func_scheduler +from datamate.scheduler import ray_job_scheduler from datamate.wrappers import WRAPPERS # 日志配置 @@ -63,6 +65,18 @@ class QueryTaskRequest(BaseModel): task_ids: List[str] +# UUID pattern for task_id validation (prevents path traversal) +_TASK_ID_PATTERN = re.compile( + r'^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$' +) + + +def _validate_task_id(task_id: str) -> None: + """Validate task_id is a proper UUID to prevent path traversal (CodeQL).""" + if not _TASK_ID_PATTERN.match(task_id): + raise APIException(ErrorCode.PARAM_ERROR, detail=f"Invalid task_id format: {task_id}") + + @app.post("/api/task/list") async def query_task_info(request: QueryTaskRequest): try: @@ -77,6 +91,7 @@ class SubmitTaskRequest(BaseModel): @app.post("/api/task/{task_id}/submit") async def submit_task(task_id, request: SubmitTaskRequest = None): + _validate_task_id(task_id) retry_count = request.retry_count if request else 0 config_path = f"/flow/{task_id}/process.yaml" logger.info(f"Start submitting job with retry_count={retry_count}...") @@ -102,8 +117,42 @@ async def submit_task(task_id, request: SubmitTaskRequest = None): return success_json_info +@app.get("/api/task/{task_id}/status") +async def get_task_status(task_id): + """Get task execution status from the scheduler""" + _validate_task_id(task_id) + try: + executor_type = get_from_cfg(task_id, "executor_type", default="datamate") + if executor_type == "ray" or executor_type == "datamate": + result = ray_job_scheduler.get_task_status(task_id) + else: + result = cmd_scheduler.get_task_status(task_id) + + if result is None: + return JSONResponse( + content={"status": "NOT_FOUND", "message": f"Task {task_id} not found"}, + status_code=200 + ) + + return JSONResponse( + content={ + "task_id": result.task_id, + "status": result.status.value, + "error": result.error, + "created_at": result.created_at.isoformat() if result.created_at else None, + "started_at": result.started_at.isoformat() if result.started_at else None, + "completed_at": result.completed_at.isoformat() if result.completed_at else None, + }, + status_code=200 + ) + except Exception as e: + logger.error(f"Error getting task status {task_id}: {e}") + raise APIException(ErrorCode.UNKNOWN_ERROR) + + @app.post("/api/task/{task_id}/stop") async def stop_task(task_id): + _validate_task_id(task_id) logger.info("Start stopping ray job...") success_json_info = JSONResponse( content={"status": "Success", "message": f"{task_id} has been stopped"}, @@ -128,8 +177,13 @@ def check_valid_path(file_path): return os.path.exists(full_path) -def get_from_cfg(task_id, key): - config_path = f"/flow/{task_id}/process.yaml" +def get_from_cfg(task_id, key, default=None): + # Build path, then resolve ".." and verify it stays within /flow (CodeQL) + config_path = os.path.join("/flow", task_id, "process.yaml") + config_path = os.path.normpath(config_path) + if not config_path.startswith("/flow/"): + raise APIException(ErrorCode.PARAM_ERROR, detail=f"Invalid task_id: {task_id}") + if not check_valid_path(config_path): logger.error(f"config_path is not existed! please check this path.") raise APIException(ErrorCode.FILE_NOT_FOUND_ERROR) @@ -137,7 +191,7 @@ def get_from_cfg(task_id, key): with open(config_path, "r", encoding='utf-8') as f: content = f.read() cfg = yaml.safe_load(content) - return cfg[key] + return cfg.get(key, default) def parse_args(): diff --git a/runtime/python-executor/datamate/scheduler/job_task_scheduler.py b/runtime/python-executor/datamate/scheduler/job_task_scheduler.py index c73080c0..29ceb83d 100644 --- a/runtime/python-executor/datamate/scheduler/job_task_scheduler.py +++ b/runtime/python-executor/datamate/scheduler/job_task_scheduler.py @@ -73,6 +73,14 @@ async def _execute(self): poll_interval = 2 # 2 秒轮询一次 elapsed_time = 0 last_log_position = 0 # 记录已写入的日志位置 + connection_failure_count = 0 # 连接失败计数器 + max_connection_failures = 5 # 最大连接失败次数阈值 + + # 任务无进展超时:如果 job 一直是 RUNNING 但日志/进度没有任何变化, + # 说明 workers 可能已全部丢失,判定为失败 + stall_timeout = int(os.getenv("RAY_JOB_STALL_TIMEOUT", "120")) # 默认 120 秒 + last_log_size = 0 + last_active_time = datetime.now() while True: if self._cancelled: @@ -86,10 +94,28 @@ async def _execute(self): try: info = client.get_job_info(self.job_id) job_status = info.status + + # 连接成功,重置失败计数器 + connection_failure_count = 0 # 获取并写入日志 await self._fetch_and_write_logs(client, last_log_position) - last_log_position = os.path.getsize(self.log_path) if os.path.exists(self.log_path) else 0 + current_log_size = os.path.getsize(self.log_path) if os.path.exists(self.log_path) else 0 + last_log_position = current_log_size + + # 检测任务是否停滞(workers 全部丢失但 head 仍返回 RUNNING) + if current_log_size > last_log_size: + last_active_time = datetime.now() + last_log_size = current_log_size + elif (datetime.now() - last_active_time).total_seconds() > stall_timeout: + logger.error( + f"Ray Job {self.job_id} stalled for {stall_timeout}s " + f"(no log progress), marking as FAILED" + ) + self._stop_job(client) + self.status = TaskStatus.FAILED + self.error = f"Job stalled: no progress for {stall_timeout} seconds (workers may be lost)" + break if job_status == "SUCCEEDED": self.status = TaskStatus.COMPLETED @@ -119,7 +145,21 @@ async def _execute(self): self.error = f"Job timed out after {self.timeout} seconds" break + except (ConnectionError, ConnectionRefusedError, ConnectionResetError, OSError) as e: + # 连接类异常:Ray 集群可能已下线 + connection_failure_count += 1 + logger.error( + f"Connection to Ray cluster failed (attempt {connection_failure_count}/{max_connection_failures}): {e}" + ) + + if connection_failure_count >= max_connection_failures: + self.status = TaskStatus.FAILED + self.error = f"Lost connection to Ray cluster after {max_connection_failures} attempts: {str(e)}" + logger.error(f"Task {self.task_id} failed: {self.error}") + break + except Exception as e: + # 其他异常:记录警告但继续重试 logger.warning(f"Error checking job status: {e}") await asyncio.sleep(poll_interval)