From 9f662bf08a169b873ea084e653eb43a29c533af7 Mon Sep 17 00:00:00 2001 From: David Porter Date: Mon, 15 Jun 2026 23:53:36 -0700 Subject: [PATCH 1/4] scheduler: fix nil LastCompletionResult after migration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CreateSchedulerFromMigration stored whatever LastCompletionResult the migrated V1 state carried, which is nil when the schedule had no prior completion (e.g. migrated before its first action) — convertLastCompletionLegacyToCHASM returns nil in that case. InvokerExecuteTaskHandler.startWorkflow then dereferences lastCompletionState.Success/.Failure unconditionally, panicking on the first workflow start after migration. The normal CreateScheduler/NewScheduler path defaults to a non-nil empty &schedulerpb.LastCompletionResult{}, so it never hit this. Default to the same non-nil empty value in CreateSchedulerFromMigration. Found by the V2->V1->V2 migration round-trip test. Co-Authored-By: Claude Opus 4.8 (1M context) (cherry picked from commit 23ad378b19a638df2603c47c0e3a806e9877bf2e) --- chasm/lib/scheduler/scheduler.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/chasm/lib/scheduler/scheduler.go b/chasm/lib/scheduler/scheduler.go index 4514b6af17..5e281f5cda 100644 --- a/chasm/lib/scheduler/scheduler.go +++ b/chasm/lib/scheduler/scheduler.go @@ -274,11 +274,19 @@ func CreateSchedulerFromMigration( ) (*Scheduler, error) { state := req.GetState() + // Default to a non-nil empty result, matching NewScheduler. Migrated state + // may carry no completion result (e.g. a schedule migrated before its first + // action), and startWorkflow dereferences LastCompletionResult unconditionally. + lastCompletion := state.GetLastCompletionResult() + if lastCompletion == nil { + lastCompletion = &schedulerpb.LastCompletionResult{} + } + sched := &Scheduler{ SchedulerState: state.GetSchedulerState(), cacheConflictToken: state.GetSchedulerState().GetConflictToken(), Backfillers: make(chasm.Map[string, *Backfiller]), - LastCompletionResult: chasm.NewDataField(ctx, state.GetLastCompletionResult()), + LastCompletionResult: chasm.NewDataField(ctx, lastCompletion), EventLog: chasm.NewComponentField(ctx, NewEventLog(ctx)), } sched.setNullableFields() From cd44159d98759fca2ab4cfe5ff4fda0bb6663c16 Mon Sep 17 00:00:00 2001 From: David Porter Date: Mon, 15 Jun 2026 23:53:29 -0700 Subject: [PATCH 2/4] chasm/chasmtest: add node/backend accessors and backend decorator Test drivers that step a component forward need to invoke node-level task dispatch (EachPureTask/ExecuteSideEffectTask) and inspect accumulated physical tasks, which the Engine API did not expose. Add: - NodeForRef / BackendForRef accessors on the in-memory Engine. - WithNodeBackendDecorator option, applied to each execution's MockNodeBackend at creation time, so callers can supply handlers the default leaves unset (e.g. HandleGetNamespaceEntry, or a constant transition counter). All additive and default-nil; existing chasmtest users are unaffected. Co-Authored-By: Claude Opus 4.8 (1M context) (cherry picked from commit aeddc4227acc0ec1c8a8b6b0034efa0d4c6bc42b) --- chasm/chasmtest/test_engine.go | 41 ++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/chasm/chasmtest/test_engine.go b/chasm/chasmtest/test_engine.go index 3c6b122969..7f84709f9c 100644 --- a/chasm/chasmtest/test_engine.go +++ b/chasm/chasmtest/test_engine.go @@ -39,6 +39,10 @@ type ( // allExecutions maps (namespaceID, businessID, runID) to any run, for lookups by specific RunID. allExecutions map[runKey]*execution notifier *executionNotifier + // decorateBackend, if set, is applied to each execution's MockNodeBackend at + // creation time, before the tree is built. It lets callers supply handlers the + // default backend leaves unset (e.g. HandleGetNamespaceEntry). + decorateBackend func(*chasm.MockNodeBackend) } execution struct { @@ -72,6 +76,17 @@ func WithTimeSource(ts clock.TimeSource) EngineOption { } } +// WithNodeBackendDecorator registers a function applied to every execution's +// [chasm.MockNodeBackend] at creation time, before its tree is built. Use it to +// supply backend handlers the default leaves unset — most commonly +// HandleGetNamespaceEntry, which components that read namespace-scoped config +// (e.g. the scheduler's Tweakables) require. +func WithNodeBackendDecorator(fn func(*chasm.MockNodeBackend)) EngineOption { + return func(e *Engine) { + e.decorateBackend = fn + } +} + var defaultTransitionOptions = chasm.TransitionOptions{ ReusePolicy: chasm.BusinessIDReusePolicyAllowDuplicate, ConflictPolicy: chasm.BusinessIDConflictPolicyFail, @@ -559,6 +574,9 @@ func (e *Engine) newExecution(key chasm.ExecutionKey) *execution { return changed, nil }, } + if e.decorateBackend != nil { + e.decorateBackend(backend) + } return &execution{ key: key, backend: backend, @@ -573,6 +591,29 @@ func (e *Engine) newExecution(key chasm.ExecutionKey) *execution { } } +// NodeForRef returns the CHASM tree node backing the execution identified by ref. +// It is intended for test drivers that need to invoke node-level task dispatch +// ([chasm.Node.EachPureTask], [chasm.Node.ExecuteSideEffectTask]) or close +// transactions directly, which the [Engine] API does not expose. +func (e *Engine) NodeForRef(ref chasm.ComponentRef) (*chasm.Node, error) { + exec, err := e.executionForRef(ref) + if err != nil { + return nil, err + } + return exec.node, nil +} + +// BackendForRef returns the [chasm.MockNodeBackend] backing the execution +// identified by ref. Test drivers use it to inspect the physical tasks +// ([chasm.MockNodeBackend.TasksByCategory]) the engine has accumulated. +func (e *Engine) BackendForRef(ref chasm.ComponentRef) (*chasm.MockNodeBackend, error) { + exec, err := e.executionForRef(ref) + if err != nil { + return nil, err + } + return exec.backend, nil +} + // executionForRef looks up an execution by the ref's RunID when present, or falls back // to the current run for the business ID when RunID is empty. func (e *Engine) executionForRef(ref chasm.ComponentRef) (*execution, error) { From f94564bb48baf14555d8e27289a5dd54066d389f Mon Sep 17 00:00:00 2001 From: David Porter Date: Mon, 15 Jun 2026 23:53:56 -0700 Subject: [PATCH 3/4] schedulertest: scheduler lifecycle driver harness Add a harness that builds a real CHASM scheduler on a chasmtest.Engine and steps it forward through every task it schedules for itself, advancing a controllable clock. Pure tasks dispatch via Node.EachPureTask and side-effect tasks via Node.ExecuteSideEffectTask (the production dispatch paths). The driver exposes Step/RunToQuiescence, Pause/Unpause/TriggerImmediately/MigrateToWorkflow operations, a Snapshot of observable state, and an AfterStep hook. This lives in a non-test package so property and migration tests can import it. library.go mirrors the in-package fixtures (NewTestLibrary, DefaultSchedule, ...) but exported and wired with the mock clients the side-effect handlers need. Smoke tests cover the three settled behaviors: a running interval schedule ticks forever, a paused schedule is held open and keeps ticking, and a schedule with no remaining actions idles then closes. Co-Authored-By: Claude Opus 4.8 (1M context) (cherry picked from commit 77342fdbbdf5ef7fadaa16553a87c496edb2db81) --- chasm/lib/scheduler/schedulertest/driver.go | 522 ++++++++++++++++++ .../schedulertest/driver_smoke_test.go | 72 +++ chasm/lib/scheduler/schedulertest/library.go | 163 ++++++ 3 files changed, 757 insertions(+) create mode 100644 chasm/lib/scheduler/schedulertest/driver.go create mode 100644 chasm/lib/scheduler/schedulertest/driver_smoke_test.go create mode 100644 chasm/lib/scheduler/schedulertest/library.go diff --git a/chasm/lib/scheduler/schedulertest/driver.go b/chasm/lib/scheduler/schedulertest/driver.go new file mode 100644 index 0000000000..053bce7116 --- /dev/null +++ b/chasm/lib/scheduler/schedulertest/driver.go @@ -0,0 +1,522 @@ +package schedulertest + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + enumspb "go.temporal.io/api/enums/v1" + schedulepb "go.temporal.io/api/schedule/v1" + "go.temporal.io/api/serviceerror" + workflowpb "go.temporal.io/api/workflow/v1" + "go.temporal.io/api/workflowservice/v1" + historyservice "go.temporal.io/server/api/historyservice/v1" + historyservicemock "go.temporal.io/server/api/historyservicemock/v1" + persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/chasm" + "go.temporal.io/server/chasm/chasmtest" + "go.temporal.io/server/chasm/lib/scheduler" + schedulerpb "go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1" + "go.temporal.io/server/common/clock" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/testing/mockapi/workflowservicemock/v1" + "go.temporal.io/server/common/testing/testlogger" + "go.temporal.io/server/common/testing/testvars" + "go.temporal.io/server/service/history/tasks" + "go.uber.org/mock/gomock" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/timestamppb" +) + +// maxKeyFireTime is the sentinel returned by MockNodeBackend.LastDeletePureTaskCall +// when the tree has no pending pure task. It matches the value passed by +// Node.closeTransactionGeneratePhysicalPureTask in that case. +var maxKeyFireTime = tasks.MaximumKey.FireTime + +// Driver builds a real CHASM scheduler on top of a [chasmtest.Engine] and steps +// it forward through every task it schedules for itself, advancing a controllable +// clock. After each settled transition it invokes AfterStep (if set), which is +// where invariant assertions are plugged in. +// +// A "step" fires every pure task due at the current reference time (via +// [chasm.Node.EachPureTask]) and every side-effect task whose visibility time has +// arrived (via [chasm.Node.ExecuteSideEffectTask]), then advances the clock to the +// next scheduled task. A healthy interval schedule never truly quiesces (it always +// re-arms a generator task), so callers bound the run with maxSteps; quiescence is +// reached only for paused/exhausted/closed schedules. +type Driver struct { + T *testing.T + Engine *chasmtest.Engine + Registry *chasm.Registry + TimeSource *clock.EventTimeSource + Logger log.Logger + + Frontend *workflowservicemock.MockWorkflowServiceClient + History *historyservicemock.MockHistoryServiceClient + + ref chasm.ComponentRef + execKey chasm.ExecutionKey + + // dispatchedSideEffects tracks physical side-effect tasks already executed, + // keyed by pointer identity, since MockNodeBackend accumulates tasks append-only. + dispatchedSideEffects map[*tasks.ChasmTask]struct{} + + // AfterStep, if set, is called after every step's CloseTransaction with the + // driver, so invariant checks can observe each settled state. + AfterStep func(d *Driver) +} + +type driverConfig struct { + schedule *schedulepb.Schedule + startTime time.Time + afterStep func(d *Driver) +} + +// DriverOption configures a Driver at construction. +type DriverOption func(*driverConfig) + +// WithSchedule overrides the schedule definition (default: DefaultSchedule). +func WithSchedule(s *schedulepb.Schedule) DriverOption { + return func(c *driverConfig) { c.schedule = s } +} + +// WithStartTime sets the wall-clock instant the engine's clock starts at. +func WithStartTime(t time.Time) DriverOption { + return func(c *driverConfig) { c.startTime = t } +} + +// WithAfterStep installs a hook called after every step. +func WithAfterStep(fn func(d *Driver)) DriverOption { + return func(c *driverConfig) { c.afterStep = fn } +} + +// NewDriver constructs a Driver and starts a scheduler execution via the +// production CreateScheduler path. startTime defaults to a fixed instant for +// determinism. +func NewDriver(t *testing.T, opts ...DriverOption) *Driver { + t.Helper() + d, cfg := newDriverScaffold(t, opts...) + + req := &schedulerpb.CreateScheduleRequest{ + NamespaceId: NamespaceID, + FrontendRequest: &workflowservice.CreateScheduleRequest{ + Namespace: Namespace, + ScheduleId: ScheduleID, + Schedule: cfg.schedule, + RequestId: "create-req", + }, + } + d.startExecution(func(mctx chasm.MutableContext) (*scheduler.Scheduler, error) { + return scheduler.CreateScheduler(mctx, req) + }) + return d +} + +// NewDriverFromMigration constructs a Driver and starts a scheduler execution +// from a migrated V1 state via the production CreateSchedulerFromMigration path. +// This is the V1->V2 import side of a migration round trip. +func NewDriverFromMigration(t *testing.T, req *schedulerpb.CreateFromMigrationStateRequest, opts ...DriverOption) *Driver { + t.Helper() + d, _ := newDriverScaffold(t, opts...) + d.startExecution(func(mctx chasm.MutableContext) (*scheduler.Scheduler, error) { + return scheduler.CreateSchedulerFromMigration(mctx, req) + }) + return d +} + +// newDriverScaffold builds the engine, registry, clients, and time source, but +// does not start an execution. +func newDriverScaffold(t *testing.T, opts ...DriverOption) (*Driver, *driverConfig) { + t.Helper() + + cfg := &driverConfig{ + schedule: DefaultSchedule(), + startTime: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + } + for _, opt := range opts { + opt(cfg) + } + + ctrl := gomock.NewController(t) + logger := testlogger.NewTestLogger(t, testlogger.FailOnExpectedErrorOnly) + + ts := clock.NewEventTimeSource() + ts.Update(cfg.startTime) + + frontend := workflowservicemock.NewMockWorkflowServiceClient(ctrl) + history := historyservicemock.NewMockHistoryServiceClient(ctrl) + // Default happy-path stubs; tests can layer more specific expectations. + frontend.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any()). + Return(&workflowservice.StartWorkflowExecutionResponse{RunId: "run-id"}, nil).AnyTimes() + // The callbacks task (used for migrated running workflows) describes each + // running workflow; report it completed so watchers resolve deterministically. + history.EXPECT().DescribeWorkflowExecution(gomock.Any(), gomock.Any()). + DoAndReturn(func(_ context.Context, _ *historyservice.DescribeWorkflowExecutionRequest, _ ...grpc.CallOption) (*historyservice.DescribeWorkflowExecutionResponse, error) { + return &historyservice.DescribeWorkflowExecutionResponse{ + WorkflowExecutionInfo: &workflowpb.WorkflowExecutionInfo{ + Status: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, + CloseTime: timestamppb.New(ts.Now()), + }, + }, nil + }).AnyTimes() + + specProcessor := NewRealSpecProcessor(ctrl, logger) + + registry := chasm.NewRegistry(logger) + if err := registry.Register(&chasm.CoreLibrary{}); err != nil { + t.Fatalf("failed to register core library: %v", err) + } + if err := registry.Register(NewTestLibrary(DefaultConfig(), logger, specProcessor, frontend, history)); err != nil { + t.Fatalf("failed to register scheduler library: %v", err) + } + + // Scheduler components read namespace-scoped Tweakables via the CHASM context, + // so each execution's backend must answer GetNamespaceEntry. The transition + // counter is held constant (matching helper_test.go); the chasmtest engine's + // default incrementing counter prevents re-armed pure tasks from materializing. + tv := testvars.New(t) + engine := chasmtest.NewEngine(t, registry, + chasmtest.WithTimeSource(ts), + chasmtest.WithNodeBackendDecorator(func(b *chasm.MockNodeBackend) { + b.HandleGetNamespaceEntry = tv.Namespace + b.HandleNextTransitionCount = func() int64 { return 2 } + b.HandleCurrentVersionedTransition = func() *persistencespb.VersionedTransition { + return &persistencespb.VersionedTransition{NamespaceFailoverVersion: 1, TransitionCount: 1} + } + }), + ) + + d := &Driver{ + T: t, + Engine: engine, + Registry: registry, + TimeSource: ts, + Logger: logger, + Frontend: frontend, + History: history, + dispatchedSideEffects: make(map[*tasks.ChasmTask]struct{}), + AfterStep: cfg.afterStep, + } + return d, cfg +} + +// startExecution starts the scheduler execution using the given start function. +func (d *Driver) startExecution(startFn func(chasm.MutableContext) (*scheduler.Scheduler, error)) { + d.T.Helper() + + d.execKey = chasm.ExecutionKey{NamespaceID: NamespaceID, BusinessID: ScheduleID} + engineCtx := chasm.NewEngineContext(context.Background(), d.Engine) + + _, err := chasm.StartExecution(engineCtx, d.execKey, func(mctx chasm.MutableContext, _ *struct{}) (*scheduler.Scheduler, error) { + return startFn(mctx) + }, (*struct{})(nil), chasm.WithRequestID("create-req")) + if err != nil { + d.T.Fatalf("failed to start scheduler execution: %v", err) + } + + d.ref = chasm.NewComponentRef[*scheduler.Scheduler](d.execKey) +} + +// Now returns the engine's current clock time. +func (d *Driver) Now() time.Time { + return d.TimeSource.Now() +} + +// ReadScheduler reads the scheduler root component and invokes fn with it. +func (d *Driver) ReadScheduler(fn func(s *scheduler.Scheduler, ctx chasm.Context)) { + d.T.Helper() + engineCtx := chasm.NewEngineContext(context.Background(), d.Engine) + err := d.Engine.ReadComponent(engineCtx, d.ref, func(ctx chasm.Context, c chasm.Component) error { + s, ok := c.(*scheduler.Scheduler) + if !ok { + d.T.Fatalf("root component is %T, want *scheduler.Scheduler", c) + } + fn(s, ctx) + return nil + }) + if err != nil { + d.T.Fatalf("ReadScheduler failed: %v", err) + } +} + +// node returns the CHASM tree node backing the scheduler execution. +func (d *Driver) node() *chasm.Node { + d.T.Helper() + node, err := d.Engine.NodeForRef(d.ref) + if err != nil { + d.T.Fatalf("NodeForRef failed: %v", err) + } + return node +} + +func (d *Driver) backend() *chasm.MockNodeBackend { + d.T.Helper() + backend, err := d.Engine.BackendForRef(d.ref) + if err != nil { + d.T.Fatalf("BackendForRef failed: %v", err) + } + return backend +} + +// nextFireTime returns the earliest time at which any pending task (pure or +// side-effect) is due, and whether any task is pending at all. +func (d *Driver) nextFireTime() (time.Time, bool) { + backend := d.backend() + + var next time.Time + have := false + consider := func(t time.Time) { + if !have || t.Before(next) { + next = t + have = true + } + } + + // Earliest pending pure task: recorded by closeTransactionGeneratePhysicalPureTask + // via DeleteCHASMPureTasks(earliestPureTaskTime), or MaximumKey.FireTime if none. + if pureTime := backend.LastDeletePureTaskCall(); !pureTime.Equal(maxKeyFireTime) { + consider(pureTime) + } + + // Earliest undispatched side-effect task. + for _, t := range d.pendingSideEffectTasks() { + consider(t.GetVisibilityTime()) + } + + return next, have +} + +// pendingSideEffectTasks returns physical side-effect tasks that have not yet +// been dispatched by this driver. +func (d *Driver) pendingSideEffectTasks() []*tasks.ChasmTask { + backend := d.backend() + var out []*tasks.ChasmTask + for category, categoryTasks := range backend.TasksByCategory { + // Visibility tasks are framework-managed (indexing search attributes) and + // processed by a separate queue, not the CHASM side-effect executor; their + // handler panics if invoked directly. Skip them. + if category.ID() == tasks.CategoryVisibility.ID() { + continue + } + for _, t := range categoryTasks { + ct, ok := t.(*tasks.ChasmTask) + if !ok { + continue + } + if _, done := d.dispatchedSideEffects[ct]; done { + continue + } + out = append(out, ct) + } + } + return out +} + +// Step advances the clock to the next scheduled task and fires all tasks due at +// that time. It returns false if nothing was pending (the schedule is quiescent). +func (d *Driver) Step() bool { + d.T.Helper() + + fireTime, have := d.nextFireTime() + if !have { + return false + } + + // Never move the clock backward; immediate tasks (zero scheduled time) fire now. + now := d.Now() + if fireTime.Before(now) { + fireTime = now + } + d.TimeSource.Update(fireTime) + + node := d.node() + + // Fire all pure tasks due at fireTime, then commit. + pureCtx := chasm.NewEngineContext(context.Background(), d.Engine) + err := node.EachPureTask(fireTime, func(executor chasm.NodePureTask, attrs chasm.TaskAttributes, taskInstance any) (bool, error) { + return executor.ExecutePureTask(pureCtx, attrs, taskInstance) + }) + if err != nil { + d.T.Fatalf("EachPureTask failed: %v", err) + } + if _, err := node.CloseTransaction(); err != nil { + d.T.Fatalf("CloseTransaction after pure tasks failed: %v", err) + } + + // Fire side-effect tasks now due (visibility <= fireTime), each exactly once. + engineCtx := chasm.NewEngineContext(context.Background(), d.Engine) + for _, ct := range d.pendingSideEffectTasks() { + if ct.GetVisibilityTime().After(fireTime) { + continue + } + d.dispatchedSideEffects[ct] = struct{}{} + execErr := node.ExecuteSideEffectTask( + engineCtx, + d.execKey, + ct, + func(chasm.NodeBackend, chasm.Context, chasm.Component) error { return nil }, + ) + if execErr != nil { + // A NotFound means the logical task was superseded/removed before + // dispatch (the backend accumulates physical tasks append-only, so a + // stale one can linger). Production drops these; so do we. + var notFound *serviceerror.NotFound + if errors.As(execErr, ¬Found) { + continue + } + d.T.Fatalf("ExecuteSideEffectTask failed: %v", execErr) + } + } + if _, err := node.CloseTransaction(); err != nil { + d.T.Fatalf("CloseTransaction after side-effect tasks failed: %v", err) + } + + if d.AfterStep != nil { + d.AfterStep(d) + } + return true +} + +// RunToQuiescence steps until no task is pending or maxSteps is reached. It +// returns the number of steps taken and whether the schedule reached quiescence +// (no pending task). A normal running schedule never quiesces and will stop at +// maxSteps with quiescent == false. +func (d *Driver) RunToQuiescence(maxSteps int) (steps int, quiescent bool) { + d.T.Helper() + for steps = 0; steps < maxSteps; steps++ { + if !d.Step() { + return steps, true + } + } + return steps, false +} + +// HasPendingTask reports whether any task (pure or side-effect) is currently +// scheduled to fire. +func (d *Driver) HasPendingTask() bool { + _, have := d.nextFireTime() + return have +} + +// Patch applies a SchedulePatch through the production Scheduler.Patch path in a +// single transaction. It returns the scheduler's error (e.g. ErrClosed when the +// schedule has already closed) without failing the test, so callers driving +// random operations can tolerate rejected patches. On success the AfterStep hook +// runs so invariants are checked at the new settled state. +func (d *Driver) Patch(patch *schedulepb.SchedulePatch) error { + d.T.Helper() + engineCtx := chasm.NewEngineContext(context.Background(), d.Engine) + _, err := d.Engine.UpdateComponent(engineCtx, d.ref, func(mctx chasm.MutableContext, c chasm.Component) error { + s, ok := c.(*scheduler.Scheduler) + if !ok { + return fmt.Errorf("root component is %T, want *scheduler.Scheduler", c) + } + _, perr := s.Patch(mctx, &schedulerpb.PatchScheduleRequest{ + NamespaceId: NamespaceID, + FrontendRequest: &workflowservice.PatchScheduleRequest{ + Namespace: Namespace, + ScheduleId: ScheduleID, + Patch: patch, + }, + }) + return perr + }) + if err == nil && d.AfterStep != nil { + d.AfterStep(d) + } + return err +} + +// Pause pauses the schedule via Patch. +func (d *Driver) Pause() error { + return d.Patch(&schedulepb.SchedulePatch{Pause: "paused by driver"}) +} + +// Unpause unpauses the schedule via Patch. +func (d *Driver) Unpause() error { + return d.Patch(&schedulepb.SchedulePatch{Unpause: "unpaused by driver"}) +} + +// TriggerImmediately requests an immediate action via Patch. +func (d *Driver) TriggerImmediately() error { + return d.Patch(&schedulepb.SchedulePatch{ + TriggerImmediately: &schedulepb.TriggerImmediatelyRequest{}, + }) +} + +// MigrateToWorkflow initiates a V2->V1 migration: it pauses the schedule and +// schedules the SchedulerMigrateToWorkflowTask side-effect task. Step the driver +// afterward to fire the task (which calls the history client's +// StartWorkflowExecution with the exported StartScheduleArgs). +func (d *Driver) MigrateToWorkflow() error { + d.T.Helper() + engineCtx := chasm.NewEngineContext(context.Background(), d.Engine) + _, err := d.Engine.UpdateComponent(engineCtx, d.ref, func(mctx chasm.MutableContext, c chasm.Component) error { + s, ok := c.(*scheduler.Scheduler) + if !ok { + return fmt.Errorf("root component is %T, want *scheduler.Scheduler", c) + } + _, merr := s.MigrateToWorkflow(mctx, &schedulerpb.MigrateToWorkflowRequest{ + NamespaceId: NamespaceID, + ScheduleId: ScheduleID, + }) + return merr + }) + return err +} + +// Snapshot captures the observable scheduler state at a settled point (after a +// step's CloseTransaction). It is the input to invariant checks. +type Snapshot struct { + // Now is the engine clock at capture time. + Now time.Time + // HasPendingTask is true if any task is scheduled to fire. + HasPendingTask bool + + Closed bool + IsSentinel bool + Paused bool + // BackfillerCount is the number of live backfiller components. + BackfillerCount int + // IsHeldOpen mirrors Scheduler.isHeldOpen: the schedule must stay open + // regardless of having no work (paused or pending backfill, non-sentinel). + IsHeldOpen bool + // IdleCloseTime is the armed idle-close deadline, or zero if no idle task is armed. + IdleCloseTime time.Time + + // GeneratorLPT / InvokerLPT are the generator and invoker high-water marks. + GeneratorLPT time.Time + InvokerLPT time.Time +} + +// Snapshot reads current scheduler state into a Snapshot. +func (d *Driver) Snapshot() Snapshot { + d.T.Helper() + snap := Snapshot{ + Now: d.Now(), + HasPendingTask: d.HasPendingTask(), + } + d.ReadScheduler(func(s *scheduler.Scheduler, ctx chasm.Context) { + snap.Closed = s.Closed + snap.IsSentinel = s.IsSentinel() + snap.Paused = s.Schedule.GetState().GetPaused() + snap.BackfillerCount = len(s.Backfillers) + // Mirror Scheduler.isHeldOpen() from public state. + snap.IsHeldOpen = !snap.IsSentinel && (snap.Paused || snap.BackfillerCount > 0) + if s.IdleCloseTime != nil { + snap.IdleCloseTime = s.IdleCloseTime.AsTime() + } + // Sentinels have no generator/invoker sub-components. + if !snap.IsSentinel { + if g := s.Generator.Get(ctx); g != nil && g.LastProcessedTime != nil { + snap.GeneratorLPT = g.LastProcessedTime.AsTime() + } + if inv := s.Invoker.Get(ctx); inv != nil && inv.LastProcessedTime != nil { + snap.InvokerLPT = inv.LastProcessedTime.AsTime() + } + } + }) + return snap +} diff --git a/chasm/lib/scheduler/schedulertest/driver_smoke_test.go b/chasm/lib/scheduler/schedulertest/driver_smoke_test.go new file mode 100644 index 0000000000..4f80dfa219 --- /dev/null +++ b/chasm/lib/scheduler/schedulertest/driver_smoke_test.go @@ -0,0 +1,72 @@ +package schedulertest_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.temporal.io/server/chasm" + "go.temporal.io/server/chasm/lib/scheduler" + "go.temporal.io/server/chasm/lib/scheduler/schedulertest" +) + +// TestDriver_RunsIntervalScheduleForward verifies the driver fires the +// scheduler's self-scheduled tasks and advances the clock across several +// interval ticks of a normal (running) schedule. +func TestDriver_RunsIntervalScheduleForward(t *testing.T) { + start := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC) + d := schedulertest.NewDriver(t, schedulertest.WithStartTime(start)) + + steps, quiescent := d.RunToQuiescence(20) + + // A normal interval schedule re-arms a generator task forever, so it never + // quiesces; it should burn the whole step budget. + require.False(t, quiescent, "running interval schedule should not quiesce") + require.Equal(t, 20, steps) + + // The clock must have advanced as interval ticks fire. + require.True(t, d.Now().After(start.Add(schedulertest.DefaultInterval)), + "clock should advance past one interval, now=%s", d.Now()) + + d.ReadScheduler(func(s *scheduler.Scheduler, _ chasm.Context) { + require.False(t, s.Closed, "schedule should still be open") + }) +} + +// TestDriver_PausedScheduleKeepsTicking documents that a paused interval +// schedule is *held open* and keeps re-arming its generator task to advance the +// high-water mark — it neither quiesces nor closes — but it never buffers a +// start. +func TestDriver_PausedScheduleKeepsTicking(t *testing.T) { + sched := schedulertest.DefaultSchedule() + sched.State.Paused = true + + d := schedulertest.NewDriver(t, schedulertest.WithSchedule(sched)) + + _, quiescent := d.RunToQuiescence(10) + require.False(t, quiescent, "paused interval schedule keeps ticking, not quiescent") + + d.ReadScheduler(func(s *scheduler.Scheduler, _ chasm.Context) { + require.True(t, s.Schedule.State.Paused) + require.False(t, s.Closed, "paused schedule is held open, not closed") + }) +} + +// TestDriver_LimitedActionsExhaustedClosesAndQuiesces verifies that a schedule +// with no remaining actions arms an idle task instead of generating work, and +// after the idle task fires the schedule closes and the driver reaches +// quiescence. +func TestDriver_LimitedActionsExhaustedClosesAndQuiesces(t *testing.T) { + sched := schedulertest.DefaultSchedule() + sched.State.LimitedActions = true + sched.State.RemainingActions = 0 + + d := schedulertest.NewDriver(t, schedulertest.WithSchedule(sched)) + + _, quiescent := d.RunToQuiescence(10) + require.True(t, quiescent, "exhausted schedule should close and quiesce") + + d.ReadScheduler(func(s *scheduler.Scheduler, _ chasm.Context) { + require.True(t, s.Closed, "schedule should be closed after idle task fires") + }) +} diff --git a/chasm/lib/scheduler/schedulertest/library.go b/chasm/lib/scheduler/schedulertest/library.go new file mode 100644 index 0000000000..ce7f78e014 --- /dev/null +++ b/chasm/lib/scheduler/schedulertest/library.go @@ -0,0 +1,163 @@ +// Package schedulertest provides a lifecycle-driving harness for the CHASM +// scheduler. Unlike the per-handler unit tests in package scheduler_test, this +// package builds a real component on top of [chasmtest.Engine] and steps it +// forward through every task it schedules for itself, so that invariants can be +// asserted across sequences of transitions (e.g. "the scheduler never gets +// stuck without a task to drive the next state"). +// +// It deliberately lives in a non-test package so that property tests +// (pgregory.net/rapid) and the V1<->V2 migration round-trip test can import the +// driver. The helpers below mirror chasm/lib/scheduler/helper_test.go, but are +// exported and accept the mock clients the side-effect task handlers need. +package schedulertest + +import ( + "time" + + commonpb "go.temporal.io/api/common/v1" + schedulepb "go.temporal.io/api/schedule/v1" + "go.temporal.io/api/workflowservice/v1" + workflowpb "go.temporal.io/api/workflow/v1" + "go.temporal.io/server/chasm/lib/scheduler" + "go.temporal.io/server/common/backoff" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/resource" + "go.temporal.io/server/common/searchattribute" + legacyscheduler "go.temporal.io/server/service/worker/scheduler" + "go.uber.org/mock/gomock" + "google.golang.org/protobuf/types/known/durationpb" +) + +const ( + Namespace = "ns" + NamespaceID = "ns-id" + ScheduleID = "sched-id" + + DefaultInterval = 1 * time.Minute + DefaultCatchupWindow = 5 * time.Minute +) + +// DefaultSchedule returns a schedule with a single 1-minute interval and a +// 5-minute catchup window, matching the package's other defaults. +func DefaultSchedule() *schedulepb.Schedule { + return &schedulepb.Schedule{ + Spec: &schedulepb.ScheduleSpec{ + Interval: []*schedulepb.IntervalSpec{ + { + Interval: durationpb.New(DefaultInterval), + Phase: durationpb.New(0), + }, + }, + }, + Action: &schedulepb.ScheduleAction{ + Action: &schedulepb.ScheduleAction_StartWorkflow{ + StartWorkflow: &workflowpb.NewWorkflowExecutionInfo{ + WorkflowId: "scheduled-wf", + WorkflowType: &commonpb.WorkflowType{Name: "scheduled-wf-type"}, + }, + }, + }, + Policies: &schedulepb.SchedulePolicies{ + CatchupWindow: durationpb.New(DefaultCatchupWindow), + }, + State: &schedulepb.ScheduleState{ + Paused: false, + LimitedActions: false, + RemainingActions: 0, + }, + } +} + +// DefaultConfig returns a scheduler config wired with the package defaults. +func DefaultConfig() *scheduler.Config { + return &scheduler.Config{ + Tweakables: func(_ string) scheduler.Tweakables { + return scheduler.DefaultTweakables + }, + ServiceCallTimeout: func() time.Duration { + return 5 * time.Second + }, + EncodeInternalTokenWithEnvelope: func(string) bool { + return true + }, + RetryPolicy: func() backoff.RetryPolicy { + return backoff.NewExponentialRetryPolicy(1 * time.Second) + }, + } +} + +// NewRealSpecProcessor builds a real SpecProcessor with no-op metrics, matching +// production spec processing so generated buffered starts and next-times are +// realistic. +func NewRealSpecProcessor(ctrl *gomock.Controller, logger log.Logger) scheduler.SpecProcessor { + mockMetrics := metrics.NewMockHandler(ctrl) + mockMetrics.EXPECT().Counter(gomock.Any()).Return(metrics.NoopCounterMetricFunc).AnyTimes() + mockMetrics.EXPECT().WithTags(gomock.Any()).Return(mockMetrics).AnyTimes() + mockMetrics.EXPECT().Timer(gomock.Any()).Return(metrics.NoopTimerMetricFunc).AnyTimes() + + return scheduler.NewSpecProcessor( + DefaultConfig(), + mockMetrics, + logger, + legacyscheduler.NewSpecBuilder(), + ) +} + +// NewTestLibrary builds a scheduler Library with all task handlers wired to the +// supplied spec processor and mock clients. The clients back the side-effect +// task handlers (execute/callbacks/migrate); pass mocks whose StartWorkflow / +// Terminate / Cancel expectations the test controls. +func NewTestLibrary( + config *scheduler.Config, + logger log.Logger, + specProcessor scheduler.SpecProcessor, + frontendClient workflowservice.WorkflowServiceClient, + historyClient resource.HistoryClient, +) *scheduler.Library { + specBuilder := legacyscheduler.NewSpecBuilder() + invokerOpts := scheduler.InvokerTaskHandlerOptions{ + Config: config, + MetricsHandler: metrics.NoopMetricsHandler, + BaseLogger: logger, + SpecProcessor: specProcessor, + HistoryClient: historyClient, + FrontendClient: frontendClient, + } + return scheduler.NewLibrary( + config, + nil, + scheduler.NewSchedulerIdleTaskHandler(scheduler.SchedulerIdleTaskHandlerOptions{ + Config: config, + MetricsHandler: metrics.NoopMetricsHandler, + BaseLogger: logger, + }), + scheduler.NewSchedulerCallbacksTaskHandler(scheduler.SchedulerCallbacksTaskHandlerOptions{ + Config: config, + HistoryClient: historyClient, + FrontendClient: frontendClient, + }), + scheduler.NewGeneratorTaskHandler(scheduler.GeneratorTaskHandlerOptions{ + Config: config, + MetricsHandler: metrics.NoopMetricsHandler, + BaseLogger: logger, + SpecProcessor: specProcessor, + SpecBuilder: specBuilder, + }), + scheduler.NewInvokerExecuteTaskHandler(invokerOpts), + scheduler.NewInvokerProcessBufferTaskHandler(invokerOpts), + scheduler.NewBackfillerTaskHandler(scheduler.BackfillerTaskHandlerOptions{ + Config: config, + MetricsHandler: metrics.NoopMetricsHandler, + BaseLogger: logger, + SpecProcessor: specProcessor, + }), + scheduler.NewSchedulerMigrateToWorkflowTaskHandler(scheduler.SchedulerMigrateToWorkflowTaskHandlerOptions{ + Config: config, + MetricsHandler: metrics.NoopMetricsHandler, + BaseLogger: logger, + HistoryClient: historyClient, + SaMapperProvider: searchattribute.NewTestMapperProvider(nil), + }), + ) +} From 49c88ca39f8d293c7eec5bac262d6dde07b5d047 Mon Sep 17 00:00:00 2001 From: David Porter Date: Mon, 15 Jun 2026 23:54:04 -0700 Subject: [PATCH 4/4] schedulertest: scheduler invariants and deterministic tables Add CheckInvariants, evaluated after every settled transition via an AfterStep hook, encoding the legitimate-quiescence rules from Scheduler.isHeldOpen and getIdleExpiration: - no-stuck (primary): a non-closed schedule may have no pending task only when held open (paused or draining a backfill); any other taskless, non-closed state is the stuck-open bug class. - idle-not-while-held-open: a held-open schedule must not arm an idle close. - closed-is-terminal: a closed schedule never reopens. - high-water-mark monotonicity for the generator and invoker. Deterministic table tests run several schedules (running, paused, exhausted, short-interval) forward with the hook wired in, plus direct unit tests proving the predicate flags a stuck state and an HWM regression while allowing a legitimate held-open-without-task state. Co-Authored-By: Claude Opus 4.8 (1M context) (cherry picked from commit 1c2ac604e4638e60b8f2ca180ba25e671f0c936b) --- .../lib/scheduler/schedulertest/invariants.go | 100 +++++++++++++++ .../schedulertest/invariants_test.go | 115 ++++++++++++++++++ 2 files changed, 215 insertions(+) create mode 100644 chasm/lib/scheduler/schedulertest/invariants.go create mode 100644 chasm/lib/scheduler/schedulertest/invariants_test.go diff --git a/chasm/lib/scheduler/schedulertest/invariants.go b/chasm/lib/scheduler/schedulertest/invariants.go new file mode 100644 index 0000000000..9ef00eb909 --- /dev/null +++ b/chasm/lib/scheduler/schedulertest/invariants.go @@ -0,0 +1,100 @@ +package schedulertest + +import ( + "fmt" +) + +// Violation describes a single broken invariant. +type Violation struct { + Name string + Message string +} + +func (v Violation) String() string { + return fmt.Sprintf("[%s] %s", v.Name, v.Message) +} + +// CheckInvariants evaluates the scheduler invariants against a settled state +// (cur). When prev is non-nil it also checks cross-step invariants (monotonicity, +// closed-is-terminal). It returns all violations found; an empty slice means the +// state is healthy. +// +// The invariants encode the legitimate-quiescence rules from Scheduler.isHeldOpen +// and Scheduler.getIdleExpiration: a non-closed schedule may have no pending task +// only when it is held open (paused or draining a backfill). Any other taskless, +// non-closed state is the "stuck" bug class. +func CheckInvariants(prev *Snapshot, cur Snapshot) []Violation { + var violations []Violation + add := func(name, format string, args ...any) { + violations = append(violations, Violation{Name: name, Message: fmt.Sprintf(format, args...)}) + } + + // (1) No-stuck (primary): a live schedule must always have a way to make + // progress. The only taskless, non-closed state allowed is "held open" + // (paused or pending backfill). When idle is armed there is a pending idle + // task, so HasPendingTask covers that case. + if !cur.Closed && !cur.HasPendingTask && !cur.IsHeldOpen { + add("no-stuck", + "scheduler has no pending task, is not closed, and is not held open "+ + "(paused=%v backfillers=%d) at now=%s — stuck", + cur.Paused, cur.BackfillerCount, cur.Now.Format("2006-01-02T15:04:05")) + } + + // (2) Idle correctness: a held-open schedule must never have an idle-close + // armed; idle close would race the customer's intent to resume/drain. + if cur.IsHeldOpen && !cur.IdleCloseTime.IsZero() { + add("idle-not-while-held-open", + "scheduler is held open (paused=%v backfillers=%d) but has idle close armed at %s", + cur.Paused, cur.BackfillerCount, cur.IdleCloseTime.Format("2006-01-02T15:04:05")) + } + + if prev != nil { + // (3) Closed is terminal: a closed schedule must never reopen. + if prev.Closed && !cur.Closed { + add("closed-terminal", "scheduler reopened after being closed") + } + + // (4) High-water-mark monotonicity: generator/invoker HWMs never regress. + if cur.GeneratorLPT.Before(prev.GeneratorLPT) { + add("hwm-monotonic-generator", + "generator high-water mark went backward: %s -> %s", + prev.GeneratorLPT.Format("2006-01-02T15:04:05"), + cur.GeneratorLPT.Format("2006-01-02T15:04:05")) + } + if cur.InvokerLPT.Before(prev.InvokerLPT) { + add("hwm-monotonic-invoker", + "invoker high-water mark went backward: %s -> %s", + prev.InvokerLPT.Format("2006-01-02T15:04:05"), + cur.InvokerLPT.Format("2006-01-02T15:04:05")) + } + } + + return violations +} + +// fataler is the subset of testing.TB the invariant assertion helper needs. +type fataler interface { + Helper() + Fatalf(format string, args ...any) +} + +// CheckInvariantsHook returns an AfterStep hook that captures a snapshot after +// every step and fails the test on the first invariant violation. It threads the +// previous snapshot so cross-step invariants are checked. This is the normal way +// invariant checking is wired into a Driver: +// +// d := NewDriver(t) +// d.AfterStep = CheckInvariantsHook(t) +func CheckInvariantsHook(t fataler) func(d *Driver) { + var prev *Snapshot + return func(d *Driver) { + t.Helper() + cur := d.Snapshot() + if violations := CheckInvariants(prev, cur); len(violations) > 0 { + t.Fatalf("scheduler invariant violation at step (now=%s): %s", + cur.Now.Format("2006-01-02T15:04:05"), violations[0]) + } + snap := cur + prev = &snap + } +} diff --git a/chasm/lib/scheduler/schedulertest/invariants_test.go b/chasm/lib/scheduler/schedulertest/invariants_test.go new file mode 100644 index 0000000000..b16e29d0ef --- /dev/null +++ b/chasm/lib/scheduler/schedulertest/invariants_test.go @@ -0,0 +1,115 @@ +package schedulertest_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + schedulepb "go.temporal.io/api/schedule/v1" + "go.temporal.io/server/chasm/lib/scheduler/schedulertest" + "google.golang.org/protobuf/types/known/durationpb" +) + +// TestInvariants_HoldAcrossScenarios runs a variety of schedules forward with +// the invariant hook wired in and asserts no invariant fires. Each scenario +// exercises a different legitimate-quiescence path (running, paused, exhausted, +// end-time passed). +func TestInvariants_HoldAcrossScenarios(t *testing.T) { + cases := []struct { + name string + schedule func() *schedulepb.Schedule + maxSteps int + }{ + { + name: "running interval", + schedule: schedulertest.DefaultSchedule, + maxSteps: 30, + }, + { + name: "paused interval (held open, keeps ticking)", + schedule: func() *schedulepb.Schedule { + s := schedulertest.DefaultSchedule() + s.State.Paused = true + return s + }, + maxSteps: 15, + }, + { + name: "limited actions exhausted (idles then closes)", + schedule: func() *schedulepb.Schedule { + s := schedulertest.DefaultSchedule() + s.State.LimitedActions = true + s.State.RemainingActions = 0 + return s + }, + maxSteps: 10, + }, + { + name: "limited actions, a few then idle", + schedule: func() *schedulepb.Schedule { + s := schedulertest.DefaultSchedule() + s.State.LimitedActions = true + s.State.RemainingActions = 3 + return s + }, + maxSteps: 30, + }, + { + name: "short interval, long catchup", + schedule: func() *schedulepb.Schedule { + s := schedulertest.DefaultSchedule() + s.Spec.Interval[0].Interval = durationpb.New(10 * time.Second) + return s + }, + maxSteps: 40, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + d := schedulertest.NewDriver(t, schedulertest.WithSchedule(tc.schedule())) + d.AfterStep = schedulertest.CheckInvariantsHook(t) + // The hook fails the test on any violation; we just drive it forward. + d.RunToQuiescence(tc.maxSteps) + }) + } +} + +// TestCheckInvariants_DetectsStuck is a direct unit test of the invariant +// predicate: a non-closed schedule with no pending task that is not held open is +// flagged as stuck. +func TestCheckInvariants_DetectsStuck(t *testing.T) { + stuck := schedulertest.Snapshot{ + Now: time.Now(), + HasPendingTask: false, + Closed: false, + IsHeldOpen: false, + } + violations := schedulertest.CheckInvariants(nil, stuck) + require.NotEmpty(t, violations, "stuck state must be flagged") + require.Equal(t, "no-stuck", violations[0].Name) +} + +// TestCheckInvariants_AllowsHeldOpenWithoutTask confirms a paused schedule with +// no pending task is NOT flagged (legitimate hold-open). +func TestCheckInvariants_AllowsHeldOpenWithoutTask(t *testing.T) { + heldOpen := schedulertest.Snapshot{ + Now: time.Now(), + HasPendingTask: false, + Closed: false, + Paused: true, + IsHeldOpen: true, + } + require.Empty(t, schedulertest.CheckInvariants(nil, heldOpen)) +} + +// TestCheckInvariants_DetectsHwmRegression confirms the high-water-mark +// monotonicity invariant fires when the generator HWM moves backward. +func TestCheckInvariants_DetectsHwmRegression(t *testing.T) { + t0 := time.Date(2020, 1, 1, 0, 5, 0, 0, time.UTC) + prev := schedulertest.Snapshot{HasPendingTask: true, GeneratorLPT: t0} + cur := schedulertest.Snapshot{HasPendingTask: true, GeneratorLPT: t0.Add(-time.Minute)} + violations := schedulertest.CheckInvariants(&prev, cur) + require.NotEmpty(t, violations) + require.Equal(t, "hwm-monotonic-generator", violations[0].Name) +}