Contention, Actor Patterns, and Refining the Suzuka Full Node #340
-
To follow up on the discussion started in #63: the Process and Store steps can use shared resources and/or execute in parallel; this is mostly where actors are used, and the decision depends on how we optimize resource sharing. The Suzuka node code can be organized around these state transitions, where the basic pattern is applied to each Tx or group of Txs (blob, block). The whole process fits well in an event-streaming API where actors generate events and the state manager processes them. The pattern can be applied at different granularities, like Russian dolls: some state transitions are grouped together and processed in a sub-manager. Last thing about the state-manager pattern: a sort of supervisor can be plugged in to handle all the error cases and decide what has to be done depending on the state currently being processed. An error can go up the processing hierarchy until a supervisor knows how to handle it (see the sketch below).

Regarding the GetTxByHash query contention: I don't know how this pattern can be applied to the actor evolution. From my experience, it helps to have good control over resource sharing and parallel process execution. It can evolve easily with the evolution of the functionality, because each change is defined in the state transitions, which makes it possible to see the implications for the existing code and decide on the best option. Lastly, it allows mixing sync and async code, because each processing/storage step can be defined as sync or async code and the state manager is able to call it correctly. We can even define how we allocate CPU between sync and async tasks. I'm not sure how this can be integrated; it's more to open the discussion, and if you think it can be interesting we can see how to integrate this pattern.
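To make the idea concrete, here is a minimal sketch of the state-manager pattern with supervisor-style error escalation. The names (`TxEvent`, `StateManager`, `Supervisor`, `ProcessError`) are illustrative assumptions, not existing Suzuka code:

```rust
use tokio::sync::mpsc;

#[derive(Debug)]
enum TxEvent {
    Received { tx_id: u64 },
    Executed { tx_id: u64 },
}

#[derive(Debug)]
enum ProcessError {
    Fatal(String),
}

// What the supervisor decides when an error escalates up to it.
enum SupervisorAction {
    Continue,
    Shutdown,
}

struct Supervisor;

impl Supervisor {
    fn handle(&self, err: &ProcessError) -> SupervisorAction {
        // A real supervisor would decide based on the state being processed;
        // unhandled errors would keep going up the hierarchy.
        eprintln!("supervisor saw: {err:?}");
        SupervisorAction::Shutdown
    }
}

struct StateManager {
    events: mpsc::Receiver<TxEvent>,
    supervisor: Supervisor,
}

impl StateManager {
    async fn run(mut self) {
        while let Some(event) = self.events.recv().await {
            // Each event drives one state transition; the Process and Store
            // steps could be delegated to sub-managers at a finer granularity.
            if let Err(err) = self.transition(event).await {
                match self.supervisor.handle(&err) {
                    SupervisorAction::Continue => continue,
                    SupervisorAction::Shutdown => break,
                }
            }
        }
    }

    async fn transition(&mut self, event: TxEvent) -> Result<(), ProcessError> {
        match event {
            TxEvent::Received { tx_id } => {
                // Process step: sync or async, the manager calls it either way.
                println!("processing tx {tx_id}");
                Ok(())
            }
            TxEvent::Executed { tx_id: 0 } => {
                // Demonstrate an error going up to the supervisor.
                Err(ProcessError::Fatal("cannot store tx 0".into()))
            }
            TxEvent::Executed { tx_id } => {
                // Store step.
                println!("storing result of tx {tx_id}");
                Ok(())
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(16);
    let manager = tokio::spawn(
        StateManager { events: rx, supervisor: Supervisor }.run(),
    );
    tx.send(TxEvent::Received { tx_id: 1 }).await.unwrap();
    tx.send(TxEvent::Executed { tx_id: 1 }).await.unwrap();
    tx.send(TxEvent::Executed { tx_id: 0 }).await.unwrap();
    manager.await.unwrap();
}
```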
-
Draft notes for an eventual document.

## Composing async applications using the actor model

To avoid excessive contention and permit concurrent execution on a multi-threaded async runtime, sharing of mutable state between asynchronous tasks should be avoided. Generally, the state used by each task should be owned by that task alone. The operation of a network-driven application such as a blockchain node can be modeled as a collection of actor tasks executed on an async runtime. An actor generally encapsulates tightly coupled mutable state that changes in reaction to events arriving via cross-task communication channels or I/O interfaces.

### Organization of a task

The functionality of an individual task, or a protocol unit that may form part of a task, can be split into three principal parts (a specific design may omit some of these):

- Front object
- Background task
- Construction API

### Cross-task communication

MPSC channels, as provided by Tokio, are currently found to be sufficient to organize the passing of events and request messages between the agent tasks making up a Movement node (as first realized in #308). If needed, Tokio's sync::broadcast could be used to realize event subscription for multiple tasks. At this point we're not looking to implement dynamic composition via named channels, publish-subscribe, etc.

### Concurrency on a smaller scale

Suggested decisions on which concurrency approach to use for specific kinds of tasks.

#### Async IO-bound: the Tower framework

To implement tasks that involve processing a flow of incoming requests that needs to be back-pressured against available capacity, we can utilise the tower framework and the middleware provided for it; see the sketch below.
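As an illustration only, here is a minimal sketch of back-pressuring a request flow with tower's `concurrency_limit` middleware. It assumes the `tower` crate with its `limit` and `util` features enabled; the string request type and the handler are placeholders, not part of the node:

```rust
use tower::{Service, ServiceBuilder, ServiceExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Build a service wrapped in middleware that bounds in-flight work,
    // so excess load waits for capacity instead of piling up.
    let mut service = ServiceBuilder::new()
        // At most 8 requests are processed concurrently.
        .concurrency_limit(8)
        .service_fn(|request: String| async move {
            // Placeholder handler standing in for real request processing.
            Ok::<_, std::convert::Infallible>(format!("processed: {request}"))
        });

    // `ready` awaits until the stack has capacity: the back-pressure point.
    let response = service.ready().await?.call("tx".to_string()).await?;
    println!("{response}");
    Ok(())
}
```

Since `ready` is where callers wait when the limit is reached, further policy (queueing, load shedding, timeouts) can be layered at that point with additional middleware from the same framework.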
-
@mzabaluev It would be great to have the template/toy implementation of this pattern included in this discussion. That is, the simplest version of a Front Object, Construction API, and Background Task that you can provide.
-
A toy actor example:

```rust
use std::future::Future;
use tokio::sync::{mpsc, oneshot};
/// Public handle object for the toy counter service.
///
/// A handle can be cheaply cloned to get multiple instances for using the
/// same service in different parts of the program.
#[derive(Clone)]
pub struct Counter {
sender: mpsc::Sender<Request>,
}
// Enum for the internal requests processed by the counter service.
enum Request {
Increment { by: i64 },
Get { response: oneshot::Sender<i64> },
}
impl Counter {
/// Instantiates the counter service.
/// Returns the handle object and the future to spawn on the async runtime.
pub fn new(
initial_count: i64,
) -> (
Self,
impl Future<Output = anyhow::Result<()>> + Send,
) {
let (sender, receiver) = mpsc::channel(16);
let background = Background {
receiver,
counter: initial_count,
}
.run();
(Counter { sender }, background)
}
pub async fn increment(&self, by: i64) -> anyhow::Result<()> {
self.sender.send(Request::Increment { by }).await?;
Ok(())
}
pub async fn get(&self) -> anyhow::Result<i64> {
let (response_tx, response_rx) = oneshot::channel();
self.sender
.send(Request::Get {
response: response_tx,
})
.await?;
let value = response_rx.await?;
Ok(value)
}
}
// State of the background task for the counter service.
// This is the actor in the actor pattern.
struct Background {
receiver: mpsc::Receiver<Request>,
counter: i64,
}
impl Background {
async fn run(mut self) -> anyhow::Result<()> {
while let Some(req) = self.receiver.recv().await {
match req {
Request::Increment { by } => self.increment(by).await?,
Request::Get { response } => {
let value = self.get().await?;
response.send(value).unwrap_or_else(|_| {
// The request method's future has been canceled,
// so it's OK to do nothing here.
});
}
}
}
// The last handle has been dropped, terminate.
Ok(())
}
async fn increment(&mut self, amount: i64) -> anyhow::Result<()> {
self.counter += amount;
Ok(())
}
// NOTE: if task state features generics or dynamic objects and the compiler
// says these lack `Sync`, consider making the recipient `&mut self`. See
// https://github.com/rust-lang/rust/issues/129105
async fn get(&self) -> anyhow::Result<i64> {
Ok(self.counter)
}
}
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
let (counter, background_task) = Counter::new(0);
tokio::spawn(background_task);
counter.increment(42).await?;
let value = counter.get().await?;
println!("{value}");
Ok(())
}
```
-
From my experience, the difficulty of this model is defining the right granularity and how actors are grouped.
-
For info: since the first merge of the actor pattern, the indexer gRPC init wasn't plugged in anymore. I've added the init to the new code, but it was a little awkward. Where it's added to the executor init: And where it's integrated in the main loop: here: After that I had to change all the tests that use executor.background in the dof and fin_view crates. The reason I did it that way is that the Aptos part of the indexer init needs the DbReaderWriter, which is created and hidden behind the Context. That's why the indexer init function is implemented on Context:

With this way of doing things, all the calls are encapsulated: the Suzuka node init calls the execution init, which calls the indexer init, and each sub-element accesses the main resources. I didn't manage to remove the indexer init from the execution without changing a lot of code. I think the main reason is that the Aptos part of the indexer gRPC uses the DbReaderWriter instead of a notification of Tx state changes; the gRPC part's role is only to send new Txs, so there's no need to access the db. For me, it shows that components should be defined in terms of their needs, not the resources they use to get their data, and should provide access to what they produce: here, state changes from the execution.
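To illustrate the suggestion, here is a minimal sketch of an indexer-like component that consumes a channel of Tx state-change notifications instead of holding a db handle. `TxStateChange` and `IndexerGrpcStream` are hypothetical names for illustration, not the actual Aptos/Suzuka types:

```rust
use tokio::sync::mpsc;

// Hypothetical notification emitted by the execution on each Tx state change.
#[derive(Debug, Clone)]
struct TxStateChange {
    tx_hash: [u8; 32],
}

// The component declares the data it needs (state-change notifications),
// not the resource (a DbReaderWriter handle) used elsewhere to produce it.
struct IndexerGrpcStream {
    state_changes: mpsc::Receiver<TxStateChange>,
}

impl IndexerGrpcStream {
    async fn run(mut self) {
        while let Some(change) = self.state_changes.recv().await {
            // Forward the new Tx to gRPC subscribers; no db access required.
            println!("forwarding tx {:?}", change.tx_hash);
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(64);
    let indexer = tokio::spawn(IndexerGrpcStream { state_changes: rx }.run());
    tx.send(TxStateChange { tx_hash: [0u8; 32] }).await.unwrap();
    drop(tx); // closing the channel lets the indexer task finish
    indexer.await.unwrap();
}
```

Wired this way, the indexer init no longer needs to reach into the execution's Context for the db; it only needs the sender side of the channel handed to it at construction time.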
-
Tracking issue: #63
Summary
Strides have been made in refactoring the Suzuka Full Node to eliminate synchronization primitives, minimize logical awaiting, and generally make better use of the `tokio` runtime. However, further improvements should be pursued.