Skip to content

aschey/stream-download-rs

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

stream-download-rs

crates.io docs.rs Dependency Status license CI codecov GitHub repo size Lines of Code

stream-download is a library for streaming content from a remote location to a local cache and using it as a read and seek-able source. The requested content is downloaded in the background and read or seek operations are allowed before the download is finished. Seek operations may cause the stream to be restarted from the requested position if the download is still in progress. This is useful for media applications that need to stream large files that may take a long time to download.

This library makes heavy use of the adapter pattern to allow for pluggable transports and storage implementations.

Installation

cargo add stream-download

Feature Flags

  • http - adds an HTTP-based implementation of the SourceStream trait (enabled by default).
  • reqwest - enables streaming content over http using reqwest (enabled by default).
  • reqwest-native-tls - enables reqwest's native-tls feature. Also enables the reqwest feature.
  • reqwest-rustls - enables reqwest's rustls feature. Also enables the reqwest feature.
  • reqwest-middleware - enables integration with reqwest-middleware. Can be used to add retry policies and additional observability. Also enables the reqwest feature.
  • open-dal - adds a SourceStream implementation that uses Apache OpenDAL as the backend.
  • async-read - adds a SourceStream implementation for any type implementing AsyncRead.
  • temp-storage - adds a temporary file-based storage backend (enabled by default).

One of reqwest-native-tls or reqwest-rustls is required if you wish to use https streams.

Usage

use std::error::Error;
use std::io;
use std::io::Read;
use std::result::Result;

use stream_download::source::DecodeError;
use stream_download::storage::temp::TempStorageProvider;
use stream_download::{Settings, StreamDownload};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut reader = match StreamDownload::new_http(
        "https://some-cool-url.com/some-file.mp3".parse()?,
        TempStorageProvider::new(),
        Settings::default(),
    )
    .await
    {
        Ok(reader) => reader,
        Err(e) => Err(e.decode_error().await)?,
    };

    tokio::task::spawn_blocking(move || {
        let mut buf = Vec::new();
        reader.read_to_end(&mut buf)?;
        Ok::<_, io::Error>(())
    })
    .await??;

    Ok(())
}

Examples

See examples.

Transports

Transports implement the SourceStream trait. A few types of transports are provided out of the box:

  • http for typical HTTP-based sources.
  • open_dal which is more complex, but supports a large variety of services.
  • async_read for any source implementing AsyncRead.

Only http is enabled by default. You can provide a custom transport by implementing SourceStream yourself.

Streams with Unknown Length

Resources such as standalone songs or videos have a finite length that we use to support certain seeking functionality. Infinite streams or those that otherwise don't have a known length are still supported, but attempting to seek from the end of the stream will return an error. This may cause issues with certain audio or video libraries that attempt to perform such seek operations. If it's necessary to explicitly check for an infinite stream, you can check the stream's content length ahead of time.

use std::error::Error;
use std::io;
use std::io::Read;
use std::result::Result;

use stream_download::http::reqwest::Client;
use stream_download::http::HttpStream;
use stream_download::source::DecodeError;
use stream_download::source::SourceStream;
use stream_download::storage::temp::TempStorageProvider;
use stream_download::{Settings, StreamDownload};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let stream =
        HttpStream::<Client>::create("https://some-cool-url.com/some-stream".parse()?).await?;
    let content_length = stream.content_length();
    let is_infinite = content_length.is_none();
    println!("Infinite stream = {is_infinite}");

    let mut reader = match StreamDownload::from_stream(
        stream,
        TempStorageProvider::default(),
        Settings::default(),
    )
    .await
    {
        Ok(reader) => reader,
        Err(e) => Err(e.decode_error().await)?,
    };

    tokio::task::spawn_blocking(move || {
        let mut buf = [0; 256];
        reader.read_exact(&mut buf)?;
        Ok::<_, io::Error>(())
    })
    .await??;

    Ok(())
}

Icecast/Shoutcast Streams

If you're using this library to handle Icecast streams or one if its derivatives, check out the icy-metadata crate. There are examples for how to use it with stream-download in the repo.

Storage

The storage module provides ways to customize how the stream is cached locally. Pre-configured implementations are available for memory and temporary file-based storage. Typically you'll want to use temporary file-based storage to prevent using too much memory, but memory-based storage may be preferable if you know the stream size is small or you need to run your application on a read-only filesystem.

use std::error::Error;
use std::io::Read;
use std::result::Result;

use stream_download::source::DecodeError;
use stream_download::storage::memory::MemoryStorageProvider;
use stream_download::{Settings, StreamDownload};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut reader = match StreamDownload::new_http(
        "https://some-cool-url.com/some-file.mp3".parse()?,
        // buffer will be stored in memory instead of on disk
        MemoryStorageProvider,
        Settings::default(),
    )
    .await
    {
        Ok(reader) => reader,
        Err(e) => Err(e.decode_error().await)?,
    };

    Ok(())
}

Bounded Storage

When using infinite streams which don't need to support seeking, it usually isn't desirable to let the underlying cache grow indefinitely if the stream may be running for a while. For these cases, you may want to use bounded storage. Bounded storage uses a circular buffer which will overwrite the oldest contents once it fills up. If the reader falls too far behind the writer, the writer will pause so the reader can catch up.

use std::error::Error;
use std::io::Read;
use std::num::NonZeroUsize;
use std::result::Result;

use stream_download::source::DecodeError;
use stream_download::storage::bounded::BoundedStorageProvider;
use stream_download::storage::memory::MemoryStorageProvider;
use stream_download::{Settings, StreamDownload};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut reader = match StreamDownload::new_http(
        "https://some-cool-url.com/some-file.mp3".parse()?,
        // use bounded storage to keep the underlying size from growing indefinitely
        BoundedStorageProvider::new(
            // you can use any other kind of storage provider here
            MemoryStorageProvider,
            // be liberal with the buffer size, you need to make sure it holds enough space to
            // prevent any out-of-bounds reads
            NonZeroUsize::new(512 * 1024).unwrap(),
        ),
        Settings::default(),
    )
    .await
    {
        Ok(reader) => reader,
        Err(e) => Err(e.decode_error().await)?,
    };

    Ok(())
}

Adaptive Storage

When you need to support both finite and infinite streams, you may want to use adaptive storage. This is a convenience wrapper that will use bounded storage when the stream has no content length and unbounded storage when the stream does return a content length.

use std::error::Error;
use std::io::Read;
use std::num::NonZeroUsize;
use std::result::Result;

use stream_download::source::DecodeError;
use stream_download::storage::adaptive::AdaptiveStorageProvider;
use stream_download::storage::temp::TempStorageProvider;
use stream_download::{Settings, StreamDownload};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut reader = match StreamDownload::new_http(
        "https://some-cool-url.com/some-file.mp3".parse()?,
        // use adaptive storage to keep the underlying size from growing indefinitely
        // when the content type is not known
        AdaptiveStorageProvider::new(
            // you can use any other kind of storage provider here
            TempStorageProvider::default(),
            // be liberal with the buffer size, you need to make sure it holds enough space to
            // prevent any out-of-bounds reads
            NonZeroUsize::new(512 * 1024).unwrap(),
        ),
        Settings::default(),
    )
    .await
    {
        Ok(reader) => reader,
        Err(e) => return Err(e.decode_error().await)?,
    };

    Ok(())
}

Handling Errors and Reconnects

Some automatic support is available for retrying stalled streams. See the docs for the StreamDownload struct for more details.

If using reqwest-middleware, a retry policy can be used to handle transient server errors. See retry_middleware for an example of adding retry middleware.

Authentication and Other Customization

It's possible to customize your HTTP requests if you need to perform authentication or change other settings.

See client_options for customizing the HTTP client builder.

See custom_client for dynamically modifying each HTTP request.

Supported Rust Versions

The MSRV is currently 1.75.0.