Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions pkg/mediorum/crudr/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,42 @@ func TestDoSweepAdvancesCursorToLastScannedHeader(t *testing.T) {
require.NoError(t, db.Where("host = ?", peer.Host).First(&cursor).Error)
require.Equal(t, lastScannedULID, cursor.LastULID)
}

func TestDoSweepAdvancesCursorThroughSuppressedRetryOps(t *testing.T) {
db := SetupTestDB()
z := zap.NewNop()
c := New("https://self.example", nil, nil, db, lifecycle.NewLifecycle(context.Background(), "crudr suppressed sweep test", z), z, nil).
RegisterModels(&Upload{})
require.NoError(t, db.AutoMigrate(Upload{}))
require.NoError(t, db.Exec("TRUNCATE ops, cursors, uploads").Error)

suppressedULID := "01KT26A1XRE7JYJ3FQBTT4C3CY"
peerServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "/internal/crud/sweep", r.URL.Path)
w.Header().Set("Content-Type", "application/json")
_, _ = io.WriteString(w, `[{
"ulid":"`+suppressedULID+`",
"host":"https://peer.example",
"action":"update",
"table":"uploads",
"data":[{"id":"upload-1","status":"error","error_count":6,"results":{}}]
}]`)
}))
t.Cleanup(peerServer.Close)

peer := NewPeerClient(peerServer.URL, c, "https://self.example")
require.NoError(t, peer.doSweep(context.Background()))

var cursor Cursor
require.NoError(t, db.Where("host = ?", peer.Host).First(&cursor).Error)
require.Equal(t, suppressedULID, cursor.LastULID)

var opsCount int64
require.NoError(t, db.Model(&Op{}).Count(&opsCount).Error)
require.Zero(t, opsCount)

var upload Upload
require.NoError(t, db.First(&upload, "id = ?", "upload-1").Error)
require.Equal(t, "error", upload.Status)
require.Equal(t, 6, upload.ErrorCount)
}
41 changes: 40 additions & 1 deletion pkg/mediorum/crudr/crudr.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (c *Crudr) ApplyOp(op *Op) error {

// create op + records in a db transaction
err = c.DB.Transaction(func(tx *gorm.DB) error {
if !op.Transient {
if c.shouldPersistOp(op) {
res := tx.Clauses(clause.OnConflict{DoNothing: true}).Create(op)
if res.Error != nil {
return res.Error
Expand Down Expand Up @@ -324,6 +324,45 @@ func (c *Crudr) ApplyOp(op *Op) error {
return nil
}

func (c *Crudr) shouldPersistOp(op *Op) bool {
if op.Transient {
return false
}
if op.Host == c.host {
return true
}
return !isLegacyTransientUploadRetryOp(op)
}

func isLegacyTransientUploadRetryOp(op *Op) bool {
if op.Table != "uploads" || op.Action != ActionUpdate {
return false
}

var rows []struct {
Status string `json:"status"`
ErrorCount int `json:"error_count"`
Results map[string]string `json:"results"`
}
if err := json.Unmarshal(op.Data, &rows); err != nil || len(rows) == 0 {
return false
}

for _, row := range rows {
if row.Status != "busy" && row.Status != "error" {
return false
}
if row.ErrorCount <= 5 {
return false
}
if row.Results["320"] != "" {
return false
}
}

return true
}

func (c *Crudr) GetOutboxSizes() map[string]int {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down
279 changes: 279 additions & 0 deletions pkg/mediorum/crudr/crudr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package crudr

import (
"context"
"fmt"
"testing"
"time"

"github.com/OpenAudio/go-openaudio/pkg/lifecycle"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"gorm.io/gorm"
)
Expand All @@ -19,6 +21,13 @@ type TestBlobThing struct {
DeletedAt gorm.DeletedAt `gorm:"index"`
}

type Upload struct {
ID string `json:"id" gorm:"primaryKey"`
Status string `json:"status"`
ErrorCount int `json:"error_count"`
Results map[string]string `json:"results" gorm:"serializer:json"`
}

func TestCrudr(t *testing.T) {

db := SetupTestDB()
Expand Down Expand Up @@ -72,3 +81,273 @@ func TestCrudr(t *testing.T) {
assert.Len(t, blobs, 3)
}
}

func TestRemoteLegacyTransientUploadRetryOpsApplyWithoutPersisting(t *testing.T) {
db := SetupTestDB()

require.NoError(t, db.AutoMigrate(Upload{}))
require.NoError(t, db.Exec("TRUNCATE ops, uploads").Error)

z := zap.NewNop()
c := New("https://self.example", nil, nil, db, lifecycle.NewLifecycle(context.Background(), "crudr retry op test", z), z, nil).
RegisterModels(&Upload{})

op := &Op{
ULID: "01KTC3Y9SW2GND1R4QZW2SAS01",
Host: "https://peer.example",
Action: ActionUpdate,
Table: "uploads",
Data: []byte(`[{"id":"upload-1","status":"busy","error_count":6,"results":{}}]`),
}

require.NoError(t, c.ApplyOp(op))

var opsCount int64
require.NoError(t, db.Model(&Op{}).Count(&opsCount).Error)
require.Zero(t, opsCount)

var upload Upload
require.NoError(t, db.First(&upload, "id = ?", "upload-1").Error)
require.Equal(t, "busy", upload.Status)
require.Equal(t, 6, upload.ErrorCount)

op = &Op{
ULID: "01KTC3Y9SW2GND1R4QZW2SAS02",
Host: "https://peer.example",
Action: ActionUpdate,
Table: "uploads",
Data: []byte(`[{"id":"upload-2","status":"error","error_count":6}]`),
}

require.NoError(t, c.ApplyOp(op))
require.NoError(t, db.Model(&Op{}).Count(&opsCount).Error)
require.Zero(t, opsCount)

upload = Upload{}
require.NoError(t, db.First(&upload, "id = ?", "upload-2").Error)
require.Equal(t, "error", upload.Status)
require.Equal(t, 6, upload.ErrorCount)
}

func TestLegacyTransientUploadRetryOpsPersistForLocalHost(t *testing.T) {
db := SetupTestDB()

require.NoError(t, db.AutoMigrate(Upload{}))
require.NoError(t, db.Exec("TRUNCATE ops, uploads").Error)

z := zap.NewNop()
c := New("https://self.example", nil, nil, db, lifecycle.NewLifecycle(context.Background(), "crudr local retry op test", z), z, nil).
RegisterModels(&Upload{})

op := &Op{
ULID: "01KTC3Y9SW2GND1R4QZW2SAS03",
Host: "https://self.example",
Action: ActionUpdate,
Table: "uploads",
Data: []byte(`[{"id":"upload-1","status":"error","error_count":6,"results":{}}]`),
}

require.NoError(t, c.ApplyOp(op))

var opsCount int64
require.NoError(t, db.Model(&Op{}).Count(&opsCount).Error)
require.EqualValues(t, 1, opsCount)
}

func TestRemoteUploadRetryOpsPersistWhenNotLegacyTransient(t *testing.T) {
tests := []struct {
name string
data string
}{
{
name: "retry limit not exceeded",
data: `[{"id":"upload-1","status":"error","error_count":5,"results":{}}]`,
},
{
name: "transcode result exists",
data: `[{"id":"upload-1","status":"error","error_count":6,"results":{"320":"cid-320"}}]`,
},
{
name: "final done",
data: `[{"id":"upload-1","status":"done","error_count":6,"results":{}}]`,
},
{
name: "mixed batch has durable row",
data: `[{"id":"upload-1","status":"error","error_count":6,"results":{}},{"id":"upload-2","status":"done","error_count":6,"results":{}}]`,
},
}

for i, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
db := SetupTestDB()

require.NoError(t, db.AutoMigrate(Upload{}))
require.NoError(t, db.Exec("TRUNCATE ops, uploads").Error)

z := zap.NewNop()
c := New("https://self.example", nil, nil, db, lifecycle.NewLifecycle(context.Background(), "crudr durable upload op test", z), z, nil).
RegisterModels(&Upload{})

op := &Op{
ULID: fmt.Sprintf("01KTC3Y9SW2GND1R4QZW2SAS%02d", i+3),
Host: "https://peer.example",
Action: ActionUpdate,
Table: "uploads",
Data: []byte(tt.data),
}

require.NoError(t, c.ApplyOp(op))

var opsCount int64
require.NoError(t, db.Model(&Op{}).Count(&opsCount).Error)
require.EqualValues(t, 1, opsCount)
})
}
}

func TestLegacyTransientUploadRetryOpClassifierIsNarrow(t *testing.T) {
tests := []struct {
name string
op Op
want bool
}{
{
name: "busy retry without result",
op: Op{
Action: ActionUpdate,
Table: "uploads",
Data: []byte(`[{"status":"busy","error_count":6,"results":{}}]`),
},
want: true,
},
{
name: "error retry without results field",
op: Op{
Action: ActionUpdate,
Table: "uploads",
Data: []byte(`[{"status":"error","error_count":6}]`),
},
want: true,
},
{
name: "retry limit boundary is durable",
op: Op{
Action: ActionUpdate,
Table: "uploads",
Data: []byte(`[{"status":"error","error_count":5,"results":{}}]`),
},
},
{
name: "transcode result is durable",
op: Op{
Action: ActionUpdate,
Table: "uploads",
Data: []byte(`[{"status":"error","error_count":6,"results":{"320":"cid-320"}}]`),
},
},
{
name: "done status is durable",
op: Op{
Action: ActionUpdate,
Table: "uploads",
Data: []byte(`[{"status":"done","error_count":6,"results":{}}]`),
},
},
{
name: "mixed batch is durable",
op: Op{
Action: ActionUpdate,
Table: "uploads",
Data: []byte(`[{"status":"error","error_count":6,"results":{}},{"status":"done","error_count":6,"results":{}}]`),
},
},
{
name: "wrong table is durable",
op: Op{
Action: ActionUpdate,
Table: "qm_audio_analyses",
Data: []byte(`[{"status":"error","error_count":6,"results":{}}]`),
},
},
{
name: "create is durable",
op: Op{
Action: ActionCreate,
Table: "uploads",
Data: []byte(`[{"status":"error","error_count":6,"results":{}}]`),
},
},
{
name: "malformed json is durable",
op: Op{
Action: ActionUpdate,
Table: "uploads",
Data: []byte(`{"status":"error","error_count":6,"results":{}}`),
},
},
{
name: "unexpected results shape is durable",
op: Op{
Action: ActionUpdate,
Table: "uploads",
Data: []byte(`[{"status":"error","error_count":6,"results":[]}]`),
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require.Equal(t, tt.want, isLegacyTransientUploadRetryOp(&tt.op))
})
}
}

func TestSuppressedRetryOpsDoNotPoisonReplayHistory(t *testing.T) {
db := SetupTestDB()

require.NoError(t, db.AutoMigrate(Upload{}))
require.NoError(t, db.Exec("TRUNCATE ops, uploads").Error)

z := zap.NewNop()
c := New("https://self.example", nil, nil, db, lifecycle.NewLifecycle(context.Background(), "crudr replay source test", z), z, nil).
RegisterModels(&Upload{})

require.NoError(t, c.ApplyOp(&Op{
ULID: "01KTC3Y9SW2GND1R4QZW2SAS10",
Host: "https://peer.example",
Action: ActionUpdate,
Table: "uploads",
Data: []byte(`[{"id":"upload-1","status":"busy","error_count":6,"results":{}}]`),
}))
require.NoError(t, c.ApplyOp(&Op{
ULID: "01KTC3Y9SW2GND1R4QZW2SAS11",
Host: "https://peer.example",
Action: ActionUpdate,
Table: "uploads",
Data: []byte(`[{"id":"upload-1","status":"done","error_count":6,"results":{"320":"cid-320"}}]`),
}))

var persisted []Op
require.NoError(t, db.Order("ulid ASC").Find(&persisted).Error)
require.Len(t, persisted, 1)
require.Equal(t, "01KTC3Y9SW2GND1R4QZW2SAS11", persisted[0].ULID)

var live Upload
require.NoError(t, db.First(&live, "id = ?", "upload-1").Error)
require.Equal(t, "done", live.Status)
require.Equal(t, "cid-320", live.Results["320"])

require.NoError(t, db.Exec("TRUNCATE ops, uploads").Error)

replay := New("https://fresh.example", nil, nil, db, lifecycle.NewLifecycle(context.Background(), "crudr replay target test", z), z, nil).
RegisterModels(&Upload{})
for i := range persisted {
require.NoError(t, replay.ApplyOp(&persisted[i]))
}

var replayed Upload
require.NoError(t, db.First(&replayed, "id = ?", "upload-1").Error)
require.Equal(t, "done", replayed.Status)
require.Equal(t, 6, replayed.ErrorCount)
require.Equal(t, "cid-320", replayed.Results["320"])
}
Loading