-
Notifications
You must be signed in to change notification settings - Fork 3
/
client.go
209 lines (173 loc) · 5.23 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
package raft
import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
"time"
pb "github.com/bbengfort/raft/api/v1beta1"
"github.com/bbengfort/x/peers"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// DefaultRetries is the retry budget that Commit passes to send for every
// request. send stops with ErrRetries once the budget reaches zero, so this is
// the maximum number of commit attempts, counting the first one.
const DefaultRetries = 3
// NewClient creates a new raft client and connects it to a quorum.
//
// The configuration is loaded via Config.Load and then overlaid with the
// passed in options via Config.Update; an error from either step aborts
// construction and returns a nil client. The remote names the peer to connect
// to first; an empty string lets connect pick a random peer.
//
// NOTE: a connection error still returns the constructed client alongside the
// error, so callers receive a non-nil client even when err != nil.
// TODO: add synchronization to connect and reconnect
func NewClient(remote string, options *Config) (client *Client, err error) {
	// Build the configuration, then apply the caller's overrides on top.
	cfg := new(Config)
	if err = cfg.Load(); err != nil {
		return nil, err
	}
	if err = cfg.Update(options); err != nil {
		return nil, err
	}

	// The identity is "<prefix>-<4 hex digits>". The prefix is the configured
	// name when there is one; the error from GetName is discarded and an empty
	// name falls back to a second random 4 hex digit prefix.
	prefix, _ := cfg.GetName()
	if prefix == "" {
		prefix = fmt.Sprintf("%04X", rand.Intn(0x10000))
	}

	client = &Client{
		config:   cfg,
		identity: fmt.Sprintf("%s-%04X", prefix, rand.Intn(0x10000)),
	}

	// Connect eagerly; the client is handed back whether or not this succeeds.
	err = client.connect(remote)
	return client, err
}
// Client maintains network information embedded in the configuration to
// connect to a Raft consensus quorum and make commit requests.
type Client struct {
// Embedded read/write lock.
// NOTE(review): none of the methods visible in this file acquire it; see the
// synchronization TODO in NewClient.
sync.RWMutex
// Configuration providing the peer list, the timeout and the name.
config *Config
// Current gRPC connection; nil when the client is not connected.
conn *grpc.ClientConn
// RaftClient created from conn; nil when the client is not connected.
client pb.RaftClient
// Identifier sent as the Identity of every commit request.
identity string
}
//===========================================================================
// Request API
//===========================================================================
// Commit a name and value to the distributed log.
//
// The request is tagged with the client's identity and handed to send with a
// budget of DefaultRetries attempts. On success the log entry carried by the
// reply is returned; on failure the entry is nil and the error from send is
// returned unchanged.
func (c *Client) Commit(name string, value []byte) (entry *pb.LogEntry, err error) {
	var reply *pb.CommitReply

	reply, err = c.send(
		&pb.CommitRequest{Identity: c.identity, Name: name, Value: value},
		DefaultRetries,
	)
	if err != nil {
		return nil, err
	}

	return reply.Entry, nil
}
// send delivers the commit request, handling transport errors and redirects
// for at most retries attempts.
//
// Each attempt:
//   - connects to a random peer if the client is not currently connected;
//   - issues the Commit RPC under a fresh context bounded by the configured
//     timeout;
//   - on an RPC error, reconnects to a random peer and tries again, unless
//     this was the final attempt, in which case the RPC error is returned;
//   - on an unsuccessful reply with a redirect, reconnects to the named
//     leader and tries again;
//   - on an unsuccessful reply without a redirect, returns the reply's error
//     text as an error.
//
// ErrRetries is returned when the budget is exhausted, including when retries
// is not positive on entry. Any error from connect or from reading the
// timeout aborts immediately.
//
// The attempts run in a loop rather than recursively so that each attempt's
// timeout context is cancelled as soon as its RPC returns, instead of being
// held until every subsequent retry has finished.
func (c *Client) send(req *pb.CommitRequest, retries int) (*pb.CommitReply, error) {
	for ; retries > 0; retries-- {
		// Connect if not connected
		if !c.isConnected() {
			if err := c.connect(""); err != nil {
				return nil, err
			}
		}

		// Create the context for this attempt only
		timeout, err := c.config.GetTimeout()
		if err != nil {
			return nil, err
		}
		ctx, cancel := context.WithTimeout(context.Background(), timeout)
		rep, err := c.client.Commit(ctx, req)
		cancel()

		switch {
		case err != nil:
			// No attempts left: surface the RPC error itself.
			if retries <= 1 {
				return nil, err
			}
			// If there is an error connecting to the current host, try a
			// different host on the network.
			if err = c.connect(""); err != nil {
				return nil, err
			}
		case rep.Success:
			return rep, nil
		case rep.Redirect != "":
			// Redirect to the specified leader. This happens even on the
			// final attempt, after which the loop ends with ErrRetries.
			if err := c.connect(rep.Redirect); err != nil {
				return nil, err
			}
		default:
			return nil, errors.New(rep.Error)
		}
	}

	// Don't attempt if there are no more retries.
	return nil, ErrRetries
}
//===========================================================================
// Connection Handlers
//===========================================================================
// close shuts down the connection to the remote host, if there is one, and
// always leaves the client in the disconnected state (conn and client both
// nil). It returns the error from closing the connection, or nil when there
// was nothing to close.
func (c *Client) close() error {
	// Take the current connection and reset the client state up front so the
	// state is valid whatever Close reports.
	conn := c.conn
	c.conn, c.client = nil, nil

	if conn == nil {
		return nil
	}
	return conn.Close()
}
// connect dials the named remote using the timeout from the configuration.
// If a remote is not specified (e.g. empty string) then a random replica is
// selected from the configuration to connect to.
//
// Any existing connection is closed first, so after a failed connect the
// client is left disconnected. Errors from peer selection and from reading the
// timeout are returned unchanged; a dial error is wrapped so that callers can
// still inspect the underlying cause with errors.Is / errors.As.
//
// NOTE(review): no grpc.WithBlock option is passed, so the timeout presumably
// bounds only the dial setup rather than waiting for the connection to become
// ready — confirm against the gRPC documentation.
func (c *Client) connect(remote string) (err error) {
	// Close connection if one is already open. The close error is deliberately
	// ignored: the old connection is being discarded and close resets the
	// client state regardless of the outcome.
	_ = c.close()

	// Get the peer by name or select a random peer.
	var host *peers.Peer
	if host, err = c.selectRemote(remote); err != nil {
		return err
	}

	// Parse timeout from configuration for the connection
	var timeout time.Duration
	if timeout, err = c.config.GetTimeout(); err != nil {
		return err
	}

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// Connect to the remote's address. The connection is only stored on the
	// client once the dial has succeeded.
	addr := host.Endpoint(false)
	conn, err := grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return fmt.Errorf("could not connect to '%s': %w", addr, err)
	}

	// Create the gRPC client
	c.conn = conn
	c.client = pb.NewRaftClient(conn)
	return nil
}
// isConnected reports whether both the gRPC client and its underlying
// connection are set.
func (c *Client) isConnected() bool {
	if c.client == nil || c.conn == nil {
		return false
	}
	return true
}
// selectRemote returns a random remote from the configuration if the remote
// is not specified (empty string), otherwise it searches the configured peers
// for one whose Name equals remote.
//
// It returns ErrNoNetwork when a random peer is requested but no peers are
// configured, and a descriptive error when the named remote is not found.
// In both lookup modes the returned pointer refers to the peer stored in the
// configuration, not to a copy.
func (c *Client) selectRemote(remote string) (*peers.Peer, error) {
	if remote == "" {
		if len(c.config.Peers) == 0 {
			return nil, ErrNoNetwork
		}

		idx := rand.Intn(len(c.config.Peers))
		return &c.config.Peers[idx], nil
	}

	// Index the peers instead of ranging by value so that the address taken
	// is that of the configured peer rather than of a per-iteration copy.
	for i := range c.config.Peers {
		if c.config.Peers[i].Name == remote {
			return &c.config.Peers[i], nil
		}
	}

	return nil, fmt.Errorf("could not find remote '%s' in configuration", remote)
}