From cb8b7fa8296156f0d72f70dd4f320f409a7e4509 Mon Sep 17 00:00:00 2001 From: Lucas Pedroza <40873230+pnwpedro@users.noreply.github.com> Date: Mon, 20 May 2024 18:33:43 +0200 Subject: [PATCH 1/2] Add streaming support (#177) Co-authored-by: fauna-chase <73842483+fauna-chase@users.noreply.github.com> Co-authored-by: James Rodewig --- .github/workflows/build_and_test.yml | 31 +- .github/workflows/validate-readme.yml | 6 +- README.md | 434 ++++++++++++++++++ README.rst | 387 ---------------- docker/Dockerfile | 10 + docker/docker-compose.yml | 38 ++ docker/feature-flags.json | 27 ++ fauna/client/__init__.py | 2 +- fauna/client/client.py | 339 +++++++------- fauna/client/retryable.py | 20 +- fauna/encoding/decoder.py | 8 +- fauna/encoding/encoder.py | 11 +- fauna/errors/errors.py | 197 +++++++- fauna/http/http_client.py | 4 +- fauna/http/httpx_client.py | 17 +- fauna/query/models.py | 17 +- setup.py | 6 +- tests/integration/conftest.py | 17 + .../test_client_with_query_limits.py | 2 +- tests/integration/test_stream.py | 235 ++++++++++ tests/unit/test_client.py | 163 ++++++- tests/unit/test_encoding.py | 16 +- tests/unit/test_httpx_client.py | 44 ++ 23 files changed, 1411 insertions(+), 620 deletions(-) create mode 100644 README.md delete mode 100644 README.rst create mode 100644 docker/Dockerfile create mode 100644 docker/docker-compose.yml create mode 100644 docker/feature-flags.json create mode 100644 tests/integration/test_stream.py create mode 100644 tests/unit/test_httpx_client.py diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 3495ee27..d5a3b072 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -19,48 +19,27 @@ jobs: - "3.11" - "3.10" - "3.9" - fauna-docker-service: - - name: "fauna/faunadb:latest" - host: "core" - port: "8443" - # if we had a nightly image, we could run the testsuite against it by uncommented this - # - name: "fauna/faunadb:nightly" - # host: "localhost" - # port: "8443" timeout-minutes: 5 runs-on: ubuntu-latest - container: - image: python:${{ matrix.python-version}} - - services: - core: - image: ${{ matrix.fauna-docker-service.name }} - steps: - uses: actions/checkout@v3 - name: Show file descriptor limit run: ulimit -a - - name: "Install ci dependencies" - run: pip install . 
.[test] .[lint] --use-pep517
+      - name: Build docker
+        run: docker-compose -f docker/docker-compose.yml build --build-arg BASE_IMG=python:${{ matrix.python-version }} --no-cache
       - name: Run unit tests
-        run: pytest -v --cov=fauna --cov-context=test tests/unit
+        run: docker-compose -f docker/docker-compose.yml run --rm unit-test
       - name: Run integration tests
-        run: pytest -v --cov=fauna --cov-context=test tests/integration
-      # To get more insight into tests which are only flaky when run in github actions -- use commands like below
-      # run: env HTTPX_LOG_LEVEL=trace pytest --capture=no -v --cov=fauna --cov-context=test -k test_stream_max_open_streams
-        env:
-          FAUNA_ENDPOINT: "http://${{ matrix.fauna-docker-service.host }}:${{ matrix.fauna-docker-service.port }}"
-          FAUNA_ROOT_KEY: secret
-          USE_GITHUB_ACTION_OVERRIDES: 1
+        run: docker-compose -f docker/docker-compose.yml run --rm integration-test
       - name: Generate coverage html report with dynamic contexts
-        run: coverage html --show-contexts
+        run: docker-compose -f docker/docker-compose.yml run --rm coverage
       - uses: actions/upload-artifact@v3
         with:
diff --git a/.github/workflows/validate-readme.yml b/.github/workflows/validate-readme.yml
index 2b0a4db6..1c475777 100644
--- a/.github/workflows/validate-readme.yml
+++ b/.github/workflows/validate-readme.yml
@@ -2,7 +2,7 @@ name: Validate README
 on:
   push:
     paths:
-      - 'README.rst'
+      - 'README.md'
 
 jobs:
   validate:
@@ -19,5 +19,5 @@ jobs:
     - name: Install dependencies
       run: pip install readme_renderer 'readme_renderer[md]'
 
-    - name: Validate readme.rst
-      run: python -m readme_renderer README.rst
+    - name: Validate README.md
+      run: python -m readme_renderer README.md
diff --git a/README.md b/README.md
new file mode 100644
index 00000000..370b154c
--- /dev/null
+++ b/README.md
@@ -0,0 +1,434 @@
+# The Official Python Driver for [Fauna](https://fauna.com).
+
+[![Pypi Version](https://img.shields.io/pypi/v/fauna.svg?maxAge=21600)](https://pypi.python.org/pypi/fauna)
+[![License](https://img.shields.io/badge/license-MPL_2.0-blue.svg?maxAge=2592000)](https://raw.githubusercontent.com/fauna/fauna-python/main/LICENSE)
+
+This driver can only be used with FQL v10, and is not compatible with earlier versions
+of FQL. To query your databases with earlier API versions, see
+the [faunadb](https://pypi.org/project/faunadb/) package.
+
+See the [Fauna Documentation](https://docs.fauna.com/fauna/current/)
+for additional information on how to configure and query your databases.
+
+## Installation
+Pre-release installations must specify the version you want to install. Find the version you want to install on [PyPI](https://pypi.org/project/fauna/#history).
+```bash
+pip install fauna==<version>
+```
+
+## Compatibility
+
+The following versions of Python are supported:
+
+* Python 3.9
+* Python 3.10
+* Python 3.11
+* Python 3.12
+
+
+## Basic Usage
+You can expect a ``Client`` instance to have reasonable defaults, like the Fauna endpoint ``https://db.fauna.com`` and a global HTTP client, but you will always need to configure a secret.
+
+You can configure your secret by passing it directly to the client or by setting an environment variable.
+
+Supported Environment Variables:
+
+* ``FAUNA_ENDPOINT``: The Fauna endpoint to use. For example, ``http://localhost:8443``
+* ``FAUNA_SECRET``: The Fauna secret to use.
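+
+For example, you can pass the secret directly when constructing the client instead of relying on the environment variable. A minimal sketch; the secret value is a placeholder:
+
+```python
+from fauna.client import Client
+
+# Placeholder secret for illustration only. In practice, load it from a
+# secret store rather than hard-coding it.
+client = Client(secret="YOUR_FAUNA_SECRET")
+```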
+
+```python
+from fauna import fql
+from fauna.client import Client
+from fauna.encoding import QuerySuccess
+from fauna.errors import FaunaException
+
+client = Client()
+# The client defaults to using the value stored in FAUNA_SECRET for its secret.
+# Either set the FAUNA_SECRET env variable or retrieve it from a secret store.
+# As a best practice, don't store your secret directly in your code.
+
+try:
+    # create a collection
+    q1 = fql('Collection.create({ name: "Dogs" })')
+    client.query(q1)
+
+    # create a document
+    q2 = fql('Dogs.create({ name: "Scout" })')
+    res: QuerySuccess = client.query(q2)
+    doc = res.data
+    print(doc)
+except FaunaException as e:
+    # handle errors
+    print(e)
+```
+
+## Query Composition
+
+This driver supports query composition with Python primitives, lists, dicts, and other FQL queries.
+
+For FQL templates, denote variables with ``${}`` and pass variables as kwargs to ``fql()``. You can escape a variable by prepending an additional ``$``.
+
+```python
+from fauna import fql
+from fauna.client import Client
+
+client = Client()
+
+def add_two(x):
+    return fql("${x} + 2", x=x)
+
+q = fql("${y} + 4", y=add_two(2))
+res = client.query(q)
+print(res.data)  # 8
+```
+
+## Serialization / Deserialization
+
+Serialization and deserialization with user-defined classes is not yet supported.
+
+When building queries, adapt your classes into dicts or lists prior to using them in composition. When instantiating classes from the query result data, build them from the expected result.
+
+```python
+class MyClass:
+    def __init__(self, my_prop):
+        self.my_prop = my_prop
+
+    def to_dict(self):
+        return {'my_prop': self.my_prop}
+
+    @staticmethod
+    def from_result(obj):
+        return MyClass(obj['my_prop'])
+```
+
+## Client Configuration
+
+### Max Attempts
+The maximum number of times a query will be attempted if a retryable exception is thrown (e.g., ``ThrottlingError``). Default 3, inclusive of the initial call. The retry strategy implemented is a simple exponential backoff.
+
+To disable retries, pass a ``max_attempts`` value less than or equal to 1.
+
+### Max Backoff
+The maximum backoff in seconds to be observed between each retry. Default 20 seconds.
+
+### Timeouts
+
+There are a few different timeout settings that can be configured; each comes with a default setting. We recommend that most applications use the defaults.
+
+### Query Timeout
+The query timeout is the time, as ``datetime.timedelta``, that Fauna will spend executing your query before aborting with a ``QueryTimeoutError``.
+
+The query timeout can be set using the ``query_timeout`` option. The default value if you do not provide one is ``DefaultQueryTimeout`` (5 seconds).
+
+```python
+from datetime import timedelta
+from fauna.client import Client
+
+client = Client(query_timeout=timedelta(seconds=20))
+```
+
+The query timeout can also be set to a different value for each query using the ``QueryOptions.query_timeout`` option. Doing so overrides the client configuration when performing this query.
+
+```python
+from datetime import timedelta
+from fauna.client import Client, QueryOptions
+
+response = client.query(myQuery, QueryOptions(query_timeout=timedelta(seconds=20)))
+```
+
+### Client Timeout
+
+The client timeout is the time, as ``datetime.timedelta``, that the client will wait for a network response before canceling the request. If a client timeout occurs, the driver will throw an instance of ``NetworkError``.
+
+The client timeout is always the query timeout plus an additional buffer.
This ensures that the client always waits at least as long as Fauna could work on your query, accounting for network latency.
+
+The client timeout buffer is configured by setting the ``client_buffer_timeout`` option. The default value for the buffer if you do not provide one is ``DefaultClientBufferTimeout`` (5 seconds); therefore, the default client timeout is 10 seconds when combined with the default query timeout.
+
+```python
+from datetime import timedelta
+from fauna.client import Client
+
+client = Client(client_buffer_timeout=timedelta(seconds=20))
+```
+
+### Idle Timeout
+
+The idle timeout is the time, as ``datetime.timedelta``, that a session will remain open after there is no more pending communication. Once the idle time has elapsed, the session is considered idle and is closed. Subsequent requests will create a new session; the session idle timeout does not result in an error.
+
+Configure the idle timeout using the ``http_idle_timeout`` option. The default value if you do not provide one is ``DefaultIdleConnectionTimeout`` (5 seconds).
+
+```python
+from datetime import timedelta
+from fauna.client import Client
+
+client = Client(http_idle_timeout=timedelta(seconds=6))
+```
+
+> **Note**
+> Your application process may continue executing after all requests are completed for the duration of the session idle timeout. To prevent this, call ``Client.close()`` once all requests are complete. It is not recommended to set ``http_idle_timeout`` to small values.
+
+### Connect Timeout
+
+The connect timeout is the maximum amount of time, as ``datetime.timedelta``, to wait until a connection to Fauna is established. If the client is unable to connect within this time frame, a ``ConnectTimeout`` exception is raised.
+
+Configure the connect timeout using the ``http_connect_timeout`` option. The default value if you do not provide one is ``DefaultHttpConnectTimeout`` (5 seconds).
+
+```python
+from datetime import timedelta
+from fauna.client import Client
+
+client = Client(http_connect_timeout=timedelta(seconds=6))
+```
+
+### Pool Timeout
+
+The pool timeout specifies the maximum amount of time, as ``datetime.timedelta``, to wait for acquiring a connection from the connection pool. If the client is unable to acquire a connection within this time frame, a ``PoolTimeout`` exception is raised. This timeout may fire if 20 connections are currently in use and none is released before the timeout is up.
+
+Configure the pool timeout using the ``http_pool_timeout`` option. The default value if you do not provide one is ``DefaultHttpPoolTimeout`` (5 seconds).
+
+```python
+from datetime import timedelta
+from fauna.client import Client
+
+client = Client(http_pool_timeout=timedelta(seconds=6))
+```
+
+### Read Timeout
+
+The read timeout specifies the maximum amount of time, as ``datetime.timedelta``, to wait for a chunk of data to be received (for example, a chunk of the response body). If the client is unable to receive data within this time frame, a ``ReadTimeout`` exception is raised.
+
+Configure the read timeout using the ``http_read_timeout`` option. The default value if you do not provide one is ``DefaultHttpReadTimeout`` (None).
+
+```python
+from datetime import timedelta
+from fauna.client import Client
+
+client = Client(http_read_timeout=timedelta(seconds=6))
+```
+
+### Write Timeout
+
+The write timeout specifies the maximum amount of time, as ``datetime.timedelta``, to wait for a chunk of data to be sent (for example, a chunk of the request body). If the client is unable to send data within this time frame, a ``WriteTimeout`` exception is raised.
+
+Configure the write timeout using the ``http_write_timeout`` option. The default value if you do not provide one is ``DefaultHttpWriteTimeout`` (5 seconds).
+
+```python
+from datetime import timedelta
+from fauna.client import Client
+
+client = Client(http_write_timeout=timedelta(seconds=6))
+```
+
+## Query Stats
+
+Stats are returned on query responses and ServiceErrors.
+
+```python
+from fauna import fql
+from fauna.client import Client
+from fauna.encoding import QuerySuccess, QueryStats
+from fauna.errors import ServiceError
+
+client = Client()
+
+def emit_stats(stats: QueryStats):
+    print(f"Compute Ops: {stats.compute_ops}")
+    print(f"Read Ops: {stats.read_ops}")
+    print(f"Write Ops: {stats.write_ops}")
+
+try:
+    q = fql('Collection.create({ name: "Dogs" })')
+    qs: QuerySuccess = client.query(q)
+    emit_stats(qs.stats)
+except ServiceError as e:
+    if e.stats is not None:
+        emit_stats(e.stats)
+    # more error handling...
+```
+
+## Event Streaming
+
+The driver supports Event Streaming.
+
+_Start a stream_
+
+To get a stream token, append ``toStream()`` or ``changesOn()`` to a set from a supported source.
+
+To start and subscribe to the stream, pass the stream token to ``Client.stream()``:
+
+```python
+from fauna import fql
+from fauna.client import Client
+
+client = Client()
+
+response = client.query(fql('''
+  let set = Product.all()
+  {
+    initialPage: set.pageSize(10),
+    streamToken: set.toStream()
+  }
+'''))
+
+initialPage = response.data['initialPage']
+streamToken = response.data['streamToken']
+
+client.stream(streamToken)
+```
+
+You can also pass a query that produces a stream token directly to ``Client.stream()``:
+
+```python
+query = fql('Product.all().changesOn(.price, .quantity)')
+
+client.stream(query)
+```
+
+_Iterate on a stream_
+
+``Client.stream()`` returns an iterator that emits events as they occur. You can use a ``for`` loop to iterate through the events:
+
+```python
+query = fql('Product.all().changesOn(.price, .quantity)')
+
+with client.stream(query) as stream:
+    for event in stream:
+        eventType = event['type']
+        if eventType == 'add':
+            print('Add event: ', event)
+            ## ...
+        elif eventType == 'update':
+            print('Update event: ', event)
+            ## ...
+        elif eventType == 'remove':
+            print('Remove event: ', event)
+            ## ...
+```
+
+_Close a stream_
+
+Use ``.close()`` to close a stream:
+
+```python
+query = fql('Product.all().changesOn(.price, .quantity)')
+
+count = 0
+with client.stream(query) as stream:
+    for event in stream:
+        print('Stream event', event)
+        # ...
+        count += 1
+
+        if count == 2:
+            stream.close()
+```
+
+_Error handling_
+
+If a non-retryable error occurs when opening or processing a stream, Fauna raises a ``FaunaException``:
+
+```python
+from fauna import fql
+from fauna.client import Client
+from fauna.errors import FaunaException
+
+client = Client()
+
+try:
+    with client.stream(fql(
+        'Product.all().changesOn(.price, .quantity)'
+    )) as stream:
+        for event in stream:
+            print(event)
+            # ...
+except FaunaException as e:
+    print('error occurred with stream: ', e)
+```
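+
+Iterating on a stream blocks the consuming thread until events arrive. If your application needs to do other work while listening, consume the stream on a separate thread, as this driver's own integration tests do. A minimal sketch; the collection and printing logic are purely illustrative:
+
+```python
+import threading
+
+from fauna import fql
+from fauna.client import Client
+
+client = Client()
+
+def listen():
+    # Blocks this thread until the stream is closed; the main thread stays free.
+    with client.stream(fql('Product.all().toStream()')) as stream:
+        for event in stream:
+            print('event type:', event['type'])
+
+listener = threading.Thread(target=listen)
+listener.start()
+# ... do other work, then wait for the listener to finish ...
+listener.join()
+```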
+
+_Stream options_
+
+The client configuration sets default options for the ``Client.stream()`` method.
+
+You can pass a ``StreamOptions`` object to override these defaults:
+
+```python
+from fauna import fql
+from fauna.client import Client, StreamOptions
+
+client = Client()
+
+options = StreamOptions(
+    max_attempts=5,
+    max_backoff=1,
+    start_ts=1710968002310000,
+    status_events=True,
+)
+
+client.stream(fql('Product.all().toStream()'), options)
+```
+
+## Setup
+
+```bash
+virtualenv venv
+source venv/bin/activate
+pip install . .[test] .[lint]
+```
+
+## Testing
+
+We use pytest. You can run tests directly or with Docker. If you run integration tests directly, you must have Fauna running locally.
+
+To run Fauna locally and then run the integration tests:
+
+```bash
+make run-fauna
+source venv/bin/activate
+make install
+make integration-test
+```
+
+To run unit tests locally:
+
+```bash
+source venv/bin/activate
+make install
+make unit-test
+```
+
+To stand up a container and run all tests at the same time:
+
+```bash
+make docker-test
+```
+
+See the ``Makefile`` for more.
+
+## Coverage
+
+```bash
+source venv/bin/activate
+make coverage
+```
+
+## Contribute
+
+GitHub pull requests are very welcome.
+
+
+## License
+
+Copyright 2023 [Fauna, Inc.](https://fauna.com)
+
+Licensed under the Mozilla Public License, Version 2.0 (the
+"License"); you may not use this software except in compliance with
+the License. You can obtain a copy of the License at
+
+[http://mozilla.org/MPL/2.0/](http://mozilla.org/MPL/2.0/)
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+implied. See the License for the specific language governing
+permissions and limitations under the License.
diff --git a/README.rst b/README.rst
deleted file mode 100644
index 89ee1bfa..00000000
--- a/README.rst
+++ /dev/null
@@ -1,387 +0,0 @@
-The Official Python Driver for `Fauna `_.
-============================================================
-
-.. image:: https://img.shields.io/pypi/v/fauna.svg?maxAge=21600
-  :target: https://pypi.python.org/pypi/fauna
-.. image:: https://img.shields.io/badge/license-MPL_2.0-blue.svg?maxAge=2592000
-  :target: https://raw.githubusercontent.com/fauna/fauna-python/main/LICENSE
-
-This driver can only be used with FQL v10, and is not compatible with earlier versions
-of FQL. To query your databases with earlier API versions, see
-the `faunadb `_ package.
-
-See the `Fauna Documentation `_
-for additional information on how to configure and query your databases.
-
-
-Installation
-------------
-
-.. code-block:: bash
-
-    pip install fauna
-
-
-Compatibility
--------------
-
-The following versions of Python are supported:
-
-* Python 3.9
-* Python 3.10
-* Python 3.11
-* Python 3.12
-
-
-Basic Usage
-------------
-You can expect a ``Client`` instance to have reasonable defaults, like the Fauna endpoint ``https://db.fauna.com`` and a global HTTP client, but you will always need to configure a secret.
-
-You can configure your secret by passing it directly to the client or by setting an environment variable.
-
-Supported Environment Variables:
-
-* ``FAUNA_ENDPOINT``: The Fauna endpoint to use. For example, ``http://localhost:8443``
-* ``FAUNA_SECRET``: The Fauna secret to use.
-
-..
code-block:: python - - from fauna import fql - from fauna.client import Client - from fauna.encoding import QuerySuccess - from fauna.errors import FaunaException - - client = Client() - # The client defaults to using the value stored FAUNA_SECRET for its secret. - # Either set the FAUNA_SECRET env variable or retrieve it from a secret store. - # As a best practice, don't store your secret directly in your code. - - try: - # create a collection - q1 = fql('Collection.create({ name: "Dogs" })') - client.query(q1) - - # create a document - q2 = fql('Dogs.create({ name: "Scout" })') - res: QuerySuccess = client.query(q2) - doc = res.data - print(doc) - except FaunaException as e: - # handle errors - print(e) - -Query Composition ------------------ - -This driver supports query composition with Python primitives, lists, dicts, and other FQL queries. - -For FQL templates, denote variables with ``${}`` and pass variables as kwargs to ``fql()``. You can escape a variable by prepending an additional ``$``. - -.. code-block:: python - - from fauna import fql - from fauna.client import Client - - client = Client() - - def add_two(x): - return fql("${x} + 2", x=x) - - q = fql("${y} + 4", y=add_two(2)) - res = client.query(q) - print(res.data) # 8 - -Serialization / Deserialization -------------------------------- - -Serialization and deserialization with user-defined classes is not yet supported. - -When building queries, adapt your classes into dicts or lists before using them in composition. When instantiating classes from the query result data, build them from the expected result. - -.. code-block:: python - - class MyClass: - def __init__ (self, my_prop): - self.my_prop = my_prop - - def to_dict(self): - return { 'my_prop': self.my_prop } - - @static_method - def from_result(obj): - return MyClass(obj['my_prop']) - -Client Configuration --------------------- - -Max Attempts ------------- -The maximum number of times a query will be attempted if a retryable exception is thrown (ThrottlingError). Default 3, inclusive of the initial call. The retry strategy implemented is a simple exponential backoff. - -To disable retries, pass max_attempts less than or equal to 1. - -Max Backoff ------------- -The maximum backoff in seconds to be observed between each retry. Default 20 seconds. - -Timeouts --------- - -There are a few different timeout settings that can be configured; each comes with a default setting. We recommend that most applications use the defaults. - -Query Timeout -------------- - -The query timeout is the time, as ``datetime.timedelta``, that Fauna will spend executing your query before aborting with a ``QueryTimeoutError``. - -The query timeout can be set using the ``query_timeout`` option. The default value if you do not provide one is ``DefaultClientBufferTimeout`` (5 seconds). - -.. code-block:: python - - from datetime import timedelta - from fauna.client import Client - - client = Client(query_timeout=timedelta(seconds=20)) - -The query timeout can also be set to a different value for each query using the ``QueryOptions.query_timeout`` option. Doing so overrides the client configuration when performing this query. - -.. code-block:: python - - from datetime import timedelta - from fauna.client import Client, QueryOptions - - response = client.query(myQuery, QueryOptions(query_timeout=timedelta(seconds=20))) - -Client Timeout --------------- - -The client timeout is the time, as ``datetime.timedelta``, that the client will wait for a network response before canceling the request. 
If a client timeout occurs, the driver will throw an instance of ``NetworkError``. - -The client timeout is always the query timeout plus an additional buffer. This ensures that the client always waits for at least as long Fauna could work on your query and account for network latency. - -The client timeout buffer is configured by setting the ``client_buffer_timeout`` option. The default value for the buffer if you do not provide on is ``DefaultClientBufferTimeout`` (5 seconds), therefore the default client timeout is 10 seconds when considering the default query timeout. - -.. code-block:: python - - from datetime import timedelta - from fauna.client import Client - - client = Client(client_buffer_timeout=timedelta(seconds=20)) - - -Idle Timeout ------------- - -The idle timeout is the time, as ``datetime.timedelta``, that a session will remain open after there is no more pending communication. Once the session idle time has elapsed the session is considered idle and the session is closed. Subsequent requests will create a new session; the session idle timeout does not result in an error. - -Configure the idle timeout using the ``http_idle_timeout`` option. The default value if you do not provide one is ``DefaultIdleConnectionTimeout`` (5 seconds). - -.. code-block:: python - - from datetime import timedelta - from fauna.client import Client - - client = Client(http_idle_timeout=timedelta(seconds=6)) - -> **Note** -> Your application process may continue executing after all requests are completed for the duration of the session idle timeout. To prevent this, it is recommended to call ``Client.close()`` once all requests are complete. It is not recommended to set ``http_idle_timeout`` to small values. - -Connect Timeout ---------------- - -The connect timeout is the maximum amount of time, as ``datetime.timedelta``, to wait until a connection to Fauna is established. If the client is unable to connect within this time frame, a ``ConnectTimeout`` exception is raised. - -Configure the connect timeout using the ``http_connect_timeout`` option. The default value if you do not provide one is ``DefaultHttpConnectTimeout`` (5 seconds). - -.. code-block:: python - - from datetime import timedelta - from fauna.client import Client - - client = Client(http_connect_timeout=timedelta(seconds=6)) - -Pool Timeout ------------- - -The pool timeout specifies the maximum amount of time, as ``datetime.timedelta``, to wait for acquiring a connection from the connection pool. If the client is unable to acquire a connection within this time frame, a ``PoolTimeout`` exception is raised. This timeout may fire if 20 connections are currently in use and one isn't released before the timeout is up. - -Configure the pool timeout using the ``http_pool_timeout`` option. The default value if you do not provide one is ``DefaultHttpPoolTimeout`` (5 seconds). - -.. code-block:: python - - from datetime import timedelta - from fauna.client import Client - - client = Client(http_pool_timeout=timedelta(seconds=6)) - -Read Timeout ------------- - -The read timeout specifies the maximum amount of time, as ``datetime.timedelta``, to wait for a chunk of data to be received (for example, a chunk of the response body). If the client is unable to receive data within this time frame, a ``ReadTimeout`` exception is raised. - -Configure the read timeout using the ``http_read_timeout`` option. The default value if you do not provide one is ``DefaultHttpReadTimeout`` (None). - -.. 
code-block:: python - - from datetime import timedelta - from fauna.client import Client - - client = Client(http_read_timeout=timedelta(seconds=6)) - -Write Timeout -------------- - -The write timeout specifies the maximum amount of time, as ``datetime.timedelta``, to wait for a chunk of data to be sent (for example, a chunk of the request body). If the client is unable to send data within this time frame, a ``WriteTimeout`` exception is raised. - -Configure the write timeout using the ``http_write_timeout`` option. The default value if you do not provide one is ``DefaultHttpWriteTimeout`` (5 seconds). - -.. code-block:: python - - from datetime import timedelta - from fauna.client import Client - - client = Client(http_write_timeout=timedelta(seconds=6)) - -Query Stats ------------ - -Stats are returned on query responses and ServiceErrors. - -.. code-block:: python - - from fauna import fql - from fauna.client import Client - from fauna.encoding import QuerySuccess, QueryStats - from fauna.errors import ServiceError - - client = Client() - - def emit_stats(stats: QueryStats): - print(f"Compute Ops: {stats.compute_ops}") - print(f"Read Ops: {stats.read_ops}") - print(f"Write Ops: {stats.write_ops}") - - try: - q = fql('Collection.create({ name: "Dogs" })') - qs: QuerySuccess = client.query(q) - emit_stats(qs.stats) - except ServiceError as e: - if e.stats is not None: - emit_stats(e.stats) - # more error handling... - -Pagination ------------------- - -Use the ``Client.paginate()`` method to iterate sets that contain more than one -page of results. - -``Client.paginate()`` accepts the same query options as ``Client.query()``. - -Change the default items per page using FQL's ``.pageSize()`` method. - -.. code-block:: python - - from datetime import timedelta - from fauna import fql - from fauna.client import Client, QueryOptions - - # Adjust `pageSize()` size as needed. - query = fql( - """ - Product - .byName("limes") - .pageSize(60) { description }""" - ) - - client = Client() - - options = QueryOptions(query_timeout=timedelta(seconds=20)) - - pages = client.paginate(query, options) - - for products in pages: - for product in products: - print(products) - -Event Streaming ------------------- - -`Event Streaming `_ is currently available in the beta version of the driver: - -- `Beta Python driver `_ -- `Beta Python driver docs `_ - -Setup ------ - -.. code-block:: bash - - $ virtualenv venv - $ source venv/bin/activate - $ pip install . .[test] .[lint] - - -Testing -------- - -We use pytest. You can run tests directly or with docker. If you run integration tests directly, you must have fauna running locally. - -If you want to run fauna, then run integration tests separately: - -.. code-block:: bash - - $ make run-fauna - $ source venv/bin/activate - $ make install - $ make integration-test - -To run unit tests locally: - -.. code-block:: bash - - $ source venv/bin/activate - $ make install - $ make unit-test - -To stand up a container and run all tests at the same time: - -.. code-block:: bash - - $ make docker-test - -See the ``Makefile`` for more. - -Coverage --------- - -.. code-block:: bash - - $ source venv/bin/activate - $ make coverage - -Contribute ----------- - -GitHub pull requests are very welcome. - - -License -------- - -Copyright 2023 `Fauna, Inc. `_ - -Licensed under the Mozilla Public License, Version 2.0 (the -"License"); you may not use this software except in compliance with -the License. 
You can obtain a copy of the License at - -`http://mozilla.org/MPL/2.0/ `_ - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -implied. See the License for the specific language governing -permissions and limitations under the License. - - -.. _`tests`: https://github.com/fauna/fauna-python/blob/main/tests/ diff --git a/docker/Dockerfile b/docker/Dockerfile new file mode 100644 index 00000000..0620f3df --- /dev/null +++ b/docker/Dockerfile @@ -0,0 +1,10 @@ +ARG BASE_IMG=? + +FROM $BASE_IMG + +WORKDIR /fauna-python +VOLUME /fauna-python + +COPY . /fauna-python/ + +RUN cd /fauna-python && pip install . .[test] .[lint] --use-pep517 diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml new file mode 100644 index 00000000..7127c868 --- /dev/null +++ b/docker/docker-compose.yml @@ -0,0 +1,38 @@ +version: "3" + +services: + db: + image: fauna/faunadb:latest + ports: + - "8443:8443" + volumes: + - ../docker/feature-flags.json:/etc/feature-flag-periodic.d/feature-flags.json + test: + image: fauna-python-test:latest + build: + context: ../ + dockerfile: docker/Dockerfile + integration-test: + image: fauna-python-test:latest + volumes: + - ..:/fauna-python + command: + - /bin/bash + - -cx + - | + while ! curl -s --fail -m 1 http://db:8443/ping 2>&1; do sleep 3; done + pytest -v --cov=fauna --cov-context=test tests/integration + environment: + - FAUNA_ENDPOINT=http://db:8443 + depends_on: + - db + unit-test: + image: fauna-python-test:latest + volumes: + - ..:/fauna-python + command: pytest -v --cov=fauna --cov-context=test tests/unit + coverage: + image: fauna-python-test:latest + volumes: + - ..:/fauna-python + command: coverage html --show-contexts diff --git a/docker/feature-flags.json b/docker/feature-flags.json new file mode 100644 index 00000000..2fc48a0a --- /dev/null +++ b/docker/feature-flags.json @@ -0,0 +1,27 @@ +{ + "version": 1, + "properties": [ + { + "property_name": "cluster_name", + "property_value": "fauna", + "flags": { + "fql2_schema": true, + "fqlx_typecheck_default": true, + "persisted_fields": true, + "changes_by_collection_index": true, + "fql2_streams": true + } + }, + { + "property_name": "account_id", + "property_value": 0, + "flags": { + "fql2_schema": true, + "fqlx_typecheck_default": true, + "persisted_fields": true, + "changes_by_collection_index": true, + "fql2_streams": true + } + } + ] +} diff --git a/fauna/client/__init__.py b/fauna/client/__init__.py index 88446fb6..73ad357a 100644 --- a/fauna/client/__init__.py +++ b/fauna/client/__init__.py @@ -1,3 +1,3 @@ -from .client import Client, QueryOptions +from .client import Client, QueryOptions, StreamOptions from .endpoints import Endpoints from .headers import Header diff --git a/fauna/client/client.py b/fauna/client/client.py index ecc319c9..a4a6ce42 100644 --- a/fauna/client/client.py +++ b/fauna/client/client.py @@ -1,15 +1,16 @@ from datetime import timedelta from dataclasses import dataclass -from typing import Any, Dict, Iterator, Mapping, Optional, List +from typing import Any, Dict, Iterator, Mapping, Optional, List, Union +from contextlib import contextmanager import fauna from fauna.client.retryable import Retryable -from fauna.errors import AuthenticationError, ClientError, ProtocolError, ServiceError, AuthorizationError, \ - ServiceInternalError, ServiceTimeoutError, ThrottlingError, QueryTimeoutError, QueryRuntimeError, \ - QueryCheckError, 
ContendedTransactionError, AbortError, InvalidRequestError
+from fauna.errors import FaunaError, ClientError, ProtocolError, \
+    RetryableFaunaException, NetworkError
 from fauna.client.headers import _DriverEnvironment, _Header, _Auth, Header
 from fauna.http.http_client import HTTPClient
 from fauna.query import Query, Page, fql
+from fauna.query.models import StreamToken
 from fauna.client.utils import _Environment, LastTxnTs
 from fauna.encoding import FaunaEncoder, FaunaDecoder
 from fauna.encoding import QuerySuccess, ConstraintFailure, QueryTags, QueryStats
@@ -48,6 +49,26 @@ class QueryOptions:
   additional_headers: Optional[Dict[str, str]] = None
 
 
+@dataclass
+class StreamOptions:
+  """
+  A dataclass representing options available for a stream.
+
+  * max_attempts - The maximum number of times to attempt a stream query when a retryable exception is thrown.
+  * max_backoff - The maximum backoff in seconds for an individual retry.
+  * start_ts - The starting timestamp of the stream, exclusive. If set, Fauna will return events starting after
+  the timestamp.
+  * status_events - Indicates if the stream should include status events. Status events are periodic events that
+  update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics
+  about the cost of maintaining the stream other than the cost of the received events.
+  """
+
+  max_attempts: Optional[int] = None
+  max_backoff: Optional[int] = None
+  start_ts: Optional[int] = None
+  status_events: bool = False
+
+
 class Client:
 
   def __init__(
@@ -275,7 +296,7 @@ def query(
     except Exception as e:
       raise ClientError("Failed to encode Query") from e
 
-    retryable = Retryable(
+    retryable = Retryable[QuerySuccess](
        self._max_attempts,
        self._max_backoff,
        self._query,
@@ -350,7 +371,7 @@ def _query(
     dec: Any = FaunaDecoder.decode(response_json)
 
     if status_code > 399:
-      self._handle_error(dec, status_code)
+      FaunaError.parse_error_and_throw(dec, status_code)
 
     if "txn_ts" in dec:
       self.set_last_txn_ts(int(response_json["txn_ts"]))
@@ -375,6 +396,42 @@ def _query(
         schema_version=schema_version,
     )
 
+  def stream(
+      self,
+      fql: Union[StreamToken, Query],
+      opts: StreamOptions = StreamOptions()
+  ) -> "StreamIterator":
+    """
+    Opens a Stream in Fauna and returns an iterator that consumes Fauna events.
+
+    :param fql: A StreamToken, or a Query that returns a StreamToken.
+    :param opts: (Optional) Stream Options.
+
+    :return: a :class:`StreamIterator`
+
+    :raises NetworkError: HTTP Request failed in transit
+    :raises ProtocolError: HTTP error not from Fauna
+    :raises ServiceError: Fauna returned an error
+    :raises ValueError: Encoding and decoding errors
+    :raises TypeError: Invalid param types
+    """
+
+    if isinstance(fql, Query):
+      token = self.query(fql).data
+    else:
+      token = fql
+
+    if not isinstance(token, StreamToken):
+      err_msg = f"'fql' must be a StreamToken, or a Query that returns a StreamToken but was a {type(token)}."
+      raise TypeError(err_msg)
+
+    headers = self._headers.copy()
+    headers[_Header.Format] = "tagged"
+    headers[_Header.Authorization] = self._auth.bearer()
+
+    return StreamIterator(self._session, headers, self._endpoint + "/stream/1",
+                          self._max_attempts, self._max_backoff, opts, token)
+
   def _check_protocol(self, response_json: Any, status_code):
     # TODO: Logic to validate wire protocol belongs elsewhere.
should_raise = False @@ -398,177 +455,6 @@ def _check_protocol(self, response_json: Any, status_code): f"Response is in an unknown format: \n{response_json}", ) - def _handle_error(self, body: Any, status_code: int): - err = body["error"] - code = err["code"] - message = err["message"] - - query_tags = QueryTags.decode( - body["query_tags"]) if "query_tags" in body else None - stats = QueryStats(body["stats"]) if "stats" in body else None - txn_ts = body["txn_ts"] if "txn_ts" in body else None - schema_version = body["schema_version"] if "schema_version" in body else None - summary = body["summary"] if "summary" in body else None - - constraint_failures: Optional[List[ConstraintFailure]] = None - if "constraint_failures" in err: - constraint_failures = [ - ConstraintFailure( - message=cf["message"], - name=cf["name"] if "name" in cf else None, - paths=cf["paths"] if "paths" in cf else None, - ) for cf in err["constraint_failures"] - ] - - if status_code == 400: - if code == "invalid_query": - raise QueryCheckError( - status_code=status_code, - code=code, - message=message, - summary=summary, - constraint_failures=constraint_failures, - query_tags=query_tags, - stats=stats, - txn_ts=txn_ts, - schema_version=schema_version, - ) - elif code == "invalid_request": - raise InvalidRequestError( - status_code=status_code, - code=code, - message=message, - summary=summary, - constraint_failures=constraint_failures, - query_tags=query_tags, - stats=stats, - txn_ts=txn_ts, - schema_version=schema_version, - ) - elif code == "abort": - abort = err["abort"] if "abort" in err else None - raise AbortError( - status_code=status_code, - code=code, - message=message, - summary=summary, - abort=abort, - constraint_failures=constraint_failures, - query_tags=query_tags, - stats=stats, - txn_ts=txn_ts, - schema_version=schema_version, - ) - - else: - raise QueryRuntimeError( - status_code=status_code, - code=code, - message=message, - summary=summary, - constraint_failures=constraint_failures, - query_tags=query_tags, - stats=stats, - txn_ts=txn_ts, - schema_version=schema_version, - ) - elif status_code == 401: - raise AuthenticationError( - status_code=status_code, - code=code, - message=message, - summary=summary, - constraint_failures=constraint_failures, - query_tags=query_tags, - stats=stats, - txn_ts=txn_ts, - schema_version=schema_version, - ) - elif status_code == 403: - raise AuthorizationError( - status_code=status_code, - code=code, - message=message, - summary=summary, - constraint_failures=constraint_failures, - query_tags=query_tags, - stats=stats, - txn_ts=txn_ts, - schema_version=schema_version, - ) - elif status_code == 409: - raise ContendedTransactionError( - status_code=status_code, - code=code, - message=message, - summary=summary, - constraint_failures=constraint_failures, - query_tags=query_tags, - stats=stats, - txn_ts=txn_ts, - schema_version=schema_version, - ) - elif status_code == 429: - raise ThrottlingError( - status_code=status_code, - code=code, - message=message, - summary=summary, - constraint_failures=constraint_failures, - query_tags=query_tags, - stats=stats, - txn_ts=txn_ts, - schema_version=schema_version, - ) - elif status_code == 440: - raise QueryTimeoutError( - status_code=status_code, - code=code, - message=message, - summary=summary, - constraint_failures=constraint_failures, - query_tags=query_tags, - stats=stats, - txn_ts=txn_ts, - schema_version=schema_version, - ) - elif status_code == 500: - raise ServiceInternalError( - status_code=status_code, - code=code, - 
message=message,
-          summary=summary,
-          constraint_failures=constraint_failures,
-          query_tags=query_tags,
-          stats=stats,
-          txn_ts=txn_ts,
-          schema_version=schema_version,
-      )
-    elif status_code == 503:
-      raise ServiceTimeoutError(
-          status_code=status_code,
-          code=code,
-          message=message,
-          summary=summary,
-          constraint_failures=constraint_failures,
-          query_tags=query_tags,
-          stats=stats,
-          txn_ts=txn_ts,
-          schema_version=schema_version,
-      )
-    else:
-      raise ServiceError(
-          status_code=status_code,
-          code=code,
-          message=message,
-          summary=summary,
-          constraint_failures=constraint_failures,
-          query_tags=query_tags,
-          stats=stats,
-          txn_ts=txn_ts,
-          schema_version=schema_version,
-      )
-
   def _set_endpoint(self, endpoint):
     if endpoint is None:
       endpoint = _Environment.EnvFaunaEndpoint()
@@ -579,6 +465,105 @@ def _set_endpoint(self, endpoint):
 
     self._endpoint = endpoint
 
 
+class StreamIterator:
+  """A class that mixes a ContextManager and an Iterator so we can detect retryable errors."""
+
+  def __init__(self, http_client: HTTPClient, headers: Dict[str, str],
+               endpoint: str, max_attempts: int, max_backoff: int,
+               opts: StreamOptions, token: StreamToken):
+    self._http_client = http_client
+    self._headers = headers
+    self._endpoint = endpoint
+    self._max_attempts = max_attempts
+    self._max_backoff = max_backoff
+    self._opts = opts
+    self._token = token
+    self._stream = None
+    self.last_ts = None
+    self._ctx = self._create_stream()
+
+  def __enter__(self):
+    return self
+
+  def __exit__(self, exc_type, exc_value, exc_traceback):
+    if self._stream is not None:
+      self._stream.close()
+
+    self._ctx.__exit__(exc_type, exc_value, exc_traceback)
+    return False
+
+  def __iter__(self):
+    return self
+
+  def __next__(self):
+    if self._opts.max_attempts is not None:
+      max_attempts = self._opts.max_attempts
+    else:
+      max_attempts = self._max_attempts
+
+    if self._opts.max_backoff is not None:
+      max_backoff = self._opts.max_backoff
+    else:
+      max_backoff = self._max_backoff
+
+    retryable = Retryable[Any](max_attempts, max_backoff, self._next_element)
+    return retryable.run().response
+
+  def _next_element(self):
+    try:
+      if self._stream is None:
+        try:
+          self._stream = self._ctx.__enter__()
+        except Exception:
+          self._retry_stream()
+
+      if self._stream is not None:
+        event: Any = FaunaDecoder.decode(next(self._stream))
+
+        if event["type"] == "error":
+          FaunaError.parse_error_and_throw(event, 400)
+
+        self.last_ts = event["txn_ts"]
+
+        if event["type"] == "start":
+          return self._next_element()
+
+        if not self._opts.status_events and event["type"] == "status":
+          return self._next_element()
+
+        return event
+
+      raise StopIteration
+    except NetworkError:
+      self._retry_stream()
+
+  def _retry_stream(self):
+    if self._stream is not None:
+      self._stream.close()
+
+    self._stream = None
+
+    try:
+      self._ctx = self._create_stream()
+    except Exception:
+      pass
+    raise RetryableFaunaException
+
+  def _create_stream(self):
+    data: Dict[str, Any] = {"token": self._token.token}
+    if self.last_ts is not None:
+      data["start_ts"] = self.last_ts
+    elif self._opts.start_ts is not None:
+      data["start_ts"] = self._opts.start_ts
+
+    return self._http_client.stream(
+        url=self._endpoint, headers=self._headers, data=data)
+
+  def close(self):
+    if self._stream is not None:
+      self._stream.close()
+
+
 class QueryIterator:
   """A class to provide an iterator on top of Fauna queries."""
diff --git a/fauna/client/retryable.py b/fauna/client/retryable.py
index 3d5e184f..e55ad309 100644
--- a/fauna/client/retryable.py
+++
b/fauna/client/retryable.py @@ -2,7 +2,7 @@ from dataclasses import dataclass from random import random from time import sleep -from typing import Callable, Optional +from typing import Callable, Optional, TypeVar, Generic from fauna.encoding import QuerySuccess from fauna.errors import RetryableFaunaException, ClientError @@ -28,15 +28,18 @@ def wait(self) -> float: return min(backoff, self._max_backoff) +T = TypeVar('T') + + @dataclass -class RetryableResponse: +class RetryableResponse(Generic[T]): attempts: int - response: QuerySuccess + response: T -class Retryable: +class Retryable(Generic[T]): """ - Retryable is a wrapper class that acts on a Callable that returns a QuerySuccess. + Retryable is a wrapper class that acts on a Callable that returns a T type. """ _strategy: RetryStrategy _error: Optional[Exception] @@ -45,7 +48,7 @@ def __init__( self, max_attempts: int, max_backoff: int, - func: Callable[..., QuerySuccess], + func: Callable[..., T], *args, **kwargs, ): @@ -56,13 +59,12 @@ def __init__( self._kwargs = kwargs self._error = None - def run(self) -> RetryableResponse: + def run(self) -> RetryableResponse[T]: """Runs the wrapped function. Retries up to max_attempts if the function throws a RetryableFaunaException. It propagates the thrown exception if max_attempts is reached or if a non-retryable is thrown. Returns the number of attempts and the response """ - err: Optional[RetryableFaunaException] = None attempt = 0 while True: sleep_time = 0.0 if attempt == 0 else self._strategy.wait() @@ -71,7 +73,7 @@ def run(self) -> RetryableResponse: try: attempt += 1 qs = self._func(*self._args, **self._kwargs) - return RetryableResponse(attempt, qs) + return RetryableResponse[T](attempt, qs) except RetryableFaunaException as e: if attempt >= self._max_attempts: raise e diff --git a/fauna/encoding/decoder.py b/fauna/encoding/decoder.py index e3dcbe8d..5e1d9426 100644 --- a/fauna/encoding/decoder.py +++ b/fauna/encoding/decoder.py @@ -3,7 +3,7 @@ from iso8601 import parse_date from fauna.query.models import Module, DocumentReference, Document, NamedDocument, NamedDocumentReference, Page, \ - NullDocument + NullDocument, StreamToken class FaunaDecoder: @@ -42,6 +42,8 @@ class FaunaDecoder: +--------------------+---------------+ | Page | @set | +--------------------+---------------+ + | StreamToken | @stream | + +--------------------+---------------+ """ @@ -59,6 +61,7 @@ def decode(obj: Any): - { "@ref": ... } decodes to a DocumentReference or NamedDocumentReference - { "@mod": ... } decodes to a Module - { "@set": ... } decodes to a Page + - { "@stream": ... 
} decodes to a StreamToken :param obj: the object to decode """ @@ -165,4 +168,7 @@ def _decode_dict(dct: dict, escaped: bool): return Page(data=data, after=after) + if "@stream" in dct: + return StreamToken(dct["@stream"]) + return {k: FaunaDecoder._decode(v) for k, v in dct.items()} diff --git a/fauna/encoding/encoder.py b/fauna/encoding/encoder.py index d0b2d074..f2e12b85 100644 --- a/fauna/encoding/encoder.py +++ b/fauna/encoding/encoder.py @@ -1,7 +1,7 @@ from datetime import datetime, date from typing import Any, Optional, List -from fauna.query.models import DocumentReference, Module, Document, NamedDocument, NamedDocumentReference, NullDocument +from fauna.query.models import DocumentReference, Module, Document, NamedDocument, NamedDocumentReference, NullDocument, StreamToken from fauna.query.query_builder import Query, Fragment, LiteralFragment, ValueFragment _RESERVED_TAGS = [ @@ -58,6 +58,8 @@ class FaunaEncoder: +-------------------------------+---------------+ | TemplateFragment | string | +-------------------------------+---------------+ + | StreamToken | string | + +-------------------------------+---------------+ """ @@ -76,6 +78,7 @@ def encode(obj: Any) -> Any: - Query encodes to { "fql": [...] } - ValueFragment encodes to { "value": } - LiteralFragment encodes to a string + - StreamToken encodes to a string :raises ValueError: If value cannot be encoded, cannot be encoded safely, or there's a circular reference. :param obj: the object to decode @@ -151,6 +154,10 @@ def from_fragment(obj: Fragment): def from_query_interpolation_builder(obj: Query): return {"fql": [FaunaEncoder.from_fragment(f) for f in obj.fragments]} + @staticmethod + def from_streamtoken(obj: StreamToken): + return {"@stream": obj.token} + @staticmethod def _encode(o: Any, _markers: Optional[List] = None): if _markers is None: @@ -191,6 +198,8 @@ def _encode(o: Any, _markers: Optional[List] = None): return FaunaEncoder._encode_dict(o, _markers) elif isinstance(o, Query): return FaunaEncoder.from_query_interpolation_builder(o) + elif isinstance(o, StreamToken): + return FaunaEncoder.from_streamtoken(o) else: raise ValueError(f"Object {o} of type {type(o)} cannot be encoded") diff --git a/fauna/errors/errors.py b/fauna/errors/errors.py index ce868b31..0c84da5e 100644 --- a/fauna/errors/errors.py +++ b/fauna/errors/errors.py @@ -1,6 +1,6 @@ from typing import Optional, List, Any, Mapping -from fauna.encoding import ConstraintFailure, QueryStats, QueryInfo +from fauna.encoding import ConstraintFailure, QueryStats, QueryInfo, QueryTags class FaunaException(Exception): @@ -84,6 +84,201 @@ def __init__( def __str__(self): return f"{self.status_code}: {self.code}\n{self.message}" + @staticmethod + def parse_error_and_throw(body: Any, status_code: int): + err = body["error"] + code = err["code"] + message = err["message"] + + query_tags = QueryTags.decode( + body["query_tags"]) if "query_tags" in body else None + stats = QueryStats(body["stats"]) if "stats" in body else None + txn_ts = body["txn_ts"] if "txn_ts" in body else None + schema_version = body["schema_version"] if "schema_version" in body else None + summary = body["summary"] if "summary" in body else None + + constraint_failures: Optional[List[ConstraintFailure]] = None + if "constraint_failures" in err: + constraint_failures = [ + ConstraintFailure( + message=cf["message"], + name=cf["name"] if "name" in cf else None, + paths=cf["paths"] if "paths" in cf else None, + ) for cf in err["constraint_failures"] + ] + + if status_code >= 400 and status_code 
< 500: + if code == "invalid_query": + raise QueryCheckError( + status_code=400, + code=code, + message=message, + summary=summary, + constraint_failures=constraint_failures, + query_tags=query_tags, + stats=stats, + txn_ts=txn_ts, + schema_version=schema_version, + ) + elif code == "invalid_request": + raise InvalidRequestError( + status_code=400, + code=code, + message=message, + summary=summary, + constraint_failures=constraint_failures, + query_tags=query_tags, + stats=stats, + txn_ts=txn_ts, + schema_version=schema_version, + ) + elif code == "abort": + abort = err["abort"] if "abort" in err else None + raise AbortError( + status_code=400, + code=code, + message=message, + summary=summary, + abort=abort, + constraint_failures=constraint_failures, + query_tags=query_tags, + stats=stats, + txn_ts=txn_ts, + schema_version=schema_version, + ) + elif code == "unauthorized": + raise AuthenticationError( + status_code=401, + code=code, + message=message, + summary=summary, + constraint_failures=constraint_failures, + query_tags=query_tags, + stats=stats, + txn_ts=txn_ts, + schema_version=schema_version, + ) + elif code == "forbidden" and status_code == 403: + raise AuthorizationError( + status_code=403, + code=code, + message=message, + summary=summary, + constraint_failures=constraint_failures, + query_tags=query_tags, + stats=stats, + txn_ts=txn_ts, + schema_version=schema_version, + ) + elif code == "method_not_allowed": + raise QueryRuntimeError( + status_code=405, + code=code, + message=message, + summary=summary, + constraint_failures=constraint_failures, + query_tags=query_tags, + stats=stats, + txn_ts=txn_ts, + schema_version=schema_version, + ) + elif code == "conflict": + raise ContendedTransactionError( + status_code=409, + code=code, + message=message, + summary=summary, + constraint_failures=constraint_failures, + query_tags=query_tags, + stats=stats, + txn_ts=txn_ts, + schema_version=schema_version, + ) + elif code == "request_size_exceeded": + raise QueryRuntimeError( + status_code=413, + code=code, + message=message, + summary=summary, + constraint_failures=constraint_failures, + query_tags=query_tags, + stats=stats, + txn_ts=txn_ts, + schema_version=schema_version, + ) + elif code == "limit_exceeded": + raise ThrottlingError( + status_code=429, + code=code, + message=message, + summary=summary, + constraint_failures=constraint_failures, + query_tags=query_tags, + stats=stats, + txn_ts=txn_ts, + schema_version=schema_version, + ) + elif code == "time_out": + raise QueryTimeoutError( + status_code=440, + code=code, + message=message, + summary=summary, + constraint_failures=constraint_failures, + query_tags=query_tags, + stats=stats, + txn_ts=txn_ts, + schema_version=schema_version, + ) + else: + raise QueryRuntimeError( + status_code=status_code, + code=code, + message=message, + summary=summary, + constraint_failures=constraint_failures, + query_tags=query_tags, + stats=stats, + txn_ts=txn_ts, + schema_version=schema_version, + ) + elif status_code == 500: + raise ServiceInternalError( + status_code=status_code, + code=code, + message=message, + summary=summary, + constraint_failures=constraint_failures, + query_tags=query_tags, + stats=stats, + txn_ts=txn_ts, + schema_version=schema_version, + ) + elif status_code == 503: + raise ServiceTimeoutError( + status_code=status_code, + code=code, + message=message, + summary=summary, + constraint_failures=constraint_failures, + query_tags=query_tags, + stats=stats, + txn_ts=txn_ts, + schema_version=schema_version, + ) + else: + 
raise ServiceError(
+          status_code=status_code,
+          code=code,
+          message=message,
+          summary=summary,
+          constraint_failures=constraint_failures,
+          query_tags=query_tags,
+          stats=stats,
+          txn_ts=txn_ts,
+          schema_version=schema_version,
+      )
+
 
 class ServiceError(FaunaError, QueryInfo):
   """An error representing a query failure returned by Fauna."""
diff --git a/fauna/http/http_client.py b/fauna/http/http_client.py
index 205525f4..bff18f05 100644
--- a/fauna/http/http_client.py
+++ b/fauna/http/http_client.py
@@ -1,6 +1,7 @@
 import abc
+import contextlib
 
-from typing import Iterator, Mapping, Any
+from typing import Iterator, Mapping, Any, Optional
 
 from dataclasses import dataclass
 
@@ -62,6 +63,7 @@ def request(
     pass
 
   @abc.abstractmethod
+  @contextlib.contextmanager
   def stream(
       self,
       url: str,
diff --git a/fauna/http/httpx_client.py b/fauna/http/httpx_client.py
index a8395f08..6625b672 100644
--- a/fauna/http/httpx_client.py
+++ b/fauna/http/httpx_client.py
@@ -1,6 +1,7 @@
 import json
 from json import JSONDecodeError
 from typing import Mapping, Any, Optional, Iterator
+from contextlib import contextmanager
 
 import httpx
 
@@ -93,13 +94,25 @@ def _send_with_retry(
     else:
       return self._send_with_retry(retryCount - 1, request)
 
+  @contextmanager
   def stream(
      self,
      url: str,
      headers: Mapping[str, str],
      data: Mapping[str, Any],
-  ) -> Iterator[HTTPResponse]:
-    raise NotImplementedError()
+  ) -> Iterator[Any]:
+    with self._c.stream(
+        "POST", url=url, headers=headers, json=data) as response:
+      yield self._transform(response)
+
+  def _transform(self, response):
+    try:
+      for line in response.iter_lines():
+        yield json.loads(line)
+    except httpx.ReadTimeout as e:
+      raise NetworkError("Stream timeout") from e
+    except (httpx.HTTPError, httpx.InvalidURL) as e:
+      raise NetworkError("Exception re-raised from HTTP request") from e
 
   def close(self):
     self._c.close()
diff --git a/fauna/query/models.py b/fauna/query/models.py
index bca1a2a4..c496c058 100644
--- a/fauna/query/models.py
+++ b/fauna/query/models.py
@@ -36,6 +36,19 @@ def __ne__(self, other):
     return not self.__eq__(other)
 
 
+class StreamToken:
+  """A class representing a Stream in Fauna."""
+
+  def __init__(self, token: str):
+    self.token = token
+
+  def __eq__(self, other):
+    return isinstance(other, StreamToken) and self.token == other.token
+
+  def __hash__(self):
+    return hash(self.token)
+
+
 class Module:
   """A class representing a Module in Fauna. Examples of modules include Collection,
   Math, and a user-defined collection, among others.
@@ -102,7 +115,7 @@ def __init__(self, coll: Union[str, Module], id: str): self._id = id def __hash__(self): - hash((type(self), self._collection, self._id)) + return hash((type(self), self._collection, self._id)) def __repr__(self): return f"{self.__class__.__name__}(id={repr(self._id)},coll={repr(self._collection)})" @@ -136,7 +149,7 @@ def __init__(self, coll: Union[str, Module], name: str): self._name = name def __hash__(self): - hash((type(self), self._collection, self._name)) + return hash((type(self), self._collection, self._name)) def __repr__(self): return f"{self.__class__.__name__}(name={repr(self._name)},coll={repr(self._collection)})" diff --git a/setup.py b/setup.py index 35119ecd..119ec9c5 100644 --- a/setup.py +++ b/setup.py @@ -9,7 +9,7 @@ # Load the README file for use in the long description local_dir = path.abspath(path.dirname(__file__)) -with open(path.join(local_dir, "README.rst"), encoding="utf-8") as f: +with open(path.join(local_dir, "README.md"), encoding="utf-8") as f: long_description = f.read() requires = [ @@ -31,13 +31,13 @@ version=pkg_version, description="Fauna Python driver for FQL 10+", long_description=long_description, - long_description_content_type="text/x-rst", + long_description_content_type="text/markdown", url="https://github.com/fauna/fauna-python", author=pkg_author, author_email="priority@fauna.com", license=pkg_license, classifiers=[ - "Development Status :: 4 - Beta", + "Development Status :: 5 - Production/Stable", "Intended Audience :: Developers", "License :: OSI Approved :: Mozilla Public License 2.0 (MPL 2.0)", "Programming Language :: Python :: 3", diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 62ccdc48..0f925b02 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -6,6 +6,7 @@ from fauna import fql, Module from fauna.client import Client +from fauna.client.utils import _Environment from fauna.query import Query @@ -13,6 +14,10 @@ def create_collection(name) -> Query: return fql('Collection.create({ name: ${name} })', name=name) +def create_database(name) -> Query: + return fql('Database.create({ name: ${name} })', name=name) + + @pytest.fixture def suffix() -> str: letters = string.ascii_lowercase @@ -24,6 +29,18 @@ def client() -> Client: return Client() +@pytest.fixture +def scoped_client(scoped_secret) -> Client: + return Client(secret=scoped_secret) + + +@pytest.fixture +def scoped_secret(client, suffix) -> str: + db_name = f"Test_{suffix}" + client.query(create_database(db_name)) + return f"{_Environment.EnvFaunaSecret()}:{db_name}:admin" + + @pytest.fixture def a_collection(client, suffix) -> Module: col_name = f"Test_{suffix}" diff --git a/tests/integration/test_client_with_query_limits.py b/tests/integration/test_client_with_query_limits.py index 0d95948a..6f5208fb 100644 --- a/tests/integration/test_client_with_query_limits.py +++ b/tests/integration/test_client_with_query_limits.py @@ -1,6 +1,6 @@ from multiprocessing.pool import ThreadPool -import os from typing import Optional +import os import pytest diff --git a/tests/integration/test_stream.py b/tests/integration/test_stream.py new file mode 100644 index 00000000..484c064e --- /dev/null +++ b/tests/integration/test_stream.py @@ -0,0 +1,235 @@ +import threading +import time +import pytest +import httpx +import fauna +from fauna import fql +from fauna.client import Client, StreamOptions +from fauna.http.httpx_client import HTTPXClient +from fauna.errors import NetworkError, RetryableFaunaException, 
QueryRuntimeError + + +def test_stream(scoped_client): + scoped_client.query(fql("Collection.create({name: 'Product'})")) + + events = [] + + def thread_fn(): + stream = scoped_client.stream(fql("Product.all().toStream()")) + + with stream as iter: + for evt in iter: + events.append(evt["type"]) + + # close after 3 events + if len(events) == 3: + iter.close() + + stream_thread = threading.Thread(target=thread_fn) + stream_thread.start() + + # adds a delay so the thread can open the stream, + # otherwise we could miss some events + time.sleep(0.5) + + id = scoped_client.query(fql("Product.create({}).id")).data + scoped_client.query(fql("Product.byId(${id})!.delete()", id=id)) + scoped_client.query(fql("Product.create({})")) + scoped_client.query(fql("Product.create({})")) + + stream_thread.join() + assert events == ["add", "remove", "add"] + + +def test_close_method(scoped_client): + scoped_client.query(fql("Collection.create({name: 'Product'})")) + + events = [] + + def thread_fn(): + stream = scoped_client.stream(fql("Product.all().toStream()")) + + with stream as iter: + for evt in iter: + events.append(evt["type"]) + + # close after 2 events + if len(events) == 2: + iter.close() + + stream_thread = threading.Thread(target=thread_fn) + stream_thread.start() + + # adds a delay so the thread can open the stream, + # otherwise we could miss some events + time.sleep(0.5) + + scoped_client.query(fql("Product.create({})")) + scoped_client.query(fql("Product.create({})")) + + stream_thread.join() + assert events == ["add", "add"] + + +def test_error_on_stream(scoped_client): + scoped_client.query(fql("Collection.create({name: 'Product'})")) + + def thread_fn(): + stream = scoped_client.stream(fql("Product.all().map(.foo / 0).toStream()")) + + with pytest.raises(QueryRuntimeError): + with stream as iter: + for evt in iter: + pass + + stream_thread = threading.Thread(target=thread_fn) + stream_thread.start() + + # adds a delay so the thread can open the stream, + # otherwise we could miss some events + time.sleep(0.5) + + scoped_client.query(fql("Product.create({foo: 10})")) + scoped_client.query(fql("Product.create({foo: 10})")) + + stream_thread.join() + + +def test_max_retries(scoped_secret): + scoped_client = Client(secret=scoped_secret) + scoped_client.query(fql("Collection.create({name: 'Product'})")) + + httpx_client = httpx.Client(http1=False, http2=True) + client = Client( + secret=scoped_secret, + http_client=HTTPXClient(httpx_client), + max_attempts=3, + max_backoff=0) + + count = [0] + + def stream_func(*args, **kwargs): + count[0] += 1 + raise NetworkError('foo') + + httpx_client.stream = stream_func + + count[0] = 0 + with pytest.raises(RetryableFaunaException): + with client.stream(fql("Product.all().toStream()")) as iter: + events = [evt["type"] for evt in iter] + assert count[0] == 3 + + count[0] = 0 + with pytest.raises(RetryableFaunaException): + opts = StreamOptions(max_attempts=5) + with client.stream(fql("Product.all().toStream()"), opts) as iter: + events = [evt["type"] for evt in iter] + assert count[0] == 5 + + +def test_last_ts_is_monotonic(scoped_client): + scoped_client.query(fql("Collection.create({name: 'Product'})")) + + events = [] + + def thread_fn(): + stream = scoped_client.stream(fql("Product.all().toStream()")) + + with stream as iter: + last_ts = 0 + + for evt in iter: + assert iter.last_ts > last_ts + + last_ts = iter.last_ts + + events.append(evt["type"]) + + # close after 3 events + if len(events) == 3: + iter.close() + + stream_thread = 
threading.Thread(target=thread_fn)
+  stream_thread.start()
+
+  # adds a delay so the thread can open the stream,
+  # otherwise we could miss some events
+  time.sleep(0.5)
+
+  scoped_client.query(fql("Product.create({})"))
+  scoped_client.query(fql("Product.create({})"))
+  scoped_client.query(fql("Product.create({})"))
+  scoped_client.query(fql("Product.create({})"))
+
+  stream_thread.join()
+  assert events == ["add", "add", "add"]
+
+
+def test_providing_start_ts(scoped_client):
+  scoped_client.query(fql("Collection.create({name: 'Product'})"))
+
+  stream_token = scoped_client.query(fql("Product.all().toStream()")).data
+
+  createOne = scoped_client.query(fql("Product.create({})"))
+  createTwo = scoped_client.query(fql("Product.create({})"))
+  createThree = scoped_client.query(fql("Product.create({})"))
+
+  events = []
+
+  def thread_fn():
+    # Replay excludes the ts that was passed in; it provides events for
+    # all ts after the one provided.
+    stream = scoped_client.stream(stream_token,
+                                  StreamOptions(start_ts=createOne.txn_ts))
+    with stream as iter:
+      for event in iter:
+        events.append(event)
+        if len(events) == 3:
+          iter.close()
+
+  stream_thread = threading.Thread(target=thread_fn)
+  stream_thread.start()
+
+  # adds a delay so the thread can open the stream,
+  # otherwise we could miss some events
+  time.sleep(0.5)
+
+  createFour = scoped_client.query(fql("Product.create({})"))
+  stream_thread.join()
+  assert events[0]["txn_ts"] == createTwo.txn_ts
+  assert events[1]["txn_ts"] == createThree.txn_ts
+  assert events[2]["txn_ts"] == createFour.txn_ts
+
+
+@pytest.mark.xfail(reason="not currently supported by core")
+def test_handle_status_events(scoped_client):
+  scoped_client.query(fql("Collection.create({name: 'Product'})"))
+
+  events = []
+
+  def thread_fn():
+    opts = StreamOptions(status_events=True)
+    stream = scoped_client.stream(fql("Product.all().toStream()"), opts)
+
+    with stream as iter:
+      for evt in iter:
+        events.append(evt["type"])
+
+        # close after 3 events
+        if len(events) == 3:
+          iter.close()
+
+  stream_thread = threading.Thread(target=thread_fn)
+  stream_thread.start()
+
+  # adds a delay so the thread can open the stream,
+  # otherwise we could miss some events
+  time.sleep(0.5)
+
+  scoped_client.query(fql("Product.create({})"))
+  scoped_client.query(fql("Product.create({})"))
+  scoped_client.query(fql("Product.create({})"))
+  scoped_client.query(fql("Product.create({})"))
+
+  stream_thread.join()
+  assert events == ["status", "add", "add"]
diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py
index 247a64fc..da390cf4 100644
--- a/tests/unit/test_client.py
+++ b/tests/unit/test_client.py
@@ -1,15 +1,17 @@
+import json
 from datetime import timedelta
-from typing import Dict
+from typing import Dict, List, Any
 
 import httpx
 import pytest
 import pytest_subtests
-from pytest_httpx import HTTPXMock
+from pytest_httpx import HTTPXMock, IteratorStream
 
 import fauna
 from fauna import fql
-from fauna.client import Client, Header, QueryOptions, Endpoints
-from fauna.errors import QueryCheckError, ProtocolError, QueryRuntimeError
+from fauna.client import Client, Header, QueryOptions, Endpoints, StreamOptions
+from fauna.errors import QueryCheckError, ProtocolError, QueryRuntimeError, NetworkError, AbortError
+from fauna.query.models import StreamToken
 from fauna.http import HTTPXClient
 
 
@@ -413,3 +415,156 @@ def test_call_query_with_string():
       match="'fql' must be a Query but was a <class 'str'>.
You can build a Query by " "calling fauna.fql()"): c.query("fake") # type: ignore + + +def test_client_stream(subtests, httpx_mock: HTTPXMock): + response = [ + b'{"type": "status", "txn_ts": 1}\n', b'{"type": "add", "txn_ts": 2}\n', + b'{"type": "remove", "txn_ts": 3}\n' + ] + + httpx_mock.add_response(stream=IteratorStream(response)) + + ret = [] + with httpx.Client() as mockClient: + http_client = HTTPXClient(mockClient) + c = Client(http_client=http_client) + with c.stream(StreamToken("token")) as stream: + ret = [obj for obj in stream] + + assert stream.last_ts == 3 + + assert ret == [{"type": "add", "txn_ts": 2}, {"type": "remove", "txn_ts": 3}] + + +def test_client_close_stream(subtests, httpx_mock: HTTPXMock): + response = [ + b'{"type": "status", "txn_ts": 1}\n', b'{"type": "add", "txn_ts": 2}\n' + ] + + httpx_mock.add_response(stream=IteratorStream(response)) + + with httpx.Client() as mockClient: + http_client = HTTPXClient(mockClient) + c = Client(http_client=http_client) + with c.stream(StreamToken("token")) as stream: + assert next(stream) == {"type": "add", "txn_ts": 2} + stream.close() + + assert stream.last_ts == 2 + + with pytest.raises(StopIteration): + next(stream) + + +def test_client_retry_stream(subtests, httpx_mock: HTTPXMock): + + def stream_iter0(): + yield b'{"type": "status", "txn_ts": 1}\n' + yield b'{"type": "add", "txn_ts": 2}\n' + raise NetworkError("Some network error") + yield b'{"type": "status", "txn_ts": 3}\n' + + def stream_iter1(): + yield b'{"type": "status", "txn_ts": 4}\n' + + httpx_mock.add_response(stream=IteratorStream(stream_iter0())) + httpx_mock.add_response(stream=IteratorStream(stream_iter1())) + + ret = [] + with httpx.Client() as mockClient: + http_client = HTTPXClient(mockClient) + c = Client(http_client=http_client) + with c.stream(StreamToken("token")) as stream: + ret = [obj for obj in stream] + + assert stream.last_ts == 4 + + assert ret == [{"type": "add", "txn_ts": 2}] + + +def test_client_close_stream_on_error(subtests, httpx_mock: HTTPXMock): + + def stream_iter(): + yield b'{"type": "status", "txn_ts": 1}\n' + yield b'{"type": "add", "txn_ts": 2}\n' + yield b'{"type": "error", "txn_ts": 3, "error": {"message": "message", "code": "abort"}}\n' + yield b'{"type": "status", "txn_ts": 4}\n' + + httpx_mock.add_response(stream=IteratorStream(stream_iter())) + + ret = [] + + with pytest.raises(AbortError): + with httpx.Client() as mockClient: + http_client = HTTPXClient(mockClient) + c = Client(http_client=http_client) + with c.stream(StreamToken("token")) as stream: + for obj in stream: + ret.append(obj) + + assert stream.last_ts == 5 + + assert ret == [{"type": "add", "txn_ts": 2}] + + +def test_client_ignore_start_event(subtests, httpx_mock: HTTPXMock): + + def stream_iter(): + yield b'{"type": "start", "txn_ts": 1}\n' + yield b'{"type": "status", "txn_ts": 2}\n' + yield b'{"type": "add", "txn_ts": 3}\n' + yield b'{"type": "remove", "txn_ts": 4}\n' + yield b'{"type": "status", "txn_ts": 5}\n' + + httpx_mock.add_response(stream=IteratorStream(stream_iter())) + + ret = [] + + with httpx.Client() as mockClient: + http_client = HTTPXClient(mockClient) + c = Client(http_client=http_client) + with c.stream(StreamToken("token")) as stream: + for obj in stream: + ret.append(obj) + + assert stream.last_ts == 5 + + assert ret == [{"type": "add", "txn_ts": 3}, {"type": "remove", "txn_ts": 4}] + + +def test_client_handle_status_events(subtests, httpx_mock: HTTPXMock): + + def stream_iter(): + yield b'{"type": "status", "txn_ts": 1}\n' + yield 
b'{"type": "add", "txn_ts": 2}\n' + yield b'{"type": "remove", "txn_ts": 3}\n' + yield b'{"type": "status", "txn_ts": 4}\n' + + httpx_mock.add_response(stream=IteratorStream(stream_iter())) + + ret = [] + + with httpx.Client() as mockClient: + http_client = HTTPXClient(mockClient) + c = Client(http_client=http_client) + with c.stream(StreamToken("token"), + StreamOptions(status_events=True)) as stream: + for obj in stream: + ret.append(obj) + + assert stream.last_ts == 4 + + assert ret == [{ + "type": "status", + "txn_ts": 1 + }, { + "type": "add", + "txn_ts": 2 + }, { + "type": "remove", + "txn_ts": 3 + }, { + "type": "status", + "txn_ts": 4 + }] diff --git a/tests/unit/test_encoding.py b/tests/unit/test_encoding.py index 80a5ca19..e54d29cb 100644 --- a/tests/unit/test_encoding.py +++ b/tests/unit/test_encoding.py @@ -7,7 +7,7 @@ from fauna import fql from fauna.encoding import FaunaEncoder, FaunaDecoder from fauna.query.models import DocumentReference, NamedDocumentReference, Document, NamedDocument, Module, Page, \ - NullDocument + NullDocument, StreamToken fixed_datetime = datetime.fromisoformat("2023-03-17T00:00:00+00:00") @@ -776,3 +776,17 @@ def test_encode_query_builder_sub_queries(subtests): } assert expected == actual + + +def test_decode_stream(subtests): + with subtests.test(msg="decode @stream into StreamToken"): + test = {"@stream": "asdflkj"} + decoded = FaunaDecoder.decode(test) + assert decoded == StreamToken("asdflkj") + + +def test_encode_stream(subtests): + with subtests.test(msg="encode StreamToken into @stream"): + test = {"@stream": "asdflkj"} + encoded = FaunaEncoder.encode(StreamToken("asdflkj")) + assert encoded == test diff --git a/tests/unit/test_httpx_client.py b/tests/unit/test_httpx_client.py new file mode 100644 index 00000000..3b2930ac --- /dev/null +++ b/tests/unit/test_httpx_client.py @@ -0,0 +1,44 @@ +import json + +import httpx +import pytest +from pytest_httpx import HTTPXMock, IteratorStream + +from fauna.client import Client +from fauna.http import HTTPXClient + + +def test_httx_client_stream(subtests, httpx_mock: HTTPXMock): + expected = [{"@int": "10"}, {"@long": "20"}] + + def to_json_bytes(obj): + return bytes(json.dumps(obj) + "\n", "utf-8") + + httpx_mock.add_response( + stream=IteratorStream([to_json_bytes(obj) for obj in expected])) + + with httpx.Client() as mockClient: + http_client = HTTPXClient(mockClient) + with http_client.stream("http://localhost:8443", {}, {}) as stream: + ret = [obj for obj in stream] + + assert ret == expected + + +def test_httx_client_close_stream(subtests, httpx_mock: HTTPXMock): + expected = [{"@int": "10"}, {"@long": "20"}] + + def to_json_bytes(obj): + return bytes(json.dumps(obj) + "\n", "utf-8") + + httpx_mock.add_response( + stream=IteratorStream([to_json_bytes(obj) for obj in expected])) + + with httpx.Client() as mockClient: + http_client = HTTPXClient(mockClient) + with http_client.stream("http://localhost:8443", {}, {}) as stream: + assert next(stream) == expected[0] + stream.close() + + with pytest.raises(StopIteration): + next(stream) From 4ac7d721728fdb27fb146344ce6db0e0a7f4c212 Mon Sep 17 00:00:00 2001 From: Lucas Pedroza <40873230+pnwpedro@users.noreply.github.com> Date: Mon, 20 May 2024 18:43:15 +0200 Subject: [PATCH 2/2] Port changes from README.rst into README.md (#178) Co-authored-by: fauna-chase <73842483+fauna-chase@users.noreply.github.com> Co-authored-by: James Rodewig --- README.md | 40 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git 
a/README.md b/README.md
index 370b154c..cba0f357 100644
--- a/README.md
+++ b/README.md
@@ -8,7 +8,7 @@ of FQL. To query your databases with earlier API versions, see
 the [faunadb](https://pypi.org/project/faunadb/) package.
 
 See the [Fauna Documentation](https://docs.fauna.com/fauna/current/)
-for additional information how to configure and query your databases.
+for additional information on how to configure and query your databases.
 
 ## Installation
 Pre-release installations must specify the version you want to install. Find the version you want to install on [PyPI](https://pypi.org/project/fauna/#history).
@@ -43,7 +43,7 @@ from fauna.encoding import QuerySuccess
 from fauna.errors import FaunaException
 
 client = Client()
-# The client defaults to using using the value stored FAUNA_SECRET for its secret.
+# The client defaults to using the value stored in FAUNA_SECRET for its secret.
 # Either set the FAUNA_SECRET env variable or retrieve it from a secret store.
 # As a best practice, don't store your secret directly in your code.
 
@@ -86,7 +86,7 @@ print(res.data) # 8
 
 Serialization and deserialization with user-defined classes is not yet supported.
 
-When building queries, adapt your classes into dicts or lists prior to using them in composition. When instantiating classes from the query result data, build them from the expected result.
+When building queries, adapt your classes into dicts or lists before using them in composition. When instantiating classes from the query result data, build them from the expected result.
 
 ```python
 class MyClass:
@@ -244,6 +244,40 @@ except ServiceError as e:
     emit_stats(e.stats)
   # more error handling...
 ```
+
+## Pagination
+
+Use the ``Client.paginate()`` method to iterate sets that contain more than one
+page of results.
+
+``Client.paginate()`` accepts the same query options as ``Client.query()``.
+
+Change the default items per page using FQL's ``.pageSize()`` method.
+
+```python
+from datetime import timedelta
+from fauna import fql
+from fauna.client import Client, QueryOptions
+
+# Adjust `pageSize()` as needed.
+query = fql(
+    """
+    Product
+      .byName("limes")
+      .pageSize(60) { description }"""
+)
+
+client = Client()
+
+options = QueryOptions(query_timeout=timedelta(seconds=20))
+
+pages = client.paginate(query, options)
+
+for products in pages:
+    for product in products:
+        print(product)
+```
+
 ## Event Streaming
 
 The driver supports Event Streaming.
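A minimal example, based on the usage exercised by this patch's integration tests; the `Product` collection and the three-event cutoff below are illustrative:

```python
from fauna import fql
from fauna.client import Client

client = Client()

# toStream() produces a stream token; client.stream() accepts either an
# FQL query that returns a token or a StreamToken value directly.
with client.stream(fql("Product.all().toStream()")) as events:
  seen = 0
  for event in events:
    print(event["type"], event["txn_ts"])
    seen += 1
    # close() stops iteration and shuts the stream down.
    if seen == 3:
      events.close()
```

Stream behavior can be tuned with `StreamOptions` (for example `max_attempts`, `start_ts`, and `status_events`), passed as the second argument to `client.stream()`.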