Merge pull request #173 from opensanctions/pudo/resolved-store
Implement a resolved store that can act as a cache
pudo authored Nov 25, 2024
2 parents 14a5064 + 660e276 commit d404d33
Showing 3 changed files with 344 additions and 1 deletion.
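For orientation, here is a minimal usage sketch of the new store, modelled on test_custom_functions below. The fakeredis instance and the Dataset.make spec are assumptions for illustration and are not part of this commit; the cache is populated from an existing store via derive() and read back through a view.

import fakeredis

from nomenklatura.dataset import Dataset
from nomenklatura.entity import CompositeEntity
from nomenklatura.resolver import Resolver
from nomenklatura.store.memory import MemoryStore
from nomenklatura.store.resolved import ResolvedStore

# Hypothetical dataset and in-process Redis, mirroring the tests below.
dataset = Dataset.make({"name": "demo", "title": "Demo dataset"})
resolver = Resolver[CompositeEntity]()
redis = fakeredis.FakeStrictRedis(version=6, decode_responses=False)

source = MemoryStore(dataset, resolver)             # any fully loaded store
cache = ResolvedStore(dataset, resolver, db=redis)  # keys live under "xre:demo"

cache.derive(source)                 # copy every resolved entity into Redis
view = cache.default_view()
for entity in view.entities():       # stream entities back out of the cache
    print(entity.id, entity.caption)

cache.drop()                         # delete all keys under the store's prefix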
3 changes: 2 additions & 1 deletion nomenklatura/kv.py
@@ -5,6 +5,7 @@
from redis.client import Redis
from fakeredis import FakeStrictRedis

from rigour.env import ENCODING
from nomenklatura import settings

log = logging.getLogger(__name__)
@@ -31,7 +32,7 @@ def close_redis() -> None:

def b(s: str) -> bytes:
"""Encode a string to bytes."""
return s.encode("utf-8")
return s.encode(ENCODING)


def bv(s: Union[bytes, str, int, float]) -> bytes:
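The kv.py tweak replaces the hard-coded "utf-8" with the ENCODING constant shared via rigour, so the byte keys produced by b() stay consistent with how the new resolved store encodes its own keys. A minimal sketch of the helper's behaviour after the change (assuming, as is typical, that ENCODING resolves to "utf-8"):

from rigour.env import ENCODING

from nomenklatura.kv import b

# b() now defers to the shared constant instead of a literal "utf-8".
key = b("xre:demo:e:john-doe")
assert key == "xre:demo:e:john-doe".encode(ENCODING)
assert isinstance(key, bytes)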
216 changes: 216 additions & 0 deletions nomenklatura/store/resolved.py
@@ -0,0 +1,216 @@
import orjson
import logging
from redis.client import Redis
from typing import Generator, List, Optional, Tuple
from followthemoney.property import Property
from followthemoney.types import registry
from rigour.env import ENCODING as ENC

from nomenklatura.kv import get_redis, close_redis
from nomenklatura.dataset import DS
from nomenklatura.entity import CE
from nomenklatura.resolver import Linker, StrIdent
from nomenklatura.statement import Statement
from nomenklatura.store.base import Store, View, Writer

log = logging.getLogger(__name__)


class ResolvedStore(Store[DS, CE]):
"""A store implementation which is built to store fully resolved entities. This
implementation is not designed to be updated, and cannot store individual statements."""

def __init__(
self,
dataset: DS,
linker: Linker[CE],
prefix: Optional[str] = None,
db: Optional["Redis[bytes]"] = None,
):
super().__init__(dataset, linker)
if db is None:
db = get_redis()
self.db = db
self.prefix = f"xre:{prefix or dataset.name}"

def writer(self) -> Writer[DS, CE]:
return ResolvedWriter(self)

def view(self, scope: DS, external: bool = False) -> View[DS, CE]:
if external:
raise NotImplementedError("External views not supported!")
return ResolvedView(self, scope, external=external)

def update(self, id: StrIdent) -> None:
raise NotImplementedError("Entity store cannot update entities")

def assemble(self, statements: List[Statement]) -> Optional[CE]:
# This is simplified because the store is considered to be fully resolved
if not len(statements):
return None
return self.entity_class.from_statements(self.dataset, statements)

def drop(self, prefix: Optional[str] = None) -> None:
"""Delete all data associated with a prefix of the store."""
pipeline = self.db.pipeline()
prefix = f"xre:{prefix}" if prefix else self.prefix
cmds = 0
for key in self.db.scan_iter(f"{prefix}:*"):
pipeline.delete(key)
cmds += 1
if cmds > 10_000:
pipeline.execute()
pipeline = self.db.pipeline()
cmds = 0
if cmds > 0:
pipeline.execute()

def derive(self, store: Store[DS, CE]) -> None:
"""Copy all data from another store into this one."""
writer = self.writer()
view = store.default_view()
for idx, entity in enumerate(view.entities()):
if idx > 0 and idx % 10_000 == 0:
log.info("Deriving resolved store %s: %s...", store.dataset.name, idx)
writer.add_entity(entity)
writer.flush()

def close(self) -> None:
close_redis()


class ResolvedWriter(Writer[DS, CE]):
BATCH_ENTITIES = 1_000

def __init__(self, store: ResolvedStore[DS, CE]):
self.store: ResolvedStore[DS, CE] = store
self.entities: List[CE] = []

def flush(self) -> None:
pipeline = self.store.db.pipeline()
for entity in self.entities:
stmts = []
for stmt in entity.statements:
row = (
stmt.id,
stmt.entity_id,
stmt.prop,
stmt.schema,
stmt.value,
stmt.dataset,
stmt.lang,
stmt.original_value,
stmt.target,
# stmt.external,
stmt.first_seen,
stmt.last_seen,
)
stmts.append(row)
obj = {"i": entity.id, "c": entity.caption, "s": stmts}
key = f"{self.store.prefix}:e:{entity.id}"
pipeline.set(key.encode(ENC), orjson.dumps(obj))
for inv_id in entity.get_type_values(registry.entity, matchable=True):
inv_key = f"{self.store.prefix}:i:{inv_id}"
pipeline.sadd(inv_key.encode(ENC), key)
pipeline.execute()
self.entities = []

def add_statement(self, stmt: Statement) -> None:
raise NotImplementedError("Entity store cannot add invididual statements")

def add_entity(self, entity: CE) -> None:
self.entities.append(entity)
if len(self.entities) >= self.BATCH_ENTITIES:
self.flush()

def pop(self, entity_id: str) -> List[Statement]:
raise NotImplementedError("Entity store cannot pop entities")


class ResolvedView(View[DS, CE]):
def __init__(
self, store: ResolvedStore[DS, CE], scope: DS, external: bool = False
) -> None:
super().__init__(store, scope, external=external)
self.store: ResolvedStore[DS, CE] = store

def has_entity(self, id: str) -> bool:
key = f"{self.store.prefix}:e:{id}"
return self.store.db.exists(key) > 0

def _unpack(self, data: bytes) -> CE:
obj = orjson.loads(data)
statements: List[Statement] = []
for stmt in obj["s"]:
(
stmt_id,
entity_id,
prop,
schema,
value,
dataset,
lang,
original_value,
target,
# external,
first_seen,
last_seen,
) = stmt
statements.append(
Statement(
id=stmt_id,
entity_id=entity_id,
canonical_id=obj["i"],
prop=prop,
schema=schema,
value=value,
dataset=dataset,
lang=lang,
original_value=original_value,
target=target,
external=False,
first_seen=first_seen,
last_seen=last_seen,
)
)
entity = self.store.entity_class.from_statements(self.store.dataset, statements)
entity._caption = obj["c"]
return entity

def get_entity(self, id: str) -> Optional[CE]:
key = f"{self.store.prefix}:e:{id}"
data = self.store.db.get(key.encode(ENC))
if data is None:
return None
return self._unpack(data)

def get_inverted(self, id: str) -> Generator[Tuple[Property, CE], None, None]:
key = f"{self.store.prefix}:i:{id}"
inv_keys = self.store.db.smembers(key.encode(ENC))
inv_data = self.store.db.mget(inv_keys)
for data in inv_data:
if data is None:
continue
entity = self._unpack(data)
for prop, value in entity.itervalues():
if value == id and prop.reverse is not None:
yield prop.reverse, entity

def entities(self) -> Generator[CE, None, None]:
prefix = f"{self.store.prefix}:e:*"
batch: List[bytes] = []
for id in self.store.db.scan_iter(prefix, count=50_000):
batch.append(id)
if len(batch) >= 1_000:
datas = self.store.db.mget(batch)
for data in datas:
if data is None:
continue
yield self._unpack(data)
batch = []
if len(batch):
datas = self.store.db.mget(batch)
for data in datas:
if data is None:
continue
yield self._unpack(data)
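For reference, a sketch of the Redis layout written by ResolvedWriter.flush() and read back by ResolvedView. The key scheme and the "i"/"c"/"s" blob fields follow the code above; the entity id, dataset name and values are made up for illustration.

import orjson

prefix = "xre:donations"                   # f"xre:{dataset.name}"
entity_key = f"{prefix}:e:john-doe"        # SET/GET: one orjson blob per entity
inverted_key = f"{prefix}:i:some-company"  # SADD/SMEMBERS: entity keys that reference an id

blob = {
    "i": "john-doe",  # canonical entity id
    "c": "John Doe",  # pre-computed caption
    "s": [
        # One row per statement, in the field order used by flush()/_unpack():
        # (id, entity_id, prop, schema, value, dataset, lang,
        #  original_value, target, first_seen, last_seen)
        (
            "stmt-1", "john-doe", "name", "Person", "John Doe",
            "donations", None, None, False, "2024-01-01", "2024-11-25",
        ),
    ],
}
payload = orjson.dumps(blob)  # what pipeline.set(entity_key, ...) stores
assert orjson.loads(payload)["c"] == "John Doe"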
126 changes: 126 additions & 0 deletions tests/store/test_resolved.py
@@ -0,0 +1,126 @@
import pytest
import orjson
import fakeredis
from pathlib import Path
from followthemoney import model
from datetime import datetime

from nomenklatura.resolver import Resolver
from nomenklatura.judgement import Judgement
from nomenklatura.store.memory import MemoryStore
from nomenklatura.store.resolved import ResolvedStore
from nomenklatura.dataset import Dataset
from nomenklatura.entity import CompositeEntity
from nomenklatura.util import datetime_iso

DAIMLER = "66ce9f62af8c7d329506da41cb7c36ba058b3d28"
PERSON = {
"id": "john-doe",
"schema": "Person",
"properties": {"name": ["John Doe"], "birthDate": ["1976"]},
}

PERSON_EXT = {
"id": "john-doe-2",
"schema": "Person",
"properties": {"birthPlace": ["North Texas"]},
}


def test_store_basics(test_dataset: Dataset):
redis = fakeredis.FakeStrictRedis(version=6, decode_responses=False)
resolver = Resolver[CompositeEntity]()
store = ResolvedStore(test_dataset, resolver, db=redis)
entity = CompositeEntity.from_data(test_dataset, PERSON)
entity_ext = CompositeEntity.from_data(test_dataset, PERSON_EXT)
assert len(list(store.view(test_dataset).entities())) == 0
writer = store.writer()
ts = datetime_iso(datetime.now())
for stmt in entity.statements:
stmt.first_seen = ts
stmt.last_seen = ts
writer.add_entity(entity)
writer.flush()
view = store.view(test_dataset)
assert len(list(view.entities())) == 1
writer.add_entity(entity_ext)
writer.flush()
assert len(list(store.view(test_dataset).entities())) == 2

merged_id = resolver.decide(
"john-doe",
"john-doe-2",
judgement=Judgement.POSITIVE,
user="test",
)
with pytest.raises(NotImplementedError):
store.update(merged_id)


def test_graph_query(donations_path: Path, test_dataset: Dataset):
redis = fakeredis.FakeStrictRedis(version=6, decode_responses=False)
resolver = Resolver[CompositeEntity]()
store = ResolvedStore(test_dataset, resolver, db=redis)
assert len(list(store.view(test_dataset).entities())) == 0
with store.writer() as writer:
with open(donations_path, "rb") as fh:
while line := fh.readline():
data = orjson.loads(line)
proxy = CompositeEntity.from_data(test_dataset, data)
writer.add_entity(proxy)

assert len(list(store.view(test_dataset).entities())) == 474

view = store.default_view()
entity = view.get_entity("banana")
assert entity is None, entity
assert not view.has_entity("banana")
entity = view.get_entity(DAIMLER)
assert entity is not None, entity
assert view.has_entity(DAIMLER)
assert "Daimler" in entity.caption, entity.caption
assert len(entity.datasets) == 1
ds = entity.datasets.pop()
assert test_dataset.name in ds, ds

adjacent = list(view.get_adjacent(entity))
assert len(adjacent) == 10, len(adjacent)
schemata = [e.schema for (_, e) in adjacent]
assert model.get("Payment") in schemata, set(schemata)
assert model.get("Address") in schemata, set(schemata)
assert model.get("Company") not in schemata, set(schemata)

ext_entity = CompositeEntity.from_data(test_dataset, PERSON)
with store.writer() as writer:
writer.add_entity(ext_entity)
with pytest.raises(NotImplementedError):
for stmt in entity.statements:
writer.add_statement(stmt)

view = store.view(test_dataset)
ret_entity = view.get_entity("john-doe")
assert ret_entity is not None, ret_entity
assert len(list(ret_entity.statements)) == len(list(ext_entity.statements))
assert view.has_entity("john-doe")
assert not view.has_entity("john-doe-333")


def test_custom_functions(donations_path: Path, test_dataset: Dataset):
redis = fakeredis.FakeStrictRedis(version=6, decode_responses=False)
resolver = Resolver[CompositeEntity]()
prefix = "test123"
mem_store = MemoryStore(test_dataset, resolver)
store = ResolvedStore(test_dataset, resolver, prefix=prefix, db=redis)
with mem_store.writer() as writer:
with open(donations_path, "rb") as fh:
while line := fh.readline():
data = orjson.loads(line)
proxy = CompositeEntity.from_data(test_dataset, data)
writer.add_entity(proxy)

assert len(list(mem_store.view(test_dataset).entities())) == 474
assert len(list(store.view(test_dataset).entities())) == 0
store.derive(mem_store)
assert len(list(store.view(test_dataset).entities())) == 474
store.drop()
assert len(list(store.view(test_dataset).entities())) == 0
