Skip to content

Commit

Permalink
add retry and ack for broadcast flow
Browse files Browse the repository at this point in the history
  • Loading branch information
linhnt3400 committed Jan 7, 2025
1 parent 8e66fce commit b0b514b
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 30 deletions.
55 changes: 55 additions & 0 deletions v2/internal/server/broadcast_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package server

import (
"log"
"net/http"
"os"
"testing"
"time"

"github.com/KyberNetwork/tradinglib/pkg/httpsign"
"github.com/gorilla/websocket"
)

func TestSubscribe(t *testing.T) {
wsURL := os.Getenv("URL") // Replace with actual WebSocket URL
key := os.Getenv("KEY")
secret := os.Getenv("SECRET")

request, err := http.NewRequest(http.MethodGet, wsURL+"?id=cscv9ubrk77vgbjftu5g", nil)

Check failure on line 19 in v2/internal/server/broadcast_test.go

View workflow job for this annotation

GitHub Actions / Run golangci-lint

ineffectual assignment to err (ineffassign)
_, err = httpsign.Sign(request, key, []byte(secret))
if err != nil {
t.Fatal(err)
}

// Establish a connection with the WebSocket server
conn, _, err := websocket.DefaultDialer.Dial(wsURL+"?id=cscv9ubrk77vgbjftu5g", request.Header)
if err != nil {
log.Fatal("Error connecting to WebSocket server:", err)
}
defer conn.Close()

go func() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {

Check failure on line 35 in v2/internal/server/broadcast_test.go

View workflow job for this annotation

GitHub Actions / Run golangci-lint

S1000: should use for range instead of for { select {} } (gosimple)
select {
case <-ticker.C:
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
log.Println("Ping error:", err)
return
}
}
}
}()

for {
// cscv9ubrk77vgbjftu5g
_, message, err := conn.ReadMessage()
if err != nil {
log.Println("Error reading message:", err)
return
}
log.Printf("Received: %s", message)
}
}
56 changes: 37 additions & 19 deletions v2/internal/worker/broadcast_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/IBM/sarama"
"github.com/KyberNetwork/tradelogs/pkg/storage"
"github.com/KyberNetwork/tradelogs/v2/pkg/kafka"
"github.com/KyberNetwork/tradelogs/v2/pkg/storage/tradelogs/types"
"github.com/gorilla/websocket"
"go.uber.org/zap"
)
Expand All @@ -28,34 +29,48 @@ func (c *Client) run(ctx context.Context, cfg *kafka.Config, topic string) error
}

tradeLogsChan := make(chan *sarama.ConsumerMessage, 100)
ackChan := make(chan bool)

go func() {
err = consumer.Consume(ctx, c.l, topic, tradeLogsChan)
err = consumer.Consume(ctx, c.l, topic, tradeLogsChan, ackChan)
if err != nil {
panic(fmt.Errorf("failed to consume trade logs: %w", err))
}
}()

go func() {
for msg := range tradeLogsChan {
c.broadcast(msg)
}
}()
retry := 3
delay := time.Second * 5
i := 0

return nil
for msg := range tradeLogsChan {
for i = 0; i < retry; i++ {
err = c.broadcast(msg)
if err != nil {
time.Sleep(delay)
continue
}
ackChan <- true
break
}
if i == retry {
ackChan <- false
break
}
}
return fmt.Errorf("broadcast trade logs to client %s failed", c.id)
}

func (c *Client) broadcast(msg *sarama.ConsumerMessage) {
func (c *Client) broadcast(msg *sarama.ConsumerMessage) error {
var newMsg kafka.Message
err := json.Unmarshal(msg.Value, &newMsg)
if err != nil {
c.l.Errorw("error when unmarshal message", "err", err, "msg", string(msg.Value))
return
return err
}
dataBytes, err := json.Marshal(newMsg.Data)
if err != nil {
c.l.Errorw("error when marshal message data", "err", err, "data", newMsg.Data)
return
return err
}

switch newMsg.Type {
Expand All @@ -64,32 +79,35 @@ func (c *Client) broadcast(msg *sarama.ConsumerMessage) {
err = json.Unmarshal(dataBytes, &blocks)
if err != nil {
c.l.Errorw("error when unmarshal reverted blocks", "err", err, "data", string(dataBytes))
return
return err
}
newMsg.Data = blocks
if err = c.ws.WriteJSON(newMsg); err != nil {
c.l.Errorw("error when send msg", "err", err)
return err
}
c.l.Infow("broadcast revert message", "message", newMsg)

case kafka.MessageTypeTradeLog:
var tradelog storage.TradeLog
err = json.Unmarshal(dataBytes, &tradelog)
var tradeLog types.TradeLog
err = json.Unmarshal(dataBytes, &tradeLog)
if err != nil {
c.l.Errorw("error when unmarshal trade log", "err", err, "data", string(dataBytes))
return
return err
}
newMsg.Data = tradelog
if c.match(tradelog) {
newMsg.Data = tradeLog
if c.match(tradeLog) {
if err = c.ws.WriteJSON(newMsg); err != nil {
c.l.Errorw("error when send msg", "err", err)
return err
}
c.l.Infow("broadcast trade log message", "message", newMsg)
}
c.l.Infow("broadcast trade log message", "message", newMsg)
}
return nil
}

func (c *Client) match(log storage.TradeLog) bool {
func (c *Client) match(log types.TradeLog) bool {
params := c.params
if len(params.EventHash) != 0 && !strings.EqualFold(params.EventHash, log.EventHash) {
return false
Expand Down
13 changes: 8 additions & 5 deletions v2/internal/worker/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,14 @@ func (b *Broadcaster) NewConn(req RegisterRequest, conn *websocket.Conn) error {

b.addClient(conn, req.ID, req)

err := b.clients[req.ID].run(ctx, b.config, b.topic)
if err != nil {
b.removeClient(cancel, conn, req.ID)
return fmt.Errorf("cannot run client: %w", err)
}
go func() {
err := b.clients[req.ID].run(ctx, b.config, b.topic)
if err != nil {
b.l.Errorw("error when run client", "id", req.ID, "err", err)
b.removeClient(cancel, conn, req.ID)
}
}()

return nil
}

Expand Down
19 changes: 13 additions & 6 deletions v2/pkg/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ func NewConsumer(config *Config, consumerGroup string) (*SaramaConsumer, error)
}, nil
}

func (c *SaramaConsumer) Consume(ctx context.Context, l *zap.SugaredLogger, topic string, ch chan<- *sarama.ConsumerMessage) error {
messageHandler := newConsumerGroupHandler(ch)
defer close(ch)
func (c *SaramaConsumer) Consume(ctx context.Context, l *zap.SugaredLogger, topic string, msgCh chan<- *sarama.ConsumerMessage, ackCh chan bool) error {
messageHandler := newConsumerGroupHandler(msgCh, ackCh)
defer close(msgCh)
for {
select {
case <-ctx.Done():
Expand All @@ -52,11 +52,13 @@ func (c *SaramaConsumer) Consume(ctx context.Context, l *zap.SugaredLogger, topi

type consumerGroupHandler struct {
msgChan chan<- *sarama.ConsumerMessage
ackChan chan bool
}

func newConsumerGroupHandler(msgChan chan<- *sarama.ConsumerMessage) *consumerGroupHandler {
func newConsumerGroupHandler(msgChan chan<- *sarama.ConsumerMessage, ackCh chan bool) *consumerGroupHandler {
return &consumerGroupHandler{
msgChan: msgChan,
ackChan: ackCh,
}
}

Expand All @@ -70,8 +72,13 @@ func (h *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
// Push the message to the channel
h.msgChan <- msg

// Acknowledge the message by marking it as processed (commit the offset)
session.MarkMessage(msg, "")
ack := <-h.ackChan
if ack {
// Acknowledge the message by marking it as processed (commit the offset)
session.MarkMessage(msg, "")
} else {
break // stop consume when fail to broadcast message
}
}
return nil
}

0 comments on commit b0b514b

Please sign in to comment.