forked from omni-network/omni
-
Notifications
You must be signed in to change notification settings - Fork 0
/
monitor.go
287 lines (243 loc) · 8.08 KB
/
monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
package relayer
import (
"context"
"net/http"
"time"
"github.com/omni-network/omni/lib/cchain"
"github.com/omni-network/omni/lib/errors"
"github.com/omni-network/omni/lib/ethclient"
"github.com/omni-network/omni/lib/log"
"github.com/omni-network/omni/lib/netconf"
"github.com/omni-network/omni/lib/xchain"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/params"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// startMonitoring spawns the background monitoring goroutines: heights for
// every chain, account and offset monitors for EVM chains, and a single
// consensus-chain offset monitor. All goroutines exit when ctx is canceled.
func startMonitoring(ctx context.Context, network netconf.Network, xprovider xchain.Provider,
	cprovider cchain.Provider, addr common.Address, rpcClients map[uint64]ethclient.Client) {
	for _, src := range network.Chains {
		go monitorHeightsForever(ctx, src, cprovider, rpcClients[src.ID])

		if src.IsOmniConsensus {
			// The remaining monitors are EVM-only.
			continue
		}

		go monitorAccountForever(ctx, addr, src.Name, rpcClients[src.ID])

		for _, dst := range network.EVMChains() {
			if dst.ID == src.ID {
				continue
			}
			go monitorOffsetsForever(ctx, src.ID, dst.ID, src.Name, dst.Name, xprovider)
		}
	}

	go monitorConsOffsetForever(ctx, network, xprovider)
}
// monitorConsOffsetForever blocks and periodically monitors the emitted
// offsets of the consensus chain.
// Note that submitted offsets are not monitored as the consensus chain doesn't support submissions.
func monitorConsOffsetForever(ctx context.Context, network netconf.Network, xprovider xchain.Provider) {
	tick := time.NewTicker(30 * time.Second)
	defer tick.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			if err := monitorConsOffsetOnce(ctx, network, xprovider); ctx.Err() != nil {
				return
			} else if err != nil {
				log.Error(ctx, "Monitoring consensus stream offsets failed (will retry)", err)
			}
		}
	}
}
// monitorConsOffsetOnce queries the emitted cursor from the consensus chain
// to each EVM chain and exports the offsets as prometheus gauges.
// It is a no-op if the network has no consensus chain.
func monitorConsOffsetOnce(ctx context.Context, network netconf.Network, xprovider xchain.Provider) error {
	cChain, ok := network.OmniConsensusChain()
	if !ok {
		return nil
	}

	// Consensus chain messages are broadcast, so query for each EVM chain.
	for _, destChain := range network.EVMChains() {
		emitted, ok, err := xprovider.GetEmittedCursor(ctx, cChain.ID, destChain.ID)
		if err != nil {
			return err
		} else if !ok {
			// No cursor for this destination yet; skip it but still
			// monitor the remaining chains (previously returned early).
			continue
		}

		emitCursor.WithLabelValues(cChain.Name, destChain.Name).Set(float64(emitted.Offset))
	}

	return nil
}
// monitorHeightsForever blocks and periodically monitors the latest/safe/final
// heads and the halo attested height of the given chain.
func monitorHeightsForever(ctx context.Context, chain netconf.Chain,
	cprovider cchain.Provider, client ethclient.Client,
) {
	tick := time.NewTicker(10 * time.Second)
	defer tick.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			// Fetch the attested height first (so it can't be lower than the heads),
			attested, attErr := getAttested(ctx, chain.ID, chain.DeployHeight, cprovider)

			// then fetch the chain heads (so they are always higher than attested).
			var heads map[ethclient.HeadType]uint64
			if chain.IsOmniConsensus {
				heads = getConsXHead(ctx, cprovider)
			} else {
				heads = getEVMHeads(ctx, client)
			}

			// Populate all gauges together so they update "atomically".
			if attErr != nil {
				log.Error(ctx, "Monitoring attested failed (will retry)", attErr,
					"chain", chain.Name)
			} else {
				attestedHeight.WithLabelValues(chain.Name).Set(float64(attested))
			}

			for typ, height := range heads {
				headHeight.WithLabelValues(chain.Name, typ.String()).Set(float64(height))
			}

			// Also export the chain's configured finalization head under "xfinal".
			if h, ok := heads[ethclient.HeadType(chain.FinalizationStrat)]; ok {
				headHeight.WithLabelValues(chain.Name, "xfinal").Set(float64(h))
			}
		}
	}
}
// getConsXHead returns the latest XBlock height of the consensus chain keyed
// under the "xfinal" head type. This is equivalent to the latest validator
// set id. Errors are swallowed (nil map returned): best-effort monitoring.
func getConsXHead(ctx context.Context, cprovider cchain.Provider) map[ethclient.HeadType]uint64 {
	xblock, ok, err := cprovider.XBlock(ctx, 0, true)
	if err != nil || !ok {
		return nil
	}

	return map[ethclient.HeadType]uint64{"xfinal": xblock.BlockHeight}
}
// getEVMHeads returns the latest, safe and finalized head heights of the
// given EVM chain. Head types the chain doesn't support are omitted.
func getEVMHeads(ctx context.Context, client ethclient.Client) map[ethclient.HeadType]uint64 {
	resp := make(map[ethclient.HeadType]uint64, 3)

	for _, typ := range []ethclient.HeadType{
		ethclient.HeadLatest,
		ethclient.HeadSafe,
		ethclient.HeadFinalized,
	} {
		header, err := client.HeaderByType(ctx, typ)
		if err != nil {
			// Not all chains support all types, so just swallow the errors, this is best effort monitoring.
			continue
		}
		resp[typ] = header.Number.Uint64()
	}

	return resp
}
// getAttested returns the latest halo attested height of the given chain,
// or deployHeight-1 if nothing has been attested yet.
func getAttested(ctx context.Context, chainID uint64, deployHeight uint64, cprovider cchain.Provider) (uint64, error) {
	att, ok, err := cprovider.LatestAttestation(ctx, chainID)
	if err != nil {
		return 0, errors.Wrap(err, "latest attestation")
	} else if !ok {
		if deployHeight == 0 {
			// Guard uint64 underflow: deployHeight-1 would wrap to MaxUint64
			// and poison the attested-height gauge.
			return 0, nil
		}

		return deployHeight - 1, nil
	}

	return att.BlockHeader.BlockHeight, nil
}
// monitorAccountForever blocks and periodically monitors the relayer account
// (balance and nonce) on the given chain.
func monitorAccountForever(ctx context.Context, addr common.Address, chainName string, client ethclient.Client) {
	tick := time.NewTicker(30 * time.Second)
	defer tick.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			if err := monitorAccountOnce(ctx, addr, chainName, client); ctx.Err() != nil {
				return
			} else if err != nil {
				log.Error(ctx, "Monitoring account failed (will retry)", err,
					"chain", chainName)
			}
		}
	}
}
// monitorAccountOnce exports the relayer account's balance (in ether) and
// nonce on the given chain as prometheus gauges.
func monitorAccountOnce(ctx context.Context, addr common.Address, chainName string, client ethclient.Client) error {
	balance, err := client.BalanceAt(ctx, addr, nil)
	if err != nil {
		return errors.Wrap(err, "balance at")
	}

	nonce, err := client.NonceAt(ctx, addr, nil)
	if err != nil {
		return errors.Wrap(err, "nonce at")
	}

	// Convert wei to ether; float precision loss is acceptable for metrics.
	etherBalance, _ := balance.Float64()
	etherBalance /= params.Ether

	accountBalance.WithLabelValues(chainName).Set(etherBalance)
	accountNonce.WithLabelValues(chainName).Set(float64(nonce))

	return nil
}
// monitorOffsetsForever blocks and periodically monitors the emitted and
// submitted offsets for a given source and destination chain pair.
func monitorOffsetsForever(ctx context.Context, src, dst uint64, srcChain, dstChain string,
	xprovider xchain.Provider) {
	tick := time.NewTicker(30 * time.Second)
	defer tick.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			if err := monitorOffsetsOnce(ctx, src, dst, srcChain, dstChain, xprovider); ctx.Err() != nil {
				return
			} else if err != nil {
				log.Error(ctx, "Monitoring stream offsets failed (will retry)", err,
					"src_chain", srcChain, "dst_chain", dstChain)
			}
		}
	}
}
// monitorOffsetsOnce exports the emitted and submitted cursor offsets for a
// single src->dst stream as prometheus gauges.
func monitorOffsetsOnce(ctx context.Context, src, dst uint64, srcChain, dstChain string,
	xprovider xchain.Provider) error {
	emitted, ok, err := xprovider.GetEmittedCursor(ctx, src, dst)
	if err != nil {
		return err
	}
	if !ok {
		return nil // Nothing emitted yet, nothing to report.
	}

	// NOTE(review): the "found" flag is ignored here, so a missing submitted
	// cursor reports offset zero — presumably intentional; verify.
	submitted, _, err := xprovider.GetSubmittedCursor(ctx, dst, src)
	if err != nil {
		return err
	}

	emitCursor.WithLabelValues(srcChain, dstChain).Set(float64(emitted.Offset))
	submitCursor.WithLabelValues(srcChain, dstChain).Set(float64(submitted.Offset))

	return nil
}
// serveMonitoring starts a goroutine that serves the monitoring API
// (prometheus metrics under /metrics) on the given address. It returns a
// channel that receives the error if the server fails.
func serveMonitoring(address string) <-chan error {
	errChan := make(chan error)

	go func() {
		mux := http.NewServeMux()
		mux.Handle("/metrics", promhttp.Handler())

		srv := &http.Server{
			Addr:              address,
			Handler:           mux,
			ReadHeaderTimeout: 5 * time.Second,
			WriteTimeout:      5 * time.Second,
			IdleTimeout:       5 * time.Second,
		}

		errChan <- errors.Wrap(srv.ListenAndServe(), "serve monitoring")
	}()

	return errChan
}