diff --git a/event_fetcher.go b/event_fetcher.go index 09d21c6..58c6593 100644 --- a/event_fetcher.go +++ b/event_fetcher.go @@ -13,6 +13,25 @@ import ( // A function to customize the transaction builder type OverflowEventFetcherOption func(*OverflowEventFetcherBuilder) +type ProgressReaderWriter interface { + /// can return 0 if we do not have any progress thus far + ReadProgress() (int64, error) + WriteProgress(progress int64) error +} + +type InMemoryProgressKeeper struct { + Progress int64 +} + +func (self *InMemoryProgressKeeper) ReadProgress() (int64, error) { + return self.Progress, nil +} + +func (self *InMemoryProgressKeeper) WriteProgress(progress int64) error { + self.Progress = progress + return nil +} + // OverflowEventFetcherBuilder builder to hold info about eventhook context. type OverflowEventFetcherBuilder struct { OverflowState *OverflowState @@ -21,6 +40,7 @@ type OverflowEventFetcherBuilder struct { EndAtCurrentHeight bool EndIndex uint64 ProgressFile string + ProgressRW ProgressReaderWriter NumberOfWorkers int EventBatchSize uint64 ReturnWriterFunction bool @@ -96,6 +116,13 @@ func (o *OverflowState) FetchEventsWithResult(opts ...OverflowEventFetcherOption } e.FromIndex = oldHeight } + } else if e.ProgressRW != nil { + oldHeight, err := e.ProgressRW.ReadProgress() + if err != nil { + res.Error = fmt.Errorf("could not parse progress file as block height %v", err) + return res + } + e.FromIndex = oldHeight } endIndex := e.EndIndex @@ -148,10 +175,25 @@ func (o *OverflowState) FetchEventsWithResult(opts ...OverflowEventFetcherOption } } - progressWriter := func() error { - return writeProgressToFile(e.ProgressFile, endIndex+1) - } if e.ProgressFile != "" { + + progressWriter := func() error { + return writeProgressToFile(e.ProgressFile, int64(endIndex+1)) + } + if e.ReturnWriterFunction { + res.ProgressWriteFunction = progressWriter + } else { + err := progressWriter() + if err != nil { + res.Error = fmt.Errorf("could not write progress to file %v", err) + return res + } + } + } else if e.ProgressRW != nil { + progressWriter := func() error { + return e.ProgressRW.WriteProgress(int64(endIndex + 1)) + } + if e.ReturnWriterFunction { res.ProgressWriteFunction = progressWriter } else { @@ -263,6 +305,16 @@ func WithTrackProgressIn(fileName string) OverflowEventFetcherOption { } } +// track what block we have read since last run in a file +func WithTrackProgress(progressReaderWriter ProgressReaderWriter) OverflowEventFetcherOption { + return func(e *OverflowEventFetcherBuilder) { + e.ProgressRW = progressReaderWriter + e.EndIndex = 0 + e.FromIndex = 0 + e.EndAtCurrentHeight = true + } +} + // track what block we have read since last run in a file func WithReturnProgressWriter() OverflowEventFetcherOption { return func(e *OverflowEventFetcherBuilder) { diff --git a/event_fetcher_integration_test.go b/event_fetcher_integration_test.go index de883dd..afe1a1e 100644 --- a/event_fetcher_integration_test.go +++ b/event_fetcher_integration_test.go @@ -116,6 +116,32 @@ func TestIntegrationEventFetcher(t *testing.T) { assert.Equal(t, float64(10), marshalTo.BlockEventData.Amount) }) + t.Run("Fetch last write progress in memory that exists and marshal events", func(t *testing.T) { + + imr := &InMemoryProgressKeeper{Progress: 1} + + ev, err := startOverflowAndMintTokens(t).FetchEvents( + WithEvent("A.0ae53cb6e3f42a79.FlowToken.TokensMinted"), + WithTrackProgress(imr), + ) + assert.NoError(t, err) + assert.Equal(t, 3, len(ev)) + event := ev[0] + + graffleEvent := event.ToGraffleEvent() + + var eventMarshal map[string]interface{} + assert.NoError(t, event.MarshalAs(&eventMarshal)) + assert.NotEmpty(t, eventMarshal) + + autogold.Equal(t, graffleEvent.BlockEventData, autogold.Name("graffle-event")) + var marshalTo MarketEvent + assert.NoError(t, graffleEvent.MarshalAs(&marshalTo)) + assert.Equal(t, float64(10), marshalTo.BlockEventData.Amount) + + assert.Equal(t, int64(9), imr.Progress) + }) + t.Run("Return progress writer ", func(t *testing.T) { progressFile := "progress" diff --git a/utils.go b/utils.go index a802435..c5749fa 100644 --- a/utils.go +++ b/utils.go @@ -66,7 +66,7 @@ func exists(path string) (bool, error) { return true, err } -func writeProgressToFile(fileName string, blockHeight uint64) error { +func writeProgressToFile(fileName string, blockHeight int64) error { err := os.WriteFile(fileName, []byte(fmt.Sprintf("%d", blockHeight)), 0644)