Skip to content

Commit

Permalink
Use updated Initializer API from PoST (#3715)
Browse files Browse the repository at this point in the history
## Motivation
Integrates changes of spacemeshos/post#78 into go-spacemesh, only merge after the former has been merged.

## Changes
The asynchronous `Initializer::SessionNumLabelsWrittenChan()` is removed in favor of the synchronous `Initializer::SessionNumLabelsWritten()` with the changes in spacemeshos/post#84

This updates go-spacemesh to not use the removed method any more.

The GRPC server now sends PoST status updates to a client in 1 second intervals instead of relying on PoST to report its status in (possibly) irregular intervals. This also means that a client now has to cancel a request or it will receive status updates about PoST indefinitely.

## Test Plan
<!-- Please specify how these changes were tested 
(e.g. unit tests, manual testing, etc.) -->

## TODO
<!-- This section should be removed when all items are complete -->
- [x] Explain motivation or link existing issue(s)
- [x] Test changes and document test plan
- [x] Update documentation as needed

## DevOps Notes
<!-- Please uncheck these items as applicable to make DevOps aware of changes that may affect releases -->
- [x] This PR does not require configuration changes (e.g., environment variables, GitHub secrets, VM resources)
- [x] This PR does not affect public APIs
- [x] This PR does not rely on a new version of external services (PoET, elasticsearch, etc.)
- [x] This PR does not make changes to log messages (which monitoring infrastructure may rely on)
  • Loading branch information
fasmat committed Nov 9, 2022
1 parent 58a8f87 commit 69556b3
Show file tree
Hide file tree
Showing 9 changed files with 219 additions and 353 deletions.
14 changes: 0 additions & 14 deletions activation/mocks/post.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

149 changes: 51 additions & 98 deletions activation/post.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package activation

import (
"context"
"errors"
"fmt"
"sync"
Expand Down Expand Up @@ -32,7 +33,6 @@ func DefaultPostSetupOpts() atypes.PostSetupOpts {
// PostSetupProvider defines the functionality required for Post setup.
type PostSetupProvider interface {
Status() *atypes.PostSetupStatus
StatusChan() <-chan *atypes.PostSetupStatus
ComputeProviders() []atypes.PostSetupComputeProvider
Benchmark(p atypes.PostSetupComputeProvider) (int, error)
StartSession(opts atypes.PostSetupOpts, commitmentAtx types.ATXID) (chan struct{}, error)
Expand All @@ -53,8 +53,7 @@ type PostSetupManager struct {
db *datastore.CachedDB
goldenATXID types.ATXID

state atypes.PostSetupState
initCompletedChan chan struct{}
state atypes.PostSetupState

// init is the current initializer instance. It is being
// replaced at the beginning of every data creation session.
Expand All @@ -63,9 +62,8 @@ type PostSetupManager struct {
lastOpts *atypes.PostSetupOpts
lastErr error

// startedChan indicates whether a data creation session has started.
// The channel instance is replaced in the end of the session.
startedChan chan struct{}
// cancel is the function that PostSetupManager can invoke to cancel the execution of the initializer
cancel context.CancelFunc

// doneChan indicates whether the current data creation session has finished.
// The channel instance is replaced in the beginning of the session.
Expand All @@ -75,14 +73,12 @@ type PostSetupManager struct {
// NewPostSetupManager creates a new instance of PostSetupManager.
func NewPostSetupManager(id types.NodeID, cfg atypes.PostConfig, logger log.Log, db *datastore.CachedDB, goldenATXID types.ATXID) (*PostSetupManager, error) {
mgr := &PostSetupManager{
id: id,
cfg: cfg,
logger: logger,
db: db,
goldenATXID: goldenATXID,
state: atypes.PostSetupStateNotStarted,
initCompletedChan: make(chan struct{}),
startedChan: make(chan struct{}),
id: id,
cfg: cfg,
logger: logger,
db: db,
goldenATXID: goldenATXID,
state: atypes.PostSetupStateNotStarted,
}

return mgr, nil
Expand All @@ -92,58 +88,23 @@ var errNotComplete = errors.New("not complete")

// Status returns the setup current status.
func (mgr *PostSetupManager) Status() *atypes.PostSetupStatus {
status := &atypes.PostSetupStatus{}

mgr.mu.Lock()
status.State = mgr.state
init := mgr.init
mgr.mu.Unlock()

if status.State == atypes.PostSetupStateNotStarted {
return status
}

status.NumLabelsWritten = init.SessionNumLabelsWritten()
status.LastOpts = mgr.LastOpts()
status.LastError = mgr.LastError()

return status
}

// StatusChan returns a channel with status updates of the setup current or the upcoming session.
func (mgr *PostSetupManager) StatusChan() <-chan *atypes.PostSetupStatus {
// Wait for session to start because only then the initializer instance
// used for retrieving the progress updates is already set.
mgr.mu.Lock()
startedChan := mgr.startedChan
mgr.mu.Unlock()

<-startedChan

statusChan := make(chan *atypes.PostSetupStatus, 1024)
go func() {
defer close(statusChan)

initialStatus := mgr.Status()
statusChan <- initialStatus

mgr.mu.Lock()
init := mgr.init
mgr.mu.Unlock()
defer mgr.mu.Unlock()

ch := init.SessionNumLabelsWrittenChan()
for numLabelsWritten := range ch {
status := *initialStatus
status.NumLabelsWritten = numLabelsWritten
statusChan <- &status
}
status := mgr.state

if finalStatus := mgr.Status(); finalStatus.LastError != nil {
statusChan <- finalStatus
if status == atypes.PostSetupStateNotStarted {
return &atypes.PostSetupStatus{
State: status,
}
}()
}

return statusChan
return &atypes.PostSetupStatus{
State: mgr.state,
NumLabelsWritten: mgr.init.SessionNumLabelsWritten(),
LastOpts: mgr.lastOpts,
LastError: mgr.lastErr,
}
}

// ComputeProviders returns a list of available compute providers for Post setup.
Expand Down Expand Up @@ -189,32 +150,27 @@ func (mgr *PostSetupManager) Benchmark(p atypes.PostSetupComputeProvider) (int,
// It supports resuming a previously started session, as well as changing the Post setup options (e.g., number of units)
// after initial setup.
func (mgr *PostSetupManager) StartSession(opts atypes.PostSetupOpts, commitmentAtx types.ATXID) (chan struct{}, error) {
state := mgr.getState()
mgr.mu.Lock()

if state == atypes.PostSetupStateInProgress {
if mgr.state == atypes.PostSetupStateInProgress {
mgr.mu.Unlock()
return nil, fmt.Errorf("post setup session in progress")
}
if state == atypes.PostSetupStateComplete {
if mgr.state == atypes.PostSetupStateComplete {
// Check whether the new request invalidates the current status.
lastOpts := mgr.LastOpts()
lastOpts := mgr.lastOpts
invalidate := opts.DataDir != lastOpts.DataDir || opts.NumUnits != lastOpts.NumUnits
if !invalidate {
// Already complete.
mgr.mu.Unlock()
return mgr.doneChan, nil
}

mgr.mu.Lock()
mgr.initCompletedChan = make(chan struct{})
mgr.mu.Unlock()
}

mgr.mu.Lock()
mgr.state = atypes.PostSetupStateInProgress
mgr.mu.Unlock()

if opts.ComputeProviderID == config.BestProviderID {
p, err := mgr.BestProvider()
if err != nil {
mgr.mu.Unlock()
return nil, err
}

Expand All @@ -223,29 +179,35 @@ func (mgr *PostSetupManager) StartSession(opts atypes.PostSetupOpts, commitmentA
}

commitment := GetCommitmentBytes(mgr.id, commitmentAtx)
newInit, err := initialization.NewInitializer(config.Config(mgr.cfg), config.InitOpts(opts), commitment)
newInit, err := initialization.NewInitializer(
initialization.WithCommitment(commitment),
initialization.WithConfig(config.Config(mgr.cfg)),
initialization.WithInitOpts(config.InitOpts(opts)),
initialization.WithLogger(mgr.logger),
)
if err != nil {
mgr.mu.Lock()
mgr.state = atypes.PostSetupStateError
mgr.lastErr = err
mgr.mu.Unlock()
return nil, fmt.Errorf("new initializer: %w", err)
}

newInit.SetLogger(mgr.logger)

mgr.mu.Lock()
mgr.state = atypes.PostSetupStateInProgress
mgr.init = newInit
mgr.lastOpts = &opts
mgr.lastErr = nil
close(mgr.startedChan)
// TODO(mafa): the context used here should be passed in as argument to StartSession
// and instead of having a StopSession method the caller should just cancel the context when they want to stop the session.
ctx, cancel := context.WithCancel(context.TODO())
mgr.cancel = cancel
mgr.doneChan = make(chan struct{})
mgr.mu.Unlock()

go func() {
defer func() {
mgr.mu.Lock()
mgr.startedChan = make(chan struct{})
mgr.cancel()
mgr.cancel = nil
close(mgr.doneChan)
mgr.mu.Unlock()
}()
Expand All @@ -258,12 +220,12 @@ func (mgr *PostSetupManager) StartSession(opts atypes.PostSetupOpts, commitmentA
log.String("provider", fmt.Sprintf("%d", opts.ComputeProviderID)),
)

if err := newInit.Initialize(); err != nil {
if err := newInit.Initialize(ctx); err != nil {
mgr.mu.Lock()
defer mgr.mu.Unlock()

if errors.Is(err, initialization.ErrStopped) {
mgr.logger.Info("post setup session stopped")
if errors.Is(err, context.Canceled) {
mgr.logger.Info("post setup session was stopped")
mgr.state = atypes.PostSetupStateNotStarted
} else {
mgr.state = atypes.PostSetupStateError
Expand All @@ -281,7 +243,6 @@ func (mgr *PostSetupManager) StartSession(opts atypes.PostSetupOpts, commitmentA

mgr.mu.Lock()
mgr.state = atypes.PostSetupStateComplete
close(mgr.initCompletedChan)
mgr.mu.Unlock()
}()

Expand All @@ -298,9 +259,7 @@ func (mgr *PostSetupManager) StopSession(deleteFiles bool) error {
mgr.mu.Unlock()

if state == atypes.PostSetupStateInProgress {
if err := init.Stop(); err != nil {
return fmt.Errorf("stop: %w", err)
}
mgr.cancel()

// Block until the current data creation session will be finished.
<-doneChan
Expand All @@ -314,7 +273,6 @@ func (mgr *PostSetupManager) StopSession(deleteFiles bool) error {
mgr.mu.Lock()
// Reset internal state.
mgr.state = atypes.PostSetupStateNotStarted
mgr.initCompletedChan = make(chan struct{})
mgr.mu.Unlock()
}

Expand All @@ -323,14 +281,16 @@ func (mgr *PostSetupManager) StopSession(deleteFiles bool) error {

// GenerateProof generates a new Post.
func (mgr *PostSetupManager) GenerateProof(challenge []byte, commitmentAtx types.ATXID) (*types.Post, *types.PostMetadata, error) {
state := mgr.getState()
mgr.mu.Lock()

if state != atypes.PostSetupStateComplete {
if mgr.state != atypes.PostSetupStateComplete {
mgr.mu.Unlock()
return nil, nil, errNotComplete
}
mgr.mu.Unlock()

commitment := GetCommitmentBytes(mgr.id, commitmentAtx)
prover, err := proving.NewProver(config.Config(mgr.cfg), mgr.LastOpts().DataDir, commitment)
prover, err := proving.NewProver(config.Config(mgr.cfg), mgr.lastOpts.DataDir, commitment)
if err != nil {
return nil, nil, fmt.Errorf("new prover: %w", err)
}
Expand Down Expand Up @@ -374,13 +334,6 @@ func (mgr *PostSetupManager) Config() atypes.PostConfig {
return mgr.cfg
}

func (mgr *PostSetupManager) getState() atypes.PostSetupState {
mgr.mu.Lock()
defer mgr.mu.Unlock()

return mgr.state
}

func GetCommitmentBytes(id types.NodeID, commitmentAtx types.ATXID) []byte {
h := hash.Sum(append(id.ToBytes(), commitmentAtx.Bytes()...))
return h[:]
Expand Down
Loading

0 comments on commit 69556b3

Please sign in to comment.