Skip to content

Commit

Permalink
fix some queries
Browse files Browse the repository at this point in the history
  • Loading branch information
charles-cooper committed Jan 10, 2025
1 parent 1b5bb70 commit 4b7c2dc
Showing 1 changed file with 25 additions and 20 deletions.
45 changes: 25 additions & 20 deletions boa/util/sqlitedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,17 @@ def __init__(self, db_path: Path | str, ttl: float = _ONE_MONTH) -> None:
db_path.parent.mkdir(parents=True, exist_ok=True)

self.db: sqlite3.Connection = sqlite3.connect(
db_path, timeout=0.0, isolation_level=None
db_path, timeout=0.025, isolation_level=None
)

# cache the cursor
self._cursor = self.db.cursor()

with self.acquire_write_lock():
for cmd in self.__class__._CREATE_CMDS:
self.db.execute(cmd)
self._cursor.execute(cmd)

# ttl of cache entries in seconds
# ttl = 100
self.ttl: float = float(ttl)

self.gc()
Expand All @@ -62,7 +65,7 @@ def __init__(self, db_path: Path | str, ttl: float = _ONE_MONTH) -> None:
def gc(self):
with self.acquire_write_lock():
current_time = get_current_time()
self.db.execute(
self._cursor.execute(
"DELETE FROM kv_store WHERE expires_at < ?", (current_time,)
)

Expand All @@ -83,24 +86,26 @@ def _flush(self):
SET expires_at=?
WHERE key=?
"""
self.db.executemany(query_string, self._expiry_updates)
self._cursor.executemany(query_string, self._expiry_updates)
self._expiry_updates = []

@contextlib.contextmanager
def acquire_write_lock(self):
while True:
try:
self.db.execute("BEGIN IMMEDIATE")
self._cursor.execute("BEGIN IMMEDIATE")
break
except sqlite3.OperationalError:
# sleep 10 micros
# sleep 10 micros, roughly the min time for a query to complete
time.sleep(1e-4)
continue
try:
yield
self.db.commit()
except Exception:
self.db.rollback()
self._cursor.execute("COMMIT")
except Exception as e:
raise
print("EXC", e)
self._cursor.execute("ROLLBACK")

@classmethod
# Creates db as a singleton class variable
Expand All @@ -118,7 +123,7 @@ def __getitem__(self, key: bytes) -> bytes:
SELECT value, expires_at FROM kv_store
WHERE key=?
"""
res = self.db.execute(query_string, (key,)).fetchone()
res = self._cursor.execute(query_string, (key,)).fetchone()
if res is None:
raise KeyError(key)

Expand All @@ -135,23 +140,23 @@ def __getitem__(self, key: bytes) -> bytes:
return val

def __setitem__(self, key: bytes, value: bytes) -> None:
query_string = """
INSERT INTO kv_store(key, value, expires_at) VALUES (?,?,?)
ON CONFLICT
SET value=excluded.value,
expires_at=excluded.expires_at
"""
with self.acquire_write_lock():
query_string = """
INSERT INTO kv_store(key, value, expires_at) VALUES (?,?,?)
ON CONFLICT DO UPDATE
SET value=excluded.value,
expires_at=excluded.expires_at
"""
expiry_ts = self.get_expiry_ts()
self.db.execute(query_string, (key, value, expiry_ts))
self._cursor.execute(query_string, (key, value, expiry_ts))

def _exists(self, key: bytes) -> bool:
query_string = "SELECT count(*) FROM kv_store WHERE key=?"
(res,) = self.db.execute(query_string, (key,)).fetchone()
(res,) = self._cursor.execute(query_string, (key,)).fetchone()
return bool(res)

def __delitem__(self, key: bytes) -> None:
with self.acquire_write_lock():
res = self.db.execute("DELETE FROM kv_store WHERE key=?", (key,))
res = self._cursor.execute("DELETE FROM kv_store WHERE key=?", (key,))
if res.rowcount == 0:
raise KeyError(key)

0 comments on commit 4b7c2dc

Please sign in to comment.