Skip to content

Commit

Permalink
Merge pull request #48 from warp-contracts/janekolszak/forwarder
Browse files Browse the repository at this point in the history
Forwarder
  • Loading branch information
janekolszak authored Jun 6, 2023
2 parents 442e0f6 + 18d923b commit ae045fd
Show file tree
Hide file tree
Showing 17 changed files with 668 additions and 24 deletions.
224 changes: 224 additions & 0 deletions src/forward/arweave-fetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
package forward

import (
"github.com/warp-contracts/syncer/src/utils/config"
"github.com/warp-contracts/syncer/src/utils/model"
"github.com/warp-contracts/syncer/src/utils/monitoring"
"github.com/warp-contracts/syncer/src/utils/task"
"github.com/warp-contracts/syncer/src/utils/tool"
"gorm.io/gorm"
)

// Gets L1 (Arweave) interactions from the DB in batches
// Fills in last_sort_key for each interaction before emiting to the output channel
type ArweaveFetcher struct {
*task.Task
db *gorm.DB

monitor monitoring.Monitor

input chan uint64
Output chan *Payload
}

func NewArweaveFetcher(config *config.Config) (self *ArweaveFetcher) {
self = new(ArweaveFetcher)

self.Output = make(chan *Payload)

self.Task = task.NewTask(config, "arweave-fetcher").
WithSubtaskFunc(self.run)

return
}

func (self *ArweaveFetcher) WithDB(db *gorm.DB) *ArweaveFetcher {
self.db = db
return self
}

func (self *ArweaveFetcher) WithMonitor(monitor monitoring.Monitor) *ArweaveFetcher {
self.monitor = monitor
return self
}

func (self *ArweaveFetcher) WithInputChannel(input chan uint64) *ArweaveFetcher {
self.input = input
return self
}

func (self *ArweaveFetcher) run() (err error) {
for height := range self.input {
// Cache Contract id -> interaction sort key
lastSortKeys := make(map[string]string)

isFirstBatch := true

// Fetch interactions in batches, offset is the batch number
for offset := 0; ; offset++ {
// Fetch interactions in batches
var interactions []*model.Interaction

err = self.db.WithContext(self.Ctx).
Transaction(func(tx *gorm.DB) (err error) {
// Get a batch of L1 interactions
err = self.db.Table(model.TableInteraction).
Where("block_height = ?", height).
Where("source=?", "arweave").
Limit(self.Config.Forwarder.FetcherBatchSize).
Offset(offset * self.Config.Forwarder.FetcherBatchSize).

// FIXME: -----------------> IS THIS ORDERING CORRECT? <-----------------
// This is the order DRE gets L1 interactions
Order("sort_key ASC").
Find(&interactions).
Error
if err != nil {
return
}
if len(interactions) == 0 {
return
}

// Update last_sort_key for each interaction in the database
// As a optimization lastSortKeys are cached in memory
lastSortKeys, err = self.updateLastSortKey(tx, interactions, height, lastSortKeys)
if err != nil {
return
}

// Update sync height
return self.updateSyncedHeight(tx, height)
})
if err != nil {
self.Log.WithError(err).Error("Failed to fetch interactions from DB")
return
}

if len(interactions) == 0 && offset != 0 {
// Edge case: num of interactions is a multiple of batch size
payload := &Payload{First: false, Last: true, Interaction: nil}
self.Output <- payload
} else {
isLastBatch := len(interactions) < self.Config.Forwarder.FetcherBatchSize
for i, interaction := range interactions {
payload := &Payload{
First: isFirstBatch && i == 0,
Last: isLastBatch && i == len(interactions)-1,
Interaction: interaction,
}

// NOTE: Quit only when the whole batch is processed
// That's why we're not waiting for closing of this task
self.Output <- payload
}
}

isFirstBatch = false
}
}
return
}

func (self *ArweaveFetcher) updateSyncedHeight(tx *gorm.DB, height uint64) (err error) {
var state model.State
err = tx.WithContext(self.Ctx).
Where("name = ?", model.SyncedComponentForwarder).
First(&state).
Error
if err != nil {
self.Log.WithError(err).Error("Failed to get state")
return err
}

if state.FinishedBlockHeight < height {
state.FinishedBlockHeight = height

err = tx.Model(&model.State{
Name: model.SyncedComponentForwarder,
}).
Updates(model.State{
FinishedBlockHeight: height,
}).
Error
if err != nil {
self.Log.WithError(err).Error("Failed to update stmonitorate after last block")
self.monitor.GetReport().Contractor.Errors.DbLastTransactionBlockHeight.Inc()
return err
}
}
return
}

func (self *ArweaveFetcher) updateLastSortKey(tx *gorm.DB, interactions []*model.Interaction, height uint64, lastSortKeys map[string]string) (out map[string]string, err error) {
// Get contract ids of fetched interactions
// Omit those that are already in the lastSortKeys map
newContractIds := self.getNewContractIds(interactions, lastSortKeys)

// Get last sort key for each contract
newLastSortKeys, err := self.getLastSortKeys(tx, newContractIds, height)
if err != nil {
return
}

// Merge new LSK into the existing map
out = tool.AppendMap(lastSortKeys, newLastSortKeys)

// Fill in last sort key for each interaction
for _, interaction := range interactions {
interaction.LastSortKey = out[interaction.ContractId]
out[interaction.ContractId] = interaction.SortKey
}

// Update last sort key for each contract
for _, interaction := range interactions {
err = tx.Model(interaction).
Update("last_sort_key", interaction.LastSortKey).
Error
if err != nil {
return
}
}
return
}

func (self *ArweaveFetcher) getNewContractIds(interactions []*model.Interaction, lastSortKeys map[string]string) (out []string) {
contractIds := make(map[string]struct{})
for _, interaction := range interactions {
_, ok := lastSortKeys[interaction.ContractId]
if ok {
// There's already a sort key for this contract id
continue
}
contractIds[interaction.ContractId] = struct{}{}
}

for contractId := range contractIds {
out = append(out, contractId)
}

return
}

func (self *ArweaveFetcher) getLastSortKeys(tx *gorm.DB, contractIds []string, height uint64) (out map[string]string, err error) {
out = make(map[string]string)

// TODO: Receive a dedicated structure, not Interaction
var interactions []*model.Interaction
err = tx.Table(model.TableInteraction).
Select("contract_id, MAX(sort_key) AS sort_key").
Where("contract_id IN ?", contractIds).
Where("block_height < ?", height).
Group("contract_id").
Find(&interactions).
Error
if err != nil {
return
}

for _, interaction := range interactions {
out[interaction.ContractId] = interaction.SortKey
}

return
}
42 changes: 42 additions & 0 deletions src/forward/controller.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package forward

import (
"fmt"

"github.com/warp-contracts/syncer/src/utils/config"
"github.com/warp-contracts/syncer/src/utils/model"
"github.com/warp-contracts/syncer/src/utils/monitoring"
monitor_forwarder "github.com/warp-contracts/syncer/src/utils/monitoring/forwarder"
"github.com/warp-contracts/syncer/src/utils/publisher"
"github.com/warp-contracts/syncer/src/utils/task"
)

Expand Down Expand Up @@ -32,10 +35,49 @@ func NewController(config *config.Config) (self *Controller, err error) {
WithMonitor(monitor).
WithInitStartHeight(db)

// Fetches L1 interactions from the DB every time the block height changes
fetcher := NewArweaveFetcher(config).
WithDB(db).
WithMonitor(monitor).
WithInputChannel(sequencer.Output)

// Gets L2 interactions (just the needed fields) through Postgres notifications, parses and passes further
interactionStreamer := NewInteractionStreamer(config).
WithMonitor(monitor)

// Joins L1 and L2 interactions.
// L1 interactions take over the output chanel
joiner := task.NewJoiner[*Payload](config, "l1-l2-joiner").
WithInputChannel(fetcher.Output).
WithInputChannel(interactionStreamer.Output)

// Publish to all redis instances
redisMapper := redisMapper(config).
WithInputChannel(joiner.Output)

redisDuplicator := task.NewDuplicator[*model.InteractionNotification](config, "redis-duplicator").
WithOutputChannels(len(config.Redis), 0).
WithInputChannel(redisMapper.Output)

redisPublishers := make([]*task.Task, 0, len(config.Redis))
for i := range config.Redis {
redisPublisher := publisher.NewRedisPublisher[*model.InteractionNotification](config, config.Redis[i], fmt.Sprintf("interaction-redis-publisher-%d", i)).
WithChannelName(config.Forwarder.PublisherRedisChannelName).
WithMonitor(monitor).
WithInputChannel(redisDuplicator.NextChannel())
redisPublishers = append(redisPublishers, redisPublisher.Task)
}

// Setup everything, will start upon calling Controller.Start()
self.Task.
WithSubtask(sequencer.Task).
WithSubtask(monitor.Task).
WithSubtask(joiner.Task).
WithSubtask(fetcher.Task).
WithSubtask(interactionStreamer.Task).
WithSubtaskSlice(redisPublishers).
WithSubtask(redisDuplicator.Task).
WithSubtask(redisMapper.Task).
WithSubtask(server.Task)
return
}
79 changes: 79 additions & 0 deletions src/forward/interaction-streamer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package forward

import (
"encoding/json"
"errors"

"github.com/warp-contracts/syncer/src/utils/config"
"github.com/warp-contracts/syncer/src/utils/model"
"github.com/warp-contracts/syncer/src/utils/monitoring"
"github.com/warp-contracts/syncer/src/utils/streamer"
"github.com/warp-contracts/syncer/src/utils/task"
)

// Produces current syncer's height
type InteractionStreamer struct {
*task.Task

streamer *streamer.Streamer
monitor monitoring.Monitor

// Current Syncer's block height
Output chan *Payload
}

func NewInteractionStreamer(config *config.Config) (self *InteractionStreamer) {
self = new(InteractionStreamer)

self.Output = make(chan *Payload)

self.streamer = streamer.NewStreamer(config, "interaction-stream").
WithNotificationChannelName("interactions").
WithCapacity(10)

self.Task = task.NewTask(config, "interaction").
// Live source of interactions
WithSubtask(self.streamer.Task).
// Parse and pass the interaction
WithSubtaskFunc(self.run)

return
}

func (self *InteractionStreamer) WithMonitor(monitor monitoring.Monitor) *InteractionStreamer {
self.monitor = monitor
return self
}

func (self *InteractionStreamer) run() (err error) {
for {
select {
case <-self.Ctx.Done():
self.Log.Debug("Stop passing sync state")
return nil
case msg, ok := <-self.streamer.Output:
if !ok {
self.Log.Error("Streamer closed, can't receive sequencer's state changes!")
return nil
}

var interaction model.Interaction
err = json.Unmarshal([]byte(msg), &interaction)
if err != nil {
self.Log.WithError(err).Error("Failed to unmarshal sequencer sync state")
return
}

payload := &Payload{
Interaction: &interaction,
}

// Pass the interaction to the output channel
select {
case <-self.Ctx.Done():
return errors.New("InteractionStreamer stopped")
case self.Output <- payload:
}
}
}
}
17 changes: 17 additions & 0 deletions src/forward/payload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package forward

import "github.com/warp-contracts/syncer/src/utils/model"

type Payload struct {
First bool
Last bool
Interaction *model.Interaction
}

func (self *Payload) IsFirst() bool {
return self.First
}

func (self *Payload) IsLast() bool {
return self.Last
}
Loading

0 comments on commit ae045fd

Please sign in to comment.