From b0dd6e914447aaad558bc778ca1ba9b54fc24c4c Mon Sep 17 00:00:00 2001 From: InversionSpaces Date: Fri, 13 Oct 2023 11:26:21 +0000 Subject: [PATCH 1/8] Add Logger ability --- gateway/aqua/logger.aqua | 38 +++++++ gateway/aqua/rpc.aqua | 37 ++++--- gateway/src/index.js | 233 ++++++++++++++++++++++----------------- 3 files changed, 188 insertions(+), 120 deletions(-) create mode 100644 gateway/aqua/logger.aqua diff --git a/gateway/aqua/logger.aqua b/gateway/aqua/logger.aqua new file mode 100644 index 0000000..af03a5c --- /dev/null +++ b/gateway/aqua/logger.aqua @@ -0,0 +1,38 @@ +module Logger declares Logger, initPeerLogger + +export LoggerSrv + +import Worker from "@fluencelabs/aqua-lib/subnet.aqua" + +ability Logger: + log(s: []string) + logNum(n: u32) + logCall(s: string) + logWorker(w: Worker) + +service LoggerSrv("logger"): + log(s: []string) + logNum(n: u32) + logCall(s: string) + logWorker(w: Worker) + +-- Create Logger ability that logs +-- on INIT_PEER_ID via HOST_PEER_ID +-- through LoggerSrv +func initPeerLogger() -> Logger: + -- closures do not capture topology here + + log = func (s: []string): + on INIT_PEER_ID via HOST_PEER_ID: + LoggerSrv.log(s) + logNum = func (n: u32): + on INIT_PEER_ID via HOST_PEER_ID: + LoggerSrv.logNum(n) + logCall = func (s: string): + on INIT_PEER_ID via HOST_PEER_ID: + LoggerSrv.logCall(s) + logWorker = func (w: Worker): + on INIT_PEER_ID via HOST_PEER_ID: + LoggerSrv.logWorker(w) + + <- Logger(log=log, logNum=logNum, logCall=logCall, logWorker=logWorker) \ No newline at end of file diff --git a/gateway/aqua/rpc.aqua b/gateway/aqua/rpc.aqua index 2754d32..dec7c0b 100644 --- a/gateway/aqua/rpc.aqua +++ b/gateway/aqua/rpc.aqua @@ -1,21 +1,20 @@ +aqua RPC + import "@fluencelabs/aqua-lib/builtin.aqua" import Subnet, Worker from "@fluencelabs/aqua-lib/subnet.aqua" + import "services.aqua" use "deals.aqua" -export randomLoadBalancingEth, roundRobinEth, quorumEth, Counter, Logger +import Logger, initPeerLogger from "logger.aqua" + +export randomLoadBalancingEth, roundRobinEth, quorumEth, Counter, QuorumChecker data QuorumResult: value: string results: []JsonString error: string -service Logger("logger"): - log(s: []string) - logNum(n: u32) - logCall(s: string) - logWorker(w: Worker) - service NumOp("op"): identity(n: u64) -> i64 @@ -46,7 +45,7 @@ func callOnWorker(worker: Worker, uri: string, method: string, jsonArgs: []strin result <- callFunc(uri, method, jsonArgs) <- result -func randomLoadBalancing(uris: []string, method: string, jsonArgs: []string, callFunc: string, string, []string -> JsonString) -> JsonString: +func randomLoadBalancing{Logger}(uris: []string, method: string, jsonArgs: []string, callFunc: string, string, []string -> JsonString) -> JsonString: on HOST_PEER_ID: workers <- getWorkers() workersNum = workers.length @@ -58,15 +57,18 @@ func randomLoadBalancing(uris: []string, method: string, jsonArgs: []string, cal timeP <- NumOp.identity(Peer.timestamp_sec()) providerNumber = timeP % uris.length provider = uris[providerNumber] + Logger.logWorker(worker) + Logger.logCall(uris[providerNumber]) + Op.noop() -- dirty hack result <- callOnWorker(worker, provider, method, jsonArgs, callFunc) - Logger.logWorker(worker) - Logger.logCall(uris[providerNumber]) + Op.noop() -- dirty hack <- result func randomLoadBalancingEth(uris: []string, method: string, jsonArgs: []string) -> JsonString: - <- randomLoadBalancing(uris, method, jsonArgs, call) + log <- initPeerLogger() + <- randomLoadBalancing{log}(uris, method, jsonArgs, call) -func roundRobin(uris: []string, method: string, jsonArgs: []string, counterServiceId: string, counterPeerId: string, callFunc: string, string, []string -> JsonString) -> JsonString: +func roundRobin{Logger}(uris: []string, method: string, jsonArgs: []string, counterServiceId: string, counterPeerId: string, callFunc: string, string, []string -> JsonString) -> JsonString: on counterPeerId: Counter counterServiceId requestNumber <- Counter.incrementAndReturn() @@ -78,18 +80,19 @@ func roundRobin(uris: []string, method: string, jsonArgs: []string, counterServi worker = workers[workerNumber] providerNumber = requestNumber % uris.length provider = uris[providerNumber] + Logger.logWorker(worker) + Logger.logCall(uris[providerNumber]) + Op.noop() -- dirty hack result <<- callOnWorker(worker, provider, method, jsonArgs, callFunc) - Op.noop() -- dirty hack - on INIT_PEER_ID: - Logger.logWorker(worker) - Logger.logCall(uris[providerNumber]) + Op.noop() -- dirty hack else: result <<- JsonString(value = "", success = false, error = error!) <- result! func roundRobinEth(uris: []string, method: string, jsonArgs: []string, counterServiceId: string, counterPeerId: string) -> JsonString: - <- roundRobin(uris, method, jsonArgs, counterServiceId, counterPeerId, call) + log <- initPeerLogger() + <- roundRobin{log}(uris, method, jsonArgs, counterServiceId, counterPeerId, call) func quorum( uris: []string, quorumNumber: u32, timeout: u32, method: string, jsonArgs: []string, quorumServiceId: string, quorumPeerId: string, diff --git a/gateway/src/index.js b/gateway/src/index.js index f3a0917..2cf9889 100644 --- a/gateway/src/index.js +++ b/gateway/src/index.js @@ -5,15 +5,15 @@ import express from "express"; import bodyParser from "body-parser"; import { JSONRPCServer } from "json-rpc-2.0"; -import { Fluence } from '@fluencelabs/js-client'; +import { Fluence } from "@fluencelabs/js-client"; import { - quorumEth, - randomLoadBalancingEth, - registerCounter, - registerLogger, - registerQuorumChecker, - roundRobinEth + quorumEth, + randomLoadBalancingEth, + registerCounter, + registerQuorumChecker, + roundRobinEth, } from "../aqua-compiled/rpc.js"; +import { registerLoggerSrv } from "../aqua-compiled/logger.js"; import { readArguments } from "./arguments.js"; import { readConfig } from "./config.js"; import { methods } from "./methods.js"; @@ -21,17 +21,17 @@ import { methods } from "./methods.js"; const args = readArguments(process.argv.slice(2)); if (args.errors.length > 0) { - console.log(args.help); - args.errors.forEach((err) => console.log(err)); - process.exit(1); + console.log(args.help); + args.errors.forEach((err) => console.log(err)); + process.exit(1); } const { config, errors, help } = readConfig(args.configPath); if (errors.length > 0) { - errors.forEach((err) => console.log(err)); - console.log(help); - process.exit(1); + errors.forEach((err) => console.log(err)); + console.log(help); + process.exit(1); } console.log("Running server..."); @@ -42,127 +42,154 @@ const server = new JSONRPCServer(); // initialize fluence client await Fluence.connect(config.relay); -const peerId = (await Fluence.getClient()).getPeerId() +const peerId = (await Fluence.getClient()).getPeerId(); // handler for logger -registerLogger({ - log: s => { - console.log("log: " + s); - }, - logCall: s => { - console.log("Call will be to : " + s); - }, - logWorker: s => { - console.log("Worker used: " + JSON.stringify(s)); - }, - logNum: s => { - console.log("Number: " + s); - }, -}) +registerLoggerSrv({ + log: (s) => { + console.log("log: " + s); + }, + logCall: (s) => { + console.log("Call will be to : " + s); + }, + logWorker: (s) => { + console.log("Worker used: " + JSON.stringify(s)); + }, + logNum: (s) => { + console.log("Number: " + s); + }, +}); let counter = 0; registerCounter("counter", { - incrementAndReturn: () => { - counter++; - console.log("Counter: " + counter) - return counter; - } -}) + incrementAndReturn: () => { + counter++; + console.log("Counter: " + counter); + return counter; + }, +}); function findSameResults(results, minNum) { - const resultCounts = results.filter((obj) => obj.success).map((obj) => obj.value).reduce(function (i, v) { - if (i[v] === undefined) { - i[v] = 1 - } else { - i[v] = i[v] + 1; - } - return i; + const resultCounts = results + .filter((obj) => obj.success) + .map((obj) => obj.value) + .reduce(function (i, v) { + if (i[v] === undefined) { + i[v] = 1; + } else { + i[v] = i[v] + 1; + } + return i; }, {}); - const getMaxRepeated = Math.max(...Object.values(resultCounts)); - if (getMaxRepeated >= minNum) { - console.log(resultCounts) - const max = Object.entries(resultCounts).find((kv) => kv[1] === getMaxRepeated) - return { - value: max[0], - results: [], - error: "" - } - } else { - return { - error: "No consensus in results", - results: results, - value: "" - } - } + const getMaxRepeated = Math.max(...Object.values(resultCounts)); + if (getMaxRepeated >= minNum) { + console.log(resultCounts); + const max = Object.entries(resultCounts).find( + (kv) => kv[1] === getMaxRepeated + ); + return { + value: max[0], + results: [], + error: "", + }; + } else { + return { + error: "No consensus in results", + results: results, + value: "", + }; + } } -registerQuorumChecker("quorum", - { - check: (ethResults, minQuorum) => { - console.log("Check quorum for:") - console.log(ethResults) - return findSameResults(ethResults, minQuorum) - } - } -) +registerQuorumChecker("quorum", { + check: (ethResults, minQuorum) => { + console.log("Check quorum for:"); + console.log(ethResults); + return findSameResults(ethResults, minQuorum); + }, +}); -const counterServiceId = config.counterServiceId || 'counter' -const counterPeerId = config.counterPeerId || peerId -const quorumServiceId = config.quorumServiceId || 'quorum' -const quorumPeerId = config.quorumPeerId || peerId -const quorumNumber = config.quorumNumber || 2 +const counterServiceId = config.counterServiceId || "counter"; +const counterPeerId = config.counterPeerId || peerId; +const quorumServiceId = config.quorumServiceId || "quorum"; +const quorumPeerId = config.quorumPeerId || peerId; +const quorumNumber = config.quorumNumber || 2; async function methodHandler(reqRaw, method) { - const req = reqRaw.map((s) => JSON.stringify(s)) - console.log(`Receiving request '${method}'`); - let result; - if (!config.mode || config.mode === "random") { - result = await randomLoadBalancingEth(config.providers, method, req); - } else if (config.mode === "round-robin") { - result = await roundRobinEth(config.providers, method, req, counterServiceId, counterPeerId, config.serviceId); - } else if (config.mode === "quorum") { - const quorumResult = await quorumEth(config.providers, quorumNumber, 10000, method, req, quorumServiceId, quorumPeerId, { ttl: 20000 }); - - if (quorumResult.error) { - console.error(`quorum failed: ${quorumResult.error}\n${JSON.stringify(quorumResult.results)}`); - result = { success: false, error: quorumResult.error }; - } else { - result = { success: true, error: quorumResult.error, value: quorumResult.value }; - } + const req = reqRaw.map((s) => JSON.stringify(s)); + console.log(`Receiving request '${method}'`); + let result; + if (!config.mode || config.mode === "random") { + result = await randomLoadBalancingEth(config.providers, method, req); + } else if (config.mode === "round-robin") { + result = await roundRobinEth( + config.providers, + method, + req, + counterServiceId, + counterPeerId, + config.serviceId + ); + } else if (config.mode === "quorum") { + const quorumResult = await quorumEth( + config.providers, + quorumNumber, + 10000, + method, + req, + quorumServiceId, + quorumPeerId, + { ttl: 20000 } + ); + + if (quorumResult.error) { + console.error( + `quorum failed: ${quorumResult.error}\n${JSON.stringify( + quorumResult.results + )}` + ); + result = { success: false, error: quorumResult.error }; + } else { + result = { + success: true, + error: quorumResult.error, + value: quorumResult.value, + }; } + } - if (!result.success) { - throw new Error(result.error); - } + if (!result.success) { + throw new Error(result.error); + } - return JSON.parse(result.value || '{}'); + return JSON.parse(result.value || "{}"); } function addMethod(op) { - server.addMethod(op, async (req) => methodHandler(req, op)); + server.addMethod(op, async (req) => methodHandler(req, op)); } // register all eth methods methods.forEach((m) => { - addMethod(m); -}) + addMethod(m); +}); const app = express(); app.use(bodyParser.json()); // register JSON-RPC handler app.post(route, (req, res) => { - const jsonRPCRequest = req.body; - server.receive(jsonRPCRequest).then((jsonRPCResponse) => { - if (jsonRPCResponse) { - res.json(jsonRPCResponse); - } else { - res.sendStatus(204); - } - }); + const jsonRPCRequest = req.body; + server.receive(jsonRPCRequest).then((jsonRPCResponse) => { + if (jsonRPCResponse) { + res.json(jsonRPCResponse); + } else { + res.sendStatus(204); + } + }); }); app.listen(config.port); -console.log("Server was started on port " + config.port); \ No newline at end of file +console.log("Server was started on port " + config.port); From 5937d8dc2f439a41cc9815d0f13916ec1db15cc2 Mon Sep 17 00:00:00 2001 From: InversionSpaces Date: Mon, 16 Oct 2023 09:09:26 +0000 Subject: [PATCH 2/8] Rewrote random and round-robin --- gateway/aqua/balancer.aqua | 53 +++++++++++++++++ gateway/aqua/counter.aqua | 22 ++++++++ gateway/aqua/eth_rpc.aqua | 18 ++++++ gateway/aqua/provider.aqua | 3 + gateway/aqua/random.aqua | 16 ++++++ gateway/aqua/rpc.aqua | 113 +++++++++++++++---------------------- gateway/aqua/utils.aqua | 4 ++ gateway/src/index.js | 4 +- 8 files changed, 165 insertions(+), 68 deletions(-) create mode 100644 gateway/aqua/balancer.aqua create mode 100644 gateway/aqua/counter.aqua create mode 100644 gateway/aqua/eth_rpc.aqua create mode 100644 gateway/aqua/provider.aqua create mode 100644 gateway/aqua/random.aqua create mode 100644 gateway/aqua/utils.aqua diff --git a/gateway/aqua/balancer.aqua b/gateway/aqua/balancer.aqua new file mode 100644 index 0000000..9e53b5a --- /dev/null +++ b/gateway/aqua/balancer.aqua @@ -0,0 +1,53 @@ +module Balancer declares Balancer, randomBalancer, cycleBalancer + +import Worker from "@fluencelabs/aqua-lib/subnet.aqua" + +import Counter from "counter.aqua" +import Random from "random.aqua" +import Provider from "provider.aqua" + +ability RequestHandler: + request() + +ability WorkersBalancer: + nextWorker() -> Worker + +ability ProviderBalancer: + nextProvider() -> Provider + +ability Balancer: + nextWorker() -> Worker + nextProvider() -> Provider + next() -> Worker, Provider + +func randomBalancer{Random}(workers: []Worker, providers: []Provider) -> Balancer: + nextWorker = func () -> Worker: + rand <- Random.next() + idx = rand % workers.length + <- workers[idx] + + nextProvider = func () -> Provider: + rand <- Random.next() + idx = rand % providers.length + <- providers[idx] + + next = func () -> Worker, Provider: + <- nextWorker(), nextProvider() + + <- Balancer(next=next, nextWorker=nextWorker, nextProvider=nextProvider) + +func cycleBalancer{Counter}(workers: []Worker, providers: []Provider) -> Balancer: + next = func () -> Worker, Provider: + n <- Counter.incrementAndReturn() + idx = n % workers.length + <- workers[idx], providers[idx] + + nextWorker = func () -> Worker: + w, p <- next() + <- w + + nextProvider = func () -> Provider: + w, p <- next() + <- p + + <- Balancer(next=next, nextWorker=nextWorker, nextProvider=nextProvider) \ No newline at end of file diff --git a/gateway/aqua/counter.aqua b/gateway/aqua/counter.aqua new file mode 100644 index 0000000..5f521bd --- /dev/null +++ b/gateway/aqua/counter.aqua @@ -0,0 +1,22 @@ +module Counter declares Counter, onPeerCounter + +export CounterSrv + +ability Counter: + incrementAndReturn() -> u32 + +service CounterSrv("counter"): + incrementAndReturn() -> u32 + +-- Create Counter ability that +-- counts on peer through CounterSrv(id) +func onPeerCounter(peer: string, id: string) -> Counter: + -- closure does not capture topology here + incAndReturn = func () -> u32: + on peer: + CounterSrv id + res <- CounterSrv.incrementAndReturn() + <- res + + <- Counter(incrementAndReturn = incAndReturn) + diff --git a/gateway/aqua/eth_rpc.aqua b/gateway/aqua/eth_rpc.aqua new file mode 100644 index 0000000..5489154 --- /dev/null +++ b/gateway/aqua/eth_rpc.aqua @@ -0,0 +1,18 @@ +module RPCEth declares RPCEth, fromWorkerProvider + +import Worker from "@fluencelabs/aqua-lib/subnet.aqua" + +import "services.aqua" + +import Provider from "provider.aqua" + +ability RPCEth: + call(method: string, jsonArgs: []string) -> JsonString + +func fromWorkerProvider(worker: Worker, provider: Provider) -> RPCEth: + call = func (method: string, jsonArgs: []string) -> JsonString: + on worker.worker_id! via worker.host_id: + res <- EthRpc.eth_call(provider, method, jsonArgs) + <- res + + <- RPCEth(call = call) diff --git a/gateway/aqua/provider.aqua b/gateway/aqua/provider.aqua new file mode 100644 index 0000000..8fc08a4 --- /dev/null +++ b/gateway/aqua/provider.aqua @@ -0,0 +1,3 @@ +module Provider declares Provider + +alias Provider: string \ No newline at end of file diff --git a/gateway/aqua/random.aqua b/gateway/aqua/random.aqua new file mode 100644 index 0000000..97a502b --- /dev/null +++ b/gateway/aqua/random.aqua @@ -0,0 +1,16 @@ +module Random declares Random, timeRandom + +import Peer from "@fluencelabs/aqua-lib/builtin.aqua" + +import NumOp from "utils.aqua" + +ability Random: + next() -> i64 + +func timeRandom() -> Random: + next = func () -> i64: + t <- Peer.timestamp_sec() + n <- NumOp.identity(t) + <- n + + <- Random(next = next) \ No newline at end of file diff --git a/gateway/aqua/rpc.aqua b/gateway/aqua/rpc.aqua index dec7c0b..332f063 100644 --- a/gateway/aqua/rpc.aqua +++ b/gateway/aqua/rpc.aqua @@ -7,96 +7,77 @@ import "services.aqua" use "deals.aqua" import Logger, initPeerLogger from "logger.aqua" +import Balancer, randomBalancer, cycleBalancer from "balancer.aqua" +import onPeerCounter from "counter.aqua" +import timeRandom from "random.aqua" +import RPCEth, fromWorkerProvider from "eth_rpc.aqua" -export randomLoadBalancingEth, roundRobinEth, quorumEth, Counter, QuorumChecker +import NumOp from "utils.aqua" + +export randomLoadBalancingEth, roundRobinEth, quorumEth, QuorumChecker data QuorumResult: value: string results: []JsonString error: string -service NumOp("op"): - identity(n: u64) -> i64 - -service Counter("counter"): - incrementAndReturn() -> u32 - service QuorumChecker("quorum"): check(results: []JsonString, minResults: u32) -> QuorumResult -func empty() -> JsonString: - <- JsonString(value = "", success = true, error = "") - -func call(uri: string, method: string, jsonArgs: []string) -> JsonString: - res <- EthRpc.eth_call(uri, method, jsonArgs) - <- res +func errorJsonString(msg: string) -> JsonString: + <- JsonString(value = "", success = false, error = msg) func getWorkers() -> []Worker, ?string: - deals <- Deals.get() + on INIT_PEER_ID via HOST_PEER_ID: + deals <- Deals.get() dealId = deals.defaultWorker!.dealIdOriginal on HOST_PEER_ID: result <- Subnet.resolve(dealId) - workers = result.workers - error = result.error - <- workers, error + <- result.workers, result.error -func callOnWorker(worker: Worker, uri: string, method: string, jsonArgs: []string, callFunc: string, string, []string -> JsonString) -> JsonString: - on worker.worker_id! via worker.host_id: - result <- callFunc(uri, method, jsonArgs) - <- result +func rpcCall{RPCEth}(method: string, jsonArgs: []string) -> JsonString: + <- RPCEth.call(method, jsonArgs) -func randomLoadBalancing{Logger}(uris: []string, method: string, jsonArgs: []string, callFunc: string, string, []string -> JsonString) -> JsonString: +func balancedEthCall{Logger, Balancer}(method: string, jsonArgs: []string) -> JsonString: on HOST_PEER_ID: - workers <- getWorkers() - workersNum = workers.length - -- choose worker randomly - timeW <- NumOp.identity(Peer.timestamp_sec()) - workerNumber = timeW % workers.length - worker = workers[workerNumber] - -- choose provider randomly - timeP <- NumOp.identity(Peer.timestamp_sec()) - providerNumber = timeP % uris.length - provider = uris[providerNumber] + worker, provider <- Balancer.next() Logger.logWorker(worker) - Logger.logCall(uris[providerNumber]) - Op.noop() -- dirty hack - result <- callOnWorker(worker, provider, method, jsonArgs, callFunc) - Op.noop() -- dirty hack + Logger.logCall(provider) + Op.noop() -- dirty hack + rpc <- fromWorkerProvider(worker, provider) + result <- rpcCall{rpc}(method, jsonArgs) <- result func randomLoadBalancingEth(uris: []string, method: string, jsonArgs: []string) -> JsonString: - log <- initPeerLogger() - <- randomLoadBalancing{log}(uris, method, jsonArgs, call) - -func roundRobin{Logger}(uris: []string, method: string, jsonArgs: []string, counterServiceId: string, counterPeerId: string, callFunc: string, string, []string -> JsonString) -> JsonString: - on counterPeerId: - Counter counterServiceId - requestNumber <- Counter.incrementAndReturn() - on HOST_PEER_ID: - workers, error <- getWorkers() - result: *JsonString - if error == nil: - workerNumber = requestNumber % workers.length - worker = workers[workerNumber] - providerNumber = requestNumber % uris.length - provider = uris[providerNumber] - Logger.logWorker(worker) - Logger.logCall(uris[providerNumber]) - Op.noop() -- dirty hack - result <<- callOnWorker(worker, provider, method, jsonArgs, callFunc) - Op.noop() -- dirty hack - else: - result <<- JsonString(value = "", success = false, error = error!) - + result: *JsonString + + workers, error <- getWorkers() + if error != nil: + result <- errorJsonString(error!) + else: + log <- initPeerLogger() + random <- timeRandom() + balancer <- randomBalancer{random}(workers, uris) + result <- balancedEthCall{log, balancer}(method, jsonArgs) + <- result! func roundRobinEth(uris: []string, method: string, jsonArgs: []string, counterServiceId: string, counterPeerId: string) -> JsonString: - log <- initPeerLogger() - <- roundRobin{log}(uris, method, jsonArgs, counterServiceId, counterPeerId, call) + result: *JsonString + + workers, error <- getWorkers() + if error != nil: + result <- errorJsonString(error!) + else: + log <- initPeerLogger() + counter <- onPeerCounter(counterPeerId, counterServiceId) + balancer <- cycleBalancer{counter}(workers, uris) + result <- balancedEthCall{log, balancer}(method, jsonArgs) + + <- result! func quorum( - uris: []string, quorumNumber: u32, timeout: u32, method: string, jsonArgs: []string, quorumServiceId: string, quorumPeerId: string, - callFunc: string, string, []string -> JsonString + uris: []string, quorumNumber: u32, timeout: u32, method: string, jsonArgs: []string, quorumServiceId: string, quorumPeerId: string ) -> QuorumResult: results: *JsonString on HOST_PEER_ID: @@ -107,15 +88,15 @@ func quorum( timeP <- NumOp.identity(Peer.timestamp_ms()) providerNumber = timeP % uris.length provider = uris[providerNumber] - results <- callFunc(provider, method, jsonArgs) + rpc <- fromWorkerProvider(worker, provider) + results <- rpcCall{rpc}(method, jsonArgs) -- wait all results from all workers with timeout join results[workers.length - 1] par Peer.timeout(timeout, "") on quorumPeerId via HOST_PEER_ID: - Counter quorumServiceId -- check all results that we got quorumResult <- QuorumChecker.check(results, quorumNumber) <- quorumResult func quorumEth(uris: []string, quorumNumber: u32, timeout: u32, method: string, jsonArgs: []string, quorumServiceId: string, quorumPeerId: string) -> QuorumResult: - <- quorum(uris, quorumNumber, timeout, method, jsonArgs, quorumServiceId, quorumPeerId, call) \ No newline at end of file + <- quorum(uris, quorumNumber, timeout, method, jsonArgs, quorumServiceId, quorumPeerId) \ No newline at end of file diff --git a/gateway/aqua/utils.aqua b/gateway/aqua/utils.aqua new file mode 100644 index 0000000..d6c39e9 --- /dev/null +++ b/gateway/aqua/utils.aqua @@ -0,0 +1,4 @@ +module Utils declares NumOp + +service NumOp("op"): + identity(n: u64) -> i64 \ No newline at end of file diff --git a/gateway/src/index.js b/gateway/src/index.js index 2cf9889..d15cf49 100644 --- a/gateway/src/index.js +++ b/gateway/src/index.js @@ -9,11 +9,11 @@ import { Fluence } from "@fluencelabs/js-client"; import { quorumEth, randomLoadBalancingEth, - registerCounter, registerQuorumChecker, roundRobinEth, } from "../aqua-compiled/rpc.js"; import { registerLoggerSrv } from "../aqua-compiled/logger.js"; +import { registerCounterSrv } from "../aqua-compiled/counter.js"; import { readArguments } from "./arguments.js"; import { readConfig } from "./config.js"; import { methods } from "./methods.js"; @@ -61,7 +61,7 @@ registerLoggerSrv({ }); let counter = 0; -registerCounter("counter", { +registerCounterSrv("counter", { incrementAndReturn: () => { counter++; console.log("Counter: " + counter); From 01dfda1d0be832d97a7df5d952901a78bad3d9d9 Mon Sep 17 00:00:00 2001 From: InversionSpaces Date: Mon, 16 Oct 2023 09:42:53 +0000 Subject: [PATCH 3/8] Rewrote quorum --- gateway/aqua/balancer.aqua | 5 +--- gateway/aqua/quorum.aqua | 25 +++++++++++++++++ gateway/aqua/rpc.aqua | 57 +++++++++++++++++++------------------- gateway/src/index.js | 4 +-- 4 files changed, 56 insertions(+), 35 deletions(-) create mode 100644 gateway/aqua/quorum.aqua diff --git a/gateway/aqua/balancer.aqua b/gateway/aqua/balancer.aqua index 9e53b5a..2d5b60e 100644 --- a/gateway/aqua/balancer.aqua +++ b/gateway/aqua/balancer.aqua @@ -1,4 +1,4 @@ -module Balancer declares Balancer, randomBalancer, cycleBalancer +module Balancer declares Balancer, ProviderBalancer, WorkersBalancer, randomBalancer, cycleBalancer import Worker from "@fluencelabs/aqua-lib/subnet.aqua" @@ -6,9 +6,6 @@ import Counter from "counter.aqua" import Random from "random.aqua" import Provider from "provider.aqua" -ability RequestHandler: - request() - ability WorkersBalancer: nextWorker() -> Worker diff --git a/gateway/aqua/quorum.aqua b/gateway/aqua/quorum.aqua new file mode 100644 index 0000000..5fa5d5c --- /dev/null +++ b/gateway/aqua/quorum.aqua @@ -0,0 +1,25 @@ +module Quorum declares QuorumChecker, QuorumResult, onPeerQuorumChecker + +import JsonString from "services.aqua" + +export QuorumCheckerSrv + +data QuorumResult: + value: string + results: []JsonString + error: string + +ability QuorumChecker: + check(results: []JsonString, minResults: u32) -> QuorumResult + +service QuorumCheckerSrv("quorum"): + check(results: []JsonString, minResults: u32) -> QuorumResult + +func onPeerQuorumChecker(peer: string, id: string) -> QuorumChecker: + check = func (results: []JsonString, minResults: u32) -> QuorumResult: + on peer: + QuorumCheckerSrv id + res <- QuorumCheckerSrv.check(results, minResults) + <- res + + <- QuorumChecker(check = check) \ No newline at end of file diff --git a/gateway/aqua/rpc.aqua b/gateway/aqua/rpc.aqua index 332f063..c907685 100644 --- a/gateway/aqua/rpc.aqua +++ b/gateway/aqua/rpc.aqua @@ -7,22 +7,18 @@ import "services.aqua" use "deals.aqua" import Logger, initPeerLogger from "logger.aqua" -import Balancer, randomBalancer, cycleBalancer from "balancer.aqua" +import Balancer, ProviderBalancer, randomBalancer, cycleBalancer from "balancer.aqua" import onPeerCounter from "counter.aqua" +import QuorumChecker, QuorumResult, onPeerQuorumChecker from "quorum.aqua" import timeRandom from "random.aqua" import RPCEth, fromWorkerProvider from "eth_rpc.aqua" import NumOp from "utils.aqua" -export randomLoadBalancingEth, roundRobinEth, quorumEth, QuorumChecker +export randomLoadBalancingEth, roundRobinEth, quorumEth -data QuorumResult: - value: string - results: []JsonString - error: string - -service QuorumChecker("quorum"): - check(results: []JsonString, minResults: u32) -> QuorumResult +func errorQuorumResult(msg: string) -> QuorumResult: + <- QuorumResult(value = "", results = [], error = msg) func errorJsonString(msg: string) -> JsonString: <- JsonString(value = "", success = false, error = msg) @@ -76,27 +72,30 @@ func roundRobinEth(uris: []string, method: string, jsonArgs: []string, counterSe <- result! -func quorum( - uris: []string, quorumNumber: u32, timeout: u32, method: string, jsonArgs: []string, quorumServiceId: string, quorumPeerId: string -) -> QuorumResult: +func quorum{ProviderBalancer, QuorumChecker}(workers: []Worker, quorumNumber: u32, timeout: u32, method: string, jsonArgs: []string) -> QuorumResult: results: *JsonString - on HOST_PEER_ID: - workers <- getWorkers() + on HOST_PEER_ID: for worker <- workers par: - on worker.worker_id! via worker.host_id: - -- choose provider randomly - timeP <- NumOp.identity(Peer.timestamp_ms()) - providerNumber = timeP % uris.length - provider = uris[providerNumber] - rpc <- fromWorkerProvider(worker, provider) - results <- rpcCall{rpc}(method, jsonArgs) - -- wait all results from all workers with timeout - join results[workers.length - 1] - par Peer.timeout(timeout, "") - on quorumPeerId via HOST_PEER_ID: - -- check all results that we got - quorumResult <- QuorumChecker.check(results, quorumNumber) - <- quorumResult + provider <- ProviderBalancer.nextProvider() + rpc <- fromWorkerProvider(worker, provider) + results <- rpcCall{rpc}(method, jsonArgs) + + -- wait all results from all workers with timeout + join results[workers.length - 1] + par Peer.timeout(timeout, "Workers timeout") + + <- QuorumChecker.check(results, quorumNumber) func quorumEth(uris: []string, quorumNumber: u32, timeout: u32, method: string, jsonArgs: []string, quorumServiceId: string, quorumPeerId: string) -> QuorumResult: - <- quorum(uris, quorumNumber, timeout, method, jsonArgs, quorumServiceId, quorumPeerId) \ No newline at end of file + result: *QuorumResult + + workers, error <- getWorkers() + if error != nil: + result <- errorQuorumResult(error!) + else: + random <- timeRandom() + balancer <- randomBalancer{random}(workers, uris) + quorumChecker <- onPeerQuorumChecker(quorumPeerId, quorumServiceId) + result <- quorum{balancer, quorumChecker}(workers, quorumNumber, timeout, method, jsonArgs) + + <- result! \ No newline at end of file diff --git a/gateway/src/index.js b/gateway/src/index.js index d15cf49..d21e5e0 100644 --- a/gateway/src/index.js +++ b/gateway/src/index.js @@ -9,11 +9,11 @@ import { Fluence } from "@fluencelabs/js-client"; import { quorumEth, randomLoadBalancingEth, - registerQuorumChecker, roundRobinEth, } from "../aqua-compiled/rpc.js"; import { registerLoggerSrv } from "../aqua-compiled/logger.js"; import { registerCounterSrv } from "../aqua-compiled/counter.js"; +import { registerQuorumCheckerSrv } from "../aqua-compiled/quorum.js"; import { readArguments } from "./arguments.js"; import { readConfig } from "./config.js"; import { methods } from "./methods.js"; @@ -102,7 +102,7 @@ function findSameResults(results, minNum) { } } -registerQuorumChecker("quorum", { +registerQuorumCheckerSrv("quorum", { check: (ethResults, minQuorum) => { console.log("Check quorum for:"); console.log(ethResults); From 987f8507e4fbb31406373758549fde500da25a35 Mon Sep 17 00:00:00 2001 From: InversionSpaces Date: Mon, 16 Oct 2023 09:50:43 +0000 Subject: [PATCH 4/8] Add comments --- gateway/aqua/balancer.aqua | 5 +++++ gateway/aqua/eth_rpc.aqua | 4 ++++ gateway/aqua/quorum.aqua | 4 ++++ gateway/aqua/random.aqua | 2 ++ gateway/aqua/rpc.aqua | 9 ++++++++- gateway/aqua/utils.aqua | 1 + 6 files changed, 24 insertions(+), 1 deletion(-) diff --git a/gateway/aqua/balancer.aqua b/gateway/aqua/balancer.aqua index 2d5b60e..980cb00 100644 --- a/gateway/aqua/balancer.aqua +++ b/gateway/aqua/balancer.aqua @@ -17,7 +17,10 @@ ability Balancer: nextProvider() -> Provider next() -> Worker, Provider +-- Create balancer that returns +-- workers and providers in random order func randomBalancer{Random}(workers: []Worker, providers: []Provider) -> Balancer: + -- closures do not capture topology here nextWorker = func () -> Worker: rand <- Random.next() idx = rand % workers.length @@ -33,6 +36,8 @@ func randomBalancer{Random}(workers: []Worker, providers: []Provider) -> Balance <- Balancer(next=next, nextWorker=nextWorker, nextProvider=nextProvider) +-- Create balancer that returns +-- workers and providers in cycle order func cycleBalancer{Counter}(workers: []Worker, providers: []Provider) -> Balancer: next = func () -> Worker, Provider: n <- Counter.incrementAndReturn() diff --git a/gateway/aqua/eth_rpc.aqua b/gateway/aqua/eth_rpc.aqua index 5489154..8a0d6bc 100644 --- a/gateway/aqua/eth_rpc.aqua +++ b/gateway/aqua/eth_rpc.aqua @@ -6,11 +6,15 @@ import "services.aqua" import Provider from "provider.aqua" +-- Ability to call Ethereum JSON RPC methods ability RPCEth: call(method: string, jsonArgs: []string) -> JsonString +-- Create RPCEth ability from Worker and Provider func fromWorkerProvider(worker: Worker, provider: Provider) -> RPCEth: + -- closure does not capture topology here call = func (method: string, jsonArgs: []string) -> JsonString: + -- TODO: Handle worker_id == nil? on worker.worker_id! via worker.host_id: res <- EthRpc.eth_call(provider, method, jsonArgs) <- res diff --git a/gateway/aqua/quorum.aqua b/gateway/aqua/quorum.aqua index 5fa5d5c..b8f303a 100644 --- a/gateway/aqua/quorum.aqua +++ b/gateway/aqua/quorum.aqua @@ -9,13 +9,17 @@ data QuorumResult: results: []JsonString error: string +-- Ability to check if a quorum on results is reached ability QuorumChecker: check(results: []JsonString, minResults: u32) -> QuorumResult service QuorumCheckerSrv("quorum"): check(results: []JsonString, minResults: u32) -> QuorumResult +-- Create a QuorumChecker ability +-- that checks quorum on peer through QuorumCheckerSrv(id) func onPeerQuorumChecker(peer: string, id: string) -> QuorumChecker: + -- closure does not capture topology here check = func (results: []JsonString, minResults: u32) -> QuorumResult: on peer: QuorumCheckerSrv id diff --git a/gateway/aqua/random.aqua b/gateway/aqua/random.aqua index 97a502b..5863635 100644 --- a/gateway/aqua/random.aqua +++ b/gateway/aqua/random.aqua @@ -7,7 +7,9 @@ import NumOp from "utils.aqua" ability Random: next() -> i64 +-- Create random from timestamp func timeRandom() -> Random: + -- closure does not capture topology here next = func () -> i64: t <- Peer.timestamp_sec() n <- NumOp.identity(t) diff --git a/gateway/aqua/rpc.aqua b/gateway/aqua/rpc.aqua index c907685..338c395 100644 --- a/gateway/aqua/rpc.aqua +++ b/gateway/aqua/rpc.aqua @@ -23,6 +23,7 @@ func errorQuorumResult(msg: string) -> QuorumResult: func errorJsonString(msg: string) -> JsonString: <- JsonString(value = "", success = false, error = msg) +-- Get workers participating in deal func getWorkers() -> []Worker, ?string: on INIT_PEER_ID via HOST_PEER_ID: deals <- Deals.get() @@ -31,19 +32,22 @@ func getWorkers() -> []Worker, ?string: result <- Subnet.resolve(dealId) <- result.workers, result.error +-- Call RPC method through ability func rpcCall{RPCEth}(method: string, jsonArgs: []string) -> JsonString: <- RPCEth.call(method, jsonArgs) +-- Call RPC method with load balancing func balancedEthCall{Logger, Balancer}(method: string, jsonArgs: []string) -> JsonString: on HOST_PEER_ID: worker, provider <- Balancer.next() Logger.logWorker(worker) Logger.logCall(provider) - Op.noop() -- dirty hack + Op.noop() -- dirty hack for topology to converge rpc <- fromWorkerProvider(worker, provider) result <- rpcCall{rpc}(method, jsonArgs) <- result +-- Call RPC method with random load balancing func randomLoadBalancingEth(uris: []string, method: string, jsonArgs: []string) -> JsonString: result: *JsonString @@ -58,6 +62,7 @@ func randomLoadBalancingEth(uris: []string, method: string, jsonArgs: []string) <- result! +-- Call RPC method with round-robin load balancing func roundRobinEth(uris: []string, method: string, jsonArgs: []string, counterServiceId: string, counterPeerId: string) -> JsonString: result: *JsonString @@ -72,6 +77,7 @@ func roundRobinEth(uris: []string, method: string, jsonArgs: []string, counterSe <- result! +-- Call RPC method with workers quorum and provider load balancing func quorum{ProviderBalancer, QuorumChecker}(workers: []Worker, quorumNumber: u32, timeout: u32, method: string, jsonArgs: []string) -> QuorumResult: results: *JsonString on HOST_PEER_ID: @@ -86,6 +92,7 @@ func quorum{ProviderBalancer, QuorumChecker}(workers: []Worker, quorumNumber: u3 <- QuorumChecker.check(results, quorumNumber) +-- Call RPC method with workers quorum and provider load balancing func quorumEth(uris: []string, quorumNumber: u32, timeout: u32, method: string, jsonArgs: []string, quorumServiceId: string, quorumPeerId: string) -> QuorumResult: result: *QuorumResult diff --git a/gateway/aqua/utils.aqua b/gateway/aqua/utils.aqua index d6c39e9..08c5d7e 100644 --- a/gateway/aqua/utils.aqua +++ b/gateway/aqua/utils.aqua @@ -1,4 +1,5 @@ module Utils declares NumOp +-- Used to coerce types service NumOp("op"): identity(n: u64) -> i64 \ No newline at end of file From 42fedb7a1ab6b0e53febaaeef755c75266a8ce62 Mon Sep 17 00:00:00 2001 From: InversionSpaces Date: Mon, 16 Oct 2023 11:17:58 +0000 Subject: [PATCH 5/8] Add config.mode check --- gateway/src/config.js | 46 ++++++++++++++++++++++++++----------------- gateway/src/index.js | 9 ++++++--- 2 files changed, 34 insertions(+), 21 deletions(-) diff --git a/gateway/src/config.js b/gateway/src/config.js index bc10a14..c3e1b0d 100644 --- a/gateway/src/config.js +++ b/gateway/src/config.js @@ -1,24 +1,34 @@ -import fs from 'fs'; +import fs from "fs"; -export const configHelp = "Config structure: { port, relay, serviceId, providers, mode, counterServiceId?, counterPeerId?}\n" + - "Where 'mode' can be: 'random' (default), 'round-robin' or 'quorum',\n" + - "'counterServiceId' and 'counterPeerId' will use local service if undefined.\n" - "'quorumServiceId' and 'quorumPeerId' will use local service if undefined.\n" +export const configHelp = + "Config structure: { port, relay, serviceId, providers, mode, counterServiceId?, counterPeerId?}\n" + + "Where 'mode' can be: 'random' (default), 'round-robin' or 'quorum',\n" + + "'counterServiceId' and 'counterPeerId' will use local service if undefined.\n"; +("'quorumServiceId' and 'quorumPeerId' will use local service if undefined.\n"); export function readConfig(path) { - const rawdata = fs.readFileSync(path); - const config = JSON.parse(rawdata); + const rawdata = fs.readFileSync(path); + const config = JSON.parse(rawdata); - let errors = [] - if (!config.port) { - errors.push("Specify port ('port') in config") - } - if (!config.relay) { - errors.push("Specify Fluence peer address ('relay') in config") - } + let errors = []; + if (!config.port) { + errors.push("Specify port ('port') in config"); + } + if (!config.relay) { + errors.push("Specify Fluence peer address ('relay') in config"); + } + if ( + !!config.mode && + !["random", "round-robin", "quorum"].includes(config.mode) + ) { + errors.push( + `Incorrect mode '${config.mode}' in config. Should be 'random', 'round-robin' or 'quorum'` + ); + } - return { - config, errors, - help: configHelp - } + return { + config, + errors, + help: configHelp, + }; } diff --git a/gateway/src/index.js b/gateway/src/index.js index d21e5e0..7c7ddef 100644 --- a/gateway/src/index.js +++ b/gateway/src/index.js @@ -115,14 +115,17 @@ const counterPeerId = config.counterPeerId || peerId; const quorumServiceId = config.quorumServiceId || "quorum"; const quorumPeerId = config.quorumPeerId || peerId; const quorumNumber = config.quorumNumber || 2; +const mode = config.mode || "random"; + +console.log(`Using mode '${mode}'`); async function methodHandler(reqRaw, method) { const req = reqRaw.map((s) => JSON.stringify(s)); console.log(`Receiving request '${method}'`); let result; - if (!config.mode || config.mode === "random") { + if (mode === "random") { result = await randomLoadBalancingEth(config.providers, method, req); - } else if (config.mode === "round-robin") { + } else if (mode === "round-robin") { result = await roundRobinEth( config.providers, method, @@ -131,7 +134,7 @@ async function methodHandler(reqRaw, method) { counterPeerId, config.serviceId ); - } else if (config.mode === "quorum") { + } else if (mode === "quorum") { const quorumResult = await quorumEth( config.providers, quorumNumber, From a201aaa1d7598d68344fd3d26855bf37729c9dd9 Mon Sep 17 00:00:00 2001 From: InversionSpaces Date: Tue, 24 Oct 2023 09:44:54 +0000 Subject: [PATCH 6/8] Update README --- README.md | 181 +++++++++++++++++++++++++++--------------- gateway/aqua/rpc.aqua | 4 +- 2 files changed, 119 insertions(+), 66 deletions(-) diff --git a/README.md b/README.md index fdeffc6..85e6dbf 100644 --- a/README.md +++ b/README.md @@ -511,23 +511,24 @@ Note that the deal's section in [fluence.yaml](./fluence.yaml) specifies the min Now that we have our services deployed and ready for action, it's time to look at Aqua, which is utilized by the Gateway to bridge HTTP to/from libp2p. Let's have a look at the Aqua code and structure. -[rpc.aqua]("./gateway/aqua/rpc.aqua") contains the necessary dependencies and algorithms. +File [rpc.aqua]("./gateway/aqua/rpc.aqua") is the file where fRPC algorithms and entrypoints used by gateway are defined. ```aqua -- rpc.aqua +aqua RPC + import "@fluencelabs/aqua-lib/builtin.aqua" -import "deals.aqua" +import Subnet, Worker from "@fluencelabs/aqua-lib/subnet.aqua" + import "services.aqua" -import "@fluencelabs/registry/subnetwork.aqua" -import Registry, Record from "@fluencelabs/registry/registry-service.aqua" -import "@fluencelabs/spell/spell_service.aqua" +use "deals.aqua" ``` -Two of the dependencies (should) stand out: *deals.aqua* and *services.aqua* as they are local files located in the project *.fluence* directory: *services.aqua* contains the interface exports from the *eth-rpc* wasm module and *deals.aqua* maps the values from *deployed.yaml* to data structures usable by your aqua code. Since these files are dynamically generated by Fluence CLI, you need to (re-) compile your Aqua after every change to your Wasm code or deal deploy updates. For all things Aqua refer to the [Aqua book](https://fluence.dev/docs/aqua-book/introduction), the [aqua playground](https://github.com/fluencelabs/aqua-playground) and the respective repos: [aqua-lib](https://github.com/fluencelabs/aqua-lib), [registry](https://github.com/fluencelabs/registry), [spell](https://github.com/fluencelabs/spell). +Two of the dependencies (should) stand out: *deals.aqua* and *services.aqua* as they are local files located in the project *.fluence* directory: *services.aqua* contains the interface exports from the *eth-rpc* wasm module and *deals.aqua* maps the values from *deployed.yaml* to data structures usable by your aqua code. Since these files are dynamically generated by Fluence CLI, you need to (re-) compile your Aqua after every change to your Wasm code or deal deploy updates. For further details and examples, consult the [Aqua book](https://fluence.dev/docs/aqua-book/introduction), explore the [aqua playground](https://github.com/fluencelabs/aqua-playground) and visit the relevant repositories: [aqua-lib](https://github.com/fluencelabs/aqua-lib), [registry](https://github.com/fluencelabs/registry), [spell](https://github.com/fluencelabs/spell). ### fRPC Gateway Configuration -The gateway config file, e.g., [quickstart_config.json](./configs/quickstart_config.json), contains the parameters necessary for the gateway to connect to the Fluence p2p network and to configure the gateway's behavior. The gateway config file is a json file with the following parameters: +The gateway config file, e.g., [quickstart_config.json](./configs/quickstart_config.json), contains the parameters for Fluence p2p network connection and gateway behavior. Key parameters include: * *providers*: an array of RPC endpoint urls, e.g., Infura, Alchemy, Ankr, etc. * *mode*: one of "random", "round-robin" or "quorum" to specify the endpoint selection algorithm @@ -537,7 +538,31 @@ The gateway config file, e.g., [quickstart_config.json](./configs/quickstart_con ### fRPC Algorithms -The fRPC substrate comes with basic implementations of several algorithms useful in mitigating failure as the result of availability and lack of trustlessness with respect to RPC endpoints and possibly peers. +The fRPC substrate offers basic algorithms to enhance reliability, addressing issues related to RPC endpoint availability and trustworthiness. + +Let's first examine *balancedEthCall* in [rpc.aqua]("./gateway/aqua/rpc.aqua"): + +```aqua +-- Call RPC method with load balancing +func balancedEthCall{Logger, Balancer}(method: string, jsonArgs: []string) -> JsonString: -- (1) + on HOST_PEER_ID: -- (2) + worker, provider <- Balancer.next() -- (3) + Logger.logWorker(worker) -- (4) + Logger.logCall(provider) -- (4) + rpc <- fromWorkerProvider(worker, provider) -- (5) + result <- rpcCall{rpc}(method, jsonArgs) -- (6) + <- result -- (7) +``` + +This function is a building block for other algorithms that allows to make a call to a RPC endpoint with some balancing logic. Let's go through the code line by line: + +* (1) Function declaration states that two abilities are required to execute it: *Logger* and *Balancer*. To learn more about abilities, see [Abilities](https://fluence.dev/docs/aqua-book/language/abilities). +* (2) The function is executed on the host peer, i.e. the relay peer we used to connect to Fluence p2p network. +* (3) Worker and RPC provider are determined by *Balancer*. +* (4) Worker and provider are logged for debugging purposes. +* (5) RPC ability is created from worker and provider with a helper function *fromWorkerProvider*. +* (6) RPC ability is passed to *rpcCall* function to make the actual call. +* (7) Result of the call is returned. #### Random @@ -547,26 +572,26 @@ Randomization the selection of one out of many RPC endpoints by itself is a weak The fRPC substrate implementation is very basic from a business logic perspective but illustrates how to randomly choose both a worker, which represents the deployed service on a particular peer, and an RPC endpoint: -```python -func randomLoadBalancing(uris: []string, method: string, jsonArgs: []string, callFunc: string, string, []string -> JsonString) -> JsonString: - on HOST_PEER_ID: --< 1 - workers <- getWorkers() --< 2 - workersNum = workers.length - -- choose worker randomly - timeW <- NumOp.identity(Peer.timestamp_sec()) - workerNumber = timeW % workers.length --< 3 - worker = workers[workerNumber] - -- choose provider randomly - timeP <- NumOp.identity(Peer.timestamp_sec()) - providerNumber = timeP % uris.length --< 5 - provider = uris[providerNumber] - result <- callOnWorker(worker, provider, method, jsonArgs, callFunc) --< 6 - Logger.logWorker(worker) --< 7 - Logger.logCall(uris[providerNumber]) - <- result +```aqua +func randomLoadBalancingEth(uris: []string, method: string, jsonArgs: []string) -> JsonString: + result: ?JsonString + + workers, error <- getWorkers() -- (1) + if error != nil: + result <- errorJsonString(error!) + else: + log <- initPeerLogger() -- (2) + random <- timeRandom() -- (2) + balancer <- randomBalancer{random}(workers, uris) -- (2) + result <- balancedEthCall{log, balancer}(method, jsonArgs) -- (3) + + <- result! ``` -We want to execute our Aqua program on the peer of the client, i.e. the gateway's client peer, connected to (1). To set up our randomized worker and endpoint selection, we need to get the workers running our eth-rpc service (2). We then calculate the index *workerNumber* as a random integer (3) and do the same for the endpoint provider (5). Finally, we call the chosen worker with the chosen endpoint url (6) and print our randomly chosen integers to the screen (7) before returning the result. +The *randomLoadBalancingEth* function is build upon *balancedEthCall*: +* (1) Workers that are part of the deal are fetched from the network. +* (2) Logger and random balancer are initialized. +* (3) *balancedEthCall* is called with logger and balancer. #### Round robin @@ -576,24 +601,29 @@ We want to execute our Aqua program on the peer of the client, i.e. the gateway' * *counterServiceId*: the service id of the counter service * *counterPeerId*: the peer id of the counter service +A round robin algorithm cycles through the different options usually in a predictable manner. This substrate implementation is no different: + ```aqua -func roundRobin(uris: []string, method: string, jsonArgs: []string, counterServiceId: string, counterPeerId: string, callFunc: string, string, []string -> JsonString) -> JsonString: - on counterPeerId: --< 1 - Counter counterServiceId - requestNumber <- Counter.incrementAndReturn() - on HOST_PEER_ID: --< 2 - workers <- getWorkers() - workerNumber = requestNumber % workers.length - worker = workers[workerNumber] --< 3 - providerNumber = requestNumber % uris.length - provider = uris[providerNumber] --< 4 - result <- callOnWorker(worker, provider, method, jsonArgs, callFunc) --< 5 - Logger.logWorker(worker) - Logger.logCall(uris[providerNumber]) - <- result +func roundRobinEth(uris: []string, method: string, jsonArgs: []string, counterServiceId: string, counterPeerId: string) -> JsonString: + result: ?JsonString + + workers, error <- getWorkers() + if error != nil: + result <- errorJsonString(error!) + else: + log <- initPeerLogger() + counter <- onPeerCounter(counterPeerId, counterServiceId) -- (1) + balancer <- cycleBalancer{counter}(workers, uris) -- (2) + result <- balancedEthCall{log, balancer}(method, jsonArgs) + + <- result! ``` -A round robin algorithm cycles through the different options usually in a predictable manner. This substrate implementation is no different. To keep the state of the *cycle index*, we use a counter based on a local, [js-client](https://github.com/fluencelabs/js-client) based service [implementation]("./gateway/src/index.js) (1). Here, the peer executing the *Counter* service is the (local) client-peer implemented by the gateway. Note that the state of the counter is limited to the life of the gateway. With the incremented counter in place, we had to our relay (2), determine our worker (3) and provider (4) indexes, call for the service execution (5), log and return the result. +The *roundRobinEth* function is very similar to *randomLoadBalancingEth*, except for the balancer: +* (1) Counter ability is created from peer id and service id. +* (2) Cycle balancer is created from counter and workers. + +To keep the state of the *cycle index*, we use a counter based on a local, [js-client](https://github.com/fluencelabs/js-client) based [service](./gateway/src/index.js). The peer executing the *Counter* service is the (local) client-peer implemented by the gateway. Note that the state of the counter is limited to the life of the gateway. #### Quorum @@ -604,34 +634,57 @@ A round robin algorithm cycles through the different options usually in a predic * *quorumPeerId*: the peer id of the quorum service * *quorumNumber*: the number of results that must be equal to determine a quorum result +A quorum, aka "off-chain consensus", "determines" a result by a ranked frequency distribution of the results pool and makes a selection against a quorum threshold value, e.g., 2/3 of items in the results pool must be equal for a quorum result to be accepted. Moreover, additional parameters such as the minimum number of items in the result pool may be added. Depending on your trust in the peers processing the endpoint requests or even the peer executing the quorum algorithm, additional verification steps may have to be added. There is one more pertinent consideration when it comes to designing quorum algorithms: the differentiation between (on-chain) read and write operations. + +In the fRPC substrate implementation, we provide a basic quorum algorithm: + ```aqua -func quorum( - uris: []string, quorumNumber: u32, timeout: u32, method: string, jsonArgs: []string, quorumServiceId: string, quorumPeerId: string, - callFunc: string, string, []string -> JsonString -) -> QuorumResult: +func quorum{ProviderBalancer, QuorumChecker}(workers: []Worker, quorumNumber: u32, timeout: u32, method: string, jsonArgs: []string) -> QuorumResult: -- (1) results: *JsonString - on HOST_PEER_ID: - workers <- getWorkers() - for worker <- workers par: --< 1 - on worker.metadata.peer_id via worker.metadata.relay_id: - -- choose provider randomly - timeP <- NumOp.identity(Peer.timestamp_ms()) - providerNumber = timeP % uris.length - provider = uris[providerNumber] - results <- callFunc(provider, method, jsonArgs) --< 2 - -- wait all results from all workers with timeout - join results[workers.length - 1] - par Peer.timeout(timeout, "") --< 3 - on quorumPeerId via HOST_PEER_ID: - Counter quorumServiceId - -- check all results that we got - quorumResult <- QuorumChecker.check(results, quorumNumber) --< 4 - <- quorumResult + on HOST_PEER_ID: -- (2) + for worker <- workers par: -- (3) + provider <- ProviderBalancer.nextProvider() -- (4) + rpc <- fromWorkerProvider(worker, provider) -- (5) + results <- rpcCall{rpc}(method, jsonArgs) -- (6) + + -- wait all results from all workers with timeout + join results[workers.length - 1] -- (7) + par Peer.timeout(timeout, "Workers timeout") -- (7) + + <- QuorumChecker.check(results, quorumNumber) -- (8) ``` -A quorum, aka "off-chain consensus", "determines" a result by a ranked frequency distribution of the results pool and makes a selection against a quorum threshold value, e.g., 2/3 of items in the results pool must be equal for a quorum result to be accepted. Moreover, additional parameters such as the minimum number of items in the result pool may be added. Depending on your trust in the peers processing the endpoint requests or even the peer executing the quorum algorithm, additional verification steps may have to be added. There is one more pertinent consideration when it comes to designing quorum algorithms: the differentiation between (on-chain) read and write operations. +Let's examine the code line by line: +* (1) Function declaration states that two abilities are required to execute it: *ProviderBalancer* and *QuorumChecker*. To learn more about abilities, see [Abilities](https://fluence.dev/docs/aqua-book/language/abilities). +* (2) The function is executed on the host peer, i.e. the relay peer we used to connect to Fluence p2p network. +* (3) For each worker in parallel: + * (4) Provider is determined by *ProviderBalancer*. + * (5) RPC ability is created from worker and provider with a helper function *fromWorkerProvider*. + * (6) RPC ability is passed to *rpcCall* function to make the actual call. Result is saved in *results* stream variable. +* (7) Results from all workers are waited for with a timeout. For more information, see [Timeout and race patterns](https://fluence.dev/docs/aqua-book/language/flow/parallel#timeout-and-race-patterns). +* (8) Results are checked by *QuorumChecker*. + +As evidenced by the code, no considerations to differentiate between read and write operations are made, which might prove disadvantageous when submitting, for example, a signed transaction. + +The actual entrypoint, *quorumEthCall*, is a wrapper around *quorum*: + +```aqua +func quorumEth(uris: []string, quorumNumber: u32, timeout: u32, method: string, jsonArgs: []string, quorumServiceId: string, quorumPeerId: string) -> QuorumResult: + result: *QuorumResult + + workers, error <- getWorkers() + if error != nil: + result <- errorQuorumResult(error!) + else: + random <- timeRandom() + balancer <- randomBalancer{random}(workers, uris) + quorumChecker <- onPeerQuorumChecker(quorumPeerId, quorumServiceId) + result <- quorum{balancer, quorumChecker}(workers, quorumNumber, timeout, method, jsonArgs) + + <- result! +``` -In the fRPC substrate implementation, we provide a basic quorum algo that polls each endpoint in parallel (1) and captures the results in a stream variable (2) and binds the loop with a timeout condition (3) running in parallel to (1). See the [Aqua book](https://fluence.dev/docs/aqua-book/language/flow/parallel#timeout-and-race-patterns) for more details. Finally, we check the results and return the result (4). As evidenced by the code, no considerations to differentiate between read and write operations are made, which might prove disadvantageous when submitting, for example, a signed transaction. +It is very similar to *randomLoadBalancingEth* and *roundRobinEth*, except for the balancer and quorum checker initialization. To determine the quorum result, we use a local, [js-client](https://github.com/fluencelabs/js-client) based [service](./gateway/src/index.js). The peer executing the *QuorumChecker* service is the (local) client-peer implemented by the gateway. ## Summary diff --git a/gateway/aqua/rpc.aqua b/gateway/aqua/rpc.aqua index 338c395..328f39b 100644 --- a/gateway/aqua/rpc.aqua +++ b/gateway/aqua/rpc.aqua @@ -49,7 +49,7 @@ func balancedEthCall{Logger, Balancer}(method: string, jsonArgs: []string) -> Js -- Call RPC method with random load balancing func randomLoadBalancingEth(uris: []string, method: string, jsonArgs: []string) -> JsonString: - result: *JsonString + result: ?JsonString workers, error <- getWorkers() if error != nil: @@ -64,7 +64,7 @@ func randomLoadBalancingEth(uris: []string, method: string, jsonArgs: []string) -- Call RPC method with round-robin load balancing func roundRobinEth(uris: []string, method: string, jsonArgs: []string, counterServiceId: string, counterPeerId: string) -> JsonString: - result: *JsonString + result: ?JsonString workers, error <- getWorkers() if error != nil: From 89a5042aa2e252506ec535df2c8310d4a7a83ad9 Mon Sep 17 00:00:00 2001 From: InversionSpaces Date: Tue, 24 Oct 2023 09:50:36 +0000 Subject: [PATCH 7/8] Small fixes --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 85e6dbf..cb8c594 100644 --- a/README.md +++ b/README.md @@ -593,6 +593,8 @@ The *randomLoadBalancingEth* function is build upon *balancedEthCall*: * (2) Logger and random balancer are initialized. * (3) *balancedEthCall* is called with logger and balancer. +Note that time service is used to generate random numbers. This is not a good idea for production, but it's good enough for demonstration. + #### Round robin **Use: Set `mode` to "round-robin" in your gateway config file** @@ -662,7 +664,7 @@ Let's examine the code line by line: * (5) RPC ability is created from worker and provider with a helper function *fromWorkerProvider*. * (6) RPC ability is passed to *rpcCall* function to make the actual call. Result is saved in *results* stream variable. * (7) Results from all workers are waited for with a timeout. For more information, see [Timeout and race patterns](https://fluence.dev/docs/aqua-book/language/flow/parallel#timeout-and-race-patterns). -* (8) Results are checked by *QuorumChecker*. +* (8) Final result is determined by *QuorumChecker* based on all results gathered at this point. As evidenced by the code, no considerations to differentiate between read and write operations are made, which might prove disadvantageous when submitting, for example, a signed transaction. From 4e3772fc9ac09f6bf77ad72467977a4ff5ab6a8e Mon Sep 17 00:00:00 2001 From: InversionSpaces Date: Tue, 24 Oct 2023 10:04:49 +0000 Subject: [PATCH 8/8] Update js-client --- gateway/package-lock.json | 184 ++++++++++++++++++++++++++++++++------ gateway/package.json | 2 +- gateway/src/index.js | 2 +- 3 files changed, 159 insertions(+), 29 deletions(-) diff --git a/gateway/package-lock.json b/gateway/package-lock.json index b5cd8b2..015cbe1 100644 --- a/gateway/package-lock.json +++ b/gateway/package-lock.json @@ -9,7 +9,7 @@ "version": "0.0.18", "license": "Apache-2.0", "dependencies": { - "@fluencelabs/js-client": "0.1.7", + "@fluencelabs/js-client": "0.3.0", "@fluencelabs/marine-worker": "0.3.3", "body-parser": "1.20.2", "express": "4.18.2", @@ -625,19 +625,22 @@ } }, "node_modules/@fluencelabs/js-client": { - "version": "0.1.7", - "resolved": "https://registry.npmjs.org/@fluencelabs/js-client/-/js-client-0.1.7.tgz", - "integrity": "sha512-S9chuqlOcPMWFjVjL099WvnjeLdEWJoMIwnxXJ5zlLdRXH0qqNCeZjgtFhvaP+6k1EcxIAksHT5hK5T5aV/q8g==", + "version": "0.3.0", + "resolved": "https://registry.npmjs.org/@fluencelabs/js-client/-/js-client-0.3.0.tgz", + "integrity": "sha512-mRlEeoDEAsHK4GK3vIlNBkXgfJ01maQ4WVefob4QNEqpshipf6XQpU6R8dpUsjyhx53nus3ui6BSUV6gi5jg8A==", "dependencies": { "@chainsafe/libp2p-noise": "13.0.0", "@chainsafe/libp2p-yamux": "5.0.0", + "@fluencelabs/avm": "0.52.0", "@fluencelabs/interfaces": "0.8.2", + "@fluencelabs/marine-worker": "0.4.0", "@libp2p/crypto": "2.0.3", "@libp2p/interface": "0.1.2", "@libp2p/peer-id": "3.0.2", "@libp2p/peer-id-factory": "3.0.3", "@libp2p/websockets": "7.0.4", "@multiformats/multiaddr": "11.3.0", + "assert": "2.1.0", "async": "3.2.4", "bs58": "5.0.0", "buffer": "6.0.3", @@ -649,16 +652,32 @@ "libp2p": "0.46.6", "multiformats": "11.0.1", "rxjs": "7.5.5", - "threads": "1.7.0", + "threads": "github:fluencelabs/threads.js#b00a5342380b0278d3ae56dcfb170effb3cad7cd", "ts-pattern": "3.3.3", "uint8arrays": "4.0.3", - "uuid": "8.3.2" + "uuid": "8.3.2", + "zod": "3.22.4" }, "engines": { "node": ">=10", "pnpm": ">=8" } }, + "node_modules/@fluencelabs/js-client/node_modules/@fluencelabs/avm": { + "version": "0.52.0", + "resolved": "https://registry.npmjs.org/@fluencelabs/avm/-/avm-0.52.0.tgz", + "integrity": "sha512-T+/Hv/ZPfwWZAC4tH6wEDIRmtN6cTBebqbCaHfaq2PDLSMG0BgssdFF2BAaEXrvOvJbI5Bu/9bQhHv4ga7bYlA==" + }, + "node_modules/@fluencelabs/js-client/node_modules/@fluencelabs/marine-worker": { + "version": "0.4.0", + "resolved": "https://registry.npmjs.org/@fluencelabs/marine-worker/-/marine-worker-0.4.0.tgz", + "integrity": "sha512-nWri+j8Ey4UXoB32NPKsmVYzUKj6mwD7vh/5MjzCxrnVthnWnFdnkETF2BnZwjZWc701xeVhF3L5ZSjiQzKywQ==", + "dependencies": { + "@fluencelabs/marine-js": "0.7.2", + "observable-fns": "0.6.1", + "threads": "github:fluencelabs/threads.js#b00a5342380b0278d3ae56dcfb170effb3cad7cd" + } + }, "node_modules/@fluencelabs/marine-js": { "version": "0.7.2", "resolved": "https://registry.npmjs.org/@fluencelabs/marine-js/-/marine-js-0.7.2.tgz", @@ -1704,6 +1723,18 @@ "resolved": "https://registry.npmjs.org/array-flatten/-/array-flatten-1.1.1.tgz", "integrity": "sha512-PCVAQswWemu6UdxsDFFX/+gVeYqKAod3D3UVm91jHwynguOwAvYPhx8nNlM++NqRcK6CxxpUafjmhIdKiHibqg==" }, + "node_modules/assert": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/assert/-/assert-2.1.0.tgz", + "integrity": "sha512-eLHpSK/Y4nhMJ07gDaAzoX/XAKS8PSaojml3M0DM4JpV1LAi5JOJ/p6H/XWrl8L+DzVEvVCW1z3vWAaB9oTsQw==", + "dependencies": { + "call-bind": "^1.0.2", + "is-nan": "^1.3.2", + "object-is": "^1.1.5", + "object.assign": "^4.1.4", + "util": "^0.12.5" + } + }, "node_modules/async": { "version": "3.2.4", "resolved": "https://registry.npmjs.org/async/-/async-3.2.4.tgz", @@ -2064,6 +2095,35 @@ "node": ">=14" } }, + "node_modules/define-data-property": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/define-data-property/-/define-data-property-1.1.1.tgz", + "integrity": "sha512-E7uGkTzkk1d0ByLeSc6ZsFS79Axg+m1P/VsgYsxHgiuc3tFSj+MjMIwe90FC4lOAZzNBdY7kkO2P2wKdsQ1vgQ==", + "dependencies": { + "get-intrinsic": "^1.2.1", + "gopd": "^1.0.1", + "has-property-descriptors": "^1.0.0" + }, + "engines": { + "node": ">= 0.4" + } + }, + "node_modules/define-properties": { + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/define-properties/-/define-properties-1.2.1.tgz", + "integrity": "sha512-8QmQKqEASLd5nx0U1B1okLElbUuuttJ/AnYmRXbbbGDWh6uS208EjD4Xqq/I9wK7u0v6O08XhTWnt5XtEbR6Dg==", + "dependencies": { + "define-data-property": "^1.0.1", + "has-property-descriptors": "^1.0.0", + "object-keys": "^1.1.1" + }, + "engines": { + "node": ">= 0.4" + }, + "funding": { + "url": "https://github.com/sponsors/ljharb" + } + }, "node_modules/delay": { "version": "6.0.0", "resolved": "https://registry.npmjs.org/delay/-/delay-6.0.0.tgz", @@ -2409,19 +2469,22 @@ "integrity": "sha512-FNUvuTAJ3CqCQb5ELn+qCbGR/Zllhf2HtwsdAtBi59s1WeCjKMT81fHcSu7dwIskqGVK+MmOrb7VOBlq3/SItw==" }, "node_modules/function-bind": { - "version": "1.1.1", - "resolved": "https://registry.npmjs.org/function-bind/-/function-bind-1.1.1.tgz", - "integrity": "sha512-yIovAzMX49sF8Yl58fSCWJ5svSLuaibPxXQJFLmBObTuCr0Mf1KiPopGM9NiFjiYBCbfaa2Fh6breQ6ANVTI0A==" + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/function-bind/-/function-bind-1.1.2.tgz", + "integrity": "sha512-7XHNxH7qX9xG5mIwxkhumTox/MIRNcOgDrxWsMt2pAr23WHp6MrRlN7FBSFpCpr+oVO0F744iUgR82nJMfG2SA==", + "funding": { + "url": "https://github.com/sponsors/ljharb" + } }, "node_modules/get-intrinsic": { - "version": "1.2.1", - "resolved": "https://registry.npmjs.org/get-intrinsic/-/get-intrinsic-1.2.1.tgz", - "integrity": "sha512-2DcsyfABl+gVHEfCOaTrWgyt+tb6MSEGmKq+kI5HwLbIYgjgmMcV8KQ41uaKz1xxUcn9tJtgFbQUEVcEbd0FYw==", + "version": "1.2.2", + "resolved": "https://registry.npmjs.org/get-intrinsic/-/get-intrinsic-1.2.2.tgz", + "integrity": "sha512-0gSo4ml/0j98Y3lngkFEot/zhiCeWsbYIlZ+uZOVgzLyLaUw7wxUL+nCTP0XJvJg1AXulJRI3UJi8GsbDuxdGA==", "dependencies": { - "function-bind": "^1.1.1", - "has": "^1.0.3", + "function-bind": "^1.1.2", "has-proto": "^1.0.1", - "has-symbols": "^1.0.3" + "has-symbols": "^1.0.3", + "hasown": "^2.0.0" }, "funding": { "url": "https://github.com/sponsors/ljharb" @@ -2454,15 +2517,15 @@ "url": "https://github.com/sponsors/ljharb" } }, - "node_modules/has": { - "version": "1.0.3", - "resolved": "https://registry.npmjs.org/has/-/has-1.0.3.tgz", - "integrity": "sha512-f2dvO0VU6Oej7RkWJGrehjbzMAjFp5/VKPp5tTpWIV4JHHZK1/BxbFRtf/siA2SWTe09caDmVtYYzWEIbBS4zw==", + "node_modules/has-property-descriptors": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/has-property-descriptors/-/has-property-descriptors-1.0.1.tgz", + "integrity": "sha512-VsX8eaIewvas0xnvinAe9bw4WfIeODpGYikiWYLH+dma0Jw6KHYqWiWfhQlgOVK8D6PvjubK5Uc4P0iIhIcNVg==", "dependencies": { - "function-bind": "^1.1.1" + "get-intrinsic": "^1.2.2" }, - "engines": { - "node": ">= 0.4.0" + "funding": { + "url": "https://github.com/sponsors/ljharb" } }, "node_modules/has-proto": { @@ -2510,6 +2573,17 @@ "minimalistic-assert": "^1.0.1" } }, + "node_modules/hasown": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/hasown/-/hasown-2.0.0.tgz", + "integrity": "sha512-vUptKVTpIJhcczKBbgnS+RtcuYMB8+oNzPK2/Hp3hanz8JmpATdmmgLgSaadVREkDm+e2giHwY3ZRkyjSIDDFA==", + "dependencies": { + "function-bind": "^1.1.2" + }, + "engines": { + "node": ">= 0.4" + } + }, "node_modules/hmac-drbg": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/hmac-drbg/-/hmac-drbg-1.0.1.tgz", @@ -2662,6 +2736,21 @@ "resolved": "https://registry.npmjs.org/is-loopback-addr/-/is-loopback-addr-2.0.2.tgz", "integrity": "sha512-26POf2KRCno/KTNL5Q0b/9TYnL00xEsSaLfiFRmjM7m7Lw7ZMmFybzzuX4CcsLAluZGd+niLUiMRxEooVE3aqg==" }, + "node_modules/is-nan": { + "version": "1.3.2", + "resolved": "https://registry.npmjs.org/is-nan/-/is-nan-1.3.2.tgz", + "integrity": "sha512-E+zBKpQ2t6MEo1VsonYmluk9NxGrbzpeeLC2xIViuO2EjU2xsXsBPwTr3Ykv9l08UYEVEdWeRZNouaZqF6RN0w==", + "dependencies": { + "call-bind": "^1.0.0", + "define-properties": "^1.1.3" + }, + "engines": { + "node": ">= 0.4" + }, + "funding": { + "url": "https://github.com/sponsors/ljharb" + } + }, "node_modules/is-observable": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/is-observable/-/is-observable-2.1.0.tgz", @@ -3396,6 +3485,46 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/object-is": { + "version": "1.1.5", + "resolved": "https://registry.npmjs.org/object-is/-/object-is-1.1.5.tgz", + "integrity": "sha512-3cyDsyHgtmi7I7DfSSI2LDp6SK2lwvtbg0p0R1e0RvTqF5ceGx+K2dfSjm1bKDMVCFEDAQvy+o8c6a7VujOddw==", + "dependencies": { + "call-bind": "^1.0.2", + "define-properties": "^1.1.3" + }, + "engines": { + "node": ">= 0.4" + }, + "funding": { + "url": "https://github.com/sponsors/ljharb" + } + }, + "node_modules/object-keys": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/object-keys/-/object-keys-1.1.1.tgz", + "integrity": "sha512-NuAESUOUMrlIXOfHKzD6bpPu3tYt3xvjNdRIQ+FeT0lNb4K8WR70CaDxhuNguS2XG+GjkyMwOzsN5ZktImfhLA==", + "engines": { + "node": ">= 0.4" + } + }, + "node_modules/object.assign": { + "version": "4.1.4", + "resolved": "https://registry.npmjs.org/object.assign/-/object.assign-4.1.4.tgz", + "integrity": "sha512-1mxKf0e58bvyjSCtKYY4sRe9itRk3PJpquJOjeIkz885CczcI4IvJJDLPS72oowuSh+pBxUFROpX+TU++hxhZQ==", + "dependencies": { + "call-bind": "^1.0.2", + "define-properties": "^1.1.4", + "has-symbols": "^1.0.3", + "object-keys": "^1.1.1" + }, + "engines": { + "node": ">= 0.4" + }, + "funding": { + "url": "https://github.com/sponsors/ljharb" + } + }, "node_modules/observable-fns": { "version": "0.6.1", "resolved": "https://registry.npmjs.org/observable-fns/-/observable-fns-0.6.1.tgz", @@ -3892,8 +4021,9 @@ }, "node_modules/threads": { "version": "1.7.0", - "resolved": "https://registry.npmjs.org/threads/-/threads-1.7.0.tgz", - "integrity": "sha512-Mx5NBSHX3sQYR6iI9VYbgHKBLisyB+xROCBGjjWm1O9wb9vfLxdaGtmT/KCjUqMsSNW6nERzCW3T6H43LqjDZQ==", + "resolved": "git+ssh://git@github.com/fluencelabs/threads.js.git#b00a5342380b0278d3ae56dcfb170effb3cad7cd", + "integrity": "sha512-7ej6ZkM5NHJhEGiOf+pxt+LqJebbvKrXLfvcpOy1X6PIop3ap+a3ULgx+VVdYj8Nw6jYVGYTHCriVtTyoWZN9Q==", + "license": "MIT", "dependencies": { "callsites": "^3.1.0", "debug": "^4.2.0", @@ -4483,9 +4613,9 @@ "integrity": "sha512-FIr/DEeoHfj7ftfylnoFt3rAIRoWXpx2AoDfrT2qD2wtp7Dp+COajvs/Icb7uHqRW9m60f5iXZwdsJJO3kvb7w==" }, "node_modules/zod": { - "version": "3.22.2", - "resolved": "https://registry.npmjs.org/zod/-/zod-3.22.2.tgz", - "integrity": "sha512-wvWkphh5WQsJbVk1tbx1l1Ly4yg+XecD+Mq280uBGt9wa5BKSWf4Mhp6GmrkPixhMxmabYY7RbzlwVP32pbGCg==", + "version": "3.22.4", + "resolved": "https://registry.npmjs.org/zod/-/zod-3.22.4.tgz", + "integrity": "sha512-iC+8Io04lddc+mVqQ9AZ7OQ2MrUKGN+oIQyq1vemgt46jwCwLfhq7/pwnBnNXXXZb8VTVLKwp9EDkx+ryxIWmg==", "funding": { "url": "https://github.com/sponsors/colinhacks" } diff --git a/gateway/package.json b/gateway/package.json index 1a085e2..03a9a05 100644 --- a/gateway/package.json +++ b/gateway/package.json @@ -21,7 +21,7 @@ "author": "Fluence Labs", "license": "Apache-2.0", "dependencies": { - "@fluencelabs/js-client": "0.1.7", + "@fluencelabs/js-client": "0.3.0", "@fluencelabs/marine-worker": "0.3.3", "body-parser": "1.20.2", "express": "4.18.2", diff --git a/gateway/src/index.js b/gateway/src/index.js index 7c7ddef..7242340 100644 --- a/gateway/src/index.js +++ b/gateway/src/index.js @@ -41,7 +41,7 @@ const route = "/"; const server = new JSONRPCServer(); // initialize fluence client -await Fluence.connect(config.relay); +await Fluence.connect(config.relay, {}); const peerId = (await Fluence.getClient()).getPeerId(); // handler for logger