Skip to content

Commit

Permalink
TT-493 Tradelogs-backfill-main-program-server-api (#10)
Browse files Browse the repository at this point in the history
* (TT-493) Tradelogs-backfill-main-program-server-api

* Small change
  • Loading branch information
Haiss2 authored Apr 21, 2023
1 parent c82a734 commit 1d6525c
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 18 deletions.
30 changes: 27 additions & 3 deletions cmd/tradelogs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ import (
"os"

libapp "github.com/KyberNetwork/tradelogs/internal/app"
"github.com/KyberNetwork/tradelogs/internal/bigquery"
"github.com/KyberNetwork/tradelogs/internal/dbutil"
"github.com/KyberNetwork/tradelogs/internal/evmlistenerclient"
"github.com/KyberNetwork/tradelogs/internal/parser/kyberswap"
"github.com/KyberNetwork/tradelogs/internal/parser/paraswap"
"github.com/KyberNetwork/tradelogs/internal/parser/tokenlon"
"github.com/KyberNetwork/tradelogs/internal/parser/zxotc"
"github.com/KyberNetwork/tradelogs/internal/parser/zxrfq"
"github.com/KyberNetwork/tradelogs/internal/server"
backfill "github.com/KyberNetwork/tradelogs/internal/server/backfill"
tradelogs "github.com/KyberNetwork/tradelogs/internal/server/tradelogs"
"github.com/KyberNetwork/tradelogs/internal/storage"
"github.com/KyberNetwork/tradelogs/internal/worker"
"github.com/go-redis/redis/v8"
Expand All @@ -31,6 +33,7 @@ func main() {
app.Flags = append(app.Flags, libapp.RedisFlags()...)
app.Flags = append(app.Flags, libapp.EvmListenerFlags()...)
app.Flags = append(app.Flags, libapp.HTTPServerFlags()...)
app.Flags = append(app.Flags, libapp.BigqueryFlags()...)

if err := app.Run(os.Args); err != nil {
log.Panic(err)
Expand Down Expand Up @@ -76,9 +79,30 @@ func run(c *cli.Context) error {
return err
}

http := server.New(l, s, c.String(libapp.HTTPServerFlag.Name))
backfillWorker, err := bigquery.NewWorker(
libapp.BigqueryProjectIDFFromCli(c),
s,
kyberswap.MustNewParser(),
zxotc.MustNewParser(),
zxrfq.MustNewParser(),
tokenlon.MustNewParser(),
paraswap.MustNewParser(),
)
if err != nil {
l.Errorw("Error while init backfillWorker")
return err
}

httpBackfill := backfill.New(c.String(libapp.HTTPBackfillServerFlag.Name), backfillWorker)
go func() {
if err := httpBackfill.Run(); err != nil {
panic(err)
}
}()

httpTradelogs := tradelogs.New(l, s, c.String(libapp.HTTPServerFlag.Name))
go func() {
if err := http.Run(); err != nil {
if err := httpTradelogs.Run(); err != nil {
panic(err)
}
}()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/stretchr/testify v1.8.1
github.com/urfave/cli v1.22.5
go.uber.org/zap v1.24.0
google.golang.org/api v0.114.0
)

require (
Expand Down Expand Up @@ -61,7 +62,6 @@ require (
golang.org/x/text v0.8.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.114.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230330154414-c0448cd141ea // indirect
google.golang.org/grpc v1.54.0 // indirect
Expand Down
8 changes: 8 additions & 0 deletions internal/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,16 @@ var HTTPServerFlag = cli.StringFlag{
Value: "localhost:8080",
}

var HTTPBackfillServerFlag = cli.StringFlag{
Name: "backfill-server-address",
Usage: "Run the rest for backfill server",
EnvVar: "BACKFILL_SERVER_ADDRESS",
Value: "localhost:8081",
}

func HTTPServerFlags() []cli.Flag {
return []cli.Flag{
HTTPServerFlag,
HTTPBackfillServerFlag,
}
}
19 changes: 12 additions & 7 deletions internal/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (w *Worker) queryDateData(
AND block_timestamp >= TIMESTAMP_SECONDS(@minTime)
AND block_timestamp <= TIMESTAMP_SECONDS(@maxTime)
AND ARRAY_LENGTH(topics) > 0
AND topics[OFFSET(0)] IN @topics
AND topics[OFFSET(0)] IN UNNEST(@topics)
ORDER BY
block_timestamp DESC
LIMIT @limit
Expand Down Expand Up @@ -157,6 +157,11 @@ func (w *Worker) parseLog(row BQLog) (storage.TradeLog, error) {
return ps.Parse(ehtLog, uint64(row.BlockTimestamp.Unix()))
}

func endOfDay(t time.Time) time.Time {
y, m, d := t.Date()
return time.Date(y, m, d, 23, 59, 59, 1e9-1, t.Location())
}

func (w *Worker) run(minTime, maxTime time.Time) {
l := w.l.With("minTime", minTime, "maxTime", maxTime)
l.Info("Start running worker")
Expand All @@ -166,7 +171,7 @@ func (w *Worker) run(minTime, maxTime time.Time) {
w.state = stateStopped
}()

temp := maxTime
temp := endOfDay(maxTime)
offset := int64(0)
minTs, maxTs := minTime.Unix(), maxTime.Unix()
for temp.After(minTime) {
Expand All @@ -187,7 +192,7 @@ func (w *Worker) run(minTime, maxTime time.Time) {
"err", err, "temp", temp, "offset", offset)
return
}
l.Infow("Successfully get logsFromRowIterator", "temp", temp, "count", count, "tradelogs", tradelogs)
l.Infow("Successfully get logsFromRowIterator", "temp", temp, "count", count)

err = w.storage.Insert(tradelogs)
if err != nil {
Expand All @@ -212,7 +217,7 @@ func (w *Worker) BackFillAllData() error {
return ErrWorkerRunning
}

go w.run(minBlockTime, time.Now())
go w.run(minBlockTime, time.Now().UTC())
return nil
}

Expand All @@ -223,14 +228,14 @@ func (w *Worker) BackFillPartialData(fromTime, toTime int64) error {
}

// minTime = max(fromTime, minBlockTime)
minTime := time.Unix(fromTime, 0)
minTime := time.Unix(fromTime, 0).UTC()
if minTime.Before(minBlockTime) {
minTime = minBlockTime
}

// maxTime = min(toTime, now)
now := time.Now()
maxTime := time.Unix(toTime, 0)
now := time.Now().UTC()
maxTime := time.Unix(toTime, 0).UTC()
if maxTime.After(now) {
maxTime = now
}
Expand Down
23 changes: 16 additions & 7 deletions internal/bigquery/types.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,33 @@
package bigquery

import (
"strings"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)

type BQLog struct {
Index uint `bigquery:"log_index"`
Index int64 `bigquery:"log_index"`
TxHash string `bigquery:"transaction_hash"`
TxIndex uint `bigquery:"transaction_index"`
TxIndex int64 `bigquery:"transaction_index"`
Address string `bigquery:"address"`
Data string `bigquery:"data"`
Topics []string `bigquery:"topics"`
BlockTimestamp time.Time `bigquery:"block_timestamp"`
BlockNumber uint64 `bigquery:"block_number"`
BlockNumber int64 `bigquery:"block_number"`
BlockHash string `bigquery:"block_hash"`
}

func hex2Bytes(raw string) []byte {
if strings.HasPrefix(raw, "0x") {
return common.Hex2Bytes(raw[2:])
}

return common.Hex2Bytes(raw)
}

func (log BQLog) ToEthLog() types.Log {
topics := make([]common.Hash, len(log.Topics))
for id, topic := range log.Topics {
Expand All @@ -28,11 +37,11 @@ func (log BQLog) ToEthLog() types.Log {
return types.Log{
Address: common.HexToAddress(log.Address),
Topics: topics,
Data: common.Hex2Bytes(log.Data),
BlockNumber: log.BlockNumber,
Data: hex2Bytes(log.Data),
BlockNumber: uint64(log.BlockNumber),
TxHash: common.HexToHash(log.TxHash),
TxIndex: log.TxIndex,
TxIndex: uint(log.TxIndex),
BlockHash: common.HexToHash(log.BlockHash),
Index: log.Index,
Index: uint(log.Index),
}
}
90 changes: 90 additions & 0 deletions internal/server/backfill/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package server

import (
"fmt"
"net/http"
"time"

"github.com/KyberNetwork/tradelogs/internal/bigquery"
"github.com/gin-contrib/pprof"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
)

// Server to serve the service.
type Server struct {
l *zap.SugaredLogger
r *gin.Engine
bq *bigquery.Worker
bindAddr string
}

// New returns a new server.
func New(bindAddr string, bq *bigquery.Worker) *Server {
engine := gin.New()
engine.Use(gin.Recovery())

server := &Server{
l: zap.S(),
r: engine,
bq: bq,
bindAddr: bindAddr,
}

gin.SetMode(gin.ReleaseMode)
server.register()

return server
}

// Run runs server.
func (s *Server) Run() error {
if err := s.r.Run(s.bindAddr); err != nil {
return fmt.Errorf("run server: %w", err)
}

return nil
}

func (s *Server) register() {
pprof.Register(s.r, "/debug")
s.r.POST("/backfill", s.backfill)

}

func responseErr(c *gin.Context, err error) {
c.JSON(http.StatusBadRequest, gin.H{
"success": false,
"error": err.Error(),
})
}

func (s *Server) backfill(c *gin.Context) {
var (
query struct {
FromTime int64 `form:"from_time" json:"from_time,omitempty"` // milliseconds
ToTime int64 `form:"to_time" json:"to_time,omitempty"` // milliseconds
}
err = c.ShouldBind(&query)
)
if err != nil {
responseErr(c, err)
return
}

s.l.Infow("Request backfill", "query", query)
if query.FromTime == 0 || query.ToTime == 0 {
err = s.bq.BackFillAllData()
} else {
err = s.bq.BackFillPartialData(query.FromTime/1000, query.ToTime/1000)
}
if err != nil {
responseErr(c, err)
return
}

c.JSON(http.StatusOK, gin.H{
"success": true,
"time": time.Now().UnixMilli(),
})
}
File renamed without changes.

0 comments on commit 1d6525c

Please sign in to comment.