diff --git a/pkg/operator/encryption/kms/health/checker.go b/pkg/operator/encryption/kms/health/checker.go new file mode 100644 index 0000000000..2694858f43 --- /dev/null +++ b/pkg/operator/encryption/kms/health/checker.go @@ -0,0 +1,86 @@ +package health + +import ( + "context" + "fmt" + "path/filepath" + "strings" + "time" + + k8senvelopekmsv2 "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2" + kmsservice "k8s.io/kms/pkg/service" +) + +type plugin struct { + keyID string + service kmsservice.Service +} + +type Checker struct { + plugins []plugin + now func() time.Time +} + +func NewChecker(ctx context.Context, udsPaths []string, timeout time.Duration) (*Checker, error) { + c := Checker{ + plugins: make([]plugin, 0, len(udsPaths)), + now: time.Now, + } + + for _, socket := range udsPaths { + service, err := k8senvelopekmsv2.NewGRPCService( + ctx, + "unix://"+socket, + providerName, + timeout, + ) + if err != nil { + return nil, fmt.Errorf("dial KMS plugin at %q: %w", socket, err) + } + + c.plugins = append(c.plugins, plugin{ + keyID: keyIDFromSocket(socket), + service: service, + }) + } + + return &c, nil +} + +func keyIDFromSocket(path string) string { + name := strings.TrimSuffix(filepath.Base(path), filepath.Ext(path)) + if i := strings.LastIndex(name, "-"); i >= 0 { + return name[i+1:] + } + return name +} + +// CheckStatus checks the KMS plugin health via UDS. It never reports an error, +// it encodes the error into a Condition. +func (c *Checker) CheckStatus(ctx context.Context) []PluginHealthCondition { + conditions := make([]PluginHealthCondition, 0, len(c.plugins)) + + // Safe to parallelise: each plugin probes an independent socket / has a unique index in slice. + for _, p := range c.plugins { + cond := PluginHealthCondition{ + KeyID: p.keyID, + LastChecked: c.now(), + } + + resp, err := p.service.Status(ctx) + switch { + case err != nil: + cond.Status = "error" + cond.Detail = err.Error() + case resp.Healthz == "ok": + cond.Status = "healthy" + cond.KEKID = resp.KeyID + default: + cond.Status = "unhealthy" + cond.Detail = resp.Healthz + } + + conditions = append(conditions, cond) + } + return conditions +} diff --git a/pkg/operator/encryption/kms/health/checker_test.go b/pkg/operator/encryption/kms/health/checker_test.go new file mode 100644 index 0000000000..d02a191eb6 --- /dev/null +++ b/pkg/operator/encryption/kms/health/checker_test.go @@ -0,0 +1,68 @@ +package health + +import ( + "context" + "fmt" + "reflect" + "testing" + "time" + + kmsservice "k8s.io/kms/pkg/service" +) + +type fakeService struct { + resp *kmsservice.StatusResponse + err error +} + +func (f *fakeService) Status(context.Context) (*kmsservice.StatusResponse, error) { + return f.resp, f.err +} +func (f *fakeService) Encrypt(context.Context, string, []byte) (*kmsservice.EncryptResponse, error) { + return nil, nil +} +func (f *fakeService) Decrypt(context.Context, string, *kmsservice.DecryptRequest) ([]byte, error) { + return nil, nil +} + +func TestChecker_CheckStatus(t *testing.T) { + fixed := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC) + c := &Checker{ + plugins: []plugin{ + {keyID: "1", service: &fakeService{resp: &kmsservice.StatusResponse{Healthz: "ok", KeyID: "kek-abc"}}}, + {keyID: "2", service: &fakeService{err: fmt.Errorf("connection refused")}}, + {keyID: "3", service: &fakeService{resp: &kmsservice.StatusResponse{Healthz: "degraded"}}}, + }, + now: func() time.Time { return fixed }, + } + + got := c.CheckStatus(context.Background()) + want := []PluginHealthCondition{ + {KeyID: "1", KEKID: "kek-abc", Status: "healthy", LastChecked: fixed}, + {KeyID: "2", Status: "error", Detail: "connection refused", LastChecked: fixed}, + {KeyID: "3", Status: "unhealthy", Detail: "degraded", LastChecked: fixed}, + } + if !reflect.DeepEqual(got, want) { + t.Errorf("CheckStatus():\n got: %+v\n want: %+v", got, want) + } +} + +func Test_keyIDFromSocket(t *testing.T) { + tests := []struct { + path string + want string + }{ + {"/var/run/kmsplugin/kms-1.sock", "1"}, + {"/var/run/kmsplugin/kms-2.sock", "2"}, + {"kms-42.sock", "42"}, + {"plugin.sock", "plugin"}, + {"/tmp/my-custom-provider.sock", "provider"}, + } + for _, tt := range tests { + t.Run(tt.path, func(t *testing.T) { + if got := keyIDFromSocket(tt.path); got != tt.want { + t.Errorf("keyIDFromSocket(%q) = %q, want %q", tt.path, got, tt.want) + } + }) + } +} diff --git a/pkg/operator/encryption/kms/health/cmd.go b/pkg/operator/encryption/kms/health/cmd.go new file mode 100644 index 0000000000..90beebe5dc --- /dev/null +++ b/pkg/operator/encryption/kms/health/cmd.go @@ -0,0 +1,131 @@ +package health + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/spf13/cobra" + "github.com/spf13/pflag" + + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog/v2" +) + +const providerName = "kms-health-monitor" + +type commandOptions struct { + kmsSockets []string + readInterval time.Duration + readTimeout time.Duration + writeTimeout time.Duration + targetOperator string + nodeName string + kubeconfig string +} + +func NewCommand(ctx context.Context) *cobra.Command { + o := &commandOptions{ + readInterval: 30 * time.Second, + readTimeout: 5 * time.Second, + writeTimeout: 10 * time.Second, + } + cmd := &cobra.Command{ + Use: "kms-health-monitor", + Short: "Observes a co-located KMSv2 plugin and publishes status as an OperatorCondition.", + RunE: func(cmd *cobra.Command, args []string) error { + if err := o.validate(); err != nil { + return err + } + return o.run(ctx) + }, + } + o.addFlags(cmd.Flags()) + return cmd +} + +func (o *commandOptions) addFlags(fs *pflag.FlagSet) { + fs.StringSliceVar(&o.kmsSockets, "kms-sockets", nil, "filesystem paths to the KMSv2 plugin UDS") + fs.DurationVar(&o.readInterval, "read-interval", o.readInterval, "cadence between checks") + fs.DurationVar(&o.readTimeout, "read-timeout", o.readTimeout, "deadline for each Status RPC") + fs.DurationVar(&o.writeTimeout, "write-timeout", o.writeTimeout, "deadline for each condition update; should fit inside --read-interval") + fs.StringVar(&o.targetOperator, "target-operator", "", "target operator CRD: "+strings.Join(supportedOperatorKeys(), ", ")) + fs.StringVar(&o.nodeName, "node-name", "", "node name recorded in the condition used to help to identify the origin") + fs.StringVar(&o.kubeconfig, "kubeconfig", "", "path to a kubeconfig; empty uses in-cluster config") +} + +func (o *commandOptions) validate() error { + if len(o.kmsSockets) == 0 { + return fmt.Errorf("--kms-sockets is required, at least one") + } + for _, s := range o.kmsSockets { + if strings.TrimSpace(s) == "" { + return fmt.Errorf("--kms-sockets cannot contain empty entries") + } + } + if o.readInterval <= 0 { + return fmt.Errorf("--read-interval must be positive") + } + if o.readTimeout <= 0 { + return fmt.Errorf("--read-timeout must be positive") + } + if o.writeTimeout <= 0 { + return fmt.Errorf("--write-timeout must be positive") + } + if o.nodeName == "" { + return fmt.Errorf("--node-name is required") + } + if _, ok := supportedOperators[TargetOperator(o.targetOperator)]; !ok { + return fmt.Errorf("--target-operator must be one of %s (got %q)", + strings.Join(supportedOperatorKeys(), ", "), o.targetOperator) + } + return nil +} + +func (o *commandOptions) run(ctx context.Context) error { + klog.InfoS("kms-health-monitor starting", + "socket", o.kmsSockets, + "targetOperator", o.targetOperator, + "observerNode", o.nodeName, + "interval", o.readInterval, + "readTimeout", o.readTimeout, + "writeTimeout", o.writeTimeout, + ) + + cfg, err := buildRESTConfig(o.kubeconfig) + if err != nil { + return fmt.Errorf("build rest config: %w", err) + } + writer, err := buildWriter(cfg, TargetOperator(o.targetOperator), o.nodeName) + if err != nil { + return fmt.Errorf("create new writer: %w", err) + } + checker, err := NewChecker(ctx, o.kmsSockets, o.readTimeout) + if err != nil { + return fmt.Errorf("create new checker: %w", err) + } + + wait.JitterUntilWithContext(ctx, func(ctx context.Context) { + probeCtx, cancel := context.WithTimeout(ctx, o.readTimeout) + defer cancel() + conditions := checker.CheckStatus(probeCtx) + + writeCtx, writeCancel := context.WithTimeout(ctx, o.writeTimeout) + defer writeCancel() + if err := writer.Apply(writeCtx, conditions); err != nil { + klog.ErrorS(err, "apply operator status") + } + }, o.readInterval, 0.1, false) + + return nil +} + +func buildRESTConfig(kubeconfig string) (*rest.Config, error) { + if kubeconfig != "" { + return clientcmd.BuildConfigFromFlags("", kubeconfig) + } + return rest.InClusterConfig() +} diff --git a/pkg/operator/encryption/kms/health/types.go b/pkg/operator/encryption/kms/health/types.go new file mode 100644 index 0000000000..75e8ae04ae --- /dev/null +++ b/pkg/operator/encryption/kms/health/types.go @@ -0,0 +1,53 @@ +package health + +import ( + "sort" + "time" + + "k8s.io/apimachinery/pkg/runtime/schema" +) + +type TargetOperator string + +const ( + kubeAPIServer TargetOperator = "kube-apiserver" + openshiftAPIServer TargetOperator = "openshift-apiserver" + authAPIServer TargetOperator = "auth-apiserver" +) + +var supportedOperators = map[TargetOperator]struct { + GVR schema.GroupVersionResource + GVK schema.GroupVersionKind +}{ + kubeAPIServer: { + GVR: schema.GroupVersionResource{Group: "operator.openshift.io", Version: "v1", Resource: "kubeapiservers"}, + GVK: schema.GroupVersionKind{Group: "operator.openshift.io", Version: "v1", Kind: "KubeAPIServer"}, + }, + authAPIServer: { + GVR: schema.GroupVersionResource{Group: "operator.openshift.io", Version: "v1", Resource: "authentications"}, + GVK: schema.GroupVersionKind{Group: "operator.openshift.io", Version: "v1", Kind: "Authentication"}, + }, + openshiftAPIServer: { + GVR: schema.GroupVersionResource{Group: "operator.openshift.io", Version: "v1", Resource: "openshiftapiservers"}, + GVK: schema.GroupVersionKind{Group: "operator.openshift.io", Version: "v1", Kind: "OpenShiftAPIServer"}, + }, +} + +type PluginHealthCondition struct { + // KeyID is the sequential key identifier assigned by the key controller. + KeyID string `json:"keyID"` + // KEKID is the ID of the key used by the KMS provider for encryption/decryption. + KEKID string `json:"kekID,omitempty"` + Status string `json:"status"` + LastChecked time.Time `json:"lastChecked"` + Detail string `json:"detail,omitempty"` +} + +func supportedOperatorKeys() []string { + keys := make([]string, 0, len(supportedOperators)) + for k := range supportedOperators { + keys = append(keys, string(k)) + } + sort.Strings(keys) + return keys +} diff --git a/pkg/operator/encryption/kms/health/writer.go b/pkg/operator/encryption/kms/health/writer.go new file mode 100644 index 0000000000..b824940a5c --- /dev/null +++ b/pkg/operator/encryption/kms/health/writer.go @@ -0,0 +1,59 @@ +package health + +import ( + "context" + "encoding/json" + "fmt" + + operatorv1 "github.com/openshift/api/operator/v1" + applyoperatorv1 "github.com/openshift/client-go/operator/applyconfigurations/operator/v1" + "github.com/openshift/library-go/pkg/operator/genericoperatorclient" + "github.com/openshift/library-go/pkg/operator/v1helpers" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/client-go/rest" + "k8s.io/utils/clock" +) + +type Writer struct { + operatorClient v1helpers.OperatorClientWithFinalizers + nodeName string +} + +func (w *Writer) Apply(ctx context.Context, conditions []PluginHealthCondition) error { + msg, err := json.Marshal(conditions) + if err != nil { + return fmt.Errorf("marshal conditions: %w", err) + } + + // Hardcoded to avoid StatusSyncer side-effects; will be rewritten after API change in operator CR status. + cond := applyoperatorv1.OperatorCondition(). + WithType("KMSHealthReporter_" + w.nodeName). + WithStatus(operatorv1.ConditionTrue). + WithReason("AsExpected"). + WithMessage(string(msg)) + + status := applyoperatorv1.OperatorStatus().WithConditions(cond) + fieldManager := "kms-health-monitor-" + w.nodeName + return w.operatorClient.ApplyOperatorStatus(ctx, fieldManager, status) +} + +func buildWriter(cfg *rest.Config, targetOperator TargetOperator, nodeName string) (*Writer, error) { + target := supportedOperators[targetOperator] + operatorClient, _, err := genericoperatorclient.NewClusterScopedOperatorClient( + clock.RealClock{}, cfg, target.GVR, target.GVK, + emptyOperatorSpec, emptyOperatorStatus, + ) + if err != nil { + return nil, fmt.Errorf("build operator client for %s: %w", targetOperator, err) + } + + return &Writer{operatorClient: operatorClient, nodeName: nodeName}, nil +} + +func emptyOperatorSpec(_ *unstructured.Unstructured, _ string) (*applyoperatorv1.OperatorSpecApplyConfiguration, error) { + return applyoperatorv1.OperatorSpec(), nil +} + +func emptyOperatorStatus(_ *unstructured.Unstructured, _ string) (*applyoperatorv1.OperatorStatusApplyConfiguration, error) { + return applyoperatorv1.OperatorStatus(), nil +} diff --git a/pkg/operator/encryption/kms/health/writer_test.go b/pkg/operator/encryption/kms/health/writer_test.go new file mode 100644 index 0000000000..476233b7f6 --- /dev/null +++ b/pkg/operator/encryption/kms/health/writer_test.go @@ -0,0 +1,99 @@ +package health + +import ( + "context" + "encoding/json" + "testing" + "time" + + operatorv1 "github.com/openshift/api/operator/v1" + "github.com/openshift/library-go/pkg/operator/v1helpers" +) + +func TestWriter_Apply(t *testing.T) { + tests := []struct { + name string + nodeName string + conditions []PluginHealthCondition + wantType string + wantMsg string + }{ + { + name: "single healthy plugin", + nodeName: "master-0", + conditions: []PluginHealthCondition{ + {KeyID: "1", KEKID: "key-abc", Status: "healthy", LastChecked: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)}, + }, + wantType: "KMSHealthReporter_master-0", + wantMsg: `[{"keyID":"1","kekID":"key-abc","status":"healthy","lastChecked":"2025-01-01T00:00:00Z"}]`, + }, + { + name: "mixed healthy and error", + nodeName: "master-1", + conditions: []PluginHealthCondition{ + {KeyID: "1", KEKID: "key-abc", Status: "healthy", LastChecked: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)}, + {KeyID: "2", Status: "error", Detail: "connection refused", LastChecked: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)}, + }, + wantType: "KMSHealthReporter_master-1", + wantMsg: `[{"keyID":"1","kekID":"key-abc","status":"healthy","lastChecked":"2025-01-01T00:00:00Z"},{"keyID":"2","status":"error","lastChecked":"2025-01-01T00:00:00Z","detail":"connection refused"}]`, + }, + { + name: "unhealthy plugin", + nodeName: "master-2", + conditions: []PluginHealthCondition{ + {KeyID: "1", Status: "unhealthy", Detail: "not ready", LastChecked: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)}, + }, + wantType: "KMSHealthReporter_master-2", + wantMsg: `[{"keyID":"1","status":"unhealthy","lastChecked":"2025-01-01T00:00:00Z","detail":"not ready"}]`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fakeClient := v1helpers.NewFakeOperatorClient( + &operatorv1.OperatorSpec{}, + &operatorv1.OperatorStatus{}, + nil, + ) + w := &Writer{operatorClient: fakeClient, nodeName: tt.nodeName} + + if err := w.Apply(context.Background(), tt.conditions); err != nil { + t.Fatalf("Apply() error: %v", err) + } + + _, status, _, err := fakeClient.GetOperatorState() + if err != nil { + t.Fatalf("GetOperatorState() error: %v", err) + } + + if len(status.Conditions) != 1 { + t.Fatalf("expected 1 condition, got %d", len(status.Conditions)) + } + + cond := status.Conditions[0] + if cond.Type != tt.wantType { + t.Errorf("condition type = %q, want %q", cond.Type, tt.wantType) + } + if cond.Status != operatorv1.ConditionTrue { + t.Errorf("condition status = %q, want %q", cond.Status, operatorv1.ConditionTrue) + } + if cond.Reason != "AsExpected" { + t.Errorf("condition reason = %q, want %q", cond.Reason, "AsExpected") + } + + // Compare as parsed JSON to ignore key ordering differences. + var gotParsed, wantParsed any + if err := json.Unmarshal([]byte(cond.Message), &gotParsed); err != nil { + t.Fatalf("parse condition message: %v", err) + } + if err := json.Unmarshal([]byte(tt.wantMsg), &wantParsed); err != nil { + t.Fatalf("parse want message: %v", err) + } + gotJSON, _ := json.Marshal(gotParsed) + wantJSON, _ := json.Marshal(wantParsed) + if string(gotJSON) != string(wantJSON) { + t.Errorf("condition message:\n got: %s\n want: %s", gotJSON, wantJSON) + } + }) + } +}