diff --git a/README.md b/README.md index 20d6df3..e498143 100755 --- a/README.md +++ b/README.md @@ -85,6 +85,7 @@ For full documentation, visit https://docs.coldbrew.cloud ## Index - [Constants](<#constants>) +- [func AddWorkerRunOptions\(opts ...workers.RunOption\)](<#AddWorkerRunOptions>) - [func InitializeVTProto\(\)](<#InitializeVTProto>) - [func OTELMeterProvider\(\) otelmetric.MeterProvider](<#OTELMeterProvider>) - [func SetOTELGRPCClientOptions\(opts ...otelgrpc.Option\)](<#SetOTELGRPCClientOptions>) @@ -121,6 +122,17 @@ For full documentation, visit https://docs.coldbrew.cloud const SupportPackageIsVersion1 = true ``` + +## func [AddWorkerRunOptions]() + +```go +func AddWorkerRunOptions(opts ...workers.RunOption) +``` + +AddWorkerRunOptions appends \[workers.RunOption\] values applied when core.Run\(\) invokes \[workers.Run\]. Use this to configure framework\-wide worker behavior: metrics, run\-level interceptors, default jitter, etc. Must be called during init, before Run\(\). Not concurrency\-safe. + +By default, core wires a Prometheus metrics implementation using the service's APP\_NAME unless DISABLE\_PROMETHEUS=true or APP\_NAME is empty. Pass \[workers.WithMetrics\] here to override that default; a later WithMetrics wins because workers.WithMetrics overwrites runConfig.metrics on each apply. + ## func [InitializeVTProto]() @@ -319,7 +331,7 @@ type CB interface { ``` -### func [New]() +### func [New]() ```go func New(c config.Config) CB diff --git a/core.go b/core.go index 5eab7b1..a6f5610 100644 --- a/core.go +++ b/core.go @@ -846,9 +846,10 @@ func (c *cb) Run() error { if len(allWorkers) > 0 { workerCtx, workerCancel := context.WithCancel(gctx) c.workerCancel = workerCancel + runOpts := c.buildWorkerRunOpts() g.Go(func() error { // workers.Run returns nil on context cancellation (clean shutdown). - return workers.Run(workerCtx, allWorkers) + return workers.Run(workerCtx, allWorkers, runOpts...) }) } diff --git a/workers.go b/workers.go new file mode 100644 index 0000000..55882d8 --- /dev/null +++ b/workers.go @@ -0,0 +1,34 @@ +package core + +import "github.com/go-coldbrew/workers" + +// Run-level options applied when core.Run() invokes workers.Run. Mutated +// during init via AddWorkerRunOptions; not concurrency-safe by design (same +// contract as the OTEL setters in core.go). +var workerRunOpts []workers.RunOption + +// AddWorkerRunOptions appends [workers.RunOption] values applied when +// core.Run() invokes [workers.Run]. Use this to configure framework-wide +// worker behavior: metrics, run-level interceptors, default jitter, etc. +// Must be called during init, before Run(). Not concurrency-safe. +// +// By default, core wires a Prometheus metrics implementation using the +// service's APP_NAME unless DISABLE_PROMETHEUS=true or APP_NAME is empty. +// Pass [workers.WithMetrics] here to override that default; a later +// WithMetrics wins because workers.WithMetrics overwrites runConfig.metrics +// on each apply. +func AddWorkerRunOptions(opts ...workers.RunOption) { + workerRunOpts = append(workerRunOpts, opts...) +} + +// buildWorkerRunOpts assembles the option slice passed to workers.Run. The +// default Prometheus metrics (when enabled) is prepended so that any +// user-supplied workers.WithMetrics overrides it. +func (c *cb) buildWorkerRunOpts() []workers.RunOption { + opts := make([]workers.RunOption, 0, len(workerRunOpts)+1) + if !c.config.DisablePrometheus && !c.config.DisablePormetheus && c.config.AppName != "" { //nolint:staticcheck // intentional use of deprecated field for backward compatibility + opts = append(opts, workers.WithMetrics(workers.NewPrometheusMetrics(c.config.AppName))) + } + opts = append(opts, workerRunOpts...) + return opts +} diff --git a/workers_test.go b/workers_test.go new file mode 100644 index 0000000..5fc1cd0 --- /dev/null +++ b/workers_test.go @@ -0,0 +1,228 @@ +package core + +import ( + "context" + "errors" + "fmt" + "net/http" + "sync/atomic" + "testing" + "time" + + "github.com/go-coldbrew/core/config" + "github.com/go-coldbrew/workers" + "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" + "google.golang.org/grpc" +) + +// resetWorkerRunOpts clears the package global between tests. +func resetWorkerRunOpts(t *testing.T) { + t.Helper() + prev := workerRunOpts + workerRunOpts = nil + t.Cleanup(func() { workerRunOpts = prev }) +} + +// recordingMetrics is a forward-compatible workers.Metrics that counts +// WorkerStarted invocations. +type recordingMetrics struct { + workers.BaseMetrics + started atomic.Int32 +} + +func (m *recordingMetrics) WorkerStarted(string) { m.started.Add(1) } + +func TestBuildWorkerRunOpts_DefaultPrometheus_AppNameSet(t *testing.T) { + resetWorkerRunOpts(t) + c := &cb{config: config.Config{AppName: "test_buildopts_default"}} + + opts := c.buildWorkerRunOpts() + if len(opts) != 1 { + t.Fatalf("expected one default option, got %d", len(opts)) + } +} + +func TestBuildWorkerRunOpts_NoDefault_WhenDisablePrometheus(t *testing.T) { + resetWorkerRunOpts(t) + c := &cb{config: config.Config{ + AppName: "test_buildopts_disabled", + DisablePrometheus: true, + }} + + opts := c.buildWorkerRunOpts() + if len(opts) != 0 { + t.Fatalf("expected no options when DisablePrometheus=true, got %d", len(opts)) + } +} + +func TestBuildWorkerRunOpts_NoDefault_WhenDeprecatedDisablePormetheus(t *testing.T) { + resetWorkerRunOpts(t) + c := &cb{config: config.Config{ + AppName: "test_buildopts_deprecated", + DisablePormetheus: true, //nolint:staticcheck // testing deprecated field + }} + + opts := c.buildWorkerRunOpts() + if len(opts) != 0 { + t.Fatalf("expected no options when DisablePormetheus=true, got %d", len(opts)) + } +} + +func TestBuildWorkerRunOpts_NoDefault_WhenEmptyAppName(t *testing.T) { + resetWorkerRunOpts(t) + c := &cb{config: config.Config{AppName: ""}} + + opts := c.buildWorkerRunOpts() + if len(opts) != 0 { + t.Fatalf("expected no default option when AppName is empty, got %d", len(opts)) + } +} + +func TestAddWorkerRunOptions_AppendsAndCombinesWithDefault(t *testing.T) { + resetWorkerRunOpts(t) + AddWorkerRunOptions(workers.WithDefaultJitter(0)) + AddWorkerRunOptions(workers.AddInterceptors(noopMiddleware)) + + c := &cb{config: config.Config{AppName: "test_buildopts_combined"}} + opts := c.buildWorkerRunOpts() + // 1 default Prometheus + 2 user options + if len(opts) != 3 { + t.Fatalf("expected 3 options (1 default + 2 user), got %d", len(opts)) + } +} + +func TestAddWorkerRunOptions_NoDefault_WithoutAppName(t *testing.T) { + resetWorkerRunOpts(t) + AddWorkerRunOptions(workers.WithDefaultJitter(5)) + + c := &cb{config: config.Config{}} + opts := c.buildWorkerRunOpts() + if len(opts) != 1 { + t.Fatalf("expected 1 user option (no default), got %d", len(opts)) + } +} + +func noopMiddleware(ctx context.Context, info *workers.WorkerInfo, next workers.CycleFunc) error { + return next(ctx, info) +} + +// runWithRecorder boots a core.cb with cfg, registers a CBWorkerProvider, +// injects rec via AddWorkerRunOptions, runs, waits for the recorder's +// WorkerStarted callback, then stops and drains. Used by the end-to-end +// tests below to verify that the option slice reaches workers.Run. +func runWithRecorder(t *testing.T, cfg config.Config, rec *recordingMetrics) { + t.Helper() + AddWorkerRunOptions(workers.WithMetrics(rec)) + + cfg.GRPCPort = 0 + cfg.HTTPPort = 0 + cfg.ListenHost = "127.0.0.1" + cfg.DisableSignalHandler = true + cfg.DisableNewRelic = true + cfg.DisableAutoMaxProcs = true + + instance := New(cfg) + instance.SetService(&workerLifecycleService{}) + + errCh := make(chan error, 1) + go func() { errCh <- instance.Run() }() + + // Always stop the instance and drain Run() before the test exits, so a + // failing assertion below never leaks the Run goroutine. If Run() doesn't + // exit within the timeout, fail the test explicitly rather than letting + // goleak surface a less actionable error later. + var stopped atomic.Bool + t.Cleanup(func() { + if !stopped.CompareAndSwap(false, true) { + return + } + _ = instance.Stop(2 * time.Second) + select { + case <-errCh: + case <-time.After(2 * time.Second): + t.Errorf("instance.Run() did not exit within 2s after Stop during cleanup") + } + }) + + startDeadline := time.After(5 * time.Second) + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + for rec.started.Load() == 0 { + select { + case err := <-errCh: + t.Fatalf("Run exited before WorkerStarted fired: %v", err) + case <-startDeadline: + t.Fatal("recordingMetrics.WorkerStarted was never invoked") + case <-ticker.C: + } + } + + if !stopped.CompareAndSwap(false, true) { + return + } + if err := instance.Stop(2 * time.Second); err != nil { + t.Fatalf("Stop failed: %v", err) + } + + select { + case err := <-errCh: + if err != nil && !errors.Is(err, http.ErrServerClosed) && !errors.Is(err, grpc.ErrServerStopped) { + t.Fatalf("unexpected Run error: %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("Run did not exit after Stop") + } +} + +// TestRun_WorkerMetricsWired runs the full core.Run lifecycle with a worker +// and a recording Metrics implementation injected via AddWorkerRunOptions, +// and asserts that WorkerStarted fires — proving the option reaches workers.Run. +func TestRun_WorkerMetricsWired(t *testing.T) { + if testing.Short() { + t.Skip("skipping end-to-end Run lifecycle in short mode") + } + resetWorkerRunOpts(t) + rec := &recordingMetrics{} + runWithRecorder(t, config.Config{DisablePrometheus: true}, rec) +} + +// TestRun_UserMetricsOverridesDefaultPrometheus exercises the override +// contract: when AppName is set (so the default Prometheus metrics is +// prepended) and the caller also adds workers.WithMetrics via +// AddWorkerRunOptions, the caller's recorder must be the effective metrics +// implementation. Uses a unique app name per run to avoid colliding with +// the process-global namespace cache in workers.NewPrometheusMetrics. +func TestRun_UserMetricsOverridesDefaultPrometheus(t *testing.T) { + if testing.Short() { + t.Skip("skipping end-to-end Run lifecycle in short mode") + } + resetWorkerRunOpts(t) + rec := &recordingMetrics{} + runWithRecorder(t, config.Config{ + AppName: fmt.Sprintf("test_override_%d", time.Now().UnixNano()), + }, rec) +} + +// workerLifecycleService is a CBService + CBWorkerProvider used by the +// end-to-end test above. The worker blocks on ctx so it stays alive long +// enough for WorkerStarted to fire. +type workerLifecycleService struct{} + +func (s *workerLifecycleService) InitHTTP(_ context.Context, _ *runtime.ServeMux, _ string, _ []grpc.DialOption) error { + return nil +} + +func (s *workerLifecycleService) InitGRPC(_ context.Context, _ *grpc.Server) error { return nil } + +func (s *workerLifecycleService) Workers() []*workers.Worker { + return []*workers.Worker{ + workers.NewWorker("test-lifecycle-worker").HandlerFunc( + func(ctx context.Context, _ *workers.WorkerInfo) error { + <-ctx.Done() + return ctx.Err() + }, + ), + } +} + +var _ CBWorkerProvider = (*workerLifecycleService)(nil)