v0.6.0

@XiaohanZhangCMU XiaohanZhangCMU released this 13 Sep 20:11
65ac4ca

🚀 Streaming v0.6.0

Streaming v0.6.0 is released! Install via pip:

pip install --upgrade mosaicml-streaming==0.6.0

New Features

🆕  Databricks File System and Databricks Unity Catalog (#362)

Support for reading and writing data from and to the Databricks File System (DBFS) and Unity Catalog (UC) Volumes. This means that you can now use DBFS and UC Volumes as a source or sink for your streaming data pipelines or model training. Below is the path structure:

Databricks File System (DBFS)

The DBFS path structure is a hierarchical namespace organized into directories and files. A DBFS path must start with dbfs:/.

UC Volumes

The path structure for UC Volumes is similar to the path structure for DBFS, but with a few key differences.

The root of the UC Volumes namespace is dbfs:/Volumes/<catalog>/<schema>/<volume>, where:

  • <catalog> is the name of the catalog where the volume is created.
  • <schema> is the name of the schema where the volume is created.
  • <volume> is the name of the volume.

Hence, use a dbfs:/Volumes prefix to specify a UC Volumes path.
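
As a rough illustration of the two path structures above, the sketch below tells a UC Volumes path from a plain DBFS path by prefix alone. The helper names classify_databricks_path and uc_volume_root are hypothetical and not part of Streaming; the catalog, schema, and volume names are placeholders.

```python
def classify_databricks_path(path: str) -> str:
    """Return 'uc_volume', 'dbfs', or 'other' based on the path prefix alone."""
    # Check the more specific UC Volumes prefix first, because it also starts with dbfs:/.
    if path.startswith('dbfs:/Volumes/'):
        return 'uc_volume'
    if path.startswith('dbfs:/'):
        return 'dbfs'
    return 'other'


def uc_volume_root(catalog: str, schema: str, volume: str) -> str:
    """Build the root path of a UC Volume from its three name components."""
    return f'dbfs:/Volumes/{catalog}/{schema}/{volume}'


# Placeholder names, for illustration only.
print(classify_databricks_path(uc_volume_root('my_catalog', 'my_schema', 'my_volume')))
print(classify_databricks_path('dbfs:/tmp/mds_dataset'))
```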

💽 Spark DataFrame to MDS converter (#363)

The new DataFrameToMDS API converts a Spark DataFrame into an MDS dataset, so users can rely on Spark to read datasets in a variety of formats. The output location can be either local or cloud storage, and index files are optionally merged. Users can also add data preprocessing steps by defining custom iterator functions and arguments. All of this runs as a single Spark job. An example notebook is provided to help users get started.

🔀 Randomize and offset shuffle blocks algorithm (#373)

The new py1br shuffle algorithm helps mitigate download spikes that occur when using the py1b algorithm. With py1b, shuffle blocks are all the same size, so when progressing through training, nodes will have to download many shards at the same time. In contrast, with py1br, shuffle blocks are offset from each other and are variably sized. This results in more balanced downloads over time. The py1br algorithm is a replacement for the py1b algorithm, which will be deprecated soon.

from streaming import StreamingDataset

dataset = StreamingDataset(
    shuffle_algo='py1br',
    ...
)
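
To make the difference between equal-sized and variably sized shuffle blocks concrete, here is a minimal sketch. It is not Streaming's implementation: the function names are hypothetical, and the 0.75x to 1.25x size range is an assumption chosen only for illustration. It shows block boundaries only, not the shuffling inside each block.

```python
import random


def fixed_blocks(num_samples, block_size):
    """py1b-style boundaries: consecutive blocks of identical size (the last may be shorter)."""
    return [(start, min(start + block_size, num_samples))
            for start in range(0, num_samples, block_size)]


def randomized_blocks(num_samples, block_size, seed=0):
    """py1br-style boundaries: block sizes vary, so boundaries stop lining up.

    The size range below is an assumption made for this sketch.
    """
    rng = random.Random(seed)
    low = max(1, int(0.75 * block_size))
    high = max(low, int(1.25 * block_size))
    blocks, start = [], 0
    while start < num_samples:
        end = min(start + rng.randint(low, high), num_samples)
        blocks.append((start, end))
        start = end
    return blocks


print(fixed_blocks(1000, 100)[:3])
print(randomized_blocks(1000, 100)[:3])
```

With fixed blocks, every boundary falls on a multiple of the block size, so every worker crosses into a new block at the same point. With variable sizes, the crossings are spread out.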

🔀 Expanded range shuffle algorithm (#394)

The new py1e shuffle algorithm helps reduce the minimum cache limit needed for training, and results in much smoother downloads than both py1b and py1br. However, its shuffle quality is slightly lower. Rather than shuffling all samples in blocks of size shuffle_block_size, it spreads the samples of each shard over a range of maximum size shuffle_block_size. This retains most of the shuffle quality of py1b and py1br while reducing download spikes over the duration of training.

from streaming import StreamingDataset

dataset = StreamingDataset(
    shuffle_algo='py1e',
    ...
)

🔥 Per-Stream Batching (#407)

Users are now able to ensure that each batch has samples from only a single stream. Set the new parameter batching_method to per_stream to access this functionality. Per-stream batching still takes into account upsampling and downsampling of streams, set by proportion, repeat, or choose. To make batches contain only samples from a group of streams, merge the streams' index.json files to create a single one for each group.

from streaming import StreamingDataset

dataset = StreamingDataset(
    batching_method='per_stream',
    ...
)
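
The idea behind per-stream batching can be sketched in a few lines of plain Python. This is a simplified illustration, not how StreamingDataset does it: the function name per_stream_batches is hypothetical, the sketch drops any partial batch at the end of a stream, and it ignores upsampling and downsampling.

```python
import random


def per_stream_batches(stream_samples, batch_size, seed=0):
    """Return (stream_name, batch) pairs where each batch draws from one stream only."""
    rng = random.Random(seed)
    batches = []
    for name, samples in stream_samples.items():
        # Cut each stream into full batches; a trailing partial batch is dropped here.
        for i in range(0, len(samples) - batch_size + 1, batch_size):
            batches.append((name, samples[i:i + batch_size]))
    # Shuffle batch order so streams are interleaved over the course of an epoch.
    rng.shuffle(batches)
    return batches


streams = {'stream_a': list(range(10)), 'stream_b': list(range(100, 106))}
for name, batch in per_stream_batches(streams, batch_size=4):
    print(name, batch)
```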

🔥 Stratified Batching (#408)

Users are now able to ensure that each batch has a consistent number of samples from every stream. Previously, stream proportions were satisfied in the aggregate but not at the batch level. You can now set the new parameter batching_method to stratified to access this functionality. Stratified batching will still take into account upsampling and downsampling of streams, set by proportion, repeat, or choose.

from streaming import StreamingDataset

dataset = StreamingDataset(
    batching_method='stratified',
    ...
)
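
One way to picture "a consistent number of samples from every stream" is to split the batch size according to the stream proportions. The sketch below does this with largest-remainder rounding. The function name per_batch_counts and the rounding rule are assumptions made for illustration; the library may allocate samples differently.

```python
def per_batch_counts(batch_size, proportions):
    """Split batch_size across streams in proportion to their weights."""
    total = sum(proportions.values())
    exact = {name: batch_size * p / total for name, p in proportions.items()}
    counts = {name: int(x) for name, x in exact.items()}
    # Hand the slots lost to rounding down to the streams with the largest fractional parts.
    leftover = batch_size - sum(counts.values())
    by_fraction = sorted(exact, key=lambda n: exact[n] - counts[n], reverse=True)
    for name in by_fraction[:leftover]:
        counts[name] += 1
    return counts


print(per_batch_counts(8, {'a': 0.5, 'b': 0.25, 'c': 0.25}))
print(per_batch_counts(10, {'a': 1, 'b': 2}))
```

Every batch built from these counts has the same per-stream makeup, which is the property that aggregate-only proportions do not guarantee.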

💪 Download-Efficient Sparse Sampling (#391)

Previous versions of StreamingDataset implemented downsampling/upsampling by giving each sample an equal probability of being selected (plus or minus one when sampling is fractional), without regard to which shard a sample is on. This means that no matter how small your desired downsampling is, StreamingDataset will still use each shard at as equal a rate as possible. This is problematic for download performance.

In this version of Streaming, we have added a new optional StreamingDataset argument sampling_granularity which can be used to configure how sampling is done. It is an integer, defaulting to 1, that determines how many samples are to be drawn at a time from a single random shard until we have enough samples.

Note that the default setting of 1 is equivalent to the old non-shard-aware behavior. Setting it high, e.g. to the number of samples in a full shard or more, means it will draw all the samples in a randomly chosen (without replacement) shard until it has enough samples. This is much more download-efficient, but the samples of each shard will always be seen close together in training, which may have implications for convergence depending on your workload. Setting the sampling granularity to half a shard means, roughly speaking, that you'll see half the samples of a shard at a time during training.

from streaming import StreamingDataset

dataset = StreamingDataset(
    sampling_granularity=1,
    ...
)
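
The effect of sampling_granularity on how many shards get touched can be seen in a toy simulation. This is a simplified sketch, not the library's sampling code: sparse_sample and shards_touched are hypothetical helpers, and the sketch may revisit a shard that still has undrawn samples.

```python
import random


def sparse_sample(shard_sizes, num_wanted, granularity, seed=0):
    """Draw (shard, index) pairs, taking up to `granularity` samples from one shard at a time."""
    if granularity < 1:
        raise ValueError('granularity must be at least 1')
    rng = random.Random(seed)
    # Undrawn sample indices per shard, in random order.
    remaining = []
    for size in shard_sizes:
        ids = list(range(size))
        rng.shuffle(ids)
        remaining.append(ids)

    chosen = []
    while len(chosen) < num_wanted:
        nonempty = [s for s, ids in enumerate(remaining) if ids]
        if not nonempty:
            break  # Every sample has already been drawn.
        shard = rng.choice(nonempty)
        take = min(granularity, num_wanted - len(chosen), len(remaining[shard]))
        chosen.extend((shard, i) for i in remaining[shard][:take])
        del remaining[shard][:take]
    return chosen


def shards_touched(chosen):
    """Count the distinct shards that would need to be downloaded."""
    return len({shard for shard, _ in chosen})


shard_sizes = [100, 100, 100, 100]
for granularity in (1, 50, 100):
    picked = sparse_sample(shard_sizes, num_wanted=100, granularity=granularity)
    print(granularity, shards_touched(picked))
```

In this toy setup, a granularity equal to the shard size fills the request from a single shard, while a granularity of 1 spreads the same number of samples across the shards.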

📑 Reusable local directory (#406)

Users can now instantiate more than one StreamingDataset with the same local directory and remote=None. This is useful when high-speed storage is mounted on a node and multiple users want to read the dataset directly from the mounted storage on that node, without copying the data to local disk.

from streaming import StreamingDataset

local = '<local disk directory or a mount point directory>'
dataset_0 = StreamingDataset(local=local, remote=None)
dataset_1 = StreamingDataset(local=local, remote=None)

🐛 Bug Fixes

  • Terminate the worker threads when process terminates to avoid deadlock. (#425)
  • Raise an exception if cache_limit is lower than the size of a single shard file to avoid deadlock. (#420)
  • Fixed an issue with a predownload value of zero; users can now pass predownload=0 to StreamingDataset. (#383)

🔧 Improvements

  • Add Google Application Default Credentials (#376).
    • The order of authentication has changed, and a new App Engine or Compute Engine authentication channel is used when available. The order of authentication is as follows:
      1. HMAC
      2. Google service account
      3. App Engine
      4. Compute Engine
      5. Raise an error
  • Check if index.json exists locally before downloading to avoid duplicate downloads (#372).
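
The authentication order listed above behaves like a fallback chain: each channel is tried in turn, and an error is raised only if none succeeds. A minimal sketch of that pattern follows. The resolve_credentials function and the lambda probes are hypothetical stand-ins, not Streaming's or Google's API.

```python
from typing import Callable, Optional, Sequence, Tuple

Probe = Callable[[], Optional[str]]


def resolve_credentials(probes: Sequence[Tuple[str, Probe]]) -> Tuple[str, str]:
    """Try each (name, probe) pair in order and return the first that yields credentials."""
    for name, probe in probes:
        creds = probe()
        if creds is not None:
            return name, creds
    tried = ', '.join(name for name, _ in probes)
    raise RuntimeError(f'No usable credentials found; tried: {tried}')


# Hypothetical probes: here only the last channel is available.
probes = [
    ('HMAC', lambda: None),
    ('Google service account', lambda: None),
    ('App Engine', lambda: None),
    ('Compute Engine', lambda: 'placeholder-credentials'),
]
print(resolve_credentials(probes))
```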

Full Changelog: v0.5.2...v0.6.0