Skip to content

Commit

Permalink
Merge pull request #41 from terassyi/sartd-fib-retry-to-publish
Browse files Browse the repository at this point in the history
Sartd-fib retries to connect bgp endpoint
  • Loading branch information
terassyi authored Jun 27, 2023
2 parents 77ea736 + d2335f9 commit ff27df0
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 3 deletions.
30 changes: 27 additions & 3 deletions sartd/src/fib/bgp.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use std::collections::VecDeque;
use std::net::IpAddr;
use std::time::Duration;

use ipnet::IpNet;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::UnboundedSender;
use tokio::time::Instant;

use crate::fib::rib::RequestType;
use crate::fib::route::ip_version_to_afi;
Expand All @@ -31,11 +34,16 @@ use super::route::ip_version_from;
#[derive(Debug, Deserialize, Serialize, Clone)]
pub(crate) struct Bgp {
pub endpoint: String,
#[serde(skip)]
queue: VecDeque<(RequestType, Route)>,
}

impl Bgp {
pub fn new(endpoint: String) -> Self {
Self { endpoint }
Self {
endpoint: endpoint,
queue: VecDeque::new()
}
}

#[tracing::instrument(skip(self))]
Expand Down Expand Up @@ -63,8 +71,7 @@ impl Bgp {

#[tracing::instrument(skip(self))]
pub async fn publish(&self, req: RequestType, route: Route) -> Result<(), Error> {
tracing::info!(endpoint=self.endpoint,"try to connect to BGP's gRPC server");
let mut client = connect_bgp(&self.endpoint).await?;
let mut client = connect_bgp_with_retry(&self.endpoint, Duration::from_secs(5)).await?;
tracing::info!(endpoint=self.endpoint,"connect to BGP's gRPC server");

match req {
Expand Down Expand Up @@ -239,3 +246,20 @@ async fn connect_bgp(
.await
.map_err(Error::FailedToCommunicateWithgRPC)
}

#[tracing::instrument]
async fn connect_bgp_with_retry(endpoint: &str, timeout: Duration) -> Result<crate::proto::sart::bgp_api_client::BgpApiClient<tonic::transport::Channel>, Error> {
let deadline = Instant::now() + timeout;
loop {
if Instant::now() > deadline {
return Err(Error::Timeout)
}
match connect_bgp(endpoint).await {
Ok(conn) => return Ok(conn),
Err(e) => {
tracing::error!(error=?e,"failed to connect bgp");
tokio::time::sleep(Duration::from_millis(500)).await;
}
}
}
}
2 changes: 2 additions & 0 deletions sartd/src/fib/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ pub(crate) enum Error {
},
#[error("failed to communicate with gRPC server/client")]
FailedToCommunicateWithgRPC(#[from] tonic::transport::Error),
#[error("timeout")]
Timeout,
#[error("got error {} from gRPC", e)]
GotgPRCError {
#[from]
Expand Down

0 comments on commit ff27df0

Please sign in to comment.