MongoDB/PyMongo: Add querying capabilities using JessiQL
With corresponding improvements, the amalgamated PyMongo driver can now
run 95% of the MongoDB "getting started" tutorial successfully.
amotl committed Jul 6, 2024
1 parent 35775a0 commit d0fbc8a
Showing 13 changed files with 548 additions and 34 deletions.
42 changes: 42 additions & 0 deletions cratedb_toolkit/adapter/pymongo/README.md
@@ -0,0 +1,42 @@
# Amalgamated PyMongo driver using CrateDB as backend


## Setup

```shell
pip install 'cratedb-toolkit[pymongo]'
```

```shell
docker run --rm -it --name=cratedb \
--publish=4200:4200 --publish=5432:5432 \
--env=CRATE_HEAP_SIZE=4g crate/crate:nightly \
-Cdiscovery.type=single-node
```


## Synopsis
```python
import pymongo
from cratedb_toolkit.adapter.pymongo import PyMongoCrateDbAdapter

with PyMongoCrateDbAdapter(dburi="crate://crate@localhost:4200"):
    client = pymongo.MongoClient("localhost", 27017)
    collection = client.foo.bar

    inserted_id = collection.insert_one({"author": "Mike", "text": "My first blog post!"}).inserted_id
    print(inserted_id)

    document = collection.find({"author": "Mike"})
    print(document)
```


## Examples

To inspect the capabilities of the driver adapter, there is a basic program at
[pymongo_adapter.py], and test cases at [test_pymongo.py].


[pymongo_adapter.py]: https://github.com/crate-workbench/cratedb-toolkit/blob/main/examples/pymongo_adapter.py
[test_pymongo.py]: https://github.com/crate-workbench/cratedb-toolkit/blob/main/tests/adapter/test_pymongo.py
9 changes: 7 additions & 2 deletions cratedb_toolkit/adapter/pymongo/api.py
@@ -3,7 +3,11 @@
import pymongo.collection

from cratedb_toolkit.adapter.pymongo.collection import collection_factory
from cratedb_toolkit.sqlalchemy.patch import patch_types_map
from cratedb_toolkit.util import DatabaseAdapter
from cratedb_toolkit.util.pandas import patch_pandas_sqltable_with_extended_mapping

patch_types_map()


class PyMongoCrateDbAdapter:
@@ -39,9 +43,10 @@ def configure_sqlalchemy(self):
https://cratedb.com/docs/crate/reference/en/latest/general/ddl/column-policy.html#dynamic
"""
# TODO: Provide unpatching hook.
from cratedb_toolkit.util.pandas import patch_pandas_io_sqldatabase_with_dialect_parameters
from cratedb_toolkit.util.pandas import patch_pandas_sqltable_with_dialect_parameters

patch_pandas_io_sqldatabase_with_dialect_parameters(table_kwargs={"crate_column_policy": "'dynamic'"})
patch_pandas_sqltable_with_dialect_parameters(table_kwargs={"crate_column_policy": "'dynamic'"})
patch_pandas_sqltable_with_extended_mapping()

def activate_pymongo_adapter(self):
"""
36 changes: 30 additions & 6 deletions cratedb_toolkit/adapter/pymongo/backlog.md
@@ -1,22 +1,46 @@
# PyMongo CrateDB Adapter
# PyMongo CrateDB Adapter Backlog


## Iteration +1

Make it work.
- Add documentation.
- Add missing essential querying features. Examples: sort order, skip, limit.
- Add missing essential methods. Example: `db.my_collection.drop()`.

## Iteration +2

- Decoding timestamps seems to yield only `Decimal` values instead of `datetime`
objects when printed on the terminal. See example program.
There is also a warning:
```shell
SAWarning: Dialect crate+crate-python does *not* support Decimal objects natively,
and SQLAlchemy must convert from floating point - rounding errors and other issues
may occur. Please consider storing Decimal numbers as strings or integers on this
platform for lossless storage.
```
So, why are `Decimal` types used here, at all?
- Mimic MongoDB exceptions.
Example:
```python
jessiql.exc.InvalidColumnError: Invalid column "x" for "Surrogate" specified in sort
```
## Done
- Make it work
- Using individual columns for fields does not work, because `insert_one` works
iteratively, and needs to evolve the table schema gradually. As a consequence,
we need to use `OBJECT(DYNAMIC)` for storing MongoDB fields.
- Add software tests
- Add documentation

## Iteration +2
Translate query expressions.
### Research
How to translate a MongoDB query expression?
- https://github.com/gordonbusyman/mongo-to-sql-converter
- https://github.com/2do2go/json-sql
- https://github.com/kolypto/py-mongosql
- https://github.com/SY-Xuan/mongo2sql
- https://github.com/nsragow/MongoToSqlParse
- https://github.com/sushmitharao2124/MongoToSQLConverter
64 changes: 55 additions & 9 deletions cratedb_toolkit/adapter/pymongo/collection.py
@@ -1,19 +1,23 @@
import io
import logging
from typing import Any, Mapping, Optional, Union
from collections import abc
from typing import Any, Iterable, Iterator, Mapping, Optional, Union

import pandas as pd
from bson.raw_bson import RawBSONDocument
from pymongo import common
from pymongo.client_session import ClientSession
from pymongo.collection import Collection
from pymongo.cursor import Cursor
from pymongo.results import InsertOneResult
from pymongo.results import InsertManyResult, InsertOneResult
from pymongo.typings import _DocumentType

from cratedb_toolkit.adapter.pymongo.cursor import cursor_factory
from cratedb_toolkit.adapter.pymongo.util import AmendedObjectId
from cratedb_toolkit.adapter.pymongo.util import AmendedObjectId as ObjectId
from cratedb_toolkit.util import DatabaseAdapter

from sqlalchemy_cratedb.support import insert_bulk

logger = logging.getLogger(__name__)


@@ -50,12 +54,9 @@ def insert_one(
session: Optional[ClientSession] = None,
comment: Optional[Any] = None,
) -> InsertOneResult:
logger.debug(
f"Pretending to insert document into MongoDB: database={self.database.name}, collection={self.name}"
)
logger.debug(f"Document: {document}")
logger.debug(f"Reading document: {document}")
data = pd.DataFrame.from_records([document])
# logger.debug(f"Dataframe: {self.get_df_info()}, {data.tail()}") # noqa: ERA001
# logger.debug(f"Dataframe: {self.get_df_info(data)}, {data.tail()}") # noqa: ERA001
logger.debug(f"Inserting record into CrateDB: schema={self.database.name}, table={self.name}")

object_id_cratedb: Optional[str] = None
@@ -101,8 +102,53 @@ def insert_returning_id(pd_table, conn, keys, data_iter):
if object_id_cratedb is None:
raise ValueError("Object may have been created, but there is no object id")

object_id_mongodb = AmendedObjectId.from_str(object_id_cratedb)
object_id_mongodb = ObjectId.from_str(object_id_cratedb)
logger.debug(f"Created object with id: {object_id_mongodb!r}")
return InsertOneResult(inserted_id=object_id_mongodb, acknowledged=True)

def insert_many(
self,
documents: Iterable[Union[_DocumentType, RawBSONDocument]],
ordered: bool = True,
bypass_document_validation: bool = False,
session: Optional[ClientSession] = None,
comment: Optional[Any] = None,
) -> InsertManyResult:
if not isinstance(documents, abc.Iterable) or isinstance(documents, abc.Mapping) or not documents:
raise TypeError("documents must be a non-empty list")
inserted_ids: list[ObjectId] = []

def gen() -> Iterator[Mapping[str, Any]]:
"""A generator that validates documents and handles _ids."""
for document in documents:
common.validate_is_document_type("document", document)
if not isinstance(document, RawBSONDocument):
if "_id" in document:
identifier = ObjectId(document["_id"])
else:
identifier = ObjectId()
document["_id"] = str(identifier) # type: ignore[index]
inserted_ids.append(identifier)
yield document

logger.debug("Converting documents")
documents_real = list(gen())

logger.debug(f"Reading documents: {documents_real}")
data = pd.DataFrame.from_records(documents_real)
logger.debug(f"Dataframe: {self.get_df_info(data)}, {data.tail()}") # noqa: ERA001
logger.debug(f"Inserting records into CrateDB: schema={self.database.name}, table={self.name}")

data.to_sql(
name=self.name,
schema=self.database.name,
con=cratedb.engine,
index=False,
# TODO: Handle `append` vs. `replace`.
if_exists="append",
method=insert_bulk,
)

return InsertManyResult(inserted_ids, acknowledged=True)

return AmendedCollection
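
The `_id` handling inside `gen()` can be illustrated in isolation. The following is a minimal sketch, not the adapter's code: `assign_ids` is a hypothetical helper, and `uuid.uuid4().hex` stands in for `AmendedObjectId`, so the snippet runs without PyMongo or CrateDB.

```python
import uuid
from typing import Any, Dict, Iterable, List, Mapping, Tuple


def assign_ids(documents: Iterable[Mapping[str, Any]]) -> Tuple[List[Dict[str, Any]], List[str]]:
    """Return copies of the documents with an `_id` each, plus the list of ids.

    Hypothetical helper: `uuid.uuid4().hex` is a stand-in for an ObjectId.
    """
    prepared: List[Dict[str, Any]] = []
    inserted_ids: List[str] = []
    for document in documents:
        if not isinstance(document, Mapping):
            raise TypeError("each document must be a mapping")
        record = dict(document)  # copy, so the caller's document stays untouched
        if "_id" in record:
            # Keep an identifier supplied by the caller.
            identifier = str(record["_id"])
        else:
            # Otherwise generate a fresh one.
            identifier = uuid.uuid4().hex
        record["_id"] = identifier
        inserted_ids.append(identifier)
        prepared.append(record)
    return prepared, inserted_ids


docs = [{"_id": "abc", "author": "Mike"}, {"author": "Eliot"}]
prepared, ids = assign_ids(docs)
print(ids)
```

Unlike `gen()`, which writes `_id` into the caller's document (as PyMongo itself does), this sketch copies each document first. Which behavior is preferable depends on whether callers expect to read the generated `_id` back from their input.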
30 changes: 27 additions & 3 deletions cratedb_toolkit/adapter/pymongo/cursor.py
@@ -1,20 +1,25 @@
import copy
import logging
import warnings
from collections import deque
from typing import Any, Iterable, Mapping, Optional, Union

import sqlalchemy as sa
from bson import SON
from pymongo import CursorType, helpers
from pymongo.client_session import ClientSession
from pymongo.collation import validate_collation_or_none
from pymongo.collection import Collection
from pymongo.common import validate_is_document_type
from pymongo.common import validate_is_document_type, validate_is_mapping
from pymongo.cursor import _QUERY_OPTIONS, Cursor, _Hint, _Sort
from pymongo.errors import InvalidOperation
from pymongo.message import _GetMore, _Query
from pymongo.read_preferences import _ServerMode
from pymongo.typings import _Address, _CollationIn, _DocumentType
from pymongo.write_concern import validate_boolean
from sqlalchemy.util import to_list

from cratedb_toolkit.adapter.pymongo.reactor import mongodb_query, table_to_model
from cratedb_toolkit.adapter.pymongo.util import AmendedObjectId
from cratedb_toolkit.util import DatabaseAdapter

@@ -77,7 +82,6 @@ def __init__(
self.__explicit_session = False

spec: Mapping[str, Any] = filter or {}
"""
validate_is_mapping("filter", spec)
if not isinstance(skip, int):
raise TypeError("skip must be an instance of int")
@@ -112,7 +116,6 @@ def __init__(
if allow_disk_use is not None:
allow_disk_use = validate_boolean("allow_disk_use", allow_disk_use)

"""
if projection is not None:
projection = helpers._fields_list_to_dict(projection, "projection")

@@ -279,6 +282,12 @@ def _refresh(self) -> int:

return len(self.__data)

def sort(self, key_or_list: _Hint, direction: Optional[Union[int, str]] = None) -> Cursor[_DocumentType]:
""" """
keys = helpers._index_list(key_or_list, direction)
self.__ordering = to_list(helpers._index_document(keys).to_dict()) # type: ignore[assignment]
return self

def __send_message(self, operation: Union[_Query, _GetMore]) -> None:
"""
Usually sends a query or getmore operation and handles the response to/from a MongoDB server.
@@ -287,6 +296,19 @@ def __send_message(self, operation: Union[_Query, _GetMore]) -> None:
TODO: OperationFailure / self.close() / PinnedResponse / explain / batching
"""
metadata = sa.MetaData(schema=operation.db)
table_name = operation.coll

table = sa.Table(table_name, metadata, autoload_with=cratedb.engine)
table.append_column(sa.Column("_id", sa.String(), primary_key=True, system=True))
model = table_to_model(table)

query = mongodb_query(
model=model,
filter=dict(self.__spec) or {},
sort=self.__ordering and list(self.__ordering) or ["_id"],
)
"""
tbl = f'"{operation.db}"."{operation.coll}"'
sql = f"SELECT *, _id FROM {tbl}" # noqa: S608
if self.__ordering is not None:
@@ -307,6 +329,8 @@ def __send_message(self, operation: Union[_Query, _GetMore]) -> None:
sql += f" LIMIT {operation.limit}"
logger.debug(f"Running SQL: {sql}")
records = cratedb.run_sql(sql, records=True)
"""
records = query.fetchall(cratedb.connection)
for record in records:
record["_id"] = AmendedObjectId.from_str(record["_id"])
self.__data = deque(records)
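
`sort()` accepts PyMongo-style keys and directions, while the `sort` value handed to `mongodb_query` is a plain list of strings. A minimal sketch of one possible translation follows. It assumes that JessiQL accepts a `+` or `-` suffix on a column name to select ascending or descending order; `to_sort_strings` is a hypothetical helper, not part of the adapter.

```python
from typing import List, Sequence, Tuple, Union

ASCENDING = 1
DESCENDING = -1

SortSpec = Union[str, Sequence[Tuple[str, int]]]


def to_sort_strings(key_or_list: SortSpec, direction: int = ASCENDING) -> List[str]:
    """Translate PyMongo-style sort arguments into suffix-style sort strings.

    Hypothetical helper; the `+`/`-` suffix convention is an assumption.
    """
    if isinstance(key_or_list, str):
        # A single key, with the direction passed separately.
        pairs = [(key_or_list, direction)]
    else:
        # A list of (key, direction) pairs.
        pairs = list(key_or_list)

    result: List[str] = []
    for key, order in pairs:
        if order not in (ASCENDING, DESCENDING):
            raise ValueError(f"unsupported sort direction for {key!r}: {order!r}")
        suffix = "+" if order == ASCENDING else "-"
        result.append(f"{key}{suffix}")
    return result


print(to_sort_strings([("author", ASCENDING), ("date", DESCENDING)]))
```

Keeping the direction in the string avoids a second data structure next to the list of column names.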
74 changes: 74 additions & 0 deletions cratedb_toolkit/adapter/pymongo/reactor.py
@@ -0,0 +1,74 @@
import typing as t
from collections import abc

import sqlalchemy as sa
from jessiql import Query, QueryObject, QueryObjectDict
from jessiql.exc import InvalidColumnError
from jessiql.typing import SARowDict


def table_to_model(table: sa.Table) -> t.Type[sa.orm.Mapper]:
"""
Create SQLAlchemy model class from Table object.
- https://docs.sqlalchemy.org/en/14/orm/mapping_styles.html#imperative-mapping
- https://sparrigan.github.io/sql/sqla/2016/01/03/dynamic-tables.html
"""
mapper_registry = sa.orm.registry(metadata=table.metadata)
Surrogate = type("Surrogate", (), {})
mapper_registry.map_imperatively(Surrogate, table)
return Surrogate


def reflect_model(engine: t.Any, metadata: sa.MetaData, table_name: str) -> t.Type[sa.orm.Mapper]:
"""
Create SQLAlchemy model class by reflecting a database table.
"""
table = sa.Table(table_name, metadata, autoload_with=engine)
return table_to_model(table)


def mongodb_query(
model: t.Type[sa.orm.Mapper],
select: t.Union[t.List, None] = None,
filter: t.Union[t.Dict[str, t.Any], None] = None, # noqa: A002
sort: t.Union[t.List[str], None] = None,
) -> Query:
"""
Create a JessiQL Query object from an SQLAlchemy model class and typical MongoDB query parameters.
"""

select = select or list(model._sa_class_manager.keys()) # type: ignore[attr-defined]

filter = filter or {} # noqa: A001
sort = sort or []

# TODO: select, filter, sort, skip, limit
if "_id" in filter:
filter["_id"] = str(filter["_id"])
query_dict = QueryObjectDict({"select": select, "filter": filter, "sort": sort})
query_object = QueryObject.from_query_object(query_dict)

try:
return Query(query=query_object, Model=model)
except InvalidColumnError as ex:
msg = str(ex)
if "Invalid column" in msg and "specified in filter" in msg:
return EmptyQuery()
else:
raise


class EmptyQuery(Query):
"""
A surrogate QueryExecutor for propagating back empty results.
"""

def __init__(self, *args, **kwargs):
self.related_executors = {}

def _load_results(self, *args, **kwargs) -> abc.Iterator[SARowDict]:
return iter(())

def _apply_operations_to_results(self, *args, **kwargs) -> t.List[SARowDict]:
return []
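
The parameter normalization in `mongodb_query` can be tried without a database. The sketch below mirrors it in plain Python: `build_query_dict` is a hypothetical stand-in that returns an ordinary dictionary instead of a `QueryObjectDict`, and `columns` replaces the column names read from the mapped model.

```python
from typing import Any, Dict, List, Optional, Sequence


def build_query_dict(
    columns: Sequence[str],
    select: Optional[List[str]] = None,
    filter: Optional[Dict[str, Any]] = None,  # noqa: A002
    sort: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Normalize MongoDB-style query parameters into a single dictionary.

    Hypothetical helper that only mirrors the defaults used in `mongodb_query`.
    """
    # Select all known columns unless the caller narrows the selection.
    select = select or list(columns)
    # Copy the filter, so the caller's dictionary is not modified.
    filter = dict(filter or {})  # noqa: A001
    sort = list(sort or [])

    # Identifiers are stored as strings, so compare them as strings.
    if "_id" in filter:
        filter["_id"] = str(filter["_id"])

    return {"select": select, "filter": filter, "sort": sort}


query = build_query_dict(columns=["_id", "author", "text"], filter={"_id": 42})
print(query)
```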
6 changes: 5 additions & 1 deletion cratedb_toolkit/adapter/pymongo/util.py
@@ -32,7 +32,11 @@ def __repr__(self) -> str:
return f"ObjectId('{self!s}')"

def __generate(self) -> None:
"""Generate a new value for this ObjectId."""
"""
Generate a new value for this ObjectId.
TODO: Generate IDs of the same width as CrateDB's.
"""
# 4 bytes current time
oid = struct.pack(">I", int(time.time()))
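
For orientation, a standard BSON ObjectId is 12 bytes: a 4-byte big-endian timestamp, a 5-byte random value, and a 3-byte counter. The sketch below shows that layout with the standard library only; `generate_object_id` and the module-level names are illustrative and are not the code in `util.py`.

```python
import os
import struct
import threading
import time

# 5 random bytes, fixed for the lifetime of the process.
_PROCESS_RANDOM = os.urandom(5)
# 3-byte counter, seeded with a random value.
_counter = int.from_bytes(os.urandom(3), "big")
_counter_lock = threading.Lock()


def generate_object_id() -> bytes:
    """Return 12 bytes laid out like a BSON ObjectId (illustrative only)."""
    global _counter
    with _counter_lock:
        counter = _counter
        _counter = (_counter + 1) % 0x1000000  # wrap around at 3 bytes

    # 4 bytes current time, big-endian
    oid = struct.pack(">I", int(time.time()))
    # 5 bytes random
    oid += _PROCESS_RANDOM
    # 3 bytes counter: the lower three bytes of a 4-byte big-endian integer
    oid += struct.pack(">I", counter)[1:4]
    return oid


print(generate_object_id().hex())
```

The hexadecimal form of such a value is 24 characters long, which is the width the TODO above compares against CrateDB's identifiers.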
