Releases: mosaicml/streaming

v0.7.0

06 Nov 01:23
4e8c944

🚀 Streaming v0.7.0

Streaming v0.7.0 is released! Install via pip:

pip install --upgrade mosaicml-streaming==0.7.0

📈 Better Defaults for StreamingDataset (#479)

  • The default values for StreamingDataset have been updated to be more performant and applicable to most use cases, detailed below:

| Parameter | Old Value | New Value | Benefit |
| --- | --- | --- | --- |
| shuffle_algo | py1s | py1e | Better shuffle and balanced downloading |
| num_canonical_nodes | 64 * physical nodes | 64 * physical nodes if py1s or py2s, otherwise physical nodes | Consistently good shuffle for all shuffle algos |
| shuffle_block_size | 262,144 | 4,000,000 / num_canonical_nodes | Consistently good shuffle for all num_canonical_nodes values |
| predownload | max(batch_size, 256 * batch_size // num_canonical_nodes) | 8 * batch_size | Better balanced downloading |
| partition_algo | orig | relaxed | More flexible deterministic resumption across node counts |
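A minimal sketch (remote and local paths are hypothetical) showing that the new defaults apply automatically, but any of them can still be overridden explicitly, e.g. to pin the previous behavior:

from streaming import StreamingDataset

dataset = StreamingDataset(
    remote='s3://my-bucket/dataset',  # hypothetical path
    local='/tmp/dataset-cache',
    batch_size=32,
    shuffle=True,
    shuffle_algo='py1s',        # pin the old default; the new default is 'py1e'
    shuffle_block_size=262144,  # pin the old default; the new default scales with num_canonical_nodes
)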

💎 New Features

🤖 Streaming Simulator: Easily simulate the performance of training configurations. (#385)

  • After installing this version of streaming, simply run the command simulator in your terminal to open the simulation interface.
  • Simulate throughput, network downloads, shuffle quality, and cache limit requirements for configurations.
  • Easily de-risk runs and find performant parameter settings.
  • Check out the docs for more information!

🔢 More flexible deterministic training and resumption (#476)

  • Deterministic training and resumption are now possible on more node counts!
  • Previously, the num_canonical_nodes parameter had to evenly divide, or be a multiple of, the number of physical nodes for determinism.
  • Now, deterministic training is possible on any number of nodes that also evenly divides your run's global batch size (see the sketch below).
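A minimal sketch (all values hypothetical): with a global batch size of 512 spread over 8 devices, any node count that evenly divides 512 supports deterministic training and resumption under this release.

from streaming import StreamingDataset

dataset = StreamingDataset(
    remote='s3://my-bucket/dataset',  # hypothetical path
    local='/tmp/dataset-cache',
    batch_size=64,              # per-device batch size; global batch size is 512 on 8 devices
    num_canonical_nodes=8,      # hypothetical value; no longer needs to divide or be a multiple of the node count
)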

🐛 Bug Fixes

  • Check for invalid hash algorithm names (#486); a sketch is below.
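A minimal sketch of where hash names are validated (paths and columns are hypothetical); an unrecognized algorithm name now raises an error up front instead of failing later:

from streaming import MDSWriter

columns = {'text': 'str'}  # hypothetical column spec

# 'sha1' is a valid hash algorithm name; a misspelled name such as 'sha11'
# would now be rejected immediately.
with MDSWriter(out='/tmp/mds', columns=columns, hashes=['sha1']) as out:
    out.write({'text': 'hello'})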

Full Changelog: v0.6.1...v0.7.0

v0.6.1

18 Oct 21:28
8827d7a

🚀 Streaming v0.6.1

Streaming v0.6.1 is released! Install via pip:

pip install --upgrade mosaicml-streaming==0.6.1

💎 New Features

🚃 Merge metadata from sub-directory datasets to form one unified dataset. (#449)

  • Added the merge_index() utility method to merge the index files of the sub-directories of an MDS dataset. The sub-directories can be local paths or any supported cloud provider URL path.
  • Check out the dataset conversion (Spark DataFrame to MDS) Jupyter notebook for an example in action; a minimal sketch also follows.
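A minimal sketch, assuming the sub-directory datasets live under a single root (the path is hypothetical):

from streaming.base.util import merge_index

# Merge the index.json files of all MDS sub-directory datasets found under
# this root into one unified index at the root. The root may be a local
# path or a supported cloud provider URL.
merge_index('s3://my-bucket/dataset-root/', keep_local=True)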

🔁 Retry uploading a file to a cloud provider path. (#448)

  • Added upload retry logic with backoff and jitter during dataset conversion, via the new retry parameter in Writer.

from streaming import MDSWriter

# Retry failed shard uploads up to 3 times, with backoff and jitter.
with MDSWriter(...,
               retry=3) as out:
    for sample in dataset:
        out.write(sample)

🐛 Bug Fixes

  • Validate Writer arguments and raise a ValueError if any argument is invalid. (#434)
  • Terminate the main process if one of the upload threads receives an Exception during dataset conversion. (#448)

🔧 Improvements

  • More balanced inter-node downloading for the py1e shuffle algorithm by varying shard sample ranges, helping to reduce throughput drops at scale. (#442)

New Contributors

  • @Hubert-Bonisseur made their first contribution in #450

Full Changelog: v0.6.0...v0.6.1

v0.6.0

13 Sep 20:11
65ac4ca

🚀 Streaming v0.6.0

Streaming v0.6.0 is released! Install via pip:

pip install --upgrade mosaicml-streaming==0.6.0

New Features

🆕  Databricks File System and Databricks Unity Catalog (#362)

Support for reading and writing data from and to the Databricks File System (DBFS) and Unity Catalog (UC) Volumes. This means that you can now use DBFS and UC Volumes as a source or sink for your streaming data pipelines or model training. Below is the path structure:

Databricks File System (DBFS)

The DBFS path structure is a hierarchical namespace organized into directories and files. DBFS paths must start with the dbfs:/ prefix.

UC Volumes

The path structure for UC Volumes is similar to the path structure for DBFS, but with a few key differences.

The root of the UC Volumes namespace is dbfs:/Volumes/<catalog>/<schema>/<volume>, where:

  • <catalog> is the name of the catalog where the volume is created.
  • <schema> is the name of the schema where the volume is created.
  • <volume> is the name of the volume.

Hence, use the dbfs:/Volumes prefix to specify a UC Volumes path.
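A minimal sketch (catalog, schema, and volume names are hypothetical) of streaming directly from a UC Volumes path:

from streaming import StreamingDataset

# Stream an MDS dataset directly from a Unity Catalog volume.
dataset = StreamingDataset(
    remote='dbfs:/Volumes/my_catalog/my_schema/my_volume/mds-dataset',
    local='/tmp/dataset-cache',
)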

💽 Spark DataFrame to MDS converter (#363)

Introducing the new DataFrameToMDS API, empowering users to effortlessly leverage Spark's capabilities for handling diverse datasets in various formats. This API enables seamless conversion of Spark DataFrames into MDS datasets, with the flexibility to specify output locations to both local and cloud storage. Index files are optionally merged. Additionally, users can add data preprocessing steps by defining custom iterator functions and arguments. All these features are seamlessly bundled into a single Spark job, ensuring an efficient and streamlined workflow for data transformation. An example notebook is provided to help users get started.
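A minimal sketch, assuming the converter is importable as dataframeToMDS from streaming.base.converters and that spark_df is an existing Spark DataFrame (the output path and columns are hypothetical):

from streaming.base.converters import dataframeToMDS

# Convert a Spark DataFrame into an MDS dataset, writing shards to cloud
# storage and merging the per-partition index files into one.
mds_kwargs = {
    'out': 's3://my-bucket/mds-dataset',  # hypothetical output location
    'columns': {'text': 'str'},           # hypothetical schema mapping
}
dataframeToMDS(spark_df, merge_index=True, mds_kwargs=mds_kwargs)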

🔀 Randomize and offset shuffle blocks algorithm (#373)

The new py1br shuffle algorithm helps mitigate download spikes that occur when using the py1b algorithm. With py1b, shuffle blocks are all the same size, so when progressing through training, nodes will have to download many shards at the same time. In contrast, with py1br, shuffle blocks are offset from each other and are variably sized. This results in more balanced downloads over time. The py1br algorithm is a replacement for the py1b algorithm, which will be deprecated soon.

from streaming import StreamingDataset

dataset = StreamingDataset(
    shuffle_algo='py1br',
    ...
)

🔀 Expanded range shuffle algorithm (#394)

The new py1e shuffle algorithm helps reduce the minimum cache limit needed for training, and results in much smoother downloads than both py1b and py1br. However, its shuffle quality is slightly lower. Rather than shuffling all samples in blocks of size shuffle_block_size, it instead spreads the samples of each shard over a range of maximum size shuffle_block_size, retaining most of the shuffle quality from py1b and py1br while reducing download spikes across the duration of training.

from streaming import StreamingDataset

dataset = StreamingDataset(
    shuffle_algo='py1e',
    ...
)

🔥 Per-Stream Batching (#407)

Users are now able to ensure that each batch contains samples from only a single stream. You can now set the new parameter batching_method to per_stream to access this functionality. Per-stream batching will still take into account upsampling and downsampling of streams, set by proportion, repeat, or choose. To make batches contain only samples from a group of streams, merge the streams' index.json files to create a single one for each group.

from streaming import StreamingDataset

dataset = StreamingDataset(
    batching_method='per_stream',
    ...
)

🔥 Stratified Batching (#408)

Users are now able to ensure that each batch has a consistent number of samples from every stream. Previously, stream proportions were satisfied in the aggregate but not at the batch level. You can now set the new parameter batching_method to stratified to access this functionality. Stratified batching will still take into account upsampling and downsampling of streams, set by proportion, repeat, or choose.

from streaming import StreamingDataset

dataset = StreamingDataset(
    batching_method='stratified',
    ...
)

💪 Download-Efficient Sparse Sampling (#391)

Previous versions of StreamingDataset implement downsampling/upsampling by giving each sample an equal probability of being selected (plus or minus one when sampling is fractional), without regard to which shard a sample is on. This means that no matter how small your desired downsampling is, StreamingDataset will still use each shard at as equal a rate as possible. This is problematic for download performance.

In this version of Streaming, we have added a new optional StreamingDataset argument sampling_granularity which can be used to configure how sampling is done. It is an integer, defaulting to 1, that determines how many samples are to be drawn at a time from a single random shard until we have enough samples.

Note that the default setting of 1 is equivalent to the old, non-shard-aware behavior. Setting it high, e.g. to the number of samples in a full shard or more, means samples will be drawn from one randomly chosen (without replacement) shard at a time until enough samples have been drawn. This is much more download-efficient, but results in the samples of each shard always being seen close together in training, which may have implications for convergence depending on your workload. Setting sampling_granularity to half a shard means, roughly speaking, you'll see half the samples of a shard at a time during training.

from streaming import StreamingDataset

dataset = StreamingDataset(
    sampling_granularity=1,
    ...
)

📑 Reusable local directory (#406)

Users can now instantiate more than one StreamingDataset with the same local directory and remote=None. This is useful when high-speed storage is mounted on a node and multiple users want to read the dataset directly from the mounted storage on the same node without copying the data to local disk.

from streaming import StreamingDataset

local = '<local disk directory or a mount point directory>'
dataset_0 = StreamingDataset(local=local, remote=None)
dataset_1 = StreamingDataset(local=local, remote=None)

🐛 Bug Fixes

  • Terminate worker threads when the process terminates to avoid deadlock. (#425)
  • Raise an exception if cache_limit is lower than the size of a single shard file to avoid deadlock. (#420)
  • Fixed the predownload-of-zero issue; users can now pass predownload=0 to StreamingDataset. (#383)

🔧 Improvements

  • Added Google Application Default Credentials (#376).
    • The authentication order has changed, and new App Engine and Compute Engine channels are used when available. The order of authentication is as follows:
      1. HMAC
      2. Google service account
      3. App Engine
      4. Compute Engine
      5. Raise an error
  • Check if index.json exists locally before downloading to avoid duplicate downloads (#372).

v0.5.2

19 Jun 05:58
a301cd0

🚀 Streaming v0.5.2

Streaming v0.5.2 is released! Install via pip:

pip install --upgrade mosaicml-streaming==0.5.2

New Features

  • Allow authentication with GCS for service accounts #315
  • Human-readable suffixes for size_limit and epoch_size #333 (see the sketch below)
  • Static sampling #348
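A minimal sketch of the human-readable suffixes (paths, columns, and the specific suffix strings are hypothetical):

from streaming import MDSWriter, StreamingDataset

# Shard size limit given as '64mb' instead of a raw byte count.
with MDSWriter(out='/tmp/mds', columns={'text': 'str'}, size_limit='64mb') as out:
    out.write({'text': 'hello'})

# Epoch size given as '100k' instead of a raw sample count.
dataset = StreamingDataset(local='/tmp/mds', epoch_size='100k')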

Documentation changes

  • Updated the contribution guide and improved unit test logic #343
  • Static sampling #348

Testing

  • Add a regression test for StreamingDataset instantiation and iteration #318
  • Fixed accidental shard delete test #341
  • Add a regression test for StreamingDataset using cloud providers #319
  • Add iteration time test as part of regression testing #358

Bug Fixes

  • Fixed init local dir zip-only shard handling #330
  • Added default behavior when neither streams nor epoch_size is specified #348

Full Changelog: v0.5.1...v0.5.2

v0.5.1

08 Aug 18:59
ac53002

Full Changelog: v0.5.0...v0.5.1

v0.5.0

06 Jun 13:31
8e16aa9

🚀 Streaming v0.5.0

Streaming v0.5.0 is released! Install via pip:

pip install --upgrade mosaicml-streaming==0.5.0

New Features

🆕 Cold Shard Eviction. (#219)

Dynamically delete the least recently used shards in order to keep disk usage under a specified limit. This is enabled by setting the StreamingDataset argument cache_limit. See the shuffling guide for more details.

from streaming import StreamingDataset

dataset = StreamingDataset(
    cache_limit='100gb',
    ...
)

🤙 Fetch samples using NumPy-style indexing. (#120)

Users can now randomly access samples using NumPy-style indexing with StreamingDataset. For example,

import numpy as np
from streaming import StreamingDataset

dataset = StreamingDataset(local=local, remote=remote)

dataset[0]  # Fetch sample 0
dataset[-1]  # Fetch last sample
dataset[[10, 20]]  # Fetch sample 10 and 20
dataset[slice(1, 10, 2)]  # Fetch sample 1, 3, 5, 7, and 9
dataset[5:0:-1]  # Fetch sample 5, 4, 3, 2, 1
dataset[np.array([4, 7])]  # Fetch sample 4 and 7

🦾 Any S3-compatible object store. (#265)

Support for any S3-compatible object store, that is, one that uses the S3 API to communicate with any connected device or system. Examples include Cloudflare R2, Coreweave, and Backblaze B2. Users need to provide the environment variable S3_ENDPOINT_URL for the object store being used (a sketch follows). Details on how to configure credentials can be found here.
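A minimal sketch for an S3-compatible store (the endpoint URL and bucket path are hypothetical):

import os

from streaming import StreamingDataset

# Point the S3 client at the compatible store's endpoint, then stream as usual.
os.environ['S3_ENDPOINT_URL'] = 'https://<account-id>.r2.cloudflarestorage.com'

dataset = StreamingDataset(remote='s3://my-bucket/dataset', local='/tmp/dataset-cache')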

🦾 Azure Blob Storage. (#256)

Support for Azure Blob Storage. Details on how to configure credentials can be found here.

Bug Fixes

  • Wait for the download and ready threads to finish before terminating the job. (#286)
  • Fixed length calculation to use the resampled epoch size, not the underlying number of samples. (#278)
  • Fixed mypy errors by adding a py.typed marker file. (#245)
  • Create a new boto3 session per thread to avoid sharing resources. (#241)

🔧 API changes

  • The argument samples_per_epoch has been renamed to epoch_size in StreamingDataset, to better distinguish the actual number of underlying samples as serialized from the number of observed samples when iterating (which may differ due to weighting sub-datasets).
  • The argument samples has been renamed to choose in Stream, to better distinguish the underlying samples from resampled data.
  • The argument keep_raw has been removed from StreamingDataset in the process of finalizing the design for shard eviction (see the newly added cache_limit parameter).
  • The default value of predownload in StreamingDataset is now derived from the batch size and number of canonical nodes instead of the previous constant value of 100_000. This prevents predownloaded shards from being evicted before ever being used.
  • The default value of num_canonical_nodes in StreamingDataset was updated to 64 times the number of nodes of the initial run, instead of the number of nodes of the initial run, to increase data source diversity and improve convergence.
  • The default value of shuffle_algo in StreamingDataset was changed from py1b to py1s, as it requires fewer shards to be downloaded during iteration. More details about the different shuffling algorithms can be found here.

Full Changelog: v0.4.1...v0.5.0

v0.4.1

25 Apr 14:01
552994a

🚀 Streaming v0.4.1

Streaming v0.4.1 is released! Install via pip:

pip install --upgrade mosaicml-streaming==0.4.1

New Features

  • Support for Torch 2.0. (#234)
  • Addition of two new sample shuffling algorithms. (#223)
  • Support for AWS S3 Requester Pays bucket permissions for streaming. (#231)

Documentation

  • Added a streaming installation guide and a streaming environment guide. (#221)
  • Added an instruction guide for converting a multimodal dataset into MDS format. (#220)
  • Streaming documentation now supports Algolia search. (#224)

Full Changelog: v0.4.0...v0.4.1

v0.4.0

31 Mar 02:22
d428ed8

🚀 Streaming v0.4.0

Streaming v0.4.0 is released! Install via pip:

pip install --upgrade mosaicml-streaming==0.4.0

New Feature

🔀 Dataset Mixing

  • Weighted mixing of sub-datasets on the fly during model training (#184). StreamingDataset now supports an optional streams parameter, which takes one or more sub-datasets and intelligently fetches samples across them. You can mix (upsample or downsample) datasets by weighting each either relatively (proportion) or absolutely (repeat or samples), or set neither to sample 1:1. A sketch follows.
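A minimal sketch of weighted mixing (the remote paths and proportions are hypothetical):

from streaming import Stream, StreamingDataset

# Draw roughly 75% of samples from the first sub-dataset and 25% from the second.
streams = [
    Stream(remote='s3://my-bucket/web-text', proportion=0.75),
    Stream(remote='s3://my-bucket/code', proportion=0.25),
]
dataset = StreamingDataset(streams=streams)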

Documentation

  • Added a README showing how to convert raw text and vision datasets into MDS format. (#183)

Bug Fixes

  • Raise an exception if the cloud storage bucket does not exist during shard file upload. (#212)
  • Removed the unsupported ThreadPoolExecutor shutdown param for Python 3.8. (#199)

Full Changelog: v0.3.0...v0.4.0

v0.3.0

01 Mar 08:30
11d0944

🚀 Streaming v0.3.0

Streaming v0.3.0 is released! Install via pip:

pip install --upgrade mosaicml-streaming==0.3.0

New Features

☁️ Cloud uploading

Now, you can automatically upload shards to cloud storage on the fly by providing a cloud path to MDSWriter. Track the progress of individual uploads with progress_bar=True, and tune background upload workers with max_workers=4.

Users can automatically upload output shard files to a supported cloud (AWS S3, GCP, OCI) by providing the out parameter as a cloud provider bucket location in the Writer class. Below is an example of uploading output files to an AWS S3 bucket:

output_dir = 's3://bucket/dir/path'
with MDSWriter(out=output_dir, ...) as out:
    for sample in samples:
        out.write(sample)

Users can keep output shard files locally by providing a local directory path to Writer. For example:

output_dir = '/tmp/mds'
with MDSWriter(out=output_dir, ...) as out:
    for sample in samples:
        out.write(sample)

Users can see the progress of cloud file uploads by setting progress_bar=True in Writer. For example:

output_dir = 's3://bucket/dir/path'
with MDSWriter(out=output_dir, progress_bar=True, ...) as out:
    for sample in samples:
        out.write(sample)

Users can control the number of background upload threads via the max_workers parameter of Writer, which is responsible for uploading shard files to a remote location if one is provided. Each thread uploads one file at a time. For example, with max_workers=4, at most 4 threads are active at the same time, each uploading one shard file.

output_dir = 's3://bucket/dir/path'
with MDSWriter(out=output_dir, max_workers=4, ...) as out:
    for sample in samples:
        out.write(sample)

🔀 2x faster shuffling

We’ve added a new shuffling algorithm py1s which is twice as fast on typical workloads. You can toggle which shuffling algorithm is used by overriding shuffle_algo (old behavior: py2s). You will experience this as faster epoch starts and faster mid-epoch resumption for large datasets.

📨 2x faster partitioning

We’ve also reimplemented how shards/samples are assigned to nodes/devices/dataloader workers to run about twice as fast on typical workloads while giving identical results. This is exposed as the partition_algo argument to StreamingDataset. You will experience this as faster start and resumption for large datasets.

🔗 Extensible downloads

We provide examples of modifying StreamingDataset to stream from a dataset of links to external data sources. In our examples, using the WebVid dataset, each sample points to a video file which exists outside of the shards in its original format and is downloaded separately. Benchmarking is included.

API changes

  • The Writer class and its subclasses (MDSWriter, XSVWriter, TSVWriter, CSVWriter, and JSONWriter) have had their dirname parameter renamed to out, with the following advanced functionality:

    • If out is a local directory, shard files are saved locally. For example, out=/tmp/mds/.
    • If out is a remote directory, a local temporary directory is created to cache the shard files, the shard files are uploaded to the remote location, and the temporary directory is deleted once the upload finishes. For example, out=s3://bucket/dir/path.
    • If out is a tuple of (local_dir, remote_dir), shard files are saved in the local_dir and also uploaded to the remote location. For example, out=('/tmp/mds/', 's3://bucket/dir/path').
  • Given the complexity of their arguments, and the need to be able to safely upgrade them over time, we have updated the APIs of Writer and its subclasses (like MDSWriter) and StreamingDataset to require keyword arguments; a sketch follows.
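A minimal sketch of the updated API (paths and columns are hypothetical): arguments are passed as keywords, and out may be a (local_dir, remote_dir) tuple:

from streaming import MDSWriter

# Shards are written to the local directory and uploaded to the remote path.
with MDSWriter(out=('/tmp/mds/', 's3://bucket/dir/path'),
               columns={'text': 'str'}) as out:
    out.write({'text': 'hello'})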

Full Changelog: v0.2.5...v0.3.0

v0.2.5

14 Feb 05:29
4bf9c1c

🚀 Streaming v0.2.5

Streaming v0.2.5 is released! Install via pip:

pip install --upgrade mosaicml-streaming==0.2.5

Bug Fixes

  • Fixed CPU crash (#153)
  • Updated example notebooks (#157)

Full Changelog: v0.2.4...v0.2.5