Skip to content

Commit

Permalink
fix(common): resolve mutlithread conflict in zodb IO
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuzhongshu123 committed Dec 16, 2024
1 parent b78b7a3 commit 5a9c73e
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions kag/common/checkpointer/bin_checkpointer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import logging
import transaction
import threading

import BTrees.OOBTree
from ZODB import DB
from ZODB.FileStorage import FileStorage
from kag.common.checkpointer.base import CheckPointer
Expand Down Expand Up @@ -115,6 +117,8 @@ def open(self):
with self._lock:
storage = FileStorage(self._ckpt_file_path)
db = DB(storage)
with db.transaction() as conn:
conn.root.data = BTrees.OOBTree.BTree()
return db

def read_from_ckpt(self, key):
Expand All @@ -129,7 +133,7 @@ def read_from_ckpt(self, key):
"""
with self._lock:
with self._ckpt.transaction() as conn:
obj = conn.root().get(key, None)
obj = conn.root.data.get(key, None)
return obj

def write_to_ckpt(self, key, value):
Expand All @@ -143,7 +147,7 @@ def write_to_ckpt(self, key, value):
with self._lock:
try:
with self._ckpt.transaction() as conn:
conn.root()[key] = value
conn.root.data[key] = value
except Exception as e:
logger.warn(f"failed to write checkpoint {key} to db, info: {e}")

Expand Down Expand Up @@ -171,9 +175,9 @@ def exists(self, key):
"""
with self._lock:
with self._ckpt.transaction() as conn:
return key in conn.root()
return key in conn.root.data

def size(self):
with self._lock:
with self._ckpt.transaction() as conn:
return len(conn.root())
return len(conn.root.data)

0 comments on commit 5a9c73e

Please sign in to comment.