Skip to content

Commit

Permalink
Revert regression of changes in 0.9.3 that switched from the futures …
Browse files Browse the repository at this point in the history
…crate to futures-util instead. (NLnetLabs#389)
  • Loading branch information
ximon18 authored Sep 13, 2024
1 parent 874cc6f commit 1d022f4
Show file tree
Hide file tree
Showing 13 changed files with 30 additions and 31 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ rand = { version = "0.8", optional = true }
arc-swap = { version = "1.7.0", optional = true }
bytes = { version = "1.0", optional = true, default-features = false }
chrono = { version = "0.4.35", optional = true, default-features = false } # 0.4.35 deprecates Duration::seconds()
futures = { version = "0.3.22", optional = true } # Force futures to at least 0.3.22 for minimal-version build
futures-util = { version = "0.3", optional = true }
heapless = { version = "0.8", optional = true }
libc = { version = "0.2.153", default-features = false, optional = true } # 0.2.79 is the first version that has IP_PMTUDISC_OMIT
Expand Down Expand Up @@ -64,7 +63,7 @@ unstable-client-transport = ["moka", "net", "tracing"]
unstable-server-transport = ["arc-swap", "chrono/clock", "libc", "net", "siphasher", "tracing"]
unstable-stelline = ["tokio/test-util", "tracing", "tracing-subscriber", "unstable-server-transport", "zonefile"]
unstable-validator = ["validate", "zonefile", "unstable-client-transport"]
unstable-zonetree = ["futures", "parking_lot", "serde", "tokio", "tracing"]
unstable-zonetree = ["futures-util", "parking_lot", "serde", "tokio", "tracing"]

[dev-dependencies]
lazy_static = { version = "1.4.0" }
Expand Down
17 changes: 9 additions & 8 deletions examples/server-transports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@ use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::RwLock;
use std::vec::Vec;

use futures::channel::mpsc::unbounded;
use futures::stream::{once, Empty, Once, Stream};
use futures_util::stream::{once, Empty, Once, Stream};
use octseq::{FreezeBuilder, Octets};
use tokio::net::{TcpListener, TcpSocket, TcpStream, UdpSocket};
use tokio::sync::mpsc::unbounded_channel;
use tokio::time::Instant;
use tokio_rustls::rustls;
use tokio_rustls::TlsAcceptor;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_tfo::{TfoListener, TfoStream};
use tracing_subscriber::EnvFilter;

Expand All @@ -43,7 +45,6 @@ use domain::net::server::sock::AsyncAccept;
use domain::net::server::stream::StreamServer;
use domain::net::server::util::{mk_builder_for_target, service_fn};
use domain::rdata::{Soa, A};
use std::vec::Vec;

//----------- mk_answer() ----------------------------------------------------

Expand Down Expand Up @@ -171,7 +172,7 @@ impl Service<Vec<u8>> for MyAsyncStreamingService {
return Box::pin(immediate_result) as Self::Stream;
}

let (sender, receiver) = unbounded();
let (sender, receiver) = unbounded_channel();
let cloned_sender = sender.clone();

tokio::spawn(async move {
Expand All @@ -180,22 +181,22 @@ impl Service<Vec<u8>> for MyAsyncStreamingService {
let builder = mk_builder_for_target();
let additional = mk_soa_answer(&request, builder).unwrap();
let item = Ok(CallResult::new(additional));
cloned_sender.unbounded_send(item).unwrap();
cloned_sender.send(item).unwrap();

tokio::time::sleep(Duration::from_millis(100)).await;
let builder = mk_builder_for_target();
let additional = mk_answer(&request, builder).unwrap();
let item = Ok(CallResult::new(additional));
cloned_sender.unbounded_send(item).unwrap();
cloned_sender.send(item).unwrap();

tokio::time::sleep(Duration::from_millis(100)).await;
let builder = mk_builder_for_target();
let additional = mk_soa_answer(&request, builder).unwrap();
let item = Ok(CallResult::new(additional));
cloned_sender.unbounded_send(item).unwrap();
cloned_sender.send(item).unwrap();
});

Box::pin(receiver) as Self::Stream
Box::pin(UnboundedReceiverStream::new(receiver)) as Self::Stream
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/net/server/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::net::SocketAddr;
use std::sync::Arc;

use arc_swap::ArcSwap;
use futures::StreamExt;
use futures_util::StreamExt;
use octseq::Octets;
use tokio::io::{
AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf,
Expand Down
2 changes: 1 addition & 1 deletion src/net/server/dgram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::string::ToString;
use std::sync::{Arc, Mutex};

use arc_swap::ArcSwap;
use futures::prelude::stream::StreamExt;
use futures_util::stream::StreamExt;
use octseq::Octets;
use tokio::io::ReadBuf;
use tokio::net::UdpSocket;
Expand Down
2 changes: 1 addition & 1 deletion src/net/server/middleware/cookies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use core::ops::ControlFlow;

use std::vec::Vec;

use futures::stream::{once, Once, Stream};
use futures_util::stream::{once, Once, Stream};
use octseq::Octets;
use rand::RngCore;
use tracing::{debug, error, trace, warn};
Expand Down
4 changes: 2 additions & 2 deletions src/net/server/middleware/edns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use core::future::{ready, Ready};
use core::marker::PhantomData;
use core::ops::ControlFlow;

use futures::stream::{once, Once, Stream};
use futures_util::stream::{once, Once, Stream};
use octseq::Octets;
use tracing::{debug, enabled, error, trace, warn, Level};

Expand Down Expand Up @@ -398,7 +398,7 @@ mod tests {
use std::vec::Vec;

use bytes::Bytes;
use futures::stream::StreamExt;
use futures_util::stream::StreamExt;
use tokio::time::Instant;

use crate::base::{Message, MessageBuilder, Name, Rtype};
Expand Down
4 changes: 2 additions & 2 deletions src/net/server/middleware/mandatory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use core::ops::ControlFlow;

use std::fmt::Display;

use futures::stream::{once, Once, Stream};
use futures_util::stream::{once, Once, Stream};
use octseq::Octets;
use tracing::{debug, error, trace, warn};

Expand Down Expand Up @@ -373,7 +373,7 @@ mod tests {
use std::vec::Vec;

use bytes::Bytes;
use futures::StreamExt;
use futures_util::StreamExt;
use tokio::time::Instant;

use crate::base::iana::Rcode;
Expand Down
14 changes: 7 additions & 7 deletions src/net/server/middleware/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use core::task::{ready, Context, Poll};

use std::pin::Pin;

use futures::prelude::future::FutureExt;
use futures::stream::{Stream, StreamExt};
use futures_util::future::FutureExt;
use futures_util::stream::{Stream, StreamExt};
use octseq::Octets;
use tracing::trace;

Expand Down Expand Up @@ -93,7 +93,7 @@ where

enum PostprocessingStreamState<Future, Stream>
where
Stream: futures::stream::Stream,
Stream: futures_util::stream::Stream,
Future: core::future::Future<Output = Stream>,
{
Pending(Future),
Expand Down Expand Up @@ -124,7 +124,7 @@ pub struct PostprocessingStream<
> where
RequestOctets: Octets + Send + Sync + Unpin,
Future: core::future::Future<Output = Stream>,
Stream: futures::stream::Stream,
Stream: futures_util::stream::Stream,
{
request: Request<RequestOctets, RequestMeta>,
state: PostprocessingStreamState<Future, Stream>,
Expand All @@ -148,7 +148,7 @@ impl<RequestOctets, Future, Stream, RequestMeta, PostProcessingMeta>
where
RequestOctets: Octets + Send + Sync + Unpin,
Future: core::future::Future<Output = Stream>,
Stream: futures::stream::Stream,
Stream: futures_util::stream::Stream,
{
pub fn new(
svc_call_fut: Future,
Expand All @@ -173,7 +173,7 @@ where
//--- impl Stream

impl<RequestOctets, Future, Stream, RequestMeta, PostProcessingMeta>
futures::stream::Stream
futures_util::stream::Stream
for PostprocessingStream<
RequestOctets,
Future,
Expand All @@ -184,7 +184,7 @@ impl<RequestOctets, Future, Stream, RequestMeta, PostProcessingMeta>
where
RequestOctets: Octets + Send + Sync + Unpin,
Future: core::future::Future<Output = Stream> + Unpin,
Stream: futures::stream::Stream + Unpin,
Stream: futures_util::stream::Stream + Unpin,
Self: Unpin,
RequestMeta: Clone,
PostProcessingMeta: Clone,
Expand Down
6 changes: 3 additions & 3 deletions src/net/server/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub type ServiceResult<Target> = Result<CallResult<Target>, ServiceError>;
///
/// use std::task::{Context, Poll};
///
/// use futures::stream::{once, Once, Stream};
/// use futures_util::stream::{once, Once, Stream};
///
/// use domain::base::iana::{Class, Rcode};
/// use domain::base::message_builder::AdditionalBuilder;
Expand Down Expand Up @@ -140,7 +140,7 @@ pub type ServiceResult<Target> = Result<CallResult<Target>, ServiceError>;
/// The above are minimalist examples to illustrate what you need to do, but
/// lacking any actual useful behaviour. They also only demonstrate returning
/// a response stream containing a single immediately available value via
/// `futures::stream::Once` and `std::future::Ready`.
/// `futures_util::stream::Once` and `std::future::Ready`.
///
/// In your own [`Service`] impl you would implement actual business logic
/// returning single or multiple responses synchronously or asynchronously as
Expand Down Expand Up @@ -181,7 +181,7 @@ pub trait Service<
type Target;

/// The type of stream that the service produces.
type Stream: futures::stream::Stream<Item = ServiceResult<Self::Target>>
type Stream: futures_util::stream::Stream<Item = ServiceResult<Self::Target>>
+ Unpin;

/// The type of future that will yield the service result stream.
Expand Down
2 changes: 1 addition & 1 deletion src/net/server/tests/unit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ impl MySingle {
}
}

impl futures::stream::Stream for MySingle {
impl futures_util::stream::Stream for MySingle {
type Item = Result<CallResult<Vec<u8>>, ServiceError>;

fn poll_next(
Expand Down
2 changes: 1 addition & 1 deletion src/net/server/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use core::future::{ready, Ready};
use core::marker::PhantomData;
use std::string::{String, ToString};

use futures::stream::Once;
use futures_util::stream::Once;
use octseq::{Octets, OctetsBuilder};
use tracing::warn;

Expand Down
2 changes: 1 addition & 1 deletion src/zonetree/in_memory/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::sync::Weak;
use std::vec::Vec;
use std::{fmt, io};

use futures::future::Either;
use futures_util::future::Either;
use parking_lot::RwLock;
use tokio::sync::OwnedMutexGuard;

Expand Down

0 comments on commit 1d022f4

Please sign in to comment.