Skip to content

Commit

Permalink
Merge pull request #9 from certego/develop
Browse files Browse the repository at this point in the history
0.1.0
  • Loading branch information
0ssigeno authored Jul 4, 2022
2 parents 9633166 + a015fef commit 0a29f39
Show file tree
Hide file tree
Showing 18 changed files with 311 additions and 528 deletions.
21 changes: 14 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,6 @@ The main idea, is that the `filter` should work like an `aggregation`.
To do so while keeping compatibility with how MongoEngine works (i.e. the filter should return a queryset of `Document`), we had to do some work.
Calling `.aggregate`, on the other hand, has to behave as MongoEngine expects, i.e. return a list of dictionaries.
#### Features
##### Cache
To complicate things, we even decided to add a cache!
The issue is that we query a large collection, and the query is composed of many clauses, meaning that
it can take 2 to 3 minutes to return a result. The second constraint is that the result of this query does not change rapidly
(or at least not the `filtering` part). If the cache is enabled, we save the retrieved objects in a new temporary collection,
and future queries that use the same parameters will retrieve the documents from this new collection.

##### Validation
We also decided to have, optionally, a validation of the index.
Expand All @@ -45,10 +39,12 @@ from mongoengine import Document, fields

from atlasq import AtlasManager, AtlasQ, AtlasQuerySet

index_name = str("my_index")

class MyDocument(Document):
name = fields.StringField(required=True)
surname = fields.StringField(required=True)
atlas = AtlasManager("myindex", "default")
atlas = AtlasManager(index_name)

obj = MyDocument.objects.create(name="value", surname="value2")

Expand All @@ -59,4 +55,15 @@ assert obj == obj_from_atlas

obj2_from_atlas = MyDocument.atlas.get(AtlasQ(name="value") & AtlasQ(surname="value2"))
assert obj == obj2_from_atlas


obj3_from_atlas = MyDocument.atlas.get(AtlasQ(wrong_field="value"))
assert obj3_from_atlas is None

result = MyDocument.atlas.ensure_index("user", "pwd", "group", "cluster")
assert result is True
obj3_from_atlas = MyDocument.atlas.get(AtlasQ(wrong_field="value")) # raises AtlasIndexFieldError



```
11 changes: 10 additions & 1 deletion atlasq/queryset/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
from atlasq.queryset.exceptions import AtlasIndexError, AtlasIndexFieldError
from atlasq.queryset.index import AtlasIndex
from atlasq.queryset.manager import AtlasManager
from atlasq.queryset.node import AtlasQ
from atlasq.queryset.queryset import AtlasQuerySet

__all__ = ["AtlasQ", "AtlasQuerySet", "AtlasManager"]
__all__ = [
"AtlasQ",
"AtlasQuerySet",
"AtlasManager",
"AtlasIndex",
"AtlasIndexFieldError",
"AtlasIndexError",
]
16 changes: 7 additions & 9 deletions atlasq/queryset/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
import hashlib
import json
import logging
from typing import Any, Dict, List, Tuple, Union
from typing import Any, Dict, List, Tuple, Type, Union

from mongoengine import Document, QuerySet, get_db
from mongoengine.context_managers import switch_collection, switch_db
from pymongo.collection import Collection

logger = logging.getLogger(__name__)

Expand All @@ -20,9 +19,9 @@ class KeyError(KeyError):
def __init__(self, key):
    """Build the error with a message naming the cache key that was not found.

    Args:
        key: The cache key that was looked up; it is interpolated into
            the exception message.
    """
    message = f"Cache key {key} not found"
    super().__init__(message)

def __init__(self, document: Type[Document], **kwargs):
    """Bind the cache to a document class.

    Args:
        document: The MongoEngine ``Document`` class (not an instance)
            whose query results are cached.
        **kwargs: Accepted so subclasses can forward extra options; not
            used by this initializer.
    """
    self._document = document
    # Holds the collection *name* as returned by the document class,
    # not a pymongo Collection object.
    self._collection = self._document._get_collection_name()
    # NOTE(review): presumably the cache lifetime in minutes -- confirm
    # against the code that reads it.
    self._max_minutes = 30

def _aggregations_to_key(self, aggregations: List[Dict]) -> str:
Expand All @@ -48,12 +47,11 @@ def remove(self, aggregations: List[Dict]) -> None:
class AtlasDbCache(_AtlasCache):
def __init__(
    self,
    document: Type[Document],
    db_connection_alias: str,
    **kwargs,
):
    """Bind the cache to a document class and a MongoEngine connection.

    Args:
        document: The MongoEngine ``Document`` class whose results are cached.
        db_connection_alias: Alias of the registered MongoEngine connection;
            it is passed to ``get_db`` to resolve the database.
        **kwargs: Forwarded unchanged to the parent initializer.
    """
    super().__init__(document, **kwargs)
    self._db_connection_alias = db_connection_alias
    # Resolve the database name once, from the connection alias.
    self.db_name = get_db(self._db_connection_alias).name

Expand Down Expand Up @@ -126,8 +124,8 @@ def set_collection_expiration(


class AtlasRamCache(_AtlasCache):
def __init__(self, document: Type[Document], **kwargs):
    """Bind the cache to a document class and start with an empty in-memory store.

    Args:
        document: The MongoEngine ``Document`` class whose results are cached.
        **kwargs: Forwarded unchanged to the parent initializer.
    """
    super().__init__(document, **kwargs)
    # Maps a string key to a (datetime, QuerySet) pair.
    # NOTE(review): the datetime is presumably the insertion or expiry
    # time of the entry -- confirm against get/set.
    self._cache: Dict[str, Tuple[datetime.datetime, QuerySet]] = {}

def remove(self, aggregations: List[Dict]) -> None:
Expand Down
14 changes: 14 additions & 0 deletions atlasq/queryset/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
class AtlasError(Exception):
    """Base class for all AtlasQ exceptions.

    The other exception classes in this module derive from it, so
    ``except AtlasError`` handles any of them.
    """


class AtlasIndexFieldError(AtlasError):
    """Error for a query that uses a field the Atlas index does not cover.

    Per the README example, it is raised when querying on a non-indexed
    field after the index has been ensured.
    """


class AtlasIndexError(AtlasError):
    """Error concerning the Atlas index itself.

    ``AtlasIndex`` raises it when no index name is defined, and when a
    keyword is validated before the index has been ensured.
    """


class AtlasFieldError(AtlasError):
    """Field-related AtlasQ error.

    NOTE(review): no raise site is visible in this change; presumably it
    signals an invalid field in a query -- confirm against the callers.
    """
12 changes: 10 additions & 2 deletions atlasq/queryset/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import requests
from requests.auth import HTTPDigestAuth

from atlasq.queryset.exceptions import AtlasIndexError

logger = getLogger(__name__)

ATLAS_BASE_URL = "https://cloud.mongodb.com/api/atlas/v1.0"
Expand All @@ -20,6 +22,12 @@ def __init__(self, index_name: str):
self.ensured: bool = False
self._index: str = index_name

def __copy__(self):
    """Return a new ``AtlasIndex`` mirroring this one.

    The clone is built from the same index name and then receives the
    ``ensured`` flag and the ``_indexed_fields`` object of the original.
    The copy is shallow: ``_indexed_fields`` is shared, not duplicated.
    """
    clone = AtlasIndex(self._index)
    clone.ensured = self.ensured
    clone._indexed_fields = self._indexed_fields
    return clone

@property
def index(self) -> str:
return self._index
Expand All @@ -39,7 +47,7 @@ def ensure_index_exists(
collection_name: str,
):
if not self.index:
raise ValueError("No index defined")
raise AtlasIndexError("No index defined")
url = LIST_TEXT_INDEXES_ENDPOINT.format(
GROUP_ID=group_id,
CLUSTER_NAME=cluster_name,
Expand Down Expand Up @@ -80,5 +88,5 @@ def _set_indexed_from_mappings(self, index_result: Dict):

def ensure_keyword_is_indexed(self, keyword: str) -> bool:
    """Tell whether ``keyword`` matches at least one indexed field pattern.

    Args:
        keyword: Field name to look up; it is compared against every
            entry of ``self._indexed_fields`` with shell-style wildcard
            matching.

    Returns:
        True when at least one indexed-field pattern matches, else False.

    Raises:
        AtlasIndexError: If the index has not been ensured yet.
    """
    if not self.ensured:
        raise AtlasIndexError("Index not ensured")
    # NOTE(review): fnmatch.fnmatch normalizes case on case-insensitive
    # platforms; fnmatch.fnmatchcase may be preferable if field names
    # must always be matched case-sensitively -- confirm.
    return any(fnmatch.fnmatch(keyword, field) for field in self._indexed_fields)
11 changes: 3 additions & 8 deletions atlasq/queryset/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from mongoengine import QuerySetManager

from atlasq.queryset import AtlasIndex
from atlasq.queryset.queryset import AtlasQuerySet


Expand All @@ -15,20 +16,14 @@ def default(self):
if self._index:
return AtlasQuerySet
res = super().default
res.cache_expire_in = lambda x, y: x
res.sort_by_count = lambda x, field: x.aggregate(
[{"$sortByCount": f"${field}"}]
)
return res

def __init__(self, atlas_index: Union[str, None]):
    """Create the manager, optionally bound to an Atlas search index.

    Args:
        atlas_index: Name of the Atlas index, or None. A falsy value
            (None or an empty string) leaves the manager without an
            index, stored as ``self._index = None``.
    """
    super().__init__()
    # Wrap the name in an AtlasIndex only when one was actually given.
    self._index = AtlasIndex(atlas_index) if atlas_index else None

def __get__(self, instance, owner):
    """Return the queryset from the parent descriptor, tagged with this manager's index.

    Args:
        instance: Passed through unchanged to the parent ``__get__``.
        owner: Passed through unchanged to the parent ``__get__``.

    Returns:
        The queryset produced by the parent ``__get__``. When it is an
        ``AtlasQuerySet``, its ``index`` attribute is set to the
        manager's index first.
    """
    queryset = super().__get__(instance, owner)
    # Only Atlas-aware querysets carry an index; others are returned untouched.
    if isinstance(queryset, AtlasQuerySet):
        queryset.index = self._index
    return queryset
8 changes: 3 additions & 5 deletions atlasq/queryset/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ def operation(self):
return self.AND

def to_query( # pylint: disable=arguments-differ
self, document, atlas_index: Union[None, AtlasIndex] = None
) -> Tuple[Dict, List[Dict]]:
if AtlasIndex is None:
return super().to_query(document)
self, document, atlas_index: AtlasIndex
) -> List[Dict]:
logger.debug(f"to_query {self.__class__.__name__} {document}")
query = self.accept(AtlasSimplificationVisitor())
query = query.accept(AtlasQueryCompilerVisitor(document, atlas_index))
Expand All @@ -31,7 +29,7 @@ def to_query( # pylint: disable=arguments-differ
def _combine(self, other, operation) -> Union["AtlasQ", "AtlasQCombination"]:
    """Combine this node with ``other`` and re-wrap the result in Atlas node types.

    The combination itself is delegated to the parent class. Its result
    is then converted so callers always get an Atlas-aware node.

    Args:
        other: The node to combine with.
        operation: The combining operation, forwarded to the parent.

    Returns:
        An ``AtlasQCombination`` when the parent produced a
        ``QCombination``, otherwise an ``AtlasQ`` rebuilt from the
        resulting query.
    """
    logger.debug(f"_combine {self.__class__.__name__} {other}, {operation}")

    result = super()._combine(other, operation)
    if isinstance(result, QCombination):
        return AtlasQCombination(result.operation, result.children)
    return AtlasQ(**result.query)
Expand Down
Loading

0 comments on commit 0a29f39

Please sign in to comment.