-
Notifications
You must be signed in to change notification settings - Fork 71
Expand file tree
/
Copy pathscheduler.go
More file actions
147 lines (124 loc) · 4.53 KB
/
scheduler.go
File metadata and controls
147 lines (124 loc) · 4.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package scheduler
import (
"context"
"sync"
"time"
"github.com/cybertec-postgresql/pg_timetable/internal/config"
"github.com/cybertec-postgresql/pg_timetable/internal/log"
"github.com/cybertec-postgresql/pg_timetable/internal/pgengine"
)
// the main loop period. Should be 60 (sec) for release configuration. Set to 10 (sec) for debug purposes
const refetchTimeout = 60
// the min capacity of chains channels
const minChannelCapacity = 1024
// RunStatus specifies the current status of execution
type RunStatus int
const (
// RunningStatus specifies the scheduler is in the main loop processing chains
RunningStatus RunStatus = iota
// ContextCancelledStatus specifies the context has been cancelled probably due to timeout
ContextCancelledStatus
// Shutdown specifies proper termination of the session
ShutdownStatus
)
// Scheduler is the main class for running the tasks
type Scheduler struct {
pgengine *pgengine.PgEngine
l log.LoggerIface
chainsChan chan Chain // channel for passing chains to workers
ichainsChan chan IntervalChain // channel for passing interval chains to workers
exclusiveMutex sync.RWMutex //read-write mutex for running regular and exclusive chains
activeChains map[int]func() // map of chain ID with context cancel() function to abort chain by request
activeChainMutex sync.Mutex
intervalChains map[int]IntervalChain // map of active chains, updated every minute
intervalChainMutex sync.Mutex
shutdown chan struct{} // closed when shutdown is called
status RunStatus
}
// New returns a new instance of Scheduler
func New(pge *pgengine.PgEngine, logger log.LoggerIface) *Scheduler {
return &Scheduler{
l: logger,
pgengine: pge,
chainsChan: make(chan Chain, max(minChannelCapacity, pge.Resource.CronWorkers*2)),
ichainsChan: make(chan IntervalChain, max(minChannelCapacity, pge.Resource.IntervalWorkers*2)),
activeChains: make(map[int]func()), //holds cancel() functions to stop chains
intervalChains: make(map[int]IntervalChain),
shutdown: make(chan struct{}),
status: RunningStatus,
}
}
// Shutdown terminates the current session
func (sch *Scheduler) Shutdown() {
close(sch.shutdown)
}
// Config returns the current configuration for application
func (sch *Scheduler) Config() config.CmdOptions {
return sch.pgengine.CmdOptions
}
// IsReady returns True if the scheduler is in the main loop processing chains
func (sch *Scheduler) IsReady() bool {
return sch.status == RunningStatus
}
func (sch *Scheduler) StartChain(ctx context.Context, chainID int) error {
return sch.processAsyncChain(ctx, ChainSignal{
ConfigID: chainID,
Command: "START",
Ts: time.Now().Unix()})
}
func (sch *Scheduler) StopChain(ctx context.Context, chainID int) error {
return sch.processAsyncChain(ctx, ChainSignal{
ConfigID: chainID,
Command: "STOP",
Ts: time.Now().Unix()})
}
// Run executes jobs. Returns RunStatus why it terminated.
// There are only two possibilities: dropped connection and cancelled context.
func (sch *Scheduler) Run(ctx context.Context) RunStatus {
// create sleeping workers waiting data on channel
for w := 1; w <= sch.Config().Resource.CronWorkers; w++ {
workerCtx, cancel := context.WithCancel(ctx)
defer cancel()
go sch.chainWorker(workerCtx, sch.chainsChan)
}
for w := 1; w <= sch.Config().Resource.IntervalWorkers; w++ {
workerCtx, cancel := context.WithCancel(ctx)
defer cancel()
go sch.intervalChainWorker(workerCtx, sch.ichainsChan)
}
ctx = log.WithLogger(ctx, sch.l)
/*
Loop forever or until we ask it to stop.
First loop fetches notifications.
Main loop works every refetchTimeout seconds and runs chains.
*/
sch.l.Info("Accepting asynchronous chains execution requests...")
go sch.retrieveAsyncChainsAndRun(ctx)
if sch.Config().Start.Debug { //run blocking notifications receiving
sch.pgengine.HandleNotifications(ctx)
return ContextCancelledStatus
}
sch.l.Debug("Checking for @reboot task chains...")
sch.retrieveChainsAndRun(ctx, true)
// Use ticker for strict intervals
ticker := time.NewTicker(refetchTimeout * time.Second)
defer ticker.Stop()
for {
sch.l.Debug("Checking for task chains...")
go sch.retrieveChainsAndRun(ctx, false)
sch.l.Debug("Checking for interval task chains...")
go sch.retrieveIntervalChainsAndRun(ctx)
select {
case <-ticker.C:
// pass
case <-ctx.Done():
sch.status = ContextCancelledStatus
case <-sch.shutdown:
sch.status = ShutdownStatus
sch.terminateChains()
}
if sch.status != RunningStatus {
return sch.status
}
}
}