diff --git a/Cargo.lock b/Cargo.lock index af9939129..07dbe2fb0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -221,7 +221,6 @@ dependencies = [ "arc-swap", "bytes", "chrono", - "futures", "futures-util", "heapless", "lazy_static", diff --git a/Cargo.toml b/Cargo.toml index 554aae9ff..396299b20 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,6 @@ rand = { version = "0.8", optional = true } arc-swap = { version = "1.7.0", optional = true } bytes = { version = "1.0", optional = true, default-features = false } chrono = { version = "0.4.35", optional = true, default-features = false } # 0.4.35 deprecates Duration::seconds() -futures = { version = "0.3.22", optional = true } # Force futures to at least 0.3.22 for minimal-version build futures-util = { version = "0.3", optional = true } heapless = { version = "0.8", optional = true } libc = { version = "0.2.153", default-features = false, optional = true } # 0.2.79 is the first version that has IP_PMTUDISC_OMIT @@ -64,7 +63,7 @@ unstable-client-transport = ["moka", "net", "tracing"] unstable-server-transport = ["arc-swap", "chrono/clock", "libc", "net", "siphasher", "tracing"] unstable-stelline = ["tokio/test-util", "tracing", "tracing-subscriber", "unstable-server-transport", "zonefile"] unstable-validator = ["validate", "zonefile", "unstable-client-transport"] -unstable-zonetree = ["futures", "parking_lot", "serde", "tokio", "tracing"] +unstable-zonetree = ["futures-util", "parking_lot", "serde", "tokio", "tracing"] [dev-dependencies] lazy_static = { version = "1.4.0" } diff --git a/examples/server-transports.rs b/examples/server-transports.rs index ece257ce6..1a4c03136 100644 --- a/examples/server-transports.rs +++ b/examples/server-transports.rs @@ -11,14 +11,16 @@ use std::net::SocketAddr; use std::pin::Pin; use std::sync::Arc; use std::sync::RwLock; +use std::vec::Vec; -use futures::channel::mpsc::unbounded; -use futures::stream::{once, Empty, Once, Stream}; +use futures_util::stream::{once, Empty, Once, Stream}; use octseq::{FreezeBuilder, Octets}; use tokio::net::{TcpListener, TcpSocket, TcpStream, UdpSocket}; +use tokio::sync::mpsc::unbounded_channel; use tokio::time::Instant; use tokio_rustls::rustls; use tokio_rustls::TlsAcceptor; +use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_tfo::{TfoListener, TfoStream}; use tracing_subscriber::EnvFilter; @@ -43,7 +45,6 @@ use domain::net::server::sock::AsyncAccept; use domain::net::server::stream::StreamServer; use domain::net::server::util::{mk_builder_for_target, service_fn}; use domain::rdata::{Soa, A}; -use std::vec::Vec; //----------- mk_answer() ---------------------------------------------------- @@ -171,7 +172,7 @@ impl Service> for MyAsyncStreamingService { return Box::pin(immediate_result) as Self::Stream; } - let (sender, receiver) = unbounded(); + let (sender, receiver) = unbounded_channel(); let cloned_sender = sender.clone(); tokio::spawn(async move { @@ -180,22 +181,22 @@ impl Service> for MyAsyncStreamingService { let builder = mk_builder_for_target(); let additional = mk_soa_answer(&request, builder).unwrap(); let item = Ok(CallResult::new(additional)); - cloned_sender.unbounded_send(item).unwrap(); + cloned_sender.send(item).unwrap(); tokio::time::sleep(Duration::from_millis(100)).await; let builder = mk_builder_for_target(); let additional = mk_answer(&request, builder).unwrap(); let item = Ok(CallResult::new(additional)); - cloned_sender.unbounded_send(item).unwrap(); + cloned_sender.send(item).unwrap(); tokio::time::sleep(Duration::from_millis(100)).await; let builder = mk_builder_for_target(); let additional = mk_soa_answer(&request, builder).unwrap(); let item = Ok(CallResult::new(additional)); - cloned_sender.unbounded_send(item).unwrap(); + cloned_sender.send(item).unwrap(); }); - Box::pin(receiver) as Self::Stream + Box::pin(UnboundedReceiverStream::new(receiver)) as Self::Stream }) } } diff --git a/src/net/server/connection.rs b/src/net/server/connection.rs index a805b1b17..7fce0c141 100644 --- a/src/net/server/connection.rs +++ b/src/net/server/connection.rs @@ -8,7 +8,7 @@ use std::net::SocketAddr; use std::sync::Arc; use arc_swap::ArcSwap; -use futures::StreamExt; +use futures_util::StreamExt; use octseq::Octets; use tokio::io::{ AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf, diff --git a/src/net/server/dgram.rs b/src/net/server/dgram.rs index 752b8f4ca..e44488290 100644 --- a/src/net/server/dgram.rs +++ b/src/net/server/dgram.rs @@ -21,7 +21,7 @@ use std::string::ToString; use std::sync::{Arc, Mutex}; use arc_swap::ArcSwap; -use futures::prelude::stream::StreamExt; +use futures_util::stream::StreamExt; use octseq::Octets; use tokio::io::ReadBuf; use tokio::net::UdpSocket; diff --git a/src/net/server/middleware/cookies.rs b/src/net/server/middleware/cookies.rs index 58708e504..fd6b089be 100644 --- a/src/net/server/middleware/cookies.rs +++ b/src/net/server/middleware/cookies.rs @@ -5,7 +5,7 @@ use core::ops::ControlFlow; use std::vec::Vec; -use futures::stream::{once, Once, Stream}; +use futures_util::stream::{once, Once, Stream}; use octseq::Octets; use rand::RngCore; use tracing::{debug, error, trace, warn}; diff --git a/src/net/server/middleware/edns.rs b/src/net/server/middleware/edns.rs index affe96362..fea9fba30 100644 --- a/src/net/server/middleware/edns.rs +++ b/src/net/server/middleware/edns.rs @@ -3,7 +3,7 @@ use core::future::{ready, Ready}; use core::marker::PhantomData; use core::ops::ControlFlow; -use futures::stream::{once, Once, Stream}; +use futures_util::stream::{once, Once, Stream}; use octseq::Octets; use tracing::{debug, enabled, error, trace, warn, Level}; @@ -398,7 +398,7 @@ mod tests { use std::vec::Vec; use bytes::Bytes; - use futures::stream::StreamExt; + use futures_util::stream::StreamExt; use tokio::time::Instant; use crate::base::{Message, MessageBuilder, Name, Rtype}; diff --git a/src/net/server/middleware/mandatory.rs b/src/net/server/middleware/mandatory.rs index 9d1a9f90b..aebd9dcc8 100644 --- a/src/net/server/middleware/mandatory.rs +++ b/src/net/server/middleware/mandatory.rs @@ -5,7 +5,7 @@ use core::ops::ControlFlow; use std::fmt::Display; -use futures::stream::{once, Once, Stream}; +use futures_util::stream::{once, Once, Stream}; use octseq::Octets; use tracing::{debug, error, trace, warn}; @@ -373,7 +373,7 @@ mod tests { use std::vec::Vec; use bytes::Bytes; - use futures::StreamExt; + use futures_util::StreamExt; use tokio::time::Instant; use crate::base::iana::Rcode; diff --git a/src/net/server/middleware/stream.rs b/src/net/server/middleware/stream.rs index 6d4c7fb2a..f16ce3cd5 100644 --- a/src/net/server/middleware/stream.rs +++ b/src/net/server/middleware/stream.rs @@ -4,8 +4,8 @@ use core::task::{ready, Context, Poll}; use std::pin::Pin; -use futures::prelude::future::FutureExt; -use futures::stream::{Stream, StreamExt}; +use futures_util::future::FutureExt; +use futures_util::stream::{Stream, StreamExt}; use octseq::Octets; use tracing::trace; @@ -93,7 +93,7 @@ where enum PostprocessingStreamState where - Stream: futures::stream::Stream, + Stream: futures_util::stream::Stream, Future: core::future::Future, { Pending(Future), @@ -124,7 +124,7 @@ pub struct PostprocessingStream< > where RequestOctets: Octets + Send + Sync + Unpin, Future: core::future::Future, - Stream: futures::stream::Stream, + Stream: futures_util::stream::Stream, { request: Request, state: PostprocessingStreamState, @@ -148,7 +148,7 @@ impl where RequestOctets: Octets + Send + Sync + Unpin, Future: core::future::Future, - Stream: futures::stream::Stream, + Stream: futures_util::stream::Stream, { pub fn new( svc_call_fut: Future, @@ -173,7 +173,7 @@ where //--- impl Stream impl - futures::stream::Stream + futures_util::stream::Stream for PostprocessingStream< RequestOctets, Future, @@ -184,7 +184,7 @@ impl where RequestOctets: Octets + Send + Sync + Unpin, Future: core::future::Future + Unpin, - Stream: futures::stream::Stream + Unpin, + Stream: futures_util::stream::Stream + Unpin, Self: Unpin, RequestMeta: Clone, PostProcessingMeta: Clone, diff --git a/src/net/server/service.rs b/src/net/server/service.rs index f4b0fa1fc..99f9f6b48 100644 --- a/src/net/server/service.rs +++ b/src/net/server/service.rs @@ -48,7 +48,7 @@ pub type ServiceResult = Result, ServiceError>; /// /// use std::task::{Context, Poll}; /// -/// use futures::stream::{once, Once, Stream}; +/// use futures_util::stream::{once, Once, Stream}; /// /// use domain::base::iana::{Class, Rcode}; /// use domain::base::message_builder::AdditionalBuilder; @@ -140,7 +140,7 @@ pub type ServiceResult = Result, ServiceError>; /// The above are minimalist examples to illustrate what you need to do, but /// lacking any actual useful behaviour. They also only demonstrate returning /// a response stream containing a single immediately available value via -/// `futures::stream::Once` and `std::future::Ready`. +/// `futures_util::stream::Once` and `std::future::Ready`. /// /// In your own [`Service`] impl you would implement actual business logic /// returning single or multiple responses synchronously or asynchronously as @@ -181,7 +181,7 @@ pub trait Service< type Target; /// The type of stream that the service produces. - type Stream: futures::stream::Stream> + type Stream: futures_util::stream::Stream> + Unpin; /// The type of future that will yield the service result stream. diff --git a/src/net/server/tests/unit.rs b/src/net/server/tests/unit.rs index 7d50ff628..d292afd6f 100644 --- a/src/net/server/tests/unit.rs +++ b/src/net/server/tests/unit.rs @@ -286,7 +286,7 @@ impl MySingle { } } -impl futures::stream::Stream for MySingle { +impl futures_util::stream::Stream for MySingle { type Item = Result>, ServiceError>; fn poll_next( diff --git a/src/net/server/util.rs b/src/net/server/util.rs index adf5f969c..220b6171f 100644 --- a/src/net/server/util.rs +++ b/src/net/server/util.rs @@ -4,7 +4,7 @@ use core::future::{ready, Ready}; use core::marker::PhantomData; use std::string::{String, ToString}; -use futures::stream::Once; +use futures_util::stream::Once; use octseq::{Octets, OctetsBuilder}; use tracing::warn; diff --git a/src/zonetree/in_memory/write.rs b/src/zonetree/in_memory/write.rs index c4a787b5f..5afa1a46c 100644 --- a/src/zonetree/in_memory/write.rs +++ b/src/zonetree/in_memory/write.rs @@ -9,7 +9,7 @@ use std::sync::Weak; use std::vec::Vec; use std::{fmt, io}; -use futures::future::Either; +use futures_util::future::Either; use parking_lot::RwLock; use tokio::sync::OwnedMutexGuard;