Skip to content

Commit

Permalink
uptimize status updates and serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
cryi committed Jul 28, 2024
1 parent 5ea1d49 commit e6b8a54
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 55 deletions.
51 changes: 27 additions & 24 deletions core/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,19 @@ import (
)

type peakStatus struct {
Id string `json:"id,omitempty"` // peak instance id
Modules map[string]any `json:"modules,omitempty"`
Nodes map[string]common.NodeStatus `json:"nodes,omitempty"`
Id string `json:"id,omitempty"` // peak instance id
Modules map[string]json.RawMessage `json:"modules,omitempty"`
Nodes map[string]json.RawMessage `json:"nodes,omitempty"`
marshaled []byte `json:"-"`

mtx sync.RWMutex
mtx sync.RWMutex `json:"-"`
}

func newPeakStatus() *peakStatus {
return &peakStatus{
Id: "",
Modules: make(map[string]any),
Nodes: make(map[string]common.NodeStatus),
Modules: make(map[string]json.RawMessage),
Nodes: make(map[string]json.RawMessage),
mtx: sync.RWMutex{},
}
}
Expand All @@ -29,36 +30,38 @@ func (s *peakStatus) SetId(id string) {
s.Id = id
}

func (s *peakStatus) updateMarshaled() {
s.marshaled, _ = json.Marshal(s)
}

func (s *peakStatus) UpdateModuleStatus(id string, status any) {
defer s.updateMarshaled()

s.mtx.Lock()
defer s.mtx.Unlock()

if s.Modules == nil {
s.Modules = make(map[string]any)
marshaled, err := json.Marshal(status)
if err != nil {
slog.Error("failed to marshal module status", "error", err.Error())
return
}
s.Modules[id] = status
s.Modules[id] = marshaled
}

func (s *peakStatus) UpdateNodeStatus(id string, status common.NodeStatus) {
defer s.updateMarshaled()

s.mtx.Lock()
defer s.mtx.Unlock()

if s.Nodes == nil {
s.Nodes = make(map[string]common.NodeStatus)
marshaled, err := json.Marshal(status)
if err != nil {
slog.Error("failed to marshal module status", "error", err.Error())
return
}
s.Nodes[id] = status
s.Nodes[id] = marshaled
}

func (s *peakStatus) ToJSONString() string {
s.mtx.RLock()
defer s.mtx.RUnlock()

resultBytes, err := json.Marshal(s)
if err != nil {
slog.Error("failed to marshal peak status", "error", err.Error())
return "{}"
}
return string(resultBytes)
func (s *peakStatus) String() string {
return string(s.marshaled)
}

type PeakStatusUpdateReportKind string
Expand Down
31 changes: 12 additions & 19 deletions core/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func registerStatusEndpoint(app *fiber.Group) {
return
}

fmt.Fprintf(w, "data: %v\n\n", status.ToJSONString())
fmt.Fprintf(w, "data: %v\n\n", status.String())
w.Flush()

defer unregisterClient()
Expand All @@ -137,46 +137,39 @@ func registerStatusEndpoint(app *fiber.Group) {
}

func notifyClients() {
serializedStatus := status.ToJSONString()
serializedStatus := status.String()

clients.Each(func(_ uuid.UUID, c *client) {
go c.Send(serializedStatus)
})
}

// TODO: optimize - diffing, module updates, etc
func runStatusUpdatesProcessing(moduleStatusChannel <-chan common.ModuleStatusUpdate) {
func runStatusUpdatesProcessing(statusChannel <-chan common.ModuleStatusUpdate) {
pendingUpdatesChannel := make(chan struct{}, 1)
defer close(pendingUpdatesChannel)
pendingUpdatesCounter := 0

for {
select {
case statusUpdate, ok := <-moduleStatusChannel:
case statusUpdate, ok := <-statusChannel:
if !ok {
return
}
if pendingUpdatesCounter > 10 {
notifyClients()
pendingUpdatesCounter = 0
}

module := statusUpdate.GetModule()
switch statusUpdate := statusUpdate.GetStatusUpdate().(type) {
case *common.NodeStatusUpdate:
status.UpdateNodeStatus(statusUpdate.Id, statusUpdate.Status)
default:
status.UpdateModuleStatus(module, statusUpdate.GetData())
}
pendingUpdatesCounter++
// try insert into pendingUpdatesChannel
select {
case pendingUpdatesChannel <- struct{}{}:
default:
}
case <-pendingUpdatesChannel:
if pendingUpdatesCounter > 0 {
notifyClients()
pendingUpdatesCounter = 0
}
notifyClients()
}
}
}
Expand All @@ -185,10 +178,10 @@ func Run(ctx context.Context, config *configuration.Runtime, app *fiber.Group) e
status.SetId(config.Id)
registerStatusEndpoint(app)

moduleStatusChannel := make(chan common.ModuleStatusUpdate, 100)
go runStatusUpdatesProcessing(moduleStatusChannel)
statusChannel := make(chan common.ModuleStatusUpdate, 100)
go runStatusUpdatesProcessing(statusChannel)

common.StartNodeStatusProviders(ctx, config.Nodes, createModuleStatusChannel("global", moduleStatusChannel))
common.StartNodeStatusProviders(ctx, config.Nodes, createModuleStatusChannel("global", statusChannel))
// modules
for id := range config.Modules {
switch id {
Expand All @@ -199,7 +192,7 @@ func Run(ctx context.Context, config *configuration.Runtime, app *fiber.Group) e
continue
}

err := tezbake.SetupModule(ctx, configuration, app, createModuleStatusChannel(id, moduleStatusChannel))
err := tezbake.SetupModule(ctx, configuration, app, createModuleStatusChannel(id, statusChannel))
if err != nil {
return err
}
Expand All @@ -210,7 +203,7 @@ func Run(ctx context.Context, config *configuration.Runtime, app *fiber.Group) e
continue
}

err := tezpay.SetupModule(ctx, configuration, app, createModuleStatusChannel(id, moduleStatusChannel))
err := tezpay.SetupModule(ctx, configuration, app, createModuleStatusChannel(id, statusChannel))
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion core/providers/tezbake/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func GetEmptyStatus() *Status {
return &Status{
Rights: RightsStatus{
Level: 0,
Rights: []*BlockRights{},
Rights: []BlockRights{},
},
Services: common.AplicationServicesStatus{
Applications: make(map[string]common.ApplicationServices),
Expand Down
14 changes: 7 additions & 7 deletions core/providers/tezbake/rights.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ type BlockRights struct {
}

type RightsStatus struct {
Level int64 `json:"level"`
Rights []*BlockRights `json:"rights"`
Level int64 `json:"level"`
Rights []BlockRights `json:"rights"`
}

func (s *RightsStatus) Clone() RightsStatus {
Expand Down Expand Up @@ -114,7 +114,7 @@ func getBlockRights(ctx context.Context, block int64) (rights, rights, error) {
return bakingRights, attestationRights, errors.Join(attestationRightsErr, bakingRightsErr)
}

func getBlockRightsFor(ctx context.Context, block int64, bakers []string) (*BlockRights, error) {
func getBlockRightsFor(ctx context.Context, block int64, bakers []string) (BlockRights, error) {
relevantBakingRights, relevantAttestationRights := initRights(bakers)

bakingRights, attestationRights, err := getBlockRights(ctx, block-1)
Expand Down Expand Up @@ -142,13 +142,13 @@ func getBlockRightsFor(ctx context.Context, block int64, bakers []string) (*Bloc
rights[baker] = []int{relevantBakingRights[baker], relevantAttestationRights[baker]}
}

return &BlockRights{
return BlockRights{
Level: block,
Rights: rights,
}, nil
}

func checkRealized(ctx context.Context, rights *BlockRights) (*BlockRights, error) {
func checkRealized(ctx context.Context, rights BlockRights) (BlockRights, error) {
if rights.RealizedChecked {
return rights, nil
}
Expand Down Expand Up @@ -230,7 +230,7 @@ func startRightsStatusProviders(ctx context.Context, bakers []string, blockWindo

status := RightsStatus{
Level: 0,
Rights: []*BlockRights{},
Rights: []BlockRights{},
}

for {
Expand All @@ -254,7 +254,7 @@ func startRightsStatusProviders(ctx context.Context, bakers []string, blockWindo
// get slice of levels to query
minLevel := max(0, block.Level-blockWindow/2)
maxLevel := block.Level + blockWindow/2
newRights := []*BlockRights{}
newRights := []BlockRights{}
lastCachedLevel := int64(0)
for _, right := range status.Rights {
if right.Level < minLevel || right.Level > maxLevel {
Expand Down
8 changes: 4 additions & 4 deletions core/providers/tezpay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ func SetupModule(ctx context.Context, configuration *configuration.TezpayModuleC
}

tezpayStatus := GetEmptyStatus()
tezbakeStatusChannel := make(chan common.StatusUpdate, 100)
tezpayStatusChannel := make(chan common.StatusUpdate, 100)

go func() {
for {
select {
case <-ctx.Done():
return
case statusUpdate := <-tezbakeStatusChannel:
case statusUpdate := <-tezpayStatusChannel:
switch statusUpdate := statusUpdate.(type) {
case *common.ServicesStatusUpdate:
application := statusUpdate.Application
Expand All @@ -77,8 +77,8 @@ func SetupModule(ctx context.Context, configuration *configuration.TezpayModuleC
}
}()

common.StartServiceStatusProviders(ctx, configuration.Applications, tezbakeStatusChannel)
startWalletStatusProviders(ctx, configuration.PayoutWallet, configuration.PayoutWalletPreferences, tezbakeStatusChannel)
common.StartServiceStatusProviders(ctx, configuration.Applications, tezpayStatusChannel)
startWalletStatusProviders(ctx, configuration.PayoutWallet, configuration.PayoutWalletPreferences, tezpayStatusChannel)

return nil
}

0 comments on commit e6b8a54

Please sign in to comment.