-
Notifications
You must be signed in to change notification settings - Fork 3
/
pool.go
148 lines (121 loc) · 3.81 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package qubic
import (
"context"
"encoding/json"
"fmt"
"github.com/pkg/errors"
"github.com/qubic/go-node-connector/types"
"github.com/silenceper/pool"
"io"
"math/rand"
"net/http"
"time"
)
// PoolConfig holds the settings NewPoolConnection uses to build a pool of
// qubic client connections.
type PoolConfig struct {
// InitialCap is forwarded unchanged to pool.Config.InitialCap.
InitialCap int
// MaxCap is forwarded unchanged to pool.Config.MaxCap.
MaxCap int
// MaxIdle is forwarded unchanged to pool.Config.MaxIdle.
MaxIdle int
// IdleTimeout is forwarded to pool.Config.IdleTimeout: the maximum time a
// connection may stay idle before the pool closes it.
IdleTimeout time.Duration
// NodeFetcherUrl is the URL queried with an HTTP GET to obtain the list of
// reliable nodes a new connection can be opened against.
NodeFetcherUrl string
// NodeFetcherTimeout bounds the context used while creating one connection;
// that context covers both the node fetcher request and the NewClient call.
NodeFetcherTimeout time.Duration
// NodePort is the port passed to NewClient together with the selected peer
// address.
NodePort string
}
// NewPoolConnection builds a channel-backed connection pool whose connections
// are created and closed by a poolConnectionFactory configured from config.
//
// It returns the wrapped pool, or an error wrapped with "creating pool" when
// the underlying pool cannot be constructed.
func NewPoolConnection(config PoolConfig) (*Pool, error) {
	factory := newPoolConnectionFactory(config.NodeFetcherTimeout, config.NodeFetcherUrl, config.NodePort)

	chPool, err := pool.NewChannelPool(&pool.Config{
		InitialCap: config.InitialCap,
		MaxIdle:    config.MaxIdle,
		MaxCap:     config.MaxCap,
		Factory:    factory.Connect,
		Close:      factory.Close,
		// Maximum time a connection may sit idle; connections idle for longer
		// are closed, which avoids hitting EOF on a stale idle connection.
		IdleTimeout: config.IdleTimeout,
	})
	if err != nil {
		return nil, errors.Wrap(err, "creating pool")
	}

	return &Pool{chPool: chPool}, nil
}
// Pool is a typed wrapper around a generic pool.Pool: its methods accept and
// return *Client values instead of bare interface{} values.
type Pool struct {
// chPool is the underlying generic pool every method delegates to.
chPool pool.Pool
}
// Get fetches a connection from the underlying pool and returns it as a
// *Client. A failure of the underlying pool is returned wrapped with context.
func (p *Pool) Get() (*Client, error) {
	conn, err := p.chPool.Get()
	if err == nil {
		// The pool stores bare interface{} values; convert back to the
		// concrete client type.
		return conn.(*Client), nil
	}
	return nil, errors.Wrap(err, "getting qubic pooled client connection")
}
// Put hands c back to the underlying pool. It returns nil on success, or the
// pool's error wrapped with context.
func (p *Pool) Put(c *Client) error {
	// errors.Wrap returns nil when given a nil error, so the success path
	// needs no separate branch.
	return errors.Wrap(p.chPool.Put(c), "putting qubic pooled client connection")
}
// Close passes c to the underlying pool's Close and returns any resulting
// error wrapped with context, or nil on success.
//
// NOTE(review): the wrap message reads "closing qubic pool" although the call
// takes a single client; presumably only that connection is closed, so
// confirm against the pool library.
func (p *Pool) Close(c *Client) error {
	if closeErr := p.chPool.Close(c); closeErr != nil {
		return errors.Wrap(closeErr, "closing qubic pool")
	}
	return nil
}
// poolConnectionFactory supplies the Connect and Close callbacks that
// NewPoolConnection registers with the pool.
type poolConnectionFactory struct {
// nodeFetcherTimeout is the timeout of the context created in Connect.
nodeFetcherTimeout time.Duration
// nodeFetcherUrl is the URL requested to obtain the list of reliable nodes.
nodeFetcherUrl string
// nodePort is the port passed to NewClient along with the selected peer.
nodePort string
}
// newPoolConnectionFactory returns a factory holding the given node fetcher
// timeout, node fetcher URL and node port.
func newPoolConnectionFactory(nodeFetcherTimeout time.Duration, nodeFetcherUrl string, nodePort string) *poolConnectionFactory {
	factory := new(poolConnectionFactory)
	factory.nodeFetcherTimeout = nodeFetcherTimeout
	factory.nodeFetcherUrl = nodeFetcherUrl
	factory.nodePort = nodePort
	return factory
}
// Connect creates a new pooled connection: it picks a random peer via
// getNewRandomPeer and opens a client to it with NewClient on the configured
// node port. Both steps share one context limited by nodeFetcherTimeout.
//
// The new client is returned as interface{} so the method can serve as the
// pool's factory callback. Errors from either step are returned wrapped.
func (pcf *poolConnectionFactory) Connect() (interface{}, error) {
	ctx, cancel := context.WithTimeout(context.Background(), pcf.nodeFetcherTimeout)
	defer cancel()

	addr, peerErr := pcf.getNewRandomPeer(ctx)
	if peerErr != nil {
		return nil, errors.Wrap(peerErr, "getting new random peer")
	}

	conn, connErr := NewClient(ctx, addr, pcf.nodePort)
	if connErr != nil {
		return nil, errors.Wrap(connErr, "creating qubic client")
	}

	fmt.Printf("connected to: %s\n", addr)
	return conn, nil
}
// Close is the pool's close callback: it converts v back to a *Client and
// returns the result of that client's Close method.
func (pcf *poolConnectionFactory) Close(v interface{}) error {
	client := v.(*Client)
	return client.Close()
}
// statusResponse is the JSON document getNewRandomPeer decodes from the node
// fetcher's reply. getNewRandomPeer reads only ReliableNodes.
type statusResponse struct {
MaxTick uint32 `json:"max_tick"`
LastUpdate int64 `json:"last_update"`
// ReliableNodes is the list from which a random peer is selected.
ReliableNodes []nodeResponse `json:"reliable_nodes"`
MostReliableNode nodeResponse `json:"most_reliable_node"`
}
// nodeResponse is a single node entry inside statusResponse.
type nodeResponse struct {
// Address is the value getNewRandomPeer returns for the selected node.
Address string `json:"address"`
Peers types.PublicPeers `json:"peers"`
LastTick uint32 `json:"last_tick"`
LastUpdate int64 `json:"last_update"`
}
// getNewRandomPeer requests the node fetcher URL with an HTTP GET, decodes the
// JSON reply into a statusResponse and returns the address of one entry of
// ReliableNodes chosen at random.
//
// ctx bounds the HTTP request. An error is returned when the request cannot
// be built or executed, the body cannot be read, the server answers with a
// non-2xx status, the body is not valid JSON, or the list of reliable nodes
// is empty.
func (pcf *poolConnectionFactory) getNewRandomPeer(ctx context.Context) (string, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, pcf.nodeFetcherUrl, nil)
	if err != nil {
		return "", errors.Wrap(err, "creating new request")
	}

	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", errors.Wrap(err, "getting peers from node fetcher")
	}
	// The body must be closed on every path, otherwise the underlying
	// connection is leaked and cannot be reused by the transport.
	defer res.Body.Close()

	body, err := io.ReadAll(res.Body)
	if err != nil {
		return "", errors.Wrap(err, "reading response body")
	}

	// Reject error replies explicitly instead of trying to decode them as a
	// status document.
	if res.StatusCode < http.StatusOK || res.StatusCode >= http.StatusMultipleChoices {
		return "", errors.Errorf("node fetcher returned unexpected status code %d", res.StatusCode)
	}

	var resp statusResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return "", errors.Wrap(err, "unmarshalling response")
	}

	// rand.Intn panics when its argument is zero, so an empty list has to be
	// reported as an error before a node is picked.
	if len(resp.ReliableNodes) == 0 {
		return "", errors.New("node fetcher returned no reliable nodes")
	}

	peer := resp.ReliableNodes[rand.Intn(len(resp.ReliableNodes))]
	fmt.Printf("Got %d new peers. Selected random %s\n", len(resp.ReliableNodes), peer.Address)

	return peer.Address, nil
}