Skip to content

Commit

Permalink
Merge pull request #31 from certego/develop
Browse files Browse the repository at this point in the history
0.4.0
  • Loading branch information
0ssigeno authored Oct 10, 2022
2 parents 637f8ea + 2d089aa commit 2d08f40
Show file tree
Hide file tree
Showing 7 changed files with 302 additions and 149 deletions.
53 changes: 38 additions & 15 deletions atlasq/queryset/index.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import fnmatch
from enum import Enum
from logging import getLogger
from typing import Dict, List

import requests
from requests.auth import HTTPDigestAuth

from atlasq.queryset.exceptions import AtlasIndexError
from atlasq.queryset.exceptions import AtlasIndexError, AtlasIndexFieldError

logger = getLogger(__name__)

Expand All @@ -16,14 +17,26 @@
)


class AtlasIndexType(Enum):
    """Field types recognised when reading an index definition.

    Each member's value is the string that is compared against the ``type``
    key of an index mapping entry.
    """

    DOCUMENT = "document"
    EMBEDDED_DOCUMENT = "embeddedDocuments"
    STRING = "string"
    INTEGER = "integer"
    BOOLEAN = "boolean"
    DATE = "date"

    @classmethod
    def values(cls) -> List[str]:
        """Return the string value of every member, in declaration order."""
        return [member.value for member in cls]


class AtlasIndex:

fields_to_copy = ["ensured", "_indexed_fields", "use_embedded_documents"]
fields_to_copy = ["ensured", "_indexed_fields"]

def __init__(self, index_name: str, use_embedded_documents: bool = True):
self._indexed_fields: List[str] = []
def __init__(self, index_name: str):
self._indexed_fields: Dict[str, str] = {}
self.ensured: bool = False
self.use_embedded_documents: bool = use_embedded_documents
self._index: str = index_name

def __copy__(self):
Expand Down Expand Up @@ -72,26 +85,36 @@ def ensure_index_exists(
return self.ensured

def _set_indexed_fields(self, index_result: Dict, base_field: str = ""):
if index_result["type"] == "document":
self._indexed_fields.append(base_field)
if index_result.get("dynamic", False):
self._indexed_fields.append(f"{base_field}.*" if base_field else "*")
else:
lucene_type = index_result["type"]
if lucene_type in [
AtlasIndexType.DOCUMENT.value,
AtlasIndexType.EMBEDDED_DOCUMENT.value,
]:
if not index_result.get("dynamic", False):
for field, value in index_result.get("fields", {}).items():
field = f"{base_field}.{field}" if base_field else field
self._set_indexed_fields(value, base_field=field)

else:
assert base_field
self._indexed_fields.append(base_field)
else:
self._indexed_fields[f"{base_field}.*" if base_field else "*"] = ""
if base_field:
if lucene_type not in AtlasIndexType.values():
logger.warning(f"Lucene type {lucene_type} not configured")
self._indexed_fields[base_field] = lucene_type

def _set_indexed_from_mappings(self, index_result: Dict):
    """Record the indexed fields described by an index definition.

    The ``mappings`` entry of ``index_result`` is handled as a top-level
    document-typed entry and handed to ``_set_indexed_fields``.

    Args:
        index_result: Index definition; must contain a ``"mappings"`` key.

    Raises:
        KeyError: If ``index_result`` has no ``"mappings"`` key.
    """
    # Build a shallow copy instead of writing the "type" key into the
    # caller's dict, so the index definition passed in is left untouched.
    mappings = {
        **index_result["mappings"],
        "type": AtlasIndexType.DOCUMENT.value,
    }
    self._set_indexed_fields(mappings)
    logger.debug(self._indexed_fields)

def ensure_keyword_is_indexed(self, keyword: str) -> bool:
    """Tell whether ``keyword`` matches one of the indexed field patterns.

    The stored field names are used as shell-style patterns, so an entry
    such as ``"field.*"`` matches every key below ``field``.

    Args:
        keyword: Dotted field path to look up.

    Returns:
        ``True`` when at least one stored pattern matches ``keyword``.

    Raises:
        AtlasIndexError: If the index has not been ensured yet.
    """
    if not self.ensured:
        raise AtlasIndexError("Index not ensured")
    # fnmatchcase, unlike fnmatch, does not pass its arguments through
    # os.path.normcase, so field names are compared case-sensitively and
    # without separator rewriting on every platform.
    return any(
        fnmatch.fnmatchcase(keyword, pattern) for pattern in self._indexed_fields
    )

def get_type_from_keyword(self, keyword) -> str:
    """Return the type stored for the exact field name ``keyword``.

    Only exact keys are considered; no pattern matching is performed.

    Raises:
        AtlasIndexError: If the index has not been ensured yet.
        AtlasIndexFieldError: If ``keyword`` is not a stored field name.
    """
    if not self.ensured:
        raise AtlasIndexError("Index not ensured")
    # A private sentinel distinguishes "absent" from any stored value,
    # including the empty string.
    absent = object()
    stored_type = self._indexed_fields.get(keyword, absent)
    if stored_type is absent:
        raise AtlasIndexFieldError(f"Keyword {keyword} not present in index")
    return stored_type
7 changes: 1 addition & 6 deletions atlasq/queryset/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,9 @@ def __init__(
self,
atlas_index: Union[str, None],
save_execution_time: bool = False,
use_embedded_documents_indexes: bool = True,
):
super().__init__()
self._index = (
AtlasIndex(atlas_index, use_embedded_documents_indexes)
if atlas_index
else None
)
self._index = AtlasIndex(atlas_index) if atlas_index else None
self._save_execution_time = save_execution_time

def __get__(self, instance, owner):
Expand Down
81 changes: 46 additions & 35 deletions atlasq/queryset/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from mongoengine import QuerySet

from atlasq.queryset.exceptions import AtlasFieldError, AtlasIndexFieldError
from atlasq.queryset.index import AtlasIndex
from atlasq.queryset.index import AtlasIndex, AtlasIndexType

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -59,29 +59,40 @@ class AtlasTransform:
"match",
]

def __init__(self, atlas_query):
def __init__(self, atlas_query, atlas_index: AtlasIndex):
self.atlas_query = atlas_query
self.atlas_index = atlas_index

def _regex(self, path: str, value: str):
return {"regex": {"query": value, "path": path}}

def _embedded_document(self, path: List[str], operator: Dict):
# recursive
if len(path) > 2:
new_path = path[1:]
new_path[0] = f"{path[0]}.{new_path[0]}"

return {
"embeddedDocument": {
"path": path[0],
"operator": self._embedded_document(new_path, operator),
}
def _embedded_document(self, path: str, content: Dict):
return {
"embeddedDocument": {
"path": path,
"operator": content,
}
# real exit case
if len(path) > 1:
return {"embeddedDocument": {"path": path[0], "operator": operator}}
# we do nothing in case it was not an embedded document
return operator
}

def _convert_to_embedded_document(
self, path: List[str], operator: Dict, start: str = ""
):
element = path.pop(0)
partial_path = f"{start}.{element}" if start else element
if not self.atlas_index.ensured:
return operator
if (
self.atlas_index.get_type_from_keyword(partial_path)
!= AtlasIndexType.EMBEDDED_DOCUMENT.value
):
return operator

if not path:
return operator
return self._embedded_document(
partial_path,
self._convert_to_embedded_document(path, operator, start=partial_path),
)

def _exists(self, path: str) -> Dict:
return {"exists": {"path": path}}
Expand Down Expand Up @@ -134,15 +145,18 @@ def _size(self, path: str, value: int, operator: str) -> Dict:
}
}

def _ensure_keyword_is_indexed(self, atlas_index: AtlasIndex, keyword: str) -> None:
if not atlas_index.ensure_keyword_is_indexed(keyword):
raise AtlasIndexFieldError(
f"The keyword {keyword} is not indexed in {atlas_index.index}"
)
def _ensure_path_is_indexed(self, path: List[str]) -> None:
start = ""
for element in path:
partial_path = f"{start}.{element}" if start else element

if not self.atlas_index.ensure_keyword_is_indexed(partial_path):
raise AtlasIndexFieldError(
f"The keyword {partial_path} is not indexed in {self.atlas_index.index}"
)
start = partial_path

def transform(
self, atlas_index: AtlasIndex
) -> Tuple[List[Dict], List[Dict], List[Dict]]:
def transform(self) -> Tuple[List[Dict], List[Dict], List[Dict]]:
other_aggregations = []
affirmative = []
negative = []
Expand Down Expand Up @@ -210,15 +224,12 @@ def transform(
obj = self._text(path, value)

if obj:
if atlas_index.use_embedded_documents:
# we are wrapping the result to an embedded document
obj = self._embedded_document(path.split("."), obj)

if atlas_index.ensured:
# if we are using the embedded object, in the index is defined only the first level
if atlas_index.use_embedded_documents:
path = path.split(".", maxsplit=1)[0]
self._ensure_keyword_is_indexed(atlas_index, path)
# we are wrapping the result to an embedded document
obj = self._convert_to_embedded_document(path.split("."), obj)

if self.atlas_index.ensured:

self._ensure_path_is_indexed(path.split("."))
logger.debug(obj)

if to_go == 1:
Expand Down
6 changes: 3 additions & 3 deletions atlasq/queryset/visitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ def visit_combination(self, combination) -> List[Dict]:
def visit_query(self, query) -> List[Dict]:
from atlasq.queryset.transform import AtlasTransform

affirmative, negative, aggregations = AtlasTransform(query.query).transform(
self.atlas_index
)
affirmative, negative, aggregations = AtlasTransform(
query.query, self.atlas_index
).transform()
filters = {}
if affirmative:
filters.setdefault("compound", {})["filter"] = affirmative
Expand Down
10 changes: 7 additions & 3 deletions tests/queryset/test_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def raise_for_status(self):
raise HTTPError(self.status_code)


class TestManager(TestBaseCase):
class TestIndex(TestBaseCase):
def test_ensure_keyword_is_indexed(self):
index = AtlasIndex("myindex")
index._indexed_fields = ["field1", "field2.*"]
Expand Down Expand Up @@ -61,10 +61,13 @@ def test_set_indexed_from_mappings(self):
}
)
self.assertCountEqual(
index._indexed_fields,
index._indexed_fields.keys(),
[
"field1",
"field1.*",
"field2",
"field2.field",
"field2.field4",
"field2.field4.field5",
"field2.field4.field6",
],
Expand All @@ -89,7 +92,8 @@ def test_set_indexed_fields(self):
}
)
self.assertCountEqual(
index._indexed_fields, ["field1", "field2.field3", "field2.field4"]
index._indexed_fields,
["field1", "field2", "field2.field3", "field2.field4"],
)
index._indexed_fields.clear()
index._set_indexed_fields(
Expand Down
Loading

0 comments on commit 2d08f40

Please sign in to comment.