Skip to content

Commit

Permalink
docker network and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Okm165 committed Jun 30, 2024
1 parent 13eb0c9 commit bf5c464
Show file tree
Hide file tree
Showing 16 changed files with 105 additions and 40 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ tonic = "0.11.0"
prost = "0.12"
prost-types = "0.12"
tonic-build = "0.11.0"
tonic-health = "0.11.0"
tokio-stream = "0.1.15"

zetina-common = { path = "crates/common" }
Expand Down
1 change: 1 addition & 0 deletions crates/delegator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ async-stream.workspace = true
tokio-util.workspace = true
tokio-stream.workspace = true
thiserror.workspace = true
tonic-health.workspace = true

[build-dependencies]
tonic-build.workspace = true
7 changes: 6 additions & 1 deletion crates/delegator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use tokio::sync::{broadcast, mpsc};
use tonic::{proto::delegator_service_server::DelegatorServiceServer, DelegatorGRPCServer};
use tracing_subscriber::EnvFilter;
use zetina_common::{
graceful_shutdown::shutdown_signal,
job_witness::JobWitness,
network::Network,
node_account::NodeAccount,
Expand Down Expand Up @@ -61,9 +62,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
job_witness_rx,
);

let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
health_reporter.set_serving::<DelegatorServiceServer<DelegatorGRPCServer>>().await;

Server::builder()
.add_service(health_service)
.add_service(DelegatorServiceServer::new(server))
.serve("[::1]:50051".parse().unwrap())
.serve_with_shutdown("0.0.0.0:50051".parse().unwrap(), shutdown_signal())
.await?;

Ok(())
Expand Down
10 changes: 1 addition & 9 deletions crates/delegator/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use std::error::Error;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

#[derive(NetworkBehaviour)]
pub struct PeerBehaviour {
Expand All @@ -20,7 +19,6 @@ pub struct PeerBehaviour {
}

pub struct SwarmRunner {
cancellation_token: CancellationToken,
handle: Option<JoinHandle<()>>,
}

Expand Down Expand Up @@ -53,14 +51,9 @@ impl SwarmRunner {
swarm.listen_on("/ip4/0.0.0.0/udp/5678/quic-v1".parse()?)?;
swarm.listen_on("/ip4/0.0.0.0/tcp/5679".parse()?)?;

let cancellation_token = CancellationToken::new();

Ok(SwarmRunner {
cancellation_token: cancellation_token.to_owned(),
handle: Some(tokio::spawn(async move {
swarm_loop(swarm, transmit_topics, swarm_events_tx, cancellation_token)
.boxed()
.await
swarm_loop(swarm, transmit_topics, swarm_events_tx).boxed().await
})),
})
}
Expand All @@ -81,7 +74,6 @@ impl SwarmRunner {

impl Drop for SwarmRunner {
fn drop(&mut self) {
self.cancellation_token.cancel();
block_on(async move {
if let Some(handle) = self.handle.take() {
handle.await.unwrap();
Expand Down
5 changes: 2 additions & 3 deletions crates/delegator/src/swarm/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@ use libp2p::{
Swarm,
};
use tokio::sync::mpsc::{self, Receiver};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};
use zetina_common::graceful_shutdown::shutdown_signal;

pub(crate) async fn swarm_loop(
mut swarm: Swarm<PeerBehaviour>,
mut transmit_topics: Vec<(IdentTopic, Receiver<Vec<u8>>)>,
swarm_events_tx: mpsc::Sender<gossipsub::Event>,
cancellation_token: CancellationToken,
) {
loop {
tokio::select! {
Expand Down Expand Up @@ -52,7 +51,7 @@ pub(crate) async fn swarm_loop(
}
_ => {}
},
_ = cancellation_token.cancelled() => {
_ = shutdown_signal() => {
break
}
}
Expand Down
9 changes: 8 additions & 1 deletion crates/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ tokio.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true
starknet.workspace = true
tonic.workspace = true
prost.workspace = true
async-stream.workspace = true
tokio-util.workspace = true
thiserror.workspace = true
tokio-stream.workspace = true
thiserror.workspace = true
tonic-health.workspace = true

[build-dependencies]
tonic-build.workspace = true
4 changes: 4 additions & 0 deletions crates/executor/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("proto/executor.proto")?;
Ok(())
}
10 changes: 10 additions & 0 deletions crates/executor/proto/executor.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
syntax = "proto3";

package executor;

service ExecutorService {
rpc Executor(ExecutorRequest) returns (ExecutorResponse) {}
}

message ExecutorRequest { string msg = 1; }
message ExecutorResponse { string msg = 1; }
15 changes: 15 additions & 0 deletions crates/executor/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
pub mod executor;
pub mod swarm;
pub mod tonic;

use ::tonic::transport::Server;
use executor::Executor;
use libp2p::gossipsub::{self};
use starknet::providers::{jsonrpc::HttpTransport, JsonRpcClient, Url};
use swarm::SwarmRunner;
use tokio::sync::mpsc;
use tonic::{proto::executor_service_server::ExecutorServiceServer, ExecutorGRPCServer};
use tracing_subscriber::EnvFilter;
use zetina_common::{
graceful_shutdown::shutdown_signal,
network::Network,
node_account::NodeAccount,
topic::{gossipsub_ident_topic, Topic},
Expand Down Expand Up @@ -65,5 +69,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

Executor::new(swarm_events_rx, finished_job_topic_tx, picked_job_topic_tx, runner, prover);

let server = ExecutorGRPCServer::default();

let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
health_reporter.set_serving::<ExecutorServiceServer<ExecutorGRPCServer>>().await;

Server::builder()
.add_service(health_service)
.add_service(ExecutorServiceServer::new(server))
.serve_with_shutdown("0.0.0.0:50052".parse().unwrap(), shutdown_signal())
.await?;

Ok(())
}
14 changes: 3 additions & 11 deletions crates/executor/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use std::error::Error;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

#[derive(NetworkBehaviour)]
pub struct PeerBehaviour {
Expand All @@ -20,7 +19,6 @@ pub struct PeerBehaviour {
}

pub struct SwarmRunner {
cancellation_token: CancellationToken,
handle: Option<JoinHandle<()>>,
}

Expand Down Expand Up @@ -50,17 +48,12 @@ impl SwarmRunner {
swarm.behaviour_mut().gossipsub.subscribe(&topic)?;
}

swarm.listen_on("/ip4/0.0.0.0/udp/5678/quic-v1".parse()?)?;
swarm.listen_on("/ip4/0.0.0.0/tcp/5679".parse()?)?;

let cancellation_token = CancellationToken::new();
swarm.listen_on("/ip4/0.0.0.0/udp/5680/quic-v1".parse()?)?;
swarm.listen_on("/ip4/0.0.0.0/tcp/5681".parse()?)?;

Ok(SwarmRunner {
cancellation_token: cancellation_token.to_owned(),
handle: Some(tokio::spawn(async move {
swarm_loop(swarm, transmit_topics, swarm_events_tx, cancellation_token)
.boxed()
.await
swarm_loop(swarm, transmit_topics, swarm_events_tx).boxed().await
})),
})
}
Expand All @@ -81,7 +74,6 @@ impl SwarmRunner {

impl Drop for SwarmRunner {
fn drop(&mut self) {
self.cancellation_token.cancel();
block_on(async move {
if let Some(handle) = self.handle.take() {
handle.await.unwrap();
Expand Down
5 changes: 2 additions & 3 deletions crates/executor/src/swarm/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@ use libp2p::{
Swarm,
};
use tokio::sync::mpsc::{self, Receiver};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};
use zetina_common::graceful_shutdown::shutdown_signal;

pub(crate) async fn swarm_loop(
mut swarm: Swarm<PeerBehaviour>,
mut transmit_topics: Vec<(IdentTopic, Receiver<Vec<u8>>)>,
swarm_events_tx: mpsc::Sender<gossipsub::Event>,
cancellation_token: CancellationToken,
) {
// TODO make it nicer solution, extensible not manual!
let mut topic_one = transmit_topics.pop().unwrap();
Expand Down Expand Up @@ -63,7 +62,7 @@ pub(crate) async fn swarm_loop(
}
_ => {}
},
_ = cancellation_token.cancelled() => {
_ = shutdown_signal() => {
break
}
}
Expand Down
33 changes: 33 additions & 0 deletions crates/executor/src/tonic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
pub mod proto {
tonic::include_proto!("executor");
}
use proto::executor_service_server::ExecutorService;
use proto::{ExecutorRequest, ExecutorResponse};
use tonic::{Request, Response, Status};

pub struct ExecutorGRPCServer {}

impl ExecutorGRPCServer {
pub fn new() -> Self {
Self {}
}
}

impl Default for ExecutorGRPCServer {
fn default() -> Self {
Self::new()
}
}

#[tonic::async_trait]
impl ExecutorService for ExecutorGRPCServer {
async fn executor(
&self,
request: Request<ExecutorRequest>,
) -> Result<Response<ExecutorResponse>, Status> {
println!("Got a request from {:?}", request.remote_addr());

let reply = ExecutorResponse { msg: format!("Hello {}!", request.into_inner().msg) };
Ok(Response::new(reply))
}
}
2 changes: 1 addition & 1 deletion delegator.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ FROM runtime
RUN cargo build --release --bin zetina-delegator

# Expose necessary ports
EXPOSE 5678/udp 5679/tcp
EXPOSE 5678/udp 5679/tcp 50051/tcp

# Set the default command to run when the container starts
CMD ["bash", "-ci", "cargo run --release --bin zetina-delegator"]
26 changes: 16 additions & 10 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,13 @@ services:
context: stone-prover
dockerfile: Dockerfile
image: stone-prover
deploy:
resources:
limits:
cpus: '4'
memory: '10G'

runtime:
build:
dockerfile: runtime.dockerfile
image: runtime
depends_on:
- stone-prover
deploy:
resources:
limits:
cpus: '2'
memory: '10G'

delegator:
build:
Expand All @@ -32,6 +22,12 @@ services:
limits:
cpus: '8'
memory: '10G'
ports:
- "50051:50051/tcp"
environment:
- RUST_LOG=info
networks:
- zetina-network

executor:
build:
Expand All @@ -43,3 +39,13 @@ services:
limits:
cpus: '8'
memory: '10G'
ports:
- "50052:50052/tcp"
environment:
- RUST_LOG=info
networks:
- zetina-network

networks:
zetina-network:
driver: bridge
2 changes: 1 addition & 1 deletion executor.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ FROM runtime
RUN cargo build --release --bin zetina-executor

# Expose necessary ports
EXPOSE 5678/udp 5679/tcp
EXPOSE 5678/udp 5679/tcp 50052/tcp

# Set the default command to run when the container starts
CMD ["bash", "-ci", "cargo run --release --bin zetina-executor"]
1 change: 1 addition & 0 deletions runtime.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ RUN apt-get update && apt-get install -y \
git \
libgmp-dev \
libdw1 \
protobuf-compiler \
&& rm -rf /var/lib/apt/lists/*

# Install Rust using Rustup
Expand Down

0 comments on commit bf5c464

Please sign in to comment.