From 9b99708ab2c23e1ce449703c310abadb0f250bf5 Mon Sep 17 00:00:00 2001 From: Ankur Shrivastava Date: Thu, 30 Apr 2026 14:37:08 +0800 Subject: [PATCH 1/4] feat: expose worker run options and wire Prometheus metrics by default core.Run() invoked workers.Run with no RunOptions, so the workers package silently fell back to BaseMetrics{} (no-op) and every worker_* Prometheus metric stayed at zero for any service using CBWorkerProvider. Add AddWorkerRunOptions for init-time configuration and prepend a default workers.WithMetrics(workers.NewPrometheusMetrics(AppName)) when APP_NAME is set and Prometheus is not disabled. User-supplied WithMetrics still wins because workers.WithMetrics overwrites runConfig.metrics on each apply. --- README.md | 14 +++- core.go | 3 +- workers.go | 34 ++++++++++ workers_test.go | 175 ++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 224 insertions(+), 2 deletions(-) create mode 100644 workers.go create mode 100644 workers_test.go diff --git a/README.md b/README.md index 20d6df3..76ed094 100755 --- a/README.md +++ b/README.md @@ -85,6 +85,7 @@ For full documentation, visit https://docs.coldbrew.cloud ## Index - [Constants](<#constants>) +- [func AddWorkerRunOptions\(opts ...workers.RunOption\)](<#AddWorkerRunOptions>) - [func InitializeVTProto\(\)](<#InitializeVTProto>) - [func OTELMeterProvider\(\) otelmetric.MeterProvider](<#OTELMeterProvider>) - [func SetOTELGRPCClientOptions\(opts ...otelgrpc.Option\)](<#SetOTELGRPCClientOptions>) @@ -121,6 +122,17 @@ For full documentation, visit https://docs.coldbrew.cloud const SupportPackageIsVersion1 = true ``` + +## func [AddWorkerRunOptions]() + +```go +func AddWorkerRunOptions(opts ...workers.RunOption) +``` + +AddWorkerRunOptions appends \[workers.RunOption\] values applied when core.Run\(\) invokes \[workers.Run\]. Use this to configure framework\-wide worker behaviour: metrics, run\-level interceptors, default jitter, etc. Must be called during init, before Run\(\). Not concurrency\-safe. + +By default, core wires a Prometheus metrics implementation using the service's APP\_NAME unless DISABLE\_PROMETHEUS=true or APP\_NAME is empty. Pass \[workers.WithMetrics\] here to override that default; a later WithMetrics wins because workers.WithMetrics overwrites runConfig.metrics on each apply. + ## func [InitializeVTProto]() @@ -319,7 +331,7 @@ type CB interface { ``` -### func [New]() +### func [New]() ```go func New(c config.Config) CB diff --git a/core.go b/core.go index 5eab7b1..a6f5610 100644 --- a/core.go +++ b/core.go @@ -846,9 +846,10 @@ func (c *cb) Run() error { if len(allWorkers) > 0 { workerCtx, workerCancel := context.WithCancel(gctx) c.workerCancel = workerCancel + runOpts := c.buildWorkerRunOpts() g.Go(func() error { // workers.Run returns nil on context cancellation (clean shutdown). - return workers.Run(workerCtx, allWorkers) + return workers.Run(workerCtx, allWorkers, runOpts...) }) } diff --git a/workers.go b/workers.go new file mode 100644 index 0000000..a9bbb20 --- /dev/null +++ b/workers.go @@ -0,0 +1,34 @@ +package core + +import "github.com/go-coldbrew/workers" + +// Run-level options applied when core.Run() invokes workers.Run. Mutated +// during init via AddWorkerRunOptions; not concurrency-safe by design (same +// contract as the OTEL setters in core.go). +var workerRunOpts []workers.RunOption + +// AddWorkerRunOptions appends [workers.RunOption] values applied when +// core.Run() invokes [workers.Run]. Use this to configure framework-wide +// worker behaviour: metrics, run-level interceptors, default jitter, etc. +// Must be called during init, before Run(). Not concurrency-safe. +// +// By default, core wires a Prometheus metrics implementation using the +// service's APP_NAME unless DISABLE_PROMETHEUS=true or APP_NAME is empty. +// Pass [workers.WithMetrics] here to override that default; a later +// WithMetrics wins because workers.WithMetrics overwrites runConfig.metrics +// on each apply. +func AddWorkerRunOptions(opts ...workers.RunOption) { + workerRunOpts = append(workerRunOpts, opts...) +} + +// buildWorkerRunOpts assembles the option slice passed to workers.Run. The +// default Prometheus metrics (when enabled) is prepended so that any +// user-supplied workers.WithMetrics overrides it. +func (c *cb) buildWorkerRunOpts() []workers.RunOption { + opts := make([]workers.RunOption, 0, len(workerRunOpts)+1) + if !c.config.DisablePrometheus && !c.config.DisablePormetheus && c.config.AppName != "" { //nolint:staticcheck // intentional use of deprecated field for backward compatibility + opts = append(opts, workers.WithMetrics(workers.NewPrometheusMetrics(c.config.AppName))) + } + opts = append(opts, workerRunOpts...) + return opts +} diff --git a/workers_test.go b/workers_test.go new file mode 100644 index 0000000..b4b69df --- /dev/null +++ b/workers_test.go @@ -0,0 +1,175 @@ +package core + +import ( + "context" + "errors" + "net/http" + "sync/atomic" + "testing" + "time" + + "github.com/go-coldbrew/core/config" + "github.com/go-coldbrew/workers" + "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" + "google.golang.org/grpc" +) + +// resetWorkerRunOpts clears the package global between tests. +func resetWorkerRunOpts(t *testing.T) { + t.Helper() + prev := workerRunOpts + workerRunOpts = nil + t.Cleanup(func() { workerRunOpts = prev }) +} + +// recordingMetrics is a forward-compatible workers.Metrics that counts +// WorkerStarted invocations. +type recordingMetrics struct { + workers.BaseMetrics + started atomic.Int32 +} + +func (m *recordingMetrics) WorkerStarted(string) { m.started.Add(1) } + +func TestBuildWorkerRunOpts_DefaultPrometheus_AppNameSet(t *testing.T) { + resetWorkerRunOpts(t) + c := &cb{config: config.Config{AppName: "test_buildopts_default"}} + + opts := c.buildWorkerRunOpts() + if len(opts) != 1 { + t.Fatalf("expected one default option, got %d", len(opts)) + } +} + +func TestBuildWorkerRunOpts_NoDefault_WhenDisablePrometheus(t *testing.T) { + resetWorkerRunOpts(t) + c := &cb{config: config.Config{ + AppName: "test_buildopts_disabled", + DisablePrometheus: true, + }} + + opts := c.buildWorkerRunOpts() + if len(opts) != 0 { + t.Fatalf("expected no options when DisablePrometheus=true, got %d", len(opts)) + } +} + +func TestBuildWorkerRunOpts_NoDefault_WhenDeprecatedDisablePormetheus(t *testing.T) { + resetWorkerRunOpts(t) + c := &cb{config: config.Config{ + AppName: "test_buildopts_deprecated", + DisablePormetheus: true, //nolint:staticcheck // testing deprecated field + }} + + opts := c.buildWorkerRunOpts() + if len(opts) != 0 { + t.Fatalf("expected no options when DisablePormetheus=true, got %d", len(opts)) + } +} + +func TestBuildWorkerRunOpts_NoDefault_WhenEmptyAppName(t *testing.T) { + resetWorkerRunOpts(t) + c := &cb{config: config.Config{AppName: ""}} + + opts := c.buildWorkerRunOpts() + if len(opts) != 0 { + t.Fatalf("expected no default option when AppName is empty, got %d", len(opts)) + } +} + +func TestAddWorkerRunOptions_AppendsAndCombinesWithDefault(t *testing.T) { + resetWorkerRunOpts(t) + AddWorkerRunOptions(workers.WithDefaultJitter(0)) + AddWorkerRunOptions(workers.AddInterceptors(noopMiddleware)) + + c := &cb{config: config.Config{AppName: "test_buildopts_combined"}} + opts := c.buildWorkerRunOpts() + // 1 default Prometheus + 2 user options + if len(opts) != 3 { + t.Fatalf("expected 3 options (1 default + 2 user), got %d", len(opts)) + } +} + +func TestAddWorkerRunOptions_NoDefault_WithoutAppName(t *testing.T) { + resetWorkerRunOpts(t) + AddWorkerRunOptions(workers.WithDefaultJitter(5)) + + c := &cb{config: config.Config{}} + opts := c.buildWorkerRunOpts() + if len(opts) != 1 { + t.Fatalf("expected 1 user option (no default), got %d", len(opts)) + } +} + +func noopMiddleware(ctx context.Context, info *workers.WorkerInfo, next workers.CycleFunc) error { + return next(ctx, info) +} + +// TestRun_WorkerMetricsWired runs the full core.Run lifecycle with a worker +// and a recording Metrics implementation injected via AddWorkerRunOptions, +// and asserts that WorkerStarted fires — proving the option reaches workers.Run. +func TestRun_WorkerMetricsWired(t *testing.T) { + if testing.Short() { + t.Skip("skipping end-to-end Run lifecycle in short mode") + } + resetWorkerRunOpts(t) + + rec := &recordingMetrics{} + AddWorkerRunOptions(workers.WithMetrics(rec)) + + svc := &workerLifecycleService{} + instance := New(config.Config{ + GRPCPort: 0, + HTTPPort: 0, + ListenHost: "127.0.0.1", + DisableSignalHandler: true, + DisableNewRelic: true, + DisableAutoMaxProcs: true, + DisablePrometheus: true, // suppress the default; recording metrics wins anyway + }) + instance.SetService(svc) + + errCh := make(chan error, 1) + go func() { errCh <- instance.Run() }() + + deadline := time.Now().Add(5 * time.Second) + for rec.started.Load() == 0 && time.Now().Before(deadline) { + time.Sleep(10 * time.Millisecond) + } + if rec.started.Load() == 0 { + t.Fatal("recordingMetrics.WorkerStarted was never invoked") + } + + if err := instance.Stop(2 * time.Second); err != nil { + t.Fatalf("Stop failed: %v", err) + } + + err := <-errCh + if err != nil && !errors.Is(err, http.ErrServerClosed) && !errors.Is(err, grpc.ErrServerStopped) { + t.Fatalf("unexpected Run error: %v", err) + } +} + +// workerLifecycleService is a CBService + CBWorkerProvider used by the +// end-to-end test above. The worker blocks on ctx so it stays alive long +// enough for WorkerStarted to fire. +type workerLifecycleService struct{} + +func (s *workerLifecycleService) InitHTTP(_ context.Context, _ *runtime.ServeMux, _ string, _ []grpc.DialOption) error { + return nil +} + +func (s *workerLifecycleService) InitGRPC(_ context.Context, _ *grpc.Server) error { return nil } + +func (s *workerLifecycleService) Workers() []*workers.Worker { + return []*workers.Worker{ + workers.NewWorker("test-lifecycle-worker").HandlerFunc( + func(ctx context.Context, _ *workers.WorkerInfo) error { + <-ctx.Done() + return ctx.Err() + }, + ), + } +} + +var _ CBWorkerProvider = (*workerLifecycleService)(nil) From 231e75a5065ad8c358d122d876211444b6e27926 Mon Sep 17 00:00:00 2001 From: Ankur Shrivastava Date: Thu, 30 Apr 2026 15:15:12 +0800 Subject: [PATCH 2/4] fix: address review feedback on metrics wiring - harden TestRun_WorkerMetricsWired: bound the startup wait on errCh, bound the post-Stop receive, and guarantee Stop+drain via t.Cleanup so a failing assertion never leaks the Run goroutine - workers.go: spell "behavior" in American English to match the rest of the package; regenerate README.md --- README.md | 2 +- workers.go | 2 +- workers_test.go | 44 +++++++++++++++++++++++++++++++++++--------- 3 files changed, 37 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 76ed094..e498143 100755 --- a/README.md +++ b/README.md @@ -129,7 +129,7 @@ const SupportPackageIsVersion1 = true func AddWorkerRunOptions(opts ...workers.RunOption) ``` -AddWorkerRunOptions appends \[workers.RunOption\] values applied when core.Run\(\) invokes \[workers.Run\]. Use this to configure framework\-wide worker behaviour: metrics, run\-level interceptors, default jitter, etc. Must be called during init, before Run\(\). Not concurrency\-safe. +AddWorkerRunOptions appends \[workers.RunOption\] values applied when core.Run\(\) invokes \[workers.Run\]. Use this to configure framework\-wide worker behavior: metrics, run\-level interceptors, default jitter, etc. Must be called during init, before Run\(\). Not concurrency\-safe. By default, core wires a Prometheus metrics implementation using the service's APP\_NAME unless DISABLE\_PROMETHEUS=true or APP\_NAME is empty. Pass \[workers.WithMetrics\] here to override that default; a later WithMetrics wins because workers.WithMetrics overwrites runConfig.metrics on each apply. diff --git a/workers.go b/workers.go index a9bbb20..55882d8 100644 --- a/workers.go +++ b/workers.go @@ -9,7 +9,7 @@ var workerRunOpts []workers.RunOption // AddWorkerRunOptions appends [workers.RunOption] values applied when // core.Run() invokes [workers.Run]. Use this to configure framework-wide -// worker behaviour: metrics, run-level interceptors, default jitter, etc. +// worker behavior: metrics, run-level interceptors, default jitter, etc. // Must be called during init, before Run(). Not concurrency-safe. // // By default, core wires a Prometheus metrics implementation using the diff --git a/workers_test.go b/workers_test.go index b4b69df..80804bd 100644 --- a/workers_test.go +++ b/workers_test.go @@ -132,21 +132,47 @@ func TestRun_WorkerMetricsWired(t *testing.T) { errCh := make(chan error, 1) go func() { errCh <- instance.Run() }() - deadline := time.Now().Add(5 * time.Second) - for rec.started.Load() == 0 && time.Now().Before(deadline) { - time.Sleep(10 * time.Millisecond) - } - if rec.started.Load() == 0 { - t.Fatal("recordingMetrics.WorkerStarted was never invoked") + // Always stop the instance and drain Run() before the test exits, so a + // failing assertion below doesn't leak the Run goroutine. + var stopped atomic.Bool + t.Cleanup(func() { + if !stopped.CompareAndSwap(false, true) { + return + } + _ = instance.Stop(2 * time.Second) + select { + case <-errCh: + case <-time.After(2 * time.Second): + } + }) + + startDeadline := time.After(5 * time.Second) + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + for rec.started.Load() == 0 { + select { + case err := <-errCh: + t.Fatalf("Run exited before WorkerStarted fired: %v", err) + case <-startDeadline: + t.Fatal("recordingMetrics.WorkerStarted was never invoked") + case <-ticker.C: + } } + if !stopped.CompareAndSwap(false, true) { + return + } if err := instance.Stop(2 * time.Second); err != nil { t.Fatalf("Stop failed: %v", err) } - err := <-errCh - if err != nil && !errors.Is(err, http.ErrServerClosed) && !errors.Is(err, grpc.ErrServerStopped) { - t.Fatalf("unexpected Run error: %v", err) + select { + case err := <-errCh: + if err != nil && !errors.Is(err, http.ErrServerClosed) && !errors.Is(err, grpc.ErrServerStopped) { + t.Fatalf("unexpected Run error: %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("Run did not exit after Stop") } } From 2aa9b0068f73efbe05863d827c387d5b54a4f864 Mon Sep 17 00:00:00 2001 From: Ankur Shrivastava Date: Thu, 30 Apr 2026 16:41:34 +0800 Subject: [PATCH 3/4] test: cover the user-metrics-overrides-default-prometheus contract Adds TestRun_UserMetricsOverridesDefaultPrometheus and extracts the shared lifecycle scaffolding into runWithRecorder. The new test runs the full Run lifecycle with a unique AppName (so the default NewPrometheusMetrics is prepended) and a caller-supplied workers.WithMetrics(rec); asserting rec.WorkerStarted fires proves the caller-supplied metrics is the effective implementation. --- workers_test.go | 68 +++++++++++++++++++++++++++++++++---------------- 1 file changed, 46 insertions(+), 22 deletions(-) diff --git a/workers_test.go b/workers_test.go index 80804bd..356bfc5 100644 --- a/workers_test.go +++ b/workers_test.go @@ -3,6 +3,7 @@ package core import ( "context" "errors" + "fmt" "net/http" "sync/atomic" "testing" @@ -105,35 +106,29 @@ func noopMiddleware(ctx context.Context, info *workers.WorkerInfo, next workers. return next(ctx, info) } -// TestRun_WorkerMetricsWired runs the full core.Run lifecycle with a worker -// and a recording Metrics implementation injected via AddWorkerRunOptions, -// and asserts that WorkerStarted fires — proving the option reaches workers.Run. -func TestRun_WorkerMetricsWired(t *testing.T) { - if testing.Short() { - t.Skip("skipping end-to-end Run lifecycle in short mode") - } - resetWorkerRunOpts(t) - - rec := &recordingMetrics{} +// runWithRecorder boots a core.cb with cfg, registers a CBWorkerProvider, +// injects rec via AddWorkerRunOptions, runs, waits for the recorder's +// WorkerStarted callback, then stops and drains. Used by the end-to-end +// tests below to verify that the option slice reaches workers.Run. +func runWithRecorder(t *testing.T, cfg config.Config, rec *recordingMetrics) { + t.Helper() AddWorkerRunOptions(workers.WithMetrics(rec)) - svc := &workerLifecycleService{} - instance := New(config.Config{ - GRPCPort: 0, - HTTPPort: 0, - ListenHost: "127.0.0.1", - DisableSignalHandler: true, - DisableNewRelic: true, - DisableAutoMaxProcs: true, - DisablePrometheus: true, // suppress the default; recording metrics wins anyway - }) - instance.SetService(svc) + cfg.GRPCPort = 0 + cfg.HTTPPort = 0 + cfg.ListenHost = "127.0.0.1" + cfg.DisableSignalHandler = true + cfg.DisableNewRelic = true + cfg.DisableAutoMaxProcs = true + + instance := New(cfg) + instance.SetService(&workerLifecycleService{}) errCh := make(chan error, 1) go func() { errCh <- instance.Run() }() // Always stop the instance and drain Run() before the test exits, so a - // failing assertion below doesn't leak the Run goroutine. + // failing assertion below never leaks the Run goroutine. var stopped atomic.Bool t.Cleanup(func() { if !stopped.CompareAndSwap(false, true) { @@ -176,6 +171,35 @@ func TestRun_WorkerMetricsWired(t *testing.T) { } } +// TestRun_WorkerMetricsWired runs the full core.Run lifecycle with a worker +// and a recording Metrics implementation injected via AddWorkerRunOptions, +// and asserts that WorkerStarted fires — proving the option reaches workers.Run. +func TestRun_WorkerMetricsWired(t *testing.T) { + if testing.Short() { + t.Skip("skipping end-to-end Run lifecycle in short mode") + } + resetWorkerRunOpts(t) + rec := &recordingMetrics{} + runWithRecorder(t, config.Config{DisablePrometheus: true}, rec) +} + +// TestRun_UserMetricsOverridesDefaultPrometheus exercises the override +// contract: when AppName is set (so the default Prometheus metrics is +// prepended) and the caller also adds workers.WithMetrics via +// AddWorkerRunOptions, the caller's recorder must be the effective metrics +// implementation. Uses a unique app name per run to avoid colliding with +// the process-global namespace cache in workers.NewPrometheusMetrics. +func TestRun_UserMetricsOverridesDefaultPrometheus(t *testing.T) { + if testing.Short() { + t.Skip("skipping end-to-end Run lifecycle in short mode") + } + resetWorkerRunOpts(t) + rec := &recordingMetrics{} + runWithRecorder(t, config.Config{ + AppName: fmt.Sprintf("test_override_%d", time.Now().UnixNano()), + }, rec) +} + // workerLifecycleService is a CBService + CBWorkerProvider used by the // end-to-end test above. The worker blocks on ctx so it stays alive long // enough for WorkerStarted to fire. From c506e33984ee741335ed0c97490eb857f9baf87e Mon Sep 17 00:00:00 2001 From: Ankur Shrivastava Date: Thu, 30 Apr 2026 17:35:11 +0800 Subject: [PATCH 4/4] test: fail loudly if Run does not exit during cleanup The cleanup's drain previously fell through silently on the 2s timeout, which could leak the Run goroutine and surface as a less actionable goleak error later. Report the timeout via t.Errorf so the failure points directly at the cleanup path. --- workers_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/workers_test.go b/workers_test.go index 356bfc5..5fc1cd0 100644 --- a/workers_test.go +++ b/workers_test.go @@ -128,7 +128,9 @@ func runWithRecorder(t *testing.T, cfg config.Config, rec *recordingMetrics) { go func() { errCh <- instance.Run() }() // Always stop the instance and drain Run() before the test exits, so a - // failing assertion below never leaks the Run goroutine. + // failing assertion below never leaks the Run goroutine. If Run() doesn't + // exit within the timeout, fail the test explicitly rather than letting + // goleak surface a less actionable error later. var stopped atomic.Bool t.Cleanup(func() { if !stopped.CompareAndSwap(false, true) { @@ -138,6 +140,7 @@ func runWithRecorder(t *testing.T, cfg config.Config, rec *recordingMetrics) { select { case <-errCh: case <-time.After(2 * time.Second): + t.Errorf("instance.Run() did not exit within 2s after Stop during cleanup") } })