perf tuning and fixes
charles-cooper committed Jan 10, 2025
1 parent 4b7c2dc commit 7c0ef2f
Showing 2 changed files with 98 additions and 33 deletions.
125 changes: 92 additions & 33 deletions boa/util/sqlitedb.py
@@ -5,6 +5,8 @@

from eth.db.backends.base import BaseDB

# from vyper.utils import timeit

# poor man's constant
_ONE_MONTH = 30 * 24 * 3600

@@ -21,90 +23,141 @@ class SqliteCache(BaseDB):

_GLOBAL = None

_CREATE_CMDS = [
"""
pragma journal_mode=wal
""",
"""
CREATE TABLE IF NOT EXISTS kv_store (
key TEXT PRIMARY KEY, value BLOB, expires_at float
)
""",
_debug = False

_CREATE_CMD = """
-- tuning
pragma journal_mode = wal;
pragma synchronous = normal;
pragma temp_store = memory;
-- not sure if these help or hurt things:
pragma mmap_size = 30000000000;
pragma auto_vacuum = incremental;
pragma page_size = 512;
-- initialize schema
CREATE TABLE IF NOT EXISTS kv_store (
key TEXT PRIMARY KEY, value BLOB, expires_at float
);
CREATE INDEX IF NOT EXISTS expires_at_index ON kv_store(expires_at);
"""
CREATE INDEX IF NOT EXISTS expires_at_index ON kv_store(expires_at)
""",
]
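
The rewrite replaces the list of per-statement `_CREATE_CMDS` with a single `_CREATE_CMD` script: pragma tuning (WAL journal, relaxed fsync, in-memory temp store) plus schema creation, applied in one `executescript()` call. A minimal standalone sketch of the same setup, with an illustrative database path and only the less speculative pragmas:

```python
import sqlite3

# illustrative setup script modeled on _CREATE_CMD above; the mmap/page_size
# pragmas flagged as "not sure if these help" are omitted here
SETUP_SCRIPT = """
-- tuning
pragma journal_mode = wal;      -- readers no longer block the single writer
pragma synchronous = normal;    -- fewer fsyncs; acceptable with WAL
pragma temp_store = memory;
-- schema
CREATE TABLE IF NOT EXISTS kv_store (
    key TEXT PRIMARY KEY, value BLOB, expires_at float
);
CREATE INDEX IF NOT EXISTS expires_at_index ON kv_store(expires_at);
"""

# isolation_level=None means autocommit: explicit BEGIN/COMMIT statements
# (as in acquire_write_lock below) control transactions, not the driver.
db = sqlite3.connect("example-cache.db", timeout=0.010, isolation_level=None)
db.cursor().executescript(SETUP_SCRIPT)
```
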

# flush at least once per second
_MAX_FLUSH_TIME = 1.0
# flush at least once every second
_FLUSH_INTERVAL = 1.0

def __init__(self, db_path: Path | str, ttl: float = _ONE_MONTH) -> None:
if db_path != ":memory:": # sqlite magic path
db_path = Path(db_path)
db_path.parent.mkdir(parents=True, exist_ok=True)

# short timeout so that we don't get stuck waiting for
# OperationalError on exit (e.g. in __del__()), but long enough
# so that we don't spinlock in acquire lock. 10ms seems like
# a good balance.
# note that informal benchmarks performed on my machine(TM) had
# the following results:
# __getitem__: 15us
# __getitem__ (from wal): 120us
# __setitem__: 300us
# gc: 40us
# flush: 6000us
TIMEOUT = 0.010 # 10ms

self.db: sqlite3.Connection = sqlite3.connect(
db_path, timeout=0.025, isolation_level=None
db_path, timeout=TIMEOUT, isolation_level=None
)

# cache the cursor
self._cursor = self.db.cursor()

with self.acquire_write_lock():
for cmd in self.__class__._CREATE_CMDS:
self._cursor.execute(cmd)
# initialize other fields
self._last_flush = get_current_time()
self._expiry_updates: list[tuple[float, bytes]] = []

# ttl of cache entries in seconds
self.ttl: float = float(ttl)

self.gc()
self._cursor.executescript(self._CREATE_CMD)

self._last_flush = get_current_time()
self._expiry_updates: list[tuple[float, bytes]] = []
self.gc()
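
The commented-out `timeit` import and the microsecond figures in the `TIMEOUT` comment come from ad-hoc timing of individual cache operations. A rough stand-in for such a timer, written here for illustration (it is not the `vyper.utils.timeit` helper the comments reference):

```python
import contextlib
import time

@contextlib.contextmanager
def timeit(label: str):
    # crude wall-clock timer for micro-benchmarks like the ones quoted in
    # the TIMEOUT comment (__getitem__ ~15us, __setitem__ ~300us, ...)
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed_us = (time.perf_counter() - start) * 1e6
        print(f"{label}: {elapsed_us:.1f}us")

# usage, mirroring the commented-out call sites:
# with timeit("CACHE HIT"):
#     value = cache[key]
```
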

def gc(self):
with self.acquire_write_lock():
current_time = get_current_time()
self._cursor.execute(
"DELETE FROM kv_store WHERE expires_at < ?", (current_time,)
)
# batch writes in here
self._flush(nolock=True)

def __del__(self):
self._flush()
# try to flush but if we fail, no worries. these are optional
# operations.
try:
self._flush()
except Exception:
pass

def _flush_condition(self):
if len(self._expiry_updates) >= 1000:
return True
next_flush = self._last_flush + self._FLUSH_INTERVAL
if get_current_time() > next_flush:
return True
return False

# @timeit("FLUSH")
def _flush(self, nolock=False):
# set nolock=True if the caller has already acquired a lock.
if len(self._expiry_updates) == 0:
return False

next_flush = self._last_flush + self._MAX_FLUSH_TIME
return len(self._expiry_updates) > 1000 or get_current_time() > next_flush
self._last_flush = get_current_time()
return

def _flush(self):
with self.acquire_write_lock():
acquire_lock = contextlib.nullcontext if nolock else self.acquire_write_lock
with acquire_lock():
query_string = """
UPDATE kv_store
SET expires_at=?
WHERE key=?
"""
self._cursor.executemany(query_string, self._expiry_updates)
self._expiry_updates = []

self._expiry_updates = []
self._last_flush = get_current_time()
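
`_flush_condition()` and the reworked `_flush()` batch the expiry refreshes: updates accumulate in `_expiry_updates` and are written back with a single `executemany()` once the batch reaches 1000 entries or `_FLUSH_INTERVAL` elapses, while `nolock=True` lets callers that already hold the write lock skip re-acquiring it. A simplified, self-contained sketch of that batching pattern (names and thresholds mirror the ones above, but the code is illustrative):

```python
import sqlite3
import time

FLUSH_INTERVAL = 1.0  # seconds
MAX_BATCH = 1000

db = sqlite3.connect(":memory:", isolation_level=None)
cur = db.cursor()
cur.execute(
    "CREATE TABLE kv_store (key TEXT PRIMARY KEY, value BLOB, expires_at float)"
)

pending: list[tuple[float, bytes]] = []  # (new_expires_at, key) pairs
last_flush = time.time()

def note_expiry(key: bytes, expires_at: float) -> None:
    # defer the UPDATE so cache hits stay read-only and cheap
    pending.append((expires_at, key))

def maybe_flush() -> None:
    global pending, last_flush
    if not pending:
        return
    if len(pending) < MAX_BATCH and time.time() <= last_flush + FLUSH_INTERVAL:
        return
    cur.execute("BEGIN IMMEDIATE")
    cur.executemany("UPDATE kv_store SET expires_at=? WHERE key=?", pending)
    cur.execute("COMMIT")
    pending = []
    last_flush = time.time()
```
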

@contextlib.contextmanager
def acquire_write_lock(self):
count = 0
while True:
try:
self._cursor.execute("BEGIN IMMEDIATE")
break
except sqlite3.OperationalError:
# sleep 10 micros, roughly the min time for a query to complete
except sqlite3.OperationalError as e:
count += 1
# deadlock scenario, should not happen
if count > 1000:
msg = "deadlock encountered! this means there is a bug in"
msg += " titanoboa. please report this on the titanoboa"
msg += " issue tracker on github. in the meantime as a"
msg += " workaround, try disabling the sqlite cache."
raise Exception(msg) from e
# sleep 100 micros, roughly the time for a write query to
# complete.
# keep in mind that time.sleep takes 50us+:
# https://github.com/python/cpython/issues/125997.
time.sleep(1e-4)
continue
try:
yield
self._cursor.execute("COMMIT")
except Exception as e:
raise
print("EXC", e)
if self._debug:
# if we are in tests, raise the exception
raise Exception("fail") from e
# this shouldn't really happen, but it could happen if there
# is some concurrent, non-boa write to the database, so just
# roll back (fail "gracefully") and move on.
self._cursor.execute("ROLLBACK")

@classmethod
@@ -123,6 +176,7 @@ def __getitem__(self, key: bytes) -> bytes:
SELECT value, expires_at FROM kv_store
WHERE key=?
"""
# with timeit("CACHE HIT"):
res = self._cursor.execute(query_string, (key,)).fetchone()
if res is None:
raise KeyError(key)
@@ -140,6 +194,7 @@ def __getitem__(self, key: bytes) -> bytes:
return val

def __setitem__(self, key: bytes, value: bytes) -> None:
# with timeit("CACHE MISS"):
with self.acquire_write_lock():
query_string = """
INSERT INTO kv_store(key, value, expires_at) VALUES (?,?,?)
@@ -150,6 +205,10 @@ def __setitem__(self, key: bytes, value: bytes) -> None:
expiry_ts = self.get_expiry_ts()
self._cursor.execute(query_string, (key, value, expiry_ts))

# we already have a lock, batch in some writes if needed
if self._flush_condition():
self._flush(nolock=True)

def _exists(self, key: bytes) -> bool:
query_string = "SELECT count(*) FROM kv_store WHERE key=?"
(res,) = self._cursor.execute(query_string, (key,)).fetchone()
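
Taken together, `__getitem__` and `__setitem__` implement a TTL'd key-value store: reads return the value and queue an expiry refresh, writes upsert with a fresh `expires_at`. Because part of the `__setitem__` statement is collapsed in this view, the following sketch only approximates that access pattern and is not the exact SQL the commit ships:

```python
import time

TTL = 30 * 24 * 3600  # one month, matching _ONE_MONTH above

def cache_get(cur, key: bytes) -> bytes:
    row = cur.execute(
        "SELECT value, expires_at FROM kv_store WHERE key=?", (key,)
    ).fetchone()
    if row is None:
        raise KeyError(key)
    value, _expires_at = row
    # a real implementation would queue an expiry refresh here (see the
    # batching sketch above) instead of updating the row synchronously
    return value

def cache_set(cur, key: bytes, value: bytes) -> None:
    # generic SQLite upsert; the committed query's conflict clause is
    # collapsed in this diff view, so treat this as illustrative only
    cur.execute(
        """
        INSERT INTO kv_store(key, value, expires_at) VALUES (?,?,?)
        ON CONFLICT(key) DO UPDATE SET value=excluded.value,
                                       expires_at=excluded.expires_at
        """,
        (key, value, time.time() + TTL),
    )
```
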
6 changes: 6 additions & 0 deletions tests/conftest.py
@@ -3,10 +3,16 @@
import hypothesis
import pytest

from boa.util.sqlitedb import SqliteCache

# disable hypothesis deadline globally
hypothesis.settings.register_profile("ci", deadline=None)
hypothesis.settings.load_profile("ci")

# disable ignoring certain exceptions, see note in
# `SqliteCache.acquire_write_lock()`.
SqliteCache._debug = True


@pytest.fixture(scope="module")
def get_filepath(request):
