Stores
A store reads and writes statement-based FollowTheMoney (FtM) entities, and assembles them into their merged, canonical form on retrieval.
Stores are where deduplication decisions meet the data. When an entity is read from a store, its statements are grouped by canonical ID — as defined by a Linker — so a cluster of merged source records comes back as one entity. The source statements themselves are never rewritten; changing a judgement in the resolver changes what the store returns, not what it contains.
Three classes cooperate:
Store — the storage backend, bound to a dataset scope and a linker.
Writer — bulk write operations: add entities or individual statements, or remove (pop) all statements stored for an entity ID, which also returns them.
View — read access over a dataset scope: get an entity by ID, iterate all entities, or traverse inverted relationships (which entities reference this one?).
Choosing a backend
Use MemoryStore for datasets that fit in memory — this is what the nk command line uses when it reads entities from a file, via load_entity_file_store. Use SQLStore (nomenklatura.store.sql.SQLStore) to persist statements to SQLite or PostgreSQL, for example when a dataset is too large to hold in memory or several processes need to share one store. Two further backends, LevelStore (LevelDB) and RedisStore, live in nomenklatura.store.level and nomenklatura.store.redis_ respectively, and require the optional plyvel and redis dependencies.
from pathlib import Path

from nomenklatura import Resolver
from nomenklatura.db import make_session
from nomenklatura.store import load_entity_file_store

# Read a line-delimited FtM entity file into an in-memory store, then print
# the caption of every entity as returned by the store's default view.
# Everything that uses the resolver stays inside the session's `with` block.
with make_session() as session:
    resolver = Resolver(session, create=True)
    store = load_entity_file_store(Path("entities.ftm.json"), resolver)
    view = store.default_view()
    for entity in view.entities():
        print(entity.caption)
Interface
nomenklatura.store.Store
Bases: Generic[DS, SE]
A data storage and retrieval mechanism for statement-based entity data.
Essentially, this is a triple store which can be implemented using various
backends.
Source code in nomenklatura/store/base.py
class Store(Generic[DS, SE]):
    """Storage and retrieval of statement-based entity data.

    Conceptually a triple store. Concrete backends subclass this class and
    supply `writer()` and `view()`. Statements are stored as written; the
    linker is consulted when entities are assembled or re-filed.
    """

    def __init__(self, dataset: DS, linker: Linker[SE]):
        self.dataset = dataset
        self.linker = linker
        # Class used by `assemble()` to build entities from statements.
        self.entity_class = cast(Type[SE], StatementEntity)

    def writer(self) -> "Writer[DS, SE]":
        """Return a writer for bulk changes. Must be provided by a backend."""
        raise NotImplementedError()

    def view(self, scope: DS, external: bool = False) -> "View[DS, SE]":
        """Return a read view limited to `scope`. Must be provided by a backend."""
        raise NotImplementedError()

    def default_view(self, external: bool = False) -> "View[DS, SE]":
        """Return a view scoped to this store's own dataset."""
        return self.view(self.dataset, external=external)

    def assemble(self, statements: List[Statement]) -> Optional[SE]:
        """Combine one entity's statements into a single entity object.

        Statements of entity-typed properties are cloned so that their value
        is the linker's canonical ID for the referenced entity. The value as
        stored is kept in `original_value`, unless that was already set.

        Returns `None` when `statements` is empty.
        """
        if not statements:
            return None
        resolved: List[Statement] = []
        for original in statements:
            prop_type = get_prop_type(original.schema, original.prop)
            if prop_type != registry.entity.name:
                resolved.append(original)
                continue
            stored = original._value
            kept = original.original_value
            if kept is None:
                kept = stored
            resolved.append(
                original.clone(
                    value=self.linker.get_canonical(stored),
                    original_value=kept,
                )
            )
        entity = self.entity_class.from_statements(self.dataset, resolved)
        if entity.id is not None:
            # Attach the IDs the linker reports as referents of this entity.
            entity.extra_referents.update(self.linker.get_referents(entity.id))
        return entity

    def update(self, id: str) -> None:
        """Re-file the statements of every referent of `id` under its canonical ID.

        Each referent's statements are popped from the store, stamped with
        the canonical ID, and written back through one writer.
        """
        canonical_id = self.linker.get_canonical(id)
        with self.writer() as writer:
            for referent in self.linker.get_referents(canonical_id):
                popped = writer.pop(referent)
                for stmt in popped:
                    stmt.canonical_id = canonical_id
                    writer.add_statement(stmt)

    def close(self) -> None:
        """Release backend resources. Does nothing by default."""
        pass

    def __repr__(self) -> str:
        return f"<{type(self).__name__}({self.dataset.name!r})>"
nomenklatura.store.Writer
Bases: Generic[DS, SE]
Bulk writing operations.
Source code in nomenklatura/store/base.py
class Writer(Generic[DS, SE]):
    """Bulk write operations against a store.

    Meant to be used as a context manager: leaving the `with` block calls
    `flush()`, also when the block raised. It does not close the store.
    """

    def __init__(self, store: Store[DS, SE]):
        self.store = store

    def add_statement(self, stmt: Statement) -> None:
        """Add one statement. Must be provided by a backend."""
        raise NotImplementedError()

    def add_entity(self, entity: SE) -> None:
        """Add every statement that makes up `entity`, one by one."""
        statements = entity.statements
        for stmt in statements:
            self.add_statement(stmt)

    def pop(self, entity_id: str) -> List[Statement]:
        """Remove the statements held for `entity_id` and return them.

        Must be provided by a backend.
        """
        raise NotImplementedError()

    def flush(self) -> None:
        """Hook run when the writer's context exits. Does nothing by default."""
        pass

    def close(self) -> None:
        """Close the store this writer belongs to."""
        self.store.close()

    def __enter__(self) -> "Writer[DS, SE]":
        return self

    def __exit__(
        self,
        type: Optional[Type[BaseException]],
        value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        # Flush unconditionally; the exception (if any) is not suppressed.
        self.flush()

    def __repr__(self) -> str:
        return f"<{type(self).__name__}({self.store!r})>"
nomenklatura.store.View
Bases: Generic[DS, SE]
Read access to the entities in a store, scoped to a dataset.
Entities come back in their merged, canonical form. Use get_entity() for
a lookup by ID, entities() to stream the whole scope, and get_adjacent()
to traverse relationships in both directions.
Source code in nomenklatura/store/base.py
class View(Generic[DS, SE]):
    """Read access to the entities of a store, limited to one dataset scope.

    Entities are returned in their merged, canonical form:

    - `get_entity()` looks up one entity by ID;
    - `entities()` streams the whole scope;
    - `get_adjacent()` walks relationships in both directions.
    """

    def __init__(self, store: Store[DS, SE], scope: DS, external: bool = False):
        self.store = store
        self.scope = scope
        # Names of the leaf datasets contained in the scope.
        self.dataset_names = scope.leaf_names
        self.external = external

    def has_entity(self, id: str) -> bool:
        """Tell whether an entity with `id` exists. Must be provided by a backend."""
        raise NotImplementedError()

    def get_entity(self, id: str) -> Optional[SE]:
        """Fetch the entity with `id`, or `None`. Must be provided by a backend."""
        raise NotImplementedError()

    def get_inverted(self, id: str) -> Generator[Tuple[Property, SE], None, None]:
        """Yield `(property, entity)` pairs for entities that point at `id`.

        Must be provided by a backend.
        """
        raise NotImplementedError()

    def get_adjacent(
        self, entity: SE, inverted: bool = True
    ) -> Generator[Tuple[Property, SE], None, None]:
        """Yield `(property, entity)` pairs for entities linked to `entity`.

        Outgoing links (entity-typed property values that resolve to an entity
        in this view) come first. When `inverted` is true and `entity` has an
        ID, the entities referencing it follow.
        """
        for prop, value in entity.itervalues():
            if prop.type != registry.entity:
                continue
            target = self.get_entity(value)
            if target is None:
                continue
            yield prop, target
        if not inverted or entity.id is None:
            return
        for prop, referrer in self.get_inverted(entity.id):
            yield prop, referrer

    def entities(
        self, include_schemata: List[Schema] = []
    ) -> Generator[SE, None, None]:
        """Yield every entity visible in this view.

        Must be provided by a backend. If `include_schemata` is given
        (non-empty), only entities of exactly those schemata are yielded; the
        list is not expanded via "is_a" relationships.
        """
        # NOTE(review): mutable default kept for interface compatibility; it is
        # harmless here because this base method never reads or mutates it.
        raise NotImplementedError()

    def __repr__(self) -> str:
        return f"<{type(self).__name__}({self.scope.name!r})>"
entities(include_schemata=[])
Iterate over all entities in the view.
If include_schemata is provided, only entities of the provided schemata will be returned.
Note that the listed schemata are not expanded via "is_a" relationships: passing a parent schema does not also match entities of the schemata that extend it.
Source code in nomenklatura/store/base.py
def entities(
    self, include_schemata: List[Schema] = []
) -> Generator[SE, None, None]:
    """Yield every entity visible in this view.

    Backends must override this. When `include_schemata` is given
    (non-empty), only entities whose schema is one of the listed schemata are
    yielded. The list is matched as given and is not expanded via "is_a"
    relationships.
    """
    raise NotImplementedError()
Implementations
nomenklatura.store.MemoryStore
Bases: Store[DS, SE]
Hold statements in plain dictionaries, with no persistence.
The right choice for datasets that fit into memory, e.g. when processing
an entity file on the command line.
Source code in nomenklatura/store/memory.py
class MemoryStore(Store[DS, SE]):
    """Keep all statements in plain in-process dictionaries; nothing persists.

    A good fit for datasets that fit in memory, such as an entity file
    processed on the command line.
    """

    def __init__(self, dataset: DS, linker: Linker[SE]):
        super().__init__(dataset, linker)
        # NOTE(review): these three maps are filled by MemoryWriter (not shown
        # here); their key semantics should be confirmed against that class.
        self.stmts: Dict[str, Set[Statement]] = {}
        self.inverted: Dict[str, Set[str]] = {}
        self.entities: Dict[str, Set[str]] = {}

    def writer(self) -> Writer[DS, SE]:
        """Return a writer that adds to and removes from the in-memory maps."""
        writer = MemoryWriter(self)
        return writer

    def view(self, scope: DS, external: bool = False) -> View[DS, SE]:
        """Return an in-memory view limited to `scope`."""
        view = MemoryView(self, scope, external=external)
        return view
nomenklatura.store.sql.SQLStore
Bases: Store[DS, SE]
Persist statements to a SQL database (SQLite or PostgreSQL).
Use this when a dataset is too large to hold in memory, or when several
processes need to work with the same store.
Source code in nomenklatura/store/sql.py
class SQLStore(Store[DS, SE]):
    """Persist statements in a SQL database (SQLite or PostgreSQL).

    Pick this over an in-memory store when a dataset is too large to hold in
    memory, or when several processes have to share one store.
    """

    def __init__(
        self,
        dataset: DS,
        linker: Linker[SE],
        uri: str = settings.DB_URL,
    ):
        super().__init__(dataset, linker)
        self._uri = uri
        metadata = get_metadata()
        self.engine: Engine = get_engine(uri)
        self.table = make_statement_table(metadata)
        # Create the statement table only if it is missing.
        metadata.create_all(self.engine, tables=[self.table], checkfirst=True)

    def writer(self) -> Writer[DS, SE]:
        """Return a writer bound to this SQL store."""
        return SQLWriter(self)

    def close(self) -> None:
        """Close the database resources registered for this store's URI."""
        close_db(self._uri)

    def view(self, scope: DS, external: bool = False) -> View[DS, SE]:
        """Return a SQL-backed view limited to `scope`."""
        return SQLView(self, scope, external=external)

    def _execute(
        self, q: Select[Any], stream: bool = True
    ) -> Generator[Any, None, None]:
        """Run a read query and yield its rows, fetched in batches of 10,000."""
        with self.engine.connect() as conn:
            if stream:
                conn = conn.execution_options(stream_results=True)
            result = conn.execute(q)
            while True:
                batch = result.fetchmany(10_000)
                if not batch:
                    break
                yield from batch

    def _iterate_stmts(
        self, q: Select[Any], stream: bool = True
    ) -> Generator[Statement, None, None]:
        """Yield each row of `q` converted into a `Statement`."""
        yield from map(Statement.from_db_row, self._execute(q, stream=stream))

    def _iterate(
        self, q: Select[Any], stream: bool = True
    ) -> Generator[SE, None, None]:
        """Yield assembled entities from the statements of `q`.

        Consecutive statements sharing an `entity_id` form one entity, so the
        query is expected to return statements ordered by entity ID.
        """
        group_id = None
        group: list[Statement] = []
        for stmt in self._iterate_stmts(q, stream=stream):
            stmt_id = stmt.entity_id
            if group_id is None:
                group_id = stmt_id
            elif group_id != stmt_id:
                # A new entity starts: emit the group collected so far.
                assembled = self.assemble(group)
                if assembled is not None:
                    yield assembled
                group_id = stmt_id
                group = []
            group.append(stmt)
        if group:
            assembled = self.assemble(group)
            if assembled is not None:
                yield assembled
nomenklatura.store.load_entity_file_store(path, resolver, cleaned=True)
Create a simple in-memory store by reading FtM entities from a file path. The file is expected to contain one JSON-serialized entity per line.
Source code in nomenklatura/store/__init__.py
def load_entity_file_store(
    path: Path,
    resolver: Resolver[StatementEntity],
    cleaned: bool = True,
) -> SimpleMemoryStore:
    """Create a simple in-memory store by reading FtM entities from a file path.

    The file is read as line-delimited JSON, one serialized entity per line.
    Blank or whitespace-only lines are skipped. The store's dataset is named
    after the file stem. Any other dataset name an entity lists in its
    `datasets` is added to that dataset as a child.

    :param path: path of the entity file.
    :param resolver: linker the store uses to map IDs to canonical IDs.
    :param cleaned: passed through to `StatementEntity.from_data`.
    :return: a `MemoryStore` holding the statements of every entity read.
    """
    name = slugify(path.stem, sep="_") or Dataset.UNDEFINED
    dataset = Dataset.make({"name": name, "title": path.name})
    store = MemoryStore(dataset, resolver)
    with store.writer() as writer, open(path, "rb") as fh:
        for line in fh:
            # A blank line (e.g. at the end of the file) is not valid JSON and
            # would make orjson raise; skip it instead of aborting the load.
            if not line.strip():
                continue
            data = orjson.loads(line)
            proxy = StatementEntity.from_data(dataset, data, cleaned=cleaned)
            for ds in proxy.datasets:
                if ds not in dataset.dataset_names:
                    discovered = Dataset.make({"name": ds})
                    dataset.children.add(discovered)
            writer.add_entity(proxy)
    return store