246 changes: 131 additions & 115 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

@@ -328,10 +328,18 @@ message TimeSkippingInfo {
temporal.api.common.v1.TimeSkippingConfig config = 1;

// Total skipped duration for the current workflow execution run, including any
// inherited skipped duration carried over from a preceding execution that started this run.
google.protobuf.Duration accumulated_skipped_duration = 2;

// The current fast-forward info for time skipping.
FastForwardInfo fast_forward_info = 4;

// Versioned transition at which this TimeSkippingInfo was last modified (i.e. when a
// skip transition changed accumulated_skipped_duration). Used by PartialRefresh to detect
// that pending timer tasks must be re-stamped against the new accumulated skip, since a
// skip mutates this workflow-level field without bumping any per-timer
// last_update_versioned_transition. Mirrors the per-entity stamps on TimerInfo/ActivityInfo.
VersionedTransition last_update_versioned_transition = 5;
}
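
The comment on `last_update_versioned_transition` describes a staleness check. The following is a minimal Go sketch of that kind of check, not the actual PartialRefresh code. The `versionedTransition` struct, its field names, and `needsTimerRestamp` are hypothetical stand-ins, and the ordering rule (failover version first, then transition count) is an assumption.

```go
package main

import "fmt"

// versionedTransition is a hypothetical stand-in for the persistence
// VersionedTransition message; the field names are assumptions.
type versionedTransition struct {
	failoverVersion int64
	transitionCount int64
}

// compare orders two transitions by failover version, then by transition
// count. It returns -1, 0, or 1.
func compare(a, b versionedTransition) int {
	if a.failoverVersion != b.failoverVersion {
		if a.failoverVersion < b.failoverVersion {
			return -1
		}
		return 1
	}
	if a.transitionCount != b.transitionCount {
		if a.transitionCount < b.transitionCount {
			return -1
		}
		return 1
	}
	return 0
}

// needsTimerRestamp reports whether the time-skipping info was last modified
// at or after the first transition covered by a replicated delta. A nil stamp
// means the info was never modified, so nothing needs re-stamping.
func needsTimerRestamp(lastUpdate *versionedTransition, deltaStart versionedTransition) bool {
	if lastUpdate == nil {
		return false
	}
	return compare(*lastUpdate, deltaStart) >= 0
}

func main() {
	deltaStart := versionedTransition{failoverVersion: 1, transitionCount: 10}
	stale := versionedTransition{failoverVersion: 1, transitionCount: 7}
	fresh := versionedTransition{failoverVersion: 1, transitionCount: 12}

	fmt.Println(needsTimerRestamp(&stale, deltaStart)) // false
	fmt.Println(needsTimerRestamp(&fresh, deltaStart)) // true
}
```

Keeping the stamp on the workflow-level message lets a single comparison cover every pending timer, instead of touching each per-timer stamp on every skip.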

message FastForwardInfo {
60 changes: 55 additions & 5 deletions service/history/timer_queue_standby_task_executor.go
@@ -110,9 +110,7 @@ func (t *timerQueueStandbyTaskExecutor) Execute(
case *tasks.ChasmTask:
err = t.executeChasmSideEffectTimerTask(ctx, task)
case *tasks.TimeSkippingTimerTask:
// todo@time-skipping: replication. The disable-after-fast-forward transition is emitted
// on the active side and will replicate; standby drops the local task.
err = nil
err = t.executeTimeSkippingTimerTask(ctx, task)
default:
err = queueserrors.NewUnprocessableTaskError("unknown task type")
}
@@ -231,17 +229,53 @@ func (t *timerQueueStandbyTaskExecutor) discardChasmTask(
)
}

// executeTimeSkippingTimerTask waits on the standby until the active cluster
// replicates the fast-forward transition. If the fast-forward this task was
// generated for is still pending (same source event and not yet reached), the
// task is retried until the discard delay elapses; otherwise it is acked.
func (t *timerQueueStandbyTaskExecutor) executeTimeSkippingTimerTask(
ctx context.Context,
timerTask *tasks.TimeSkippingTimerTask,
) error {
actionFn := func(_ context.Context, wfContext historyi.WorkflowContext, mutableState historyi.MutableState, _ historyi.ReleaseWorkflowContextFunc) (any, error) {
if !mutableState.IsWorkflowExecutionRunning() {
return nil, nil
}
tsi := mutableState.GetExecutionInfo().GetTimeSkippingInfo()
ffi := tsi.GetFastForwardInfo()

@feiyang3cat (Contributor Author), May 17, 2026

Question for CGS team:

I am not sure whether, in complicated failover cases (A fails over to a slow B, and then A comes back up as standby), there may be legacy timer tasks whose event IDs are larger than that of the current mutable state. Even if this happens, loadMutableStateForTimerTask filters these tasks out, so they never reach executeTimeSkippingTimerTask:

	// Validation based on eventID is not good enough as certain operation does not generate events.
	// For example, scheduling transient workflow task, or starting activities that have retry policy.
	//
	// Some tasks don't have an associated eventID (CHASM tasks).
	eventID, eidOk := getEventID(task)
	if !eidOk || eventID < mutableState.GetNextEventID() {
		return mutableState, nil
	}

The time-skipping timer task logic was already added to getEventID in previous PRs.

Contributor reply:

Can the source event ID in the time-skipping info be greater than the timer task's event ID? If so, does that mean the timer task is invalid?

@feiyang3cat (Contributor Author), May 19, 2026

  1. Yes, it can. If the user sets a new duration limit later, the source event ID may differ, and the old timer should be acked silently, so I return nil, nil in this case. Does this make sense? An alternative is to look up the overwritten source event IDs in the events, or to keep them in a list in the mutable state, but I was not sure what the benefit would be, so I used the current simple logic.

  2. Under the current replication mechanism, could the following happen in an extreme edge case?

  • Active A (say next event ID = 10), passive B (caught up only to 5).
  • Failover: B becomes active (next event ID = 5) and A becomes passive, so A uses next event ID = 5 again.
  • Failover back: A becomes active again and proceeds to event ID = 10 with a new timer. An older timer, created when A was first active, then fires and matches "event ID 10", but on a totally different history branch.

// the fast-forward this timer task is associated with is still valid and has not been reached so keep waiting
if ffi != nil && ffi.GetSourceEventId() == timerTask.EventID && !ffi.GetHasReached() {
return &struct{}{}, nil
}
return nil, nil
}

return t.processTimer(
ctx,
timerTask,
actionFn,
getStandbyPostActionFn(
timerTask,
t.getCurrentTime,
t.config.StandbyTaskMissingEventsDiscardDelay(timerTask.GetType()),
t.checkExecutionStillExistsOnSourceBeforeDiscard,
),
)
}
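
For reviewers, the decision made inside actionFn can be read as a small pure function. The sketch below is a simplification under stated assumptions: `fastForwardInfo`, `decision`, and `decide` are hypothetical names, and the real code signals "keep waiting" by returning a non-nil post-action value rather than an enum.

```go
package main

import "fmt"

// fastForwardInfo is a hypothetical stand-in for the persisted FastForwardInfo.
type fastForwardInfo struct {
	sourceEventID int64
	hasReached    bool
}

type decision string

const (
	decisionAck  decision = "ack"  // nothing left to wait for, drop the task
	decisionWait decision = "wait" // fast-forward not replicated yet, retry later
)

// decide mirrors the three checks in the action function: a closed workflow
// is acked, a still-pending fast-forward for the same source event waits, and
// everything else (no info, overwritten source event, already reached) is acked.
func decide(running bool, ffi *fastForwardInfo, taskEventID int64) decision {
	if !running {
		return decisionAck
	}
	if ffi != nil && ffi.sourceEventID == taskEventID && !ffi.hasReached {
		return decisionWait
	}
	return decisionAck
}

func main() {
	pending := &fastForwardInfo{sourceEventID: 1}
	reached := &fastForwardInfo{sourceEventID: 1, hasReached: true}

	fmt.Println(decide(true, pending, 1))  // wait
	fmt.Println(decide(true, reached, 1))  // ack
	fmt.Println(decide(true, pending, 7))  // ack
	fmt.Println(decide(false, pending, 1)) // ack
}
```

The third call corresponds to the case raised in the thread above: a task whose event ID no longer matches the stored source event ID is acked silently.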

func (t *timerQueueStandbyTaskExecutor) executeUserTimerTimeoutTask(
ctx context.Context,
timerTask *tasks.UserTimerTask,
) error {
referenceTime := t.Now()
actionFn := func(_ context.Context, wfContext historyi.WorkflowContext, mutableState historyi.MutableState, _ historyi.ReleaseWorkflowContextFunc) (any, error) {
if !mutableState.IsWorkflowExecutionRunning() {
// workflow already finished, no need to process the timer
return nil, nil
}

referenceTime := mutableState.Now()

timerSequence := t.getTimerSequence(mutableState)
timerSequenceIDs := timerSequence.LoadAndSortUserTimers()
if len(timerSequenceIDs) > 0 {
@@ -253,6 +287,10 @@ func (t *timerQueueStandbyTaskExecutor) executeUserTimerTimeoutTask(
return nil, serviceerror.NewInternal(errString)
}

// Use mutableState.Now() as reference time as a mutable state may use virtual time
// which can skip duration and be before the wall clock time.
// And when this happens the timerSequenceID.Timestamp is also virtual time and before the wall clock time,
// while the timerTask.VisibilityTimestamp uses the wall clock time that maps to the virtual time.
if queues.IsTimeExpired(
timerTask,
referenceTime,
@@ -295,13 +333,14 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask(
//
// the overall solution is to attempt to generate a new activity timer task whenever the
// task passed in is safe to be thrown away.
referenceTime := t.Now()
actionFn := func(ctx context.Context, wfContext historyi.WorkflowContext, mutableState historyi.MutableState, _ historyi.ReleaseWorkflowContextFunc) (any, error) {
if !mutableState.IsWorkflowExecutionRunning() {
// workflow already finished, no need to process the timer
return nil, nil
}

referenceTime := mutableState.Now()

timerSequence := t.getTimerSequence(mutableState)
updateMutableState := false
timerSequenceIDs := timerSequence.LoadAndSortActivityTimers()
@@ -314,6 +353,10 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask(
return nil, serviceerror.NewInternal(errString)
}

// Use mutableState.Now() as reference time as a mutable state may use virtual time
// which can skip duration and be before the wall clock time.
// And when this happens the timerSequenceID.Timestamp is also virtual time and before the wall clock time,
// while the timerTask.VisibilityTimestamp uses the wall clock time that maps to the virtual time.
if queues.IsTimeExpired(
timerTask,
referenceTime,
@@ -336,6 +379,7 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask(
// created.
isHeartBeatTask := timerTask.TimeoutType == enumspb.TIMEOUT_TYPE_HEARTBEAT
ai, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(timerTask.EventID)

if isHeartBeatTask && ok && queues.IsTimeExpired(timerTask, timerTask.GetVisibilityTime(), mutableState.ToRealTime(heartbeatTimeoutVis)) {
if err := mutableState.UpdateActivityTaskStatusWithTimerHeartbeat(ai.ScheduledEventId, ai.TimerTaskStatus&^workflow.TimerTaskStatusCreatedHeartbeat, nil); err != nil {
return nil, err
@@ -774,6 +818,12 @@ func (t *timerQueueStandbyTaskExecutor) pushActivity(
)
}

// getCurrentTime returns the shard's wall-clock view of "now" for t.clusterName.
// Must stay wall-clock: it gates standby task-retry timing against VisibilityTime
// (also wall-clock); mutableState.Now() is virtual time and would force-discard
// time-skipping workflows. actionFn closures compare against virtual timestamps,
// so they use mutableState.Now() instead.
//
// TODO: deprecate this function and always use t.Now().
// Only test code sets t.clusterName to a non-current cluster name
// and advances the time by calling shardContext.SetCurrentTime.
103 changes: 103 additions & 0 deletions service/history/timer_queue_standby_task_executor_test.go
@@ -2459,6 +2459,109 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestExecuteChasmPureTimerTask_Valid
s.ErrorIs(expectedErr, resp.ExecutionErr)
}

// makeTimeSkippingMS builds a running mutable state, snapshots it to a persistence proto,
// and returns the persistence proto plus the workflow key. The caller can mutate the returned
// ExecutionInfo (e.g. set TimeSkippingInfo) before programming GetWorkflowExecution.
func (s *timerQueueStandbyTaskExecutorSuite) makeTimeSkippingMS() (*persistencespb.WorkflowMutableState, definition.WorkflowKey) {
execution := &commonpb.WorkflowExecution{
WorkflowId: "ts-bound-wf-" + uuid.NewString(),
RunId: uuid.NewString(),
}
workflowKey := definition.NewWorkflowKey(s.namespaceID.String(), execution.GetWorkflowId(), execution.GetRunId())

mutableState := workflow.TestGlobalMutableState(
s.mockShard, s.mockShard.GetEventsCache(), s.logger, s.version, execution.GetWorkflowId(), execution.GetRunId())
event, err := mutableState.AddWorkflowExecutionStartedEvent(
execution,
&historyservice.StartWorkflowExecutionRequest{
Attempt: 1,
NamespaceId: s.namespaceID.String(),
StartRequest: &workflowservice.StartWorkflowExecutionRequest{
WorkflowType: &commonpb.WorkflowType{Name: "test-wf-type"},
TaskQueue: &taskqueuepb.TaskQueue{Name: "test-tq"},
WorkflowRunTimeout: durationpb.New(200 * time.Second),
WorkflowTaskTimeout: durationpb.New(1 * time.Second),
},
},
)
s.NoError(err)

pms := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion())
return pms, workflowKey
}

// makeTimeSkippingPendingMS builds an MS that puts the standby's action function on
// the "still waiting" path: fast-forward matches the task's source event and HasReached=false.
func (s *timerQueueStandbyTaskExecutorSuite) makeTimeSkippingPendingMS() (*persistencespb.WorkflowMutableState, definition.WorkflowKey) {
pms, workflowKey := s.makeTimeSkippingMS()
pms.ExecutionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{
Config: &commonpb.TimeSkippingConfig{
Enabled: true,
FastForward: durationpb.New(time.Hour),
},
FastForwardInfo: &persistencespb.FastForwardInfo{
TargetTime: timestamppb.New(s.now.Add(time.Hour)),
SourceEventId: 1,
},
}
return pms, workflowKey
}

func (s *timerQueueStandbyTaskExecutorSuite) TestExecuteTimeSkippingTimerTask_Wait() {
pms, workflowKey := s.makeTimeSkippingPendingMS()

timerTask := &tasks.TimeSkippingTimerTask{
WorkflowKey: workflowKey,
TaskID: s.mustGenerateTaskID(),
VisibilityTimestamp: s.now,
EventID: 1,
}
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).
Return(&persistence.GetWorkflowExecutionResponse{State: pms}, nil)

s.mockShard.SetCurrentTime(s.clusterName, s.now)
resp := s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask))
s.Equal(consts.ErrTaskRetry, resp.ExecutionErr)
}

func (s *timerQueueStandbyTaskExecutorSuite) TestExecuteTimeSkippingTimerTask_Ack() {
// HasReached=true: active side already replicated the disable transition,
// so the standby's action function returns nil and the task is acked.
pms, workflowKey := s.makeTimeSkippingPendingMS()
pms.ExecutionInfo.TimeSkippingInfo.FastForwardInfo.HasReached = true

timerTask := &tasks.TimeSkippingTimerTask{
WorkflowKey: workflowKey,
TaskID: s.mustGenerateTaskID(),
VisibilityTimestamp: s.now.Add(time.Hour),
EventID: 1,
}
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).
Return(&persistence.GetWorkflowExecutionResponse{State: pms}, nil)

s.mockShard.SetCurrentTime(s.clusterName, s.now)
resp := s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask))
s.NoError(resp.ExecutionErr)
}

func (s *timerQueueStandbyTaskExecutorSuite) TestExecuteTimeSkippingTimerTask_Discard() {
pms, workflowKey := s.makeTimeSkippingPendingMS()

timerTask := &tasks.TimeSkippingTimerTask{
WorkflowKey: workflowKey,
TaskID: s.mustGenerateTaskID(),
VisibilityTimestamp: s.now,
EventID: 1,
}
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).
Return(&persistence.GetWorkflowExecutionResponse{State: pms}, nil)

// Past VisibilityTime + discardDelay: ErrTaskDiscarded.
s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(s.discardDuration))
resp := s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask))
s.Equal(consts.ErrTaskDiscarded, resp.ExecutionErr)
}

func (s *timerQueueStandbyTaskExecutorSuite) createPersistenceMutableState(
ms historyi.MutableState,
lastEventID int64,
31 changes: 16 additions & 15 deletions service/history/workflow/mutable_state_impl.go
@@ -4163,32 +4163,26 @@ func (ms *MutableStateImpl) ApplyWorkflowExecutionTimeSkippingTransitionedEvent(

attr := event.GetWorkflowExecutionTimeSkippingTransitionedEventAttributes()
tsi := ms.executionInfo.GetTimeSkippingInfo()
opTag := tag.WorkflowActionWorkflowExecutionTimeSkippingTransitioned

if tsi == nil {
return serviceerror.NewInternal(
"TimeSkippingInfo is not set when applying WorkflowExecutionTimeSkippingTransitionedEvent, mutable state is corrupted",
)
ms.logError("TimeSkippingTransitionedEvent failed to apply: TimeSkippingInfo is nil", opTag)
return serviceerror.NewInternal("TimeSkippingTransitionedEvent failed to apply")
}
if attr.TargetTime == nil && !attr.GetDisabledAfterFastForward() {
return serviceerror.NewInternal(
"empty WorkflowExecutionTimeSkippingTransitionedEvent found, event is corrupted",
)
if timeNotSet(attr.TargetTime) && !attr.GetDisabledAfterFastForward() {
ms.logError("TimeSkippingTransitionedEvent failed to apply: no TargetTime and not disabled after fast forward", opTag)
return serviceerror.NewInternal("TimeSkippingTransitionedEvent failed to apply")
}

if tsi.GetAccumulatedSkippedDuration() == nil {
tsi.AccumulatedSkippedDuration = durationpb.New(0)
}
accumulatedSkippedDuration := tsi.GetAccumulatedSkippedDuration().AsDuration()
if !timeNotSet(attr.TargetTime) {
accumulatedSkippedDuration += attr.TargetTime.AsTime().Sub(event.GetEventTime().AsTime())
asd := tsi.GetAccumulatedSkippedDuration().AsDuration()
asd += attr.TargetTime.AsTime().Sub(event.GetEventTime().AsTime())
tsi.AccumulatedSkippedDuration = durationpb.New(asd)
}
tsi.AccumulatedSkippedDuration = durationpb.New(accumulatedSkippedDuration)
tsi.Config.Enabled = !attr.GetDisabledAfterFastForward()

if attr.GetDisabledAfterFastForward() && tsi.GetFastForwardInfo() != nil {
tsi.FastForwardInfo.HasReached = true
}

ms.timeSkippingInfoUpdated = true
return nil
}
@@ -7925,6 +7919,13 @@ func (ms *MutableStateImpl) closeTransactionTrackLastUpdateVersionedTransition(
ms.executionState.LastUpdateVersionedTransition = currentVersionedTransition
}

// A time-skipping transition mutates only executionInfo.TimeSkippingInfo (a workflow-level
// field), not any per-timer entity, so stamp the change here. PartialRefresh uses this to
// know that all pending timer tasks must be re-stamped against the new accumulated skip.
if ms.timeSkippingInfoUpdated && ms.executionInfo.TimeSkippingInfo != nil {
ms.executionInfo.TimeSkippingInfo.LastUpdateVersionedTransition = currentVersionedTransition
}

// LastUpdateVersionTransition for HSM nodes already updated when transitioning the nodes.
// LastUpdateVersionTransition for CHASM nodes already updated when closing the chasm tree transaction.
}
21 changes: 11 additions & 10 deletions service/history/workflow/task_generator.go
@@ -1032,17 +1032,18 @@ func isPathAffectedByDelete(deletePath []hsm.Key, timerPath []*persistencespb.St
return true
}

// RegenerateTimerTasksForTimeSkipping regenerates the timer tasks for time skipping.
// This function is not idempotent, but when called twice, logically the timerTasks regenerated will have the same contents,
// and the only difference is the TaskID.
// TODO@time-skipping: currently not safe to call in replication context
// RegenerateTimerTasksForTimeSkipping force re-stamps every pending timer task against the
// current accumulated skip. It is content-idempotent but produces fresh TaskIDs per call.
//
// It needs no per-task dedup status of its own. Callers gate it on whether a skip actually
// happened: the active close transaction only invokes it when a skip transition was emitted
// this transaction (regenerateTimerTasksForTimeSkipping), and PartialRefresh only invokes it
// when TimeSkippingInfo.LastUpdateVersionedTransition falls within the replicated delta (see
// refreshTasksForTimeSkipping). The accumulated-skip guard below is the final no-op shortcut.
func (r *TaskGeneratorImpl) RegenerateTimerTasksForTimeSkipping() error {

if r.mutableState.GetExecutionInfo().TimeSkippingInfo == nil {
return nil
}
accumulatedSkippedDuration := r.mutableState.GetExecutionInfo().TimeSkippingInfo.AccumulatedSkippedDuration.AsDuration()
if accumulatedSkippedDuration <= 0 {
tsi := r.mutableState.GetExecutionInfo().GetTimeSkippingInfo()
if tsi == nil || tsi.GetAccumulatedSkippedDuration().AsDuration() <= 0 {
return nil
}

@@ -1093,7 +1094,6 @@ func (r *TaskGeneratorImpl) RegenerateTimerTasksForTimeSkipping() error {

// (3) fast-forward timer — regenerate when configured so its real-time
// VisibilityTimestamp tracks the new accumulated skip.
tsi := r.mutableState.GetExecutionInfo().GetTimeSkippingInfo()
if tsi.GetConfig().GetEnabled() {
fastForward := tsi.GetFastForwardInfo()
if fastForward != nil && !fastForward.GetHasReached() {
@@ -1150,5 +1150,6 @@ func (r *TaskGeneratorImpl) RegenerateTimerTasksForTimeSkipping() error {
}

// todo@time-skipping: ChasmTaskPure is not supported yet.

return nil
}