-
Notifications
You must be signed in to change notification settings - Fork 267
CNTRLPLANE-3234: health reporter #2193
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
520a9ce
27214dd
3e971c5
dcf239c
e33de33
24159d6
8d9b06a
4ae1530
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,86 @@ | ||
| package health | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "path/filepath" | ||
| "strings" | ||
| "time" | ||
|
|
||
| k8senvelopekmsv2 "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2" | ||
| kmsservice "k8s.io/kms/pkg/service" | ||
| ) | ||
|
|
||
| type plugin struct { | ||
| keyID string | ||
| service kmsservice.Service | ||
| } | ||
|
|
||
| type Checker struct { | ||
| plugins []plugin | ||
| now func() time.Time | ||
| } | ||
|
|
||
| func NewChecker(ctx context.Context, udsPaths []string, timeout time.Duration) (*Checker, error) { | ||
| c := Checker{ | ||
| plugins: make([]plugin, 0, len(udsPaths)), | ||
| now: time.Now, | ||
| } | ||
|
|
||
| for _, socket := range udsPaths { | ||
| service, err := k8senvelopekmsv2.NewGRPCService( | ||
| ctx, | ||
| "unix://"+socket, | ||
| providerName, | ||
| timeout, | ||
| ) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("dial KMS plugin at %q: %w", socket, err) | ||
| } | ||
|
|
||
| c.plugins = append(c.plugins, plugin{ | ||
| keyID: keyIDFromSocket(socket), | ||
| service: service, | ||
| }) | ||
| } | ||
|
|
||
| return &c, nil | ||
| } | ||
|
|
||
| func keyIDFromSocket(path string) string { | ||
| name := strings.TrimSuffix(filepath.Base(path), filepath.Ext(path)) | ||
| if i := strings.LastIndex(name, "-"); i >= 0 { | ||
| return name[i+1:] | ||
| } | ||
| return name | ||
| } | ||
|
|
||
| // CheckStatus checks the KMS plugin health via UDS. It never reports an error, | ||
| // it encodes the error into a Condition. | ||
| func (c *Checker) CheckStatus(ctx context.Context) []PluginHealthCondition { | ||
| conditions := make([]PluginHealthCondition, 0, len(c.plugins)) | ||
|
|
||
| // Safe to parallelise: each plugin probes an independent socket / has a unique index in slice. | ||
| for _, p := range c.plugins { | ||
| cond := PluginHealthCondition{ | ||
| KeyID: p.keyID, | ||
| LastChecked: c.now(), | ||
| } | ||
|
|
||
| resp, err := p.service.Status(ctx) | ||
| switch { | ||
| case err != nil: | ||
| cond.Status = "error" | ||
| cond.Detail = err.Error() | ||
| case resp.Healthz == "ok": | ||
| cond.Status = "healthy" | ||
| cond.KEKID = resp.KeyID | ||
| default: | ||
| cond.Status = "unhealthy" | ||
| cond.Detail = resp.Healthz | ||
| } | ||
|
|
||
| conditions = append(conditions, cond) | ||
| } | ||
| return conditions | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,68 @@ | ||
| package health | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "reflect" | ||
| "testing" | ||
| "time" | ||
|
|
||
| kmsservice "k8s.io/kms/pkg/service" | ||
| ) | ||
|
|
||
| type fakeService struct { | ||
| resp *kmsservice.StatusResponse | ||
| err error | ||
| } | ||
|
|
||
| func (f *fakeService) Status(context.Context) (*kmsservice.StatusResponse, error) { | ||
| return f.resp, f.err | ||
| } | ||
| func (f *fakeService) Encrypt(context.Context, string, []byte) (*kmsservice.EncryptResponse, error) { | ||
| return nil, nil | ||
| } | ||
| func (f *fakeService) Decrypt(context.Context, string, *kmsservice.DecryptRequest) ([]byte, error) { | ||
| return nil, nil | ||
| } | ||
|
|
||
| func TestChecker_CheckStatus(t *testing.T) { | ||
| fixed := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC) | ||
| c := &Checker{ | ||
| plugins: []plugin{ | ||
| {keyID: "1", service: &fakeService{resp: &kmsservice.StatusResponse{Healthz: "ok", KeyID: "kek-abc"}}}, | ||
| {keyID: "2", service: &fakeService{err: fmt.Errorf("connection refused")}}, | ||
| {keyID: "3", service: &fakeService{resp: &kmsservice.StatusResponse{Healthz: "degraded"}}}, | ||
| }, | ||
| now: func() time.Time { return fixed }, | ||
| } | ||
|
|
||
| got := c.CheckStatus(context.Background()) | ||
| want := []PluginHealthCondition{ | ||
| {KeyID: "1", KEKID: "kek-abc", Status: "healthy", LastChecked: fixed}, | ||
| {KeyID: "2", Status: "error", Detail: "connection refused", LastChecked: fixed}, | ||
| {KeyID: "3", Status: "unhealthy", Detail: "degraded", LastChecked: fixed}, | ||
| } | ||
| if !reflect.DeepEqual(got, want) { | ||
| t.Errorf("CheckStatus():\n got: %+v\n want: %+v", got, want) | ||
| } | ||
| } | ||
|
|
||
| func Test_keyIDFromSocket(t *testing.T) { | ||
| tests := []struct { | ||
| path string | ||
| want string | ||
| }{ | ||
| {"/var/run/kmsplugin/kms-1.sock", "1"}, | ||
| {"/var/run/kmsplugin/kms-2.sock", "2"}, | ||
| {"kms-42.sock", "42"}, | ||
| {"plugin.sock", "plugin"}, | ||
| {"/tmp/my-custom-provider.sock", "provider"}, | ||
| } | ||
| for _, tt := range tests { | ||
| t.Run(tt.path, func(t *testing.T) { | ||
| if got := keyIDFromSocket(tt.path); got != tt.want { | ||
| t.Errorf("keyIDFromSocket(%q) = %q, want %q", tt.path, got, tt.want) | ||
| } | ||
| }) | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,131 @@ | ||||||||||||||||||||||||
| package health | ||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What this package is supposed to do is very similar to https://github.com/openshift/library-go/blob/master/pkg/operator/encryption/kms/preflight/checker.go. I think it is better to mimic from it. The only difference is health monitor will periodically write the status to somewhere.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What are the differences? Both are cobra commands. They have flags, validations and a run method. I create a reader, a writer and put them into a monitor. Writer is an interface, because I want to pivot to operator conditions.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah, it seems there is room for reuse. it would be better to have a single binary with custom logic. |
||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| import ( | ||||||||||||||||||||||||
| "context" | ||||||||||||||||||||||||
| "fmt" | ||||||||||||||||||||||||
| "strings" | ||||||||||||||||||||||||
| "time" | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| "github.com/spf13/cobra" | ||||||||||||||||||||||||
| "github.com/spf13/pflag" | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| "k8s.io/apimachinery/pkg/util/wait" | ||||||||||||||||||||||||
| "k8s.io/client-go/rest" | ||||||||||||||||||||||||
| "k8s.io/client-go/tools/clientcmd" | ||||||||||||||||||||||||
| "k8s.io/klog/v2" | ||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| const providerName = "kms-health-monitor" | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| type commandOptions struct { | ||||||||||||||||||||||||
| kmsSockets []string | ||||||||||||||||||||||||
| readInterval time.Duration | ||||||||||||||||||||||||
| readTimeout time.Duration | ||||||||||||||||||||||||
| writeTimeout time.Duration | ||||||||||||||||||||||||
| targetOperator string | ||||||||||||||||||||||||
| nodeName string | ||||||||||||||||||||||||
| kubeconfig string | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| func NewCommand(ctx context.Context) *cobra.Command { | ||||||||||||||||||||||||
| o := &commandOptions{ | ||||||||||||||||||||||||
| readInterval: 30 * time.Second, | ||||||||||||||||||||||||
| readTimeout: 5 * time.Second, | ||||||||||||||||||||||||
| writeTimeout: 10 * time.Second, | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| cmd := &cobra.Command{ | ||||||||||||||||||||||||
| Use: "kms-health-monitor", | ||||||||||||||||||||||||
| Short: "Observes a co-located KMSv2 plugin and publishes status as an OperatorCondition.", | ||||||||||||||||||||||||
| RunE: func(cmd *cobra.Command, args []string) error { | ||||||||||||||||||||||||
| if err := o.validate(); err != nil { | ||||||||||||||||||||||||
| return err | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| return o.run(ctx) | ||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| o.addFlags(cmd.Flags()) | ||||||||||||||||||||||||
| return cmd | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| func (o *commandOptions) addFlags(fs *pflag.FlagSet) { | ||||||||||||||||||||||||
| fs.StringSliceVar(&o.kmsSockets, "kms-sockets", nil, "filesystem paths to the KMSv2 plugin UDS") | ||||||||||||||||||||||||
| fs.DurationVar(&o.readInterval, "read-interval", o.readInterval, "cadence between checks") | ||||||||||||||||||||||||
| fs.DurationVar(&o.readTimeout, "read-timeout", o.readTimeout, "deadline for each Status RPC") | ||||||||||||||||||||||||
| fs.DurationVar(&o.writeTimeout, "write-timeout", o.writeTimeout, "deadline for each condition update; should fit inside --read-interval") | ||||||||||||||||||||||||
| fs.StringVar(&o.targetOperator, "target-operator", "", "target operator CRD: "+strings.Join(supportedOperatorKeys(), ", ")) | ||||||||||||||||||||||||
| fs.StringVar(&o.nodeName, "node-name", "", "node name recorded in the condition used to help to identify the origin") | ||||||||||||||||||||||||
| fs.StringVar(&o.kubeconfig, "kubeconfig", "", "path to a kubeconfig; empty uses in-cluster config") | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| func (o *commandOptions) validate() error { | ||||||||||||||||||||||||
| if len(o.kmsSockets) == 0 { | ||||||||||||||||||||||||
| return fmt.Errorf("--kms-sockets is required, at least one") | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
coderabbitai[bot] marked this conversation as resolved.
|
||||||||||||||||||||||||
| for _, s := range o.kmsSockets { | ||||||||||||||||||||||||
| if strings.TrimSpace(s) == "" { | ||||||||||||||||||||||||
| return fmt.Errorf("--kms-sockets cannot contain empty entries") | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| if o.readInterval <= 0 { | ||||||||||||||||||||||||
| return fmt.Errorf("--read-interval must be positive") | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| if o.readTimeout <= 0 { | ||||||||||||||||||||||||
| return fmt.Errorf("--read-timeout must be positive") | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| if o.writeTimeout <= 0 { | ||||||||||||||||||||||||
| return fmt.Errorf("--write-timeout must be positive") | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
Comment on lines
+75
to
+77
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Enforce the unhealthy cadence budget.
♻️ Suggested fix if o.writeTimeout <= 0 {
return fmt.Errorf("--write-timeout must be positive")
}
+ if o.probeTimeout+o.writeTimeout >= o.probeIntervalUnhealthy {
+ return fmt.Errorf(
+ "--probe-timeout + --write-timeout must be smaller than --probe-interval-unhealthy",
+ )
+ }📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||
| if o.nodeName == "" { | ||||||||||||||||||||||||
| return fmt.Errorf("--node-name is required") | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| if _, ok := supportedOperators[TargetOperator(o.targetOperator)]; !ok { | ||||||||||||||||||||||||
| return fmt.Errorf("--target-operator must be one of %s (got %q)", | ||||||||||||||||||||||||
| strings.Join(supportedOperatorKeys(), ", "), o.targetOperator) | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| func (o *commandOptions) run(ctx context.Context) error { | ||||||||||||||||||||||||
| klog.InfoS("kms-health-monitor starting", | ||||||||||||||||||||||||
| "socket", o.kmsSockets, | ||||||||||||||||||||||||
| "targetOperator", o.targetOperator, | ||||||||||||||||||||||||
| "observerNode", o.nodeName, | ||||||||||||||||||||||||
| "interval", o.readInterval, | ||||||||||||||||||||||||
| "readTimeout", o.readTimeout, | ||||||||||||||||||||||||
| "writeTimeout", o.writeTimeout, | ||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| cfg, err := buildRESTConfig(o.kubeconfig) | ||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||
| return fmt.Errorf("build rest config: %w", err) | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| writer, err := buildWriter(cfg, TargetOperator(o.targetOperator), o.nodeName) | ||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||
| return fmt.Errorf("create new writer: %w", err) | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| checker, err := NewChecker(ctx, o.kmsSockets, o.readTimeout) | ||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||
| return fmt.Errorf("create new checker: %w", err) | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| wait.JitterUntilWithContext(ctx, func(ctx context.Context) { | ||||||||||||||||||||||||
| probeCtx, cancel := context.WithTimeout(ctx, o.readTimeout) | ||||||||||||||||||||||||
| defer cancel() | ||||||||||||||||||||||||
| conditions := checker.CheckStatus(probeCtx) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| writeCtx, writeCancel := context.WithTimeout(ctx, o.writeTimeout) | ||||||||||||||||||||||||
| defer writeCancel() | ||||||||||||||||||||||||
| if err := writer.Apply(writeCtx, conditions); err != nil { | ||||||||||||||||||||||||
| klog.ErrorS(err, "apply operator status") | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| }, o.readInterval, 0.1, false) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| func buildRESTConfig(kubeconfig string) (*rest.Config, error) { | ||||||||||||||||||||||||
| if kubeconfig != "" { | ||||||||||||||||||||||||
| return clientcmd.BuildConfigFromFlags("", kubeconfig) | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| return rest.InClusterConfig() | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| package health | ||
|
|
||
| import ( | ||
| "sort" | ||
| "time" | ||
|
|
||
| "k8s.io/apimachinery/pkg/runtime/schema" | ||
| ) | ||
|
|
||
| type TargetOperator string | ||
|
|
||
| const ( | ||
| kubeAPIServer TargetOperator = "kube-apiserver" | ||
| openshiftAPIServer TargetOperator = "openshift-apiserver" | ||
| authAPIServer TargetOperator = "auth-apiserver" | ||
| ) | ||
|
|
||
| var supportedOperators = map[TargetOperator]struct { | ||
| GVR schema.GroupVersionResource | ||
| GVK schema.GroupVersionKind | ||
| }{ | ||
| kubeAPIServer: { | ||
| GVR: schema.GroupVersionResource{Group: "operator.openshift.io", Version: "v1", Resource: "kubeapiservers"}, | ||
| GVK: schema.GroupVersionKind{Group: "operator.openshift.io", Version: "v1", Kind: "KubeAPIServer"}, | ||
| }, | ||
| authAPIServer: { | ||
| GVR: schema.GroupVersionResource{Group: "operator.openshift.io", Version: "v1", Resource: "authentications"}, | ||
| GVK: schema.GroupVersionKind{Group: "operator.openshift.io", Version: "v1", Kind: "Authentication"}, | ||
| }, | ||
| openshiftAPIServer: { | ||
| GVR: schema.GroupVersionResource{Group: "operator.openshift.io", Version: "v1", Resource: "openshiftapiservers"}, | ||
| GVK: schema.GroupVersionKind{Group: "operator.openshift.io", Version: "v1", Kind: "OpenShiftAPIServer"}, | ||
| }, | ||
| } | ||
|
|
||
| type PluginHealthCondition struct { | ||
| // KeyID is the sequential key identifier assigned by the key controller. | ||
| KeyID string `json:"keyID"` | ||
| // KEKID is the ID of the key used by the KMS provider for encryption/decryption. | ||
| KEKID string `json:"kekID,omitempty"` | ||
| Status string `json:"status"` | ||
| LastChecked time.Time `json:"lastChecked"` | ||
| Detail string `json:"detail,omitempty"` | ||
| } | ||
|
|
||
| func supportedOperatorKeys() []string { | ||
| keys := make([]string, 0, len(supportedOperators)) | ||
| for k := range supportedOperators { | ||
| keys = append(keys, string(k)) | ||
| } | ||
| sort.Strings(keys) | ||
| return keys | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've implemented an alternative #2271 to suggest the changes. Please let me know your thoughts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh wow, now I am a reviewer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is just to give you an idea. You shouldn't review that one, but there are some bits to show you how we can reuse what we have in library-go.