Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ Versioning: [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

### Changed

- Remediation now updates live. The Remediation tab and the compliance score
refresh automatically when a queued fix or rollback finishes, over the SSE
event stream (new `remediation.completed` topic), instead of requiring a
manual page refresh.

- CI release safety: the release workflow now fails closed on a `v*` tag push
when no GPG signing key is configured, rather than publishing unsigned
packages. Manual `workflow_dispatch` trial builds stay permissive (warn +
Expand All @@ -24,6 +29,12 @@ Versioning: [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

### Fixed

- Applying several fixes on the same host at once no longer fails the extra
ones. Concurrent remediations on a host now serialize: a fix whose host is
busy backs off and requeues (with a short delay, via a new delayed-visibility
column on the job queue) until the host is free, instead of colliding on the
per-host SSH guard and being marked failed.

- Documentation version drift: operator guides referenced `0.2.0-rc.5` while
`packaging/version.env` was `0.2.0-rc.10`; all guides now match.
- SPA static-delivery tests are self-contained (in-memory fixture) instead of
Expand Down
15 changes: 15 additions & 0 deletions frontend/src/hooks/useLiveEvents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export const ALL_TOPICS = [
'host.discovered',
'intelligence.event',
'scan.completed',
'remediation.completed',
] as const;

type Topic = (typeof ALL_TOPICS)[number];
Expand Down Expand Up @@ -141,6 +142,20 @@ export function useLiveEvents(options: UseLiveEventsOptions = {}) {
queryClient.invalidateQueries({ queryKey: ['host', hostId] });
}
},
// remediation.completed -> the Remediation tab + Compliance score update
// without a manual refresh. The worker publishes this when a queued
// execute/rollback reaches its terminal state (executed | failed |
// rolled_back); a committed execute also flips the rule to pass, so the
// host detail (compliance) is invalidated too.
'remediation.completed': (e) => {
const env = parseEnvelope(e);
if (!env) return;
const hostId = (env.payload?.HostID ?? env.payload?.host_id) as string | undefined;
if (hostId) {
queryClient.invalidateQueries({ queryKey: ['host', hostId, 'remediations'] });
queryClient.invalidateQueries({ queryKey: ['host', hostId] });
}
},
};

for (const k of topics) {
Expand Down
20 changes: 17 additions & 3 deletions frontend/tests/hooks/useLiveEvents.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,19 @@ beforeEach(() => {
});

// @ac AC-01
// AC-01: ALL_TOPICS exported as the closed set of 5 topics (v1.1.0
// adds scan.completed).
// AC-01: ALL_TOPICS exported as the closed set (v1.1.0 adds scan.completed;
// v1.2.0 adds remediation.completed).
test('frontend-live-events/AC-01 — ALL_TOPICS is the closed v1.0 set', () => {
const want = [
'host.changed',
'monitoring.band.changed',
'host.discovered',
'intelligence.event',
'scan.completed',
'remediation.completed',
];
expect([...ALL_TOPICS]).toEqual(want);
expect(ALL_TOPICS.length).toBe(5);
expect(ALL_TOPICS.length).toBe(6);
});

// Helper to mount the hook and return the stub + spies.
Expand Down Expand Up @@ -124,6 +125,19 @@ test('frontend-live-events/AC-02 — host.changed invalidates [hosts] + [host, i
expect(calls).toContainEqual(['host', 'h-aaa']);
});

// @ac AC-09
// AC-09: remediation.completed invalidates the host's remediations list (the
// Remediation tab updates without a manual refresh) and the host detail (a
// committed fix flips a rule to pass, moving the compliance score). The worker
// publishes HostID (Go field name).
test('frontend-live-events/AC-09 — remediation.completed invalidates [host, id, remediations] + [host, id]', () => {
const { es, spy } = mountHook();
es.fire('remediation.completed', { HostID: 'h-rem' });
const calls = spy.mock.calls.map((c) => c[0]?.queryKey);
expect(calls).toContainEqual(['host', 'h-rem', 'remediations']);
expect(calls).toContainEqual(['host', 'h-rem']);
});

// @ac AC-03
test('frontend-live-events/AC-03 — monitoring.band.changed invalidates [hosts] + [host, id]', () => {
const { es, spy } = mountHook();
Expand Down
24 changes: 24 additions & 0 deletions internal/db/migrations/0039_job_queue_available_at.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
-- Delayed-visibility for the job queue. A pending job becomes dequeuable only
-- at or after available_at, which defaults to now() — so every existing enqueue
-- path (scans, diagnostics, etc.) is immediately visible and unchanged. The
-- remediation worker sets a future available_at to back off and requeue a job
-- whose target host is already being remediated, so concurrent "Fix" clicks on
-- one host serialize (queue) instead of colliding on the per-host SSH guard and
-- failing.
--
-- Spec: specs/system/job-queue.spec.yaml.

-- +goose Up
ALTER TABLE job_queue ADD COLUMN available_at TIMESTAMPTZ NOT NULL DEFAULT now();

-- The dequeue hot path now filters and orders on availability. Replace the
-- status-only partial index so the claim query stays index-driven.
DROP INDEX IF EXISTS idx_job_queue_pending;
CREATE INDEX idx_job_queue_pending ON job_queue (available_at, created_at)
WHERE status = 'pending';

-- +goose Down
DROP INDEX IF EXISTS idx_job_queue_pending;
CREATE INDEX idx_job_queue_pending ON job_queue (created_at)
WHERE status = 'pending';
ALTER TABLE job_queue DROP COLUMN IF EXISTS available_at;
4 changes: 2 additions & 2 deletions internal/queue/dequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func Dequeue(ctx context.Context, pool *pgxpool.Pool) (*Job, context.Context, er
attempts = attempts + 1
WHERE id = (
SELECT id FROM job_queue
WHERE status = 'pending'
ORDER BY created_at
WHERE status = 'pending' AND available_at <= now()
ORDER BY available_at, created_at
FOR UPDATE SKIP LOCKED
LIMIT 1
)
Expand Down
30 changes: 24 additions & 6 deletions internal/queue/enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,34 @@ import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/Hanalyx/openwatch/internal/correlation"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"
)

// Enqueue persists a new job_queue row. The caller's ctx MUST carry a
// correlation_id; this is a programming-error guard per spec C-01. The
// row's correlation_id pins the job to the originating intent across
// Enqueue persists a new job_queue row, immediately dequeuable. The caller's
// ctx MUST carry a correlation_id; this is a programming-error guard per spec
// C-01. The row's correlation_id pins the job to the originating intent across
// the async boundary.
//
// Returns the inserted job's ID. The job is in "pending" status with
// attempts=0 until a worker claims it via Dequeue.
//
// Spec system-job-queue AC-01.
func Enqueue(ctx context.Context, pool *pgxpool.Pool, jobType string, payload any) (uuid.UUID, error) {
return EnqueueAfter(ctx, pool, jobType, payload, 0)
}

// EnqueueAfter is Enqueue with a delay: the job is not dequeuable until
// `delay` from now (available_at = now() + delay). A non-positive delay makes
// it immediately available, identical to Enqueue. Used to back off and requeue
// a job that can't run yet (e.g. the target host is busy) without a tight
// re-dequeue loop, since Dequeue skips not-yet-available rows.
//
// Spec system-job-queue AC-13 (delayed visibility).
func EnqueueAfter(ctx context.Context, pool *pgxpool.Pool, jobType string, payload any, delay time.Duration) (uuid.UUID, error) {
corrID, ok := correlation.From(ctx)
if !ok {
return uuid.Nil, ErrMissingCorrelation
Expand All @@ -44,10 +56,16 @@ func Enqueue(ctx context.Context, pool *pgxpool.Pool, jobType string, payload an
return uuid.Nil, fmt.Errorf("queue: uuid: %w", err)
}

// available_at is computed server-side (now() + interval) so it shares the
// database clock that Dequeue compares against.
secs := delay.Seconds()
if secs < 0 {
secs = 0
}
const stmt = `
INSERT INTO job_queue (id, job_type, payload, correlation_id, status, attempts)
VALUES ($1, $2, $3::jsonb, $4, 'pending', 0)`
if _, err := pool.Exec(ctx, stmt, id, jobType, payloadJSON, corrID); err != nil {
INSERT INTO job_queue (id, job_type, payload, correlation_id, status, attempts, available_at)
VALUES ($1, $2, $3::jsonb, $4, 'pending', 0, now() + make_interval(secs => $5))`
if _, err := pool.Exec(ctx, stmt, id, jobType, payloadJSON, corrID, secs); err != nil {
return uuid.Nil, fmt.Errorf("queue: insert: %w", err)
}
return id, nil
Expand Down
46 changes: 46 additions & 0 deletions internal/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,52 @@ func freshPool(t *testing.T) *pgxpool.Pool {
return pool
}

// @ac AC-13
// AC-13: EnqueueAfter delays a job's visibility. Dequeue skips a job whose
// available_at is in the future and claims it only once available_at <= now(),
// so a requeue-with-backoff does not busy-loop the drain.
func TestEnqueueAfter_DelaysVisibility(t *testing.T) {
t.Run("system-job-queue/AC-13", func(t *testing.T) {
pool := freshPool(t)
ctx := correlation.Set(context.Background(), "req-ac13-001")

future, err := EnqueueAfter(ctx, pool, "diagnostics.test_job", map[string]any{"k": "future"}, time.Hour)
if err != nil {
t.Fatalf("EnqueueAfter(future): %v", err)
}
now, err := EnqueueAfter(ctx, pool, "diagnostics.test_job", map[string]any{"k": "now"}, 0)
if err != nil {
t.Fatalf("EnqueueAfter(now): %v", err)
}

// Dequeue claims the immediately-available job, never the future one.
job, _, err := Dequeue(ctx, pool)
if err != nil {
t.Fatalf("Dequeue: %v", err)
}
if job.ID != now {
t.Fatalf("Dequeue claimed %s, want the immediately-available job %s", job.ID, now)
}
// The future job stays hidden.
if _, _, err := Dequeue(ctx, pool); !errors.Is(err, ErrNoJob) {
t.Fatalf("second Dequeue err = %v, want ErrNoJob (future job hidden)", err)
}

// Once its available_at passes, the future job becomes dequeuable.
if _, err := pool.Exec(ctx,
`UPDATE job_queue SET available_at = now() - interval '1 second' WHERE id = $1`, future); err != nil {
t.Fatalf("backdate: %v", err)
}
job2, _, err := Dequeue(ctx, pool)
if err != nil {
t.Fatalf("Dequeue after backdate: %v", err)
}
if job2.ID != future {
t.Errorf("Dequeue claimed %s, want the now-available job %s", job2.ID, future)
}
})
}

// @ac AC-01
// AC-01: Enqueue persists a job_queue row with the expected fields populated.
func TestEnqueue_PersistsRow(t *testing.T) {
Expand Down
26 changes: 26 additions & 0 deletions internal/remediation/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,32 @@ func (s *Service) MarkRolledBack(ctx context.Context, id uuid.UUID) (Request, er
return s.transition(ctx, id, StatusExecuted, StatusRolledBack)
}

// RevertToApproved transitions an 'executing' request back to 'approved'. The
// remediation worker calls this when it has marked a request executing but the
// host turned out to be busy (lost a race for the per-host guard): the request
// returns to approved so the requeued job can run it once the host frees up.
func (s *Service) RevertToApproved(ctx context.Context, id uuid.UUID) (Request, error) {
return s.transition(ctx, id, StatusExecuting, StatusApproved)
}

// HostHasExecuting reports whether the host already has a remediation request
// in the 'executing' state. The worker uses this to serialize per-host
// remediation: only one rule is applied on a host at a time (they share one
// SSH session via the executor's per-host guard), so a second concurrent
// request backs off and requeues instead of colliding.
func (s *Service) HostHasExecuting(ctx context.Context, hostID uuid.UUID) (bool, error) {
var exists bool
err := s.pool.QueryRow(ctx, `
SELECT EXISTS (
SELECT 1 FROM remediation_requests
WHERE host_id = $1 AND status = 'executing'
)`, hostID).Scan(&exists)
if err != nil {
return false, fmt.Errorf("remediation: host-executing check: %w", err)
}
return exists, nil
}

// transition performs a guarded fromState -> toState update under FOR UPDATE.
// Unlike review() it does not touch reviewed_by/reviewed_at — execution
// transitions are system-driven, not a human review.
Expand Down
49 changes: 49 additions & 0 deletions internal/remediation/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,3 +283,52 @@ func TestProjectLift_AndOverlayNeverMutatesRuleState(t *testing.T) {
}
})
}

// @ac AC-08
// AC-08: per-host serialization primitives. HostHasExecuting reports whether a
// request is 'executing' on a host; RevertToApproved returns an 'executing'
// request to 'approved'. The worker uses these (with a backoff requeue via
// queue.EnqueueAfter) so a second concurrent execute on a busy host requeues
// instead of failing.
func TestSerializePrimitives(t *testing.T) {
t.Run("api-remediation/AC-08", func(t *testing.T) {
pool := freshPool(t)
ctx := context.Background()
user := seedUser(t, pool, "ser")
hostID := seedHost(t, pool, user)
svc := NewService(pool, fakeEmitter(&[]emitCall{}))

// Idle host: nothing executing.
if busy, err := svc.HostHasExecuting(ctx, hostID); err != nil || busy {
t.Fatalf("HostHasExecuting (idle) = %v, %v; want false", busy, err)
}

// Seed an executing request directly (the worker sets this via
// MarkExecuting; we seed it to test the primitives in isolation).
execID := uuid.Must(uuid.NewV7())
if _, err := pool.Exec(ctx,
`INSERT INTO remediation_requests (id, host_id, rule_id, status, requested_by)
VALUES ($1, $2, 'rule-x', 'executing', $3)`, execID, hostID, user); err != nil {
t.Fatalf("seed executing: %v", err)
}

// The host is now busy.
if busy, err := svc.HostHasExecuting(ctx, hostID); err != nil || !busy {
t.Errorf("HostHasExecuting (executing) = %v, %v; want true", busy, err)
}

// RevertToApproved returns it to approved and clears busy.
reverted, err := svc.RevertToApproved(ctx, execID)
if err != nil || reverted.Status != StatusApproved {
t.Errorf("RevertToApproved = %+v, %v; want approved", reverted, err)
}
if busy, _ := svc.HostHasExecuting(ctx, hostID); busy {
t.Errorf("HostHasExecuting after revert = true; want false")
}

// RevertToApproved on a non-executing request -> ErrWrongState.
if _, err := svc.RevertToApproved(ctx, execID); !errors.Is(err, ErrWrongState) {
t.Errorf("RevertToApproved (approved) err = %v, want ErrWrongState", err)
}
})
}
Loading
Loading