Skip to content

Stores

A store reads and writes statement-based FollowTheMoney (FtM) entities, and assembles them into their merged, canonical form on retrieval.

Stores are where deduplication decisions meet the data. When an entity is read from a store, its statements are grouped by canonical ID — as defined by a Linker — so a cluster of merged source records comes back as one entity. The source statements themselves are never rewritten; changing a judgement in the resolver changes what the store returns, not what it contains.

Three classes cooperate:

  • Store — the storage backend, bound to a dataset scope and a linker.
  • Writer — bulk write operations: add entities or individual statements, remove them by entity ID.
  • View — read access over a dataset scope: get an entity by ID, iterate all entities, or traverse inverted relationships (which entities reference this one?).

Choosing a backend

Use MemoryStore for datasets that fit in memory — this is what the nk command line uses when it reads entities from a file, via load_entity_file_store. Use SQLStore to persist statements to SQLite or PostgreSQL. Two further backends, LevelStore (LevelDB) and RedisStore, live in nomenklatura.store.level and nomenklatura.store.redis_ and require the optional plyvel and redis dependencies.

from pathlib import Path
from nomenklatura import Resolver
from nomenklatura.db import make_session
from nomenklatura.store import load_entity_file_store

# The resolver is constructed from an open session, so all work happens inside
# the session's context manager.
with make_session() as session:
    # NOTE(review): `create=True` presumably sets up the resolver's storage if
    # it does not exist yet -- confirm against the Resolver documentation.
    resolver = Resolver(session, create=True)
    # Read the FtM entities from the file into an in-memory store that uses
    # the resolver as its linker.
    store = load_entity_file_store(Path("entities.ftm.json"), resolver)
    # The default view is scoped to the store's own dataset.
    view = store.default_view()
    for entity in view.entities():
        print(entity.caption)

Interface

nomenklatura.store.Store

Bases: Generic[DS, SE]

A data storage and retrieval mechanism for statement-based entity data. Essentially, this is a triple store which can be implemented using various backends.

Source code in nomenklatura/store/base.py
class Store(Generic[DS, SE]):
    """Backend-agnostic storage and retrieval of statement-based entity data.

    Conceptually this is a triple store: it holds individual statements and
    hands back entities assembled from them. Concrete backends subclass it and
    supply `writer()` and `view()`."""

    def __init__(self, dataset: DS, linker: Linker[SE]):
        self.dataset = dataset
        self.linker = linker
        self.entity_class = cast(Type[SE], StatementEntity)

    def writer(self) -> "Writer[DS, SE]":
        """Return a bulk writer for this store. Backends must override."""
        raise NotImplementedError()

    def view(self, scope: DS, external: bool = False) -> "View[DS, SE]":
        """Return a read view limited to `scope`. Backends must override."""
        raise NotImplementedError()

    def default_view(self, external: bool = False) -> "View[DS, SE]":
        """Return a view whose scope is the dataset the store was built with."""
        scope = self.dataset
        return self.view(scope, external=external)

    def assemble(self, statements: List[Statement]) -> Optional[SE]:
        """Build one entity out of `statements`, or return None if there are none.

        Statements whose property is of the entity type have their value
        replaced by the linker's canonical ID for it; the value they carried
        before is retained as `original_value` unless one was already set. The
        resulting entity's `extra_referents` is extended with the linker's
        referents for the entity ID."""
        if len(statements) == 0:
            return None

        entity_type_name = registry.entity.name
        resolved: List[Statement] = []
        for statement in statements:
            if get_prop_type(statement.schema, statement.prop) != entity_type_name:
                resolved.append(statement)
                continue
            # Keep an already recorded original value; otherwise remember the
            # value as it was before canonicalisation.
            if statement.original_value is None:
                original = statement._value
            else:
                original = statement.original_value
            canonical_value = self.linker.get_canonical(statement._value)
            resolved.append(
                statement.clone(value=canonical_value, original_value=original)
            )

        assembled = self.entity_class.from_statements(self.dataset, resolved)
        if assembled.id is None:
            return assembled
        referents = self.linker.get_referents(assembled.id)
        assembled.extra_referents.update(referents)
        return assembled

    def update(self, id: str) -> None:
        """Re-stamp the stored statements of the cluster that `id` belongs to.

        Every referent of the canonical ID is popped from the store and its
        statements are written back with `canonical_id` set accordingly."""
        canonical = self.linker.get_canonical(id)
        with self.writer() as bulk:
            for member_id in self.linker.get_referents(canonical):
                for statement in bulk.pop(member_id):
                    statement.canonical_id = canonical
                    bulk.add_statement(statement)

    def close(self) -> None:
        """Release backend resources; nothing to do in the base class."""
        pass

    def __repr__(self) -> str:
        class_name = type(self).__name__
        return f"<{class_name}({self.dataset.name!r})>"

nomenklatura.store.Writer

Bases: Generic[DS, SE]

Bulk writing operations.

Source code in nomenklatura/store/base.py
class Writer(Generic[DS, SE]):
    """Bulk writing operations.

    A writer is bound to one store and can be used as a context manager:
    leaving the `with` block calls `flush()`. Backends implement
    `add_statement()` and `pop()`."""

    def __init__(self, store: Store[DS, SE]):
        self.store = store

    def add_statement(self, stmt: Statement) -> None:
        """Write a single statement. Must be implemented by the backend."""
        raise NotImplementedError()

    def add_entity(self, entity: SE) -> None:
        """Write every statement of `entity` via `add_statement()`."""
        for stmt in entity.statements:
            self.add_statement(stmt)

    def pop(self, entity_id: str) -> List[Statement]:
        """Remove and return statements for `entity_id`. Must be implemented
        by the backend."""
        raise NotImplementedError()

    def flush(self) -> None:
        """Hook for pushing out buffered writes; a no-op in the base class."""
        pass

    def close(self) -> None:
        """Close the underlying store."""
        self.store.close()

    def __enter__(self) -> "Writer[DS, SE]":
        return self

    def __exit__(
        self,
        type: Optional[Type[BaseException]],
        value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        # The exception arguments are not inspected: the writer is flushed
        # whether or not the block raised. Note that this does not call close().
        self.flush()

    def __repr__(self) -> str:
        return f"<{type(self).__name__}({self.store!r})>"

nomenklatura.store.View

Bases: Generic[DS, SE]

Read access to the entities in a store, scoped to a dataset.

Entities come back in their merged, canonical form. Use get_entity() for a lookup by ID, entities() to stream the whole scope, and get_adjacent() to traverse relationships in both directions.

Source code in nomenklatura/store/base.py
class View(Generic[DS, SE]):
    """A read-only window onto a store, restricted to one dataset scope.

    `get_entity()` looks up a single entity by ID, `entities()` streams
    everything in the scope, and `get_adjacent()` walks relationships in both
    directions. Backends implement the lookup and iteration methods."""

    def __init__(self, store: Store[DS, SE], scope: DS, external: bool = False):
        self.store = store
        self.scope = scope
        self.dataset_names = scope.leaf_names
        self.external = external

    def has_entity(self, id: str) -> bool:
        """Tell whether an entity with this ID is present. Backends must override."""
        raise NotImplementedError()

    def get_entity(self, id: str) -> Optional[SE]:
        """Fetch the entity with this ID, if any. Backends must override."""
        raise NotImplementedError()

    def get_inverted(self, id: str) -> Generator[Tuple[Property, SE], None, None]:
        """Yield `(property, entity)` pairs for the given ID. Backends must
        override."""
        raise NotImplementedError()

    def get_adjacent(
        self, entity: SE, inverted: bool = True
    ) -> Generator[Tuple[Property, SE], None, None]:
        """Yield `(property, entity)` pairs for the neighbours of `entity`.

        First come the entities named by the entity-typed property values of
        `entity` that can be found in this view; then, if `inverted` is set and
        the entity has an ID, whatever `get_inverted()` yields for that ID."""
        for prop, value in entity.itervalues():
            if prop.type != registry.entity:
                continue
            neighbour = self.get_entity(value)
            if neighbour is None:
                # The referenced entity is not available in this view.
                continue
            yield prop, neighbour

        if not inverted or entity.id is None:
            return
        for inverse_prop, referrer in self.get_inverted(entity.id):
            yield inverse_prop, referrer

    def entities(
        self, include_schemata: List[Schema] = []
    ) -> Generator[SE, None, None]:
        """Iterate over all entities in the view.

        When `include_schemata` is given, only entities of those schemata are
        returned. The schemata in `include_schemata` are not expanded via
        "is_a" relationships. Backends must override."""

        raise NotImplementedError()

    def __repr__(self) -> str:
        class_name = type(self).__name__
        return f"<{class_name}({self.scope.name!r})>"

entities(include_schemata=[])

Iterate over all entities in the view.

If include_schemata is provided, only entities of the provided schemata will be returned. Note that the schemata given in include_schemata will not be expanded via "is_a" relationships.

Source code in nomenklatura/store/base.py
def entities(
    self, include_schemata: List[Schema] = []
) -> Generator[SE, None, None]:
    """Iterate over all entities in the view.

    If `include_schemata` is provided, only entities of the provided schemata
    will be returned. Note that the schemata given in `include_schemata` will
    not be expanded via "is_a" relationships.

    This base implementation always raises `NotImplementedError`."""

    raise NotImplementedError()

Implementations

nomenklatura.store.MemoryStore

Bases: Store[DS, SE]

Hold statements in plain dictionaries, with no persistence.

The right choice for datasets that fit into memory, e.g. when processing an entity file on the command line.

Source code in nomenklatura/store/memory.py
class MemoryStore(Store[DS, SE]):
    """Hold statements in plain dictionaries, with no persistence.

    The right choice for datasets that fit into memory, e.g. when processing
    an entity file on the command line."""

    def __init__(self, dataset: DS, linker: Linker[SE]):
        super().__init__(dataset, linker)
        # All three indexes start out empty.
        # NOTE(review): presumably keyed by entity ID -- confirm in MemoryWriter.
        self.stmts: Dict[str, Set[Statement]] = {}
        # NOTE(review): presumably maps an entity ID to the IDs of entities
        # that reference it -- confirm in MemoryWriter / MemoryView.
        self.inverted: Dict[str, Set[str]] = {}
        # NOTE(review): presumably maps a dataset name to the entity IDs it
        # contains -- confirm in MemoryWriter / MemoryView.
        self.entities: Dict[str, Set[str]] = {}

    def writer(self) -> Writer[DS, SE]:
        """Return a `MemoryWriter` bound to this store."""
        return MemoryWriter(self)

    def view(self, scope: DS, external: bool = False) -> View[DS, SE]:
        """Return a `MemoryView` over this store for the given scope."""
        return MemoryView(self, scope, external=external)

nomenklatura.store.sql.SQLStore

Bases: Store[DS, SE]

Persist statements to a SQL database (SQLite or PostgreSQL).

Use this when a dataset is too large to hold in memory, or when several processes need to work with the same store.

Source code in nomenklatura/store/sql.py
class SQLStore(Store[DS, SE]):
    """Keep statements in a SQL database (SQLite or PostgreSQL).

    Suited to datasets that are too large to hold in memory, or to setups in
    which several processes share one store."""

    def __init__(
        self,
        dataset: DS,
        linker: Linker[SE],
        uri: str = settings.DB_URL,
    ):
        super().__init__(dataset, linker)
        self._uri = uri
        metadata = get_metadata()
        self.engine: Engine = get_engine(uri)
        self.table = make_statement_table(metadata)
        # Create the statement table on first use; `checkfirst` leaves an
        # existing table alone.
        metadata.create_all(self.engine, tables=[self.table], checkfirst=True)

    def writer(self) -> Writer[DS, SE]:
        """Return a `SQLWriter` bound to this store."""
        return SQLWriter(self)

    def close(self) -> None:
        """Close the database handle associated with this store's URI."""
        close_db(self._uri)

    def view(self, scope: DS, external: bool = False) -> View[DS, SE]:
        """Return a `SQLView` over this store for the given scope."""
        return SQLView(self, scope, external=external)

    def _execute(
        self, q: Select[Any], stream: bool = True
    ) -> Generator[Any, None, None]:
        """Run a read query and yield its rows, fetched 10,000 at a time.

        With `stream` set, the connection is asked to stream results."""
        # execute any read query against sql backend
        with self.engine.connect() as connection:
            if stream:
                connection = connection.execution_options(stream_results=True)
            result = connection.execute(q)
            while True:
                batch = result.fetchmany(10_000)
                if not batch:
                    break
                yield from batch

    def _iterate_stmts(
        self, q: Select[Any], stream: bool = True
    ) -> Generator[Statement, None, None]:
        """Run a query and yield each result row converted to a `Statement`."""
        rows = self._execute(q, stream=stream)
        yield from map(Statement.from_db_row, rows)

    def _iterate(
        self, q: Select[Any], stream: bool = True
    ) -> Generator[SE, None, None]:
        """Run a query and yield the entities assembled from its statements.

        Consecutive statements sharing an `entity_id` are collected and passed
        to `assemble()` together, so the query has to return each entity's
        statements next to each other."""
        group_id = None
        group: list[Statement] = []
        for statement in self._iterate_stmts(q, stream=stream):
            statement_id = statement.entity_id
            if group_id is None:
                group_id = statement_id
            if statement_id != group_id:
                # A new entity begins: emit the finished group and start over
                # with a fresh list.
                finished = self.assemble(group)
                if finished is not None:
                    yield finished
                group_id = statement_id
                group = []
            group.append(statement)
        if group:
            # Emit the final group, which no ID change follows.
            last = self.assemble(group)
            if last is not None:
                yield last

nomenklatura.store.load_entity_file_store(path, resolver, cleaned=True)

Create a simple in-memory store by reading FtM entities from a file path.

Source code in nomenklatura/store/__init__.py
def load_entity_file_store(
    path: Path,
    resolver: Resolver[StatementEntity],
    cleaned: bool = True,
) -> SimpleMemoryStore:
    """Create a simple in-memory store by reading FtM entities from a file path.

    The file is read as line-delimited JSON, one entity per line. Blank lines
    (for instance a trailing empty line) are skipped rather than handed to the
    JSON parser.

    The store's dataset is named after the slugified file stem, falling back
    to `Dataset.UNDEFINED`, and titled with the file name. Any dataset name
    found on an entity that the dataset does not know yet is added to it as a
    child dataset.

    `path` is the entity file to read, `resolver` is used as the store's
    linker, and `cleaned` is passed through to `StatementEntity.from_data()`.
    Returns the populated store."""
    name = slugify(path.stem, sep="_") or Dataset.UNDEFINED
    dataset = Dataset.make({"name": name, "title": path.name})
    store = MemoryStore(dataset, resolver)
    with store.writer() as writer, open(path, "rb") as fh:
        for line in fh:
            if not line.strip():
                # Nothing to parse on an empty or whitespace-only line.
                continue
            data = orjson.loads(line)
            proxy = StatementEntity.from_data(dataset, data, cleaned=cleaned)
            for ds in proxy.datasets:
                # Looked up afresh each time, because children may have been
                # added for an earlier entity or dataset name.
                if ds not in dataset.dataset_names:
                    discovered = Dataset.make({"name": ds})
                    dataset.children.add(discovered)
            writer.add_entity(proxy)
    return store