diff --git a/openapi/Swarm.yaml b/openapi/Swarm.yaml index 8028998e970..76b866819a3 100644 --- a/openapi/Swarm.yaml +++ b/openapi/Swarm.yaml @@ -59,6 +59,11 @@ paths: $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress" name: swarm-act-history-address required: false + - in: header + schema: + $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + name: swarm-redundancy-level + required: false requestBody: required: true content: @@ -89,6 +94,11 @@ paths: $ref: "SwarmCommon.yaml#/components/schemas/SwarmEncryptedReference" required: true description: Grantee list reference + - in: header + schema: + $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + name: swarm-redundancy-level + required: false responses: "200": description: OK @@ -139,6 +149,11 @@ paths: $ref: "SwarmCommon.yaml#/components/parameters/SwarmDeferredUpload" name: swarm-deferred-upload required: false + - in: header + schema: + $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + name: swarm-redundancy-level + required: false requestBody: required: true content: @@ -281,6 +296,12 @@ paths: - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress" + - in: header + schema: + $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + name: swarm-redundancy-level + required: false + description: Redundancy level for ACT encryption only requestBody: description: Chunk binary data containing at least 8 bytes. content: @@ -659,6 +680,12 @@ paths: summary: Pin a root hash by reference tags: - Pinning + parameters: + - in: header + schema: + $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + name: swarm-redundancy-level + required: false responses: "200": description: Pin already exists @@ -867,6 +894,12 @@ paths: - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress" + - in: header + schema: + $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + name: swarm-redundancy-level + required: false + description: Redundancy level for ACT encryption only requestBody: required: true description: The SOC binary data, composed of the span (8 bytes) and up to 4KB of payload. @@ -965,6 +998,12 @@ paths: - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress" + - in: header + schema: + $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + name: swarm-redundancy-level + required: false + description: Redundancy level for ACT encryption only responses: "201": description: Created @@ -1064,6 +1103,11 @@ paths: $ref: "SwarmCommon.yaml#/components/schemas/SwarmReference" required: true description: "Root hash of content (can be of any type: collection, file, chunk)" + - in: header + schema: + $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + name: swarm-redundancy-level + required: false responses: "200": description: Returns if the content is retrievable @@ -1093,6 +1137,11 @@ paths: $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId" name: swarm-postage-batch-id description: Postage batch to use for re-upload. If none is provided and the file was uploaded on the same node before, it will reuse the same batch. If not found, it will return error. If a new batch is provided, the chunks are stamped again with the new batch. + - in: header + schema: + $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + name: swarm-redundancy-level + required: false responses: "200": description: OK diff --git a/pkg/accesscontrol/controller_test.go b/pkg/accesscontrol/controller_test.go index c282ef825fb..4d3e0a2a529 100644 --- a/pkg/accesscontrol/controller_test.go +++ b/pkg/accesscontrol/controller_test.go @@ -183,7 +183,7 @@ func TestController_UpdateHandler(t *testing.T) { assertNoError(t, "Session key", err) refCipher := encryption.New(keys[0], 0, 0, sha3.NewLegacyKeccak256) ls := createLs() - gls := loadsave.New(mockStorer.ChunkStore(), mockStorer.Cache(), requestPipelineFactory(context.Background(), mockStorer.Cache(), true, redundancy.NONE), redundancy.DefaultLevel) + gls := loadsave.New(mockStorer.ChunkStore(), mockStorer.Cache(), requestPipelineFactory(context.Background(), mockStorer.Cache(), true, redundancy.NONE), redundancy.DefaultDownloadLevel) c := accesscontrol.NewController(al) href, err := getHistoryFixture(t, ctx, ls, al, &publisher.PublicKey) assertNoError(t, "history fixture create", err) @@ -310,7 +310,7 @@ func TestController_Get(t *testing.T) { al1 := accesscontrol.NewLogic(diffieHellman1) al2 := accesscontrol.NewLogic(diffieHellman2) ls := createLs() - gls := loadsave.New(mockStorer.ChunkStore(), mockStorer.Cache(), requestPipelineFactory(context.Background(), mockStorer.Cache(), true, redundancy.NONE), redundancy.DefaultLevel) + gls := loadsave.New(mockStorer.ChunkStore(), mockStorer.Cache(), requestPipelineFactory(context.Background(), mockStorer.Cache(), true, redundancy.NONE), redundancy.DefaultDownloadLevel) c1 := accesscontrol.NewController(al1) c2 := accesscontrol.NewController(al2) diff --git a/pkg/accesscontrol/grantee_test.go b/pkg/accesscontrol/grantee_test.go index e11f2c5a555..33f4285d409 100644 --- a/pkg/accesscontrol/grantee_test.go +++ b/pkg/accesscontrol/grantee_test.go @@ -32,7 +32,7 @@ func requestPipelineFactory(ctx context.Context, s storage.Putter, encrypt bool, } func createLs() file.LoadSaver { - return loadsave.New(mockStorer.ChunkStore(), mockStorer.Cache(), requestPipelineFactory(context.Background(), mockStorer.Cache(), false, redundancy.NONE), redundancy.DefaultLevel) + return loadsave.New(mockStorer.ChunkStore(), mockStorer.Cache(), requestPipelineFactory(context.Background(), mockStorer.Cache(), false, redundancy.NONE), redundancy.DefaultDownloadLevel) } func generateKeyListFixture() ([]*ecdsa.PublicKey, error) { diff --git a/pkg/accesscontrol/history_test.go b/pkg/accesscontrol/history_test.go index ab976270205..d4218491f47 100644 --- a/pkg/accesscontrol/history_test.go +++ b/pkg/accesscontrol/history_test.go @@ -38,7 +38,7 @@ func TestSingleNodeHistoryLookup(t *testing.T) { t.Parallel() storer := mockstorer.New() ctx := context.Background() - ls := loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false), redundancy.DefaultLevel) + ls := loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false), redundancy.DefaultDownloadLevel) h, err := accesscontrol.NewHistory(ls) assertNoError(t, "create history", err) @@ -62,7 +62,7 @@ func TestMultiNodeHistoryLookup(t *testing.T) { t.Parallel() storer := mockstorer.New() ctx := context.Background() - ls := loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false), redundancy.DefaultLevel) + ls := loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false), redundancy.DefaultDownloadLevel) h, err := accesscontrol.NewHistory(ls) assertNoError(t, "create history", err) @@ -134,7 +134,7 @@ func TestHistoryStore(t *testing.T) { t.Parallel() storer := mockstorer.New() ctx := context.Background() - ls := loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false), redundancy.DefaultLevel) + ls := loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false), redundancy.DefaultDownloadLevel) h1, err := accesscontrol.NewHistory(ls) assertNoError(t, "create history", err) diff --git a/pkg/accesscontrol/kvs/kvs_test.go b/pkg/accesscontrol/kvs/kvs_test.go index d61c9b9b9cd..65caf50a641 100644 --- a/pkg/accesscontrol/kvs/kvs_test.go +++ b/pkg/accesscontrol/kvs/kvs_test.go @@ -31,7 +31,7 @@ func requestPipelineFactory(ctx context.Context, s storage.Putter, encrypt bool, } func createLs() file.LoadSaver { - return loadsave.New(mockStorer.ChunkStore(), mockStorer.Cache(), requestPipelineFactory(context.Background(), mockStorer.Cache(), false, redundancy.NONE), redundancy.DefaultLevel) + return loadsave.New(mockStorer.ChunkStore(), mockStorer.Cache(), requestPipelineFactory(context.Background(), mockStorer.Cache(), false, redundancy.NONE), redundancy.DefaultDownloadLevel) } func keyValuePair(t *testing.T) ([]byte, []byte) { diff --git a/pkg/accesscontrol/mock/controller.go b/pkg/accesscontrol/mock/controller.go index 51df0fce8dc..457dd6a0cf4 100644 --- a/pkg/accesscontrol/mock/controller.go +++ b/pkg/accesscontrol/mock/controller.go @@ -55,7 +55,7 @@ func New(o ...Option) accesscontrol.Controller { refMap: make(map[string]swarm.Address), publisher: "", encrypter: encryption.New(encryption.Key("b6ee086390c280eeb9824c331a4427596f0c8510d5564bc1b6168d0059a46e2b"), 0, 0, sha3.NewLegacyKeccak256), - ls: loadsave.New(storer.ChunkStore(), storer.Cache(), requestPipelineFactory(context.Background(), storer.Cache(), false, redundancy.NONE), redundancy.DefaultLevel), + ls: loadsave.New(storer.ChunkStore(), storer.Cache(), requestPipelineFactory(context.Background(), storer.Cache(), false, redundancy.NONE), redundancy.DefaultDownloadLevel), } for _, v := range o { v.apply(m) diff --git a/pkg/api/accesscontrol.go b/pkg/api/accesscontrol.go index 1bc4b03a6c3..d893f1b0673 100644 --- a/pkg/api/accesscontrol.go +++ b/pkg/api/accesscontrol.go @@ -100,10 +100,11 @@ func (s *Service) actDecryptionHandler() func(h http.Handler) http.Handler { } headers := struct { - Timestamp *int64 `map:"Swarm-Act-Timestamp"` - Publisher *ecdsa.PublicKey `map:"Swarm-Act-Publisher"` - HistoryAddress *swarm.Address `map:"Swarm-Act-History-Address"` - Cache *bool `map:"Swarm-Cache"` + Timestamp *int64 `map:"Swarm-Act-Timestamp"` + Publisher *ecdsa.PublicKey `map:"Swarm-Act-Publisher"` + HistoryAddress *swarm.Address `map:"Swarm-Act-History-Address"` + Cache *bool `map:"Swarm-Cache"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level" validate:"omitempty,rLevel"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -125,8 +126,14 @@ func (s *Service) actDecryptionHandler() func(h http.Handler) http.Handler { if headers.Cache != nil { cache = *headers.Cache } + + rLevel := redundancy.DefaultDownloadLevel + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + ctx := r.Context() - ls := loadsave.NewReadonly(s.storer.Download(cache), s.storer.Cache(), redundancy.DefaultLevel) + ls := loadsave.NewReadonly(s.storer.Download(cache), s.storer.Cache(), rLevel) reference, err := s.accesscontrol.DownloadHandler(ctx, ls, paths.Address, headers.Publisher, *headers.HistoryAddress, timestamp) if err != nil { logger.Debug("access control download failed", "error", err) @@ -157,9 +164,10 @@ func (s *Service) actEncryptionHandler( putter storer.PutterSession, reference swarm.Address, historyRootHash swarm.Address, + rLevel redundancy.Level, ) (swarm.Address, swarm.Address, error) { publisherPublicKey := &s.publicKey - ls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, false, redundancy.NONE), redundancy.DefaultLevel) + ls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, false, redundancy.NONE), rLevel) storageReference, historyReference, encryptedReference, err := s.accesscontrol.UploadHandler(ctx, ls, reference, publisherPublicKey, historyRootHash) if err != nil { return swarm.ZeroAddress, swarm.ZeroAddress, err @@ -193,7 +201,8 @@ func (s *Service) actListGranteesHandler(w http.ResponseWriter, r *http.Request) } headers := struct { - Cache *bool `map:"Swarm-Cache"` + Cache *bool `map:"Swarm-Cache"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level" validate:"omitempty,rLevel"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -203,8 +212,14 @@ func (s *Service) actListGranteesHandler(w http.ResponseWriter, r *http.Request) if headers.Cache != nil { cache = *headers.Cache } + + rLevel := redundancy.DefaultDownloadLevel + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + publisher := &s.publicKey - ls := loadsave.NewReadonly(s.storer.Download(cache), s.storer.Cache(), redundancy.DefaultLevel) + ls := loadsave.NewReadonly(s.storer.Download(cache), s.storer.Cache(), rLevel) grantees, err := s.accesscontrol.Get(r.Context(), ls, publisher, paths.GranteesAddress) if err != nil { logger.Debug("could not get grantees", "error", err) @@ -239,11 +254,12 @@ func (s *Service) actGrantRevokeHandler(w http.ResponseWriter, r *http.Request) } headers := struct { - BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` - SwarmTag uint64 `map:"Swarm-Tag"` - Pin bool `map:"Swarm-Pin"` - Deferred *bool `map:"Swarm-Deferred-Upload"` - HistoryAddress *swarm.Address `map:"Swarm-Act-History-Address" validate:"required"` + BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` + SwarmTag uint64 `map:"Swarm-Tag"` + Pin bool `map:"Swarm-Pin"` + Deferred *bool `map:"Swarm-Deferred-Upload"` + HistoryAddress *swarm.Address `map:"Swarm-Act-History-Address" validate:"required"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level" validate:"omitempty,rLevel"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -255,6 +271,11 @@ func (s *Service) actGrantRevokeHandler(w http.ResponseWriter, r *http.Request) historyAddress = *headers.HistoryAddress } + rLevel := redundancy.DefaultUploadLevel + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + var ( tag uint64 err error @@ -342,8 +363,8 @@ func (s *Service) actGrantRevokeHandler(w http.ResponseWriter, r *http.Request) granteeref := paths.GranteesAddress publisher := &s.publicKey - ls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, false, redundancy.NONE), redundancy.DefaultLevel) - gls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, granteeListEncrypt, redundancy.NONE), redundancy.DefaultLevel) + ls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, false, redundancy.NONE), rLevel) + gls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, granteeListEncrypt, redundancy.NONE), rLevel) granteeref, encryptedglref, historyref, actref, err := s.accesscontrol.UpdateHandler(ctx, ls, gls, granteeref, historyAddress, publisher, grantees.Addlist, grantees.Revokelist) if err != nil { logger.Debug("failed to update grantee list", "error", err) @@ -403,11 +424,12 @@ func (s *Service) actCreateGranteesHandler(w http.ResponseWriter, r *http.Reques } headers := struct { - BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` - SwarmTag uint64 `map:"Swarm-Tag"` - Pin bool `map:"Swarm-Pin"` - Deferred *bool `map:"Swarm-Deferred-Upload"` - HistoryAddress *swarm.Address `map:"Swarm-Act-History-Address"` + BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` + SwarmTag uint64 `map:"Swarm-Tag"` + Pin bool `map:"Swarm-Pin"` + Deferred *bool `map:"Swarm-Deferred-Upload"` + HistoryAddress *swarm.Address `map:"Swarm-Act-History-Address"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level" validate:"omitempty,rLevel"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -419,6 +441,11 @@ func (s *Service) actCreateGranteesHandler(w http.ResponseWriter, r *http.Reques historyAddress = *headers.HistoryAddress } + rLevel := redundancy.DefaultUploadLevel + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + var ( tag uint64 err error @@ -494,8 +521,8 @@ func (s *Service) actCreateGranteesHandler(w http.ResponseWriter, r *http.Reques } publisher := &s.publicKey - ls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, false, redundancy.NONE), redundancy.DefaultLevel) - gls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, granteeListEncrypt, redundancy.NONE), redundancy.DefaultLevel) + ls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, false, redundancy.NONE), rLevel) + gls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, granteeListEncrypt, redundancy.NONE), rLevel) granteeref, encryptedglref, historyref, actref, err := s.accesscontrol.UpdateHandler(ctx, ls, gls, swarm.ZeroAddress, historyAddress, publisher, list, nil) if err != nil { logger.Debug("failed to create grantee list", "error", err) diff --git a/pkg/api/accesscontrol_test.go b/pkg/api/accesscontrol_test.go index 737a47ebc11..1fdfd25cc44 100644 --- a/pkg/api/accesscontrol_test.go +++ b/pkg/api/accesscontrol_test.go @@ -35,7 +35,7 @@ import ( //nolint:ireturn func prepareHistoryFixture(storer api.Storer) (accesscontrol.History, swarm.Address) { ctx := context.Background() - ls := loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false, redundancy.NONE), redundancy.DefaultLevel) + ls := loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false, redundancy.NONE), redundancy.DefaultDownloadLevel) h, _ := accesscontrol.NewHistory(ls) @@ -170,6 +170,7 @@ func TestAccessLogicEachEndpointWithAct(t *testing.T) { jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, batchOkStr), jsonhttptest.WithRequestHeader(api.SwarmPinHeader, "true"), jsonhttptest.WithRequestHeader(api.SwarmTagHeader, fmt.Sprintf("%d", tag.TagID)), + jsonhttptest.WithRequestHeader(api.SwarmRedundancyLevelHeader, "0"), jsonhttptest.WithRequestBody(v.data), jsonhttptest.WithExpectedJSONResponse(v.resp), jsonhttptest.WithRequestHeader(api.ContentTypeHeader, v.contenttype), diff --git a/pkg/api/bytes.go b/pkg/api/bytes.go index 314e39fcb56..7ba80efdc06 100644 --- a/pkg/api/bytes.go +++ b/pkg/api/bytes.go @@ -35,20 +35,25 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) { defer span.Finish() headers := struct { - BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` - SwarmTag uint64 `map:"Swarm-Tag"` - Pin bool `map:"Swarm-Pin"` - Deferred *bool `map:"Swarm-Deferred-Upload"` - Encrypt bool `map:"Swarm-Encrypt"` - RLevel redundancy.Level `map:"Swarm-Redundancy-Level" validate:"rLevel"` - Act bool `map:"Swarm-Act"` - HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` + BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` + SwarmTag uint64 `map:"Swarm-Tag"` + Pin bool `map:"Swarm-Pin"` + Deferred *bool `map:"Swarm-Deferred-Upload"` + Encrypt bool `map:"Swarm-Encrypt"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level" validate:"omitempty,rLevel"` + Act bool `map:"Swarm-Act"` + HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) return } + rLevel := redundancy.DefaultUploadLevel + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + var ( tag uint64 err error @@ -103,7 +108,7 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) { logger: logger, } - p := requestPipelineFn(putter, headers.Encrypt, headers.RLevel) + p := requestPipelineFn(putter, headers.Encrypt, rLevel) reference, err := p(ctx, r.Body) if err != nil { logger.Debug("split write all failed", "error", err) @@ -121,7 +126,7 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) { encryptedReference := reference historyReference := swarm.ZeroAddress if headers.Act { - encryptedReference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, headers.HistoryAddress) + encryptedReference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, headers.HistoryAddress, rLevel) if err != nil { logger.Debug("access control upload failed", "error", err) logger.Error(nil, "access control upload failed") diff --git a/pkg/api/bytes_test.go b/pkg/api/bytes_test.go index 9351dc511d0..5a15694ea01 100644 --- a/pkg/api/bytes_test.go +++ b/pkg/api/bytes_test.go @@ -57,6 +57,7 @@ func TestBytes(t *testing.T) { jsonhttptest.Request(t, client, http.MethodPost, resource, http.StatusCreated, jsonhttptest.WithRequestHeader(api.SwarmDeferredUploadHeader, "true"), jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, batchOkStr), + jsonhttptest.WithRequestHeader(api.SwarmRedundancyLevelHeader, "0"), jsonhttptest.WithRequestBody(bytes.NewReader(content)), jsonhttptest.WithExpectedJSONResponse(api.BytesPostResponse{ Reference: chunkAddr, diff --git a/pkg/api/bzz.go b/pkg/api/bzz.go index a365212a4ec..a3be8d00be8 100644 --- a/pkg/api/bzz.go +++ b/pkg/api/bzz.go @@ -303,7 +303,7 @@ func (s *Service) fileUploadHandler( reference := manifestReference historyReference := swarm.ZeroAddress if act { - reference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, historyAddress) + reference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, historyAddress, rLevel) if err != nil { logger.Debug("access control upload failed", "error", err) logger.Error(nil, "access control upload failed") @@ -525,7 +525,7 @@ func (s *Service) serveReference(logger log.Logger, address swarm.Address, pathV cache = *headers.Cache } - rLevel := redundancy.DefaultLevel + rLevel := redundancy.DefaultDownloadLevel if headers.RLevel != nil { rLevel = *headers.RLevel } @@ -748,7 +748,7 @@ func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *h jsonhttp.BadRequest(w, "could not parse headers") return } - rLevel := redundancy.DefaultLevel + rLevel := redundancy.DefaultDownloadLevel if headers.RLevel != nil { rLevel = *headers.RLevel } diff --git a/pkg/api/bzz_test.go b/pkg/api/bzz_test.go index 589222c3f77..3a122c9d0ed 100644 --- a/pkg/api/bzz_test.go +++ b/pkg/api/bzz_test.go @@ -877,7 +877,7 @@ func TestFeedIndirection(t *testing.T) { } m, err := manifest.NewDefaultManifest( - loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false, 0), redundancy.DefaultLevel), + loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false, 0), redundancy.DefaultDownloadLevel), false, ) if err != nil { diff --git a/pkg/api/chunk.go b/pkg/api/chunk.go index 6ddb2a50b10..80130df2d31 100644 --- a/pkg/api/chunk.go +++ b/pkg/api/chunk.go @@ -14,6 +14,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/accesscontrol" "github.com/ethersphere/bee/v2/pkg/cac" + "github.com/ethersphere/bee/v2/pkg/file/redundancy" "github.com/ethersphere/bee/v2/pkg/soc" "github.com/ethersphere/bee/v2/pkg/storer" @@ -185,7 +186,7 @@ func (s *Service) chunkUploadHandler(w http.ResponseWriter, r *http.Request) { reference := chunk.Address() historyReference := swarm.ZeroAddress if headers.Act { - reference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, headers.HistoryAddress) + reference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, headers.HistoryAddress, redundancy.DefaultUploadLevel) if err != nil { logger.Debug("access control upload failed", "error", err) logger.Error(nil, "access control upload failed") diff --git a/pkg/api/dirs.go b/pkg/api/dirs.go index ae0f15d23c0..d3fa10ab1d3 100644 --- a/pkg/api/dirs.go +++ b/pkg/api/dirs.go @@ -103,7 +103,7 @@ func (s *Service) dirUploadHandler( encryptedReference := reference historyReference := swarm.ZeroAddress if act { - encryptedReference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, historyAddress) + encryptedReference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, historyAddress, rLevel) if err != nil { logger.Debug("access control upload failed", "error", err) logger.Error(nil, "access control upload failed") diff --git a/pkg/api/dirs_test.go b/pkg/api/dirs_test.go index bcf4f1ea1c5..3f7ac099ba5 100644 --- a/pkg/api/dirs_test.go +++ b/pkg/api/dirs_test.go @@ -283,7 +283,7 @@ func TestDirs(t *testing.T) { // verify manifest content verifyManifest, err := manifest.NewDefaultManifestReference( resp.Reference, - loadsave.NewReadonly(storer.ChunkStore(), storer.Cache(), redundancy.DefaultLevel), + loadsave.NewReadonly(storer.ChunkStore(), storer.Cache(), redundancy.DefaultDownloadLevel), ) if err != nil { t.Fatal(err) diff --git a/pkg/api/feed.go b/pkg/api/feed.go index 1b9f1367acf..12c87736896 100644 --- a/pkg/api/feed.go +++ b/pkg/api/feed.go @@ -168,17 +168,23 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) { } headers := struct { - BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` - Pin bool `map:"Swarm-Pin"` - Deferred *bool `map:"Swarm-Deferred-Upload"` - Act bool `map:"Swarm-Act"` - HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` + BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` + Pin bool `map:"Swarm-Pin"` + Deferred *bool `map:"Swarm-Deferred-Upload"` + Act bool `map:"Swarm-Act"` + HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level" validate:"omitempty,rLevel"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) return } + rLevel := redundancy.DefaultUploadLevel + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + var ( tag storer.SessionInfo err error @@ -227,7 +233,7 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) { logger: logger, } - l := loadsave.New(s.storer.ChunkStore(), s.storer.Cache(), requestPipelineFactory(r.Context(), putter, false, 0), redundancy.DefaultLevel) + l := loadsave.New(s.storer.ChunkStore(), s.storer.Cache(), requestPipelineFactory(r.Context(), putter, false, rLevel), rLevel) feedManifest, err := manifest.NewDefaultManifest(l, false) if err != nil { logger.Debug("create manifest failed", "error", err) @@ -279,7 +285,7 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) { encryptedReference := ref historyReference := swarm.ZeroAddress if headers.Act { - encryptedReference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, ref, headers.HistoryAddress) + encryptedReference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, ref, headers.HistoryAddress, rLevel) if err != nil { logger.Debug("access control upload failed", "error", err) logger.Error(nil, "access control upload failed") diff --git a/pkg/api/feed_test.go b/pkg/api/feed_test.go index 3f08e063b85..4429d1dc7c2 100644 --- a/pkg/api/feed_test.go +++ b/pkg/api/feed_test.go @@ -280,7 +280,7 @@ func TestFeed_Post(t *testing.T) { }), ) - ls := loadsave.NewReadonly(mockStorer.ChunkStore(), mockStorer.Cache(), redundancy.DefaultLevel) + ls := loadsave.NewReadonly(mockStorer.ChunkStore(), mockStorer.Cache(), redundancy.DefaultDownloadLevel) i, err := manifest.NewMantarayManifestReference(expReference, ls) if err != nil { t.Fatal(err) diff --git a/pkg/api/pin.go b/pkg/api/pin.go index 696f5185150..9ce1bf053bd 100644 --- a/pkg/api/pin.go +++ b/pkg/api/pin.go @@ -32,6 +32,19 @@ func (s *Service) pinRootHash(w http.ResponseWriter, r *http.Request) { return } + headers := struct { + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level" validate:"omitempty,rLevel"` + }{} + if response := s.mapStructure(r.Header, &headers); response != nil { + response("invalid header params", logger, w) + return + } + + rLevel := redundancy.DefaultDownloadLevel + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + has, err := s.storer.HasPin(paths.Reference) if err != nil { logger.Debug("pin root hash: has pin failed", "chunk_address", paths.Reference, "error", err) @@ -53,7 +66,7 @@ func (s *Service) pinRootHash(w http.ResponseWriter, r *http.Request) { } getter := s.storer.Download(true) - traverser := traversal.New(getter, s.storer.Cache(), redundancy.DefaultLevel) + traverser := traversal.New(getter, s.storer.Cache()) sem := semaphore.NewWeighted(100) var errTraverse error @@ -93,6 +106,7 @@ func (s *Service) pinRootHash(w http.ResponseWriter, r *http.Request) { }() return nil }, + rLevel, ) wg.Wait() diff --git a/pkg/api/soc.go b/pkg/api/soc.go index 07623e3ff52..00309ea3043 100644 --- a/pkg/api/soc.go +++ b/pkg/api/soc.go @@ -14,6 +14,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/accesscontrol" "github.com/ethersphere/bee/v2/pkg/cac" + "github.com/ethersphere/bee/v2/pkg/file/redundancy" "github.com/ethersphere/bee/v2/pkg/jsonhttp" "github.com/ethersphere/bee/v2/pkg/postage" "github.com/ethersphere/bee/v2/pkg/soc" @@ -47,10 +48,11 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { } headers := struct { - BatchID []byte `map:"Swarm-Postage-Batch-Id"` - StampSig []byte `map:"Swarm-Postage-Stamp"` - Act bool `map:"Swarm-Act"` - HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` + BatchID []byte `map:"Swarm-Postage-Batch-Id"` + StampSig []byte `map:"Swarm-Postage-Stamp"` + Act bool `map:"Swarm-Act"` + HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level" validate:"omitempty,rLevel"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -181,7 +183,11 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { reference := sch.Address() historyReference := swarm.ZeroAddress if headers.Act { - reference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, headers.HistoryAddress) + rLevel := redundancy.DefaultUploadLevel + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + reference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, headers.HistoryAddress, rLevel) if err != nil { logger.Debug("access control upload failed", "error", err) logger.Error(nil, "access control upload failed") diff --git a/pkg/api/stewardship.go b/pkg/api/stewardship.go index b11b5ea5a6c..9767334baca 100644 --- a/pkg/api/stewardship.go +++ b/pkg/api/stewardship.go @@ -8,6 +8,7 @@ import ( "errors" "net/http" + "github.com/ethersphere/bee/v2/pkg/file/redundancy" "github.com/ethersphere/bee/v2/pkg/postage" storage "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/swarm" @@ -29,13 +30,19 @@ func (s *Service) stewardshipPutHandler(w http.ResponseWriter, r *http.Request) } headers := struct { - BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` + BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level" validate:"omitempty,rLevel"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) return } + rLevel := redundancy.DefaultUploadLevel + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + var ( batchID []byte err error @@ -57,7 +64,7 @@ func (s *Service) stewardshipPutHandler(w http.ResponseWriter, r *http.Request) return } - err = s.steward.Reupload(r.Context(), paths.Address, stamper) + err = s.steward.Reupload(r.Context(), paths.Address, stamper, rLevel) if err != nil { logger.Debug("re-upload failed", "chunk_address", paths.Address, "error", err) logger.Error(nil, "re-upload failed") @@ -91,7 +98,20 @@ func (s *Service) stewardshipGetHandler(w http.ResponseWriter, r *http.Request) return } - res, err := s.steward.IsRetrievable(r.Context(), paths.Address) + headers := struct { + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level" validate:"omitempty,rLevel"` + }{} + if response := s.mapStructure(r.Header, &headers); response != nil { + response("invalid header params", logger, w) + return + } + + rLevel := redundancy.DefaultDownloadLevel + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + + res, err := s.steward.IsRetrievable(r.Context(), paths.Address, rLevel) if err != nil { logger.Debug("is retrievable check failed", "chunk_address", paths.Address, "error", err) logger.Error(nil, "is retrievable") diff --git a/pkg/file/addresses/addresses_getter_test.go b/pkg/file/addresses/addresses_getter_test.go index 3ad1c8237e7..5f6f0678882 100644 --- a/pkg/file/addresses/addresses_getter_test.go +++ b/pkg/file/addresses/addresses_getter_test.go @@ -64,7 +64,7 @@ func TestAddressesGetterIterateChunkAddresses(t *testing.T) { addressesGetter := addresses.NewGetter(store, addressIterFunc) - j, _, err := joiner.New(ctx, addressesGetter, store, rootChunk.Address(), redundancy.DefaultLevel) + j, _, err := joiner.New(ctx, addressesGetter, store, rootChunk.Address(), redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } diff --git a/pkg/file/file_test.go b/pkg/file/file_test.go index 2a5cc038631..810aa284740 100644 --- a/pkg/file/file_test.go +++ b/pkg/file/file_test.go @@ -63,7 +63,7 @@ func testSplitThenJoin(t *testing.T) { } // then join - r, l, err := joiner.New(ctx, store, store, resultAddress, redundancy.DefaultLevel) + r, l, err := joiner.New(ctx, store, store, resultAddress, redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } diff --git a/pkg/file/joiner/joiner_test.go b/pkg/file/joiner/joiner_test.go index bff2b3d2aa6..dd1a7e4228e 100644 --- a/pkg/file/joiner/joiner_test.go +++ b/pkg/file/joiner/joiner_test.go @@ -43,7 +43,7 @@ func TestJoiner_ErrReferenceLength(t *testing.T) { t.Parallel() store := inmemchunkstore.New() - _, _, err := joiner.New(context.Background(), store, store, swarm.ZeroAddress, redundancy.DefaultLevel) + _, _, err := joiner.New(context.Background(), store, store, swarm.ZeroAddress, redundancy.DefaultDownloadLevel) if !errors.Is(err, storage.ErrReferenceLength) { t.Fatalf("expected ErrReferenceLength %x but got %v", swarm.ZeroAddress, err) @@ -72,7 +72,7 @@ func TestJoinerSingleChunk(t *testing.T) { } // read back data and compare - joinReader, l, err := joiner.New(ctx, store, store, mockAddr, redundancy.DefaultLevel) + joinReader, l, err := joiner.New(ctx, store, store, mockAddr, redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -110,7 +110,7 @@ func TestJoinerDecryptingStore_NormalChunk(t *testing.T) { } // read back data and compare - joinReader, l, err := joiner.New(ctx, st, st, mockAddr, redundancy.DefaultLevel) + joinReader, l, err := joiner.New(ctx, st, st, mockAddr, redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -157,7 +157,7 @@ func TestJoinerWithReference(t *testing.T) { } // read back data and compare - joinReader, l, err := joiner.New(ctx, st, st, rootChunk.Address(), redundancy.DefaultLevel) + joinReader, l, err := joiner.New(ctx, st, st, rootChunk.Address(), redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -212,7 +212,7 @@ func TestJoinerMalformed(t *testing.T) { t.Fatal(err) } - joinReader, _, err := joiner.New(ctx, store, store, rootChunk.Address(), redundancy.DefaultLevel) + joinReader, _, err := joiner.New(ctx, store, store, rootChunk.Address(), redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -257,7 +257,7 @@ func TestEncryptDecrypt(t *testing.T) { if err != nil { t.Fatal(err) } - reader, l, err := joiner.New(context.Background(), store, store, resultAddress, redundancy.DefaultLevel) + reader, l, err := joiner.New(context.Background(), store, store, resultAddress, redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -343,7 +343,7 @@ func TestSeek(t *testing.T) { t.Fatal(err) } - j, _, err := joiner.New(ctx, store, store, addr, redundancy.DefaultLevel) + j, _, err := joiner.New(ctx, store, store, addr, redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -619,7 +619,7 @@ func TestPrefetch(t *testing.T) { t.Fatal(err) } - j, _, err := joiner.New(ctx, store, store, addr, redundancy.DefaultLevel) + j, _, err := joiner.New(ctx, store, store, addr, redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -667,7 +667,7 @@ func TestJoinerReadAt(t *testing.T) { t.Fatal(err) } - j, _, err := joiner.New(ctx, store, store, rootChunk.Address(), redundancy.DefaultLevel) + j, _, err := joiner.New(ctx, store, store, rootChunk.Address(), redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -715,7 +715,7 @@ func TestJoinerOneLevel(t *testing.T) { t.Fatal(err) } - j, _, err := joiner.New(ctx, store, store, rootChunk.Address(), redundancy.DefaultLevel) + j, _, err := joiner.New(ctx, store, store, rootChunk.Address(), redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -809,7 +809,7 @@ func TestJoinerTwoLevelsAcrossChunk(t *testing.T) { t.Fatal(err) } - j, _, err := joiner.New(ctx, store, store, rootChunk.Address(), redundancy.DefaultLevel) + j, _, err := joiner.New(ctx, store, store, rootChunk.Address(), redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -865,7 +865,7 @@ func TestJoinerIterateChunkAddresses(t *testing.T) { createdAddresses := []swarm.Address{rootChunk.Address(), firstAddress, secondAddress} - j, _, err := joiner.New(ctx, store, store, rootChunk.Address(), redundancy.DefaultLevel) + j, _, err := joiner.New(ctx, store, store, rootChunk.Address(), redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -919,7 +919,7 @@ func TestJoinerIterateChunkAddresses_Encrypted(t *testing.T) { if err != nil { t.Fatal(err) } - j, l, err := joiner.New(context.Background(), store, store, resultAddress, redundancy.DefaultLevel) + j, l, err := joiner.New(context.Background(), store, store, resultAddress, redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -1123,7 +1123,7 @@ func TestJoinerRedundancy(t *testing.T) { t.Fatal(err) } - joinReader, rootSpan, err := joiner.New(ctx, store, store, swarmAddr, redundancy.DefaultLevel) + joinReader, rootSpan, err := joiner.New(ctx, store, store, swarmAddr, redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -1318,7 +1318,7 @@ func runRedundancyTest(t *testing.T, rLevel redundancy.Level, encrypt bool, size t.Fatal(err) } - j, _, err := joiner.New(ctx, store, store, addr, redundancy.DefaultLevel) + j, _, err := joiner.New(ctx, store, store, addr, redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } diff --git a/pkg/file/loadsave/loadsave_test.go b/pkg/file/loadsave/loadsave_test.go index a5828402d04..892f29ed32a 100644 --- a/pkg/file/loadsave/loadsave_test.go +++ b/pkg/file/loadsave/loadsave_test.go @@ -29,7 +29,7 @@ func TestLoadSave(t *testing.T) { t.Parallel() store := inmemchunkstore.New() - ls := loadsave.New(store, store, pipelineFn(store), redundancy.DefaultLevel) + ls := loadsave.New(store, store, pipelineFn(store), redundancy.DefaultDownloadLevel) ref, err := ls.Save(context.Background(), data) if err != nil { t.Fatal(err) @@ -51,7 +51,7 @@ func TestReadonlyLoadSave(t *testing.T) { store := inmemchunkstore.New() factory := pipelineFn(store) - ls := loadsave.NewReadonly(store, store, redundancy.DefaultLevel) + ls := loadsave.NewReadonly(store, store, redundancy.DefaultDownloadLevel) _, err := ls.Save(context.Background(), data) if !errors.Is(err, loadsave.ErrReadonlyLoadSave) { t.Fatal("expected error but got none") diff --git a/pkg/file/pipeline/hashtrie/hashtrie_test.go b/pkg/file/pipeline/hashtrie/hashtrie_test.go index 9f502f3fd5d..7df56b8f0ad 100644 --- a/pkg/file/pipeline/hashtrie/hashtrie_test.go +++ b/pkg/file/pipeline/hashtrie/hashtrie_test.go @@ -194,7 +194,7 @@ func TestLevels_TrieFull(t *testing.T) { Params: *r, } - ht = hashtrie.NewHashTrieWriter(ctx, hashSize, rMock, pf, s, redundancy.DefaultLevel) + ht = hashtrie.NewHashTrieWriter(ctx, hashSize, rMock, pf, s, redundancy.DefaultDownloadLevel) ) // to create a level wrap we need to do branching^(level-1) writes diff --git a/pkg/file/redundancy/level.go b/pkg/file/redundancy/level.go index 8a3b30a82ff..118033df4f7 100644 --- a/pkg/file/redundancy/level.go +++ b/pkg/file/redundancy/level.go @@ -173,5 +173,9 @@ func GetReplicaCounts() [5]int { // we use an approximation as the successive powers of 2 var replicaCounts = [5]int{0, 2, 4, 8, 16} -// DefaultLevel is the default redundancy level -const DefaultLevel = PARANOID +// DefaultDownloadLevel is the default redundancy level for downloading chunks +// expected to exist in the network (non-feed chunks) +const DefaultDownloadLevel = PARANOID + +// DefaultUploadLevel is the default redundancy level for uploading chunks +const DefaultUploadLevel = MEDIUM diff --git a/pkg/steward/mock/steward.go b/pkg/steward/mock/steward.go index b8cee8b3588..737ddf8b3d4 100644 --- a/pkg/steward/mock/steward.go +++ b/pkg/steward/mock/steward.go @@ -7,6 +7,7 @@ package mock import ( "context" + "github.com/ethersphere/bee/v2/pkg/file/redundancy" "github.com/ethersphere/bee/v2/pkg/postage" "github.com/ethersphere/bee/v2/pkg/swarm" ) @@ -18,14 +19,14 @@ type Steward struct { // Reupload implements steward.Interface Reupload method. // The given address is recorded. -func (s *Steward) Reupload(_ context.Context, addr swarm.Address, _ postage.Stamper) error { +func (s *Steward) Reupload(_ context.Context, addr swarm.Address, _ postage.Stamper, _ redundancy.Level) error { s.addr = addr return nil } // IsRetrievable implements steward.Interface IsRetrievable method. // The method always returns true. -func (s *Steward) IsRetrievable(_ context.Context, addr swarm.Address) (bool, error) { +func (s *Steward) IsRetrievable(_ context.Context, addr swarm.Address, _ redundancy.Level) (bool, error) { return addr.Equal(s.addr), nil } diff --git a/pkg/steward/steward.go b/pkg/steward/steward.go index 4272095f4ee..463d389814e 100644 --- a/pkg/steward/steward.go +++ b/pkg/steward/steward.go @@ -24,11 +24,11 @@ import ( type Interface interface { // Reupload root hash and all of its underlying // associated chunks to the network. - Reupload(context.Context, swarm.Address, postage.Stamper) error + Reupload(context.Context, swarm.Address, postage.Stamper, redundancy.Level) error // IsRetrievable checks whether the content // on the given address is retrievable. - IsRetrievable(context.Context, swarm.Address) (bool, error) + IsRetrievable(context.Context, swarm.Address, redundancy.Level) (bool, error) } type steward struct { @@ -41,8 +41,8 @@ type steward struct { func New(ns storer.NetStore, r retrieval.Interface, joinerPutter storage.Putter) Interface { return &steward{ netStore: ns, - traverser: traversal.New(ns.Download(true), joinerPutter, redundancy.DefaultLevel), - netTraverser: traversal.New(&netGetter{r}, joinerPutter, redundancy.DefaultLevel), + traverser: traversal.New(ns.Download(true), joinerPutter), + netTraverser: traversal.New(&netGetter{r}, joinerPutter), netGetter: r, } } @@ -52,7 +52,7 @@ func New(ns storer.NetStore, r retrieval.Interface, joinerPutter storage.Putter) // addresses and push every chunk individually to the network. // It assumes all chunks are available locally. It is therefore // advisable to pin the content locally before trying to reupload it. -func (s *steward) Reupload(ctx context.Context, root swarm.Address, stamper postage.Stamper) error { +func (s *steward) Reupload(ctx context.Context, root swarm.Address, stamper postage.Stamper, rLevel redundancy.Level) error { uploaderSession := s.netStore.DirectUpload() getter := s.netStore.Download(false) @@ -70,7 +70,7 @@ func (s *steward) Reupload(ctx context.Context, root swarm.Address, stamper post return uploaderSession.Put(ctx, c.WithStamp(stamp)) } - if err := s.traverser.Traverse(ctx, root, fn); err != nil { + if err := s.traverser.Traverse(ctx, root, fn, rLevel); err != nil { return errors.Join( fmt.Errorf("traversal of %s failed: %w", root.String(), err), uploaderSession.Cleanup(), @@ -81,12 +81,12 @@ func (s *steward) Reupload(ctx context.Context, root swarm.Address, stamper post } // IsRetrievable implements Interface.IsRetrievable method. -func (s *steward) IsRetrievable(ctx context.Context, root swarm.Address) (bool, error) { +func (s *steward) IsRetrievable(ctx context.Context, root swarm.Address, rLevel redundancy.Level) (bool, error) { fn := func(a swarm.Address) error { _, err := s.netGetter.RetrieveChunk(ctx, a, swarm.ZeroAddress) return err } - switch err := s.netTraverser.Traverse(ctx, root, fn); { + switch err := s.netTraverser.Traverse(ctx, root, fn, rLevel); { case errors.Is(err, storage.ErrNotFound): return false, nil case errors.Is(err, topology.ErrNotFound): diff --git a/pkg/steward/steward_test.go b/pkg/steward/steward_test.go index 7f80614e6d6..336729d9b12 100644 --- a/pkg/steward/steward_test.go +++ b/pkg/steward/steward_test.go @@ -87,7 +87,7 @@ func TestSteward(t *testing.T) { } }() - err = s.Reupload(ctx, addr, stamper) + err = s.Reupload(ctx, addr, stamper, redundancy.PARANOID) if err != nil { t.Fatal(err) } @@ -104,7 +104,7 @@ func TestSteward(t *testing.T) { default: } - isRetrievable, err := s.IsRetrievable(ctx, addr) + isRetrievable, err := s.IsRetrievable(ctx, addr, redundancy.PARANOID) if err != nil { t.Fatal(err) } diff --git a/pkg/traversal/traversal.go b/pkg/traversal/traversal.go index 7bb3ade6ce9..2516be5301a 100644 --- a/pkg/traversal/traversal.go +++ b/pkg/traversal/traversal.go @@ -26,25 +26,24 @@ import ( // Traverser represents service which traverse through address dependent chunks. type Traverser interface { // Traverse iterates through each address related to the supplied one, if possible. - Traverse(context.Context, swarm.Address, swarm.AddressIterFunc) error + Traverse(context.Context, swarm.Address, swarm.AddressIterFunc, redundancy.Level) error } // New constructs for a new Traverser. -func New(getter storage.Getter, putter storage.Putter, rLevel redundancy.Level) Traverser { - return &service{getter: getter, putter: putter, rLevel: rLevel} +func New(getter storage.Getter, putter storage.Putter) Traverser { + return &service{getter: getter, putter: putter} } // service is implementation of Traverser using storage.Storer as its storage. type service struct { getter storage.Getter putter storage.Putter - rLevel redundancy.Level } // Traverse implements Traverser.Traverse method. -func (s *service) Traverse(ctx context.Context, addr swarm.Address, iterFn swarm.AddressIterFunc) error { +func (s *service) Traverse(ctx context.Context, addr swarm.Address, iterFn swarm.AddressIterFunc, rLevel redundancy.Level) error { processBytes := func(ref swarm.Address) error { - j, _, err := joiner.New(ctx, s.getter, s.putter, ref, s.rLevel) + j, _, err := joiner.New(ctx, s.getter, s.putter, ref, rLevel) if err != nil { return fmt.Errorf("traversal: joiner error on %q: %w", ref, err) } @@ -67,7 +66,7 @@ func (s *service) Traverse(ctx context.Context, addr swarm.Address, iterFn swarm } } - j, _, err := joiner.New(ctx, s.getter, s.putter, addr, s.rLevel) + j, _, err := joiner.New(ctx, s.getter, s.putter, addr, rLevel) if err != nil { return err } @@ -77,7 +76,7 @@ func (s *service) Traverse(ctx context.Context, addr swarm.Address, iterFn swarm // then the reference is likely a manifest reference. This is because manifest holds metadata // that points to the actual data file, and this metadata is assumed to be small - Less than or equal to swarm.ChunkSize. if j.Size() <= swarm.ChunkSize { - ls := loadsave.NewReadonly(s.getter, s.putter, s.rLevel) + ls := loadsave.NewReadonly(s.getter, s.putter, rLevel) switch mf, err := manifest.NewDefaultManifestReference(addr, ls); { case errors.Is(err, manifest.ErrInvalidManifestType): break diff --git a/pkg/traversal/traversal_test.go b/pkg/traversal/traversal_test.go index 2853e7961f6..70a3c50f37d 100644 --- a/pkg/traversal/traversal_test.go +++ b/pkg/traversal/traversal_test.go @@ -167,7 +167,7 @@ func TestTraversalBytes(t *testing.T) { t.Fatal(err) } - err = traversal.New(storerMock, storerMock, redundancy.DefaultLevel).Traverse(ctx, address, iter.Next) + err = traversal.New(storerMock, storerMock).Traverse(ctx, address, iter.Next, redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -261,7 +261,7 @@ func TestTraversalFiles(t *testing.T) { t.Fatal(err) } - ls := loadsave.New(storerMock, storerMock, pipelineFactory(storerMock, false), redundancy.DefaultLevel) + ls := loadsave.New(storerMock, storerMock, pipelineFactory(storerMock, false), redundancy.DefaultDownloadLevel) fManifest, err := manifest.NewDefaultManifest(ls, false) if err != nil { t.Fatal(err) @@ -293,7 +293,7 @@ func TestTraversalFiles(t *testing.T) { t.Fatal(err) } - err = traversal.New(storerMock, storerMock, redundancy.DefaultLevel).Traverse(ctx, address, iter.Next) + err = traversal.New(storerMock, storerMock).Traverse(ctx, address, iter.Next, redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -419,7 +419,7 @@ func TestTraversalManifest(t *testing.T) { } wantHashes = append(wantHashes, tc.manifestHashes...) - ls := loadsave.New(storerMock, storerMock, pipelineFactory(storerMock, false), redundancy.DefaultLevel) + ls := loadsave.New(storerMock, storerMock, pipelineFactory(storerMock, false), redundancy.DefaultDownloadLevel) dirManifest, err := manifest.NewMantarayManifest(ls, false) if err != nil { t.Fatal(err) @@ -450,7 +450,7 @@ func TestTraversalManifest(t *testing.T) { t.Fatal(err) } - err = traversal.New(storerMock, storerMock, redundancy.DefaultLevel).Traverse(ctx, address, iter.Next) + err = traversal.New(storerMock, storerMock).Traverse(ctx, address, iter.Next, redundancy.DefaultDownloadLevel) if err != nil { t.Fatal(err) } @@ -488,7 +488,7 @@ func TestTraversalSOC(t *testing.T) { t.Fatal(err) } - err = traversal.New(store, store, 0).Traverse(ctx, sch.Address(), iter.Next) + err = traversal.New(store, store).Traverse(ctx, sch.Address(), iter.Next, redundancy.NONE) if err != nil { t.Fatal(err) }