From 42a78f70d10cc345f4a6138c7373a995c23dc0b1 Mon Sep 17 00:00:00 2001 From: David Porter Date: Tue, 2 Jun 2026 14:57:15 -0700 Subject: [PATCH 1/5] migration script --- tools/tdbg/commands.go | 313 ++++++++++++++++++++++++-- tools/tdbg/flags.go | 6 + tools/tdbg/schedule_migrate_test.go | 329 ++++++++++++++++++++++++++++ tools/tdbg/tdbg_commands.go | 28 ++- 4 files changed, 658 insertions(+), 18 deletions(-) create mode 100644 tools/tdbg/schedule_migrate_test.go diff --git a/tools/tdbg/commands.go b/tools/tdbg/commands.go index a72c96ce91..63f1ffc93f 100644 --- a/tools/tdbg/commands.go +++ b/tools/tdbg/commands.go @@ -1,10 +1,14 @@ package tdbg import ( + "bufio" + "context" + "encoding/json" "errors" "fmt" "os" "strings" + "sync" "time" "github.com/fatih/color" @@ -872,45 +876,326 @@ func AdminReplicateWorkflow( return nil } -// AdminMigrateSchedule migrates a schedule between V1 (workflow-backed) and V2 (CHASM). +// AdminMigrateSchedule migrates schedules between V1 (workflow-backed) and V2 (CHASM). +// +// It supports three mutually-exclusive selection modes, all sharing the required --target flag: +// - single: --schedule-id (performs immediately, as before) +// - from visibility: --from-visibility [--query ] (defaults to all running V2 schedules in --namespace) +// - stdin: JSON lines piped on stdin, one {"namespace":..., "schedule_id":...} per line +// +// The from-visibility and stdin modes default to a dry-run; pass --execute to perform the migration. func AdminMigrateSchedule(c *cli.Context, clientFactory ClientFactory) error { - ns, err := getRequiredOption(c, FlagNamespace) + target, targetStr, err := parseMigrateTarget(c) if err != nil { return err } - scheduleID, err := getRequiredOption(c, FlagScheduleID) - if err != nil { - return err + + fromVisibility := c.Bool(FlagFromVisibility) + scheduleID := c.String(FlagScheduleID) + + switch { + case fromVisibility: + if scheduleID != "" { + return fmt.Errorf("--%s cannot be combined with --%s", FlagFromVisibility, FlagScheduleID) + } + return migrateSchedulesFromVisibility(c, clientFactory, target, targetStr) + case scheduleID != "": + return migrateSingleSchedule(c, clientFactory, target, targetStr, scheduleID) + case isStdinPiped(): + return migrateSchedulesFromStdin(c, clientFactory, target, targetStr) + default: + return fmt.Errorf("specify one of: --%s, --%s, or pipe JSON lines on stdin", FlagScheduleID, FlagFromVisibility) } +} + +func parseMigrateTarget(c *cli.Context) (adminservice.MigrateScheduleRequest_SchedulerTarget, string, error) { targetStr, err := getRequiredOption(c, FlagTarget) if err != nil { - return err + return 0, "", err } - var target adminservice.MigrateScheduleRequest_SchedulerTarget switch strings.ToLower(targetStr) { case "chasm": - target = adminservice.MigrateScheduleRequest_SCHEDULER_TARGET_CHASM + return adminservice.MigrateScheduleRequest_SCHEDULER_TARGET_CHASM, targetStr, nil case "workflow": - target = adminservice.MigrateScheduleRequest_SCHEDULER_TARGET_WORKFLOW + return adminservice.MigrateScheduleRequest_SCHEDULER_TARGET_WORKFLOW, targetStr, nil default: - return fmt.Errorf("invalid target %q, valid values are: chasm, workflow", targetStr) + return 0, "", fmt.Errorf("invalid target %q, valid values are: chasm, workflow", targetStr) + } +} + +// migrateSingleSchedule migrates one schedule and performs the migration immediately. +func migrateSingleSchedule( + c *cli.Context, + clientFactory ClientFactory, + target adminservice.MigrateScheduleRequest_SchedulerTarget, + targetStr string, + scheduleID string, +) error { + ns, err := getRequiredOption(c, FlagNamespace) + if err != nil { + return err } adminClient := clientFactory.AdminClient(c) ctx, cancel := newContext(c) defer cancel() - _, err = adminClient.MigrateSchedule(ctx, &adminservice.MigrateScheduleRequest{ + if err := migrateScheduleRPC(ctx, adminClient, ns, scheduleID, target); err != nil { + return fmt.Errorf("unable to migrate schedule: %w", err) + } + + _, _ = fmt.Fprintf(c.App.Writer, "Successfully initiated migration of schedule %q in namespace %q to %s.\n", scheduleID, ns, targetStr) + return nil +} + +// migrateSchedulesFromVisibility selects schedules via a visibility query and migrates each. +// By default it targets all running V2 (CHASM) schedules in --namespace; --query overrides the query. +func migrateSchedulesFromVisibility( + c *cli.Context, + clientFactory ClientFactory, + target adminservice.MigrateScheduleRequest_SchedulerTarget, + targetStr string, +) error { + ns, err := getRequiredOption(c, FlagNamespace) + if err != nil { + return err + } + + query := c.String(FlagVisibilityQuery) + if query == "" { + // Default: all running V2 (CHASM) schedules. The explicit TemporalNamespaceDivision + // filter is required, otherwise the visibility query converter appends + // "TemporalNamespaceDivision IS NULL" and excludes CHASM executions. + query = fmt.Sprintf("TemporalNamespaceDivision = '%d' AND ExecutionStatus = 'Running'", chasm.SchedulerArchetypeID) + } + + execute := c.Bool(FlagExecute) + workers := c.Int(FlagWorkers) + if workers < 1 { + workers = 1 + } + wfClient := clientFactory.WorkflowClient(c) + adminClient := clientFactory.AdminClient(c) + + // Schedules are listed (paginated) on this goroutine and fed to a pool of workers + // that migrate them concurrently. + var summary migrateSummary + if logPath := c.String(FlagOutputLog); logPath != "" { + logFile, err := os.Create(logPath) + if err != nil { + return fmt.Errorf("unable to open output log %q: %w", logPath, err) + } + defer func() { _ = logFile.Close() }() + summary.logEnc = json.NewEncoder(logFile) + } + jobs := make(chan migrateJob) + var wg sync.WaitGroup + for i := 0; i < workers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for job := range jobs { + migrateOne(c, adminClient, job.namespace, job.scheduleID, target, targetStr, execute, &summary) + } + }() + } + + var listErr error + var nextPageToken []byte + for { + ctx, cancel := newContext(c) + resp, err := wfClient.ListWorkflowExecutions(ctx, &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: ns, + Query: query, + NextPageToken: nextPageToken, + }) + cancel() + if err != nil { + listErr = fmt.Errorf("unable to list schedules from visibility: %w", err) + break + } + + for _, exec := range resp.GetExecutions() { + workflowID := exec.GetExecution().GetWorkflowId() + // CHASM scheduler executions store the schedule id directly as the workflow id; + // TrimPrefix is a no-op for them and handles any V1 records defensively. + scheduleID := strings.TrimPrefix(workflowID, primitives.ScheduleWorkflowIDPrefix) + jobs <- migrateJob{namespace: ns, scheduleID: scheduleID} + } + + nextPageToken = resp.GetNextPageToken() + if len(nextPageToken) == 0 { + break + } + } + close(jobs) + wg.Wait() + + if listErr != nil { + return listErr + } + + summary.print(c, execute) + return nil +} + +type migrateJob struct { + namespace string + scheduleID string +} + +// migrateSchedulesFromStdin reads JSON lines from stdin, one {"namespace","schedule_id"} per line. +func migrateSchedulesFromStdin( + c *cli.Context, + clientFactory ClientFactory, + target adminservice.MigrateScheduleRequest_SchedulerTarget, + targetStr string, +) error { + execute := c.Bool(FlagExecute) + adminClient := clientFactory.AdminClient(c) + + var summary migrateSummary + scanner := bufio.NewScanner(os.Stdin) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line == "" { + continue + } + var record struct { + Namespace string `json:"namespace"` + ScheduleID string `json:"schedule_id"` + } + if err := json.Unmarshal([]byte(line), &record); err != nil { + return fmt.Errorf("invalid JSON line %q: %w", line, err) + } + if record.Namespace == "" || record.ScheduleID == "" { + return fmt.Errorf("each line must include non-empty \"namespace\" and \"schedule_id\": %q", line) + } + migrateOne(c, adminClient, record.Namespace, record.ScheduleID, target, targetStr, execute, &summary) + } + if err := scanner.Err(); err != nil { + return fmt.Errorf("error reading stdin: %w", err) + } + + summary.print(c, execute) + return nil +} + +// migrateOne migrates a single schedule (or prints the planned action in dry-run), updating +// summary. It is safe to call concurrently from multiple workers: the migration RPC runs +// outside the summary lock, while counter updates and output are serialized. +func migrateOne( + c *cli.Context, + adminClient adminservice.AdminServiceClient, + ns string, + scheduleID string, + target adminservice.MigrateScheduleRequest_SchedulerTarget, + targetStr string, + execute bool, + summary *migrateSummary, +) { + if !execute { + summary.recordDryRun(c, ns, scheduleID, targetStr) + return + } + + ctx, cancel := newContext(c) + defer cancel() + err := migrateScheduleRPC(ctx, adminClient, ns, scheduleID, target) + summary.recordResult(c, ns, scheduleID, targetStr, err) +} + +func migrateScheduleRPC( + ctx context.Context, + adminClient adminservice.AdminServiceClient, + ns string, + scheduleID string, + target adminservice.MigrateScheduleRequest_SchedulerTarget, +) error { + _, err := adminClient.MigrateSchedule(ctx, &adminservice.MigrateScheduleRequest{ Namespace: ns, ScheduleId: scheduleID, Target: target, Identity: getCurrentUserFromEnv(), RequestId: uuid.NewString(), }) + return err +} + +// migrateLogRecord is one structured entry written to --output-log per schedule. +type migrateLogRecord struct { + Timestamp string `json:"timestamp"` + Namespace string `json:"namespace"` + ScheduleID string `json:"schedule_id"` + Target string `json:"target"` + Status string `json:"status"` // "migrated", "failed", or "dry-run" + Error string `json:"error,omitempty"` +} + +type migrateSummary struct { + mu sync.Mutex + planned int + migrated int + failed int + logEnc *json.Encoder // optional; writes one migrateLogRecord per result +} + +func (s *migrateSummary) recordDryRun(c *cli.Context, ns, scheduleID, targetStr string) { + s.mu.Lock() + defer s.mu.Unlock() + s.planned++ + _, _ = fmt.Fprintf(c.App.Writer, "[dry-run] would migrate %s/%s -> %s\n", ns, scheduleID, targetStr) + s.writeLogLocked(ns, scheduleID, targetStr, "dry-run", nil) +} + +func (s *migrateSummary) recordResult(c *cli.Context, ns, scheduleID, targetStr string, err error) { + s.mu.Lock() + defer s.mu.Unlock() + s.planned++ if err != nil { - return fmt.Errorf("unable to migrate schedule: %w", err) + s.failed++ + _, _ = fmt.Fprintf(c.App.ErrWriter, "failed to migrate %s/%s: %v\n", ns, scheduleID, err) + s.writeLogLocked(ns, scheduleID, targetStr, "failed", err) + return + } + s.migrated++ + _, _ = fmt.Fprintf(c.App.Writer, "migrated %s/%s -> %s\n", ns, scheduleID, targetStr) + s.writeLogLocked(ns, scheduleID, targetStr, "migrated", nil) +} + +// writeLogLocked appends a structured record to the output log. Callers must hold s.mu. +func (s *migrateSummary) writeLogLocked(ns, scheduleID, targetStr, status string, err error) { + if s.logEnc == nil { + return + } + rec := migrateLogRecord{ + Timestamp: time.Now().UTC().Format(time.RFC3339), + Namespace: ns, + ScheduleID: scheduleID, + Target: targetStr, + Status: status, } + if err != nil { + rec.Error = err.Error() + } + _ = s.logEnc.Encode(&rec) +} - _, _ = fmt.Fprintf(c.App.Writer, "Successfully initiated migration of schedule %q in namespace %q to %s.\n", scheduleID, ns, targetStr) - return nil +func (s *migrateSummary) print(c *cli.Context, execute bool) { + s.mu.Lock() + defer s.mu.Unlock() + if !execute { + _, _ = fmt.Fprintf(c.App.Writer, "Dry-run: %d schedule(s) would be migrated. Re-run with --%s to perform.\n", s.planned, FlagExecute) + return + } + _, _ = fmt.Fprintf(c.App.Writer, "Done: %d migrated, %d failed (of %d).\n", s.migrated, s.failed, s.planned) +} + +// isStdinPiped reports whether stdin is connected to a pipe or file rather than a terminal. +func isStdinPiped() bool { + fi, err := os.Stdin.Stat() + if err != nil { + return false + } + return (fi.Mode() & os.ModeCharDevice) == 0 } diff --git a/tools/tdbg/flags.go b/tools/tdbg/flags.go index ee9e558cd3..ee35fcbed1 100644 --- a/tools/tdbg/flags.go +++ b/tools/tdbg/flags.go @@ -80,4 +80,10 @@ var ( FlagScheduleID = "schedule-id" FlagScheduleIDAlias = []string{"sid"} FlagTarget = "target" + FlagFromVisibility = "from-visibility" + FlagExecute = "execute" + FlagWorkers = "workers" + FlagOutputLog = "output-log" ) + +const defaultMigrateWorkers = 5 diff --git a/tools/tdbg/schedule_migrate_test.go b/tools/tdbg/schedule_migrate_test.go new file mode 100644 index 0000000000..9cdff16f4a --- /dev/null +++ b/tools/tdbg/schedule_migrate_test.go @@ -0,0 +1,329 @@ +package tdbg_test + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "strings" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/urfave/cli/v2" + commonpb "go.temporal.io/api/common/v1" + workflowpb "go.temporal.io/api/workflow/v1" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/server/api/adminservice/v1" + "go.temporal.io/server/chasm" + "go.temporal.io/server/tools/tdbg" + "go.temporal.io/server/tools/tdbg/tdbgtest" + "google.golang.org/grpc" +) + +type migrateAdminClient struct { + adminservice.AdminServiceClient + err error + failIDs map[string]string // scheduleID -> error message + + mu sync.Mutex + requests []*adminservice.MigrateScheduleRequest +} + +func (c *migrateAdminClient) MigrateSchedule( + _ context.Context, + req *adminservice.MigrateScheduleRequest, + _ ...grpc.CallOption, +) (*adminservice.MigrateScheduleResponse, error) { + c.mu.Lock() + c.requests = append(c.requests, req) + c.mu.Unlock() + if c.err != nil { + return nil, c.err + } + if msg, ok := c.failIDs[req.ScheduleId]; ok { + return nil, errors.New(msg) + } + return &adminservice.MigrateScheduleResponse{}, nil +} + +type migrateWorkflowClient struct { + workflowservice.WorkflowServiceClient + pages []*workflowservice.ListWorkflowExecutionsResponse + next int + requests []*workflowservice.ListWorkflowExecutionsRequest +} + +func (c *migrateWorkflowClient) ListWorkflowExecutions( + _ context.Context, + req *workflowservice.ListWorkflowExecutionsRequest, + _ ...grpc.CallOption, +) (*workflowservice.ListWorkflowExecutionsResponse, error) { + c.requests = append(c.requests, req) + if c.next >= len(c.pages) { + return &workflowservice.ListWorkflowExecutionsResponse{}, nil + } + resp := c.pages[c.next] + c.next++ + return resp, nil +} + +type migrateClientFactory struct { + admin adminservice.AdminServiceClient + workflow workflowservice.WorkflowServiceClient +} + +func (f migrateClientFactory) AdminClient(*cli.Context) adminservice.AdminServiceClient { + return f.admin +} + +func (f migrateClientFactory) WorkflowClient(*cli.Context) workflowservice.WorkflowServiceClient { + return f.workflow +} + +func scheduleExecution(workflowID string) *workflowpb.WorkflowExecutionInfo { + return &workflowpb.WorkflowExecutionInfo{ + Execution: &commonpb.WorkflowExecution{WorkflowId: workflowID}, + } +} + +func runMigrate(t *testing.T, factory tdbg.ClientFactory, args ...string) (string, string, error) { + t.Helper() + var stdout, stderr bytes.Buffer + app := tdbgtest.NewCliApp(func(params *tdbg.Params) { + params.ClientFactory = factory + params.Writer = &stdout + params.ErrWriter = &stderr + }) + runArgs := append([]string{"tdbg"}, args...) + err := app.Run(runArgs) + return stdout.String(), stderr.String(), err +} + +func TestMigrateSchedule_FromVisibility_DryRun(t *testing.T) { + admin := &migrateAdminClient{} + wf := &migrateWorkflowClient{ + pages: []*workflowservice.ListWorkflowExecutionsResponse{ + { + Executions: []*workflowpb.WorkflowExecutionInfo{scheduleExecution("sched-a")}, + NextPageToken: []byte("page2"), + }, + { + Executions: []*workflowpb.WorkflowExecutionInfo{scheduleExecution("sched-b")}, + }, + }, + } + factory := migrateClientFactory{admin: admin, workflow: wf} + + stdout, _, err := runMigrate(t, factory, + "-n", "my-ns", "schedule", "migrate", "--target", "workflow", "--from-visibility") + require.NoError(t, err) + + // Dry-run performs no migrations. + assert.Empty(t, admin.requests) + // Default query is built from the CHASM scheduler archetype ID and scoped to --namespace. + require.NotEmpty(t, wf.requests) + expectedQuery := fmt.Sprintf("TemporalNamespaceDivision = '%d' AND ExecutionStatus = 'Running'", chasm.SchedulerArchetypeID) + assert.Equal(t, expectedQuery, wf.requests[0].Query) + assert.Equal(t, "my-ns", wf.requests[0].Namespace) + // Both pages are listed and reported. + assert.Contains(t, stdout, "[dry-run] would migrate my-ns/sched-a -> workflow") + assert.Contains(t, stdout, "[dry-run] would migrate my-ns/sched-b -> workflow") + assert.Contains(t, stdout, "Dry-run: 2 schedule(s)") +} + +func TestMigrateSchedule_FromVisibility_Execute(t *testing.T) { + admin := &migrateAdminClient{} + wf := &migrateWorkflowClient{ + pages: []*workflowservice.ListWorkflowExecutionsResponse{ + { + Executions: []*workflowpb.WorkflowExecutionInfo{ + // CHASM execution: workflow id is the schedule id directly. + scheduleExecution("sched-v2"), + // V1 execution: the scheduler workflow-id prefix is trimmed off. + scheduleExecution("temporal-sys-scheduler:sched-v1"), + }, + }, + }, + } + factory := migrateClientFactory{admin: admin, workflow: wf} + + _, _, err := runMigrate(t, factory, + "-n", "my-ns", "schedule", "migrate", "--target", "workflow", "--from-visibility", "--execute") + require.NoError(t, err) + + require.Len(t, admin.requests, 2) + ids := []string{admin.requests[0].ScheduleId, admin.requests[1].ScheduleId} + assert.ElementsMatch(t, []string{"sched-v2", "sched-v1"}, ids) + for _, req := range admin.requests { + assert.Equal(t, "my-ns", req.Namespace) + assert.Equal(t, adminservice.MigrateScheduleRequest_SCHEDULER_TARGET_WORKFLOW, req.Target) + assert.NotEmpty(t, req.RequestId) + } +} + +func TestMigrateSchedule_FromVisibility_Workers(t *testing.T) { + admin := &migrateAdminClient{} + const n = 12 + execs := make([]*workflowpb.WorkflowExecutionInfo, n) + want := make([]string, n) + for i := 0; i < n; i++ { + id := fmt.Sprintf("sched-%d", i) + execs[i] = scheduleExecution(id) + want[i] = id + } + wf := &migrateWorkflowClient{ + pages: []*workflowservice.ListWorkflowExecutionsResponse{{Executions: execs}}, + } + factory := migrateClientFactory{admin: admin, workflow: wf} + + _, _, err := runMigrate(t, factory, + "-n", "my-ns", "schedule", "migrate", "--target", "workflow", "--from-visibility", + "--execute", "--workers", "4") + require.NoError(t, err) + + require.Len(t, admin.requests, n) + got := make([]string, len(admin.requests)) + for i, req := range admin.requests { + got[i] = req.ScheduleId + } + assert.ElementsMatch(t, want, got) +} + +func TestMigrateSchedule_FromVisibility_OutputLog(t *testing.T) { + admin := &migrateAdminClient{failIDs: map[string]string{"sched-bad": "boom"}} + wf := &migrateWorkflowClient{ + pages: []*workflowservice.ListWorkflowExecutionsResponse{{ + Executions: []*workflowpb.WorkflowExecutionInfo{ + scheduleExecution("sched-ok"), + scheduleExecution("sched-bad"), + }, + }}, + } + factory := migrateClientFactory{admin: admin, workflow: wf} + + logPath := filepath.Join(t.TempDir(), "migrations.jsonl") + _, _, err := runMigrate(t, factory, + "-n", "my-ns", "schedule", "migrate", "--target", "workflow", "--from-visibility", + "--execute", "--output-log", logPath) + require.NoError(t, err) + + type logRec struct { + Timestamp string `json:"timestamp"` + Namespace string `json:"namespace"` + ScheduleID string `json:"schedule_id"` + Target string `json:"target"` + Status string `json:"status"` + Error string `json:"error"` + } + data, err := os.ReadFile(logPath) + require.NoError(t, err) + byID := map[string]logRec{} + for _, line := range strings.Split(strings.TrimSpace(string(data)), "\n") { + var rec logRec + require.NoError(t, json.Unmarshal([]byte(line), &rec)) + byID[rec.ScheduleID] = rec + } + + require.Len(t, byID, 2) + + ok := byID["sched-ok"] + assert.Equal(t, "migrated", ok.Status) + assert.Equal(t, "my-ns", ok.Namespace) + assert.Equal(t, "workflow", ok.Target) + assert.Empty(t, ok.Error) + assert.NotEmpty(t, ok.Timestamp) + + bad := byID["sched-bad"] + assert.Equal(t, "failed", bad.Status) + assert.Contains(t, bad.Error, "boom") +} + +func TestMigrateSchedule_FromVisibility_CustomQuery(t *testing.T) { + admin := &migrateAdminClient{} + wf := &migrateWorkflowClient{} + factory := migrateClientFactory{admin: admin, workflow: wf} + + customQuery := "TemporalNamespaceDivision = '403648407' AND ScheduleId = 'only-this'" + _, _, err := runMigrate(t, factory, + "-n", "my-ns", "schedule", "migrate", "--target", "workflow", "--from-visibility", + "--query", customQuery) + require.NoError(t, err) + + require.NotEmpty(t, wf.requests) + assert.Equal(t, customQuery, wf.requests[0].Query) +} + +func TestMigrateSchedule_FromVisibility_RejectsScheduleID(t *testing.T) { + factory := migrateClientFactory{admin: &migrateAdminClient{}, workflow: &migrateWorkflowClient{}} + _, _, err := runMigrate(t, factory, + "schedule", "migrate", "--target", "workflow", "--from-visibility", "--schedule-id", "x") + require.Error(t, err) + assert.Contains(t, err.Error(), "from-visibility") + assert.Contains(t, err.Error(), "schedule-id") +} + +func TestMigrateSchedule_Stdin_Execute(t *testing.T) { + admin := &migrateAdminClient{} + factory := migrateClientFactory{admin: admin, workflow: &migrateWorkflowClient{}} + + stdin := strings.Join([]string{ + `{"namespace":"ns-1","schedule_id":"sched-1"}`, + ``, // blank lines are skipped + `{"namespace":"ns-2","schedule_id":"sched-2"}`, + }, "\n") + + withStdin(t, stdin, func() { + _, _, err := runMigrate(t, factory, + "schedule", "migrate", "--target", "workflow", "--execute") + require.NoError(t, err) + }) + + require.Len(t, admin.requests, 2) + assert.Equal(t, "ns-1", admin.requests[0].Namespace) + assert.Equal(t, "sched-1", admin.requests[0].ScheduleId) + assert.Equal(t, "ns-2", admin.requests[1].Namespace) + assert.Equal(t, "sched-2", admin.requests[1].ScheduleId) + for _, req := range admin.requests { + assert.Equal(t, adminservice.MigrateScheduleRequest_SCHEDULER_TARGET_WORKFLOW, req.Target) + } +} + +func TestMigrateSchedule_Stdin_DryRun(t *testing.T) { + admin := &migrateAdminClient{} + factory := migrateClientFactory{admin: admin, workflow: &migrateWorkflowClient{}} + + withStdin(t, `{"namespace":"ns-1","schedule_id":"sched-1"}`, func() { + stdout, _, err := runMigrate(t, factory, "schedule", "migrate", "--target", "workflow") + require.NoError(t, err) + assert.Contains(t, stdout, "[dry-run] would migrate ns-1/sched-1 -> workflow") + }) + + assert.Empty(t, admin.requests) +} + +// withStdin redirects os.Stdin to a temp file containing content for the duration of fn. +// A regular file is not a character device, so the command's piped-stdin detection treats it +// as piped input. +func withStdin(t *testing.T, content string, fn func()) { + t.Helper() + f, err := os.CreateTemp(t.TempDir(), "stdin") + require.NoError(t, err) + _, err = f.WriteString(content) + require.NoError(t, err) + require.NoError(t, f.Sync()) + _, err = f.Seek(0, 0) + require.NoError(t, err) + + orig := os.Stdin + os.Stdin = f + defer func() { + os.Stdin = orig + _ = f.Close() + }() + fn() +} diff --git a/tools/tdbg/tdbg_commands.go b/tools/tdbg/tdbg_commands.go index c815978b21..953f08022a 100644 --- a/tools/tdbg/tdbg_commands.go +++ b/tools/tdbg/tdbg_commands.go @@ -304,16 +304,36 @@ func newAdminScheduleCommands(clientFactory ClientFactory) []*cli.Command { Usage: "Migrate a schedule between V1 (workflow-backed) and V2 (CHASM)", Flags: []cli.Flag{ &cli.StringFlag{ - Name: FlagScheduleID, - Aliases: FlagScheduleIDAlias, - Usage: "Schedule ID", - Required: true, + Name: FlagScheduleID, + Aliases: FlagScheduleIDAlias, + Usage: "Schedule ID (single-schedule mode)", }, &cli.StringFlag{ Name: FlagTarget, Usage: "Target scheduler implementation: chasm, workflow", Required: true, }, + &cli.BoolFlag{ + Name: FlagFromVisibility, + Usage: "Select schedules from visibility instead of --schedule-id. Defaults to all running V2 (CHASM) schedules in --namespace; override with --query", + }, + &cli.StringFlag{ + Name: FlagVisibilityQuery, + Usage: "Visibility query used with --from-visibility (overrides the default V2-schedule query)", + }, + &cli.BoolFlag{ + Name: FlagExecute, + Usage: "Perform the migration. Without this flag, --from-visibility and stdin modes only print what they would do (dry-run)", + }, + &cli.IntFlag{ + Name: FlagWorkers, + Value: defaultMigrateWorkers, + Usage: "Number of concurrent workers migrating schedules in --from-visibility mode", + }, + &cli.StringFlag{ + Name: FlagOutputLog, + Usage: "Path to write a structured (JSON lines) log of each migration result in --from-visibility mode", + }, }, Action: func(c *cli.Context) error { return AdminMigrateSchedule(c, clientFactory) From 26bfb0d4b1217768357133d857324d4800487dd6 Mon Sep 17 00:00:00 2001 From: David Porter Date: Thu, 4 Jun 2026 22:39:01 -0700 Subject: [PATCH 2/5] lint --- tools/tdbg/commands.go | 6 +-- tools/tdbg/schedule_migrate_test.go | 61 ++++++++++++++--------------- 2 files changed, 32 insertions(+), 35 deletions(-) diff --git a/tools/tdbg/commands.go b/tools/tdbg/commands.go index 63f1ffc93f..bf289331d9 100644 --- a/tools/tdbg/commands.go +++ b/tools/tdbg/commands.go @@ -991,13 +991,11 @@ func migrateSchedulesFromVisibility( jobs := make(chan migrateJob) var wg sync.WaitGroup for i := 0; i < workers; i++ { - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { for job := range jobs { migrateOne(c, adminClient, job.namespace, job.scheduleID, target, targetStr, execute, &summary) } - }() + }) } var listErr error diff --git a/tools/tdbg/schedule_migrate_test.go b/tools/tdbg/schedule_migrate_test.go index 9cdff16f4a..502b328168 100644 --- a/tools/tdbg/schedule_migrate_test.go +++ b/tools/tdbg/schedule_migrate_test.go @@ -12,7 +12,6 @@ import ( "sync" "testing" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/urfave/cli/v2" commonpb "go.temporal.io/api/common/v1" @@ -91,7 +90,7 @@ func scheduleExecution(workflowID string) *workflowpb.WorkflowExecutionInfo { } } -func runMigrate(t *testing.T, factory tdbg.ClientFactory, args ...string) (string, string, error) { +func runMigrate(t *testing.T, factory tdbg.ClientFactory, args ...string) (stdoutStr, stderrStr string, err error) { t.Helper() var stdout, stderr bytes.Buffer app := tdbgtest.NewCliApp(func(params *tdbg.Params) { @@ -100,7 +99,7 @@ func runMigrate(t *testing.T, factory tdbg.ClientFactory, args ...string) (strin params.ErrWriter = &stderr }) runArgs := append([]string{"tdbg"}, args...) - err := app.Run(runArgs) + err = app.Run(runArgs) return stdout.String(), stderr.String(), err } @@ -124,16 +123,16 @@ func TestMigrateSchedule_FromVisibility_DryRun(t *testing.T) { require.NoError(t, err) // Dry-run performs no migrations. - assert.Empty(t, admin.requests) + require.Empty(t, admin.requests) // Default query is built from the CHASM scheduler archetype ID and scoped to --namespace. require.NotEmpty(t, wf.requests) expectedQuery := fmt.Sprintf("TemporalNamespaceDivision = '%d' AND ExecutionStatus = 'Running'", chasm.SchedulerArchetypeID) - assert.Equal(t, expectedQuery, wf.requests[0].Query) - assert.Equal(t, "my-ns", wf.requests[0].Namespace) + require.Equal(t, expectedQuery, wf.requests[0].Query) + require.Equal(t, "my-ns", wf.requests[0].Namespace) // Both pages are listed and reported. - assert.Contains(t, stdout, "[dry-run] would migrate my-ns/sched-a -> workflow") - assert.Contains(t, stdout, "[dry-run] would migrate my-ns/sched-b -> workflow") - assert.Contains(t, stdout, "Dry-run: 2 schedule(s)") + require.Contains(t, stdout, "[dry-run] would migrate my-ns/sched-a -> workflow") + require.Contains(t, stdout, "[dry-run] would migrate my-ns/sched-b -> workflow") + require.Contains(t, stdout, "Dry-run: 2 schedule(s)") } func TestMigrateSchedule_FromVisibility_Execute(t *testing.T) { @@ -158,11 +157,11 @@ func TestMigrateSchedule_FromVisibility_Execute(t *testing.T) { require.Len(t, admin.requests, 2) ids := []string{admin.requests[0].ScheduleId, admin.requests[1].ScheduleId} - assert.ElementsMatch(t, []string{"sched-v2", "sched-v1"}, ids) + require.ElementsMatch(t, []string{"sched-v2", "sched-v1"}, ids) for _, req := range admin.requests { - assert.Equal(t, "my-ns", req.Namespace) - assert.Equal(t, adminservice.MigrateScheduleRequest_SCHEDULER_TARGET_WORKFLOW, req.Target) - assert.NotEmpty(t, req.RequestId) + require.Equal(t, "my-ns", req.Namespace) + require.Equal(t, adminservice.MigrateScheduleRequest_SCHEDULER_TARGET_WORKFLOW, req.Target) + require.NotEmpty(t, req.RequestId) } } @@ -191,7 +190,7 @@ func TestMigrateSchedule_FromVisibility_Workers(t *testing.T) { for i, req := range admin.requests { got[i] = req.ScheduleId } - assert.ElementsMatch(t, want, got) + require.ElementsMatch(t, want, got) } func TestMigrateSchedule_FromVisibility_OutputLog(t *testing.T) { @@ -232,15 +231,15 @@ func TestMigrateSchedule_FromVisibility_OutputLog(t *testing.T) { require.Len(t, byID, 2) ok := byID["sched-ok"] - assert.Equal(t, "migrated", ok.Status) - assert.Equal(t, "my-ns", ok.Namespace) - assert.Equal(t, "workflow", ok.Target) - assert.Empty(t, ok.Error) - assert.NotEmpty(t, ok.Timestamp) + require.Equal(t, "migrated", ok.Status) + require.Equal(t, "my-ns", ok.Namespace) + require.Equal(t, "workflow", ok.Target) + require.Empty(t, ok.Error) + require.NotEmpty(t, ok.Timestamp) bad := byID["sched-bad"] - assert.Equal(t, "failed", bad.Status) - assert.Contains(t, bad.Error, "boom") + require.Equal(t, "failed", bad.Status) + require.Contains(t, bad.Error, "boom") } func TestMigrateSchedule_FromVisibility_CustomQuery(t *testing.T) { @@ -255,7 +254,7 @@ func TestMigrateSchedule_FromVisibility_CustomQuery(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, wf.requests) - assert.Equal(t, customQuery, wf.requests[0].Query) + require.Equal(t, customQuery, wf.requests[0].Query) } func TestMigrateSchedule_FromVisibility_RejectsScheduleID(t *testing.T) { @@ -263,8 +262,8 @@ func TestMigrateSchedule_FromVisibility_RejectsScheduleID(t *testing.T) { _, _, err := runMigrate(t, factory, "schedule", "migrate", "--target", "workflow", "--from-visibility", "--schedule-id", "x") require.Error(t, err) - assert.Contains(t, err.Error(), "from-visibility") - assert.Contains(t, err.Error(), "schedule-id") + require.Contains(t, err.Error(), "from-visibility") + require.Contains(t, err.Error(), "schedule-id") } func TestMigrateSchedule_Stdin_Execute(t *testing.T) { @@ -284,12 +283,12 @@ func TestMigrateSchedule_Stdin_Execute(t *testing.T) { }) require.Len(t, admin.requests, 2) - assert.Equal(t, "ns-1", admin.requests[0].Namespace) - assert.Equal(t, "sched-1", admin.requests[0].ScheduleId) - assert.Equal(t, "ns-2", admin.requests[1].Namespace) - assert.Equal(t, "sched-2", admin.requests[1].ScheduleId) + require.Equal(t, "ns-1", admin.requests[0].Namespace) + require.Equal(t, "sched-1", admin.requests[0].ScheduleId) + require.Equal(t, "ns-2", admin.requests[1].Namespace) + require.Equal(t, "sched-2", admin.requests[1].ScheduleId) for _, req := range admin.requests { - assert.Equal(t, adminservice.MigrateScheduleRequest_SCHEDULER_TARGET_WORKFLOW, req.Target) + require.Equal(t, adminservice.MigrateScheduleRequest_SCHEDULER_TARGET_WORKFLOW, req.Target) } } @@ -300,10 +299,10 @@ func TestMigrateSchedule_Stdin_DryRun(t *testing.T) { withStdin(t, `{"namespace":"ns-1","schedule_id":"sched-1"}`, func() { stdout, _, err := runMigrate(t, factory, "schedule", "migrate", "--target", "workflow") require.NoError(t, err) - assert.Contains(t, stdout, "[dry-run] would migrate ns-1/sched-1 -> workflow") + require.Contains(t, stdout, "[dry-run] would migrate ns-1/sched-1 -> workflow") }) - assert.Empty(t, admin.requests) + require.Empty(t, admin.requests) } // withStdin redirects os.Stdin to a temp file containing content for the duration of fn. From f63b002de082f845cd749a8551574667f21eea6b Mon Sep 17 00:00:00 2001 From: David Porter Date: Thu, 4 Jun 2026 23:49:56 -0700 Subject: [PATCH 3/5] lint --- tools/tdbg/schedule_migrate_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/tdbg/schedule_migrate_test.go b/tools/tdbg/schedule_migrate_test.go index 502b328168..c4ab081fae 100644 --- a/tools/tdbg/schedule_migrate_test.go +++ b/tools/tdbg/schedule_migrate_test.go @@ -170,7 +170,7 @@ func TestMigrateSchedule_FromVisibility_Workers(t *testing.T) { const n = 12 execs := make([]*workflowpb.WorkflowExecutionInfo, n) want := make([]string, n) - for i := 0; i < n; i++ { + for i := range n { id := fmt.Sprintf("sched-%d", i) execs[i] = scheduleExecution(id) want[i] = id From a4cc287890cf2f92cdaf97d9f63dfc3798a78d13 Mon Sep 17 00:00:00 2001 From: David Porter Date: Fri, 5 Jun 2026 13:07:05 -0700 Subject: [PATCH 4/5] Feedback/fix --- tools/tdbg/commands.go | 79 ++++++++++++++++++++++------- tools/tdbg/schedule_migrate_test.go | 64 +++++++++++++++++++++++ tools/tdbg/tdbg_commands.go | 12 +++-- 3 files changed, 133 insertions(+), 22 deletions(-) diff --git a/tools/tdbg/commands.go b/tools/tdbg/commands.go index bf289331d9..56fa8f1647 100644 --- a/tools/tdbg/commands.go +++ b/tools/tdbg/commands.go @@ -31,6 +31,7 @@ import ( "go.temporal.io/server/common/primitives" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/service/history/tasks" + "go.temporal.io/server/service/worker/scheduler" "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" @@ -880,7 +881,8 @@ func AdminReplicateWorkflow( // // It supports three mutually-exclusive selection modes, all sharing the required --target flag: // - single: --schedule-id (performs immediately, as before) -// - from visibility: --from-visibility [--query ] (defaults to all running V2 schedules in --namespace) +// - from visibility: --from-visibility [--query ] (default query is chosen from --target: the +// running V1 schedules when migrating to chasm, the running V2 schedules when migrating to workflow) // - stdin: JSON lines piped on stdin, one {"namespace":..., "schedule_id":...} per line // // The from-visibility and stdin modes default to a dry-run; pass --execute to perform the migration. @@ -893,6 +895,17 @@ func AdminMigrateSchedule(c *cli.Context, clientFactory ClientFactory) error { fromVisibility := c.Bool(FlagFromVisibility) scheduleID := c.String(FlagScheduleID) + // --query and --workers only take effect in --from-visibility mode; reject them elsewhere + // rather than silently ignoring them. + if !fromVisibility { + if c.IsSet(FlagVisibilityQuery) { + return fmt.Errorf("--%s is only valid with --%s", FlagVisibilityQuery, FlagFromVisibility) + } + if c.IsSet(FlagWorkers) { + return fmt.Errorf("--%s is only valid with --%s", FlagWorkers, FlagFromVisibility) + } + } + switch { case fromVisibility: if scheduleID != "" { @@ -949,7 +962,8 @@ func migrateSingleSchedule( } // migrateSchedulesFromVisibility selects schedules via a visibility query and migrates each. -// By default it targets all running V2 (CHASM) schedules in --namespace; --query overrides the query. +// When --query is not supplied the default query is chosen from the --target direction (see +// defaultMigrateQuery). func migrateSchedulesFromVisibility( c *cli.Context, clientFactory ClientFactory, @@ -963,10 +977,7 @@ func migrateSchedulesFromVisibility( query := c.String(FlagVisibilityQuery) if query == "" { - // Default: all running V2 (CHASM) schedules. The explicit TemporalNamespaceDivision - // filter is required, otherwise the visibility query converter appends - // "TemporalNamespaceDivision IS NULL" and excludes CHASM executions. - query = fmt.Sprintf("TemporalNamespaceDivision = '%d' AND ExecutionStatus = 'Running'", chasm.SchedulerArchetypeID) + query = defaultMigrateQuery(target) } execute := c.Bool(FlagExecute) @@ -980,14 +991,11 @@ func migrateSchedulesFromVisibility( // Schedules are listed (paginated) on this goroutine and fed to a pool of workers // that migrate them concurrently. var summary migrateSummary - if logPath := c.String(FlagOutputLog); logPath != "" { - logFile, err := os.Create(logPath) - if err != nil { - return fmt.Errorf("unable to open output log %q: %w", logPath, err) - } - defer func() { _ = logFile.Close() }() - summary.logEnc = json.NewEncoder(logFile) + closeLog, err := openMigrateLog(c, &summary) + if err != nil { + return err } + defer closeLog() jobs := make(chan migrateJob) var wg sync.WaitGroup for i := 0; i < workers; i++ { @@ -1029,12 +1037,11 @@ func migrateSchedulesFromVisibility( close(jobs) wg.Wait() - if listErr != nil { - return listErr - } - + // Always report what was migrated before surfacing a listing error: if pagination fails + // partway through, workers may have already migrated the schedules listed so far, and the + // user needs to see that partial progress. summary.print(c, execute) - return nil + return listErr } type migrateJob struct { @@ -1042,6 +1049,36 @@ type migrateJob struct { scheduleID string } +// defaultMigrateQuery returns the visibility query selecting which schedules to migrate when +// --query is not supplied. The direction is inferred from --target: migrating to CHASM (V2) +// selects the running V1 (workflow-backed) schedules to move forward, while migrating to +// workflow (V1) selects the running V2 (CHASM) schedules to roll back. +func defaultMigrateQuery(target adminservice.MigrateScheduleRequest_SchedulerTarget) string { + if target == adminservice.MigrateScheduleRequest_SCHEDULER_TARGET_CHASM { + // Forward migration V1 -> V2: all running V1 (workflow-backed) schedules. + return fmt.Sprintf("TemporalNamespaceDivision = '%s' AND ExecutionStatus = 'Running'", scheduler.NamespaceDivision) + } + // Rollback V2 -> V1: all running V2 (CHASM) schedules. The explicit TemporalNamespaceDivision + // filter is required, otherwise the visibility query converter appends + // "TemporalNamespaceDivision IS NULL" and excludes CHASM executions. + return fmt.Sprintf("TemporalNamespaceDivision = '%d' AND ExecutionStatus = 'Running'", chasm.SchedulerArchetypeID) +} + +// openMigrateLog wires summary.logEnc to the --output-log file when the flag is set, returning a +// cleanup func that closes the file (a no-op when the flag is unset). +func openMigrateLog(c *cli.Context, summary *migrateSummary) (func(), error) { + logPath := c.String(FlagOutputLog) + if logPath == "" { + return func() {}, nil + } + logFile, err := os.Create(logPath) + if err != nil { + return nil, fmt.Errorf("unable to open output log %q: %w", logPath, err) + } + summary.logEnc = json.NewEncoder(logFile) + return func() { _ = logFile.Close() }, nil +} + // migrateSchedulesFromStdin reads JSON lines from stdin, one {"namespace","schedule_id"} per line. func migrateSchedulesFromStdin( c *cli.Context, @@ -1053,6 +1090,12 @@ func migrateSchedulesFromStdin( adminClient := clientFactory.AdminClient(c) var summary migrateSummary + closeLog, err := openMigrateLog(c, &summary) + if err != nil { + return err + } + defer closeLog() + scanner := bufio.NewScanner(os.Stdin) for scanner.Scan() { line := strings.TrimSpace(scanner.Text()) diff --git a/tools/tdbg/schedule_migrate_test.go b/tools/tdbg/schedule_migrate_test.go index c4ab081fae..1749b0f752 100644 --- a/tools/tdbg/schedule_migrate_test.go +++ b/tools/tdbg/schedule_migrate_test.go @@ -19,6 +19,7 @@ import ( "go.temporal.io/api/workflowservice/v1" "go.temporal.io/server/api/adminservice/v1" "go.temporal.io/server/chasm" + "go.temporal.io/server/service/worker/scheduler" "go.temporal.io/server/tools/tdbg" "go.temporal.io/server/tools/tdbg/tdbgtest" "google.golang.org/grpc" @@ -257,6 +258,39 @@ func TestMigrateSchedule_FromVisibility_CustomQuery(t *testing.T) { require.Equal(t, customQuery, wf.requests[0].Query) } +func TestMigrateSchedule_FromVisibility_DefaultQueryToChasm(t *testing.T) { + admin := &migrateAdminClient{} + wf := &migrateWorkflowClient{} + factory := migrateClientFactory{admin: admin, workflow: wf} + + // Migrating to chasm (V1 -> V2) defaults to selecting running V1 (workflow-backed) schedules. + _, _, err := runMigrate(t, factory, + "-n", "my-ns", "schedule", "migrate", "--target", "chasm", "--from-visibility") + require.NoError(t, err) + + require.NotEmpty(t, wf.requests) + expectedQuery := fmt.Sprintf("TemporalNamespaceDivision = '%s' AND ExecutionStatus = 'Running'", scheduler.NamespaceDivision) + require.Equal(t, expectedQuery, wf.requests[0].Query) +} + +func TestMigrateSchedule_RejectsQueryWithoutFromVisibility(t *testing.T) { + factory := migrateClientFactory{admin: &migrateAdminClient{}, workflow: &migrateWorkflowClient{}} + _, _, err := runMigrate(t, factory, + "schedule", "migrate", "--target", "workflow", "--schedule-id", "x", "--query", "ScheduleId = 'x'") + require.Error(t, err) + require.Contains(t, err.Error(), "query") + require.Contains(t, err.Error(), "from-visibility") +} + +func TestMigrateSchedule_RejectsWorkersWithoutFromVisibility(t *testing.T) { + factory := migrateClientFactory{admin: &migrateAdminClient{}, workflow: &migrateWorkflowClient{}} + _, _, err := runMigrate(t, factory, + "schedule", "migrate", "--target", "workflow", "--schedule-id", "x", "--workers", "4") + require.Error(t, err) + require.Contains(t, err.Error(), "workers") + require.Contains(t, err.Error(), "from-visibility") +} + func TestMigrateSchedule_FromVisibility_RejectsScheduleID(t *testing.T) { factory := migrateClientFactory{admin: &migrateAdminClient{}, workflow: &migrateWorkflowClient{}} _, _, err := runMigrate(t, factory, @@ -305,9 +339,39 @@ func TestMigrateSchedule_Stdin_DryRun(t *testing.T) { require.Empty(t, admin.requests) } +func TestMigrateSchedule_Stdin_OutputLog(t *testing.T) { + admin := &migrateAdminClient{} + factory := migrateClientFactory{admin: admin, workflow: &migrateWorkflowClient{}} + + logPath := filepath.Join(t.TempDir(), "migrations.jsonl") + withStdin(t, `{"namespace":"ns-1","schedule_id":"sched-1"}`, func() { + _, _, err := runMigrate(t, factory, + "schedule", "migrate", "--target", "workflow", "--execute", "--output-log", logPath) + require.NoError(t, err) + }) + + data, err := os.ReadFile(logPath) + require.NoError(t, err) + var rec struct { + Namespace string `json:"namespace"` + ScheduleID string `json:"schedule_id"` + Target string `json:"target"` + Status string `json:"status"` + } + require.NoError(t, json.Unmarshal([]byte(strings.TrimSpace(string(data))), &rec)) + require.Equal(t, "ns-1", rec.Namespace) + require.Equal(t, "sched-1", rec.ScheduleID) + require.Equal(t, "workflow", rec.Target) + require.Equal(t, "migrated", rec.Status) +} + // withStdin redirects os.Stdin to a temp file containing content for the duration of fn. // A regular file is not a character device, so the command's piped-stdin detection treats it // as piped input. +// +// NOTE: os.Stdin is process-global, so tests using withStdin must NOT call t.Parallel(). +// TODO: inject the input reader through tdbg.Params instead of mutating os.Stdin, which would +// remove this constraint (deferred to a follow-up PR to keep this change small). func withStdin(t *testing.T, content string, fn func()) { t.Helper() f, err := os.CreateTemp(t.TempDir(), "stdin") diff --git a/tools/tdbg/tdbg_commands.go b/tools/tdbg/tdbg_commands.go index 953f08022a..1d2e61681e 100644 --- a/tools/tdbg/tdbg_commands.go +++ b/tools/tdbg/tdbg_commands.go @@ -314,12 +314,16 @@ func newAdminScheduleCommands(clientFactory ClientFactory) []*cli.Command { Required: true, }, &cli.BoolFlag{ - Name: FlagFromVisibility, - Usage: "Select schedules from visibility instead of --schedule-id. Defaults to all running V2 (CHASM) schedules in --namespace; override with --query", + Name: FlagFromVisibility, + Usage: "Select schedules from visibility instead of --schedule-id, scoped to --namespace. " + + "The default query is chosen from --target: migrating to chasm selects running V1 schedules, " + + "migrating to workflow selects running V2 schedules. Override with --query", }, &cli.StringFlag{ - Name: FlagVisibilityQuery, - Usage: "Visibility query used with --from-visibility (overrides the default V2-schedule query)", + Name: FlagVisibilityQuery, + Usage: "Visibility query used with --from-visibility, overriding the target-based default. The defaults are:\n" + + "\tV1 (workflow-backed): TemporalNamespaceDivision = 'TemporalScheduler' AND ExecutionStatus = 'Running'\n" + + "\tV2 (CHASM): TemporalNamespaceDivision = '' AND ExecutionStatus = 'Running'", }, &cli.BoolFlag{ Name: FlagExecute, From 739ba3a3ef5ac4560fd29b4d04a2b7ca1ad8d807 Mon Sep 17 00:00:00 2001 From: David Porter Date: Fri, 5 Jun 2026 14:59:12 -0700 Subject: [PATCH 5/5] feedback 2 --- tools/tdbg/commands.go | 57 +++++++++++++++++++++-------- tools/tdbg/schedule_migrate_test.go | 33 ++++++++++++++++- tools/tdbg/tdbg_commands.go | 4 +- 3 files changed, 74 insertions(+), 20 deletions(-) diff --git a/tools/tdbg/commands.go b/tools/tdbg/commands.go index 56fa8f1647..68ab81c16e 100644 --- a/tools/tdbg/commands.go +++ b/tools/tdbg/commands.go @@ -895,15 +895,15 @@ func AdminMigrateSchedule(c *cli.Context, clientFactory ClientFactory) error { fromVisibility := c.Bool(FlagFromVisibility) scheduleID := c.String(FlagScheduleID) - // --query and --workers only take effect in --from-visibility mode; reject them elsewhere - // rather than silently ignoring them. - if !fromVisibility { - if c.IsSet(FlagVisibilityQuery) { - return fmt.Errorf("--%s is only valid with --%s", FlagVisibilityQuery, FlagFromVisibility) - } - if c.IsSet(FlagWorkers) { - return fmt.Errorf("--%s is only valid with --%s", FlagWorkers, FlagFromVisibility) - } + // --query only takes effect in --from-visibility mode; reject it elsewhere rather than + // silently ignoring it. + if !fromVisibility && c.IsSet(FlagVisibilityQuery) { + return fmt.Errorf("--%s is only valid with --%s", FlagVisibilityQuery, FlagFromVisibility) + } + // --workers applies to the bulk modes (--from-visibility and stdin); it has no effect when + // migrating a single --schedule-id, so reject it there rather than silently ignoring it. + if scheduleID != "" && c.IsSet(FlagWorkers) { + return fmt.Errorf("--%s is only valid with --%s or when piping JSON lines on stdin", FlagWorkers, FlagFromVisibility) } switch { @@ -1079,7 +1079,9 @@ func openMigrateLog(c *cli.Context, summary *migrateSummary) (func(), error) { return func() { _ = logFile.Close() }, nil } -// migrateSchedulesFromStdin reads JSON lines from stdin, one {"namespace","schedule_id"} per line. +// migrateSchedulesFromStdin reads JSON lines from stdin, one {"namespace","schedule_id"} per line, +// feeding them to a pool of --workers goroutines that migrate them concurrently (mirroring +// --from-visibility mode). With the default of one worker, lines are processed in order. func migrateSchedulesFromStdin( c *cli.Context, clientFactory ClientFactory, @@ -1087,6 +1089,10 @@ func migrateSchedulesFromStdin( targetStr string, ) error { execute := c.Bool(FlagExecute) + workers := c.Int(FlagWorkers) + if workers < 1 { + workers = 1 + } adminClient := clientFactory.AdminClient(c) var summary migrateSummary @@ -1096,6 +1102,17 @@ func migrateSchedulesFromStdin( } defer closeLog() + jobs := make(chan migrateJob) + var wg sync.WaitGroup + for i := 0; i < workers; i++ { + wg.Go(func() { + for job := range jobs { + migrateOne(c, adminClient, job.namespace, job.scheduleID, target, targetStr, execute, &summary) + } + }) + } + + var readErr error scanner := bufio.NewScanner(os.Stdin) for scanner.Scan() { line := strings.TrimSpace(scanner.Text()) @@ -1107,19 +1124,27 @@ func migrateSchedulesFromStdin( ScheduleID string `json:"schedule_id"` } if err := json.Unmarshal([]byte(line), &record); err != nil { - return fmt.Errorf("invalid JSON line %q: %w", line, err) + readErr = fmt.Errorf("invalid JSON line %q: %w", line, err) + break } if record.Namespace == "" || record.ScheduleID == "" { - return fmt.Errorf("each line must include non-empty \"namespace\" and \"schedule_id\": %q", line) + readErr = fmt.Errorf("each line must include non-empty \"namespace\" and \"schedule_id\": %q", line) + break } - migrateOne(c, adminClient, record.Namespace, record.ScheduleID, target, targetStr, execute, &summary) + jobs <- migrateJob{namespace: record.Namespace, scheduleID: record.ScheduleID} } - if err := scanner.Err(); err != nil { - return fmt.Errorf("error reading stdin: %w", err) + if readErr == nil { + if err := scanner.Err(); err != nil { + readErr = fmt.Errorf("error reading stdin: %w", err) + } } + close(jobs) + wg.Wait() + // Always report what was migrated before surfacing a read error: workers may have already + // migrated the lines read so far, and the user needs to see that partial progress. summary.print(c, execute) - return nil + return readErr } // migrateOne migrates a single schedule (or prints the planned action in dry-run), updating diff --git a/tools/tdbg/schedule_migrate_test.go b/tools/tdbg/schedule_migrate_test.go index 1749b0f752..28616b8cd5 100644 --- a/tools/tdbg/schedule_migrate_test.go +++ b/tools/tdbg/schedule_migrate_test.go @@ -282,13 +282,14 @@ func TestMigrateSchedule_RejectsQueryWithoutFromVisibility(t *testing.T) { require.Contains(t, err.Error(), "from-visibility") } -func TestMigrateSchedule_RejectsWorkersWithoutFromVisibility(t *testing.T) { +func TestMigrateSchedule_RejectsWorkersWithScheduleID(t *testing.T) { + // --workers applies to the bulk modes (--from-visibility and stdin) but is meaningless when + // migrating a single --schedule-id, so it is rejected rather than silently ignored. factory := migrateClientFactory{admin: &migrateAdminClient{}, workflow: &migrateWorkflowClient{}} _, _, err := runMigrate(t, factory, "schedule", "migrate", "--target", "workflow", "--schedule-id", "x", "--workers", "4") require.Error(t, err) require.Contains(t, err.Error(), "workers") - require.Contains(t, err.Error(), "from-visibility") } func TestMigrateSchedule_FromVisibility_RejectsScheduleID(t *testing.T) { @@ -326,6 +327,34 @@ func TestMigrateSchedule_Stdin_Execute(t *testing.T) { } } +func TestMigrateSchedule_Stdin_Workers(t *testing.T) { + admin := &migrateAdminClient{} + factory := migrateClientFactory{admin: admin, workflow: &migrateWorkflowClient{}} + + const n = 12 + lines := make([]string, n) + want := make([]string, n) + for i := range n { + id := fmt.Sprintf("sched-%d", i) + lines[i] = fmt.Sprintf(`{"namespace":"my-ns","schedule_id":%q}`, id) + want[i] = id + } + + withStdin(t, strings.Join(lines, "\n"), func() { + _, _, err := runMigrate(t, factory, + "schedule", "migrate", "--target", "workflow", "--execute", "--workers", "4") + require.NoError(t, err) + }) + + require.Len(t, admin.requests, n) + got := make([]string, len(admin.requests)) + for i, req := range admin.requests { + got[i] = req.ScheduleId + } + // Order is not guaranteed with multiple workers. + require.ElementsMatch(t, want, got) +} + func TestMigrateSchedule_Stdin_DryRun(t *testing.T) { admin := &migrateAdminClient{} factory := migrateClientFactory{admin: admin, workflow: &migrateWorkflowClient{}} diff --git a/tools/tdbg/tdbg_commands.go b/tools/tdbg/tdbg_commands.go index 1d2e61681e..eb9fa89fef 100644 --- a/tools/tdbg/tdbg_commands.go +++ b/tools/tdbg/tdbg_commands.go @@ -332,11 +332,11 @@ func newAdminScheduleCommands(clientFactory ClientFactory) []*cli.Command { &cli.IntFlag{ Name: FlagWorkers, Value: defaultMigrateWorkers, - Usage: "Number of concurrent workers migrating schedules in --from-visibility mode", + Usage: "Number of concurrent workers migrating schedules in --from-visibility and stdin modes", }, &cli.StringFlag{ Name: FlagOutputLog, - Usage: "Path to write a structured (JSON lines) log of each migration result in --from-visibility mode", + Usage: "Path to write a structured (JSON lines) log of each migration result in --from-visibility and stdin modes", }, }, Action: func(c *cli.Context) error {