From 0fef5ec04eba82984e68fe664d8f634083e1c664 Mon Sep 17 00:00:00 2001 From: Acha Bill Date: Mon, 23 Sep 2024 22:34:15 -0400 Subject: [PATCH 1/5] feat: repair pin --- openapi/Swarm.yaml | 24 +++++ openapi/SwarmCommon.yaml | 15 +++ pkg/api/api.go | 7 +- pkg/api/pin.go | 59 ++++++++++- pkg/api/pin_test.go | 13 ++- pkg/api/router.go | 6 ++ pkg/storer/internal/pinning/pinning.go | 2 +- pkg/storer/uploadstore.go | 2 +- pkg/storer/validate.go | 99 +++++++++++++++++- pkg/storer/validate_test.go | 136 +++++++++++++++++++++++++ pkg/util/syncutil/syncutil.go | 6 ++ 11 files changed, 354 insertions(+), 15 deletions(-) create mode 100644 pkg/storer/validate_test.go diff --git a/openapi/Swarm.yaml b/openapi/Swarm.yaml index 98afdaa4d34..fa877581abe 100644 --- a/openapi/Swarm.yaml +++ b/openapi/Swarm.yaml @@ -761,6 +761,30 @@ paths: default: description: Default response + "/pins/repair": + get: + summary: Repair pinned chunks by fetching missing/invalid chunks from the network + tags: + - Pinning + parameters: + - in: query + name: ref + schema: + $ref: "SwarmCommon.yaml#/components/schemas/SwarmOnlyReference" + required: false + description: The number of items to skip before starting to collect the result set. + responses: + "200": + description: List of repaired chunks + content: + application/json: + schema: + $ref: "SwarmCommon.yaml#/components/schemas/PinRepairResponse" + "500": + $ref: "SwarmCommon.yaml#/components/responses/500" + default: + description: Default response + "/pss/send/{topic}/{targets}": post: summary: Send to recipient or target with Postal Service for Swarm diff --git a/openapi/SwarmCommon.yaml b/openapi/SwarmCommon.yaml index d4b4feb8c5c..a89825ff41e 100644 --- a/openapi/SwarmCommon.yaml +++ b/openapi/SwarmCommon.yaml @@ -666,6 +666,21 @@ components: invalid: type: integer + PinRepairResponse: + type: object + properties: + reference: + $ref: "#/components/schemas/SwarmOnlyReference" + address: + $ref: "#/components/schemas/SwarmOnlyReference" + issue: + type: string + enum: + - "missing" + - "invalid" + error: + type: string + SwarmOnlyReferencesList: type: object properties: diff --git a/pkg/api/api.go b/pkg/api/api.go index 4fc70fbfb17..a1b6685419e 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -49,10 +49,10 @@ import ( "github.com/ethersphere/bee/v2/pkg/settlement/swap/erc20" "github.com/ethersphere/bee/v2/pkg/status" "github.com/ethersphere/bee/v2/pkg/steward" - storage "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storageincentives" "github.com/ethersphere/bee/v2/pkg/storageincentives/staking" - storer "github.com/ethersphere/bee/v2/pkg/storer" + "github.com/ethersphere/bee/v2/pkg/storer" "github.com/ethersphere/bee/v2/pkg/swarm" "github.com/ethersphere/bee/v2/pkg/topology" "github.com/ethersphere/bee/v2/pkg/topology/lightnode" @@ -143,7 +143,8 @@ type Storer interface { } type PinIntegrity interface { - Check(ctx context.Context, logger log.Logger, pin string, out chan storer.PinStat) + Check(ctx context.Context, logger log.Logger, pin string, out chan storer.PinStat, stat chan storer.CorruptedPinChunk) + Repair(ctx context.Context, logger log.Logger, pin string, store storer.NetStore, res chan storer.RepairPinResult) } type Service struct { diff --git a/pkg/api/pin.go b/pkg/api/pin.go index 6fc115fd1b1..5debb3586cc 100644 --- a/pkg/api/pin.go +++ b/pkg/api/pin.go @@ -12,9 +12,10 @@ import ( "github.com/ethersphere/bee/v2/pkg/jsonhttp" "github.com/ethersphere/bee/v2/pkg/storage" - storer "github.com/ethersphere/bee/v2/pkg/storer" + "github.com/ethersphere/bee/v2/pkg/storer" "github.com/ethersphere/bee/v2/pkg/swarm" "github.com/ethersphere/bee/v2/pkg/traversal" + "github.com/ethersphere/bee/v2/pkg/util/syncutil" "github.com/gorilla/mux" "golang.org/x/sync/semaphore" ) @@ -209,6 +210,13 @@ type PinIntegrityResponse struct { Invalid int `json:"invalid"` } +type PinRepairResponse struct { + Reference swarm.Address `json:"reference"` + Address swarm.Address `json:"address"` + Issue string `json:"issue"` + Error string `json:"error"` +} + func (s *Service) pinIntegrityHandler(w http.ResponseWriter, r *http.Request) { logger := s.logger.WithName("get_pin_integrity").Build() @@ -222,8 +230,9 @@ func (s *Service) pinIntegrityHandler(w http.ResponseWriter, r *http.Request) { } out := make(chan storer.PinStat) - - go s.pinIntegrity.Check(r.Context(), logger, querie.Ref.String(), out) + corrupted := make(chan storer.CorruptedPinChunk) + go s.pinIntegrity.Check(r.Context(), logger, querie.Ref.String(), out, corrupted) + go syncutil.Drain(corrupted) flusher, ok := w.(http.Flusher) if !ok { @@ -251,3 +260,47 @@ func (s *Service) pinIntegrityHandler(w http.ResponseWriter, r *http.Request) { flusher.Flush() } } + +func (s *Service) repairPins(w http.ResponseWriter, r *http.Request) { + logger := s.logger.WithName("get_repair_pins").Build() + query := struct { + Ref swarm.Address `map:"ref"` + }{} + + if response := s.mapStructure(r.URL.Query(), &query); response != nil { + response("invalid query params", logger, w) + return + } + + res := make(chan storer.RepairPinResult) + go s.pinIntegrity.Repair(r.Context(), logger, query.Ref.String(), s.storer, res) + + flusher, ok := w.(http.Flusher) + if !ok { + http.NotFound(w, r) + return + } + + w.Header().Set("Transfer-Encoding", "chunked") + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.WriteHeader(http.StatusOK) + flusher.Flush() + + enc := json.NewEncoder(w) + for v := range res { + issue := "missing" + if v.Chunk.Invalid { + issue = "invalid" + } + resp := PinRepairResponse{ + Reference: v.Chunk.Ref, + Address: v.Chunk.Address, + Issue: issue, + Error: v.Error.Error(), + } + if err := enc.Encode(resp); err != nil { + break + } + flusher.Flush() + } +} diff --git a/pkg/api/pin_test.go b/pkg/api/pin_test.go index a85c34a9594..126c434e2e0 100644 --- a/pkg/api/pin_test.go +++ b/pkg/api/pin_test.go @@ -15,9 +15,9 @@ import ( "github.com/ethersphere/bee/v2/pkg/jsonhttp/jsonhttptest" "github.com/ethersphere/bee/v2/pkg/log" mockpost "github.com/ethersphere/bee/v2/pkg/postage/mock" - storage "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storage/inmemstore" - storer "github.com/ethersphere/bee/v2/pkg/storer" + "github.com/ethersphere/bee/v2/pkg/storer" mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock" "github.com/ethersphere/bee/v2/pkg/swarm" ) @@ -233,9 +233,16 @@ type mockPinIntegrity struct { Store storage.Store } -func (p *mockPinIntegrity) Check(ctx context.Context, logger log.Logger, pin string, out chan storer.PinStat) { +func (p *mockPinIntegrity) Check(ctx context.Context, logger log.Logger, pin string, out chan storer.PinStat, stat chan storer.CorruptedPinChunk) { if pin != pinRef { p.tester.Fatal("bad pin", pin) } close(out) } + +func (p *mockPinIntegrity) Repair(ctx context.Context, logger log.Logger, pin string, netStore storer.NetStore, res chan storer.RepairPinResult) { + if pin != pinRef { + p.tester.Fatal("bad pin", pin) + } + close(res) +} diff --git a/pkg/api/router.go b/pkg/api/router.go index f9a0716b84b..0af7e632f8f 100644 --- a/pkg/api/router.go +++ b/pkg/api/router.go @@ -354,6 +354,12 @@ func (s *Service) mountAPI() { }), )) + handle("/pins/repair", web.ChainHandlers( + web.FinalHandler(jsonhttp.MethodHandler{ + "GET": http.HandlerFunc(s.repairPins), + }), + )) + handle("/pins/{reference}", web.ChainHandlers( web.FinalHandler(jsonhttp.MethodHandler{ "GET": http.HandlerFunc(s.getPinnedRootHash), diff --git a/pkg/storer/internal/pinning/pinning.go b/pkg/storer/internal/pinning/pinning.go index 4af5a2f8b71..0f4f308869e 100644 --- a/pkg/storer/internal/pinning/pinning.go +++ b/pkg/storer/internal/pinning/pinning.go @@ -13,7 +13,7 @@ import ( "runtime" "github.com/ethersphere/bee/v2/pkg/encryption" - storage "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" "golang.org/x/sync/errgroup" diff --git a/pkg/storer/uploadstore.go b/pkg/storer/uploadstore.go index f4f21fc59ba..9040b4630ce 100644 --- a/pkg/storer/uploadstore.go +++ b/pkg/storer/uploadstore.go @@ -10,7 +10,7 @@ import ( "fmt" "sort" - storage "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storer/internal" pinstore "github.com/ethersphere/bee/v2/pkg/storer/internal/pinning" "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" diff --git a/pkg/storer/validate.go b/pkg/storer/validate.go index 0220c4fb899..0da274d5c4b 100644 --- a/pkg/storer/validate.go +++ b/pkg/storer/validate.go @@ -10,18 +10,20 @@ import ( "os" "path" "sync" - "time" - "sync/atomic" + "time" "github.com/ethersphere/bee/v2/pkg/cac" "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/sharky" "github.com/ethersphere/bee/v2/pkg/soc" "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/storage/leveldbstore" "github.com/ethersphere/bee/v2/pkg/storer/internal/chunkstore" pinstore "github.com/ethersphere/bee/v2/pkg/storer/internal/pinning" + "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" "github.com/ethersphere/bee/v2/pkg/swarm" + "github.com/ethersphere/bee/v2/pkg/util/syncutil" ) // Validate ensures that all retrievalIndex chunks are correctly stored in sharky. @@ -240,7 +242,9 @@ func ValidatePinCollectionChunks(ctx context.Context, basePath, pin, location st defer f.Close() var ch = make(chan PinStat) - go pv.Check(ctx, logger, pin, ch) + corrupted := make(chan CorruptedPinChunk) + go pv.Check(ctx, logger, pin, ch, corrupted) + go syncutil.Drain(corrupted) for st := range ch { report := fmt.Sprintf("%d\t%d\t%d\t%s\n", st.Invalid, st.Missing, st.Total, st.Ref) @@ -264,7 +268,18 @@ type PinStat struct { Total, Missing, Invalid int } -func (p *PinIntegrity) Check(ctx context.Context, logger log.Logger, pin string, out chan PinStat) { +type CorruptedPinChunk struct { + Ref swarm.Address + Address swarm.Address + Missing, Invalid bool +} + +type RepairPinResult struct { + Chunk CorruptedPinChunk + Error error +} + +func (p *PinIntegrity) Check(ctx context.Context, logger log.Logger, pin string, out chan PinStat, corrupted chan CorruptedPinChunk) { var stats struct { total, read, invalid atomic.Int32 } @@ -272,6 +287,7 @@ func (p *PinIntegrity) Check(ctx context.Context, logger log.Logger, pin string, n := time.Now() defer func() { close(out) + close(corrupted) logger.Info("done", "duration", time.Since(n), "read", stats.read.Load(), "invalid", stats.invalid.Load(), "total", stats.total.Load()) }() @@ -342,6 +358,11 @@ func (p *PinIntegrity) Check(ctx context.Context, logger log.Logger, pin string, } if !validChunk(item, buf[:item.Location.Length]) { invalid.Add(1) + select { + case <-ctx.Done(): + return + case corrupted <- CorruptedPinChunk{Ref: pin, Address: item.Address, Invalid: true}: + } } } }() @@ -362,6 +383,11 @@ func (p *PinIntegrity) Check(ctx context.Context, logger log.Logger, pin string, rIdx := &chunkstore.RetrievalIndexItem{Address: addr} if err := p.Store.Get(rIdx); err != nil { missing.Add(1) + select { + case <-ctx.Done(): + return true, nil + case corrupted <- CorruptedPinChunk{Ref: pin, Address: addr, Missing: true}: + } } else { select { case <-ctx.Done(): @@ -400,3 +426,68 @@ func (p *PinIntegrity) Check(ctx context.Context, logger log.Logger, pin string, } } } + +func (p *PinIntegrity) Repair(ctx context.Context, logger log.Logger, pin string, netStore NetStore, res chan RepairPinResult) { + defer func() { + close(res) + }() + + out := make(chan PinStat) + corrupted := make(chan CorruptedPinChunk) + go p.Check(ctx, logger, pin, out, corrupted) + go syncutil.Drain(out) + + for v := range corrupted { + bStore := p.Store.(*leveldbstore.Store) + s := transaction.NewStorage(p.Sharky, bStore) + + var ch swarm.Chunk + var err error + if v.Missing || v.Invalid { + ch, err = netStore.Download(false).Get(ctx, v.Address) + if err != nil { + r := RepairPinResult{ + Chunk: v, + Error: fmt.Errorf("download failed: %s", err.Error()), + } + select { + case <-ctx.Done(): + return + case res <- r: + } + } + } + + if v.Missing { + logger.Info("repairing missing chunk", "address", v.Address) + err = s.Run(ctx, func(st transaction.Store) error { + err = st.ChunkStore().Put(ctx, ch) + if err != nil { + return fmt.Errorf("put missing chunk: %w", err) + } + return nil + }) + } + + if v.Invalid { + logger.Info("repairing invalid chunk (replace)", "address", v.Address) + err = s.Run(ctx, func(st transaction.Store) error { + err = st.ChunkStore().Replace(ctx, ch) + if err != nil { + return fmt.Errorf("replacing invalid chunk: %w", err) + } + return nil + }) + } + + r := RepairPinResult{ + Chunk: v, + Error: err, + } + select { + case <-ctx.Done(): + return + case res <- r: + } + } +} diff --git a/pkg/storer/validate_test.go b/pkg/storer/validate_test.go new file mode 100644 index 00000000000..9aea4aae852 --- /dev/null +++ b/pkg/storer/validate_test.go @@ -0,0 +1,136 @@ +// Copyright 2024 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package storer + +import ( + "context" + "os" + "testing" + "time" + + "github.com/ethersphere/bee/v2/pkg/log" + "github.com/ethersphere/bee/v2/pkg/postage" + batchstore "github.com/ethersphere/bee/v2/pkg/postage/batchstore/mock" + "github.com/ethersphere/bee/v2/pkg/storage" + chunktesting "github.com/ethersphere/bee/v2/pkg/storage/testing" + "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" + "github.com/ethersphere/bee/v2/pkg/swarm" + "github.com/ethersphere/bee/v2/pkg/topology" + kademlia "github.com/ethersphere/bee/v2/pkg/topology/mock" + "github.com/ethersphere/bee/v2/pkg/util/syncutil" +) + +func dbTestOps(baseAddr swarm.Address, reserveCapacity int, bs postage.Storer, radiusSetter topology.SetStorageRadiuser, reserveWakeUpTime time.Duration) *Options { + opts := DefaultOptions() + if radiusSetter == nil { + radiusSetter = kademlia.NewTopologyDriver() + } + + if bs == nil { + bs = batchstore.New() + } + + opts.Address = baseAddr + opts.RadiusSetter = radiusSetter + opts.ReserveCapacity = reserveCapacity + opts.Batchstore = bs + opts.ReserveWakeUpDuration = reserveWakeUpTime + opts.Logger = log.Noop + + return opts +} + +type testRetrieval struct { + fn func(swarm.Address) (swarm.Chunk, error) +} + +func (t *testRetrieval) RetrieveChunk(_ context.Context, address swarm.Address, _ swarm.Address) (swarm.Chunk, error) { + return t.fn(address) +} + +func TestPinIntegrity_Repair(t *testing.T) { + + opts := dbTestOps(swarm.RandAddress(t), 0, nil, nil, time.Second) + db, err := New(context.Background(), os.TempDir(), opts) + if err != nil { + t.Fatal(err) + } + + chunks := chunktesting.GenerateTestRandomChunks(10) + session, err := db.NewCollection(context.TODO()) + if err != nil { + t.Fatal(err) + } + + for _, ch := range chunks { + err := session.Put(context.TODO(), ch) + if err != nil { + t.Fatal(err) + } + } + + pin := chunks[0].Address() + err = session.Done(pin) + if err != nil { + t.Fatal(err) + } + + // no repair needed + count := 0 + res := make(chan RepairPinResult) + db.pinIntegrity.Repair(context.Background(), log.Noop, pin.String(), db, res) + for range res { + count++ + } + if count != 0 { + t.Fatalf("expected 0 repairs, got %d", count) + } + + // repair needed + count = 0 + err = db.storage.Run(context.Background(), func(s transaction.Store) error { + return s.ChunkStore().Delete(context.Background(), chunks[1].Address()) + }) + if err != nil { + t.Fatal(err) + } + + res = make(chan RepairPinResult) + db.SetRetrievalService(&testRetrieval{fn: func(address swarm.Address) (swarm.Chunk, error) { + for _, ch := range chunks { + if ch.Address().Equal(address) { + return ch, nil + } + } + return nil, storage.ErrNotFound + }}) + go db.pinIntegrity.Repair(context.Background(), log.Noop, pin.String(), db, res) + for range res { + count++ + } + + if count != 1 { + t.Fatalf("expected 1 repair, got %d", count) + } + + out := make(chan PinStat) + corrupted := make(chan CorruptedPinChunk) + go db.pinIntegrity.Check(context.Background(), log.Noop, pin.String(), out, corrupted) + go syncutil.Drain(corrupted) + + v := <-out + if !v.Ref.Equal(pin) { + t.Fatalf("expected pin %s, got %s", pin, v.Ref) + } + if v.Total != 10 { + t.Fatalf("expected total 10, got %d", v.Total) + } + if v.Missing != 0 { + t.Fatalf("expected missing 0, got %d", v.Missing) + } + if v.Invalid != 0 { + t.Fatalf("expected invalid 0, got %d", v.Invalid) + } +} diff --git a/pkg/util/syncutil/syncutil.go b/pkg/util/syncutil/syncutil.go index c06e9ebdeba..b54e0beae3f 100644 --- a/pkg/util/syncutil/syncutil.go +++ b/pkg/util/syncutil/syncutil.go @@ -24,3 +24,9 @@ func WaitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { return false } } + +// Drain drains the channel until it's closed. +func Drain[T any](c <-chan T) { + for range c { + } +} From 034304b10d6240855602ba4f6f79d89183ae776f Mon Sep 17 00:00:00 2001 From: Acha Bill Date: Wed, 25 Sep 2024 13:28:50 -0400 Subject: [PATCH 2/5] fix: goleak --- pkg/api/api.go | 2 +- pkg/api/pin_test.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/api/api.go b/pkg/api/api.go index a1b6685419e..06a90df07fb 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -143,7 +143,7 @@ type Storer interface { } type PinIntegrity interface { - Check(ctx context.Context, logger log.Logger, pin string, out chan storer.PinStat, stat chan storer.CorruptedPinChunk) + Check(ctx context.Context, logger log.Logger, pin string, out chan storer.PinStat, corrupted chan storer.CorruptedPinChunk) Repair(ctx context.Context, logger log.Logger, pin string, store storer.NetStore, res chan storer.RepairPinResult) } diff --git a/pkg/api/pin_test.go b/pkg/api/pin_test.go index 126c434e2e0..2d8eaed5ed9 100644 --- a/pkg/api/pin_test.go +++ b/pkg/api/pin_test.go @@ -233,11 +233,12 @@ type mockPinIntegrity struct { Store storage.Store } -func (p *mockPinIntegrity) Check(ctx context.Context, logger log.Logger, pin string, out chan storer.PinStat, stat chan storer.CorruptedPinChunk) { +func (p *mockPinIntegrity) Check(ctx context.Context, logger log.Logger, pin string, out chan storer.PinStat, corrupted chan storer.CorruptedPinChunk) { if pin != pinRef { p.tester.Fatal("bad pin", pin) } close(out) + close(corrupted) } func (p *mockPinIntegrity) Repair(ctx context.Context, logger log.Logger, pin string, netStore storer.NetStore, res chan storer.RepairPinResult) { From eb090b654b9db8b7b1613fbac55c6661009f0f4b Mon Sep 17 00:00:00 2001 From: Acha Bill Date: Mon, 25 Nov 2024 23:59:32 -0500 Subject: [PATCH 3/5] fix: use post --- openapi/Swarm.yaml | 2 +- pkg/api/pin.go | 2 +- pkg/api/router.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/openapi/Swarm.yaml b/openapi/Swarm.yaml index fa877581abe..cfbac766973 100644 --- a/openapi/Swarm.yaml +++ b/openapi/Swarm.yaml @@ -762,7 +762,7 @@ paths: description: Default response "/pins/repair": - get: + post: summary: Repair pinned chunks by fetching missing/invalid chunks from the network tags: - Pinning diff --git a/pkg/api/pin.go b/pkg/api/pin.go index 5debb3586cc..7002c5af334 100644 --- a/pkg/api/pin.go +++ b/pkg/api/pin.go @@ -262,7 +262,7 @@ func (s *Service) pinIntegrityHandler(w http.ResponseWriter, r *http.Request) { } func (s *Service) repairPins(w http.ResponseWriter, r *http.Request) { - logger := s.logger.WithName("get_repair_pins").Build() + logger := s.logger.WithName("post_repair_pins").Build() query := struct { Ref swarm.Address `map:"ref"` }{} diff --git a/pkg/api/router.go b/pkg/api/router.go index 0af7e632f8f..d058ea1b59c 100644 --- a/pkg/api/router.go +++ b/pkg/api/router.go @@ -356,7 +356,7 @@ func (s *Service) mountAPI() { handle("/pins/repair", web.ChainHandlers( web.FinalHandler(jsonhttp.MethodHandler{ - "GET": http.HandlerFunc(s.repairPins), + "POST": http.HandlerFunc(s.repairPins), }), )) From e4311c4bb30324224c738f2b1a532a38c3d67dfc Mon Sep 17 00:00:00 2001 From: Acha Bill Date: Tue, 26 Nov 2024 00:11:52 -0500 Subject: [PATCH 4/5] fix: update route --- openapi/Swarm.yaml | 42 +++++++++++++++++++++--------------------- pkg/api/pin.go | 11 +++++------ pkg/api/router.go | 5 ++--- 3 files changed, 28 insertions(+), 30 deletions(-) diff --git a/openapi/Swarm.yaml b/openapi/Swarm.yaml index 94122094158..b3fa5cf9eff 100644 --- a/openapi/Swarm.yaml +++ b/openapi/Swarm.yaml @@ -720,50 +720,50 @@ paths: default: description: Default response - "/pins": - get: - summary: Get the list of pinned root hash references + "/pins/{reference}/repair": + post: + summary: Repair pinned chunks by fetching missing/invalid chunks from the network tags: - Pinning + parameters: + - in: path + name: reference + schema: + $ref: "SwarmCommon.yaml#/components/schemas/SwarmOnlyReference" + required: true + description: Swarm reference of the root hash. responses: "200": - description: List of pinned root hash references + description: List of repaired chunks content: application/json: schema: - $ref: "SwarmCommon.yaml#/components/schemas/SwarmOnlyReferencesList" + $ref: "SwarmCommon.yaml#/components/schemas/PinRepairResponse" "500": $ref: "SwarmCommon.yaml#/components/responses/500" default: description: Default response - "/pins/check": + "/pins": get: - summary: Validate pinned chunks integerity + summary: Get the list of pinned root hash references tags: - Pinning - parameters: - - in: query - name: ref - schema: - $ref: "SwarmCommon.yaml#/components/schemas/SwarmOnlyReference" - required: false - description: The number of items to skip before starting to collect the result set. responses: "200": - description: List of checked root hash references + description: List of pinned root hash references content: application/json: schema: - $ref: "SwarmCommon.yaml#/components/schemas/PinCheckResponse" + $ref: "SwarmCommon.yaml#/components/schemas/SwarmOnlyReferencesList" "500": $ref: "SwarmCommon.yaml#/components/responses/500" default: description: Default response - "/pins/repair": - post: - summary: Repair pinned chunks by fetching missing/invalid chunks from the network + "/pins/check": + get: + summary: Validate pinned chunks integerity tags: - Pinning parameters: @@ -775,11 +775,11 @@ paths: description: The number of items to skip before starting to collect the result set. responses: "200": - description: List of repaired chunks + description: List of checked root hash references content: application/json: schema: - $ref: "SwarmCommon.yaml#/components/schemas/PinRepairResponse" + $ref: "SwarmCommon.yaml#/components/schemas/PinCheckResponse" "500": $ref: "SwarmCommon.yaml#/components/responses/500" default: diff --git a/pkg/api/pin.go b/pkg/api/pin.go index 7002c5af334..e46fbdbade5 100644 --- a/pkg/api/pin.go +++ b/pkg/api/pin.go @@ -263,17 +263,16 @@ func (s *Service) pinIntegrityHandler(w http.ResponseWriter, r *http.Request) { func (s *Service) repairPins(w http.ResponseWriter, r *http.Request) { logger := s.logger.WithName("post_repair_pins").Build() - query := struct { - Ref swarm.Address `map:"ref"` + paths := struct { + Reference swarm.Address `map:"reference" validate:"required"` }{} - - if response := s.mapStructure(r.URL.Query(), &query); response != nil { - response("invalid query params", logger, w) + if response := s.mapStructure(mux.Vars(r), &paths); response != nil { + response("invalid path params", logger, w) return } res := make(chan storer.RepairPinResult) - go s.pinIntegrity.Repair(r.Context(), logger, query.Ref.String(), s.storer, res) + go s.pinIntegrity.Repair(r.Context(), logger, paths.Reference.String(), s.storer, res) flusher, ok := w.(http.Flusher) if !ok { diff --git a/pkg/api/router.go b/pkg/api/router.go index e89ce9a6581..3abc1ce30f4 100644 --- a/pkg/api/router.go +++ b/pkg/api/router.go @@ -373,10 +373,9 @@ func (s *Service) mountAPI() { "GET": http.HandlerFunc(s.getPinnedRootHash), "POST": http.HandlerFunc(s.pinRootHash), "DELETE": http.HandlerFunc(s.unpinRootHash), - }, - ) + }) - handle("/pins/repair", web.ChainHandlers( + handle("/pins/{reference}/repair", web.ChainHandlers( web.FinalHandler(jsonhttp.MethodHandler{ "POST": http.HandlerFunc(s.repairPins), }), From 19e579b48dbb922bb8f4af45d5f6de8616b92e09 Mon Sep 17 00:00:00 2001 From: Acha Bill Date: Tue, 26 Nov 2024 00:16:52 -0500 Subject: [PATCH 5/5] fix: validate --- pkg/storer/validate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storer/validate.go b/pkg/storer/validate.go index 0da274d5c4b..623b6845aab 100644 --- a/pkg/storer/validate.go +++ b/pkg/storer/validate.go @@ -472,7 +472,7 @@ func (p *PinIntegrity) Repair(ctx context.Context, logger log.Logger, pin string if v.Invalid { logger.Info("repairing invalid chunk (replace)", "address", v.Address) err = s.Run(ctx, func(st transaction.Store) error { - err = st.ChunkStore().Replace(ctx, ch) + err = st.ChunkStore().Replace(ctx, ch, false) if err != nil { return fmt.Errorf("replacing invalid chunk: %w", err) }