diff --git a/internal/pgengine/access.go b/internal/pgengine/access.go index 66716aed..182957e1 100644 --- a/internal/pgengine/access.go +++ b/internal/pgengine/access.go @@ -28,7 +28,7 @@ func (pge *PgEngine) IsAlive() bool { } // LogTaskExecution will log current chain element execution status including retcode -func (pge *PgEngine) LogTaskExecution(ctx context.Context, task *ChainTask, retCode int, output string) { +func (pge *PgEngine) LogTaskExecution(ctx context.Context, task *ChainTask, retCode int, output string, params string) { switch pge.Logging.LogDBLevel { case "none": return @@ -38,12 +38,12 @@ func (pge *PgEngine) LogTaskExecution(ctx context.Context, task *ChainTask, retC } } _, err := pge.ConfigDb.Exec(ctx, `INSERT INTO timetable.execution_log ( -chain_id, task_id, command, kind, last_run, finished, returncode, pid, output, client_name, txid, ignore_error) -VALUES ($1, $2, $3, $4, clock_timestamp() - $5 :: interval, clock_timestamp(), $6, $7, NULLIF($8, ''), $9, $10, $11)`, +chain_id, task_id, command, kind, last_run, finished, returncode, pid, output, client_name, txid, ignore_error, params) +VALUES ($1, $2, $3, $4, clock_timestamp() - $5 :: interval, clock_timestamp(), $6, $7, NULLIF($8, ''), $9, $10, $11, $12)`, task.ChainID, task.TaskID, task.Command, task.Kind, fmt.Sprintf("%f seconds", float64(task.Duration)/1000000), retCode, pge.Getsid(), strings.TrimSpace(output), pge.ClientName, task.Vxid, - task.IgnoreError) + task.IgnoreError, params) if err != nil { pge.l.WithError(err).Error("Failed to log chain element execution status") } diff --git a/internal/pgengine/access_test.go b/internal/pgengine/access_test.go index e7f3f98c..4cb06e47 100644 --- a/internal/pgengine/access_test.go +++ b/internal/pgengine/access_test.go @@ -107,9 +107,9 @@ func TestLogChainElementExecution(t *testing.T) { t.Run("Check LogChainElementExecution if sql fails", func(*testing.T) { mockPool.ExpectExec("INSERT INTO .*execution_log").WithArgs( pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), - pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg()). + pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg()). WillReturnError(errors.New("Failed to log chain element execution status")) - pge.LogTaskExecution(context.Background(), &pgengine.ChainTask{}, 0, "STATUS") + pge.LogTaskExecution(context.Background(), &pgengine.ChainTask{}, 0, "STATUS", "") }) assert.NoError(t, mockPool.ExpectationsWereMet(), "there were unfulfilled expectations") diff --git a/internal/pgengine/migration.go b/internal/pgengine/migration.go index 7b93c8d9..6486480c 100644 --- a/internal/pgengine/migration.go +++ b/internal/pgengine/migration.go @@ -147,6 +147,12 @@ var Migrations func() migrator.Option = func() migrator.Option { return ExecuteMigrationScript(ctx, tx, "00721.sql") }, }, + &migrator.Migration{ + Name: "00733 Add params column to timetable.execution_log table", + Func: func(ctx context.Context, tx pgx.Tx) error { + return ExecuteMigrationScript(ctx, tx, "00733.sql") + }, + }, // adding new migration here, update "timetable"."migration" in "sql/init.sql" // and "dbapi" variable in main.go! diff --git a/internal/pgengine/pgengine_test.go b/internal/pgengine/pgengine_test.go index 6e3cf7de..7f4255e2 100644 --- a/internal/pgengine/pgengine_test.go +++ b/internal/pgengine/pgengine_test.go @@ -135,7 +135,7 @@ func TestSchedulerFunctions(t *testing.T) { assert.NoError(t, err, "Should start transaction") assert.Greater(t, txid, int64(0), "Should return transaction id") f := func(sql string, params []string) error { - _, err := pge.ExecuteSQLCommand(ctx, tx, sql, params) + err := pge.ExecuteSQLCommand(ctx, tx, &pgengine.ChainTask{Command: sql}, params) return err } assert.Error(t, f("", nil), "Should error for empty script") diff --git a/internal/pgengine/sql/ddl.sql b/internal/pgengine/sql/ddl.sql index 8bff1d76..00ef3d49 100644 --- a/internal/pgengine/sql/ddl.sql +++ b/internal/pgengine/sql/ddl.sql @@ -118,7 +118,8 @@ CREATE TABLE timetable.execution_log ( kind timetable.command_kind, command TEXT, output TEXT, - client_name TEXT NOT NULL + client_name TEXT NOT NULL, + params TEXT ); COMMENT ON TABLE timetable.execution_log IS diff --git a/internal/pgengine/sql/init.sql b/internal/pgengine/sql/init.sql index 2b1b9eb8..97572c76 100644 --- a/internal/pgengine/sql/init.sql +++ b/internal/pgengine/sql/init.sql @@ -27,4 +27,5 @@ VALUES (11, '00573 Add ability to start a chain with delay'), (12, '00575 Add on_error handling'), (13, '00629 Add ignore_error column to timetable.execution_log'), - (14, '00721 Add more job control functions'); \ No newline at end of file + (14, '00721 Add more job control functions'), + (15, '00733 Add params column to timetable.execution_log table'); \ No newline at end of file diff --git a/internal/pgengine/sql/migrations/00733.sql b/internal/pgengine/sql/migrations/00733.sql new file mode 100644 index 00000000..436d15d5 --- /dev/null +++ b/internal/pgengine/sql/migrations/00733.sql @@ -0,0 +1,4 @@ +-- Add params column to execution_log table to store parameter values used during task execution +ALTER TABLE timetable.execution_log ADD COLUMN params TEXT; + +COMMENT ON COLUMN timetable.execution_log.params IS 'Array of parameter values used during task execution'; diff --git a/internal/pgengine/transaction.go b/internal/pgengine/transaction.go index 50dac9ee..f0446463 100644 --- a/internal/pgengine/transaction.go +++ b/internal/pgengine/transaction.go @@ -10,7 +10,6 @@ import ( "github.com/cybertec-postgresql/pg_timetable/internal/log" pgx "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgconn" ) // StartTransaction returns transaction object, virtual transaction id and error @@ -56,7 +55,7 @@ func (pge *PgEngine) MustRollbackToSavepoint(ctx context.Context, tx pgx.Tx, tas } // ExecuteSQLTask executes SQL task -func (pge *PgEngine) ExecuteSQLTask(ctx context.Context, tx pgx.Tx, task *ChainTask, paramValues []string) (out string, err error) { +func (pge *PgEngine) ExecuteSQLTask(ctx context.Context, tx pgx.Tx, task *ChainTask, paramValues []string) (err error) { switch { case task.IsRemote(): return pge.ExecRemoteSQLTask(ctx, task, paramValues) @@ -68,15 +67,15 @@ func (pge *PgEngine) ExecuteSQLTask(ctx context.Context, tx pgx.Tx, task *ChainT } // ExecLocalSQLTask executes local task in the chain transaction -func (pge *PgEngine) ExecLocalSQLTask(ctx context.Context, tx pgx.Tx, task *ChainTask, paramValues []string) (out string, err error) { +func (pge *PgEngine) ExecLocalSQLTask(ctx context.Context, tx pgx.Tx, task *ChainTask, paramValues []string) (err error) { if err := pge.SetRole(ctx, tx, task.RunAs); err != nil { - return "", err + return err } if task.IgnoreError { pge.MustSavepoint(ctx, tx, task.TaskID) } pge.SetCurrentTaskContext(ctx, tx, task.ChainID, task.TaskID) - out, err = pge.ExecuteSQLCommand(ctx, tx, task.Command, paramValues) + err = pge.ExecuteSQLCommand(ctx, tx, task, paramValues) if err != nil && task.IgnoreError { pge.MustRollbackToSavepoint(ctx, tx, task.TaskID) } @@ -87,21 +86,21 @@ func (pge *PgEngine) ExecLocalSQLTask(ctx context.Context, tx pgx.Tx, task *Chai } // ExecStandaloneTask executes task against the provided connection interface, it can be remote connection or acquired connection from the pool -func (pge *PgEngine) ExecStandaloneTask(ctx context.Context, connf func() (PgxConnIface, error), task *ChainTask, paramValues []string) (string, error) { +func (pge *PgEngine) ExecStandaloneTask(ctx context.Context, connf func() (PgxConnIface, error), task *ChainTask, paramValues []string) error { conn, err := connf() if err != nil { - return "", err + return err } defer pge.FinalizeDBConnection(ctx, conn) if err := pge.SetRole(ctx, conn, task.RunAs); err != nil { - return "", err + return err } pge.SetCurrentTaskContext(ctx, conn, task.ChainID, task.TaskID) - return pge.ExecuteSQLCommand(ctx, conn, task.Command, paramValues) + return pge.ExecuteSQLCommand(ctx, conn, task, paramValues) } // ExecRemoteSQLTask executes task against remote connection -func (pge *PgEngine) ExecRemoteSQLTask(ctx context.Context, task *ChainTask, paramValues []string) (string, error) { +func (pge *PgEngine) ExecRemoteSQLTask(ctx context.Context, task *ChainTask, paramValues []string) error { log.GetLogger(ctx).Info("Switching to remote task mode") return pge.ExecStandaloneTask(ctx, func() (PgxConnIface, error) { return pge.GetRemoteDBConnection(ctx, task.ConnectString) }, @@ -109,7 +108,7 @@ func (pge *PgEngine) ExecRemoteSQLTask(ctx context.Context, task *ChainTask, par } // ExecAutonomousSQLTask executes autonomous task in an acquired connection from pool -func (pge *PgEngine) ExecAutonomousSQLTask(ctx context.Context, task *ChainTask, paramValues []string) (string, error) { +func (pge *PgEngine) ExecAutonomousSQLTask(ctx context.Context, task *ChainTask, paramValues []string) error { log.GetLogger(ctx).Info("Switching to autonomous task mode") return pge.ExecStandaloneTask(ctx, func() (PgxConnIface, error) { return pge.GetLocalDBConnection(ctx) }, @@ -117,25 +116,27 @@ func (pge *PgEngine) ExecAutonomousSQLTask(ctx context.Context, task *ChainTask, } // ExecuteSQLCommand executes chain command with parameters inside transaction -func (pge *PgEngine) ExecuteSQLCommand(ctx context.Context, executor executor, command string, paramValues []string) (out string, err error) { - var ct pgconn.CommandTag +func (pge *PgEngine) ExecuteSQLCommand(ctx context.Context, executor executor, task *ChainTask, paramValues []string) (err error) { var params []any - if strings.TrimSpace(command) == "" { - return "", errors.New("SQL command cannot be empty") + var errCodes = map[bool]int{false: 0, true: -1} + if strings.TrimSpace(task.Command) == "" { + return errors.New("SQL command cannot be empty") } if len(paramValues) == 0 { //mimic empty param - ct, err = executor.Exec(ctx, command) - out = ct.String() - return + ct, e := executor.Exec(ctx, task.Command) + pge.LogTaskExecution(context.Background(), task, errCodes[err != nil], ct.String(), "") + return e } for _, val := range paramValues { - if val > "" { - if err = json.Unmarshal([]byte(val), ¶ms); err != nil { - return - } - ct, err = executor.Exec(ctx, command, params...) - out = out + ct.String() + "\n" + if val == "" { + continue + } + if err = json.Unmarshal([]byte(val), ¶ms); err != nil { + return } + ct, e := executor.Exec(ctx, task.Command, params...) + err = errors.Join(err, e) + pge.LogTaskExecution(context.Background(), task, errCodes[e != nil], ct.String(), val) } return } diff --git a/internal/pgengine/transaction_test.go b/internal/pgengine/transaction_test.go index 9ac88e6e..4cb5561d 100644 --- a/internal/pgengine/transaction_test.go +++ b/internal/pgengine/transaction_test.go @@ -88,13 +88,13 @@ func TestExecuteSQLTask(t *testing.T) { pge := pgengine.NewDB(mockPool, "pgengine_unit_test") t.Run("Check autonomous SQL task", func(t *testing.T) { - _, err := pge.ExecuteSQLTask(ctx, nil, &pgengine.ChainTask{Autonomous: true}, []string{}) + err := pge.ExecuteSQLTask(ctx, nil, &pgengine.ChainTask{Autonomous: true}, []string{}) assert.ErrorContains(t, err, "pgpool.Acquire() method is not implemented") }) t.Run("Check remote SQL task", func(t *testing.T) { task := pgengine.ChainTask{ConnectString: "foo"} - _, err := pge.ExecuteSQLTask(ctx, nil, &task, []string{}) + err := pge.ExecuteSQLTask(ctx, nil, &task, []string{}) assert.ErrorContains(t, err, "cannot parse") }) @@ -102,7 +102,7 @@ func TestExecuteSQLTask(t *testing.T) { mockPool.ExpectBegin() tx, err := mockPool.Begin(ctx) assert.NoError(t, err) - _, err = pge.ExecuteSQLTask(ctx, tx, &pgengine.ChainTask{IgnoreError: true}, []string{}) + err = pge.ExecuteSQLTask(ctx, tx, &pgengine.ChainTask{IgnoreError: true}, []string{}) assert.ErrorContains(t, err, "SQL command cannot be empty") }) } @@ -125,11 +125,11 @@ func TestExecLocalSQLTask(t *testing.T) { Command: "FOO", RunAs: "Bob", } - _, err := pge.ExecLocalSQLTask(ctx, mockPool, &task, []string{}) + err := pge.ExecLocalSQLTask(ctx, mockPool, &task, []string{}) assert.Error(t, err) mockPool.ExpectExec("SET ROLE").WillReturnError(errors.New("unknown role Bob")) - _, err = pge.ExecLocalSQLTask(ctx, mockPool, &task, []string{}) + err = pge.ExecLocalSQLTask(ctx, mockPool, &task, []string{}) assert.ErrorContains(t, err, "unknown role Bob") assert.NoError(t, mockPool.ExpectationsWereMet()) } @@ -152,16 +152,16 @@ func TestExecStandaloneTask(t *testing.T) { } cf := func() (pgengine.PgxConnIface, error) { return mockPool.AsConn(), nil } - _, err := pge.ExecStandaloneTask(ctx, cf, &task, []string{}) + err := pge.ExecStandaloneTask(ctx, cf, &task, []string{}) assert.Error(t, err) mockPool.ExpectExec("SET ROLE").WillReturnError(errors.New("unknown role Bob")) mockPool.ExpectClose() - _, err = pge.ExecStandaloneTask(ctx, cf, &task, []string{}) + err = pge.ExecStandaloneTask(ctx, cf, &task, []string{}) assert.ErrorContains(t, err, "unknown role Bob") cf = func() (pgengine.PgxConnIface, error) { return nil, errors.New("no connection") } - _, err = pge.ExecStandaloneTask(ctx, cf, &task, []string{}) + err = pge.ExecStandaloneTask(ctx, cf, &task, []string{}) assert.ErrorContains(t, err, "no connection") assert.NoError(t, mockPool.ExpectationsWereMet()) @@ -181,19 +181,19 @@ func TestExecuteSQLCommand(t *testing.T) { pge := pgengine.NewDB(mockPool, "pgengine_unit_test") - _, err := pge.ExecuteSQLCommand(ctx, mockPool, "", []string{}) + err := pge.ExecuteSQLCommand(ctx, mockPool, &pgengine.ChainTask{}, []string{}) assert.Error(t, err) mockPool.ExpectExec("correct json").WillReturnResult(pgxmock.NewResult("EXECUTE", 0)) - _, err = pge.ExecuteSQLCommand(ctx, mockPool, "correct json", []string{}) + err = pge.ExecuteSQLCommand(ctx, mockPool, &pgengine.ChainTask{Command: "correct json"}, []string{}) assert.NoError(t, err) mockPool.ExpectExec("correct json").WithArgs("John", 30.0, nil).WillReturnResult(pgxmock.NewResult("EXECUTE", 0)) - _, err = pge.ExecuteSQLCommand(ctx, mockPool, "correct json", []string{`["John", 30, null]`}) + err = pge.ExecuteSQLCommand(ctx, mockPool, &pgengine.ChainTask{Command: "correct json"}, []string{`["John", 30, null]`}) assert.NoError(t, err) mockPool.ExpectExec("incorrect json").WillReturnError(json.Unmarshal([]byte("foo"), &struct{}{})) - _, err = pge.ExecuteSQLCommand(ctx, mockPool, "incorrect json", []string{"foo"}) + err = pge.ExecuteSQLCommand(ctx, mockPool, &pgengine.ChainTask{Command: "incorrect json"}, []string{"foo"}) assert.Error(t, err) } diff --git a/internal/scheduler/chain.go b/internal/scheduler/chain.go index fa52951a..f0abe1ac 100644 --- a/internal/scheduler/chain.go +++ b/internal/scheduler/chain.go @@ -3,8 +3,8 @@ package scheduler import ( "cmp" "context" + "errors" "fmt" - "strings" "time" "github.com/cybertec-postgresql/pg_timetable/internal/log" @@ -222,11 +222,16 @@ func (sch *Scheduler) executeChain(ctx context.Context, chain Chain) { l := chainL.WithField("task", task.TaskID) l.Info("Starting task") taskCtx := log.WithLogger(chainCtx, l) - retCode := sch.executeTask(taskCtx, tx, &task) + err = sch.executeTask(taskCtx, tx, &task) + if err != nil { + l.WithError(err).Error("Task execution failed") + } else { + l.Info("Task executed successfully") + } // we use background context here because current one (chainCtx) might be cancelled bctx = log.WithLogger(ctx, l) - if retCode != 0 { + if err != nil { if !task.IgnoreError { chainL.Error("Chain failed") sch.pgengine.RemoveChainRunStatus(bctx, chain.ChainID) @@ -247,12 +252,10 @@ func (sch *Scheduler) executeChain(ctx context.Context, chain Chain) { } /* execute a task */ -func (sch *Scheduler) executeTask(ctx context.Context, tx pgx.Tx, task *pgengine.ChainTask) int { +func (sch *Scheduler) executeTask(ctx context.Context, tx pgx.Tx, task *pgengine.ChainTask) error { var ( paramValues []string err error - out string - retCode int cancel context.CancelFunc ) @@ -260,7 +263,7 @@ func (sch *Scheduler) executeTask(ctx context.Context, tx pgx.Tx, task *pgengine err = sch.pgengine.GetChainParamValues(ctx, ¶mValues, task) if err != nil { l.WithError(err).Error("cannot fetch parameters values for chain: ", err) - return -1 + return err } ctx, cancel = getTimeoutContext(ctx, sch.Config().Resource.TaskTimeout, task.Timeout) @@ -271,27 +274,16 @@ func (sch *Scheduler) executeTask(ctx context.Context, tx pgx.Tx, task *pgengine task.StartedAt = time.Now() switch task.Kind { case "SQL": - out, err = sch.pgengine.ExecuteSQLTask(ctx, tx, task, paramValues) + err = sch.pgengine.ExecuteSQLTask(ctx, tx, task, paramValues) case "PROGRAM": if sch.pgengine.NoProgramTasks { l.Info("Program task execution skipped") - return -2 + return errors.New("program tasks execution is disabled") } - retCode, out, err = sch.ExecuteProgramCommand(ctx, task.Command, paramValues) + err = sch.ExecuteProgramCommand(ctx, task, paramValues) case "BUILTIN": - out, err = sch.executeBuiltinTask(ctx, task.Command, paramValues) + err = sch.executeBuiltinTask(ctx, task, paramValues) } task.Duration = time.Since(task.StartedAt).Microseconds() - - if err != nil { - if retCode == 0 { - retCode = -1 - } - out = strings.Join([]string{out, err.Error()}, "\n") - l.WithError(err).Error("Task execution failed") - } else { - l.Info("Task executed successfully") - } - sch.pgengine.LogTaskExecution(context.Background(), task, retCode, out) - return retCode + return err } diff --git a/internal/scheduler/chain_test.go b/internal/scheduler/chain_test.go index 71b7272d..40596b5e 100644 --- a/internal/scheduler/chain_test.go +++ b/internal/scheduler/chain_test.go @@ -99,9 +99,7 @@ func TestExecuteChain(t *testing.T) { pge := pgengine.NewDB(mock, "-c", "scheduler_unit_test", "--password=somestrong") sch := New(pge, log.Init(config.LoggingOpts{LogLevel: "panic", LogDBLevel: "none"})) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - sch.executeChain(ctx, Chain{Timeout: 1}) + sch.executeChain(t.Context(), Chain{Timeout: 1}) } func TestExecuteChainElement(t *testing.T) { @@ -110,10 +108,8 @@ func TestExecuteChainElement(t *testing.T) { pge := pgengine.NewDB(mock, "-c", "scheduler_unit_test", "--password=somestrong") sch := New(pge, log.Init(config.LoggingOpts{LogLevel: "panic", LogDBLevel: "none"})) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() mock.ExpectQuery("SELECT").WillReturnRows(pgxmock.NewRows([]string{"value"}).AddRow("foo")) - sch.executeTask(ctx, mock, &pgengine.ChainTask{Timeout: 1}) + _ = sch.executeTask(t.Context(), mock, &pgengine.ChainTask{Timeout: 1}) } func TestExecuteOnErrorHandler(t *testing.T) { diff --git a/internal/scheduler/shell.go b/internal/scheduler/shell.go index b31781e0..1017b72a 100644 --- a/internal/scheduler/shell.go +++ b/internal/scheduler/shell.go @@ -4,9 +4,10 @@ import ( "context" "encoding/json" "errors" - "fmt" "os/exec" "strings" + + "github.com/cybertec-postgresql/pg_timetable/internal/pgengine" ) type commander interface { @@ -26,11 +27,11 @@ func (c realCommander) CombinedOutput(ctx context.Context, command string, args var Cmd commander = realCommander{} // ExecuteProgramCommand executes program command and returns status code, output and error if any -func (sch *Scheduler) ExecuteProgramCommand(ctx context.Context, command string, paramValues []string) (code int, stdout string, stderr error) { +func (sch *Scheduler) ExecuteProgramCommand(ctx context.Context, task *pgengine.ChainTask, paramValues []string) error { - command = strings.TrimSpace(command) + command := strings.TrimSpace(task.Command) if command == "" { - return -1, "", errors.New("program command cannot be empty") + return errors.New("program command cannot be empty") } if len(paramValues) == 0 { //mimic empty param paramValues = []string{""} @@ -39,24 +40,21 @@ func (sch *Scheduler) ExecuteProgramCommand(ctx context.Context, command string, params := []string{} if val > "" { if err := json.Unmarshal([]byte(val), ¶ms); err != nil { - return -1, "", err + return err } } out, err := Cmd.CombinedOutput(ctx, command, params...) // #nosec - cmdLine := fmt.Sprintf("%s %v: ", command, params) - stdout = strings.TrimSpace(string(out)) - l := sch.l.WithField("command", cmdLine). - WithField("output", string(out)) if err != nil { //check if we're dealing with an ExitError - i.e. return code other than 0 if exitError, ok := err.(*exec.ExitError); ok { exitCode := exitError.ExitCode() - l.WithField("retcode", exitCode).Debug("Program run", cmdLine, exitCode) - return exitCode, stdout, exitError + sch.pgengine.LogTaskExecution(context.Background(), task, exitCode, string(out), val) + return exitError } - return -1, stdout, err + sch.pgengine.LogTaskExecution(context.Background(), task, -1, string(out), val) + return err } - l.WithField("retcode", 0).Debug("Program run") + sch.pgengine.LogTaskExecution(context.Background(), task, 0, string(out), val) } - return 0, stdout, nil + return nil } diff --git a/internal/scheduler/shell_test.go b/internal/scheduler/shell_test.go index 492ee575..7f63a6d4 100644 --- a/internal/scheduler/shell_test.go +++ b/internal/scheduler/shell_test.go @@ -29,8 +29,6 @@ func (c testCommander) CombinedOutput(_ context.Context, command string, args .. func TestShellCommand(t *testing.T) { scheduler.Cmd = testCommander{} var err error - var out string - var retCode int mock, err := pgxmock.NewPool() // assert.NoError(t, err) @@ -38,35 +36,33 @@ func TestShellCommand(t *testing.T) { scheduler := scheduler.New(pge, log.Init(config.LoggingOpts{LogLevel: "panic", LogDBLevel: "none"})) ctx := context.Background() - _, _, err = scheduler.ExecuteProgramCommand(ctx, "", []string{""}) + err = scheduler.ExecuteProgramCommand(ctx, &pgengine.ChainTask{}, []string{""}) assert.EqualError(t, err, "program command cannot be empty", "Empty command should out, fail") - _, out, err = scheduler.ExecuteProgramCommand(ctx, "ping0", nil) + err = scheduler.ExecuteProgramCommand(ctx, &pgengine.ChainTask{Command: "ping0"}, nil) assert.NoError(t, err, "Command with nil param is out, OK") - assert.True(t, strings.HasPrefix(string(out), "ping0"), "Output should containt only command ") - _, _, err = scheduler.ExecuteProgramCommand(ctx, "ping1", []string{}) + err = scheduler.ExecuteProgramCommand(ctx, &pgengine.ChainTask{Command: "ping1"}, []string{}) assert.NoError(t, err, "Command with empty array param is OK") - _, _, err = scheduler.ExecuteProgramCommand(ctx, "ping2", []string{""}) + err = scheduler.ExecuteProgramCommand(ctx, &pgengine.ChainTask{Command: "ping2"}, []string{""}) assert.NoError(t, err, "Command with empty string param is OK") - _, _, err = scheduler.ExecuteProgramCommand(ctx, "ping3", []string{"[]"}) + err = scheduler.ExecuteProgramCommand(ctx, &pgengine.ChainTask{Command: "ping3"}, []string{"[]"}) assert.NoError(t, err, "Command with empty json array param is OK") - _, _, err = scheduler.ExecuteProgramCommand(ctx, "ping3", []string{"[null]"}) + err = scheduler.ExecuteProgramCommand(ctx, &pgengine.ChainTask{Command: "ping3"}, []string{"[null]"}) assert.NoError(t, err, "Command with nil array param is OK") - _, _, err = scheduler.ExecuteProgramCommand(ctx, "ping4", []string{`["localhost"]`}) + err = scheduler.ExecuteProgramCommand(ctx, &pgengine.ChainTask{Command: "ping4"}, []string{`["localhost"]`}) assert.NoError(t, err, "Command with one param is OK") - _, _, err = scheduler.ExecuteProgramCommand(ctx, "ping5", []string{`["localhost", "-4"]`}) + err = scheduler.ExecuteProgramCommand(ctx, &pgengine.ChainTask{Command: "ping5"}, []string{`["localhost", "-4"]`}) assert.NoError(t, err, "Command with many params is OK") - _, _, err = scheduler.ExecuteProgramCommand(ctx, "pong", nil) + err = scheduler.ExecuteProgramCommand(ctx, &pgengine.ChainTask{Command: "pong"}, nil) assert.IsType(t, (*exec.Error)(nil), err, "Uknown command should produce error") - retCode, _, err = scheduler.ExecuteProgramCommand(ctx, "ping5", []string{`{"param1": "localhost"}`}) + err = scheduler.ExecuteProgramCommand(ctx, &pgengine.ChainTask{Command: "ping5"}, []string{`{"param1": "localhost"}`}) assert.IsType(t, (*json.UnmarshalTypeError)(nil), err, "Command should fail with mailformed json parameter") - assert.NotEqual(t, 0, retCode, "return code should indicate failure.") } diff --git a/internal/scheduler/tasks.go b/internal/scheduler/tasks.go index 883ac808..542210b3 100644 --- a/internal/scheduler/tasks.go +++ b/internal/scheduler/tasks.go @@ -9,6 +9,7 @@ import ( "time" "github.com/cybertec-postgresql/pg_timetable/internal/log" + "github.com/cybertec-postgresql/pg_timetable/internal/pgengine" "github.com/cybertec-postgresql/pg_timetable/internal/tasks" ) @@ -25,22 +26,27 @@ var BuiltinTasks = map[string](func(context.Context, *Scheduler, string) (string "CopyFromProgram": taskCopyFromProgram, "Shutdown": taskShutdown} -func (sch *Scheduler) executeBuiltinTask(ctx context.Context, name string, paramValues []string) (stdout string, err error) { - var s string +func (sch *Scheduler) executeBuiltinTask(ctx context.Context, task *pgengine.ChainTask, paramValues []string) (err error) { + var stdout string + var errCodes = map[bool]int{true: 0, false: -1} + name := task.Command f := BuiltinTasks[name] if f == nil { - return "", errors.New("No built-in task found: " + name) + return errors.New("No built-in task found: " + name) } l := log.GetLogger(ctx) l.WithField("name", name).Debugf("Executing builtin task with parameters %+q", paramValues) if len(paramValues) == 0 { - return f(ctx, sch, "") + stdout, err = f(ctx, sch, "") + sch.pgengine.LogTaskExecution(context.Background(), task, errCodes[err == nil], stdout, "") + return err } for _, val := range paramValues { - if s, err = f(ctx, sch, val); err != nil { + stdout, err = f(ctx, sch, val) + sch.pgengine.LogTaskExecution(context.Background(), task, errCodes[err == nil], stdout, val) + if err != nil { return } - stdout += fmt.Sprintln(s) } return } diff --git a/internal/scheduler/tasks_test.go b/internal/scheduler/tasks_test.go index 9be87841..7ee83c9d 100644 --- a/internal/scheduler/tasks_test.go +++ b/internal/scheduler/tasks_test.go @@ -18,7 +18,7 @@ func TestExecuteTask(t *testing.T) { mocksch := New(pge, log.Init(config.LoggingOpts{LogLevel: "panic", LogDBLevel: "none"})) et := func(task string, params []string) (err error) { - _, err = mocksch.executeBuiltinTask(context.TODO(), task, params) + err = mocksch.executeBuiltinTask(context.TODO(), &pgengine.ChainTask{Command: task}, params) return } diff --git a/main.go b/main.go index cb9a85f4..375b72db 100644 --- a/main.go +++ b/main.go @@ -54,7 +54,7 @@ var ( commit = "000000" version = "master" date = "unknown" - dbapi = "00629" + dbapi = "00733" ) func printVersion() {