Skip to content

Commit

Permalink
fix(hermes): reconnect on wh connection termination (#1488)
Browse files Browse the repository at this point in the history
* fix(hermes): reconnect on wh connection termination

`tokio::select!` disables the branch that runs the wh (Wormhole) connection
when `run` returns `Ok`, because the `Err(err)` pattern does not match; the
branch is never checked again, so the connection is never restarted. This
change makes `run` never return `Ok`.

* refactor(hermes): use Result<!> in pythnet network listener thread
  • Loading branch information
ali-bahjati authored Apr 22, 2024
1 parent 1b13bf6 commit f929217
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 55 deletions.
2 changes: 1 addition & 1 deletion apps/hermes/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apps/hermes/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hermes"
version = "0.5.5"
version = "0.5.6"
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
edition = "2021"

Expand Down
95 changes: 45 additions & 50 deletions apps/hermes/src/network/pythnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ async fn fetch_bridge_data(
}
}

pub async fn run(store: Arc<State>, pythnet_ws_endpoint: String) -> Result<()> {
pub async fn run(store: Arc<State>, pythnet_ws_endpoint: String) -> Result<!> {
let client = PubsubClient::new(pythnet_ws_endpoint.as_ref()).await?;

let config = RpcProgramAccountsConfig {
Expand All @@ -160,59 +160,54 @@ pub async fn run(store: Arc<State>, pythnet_ws_endpoint: String) -> Result<()> {
.program_subscribe(&system_program::id(), Some(config))
.await?;

loop {
match notif.next().await {
Some(update) => {
let account: Account = match update.value.account.decode() {
Some(account) => account,
None => {
tracing::error!(?update, "Failed to decode account from update.");
continue;
}
};

let accumulator_messages = AccumulatorMessages::try_from_slice(&account.data);
match accumulator_messages {
Ok(accumulator_messages) => {
let (candidate, _) = Pubkey::find_program_address(
&[
b"AccumulatorState",
&accumulator_messages.ring_index().to_be_bytes(),
],
&system_program::id(),
);

if candidate.to_string() == update.value.pubkey {
let store = store.clone();
tokio::spawn(async move {
if let Err(err) = Aggregates::store_update(
&*store,
Update::AccumulatorMessages(accumulator_messages),
)
.await
{
tracing::error!(error = ?err, "Failed to store accumulator messages.");
}
});
} else {
tracing::error!(
?candidate,
?update.value.pubkey,
"Failed to verify message public keys.",
);
}
}
while let Some(update) = notif.next().await {
let account: Account = match update.value.account.decode() {
Some(account) => account,
None => {
tracing::error!(?update, "Failed to decode account from update.");
continue;
}
};

let accumulator_messages = AccumulatorMessages::try_from_slice(&account.data);
match accumulator_messages {
Ok(accumulator_messages) => {
let (candidate, _) = Pubkey::find_program_address(
&[
b"AccumulatorState",
&accumulator_messages.ring_index().to_be_bytes(),
],
&system_program::id(),
);

Err(err) => {
tracing::error!(error = ?err, "Failed to parse AccumulatorMessages.");
}
};
if candidate.to_string() == update.value.pubkey {
let store = store.clone();
tokio::spawn(async move {
if let Err(err) = Aggregates::store_update(
&*store,
Update::AccumulatorMessages(accumulator_messages),
)
.await
{
tracing::error!(error = ?err, "Failed to store accumulator messages.");
}
});
} else {
tracing::error!(
?candidate,
?update.value.pubkey,
"Failed to verify message public keys.",
);
}
}
None => {
return Err(anyhow!("Pythnet network listener terminated"));

Err(err) => {
tracing::error!(error = ?err, "Failed to parse AccumulatorMessages.");
}
}
};
}

Err(anyhow!("Pythnet network listener connection terminated"))
}

/// Fetch existing GuardianSet accounts from Wormhole.
Expand Down
16 changes: 13 additions & 3 deletions apps/hermes/src/network/wormhole.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ use {
Digest,
Keccak256,
},
std::sync::Arc,
std::{
sync::Arc,
time::Duration,
},
tokio::time::Instant,
tonic::Request,
wormhole_sdk::{
vaa::{
Expand Down Expand Up @@ -158,10 +162,16 @@ mod proto {
pub async fn spawn(opts: RunOptions, state: Arc<State>) -> Result<()> {
let mut exit = crate::EXIT.subscribe();
loop {
let current_time = Instant::now();
tokio::select! {
_ = exit.changed() => break,
Err(err) = run(opts.clone(), state.clone()) => {
tracing::error!(error = ?err, "Wormhole gRPC service failed.");

if current_time.elapsed() < Duration::from_secs(30) {
tracing::error!("Wormhole listener restarting too quickly. Sleep 1s.");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
}
Expand All @@ -170,7 +180,7 @@ pub async fn spawn(opts: RunOptions, state: Arc<State>) -> Result<()> {
}

#[tracing::instrument(skip(opts, state))]
async fn run(opts: RunOptions, state: Arc<State>) -> Result<()> {
async fn run(opts: RunOptions, state: Arc<State>) -> Result<!> {
let mut client = SpyRpcServiceClient::connect(opts.wormhole.spy_rpc_addr).await?;
let mut stream = client
.subscribe_signed_vaa(Request::new(SubscribeSignedVaaRequest {
Expand All @@ -190,7 +200,7 @@ async fn run(opts: RunOptions, state: Arc<State>) -> Result<()> {
}
}

Ok(())
Err(anyhow!("Wormhole gRPC stream terminated."))
}

/// Process a message received via a Wormhole gRPC connection.
Expand Down

0 comments on commit f929217

Please sign in to comment.