diff --git a/agent/internal/agent/agent.go b/agent/internal/agent/agent.go index 26cbb5b..3c56756 100644 --- a/agent/internal/agent/agent.go +++ b/agent/internal/agent/agent.go @@ -65,6 +65,9 @@ type Agent struct { statusReportRequested chan string refreshMutex sync.Mutex pendingExpectedStateRefresh bool + workMutex sync.Mutex + activeWorkItem *agenthttp.WorkQueueItem + pendingWorkResults []agenthttp.CompletedWorkItem Client *agenthttp.Client Reconciler *reconcile.Reconciler Config *Config diff --git a/agent/internal/agent/run.go b/agent/internal/agent/run.go index 0760dd2..0447552 100644 --- a/agent/internal/agent/run.go +++ b/agent/internal/agent/run.go @@ -40,7 +40,6 @@ func (a *Agent) Run(ctx context.Context) { } go a.StatusReportLoop(ctx) - go a.WorkQueueLoop(ctx) a.Tick() @@ -74,17 +73,25 @@ func (a *Agent) Run(ctx context.Context) { func (a *Agent) StatusReportLoop(ctx context.Context) { a.reportStatus("startup") - ticker := time.NewTicker(WorkQueueStatusInterval) - defer ticker.Stop() + timer := time.NewTimer(nextStatusReportDelay()) + defer timer.Stop() for { select { case <-ctx.Done(): return - case <-ticker.C: + case <-timer.C: a.reportStatus("periodic") + timer.Reset(nextStatusReportDelay()) case reason := <-a.statusReportRequested: a.reportStatus(reason) + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + timer.Reset(nextStatusReportDelay()) } } } @@ -100,21 +107,19 @@ func (a *Agent) RequestStatusReport(reason string) { func (a *Agent) reportStatus(reason string) { report := a.BuildStatusReport(true) - if err := a.Client.ReportStatus(report); err != nil { + completed, active := a.SnapshotWorkStatus() + response, err := a.Client.ReportStatus(report, completed, active) + if err != nil { log.Printf("[status] failed to report (%s): %v", reason, err) return } + a.AcknowledgeWorkResults(response.AcceptedWorkItemResults, response.RejectedWorkItemResults) + a.LogRejectedActiveWorkItems(response.RejectedActiveWorkItems) + a.AcceptLeasedWorkItems(response.WorkItems) log.Printf("[status] reported (%s)", reason) } -func (a *Agent) WorkQueueLoop(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - default: - } - - a.ProcessWorkQueue() - } +func nextStatusReportDelay() time.Duration { + jitter := time.Duration(time.Now().UnixNano() % int64(5*time.Second)) + return StatusReportInterval + jitter } diff --git a/agent/internal/agent/workqueue.go b/agent/internal/agent/workqueue.go index ffc4adc..6aa1690 100644 --- a/agent/internal/agent/workqueue.go +++ b/agent/internal/agent/workqueue.go @@ -1,61 +1,134 @@ package agent import ( + "fmt" "log" "time" + + agenthttp "techulus/cloud-agent/internal/http" ) const ( - LongPollTimeout = 30 * time.Second - WorkQueueStatusInterval = 60 * time.Second + StatusReportInterval = 15 * time.Second ) -func (a *Agent) ProcessWorkQueue() { - items, err := a.Client.GetWorkQueue(LongPollTimeout) - if err != nil { - log.Printf("[work-queue] failed to get work queue: %v", err) - time.Sleep(5 * time.Second) +func (a *Agent) SnapshotWorkStatus() ([]agenthttp.CompletedWorkItem, []agenthttp.ActiveWorkItem) { + a.workMutex.Lock() + defer a.workMutex.Unlock() + + completed := append([]agenthttp.CompletedWorkItem(nil), a.pendingWorkResults...) + active := []agenthttp.ActiveWorkItem{} + if a.activeWorkItem != nil { + active = append(active, agenthttp.ActiveWorkItem{ + ID: a.activeWorkItem.ID, + Attempt: a.activeWorkItem.Attempt, + }) + } + + return completed, active +} + +func (a *Agent) AcknowledgeWorkResults(accepted []string, rejected []agenthttp.RejectedWorkItemResult) { + if len(accepted) == 0 && len(rejected) == 0 { return } - for _, item := range items { - log.Printf("[work-queue] processing item %s (type=%s)", Truncate(item.ID, 8), item.Type) - - var processErr error - switch item.Type { - case "restart": - processErr = a.ProcessRestart(item) - case "stop": - processErr = a.ProcessStop(item) - case "deploy": - a.RequestReconcile("deploy work item " + Truncate(item.ID, 8)) - case "force_cleanup": - processErr = a.ProcessForceCleanup(item) - case "cleanup_volumes": - processErr = a.ProcessCleanupVolumes(item) - case "build": - processErr = a.ProcessBuild(item) - case "backup_volume": - processErr = a.ProcessBackupVolume(item) - case "restore_volume": - processErr = a.ProcessRestoreVolume(item) - case "create_manifest": - processErr = a.ProcessCreateManifest(item) - default: - log.Printf("[work-queue] unknown work item type: %s", item.Type) - continue - } + acknowledged := map[string]struct{}{} + for _, id := range accepted { + acknowledged[id] = struct{}{} + } + for _, item := range rejected { + acknowledged[item.ID] = struct{}{} + log.Printf("[work-queue] completion rejected for %s: %s", Truncate(item.ID, 8), item.Reason) + } + + a.workMutex.Lock() + defer a.workMutex.Unlock() - if processErr != nil { - log.Printf("[work-queue] item %s failed: %v", Truncate(item.ID, 8), processErr) - if err := a.Client.CompleteWorkItem(item.ID, "failed", processErr.Error()); err != nil { - log.Printf("[work-queue] failed to mark item as failed: %v", err) - } - } else { - log.Printf("[work-queue] item %s completed", Truncate(item.ID, 8)) - if err := a.Client.CompleteWorkItem(item.ID, "completed", ""); err != nil { - log.Printf("[work-queue] failed to mark item as completed: %v", err) - } + pending := a.pendingWorkResults[:0] + for _, result := range a.pendingWorkResults { + if _, ok := acknowledged[result.ID]; !ok { + pending = append(pending, result) } } + a.pendingWorkResults = pending +} + +func (a *Agent) LogRejectedActiveWorkItems(rejected []agenthttp.RejectedWorkItemResult) { + for _, item := range rejected { + log.Printf("[work-queue] active item renewal rejected for %s: %s", Truncate(item.ID, 8), item.Reason) + } +} + +func (a *Agent) AcceptLeasedWorkItems(items []agenthttp.WorkQueueItem) { + if len(items) == 0 { + return + } + + item := items[0] + + a.workMutex.Lock() + if a.activeWorkItem != nil { + log.Printf("[work-queue] ignoring leased item %s while %s is active", Truncate(item.ID, 8), Truncate(a.activeWorkItem.ID, 8)) + a.workMutex.Unlock() + return + } + a.activeWorkItem = &item + a.workMutex.Unlock() + + go a.processLeasedWorkItem(item) +} + +func (a *Agent) processLeasedWorkItem(item agenthttp.WorkQueueItem) { + log.Printf("[work-queue] processing item %s (type=%s attempt=%d)", Truncate(item.ID, 8), item.Type, item.Attempt) + + status := "completed" + errorMsg := "" + if err := a.ProcessWorkItem(item); err != nil { + status = "failed" + errorMsg = err.Error() + log.Printf("[work-queue] item %s failed: %v", Truncate(item.ID, 8), err) + } else { + log.Printf("[work-queue] item %s completed", Truncate(item.ID, 8)) + } + + a.workMutex.Lock() + if a.activeWorkItem != nil && a.activeWorkItem.ID == item.ID && a.activeWorkItem.Attempt == item.Attempt { + a.activeWorkItem = nil + } + a.pendingWorkResults = append(a.pendingWorkResults, agenthttp.CompletedWorkItem{ + ID: item.ID, + Attempt: item.Attempt, + Status: status, + Error: errorMsg, + }) + a.workMutex.Unlock() + + a.RequestStatusReport("work item " + status) +} + +func (a *Agent) ProcessWorkItem(item agenthttp.WorkQueueItem) error { + switch item.Type { + case "restart": + return a.ProcessRestart(item) + case "stop": + return a.ProcessStop(item) + case "deploy": + a.RequestReconcile("deploy work item " + Truncate(item.ID, 8)) + return nil + case "force_cleanup": + return a.ProcessForceCleanup(item) + case "cleanup_volumes": + return a.ProcessCleanupVolumes(item) + case "build": + return a.ProcessBuild(item) + case "backup_volume": + return a.ProcessBackupVolume(item) + case "restore_volume": + return a.ProcessRestoreVolume(item) + case "create_manifest": + return a.ProcessCreateManifest(item) + default: + return fmt.Errorf("unknown work item type: %s", item.Type) + } } diff --git a/agent/internal/container/runtime_darwin.go b/agent/internal/container/runtime_darwin.go index ec068b3..0cd1e0e 100644 --- a/agent/internal/container/runtime_darwin.go +++ b/agent/internal/container/runtime_darwin.go @@ -104,6 +104,7 @@ func Deploy(config *DeployConfig) (*DeployResult, error) { "--cap-add", "CHOWN", "--cap-add", "DAC_OVERRIDE", "--cap-add", "FOWNER", + "--cap-add", "SETPCAP", "--cap-add", "SETUID", "--cap-add", "SETGID", "--cap-add", "NET_BIND_SERVICE", diff --git a/agent/internal/container/runtime_linux.go b/agent/internal/container/runtime_linux.go index 630f5fb..840a947 100644 --- a/agent/internal/container/runtime_linux.go +++ b/agent/internal/container/runtime_linux.go @@ -102,6 +102,7 @@ func Deploy(config *DeployConfig) (*DeployResult, error) { "--cap-add", "CHOWN", "--cap-add", "DAC_OVERRIDE", "--cap-add", "FOWNER", + "--cap-add", "SETPCAP", "--cap-add", "SETUID", "--cap-add", "SETGID", "--cap-add", "NET_BIND_SERVICE", diff --git a/agent/internal/http/client.go b/agent/internal/http/client.go index 418ee6a..15a4f42 100644 --- a/agent/internal/http/client.go +++ b/agent/internal/http/client.go @@ -257,6 +257,18 @@ type StatusReport struct { AgentHealth *AgentHealth `json:"agentHealth,omitempty"` } +type CompletedWorkItem struct { + ID string `json:"id"` + Attempt int `json:"attempt"` + Status string `json:"status"` + Error string `json:"error,omitempty"` +} + +type ActiveWorkItem struct { + ID string `json:"id"` + Attempt int `json:"attempt"` +} + type BuildDetails struct { Build struct { ID string `json:"id"` @@ -344,52 +356,41 @@ type WorkQueueItem struct { ID string `json:"id"` Type string `json:"type"` Payload string `json:"payload"` + Attempt int `json:"attempt"` } -func (c *Client) GetWorkQueue(timeout time.Duration) ([]WorkQueueItem, error) { - url := fmt.Sprintf("%s/api/v1/agent/work-queue?timeout=%d", c.baseURL, timeout.Milliseconds()) - - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return nil, fmt.Errorf("failed to create request: %w", err) - } - - c.signRequest(req, "") - - resp, err := c.longClient.Do(req) - if err != nil { - return nil, fmt.Errorf("failed to fetch work queue: %w", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - return nil, fmt.Errorf("work queue request failed with status %d: %s", resp.StatusCode, string(body)) - } - - var result struct { - Items []WorkQueueItem `json:"items"` - } - if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { - return nil, fmt.Errorf("failed to decode work queue: %w", err) - } +type RejectedWorkItemResult struct { + ID string `json:"id"` + Reason string `json:"reason"` +} - return result.Items, nil +type StatusResponse struct { + OK bool `json:"ok"` + AcceptedWorkItemResults []string `json:"acceptedWorkItemResults"` + RejectedWorkItemResults []RejectedWorkItemResult `json:"rejectedWorkItemResults"` + RejectedActiveWorkItems []RejectedWorkItemResult `json:"rejectedActiveWorkItems"` + WorkItems []WorkQueueItem `json:"workItems"` } -func (c *Client) ReportStatus(report *StatusReport) error { +func (c *Client) ReportStatus(report *StatusReport, completed []CompletedWorkItem, active []ActiveWorkItem) (*StatusResponse, error) { payload := map[string]interface{}{ "statusReport": report, } + if len(completed) > 0 { + payload["completedWorkItems"] = completed + } + if len(active) > 0 { + payload["activeWorkItems"] = active + } body, err := json.Marshal(payload) if err != nil { - return fmt.Errorf("failed to marshal status report: %w", err) + return nil, fmt.Errorf("failed to marshal status report: %w", err) } req, err := http.NewRequest("POST", c.baseURL+"/api/v1/agent/status", bytes.NewReader(body)) if err != nil { - return fmt.Errorf("failed to create request: %w", err) + return nil, fmt.Errorf("failed to create request: %w", err) } req.Header.Set("Content-Type", "application/json") @@ -397,52 +398,21 @@ func (c *Client) ReportStatus(report *StatusReport) error { resp, err := c.client.Do(req) if err != nil { - return fmt.Errorf("failed to report status: %w", err) + return nil, fmt.Errorf("failed to report status: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted { respBody, _ := io.ReadAll(resp.Body) - return fmt.Errorf("status report failed with status %d: %s", resp.StatusCode, string(respBody)) + return nil, fmt.Errorf("status report failed with status %d: %s", resp.StatusCode, string(respBody)) } - return nil -} - -func (c *Client) CompleteWorkItem(id, status, errorMsg string) error { - payload := map[string]string{ - "id": id, - "status": status, - } - if errorMsg != "" { - payload["error"] = errorMsg + var statusResponse StatusResponse + if err := json.NewDecoder(resp.Body).Decode(&statusResponse); err != nil { + return nil, fmt.Errorf("failed to decode status response: %w", err) } - body, err := json.Marshal(payload) - if err != nil { - return fmt.Errorf("failed to marshal work item update: %w", err) - } - - req, err := http.NewRequest("POST", c.baseURL+"/api/v1/agent/work-queue/complete", bytes.NewReader(body)) - if err != nil { - return fmt.Errorf("failed to create request: %w", err) - } - - req.Header.Set("Content-Type", "application/json") - c.signRequest(req, string(body)) - - resp, err := c.client.Do(req) - if err != nil { - return fmt.Errorf("failed to complete work item: %w", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - respBody, _ := io.ReadAll(resp.Body) - return fmt.Errorf("work item update failed with status %d: %s", resp.StatusCode, string(respBody)) - } - - return nil + return &statusResponse, nil } func (c *Client) GetBuildStatus(buildID string) (string, error) { diff --git a/cli/src/main.ts b/cli/src/main.ts index fdd61ed..9481465 100644 --- a/cli/src/main.ts +++ b/cli/src/main.ts @@ -360,6 +360,9 @@ service: image: nginx:1.27 replicas: count: 1 + resources: + cpuCores: 2 + memoryMb: 1024 ports: - port: 80 public: false diff --git a/deployment/README.md b/deployment/README.md index 3c15252..884fd1c 100644 --- a/deployment/README.md +++ b/deployment/README.md @@ -53,6 +53,19 @@ INNGEST_SIGNING_KEY=signkey-prod- INNGEST_EVENT_KEY= ``` +### Web Replicas + +Set `WEB_REPLICAS` in `.env` to run multiple control plane web containers: + +```env +WEB_REPLICAS=2 +``` + +Traefik discovers the replicated `web` containers through the Docker provider +and load balances requests for `${ROOT_DOMAIN}` across them. The startup schema +sync remains in the web container entrypoint, so keep in mind that simultaneous +replica starts may run `drizzle-kit push` concurrently during upgrades. + ## Database Migrations Schema is synced automatically on container startup via `drizzle-kit push`. This approach auto-confirms non-destructive changes (adding tables, columns, indexes) but will **not** auto-apply destructive changes like dropping columns or tables — those require manual intervention. diff --git a/deployment/compose.postgres.yml b/deployment/compose.postgres.yml index c08db65..3d309c8 100644 --- a/deployment/compose.postgres.yml +++ b/deployment/compose.postgres.yml @@ -32,6 +32,9 @@ services: - "--certificatesresolvers.letsencrypt.acme.httpchallenge.entrypoint=web" - "--certificatesresolvers.letsencrypt.acme.email=${ACME_EMAIL}" - "--certificatesresolvers.letsencrypt.acme.storage=/letsencrypt/acme.json" + - "--entrypoints.websecure.transport.respondingTimeouts.readTimeout=0" + - "--entrypoints.websecure.transport.respondingTimeouts.writeTimeout=0" + - "--entrypoints.websecure.transport.respondingTimeouts.idleTimeout=3600" - "--ping=true" ports: - "80:80" @@ -68,6 +71,7 @@ services: web: image: ghcr.io/techulus/cloud/web:tip + scale: ${WEB_REPLICAS:-1} env_file: - ./.env environment: diff --git a/deployment/compose.production.yml b/deployment/compose.production.yml index 5f3af28..7b18b68 100644 --- a/deployment/compose.production.yml +++ b/deployment/compose.production.yml @@ -32,7 +32,9 @@ services: - "--certificatesresolvers.letsencrypt.acme.httpchallenge.entrypoint=web" - "--certificatesresolvers.letsencrypt.acme.email=${ACME_EMAIL}" - "--certificatesresolvers.letsencrypt.acme.storage=/letsencrypt/acme.json" - - "--entrypoints.websecure.transport.respondingTimeouts.readTimeout=600" + - "--entrypoints.websecure.transport.respondingTimeouts.readTimeout=0" + - "--entrypoints.websecure.transport.respondingTimeouts.writeTimeout=0" + - "--entrypoints.websecure.transport.respondingTimeouts.idleTimeout=3600" - "--ping=true" ports: - "80:80" @@ -51,6 +53,7 @@ services: web: image: ghcr.io/techulus/cloud/web:tip + scale: ${WEB_REPLICAS:-1} env_file: - ./.env environment: diff --git a/deployment/install.sh b/deployment/install.sh index 5016aff..b4cbfce 100755 --- a/deployment/install.sh +++ b/deployment/install.sh @@ -377,6 +377,7 @@ REGISTRY_HTTP_SECRET=${REGISTRY_HTTP_SECRET} INNGEST_SIGNING_KEY=${INNGEST_SIGNING_KEY} INNGEST_EVENT_KEY=${INNGEST_EVENT_KEY} +WEB_REPLICAS=1 ALLOW_SIGNUP=true COMPOSE_FILE=${COMPOSE_FILE} diff --git a/docs/agents/architecture.mdx b/docs/agents/architecture.mdx index fec5c24..16e466b 100644 --- a/docs/agents/architecture.mdx +++ b/docs/agents/architecture.mdx @@ -1,6 +1,6 @@ --- title: "Architecture" -description: "State machine, drift detection, build pipeline, and work queue." +description: "State machine, drift detection, build pipeline, and leased commands." --- ## State Machine @@ -59,9 +59,14 @@ Agents can build container images directly from GitHub sources: Build logs stream to VictoriaLogs in real time. -## Work Queue +## Leased Commands -Agents also process queue items for operations that cannot be modeled purely as expected state: +Agents report status to the control plane on a short interval. The status response +can include one command for operations that cannot be modeled purely as expected +state. The command's attempt number acts as a generation guard: the agent reports +the result with that attempt, and stale completions from older attempts are +ignored. If an agent crashes or stops renewing the command through status +reports, the command can be retried up to the fixed attempt limit. | Type | Description | | --- | --- | diff --git a/docs/architecture.mdx b/docs/architecture.mdx index 0ed9ff5..6272152 100644 --- a/docs/architecture.mdx +++ b/docs/architecture.mdx @@ -25,7 +25,7 @@ Techulus Cloud is a stateless container deployment platform built around three c | Reverse Proxy | Traefik | Automatic HTTPS via Let's Encrypt, runs on proxy nodes only | | Private Network | WireGuard | Full mesh coordinated by the control plane | | Service Discovery | Built-in DNS | Agent serves `.internal` domains | -| Agent Communication | Pull-based HTTP | Agent polls expected state and reports status | +| Agent Communication | Pull-based HTTP | Agent polls expected state and receives leased commands through status reports | ## Node Types @@ -44,9 +44,9 @@ graph TD Internet["Internet"] -->|"DNS"| P1 CP["Control Plane
Next.js + API Routes + Postgres"] - CP -- "HTTPS poll every 10s" --> P1 - CP -- "HTTPS poll every 10s" --> W1 - CP -- "HTTPS poll every 10s" --> W2 + CP -- "HTTPS status + state poll" --> P1 + CP -- "HTTPS status + state poll" --> W1 + CP -- "HTTPS status + state poll" --> W2 subgraph Servers P1["Proxy Node
Agent · Podman · Traefik · DNS · WireGuard
WG: 10.100.1.1 · Containers: 10.200.1.2-254"] diff --git a/docs/deployments/compose.mdx b/docs/deployments/compose.mdx index d03e8e7..44b881d 100644 --- a/docs/deployments/compose.mdx +++ b/docs/deployments/compose.mdx @@ -20,6 +20,8 @@ The following Compose fields are parsed and applied: | `deploy.resources.limits` | CPU and memory limits | | `command` | Start command override | +When a Compose service does not define `deploy.resources.limits`, Techulus Cloud applies the default Large preset: 2 CPU cores and 1024 MB memory. + ## How It Works 1. Paste or upload your `docker-compose.yml` in the project settings. diff --git a/docs/installation.mdx b/docs/installation.mdx index 7331407..7af57c2 100644 --- a/docs/installation.mdx +++ b/docs/installation.mdx @@ -92,6 +92,17 @@ use the common commands below when investigating a self-hosted service. | `INNGEST_SIGNING_KEY` | Request verification key (prefix with `signkey-prod-`) | | `INNGEST_EVENT_KEY` | Event API key | +### Control Plane Replicas + +| Variable | Description | +| --- | --- | +| `WEB_REPLICAS` | Number of control plane web containers to run (default: `1`) | + +When `WEB_REPLICAS` is greater than `1`, Traefik discovers the replicated web +containers through Docker and load balances requests for `` across +them. Startup schema sync still runs from each web container, so simultaneous +replica starts may run `drizzle-kit push` concurrently during upgrades. + ### GitHub Integration (Optional) | Variable | Description | diff --git a/docs/services/configuration.mdx b/docs/services/configuration.mdx index 41265cd..21ea234 100644 --- a/docs/services/configuration.mdx +++ b/docs/services/configuration.mdx @@ -35,7 +35,7 @@ You can set CPU and memory limits per service: | CPU limit | Maximum CPU cores (e.g., `0.5`, `1`, `2`) | | Memory limit | Maximum memory in MB (e.g., `256`, `512`, `1024`) | -When no limits are set, the container uses whatever resources are available on the host. +New services default to the Large preset: 2 CPU cores and 1024 MB memory. You can change the preset or choose No limit to let the container use whatever resources are available on the host. ## Health Checks diff --git a/web/actions/projects.ts b/web/actions/projects.ts index abeff3c..b855b9d 100644 --- a/web/actions/projects.ts +++ b/web/actions/projects.ts @@ -38,6 +38,7 @@ import { getEnvironment, getProject, getService } from "@/db/queries"; import { allocatePort } from "@/lib/port-allocation"; import cronstrue from "cronstrue"; import { startMigration } from "./migrations"; +import { DEFAULT_RESOURCE_LIMITS } from "@/lib/constants"; import { inngest } from "@/lib/inngest/client"; import { inngestEvents } from "@/lib/inngest/events"; @@ -372,6 +373,10 @@ type CreateServiceInput = { environmentId: string; name: string; image: string; + resourceLimits?: { + cpuCores: number | null; + memoryMb: number | null; + }; github?: { repoUrl: string; branch: string; @@ -383,6 +388,7 @@ type CreateServiceInput = { export async function createService(input: CreateServiceInput) { const { projectId, environmentId, name, image, github } = input; + const resourceLimits = input.resourceLimits ?? DEFAULT_RESOURCE_LIMITS; const env = await getEnvironment(environmentId); if (!env) { throw new Error("Environment not found"); @@ -428,6 +434,8 @@ export async function createService(input: CreateServiceInput) { replicas: 1, stateful: false, autoPlace: true, + resourceCpuLimit: resourceLimits.cpuCores, + resourceMemoryLimitMb: resourceLimits.memoryMb, }); if (github?.installationId && github?.repoId) { diff --git a/web/app/api/v1/agent/status/route.ts b/web/app/api/v1/agent/status/route.ts index e7b8dbd..2f53b84 100644 --- a/web/app/api/v1/agent/status/route.ts +++ b/web/app/api/v1/agent/status/route.ts @@ -1,6 +1,19 @@ -import { NextRequest, NextResponse } from "next/server"; +import { type NextRequest, NextResponse } from "next/server"; import { verifyAgentRequest } from "@/lib/agent-auth"; import { applyStatusReport, type StatusReport } from "@/lib/agent-status"; +import { + type ActiveWorkItem, + claimNextWorkItem, + completeWorkItemResults, + renewActiveWorkItems, + type WorkItemResult, +} from "@/lib/work-queue"; + +type StatusRequestBody = { + statusReport?: StatusReport; + completedWorkItems?: WorkItemResult[]; + activeWorkItems?: ActiveWorkItem[]; +}; export async function POST(request: NextRequest) { const body = await request.text(); @@ -9,7 +22,7 @@ export async function POST(request: NextRequest) { return NextResponse.json({ error: auth.error }, { status: auth.status }); } - let data: { statusReport?: StatusReport }; + let data: StatusRequestBody; try { data = JSON.parse(body); } catch { @@ -27,5 +40,52 @@ export async function POST(request: NextRequest) { await applyStatusReport(serverId, data.statusReport); - return NextResponse.json({ ok: true }); + const completedWorkItems = Array.isArray(data.completedWorkItems) + ? data.completedWorkItems.filter(isValidWorkItemResult) + : []; + const activeWorkItems = Array.isArray(data.activeWorkItems) + ? data.activeWorkItems.filter(isValidActiveWorkItem) + : []; + + const { accepted, rejected } = await completeWorkItemResults( + serverId, + completedWorkItems, + ); + + const rejectedActive = await renewActiveWorkItems(serverId, activeWorkItems); + + const nextWorkItem = + activeWorkItems.length === 0 ? await claimNextWorkItem(serverId) : null; + + return NextResponse.json({ + ok: true, + acceptedWorkItemResults: accepted, + rejectedWorkItemResults: rejected, + rejectedActiveWorkItems: rejectedActive, + workItems: nextWorkItem ? [nextWorkItem] : [], + }); +} + +function isValidWorkItemResult(value: unknown): value is WorkItemResult { + if (!value || typeof value !== "object") return false; + + const candidate = value as WorkItemResult; + return ( + typeof candidate.id === "string" && + Number.isInteger(candidate.attempt) && + candidate.attempt > 0 && + (candidate.status === "completed" || candidate.status === "failed") && + (candidate.error === undefined || typeof candidate.error === "string") + ); +} + +function isValidActiveWorkItem(value: unknown): value is ActiveWorkItem { + if (!value || typeof value !== "object") return false; + + const candidate = value as ActiveWorkItem; + return ( + typeof candidate.id === "string" && + Number.isInteger(candidate.attempt) && + candidate.attempt > 0 + ); } diff --git a/web/app/api/v1/agent/work-queue/complete/route.ts b/web/app/api/v1/agent/work-queue/complete/route.ts deleted file mode 100644 index c82cc0c..0000000 --- a/web/app/api/v1/agent/work-queue/complete/route.ts +++ /dev/null @@ -1,85 +0,0 @@ -import { NextRequest, NextResponse } from "next/server"; -import { db } from "@/db"; -import { workQueue } from "@/db/schema"; -import { eq, and } from "drizzle-orm"; -import { verifyAgentRequest } from "@/lib/agent-auth"; -import { inngest } from "@/lib/inngest/client"; -import { inngestEvents } from "@/lib/inngest/events"; - -export async function POST(request: NextRequest) { - const body = await request.text(); - const auth = await verifyAgentRequest(request, body); - if (!auth.success) { - return NextResponse.json({ error: auth.error }, { status: auth.status }); - } - - let data: { id?: string; status?: "completed" | "failed"; error?: string }; - try { - data = JSON.parse(body); - } catch { - return NextResponse.json({ error: "Invalid JSON body" }, { status: 400 }); - } - - if ( - typeof data.id !== "string" || - (data.status !== "completed" && data.status !== "failed") - ) { - return NextResponse.json( - { error: "Missing required fields: id, status" }, - { status: 400 }, - ); - } - - const { serverId } = auth; - - const result = await db - .update(workQueue) - .set({ - status: data.status, - }) - .where(and(eq(workQueue.id, data.id), eq(workQueue.serverId, serverId))) - .returning(); - - if (result.length === 0) { - return NextResponse.json( - { error: "Work queue item not found" }, - { status: 404 }, - ); - } - - const item = result[0]; - - if (item.type === "create_manifest" && item.payload) { - try { - const payload = JSON.parse(item.payload) as { - serviceId?: string; - finalImageUri?: string; - buildGroupId?: string; - }; - - if (data.status === "completed") { - if (payload.serviceId && payload.finalImageUri) { - await inngest.send( - inngestEvents.manifestCompleted.create({ - serviceId: payload.serviceId, - buildGroupId: payload.buildGroupId || "", - imageUri: payload.finalImageUri, - }), - ); - } - } else if (data.status === "failed" && payload.serviceId) { - await inngest.send( - inngestEvents.manifestFailed.create({ - serviceId: payload.serviceId, - buildGroupId: payload.buildGroupId || "", - error: data.error || "Manifest creation failed", - }), - ); - } - } catch (error) { - console.error(`[work-queue] failed to parse payload:`, error); - } - } - - return NextResponse.json({ ok: true }); -} diff --git a/web/app/api/v1/agent/work-queue/route.ts b/web/app/api/v1/agent/work-queue/route.ts deleted file mode 100644 index ac06f04..0000000 --- a/web/app/api/v1/agent/work-queue/route.ts +++ /dev/null @@ -1,85 +0,0 @@ -import { NextRequest, NextResponse } from "next/server"; -import { db } from "@/db"; -import { workQueue } from "@/db/schema"; -import { eq, and, inArray } from "drizzle-orm"; -import { verifyAgentRequest } from "@/lib/agent-auth"; - -const MAX_TIMEOUT = 30000; -// Short interval keeps deploy wake latency low for now, but increases empty -// polling load. Replace with Postgres LISTEN/NOTIFY or another wake mechanism -// before scaling this broadly. -const POLL_INTERVAL = 500; - -function sleep(ms: number): Promise { - return new Promise((resolve) => setTimeout(resolve, ms)); -} - -function normalizeTimeout(rawTimeout?: number | null) { - const parsed = rawTimeout ?? MAX_TIMEOUT; - return Math.min(Math.max(0, parsed || 0), MAX_TIMEOUT); -} - -async function longPollWorkQueue( - request: NextRequest, - serverId: string, - timeout: number, -) { - const startTime = Date.now(); - - console.log( - `[work-queue] long poll started for server=${serverId} timeout=${timeout}ms`, - ); - - while (true) { - if (request.signal.aborted) { - console.log(`[work-queue] request aborted for server=${serverId}`); - return NextResponse.json({ items: [] }); - } - - const items = await db - .select() - .from(workQueue) - .where( - and(eq(workQueue.serverId, serverId), eq(workQueue.status, "pending")), - ); - - if (items.length > 0) { - await db - .update(workQueue) - .set({ status: "processing", startedAt: new Date() }) - .where( - inArray( - workQueue.id, - items.map((i) => i.id), - ), - ); - - console.log( - `[work-queue] found ${items.length} items for server=${serverId}`, - ); - return NextResponse.json({ items }); - } - - if (Date.now() - startTime >= timeout) { - console.log(`[work-queue] timeout elapsed for server=${serverId}`); - return NextResponse.json({ items: [] }); - } - - await sleep(POLL_INTERVAL); - } -} - -export async function GET(request: NextRequest) { - const auth = await verifyAgentRequest(request); - if (!auth.success) { - return NextResponse.json({ error: auth.error }, { status: auth.status }); - } - - const { serverId } = auth; - const rawTimeout = request.nextUrl.searchParams.get("timeout"); - const timeout = normalizeTimeout( - rawTimeout ? parseInt(rawTimeout, 10) : null, - ); - - return longPollWorkQueue(request, serverId, timeout); -} diff --git a/web/components/service/details/resource-limits-section.tsx b/web/components/service/details/resource-limits-section.tsx index 1f28a59..bd48366 100644 --- a/web/components/service/details/resource-limits-section.tsx +++ b/web/components/service/details/resource-limits-section.tsx @@ -11,6 +11,7 @@ import { import { Gauge } from "lucide-react"; import { updateServiceResourceLimits } from "@/actions/projects"; import type { ServiceWithDetails as Service } from "@/db/types"; +import { DEFAULT_RESOURCE_LIMITS } from "@/lib/constants"; type Preset = { label: string; @@ -22,7 +23,11 @@ const PRESETS: Record = { none: { label: "No limit", cpuCores: null, memoryMb: null }, small: { label: "Small (0.5 CPU, 256MB)", cpuCores: 0.5, memoryMb: 256 }, medium: { label: "Medium (1 CPU, 512MB)", cpuCores: 1, memoryMb: 512 }, - large: { label: "Large (2 CPU, 1024MB)", cpuCores: 2, memoryMb: 1024 }, + large: { + label: "Large (2 CPU, 1024MB)", + cpuCores: DEFAULT_RESOURCE_LIMITS.cpuCores, + memoryMb: DEFAULT_RESOURCE_LIMITS.memoryMb, + }, xlarge: { label: "X-Large (4 CPU, 2048MB)", cpuCores: 4, memoryMb: 2048 }, custom: { label: "Custom", cpuCores: null, memoryMb: null }, }; diff --git a/web/db/schema.ts b/web/db/schema.ts index 757b6ed..fef32b5 100644 --- a/web/db/schema.ts +++ b/web/db/schema.ts @@ -302,8 +302,8 @@ export const services = pgTable("services", { healthCheckRetries: integer("health_check_retries").default(3), healthCheckStartPeriod: integer("health_check_start_period").default(30), startCommand: text("start_command"), - resourceCpuLimit: real("resource_cpu_limit"), - resourceMemoryLimitMb: integer("resource_memory_limit_mb"), + resourceCpuLimit: real("resource_cpu_limit").default(2), + resourceMemoryLimitMb: integer("resource_memory_limit_mb").default(1024), deployedConfig: text("deployed_config"), deploymentSchedule: text("deployment_schedule"), lastScheduledDeploymentRunAt: timestamp("last_scheduled_deployment_run_at", { diff --git a/web/lib/cli-service.ts b/web/lib/cli-service.ts index 0d4e672..30d28f5 100644 --- a/web/lib/cli-service.ts +++ b/web/lib/cli-service.ts @@ -19,6 +19,7 @@ import { getManifestServiceName, } from "@/lib/cli-manifest"; import { slugify } from "@/lib/utils"; +import { DEFAULT_RESOURCE_LIMITS } from "@/lib/constants"; import { createEnvironment, createProject, @@ -38,6 +39,18 @@ export type ManifestChange = { to: string; }; +function getManifestResourceLimits(manifest: TechulusManifest) { + const resources = manifest.service.resources; + if (resources === undefined) { + return DEFAULT_RESOURCE_LIMITS; + } + + return { + cpuCores: resources.cpuCores ?? null, + memoryMb: resources.memoryMb ?? null, + }; +} + export type ManifestApplyResult = { project: { id: string; name: string; slug: string }; environment: { id: string; name: string }; @@ -527,8 +540,9 @@ async function syncResources( manifest: TechulusManifest, changes: ManifestChange[], ) { - const desiredCpu = manifest.service.resources?.cpuCores ?? null; - const desiredMemory = manifest.service.resources?.memoryMb ?? null; + const desiredResources = getManifestResourceLimits(manifest); + const desiredCpu = desiredResources.cpuCores; + const desiredMemory = desiredResources.memoryMb; if ( currentService.resourceCpuLimit === desiredCpu && diff --git a/web/lib/constants.ts b/web/lib/constants.ts index 88493e4..318b66a 100644 --- a/web/lib/constants.ts +++ b/web/lib/constants.ts @@ -1,2 +1,7 @@ export const WIREGUARD_SUBNET_PREFIX = "10.100"; export const CONTAINER_SUBNET_PREFIX = "10.200"; + +export const DEFAULT_RESOURCE_LIMITS = { + cpuCores: 2, + memoryMb: 1024, +} as const; diff --git a/web/lib/scheduler.ts b/web/lib/scheduler.ts index 6da74f4..838896a 100644 --- a/web/lib/scheduler.ts +++ b/web/lib/scheduler.ts @@ -1,5 +1,5 @@ import { CronExpressionParser } from "cron-parser"; -import { and, eq, inArray, isNotNull, lt, ne } from "drizzle-orm"; +import { and, eq, inArray, isNotNull, lt, ne, sql } from "drizzle-orm"; import { triggerBuild } from "@/actions/builds"; import { deployService } from "@/actions/projects"; import { db } from "@/db"; @@ -15,6 +15,10 @@ import { calculateResourceAwarePlacement, replaceServiceReplicaPlacements, } from "@/lib/placement"; +import { + WORK_QUEUE_LEASE_DURATION_MS, + WORK_QUEUE_MAX_ATTEMPTS, +} from "@/lib/work-queue"; const STALE_THRESHOLD_MS = 120_000; // 2 minutes @@ -235,6 +239,9 @@ const OLD_ITEM_THRESHOLD_MS = 90 * 24 * 60 * 60 * 1000; export async function cleanupStaleItems(): Promise { const staleThreshold = new Date(Date.now() - STALE_ITEM_THRESHOLD_MS); + const workItemLeaseThreshold = new Date( + Date.now() - WORK_QUEUE_LEASE_DURATION_MS, + ); const oldThreshold = new Date(Date.now() - OLD_ITEM_THRESHOLD_MS); const staleRollouts = await db @@ -258,13 +265,16 @@ export async function cleanupStaleItems(): Promise { ); } + // Pending work is intentionally retained so commands can run when an agent + // reconnects. Only exhausted processing attempts are failed here. const staleWorkItems = await db .update(workQueue) .set({ status: "failed" }) .where( and( - inArray(workQueue.status, ["pending", "processing"]), - lt(workQueue.createdAt, staleThreshold), + eq(workQueue.status, "processing"), + lt(workQueue.startedAt, workItemLeaseThreshold), + sql`${workQueue.attempts} >= ${WORK_QUEUE_MAX_ATTEMPTS}`, ), ) .returning({ id: workQueue.id }); diff --git a/web/lib/work-queue.ts b/web/lib/work-queue.ts index d2cd726..338debf 100644 --- a/web/lib/work-queue.ts +++ b/web/lib/work-queue.ts @@ -1,7 +1,42 @@ import { randomUUID } from "node:crypto"; +import { and, eq, sql } from "drizzle-orm"; import { db } from "@/db"; import { workQueue } from "@/db/schema"; import type { WorkQueue } from "@/db/types"; +import { inngest } from "@/lib/inngest/client"; +import { inngestEvents } from "@/lib/inngest/events"; + +export const WORK_QUEUE_MAX_ATTEMPTS = 3; +export const WORK_QUEUE_LEASE_DURATION_MS = 2 * 60 * 1000; + +export type WorkItemResult = { + id: string; + attempt: number; + status: "completed" | "failed"; + error?: string; +}; + +export type ActiveWorkItem = { + id: string; + attempt: number; +}; + +export type LeasedWorkItem = { + id: string; + type: WorkQueue["type"]; + payload: string; + attempt: number; +}; + +export type RejectedWorkItemResult = { + id: string; + reason: string; +}; + +export type RejectedActiveWorkItem = { + id: string; + reason: string; +}; export async function enqueueWork( serverId: string, @@ -15,3 +50,187 @@ export async function enqueueWork( payload: JSON.stringify(payload), }); } + +export async function completeWorkItemResults( + serverId: string, + results: WorkItemResult[], +): Promise<{ + accepted: string[]; + rejected: RejectedWorkItemResult[]; +}> { + const accepted: string[] = []; + const rejected: RejectedWorkItemResult[] = []; + + for (const result of results) { + const updated = await db + .update(workQueue) + .set({ status: result.status }) + .where( + and( + eq(workQueue.id, result.id), + eq(workQueue.serverId, serverId), + eq(workQueue.status, "processing"), + eq(workQueue.attempts, result.attempt), + ), + ) + .returning(); + + if (updated.length === 0) { + rejected.push({ + id: result.id, + reason: await getRejectionReason(serverId, result.id, result.attempt), + }); + continue; + } + + accepted.push(result.id); + await runWorkItemCompletionSideEffects(updated[0], result); + } + + return { accepted, rejected }; +} + +export async function renewActiveWorkItems( + serverId: string, + items: ActiveWorkItem[], +): Promise { + if (items.length === 0) return []; + + const rejected: RejectedActiveWorkItem[] = []; + + for (const item of items) { + const updated = await db + .update(workQueue) + .set({ startedAt: new Date() }) + .where( + and( + eq(workQueue.id, item.id), + eq(workQueue.serverId, serverId), + eq(workQueue.status, "processing"), + eq(workQueue.attempts, item.attempt), + ), + ) + .returning({ id: workQueue.id }); + + if (updated.length === 0) { + rejected.push({ + id: item.id, + reason: await getRejectionReason(serverId, item.id, item.attempt), + }); + } + } + + return rejected; +} + +export async function claimNextWorkItem( + serverId: string, +): Promise { + const staleThreshold = new Date(Date.now() - WORK_QUEUE_LEASE_DURATION_MS); + + const result = await db.execute(sql` + UPDATE work_queue + SET + status = 'processing', + started_at = NOW(), + attempts = attempts + 1 + WHERE id = ( + SELECT id + FROM work_queue + WHERE server_id = ${serverId} + AND ( + status = 'pending' + OR ( + status = 'processing' + AND started_at < ${staleThreshold} + AND attempts < ${WORK_QUEUE_MAX_ATTEMPTS} + ) + ) + ORDER BY created_at ASC + FOR UPDATE SKIP LOCKED + LIMIT 1 + ) + RETURNING id, type, payload, attempts + `); + + const rows = result.rows as Array<{ + id: string; + type: WorkQueue["type"]; + payload: string; + attempts: number; + }>; + + const row = rows[0]; + if (!row) return null; + + return { + id: row.id, + type: row.type, + payload: row.payload, + attempt: row.attempts, + }; +} + +async function getRejectionReason( + serverId: string, + id: string, + attempt: number, +): Promise { + const item = await db + .select({ + serverId: workQueue.serverId, + status: workQueue.status, + attempts: workQueue.attempts, + }) + .from(workQueue) + .where(eq(workQueue.id, id)) + .then((rows) => rows[0]); + + if (!item) return "not_found"; + if (item.serverId !== serverId) return "server_mismatch"; + if (item.status === "completed" || item.status === "failed") { + return "already_terminal"; + } + if (item.status !== "processing") return "not_processing"; + if (item.attempts !== attempt) return "attempt_mismatch"; + return "unknown"; +} + +async function runWorkItemCompletionSideEffects( + item: WorkQueue, + result: WorkItemResult, +): Promise { + if (item.type !== "create_manifest" || !item.payload) { + return; + } + + try { + const payload = JSON.parse(item.payload) as { + serviceId?: string; + finalImageUri?: string; + buildGroupId?: string; + }; + + if (result.status === "completed") { + if (payload.serviceId && payload.finalImageUri) { + await inngest.send( + inngestEvents.manifestCompleted.create({ + serviceId: payload.serviceId, + buildGroupId: payload.buildGroupId || "", + imageUri: payload.finalImageUri, + }), + ); + } + } else if (payload.serviceId) { + await inngest.send( + inngestEvents.manifestFailed.create({ + serviceId: payload.serviceId, + buildGroupId: payload.buildGroupId || "", + error: result.error || "Manifest creation failed", + }), + ); + } + } catch (error) { + console.error("[work-queue] failed to run completion side effects:", error); + } +} diff --git a/web/public/robots.txt b/web/public/robots.txt new file mode 100644 index 0000000..1f53798 --- /dev/null +++ b/web/public/robots.txt @@ -0,0 +1,2 @@ +User-agent: * +Disallow: /