Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/apis/cpo.opensource.cybertec.at/v1/crds.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,9 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{
"standby_port": {
Type: "string",
},
"standby_primary_slot_name": {
Type: "string",
},
},
OneOf: []apiextv1.JSONSchemaProps{
apiextv1.JSONSchemaProps{Required: []string{"s3_wal_path"}},
Expand Down Expand Up @@ -1465,6 +1468,9 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{
"image": {
Type: "string",
},
"customQueries": {
Type: "string",
},
},
},
},
Expand Down
12 changes: 7 additions & 5 deletions pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,11 @@ type Patroni struct {

// StandbyDescription contains remote primary config or s3/gs wal path
type StandbyDescription struct {
S3WalPath string `json:"s3_wal_path,omitempty"`
GSWalPath string `json:"gs_wal_path,omitempty"`
StandbyHost string `json:"standby_host,omitempty"`
StandbyPort string `json:"standby_port,omitempty"`
S3WalPath string `json:"s3_wal_path,omitempty"`
GSWalPath string `json:"gs_wal_path,omitempty"`
StandbyHost string `json:"standby_host,omitempty"`
StandbyPort string `json:"standby_port,omitempty"`
StandbyPrimarySlotName string `json:"standby_primary_slot_name,omitempty"`
}

// TLSDescription specs TLS properties
Expand Down Expand Up @@ -323,7 +324,8 @@ type TDE struct {

// Monitoring Sidecar defines a container to be run in the same pod as the Postgres container.
type Monitoring struct {
Image string `json:"image,omitempty"`
Image string `json:"image,omitempty"`
CustomQueries string `json:"customQueries,omitempty"`
}

// Multisite enables cross Kubernetes replication coordinated via etcd
Expand Down
7 changes: 7 additions & 0 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1123,6 +1123,13 @@ func (c *Cluster) Update(oldSpec, newSpec *cpov1.Postgresql) error {
}
}()

// add or remove standby_cluster section from Patroni config depending on changes in standby section
if !reflect.DeepEqual(oldSpec.Spec.StandbyCluster, newSpec.Spec.StandbyCluster) {
if err := c.syncStandbyClusterConfiguration(); err != nil {
return fmt.Errorf("could not set StandbyCluster configuration options: %v", err)
}
}

// pod disruption budget
if oldSpec.Spec.NumberOfInstances != newSpec.Spec.NumberOfInstances {
c.logger.Debug("syncing pod disruption budgets")
Expand Down
24 changes: 24 additions & 0 deletions pkg/cluster/k8sres.go
Original file line number Diff line number Diff line change
Expand Up @@ -1623,6 +1623,12 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu
additionalVolumes = append(additionalVolumes, c.generatePgbackrestCloneConfigVolumes(spec.Clone)...)
}

if c.Spec.Monitoring != nil && c.Spec.Monitoring.CustomQueries != "" {
if queryVol := c.generateCustomQueriesVolume(c.Spec.Monitoring.CustomQueries); queryVol != nil {
additionalVolumes = append(additionalVolumes, *queryVol)
}
}

// generate pod template for the statefulset, based on the spilo container and sidecars
podTemplate, err = c.generatePodTemplate(
c.Namespace,
Expand Down Expand Up @@ -2146,6 +2152,24 @@ func (c *Cluster) generateCertSecretVolume() cpov1.AdditionalVolume {
}
}

func (c *Cluster) generateCustomQueriesVolume(customQueryConfigMap string) *cpov1.AdditionalVolume {
defaultMode := int32(0640)
return &cpov1.AdditionalVolume{
Name: "custom-queries-vol",
MountPath: "/postgres_exporter/custom_queries",
SubPath: "",
TargetContainers: []string{"postgres-exporter"},
VolumeSource: v1.VolumeSource{
ConfigMap: &v1.ConfigMapVolumeSource{
LocalObjectReference: v1.LocalObjectReference{
Name: customQueryConfigMap,
},
DefaultMode: &defaultMode,
},
},
}
}

func (c *Cluster) generatePodAnnotations(spec *cpov1.PostgresSpec) map[string]string {
annotations := make(map[string]string)
for k, v := range c.OpConfig.CustomPodAnnotations {
Expand Down
64 changes: 64 additions & 0 deletions pkg/cluster/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,13 @@ func (c *Cluster) Sync(newSpec *cpov1.Postgresql) error {
}
}

// add or remove standby_cluster section from Patroni config depending on changes in standby section
if !reflect.DeepEqual(oldSpec.Spec.StandbyCluster, newSpec.Spec.StandbyCluster) {
if err := c.syncStandbyClusterConfiguration(); err != nil {
return fmt.Errorf("could not sync StandbyCluster configuration: %v", err)
}
}

c.logger.Debug("syncing pod disruption budgets")
if err = c.syncPodDisruptionBudget(false); err != nil {
err = fmt.Errorf("could not sync pod disruption budget: %v", err)
Expand Down Expand Up @@ -973,6 +980,63 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv
return configPatched, requiresMasterRestart, nil
}

// syncStandbyClusterConfiguration checks whether standby cluster
// parameters have changed and if necessary sets it via the Patroni API
func (c *Cluster) syncStandbyClusterConfiguration() error {
var (
err error
pods []v1.Pod
)

standbyOptionsToSet := make(map[string]interface{})
if c.Spec.StandbyCluster != nil {
c.logger.Infof("turning %q into a standby cluster", c.Name)
standbyOptionsToSet["create_replica_methods"] = []string{"bootstrap_standby_with_wale", "basebackup_fast_xlog"}
standbyOptionsToSet["restore_command"] = "envdir \"/run/etc/wal-e.d/env-standby\" /scripts/restore_command.sh \"%f\" \"%p\""

if c.Spec.StandbyCluster.StandbyHost != "" {
standbyOptionsToSet["host"] = c.Spec.StandbyCluster.StandbyHost
} else {
standbyOptionsToSet["host"] = nil
}

if c.Spec.StandbyCluster.StandbyPort != "" {
standbyOptionsToSet["port"] = c.Spec.StandbyCluster.StandbyPort
} else {
standbyOptionsToSet["port"] = nil
}

if c.Spec.StandbyCluster.StandbyPrimarySlotName != "" {
standbyOptionsToSet["primary_slot_name"] = c.Spec.StandbyCluster.StandbyPrimarySlotName
} else {
standbyOptionsToSet["primary_slot_name"] = nil
}
} else {
c.logger.Infof("promoting standby cluster and detach from source")
standbyOptionsToSet = nil
}

if pods, err = c.listPods(); err != nil {
return err
}
if len(pods) == 0 {
return fmt.Errorf("could not call Patroni API: cluster has no pods")
}
// try all pods until the first one that is successful, as it doesn't matter which pod
// carries the request to change configuration through
for _, pod := range pods {
podName := util.NameFromMeta(pod.ObjectMeta)
c.logger.Infof("patching Postgres config via Patroni API on pod %s with following options: %s",
podName, standbyOptionsToSet)
if err = c.patroni.SetStandbyClusterParameters(&pod, standbyOptionsToSet); err == nil {
return nil
}
c.logger.Warningf("could not patch postgres parameters within pod %s: %v", podName, err)
}
return fmt.Errorf("could not reach Patroni API to set Postgres options: failed on every pod (%d total)",
len(pods))
}

func (c *Cluster) syncSecrets() error {
c.logger.Info("syncing secrets")
c.setProcessName("syncing secrets")
Expand Down
6 changes: 6 additions & 0 deletions pkg/util/patroni/patroni.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Interface interface {
GetClusterMembers(master *v1.Pod) ([]ClusterMember, error)
Switchover(master *v1.Pod, candidate string) error
SetPostgresParameters(server *v1.Pod, options map[string]string) error
SetStandbyClusterParameters(server *v1.Pod, options map[string]interface{}) error
GetMemberData(server *v1.Pod) (MemberData, error)
Restart(server *v1.Pod) error
GetConfig(server *v1.Pod) (cpov1.Patroni, map[string]string, error)
Expand Down Expand Up @@ -166,6 +167,11 @@ func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]st
return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf)
}

// SetStandbyClusterParameters sets StandbyCluster options via Patroni patch API call.
func (p *Patroni) SetStandbyClusterParameters(server *v1.Pod, parameters map[string]interface{}) error {
return p.SetConfig(server, map[string]interface{}{"standby_cluster": parameters})
}

// SetConfig sets Patroni options via Patroni patch API call.
func (p *Patroni) SetConfig(server *v1.Pod, config map[string]interface{}) error {
buf := &bytes.Buffer{}
Expand Down
Loading