stream-download is a library for
streaming content from a remote location to a local cache and using it as a
read
and
seek
-able source.
The requested content is downloaded in the background and read or seek
operations are allowed before the download is finished. Seek operations may
cause the stream to be restarted from the requested position if the download is
still in progress. This is useful for media applications that need to stream
large files that may take a long time to download.
This library makes heavy use of the adapter pattern to allow for pluggable transports and storage implementations.
cargo add stream-download
http
- adds an HTTP-based implementation of theSourceStream
trait (enabled by default).reqwest
- enables streaming content over http using reqwest (enabled by default).reqwest-native-tls
- enables reqwest'snative-tls
feature. Also enables thereqwest
feature.reqwest-rustls
- enables reqwest'srustls
feature. Also enables thereqwest
feature.reqwest-middleware
- enables integration withreqwest-middleware
. Can be used to add retry policies and additional observability. Also enables thereqwest
feature.open-dal
- adds aSourceStream
implementation that uses Apache OpenDAL as the backend.async-read
- adds aSourceStream
implementation for any type implementingAsyncRead
.temp-storage
- adds a temporary file-based storage backend (enabled by default).
One of reqwest-native-tls
or reqwest-rustls
is required if you wish to use
https streams.
use std::error::Error;
use std::io;
use std::io::Read;
use std::result::Result;
use stream_download::source::DecodeError;
use stream_download::storage::temp::TempStorageProvider;
use stream_download::{Settings, StreamDownload};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let mut reader = match StreamDownload::new_http(
"https://some-cool-url.com/some-file.mp3".parse()?,
TempStorageProvider::new(),
Settings::default(),
)
.await
{
Ok(reader) => reader,
Err(e) => Err(e.decode_error().await)?,
};
tokio::task::spawn_blocking(move || {
let mut buf = Vec::new();
reader.read_to_end(&mut buf)?;
Ok::<_, io::Error>(())
})
.await??;
Ok(())
}
See examples.
Transports implement the
SourceStream
trait. A few types of transports are provided out of the box:
http
for typical HTTP-based sources.open_dal
which is more complex, but supports a large variety of services.async_read
for any source implementingAsyncRead
.
Only http
is enabled by default. You can provide a custom transport by
implementing SourceStream
yourself.
Resources such as standalone songs or videos have a finite length that we use to support certain seeking functionality. Infinite streams or those that otherwise don't have a known length are still supported, but attempting to seek from the end of the stream will return an error. This may cause issues with certain audio or video libraries that attempt to perform such seek operations. If it's necessary to explicitly check for an infinite stream, you can check the stream's content length ahead of time.
use std::error::Error;
use std::io;
use std::io::Read;
use std::result::Result;
use stream_download::http::reqwest::Client;
use stream_download::http::HttpStream;
use stream_download::source::DecodeError;
use stream_download::source::SourceStream;
use stream_download::storage::temp::TempStorageProvider;
use stream_download::{Settings, StreamDownload};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let stream =
HttpStream::<Client>::create("https://some-cool-url.com/some-stream".parse()?).await?;
let content_length = stream.content_length();
let is_infinite = content_length.is_none();
println!("Infinite stream = {is_infinite}");
let mut reader = match StreamDownload::from_stream(
stream,
TempStorageProvider::default(),
Settings::default(),
)
.await
{
Ok(reader) => reader,
Err(e) => Err(e.decode_error().await)?,
};
tokio::task::spawn_blocking(move || {
let mut buf = [0; 256];
reader.read_exact(&mut buf)?;
Ok::<_, io::Error>(())
})
.await??;
Ok(())
}
If you're using this library to handle Icecast streams or one if its
derivatives, check out the icy-metadata
crate. There are examples for how to use it with stream-download
in the repo.
The storage module provides ways to customize how the stream is cached locally. Pre-configured implementations are available for memory and temporary file-based storage. Typically you'll want to use temporary file-based storage to prevent using too much memory, but memory-based storage may be preferable if you know the stream size is small or you need to run your application on a read-only filesystem.
use std::error::Error;
use std::io::Read;
use std::result::Result;
use stream_download::source::DecodeError;
use stream_download::storage::memory::MemoryStorageProvider;
use stream_download::{Settings, StreamDownload};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let mut reader = match StreamDownload::new_http(
"https://some-cool-url.com/some-file.mp3".parse()?,
// buffer will be stored in memory instead of on disk
MemoryStorageProvider,
Settings::default(),
)
.await
{
Ok(reader) => reader,
Err(e) => Err(e.decode_error().await)?,
};
Ok(())
}
When using infinite streams which don't need to support seeking, it usually isn't desirable to let the underlying cache grow indefinitely if the stream may be running for a while. For these cases, you may want to use bounded storage. Bounded storage uses a circular buffer which will overwrite the oldest contents once it fills up. If the reader falls too far behind the writer, the writer will pause so the reader can catch up.
use std::error::Error;
use std::io::Read;
use std::num::NonZeroUsize;
use std::result::Result;
use stream_download::source::DecodeError;
use stream_download::storage::bounded::BoundedStorageProvider;
use stream_download::storage::memory::MemoryStorageProvider;
use stream_download::{Settings, StreamDownload};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let mut reader = match StreamDownload::new_http(
"https://some-cool-url.com/some-file.mp3".parse()?,
// use bounded storage to keep the underlying size from growing indefinitely
BoundedStorageProvider::new(
// you can use any other kind of storage provider here
MemoryStorageProvider,
// be liberal with the buffer size, you need to make sure it holds enough space to
// prevent any out-of-bounds reads
NonZeroUsize::new(512 * 1024).unwrap(),
),
Settings::default(),
)
.await
{
Ok(reader) => reader,
Err(e) => Err(e.decode_error().await)?,
};
Ok(())
}
When you need to support both finite and infinite streams, you may want to use adaptive storage. This is a convenience wrapper that will use bounded storage when the stream has no content length and unbounded storage when the stream does return a content length.
use std::error::Error;
use std::io::Read;
use std::num::NonZeroUsize;
use std::result::Result;
use stream_download::source::DecodeError;
use stream_download::storage::adaptive::AdaptiveStorageProvider;
use stream_download::storage::temp::TempStorageProvider;
use stream_download::{Settings, StreamDownload};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let mut reader = match StreamDownload::new_http(
"https://some-cool-url.com/some-file.mp3".parse()?,
// use adaptive storage to keep the underlying size from growing indefinitely
// when the content type is not known
AdaptiveStorageProvider::new(
// you can use any other kind of storage provider here
TempStorageProvider::default(),
// be liberal with the buffer size, you need to make sure it holds enough space to
// prevent any out-of-bounds reads
NonZeroUsize::new(512 * 1024).unwrap(),
),
Settings::default(),
)
.await
{
Ok(reader) => reader,
Err(e) => return Err(e.decode_error().await)?,
};
Ok(())
}
Some automatic support is available for retrying stalled streams. See the docs
for
the StreamDownload
struct
for more details.
If using reqwest-middleware
, a retry policy can be used to handle transient
server errors. See
retry_middleware
for an example of adding retry middleware.
It's possible to customize your HTTP requests if you need to perform authentication or change other settings.
See client_options for customizing the HTTP client builder.
See custom_client for dynamically modifying each HTTP request.
The MSRV is currently 1.75.0
.