Skip to content

Commit c49f578

Browse files
authored
[*] refactor autonomous and remote tasks execution, fixes #584 (#585)
* [*] refactor autonomous and remote tasks execution, fixes #584 * [-] fix `TestSetRole()` * [-] fix `TestSelectChains()` * [+] add `TestIsIntervalChainListed()` * [+] add `TestExecLocalSQLTask()` and `TestExecStandaloneTask()` * [+] bump `pashagolub/pgxmock` to v2.9.0
1 parent 39f9c53 commit c49f578

15 files changed

Lines changed: 344 additions & 302 deletions

.golangci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ linters:
77
linters-settings:
88
gocyclo:
99
# minimal code complexity to report, 30 by default (but we recommend 10-20)
10-
min-complexity: 16
10+
min-complexity: 15
1111

1212
issues:
1313
# List of regexps of issue texts to exclude, empty list by default.

go.mod

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ require (
77
github.com/jackc/pgx/v5 v5.4.1
88
github.com/jessevdk/go-flags v1.5.0
99
github.com/ory/mail/v3 v3.0.1-0.20210418065910-7f033ddea8dc
10-
github.com/pashagolub/pgxmock/v2 v2.8.0
10+
github.com/pashagolub/pgxmock/v2 v2.9.0
1111
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5
1212
github.com/sethvargo/go-retry v0.2.4
1313
github.com/sirupsen/logrus v1.9.3
@@ -33,10 +33,10 @@ require (
3333
github.com/spf13/jwalterweatherman v1.1.0 // indirect
3434
github.com/spf13/pflag v1.0.5 // indirect
3535
github.com/subosito/gotenv v1.4.2 // indirect
36-
golang.org/x/crypto v0.9.0 // indirect
37-
golang.org/x/sync v0.1.0 // indirect
38-
golang.org/x/sys v0.8.0 // indirect
39-
golang.org/x/text v0.9.0 // indirect
36+
golang.org/x/crypto v0.10.0 // indirect
37+
golang.org/x/sync v0.3.0 // indirect
38+
golang.org/x/sys v0.9.0 // indirect
39+
golang.org/x/text v0.11.0 // indirect
4040
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
4141
gopkg.in/ini.v1 v1.67.0 // indirect
4242
gopkg.in/yaml.v3 v3.0.1 // indirect

go.sum

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,8 @@ github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyua
149149
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
150150
github.com/ory/mail/v3 v3.0.1-0.20210418065910-7f033ddea8dc h1:BU12v9x5hvONtYU2R2LnlkxmWSsjzco046NzJLcWMHg=
151151
github.com/ory/mail/v3 v3.0.1-0.20210418065910-7f033ddea8dc/go.mod h1:vAPEMm1zIQKGmM9hcZTSlOU/CDVCXHGOw6SFxPlSoHw=
152-
github.com/pashagolub/pgxmock/v2 v2.8.0 h1:8y6yskm4sdXY0ELZ5vngDxrqwhlbIr1eEDHLAN727Z4=
153-
github.com/pashagolub/pgxmock/v2 v2.8.0/go.mod h1:J+Cg7sz4O4zV94P/jAp1K8d5hZjH7pGODw+e3y/K7TQ=
152+
github.com/pashagolub/pgxmock/v2 v2.9.0 h1:9EBuAUsrkTVtBsyRsqOJpEwq457s7AWTfI/tYn/FbEY=
153+
github.com/pashagolub/pgxmock/v2 v2.9.0/go.mod h1:J+Cg7sz4O4zV94P/jAp1K8d5hZjH7pGODw+e3y/K7TQ=
154154
github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ=
155155
github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4=
156156
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -209,8 +209,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
209209
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
210210
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
211211
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
212-
golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g=
213-
golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
212+
golang.org/x/crypto v0.10.0 h1:LKqV2xt9+kDzSTfOhx4FrkEBcMrAgHSYgzywV9zcGmM=
213+
golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I=
214214
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
215215
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
216216
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -295,8 +295,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
295295
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
296296
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
297297
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
298-
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
299-
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
298+
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
299+
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
300300
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
301301
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
302302
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -335,8 +335,8 @@ golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7w
335335
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
336336
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
337337
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
338-
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
339-
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
338+
golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s=
339+
golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
340340
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
341341
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
342342
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -346,8 +346,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
346346
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
347347
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
348348
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
349-
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
350-
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
349+
golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4=
350+
golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
351351
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
352352
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
353353
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=

internal/pgengine/access.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,3 +119,26 @@ FROM timetable.chain WHERE (client_name = $1 OR client_name IS NULL) AND chain_i
119119
*dest, err = pgx.CollectOneRow(rows, pgx.RowToStructByName[Chain])
120120
return err
121121
}
122+
123+
// GetChainElements returns all elements for a given chain
124+
func (pge *PgEngine) GetChainElements(ctx context.Context, chainTasks *[]ChainTask, chainID int) error {
125+
const sqlSelectChainTasks = `SELECT task_id, command, kind, run_as, ignore_error, autonomous, database_connection, timeout
126+
FROM timetable.task WHERE chain_id = $1 ORDER BY task_order ASC`
127+
rows, err := pge.ConfigDb.Query(ctx, sqlSelectChainTasks, chainID)
128+
if err != nil {
129+
return err
130+
}
131+
*chainTasks, err = pgx.CollectRows(rows, pgx.RowToStructByName[ChainTask])
132+
return err
133+
}
134+
135+
// GetChainParamValues returns parameter values to pass for task being executed
136+
func (pge *PgEngine) GetChainParamValues(ctx context.Context, paramValues *[]string, task *ChainTask) error {
137+
const sqlGetParamValues = `SELECT value FROM timetable.parameter WHERE task_id = $1 AND value IS NOT NULL ORDER BY order_id ASC`
138+
rows, err := pge.ConfigDb.Query(ctx, sqlGetParamValues, task.TaskID)
139+
if err != nil {
140+
return err
141+
}
142+
*paramValues, err = pgx.CollectRows(rows, pgx.RowTo[string])
143+
return err
144+
}

internal/pgengine/access_test.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,20 +61,25 @@ func TestRemoveChainRunStatus(t *testing.T) {
6161
}
6262

6363
func TestSelectChains(t *testing.T) {
64-
var c *[]pgengine.Chain
65-
var ic *[]pgengine.IntervalChain
64+
var c []pgengine.Chain
65+
var ic []pgengine.IntervalChain
6666
initmockdb(t)
6767
pge := pgengine.NewDB(mockPool, "pgengine_unit_test")
6868
defer mockPool.Close()
6969

70-
mockPool.ExpectExec("SELECT.+chain_id").WillReturnError(errors.New("error"))
71-
assert.Error(t, pge.SelectChains(context.Background(), c))
70+
for i := 0; i < 3; i++ {
71+
mockPool.ExpectQuery("SELECT.+chain_id").WithArgs(pgxmock.AnyArg()).WillReturnError(errors.New("error"))
72+
mockPool.ExpectQuery("SELECT.+chain_id").WithArgs(pgxmock.AnyArg()).WillReturnRows(pgxmock.NewRows([]string{"foo"}).AddRow("baz"))
73+
}
7274

73-
mockPool.ExpectExec("SELECT.+chain_id").WillReturnError(errors.New("error"))
74-
assert.Error(t, pge.SelectRebootChains(context.Background(), c))
75+
assert.Error(t, pge.SelectChains(context.Background(), &c))
76+
assert.Error(t, pge.SelectChains(context.Background(), &c), "unacceptable columns")
7577

76-
mockPool.ExpectExec("SELECT.+chain_id").WillReturnError(errors.New("error"))
77-
assert.Error(t, pge.SelectIntervalChains(context.Background(), ic))
78+
assert.Error(t, pge.SelectRebootChains(context.Background(), &c))
79+
assert.Error(t, pge.SelectRebootChains(context.Background(), &c), "unacceptable columns")
80+
81+
assert.Error(t, pge.SelectIntervalChains(context.Background(), &ic))
82+
assert.Error(t, pge.SelectIntervalChains(context.Background(), &ic), "unacceptable columns")
7883
}
7984

8085
func TestSelectChain(t *testing.T) {

internal/pgengine/bootstrap.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"math/rand"
88
"os"
9+
"strings"
910
"time"
1011

1112
"github.com/cybertec-postgresql/pg_timetable/internal/config"
@@ -133,6 +134,10 @@ func NewDB(DB PgxPoolIface, args ...string) *PgEngine {
133134
}
134135
}
135136

137+
func quoteIdent(s string) string {
138+
return `"` + strings.Replace(s, `"`, `""`, -1) + `"`
139+
}
140+
136141
// getPgxConnConfig transforms standard connestion string to pgx specific one with
137142
func (pge *PgEngine) getPgxConnConfig() *pgxpool.Config {
138143
var connstr string
@@ -166,6 +171,7 @@ func (pge *PgEngine) getPgxConnConfig() *pgxpool.Config {
166171
if err = pge.TryLockClientName(ctx, pgconn); err != nil {
167172
return err
168173
}
174+
169175
_, err = pgconn.Exec(ctx, "LISTEN "+quoteIdent(pge.ClientName))
170176
if pge.logTypeOID == InvalidOid {
171177
err = pgconn.QueryRow(ctx, "select coalesce(to_regtype('timetable.log_type')::oid, 0)").Scan(&pge.logTypeOID)

internal/pgengine/pgengine_test.go

Lines changed: 17 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@ import (
77
"testing"
88
"time"
99

10-
pgx "github.com/jackc/pgx/v5"
11-
"github.com/jackc/pgx/v5/pgtype"
10+
pgtype "github.com/jackc/pgx/v5/pgtype"
1211
"github.com/stretchr/testify/assert"
1312
"github.com/stretchr/testify/require"
1413

@@ -51,11 +50,11 @@ func SetupTestCase(t *testing.T) func(t *testing.T) {
5150
}
5251

5352
// setupTestRenoteDBFunc used to connect to remote postgreSQL database
54-
var setupTestRemoteDBFunc = func() (pgengine.PgxConnIface, pgx.Tx, error) {
53+
var setupTestRemoteDBFunc = func() (pgengine.PgxConnIface, error) {
5554
c := cmdOpts.Connection
5655
connstr := fmt.Sprintf("host='%s' port='%d' sslmode='%s' dbname='%s' user='%s' password='%s'",
5756
c.Host, c.Port, c.SSLMode, c.DBName, c.User, c.Password)
58-
return pge.GetRemoteDBTransaction(context.Background(), connstr)
57+
return pge.GetRemoteDBConnection(context.Background(), connstr)
5958
}
6059

6160
func TestInitAndTestConfigDBConnection(t *testing.T) {
@@ -134,23 +133,23 @@ func TestSchedulerFunctions(t *testing.T) {
134133

135134
ctx := context.Background()
136135

137-
t.Run("Check DeleteChainConfig funсtion", func(t *testing.T) {
136+
t.Run("Check DeleteChainConfig function", func(t *testing.T) {
138137
assert.Equal(t, false, pge.DeleteChain(ctx, 0), "Should not delete in clean database")
139138
})
140139

141-
t.Run("Check GetChainElements funсtion", func(t *testing.T) {
140+
t.Run("Check GetChainElements function", func(t *testing.T) {
142141
var chains []pgengine.ChainTask
143-
tx, txid, err := pge.StartTransaction(ctx, 0)
142+
tx, txid, err := pge.StartTransaction(ctx)
144143
assert.NoError(t, err, "Should start transaction")
145144
assert.Greater(t, txid, int64(0), "Should return transaction id")
146145
assert.NoError(t, pge.GetChainElements(ctx, &chains, 0), "Should no error in clean database")
147146
assert.Empty(t, chains, "Should be empty in clean database")
148147
pge.CommitTransaction(ctx, tx)
149148
})
150149

151-
t.Run("Check GetChainParamValues funсtion", func(t *testing.T) {
150+
t.Run("Check GetChainParamValues function", func(t *testing.T) {
152151
var paramVals []string
153-
tx, txid, err := pge.StartTransaction(ctx, 0)
152+
tx, txid, err := pge.StartTransaction(ctx)
154153
assert.NoError(t, err, "Should start transaction")
155154
assert.Greater(t, txid, int64(0), "Should return transaction id")
156155
assert.NoError(t, pge.GetChainParamValues(ctx, &paramVals, &pgengine.ChainTask{
@@ -160,15 +159,15 @@ func TestSchedulerFunctions(t *testing.T) {
160159
pge.CommitTransaction(ctx, tx)
161160
})
162161

163-
t.Run("Check InsertChainRunStatus funсtion", func(t *testing.T) {
162+
t.Run("Check InsertChainRunStatus function", func(t *testing.T) {
164163
var res bool
165164
assert.NotPanics(t, func() { res = pge.InsertChainRunStatus(ctx, 0, 1) },
166165
"Should no error in clean database")
167166
assert.True(t, res, "Active chain should be inserted")
168167
})
169168

170169
t.Run("Check ExecuteSQLCommand function", func(t *testing.T) {
171-
tx, txid, err := pge.StartTransaction(ctx, 0)
170+
tx, txid, err := pge.StartTransaction(ctx)
172171
assert.NoError(t, err, "Should start transaction")
173172
assert.Greater(t, txid, int64(0), "Should return transaction id")
174173
f := func(sql string, params []string) error {
@@ -191,30 +190,17 @@ func TestSchedulerFunctions(t *testing.T) {
191190
func TestGetRemoteDBTransaction(t *testing.T) {
192191
teardownTestCase := SetupTestCase(t)
193192
defer teardownTestCase(t)
194-
195193
ctx := context.Background()
196-
197-
remoteDb, tx, err := setupTestRemoteDBFunc()
198-
defer pge.FinalizeRemoteDBConnection(ctx, remoteDb)
194+
remoteDb, err := setupTestRemoteDBFunc()
195+
defer pge.FinalizeDBConnection(ctx, remoteDb)
199196
require.NoError(t, err, "remoteDB should be initialized")
200197
require.NotNil(t, remoteDb, "remoteDB should be initialized")
201198

202-
t.Run("Check connection closing", func(t *testing.T) {
203-
pge.FinalizeRemoteDBConnection(ctx, remoteDb)
204-
assert.NotNil(t, remoteDb, "Connection isn't closed properly")
205-
})
206-
207-
t.Run("Check set role function", func(t *testing.T) {
208-
var runUID pgtype.Text
209-
runUID.String = cmdOpts.Connection.User
210-
assert.NotPanics(t, func() { pge.SetRole(ctx, tx, runUID) }, "Set Role failed")
211-
})
212-
213-
t.Run("Check reset role function", func(t *testing.T) {
214-
assert.NotPanics(t, func() { pge.ResetRole(ctx, tx) }, "Reset Role failed")
215-
})
216-
217-
pge.CommitTransaction(ctx, tx)
199+
assert.NoError(t, pge.SetRole(ctx, remoteDb, pgtype.Text{String: cmdOpts.Connection.User, Valid: true}),
200+
"Set Role failed")
201+
assert.NotPanics(t, func() { pge.ResetRole(ctx, remoteDb) }, "Reset Role failed")
202+
pge.FinalizeDBConnection(ctx, remoteDb)
203+
assert.NotNil(t, remoteDb, "Connection isn't closed properly")
218204
}
219205

220206
func TestSamplesScripts(t *testing.T) {

0 commit comments

Comments
 (0)