Skip to content

Commit

Permalink
api: exclude malicious atxs from epoch atx stream (#4776)
Browse files Browse the repository at this point in the history
## Motivation
exclude malicious atxs from epoch atxs (activeset)
  • Loading branch information
countvonzero committed Aug 5, 2023
1 parent a396bf6 commit 86efc27
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 57 deletions.
61 changes: 46 additions & 15 deletions api/grpcserver/grpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/spacemeshos/go-spacemesh/activation"
"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/events"
vm "github.com/spacemeshos/go-spacemesh/genvm"
"github.com/spacemeshos/go-spacemesh/genvm/sdk"
Expand All @@ -49,6 +50,8 @@ import (
"github.com/spacemeshos/go-spacemesh/signing"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/accounts"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
"github.com/spacemeshos/go-spacemesh/sql/identities"
"github.com/spacemeshos/go-spacemesh/system"
"github.com/spacemeshos/go-spacemesh/txs"
)
Expand Down Expand Up @@ -305,10 +308,6 @@ func (m *MeshAPIMock) GetLayer(tid types.LayerID) (*types.Layer, error) {
return types.NewExistingLayer(tid, ballots, blocks), nil
}

func (m *MeshAPIMock) EpochAtxs(types.EpochID) ([]types.ATXID, error) {
return types.RandomActiveSet(activesetSize), nil
}

func (m *MeshAPIMock) GetATXs(context.Context, []types.ATXID) (map[types.ATXID]*types.VerifiedActivationTx, []types.ATXID) {
atxs := map[types.ATXID]*types.VerifiedActivationTx{
globalAtx.ID(): globalAtx,
Expand Down Expand Up @@ -1051,7 +1050,7 @@ func TestMeshService(t *testing.T) {
genesis := time.Unix(genTimeUnix, 0)
genTime.EXPECT().GenesisTime().Return(genesis)
genTime.EXPECT().CurrentLayer().Return(layerCurrent).AnyTimes()
grpcService := NewMeshService(meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh"))
grpcService := NewMeshService(datastore.NewCachedDB(sql.InMemory(), logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh"))
t.Cleanup(launchServer(t, cfg, grpcService))

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
Expand Down Expand Up @@ -1997,7 +1996,7 @@ func TestAccountMeshDataStream_comprehensive(t *testing.T) {

ctrl := gomock.NewController(t)
genTime := NewMockgenesisTimeAPI(ctrl)
grpcService := NewMeshService(meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh"))
grpcService := NewMeshService(datastore.NewCachedDB(sql.InMemory(), logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh"))
t.Cleanup(launchServer(t, cfg, grpcService))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down Expand Up @@ -2171,7 +2170,7 @@ func TestLayerStream_comprehensive(t *testing.T) {

ctrl := gomock.NewController(t)
genTime := NewMockgenesisTimeAPI(ctrl)
grpcService := NewMeshService(meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh"))
grpcService := NewMeshService(datastore.NewCachedDB(sql.InMemory(), logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh"))
t.Cleanup(launchServer(t, cfg, grpcService))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down Expand Up @@ -2313,7 +2312,7 @@ func TestMultiService(t *testing.T) {
genesis := time.Unix(genTimeUnix, 0)
genTime.EXPECT().GenesisTime().Return(genesis)
svc1 := NewNodeService(peerCounter, meshAPIMock, genTime, syncer, "v0.0.0", "cafebabe", logtest.New(t).WithName("grpc.Node"))
svc2 := NewMeshService(meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh"))
svc2 := NewMeshService(datastore.NewCachedDB(sql.InMemory(), logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh"))
shutDown := launchServer(t, cfg, svc1, svc2)
t.Cleanup(shutDown)

Expand Down Expand Up @@ -2368,7 +2367,7 @@ func TestJsonApi(t *testing.T) {
genesis := time.Unix(genTimeUnix, 0)
genTime.EXPECT().GenesisTime().Return(genesis)
svc1 := NewNodeService(peerCounter, meshAPIMock, genTime, syncer, "v0.0.0", "cafebabe", logtest.New(t).WithName("grpc.Node"))
svc2 := NewMeshService(meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh"))
svc2 := NewMeshService(datastore.NewCachedDB(sql.InMemory(), logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh"))
t.Cleanup(launchServer(t, cfg, svc1, svc2))
time.Sleep(time.Second)

Expand Down Expand Up @@ -2712,26 +2711,58 @@ func TestVMAccountUpdates(t *testing.T) {
require.Equal(t, len(accounts), i)
}

func createAtxs(tb testing.TB, epoch types.EpochID, atxids []types.ATXID) []*types.VerifiedActivationTx {
all := make([]*types.VerifiedActivationTx, 0, len(atxids))
for _, id := range atxids {
atx := &types.ActivationTx{InnerActivationTx: types.InnerActivationTx{
NIPostChallenge: types.NIPostChallenge{
PublishEpoch: epoch,
},
NumUnits: 1,
}}
atx.SetID(id)
atx.SetEffectiveNumUnits(atx.NumUnits)
atx.SetReceived(time.Now())
atx.SmesherID = types.RandomNodeID()
vAtx, err := atx.Verify(0, 1)
require.NoError(tb, err)
all = append(all, vAtx)
}
return all
}

func TestMeshService_EpochStream(t *testing.T) {
ctrl := gomock.NewController(t)
genTime := NewMockgenesisTimeAPI(ctrl)
srv := NewMeshService(meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh"))
db := sql.InMemory()
srv := NewMeshService(datastore.NewCachedDB(db, logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh"))
t.Cleanup(launchServer(t, cfg, srv))

epoch := types.EpochID(3)
atxids := types.RandomActiveSet(100)
all := createAtxs(t, epoch, atxids)
var expected, got []types.ATXID
for i, vatx := range all {
require.NoError(t, atxs.Add(db, vatx))
if i%2 == 0 {
require.NoError(t, identities.SetMalicious(db, vatx.SmesherID, []byte("bad"), time.Now()))
} else {
expected = append(expected, vatx.ID())
}
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
conn := dialGrpc(ctx, t, cfg.PublicListener)
client := pb.NewMeshServiceClient(conn)

stream, err := client.EpochStream(ctx, &pb.EpochStreamRequest{Epoch: 3})
stream, err := client.EpochStream(ctx, &pb.EpochStreamRequest{Epoch: epoch.Uint32()})
require.NoError(t, err)
var total int
for {
_, err = stream.Recv()
resp, err := stream.Recv()
if errors.Is(err, io.EOF) {
break
}
total++
got = append(got, types.ATXID(types.BytesToHash(resp.GetId().GetId())))
}
require.Equal(t, activesetSize, total)
require.ElementsMatch(t, expected, got)
}
1 change: 0 additions & 1 deletion api/grpcserver/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ type genesisTimeAPI interface {

// meshAPI is an api for getting mesh status about layers/blocks/rewards.
type meshAPI interface {
EpochAtxs(types.EpochID) ([]types.ATXID, error)
GetATXs(context.Context, []types.ATXID) (map[types.ATXID]*types.VerifiedActivationTx, []types.ATXID)
GetLayer(types.LayerID) (*types.Layer, error)
GetRewards(types.Address) ([]*types.Reward, error)
Expand Down
39 changes: 26 additions & 13 deletions api/grpcserver/mesh_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import (
"google.golang.org/grpc/status"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log"
)

// MeshService exposes mesh data such as accounts, blocks, and transactions.
type MeshService struct {
logger log.Logger
cdb *datastore.CachedDB
mesh meshAPI // Mesh
conState conservativeState
genTime genesisTimeAPI
Expand All @@ -34,12 +36,20 @@ func (s MeshService) RegisterService(server *Server) {

// NewMeshService creates a new service using config data.
func NewMeshService(
msh meshAPI, cstate conservativeState, genTime genesisTimeAPI,
layersPerEpoch uint32, genesisID types.Hash20, layerDuration time.Duration,
layerAvgSize, txsPerProposal uint32, lg log.Logger,
cdb *datastore.CachedDB,
msh meshAPI,
cstate conservativeState,
genTime genesisTimeAPI,
layersPerEpoch uint32,
genesisID types.Hash20,
layerDuration time.Duration,
layerAvgSize,
txsPerProposal uint32,
lg log.Logger,
) *MeshService {
return &MeshService{
logger: lg,
cdb: cdb,
mesh: msh,
conState: cstate,
genTime: genTime,
Expand All @@ -63,7 +73,7 @@ func (s MeshService) GenesisTime(context.Context, *pb.GenesisTimeRequest) (*pb.G
func (s MeshService) CurrentLayer(context.Context, *pb.CurrentLayerRequest) (*pb.CurrentLayerResponse, error) {
s.logger.Info("GRPC MeshService.CurrentLayer")
return &pb.CurrentLayerResponse{Layernum: &pb.LayerNumber{
Number: uint32(s.genTime.CurrentLayer().Uint32()),
Number: s.genTime.CurrentLayer().Uint32(),
}}, nil
}

Expand Down Expand Up @@ -571,21 +581,24 @@ func convertLayerStatus(in int) pb.Layer_LayerStatus {

func (s MeshService) EpochStream(req *pb.EpochStreamRequest, stream pb.MeshService_EpochStreamServer) error {
epoch := types.EpochID(req.Epoch)
atxids, err := s.mesh.EpochAtxs(epoch)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
for _, id := range atxids {
if err := s.cdb.IterateEpochATXHeaders(epoch+1, func(header *types.ActivationTxHeader) error {
select {
case <-stream.Context().Done():
return nil
default:
var res pb.EpochStreamResponse
res.Id = &pb.ActivationId{Id: id.Bytes()}
if err = stream.Send(&res); err != nil {
return status.Error(codes.Internal, err.Error())
malicious, err := s.cdb.IsMalicious(header.NodeID)
if err != nil {
return err
}
if malicious {
return nil
}
var res pb.EpochStreamResponse
res.Id = &pb.ActivationId{Id: header.ID.Bytes()}
return stream.Send(&res)
}
}); err != nil {
return status.Error(codes.Internal, err.Error())
}
return nil
}
15 changes: 0 additions & 15 deletions api/grpcserver/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 26 additions & 6 deletions cmd/bootstrapper/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import (
"github.com/spacemeshos/go-spacemesh/api/grpcserver"
"github.com/spacemeshos/go-spacemesh/bootstrap"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/log/logtest"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -110,14 +113,29 @@ func (m *MeshAPIMock) GetATXs(context.Context, []types.ATXID) (map[types.ATXID]*
panic("not implemented")
}
func (m *MeshAPIMock) MeshHash(types.LayerID) (types.Hash32, error) { panic("not implemented") }
func (m *MeshAPIMock) EpochAtxs(types.EpochID) ([]types.ATXID, error) {
return types.RandomActiveSet(activeSetSize), nil

func createAtxs(tb testing.TB, db sql.Executor, epoch types.EpochID, atxids []types.ATXID) {
for _, id := range atxids {
atx := &types.ActivationTx{InnerActivationTx: types.InnerActivationTx{
NIPostChallenge: types.NIPostChallenge{
PublishEpoch: epoch,
},
NumUnits: 1,
}}
atx.SetID(id)
atx.SetEffectiveNumUnits(atx.NumUnits)
atx.SetReceived(time.Now())
atx.SmesherID = types.RandomNodeID()
vAtx, err := atx.Verify(0, 1)
require.NoError(tb, err)
require.NoError(tb, atxs.Add(db, vAtx))
}
}

func launchServer(tb testing.TB) func() {
func launchServer(tb testing.TB, cdb *datastore.CachedDB) func() {
grpcService := grpcserver.New(fmt.Sprintf("127.0.0.1:%d", grpcPort), logtest.New(tb).Named("grpc"))
jsonService := grpcserver.NewJSONHTTPServer(fmt.Sprintf("127.0.0.1:%d", jsonport), logtest.New(tb).WithName("grpc.JSON"))
s := grpcserver.NewMeshService(&MeshAPIMock{}, nil, nil, 0, types.Hash20{}, 0, 0, 0, logtest.New(tb).WithName("grpc.Mesh"))
s := grpcserver.NewMeshService(cdb, &MeshAPIMock{}, nil, nil, 0, types.Hash20{}, 0, 0, 0, logtest.New(tb).WithName("grpc.Mesh"))

pb.RegisterMeshServiceServer(grpcService.GrpcServer, s)
// start gRPC and json servers
Expand Down Expand Up @@ -152,7 +170,10 @@ func verifyUpdate(t *testing.T, data []byte, epoch types.EpochID, expBeacon stri
}

func TestGenerator_Generate(t *testing.T) {
t.Cleanup(launchServer(t))
targetEpoch := types.EpochID(3)
db := sql.InMemory()
createAtxs(t, db, targetEpoch-1, types.RandomActiveSet(activeSetSize))
t.Cleanup(launchServer(t, datastore.NewCachedDB(db, logtest.New(t))))

for _, tc := range []struct {
desc string
Expand Down Expand Up @@ -198,7 +219,6 @@ func TestGenerator_Generate(t *testing.T) {
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
targetEpoch := types.EpochID(3)
persisted, err := g.Generate(ctx, targetEpoch, tc.beacon, tc.actives)
require.NoError(t, err)

Expand Down
6 changes: 5 additions & 1 deletion cmd/bootstrapper/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import (

"github.com/spacemeshos/go-spacemesh/bootstrap"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/log/logtest"
"github.com/spacemeshos/go-spacemesh/sql"
)

const checkpointdata = `{
Expand Down Expand Up @@ -95,7 +97,8 @@ func updateCheckpoint(t *testing.T, ctx context.Context, data string) {
}

func TestServer(t *testing.T) {
t.Cleanup(launchServer(t))
db := sql.InMemory()
t.Cleanup(launchServer(t, datastore.NewCachedDB(db, logtest.New(t))))

fs := afero.NewMemMapFs()
g := NewGenerator(
Expand Down Expand Up @@ -123,6 +126,7 @@ func TestServer(t *testing.T) {
srv.Start(ctx, ch, np)

for _, epoch := range epochs {
createAtxs(t, db, epoch-1, types.RandomActiveSet(activeSetSize))
fname := PersistedFilename(epoch, bootstrap.SuffixBoostrap)
require.Eventually(t, func() bool {
_, err := fs.Stat(fname)
Expand Down
5 changes: 0 additions & 5 deletions mesh/mesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/spacemeshos/go-spacemesh/hash"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
"github.com/spacemeshos/go-spacemesh/sql/ballots"
"github.com/spacemeshos/go-spacemesh/sql/blocks"
"github.com/spacemeshos/go-spacemesh/sql/certificates"
Expand Down Expand Up @@ -616,10 +615,6 @@ func (msh *Mesh) GetATXs(ctx context.Context, atxIds []types.ATXID) (map[types.A
return atxs, mIds
}

func (msh *Mesh) EpochAtxs(epoch types.EpochID) ([]types.ATXID, error) {
return atxs.GetIDsByEpoch(msh.cdb, epoch)
}

// GetRewards retrieves account's rewards by the coinbase address.
func (msh *Mesh) GetRewards(coinbase types.Address) ([]*types.Reward, error) {
return rewards.List(msh.cdb, coinbase)
Expand Down
2 changes: 1 addition & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1040,7 +1040,7 @@ func (app *App) initService(ctx context.Context, svc grpcserver.Service) (grpcse
case grpcserver.GlobalState:
return grpcserver.NewGlobalStateService(app.mesh, app.conState, app.log.WithName("grpc.GlobalState")), nil
case grpcserver.Mesh:
return grpcserver.NewMeshService(app.mesh, app.conState, app.clock, app.Config.LayersPerEpoch, app.Config.Genesis.GenesisID(), app.Config.LayerDuration, app.Config.LayerAvgSize, uint32(app.Config.TxsPerProposal), app.log.WithName("grpc.Mesh")), nil
return grpcserver.NewMeshService(app.cachedDB, app.mesh, app.conState, app.clock, app.Config.LayersPerEpoch, app.Config.Genesis.GenesisID(), app.Config.LayerDuration, app.Config.LayerAvgSize, uint32(app.Config.TxsPerProposal), app.log.WithName("grpc.Mesh")), nil
case grpcserver.Node:
return grpcserver.NewNodeService(app.host, app.mesh, app.clock, app.syncer, cmd.Version, cmd.Commit, app.log.WithName("grpc.Node")), nil
case grpcserver.Admin:
Expand Down

0 comments on commit 86efc27

Please sign in to comment.