Skip to content

Commit

Permalink
fix: scheduler location cache
Browse files Browse the repository at this point in the history
  • Loading branch information
ppedziwiatr committed Jul 24, 2024
1 parent d50662c commit bf3f1ee
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 17 deletions.
5 changes: 3 additions & 2 deletions servers/mu/src/domain/api/processMsg.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ export function processMsgWith ({
logger,
writeDataItemArweave,
isWallet,
fetchSchedulerProcess
fetchSchedulerProcess,
db
}) {
const buildTx = buildTxWith({ buildAndSign, logger, locateProcess, fetchSchedulerProcess, isWallet })
const buildTx = buildTxWith({ buildAndSign, logger, locateProcess, fetchSchedulerProcess, isWallet, db })
const getCuAddress = getCuAddressWith({ selectNode, logger })
const writeMessage = writeMessageTxWith({ writeDataItem, logger, writeDataItemArweave })
const pullResult = pullResultWith({ fetchResult, logger })
Expand Down
10 changes: 5 additions & 5 deletions servers/mu/src/domain/api/sendDataItem.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { pullResultWith } from '../lib/pull-result.js'
import { parseDataItemWith } from '../lib/parse-data-item.js'
import { verifyParsedDataItemWith } from '../lib/verify-parsed-data-item.js'
import { writeProcessTxWith } from '../lib/write-process-tx.js'
import { getSchedulerOwner, setSchedulerOwner } from '../lib/su-cache.js'

/**
* Forward along the DataItem to the SU,
Expand All @@ -23,7 +24,8 @@ export function sendDataItemWith ({
crank,
logger,
fetchSchedulerProcess,
writeDataItemArweave
writeDataItemArweave,
db
}) {
const verifyParsedDataItem = verifyParsedDataItemWith()
const parseDataItem = parseDataItemWith({ createDataItem, logger })
Expand All @@ -34,8 +36,6 @@ export function sendDataItemWith ({

const locateProcessLocal = fromPromise(locateProcess)

const schedLocationCache = new Map()

/**
* If the data item is a Message, then cranking and tracing
* must also be performed.
Expand Down Expand Up @@ -99,7 +99,7 @@ export function sendDataItemWith ({
return Rejected({ res })
}
const schedulerTag = res.dataItem.tags.find((tag) => tag.name === 'Scheduler')
schedLocationCache.set(res.dataItem.id, schedulerTag.value)
setSchedulerOwner(db, res.dataItem.id, schedulerTag.value)
return Resolved()
})
.bichain(({ res }) => {
Expand Down Expand Up @@ -129,7 +129,7 @@ export function sendDataItemWith ({
schedLocation and it will get sent directly to
Arweave
*/
return locateProcessLocal(ctx.dataItem.target, schedLocationCache.get(ctx.dataItem.target))
return locateProcessLocal(ctx.dataItem.target, getSchedulerOwner(db, ctx.dataItem.target))
.chain((schedLocation) => sendMessage({ ...ctx, schedLocation }))
}
return sendProcess(ctx)
Expand Down
19 changes: 15 additions & 4 deletions servers/mu/src/domain/clients/sqlite.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ const createTasks = async (db) => db.prepare(
) WITHOUT ROWID;`
).run()

const createSchedulerOwner = async (db) => db.exec(
`CREATE TABLE IF NOT EXISTS scheduler_owner (
process_id TEXT UNIQUE NOT NULL,
owner TEXT NOT NULL
);
`
)

let internalSqliteDb
export async function createSqliteClient ({ url, bootstrap = false, walLimit = bytes.parse('100mb') }) {
if (internalSqliteDb) return internalSqliteDb
Expand All @@ -32,13 +40,16 @@ export async function createSqliteClient ({ url, bootstrap = false, walLimit = b
if (stat && stat.size > walLimit) db.pragma('wal_checkpoint(RESTART)')
}), 5000).unref()

await Promise.resolve()
.then(() => createTasks(db))
await createTasks(db)
await createSchedulerOwner(db)
/* await Promise.resolve()
.then(() => createTasks(db)) */
}

return {
query: async ({ sql, parameters }) => db.prepare(sql).all(...parameters),
run: async ({ sql, parameters }) => db.prepare(sql).run(...parameters),
query: ({ sql, parameters }) => db.prepare(sql).all(...parameters),
run: ({ sql, parameters }) => db.prepare(sql).run(...parameters),
get: ({ sql, parameters }) => db.prepare(sql).get(...parameters),
transaction: async (statements) => db.transaction(
(statements) => statements.map(({ sql, parameters }) => db.prepare(sql).run(...parameters))
)(statements),
Expand Down
3 changes: 2 additions & 1 deletion servers/mu/src/domain/clients/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ const processResult = processResultWith({
enqueueResults,
processMsg,
processSpawn,
processAssign
processAssign,
db
})

const processResults = processResultsWith({
Expand Down
9 changes: 7 additions & 2 deletions servers/mu/src/domain/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ export const createApis = async (ctx) => {
crank,
isWallet: gatewayClient.isWalletWith({ fetch, histogram, GRAPHQL_URL: ctx.GRAPHQL_URL, logger: sendDataItemLogger }),
logger: sendDataItemLogger,
writeDataItemArweave: uploaderClient.uploadDataItemWith({ UPLOADER_URL, logger: sendDataItemLogger, fetch, histogram })
writeDataItemArweave: uploaderClient.uploadDataItemWith({ UPLOADER_URL, logger: sendDataItemLogger, fetch, histogram }),
db
})

const sendAssignLogger = logger.child('sendAssign')
Expand Down Expand Up @@ -244,6 +245,9 @@ export const createResultApis = async (ctx) => {
const getById = InMemoryClient.getByIdWith({ cache: isWalletCache })
const setById = InMemoryClient.setByIdWith({ cache: isWalletCache })

const DB_URL = `${ctx.DB_URL}.sqlite`
const db = await SqliteClient.createSqliteClient({ url: DB_URL, bootstrap: false })

const processMsgLogger = logger.child('processMsg')
const processMsg = processMsgWith({
selectNode: cuClient.selectNodeWith({ CU_URL, logger: processMsgLogger }),
Expand All @@ -256,7 +260,8 @@ export const createResultApis = async (ctx) => {
fetchResult: cuClient.resultWith({ fetch: fetchWithCache, histogram, CU_URL, logger: processMsgLogger }),
logger,
isWallet: gatewayClient.isWalletWith({ fetch, histogram, GRAPHQL_URL: ctx.GRAPHQL_URL, logger: processMsgLogger, setById, getById }),
writeDataItemArweave: uploaderClient.uploadDataItemWith({ UPLOADER_URL, logger: processMsgLogger, fetch, histogram })
writeDataItemArweave: uploaderClient.uploadDataItemWith({ UPLOADER_URL, logger: processMsgLogger, fetch, histogram }),
db
})

const processSpawnLogger = logger.child('processSpawn')
Expand Down
7 changes: 4 additions & 3 deletions servers/mu/src/domain/lib/build-tx.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Resolved, fromPromise, of } from 'hyper-async'
import z from 'zod'
import { checkStage } from '../utils.js'
import { getSchedulerOwner } from './su-cache.js'

const ctxSchema = z.object({
tx: z.object({
Expand All @@ -13,7 +14,7 @@ const ctxSchema = z.object({
}).passthrough()

export function buildTxWith (env) {
let { buildAndSign, logger, locateProcess, fetchSchedulerProcess, isWallet } = env
let { buildAndSign, logger, locateProcess, fetchSchedulerProcess, isWallet, db } = env
locateProcess = fromPromise(locateProcess)
fetchSchedulerProcess = fromPromise(fetchSchedulerProcess)
buildAndSign = fromPromise(buildAndSign)
Expand All @@ -24,7 +25,7 @@ export function buildTxWith (env) {
return isWallet(ctx.cachedMsg.processId)
.chain(
(isWalletId) => {
return locateProcess(ctx.cachedMsg.fromProcessId)
return locateProcess(ctx.cachedMsg.fromProcessId, getSchedulerOwner(db, ctx.cachedMsg.fromProcessId))
.chain(
(fromSchedLocation) => fetchSchedulerProcess(
ctx.cachedMsg.fromProcessId,
Expand All @@ -41,7 +42,7 @@ export function buildTxWith (env) {
goes straight to Arweave.
*/
if (isWalletId) { return of({ fromProcessSchedData }) }
return locateProcess(ctx.cachedMsg.processId)
return locateProcess(ctx.cachedMsg.processId, getSchedulerOwner(db, ctx.cachedMsg.processId))
.map((schedLocation) => {
return {
schedLocation,
Expand Down
27 changes: 27 additions & 0 deletions servers/mu/src/domain/lib/su-cache.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
const setSchedulerOwnerQuery = (processId, owner) => ({
sql: `
INSERT INTO scheduler_owner(process_id, owner) VALUES(?, ?)
ON CONFLICT (process_id) DO NOTHING;
`,
parameters: [processId, owner]
})

export function setSchedulerOwner (db, processId, owner) {
const result = db.run(setSchedulerOwnerQuery(processId, owner))
console.log(result)
}

const getSchedulerOwnerQuery = (processId) => ({
sql: `
SELECT owner
FROM scheduler_owner
WHERE process_id = ?;
`,
parameters: [processId]
})

export function getSchedulerOwner (db, processId) {
const result = db.get(getSchedulerOwnerQuery(processId))
console.log(result)
return result?.owner
}

0 comments on commit bf3f1ee

Please sign in to comment.