Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 49 additions & 2 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1169,6 +1169,13 @@ func (c *Cluster) Update(oldSpec, newSpec *cpov1.Postgresql) error {

}()

// Ensure the cluster has a leader
if err := c.waitForLeader(60 * time.Second); err != nil {
c.logger.Infof("Postgres not yet ready for writes (Patroni leader election pending?). Skipping DB sync until next loop: %v", err)
updateFailed = true
return nil
}

// Roles and Databases
if !userInitFailed && !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0 || c.Spec.StandbyCluster != nil || c.restoreInProgress()) {
c.logger.Debugf("syncing roles")
Expand Down Expand Up @@ -1205,12 +1212,16 @@ func (c *Cluster) Update(oldSpec, newSpec *cpov1.Postgresql) error {

// Check if we need to call addMonitoringPermissions-func
if c.Spec.Monitoring != nil && newSpec.Spec.Monitoring != nil && oldSpec.Spec.Monitoring == nil {
c.addMonitoringPermissions()
if err := c.addMonitoringPermissions(); err != nil {
c.logger.Errorf("could not add monitoring permissions: %v", err)
updateFailed = true
}
}
// Check if Monitoring-Secret needs to be removed
if newSpec.Spec.Monitoring == nil && oldSpec.Spec.Monitoring != nil {
if err := c.deleteMonitoringSecret(); err != nil {
return fmt.Errorf("could not remove the Monitoring secret: %v", err)
c.logger.Errorf("could not remove the Monitoring secret: %v", err)
updateFailed = true
}
}

Expand Down Expand Up @@ -1272,6 +1283,42 @@ func syncResources(a, b *v1.ResourceRequirements) bool {
return false
}

// waitForLeader polls the Patroni API of the cluster's Postgres pods until one
// of them reports itself as leader, or until the given timeout expires.
//
// Errors while listing pods or while querying individual members are treated
// as transient and retried on the next tick; only exhausting the timeout
// yields an error. The leader is checked immediately on entry so that the
// common case (leader already elected) does not pay the first tick's delay.
func (c *Cluster) waitForLeader(timeout time.Duration) error {
	c.logger.Debugf("waiting up to %v for Patroni leader...", timeout)

	// checkLeader performs one pass over the Postgres pods and reports
	// whether any of them currently holds the Patroni leader role.
	checkLeader := func() bool {
		pods, err := c.listPodsOfType(TYPE_POSTGRESQL)
		if err != nil {
			// Transient listing error; retry on the next tick.
			return false
		}
		for _, pod := range pods {
			isLeader, err := c.patroni.IsLeader(&pod)
			if err != nil {
				c.logger.Debugf("check leader failed for %s: %v", pod.Name, err)
				continue
			}
			if isLeader {
				c.logger.Infof("Patroni leader found: %s. Proceeding with DB sync.", pod.Name)
				return true
			}
		}
		return false
	}

	if checkLeader() {
		return nil
	}

	// Use a dedicated deadline timer instead of comparing time.Now() against a
	// deadline on each tick: the timeout fires exactly once and cannot
	// overshoot by up to a full tick period.
	deadline := time.NewTimer(timeout)
	defer deadline.Stop()
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-deadline.C:
			return fmt.Errorf("timeout waiting for Patroni leader")
		case <-ticker.C:
			if checkLeader() {
				return nil
			}
		}
	}
}

// Delete deletes the cluster and cleans up all objects associated with it (including statefulsets).
// The deletion order here is somewhat significant, because Patroni, when running with the Kubernetes
// DCS, reuses the master's endpoint to store the leader related metadata. If we remove the endpoint
Expand Down
26 changes: 25 additions & 1 deletion pkg/cluster/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,22 +261,46 @@ func (c *Cluster) Sync(newSpec *cpov1.Postgresql) error {
return err
}

// Check if the cluster has a leader
needDBAccess := !(c.databaseAccessDisabled() || c.getNumberOfInstances(&newSpec.Spec) <= 0 || c.Spec.StandbyCluster != nil || c.restoreInProgress())
// create database objects unless we are running without pods or disabled that feature explicitly
if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&newSpec.Spec) <= 0 || c.Spec.StandbyCluster != nil || c.restoreInProgress()) {
if needDBAccess {
if err := c.waitForLeader(60 * time.Second); err != nil {
c.logger.Infof("Postgres not yet ready for writes (Patroni leader election pending?). Skipping DB sync until next loop: %v", err)
return nil
}

c.logger.Debug("syncing roles")
if err = c.syncRoles(); err != nil {
c.logger.Errorf("could not sync roles: %v", err)
}

c.logger.Debug("syncing databases")
if err = c.syncDatabases(); err != nil {
c.logger.Errorf("could not sync databases: %v", err)
}

c.logger.Debug("syncing prepared databases with schemas")
if err = c.syncPreparedDatabases(); err != nil {
c.logger.Errorf("could not sync prepared database: %v", err)
}
}

// if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&newSpec.Spec) <= 0 || c.Spec.StandbyCluster != nil || c.restoreInProgress()) {
// c.logger.Debug("syncing roles")
// if err = c.syncRoles(); err != nil {
// c.logger.Errorf("could not sync roles: %v", err)
// }
// c.logger.Debug("syncing databases")
// if err = c.syncDatabases(); err != nil {
// c.logger.Errorf("could not sync databases: %v", err)
// }
// c.logger.Debug("syncing prepared databases with schemas")
// if err = c.syncPreparedDatabases(); err != nil {
// c.logger.Errorf("could not sync prepared database: %v", err)
// }
// }

// sync connection pooler
if _, err = c.syncConnectionPooler(&oldSpec, newSpec, c.installLookupFunction); err != nil {
return fmt.Errorf("could not sync connection pooler: %v", err)
Expand Down
29 changes: 26 additions & 3 deletions pkg/util/patroni/patroni.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"github.com/cybertec-postgresql/cybertec-pg-operator/pkg/util/constants"
httpclient "github.com/cybertec-postgresql/cybertec-pg-operator/pkg/util/httpclient"

"github.com/sirupsen/logrus"
cpov1 "github.com/cybertec-postgresql/cybertec-pg-operator/pkg/apis/cpo.opensource.cybertec.at/v1"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
)

Expand All @@ -25,6 +25,7 @@ const (
clusterPath = "/cluster"
statusPath = "/patroni"
restartPath = "/restart"
leaderPath = "/leader"
ApiPort = 8008
timeout = 30 * time.Second
)
Expand All @@ -38,6 +39,7 @@ type Interface interface {
Restart(server *v1.Pod) error
GetConfig(server *v1.Pod) (cpov1.Patroni, map[string]string, error)
SetConfig(server *v1.Pod, config map[string]interface{}) error
IsLeader(server *v1.Pod) (bool, error)
}

// Patroni API client
Expand Down Expand Up @@ -150,7 +152,7 @@ func (p *Patroni) Switchover(master *v1.Pod, candidate string) error {

//TODO: add an option call /patroni to check if it is necessary to restart the server

//SetPostgresParameters sets Postgres options via Patroni patch API call.
// SetPostgresParameters sets Postgres options via Patroni patch API call.
func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]string) error {
buf := &bytes.Buffer{}
err := json.NewEncoder(buf).Encode(map[string]map[string]interface{}{"postgresql": {"parameters": parameters}})
Expand All @@ -164,7 +166,7 @@ func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]st
return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf)
}

//SetConfig sets Patroni options via Patroni patch API call.
// SetConfig sets Patroni options via Patroni patch API call.
func (p *Patroni) SetConfig(server *v1.Pod, config map[string]interface{}) error {
buf := &bytes.Buffer{}
err := json.NewEncoder(buf).Encode(config)
Expand Down Expand Up @@ -320,3 +322,24 @@ func (p *Patroni) GetMemberData(server *v1.Pod) (MemberData, error) {

return data, nil
}

// IsLeader queries the Patroni /leader endpoint of the given pod and reports
// whether that member currently holds the leader role. Patroni answers
// 200 when the member is the leader and 503 when it is not (e.g. a replica).
// Any other status code is reported as an error rather than being silently
// interpreted as "not leader", so callers can distinguish a healthy replica
// from a broken or unreachable API.
func (p *Patroni) IsLeader(server *v1.Pod) (bool, error) {
	apiURLString, err := apiURL(server)
	if err != nil {
		return false, err
	}

	resp, err := p.httpClient.Get(apiURLString + leaderPath)
	if err != nil {
		return false, fmt.Errorf("request failed: %v", err)
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusOK: // 200: this member is the leader
		return true, nil
	case http.StatusServiceUnavailable: // 503: member responds but is not the leader
		return false, nil
	default:
		// 404, 500, ... — the endpoint did not answer the leader question.
		return false, fmt.Errorf("unexpected status code %d from %s", resp.StatusCode, leaderPath)
	}
}
Loading