Skip to content

Commit

Permalink
replaced conn pool with a more generic one; added more conf usages
Browse files Browse the repository at this point in the history
  • Loading branch information
0xluk committed Mar 4, 2024
1 parent 5247c79 commit 467c207
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 35 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ go 1.20

require (
github.com/ardanlabs/conf v1.5.0
github.com/buraksezer/connpool v0.6.0
github.com/cloudflare/circl v1.3.7
github.com/cockroachdb/pebble v1.1.0
github.com/google/go-cmp v0.6.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1
github.com/pkg/errors v0.9.1
github.com/qubic/go-node-connector v0.3.1
github.com/silenceper/pool v1.0.0
github.com/stretchr/testify v1.8.4
go.uber.org/zap v1.26.0
google.golang.org/grpc v1.61.1
Expand Down Expand Up @@ -40,6 +40,7 @@ require (
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
golang.org/x/net v0.20.0 // indirect
Expand Down
9 changes: 7 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/buraksezer/connpool v0.6.0 h1:NnTWkd3OH3BAn4qbeI+Ks1XDzU0DQRgOfF+SxsUMdtU=
github.com/buraksezer/connpool v0.6.0/go.mod h1:qPiG7gKXo+EjrwG/yqn2StZM4ek6gcYnnGgFIVKN6b0=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down Expand Up @@ -227,14 +225,19 @@ github.com/qubic/go-node-connector v0.3.1/go.mod h1:y0eMsGPY1DFEzz2JovUgEIwBZ/27
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/silenceper/pool v1.0.0 h1:JTCaA+U6hJAA0P8nCx+JfsRCHMwLTfatsm5QXelffmU=
github.com/silenceper/pool v1.0.0/go.mod h1:3DN13bqAbq86Lmzf6iUXWEPIWFPOSYVfaoceFvilKKI=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down Expand Up @@ -375,6 +378,7 @@ golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand Down Expand Up @@ -531,6 +535,7 @@ gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
95 changes: 64 additions & 31 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@ import (
"encoding/json"
"fmt"
"github.com/ardanlabs/conf"
"github.com/buraksezer/connpool"
"github.com/cockroachdb/pebble"
"github.com/pkg/errors"
"github.com/qubic/go-archiver/rpc"
"github.com/qubic/go-archiver/store"
"github.com/qubic/go-archiver/validator"
"github.com/silenceper/pool"
"io"
"log"
"math/rand"
"net"
"net/http"
"os"
"os/signal"
Expand All @@ -38,12 +37,16 @@ func run() error {
ReadTimeout time.Duration `conf:"default:5s"`
WriteTimeout time.Duration `conf:"default:5s"`
ShutdownTimeout time.Duration `conf:"default:5s"`
HttpHost string `conf:"0.0.0.0:8000"`
GrpcHost string `conf:"0.0.0.0:8001"`
}
Qubic struct {
NodeIp string `conf:"default:212.51.150.253"`
NodePort string `conf:"default:21841"`
FallbackTick uint64 `conf:"default:12543674"`
BatchSize uint64 `conf:"default:500"`
NodeIp string `conf:"default:212.51.150.253"`
NodePort string `conf:"default:21841"`
FallbackTick uint64 `conf:"default:12710000"`
BatchSize uint64 `conf:"default:500"`
NodeFetcherEndpoint string `conf:"default:http://127.0.0.1:8080/peers"`
StorageFolder string `conf:"default:store"`
}
}

Expand Down Expand Up @@ -73,24 +76,39 @@ func run() error {
}
log.Printf("main: Config :\n%v\n", out)

db, err := pebble.Open("store", &pebble.Options{})
db, err := pebble.Open(cfg.Qubic.StorageFolder, &pebble.Options{})
if err != nil {
log.Fatalf("err opening pebble: %s", err.Error())
}
defer db.Close()

ps := store.NewPebbleStore(db, nil)

rpcServer := rpc.NewServer("0.0.0.0:8001", "0.0.0.0:8000", ps, nil)
rpcServer := rpc.NewServer(cfg.Server.GrpcHost, cfg.Server.HttpHost, ps, nil)
rpcServer.Start()

shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)

ticker := time.NewTicker(5 * time.Second)

cf := &connectionFactory{nodeFetcherHost: "http://127.0.0.1:8080/peers"}
p, err := connpool.NewChannelPool(5, 30, cf.connect)
cf := &connectionFactory{nodeFetcherHost: cfg.Qubic.NodeFetcherEndpoint}
//p, err := connpool.NewChannelPool(5, 30, cf.connect)
//if err != nil {
// return errors.Wrap(err, "creating new connection pool")
//}

poolConfig := pool.Config{
InitialCap: 5,
MaxIdle: 20,
MaxCap: 30,
Factory: cf.connect,
Close: cf.close,
//Ping: ping,
//The maximum idle time of the connection, the connection exceeding this time will be closed, which can avoid the problem of automatic failure when connecting to EOF when idle
IdleTimeout: 15 * time.Second,
}
p, err := pool.NewChannelPool(&poolConfig)
if err != nil {
return errors.Wrap(err, "creating new connection pool")
}
Expand All @@ -102,43 +120,45 @@ func run() error {
case <-ticker.C:
err := do(p, cfg.Qubic.FallbackTick, cfg.Qubic.BatchSize, ps)
if err != nil {
log.Printf("do err: %s", err.Error())
log.Printf("do err: %s. pool len: %d\n", err.Error(), p.Len())
}
}
}
}

func do(pool connpool.Pool, fallbackTick, batchSize uint64, ps *store.PebbleStore) error {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
func do(pool pool.Pool, fallbackTick, batchSize uint64, ps *store.PebbleStore) error {
//ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
//defer cancel()

conn, err := pool.Get(ctx)
v, err := pool.Get()
if err != nil {
return errors.Wrap(err, "getting initial connection")
}

err = validateMultiple(conn, fallbackTick, batchSize, ps)
client := v.(*qubic.Client)
err = validateMultiple(client, fallbackTick, batchSize, ps)
if err != nil {
if pc, ok := conn.(*connpool.PoolConn); ok {
fmt.Printf("Marking conn: %s unusable\n", pc.Conn.RemoteAddr().String())
pc.MarkUnusable()
cErr := pool.Close(v)
if cErr != nil {
log.Printf("Closing conn failed: %s", cErr.Error())
}
return errors.Wrap(err, "validating multiple")
} else {
err = pool.Put(client)
if err != nil {
log.Printf("Putting conn back to pool failed: %s", err.Error())
}
}
log.Printf("Batch completed, continuing to next one")

log.Println("Batch completed")

return nil
}

func validateMultiple(conn net.Conn, fallbackStartTick uint64, batchSize uint64, ps *store.PebbleStore) error {
func validateMultiple(client *qubic.Client, fallbackStartTick uint64, batchSize uint64, ps *store.PebbleStore) error {
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
defer cancel()

client, err := qubic.NewClientWithConn(ctx, conn)
if err != nil {
return errors.Wrap(err, "creating qubic client")
}

val := validator.NewValidator(client, ps)
tickInfo, err := client.GetTickInfo(ctx)
if err != nil {
Expand Down Expand Up @@ -194,15 +214,26 @@ type connectionFactory struct {
nodeFetcherHost string
}

func (cf *connectionFactory) connect() (net.Conn, error) {
func (cf *connectionFactory) connect() (interface{}, error) {
peer, err := getNewRandomPeer(cf.nodeFetcherHost)
if err != nil {
return nil, errors.Wrap(err, "getting new random peer")
}
fmt.Printf("connecting to: %s\n", peer)
return net.DialTimeout("tcp", net.JoinHostPort(peer, "21841"), 5*time.Second)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

client, err := qubic.NewClient(ctx, peer, "21841")
if err != nil {
return nil, errors.Wrap(err, "creating qubic client")
}

fmt.Printf("connected to: %s\n", peer)
return client, nil
}

func (cf *connectionFactory) close(v interface{}) error { return v.(*qubic.Client).Close() }

type response struct {
Peers []string `json:"peers"`
Length int `json:"length"`
Expand All @@ -226,7 +257,9 @@ func getNewRandomPeer(host string) (string, error) {
return "", errors.Wrap(err, "unmarshalling response")
}

fmt.Printf("Got %d new peers\n", len(resp.Peers))
peer := resp.Peers[rand.Intn(len(resp.Peers))]

fmt.Printf("Got %d new peers. Selected random %s\n", len(resp.Peers), peer)

return resp.Peers[rand.Intn(len(resp.Peers))], nil
return peer, nil
}
2 changes: 1 addition & 1 deletion validator/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (v *Validator) ValidateTick(ctx context.Context, tickNumber uint64) error {
}

if len(quorumVotes) == 0 {
return errors.New("not quorum votes fetched")
return errors.New("no quorum votes fetched")
}

//getting computors from storage, otherwise get it from a node
Expand Down

0 comments on commit 467c207

Please sign in to comment.