Skip to content

Commit

Permalink
node: admin events (#4641)
Browse files Browse the repository at this point in the history
api: spacemeshos/api#247

launch standalone node
> ./build/go-spacemesh --preset=standalone --genesis-time=2023-07-04T9:05:00.000Z

watch events
> grpcurl -plaintext 0.0.0.0:10093 spacemesh.v1.AdminService.EventsStream

```
{
  "timestamp": "2023-07-04T08:55:00.355785009Z",
  "help": "Published proposal. Rewards will be received, once proposal is included into the block.",
  "proposal": {
    "layer": 150,
    "proposal": "i5iBp35b53G7OdTKsd9Y1tRc5HM="
  }
}
{
  "timestamp": "2023-07-04T08:55:01.025198892Z",
  "help": "Node started post execution for the challenge from poet.",
  "postStart": {
    "challenge": "mvy+3M6Ja2e8GWRsCP5Rqzuw+n8xIKtgnDGz296zKuU="
  }
}
```

admin events can be watched by subscribing to spacemesh.v1.AdminService.EventsStream. each event has 4 fields:
- timestamp
- failure. denotes that certain expected step has failed. we considered an alternative to provide a special failure events, but it would require maintaining a mapping on client side, and it is unclear if we need that. for now if any step has failure=true, user will be referred to logs or relevant documentation.
- help. this is initial approach to communicate what each events means to facilitate integration. later smapp can write better explanation and we will deprecate help.
- details. uses protobuf oneof, it enables smapp to reliably use those fields for custom representation and use this events in tests for better assertions. i considered encoding details in json, but it makes everything else worse but makes it a bit easier to add new events, which is probably not the right tradeoff
  • Loading branch information
dshulyak committed Jul 6, 2023
1 parent f8bd113 commit 97607df
Show file tree
Hide file tree
Showing 13 changed files with 340 additions and 20 deletions.
13 changes: 13 additions & 0 deletions activation/activation.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/p2p/pubsub"
"github.com/spacemeshos/go-spacemesh/signing"
Expand Down Expand Up @@ -274,10 +275,13 @@ func (b *Builder) generateInitialPost(ctx context.Context) error {
// Create the initial post and save it.
startTime := time.Now()
var err error
events.EmitPostStart(shared.ZeroChallenge)
b.initialPost, _, err = b.postSetupProvider.GenerateProof(ctx, shared.ZeroChallenge)
if err != nil {
events.EmitPostFailure()
return fmt.Errorf("post execution: %w", err)
}
events.EmitPostComplete(shared.ZeroChallenge)
metrics.PostDuration.Set(float64(time.Since(startTime).Nanoseconds()))

if err := savePost(b.nipostBuilder.DataDir(), b.initialPost); err != nil {
Expand Down Expand Up @@ -376,6 +380,9 @@ func (b *Builder) buildNIPostChallenge(ctx context.Context) (*types.NIPostChalle
log.Uint32("current epoch", current.Uint32()),
log.Duration("wait", wait),
)
if wait > 0 {
events.EmitPoetWaitRound(current, current+1, wait)
}
select {
case <-ctx.Done():
return nil, ctx.Err()
Expand Down Expand Up @@ -517,6 +524,12 @@ func (b *Builder) PublishActivationTx(ctx context.Context) error {

logger.Event().Info("atx published", log.Inline(atx), log.Int("size", size))

events.EmitAtxPublished(
atx.PublishEpoch, atx.TargetEpoch(),
atx.ID(),
time.Until(b.layerClock.LayerToTime(atx.TargetEpoch().FirstLayer())),
)

select {
case <-atxReceived:
logger.With().Info("received atx in db", atx.ID())
Expand Down
7 changes: 6 additions & 1 deletion activation/nipost.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/spacemeshos/go-spacemesh/activation/metrics"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/signing"
)
Expand Down Expand Up @@ -189,6 +190,8 @@ func (nb *NIPostBuilder) BuildNIPost(ctx context.Context, challenge *types.NIPos
}
getProofsCtx, cancel := context.WithDeadline(ctx, poetProofDeadline)
defer cancel()

events.EmitPoetWaitProof(challenge.PublishEpoch, challenge.TargetEpoch(), time.Until(poetRoundEnd))
poetProofRef, membership, err := nb.getBestProof(getProofsCtx, nb.state.Challenge)
if err != nil {
return nil, 0, &PoetSvcUnstableError{msg: "getBestProof failed", source: err}
Expand All @@ -206,11 +209,13 @@ func (nb *NIPostBuilder) BuildNIPost(ctx context.Context, challenge *types.NIPos
if nb.state.NIPost.Post == nil {
nb.log.With().Info("starting post execution", log.Binary("challenge", nb.state.PoetProofRef[:]))
startTime := time.Now()
events.EmitPostStart(nb.state.PoetProofRef[:])
proof, proofMetadata, err := nb.postSetupProvider.GenerateProof(ctx, nb.state.PoetProofRef[:])
if err != nil {
events.EmitPostFailure()
return nil, 0, fmt.Errorf("failed to generate Post: %v", err)
}

events.EmitPostComplete(nb.state.PoetProofRef[:])
postGenDuration = time.Since(startTime)
nb.log.With().Info("finished post execution", log.Duration("duration", postGenDuration))

Expand Down
4 changes: 3 additions & 1 deletion activation/post.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
Expand Down Expand Up @@ -293,8 +294,9 @@ func (mgr *PostSetupManager) StartSession(ctx context.Context) error {
log.String("labels_per_unit", fmt.Sprintf("%d", mgr.cfg.LabelsPerUnit)),
log.String("provider", fmt.Sprintf("%d", mgr.lastOpts.ProviderID)),
)

events.EmitInitStart(mgr.id, mgr.commitmentAtxId)
err = mgr.init.Initialize(ctx)
events.EmitInitComplete(err != nil)

mgr.mu.Lock()
defer mgr.mu.Unlock()
Expand Down
26 changes: 26 additions & 0 deletions api/grpcserver/admin_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/spacemeshos/go-spacemesh/checkpoint"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/sql"
)
Expand Down Expand Up @@ -95,3 +96,28 @@ func (a AdminService) Recover(_ context.Context, _ *pb.RecoverRequest) (*empty.E
a.logger.Panic("going to recover from checkpoint")
return &empty.Empty{}, nil
}

func (a AdminService) EventsStream(req *pb.EventStreamRequest, stream pb.AdminService_EventsStreamServer) error {
sub, err := events.Subscribe[events.UserEvent]()
if err != nil {
return status.Errorf(codes.FailedPrecondition, err.Error())
}
defer sub.Close()
// send empty header after subscribing to the channel.
// this is optional but allows subscriber to wait until stream is fully initialized.
if err := stream.SendHeader(metadata.MD{}); err != nil {
return status.Errorf(codes.Unavailable, "can't send header")
}
for {
select {
case <-stream.Context().Done():
return nil
case <-sub.Full():
return status.Errorf(codes.Canceled, "buffer is full")
case ev := <-sub.Out():
if err := stream.Send(ev.Event); err != nil {
return fmt.Errorf("send to stream: %w", err)
}
}
}
}
13 changes: 8 additions & 5 deletions config/presets/standalone.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"path/filepath"
"time"

postCfg "github.com/spacemeshos/post/config"
"github.com/spacemeshos/post/initialization"

"github.com/spacemeshos/go-spacemesh/common/types"
Expand All @@ -20,6 +21,7 @@ func standalone() config.Config {
conf := config.DefaultConfig()
conf.Address = types.DefaultTestAddressConfig()

conf.TIME.Peersync.Disable = true
conf.Standalone = true
conf.DataDirParent = filepath.Join(os.TempDir(), "spacemesh")
conf.FileLock = filepath.Join(conf.DataDirParent, "LOCK")
Expand Down Expand Up @@ -48,16 +50,17 @@ func standalone() config.Config {
conf.POST.K1 = 12
conf.POST.K2 = 4
conf.POST.K3 = 4
conf.POST.LabelsPerUnit = 128
conf.POST.MaxNumUnits = 4
conf.POST.MinNumUnits = 2
conf.POST.LabelsPerUnit = 64
conf.POST.MaxNumUnits = 2
conf.POST.MinNumUnits = 1

conf.SMESHING.CoinbaseAccount = types.GenerateAddress([]byte("1")).String()
conf.SMESHING.Start = true
conf.SMESHING.Opts.ProviderID = int(initialization.CPUProviderID())
conf.SMESHING.Opts.NumUnits = 2
conf.SMESHING.Opts.NumUnits = 1
conf.SMESHING.Opts.Throttle = true
conf.SMESHING.Opts.DataDir = conf.DataDirParent
conf.SMESHING.ProvingOpts.Flags = postCfg.RecommendedPowFlags()

conf.Beacon.Kappa = 40
conf.Beacon.Theta = big.NewRat(1, 4)
Expand All @@ -71,7 +74,7 @@ func standalone() config.Config {
conf.Beacon.VotesLimit = 100

conf.PoETServers = []string{"http://0.0.0.0:10010"}
conf.POET.GracePeriod = 10 * time.Second
conf.POET.GracePeriod = 5 * time.Second
conf.POET.CycleGap = 30 * time.Second
conf.POET.PhaseShift = 30 * time.Second

Expand Down
205 changes: 205 additions & 0 deletions events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
package events

import (
"time"

pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/log"
)

type UserEvent struct {
Event *pb.Event
}

func EmitBeacon(epoch types.EpochID, beacon types.Beacon) {
const help = "Node computed randomness beacon, it will be used to determine eligibility to participate in the consensus."
emitUserEvent(
help,
false,
&pb.Event_Beacon{
Beacon: &pb.EventBeacon{
Epoch: epoch.Uint32(),
Beacon: beacon[:],
},
},
)
}

func EmitInitStart(smesher types.NodeID, commitment types.ATXID) {
const help = "Node started post data initialization. Note that init is noop if node restarted when init was ready."
emitUserEvent(
help,
false,
&pb.Event_InitStart{
InitStart: &pb.EventInitStart{
Smesher: smesher[:],
Commitment: commitment[:],
},
},
)
}

func EmitInitComplete(failure bool) {
const help = "Node completed post data initialization."
emitUserEvent(
help,
failure,
&pb.Event_InitComplete{
InitComplete: &pb.EventInitComplete{},
},
)
}

func EmitPoetWaitRound(current, publish types.EpochID, wait time.Duration) {
const help = "Node needs to wait for poet registration window in current epoch to open. " +
"Once opened it will submit challenge and wait till poet round ends in publish epoch."
emitUserEvent(
help,
false,
&pb.Event_PoetWaitRound{PoetWaitRound: &pb.EventPoetWaitRound{
Current: current.Uint32(),
Publish: publish.Uint32(),
Wait: durationpb.New(wait),
}},
)
}

type EventPoetWaitEnd struct {
Publish types.EpochID `json:"publish"`
Target types.EpochID `json:"target"`
Wait time.Duration `json:"wait"`
}

func EmitPoetWaitProof(publish, target types.EpochID, wait time.Duration) {
const help = "Node needs to wait for poet to complete in publish epoch. " +
"Once completed, node fetches proof from poet and runs post on that proof. " +
"After that publish an ATX that will be eligible for rewards in target epoch."
emitUserEvent(
help,
false,
&pb.Event_PoetWaitProof{
PoetWaitProof: &pb.EventPoetWaitProof{
Publish: publish.Uint32(),
Target: target.Uint32(),
Wait: durationpb.New(wait),
},
},
)
}

func EmitPostStart(challenge []byte) {
const help = "Node started post execution for the challenge from poet."
emitUserEvent(
help,
false,
&pb.Event_PostStart{PostStart: &pb.EventPostStart{Challenge: challenge}},
)
}

func EmitPostComplete(challenge []byte) {
const help = "Node finished post execution for challenge."
emitUserEvent(
help,
false,
&pb.Event_PostComplete{PostComplete: &pb.EventPostComplete{Challenge: challenge}},
)
}

func EmitPostFailure() {
const help = "Node failed post execution."
emitUserEvent(
help,
true,
&pb.Event_PostComplete{PostComplete: &pb.EventPostComplete{}},
)
}

func EmitAtxPublished(
current, target types.EpochID,
id types.ATXID,
wait time.Duration,
) {
const help = "Published activation for the current epoch. " +
"Node needs to wait till the start of the target epoch in order to be eligible for rewards."
emitUserEvent(
help,
false,
&pb.Event_AtxPublished{
AtxPublished: &pb.EventAtxPubished{
Current: current.Uint32(),
Target: target.Uint32(),
Id: id[:],
Wait: durationpb.New(wait),
},
},
)
}

func EmitEligibilities(
epoch types.EpochID,
beacon types.Beacon,
atx types.ATXID,
activeSetSize uint32,
eligibilities map[types.LayerID][]types.VotingEligibility,
) {
const help = "Computed eligibilities for the epoch. " +
"Rewards will be received after publishing proposals at specified layers. " +
"Total amount of rewards in SMH will be based on other participants in the layer."
emitUserEvent(
help,
false,
&pb.Event_Eligibilities{
Eligibilities: &pb.EventEligibilities{
Epoch: epoch.Uint32(),
Beacon: beacon[:],
Atx: atx[:],
ActiveSetSize: activeSetSize,
Eligibilities: castEligibilities(eligibilities),
},
},
)
}

func castEligibilities(proofs map[types.LayerID][]types.VotingEligibility) []*pb.ProposalEligibility {
rst := make([]*pb.ProposalEligibility, 0, len(proofs))
for lid, eligs := range proofs {
rst = append(rst, &pb.ProposalEligibility{
Layer: lid.Uint32(),
Count: uint32(len(eligs)),
})
}
return rst
}

func EmitProposal(layer types.LayerID, proposal types.ProposalID) {
const help = "Published proposal. Rewards will be received, once proposal is included into the block."
emitUserEvent(
help,
false,
&pb.Event_Proposal{
Proposal: &pb.EventProposal{
Layer: layer.Uint32(),
Proposal: proposal[:],
},
},
)
}

func emitUserEvent(help string, failure bool, details pb.IsEventDetails) {
mu.RLock()
defer mu.RUnlock()
if reporter != nil {
if err := reporter.eventsEmitter.Emit(UserEvent{Event: &pb.Event{
Timestamp: timestamppb.New(time.Now()),
Help: help,
Failure: failure,
Details: details,
}}); err != nil {
log.With().Error("failed to emit event", log.Err(err))
}
}
}
Loading

0 comments on commit 97607df

Please sign in to comment.