Skip to content

Commit

Permalink
fix invalid response id after closing file in blocking context
Browse files Browse the repository at this point in the history
Signed-off-by: silver-ymz <yinmingzhuo@gmail.com>
  • Loading branch information
silver-ymz committed Aug 9, 2023
1 parent 5f4799a commit eb5f465
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 42 deletions.
11 changes: 10 additions & 1 deletion src/auxiliary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{lowlevel::Extensions, SftpAuxiliaryData};
use std::sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering};

use once_cell::sync::OnceCell;
use tokio::sync::Notify;
use tokio::{runtime::Handle, sync::Notify};
use tokio_util::sync::CancellationToken;

#[derive(Debug, Copy, Clone)]
Expand Down Expand Up @@ -55,13 +55,16 @@ pub(super) struct Auxiliary {
pub(super) auxiliary_data: SftpAuxiliaryData,

pub(super) tokio_compat_file_write_limit: usize,

pub(super) tokio_handle: Handle,
}

impl Auxiliary {
pub(super) fn new(
max_pending_requests: u16,
auxiliary_data: SftpAuxiliaryData,
tokio_compat_file_write_limit: usize,
tokio_handle: Handle,
) -> Self {
Self {
conn_info: OnceCell::new(),
Expand All @@ -83,6 +86,8 @@ impl Auxiliary {
auxiliary_data,

tokio_compat_file_write_limit,

tokio_handle,
}
}

Expand Down Expand Up @@ -162,4 +167,8 @@ impl Auxiliary {
pub(super) fn tokio_compat_file_write_limit(&self) -> usize {
self.tokio_compat_file_write_limit
}

pub(super) fn tokio_handle(&self) -> &Handle {
&self.tokio_handle
}
}
2 changes: 2 additions & 0 deletions src/changelog.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#[allow(unused_imports)]
use crate::*;

/// # Fixed
/// Invalid response id after closing file caused by the change in v0.13.8
#[doc(hidden)]
pub mod unreleased {}

Expand Down
32 changes: 11 additions & 21 deletions src/file/tokio_compat_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ use std::{
use bytes::{Buf, Bytes, BytesMut};
use derive_destructure2::destructure;
use pin_project::{pin_project, pinned_drop};
use tokio::{
io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf},
runtime,
};
use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
use tokio_io_utility::ready;
use tokio_util::sync::WaitForCancellationFutureOwned;

Expand Down Expand Up @@ -747,31 +744,24 @@ impl TokioCompatFile {
/// when they should not fail.
#[pinned_drop]
impl PinnedDrop for TokioCompatFile {
fn drop(self: Pin<&mut Self>) {
let this = self.project();
fn drop(mut self: Pin<&mut Self>) {
let this = self.as_mut().project();

let file = this.inner.clone();
let read_future = this.read_future.take();
let write_futures = mem::take(this.write_futures);

let cancellation_fut = file
.inner
.get_auxiliary()
.cancel_token
.clone()
.cancelled_owned();
let cancellation_fut = self.auxiliary().cancel_token.clone().cancelled_owned();

let do_drop_fut = Self::do_drop(file, read_future, write_futures);

if let Ok(handle) = runtime::Handle::try_current() {
handle.spawn(async move {
tokio::select! {
biased;
self.auxiliary().tokio_handle().spawn(async move {
tokio::select! {
biased;

_ = cancellation_fut => (),
_ = do_drop_fut => (),
}
});
}
_ = cancellation_fut => (),
_ = do_drop_fut => (),
}
});
}
}
17 changes: 7 additions & 10 deletions src/fs/dir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::{

use futures_core::stream::{FusedStream, Stream};
use pin_project::{pin_project, pinned_drop};
use tokio::runtime;
use tokio_util::sync::WaitForCancellationFutureOwned;

type ResponseFuture = crate::lowlevel::AwaitableNameEntriesFuture<crate::Buffer>;
Expand Down Expand Up @@ -195,15 +194,13 @@ impl PinnedDrop for ReadDir {
let cancellation_fut = dir.0.get_auxiliary().cancel_token.clone().cancelled_owned();
let do_drop_fut = Self::do_drop(dir, future);

if let Ok(handle) = runtime::Handle::try_current() {
handle.spawn(async move {
tokio::select! {
biased;
this.dir.0.get_auxiliary().tokio_handle().spawn(async move {
tokio::select! {
biased;

_ = cancellation_fut => (),
_ = do_drop_fut => (),
}
});
}
_ = cancellation_fut => (),
_ = do_drop_fut => (),
}
});
}
}
18 changes: 8 additions & 10 deletions src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,14 @@ impl Drop for OwnedHandle {
// size of the Future blows out, becomes double of its size.
// 3. the more states the Futures have, the harder it is to optimize and take advantage of the niche.
let future = response.wait();
if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle.spawn(async move {
let _res = future.await;
#[cfg(feature = "tracing")]
match _res {
Ok(_) => tracing::debug!("close handle success"),
Err(err) => tracing::error!(?err, "failed to close handle"),
}
});
}
self.get_auxiliary().tokio_handle().spawn(async move {
let _res = future.await;
#[cfg(feature = "tracing")]
match _res {
Ok(_) => tracing::debug!("close handle success"),
Err(err) => tracing::error!(?err, "failed to close handle"),
}
});
}
Err(_err) => {
#[cfg(feature = "tracing")]
Expand Down
4 changes: 4 additions & 0 deletions src/sftp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::{
use derive_destructure2::destructure;
use tokio::{
io::{AsyncRead, AsyncWrite},
runtime::Handle,
sync::oneshot::Receiver,
task::JoinHandle,
};
Expand Down Expand Up @@ -146,6 +147,7 @@ impl Sftp {
options.get_max_pending_requests(),
auxiliary,
options.get_tokio_compat_file_write_limit(),
Handle::current(),
))?;

let flush_task = create_flush_task(
Expand All @@ -171,13 +173,15 @@ impl Sftp {
max_pending_requests: u16,
auxiliary: SftpAuxiliaryData,
tokio_compat_file_write_limit: usize,
tokio_handle: Handle,
) -> Result<WriteEnd, Error> {
connect(
MpscQueue::with_capacity(write_end_buffer_size),
Auxiliary::new(
max_pending_requests,
auxiliary,
tokio_compat_file_write_limit,
tokio_handle,
),
)
}
Expand Down

0 comments on commit eb5f465

Please sign in to comment.