diff --git a/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go b/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go index bf7a3934..b6a90ebc 100644 --- a/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go +++ b/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go @@ -889,6 +889,9 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{ "standby_port": { Type: "string", }, + "standby_primary_slot_name": { + Type: "string", + }, }, OneOf: []apiextv1.JSONSchemaProps{ apiextv1.JSONSchemaProps{Required: []string{"s3_wal_path"}}, @@ -1465,6 +1468,9 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{ "image": { Type: "string", }, + "customQueries": { + Type: "string", + }, }, }, }, diff --git a/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go b/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go index 51387778..4a4c2549 100644 --- a/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go +++ b/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go @@ -186,10 +186,11 @@ type Patroni struct { // StandbyDescription contains remote primary config or s3/gs wal path type StandbyDescription struct { - S3WalPath string `json:"s3_wal_path,omitempty"` - GSWalPath string `json:"gs_wal_path,omitempty"` - StandbyHost string `json:"standby_host,omitempty"` - StandbyPort string `json:"standby_port,omitempty"` + S3WalPath string `json:"s3_wal_path,omitempty"` + GSWalPath string `json:"gs_wal_path,omitempty"` + StandbyHost string `json:"standby_host,omitempty"` + StandbyPort string `json:"standby_port,omitempty"` + StandbyPrimarySlotName string `json:"standby_primary_slot_name,omitempty"` } // TLSDescription specs TLS properties @@ -323,7 +324,8 @@ type TDE struct { // Monitoring Sidecar defines a container to be run in the same pod as the Postgres container. type Monitoring struct { - Image string `json:"image,omitempty"` + Image string `json:"image,omitempty"` + CustomQueries string `json:"customQueries,omitempty"` } // Multisite enables cross Kubernetes replication coordinated via etcd diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 1ee54d75..27bb96d0 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -1123,6 +1123,13 @@ func (c *Cluster) Update(oldSpec, newSpec *cpov1.Postgresql) error { } }() + // add or remove standby_cluster section from Patroni config depending on changes in standby section + if !reflect.DeepEqual(oldSpec.Spec.StandbyCluster, newSpec.Spec.StandbyCluster) { + if err := c.syncStandbyClusterConfiguration(); err != nil { + return fmt.Errorf("could not set StandbyCluster configuration options: %v", err) + } + } + // pod disruption budget if oldSpec.Spec.NumberOfInstances != newSpec.Spec.NumberOfInstances { c.logger.Debug("syncing pod disruption budgets") diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index cccc7ee3..82a2bc15 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -1623,6 +1623,12 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu additionalVolumes = append(additionalVolumes, c.generatePgbackrestCloneConfigVolumes(spec.Clone)...) } + if c.Spec.Monitoring != nil && c.Spec.Monitoring.CustomQueries != "" { + if queryVol := c.generateCustomQueriesVolume(c.Spec.Monitoring.CustomQueries); queryVol != nil { + additionalVolumes = append(additionalVolumes, *queryVol) + } + } + // generate pod template for the statefulset, based on the spilo container and sidecars podTemplate, err = c.generatePodTemplate( c.Namespace, @@ -2146,6 +2152,24 @@ func (c *Cluster) generateCertSecretVolume() cpov1.AdditionalVolume { } } +func (c *Cluster) generateCustomQueriesVolume(customQueryConfigMap string) *cpov1.AdditionalVolume { + defaultMode := int32(0640) + return &cpov1.AdditionalVolume{ + Name: "custom-queries-vol", + MountPath: "/postgres_exporter/custom_queries", + SubPath: "", + TargetContainers: []string{"postgres-exporter"}, + VolumeSource: v1.VolumeSource{ + ConfigMap: &v1.ConfigMapVolumeSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: customQueryConfigMap, + }, + DefaultMode: &defaultMode, + }, + }, + } +} + func (c *Cluster) generatePodAnnotations(spec *cpov1.PostgresSpec) map[string]string { annotations := make(map[string]string) for k, v := range c.OpConfig.CustomPodAnnotations { diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 7ebcc8b8..f1a6a280 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -238,6 +238,13 @@ func (c *Cluster) Sync(newSpec *cpov1.Postgresql) error { } } + // add or remove standby_cluster section from Patroni config depending on changes in standby section + if !reflect.DeepEqual(oldSpec.Spec.StandbyCluster, newSpec.Spec.StandbyCluster) { + if err := c.syncStandbyClusterConfiguration(); err != nil { + return fmt.Errorf("could not sync StandbyCluster configuration: %v", err) + } + } + c.logger.Debug("syncing pod disruption budgets") if err = c.syncPodDisruptionBudget(false); err != nil { err = fmt.Errorf("could not sync pod disruption budget: %v", err) @@ -973,6 +980,63 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv return configPatched, requiresMasterRestart, nil } +// syncStandbyClusterConfiguration checks whether standby cluster +// parameters have changed and if necessary sets it via the Patroni API +func (c *Cluster) syncStandbyClusterConfiguration() error { + var ( + err error + pods []v1.Pod + ) + + standbyOptionsToSet := make(map[string]interface{}) + if c.Spec.StandbyCluster != nil { + c.logger.Infof("turning %q into a standby cluster", c.Name) + standbyOptionsToSet["create_replica_methods"] = []string{"bootstrap_standby_with_wale", "basebackup_fast_xlog"} + standbyOptionsToSet["restore_command"] = "envdir \"/run/etc/wal-e.d/env-standby\" /scripts/restore_command.sh \"%f\" \"%p\"" + + if c.Spec.StandbyCluster.StandbyHost != "" { + standbyOptionsToSet["host"] = c.Spec.StandbyCluster.StandbyHost + } else { + standbyOptionsToSet["host"] = nil + } + + if c.Spec.StandbyCluster.StandbyPort != "" { + standbyOptionsToSet["port"] = c.Spec.StandbyCluster.StandbyPort + } else { + standbyOptionsToSet["port"] = nil + } + + if c.Spec.StandbyCluster.StandbyPrimarySlotName != "" { + standbyOptionsToSet["primary_slot_name"] = c.Spec.StandbyCluster.StandbyPrimarySlotName + } else { + standbyOptionsToSet["primary_slot_name"] = nil + } + } else { + c.logger.Infof("promoting standby cluster and detach from source") + standbyOptionsToSet = nil + } + + if pods, err = c.listPods(); err != nil { + return err + } + if len(pods) == 0 { + return fmt.Errorf("could not call Patroni API: cluster has no pods") + } + // try all pods until the first one that is successful, as it doesn't matter which pod + // carries the request to change configuration through + for _, pod := range pods { + podName := util.NameFromMeta(pod.ObjectMeta) + c.logger.Infof("patching Postgres config via Patroni API on pod %s with following options: %s", + podName, standbyOptionsToSet) + if err = c.patroni.SetStandbyClusterParameters(&pod, standbyOptionsToSet); err == nil { + return nil + } + c.logger.Warningf("could not patch postgres parameters within pod %s: %v", podName, err) + } + return fmt.Errorf("could not reach Patroni API to set Postgres options: failed on every pod (%d total)", + len(pods)) +} + func (c *Cluster) syncSecrets() error { c.logger.Info("syncing secrets") c.setProcessName("syncing secrets")