diff --git a/docs/source/_substitutions.rst b/docs/source/_substitutions.rst index 290da7d7..17d99735 100644 --- a/docs/source/_substitutions.rst +++ b/docs/source/_substitutions.rst @@ -2,6 +2,9 @@ .. |arg_record_id| replace:: An Airtable record ID. +.. |arg_table_id_or_name| replace:: An Airtable table ID or name. + Table name should be unencoded, as shown on browser. + .. |kwarg_view| replace:: The name or ID of a view. If set, only the records in that view will be returned. The records will be sorted according to the order of the view. diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 8e9f285e..e70024b0 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -46,6 +46,7 @@ Changelog * Added support for `Upload attachment `_ via :meth:`Table.upload_attachment ` or :meth:`AttachmentsList.upload `. +* Added :class:`pyairtable.testing.MockAirtable` for easier testing. 2.3.3 (2024-03-22) ------------------------ diff --git a/docs/source/tables.rst b/docs/source/tables.rst index 84ec8cd5..670120a0 100644 --- a/docs/source/tables.rst +++ b/docs/source/tables.rst @@ -299,3 +299,14 @@ and :meth:`~pyairtable.Table.add_comment` methods will return instances of >>> table.comments("recMNxslc6jG0XedV")[0].text 'Never mind!' >>> comment.delete() + +Testing Your Code +----------------- + +pyAirtable provides a :class:`~pyairtable.testing.MockAirtable` class that can be used to +test your code without making real requests to Airtable. + +.. autoclass:: pyairtable.testing.MockAirtable + :noindex: + +For more information, see :mod:`pyairtable.testing`. diff --git a/pyairtable/api/api.py b/pyairtable/api/api.py index a6fa6ea1..65c91d36 100644 --- a/pyairtable/api/api.py +++ b/pyairtable/api/api.py @@ -3,6 +3,7 @@ from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple, TypeVar, Union import requests +from requests import PreparedRequest from requests.sessions import Session from typing_extensions import TypeAlias @@ -273,14 +274,17 @@ def request( json=json, ) - response = self.session.send(prepared, timeout=self.timeout) - return self._process_response(response) + return self._perform_request(prepared) get = partialmethod(request, "GET") post = partialmethod(request, "POST") patch = partialmethod(request, "PATCH") delete = partialmethod(request, "DELETE") + def _perform_request(self, prepared: PreparedRequest) -> Any: + response = self.session.send(prepared, timeout=self.timeout) + return self._process_response(response) + def _process_response(self, response: requests.Response) -> Any: try: response.raise_for_status() diff --git a/pyairtable/api/base.py b/pyairtable/api/base.py index e850cc3d..67bee396 100644 --- a/pyairtable/api/base.py +++ b/pyairtable/api/base.py @@ -107,8 +107,7 @@ def table( Build a new :class:`Table` instance using this instance of :class:`Base`. Args: - id_or_name: An Airtable table ID or name. Table name should be unencoded, - as shown on browser. + id_or_name: |arg_table_id_or_name| validate: |kwarg_validate_metadata| force: |kwarg_force_metadata| diff --git a/pyairtable/api/types.py b/pyairtable/api/types.py index 2753a603..81807057 100644 --- a/pyairtable/api/types.py +++ b/pyairtable/api/types.py @@ -319,6 +319,9 @@ class UpdateRecordDict(TypedDict): fields: WritableFields +AnyRecordDict: TypeAlias = Union[RecordDict, CreateRecordDict, UpdateRecordDict] + + class RecordDeletedDict(TypedDict): """ A ``dict`` representing the payload returned by the Airtable API to confirm a deletion. diff --git a/pyairtable/testing.py b/pyairtable/testing.py index ccbdd7a3..b3b0da4b 100644 --- a/pyairtable/testing.py +++ b/pyairtable/testing.py @@ -1,16 +1,58 @@ """ -Helper functions for writing tests that use the pyairtable library. +pyAirtable provides a number of helper functions for testing code that uses +the Airtable API. These functions are designed to be used with the standard +Python :mod:`unittest.mock` library, and can be used to create fake records, +users, and attachments, as well as to mock the Airtable API itself. """ import datetime +import inspect +import mimetypes import random import string -from typing import Any, Optional, Union +from collections import defaultdict +from contextlib import ExitStack, contextmanager +from functools import partialmethod +from typing import ( + Any, + Dict, + Iterable, + Iterator, + List, + Optional, + Sequence, + Tuple, + Union, + cast, + overload, +) +from unittest import mock + +import urllib3 +from typing_extensions import Self, TypeAlias from pyairtable.api import retrying -from pyairtable.api.api import TimeoutTuple -from pyairtable.api.types import AttachmentDict, CollaboratorDict, Fields, RecordDict -from pyairtable.utils import is_airtable_id +from pyairtable.api.api import Api, TimeoutTuple +from pyairtable.api.table import Table +from pyairtable.api.types import ( + AnyRecordDict, + AttachmentDict, + CollaboratorDict, + CreateRecordDict, + FieldName, + Fields, + RecordDeletedDict, + RecordDict, + RecordId, + UpdateRecordDict, + UpsertResultDict, + WritableFields, +) +from pyairtable.utils import fieldgetter, is_airtable_id + + +def _now() -> str: + return datetime.datetime.now().isoformat() + "Z" def fake_id(type: str = "rec", value: Any = None) -> str: @@ -53,8 +95,6 @@ def fake_meta( "timeout": timeout, "retry": retry, "typecast": typecast, - "timeout": timeout, - "retry": retry, "use_field_ids": use_field_ids, "memoize": memoize, } @@ -70,20 +110,29 @@ def fake_record( Generate a fake record dict with the given field values. >>> fake_record({"Name": "Alice"}) - {'id': '...', 'createdTime': '...', 'fields': {'Name': 'Alice'}} - - >>> fake_record(name="Alice", address="123 Fake St") - {'id': '...', 'createdTime': '...', 'fields': {'name': 'Alice', 'address': '123 Fake St'}} + { + 'id': '...', + 'createdTime': '...', + 'fields': {'name': 'Alice'} + } >>> fake_record(name="Alice", id="123") - {'id': 'rec00000000000123', 'createdTime': '...', 'fields': {'name': 'Alice'}} + { + 'id': 'rec00000000000123', + 'createdTime': '...', + 'fields': {'name': 'Alice'} + } >>> fake_record(name="Alice", id="recABC00000000123") - {'id': 'recABC00000000123', 'createdTime': '...', 'fields': {'name': 'Alice'}} + { + 'id': 'recABC00000000123', + 'createdTime': '...', + 'fields': {'name': 'Alice'} + } """ return { "id": str(id) if is_airtable_id(id, "rec") else fake_id(value=id), - "createdTime": datetime.datetime.now().isoformat() + "Z", + "createdTime": _now(), "fields": {**(fields or {}), **other_fields}, } @@ -92,25 +141,476 @@ def fake_user(value: Any = None) -> CollaboratorDict: """ Generate a fake user dict with the given value for an email prefix. - >>> fake_user("alice") - {'id': 'usr000000000Alice', 'email': 'alice@example.com', 'name': 'Fake User'} + >>> fake_user("Alice") + { + 'id': 'usr000000000Alice', + 'email': 'alice@example.com' + 'name': 'Alice' + } """ id = fake_id("usr", value) return { "id": id, - "email": f"{value or id}@example.com", - "name": "Fake User", + "email": f"{str(value or id).lower()}@example.com", + "name": str(value or "Fake User"), } -def fake_attachment() -> AttachmentDict: +def fake_attachment(url: str = "", filename: str = "") -> AttachmentDict: """ Generate a fake attachment dict. + + >>> fake_attachment() + { + 'id': 'att...', + 'url': 'https://example.com/', + 'filename': 'foo.txt', + 'size': 100, + 'type': 'text/plain', + } + + >>> fake_attachment('https://example.com/image.png', 'foo.png') + { + 'id': 'att...', + 'url': 'https://example.com/image.png', + 'filename': 'foo.png', + 'size': 100, + 'type': 'text/plain', + } """ + if not filename: + filename = (urllib3.util.parse_url(url).path or "").split("/")[-1] + filename = filename or "foo.txt" return { "id": fake_id("att"), - "url": "https://example.com/", - "filename": "foo.txt", + "url": url or "https://example.com/", + "filename": filename, "size": 100, - "type": "text/plain", + "type": mimetypes.guess_type(filename)[0] or "text/plain", + } + + +BaseAndTableId: TypeAlias = Tuple[str, str] + + +class MockAirtable: + """ + This class acts as a context manager which mocks several pyAirtable APIs, + so that your tests can operate against tables without making network requests. + + .. code-block:: python + + from pyairtable import Api + from pyairtable.testing import MockAirtable + + table = Api.base("baseId").table("tableName") + + with MockAirtable() as m: + m.add_records(table, [{"Name": "Alice"}]) + records = table.all() + assert len(table.all()) == 1 + + If you use pytest, you might want to include this as a fixture. + + .. code-block:: python + + import pytest + from pyairtable.testing import MockAirtable + + @pytest.fixture(autouse=True) + def mock_airtable(): + with MockAirtable() as m: + yield m + + def test_your_function(): + ... + + Not all API methods are supported; if your test calls a method that would + make a network request, a RuntimeError will be raised instead. + + >>> with MockAirtable() as m: + ... table.schema() + ... + Traceback (most recent call last): ... + RuntimeError: unhandled call to Api.request + + You can allow unhandled requests by setting the ``passthrough`` argument to True, + either on the constructor or temporarily on the MockAirtable instance. This is + useful when using another library, like `requests-mock `_, + to prepare responses for complex cases (like code that retrieves the schema). + + .. code-block:: python + + def test_your_function(requests_mock, mock_airtable, monkeypatch): + base = Api.base("baseId") + + # load and cache our mock schema + requests_mock.get( + base.meta_url("tables"), + json={"tables": [...]} + ) + with mock_airtable.enable_passthrough(): + base.schema() + + # code below will fail if any more unhandled requests are made + ... + + """ + + # The list of APIs that are mocked by this class. + mocked = [ + "Api._perform_request", + "Table.iterate", + "Table.get", + "Table.create", + "Table.update", + "Table.delete", + "Table.batch_create", + "Table.batch_update", + "Table.batch_delete", + "Table.batch_upsert", + ] + + # 2-layer mapping of (base, table) IDs --> record IDs --> record dicts. + records: Dict[BaseAndTableId, Dict[RecordId, RecordDict]] + + _stack: Optional[ExitStack] + _mocks: Dict[str, Any] + + def __init__(self, passthrough: bool = False) -> None: + """ + Args: + passthrough: if True, unmocked methods will still be allowed to + perform real network requests. If False, they will raise an error. + """ + self.passthrough = passthrough + self._reset() + + def _reset(self) -> None: + self._stack = None + self._mocks = {} + self.records = defaultdict(dict) + + def __enter__(self) -> Self: + if self._stack: + raise RuntimeError("MockAirtable is not reentrant") + if hasattr(Api._perform_request, "mock"): + raise RuntimeError("MockAirtable cannot be nested") + self._reset() + self._stack = ExitStack() + + for name in self.mocked: + side_effect_name = name.replace(".", "_").lower() + side_effect = getattr(self, f"_{side_effect_name}", None) + mocked_method = self._mocks[name] = mock.patch( + f"pyairtable.{name}", + side_effect=side_effect, + autospec=True, + ) + self._stack.enter_context(mocked_method) + + return self + + def __exit__(self, *exc_info: Any) -> None: + if self._stack: + self._stack.__exit__(*exc_info) + + @contextmanager + def set_passthrough(self, allowed: bool) -> Iterator[Self]: + """ + Context manager that temporarily changes whether unmocked methods + are allowed to perform real network requests. For convenience, there are + also shortcuts ``enable_passthrough()`` and ``disable_passthrough()``. + + Usage: + + .. code-block:: python + + with MockAirtable() as m: + with m.enable_passthrough(): + schema = base.schema() + hooks = table.webhooks() + + # no more network requests allowed + ... + + Args: + allowed: If ``True``, unmocked methods will be allowed to perform real + network requests within this context manager. If ``False``, + they will not be allowed. + """ + original = self.passthrough + self.passthrough = allowed + try: + yield self + finally: + self.passthrough = original + + enable_passthrough = partialmethod(set_passthrough, True) + disable_passthrough = partialmethod(set_passthrough, False) + + @overload + def add_records( + self, + base_id: str, + table_id_or_name: str, + /, + records: Iterable[Dict[str, Any]], + ) -> List[RecordDict]: ... + + @overload + def add_records( + self, + table: Table, + /, + records: Iterable[Dict[str, Any]], + ) -> List[RecordDict]: ... + + def add_records(self, *args: Any, **kwargs: Any) -> List[RecordDict]: + """ + Add a list of records to the mock Airtable instance. These will be returned + from methods like :meth:`~pyairtable.Table.all` and :meth:`~pyairtable.Table.get`. + + Can be called with either a base ID and table name, + or an instance of :class:`~pyairtable.Table`: + + .. code-block:: + + m = MockAirtable() + m.add_records("baseId", "tableName", [{"Name": "Alice"}]) + m.add_records(table, records=[{"id": "recFake", {"Name": "Alice"}}]) + + .. note:: + + The parameters to :meth:`~pyairtable.Table.all` are not supported by MockAirtable, + and constraints like ``formula=`` and ``limit=`` will be ignored. It is assumed + that you are adding records to specifically test a particular use case. + MockAirtable is not a full in-memory replacement for the Airtable API. + + Args: + base_id: |arg_base_id| + *This must be the first positional argument.* + table_id_or_name: |arg_table_id_or_name| + This should be the same ID or name used in the code under test. + *This must be the second positional argument.* + table: An instance of :class:`~pyairtable.Table`. + *This is an alternative to providing base and table IDs, + and must be the first positional argument.* + records: A sequence of :class:`~pyairtable.api.types.RecordDict`, + :class:`~pyairtable.api.types.UpdateRecordDict`, + :class:`~pyairtable.api.types.CreateRecordDict`, + or :class:`~pyairtable.api.types.Fields`. + """ + base_id, table_name, records = _extract_args(args, kwargs, ["records"]) + coerced = [coerce_fake_record(record) for record in records] + self.records[(base_id, table_name)].update( + {record["id"]: record for record in coerced} + ) + return coerced + + @overload + def set_records( + self, + base_id: str, + table_id_or_name: str, + /, + records: Iterable[Dict[str, Any]], + ) -> None: ... + + @overload + def set_records( + self, + table: Table, + /, + records: Iterable[Dict[str, Any]], + ) -> None: ... + + def set_records(self, *args: Any, **kwargs: Any) -> None: + """ + Set the mock records for a particular base and table, replacing any existing records. + See :meth:`~MockAirtable.add_records` for more information. + + Args: + base_id: |arg_base_id| + *This must be the first positional argument.* + table_id_or_name: |arg_table_id_or_name| + This should be the same ID or name used in the code under test. + *This must be the second positional argument.* + table: An instance of :class:`~pyairtable.Table`. + *This is an alternative to providing base and table IDs, + and must be the first positional argument.* + records: A sequence of :class:`~pyairtable.api.types.RecordDict`, + :class:`~pyairtable.api.types.UpdateRecordDict`, + :class:`~pyairtable.api.types.CreateRecordDict`, + or :class:`~pyairtable.api.types.Fields`. + """ + base_id, table_name, records = _extract_args(args, kwargs, ["records"]) + self.records[(base_id, table_name)].clear() + self.add_records(base_id, table_name, records=records) + + def clear(self) -> None: + """ + Clear all records from the mock Airtable instance. + """ + self.records.clear() + + # side effects + + def _api__perform_request(self, method: str, url: str, **kwargs: Any) -> Any: + if not self.passthrough: + raise RuntimeError("unhandled call to Api.request") + mocked = self._mocks["Api._perform_request"] + return mocked.temp_original(method, url, **kwargs) + + def _table_iterate(self, table: Table, **options: Any) -> List[List[RecordDict]]: + return [list(self.records[(table.base.id, table.name)].values())] + + def _table_get(self, table: Table, record_id: str, **options: Any) -> RecordDict: + return self.records[(table.base.id, table.name)][record_id] + + def _table_create( + self, + table: Table, + record: CreateRecordDict, + **kwargs: Any, + ) -> RecordDict: + records = self.records[(table.base.id, table.name)] + record = coerce_fake_record(record) + while record["id"] in records: + record["id"] = fake_id() # pragma: no cover + records[record["id"]] = record + return record + + def _table_update( + self, + table: Table, + record_id: RecordId, + fields: WritableFields, + **kwargs: Any, + ) -> RecordDict: + exists = self.records[(table.base.id, table.name)][record_id] + exists["fields"].update(fields) + return exists + + def _table_delete(self, table: Table, record_id: RecordId) -> RecordDeletedDict: + self.records[(table.base.id, table.name)].pop(record_id) + return {"id": record_id, "deleted": True} + + def _table_batch_create( + self, + table: Table, + records: Iterable[CreateRecordDict], + **kwargs: Any, + ) -> List[RecordDict]: + return [self._table_create(table, record) for record in records] + + def _table_batch_update( + self, + table: Table, + records: Iterable[UpdateRecordDict], + **kwargs: Any, + ) -> List[RecordDict]: + return [ + self._table_update(table, record["id"], record["fields"]) + for record in records + ] + + def _table_batch_delete( + self, + table: Table, + record_ids: Iterable[RecordId], + ) -> List[RecordDeletedDict]: + return [self._table_delete(table, record_id) for record_id in record_ids] + + def _table_batch_upsert( + self, + table: Table, + records: Iterable[AnyRecordDict], + key_fields: Iterable[FieldName], + **kwargs: Any, + ) -> UpsertResultDict: + """ + Perform a batch upsert operation on the mocked records for the table. + """ + key = fieldgetter(*key_fields) + existing_by_id = self.records[(table.base.id, table.name)] + existing_by_key = {key(r): r for r in existing_by_id.values()} + result: UpsertResultDict = { + "updatedRecords": [], + "createdRecords": [], + "records": [], + } + + for record in records: + existing_record: Optional[RecordDict] + if "id" in record: + record_id = str(record.get("id")) + existing_record = existing_by_id[record_id] + existing_record["fields"].update(record["fields"]) + result["updatedRecords"].append(record_id) + result["records"].append(existing_record) + elif existing_record := existing_by_key.get(key(record)): + existing_record["fields"].update(record["fields"]) + result["updatedRecords"].append(existing_record["id"]) + result["records"].append(existing_record) + else: + created_record = self._table_create(table, record) + result["createdRecords"].append(created_record["id"]) + result["records"].append(created_record) + + return result + + +def coerce_fake_record(record: Union[AnyRecordDict, Fields]) -> RecordDict: + """ + Coerce a record dict or field mapping to the expected format for + an Airtable record, creating a fake ID and createdTime if necessary. + + >>> coerce_fake_record({"Name": "Alice"}) + {'id': 'rec000...', 'createdTime': '...', 'fields': {'Name': 'Alice'}} + """ + if "fields" not in record: + record = {"fields": cast(Fields, record)} + return { + "id": str(record.get("id") or fake_id()), + "createdTime": str(record.get("createdTime") or _now()), + "fields": record["fields"], } + + +def _extract_args( + args: Sequence[Any], + kwargs: Dict[str, Any], + extract: Optional[Sequence[str]] = None, +) -> Tuple[Any, ...]: + """ + Convenience function for functions/methods which accept either + a Table or a (base_id, table_name) as their first posargs. + """ + extract = extract or [] + extracted = set() + caller = inspect.stack()[1].function + + if type(args[0]) is Table: + args = (args[0].base.id, args[0].name, *args[1:]) + + argtypes = tuple(type(arg) for arg in args) + if argtypes[:2] != (str, str): + raise TypeError( + f"{caller} expected (str, str, ...), got ({', '.join(t.__name__ for t in argtypes)})" + ) + + for extract_name in extract: + if extract_name in kwargs: + extracted.add(extract_name) + args = (*args, kwargs.pop(extract_name)) + + if kwargs: + raise TypeError( + f"{caller} got unexpected keyword arguments: {', '.join(kwargs)}" + ) + if len(args) < len(extract) + 2 and len(extracted) < len(extract): + missing = set(extract) - extracted + raise TypeError(f"{caller} missing keyword arguments: {', '.join(missing)}") + + return tuple(args) diff --git a/pyairtable/utils.py b/pyairtable/utils.py index 510630fc..6b12744a 100644 --- a/pyairtable/utils.py +++ b/pyairtable/utils.py @@ -21,7 +21,7 @@ import requests from typing_extensions import ParamSpec, Protocol -from pyairtable.api.types import CreateAttachmentByUrl +from pyairtable.api.types import AnyRecordDict, CreateAttachmentByUrl, FieldValue P = ParamSpec("P") R = TypeVar("R", covariant=True) @@ -89,7 +89,7 @@ def attachment(url: str, filename: str = "") -> CreateAttachmentByUrl: Usage: >>> table = Table(...) - >>> profile_url = "https://myprofile.com/id/profile.jpg + >>> profile_url = "https://example.com/profile.jpg" >>> rec = table.create({"Profile Photo": [attachment(profile_url)]}) { 'id': 'recZXOZ5gT9vVGHfL', @@ -97,8 +97,8 @@ def attachment(url: str, filename: str = "") -> CreateAttachmentByUrl: 'attachment': [ { 'id': 'attu6kbaST3wUuNTA', - 'url': 'https://aws1.discourse-cdn.com/airtable/original/2X/4/411e4fac00df06a5e316a0585a831549e11d0705.png', - 'filename': '411e4fac00df06a5e316a0585a831549e11d0705.png' + 'url': 'https://content.airtable.com/...', + 'filename': 'profile.jpg' } ] }, @@ -206,13 +206,13 @@ def _wrapper(func: F) -> F: return _wrapper -class FetchMethod(Protocol, Generic[C, R]): +class _FetchMethod(Protocol, Generic[C, R]): def __get__(self, instance: C, owner: Any) -> Callable[..., R]: ... def __call__(self_, self: C, *, force: bool = False) -> R: ... -def cache_unless_forced(func: Callable[[C], R]) -> FetchMethod[C, R]: +def cache_unless_forced(func: Callable[[C], R]) -> _FetchMethod[C, R]: """ Wrap a method (e.g. ``Base.shares()``) in a decorator that will save a memoized version of the return value for future reuse, but will also @@ -232,7 +232,7 @@ def _inner(self: C, *, force: bool = False) -> R: _inner.__annotations__["force"] = bool _append_docstring_text(_inner, "Args:\n\tforce: |kwarg_force_metadata|") - return cast(FetchMethod[C, R], _inner) + return cast(_FetchMethod[C, R], _inner) def coerce_iso_str(value: Any) -> Optional[str]: @@ -261,6 +261,56 @@ def coerce_list_str(value: Optional[Union[str, Iterable[str]]]) -> List[str]: return list(value) +def fieldgetter( + *fields: str, + required: Union[bool, Iterable[str]] = False, +) -> Callable[[AnyRecordDict], Any]: + """ + Create a function that extracts ID, created time, or field values from a record. + Intended to be used in similar situations as + `operator.itemgetter `_. + + >>> record = {"id": "rec001", "fields": {"Name": "Alice"}} + >>> fieldgetter("Name")(record) + 'Alice' + >>> fieldgetter("id")(record) + 'rec001' + >>> fieldgetter("id", "Name", "Missing")(record) + ('rec001', 'Alice', None) + + Args: + fields: The field names to extract from the record. The values + ``"id"`` and ``"createdTime"`` are special cased; all other + values are interpreted as field names. + required: If True, will raise KeyError if a value is missing. + If False, missing values will return as None. + If a sequence of field names is provided, only those names + will be required. + """ + if isinstance(required, str): + required = {required} + elif required is True: + required = set(fields) + elif required is False: + required = [] + else: + required = set(required) + + def _get_field(record: AnyRecordDict, field: str) -> FieldValue: + src = record if field in ("id", "createdTime") else record["fields"] + if field in required and field not in src: + raise KeyError(field) + return src.get(field) + + if len(fields) == 1: + return partial(_get_field, field=fields[0]) + + def _getter(record: AnyRecordDict) -> Any: + return tuple(_get_field(record, field) for field in fields) + + return _getter + + # [[[cog]]] # import re # contents = "".join(open(cog.inFile).readlines()[:cog.firstLineNum]) @@ -284,6 +334,7 @@ def coerce_list_str(value: Optional[Union[str, Iterable[str]]]) -> List[str]: "datetime_to_iso_str", "docstring_from", "enterprise_only", + "fieldgetter", "is_airtable_id", "is_base_id", "is_field_id", @@ -291,4 +342,4 @@ def coerce_list_str(value: Optional[Union[str, Iterable[str]]]) -> List[str]: "is_table_id", "is_user_id", ] -# [[[end]]] (checksum: 2e24ae7bd070c354cece2852ade7cdf9) +# [[[end]]] (checksum: 7cf950d19fee128ae3f395ddbc475c0f) diff --git a/tests/test_testing.py b/tests/test_testing.py index 98b3d301..7400b0a0 100644 --- a/tests/test_testing.py +++ b/tests/test_testing.py @@ -1,50 +1,90 @@ -from unittest.mock import ANY, call - -import pytest +import re +from unittest.mock import ANY from pyairtable import testing as T -@pytest.mark.parametrize( - "funcname,sig,expected", - [ - ("fake_id", call(value=123), "rec00000000000123"), - ("fake_id", call("tbl", "x"), "tbl0000000000000x"), - ( - "fake_record", - call(id=123), - {"id": "rec00000000000123", "createdTime": ANY, "fields": {}}, - ), - ( - "fake_record", - call(id="recABC00000000123"), - {"id": "recABC00000000123", "createdTime": ANY, "fields": {}}, - ), - ( - "fake_record", - call({"A": 1}, 123), - {"id": "rec00000000000123", "createdTime": ANY, "fields": {"A": 1}}, - ), - ( - "fake_record", - call(one=1, two=2), - { - "id": ANY, - "createdTime": ANY, - "fields": {"one": 1, "two": 2}, - }, - ), - ( - "fake_user", - call("alice"), - { - "id": "usr000000000alice", - "email": "alice@example.com", - "name": "Fake User", - }, - ), - ], -) -def test_fake_function(funcname, sig, expected): - func = getattr(T, funcname) - assert func(*sig.args, **sig.kwargs) == expected +def test_fake_id(): + assert re.match(r"rec[a-zA-Z0-9]{14}", T.fake_id()) + assert T.fake_id(value=123) == "rec00000000000123" + assert T.fake_id("tbl", "x") == "tbl0000000000000x" + + +def test_fake_record(): + assert T.fake_record(id=123) == { + "id": "rec00000000000123", + "createdTime": ANY, + "fields": {}, + } + assert T.fake_record(id="recABC00000000123") == { + "id": "recABC00000000123", + "createdTime": ANY, + "fields": {}, + } + assert T.fake_record({"A": 1}, 123) == { + "id": "rec00000000000123", + "createdTime": ANY, + "fields": {"A": 1}, + } + assert T.fake_record(one=1, two=2) == { + "id": ANY, + "createdTime": ANY, + "fields": {"one": 1, "two": 2}, + } + + +def test_fake_user(): + user = T.fake_user() + assert user == { + "id": ANY, + "email": f"{user['id'].lower()}@example.com", + "name": "Fake User", + } + assert T.fake_user("Alice") == { + "id": "usr000000000Alice", + "email": "alice@example.com", + "name": "Alice", + } + + +def test_fake_attachment(): + assert T.fake_attachment() == { + "id": ANY, + "url": "https://example.com/", + "filename": "foo.txt", + "size": 100, + "type": "text/plain", + } + assert T.fake_attachment(url="https://example.com/image.png") == { + "id": ANY, + "url": "https://example.com/image.png", + "filename": "image.png", + "size": 100, + "type": "image/png", + } + assert T.fake_attachment(url="https://example.com", filename="image.png") == { + "id": ANY, + "url": "https://example.com", + "filename": "image.png", + "size": 100, + "type": "image/png", + } + + +def test_coerce_fake_record(): + assert T.coerce_fake_record({"Name": "Alice"}) == { + "id": ANY, + "createdTime": ANY, + "fields": {"Name": "Alice"}, + } + assert T.coerce_fake_record({"fields": {"Name": "Alice"}}) == { + "id": ANY, + "createdTime": ANY, + "fields": {"Name": "Alice"}, + } + assert T.coerce_fake_record({"id": "rec123", "fields": {"Name": "Alice"}}) == { + "id": "rec123", + "createdTime": ANY, + "fields": {"Name": "Alice"}, + } + assert T.coerce_fake_record(fake := T.fake_record()) == fake diff --git a/tests/test_testing__mock_airtable.py b/tests/test_testing__mock_airtable.py new file mode 100644 index 00000000..60168198 --- /dev/null +++ b/tests/test_testing__mock_airtable.py @@ -0,0 +1,266 @@ +from unittest.mock import ANY + +import pytest + +from pyairtable import testing as T + + +@pytest.fixture +def mock_airtable(requests_mock): + with T.MockAirtable() as m: + yield m + + +def test_not_reentrant(): + """ + Test that nested MockAirtable contexts raise an error. + """ + mocked = T.MockAirtable() + with mocked: + with pytest.raises(RuntimeError): + with mocked: + pass + + +def test_multiple_nested_contexts(): + """ + Test that nested MockAirtable contexts raise an error. + """ + with T.MockAirtable(): + with pytest.raises(RuntimeError): + with T.MockAirtable(): + pass + + +def test_add_records__ids(mock_airtable, table): + fake_records = [T.fake_record() for _ in range(3)] + mock_airtable.add_records(table.base.id, table.name, fake_records) + assert table.all() == fake_records + + +def test_add_records__ids_kwarg(mock_airtable, table): + fake_records = [T.fake_record() for _ in range(3)] + mock_airtable.add_records(table.base.id, table.name, records=fake_records) + assert table.all() == fake_records + + +def test_add_records__kwarg(mock_airtable, table): + fake_records = [T.fake_record() for _ in range(3)] + mock_airtable.add_records(table, records=fake_records) + assert table.all() == fake_records + + +def test_add_records__missing_kwarg(mock_airtable, table): + with pytest.raises(TypeError, match="add_records missing keyword"): + mock_airtable.add_records(table) + with pytest.raises(TypeError, match="add_records missing keyword"): + mock_airtable.add_records("base", "table") + + +def test_add_records__invalid_types(mock_airtable): + with pytest.raises( + TypeError, + match=r"add_records expected \(str, str, \.\.\.\), got \(int, float\)", + ): + mock_airtable.add_records(1, 2.0, records=[]) + + +def test_add_records__invalid_kwarg(mock_airtable, table): + with pytest.raises( + TypeError, + match="add_records got unexpected keyword arguments: asdf", + ): + mock_airtable.add_records(table, records=[], asdf=1) + + +@pytest.fixture +def mock_records(mock_airtable, table): + mock_records = [T.fake_record() for _ in range(5)] + mock_airtable.add_records(table, mock_records) + return mock_records + + +@pytest.fixture +def mock_record(mock_records): + return mock_records[0] + + +def test_set_records(mock_airtable, mock_records, table): + replace = [T.fake_record()] + mock_airtable.set_records(table, replace) + assert table.all() == replace + + +def test_set_records__ids(mock_airtable, mock_records, table): + replace = [T.fake_record()] + mock_airtable.set_records(table.base.id, table.name, replace) + assert table.all() == replace + + +def test_set_records__ids_kwarg(mock_airtable, mock_records, table): + replace = [T.fake_record()] + mock_airtable.set_records(table.base.id, table.name, records=replace) + assert table.all() == replace + + +def test_set_records__kwarg(mock_airtable, mock_records, table): + replace = [T.fake_record()] + mock_airtable.set_records(table, records=replace) + assert table.all() == replace + + +@pytest.mark.parametrize( + "funcname,expected", + [ + ("all", "mock_records"), + ("iterate", "[mock_records]"), + ("first", "mock_records[0]"), + ], +) +def test_table_iterate(mock_records, table, funcname, expected): + expected = eval(expected, {}, {"mock_records": mock_records}) + assert getattr(table, funcname)() == expected + + +def test_table_get(mock_record, table): + assert table.get(mock_record["id"]) == mock_record + + +def test_table_create(mock_airtable, table): + record = table.create(T.fake_record()["fields"]) + assert record in table.all() + + +def test_table_update(mock_record, table): + table.update(mock_record["id"], {"Name": "Bob"}) + assert table.get(mock_record["id"])["fields"]["Name"] == "Bob" + + +def test_table_delete(mock_record, table): + table.delete(mock_record["id"]) + assert mock_record not in table.all() + + +def test_table_batch_create(mock_airtable, mock_records, table): + mock_airtable.clear() + table.batch_create(mock_records) + assert all(r in table.all() for r in mock_records) + + +def test_table_batch_update(mock_records, table): + table.batch_update( + [{"id": record["id"], "fields": {"Name": "Bob"}} for record in mock_records] + ) + assert all(r["fields"]["Name"] == "Bob" for r in table.all()) + + +def test_table_batch_delete(mock_records, table): + table.batch_delete([r["id"] for r in mock_records]) + assert table.all() == [] + + +def test_table_batch_upsert(mock_airtable, table): + """ + Test that MockAirtable actually performs upsert logic correctly. + """ + mock_airtable.clear() + mock_airtable.add_records( + table, + [ + {"id": "rec001", "fields": {"Name": "Alice"}}, + {"id": "rec002", "fields": {"Name": "Bob"}}, + {"id": "rec003", "fields": {"Name": "Carol"}}, + ], + ) + table.batch_upsert( + records=[ + # matches by Name to rec001 + {"fields": {"Name": "Alice", "Email": "alice@example.com"}}, + # matches by Name to rec002 + {"fields": {"Name": "Bob", "Email": "bob@example.com"}}, + # matches by id to rec003 + {"id": "rec003", "fields": {"Email": "carol@example.com"}}, + # no match; will create the record + {"fields": {"Name": "Dave", "Email": "dave@example.com"}}, + ], + key_fields=["Name"], + ) + assert table.all() == [ + { + "id": "rec001", + "createdTime": ANY, + "fields": {"Name": "Alice", "Email": "alice@example.com"}, + }, + { + "id": "rec002", + "createdTime": ANY, + "fields": {"Name": "Bob", "Email": "bob@example.com"}, + }, + { + "id": "rec003", + "createdTime": ANY, + "fields": {"Name": "Carol", "Email": "carol@example.com"}, + }, + { + "id": ANY, + "createdTime": ANY, + "fields": {"Name": "Dave", "Email": "dave@example.com"}, + }, + ] + + +def test_table_batch_upsert__invalid_id(mock_airtable, table): + with pytest.raises(KeyError): + table.batch_upsert( + records=[ + # record does not exist + {"id": "rec999", "fields": {"Name": "Alice"}} + ], + key_fields=["Name"], + ) + + +@pytest.mark.parametrize( + "expr", + [ + "base.collaborators()", + "base.create_table('Name', fields=[])", + "base.delete()", + "base.shares()", + "base.webhooks()", + "table.add_comment('recordId', 'value')", + "table.comments('recordId')", + "table.create_field('name', 'type')", + "table.schema()", + ], +) +def test_unhandled_methods(mock_airtable, monkeypatch, expr, api, base, table): + """ + Test that unhandled methods raise an error. + """ + with pytest.raises(RuntimeError): + eval(expr, {}, {"api": api, "base": base, "table": table}) + + +def test_passthrough(mock_airtable, requests_mock, base, monkeypatch): + """ + Test that we can temporarily pass through unhandled methods to the requests library. + """ + requests_mock.get(base.meta_url("tables"), json={"tables": []}) + + with monkeypatch.context() as mctx: + mctx.setattr(mock_airtable, "passthrough", True) + assert base.schema(force=True).tables == [] # no RuntimeError + + with mock_airtable.enable_passthrough(): + assert base.schema(force=True).tables == [] # no RuntimeError + with mock_airtable.disable_passthrough(): + with pytest.raises(RuntimeError): + base.schema(force=True) + + with mock_airtable.set_passthrough(True): + assert base.schema(force=True).tables == [] # no RuntimeError + + with mock_airtable.set_passthrough(False): + with pytest.raises(RuntimeError): + base.schema(force=True) diff --git a/tests/test_utils.py b/tests/test_utils.py index b18e28d8..c8b9dd40 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -4,6 +4,7 @@ import pytest from pyairtable import utils +from pyairtable.testing import fake_record utc_tz = partial(datetime, tzinfo=timezone.utc) @@ -97,3 +98,53 @@ def test_converter(func, input, expected): return assert func(input) == expected + + +def test_fieldgetter(): + get_a = utils.fieldgetter("A") + get_abc = utils.fieldgetter("A", "B", "C") + + assert get_a(fake_record(A=1)) == 1 + assert get_a({"fields": {"A": 1}}) == 1 + assert get_abc(fake_record(A=1, C=3)) == (1, None, 3) + assert get_abc({"fields": {"A": 1, "C": 3}}) == (1, None, 3) + + record = fake_record(A="one", B="two") + assert get_a(record) == "one" + assert get_abc(record) == ("one", "two", None) + assert utils.fieldgetter("id")(record) == record["id"] + assert utils.fieldgetter("createdTime")(record) == record["createdTime"] + + +def test_fieldgetter__required(): + """ + Test that required=True means all fields are required. + """ + require_ab = utils.fieldgetter("A", "B", required=True) + record = fake_record(A="one", B="two") + assert require_ab(record) == ("one", "two") + with pytest.raises(KeyError): + require_ab(fake_record(A="one")) + + +def test_fieldgetter__required_list(): + """ + Test that required=["A", "B"] means only A and B are required. + """ + get_abc_require_ab = utils.fieldgetter("A", "B", "C", required=["A", "B"]) + record = fake_record(A="one", B="two") + assert get_abc_require_ab(record) == ("one", "two", None) + with pytest.raises(KeyError): + get_abc_require_ab(fake_record(A="one", C="three")) + + +def test_fieldgetter__required_str(): + """ + Test that required="Bravo" means only Bravo is required, + rather than ["B", "r", "a", "v", "o"]. + """ + get_abc_require_b = utils.fieldgetter("Alpha", "Bravo", required="Bravo") + record = fake_record(Alpha="one", Bravo="two") + assert get_abc_require_b(record) == ("one", "two") + with pytest.raises(KeyError): + get_abc_require_b(fake_record(Alpha="one"))