Skip to content

Commit

Permalink
Merge branch 'main' into dependabot/cargo/opentelemetry-aws-0.8.0
Browse files Browse the repository at this point in the history
  • Loading branch information
elliedavidson authored Aug 28, 2023
2 parents f830acd + 62b3e0b commit 73906a3
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 109 deletions.
80 changes: 49 additions & 31 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,6 @@ edition = "2021"
[features]

# async_std
full-ci = ["async-std-executor", "logging-utils", "channel-async-std"]

tokio-ci = ["tokio-executor", "logging-utils", "channel-tokio"]
async-std-executor = ["dep:async-std"]
tokio-executor = ["dep:tokio", "dep:tokio-stream", "dep:console-subscriber"]
channel-flume = ["flume"]
channel-tokio = ["dep:tokio", "dep:tokio-stream"]
channel-async-std = ["dep:async-channel"]
profiling = [
"opentelemetry-jaeger",
"tracing-opentelemetry",
Expand All @@ -27,38 +19,15 @@ logging-utils = ["tracing-subscriber"]

[dependencies]
async-lock = "2.7"
async-std = { version = "1.12", features = [
"attributes",
"unstable",
], optional = true }
async-channel = { version = "1.9.0", optional = true }
async-trait = "0.1.71"
color-eyre = "0.6.2"
flume = { version = "0.10.14", optional = true }
futures = "0.3.28"
tokio = { version = "1", optional = true, features = [
"fs",
"io-util",
"io-std",
"macros",
"net",
"parking_lot",
"process",
"rt",
"rt-multi-thread",
"signal",
"sync",
"time",
"tracing",
] }
tokio-stream = { version = "0.1.14", optional = true }
tracing = "0.1.37"
tracing-error = "0.2.0"
tracing-subscriber = { version = "0.3.17", features = [
"env-filter",
"json",
], optional = true }
console-subscriber = { version = "0.1.10", optional = true }
opentelemetry = { version = "0.19.0", features = [
"rt-tokio-current-thread",
"metrics",
Expand All @@ -69,3 +38,52 @@ opentelemetry-jaeger = { version = "0.17.0", features = [
"rt-tokio-current-thread",
], optional = true }
opentelemetry-aws = { version = "0.8.0", features = ["trace"], optional = true }

[target.'cfg(all(async_executor_impl = "tokio"))'.dependencies]
tokio = { version = "1", features = [
"fs",
"io-util",
"io-std",
"macros",
"net",
"parking_lot",
"process",
"rt",
"rt-multi-thread",
"signal",
"sync",
"time",
"tracing",
] }
tokio-stream = { version = "0.1.14" }

[target.'cfg(all(async_executor_impl = "async-std"))'.dependencies]
async-std = { version = "1.12", features = [
"attributes",
"unstable",
]}

[target.'cfg(all(async_channel_impl = "tokio"))'.dependencies]
console-subscriber = { version = "0.1.10" }
tokio = { version = "1", features = [
"fs",
"io-util",
"io-std",
"macros",
"net",
"parking_lot",
"process",
"rt",
"rt-multi-thread",
"signal",
"sync",
"time",
"tracing",
] }
tokio-stream = { version = "0.1.14" }

[target.'cfg(all(async_channel_impl = "async-std"))'.dependencies]
async-channel = { version = "1.9.0" }

[target.'cfg(all(async_channel_impl = "flume"))'.dependencies]
flume = { version = "0.10.14" }
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@

This crate exports four things:

- A compatibility/abstraction layer for writing async-executor agnostic code. We support two async executors: async-std and tokio. Each may be toggled with a feature flag.
- A compatibility/abstraction layer for writing async channel agnostic code. We support three async channel implementations: async-std-channels. Each may be toggled with a feature flag.
- A compatibility/abstraction layer for writing async-executor agnostic code. We support two async executors: async-std and tokio. Each may be toggled with a configuration flag.
- A compatibility/abstraction layer for writing async channel agnostic code. We support three async channel implementations: async-std-channels. Each may be toggled with a configuration flag.
- A library exporting a bunch of useful async primitives.
- A tracing configuration layer optionally supporting console and opentelemetry integration.

# Example usage

```bash
RUSTFLAGS='--cfg async_executor_impl="tokio" --cfg async_channel_impl="tokio"' cargo build
```

`async_executor_impl` may be either `tokio` or `async-std`. `async_channel_impl` may be either `tokio`, `async-std`, or `flume`. Note that using `tokio` channels requires `tokio` to be the runtime. Note that the async executor impl and async channel impl must be set in order for this crate to compile successfully.


24 changes: 12 additions & 12 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 12 additions & 12 deletions src/async_primitives/subscribable_mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ use futures::{stream::FuturesOrdered, Future, FutureExt};
use std::{fmt, time::Duration};
use tracing::warn;

#[cfg(feature = "async-std-executor")]
#[cfg(all(async_executor_impl = "async-std"))]
use async_std::prelude::StreamExt;
#[cfg(feature = "tokio-executor")]
#[cfg(all(async_executor_impl = "tokio"))]
use tokio_stream::StreamExt;
#[cfg(not(any(feature = "async-std-executor", feature = "tokio-executor")))]
std::compile_error! {"Either feature \"async-std-executor\" or feature \"tokio-executor\" must be enabled for this crate."}
#[cfg(not(any(async_executor_impl = "async-std", async_executor_impl = "tokio")))]
std::compile_error! {"The cfg flag async_executor_impl must be set in rustflags to either \"async-std\" or \"tokio\" for this crate. Try adding `--cfg async_executor_impl=\"tokio\""}

/// A mutex that can register subscribers to be notified. This works in the same way as [`Mutex`], but has some additional functions:
///
Expand Down Expand Up @@ -251,10 +251,10 @@ mod tests {
use std::{sync::Arc, time::Duration};

#[cfg_attr(
feature = "tokio-executor",
async_executor_impl = "tokio",
tokio::test(flavor = "multi_thread", worker_threads = 2)
)]
#[cfg_attr(feature = "async-std-executor", async_std::test)]
#[cfg_attr(async_executor_impl = "async-std", async_std::test)]
async fn test_wait_timeout_until() {
let mutex: Arc<SubscribableMutex<usize>> = Arc::default();
{
Expand All @@ -276,10 +276,10 @@ mod tests {
}

#[cfg_attr(
feature = "tokio-executor",
async_executor_impl = "tokio",
tokio::test(flavor = "multi_thread", worker_threads = 2)
)]
#[cfg_attr(feature = "async-std-executor", async_std::test)]
#[cfg_attr(async_executor_impl = "async-std", async_std::test)]
async fn test_wait_timeout_until_fail() {
let mutex: Arc<SubscribableMutex<usize>> = Arc::default();
{
Expand All @@ -300,10 +300,10 @@ mod tests {
}

#[cfg_attr(
feature = "tokio-executor",
async_executor_impl = "tokio",
tokio::test(flavor = "multi_thread", worker_threads = 2)
)]
#[cfg_attr(feature = "async-std-executor", async_std::test)]
#[cfg_attr(async_executor_impl = "async-std", async_std::test)]
async fn test_compare_and_set() {
let mutex = SubscribableMutex::new(5usize);
let subscriber = mutex.subscribe().await;
Expand All @@ -322,10 +322,10 @@ mod tests {
}

#[cfg_attr(
feature = "tokio-executor",
async_executor_impl = "tokio",
tokio::test(flavor = "multi_thread", worker_threads = 2)
)]
#[cfg_attr(feature = "async-std-executor", async_std::test)]
#[cfg_attr(async_executor_impl = "async-std", async_std::test)]
async fn test_subscriber() {
let mutex = SubscribableMutex::new(5usize);
let subscriber = mutex.subscribe().await;
Expand Down
4 changes: 2 additions & 2 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ mod oneshot;
/// Unbounded channels
mod unbounded;

#[cfg(all(feature = "async-std-executor", feature = "channel-tokio"))]
compile_error!("feature 'async-std-executor' and 'channel-tokio' cannot be used at the same time; 'channel-tokio' needs the tokio runtime");
#[cfg(all(async_executor_impl = "async-std", async_channel_impl = "tokio"))]
compile_error!("async_executor_impl = 'async-std-executor' and async_channel_impl = 'channel-tokio' cannot be used together; 'channel-tokio' needs the tokio runtime");

pub use bounded::{bounded, BoundedStream, Receiver, RecvError, SendError, Sender, TryRecvError};
pub use oneshot::{oneshot, OneShotReceiver, OneShotRecvError, OneShotSender, OneShotTryRecvError};
Expand Down
28 changes: 14 additions & 14 deletions src/channel/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::pin::Pin;
use futures::Stream;

/// inner module, used to group feature-specific imports
#[cfg(feature = "channel-tokio")]
#[cfg(all(async_channel_impl = "tokio"))]
mod inner {
pub use tokio::sync::mpsc::error::{SendError, TryRecvError};

Expand Down Expand Up @@ -45,7 +45,7 @@ mod inner {
}

/// inner module, used to group feature-specific imports
#[cfg(feature = "channel-flume")]
#[cfg(all(async_channel_impl = "flume"))]
mod inner {
pub use flume::{RecvError, SendError, TryRecvError};

Expand Down Expand Up @@ -75,7 +75,7 @@ mod inner {
}

/// inner module, used to group feature-specific imports
#[cfg(feature = "channel-async-std")]
#[cfg(all(async_channel_impl = "async-std"))]
mod inner {
pub use async_channel::{RecvError, SendError, TryRecvError};

Expand Down Expand Up @@ -114,9 +114,9 @@ impl<T> Sender<T> {
///
/// Will return an error if the receiver is dropped
pub async fn send(&self, msg: T) -> Result<(), SendError<T>> {
#[cfg(feature = "channel-flume")]
#[cfg(all(async_channel_impl = "flume"))]
let result = self.0.send_async(msg).await;
#[cfg(not(feature = "channel-flume"))]
#[cfg(not(all(async_channel_impl = "flume")))]
let result = self.0.send(msg).await;

result
Expand All @@ -130,11 +130,11 @@ impl<T> Receiver<T> {
///
/// Will return an error if the sender is dropped
pub async fn recv(&mut self) -> Result<T, RecvError> {
#[cfg(feature = "channel-flume")]
#[cfg(all(async_channel_impl = "flume"))]
let result = self.0.recv_async().await;
#[cfg(feature = "channel-tokio")]
#[cfg(all(async_channel_impl = "tokio"))]
let result = self.0.recv().await.ok_or(RecvError);
#[cfg(feature = "channel-async-std")]
#[cfg(all(async_channel_impl = "async-std"))]
let result = self.0.recv().await;

result
Expand All @@ -144,11 +144,11 @@ impl<T> Receiver<T> {
where
T: 'static,
{
#[cfg(feature = "channel-async-std")]
#[cfg(all(async_channel_impl = "async-std"))]
let result = self.0;
#[cfg(feature = "channel-tokio")]
#[cfg(all(async_channel_impl = "tokio"))]
let result = tokio_stream::wrappers::ReceiverStream::new(self.0);
#[cfg(feature = "channel-flume")]
#[cfg(all(async_channel_impl = "flume"))]
let result = self.0.into_stream();

BoundedStream(result)
Expand Down Expand Up @@ -219,14 +219,14 @@ impl<T> Stream for BoundedStream<T> {
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
#[cfg(feature = "channel-flume")]
#[cfg(all(async_channel_impl = "flume"))]
return <flume::r#async::RecvStream<T>>::poll_next(Pin::new(&mut self.0), cx);
#[cfg(feature = "channel-tokio")]
#[cfg(all(async_channel_impl = "tokio"))]
return <tokio_stream::wrappers::ReceiverStream<T> as Stream>::poll_next(
Pin::new(&mut self.0),
cx,
);
#[cfg(feature = "channel-async-std")]
#[cfg(all(async_channel_impl = "async-std"))]
return <async_channel::Receiver<T> as Stream>::poll_next(Pin::new(&mut self.0), cx);
}
}
Expand Down
Loading

0 comments on commit 73906a3

Please sign in to comment.