-
Notifications
You must be signed in to change notification settings - Fork 0
/
realtime_index.js
225 lines (194 loc) · 6.81 KB
/
realtime_index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
const WebSocket = require('ws');
const sqlite3 = require('sqlite3').verbose();
const { promisify } = require('util');
const fs = require('fs');
const path = require('path');
const { handleContract, determineContractType, fetchAllTokensForContracts, fetchTokenOwners } = require('./cw721Helper');
// WebSocket RPC endpoint that setupWebSocket() connects to for event subscriptions.
const RPC_WEBSOCKET_URL = 'wss://rpc.sei-apis.com/websocket';
const restAddress = 'http://tasty.seipex.fi:1317'; // REST endpoint for queries
// On-disk locations of the SQLite database and the log file, relative to this script.
const DB_PATH = path.join(__dirname, 'data', 'smart_contracts.db');
const LOG_FILE = path.join(__dirname, 'logs', 'real_time_indexer.log');
// Shared handles: `db` is assigned by setupDatabase(), `logStream` by setupLogging().
let db;
let logStream;
/**
 * Emit a timestamped log line on stdout and, when the log stream is open,
 * append the same line to the log file.
 *
 * @param {*} message - Value interpolated into the log line.
 */
function log(message) {
  const line = `${new Date().toISOString()} - ${message}\n`;
  console.log(line.trim());

  // Nothing to append to until setupLogging() has run, or once the stream is destroyed.
  const streamUsable = Boolean(logStream) && !logStream.destroyed;
  if (!streamUsable) {
    return;
  }
  logStream.write(line);
}
/**
 * Open the log file in append mode and store the stream in `logStream`,
 * creating the log directory first when it does not exist.
 *
 * @throws {Error} If the log directory cannot be created.
 */
function setupLogging() {
  // `recursive: true` makes mkdirSync succeed when the directory already
  // exists, so a separate (and racy) existence check is unnecessary.
  fs.mkdirSync(path.dirname(LOG_FILE), { recursive: true });

  logStream = fs.createWriteStream(LOG_FILE, { flags: 'a' });

  // A stream 'error' event without a listener is thrown as an uncaught
  // exception. Report it on stderr instead; log() already skips a destroyed
  // stream, so console logging keeps working.
  logStream.on('error', (err) => {
    console.error(`Log file error: ${err.message}`);
  });
}
/**
 * Open the SQLite database at DB_PATH and store the handle in `db`,
 * creating the parent directory first when it is missing.
 *
 * @returns {Promise<void>} Resolves once the database reports it is open;
 *   rejects with the open error otherwise.
 */
async function setupDatabase() {
  const openDatabase = (resolve, reject) => {
    const parentDir = path.dirname(DB_PATH);
    const dirMissing = !fs.existsSync(parentDir);
    if (dirMissing) {
      fs.mkdirSync(parentDir, { recursive: true });
    }

    // Completion callback handed to the sqlite3 constructor.
    const onOpened = (err) => {
      if (!err) {
        log('Connected to the SQLite database.');
        resolve();
        return;
      }
      log(`Error opening database: ${err.message}`);
      reject(err);
    };

    db = new sqlite3.Database(DB_PATH, onOpened);
  };

  return new Promise(openDatabase);
}
/**
 * Create the tables for contracts, contract info, NFT owners and contract
 * tokens if they do not exist yet.
 *
 * Each statement is attempted independently: a failure is logged and the
 * remaining statements still run. The success message is logged only when
 * every statement succeeded; otherwise a summary of the failures is logged.
 *
 * @returns {Promise<void>} Always resolves; failures are reported via log().
 */
async function createTables() {
  const statements = [
    `CREATE TABLE IF NOT EXISTS contracts (
address TEXT PRIMARY KEY,
admin TEXT,
creator TEXT,
type TEXT DEFAULT 'UNKNOWN'
)`,
    `CREATE TABLE IF NOT EXISTS contract_info (
address TEXT PRIMARY KEY,
type TEXT DEFAULT 'UNKNOWN'
)`,
    `CREATE TABLE IF NOT EXISTS nft_owners (
collection_address TEXT,
token_id TEXT,
owner TEXT,
PRIMARY KEY (collection_address, token_id)
)`,
    `CREATE TABLE IF NOT EXISTS contract_tokens (
contract_address TEXT PRIMARY KEY,
admin TEXT,
creator TEXT,
contract_type TEXT,
extra_data TEXT
)`
  ];

  // Build the promisified runner once instead of once per statement.
  const run = promisify(db.run).bind(db);
  let failures = 0;

  for (const statement of statements) {
    try {
      await run(statement);
    } catch (error) {
      failures += 1;
      log(`Error creating/updating table: ${error.message}`);
    }
  }

  // Do not claim success when any statement failed.
  if (failures === 0) {
    log('Tables created/updated successfully.');
  } else {
    log(`Table setup finished with ${failures} of ${statements.length} statements failing.`);
  }
}
/**
 * Read the addresses of every contract whose type is still 'UNKNOWN'.
 *
 * @returns {Promise<string[]>} Values of the `address` column of the matching rows.
 */
async function fetchContractsWithoutType() {
  const queryAll = promisify(db.all).bind(db);
  const rows = await queryAll("SELECT address FROM contracts WHERE type = 'UNKNOWN'");
  return rows.map(({ address }) => address);
}
/**
 * Store a contract's type in both the `contracts` and `contract_info` tables,
 * then log the change.
 *
 * @param {string} contractAddress - Address whose rows are updated.
 * @param {string} contractType - Type value to write.
 * @returns {Promise<void>} Rejects with the first failing UPDATE.
 */
async function updateContractType(contractAddress, contractType) {
  const execute = promisify(db.run).bind(db);
  const updates = [
    "UPDATE contracts SET type = ? WHERE address = ?",
    "UPDATE contract_info SET type = ? WHERE address = ?",
  ];

  // Run the statements one after another, in the order listed.
  for (const statement of updates) {
    await execute(statement, [contractType, contractAddress]);
  }

  log(`Updated contract ${contractAddress} with type ${contractType}`);
}
/**
 * Step 1: classify every contract that is still typed 'UNKNOWN'.
 *
 * Contracts are handled one at a time. A contract whose type cannot be
 * determined, or whose lookup throws, is logged and skipped so the remaining
 * contracts are still processed.
 *
 * @returns {Promise<void>}
 */
async function determineContractTypes() {
  log('Determining contract types...');

  const pending = await fetchContractsWithoutType();
  for (const address of pending) {
    log(`Checking contract type for ${address}...`);
    try {
      const detected = await determineContractType(restAddress, address);
      if (!detected) {
        log(`Could not determine contract type for ${address}.`);
        continue;
      }
      await updateContractType(address, detected);
      log(`Contract ${address} is labeled as ${detected}`);
    } catch (err) {
      log(`Error determining contract type for ${address}: ${err.message}`);
    }
  }

  log('Finished determining contract types.');
}
/**
 * Step 2: run handleContract() for every contract typed 'CW721'.
 *
 * Contracts are processed sequentially. An empty result is logged, and an
 * error for one contract is logged without stopping the others.
 *
 * @returns {Promise<void>}
 */
async function processCW721Contracts() {
  const selectAll = promisify(db.all).bind(db);
  const rows = await selectAll("SELECT address FROM contracts WHERE type = 'CW721'");

  for (const row of rows) {
    const address = row.address;
    log(`Querying tokens for CW721 contract ${address}...`);
    try {
      const found = await handleContract(restAddress, address, db);
      const isEmpty = found.length === 0;
      if (isEmpty) {
        log(`No tokens found for contract ${address}.`);
      }
    } catch (err) {
      log(`Error processing CW721 contract ${address}: ${err.message}`);
    }
  }
}
/**
 * Connect to the RPC WebSocket, subscribe to contract instantiation
 * transactions and handle each one. When the socket closes, a new connection
 * is attempted after five seconds.
 *
 * For every instantiate event the new contract is first inserted into the
 * `contracts` table with type 'UNKNOWN', and then determineContractTypes() is
 * run. The scan only reads rows from `contracts`, so without the insert it
 * could never classify the contract that triggered it.
 */
function setupWebSocket() {
  const ws = new WebSocket(RPC_WEBSOCKET_URL);

  ws.on('open', () => {
    log('Connected to WebSocket');
    // Subscribe to instantiate contract events
    ws.send(JSON.stringify({
      jsonrpc: '2.0',
      method: 'subscribe',
      id: 2,
      params: {
        query: "tm.event='Tx' AND instantiate._contract_address EXISTS"
      }
    }));
  });

  ws.on('message', async (data) => {
    try {
      const message = JSON.parse(data);
      const events = message.result && message.result.events;
      if (!events) {
        return; // e.g. the subscription acknowledgement carries no events
      }

      // Each attribute is read as the first element of its array. A missing
      // attribute becomes '' instead of throwing and aborting the handler.
      const firstValue = (key) => {
        const values = events[key];
        return Array.isArray(values) && values.length > 0 ? values[0] : '';
      };

      const contractAddress = firstValue('instantiate._contract_address');
      if (!contractAddress) {
        return;
      }
      log(`Detected new contract: ${contractAddress}`);

      // Record the contract so the scan below can find it. OR IGNORE keeps an
      // existing row (and its already-determined type) untouched.
      await promisify(db.run).bind(db)(
        "INSERT OR IGNORE INTO contracts (address, admin, creator, type) VALUES (?, ?, ?, 'UNKNOWN')",
        [contractAddress, firstValue('instantiate.admin'), firstValue('message.sender')]
      );

      await determineContractTypes(); // Run the contract type detection for the new contract
    } catch (error) {
      log(`Error processing WebSocket message: ${error.message}`);
    }
  });

  ws.on('close', () => {
    log('WebSocket connection closed. Reconnecting...');
    setTimeout(setupWebSocket, 5000);
  });

  ws.on('error', (error) => {
    log(`WebSocket error: ${error.message}`);
  });
}
/**
 * Start the indexer: open logging and the database, ensure the tables exist,
 * backfill token IDs and owners, then subscribe to live events.
 *
 * Any startup error is logged and the process exits with status 1. The log
 * stream is flushed before exiting so the fatal message reaches the log file.
 *
 * @returns {Promise<void>}
 */
async function main() {
  try {
    setupLogging();
    await setupDatabase();
    await createTables();
    // First, fetch and store token IDs for each CW721 contract
    await fetchAllTokensForContracts(restAddress, db);
    // After all token IDs are recorded, fetch and store the owner information for each token
    await fetchTokenOwners(restAddress, db);
    setupWebSocket();
  } catch (error) {
    log(`Error in main: ${error.message}`);

    // process.exit() ends the process immediately and discards writes still
    // buffered in the log stream, so finish the stream first. exitCode is set
    // up front so the status is still 1 if the process ends on its own.
    process.exitCode = 1;
    if (logStream && !logStream.destroyed) {
      logStream.end(() => process.exit(1));
    } else {
      process.exit(1);
    }
  }
}
// Start the indexer when this file is run; main() catches startup errors itself.
main();