Skip to content

Commit

Permalink
Replace asynchronous API with synchronous
Browse files Browse the repository at this point in the history
  • Loading branch information
fasmat committed Nov 7, 2022
1 parent 46082e8 commit 33ae1de
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 271 deletions.
37 changes: 0 additions & 37 deletions activation/post.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ func DefaultPostSetupOpts() atypes.PostSetupOpts {
// PostSetupProvider defines the functionality required for Post setup.
type PostSetupProvider interface {
Status() *atypes.PostSetupStatus
StatusChan() <-chan *atypes.PostSetupStatus
ComputeProviders() []atypes.PostSetupComputeProvider
Benchmark(p atypes.PostSetupComputeProvider) (int, error)
StartSession(opts atypes.PostSetupOpts, commitmentAtx types.ATXID) (chan struct{}, error)
Expand Down Expand Up @@ -110,42 +109,6 @@ func (mgr *PostSetupManager) Status() *atypes.PostSetupStatus {
return status
}

// StatusChan returns a channel with status updates of the setup current or the upcoming session.
func (mgr *PostSetupManager) StatusChan() <-chan *atypes.PostSetupStatus {
// Wait for session to start because only then the initializer instance
// used for retrieving the progress updates is already set.
mgr.mu.Lock()
startedChan := mgr.startedChan
mgr.mu.Unlock()

<-startedChan

statusChan := make(chan *atypes.PostSetupStatus, 1024)
go func() {
defer close(statusChan)

initialStatus := mgr.Status()
statusChan <- initialStatus

mgr.mu.Lock()
init := mgr.init
mgr.mu.Unlock()

ch := init.SessionNumLabelsWrittenChan()
for numLabelsWritten := range ch {
status := *initialStatus
status.NumLabelsWritten = numLabelsWritten
statusChan <- &status
}

if finalStatus := mgr.Status(); finalStatus.LastError != nil {
statusChan <- finalStatus
}
}()

return statusChan
}

// ComputeProviders returns a list of available compute providers for Post setup.
func (mgr *PostSetupManager) ComputeProviders() []atypes.PostSetupComputeProvider {
providers := initialization.Providers()
Expand Down
167 changes: 40 additions & 127 deletions activation/post_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package activation

import (
"sync"
"context"
"testing"
"time"

"github.com/spacemeshos/post/initialization"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

atypes "github.com/spacemeshos/go-spacemesh/activation/types"
"github.com/spacemeshos/go-spacemesh/common/types"
Expand Down Expand Up @@ -34,32 +35,42 @@ func TestPostSetupManager(t *testing.T) {
mgr, err := NewPostSetupManager(id, cfg, logtest.New(t), cdb, goldenATXID)
req.NoError(err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var eg errgroup.Group
lastStatus := &atypes.PostSetupStatus{}
go func() {
for status := range mgr.StatusChan() {
req.True(status.NumLabelsWritten >= lastStatus.NumLabelsWritten)
req.Equal(opts, *status.LastOpts)
req.Nil(status.LastError)

if status.NumLabelsWritten == uint64(opts.NumUnits)*cfg.LabelsPerUnit {
// TODO(moshababo): fix the following failure. `status.State` changes to `postSetupStateComplete` only after the channel event was triggered.
// req.Equal(postSetupStateComplete, status.State)
} else {
req.Equal(atypes.PostSetupStateInProgress, status.State)
}
eg.Go(func() error {
timer := time.NewTicker(50 * time.Millisecond)
defer timer.Stop()

lastStatus = status
for {
select {
case <-ctx.Done():
return nil
case <-timer.C:
status := mgr.Status()
req.True(status.NumLabelsWritten >= lastStatus.NumLabelsWritten)
req.Equal(opts, *status.LastOpts)
req.Nil(status.LastError)

if status.NumLabelsWritten < uint64(opts.NumUnits)*cfg.LabelsPerUnit {
req.Equal(atypes.PostSetupStateInProgress, status.State)
}
}
}
}()
})

// Create data.
doneChan, err := mgr.StartSession(opts, goldenATXID)
req.NoError(err)
<-doneChan
cancel()
eg.Wait()

req.Equal(opts, *mgr.LastOpts())
req.NoError(mgr.LastError())
// TODO(moshababo): fix the following failure. `status.State` changes to `postSetupStateComplete` only after the channel event was triggered.
// req.Equal(lastStatus, mgr.Status())
req.Equal(atypes.PostSetupStateComplete, mgr.Status().State)

// Create data (same opts).
doneChan, err = mgr.StartSession(opts, goldenATXID)
Expand All @@ -69,17 +80,15 @@ func TestPostSetupManager(t *testing.T) {
req.NoError(mgr.LastError())

// Cleanup.
err = mgr.StopSession(true)
req.NoError(err)
req.NoError(mgr.StopSession(true))

// Create data (same opts, after deletion).
doneChan, err = mgr.StartSession(opts, goldenATXID)
req.NoError(err)
<-doneChan
req.Equal(opts, *mgr.LastOpts())
req.NoError(mgr.LastError())
// TODO(moshababo): fix the following failure. `status.State` changes to `postSetupStateComplete` only after the channel event was triggered.
// req.Equal(lastStatus, mgr.Status())
req.Equal(atypes.PostSetupStateComplete, mgr.Status().State)
}

func TestPostSetupManager_InitialStatus(t *testing.T) {
Expand All @@ -101,10 +110,7 @@ func TestPostSetupManager_InitialStatus(t *testing.T) {
doneChan, err := mgr.StartSession(opts, goldenATXID)
req.NoError(err)
<-doneChan

// Compare the last status update to the status queried directly.
// TODO(moshababo): fix the following failure. `status.State` changes to `postSetupStateComplete` only after the channel event was triggered.
// req.Equal(lastStatus, mgr.Status())
req.Equal(atypes.PostSetupStateComplete, mgr.Status().State)

// Re-instantiate `PostSetupManager`.
mgr, err = NewPostSetupManager(id, cfg, logtest.New(t), cdb, goldenATXID)
Expand Down Expand Up @@ -149,94 +155,6 @@ func TestPostSetupManager_GenerateProof(t *testing.T) {
req.ErrorIs(err, errNotComplete)
}

func TestPostSetupManager_StatusChan_BeforeSessionStarted(t *testing.T) {
req := require.New(t)

cdb := newCachedDB(t)
cfg, opts := getTestConfig(t)
mgr, err := NewPostSetupManager(id, cfg, logtest.New(t), cdb, goldenATXID)
req.NoError(err)

// Verify that the status stream works properly when called *before* session started.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
ch := mgr.StatusChan()
var prevStatus *atypes.PostSetupStatus
for {
status, more := <-ch
if more {
if prevStatus == nil {
// Verify initial status.
req.Equal(uint64(0), status.NumLabelsWritten)
}
prevStatus = status
} else {
// Verify last status.
req.Equal(uint64(opts.NumUnits)*cfg.LabelsPerUnit, prevStatus.NumLabelsWritten)
break
}
}
}()

// Create data.
time.Sleep(1 * time.Second) // Short delay.
doneChan, err := mgr.StartSession(opts, goldenATXID)
req.NoError(err)
<-doneChan
wg.Wait()

// Cleanup.
err = mgr.StopSession(true)
req.NoError(err)
}

func TestPostSetupManager_StatusChan_AfterSessionStarted(t *testing.T) {
req := require.New(t)

cdb := newCachedDB(t)
cfg, opts := getTestConfig(t)
opts.NumUnits *= 10
mgr, err := NewPostSetupManager(id, cfg, logtest.New(t), cdb, goldenATXID)
req.NoError(err)

// Verify that the status stream works properly when called *after* session started (yet before it ended).
var wg sync.WaitGroup
wg.Add(1)
go func() {
time.Sleep(100 * time.Millisecond) // Short delay.

ch := mgr.StatusChan()
var prevStatus *atypes.PostSetupStatus
for {
status, more := <-ch
if more {
if prevStatus == nil {
// Verify initial status.
req.Equal(uint64(0), status.NumLabelsWritten)
}
prevStatus = status
} else {
// Verify last status.
req.Equal(uint64(opts.NumUnits)*cfg.LabelsPerUnit, prevStatus.NumLabelsWritten)
break
}
}
wg.Done()
}()

// Create data.
doneChan, err := mgr.StartSession(opts, goldenATXID)
req.NoError(err)
<-doneChan
wg.Wait()

// Cleanup.
err = mgr.StopSession(true)
req.NoError(err)
}

func TestPostSetupManager_Stop(t *testing.T) {
req := require.New(t)

Expand All @@ -248,40 +166,36 @@ func TestPostSetupManager_Stop(t *testing.T) {
// Verify state.
status := mgr.Status()
req.Equal(atypes.PostSetupStateNotStarted, status.State)
req.Zero(status.NumLabelsWritten)
req.Nil(status.LastOpts)

// Create data.
doneChan, err := mgr.StartSession(opts, goldenATXID)
req.NoError(err)
<-doneChan

// Verify state.
status = mgr.Status()
req.Equal(atypes.PostSetupStateComplete, status.State)
req.Equal(atypes.PostSetupStateComplete, mgr.Status().State)

// Stop without file deletion.
err = mgr.StopSession(false)
req.NoError(err)
req.NoError(mgr.StopSession(false))

// Verify state.
status = mgr.Status()
req.Equal(atypes.PostSetupStateComplete, status.State)
req.Equal(atypes.PostSetupStateComplete, mgr.Status().State)

// Stop with file deletion.
err = mgr.StopSession(true)
req.NoError(err)
req.NoError(mgr.StopSession(true))

// Verify state.
status = mgr.Status()
req.Equal(atypes.PostSetupStateNotStarted, status.State)
req.Equal(atypes.PostSetupStateNotStarted, mgr.Status().State)

// Create data again.
doneChan, err = mgr.StartSession(opts, goldenATXID)
req.NoError(err)
<-doneChan

// Verify state.
status = mgr.Status()
req.Equal(atypes.PostSetupStateComplete, status.State)
req.Equal(atypes.PostSetupStateComplete, mgr.Status().State)
}

func TestPostSetupManager_Stop_WhileInProgress(t *testing.T) {
Expand All @@ -306,8 +220,7 @@ func TestPostSetupManager_Stop_WhileInProgress(t *testing.T) {
req.Equal(atypes.PostSetupStateInProgress, status.State)

// Stop without files deletion.
err = mgr.StopSession(false)
req.NoError(err)
req.NoError(mgr.StopSession(false))

select {
case <-doneChan:
Expand Down
Loading

0 comments on commit 33ae1de

Please sign in to comment.