🚀 Streaming v0.6.0

Streaming v0.6.0 is released! Install via pip:

```bash
pip install --upgrade mosaicml-streaming==0.6.0
```
New Features
🆕 Databricks File System and Databricks Unity Catalog (#362)
Support for reading and writing data from and to the Databricks File System (DBFS) and Unity Catalog (UC) Volumes. This means that you can now use DBFS and UC Volumes as a source or sink for your streaming data pipelines or model training. Below is the path structure:
Databricks File System (DBFS)
The DBFS path structure is a hierarchical namespace organized into directories and files. A DBFS path must start with the `dbfs:/` prefix.
UC Volumes
The path structure for UC Volumes is similar to the path structure for DBFS, but with a few key differences. The root of the UC Volumes namespace is `dbfs:/Volumes/<catalog>/<schema>/<volume>`, where:

- `<catalog>` is the name of the catalog where the volume is created.
- `<schema>` is the name of the schema where the volume is created.
- `<volume>` is the name of the volume.

Hence, use the `dbfs:/Volumes` prefix to specify a UC Volumes path.
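A minimal sketch of streaming from these backends; the catalog, schema, volume, and directory names below are hypothetical:

```python
from streaming import StreamingDataset

# Stream an MDS dataset from a (hypothetical) UC Volume.
uc_dataset = StreamingDataset(
    remote='dbfs:/Volumes/my_catalog/my_schema/my_volume/mds_dataset',
    local='/tmp/uc_cache',
)

# Stream an MDS dataset from a (hypothetical) DBFS path.
dbfs_dataset = StreamingDataset(
    remote='dbfs:/my_datasets/mds_dataset',
    local='/tmp/dbfs_cache',
)
```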
💽 Spark DataFrame to MDS converter (#363)
Introducing the new `DataFrameToMDS` API, which lets users leverage Spark's capabilities for handling diverse datasets in various formats. This API enables seamless conversion of Spark DataFrames into MDS datasets, with the flexibility to write output to either local or cloud storage. Index files are optionally merged. Additionally, users can add data preprocessing steps by defining custom iterator functions and arguments. All of this is bundled into a single Spark job, ensuring an efficient and streamlined data transformation workflow. An example notebook is provided to help users get started.
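A minimal sketch of the conversion flow; treat the import path, function name, and keyword arguments below as assumptions rather than the exact API, and see the example notebook for canonical usage:

```python
from pyspark.sql import SparkSession

# Assumed import path and name for the converter described above.
from streaming.base.converters import dataframeToMDS

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet('/path/to/source/data')  # hypothetical source

# Convert the Spark DataFrame into an MDS dataset in cloud storage,
# merging per-worker index files into a single index.
dataframeToMDS(
    df,
    merge_index=True,
    mds_kwargs={
        'out': 's3://my-bucket/mds_dataset',  # hypothetical output location
        'columns': {'text': 'str'},           # hypothetical schema
    },
)
```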
🔀 Randomize and offset shuffle blocks algorithm (#373)
The new `py1br` shuffle algorithm helps mitigate the download spikes that occur when using the `py1b` algorithm. With `py1b`, shuffle blocks are all the same size, so as training progresses, nodes have to download many shards at the same time. In contrast, with `py1br`, shuffle blocks are offset from each other and variably sized, resulting in more balanced downloads over time. The `py1br` algorithm is a replacement for the `py1b` algorithm, which will be deprecated soon.
```python
from streaming import StreamingDataset

dataset = StreamingDataset(
    shuffle_algo='py1br',
    ...
)
```
🔀 Expanded range shuffle algorithm (#394)
The new `py1e` shuffle algorithm helps reduce the minimum cache limit needed for training and results in much smoother downloads than both `py1b` and `py1br`, though its shuffle quality is slightly lower. Rather than shuffling all samples in blocks of size `shuffle_block_size`, it spreads the samples of each shard over a range of maximum size `shuffle_block_size`, retaining most of the shuffle quality of `py1b` and `py1br` while reducing download spikes across the duration of training.
```python
from streaming import StreamingDataset

dataset = StreamingDataset(
    shuffle_algo='py1e',
    ...
)
```
🔥 Per-Stream Batching (#407)
Users can now ensure that each batch contains samples from only a single stream. Set the new parameter `batching_method` to `per_stream` to access this functionality. Per-stream batching still takes into account upsampling and downsampling of streams, set by `proportion`, `repeat`, or `choose`. To make batches contain only samples from a group of streams, merge those streams' `index.json` files to create a single one per group.
```python
from streaming import StreamingDataset

dataset = StreamingDataset(
    batching_method='per_stream',
    ...
)
```
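A sketch of per-stream batching over two streams; the bucket paths and proportions below are hypothetical:

```python
from streaming import Stream, StreamingDataset

# Two hypothetical streams, sampled at a 3:1 ratio overall.
streams = [
    Stream(remote='s3://my-bucket/stream_a', proportion=0.75),
    Stream(remote='s3://my-bucket/stream_b', proportion=0.25),
]

# Every batch contains samples from exactly one of the two streams.
dataset = StreamingDataset(streams=streams, batching_method='per_stream')
```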
🔥 Stratified Batching (#408)
Users can now ensure that each batch has a consistent number of samples from every stream. Previously, stream proportions were satisfied in the aggregate but not at the batch level. Set the new parameter `batching_method` to `stratified` to access this functionality. Stratified batching still takes into account upsampling and downsampling of streams, set by `proportion`, `repeat`, or `choose`.
```python
from streaming import StreamingDataset

dataset = StreamingDataset(
    batching_method='stratified',
    ...
)
```
💪 Download-Efficient Sparse Sampling (#391)
Previous versions of StreamingDataset implement downsampling/upsampling by giving each sample an equal probability of being selected (plus or minus one when sampling is fractional), without regard to which shard a sample is on. This means that no matter how small your desired downsampling is, StreamingDataset will still use every shard at as equal a rate as possible. This is problematic for download performance.

In this version of Streaming, we have added a new optional StreamingDataset argument, `sampling_granularity`, which configures how sampling is done. It is an integer, defaulting to 1, that determines how many samples are drawn at a time from a single random shard until enough samples have been chosen.

Note that the default setting of 1 is equivalent to the old, non-shard-aware behavior. Setting it high, e.g. to the number of samples in a full shard or more, means all the samples in a randomly chosen (without replacement) shard are drawn before moving on to the next shard. This is much more download-efficient, but results in the samples of each shard always being seen close together in training, which may have implications for convergence depending on your workload. Setting the sampling granularity to half a shard means, roughly speaking, you'll see half the samples of a shard at a time during training.
```python
from streaming import StreamingDataset

dataset = StreamingDataset(
    sampling_granularity=1,  # the default: non-shard-aware sampling
    ...
)
```
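For the download-efficient behavior described above, set the granularity near the shard size. A minimal sketch, assuming shards of roughly 10,000 samples (a hypothetical value):

```python
from streaming import StreamingDataset

# Hypothetical shard size of ~10,000 samples: draw one whole shard's worth
# of samples at a time. Much more download-efficient, at some cost to
# shuffle quality, since each shard's samples appear close together.
dataset = StreamingDataset(
    local='/path/to/local/dataset',
    sampling_granularity=10_000,
)
```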
📑 Reusable local directory (#406)
Users can now instantiate more than one StreamingDataset with the same `local` directory and `remote=None`. This is useful when high-speed storage is mounted on a node and multiple users want to read the dataset directly from the mounted storage on the same node, without having to copy the data to local disk.
```python
from streaming import StreamingDataset

local = '<local disk directory or a mount point directory>'
dataset_0 = StreamingDataset(local=local, remote=None)
dataset_1 = StreamingDataset(local=local, remote=None)
```
🐛 Bug Fixes
- Terminate the worker threads when the process terminates, to avoid deadlock. (#425)
- Raise an exception if `cache_limit` is lower than the size of a single shard file, to avoid deadlock. (#420)
- Fixed an issue with setting `predownload` to zero; users can now pass `predownload=0` to `StreamingDataset`. (#383)
🔧 Improvements
- Add Google Application Default Credentials (#376). The order of authentication has changed, and new App Engine and Compute Engine authentication channels are used if available. The order of authentication is now:
  1. HMAC
  2. Google service account
  3. App Engine
  4. Compute Engine
  5. Raise an error
- Check whether `index.json` exists locally before downloading, to avoid duplicate downloads (#372).
What's Changed
- Bump fastapi from 0.100.0 to 0.101.0 by @dependabot in #367
- Bump uvicorn from 0.23.1 to 0.23.2 by @dependabot in #368
- Check if index.json exists locally before downloading by @karan6181 in #372
- Bench/plot sample access times across data and across formats by @knighton in #365
- Apply ruff pre-commit hook by @Skylion007 in #364
- Add a regression test for shuffling sample order by @b-chu in #359
- Epoch size default behavior by @snarayan21 in #374
- Stream unspecified docstring change by @snarayan21 in #377
- fixed comments by @snarayan21 in #378
- Add google Application Default Credentials to download by @fgerzer in #376
- Fixed fake AWS credentials by @karan6181 in #382
- Fixed predownload value to zero issue by @karan6181 in #383
- Bump fastapi from 0.101.0 to 0.101.1 by @dependabot in #387
- Bump pydantic from 2.1.1 to 2.2.1 by @dependabot in #389
- Add a regression test for mixing of different dataset streams by @b-chu in #375
- Add support for Databricks File System backend by @maddiedawson in #362
- Add support for downloading from Unity Catalog volumes by @maddiedawson in #361
- Fix MosaicML platform credential setup links by @karan6181 in #396
- Plug hole in MDS type system: add arbitrary-precision decimal by @knighton in #390
- Bump fastapi from 0.101.1 to 0.103.0 by @dependabot in #402
- Bump pydantic from 2.2.1 to 2.3.0 by @dependabot in #403
- Bump databricks-sdk from 0.3.1 to 0.6.0 by @dependabot in #404
- Py1br algorithm implementation by @snarayan21 in #373
- Benchmarking partitioning by @knighton in #379
- Expanded range shuffle by @snarayan21 in #394
- Reusable local directory when remote is None by @karan6181 in #406
- Bump gitpython from 3.1.32 to 3.1.34 by @dependabot in #410
- Bump pytest from 7.4.0 to 7.4.1 by @dependabot in #411
- Bump fastapi from 0.103.0 to 0.103.1 by @dependabot in #413
- Bump databricks-sdk from 0.6.0 to 0.8.0 by @dependabot in #414
- Per Stream Batching by @snarayan21 in #407
- Update Databricks download and upload functionality using new Databricks python sdk by @karan6181 in #418
- Add delta to mds converter by @XiaohanZhangCMU in #363
- Stratified Batching by @snarayan21 in #408
- Raise an exception if cache limit is too low by @karan6181 in #420
- Remove torchtext by @mvpatel2000 in #423
- Fix nb by @XiaohanZhangCMU in #422
- Fixed python version by @karan6181 in #424
- Improve shard efficiency of sampling for fractional stream repeats. by @knighton in #391
- Optimize dataframe writer (small change) by @Skylion007 in #426
- Fix deadlock by @acutkosky in #425
- changed choose to epoch_size in stream proportion docstring by @snarayan21 in #432
- Bump version to 0.6.0 by @XiaohanZhangCMU in #433
New Contributors
- @Skylion007 made their first contribution in #364
- @fgerzer made their first contribution in #376
- @maddiedawson made their first contribution in #362
- @XiaohanZhangCMU made their first contribution in #363
- @mvpatel2000 made their first contribution in #423
- @acutkosky made their first contribution in #425
Full Changelog: v0.5.2...v0.6.0