Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/apis/cpo.opensource.cybertec.at/v1/crds.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,9 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{
"standby_port": {
Type: "string",
},
"standby_primary_slot_name": {
Type: "string",
},
},
OneOf: []apiextv1.JSONSchemaProps{
apiextv1.JSONSchemaProps{Required: []string{"s3_wal_path"}},
Expand Down Expand Up @@ -1465,6 +1468,9 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{
"image": {
Type: "string",
},
"customQueries": {
Type: "string",
},
},
},
},
Expand Down
12 changes: 7 additions & 5 deletions pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,11 @@ type Patroni struct {

// StandbyDescription contains remote primary config or s3/gs wal path
type StandbyDescription struct {
S3WalPath string `json:"s3_wal_path,omitempty"`
GSWalPath string `json:"gs_wal_path,omitempty"`
StandbyHost string `json:"standby_host,omitempty"`
StandbyPort string `json:"standby_port,omitempty"`
S3WalPath string `json:"s3_wal_path,omitempty"`
GSWalPath string `json:"gs_wal_path,omitempty"`
StandbyHost string `json:"standby_host,omitempty"`
StandbyPort string `json:"standby_port,omitempty"`
StandbyPrimarySlotName string `json:"standby_primary_slot_name,omitempty"`
}

// TLSDescription specs TLS properties
Expand Down Expand Up @@ -323,7 +324,8 @@ type TDE struct {

// Monitoring Sidecar defines a container to be run in the same pod as the Postgres container.
type Monitoring struct {
Image string `json:"image,omitempty"`
Image string `json:"image,omitempty"`
CustomQueries string `json:"customQueries,omitempty"`
}

// Multisite enables cross Kubernetes replication coordinated via etcd
Expand Down
7 changes: 7 additions & 0 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1123,6 +1123,13 @@ func (c *Cluster) Update(oldSpec, newSpec *cpov1.Postgresql) error {
}
}()

// add or remove standby_cluster section from Patroni config depending on changes in standby section
if !reflect.DeepEqual(oldSpec.Spec.StandbyCluster, newSpec.Spec.StandbyCluster) {
if err := c.syncStandbyClusterConfiguration(); err != nil {
return fmt.Errorf("could not set StandbyCluster configuration options: %v", err)
}
}

// pod disruption budget
if oldSpec.Spec.NumberOfInstances != newSpec.Spec.NumberOfInstances {
c.logger.Debug("syncing pod disruption budgets")
Expand Down
24 changes: 24 additions & 0 deletions pkg/cluster/k8sres.go
Original file line number Diff line number Diff line change
Expand Up @@ -1623,6 +1623,12 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu
additionalVolumes = append(additionalVolumes, c.generatePgbackrestCloneConfigVolumes(spec.Clone)...)
}

if c.Spec.Monitoring != nil && c.Spec.Monitoring.CustomQueries != "" {
if queryVol := c.generateCustomQueriesVolume(c.Spec.Monitoring.CustomQueries); queryVol != nil {
additionalVolumes = append(additionalVolumes, *queryVol)
}
}

// generate pod template for the statefulset, based on the spilo container and sidecars
podTemplate, err = c.generatePodTemplate(
c.Namespace,
Expand Down Expand Up @@ -2146,6 +2152,24 @@ func (c *Cluster) generateCertSecretVolume() cpov1.AdditionalVolume {
}
}

func (c *Cluster) generateCustomQueriesVolume(customQueryConfigMap string) *cpov1.AdditionalVolume {
defaultMode := int32(0640)
return &cpov1.AdditionalVolume{
Name: "custom-queries-vol",
MountPath: "/postgres_exporter/custom_queries",
SubPath: "",
TargetContainers: []string{"postgres-exporter"},
VolumeSource: v1.VolumeSource{
ConfigMap: &v1.ConfigMapVolumeSource{
LocalObjectReference: v1.LocalObjectReference{
Name: customQueryConfigMap,
},
DefaultMode: &defaultMode,
},
},
}
}

func (c *Cluster) generatePodAnnotations(spec *cpov1.PostgresSpec) map[string]string {
annotations := make(map[string]string)
for k, v := range c.OpConfig.CustomPodAnnotations {
Expand Down
64 changes: 64 additions & 0 deletions pkg/cluster/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,13 @@
}
}

// add or remove standby_cluster section from Patroni config depending on changes in standby section
if !reflect.DeepEqual(oldSpec.Spec.StandbyCluster, newSpec.Spec.StandbyCluster) {
if err := c.syncStandbyClusterConfiguration(); err != nil {
return fmt.Errorf("could not sync StandbyCluster configuration: %v", err)
}
}

c.logger.Debug("syncing pod disruption budgets")
if err = c.syncPodDisruptionBudget(false); err != nil {
err = fmt.Errorf("could not sync pod disruption budget: %v", err)
Expand Down Expand Up @@ -973,6 +980,63 @@
return configPatched, requiresMasterRestart, nil
}

// syncStandbyClusterConfiguration checks whether standby cluster
// parameters have changed and if necessary sets it via the Patroni API
func (c *Cluster) syncStandbyClusterConfiguration() error {
var (
err error
pods []v1.Pod
)

standbyOptionsToSet := make(map[string]interface{})
if c.Spec.StandbyCluster != nil {
c.logger.Infof("turning %q into a standby cluster", c.Name)
standbyOptionsToSet["create_replica_methods"] = []string{"bootstrap_standby_with_wale", "basebackup_fast_xlog"}
standbyOptionsToSet["restore_command"] = "envdir \"/run/etc/wal-e.d/env-standby\" /scripts/restore_command.sh \"%f\" \"%p\""

if c.Spec.StandbyCluster.StandbyHost != "" {
standbyOptionsToSet["host"] = c.Spec.StandbyCluster.StandbyHost
} else {
standbyOptionsToSet["host"] = nil
}

if c.Spec.StandbyCluster.StandbyPort != "" {
standbyOptionsToSet["port"] = c.Spec.StandbyCluster.StandbyPort
} else {
standbyOptionsToSet["port"] = nil
}

if c.Spec.StandbyCluster.StandbyPrimarySlotName != "" {
standbyOptionsToSet["primary_slot_name"] = c.Spec.StandbyCluster.StandbyPrimarySlotName
} else {
standbyOptionsToSet["primary_slot_name"] = nil
}
} else {
c.logger.Infof("promoting standby cluster and detach from source")
standbyOptionsToSet = nil
}

if pods, err = c.listPods(); err != nil {
return err
}
if len(pods) == 0 {
return fmt.Errorf("could not call Patroni API: cluster has no pods")
}
// try all pods until the first one that is successful, as it doesn't matter which pod
// carries the request to change configuration through
for _, pod := range pods {
podName := util.NameFromMeta(pod.ObjectMeta)
c.logger.Infof("patching Postgres config via Patroni API on pod %s with following options: %s",
podName, standbyOptionsToSet)
if err = c.patroni.SetStandbyClusterParameters(&pod, standbyOptionsToSet); err == nil {

Check failure on line 1031 in pkg/cluster/sync.go

View workflow job for this annotation

GitHub Actions / Unit tests and coverage

c.patroni.SetStandbyClusterParameters undefined (type patroni.Interface has no field or method SetStandbyClusterParameters)

Check failure on line 1031 in pkg/cluster/sync.go

View workflow job for this annotation

GitHub Actions / End-2-End tests

c.patroni.SetStandbyClusterParameters undefined (type patroni.Interface has no field or method SetStandbyClusterParameters)

Check failure on line 1031 in pkg/cluster/sync.go

View workflow job for this annotation

GitHub Actions / Unit tests and coverage

c.patroni.SetStandbyClusterParameters undefined (type patroni.Interface has no field or method SetStandbyClusterParameters)

Check failure on line 1031 in pkg/cluster/sync.go

View workflow job for this annotation

GitHub Actions / End-2-End tests

c.patroni.SetStandbyClusterParameters undefined (type patroni.Interface has no field or method SetStandbyClusterParameters)
return nil
}
c.logger.Warningf("could not patch postgres parameters within pod %s: %v", podName, err)
}
return fmt.Errorf("could not reach Patroni API to set Postgres options: failed on every pod (%d total)",
len(pods))
}

func (c *Cluster) syncSecrets() error {
c.logger.Info("syncing secrets")
c.setProcessName("syncing secrets")
Expand Down
Loading