Skip to content

Commit 5c809a3

Browse files
committed
add func for leader endpoint(patroni) and call it after syncing statefulset and before syncing roles
1 parent d9bc339 commit 5c809a3

3 files changed

Lines changed: 87 additions & 4 deletions

File tree

pkg/cluster/cluster.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1272,6 +1272,42 @@ func syncResources(a, b *v1.ResourceRequirements) bool {
12721272
return false
12731273
}
12741274

1275+
func (c *Cluster) waitForLeader(timeout time.Duration) error {
1276+
c.logger.Debugf("waiting up to %v for Patroni leader...", timeout)
1277+
1278+
deadline := time.Now().Add(timeout)
1279+
ticker := time.NewTicker(2 * time.Second)
1280+
defer ticker.Stop()
1281+
1282+
for {
1283+
select {
1284+
case <-ticker.C:
1285+
if time.Now().After(deadline) {
1286+
return fmt.Errorf("timeout waiting for Patroni leader")
1287+
}
1288+
1289+
pods, err := c.listPodsOfType(TYPE_POSTGRESQL)
1290+
if err != nil {
1291+
continue
1292+
}
1293+
1294+
for _, pod := range pods {
1295+
isLeader, err := c.patroni.IsLeader(&pod)
1296+
1297+
if err != nil {
1298+
c.logger.Debugf("check leader failed for %s: %v", pod.Name, err)
1299+
continue
1300+
}
1301+
1302+
if isLeader {
1303+
c.logger.Infof("Patroni leader found: %s. Proceeding with DB sync.", pod.Name)
1304+
return nil
1305+
}
1306+
}
1307+
}
1308+
}
1309+
}
1310+
12751311
// Delete deletes the cluster and cleans up all objects associated with it (including statefulsets).
12761312
// The deletion order here is somewhat significant, because Patroni, when running with the Kubernetes
12771313
// DCS, reuses the master's endpoint to store the leader related metadata. If we remove the endpoint

pkg/cluster/sync.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,22 +261,46 @@ func (c *Cluster) Sync(newSpec *cpov1.Postgresql) error {
261261
return err
262262
}
263263

264+
// Check if Cluster has an Leader
265+
needDBAccess := !(c.databaseAccessDisabled() || c.getNumberOfInstances(&newSpec.Spec) <= 0 || c.Spec.StandbyCluster != nil || c.restoreInProgress())
264266
// create database objects unless we are running without pods or disabled that feature explicitly
265-
if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&newSpec.Spec) <= 0 || c.Spec.StandbyCluster != nil || c.restoreInProgress()) {
267+
if needDBAccess {
268+
if err := c.waitForLeader(60 * time.Second); err != nil {
269+
c.logger.Infof("Postgres not yet ready for writes (Patroni leader election pending?). Skipping DB sync until next loop: %v", err)
270+
return nil
271+
}
272+
266273
c.logger.Debug("syncing roles")
267274
if err = c.syncRoles(); err != nil {
268275
c.logger.Errorf("could not sync roles: %v", err)
269276
}
277+
270278
c.logger.Debug("syncing databases")
271279
if err = c.syncDatabases(); err != nil {
272280
c.logger.Errorf("could not sync databases: %v", err)
273281
}
282+
274283
c.logger.Debug("syncing prepared databases with schemas")
275284
if err = c.syncPreparedDatabases(); err != nil {
276285
c.logger.Errorf("could not sync prepared database: %v", err)
277286
}
278287
}
279288

289+
// if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&newSpec.Spec) <= 0 || c.Spec.StandbyCluster != nil || c.restoreInProgress()) {
290+
// c.logger.Debug("syncing roles")
291+
// if err = c.syncRoles(); err != nil {
292+
// c.logger.Errorf("could not sync roles: %v", err)
293+
// }
294+
// c.logger.Debug("syncing databases")
295+
// if err = c.syncDatabases(); err != nil {
296+
// c.logger.Errorf("could not sync databases: %v", err)
297+
// }
298+
// c.logger.Debug("syncing prepared databases with schemas")
299+
// if err = c.syncPreparedDatabases(); err != nil {
300+
// c.logger.Errorf("could not sync prepared database: %v", err)
301+
// }
302+
// }
303+
280304
// sync connection pooler
281305
if _, err = c.syncConnectionPooler(&oldSpec, newSpec, c.installLookupFunction); err != nil {
282306
return fmt.Errorf("could not sync connection pooler: %v", err)

pkg/util/patroni/patroni.go

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ import (
1414
"github.com/cybertec-postgresql/cybertec-pg-operator/pkg/util/constants"
1515
httpclient "github.com/cybertec-postgresql/cybertec-pg-operator/pkg/util/httpclient"
1616

17-
"github.com/sirupsen/logrus"
1817
cpov1 "github.com/cybertec-postgresql/cybertec-pg-operator/pkg/apis/cpo.opensource.cybertec.at/v1"
18+
"github.com/sirupsen/logrus"
1919
v1 "k8s.io/api/core/v1"
2020
)
2121

@@ -25,6 +25,7 @@ const (
2525
clusterPath = "/cluster"
2626
statusPath = "/patroni"
2727
restartPath = "/restart"
28+
leaderPath = "/leader"
2829
ApiPort = 8008
2930
timeout = 30 * time.Second
3031
)
@@ -38,6 +39,7 @@ type Interface interface {
3839
Restart(server *v1.Pod) error
3940
GetConfig(server *v1.Pod) (cpov1.Patroni, map[string]string, error)
4041
SetConfig(server *v1.Pod, config map[string]interface{}) error
42+
IsLeader(server *v1.Pod) (bool, error)
4143
}
4244

4345
// Patroni API client
@@ -150,7 +152,7 @@ func (p *Patroni) Switchover(master *v1.Pod, candidate string) error {
150152

151153
//TODO: add an option call /patroni to check if it is necessary to restart the server
152154

153-
//SetPostgresParameters sets Postgres options via Patroni patch API call.
155+
// SetPostgresParameters sets Postgres options via Patroni patch API call.
154156
func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]string) error {
155157
buf := &bytes.Buffer{}
156158
err := json.NewEncoder(buf).Encode(map[string]map[string]interface{}{"postgresql": {"parameters": parameters}})
@@ -164,7 +166,7 @@ func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]st
164166
return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf)
165167
}
166168

167-
//SetConfig sets Patroni options via Patroni patch API call.
169+
// SetConfig sets Patroni options via Patroni patch API call.
168170
func (p *Patroni) SetConfig(server *v1.Pod, config map[string]interface{}) error {
169171
buf := &bytes.Buffer{}
170172
err := json.NewEncoder(buf).Encode(config)
@@ -320,3 +322,24 @@ func (p *Patroni) GetMemberData(server *v1.Pod) (MemberData, error) {
320322

321323
return data, nil
322324
}
325+
326+
// Call leader-Endpoint (expecting statuscode 200 or 503)
327+
func (p *Patroni) IsLeader(server *v1.Pod) (bool, error) {
328+
apiURLString, err := apiURL(server)
329+
if err != nil {
330+
return false, err
331+
}
332+
333+
resp, err := p.httpClient.Get(apiURLString + leaderPath)
334+
if err != nil {
335+
return false, fmt.Errorf("request failed: %v", err)
336+
}
337+
defer resp.Body.Close()
338+
339+
// 200 = Leader, 503 = Replica
340+
if resp.StatusCode == http.StatusOK {
341+
return true, nil
342+
}
343+
344+
return false, nil
345+
}

0 commit comments

Comments
 (0)