Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add commited depth field to status protocol #4892

Merged
merged 6 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions openapi/SwarmCommon.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,8 @@ components:
type: boolean
lastSyncedBlock:
type: integer
commitedDepth:
type: integer

StatusResponse:
type: object
Expand Down
1 change: 0 additions & 1 deletion pkg/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,6 @@ func createRedistributionAgentService(
tranService,
&mockHealth{},
log.Noop,
0,
)
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/api/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type statusSnapshotResponse struct {
BatchCommitment uint64 `json:"batchCommitment"`
IsReachable bool `json:"isReachable"`
LastSyncedBlock uint64 `json:"lastSyncedBlock"`
CommitedDepth uint8 `json:"commitedDepth"`
}

type statusResponse struct {
Expand Down Expand Up @@ -94,6 +95,7 @@ func (s *Service) statusGetHandler(w http.ResponseWriter, _ *http.Request) {
BatchCommitment: ss.BatchCommitment,
IsReachable: ss.IsReachable,
LastSyncedBlock: ss.LastSyncedBlock,
CommitedDepth: uint8(ss.CommitedDepth),
})
}

Expand Down Expand Up @@ -141,6 +143,7 @@ func (s *Service) statusGetPeersHandler(w http.ResponseWriter, r *http.Request)
snapshot.BatchCommitment = ss.BatchCommitment
snapshot.IsReachable = ss.IsReachable
snapshot.LastSyncedBlock = ss.LastSyncedBlock
snapshot.CommitedDepth = uint8(ss.CommitedDepth)
}

mu.Lock()
Expand Down
4 changes: 4 additions & 0 deletions pkg/api/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestGetStatus(t *testing.T) {
BatchCommitment: 1,
IsReachable: true,
LastSyncedBlock: 6092500,
CommitedDepth: 1,
}

ssMock := &statusSnapshotMock{
Expand All @@ -49,6 +50,7 @@ func TestGetStatus(t *testing.T) {
storageRadius: ssr.StorageRadius,
commitment: ssr.BatchCommitment,
chainState: &postage.ChainState{Block: ssr.LastSyncedBlock},
commitedDepth: ssr.CommitedDepth,
}

statusSvc := status.NewService(
Expand Down Expand Up @@ -122,6 +124,7 @@ type statusSnapshotMock struct {
commitment uint64
chainState *postage.ChainState
neighborhoods []*storer.NeighborhoodStat
commitedDepth uint8
}

func (m *statusSnapshotMock) SyncRate() float64 { return m.syncRate }
Expand All @@ -135,3 +138,4 @@ func (m *statusSnapshotMock) ReserveSizeWithinRadius() uint64 {
func (m *statusSnapshotMock) NeighborhoodsStat(ctx context.Context) ([]*storer.NeighborhoodStat, error) {
return m.neighborhoods, nil
}
func (m *statusSnapshotMock) CommitedDepth() uint8 { return m.commitedDepth }
3 changes: 1 addition & 2 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,7 @@ func NewBee(
return nil, fmt.Errorf("status service: %w", err)
}

saludService := salud.New(nodeStatus, kad, localStore, logger, warmupTime, api.FullMode.String(), salud.DefaultMinPeersPerBin, salud.DefaultDurPercentile, salud.DefaultConnsPercentile, uint8(o.ReserveCapacityDoubling))
saludService := salud.New(nodeStatus, kad, localStore, logger, warmupTime, api.FullMode.String(), salud.DefaultMinPeersPerBin, salud.DefaultDurPercentile, salud.DefaultConnsPercentile)
b.saludCloser = saludService

rC, unsub := saludService.SubscribeNetworkStorageRadius()
Expand Down Expand Up @@ -1086,7 +1086,6 @@ func NewBee(
transactionService,
saludService,
logger,
uint8(o.ReserveCapacityDoubling),
)
if err != nil {
return nil, fmt.Errorf("storage incentives agent: %w", err)
Expand Down
48 changes: 21 additions & 27 deletions pkg/salud/salud.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ type service struct {

radiusSubsMtx sync.Mutex
radiusC []chan uint8

capacityDoubling uint8
}

func New(
Expand All @@ -66,20 +64,18 @@ func New(
minPeersPerbin int,
durPercentile float64,
connsPercentile float64,
capacityDoubling uint8,
) *service {

metrics := newMetrics()

s := &service{
quit: make(chan struct{}),
logger: logger.WithName(loggerName).Register(),
status: status,
topology: topology,
metrics: metrics,
isSelfHealthy: atomic.NewBool(true),
reserve: reserve,
capacityDoubling: capacityDoubling,
quit: make(chan struct{}),
logger: logger.WithName(loggerName).Register(),
status: status,
topology: topology,
metrics: metrics,
isSelfHealthy: atomic.NewBool(true),
reserve: reserve,
}

s.wg.Add(1)
Expand Down Expand Up @@ -173,7 +169,7 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64,
return
}

networkRadius, nHoodRadius := s.radius(peers)
networkRadius, nHoodRadius := s.commitedDepth(peers)
avgDur := totaldur / float64(len(peers))
pDur := percentileDur(peers, durPercentile)
pConns := percentileConns(peers, connsPercentile)
Expand All @@ -199,8 +195,8 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64,
continue
}

if networkRadius > 0 && peer.status.StorageRadius < uint32(networkRadius-2) {
s.logger.Debug("radius health failure", "radius", peer.status.StorageRadius, "peer_address", peer.addr)
if networkRadius > 0 && peer.status.CommitedDepth < uint32(networkRadius-2) {
s.logger.Debug("radius health failure", "radius", peer.status.CommitedDepth, "peer_address", peer.addr)
} else if peer.dur.Seconds() > pDur {
s.logger.Debug("response duration below threshold", "duration", peer.dur, "peer_address", peer.addr)
} else if peer.status.ConnectedPeers < pConns {
Expand All @@ -220,12 +216,10 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64,
}
}

networkRadiusEstimation := s.reserve.StorageRadius() + s.capacityDoubling

selfHealth := true
if nHoodRadius == networkRadius && networkRadiusEstimation != networkRadius {
if nHoodRadius == networkRadius && s.reserve.CommitedDepth() != networkRadius {
selfHealth = false
s.logger.Warning("node is unhealthy due to storage radius discrepancy", "self_radius", networkRadiusEstimation, "network_radius", networkRadius)
s.logger.Warning("node is unhealthy due to storage radius discrepancy", "self_radius", s.reserve.CommitedDepth(), "network_radius", networkRadius)
}

s.isSelfHealthy.Store(selfHealth)
Expand Down Expand Up @@ -294,24 +288,24 @@ func percentileConns(peers []peer, p float64) uint64 {
}

// radius finds the most common radius.
func (s *service) radius(peers []peer) (uint8, uint8) {
func (s *service) commitedDepth(peers []peer) (uint8, uint8) {

var networkRadius [swarm.MaxBins]int
var nHoodRadius [swarm.MaxBins]int
var networkDepth [swarm.MaxBins]int
var nHoodDepth [swarm.MaxBins]int

for _, peer := range peers {
if peer.status.StorageRadius < uint32(swarm.MaxBins) {
if peer.status.CommitedDepth < uint32(swarm.MaxBins) {
martinconic marked this conversation as resolved.
Show resolved Hide resolved
if peer.neighbor {
nHoodRadius[peer.status.StorageRadius]++
nHoodDepth[peer.status.CommitedDepth]++
}
networkRadius[peer.status.StorageRadius]++
networkDepth[peer.status.CommitedDepth]++
}
}

networkR := maxIndex(networkRadius[:])
hoodR := maxIndex(nHoodRadius[:])
networkD := maxIndex(networkDepth[:])
hoodD := maxIndex(nHoodDepth[:])

return uint8(networkR), uint8(hoodR)
return uint8(networkD), uint8(hoodD)
}

// commitment finds the most common batch commitment.
Expand Down
43 changes: 23 additions & 20 deletions pkg/salud/salud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,28 @@ func TestSalud(t *testing.T) {
t.Parallel()
peers := []peer{
// fully healhy
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommitedDepth: 8}, 1, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommitedDepth: 8}, 1, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommitedDepth: 8}, 1, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommitedDepth: 8}, 1, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommitedDepth: 8}, 1, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommitedDepth: 8}, 1, true},

// healthy since radius >= most common radius - 2
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 7, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 7, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommitedDepth: 7}, 1, true},

// radius too low
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 5, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, false},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 5, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommitedDepth: 5}, 1, false},

// dur too long
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 2, false},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 2, false},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommitedDepth: 8}, 2, false},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommitedDepth: 8}, 2, false},

// connections not enough
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 90, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, false},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 90, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommitedDepth: 8}, 1, false},

// commitment wrong
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 35, ReserveSize: 100}, 1, false},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 35, ReserveSize: 100, CommitedDepth: 8}, 1, false},
}

statusM := &statusMock{make(map[string]peer)}
Expand All @@ -66,11 +66,12 @@ func TestSalud(t *testing.T) {
topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...))

reserve := mockstorer.NewReserve(
mockstorer.WithRadius(8),
mockstorer.WithRadius(6),
mockstorer.WithReserveSize(100),
mockstorer.WithCapacityDoubling(2),
)

service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8, 0)
service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8)

err := spinlock.Wait(time.Minute, func() bool {
return len(topM.PeersHealth()) == len(peers)
Expand Down Expand Up @@ -114,9 +115,10 @@ func TestSelfUnhealthyRadius(t *testing.T) {
reserve := mockstorer.NewReserve(
mockstorer.WithRadius(7),
mockstorer.WithReserveSize(100),
mockstorer.WithCapacityDoubling(0),
)

service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8, 0)
service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8)
testutil.CleanupCloser(t, service)

err := spinlock.Wait(time.Minute, func() bool {
Expand All @@ -135,8 +137,8 @@ func TestSelfHealthyCapacityDoubling(t *testing.T) {
t.Parallel()
peers := []peer{
// fully healhy
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full"}, 0, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full"}, 0, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", CommitedDepth: 8}, 0, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", CommitedDepth: 8}, 0, true},
}

statusM := &statusMock{make(map[string]peer)}
Expand All @@ -151,9 +153,10 @@ func TestSelfHealthyCapacityDoubling(t *testing.T) {
reserve := mockstorer.NewReserve(
mockstorer.WithRadius(6),
mockstorer.WithReserveSize(100),
mockstorer.WithCapacityDoubling(2),
)

service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8, 2)
service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8)
testutil.CleanupCloser(t, service)

err := spinlock.Wait(time.Minute, func() bool {
Expand Down Expand Up @@ -183,7 +186,7 @@ func TestSubToRadius(t *testing.T) {

topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...))

service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8, 0)
service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8)

c, unsub := service.SubscribeNetworkStorageRadius()
t.Cleanup(unsub)
Expand Down Expand Up @@ -216,7 +219,7 @@ func TestUnsub(t *testing.T) {

topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...))

service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8, 0)
service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8)
testutil.CleanupCloser(t, service)

c, unsub := service.SubscribeNetworkStorageRadius()
Expand Down
Loading
Loading