diff --git a/README.md b/README.md
index fdeffc6..cb8c594 100644
--- a/README.md
+++ b/README.md
@@ -511,23 +511,24 @@ Note that the deal's section in [fluence.yaml](./fluence.yaml) specifies the min
 
 Now that we have our services deployed and ready for action, it's time to look at Aqua, which is utilized by the Gateway to bridge HTTP to/from libp2p. Let's have a look at the Aqua code and structure.
 
-[rpc.aqua]("./gateway/aqua/rpc.aqua") contains the necessary dependencies and algorithms.
+[rpc.aqua](./gateway/aqua/rpc.aqua) defines the fRPC algorithms and the entrypoints used by the gateway.
 
 ```aqua
 -- rpc.aqua
+aqua RPC
+
 import "@fluencelabs/aqua-lib/builtin.aqua"
-import "deals.aqua"
+import Subnet, Worker from "@fluencelabs/aqua-lib/subnet.aqua"
+
 import "services.aqua"
-import "@fluencelabs/registry/subnetwork.aqua"
-import Registry, Record from "@fluencelabs/registry/registry-service.aqua"
-import "@fluencelabs/spell/spell_service.aqua"
+use "deals.aqua"
 ```
 
-Two of the dependencies (should) stand out: *deals.aqua* and *services.aqua* as they are local files located in the project *.fluence* directory: *services.aqua* contains the interface exports from the *eth-rpc* wasm module and *deals.aqua* maps the values from *deployed.yaml* to data structures usable by your aqua code. Since these files are dynamically generated by Fluence CLI, you need to (re-) compile your Aqua after every change to your Wasm code or deal deploy updates. For all things Aqua refer to the [Aqua book](https://fluence.dev/docs/aqua-book/introduction), the [aqua playground](https://github.com/fluencelabs/aqua-playground) and the respective repos: [aqua-lib](https://github.com/fluencelabs/aqua-lib), [registry](https://github.com/fluencelabs/registry), [spell](https://github.com/fluencelabs/spell).
+Two of the dependencies stand out: *deals.aqua* and *services.aqua*, as they are local files located in the project's *.fluence* directory: *services.aqua* contains the interface exports from the *eth-rpc* Wasm module, and *deals.aqua* maps the values from *deployed.yaml* to data structures usable by your Aqua code. Since these files are dynamically generated by Fluence CLI, you need to (re)compile your Aqua after every change to your Wasm code or deal deployment updates. For further details and examples, consult the [Aqua book](https://fluence.dev/docs/aqua-book/introduction), explore the [aqua playground](https://github.com/fluencelabs/aqua-playground) and visit the relevant repositories: [aqua-lib](https://github.com/fluencelabs/aqua-lib), [registry](https://github.com/fluencelabs/registry), [spell](https://github.com/fluencelabs/spell).
 
 ### fRPC Gateway Configuration
 
-The gateway config file, e.g., [quickstart_config.json](./configs/quickstart_config.json), contains the parameters necessary for the gateway to connect to the Fluence p2p network and to configure the gateway's behavior. The gateway config file is a json file with the following parameters:
+The gateway config file, e.g., [quickstart_config.json](./configs/quickstart_config.json), is a JSON file containing the parameters for connecting to the Fluence p2p network and for configuring the gateway's behavior. Key parameters include:
 
 * *providers*: an array of RPC endpoint urls, e.g., Infura, Alchemy, Ankr, etc.
* *mode*: one of "random", "round-robin" or "quorum" to specify the endpoint selection algorithm @@ -537,7 +538,31 @@ The gateway config file, e.g., [quickstart_config.json](./configs/quickstart_con ### fRPC Algorithms -The fRPC substrate comes with basic implementations of several algorithms useful in mitigating failure as the result of availability and lack of trustlessness with respect to RPC endpoints and possibly peers. +The fRPC substrate offers basic algorithms to enhance reliability, addressing issues related to RPC endpoint availability and trustworthiness. + +Let's first examine *balancedEthCall* in [rpc.aqua]("./gateway/aqua/rpc.aqua"): + +```aqua +-- Call RPC method with load balancing +func balancedEthCall{Logger, Balancer}(method: string, jsonArgs: []string) -> JsonString: -- (1) + on HOST_PEER_ID: -- (2) + worker, provider <- Balancer.next() -- (3) + Logger.logWorker(worker) -- (4) + Logger.logCall(provider) -- (4) + rpc <- fromWorkerProvider(worker, provider) -- (5) + result <- rpcCall{rpc}(method, jsonArgs) -- (6) + <- result -- (7) +``` + +This function is a building block for other algorithms that allows to make a call to a RPC endpoint with some balancing logic. Let's go through the code line by line: + +* (1) Function declaration states that two abilities are required to execute it: *Logger* and *Balancer*. To learn more about abilities, see [Abilities](https://fluence.dev/docs/aqua-book/language/abilities). +* (2) The function is executed on the host peer, i.e. the relay peer we used to connect to Fluence p2p network. +* (3) Worker and RPC provider are determined by *Balancer*. +* (4) Worker and provider are logged for debugging purposes. +* (5) RPC ability is created from worker and provider with a helper function *fromWorkerProvider*. +* (6) RPC ability is passed to *rpcCall* function to make the actual call. +* (7) Result of the call is returned. #### Random @@ -547,26 +572,28 @@ Randomization the selection of one out of many RPC endpoints by itself is a weak The fRPC substrate implementation is very basic from a business logic perspective but illustrates how to randomly choose both a worker, which represents the deployed service on a particular peer, and an RPC endpoint: -```python -func randomLoadBalancing(uris: []string, method: string, jsonArgs: []string, callFunc: string, string, []string -> JsonString) -> JsonString: - on HOST_PEER_ID: --< 1 - workers <- getWorkers() --< 2 - workersNum = workers.length - -- choose worker randomly - timeW <- NumOp.identity(Peer.timestamp_sec()) - workerNumber = timeW % workers.length --< 3 - worker = workers[workerNumber] - -- choose provider randomly - timeP <- NumOp.identity(Peer.timestamp_sec()) - providerNumber = timeP % uris.length --< 5 - provider = uris[providerNumber] - result <- callOnWorker(worker, provider, method, jsonArgs, callFunc) --< 6 - Logger.logWorker(worker) --< 7 - Logger.logCall(uris[providerNumber]) - <- result +```aqua +func randomLoadBalancingEth(uris: []string, method: string, jsonArgs: []string) -> JsonString: + result: ?JsonString + + workers, error <- getWorkers() -- (1) + if error != nil: + result <- errorJsonString(error!) + else: + log <- initPeerLogger() -- (2) + random <- timeRandom() -- (2) + balancer <- randomBalancer{random}(workers, uris) -- (2) + result <- balancedEthCall{log, balancer}(method, jsonArgs) -- (3) + + <- result! ``` -We want to execute our Aqua program on the peer of the client, i.e. the gateway's client peer, connected to (1). 
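+
+To make the selection mechanics concrete, here is a small JavaScript sketch (illustrative only, not part of the gateway sources) of the timestamp-modulo selection the Aqua code performs; it also shows why timestamps are weak randomness: requests arriving within the same second pick the same worker and provider.
+
+```js
+// Hypothetical sketch of the `timestamp % length` selection above.
+const workers = ["worker-a", "worker-b", "worker-c"];
+const providers = ["https://rpc-1.example", "https://rpc-2.example"];
+
+function pick(items) {
+  const t = Math.floor(Date.now() / 1000); // seconds, like Peer.timestamp_sec()
+  return items[t % items.length];
+}
+
+// Two calls within the same second deterministically select the same pair.
+console.log(pick(workers), pick(providers));
+```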
 
 #### Round robin
 
@@ -576,24 +603,29 @@ We want to execute our Aqua program on the peer of the client, i.e. the gateway'
 * *counterServiceId*: the service id of the counter service
 * *counterPeerId*: the peer id of the counter service
 
+A round robin algorithm cycles through the different options, usually in a predictable manner. This substrate implementation is no different:
+
 ```aqua
-func roundRobin(uris: []string, method: string, jsonArgs: []string, counterServiceId: string, counterPeerId: string, callFunc: string, string, []string -> JsonString) -> JsonString:
-    on counterPeerId:  --< 1
-        Counter counterServiceId
-        requestNumber <- Counter.incrementAndReturn()
-    on HOST_PEER_ID:  --< 2
-        workers <- getWorkers()
-        workerNumber = requestNumber % workers.length
-        worker = workers[workerNumber]  --< 3
-        providerNumber = requestNumber % uris.length
-        provider = uris[providerNumber]  --< 4
-        result <- callOnWorker(worker, provider, method, jsonArgs, callFunc)  --< 5
-        Logger.logWorker(worker)
-        Logger.logCall(uris[providerNumber])
-        <- result
+func roundRobinEth(uris: []string, method: string, jsonArgs: []string, counterServiceId: string, counterPeerId: string) -> JsonString:
+    result: ?JsonString
+
+    workers, error <- getWorkers()
+    if error != nil:
+        result <- errorJsonString(error!)
+    else:
+        log <- initPeerLogger()
+        counter <- onPeerCounter(counterPeerId, counterServiceId) -- (1)
+        balancer <- cycleBalancer{counter}(workers, uris) -- (2)
+        result <- balancedEthCall{log, balancer}(method, jsonArgs)
+
+    <- result!
 ```
 
-A round robin algorithm cycles through the different options usually in a predictable manner. This substrate implementation is no different. To keep the state of the *cycle index*, we use a counter based on a local, [js-client](https://github.com/fluencelabs/js-client) based service [implementation]("./gateway/src/index.js) (1). Here, the peer executing the *Counter* service is the (local) client-peer implemented by the gateway. Note that the state of the counter is limited to the life of the gateway. With the incremented counter in place, we had to our relay (2), determine our worker (3) and provider (4) indexes, call for the service execution (5), log and return the result.
+The *roundRobinEth* function is very similar to *randomLoadBalancingEth*, except for the balancer:
+* (1) A Counter ability is created from the peer id and service id.
+* (2) A cycle balancer is created from the counter and the workers.
+
+To keep the state of the *cycle index*, we use a local [service](./gateway/src/index.js) implemented with [js-client](https://github.com/fluencelabs/js-client); a sketch of its counter logic follows below. The peer executing the *Counter* service is the (local) client-peer implemented by the gateway. Note that the state of the counter is limited to the life of the gateway.
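+
+For illustration, here is a minimal JavaScript sketch (assumed names; the actual registration lives in [gateway/src/index.js](./gateway/src/index.js)) of the counter state and of how the cycle balancer derives its index from it:
+
+```js
+// The gateway-side counter service simply increments on every request.
+let counter = 0;
+const incrementAndReturn = () => ++counter;
+
+// The cycle balancer then walks the workers (and providers) deterministically.
+const workers = ["worker-a", "worker-b", "worker-c"];
+for (let i = 0; i < 5; i++) {
+  const n = incrementAndReturn();
+  console.log(n, workers[n % workers.length]); // b, c, a, b, c
+}
+```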
 
 #### Quorum
 
@@ -604,34 +636,57 @@ A round robin algorithm cycles through the different options usually in a predic
 * *quorumPeerId*: the peer id of the quorum service
 * *quorumNumber*: the number of results that must be equal to determine a quorum result
 
+A quorum, aka "off-chain consensus", determines a result from a ranked frequency distribution of the results pool and makes a selection against a quorum threshold value, e.g., 2/3 of the items in the results pool must be equal for a quorum result to be accepted. Moreover, additional parameters, such as a minimum number of items in the results pool, may be added. Depending on your trust in the peers processing the endpoint requests, or even in the peer executing the quorum algorithm, additional verification steps may have to be added. There is one more pertinent consideration when designing quorum algorithms: the differentiation between (on-chain) read and write operations.
+
+In the fRPC substrate implementation, we provide a basic quorum algorithm:
+
 ```aqua
-func quorum(
-    uris: []string, quorumNumber: u32, timeout: u32, method: string, jsonArgs: []string, quorumServiceId: string, quorumPeerId: string,
-    callFunc: string, string, []string -> JsonString
-) -> QuorumResult:
+func quorum{ProviderBalancer, QuorumChecker}(workers: []Worker, quorumNumber: u32, timeout: u32, method: string, jsonArgs: []string) -> QuorumResult: -- (1)
     results: *JsonString
-    on HOST_PEER_ID:
-        workers <- getWorkers()
-        for worker <- workers par:  --< 1
-            on worker.metadata.peer_id via worker.metadata.relay_id:
-                -- choose provider randomly
-                timeP <- NumOp.identity(Peer.timestamp_ms())
-                providerNumber = timeP % uris.length
-                provider = uris[providerNumber]
-                results <- callFunc(provider, method, jsonArgs)  --< 2
-        -- wait all results from all workers with timeout
-        join results[workers.length - 1]
-        par Peer.timeout(timeout, "")  --< 3
-    on quorumPeerId via HOST_PEER_ID:
-        Counter quorumServiceId
-        -- check all results that we got
-        quorumResult <- QuorumChecker.check(results, quorumNumber)  --< 4
-    <- quorumResult
+    on HOST_PEER_ID: -- (2)
+        for worker <- workers par: -- (3)
+            provider <- ProviderBalancer.nextProvider() -- (4)
+            rpc <- fromWorkerProvider(worker, provider) -- (5)
+            results <- rpcCall{rpc}(method, jsonArgs) -- (6)
+
+        -- wait all results from all workers with timeout
+        join results[workers.length - 1] -- (7)
+        par Peer.timeout(timeout, "Workers timeout") -- (7)
+
+    <- QuorumChecker.check(results, quorumNumber) -- (8)
 ```
 
-A quorum, aka "off-chain consensus", "determines" a result by a ranked frequency distribution of the results pool and makes a selection against a quorum threshold value, e.g., 2/3 of items in the results pool must be equal for a quorum result to be accepted. Moreover, additional parameters such as the minimum number of items in the result pool may be added. Depending on your trust in the peers processing the endpoint requests or even the peer executing the quorum algorithm, additional verification steps may have to be added. There is one more pertinent consideration when it comes to designing quorum algorithms: the differentiation between (on-chain) read and write operations.
+Let's examine the code line by line:
+* (1) The function declaration states that two abilities are required to execute it: *ProviderBalancer* and *QuorumChecker*. To learn more about abilities, see [Abilities](https://fluence.dev/docs/aqua-book/language/abilities).
+* (2) The function is executed on the host peer, i.e. the relay peer we used to connect to the Fluence p2p network.
+* (3) For each worker in parallel:
+    * (4) A provider is determined by *ProviderBalancer*.
+    * (5) An RPC ability is created from the worker and provider with the helper function *fromWorkerProvider*.
+    * (6) The RPC ability is passed to the *rpcCall* function to make the actual call. The result is saved in the *results* stream variable.
+* (7) The results from all workers are awaited with a timeout. For more information, see [Timeout and race patterns](https://fluence.dev/docs/aqua-book/language/flow/parallel#timeout-and-race-patterns).
+* (8) The final result is determined by *QuorumChecker* based on all results gathered at this point.
+
+As evidenced by the code, no attempt is made to differentiate between read and write operations, which might prove disadvantageous when submitting, for example, a signed transaction.
+
+The actual entrypoint, *quorumEth*, is a wrapper around *quorum*:
+
+```aqua
+func quorumEth(uris: []string, quorumNumber: u32, timeout: u32, method: string, jsonArgs: []string, quorumServiceId: string, quorumPeerId: string) -> QuorumResult:
+    result: *QuorumResult
+
+    workers, error <- getWorkers()
+    if error != nil:
+        result <- errorQuorumResult(error!)
+    else:
+        random <- timeRandom()
+        balancer <- randomBalancer{random}(workers, uris)
+        quorumChecker <- onPeerQuorumChecker(quorumPeerId, quorumServiceId)
+        result <- quorum{balancer, quorumChecker}(workers, quorumNumber, timeout, method, jsonArgs)
+
+    <- result!
+```
 
-In the fRPC substrate implementation, we provide a basic quorum algo that polls each endpoint in parallel (1) and captures the results in a stream variable (2) and binds the loop with a timeout condition (3) running in parallel to (1). See the [Aqua book](https://fluence.dev/docs/aqua-book/language/flow/parallel#timeout-and-race-patterns) for more details. Finally, we check the results and return the result (4). As evidenced by the code, no considerations to differentiate between read and write operations are made, which might prove disadvantageous when submitting, for example, a signed transaction.
+It is very similar to *randomLoadBalancingEth* and *roundRobinEth*, except for the balancer and quorum checker initialization. To determine the quorum result, we use a local [service](./gateway/src/index.js) implemented with [js-client](https://github.com/fluencelabs/js-client). The peer executing the *QuorumChecker* service is the (local) client-peer implemented by the gateway.
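+
+Whichever mode is configured, the gateway exposes a standard JSON-RPC 2.0 HTTP endpoint, so any Ethereum tooling or plain HTTP client can talk to it. A minimal client sketch (the port is an assumption; use the *port* value from your config, and note that the method must be among the methods proxied by the gateway):
+
+```js
+// Hypothetical request against a locally running gateway.
+const res = await fetch("http://localhost:3000/", {
+  method: "POST",
+  headers: { "Content-Type": "application/json" },
+  body: JSON.stringify({
+    jsonrpc: "2.0",
+    id: 1,
+    method: "eth_blockNumber",
+    params: [],
+  }),
+});
+console.log(await res.json()); // JSON-RPC response produced via fRPC
+```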
 
 ## Summary
diff --git a/gateway/aqua/balancer.aqua b/gateway/aqua/balancer.aqua
new file mode 100644
index 0000000..980cb00
--- /dev/null
+++ b/gateway/aqua/balancer.aqua
@@ -0,0 +1,57 @@
+module Balancer declares Balancer, ProviderBalancer, WorkersBalancer, randomBalancer, cycleBalancer
+
+import Worker from "@fluencelabs/aqua-lib/subnet.aqua"
+
+import Counter from "counter.aqua"
+import Random from "random.aqua"
+import Provider from "provider.aqua"
+
+ability WorkersBalancer:
+    nextWorker() -> Worker
+
+ability ProviderBalancer:
+    nextProvider() -> Provider
+
+ability Balancer:
+    nextWorker() -> Worker
+    nextProvider() -> Provider
+    next() -> Worker, Provider
+
+-- Create balancer that returns
+-- workers and providers in random order
+func randomBalancer{Random}(workers: []Worker, providers: []Provider) -> Balancer:
+    -- closures do not capture topology here
+    nextWorker = func () -> Worker:
+        rand <- Random.next()
+        idx = rand % workers.length
+        <- workers[idx]
+
+    nextProvider = func () -> Provider:
+        rand <- Random.next()
+        idx = rand % providers.length
+        <- providers[idx]
+
+    next = func () -> Worker, Provider:
+        <- nextWorker(), nextProvider()
+
+    <- Balancer(next=next, nextWorker=nextWorker, nextProvider=nextProvider)
+
+-- Create balancer that returns
+-- workers and providers in cycle order
+func cycleBalancer{Counter}(workers: []Worker, providers: []Provider) -> Balancer:
+    next = func () -> Worker, Provider:
+        n <- Counter.incrementAndReturn()
+        -- index workers and providers separately: their lengths may differ
+        wIdx = n % workers.length
+        pIdx = n % providers.length
+        <- workers[wIdx], providers[pIdx]
+
+    nextWorker = func () -> Worker:
+        w, p <- next()
+        <- w
+
+    nextProvider = func () -> Provider:
+        w, p <- next()
+        <- p
+
+    <- Balancer(next=next, nextWorker=nextWorker, nextProvider=nextProvider)
\ No newline at end of file
diff --git a/gateway/aqua/counter.aqua b/gateway/aqua/counter.aqua
new file mode 100644
index 0000000..5f521bd
--- /dev/null
+++ b/gateway/aqua/counter.aqua
@@ -0,0 +1,22 @@
+module Counter declares Counter, onPeerCounter
+
+export CounterSrv
+
+ability Counter:
+    incrementAndReturn() -> u32
+
+service CounterSrv("counter"):
+    incrementAndReturn() -> u32
+
+-- Create Counter ability that
+-- counts on peer through CounterSrv(id)
+func onPeerCounter(peer: string, id: string) -> Counter:
+    -- closure does not capture topology here
+    incAndReturn = func () -> u32:
+        on peer:
+            CounterSrv id
+            res <- CounterSrv.incrementAndReturn()
+        <- res
+
+    <- Counter(incrementAndReturn = incAndReturn)
+
diff --git a/gateway/aqua/eth_rpc.aqua b/gateway/aqua/eth_rpc.aqua
new file mode 100644
index 0000000..8a0d6bc
--- /dev/null
+++ b/gateway/aqua/eth_rpc.aqua
@@ -0,0 +1,22 @@
+module RPCEth declares RPCEth, fromWorkerProvider
+
+import Worker from "@fluencelabs/aqua-lib/subnet.aqua"
+
+import "services.aqua"
+
+import Provider from "provider.aqua"
+
+-- Ability to call Ethereum JSON RPC methods
+ability RPCEth:
+    call(method: string, jsonArgs: []string) -> JsonString
+
+-- Create RPCEth ability from Worker and Provider
+func fromWorkerProvider(worker: Worker, provider: Provider) -> RPCEth:
+    -- closure does not capture topology here
+    call = func (method: string, jsonArgs: []string) -> JsonString:
+        -- TODO: Handle worker_id == nil?
+        on worker.worker_id!
via worker.host_id: + res <- EthRpc.eth_call(provider, method, jsonArgs) + <- res + + <- RPCEth(call = call) diff --git a/gateway/aqua/logger.aqua b/gateway/aqua/logger.aqua new file mode 100644 index 0000000..af03a5c --- /dev/null +++ b/gateway/aqua/logger.aqua @@ -0,0 +1,38 @@ +module Logger declares Logger, initPeerLogger + +export LoggerSrv + +import Worker from "@fluencelabs/aqua-lib/subnet.aqua" + +ability Logger: + log(s: []string) + logNum(n: u32) + logCall(s: string) + logWorker(w: Worker) + +service LoggerSrv("logger"): + log(s: []string) + logNum(n: u32) + logCall(s: string) + logWorker(w: Worker) + +-- Create Logger ability that logs +-- on INIT_PEER_ID via HOST_PEER_ID +-- through LoggerSrv +func initPeerLogger() -> Logger: + -- closures do not capture topology here + + log = func (s: []string): + on INIT_PEER_ID via HOST_PEER_ID: + LoggerSrv.log(s) + logNum = func (n: u32): + on INIT_PEER_ID via HOST_PEER_ID: + LoggerSrv.logNum(n) + logCall = func (s: string): + on INIT_PEER_ID via HOST_PEER_ID: + LoggerSrv.logCall(s) + logWorker = func (w: Worker): + on INIT_PEER_ID via HOST_PEER_ID: + LoggerSrv.logWorker(w) + + <- Logger(log=log, logNum=logNum, logCall=logCall, logWorker=logWorker) \ No newline at end of file diff --git a/gateway/aqua/provider.aqua b/gateway/aqua/provider.aqua new file mode 100644 index 0000000..8fc08a4 --- /dev/null +++ b/gateway/aqua/provider.aqua @@ -0,0 +1,3 @@ +module Provider declares Provider + +alias Provider: string \ No newline at end of file diff --git a/gateway/aqua/quorum.aqua b/gateway/aqua/quorum.aqua new file mode 100644 index 0000000..b8f303a --- /dev/null +++ b/gateway/aqua/quorum.aqua @@ -0,0 +1,29 @@ +module Quorum declares QuorumChecker, QuorumResult, onPeerQuorumChecker + +import JsonString from "services.aqua" + +export QuorumCheckerSrv + +data QuorumResult: + value: string + results: []JsonString + error: string + +-- Ability to check if a quorum on results is reached +ability QuorumChecker: + check(results: []JsonString, minResults: u32) -> QuorumResult + +service QuorumCheckerSrv("quorum"): + check(results: []JsonString, minResults: u32) -> QuorumResult + +-- Create a QuorumChecker ability +-- that checks quorum on peer through QuorumCheckerSrv(id) +func onPeerQuorumChecker(peer: string, id: string) -> QuorumChecker: + -- closure does not capture topology here + check = func (results: []JsonString, minResults: u32) -> QuorumResult: + on peer: + QuorumCheckerSrv id + res <- QuorumCheckerSrv.check(results, minResults) + <- res + + <- QuorumChecker(check = check) \ No newline at end of file diff --git a/gateway/aqua/random.aqua b/gateway/aqua/random.aqua new file mode 100644 index 0000000..5863635 --- /dev/null +++ b/gateway/aqua/random.aqua @@ -0,0 +1,18 @@ +module Random declares Random, timeRandom + +import Peer from "@fluencelabs/aqua-lib/builtin.aqua" + +import NumOp from "utils.aqua" + +ability Random: + next() -> i64 + +-- Create random from timestamp +func timeRandom() -> Random: + -- closure does not capture topology here + next = func () -> i64: + t <- Peer.timestamp_sec() + n <- NumOp.identity(t) + <- n + + <- Random(next = next) \ No newline at end of file diff --git a/gateway/aqua/rpc.aqua b/gateway/aqua/rpc.aqua index 2754d32..328f39b 100644 --- a/gateway/aqua/rpc.aqua +++ b/gateway/aqua/rpc.aqua @@ -1,118 +1,108 @@ +aqua RPC + import "@fluencelabs/aqua-lib/builtin.aqua" import Subnet, Worker from "@fluencelabs/aqua-lib/subnet.aqua" + import "services.aqua" use "deals.aqua" -export randomLoadBalancingEth, roundRobinEth, 
quorumEth, Counter, Logger - -data QuorumResult: - value: string - results: []JsonString - error: string - -service Logger("logger"): - log(s: []string) - logNum(n: u32) - logCall(s: string) - logWorker(w: Worker) +import Logger, initPeerLogger from "logger.aqua" +import Balancer, ProviderBalancer, randomBalancer, cycleBalancer from "balancer.aqua" +import onPeerCounter from "counter.aqua" +import QuorumChecker, QuorumResult, onPeerQuorumChecker from "quorum.aqua" +import timeRandom from "random.aqua" +import RPCEth, fromWorkerProvider from "eth_rpc.aqua" -service NumOp("op"): - identity(n: u64) -> i64 +import NumOp from "utils.aqua" -service Counter("counter"): - incrementAndReturn() -> u32 +export randomLoadBalancingEth, roundRobinEth, quorumEth -service QuorumChecker("quorum"): - check(results: []JsonString, minResults: u32) -> QuorumResult +func errorQuorumResult(msg: string) -> QuorumResult: + <- QuorumResult(value = "", results = [], error = msg) -func empty() -> JsonString: - <- JsonString(value = "", success = true, error = "") - -func call(uri: string, method: string, jsonArgs: []string) -> JsonString: - res <- EthRpc.eth_call(uri, method, jsonArgs) - <- res +func errorJsonString(msg: string) -> JsonString: + <- JsonString(value = "", success = false, error = msg) +-- Get workers participating in deal func getWorkers() -> []Worker, ?string: - deals <- Deals.get() + on INIT_PEER_ID via HOST_PEER_ID: + deals <- Deals.get() dealId = deals.defaultWorker!.dealIdOriginal on HOST_PEER_ID: result <- Subnet.resolve(dealId) - workers = result.workers - error = result.error - <- workers, error + <- result.workers, result.error -func callOnWorker(worker: Worker, uri: string, method: string, jsonArgs: []string, callFunc: string, string, []string -> JsonString) -> JsonString: - on worker.worker_id! 
via worker.host_id: - result <- callFunc(uri, method, jsonArgs) - <- result +-- Call RPC method through ability +func rpcCall{RPCEth}(method: string, jsonArgs: []string) -> JsonString: + <- RPCEth.call(method, jsonArgs) -func randomLoadBalancing(uris: []string, method: string, jsonArgs: []string, callFunc: string, string, []string -> JsonString) -> JsonString: +-- Call RPC method with load balancing +func balancedEthCall{Logger, Balancer}(method: string, jsonArgs: []string) -> JsonString: on HOST_PEER_ID: - workers <- getWorkers() - workersNum = workers.length - -- choose worker randomly - timeW <- NumOp.identity(Peer.timestamp_sec()) - workerNumber = timeW % workers.length - worker = workers[workerNumber] - -- choose provider randomly - timeP <- NumOp.identity(Peer.timestamp_sec()) - providerNumber = timeP % uris.length - provider = uris[providerNumber] - result <- callOnWorker(worker, provider, method, jsonArgs, callFunc) - Logger.logWorker(worker) - Logger.logCall(uris[providerNumber]) + worker, provider <- Balancer.next() + Logger.logWorker(worker) + Logger.logCall(provider) + Op.noop() -- dirty hack for topology to converge + rpc <- fromWorkerProvider(worker, provider) + result <- rpcCall{rpc}(method, jsonArgs) <- result +-- Call RPC method with random load balancing func randomLoadBalancingEth(uris: []string, method: string, jsonArgs: []string) -> JsonString: - <- randomLoadBalancing(uris, method, jsonArgs, call) - -func roundRobin(uris: []string, method: string, jsonArgs: []string, counterServiceId: string, counterPeerId: string, callFunc: string, string, []string -> JsonString) -> JsonString: - on counterPeerId: - Counter counterServiceId - requestNumber <- Counter.incrementAndReturn() - on HOST_PEER_ID: - workers, error <- getWorkers() - result: *JsonString - if error == nil: - workerNumber = requestNumber % workers.length - worker = workers[workerNumber] - providerNumber = requestNumber % uris.length - provider = uris[providerNumber] - result <<- callOnWorker(worker, provider, method, jsonArgs, callFunc) - Op.noop() -- dirty hack - on INIT_PEER_ID: - Logger.logWorker(worker) - Logger.logCall(uris[providerNumber]) - else: - result <<- JsonString(value = "", success = false, error = error!) - + result: ?JsonString + + workers, error <- getWorkers() + if error != nil: + result <- errorJsonString(error!) + else: + log <- initPeerLogger() + random <- timeRandom() + balancer <- randomBalancer{random}(workers, uris) + result <- balancedEthCall{log, balancer}(method, jsonArgs) + <- result! +-- Call RPC method with round-robin load balancing func roundRobinEth(uris: []string, method: string, jsonArgs: []string, counterServiceId: string, counterPeerId: string) -> JsonString: - <- roundRobin(uris, method, jsonArgs, counterServiceId, counterPeerId, call) + result: ?JsonString + + workers, error <- getWorkers() + if error != nil: + result <- errorJsonString(error!) + else: + log <- initPeerLogger() + counter <- onPeerCounter(counterPeerId, counterServiceId) + balancer <- cycleBalancer{counter}(workers, uris) + result <- balancedEthCall{log, balancer}(method, jsonArgs) + + <- result! 
-func quorum( - uris: []string, quorumNumber: u32, timeout: u32, method: string, jsonArgs: []string, quorumServiceId: string, quorumPeerId: string, - callFunc: string, string, []string -> JsonString -) -> QuorumResult: +-- Call RPC method with workers quorum and provider load balancing +func quorum{ProviderBalancer, QuorumChecker}(workers: []Worker, quorumNumber: u32, timeout: u32, method: string, jsonArgs: []string) -> QuorumResult: results: *JsonString - on HOST_PEER_ID: - workers <- getWorkers() + on HOST_PEER_ID: for worker <- workers par: - on worker.worker_id! via worker.host_id: - -- choose provider randomly - timeP <- NumOp.identity(Peer.timestamp_ms()) - providerNumber = timeP % uris.length - provider = uris[providerNumber] - results <- callFunc(provider, method, jsonArgs) - -- wait all results from all workers with timeout - join results[workers.length - 1] - par Peer.timeout(timeout, "") - on quorumPeerId via HOST_PEER_ID: - Counter quorumServiceId - -- check all results that we got - quorumResult <- QuorumChecker.check(results, quorumNumber) - <- quorumResult + provider <- ProviderBalancer.nextProvider() + rpc <- fromWorkerProvider(worker, provider) + results <- rpcCall{rpc}(method, jsonArgs) + + -- wait all results from all workers with timeout + join results[workers.length - 1] + par Peer.timeout(timeout, "Workers timeout") + + <- QuorumChecker.check(results, quorumNumber) +-- Call RPC method with workers quorum and provider load balancing func quorumEth(uris: []string, quorumNumber: u32, timeout: u32, method: string, jsonArgs: []string, quorumServiceId: string, quorumPeerId: string) -> QuorumResult: - <- quorum(uris, quorumNumber, timeout, method, jsonArgs, quorumServiceId, quorumPeerId, call) \ No newline at end of file + result: *QuorumResult + + workers, error <- getWorkers() + if error != nil: + result <- errorQuorumResult(error!) + else: + random <- timeRandom() + balancer <- randomBalancer{random}(workers, uris) + quorumChecker <- onPeerQuorumChecker(quorumPeerId, quorumServiceId) + result <- quorum{balancer, quorumChecker}(workers, quorumNumber, timeout, method, jsonArgs) + + <- result! 
\ No newline at end of file diff --git a/gateway/aqua/utils.aqua b/gateway/aqua/utils.aqua new file mode 100644 index 0000000..08c5d7e --- /dev/null +++ b/gateway/aqua/utils.aqua @@ -0,0 +1,5 @@ +module Utils declares NumOp + +-- Used to coerce types +service NumOp("op"): + identity(n: u64) -> i64 \ No newline at end of file diff --git a/gateway/package-lock.json b/gateway/package-lock.json index b5cd8b2..015cbe1 100644 --- a/gateway/package-lock.json +++ b/gateway/package-lock.json @@ -9,7 +9,7 @@ "version": "0.0.18", "license": "Apache-2.0", "dependencies": { - "@fluencelabs/js-client": "0.1.7", + "@fluencelabs/js-client": "0.3.0", "@fluencelabs/marine-worker": "0.3.3", "body-parser": "1.20.2", "express": "4.18.2", @@ -625,19 +625,22 @@ } }, "node_modules/@fluencelabs/js-client": { - "version": "0.1.7", - "resolved": "https://registry.npmjs.org/@fluencelabs/js-client/-/js-client-0.1.7.tgz", - "integrity": "sha512-S9chuqlOcPMWFjVjL099WvnjeLdEWJoMIwnxXJ5zlLdRXH0qqNCeZjgtFhvaP+6k1EcxIAksHT5hK5T5aV/q8g==", + "version": "0.3.0", + "resolved": "https://registry.npmjs.org/@fluencelabs/js-client/-/js-client-0.3.0.tgz", + "integrity": "sha512-mRlEeoDEAsHK4GK3vIlNBkXgfJ01maQ4WVefob4QNEqpshipf6XQpU6R8dpUsjyhx53nus3ui6BSUV6gi5jg8A==", "dependencies": { "@chainsafe/libp2p-noise": "13.0.0", "@chainsafe/libp2p-yamux": "5.0.0", + "@fluencelabs/avm": "0.52.0", "@fluencelabs/interfaces": "0.8.2", + "@fluencelabs/marine-worker": "0.4.0", "@libp2p/crypto": "2.0.3", "@libp2p/interface": "0.1.2", "@libp2p/peer-id": "3.0.2", "@libp2p/peer-id-factory": "3.0.3", "@libp2p/websockets": "7.0.4", "@multiformats/multiaddr": "11.3.0", + "assert": "2.1.0", "async": "3.2.4", "bs58": "5.0.0", "buffer": "6.0.3", @@ -649,16 +652,32 @@ "libp2p": "0.46.6", "multiformats": "11.0.1", "rxjs": "7.5.5", - "threads": "1.7.0", + "threads": "github:fluencelabs/threads.js#b00a5342380b0278d3ae56dcfb170effb3cad7cd", "ts-pattern": "3.3.3", "uint8arrays": "4.0.3", - "uuid": "8.3.2" + "uuid": "8.3.2", + "zod": "3.22.4" }, "engines": { "node": ">=10", "pnpm": ">=8" } }, + "node_modules/@fluencelabs/js-client/node_modules/@fluencelabs/avm": { + "version": "0.52.0", + "resolved": "https://registry.npmjs.org/@fluencelabs/avm/-/avm-0.52.0.tgz", + "integrity": "sha512-T+/Hv/ZPfwWZAC4tH6wEDIRmtN6cTBebqbCaHfaq2PDLSMG0BgssdFF2BAaEXrvOvJbI5Bu/9bQhHv4ga7bYlA==" + }, + "node_modules/@fluencelabs/js-client/node_modules/@fluencelabs/marine-worker": { + "version": "0.4.0", + "resolved": "https://registry.npmjs.org/@fluencelabs/marine-worker/-/marine-worker-0.4.0.tgz", + "integrity": "sha512-nWri+j8Ey4UXoB32NPKsmVYzUKj6mwD7vh/5MjzCxrnVthnWnFdnkETF2BnZwjZWc701xeVhF3L5ZSjiQzKywQ==", + "dependencies": { + "@fluencelabs/marine-js": "0.7.2", + "observable-fns": "0.6.1", + "threads": "github:fluencelabs/threads.js#b00a5342380b0278d3ae56dcfb170effb3cad7cd" + } + }, "node_modules/@fluencelabs/marine-js": { "version": "0.7.2", "resolved": "https://registry.npmjs.org/@fluencelabs/marine-js/-/marine-js-0.7.2.tgz", @@ -1704,6 +1723,18 @@ "resolved": "https://registry.npmjs.org/array-flatten/-/array-flatten-1.1.1.tgz", "integrity": "sha512-PCVAQswWemu6UdxsDFFX/+gVeYqKAod3D3UVm91jHwynguOwAvYPhx8nNlM++NqRcK6CxxpUafjmhIdKiHibqg==" }, + "node_modules/assert": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/assert/-/assert-2.1.0.tgz", + "integrity": "sha512-eLHpSK/Y4nhMJ07gDaAzoX/XAKS8PSaojml3M0DM4JpV1LAi5JOJ/p6H/XWrl8L+DzVEvVCW1z3vWAaB9oTsQw==", + "dependencies": { + "call-bind": "^1.0.2", + "is-nan": "^1.3.2", + "object-is": "^1.1.5", + 
"object.assign": "^4.1.4", + "util": "^0.12.5" + } + }, "node_modules/async": { "version": "3.2.4", "resolved": "https://registry.npmjs.org/async/-/async-3.2.4.tgz", @@ -2064,6 +2095,35 @@ "node": ">=14" } }, + "node_modules/define-data-property": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/define-data-property/-/define-data-property-1.1.1.tgz", + "integrity": "sha512-E7uGkTzkk1d0ByLeSc6ZsFS79Axg+m1P/VsgYsxHgiuc3tFSj+MjMIwe90FC4lOAZzNBdY7kkO2P2wKdsQ1vgQ==", + "dependencies": { + "get-intrinsic": "^1.2.1", + "gopd": "^1.0.1", + "has-property-descriptors": "^1.0.0" + }, + "engines": { + "node": ">= 0.4" + } + }, + "node_modules/define-properties": { + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/define-properties/-/define-properties-1.2.1.tgz", + "integrity": "sha512-8QmQKqEASLd5nx0U1B1okLElbUuuttJ/AnYmRXbbbGDWh6uS208EjD4Xqq/I9wK7u0v6O08XhTWnt5XtEbR6Dg==", + "dependencies": { + "define-data-property": "^1.0.1", + "has-property-descriptors": "^1.0.0", + "object-keys": "^1.1.1" + }, + "engines": { + "node": ">= 0.4" + }, + "funding": { + "url": "https://github.com/sponsors/ljharb" + } + }, "node_modules/delay": { "version": "6.0.0", "resolved": "https://registry.npmjs.org/delay/-/delay-6.0.0.tgz", @@ -2409,19 +2469,22 @@ "integrity": "sha512-FNUvuTAJ3CqCQb5ELn+qCbGR/Zllhf2HtwsdAtBi59s1WeCjKMT81fHcSu7dwIskqGVK+MmOrb7VOBlq3/SItw==" }, "node_modules/function-bind": { - "version": "1.1.1", - "resolved": "https://registry.npmjs.org/function-bind/-/function-bind-1.1.1.tgz", - "integrity": "sha512-yIovAzMX49sF8Yl58fSCWJ5svSLuaibPxXQJFLmBObTuCr0Mf1KiPopGM9NiFjiYBCbfaa2Fh6breQ6ANVTI0A==" + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/function-bind/-/function-bind-1.1.2.tgz", + "integrity": "sha512-7XHNxH7qX9xG5mIwxkhumTox/MIRNcOgDrxWsMt2pAr23WHp6MrRlN7FBSFpCpr+oVO0F744iUgR82nJMfG2SA==", + "funding": { + "url": "https://github.com/sponsors/ljharb" + } }, "node_modules/get-intrinsic": { - "version": "1.2.1", - "resolved": "https://registry.npmjs.org/get-intrinsic/-/get-intrinsic-1.2.1.tgz", - "integrity": "sha512-2DcsyfABl+gVHEfCOaTrWgyt+tb6MSEGmKq+kI5HwLbIYgjgmMcV8KQ41uaKz1xxUcn9tJtgFbQUEVcEbd0FYw==", + "version": "1.2.2", + "resolved": "https://registry.npmjs.org/get-intrinsic/-/get-intrinsic-1.2.2.tgz", + "integrity": "sha512-0gSo4ml/0j98Y3lngkFEot/zhiCeWsbYIlZ+uZOVgzLyLaUw7wxUL+nCTP0XJvJg1AXulJRI3UJi8GsbDuxdGA==", "dependencies": { - "function-bind": "^1.1.1", - "has": "^1.0.3", + "function-bind": "^1.1.2", "has-proto": "^1.0.1", - "has-symbols": "^1.0.3" + "has-symbols": "^1.0.3", + "hasown": "^2.0.0" }, "funding": { "url": "https://github.com/sponsors/ljharb" @@ -2454,15 +2517,15 @@ "url": "https://github.com/sponsors/ljharb" } }, - "node_modules/has": { - "version": "1.0.3", - "resolved": "https://registry.npmjs.org/has/-/has-1.0.3.tgz", - "integrity": "sha512-f2dvO0VU6Oej7RkWJGrehjbzMAjFp5/VKPp5tTpWIV4JHHZK1/BxbFRtf/siA2SWTe09caDmVtYYzWEIbBS4zw==", + "node_modules/has-property-descriptors": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/has-property-descriptors/-/has-property-descriptors-1.0.1.tgz", + "integrity": "sha512-VsX8eaIewvas0xnvinAe9bw4WfIeODpGYikiWYLH+dma0Jw6KHYqWiWfhQlgOVK8D6PvjubK5Uc4P0iIhIcNVg==", "dependencies": { - "function-bind": "^1.1.1" + "get-intrinsic": "^1.2.2" }, - "engines": { - "node": ">= 0.4.0" + "funding": { + "url": "https://github.com/sponsors/ljharb" } }, "node_modules/has-proto": { @@ -2510,6 +2573,17 @@ "minimalistic-assert": "^1.0.1" } }, + "node_modules/hasown": { + "version": "2.0.0", + 
"resolved": "https://registry.npmjs.org/hasown/-/hasown-2.0.0.tgz", + "integrity": "sha512-vUptKVTpIJhcczKBbgnS+RtcuYMB8+oNzPK2/Hp3hanz8JmpATdmmgLgSaadVREkDm+e2giHwY3ZRkyjSIDDFA==", + "dependencies": { + "function-bind": "^1.1.2" + }, + "engines": { + "node": ">= 0.4" + } + }, "node_modules/hmac-drbg": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/hmac-drbg/-/hmac-drbg-1.0.1.tgz", @@ -2662,6 +2736,21 @@ "resolved": "https://registry.npmjs.org/is-loopback-addr/-/is-loopback-addr-2.0.2.tgz", "integrity": "sha512-26POf2KRCno/KTNL5Q0b/9TYnL00xEsSaLfiFRmjM7m7Lw7ZMmFybzzuX4CcsLAluZGd+niLUiMRxEooVE3aqg==" }, + "node_modules/is-nan": { + "version": "1.3.2", + "resolved": "https://registry.npmjs.org/is-nan/-/is-nan-1.3.2.tgz", + "integrity": "sha512-E+zBKpQ2t6MEo1VsonYmluk9NxGrbzpeeLC2xIViuO2EjU2xsXsBPwTr3Ykv9l08UYEVEdWeRZNouaZqF6RN0w==", + "dependencies": { + "call-bind": "^1.0.0", + "define-properties": "^1.1.3" + }, + "engines": { + "node": ">= 0.4" + }, + "funding": { + "url": "https://github.com/sponsors/ljharb" + } + }, "node_modules/is-observable": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/is-observable/-/is-observable-2.1.0.tgz", @@ -3396,6 +3485,46 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/object-is": { + "version": "1.1.5", + "resolved": "https://registry.npmjs.org/object-is/-/object-is-1.1.5.tgz", + "integrity": "sha512-3cyDsyHgtmi7I7DfSSI2LDp6SK2lwvtbg0p0R1e0RvTqF5ceGx+K2dfSjm1bKDMVCFEDAQvy+o8c6a7VujOddw==", + "dependencies": { + "call-bind": "^1.0.2", + "define-properties": "^1.1.3" + }, + "engines": { + "node": ">= 0.4" + }, + "funding": { + "url": "https://github.com/sponsors/ljharb" + } + }, + "node_modules/object-keys": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/object-keys/-/object-keys-1.1.1.tgz", + "integrity": "sha512-NuAESUOUMrlIXOfHKzD6bpPu3tYt3xvjNdRIQ+FeT0lNb4K8WR70CaDxhuNguS2XG+GjkyMwOzsN5ZktImfhLA==", + "engines": { + "node": ">= 0.4" + } + }, + "node_modules/object.assign": { + "version": "4.1.4", + "resolved": "https://registry.npmjs.org/object.assign/-/object.assign-4.1.4.tgz", + "integrity": "sha512-1mxKf0e58bvyjSCtKYY4sRe9itRk3PJpquJOjeIkz885CczcI4IvJJDLPS72oowuSh+pBxUFROpX+TU++hxhZQ==", + "dependencies": { + "call-bind": "^1.0.2", + "define-properties": "^1.1.4", + "has-symbols": "^1.0.3", + "object-keys": "^1.1.1" + }, + "engines": { + "node": ">= 0.4" + }, + "funding": { + "url": "https://github.com/sponsors/ljharb" + } + }, "node_modules/observable-fns": { "version": "0.6.1", "resolved": "https://registry.npmjs.org/observable-fns/-/observable-fns-0.6.1.tgz", @@ -3892,8 +4021,9 @@ }, "node_modules/threads": { "version": "1.7.0", - "resolved": "https://registry.npmjs.org/threads/-/threads-1.7.0.tgz", - "integrity": "sha512-Mx5NBSHX3sQYR6iI9VYbgHKBLisyB+xROCBGjjWm1O9wb9vfLxdaGtmT/KCjUqMsSNW6nERzCW3T6H43LqjDZQ==", + "resolved": "git+ssh://git@github.com/fluencelabs/threads.js.git#b00a5342380b0278d3ae56dcfb170effb3cad7cd", + "integrity": "sha512-7ej6ZkM5NHJhEGiOf+pxt+LqJebbvKrXLfvcpOy1X6PIop3ap+a3ULgx+VVdYj8Nw6jYVGYTHCriVtTyoWZN9Q==", + "license": "MIT", "dependencies": { "callsites": "^3.1.0", "debug": "^4.2.0", @@ -4483,9 +4613,9 @@ "integrity": "sha512-FIr/DEeoHfj7ftfylnoFt3rAIRoWXpx2AoDfrT2qD2wtp7Dp+COajvs/Icb7uHqRW9m60f5iXZwdsJJO3kvb7w==" }, "node_modules/zod": { - "version": "3.22.2", - "resolved": "https://registry.npmjs.org/zod/-/zod-3.22.2.tgz", - "integrity": "sha512-wvWkphh5WQsJbVk1tbx1l1Ly4yg+XecD+Mq280uBGt9wa5BKSWf4Mhp6GmrkPixhMxmabYY7RbzlwVP32pbGCg==", + "version": 
"3.22.4", + "resolved": "https://registry.npmjs.org/zod/-/zod-3.22.4.tgz", + "integrity": "sha512-iC+8Io04lddc+mVqQ9AZ7OQ2MrUKGN+oIQyq1vemgt46jwCwLfhq7/pwnBnNXXXZb8VTVLKwp9EDkx+ryxIWmg==", "funding": { "url": "https://github.com/sponsors/colinhacks" } diff --git a/gateway/package.json b/gateway/package.json index 1a085e2..03a9a05 100644 --- a/gateway/package.json +++ b/gateway/package.json @@ -21,7 +21,7 @@ "author": "Fluence Labs", "license": "Apache-2.0", "dependencies": { - "@fluencelabs/js-client": "0.1.7", + "@fluencelabs/js-client": "0.3.0", "@fluencelabs/marine-worker": "0.3.3", "body-parser": "1.20.2", "express": "4.18.2", diff --git a/gateway/src/config.js b/gateway/src/config.js index bc10a14..c3e1b0d 100644 --- a/gateway/src/config.js +++ b/gateway/src/config.js @@ -1,24 +1,34 @@ -import fs from 'fs'; +import fs from "fs"; -export const configHelp = "Config structure: { port, relay, serviceId, providers, mode, counterServiceId?, counterPeerId?}\n" + - "Where 'mode' can be: 'random' (default), 'round-robin' or 'quorum',\n" + - "'counterServiceId' and 'counterPeerId' will use local service if undefined.\n" - "'quorumServiceId' and 'quorumPeerId' will use local service if undefined.\n" +export const configHelp = + "Config structure: { port, relay, serviceId, providers, mode, counterServiceId?, counterPeerId?}\n" + + "Where 'mode' can be: 'random' (default), 'round-robin' or 'quorum',\n" + + "'counterServiceId' and 'counterPeerId' will use local service if undefined.\n"; +("'quorumServiceId' and 'quorumPeerId' will use local service if undefined.\n"); export function readConfig(path) { - const rawdata = fs.readFileSync(path); - const config = JSON.parse(rawdata); + const rawdata = fs.readFileSync(path); + const config = JSON.parse(rawdata); - let errors = [] - if (!config.port) { - errors.push("Specify port ('port') in config") - } - if (!config.relay) { - errors.push("Specify Fluence peer address ('relay') in config") - } + let errors = []; + if (!config.port) { + errors.push("Specify port ('port') in config"); + } + if (!config.relay) { + errors.push("Specify Fluence peer address ('relay') in config"); + } + if ( + !!config.mode && + !["random", "round-robin", "quorum"].includes(config.mode) + ) { + errors.push( + `Incorrect mode '${config.mode}' in config. 
Should be 'random', 'round-robin' or 'quorum'` + ); + } - return { - config, errors, - help: configHelp - } + return { + config, + errors, + help: configHelp, + }; } diff --git a/gateway/src/index.js b/gateway/src/index.js index f3a0917..7242340 100644 --- a/gateway/src/index.js +++ b/gateway/src/index.js @@ -5,15 +5,15 @@ import express from "express"; import bodyParser from "body-parser"; import { JSONRPCServer } from "json-rpc-2.0"; -import { Fluence } from '@fluencelabs/js-client'; +import { Fluence } from "@fluencelabs/js-client"; import { - quorumEth, - randomLoadBalancingEth, - registerCounter, - registerLogger, - registerQuorumChecker, - roundRobinEth + quorumEth, + randomLoadBalancingEth, + roundRobinEth, } from "../aqua-compiled/rpc.js"; +import { registerLoggerSrv } from "../aqua-compiled/logger.js"; +import { registerCounterSrv } from "../aqua-compiled/counter.js"; +import { registerQuorumCheckerSrv } from "../aqua-compiled/quorum.js"; import { readArguments } from "./arguments.js"; import { readConfig } from "./config.js"; import { methods } from "./methods.js"; @@ -21,17 +21,17 @@ import { methods } from "./methods.js"; const args = readArguments(process.argv.slice(2)); if (args.errors.length > 0) { - console.log(args.help); - args.errors.forEach((err) => console.log(err)); - process.exit(1); + console.log(args.help); + args.errors.forEach((err) => console.log(err)); + process.exit(1); } const { config, errors, help } = readConfig(args.configPath); if (errors.length > 0) { - errors.forEach((err) => console.log(err)); - console.log(help); - process.exit(1); + errors.forEach((err) => console.log(err)); + console.log(help); + process.exit(1); } console.log("Running server..."); @@ -41,128 +41,158 @@ const route = "/"; const server = new JSONRPCServer(); // initialize fluence client -await Fluence.connect(config.relay); -const peerId = (await Fluence.getClient()).getPeerId() +await Fluence.connect(config.relay, {}); +const peerId = (await Fluence.getClient()).getPeerId(); // handler for logger -registerLogger({ - log: s => { - console.log("log: " + s); - }, - logCall: s => { - console.log("Call will be to : " + s); - }, - logWorker: s => { - console.log("Worker used: " + JSON.stringify(s)); - }, - logNum: s => { - console.log("Number: " + s); - }, -}) +registerLoggerSrv({ + log: (s) => { + console.log("log: " + s); + }, + logCall: (s) => { + console.log("Call will be to : " + s); + }, + logWorker: (s) => { + console.log("Worker used: " + JSON.stringify(s)); + }, + logNum: (s) => { + console.log("Number: " + s); + }, +}); let counter = 0; -registerCounter("counter", { - incrementAndReturn: () => { - counter++; - console.log("Counter: " + counter) - return counter; - } -}) +registerCounterSrv("counter", { + incrementAndReturn: () => { + counter++; + console.log("Counter: " + counter); + return counter; + }, +}); function findSameResults(results, minNum) { - const resultCounts = results.filter((obj) => obj.success).map((obj) => obj.value).reduce(function (i, v) { - if (i[v] === undefined) { - i[v] = 1 - } else { - i[v] = i[v] + 1; - } - return i; + const resultCounts = results + .filter((obj) => obj.success) + .map((obj) => obj.value) + .reduce(function (i, v) { + if (i[v] === undefined) { + i[v] = 1; + } else { + i[v] = i[v] + 1; + } + return i; }, {}); - const getMaxRepeated = Math.max(...Object.values(resultCounts)); - if (getMaxRepeated >= minNum) { - console.log(resultCounts) - const max = Object.entries(resultCounts).find((kv) => kv[1] === getMaxRepeated) - return { - value: 
max[0], - results: [], - error: "" - } - } else { - return { - error: "No consensus in results", - results: results, - value: "" - } - } + const getMaxRepeated = Math.max(...Object.values(resultCounts)); + if (getMaxRepeated >= minNum) { + console.log(resultCounts); + const max = Object.entries(resultCounts).find( + (kv) => kv[1] === getMaxRepeated + ); + return { + value: max[0], + results: [], + error: "", + }; + } else { + return { + error: "No consensus in results", + results: results, + value: "", + }; + } } -registerQuorumChecker("quorum", - { - check: (ethResults, minQuorum) => { - console.log("Check quorum for:") - console.log(ethResults) - return findSameResults(ethResults, minQuorum) - } - } -) +registerQuorumCheckerSrv("quorum", { + check: (ethResults, minQuorum) => { + console.log("Check quorum for:"); + console.log(ethResults); + return findSameResults(ethResults, minQuorum); + }, +}); -const counterServiceId = config.counterServiceId || 'counter' -const counterPeerId = config.counterPeerId || peerId -const quorumServiceId = config.quorumServiceId || 'quorum' -const quorumPeerId = config.quorumPeerId || peerId -const quorumNumber = config.quorumNumber || 2 +const counterServiceId = config.counterServiceId || "counter"; +const counterPeerId = config.counterPeerId || peerId; +const quorumServiceId = config.quorumServiceId || "quorum"; +const quorumPeerId = config.quorumPeerId || peerId; +const quorumNumber = config.quorumNumber || 2; +const mode = config.mode || "random"; + +console.log(`Using mode '${mode}'`); async function methodHandler(reqRaw, method) { - const req = reqRaw.map((s) => JSON.stringify(s)) - console.log(`Receiving request '${method}'`); - let result; - if (!config.mode || config.mode === "random") { - result = await randomLoadBalancingEth(config.providers, method, req); - } else if (config.mode === "round-robin") { - result = await roundRobinEth(config.providers, method, req, counterServiceId, counterPeerId, config.serviceId); - } else if (config.mode === "quorum") { - const quorumResult = await quorumEth(config.providers, quorumNumber, 10000, method, req, quorumServiceId, quorumPeerId, { ttl: 20000 }); - - if (quorumResult.error) { - console.error(`quorum failed: ${quorumResult.error}\n${JSON.stringify(quorumResult.results)}`); - result = { success: false, error: quorumResult.error }; - } else { - result = { success: true, error: quorumResult.error, value: quorumResult.value }; - } + const req = reqRaw.map((s) => JSON.stringify(s)); + console.log(`Receiving request '${method}'`); + let result; + if (mode === "random") { + result = await randomLoadBalancingEth(config.providers, method, req); + } else if (mode === "round-robin") { + result = await roundRobinEth( + config.providers, + method, + req, + counterServiceId, + counterPeerId, + config.serviceId + ); + } else if (mode === "quorum") { + const quorumResult = await quorumEth( + config.providers, + quorumNumber, + 10000, + method, + req, + quorumServiceId, + quorumPeerId, + { ttl: 20000 } + ); + + if (quorumResult.error) { + console.error( + `quorum failed: ${quorumResult.error}\n${JSON.stringify( + quorumResult.results + )}` + ); + result = { success: false, error: quorumResult.error }; + } else { + result = { + success: true, + error: quorumResult.error, + value: quorumResult.value, + }; } + } - if (!result.success) { - throw new Error(result.error); - } + if (!result.success) { + throw new Error(result.error); + } - return JSON.parse(result.value || '{}'); + return JSON.parse(result.value || "{}"); } 
function addMethod(op) { - server.addMethod(op, async (req) => methodHandler(req, op)); + server.addMethod(op, async (req) => methodHandler(req, op)); } // register all eth methods methods.forEach((m) => { - addMethod(m); -}) + addMethod(m); +}); const app = express(); app.use(bodyParser.json()); // register JSON-RPC handler app.post(route, (req, res) => { - const jsonRPCRequest = req.body; - server.receive(jsonRPCRequest).then((jsonRPCResponse) => { - if (jsonRPCResponse) { - res.json(jsonRPCResponse); - } else { - res.sendStatus(204); - } - }); + const jsonRPCRequest = req.body; + server.receive(jsonRPCRequest).then((jsonRPCResponse) => { + if (jsonRPCResponse) { + res.json(jsonRPCResponse); + } else { + res.sendStatus(204); + } + }); }); app.listen(config.port); -console.log("Server was started on port " + config.port); \ No newline at end of file +console.log("Server was started on port " + config.port);