diff --git a/.github/workflows/docs.yaml b/.github/workflows/docs.yaml
new file mode 100644
index 0000000..a031fdb
--- /dev/null
+++ b/.github/workflows/docs.yaml
@@ -0,0 +1,46 @@
+# Adjusted workflow from: https://github.com/mitmproxy/pdoc/blob/25f325d06aaacba9711f957f48e770029f608f6a/.github/workflows/docs.yml
+
+name: Generate documentation using pdoc and deploy it to GitHub Pages.
+
+# build the documentation whenever a new tag is pushed
+on:
+  push:
+    branches:
+    tags:
+    - '*'
+
+# security: restrict permissions for CI jobs.
+permissions:
+  contents: read
+
+jobs:
+  # Build the documentation and upload the static HTML files as an artifact.
+  build:
+    runs-on: ubuntu-latest
+    steps:
+    - uses: actions/checkout@v4
+    - uses: actions/setup-python@v5
+      with:
+        python-version: '3.11'
+
+      # Install all dependencies (including pdoc)
+    - run: pip install -e .[docs]
+    - run: pdoc -d numpy -o docs tensorflow_time_series_dataset
+    - uses: actions/upload-pages-artifact@v3
+      with:
+        path: docs/
+
+  # Deploy the artifact to GitHub Pages.
+  # This is a separate job so that only actions/deploy-pages has the necessary permissions.
+  deploy:
+    needs: build
+    runs-on: ubuntu-latest
+    permissions:
+      pages: write
+      id-token: write
+    environment:
+      name: github-pages
+      url: ${{ steps.deployment.outputs.page_url }}
+    steps:
+    - id: deployment
+      uses: actions/deploy-pages@v4
diff --git a/.github/workflows/pre-commit.yaml b/.github/workflows/pre-commit.yaml
new file mode 100644
index 0000000..846f557
--- /dev/null
+++ b/.github/workflows/pre-commit.yaml
@@ -0,0 +1,11 @@
+name: run pre-commit hooks
+
+on: [push]
+
+jobs:
+  pre-commit:
+    runs-on: ubuntu-latest
+    steps:
+    - uses: actions/checkout@v4
+    - uses: actions/setup-python@v4
+    - uses: pre-commit/action@v3.0.1
diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml
new file mode 100644
index 0000000..a3753ba
--- /dev/null
+++ b/.github/workflows/release.yaml
@@ -0,0 +1,118 @@
+# Inspired by: https://packaging.python.org/en/latest/guides/publishing-package-distribution-releases-using-github-actions-ci-cd-workflows/
+
+name: Publish Python Distribution to PyPI and TestPyPI
+
+on: push
+
+jobs:
+  build:
+    name: Build
+    runs-on: ubuntu-latest
+
+    steps:
+    - uses: actions/checkout@v4
+    - name: Set up Python
+      uses: actions/setup-python@v4
+      with:
+        python-version: 3.x
+    - name: Install pypa/build
+      run: >-
+        python3 -m
+        pip install
+        build
+        --user
+    - name: Build a binary wheel and a source tarball
+      run: python3 -m build
+    - name: Store the distribution packages
+      uses: actions/upload-artifact@v3
+      with:
+        name: python-package-distributions
+        path: dist/
+
+  publish-to-pypi:
+    name: Publish to PyPI
+    if: startsWith(github.ref, 'refs/tags/')  # only publish to PyPI on tag pushes
+    needs:
+    - build
+    runs-on: ubuntu-latest
+    environment:
+      name: pypi
+      url: https://pypi.org/p/tensorflow-time-series-dataset
+    permissions:
+      id-token: write  # IMPORTANT: mandatory for trusted publishing
+    steps:
+    - name: Download all the dists
+      uses: actions/download-artifact@v3
+      with:
+        name: python-package-distributions
+        path: dist/
+    - name: Publish distribution to PyPI
+      uses: pypa/gh-action-pypi-publish@release/v1
+
+  github-release:
+    name: >-
+      Sign the Python distributions with Sigstore
+      and upload them to the GitHub Release
+    needs:
+    - publish-to-pypi
+    runs-on: ubuntu-latest
+
+    permissions:
+      contents: write  # IMPORTANT: mandatory for making GitHub Releases
+      id-token: write  # IMPORTANT: mandatory for sigstore
+
+    steps:
+    - name: Download all the dists
+      uses: actions/download-artifact@v3
+      with:
+        name: python-package-distributions
+        path: dist/
+    - name: Sign the dists with Sigstore
+      uses: sigstore/gh-action-sigstore-python@v1.2.3
+      with:
+        inputs: >-
+          ./dist/*.tar.gz
+          ./dist/*.whl
+    - name: Create GitHub Release
+      env:
+        GITHUB_TOKEN: ${{ github.token }}
+      run: >-
+        gh release create
+        '${{ github.ref_name }}'
+        --repo '${{ github.repository }}'
+        --notes ""
+    - name: Upload artifact signatures to GitHub Release
+      env:
+        GITHUB_TOKEN: ${{ github.token }}
+      # Upload to GitHub Release using the `gh` CLI.
+      # `dist/` contains the built packages, and the
+      # sigstore-produced signatures and certificates.
+      run: >-
+        gh release upload
+        '${{ github.ref_name }}' dist/**
+        --repo '${{ github.repository }}'
+
+  publish-to-testpypi:
+    name: Publish Python distribution to TestPyPI
+    needs:
+    - build
+    runs-on: ubuntu-latest
+
+    environment:
+      name: testpypi
+      url: https://test.pypi.org/p/tensorflow-time-series-dataset
+
+    permissions:
+      id-token: write  # IMPORTANT: mandatory for trusted publishing
+
+    steps:
+    - name: Download all the dists
+      uses: actions/download-artifact@v3
+      with:
+        name: python-package-distributions
+        path: dist/
+    - name: Publish distribution to TestPyPI
+      uses: pypa/gh-action-pypi-publish@release/v1
+      with:
+        skip-existing: true
+        repository-url: https://test.pypi.org/legacy/
diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
new file mode 100644
index 0000000..ba6c48c
--- /dev/null
+++ b/.github/workflows/test.yaml
@@ -0,0 +1,26 @@
+name: Run tests
+
+on: [push]
+
+jobs:
+  build:
+
+    strategy:
+      matrix:
+        platform: [ubuntu-latest, macos-latest, windows-latest]
+        python-version: [3.7, 3.8, 3.9, '3.10', '3.11']
+
+    runs-on: ${{ matrix.platform }}
+
+    steps:
+    - uses: actions/checkout@v4
+    - name: Set up Python ${{ matrix.python-version }}
+      uses: actions/setup-python@v4
+      with:
+        python-version: ${{ matrix.python-version }}
+    - name: Install dependencies
+      run: |
+        python -m pip install --upgrade pip
+        pip install .[test]
+    - name: Test with pytest
+      run: pytest -n auto
diff --git a/.images/example1.svg b/.images/example1.svg
new file mode 100644
index 0000000..261cd51
--- /dev/null
+++ b/.images/example1.svg
@@ -0,0 +1,1083 @@
+[Matplotlib v3.8.3 SVG figure, created 2024-02-16T13:46:58; vector path data omitted]
diff --git a/.images/example2.svg b/.images/example2.svg
new file mode 100644
index 0000000..572c88e
--- /dev/null
+++ b/.images/example2.svg
@@ -0,0 +1,1088 @@
+[Matplotlib v3.8.3 SVG figure, created 2024-02-16T13:47:00; vector path data omitted]
diff --git a/.images/example3.svg b/.images/example3.svg
new file mode 100644
index 0000000..f582604
--- /dev/null
+++ b/.images/example3.svg
@@ -0,0 +1,1736 @@
+[Matplotlib v3.8.3 SVG figure, created 2024-02-16T13:47:01; vector path data omitted]
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 0000000..aec91b5
--- /dev/null
+++ b/.pre-commit-config.yaml
@@ -0,0 +1,35 @@
+repos:
+- repo: https://github.com/astral-sh/ruff-pre-commit
+  rev: v0.2.1
+  hooks:
+  - id: ruff
+    args: [--fix]
+  - id: ruff-format
+- repo: https://github.com/pre-commit/pre-commit-hooks
+  rev: v4.5.0
+  hooks:
+  - id: check-added-large-files
+  - id: check-executables-have-shebangs
+  - id: check-merge-conflict
+  - id: check-shebang-scripts-are-executable
+  - id: check-toml
+  - id: check-yaml
+  - id: end-of-file-fixer
+  - id: mixed-line-ending
+  - id: trailing-whitespace
+- repo: https://github.com/macisamuele/language-formatters-pre-commit-hooks
+  rev: v2.12.0
+  hooks:
+  - id: pretty-format-yaml
+    args: [--autofix, --indent, '2']
+  - id: pretty-format-toml
+    args: [--autofix]
+  - id: pretty-format-ini
+    args: [--autofix]
+- repo: https://github.com/commitizen-tools/commitizen
+  rev: v3.14.1
+  hooks:
+  - id: commitizen
+  - id: commitizen-branch
+    stages:
+    - push
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..a8e9e4f
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,13 @@
+## v0.1.2 (2024-02-19)
+
+## v0.1.1 (2024-02-19)
+
+### Fix
+
+- use `min` instead of deprecated `T` for pandas freq
+- reduce required matplotlib version for python 3.7 compatibility
+- TimeSeriesSplit now works with newer pandas versions
+
+
+- update project repository urls
+- Update the readme with new project url
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
new file mode 100644
index 0000000..eacabe4
--- /dev/null
+++ b/CONTRIBUTING.md
@@ -0,0 +1,17 @@
+# Contributing
+
+Contributions are what make the open source community such an amazing
+place to learn, inspire, and create.
+Thank you for considering a contribution to this project.
+Any contributions you make are _greatly appreciated_.
+
+If you have a suggestion that would make this better, please fork the repo and create a pull request.
+You can also simply open an issue with the tag "enhancement".
+
+1. Fork the project
+2. Create a feature branch for your changes
+3. Commit your changes
+4. Push to the branch
+5. Open a pull request
+
+Thanks again for considering contributing.
diff --git a/README.md b/README.md
index e69de29..364f9ab 100644
--- a/README.md
+++ b/README.md
@@ -0,0 +1,245 @@
+[![img](https://img.shields.io/github/contributors/MArpogaus/tensorflow_time_series_dataset.svg?style=flat-square)](https://github.com/MArpogaus/tensorflow_time_series_dataset/graphs/contributors)
+[![img](https://img.shields.io/github/forks/MArpogaus/tensorflow_time_series_dataset.svg?style=flat-square)](https://github.com/MArpogaus/tensorflow_time_series_dataset/network/members)
+[![img](https://img.shields.io/github/stars/MArpogaus/tensorflow_time_series_dataset.svg?style=flat-square)](https://github.com/MArpogaus/tensorflow_time_series_dataset/stargazers)
+[![img](https://img.shields.io/github/issues/MArpogaus/tensorflow_time_series_dataset.svg?style=flat-square)](https://github.com/MArpogaus/tensorflow_time_series_dataset/issues)
+[![img](https://img.shields.io/github/license/MArpogaus/tensorflow_time_series_dataset.svg?style=flat-square)](https://github.com/MArpogaus/tensorflow_time_series_dataset/blob/main/LICENSE)
+[![img](https://img.shields.io/github/actions/workflow/status/MArpogaus/tensorflow_time_series_dataset/test.yaml.svg?label=test&style=flat-square)](https://github.com/MArpogaus/tensorflow_time_series_dataset/actions/workflows/test.yaml)
+[![img](https://img.shields.io/badge/pre--commit-enabled-brightgreen.svg?logo=pre-commit&style=flat-square)](https://github.com/MArpogaus/tensorflow_time_series_dataset/blob/main/.pre-commit-config.yaml)
+[![img](https://img.shields.io/badge/-LinkedIn-black.svg?style=flat-square&logo=linkedin&colorB=555)](https://linkedin.com/in/MArpogaus)
+
+[![img](https://img.shields.io/pypi/v/tensorflow_time_series_dataset.svg?style=flat-square)](https://pypi.org/project/tensorflow_time_series_dataset)
+
+
+# TensorFlow time-series Dataset
+
+1. [About The Project](#orgd7601fa)
+2. [Installation](#org4baeaee)
+3. [Usage](#org78d3afa)
+    1. [Example Data](#org437e43f)
+    2. [Single-Step Prediction](#org87ebbfe)
+    3. [Multi-Step Prediction](#org11d5abf)
+    4. [Preprocessing: Add Metadata features](#org3006a48)
+4. [Contributing](#org5b01553)
+5. [License](#org8585d54)
+6. [Contact](#org66ff6ba)
+7. [Acknowledgments](#org8768712)
+
+
+<a id="orgd7601fa"></a>
+
+## About The Project
+
+This Python package helps you create TensorFlow datasets for time-series data.
+
+
+<a id="org4baeaee"></a>
+
+## Installation
+
+This package is available on [PyPI](https://pypi.org/project/tensorflow-time-series-dataset/).
+You can install it and all of its dependencies using pip:
+
+    pip install tensorflow_time_series_dataset
+
+
+<a id="org78d3afa"></a>
+
+## Usage
+
+
+<a id="org437e43f"></a>
+
+### Example Data
+
+Suppose you have a dataset in the following form:
+
+    import numpy as np
+    import pandas as pd
+
+    # make things deterministic
+    np.random.seed(1)
+
+    columns=['x1', 'x2', 'x3']
+    periods=48 * 14
+    test_df=pd.DataFrame(
+        index=pd.date_range(
+            start='1/1/1992',
+            periods=periods,
+            freq='30min'
+        ),
+        data=np.stack(
+            [
+                np.random.normal(0,0.5,periods),
+                np.random.normal(1,0.5,periods),
+                np.random.normal(2,0.5,periods)
+            ],
+            axis=1
+        ),
+        columns=columns
+    )
+    test_df.head()
+
+                               x1        x2        x3
+    1992-01-01 00:00:00  0.812173  1.205133  1.578044
+    1992-01-01 00:30:00 -0.305878  1.429935  1.413295
+    1992-01-01 01:00:00 -0.264086  0.550658  1.602187
+    1992-01-01 01:30:00 -0.536484  1.159828  1.644974
+    1992-01-01 02:00:00  0.432704  1.159077  2.005718
+
+
+<a id="org87ebbfe"></a>
+
+### Single-Step Prediction
+
+The factory class `WindowedTimeSeriesDatasetFactory` is used to create a TensorFlow dataset from pandas dataframes, or other data sources as we will see later.
+We will use it now to create a dataset with `48` historic time-steps as the input to predict a single time-step in the future.
+
+    from tensorflow_time_series_dataset.factory import WindowedTimeSeriesDatasetFactory as Factory
+
+    factory_kwargs=dict(
+        history_size=48,
+        prediction_size=1,
+        history_columns=['x1', 'x2', 'x3'],
+        prediction_columns=['x3'],
+        batch_size=4,
+        drop_remainder=True,
+    )
+    factory=Factory(**factory_kwargs)
+    ds1=factory(test_df)
+    ds1
+
+This returns the following TensorFlow Dataset:
+
+    <_PrefetchDataset element_spec=(TensorSpec(shape=(4, 48, 3), dtype=tf.float32, name=None), TensorSpec(shape=(4, 1, 1), dtype=tf.float32, name=None))>
+
+We can plot the result with the utility function `plot_patch`:
+
+    from tensorflow_time_series_dataset.utils.visualisation import plot_patch
+
+    githubusercontent="https://raw.githubusercontent.com/MArpogaus/tensorflow_time_series_dataset/master/"
+
+    fig=plot_patch(
+        ds1,
+        figsize=(8,4),
+        **factory_kwargs
+    )
+
+    fname='.images/example1.svg'
+    fig.savefig(fname)
+
+    f"[[{githubusercontent}{fname}]]"
+
+![img](https://raw.githubusercontent.com/MArpogaus/tensorflow_time_series_dataset/master/.images/example1.svg)
+
+
+<a id="org11d5abf"></a>
+
+### Multi-Step Prediction
+
+Let's now increase the prediction size to `6` half-hour time-steps.
+
+    factory_kwargs.update(dict(
+        prediction_size=6
+    ))
+    factory=Factory(**factory_kwargs)
+    ds2=factory(test_df)
+    ds2
+
+This returns the following TensorFlow Dataset:
+
+    <_PrefetchDataset element_spec=(TensorSpec(shape=(4, 48, 3), dtype=tf.float32, name=None), TensorSpec(shape=(4, 6, 1), dtype=tf.float32, name=None))>
+
+Again, let's plot the results to see what changed:
+
+    fig=plot_patch(
+        ds2,
+        figsize=(8,4),
+        **factory_kwargs
+    )
+
+    fname='.images/example2.svg'
+    fig.savefig(fname)
+
+    f"[[{githubusercontent}{fname}]]"
+
+![img](https://raw.githubusercontent.com/MArpogaus/tensorflow_time_series_dataset/master/.images/example2.svg)
+
+
+<a id="org3006a48"></a>
+
+### Preprocessing: Add Metadata features
+
+Preprocessors can be used to transform the data before it is fed into the model.
+A preprocessor can be any Python callable.
+In this case we will be using a class called `CyclicalFeatureEncoder` to encode our one-dimensional cyclical features like the *time* or *weekday* to two-dimensional coordinates using a sine and cosine transformation as suggested in [this blogpost](https://www.kaggle.com/avanwyk/encoding-cyclical-features-for-deep-learning).
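+For intuition: the encoder maps each cyclical value to a point on the unit circle, so values that are close in the cycle (e.g. 23:30 and 00:00) end up close in feature space.
+The following sketch mirrors the transformation implemented in `src/tensorflow_time_series_dataset/preprocessors/cyclical_feature_encoder.py`; the function name here is illustrative only:
+
+    import numpy as np
+
+    def encode_cyclical(x, cycl_min, cycl_max):
+        # map x onto the unit circle; the +1 keeps the encodings of
+        # cycl_min and cycl_max from coinciding
+        angle = 2 * np.pi * (x - cycl_min) / (cycl_max - cycl_min + 1)
+        return np.sin(angle), np.cos(angle)
+
+    # minute 0 (00:00) and minute 1410 (23:30) land close together
+    encode_cyclical(np.array([0, 1410]), 0, 24 * 60 - 1)
+
+With that in mind, we now configure one encoder per cyclical feature: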
+
+    import itertools
+    from tensorflow_time_series_dataset.preprocessors import CyclicalFeatureEncoder
+    encs = {
+        "weekday": dict(cycl_max=6),
+        "dayofyear": dict(cycl_max=366, cycl_min=1),
+        "month": dict(cycl_max=12, cycl_min=1),
+        "time": dict(
+            cycl_max=24 * 60 - 1,
+            cycl_getter=lambda df, k: df.index.hour * 60 + df.index.minute,
+        ),
+    }
+    factory_kwargs.update(dict(
+        meta_columns=list(itertools.chain(*[[c+'_sin', c+'_cos'] for c in encs.keys()]))
+    ))
+    factory=Factory(**factory_kwargs)
+    for name, kwargs in encs.items():
+        factory.add_preprocessor(CyclicalFeatureEncoder(name, **kwargs))
+
+    ds3=factory(test_df)
+    ds3
+
+This returns the following TensorFlow Dataset:
+
+    <_PrefetchDataset element_spec=((TensorSpec(shape=(4, 48, 3), dtype=tf.float32, name=None), TensorSpec(shape=(4, 1, 8), dtype=tf.float32, name=None)), TensorSpec(shape=(4, 6, 1), dtype=tf.float32, name=None))>
+
+Again, let's plot the results to see what changed:
+
+    fig=plot_patch(
+        ds3,
+        figsize=(8,4),
+        **factory_kwargs
+    )
+
+    fname='.images/example3.svg'
+    fig.savefig(fname)
+
+    f"[[{githubusercontent}{fname}]]"
+
+![img](https://raw.githubusercontent.com/MArpogaus/tensorflow_time_series_dataset/master/.images/example3.svg)
+
+
+<a id="org5b01553"></a>
+
+## Contributing
+
+Any contributions are greatly appreciated! If you have a question, an issue or would like to contribute, please read our [contributing guidelines](CONTRIBUTING.md).
+
+
+<a id="org8585d54"></a>
+
+## License
+
+Distributed under the [Apache License 2.0](LICENSE).
+
+
+<a id="org66ff6ba"></a>
+
+## Contact
+
+[Marcel Arpogaus](https://github.com/marpogaus) - [marcel.arpogaus@gmail.com](mailto:marcel.arpogaus@gmail.com)
+
+Project Link:
+<https://github.com/MArpogaus/tensorflow_time_series_dataset>
+
+
+<a id="org8768712"></a>
+
+## Acknowledgments
+
+Parts of this work have been funded by the Federal Ministry for the Environment, Nature Conservation and Nuclear Safety due to a decision of the German Federal Parliament (AI4Grids: 67KI2012A).
diff --git a/README.org b/README.org
new file mode 100644
index 0000000..e940d10
--- /dev/null
+++ b/README.org
@@ -0,0 +1,228 @@
+# Inspired by: https://github.com/othneildrew/Best-README-Template
+#+OPTIONS: toc:nil
+
+[[https://github.com/MArpogaus/tensorflow_time_series_dataset/graphs/contributors][https://img.shields.io/github/contributors/MArpogaus/tensorflow_time_series_dataset.svg?style=flat-square]]
+[[https://github.com/MArpogaus/tensorflow_time_series_dataset/network/members][https://img.shields.io/github/forks/MArpogaus/tensorflow_time_series_dataset.svg?style=flat-square]]
+[[https://github.com/MArpogaus/tensorflow_time_series_dataset/stargazers][https://img.shields.io/github/stars/MArpogaus/tensorflow_time_series_dataset.svg?style=flat-square]]
+[[https://github.com/MArpogaus/tensorflow_time_series_dataset/issues][https://img.shields.io/github/issues/MArpogaus/tensorflow_time_series_dataset.svg?style=flat-square]]
+[[https://github.com/MArpogaus/tensorflow_time_series_dataset/blob/main/LICENSE][https://img.shields.io/github/license/MArpogaus/tensorflow_time_series_dataset.svg?style=flat-square]]
+[[https://github.com/MArpogaus/tensorflow_time_series_dataset/actions/workflows/test.yaml][https://img.shields.io/github/actions/workflow/status/MArpogaus/tensorflow_time_series_dataset/test.yaml.svg?label=test&style=flat-square]]
+[[https://github.com/MArpogaus/tensorflow_time_series_dataset/blob/main/.pre-commit-config.yaml][https://img.shields.io/badge/pre--commit-enabled-brightgreen.svg?logo=pre-commit&style=flat-square]]
+[[https://linkedin.com/in/MArpogaus][https://img.shields.io/badge/-LinkedIn-black.svg?style=flat-square&logo=linkedin&colorB=555]]
+
+[[https://pypi.org/project/tensorflow_time_series_dataset][https://img.shields.io/pypi/v/tensorflow_time_series_dataset.svg?style=flat-square]]
+
+* TensorFlow time-series Dataset
+
+#+TOC: headlines 2 local
+
+** About The Project
+
+This Python package helps you create TensorFlow datasets for time-series data.
+
+** Installation
+
+This package is available on [[https://pypi.org/project/tensorflow-time-series-dataset/][PyPI]].
+You can install it and all of its dependencies using pip:
+
+#+begin_src bash :exports nil
+  pip install tensorflow_time_series_dataset
+#+end_src
+
+** Usage
+
+*** Example Data
+Suppose you have a dataset in the following form:
+
+#+NAME: df
+#+begin_src python :session :exports both
+  import numpy as np
+  import pandas as pd
+
+  # make things deterministic
+  np.random.seed(1)
+
+  columns=['x1', 'x2', 'x3']
+  periods=48 * 14
+  test_df=pd.DataFrame(
+      index=pd.date_range(
+          start='1/1/1992',
+          periods=periods,
+          freq='30min'
+      ),
+      data=np.stack(
+          [
+              np.random.normal(0,0.5,periods),
+              np.random.normal(1,0.5,periods),
+              np.random.normal(2,0.5,periods)
+          ],
+          axis=1
+      ),
+      columns=columns
+  )
+  test_df.head()
+#+end_src
+
+#+RESULTS: df
+:                            x1        x2        x3
+: 1992-01-01 00:00:00  0.812173  1.205133  1.578044
+: 1992-01-01 00:30:00 -0.305878  1.429935  1.413295
+: 1992-01-01 01:00:00 -0.264086  0.550658  1.602187
+: 1992-01-01 01:30:00 -0.536484  1.159828  1.644974
+: 1992-01-01 02:00:00  0.432704  1.159077  2.005718
+
+
+*** Single-Step Prediction
+The factory class =WindowedTimeSeriesDatasetFactory= is used to create a TensorFlow dataset from pandas dataframes, or other data sources as we will see later.
+We will use it now to create a dataset with =48= historic time-steps as the input to predict a single time-step in the future.
+
+#+NAME: ds1
+#+begin_src python :session :exports both
+  from tensorflow_time_series_dataset.factory import WindowedTimeSeriesDatasetFactory as Factory
+
+  factory_kwargs=dict(
+      history_size=48,
+      prediction_size=1,
+      history_columns=['x1', 'x2', 'x3'],
+      prediction_columns=['x3'],
+      batch_size=4,
+      drop_remainder=True,
+  )
+  factory=Factory(**factory_kwargs)
+  ds1=factory(test_df)
+  ds1
+#+end_src
+
+This returns the following TensorFlow Dataset:
+
+#+RESULTS: ds1
+: <_PrefetchDataset element_spec=(TensorSpec(shape=(4, 48, 3), dtype=tf.float32, name=None), TensorSpec(shape=(4, 1, 1), dtype=tf.float32, name=None))>
+
+We can plot the result with the utility function =plot_patch=:
+
+#+NAME: ds1_plot
+#+begin_src python :session :results raw :exports both
+  from tensorflow_time_series_dataset.utils.visualisation import plot_patch
+
+  githubusercontent="https://raw.githubusercontent.com/MArpogaus/tensorflow_time_series_dataset/master/"
+
+  fig=plot_patch(
+      ds1,
+      figsize=(8,4),
+      ,**factory_kwargs
+  )
+
+  fname='.images/example1.svg'
+  fig.savefig(fname)
+
+  f"[[{githubusercontent}{fname}]]"
+#+end_src
+
+#+RESULTS: ds1_plot
+[[https://raw.githubusercontent.com/MArpogaus/tensorflow_time_series_dataset/master/.images/example1.svg]]
+
+*** Multi-Step Prediction
+Let's now increase the prediction size to =6= half-hour time-steps.
+#+NAME: ds2
+#+begin_src python :session :exports both
+  factory_kwargs.update(dict(
+      prediction_size=6
+  ))
+  factory=Factory(**factory_kwargs)
+  ds2=factory(test_df)
+  ds2
+#+end_src
+
+This returns the following TensorFlow Dataset:
+#+RESULTS: ds2
+: <_PrefetchDataset element_spec=(TensorSpec(shape=(4, 48, 3), dtype=tf.float32, name=None), TensorSpec(shape=(4, 6, 1), dtype=tf.float32, name=None))>
+
+Again, let's plot the results to see what changed:
+#+NAME: ds2_plot
+#+begin_src python :session :results raw :exports both
+  fig=plot_patch(
+      ds2,
+      figsize=(8,4),
+      ,**factory_kwargs
+  )
+
+  fname='.images/example2.svg'
+  fig.savefig(fname)
+
+  f"[[{githubusercontent}{fname}]]"
+#+end_src
+
+#+RESULTS: ds2_plot
+[[https://raw.githubusercontent.com/MArpogaus/tensorflow_time_series_dataset/master/.images/example2.svg]]
+
+
+*** Preprocessing: Add Metadata features
+Preprocessors can be used to transform the data before it is fed into the model.
+A preprocessor can be any Python callable.
+In this case we will be using a class called =CyclicalFeatureEncoder= to encode our one-dimensional cyclical features like the /time/ or /weekday/ to two-dimensional coordinates using a sine and cosine transformation as suggested in [[https://www.kaggle.com/avanwyk/encoding-cyclical-features-for-deep-learning][this blogpost]].
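+
+For intuition: the encoder maps each cyclical value to a point on the unit circle, so values that are close in the cycle (e.g. 23:30 and 00:00) end up close in feature space.
+The following sketch mirrors the transformation implemented in =src/tensorflow_time_series_dataset/preprocessors/cyclical_feature_encoder.py=; the function name here is illustrative only:
+
+#+begin_src python
+  import numpy as np
+
+  def encode_cyclical(x, cycl_min, cycl_max):
+      # map x onto the unit circle; the +1 keeps the encodings of
+      # cycl_min and cycl_max from coinciding
+      angle = 2 * np.pi * (x - cycl_min) / (cycl_max - cycl_min + 1)
+      return np.sin(angle), np.cos(angle)
+
+  # minute 0 (00:00) and minute 1410 (23:30) land close together
+  encode_cyclical(np.array([0, 1410]), 0, 24 * 60 - 1)
+#+end_src
+
+With that in mind, we now configure one encoder per cyclical feature: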
+#+NAME: ds3
+#+begin_src python :session :exports both
+  import itertools
+  from tensorflow_time_series_dataset.preprocessors import CyclicalFeatureEncoder
+  encs = {
+      "weekday": dict(cycl_max=6),
+      "dayofyear": dict(cycl_max=366, cycl_min=1),
+      "month": dict(cycl_max=12, cycl_min=1),
+      "time": dict(
+          cycl_max=24 * 60 - 1,
+          cycl_getter=lambda df, k: df.index.hour * 60 + df.index.minute,
+      ),
+  }
+  factory_kwargs.update(dict(
+      meta_columns=list(itertools.chain(*[[c+'_sin', c+'_cos'] for c in encs.keys()]))
+  ))
+  factory=Factory(**factory_kwargs)
+  for name, kwargs in encs.items():
+      factory.add_preprocessor(CyclicalFeatureEncoder(name, **kwargs))
+
+  ds3=factory(test_df)
+  ds3
+#+end_src
+
+This returns the following TensorFlow Dataset:
+#+RESULTS: ds3
+: <_PrefetchDataset element_spec=((TensorSpec(shape=(4, 48, 3), dtype=tf.float32, name=None), TensorSpec(shape=(4, 1, 8), dtype=tf.float32, name=None)), TensorSpec(shape=(4, 6, 1), dtype=tf.float32, name=None))>
+
+Again, let's plot the results to see what changed:
+#+NAME: ds3_plot
+#+begin_src python :session :results raw :exports both
+  fig=plot_patch(
+      ds3,
+      figsize=(8,4),
+      ,**factory_kwargs
+  )
+
+  fname='.images/example3.svg'
+  fig.savefig(fname)
+
+  f"[[{githubusercontent}{fname}]]"
+#+end_src
+
+#+RESULTS: ds3_plot
+[[https://raw.githubusercontent.com/MArpogaus/tensorflow_time_series_dataset/master/.images/example3.svg]]
+
+** Contributing
+
+Any contributions are greatly appreciated! If you have a question, an issue or would like to contribute, please read our [[file:CONTRIBUTING.md][contributing guidelines]].
+
+
+** License
+
+Distributed under the [[file:LICENSE][Apache License 2.0]].
+
+** Contact
+
+[[https://github.com/marpogaus][Marcel Arpogaus]] - [[mailto:marcel.arpogaus@gmail.com][marcel.arpogaus@gmail.com]]
+
+Project Link:
+[[https://github.com/MArpogaus/tensorflow_time_series_dataset]]
+
+** Acknowledgments
+
+Parts of this work have been funded by the Federal Ministry for the Environment, Nature Conservation and Nuclear Safety due to a decision of the German Federal Parliament (AI4Grids: 67KI2012A).
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..31f4f4d
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,80 @@
+[build-system]
+build-backend = "setuptools.build_meta"
+requires = ["setuptools >= 61", "setuptools_scm>=7"]
+
+[project]
+classifiers = [
+  "Operating System :: OS Independent",
+  "Intended Audience :: Developers",
+  "Intended Audience :: Education",
+  "Intended Audience :: Science/Research",
+  "License :: OSI Approved :: Apache Software License",
+  "Programming Language :: Python :: 3 :: Only",
+  "Programming Language :: Python :: 3.7",
+  "Programming Language :: Python :: 3.8",
+  "Programming Language :: Python :: 3.9",
+  "Programming Language :: Python :: 3.10",
+  "Programming Language :: Python :: 3.11",
+  "Topic :: Scientific/Engineering :: Artificial Intelligence",
+  "Topic :: Scientific/Engineering :: Mathematics",
+  "Topic :: Scientific/Engineering",
+  "Topic :: Software Development :: Libraries :: Python Modules",
+  "Topic :: Software Development :: Libraries",
+  "Topic :: Software Development"
+]
+dependencies = [
+  "tensorflow>=2.5",
+  "pandas>=1.1.0",
+  "matplotlib>=3.5"
+]
+dynamic = ["version"]
+license = {text = "Apache Software License (Apache 2.0)"}
+name = "tensorflow_time_series_dataset"
+readme = "README.md"
+requires-python = ">=3.7"
+
+[project.optional-dependencies]
+develop = [
+  'tensorflow_time_series_dataset[test]',
+  'commitizen',
+  'ipykernel',
+  'pre-commit',
+  'python-lsp-ruff',
+  'python-lsp-server[all]',
+  'ruff'
+]
+docs = [
+  'tensorflow_time_series_dataset[test]',
+  'pdoc'
+]
+test = [
+  'pytest',
+  'pytest-xdist'
+]
+
+[project.urls]
+Changelog = "https://github.com/MArpogaus/tensorflow_time_series_dataset/blob/dev/CHANGELOG.md"
+Documentation = "https://marpogaus.github.io/tensorflow_time_series_dataset"
+Issues = "https://github.com/MArpogaus/tensorflow_time_series_dataset/issues"
+Repository = "https://github.com/MArpogaus/tensorflow_time_series_dataset"
+
+[tool.commitizen]
+major_version_zero = true
+name = "cz_conventional_commits"
+tag_format = "v$version"
+update_changelog_on_bump = true
+version_provider = "scm"
+version_scheme = "pep440"
+
+[tool.ruff]
+indent-width = 4
+line-length = 88
+target-version = "py37"
+
+[tool.ruff.lint]
+select = ["I", "E", "F"]
+
+[tool.setuptools_scm]
+# https://stackoverflow.com/a/74404703
+# omits local version, useful because pypi does not support it
+local_scheme = "no-local-version"
diff --git a/setup.py b/setup.py
index 34b2407..7f4ecfe 100644
--- a/setup.py
+++ b/setup.py
@@ -1,41 +1,8 @@
-from setuptools import setup, find_packages
+# Only provided for legacy compatibility
+# https://setuptools.pypa.io/en/latest/userguide/pyproject_config.html
+#
+# Metadata is specified in pyproject.toml
-package = "tensorflow_time_series_dataset"
-version = "0.1"
+from setuptools import setup
-with open("README.md", "r", encoding="utf-8") as fh:
-    long_description = fh.read()
-
-setup(
-    name=package,
-    version=version,
-    author="Marcel Arpogaus",
-    author_email="marcel.arpogaus@gmail.com",
-    packages=find_packages("src"),
-    package_dir={"": "src"},
-    install_requires=[
-        "tensorflow==2.6.*",
-    ],
-    description="A tensorflow dataset from time series data.",
-    long_description=long_description,
-    long_description_content_type="text/markdown",
-    classifiers=[
-        "Operating System :: OS Independent",
-        "Intended Audience :: Developers",
-        "Intended Audience :: Education",
-        "Intended Audience :: Science/Research",
-        "License :: OSI Approved :: Apache Software License",
-        "Programming Language :: Python :: 3 :: Only",
-        "Programming Language :: Python :: 3.6",
-        "Programming Language :: Python :: 3.7",
-        "Programming Language :: Python :: 3.8",
-        "Programming Language :: Python :: 3.9",
-        "Topic :: Scientific/Engineering :: Artificial Intelligence",
-        "Topic :: Scientific/Engineering :: Mathematics",
-        "Topic :: Scientific/Engineering",
-        "Topic :: Software Development :: Libraries :: Python Modules",
-        "Topic :: Software Development :: Libraries",
-        "Topic :: Software Development",
-    ],
-    python_requires=">=3.6",
-)
+setup()
diff --git a/src/tensorflow_time_series_dataset/__init__.py b/src/tensorflow_time_series_dataset/__init__.py
new file mode 100644
index 0000000..0ae4d86
--- /dev/null
+++ b/src/tensorflow_time_series_dataset/__init__.py
@@ -0,0 +1,25 @@
+# -*- time-stamp-pattern: "changed[\s]+:[\s]+%%$"; -*-
+# AUTHOR INFORMATION ##########################################################
+# file    : __init__.py
+# author  : Marcel Arpogaus
+#
+# created : 2022-01-07 09:02:38 (Marcel Arpogaus)
+# changed : 2024-02-19 13:55:19 (Marcel Arpogaus)
+# DESCRIPTION #################################################################
+# ...
+# LICENSE #####################################################################
+# Copyright 2022 Marcel Arpogaus
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###############################################################################
+""".. include:: ../../README.md"""
diff --git a/src/tensorflow_time_series_dataset/factory.py b/src/tensorflow_time_series_dataset/factory.py
new file mode 100644
index 0000000..ff47474
--- /dev/null
+++ b/src/tensorflow_time_series_dataset/factory.py
@@ -0,0 +1,143 @@
+# -*- time-stamp-pattern: "changed[\s]+:[\s]+%%$"; -*-
+# AUTHOR INFORMATION ##########################################################
+# file    : factory.py
+# author  : Marcel Arpogaus
+#
+# created : 2022-01-07 09:02:38 (Marcel Arpogaus)
+# changed : 2024-02-19 12:57:42 (Marcel Arpogaus)
+# DESCRIPTION #################################################################
+# ...
+# LICENSE #####################################################################
+# Copyright 2022 Marcel Arpogaus
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###############################################################################
+from typing import Any, Dict, List
+
+import tensorflow as tf
+
+from .pipeline import WindowedTimeSeriesPipeline
+
+
+class WindowedTimeSeriesDatasetFactory:
+    """A factory to create windowed TensorFlow datasets from time-series data.
+
+    Parameters
+    ----------
+    history_columns : List[str]
+        List of columns representing the history data.
+    prediction_columns : List[str]
+        List of columns representing the prediction data.
+    meta_columns : List[str], optional
+        List of columns representing the meta data, by default [].
+    dtype : tf.DType, optional
+        Data type of the dataset, by default tf.float32.
+    **pipeline_kwargs : Any
+        Keyword arguments for the data pipeline.
+
+    """
+
+    default_pipeline_kwargs: Dict[str, Any] = {
+        "shift": None,
+        "batch_size": 32,
+        "cycle_length": 1,
+        "shuffle_buffer_size": 1000,
+        "cache": True,
+    }
+
+    def __init__(
+        self,
+        history_columns: List[str],
+        prediction_columns: List[str],
+        meta_columns: List[str] = [],
+        dtype: tf.DType = tf.float32,
+        **pipeline_kwargs: Any,
+    ) -> None:
+        self.columns: set = set(history_columns + meta_columns + prediction_columns)
+        self.preprocessors: List[Any] = []
+        self.data_loader: Any = None
+        self.dtype: tf.DType = dtype
+        self.data_pipeline: WindowedTimeSeriesPipeline = WindowedTimeSeriesPipeline(
+            history_columns=history_columns,
+            meta_columns=meta_columns,
+            prediction_columns=prediction_columns,
+            **{**self.default_pipeline_kwargs, **pipeline_kwargs},
+        )
+
+    def add_preprocessor(self, preprocessor: Any) -> None:
+        """Add a preprocessor to the list of preprocessors.
+
+        Parameters
+        ----------
+        preprocessor : Any
+            The preprocessor function to be added.
+
+        """
+        self.preprocessors.append(preprocessor)
+
+    def set_data_loader(self, data_loader: Any) -> None:
+        """Set a data loader for the dataset.
+
+        Parameters
+        ----------
+        data_loader : Any
+            The data loader function.
+
+        """
+        self.data_loader = data_loader
+
+    def get_dataset(self, data: Any = None) -> tf.data.Dataset:
+        """Get the dataset for the given data.
+
+        Parameters
+        ----------
+        data : Any, optional
+            The data to create the dataset from, by default None.
+
+        Returns
+        -------
+        tf.data.Dataset
+            The created TensorFlow dataset.
+
+        """
+        if data is None:
+            if self.data_loader is None:
+                raise ValueError("No data provided")
+            data = self.data_loader()
+
+        for preprocessor in self.preprocessors:
+            data = preprocessor(data)
+
+        if not isinstance(data, tf.data.Dataset):
+            data = data[sorted(self.columns)]
+            tensors = tf.convert_to_tensor(data, dtype=self.dtype)
+            data = tf.data.Dataset.from_tensors(tensors)
+
+        ds = self.data_pipeline(data)
+        return ds
+
+    def __call__(self, data: Any = None) -> tf.data.Dataset:
+        """Call method to get the dataset.
+
+        Parameters
+        ----------
+        data : Any, optional
+            The data to create the dataset from, by default None.
+
+        Returns
+        -------
+        tf.data.Dataset
+            The created TensorFlow dataset.
+ + """ + return self.get_dataset(data) diff --git a/src/tensorflow_time_series_dataset/loaders/__init__.py b/src/tensorflow_time_series_dataset/loaders/__init__.py new file mode 100644 index 0000000..fd970f4 --- /dev/null +++ b/src/tensorflow_time_series_dataset/loaders/__init__.py @@ -0,0 +1,25 @@ +# -*- time-stamp-pattern: "changed[\s]+:[\s]+%%$"; -*- +# AUTHOR INFORMATION ########################################################## +# file : __init__.py +# author : Marcel Arpogaus +# +# created : 2022-01-07 09:02:38 (Marcel Arpogaus) +# changed : 2024-02-15 17:17:28 (Marcel Arpogaus) +# DESCRIPTION ################################################################# +# ... +# LICENSE ##################################################################### +# Copyright 2022 Marcel Arpogaus +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +############################################################################### +from .csv_data_loader import CSVDataLoader # noqa: F401 diff --git a/src/tensorflow_time_series_dataset/loaders/csv_data_loader.py b/src/tensorflow_time_series_dataset/loaders/csv_data_loader.py new file mode 100644 index 0000000..d30e494 --- /dev/null +++ b/src/tensorflow_time_series_dataset/loaders/csv_data_loader.py @@ -0,0 +1,93 @@ +# -*- time-stamp-pattern: "changed[\s]+:[\s]+%%$"; -*- +# AUTHOR INFORMATION ########################################################## +# file : csv_data_loader.py +# author : Marcel Arpogaus +# +# created : 2022-01-07 09:02:38 (Marcel Arpogaus) +# changed : 2024-02-19 13:01:07 (Marcel Arpogaus) +# DESCRIPTION ################################################################# +# ... +# LICENSE ##################################################################### +# Copyright 2022 Marcel Arpogaus +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +############################################################################### +from typing import Union + +import pandas as pd + + +def _read_csv_file( + file_path: str, date_time_col: str = "date_time", **kwargs: Union[str, bool] +) -> pd.DataFrame: + """Read CSV file into a pandas DataFrame. + + Parameters + ---------- + file_path : str + File path to the CSV file. + date_time_col : str, optional + Name of the datetime column in the CSV file. + **kwargs + Additional keyword arguments for pd.read_csv. + + Returns + ------- + pd.DataFrame + DataFrame containing the data from the CSV file. + + Raises + ------ + ValueError + If the data contains NaN values. 
+ + """ + load_data = pd.read_csv( + file_path, + parse_dates=[date_time_col], + index_col=[date_time_col], + **kwargs, + ) + + if load_data.isnull().any().sum() != 0: + raise ValueError("Data contains NaNs") + + return load_data + + +class CSVDataLoader: + """Load data from a CSV file. + + Parameters + ---------- + file_path : str + File path to the CSV file. + **kwargs + Additional keyword arguments for pd.read_csv. + + """ + + def __init__(self, file_path: str, **kwargs: Union[str, bool]): + self.file_path = file_path + self.kwargs = kwargs + + def __call__(self) -> pd.DataFrame: + """Load data from the CSV file using _read_csv_file. + + Returns + ------- + pd.DataFrame + DataFrame containing the data from the CSV file. + + """ + return _read_csv_file(self.file_path, **self.kwargs) diff --git a/src/tensorflow_time_series_dataset/pipeline/__init__.py b/src/tensorflow_time_series_dataset/pipeline/__init__.py new file mode 100644 index 0000000..368492d --- /dev/null +++ b/src/tensorflow_time_series_dataset/pipeline/__init__.py @@ -0,0 +1,25 @@ +# -*- time-stamp-pattern: "changed[\s]+:[\s]+%%$"; -*- +# AUTHOR INFORMATION ########################################################## +# file : __init__.py +# author : Marcel Arpogaus +# +# created : 2022-01-07 09:02:38 (Marcel Arpogaus) +# changed : 2024-02-15 17:17:46 (Marcel Arpogaus) +# DESCRIPTION ################################################################# +# ... +# LICENSE ##################################################################### +# Copyright 2022 Marcel Arpogaus +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +############################################################################### +from .windowed_time_series_pipeline import WindowedTimeSeriesPipeline # noqa: F401 diff --git a/src/tensorflow_time_series_dataset/pipeline/patch_generator.py b/src/tensorflow_time_series_dataset/pipeline/patch_generator.py new file mode 100644 index 0000000..447373e --- /dev/null +++ b/src/tensorflow_time_series_dataset/pipeline/patch_generator.py @@ -0,0 +1,76 @@ +# -*- time-stamp-pattern: "changed[\s]+:[\s]+%%$"; -*- +# AUTHOR INFORMATION ########################################################## +# file : patch_generator.py +# author : Marcel Arpogaus +# +# created : 2022-01-07 09:02:38 (Marcel Arpogaus) +# changed : 2024-02-19 12:52:06 (Marcel Arpogaus) +# DESCRIPTION ################################################################# +# ... +# LICENSE ##################################################################### +# Copyright 2022 Marcel Arpogaus +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +############################################################################### + +import tensorflow as tf + + +class PatchGenerator: + """A generator class that creates windows from provided time-series data. + + Attributes + ---------- + window_size : int + The size of each patch. + shift : int + The shift between patches. + + """ + + def __init__(self, window_size: int, shift: int) -> None: + """Parameters + ---------- + window_size : int + The size of each patch. + shift : int + The shift between patches. + + """ + self.window_size: int = window_size + self.shift: int = shift + + def __call__(self, data: tf.Tensor) -> tf.data.Dataset: + """Converts input data into patches of provided window size. + + Parameters + ---------- + data : tf.Tensor + The input data to generate patches from. + + Returns + ------- + tf.data.Dataset + A dataset of patches generated from the input data. + + """ + + data_set: tf.data.Dataset = tf.data.Dataset.from_tensor_slices(data) + + data_set = data_set.window( + size=self.window_size, + shift=self.shift, + drop_remainder=True, + ).flat_map(lambda sub: sub.batch(self.window_size, drop_remainder=True)) + + return data_set diff --git a/src/tensorflow_time_series_dataset/pipeline/patch_processor.py b/src/tensorflow_time_series_dataset/pipeline/patch_processor.py new file mode 100644 index 0000000..2b2d673 --- /dev/null +++ b/src/tensorflow_time_series_dataset/pipeline/patch_processor.py @@ -0,0 +1,133 @@ +# -*- time-stamp-pattern: "changed[\s]+:[\s]+%%$"; -*- +# AUTHOR INFORMATION ########################################################## +# file : patch_processor.py +# author : Marcel Arpogaus +# +# created : 2022-01-07 09:02:38 (Marcel Arpogaus) +# changed : 2024-02-19 12:41:38 (Marcel Arpogaus) +# DESCRIPTION ################################################################# +# ... +# LICENSE ##################################################################### +# Copyright 2022 Marcel Arpogaus +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +############################################################################### +from typing import Dict, List, Tuple + +import tensorflow as tf + + +class PatchPreprocessor: + """A class for preprocessing patches of time series data. + + Parameters + ---------- + history_size : int + The number of past time steps to include for each sample. + history_columns : List[str] + The names of columns in the dataset that are part of the historical data. + meta_columns : List[str] + The names of columns in the dataset that are part of the meta data. + prediction_columns : List[str] + The names of columns in the dataset that are to be predicted. + + Attributes + ---------- + history_size : int + The number of past time steps to include for each sample. + history_columns : List[str] + The names of columns in the dataset that are part of the historical data. 
+    meta_columns : List[str]
+        The names of columns in the dataset that are part of the meta data.
+    prediction_columns : List[str]
+        The names of columns in the dataset that are to be predicted.
+    column_idx : Dict[str, int]
+        A mapping from column names to their indices in the input dataset.
+
+    """
+
+    def __init__(
+        self,
+        history_size: int,
+        history_columns: List[str],
+        meta_columns: List[str],
+        prediction_columns: List[str],
+    ) -> None:
+        self.history_size: int = history_size
+        self.history_columns: List[str] = history_columns
+        self.meta_columns: List[str] = meta_columns
+        self.prediction_columns: List[str] = prediction_columns
+
+        assert (
+            len(set(history_columns + meta_columns)) != 0
+        ), "No feature columns provided"
+        if len(meta_columns) == 0:
+            assert history_size > 0, (
+                "history_size must be a positive integer greater than zero"
+                ", when no meta data is used"
+            )
+        else:
+            assert history_size >= 0, "history_size must be a non-negative integer"
+        assert len(prediction_columns) != 0, "No prediction columns provided"
+
+        columns = sorted(list(set(history_columns + prediction_columns + meta_columns)))
+        self.column_idx: Dict[str, int] = {c: i for i, c in enumerate(columns)}
+
+    def __call__(self, patch: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor]:
+        """Process a single patch of data.
+
+        Parameters
+        ----------
+        patch : tf.Tensor
+            A tensor representing a patch of time series data.
+
+        Returns
+        -------
+        Tuple[tf.Tensor, tf.Tensor]
+            A tuple containing the input features (as one or more tensors) and
+            the target values as tensors.
+
+        """
+        y: List[tf.Tensor] = []
+        x_hist: List[tf.Tensor] = []
+        x_meta: List[tf.Tensor] = []
+
+        x_columns = sorted(set(self.history_columns + self.meta_columns))
+        y_columns = sorted(self.prediction_columns)
+
+        assert (
+            len(set(x_columns + y_columns)) == patch.shape[-1]
+        ), "Patch shape does not match column number"
+
+        for c in y_columns:
+            column = patch[self.history_size :, self.column_idx[c]]
+            y.append(column)
+
+        for c in x_columns:
+            column = patch[:, self.column_idx[c], None]
+            if self.history_size and c in self.history_columns:
+                x_hist.append(column[: self.history_size, 0])
+            if c in self.meta_columns:
+                x_meta.append(column[self.history_size, None, ...])
+
+        y = tf.stack(y, axis=-1)
+        x: List[tf.Tensor] = []
+        if len(x_hist):
+            x.append(tf.stack(x_hist, axis=-1))
+        if len(x_meta):
+            x.append(tf.concat(x_meta, axis=-1))
+
+        if len(x) > 1:
+            return tuple(x), y
+        else:
+            return x[0], y
diff --git a/src/tensorflow_time_series_dataset/pipeline/windowed_time_series_pipeline.py b/src/tensorflow_time_series_dataset/pipeline/windowed_time_series_pipeline.py
new file mode 100644
index 0000000..36ea891
--- /dev/null
+++ b/src/tensorflow_time_series_dataset/pipeline/windowed_time_series_pipeline.py
@@ -0,0 +1,145 @@
+# -*- time-stamp-pattern: "changed[\s]+:[\s]+%%$"; -*-
+# AUTHOR INFORMATION ##########################################################
+# file    : windowed_time_series_pipeline.py
+# author  : Marcel Arpogaus
+#
+# created : 2022-01-07 09:02:38 (Marcel Arpogaus)
+# changed : 2024-02-19 12:53:06 (Marcel Arpogaus)
+# DESCRIPTION #################################################################
+# ...
+# LICENSE #####################################################################
+# Copyright 2022 Marcel Arpogaus
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +############################################################################### +# REQUIRED PYTHON MODULES ##################################################### +from typing import List, Union + +import tensorflow as tf +from tensorflow.data import Dataset + +from tensorflow_time_series_dataset.pipeline.patch_generator import PatchGenerator +from tensorflow_time_series_dataset.pipeline.patch_processor import PatchPreprocessor + + +class WindowedTimeSeriesPipeline: + """A Pipeline to process time-series data in a rolling window for TensorFlow models. + + This class provides functionality to generate windowed datasets from time series + data for training, validating, and testing machine learning models. It allows for + the generation of datasets that include historical data windows along with + corresponding future values (for either regression or classification tasks). + + Parameters + ---------- + history_size : int + The size of the history window. + prediction_size : int + The size of the prediction window. + history_columns : List[str] + The names of the columns to be used for the history window. + meta_columns : List[str] + The names of the columns to be used for metadata in each window. + prediction_columns : List[str] + The names of the columns from which the future values are predicted. + shift : int + The shift (in time units) for the sliding window. + batch_size : int + The size of the batch for each dataset iteration. + cycle_length : int + The number of data files that the dataset can process in parallel. + shuffle_buffer_size : int + The buffer size for data shuffling. + cache : Union[str, bool] + Whether to cache the dataset in memory or to a specific file. + drop_remainder : bool + Whether to drop the remainder of batches that are not equal to the batch size. + + Raises + ------ + AssertionError + If the prediction_size is not greater than zero. + + """ + + def __init__( + self, + history_size: int, + prediction_size: int, + history_columns: List[str], + meta_columns: List[str], + prediction_columns: List[str], + shift: int, + batch_size: int, + cycle_length: int, + shuffle_buffer_size: int, + cache: Union[str, bool], + drop_remainder: bool, + ) -> None: + assert ( + prediction_size > 0 + ), "prediction_size must be a positive integer greater than zero" + self.history_size = history_size + self.prediction_size = prediction_size + self.window_size = history_size + prediction_size + self.history_columns = history_columns + self.meta_columns = meta_columns + self.prediction_columns = prediction_columns + self.shift = shift + self.batch_size = batch_size + self.cycle_length = cycle_length + self.shuffle_buffer_size = shuffle_buffer_size + self.cache = cache + self.drop_remainder = drop_remainder + + def __call__(self, ds: Dataset) -> Dataset: + """Applies the pipeline operations to the given dataset. + + Parameters + ---------- + ds : Dataset + The input dataset to transform. + + Returns + ------- + Dataset + The transformed dataset, ready for model training or evaluation. 
+ + """ + ds = ds.interleave( + PatchGenerator(self.window_size, self.shift), + cycle_length=self.cycle_length, + num_parallel_calls=tf.data.experimental.AUTOTUNE, + ) + + ds = ds.map( + PatchPreprocessor( + self.history_size, + self.history_columns, + self.meta_columns, + self.prediction_columns, + ), + num_parallel_calls=tf.data.experimental.AUTOTUNE, + ) + + if self.cache: + ds = ds.cache() + + if self.shuffle_buffer_size: + ds = ds.shuffle(self.shuffle_buffer_size) + + ds = ds.batch(self.batch_size, drop_remainder=self.drop_remainder) + + ds = ds.prefetch(tf.data.experimental.AUTOTUNE) + + return ds diff --git a/src/tensorflow_time_series_dataset/preprocessors/__init__.py b/src/tensorflow_time_series_dataset/preprocessors/__init__.py new file mode 100644 index 0000000..a52d1ef --- /dev/null +++ b/src/tensorflow_time_series_dataset/preprocessors/__init__.py @@ -0,0 +1,28 @@ +# -*- time-stamp-pattern: "changed[\s]+:[\s]+%%$"; -*- +# AUTHOR INFORMATION ########################################################## +# file : __init__.py +# author : Marcel Arpogaus +# +# created : 2022-01-07 09:02:38 (Marcel Arpogaus) +# changed : 2024-02-15 17:18:03 (Marcel Arpogaus) +# DESCRIPTION ################################################################# +# ... +# LICENSE ##################################################################### +# Copyright 2022 Marcel Arpogaus +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +############################################################################### +from .cyclical_feature_encoder import CyclicalFeatureEncoder # noqa: F401 +from .groupby_dataset_generator import GroupbyDatasetGenerator # noqa: F401 +from .time_series_k_fold import TimeSeriesKFold # noqa: F401 +from .time_series_split import TimeSeriesSplit # noqa: F401 diff --git a/src/tensorflow_time_series_dataset/preprocessors/cyclical_feature_encoder.py b/src/tensorflow_time_series_dataset/preprocessors/cyclical_feature_encoder.py new file mode 100644 index 0000000..5801c72 --- /dev/null +++ b/src/tensorflow_time_series_dataset/preprocessors/cyclical_feature_encoder.py @@ -0,0 +1,148 @@ +# -*- time-stamp-pattern: "changed[\s]+:[\s]+%%$"; -*- +# AUTHOR INFORMATION ########################################################## +# file : cyclical_feature_encoder.py +# author : Marcel Arpogaus +# +# created : 2022-01-07 09:02:38 (Marcel Arpogaus) +# changed : 2024-02-19 12:53:30 (Marcel Arpogaus) +# DESCRIPTION ################################################################# +# ... +# LICENSE ##################################################################### +# Copyright 2022 Marcel Arpogaus +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +############################################################################### +from typing import Any, Callable, Tuple + +import numpy as np +import pandas as pd + + +def default_cycl_getter(df: pd.DataFrame, k: str) -> Any: + """Get the cyclical feature 'k' from a DataFrame or its index. + + Parameters + ---------- + df : pd.DataFrame + DataFrame object. + k : str + Key to get the cyclical feature. + + Returns + ------- + Any + Cyclical feature corresponding to 'k'. + + """ + try: + cycl = getattr(df, k) + except AttributeError: + cycl = getattr(df.index, k) + return cycl + + +class CyclicalFeatureEncoder: + def __init__( + self, + cycl_name: str, + cycl_max: int, + cycl_min: int = 0, + cycl_getter: Callable = default_cycl_getter, + ) -> None: + """Initialize the CyclicalFeatureEncoder object. + + Parameters + ---------- + cycl_name : str + Name of the cyclical feature. + cycl_max : int + Maximum value of the cyclical feature. + cycl_min : int, optional + Minimum value of the cyclical feature, by default 0. + cycl_getter : Callable, optional + Function to get the cyclical feature, by default default_cycl_getter. + + """ + self.cycl_max = cycl_max + self.cycl_min = cycl_min + self.cycl_getter = cycl_getter + self.cycl_name = cycl_name + + def encode(self, data: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray]: + """Encode the cyclical feature into sine and cosine components. + + Parameters + ---------- + data : pd.DataFrame + Data containing the cyclical feature. + + Returns + ------- + Tuple[np.ndarray, np.ndarray] + Sine and cosine components of the encoded cyclical feature. + + """ + cycl = self.cycl_getter(data, self.cycl_name) + sin = np.sin( + 2 * np.pi * (cycl - self.cycl_min) / (self.cycl_max - self.cycl_min + 1) + ) + cos = np.cos( + 2 * np.pi * (cycl - self.cycl_min) / (self.cycl_max - self.cycl_min + 1) + ) + assert np.allclose(cycl, self.decode(sin, cos)), ( + 'Decoding failed. Is "cycl_min/max"' + f"({self.cycl_min}/{self.cycl_max}) correct?" + ) + return sin, cos + + def __call__(self, data: pd.DataFrame) -> pd.DataFrame: + """Call method to encode the cyclical feature and add the sine and cosine + components as new columns in the data. + + Parameters + ---------- + data : pd.DataFrame + Data containing the cyclical feature. + + Returns + ------- + pd.DataFrame + Updated data with sine and cosine components. + + """ + data = data.copy() + sin, cos = self.encode(data) + data[self.cycl_name + "_sin"] = sin + data[self.cycl_name + "_cos"] = cos + return data + + def decode(self, sin: np.ndarray, cos: np.ndarray) -> np.ndarray: + """Decode the encoded sine and cosine components back to the cyclical feature. + + Parameters + ---------- + sin : np.ndarray + Sine component of the encoded cyclical feature. + cos : np.ndarray + Cosine component of the encoded cyclical feature. + + Returns + ------- + np.ndarray + Decoded cyclical feature. 
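+
+        Examples
+        --------
+        A round-trip sketch for a hypothetical weekday encoder (0-6):
+
+        >>> enc = CyclicalFeatureEncoder("weekday", cycl_max=6)
+        >>> sin, cos = np.sin(2 * np.pi * 3 / 7), np.cos(2 * np.pi * 3 / 7)
+        >>> round(float(enc.decode(sin, cos)), 6)
+        3.0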
+ + """ + angle = (np.arctan2(sin, cos) + 2 * np.pi) % (2 * np.pi) + return (angle * (self.cycl_max - self.cycl_min + 1)) / ( + 2 * np.pi + ) + self.cycl_min diff --git a/src/tensorflow_time_series_dataset/preprocessors/groupby_dataset_generator.py b/src/tensorflow_time_series_dataset/preprocessors/groupby_dataset_generator.py new file mode 100644 index 0000000..877cb0c --- /dev/null +++ b/src/tensorflow_time_series_dataset/preprocessors/groupby_dataset_generator.py @@ -0,0 +1,116 @@ +# -*- time-stamp-pattern: "changed[\s]+:[\s]+%%$"; -*- +# AUTHOR INFORMATION ########################################################## +# file : groupby_dataset_generator.py +# author : Marcel Arpogaus +# +# created : 2022-01-07 09:02:38 (Marcel Arpogaus) +# changed : 2024-02-19 12:57:02 (Marcel Arpogaus) +# DESCRIPTION ################################################################# +# ... +# LICENSE ##################################################################### +# Copyright 2022 Marcel Arpogaus +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +############################################################################### + +from typing import Callable, List + +import pandas as pd +import tensorflow as tf + + +class GroupbyDatasetGenerator: + """Creates a generator that yields the data for each group in the given column. + + Parameters + ---------- + groupby : str + The column name to group the dataframe by. + columns : List[str] + The list of columns to keep in the dataset. + dtype : tf.dtypes.DType, optional + The dtype of the output tensors, by default tf.float32. + shuffle : bool, optional + Whether to shuffle the dataset or not, by default False. + test_mode : bool, optional + Whether the generator is in test mode or not, by default False. + + """ + + def __init__( + self, + groupby: str, + columns: List[str], + dtype: tf.dtypes.DType = tf.float32, + shuffle: bool = False, + test_mode: bool = False, + ) -> None: + self.groupby: str = groupby + self.columns: List[str] = sorted(list(set(columns))) + self.dtype: tf.dtypes.DType = dtype + self.shuffle: bool = shuffle + self.test_mode: bool = test_mode + + def get_generator(self, df: pd.DataFrame) -> Callable[[], tf.Tensor]: + """Returns the dataset generator with the given parameters. + + Parameters + ---------- + df : pd.DataFrame + The source DataFrame to generate data from. + + Returns + ------- + Callable[[], tf.Tensor] + A function that when called returns a generator yielding batches of + data as tensors. + + """ + df.sort_index(inplace=True) + if self.test_mode: + ids = df[self.groupby].unique() + ids = ids[:2] + df = df[df[self.groupby].isin(ids)] + + grpd = df.groupby(self.groupby) + + def generator() -> tf.Tensor: + for _, d in grpd: + yield d[self.columns].values.astype(self.dtype.as_numpy_dtype) + + return generator + + def __call__(self, df: pd.DataFrame) -> tf.data.Dataset: + """Makes the class instance callable and returns a TensorFlow dataset. + + Parameters + ---------- + df : pd.DataFrame + The DataFrame to create the dataset from. 
+ + Returns + ------- + tf.data.Dataset + A TensorFlow dataset created from the DataFrame. + + """ + ds: tf.data.Dataset = tf.data.Dataset.from_generator( + self.get_generator(df), + output_signature=( + tf.TensorSpec(shape=[None, len(self.columns)], dtype=self.dtype) + ), + ) + if self.shuffle: + len_ids = df[self.groupby].unique().size + ds = ds.shuffle(len_ids) + return ds diff --git a/src/tensorflow_time_series_dataset/preprocessors/time_series_k_fold.py b/src/tensorflow_time_series_dataset/preprocessors/time_series_k_fold.py new file mode 100644 index 0000000..e2d07b0 --- /dev/null +++ b/src/tensorflow_time_series_dataset/preprocessors/time_series_k_fold.py @@ -0,0 +1,73 @@ +# -*- time-stamp-pattern: "changed[\s]+:[\s]+%%$"; -*- +# AUTHOR INFORMATION ########################################################## +# file : time_series_k_fold.py +# author : Marcel Arpogaus +# +# created : 2022-01-07 09:02:38 (Marcel Arpogaus) +# changed : 2024-02-19 12:35:49 (Marcel Arpogaus) +# DESCRIPTION ################################################################# +# ... +# LICENSE ##################################################################### +# Copyright 2022 Marcel Arpogaus +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +############################################################################### +# REQUIRED PYTHON MODULES ##################################################### +from typing import Dict, List + +import numpy as np +import pandas as pd + + +class TimeSeriesKFold: + """Time series cross-validation using a sliding window. + + Parameters + ---------- + fold : int + The current fold to be used. + n_folds : int, optional + The total number of folds (the default is 20). + + """ + + def __init__(self, fold: int, n_folds: int = 20) -> None: + self.n_folds: int = n_folds + self.fold: int = fold + + def __call__(self, data: pd.DataFrame) -> pd.DataFrame: + """Divide the data into train/test sets for the current fold. + + Parameters + ---------- + data : pd.DataFrame + The data to be folded, requiring a datetime index. + + Returns + ------- + pd.DataFrame + The subset of data corresponding to the current fold. 
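+
+        Examples
+        --------
+        A hypothetical sketch (``data`` must have a DatetimeIndex):
+
+        >>> kfold = TimeSeriesKFold(fold=0, n_folds=20)
+        >>> fold_df = kfold(data)  # doctest: +SKIP
+
+        With ``n_folds=20``, ``fold_df`` holds the rows falling on roughly
+        the first 5% of the days spanned by the index.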
+ + """ + days: pd.DatetimeIndex = pd.date_range( + data.index.min(), data.index.max(), freq="D" + ) + fold_idx: List[np.ndarray] = np.array_split(days.to_numpy(), self.n_folds) + folds: Dict[int, List[str]] = { + f: (idx[[0, -1]].astype("datetime64[m]") + np.array([0, 60 * 24 - 1])) + .astype(str) + .tolist() + for f, idx in enumerate(fold_idx) + } + + return data[folds[self.fold][0] : folds[self.fold][1]] diff --git a/src/tensorflow_time_series_dataset/preprocessors/time_series_split.py b/src/tensorflow_time_series_dataset/preprocessors/time_series_split.py new file mode 100644 index 0000000..516e373 --- /dev/null +++ b/src/tensorflow_time_series_dataset/preprocessors/time_series_split.py @@ -0,0 +1,72 @@ +# -*- time-stamp-pattern: "changed[\s]+:[\s]+%%$"; -*- +# AUTHOR INFORMATION ########################################################## +# file : time_series_split.py +# author : Marcel Arpogaus +# +# created : 2022-01-07 09:02:38 (Marcel Arpogaus) +# changed : 2024-02-19 12:09:44 (Marcel Arpogaus) +# DESCRIPTION ################################################################# +# ... +# LICENSE ##################################################################### +# Copyright 2022 Marcel Arpogaus +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +############################################################################### + + +from typing import Any + +import numpy as np + + +class TimeSeriesSplit: + RIGHT: int = 0 + LEFT: int = 1 + + def __init__(self, split_size: float, split: int) -> None: + """Initialize the TimeSeriesSplit object. + + Parameters + ---------- + split_size : float + Proportion of split size for the time series data. + split : int + Split position indicator (LEFT or RIGHT). + + """ + self.split_size: float = split_size + self.split: int = split + + def __call__(self, data: Any) -> Any: + """Splits the time series data based on the split position. + + Parameters + ---------- + data : Any + Input time series data to split. + + Returns + ------- + Any + Left or right split of the time series data based on the split position. + + """ + data = data.sort_index() + days = data.index.date + days = days.astype("datetime64[m]") + right = days[int(len(days) * self.split_size)] + left = right - np.timedelta64(1, "m") + if self.split == self.LEFT: + return data.loc[: str(left)] + else: + return data.loc[str(right) :] diff --git a/src/tensorflow_time_series_dataset/utils/__init__.py b/src/tensorflow_time_series_dataset/utils/__init__.py new file mode 100644 index 0000000..d7c3685 --- /dev/null +++ b/src/tensorflow_time_series_dataset/utils/__init__.py @@ -0,0 +1,24 @@ +# -*- time-stamp-pattern: "changed[\s]+:[\s]+%%$"; -*- +# AUTHOR INFORMATION ########################################################## +# file : __init__.py +# author : Marcel Arpogaus +# +# created : 2022-01-07 09:02:38 (Marcel Arpogaus) +# changed : 2022-01-07 09:02:38 (Marcel Arpogaus) +# DESCRIPTION ################################################################# +# ... 
+# LICENSE #####################################################################
+# Copyright 2022 Marcel Arpogaus
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###############################################################################
diff --git a/src/tensorflow_time_series_dataset/utils/test.py b/src/tensorflow_time_series_dataset/utils/test.py
new file mode 100644
index 0000000..a05b062
--- /dev/null
+++ b/src/tensorflow_time_series_dataset/utils/test.py
@@ -0,0 +1,342 @@
+# -*- time-stamp-pattern: "changed[\s]+:[\s]+%%$"; -*-
+# AUTHOR INFORMATION ##########################################################
+# file : test.py
+# author : Marcel Arpogaus
+#
+# created : 2022-01-07 09:02:38 (Marcel Arpogaus)
+# changed : 2024-02-19 13:09:50 (Marcel Arpogaus)
+# DESCRIPTION #################################################################
+# ...
+# LICENSE #####################################################################
+# Copyright 2022 Marcel Arpogaus
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###############################################################################
+from contextlib import nullcontext
+from typing import ContextManager, List, Union
+
+import numpy as np
+import pytest
+from numpy.typing import ArrayLike
+from pandas import DataFrame
+
+
+def get_idx(ref: ArrayLike, val: float) -> int:
+    """Get the index of a value in a reference array.
+
+    Parameters
+    ----------
+    ref : ArrayLike
+        Reference array to search.
+    val : float
+        Value to find in the reference array.
+
+    Returns
+    -------
+    int
+        Index of the value in the reference array.
+
+    """
+    idx = np.where(ref == val)[0]
+    assert len(idx) == 1, "Could not determine index from reference value"
+    # index the single match explicitly; calling int() on a length-1 array
+    # is deprecated in recent NumPy versions
+    return int(idx[0])
+
+
+def gen_patch(df: DataFrame, idx: int, size: int) -> np.ndarray:
+    """Generate a patch of data from a DataFrame.
+
+    Parameters
+    ----------
+    df : DataFrame
+        DataFrame containing the data.
+    idx : int
+        Starting index of the patch.
+    size : int
+        Size of the patch to generate.
+
+    Returns
+    -------
+    np.ndarray
+        Patch of data from the DataFrame.
+
+    """
+    x = df.values[idx : idx + size]
+    assert len(x) == size, "Could not generate patch from reference data"
+    return x
+
+
+def gen_batch(
+    df: DataFrame, columns: list, size: int, refs: ArrayLike, ref_col: str
+) -> np.ndarray:
+    """Generate a batch of data from a DataFrame based on reference values.
+
+    Parameters
+    ----------
+    df : DataFrame
+        DataFrame containing the data.
+    columns : list
+        List of columns to include in the batch.
+ size : int + Size of each patch in the batch. + refs : ArrayLike + Array of reference values. + ref_col : str + Name of the column containing reference values. + + Returns + ------- + np.ndarray + Batch of data based on reference values. + + """ + batch = [] + columns = list(sorted(columns)) + for ref in refs: + if "id" in df.columns: + id = ref // 1e5 + idx = get_idx(df[df.id == id][ref_col], ref) + p = gen_patch(df[df.id == id][columns], idx, size) + else: + idx = get_idx(df[ref_col], ref) + p = gen_patch(df[columns], idx, size) + batch.append(p) + return np.float32(batch) + + +def validate_dataset( + df: DataFrame, + ds: object, + batch_size: int, + history_size: int, + prediction_size: int, + shift: int, + history_columns: list, + meta_columns: list, + prediction_columns: list, + drop_remainder: bool, + history_reference_column: str = "ref", + meta_reference_column: str = "ref", + prediction_reference_column: str = "ref", + **kwargs, +) -> int: + """Validate the dataset generator with the given parameters. + + Parameters + ---------- + df : DataFrame + DataFrame containing the data. + ds : object + Dataset object used for validation. + batch_size : int + Size of each batch. + history_size : int + Size of the history window. + prediction_size : int + Size of the prediction window. + shift : int + Shift between consecutive windows. + history_columns : list + List of columns for the history window. + meta_columns : list + List of columns for the meta data. + prediction_columns : list + List of columns for the prediction. + drop_remainder : bool + Whether to drop remainder batches. + history_reference_column : str, optional + Name of the column containing history reference values. + meta_reference_column : str, optional + Name of the column containing meta reference values. + prediction_reference_column : str, optional + Name of the column containing prediction reference values. + + Returns + ------- + int + Number of batches validated. 
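+
+    Notes
+    -----
+    The expected batch count follows from the window geometry. An
+    illustrative example: with 144 unique timestamps, a single id,
+    ``history_size + prediction_size = 48 + 24 = 72`` and ``shift = 24``,
+    the first window consumes ``72 - 24 = 48`` initial rows, leaving
+    ``(144 - 48) / 24 = 4`` patches; with ``batch_size=4`` and
+    ``drop_remainder=False`` this yields exactly one batch.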
+
+    """
+    df = df.sort_index()
+
+    if "id" in df.columns:
+        ids = len(df.id.unique())
+    else:
+        ids = 1
+
+    window_size = history_size + prediction_size
+    initial_size = window_size - shift
+    patch_data = df.index.unique().size - initial_size
+    patches = patch_data / shift * ids
+    has_remainder = (patches % batch_size != 0) and not drop_remainder
+    expected_batches = int(patches // batch_size) + int(has_remainder)
+
+    history_columns = list(sorted(history_columns))
+    meta_columns = list(sorted(meta_columns))
+    prediction_columns = list(sorted(prediction_columns))
+
+    history_columns_idx = {c: i for i, c in enumerate(history_columns)}
+    meta_columns_idx = {c: i for i, c in enumerate(meta_columns)}
+    prediction_columns_idx = {c: i for i, c in enumerate(prediction_columns)}
+
+    assert (
+        len(set(history_columns + meta_columns + prediction_columns) - set(df.columns))
+        == 0
+    ), "Not all columns in test df"
+    assert (
+        history_reference_column in df.columns
+    ), f"history_reference_column ({history_reference_column}) not in df.columns"
+    assert (
+        meta_reference_column in df.columns
+    ), f"meta_reference_column ({meta_reference_column}) not in df.columns"
+    assert (
+        prediction_reference_column in df.columns
+    ), f"prediction_reference_column ({prediction_reference_column}) not in df.columns"
+
+    # initialize so the "no iteration" assertion below fails cleanly instead
+    # of raising a NameError when the dataset yields no batches
+    batch_no = None
+    for batch_no, (x, y) in enumerate(ds.as_numpy_iterator(), start=1):
+        if batch_no == expected_batches and has_remainder:
+            bs = int(patches % batch_size)
+        else:
+            bs = batch_size
+        x1_shape = (bs, history_size, len(history_columns))
+        x2_shape = (bs, 1, len(meta_columns))
+        y_shape = (bs, prediction_size, len(prediction_columns))
+
+        x1, x2 = None, None
+        if history_size and len(history_columns) and len(meta_columns):
+            x1, x2 = x
+        elif history_size and len(history_columns):
+            x1 = x
+        elif len(meta_columns):
+            x2 = x
+
+        if x1 is not None:
+            assert (
+                x1.shape == x1_shape
+            ), f"Wrong shape: history ({batch_no}: {x1.shape} != {x1_shape})"
+            ref = x1[:, 0, history_columns_idx[history_reference_column]]
+            assert np.all(
+                x1
+                == gen_batch(
+                    df, history_columns, history_size, ref, history_reference_column
+                )
+            ), f"Wrong data: history ({batch_no})"
+            if x2 is not None:
+                assert np.all(
+                    x2
+                    == gen_batch(
+                        df,
+                        meta_columns,
+                        1,
+                        ref + history_size,
+                        history_reference_column,
+                    )
+                ), f"Wrong data: meta not consecutive ({batch_no})"
+
+            last_val = x1[:, -1, history_columns_idx[history_reference_column]]
+            y_test = gen_batch(
+                df,
+                prediction_columns,
+                prediction_size,
+                last_val + 1,
+                history_reference_column,
+            )
+            assert np.all(
+                y == y_test
+            ), f"Wrong data: prediction not consecutive ({batch_no})"
+
+        if x2 is not None:
+            first_val = x2[:, 0, meta_columns_idx[meta_reference_column]]
+            assert x2.shape == x2_shape, f"Wrong shape: meta ({batch_no})"
+            assert np.all(
+                x2 == gen_batch(df, meta_columns, 1, first_val, meta_reference_column)
+            ), f"Wrong data: meta ({batch_no})"
+
+        assert y.shape == y_shape, f"Wrong shape: prediction ({batch_no})"
+        first_val = y[:, 0, prediction_columns_idx[prediction_reference_column]]
+        assert np.all(
+            y
+            == gen_batch(
+                df,
+                prediction_columns,
+                prediction_size,
+                first_val,
+                prediction_reference_column,
+            )
+        ), f"Wrong data: prediction ({batch_no})"
+
+    assert batch_no is not None, "No iteration. Is there enough data?"
+
+    assert (
+        batch_no == expected_batches
+    ), f"Not enough batches. 
{batch_no} != {expected_batches}" + + return batch_no + + +def validate_args( + prediction_size: int, + history_columns: List[str], + meta_columns: List[str], + history_size: int, + prediction_columns: List[str], +) -> ContextManager[Union[None]]: + """Validate input arguments of the PatchPreprocessor class. + + Parameters + ---------- + prediction_size : int + The size of the prediction. + history_columns : List[str] + List of history columns. + meta_columns : List[str] + List of meta columns. + history_size : int + The size of the history. + prediction_columns : List[str] + List of prediction columns. + + Returns + ------- + ctxmgr : contextlib.ContextManager + Context manager indicating if input is valid or raises an AssertionError. + + """ + if prediction_size <= 0: + ctxmgr = pytest.raises( + AssertionError, + match="prediction_size must be a positive integer greater than zero", + ) + elif len(set(history_columns + meta_columns)) == 0: + ctxmgr = pytest.raises( + AssertionError, + match="No feature columns provided", + ) + elif len(meta_columns) == 0 and history_size <= 0: + ctxmgr = pytest.raises( + AssertionError, + match="history_size must be a positive integer greater than zero," + " when no meta date is used", + ) + elif history_size < 0: + ctxmgr = pytest.raises( + AssertionError, + match="history_size must be a positive integer", + ) + elif len(prediction_columns) == 0: + ctxmgr = pytest.raises( + AssertionError, + match="No prediction columns provided", + ) + else: + ctxmgr = nullcontext() + return ctxmgr diff --git a/src/tensorflow_time_series_dataset/utils/visualisation.py b/src/tensorflow_time_series_dataset/utils/visualisation.py new file mode 100644 index 0000000..3b1a45c --- /dev/null +++ b/src/tensorflow_time_series_dataset/utils/visualisation.py @@ -0,0 +1,114 @@ +# -*- time-stamp-pattern: "changed[\s]+:[\s]+%%$"; -*- +# AUTHOR INFORMATION ########################################################## +# file : visualisation.py +# author : Marcel Arpogaus +# +# created : 2022-01-07 16:50:58 (Marcel Arpogaus) +# changed : 2024-02-19 12:07:12 (Marcel Arpogaus) +# DESCRIPTION ################################################################# +# ... +# LICENSE ##################################################################### +# ... +############################################################################### +from typing import Any, List, Optional, Tuple + +import matplotlib.pyplot as plt +import numpy as np +from tensorflow.data import Dataset + + +def plot_patch( + ds: Dataset, + history_size: int, + prediction_size: int, + history_columns: List[str], + prediction_columns: List[str], + meta_columns: Optional[List[str]] = None, + figsize: Tuple[int, int] = (16, 8), + **kwargs: Any, +) -> plt.Figure: + """Plot history and prediction data from a dataset. + + Parameters + ---------- + ds : Dataset + Dataset to extract data from. + history_size : int + Number of timesteps in the history. + prediction_size : int + Number of timesteps in the prediction. + history_columns : List[str] + Columns to use for history data. + prediction_columns : List[str] + Columns to use for prediction data. + meta_columns : List[str], optional + Columns for additional meta data. Defaults to []. + figsize : Tuple[int, int], optional + Size of the figure. Defaults to (16, 8). + + Returns + ------- + plt.Figure + The generated plot figure. 
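+
+    Examples
+    --------
+    A hypothetical sketch, assuming ``ds`` is a batched dataset as produced
+    by ``WindowedTimeSeriesPipeline`` with matching window arguments:
+
+    >>> fig = plot_patch(  # doctest: +SKIP
+    ...     ds,
+    ...     history_size=48,
+    ...     prediction_size=24,
+    ...     history_columns=["x1"],
+    ...     prediction_columns=["x1"],
+    ...     meta_columns=[],
+    ... )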
+
+    """
+    # the signature default is None; normalize it so the length checks below
+    # treat it as an empty list, matching the documented default
+    meta_columns = meta_columns or []
+    prop_cycle = plt.rcParams["axes.prop_cycle"]
+    colors = prop_cycle.by_key()["color"]
+
+    x, y = next(ds.as_numpy_iterator())
+    x1, x2 = None, None
+
+    if history_size and len(history_columns) and len(meta_columns):
+        x1, x2 = x
+    elif history_size and len(history_columns):
+        x1 = x
+    elif len(meta_columns):
+        x2 = x
+
+    x1_columns = sorted(history_columns)
+    x2_columns = sorted(meta_columns)
+    y_columns = sorted(prediction_columns)
+
+    x1_column_ch = {k: c for c, k in enumerate(x1_columns)}
+    y_column_ch = {k: c for c, k in enumerate(y_columns)}
+
+    fig = plt.figure(figsize=figsize)
+    if x1 is not None:
+        for c in x1_columns:
+            ch = x1_column_ch[c]
+            plt.plot(
+                np.arange(history_size),
+                x1[0, ..., ch],
+                color=colors[ch % len(colors)],
+                label=c,
+            )
+
+    if x2 is not None:
+        plt.table(
+            cellText=x2[0],
+            rowLabels=["meta data"],
+            colLabels=x2_columns,
+            colLoc="right",
+            loc="top",
+            edges="horizontal",
+        )
+    for c in y_columns:
+        ch = y_column_ch[c]
+        color_idx = x1_column_ch.get(c, ch)
+        plt.scatter(
+            np.arange(prediction_size) + history_size,
+            y[0, ..., ch],
+            color=colors[color_idx % len(colors)],
+            label=c + " (target)",
+            s=32,
+            edgecolors="k",
+        )
+
+    # Adjust layout to make room for the table:
+    plt.subplots_adjust(left=0.1, top=1.2)
+    plt.legend(loc="lower right")
+    plt.xlabel("Time Steps")
+    plt.ylabel("Observations")
+    plt.tight_layout()
+
+    return fig
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000..e7644cc
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,143 @@
+import itertools
+
+import numpy as np
+import pandas as pd
+import pytest
+import tensorflow as tf
+
+columns = ["ref", "x1", "x2"]
+
+seed = 1
+tf.random.set_seed(seed)
+np.random.seed(seed)
+
+
+def gen_value(column, line, id=0):
+    if column == "ref":
+        return id * 1e5 + line
+    else:
+        return np.random.randint(0, 1000)
+
+
+def gen_df(columns, date_range, id=0):
+    periods = date_range.size
+    df = pd.DataFrame(
+        {
+            "date_time": date_range,
+            **{
+                col: [gen_value(col, line, id) for line in range(periods)]
+                for col in columns
+            },
+        }
+    )
+    return df
+
+
+def gen_df_with_id(ids, columns, date_range):
+    dfs = []
+    for i in ids:
+        df = gen_df(columns, date_range, i)
+        df["id"] = i
+        dfs.append(df)
+    df = pd.concat(dfs)
+
+    return df
+
+
+@pytest.fixture(
+    scope="function",
+    params=[(columns, 48 * 30 * 3)],
+)
+def time_series_df(request):
+    df = gen_df(
+        columns=request.param[0],
+        date_range=pd.date_range("1/1/1", periods=request.param[1], freq="30min"),
+    )
+    return df
+
+
+@pytest.fixture(
+    scope="function",
+    params=[
+        (list(range(5)), columns, 48 * 30 * 3),
+    ],
+)
+def time_series_df_with_id(request):
+    ids, columns, periods = request.param
+    df = gen_df_with_id(
+        ids=ids,
+        columns=columns,
+        date_range=pd.date_range("1/1/1", periods=periods, freq="30min"),
+    )
+    return df
+
+
+@pytest.fixture(scope="function")
+def tmp_csv(tmpdir_factory, time_series_df):
+    file_path = tmpdir_factory.mktemp("csv_data") / "test.csv"
+
+    time_series_df.to_csv(file_path, index=False)
+
+    return file_path, time_series_df
+
+
+@pytest.fixture(scope="function")
+def tmp_csv_with_id(tmpdir_factory, time_series_df_with_id):
+    file_path = tmpdir_factory.mktemp("csv_data") / "test.csv"
+
+    time_series_df_with_id.to_csv(file_path, index=False)
+
+    return file_path, time_series_df_with_id
+
+
+@pytest.fixture(params=[0, 1, 48])
+def history_size(request):
+    return request.param
+
+
+@pytest.fixture
+def prediction_size(history_size):
+    return history_size
+
+
+@pytest.fixture
+def shift(prediction_size): + return prediction_size + + +@pytest.fixture(params=[32]) +def batch_size(request): + return request.param + + +@pytest.fixture(params=[[], ["ref"], ["x2", "ref"], columns]) +def history_columns(request): + return request.param + + +@pytest.fixture(params=[[], ["ref", "x1"]]) +def meta_columns(request): + return request.param + + +@pytest.fixture( + params=[ + [], + list( + itertools.chain( + ["ref"], + *[ + [c + "_sin", c + "_cos"] + for c in ["weekday", "dayofyear", "time", "month"] + ], + ) + ), + ] +) +def meta_columns_cycle(request): + return request.param + + +@pytest.fixture +def prediction_columns(history_columns): + return history_columns diff --git a/tests/test_factory.py b/tests/test_factory.py new file mode 100644 index 0000000..340daf8 --- /dev/null +++ b/tests/test_factory.py @@ -0,0 +1,215 @@ +from tensorflow_time_series_dataset.factory import WindowedTimeSeriesDatasetFactory +from tensorflow_time_series_dataset.loaders import CSVDataLoader +from tensorflow_time_series_dataset.preprocessors import ( + CyclicalFeatureEncoder, + GroupbyDatasetGenerator, + TimeSeriesSplit, +) +from tensorflow_time_series_dataset.utils.test import validate_args, validate_dataset + + +def test_windowed_time_series_dataset_factory( + time_series_df, + batch_size, + history_size, + prediction_size, + shift, + history_columns, + meta_columns, + prediction_columns, +): + df = time_series_df.set_index("date_time") + + common_kwargs = dict( + history_size=history_size, + prediction_size=prediction_size, + shift=shift, + history_columns=history_columns, + meta_columns=meta_columns, + prediction_columns=prediction_columns, + batch_size=batch_size, + drop_remainder=False, + ) + factory_kwargs = dict( + cycle_length=1, + shuffle_buffer_size=100, + ) + + with validate_args( + history_size=history_size, + prediction_size=prediction_size, + history_columns=history_columns, + meta_columns=meta_columns, + prediction_columns=prediction_columns, + ): + factory = WindowedTimeSeriesDatasetFactory(**common_kwargs, **factory_kwargs) + ds = factory(df) + validate_dataset( + df, + ds, + **common_kwargs, + ) + + +def test_windowed_time_series_dataset_factory_groupby( + time_series_df_with_id, + batch_size, + history_size, + prediction_size, + shift, + history_columns, + meta_columns, + prediction_columns, +): + df = time_series_df_with_id.set_index("date_time") + + ids = df.id.unique() + common_kwargs = dict( + history_size=history_size, + prediction_size=prediction_size, + shift=shift, + history_columns=history_columns, + meta_columns=meta_columns, + prediction_columns=prediction_columns, + batch_size=batch_size, + drop_remainder=True, + ) + factory_kwargs = dict( + cycle_length=len(ids), + shuffle_buffer_size=100, + ) + + with validate_args( + history_size=history_size, + prediction_size=prediction_size, + history_columns=history_columns, + meta_columns=meta_columns, + prediction_columns=prediction_columns, + ): + factory = WindowedTimeSeriesDatasetFactory(**common_kwargs, **factory_kwargs) + factory.add_preprocessor( + GroupbyDatasetGenerator( + "id", columns=history_columns + meta_columns + prediction_columns + ) + ) + + ds = factory(df) + validate_dataset( + df, + ds, + **common_kwargs, + ) + + +def test_windowed_time_series_dataset_factory_csv_loader( + tmp_csv, + batch_size, + history_size, + prediction_size, + shift, + history_columns, + meta_columns, + prediction_columns, +): + test_data_path, df = tmp_csv + df = df.set_index("date_time") + + common_kwargs = dict( + 
history_size=history_size, + prediction_size=prediction_size, + shift=shift, + history_columns=history_columns, + meta_columns=meta_columns, + prediction_columns=prediction_columns, + batch_size=batch_size, + drop_remainder=False, + ) + factory_kwargs = dict( + cycle_length=1, + shuffle_buffer_size=100, + ) + with validate_args( + history_size=history_size, + prediction_size=prediction_size, + history_columns=history_columns, + meta_columns=meta_columns, + prediction_columns=prediction_columns, + ): + factory = WindowedTimeSeriesDatasetFactory(**common_kwargs, **factory_kwargs) + factory.set_data_loader(CSVDataLoader(file_path=test_data_path)) + ds = factory() + validate_dataset( + df, + ds, + **common_kwargs, + ) + + +def test_windowed_time_series_dataset_factory_csv_loader_with_preprocessors( + tmp_csv_with_id, + batch_size, + history_size, + prediction_size, + shift, + history_columns, + meta_columns_cycle, + prediction_columns, +): + # define encoder args + # [name: kwargs] + encs = { + "weekday": dict(cycl_max=6), + "dayofyear": dict(cycl_max=366, cycl_min=1), + "month": dict(cycl_max=12, cycl_min=1), + "time": dict( + cycl_max=24 * 60 - 1, + cycl_getter=lambda df, k: df.index.hour * 60 + df.index.minute, + ), + } + test_data_path, df = tmp_csv_with_id + test_df = df.set_index("date_time") + for name, kwargs in encs.items(): + enc = CyclicalFeatureEncoder(name, **kwargs) + test_df = enc(test_df) + splitter = TimeSeriesSplit(0.5, TimeSeriesSplit.LEFT) + test_df = splitter(test_df) + + ids = df.id.unique() + + common_kwargs = dict( + history_size=history_size, + prediction_size=prediction_size, + shift=shift, + history_columns=history_columns, + meta_columns=meta_columns_cycle, + prediction_columns=prediction_columns, + batch_size=batch_size, + drop_remainder=True, + ) + factory_kwargs = dict( + cycle_length=len(ids), + shuffle_buffer_size=100, + ) + with validate_args( + history_size=history_size, + prediction_size=prediction_size, + history_columns=history_columns, + meta_columns=meta_columns_cycle, + prediction_columns=prediction_columns, + ): + factory = WindowedTimeSeriesDatasetFactory(**common_kwargs, **factory_kwargs) + factory.set_data_loader(CSVDataLoader(file_path=test_data_path)) + factory.add_preprocessor(splitter) + for name, kwargs in encs.items(): + factory.add_preprocessor(CyclicalFeatureEncoder(name, **kwargs)) + factory.add_preprocessor( + GroupbyDatasetGenerator( + "id", columns=history_columns + meta_columns_cycle + prediction_columns + ) + ) + ds = factory() + validate_dataset( + test_df, + ds, + **common_kwargs, + ) diff --git a/tests/test_loaders.py b/tests/test_loaders.py new file mode 100644 index 0000000..5e7ec7e --- /dev/null +++ b/tests/test_loaders.py @@ -0,0 +1,8 @@ +from tensorflow_time_series_dataset.loaders import CSVDataLoader + + +def test_csv_loader(tmp_csv_with_id): + test_data_path, test_df = tmp_csv_with_id + csv_loader = CSVDataLoader(test_data_path) + loaded_df = csv_loader() + assert loaded_df.equals(test_df.set_index("date_time")), "Error: CSVLoader failed" diff --git a/tests/test_pipleine.py b/tests/test_pipleine.py new file mode 100644 index 0000000..14e8100 --- /dev/null +++ b/tests/test_pipleine.py @@ -0,0 +1,227 @@ +import numpy as np +import pytest +import tensorflow as tf +from tensorflow_time_series_dataset.pipeline.patch_generator import PatchGenerator +from tensorflow_time_series_dataset.pipeline.patch_processor import PatchPreprocessor +from tensorflow_time_series_dataset.pipeline.windowed_time_series_pipeline import ( + 
WindowedTimeSeriesPipeline, +) +from tensorflow_time_series_dataset.preprocessors.groupby_dataset_generator import ( + GroupbyDatasetGenerator, +) +from tensorflow_time_series_dataset.utils.test import validate_args, validate_dataset + +# FIXTURES #################################################################### + + +@pytest.fixture +def groupby_dataset( + time_series_df_with_id, history_columns, meta_columns, prediction_columns +): + time_series_df_with_id.set_index("date_time", inplace=True) + used_cols = set(history_columns + meta_columns + prediction_columns) + gen = GroupbyDatasetGenerator("id", columns=used_cols) + return gen(time_series_df_with_id), time_series_df_with_id[list(used_cols) + ["id"]] + + +@pytest.fixture +def patched_dataset( + request, time_series_df, history_columns, meta_columns, prediction_columns +): + window_size, shift = request.param + df = time_series_df.set_index("date_time") + used_cols = set(history_columns + meta_columns + prediction_columns) + + ds = tf.data.Dataset.from_tensors(df[sorted(used_cols)]) + ds = ds.interleave( + PatchGenerator(window_size=window_size, shift=shift), + num_parallel_calls=tf.data.experimental.AUTOTUNE, + ) + return ds, df, window_size, shift + + +# TESTS ####################################################################### +@pytest.mark.parametrize("window_size,shift", [(2 * 48, 48), (48 + 1, 1)]) +def test_patch_generator(time_series_df, window_size, shift): + df = time_series_df.set_index("date_time") + + initial_size = window_size - shift + data_size = df.index.size - initial_size + patches = data_size // shift + + expected_shape = (window_size, len(df.columns)) + + ds = tf.data.Dataset.from_tensors(df) + ds_patched = ds.interleave( + PatchGenerator(window_size=window_size, shift=shift), + num_parallel_calls=tf.data.experimental.AUTOTUNE, + ) + for i, patch in enumerate(ds_patched.as_numpy_iterator()): + assert patch.shape == expected_shape, "Wrong shape" + x1 = patch[0, 0] + idx = int(x1 % 1e5) + expected_values = df.iloc[idx : idx + window_size] + assert np.all(patch == expected_values), "Patch contains wrong data" + assert i + 1 == patches, "Not enough patches" + + +@pytest.mark.parametrize("window_size,shift", [(2 * 48, 48), (48 + 1, 1)]) +def test_patch_generator_groupby(groupby_dataset, window_size, shift): + ds, df = groupby_dataset + records_per_id = df.groupby("id").size().max() + ids = df.id.unique() + columns = sorted([c for c in df.columns if c != "id"]) + + initial_size = window_size - shift + data_size = records_per_id - initial_size + patches = data_size / shift * len(ids) + expected_shape = (window_size, len(columns)) + + ds_patched = ds.interleave( + PatchGenerator(window_size=window_size, shift=shift), + num_parallel_calls=tf.data.experimental.AUTOTUNE, + ) + + for i, patch in enumerate(ds_patched.as_numpy_iterator()): + assert patch.shape == expected_shape, "Wrong shape" + if len(columns): + x1 = patch[0, 0] + id = int(x1 // 1e5) + idx = int(x1 % 1e5) + expected_values = df[df.id == id].iloc[idx : idx + window_size] + assert np.all( + patch == expected_values[columns].values + ), "Patch contains wrong data" + assert i + 1 == patches, "Not enough patches" + + +@pytest.mark.parametrize( + "patched_dataset", [(2 * 48, 48), (48 + 1, 1)], indirect=["patched_dataset"] +) +def test_batch_processor( + patched_dataset, + history_size, + batch_size, + history_columns, + meta_columns, + prediction_columns, +): + ds, df, window_size, shift = patched_dataset + prediction_size = window_size - history_size + + 
batch_kwargs = dict( + history_size=history_size, + history_columns=history_columns, + meta_columns=meta_columns, + prediction_columns=prediction_columns, + ) + + with validate_args( + history_size=history_size, + prediction_size=prediction_size, + history_columns=history_columns, + meta_columns=meta_columns, + prediction_columns=prediction_columns, + ): + ds_batched = ds.map( + PatchPreprocessor(**batch_kwargs), + num_parallel_calls=tf.data.experimental.AUTOTUNE, + ).batch(batch_size) + + validate_dataset( + df, + ds_batched, + batch_size=batch_size, + prediction_size=prediction_size, + shift=shift, + drop_remainder=False, + **batch_kwargs, + ) + + +def test_windowed_time_series_pipeline( + time_series_df, + batch_size, + history_size, + prediction_size, + shift, + history_columns, + meta_columns, + prediction_columns, +): + df = time_series_df.set_index("date_time") + used_cols = sorted( + set( + history_columns + meta_columns + prediction_columns, + ) + ) + + batch_kwargs = dict( + history_size=history_size, + prediction_size=prediction_size, + shift=shift, + history_columns=history_columns, + meta_columns=meta_columns, + prediction_columns=prediction_columns, + batch_size=batch_size, + drop_remainder=True, + ) + pipeline_kwargs = dict(cycle_length=1, shuffle_buffer_size=100, cache=True) + + with validate_args( + history_size=history_size, + prediction_size=prediction_size, + history_columns=history_columns, + meta_columns=meta_columns, + prediction_columns=prediction_columns, + ): + pipeline = WindowedTimeSeriesPipeline(**batch_kwargs, **pipeline_kwargs) + ds = tf.data.Dataset.from_tensors(df[used_cols]) + ds = pipeline(ds) + validate_dataset( + df, + ds, + **batch_kwargs, + ) + + +def test_windowed_time_series_pipeline_groupby( + groupby_dataset, + batch_size, + history_size, + prediction_size, + shift, + history_columns, + meta_columns, + prediction_columns, +): + ds, df = groupby_dataset + + ids = df.id.unique() + + batch_kwargs = dict( + history_size=history_size, + prediction_size=prediction_size, + shift=shift, + history_columns=history_columns, + meta_columns=meta_columns, + prediction_columns=prediction_columns, + batch_size=batch_size, + drop_remainder=False, + ) + pipeline_kwargs = dict(cycle_length=len(ids), shuffle_buffer_size=1000, cache=True) + + with validate_args( + history_size=history_size, + prediction_size=prediction_size, + history_columns=history_columns, + meta_columns=meta_columns, + prediction_columns=prediction_columns, + ): + pipeline = WindowedTimeSeriesPipeline(**batch_kwargs, **pipeline_kwargs) + ds = pipeline(ds) + validate_dataset( + df, + ds, + **batch_kwargs, + ) diff --git a/tests/test_preprocessors.py b/tests/test_preprocessors.py new file mode 100644 index 0000000..c80fbe9 --- /dev/null +++ b/tests/test_preprocessors.py @@ -0,0 +1,100 @@ +import numpy as np +import pandas as pd +import pytest +from tensorflow_time_series_dataset.preprocessors import ( + CyclicalFeatureEncoder, + GroupbyDatasetGenerator, + TimeSeriesSplit, +) + + +@pytest.fixture +def cycl_df(): + date_range = pd.date_range(start="1/1/1992", end="31/12/1992", freq="30min") + test_df = pd.DataFrame( + index=date_range, + data=np.stack( + [ + np.random.randint(0, 10, date_range.size), + np.random.randint(20, 100, date_range.size), + ], + axis=1, + ), + columns=["x1", "x2"], + ) + return test_df + + +def test_cyclical_data_encoder(cycl_df): + encs = { + "x1": dict(cycl_max=9), + "x2": dict(cycl_max=99, cycl_min=20), + "weekday": dict(cycl_max=6), + "dayofyear": dict(cycl_max=366, 
cycl_min=1), + "month": dict(cycl_max=12, cycl_min=1), + "time": dict( + cycl_max=24 * 60 - 1, + cycl_getter=lambda df, k: df.index.hour * 60 + df.index.minute, + ), + } + for name, kwargs in encs.items(): + enc = CyclicalFeatureEncoder(name, **kwargs) + enc_dat = enc(cycl_df) + cycl = enc.cycl_getter(cycl_df, name) + assert np.isclose( + enc.decode(enc_dat[name + "_sin"], enc_dat[name + "_cos"]), cycl + ).all(), "Decoding failed" + + +def test_cyclical_data_encoder_except(cycl_df): + encs = { + "x1": dict(cycl_max=cycl_df.x1.max() - 1), + "weekday": dict(cycl_max=cycl_df.index.weekday.max() - 1), + "dayofyear": dict(cycl_max=cycl_df.index.dayofyear.max() - 1), + "month": dict(cycl_max=cycl_df.index.month.max() - 1), + "time": dict( + cycl_max=(cycl_df.index.hour * 60 + cycl_df.index.minute).max() - 1, + cycl_getter=lambda df, k: df.index.hour * 60 + df.index.minute, + ), + } + for name, kwargs in encs.items(): + enc = CyclicalFeatureEncoder(name, **kwargs) + with pytest.raises(AssertionError): + enc(cycl_df) + + +def test_time_series_split(time_series_df_with_id): + df = time_series_df_with_id.set_index("date_time") + l_splitter = TimeSeriesSplit(0.5, TimeSeriesSplit.LEFT) + r_splitter = TimeSeriesSplit(0.5, TimeSeriesSplit.RIGHT) + l_split = l_splitter(df) + r_split = r_splitter(df) + assert not l_split.index.isin(r_split.index).any(), "Splits overlap" + assert l_split.index.max() < r_split.index.min(), "wrong split" + assert np.unique(l_split.index.date).size == np.floor( + np.unique(df.index.date).size / 2 + ) and np.unique(r_split.index.date).size == np.ceil( + np.unique(df.index.date).size / 2 + ), "Unexpected split size" + assert (r_split.groupby([r_split.index.date, "id"]).x1.count() == 48).all() and ( + l_split.groupby([l_split.index.date, "id"]).x1.count() == 48 + ).all(), "Incomplete Days in Split" + + +def test_groupby_dataset_generator(time_series_df_with_id): + df = time_series_df_with_id.set_index("date_time") + columns = list(sorted(df.columns)) + df = df[columns] + + records_per_id = df.groupby("id").size().max() + expected_shape = (records_per_id, len(columns)) + + idx_from_column = {c: i for i, c in enumerate(columns)} + gen = GroupbyDatasetGenerator("id", columns=columns) + for d in gen(df.sample(frac=1)): + assert d.shape == expected_shape, "Wrong shape" + id = int(d[0, idx_from_column["ref"]] // 1e5) + expected_values = df[df.id == id].sort_index().values + assert np.all( + d.numpy() == expected_values, + ), "Error: DatasetGenerator failed"
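+
+
+def test_time_series_k_fold(time_series_df_with_id):
+    # A hedged sketch of TimeSeriesKFold's expected contract, assuming folds
+    # are contiguous in time, disjoint, and jointly cover all rows; the
+    # fixture's 30min-spaced DatetimeIndex is sorted first, as the fold
+    # boundaries are resolved by label-based slicing.
+    from tensorflow_time_series_dataset.preprocessors import TimeSeriesKFold
+
+    df = time_series_df_with_id.set_index("date_time").sort_index()
+    n_folds = 5
+    folds = [TimeSeriesKFold(fold=f, n_folds=n_folds)(df) for f in range(n_folds)]
+    # folds are ordered in time and do not overlap
+    for left, right in zip(folds[:-1], folds[1:]):
+        assert left.index.max() < right.index.min(), "Folds overlap"
+    # together the folds cover every row exactly once
+    assert sum(len(fold) for fold in folds) == len(df), "Folds are not exhaustive"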