Skip to content

Commit fa728f0

Browse files
authored
[-] prevent running several instances with the same clientname, fixes #475 (#476)
* `[-]` prevent running several instances with the same `clientname`, fixes #475 * `[*]` increase connection timeout in SetupTestCase * `[-]` add `pge.Finalize()` to migrator tests
1 parent ff713b2 commit fa728f0

7 files changed

Lines changed: 43 additions & 24 deletions

File tree

internal/migrator/migrator_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ func TestBadMigrations(t *testing.T) {
8686
if err != nil {
8787
t.Fatal(err)
8888
}
89+
defer pge.Finalize()
90+
8991
ctx := context.Background()
9092
db, err := pge.ConfigDb.Acquire(ctx)
9193
if err != nil {
@@ -143,6 +145,7 @@ func TestPending(t *testing.T) {
143145
if err != nil {
144146
t.Fatal(err)
145147
}
148+
defer pge.Finalize()
146149
ctx := context.Background()
147150
db, err := pge.ConfigDb.Acquire(ctx)
148151
if err != nil {

internal/pgengine/access.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package pgengine
33
import (
44
"context"
55
"fmt"
6-
"os"
76
"strings"
87

98
"github.com/georgysavva/scany/pgxscan"
@@ -35,7 +34,7 @@ chain_id, task_id, command, kind, last_run, finished, returncode, pid, output, c
3534
VALUES ($1, $2, $3, $4, clock_timestamp() - $5 :: interval, clock_timestamp(), $6, $7, NULLIF($8, ''), $9, $10)`,
3635
task.ChainID, task.TaskID, task.Script, task.Kind,
3736
fmt.Sprintf("%f seconds", float64(task.Duration)/1000000),
38-
retCode, os.Getpid(), strings.TrimSpace(output), pge.ClientName, task.Txid)
37+
retCode, pge.Getpid(), strings.TrimSpace(output), pge.ClientName, task.Txid)
3938
if err != nil {
4039
pge.l.WithError(err).Error("Failed to log chain element execution status")
4140
}
@@ -65,7 +64,7 @@ func (pge *PgEngine) RemoveChainRunStatus(ctx context.Context, chainID int) {
6564
}
6665
}
6766

68-
//Select live chains with proper client_name value
67+
// Select live chains with proper client_name value
6968
const sqlSelectLiveChains = `SELECT chain_id, chain_name, self_destruct, exclusive_execution, COALESCE(timeout, 0) as timeout, COALESCE(max_instances, 16) as max_instances
7069
FROM timetable.chain WHERE live AND (client_name = $1 or client_name IS NULL)`
7170

internal/pgengine/bootstrap.go

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"context"
55
"fmt"
66
"io/ioutil"
7-
"os"
7+
"math/rand"
88
"time"
99

1010
"github.com/cybertec-postgresql/pg_timetable/internal/config"
@@ -51,6 +51,18 @@ type PgEngine struct {
5151
config.CmdOptions
5252
// NOTIFY messages that passed verification are pushed to this channel
5353
chainSignalChan chan ChainSignal
54+
pid int32
55+
}
56+
57+
// Getpid returns the pseudo-random process ID to use for the session identification.
58+
// Previously `os.Getpid()` was used, but that approach does not produce unique IDs for docker containers
59+
// where all IDs are the same across all running containers, e.g. 1
60+
func (pge *PgEngine) Getpid() int32 {
61+
if pge.pid == 0 {
62+
rand.Seed(time.Now().UnixNano())
63+
pge.pid = rand.Int31()
64+
}
65+
return pge.pid
5466
}
5567

5668
var sqls = []string{sqlDDL, sqlJSONSchema, sqlCronFunctions, sqlJobFunctions}
@@ -61,12 +73,12 @@ func New(ctx context.Context, cmdOpts config.CmdOptions, logger log.LoggerHooker
6173
var wt int = WaitTime
6274
var err error
6375
pge := &PgEngine{
64-
logger,
65-
nil,
66-
cmdOpts,
67-
make(chan ChainSignal, 64),
76+
l: logger,
77+
ConfigDb: nil,
78+
CmdOptions: cmdOpts,
79+
chainSignalChan: make(chan ChainSignal, 64),
6880
}
69-
pge.l.WithField("PID", os.Getpid()).Debug("Starting new session... ")
81+
pge.l.WithField("PID", pge.Getpid()).Info("Starting new session... ")
7082
connctx, conncancel := context.WithTimeout(ctx, time.Duration(cmdOpts.Connection.Timeout)*time.Second)
7183
defer conncancel()
7284

@@ -106,7 +118,12 @@ func New(ctx context.Context, cmdOpts config.CmdOptions, logger log.LoggerHooker
106118
// NewDB creates a pgengine instance for an already opened database connection, allowing parameter-based credentials to be bypassed.
107119
// We assume here all checks for proper schema validation are done beforehand
108120
func NewDB(DB PgxPoolIface, args ...string) *PgEngine {
109-
return &PgEngine{log.Init(config.LoggingOpts{LogLevel: "error"}), DB, *config.NewCmdOptions(args...), make(chan ChainSignal, 64)}
121+
return &PgEngine{
122+
l: log.Init(config.LoggingOpts{LogLevel: "error"}),
123+
ConfigDb: DB,
124+
CmdOptions: *config.NewCmdOptions(args...),
125+
chainSignalChan: make(chan ChainSignal, 64),
126+
}
110127
}
111128

112129
// getPgxConnConfig transforms standard connection string to pgx specific one with
@@ -159,7 +176,7 @@ func (pge *PgEngine) getPgxConnConfig() *pgxpool.Config {
159176

160177
// AddLogHook adds a new pgx log hook to logrus logger
161178
func (pge *PgEngine) AddLogHook(ctx context.Context) {
162-
pge.l.AddHook(NewHook(ctx, pge.ConfigDb, pge.ClientName, 500, pge.Logging.LogDBLevel))
179+
pge.l.AddHook(NewHook(ctx, pge, pge.Logging.LogDBLevel))
163180
}
164181

165182
// QueryRowIface specifies interface to use QueryRow method
@@ -182,7 +199,7 @@ func (pge *PgEngine) TryLockClientName(ctx context.Context, conn QueryRowIface)
182199
}
183200
var wt int = WaitTime
184201
for {
185-
sql := fmt.Sprintf("SELECT timetable.try_lock_client_name(%d, $worker$%s$worker$)", os.Getpid(), pge.ClientName)
202+
sql := fmt.Sprintf("SELECT timetable.try_lock_client_name(%d, $worker$%s$worker$)", pge.Getpid(), pge.ClientName)
186203
var locked bool
187204
err = conn.QueryRow(ctx, sql).Scan(&locked)
188205
if err != nil {
@@ -262,7 +279,7 @@ DELETE FROM timetable.active_session WHERE client_name = $1`
262279
pge.ConfigDb = nil
263280
}
264281

265-
//Reconnect keeps trying reconnecting every `waitTime` seconds till connection established
282+
// Reconnect keeps trying to reconnect every `WaitTime` seconds until the connection is established
266283
func (pge *PgEngine) Reconnect(ctx context.Context) bool {
267284
for pge.ConfigDb.Ping(ctx) != nil {
268285
pge.l.Info("Connection to the server was lost. Waiting for ", WaitTime, " sec...")

internal/pgengine/log_hook.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package pgengine
33
import (
44
"context"
55
"encoding/json"
6-
"os"
76
"time"
87

98
pgx "github.com/jackc/pgx/v4"
@@ -19,23 +18,24 @@ type LogHook struct {
1918
input chan logrus.Entry
2019
ctx context.Context
2120
lastError chan error
22-
pid int
21+
pid int32
2322
client string
2423
level string
2524
}
2625

2726
// NewHook creates a LogHook to be added to an instance of logger
28-
func NewHook(ctx context.Context, db PgxPoolIface, client string, cacheLimit int, level string) *LogHook {
27+
func NewHook(ctx context.Context, pge *PgEngine, level string) *LogHook {
28+
cacheLimit := 500
2929
l := &LogHook{
3030
cacheLimit: cacheLimit,
3131
cacheTimeout: 2 * time.Second,
3232
highLoadTimeout: 200 * time.Millisecond,
33-
db: db,
33+
db: pge.ConfigDb,
3434
input: make(chan logrus.Entry, cacheLimit),
3535
lastError: make(chan error),
3636
ctx: ctx,
37-
pid: os.Getpid(),
38-
client: client,
37+
pid: pge.Getpid(),
38+
client: pge.ClientName,
3939
level: level,
4040
}
4141
go l.poll(l.input)

internal/pgengine/log_hook_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,15 @@ func TestLogHook(t *testing.T) {
4949
func TestCancelledContext(t *testing.T) {
5050
ctx, cancel := context.WithCancel(context.Background())
5151
cancel()
52-
h := NewHook(ctx, nil, "foo", 100, "debug")
52+
h := NewHook(ctx, &PgEngine{}, "debug")
5353
assert.Equal(t, h.Levels(), logrus.AllLevels)
5454
assert.NoError(t, h.Fire(&logrus.Entry{}))
5555
}
5656

5757
func TestFireError(t *testing.T) {
5858
ctx, cancel := context.WithCancel(context.Background())
5959
defer cancel()
60-
h := NewHook(ctx, nil, "foo", 100, "debug")
60+
h := NewHook(ctx, &PgEngine{}, "debug")
6161
err := errors.New("fire error")
6262
go func() { h.lastError <- err }()
6363
<-time.After(time.Second)

internal/pgengine/pgengine_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ func SetupTestCaseEx(t *testing.T, fc func(c *config.CmdOptions)) func(t *testin
2929
return SetupTestCase(t)
3030
}
3131

32-
//SetupTestCase used to connect and to initialize test PostgreSQL database
32+
// SetupTestCase is used to connect to and initialize the test PostgreSQL database
3333
func SetupTestCase(t *testing.T) func(t *testing.T) {
3434
t.Helper()
35-
timeout := time.After(6 * time.Second)
35+
timeout := time.After(30 * time.Second)
3636
done := make(chan bool)
3737
go func() {
3838
pge, _ = pgengine.New(context.Background(), *cmdOpts, log.Init(config.LoggingOpts{LogLevel: "error"}))

internal/pgengine/sql/ddl.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,8 @@ COMMENT ON TABLE timetable.parameter IS
107107

108108
CREATE UNLOGGED TABLE timetable.active_session(
109109
client_pid BIGINT NOT NULL,
110-
client_name TEXT NOT NULL,
111110
server_pid BIGINT NOT NULL,
111+
client_name TEXT NOT NULL,
112112
started_at TIMESTAMPTZ DEFAULT now()
113113
);
114114

0 commit comments

Comments
 (0)