diff --git a/tools/tdbg/commands.go b/tools/tdbg/commands.go index a72c96ce911..68ab81c16e5 100644 --- a/tools/tdbg/commands.go +++ b/tools/tdbg/commands.go @@ -1,10 +1,14 @@ package tdbg import ( + "bufio" + "context" + "encoding/json" "errors" "fmt" "os" "strings" + "sync" "time" "github.com/fatih/color" @@ -27,6 +31,7 @@ import ( "go.temporal.io/server/common/primitives" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/service/history/tasks" + "go.temporal.io/server/service/worker/scheduler" "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" @@ -872,45 +877,391 @@ func AdminReplicateWorkflow( return nil } -// AdminMigrateSchedule migrates a schedule between V1 (workflow-backed) and V2 (CHASM). +// AdminMigrateSchedule migrates schedules between V1 (workflow-backed) and V2 (CHASM). +// +// It supports three mutually-exclusive selection modes, all sharing the required --target flag: +// - single: --schedule-id (performs immediately, as before) +// - from visibility: --from-visibility [--query ] (default query is chosen from --target: the +// running V1 schedules when migrating to chasm, the running V2 schedules when migrating to workflow) +// - stdin: JSON lines piped on stdin, one {"namespace":..., "schedule_id":...} per line +// +// The from-visibility and stdin modes default to a dry-run; pass --execute to perform the migration. func AdminMigrateSchedule(c *cli.Context, clientFactory ClientFactory) error { - ns, err := getRequiredOption(c, FlagNamespace) + target, targetStr, err := parseMigrateTarget(c) if err != nil { return err } - scheduleID, err := getRequiredOption(c, FlagScheduleID) - if err != nil { - return err + + fromVisibility := c.Bool(FlagFromVisibility) + scheduleID := c.String(FlagScheduleID) + + // --query only takes effect in --from-visibility mode; reject it elsewhere rather than + // silently ignoring it. + if !fromVisibility && c.IsSet(FlagVisibilityQuery) { + return fmt.Errorf("--%s is only valid with --%s", FlagVisibilityQuery, FlagFromVisibility) + } + // --workers applies to the bulk modes (--from-visibility and stdin); it has no effect when + // migrating a single --schedule-id, so reject it there rather than silently ignoring it. + if scheduleID != "" && c.IsSet(FlagWorkers) { + return fmt.Errorf("--%s is only valid with --%s or when piping JSON lines on stdin", FlagWorkers, FlagFromVisibility) + } + + switch { + case fromVisibility: + if scheduleID != "" { + return fmt.Errorf("--%s cannot be combined with --%s", FlagFromVisibility, FlagScheduleID) + } + return migrateSchedulesFromVisibility(c, clientFactory, target, targetStr) + case scheduleID != "": + return migrateSingleSchedule(c, clientFactory, target, targetStr, scheduleID) + case isStdinPiped(): + return migrateSchedulesFromStdin(c, clientFactory, target, targetStr) + default: + return fmt.Errorf("specify one of: --%s, --%s, or pipe JSON lines on stdin", FlagScheduleID, FlagFromVisibility) } +} + +func parseMigrateTarget(c *cli.Context) (adminservice.MigrateScheduleRequest_SchedulerTarget, string, error) { targetStr, err := getRequiredOption(c, FlagTarget) if err != nil { - return err + return 0, "", err } - var target adminservice.MigrateScheduleRequest_SchedulerTarget switch strings.ToLower(targetStr) { case "chasm": - target = adminservice.MigrateScheduleRequest_SCHEDULER_TARGET_CHASM + return adminservice.MigrateScheduleRequest_SCHEDULER_TARGET_CHASM, targetStr, nil case "workflow": - target = adminservice.MigrateScheduleRequest_SCHEDULER_TARGET_WORKFLOW + return adminservice.MigrateScheduleRequest_SCHEDULER_TARGET_WORKFLOW, targetStr, nil default: - return fmt.Errorf("invalid target %q, valid values are: chasm, workflow", targetStr) + return 0, "", fmt.Errorf("invalid target %q, valid values are: chasm, workflow", targetStr) + } +} + +// migrateSingleSchedule migrates one schedule and performs the migration immediately. +func migrateSingleSchedule( + c *cli.Context, + clientFactory ClientFactory, + target adminservice.MigrateScheduleRequest_SchedulerTarget, + targetStr string, + scheduleID string, +) error { + ns, err := getRequiredOption(c, FlagNamespace) + if err != nil { + return err } adminClient := clientFactory.AdminClient(c) ctx, cancel := newContext(c) defer cancel() - _, err = adminClient.MigrateSchedule(ctx, &adminservice.MigrateScheduleRequest{ + if err := migrateScheduleRPC(ctx, adminClient, ns, scheduleID, target); err != nil { + return fmt.Errorf("unable to migrate schedule: %w", err) + } + + _, _ = fmt.Fprintf(c.App.Writer, "Successfully initiated migration of schedule %q in namespace %q to %s.\n", scheduleID, ns, targetStr) + return nil +} + +// migrateSchedulesFromVisibility selects schedules via a visibility query and migrates each. +// When --query is not supplied the default query is chosen from the --target direction (see +// defaultMigrateQuery). +func migrateSchedulesFromVisibility( + c *cli.Context, + clientFactory ClientFactory, + target adminservice.MigrateScheduleRequest_SchedulerTarget, + targetStr string, +) error { + ns, err := getRequiredOption(c, FlagNamespace) + if err != nil { + return err + } + + query := c.String(FlagVisibilityQuery) + if query == "" { + query = defaultMigrateQuery(target) + } + + execute := c.Bool(FlagExecute) + workers := c.Int(FlagWorkers) + if workers < 1 { + workers = 1 + } + wfClient := clientFactory.WorkflowClient(c) + adminClient := clientFactory.AdminClient(c) + + // Schedules are listed (paginated) on this goroutine and fed to a pool of workers + // that migrate them concurrently. + var summary migrateSummary + closeLog, err := openMigrateLog(c, &summary) + if err != nil { + return err + } + defer closeLog() + jobs := make(chan migrateJob) + var wg sync.WaitGroup + for i := 0; i < workers; i++ { + wg.Go(func() { + for job := range jobs { + migrateOne(c, adminClient, job.namespace, job.scheduleID, target, targetStr, execute, &summary) + } + }) + } + + var listErr error + var nextPageToken []byte + for { + ctx, cancel := newContext(c) + resp, err := wfClient.ListWorkflowExecutions(ctx, &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: ns, + Query: query, + NextPageToken: nextPageToken, + }) + cancel() + if err != nil { + listErr = fmt.Errorf("unable to list schedules from visibility: %w", err) + break + } + + for _, exec := range resp.GetExecutions() { + workflowID := exec.GetExecution().GetWorkflowId() + // CHASM scheduler executions store the schedule id directly as the workflow id; + // TrimPrefix is a no-op for them and handles any V1 records defensively. + scheduleID := strings.TrimPrefix(workflowID, primitives.ScheduleWorkflowIDPrefix) + jobs <- migrateJob{namespace: ns, scheduleID: scheduleID} + } + + nextPageToken = resp.GetNextPageToken() + if len(nextPageToken) == 0 { + break + } + } + close(jobs) + wg.Wait() + + // Always report what was migrated before surfacing a listing error: if pagination fails + // partway through, workers may have already migrated the schedules listed so far, and the + // user needs to see that partial progress. + summary.print(c, execute) + return listErr +} + +type migrateJob struct { + namespace string + scheduleID string +} + +// defaultMigrateQuery returns the visibility query selecting which schedules to migrate when +// --query is not supplied. The direction is inferred from --target: migrating to CHASM (V2) +// selects the running V1 (workflow-backed) schedules to move forward, while migrating to +// workflow (V1) selects the running V2 (CHASM) schedules to roll back. +func defaultMigrateQuery(target adminservice.MigrateScheduleRequest_SchedulerTarget) string { + if target == adminservice.MigrateScheduleRequest_SCHEDULER_TARGET_CHASM { + // Forward migration V1 -> V2: all running V1 (workflow-backed) schedules. + return fmt.Sprintf("TemporalNamespaceDivision = '%s' AND ExecutionStatus = 'Running'", scheduler.NamespaceDivision) + } + // Rollback V2 -> V1: all running V2 (CHASM) schedules. The explicit TemporalNamespaceDivision + // filter is required, otherwise the visibility query converter appends + // "TemporalNamespaceDivision IS NULL" and excludes CHASM executions. + return fmt.Sprintf("TemporalNamespaceDivision = '%d' AND ExecutionStatus = 'Running'", chasm.SchedulerArchetypeID) +} + +// openMigrateLog wires summary.logEnc to the --output-log file when the flag is set, returning a +// cleanup func that closes the file (a no-op when the flag is unset). +func openMigrateLog(c *cli.Context, summary *migrateSummary) (func(), error) { + logPath := c.String(FlagOutputLog) + if logPath == "" { + return func() {}, nil + } + logFile, err := os.Create(logPath) + if err != nil { + return nil, fmt.Errorf("unable to open output log %q: %w", logPath, err) + } + summary.logEnc = json.NewEncoder(logFile) + return func() { _ = logFile.Close() }, nil +} + +// migrateSchedulesFromStdin reads JSON lines from stdin, one {"namespace","schedule_id"} per line, +// feeding them to a pool of --workers goroutines that migrate them concurrently (mirroring +// --from-visibility mode). With the default of one worker, lines are processed in order. +func migrateSchedulesFromStdin( + c *cli.Context, + clientFactory ClientFactory, + target adminservice.MigrateScheduleRequest_SchedulerTarget, + targetStr string, +) error { + execute := c.Bool(FlagExecute) + workers := c.Int(FlagWorkers) + if workers < 1 { + workers = 1 + } + adminClient := clientFactory.AdminClient(c) + + var summary migrateSummary + closeLog, err := openMigrateLog(c, &summary) + if err != nil { + return err + } + defer closeLog() + + jobs := make(chan migrateJob) + var wg sync.WaitGroup + for i := 0; i < workers; i++ { + wg.Go(func() { + for job := range jobs { + migrateOne(c, adminClient, job.namespace, job.scheduleID, target, targetStr, execute, &summary) + } + }) + } + + var readErr error + scanner := bufio.NewScanner(os.Stdin) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line == "" { + continue + } + var record struct { + Namespace string `json:"namespace"` + ScheduleID string `json:"schedule_id"` + } + if err := json.Unmarshal([]byte(line), &record); err != nil { + readErr = fmt.Errorf("invalid JSON line %q: %w", line, err) + break + } + if record.Namespace == "" || record.ScheduleID == "" { + readErr = fmt.Errorf("each line must include non-empty \"namespace\" and \"schedule_id\": %q", line) + break + } + jobs <- migrateJob{namespace: record.Namespace, scheduleID: record.ScheduleID} + } + if readErr == nil { + if err := scanner.Err(); err != nil { + readErr = fmt.Errorf("error reading stdin: %w", err) + } + } + close(jobs) + wg.Wait() + + // Always report what was migrated before surfacing a read error: workers may have already + // migrated the lines read so far, and the user needs to see that partial progress. + summary.print(c, execute) + return readErr +} + +// migrateOne migrates a single schedule (or prints the planned action in dry-run), updating +// summary. It is safe to call concurrently from multiple workers: the migration RPC runs +// outside the summary lock, while counter updates and output are serialized. +func migrateOne( + c *cli.Context, + adminClient adminservice.AdminServiceClient, + ns string, + scheduleID string, + target adminservice.MigrateScheduleRequest_SchedulerTarget, + targetStr string, + execute bool, + summary *migrateSummary, +) { + if !execute { + summary.recordDryRun(c, ns, scheduleID, targetStr) + return + } + + ctx, cancel := newContext(c) + defer cancel() + err := migrateScheduleRPC(ctx, adminClient, ns, scheduleID, target) + summary.recordResult(c, ns, scheduleID, targetStr, err) +} + +func migrateScheduleRPC( + ctx context.Context, + adminClient adminservice.AdminServiceClient, + ns string, + scheduleID string, + target adminservice.MigrateScheduleRequest_SchedulerTarget, +) error { + _, err := adminClient.MigrateSchedule(ctx, &adminservice.MigrateScheduleRequest{ Namespace: ns, ScheduleId: scheduleID, Target: target, Identity: getCurrentUserFromEnv(), RequestId: uuid.NewString(), }) + return err +} + +// migrateLogRecord is one structured entry written to --output-log per schedule. +type migrateLogRecord struct { + Timestamp string `json:"timestamp"` + Namespace string `json:"namespace"` + ScheduleID string `json:"schedule_id"` + Target string `json:"target"` + Status string `json:"status"` // "migrated", "failed", or "dry-run" + Error string `json:"error,omitempty"` +} + +type migrateSummary struct { + mu sync.Mutex + planned int + migrated int + failed int + logEnc *json.Encoder // optional; writes one migrateLogRecord per result +} + +func (s *migrateSummary) recordDryRun(c *cli.Context, ns, scheduleID, targetStr string) { + s.mu.Lock() + defer s.mu.Unlock() + s.planned++ + _, _ = fmt.Fprintf(c.App.Writer, "[dry-run] would migrate %s/%s -> %s\n", ns, scheduleID, targetStr) + s.writeLogLocked(ns, scheduleID, targetStr, "dry-run", nil) +} + +func (s *migrateSummary) recordResult(c *cli.Context, ns, scheduleID, targetStr string, err error) { + s.mu.Lock() + defer s.mu.Unlock() + s.planned++ if err != nil { - return fmt.Errorf("unable to migrate schedule: %w", err) + s.failed++ + _, _ = fmt.Fprintf(c.App.ErrWriter, "failed to migrate %s/%s: %v\n", ns, scheduleID, err) + s.writeLogLocked(ns, scheduleID, targetStr, "failed", err) + return + } + s.migrated++ + _, _ = fmt.Fprintf(c.App.Writer, "migrated %s/%s -> %s\n", ns, scheduleID, targetStr) + s.writeLogLocked(ns, scheduleID, targetStr, "migrated", nil) +} + +// writeLogLocked appends a structured record to the output log. Callers must hold s.mu. +func (s *migrateSummary) writeLogLocked(ns, scheduleID, targetStr, status string, err error) { + if s.logEnc == nil { + return + } + rec := migrateLogRecord{ + Timestamp: time.Now().UTC().Format(time.RFC3339), + Namespace: ns, + ScheduleID: scheduleID, + Target: targetStr, + Status: status, + } + if err != nil { + rec.Error = err.Error() } + _ = s.logEnc.Encode(&rec) +} - _, _ = fmt.Fprintf(c.App.Writer, "Successfully initiated migration of schedule %q in namespace %q to %s.\n", scheduleID, ns, targetStr) - return nil +func (s *migrateSummary) print(c *cli.Context, execute bool) { + s.mu.Lock() + defer s.mu.Unlock() + if !execute { + _, _ = fmt.Fprintf(c.App.Writer, "Dry-run: %d schedule(s) would be migrated. Re-run with --%s to perform.\n", s.planned, FlagExecute) + return + } + _, _ = fmt.Fprintf(c.App.Writer, "Done: %d migrated, %d failed (of %d).\n", s.migrated, s.failed, s.planned) +} + +// isStdinPiped reports whether stdin is connected to a pipe or file rather than a terminal. +func isStdinPiped() bool { + fi, err := os.Stdin.Stat() + if err != nil { + return false + } + return (fi.Mode() & os.ModeCharDevice) == 0 } diff --git a/tools/tdbg/flags.go b/tools/tdbg/flags.go index ee9e558cd3b..ee35fcbed13 100644 --- a/tools/tdbg/flags.go +++ b/tools/tdbg/flags.go @@ -80,4 +80,10 @@ var ( FlagScheduleID = "schedule-id" FlagScheduleIDAlias = []string{"sid"} FlagTarget = "target" + FlagFromVisibility = "from-visibility" + FlagExecute = "execute" + FlagWorkers = "workers" + FlagOutputLog = "output-log" ) + +const defaultMigrateWorkers = 5 diff --git a/tools/tdbg/schedule_migrate_test.go b/tools/tdbg/schedule_migrate_test.go new file mode 100644 index 00000000000..28616b8cd5f --- /dev/null +++ b/tools/tdbg/schedule_migrate_test.go @@ -0,0 +1,421 @@ +package tdbg_test + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "strings" + "sync" + "testing" + + "github.com/stretchr/testify/require" + "github.com/urfave/cli/v2" + commonpb "go.temporal.io/api/common/v1" + workflowpb "go.temporal.io/api/workflow/v1" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/server/api/adminservice/v1" + "go.temporal.io/server/chasm" + "go.temporal.io/server/service/worker/scheduler" + "go.temporal.io/server/tools/tdbg" + "go.temporal.io/server/tools/tdbg/tdbgtest" + "google.golang.org/grpc" +) + +type migrateAdminClient struct { + adminservice.AdminServiceClient + err error + failIDs map[string]string // scheduleID -> error message + + mu sync.Mutex + requests []*adminservice.MigrateScheduleRequest +} + +func (c *migrateAdminClient) MigrateSchedule( + _ context.Context, + req *adminservice.MigrateScheduleRequest, + _ ...grpc.CallOption, +) (*adminservice.MigrateScheduleResponse, error) { + c.mu.Lock() + c.requests = append(c.requests, req) + c.mu.Unlock() + if c.err != nil { + return nil, c.err + } + if msg, ok := c.failIDs[req.ScheduleId]; ok { + return nil, errors.New(msg) + } + return &adminservice.MigrateScheduleResponse{}, nil +} + +type migrateWorkflowClient struct { + workflowservice.WorkflowServiceClient + pages []*workflowservice.ListWorkflowExecutionsResponse + next int + requests []*workflowservice.ListWorkflowExecutionsRequest +} + +func (c *migrateWorkflowClient) ListWorkflowExecutions( + _ context.Context, + req *workflowservice.ListWorkflowExecutionsRequest, + _ ...grpc.CallOption, +) (*workflowservice.ListWorkflowExecutionsResponse, error) { + c.requests = append(c.requests, req) + if c.next >= len(c.pages) { + return &workflowservice.ListWorkflowExecutionsResponse{}, nil + } + resp := c.pages[c.next] + c.next++ + return resp, nil +} + +type migrateClientFactory struct { + admin adminservice.AdminServiceClient + workflow workflowservice.WorkflowServiceClient +} + +func (f migrateClientFactory) AdminClient(*cli.Context) adminservice.AdminServiceClient { + return f.admin +} + +func (f migrateClientFactory) WorkflowClient(*cli.Context) workflowservice.WorkflowServiceClient { + return f.workflow +} + +func scheduleExecution(workflowID string) *workflowpb.WorkflowExecutionInfo { + return &workflowpb.WorkflowExecutionInfo{ + Execution: &commonpb.WorkflowExecution{WorkflowId: workflowID}, + } +} + +func runMigrate(t *testing.T, factory tdbg.ClientFactory, args ...string) (stdoutStr, stderrStr string, err error) { + t.Helper() + var stdout, stderr bytes.Buffer + app := tdbgtest.NewCliApp(func(params *tdbg.Params) { + params.ClientFactory = factory + params.Writer = &stdout + params.ErrWriter = &stderr + }) + runArgs := append([]string{"tdbg"}, args...) + err = app.Run(runArgs) + return stdout.String(), stderr.String(), err +} + +func TestMigrateSchedule_FromVisibility_DryRun(t *testing.T) { + admin := &migrateAdminClient{} + wf := &migrateWorkflowClient{ + pages: []*workflowservice.ListWorkflowExecutionsResponse{ + { + Executions: []*workflowpb.WorkflowExecutionInfo{scheduleExecution("sched-a")}, + NextPageToken: []byte("page2"), + }, + { + Executions: []*workflowpb.WorkflowExecutionInfo{scheduleExecution("sched-b")}, + }, + }, + } + factory := migrateClientFactory{admin: admin, workflow: wf} + + stdout, _, err := runMigrate(t, factory, + "-n", "my-ns", "schedule", "migrate", "--target", "workflow", "--from-visibility") + require.NoError(t, err) + + // Dry-run performs no migrations. + require.Empty(t, admin.requests) + // Default query is built from the CHASM scheduler archetype ID and scoped to --namespace. + require.NotEmpty(t, wf.requests) + expectedQuery := fmt.Sprintf("TemporalNamespaceDivision = '%d' AND ExecutionStatus = 'Running'", chasm.SchedulerArchetypeID) + require.Equal(t, expectedQuery, wf.requests[0].Query) + require.Equal(t, "my-ns", wf.requests[0].Namespace) + // Both pages are listed and reported. + require.Contains(t, stdout, "[dry-run] would migrate my-ns/sched-a -> workflow") + require.Contains(t, stdout, "[dry-run] would migrate my-ns/sched-b -> workflow") + require.Contains(t, stdout, "Dry-run: 2 schedule(s)") +} + +func TestMigrateSchedule_FromVisibility_Execute(t *testing.T) { + admin := &migrateAdminClient{} + wf := &migrateWorkflowClient{ + pages: []*workflowservice.ListWorkflowExecutionsResponse{ + { + Executions: []*workflowpb.WorkflowExecutionInfo{ + // CHASM execution: workflow id is the schedule id directly. + scheduleExecution("sched-v2"), + // V1 execution: the scheduler workflow-id prefix is trimmed off. + scheduleExecution("temporal-sys-scheduler:sched-v1"), + }, + }, + }, + } + factory := migrateClientFactory{admin: admin, workflow: wf} + + _, _, err := runMigrate(t, factory, + "-n", "my-ns", "schedule", "migrate", "--target", "workflow", "--from-visibility", "--execute") + require.NoError(t, err) + + require.Len(t, admin.requests, 2) + ids := []string{admin.requests[0].ScheduleId, admin.requests[1].ScheduleId} + require.ElementsMatch(t, []string{"sched-v2", "sched-v1"}, ids) + for _, req := range admin.requests { + require.Equal(t, "my-ns", req.Namespace) + require.Equal(t, adminservice.MigrateScheduleRequest_SCHEDULER_TARGET_WORKFLOW, req.Target) + require.NotEmpty(t, req.RequestId) + } +} + +func TestMigrateSchedule_FromVisibility_Workers(t *testing.T) { + admin := &migrateAdminClient{} + const n = 12 + execs := make([]*workflowpb.WorkflowExecutionInfo, n) + want := make([]string, n) + for i := range n { + id := fmt.Sprintf("sched-%d", i) + execs[i] = scheduleExecution(id) + want[i] = id + } + wf := &migrateWorkflowClient{ + pages: []*workflowservice.ListWorkflowExecutionsResponse{{Executions: execs}}, + } + factory := migrateClientFactory{admin: admin, workflow: wf} + + _, _, err := runMigrate(t, factory, + "-n", "my-ns", "schedule", "migrate", "--target", "workflow", "--from-visibility", + "--execute", "--workers", "4") + require.NoError(t, err) + + require.Len(t, admin.requests, n) + got := make([]string, len(admin.requests)) + for i, req := range admin.requests { + got[i] = req.ScheduleId + } + require.ElementsMatch(t, want, got) +} + +func TestMigrateSchedule_FromVisibility_OutputLog(t *testing.T) { + admin := &migrateAdminClient{failIDs: map[string]string{"sched-bad": "boom"}} + wf := &migrateWorkflowClient{ + pages: []*workflowservice.ListWorkflowExecutionsResponse{{ + Executions: []*workflowpb.WorkflowExecutionInfo{ + scheduleExecution("sched-ok"), + scheduleExecution("sched-bad"), + }, + }}, + } + factory := migrateClientFactory{admin: admin, workflow: wf} + + logPath := filepath.Join(t.TempDir(), "migrations.jsonl") + _, _, err := runMigrate(t, factory, + "-n", "my-ns", "schedule", "migrate", "--target", "workflow", "--from-visibility", + "--execute", "--output-log", logPath) + require.NoError(t, err) + + type logRec struct { + Timestamp string `json:"timestamp"` + Namespace string `json:"namespace"` + ScheduleID string `json:"schedule_id"` + Target string `json:"target"` + Status string `json:"status"` + Error string `json:"error"` + } + data, err := os.ReadFile(logPath) + require.NoError(t, err) + byID := map[string]logRec{} + for _, line := range strings.Split(strings.TrimSpace(string(data)), "\n") { + var rec logRec + require.NoError(t, json.Unmarshal([]byte(line), &rec)) + byID[rec.ScheduleID] = rec + } + + require.Len(t, byID, 2) + + ok := byID["sched-ok"] + require.Equal(t, "migrated", ok.Status) + require.Equal(t, "my-ns", ok.Namespace) + require.Equal(t, "workflow", ok.Target) + require.Empty(t, ok.Error) + require.NotEmpty(t, ok.Timestamp) + + bad := byID["sched-bad"] + require.Equal(t, "failed", bad.Status) + require.Contains(t, bad.Error, "boom") +} + +func TestMigrateSchedule_FromVisibility_CustomQuery(t *testing.T) { + admin := &migrateAdminClient{} + wf := &migrateWorkflowClient{} + factory := migrateClientFactory{admin: admin, workflow: wf} + + customQuery := "TemporalNamespaceDivision = '403648407' AND ScheduleId = 'only-this'" + _, _, err := runMigrate(t, factory, + "-n", "my-ns", "schedule", "migrate", "--target", "workflow", "--from-visibility", + "--query", customQuery) + require.NoError(t, err) + + require.NotEmpty(t, wf.requests) + require.Equal(t, customQuery, wf.requests[0].Query) +} + +func TestMigrateSchedule_FromVisibility_DefaultQueryToChasm(t *testing.T) { + admin := &migrateAdminClient{} + wf := &migrateWorkflowClient{} + factory := migrateClientFactory{admin: admin, workflow: wf} + + // Migrating to chasm (V1 -> V2) defaults to selecting running V1 (workflow-backed) schedules. + _, _, err := runMigrate(t, factory, + "-n", "my-ns", "schedule", "migrate", "--target", "chasm", "--from-visibility") + require.NoError(t, err) + + require.NotEmpty(t, wf.requests) + expectedQuery := fmt.Sprintf("TemporalNamespaceDivision = '%s' AND ExecutionStatus = 'Running'", scheduler.NamespaceDivision) + require.Equal(t, expectedQuery, wf.requests[0].Query) +} + +func TestMigrateSchedule_RejectsQueryWithoutFromVisibility(t *testing.T) { + factory := migrateClientFactory{admin: &migrateAdminClient{}, workflow: &migrateWorkflowClient{}} + _, _, err := runMigrate(t, factory, + "schedule", "migrate", "--target", "workflow", "--schedule-id", "x", "--query", "ScheduleId = 'x'") + require.Error(t, err) + require.Contains(t, err.Error(), "query") + require.Contains(t, err.Error(), "from-visibility") +} + +func TestMigrateSchedule_RejectsWorkersWithScheduleID(t *testing.T) { + // --workers applies to the bulk modes (--from-visibility and stdin) but is meaningless when + // migrating a single --schedule-id, so it is rejected rather than silently ignored. + factory := migrateClientFactory{admin: &migrateAdminClient{}, workflow: &migrateWorkflowClient{}} + _, _, err := runMigrate(t, factory, + "schedule", "migrate", "--target", "workflow", "--schedule-id", "x", "--workers", "4") + require.Error(t, err) + require.Contains(t, err.Error(), "workers") +} + +func TestMigrateSchedule_FromVisibility_RejectsScheduleID(t *testing.T) { + factory := migrateClientFactory{admin: &migrateAdminClient{}, workflow: &migrateWorkflowClient{}} + _, _, err := runMigrate(t, factory, + "schedule", "migrate", "--target", "workflow", "--from-visibility", "--schedule-id", "x") + require.Error(t, err) + require.Contains(t, err.Error(), "from-visibility") + require.Contains(t, err.Error(), "schedule-id") +} + +func TestMigrateSchedule_Stdin_Execute(t *testing.T) { + admin := &migrateAdminClient{} + factory := migrateClientFactory{admin: admin, workflow: &migrateWorkflowClient{}} + + stdin := strings.Join([]string{ + `{"namespace":"ns-1","schedule_id":"sched-1"}`, + ``, // blank lines are skipped + `{"namespace":"ns-2","schedule_id":"sched-2"}`, + }, "\n") + + withStdin(t, stdin, func() { + _, _, err := runMigrate(t, factory, + "schedule", "migrate", "--target", "workflow", "--execute") + require.NoError(t, err) + }) + + require.Len(t, admin.requests, 2) + require.Equal(t, "ns-1", admin.requests[0].Namespace) + require.Equal(t, "sched-1", admin.requests[0].ScheduleId) + require.Equal(t, "ns-2", admin.requests[1].Namespace) + require.Equal(t, "sched-2", admin.requests[1].ScheduleId) + for _, req := range admin.requests { + require.Equal(t, adminservice.MigrateScheduleRequest_SCHEDULER_TARGET_WORKFLOW, req.Target) + } +} + +func TestMigrateSchedule_Stdin_Workers(t *testing.T) { + admin := &migrateAdminClient{} + factory := migrateClientFactory{admin: admin, workflow: &migrateWorkflowClient{}} + + const n = 12 + lines := make([]string, n) + want := make([]string, n) + for i := range n { + id := fmt.Sprintf("sched-%d", i) + lines[i] = fmt.Sprintf(`{"namespace":"my-ns","schedule_id":%q}`, id) + want[i] = id + } + + withStdin(t, strings.Join(lines, "\n"), func() { + _, _, err := runMigrate(t, factory, + "schedule", "migrate", "--target", "workflow", "--execute", "--workers", "4") + require.NoError(t, err) + }) + + require.Len(t, admin.requests, n) + got := make([]string, len(admin.requests)) + for i, req := range admin.requests { + got[i] = req.ScheduleId + } + // Order is not guaranteed with multiple workers. + require.ElementsMatch(t, want, got) +} + +func TestMigrateSchedule_Stdin_DryRun(t *testing.T) { + admin := &migrateAdminClient{} + factory := migrateClientFactory{admin: admin, workflow: &migrateWorkflowClient{}} + + withStdin(t, `{"namespace":"ns-1","schedule_id":"sched-1"}`, func() { + stdout, _, err := runMigrate(t, factory, "schedule", "migrate", "--target", "workflow") + require.NoError(t, err) + require.Contains(t, stdout, "[dry-run] would migrate ns-1/sched-1 -> workflow") + }) + + require.Empty(t, admin.requests) +} + +func TestMigrateSchedule_Stdin_OutputLog(t *testing.T) { + admin := &migrateAdminClient{} + factory := migrateClientFactory{admin: admin, workflow: &migrateWorkflowClient{}} + + logPath := filepath.Join(t.TempDir(), "migrations.jsonl") + withStdin(t, `{"namespace":"ns-1","schedule_id":"sched-1"}`, func() { + _, _, err := runMigrate(t, factory, + "schedule", "migrate", "--target", "workflow", "--execute", "--output-log", logPath) + require.NoError(t, err) + }) + + data, err := os.ReadFile(logPath) + require.NoError(t, err) + var rec struct { + Namespace string `json:"namespace"` + ScheduleID string `json:"schedule_id"` + Target string `json:"target"` + Status string `json:"status"` + } + require.NoError(t, json.Unmarshal([]byte(strings.TrimSpace(string(data))), &rec)) + require.Equal(t, "ns-1", rec.Namespace) + require.Equal(t, "sched-1", rec.ScheduleID) + require.Equal(t, "workflow", rec.Target) + require.Equal(t, "migrated", rec.Status) +} + +// withStdin redirects os.Stdin to a temp file containing content for the duration of fn. +// A regular file is not a character device, so the command's piped-stdin detection treats it +// as piped input. +// +// NOTE: os.Stdin is process-global, so tests using withStdin must NOT call t.Parallel(). +// TODO: inject the input reader through tdbg.Params instead of mutating os.Stdin, which would +// remove this constraint (deferred to a follow-up PR to keep this change small). +func withStdin(t *testing.T, content string, fn func()) { + t.Helper() + f, err := os.CreateTemp(t.TempDir(), "stdin") + require.NoError(t, err) + _, err = f.WriteString(content) + require.NoError(t, err) + require.NoError(t, f.Sync()) + _, err = f.Seek(0, 0) + require.NoError(t, err) + + orig := os.Stdin + os.Stdin = f + defer func() { + os.Stdin = orig + _ = f.Close() + }() + fn() +} diff --git a/tools/tdbg/tdbg_commands.go b/tools/tdbg/tdbg_commands.go index c815978b218..eb9fa89fefa 100644 --- a/tools/tdbg/tdbg_commands.go +++ b/tools/tdbg/tdbg_commands.go @@ -304,16 +304,40 @@ func newAdminScheduleCommands(clientFactory ClientFactory) []*cli.Command { Usage: "Migrate a schedule between V1 (workflow-backed) and V2 (CHASM)", Flags: []cli.Flag{ &cli.StringFlag{ - Name: FlagScheduleID, - Aliases: FlagScheduleIDAlias, - Usage: "Schedule ID", - Required: true, + Name: FlagScheduleID, + Aliases: FlagScheduleIDAlias, + Usage: "Schedule ID (single-schedule mode)", }, &cli.StringFlag{ Name: FlagTarget, Usage: "Target scheduler implementation: chasm, workflow", Required: true, }, + &cli.BoolFlag{ + Name: FlagFromVisibility, + Usage: "Select schedules from visibility instead of --schedule-id, scoped to --namespace. " + + "The default query is chosen from --target: migrating to chasm selects running V1 schedules, " + + "migrating to workflow selects running V2 schedules. Override with --query", + }, + &cli.StringFlag{ + Name: FlagVisibilityQuery, + Usage: "Visibility query used with --from-visibility, overriding the target-based default. The defaults are:\n" + + "\tV1 (workflow-backed): TemporalNamespaceDivision = 'TemporalScheduler' AND ExecutionStatus = 'Running'\n" + + "\tV2 (CHASM): TemporalNamespaceDivision = '' AND ExecutionStatus = 'Running'", + }, + &cli.BoolFlag{ + Name: FlagExecute, + Usage: "Perform the migration. Without this flag, --from-visibility and stdin modes only print what they would do (dry-run)", + }, + &cli.IntFlag{ + Name: FlagWorkers, + Value: defaultMigrateWorkers, + Usage: "Number of concurrent workers migrating schedules in --from-visibility and stdin modes", + }, + &cli.StringFlag{ + Name: FlagOutputLog, + Usage: "Path to write a structured (JSON lines) log of each migration result in --from-visibility and stdin modes", + }, }, Action: func(c *cli.Context) error { return AdminMigrateSchedule(c, clientFactory)