-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathmongo_cache.py
105 lines (79 loc) · 3 KB
/
mongo_cache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
from datetime import datetime
from typing import Callable
import pymongo
from pyeudiw.storage.base_cache import BaseCache, RetrieveStatus
from pymongo.collection import Collection
from pymongo.mongo_client import MongoClient
from pymongo.database import Database
class MongoCache(BaseCache):
    """
    MongoDB cache implementation.

    Cache entries are stored as documents in the ``cache_storage`` collection
    of the database named by ``conf["db_name"]``. The connection is opened
    lazily by the first operation that needs it.
    """

    def __init__(self, conf: dict, url: str, connection_params: dict = None) -> None:
        """
        Create a MongoCache instance.

        :param conf: the configuration of the cache; ``db_name`` is read from it
            when the connection is opened.
        :type conf: dict
        :param url: the url of the MongoDB server.
        :type url: str
        :param connection_params: extra keyword arguments forwarded to
            ``pymongo.MongoClient``. ``None`` means no extra arguments.
        :type connection_params: dict, optional
        """
        super().__init__()

        self.storage_conf = conf
        self.url = url
        # Stored as given; _connect() treats None as "no extra arguments".
        self.connection_params = connection_params

        # All three are populated by _connect() and reset by close().
        self.client: MongoClient = None
        self.db: Database = None
        self.collection: Collection = None

    def close(self) -> None:
        """
        Close the MongoDB connection, if one was opened.

        Does nothing when no client exists, so closing never opens a
        connection. The handles are reset so that the next operation builds a
        fresh client instead of reusing a closed one.
        """
        if self.client is None:
            return

        self.client.close()
        self.client = None
        self.db = None
        self.collection = None

    def try_retrieve(self, object_name: str, on_not_found: Callable[[], str]) -> tuple[dict, RetrieveStatus]:
        """
        Return the cached object called ``object_name``, creating it if missing.

        :param object_name: the name of the object to look up.
        :type object_name: str
        :param on_not_found: called without arguments to produce the data to
            store when no object with that name exists.
        :type on_not_found: Callable[[], str]

        :returns: the cache object together with ``RetrieveStatus.RETRIEVED``
            when it was already stored, or ``RetrieveStatus.ADDED`` when it
            was generated and inserted by this call.
        :rtype: tuple[dict, RetrieveStatus]
        """
        self._connect()

        cache_object = self.collection.find_one({"object_name": object_name})
        if cache_object is not None:
            return cache_object, RetrieveStatus.RETRIEVED

        # Cache miss: generate the value, persist it, and report it as new.
        cache_object = self._gen_cache_object(object_name, on_not_found())
        self.collection.insert_one(cache_object)
        return cache_object, RetrieveStatus.ADDED

    def overwrite(self, object_name: str, value_gen_fn: Callable[[], str]) -> dict:
        """
        Replace the data stored under ``object_name`` with a newly generated value.

        :param object_name: the name of the object to overwrite.
        :type object_name: str
        :param value_gen_fn: called without arguments to produce the new data.
        :type value_gen_fn: Callable[[], str]

        :returns: the cache object (name, data and creation date) that was written.
        :rtype: dict
        """
        self._connect()

        cache_object = self._gen_cache_object(object_name, value_gen_fn())

        # upsert=True so the object is also stored when no document with this
        # name exists yet; otherwise the returned object would describe
        # something that was never written.
        self.collection.update_one(
            {"object_name": object_name},
            {
                "$set": {
                    "data": cache_object["data"],
                    "creation_date": cache_object["creation_date"],
                }
            },
            upsert=True,
        )

        return cache_object

    def set(self, data: dict) -> dict:
        """
        Insert ``data`` into the cache collection as a new document.

        :param data: the document to insert.
        :type data: dict

        :returns: whatever ``Collection.insert_one`` returns.
        """
        # NOTE(review): the annotation says ``dict``, but pymongo documents
        # insert_one as returning an InsertOneResult. Confirm what callers
        # expect before changing either side.
        self._connect()
        return self.collection.insert_one(data)

    def _connect(self) -> None:
        """
        Open the MongoDB connection if there is no usable client yet.

        Sets ``self.client``, ``self.db`` and ``self.collection``.
        """
        # NOTE(review): server_info() queries the server on every call;
        # presumably it raises rather than returning a falsy value when the
        # server is unreachable. Confirm against the pymongo documentation.
        if self.client is not None and self.client.server_info():
            return

        # A missing connection_params means "no extra arguments"; unpacking
        # None with ** would raise a TypeError.
        params = self.connection_params or {}
        self.client = pymongo.MongoClient(self.url, **params)
        # Subscript access, so a database name can never be mistaken for an
        # attribute or method of the client object.
        self.db = self.client[self.storage_conf["db_name"]]
        self.collection = self.db["cache_storage"]

    def _gen_cache_object(self, object_name: str, data: str) -> dict:
        """
        Helper function to generate a cache object.

        :param object_name: the name of the object.
        :type object_name: str
        :param data: the data to store.
        :type data: str

        :returns: a dict with ``object_name``, ``data`` and a ``creation_date``
            holding the current local time in ISO 8601 format.
        :rtype: dict
        """
        return {
            "object_name": object_name,
            "data": data,
            "creation_date": datetime.now().isoformat()
        }