From 09b1731aea1ecd8c6e9695f77c3c1c3c7e0ff514 Mon Sep 17 00:00:00 2001 From: Rolf Hoefer Date: Thu, 4 Jun 2026 00:57:12 +0800 Subject: [PATCH 1/3] fix(mediorum): skip legacy transient retry ops --- pkg/mediorum/crudr/crudr.go | 41 +++++++++++- pkg/mediorum/crudr/crudr_test.go | 111 +++++++++++++++++++++++++++++++ 2 files changed, 151 insertions(+), 1 deletion(-) diff --git a/pkg/mediorum/crudr/crudr.go b/pkg/mediorum/crudr/crudr.go index c962dba4..5d5f2752 100644 --- a/pkg/mediorum/crudr/crudr.go +++ b/pkg/mediorum/crudr/crudr.go @@ -271,7 +271,7 @@ func (c *Crudr) ApplyOp(op *Op) error { // create op + records in a db transaction err = c.DB.Transaction(func(tx *gorm.DB) error { - if !op.Transient { + if c.shouldPersistOp(op) { res := tx.Clauses(clause.OnConflict{DoNothing: true}).Create(op) if res.Error != nil { return res.Error @@ -324,6 +324,45 @@ func (c *Crudr) ApplyOp(op *Op) error { return nil } +func (c *Crudr) shouldPersistOp(op *Op) bool { + if op.Transient { + return false + } + if op.Host == c.host { + return true + } + return !isLegacyTransientUploadRetryOp(op) +} + +func isLegacyTransientUploadRetryOp(op *Op) bool { + if op.Table != "uploads" || op.Action != ActionUpdate { + return false + } + + var rows []struct { + Status string `json:"status"` + ErrorCount int `json:"error_count"` + Results map[string]string `json:"results"` + } + if err := json.Unmarshal(op.Data, &rows); err != nil || len(rows) == 0 { + return false + } + + for _, row := range rows { + if row.Status != "busy" && row.Status != "error" { + return false + } + if row.ErrorCount <= 5 { + return false + } + if row.Results["320"] != "" { + return false + } + } + + return true +} + func (c *Crudr) GetOutboxSizes() map[string]int { c.mu.Lock() defer c.mu.Unlock() diff --git a/pkg/mediorum/crudr/crudr_test.go b/pkg/mediorum/crudr/crudr_test.go index 4836e708..6d40a459 100644 --- a/pkg/mediorum/crudr/crudr_test.go +++ b/pkg/mediorum/crudr/crudr_test.go @@ -2,11 +2,13 @@ package crudr import ( "context" + "fmt" "testing" "time" "github.com/OpenAudio/go-openaudio/pkg/lifecycle" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap" "gorm.io/gorm" ) @@ -19,6 +21,13 @@ type TestBlobThing struct { DeletedAt gorm.DeletedAt `gorm:"index"` } +type Upload struct { + ID string `json:"id" gorm:"primaryKey"` + Status string `json:"status"` + ErrorCount int `json:"error_count"` + Results map[string]string `json:"results" gorm:"serializer:json"` +} + func TestCrudr(t *testing.T) { db := SetupTestDB() @@ -72,3 +81,105 @@ func TestCrudr(t *testing.T) { assert.Len(t, blobs, 3) } } + +func TestRemoteLegacyTransientUploadRetryOpsApplyWithoutPersisting(t *testing.T) { + db := SetupTestDB() + + require.NoError(t, db.AutoMigrate(Upload{})) + require.NoError(t, db.Exec("TRUNCATE ops, uploads").Error) + + z := zap.NewNop() + c := New("https://self.example", nil, nil, db, lifecycle.NewLifecycle(context.Background(), "crudr retry op test", z), z, nil). + RegisterModels(&Upload{}) + + op := &Op{ + ULID: "01KTC3Y9SW2GND1R4QZW2SAS01", + Host: "https://peer.example", + Action: ActionUpdate, + Table: "uploads", + Data: []byte(`[{"id":"upload-1","status":"busy","error_count":6,"results":{}}]`), + } + + require.NoError(t, c.ApplyOp(op)) + + var opsCount int64 + require.NoError(t, db.Model(&Op{}).Count(&opsCount).Error) + require.Zero(t, opsCount) + + var upload Upload + require.NoError(t, db.First(&upload, "id = ?", "upload-1").Error) + require.Equal(t, "busy", upload.Status) + require.Equal(t, 6, upload.ErrorCount) +} + +func TestLegacyTransientUploadRetryOpsPersistForLocalHost(t *testing.T) { + db := SetupTestDB() + + require.NoError(t, db.AutoMigrate(Upload{})) + require.NoError(t, db.Exec("TRUNCATE ops, uploads").Error) + + z := zap.NewNop() + c := New("https://self.example", nil, nil, db, lifecycle.NewLifecycle(context.Background(), "crudr local retry op test", z), z, nil). + RegisterModels(&Upload{}) + + op := &Op{ + ULID: "01KTC3Y9SW2GND1R4QZW2SAS02", + Host: "https://self.example", + Action: ActionUpdate, + Table: "uploads", + Data: []byte(`[{"id":"upload-1","status":"error","error_count":6,"results":{}}]`), + } + + require.NoError(t, c.ApplyOp(op)) + + var opsCount int64 + require.NoError(t, db.Model(&Op{}).Count(&opsCount).Error) + require.EqualValues(t, 1, opsCount) +} + +func TestRemoteUploadRetryOpsPersistWhenNotLegacyTransient(t *testing.T) { + tests := []struct { + name string + data string + }{ + { + name: "retry limit not exceeded", + data: `[{"id":"upload-1","status":"error","error_count":5,"results":{}}]`, + }, + { + name: "transcode result exists", + data: `[{"id":"upload-1","status":"error","error_count":6,"results":{"320":"cid-320"}}]`, + }, + { + name: "final done", + data: `[{"id":"upload-1","status":"done","error_count":6,"results":{}}]`, + }, + } + + for i, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + db := SetupTestDB() + + require.NoError(t, db.AutoMigrate(Upload{})) + require.NoError(t, db.Exec("TRUNCATE ops, uploads").Error) + + z := zap.NewNop() + c := New("https://self.example", nil, nil, db, lifecycle.NewLifecycle(context.Background(), "crudr durable upload op test", z), z, nil). + RegisterModels(&Upload{}) + + op := &Op{ + ULID: fmt.Sprintf("01KTC3Y9SW2GND1R4QZW2SAS%02d", i+3), + Host: "https://peer.example", + Action: ActionUpdate, + Table: "uploads", + Data: []byte(tt.data), + } + + require.NoError(t, c.ApplyOp(op)) + + var opsCount int64 + require.NoError(t, db.Model(&Op{}).Count(&opsCount).Error) + require.EqualValues(t, 1, opsCount) + }) + } +} From 28308b4077893e7d81afaa94e11a7b1d2accf395 Mon Sep 17 00:00:00 2001 From: Rolf Hoefer Date: Thu, 4 Jun 2026 20:35:10 +0800 Subject: [PATCH 2/3] test(mediorum): cover retry-op persistence boundaries --- pkg/mediorum/crudr/crudr_test.go | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/pkg/mediorum/crudr/crudr_test.go b/pkg/mediorum/crudr/crudr_test.go index 6d40a459..5488adb1 100644 --- a/pkg/mediorum/crudr/crudr_test.go +++ b/pkg/mediorum/crudr/crudr_test.go @@ -110,6 +110,23 @@ func TestRemoteLegacyTransientUploadRetryOpsApplyWithoutPersisting(t *testing.T) require.NoError(t, db.First(&upload, "id = ?", "upload-1").Error) require.Equal(t, "busy", upload.Status) require.Equal(t, 6, upload.ErrorCount) + + op = &Op{ + ULID: "01KTC3Y9SW2GND1R4QZW2SAS02", + Host: "https://peer.example", + Action: ActionUpdate, + Table: "uploads", + Data: []byte(`[{"id":"upload-2","status":"error","error_count":6}]`), + } + + require.NoError(t, c.ApplyOp(op)) + require.NoError(t, db.Model(&Op{}).Count(&opsCount).Error) + require.Zero(t, opsCount) + + upload = Upload{} + require.NoError(t, db.First(&upload, "id = ?", "upload-2").Error) + require.Equal(t, "error", upload.Status) + require.Equal(t, 6, upload.ErrorCount) } func TestLegacyTransientUploadRetryOpsPersistForLocalHost(t *testing.T) { @@ -123,7 +140,7 @@ func TestLegacyTransientUploadRetryOpsPersistForLocalHost(t *testing.T) { RegisterModels(&Upload{}) op := &Op{ - ULID: "01KTC3Y9SW2GND1R4QZW2SAS02", + ULID: "01KTC3Y9SW2GND1R4QZW2SAS03", Host: "https://self.example", Action: ActionUpdate, Table: "uploads", @@ -154,6 +171,10 @@ func TestRemoteUploadRetryOpsPersistWhenNotLegacyTransient(t *testing.T) { name: "final done", data: `[{"id":"upload-1","status":"done","error_count":6,"results":{}}]`, }, + { + name: "mixed batch has durable row", + data: `[{"id":"upload-1","status":"error","error_count":6,"results":{}},{"id":"upload-2","status":"done","error_count":6,"results":{}}]`, + }, } for i, tt := range tests { From 7b92d0bdb0fcaf8a6e64db629bb3d48bdae36333 Mon Sep 17 00:00:00 2001 From: Rolf Hoefer Date: Thu, 4 Jun 2026 21:17:48 +0800 Subject: [PATCH 3/3] test(mediorum): harden retry-op replay boundaries --- pkg/mediorum/crudr/client_test.go | 39 ++++++++ pkg/mediorum/crudr/crudr_test.go | 147 ++++++++++++++++++++++++++++++ 2 files changed, 186 insertions(+) diff --git a/pkg/mediorum/crudr/client_test.go b/pkg/mediorum/crudr/client_test.go index 5e2e8856..adc1c533 100644 --- a/pkg/mediorum/crudr/client_test.go +++ b/pkg/mediorum/crudr/client_test.go @@ -36,3 +36,42 @@ func TestDoSweepAdvancesCursorToLastScannedHeader(t *testing.T) { require.NoError(t, db.Where("host = ?", peer.Host).First(&cursor).Error) require.Equal(t, lastScannedULID, cursor.LastULID) } + +func TestDoSweepAdvancesCursorThroughSuppressedRetryOps(t *testing.T) { + db := SetupTestDB() + z := zap.NewNop() + c := New("https://self.example", nil, nil, db, lifecycle.NewLifecycle(context.Background(), "crudr suppressed sweep test", z), z, nil). + RegisterModels(&Upload{}) + require.NoError(t, db.AutoMigrate(Upload{})) + require.NoError(t, db.Exec("TRUNCATE ops, cursors, uploads").Error) + + suppressedULID := "01KT26A1XRE7JYJ3FQBTT4C3CY" + peerServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, "/internal/crud/sweep", r.URL.Path) + w.Header().Set("Content-Type", "application/json") + _, _ = io.WriteString(w, `[{ + "ulid":"`+suppressedULID+`", + "host":"https://peer.example", + "action":"update", + "table":"uploads", + "data":[{"id":"upload-1","status":"error","error_count":6,"results":{}}] + }]`) + })) + t.Cleanup(peerServer.Close) + + peer := NewPeerClient(peerServer.URL, c, "https://self.example") + require.NoError(t, peer.doSweep(context.Background())) + + var cursor Cursor + require.NoError(t, db.Where("host = ?", peer.Host).First(&cursor).Error) + require.Equal(t, suppressedULID, cursor.LastULID) + + var opsCount int64 + require.NoError(t, db.Model(&Op{}).Count(&opsCount).Error) + require.Zero(t, opsCount) + + var upload Upload + require.NoError(t, db.First(&upload, "id = ?", "upload-1").Error) + require.Equal(t, "error", upload.Status) + require.Equal(t, 6, upload.ErrorCount) +} diff --git a/pkg/mediorum/crudr/crudr_test.go b/pkg/mediorum/crudr/crudr_test.go index 5488adb1..c49e267c 100644 --- a/pkg/mediorum/crudr/crudr_test.go +++ b/pkg/mediorum/crudr/crudr_test.go @@ -204,3 +204,150 @@ func TestRemoteUploadRetryOpsPersistWhenNotLegacyTransient(t *testing.T) { }) } } + +func TestLegacyTransientUploadRetryOpClassifierIsNarrow(t *testing.T) { + tests := []struct { + name string + op Op + want bool + }{ + { + name: "busy retry without result", + op: Op{ + Action: ActionUpdate, + Table: "uploads", + Data: []byte(`[{"status":"busy","error_count":6,"results":{}}]`), + }, + want: true, + }, + { + name: "error retry without results field", + op: Op{ + Action: ActionUpdate, + Table: "uploads", + Data: []byte(`[{"status":"error","error_count":6}]`), + }, + want: true, + }, + { + name: "retry limit boundary is durable", + op: Op{ + Action: ActionUpdate, + Table: "uploads", + Data: []byte(`[{"status":"error","error_count":5,"results":{}}]`), + }, + }, + { + name: "transcode result is durable", + op: Op{ + Action: ActionUpdate, + Table: "uploads", + Data: []byte(`[{"status":"error","error_count":6,"results":{"320":"cid-320"}}]`), + }, + }, + { + name: "done status is durable", + op: Op{ + Action: ActionUpdate, + Table: "uploads", + Data: []byte(`[{"status":"done","error_count":6,"results":{}}]`), + }, + }, + { + name: "mixed batch is durable", + op: Op{ + Action: ActionUpdate, + Table: "uploads", + Data: []byte(`[{"status":"error","error_count":6,"results":{}},{"status":"done","error_count":6,"results":{}}]`), + }, + }, + { + name: "wrong table is durable", + op: Op{ + Action: ActionUpdate, + Table: "qm_audio_analyses", + Data: []byte(`[{"status":"error","error_count":6,"results":{}}]`), + }, + }, + { + name: "create is durable", + op: Op{ + Action: ActionCreate, + Table: "uploads", + Data: []byte(`[{"status":"error","error_count":6,"results":{}}]`), + }, + }, + { + name: "malformed json is durable", + op: Op{ + Action: ActionUpdate, + Table: "uploads", + Data: []byte(`{"status":"error","error_count":6,"results":{}}`), + }, + }, + { + name: "unexpected results shape is durable", + op: Op{ + Action: ActionUpdate, + Table: "uploads", + Data: []byte(`[{"status":"error","error_count":6,"results":[]}]`), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.Equal(t, tt.want, isLegacyTransientUploadRetryOp(&tt.op)) + }) + } +} + +func TestSuppressedRetryOpsDoNotPoisonReplayHistory(t *testing.T) { + db := SetupTestDB() + + require.NoError(t, db.AutoMigrate(Upload{})) + require.NoError(t, db.Exec("TRUNCATE ops, uploads").Error) + + z := zap.NewNop() + c := New("https://self.example", nil, nil, db, lifecycle.NewLifecycle(context.Background(), "crudr replay source test", z), z, nil). + RegisterModels(&Upload{}) + + require.NoError(t, c.ApplyOp(&Op{ + ULID: "01KTC3Y9SW2GND1R4QZW2SAS10", + Host: "https://peer.example", + Action: ActionUpdate, + Table: "uploads", + Data: []byte(`[{"id":"upload-1","status":"busy","error_count":6,"results":{}}]`), + })) + require.NoError(t, c.ApplyOp(&Op{ + ULID: "01KTC3Y9SW2GND1R4QZW2SAS11", + Host: "https://peer.example", + Action: ActionUpdate, + Table: "uploads", + Data: []byte(`[{"id":"upload-1","status":"done","error_count":6,"results":{"320":"cid-320"}}]`), + })) + + var persisted []Op + require.NoError(t, db.Order("ulid ASC").Find(&persisted).Error) + require.Len(t, persisted, 1) + require.Equal(t, "01KTC3Y9SW2GND1R4QZW2SAS11", persisted[0].ULID) + + var live Upload + require.NoError(t, db.First(&live, "id = ?", "upload-1").Error) + require.Equal(t, "done", live.Status) + require.Equal(t, "cid-320", live.Results["320"]) + + require.NoError(t, db.Exec("TRUNCATE ops, uploads").Error) + + replay := New("https://fresh.example", nil, nil, db, lifecycle.NewLifecycle(context.Background(), "crudr replay target test", z), z, nil). + RegisterModels(&Upload{}) + for i := range persisted { + require.NoError(t, replay.ApplyOp(&persisted[i])) + } + + var replayed Upload + require.NoError(t, db.First(&replayed, "id = ?", "upload-1").Error) + require.Equal(t, "done", replayed.Status) + require.Equal(t, 6, replayed.ErrorCount) + require.Equal(t, "cid-320", replayed.Results["320"]) +}