-
Notifications
You must be signed in to change notification settings - Fork 71
Expand file tree
/
Copy pathmigration.go
More file actions
180 lines (171 loc) · 5.34 KB
/
migration.go
File metadata and controls
180 lines (171 loc) · 5.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package pgengine
import (
"context"
"embed"
"github.com/cybertec-postgresql/pg_timetable/internal/log"
migrator "github.com/cybertec-postgresql/pgx-migrator"
pgx "github.com/jackc/pgx/v5"
)
// migrationsFiles is the embedded file system holding the migration scripts
// matched by the sql/migrations/*.sql pattern; ExecuteMigrationScript reads
// its scripts from here.
//
//go:embed sql/migrations/*.sql
var migrationsFiles embed.FS
// MigrateDb upgrades the database with all migrations.
//
// It initializes the migrator, acquires a connection from pge.ConfigDb and
// runs the migrator on that connection. An error from any of these steps is
// returned to the caller unchanged.
func (pge *PgEngine) MigrateDb(ctx context.Context) error {
	m, err := pge.initMigrator()
	if err != nil {
		return err
	}
	pge.l.Info("Upgrading database...")
	conn, err := pge.ConfigDb.Acquire(ctx)
	if err != nil {
		return err
	}
	// Schedule the release only after Acquire has succeeded: on failure conn
	// may be nil, and releasing it would panic instead of reporting err.
	defer conn.Release()
	return m.Migrate(ctx, conn.Conn())
}
// CheckNeedMigrateDb reports whether the database needs to be upgraded.
//
// It initializes the migrator, acquires a connection from pge.ConfigDb and
// returns the result of the migrator's NeedUpgrade check on that connection.
// If the migrator cannot be initialized or no connection can be acquired,
// it returns false together with the error.
func (pge *PgEngine) CheckNeedMigrateDb(ctx context.Context) (bool, error) {
	m, err := pge.initMigrator()
	if err != nil {
		return false, err
	}
	pge.l.Debug("Check need of upgrading database...")
	// Pass the engine logger along with the context used for the check.
	ctx = log.WithLogger(ctx, pge.l)
	conn, err := pge.ConfigDb.Acquire(ctx)
	if err != nil {
		return false, err
	}
	// Schedule the release only after Acquire has succeeded: on failure conn
	// may be nil, and releasing it would panic instead of reporting err.
	defer conn.Release()
	return m.NeedUpgrade(ctx, conn.Conn())
}
// ExecuteMigrationScript reads the embedded script sql/migrations/<fname> and
// executes its contents within the transaction tx. It returns the read error
// if the script cannot be loaded, otherwise the result of the execution.
func ExecuteMigrationScript(ctx context.Context, tx pgx.Tx, fname string) error {
	script, readErr := migrationsFiles.ReadFile("sql/migrations/" + fname)
	if readErr != nil {
		return readErr
	}
	if _, execErr := tx.Exec(ctx, string(script)); execErr != nil {
		return execErr
	}
	return nil
}
// Migrations holds a function returning the migrator option that lists all
// upgrade migrations, in the order they are declared here.
var Migrations func() migrator.Option = func() migrator.Option {
	// fromScript builds a migration called name whose body executes the
	// embedded SQL file fname via ExecuteMigrationScript.
	fromScript := func(name, fname string) *migrator.Migration {
		return &migrator.Migration{
			Name: name,
			Func: func(ctx context.Context, tx pgx.Tx) error {
				return ExecuteMigrationScript(ctx, tx, fname)
			},
		}
	}

	// restart is the first migration; it runs no SQL of its own.
	restart := &migrator.Migration{
		Name: "00259 Restart migrations for v4",
		Func: func(context.Context, pgx.Tx) error {
			// "migrations" table will be created automatically
			return nil
		},
	}

	return migrator.Migrations(
		restart,
		fromScript("00305 Fix timetable.is_cron_in_time", "00305.sql"),
		fromScript("00323 Append timetable.delete_job function", "00323.sql"),
		fromScript("00329 Migration required for some new added functions", "00329.sql"),
		fromScript("00334 Refactor timetable.task as plain schema without tree-like dependencies", "00334.sql"),
		fromScript("00381 Rewrite active chain handling", "00381.sql"),
		fromScript("00394 Add started_at column to active_session and active_chain tables", "00394.sql"),
		fromScript("00417 Rename LOG database log level to INFO", "00417.sql"),
		fromScript("00436 Add txid column to timetable.execution_log", "00436.sql"),
		fromScript("00534 Use cron_split_to_arrays() in cron domain check", "00534.sql"),
		fromScript("00560 Alter txid column to bigint", "00560.sql"),
		fromScript("00573 Add ability to start a chain with delay", "00573.sql"),
		fromScript("00575 Add on_error handling", "00575.sql"),
		fromScript("00629 Add ignore_error column to timetable.execution_log", "00629.sql"),
		fromScript("00721 Add more job control functions", "00721.sql"),
		fromScript("00733 Add params column to timetable.execution_log table", "00733.sql"),
		// When adding a new migration here, also update "timetable"."migration"
		// in "sql/init.sql" and the "dbapi" variable in main.go!
		// fromScript("000XX Short description of a migration", "000XX.sql"),
	)
}
// initMigrator creates a migrator that works on the "timetable.migration"
// table, forwards migrator notices to the engine logger at Info level and
// carries the migrations returned by Migrations(). A construction failure is
// logged and then returned together with whatever migrator.New produced.
func (pge *PgEngine) initMigrator() (*migrator.Migrator, error) {
	table := migrator.TableName("timetable.migration")
	notice := migrator.SetNotice(func(msg string) {
		pge.l.Info(msg)
	})
	mig, err := migrator.New(table, notice, Migrations())
	if err != nil {
		pge.l.WithError(err).Error("Cannot initialize migration")
	}
	return mig, err
}