From 5c809a323a534c3baf5c4dbac1e9cd1b57eb59e4 Mon Sep 17 00:00:00 2001 From: matthias Date: Tue, 16 Dec 2025 19:24:52 +0100 Subject: [PATCH 1/2] add func for leader endpoint(patroni) and call it after syncing statefulset and before syncing roles --- pkg/cluster/cluster.go | 36 ++++++++++++++++++++++++++++++++++++ pkg/cluster/sync.go | 26 +++++++++++++++++++++++++- pkg/util/patroni/patroni.go | 29 ++++++++++++++++++++++++++--- 3 files changed, 87 insertions(+), 4 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 6b2e37856..e531fea1d 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -1272,6 +1272,42 @@ func syncResources(a, b *v1.ResourceRequirements) bool { return false } +func (c *Cluster) waitForLeader(timeout time.Duration) error { + c.logger.Debugf("waiting up to %v for Patroni leader...", timeout) + + deadline := time.Now().Add(timeout) + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if time.Now().After(deadline) { + return fmt.Errorf("timeout waiting for Patroni leader") + } + + pods, err := c.listPodsOfType(TYPE_POSTGRESQL) + if err != nil { + continue + } + + for _, pod := range pods { + isLeader, err := c.patroni.IsLeader(&pod) + + if err != nil { + c.logger.Debugf("check leader failed for %s: %v", pod.Name, err) + continue + } + + if isLeader { + c.logger.Infof("Patroni leader found: %s. Proceeding with DB sync.", pod.Name) + return nil + } + } + } + } +} + // Delete deletes the cluster and cleans up all objects associated with it (including statefulsets). // The deletion order here is somewhat significant, because Patroni, when running with the Kubernetes // DCS, reuses the master's endpoint to store the leader related metadata. 
If we remove the endpoint diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index b0535481d..4618c14ce 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -261,22 +261,46 @@ func (c *Cluster) Sync(newSpec *cpov1.Postgresql) error { return err } + // Check if the cluster has a leader + needDBAccess := !(c.databaseAccessDisabled() || c.getNumberOfInstances(&newSpec.Spec) <= 0 || c.Spec.StandbyCluster != nil || c.restoreInProgress()) // create database objects unless we are running without pods or disabled that feature explicitly - if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&newSpec.Spec) <= 0 || c.Spec.StandbyCluster != nil || c.restoreInProgress()) { + if needDBAccess { + if err := c.waitForLeader(60 * time.Second); err != nil { + c.logger.Infof("Postgres not yet ready for writes (Patroni leader election pending?). Skipping DB sync until next loop: %v", err) + return nil + } + c.logger.Debug("syncing roles") if err = c.syncRoles(); err != nil { c.logger.Errorf("could not sync roles: %v", err) } + c.logger.Debug("syncing databases") if err = c.syncDatabases(); err != nil { c.logger.Errorf("could not sync databases: %v", err) } + c.logger.Debug("syncing prepared databases with schemas") if err = c.syncPreparedDatabases(); err != nil { c.logger.Errorf("could not sync prepared database: %v", err) } } + // if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&newSpec.Spec) <= 0 || c.Spec.StandbyCluster != nil || c.restoreInProgress()) { + // c.logger.Debug("syncing roles") + // if err = c.syncRoles(); err != nil { + // c.logger.Errorf("could not sync roles: %v", err) + // } + // c.logger.Debug("syncing databases") + // if err = c.syncDatabases(); err != nil { + // c.logger.Errorf("could not sync databases: %v", err) + // } + // c.logger.Debug("syncing prepared databases with schemas") + // if err = c.syncPreparedDatabases(); err != nil { + // c.logger.Errorf("could not sync prepared database: %v", err) + // } + // } + // sync connection
pooler if _, err = c.syncConnectionPooler(&oldSpec, newSpec, c.installLookupFunction); err != nil { return fmt.Errorf("could not sync connection pooler: %v", err) diff --git a/pkg/util/patroni/patroni.go b/pkg/util/patroni/patroni.go index da001d4df..a0ffa23f0 100644 --- a/pkg/util/patroni/patroni.go +++ b/pkg/util/patroni/patroni.go @@ -14,8 +14,8 @@ import ( "github.com/cybertec-postgresql/cybertec-pg-operator/pkg/util/constants" httpclient "github.com/cybertec-postgresql/cybertec-pg-operator/pkg/util/httpclient" - "github.com/sirupsen/logrus" cpov1 "github.com/cybertec-postgresql/cybertec-pg-operator/pkg/apis/cpo.opensource.cybertec.at/v1" + "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" ) @@ -25,6 +25,7 @@ const ( clusterPath = "/cluster" statusPath = "/patroni" restartPath = "/restart" + leaderPath = "/leader" ApiPort = 8008 timeout = 30 * time.Second ) @@ -38,6 +39,7 @@ type Interface interface { Restart(server *v1.Pod) error GetConfig(server *v1.Pod) (cpov1.Patroni, map[string]string, error) SetConfig(server *v1.Pod, config map[string]interface{}) error + IsLeader(server *v1.Pod) (bool, error) } // Patroni API client @@ -150,7 +152,7 @@ func (p *Patroni) Switchover(master *v1.Pod, candidate string) error { //TODO: add an option call /patroni to check if it is necessary to restart the server -//SetPostgresParameters sets Postgres options via Patroni patch API call. +// SetPostgresParameters sets Postgres options via Patroni patch API call. func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]string) error { buf := &bytes.Buffer{} err := json.NewEncoder(buf).Encode(map[string]map[string]interface{}{"postgresql": {"parameters": parameters}}) @@ -164,7 +166,7 @@ func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]st return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf) } -//SetConfig sets Patroni options via Patroni patch API call. 
+// SetConfig sets Patroni options via Patroni patch API call. func (p *Patroni) SetConfig(server *v1.Pod, config map[string]interface{}) error { buf := &bytes.Buffer{} err := json.NewEncoder(buf).Encode(config) @@ -320,3 +322,24 @@ func (p *Patroni) GetMemberData(server *v1.Pod) (MemberData, error) { return data, nil } + +// Call leader-Endpoint (expecting statuscode 200 or 503) +func (p *Patroni) IsLeader(server *v1.Pod) (bool, error) { + apiURLString, err := apiURL(server) + if err != nil { + return false, err + } + + resp, err := p.httpClient.Get(apiURLString + leaderPath) + if err != nil { + return false, fmt.Errorf("request failed: %v", err) + } + defer resp.Body.Close() + + // 200 = Leader, 503 = Replica + if resp.StatusCode == http.StatusOK { + return true, nil + } + + return false, nil +} From 63a5f1b179f32296afd146b9155e3734c828c3f9 Mon Sep 17 00:00:00 2001 From: matthias Date: Tue, 16 Dec 2025 20:13:33 +0100 Subject: [PATCH 2/2] add patroni leader check to update() and catch error for addMonitoringPermissions --- pkg/cluster/cluster.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index e531fea1d..93aaf950b 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -1169,6 +1169,13 @@ func (c *Cluster) Update(oldSpec, newSpec *cpov1.Postgresql) error { }() + // Ensure the cluster has a leader + if err := c.waitForLeader(60 * time.Second); err != nil { + c.logger.Infof("Postgres not yet ready for writes (Patroni leader election pending?). 
Skipping DB sync until next loop: %v", err) + updateFailed = true + return nil + } + // Roles and Databases if !userInitFailed && !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0 || c.Spec.StandbyCluster != nil || c.restoreInProgress()) { c.logger.Debugf("syncing roles") @@ -1205,12 +1212,16 @@ func (c *Cluster) Update(oldSpec, newSpec *cpov1.Postgresql) error { // Check if we need to call addMonitoringPermissions-func if c.Spec.Monitoring != nil && newSpec.Spec.Monitoring != nil && oldSpec.Spec.Monitoring == nil { - c.addMonitoringPermissions() + if err := c.addMonitoringPermissions(); err != nil { + c.logger.Errorf("could not add monitoring permissions: %v", err) + updateFailed = true + } } // Check if Monitoring-Secret needs to be removed if newSpec.Spec.Monitoring == nil && oldSpec.Spec.Monitoring != nil { if err := c.deleteMonitoringSecret(); err != nil { - return fmt.Errorf("could not remove the Monitoring secret: %v", err) + c.logger.Errorf("could not remove the Monitoring secret: %v", err) + updateFailed = true } }