From 651fb896b54eb43be4901234fd73152862e8ccb7 Mon Sep 17 00:00:00 2001 From: Feiyang Xie Date: Wed, 29 Apr 2026 17:09:21 -0700 Subject: [PATCH 1/3] add state-based replication to time skipping --- api/persistence/v1/executions.pb.go | 22 +- .../api/persistence/v1/executions.proto | 9 +- .../timer_queue_standby_task_executor.go | 60 ++- .../timer_queue_standby_task_executor_test.go | 103 +++++ .../history/workflow/mutable_state_impl.go | 57 ++- .../workflow/mutable_state_impl_test.go | 122 ++++++ service/history/workflow/task_generator.go | 16 +- .../history/workflow/task_generator_test.go | 65 +++ service/history/workflow/task_refresher.go | 22 +- service/history/workflow/timer_sequence.go | 6 + tests/xdc/timeskipping_replication_test.go | 380 ++++++++++++++++++ 11 files changed, 825 insertions(+), 37 deletions(-) create mode 100644 tests/xdc/timeskipping_replication_test.go diff --git a/api/persistence/v1/executions.pb.go b/api/persistence/v1/executions.pb.go index d42bdb3ce83..bbb941a5647 100644 --- a/api/persistence/v1/executions.pb.go +++ b/api/persistence/v1/executions.pb.go @@ -1170,11 +1170,17 @@ type TimeSkippingInfo struct { // Current time-skipping configuration applied to the workflow. Config *v13.TimeSkippingConfig `protobuf:"bytes,1,opt,name=config,proto3" json:"config,omitempty"` // Total skipped duration for the current workflow execution run, including any + // inherited skipped duration carried over from a preceding execution that started this run. AccumulatedSkippedDuration *durationpb.Duration `protobuf:"bytes,2,opt,name=accumulated_skipped_duration,json=accumulatedSkippedDuration,proto3" json:"accumulated_skipped_duration,omitempty"` // The current fast-forward info for time skipping. FastForwardInfo *FastForwardInfo `protobuf:"bytes,4,opt,name=fast_forward_info,json=fastForwardInfo,proto3" json:"fast_forward_info,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + // Task regeneration status is used to track the status of the task regeneration. + // 0: Not set + // 1: Need to regenerate tasks + // 2: All tasks regenerated + TaskRegenerationStatus int32 `protobuf:"varint,5,opt,name=task_regeneration_status,json=taskRegenerationStatus,proto3" json:"task_regeneration_status,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *TimeSkippingInfo) Reset() { @@ -1228,6 +1234,13 @@ func (x *TimeSkippingInfo) GetFastForwardInfo() *FastForwardInfo { return nil } +func (x *TimeSkippingInfo) GetTaskRegenerationStatus() int32 { + if x != nil { + return x.TaskRegenerationStatus + } + return 0 +} + type FastForwardInfo struct { state protoimpl.MessageState `protogen:"open.v1"` // Target time for the fast-forward, expressed in virtual time. @@ -5040,11 +5053,12 @@ const file_temporal_server_api_persistence_v1_executions_proto_rawDesc = "" + "&ChildrenInitializedPostResetPointEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12H\n" + "\x05value\x18\x02 \x01(\v22.temporal.server.api.persistence.v1.ResetChildInfoR\x05value:\x028\x01B\x1c\n" + - "\x1alast_workflow_task_failureJ\x04\b\b\x10\tJ\x04\b\x0e\x10\x0fJ\x04\b\x0f\x10\x10J\x04\b\x10\x10\x11J\x04\bp\x10qJ\x04\b,\x10-J\x04\b-\x10.J\x04\b/\x100J\x04\b0\x101J\x04\b1\x102J\x04\b2\x103\"\xba\x02\n" + + "\x1alast_workflow_task_failureJ\x04\b\b\x10\tJ\x04\b\x0e\x10\x0fJ\x04\b\x0f\x10\x10J\x04\b\x10\x10\x11J\x04\bp\x10qJ\x04\b,\x10-J\x04\b-\x10.J\x04\b/\x100J\x04\b0\x101J\x04\b1\x102J\x04\b2\x103\"\xf4\x02\n" + "\x10TimeSkippingInfo\x12B\n" + "\x06config\x18\x01 \x01(\v2*.temporal.api.common.v1.TimeSkippingConfigR\x06config\x12[\n" + "\x1caccumulated_skipped_duration\x18\x02 \x01(\v2\x19.google.protobuf.DurationR\x1aaccumulatedSkippedDuration\x12_\n" + - "\x11fast_forward_info\x18\x04 \x01(\v23.temporal.server.api.persistence.v1.FastForwardInfoR\x0ffastForwardInfoJ\x04\b\x03\x10\x04R\x1ecurrent_elapsed_duration_bound\"\x97\x01\n" + + "\x11fast_forward_info\x18\x04 \x01(\v23.temporal.server.api.persistence.v1.FastForwardInfoR\x0ffastForwardInfo\x128\n" + + "\x18task_regeneration_status\x18\x05 \x01(\x05R\x16taskRegenerationStatusJ\x04\b\x03\x10\x04R\x1ecurrent_elapsed_duration_bound\"\x97\x01\n" + "\x0fFastForwardInfo\x12;\n" + "\vtarget_time\x18\x01 \x01(\v2\x1a.google.protobuf.TimestampR\n" + "targetTime\x12\x1f\n" + diff --git a/proto/internal/temporal/server/api/persistence/v1/executions.proto b/proto/internal/temporal/server/api/persistence/v1/executions.proto index a729bafa62b..4cbe186daac 100644 --- a/proto/internal/temporal/server/api/persistence/v1/executions.proto +++ b/proto/internal/temporal/server/api/persistence/v1/executions.proto @@ -328,10 +328,17 @@ message TimeSkippingInfo { temporal.api.common.v1.TimeSkippingConfig config = 1; // Total skipped duration for the current workflow execution run, including any - ./* inherited skipped duration carried over from a preceding execution that started this run. */google.protobuf.Duration accumulated_skipped_duration = 2; + // inherited skipped duration carried over from a preceding execution that started this run. + google.protobuf.Duration accumulated_skipped_duration = 2; // The current fast-forward info for time skipping. FastForwardInfo fast_forward_info = 4; + + // Task regeneration status is used to track the status of the task regeneration. + // 0: Not set + // 1: Need to regenerate tasks + // 2: All tasks regenerated + int32 task_regeneration_status = 5; } message FastForwardInfo { diff --git a/service/history/timer_queue_standby_task_executor.go b/service/history/timer_queue_standby_task_executor.go index 560e0c2d6f0..0aec256735e 100644 --- a/service/history/timer_queue_standby_task_executor.go +++ b/service/history/timer_queue_standby_task_executor.go @@ -110,9 +110,7 @@ func (t *timerQueueStandbyTaskExecutor) Execute( case *tasks.ChasmTask: err = t.executeChasmSideEffectTimerTask(ctx, task) case *tasks.TimeSkippingTimerTask: - // todo@time-skipping: replication. The disable-after-fast-forward transition is emitted - // on the active side and will replicate; standby drops the local task. - err = nil + err = t.executeTimeSkippingTimerTask(ctx, task) default: err = queueserrors.NewUnprocessableTaskError("unknown task type") } @@ -231,17 +229,53 @@ func (t *timerQueueStandbyTaskExecutor) discardChasmTask( ) } +// executeTimeSkippingTimerTask waits on the standby until the active cluster +// replicates the fast-forward transition. If the fast-forward this task was +// generated for is still pending (same source event and not yet reached), the +// task is retried until the discard delay elapses; otherwise it is acked. +func (t *timerQueueStandbyTaskExecutor) executeTimeSkippingTimerTask( + ctx context.Context, + timerTask *tasks.TimeSkippingTimerTask, +) error { + actionFn := func(_ context.Context, wfContext historyi.WorkflowContext, mutableState historyi.MutableState, _ historyi.ReleaseWorkflowContextFunc) (any, error) { + if !mutableState.IsWorkflowExecutionRunning() { + return nil, nil + } + tsi := mutableState.GetExecutionInfo().GetTimeSkippingInfo() + ffi := tsi.GetFastForwardInfo() + + // the fast-forward this timer task is associated with is still valid and has not been reached so keep waiting + if ffi != nil && ffi.GetSourceEventId() == timerTask.EventID && !ffi.GetHasReached() { + return &struct{}{}, nil + } + return nil, nil + } + + return t.processTimer( + ctx, + timerTask, + actionFn, + getStandbyPostActionFn( + timerTask, + t.getCurrentTime, + t.config.StandbyTaskMissingEventsDiscardDelay(timerTask.GetType()), + t.checkExecutionStillExistsOnSourceBeforeDiscard, + ), + ) +} + func (t *timerQueueStandbyTaskExecutor) executeUserTimerTimeoutTask( ctx context.Context, timerTask *tasks.UserTimerTask, ) error { - referenceTime := t.Now() actionFn := func(_ context.Context, wfContext historyi.WorkflowContext, mutableState historyi.MutableState, _ historyi.ReleaseWorkflowContextFunc) (any, error) { if !mutableState.IsWorkflowExecutionRunning() { // workflow already finished, no need to process the timer return nil, nil } + referenceTime := mutableState.Now() + timerSequence := t.getTimerSequence(mutableState) timerSequenceIDs := timerSequence.LoadAndSortUserTimers() if len(timerSequenceIDs) > 0 { @@ -253,6 +287,10 @@ func (t *timerQueueStandbyTaskExecutor) executeUserTimerTimeoutTask( return nil, serviceerror.NewInternal(errString) } + // Use mutableState.Now() as reference time as a mutable state may use virtual time + // which can skip duration and be before the wall clock time. + // And when this happens the timerSequenceID.Timestamp is also virtual time and before the wall clock time, + // while the timerTask.VisibilityTimestamp uses the wall clock time that maps to the virtual time. if queues.IsTimeExpired( timerTask, referenceTime, @@ -295,13 +333,14 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask( // // the overall solution is to attempt to generate a new activity timer task whenever the // task passed in is safe to be throw away. - referenceTime := t.Now() actionFn := func(ctx context.Context, wfContext historyi.WorkflowContext, mutableState historyi.MutableState, _ historyi.ReleaseWorkflowContextFunc) (any, error) { if !mutableState.IsWorkflowExecutionRunning() { // workflow already finished, no need to process the timer return nil, nil } + referenceTime := mutableState.Now() + timerSequence := t.getTimerSequence(mutableState) updateMutableState := false timerSequenceIDs := timerSequence.LoadAndSortActivityTimers() @@ -314,6 +353,10 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask( return nil, serviceerror.NewInternal(errString) } + // Use mutableState.Now() as reference time as a mutable state may use virtual time + // which can skip duration and be before the wall clock time. + // And when this happens the timerSequenceID.Timestamp is also virtual time and before the wall clock time, + // while the timerTask.VisibilityTimestamp uses the wall clock time that maps to the virtual time. if queues.IsTimeExpired( timerTask, referenceTime, @@ -336,6 +379,7 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask( // created. isHeartBeatTask := timerTask.TimeoutType == enumspb.TIMEOUT_TYPE_HEARTBEAT ai, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(timerTask.EventID) + if isHeartBeatTask && ok && queues.IsTimeExpired(timerTask, timerTask.GetVisibilityTime(), mutableState.ToRealTime(heartbeatTimeoutVis)) { if err := mutableState.UpdateActivityTaskStatusWithTimerHeartbeat(ai.ScheduledEventId, ai.TimerTaskStatus&^workflow.TimerTaskStatusCreatedHeartbeat, nil); err != nil { return nil, err @@ -774,6 +818,12 @@ func (t *timerQueueStandbyTaskExecutor) pushActivity( ) } +// getCurrentTime returns the shard's wall-clock view of "now" for t.clusterName. +// Must stay wall-clock: it gates standby task-retry timing against VisibilityTime +// (also wall-clock); mutableState.Now() is virtual time and would force-discard +// time-skipping workflows. actionFn closures compare against virtual timestamps, +// so they use mutableState.Now() instead. +// // TODO: deprecate this function and always use t.Now() // Only test code sets t.clusterName to be non-current cluster name // and advance the time by setting calling shardContext.SetCurrentTime. diff --git a/service/history/timer_queue_standby_task_executor_test.go b/service/history/timer_queue_standby_task_executor_test.go index 18239bdfdcd..ebc43f7c203 100644 --- a/service/history/timer_queue_standby_task_executor_test.go +++ b/service/history/timer_queue_standby_task_executor_test.go @@ -2459,6 +2459,109 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestExecuteChasmPureTimerTask_Valid s.ErrorIs(expectedErr, resp.ExecutionErr) } +// makeTimeSkippingMS builds a running mutable state, snapshots it to a persistence proto, +// and returns the persistence proto plus the workflow key. The caller can mutate the returned +// ExecutionInfo (e.g. set TimeSkippingInfo) before programming GetWorkflowExecution. +func (s *timerQueueStandbyTaskExecutorSuite) makeTimeSkippingMS() (*persistencespb.WorkflowMutableState, definition.WorkflowKey) { + execution := &commonpb.WorkflowExecution{ + WorkflowId: "ts-bound-wf-" + uuid.NewString(), + RunId: uuid.NewString(), + } + workflowKey := definition.NewWorkflowKey(s.namespaceID.String(), execution.GetWorkflowId(), execution.GetRunId()) + + mutableState := workflow.TestGlobalMutableState( + s.mockShard, s.mockShard.GetEventsCache(), s.logger, s.version, execution.GetWorkflowId(), execution.GetRunId()) + event, err := mutableState.AddWorkflowExecutionStartedEvent( + execution, + &historyservice.StartWorkflowExecutionRequest{ + Attempt: 1, + NamespaceId: s.namespaceID.String(), + StartRequest: &workflowservice.StartWorkflowExecutionRequest{ + WorkflowType: &commonpb.WorkflowType{Name: "test-wf-type"}, + TaskQueue: &taskqueuepb.TaskQueue{Name: "test-tq"}, + WorkflowRunTimeout: durationpb.New(200 * time.Second), + WorkflowTaskTimeout: durationpb.New(1 * time.Second), + }, + }, + ) + s.NoError(err) + + pms := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion()) + return pms, workflowKey +} + +// makeTimeSkippingPendingMS builds an MS that puts the standby's action function on +// the "still waiting" path: fast-forward matches the task's source event and HasReached=false. +func (s *timerQueueStandbyTaskExecutorSuite) makeTimeSkippingPendingMS() (*persistencespb.WorkflowMutableState, definition.WorkflowKey) { + pms, workflowKey := s.makeTimeSkippingMS() + pms.ExecutionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{ + Config: &commonpb.TimeSkippingConfig{ + Enabled: true, + FastForward: durationpb.New(time.Hour), + }, + FastForwardInfo: &persistencespb.FastForwardInfo{ + TargetTime: timestamppb.New(s.now.Add(time.Hour)), + SourceEventId: 1, + }, + } + return pms, workflowKey +} + +func (s *timerQueueStandbyTaskExecutorSuite) TestExecuteTimeSkippingTimerTask_Wait() { + pms, workflowKey := s.makeTimeSkippingPendingMS() + + timerTask := &tasks.TimeSkippingTimerTask{ + WorkflowKey: workflowKey, + TaskID: s.mustGenerateTaskID(), + VisibilityTimestamp: s.now, + EventID: 1, + } + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()). + Return(&persistence.GetWorkflowExecutionResponse{State: pms}, nil) + + s.mockShard.SetCurrentTime(s.clusterName, s.now) + resp := s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) + s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) +} + +func (s *timerQueueStandbyTaskExecutorSuite) TestExecuteTimeSkippingTimerTask_Ack() { + // HasReached=true: active side already replicated the disable transition, + // so the standby's action function returns nil and the task is acked. + pms, workflowKey := s.makeTimeSkippingPendingMS() + pms.ExecutionInfo.TimeSkippingInfo.FastForwardInfo.HasReached = true + + timerTask := &tasks.TimeSkippingTimerTask{ + WorkflowKey: workflowKey, + TaskID: s.mustGenerateTaskID(), + VisibilityTimestamp: s.now.Add(time.Hour), + EventID: 1, + } + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()). + Return(&persistence.GetWorkflowExecutionResponse{State: pms}, nil) + + s.mockShard.SetCurrentTime(s.clusterName, s.now) + resp := s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) + s.NoError(resp.ExecutionErr) +} + +func (s *timerQueueStandbyTaskExecutorSuite) TestExecuteTimeSkippingTimerTask_Discard() { + pms, workflowKey := s.makeTimeSkippingPendingMS() + + timerTask := &tasks.TimeSkippingTimerTask{ + WorkflowKey: workflowKey, + TaskID: s.mustGenerateTaskID(), + VisibilityTimestamp: s.now, + EventID: 1, + } + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()). + Return(&persistence.GetWorkflowExecutionResponse{State: pms}, nil) + + // Past VisibilityTime + discardDelay: ErrTaskDiscarded. + s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(s.discardDuration)) + resp := s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) + s.Equal(consts.ErrTaskDiscarded, resp.ExecutionErr) +} + func (s *timerQueueStandbyTaskExecutorSuite) createPersistenceMutableState( ms historyi.MutableState, lastEventID int64, diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 997b33bb393..e6136dba5c8 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -4163,32 +4163,27 @@ func (ms *MutableStateImpl) ApplyWorkflowExecutionTimeSkippingTransitionedEvent( attr := event.GetWorkflowExecutionTimeSkippingTransitionedEventAttributes() tsi := ms.executionInfo.GetTimeSkippingInfo() + opTag := tag.WorkflowActionWorkflowExecutionTimeSkippingTransitioned if tsi == nil { - return serviceerror.NewInternal( - "TimeSkippingInfo is not set when applying WorkflowExecutionTimeSkippingTransitionedEvent, mutable state is corrupted", - ) + ms.logError("TimeSkippingTransitionedEvent failed to apply: TimeSkippingInfo is nil", opTag) + return serviceerror.NewInternal("TimeSkippingTransitionedEvent failed to apply") } - if attr.TargetTime == nil && !attr.GetDisabledAfterFastForward() { - return serviceerror.NewInternal( - "empty WorkflowExecutionTimeSkippingTransitionedEvent found, event is corrupted", - ) + if timeNotSet(attr.TargetTime) && !attr.GetDisabledAfterFastForward() { + ms.logError("TimeSkippingTransitionedEvent failed to apply: no TargetTime and not disabled after fast forward", opTag) + return serviceerror.NewInternal("TimeSkippingTransitionedEvent failed to apply") } - if tsi.GetAccumulatedSkippedDuration() == nil { - tsi.AccumulatedSkippedDuration = durationpb.New(0) - } - accumulatedSkippedDuration := tsi.GetAccumulatedSkippedDuration().AsDuration() if !timeNotSet(attr.TargetTime) { - accumulatedSkippedDuration += attr.TargetTime.AsTime().Sub(event.GetEventTime().AsTime()) + asd := tsi.GetAccumulatedSkippedDuration().AsDuration() + asd += attr.TargetTime.AsTime().Sub(event.GetEventTime().AsTime()) + tsi.AccumulatedSkippedDuration = durationpb.New(asd) + tsi.TaskRegenerationStatus = TimerRegenStatusNeeded } - tsi.AccumulatedSkippedDuration = durationpb.New(accumulatedSkippedDuration) tsi.Config.Enabled = !attr.GetDisabledAfterFastForward() - if attr.GetDisabledAfterFastForward() && tsi.GetFastForwardInfo() != nil { tsi.FastForwardInfo.HasReached = true } - ms.timeSkippingInfoUpdated = true return nil } @@ -9468,6 +9463,7 @@ func (ms *MutableStateImpl) syncExecutionInfo(current *persistencespb.WorkflowEx ms.workflowTaskUpdated = false } } + ms.applyIncomingTimeSkippingInfo(current, incoming) doNotSync := func(v any) []any { info, ok := v.(*persistencespb.WorkflowExecutionInfo) @@ -9502,6 +9498,7 @@ func (ms *MutableStateImpl) syncExecutionInfo(current *persistencespb.WorkflowEx &info.StateMachineTimers, &info.TaskGenerationShardClockTimestamp, &info.UpdateInfos, + &info.TimeSkippingInfo, } if !isSnapshot { ignoreFields = append(ignoreFields, &info.SubStateMachineTombstoneBatches) @@ -9518,6 +9515,36 @@ func (ms *MutableStateImpl) syncExecutionInfo(current *persistencespb.WorkflowEx return nil } +// applyIncomingTimeSkippingInfo is used in state-based replication +// and merges the incoming TimeSkippingInfo (from a replication mutation/snapshot) into current, +// and the task regeneration status is maintained locally. +func (ms *MutableStateImpl) applyIncomingTimeSkippingInfo( + current *persistencespb.WorkflowExecutionInfo, + incoming *persistencespb.WorkflowExecutionInfo, +) { + if incoming.GetTimeSkippingInfo() == nil { + // State-based replication sends the full ExecutionInfo; nil incoming TSI + // means the active has no TSI (workflow never enabled time-skipping), so + // mirror that on the standby. + current.TimeSkippingInfo = nil + return + } + newTSI := common.CloneProto(incoming.TimeSkippingInfo) + currentSD := current.GetTimeSkippingInfo().GetAccumulatedSkippedDuration().AsDuration() + incomingSD := incoming.GetTimeSkippingInfo().GetAccumulatedSkippedDuration().AsDuration() + // maintain the local task regeneration status + if incomingSD != currentSD { + newTSI.TaskRegenerationStatus = TimerRegenStatusNeeded + } else { + if incomingSD == 0 { + newTSI.TaskRegenerationStatus = TimerRegenStatusUnset + } else { + newTSI.TaskRegenerationStatus = TimerRegenStatusCompleted + } + } + current.TimeSkippingInfo = newTSI +} + func (ms *MutableStateImpl) syncSubStateMachinesByType(incoming map[string]*persistencespb.StateMachineMap) error { // check if there is node been deleted currentHSM := ms.HSM() diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go index 103ab1400ca..6dd945c0dec 100644 --- a/service/history/workflow/mutable_state_impl_test.go +++ b/service/history/workflow/mutable_state_impl_test.go @@ -9054,3 +9054,125 @@ func (s *mutableStateSuite) TestAddContinueAsNewEvent_CompletionEventBatchID() { s.NoError(err) s.Equal(event.GetEventId(), s.mutableState.GetExecutionInfo().CompletionEventBatchId) } + +// TestApplyIncomingTimeSkippingInfo_RegenStatusFromSkipDelta verifies the merge +// contract: TaskRegenerationStatus is recomputed locally from the +// (currentSD, incomingSD) pair on every merge; the incoming wire value is +// ignored. SD changed -> Needed; SD unchanged at 0 -> Unset; SD unchanged > 0 +// -> Completed (a prior merge set Needed and the subsequent PartialRefresh +// has already regenerated). +func TestApplyIncomingTimeSkippingInfo_RegenStatusFromSkipDelta(t *testing.T) { + t.Parallel() + + ms := &MutableStateImpl{} + + for _, tc := range []struct { + name string + current *persistencespb.TimeSkippingInfo + incoming *persistencespb.TimeSkippingInfo + wantNil bool + wantRegenStatus int32 + }{ + { + name: "incoming nil clears current", + current: &persistencespb.TimeSkippingInfo{TaskRegenerationStatus: TimerRegenStatusCompleted}, + incoming: nil, + wantNil: true, + }, + { + name: "current nil, incoming has skip: Needed", + current: nil, + incoming: &persistencespb.TimeSkippingInfo{ + AccumulatedSkippedDuration: durationpb.New(time.Hour), + TaskRegenerationStatus: TimerRegenStatusCompleted, + }, + wantRegenStatus: TimerRegenStatusNeeded, + }, + { + name: "different skipped duration: Needed", + current: &persistencespb.TimeSkippingInfo{ + AccumulatedSkippedDuration: durationpb.New(time.Hour), + TaskRegenerationStatus: TimerRegenStatusCompleted, + }, + incoming: &persistencespb.TimeSkippingInfo{ + AccumulatedSkippedDuration: durationpb.New(2 * time.Hour), + TaskRegenerationStatus: TimerRegenStatusCompleted, + }, + wantRegenStatus: TimerRegenStatusNeeded, + }, + { + // Incoming wire status is Needed but result must be Completed, + // proving the wire value is ignored when SD is unchanged. + name: "same non-zero skip: Completed regardless of incoming wire status", + current: &persistencespb.TimeSkippingInfo{ + AccumulatedSkippedDuration: durationpb.New(time.Hour), + TaskRegenerationStatus: TimerRegenStatusCompleted, + }, + incoming: &persistencespb.TimeSkippingInfo{ + AccumulatedSkippedDuration: durationpb.New(time.Hour), + TaskRegenerationStatus: TimerRegenStatusNeeded, + }, + wantRegenStatus: TimerRegenStatusCompleted, + }, + { + // Incoming wire status is Completed but result must be Unset, + // proving the wire value is ignored when both sides are zero. + name: "both zero skip: Unset regardless of incoming wire status", + current: &persistencespb.TimeSkippingInfo{ + AccumulatedSkippedDuration: durationpb.New(0), + TaskRegenerationStatus: TimerRegenStatusCompleted, + }, + incoming: &persistencespb.TimeSkippingInfo{ + AccumulatedSkippedDuration: durationpb.New(0), + TaskRegenerationStatus: TimerRegenStatusCompleted, + }, + wantRegenStatus: TimerRegenStatusUnset, + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + current := &persistencespb.WorkflowExecutionInfo{TimeSkippingInfo: tc.current} + incoming := &persistencespb.WorkflowExecutionInfo{TimeSkippingInfo: tc.incoming} + + ms.applyIncomingTimeSkippingInfo(current, incoming) + + if tc.wantNil { + require.Nil(t, current.TimeSkippingInfo) + return + } + require.NotNil(t, current.TimeSkippingInfo) + require.Equal(t, tc.wantRegenStatus, current.TimeSkippingInfo.GetTaskRegenerationStatus()) + }) + } +} + +// TestApplyIncomingTimeSkippingInfo_ClonesToBreakAliasing verifies the merge +// clones the incoming TimeSkippingInfo so subsequent local mutations +// (e.g. flipping TaskRegenerationStatus during PartialRefresh) cannot leak back +// into the wire message. +func TestApplyIncomingTimeSkippingInfo_ClonesToBreakAliasing(t *testing.T) { + t.Parallel() + + ms := &MutableStateImpl{} + + incoming := &persistencespb.WorkflowExecutionInfo{ + TimeSkippingInfo: &persistencespb.TimeSkippingInfo{ + AccumulatedSkippedDuration: durationpb.New(time.Hour), + TaskRegenerationStatus: TimerRegenStatusCompleted, + }, + } + current := &persistencespb.WorkflowExecutionInfo{} + + ms.applyIncomingTimeSkippingInfo(current, incoming) + + require.NotSame(t, current.TimeSkippingInfo, incoming.TimeSkippingInfo, + "current.TimeSkippingInfo must be a cloned copy, not aliased to incoming") + + // Mutate local; incoming must not change. + current.TimeSkippingInfo.TaskRegenerationStatus = TimerRegenStatusCompleted + require.Equal(t, int32(TimerRegenStatusCompleted), + incoming.TimeSkippingInfo.GetTaskRegenerationStatus(), + "sanity: incoming was already Completed; this assertion is a regression guard if the wire value were ever mutated") +} diff --git a/service/history/workflow/task_generator.go b/service/history/workflow/task_generator.go index f30c96a7c02..78ad31ae847 100644 --- a/service/history/workflow/task_generator.go +++ b/service/history/workflow/task_generator.go @@ -1033,16 +1033,13 @@ func isPathAffectedByDelete(deletePath []hsm.Key, timerPath []*persistencespb.St } // RegenerateTimerTasksForTimeSkipping regenerates the timer tasks for time skipping. -// This function is not idempotent, but when called twice, logically the timerTasks regenerated will have the same contents, -// and the only difference is the TaskID. -// TODO@time-skipping: currently not safe to call in replication context +// Idempotent via TimeSkippingInfo.TaskRegenerationStatus: only emits when the status +// is Needed, then flips it to Completed. Safe to call in replication context — mirrors +// the TimerInfo.TaskStatus pattern in CreateNextUserTimer. func (r *TaskGeneratorImpl) RegenerateTimerTasksForTimeSkipping() error { - if r.mutableState.GetExecutionInfo().TimeSkippingInfo == nil { - return nil - } - accumulatedSkippedDuration := r.mutableState.GetExecutionInfo().TimeSkippingInfo.AccumulatedSkippedDuration.AsDuration() - if accumulatedSkippedDuration <= 0 { + tsi := r.mutableState.GetExecutionInfo().GetTimeSkippingInfo() + if tsi.GetTaskRegenerationStatus() != TimerRegenStatusNeeded { return nil } @@ -1093,7 +1090,6 @@ func (r *TaskGeneratorImpl) RegenerateTimerTasksForTimeSkipping() error { // (3) fast-forward timer — regenerate when configured so its real-time // VisibilityTimestamp tracks the new accumulated skip. - tsi := r.mutableState.GetExecutionInfo().GetTimeSkippingInfo() if tsi.GetConfig().GetEnabled() { fastForward := tsi.GetFastForwardInfo() if fastForward != nil && !fastForward.GetHasReached() { @@ -1150,5 +1146,7 @@ func (r *TaskGeneratorImpl) RegenerateTimerTasksForTimeSkipping() error { } // todo@time-skipping: ChasmTaskPure is not supported yet. + + tsi.TaskRegenerationStatus = TimerRegenStatusCompleted return nil } diff --git a/service/history/workflow/task_generator_test.go b/service/history/workflow/task_generator_test.go index ea1152eec3a..f91cacabf99 100644 --- a/service/history/workflow/task_generator_test.go +++ b/service/history/workflow/task_generator_test.go @@ -1196,6 +1196,7 @@ func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping(t *testing.T) { mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ TimeSkippingInfo: &persistencespb.TimeSkippingInfo{ AccumulatedSkippedDuration: durationpb.New(skippedDuration), + TaskRegenerationStatus: TimerRegenStatusNeeded, }, }).AnyTimes() mutableState.EXPECT().GetPendingTimerInfos().Return(map[string]*persistencespb.TimerInfo{ @@ -1245,6 +1246,49 @@ func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping(t *testing.T) { require.Equal(t, timer2ExpiryTime, byEventID[2].VisibilityTimestamp) } +// TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_MarksStatusAndIsIdempotent +// asserts that a successful regen flips TaskRegenerationStatus from Needed to +// Completed, and that a second back-to-back call is a no-op (mirrors the +// TimerInfo.TaskStatus idempotency in CreateNextUserTimer). +func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_MarksStatusAndIsIdempotent(t *testing.T) { + t.Parallel() + + tsi := &persistencespb.TimeSkippingInfo{ + AccumulatedSkippedDuration: durationpb.New(time.Hour), + TaskRegenerationStatus: TimerRegenStatusNeeded, + } + + ctrl := gomock.NewController(t) + mutableState := historyi.NewMockMutableState(ctrl) + mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ + TimeSkippingInfo: tsi, + }).AnyTimes() + mutableState.EXPECT().GetPendingTimerInfos().Return(map[string]*persistencespb.TimerInfo{ + "timer-1": {StartedEventId: 1, ExpiryTime: timestamppb.New(time.Now().Add(time.Hour))}, + }).AnyTimes() + mutableState.EXPECT().GetWorkflowKey().Return(tests.WorkflowKey).AnyTimes() + mutableState.EXPECT().HadOrHasWorkflowTask().Return(true).AnyTimes() + mutableState.EXPECT().GetPendingActivityInfos().Return(map[int64]*persistencespb.ActivityInfo{}).AnyTimes() + + emitCount := 0 + mutableState.EXPECT().AddTasks(gomock.Any()).Do(func(ts ...tasks.Task) { + emitCount += len(ts) + }).AnyTimes() + + taskGenerator := NewTaskGenerator(nil, mutableState, &configs.Config{}, nil, log.NewTestLogger()) + + // First call: emits, flips TaskRegenerationStatus to Completed. + require.NoError(t, taskGenerator.RegenerateTimerTasksForTimeSkipping()) + require.Positive(t, emitCount, "first call must emit at least one task") + require.Equal(t, int32(TimerRegenStatusCompleted), tsi.TaskRegenerationStatus, + "TaskRegenerationStatus must be Completed after first call") + + // Second call: idempotent — no further emissions. + emitsAfterFirst := emitCount + require.NoError(t, taskGenerator.RegenerateTimerTasksForTimeSkipping()) + require.Equal(t, emitsAfterFirst, emitCount, "second call must be a no-op") +} + func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_EdgeCases(t *testing.T) { t.Parallel() @@ -1272,6 +1316,9 @@ func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_EdgeCases(t *test execInfo: &persistencespb.WorkflowExecutionInfo{ TimeSkippingInfo: &persistencespb.TimeSkippingInfo{ AccumulatedSkippedDuration: durationpb.New(time.Hour), + // Unmarked detail required so the regen guard passes; with no + // pending timers, the function still completes but emits nothing. + TaskRegenerationStatus: TimerRegenStatusNeeded, }, }, setupTimers: func(ms *historyi.MockMutableState) { @@ -1279,6 +1326,16 @@ func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_EdgeCases(t *test // AddTasks must not be called — no expectation set. }, }, + { + name: "TaskRegenerationStatus already Completed returns immediately (idempotency)", + execInfo: &persistencespb.WorkflowExecutionInfo{ + TimeSkippingInfo: &persistencespb.TimeSkippingInfo{ + AccumulatedSkippedDuration: durationpb.New(time.Hour), + TaskRegenerationStatus: TimerRegenStatusCompleted, + }, + }, + // GetPendingTimerInfos and AddTasks must not be called — no expectations set. + }, } { tc := tc t.Run(tc.name, func(t *testing.T) { @@ -1357,6 +1414,7 @@ func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_ExecutionTimers(t WorkflowRunExpirationTime: tc.runExpirationTime, TimeSkippingInfo: &persistencespb.TimeSkippingInfo{ AccumulatedSkippedDuration: durationpb.New(skippedDuration), + TaskRegenerationStatus: TimerRegenStatusNeeded, }, }).AnyTimes() mutableState.EXPECT().GetPendingTimerInfos().Return(map[string]*persistencespb.TimerInfo{ @@ -1456,6 +1514,7 @@ func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_FastForwardTimer( SourceEventId: 7, HasReached: false, }, + TaskRegenerationStatus: TimerRegenStatusNeeded, }, wantFastForwardTask: &wantTask{visibilityTimestamp: fastForwardTarget, eventID: 7}, }, @@ -1471,6 +1530,7 @@ func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_FastForwardTimer( SourceEventId: 7, HasReached: true, }, + TaskRegenerationStatus: TimerRegenStatusNeeded, }, }, { @@ -1485,6 +1545,7 @@ func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_FastForwardTimer( SourceEventId: 7, HasReached: false, }, + TaskRegenerationStatus: TimerRegenStatusNeeded, }, }, { @@ -1492,6 +1553,7 @@ func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_FastForwardTimer( tsi: &persistencespb.TimeSkippingInfo{ Config: &commonpb.TimeSkippingConfig{Enabled: true}, AccumulatedSkippedDuration: durationpb.New(time.Hour), + TaskRegenerationStatus: TimerRegenStatusNeeded, }, }, } { @@ -1633,6 +1695,7 @@ func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_BackoffTimer(t *t Attempt: tc.attempt, TimeSkippingInfo: &persistencespb.TimeSkippingInfo{ AccumulatedSkippedDuration: durationpb.New(time.Hour), + TaskRegenerationStatus: TimerRegenStatusNeeded, }, }).AnyTimes() mutableState.EXPECT().GetPendingTimerInfos().Return(map[string]*persistencespb.TimerInfo{}).AnyTimes() @@ -1689,6 +1752,7 @@ func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_ActivityRetry(t * mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ TimeSkippingInfo: &persistencespb.TimeSkippingInfo{ AccumulatedSkippedDuration: durationpb.New(time.Hour), + TaskRegenerationStatus: TimerRegenStatusNeeded, }, }).AnyTimes() mutableState.EXPECT().GetPendingTimerInfos().Return(map[string]*persistencespb.TimerInfo{}).AnyTimes() @@ -1757,6 +1821,7 @@ func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_BackoffTimer_Star Attempt: 1, TimeSkippingInfo: &persistencespb.TimeSkippingInfo{ AccumulatedSkippedDuration: durationpb.New(time.Hour), + TaskRegenerationStatus: TimerRegenStatusNeeded, }, }).AnyTimes() mutableState.EXPECT().GetPendingTimerInfos().Return(map[string]*persistencespb.TimerInfo{}).AnyTimes() diff --git a/service/history/workflow/task_refresher.go b/service/history/workflow/task_refresher.go index 078a6950777..6ff4c59996f 100644 --- a/service/history/workflow/task_refresher.go +++ b/service/history/workflow/task_refresher.go @@ -198,9 +198,16 @@ func (r *TaskRefresherImpl) PartialRefresh( return err } - return r.refreshTasksForSubStateMachines( + if err := r.refreshTasksForSubStateMachines( mutableState, minVersionedTransition, + ); err != nil { + return err + } + + return r.refreshTasksForTimeSkipping( + mutableState, + taskGenerator, ) } @@ -439,8 +446,6 @@ func (r *TaskRefresherImpl) refreshTasksForTimer( return nil } - // if mutableState.ExecutionInfo.TimeSkippingInfo changed, - // we need to pendingTimerInfos := mutableState.GetPendingTimerInfos() for _, timerInfo := range pendingTimerInfos { @@ -465,6 +470,17 @@ func (r *TaskRefresherImpl) refreshTasksForTimer( return err } +func (r *TaskRefresherImpl) refreshTasksForTimeSkipping( + mutableState historyi.MutableState, + taskGenerator TaskGenerator, +) error { + executionState := mutableState.GetExecutionState() + if executionState.Status != enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING { + return nil + } + return taskGenerator.RegenerateTimerTasksForTimeSkipping() +} + func (r *TaskRefresherImpl) refreshTasksForChildWorkflow( mutableState historyi.MutableState, taskGenerator TaskGenerator, diff --git a/service/history/workflow/timer_sequence.go b/service/history/workflow/timer_sequence.go index 83c65ccefd4..eb5fad28de0 100644 --- a/service/history/workflow/timer_sequence.go +++ b/service/history/workflow/timer_sequence.go @@ -31,6 +31,12 @@ const ( TimerTaskStatusCreatedHeartbeat ) +const ( + TimerRegenStatusUnset = iota + TimerRegenStatusNeeded + TimerRegenStatusCompleted +) + type ( // TimerSequenceID represent a in mem timer TimerSequenceID struct { diff --git a/tests/xdc/timeskipping_replication_test.go b/tests/xdc/timeskipping_replication_test.go new file mode 100644 index 00000000000..f2a99bcffa5 --- /dev/null +++ b/tests/xdc/timeskipping_replication_test.go @@ -0,0 +1,380 @@ +package xdc + +import ( + "context" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + commandpb "go.temporal.io/api/command/v1" + commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" + taskqueuepb "go.temporal.io/api/taskqueue/v1" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/server/api/historyservice/v1" + persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/chasm" + "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/testing/await" + "go.temporal.io/server/common/testing/taskpoller" + "go.temporal.io/server/common/testing/testvars" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/durationpb" +) + +type timeSkippingReplicationSuite struct { + xdcBaseSuite +} + +func TestTimeSkippingReplicationSuite(t *testing.T) { + t.Parallel() + s := &timeSkippingReplicationSuite{} + suite.Run(t, s) +} + +func (s *timeSkippingReplicationSuite) SetupSuite() { + s.dynamicConfigOverrides = map[dynamicconfig.Key]any{ + dynamicconfig.TimeSkippingEnabled.Key(): true, + } + // Drive the state-based replication path so applyIncomingTimeSkippingInfo runs. + // Without this, events alone replicate TimeSkippingInfo via their handlers, and + // the state-based merge function added by this commit is never exercised. + s.enableTransitionHistory = true + s.logger = log.NewTestLogger() + s.setupSuite() +} + +func (s *timeSkippingReplicationSuite) TearDownSuite() { + s.tearDownSuite() +} + +func (s *timeSkippingReplicationSuite) SetupTest() { + s.setupTest() +} + +// describeNamespaceID looks up the namespace ID from cluster[0]; the same ID is +// shared on standby once the namespace replicates. +func (s *timeSkippingReplicationSuite) describeNamespaceID(ctx context.Context, ns string) string { + resp, err := s.clusters[0].FrontendClient().DescribeNamespace(ctx, &workflowservice.DescribeNamespaceRequest{ + Namespace: ns, + }) + s.NoError(err) + return resp.GetNamespaceInfo().GetId() +} + +// getExecutionInfoFromCluster reads the persisted ExecutionInfo from the given cluster's +// history service. Use the database (not cache) view to ensure we observe replicated state. +func (s *timeSkippingReplicationSuite) getExecutionInfoFromCluster( + ctx context.Context, + clusterIdx int, + nsID, wfID, runID string, +) *persistencespb.WorkflowExecutionInfo { + resp, err := s.clusters[clusterIdx].HistoryClient().DescribeMutableState(ctx, &historyservice.DescribeMutableStateRequest{ + NamespaceId: nsID, + Execution: &commonpb.WorkflowExecution{WorkflowId: wfID, RunId: runID}, + ArchetypeId: chasm.WorkflowArchetypeID, + }) + s.NoError(err) + s.NotNil(resp.GetDatabaseMutableState()) + return resp.GetDatabaseMutableState().GetExecutionInfo() +} + +// waitForTimeSkippingInfoSynced blocks until the standby cluster's TimeSkippingInfo +// agrees with the active's on Config and AccumulatedSkippedDuration. TaskRegenerationStatus +// is cluster-local (regen is re-run on standby after replication) and is not asserted. +func (s *timeSkippingReplicationSuite) waitForTimeSkippingInfoSynced( + ctx context.Context, + nsID, wfID, runID string, +) { + s.waitForClusterSynced() + await.Require(ctx, s.T(), func(t *await.T) { + active := s.getExecutionInfoFromCluster(ctx, 0, nsID, wfID, runID).GetTimeSkippingInfo() + standby := s.getExecutionInfoFromCluster(ctx, 1, nsID, wfID, runID).GetTimeSkippingInfo() + require.NotNil(t, active, "active TimeSkippingInfo must be present") + require.NotNil(t, standby, "standby TimeSkippingInfo must be present") + require.True(t, proto.Equal(active.GetConfig(), standby.GetConfig()), + "config mismatch: active=%v standby=%v", active.GetConfig(), standby.GetConfig()) + require.Equal(t, + active.GetAccumulatedSkippedDuration().AsDuration(), + standby.GetAccumulatedSkippedDuration().AsDuration(), + "accumulated skipped duration must match") + }, replicationWaitTime, replicationCheckInterval) +} + +// startSkippingWorkflow starts a workflow on the active cluster (cluster[0]) with +// the given TimeSkippingConfig and optional WorkflowStartDelay. Returns the run ID. +// The start-delay scenario triggers a skip on the very first close transaction +// (no WT yet, ExecutionTime > StartTime), giving us deterministic accumulated skip +// without needing to drive any workflow tasks. +func (s *timeSkippingReplicationSuite) startSkippingWorkflow( + ctx context.Context, + ns, wfID, tq string, + runTimeout, startDelay time.Duration, + cfg *commonpb.TimeSkippingConfig, +) string { + req := &workflowservice.StartWorkflowExecutionRequest{ + RequestId: uuid.NewString(), + Namespace: ns, + WorkflowId: wfID, + WorkflowType: &commonpb.WorkflowType{Name: "ts-replication-wf"}, + TaskQueue: &taskqueuepb.TaskQueue{Name: tq, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + WorkflowRunTimeout: durationpb.New(runTimeout), + WorkflowTaskTimeout: durationpb.New(10 * time.Second), + TimeSkippingConfig: cfg, + } + if startDelay > 0 { + req.WorkflowStartDelay = durationpb.New(startDelay) + } + resp, err := s.clusters[0].FrontendClient().StartWorkflowExecution(ctx, req) + s.NoError(err) + return resp.GetRunId() +} + +// completeFirstWorkflowTask polls and completes the initial workflow task on the +// active cluster with no commands, leaving the workflow open and idle. Once idle, +// a registered FastForward fires on the next close transaction (see the +// FastForward functional tests for the same pattern). +func (s *timeSkippingReplicationSuite) completeFirstWorkflowTask(ns, wfID, tq string) { + tv := testvars.New(s.T()).WithTaskQueue(tq).WithWorkflowID(wfID) + poller := taskpoller.New(s.T(), s.clusters[0].FrontendClient(), ns) + _, err := poller.PollAndHandleWorkflowTask(tv, + func(_ *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + return &workflowservice.RespondWorkflowTaskCompletedRequest{}, nil + }) + s.NoError(err) +} + +// TestBasicSkipReplicates verifies the core replication contract for time-skipping: +// a skip transition applied on the active cluster's mutable state replicates to the +// standby with matching Config and AccumulatedSkippedDuration. TaskRegenerationStatus +// is cluster-local (the standby re-runs regen after replication) and is not asserted. +func (s *timeSkippingReplicationSuite) TestBasicSkipReplicates() { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + ns := s.createGlobalNamespace() + nsID := s.describeNamespaceID(ctx, ns) + wfID := "ts-repl-basic-" + uuid.NewString() + tq := "ts-repl-basic-tq-" + uuid.NewString() + + const ( + startDelay = time.Hour + accumTol = 100 * time.Millisecond + ) + runID := s.startSkippingWorkflow(ctx, ns, wfID, tq, 24*time.Hour, startDelay, + &commonpb.TimeSkippingConfig{Enabled: true}) + + // Wait for the start close-transaction skip to land on active. + await.Require(ctx, s.T(), func(t *await.T) { + info := s.getExecutionInfoFromCluster(ctx, 0, nsID, wfID, runID).GetTimeSkippingInfo() + require.NotNil(t, info, "active must persist TimeSkippingInfo after start") + require.Greater(t, info.GetAccumulatedSkippedDuration().AsDuration(), 30*time.Minute, + "active must accumulate ~startDelay of skip on the first close transaction") + }, 15*time.Second, 200*time.Millisecond) + + s.waitForTimeSkippingInfoSynced(ctx, nsID, wfID, runID) + + active := s.getExecutionInfoFromCluster(ctx, 0, nsID, wfID, runID).GetTimeSkippingInfo() + standby := s.getExecutionInfoFromCluster(ctx, 1, nsID, wfID, runID).GetTimeSkippingInfo() + s.True(proto.Equal(active.GetConfig(), standby.GetConfig())) + s.Equal( + active.GetAccumulatedSkippedDuration().AsDuration(), + standby.GetAccumulatedSkippedDuration().AsDuration(), + ) + s.InDelta(float64(startDelay), float64(standby.GetAccumulatedSkippedDuration().AsDuration()), float64(accumTol), + "standby's accumulated skip should match the configured startDelay within tolerance") +} + +// TestFastForwardDisablePropagates verifies that completing a registered FastForward +// on the active — which flips Config.Enabled to false and accumulates the fast-forward +// duration — replicates to the standby. After replication, standby's config must +// report Enabled=false so subsequent skip checks short-circuit there too. +func (s *timeSkippingReplicationSuite) TestFastForwardDisablePropagates() { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + ns := s.createGlobalNamespace() + nsID := s.describeNamespaceID(ctx, ns) + wfID := "ts-repl-ff-" + uuid.NewString() + tq := "ts-repl-ff-tq-" + uuid.NewString() + + const ( + fastForward = 30 * time.Minute + accumTol = 30 * time.Second + ) + runID := s.startSkippingWorkflow(ctx, ns, wfID, tq, 24*time.Hour, 0, + &commonpb.TimeSkippingConfig{ + Enabled: true, + FastForward: durationpb.New(fastForward), + }, + ) + + // Drive the initial workflow task so the workflow goes idle; the fast-forward + // fires on the next close transaction. + s.completeFirstWorkflowTask(ns, wfID, tq) + + // Wait for the fast-forward to complete on active: Enabled must flip to false, + // HasReached must be set, and accumulated must land at ~fastForward. + await.Require(ctx, s.T(), func(t *await.T) { + info := s.getExecutionInfoFromCluster(ctx, 0, nsID, wfID, runID).GetTimeSkippingInfo() + require.NotNil(t, info) + require.False(t, info.GetConfig().GetEnabled(), + "completing the fast-forward must flip Config.Enabled to false") + require.True(t, info.GetFastForwardInfo().GetHasReached(), "active must set HasReached=true") + require.InDelta(t, + float64(fastForward), float64(info.GetAccumulatedSkippedDuration().AsDuration()), + float64(accumTol), + "accumulated must land at ~fastForward after the disable transition") + }, 30*time.Second, 200*time.Millisecond) + + s.waitForTimeSkippingInfoSynced(ctx, nsID, wfID, runID) + + standby := s.getExecutionInfoFromCluster(ctx, 1, nsID, wfID, runID).GetTimeSkippingInfo() + s.False(standby.GetConfig().GetEnabled(), + "standby must observe Config.Enabled=false after fast-forward replication") + s.InDelta(float64(fastForward), float64(standby.GetAccumulatedSkippedDuration().AsDuration()), float64(accumTol)) + + // Standby's history must contain the TimeSkippingTransitioned event marking + // the fast-forward disable — proves the event itself (not just the MS field) + // replicated from active. + histResp, err := s.clusters[1].FrontendClient().GetWorkflowExecutionHistory(ctx, + &workflowservice.GetWorkflowExecutionHistoryRequest{ + Namespace: ns, + Execution: &commonpb.WorkflowExecution{WorkflowId: wfID, RunId: runID}, + }) + s.NoError(err) + disableTransitions := 0 + for _, ev := range histResp.GetHistory().GetEvents() { + if ev.GetEventType() != enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIME_SKIPPING_TRANSITIONED { + continue + } + if ev.GetWorkflowExecutionTimeSkippingTransitionedEventAttributes().GetDisabledAfterFastForward() { + disableTransitions++ + } + } + s.Equal(1, disableTransitions, + "standby history must contain exactly one TimeSkippingTransitioned event with DisabledAfterFastForward=true") +} + +// TestStandbyTimeSkippingTimerTaskAcksOnReachedFastForward drives the standby +// executor's executeTimeSkippingTimerTask end-to-end. A registered FastForward +// installs a TimeSkippingTimerTask whose visibility is scheduled wall-clock time. +// On the standby this task is regenerated during state-based replication via +// task_refresher.refreshTasksForTimeSkipping. +// +// Scenario: +// +// 1. Start a workflow on active with TimeSkippingConfig {Enabled, FastForward}. +// 2. Drive the initial workflow task so the workflow goes idle; the fast-forward +// fires, flipping Enabled=false and setting HasReached=true. +// 3. State replicates to standby. Standby's own TimeSkippingTimerTask was generated +// by the refresh; when it fires, executeTimeSkippingTimerTask runs and observes +// SourceEventId-match with HasReached=true → ack branch. +// +// Convergence on both clusters proves the standby's executor path is wired up, +// returns no error, and doesn't hang or crash. +func (s *timeSkippingReplicationSuite) TestStandbyTimeSkippingTimerTaskAcksOnReachedFastForward() { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + ns := s.createGlobalNamespace() + nsID := s.describeNamespaceID(ctx, ns) + wfID := "ts-repl-ff-timer-" + uuid.NewString() + tq := "ts-repl-ff-timer-tq-" + uuid.NewString() + + const fastForward = 30 * time.Minute + runID := s.startSkippingWorkflow(ctx, ns, wfID, tq, 24*time.Hour, 0, + &commonpb.TimeSkippingConfig{ + Enabled: true, + FastForward: durationpb.New(fastForward), + }, + ) + + s.completeFirstWorkflowTask(ns, wfID, tq) + + // Active reaches the fast-forward: Enabled flips to false and HasReached is set. + // This is driven by the TimeSkippingTimerTask firing on the active. + await.Require(ctx, s.T(), func(t *await.T) { + info := s.getExecutionInfoFromCluster(ctx, 0, nsID, wfID, runID).GetTimeSkippingInfo() + require.NotNil(t, info) + require.False(t, info.GetConfig().GetEnabled(), + "active must flip Enabled=false after the fast-forward timer fires") + ff := info.GetFastForwardInfo() + require.NotNil(t, ff) + require.True(t, ff.GetHasReached(), "active must set HasReached=true") + }, 30*time.Second, 200*time.Millisecond) + + s.waitForTimeSkippingInfoSynced(ctx, nsID, wfID, runID) + + // Standby must converge to the same end state. The standby's own + // TimeSkippingTimerTask was generated by refreshTasksForTimeSkipping during + // replication; it fires through executeTimeSkippingTimerTask and acks because + // HasReached is already true by that time. + standby := s.getExecutionInfoFromCluster(ctx, 1, nsID, wfID, runID).GetTimeSkippingInfo() + s.NotNil(standby) + s.False(standby.GetConfig().GetEnabled(), + "standby must observe Config.Enabled=false after fast-forward replication") + ff := standby.GetFastForwardInfo() + s.NotNil(ff) + s.True(ff.GetHasReached(), + "standby must observe HasReached=true; absence indicates the standby's TimeSkippingTimerTask didn't replicate the fast-forward transition correctly") +} + +// TestFailoverPreservesAccumulatedSkip verifies that after failover, the new active +// cluster preserves the AccumulatedSkippedDuration accumulated under the previous +// active and can drive further work — the regenerated WorkflowBackoffTimerTask on +// the new active fires, producing a workflow task that completes the run. +func (s *timeSkippingReplicationSuite) TestFailoverPreservesAccumulatedSkip() { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + + ns := s.createGlobalNamespace() + nsID := s.describeNamespaceID(ctx, ns) + wfID := "ts-repl-failover-" + uuid.NewString() + tq := "ts-repl-failover-tq-" + uuid.NewString() + + const startDelay = time.Hour + runID := s.startSkippingWorkflow(ctx, ns, wfID, tq, 24*time.Hour, startDelay, + &commonpb.TimeSkippingConfig{Enabled: true}) + + // Skip on active; wait for replication to standby. + await.Require(ctx, s.T(), func(t *await.T) { + info := s.getExecutionInfoFromCluster(ctx, 0, nsID, wfID, runID).GetTimeSkippingInfo() + require.NotNil(t, info) + require.Greater(t, info.GetAccumulatedSkippedDuration().AsDuration(), 30*time.Minute) + }, 15*time.Second, 200*time.Millisecond) + s.waitForTimeSkippingInfoSynced(ctx, nsID, wfID, runID) + accumBefore := s.getExecutionInfoFromCluster(ctx, 0, nsID, wfID, runID). + GetTimeSkippingInfo().GetAccumulatedSkippedDuration().AsDuration() + + // Fail over to cluster[1]. cluster[1].InitialFailoverVersion=2. + s.failover(ns, 0, s.clusters[1].ClusterName(), 2) + + // New active (cluster[1]) must preserve accumulated skip in its MS. + afterFailover := s.getExecutionInfoFromCluster(ctx, 1, nsID, wfID, runID).GetTimeSkippingInfo() + s.NotNil(afterFailover, "TimeSkippingInfo must persist on the new active after failover") + s.Equal(accumBefore, afterFailover.GetAccumulatedSkippedDuration().AsDuration(), + "accumulated skip must survive failover") + + // New active must be able to drive forward progress: the regenerated + // WorkflowBackoffTimerTask on cluster[1] should fire (its visibility was + // shifted into near-now by RegenerateTimerTasksForTimeSkipping at replicate + // time), making a workflow task available to complete the run. + tv := testvars.New(s.T()).WithTaskQueue(tq).WithWorkflowID(wfID) + poller := taskpoller.New(s.T(), s.clusters[1].FrontendClient(), ns) + _, err := poller.PollAndHandleWorkflowTask(tv, + func(_ *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + return &workflowservice.RespondWorkflowTaskCompletedRequest{ + Commands: []*commandpb.Command{{ + CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{ + CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{}, + }, + }}, + }, nil + }) + s.NoError(err, "new active must be able to complete a workflow task post-failover") +} From a2de8d00a60605e6b8d97ac36ae7e056d126aaf4 Mon Sep 17 00:00:00 2001 From: Feiyang Xie Date: Tue, 16 Jun 2026 20:38:06 -0700 Subject: [PATCH 2/3] TS replication: use LastUpdateVersionedTransition to gate timer regen MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stamp TimeSkippingInfo.LastUpdateVersionedTransition on each skip and gate PartialRefresh's timer re-stamp on it, instead of the TaskRegenerationStatus enum. The stamp is a global fact, so it replicates via the generic ExecutionInfo merge — dropping applyIncomingTimeSkippingInfo and the TimerRegenStatus constants. --- api/persistence/v1/executions.pb.go | 250 +++++++++--------- .../api/persistence/v1/executions.proto | 11 +- .../history/workflow/mutable_state_impl.go | 40 +-- .../workflow/mutable_state_impl_test.go | 122 --------- service/history/workflow/task_generator.go | 15 +- .../history/workflow/task_generator_test.go | 42 +-- service/history/workflow/task_refresher.go | 21 ++ .../history/workflow/task_refresher_test.go | 80 ++++++ service/history/workflow/timer_sequence.go | 6 - tests/xdc/timeskipping_replication_test.go | 15 +- 10 files changed, 266 insertions(+), 336 deletions(-) diff --git a/api/persistence/v1/executions.pb.go b/api/persistence/v1/executions.pb.go index bbb941a5647..03a1da28eac 100644 --- a/api/persistence/v1/executions.pb.go +++ b/api/persistence/v1/executions.pb.go @@ -1174,13 +1174,14 @@ type TimeSkippingInfo struct { AccumulatedSkippedDuration *durationpb.Duration `protobuf:"bytes,2,opt,name=accumulated_skipped_duration,json=accumulatedSkippedDuration,proto3" json:"accumulated_skipped_duration,omitempty"` // The current fast-forward info for time skipping. FastForwardInfo *FastForwardInfo `protobuf:"bytes,4,opt,name=fast_forward_info,json=fastForwardInfo,proto3" json:"fast_forward_info,omitempty"` - // Task regeneration status is used to track the status of the task regeneration. - // 0: Not set - // 1: Need to regenerate tasks - // 2: All tasks regenerated - TaskRegenerationStatus int32 `protobuf:"varint,5,opt,name=task_regeneration_status,json=taskRegenerationStatus,proto3" json:"task_regeneration_status,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + // Versioned transition at which this TimeSkippingInfo was last modified (i.e. when a + // skip transition changed accumulated_skipped_duration). Used by PartialRefresh to detect + // that pending timer tasks must be re-stamped against the new accumulated skip, since a + // skip mutates this workflow-level field without bumping any per-timer + // last_update_versioned_transition. Mirrors the per-entity stamps on TimerInfo/ActivityInfo. + LastUpdateVersionedTransition *VersionedTransition `protobuf:"bytes,5,opt,name=last_update_versioned_transition,json=lastUpdateVersionedTransition,proto3" json:"last_update_versioned_transition,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *TimeSkippingInfo) Reset() { @@ -1234,11 +1235,11 @@ func (x *TimeSkippingInfo) GetFastForwardInfo() *FastForwardInfo { return nil } -func (x *TimeSkippingInfo) GetTaskRegenerationStatus() int32 { +func (x *TimeSkippingInfo) GetLastUpdateVersionedTransition() *VersionedTransition { if x != nil { - return x.TaskRegenerationStatus + return x.LastUpdateVersionedTransition } - return 0 + return nil } type FastForwardInfo struct { @@ -5053,12 +5054,12 @@ const file_temporal_server_api_persistence_v1_executions_proto_rawDesc = "" + "&ChildrenInitializedPostResetPointEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12H\n" + "\x05value\x18\x02 \x01(\v22.temporal.server.api.persistence.v1.ResetChildInfoR\x05value:\x028\x01B\x1c\n" + - "\x1alast_workflow_task_failureJ\x04\b\b\x10\tJ\x04\b\x0e\x10\x0fJ\x04\b\x0f\x10\x10J\x04\b\x10\x10\x11J\x04\bp\x10qJ\x04\b,\x10-J\x04\b-\x10.J\x04\b/\x100J\x04\b0\x101J\x04\b1\x102J\x04\b2\x103\"\xf4\x02\n" + + "\x1alast_workflow_task_failureJ\x04\b\b\x10\tJ\x04\b\x0e\x10\x0fJ\x04\b\x0f\x10\x10J\x04\b\x10\x10\x11J\x04\bp\x10qJ\x04\b,\x10-J\x04\b-\x10.J\x04\b/\x100J\x04\b0\x101J\x04\b1\x102J\x04\b2\x103\"\xbd\x03\n" + "\x10TimeSkippingInfo\x12B\n" + "\x06config\x18\x01 \x01(\v2*.temporal.api.common.v1.TimeSkippingConfigR\x06config\x12[\n" + "\x1caccumulated_skipped_duration\x18\x02 \x01(\v2\x19.google.protobuf.DurationR\x1aaccumulatedSkippedDuration\x12_\n" + - "\x11fast_forward_info\x18\x04 \x01(\v23.temporal.server.api.persistence.v1.FastForwardInfoR\x0ffastForwardInfo\x128\n" + - "\x18task_regeneration_status\x18\x05 \x01(\x05R\x16taskRegenerationStatusJ\x04\b\x03\x10\x04R\x1ecurrent_elapsed_duration_bound\"\x97\x01\n" + + "\x11fast_forward_info\x18\x04 \x01(\v23.temporal.server.api.persistence.v1.FastForwardInfoR\x0ffastForwardInfo\x12\x80\x01\n" + + " last_update_versioned_transition\x18\x05 \x01(\v27.temporal.server.api.persistence.v1.VersionedTransitionR\x1dlastUpdateVersionedTransitionJ\x04\b\x03\x10\x04R\x1ecurrent_elapsed_duration_bound\"\x97\x01\n" + "\x0fFastForwardInfo\x12;\n" + "\vtarget_time\x18\x01 \x01(\v2\x1a.google.protobuf.TimestampR\n" + "targetTime\x12\x1f\n" + @@ -5558,117 +5559,118 @@ var file_temporal_server_api_persistence_v1_executions_proto_depIdxs = []int32{ 64, // 48: temporal.server.api.persistence.v1.TimeSkippingInfo.config:type_name -> temporal.api.common.v1.TimeSkippingConfig 48, // 49: temporal.server.api.persistence.v1.TimeSkippingInfo.accumulated_skipped_duration:type_name -> google.protobuf.Duration 3, // 50: temporal.server.api.persistence.v1.TimeSkippingInfo.fast_forward_info:type_name -> temporal.server.api.persistence.v1.FastForwardInfo - 47, // 51: temporal.server.api.persistence.v1.FastForwardInfo.target_time:type_name -> google.protobuf.Timestamp - 65, // 52: temporal.server.api.persistence.v1.LastNotifiedTargetVersion.deployment_version:type_name -> temporal.api.deployment.v1.WorkerDeploymentVersion - 66, // 53: temporal.server.api.persistence.v1.WorkflowExecutionState.state:type_name -> temporal.server.api.enums.v1.WorkflowExecutionState - 67, // 54: temporal.server.api.persistence.v1.WorkflowExecutionState.status:type_name -> temporal.api.enums.v1.WorkflowExecutionStatus - 56, // 55: temporal.server.api.persistence.v1.WorkflowExecutionState.last_update_versioned_transition:type_name -> temporal.server.api.persistence.v1.VersionedTransition - 47, // 56: temporal.server.api.persistence.v1.WorkflowExecutionState.start_time:type_name -> google.protobuf.Timestamp - 37, // 57: temporal.server.api.persistence.v1.WorkflowExecutionState.request_ids:type_name -> temporal.server.api.persistence.v1.WorkflowExecutionState.RequestIdsEntry - 68, // 58: temporal.server.api.persistence.v1.RequestIDInfo.event_type:type_name -> temporal.api.enums.v1.EventType - 69, // 59: temporal.server.api.persistence.v1.TransferTaskInfo.task_type:type_name -> temporal.server.api.enums.v1.TaskType - 47, // 60: temporal.server.api.persistence.v1.TransferTaskInfo.visibility_time:type_name -> google.protobuf.Timestamp - 38, // 61: temporal.server.api.persistence.v1.TransferTaskInfo.close_execution_task_details:type_name -> temporal.server.api.persistence.v1.TransferTaskInfo.CloseExecutionTaskDetails - 70, // 62: temporal.server.api.persistence.v1.TransferTaskInfo.chasm_task_info:type_name -> temporal.server.api.persistence.v1.ChasmTaskInfo - 69, // 63: temporal.server.api.persistence.v1.ReplicationTaskInfo.task_type:type_name -> temporal.server.api.enums.v1.TaskType - 47, // 64: temporal.server.api.persistence.v1.ReplicationTaskInfo.visibility_time:type_name -> google.protobuf.Timestamp - 71, // 65: temporal.server.api.persistence.v1.ReplicationTaskInfo.priority:type_name -> temporal.server.api.enums.v1.TaskPriority - 56, // 66: temporal.server.api.persistence.v1.ReplicationTaskInfo.versioned_transition:type_name -> temporal.server.api.persistence.v1.VersionedTransition - 9, // 67: temporal.server.api.persistence.v1.ReplicationTaskInfo.task_equivalents:type_name -> temporal.server.api.persistence.v1.ReplicationTaskInfo - 72, // 68: temporal.server.api.persistence.v1.ReplicationTaskInfo.last_version_history_item:type_name -> temporal.server.api.history.v1.VersionHistoryItem - 69, // 69: temporal.server.api.persistence.v1.VisibilityTaskInfo.task_type:type_name -> temporal.server.api.enums.v1.TaskType - 47, // 70: temporal.server.api.persistence.v1.VisibilityTaskInfo.visibility_time:type_name -> google.protobuf.Timestamp - 47, // 71: temporal.server.api.persistence.v1.VisibilityTaskInfo.close_time:type_name -> google.protobuf.Timestamp - 47, // 72: temporal.server.api.persistence.v1.VisibilityTaskInfo.start_time:type_name -> google.protobuf.Timestamp - 70, // 73: temporal.server.api.persistence.v1.VisibilityTaskInfo.chasm_task_info:type_name -> temporal.server.api.persistence.v1.ChasmTaskInfo - 69, // 74: temporal.server.api.persistence.v1.TimerTaskInfo.task_type:type_name -> temporal.server.api.enums.v1.TaskType - 62, // 75: temporal.server.api.persistence.v1.TimerTaskInfo.timeout_type:type_name -> temporal.api.enums.v1.TimeoutType - 73, // 76: temporal.server.api.persistence.v1.TimerTaskInfo.workflow_backoff_type:type_name -> temporal.server.api.enums.v1.WorkflowBackoffType - 47, // 77: temporal.server.api.persistence.v1.TimerTaskInfo.visibility_time:type_name -> google.protobuf.Timestamp - 70, // 78: temporal.server.api.persistence.v1.TimerTaskInfo.chasm_task_info:type_name -> temporal.server.api.persistence.v1.ChasmTaskInfo - 69, // 79: temporal.server.api.persistence.v1.ArchivalTaskInfo.task_type:type_name -> temporal.server.api.enums.v1.TaskType - 47, // 80: temporal.server.api.persistence.v1.ArchivalTaskInfo.visibility_time:type_name -> google.protobuf.Timestamp - 69, // 81: temporal.server.api.persistence.v1.OutboundTaskInfo.task_type:type_name -> temporal.server.api.enums.v1.TaskType - 47, // 82: temporal.server.api.persistence.v1.OutboundTaskInfo.visibility_time:type_name -> google.protobuf.Timestamp - 74, // 83: temporal.server.api.persistence.v1.OutboundTaskInfo.state_machine_info:type_name -> temporal.server.api.persistence.v1.StateMachineTaskInfo - 70, // 84: temporal.server.api.persistence.v1.OutboundTaskInfo.chasm_task_info:type_name -> temporal.server.api.persistence.v1.ChasmTaskInfo - 14, // 85: temporal.server.api.persistence.v1.OutboundTaskInfo.worker_commands_task:type_name -> temporal.server.api.persistence.v1.WorkerCommandsTask - 75, // 86: temporal.server.api.persistence.v1.WorkerCommandsTask.commands:type_name -> temporal.api.worker.v1.WorkerCommand - 47, // 87: temporal.server.api.persistence.v1.ActivityInfo.scheduled_time:type_name -> google.protobuf.Timestamp - 47, // 88: temporal.server.api.persistence.v1.ActivityInfo.started_time:type_name -> google.protobuf.Timestamp - 48, // 89: temporal.server.api.persistence.v1.ActivityInfo.schedule_to_start_timeout:type_name -> google.protobuf.Duration - 48, // 90: temporal.server.api.persistence.v1.ActivityInfo.schedule_to_close_timeout:type_name -> google.protobuf.Duration - 48, // 91: temporal.server.api.persistence.v1.ActivityInfo.start_to_close_timeout:type_name -> google.protobuf.Duration - 48, // 92: temporal.server.api.persistence.v1.ActivityInfo.heartbeat_timeout:type_name -> google.protobuf.Duration - 48, // 93: temporal.server.api.persistence.v1.ActivityInfo.retry_initial_interval:type_name -> google.protobuf.Duration - 48, // 94: temporal.server.api.persistence.v1.ActivityInfo.retry_maximum_interval:type_name -> google.protobuf.Duration - 47, // 95: temporal.server.api.persistence.v1.ActivityInfo.retry_expiration_time:type_name -> google.protobuf.Timestamp - 76, // 96: temporal.server.api.persistence.v1.ActivityInfo.retry_last_failure:type_name -> temporal.api.failure.v1.Failure - 77, // 97: temporal.server.api.persistence.v1.ActivityInfo.last_heartbeat_details:type_name -> temporal.api.common.v1.Payloads - 47, // 98: temporal.server.api.persistence.v1.ActivityInfo.last_heartbeat_update_time:type_name -> google.protobuf.Timestamp - 78, // 99: temporal.server.api.persistence.v1.ActivityInfo.activity_type:type_name -> temporal.api.common.v1.ActivityType - 39, // 100: temporal.server.api.persistence.v1.ActivityInfo.use_workflow_build_id_info:type_name -> temporal.server.api.persistence.v1.ActivityInfo.UseWorkflowBuildIdInfo - 55, // 101: temporal.server.api.persistence.v1.ActivityInfo.last_worker_version_stamp:type_name -> temporal.api.common.v1.WorkerVersionStamp - 56, // 102: temporal.server.api.persistence.v1.ActivityInfo.last_update_versioned_transition:type_name -> temporal.server.api.persistence.v1.VersionedTransition - 47, // 103: temporal.server.api.persistence.v1.ActivityInfo.first_scheduled_time:type_name -> google.protobuf.Timestamp - 47, // 104: temporal.server.api.persistence.v1.ActivityInfo.last_attempt_complete_time:type_name -> google.protobuf.Timestamp - 79, // 105: temporal.server.api.persistence.v1.ActivityInfo.last_started_deployment:type_name -> temporal.api.deployment.v1.Deployment - 65, // 106: temporal.server.api.persistence.v1.ActivityInfo.last_deployment_version:type_name -> temporal.api.deployment.v1.WorkerDeploymentVersion - 60, // 107: temporal.server.api.persistence.v1.ActivityInfo.priority:type_name -> temporal.api.common.v1.Priority - 40, // 108: temporal.server.api.persistence.v1.ActivityInfo.pause_info:type_name -> temporal.server.api.persistence.v1.ActivityInfo.PauseInfo - 53, // 109: temporal.server.api.persistence.v1.ActivityInfo.started_clock:type_name -> temporal.server.api.clock.v1.VectorClock - 47, // 110: temporal.server.api.persistence.v1.TimerInfo.expiry_time:type_name -> google.protobuf.Timestamp - 56, // 111: temporal.server.api.persistence.v1.TimerInfo.last_update_versioned_transition:type_name -> temporal.server.api.persistence.v1.VersionedTransition - 80, // 112: temporal.server.api.persistence.v1.ChildExecutionInfo.parent_close_policy:type_name -> temporal.api.enums.v1.ParentClosePolicy - 53, // 113: temporal.server.api.persistence.v1.ChildExecutionInfo.clock:type_name -> temporal.server.api.clock.v1.VectorClock - 56, // 114: temporal.server.api.persistence.v1.ChildExecutionInfo.last_update_versioned_transition:type_name -> temporal.server.api.persistence.v1.VersionedTransition - 60, // 115: temporal.server.api.persistence.v1.ChildExecutionInfo.priority:type_name -> temporal.api.common.v1.Priority - 56, // 116: temporal.server.api.persistence.v1.RequestCancelInfo.last_update_versioned_transition:type_name -> temporal.server.api.persistence.v1.VersionedTransition - 56, // 117: temporal.server.api.persistence.v1.SignalInfo.last_update_versioned_transition:type_name -> temporal.server.api.persistence.v1.VersionedTransition - 81, // 118: temporal.server.api.persistence.v1.Checksum.flavor:type_name -> temporal.server.api.enums.v1.ChecksumFlavor - 42, // 119: temporal.server.api.persistence.v1.Callback.nexus:type_name -> temporal.server.api.persistence.v1.Callback.Nexus - 43, // 120: temporal.server.api.persistence.v1.Callback.hsm:type_name -> temporal.server.api.persistence.v1.Callback.HSM - 82, // 121: temporal.server.api.persistence.v1.Callback.links:type_name -> temporal.api.common.v1.Link - 83, // 122: temporal.server.api.persistence.v1.HSMCompletionCallbackArg.last_event:type_name -> temporal.api.history.v1.HistoryEvent - 23, // 123: temporal.server.api.persistence.v1.CallbackInfo.callback:type_name -> temporal.server.api.persistence.v1.Callback - 46, // 124: temporal.server.api.persistence.v1.CallbackInfo.trigger:type_name -> temporal.server.api.persistence.v1.CallbackInfo.Trigger - 47, // 125: temporal.server.api.persistence.v1.CallbackInfo.registration_time:type_name -> google.protobuf.Timestamp - 84, // 126: temporal.server.api.persistence.v1.CallbackInfo.state:type_name -> temporal.server.api.enums.v1.CallbackState - 47, // 127: temporal.server.api.persistence.v1.CallbackInfo.last_attempt_complete_time:type_name -> google.protobuf.Timestamp - 76, // 128: temporal.server.api.persistence.v1.CallbackInfo.last_attempt_failure:type_name -> temporal.api.failure.v1.Failure - 47, // 129: temporal.server.api.persistence.v1.CallbackInfo.next_attempt_schedule_time:type_name -> google.protobuf.Timestamp - 48, // 130: temporal.server.api.persistence.v1.NexusOperationInfo.schedule_to_close_timeout:type_name -> google.protobuf.Duration - 47, // 131: temporal.server.api.persistence.v1.NexusOperationInfo.scheduled_time:type_name -> google.protobuf.Timestamp - 85, // 132: temporal.server.api.persistence.v1.NexusOperationInfo.state:type_name -> temporal.server.api.enums.v1.NexusOperationState - 47, // 133: temporal.server.api.persistence.v1.NexusOperationInfo.last_attempt_complete_time:type_name -> google.protobuf.Timestamp - 76, // 134: temporal.server.api.persistence.v1.NexusOperationInfo.last_attempt_failure:type_name -> temporal.api.failure.v1.Failure - 47, // 135: temporal.server.api.persistence.v1.NexusOperationInfo.next_attempt_schedule_time:type_name -> google.protobuf.Timestamp - 48, // 136: temporal.server.api.persistence.v1.NexusOperationInfo.schedule_to_start_timeout:type_name -> google.protobuf.Duration - 48, // 137: temporal.server.api.persistence.v1.NexusOperationInfo.start_to_close_timeout:type_name -> google.protobuf.Duration - 47, // 138: temporal.server.api.persistence.v1.NexusOperationInfo.started_time:type_name -> google.protobuf.Timestamp - 47, // 139: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.requested_time:type_name -> google.protobuf.Timestamp - 86, // 140: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.state:type_name -> temporal.api.enums.v1.NexusOperationCancellationState - 47, // 141: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.last_attempt_complete_time:type_name -> google.protobuf.Timestamp - 76, // 142: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.last_attempt_failure:type_name -> temporal.api.failure.v1.Failure - 47, // 143: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.next_attempt_schedule_time:type_name -> google.protobuf.Timestamp - 47, // 144: temporal.server.api.persistence.v1.WorkflowPauseInfo.pause_time:type_name -> google.protobuf.Timestamp - 87, // 145: temporal.server.api.persistence.v1.ShardInfo.QueueStatesEntry.value:type_name -> temporal.server.api.persistence.v1.QueueState - 88, // 146: temporal.server.api.persistence.v1.WorkflowExecutionInfo.SearchAttributesEntry.value:type_name -> temporal.api.common.v1.Payload - 88, // 147: temporal.server.api.persistence.v1.WorkflowExecutionInfo.MemoEntry.value:type_name -> temporal.api.common.v1.Payload - 89, // 148: temporal.server.api.persistence.v1.WorkflowExecutionInfo.UpdateInfosEntry.value:type_name -> temporal.server.api.persistence.v1.UpdateInfo - 90, // 149: temporal.server.api.persistence.v1.WorkflowExecutionInfo.SubStateMachinesByTypeEntry.value:type_name -> temporal.server.api.persistence.v1.StateMachineMap - 28, // 150: temporal.server.api.persistence.v1.WorkflowExecutionInfo.ChildrenInitializedPostResetPointEntry.value:type_name -> temporal.server.api.persistence.v1.ResetChildInfo - 7, // 151: temporal.server.api.persistence.v1.WorkflowExecutionState.RequestIdsEntry.value:type_name -> temporal.server.api.persistence.v1.RequestIDInfo - 47, // 152: temporal.server.api.persistence.v1.ActivityInfo.PauseInfo.pause_time:type_name -> google.protobuf.Timestamp - 41, // 153: temporal.server.api.persistence.v1.ActivityInfo.PauseInfo.manual:type_name -> temporal.server.api.persistence.v1.ActivityInfo.PauseInfo.Manual - 44, // 154: temporal.server.api.persistence.v1.Callback.Nexus.header:type_name -> temporal.server.api.persistence.v1.Callback.Nexus.HeaderEntry - 91, // 155: temporal.server.api.persistence.v1.Callback.HSM.ref:type_name -> temporal.server.api.persistence.v1.StateMachineRef - 45, // 156: temporal.server.api.persistence.v1.CallbackInfo.Trigger.workflow_closed:type_name -> temporal.server.api.persistence.v1.CallbackInfo.WorkflowClosed - 157, // [157:157] is the sub-list for method output_type - 157, // [157:157] is the sub-list for method input_type - 157, // [157:157] is the sub-list for extension type_name - 157, // [157:157] is the sub-list for extension extendee - 0, // [0:157] is the sub-list for field type_name + 56, // 51: temporal.server.api.persistence.v1.TimeSkippingInfo.last_update_versioned_transition:type_name -> temporal.server.api.persistence.v1.VersionedTransition + 47, // 52: temporal.server.api.persistence.v1.FastForwardInfo.target_time:type_name -> google.protobuf.Timestamp + 65, // 53: temporal.server.api.persistence.v1.LastNotifiedTargetVersion.deployment_version:type_name -> temporal.api.deployment.v1.WorkerDeploymentVersion + 66, // 54: temporal.server.api.persistence.v1.WorkflowExecutionState.state:type_name -> temporal.server.api.enums.v1.WorkflowExecutionState + 67, // 55: temporal.server.api.persistence.v1.WorkflowExecutionState.status:type_name -> temporal.api.enums.v1.WorkflowExecutionStatus + 56, // 56: temporal.server.api.persistence.v1.WorkflowExecutionState.last_update_versioned_transition:type_name -> temporal.server.api.persistence.v1.VersionedTransition + 47, // 57: temporal.server.api.persistence.v1.WorkflowExecutionState.start_time:type_name -> google.protobuf.Timestamp + 37, // 58: temporal.server.api.persistence.v1.WorkflowExecutionState.request_ids:type_name -> temporal.server.api.persistence.v1.WorkflowExecutionState.RequestIdsEntry + 68, // 59: temporal.server.api.persistence.v1.RequestIDInfo.event_type:type_name -> temporal.api.enums.v1.EventType + 69, // 60: temporal.server.api.persistence.v1.TransferTaskInfo.task_type:type_name -> temporal.server.api.enums.v1.TaskType + 47, // 61: temporal.server.api.persistence.v1.TransferTaskInfo.visibility_time:type_name -> google.protobuf.Timestamp + 38, // 62: temporal.server.api.persistence.v1.TransferTaskInfo.close_execution_task_details:type_name -> temporal.server.api.persistence.v1.TransferTaskInfo.CloseExecutionTaskDetails + 70, // 63: temporal.server.api.persistence.v1.TransferTaskInfo.chasm_task_info:type_name -> temporal.server.api.persistence.v1.ChasmTaskInfo + 69, // 64: temporal.server.api.persistence.v1.ReplicationTaskInfo.task_type:type_name -> temporal.server.api.enums.v1.TaskType + 47, // 65: temporal.server.api.persistence.v1.ReplicationTaskInfo.visibility_time:type_name -> google.protobuf.Timestamp + 71, // 66: temporal.server.api.persistence.v1.ReplicationTaskInfo.priority:type_name -> temporal.server.api.enums.v1.TaskPriority + 56, // 67: temporal.server.api.persistence.v1.ReplicationTaskInfo.versioned_transition:type_name -> temporal.server.api.persistence.v1.VersionedTransition + 9, // 68: temporal.server.api.persistence.v1.ReplicationTaskInfo.task_equivalents:type_name -> temporal.server.api.persistence.v1.ReplicationTaskInfo + 72, // 69: temporal.server.api.persistence.v1.ReplicationTaskInfo.last_version_history_item:type_name -> temporal.server.api.history.v1.VersionHistoryItem + 69, // 70: temporal.server.api.persistence.v1.VisibilityTaskInfo.task_type:type_name -> temporal.server.api.enums.v1.TaskType + 47, // 71: temporal.server.api.persistence.v1.VisibilityTaskInfo.visibility_time:type_name -> google.protobuf.Timestamp + 47, // 72: temporal.server.api.persistence.v1.VisibilityTaskInfo.close_time:type_name -> google.protobuf.Timestamp + 47, // 73: temporal.server.api.persistence.v1.VisibilityTaskInfo.start_time:type_name -> google.protobuf.Timestamp + 70, // 74: temporal.server.api.persistence.v1.VisibilityTaskInfo.chasm_task_info:type_name -> temporal.server.api.persistence.v1.ChasmTaskInfo + 69, // 75: temporal.server.api.persistence.v1.TimerTaskInfo.task_type:type_name -> temporal.server.api.enums.v1.TaskType + 62, // 76: temporal.server.api.persistence.v1.TimerTaskInfo.timeout_type:type_name -> temporal.api.enums.v1.TimeoutType + 73, // 77: temporal.server.api.persistence.v1.TimerTaskInfo.workflow_backoff_type:type_name -> temporal.server.api.enums.v1.WorkflowBackoffType + 47, // 78: temporal.server.api.persistence.v1.TimerTaskInfo.visibility_time:type_name -> google.protobuf.Timestamp + 70, // 79: temporal.server.api.persistence.v1.TimerTaskInfo.chasm_task_info:type_name -> temporal.server.api.persistence.v1.ChasmTaskInfo + 69, // 80: temporal.server.api.persistence.v1.ArchivalTaskInfo.task_type:type_name -> temporal.server.api.enums.v1.TaskType + 47, // 81: temporal.server.api.persistence.v1.ArchivalTaskInfo.visibility_time:type_name -> google.protobuf.Timestamp + 69, // 82: temporal.server.api.persistence.v1.OutboundTaskInfo.task_type:type_name -> temporal.server.api.enums.v1.TaskType + 47, // 83: temporal.server.api.persistence.v1.OutboundTaskInfo.visibility_time:type_name -> google.protobuf.Timestamp + 74, // 84: temporal.server.api.persistence.v1.OutboundTaskInfo.state_machine_info:type_name -> temporal.server.api.persistence.v1.StateMachineTaskInfo + 70, // 85: temporal.server.api.persistence.v1.OutboundTaskInfo.chasm_task_info:type_name -> temporal.server.api.persistence.v1.ChasmTaskInfo + 14, // 86: temporal.server.api.persistence.v1.OutboundTaskInfo.worker_commands_task:type_name -> temporal.server.api.persistence.v1.WorkerCommandsTask + 75, // 87: temporal.server.api.persistence.v1.WorkerCommandsTask.commands:type_name -> temporal.api.worker.v1.WorkerCommand + 47, // 88: temporal.server.api.persistence.v1.ActivityInfo.scheduled_time:type_name -> google.protobuf.Timestamp + 47, // 89: temporal.server.api.persistence.v1.ActivityInfo.started_time:type_name -> google.protobuf.Timestamp + 48, // 90: temporal.server.api.persistence.v1.ActivityInfo.schedule_to_start_timeout:type_name -> google.protobuf.Duration + 48, // 91: temporal.server.api.persistence.v1.ActivityInfo.schedule_to_close_timeout:type_name -> google.protobuf.Duration + 48, // 92: temporal.server.api.persistence.v1.ActivityInfo.start_to_close_timeout:type_name -> google.protobuf.Duration + 48, // 93: temporal.server.api.persistence.v1.ActivityInfo.heartbeat_timeout:type_name -> google.protobuf.Duration + 48, // 94: temporal.server.api.persistence.v1.ActivityInfo.retry_initial_interval:type_name -> google.protobuf.Duration + 48, // 95: temporal.server.api.persistence.v1.ActivityInfo.retry_maximum_interval:type_name -> google.protobuf.Duration + 47, // 96: temporal.server.api.persistence.v1.ActivityInfo.retry_expiration_time:type_name -> google.protobuf.Timestamp + 76, // 97: temporal.server.api.persistence.v1.ActivityInfo.retry_last_failure:type_name -> temporal.api.failure.v1.Failure + 77, // 98: temporal.server.api.persistence.v1.ActivityInfo.last_heartbeat_details:type_name -> temporal.api.common.v1.Payloads + 47, // 99: temporal.server.api.persistence.v1.ActivityInfo.last_heartbeat_update_time:type_name -> google.protobuf.Timestamp + 78, // 100: temporal.server.api.persistence.v1.ActivityInfo.activity_type:type_name -> temporal.api.common.v1.ActivityType + 39, // 101: temporal.server.api.persistence.v1.ActivityInfo.use_workflow_build_id_info:type_name -> temporal.server.api.persistence.v1.ActivityInfo.UseWorkflowBuildIdInfo + 55, // 102: temporal.server.api.persistence.v1.ActivityInfo.last_worker_version_stamp:type_name -> temporal.api.common.v1.WorkerVersionStamp + 56, // 103: temporal.server.api.persistence.v1.ActivityInfo.last_update_versioned_transition:type_name -> temporal.server.api.persistence.v1.VersionedTransition + 47, // 104: temporal.server.api.persistence.v1.ActivityInfo.first_scheduled_time:type_name -> google.protobuf.Timestamp + 47, // 105: temporal.server.api.persistence.v1.ActivityInfo.last_attempt_complete_time:type_name -> google.protobuf.Timestamp + 79, // 106: temporal.server.api.persistence.v1.ActivityInfo.last_started_deployment:type_name -> temporal.api.deployment.v1.Deployment + 65, // 107: temporal.server.api.persistence.v1.ActivityInfo.last_deployment_version:type_name -> temporal.api.deployment.v1.WorkerDeploymentVersion + 60, // 108: temporal.server.api.persistence.v1.ActivityInfo.priority:type_name -> temporal.api.common.v1.Priority + 40, // 109: temporal.server.api.persistence.v1.ActivityInfo.pause_info:type_name -> temporal.server.api.persistence.v1.ActivityInfo.PauseInfo + 53, // 110: temporal.server.api.persistence.v1.ActivityInfo.started_clock:type_name -> temporal.server.api.clock.v1.VectorClock + 47, // 111: temporal.server.api.persistence.v1.TimerInfo.expiry_time:type_name -> google.protobuf.Timestamp + 56, // 112: temporal.server.api.persistence.v1.TimerInfo.last_update_versioned_transition:type_name -> temporal.server.api.persistence.v1.VersionedTransition + 80, // 113: temporal.server.api.persistence.v1.ChildExecutionInfo.parent_close_policy:type_name -> temporal.api.enums.v1.ParentClosePolicy + 53, // 114: temporal.server.api.persistence.v1.ChildExecutionInfo.clock:type_name -> temporal.server.api.clock.v1.VectorClock + 56, // 115: temporal.server.api.persistence.v1.ChildExecutionInfo.last_update_versioned_transition:type_name -> temporal.server.api.persistence.v1.VersionedTransition + 60, // 116: temporal.server.api.persistence.v1.ChildExecutionInfo.priority:type_name -> temporal.api.common.v1.Priority + 56, // 117: temporal.server.api.persistence.v1.RequestCancelInfo.last_update_versioned_transition:type_name -> temporal.server.api.persistence.v1.VersionedTransition + 56, // 118: temporal.server.api.persistence.v1.SignalInfo.last_update_versioned_transition:type_name -> temporal.server.api.persistence.v1.VersionedTransition + 81, // 119: temporal.server.api.persistence.v1.Checksum.flavor:type_name -> temporal.server.api.enums.v1.ChecksumFlavor + 42, // 120: temporal.server.api.persistence.v1.Callback.nexus:type_name -> temporal.server.api.persistence.v1.Callback.Nexus + 43, // 121: temporal.server.api.persistence.v1.Callback.hsm:type_name -> temporal.server.api.persistence.v1.Callback.HSM + 82, // 122: temporal.server.api.persistence.v1.Callback.links:type_name -> temporal.api.common.v1.Link + 83, // 123: temporal.server.api.persistence.v1.HSMCompletionCallbackArg.last_event:type_name -> temporal.api.history.v1.HistoryEvent + 23, // 124: temporal.server.api.persistence.v1.CallbackInfo.callback:type_name -> temporal.server.api.persistence.v1.Callback + 46, // 125: temporal.server.api.persistence.v1.CallbackInfo.trigger:type_name -> temporal.server.api.persistence.v1.CallbackInfo.Trigger + 47, // 126: temporal.server.api.persistence.v1.CallbackInfo.registration_time:type_name -> google.protobuf.Timestamp + 84, // 127: temporal.server.api.persistence.v1.CallbackInfo.state:type_name -> temporal.server.api.enums.v1.CallbackState + 47, // 128: temporal.server.api.persistence.v1.CallbackInfo.last_attempt_complete_time:type_name -> google.protobuf.Timestamp + 76, // 129: temporal.server.api.persistence.v1.CallbackInfo.last_attempt_failure:type_name -> temporal.api.failure.v1.Failure + 47, // 130: temporal.server.api.persistence.v1.CallbackInfo.next_attempt_schedule_time:type_name -> google.protobuf.Timestamp + 48, // 131: temporal.server.api.persistence.v1.NexusOperationInfo.schedule_to_close_timeout:type_name -> google.protobuf.Duration + 47, // 132: temporal.server.api.persistence.v1.NexusOperationInfo.scheduled_time:type_name -> google.protobuf.Timestamp + 85, // 133: temporal.server.api.persistence.v1.NexusOperationInfo.state:type_name -> temporal.server.api.enums.v1.NexusOperationState + 47, // 134: temporal.server.api.persistence.v1.NexusOperationInfo.last_attempt_complete_time:type_name -> google.protobuf.Timestamp + 76, // 135: temporal.server.api.persistence.v1.NexusOperationInfo.last_attempt_failure:type_name -> temporal.api.failure.v1.Failure + 47, // 136: temporal.server.api.persistence.v1.NexusOperationInfo.next_attempt_schedule_time:type_name -> google.protobuf.Timestamp + 48, // 137: temporal.server.api.persistence.v1.NexusOperationInfo.schedule_to_start_timeout:type_name -> google.protobuf.Duration + 48, // 138: temporal.server.api.persistence.v1.NexusOperationInfo.start_to_close_timeout:type_name -> google.protobuf.Duration + 47, // 139: temporal.server.api.persistence.v1.NexusOperationInfo.started_time:type_name -> google.protobuf.Timestamp + 47, // 140: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.requested_time:type_name -> google.protobuf.Timestamp + 86, // 141: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.state:type_name -> temporal.api.enums.v1.NexusOperationCancellationState + 47, // 142: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.last_attempt_complete_time:type_name -> google.protobuf.Timestamp + 76, // 143: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.last_attempt_failure:type_name -> temporal.api.failure.v1.Failure + 47, // 144: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.next_attempt_schedule_time:type_name -> google.protobuf.Timestamp + 47, // 145: temporal.server.api.persistence.v1.WorkflowPauseInfo.pause_time:type_name -> google.protobuf.Timestamp + 87, // 146: temporal.server.api.persistence.v1.ShardInfo.QueueStatesEntry.value:type_name -> temporal.server.api.persistence.v1.QueueState + 88, // 147: temporal.server.api.persistence.v1.WorkflowExecutionInfo.SearchAttributesEntry.value:type_name -> temporal.api.common.v1.Payload + 88, // 148: temporal.server.api.persistence.v1.WorkflowExecutionInfo.MemoEntry.value:type_name -> temporal.api.common.v1.Payload + 89, // 149: temporal.server.api.persistence.v1.WorkflowExecutionInfo.UpdateInfosEntry.value:type_name -> temporal.server.api.persistence.v1.UpdateInfo + 90, // 150: temporal.server.api.persistence.v1.WorkflowExecutionInfo.SubStateMachinesByTypeEntry.value:type_name -> temporal.server.api.persistence.v1.StateMachineMap + 28, // 151: temporal.server.api.persistence.v1.WorkflowExecutionInfo.ChildrenInitializedPostResetPointEntry.value:type_name -> temporal.server.api.persistence.v1.ResetChildInfo + 7, // 152: temporal.server.api.persistence.v1.WorkflowExecutionState.RequestIdsEntry.value:type_name -> temporal.server.api.persistence.v1.RequestIDInfo + 47, // 153: temporal.server.api.persistence.v1.ActivityInfo.PauseInfo.pause_time:type_name -> google.protobuf.Timestamp + 41, // 154: temporal.server.api.persistence.v1.ActivityInfo.PauseInfo.manual:type_name -> temporal.server.api.persistence.v1.ActivityInfo.PauseInfo.Manual + 44, // 155: temporal.server.api.persistence.v1.Callback.Nexus.header:type_name -> temporal.server.api.persistence.v1.Callback.Nexus.HeaderEntry + 91, // 156: temporal.server.api.persistence.v1.Callback.HSM.ref:type_name -> temporal.server.api.persistence.v1.StateMachineRef + 45, // 157: temporal.server.api.persistence.v1.CallbackInfo.Trigger.workflow_closed:type_name -> temporal.server.api.persistence.v1.CallbackInfo.WorkflowClosed + 158, // [158:158] is the sub-list for method output_type + 158, // [158:158] is the sub-list for method input_type + 158, // [158:158] is the sub-list for extension type_name + 158, // [158:158] is the sub-list for extension extendee + 0, // [0:158] is the sub-list for field type_name } func init() { file_temporal_server_api_persistence_v1_executions_proto_init() } diff --git a/proto/internal/temporal/server/api/persistence/v1/executions.proto b/proto/internal/temporal/server/api/persistence/v1/executions.proto index 4cbe186daac..753331192e3 100644 --- a/proto/internal/temporal/server/api/persistence/v1/executions.proto +++ b/proto/internal/temporal/server/api/persistence/v1/executions.proto @@ -334,11 +334,12 @@ message TimeSkippingInfo { // The current fast-forward info for time skipping. FastForwardInfo fast_forward_info = 4; - // Task regeneration status is used to track the status of the task regeneration. - // 0: Not set - // 1: Need to regenerate tasks - // 2: All tasks regenerated - int32 task_regeneration_status = 5; + // Versioned transition at which this TimeSkippingInfo was last modified (i.e. when a + // skip transition changed accumulated_skipped_duration). Used by PartialRefresh to detect + // that pending timer tasks must be re-stamped against the new accumulated skip, since a + // skip mutates this workflow-level field without bumping any per-timer + // last_update_versioned_transition. Mirrors the per-entity stamps on TimerInfo/ActivityInfo. + VersionedTransition last_update_versioned_transition = 5; } message FastForwardInfo { diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index e6136dba5c8..51f9cae455d 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -4178,7 +4178,6 @@ func (ms *MutableStateImpl) ApplyWorkflowExecutionTimeSkippingTransitionedEvent( asd := tsi.GetAccumulatedSkippedDuration().AsDuration() asd += attr.TargetTime.AsTime().Sub(event.GetEventTime().AsTime()) tsi.AccumulatedSkippedDuration = durationpb.New(asd) - tsi.TaskRegenerationStatus = TimerRegenStatusNeeded } tsi.Config.Enabled = !attr.GetDisabledAfterFastForward() if attr.GetDisabledAfterFastForward() && tsi.GetFastForwardInfo() != nil { @@ -7920,6 +7919,13 @@ func (ms *MutableStateImpl) closeTransactionTrackLastUpdateVersionedTransition( ms.executionState.LastUpdateVersionedTransition = currentVersionedTransition } + // A time-skipping transition mutates only executionInfo.TimeSkippingInfo (a workflow-level + // field), not any per-timer entity, so stamp the change here. PartialRefresh uses this to + // know that all pending timer tasks must be re-stamped against the new accumulated skip. + if ms.timeSkippingInfoUpdated && ms.executionInfo.TimeSkippingInfo != nil { + ms.executionInfo.TimeSkippingInfo.LastUpdateVersionedTransition = currentVersionedTransition + } + // LastUpdateVersionTransition for HSM nodes already updated when transitioning the nodes. // LastUpdateVersionTransition for CHASM nodes already updated when closing the chasm tree transaction. } @@ -9463,7 +9469,6 @@ func (ms *MutableStateImpl) syncExecutionInfo(current *persistencespb.WorkflowEx ms.workflowTaskUpdated = false } } - ms.applyIncomingTimeSkippingInfo(current, incoming) doNotSync := func(v any) []any { info, ok := v.(*persistencespb.WorkflowExecutionInfo) @@ -9498,7 +9503,6 @@ func (ms *MutableStateImpl) syncExecutionInfo(current *persistencespb.WorkflowEx &info.StateMachineTimers, &info.TaskGenerationShardClockTimestamp, &info.UpdateInfos, - &info.TimeSkippingInfo, } if !isSnapshot { ignoreFields = append(ignoreFields, &info.SubStateMachineTombstoneBatches) @@ -9515,36 +9519,6 @@ func (ms *MutableStateImpl) syncExecutionInfo(current *persistencespb.WorkflowEx return nil } -// applyIncomingTimeSkippingInfo is used in state-based replication -// and merges the incoming TimeSkippingInfo (from a replication mutation/snapshot) into current, -// and the task regeneration status is maintained locally. -func (ms *MutableStateImpl) applyIncomingTimeSkippingInfo( - current *persistencespb.WorkflowExecutionInfo, - incoming *persistencespb.WorkflowExecutionInfo, -) { - if incoming.GetTimeSkippingInfo() == nil { - // State-based replication sends the full ExecutionInfo; nil incoming TSI - // means the active has no TSI (workflow never enabled time-skipping), so - // mirror that on the standby. - current.TimeSkippingInfo = nil - return - } - newTSI := common.CloneProto(incoming.TimeSkippingInfo) - currentSD := current.GetTimeSkippingInfo().GetAccumulatedSkippedDuration().AsDuration() - incomingSD := incoming.GetTimeSkippingInfo().GetAccumulatedSkippedDuration().AsDuration() - // maintain the local task regeneration status - if incomingSD != currentSD { - newTSI.TaskRegenerationStatus = TimerRegenStatusNeeded - } else { - if incomingSD == 0 { - newTSI.TaskRegenerationStatus = TimerRegenStatusUnset - } else { - newTSI.TaskRegenerationStatus = TimerRegenStatusCompleted - } - } - current.TimeSkippingInfo = newTSI -} - func (ms *MutableStateImpl) syncSubStateMachinesByType(incoming map[string]*persistencespb.StateMachineMap) error { // check if there is node been deleted currentHSM := ms.HSM() diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go index 6dd945c0dec..103ab1400ca 100644 --- a/service/history/workflow/mutable_state_impl_test.go +++ b/service/history/workflow/mutable_state_impl_test.go @@ -9054,125 +9054,3 @@ func (s *mutableStateSuite) TestAddContinueAsNewEvent_CompletionEventBatchID() { s.NoError(err) s.Equal(event.GetEventId(), s.mutableState.GetExecutionInfo().CompletionEventBatchId) } - -// TestApplyIncomingTimeSkippingInfo_RegenStatusFromSkipDelta verifies the merge -// contract: TaskRegenerationStatus is recomputed locally from the -// (currentSD, incomingSD) pair on every merge; the incoming wire value is -// ignored. SD changed -> Needed; SD unchanged at 0 -> Unset; SD unchanged > 0 -// -> Completed (a prior merge set Needed and the subsequent PartialRefresh -// has already regenerated). -func TestApplyIncomingTimeSkippingInfo_RegenStatusFromSkipDelta(t *testing.T) { - t.Parallel() - - ms := &MutableStateImpl{} - - for _, tc := range []struct { - name string - current *persistencespb.TimeSkippingInfo - incoming *persistencespb.TimeSkippingInfo - wantNil bool - wantRegenStatus int32 - }{ - { - name: "incoming nil clears current", - current: &persistencespb.TimeSkippingInfo{TaskRegenerationStatus: TimerRegenStatusCompleted}, - incoming: nil, - wantNil: true, - }, - { - name: "current nil, incoming has skip: Needed", - current: nil, - incoming: &persistencespb.TimeSkippingInfo{ - AccumulatedSkippedDuration: durationpb.New(time.Hour), - TaskRegenerationStatus: TimerRegenStatusCompleted, - }, - wantRegenStatus: TimerRegenStatusNeeded, - }, - { - name: "different skipped duration: Needed", - current: &persistencespb.TimeSkippingInfo{ - AccumulatedSkippedDuration: durationpb.New(time.Hour), - TaskRegenerationStatus: TimerRegenStatusCompleted, - }, - incoming: &persistencespb.TimeSkippingInfo{ - AccumulatedSkippedDuration: durationpb.New(2 * time.Hour), - TaskRegenerationStatus: TimerRegenStatusCompleted, - }, - wantRegenStatus: TimerRegenStatusNeeded, - }, - { - // Incoming wire status is Needed but result must be Completed, - // proving the wire value is ignored when SD is unchanged. - name: "same non-zero skip: Completed regardless of incoming wire status", - current: &persistencespb.TimeSkippingInfo{ - AccumulatedSkippedDuration: durationpb.New(time.Hour), - TaskRegenerationStatus: TimerRegenStatusCompleted, - }, - incoming: &persistencespb.TimeSkippingInfo{ - AccumulatedSkippedDuration: durationpb.New(time.Hour), - TaskRegenerationStatus: TimerRegenStatusNeeded, - }, - wantRegenStatus: TimerRegenStatusCompleted, - }, - { - // Incoming wire status is Completed but result must be Unset, - // proving the wire value is ignored when both sides are zero. - name: "both zero skip: Unset regardless of incoming wire status", - current: &persistencespb.TimeSkippingInfo{ - AccumulatedSkippedDuration: durationpb.New(0), - TaskRegenerationStatus: TimerRegenStatusCompleted, - }, - incoming: &persistencespb.TimeSkippingInfo{ - AccumulatedSkippedDuration: durationpb.New(0), - TaskRegenerationStatus: TimerRegenStatusCompleted, - }, - wantRegenStatus: TimerRegenStatusUnset, - }, - } { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - current := &persistencespb.WorkflowExecutionInfo{TimeSkippingInfo: tc.current} - incoming := &persistencespb.WorkflowExecutionInfo{TimeSkippingInfo: tc.incoming} - - ms.applyIncomingTimeSkippingInfo(current, incoming) - - if tc.wantNil { - require.Nil(t, current.TimeSkippingInfo) - return - } - require.NotNil(t, current.TimeSkippingInfo) - require.Equal(t, tc.wantRegenStatus, current.TimeSkippingInfo.GetTaskRegenerationStatus()) - }) - } -} - -// TestApplyIncomingTimeSkippingInfo_ClonesToBreakAliasing verifies the merge -// clones the incoming TimeSkippingInfo so subsequent local mutations -// (e.g. flipping TaskRegenerationStatus during PartialRefresh) cannot leak back -// into the wire message. -func TestApplyIncomingTimeSkippingInfo_ClonesToBreakAliasing(t *testing.T) { - t.Parallel() - - ms := &MutableStateImpl{} - - incoming := &persistencespb.WorkflowExecutionInfo{ - TimeSkippingInfo: &persistencespb.TimeSkippingInfo{ - AccumulatedSkippedDuration: durationpb.New(time.Hour), - TaskRegenerationStatus: TimerRegenStatusCompleted, - }, - } - current := &persistencespb.WorkflowExecutionInfo{} - - ms.applyIncomingTimeSkippingInfo(current, incoming) - - require.NotSame(t, current.TimeSkippingInfo, incoming.TimeSkippingInfo, - "current.TimeSkippingInfo must be a cloned copy, not aliased to incoming") - - // Mutate local; incoming must not change. - current.TimeSkippingInfo.TaskRegenerationStatus = TimerRegenStatusCompleted - require.Equal(t, int32(TimerRegenStatusCompleted), - incoming.TimeSkippingInfo.GetTaskRegenerationStatus(), - "sanity: incoming was already Completed; this assertion is a regression guard if the wire value were ever mutated") -} diff --git a/service/history/workflow/task_generator.go b/service/history/workflow/task_generator.go index 78ad31ae847..6be28584d79 100644 --- a/service/history/workflow/task_generator.go +++ b/service/history/workflow/task_generator.go @@ -1032,14 +1032,18 @@ func isPathAffectedByDelete(deletePath []hsm.Key, timerPath []*persistencespb.St return true } -// RegenerateTimerTasksForTimeSkipping regenerates the timer tasks for time skipping. -// Idempotent via TimeSkippingInfo.TaskRegenerationStatus: only emits when the status -// is Needed, then flips it to Completed. Safe to call in replication context — mirrors -// the TimerInfo.TaskStatus pattern in CreateNextUserTimer. +// RegenerateTimerTasksForTimeSkipping force re-stamps every pending timer task against the +// current accumulated skip. It is content-idempotent but produces fresh TaskIDs per call. +// +// It needs no per-task dedup status of its own. Callers gate it on whether a skip actually +// happened: the active close transaction only invokes it when a skip transition was emitted +// this transaction (regenerateTimerTasksForTimeSkipping), and PartialRefresh only invokes it +// when TimeSkippingInfo.LastUpdateVersionedTransition falls within the replicated delta (see +// refreshTasksForTimeSkipping). The accumulated-skip guard below is the final no-op shortcut. func (r *TaskGeneratorImpl) RegenerateTimerTasksForTimeSkipping() error { tsi := r.mutableState.GetExecutionInfo().GetTimeSkippingInfo() - if tsi.GetTaskRegenerationStatus() != TimerRegenStatusNeeded { + if tsi == nil || tsi.GetAccumulatedSkippedDuration().AsDuration() <= 0 { return nil } @@ -1147,6 +1151,5 @@ func (r *TaskGeneratorImpl) RegenerateTimerTasksForTimeSkipping() error { // todo@time-skipping: ChasmTaskPure is not supported yet. - tsi.TaskRegenerationStatus = TimerRegenStatusCompleted return nil } diff --git a/service/history/workflow/task_generator_test.go b/service/history/workflow/task_generator_test.go index f91cacabf99..b3bc76ff54f 100644 --- a/service/history/workflow/task_generator_test.go +++ b/service/history/workflow/task_generator_test.go @@ -1196,7 +1196,6 @@ func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping(t *testing.T) { mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ TimeSkippingInfo: &persistencespb.TimeSkippingInfo{ AccumulatedSkippedDuration: durationpb.New(skippedDuration), - TaskRegenerationStatus: TimerRegenStatusNeeded, }, }).AnyTimes() mutableState.EXPECT().GetPendingTimerInfos().Return(map[string]*persistencespb.TimerInfo{ @@ -1246,16 +1245,16 @@ func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping(t *testing.T) { require.Equal(t, timer2ExpiryTime, byEventID[2].VisibilityTimestamp) } -// TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_MarksStatusAndIsIdempotent -// asserts that a successful regen flips TaskRegenerationStatus from Needed to -// Completed, and that a second back-to-back call is a no-op (mirrors the -// TimerInfo.TaskStatus idempotency in CreateNextUserTimer). -func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_MarksStatusAndIsIdempotent(t *testing.T) { +// TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_ForceRegenerates asserts the +// contract: regen carries no per-task status of its own and force re-stamps on every call +// (callers gate it — the active boolean and PartialRefresh's LastUpdateVersionedTransition +// check). So a second back-to-back call emits the same tasks again, not a no-op. Content is +// identical; only the shard-assigned TaskID would differ. +func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_ForceRegenerates(t *testing.T) { t.Parallel() tsi := &persistencespb.TimeSkippingInfo{ AccumulatedSkippedDuration: durationpb.New(time.Hour), - TaskRegenerationStatus: TimerRegenStatusNeeded, } ctrl := gomock.NewController(t) @@ -1277,16 +1276,14 @@ func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_MarksStatusAndIsI taskGenerator := NewTaskGenerator(nil, mutableState, &configs.Config{}, nil, log.NewTestLogger()) - // First call: emits, flips TaskRegenerationStatus to Completed. + // First call emits. require.NoError(t, taskGenerator.RegenerateTimerTasksForTimeSkipping()) require.Positive(t, emitCount, "first call must emit at least one task") - require.Equal(t, int32(TimerRegenStatusCompleted), tsi.TaskRegenerationStatus, - "TaskRegenerationStatus must be Completed after first call") - // Second call: idempotent — no further emissions. + // Second call force-regenerates: emits again (no status latch suppresses it). emitsAfterFirst := emitCount require.NoError(t, taskGenerator.RegenerateTimerTasksForTimeSkipping()) - require.Equal(t, emitsAfterFirst, emitCount, "second call must be a no-op") + require.Equal(t, 2*emitsAfterFirst, emitCount, "second call must re-emit the same tasks") } func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_EdgeCases(t *testing.T) { @@ -1316,9 +1313,6 @@ func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_EdgeCases(t *test execInfo: &persistencespb.WorkflowExecutionInfo{ TimeSkippingInfo: &persistencespb.TimeSkippingInfo{ AccumulatedSkippedDuration: durationpb.New(time.Hour), - // Unmarked detail required so the regen guard passes; with no - // pending timers, the function still completes but emits nothing. - TaskRegenerationStatus: TimerRegenStatusNeeded, }, }, setupTimers: func(ms *historyi.MockMutableState) { @@ -1326,16 +1320,6 @@ func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_EdgeCases(t *test // AddTasks must not be called — no expectation set. }, }, - { - name: "TaskRegenerationStatus already Completed returns immediately (idempotency)", - execInfo: &persistencespb.WorkflowExecutionInfo{ - TimeSkippingInfo: &persistencespb.TimeSkippingInfo{ - AccumulatedSkippedDuration: durationpb.New(time.Hour), - TaskRegenerationStatus: TimerRegenStatusCompleted, - }, - }, - // GetPendingTimerInfos and AddTasks must not be called — no expectations set. - }, } { tc := tc t.Run(tc.name, func(t *testing.T) { @@ -1414,7 +1398,6 @@ func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_ExecutionTimers(t WorkflowRunExpirationTime: tc.runExpirationTime, TimeSkippingInfo: &persistencespb.TimeSkippingInfo{ AccumulatedSkippedDuration: durationpb.New(skippedDuration), - TaskRegenerationStatus: TimerRegenStatusNeeded, }, }).AnyTimes() mutableState.EXPECT().GetPendingTimerInfos().Return(map[string]*persistencespb.TimerInfo{ @@ -1514,7 +1497,6 @@ func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_FastForwardTimer( SourceEventId: 7, HasReached: false, }, - TaskRegenerationStatus: TimerRegenStatusNeeded, }, wantFastForwardTask: &wantTask{visibilityTimestamp: fastForwardTarget, eventID: 7}, }, @@ -1530,7 +1512,6 @@ func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_FastForwardTimer( SourceEventId: 7, HasReached: true, }, - TaskRegenerationStatus: TimerRegenStatusNeeded, }, }, { @@ -1545,7 +1526,6 @@ func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_FastForwardTimer( SourceEventId: 7, HasReached: false, }, - TaskRegenerationStatus: TimerRegenStatusNeeded, }, }, { @@ -1553,7 +1533,6 @@ func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_FastForwardTimer( tsi: &persistencespb.TimeSkippingInfo{ Config: &commonpb.TimeSkippingConfig{Enabled: true}, AccumulatedSkippedDuration: durationpb.New(time.Hour), - TaskRegenerationStatus: TimerRegenStatusNeeded, }, }, } { @@ -1695,7 +1674,6 @@ func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_BackoffTimer(t *t Attempt: tc.attempt, TimeSkippingInfo: &persistencespb.TimeSkippingInfo{ AccumulatedSkippedDuration: durationpb.New(time.Hour), - TaskRegenerationStatus: TimerRegenStatusNeeded, }, }).AnyTimes() mutableState.EXPECT().GetPendingTimerInfos().Return(map[string]*persistencespb.TimerInfo{}).AnyTimes() @@ -1752,7 +1730,6 @@ func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_ActivityRetry(t * mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ TimeSkippingInfo: &persistencespb.TimeSkippingInfo{ AccumulatedSkippedDuration: durationpb.New(time.Hour), - TaskRegenerationStatus: TimerRegenStatusNeeded, }, }).AnyTimes() mutableState.EXPECT().GetPendingTimerInfos().Return(map[string]*persistencespb.TimerInfo{}).AnyTimes() @@ -1821,7 +1798,6 @@ func TestTaskGeneratorImpl_RegenerateTimerTasksForTimeSkipping_BackoffTimer_Star Attempt: 1, TimeSkippingInfo: &persistencespb.TimeSkippingInfo{ AccumulatedSkippedDuration: durationpb.New(time.Hour), - TaskRegenerationStatus: TimerRegenStatusNeeded, }, }).AnyTimes() mutableState.EXPECT().GetPendingTimerInfos().Return(map[string]*persistencespb.TimerInfo{}).AnyTimes() diff --git a/service/history/workflow/task_refresher.go b/service/history/workflow/task_refresher.go index 6ff4c59996f..2c64d9f58d6 100644 --- a/service/history/workflow/task_refresher.go +++ b/service/history/workflow/task_refresher.go @@ -208,6 +208,7 @@ func (r *TaskRefresherImpl) PartialRefresh( return r.refreshTasksForTimeSkipping( mutableState, taskGenerator, + minVersionedTransition, ) } @@ -470,14 +471,34 @@ func (r *TaskRefresherImpl) refreshTasksForTimer( return err } +// refreshTasksForTimeSkipping re-stamps pending timer tasks against the current accumulated +// skip when a time-skipping transition happened within the replicated delta. A skip mutates +// only executionInfo.TimeSkippingInfo, never a per-timer entity, so refreshTasksForTimer's +// per-timer versioned-transition gate cannot see it. We gate instead on TimeSkippingInfo's own +// LastUpdateVersionedTransition: if it advanced at or after minVersionedTransition, the skip is +// new to this peer and every pending timer task must be regenerated with the shifted wall-clock +// VisibilityTimestamp. func (r *TaskRefresherImpl) refreshTasksForTimeSkipping( mutableState historyi.MutableState, taskGenerator TaskGenerator, + minVersionedTransition *persistencespb.VersionedTransition, ) error { executionState := mutableState.GetExecutionState() if executionState.Status != enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING { return nil } + + tsi := mutableState.GetExecutionInfo().GetTimeSkippingInfo() + if tsi == nil { + return nil + } + if transitionhistory.Compare( + tsi.GetLastUpdateVersionedTransition(), + minVersionedTransition, + ) < 0 { + return nil + } + return taskGenerator.RegenerateTimerTasksForTimeSkipping() } diff --git a/service/history/workflow/task_refresher_test.go b/service/history/workflow/task_refresher_test.go index 11bdc928c78..f3bab190f04 100644 --- a/service/history/workflow/task_refresher_test.go +++ b/service/history/workflow/task_refresher_test.go @@ -1434,6 +1434,86 @@ func (s *taskRefresherSuite) TestRefreshSubStateMachineTasks() { s.False(hsmRoot.Dirty()) } +// TestRefreshTasksForTimeSkipping gates the standby's time-skipping timer re-stamp on +// TimeSkippingInfo.LastUpdateVersionedTransition. It covers both invariants in one place: +// regen runs iff a skip happened at/after the watermark (don't miss), and is bounded — +// it does NOT run when the skip predates the delta or there's no TimeSkippingInfo (don't +// re-stamp on every unrelated replication delta). +func (s *taskRefresherSuite) TestRefreshTasksForTimeSkipping() { + tsiAt := func(tc int64) *persistencespb.TimeSkippingInfo { + return &persistencespb.TimeSkippingInfo{ + AccumulatedSkippedDuration: durationpb.New(time.Hour), + LastUpdateVersionedTransition: &persistencespb.VersionedTransition{ + NamespaceFailoverVersion: common.EmptyVersion, + TransitionCount: tc, + }, + } + } + + for _, tc := range []struct { + name string + tsi *persistencespb.TimeSkippingInfo + minVersionedTransition *persistencespb.VersionedTransition + wantRegen bool + }{ + { + // Skip at transition 5, watermark 3 → new to this peer → re-stamp. + name: "SkipWithinDelta/Regenerates", + tsi: tsiAt(5), + minVersionedTransition: &persistencespb.VersionedTransition{NamespaceFailoverVersion: common.EmptyVersion, TransitionCount: 3}, + wantRegen: true, + }, + { + // Skip at transition 2, watermark 4 → predates the delta → bounded, no re-stamp. + name: "SkipBeforeDelta/DoesNotRegenerate", + tsi: tsiAt(2), + minVersionedTransition: &persistencespb.VersionedTransition{NamespaceFailoverVersion: common.EmptyVersion, TransitionCount: 4}, + wantRegen: false, + }, + { + // Workflow never enabled time-skipping → never re-stamp. + name: "NoTimeSkippingInfo/DoesNotRegenerate", + tsi: nil, + minVersionedTransition: &persistencespb.VersionedTransition{NamespaceFailoverVersion: common.EmptyVersion, TransitionCount: 4}, + wantRegen: false, + }, + } { + s.Run(tc.name, func() { + mutableState, err := NewMutableStateFromDB( + s.mockShard, + s.mockShard.GetEventsCache(), + log.NewTestLogger(), + tests.LocalNamespaceEntry, + &persistencespb.WorkflowMutableState{ + ExecutionInfo: &persistencespb.WorkflowExecutionInfo{ + NamespaceId: tests.NamespaceID.String(), + WorkflowId: tests.WorkflowID, + TimeSkippingInfo: tc.tsi, + }, + ExecutionState: &persistencespb.WorkflowExecutionState{ + RunId: tests.RunID, + State: enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, + Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, + }, + NextEventId: int64(20), + }, + 10, + ) + s.NoError(err) + + // Times asserts the bound directly: 0 means the mock fails if regen is called. + times := 0 + if tc.wantRegen { + times = 1 + } + s.mockTaskGenerator.EXPECT().RegenerateTimerTasksForTimeSkipping().Return(nil).Times(times) + + err = s.taskRefresher.refreshTasksForTimeSkipping(mutableState, s.mockTaskGenerator, tc.minVersionedTransition) + s.NoError(err) + }) + } +} + type mockTaskGeneratorProvider struct { mockTaskGenerator *MockTaskGenerator } diff --git a/service/history/workflow/timer_sequence.go b/service/history/workflow/timer_sequence.go index eb5fad28de0..83c65ccefd4 100644 --- a/service/history/workflow/timer_sequence.go +++ b/service/history/workflow/timer_sequence.go @@ -31,12 +31,6 @@ const ( TimerTaskStatusCreatedHeartbeat ) -const ( - TimerRegenStatusUnset = iota - TimerRegenStatusNeeded - TimerRegenStatusCompleted -) - type ( // TimerSequenceID represent a in mem timer TimerSequenceID struct { diff --git a/tests/xdc/timeskipping_replication_test.go b/tests/xdc/timeskipping_replication_test.go index f2a99bcffa5..7223bb8baac 100644 --- a/tests/xdc/timeskipping_replication_test.go +++ b/tests/xdc/timeskipping_replication_test.go @@ -39,9 +39,10 @@ func (s *timeSkippingReplicationSuite) SetupSuite() { s.dynamicConfigOverrides = map[dynamicconfig.Key]any{ dynamicconfig.TimeSkippingEnabled.Key(): true, } - // Drive the state-based replication path so applyIncomingTimeSkippingInfo runs. - // Without this, events alone replicate TimeSkippingInfo via their handlers, and - // the state-based merge function added by this commit is never exercised. + // Drive the state-based replication path so TimeSkippingInfo replicates via the + // generic ExecutionInfo merge and PartialRefresh re-stamps timer tasks on the standby. + // Without this, events alone replicate TimeSkippingInfo via their handlers, and the + // state-based path is never exercised. s.enableTransitionHistory = true s.logger = log.NewTestLogger() s.setupSuite() @@ -83,8 +84,8 @@ func (s *timeSkippingReplicationSuite) getExecutionInfoFromCluster( } // waitForTimeSkippingInfoSynced blocks until the standby cluster's TimeSkippingInfo -// agrees with the active's on Config and AccumulatedSkippedDuration. TaskRegenerationStatus -// is cluster-local (regen is re-run on standby after replication) and is not asserted. +// agrees with the active's on Config and AccumulatedSkippedDuration. LastUpdateVersionedTransition +// replicates verbatim and drives the standby's PartialRefresh re-stamp; it is not asserted here. func (s *timeSkippingReplicationSuite) waitForTimeSkippingInfoSynced( ctx context.Context, nsID, wfID, runID string, @@ -149,8 +150,8 @@ func (s *timeSkippingReplicationSuite) completeFirstWorkflowTask(ns, wfID, tq st // TestBasicSkipReplicates verifies the core replication contract for time-skipping: // a skip transition applied on the active cluster's mutable state replicates to the -// standby with matching Config and AccumulatedSkippedDuration. TaskRegenerationStatus -// is cluster-local (the standby re-runs regen after replication) and is not asserted. +// standby with matching Config and AccumulatedSkippedDuration. LastUpdateVersionedTransition +// replicates verbatim and drives the standby's PartialRefresh re-stamp; it is not asserted. func (s *timeSkippingReplicationSuite) TestBasicSkipReplicates() { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) defer cancel() From 3b97426ebe5bee52ba1c230d984f4c390b36d6fa Mon Sep 17 00:00:00 2001 From: Feiyang Xie Date: Wed, 17 Jun 2026 10:29:23 -0700 Subject: [PATCH 3/3] cleanup code --- service/history/workflow/mutable_state_impl.go | 3 --- service/history/workflow/mutable_state_impl_test.go | 10 +++++++++- service/history/workflow/task_generator.go | 8 ++++---- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 51f9cae455d..d81bc9114d6 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -7919,9 +7919,6 @@ func (ms *MutableStateImpl) closeTransactionTrackLastUpdateVersionedTransition( ms.executionState.LastUpdateVersionedTransition = currentVersionedTransition } - // A time-skipping transition mutates only executionInfo.TimeSkippingInfo (a workflow-level - // field), not any per-timer entity, so stamp the change here. PartialRefresh uses this to - // know that all pending timer tasks must be re-stamped against the new accumulated skip. if ms.timeSkippingInfoUpdated && ms.executionInfo.TimeSkippingInfo != nil { ms.executionInfo.TimeSkippingInfo.LastUpdateVersionedTransition = currentVersionedTransition } diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go index 103ab1400ca..a517e53ea04 100644 --- a/service/history/workflow/mutable_state_impl_test.go +++ b/service/history/workflow/mutable_state_impl_test.go @@ -7260,7 +7260,15 @@ func (s *mutableStateSuite) TestSetSpeculativeWorkflowTaskTimeoutTask_SubtractsS // SetSpeculativeWorkflowTaskTimeoutTask, and ToRealTime all rely on this; if it panicked or // returned non-zero on a nil chain, every non-time-skipping workflow would break. func (s *mutableStateSuite) TestAccumulatedSkippedDuration_NilSafety() { - s.Run("TimeSkippingInfoNil", func() { + + s.Run("ExecutionInfoNilSafe", func() { + s.mutableState.executionInfo = nil + var got time.Duration + s.NotPanics(func() { got = s.mutableState.accumulatedSkippedDuration() }) + s.Equal(time.Duration(0), got) + }) + + s.Run("TimeSkippingInfoNilSafe", func() { s.mutableState.executionInfo.TimeSkippingInfo = nil var got time.Duration s.NotPanics(func() { got = s.mutableState.accumulatedSkippedDuration() }) diff --git a/service/history/workflow/task_generator.go b/service/history/workflow/task_generator.go index 6be28584d79..a2a57c5a25b 100644 --- a/service/history/workflow/task_generator.go +++ b/service/history/workflow/task_generator.go @@ -1033,17 +1033,16 @@ func isPathAffectedByDelete(deletePath []hsm.Key, timerPath []*persistencespb.St } // RegenerateTimerTasksForTimeSkipping force re-stamps every pending timer task against the -// current accumulated skip. It is content-idempotent but produces fresh TaskIDs per call. +// current accumulated skip. // // It needs no per-task dedup status of its own. Callers gate it on whether a skip actually // happened: the active close transaction only invokes it when a skip transition was emitted // this transaction (regenerateTimerTasksForTimeSkipping), and PartialRefresh only invokes it // when TimeSkippingInfo.LastUpdateVersionedTransition falls within the replicated delta (see -// refreshTasksForTimeSkipping). The accumulated-skip guard below is the final no-op shortcut. +// refreshTasksForTimeSkipping). func (r *TaskGeneratorImpl) RegenerateTimerTasksForTimeSkipping() error { - tsi := r.mutableState.GetExecutionInfo().GetTimeSkippingInfo() - if tsi == nil || tsi.GetAccumulatedSkippedDuration().AsDuration() <= 0 { + if accumulatedSkippedDuration(r.mutableState.GetExecutionInfo()) <= 0 { return nil } @@ -1094,6 +1093,7 @@ func (r *TaskGeneratorImpl) RegenerateTimerTasksForTimeSkipping() error { // (3) fast-forward timer — regenerate when configured so its real-time // VisibilityTimestamp tracks the new accumulated skip. + tsi := r.mutableState.GetExecutionInfo().GetTimeSkippingInfo() if tsi.GetConfig().GetEnabled() { fastForward := tsi.GetFastForwardInfo() if fastForward != nil && !fastForward.GetHasReached() {