From 76b0ea91c1e0cca2dd0abc54b1ce7df02c8f0463 Mon Sep 17 00:00:00 2001 From: Brent Rager Date: Sat, 20 Jun 2026 22:28:49 -0400 Subject: [PATCH 1/2] SMOODEV-2026: Go observability transport via SmooAI/fetch (not net/http) Co-Authored-By: Claude Opus 4.8 (1M context) --- go/go.mod | 2 ++ go/go.sum | 4 +++ go/otel.go | 11 ++++++++ go/transport.go | 64 +++++++++++++++++++++++++++++--------------- go/transport_test.go | 50 ++++++++++++++++++++++++++++++++++ 5 files changed, 110 insertions(+), 21 deletions(-) diff --git a/go/go.mod b/go/go.mod index fede6d5..b7b264a 100644 --- a/go/go.mod +++ b/go/go.mod @@ -3,6 +3,7 @@ module github.com/SmooAI/observability/go go 1.25.0 require ( + github.com/SmooAI/fetch/go/fetch v0.0.0-20260518194757-88f518b7813a github.com/google/uuid v1.6.0 go.opentelemetry.io/otel v1.44.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.44.0 @@ -19,6 +20,7 @@ require ( github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.29.0 // indirect + github.com/sony/gobreaker/v2 v2.0.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.44.0 // indirect go.opentelemetry.io/proto/otlp v1.10.0 // indirect diff --git a/go/go.sum b/go/go.sum index 0debfaf..7794a8d 100644 --- a/go/go.sum +++ b/go/go.sum @@ -1,3 +1,5 @@ +github.com/SmooAI/fetch/go/fetch v0.0.0-20260518194757-88f518b7813a h1:PC/sAX6DhpTSqqqNg0hBczWktpxgjRYSUMbX0jPqmyw= +github.com/SmooAI/fetch/go/fetch v0.0.0-20260518194757-88f518b7813a/go.mod h1:U+FFF0sdcQEdn98GltLEqRfviH6uHebuxOxfZIVR14A= github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -19,6 +21,8 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.29.0 h1:5VipnvEpbqr2gA2VbM+nYVbkIF2 github.com/grpc-ecosystem/grpc-gateway/v2 v2.29.0/go.mod h1:Hyl3n6Twe1hvtd9XUXDec4pTvgMSEixRuQKPTMH2bNs= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sony/gobreaker/v2 v2.0.0 h1:23AaR4JQ65y4rz8JWMzgXw2gKOykZ/qfqYunll4OwJ4= +github.com/sony/gobreaker/v2 v2.0.0/go.mod h1:8JnRUz80DJ1/ne8M8v7nmTs2713i58nIt4s7XcGe/DI= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= diff --git a/go/otel.go b/go/otel.go index e851683..31fa241 100644 --- a/go/otel.go +++ b/go/otel.go @@ -203,6 +203,17 @@ func buildResource(opts SetupOtelOptions) *resource.Resource { // buildHTTPClient returns the custom-auth HTTP client when a TokenProvider is // set, the caller's override, or nil to let the exporter use its default. +// +// SMOODEV-2026 note: the OTLP/HTTP exporters are deliberately left on plain +// net/http rather than routed through github.com/SmooAI/fetch/go/fetch. The +// otlptracehttp/otlpmetrichttp exporters only accept a *http.Client (via +// WithHTTPClient) and speak protobuf, while fetch is a generics-over-JSON client +// that exposes no http.RoundTripper adapter — there is no clean seam to drive +// the protobuf export through fetch's resilient pipeline without re-implementing +// the exporter transport. The exporters already carry their own sane retry +// (otlptracehttp.WithRetry, on by default with backoff), so wrapping them in +// fetch would also double the retry layer. Only the webhook batch transport +// (transport.go) was migrated to fetch. Revisit if fetch grows a RoundTripper. func buildHTTPClient(opts SetupOtelOptions) *http.Client { if opts.TokenProvider == nil && opts.HTTPClient == nil { return nil diff --git a/go/transport.go b/go/transport.go index 3ef7a05..9d05524 100644 --- a/go/transport.go +++ b/go/transport.go @@ -1,18 +1,29 @@ package observability import ( - "bytes" "context" - "encoding/json" + "errors" "net/http" "sync" "time" + + "github.com/SmooAI/fetch/go/fetch" ) // Batched HTTP transport. Holds a bounded queue, flushes on a timer or when // MaxBatchSize events are buffered, and retries a failed batch by pushing it // back to the front of the queue. Errors are swallowed — observability must // never throw into host code. Mirrors the TS Transport. +// +// Outbound delivery goes through github.com/SmooAI/fetch/go/fetch (SMOODEV-2026) +// so each batch POST gets the resilient stack — jittered exponential-backoff +// retries on 429/5xx + network errors, a per-request timeout, and an optional +// circuit breaker — instead of a bare net/http call. fetch handles transient +// blips within a single Flush; the transport's re-queue-on-failure handles +// longer outages across Flush cycles, so the two retry layers are complementary +// (fast in-call recovery vs. durable cross-cycle persistence), not duplicative: +// fetch aborts immediately on 4xx (RetryAbort), matching the old permanent-error +// behavior, so a persistent client error is not retried twice in a row. const ( defaultFlushMillis = 1000 @@ -26,7 +37,7 @@ type Transport struct { flushInterval time.Duration maxBatch int maxQueue int - client *http.Client + client *fetch.Client mu sync.Mutex queue []ObservabilityEvent @@ -41,7 +52,8 @@ type TransportOptions struct { FlushInterval time.Duration MaxBatchSize int MaxQueueSize int - // HTTPClient overrides the default client (test seam). + // HTTPClient overrides the underlying *http.Client that the resilient fetch + // client drives (test seam). When nil, fetch's default transport is used. HTTPClient *http.Client } @@ -59,19 +71,31 @@ func NewTransport(opts TransportOptions) *Transport { if queue <= 0 { queue = defaultQueueMax } - client := opts.HTTPClient - if client == nil { - client = &http.Client{Timeout: 10 * time.Second} - } return &Transport{ dsn: opts.DSN, flushInterval: flush, maxBatch: batch, maxQueue: queue, - client: client, + client: buildFetchClient(opts.HTTPClient), } } +// buildFetchClient assembles the resilient fetch client used for batch delivery. +// It keeps the prior 10s timeout and JSON content type, adds default retries +// (429/5xx + network, jittered backoff, aborts on 4xx), and lets a test seam +// swap the underlying *http.Client. +func buildFetchClient(httpClient *http.Client) *fetch.Client { + retry := fetch.DefaultRetryOptions + b := fetch.NewClientBuilder(). + WithTimeout(10 * time.Second). + WithRetry(&retry). + WithBaseHeaders(http.Header{"Content-Type": []string{"application/json"}}) + if httpClient != nil { + b = b.WithHTTPClient(httpClient) + } + return b.Build() +} + // newTransportFromClientOptions wires a Transport from ClientOptions, used by // the bootstrap / default init when a DSN is configured. func newTransportFromClientOptions(opts ClientOptions) *Transport { @@ -149,21 +173,19 @@ func (t *Transport) Flush(ctx context.Context) error { func (t *Transport) send(ctx context.Context, batch []ObservabilityEvent) error { payload := IngestPayload{Type: "error", Events: batch} - body, err := json.Marshal(payload) - if err != nil { - return err - } - req, err := http.NewRequestWithContext(ctx, http.MethodPost, t.dsn, bytes.NewReader(body)) - if err != nil { - return err - } - req.Header.Set("content-type", "application/json") - resp, err := t.client.Do(req) + // fetch marshals the body to JSON and applies retries/timeout/backoff. It + // returns a typed *fetch.HTTPResponseError for non-2xx responses after + // retries are exhausted; normalize that to httpStatusError so callers and + // tests see the same surface as before. + resp, err := fetch.SimplePost(ctx, t.client, t.dsn, payload, nil) if err != nil { + var httpErr *fetch.HTTPResponseError + if errors.As(err, &httpErr) { + return &httpStatusError{status: httpErr.StatusCode} + } return err } - defer resp.Body.Close() - if resp.StatusCode >= 400 { + if !resp.OK { return &httpStatusError{status: resp.StatusCode} } return nil diff --git a/go/transport_test.go b/go/transport_test.go index bf1673d..2758d05 100644 --- a/go/transport_test.go +++ b/go/transport_test.go @@ -3,6 +3,7 @@ package observability import ( "context" "encoding/json" + "errors" "io" "net/http" "net/http/httptest" @@ -108,6 +109,55 @@ func TestFlushEmptyNoop(t *testing.T) { } } +// TestTransportFetchInCallRetry verifies the resilient fetch stack retries a +// transient 5xx within a single send/Flush (SMOODEV-2026) — the first attempt +// 500s, fetch backs off and retries, and the batch lands without ever needing +// the transport's slower re-queue path. queueSize stays 0 after the flush. +func TestTransportFetchInCallRetry(t *testing.T) { + var hits int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if atomic.AddInt32(&hits, 1) == 1 { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + tr := NewTransport(TransportOptions{DSN: srv.URL, MaxBatchSize: 1}) + tr.Enqueue(ObservabilityEvent{EventID: "1"}) // full batch -> synchronous Flush + + // fetch retried the 500 in-call and succeeded, so the server saw >= 2 hits + // and the queue drained without a re-queue. + waitFor(t, func() bool { return atomic.LoadInt32(&hits) >= 2 }) + waitFor(t, func() bool { return tr.queueSize() == 0 }) +} + +// TestTransportSendErrorOnPersistent4xx confirms a persistent client error +// surfaces as httpStatusError (fetch aborts retries on 4xx) so Flush re-queues +// the batch rather than dropping it. +func TestTransportSendErrorOnPersistent4xx(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + })) + defer srv.Close() + + tr := NewTransport(TransportOptions{DSN: srv.URL, MaxBatchSize: 10}) + tr.Enqueue(ObservabilityEvent{EventID: "1"}) + err := tr.Flush(context.Background()) + var statusErr *httpStatusError + if !errors.As(err, &statusErr) { + t.Fatalf("expected *httpStatusError, got %v", err) + } + if statusErr.status != http.StatusBadRequest { + t.Errorf("status = %d, want 400", statusErr.status) + } + // Batch was re-queued for a later attempt, not dropped. + if tr.queueSize() != 1 { + t.Errorf("queue size = %d, want 1 (re-queued)", tr.queueSize()) + } +} + func waitFor(t *testing.T, cond func() bool) { t.Helper() deadline := time.Now().Add(2 * time.Second) From 8ed77c5e0574063d4c61fed70d1ebe6d022120bb Mon Sep 17 00:00:00 2001 From: Brent Rager Date: Sat, 20 Jun 2026 23:08:55 -0400 Subject: [PATCH 2/2] SMOODEV-2026: add fetch/go go.sum entries in fiber+gin submodules (fix CI per-module vet) --- go/fiber/go.mod | 2 ++ go/fiber/go.sum | 4 ++++ go/gin/go.mod | 2 ++ go/gin/go.sum | 4 ++++ 4 files changed, 12 insertions(+) diff --git a/go/fiber/go.mod b/go/fiber/go.mod index e041a70..4c03dca 100644 --- a/go/fiber/go.mod +++ b/go/fiber/go.mod @@ -8,6 +8,7 @@ require ( ) require ( + github.com/SmooAI/fetch/go/fetch v0.0.0-20260518194757-88f518b7813a // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -20,6 +21,7 @@ require ( github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect github.com/rivo/uniseg v0.2.0 // indirect + github.com/sony/gobreaker/v2 v2.0.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasthttp v1.51.0 // indirect github.com/valyala/tcplisten v1.0.0 // indirect diff --git a/go/fiber/go.sum b/go/fiber/go.sum index b05b9a7..8307a83 100644 --- a/go/fiber/go.sum +++ b/go/fiber/go.sum @@ -1,3 +1,5 @@ +github.com/SmooAI/fetch/go/fetch v0.0.0-20260518194757-88f518b7813a h1:PC/sAX6DhpTSqqqNg0hBczWktpxgjRYSUMbX0jPqmyw= +github.com/SmooAI/fetch/go/fetch v0.0.0-20260518194757-88f518b7813a/go.mod h1:U+FFF0sdcQEdn98GltLEqRfviH6uHebuxOxfZIVR14A= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= @@ -34,6 +36,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/sony/gobreaker/v2 v2.0.0 h1:23AaR4JQ65y4rz8JWMzgXw2gKOykZ/qfqYunll4OwJ4= +github.com/sony/gobreaker/v2 v2.0.0/go.mod h1:8JnRUz80DJ1/ne8M8v7nmTs2713i58nIt4s7XcGe/DI= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= diff --git a/go/gin/go.mod b/go/gin/go.mod index 142aa4d..e156fb0 100644 --- a/go/gin/go.mod +++ b/go/gin/go.mod @@ -8,6 +8,7 @@ require ( ) require ( + github.com/SmooAI/fetch/go/fetch v0.0.0-20260518194757-88f518b7813a // indirect github.com/bytedance/sonic v1.11.6 // indirect github.com/bytedance/sonic/loader v0.1.1 // indirect github.com/cenkalti/backoff/v5 v5.0.3 // indirect @@ -31,6 +32,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect + github.com/sony/gobreaker/v2 v2.0.0 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.12 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect diff --git a/go/gin/go.sum b/go/gin/go.sum index eacf94f..dc77c6d 100644 --- a/go/gin/go.sum +++ b/go/gin/go.sum @@ -1,3 +1,5 @@ +github.com/SmooAI/fetch/go/fetch v0.0.0-20260518194757-88f518b7813a h1:PC/sAX6DhpTSqqqNg0hBczWktpxgjRYSUMbX0jPqmyw= +github.com/SmooAI/fetch/go/fetch v0.0.0-20260518194757-88f518b7813a/go.mod h1:U+FFF0sdcQEdn98GltLEqRfviH6uHebuxOxfZIVR14A= github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc0= github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4= github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM= @@ -68,6 +70,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/sony/gobreaker/v2 v2.0.0 h1:23AaR4JQ65y4rz8JWMzgXw2gKOykZ/qfqYunll4OwJ4= +github.com/sony/gobreaker/v2 v2.0.0/go.mod h1:8JnRUz80DJ1/ne8M8v7nmTs2713i58nIt4s7XcGe/DI= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=