This repository has been archived by the owner on Jan 27, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 7
/
index.js
108 lines (90 loc) · 3.17 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
const http = require('http')
const path = require('path')
const fs = require('fs/promises')
const Nanoeth = require('nanoeth/http')
const GoogleRPC = require('@grpc/grpc-js')
const { vega } = require('@vegaprotocol/vega-grpc')
const { parse } = require('eth-helpers').utils
const crypto = require('./lib/crypto')
const EthTail = require('./lib/eth-tail')
const pc = require('./lib/promisify-callback')
const config = require('rc-toml')('vega-ethereum-event-forwarder')
const logger = require('pino')({
level: process.env.LOG_LEVEL ?? config?.log_level ?? 'trace'
})
if (config.primaryConfig == null) {
logger.fatal('Failed to read config file')
process.exit(1)
}
const db = require('toiletdb')(path.resolve(config.event_queue.datadir, 'db.json'))
logger.info('Starting')
logger.info(`Using primary config file '${config.primaryConfig}'`)
logger.info(`LOG_LEVEL: ${logger.level}`)
let t
const healthcheckHttp = http.createServer((req, res) => {
logger.debug('HTTP Healthcheck')
res
.writeHead(200, {
'Content-Type': 'application/json'
})
.end(JSON.stringify({
status: (!t?.stopped) ?? false
}))
})
logger.info(`Connecting to Vega GRPC on '${config.vega.grpc_endpoint}'`)
/* eslint-disable-next-line new-cap */
const grpc = new vega.api.v1.core_grpc.CoreServiceClient(
config.vega.grpc_endpoint,
GoogleRPC.credentials.createInsecure()
)
healthcheckHttp.listen(config.event_queue.healthcheck_port, config.event_queue.healthcheck_iface)
;(async () => {
await fs.mkdir(path.resolve(config.event_queue.datadir), { recursive: true })
await db.open()
const keypair = await crypto.ensureKey(path.resolve(config.event_queue.secretkey_path))
logger.info(`Using public key: '${keypair.publicKey.toString('hex')}'`)
const eth = new Nanoeth(config.ethereum.http_endpoint)
let erc20BridgeStartHeight = config.ethereum.erc20_bridge.start_height
if (erc20BridgeStartHeight < 0) erc20BridgeStartHeight = parse.number(await latestBlockNumber(eth))
let stakingStartHeight = config.ethereum.staking.start_height
if (stakingStartHeight < 0) stakingStartHeight = parse.number(await latestBlockNumber(eth))
const startHeight = await db.read('checkpoint') ??
Math.min(
erc20BridgeStartHeight,
stakingStartHeight
)
logger.info(`Starting at block: ${startHeight}`)
t = new EthTail({
eth,
startHeight,
confirmations: config.ethereum.confirmations,
stakingAddresses: config.ethereum.staking.addresses.map(function (address) { return address.toLowerCase() }),
stakingStartHeight,
erc20BridgeAddress: config.ethereum.erc20_bridge.address.toLowerCase(),
erc20BridgeStartHeight,
db,
grpc,
keypair,
logger
})
t.start()
process.once('SIGINT', onstop)
process.once('SIGTERM', onstop)
async function onstop () {
if (t.stopped) return
logger.info('Stopping')
await t.stop()
await pc(cb => healthcheckHttp.close(cb))
}
async function latestBlockNumber (eth) {
let retries = 0
while (true) {
retries++
try {
return await eth.blockNumber('latest')
} catch {
if (retries % 5 === 0) logger.warn(`Retried eth_blockNumber('latest') ${retries} times`)
}
}
}
})()