Skip to content

Commit

Permalink
Merge pull request #678 from paritytech/dependabot/cargo/async-channe…
Browse files Browse the repository at this point in the history
…l-2.1.1

Bump async-channel from 1.9.0 to 2.1.1
  • Loading branch information
AndreiEres authored Mar 13, 2024
2 parents 75971e4 + f4c3db3 commit 76842f5
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 14 deletions.
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ license = "MIT OR Apache-2.0"
repository = "https://github.com/paritytech/polkadot-introspector"

[workspace.dependencies]
async-channel = "1.9.0"
async-channel = "2.1.1"
async-trait = "0.1.77"
bincode = "1.3.3"
clap = { version = "4.5.2", features = ["derive"] }
Expand Down
3 changes: 2 additions & 1 deletion essentials/src/collector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,10 @@ impl Collector {
/// Process async channels in the endless loop
pub async fn run_with_consumer_channel(
mut self,
mut consumer_channel: Receiver<ChainSubscriptionEvent>,
consumer_channel: Receiver<ChainSubscriptionEvent>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut consumer_channel = Box::pin(consumer_channel);
loop {
match consumer_channel.next().await {
None => {
Expand Down
3 changes: 2 additions & 1 deletion parachain-tracer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ impl ParachainTracer {

async fn watch_node_broadcast(
self,
mut from_collector: Receiver<CollectorUpdateEvent>,
from_collector: Receiver<CollectorUpdateEvent>,
api_service: CollectorStorageApi,
) {
let mut trackers = HashMap::new();
Expand All @@ -277,6 +277,7 @@ impl ParachainTracer {
let mut best_known_block: u32 = 0;
let max_stall = self.opts.max_parachain_stall;
let mut futures = FuturesUnordered::new();
let mut from_collector = Box::pin(from_collector);

loop {
tokio::select! {
Expand Down
1 change: 1 addition & 0 deletions priority-channel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ publish = true
[dependencies]
async-channel = { workspace = true }
futures = { workspace = true }
pin-project-lite = "0.2.13"
rand = { workspace = true }
thiserror = { workspace = true }

Expand Down
22 changes: 14 additions & 8 deletions priority-channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::{
use async_channel::bounded;
pub use async_channel::{TryRecvError, TrySendError};
use futures::Stream;
use pin_project_lite::pin_project;

/// Creates a new channel with a specified capacity and returns a tuple of Sender and Receiver structs.
///
Expand All @@ -44,11 +45,15 @@ pub fn channel_with_capacities<T>(bulk_capacity: usize, priority_capacity: usize
(Sender { inner_priority: tx_priority, inner: tx }, Receiver { inner_priority: rx_priority, inner: rx })
}

/// A receiver tracking the messages consumed by itself.
#[derive(Debug)]
pub struct Receiver<T> {
inner: async_channel::Receiver<T>,
inner_priority: async_channel::Receiver<T>,
pin_project! {
/// A receiver tracking the messages consumed by itself.
#[derive(Debug)]
pub struct Receiver<T> {
#[pin]
inner: async_channel::Receiver<T>,
#[pin]
inner_priority: async_channel::Receiver<T>,
}
}

impl<T> std::ops::Deref for Receiver<T> {
Expand All @@ -66,10 +71,11 @@ impl<T> std::ops::DerefMut for Receiver<T> {

impl<T> Stream for Receiver<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match async_channel::Receiver::poll_next(Pin::new(&mut self.inner_priority), cx) {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
match async_channel::Receiver::poll_next(this.inner_priority, cx) {
Poll::Ready(maybe_value) => Poll::Ready(maybe_value),
Poll::Pending => match async_channel::Receiver::poll_next(Pin::new(&mut self.inner), cx) {
Poll::Pending => match async_channel::Receiver::poll_next(this.inner, cx) {
Poll::Ready(maybe_value) => Poll::Ready(maybe_value),
Poll::Pending => Poll::Pending,
},
Expand Down

0 comments on commit 76842f5

Please sign in to comment.