Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,20 @@ VALID_K8S_TARGETS := datamate deer-flow milvus label-studio data-juicer mineru m
cp runtime/deer-flow/conf.yaml deployment/helm/deer-flow/charts/public/conf.yaml; \
helm upgrade deer-flow deployment/helm/deer-flow -n $(NAMESPACE) --install --set global.image.repository=$(REGISTRY); \
elif [ "$*" = "milvus" ]; then \
kubectl apply -f deployment/kubernetes/sealed-secrets/milvus.yaml; \
helm upgrade milvus deployment/helm/milvus -n $(NAMESPACE) --install; \
kubectl apply -f deployment/kubernetes/sealed-secrets/milvus.yaml 2>/dev/null || true; \
ACCESSKEY=$$(kubectl get secret milvus-minio-secret -n $(NAMESPACE) -o jsonpath='{.data.accessKey}' 2>/dev/null | base64 -d 2>/dev/null || echo ""); \
SECRETKEY=$$(kubectl get secret milvus-minio-secret -n $(NAMESPACE) -o jsonpath='{.data.secretKey}' 2>/dev/null | base64 -d 2>/dev/null || echo ""); \
if [ -n "$$ACCESSKEY" ] && [ -n "$$SECRETKEY" ]; then \
helm upgrade milvus deployment/helm/milvus -n $(NAMESPACE) --install \
--set minio.accessKey=$$ACCESSKEY \
--set minio.secretKey=$$SECRETKEY; \
else \
echo "[ERROR] milvus-minio-secret not found or empty in namespace $(NAMESPACE)"; \
echo " Please ensure Sealed Secrets Controller is running and the secret was decrypted."; \
echo " For local dev: kubectl create secret generic milvus-minio-secret \\"; \
echo " --from-literal=accessKey=<key> --from-literal=secretKey=<key> -n $(NAMESPACE)"; \
exit 1; \
fi; \
elif [ "$*" = "data-juicer" ] || [ "$*" = "dj" ]; then \
kubectl apply -f deployment/kubernetes/data-juicer/deploy.yaml -n $(NAMESPACE); \
fi
Expand Down
20 changes: 18 additions & 2 deletions deployment/helm/milvus/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ nameOverride: ""
## Default fully qualified app name
fullnameOverride: ""

## Define toleration for node isolation
## This anchor can be referenced throughout the configuration
nodeIsolationTolerations: &nodeIsolationTolerations
- key: "node-role.kubernetes.io/datamate"
operator: "Equal"
value: "true"
effect: "NoSchedule"

## Enable or disable Milvus Cluster mode
cluster:
enabled: false
Expand Down Expand Up @@ -31,7 +39,7 @@ nodeSelector: {}
# Global tolerations
# If set, this will apply to all milvus components
# Individual components can be set to a different tolerations
tolerations: []
tolerations: *nodeIsolationTolerations

# Global affinity
# If set, this will apply to all milvus components
Expand Down Expand Up @@ -234,7 +242,7 @@ standalone:
# ephemeral-storage: 100Gi
nodeSelector: {}
affinity: {}
tolerations: []
tolerations: *nodeIsolationTolerations
securityContext: {}
containerSecurityContext: {}
topologySpreadConstraints: [] # Component specific topologySpreadConstraints
Expand Down Expand Up @@ -653,6 +661,10 @@ minio:
type: ClusterIP
port: 9000

nodeSelector: {}
tolerations: *nodeIsolationTolerations
affinity: {}

persistence:
enabled: true
existingClaim: ""
Expand Down Expand Up @@ -722,6 +734,10 @@ etcd:
storagePath:
storageNode:

nodeSelector: {}
tolerations: *nodeIsolationTolerations
affinity: {}

## Change default timeout periods to mitigate zoobie probe process
livenessProbe:
enabled: true
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.logging import get_logger
from app.module.cleaning.repository import CleaningTaskRepository
Expand All @@ -12,6 +13,7 @@ class CleaningTaskScheduler:
def __init__(self, task_repo: CleaningTaskRepository, runtime_client: RuntimeClient):
self.task_repo = task_repo
self.runtime_client = runtime_client
self._polling_tasks: dict[str, asyncio.Task] = {}

async def execute_task(self, db: AsyncSession, task_id: str, retry_count: int) -> bool:
"""Execute cleaning task"""
Expand All @@ -25,12 +27,23 @@ async def execute_task(self, db: AsyncSession, task_id: str, retry_count: int) -
task.retry_count = retry_count

await self.task_repo.update_task(db, task)
return await self.runtime_client.submit_task(task_id, retry_count)
submitted = await self.runtime_client.submit_task(task_id, retry_count)

if submitted:
# Start background polling to sync task status from runtime
self._start_status_polling(task_id)

return submitted

async def stop_task(self, db: AsyncSession, task_id: str) -> bool:
"""Stop cleaning task"""
from app.module.cleaning.schema import CleaningTaskDto, CleaningTaskStatus

# Cancel background polling
if task_id in self._polling_tasks:
self._polling_tasks[task_id].cancel()
del self._polling_tasks[task_id]

await self.runtime_client.stop_task(task_id)

task = CleaningTaskDto()
Expand All @@ -39,3 +52,61 @@ async def stop_task(self, db: AsyncSession, task_id: str) -> bool:

await self.task_repo.update_task(db, task)
return True

def _start_status_polling(self, task_id: str):
"""Start background task to poll runtime for task status"""

async def _poll_loop():
from app.module.cleaning.schema import CleaningTaskDto, CleaningTaskStatus
from app.db.session import AsyncSessionLocal
from datetime import datetime

logger.info(f"[Polling] Starting status polling for task {task_id}")
await asyncio.sleep(5)

terminal_statuses = {"completed", "failed", "cancelled", "stopped"}
max_polls = 1800 # Max 1 hour (2s interval)
poll_count = 0

while poll_count < max_polls:
try:
status_data = await self.runtime_client.get_task_status(task_id)

if status_data is None:
poll_count += 1
await asyncio.sleep(2)
continue

current_status = (status_data.get("status", "") or "").lower()
logger.debug(f"[Polling] Task {task_id} status: {current_status}")

if current_status in terminal_statuses:
async with AsyncSessionLocal() as db:
task = CleaningTaskDto()
task.id = task_id
if current_status == "completed":
task.status = CleaningTaskStatus.COMPLETED
else:
task.status = CleaningTaskStatus.FAILED
task.finished_at = datetime.now()
await self.task_repo.update_task(db, task)
await db.commit()
logger.info(
f"[Polling] Task {task_id} finished: {current_status}"
)
break

except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"[Polling] Error polling task {task_id}: {e}")

poll_count += 1
await asyncio.sleep(2)
else:
logger.warning(f"[Polling] Task {task_id} timed out")

self._polling_tasks.pop(task_id, None)

task = asyncio.create_task(_poll_loop())
self._polling_tasks[task_id] = task
1 change: 1 addition & 0 deletions runtime/python-executor/datamate/common/error_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class ErrorCode(Enum):
# 通用错误
SUCCESS = (0, "Success")
UNKNOWN_ERROR = (1, "Unknown error")
PARAM_ERROR = (2, "Invalid parameter")
FILE_NOT_FOUND_ERROR = (1000, "File not found!")
SUBMIT_TASK_ERROR = (1001, "Task submitted Failed!")
CANCEL_TASK_ERROR = (1002, "Task canceled Failed!")
60 changes: 57 additions & 3 deletions runtime/python-executor/datamate/operator_runtime.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import re
from typing import Optional, Dict, Any, List

import uvicorn
Expand All @@ -13,6 +14,7 @@
from datamate.common.error_code import ErrorCode
from datamate.scheduler import cmd_scheduler
from datamate.scheduler import func_scheduler
from datamate.scheduler import ray_job_scheduler
from datamate.wrappers import WRAPPERS

# 日志配置
Expand Down Expand Up @@ -63,6 +65,18 @@ class QueryTaskRequest(BaseModel):
task_ids: List[str]


# UUID pattern for task_id validation (prevents path traversal)
_TASK_ID_PATTERN = re.compile(
r'^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$'
)


def _validate_task_id(task_id: str) -> None:
"""Validate task_id is a proper UUID to prevent path traversal (CodeQL)."""
if not _TASK_ID_PATTERN.match(task_id):
raise APIException(ErrorCode.PARAM_ERROR, detail=f"Invalid task_id format: {task_id}")


@app.post("/api/task/list")
async def query_task_info(request: QueryTaskRequest):
try:
Expand All @@ -77,6 +91,7 @@ class SubmitTaskRequest(BaseModel):

@app.post("/api/task/{task_id}/submit")
async def submit_task(task_id, request: SubmitTaskRequest = None):
_validate_task_id(task_id)
retry_count = request.retry_count if request else 0
config_path = f"/flow/{task_id}/process.yaml"
logger.info(f"Start submitting job with retry_count={retry_count}...")
Expand All @@ -102,8 +117,42 @@ async def submit_task(task_id, request: SubmitTaskRequest = None):
return success_json_info


@app.get("/api/task/{task_id}/status")
async def get_task_status(task_id):
"""Get task execution status from the scheduler"""
_validate_task_id(task_id)
try:
executor_type = get_from_cfg(task_id, "executor_type", default="datamate")
if executor_type == "ray" or executor_type == "datamate":
result = ray_job_scheduler.get_task_status(task_id)
else:
result = cmd_scheduler.get_task_status(task_id)

if result is None:
return JSONResponse(
content={"status": "NOT_FOUND", "message": f"Task {task_id} not found"},
status_code=200
)

return JSONResponse(
content={
"task_id": result.task_id,
"status": result.status.value,
"error": result.error,
"created_at": result.created_at.isoformat() if result.created_at else None,
"started_at": result.started_at.isoformat() if result.started_at else None,
"completed_at": result.completed_at.isoformat() if result.completed_at else None,
},
status_code=200
)
except Exception as e:
logger.error(f"Error getting task status {task_id}: {e}")
raise APIException(ErrorCode.UNKNOWN_ERROR)


@app.post("/api/task/{task_id}/stop")
async def stop_task(task_id):
_validate_task_id(task_id)
logger.info("Start stopping ray job...")
success_json_info = JSONResponse(
content={"status": "Success", "message": f"{task_id} has been stopped"},
Expand All @@ -128,16 +177,21 @@ def check_valid_path(file_path):
return os.path.exists(full_path)


def get_from_cfg(task_id, key):
config_path = f"/flow/{task_id}/process.yaml"
def get_from_cfg(task_id, key, default=None):
# Build path, then resolve ".." and verify it stays within /flow (CodeQL)
config_path = os.path.join("/flow", task_id, "process.yaml")
config_path = os.path.normpath(config_path)
if not config_path.startswith("/flow/"):
raise APIException(ErrorCode.PARAM_ERROR, detail=f"Invalid task_id: {task_id}")

if not check_valid_path(config_path):
logger.error(f"config_path is not existed! please check this path.")
raise APIException(ErrorCode.FILE_NOT_FOUND_ERROR)

with open(config_path, "r", encoding='utf-8') as f:
content = f.read()
cfg = yaml.safe_load(content)
return cfg[key]
return cfg.get(key, default)


def parse_args():
Expand Down
42 changes: 41 additions & 1 deletion runtime/python-executor/datamate/scheduler/job_task_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ async def _execute(self):
poll_interval = 2 # 2 秒轮询一次
elapsed_time = 0
last_log_position = 0 # 记录已写入的日志位置
connection_failure_count = 0 # 连接失败计数器
max_connection_failures = 5 # 最大连接失败次数阈值

# 任务无进展超时:如果 job 一直是 RUNNING 但日志/进度没有任何变化,
# 说明 workers 可能已全部丢失,判定为失败
stall_timeout = int(os.getenv("RAY_JOB_STALL_TIMEOUT", "120")) # 默认 120 秒
last_log_size = 0
last_active_time = datetime.now()

while True:
if self._cancelled:
Expand All @@ -86,10 +94,28 @@ async def _execute(self):
try:
info = client.get_job_info(self.job_id)
job_status = info.status

# 连接成功,重置失败计数器
connection_failure_count = 0

# 获取并写入日志
await self._fetch_and_write_logs(client, last_log_position)
last_log_position = os.path.getsize(self.log_path) if os.path.exists(self.log_path) else 0
current_log_size = os.path.getsize(self.log_path) if os.path.exists(self.log_path) else 0
last_log_position = current_log_size

# 检测任务是否停滞(workers 全部丢失但 head 仍返回 RUNNING)
if current_log_size > last_log_size:
last_active_time = datetime.now()
last_log_size = current_log_size
elif (datetime.now() - last_active_time).total_seconds() > stall_timeout:
logger.error(
f"Ray Job {self.job_id} stalled for {stall_timeout}s "
f"(no log progress), marking as FAILED"
)
self._stop_job(client)
self.status = TaskStatus.FAILED
self.error = f"Job stalled: no progress for {stall_timeout} seconds (workers may be lost)"
break

if job_status == "SUCCEEDED":
self.status = TaskStatus.COMPLETED
Expand Down Expand Up @@ -119,7 +145,21 @@ async def _execute(self):
self.error = f"Job timed out after {self.timeout} seconds"
break

except (ConnectionError, ConnectionRefusedError, ConnectionResetError, OSError) as e:
# 连接类异常:Ray 集群可能已下线
connection_failure_count += 1
logger.error(
f"Connection to Ray cluster failed (attempt {connection_failure_count}/{max_connection_failures}): {e}"
)

if connection_failure_count >= max_connection_failures:
self.status = TaskStatus.FAILED
self.error = f"Lost connection to Ray cluster after {max_connection_failures} attempts: {str(e)}"
logger.error(f"Task {self.task_id} failed: {self.error}")
break

except Exception as e:
# 其他异常:记录警告但继续重试
logger.warning(f"Error checking job status: {e}")

await asyncio.sleep(poll_interval)
Expand Down
Loading