Skip to content

Commit 5e7df14

Browse files
authored
Merge pull request #120 from cybertec-postgresql/fixLeaderNotReady
add func for leader endpoint(patroni) and call it after syncing state…
2 parents 169d2c2 + 63a5f1b commit 5e7df14

3 files changed

Lines changed: 100 additions & 6 deletions

File tree

pkg/cluster/cluster.go

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1169,6 +1169,13 @@ func (c *Cluster) Update(oldSpec, newSpec *cpov1.Postgresql) error {
11691169

11701170
}()
11711171

1172+
// Ensure the cluster has a leader
1173+
if err := c.waitForLeader(60 * time.Second); err != nil {
1174+
c.logger.Infof("Postgres not yet ready for writes (Patroni leader election pending?). Skipping DB sync until next loop: %v", err)
1175+
updateFailed = true
1176+
return nil
1177+
}
1178+
11721179
// Roles and Databases
11731180
if !userInitFailed && !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0 || c.Spec.StandbyCluster != nil || c.restoreInProgress()) {
11741181
c.logger.Debugf("syncing roles")
@@ -1205,12 +1212,16 @@ func (c *Cluster) Update(oldSpec, newSpec *cpov1.Postgresql) error {
12051212

12061213
// Check if we need to call addMonitoringPermissions-func
12071214
if c.Spec.Monitoring != nil && newSpec.Spec.Monitoring != nil && oldSpec.Spec.Monitoring == nil {
1208-
c.addMonitoringPermissions()
1215+
if err := c.addMonitoringPermissions(); err != nil {
1216+
c.logger.Errorf("could not add monitoring permissions: %v", err)
1217+
updateFailed = true
1218+
}
12091219
}
12101220
// Check if Monitoring-Secret needs to be removed
12111221
if newSpec.Spec.Monitoring == nil && oldSpec.Spec.Monitoring != nil {
12121222
if err := c.deleteMonitoringSecret(); err != nil {
1213-
return fmt.Errorf("could not remove the Monitoring secret: %v", err)
1223+
c.logger.Errorf("could not remove the Monitoring secret: %v", err)
1224+
updateFailed = true
12141225
}
12151226
}
12161227

@@ -1272,6 +1283,42 @@ func syncResources(a, b *v1.ResourceRequirements) bool {
12721283
return false
12731284
}
12741285

1286+
func (c *Cluster) waitForLeader(timeout time.Duration) error {
1287+
c.logger.Debugf("waiting up to %v for Patroni leader...", timeout)
1288+
1289+
deadline := time.Now().Add(timeout)
1290+
ticker := time.NewTicker(2 * time.Second)
1291+
defer ticker.Stop()
1292+
1293+
for {
1294+
select {
1295+
case <-ticker.C:
1296+
if time.Now().After(deadline) {
1297+
return fmt.Errorf("timeout waiting for Patroni leader")
1298+
}
1299+
1300+
pods, err := c.listPodsOfType(TYPE_POSTGRESQL)
1301+
if err != nil {
1302+
continue
1303+
}
1304+
1305+
for _, pod := range pods {
1306+
isLeader, err := c.patroni.IsLeader(&pod)
1307+
1308+
if err != nil {
1309+
c.logger.Debugf("check leader failed for %s: %v", pod.Name, err)
1310+
continue
1311+
}
1312+
1313+
if isLeader {
1314+
c.logger.Infof("Patroni leader found: %s. Proceeding with DB sync.", pod.Name)
1315+
return nil
1316+
}
1317+
}
1318+
}
1319+
}
1320+
}
1321+
12751322
// Delete deletes the cluster and cleans up all objects associated with it (including statefulsets).
12761323
// The deletion order here is somewhat significant, because Patroni, when running with the Kubernetes
12771324
// DCS, reuses the master's endpoint to store the leader related metadata. If we remove the endpoint

pkg/cluster/sync.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,22 +261,46 @@ func (c *Cluster) Sync(newSpec *cpov1.Postgresql) error {
261261
return err
262262
}
263263

264+
// Determine whether database access is needed; if so, the cluster must have a leader first
265+
needDBAccess := !(c.databaseAccessDisabled() || c.getNumberOfInstances(&newSpec.Spec) <= 0 || c.Spec.StandbyCluster != nil || c.restoreInProgress())
264266
// create database objects unless we are running without pods or disabled that feature explicitly
265-
if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&newSpec.Spec) <= 0 || c.Spec.StandbyCluster != nil || c.restoreInProgress()) {
267+
if needDBAccess {
268+
if err := c.waitForLeader(60 * time.Second); err != nil {
269+
c.logger.Infof("Postgres not yet ready for writes (Patroni leader election pending?). Skipping DB sync until next loop: %v", err)
270+
return nil
271+
}
272+
266273
c.logger.Debug("syncing roles")
267274
if err = c.syncRoles(); err != nil {
268275
c.logger.Errorf("could not sync roles: %v", err)
269276
}
277+
270278
c.logger.Debug("syncing databases")
271279
if err = c.syncDatabases(); err != nil {
272280
c.logger.Errorf("could not sync databases: %v", err)
273281
}
282+
274283
c.logger.Debug("syncing prepared databases with schemas")
275284
if err = c.syncPreparedDatabases(); err != nil {
276285
c.logger.Errorf("could not sync prepared database: %v", err)
277286
}
278287
}
279288

289+
// if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&newSpec.Spec) <= 0 || c.Spec.StandbyCluster != nil || c.restoreInProgress()) {
290+
// c.logger.Debug("syncing roles")
291+
// if err = c.syncRoles(); err != nil {
292+
// c.logger.Errorf("could not sync roles: %v", err)
293+
// }
294+
// c.logger.Debug("syncing databases")
295+
// if err = c.syncDatabases(); err != nil {
296+
// c.logger.Errorf("could not sync databases: %v", err)
297+
// }
298+
// c.logger.Debug("syncing prepared databases with schemas")
299+
// if err = c.syncPreparedDatabases(); err != nil {
300+
// c.logger.Errorf("could not sync prepared database: %v", err)
301+
// }
302+
// }
303+
280304
// sync connection pooler
281305
if _, err = c.syncConnectionPooler(&oldSpec, newSpec, c.installLookupFunction); err != nil {
282306
return fmt.Errorf("could not sync connection pooler: %v", err)

pkg/util/patroni/patroni.go

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ import (
1414
"github.com/cybertec-postgresql/cybertec-pg-operator/pkg/util/constants"
1515
httpclient "github.com/cybertec-postgresql/cybertec-pg-operator/pkg/util/httpclient"
1616

17-
"github.com/sirupsen/logrus"
1817
cpov1 "github.com/cybertec-postgresql/cybertec-pg-operator/pkg/apis/cpo.opensource.cybertec.at/v1"
18+
"github.com/sirupsen/logrus"
1919
v1 "k8s.io/api/core/v1"
2020
)
2121

@@ -25,6 +25,7 @@ const (
2525
clusterPath = "/cluster"
2626
statusPath = "/patroni"
2727
restartPath = "/restart"
28+
leaderPath = "/leader"
2829
ApiPort = 8008
2930
timeout = 30 * time.Second
3031
)
@@ -38,6 +39,7 @@ type Interface interface {
3839
Restart(server *v1.Pod) error
3940
GetConfig(server *v1.Pod) (cpov1.Patroni, map[string]string, error)
4041
SetConfig(server *v1.Pod, config map[string]interface{}) error
42+
IsLeader(server *v1.Pod) (bool, error)
4143
}
4244

4345
// Patroni API client
@@ -150,7 +152,7 @@ func (p *Patroni) Switchover(master *v1.Pod, candidate string) error {
150152

151153
//TODO: add an option call /patroni to check if it is necessary to restart the server
152154

153-
//SetPostgresParameters sets Postgres options via Patroni patch API call.
155+
// SetPostgresParameters sets Postgres options via Patroni patch API call.
154156
func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]string) error {
155157
buf := &bytes.Buffer{}
156158
err := json.NewEncoder(buf).Encode(map[string]map[string]interface{}{"postgresql": {"parameters": parameters}})
@@ -164,7 +166,7 @@ func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]st
164166
return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf)
165167
}
166168

167-
//SetConfig sets Patroni options via Patroni patch API call.
169+
// SetConfig sets Patroni options via Patroni patch API call.
168170
func (p *Patroni) SetConfig(server *v1.Pod, config map[string]interface{}) error {
169171
buf := &bytes.Buffer{}
170172
err := json.NewEncoder(buf).Encode(config)
@@ -320,3 +322,24 @@ func (p *Patroni) GetMemberData(server *v1.Pod) (MemberData, error) {
320322

321323
return data, nil
322324
}
325+
326+
// Call leader-Endpoint (expecting statuscode 200 or 503)
327+
func (p *Patroni) IsLeader(server *v1.Pod) (bool, error) {
328+
apiURLString, err := apiURL(server)
329+
if err != nil {
330+
return false, err
331+
}
332+
333+
resp, err := p.httpClient.Get(apiURLString + leaderPath)
334+
if err != nil {
335+
return false, fmt.Errorf("request failed: %v", err)
336+
}
337+
defer resp.Body.Close()
338+
339+
// 200 = Leader, 503 = Replica
340+
if resp.StatusCode == http.StatusOK {
341+
return true, nil
342+
}
343+
344+
return false, nil
345+
}

0 commit comments

Comments
 (0)