diff --git a/charts/postgres-operator/crds/postgresqls.yaml b/charts/postgres-operator/crds/postgresqls.yaml index 496276038..4dd54c16b 100644 --- a/charts/postgres-operator/crds/postgresqls.yaml +++ b/charts/postgres-operator/crds/postgresqls.yaml @@ -206,6 +206,17 @@ spec: type: string user: type: string + env: + type: array + nullable: true + items: + type: object + x-kubernetes-preserve-unknown-fields: true + labels: + type: object + nullable: true + additionalProperties: + type: string databases: type: object additionalProperties: @@ -375,6 +386,18 @@ spec: type: object additionalProperties: type: string + monitor: + nullable: true + properties: + image: + type: string + env: + type: array + nullable: true + items: + type: object + x-kubernetes-preserve-unknown-fields: true + type: object nodeAffinity: type: object properties: @@ -553,6 +576,17 @@ spec: type: object additionalProperties: type: string + env: + type: array + nullable: true + items: + type: object + x-kubernetes-preserve-unknown-fields: true + labels: + type: object + nullable: true + additionalProperties: + type: string preparedDatabases: type: object additionalProperties: @@ -976,6 +1010,17 @@ spec: memory: type: string pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + env: + type: array + nullable: true + items: + type: object + x-kubernetes-preserve-unknown-fields: true + labels: + type: object + nullable: true + additionalProperties: + type: string required: - image - repos diff --git a/charts/postgres-operator/templates/clusterrole.yaml b/charts/postgres-operator/templates/clusterrole.yaml index 4df38fa16..8ed777c3b 100644 --- a/charts/postgres-operator/templates/clusterrole.yaml +++ b/charts/postgres-operator/templates/clusterrole.yaml @@ -196,6 +196,7 @@ rules: - get - list - patch + - update # to CRUD cron jobs for logical backups - apiGroups: - batch diff --git a/docs/hugo/content/en/connection_pooler/_index.md b/docs/hugo/content/en/connection_pooler/_index.md index 
f62488b20..3a5f76001 100644 --- a/docs/hugo/content/en/connection_pooler/_index.md +++ b/docs/hugo/content/en/connection_pooler/_index.md @@ -31,6 +31,7 @@ CPO relies on pgBouncer, a popular and above all lightweight open source tool. p - connection_poole.max_db_connections - How many connections the pooler can max hold. This value is divided among the pooler pods. Default is 60 which will make up 30 connections per pod for the default setup with two instances. - connection_pooler.mode - Defines pooler mode. Available Value: `session`, `transaction` or `statement`. Default is `transaction`. - connection_pooler.resources - Hardware definition for the pooler pods +- env: Allows you to add custom environment variables - enableConnectionPooler - Defines whether poolers for read/write access should be created based on the spec.connectionPooler definition. - enableReplicaConnectionPooler- Defines whether poolers for read-only access should be created based on the spec.connectionPooler definition. @@ -38,6 +39,11 @@ CPO relies on pgBouncer, a popular and above all lightweight open source tool. p ``` spec: connectionPooler: + env: + - name: POOLER_ENV + value: 'custom value' + labels: + custom_pooler_label: 'custom value' mode: transaction numberOfInstances: 2 resources: diff --git a/docs/hugo/content/en/crd/crd-postgresql.md b/docs/hugo/content/en/crd/crd-postgresql.md index 92325e82a..895d36000 100644 --- a/docs/hugo/content/en/crd/crd-postgresql.md +++ b/docs/hugo/content/en/crd/crd-postgresql.md @@ -35,8 +35,9 @@ weight: 331 | enableMasterPoolerLoadBalancer | boolean | false | Define whether to enable the load balancer pointing to the primary ConnectionPooler | | enableReplicaPoolerLoadBalancer| boolean | false | Define whether to enable the load balancer pointing to the Replica-ConnectionPooler | | enableShmVolume | boolean | false | Start a database pod without limitations on shm memory. By default Docker limit /dev/shm to 64M (see e.g. 
the docker issue, which could be not enough if PostgreSQL uses parallel workers heavily. If this option is present and value is true, to the target database pod will be mounted a new tmpfs volume to remove this limitation. | -| [env](#env) | array | false | Allows to add own Envs to the PostgreSQL containers | +| [env](#env) | array | false | Allows you to add custom environment variables to all cluster containers | | [initContainers](#initcontainers) | array | false | Enables the definition of init-containers | +| [labels](#labels) | object | false | Allows you to add custom labels to all cluster pods | | logicalBackupSchedule | string | false | Enables the scheduling of logical backups based on cron-syntax. Example: `30 00 * * *` | | maintenanceWindows | array | false | Enables the definition of maintenance windows for the cluster. Example: `Sat:00:00-04:00` | | masterServiceAnnotations | map | false | Enables the definition of annotations for the Primary Service | @@ -113,6 +114,9 @@ key, operator, value, effect and tolerationSeconds | | Name | Type | required | Description | | ------------------------------ |:-------:| ---------:| ------------------:| +| [env](#env) | array | false | Allows you to add custom environment variables to connection-pooler containers | +| [labels](#labels) | object | false | Allows you to add custom labels to connection-pooler pods | +| dockerImage | string | true | Defines the used pgbouncer container image for this cluster | | numberOfInstances | int | true | Number of Pods per Pooler | | mode | string | true | pooling mode for pgBouncer (session, transaction, statement) | | schema | string | true | Schema for Pooler (Default: pooler) | @@ -149,10 +153,22 @@ key, operator, value, effect and tolerationSeconds | --- +#### labels + +| Name | Type | required | Description | +| ------------------------------ |:-------:| ---------:| ------------------:| +| | string | true | Namefield for the label | +| | string | true | Value for the 
label | + +{{< back >}} + +--- + #### monitor | Name | Type | required | Description | | ------------------------------ |:-------:| ---------:| ------------------:| +| [env](#env) | array | false | Allows you to add custom environment variables to all expoerter-sidecar containers | | image | string | true | Docker-Image for the metric exporter | {{< back >}} @@ -184,6 +200,8 @@ key, operator, value, effect and tolerationSeconds | | Name | Type | required | Description | | ------------------------------ |:-------:| ---------:| ------------------:| +| [env](#env) | array | false | Allows you to add custom environment variables to all postgresql containers | +| [labels](#labels) | object | false | Allows you to add custom labels to poostgresql pods | | parameters | map | false | PostgreSQL-Parameter as item (Example: max_connections: "100"). For help check out the [CYBERTEC PostgreSQL Configurator](https://pgconfigurator.cybertec.at) | | version | string | false | a map of key-value pairs describing initdb parameters | @@ -401,8 +419,10 @@ key, operator, value, effect and tolerationSeconds | | Name | Type | required | Description | | ------------------------------ |:-------:| ---------:| ------------------:| | [configuration](#configuration)| object | false | Enables the definition of a pgbackrest-setup for the cluster | +| [env](#env) | array | false | Allows you to add custom environment variables to all pgbackrest containers | | global | object | false | | | image | string | true | | +| [labels](#labels) | object | false | Allows you to add custom labels to pgbackrest pods | | [repos](#repos) | array | true | | | [resources](#resources) | object | false | CPU & Memory (Limit & Request) definition for the pgBackRest container| diff --git a/docs/hugo/content/en/customize_cluster/env.md b/docs/hugo/content/en/customize_cluster/env.md new file mode 100644 index 000000000..3f3571f83 --- /dev/null +++ b/docs/hugo/content/en/customize_cluster/env.md @@ -0,0 +1,31 @@ +--- 
+title: "Environment variables" +date: 2023-12-28T14:26:51+01:00 +draft: false +weight: 1 +--- + +To flexibly manage containers within a cluster, the operator allows environment variables to be defined at various levels. This enables both global settings and specific configurations for individual components. +Hierarchy and Scope +The variables are defined within the Custom Resource (CR). The following logic applies for inheritance and assignment: + +| object | Scope | Description | +| :--- | :--- | :--- | +| `spec.env` | **Global** | These ENVs are inherited by **all** containers within the cluster (PostgreSQL, Backup, Monitoring, etc.). | +| `spec.postgresql.env` | **PostgreSQL** | These ENVs apply exclusively to the **PostgreSQL containers**. | +| `spec.backup.pgbackrest.env` | **pgBackRest** | These ENVs apply exclusively to the **Backup containers**. | +| `spec.monitor.env` | **Exporter-Sidecar** | These ENVs apply exclusively to the **ConnectionPooler containers**. | +| `spec.connectionPooler.env` | **ConnectionPooler** | These ENVs apply exclusively to the **Monitoring sidecars**. | + +{{< hint type=Warning >}}Updating the ENVs triggers a rolling update to the respective containers.{{< /hint >}} + + +### Configuration Logic + +The definition of variables follows the standard Kubernetes schema for key-value pairs. + +```yaml +env: + - name: ENV_NAME + value: ‘value’ +``` \ No newline at end of file diff --git a/docs/hugo/content/en/customize_cluster/labels.md b/docs/hugo/content/en/customize_cluster/labels.md new file mode 100644 index 000000000..f941b83a5 --- /dev/null +++ b/docs/hugo/content/en/customize_cluster/labels.md @@ -0,0 +1,27 @@ +--- +title: "Custom Labels" +date: 2023-12-28T14:26:51+01:00 +draft: false +weight: 2 +--- + +To manage and organise pods flexibly within a cluster, the operator allows labels to be defined at various levels. This enables both global labelling and specific metadata for individual components. 
Unlike environment variables, labels always refer to the pod as a whole, not to individual containers. + +| object | Scope | Description | +| :--- | :--- | :--- | +| `spec.labels` | **Global** | These labels are adopted by **all** pods within the cluster (**PostgreSQL**, **Backup**, **Pooler**, etc.). | +| `spec.postgresql.labels` | **PostgreSQL** | These labels apply exclusively to the PostgreSQL pods. **PostgreSQL pods**. | +| `spec.backup.pgbackrest.labels` | **pgBackRest** | These labels apply exclusively to the backup pods **pgBackRest pods**. | +| `spec.connectionPooler.labels` | **ConnectionPooler** | These labels apply exclusively to the **ConnectionPooler pods**. | + +{{< hint type=Warning >}}Updating the labels triggers a rolling update to the respective pods.{{< /hint >}} + + +### Configuration Logic + +The definition of labels follows the standard Kubernetes schema for key-value pairs. + +```yaml +labels: + custom_label: ‘value’ +``` \ No newline at end of file diff --git a/docs/hugo/content/en/customize_cluster/sidecars.md b/docs/hugo/content/en/customize_cluster/sidecars.md index babbec38b..3105ff4ce 100644 --- a/docs/hugo/content/en/customize_cluster/sidecars.md +++ b/docs/hugo/content/en/customize_cluster/sidecars.md @@ -2,7 +2,7 @@ title: "Sidecars" date: 2023-12-28T14:26:51+01:00 draft: false -weight: 1 +weight: 2 --- Starting with the Single-Node-Cluster from the previous section, we want to modify the Instance a bit to see. 
## CPU and Memory diff --git a/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go b/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go index bf7a39341..9ffcf2977 100644 --- a/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go +++ b/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go @@ -338,6 +338,26 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{ "user": { Type: "string", }, + "env": { + Type: "array", + Nullable: true, + Items: &apiextv1.JSONSchemaPropsOrArray{ + Schema: &apiextv1.JSONSchemaProps{ + Type: "object", + XPreserveUnknownFields: util.True(), + }, + }, + }, + "labels": { + Type: "object", + Nullable: true, + AdditionalProperties: &apiextv1.JSONSchemaPropsOrBool{ + Allows: true, + Schema: &apiextv1.JSONSchemaProps{ + Type: "string", + }, + }, + }, }, }, "databases": { @@ -385,6 +405,16 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{ }, }, }, + "labels": { + Type: "object", + Nullable: true, + AdditionalProperties: &apiextv1.JSONSchemaPropsOrBool{ + Allows: true, + Schema: &apiextv1.JSONSchemaProps{ + Type: "string", + }, + }, + }, "init_containers": { Type: "array", Description: "deprecated", @@ -759,6 +789,26 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{ }, }, }, + "env": { + Type: "array", + Nullable: true, + Items: &apiextv1.JSONSchemaPropsOrArray{ + Schema: &apiextv1.JSONSchemaProps{ + Type: "object", + XPreserveUnknownFields: util.True(), + }, + }, + }, + "labels": { + Type: "object", + Nullable: true, + AdditionalProperties: &apiextv1.JSONSchemaPropsOrBool{ + Allows: true, + Schema: &apiextv1.JSONSchemaProps{ + Type: "string", + }, + }, + }, }, }, "preparedDatabases": { @@ -1433,6 +1483,26 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{ }, }, }, + "env": { + Type: "array", + Nullable: true, + Items: &apiextv1.JSONSchemaPropsOrArray{ + Schema: &apiextv1.JSONSchemaProps{ + Type: "object", + XPreserveUnknownFields: util.True(), + }, + }, + }, + "labels": { + 
Type: "object", + Nullable: true, + AdditionalProperties: &apiextv1.JSONSchemaPropsOrBool{ + Allows: true, + Schema: &apiextv1.JSONSchemaProps{ + Type: "string", + }, + }, + }, }, }, }, @@ -1465,6 +1535,16 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{ "image": { Type: "string", }, + "env": { + Type: "array", + Nullable: true, + Items: &apiextv1.JSONSchemaPropsOrArray{ + Schema: &apiextv1.JSONSchemaProps{ + Type: "object", + XPreserveUnknownFields: util.True(), + }, + }, + }, }, }, }, diff --git a/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go b/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go index 51387778c..33c302030 100644 --- a/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go +++ b/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go @@ -89,13 +89,14 @@ type PostgresSpec struct { AdditionalVolumes []AdditionalVolume `json:"additionalVolumes,omitempty"` Streams []Stream `json:"streams,omitempty"` Env []v1.EnvVar `json:"env,omitempty"` + Labels map[string]string `json:"labels,omitempty" name:"labels" default:""` + Backup *Backup `json:"backup,omitempty"` + TDE *TDE `json:"tde,omitempty"` + Monitoring *Monitoring `json:"monitor,omitempty"` // deprecated json tags InitContainersOld []v1.Container `json:"init_containers,omitempty"` PodPriorityClassNameOld string `json:"pod_priority_class_name,omitempty"` - Backup *Backup `json:"backup,omitempty"` - TDE *TDE `json:"tde,omitempty"` - Monitoring *Monitoring `json:"monitor,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -154,6 +155,8 @@ type AdditionalVolume struct { type PostgresqlParam struct { PgVersion string `json:"version"` Parameters map[string]string `json:"parameters,omitempty"` + Env []v1.EnvVar `json:"env,omitempty"` + Labels map[string]string `json:"labels,omitempty" name:"labels" default:""` } // ResourceDescription describes CPU and memory resources defined for a cluster. 
@@ -246,12 +249,14 @@ type PostgresStatus struct { // makes sense to expose. E.g. pool size (min/max boundaries), max client // connections etc. type ConnectionPooler struct { - NumberOfInstances *int32 `json:"numberOfInstances,omitempty"` - Schema string `json:"schema,omitempty"` - User string `json:"user,omitempty"` - Mode string `json:"mode,omitempty"` - DockerImage string `json:"dockerImage,omitempty"` - MaxDBConnections *int32 `json:"maxDBConnections,omitempty"` + NumberOfInstances *int32 `json:"numberOfInstances,omitempty"` + Schema string `json:"schema,omitempty"` + User string `json:"user,omitempty"` + Mode string `json:"mode,omitempty"` + DockerImage string `json:"dockerImage,omitempty"` + MaxDBConnections *int32 `json:"maxDBConnections,omitempty"` + Env []v1.EnvVar `json:"env,omitempty"` + Labels map[string]string `json:"labels,omitempty" name:"labels" default:""` *Resources `json:"resources,omitempty"` } @@ -285,6 +290,8 @@ type Pgbackrest struct { Restore Restore `json:"restore"` Configuration Configuration `json:"configuration"` Resources *Resources `json:"resources,omitempty"` + Env []v1.EnvVar `json:"env,omitempty"` + Labels map[string]string `json:"labels,omitempty" name:"labels" default:""` } type PgbackrestClone struct { @@ -323,7 +330,8 @@ type TDE struct { // Monitoring Sidecar defines a container to be run in the same pod as the Postgres container. 
type Monitoring struct { - Image string `json:"image,omitempty"` + Image string `json:"image,omitempty"` + Env []v1.EnvVar `json:"env,omitempty"` } // Multisite enables cross Kubernetes replication coordinated via etcd diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 1ee54d758..d3db6146f 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -526,8 +526,8 @@ func (c *Cluster) compareStatefulSetWith(oldSts, newSts *appsv1.StatefulSet) *co reasons := make([]string, 0) var match, needsRollUpdate, needsReplace bool - match = true + //TODO: improve me if *oldSts.Spec.Replicas != *newSts.Spec.Replicas { match = false @@ -593,11 +593,10 @@ func (c *Cluster) compareStatefulSetWith(oldSts, newSts *appsv1.StatefulSet) *co reasons = append(reasons, "new statefulset's pod topologySpreadConstraints does not match the current one") } - // Some generated fields like creationTimestamp make it not possible to use DeepCompare on Spec.Template.ObjectMeta if !reflect.DeepEqual(oldSts.Spec.Template.Labels, newSts.Spec.Template.Labels) { - needsReplace = true + match = false needsRollUpdate = true - reasons = append(reasons, "new statefulset's metadata labels does not match the current one") + reasons = append(reasons, "new statefulset's pod template labels do not match the current one") } if (oldSts.Spec.Selector != nil) && (newSts.Spec.Selector != nil) { if !reflect.DeepEqual(oldSts.Spec.Selector.MatchLabels, newSts.Spec.Selector.MatchLabels) { @@ -1020,6 +1019,32 @@ func (c *Cluster) Update(oldSpec, newSpec *cpov1.Postgresql) error { syncStatefulSet = true } + // Label-check for pg-pods + pgLabelsChanged := !reflect.DeepEqual(oldSpec.Spec.Labels, newSpec.Spec.Labels) || + !reflect.DeepEqual(oldSpec.Spec.PostgresqlParam.Labels, newSpec.Spec.PostgresqlParam.Labels) + + if pgLabelsChanged { + c.logger.Infof("Labels for Postgres changed, forcing StatefulSet sync") + syncStatefulSet = true + } + + // Label-check for pgbackrest-pods + var oldRepoL, newRepoL 
map[string]string + + if oldSpec.Spec.Backup != nil && oldSpec.Spec.Backup.Pgbackrest != nil { + oldRepoL = oldSpec.Spec.Backup.Pgbackrest.Labels + } + if newSpec.Spec.Backup != nil && newSpec.Spec.Backup.Pgbackrest != nil { + newRepoL = newSpec.Spec.Backup.Pgbackrest.Labels + } + + repoLabelsChanged := !reflect.DeepEqual(oldSpec.Spec.Labels, newSpec.Spec.Labels) || + !reflect.DeepEqual(oldRepoL, newRepoL) + + if repoLabelsChanged { + c.logger.Infof("Labels for pgBackRest changed, forcing Statefulset and Cronjob sync") + } + //sync sts when there is a change in the pgbackrest secret, since we need to mount this if newSpec.Spec.Backup != nil && oldSpec.Spec.Backup != nil && newSpec.Spec.Backup.Pgbackrest != nil && oldSpec.Spec.Backup.Pgbackrest != nil && @@ -1029,7 +1054,16 @@ func (c *Cluster) Update(oldSpec, newSpec *cpov1.Postgresql) error { // Pgbackrest backup job func() { - if specHasPgbackrestPVCRepo(&newSpec.Spec) || specHasPgbackrestPVCRepo(&oldSpec.Spec) { + + repoLabelsChanged := !reflect.DeepEqual(oldSpec.Spec.Labels, newSpec.Spec.Labels) + if oldSpec.Spec.Backup != nil && newSpec.Spec.Backup != nil && + oldSpec.Spec.Backup.Pgbackrest != nil && newSpec.Spec.Backup.Pgbackrest != nil { + if !reflect.DeepEqual(oldSpec.Spec.Backup.Pgbackrest.Labels, newSpec.Spec.Backup.Pgbackrest.Labels) { + repoLabelsChanged = true + } + } + + if specHasPgbackrestPVCRepo(&newSpec.Spec) || specHasPgbackrestPVCRepo(&oldSpec.Spec) || repoLabelsChanged { if err := c.syncPgbackrestRepoHostConfig(&newSpec.Spec); err != nil { updateFailed = true return @@ -1044,10 +1078,12 @@ func (c *Cluster) Update(oldSpec, newSpec *cpov1.Postgresql) error { } c.logger.Info("a pgbackrest config has been successfully created") - if err := c.syncPgbackrestJob(false); err != nil { - err = fmt.Errorf("could not create a k8s cron job for pgbackrest: %v", err) - updateFailed = true - return + if repoLabelsChanged || !reflect.DeepEqual(oldSpec.Spec.Backup, newSpec.Spec.Backup) { + if err := 
c.syncPgbackrestJob(false); err != nil { + err = fmt.Errorf("could not create a k8s cron job for pgbackrest: %v", err) + updateFailed = true + return + } } c.logger.Info("a k8s cron job for pgbackrest has been successfully created") } else if oldSpec.Spec.GetBackup().Pgbackrest != nil { @@ -1113,13 +1149,6 @@ func (c *Cluster) Update(oldSpec, newSpec *cpov1.Postgresql) error { c.logger.Errorf("could not sync statefulsets: %v", err) updateFailed = true } - // TODO: avoid generating the StatefulSet object twice by passing it to syncStatefulSet - if err := c.syncStatefulSet(); err != nil { - c.logger.Errorf("could not sync statefulsets: %v", err) - updateFailed = true - return - } - } }() diff --git a/pkg/cluster/connection_pooler.go b/pkg/cluster/connection_pooler.go index 27a310efb..63fb5dfe7 100644 --- a/pkg/cluster/connection_pooler.go +++ b/pkg/cluster/connection_pooler.go @@ -3,6 +3,7 @@ package cluster import ( "context" "fmt" + "reflect" "strings" "time" @@ -110,9 +111,9 @@ func (c *Cluster) poolerUser(spec *cpov1.PostgresSpec) string { } // when listing pooler k8s objects -func (c *Cluster) poolerLabelsSet(addExtraLabels bool) labels.Set { +func (c *Cluster) poolerLabelsSet(addExtraLabels bool, isPod bool) labels.Set { - poolerLabels := c.labelsSet(addExtraLabels) + poolerLabels := c.labelsSetWithType(addExtraLabels, TYPE_POOLER, isPod) // TODO should be config values poolerLabels["application"] = "db-connection-pooler" return poolerLabels @@ -124,8 +125,8 @@ func (c *Cluster) poolerLabelsSet(addExtraLabels bool) labels.Set { // have e.g. different `application` label, so that recreatePod operation will // not interfere with it (it lists all the pods via labels, and if there would // be no difference, it will recreate also pooler pods). 
-func (c *Cluster) connectionPoolerLabels(role PostgresRole, addExtraLabels bool) *metav1.LabelSelector { - poolerLabelsSet := c.poolerLabelsSet(addExtraLabels) +func (c *Cluster) connectionPoolerLabels(addExtraLabels bool, role PostgresRole, isPod bool) *metav1.LabelSelector { + poolerLabelsSet := c.poolerLabelsSet(addExtraLabels, isPod) // TODO should be config values poolerLabelsSet["connection-pooler"] = c.connectionPoolerName(role) @@ -218,7 +219,7 @@ func (c *Cluster) getConnectionPoolerEnvVars() []v1.EnvVar { minSize := defaultSize / 2 reserveSize := minSize - return []v1.EnvVar{ + envVars := []v1.EnvVar{ { Name: "CONNECTION_POOLER_PORT", Value: fmt.Sprint(pgPort), @@ -248,6 +249,17 @@ func (c *Cluster) getConnectionPoolerEnvVars() []v1.EnvVar { Value: fmt.Sprint(maxDBConn), }, } + + // fetch connection_pooler-specific variables that will override all subsequent global variables + if len(connectionPoolerSpec.Env) > 0 { + envVars = appendEnvVars(envVars, connectionPoolerSpec.Env...) + } + // fetch cluster-specific variables that will override all subsequent global variables + if len(spec.Env) > 0 { + envVars = appendEnvVars(envVars, spec.Env...) 
+ } + + return envVars } func (c *Cluster) generateConnectionPoolerPodTemplate(role PostgresRole) ( @@ -399,7 +411,7 @@ func (c *Cluster) generateConnectionPoolerPodTemplate(role PostgresRole) ( podTemplate := &v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ - Labels: c.connectionPoolerLabels(role, true).MatchLabels, + Labels: c.connectionPoolerLabels(true, role, true).MatchLabels, Namespace: c.Namespace, Annotations: c.annotationsSet(c.generatePodAnnotations(spec)), }, @@ -416,7 +428,7 @@ func (c *Cluster) generateConnectionPoolerPodTemplate(role PostgresRole) ( nodeAffinity := c.nodeAffinity(c.OpConfig.NodeReadinessLabel, spec.NodeAffinity) if c.OpConfig.EnablePodAntiAffinity { - labelsSet := labels.Set(c.connectionPoolerLabels(role, false).MatchLabels) + labelsSet := labels.Set(c.connectionPoolerLabels(false, role, false).MatchLabels) podTemplate.Spec.Affinity = podAffinity( labelsSet, c.OpConfig.PodAntiAffinityTopologyKey, @@ -469,7 +481,7 @@ func (c *Cluster) generateConnectionPoolerDeployment(connectionPooler *Connectio ObjectMeta: metav1.ObjectMeta{ Name: connectionPooler.Name, Namespace: connectionPooler.Namespace, - Labels: c.connectionPoolerLabels(connectionPooler.Role, true).MatchLabels, + Labels: c.connectionPoolerLabels(true, connectionPooler.Role, false).MatchLabels, Annotations: c.AnnotationsToPropagate(c.annotationsSet(nil)), // make StatefulSet object its owner to represent the dependency. 
// By itself StatefulSet is being deleted with "Orphaned" @@ -481,7 +493,7 @@ func (c *Cluster) generateConnectionPoolerDeployment(connectionPooler *Connectio }, Spec: appsv1.DeploymentSpec{ Replicas: numberOfInstances, - Selector: c.connectionPoolerLabels(connectionPooler.Role, false), + Selector: c.connectionPoolerLabels(false, connectionPooler.Role, false), Template: *podTemplate, }, } @@ -514,7 +526,7 @@ func (c *Cluster) generateConnectionPoolerService(connectionPooler *ConnectionPo ObjectMeta: metav1.ObjectMeta{ Name: connectionPooler.Name, Namespace: connectionPooler.Namespace, - Labels: c.connectionPoolerLabels(connectionPooler.Role, false).MatchLabels, + Labels: c.connectionPoolerLabels(false, connectionPooler.Role, true).MatchLabels, Annotations: c.annotationsSet(c.generatePoolerServiceAnnotations(poolerRole, spec)), // make StatefulSet object its owner to represent the dependency. // By itself StatefulSet is being deleted with "Orphaned" @@ -665,31 +677,21 @@ func (c *Cluster) deleteConnectionPoolerSecret() (err error) { return nil } -// Perform actual patching of a connection pooler deployment, assuming that all +// Perform updating the connection pooler deployment, assuming that all // the check were already done before. func updateConnectionPoolerDeployment(KubeClient k8sutil.KubernetesClient, newDeployment *appsv1.Deployment) (*appsv1.Deployment, error) { if newDeployment == nil { return nil, fmt.Errorf("there is no connection pooler in the cluster") } - patchData, err := specPatch(newDeployment.Spec) - if err != nil { - return nil, fmt.Errorf("could not form patch for the connection pooler deployment: %v", err) - } - - // An update probably requires RetryOnConflict, but since only one operator - // worker at one time will try to update it chances of conflicts are - // minimal. deployment, err := KubeClient. 
- Deployments(newDeployment.Namespace).Patch( + Deployments(newDeployment.Namespace).Update( context.TODO(), - newDeployment.Name, - types.MergePatchType, - patchData, - metav1.PatchOptions{}, - "") + newDeployment, + metav1.UpdateOptions{}) + if err != nil { - return nil, fmt.Errorf("could not patch connection pooler deployment: %v", err) + return nil, fmt.Errorf("could not update connection pooler deployment: %v", err) } return deployment, nil @@ -998,47 +1000,79 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *cpov1.Postgresql, c.ConnectionPooler[role].Deployment = deployment // actual synchronization - var oldConnectionPooler *cpov1.ConnectionPooler - - if oldSpec != nil { - oldConnectionPooler = oldSpec.Spec.ConnectionPooler + desired, err := c.generateConnectionPoolerDeployment(c.ConnectionPooler[role]) + if err != nil { + return NoSync, fmt.Errorf("could not generate desired deployment: %v", err) } - newConnectionPooler := newSpec.Spec.ConnectionPooler - // sync implementation below assumes that both old and new specs are - // not nil, but it can happen. To avoid any confusion like updating a - // deployment because the specification changed from nil to an empty - // struct (that was initialized somewhere before) replace any nil with - // an empty spec. 
- if oldConnectionPooler == nil { - oldConnectionPooler = &cpov1.ConnectionPooler{} - } + // Check if replacement is needed because of selector changes + if !reflect.DeepEqual(deployment.Spec.Selector, desired.Spec.Selector) { + c.logger.Warningf("selector changed for connection pooler %s, recreating deployment", deployment.Name) + + policy := metav1.DeletePropagationForeground + options := metav1.DeleteOptions{PropagationPolicy: &policy} + + err := c.KubeClient.Deployments(c.Namespace).Delete(context.TODO(), deployment.Name, options) + if err != nil && !k8sutil.ResourceNotFound(err) { + return NoSync, fmt.Errorf("could not delete pooler deployment for recreation: %v", err) + } + + c.logger.Debugf("waiting for the pooler deployment %s to be deleted", deployment.Name) + err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, + func() (bool, error) { + _, err2 := c.KubeClient.Deployments(c.Namespace).Get(context.TODO(), deployment.Name, metav1.GetOptions{}) + if err2 == nil { + return false, nil + } + if k8sutil.ResourceNotFound(err2) { + return true, nil + } + return false, err2 + }) + + if err != nil { + return NoSync, fmt.Errorf("timeout waiting for pooler deployment deletion: %v", err) + } + + deployment, err = c.KubeClient. + Deployments(desired.Namespace). 
+ Create(context.TODO(), desired, metav1.CreateOptions{}) + + if err != nil { + return NoSync, fmt.Errorf("could not recreate pooler deployment: %v", err) + } + + c.ConnectionPooler[role].Deployment = deployment + c.logger.Infof("successfully recreated pooler deployment %s with new selector", deployment.Name) - if newConnectionPooler == nil { - newConnectionPooler = &cpov1.ConnectionPooler{} } var specSync bool var specReason []string + labelsSync := !reflect.DeepEqual(deployment.Labels, desired.Labels) + if labelsSync { + syncReason = append(syncReason, "labels changed") + } + if oldSpec != nil { - specSync, specReason = needSyncConnectionPoolerSpecs(oldConnectionPooler, newConnectionPooler, c.logger) + specSync, specReason = needSyncConnectionPoolerSpecs(oldSpec.Spec.ConnectionPooler, newSpec.Spec.ConnectionPooler, c.logger) syncReason = append(syncReason, specReason...) } - defaultsSync, defaultsReason := c.needSyncConnectionPoolerDefaults(&c.Config, newConnectionPooler, deployment) + defaultsSync, defaultsReason := c.needSyncConnectionPoolerDefaults(&c.Config, newSpec.Spec.ConnectionPooler, deployment) syncReason = append(syncReason, defaultsReason...) 
- if specSync || defaultsSync { - c.logger.Infof("update connection pooler deployment %s, reason: %+v", - c.connectionPoolerName(role), syncReason) - newDeployment, err = c.generateConnectionPoolerDeployment(c.ConnectionPooler[role]) - if err != nil { - return syncReason, fmt.Errorf("could not generate deployment for connection pooler: %v", err) - } + // to ensure we're also fetching global-label changes + templateSync := !reflect.DeepEqual(deployment.Spec.Template.Labels, desired.Spec.Template.Labels) + if templateSync { + syncReason = append(syncReason, "pod template labels changed") + } - deployment, err = updateConnectionPoolerDeployment(c.KubeClient, newDeployment) + if labelsSync || specSync || defaultsSync || templateSync { + c.logger.Infof("update connection pooler deployment %s, reason: %+v", c.connectionPoolerName(role), syncReason) + deployment, err = updateConnectionPoolerDeployment(c.KubeClient, desired) if err != nil { return syncReason, err } @@ -1057,7 +1091,7 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *cpov1.Postgresql, // check if pooler pods must be replaced due to secret update listOptions := metav1.ListOptions{ - LabelSelector: labels.Set(c.connectionPoolerLabels(role, true).MatchLabels).String(), + LabelSelector: labels.Set(c.connectionPoolerLabels(true, role, false).MatchLabels).String(), } pods, err = c.listPoolerPods(listOptions) if err != nil { diff --git a/pkg/cluster/connection_pooler_test.go b/pkg/cluster/connection_pooler_test.go index 3e9c9204d..50b2da52c 100644 --- a/pkg/cluster/connection_pooler_test.go +++ b/pkg/cluster/connection_pooler_test.go @@ -65,7 +65,7 @@ func objectsAreSaved(cluster *Cluster, err error, reason SyncReason) error { } for _, role := range []PostgresRole{Master, Replica} { - poolerLabels := cluster.poolerLabelsSet(false) + poolerLabels := cluster.poolerLabelsSet(false, false) poolerLabels["application"] = "db-connection-pooler" poolerLabels["connection-pooler"] = 
cluster.connectionPoolerName(role) @@ -86,7 +86,7 @@ func MasterObjectsAreSaved(cluster *Cluster, err error, reason SyncReason) error return fmt.Errorf("Connection pooler resources are empty") } - poolerLabels := cluster.poolerLabelsSet(false) + poolerLabels := cluster.poolerLabelsSet(false, false) poolerLabels["application"] = "db-connection-pooler" poolerLabels["connection-pooler"] = cluster.connectionPoolerName(Master) @@ -106,7 +106,7 @@ func ReplicaObjectsAreSaved(cluster *Cluster, err error, reason SyncReason) erro return fmt.Errorf("Connection pooler resources are empty") } - poolerLabels := cluster.poolerLabelsSet(false) + poolerLabels := cluster.poolerLabelsSet(false, false) poolerLabels["application"] = "db-connection-pooler" poolerLabels["connection-pooler"] = cluster.connectionPoolerName(Replica) @@ -924,9 +924,9 @@ func testResources(cluster *Cluster, podSpec *v1.PodTemplateSpec, role PostgresR func testLabels(cluster *Cluster, podSpec *v1.PodTemplateSpec, role PostgresRole) error { poolerLabels := podSpec.ObjectMeta.Labels["connection-pooler"] - if poolerLabels != cluster.connectionPoolerLabels(role, true).MatchLabels["connection-pooler"] { + if poolerLabels != cluster.connectionPoolerLabels(true, role, true).MatchLabels["connection-pooler"] { return fmt.Errorf("Pod labels do not match, got %+v, expected %+v", - podSpec.ObjectMeta.Labels, cluster.connectionPoolerLabels(role, true).MatchLabels) + podSpec.ObjectMeta.Labels, cluster.connectionPoolerLabels(true, role, true).MatchLabels) } return nil @@ -934,7 +934,7 @@ func testLabels(cluster *Cluster, podSpec *v1.PodTemplateSpec, role PostgresRole func testSelector(cluster *Cluster, deployment *appsv1.Deployment) error { labels := deployment.Spec.Selector.MatchLabels - expected := cluster.connectionPoolerLabels(Master, true).MatchLabels + expected := cluster.connectionPoolerLabels(true, Master, false).MatchLabels if labels["connection-pooler"] != expected["connection-pooler"] { return fmt.Errorf("Labels 
are incorrect, got %+v, expected %+v", diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index cccc7ee3d..7f7d7df6d 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -932,7 +932,7 @@ func (c *Cluster) generatePodTemplate( } // generatePodEnvVars generates environment variables for the Spilo Pod -func (c *Cluster) generateSpiloPodEnvVars( +func (c *Cluster) generatePostgresContainerEnvVars( spec *cpov1.PostgresSpec, uid types.UID, spiloConfiguration string) ([]v1.EnvVar, error) { @@ -1081,6 +1081,10 @@ func (c *Cluster) generateSpiloPodEnvVars( envVars = append(envVars, v1.EnvVar{Name: "KUBERNETES_USE_CONFIGMAPS", Value: "true"}) } + // fetch postgres-specific variables that will override all subsequent global variables + if len(spec.PostgresqlParam.Env) > 0 { + envVars = appendEnvVars(envVars, spec.PostgresqlParam.Env...) + } // fetch cluster-specific variables that will override all subsequent global variables if len(spec.Env) > 0 { envVars = appendEnvVars(envVars, spec.Env...) @@ -1153,8 +1157,9 @@ func (c *Cluster) generateSpiloPodEnvVars( } // generatePodEnvVars generates environment variables for the Spilo Pod -func (c *Cluster) generatepgBackRestPodEnvVars() []v1.EnvVar { - return []v1.EnvVar{ +func (c *Cluster) generatepgBackRestPodEnvVars(spec *cpov1.PostgresSpec) ([]v1.EnvVar, error) { + + envVars := []v1.EnvVar{ { Name: "USE_PGBACKREST", Value: "true", @@ -1164,6 +1169,17 @@ func (c *Cluster) generatepgBackRestPodEnvVars() []v1.EnvVar { Value: "repo", }, } + + // fetch pgbackrest-specific variables that will override all subsequent global variables + if len(spec.Backup.Pgbackrest.Env) > 0 { + envVars = appendEnvVars(envVars, spec.Backup.Pgbackrest.Env...) + } + // fetch cluster-specific variables that will override all subsequent global variables + if len(spec.Env) > 0 { + envVars = appendEnvVars(envVars, spec.Env...) 
+ } + + return envVars, nil } func copyEnvVars(envs []v1.EnvVar) []v1.EnvVar { @@ -1435,10 +1451,10 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu return nil, fmt.Errorf("could not generate Spilo JSON configuration: %v", err) } - // generate environment variables for the spilo container - spiloEnvVars, err := c.generateSpiloPodEnvVars(spec, c.Postgresql.GetUID(), spiloConfiguration) + // generate environment variables for the postgres container + spiloEnvVars, err := c.generatePostgresContainerEnvVars(spec, c.Postgresql.GetUID(), spiloConfiguration) if err != nil { - return nil, fmt.Errorf("could not generate Spilo env vars: %v", err) + return nil, fmt.Errorf("could not generate Postgres-Container env vars: %v", err) } // pickup the docker image for the spilo container @@ -1626,7 +1642,7 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu // generate pod template for the statefulset, based on the spilo container and sidecars podTemplate, err = c.generatePodTemplate( c.Namespace, - c.labelsSetWithType(true, TYPE_POSTGRESQL), + c.labelsSetWithType(true, TYPE_POSTGRESQL, true), c.annotationsSet(podAnnotations), spiloContainer, initContainers, @@ -1695,7 +1711,7 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu ObjectMeta: metav1.ObjectMeta{ Name: c.statefulSetName(), Namespace: c.Namespace, - Labels: c.labelsSetWithType(true, TYPE_POSTGRESQL), + Labels: c.labelsSetWithType(true, TYPE_POSTGRESQL, false), Annotations: c.AnnotationsToPropagate(c.annotationsSet(nil)), OwnerReferences: c.createOwnerReference(), }, @@ -1779,6 +1795,7 @@ func (c *Cluster) generatePgbackrestRestoreContainer(spec *cpov1.PostgresSpec, r }, }, } + if repo_host_mode { pgbackrestRestoreEnvVars = appendEnvVars( pgbackrestRestoreEnvVars, v1.EnvVar{ @@ -1800,6 +1817,15 @@ func (c *Cluster) generatePgbackrestRestoreContainer(spec *cpov1.PostgresSpec, r }) } + // fetch pgbackrest-specific variables 
that will override all subsequent global variables + if len(spec.Backup.Pgbackrest.Env) > 0 { + pgbackrestRestoreEnvVars = appendEnvVars(pgbackrestRestoreEnvVars, spec.Backup.Pgbackrest.Env...) + } + // fetch cluster-specific variables that will override all subsequent global variables + if len(spec.Env) > 0 { + pgbackrestRestoreEnvVars = appendEnvVars(pgbackrestRestoreEnvVars, spec.Env...) + } + return v1.Container{ Name: constants.RestoreContainerName, Image: spec.Backup.Pgbackrest.Image, @@ -1834,7 +1860,10 @@ func (c *Cluster) generateRepoHostStatefulSet(spec *cpov1.PostgresSpec) (*appsv1 } // generate environment variables for the spilo container - repoEnvVars := c.generatepgBackRestPodEnvVars() + repoEnvVars, err := c.generatepgBackRestPodEnvVars(spec) + if err != nil { + return nil, fmt.Errorf("could not generate pgBackRest-RepoHost env vars: %v", err) + } // determine the User, Group and FSGroup for the spilo pod effectiveRunAsUser := c.OpConfig.Resources.SpiloRunAsUser @@ -1880,12 +1909,11 @@ func (c *Cluster) generateRepoHostStatefulSet(spec *cpov1.PostgresSpec) (*appsv1 effectivePodPriorityClassName := util.Coalesce(spec.PodPriorityClassName, c.OpConfig.PodPriorityClassName) podAnnotations := c.generatePodAnnotations(spec) - repoHostLabels := c.labelsSetWithType(true, TYPE_REPOSITORY) // generate pod template for the statefulset, based on the spilo container and sidecars podTemplate, err = c.generatePodTemplate( c.Namespace, - repoHostLabels, + c.labelsSetWithType(true, TYPE_REPOSITORY, true), c.annotationsSet(podAnnotations), repoContainer, initContainers, @@ -1957,7 +1985,7 @@ func (c *Cluster) generateRepoHostStatefulSet(spec *cpov1.PostgresSpec) (*appsv1 ObjectMeta: metav1.ObjectMeta{ Name: c.getPgbackrestRepoHostName(), Namespace: c.Namespace, - Labels: repoHostLabels, + Labels: c.labelsSetWithType(true, TYPE_REPOSITORY, false), Annotations: c.AnnotationsToPropagate(c.annotationsSet(nil)), OwnerReferences: c.createOwnerReference(), }, @@ -2516,7 
+2544,7 @@ func (c *Cluster) generateSingleUserSecret(namespace string, pgUser spec.PgUser) lbls := c.labelsSet(true) if username == constants.ConnectionPoolerUserName { - lbls = c.connectionPoolerLabels("", false).MatchLabels + lbls = c.connectionPoolerLabels(false, "", false).MatchLabels } secret := v1.Secret{ @@ -2845,7 +2873,7 @@ func (c *Cluster) generatePodDisruptionBudget() *policyv1.PodDisruptionBudget { Spec: policyv1.PodDisruptionBudgetSpec{ MinAvailable: &minAvailable, Selector: &metav1.LabelSelector{ - MatchLabels: c.labelsSetWithType(false, "postgresql"), //c.roleLabelsSet(false, Master), + MatchLabels: c.labelsSetWithType(false, "postgresql", false), //c.roleLabelsSet(false, Master), }, }, } @@ -2909,7 +2937,7 @@ func (c *Cluster) generateLogicalBackupJob() (*batchv1.CronJob, error) { // re-use the method that generates DB pod templates if podTemplate, err = c.generatePodTemplate( c.Namespace, - c.labelsSetWithType(true, TYPE_LOGICAL_BACKUP), + c.labelsSetWithType(true, TYPE_LOGICAL_BACKUP, false), annotations, logicalBackupContainer, []v1.Container{}, @@ -2962,7 +2990,7 @@ func (c *Cluster) generateLogicalBackupJob() (*batchv1.CronJob, error) { ObjectMeta: metav1.ObjectMeta{ Name: c.getLogicalBackupJobName(), Namespace: c.Namespace, - Labels: c.labelsSetWithType(true, TYPE_LOGICAL_BACKUP), + Labels: c.labelsSetWithType(true, TYPE_LOGICAL_BACKUP, false), Annotations: c.annotationsSet(nil), }, Spec: batchv1.CronJobSpec{ @@ -3114,8 +3142,8 @@ func (c *Cluster) getMonitoringSecretName() string { "tprgroup", cpo.GroupName) } -func (c *Cluster) generateMonitoringEnvVars() []v1.EnvVar { - env := []v1.EnvVar{ +func (c *Cluster) generateMonitoringEnvVars(spec *cpov1.PostgresSpec, monitor *cpov1.Monitoring) []v1.EnvVar { + envVars := []v1.EnvVar{ { Name: "DATA_SOURCE_URI", Value: "localhost:5432/postgres?sslmode=disable", @@ -3136,7 +3164,17 @@ func (c *Cluster) generateMonitoringEnvVars() []v1.EnvVar { }, }, } - return env + + // fetch monitoring-specific 
variables that will override all subsequent global variables + if len(monitor.Env) > 0 { + envVars = appendEnvVars(envVars, monitor.Env...) + } + // fetch cluster-specific variables that will override all subsequent global variables + if len(spec.Env) > 0 { + envVars = appendEnvVars(envVars, spec.Env...) + } + + return envVars } func (c *Cluster) getPgbackrestRestoreConfigmapName() (jobName string) { @@ -3393,7 +3431,7 @@ func renderPgbackrestConfig(config map[string]map[string]string) (string, error) return out.String(), nil } -func (c *Cluster) generatePgbackrestJob(backup *cpov1.Pgbackrest, repo *cpov1.Repo, backupType string, schedule string) (*batchv1.CronJob, error) { +func (c *Cluster) generatePgbackrestJob(spec *cpov1.PostgresSpec, backup *cpov1.Pgbackrest, repo *cpov1.Repo, backupType string, schedule string) (*batchv1.CronJob, error) { var ( err error @@ -3409,7 +3447,7 @@ func (c *Cluster) generatePgbackrestJob(backup *cpov1.Pgbackrest, repo *cpov1.Re emptyResourceRequirements := v1.ResourceRequirements{} resourceRequirements = &emptyResourceRequirements - envVars := c.generatePgbackrestBackupJobEnvVars(repo, backupType) + envVars := c.generatePgbackrestBackupJobEnvVars(spec, repo, backupType) pgbackrestContainer := generateContainer( constants.BackupContainerName, &c.Postgresql.Spec.Backup.Pgbackrest.Image, @@ -3443,7 +3481,7 @@ func (c *Cluster) generatePgbackrestJob(backup *cpov1.Pgbackrest, repo *cpov1.Re // re-use the method that generates DB pod templates if podTemplate, err = c.generatePodTemplate( c.Namespace, - c.labelsSetWithType(true, TYPE_BACKUP_JOB), + c.labelsSetWithType(true, TYPE_BACKUP_JOB, true), annotations, pgbackrestContainer, []v1.Container{}, @@ -3495,7 +3533,7 @@ func (c *Cluster) generatePgbackrestJob(backup *cpov1.Pgbackrest, repo *cpov1.Re ObjectMeta: metav1.ObjectMeta{ Name: c.getPgbackrestJobName(repo.Name, backupType), Namespace: c.Namespace, - Labels: c.labelsSetWithType(true, TYPE_BACKUP_JOB), + Labels: 
c.labelsSetWithType(true, TYPE_BACKUP_JOB, false), Annotations: c.annotationsSet(nil), }, Spec: batchv1.CronJobSpec{ @@ -3508,13 +3546,13 @@ func (c *Cluster) generatePgbackrestJob(backup *cpov1.Pgbackrest, repo *cpov1.Re return cronJob, nil } -func (c *Cluster) generatePgbackrestBackupJobEnvVars(repo *cpov1.Repo, backupType string) []v1.EnvVar { +func (c *Cluster) generatePgbackrestBackupJobEnvVars(spec *cpov1.PostgresSpec, repo *cpov1.Repo, backupType string) []v1.EnvVar { selector := c.roleLabelsSet(false, Master).String() targetContainer := constants.PostgresContainerName if repo.Storage == "pvc" { // With a PVC based repo the backup command needs to run on the repository system // due to pgbackrest limitations - selector = c.labelsSetWithType(false, TYPE_REPOSITORY).String() + selector = c.labelsSetWithType(false, TYPE_REPOSITORY, false).String() targetContainer = constants.RepoContainerName } @@ -3540,6 +3578,16 @@ func (c *Cluster) generatePgbackrestBackupJobEnvVars(repo *cpov1.Repo, backupTyp Value: selector, }, } + + // fetch pgbackrest-specific variables that will override all subsequent global variables + if len(spec.Backup.Pgbackrest.Env) > 0 { + envVars = appendEnvVars(envVars, spec.Backup.Pgbackrest.Env...) + } + // fetch cluster-specific variables that will override all subsequent global variables + if len(spec.Env) > 0 { + envVars = appendEnvVars(envVars, spec.Env...) 
+ } + return envVars } diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go index a57be9517..36225d883 100644 --- a/pkg/cluster/k8sres_test.go +++ b/pkg/cluster/k8sres_test.go @@ -517,7 +517,7 @@ func testEnvs(cluster *Cluster, podSpec *v1.PodTemplateSpec, role PostgresRole) return nil } -func TestGenerateSpiloPodEnvVars(t *testing.T) { +func TestGeneratePostgresContainerEnvVars(t *testing.T) { var dummyUUID = "efd12e58-5786-11e8-b5a7-06148230260c" expectedClusterNameLabel := []ExpectedValue{ @@ -959,7 +959,7 @@ func TestGenerateSpiloPodEnvVars(t *testing.T) { pgsql.Spec.StandbyCluster = tt.standbyDescription c.Postgresql = pgsql - actualEnvs, err := c.generateSpiloPodEnvVars(&pgsql.Spec, types.UID(dummyUUID), exampleSpiloConfig) + actualEnvs, err := c.generatePostgresContainerEnvVars(&pgsql.Spec, types.UID(dummyUUID), exampleSpiloConfig) assert.NoError(t, err) for _, ev := range tt.expectedValues { diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index af6a30635..b24d4734a 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -34,7 +34,7 @@ func (c *Cluster) listPods() ([]v1.Pod, error) { func (c *Cluster) listPodsOfType(podType PodType) ([]v1.Pod, error) { listOptions := metav1.ListOptions{ - LabelSelector: c.labelsSetWithType(false, podType).String(), + LabelSelector: c.labelsSetWithType(false, podType, false).String(), } pods, err := c.KubeClient.Pods(c.Namespace).List(context.TODO(), listOptions) diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index b5122ccbe..e676d37c8 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -88,7 +88,7 @@ func (c *Cluster) generateExporterSidecar() *cpov1.Sidecar { Protocol: v1.ProtocolTCP, }, }, - Env: c.generateMonitoringEnvVars(), + Env: c.generateMonitoringEnvVars(&c.Postgresql.Spec, monitor), SecurityContext: &v1.SecurityContext{ AllowPrivilegeEscalation: c.OpConfig.Resources.SpiloAllowPrivilegeEscalation, Privileged: &c.OpConfig.Resources.SpiloPrivileged, @@ 
-241,25 +241,37 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *appsv1.StatefulSet) error { c.logger.Warningf("could not scale down: %v", err) } } - c.logger.Debugf("updating statefulset") - patchData, err := specPatch(newStatefulSet.Spec) + currentSts, err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Get( + context.TODO(), + c.Statefulset.Name, + metav1.GetOptions{}, + ) if err != nil { - return fmt.Errorf("could not form patch for the statefulset %q: %v", statefulSetName, err) + return fmt.Errorf("could not get current statefulset %q: %v", statefulSetName, err) } - statefulSet, err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Patch( + c.logger.Debugf("updating statefulset %q via full update to sync labels", statefulSetName) + + currentSts.Labels = newStatefulSet.Labels + currentSts.Annotations = newStatefulSet.Annotations + + currentSts.Spec.Replicas = newStatefulSet.Spec.Replicas + currentSts.Spec.Template = newStatefulSet.Spec.Template + currentSts.Spec.UpdateStrategy = newStatefulSet.Spec.UpdateStrategy + currentSts.Spec.PodManagementPolicy = newStatefulSet.Spec.PodManagementPolicy + currentSts.Spec.PersistentVolumeClaimRetentionPolicy = newStatefulSet.Spec.PersistentVolumeClaimRetentionPolicy + + updatedSts, err := c.KubeClient.StatefulSets(currentSts.Namespace).Update( context.TODO(), - c.Statefulset.Name, - types.MergePatchType, - patchData, - metav1.PatchOptions{}, - "") + currentSts, + metav1.UpdateOptions{}, + ) if err != nil { - return fmt.Errorf("could not patch statefulset spec %q: %v", statefulSetName, err) + return fmt.Errorf("could not update statefulset spec %q: %v", statefulSetName, err) } - c.Statefulset = statefulSet + c.Statefulset = updatedSts return nil } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 7ebcc8b82..351a2def0 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -23,6 +23,7 @@ import ( "github.com/cybertec-postgresql/cybertec-pg-operator/pkg/util" 
"github.com/cybertec-postgresql/cybertec-pg-operator/pkg/util/constants" "github.com/cybertec-postgresql/cybertec-pg-operator/pkg/util/k8sutil" + "github.com/cybertec-postgresql/cybertec-pg-operator/pkg/util/retryutil" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1" @@ -172,6 +173,7 @@ func generateSerialNumber() (*big.Int, error) { // Unlike the update, sync does not error out if some objects do not exist and takes care of creating them. func (c *Cluster) Sync(newSpec *cpov1.Postgresql) error { var err error + var syncErrors []error c.mu.Lock() defer c.mu.Unlock() @@ -187,6 +189,26 @@ func (c *Cluster) Sync(newSpec *cpov1.Postgresql) error { } }() + // Label-check for pg-pods + pgLabelsChanged := !reflect.DeepEqual(oldSpec.Spec.Labels, newSpec.Spec.Labels) || + !reflect.DeepEqual(oldSpec.Spec.PostgresqlParam.Labels, newSpec.Spec.PostgresqlParam.Labels) + + // Label-check for pgbackrest-pods + var oldRepoL, newRepoL map[string]string + if oldSpec.Spec.Backup != nil && oldSpec.Spec.Backup.Pgbackrest != nil { + oldRepoL = oldSpec.Spec.Backup.Pgbackrest.Labels + } + if newSpec.Spec.Backup != nil && newSpec.Spec.Backup.Pgbackrest != nil { + newRepoL = newSpec.Spec.Backup.Pgbackrest.Labels + } + + repoLabelsChanged := !reflect.DeepEqual(oldSpec.Spec.Labels, newSpec.Spec.Labels) || + !reflect.DeepEqual(oldRepoL, newRepoL) + + if pgLabelsChanged || repoLabelsChanged { + c.logger.Infof("Labels drift detected in Sync: pgLabelsChanged=%v, repoLabelsChanged=%v", pgLabelsChanged, repoLabelsChanged) + } + // Make sure we know about any in progress restores before touching other stuff if err = c.refreshRestoreConfigMap(); err != nil { return fmt.Errorf("error refreshing restore configmap: %v", err) @@ -208,13 +230,13 @@ func (c *Cluster) Sync(newSpec *cpov1.Postgresql) error { return err } - if err = c.syncPgbackrestConfig(); err != nil { - err = fmt.Errorf("could not sync pgbackrest repo-host config: %v", err) + if err = 
c.syncPgbackrestConfig(); err != nil { + err = fmt.Errorf("could not sync pgbackrest config: %v", err) return err } if err = c.syncPgbackrestRepoHostConfig(&c.Spec); err != nil { - err = fmt.Errorf("could not sync pgbackrest config: %v", err) + err = fmt.Errorf("could not sync pgbackrest repo-host config: %v", err) return err } @@ -234,7 +256,8 @@ func (c *Cluster) Sync(newSpec *cpov1.Postgresql) error { if err = c.syncStatefulSet(); err != nil { if !k8sutil.ResourceAlreadyExists(err) { err = fmt.Errorf("could not sync statefulsets: %v", err) - return err + syncErrors = append(syncErrors, err) + // return err } } @@ -304,7 +327,9 @@ func (c *Cluster) Sync(newSpec *cpov1.Postgresql) error { // sync connection pooler if _, err = c.syncConnectionPooler(&oldSpec, newSpec, c.installLookupFunction); err != nil { - return fmt.Errorf("could not sync connection pooler: %v", err) + // return fmt.Errorf("could not sync connection pooler: %v", err) + err = fmt.Errorf("could not sync connection pooler: %v", err) + syncErrors = append(syncErrors, err) } if len(c.Spec.Streams) > 0 { @@ -331,7 +356,11 @@ func (c *Cluster) Sync(newSpec *cpov1.Postgresql) error { c.logger.Errorf("major version upgrade failed: %v", err) } - return err + if len(syncErrors) > 0 { + return fmt.Errorf("multiple sync errors: %v", syncErrors) + } + return nil + // return err } func (c *Cluster) deletePgbackrestRepoHostObjects() error { @@ -1190,7 +1219,7 @@ func (c *Cluster) rotatePasswordInSecret( // when password of connection pooler is rotated in place, pooler pods have to be replaced if roleOrigin == spec.RoleOriginConnectionPooler { listOptions := metav1.ListOptions{ - LabelSelector: c.poolerLabelsSet(true).String(), + LabelSelector: c.poolerLabelsSet(true, false).String(), } poolerPods, err := c.listPoolerPods(listOptions) if err != nil { @@ -1662,21 +1691,58 @@ func (c *Cluster) syncPgbackrestJob(forceRemove bool) error { for _, repo := range c.Postgresql.Spec.Backup.Pgbackrest.Repos
{ for name, schedule := range repo.Schedule { if rep == repo.Name && name == schedul { - job, err := c.generatePgbackrestJob(c.Postgresql.Spec.Backup.Pgbackrest, &repo, name, schedule) + job, err := c.generatePgbackrestJob(&c.Postgresql.Spec, c.Postgresql.Spec.Backup.Pgbackrest, &repo, name, schedule) if err != nil { return fmt.Errorf("could not generate pgbackrest job: %v", err) } remove = false - if _, err := c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Get(context.TODO(), c.getPgbackrestJobName(repo.Name, name), metav1.GetOptions{}); err == nil { - if err := c.patchPgbackrestJob(job); err != nil { - return fmt.Errorf("could not update a pgbackrest cronjob: %v", err) + + currentJob, err := c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Get(context.TODO(), c.getPgbackrestJobName(repo.Name, name), metav1.GetOptions{}) + + if err == nil { + if !reflect.DeepEqual(currentJob.Spec.JobTemplate.Spec.Selector, job.Spec.JobTemplate.Spec.Selector) { + c.logger.Warningf("selector changed for pgbackrest cronjob %s, recreating to avoid immutable field error", job.Name) + + err := c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Delete(context.TODO(), job.Name, metav1.DeleteOptions{}) + if err != nil && !k8sutil.ResourceNotFound(err) { + return fmt.Errorf("could not delete pgbackrest cronjob for recreation: %v", err) + } + + err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, + func() (bool, error) { + _, err2 := c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Get(context.TODO(), job.Name, metav1.GetOptions{}) + return k8sutil.ResourceNotFound(err2), nil + }) + if err != nil { + return fmt.Errorf("timeout waiting for pgbackrest cronjob deletion: %v", err) + } + + if err := c.createPgbackrestJob(job); err != nil { + return fmt.Errorf("could not recreate pgbackrest cronjob: %v", err) + } + c.logger.Infof("pgbackrest cronjob for %v %v has been successfully recreated", rep, schedul) + + } else { + currentJob.Labels = job.Labels + 
currentJob.Annotations = job.Annotations + currentJob.Spec.Schedule = job.Spec.Schedule + currentJob.Spec.JobTemplate.ObjectMeta.Labels = job.Spec.JobTemplate.ObjectMeta.Labels + currentJob.Spec.JobTemplate.Spec.Template.ObjectMeta.Labels = job.Spec.JobTemplate.Spec.Template.ObjectMeta.Labels + currentJob.Spec.JobTemplate.Spec.Template.Spec = job.Spec.JobTemplate.Spec.Template.Spec + + _, err := c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Update(context.TODO(), currentJob, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("could not update pgbackrest cronjob via Update call: %v", err) + } + c.logger.Infof("pgbackrest cronjob for %v %v has been successfully updated (history preserved)", rep, schedul) } - c.logger.Infof("pgbackrest cronjob for %v %v has been successfully updated", rep, schedul) - } else { + } else if k8sutil.ResourceNotFound(err) { if err := c.createPgbackrestJob(job); err != nil { return fmt.Errorf("could not create a pgbackrest cronjob: %v", err) } c.logger.Infof("pgbackrest cronjob for %v %v has been successfully created", rep, schedul) + } else { + return fmt.Errorf("could not get pgbackrest cronjob: %v", err) } } } diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 3a54bc1e6..f22da527b 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -398,7 +398,7 @@ func (c *Cluster) waitStatefulsetReady() error { return retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, func() (bool, error) { listOptions := metav1.ListOptions{ - LabelSelector: c.labelsSetWithType(false, TYPE_POSTGRESQL).String(), + LabelSelector: c.labelsSetWithType(false, TYPE_POSTGRESQL, false).String(), } ss, err := c.KubeClient.StatefulSets(c.Namespace).List(context.TODO(), listOptions) if err != nil { @@ -417,7 +417,7 @@ func (c *Cluster) _waitPodLabelsReady(anyReplica bool) error { var ( podsNumber int ) - ls := c.labelsSetWithType(false, TYPE_POSTGRESQL) + ls := c.labelsSetWithType(false, TYPE_POSTGRESQL, false) 
namespace := c.Namespace listOptions := metav1.ListOptions{ @@ -534,24 +534,26 @@ func (c *Cluster) getPrimaryLoadBalancerIp() (string, error) { // For backward compatibility, shouldAddExtraLabels must be false // when listing k8s objects. See operator PR #252 func (c *Cluster) labelsSet(shouldAddExtraLabels bool) labels.Set { - return c.labelsSetWithType(shouldAddExtraLabels, "") + return c.labelsSetWithType(shouldAddExtraLabels, "", false) } -func (c *Cluster) labelsSetWithType(shouldAddExtraLabels bool, typeLabel PodType) labels.Set { +func (c *Cluster) labelsSetWithType(shouldAddExtraLabels bool, typeLabel PodType, isPod bool) labels.Set { lbls := make(map[string]string) + + // Basic Labels for k, v := range c.OpConfig.ClusterLabels { lbls[k] = v } lbls[c.OpConfig.ClusterNameLabel] = c.Name + if typeLabel != "" { lbls["member.cpo.opensource.cybertec.at/type"] = string(typeLabel) } + // extraLabels (inherited_labels, ...) if shouldAddExtraLabels { - // enables filtering resources owned by a team lbls["team"] = c.Postgresql.Spec.TeamID - // allow to inherit certain labels from the 'postgres' object if spec, err := c.GetSpec(); err == nil { for k, v := range spec.ObjectMeta.Labels { for _, match := range c.OpConfig.InheritedLabels { @@ -565,21 +567,52 @@ func (c *Cluster) labelsSetWithType(shouldAddExtraLabels bool, typeLabel PodType } } + // add custom labels + if isPod && typeLabel != "" { + // global labels + for k, v := range c.Postgresql.Spec.Labels { + lbls[k] = v + } + switch typeLabel { + case TYPE_POSTGRESQL: + // pg-specific labels + for k, v := range c.Postgresql.Spec.PostgresqlParam.Labels { + lbls[k] = v + } + + case TYPE_REPOSITORY, TYPE_BACKUP_JOB: + if c.Postgresql.Spec.Backup != nil && c.Postgresql.Spec.Backup.Pgbackrest != nil { + // backup-specific labels + for k, v := range c.Postgresql.Spec.Backup.Pgbackrest.Labels { + lbls[k] = v + } + } + + case TYPE_POOLER: + if c.Postgresql.Spec.ConnectionPooler != nil { + // pooler-specific labels + for 
k, v := range c.Postgresql.Spec.ConnectionPooler.Labels { + lbls[k] = v + } + } + } + } + return labels.Set(lbls) } func (c *Cluster) labelsSelector(typeLabel PodType) *metav1.LabelSelector { return &metav1.LabelSelector{ - MatchLabels: c.labelsSetWithType(false, typeLabel), + MatchLabels: c.labelsSetWithType(false, typeLabel, false), MatchExpressions: nil, } } func (c *Cluster) roleLabelsSelector(role PostgresRole) *metav1.LabelSelector { - lbls := c.labelsSetWithType(false, TYPE_POSTGRESQL) + lbls := c.labelsSetWithType(false, TYPE_POSTGRESQL, false) lbls[c.OpConfig.PodRoleLabel] = string(role) return &metav1.LabelSelector{ - MatchLabels: c.labelsSetWithType(false, TYPE_POSTGRESQL), + MatchLabels: c.labelsSetWithType(false, TYPE_POSTGRESQL, false), MatchExpressions: nil, } } @@ -592,7 +625,7 @@ func (c *Cluster) roleLabelsSet(shouldAddExtraLabels bool, role PostgresRole) la lbls = c.labelsSet(shouldAddExtraLabels) //c.labelsSetWithType(shouldAddExtraLabels, TYPE_POSTGRESQL) lbls[c.OpConfig.PodRoleLabel] = string(role) } else { - lbls = c.labelsSetWithType(shouldAddExtraLabels, "") + lbls = c.labelsSetWithType(shouldAddExtraLabels, "", false) } return lbls }