-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
340 lines (297 loc) · 11.8 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
const axios = require('axios');
const sqlite3 = require('sqlite3').verbose();
const fs = require('fs');
const { promisify } = require('util');
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
const restAddress = "http://tasty.seipex.fi:1317"
const NUM_WORKERS = 3;
const API_KEY = 'a48f0d74';
const BATCH_SIZE = 100;
const MAX_RETRIES = 3;
const RETRY_DELAY = 1000;
let logStream;
let db;
// Set up logging
/**
 * Opens the append-mode log stream used by log().
 * Fix: fs.createWriteStream does not create intermediate directories, so a
 * missing 'logs/' directory would surface as an asynchronous stream error on
 * the first write. Create it up front (recursive mkdir is a no-op if present).
 */
function setupLogging() {
  fs.mkdirSync('logs', { recursive: true });
  logStream = fs.createWriteStream('logs/data_collection.log', { flags: 'a' }); // Update log directory to 'logs'
}
/**
 * Logs a timestamped message to stdout and, when the file stream is open,
 * appends the same line to the log file.
 *
 * @param {string} message - text to record
 */
function log(message) {
  const stamped = `${new Date().toISOString()} - ${message}\n`;
  console.log(stamped.trim());
  const fileLoggingAvailable = logStream && !logStream.destroyed;
  if (fileLoggingAvailable) {
    logStream.write(stamped);
  }
}
// Set up database
/**
 * Opens (creating if necessary) the SQLite database and assigns the handle to
 * the module-level `db`.
 * Fix: sqlite3 fails with SQLITE_CANTOPEN when the parent directory of the
 * database file does not exist, so ensure './data' is present first.
 *
 * @returns {Promise<void>} resolves once the connection is open
 */
async function setupDatabase() {
  fs.mkdirSync('./data', { recursive: true });
  return new Promise((resolve, reject) => {
    db = new sqlite3.Database('./data/smart_contracts.db', (err) => { // Update database directory to 'data'
      if (err) {
        log(`Error opening database: ${err.message}`);
        reject(err);
      } else {
        log('Connected to the SQLite database.');
        resolve();
      }
    });
  });
}
// Create tables
// Creates the schema idempotently (every statement uses IF NOT EXISTS):
//   code_infos       - one row per uploaded wasm code blob
//   contracts        - contract address -> code_id mapping
//   contract_history - instantiate/migrate entries per contract address
//   contract_info    - current metadata per contract address
//   indexer_progress - single-row (id forced to 1) resume checkpoint
// Statements run sequentially; db.run is promisified so failures reject.
async function createTables() {
const tables = [
`CREATE TABLE IF NOT EXISTS code_infos (
code_id INTEGER PRIMARY KEY,
creator TEXT,
data_hash TEXT,
instantiate_permission TEXT
)`,
`CREATE TABLE IF NOT EXISTS contracts (
address TEXT PRIMARY KEY,
code_id INTEGER,
FOREIGN KEY (code_id) REFERENCES code_infos (code_id)
)`,
`CREATE TABLE IF NOT EXISTS contract_history (
address TEXT,
operation TEXT,
code_id INTEGER,
msg TEXT,
FOREIGN KEY (address) REFERENCES contracts (address)
)`,
`CREATE TABLE IF NOT EXISTS contract_info (
address TEXT PRIMARY KEY,
code_id INTEGER,
creator TEXT,
admin TEXT,
label TEXT,
ibc_port_id TEXT,
FOREIGN KEY (address) REFERENCES contracts (address)
)`,
`CREATE TABLE IF NOT EXISTS indexer_progress (
id INTEGER PRIMARY KEY CHECK (id = 1),
last_processed_code_id INTEGER,
last_processed_block_height INTEGER
)`
];
// Run each CREATE TABLE in order; a failure rejects and aborts startup.
for (const table of tables) {
await promisify(db.run).bind(db)(table);
log(`Table created successfully.`);
}
}
// Create an axios instance with the API key
/**
 * Builds an axios client bound to the REST endpoint, sending the API key on
 * every request via the x-api-key header.
 *
 * @returns {import('axios').AxiosInstance} preconfigured HTTP client
 */
function createApiInstance() {
  const headers = { 'x-api-key': API_KEY };
  return axios.create({ baseURL: restAddress, headers });
}
// Helper function to insert data into the database in batches
/**
 * Upserts an array of uniform row objects into `table` with a single
 * multi-row INSERT OR REPLACE statement. Column names are taken from the
 * first row; every row is assumed to carry the same keys. No-op on [].
 *
 * @param {string} table - destination table name (trusted, internal)
 * @param {Array<Object>} dataArray - rows to upsert
 */
async function batchInsert(table, dataArray) {
  if (dataArray.length === 0) return;
  const columns = Object.keys(dataArray[0]);
  const rowPlaceholder = `(${columns.map(() => '?').join(',')})`;
  const allPlaceholders = dataArray.map(() => rowPlaceholder).join(',');
  const sql = `INSERT OR REPLACE INTO ${table} (${columns.join(',')}) VALUES ${allPlaceholders}`;
  // Flatten rows into one positional-parameter list matching the placeholders.
  const values = [];
  for (const row of dataArray) {
    for (const column of columns) {
      values.push(row[column]);
    }
  }
  await promisify(db.run).bind(db)(sql, values);
  log(`Inserted ${dataArray.length} rows into ${table}`);
}
// Function to get the last processed state
/**
 * Reads the single-row resume checkpoint; falls back to zeros when the
 * indexer has never run (no row with id = 1 yet).
 *
 * @returns {Promise<{last_processed_code_id: number, last_processed_block_height: number}>}
 */
async function getLastProcessedState() {
  const getAsync = promisify(db.get).bind(db);
  const row = await getAsync('SELECT * FROM indexer_progress WHERE id = 1');
  return row ?? { last_processed_code_id: 0, last_processed_block_height: 0 };
}
// Function to update the last processed state
/**
 * Upserts the single-row (id = 1) resume checkpoint.
 *
 * @param {number|string} codeId - last fully processed code ID
 * @param {number} blockHeight - block height observed at the start of the run
 */
async function updateLastProcessedState(codeId, blockHeight) {
  const runAsync = promisify(db.run).bind(db);
  const sql = 'INSERT OR REPLACE INTO indexer_progress (id, last_processed_code_id, last_processed_block_height) VALUES (1, ?, ?)';
  await runAsync(sql, [codeId, blockHeight]);
}
// Modified worker function to handle incremental updates
/**
 * Worker entry point: processes the slice of code infos assigned by the main
 * thread. For each code ID it pages through the contracts instantiated from
 * it and records each contract's address, full history, and current metadata.
 * Rows are buffered per table and flushed in batches of BATCH_SIZE; any
 * remainder is flushed after the loop, then 'done' is posted to the parent.
 *
 * @param {number} workerId - index used only to prefix log lines
 * @param {Array<{code_id: number|string}>} codeInfos - code infos to process
 * @param {number} startBlockHeight - block height recorded as progress
 *
 * NOTE(review): each worker holds its own handle on the same SQLite file;
 * concurrent writes can contend (SQLITE_BUSY) — confirm this is acceptable.
 */
async function workerFunction(workerId, codeInfos, startBlockHeight) {
  const api = createApiInstance();
  // Per-table write buffers, flushed once they reach BATCH_SIZE.
  const contractsBatch = [];
  const contractHistoryBatch = [];
  const contractInfoBatch = [];
  for (const [codeInfoIndex, codeInfo] of codeInfos.entries()) {
    try {
      log(`Worker ${workerId}: Processing code info ${codeInfoIndex + 1}/${codeInfos.length} (ID: ${codeInfo.code_id})`);
      let nextKey = null; // REST pagination cursor; null requests the first page
      let totalContracts = 0;
      do {
        // One page of contract addresses instantiated from this code ID.
        const response = await retryOperation(() => api.get(`/cosmwasm/wasm/v1/code/${codeInfo.code_id}/contracts`, {
          params: { 'pagination.key': nextKey }
        }));
        const data = response.data;
        log(`Worker ${workerId}: Received ${data.contracts.length} contracts for code ID ${codeInfo.code_id}. Total so far: ${totalContracts + data.contracts.length}`);
        for (const [contractIndex, contractAddress] of data.contracts.entries()) {
          totalContracts++;
          log(`Worker ${workerId}: Processing contract ${contractIndex + 1}/${data.contracts.length} (Total: ${totalContracts}) for code ID ${codeInfo.code_id}`);
          // Check if contract already exists and skip if it does
          // (makes re-runs incremental: already-indexed contracts are not refetched)
          const existingContract = await promisify(db.get).bind(db)('SELECT * FROM contracts WHERE address = ?', [contractAddress]);
          if (existingContract) {
            log(`Worker ${workerId}: Contract ${contractAddress} already exists. Skipping.`);
            continue;
          }
          contractsBatch.push({ address: contractAddress, code_id: codeInfo.code_id });
          // Fetch contract history
          const historyResponse = await retryOperation(() => api.get(`/cosmwasm/wasm/v1/contract/${contractAddress}/history`));
          for (const entry of historyResponse.data.entries) {
            // NOTE(review): entry.msg is taken as-is; if the API returns it as a
            // JSON object rather than a string, the stored value depends on the
            // driver's conversion — confirm the intended serialization.
            contractHistoryBatch.push({
              address: contractAddress,
              operation: entry.operation,
              code_id: entry.code_id,
              msg: entry.msg
            });
          }
          // Fetch contract info
          const infoResponse = await retryOperation(() => api.get(`/cosmwasm/wasm/v1/contract/${contractAddress}`));
          const contractInfo = infoResponse.data.contract_info;
          contractInfoBatch.push({
            address: contractAddress,
            code_id: contractInfo.code_id,
            creator: contractInfo.creator,
            admin: contractInfo.admin,
            label: contractInfo.label,
            ibc_port_id: contractInfo.ibc_port_id
          });
          // Perform batch inserts when batch size reaches BATCH_SIZE
          if (contractsBatch.length >= BATCH_SIZE) {
            await batchInsert('contracts', contractsBatch);
            contractsBatch.length = 0; // truncate in place; same array is reused
          }
          if (contractHistoryBatch.length >= BATCH_SIZE) {
            await batchInsert('contract_history', contractHistoryBatch);
            contractHistoryBatch.length = 0;
          }
          if (contractInfoBatch.length >= BATCH_SIZE) {
            await batchInsert('contract_info', contractInfoBatch);
            contractInfoBatch.length = 0;
          }
          // Update the last processed state after each contract
          await updateLastProcessedState(codeInfo.code_id, startBlockHeight);
        }
        nextKey = data.pagination.next_key;
        log(`Worker ${workerId}: Processed ${totalContracts} contracts for code ID ${codeInfo.code_id} so far. Next key: ${nextKey || 'None'}`);
      } while (nextKey);
      log(`Worker ${workerId}: Finished processing all contracts for code ID ${codeInfo.code_id}. Total processed: ${totalContracts}`);
    } catch (error) {
      // A failure on one code ID is logged and the worker continues with the
      // next one; buffered rows from the failed ID remain in the batches.
      log(`Worker ${workerId}: Error processing code info ${codeInfo.code_id}: ${error.message}`);
      if (error.response) {
        log(`Worker ${workerId}: Response status: ${error.response.status}`);
        log(`Worker ${workerId}: Response data: ${JSON.stringify(error.response.data)}`);
      }
    }
  }
  // Insert any remaining items in batches
  if (contractsBatch.length > 0) await batchInsert('contracts', contractsBatch);
  if (contractHistoryBatch.length > 0) await batchInsert('contract_history', contractHistoryBatch);
  if (contractInfoBatch.length > 0) await batchInsert('contract_info', contractInfoBatch);
  // Signal completion; the main thread awaits one message per worker.
  parentPort.postMessage('done');
}
// Retry operation with exponential backoff
/**
 * Runs `operation` up to `maxRetries` times, sleeping retryDelay * 2^attempt
 * between failed attempts. The last failure is rethrown once attempts are
 * exhausted.
 *
 * Generalized: the base backoff delay is now a parameter (default preserves
 * the previous hard-coded RETRY_DELAY behavior, so existing callers are
 * unaffected).
 *
 * @param {() => Promise<any>} operation - async thunk to execute
 * @param {number} [maxRetries=MAX_RETRIES] - total attempts before giving up
 * @param {number} [retryDelay=RETRY_DELAY] - base backoff in milliseconds
 * @returns {Promise<any>} the operation's resolved value
 * @throws the error from the final failed attempt
 */
async function retryOperation(operation, maxRetries = MAX_RETRIES, retryDelay = RETRY_DELAY) {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      return await operation();
    } catch (error) {
      // Out of attempts: surface the last failure to the caller.
      if (attempt === maxRetries - 1) throw error;
      await new Promise(resolve => setTimeout(resolve, retryDelay * Math.pow(2, attempt)));
    }
  }
}
// Modified main function to handle incremental updates
/**
 * Orchestrates the run. On the main thread: opens the database, creates the
 * schema, resumes from the last checkpoint, fetches the current block height,
 * pages through all code infos newer than the checkpoint, persists them,
 * splits them across NUM_WORKERS worker threads (re-running this same file),
 * waits for each worker's 'done' message, then records the new checkpoint.
 * On a worker thread: opens its own database connection and runs
 * workerFunction on the slice passed via workerData.
 */
async function main() {
  if (isMainThread) {
    setupLogging();
    log('Starting data collection process...');
    try {
      await setupDatabase();
      await createTables();
      const api = createApiInstance();
      // Get the last processed state
      const { last_processed_code_id, last_processed_block_height } = await getLastProcessedState();
      log(`Resuming from Code ID: ${last_processed_code_id}, Block Height: ${last_processed_block_height}`);
      // Fetch the current block height
      const latestBlockResponse = await retryOperation(() => api.get('/cosmos/base/tendermint/v1beta1/blocks/latest'));
      const currentBlockHeight = parseInt(latestBlockResponse.data.block.header.height);
      // Fetch all code infos after the last processed code_id
      const codeInfos = [];
      let nextKey = null; // REST pagination cursor
      do {
        const response = await retryOperation(() => api.get('/cosmwasm/wasm/v1/code', {
          params: {
            'pagination.key': nextKey,
            'pagination.reverse': false,
            'pagination.limit': 100
          }
        }));
        const data = response.data;
        // Keep only code IDs newer than the checkpoint; code_id is compared
        // via parseInt because the API delivers it as a string.
        const newCodeInfos = data.code_infos
          .filter(info => parseInt(info.code_id) > last_processed_code_id)
          .map(info => ({
            code_id: info.code_id,
            creator: info.creator,
            data_hash: info.data_hash,
            instantiate_permission: JSON.stringify(info.instantiate_permission)
          }));
        codeInfos.push(...newCodeInfos);
        nextKey = data.pagination.next_key;
        log(`Fetched ${codeInfos.length} new code infos so far. Next key: ${nextKey || 'None'}`);
      } while (nextKey);
      log(`Fetched a total of ${codeInfos.length} new code infos. Starting processing with workers.`);
      // Insert new code infos in batches
      for (let i = 0; i < codeInfos.length; i += BATCH_SIZE) {
        const batch = codeInfos.slice(i, i + BATCH_SIZE);
        await batchInsert('code_infos', batch);
      }
      // Divide work among workers
      // NOTE(review): all workers open the same SQLite file concurrently, so
      // writes may contend (SQLITE_BUSY) — confirm this is acceptable. Also,
      // when codeInfos is empty, NUM_WORKERS idle workers are still spawned
      // (each receives an empty slice and immediately posts 'done').
      const chunkSize = Math.ceil(codeInfos.length / NUM_WORKERS);
      const workers = [];
      for (let i = 0; i < NUM_WORKERS; i++) {
        const start = i * chunkSize;
        const end = start + chunkSize;
        const worker = new Worker(__filename, {
          workerData: {
            workerId: i,
            codeInfos: codeInfos.slice(start, end),
            startBlockHeight: currentBlockHeight
          }
        });
        workers.push(worker);
      }
      // Wait for all workers to complete
      // (each worker posts exactly one 'done' message when finished)
      await Promise.all(workers.map(worker => new Promise((resolve) => {
        worker.on('message', resolve);
      })));
      // Update the last processed state after all workers are done
      if (codeInfos.length > 0) {
        const lastCodeId = codeInfos[codeInfos.length - 1].code_id;
        await updateLastProcessedState(lastCodeId, currentBlockHeight);
      }
      log('Data collection completed successfully.');
    } catch (error) {
      log(`An unexpected error occurred: ${error.message}`);
      log(error.stack);
    } finally {
      // Always release the database handle and flush the log stream.
      if (db) {
        await promisify(db.close).bind(db)();
        log('Database connection closed.');
      }
      if (logStream) {
        logStream.end();
      }
    }
  } else {
    // Worker thread code
    // Each worker opens its own connection before processing its slice.
    await setupDatabase();
    await workerFunction(workerData.workerId, workerData.codeInfos, workerData.startBlockHeight);
  }
}
// Entry point: run main and fail the process loudly on any unhandled error.
main().catch((err) => {
  console.error(`Unhandled error in main: ${err.message}`);
  console.error(err.stack);
  process.exit(1);
});