diff --git a/CHANGELOG.md b/CHANGELOG.md index a8750821..9c806687 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,11 @@ Versioning: [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ### Changed +- Remediation now updates live. The Remediation tab and the compliance score + refresh automatically when a queued fix or rollback finishes, over the SSE + event stream (new `remediation.completed` topic), instead of requiring a + manual page refresh. + - CI release safety: the release workflow now fails closed on a `v*` tag push when no GPG signing key is configured, rather than publishing unsigned packages. Manual `workflow_dispatch` trial builds stay permissive (warn + @@ -24,6 +29,12 @@ Versioning: [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ### Fixed +- Applying several fixes on the same host at once no longer fails the extra + ones. Concurrent remediations on a host now serialize: a fix whose host is + busy backs off and requeues (with a short delay, via a new delayed-visibility + column on the job queue) until the host is free, instead of colliding on the + per-host SSH guard and being marked failed. + - Documentation version drift: operator guides referenced `0.2.0-rc.5` while `packaging/version.env` was `0.2.0-rc.10`; all guides now match. 
- SPA static-delivery tests are self-contained (in-memory fixture) instead of diff --git a/frontend/src/hooks/useLiveEvents.ts b/frontend/src/hooks/useLiveEvents.ts index f2b009ff..b2ef292b 100644 --- a/frontend/src/hooks/useLiveEvents.ts +++ b/frontend/src/hooks/useLiveEvents.ts @@ -31,6 +31,7 @@ export const ALL_TOPICS = [ 'host.discovered', 'intelligence.event', 'scan.completed', + 'remediation.completed', ] as const; type Topic = (typeof ALL_TOPICS)[number]; @@ -141,6 +142,20 @@ export function useLiveEvents(options: UseLiveEventsOptions = {}) { queryClient.invalidateQueries({ queryKey: ['host', hostId] }); } }, + // remediation.completed -> the Remediation tab + Compliance score update + // without a manual refresh. The worker publishes this when a queued + // execute/rollback reaches its terminal state (executed | failed | + // rolled_back); a committed execute also flips the rule to pass, so the + // host detail (compliance) is invalidated too. + 'remediation.completed': (e) => { + const env = parseEnvelope(e); + if (!env) return; + const hostId = (env.payload?.HostID ?? env.payload?.host_id) as string | undefined; + if (hostId) { + queryClient.invalidateQueries({ queryKey: ['host', hostId, 'remediations'] }); + queryClient.invalidateQueries({ queryKey: ['host', hostId] }); + } + }, }; for (const k of topics) { diff --git a/frontend/tests/hooks/useLiveEvents.test.tsx b/frontend/tests/hooks/useLiveEvents.test.tsx index 6025dacf..68e0d4b1 100644 --- a/frontend/tests/hooks/useLiveEvents.test.tsx +++ b/frontend/tests/hooks/useLiveEvents.test.tsx @@ -83,8 +83,8 @@ beforeEach(() => { }); // @ac AC-01 -// AC-01: ALL_TOPICS exported as the closed set of 5 topics (v1.1.0 -// adds scan.completed). +// AC-01: ALL_TOPICS exported as the closed set (v1.1.0 adds scan.completed; +// v1.2.0 adds remediation.completed). 
test('frontend-live-events/AC-01 — ALL_TOPICS is the closed v1.0 set', () => { const want = [ 'host.changed', @@ -92,9 +92,10 @@ test('frontend-live-events/AC-01 — ALL_TOPICS is the closed v1.0 set', () => { 'host.discovered', 'intelligence.event', 'scan.completed', + 'remediation.completed', ]; expect([...ALL_TOPICS]).toEqual(want); - expect(ALL_TOPICS.length).toBe(5); + expect(ALL_TOPICS.length).toBe(6); }); // Helper to mount the hook and return the stub + spies. @@ -124,6 +125,19 @@ test('frontend-live-events/AC-02 — host.changed invalidates [hosts] + [host, i expect(calls).toContainEqual(['host', 'h-aaa']); }); +// @ac AC-09 +// AC-09: remediation.completed invalidates the host's remediations list (the +// Remediation tab updates without a manual refresh) and the host detail (a +// committed fix flips a rule to pass, moving the compliance score). The worker +// publishes HostID (Go field name). +test('frontend-live-events/AC-09 — remediation.completed invalidates [host, id, remediations] + [host, id]', () => { + const { es, spy } = mountHook(); + es.fire('remediation.completed', { HostID: 'h-rem' }); + const calls = spy.mock.calls.map((c) => c[0]?.queryKey); + expect(calls).toContainEqual(['host', 'h-rem', 'remediations']); + expect(calls).toContainEqual(['host', 'h-rem']); +}); + // @ac AC-03 test('frontend-live-events/AC-03 — monitoring.band.changed invalidates [hosts] + [host, id]', () => { const { es, spy } = mountHook(); diff --git a/internal/db/migrations/0039_job_queue_available_at.sql b/internal/db/migrations/0039_job_queue_available_at.sql new file mode 100644 index 00000000..c1033d83 --- /dev/null +++ b/internal/db/migrations/0039_job_queue_available_at.sql @@ -0,0 +1,24 @@ +-- Delayed-visibility for the job queue. A pending job becomes dequeuable only +-- at or after available_at, which defaults to now() — so every existing enqueue +-- path (scans, diagnostics, etc.) is immediately visible and unchanged. 
The +-- remediation worker sets a future available_at to back off and requeue a job +-- whose target host is already being remediated, so concurrent "Fix" clicks on +-- one host serialize (queue) instead of colliding on the per-host SSH guard and +-- failing. +-- +-- Spec: specs/system/job-queue.spec.yaml. + +-- +goose Up +ALTER TABLE job_queue ADD COLUMN available_at TIMESTAMPTZ NOT NULL DEFAULT now(); + +-- The dequeue hot path now filters and orders on availability. Replace the +-- status-only partial index so the claim query stays index-driven. +DROP INDEX IF EXISTS idx_job_queue_pending; +CREATE INDEX idx_job_queue_pending ON job_queue (available_at, created_at) + WHERE status = 'pending'; + +-- +goose Down +DROP INDEX IF EXISTS idx_job_queue_pending; +CREATE INDEX idx_job_queue_pending ON job_queue (created_at) + WHERE status = 'pending'; +ALTER TABLE job_queue DROP COLUMN IF EXISTS available_at; diff --git a/internal/queue/dequeue.go b/internal/queue/dequeue.go index 077d336c..7d0533e2 100644 --- a/internal/queue/dequeue.go +++ b/internal/queue/dequeue.go @@ -32,8 +32,8 @@ func Dequeue(ctx context.Context, pool *pgxpool.Pool) (*Job, context.Context, er attempts = attempts + 1 WHERE id = ( SELECT id FROM job_queue - WHERE status = 'pending' - ORDER BY created_at + WHERE status = 'pending' AND available_at <= now() + ORDER BY available_at, created_at FOR UPDATE SKIP LOCKED LIMIT 1 ) diff --git a/internal/queue/enqueue.go b/internal/queue/enqueue.go index a7bb5906..207d54fd 100644 --- a/internal/queue/enqueue.go +++ b/internal/queue/enqueue.go @@ -4,15 +4,16 @@ import ( "context" "encoding/json" "fmt" + "time" "github.com/Hanalyx/openwatch/internal/correlation" "github.com/google/uuid" "github.com/jackc/pgx/v5/pgxpool" ) -// Enqueue persists a new job_queue row. The caller's ctx MUST carry a -// correlation_id; this is a programming-error guard per spec C-01. 
The -// row's correlation_id pins the job to the originating intent across +// Enqueue persists a new job_queue row, immediately dequeuable. The caller's +// ctx MUST carry a correlation_id; this is a programming-error guard per spec +// C-01. The row's correlation_id pins the job to the originating intent across // the async boundary. // // Returns the inserted job's ID. The job is in "pending" status with @@ -20,6 +21,17 @@ import ( // // Spec system-job-queue AC-01. func Enqueue(ctx context.Context, pool *pgxpool.Pool, jobType string, payload any) (uuid.UUID, error) { + return EnqueueAfter(ctx, pool, jobType, payload, 0) +} + +// EnqueueAfter is Enqueue with a delay: the job is not dequeuable until +// `delay` from now (available_at = now() + delay). A non-positive delay makes +// it immediately available, identical to Enqueue. Used to back off and requeue +// a job that can't run yet (e.g. the target host is busy) without a tight +// re-dequeue loop, since Dequeue skips not-yet-available rows. +// +// Spec system-job-queue AC-13 (delayed visibility). +func EnqueueAfter(ctx context.Context, pool *pgxpool.Pool, jobType string, payload any, delay time.Duration) (uuid.UUID, error) { corrID, ok := correlation.From(ctx) if !ok { return uuid.Nil, ErrMissingCorrelation @@ -44,10 +56,16 @@ func Enqueue(ctx context.Context, pool *pgxpool.Pool, jobType string, payload an return uuid.Nil, fmt.Errorf("queue: uuid: %w", err) } + // available_at is computed server-side (now() + interval) so it shares the + // database clock that Dequeue compares against. 
+ secs := delay.Seconds() + if secs < 0 { + secs = 0 + } const stmt = ` - INSERT INTO job_queue (id, job_type, payload, correlation_id, status, attempts) - VALUES ($1, $2, $3::jsonb, $4, 'pending', 0)` - if _, err := pool.Exec(ctx, stmt, id, jobType, payloadJSON, corrID); err != nil { + INSERT INTO job_queue (id, job_type, payload, correlation_id, status, attempts, available_at) + VALUES ($1, $2, $3::jsonb, $4, 'pending', 0, now() + make_interval(secs => $5))` + if _, err := pool.Exec(ctx, stmt, id, jobType, payloadJSON, corrID, secs); err != nil { return uuid.Nil, fmt.Errorf("queue: insert: %w", err) } return id, nil diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go index bff1a5d5..09b28cae 100644 --- a/internal/queue/queue_test.go +++ b/internal/queue/queue_test.go @@ -29,6 +29,52 @@ func freshPool(t *testing.T) *pgxpool.Pool { return pool } +// @ac AC-13 +// AC-13: EnqueueAfter delays a job's visibility. Dequeue skips a job whose +// available_at is in the future and claims it only once available_at <= now(), +// so a requeue-with-backoff does not busy-loop the drain. +func TestEnqueueAfter_DelaysVisibility(t *testing.T) { + t.Run("system-job-queue/AC-13", func(t *testing.T) { + pool := freshPool(t) + ctx := correlation.Set(context.Background(), "req-ac13-001") + + future, err := EnqueueAfter(ctx, pool, "diagnostics.test_job", map[string]any{"k": "future"}, time.Hour) + if err != nil { + t.Fatalf("EnqueueAfter(future): %v", err) + } + now, err := EnqueueAfter(ctx, pool, "diagnostics.test_job", map[string]any{"k": "now"}, 0) + if err != nil { + t.Fatalf("EnqueueAfter(now): %v", err) + } + + // Dequeue claims the immediately-available job, never the future one. + job, _, err := Dequeue(ctx, pool) + if err != nil { + t.Fatalf("Dequeue: %v", err) + } + if job.ID != now { + t.Fatalf("Dequeue claimed %s, want the immediately-available job %s", job.ID, now) + } + // The future job stays hidden. 
+ if _, _, err := Dequeue(ctx, pool); !errors.Is(err, ErrNoJob) { + t.Fatalf("second Dequeue err = %v, want ErrNoJob (future job hidden)", err) + } + + // Once its available_at passes, the future job becomes dequeuable. + if _, err := pool.Exec(ctx, + `UPDATE job_queue SET available_at = now() - interval '1 second' WHERE id = $1`, future); err != nil { + t.Fatalf("backdate: %v", err) + } + job2, _, err := Dequeue(ctx, pool) + if err != nil { + t.Fatalf("Dequeue after backdate: %v", err) + } + if job2.ID != future { + t.Errorf("Dequeue claimed %s, want the now-available job %s", job2.ID, future) + } + }) +} + // @ac AC-01 // AC-01: Enqueue persists a job_queue row with the expected fields populated. func TestEnqueue_PersistsRow(t *testing.T) { diff --git a/internal/remediation/execution.go b/internal/remediation/execution.go index 440fcf5e..08ce4f72 100644 --- a/internal/remediation/execution.go +++ b/internal/remediation/execution.go @@ -32,6 +32,32 @@ func (s *Service) MarkRolledBack(ctx context.Context, id uuid.UUID) (Request, er return s.transition(ctx, id, StatusExecuted, StatusRolledBack) } +// RevertToApproved transitions an 'executing' request back to 'approved'. The +// remediation worker calls this when it has marked a request executing but the +// host turned out to be busy (lost a race for the per-host guard): the request +// returns to approved so the requeued job can run it once the host frees up. +func (s *Service) RevertToApproved(ctx context.Context, id uuid.UUID) (Request, error) { + return s.transition(ctx, id, StatusExecuting, StatusApproved) +} + +// HostHasExecuting reports whether the host already has a remediation request +// in the 'executing' state. The worker uses this to serialize per-host +// remediation: only one rule is applied on a host at a time (they share one +// SSH session via the executor's per-host guard), so a second concurrent +// request backs off and requeues instead of colliding. 
+func (s *Service) HostHasExecuting(ctx context.Context, hostID uuid.UUID) (bool, error) { + var exists bool + err := s.pool.QueryRow(ctx, ` + SELECT EXISTS ( + SELECT 1 FROM remediation_requests + WHERE host_id = $1 AND status = 'executing' + )`, hostID).Scan(&exists) + if err != nil { + return false, fmt.Errorf("remediation: host-executing check: %w", err) + } + return exists, nil +} + // transition performs a guarded fromState -> toState update under FOR UPDATE. // Unlike review() it does not touch reviewed_by/reviewed_at — execution // transitions are system-driven, not a human review. diff --git a/internal/remediation/service_test.go b/internal/remediation/service_test.go index 5c04ca1c..1e01b55b 100644 --- a/internal/remediation/service_test.go +++ b/internal/remediation/service_test.go @@ -283,3 +283,52 @@ func TestProjectLift_AndOverlayNeverMutatesRuleState(t *testing.T) { } }) } + +// @ac AC-08 +// AC-08: per-host serialization primitives. HostHasExecuting reports whether a +// request is 'executing' on a host; RevertToApproved returns an 'executing' +// request to 'approved'. The worker uses these (with a backoff requeue via +// queue.EnqueueAfter) so a second concurrent execute on a busy host requeues +// instead of failing. +func TestSerializePrimitives(t *testing.T) { + t.Run("api-remediation/AC-08", func(t *testing.T) { + pool := freshPool(t) + ctx := context.Background() + user := seedUser(t, pool, "ser") + hostID := seedHost(t, pool, user) + svc := NewService(pool, fakeEmitter(&[]emitCall{})) + + // Idle host: nothing executing. + if busy, err := svc.HostHasExecuting(ctx, hostID); err != nil || busy { + t.Fatalf("HostHasExecuting (idle) = %v, %v; want false", busy, err) + } + + // Seed an executing request directly (the worker sets this via + // MarkExecuting; we seed it to test the primitives in isolation). 
+ execID := uuid.Must(uuid.NewV7()) + if _, err := pool.Exec(ctx, + `INSERT INTO remediation_requests (id, host_id, rule_id, status, requested_by) + VALUES ($1, $2, 'rule-x', 'executing', $3)`, execID, hostID, user); err != nil { + t.Fatalf("seed executing: %v", err) + } + + // The host is now busy. + if busy, err := svc.HostHasExecuting(ctx, hostID); err != nil || !busy { + t.Errorf("HostHasExecuting (executing) = %v, %v; want true", busy, err) + } + + // RevertToApproved returns it to approved and clears busy. + reverted, err := svc.RevertToApproved(ctx, execID) + if err != nil || reverted.Status != StatusApproved { + t.Errorf("RevertToApproved = %+v, %v; want approved", reverted, err) + } + if busy, _ := svc.HostHasExecuting(ctx, hostID); busy { + t.Errorf("HostHasExecuting after revert = true; want false") + } + + // RevertToApproved on a non-executing request -> ErrWrongState. + if _, err := svc.RevertToApproved(ctx, execID); !errors.Is(err, ErrWrongState) { + t.Errorf("RevertToApproved (approved) err = %v, want ErrWrongState", err) + } + }) +} diff --git a/internal/worker/remediation_worker.go b/internal/worker/remediation_worker.go index 60a4e624..59e0a23e 100644 --- a/internal/worker/remediation_worker.go +++ b/internal/worker/remediation_worker.go @@ -136,8 +136,25 @@ func (w *RemediationWorker) ProcessJob(ctx context.Context, j *queue.Job) { } } -// processExecute drives approved -> executing -> executed|failed. +// remediationBusyBackoff is how long a remediation job waits before being +// retried when its target host is busy with another remediation. Only one rule +// is applied on a host at a time (they share a single SSH session via the +// executor's per-host guard), so concurrent "Fix" clicks serialize instead of +// failing. Kept short — the running remediation, not the wait, is the bottleneck. +const remediationBusyBackoff = 3 * time.Second + +// processExecute drives approved -> executing -> executed|failed. 
When the host +// is already being remediated, it backs off and requeues (serialize per host) +// instead of failing the request. func (w *RemediationWorker) processExecute(ctx context.Context, j *queue.Job, p RemediationPayload) { + // Serialize per host: if another remediation is already executing on this + // host, requeue with a backoff rather than colliding on the per-host SSH + // guard (which would fail this request). The request stays 'approved'. + if busy, err := w.svc.HostHasExecuting(ctx, p.HostID); err == nil && busy { + w.requeueBusy(ctx, j, p) + return + } + // Guard + transition approved -> executing (row-locked). A duplicate // enqueue or a request not in 'approved' fails here without touching the // host. @@ -158,7 +175,19 @@ func (w *RemediationWorker) processExecute(ctx context.Context, j *queue.Job, p // executor owns the per-host concurrency guard. result, remErr := w.executor.Remediate(ctx, p.HostID, p.RuleID) if remErr != nil { - // Host-side failure: record an empty journal + transition to failed. + // A lost race for the per-host guard is TRANSIENT, not a host-side + // failure: revert executing -> approved and requeue so it retries once + // the host frees. Requeue ONLY when the revert succeeded: a request left in 'executing' keeps HostHasExecuting true for this host, so a requeued job would back off again on every retry and block the host's other remediations; in that case fall through and fail the request visibly instead. + if errors.Is(remErr, kensa.ErrHostBusy) { + _, rerr := w.svc.RevertToApproved(ctx, p.RequestID) + if rerr == nil { + w.requeueBusy(ctx, j, p) + return + } + slog.WarnContext(ctx, "remediation revert-to-approved failed", + slog.String("request_id", p.RequestID.String()), slog.String("error", rerr.Error())) + } + // Real host-side failure: record an empty journal + transition to failed. 
w.finishExecute(ctx, j, rq, p, nil, false) slog.WarnContext(ctx, "remediation execute failed on host", slog.String("request_id", p.RequestID.String()), @@ -172,6 +201,23 @@ func (w *RemediationWorker) processExecute(ctx context.Context, j *queue.Job, p w.finishExecute(ctx, j, rq, p, txns, committed) } +// requeueBusy completes the current job and re-enqueues the same signed action +// after remediationBusyBackoff, so a worker retries it once the host frees up. +// Dequeue skips the not-yet-available row, so this does not busy-loop the +// drain. A failure to re-enqueue falls back to failing the job (visible) rather +// than silently dropping the action. +func (w *RemediationWorker) requeueBusy(ctx context.Context, j *queue.Job, p RemediationPayload) { + body := MarshalRemediationJob(w.queueKey, p) + if _, err := queue.EnqueueAfter(ctx, w.pool, RemediationJobType, body, remediationBusyBackoff); err != nil { + slog.WarnContext(ctx, "remediation requeue (host busy) failed", + slog.String("request_id", p.RequestID.String()), + slog.String("error", err.Error())) + _ = queue.Fail(ctx, w.pool, j.ID, "requeue (host busy) failed: "+err.Error()) + return + } + _ = queue.Complete(ctx, w.pool, j.ID) +} + // finishExecute writes the journal, transitions to executed|failed, flips the // rule to pass on a committed execute, publishes + audits, completes the job. func (w *RemediationWorker) finishExecute(ctx context.Context, j *queue.Job, @@ -235,6 +281,13 @@ func (w *RemediationWorker) processRollback(ctx context.Context, j *queue.Job, p return } + // Serialize per host: a rollback shares the per-host SSH guard with execute, + // so if another remediation is executing on this host, back off and requeue. + if busy, herr := w.svc.HostHasExecuting(ctx, p.HostID); herr == nil && busy { + w.requeueBusy(ctx, j, p) + return + } + // Resolve the rollback handle: the payload's txn id, or the first // committed transaction recorded for the request. 
txnID := p.TxnID @@ -248,6 +301,12 @@ func (w *RemediationWorker) processRollback(ctx context.Context, j *queue.Job, p } res, rbErr := w.executor.Rollback(ctx, p.HostID, txnID) + // A lost race for the per-host guard is transient: requeue and retry rather + // than recording a failed rollback (the request stays 'executed'). + if errors.Is(rbErr, kensa.ErrHostBusy) { + w.requeueBusy(ctx, j, p) + return + } status := "failed" if rbErr == nil && res != nil { status = res.Status diff --git a/specs/api/remediation.spec.yaml b/specs/api/remediation.spec.yaml index ae042ff2..541ba7d0 100644 --- a/specs/api/remediation.spec.yaml +++ b/specs/api/remediation.spec.yaml @@ -148,3 +148,6 @@ spec: description: 'Worker execution path: a queued remediation job with a valid HMAC drives an approved request approved -> executing -> executed when the executor returns a committed transaction, writes a remediation_transactions journal row (kensa_txn_id, phase_result=committed), and flips the rule to pass in host_rule_state (the compliance score moves). A committed run that the executor reports as errored, or a host-side failure, transitions the request to failed and writes no flip. A job whose payload HMAC does not verify is dead-lettered (queue.Fail) with scheduler.job.hmac_rejected and no executor invocation.' priority: critical references_constraints: [C-08] + - id: AC-08 + description: 'Per-host remediation serialization: HostHasExecuting(host) reports whether a request is in the executing state on the host; RevertToApproved(id) returns an executing request to approved (and is ErrWrongState otherwise). The worker uses these plus a backoff requeue (queue.EnqueueAfter) so a second concurrent execute/rollback on a busy host requeues and retries once the host frees, instead of colliding on the per-host SSH guard and being marked failed.' 
+ priority: high diff --git a/specs/frontend/live-events.spec.yaml b/specs/frontend/live-events.spec.yaml index 2cf21545..a66cd258 100644 --- a/specs/frontend/live-events.spec.yaml +++ b/specs/frontend/live-events.spec.yaml @@ -28,6 +28,7 @@ spec: invalidation; list page is NOT invalidated (intel events don't affect the list view) + - remediation.completed — remediations list + detail invalidation - scan.completed — list + detail invalidation (v1.1.0: compliance_summary on both views changes when a scan's @@ -65,7 +66,7 @@ spec: constraints: - id: C-01 - description: 'ALL_TOPICS MUST be a closed const-tuple containing exactly the five kinds: host.changed, monitoring.band.changed, host.discovered, intelligence.event, scan.completed (v1.1.0). Adding a sixth requires bumping this spec''s version and updating the test' + description: 'ALL_TOPICS MUST be a closed const-tuple containing exactly the six kinds: host.changed, monitoring.band.changed, host.discovered, intelligence.event, scan.completed, remediation.completed (v1.2.0). Adding another requires bumping this spec''s version and updating the test' type: technical enforcement: error - id: C-02 @@ -95,7 +96,7 @@ spec: acceptance_criteria: - id: AC-01 - description: 'ALL_TOPICS as exported from useLiveEvents.ts equals exactly the closed set ["host.changed", "monitoring.band.changed", "host.discovered", "intelligence.event", "scan.completed"]. Verified by importing the const and asserting the array contents + length' + description: 'ALL_TOPICS as exported from useLiveEvents.ts equals exactly the closed set ["host.changed", "monitoring.band.changed", "host.discovered", "intelligence.event", "scan.completed", "remediation.completed"]. 
Verified by importing the const and asserting the array contents + length' priority: critical references_constraints: [C-01] - id: AC-02 @@ -126,3 +127,7 @@ spec: description: 'Firing scan.completed with host_id=H invalidates ["hosts"] AND ["host", H] — the hero compliance card refreshes without a reload after a scan finishes' priority: critical references_constraints: [C-07] + - id: AC-09 + description: 'Firing remediation.completed with HostID=H invalidates ["host", H, "remediations"] AND ["host", H] — the Remediation tab and the compliance score refresh without a reload when a queued fix or rollback finishes (a committed fix flips a rule to pass)' + priority: high + references_constraints: [C-07] diff --git a/specs/system/job-queue.spec.yaml b/specs/system/job-queue.spec.yaml index 1debe87f..95ad930e 100644 --- a/specs/system/job-queue.spec.yaml +++ b/specs/system/job-queue.spec.yaml @@ -128,3 +128,6 @@ spec: the default-1 worker stays strictly serial. priority: critical references_constraints: [C-07] + - id: AC-13 + description: 'EnqueueAfter(delay) sets a future available_at; Dequeue does not claim a job before its available_at and claims it once available_at <= now() (ORDER BY available_at, created_at). Enqueue (delay 0) is immediately available, so existing producers are unchanged. This backs a requeue-with-backoff without busy-looping the drain.' + priority: high