From 7c0ef2f5e7616f24ad5d394e94169911e16339cb Mon Sep 17 00:00:00 2001 From: Charles Cooper Date: Fri, 10 Jan 2025 14:07:07 -0500 Subject: [PATCH] perf tuning and fixes --- boa/util/sqlitedb.py | 125 +++++++++++++++++++++++++++++++------------ tests/conftest.py | 6 +++ 2 files changed, 98 insertions(+), 33 deletions(-) diff --git a/boa/util/sqlitedb.py b/boa/util/sqlitedb.py index 40d4067c..7d9850fb 100644 --- a/boa/util/sqlitedb.py +++ b/boa/util/sqlitedb.py @@ -5,6 +5,8 @@ from eth.db.backends.base import BaseDB +# from vyper.utils import timeit + # poor man's constant _ONE_MONTH = 30 * 24 * 3600 @@ -21,46 +23,64 @@ class SqliteCache(BaseDB): _GLOBAL = None - _CREATE_CMDS = [ - """ - pragma journal_mode=wal - """, - """ - CREATE TABLE IF NOT EXISTS kv_store ( - key TEXT PRIMARY KEY, value BLOB, expires_at float - ) - """, + _debug = False + + _CREATE_CMD = """ + -- tuning + pragma journal_mode = wal; + pragma synchronous = normal; + pragma temp_store = memory; + + -- not sure if these help or hurt things: + pragma mmap_size = 30000000000; + pragma auto_vacuum = incremental; + pragma page_size = 512; + + -- initialize schema + CREATE TABLE IF NOT EXISTS kv_store ( + key TEXT PRIMARY KEY, value BLOB, expires_at float + ); + CREATE INDEX IF NOT EXISTS expires_at_index ON kv_store(expires_at); """ - CREATE INDEX IF NOT EXISTS expires_at_index ON kv_store(expires_at) - """, - ] - # flush at least once per second - _MAX_FLUSH_TIME = 1.0 + # flush at least once every second + _FLUSH_INTERVAL = 1.0 def __init__(self, db_path: Path | str, ttl: float = _ONE_MONTH) -> None: if db_path != ":memory:": # sqlite magic path db_path = Path(db_path) db_path.parent.mkdir(parents=True, exist_ok=True) + # short timeout so that we don't get stuck waiting for + # OperationalError on exit (e.g. in __del__()), but long enough + # so that we don't spinlock in acquire lock. 10ms seems like + # a good balance. + # note that informal benchmarks performed on my machine(TM) had + # the following results: + # __getitem__: 15us + # __getitem__ (from wal): 120us + # __setitem__: 300us + # gc: 40us + # flush: 6000us + TIMEOUT = 0.010 # 10ms + self.db: sqlite3.Connection = sqlite3.connect( - db_path, timeout=0.025, isolation_level=None + db_path, timeout=TIMEOUT, isolation_level=None ) # cache the cursor self._cursor = self.db.cursor() - with self.acquire_write_lock(): - for cmd in self.__class__._CREATE_CMDS: - self._cursor.execute(cmd) + # initialize other fields + self._last_flush = get_current_time() + self._expiry_updates: list[tuple[float, bytes]] = [] # ttl of cache entries in seconds self.ttl: float = float(ttl) - self.gc() + self._cursor.executescript(self._CREATE_CMD) - self._last_flush = get_current_time() - self._expiry_updates: list[tuple[float, bytes]] = [] + self.gc() def gc(self): with self.acquire_write_lock(): @@ -68,43 +88,76 @@ def gc(self): self._cursor.execute( "DELETE FROM kv_store WHERE expires_at < ?", (current_time,) ) + # batch writes in here + self._flush(nolock=True) def __del__(self): - self._flush() + # try to flush but if we fail, no worries. these are optional + # operations. + try: + self._flush() + except Exception: + pass def _flush_condition(self): + if len(self._expiry_updates) >= 1000: + return True + next_flush = self._last_flush + self._FLUSH_INTERVAL + if get_current_time() > next_flush: + return True + return False + + # @timeit("FLUSH") + def _flush(self, nolock=False): + # set nolock=True if the caller has already acquired a lock. if len(self._expiry_updates) == 0: - return False - - next_flush = self._last_flush + self._MAX_FLUSH_TIME - return len(self._expiry_updates) > 1000 or get_current_time() > next_flush + self._last_flush = get_current_time() + return - def _flush(self): - with self.acquire_write_lock(): + acquire_lock = contextlib.nullcontext if nolock else self.acquire_write_lock + with acquire_lock(): query_string = """ UPDATE kv_store SET expires_at=? WHERE key=? """ self._cursor.executemany(query_string, self._expiry_updates) - self._expiry_updates = [] + + self._expiry_updates = [] + self._last_flush = get_current_time() @contextlib.contextmanager def acquire_write_lock(self): + count = 0 while True: try: self._cursor.execute("BEGIN IMMEDIATE") break - except sqlite3.OperationalError: - # sleep 10 micros, roughly the min time for a query to complete + except sqlite3.OperationalError as e: + count += 1 + # deadlock scenario, should not happen + if count > 1000: + msg = "deadlock encountered! this means there is a bug in" + msg += " titanoboa. please report this on the titanoboa" + msg += " issue tracker on github. in the meantime as a" + msg += " workaround, try disabling the sqlite cache." + raise Exception(msg) from e + # sleep 100 micros, roughly the time for a write query to + # complete. + # keep in mind that time.sleep takes 50us+: + # https://github.com/python/cpython/issues/125997. time.sleep(1e-4) continue try: yield self._cursor.execute("COMMIT") except Exception as e: - raise - print("EXC", e) + if self._debug: + # if we are in tests, raise the exception + raise Exception("fail") from e + # this shouldn't really happen, but it could happen if there + # is some concurrent, non-boa write to the database, so just + # roll back (fail "gracefully") and move on. self._cursor.execute("ROLLBACK") @classmethod @@ -123,6 +176,7 @@ def __getitem__(self, key: bytes) -> bytes: SELECT value, expires_at FROM kv_store WHERE key=? """ + # with timeit("CACHE HIT"): res = self._cursor.execute(query_string, (key,)).fetchone() if res is None: raise KeyError(key) @@ -140,6 +194,7 @@ def __getitem__(self, key: bytes) -> bytes: return val def __setitem__(self, key: bytes, value: bytes) -> None: + # with timeit("CACHE MISS"): with self.acquire_write_lock(): query_string = """ INSERT INTO kv_store(key, value, expires_at) VALUES (?,?,?) @@ -150,6 +205,10 @@ def __setitem__(self, key: bytes, value: bytes) -> None: expiry_ts = self.get_expiry_ts() self._cursor.execute(query_string, (key, value, expiry_ts)) + # we already have a lock, batch in some writes if needed + if self._flush_condition(): + self._flush(nolock=True) + def _exists(self, key: bytes) -> bool: query_string = "SELECT count(*) FROM kv_store WHERE key=?" (res,) = self._cursor.execute(query_string, (key,)).fetchone() diff --git a/tests/conftest.py b/tests/conftest.py index 1068f8c6..0154a2d0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,10 +3,16 @@ import hypothesis import pytest +from boa.util.sqlitedb import SqliteCache + # disable hypothesis deadline globally hypothesis.settings.register_profile("ci", deadline=None) hypothesis.settings.load_profile("ci") +# disable ignoring certain exceptions, see note in +# `SqliteCache.acquire_write_lock()`. +SqliteCache._debug = True + @pytest.fixture(scope="module") def get_filepath(request):