From 69556b3a491ae1c29dde20255aa12d50cd7f75dd Mon Sep 17 00:00:00 2001 From: Matthias Fasching <5011972+fasmat@users.noreply.github.com> Date: Wed, 9 Nov 2022 20:18:33 +0000 Subject: [PATCH] Use updated Initializer API from PoST (#3715) ## Motivation Integrates changes of https://github.com/spacemeshos/post/issues/78 into go-spacemesh, only merge after the former has been merged. ## Changes The asynchronous `Initializer::SessionNumLabelsWrittenChan()` is removed in favor of the synchronous `Initializer::SessionNumLabelsWritten()` with the changes in https://github.com/spacemeshos/post/pull/84/ This updates go-spacemesh to not use the removed method any more. The GRPC server now sends PoST status updates to a client in 1 second intervals instead of relying on PoST to report its status in (possibly) irregular intervals. This also means that a client now has to cancel a request or it will receive status updates about PoST indefinitely. ## Test Plan ## TODO - [x] Explain motivation or link existing issue(s) - [x] Test changes and document test plan - [x] Update documentation as needed ## DevOps Notes - [x] This PR does not require configuration changes (e.g., environment variables, GitHub secrets, VM resources) - [x] This PR does not affect public APIs - [x] This PR does not rely on a new version of external services (PoET, elasticsearch, etc.) - [x] This PR does not make changes to log messages (which monitoring infrastructure may rely on) --- activation/mocks/post.go | 14 -- activation/post.go | 149 ++++++++------------- activation/post_test.go | 167 ++++++------------------ api/config/config.go | 7 + api/grpcserver/grpcserver_test.go | 209 +++++++++++++++--------------- api/grpcserver/smesher_service.go | 18 +-- cmd/node/node.go | 2 +- go.mod | 2 +- go.sum | 4 +- 9 files changed, 219 insertions(+), 353 deletions(-) diff --git a/activation/mocks/post.go b/activation/mocks/post.go index 22c2800ddc..2f14542554 100644 --- a/activation/mocks/post.go +++ b/activation/mocks/post.go @@ -151,20 +151,6 @@ func (mr *MockPostSetupProviderMockRecorder) Status() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Status", reflect.TypeOf((*MockPostSetupProvider)(nil).Status)) } -// StatusChan mocks base method. -func (m *MockPostSetupProvider) StatusChan() <-chan *types.PostSetupStatus { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "StatusChan") - ret0, _ := ret[0].(<-chan *types.PostSetupStatus) - return ret0 -} - -// StatusChan indicates an expected call of StatusChan. -func (mr *MockPostSetupProviderMockRecorder) StatusChan() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StatusChan", reflect.TypeOf((*MockPostSetupProvider)(nil).StatusChan)) -} - // StopSession mocks base method. func (m *MockPostSetupProvider) StopSession(deleteFiles bool) error { m.ctrl.T.Helper() diff --git a/activation/post.go b/activation/post.go index d4bde1850b..3f36802e34 100644 --- a/activation/post.go +++ b/activation/post.go @@ -1,6 +1,7 @@ package activation import ( + "context" "errors" "fmt" "sync" @@ -32,7 +33,6 @@ func DefaultPostSetupOpts() atypes.PostSetupOpts { // PostSetupProvider defines the functionality required for Post setup. type PostSetupProvider interface { Status() *atypes.PostSetupStatus - StatusChan() <-chan *atypes.PostSetupStatus ComputeProviders() []atypes.PostSetupComputeProvider Benchmark(p atypes.PostSetupComputeProvider) (int, error) StartSession(opts atypes.PostSetupOpts, commitmentAtx types.ATXID) (chan struct{}, error) @@ -53,8 +53,7 @@ type PostSetupManager struct { db *datastore.CachedDB goldenATXID types.ATXID - state atypes.PostSetupState - initCompletedChan chan struct{} + state atypes.PostSetupState // init is the current initializer instance. It is being // replaced at the beginning of every data creation session. @@ -63,9 +62,8 @@ type PostSetupManager struct { lastOpts *atypes.PostSetupOpts lastErr error - // startedChan indicates whether a data creation session has started. - // The channel instance is replaced in the end of the session. - startedChan chan struct{} + // cancel is the function that PostSetupManager can invoke to cancel the execution of the initializer + cancel context.CancelFunc // doneChan indicates whether the current data creation session has finished. // The channel instance is replaced in the beginning of the session. @@ -75,14 +73,12 @@ type PostSetupManager struct { // NewPostSetupManager creates a new instance of PostSetupManager. func NewPostSetupManager(id types.NodeID, cfg atypes.PostConfig, logger log.Log, db *datastore.CachedDB, goldenATXID types.ATXID) (*PostSetupManager, error) { mgr := &PostSetupManager{ - id: id, - cfg: cfg, - logger: logger, - db: db, - goldenATXID: goldenATXID, - state: atypes.PostSetupStateNotStarted, - initCompletedChan: make(chan struct{}), - startedChan: make(chan struct{}), + id: id, + cfg: cfg, + logger: logger, + db: db, + goldenATXID: goldenATXID, + state: atypes.PostSetupStateNotStarted, } return mgr, nil @@ -92,58 +88,23 @@ var errNotComplete = errors.New("not complete") // Status returns the setup current status. func (mgr *PostSetupManager) Status() *atypes.PostSetupStatus { - status := &atypes.PostSetupStatus{} - - mgr.mu.Lock() - status.State = mgr.state - init := mgr.init - mgr.mu.Unlock() - - if status.State == atypes.PostSetupStateNotStarted { - return status - } - - status.NumLabelsWritten = init.SessionNumLabelsWritten() - status.LastOpts = mgr.LastOpts() - status.LastError = mgr.LastError() - - return status -} - -// StatusChan returns a channel with status updates of the setup current or the upcoming session. -func (mgr *PostSetupManager) StatusChan() <-chan *atypes.PostSetupStatus { - // Wait for session to start because only then the initializer instance - // used for retrieving the progress updates is already set. mgr.mu.Lock() - startedChan := mgr.startedChan - mgr.mu.Unlock() - - <-startedChan - - statusChan := make(chan *atypes.PostSetupStatus, 1024) - go func() { - defer close(statusChan) - - initialStatus := mgr.Status() - statusChan <- initialStatus - - mgr.mu.Lock() - init := mgr.init - mgr.mu.Unlock() + defer mgr.mu.Unlock() - ch := init.SessionNumLabelsWrittenChan() - for numLabelsWritten := range ch { - status := *initialStatus - status.NumLabelsWritten = numLabelsWritten - statusChan <- &status - } + status := mgr.state - if finalStatus := mgr.Status(); finalStatus.LastError != nil { - statusChan <- finalStatus + if status == atypes.PostSetupStateNotStarted { + return &atypes.PostSetupStatus{ + State: status, } - }() + } - return statusChan + return &atypes.PostSetupStatus{ + State: mgr.state, + NumLabelsWritten: mgr.init.SessionNumLabelsWritten(), + LastOpts: mgr.lastOpts, + LastError: mgr.lastErr, + } } // ComputeProviders returns a list of available compute providers for Post setup. @@ -189,32 +150,27 @@ func (mgr *PostSetupManager) Benchmark(p atypes.PostSetupComputeProvider) (int, // It supports resuming a previously started session, as well as changing the Post setup options (e.g., number of units) // after initial setup. func (mgr *PostSetupManager) StartSession(opts atypes.PostSetupOpts, commitmentAtx types.ATXID) (chan struct{}, error) { - state := mgr.getState() + mgr.mu.Lock() - if state == atypes.PostSetupStateInProgress { + if mgr.state == atypes.PostSetupStateInProgress { + mgr.mu.Unlock() return nil, fmt.Errorf("post setup session in progress") } - if state == atypes.PostSetupStateComplete { + if mgr.state == atypes.PostSetupStateComplete { // Check whether the new request invalidates the current status. - lastOpts := mgr.LastOpts() + lastOpts := mgr.lastOpts invalidate := opts.DataDir != lastOpts.DataDir || opts.NumUnits != lastOpts.NumUnits if !invalidate { // Already complete. + mgr.mu.Unlock() return mgr.doneChan, nil } - - mgr.mu.Lock() - mgr.initCompletedChan = make(chan struct{}) - mgr.mu.Unlock() } - mgr.mu.Lock() - mgr.state = atypes.PostSetupStateInProgress - mgr.mu.Unlock() - if opts.ComputeProviderID == config.BestProviderID { p, err := mgr.BestProvider() if err != nil { + mgr.mu.Unlock() return nil, err } @@ -223,29 +179,35 @@ func (mgr *PostSetupManager) StartSession(opts atypes.PostSetupOpts, commitmentA } commitment := GetCommitmentBytes(mgr.id, commitmentAtx) - newInit, err := initialization.NewInitializer(config.Config(mgr.cfg), config.InitOpts(opts), commitment) + newInit, err := initialization.NewInitializer( + initialization.WithCommitment(commitment), + initialization.WithConfig(config.Config(mgr.cfg)), + initialization.WithInitOpts(config.InitOpts(opts)), + initialization.WithLogger(mgr.logger), + ) if err != nil { - mgr.mu.Lock() mgr.state = atypes.PostSetupStateError mgr.lastErr = err mgr.mu.Unlock() return nil, fmt.Errorf("new initializer: %w", err) } - newInit.SetLogger(mgr.logger) - - mgr.mu.Lock() + mgr.state = atypes.PostSetupStateInProgress mgr.init = newInit mgr.lastOpts = &opts mgr.lastErr = nil - close(mgr.startedChan) + // TODO(mafa): the context used here should be passed in as argument to StartSession + // and instead of having a StopSession method the caller should just cancel the context when they want to stop the session. + ctx, cancel := context.WithCancel(context.TODO()) + mgr.cancel = cancel mgr.doneChan = make(chan struct{}) mgr.mu.Unlock() go func() { defer func() { mgr.mu.Lock() - mgr.startedChan = make(chan struct{}) + mgr.cancel() + mgr.cancel = nil close(mgr.doneChan) mgr.mu.Unlock() }() @@ -258,12 +220,12 @@ func (mgr *PostSetupManager) StartSession(opts atypes.PostSetupOpts, commitmentA log.String("provider", fmt.Sprintf("%d", opts.ComputeProviderID)), ) - if err := newInit.Initialize(); err != nil { + if err := newInit.Initialize(ctx); err != nil { mgr.mu.Lock() defer mgr.mu.Unlock() - if errors.Is(err, initialization.ErrStopped) { - mgr.logger.Info("post setup session stopped") + if errors.Is(err, context.Canceled) { + mgr.logger.Info("post setup session was stopped") mgr.state = atypes.PostSetupStateNotStarted } else { mgr.state = atypes.PostSetupStateError @@ -281,7 +243,6 @@ func (mgr *PostSetupManager) StartSession(opts atypes.PostSetupOpts, commitmentA mgr.mu.Lock() mgr.state = atypes.PostSetupStateComplete - close(mgr.initCompletedChan) mgr.mu.Unlock() }() @@ -298,9 +259,7 @@ func (mgr *PostSetupManager) StopSession(deleteFiles bool) error { mgr.mu.Unlock() if state == atypes.PostSetupStateInProgress { - if err := init.Stop(); err != nil { - return fmt.Errorf("stop: %w", err) - } + mgr.cancel() // Block until the current data creation session will be finished. <-doneChan @@ -314,7 +273,6 @@ func (mgr *PostSetupManager) StopSession(deleteFiles bool) error { mgr.mu.Lock() // Reset internal state. mgr.state = atypes.PostSetupStateNotStarted - mgr.initCompletedChan = make(chan struct{}) mgr.mu.Unlock() } @@ -323,14 +281,16 @@ func (mgr *PostSetupManager) StopSession(deleteFiles bool) error { // GenerateProof generates a new Post. func (mgr *PostSetupManager) GenerateProof(challenge []byte, commitmentAtx types.ATXID) (*types.Post, *types.PostMetadata, error) { - state := mgr.getState() + mgr.mu.Lock() - if state != atypes.PostSetupStateComplete { + if mgr.state != atypes.PostSetupStateComplete { + mgr.mu.Unlock() return nil, nil, errNotComplete } + mgr.mu.Unlock() commitment := GetCommitmentBytes(mgr.id, commitmentAtx) - prover, err := proving.NewProver(config.Config(mgr.cfg), mgr.LastOpts().DataDir, commitment) + prover, err := proving.NewProver(config.Config(mgr.cfg), mgr.lastOpts.DataDir, commitment) if err != nil { return nil, nil, fmt.Errorf("new prover: %w", err) } @@ -374,13 +334,6 @@ func (mgr *PostSetupManager) Config() atypes.PostConfig { return mgr.cfg } -func (mgr *PostSetupManager) getState() atypes.PostSetupState { - mgr.mu.Lock() - defer mgr.mu.Unlock() - - return mgr.state -} - func GetCommitmentBytes(id types.NodeID, commitmentAtx types.ATXID) []byte { h := hash.Sum(append(id.ToBytes(), commitmentAtx.Bytes()...)) return h[:] diff --git a/activation/post_test.go b/activation/post_test.go index 49c66a0425..27a93a3d4c 100644 --- a/activation/post_test.go +++ b/activation/post_test.go @@ -1,12 +1,13 @@ package activation import ( - "sync" + "context" "testing" "time" "github.com/spacemeshos/post/initialization" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" atypes "github.com/spacemeshos/go-spacemesh/activation/types" "github.com/spacemeshos/go-spacemesh/common/types" @@ -34,32 +35,42 @@ func TestPostSetupManager(t *testing.T) { mgr, err := NewPostSetupManager(id, cfg, logtest.New(t), cdb, goldenATXID) req.NoError(err) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var eg errgroup.Group lastStatus := &atypes.PostSetupStatus{} - go func() { - for status := range mgr.StatusChan() { - req.True(status.NumLabelsWritten >= lastStatus.NumLabelsWritten) - req.Equal(opts, *status.LastOpts) - req.Nil(status.LastError) - - if status.NumLabelsWritten == uint64(opts.NumUnits)*cfg.LabelsPerUnit { - // TODO(moshababo): fix the following failure. `status.State` changes to `postSetupStateComplete` only after the channel event was triggered. - // req.Equal(postSetupStateComplete, status.State) - } else { - req.Equal(atypes.PostSetupStateInProgress, status.State) - } + eg.Go(func() error { + timer := time.NewTicker(50 * time.Millisecond) + defer timer.Stop() - lastStatus = status + for { + select { + case <-ctx.Done(): + return nil + case <-timer.C: + status := mgr.Status() + req.GreaterOrEqual(status.NumLabelsWritten, lastStatus.NumLabelsWritten) + req.Equal(opts, *status.LastOpts) + req.Nil(status.LastError) + + if status.NumLabelsWritten < uint64(opts.NumUnits)*cfg.LabelsPerUnit { + req.Equal(atypes.PostSetupStateInProgress, status.State) + } + } } - }() + }) // Create data. doneChan, err := mgr.StartSession(opts, goldenATXID) req.NoError(err) <-doneChan + cancel() + eg.Wait() + req.Equal(opts, *mgr.LastOpts()) req.NoError(mgr.LastError()) - // TODO(moshababo): fix the following failure. `status.State` changes to `postSetupStateComplete` only after the channel event was triggered. - // req.Equal(lastStatus, mgr.Status()) + req.Equal(atypes.PostSetupStateComplete, mgr.Status().State) // Create data (same opts). doneChan, err = mgr.StartSession(opts, goldenATXID) @@ -69,8 +80,7 @@ func TestPostSetupManager(t *testing.T) { req.NoError(mgr.LastError()) // Cleanup. - err = mgr.StopSession(true) - req.NoError(err) + req.NoError(mgr.StopSession(true)) // Create data (same opts, after deletion). doneChan, err = mgr.StartSession(opts, goldenATXID) @@ -78,8 +88,7 @@ func TestPostSetupManager(t *testing.T) { <-doneChan req.Equal(opts, *mgr.LastOpts()) req.NoError(mgr.LastError()) - // TODO(moshababo): fix the following failure. `status.State` changes to `postSetupStateComplete` only after the channel event was triggered. - // req.Equal(lastStatus, mgr.Status()) + req.Equal(atypes.PostSetupStateComplete, mgr.Status().State) } func TestPostSetupManager_InitialStatus(t *testing.T) { @@ -101,10 +110,7 @@ func TestPostSetupManager_InitialStatus(t *testing.T) { doneChan, err := mgr.StartSession(opts, goldenATXID) req.NoError(err) <-doneChan - - // Compare the last status update to the status queried directly. - // TODO(moshababo): fix the following failure. `status.State` changes to `postSetupStateComplete` only after the channel event was triggered. - // req.Equal(lastStatus, mgr.Status()) + req.Equal(atypes.PostSetupStateComplete, mgr.Status().State) // Re-instantiate `PostSetupManager`. mgr, err = NewPostSetupManager(id, cfg, logtest.New(t), cdb, goldenATXID) @@ -149,94 +155,6 @@ func TestPostSetupManager_GenerateProof(t *testing.T) { req.ErrorIs(err, errNotComplete) } -func TestPostSetupManager_StatusChan_BeforeSessionStarted(t *testing.T) { - req := require.New(t) - - cdb := newCachedDB(t) - cfg, opts := getTestConfig(t) - mgr, err := NewPostSetupManager(id, cfg, logtest.New(t), cdb, goldenATXID) - req.NoError(err) - - // Verify that the status stream works properly when called *before* session started. - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - ch := mgr.StatusChan() - var prevStatus *atypes.PostSetupStatus - for { - status, more := <-ch - if more { - if prevStatus == nil { - // Verify initial status. - req.Equal(uint64(0), status.NumLabelsWritten) - } - prevStatus = status - } else { - // Verify last status. - req.Equal(uint64(opts.NumUnits)*cfg.LabelsPerUnit, prevStatus.NumLabelsWritten) - break - } - } - }() - - // Create data. - time.Sleep(1 * time.Second) // Short delay. - doneChan, err := mgr.StartSession(opts, goldenATXID) - req.NoError(err) - <-doneChan - wg.Wait() - - // Cleanup. - err = mgr.StopSession(true) - req.NoError(err) -} - -func TestPostSetupManager_StatusChan_AfterSessionStarted(t *testing.T) { - req := require.New(t) - - cdb := newCachedDB(t) - cfg, opts := getTestConfig(t) - opts.NumUnits *= 10 - mgr, err := NewPostSetupManager(id, cfg, logtest.New(t), cdb, goldenATXID) - req.NoError(err) - - // Verify that the status stream works properly when called *after* session started (yet before it ended). - var wg sync.WaitGroup - wg.Add(1) - go func() { - time.Sleep(100 * time.Millisecond) // Short delay. - - ch := mgr.StatusChan() - var prevStatus *atypes.PostSetupStatus - for { - status, more := <-ch - if more { - if prevStatus == nil { - // Verify initial status. - req.Equal(uint64(0), status.NumLabelsWritten) - } - prevStatus = status - } else { - // Verify last status. - req.Equal(uint64(opts.NumUnits)*cfg.LabelsPerUnit, prevStatus.NumLabelsWritten) - break - } - } - wg.Done() - }() - - // Create data. - doneChan, err := mgr.StartSession(opts, goldenATXID) - req.NoError(err) - <-doneChan - wg.Wait() - - // Cleanup. - err = mgr.StopSession(true) - req.NoError(err) -} - func TestPostSetupManager_Stop(t *testing.T) { req := require.New(t) @@ -248,6 +166,8 @@ func TestPostSetupManager_Stop(t *testing.T) { // Verify state. status := mgr.Status() req.Equal(atypes.PostSetupStateNotStarted, status.State) + req.Zero(status.NumLabelsWritten) + req.Nil(status.LastOpts) // Create data. doneChan, err := mgr.StartSession(opts, goldenATXID) @@ -255,24 +175,19 @@ func TestPostSetupManager_Stop(t *testing.T) { <-doneChan // Verify state. - status = mgr.Status() - req.Equal(atypes.PostSetupStateComplete, status.State) + req.Equal(atypes.PostSetupStateComplete, mgr.Status().State) // Stop without file deletion. - err = mgr.StopSession(false) - req.NoError(err) + req.NoError(mgr.StopSession(false)) // Verify state. - status = mgr.Status() - req.Equal(atypes.PostSetupStateComplete, status.State) + req.Equal(atypes.PostSetupStateComplete, mgr.Status().State) // Stop with file deletion. - err = mgr.StopSession(true) - req.NoError(err) + req.NoError(mgr.StopSession(true)) // Verify state. - status = mgr.Status() - req.Equal(atypes.PostSetupStateNotStarted, status.State) + req.Equal(atypes.PostSetupStateNotStarted, mgr.Status().State) // Create data again. doneChan, err = mgr.StartSession(opts, goldenATXID) @@ -280,8 +195,7 @@ func TestPostSetupManager_Stop(t *testing.T) { <-doneChan // Verify state. - status = mgr.Status() - req.Equal(atypes.PostSetupStateComplete, status.State) + req.Equal(atypes.PostSetupStateComplete, mgr.Status().State) } func TestPostSetupManager_Stop_WhileInProgress(t *testing.T) { @@ -306,8 +220,7 @@ func TestPostSetupManager_Stop_WhileInProgress(t *testing.T) { req.Equal(atypes.PostSetupStateInProgress, status.State) // Stop without files deletion. - err = mgr.StopSession(false) - req.NoError(err) + req.NoError(mgr.StopSession(false)) select { case <-doneChan: diff --git a/api/config/config.go b/api/config/config.go index 182fb41e4a..af649087d2 100644 --- a/api/config/config.go +++ b/api/config/config.go @@ -4,6 +4,7 @@ package config import ( "errors" "fmt" + "time" ) const ( @@ -18,6 +19,8 @@ const ( defaultStartNodeService = false defaultStartSmesherService = false defaultStartTransactionService = false + + defaultSmesherStreamInterval = 1 * time.Second ) // Config defines the api config params. @@ -35,6 +38,8 @@ type Config struct { StartNodeService bool StartSmesherService bool StartTransactionService bool + + SmesherStreamInterval time.Duration } func init() { @@ -57,6 +62,8 @@ func DefaultConfig() Config { StartNodeService: defaultStartNodeService, StartSmesherService: defaultStartSmesherService, StartTransactionService: defaultStartTransactionService, + + SmesherStreamInterval: defaultSmesherStreamInterval, } } diff --git a/api/grpcserver/grpcserver_test.go b/api/grpcserver/grpcserver_test.go index 40a8a42d84..a763c055a3 100644 --- a/api/grpcserver/grpcserver_test.go +++ b/api/grpcserver/grpcserver_test.go @@ -6,7 +6,6 @@ import ( "context" "errors" "fmt" - "io" "io/ioutil" "log" "math" @@ -979,7 +978,7 @@ func TestGlobalStateService(t *testing.T) { func TestSmesherService(t *testing.T) { logtest.SetupGlobal(t) - svc := NewSmesherService(&PostAPIMock{}, &SmeshingAPIMock{}) + svc := NewSmesherService(&PostAPIMock{}, &SmeshingAPIMock{}, 10*time.Millisecond) shutDown := launchServer(t, svc) defer shutDown() @@ -988,112 +987,118 @@ func TestSmesherService(t *testing.T) { conn := dialGrpc(ctx, t, cfg) c := pb.NewSmesherServiceClient(conn) - // Construct an array of test cases to test each endpoint in turn - testCases := []struct { - name string - run func(*testing.T) - }{ - {"IsSmeshing", func(t *testing.T) { - logtest.SetupGlobal(t) - res, err := c.IsSmeshing(context.Background(), &empty.Empty{}) - require.NoError(t, err) - require.False(t, res.IsSmeshing, "expected IsSmeshing to be false") - }}, - {"StartSmeshingMissingArgs", func(t *testing.T) { - logtest.SetupGlobal(t) - _, err := c.StartSmeshing(context.Background(), &pb.StartSmeshingRequest{}) - require.Equal(t, codes.InvalidArgument, status.Code(err)) - }}, - {"StartSmeshing", func(t *testing.T) { - logtest.SetupGlobal(t) - opts := &pb.PostSetupOpts{} - opts.DataDir = t.TempDir() - opts.NumUnits = 1 - opts.NumFiles = 1 + t.Run("IsSmeshing", func(t *testing.T) { + logtest.SetupGlobal(t) + res, err := c.IsSmeshing(context.Background(), &empty.Empty{}) + require.NoError(t, err) + require.False(t, res.IsSmeshing, "expected IsSmeshing to be false") + }) - coinbase := &pb.AccountId{Address: addr1.String()} + t.Run("StartSmeshingMissingArgs", func(t *testing.T) { + logtest.SetupGlobal(t) + _, err := c.StartSmeshing(context.Background(), &pb.StartSmeshingRequest{}) + require.Equal(t, codes.InvalidArgument, status.Code(err)) + }) - res, err := c.StartSmeshing(context.Background(), &pb.StartSmeshingRequest{ - Opts: opts, - Coinbase: coinbase, - }) - require.NoError(t, err) - require.Equal(t, int32(code.Code_OK), res.Status.Code) - }}, - {"StopSmeshing", func(t *testing.T) { - logtest.SetupGlobal(t) - res, err := c.StopSmeshing(context.Background(), &pb.StopSmeshingRequest{}) - require.NoError(t, err) - require.Equal(t, int32(code.Code_OK), res.Status.Code) - }}, - {"SmesherID", func(t *testing.T) { - logtest.SetupGlobal(t) - res, err := c.SmesherID(context.Background(), &empty.Empty{}) - require.NoError(t, err) - nodeAddr := types.GenerateAddress(signer.NodeID().ToBytes()) - resAddr, err := types.StringToAddress(res.AccountId.Address) - require.NoError(t, err) - require.Equal(t, nodeAddr.String(), resAddr.String()) - }}, - {"SetCoinbaseMissingArgs", func(t *testing.T) { - logtest.SetupGlobal(t) - _, err := c.SetCoinbase(context.Background(), &pb.SetCoinbaseRequest{}) - require.Error(t, err) - statusCode := status.Code(err) - require.Equal(t, codes.InvalidArgument, statusCode) - }}, - {"SetCoinbase", func(t *testing.T) { - logtest.SetupGlobal(t) - res, err := c.SetCoinbase(context.Background(), &pb.SetCoinbaseRequest{ - Id: &pb.AccountId{Address: addr1.String()}, - }) - require.NoError(t, err) - require.Equal(t, int32(code.Code_OK), res.Status.Code) - }}, - {"Coinbase", func(t *testing.T) { - logtest.SetupGlobal(t) - res, err := c.Coinbase(context.Background(), &empty.Empty{}) - require.NoError(t, err) - addr, err := types.StringToAddress(res.AccountId.Address) - require.NoError(t, err) - require.Equal(t, addr1.Bytes(), addr.Bytes()) - }}, - {"MinGas", func(t *testing.T) { - logtest.SetupGlobal(t) - _, err := c.MinGas(context.Background(), &empty.Empty{}) - require.Error(t, err) - statusCode := status.Code(err) - require.Equal(t, codes.Unimplemented, statusCode) - }}, - {"SetMinGas", func(t *testing.T) { - logtest.SetupGlobal(t) - _, err := c.SetMinGas(context.Background(), &pb.SetMinGasRequest{}) - require.Error(t, err) - statusCode := status.Code(err) - require.Equal(t, codes.Unimplemented, statusCode) - }}, - {"PostSetupComputeProviders", func(t *testing.T) { - logtest.SetupGlobal(t) - _, err := c.PostSetupComputeProviders(context.Background(), &pb.PostSetupComputeProvidersRequest{Benchmark: false}) - require.NoError(t, err) - }}, - {"PostSetupStatusStream", func(t *testing.T) { - logtest.SetupGlobal(t) - stream, err := c.PostSetupStatusStream(context.Background(), &empty.Empty{}) + t.Run("StartSmeshing", func(t *testing.T) { + logtest.SetupGlobal(t) + opts := &pb.PostSetupOpts{} + opts.DataDir = t.TempDir() + opts.NumUnits = 1 + opts.NumFiles = 1 - // Expecting the stream to return a single update before closing. - require.NoError(t, err) + coinbase := &pb.AccountId{Address: addr1.String()} + + res, err := c.StartSmeshing(context.Background(), &pb.StartSmeshingRequest{ + Opts: opts, + Coinbase: coinbase, + }) + require.NoError(t, err) + require.Equal(t, int32(code.Code_OK), res.Status.Code) + }) + + t.Run("StopSmeshing", func(t *testing.T) { + logtest.SetupGlobal(t) + res, err := c.StopSmeshing(context.Background(), &pb.StopSmeshingRequest{}) + require.NoError(t, err) + require.Equal(t, int32(code.Code_OK), res.Status.Code) + }) + + t.Run("SmesherID", func(t *testing.T) { + logtest.SetupGlobal(t) + res, err := c.SmesherID(context.Background(), &empty.Empty{}) + require.NoError(t, err) + nodeAddr := types.GenerateAddress(signer.NodeID().ToBytes()) + resAddr, err := types.StringToAddress(res.AccountId.Address) + require.NoError(t, err) + require.Equal(t, nodeAddr.String(), resAddr.String()) + }) + + t.Run("SetCoinbaseMissingArgs", func(t *testing.T) { + logtest.SetupGlobal(t) + _, err := c.SetCoinbase(context.Background(), &pb.SetCoinbaseRequest{}) + require.Error(t, err) + statusCode := status.Code(err) + require.Equal(t, codes.InvalidArgument, statusCode) + }) + + t.Run("SetCoinbase", func(t *testing.T) { + logtest.SetupGlobal(t) + res, err := c.SetCoinbase(context.Background(), &pb.SetCoinbaseRequest{ + Id: &pb.AccountId{Address: addr1.String()}, + }) + require.NoError(t, err) + require.Equal(t, int32(code.Code_OK), res.Status.Code) + }) + + t.Run("Coinbase", func(t *testing.T) { + logtest.SetupGlobal(t) + res, err := c.Coinbase(context.Background(), &empty.Empty{}) + require.NoError(t, err) + addr, err := types.StringToAddress(res.AccountId.Address) + require.NoError(t, err) + require.Equal(t, addr1.Bytes(), addr.Bytes()) + }) + + t.Run("MinGas", func(t *testing.T) { + logtest.SetupGlobal(t) + _, err := c.MinGas(context.Background(), &empty.Empty{}) + require.Error(t, err) + statusCode := status.Code(err) + require.Equal(t, codes.Unimplemented, statusCode) + }) + + t.Run("SetMinGas", func(t *testing.T) { + logtest.SetupGlobal(t) + _, err := c.SetMinGas(context.Background(), &pb.SetMinGasRequest{}) + require.Error(t, err) + statusCode := status.Code(err) + require.Equal(t, codes.Unimplemented, statusCode) + }) + + t.Run("PostSetupComputeProviders", func(t *testing.T) { + logtest.SetupGlobal(t) + _, err := c.PostSetupComputeProviders(context.Background(), &pb.PostSetupComputeProvidersRequest{Benchmark: false}) + require.NoError(t, err) + }) + + t.Run("PostSetupStatusStream", func(t *testing.T) { + logtest.SetupGlobal(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + stream, err := c.PostSetupStatusStream(ctx, &empty.Empty{}) + require.NoError(t, err) + + // Expecting the stream to return updates before closing. + for i := 0; i < 3; i++ { _, err = stream.Recv() require.NoError(t, err) - _, err = stream.Recv() - require.EqualError(t, err, io.EOF.Error()) - }}, - } + } - // Run subtests - for _, tc := range testCases { - t.Run(tc.name, tc.run) - } + cancel() + _, err = stream.Recv() + require.ErrorContains(t, err, context.Canceled.Error()) + }) } func TestMeshService(t *testing.T) { diff --git a/api/grpcserver/smesher_service.go b/api/grpcserver/smesher_service.go index 53e8600285..c2b1cad084 100644 --- a/api/grpcserver/smesher_service.go +++ b/api/grpcserver/smesher_service.go @@ -2,6 +2,7 @@ package grpcserver import ( "fmt" + "time" "github.com/golang/protobuf/ptypes/empty" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" @@ -19,7 +20,6 @@ import ( type PostSetupProvider interface { Status() *atypes.PostSetupStatus - StatusChan() <-chan *atypes.PostSetupStatus ComputeProviders() []atypes.PostSetupComputeProvider Benchmark(p atypes.PostSetupComputeProvider) (int, error) Config() atypes.PostConfig @@ -29,6 +29,8 @@ type PostSetupProvider interface { type SmesherService struct { postSetupProvider PostSetupProvider smeshingProvider api.SmeshingAPI + + streamInterval time.Duration } // RegisterService registers this service with a grpc server instance. @@ -37,8 +39,8 @@ func (s SmesherService) RegisterService(server *Server) { } // NewSmesherService creates a new grpc service using config data. -func NewSmesherService(post PostSetupProvider, smeshing api.SmeshingAPI) *SmesherService { - return &SmesherService{post, smeshing} +func NewSmesherService(post PostSetupProvider, smeshing api.SmeshingAPI, streamInterval time.Duration) *SmesherService { + return &SmesherService{post, smeshing, streamInterval} } // IsSmeshing reports whether the node is smeshing. @@ -183,13 +185,13 @@ func (s SmesherService) PostSetupStatus(context.Context, *empty.Empty) (*pb.Post func (s SmesherService) PostSetupStatusStream(_ *empty.Empty, stream pb.SmesherService_PostSetupStatusStreamServer) error { log.Info("GRPC SmesherService.PostSetupStatusStream") - statusChan := s.postSetupProvider.StatusChan() + timer := time.NewTicker(s.streamInterval) + defer timer.Stop() + for { select { - case status, more := <-statusChan: - if !more { - return nil - } + case <-timer.C: + status := s.postSetupProvider.Status() if err := stream.Send(&pb.PostSetupStatusStreamResponse{Status: statusToPbStatus(status)}); err != nil { return fmt.Errorf("send to stream: %w", err) } diff --git a/cmd/node/node.go b/cmd/node/node.go index 1eaf6c85df..1baebbaf42 100644 --- a/cmd/node/node.go +++ b/cmd/node/node.go @@ -831,7 +831,7 @@ func (app *App) startAPIServices(ctx context.Context) { app.closers = append(app.closers, nodeService) } if apiConf.StartSmesherService { - registerService(grpcserver.NewSmesherService(app.postSetupMgr, app.atxBuilder)) + registerService(grpcserver.NewSmesherService(app.postSetupMgr, app.atxBuilder, apiConf.SmesherStreamInterval)) } if apiConf.StartTransactionService { registerService(grpcserver.NewTransactionService(app.db, app.host, app.mesh, app.conState, app.syncer)) diff --git a/go.mod b/go.mod index 05c5e2682d..77b36cd853 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( github.com/spacemeshos/go-scale v1.1.0 github.com/spacemeshos/merkle-tree v0.1.0 github.com/spacemeshos/poet v0.2.1 - github.com/spacemeshos/post v0.1.2 + github.com/spacemeshos/post v0.2.0 github.com/spf13/cobra v1.6.1 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.14.0 diff --git a/go.sum b/go.sum index 55b1376611..568e11b9ef 100644 --- a/go.sum +++ b/go.sum @@ -620,8 +620,8 @@ github.com/spacemeshos/merkle-tree v0.1.0 h1:3oGOfJab60BPy0qn05gHYQpqvpA/Szr7Cge github.com/spacemeshos/merkle-tree v0.1.0/go.mod h1:uGtTEKksCgRdSMyeDdHM1W2d/rI7CxYlzNnKP1GU5Mw= github.com/spacemeshos/poet v0.2.1 h1:m//vgqV5e6UrIv1xfYQbk0OLhVlr5037oa9RO2VQzic= github.com/spacemeshos/poet v0.2.1/go.mod h1:7EJZ+I19+UL39FxXG65SsBQtFoRdkZGBwZhqkcUsC/g= -github.com/spacemeshos/post v0.1.2 h1:XZTCxyuDwUudsdczzDcOBzFflOU3snWo/mQ6mXi0XGs= -github.com/spacemeshos/post v0.1.2/go.mod h1:9KZB+Wd/WeNkB8wU522BsG0a8IVBavb7BbiKMPUHQPI= +github.com/spacemeshos/post v0.2.0 h1:O4JeCDbqXyskRVlnmE8m1HMypeglgE0YvBtYQeKn4M4= +github.com/spacemeshos/post v0.2.0/go.mod h1:9KZB+Wd/WeNkB8wU522BsG0a8IVBavb7BbiKMPUHQPI= github.com/spacemeshos/smutil v0.0.0-20220819180433-6aaadca3eb1d h1:09R53k+V+cPCwTH3AnvOJO2OmCb4dJlG6hfgqQlTm6w= github.com/spacemeshos/smutil v0.0.0-20220819180433-6aaadca3eb1d/go.mod h1:ZCb6K2K6Dz51qrYp04sZtf4kJex9lCwGBH5Npok9uy8= github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 h1:RC6RW7j+1+HkWaX/Yh71Ee5ZHaHYt7ZP4sQgUrm6cDU=