Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go/fiber/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
)

require (
github.com/SmooAI/fetch/go/fetch v0.0.0-20260518194757-88f518b7813a // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
Expand All @@ -20,6 +21,7 @@ require (
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/sony/gobreaker/v2 v2.0.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.51.0 // indirect
github.com/valyala/tcplisten v1.0.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go/fiber/go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/SmooAI/fetch/go/fetch v0.0.0-20260518194757-88f518b7813a h1:PC/sAX6DhpTSqqqNg0hBczWktpxgjRYSUMbX0jPqmyw=
github.com/SmooAI/fetch/go/fetch v0.0.0-20260518194757-88f518b7813a/go.mod h1:U+FFF0sdcQEdn98GltLEqRfviH6uHebuxOxfZIVR14A=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
Expand Down Expand Up @@ -34,6 +36,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/sony/gobreaker/v2 v2.0.0 h1:23AaR4JQ65y4rz8JWMzgXw2gKOykZ/qfqYunll4OwJ4=
github.com/sony/gobreaker/v2 v2.0.0/go.mod h1:8JnRUz80DJ1/ne8M8v7nmTs2713i58nIt4s7XcGe/DI=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
Expand Down
2 changes: 2 additions & 0 deletions go/gin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
)

require (
github.com/SmooAI/fetch/go/fetch v0.0.0-20260518194757-88f518b7813a // indirect
github.com/bytedance/sonic v1.11.6 // indirect
github.com/bytedance/sonic/loader v0.1.1 // indirect
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
Expand All @@ -31,6 +32,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/sony/gobreaker/v2 v2.0.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go/gin/go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/SmooAI/fetch/go/fetch v0.0.0-20260518194757-88f518b7813a h1:PC/sAX6DhpTSqqqNg0hBczWktpxgjRYSUMbX0jPqmyw=
github.com/SmooAI/fetch/go/fetch v0.0.0-20260518194757-88f518b7813a/go.mod h1:U+FFF0sdcQEdn98GltLEqRfviH6uHebuxOxfZIVR14A=
github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc0=
github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4=
github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM=
Expand Down Expand Up @@ -68,6 +70,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/sony/gobreaker/v2 v2.0.0 h1:23AaR4JQ65y4rz8JWMzgXw2gKOykZ/qfqYunll4OwJ4=
github.com/sony/gobreaker/v2 v2.0.0/go.mod h1:8JnRUz80DJ1/ne8M8v7nmTs2713i58nIt4s7XcGe/DI=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
Expand Down
2 changes: 2 additions & 0 deletions go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/SmooAI/observability/go
go 1.25.0

require (
github.com/SmooAI/fetch/go/fetch v0.0.0-20260518194757-88f518b7813a
github.com/google/uuid v1.6.0
go.opentelemetry.io/otel v1.44.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.44.0
Expand All @@ -19,6 +20,7 @@ require (
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.29.0 // indirect
github.com/sony/gobreaker/v2 v2.0.0 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.44.0 // indirect
go.opentelemetry.io/proto/otlp v1.10.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go/go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/SmooAI/fetch/go/fetch v0.0.0-20260518194757-88f518b7813a h1:PC/sAX6DhpTSqqqNg0hBczWktpxgjRYSUMbX0jPqmyw=
github.com/SmooAI/fetch/go/fetch v0.0.0-20260518194757-88f518b7813a/go.mod h1:U+FFF0sdcQEdn98GltLEqRfviH6uHebuxOxfZIVR14A=
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
Expand All @@ -19,6 +21,8 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.29.0 h1:5VipnvEpbqr2gA2VbM+nYVbkIF2
github.com/grpc-ecosystem/grpc-gateway/v2 v2.29.0/go.mod h1:Hyl3n6Twe1hvtd9XUXDec4pTvgMSEixRuQKPTMH2bNs=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sony/gobreaker/v2 v2.0.0 h1:23AaR4JQ65y4rz8JWMzgXw2gKOykZ/qfqYunll4OwJ4=
github.com/sony/gobreaker/v2 v2.0.0/go.mod h1:8JnRUz80DJ1/ne8M8v7nmTs2713i58nIt4s7XcGe/DI=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
Expand Down
11 changes: 11 additions & 0 deletions go/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,17 @@ func buildResource(opts SetupOtelOptions) *resource.Resource {

// buildHTTPClient returns the custom-auth HTTP client when a TokenProvider is
// set, the caller's override, or nil to let the exporter use its default.
//
// SMOODEV-2026 note: the OTLP/HTTP exporters are deliberately left on plain
// net/http rather than routed through github.com/SmooAI/fetch/go/fetch. The
// otlptracehttp/otlpmetrichttp exporters only accept a *http.Client (via
// WithHTTPClient) and speak protobuf, while fetch is a generics-over-JSON client
// that exposes no http.RoundTripper adapter — there is no clean seam to drive
// the protobuf export through fetch's resilient pipeline without re-implementing
// the exporter transport. The exporters already carry their own sane retry
// (otlptracehttp.WithRetry, on by default with backoff), so wrapping them in
// fetch would also double the retry layer. Only the webhook batch transport
// (transport.go) was migrated to fetch. Revisit if fetch grows a RoundTripper.
func buildHTTPClient(opts SetupOtelOptions) *http.Client {
if opts.TokenProvider == nil && opts.HTTPClient == nil {
return nil
Expand Down
64 changes: 43 additions & 21 deletions go/transport.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,29 @@
package observability

import (
"bytes"
"context"
"encoding/json"
"errors"
"net/http"
"sync"
"time"

"github.com/SmooAI/fetch/go/fetch"
)

// Batched HTTP transport. Holds a bounded queue, flushes on a timer or when
// MaxBatchSize events are buffered, and retries a failed batch by pushing it
// back to the front of the queue. Errors are swallowed — observability must
// never throw into host code. Mirrors the TS Transport.
//
// Outbound delivery goes through github.com/SmooAI/fetch/go/fetch (SMOODEV-2026)
// so each batch POST gets the resilient stack — jittered exponential-backoff
// retries on 429/5xx + network errors, a per-request timeout, and an optional
// circuit breaker — instead of a bare net/http call. fetch handles transient
// blips within a single Flush; the transport's re-queue-on-failure handles
// longer outages across Flush cycles, so the two retry layers are complementary
// (fast in-call recovery vs. durable cross-cycle persistence), not duplicative:
// fetch aborts immediately on 4xx (RetryAbort), matching the old permanent-error
// behavior, so a persistent client error is not retried twice in a row.

const (
defaultFlushMillis = 1000
Expand All @@ -26,7 +37,7 @@ type Transport struct {
flushInterval time.Duration
maxBatch int
maxQueue int
client *http.Client
client *fetch.Client

mu sync.Mutex
queue []ObservabilityEvent
Expand All @@ -41,7 +52,8 @@ type TransportOptions struct {
FlushInterval time.Duration
MaxBatchSize int
MaxQueueSize int
// HTTPClient overrides the default client (test seam).
// HTTPClient overrides the underlying *http.Client that the resilient fetch
// client drives (test seam). When nil, fetch's default transport is used.
HTTPClient *http.Client
}

Expand All @@ -59,19 +71,31 @@ func NewTransport(opts TransportOptions) *Transport {
if queue <= 0 {
queue = defaultQueueMax
}
client := opts.HTTPClient
if client == nil {
client = &http.Client{Timeout: 10 * time.Second}
}
return &Transport{
dsn: opts.DSN,
flushInterval: flush,
maxBatch: batch,
maxQueue: queue,
client: client,
client: buildFetchClient(opts.HTTPClient),
}
}

// buildFetchClient constructs the fetch client that the Transport uses to POST
// batches.
//
// The client is configured with a 10s timeout (the same value the previous
// plain http.Client used), fetch's default retry options, and a JSON
// Content-Type base header. When httpClient is non-nil it is installed as the
// underlying *http.Client (test seam); when nil, the builder's own default is
// left in place.
func buildFetchClient(httpClient *http.Client) *fetch.Client {
	// WithRetry takes a pointer, so hand it the address of a local copy rather
	// than the address of the package-level default.
	retryOpts := fetch.DefaultRetryOptions
	jsonHeaders := http.Header{"Content-Type": []string{"application/json"}}

	builder := fetch.NewClientBuilder().
		WithTimeout(10 * time.Second).
		WithRetry(&retryOpts).
		WithBaseHeaders(jsonHeaders)

	if httpClient == nil {
		return builder.Build()
	}
	return builder.WithHTTPClient(httpClient).Build()
}

// newTransportFromClientOptions wires a Transport from ClientOptions, used by
// the bootstrap / default init when a DSN is configured.
func newTransportFromClientOptions(opts ClientOptions) *Transport {
Expand Down Expand Up @@ -149,21 +173,19 @@ func (t *Transport) Flush(ctx context.Context) error {

func (t *Transport) send(ctx context.Context, batch []ObservabilityEvent) error {
payload := IngestPayload{Type: "error", Events: batch}
body, err := json.Marshal(payload)
if err != nil {
return err
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, t.dsn, bytes.NewReader(body))
if err != nil {
return err
}
req.Header.Set("content-type", "application/json")
resp, err := t.client.Do(req)
// fetch marshals the body to JSON and applies retries/timeout/backoff. It
// returns a typed *fetch.HTTPResponseError for non-2xx responses after
// retries are exhausted; normalize that to httpStatusError so callers and
// tests see the same surface as before.
resp, err := fetch.SimplePost(ctx, t.client, t.dsn, payload, nil)
if err != nil {
var httpErr *fetch.HTTPResponseError
if errors.As(err, &httpErr) {
return &httpStatusError{status: httpErr.StatusCode}
}
return err
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
if !resp.OK {
return &httpStatusError{status: resp.StatusCode}
}
return nil
Expand Down
50 changes: 50 additions & 0 deletions go/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package observability
import (
"context"
"encoding/json"
"errors"
"io"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -108,6 +109,55 @@ func TestFlushEmptyNoop(t *testing.T) {
}
}

// TestTransportFetchInCallRetry checks that a transient 5xx is retried inside a
// single send/Flush (SMOODEV-2026). The test server answers the first request
// with 500 and every later request with 200; the test then waits until the
// server has been hit at least twice and the transport queue is empty, i.e. the
// batch was delivered without being left on the queue for a later cycle.
func TestTransportFetchInCallRetry(t *testing.T) {
	var requests atomic.Int32
	failFirstOnly := func(w http.ResponseWriter, _ *http.Request) {
		status := http.StatusOK
		if requests.Add(1) == 1 {
			// Only the very first request fails.
			status = http.StatusInternalServerError
		}
		w.WriteHeader(status)
	}
	server := httptest.NewServer(http.HandlerFunc(failFirstOnly))
	defer server.Close()

	transport := NewTransport(TransportOptions{DSN: server.URL, MaxBatchSize: 1})
	// MaxBatchSize is 1, so this single event already forms a full batch.
	transport.Enqueue(ObservabilityEvent{EventID: "1"})

	// The 500 must have been followed by at least one more attempt...
	sawRetry := func() bool { return requests.Load() >= 2 }
	// ...and the queue must end up empty.
	drained := func() bool { return transport.queueSize() == 0 }

	waitFor(t, sawRetry)
	waitFor(t, drained)
}

// TestTransportSendErrorOnPersistent4xx checks the failure surface for a server
// that always answers 400: Flush must return an error that unwraps to
// *httpStatusError carrying status 400, and the event must still be on the
// queue afterwards (re-queued for a later attempt rather than dropped).
func TestTransportSendErrorOnPersistent4xx(t *testing.T) {
	alwaysBadRequest := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusBadRequest)
	})
	server := httptest.NewServer(alwaysBadRequest)
	defer server.Close()

	transport := NewTransport(TransportOptions{DSN: server.URL, MaxBatchSize: 10})
	transport.Enqueue(ObservabilityEvent{EventID: "1"})

	flushErr := transport.Flush(context.Background())

	var got *httpStatusError
	if matched := errors.As(flushErr, &got); !matched {
		t.Fatalf("expected *httpStatusError, got %v", flushErr)
	}
	if got.status != http.StatusBadRequest {
		t.Errorf("status = %d, want 400", got.status)
	}

	// The failed batch has to be back on the queue, not discarded.
	if transport.queueSize() != 1 {
		t.Errorf("queue size = %d, want 1 (re-queued)", transport.queueSize())
	}
}

func waitFor(t *testing.T, cond func() bool) {
t.Helper()
deadline := time.Now().Add(2 * time.Second)
Expand Down
Loading