Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions internal/pgengine/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (pge *PgEngine) IsAlive() bool {
}

// LogTaskExecution will log current chain element execution status including retcode
func (pge *PgEngine) LogTaskExecution(ctx context.Context, task *ChainTask, retCode int, output string) {
func (pge *PgEngine) LogTaskExecution(ctx context.Context, task *ChainTask, retCode int, output string, params string) {
switch pge.Logging.LogDBLevel {
case "none":
return
Expand All @@ -38,12 +38,12 @@ func (pge *PgEngine) LogTaskExecution(ctx context.Context, task *ChainTask, retC
}
}
_, err := pge.ConfigDb.Exec(ctx, `INSERT INTO timetable.execution_log (
chain_id, task_id, command, kind, last_run, finished, returncode, pid, output, client_name, txid, ignore_error)
VALUES ($1, $2, $3, $4, clock_timestamp() - $5 :: interval, clock_timestamp(), $6, $7, NULLIF($8, ''), $9, $10, $11)`,
chain_id, task_id, command, kind, last_run, finished, returncode, pid, output, client_name, txid, ignore_error, params)
VALUES ($1, $2, $3, $4, clock_timestamp() - $5 :: interval, clock_timestamp(), $6, $7, NULLIF($8, ''), $9, $10, $11, $12)`,
task.ChainID, task.TaskID, task.Command, task.Kind,
fmt.Sprintf("%f seconds", float64(task.Duration)/1000000),
retCode, pge.Getsid(), strings.TrimSpace(output), pge.ClientName, task.Vxid,
task.IgnoreError)
task.IgnoreError, params)
if err != nil {
pge.l.WithError(err).Error("Failed to log chain element execution status")
}
Expand Down
4 changes: 2 additions & 2 deletions internal/pgengine/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ func TestLogChainElementExecution(t *testing.T) {
t.Run("Check LogChainElementExecution if sql fails", func(*testing.T) {
mockPool.ExpectExec("INSERT INTO .*execution_log").WithArgs(
pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(),
pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg()).
pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg(), pgxmock.AnyArg()).
WillReturnError(errors.New("Failed to log chain element execution status"))
pge.LogTaskExecution(context.Background(), &pgengine.ChainTask{}, 0, "STATUS")
pge.LogTaskExecution(context.Background(), &pgengine.ChainTask{}, 0, "STATUS", "")
})

assert.NoError(t, mockPool.ExpectationsWereMet(), "there were unfulfilled expectations")
Expand Down
6 changes: 6 additions & 0 deletions internal/pgengine/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ var Migrations func() migrator.Option = func() migrator.Option {
return ExecuteMigrationScript(ctx, tx, "00721.sql")
},
},
&migrator.Migration{
Name: "00733 Add params column to timetable.execution_log table",
Func: func(ctx context.Context, tx pgx.Tx) error {
return ExecuteMigrationScript(ctx, tx, "00733.sql")
},
},
// adding new migration here, update "timetable"."migration" in "sql/init.sql"
// and "dbapi" variable in main.go!

Expand Down
2 changes: 1 addition & 1 deletion internal/pgengine/pgengine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestSchedulerFunctions(t *testing.T) {
assert.NoError(t, err, "Should start transaction")
assert.Greater(t, txid, int64(0), "Should return transaction id")
f := func(sql string, params []string) error {
_, err := pge.ExecuteSQLCommand(ctx, tx, sql, params)
err := pge.ExecuteSQLCommand(ctx, tx, &pgengine.ChainTask{Command: sql}, params)
return err
}
assert.Error(t, f("", nil), "Should error for empty script")
Expand Down
3 changes: 2 additions & 1 deletion internal/pgengine/sql/ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ CREATE TABLE timetable.execution_log (
kind timetable.command_kind,
command TEXT,
output TEXT,
client_name TEXT NOT NULL
client_name TEXT NOT NULL,
params TEXT
);

COMMENT ON TABLE timetable.execution_log IS
Expand Down
3 changes: 2 additions & 1 deletion internal/pgengine/sql/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ VALUES
(11, '00573 Add ability to start a chain with delay'),
(12, '00575 Add on_error handling'),
(13, '00629 Add ignore_error column to timetable.execution_log'),
(14, '00721 Add more job control functions');
(14, '00721 Add more job control functions'),
(15, '00733 Add params column to timetable.execution_log table');
4 changes: 4 additions & 0 deletions internal/pgengine/sql/migrations/00733.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Add params column to execution_log table to store parameter values used during task execution
ALTER TABLE timetable.execution_log ADD COLUMN params TEXT;

COMMENT ON COLUMN timetable.execution_log.params IS 'Array of parameter values used during task execution';
49 changes: 25 additions & 24 deletions internal/pgengine/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/cybertec-postgresql/pg_timetable/internal/log"
pgx "github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
)

// StartTransaction returns transaction object, virtual transaction id and error
Expand Down Expand Up @@ -56,7 +55,7 @@ func (pge *PgEngine) MustRollbackToSavepoint(ctx context.Context, tx pgx.Tx, tas
}

// ExecuteSQLTask executes SQL task
func (pge *PgEngine) ExecuteSQLTask(ctx context.Context, tx pgx.Tx, task *ChainTask, paramValues []string) (out string, err error) {
func (pge *PgEngine) ExecuteSQLTask(ctx context.Context, tx pgx.Tx, task *ChainTask, paramValues []string) (err error) {
switch {
case task.IsRemote():
return pge.ExecRemoteSQLTask(ctx, task, paramValues)
Expand All @@ -68,15 +67,15 @@ func (pge *PgEngine) ExecuteSQLTask(ctx context.Context, tx pgx.Tx, task *ChainT
}

// ExecLocalSQLTask executes local task in the chain transaction
func (pge *PgEngine) ExecLocalSQLTask(ctx context.Context, tx pgx.Tx, task *ChainTask, paramValues []string) (out string, err error) {
func (pge *PgEngine) ExecLocalSQLTask(ctx context.Context, tx pgx.Tx, task *ChainTask, paramValues []string) (err error) {
if err := pge.SetRole(ctx, tx, task.RunAs); err != nil {
return "", err
return err
}
if task.IgnoreError {
pge.MustSavepoint(ctx, tx, task.TaskID)
}
pge.SetCurrentTaskContext(ctx, tx, task.ChainID, task.TaskID)
out, err = pge.ExecuteSQLCommand(ctx, tx, task.Command, paramValues)
err = pge.ExecuteSQLCommand(ctx, tx, task, paramValues)
if err != nil && task.IgnoreError {
pge.MustRollbackToSavepoint(ctx, tx, task.TaskID)
}
Expand All @@ -87,55 +86,57 @@ func (pge *PgEngine) ExecLocalSQLTask(ctx context.Context, tx pgx.Tx, task *Chai
}

// ExecStandaloneTask executes task against the provided connection interface, it can be remote connection or acquired connection from the pool
func (pge *PgEngine) ExecStandaloneTask(ctx context.Context, connf func() (PgxConnIface, error), task *ChainTask, paramValues []string) (string, error) {
func (pge *PgEngine) ExecStandaloneTask(ctx context.Context, connf func() (PgxConnIface, error), task *ChainTask, paramValues []string) error {
conn, err := connf()
if err != nil {
return "", err
return err
}
defer pge.FinalizeDBConnection(ctx, conn)
if err := pge.SetRole(ctx, conn, task.RunAs); err != nil {
return "", err
return err
}
pge.SetCurrentTaskContext(ctx, conn, task.ChainID, task.TaskID)
return pge.ExecuteSQLCommand(ctx, conn, task.Command, paramValues)
return pge.ExecuteSQLCommand(ctx, conn, task, paramValues)
}

// ExecRemoteSQLTask executes task against remote connection
func (pge *PgEngine) ExecRemoteSQLTask(ctx context.Context, task *ChainTask, paramValues []string) (string, error) {
func (pge *PgEngine) ExecRemoteSQLTask(ctx context.Context, task *ChainTask, paramValues []string) error {
log.GetLogger(ctx).Info("Switching to remote task mode")
return pge.ExecStandaloneTask(ctx,
func() (PgxConnIface, error) { return pge.GetRemoteDBConnection(ctx, task.ConnectString) },
task, paramValues)
}

// ExecAutonomousSQLTask executes autonomous task in an acquired connection from pool
func (pge *PgEngine) ExecAutonomousSQLTask(ctx context.Context, task *ChainTask, paramValues []string) (string, error) {
func (pge *PgEngine) ExecAutonomousSQLTask(ctx context.Context, task *ChainTask, paramValues []string) error {
log.GetLogger(ctx).Info("Switching to autonomous task mode")
return pge.ExecStandaloneTask(ctx,
func() (PgxConnIface, error) { return pge.GetLocalDBConnection(ctx) },
task, paramValues)
}

// ExecuteSQLCommand executes chain command with parameters inside transaction
func (pge *PgEngine) ExecuteSQLCommand(ctx context.Context, executor executor, command string, paramValues []string) (out string, err error) {
var ct pgconn.CommandTag
func (pge *PgEngine) ExecuteSQLCommand(ctx context.Context, executor executor, task *ChainTask, paramValues []string) (err error) {
var params []any
if strings.TrimSpace(command) == "" {
return "", errors.New("SQL command cannot be empty")
var errCodes = map[bool]int{false: 0, true: -1}
if strings.TrimSpace(task.Command) == "" {
return errors.New("SQL command cannot be empty")
}
if len(paramValues) == 0 { //mimic empty param
ct, err = executor.Exec(ctx, command)
out = ct.String()
return
ct, e := executor.Exec(ctx, task.Command)
pge.LogTaskExecution(context.Background(), task, errCodes[err != nil], ct.String(), "")
return e
}
for _, val := range paramValues {
if val > "" {
if err = json.Unmarshal([]byte(val), &params); err != nil {
return
}
ct, err = executor.Exec(ctx, command, params...)
out = out + ct.String() + "\n"
if val == "" {
continue
}
if err = json.Unmarshal([]byte(val), &params); err != nil {
return
}
ct, e := executor.Exec(ctx, task.Command, params...)
err = errors.Join(err, e)
pge.LogTaskExecution(context.Background(), task, errCodes[e != nil], ct.String(), val)
}
return
}
Expand Down
24 changes: 12 additions & 12 deletions internal/pgengine/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,21 +88,21 @@ func TestExecuteSQLTask(t *testing.T) {
pge := pgengine.NewDB(mockPool, "pgengine_unit_test")

t.Run("Check autonomous SQL task", func(t *testing.T) {
_, err := pge.ExecuteSQLTask(ctx, nil, &pgengine.ChainTask{Autonomous: true}, []string{})
err := pge.ExecuteSQLTask(ctx, nil, &pgengine.ChainTask{Autonomous: true}, []string{})
assert.ErrorContains(t, err, "pgpool.Acquire() method is not implemented")
})

t.Run("Check remote SQL task", func(t *testing.T) {
task := pgengine.ChainTask{ConnectString: "foo"}
_, err := pge.ExecuteSQLTask(ctx, nil, &task, []string{})
err := pge.ExecuteSQLTask(ctx, nil, &task, []string{})
assert.ErrorContains(t, err, "cannot parse")
})

t.Run("Check local SQL task", func(t *testing.T) {
mockPool.ExpectBegin()
tx, err := mockPool.Begin(ctx)
assert.NoError(t, err)
_, err = pge.ExecuteSQLTask(ctx, tx, &pgengine.ChainTask{IgnoreError: true}, []string{})
err = pge.ExecuteSQLTask(ctx, tx, &pgengine.ChainTask{IgnoreError: true}, []string{})
assert.ErrorContains(t, err, "SQL command cannot be empty")
})
}
Expand All @@ -125,11 +125,11 @@ func TestExecLocalSQLTask(t *testing.T) {
Command: "FOO",
RunAs: "Bob",
}
_, err := pge.ExecLocalSQLTask(ctx, mockPool, &task, []string{})
err := pge.ExecLocalSQLTask(ctx, mockPool, &task, []string{})
assert.Error(t, err)

mockPool.ExpectExec("SET ROLE").WillReturnError(errors.New("unknown role Bob"))
_, err = pge.ExecLocalSQLTask(ctx, mockPool, &task, []string{})
err = pge.ExecLocalSQLTask(ctx, mockPool, &task, []string{})
assert.ErrorContains(t, err, "unknown role Bob")
assert.NoError(t, mockPool.ExpectationsWereMet())
}
Expand All @@ -152,16 +152,16 @@ func TestExecStandaloneTask(t *testing.T) {
}
cf := func() (pgengine.PgxConnIface, error) { return mockPool.AsConn(), nil }

_, err := pge.ExecStandaloneTask(ctx, cf, &task, []string{})
err := pge.ExecStandaloneTask(ctx, cf, &task, []string{})
assert.Error(t, err)

mockPool.ExpectExec("SET ROLE").WillReturnError(errors.New("unknown role Bob"))
mockPool.ExpectClose()
_, err = pge.ExecStandaloneTask(ctx, cf, &task, []string{})
err = pge.ExecStandaloneTask(ctx, cf, &task, []string{})
assert.ErrorContains(t, err, "unknown role Bob")

cf = func() (pgengine.PgxConnIface, error) { return nil, errors.New("no connection") }
_, err = pge.ExecStandaloneTask(ctx, cf, &task, []string{})
err = pge.ExecStandaloneTask(ctx, cf, &task, []string{})
assert.ErrorContains(t, err, "no connection")

assert.NoError(t, mockPool.ExpectationsWereMet())
Expand All @@ -181,19 +181,19 @@ func TestExecuteSQLCommand(t *testing.T) {

pge := pgengine.NewDB(mockPool, "pgengine_unit_test")

_, err := pge.ExecuteSQLCommand(ctx, mockPool, "", []string{})
err := pge.ExecuteSQLCommand(ctx, mockPool, &pgengine.ChainTask{}, []string{})
assert.Error(t, err)

mockPool.ExpectExec("correct json").WillReturnResult(pgxmock.NewResult("EXECUTE", 0))
_, err = pge.ExecuteSQLCommand(ctx, mockPool, "correct json", []string{})
err = pge.ExecuteSQLCommand(ctx, mockPool, &pgengine.ChainTask{Command: "correct json"}, []string{})
assert.NoError(t, err)

mockPool.ExpectExec("correct json").WithArgs("John", 30.0, nil).WillReturnResult(pgxmock.NewResult("EXECUTE", 0))
_, err = pge.ExecuteSQLCommand(ctx, mockPool, "correct json", []string{`["John", 30, null]`})
err = pge.ExecuteSQLCommand(ctx, mockPool, &pgengine.ChainTask{Command: "correct json"}, []string{`["John", 30, null]`})
assert.NoError(t, err)

mockPool.ExpectExec("incorrect json").WillReturnError(json.Unmarshal([]byte("foo"), &struct{}{}))
_, err = pge.ExecuteSQLCommand(ctx, mockPool, "incorrect json", []string{"foo"})
err = pge.ExecuteSQLCommand(ctx, mockPool, &pgengine.ChainTask{Command: "incorrect json"}, []string{"foo"})
assert.Error(t, err)
}

Expand Down
38 changes: 15 additions & 23 deletions internal/scheduler/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package scheduler
import (
"cmp"
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/cybertec-postgresql/pg_timetable/internal/log"
Expand Down Expand Up @@ -222,11 +222,16 @@ func (sch *Scheduler) executeChain(ctx context.Context, chain Chain) {
l := chainL.WithField("task", task.TaskID)
l.Info("Starting task")
taskCtx := log.WithLogger(chainCtx, l)
retCode := sch.executeTask(taskCtx, tx, &task)
err = sch.executeTask(taskCtx, tx, &task)
if err != nil {
l.WithError(err).Error("Task execution failed")
} else {
l.Info("Task executed successfully")
}

// we use background context here because current one (chainCtx) might be cancelled
bctx = log.WithLogger(ctx, l)
if retCode != 0 {
if err != nil {
if !task.IgnoreError {
chainL.Error("Chain failed")
sch.pgengine.RemoveChainRunStatus(bctx, chain.ChainID)
Expand All @@ -247,20 +252,18 @@ func (sch *Scheduler) executeChain(ctx context.Context, chain Chain) {
}

/* execute a task */
func (sch *Scheduler) executeTask(ctx context.Context, tx pgx.Tx, task *pgengine.ChainTask) int {
func (sch *Scheduler) executeTask(ctx context.Context, tx pgx.Tx, task *pgengine.ChainTask) error {
var (
paramValues []string
err error
out string
retCode int
cancel context.CancelFunc
)

l := log.GetLogger(ctx)
err = sch.pgengine.GetChainParamValues(ctx, &paramValues, task)
if err != nil {
l.WithError(err).Error("cannot fetch parameters values for chain: ", err)
return -1
return err
}

ctx, cancel = getTimeoutContext(ctx, sch.Config().Resource.TaskTimeout, task.Timeout)
Expand All @@ -271,27 +274,16 @@ func (sch *Scheduler) executeTask(ctx context.Context, tx pgx.Tx, task *pgengine
task.StartedAt = time.Now()
switch task.Kind {
case "SQL":
out, err = sch.pgengine.ExecuteSQLTask(ctx, tx, task, paramValues)
err = sch.pgengine.ExecuteSQLTask(ctx, tx, task, paramValues)
case "PROGRAM":
if sch.pgengine.NoProgramTasks {
l.Info("Program task execution skipped")
return -2
return errors.New("program tasks execution is disabled")
}
retCode, out, err = sch.ExecuteProgramCommand(ctx, task.Command, paramValues)
err = sch.ExecuteProgramCommand(ctx, task, paramValues)
case "BUILTIN":
out, err = sch.executeBuiltinTask(ctx, task.Command, paramValues)
err = sch.executeBuiltinTask(ctx, task, paramValues)
}
task.Duration = time.Since(task.StartedAt).Microseconds()

if err != nil {
if retCode == 0 {
retCode = -1
}
out = strings.Join([]string{out, err.Error()}, "\n")
l.WithError(err).Error("Task execution failed")
} else {
l.Info("Task executed successfully")
}
sch.pgengine.LogTaskExecution(context.Background(), task, retCode, out)
return retCode
return err
}
8 changes: 2 additions & 6 deletions internal/scheduler/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,7 @@ func TestExecuteChain(t *testing.T) {
pge := pgengine.NewDB(mock, "-c", "scheduler_unit_test", "--password=somestrong")
sch := New(pge, log.Init(config.LoggingOpts{LogLevel: "panic", LogDBLevel: "none"}))

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sch.executeChain(ctx, Chain{Timeout: 1})
sch.executeChain(t.Context(), Chain{Timeout: 1})
}

func TestExecuteChainElement(t *testing.T) {
Expand All @@ -110,10 +108,8 @@ func TestExecuteChainElement(t *testing.T) {
pge := pgengine.NewDB(mock, "-c", "scheduler_unit_test", "--password=somestrong")
sch := New(pge, log.Init(config.LoggingOpts{LogLevel: "panic", LogDBLevel: "none"}))

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mock.ExpectQuery("SELECT").WillReturnRows(pgxmock.NewRows([]string{"value"}).AddRow("foo"))
sch.executeTask(ctx, mock, &pgengine.ChainTask{Timeout: 1})
_ = sch.executeTask(t.Context(), mock, &pgengine.ChainTask{Timeout: 1})
}

func TestExecuteOnErrorHandler(t *testing.T) {
Expand Down
Loading
Loading