diff --git a/openapi/Swarm.yaml b/openapi/Swarm.yaml index 42859f88700..79dbd259cd3 100644 --- a/openapi/Swarm.yaml +++ b/openapi/Swarm.yaml @@ -877,6 +877,9 @@ paths: required: false description: ID of the postage batch to use. Either this or `swarm-postage-stamp` must be supplied. - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmTagParameter" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPinParameter" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmDeferredUpload" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress" requestBody: @@ -895,6 +898,10 @@ paths: schema: $ref: "SwarmCommon.yaml#/components/schemas/ReferenceResponse" headers: + "swarm-tag": + description: Tag UID, returned when an upload session is in use (either because `swarm-tag` was supplied, `swarm-deferred-upload` requested deferred mode, or `swarm-pin` was set). + schema: + $ref: "SwarmCommon.yaml#/components/schemas/Uid" "swarm-act-history-address": $ref: "SwarmCommon.yaml#/components/headers/SwarmActHistoryAddress" "400": @@ -903,6 +910,8 @@ paths: $ref: "SwarmCommon.yaml#/components/responses/401" "402": $ref: "SwarmCommon.yaml#/components/responses/402" + "404": + $ref: "SwarmCommon.yaml#/components/responses/404" "500": $ref: "SwarmCommon.yaml#/components/responses/500" default: diff --git a/pkg/api/soc.go b/pkg/api/soc.go index 07623e3ff52..e8a1a08c4de 100644 --- a/pkg/api/soc.go +++ b/pkg/api/soc.go @@ -8,6 +8,7 @@ import ( "bytes" "encoding/hex" "errors" + "fmt" "io" "net/http" "strconv" @@ -17,6 +18,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/jsonhttp" "github.com/ethersphere/bee/v2/pkg/postage" "github.com/ethersphere/bee/v2/pkg/soc" + "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storer" "github.com/ethersphere/bee/v2/pkg/swarm" "github.com/gorilla/mux" @@ -49,6 +51,9 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { headers := struct { BatchID []byte `map:"Swarm-Postage-Batch-Id"` StampSig []byte `map:"Swarm-Postage-Stamp"` + SwarmTag uint64 `map:"Swarm-Tag"` + Pin bool `map:"Swarm-Pin"` + Deferred *bool `map:"Swarm-Deferred-Upload"` Act bool `map:"Swarm-Act"` HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` }{} @@ -63,10 +68,39 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { return } + // Resolve the deferred upload mode. Historically /soc always pushed + // directly to the network; preserve that default when neither header is + // provided. An explicit Swarm-Deferred-Upload header wins, otherwise the + // presence of a Swarm-Tag opts the caller into deferred mode (matching + // /chunks' auto-defer semantics). var ( - putter storer.PutterSession - err error + tag uint64 + deferred bool + err error ) + switch { + case headers.Deferred != nil: + deferred = *headers.Deferred + case headers.SwarmTag > 0: + deferred = true + } + + if deferred || headers.Pin { + tag, err = s.getOrCreateSessionID(headers.SwarmTag) + if err != nil { + logger.Debug("get or create tag failed", "error", err) + logger.Error(nil, "get or create tag failed") + switch { + case errors.Is(err, storage.ErrNotFound): + jsonhttp.NotFound(w, "tag not found") + default: + jsonhttp.InternalServerError(w, "cannot get or create tag") + } + return + } + } + + var putter storer.PutterSession if len(headers.StampSig) != 0 { stamp := postage.Stamp{} @@ -80,16 +114,16 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { putter, err = s.newStampedPutter(r.Context(), putterOptions{ BatchID: stamp.BatchID(), - TagID: 0, - Pin: false, - Deferred: false, + TagID: tag, + Pin: headers.Pin, + Deferred: deferred, }, &stamp) } else { putter, err = s.newStamperPutter(r.Context(), putterOptions{ BatchID: headers.BatchID, - TagID: 0, - Pin: false, - Deferred: false, + TagID: tag, + Pin: headers.Pin, + Deferred: deferred, }) } if err != nil { @@ -206,9 +240,14 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { jsonhttp.InternalServerError(ow, "done split failed") return } + if tag != 0 { + w.Header().Set(SwarmTagHeader, fmt.Sprint(tag)) + } + + w.Header().Set(AccessControlExposeHeaders, SwarmTagHeader) if headers.Act { w.Header().Set(SwarmActHistoryAddressHeader, historyReference.String()) - w.Header().Set(AccessControlExposeHeaders, SwarmActHistoryAddressHeader) + w.Header().Add(AccessControlExposeHeaders, SwarmActHistoryAddressHeader) } jsonhttp.Created(w, socPostResponse{Reference: reference}) diff --git a/pkg/api/soc_test.go b/pkg/api/soc_test.go index b1240708300..30b0295a9aa 100644 --- a/pkg/api/soc_test.go +++ b/pkg/api/soc_test.go @@ -156,7 +156,6 @@ func TestSOC(t *testing.T) { DirectUpload: true, }) jsonhttptest.Request(t, client, http.MethodPost, socResource(hex.EncodeToString(s.Owner), hex.EncodeToString(s.ID), hex.EncodeToString(s.Signature)), http.StatusCreated, - jsonhttptest.WithRequestHeader(api.SwarmDeferredUploadHeader, "true"), jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, hexbatch), jsonhttptest.WithRequestBody(bytes.NewReader(s.WrappedChunk.Data())), ) @@ -166,6 +165,58 @@ func TestSOC(t *testing.T) { } }) + t.Run("deferred upload", func(t *testing.T) { + s := testingsoc.GenerateMockSOC(t, testData) + hexbatch := hex.EncodeToString(batchOk) + storer := mockstorer.New() + client, _, _, _ := newTestServer(t, testServerOptions{ + Storer: storer, + Post: newTestPostService(), + DirectUpload: true, + }) + jsonhttptest.Request(t, client, http.MethodPost, socResource(hex.EncodeToString(s.Owner), hex.EncodeToString(s.ID), hex.EncodeToString(s.Signature)), http.StatusCreated, + jsonhttptest.WithRequestHeader(api.SwarmDeferredUploadHeader, "true"), + jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, hexbatch), + jsonhttptest.WithRequestBody(bytes.NewReader(s.WrappedChunk.Data())), + ) + // Deferred upload goes through the upload store, not DirectUpload. + has, err := storer.ChunkStore().Has(context.Background(), s.Address()) + if err != nil { + t.Fatal(err) + } + if !has { + t.Fatal("expected chunk in storer after deferred upload") + } + }) + + t.Run("tag header opts in to deferred and echoes tag", func(t *testing.T) { + s := testingsoc.GenerateMockSOC(t, testData) + hexbatch := hex.EncodeToString(batchOk) + storer := mockstorer.New() + client, _, _, _ := newTestServer(t, testServerOptions{ + Storer: storer, + Post: newTestPostService(), + DirectUpload: true, + }) + tag, err := storer.NewSession() + if err != nil { + t.Fatalf("failed creating tag: %v", err) + } + jsonhttptest.Request(t, client, http.MethodPost, socResource(hex.EncodeToString(s.Owner), hex.EncodeToString(s.ID), hex.EncodeToString(s.Signature)), http.StatusCreated, + jsonhttptest.WithRequestHeader(api.SwarmTagHeader, fmt.Sprintf("%d", tag.TagID)), + jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, hexbatch), + jsonhttptest.WithRequestBody(bytes.NewReader(s.WrappedChunk.Data())), + jsonhttptest.WithExpectedResponseHeader(api.SwarmTagHeader, fmt.Sprintf("%d", tag.TagID)), + ) + has, err := storer.ChunkStore().Has(context.Background(), s.Address()) + if err != nil { + t.Fatal(err) + } + if !has { + t.Fatal("expected chunk in storer after tagged upload") + } + }) + // TestPreSignedUpload tests that chunk can be uploaded with pre-signed postage stamp t.Run("pre-signed upload", func(t *testing.T) { t.Parallel()