From bf3f1ee3956bdc2fd78bd0f85bec85332d580b8f Mon Sep 17 00:00:00 2001 From: ppedziwiatr Date: Thu, 25 Jul 2024 00:00:06 +0200 Subject: [PATCH] fix: scheduler location cache --- servers/mu/src/domain/api/processMsg.js | 5 +++-- servers/mu/src/domain/api/sendDataItem.js | 10 ++++----- servers/mu/src/domain/clients/sqlite.js | 19 ++++++++++++---- servers/mu/src/domain/clients/worker.js | 3 ++- servers/mu/src/domain/index.js | 9 ++++++-- servers/mu/src/domain/lib/build-tx.js | 7 +++--- servers/mu/src/domain/lib/su-cache.js | 27 +++++++++++++++++++++++ 7 files changed, 63 insertions(+), 17 deletions(-) create mode 100644 servers/mu/src/domain/lib/su-cache.js diff --git a/servers/mu/src/domain/api/processMsg.js b/servers/mu/src/domain/api/processMsg.js index 7df03a2e0..c7e0bbca8 100644 --- a/servers/mu/src/domain/api/processMsg.js +++ b/servers/mu/src/domain/api/processMsg.js @@ -19,9 +19,10 @@ export function processMsgWith ({ logger, writeDataItemArweave, isWallet, - fetchSchedulerProcess + fetchSchedulerProcess, + db }) { - const buildTx = buildTxWith({ buildAndSign, logger, locateProcess, fetchSchedulerProcess, isWallet }) + const buildTx = buildTxWith({ buildAndSign, logger, locateProcess, fetchSchedulerProcess, isWallet, db }) const getCuAddress = getCuAddressWith({ selectNode, logger }) const writeMessage = writeMessageTxWith({ writeDataItem, logger, writeDataItemArweave }) const pullResult = pullResultWith({ fetchResult, logger }) diff --git a/servers/mu/src/domain/api/sendDataItem.js b/servers/mu/src/domain/api/sendDataItem.js index 36493eec0..255b968ba 100644 --- a/servers/mu/src/domain/api/sendDataItem.js +++ b/servers/mu/src/domain/api/sendDataItem.js @@ -6,6 +6,7 @@ import { pullResultWith } from '../lib/pull-result.js' import { parseDataItemWith } from '../lib/parse-data-item.js' import { verifyParsedDataItemWith } from '../lib/verify-parsed-data-item.js' import { writeProcessTxWith } from '../lib/write-process-tx.js' +import { getSchedulerOwner, setSchedulerOwner } from '../lib/su-cache.js' /** * Forward along the DataItem to the SU, @@ -23,7 +24,8 @@ export function sendDataItemWith ({ crank, logger, fetchSchedulerProcess, - writeDataItemArweave + writeDataItemArweave, + db }) { const verifyParsedDataItem = verifyParsedDataItemWith() const parseDataItem = parseDataItemWith({ createDataItem, logger }) @@ -34,8 +36,6 @@ export function sendDataItemWith ({ const locateProcessLocal = fromPromise(locateProcess) - const schedLocationCache = new Map() - /** * If the data item is a Message, then cranking and tracing * must also be performed. @@ -99,7 +99,7 @@ export function sendDataItemWith ({ return Rejected({ res }) } const schedulerTag = res.dataItem.tags.find((tag) => tag.name === 'Scheduler') - schedLocationCache.set(res.dataItem.id, schedulerTag.value) + setSchedulerOwner(db, res.dataItem.id, schedulerTag.value) return Resolved() }) .bichain(({ res }) => { @@ -129,7 +129,7 @@ export function sendDataItemWith ({ schedLocation and it will get sent directly to Arweave */ - return locateProcessLocal(ctx.dataItem.target, schedLocationCache.get(ctx.dataItem.target)) + return locateProcessLocal(ctx.dataItem.target, getSchedulerOwner(db, ctx.dataItem.target)) .chain((schedLocation) => sendMessage({ ...ctx, schedLocation })) } return sendProcess(ctx) diff --git a/servers/mu/src/domain/clients/sqlite.js b/servers/mu/src/domain/clients/sqlite.js index c4e529722..e7442a494 100644 --- a/servers/mu/src/domain/clients/sqlite.js +++ b/servers/mu/src/domain/clients/sqlite.js @@ -15,6 +15,14 @@ const createTasks = async (db) => db.prepare( ) WITHOUT ROWID;` ).run() +const createSchedulerOwner = async (db) => db.exec( + `CREATE TABLE IF NOT EXISTS scheduler_owner ( + process_id TEXT UNIQUE NOT NULL, + owner TEXT NOT NULL + ); + ` +) + let internalSqliteDb export async function createSqliteClient ({ url, bootstrap = false, walLimit = bytes.parse('100mb') }) { if (internalSqliteDb) return internalSqliteDb @@ -32,13 +40,16 @@ export async function createSqliteClient ({ url, bootstrap = false, walLimit = b if (stat && stat.size > walLimit) db.pragma('wal_checkpoint(RESTART)') }), 5000).unref() - await Promise.resolve() - .then(() => createTasks(db)) + await createTasks(db) + await createSchedulerOwner(db) + /* await Promise.resolve() + .then(() => createTasks(db)) */ } return { - query: async ({ sql, parameters }) => db.prepare(sql).all(...parameters), - run: async ({ sql, parameters }) => db.prepare(sql).run(...parameters), + query: ({ sql, parameters }) => db.prepare(sql).all(...parameters), + run: ({ sql, parameters }) => db.prepare(sql).run(...parameters), + get: ({ sql, parameters }) => db.prepare(sql).get(...parameters), transaction: async (statements) => db.transaction( (statements) => statements.map(({ sql, parameters }) => db.prepare(sql).run(...parameters)) )(statements), diff --git a/servers/mu/src/domain/clients/worker.js b/servers/mu/src/domain/clients/worker.js index 9112059a3..1ec8f0c5c 100644 --- a/servers/mu/src/domain/clients/worker.js +++ b/servers/mu/src/domain/clients/worker.js @@ -199,7 +199,8 @@ const processResult = processResultWith({ enqueueResults, processMsg, processSpawn, - processAssign + processAssign, + db }) const processResults = processResultsWith({ diff --git a/servers/mu/src/domain/index.js b/servers/mu/src/domain/index.js index 3bd7f1dbf..a10923e53 100644 --- a/servers/mu/src/domain/index.js +++ b/servers/mu/src/domain/index.js @@ -155,7 +155,8 @@ export const createApis = async (ctx) => { crank, isWallet: gatewayClient.isWalletWith({ fetch, histogram, GRAPHQL_URL: ctx.GRAPHQL_URL, logger: sendDataItemLogger }), logger: sendDataItemLogger, - writeDataItemArweave: uploaderClient.uploadDataItemWith({ UPLOADER_URL, logger: sendDataItemLogger, fetch, histogram }) + writeDataItemArweave: uploaderClient.uploadDataItemWith({ UPLOADER_URL, logger: sendDataItemLogger, fetch, histogram }), + db }) const sendAssignLogger = logger.child('sendAssign') @@ -244,6 +245,9 @@ export const createResultApis = async (ctx) => { const getById = InMemoryClient.getByIdWith({ cache: isWalletCache }) const setById = InMemoryClient.setByIdWith({ cache: isWalletCache }) + const DB_URL = `${ctx.DB_URL}.sqlite` + const db = await SqliteClient.createSqliteClient({ url: DB_URL, bootstrap: false }) + const processMsgLogger = logger.child('processMsg') const processMsg = processMsgWith({ selectNode: cuClient.selectNodeWith({ CU_URL, logger: processMsgLogger }), @@ -256,7 +260,8 @@ export const createResultApis = async (ctx) => { fetchResult: cuClient.resultWith({ fetch: fetchWithCache, histogram, CU_URL, logger: processMsgLogger }), logger, isWallet: gatewayClient.isWalletWith({ fetch, histogram, GRAPHQL_URL: ctx.GRAPHQL_URL, logger: processMsgLogger, setById, getById }), - writeDataItemArweave: uploaderClient.uploadDataItemWith({ UPLOADER_URL, logger: processMsgLogger, fetch, histogram }) + writeDataItemArweave: uploaderClient.uploadDataItemWith({ UPLOADER_URL, logger: processMsgLogger, fetch, histogram }), + db }) const processSpawnLogger = logger.child('processSpawn') diff --git a/servers/mu/src/domain/lib/build-tx.js b/servers/mu/src/domain/lib/build-tx.js index d32e17e23..b7f8da08e 100644 --- a/servers/mu/src/domain/lib/build-tx.js +++ b/servers/mu/src/domain/lib/build-tx.js @@ -1,6 +1,7 @@ import { Resolved, fromPromise, of } from 'hyper-async' import z from 'zod' import { checkStage } from '../utils.js' +import { getSchedulerOwner } from './su-cache.js' const ctxSchema = z.object({ tx: z.object({ @@ -13,7 +14,7 @@ const ctxSchema = z.object({ }).passthrough() export function buildTxWith (env) { - let { buildAndSign, logger, locateProcess, fetchSchedulerProcess, isWallet } = env + let { buildAndSign, logger, locateProcess, fetchSchedulerProcess, isWallet, db } = env locateProcess = fromPromise(locateProcess) fetchSchedulerProcess = fromPromise(fetchSchedulerProcess) buildAndSign = fromPromise(buildAndSign) @@ -24,7 +25,7 @@ export function buildTxWith (env) { return isWallet(ctx.cachedMsg.processId) .chain( (isWalletId) => { - return locateProcess(ctx.cachedMsg.fromProcessId) + return locateProcess(ctx.cachedMsg.fromProcessId, getSchedulerOwner(db, ctx.cachedMsg.fromProcessId)) .chain( (fromSchedLocation) => fetchSchedulerProcess( ctx.cachedMsg.fromProcessId, @@ -41,7 +42,7 @@ export function buildTxWith (env) { goes straight to Arweave. */ if (isWalletId) { return of({ fromProcessSchedData }) } - return locateProcess(ctx.cachedMsg.processId) + return locateProcess(ctx.cachedMsg.processId, getSchedulerOwner(db, ctx.cachedMsg.processId)) .map((schedLocation) => { return { schedLocation, diff --git a/servers/mu/src/domain/lib/su-cache.js b/servers/mu/src/domain/lib/su-cache.js new file mode 100644 index 000000000..9ab8bb55e --- /dev/null +++ b/servers/mu/src/domain/lib/su-cache.js @@ -0,0 +1,27 @@ +const setSchedulerOwnerQuery = (processId, owner) => ({ + sql: ` + INSERT INTO scheduler_owner(process_id, owner) VALUES(?, ?) + ON CONFLICT (process_id) DO NOTHING; + `, + parameters: [processId, owner] +}) + +export function setSchedulerOwner (db, processId, owner) { + const result = db.run(setSchedulerOwnerQuery(processId, owner)) + console.log(result) +} + +const getSchedulerOwnerQuery = (processId) => ({ + sql: ` + SELECT owner + FROM scheduler_owner + WHERE process_id = ?; + `, + parameters: [processId] +}) + +export function getSchedulerOwner (db, processId) { + const result = db.get(getSchedulerOwnerQuery(processId)) + console.log(result) + return result?.owner +}