Skip to content

Commit

Permalink
feat(gateway): Refactor aqua (#115)
Browse files Browse the repository at this point in the history
* Add Logger ability

* Rewrote random and round-robin

* Rewrote quorum

* Add comments

* Add config.mode check

* Update README

* Small fixes

* Update js-client
  • Loading branch information
InversionSpaces authored Oct 24, 2023
1 parent c6c9a2e commit d608bf8
Show file tree
Hide file tree
Showing 14 changed files with 712 additions and 305 deletions.
183 changes: 119 additions & 64 deletions README.md

Large diffs are not rendered by default.

55 changes: 55 additions & 0 deletions gateway/aqua/balancer.aqua
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
module Balancer declares Balancer, ProviderBalancer, WorkersBalancer, randomBalancer, cycleBalancer

import Worker from "@fluencelabs/aqua-lib/subnet.aqua"

import Counter from "counter.aqua"
import Random from "random.aqua"
import Provider from "provider.aqua"

ability WorkersBalancer:
nextWorker() -> Worker

ability ProviderBalancer:
nextProvider() -> Provider

ability Balancer:
nextWorker() -> Worker
nextProvider() -> Provider
next() -> Worker, Provider

-- Create balancer that returns
-- workers and providers in random order
func randomBalancer{Random}(workers: []Worker, providers: []Provider) -> Balancer:
-- closures do not capture topology here
nextWorker = func () -> Worker:
rand <- Random.next()
idx = rand % workers.length
<- workers[idx]

nextProvider = func () -> Provider:
rand <- Random.next()
idx = rand % providers.length
<- providers[idx]

next = func () -> Worker, Provider:
<- nextWorker(), nextProvider()

<- Balancer(next=next, nextWorker=nextWorker, nextProvider=nextProvider)

-- Create balancer that returns
-- workers and providers in cycle order
func cycleBalancer{Counter}(workers: []Worker, providers: []Provider) -> Balancer:
next = func () -> Worker, Provider:
n <- Counter.incrementAndReturn()
idx = n % workers.length
<- workers[idx], providers[idx]

nextWorker = func () -> Worker:
w, p <- next()
<- w

nextProvider = func () -> Provider:
w, p <- next()
<- p

<- Balancer(next=next, nextWorker=nextWorker, nextProvider=nextProvider)
22 changes: 22 additions & 0 deletions gateway/aqua/counter.aqua
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
module Counter declares Counter, onPeerCounter

export CounterSrv

ability Counter:
incrementAndReturn() -> u32

service CounterSrv("counter"):
incrementAndReturn() -> u32

-- Create Counter ability that
-- counts on peer through CounterSrv(id)
func onPeerCounter(peer: string, id: string) -> Counter:
-- closure does not capture topology here
incAndReturn = func () -> u32:
on peer:
CounterSrv id
res <- CounterSrv.incrementAndReturn()
<- res

<- Counter(incrementAndReturn = incAndReturn)

22 changes: 22 additions & 0 deletions gateway/aqua/eth_rpc.aqua
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
module RPCEth declares RPCEth, fromWorkerProvider

import Worker from "@fluencelabs/aqua-lib/subnet.aqua"

import "services.aqua"

import Provider from "provider.aqua"

-- Ability to call Ethereum JSON RPC methods
ability RPCEth:
call(method: string, jsonArgs: []string) -> JsonString

-- Create RPCEth ability from Worker and Provider
func fromWorkerProvider(worker: Worker, provider: Provider) -> RPCEth:
-- closure does not capture topology here
call = func (method: string, jsonArgs: []string) -> JsonString:
-- TODO: Handle worker_id == nil?
on worker.worker_id! via worker.host_id:
res <- EthRpc.eth_call(provider, method, jsonArgs)
<- res

<- RPCEth(call = call)
38 changes: 38 additions & 0 deletions gateway/aqua/logger.aqua
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
module Logger declares Logger, initPeerLogger

export LoggerSrv

import Worker from "@fluencelabs/aqua-lib/subnet.aqua"

ability Logger:
log(s: []string)
logNum(n: u32)
logCall(s: string)
logWorker(w: Worker)

service LoggerSrv("logger"):
log(s: []string)
logNum(n: u32)
logCall(s: string)
logWorker(w: Worker)

-- Create Logger ability that logs
-- on INIT_PEER_ID via HOST_PEER_ID
-- through LoggerSrv
func initPeerLogger() -> Logger:
-- closures do not capture topology here

log = func (s: []string):
on INIT_PEER_ID via HOST_PEER_ID:
LoggerSrv.log(s)
logNum = func (n: u32):
on INIT_PEER_ID via HOST_PEER_ID:
LoggerSrv.logNum(n)
logCall = func (s: string):
on INIT_PEER_ID via HOST_PEER_ID:
LoggerSrv.logCall(s)
logWorker = func (w: Worker):
on INIT_PEER_ID via HOST_PEER_ID:
LoggerSrv.logWorker(w)

<- Logger(log=log, logNum=logNum, logCall=logCall, logWorker=logWorker)
3 changes: 3 additions & 0 deletions gateway/aqua/provider.aqua
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module Provider declares Provider

alias Provider: string
29 changes: 29 additions & 0 deletions gateway/aqua/quorum.aqua
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
module Quorum declares QuorumChecker, QuorumResult, onPeerQuorumChecker

import JsonString from "services.aqua"

export QuorumCheckerSrv

data QuorumResult:
value: string
results: []JsonString
error: string

-- Ability to check if a quorum on results is reached
ability QuorumChecker:
check(results: []JsonString, minResults: u32) -> QuorumResult

service QuorumCheckerSrv("quorum"):
check(results: []JsonString, minResults: u32) -> QuorumResult

-- Create a QuorumChecker ability
-- that checks quorum on peer through QuorumCheckerSrv(id)
func onPeerQuorumChecker(peer: string, id: string) -> QuorumChecker:
-- closure does not capture topology here
check = func (results: []JsonString, minResults: u32) -> QuorumResult:
on peer:
QuorumCheckerSrv id
res <- QuorumCheckerSrv.check(results, minResults)
<- res

<- QuorumChecker(check = check)
18 changes: 18 additions & 0 deletions gateway/aqua/random.aqua
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
module Random declares Random, timeRandom

import Peer from "@fluencelabs/aqua-lib/builtin.aqua"

import NumOp from "utils.aqua"

ability Random:
next() -> i64

-- Create random from timestamp
func timeRandom() -> Random:
-- closure does not capture topology here
next = func () -> i64:
t <- Peer.timestamp_sec()
n <- NumOp.identity(t)
<- n

<- Random(next = next)
170 changes: 80 additions & 90 deletions gateway/aqua/rpc.aqua
Original file line number Diff line number Diff line change
@@ -1,118 +1,108 @@
aqua RPC

import "@fluencelabs/aqua-lib/builtin.aqua"
import Subnet, Worker from "@fluencelabs/aqua-lib/subnet.aqua"

import "services.aqua"
use "deals.aqua"

export randomLoadBalancingEth, roundRobinEth, quorumEth, Counter, Logger

data QuorumResult:
value: string
results: []JsonString
error: string

service Logger("logger"):
log(s: []string)
logNum(n: u32)
logCall(s: string)
logWorker(w: Worker)
import Logger, initPeerLogger from "logger.aqua"
import Balancer, ProviderBalancer, randomBalancer, cycleBalancer from "balancer.aqua"
import onPeerCounter from "counter.aqua"
import QuorumChecker, QuorumResult, onPeerQuorumChecker from "quorum.aqua"
import timeRandom from "random.aqua"
import RPCEth, fromWorkerProvider from "eth_rpc.aqua"

service NumOp("op"):
identity(n: u64) -> i64
import NumOp from "utils.aqua"

service Counter("counter"):
incrementAndReturn() -> u32
export randomLoadBalancingEth, roundRobinEth, quorumEth

service QuorumChecker("quorum"):
check(results: []JsonString, minResults: u32) -> QuorumResult
func errorQuorumResult(msg: string) -> QuorumResult:
<- QuorumResult(value = "", results = [], error = msg)

func empty() -> JsonString:
<- JsonString(value = "", success = true, error = "")

func call(uri: string, method: string, jsonArgs: []string) -> JsonString:
res <- EthRpc.eth_call(uri, method, jsonArgs)
<- res
func errorJsonString(msg: string) -> JsonString:
<- JsonString(value = "", success = false, error = msg)

-- Get workers participating in deal
func getWorkers() -> []Worker, ?string:
deals <- Deals.get()
on INIT_PEER_ID via HOST_PEER_ID:
deals <- Deals.get()
dealId = deals.defaultWorker!.dealIdOriginal
on HOST_PEER_ID:
result <- Subnet.resolve(dealId)
workers = result.workers
error = result.error
<- workers, error
<- result.workers, result.error

func callOnWorker(worker: Worker, uri: string, method: string, jsonArgs: []string, callFunc: string, string, []string -> JsonString) -> JsonString:
on worker.worker_id! via worker.host_id:
result <- callFunc(uri, method, jsonArgs)
<- result
-- Call RPC method through ability
func rpcCall{RPCEth}(method: string, jsonArgs: []string) -> JsonString:
<- RPCEth.call(method, jsonArgs)

func randomLoadBalancing(uris: []string, method: string, jsonArgs: []string, callFunc: string, string, []string -> JsonString) -> JsonString:
-- Call RPC method with load balancing
func balancedEthCall{Logger, Balancer}(method: string, jsonArgs: []string) -> JsonString:
on HOST_PEER_ID:
workers <- getWorkers()
workersNum = workers.length
-- choose worker randomly
timeW <- NumOp.identity(Peer.timestamp_sec())
workerNumber = timeW % workers.length
worker = workers[workerNumber]
-- choose provider randomly
timeP <- NumOp.identity(Peer.timestamp_sec())
providerNumber = timeP % uris.length
provider = uris[providerNumber]
result <- callOnWorker(worker, provider, method, jsonArgs, callFunc)
Logger.logWorker(worker)
Logger.logCall(uris[providerNumber])
worker, provider <- Balancer.next()
Logger.logWorker(worker)
Logger.logCall(provider)
Op.noop() -- dirty hack for topology to converge
rpc <- fromWorkerProvider(worker, provider)
result <- rpcCall{rpc}(method, jsonArgs)
<- result

-- Call RPC method with random load balancing
func randomLoadBalancingEth(uris: []string, method: string, jsonArgs: []string) -> JsonString:
<- randomLoadBalancing(uris, method, jsonArgs, call)

func roundRobin(uris: []string, method: string, jsonArgs: []string, counterServiceId: string, counterPeerId: string, callFunc: string, string, []string -> JsonString) -> JsonString:
on counterPeerId:
Counter counterServiceId
requestNumber <- Counter.incrementAndReturn()
on HOST_PEER_ID:
workers, error <- getWorkers()
result: *JsonString
if error == nil:
workerNumber = requestNumber % workers.length
worker = workers[workerNumber]
providerNumber = requestNumber % uris.length
provider = uris[providerNumber]
result <<- callOnWorker(worker, provider, method, jsonArgs, callFunc)
Op.noop() -- dirty hack
on INIT_PEER_ID:
Logger.logWorker(worker)
Logger.logCall(uris[providerNumber])
else:
result <<- JsonString(value = "", success = false, error = error!)

result: ?JsonString

workers, error <- getWorkers()
if error != nil:
result <- errorJsonString(error!)
else:
log <- initPeerLogger()
random <- timeRandom()
balancer <- randomBalancer{random}(workers, uris)
result <- balancedEthCall{log, balancer}(method, jsonArgs)

<- result!

-- Call RPC method with round-robin load balancing
func roundRobinEth(uris: []string, method: string, jsonArgs: []string, counterServiceId: string, counterPeerId: string) -> JsonString:
<- roundRobin(uris, method, jsonArgs, counterServiceId, counterPeerId, call)
result: ?JsonString

workers, error <- getWorkers()
if error != nil:
result <- errorJsonString(error!)
else:
log <- initPeerLogger()
counter <- onPeerCounter(counterPeerId, counterServiceId)
balancer <- cycleBalancer{counter}(workers, uris)
result <- balancedEthCall{log, balancer}(method, jsonArgs)

<- result!

func quorum(
uris: []string, quorumNumber: u32, timeout: u32, method: string, jsonArgs: []string, quorumServiceId: string, quorumPeerId: string,
callFunc: string, string, []string -> JsonString
) -> QuorumResult:
-- Call RPC method with workers quorum and provider load balancing
func quorum{ProviderBalancer, QuorumChecker}(workers: []Worker, quorumNumber: u32, timeout: u32, method: string, jsonArgs: []string) -> QuorumResult:
results: *JsonString
on HOST_PEER_ID:
workers <- getWorkers()
on HOST_PEER_ID:
for worker <- workers par:
on worker.worker_id! via worker.host_id:
-- choose provider randomly
timeP <- NumOp.identity(Peer.timestamp_ms())
providerNumber = timeP % uris.length
provider = uris[providerNumber]
results <- callFunc(provider, method, jsonArgs)
-- wait all results from all workers with timeout
join results[workers.length - 1]
par Peer.timeout(timeout, "")
on quorumPeerId via HOST_PEER_ID:
Counter quorumServiceId
-- check all results that we got
quorumResult <- QuorumChecker.check(results, quorumNumber)
<- quorumResult
provider <- ProviderBalancer.nextProvider()
rpc <- fromWorkerProvider(worker, provider)
results <- rpcCall{rpc}(method, jsonArgs)

-- wait all results from all workers with timeout
join results[workers.length - 1]
par Peer.timeout(timeout, "Workers timeout")

<- QuorumChecker.check(results, quorumNumber)

-- Call RPC method with workers quorum and provider load balancing
func quorumEth(uris: []string, quorumNumber: u32, timeout: u32, method: string, jsonArgs: []string, quorumServiceId: string, quorumPeerId: string) -> QuorumResult:
<- quorum(uris, quorumNumber, timeout, method, jsonArgs, quorumServiceId, quorumPeerId, call)
result: *QuorumResult

workers, error <- getWorkers()
if error != nil:
result <- errorQuorumResult(error!)
else:
random <- timeRandom()
balancer <- randomBalancer{random}(workers, uris)
quorumChecker <- onPeerQuorumChecker(quorumPeerId, quorumServiceId)
result <- quorum{balancer, quorumChecker}(workers, quorumNumber, timeout, method, jsonArgs)

<- result!
Loading

0 comments on commit d608bf8

Please sign in to comment.