Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions openapi/Swarm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,9 @@ paths:
required: false
description: ID of the postage batch to use. Either this or `swarm-postage-stamp` must be supplied.
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmTagParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPinParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmDeferredUpload"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress"
requestBody:
Expand All @@ -895,6 +898,10 @@ paths:
schema:
$ref: "SwarmCommon.yaml#/components/schemas/ReferenceResponse"
headers:
"swarm-tag":
description: Tag UID, returned when an upload session is in use (either because `swarm-tag` was supplied, `swarm-deferred-upload` requested deferred mode, or `swarm-pin` was set).
schema:
$ref: "SwarmCommon.yaml#/components/schemas/Uid"
"swarm-act-history-address":
$ref: "SwarmCommon.yaml#/components/headers/SwarmActHistoryAddress"
"400":
Expand All @@ -903,6 +910,8 @@ paths:
$ref: "SwarmCommon.yaml#/components/responses/401"
"402":
$ref: "SwarmCommon.yaml#/components/responses/402"
"404":
$ref: "SwarmCommon.yaml#/components/responses/404"
"500":
$ref: "SwarmCommon.yaml#/components/responses/500"
default:
Expand Down
57 changes: 48 additions & 9 deletions pkg/api/soc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"bytes"
"encoding/hex"
"errors"
"fmt"
"io"
"net/http"
"strconv"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/postage"
"github.com/ethersphere/bee/v2/pkg/soc"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/gorilla/mux"
Expand Down Expand Up @@ -49,6 +51,9 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) {
headers := struct {
BatchID []byte `map:"Swarm-Postage-Batch-Id"`
StampSig []byte `map:"Swarm-Postage-Stamp"`
SwarmTag uint64 `map:"Swarm-Tag"`
Pin bool `map:"Swarm-Pin"`
Deferred *bool `map:"Swarm-Deferred-Upload"`
Act bool `map:"Swarm-Act"`
HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"`
}{}
Expand All @@ -63,10 +68,39 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) {
return
}

// Resolve the deferred upload mode. Historically /soc always pushed
// directly to the network; preserve that default when neither header is
// provided. An explicit Swarm-Deferred-Upload header wins, otherwise the
// presence of a Swarm-Tag opts the caller into deferred mode (matching
// /chunks' auto-defer semantics).
var (
putter storer.PutterSession
err error
tag uint64
deferred bool
err error
)
switch {
case headers.Deferred != nil:
deferred = *headers.Deferred
case headers.SwarmTag > 0:
deferred = true
}

if deferred || headers.Pin {
tag, err = s.getOrCreateSessionID(headers.SwarmTag)
if err != nil {
logger.Debug("get or create tag failed", "error", err)
logger.Error(nil, "get or create tag failed")
switch {
case errors.Is(err, storage.ErrNotFound):
jsonhttp.NotFound(w, "tag not found")
default:
jsonhttp.InternalServerError(w, "cannot get or create tag")
}
return
}
}

var putter storer.PutterSession

if len(headers.StampSig) != 0 {
stamp := postage.Stamp{}
Expand All @@ -80,16 +114,16 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) {

putter, err = s.newStampedPutter(r.Context(), putterOptions{
BatchID: stamp.BatchID(),
TagID: 0,
Pin: false,
Deferred: false,
TagID: tag,
Pin: headers.Pin,
Deferred: deferred,
}, &stamp)
} else {
putter, err = s.newStamperPutter(r.Context(), putterOptions{
BatchID: headers.BatchID,
TagID: 0,
Pin: false,
Deferred: false,
TagID: tag,
Pin: headers.Pin,
Deferred: deferred,
})
}
if err != nil {
Expand Down Expand Up @@ -206,9 +240,14 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) {
jsonhttp.InternalServerError(ow, "done split failed")
return
}
if tag != 0 {
w.Header().Set(SwarmTagHeader, fmt.Sprint(tag))
}

w.Header().Set(AccessControlExposeHeaders, SwarmTagHeader)
if headers.Act {
w.Header().Set(SwarmActHistoryAddressHeader, historyReference.String())
w.Header().Set(AccessControlExposeHeaders, SwarmActHistoryAddressHeader)
w.Header().Add(AccessControlExposeHeaders, SwarmActHistoryAddressHeader)
}

jsonhttp.Created(w, socPostResponse{Reference: reference})
Expand Down
53 changes: 52 additions & 1 deletion pkg/api/soc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ func TestSOC(t *testing.T) {
DirectUpload: true,
})
jsonhttptest.Request(t, client, http.MethodPost, socResource(hex.EncodeToString(s.Owner), hex.EncodeToString(s.ID), hex.EncodeToString(s.Signature)), http.StatusCreated,
jsonhttptest.WithRequestHeader(api.SwarmDeferredUploadHeader, "true"),
jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, hexbatch),
jsonhttptest.WithRequestBody(bytes.NewReader(s.WrappedChunk.Data())),
)
Expand All @@ -166,6 +165,58 @@ func TestSOC(t *testing.T) {
}
})

t.Run("deferred upload", func(t *testing.T) {
s := testingsoc.GenerateMockSOC(t, testData)
hexbatch := hex.EncodeToString(batchOk)
storer := mockstorer.New()
client, _, _, _ := newTestServer(t, testServerOptions{
Storer: storer,
Post: newTestPostService(),
DirectUpload: true,
})
jsonhttptest.Request(t, client, http.MethodPost, socResource(hex.EncodeToString(s.Owner), hex.EncodeToString(s.ID), hex.EncodeToString(s.Signature)), http.StatusCreated,
jsonhttptest.WithRequestHeader(api.SwarmDeferredUploadHeader, "true"),
jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, hexbatch),
jsonhttptest.WithRequestBody(bytes.NewReader(s.WrappedChunk.Data())),
)
// Deferred upload goes through the upload store, not DirectUpload.
has, err := storer.ChunkStore().Has(context.Background(), s.Address())
if err != nil {
t.Fatal(err)
}
if !has {
t.Fatal("expected chunk in storer after deferred upload")
}
})

t.Run("tag header opts in to deferred and echoes tag", func(t *testing.T) {
s := testingsoc.GenerateMockSOC(t, testData)
hexbatch := hex.EncodeToString(batchOk)
storer := mockstorer.New()
client, _, _, _ := newTestServer(t, testServerOptions{
Storer: storer,
Post: newTestPostService(),
DirectUpload: true,
})
tag, err := storer.NewSession()
if err != nil {
t.Fatalf("failed creating tag: %v", err)
}
jsonhttptest.Request(t, client, http.MethodPost, socResource(hex.EncodeToString(s.Owner), hex.EncodeToString(s.ID), hex.EncodeToString(s.Signature)), http.StatusCreated,
jsonhttptest.WithRequestHeader(api.SwarmTagHeader, fmt.Sprintf("%d", tag.TagID)),
jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, hexbatch),
jsonhttptest.WithRequestBody(bytes.NewReader(s.WrappedChunk.Data())),
jsonhttptest.WithExpectedResponseHeader(api.SwarmTagHeader, fmt.Sprintf("%d", tag.TagID)),
)
has, err := storer.ChunkStore().Has(context.Background(), s.Address())
if err != nil {
t.Fatal(err)
}
if !has {
t.Fatal("expected chunk in storer after tagged upload")
}
})

// TestPreSignedUpload tests that chunk can be uploaded with pre-signed postage stamp
t.Run("pre-signed upload", func(t *testing.T) {
t.Parallel()
Expand Down
Loading