Enrichers

An enricher connects entities to an external data source, implementing a match step that finds candidate records and an expand step that fetches related entities.

For the workflow these classes plug into — configuration files, the nk match and nk enrich commands, and the role of the resolver — see the enrichment guide. This page documents the framework interface and the configuration options of each built-in enricher.

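All enrichers accept the shared options cache_days, schemata, and topics, described in the enrichment guide. Options that the enricher reads through its expansion helpers, such as api_key for yente and host, collection, and api_key for Aleph, can reference environment variables with ${VAR} syntax.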
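A minimal sketch of the ${VAR} expansion follows. It assumes the expansion behaves like Python's os.path.expandvars, which is what YenteEnricher applies to api_key in its source below. The helper name expand_option is hypothetical. Note that a reference to an unset variable is left in place rather than replaced with an empty string.

```python
import os
from typing import Optional


def expand_option(value: Optional[str]) -> Optional[str]:
    """Hypothetical helper: expand ${VAR} references in a string option."""
    if value is None:
        return None
    # os.path.expandvars leaves references to unset variables unchanged;
    # surrounding whitespace is stripped, as YenteEnricher does for api_key.
    return os.path.expandvars(value).strip()


os.environ["NK_DEMO_API_KEY"] = "secret-123"
print(expand_option("${NK_DEMO_API_KEY}"))  # secret-123
print(expand_option(" plain value "))  # plain value
```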

Framework

nomenklatura.enrich.make_enricher(dataset, cache, config, http_session=None)

Instantiate the enricher class named by the type import path in the given configuration, e.g. nomenklatura.enrich.wikidata:WikidataEnricher.
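The lookup can be modelled with the standard library alone. This sketch substitutes the path collections:OrderedDict and the base class dict for an enricher path and Enricher, so it runs without nomenklatura installed; resolve_class is a hypothetical name. It also shows that the type key is popped, so only the remaining keys reach the enricher's constructor.

```python
from importlib import import_module
from typing import Type


def resolve_class(path: str, base: type) -> Type:
    """Resolve a 'module:ClassName' path and check that it subclasses `base`.

    Simplified model of make_enricher's lookup; `base` stands in for Enricher.
    """
    if ":" not in path:
        raise RuntimeError("Invalid import path: %r" % path)
    module_name, class_name = path.split(":", 1)
    module = import_module(module_name)
    clazz = getattr(module, class_name, None)
    # Reject missing attributes and non-classes before calling issubclass().
    if not isinstance(clazz, type) or not issubclass(clazz, base):
        raise RuntimeError("Invalid class: %r" % path)
    return clazz


config = {"type": "collections:OrderedDict", "cache_days": 30}
clazz = resolve_class(config.pop("type"), dict)
print(clazz.__name__)  # OrderedDict
print(config)  # {'cache_days': 30}
```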

Source code in nomenklatura/enrich/__init__.py
def make_enricher(
    dataset: DS,
    cache: Cache,
    config: EnricherConfig,
    http_session: Optional[Session] = None,
) -> Enricher[DS]:
    """Instantiate the enricher class named by the `type` import path in the
    given configuration, e.g. `nomenklatura.enrich.wikidata:WikidataEnricher`."""
    enricher_type = config.pop("type")
    if ":" not in enricher_type:
        raise RuntimeError("Invalid import path: %r" % enricher_type)
    module_name, clazz_name = enricher_type.split(":", 1)
    module = import_module(module_name)
    clazz = getattr(module, clazz_name, None)
    if not isinstance(clazz, type) or not issubclass(clazz, Enricher):
        raise RuntimeError("Invalid enricher: %r" % enricher_type)
    enr_clazz = cast(Type[Enricher[DS]], clazz)
    return enr_clazz(dataset, cache, config, session=http_session)

nomenklatura.enrich.match(enricher, resolver, entities, config=None)

Stream entities through the enricher and record candidate matches in the resolver.

Yields each input entity, followed by the candidates found for it. Each candidate pair is scored and stored in the resolver as a suggestion, to be confirmed or rejected in a later review step (e.g. nk dedupe).
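The streaming contract can be sketched with plain records. This is a simplified model, not the library code: Record, match_stream, and the name-overlap score are hypothetical stand-ins, a dict of suggestions replaces the resolver, and the schema compatibility check, DefaultAlgorithm scoring, dataset tagging, and per-entity exception handling of the real function are left out.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Iterable, Iterator, Tuple


@dataclass
class Record:
    # Hypothetical stand-in for an entity: an ID plus its names.
    id: str
    names: Tuple[str, ...]


def match_stream(
    entities: Iterable[Record],
    find_candidates: Callable[[Record], Iterable[Record]],
    score: Callable[[Record, Record], float],
    suggestions: Dict[Tuple[str, str], float],
) -> Iterator[Record]:
    """Yield each entity, then its candidates, recording one scored suggestion per pair."""
    for entity in entities:
        yield entity
        for candidate in find_candidates(entity):
            pair = (entity.id, candidate.id)
            if pair in suggestions:
                continue  # simplified stand-in for resolver.check_candidate()
            suggestions[pair] = score(entity, candidate)
            yield candidate


def overlap(a: Record, b: Record) -> float:
    # Jaccard overlap of name sets, used here only as a toy score.
    sa, sb = set(a.names), set(b.names)
    return len(sa & sb) / len(sa | sb)


source = {"q1": [Record("ext-1", ("jane doe",)), Record("ext-2", ("john roe",))]}
suggestions: Dict[Tuple[str, str], float] = {}
queries = [Record("q1", ("jane doe",)), Record("q2", ("x",))]
out = [r.id for r in match_stream(queries, lambda e: source.get(e.id, []), overlap, suggestions)]
print(out)  # ['q1', 'ext-1', 'ext-2', 'q2']
print(suggestions)  # {('q1', 'ext-1'): 1.0, ('q1', 'ext-2'): 0.0}
```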

Source code in nomenklatura/enrich/__init__.py
def match(
    enricher: Enricher[DS],
    resolver: Resolver[SE],
    entities: Iterable[SE],
    config: Optional[ScoringConfig] = None,
) -> Generator[SE, None, None]:
    """Stream entities through the enricher and record candidate matches in
    the resolver.

    Yields each input entity, followed by the candidates found for it. Each
    candidate pair is scored and stored in the resolver as a suggestion, to be
    confirmed or rejected in a later review step (e.g. `nk dedupe`)."""
    if config is None:
        config = ScoringConfig.defaults()
    for entity in entities:
        yield entity
        try:
            for match in enricher.match_wrapped(entity):
                if entity.id is None or match.id is None:
                    continue
                if not resolver.check_candidate(entity.id, match.id):
                    continue
                if not entity.schema.can_match(match.schema):
                    continue
                result = DefaultAlgorithm.compare(entity, match, config)
                log.info("Match [%s]: %.2f -> %s", entity, result.score, match)
                resolver.suggest(entity.id, match.id, result.score)
                match.datasets.add(enricher.dataset.name)
                match = resolver.apply(match)
                yield match
        except EnrichmentException:
            log.exception("Failed to match: %r" % entity)

nomenklatura.enrich.enrich(enricher, resolver, entities)

Fetch data for entities whose matches have been confirmed.

For each candidate that the resolver holds a positive judgement on, yields the matched entity and its related records from the enrichment source. Run this after judging the suggestions recorded by match().
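The judgement filter can be sketched as follows. Judgement here is a local stand-in enum, entities are plain ID strings, and judgements are keyed by an unordered pair; treating the lookup as order-insensitive is an assumption of this sketch (match() suggests (entity, match) while enrich() queries (match, entity)). Only pairs judged positive are expanded, and expansion yields the match before its related records.

```python
from enum import Enum
from typing import Callable, Dict, FrozenSet, Iterable, Iterator, List


class Judgement(Enum):
    # Local stand-in for the resolver's judgement values.
    POSITIVE = "positive"
    NEGATIVE = "negative"
    NO_JUDGEMENT = "no_judgement"


def enrich_stream(
    entities: Iterable[str],
    find_candidates: Callable[[str], Iterable[str]],
    expand: Callable[[str, str], Iterable[str]],
    judgements: Dict[FrozenSet[str], Judgement],
) -> Iterator[str]:
    """Expand only the candidates whose pairing has a positive judgement."""
    for entity in entities:
        for candidate in find_candidates(entity):
            key = frozenset((entity, candidate))
            if judgements.get(key, Judgement.NO_JUDGEMENT) is not Judgement.POSITIVE:
                continue  # unjudged or rejected pairs are skipped
            yield from expand(entity, candidate)


candidates: Dict[str, List[str]] = {"q1": ["ext-1", "ext-2"], "q2": ["ext-3"]}
related: Dict[str, List[str]] = {"ext-1": ["ext-1", "ext-1-officer"], "ext-2": ["ext-2"], "ext-3": ["ext-3"]}
judgements = {
    frozenset(("q1", "ext-1")): Judgement.POSITIVE,
    frozenset(("q2", "ext-3")): Judgement.NEGATIVE,
}
out = list(enrich_stream(["q1", "q2"], lambda e: candidates.get(e, []), lambda e, m: related[m], judgements))
print(out)  # ['ext-1', 'ext-1-officer']
```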

Source code in nomenklatura/enrich/__init__.py
def enrich(
    enricher: Enricher[DS], resolver: Resolver[SE], entities: Iterable[SE]
) -> Generator[SE, None, None]:
    """Fetch data for entities whose matches have been confirmed.

    For each candidate that the resolver holds a positive judgement on, yields
    the matched entity and its related records from the enrichment source. Run
    this after judging the suggestions recorded by `match()`."""
    for entity in entities:
        try:
            for match in enricher.match_wrapped(entity):
                if entity.id is None or match.id is None:
                    continue
                judgement = resolver.get_judgement(match.id, entity.id)
                if judgement != Judgement.POSITIVE:
                    continue

                log.info("Enrich [%s]: %r", entity, match)
                for adjacent in enricher.expand_wrapped(entity, match):
                    adjacent.datasets.add(enricher.dataset.name)
                    adjacent = resolver.apply(adjacent)
                    yield adjacent
        except EnrichmentException:
            log.exception("Failed to enrich: %r" % entity)

nomenklatura.enrich.Enricher

Bases: BaseEnricher[DS], ABC

A connector to an external data source that finds candidate matches for entities and retrieves their related records.

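Subclasses implement match() and expand(). The base class provides an HTTP session and caching request helpers, so repeated runs against the same source reuse stored responses, for up to cache_days days, instead of re-fetching them from the remote API. GET responses are cached by URL; http_post_json_cached caches under an explicit cache_key supplied by the caller.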
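The caching behaviour of http_get_cached() can be modelled in a few lines. CachedFetcher is a hypothetical in-memory stand-in for the Cache-backed helper: responses are keyed by URL, reused while younger than cache_days, and not stored at all when caching is disabled. How the real Cache treats a zero max age is not shown here, so the bypass is an assumption. In the source below, parameters passed as hidden (e.g. credentials) are added to the request URL but not to the cache key; the sketch omits that.

```python
import time
from typing import Callable, Dict, List, Optional, Tuple


class CachedFetcher:
    """Hypothetical model of http_get_cached(): URL-keyed cache with a max age in days."""

    def __init__(self, fetch: Callable[[str], str], cache_days: int) -> None:
        self.fetch = fetch
        self.cache_days = cache_days
        self._store: Dict[str, Tuple[float, str]] = {}

    def get(self, url: str, cache_days: Optional[int] = None) -> str:
        # A per-call cache_days overrides the enricher-wide setting.
        days = self.cache_days if cache_days is None else cache_days
        hit = self._store.get(url)
        if hit is not None and days > 0 and time.time() - hit[0] < days * 86400:
            return hit[1]
        text = self.fetch(url)
        # Responses are only stored when caching is enabled.
        if days > 0:
            self._store[url] = (time.time(), text)
        return text


calls: List[str] = []


def fake_fetch(url: str) -> str:
    calls.append(url)
    return "body of " + url


fetcher = CachedFetcher(fake_fetch, cache_days=7)
fetcher.get("https://example.org/a")
fetcher.get("https://example.org/a")
print(len(calls))  # 1
fetcher.get("https://example.org/a", cache_days=0)
print(len(calls))  # 2
```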

Source code in nomenklatura/enrich/common.py
class Enricher(BaseEnricher[DS], ABC):
    """A connector to an external data source that finds candidate matches
    for entities and retrieves their related records.

    Subclasses implement `match()` and `expand()`. The base class provides an
    HTTP session and caching request helpers, so repeated runs against the
    same source don't re-fetch from the remote API."""

    def __init__(
        self,
        dataset: DS,
        cache: Cache,
        config: EnricherConfig,
        session: Optional[Session] = None,
    ):
        super().__init__(dataset, cache, config)
        self._session: Optional[Session] = session

    @property
    def session(self) -> Session:
        if self._session is None:
            self._session = Session()
            self._session.headers["User-Agent"] = USER_AGENT
        return self._session

    def http_get_cached(
        self,
        url: str,
        params: ParamsType = None,
        hidden: ParamsType = None,
        cache_days: Optional[int] = None,
    ) -> str:
        url = build_url(url, params=params)
        cache_days_ = self.cache_days if cache_days is None else cache_days
        response = self.cache.get(url, max_age=cache_days_)
        if response is None:
            log.debug("HTTP GET: %s", url)
            hidden_url = build_url(url, params=hidden)
            try:
                resp = self.session.get(hidden_url)
                resp.raise_for_status()
            except RequestException as rex:
                if rex.response is not None and rex.response.status_code in (401, 403):
                    raise EnrichmentAbort("Authorization failure: %s" % url) from rex
                msg = "HTTP fetch failed [%s]: %s" % (url, rex)
                log.info(f"{msg}\n{traceback.format_exc()}")
                raise EnrichmentException(msg) from rex
            response = resp.text
            if cache_days_ > 0:
                self.cache.set(url, response)
        return response

    def http_remove_cache(self, url: str, params: ParamsType = None) -> None:
        url = build_url(url, params=params)
        self.cache.delete(url)

    def http_get_json_cached(
        self,
        url: str,
        params: ParamsType = None,
        hidden: ParamsType = None,
        cache_days: Optional[int] = None,
    ) -> Any:
        res = self.http_get_cached(url, params, hidden=hidden, cache_days=cache_days)
        return json.loads(res)

    def http_post_json_cached(
        self,
        url: str,
        cache_key: str,
        json: Any = None,
        data: Any = None,
        headers: HeadersType = None,
        cache_days: Optional[int] = None,
        retry_chunked_encoding_error: int = 1,
    ) -> Any:
        cache_days_ = self.cache_days if cache_days is None else cache_days
        resp_data = self.cache.get_json(cache_key, max_age=cache_days_)
        if resp_data is None:
            try:
                resp = self.session.post(url, json=json, data=data, headers=headers)
                resp.raise_for_status()
            except ChunkedEncodingError as rex:
                # Due to https://github.com/urllib3/urllib3/issues/2751#issuecomment-2567630065,
                # urllib3's Retry strategy will not retry on chunked encoding errors.
                # Since urllib won't retry it, retry it here.
                # urllib does close the connection.
                if (
                    "Response ended prematurely" in str(rex)
                    and retry_chunked_encoding_error > 0
                ):
                    log.info("Retrying due to chunked encoding error: %s", rex)
                    return self.http_post_json_cached(
                        url,
                        cache_key,
                        json=json,
                        data=data,
                        headers=headers,
                        cache_days=cache_days,
                        retry_chunked_encoding_error=retry_chunked_encoding_error - 1,
                    )

                msg = "HTTP POST failed [%s]: %s" % (url, rex)
                raise EnrichmentException(msg) from rex
            except RequestException as rex:
                if rex.response is not None and rex.response.status_code in (401, 403):
                    raise EnrichmentAbort("Authorization failure: %s" % url) from rex

                msg = "HTTP POST failed [%s]: %s" % (url, rex)
                log.info(f"{msg}\n{traceback.format_exc()}")
                raise EnrichmentException(msg) from rex
            resp_data = resp.json()
            if cache_days_ > 0:
                self.cache.set_json(cache_key, resp_data)
        return resp_data

    def _make_data_entity(
        self, entity: SE, data: Dict[str, Any], cleaned: bool = True
    ) -> SE:
        """Create an entity which is of the same sub-type of SE as the given
        query entity."""
        return type(entity).from_data(self.dataset, data, cleaned=cleaned)

    def load_entity(self, entity: SE, data: Dict[str, Any]) -> SE:
        proxy = self._make_data_entity(entity, data, cleaned=False)
        for prop in proxy.iterprops():
            if prop.stub:
                proxy.pop(prop)
        return proxy

    def make_entity(self, entity: SE, schema: str) -> SE:
        """Create a new entity of the given schema."""
        return self._make_data_entity(entity, {"schema": schema})

    def match_wrapped(self, entity: SE) -> Generator[SE, None, None]:
        if not self._filter_entity(entity):
            return
        yield from self.match(entity)

    def expand_wrapped(self, entity: SE, match: SE) -> Generator[SE, None, None]:
        if not self._filter_entity(entity):
            return
        yield from self.expand(entity, match)

    @abstractmethod
    def match(self, entity: SE) -> Generator[SE, None, None]:
        """Yield candidates from the external source that may describe the
        same real-world entity as the given query entity."""
        raise NotImplementedError()

    @abstractmethod
    def expand(self, entity: SE, match: SE) -> Generator[SE, None, None]:
        """Yield the confirmed match itself, followed by entities related to
        it in the external source (e.g. officers, owners, family members)."""
        raise NotImplementedError()

    def close(self) -> None:
        if self._session is not None:
            self._session.close()

expand(entity, match) abstractmethod

Yield the confirmed match itself, followed by entities related to it in the external source (e.g. officers, owners, family members).

make_entity(entity, schema)

Create a new entity of the given schema.

match(entity) abstractmethod

Yield candidates from the external source that may describe the same real-world entity as the given query entity.

nomenklatura.enrich.EnrichmentException

Bases: Exception

A lookup failed for one entity; processing continues with the next.

Source code in nomenklatura/enrich/common.py
class EnrichmentException(Exception):
    """A lookup failed for one entity; processing continues with the next."""

nomenklatura.enrich.EnrichmentAbort

Bases: Exception

The enrichment source cannot be used at all, e.g. because of an authorization failure. Callers should stop the run.

Source code in nomenklatura/enrich/common.py
class EnrichmentAbort(Exception):
    """The enrichment source cannot be used at all, e.g. because of an
    authorization failure. Callers should stop the run."""
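The two exceptions split failures into "skip this entity" and "stop everything". The sketch below defines local stand-ins with the same names and base class (both derive directly from Exception, so catching one does not catch the other) and a hypothetical run() loop that handles them the way match() and enrich() do.

```python
from typing import Callable, Iterable, List


class EnrichmentException(Exception):
    """Local stand-in: a lookup failed for one entity."""


class EnrichmentAbort(Exception):
    """Local stand-in: the source is unusable; stop the run."""


def run(entities: Iterable[str], lookup: Callable[[str], List[str]]) -> List[str]:
    results: List[str] = []
    for entity in entities:
        try:
            results.extend(lookup(entity))
        except EnrichmentException as exc:
            # Per-entity failure: report it and continue with the next entity.
            print(f"skipping {entity}: {exc}")
    # EnrichmentAbort is not caught here, so it propagates to the caller.
    return results


def lookup(entity: str) -> List[str]:
    if entity == "bad":
        raise EnrichmentException("HTTP fetch failed")
    if entity == "forbidden":
        raise EnrichmentAbort("Authorization failure")
    return [entity + "-match"]


results = run(["a", "bad", "b"], lookup)
print(results)  # ['a-match', 'b-match']
try:
    run(["a", "forbidden", "b"], lookup)
except EnrichmentAbort as exc:
    print("aborted:", exc)  # aborted: Authorization failure
```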

Wikidata

nomenklatura.enrich.wikidata.WikidataEnricher

Bases: Enricher[DS]

Match Person entities against Wikidata items and import the matched item's claims as entity properties.

Family members and close associates of a matched person are followed and imported up to depth hops away.

Source code in nomenklatura/enrich/wikidata.py
class WikidataEnricher(Enricher[DS]):
    """Match `Person` entities against Wikidata items and import the matched
    item's claims as entity properties.

    Family members and close associates of a matched person are followed and
    imported up to `depth` hops away."""

    def __init__(
        self,
        dataset: DS,
        cache: Cache,
        config: EnricherConfig,
        session: Optional[Session] = None,
    ):
        super().__init__(dataset, cache, config, session)
        self.depth = self.get_config_int("depth", 1)
        self.aliases = bool(self.get_config_bool("aliases", False))
        self.search_limit = self.get_config_int("search_limit", 7)
        self.client = WikidataClient(cache, self.session, cache_days=self.cache_days)

    def keep_entity(self, entity: StatementEntity) -> bool:
        if check_person_cutoff(entity):
            return False
        return True

    def match(self, entity: SE) -> Generator[SE, None, None]:
        if not entity.schema.is_a("Person"):
            return

        wikidata_id = entity_qid(entity)

        # Already has an ID associated with it:
        if wikidata_id is not None:
            item = self.client.fetch_item(wikidata_id)
            if item is not None:
                proxy = self.item_proxy(entity, item, schema=entity.schema.name)
                if proxy is not None and self.keep_entity(proxy):
                    yield proxy
            return

        for qid in self.client.search_items(
            entity, aliases=self.aliases, limit=self.search_limit
        ):
            item = self.client.fetch_item(qid)
            if item is not None:
                proxy = self.item_proxy(entity, item, schema=entity.schema.name)
                if proxy is not None and self.keep_entity(proxy):
                    yield proxy

    def expand(self, entity: SE, match: SE) -> Generator[SE, None, None]:
        wikidata_id = entity_qid(match)
        if wikidata_id is None:
            return
        item = self.client.fetch_item(wikidata_id)
        if item is None:
            return
        if item.id != wikidata_id:
            log.warning(
                "Redirected matched QID: %s -> %s",
                wikidata_id,
                item.id,
            )
        proxy = self.item_proxy(match, item, schema=match.schema.name)
        if proxy is None or not self.keep_entity(proxy):
            return
        if "role.pep" in entity.get("topics", quiet=True):
            proxy.add("topics", "role.pep")
        yield proxy
        yield from self.item_graph(proxy, item)

    def make_link(
        self,
        proxy: SE,
        claim: Claim,
        depth: int,
        seen: Set[str],
        schema: str,
        other_schema: str,
        source_prop: str,
        target_prop: str,
    ) -> Generator[SE, None, None]:
        if depth < 1 or claim.qid is None or claim.qid in seen:
            return
        item = self.client.fetch_item(claim.qid)
        if item is None:
            return

        other = self.item_proxy(proxy, item, schema=other_schema)
        if other is None or not self.keep_entity(other):
            return None
        if proxy.id is None or other.id is None:
            return None

        # Hacky: if an entity is a PEP, then by definition their relatives and
        # associates are RCA (relatives and close associates).
        # if "role.pep" in proxy.get("topics", quiet=True):
        #     if "role.pep" not in other.get("topics"):
        #         other.add("topics", "role.rca")
        # Removed as per: https://github.com/opensanctions/opensanctions/issues/3354

        yield other
        yield from self.item_graph(other, item, depth=depth - 1, seen=seen)
        link = self.make_entity(proxy, schema)
        min_id, max_id = sorted((proxy.id, other.id))
        # FIXME: doesn't lead to collisions because claim.property has an inverse:
        link.id = f"wd-{claim.property}-{min_id}-{max_id}"
        link.id = link.id.lower()
        link.add(source_prop, proxy.id)
        link.add(target_prop, item.id)
        claim.property_label.apply(link, "relationship")

        for qual in claim.get_qualifier("P580"):
            qual.text.apply(link, "startDate")

        for qual in claim.get_qualifier("P582"):
            qual.text.apply(link, "endDate")

        for qual in claim.get_qualifier("P585"):
            qual.text.apply(link, "date")

        for qual in claim.get_qualifier("P1039"):
            qual.text.apply(link, "relationship")

        for qual in claim.get_qualifier("P2868"):
            qual.text.apply(link, "relationship")

        for ref in claim.references:
            for snak in ref.get("P854"):
                snak.text.apply(link, "sourceUrl")
        yield link

    def item_graph(
        self,
        proxy: SE,
        item: Item,
        depth: Optional[int] = None,
        seen: Optional[Set[str]] = None,
    ) -> Generator[SE, None, None]:
        if seen is None:
            seen = set()
        seen = seen.union([item.id])
        if depth is None:
            depth = self.depth
        for claim in item.claims:
            # Ignore all deprecated claims:
            if claim.deprecated:
                continue

            # TODO: memberships, employers?
            if claim.property in PROPS_FAMILY:
                yield from self.make_link(
                    proxy,
                    claim,
                    depth,
                    seen,
                    schema="Family",
                    other_schema="Person",
                    source_prop="person",
                    target_prop="relative",
                )
                continue
            if claim.property in PROPS_ASSOCIATION:
                yield from self.make_link(
                    proxy,
                    claim,
                    depth,
                    seen,
                    schema="Associate",
                    other_schema="Person",
                    source_prop="person",
                    target_prop="associate",
                )
                continue

    def item_proxy(self, ref: SE, item: Item, schema: str = "Person") -> Optional[SE]:
        proxy = self.make_entity(ref, schema)
        proxy.id = item.id
        if item.modified is None:
            return None
        # proxy.add("modifiedAt", item.modified)
        proxy.add("wikidataId", item.id)
        names: Set[str] = set()
        for label in item.sorted_labels:
            if label.text is None:
                continue
            ltext = label.text.casefold()
            if ltext in names:
                continue
            label.apply(proxy, "name", clean=clean_wikidata_name)
            names.add(ltext)
        if item.description is not None:
            item.description.apply(proxy, "notes")
        for alias in item.sorted_aliases:
            if alias.text is None:
                continue
            ltext = alias.text.casefold()
            if ltext in names:
                continue
            if is_alias_strong(alias.text, names):
                alias.apply(proxy, "alias", clean=clean_wikidata_name)
                names.add(ltext)
            else:
                alias.apply(proxy, "weakAlias", clean=clean_wikidata_name)

        if proxy.schema.is_a("Person") and not item.is_instance("Q5"):
            log.debug("Person is not a Q5 [%s]: %s", item.id, item.labels)
            return None

        names_concat = " ".join(names)
        for claim in item.claims:
            if claim.property is None:
                continue
            ftm_prop = PROPS_DIRECT.get(claim.property)
            if ftm_prop is None:
                continue
            if ftm_prop not in proxy.schema.properties:
                log.info("Entity %s does not have property: %s", item.id, ftm_prop)
                continue
            ftm_prop_ = proxy.schema.get(ftm_prop)
            if ftm_prop_ is None:
                log.info("Entity %s does not have property: %s", item.id, ftm_prop)
                continue
            if ftm_prop_.type == registry.country:
                territory = get_territory_by_qid(claim.qid)
                if territory is None or territory.ftm_country is None:
                    continue
                value = LangText(territory.ftm_country, original=claim.qid)
            else:
                value = claim.text

            # Sanity check that the name parts are in any of the full names:
            if ftm_prop in ("firstName", "lastName", "fatherName"):
                if value.text is None or value.text.lower() not in names_concat:
                    continue

            # Make sure the aliases look like the main name, otherwise mark them as weak:
            if ftm_prop == "alias":
                if value.text is None or value.text.lower() in names:
                    continue
                _strong = is_alias_strong(value.text, names)
                ftm_prop = "alias" if _strong else "weakAlias"

            if ftm_prop in PROPS_QUALIFIED:
                value = qualify_value(value, claim)
            if ftm_prop == "topics":
                topic = PROPS_TOPICS.get(claim.qid or "")
                if topic is None:
                    continue
                value = LangText(topic, original=claim.qid)
            value.apply(proxy, ftm_prop)

        # See https://github.com/opensanctions/opensanctions/issues/3651
        for wikilink in item.wikilinks:
            if wikilink.site == "enwiki":
                proxy.add(
                    "wikipediaUrl",
                    wikilink.url,
                    original_value=wikilink.title,
                    origin=wikilink.site,
                    lang=wikilink.lang,
                )
                break
        # We only use this if there are very few, since what we pick is then potentially significant for them.
        if not proxy.has("wikipediaUrl") and len(item.wikilinks) < 3:
            # Sort to be sure we pick the same link consistently
            for wikilink in sorted(item.wikilinks, key=lambda s: s.site):
                proxy.add(
                    "wikipediaUrl",
                    wikilink.url,
                    original_value=wikilink.title,
                    origin=wikilink.site,
                    lang=wikilink.lang,
                )
                break
        # TODO: do we want to do more sophisticated handling of wikilinks? For
        # example, if there are no English links, but there are many in other
        # languages, then maybe we should still include the first one?

        # has_english = len([i for i in item.wikilinks if i.site == "enwiki"]) > 0
        # num_wikilinks = len(item.wikilinks)
        # if not has_english and num_wikilinks > 2:
        #     log.warning(
        #         "I got %d wikilinks, but English ain't one: %s", num_wikilinks, item.id
        #     )
        return proxy

Options:

  • depth: how many hops of family and associate relationships to follow from a matched person (default 1).
  • aliases: also search on the entity's alias names, not only its primary names (default false).
  • search_limit: how many search results to consider per name (default 7).
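The depth option bounds a recursive walk over family and associate claims. The sketch below models only that walk: the graph is a hypothetical dict from item ID to related item IDs, and related_items is an illustrative name. As in item_graph() and make_link(), each branch copies its visited set rather than sharing it, and a related item is followed only while depth is at least 1.

```python
from typing import Dict, Iterator, List, Optional, Set

# Hypothetical relation graph: item ID -> IDs of family members and associates.
Graph = Dict[str, List[str]]


def related_items(graph: Graph, item: str, depth: int, seen: Optional[Set[str]] = None) -> Iterator[str]:
    """Yield items reachable within `depth` hops, mirroring item_graph/make_link."""
    # Build a new set instead of mutating, so sibling branches keep separate state.
    seen = (seen or set()) | {item}
    for other in graph.get(item, []):
        if depth < 1 or other in seen:
            continue
        yield other
        yield from related_items(graph, other, depth - 1, seen)


graph: Graph = {"Q1": ["Q2", "Q3"], "Q2": ["Q1", "Q4"], "Q4": ["Q5"]}
print(list(related_items(graph, "Q1", 1)))  # ['Q2', 'Q3']
print(list(related_items(graph, "Q1", 2)))  # ['Q2', 'Q4', 'Q3']
```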

The enricher builds on the Wikidata client.

yente

nomenklatura.enrich.yente.YenteEnricher

Bases: Enricher[DS]

Match entities against a yente instance — the OpenSanctions API server or any self-hosted deployment.

Any matchable schema can be queried. On expansion, related entities are read from the match's nested entity record.
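Reading related entities from a nested record can be sketched on plain dicts. ENTITY_PROPS is a hypothetical stand-in for the FtM schema model: it lists entity-typed properties and the reverse property to fill on the embedded record, with None where the reverse should be left alone (a stub). Like _traverse_nested(), the walk is depth first, adds a back-reference to the parent so the link survives flattening, and modifies the response dicts in place.

```python
from typing import Any, Dict, Iterator, Optional, Tuple

# Hypothetical schema model: (schema, property) -> reverse property to fill on
# the embedded record, or None when the reverse is a stub and stays untouched.
ENTITY_PROPS: Dict[Tuple[str, str], Optional[str]] = {
    ("Person", "ownershipOwner"): "owner",
    ("Ownership", "owner"): None,
    ("Ownership", "asset"): None,
}


def traverse_nested(record: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
    """Yield a nested record and every entity embedded in it, depth first."""
    yield record
    for prop, values in record.get("properties", {}).items():
        key = (record["schema"], prop)
        if key not in ENTITY_PROPS:
            continue  # not an entity reference, e.g. a name or a date
        reverse = ENTITY_PROPS[key]
        for value in values if isinstance(values, list) else [values]:
            if not isinstance(value, dict):
                continue  # a plain ID string, not an embedded record
            if reverse is not None:
                # Point the child back at its parent.
                value.setdefault("properties", {}).setdefault(reverse, []).append(record["id"])
            yield from traverse_nested(value)


person = {
    "id": "p1",
    "schema": "Person",
    "properties": {
        "name": ["Jane Doe"],
        "ownershipOwner": [{"id": "o1", "schema": "Ownership", "properties": {"asset": ["c1"]}}],
    },
}
flat = list(traverse_nested(person))
print([r["id"] for r in flat])  # ['p1', 'o1']
print(flat[1]["properties"]["owner"])  # ['p1']
```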

Source code in nomenklatura/enrich/yente.py
class YenteEnricher(Enricher[DS]):
    """Match entities against a yente instance — the OpenSanctions API server
    or any self-hosted deployment.

    Any matchable schema can be queried. On expansion, related entities are
    read from the match's nested entity record."""

    def __init__(
        self,
        dataset: DS,
        cache: Cache,
        config: EnricherConfig,
        session: Optional[Session] = None,
    ):
        super().__init__(dataset, cache, config, session)
        self._api: str = config.pop("api")
        self._yente_dataset: str = config.pop("dataset", "default")
        self._cutoff: Optional[float] = config.pop("cutoff", None)
        self._algorithm: str = config.pop("algorithm", "best")
        self._nested: bool = config.pop("expand_nested", True)
        self._fuzzy: bool = config.pop("fuzzy", False)
        self._ns: Optional[Namespace] = None
        if self.get_config_bool("strip_namespace"):
            self._ns = Namespace()

        api_key: Optional[str] = os.path.expandvars(config.pop("api_key", "")).strip()
        if api_key is None or not len(api_key):
            api_key = os.environ.get("YENTE_API_KEY")
        self._api_key: Optional[str] = api_key
        if self._api_key is not None:
            self.session.headers["Authorization"] = f"ApiKey {self._api_key}"

    def make_url(self, entity: StatementEntity) -> str:
        return urljoin(self._api, f"entities/{entity.id}")

    def match(self, entity: SE) -> Generator[SE, None, None]:
        if not entity.schema.matchable:
            return
        url = urljoin(self._api, f"match/{self._yente_dataset}")
        params: Dict[str, Any] = {"fuzzy": self._fuzzy, "algorithm": self._algorithm}
        if self._cutoff is not None:
            params["cutoff"] = self._cutoff
        url = build_url(url, params)
        cache_key = f"{url}:{entity.id}"
        props: Dict[str, List[str]] = {}
        for prop in entity.iterprops():
            if prop.type == registry.entity:
                continue
            if prop.matchable:
                props[prop.name] = entity.get(prop)
        query = {
            "queries": {
                "entity": {
                    "schema": entity.schema.name,
                    "properties": props,
                }
            }
        }
        for retry in range(4):
            try:
                response = self.http_post_json_cached(url, cache_key, query)
                inner_resp = response.get("responses", {}).get("entity", {})
                for result in inner_resp.get("results", []):
                    proxy = self.load_entity(entity, result)
                    proxy.add("sourceUrl", self.make_url(proxy))
                    if self._ns is not None:
                        proxy = self._ns.apply(proxy)
                    yield proxy
                return
            except EnrichmentException as exc:
                log.info("Error matching %r: %s", entity, exc)
                if retry == 3:
                    raise
                time.sleep((retry + 1) ** 2)

    def _traverse_nested(self, entity: SE, response: Any) -> Generator[SE, None, None]:
        entity = self.load_entity(entity, response)
        if self._ns is not None:
            entity = self._ns.apply(entity)
        yield entity
        for prop_name, values in response.get("properties", {}).items():
            prop = entity.schema.properties.get(prop_name)
            if prop is None or prop.type != registry.entity:
                continue
            for value in ensure_list(values):
                if isinstance(value, dict):
                    if prop.reverse is not None and not prop.reverse.stub:
                        reverse = prop.reverse.name
                        if reverse not in value["properties"]:
                            value["properties"][reverse] = []
                        value["properties"][reverse].append(entity.id)
                    yield from self._traverse_nested(entity, value)

    def expand(self, entity: SE, match: SE) -> Generator[SE, None, None]:
        url = self.make_url(match)
        for source_url in match.get("sourceUrl", quiet=True):
            if source_url.startswith(self._api):
                url = source_url
        url = build_url(url, {"nested": self._nested})
        response = self.http_get_json_cached(url)
        yield from self._traverse_nested(match, response)

Options:

  • api (required): base URL of the yente instance, e.g. https://api.opensanctions.org/. Request paths are resolved against it with urljoin, so end it with a trailing slash if the instance is served under a path prefix.
  • dataset: the yente dataset scope to match against (default: default).
  • api_key: API key, sent as an Authorization header. ${VAR} references are expanded; if the option is empty or unset, the YENTE_API_KEY environment variable is used.
  • algorithm: the yente scoring algorithm to use (default: best).
  • cutoff: minimum score for returned candidates. Only sent to yente when set.
  • fuzzy: enable fuzzy name matching in the query (default false).
  • expand_nested: include related entities when expanding a match (default true).
  • strip_namespace: remove namespace suffixes from entity IDs (default false).
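The match request can be sketched as a URL plus a JSON body. build_match_request and parse_results are hypothetical helpers; the body shape and the responses.entity.results path follow YenteEnricher.match() above, while the fuzzy, algorithm, and cutoff query parameters are omitted. The sketch takes a properties dict that is already filtered; the real code keeps only matchable, non-entity properties. The second URL shows why the api base needs a trailing slash.

```python
from typing import Any, Dict, List, Tuple
from urllib.parse import urljoin


def build_match_request(
    api: str, dataset: str, schema: str, properties: Dict[str, List[str]]
) -> Tuple[str, Dict[str, Any]]:
    """Build the match URL and a single-query request body."""
    url = urljoin(api, f"match/{dataset}")
    body = {"queries": {"entity": {"schema": schema, "properties": properties}}}
    return url, body


def parse_results(response: Dict[str, Any]) -> List[Dict[str, Any]]:
    # Candidate records sit under responses -> entity -> results.
    return response.get("responses", {}).get("entity", {}).get("results", [])


url, body = build_match_request(
    "https://api.opensanctions.org/", "default", "Person", {"name": ["Jane Doe"]}
)
print(url)  # https://api.opensanctions.org/match/default
# Without a trailing slash, urljoin replaces the last path segment:
print(urljoin("https://yente.example.org/api", "match/default"))  # https://yente.example.org/match/default
```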

Aleph

nomenklatura.enrich.aleph.AlephEnricher

Bases: Enricher[DS]

Match entities against an Aleph instance, optionally scoped to a single collection, and import matched records with their nested relationships.
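How the constructor below picks the instance and credentials can be sketched without a network connection. aleph_base_url and aleph_headers are hypothetical helpers that mirror its precedence: a config value wins over the ALEPH_HOST or ALEPH_API_KEY environment variable, which wins over the built-in default host. ${VAR} expansion of config values is omitted. Because the API path is joined as the absolute path /api/2/, any path on the configured host is replaced.

```python
import uuid
from typing import Dict, Mapping, Optional
from urllib.parse import urljoin


def aleph_base_url(config: Mapping[str, Optional[str]], environ: Mapping[str, str]) -> str:
    # Config beats the ALEPH_HOST environment variable, which beats the default.
    host = environ.get("ALEPH_HOST", "https://aleph.occrp.org/")
    host = config.get("host") or host
    # An absolute path replaces any path already present on the host URL.
    return urljoin(host, "/api/2/")


def aleph_headers(config: Mapping[str, Optional[str]], environ: Mapping[str, str]) -> Dict[str, str]:
    # A fresh session ID per enricher instance, as in the constructor.
    headers = {"X-Aleph-Session": str(uuid.uuid4())}
    api_key = config.get("api_key") or environ.get("ALEPH_API_KEY")
    if api_key is not None:
        headers["Authorization"] = f"ApiKey {api_key}"
    return headers


print(aleph_base_url({}, {}))  # https://aleph.occrp.org/api/2/
print(aleph_base_url({"host": "https://aleph.example.org/sub/"}, {}))  # https://aleph.example.org/api/2/
```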

Source code in nomenklatura/enrich/aleph.py
class AlephEnricher(Enricher[DS]):
    """Match entities against an Aleph instance, optionally scoped to a single
    collection, and import matched records with their nested relationships."""

    def __init__(
        self,
        dataset: DS,
        cache: Cache,
        config: EnricherConfig,
        session: Optional[Session] = None,
    ):
        super().__init__(dataset, cache, config, session)
        self._host: str = os.environ.get("ALEPH_HOST", "https://aleph.occrp.org/")
        self._host = self.get_config_expand("host") or self._host
        self._base_url: str = urljoin(self._host, "/api/2/")
        self._collection: Optional[str] = self.get_config_expand("collection")
        self._ns: Optional[Namespace] = None
        if self.get_config_bool("strip_namespace"):
            self._ns = Namespace()
        self._api_key: Optional[str] = os.environ.get("ALEPH_API_KEY")
        self._api_key = self.get_config_expand("api_key") or self._api_key
        if self._api_key is not None:
            self.session.headers["Authorization"] = f"ApiKey {self._api_key}"
        self.session.headers["X-Aleph-Session"] = str(uuid.uuid4())

    @cached_property
    def collection_id(self) -> Optional[str]:
        if self._collection is None:
            return None
        url = urljoin(self._base_url, "collections")
        url = build_url(url, {"filter:foreign_id": self._collection})
        res = self.session.get(url)
        res.raise_for_status()
        response = res.json()
        for result in response.get("results", []):
            return cast(str, result["id"])
        return None

    def load_aleph_entity(self, entity: SE, data: Dict[str, Any]) -> Optional[SE]:
        data["referents"] = [data["id"]]
        try:
            proxy = super().load_entity(entity, data)
        except InvalidData:
            log.warning("Server model mismatch: %s" % data.get("schema"))
            return None
        links = data.get("links", {})
        proxy.add("alephUrl", links.get("self"), quiet=True, cleaned=True)
        collection = data.get("collection", {})
        proxy.add("publisher", collection.get("label"), quiet=True, cleaned=True)
        # clinks = collection.get("links", {})
        # entity.add("publisherUrl", clinks.get("ui"), quiet=True, cleaned=True)
        return proxy

    def convert_nested(
        self, entity: SE, data: Dict[str, Any]
    ) -> Generator[SE, None, None]:
        proxy = self.load_aleph_entity(entity, data)
        if proxy is not None:
            if self._ns is not None:
                proxy = self._ns.apply(proxy)
            yield proxy
        properties = data.get("properties", {})
        for _, values in properties.items():
            for value in ensure_list(values):
                if isinstance(value, dict) and "id" in value:
                    proxy = self.load_aleph_entity(entity, value)
                    if proxy is not None:
                        yield proxy

    # def enrich_entity(self, entity):
    #     url = self.api._make_url("match")
    #     for page in range(10):
    #         data = self.post_match(url, entity)
    #         for res in data.get("results", []):
    #             proxy = self.convert_entity(res)
    #             yield self.make_match(entity, proxy)

    #         url = data.get("next")
    #         if url is None:
    #             break

    # def expand_entity(self, entity):
    #     for url in entity.get("alephUrl", quiet=True):
    #         data = self.get_api(url)
    #         yield from self.convert_nested(data)

    #         _, entity_id = url.rsplit("/", 1)
    #         filters = (("entities", entity_id),)
    #         search_api = self.api._make_url("entities", filters=filters)
    #         while True:
    #             res = self.get_api(search_api)
    #             for data in ensure_list(res.get("results")):
    #                 yield from self.convert_nested(data)

    #             search_api = res.get("next")
    #             if search_api is None:
    #                 break

    def match(self, entity: SE) -> Generator[SE, None, None]:
        if not entity.schema.matchable:
            return
        url = urljoin(self._base_url, "match")
        if self.collection_id is not None:
            url = build_url(url, {"collection_ids": self.collection_id})
        query = {
            "schema": entity.schema.name,
            "properties": entity.properties,
        }
        cache_id = entity.id or hash_data(query)
        cache_key = f"{url}:{cache_id}"
        response = self.http_post_json_cached(url, cache_key, query)
        for result in response.get("results", []):
            proxy = self.load_aleph_entity(entity, result)
            if proxy is not None:
                if self._ns is not None:
                    proxy = self._ns.apply(proxy)
                yield proxy

    def expand(self, entity: SE, match: SE) -> Generator[SE, None, None]:
        url = urljoin(self._base_url, f"entities/{match.id}")
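        for aleph_url in match.get("alephUrl", quiet=True):
            if aleph_url.startswith(self._base_url):
                # Already an API URL (.../api/2/entities/<id>): use it as-is.
                url = aleph_url
            elif aleph_url.startswith(self._host):
                # A UI URL (.../entities/<id>): rewrite it to the API path.
                url = aleph_url.replace("/entities/", "/api/2/entities/")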
        response = self.http_get_json_cached(url)
        yield from self.convert_nested(match, response)
  • host — base URL of the Aleph instance. Falls back to the ALEPH_HOST environment variable, then https://aleph.occrp.org/.
  • api_key — Aleph API key. Falls back to the ALEPH_API_KEY environment variable.
  • collection — foreign ID of a collection to search within; if unset, the whole instance is searched.
  • strip_namespace — remove namespace suffixes from entity IDs (default false).
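The `host` option and the collection filter determine every URL the enricher calls. Here is a minimal sketch of how the listing composes them. It uses `urllib.parse.urlencode` in place of nomenklatura's `build_url` (an assumption), and it takes the collection's numeric ID, which the enricher resolves from the `collection` foreign ID via the `collections` endpoint.

```python
from typing import Dict, Optional
from urllib.parse import urlencode, urljoin


def aleph_urls(
    host: str, entity_id: str, collection_id: Optional[str] = None
) -> Dict[str, str]:
    """Compose Aleph API URLs following the pattern in AlephEnricher (sketch)."""
    # "/api/2/" is an absolute path, so urljoin discards any path on the host.
    base_url = urljoin(host, "/api/2/")
    match_url = urljoin(base_url, "match")
    if collection_id is not None:
        # Matching is scoped to one collection via a collection_ids parameter.
        match_url = f"{match_url}?{urlencode({'collection_ids': collection_id})}"
    entity_url = urljoin(base_url, f"entities/{entity_id}")
    return {"base": base_url, "match": match_url, "entity": entity_url}
```

Because `/api/2/` is joined as an absolute path, any path component on `host` is discarded. As written, an instance served under a sub-path such as `https://example.org/aleph/` resolves to `https://example.org/api/2/`.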

OpenCorporates

nomenklatura.enrich.opencorporates.OpenCorporatesEnricher

Bases: Enricher[DS]

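Match companies, organizations, and legal entities against OpenCorporates, the global aggregator of company registry data, using its company search. Requires an API token. Officer search is commented out in the current code, so only company records are returned.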
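OpenCorporates enforces a request quota. In the listing below, `oc_get_cached` serves responses from the cache when it can. After an HTTP 429 it stops calling the API for the rest of the run and relies on cached data alone, and a 401 aborts enrichment. The sketch below is a stripped-down version of that control flow. A plain dict stands in for the nomenklatura cache, a caller-supplied `fetch` function stands in for the HTTP session, and `PermissionError` stands in for `EnrichmentAbort` (all assumptions).

```python
from typing import Callable, Dict, Optional, Tuple


class QuotaAwareFetcher:
    """Cache-first fetcher that stops calling the API after a 429 (sketch)."""

    def __init__(self, fetch: Callable[[str], Tuple[int, str]]) -> None:
        self.fetch = fetch  # returns (status_code, body)
        self.cache: Dict[str, str] = {}
        self.quota_exceeded = False

    def get(self, url: str) -> Optional[str]:
        if url in self.cache:
            return self.cache[url]
        if self.quota_exceeded:
            # The quota ran out earlier in this run: serve cached data only.
            return None
        status, body = self.fetch(url)
        if status == 429:
            self.quota_exceeded = True
            return None
        if status == 401:
            # Stands in for EnrichmentAbort: a bad token should stop the run.
            raise PermissionError(f"Authorization failure: {url}")
        if status >= 400:
            raise RuntimeError(f"HTTP fetch failed [{status}]: {url}")
        self.cache[url] = body
        return body
```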

Source code in nomenklatura/enrich/opencorporates.py
class OpenCorporatesEnricher(Enricher[DS]):
    """Match companies and their officers against OpenCorporates, the global
    aggregator of company registry data. Requires an API token."""

    COMPANY_SEARCH_API = "https://api.opencorporates.com/v0.4/companies/search"
    OFFICER_SEARCH_API = "https://api.opencorporates.com/v0.4/officers/search"
    UI_PART = "://opencorporates.com/"
    API_PART = "://api.opencorporates.com/v0.4/"
    QUERY_SKIP = "'\\\"|&()[]{}^~*?:;!+-/<>@#$%`"

    def __init__(
        self,
        dataset: DS,
        cache: Cache,
        config: EnricherConfig,
        session: Optional[Session] = None,
    ):
        super().__init__(dataset, cache, config, session)
        token_var = "${OPENCORPORATES_API_TOKEN}"
        self.api_token: Optional[str] = self.get_config_expand("api_token", token_var)
        self.quota_exceeded = False
        if self.api_token == token_var:
            self.api_token = None
        if self.api_token is None:
            log.warning("OpenCorporates has no API token (%s)" % token_var)
        self.headers = {"X-API-TOKEN": self.api_token}
        # self.cache.preload(f"{self.COMPANY_SEARCH_API}%")

        self.skip_jurisdictions = set(self.get_config_list("skip_jurisdictions"))
        """Set of jurisdiction codes to skip during enrichment because they're not covered by
        OpenCorporates."""
        self.skip_jurisdictions.update(["xk", "su"])

    def oc_get_cached(self, url: str, params: ParamsType = None) -> Optional[Any]:
        url = build_url(url, params=params)
        response = self.cache.get(url, max_age=self.cache_days)
        if response is None:
            if self.quota_exceeded:
                return None
            try:
                log.info("OpenCorporates fetch: %s", url)
                resp = self.session.get(url, headers=self.headers)
                resp.raise_for_status()
            except RequestException as rex:
                if rex.response is not None:
                    if rex.response.status_code == 429:
                        log.warning(
                            "OpenCorporates quota exceeded (%s); using only cache now.",
                            rex.response.status_code,
                        )
                        self.quota_exceeded = True
                        return None
                    elif rex.response.status_code == 401:
                        raise EnrichmentAbort(
                            "Authorization failure: %s" % url
                        ) from rex
                msg = "HTTP fetch failed [%s]: %s" % (url, rex)
                raise EnrichmentException(msg) from rex
            response = resp.text
            self.cache.set(url, response)
        return json.loads(response)

    def match(self, entity: SE) -> Generator[SE, None, None]:
        if not entity.schema.matchable:
            return
        if entity.has("opencorporatesUrl"):
            # TODO: fetch entity here when we start to expand with content!
            return

        if entity.schema.name in ["Company", "Organization", "LegalEntity"]:
            yield from self.search_companies(entity)
        if entity.schema.name in ["Person", "LegalEntity", "Company", "Organization"]:
            # yield from self.search_officers(entity)
            pass

    def expand(self, entity: SE, match: SE) -> Generator[SE, None, None]:
        clone = self.make_entity(match, match.schema.name)
        clone.id = match.id
        clone.add("opencorporatesUrl", match.get("opencorporatesUrl"))
        yield clone

    # def expand_entity(self, entity):
    #     for url in entity.get("opencorporatesUrl", quiet=True):
    #         url = self.make_url(url)
    #         data = self.get_api(url).get("results", {})
    #         if "company" in data:
    #             yield from self.expand_company(entity, data)
    #         if "officer" in data:
    #             yield from self.expand_officer(data, officer=entity)

    def make_entity_id(self, url: str) -> str:
        parsed = urlparse(url)
        path = slugify_text(parsed.path, sep="-")
        assert path is not None, "Invalid OpenCorporates URL: %s" % url
        return f"oc-{path}"

    def clean_query(self, query: str) -> str:
        """Clean a query string for OpenCorporates search."""
        out: List[str] = []
        for char in query:
            if char in self.QUERY_SKIP:
                char = " "
            out.append(char)
        return squash_spaces("".join(out))

    def filter_ftm_countries(self, countries: List[str]) -> List[str]:
        """Filter a list of country codes to those known to followthemoney."""
        valid_countries = []
        for code in countries:
            territory = get_territory(code)
            if territory is None:
                continue
            if territory.parent is not None:
                territory = territory.parent
            if territory.is_historical:
                continue
            if not territory.is_country:
                continue
            valid_countries.append(territory.code)
        return valid_countries

    def jurisdiction_to_country(self, juris: Optional[Any]) -> Optional[str]:
        if juris is None:
            return None
        return str(juris).split("_", 1)[0]

    def company_entity(
        self, ref: SE, data: Dict[str, Any], entity: Optional[SE] = None
    ) -> SE:
        if "company" in data:
            data = ensure_dict(data.get("company", data))
        oc_url = cast(Optional[str], data.get("opencorporates_url"))
        if oc_url is None:
            raise ValueError("Company has no URL: %r" % data)
        if entity is None:
            entity = self.make_entity(ref, "Company")
            entity.id = self.make_entity_id(oc_url)
        entity.add("name", data.get("name"))

        # TODO: make this an adjacent object?
        address: Dict[str, Any] = ensure_dict(data.get("registered_address"))
        entity.add("country", address.get("country"))

        juris = self.jurisdiction_to_country(data.get("jurisdiction_code"))
        entity.add("jurisdiction", juris)
        entity.add("address", data.get("registered_address_in_full"))
        entity.add("sourceUrl", data.get("registry_url"))
        entity.add("legalForm", data.get("company_type"))
        inc_date = data.get("incorporation_date")
        entity.add("incorporationDate", parse_date(inc_date))
        dis_date = data.get("dissolution_date")
        entity.add("dissolutionDate", parse_date(dis_date))
        entity.add("status", data.get("current_status"))
        entity.add("registrationNumber", data.get("company_number"))
        entity.add("opencorporatesUrl", oc_url)
        source = data.get("source", {})
        entity.add("publisher", source.get("publisher"))
        entity.add("publisherUrl", source.get("url"))
        entity.add("retrievedAt", parse_date(source.get("retrieved_at")))
        for code in data.get("industry_codes", []):
            code = code.get("industry_code", code)
            entity.add("sector", code.get("description"))
        for previous in data.get("previous_names", []):
            entity.add("previousName", previous.get("company_name"))
        for alias in data.get("alternative_names", []):
            entity.add("alias", alias.get("company_name"))
        return entity

    # def officer_entity(self, data, entity=None):
    #     if "officer" in data:
    #         data = ensure_dict(data.get("officer", data))
    #     person = data.get("occupation") or data.get("date_of_birth")
    #     schema = "Person" if person else "LegalEntity"
    #     entity = model.make_entity(schema)
    #     entity.make_id(data.get("opencorporates_url"))
    #     entity.add("name", data.get("name"))
    #     entity.add("country", data.get("nationality"))
    #     entity.add("jurisdiction", data.get("jurisdiction_code"))
    #     entity.add("address", data.get("address"))
    #     entity.add("birthDate", data.get("date_of_birth"), quiet=True)
    #     entity.add("position", data.get("occupation"), quiet=True)
    #     entity.add("opencorporatesUrl", data.get("opencorporates_url"))
    #     source = data.get("source", {})
    #     entity.add("publisher", source.get("publisher"))
    #     entity.add("publisherUrl", source.get("url"))
    #     entity.add("retrievedAt", source.get("retrieved_at"))
    #     return entity

    def search_companies(self, entity: SE) -> Generator[SE, None, None]:
        query = self.clean_query(entity.caption)
        # if query != entity.caption:
        #     log.info("OC query cleaned: %r -> %r", entity.caption, query)
        if not len(query):
            return
        params = {"q": query, "sparse": True}
        countries = entity.get_type_values(registry.country, matchable=True)
        countries = self.filter_ftm_countries(countries)

        if len(countries) > 0 and all(c in self.skip_jurisdictions for c in countries):
            return

        if len(countries) > 0:
            country_codes = "|".join(countries) if countries else None
            params["country_codes"] = country_codes

        for page in range(1, 9):
            params["page"] = page
            results = self.oc_get_cached(self.COMPANY_SEARCH_API, params=params)
            if results is None:
                break

            # print(results)
            for company in results.get("results", {}).get("companies", []):
                proxy = self.company_entity(entity, company)
                yield proxy
            if page >= results.get("total_pages", 0):
                break
  • api_token — OpenCorporates API token. Defaults to the OPENCORPORATES_API_TOKEN environment variable.
  • skip_jurisdictions — list of jurisdiction codes to exclude from lookups.
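The `skip_jurisdictions` option suppresses a lookup only when every country on the entity is excluded, and the constructor always adds `xk` and `su` to the set. The sketch below reproduces the query cleaning and parameter assembly from `search_companies` as standalone functions. It assumes the country codes have already been normalized (the enricher does this with `filter_ftm_countries`), and it collapses whitespace with `str.split` in place of `squash_spaces`.

```python
from typing import Dict, List, Optional, Set

# Characters that clean_query replaces with spaces (copied from QUERY_SKIP).
QUERY_SKIP = "'\\\"|&()[]{}^~*?:;!+-/<>@#$%`"


def clean_query(query: str) -> str:
    """Replace search-syntax characters with spaces and collapse whitespace."""
    out = [" " if char in QUERY_SKIP else char for char in query]
    return " ".join("".join(out).split())


def build_search_params(
    caption: str, countries: List[str], skip_jurisdictions: Set[str]
) -> Optional[Dict[str, object]]:
    """Return company-search parameters, or None when the lookup is skipped."""
    query = clean_query(caption)
    if not query:
        return None
    # Skip only when every known country of the entity is excluded.
    if countries and all(c in skip_jurisdictions for c in countries):
        return None
    params: Dict[str, object] = {"q": query, "sparse": True}
    if countries:
        params["country_codes"] = "|".join(countries)
    return params
```

The enricher then requests pages 1 through 8 with these parameters, stopping early at the last page reported by the API.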

OpenFIGI

nomenklatura.enrich.openfigi.OpenFIGIEnricher

Bases: Enricher[DS]

Look up organizations and securities in OpenFIGI, Bloomberg's open database of financial instrument identifiers.

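Matching an organization searches OpenFIGI by name and proposes companies with that name; expanding a confirmed match adds the securities listed under exactly that name, each with the company as its issuer. Matching a security looks it up by ISIN and adds its FIGI code, ticker, and security type.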
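The `search` method in the listing below pages through OpenFIGI search results. Each response may carry a `next` cursor, which is sent back as `start` in the following request. Here is a standalone sketch of that loop, with a caller-supplied `post` function standing in for the cached HTTP call (an assumption for illustration).

```python
from typing import Any, Callable, Dict, Iterator, Optional


def paginate(
    query: str, post: Callable[[Dict[str, Any]], Dict[str, Any]]
) -> Iterator[Dict[str, Any]]:
    """Follow "next" cursors until the server stops sending one."""
    body: Dict[str, Any] = {"query": query}
    cursor: Optional[str] = None
    while True:
        if cursor is not None:
            # The cursor from the previous page is sent back as "start".
            body["start"] = cursor
        resp = post(body)
        yield from resp.get("data", [])
        cursor = resp.get("next")
        if cursor is None:
            break
```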

Source code in nomenklatura/enrich/openfigi.py
class OpenFIGIEnricher(Enricher[DS]):
    """Look up organizations and securities in OpenFIGI, Bloomberg's open
    database of financial instrument identifiers.

    Matching an organization yields the securities it has issued; matching a
    security by ISIN links it to its issuer."""

    SEARCH_URL = "https://api.openfigi.com/v3/search"
    MAPPING_URL = "https://api.openfigi.com/v3/mapping"

    def __init__(
        self,
        dataset: DS,
        cache: Cache,
        config: EnricherConfig,
        session: Optional[Session] = None,
    ):
        super().__init__(dataset, cache, config, session)
        api_key_var = "${OPENFIGI_API_KEY}"
        self.api_key: Optional[str] = self.get_config_expand("api_key", api_key_var)
        if self.api_key == api_key_var:
            self.api_key = None
        if self.api_key is None:
            log.warning("OpenFIGI has no API key (%s)" % api_key_var)
        else:
            # Use the resolved key, so an explicit api_key option takes effect.
            self.session.headers["X-OPENFIGI-APIKEY"] = self.api_key

    def make_company_id(self, name: str) -> str:
        return f"figi-company-{make_entity_id(name)}"

    def make_security_id(self, figi: str) -> str:
        return f"figi-{figi}"

    def search(self, query: str) -> Generator[Dict[str, str], None, None]:
        body = {"query": query}
        next = None

        while True:
            if next is not None:
                body["start"] = next

            log.info(f"Searching {query!r}, offset={next}")
            cache_key = f"{self.SEARCH_URL}:{query}:{next}"
            resp = self.http_post_json_cached(self.SEARCH_URL, cache_key, json=body)
            if "data" in resp:
                yield from resp["data"]

            next = resp.get("next", None)
            if next is None:
                break

    def match_organization(self, entity: SE) -> Generator[SE, None, None]:
        for name in entity.get("name"):
            for match in self.search(name):
                match_name = match.get("name", None)
                if match_name is None:
                    continue
                other = self.make_entity(entity, "Company")
                other.id = self.make_company_id(match_name)
                other.add("name", match_name)
                other.add("topics", "corp.public")
                yield other

    def match_security(self, entity: SE) -> Generator[SE, None, None]:
        for isin in entity.get("isin"):
            cache_key = f"{self.MAPPING_URL}:ISIN:{isin}"
            query = [{"idType": "ID_ISIN", "idValue": isin}]
            resp = self.http_post_json_cached(self.MAPPING_URL, cache_key, json=query)
            for section in resp:
                for item in section.get("data", []):
                    figi = item["figi"]
                    if figi != item.get("compositeFIGI", figi):
                        continue
                    security = self.make_entity(entity, "Security")
                    # security.id = self.make_security_id(item["figi"])
                    security.id = entity.id
                    security.add("isin", isin)
                    security.add("figiCode", item["figi"])
                    security.add("ticker", item["ticker"])
                    security.add("type", item["securityType"])
                    yield security

    def match(self, entity: SE) -> Generator[SE, None, None]:
        if entity.schema.is_a("Organization"):
            yield from self.match_organization(entity)
        if entity.schema.is_a("Security"):
            yield from self.match_security(entity)

    def expand(self, entity: SE, match: SE) -> Generator[SE, None, None]:
        if match.schema.is_a("Security"):
            yield match
        if match.schema.is_a("Organization"):
            name = match.first("name")
            if name is None:
                return
            yield match
            for item in self.search(name):
                # Only emit the securities which match the name of the positive match
                # to the company exactly. Skip everything else.
                if item["name"] != name:
                    continue

                figi = item["figi"]
                security = self.make_entity(match, "Security")
                security.id = self.make_security_id(figi)
                security.add("figiCode", figi)
                security.add("issuer", match)
                security.add("ticker", item["ticker"])
                security.add("type", item["securityType"])
                # if item["exchCode"] is not None:
                #     security.add("notes", f'exchange {item["exchCode"]}')
                security.add("description", item["securityDescription"])
                yield security
  • api_key — OpenFIGI API key. Defaults to the OPENFIGI_API_KEY environment variable.
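The credential options of the OpenCorporates, OpenFIGI, and PermID enrichers follow one pattern: the option defaults to a `${VAR}` reference, and if that reference is still present after expansion, the credential is treated as missing. The sketch below shows this logic. It assumes the expansion behaves like `os.path.expandvars`, which leaves references to unset variables untouched. `resolve_token` is a hypothetical name.

```python
import os
from typing import Optional


def resolve_token(configured: Optional[str], env_var: str) -> Optional[str]:
    """Expand a ${VAR}-style option; an unexpanded default means "no token"."""
    placeholder = "${%s}" % env_var
    value = configured if configured is not None else placeholder
    # os.path.expandvars leaves references to unset variables unchanged.
    expanded = os.path.expandvars(value)
    if expanded == placeholder:
        return None
    return expanded
```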

PermID

nomenklatura.enrich.permid.PermIDEnricher

Bases: Enricher[DS]

Match organizations against PermID, the open entity identifier system published by LSEG (formerly Refinitiv). Requires an API token.
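PermID describes where a company is domiciled and incorporated using GeoNames URLs rather than country codes. `fetch_placename` in the listing below downloads each place's `about.rdf` document and takes the first `countryCode` element, falling back to the first `name`. The sketch below shows the parsing step on an inline, made-up sample record. It assumes the listing's `GN` constant (not shown) is the GeoNames ontology namespace `http://www.geonames.org/ontology#`, and it uses the standard library's ElementTree instead of lxml.

```python
import xml.etree.ElementTree as ET
from typing import Optional

# Assumed value of the GN prefix used by the enricher (GeoNames ontology).
GN = "{http://www.geonames.org/ontology#}"


def placename_from_rdf(rdf: str, fallback: str) -> Optional[str]:
    """Prefer the country code in a GeoNames RDF record, then the name."""
    doc = ET.fromstring(rdf)
    code = doc.find(".//%scountryCode" % GN)
    if code is not None:
        return code.text
    name = doc.find(".//%sname" % GN)
    if name is not None:
        return name.text
    return fallback


# Illustrative sample record, not real GeoNames output.
SAMPLE = """<rdf:RDF xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
    xmlns:gn="http://www.geonames.org/ontology#">
  <gn:Feature rdf:about="http://sws.geonames.org/example/">
    <gn:name>United Kingdom</gn:name>
    <gn:countryCode>GB</gn:countryCode>
  </gn:Feature>
</rdf:RDF>"""

print(placename_from_rdf(SAMPLE, "http://sws.geonames.org/example/"))
```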

Source code in nomenklatura/enrich/permid.py
class PermIDEnricher(Enricher[DS]):
    """Match organizations against PermID, the open entity identifier system
    published by LSEG (formerly Refinitiv). Requires an API token."""

    MATCHING_API = "https://api-eit.refinitiv.com/permid/match"

    def __init__(
        self,
        dataset: DS,
        cache: Cache,
        config: EnricherConfig,
        session: Optional[Session] = None,
    ):
        super().__init__(dataset, cache, config, session)
        token_var = "${PERMID_API_TOKEN}"
        self.api_token: Optional[str] = self.get_config_expand("api_token", token_var)
        if self.api_token == token_var:
            self.api_token = None
        if self.api_token is None:
            log.warning("PermID has no API token (%s)" % token_var)
        self.quota_exceeded = False

    def entity_to_queries(self, entity: StatementEntity) -> bytes:
        names = entity.get_type_values(registry.name, matchable=True)
        countries = entity.get("jurisdiction", quiet=True)
        if not len(countries):
            countries = entity.get_type_values(registry.country, matchable=True)
        country_set = {c.upper()[:2] for c in countries}
        if len(country_set) == 0:
            country_set.add("")
        if len(names) * len(country_set) < 999:
            country_set.add("")
        if len(names) * len(country_set) < 999:
            fp = fingerprint_name(entity.caption)
            if fp is not None and fp not in names:
                names.append(fp)
        for name in entity.get("name", quiet=True):
            if len(names) * len(country_set) >= 999:
                break
            fp = fingerprint_name(name)
            if fp is not None and fp not in names:
                names.append(fp)
        sio = io.StringIO()
        writer = csv.writer(sio, dialect=csv.unix_dialect, delimiter=",")
        # LocalID,Standard Identifier,Name,Country,Street,City,PostalCode,State,Website
        writer.writerow(["LocalID", "Standard Identifier", "Name", "Country"])
        lei_code = entity.first("leiCode", quiet=True)
        if lei_code is not None:
            lei_code = f"LEI:{lei_code}"
        else:
            lei_code = ""
        for name, country in list(product(names, country_set))[:999]:
            writer.writerow([entity.id, lei_code, name, country])
        sio.seek(0)
        return sio.getvalue().encode("utf-8")

    @lru_cache(maxsize=1000)
    def fetch_placename(self, value: Optional[str]) -> Optional[str]:
        if value is None:
            return None
        if not value.startswith("http://sws.geonames.org/"):
            raise ValueError("Not a GeoNames URL: %s" % value)
        url = urljoin(value, "about.rdf")
        res = self.http_get_cached(url, cache_days=120)
        try:
            doc = etree.fromstring(res.encode("utf-8"))
        except Exception:
            log.warning("Invalid GeoNames response: %s", url)
            self.http_remove_cache(url)
            return None
        for code in doc.findall(".//%scountryCode" % GN):
            return code.text
        for name in doc.findall(".//%sname" % GN):
            return name.text
        return value

    def fetch_permid(self, url: str) -> Optional[Dict[str, Any]]:
        params = {"format": "json-ld"}
        hidden = {"access-token": self.api_token}
        res_raw = self.http_get_cached(url, params=params, hidden=hidden, cache_days=90)
        try:
            return cast(Dict[str, Any], json.loads(res_raw))
        except Exception:
            log.info("Invalid response from PermID: %s", url)
            self.http_remove_cache(url, params=params)
            return None

    def fetch_perm_org(self, entity: SE, url: str) -> Optional[SE]:
        res = self.fetch_permid(url)
        if res is None:
            return None
        res.pop("@id", None)
        res.pop("@type", None)
        res.pop("@context", None)
        res.pop("hasPrimaryIndustryGroup", None)

        perm_id = res.pop("tr-common:hasPermId", url.rsplit("-", 1)[-1])
        lei_code = res.pop("tr-org:hasLEI", None)
        match = self.make_entity(entity, "Company")
        match.id = f"lei-{lei_code}" if lei_code is not None else f"permid-{perm_id}"
        match.add("sourceUrl", url)
        match.add("leiCode", lei_code)
        match.add("permId", perm_id)
        match.add("name", res.pop("vcard:organization-name", None))
        match.add("website", res.pop("hasURL", None))
        match.add("country", self.fetch_placename(res.pop("isDomiciledIn", None)))
        incorporated = self.fetch_placename(res.pop("isIncorporatedIn", None))
        match.add("jurisdiction", incorporated)
        inc_date = res.pop("hasLatestOrganizationFoundedDate", None)
        match.add("incorporationDate", inc_date)

        hq_addr = res.pop("mdaas:HeadquartersAddress", None)
        reg_addr = res.pop("mdaas:RegisteredAddress", None)
        for addr in (hq_addr, reg_addr):
            if addr is not None:
                addr = ", ".join(addr.split("\n"))
                addr = addr.replace(",,", ",").strip().strip(",")
                match.add("address", addr)
        status_uri = res.pop("hasActivityStatus", None)
        status = STATUS.get(status_uri)
        if status is None:
            log.warning("Unknown status: %s" % status_uri)
        match.add("status", status)
        match.add("phone", res.pop("tr-org:hasHeadquartersPhoneNumber", None))
        match.add("phone", res.pop("tr-org:hasRegisteredPhoneNumber", None))
        res.pop("tr-org:hasHeadquartersFaxNumber", None)
        res.pop("tr-org:hasRegisteredFaxNumber", None)

        quote = res.pop("hasOrganizationPrimaryQuote", None)
        if quote is not None:
            quote_res = self.fetch_permid(quote)
            if quote_res is not None:
                match.add("ticker", quote_res.pop("tr-fin:hasExchangeTicker", None))
                match.add("ricCode", quote_res.pop("tr-fin:hasRic", None))
                match.add("topics", "corp.public")
        return match

    def match(self, entity: SE) -> Generator[SE, None, None]:
        if self.quota_exceeded:
            return
        if not entity.schema.is_a("Organization"):
            return
        try:
            for permid in entity.get("permId", quiet=True):
                permid_url = f"https://permid.org/1-{permid}"
                match = self.fetch_perm_org(entity, permid_url)
                if match is not None:
                    yield match
            headers = {
                "x-openmatch-numberOfMatchesPerRecord": "4",
                "X-AG-Access-Token": self.api_token,
                "x-openmatch-dataType": "Organization",
            }
            cache_key = f"permid:{entity.id}"
            query = self.entity_to_queries(entity)
            res = self.http_post_json_cached(
                self.MATCHING_API,
                cache_key,
                data=query,
                headers=headers,
                cache_days=self.cache_days,
            )
            seen_matches: Set[str] = set()
            for result in res.get("outputContentResponse", []):
                match_permid_url = result.get("Match OpenPermID")
                if match_permid_url is None or match_permid_url in seen_matches:
                    continue
                seen_matches.add(match_permid_url)
                match = self.fetch_perm_org(entity, match_permid_url)
                if match is not None:
                    yield match
        except EnrichmentAbort as exc:
            self.quota_exceeded = True
            log.warning("PermID quota exceeded: %s", exc)

    def expand(self, entity: SE, match: SE) -> Generator[SE, None, None]:
        yield match
  • api_token — PermID API token. Defaults to the PERMID_API_TOKEN environment variable.
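PermID's matching endpoint takes a CSV upload rather than JSON. `entity_to_queries` in the listing above writes one row per combination of name and country, tags the rows with the entity's LEI when it has one, and stops at 999 rows. The sketch below is a simplified, standalone version with three differences. It always adds a blank country, which presumably lets PermID match without a country constraint. It sorts countries for stable output. It also omits the name-fingerprint variants the enricher appends.

```python
import csv
import io
from itertools import product
from typing import List, Optional

MAX_ROWS = 999  # the enricher caps each request at 999 query rows


def permid_match_csv(
    entity_id: str, names: List[str], countries: List[str], lei: Optional[str] = None
) -> bytes:
    """Build a PermID bulk-match CSV body: one row per name/country pair."""
    country_set = {c.upper()[:2] for c in countries}
    country_set.add("")  # blank country: match without a country filter
    sio = io.StringIO()
    writer = csv.writer(sio, dialect=csv.unix_dialect, delimiter=",")
    writer.writerow(["LocalID", "Standard Identifier", "Name", "Country"])
    standard_id = f"LEI:{lei}" if lei is not None else ""
    for name, country in list(product(names, sorted(country_set)))[:MAX_ROWS]:
        writer.writerow([entity_id, standard_id, name, country])
    return sio.getvalue().encode("utf-8")
```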

BrightQuery

nomenklatura.enrich.brightquery.BrightQueryEnricher

Bases: Enricher[DS]

Match organizations against the BrightQuery Business Identity API, which covers US legal entities and their state registrations. Requires an API key.
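The BrightQuery endpoint answers a query that has no results with HTTP 204 and an empty body rather than JSON, so the `search` method in the listing below caches by hand. The sketch below shows that caching pattern. A dict serves as the cache, a SHA-1 digest of the JSON payload stands in for `hash_data`, and a caller-supplied `post` function returns a status code and a parsed body (all assumptions).

```python
import hashlib
import json
from typing import Any, Callable, Dict, Iterator, Optional, Tuple


def cache_key_for(payload: Dict[str, Any]) -> str:
    # Stand-in for nomenklatura's hash_data: a stable digest of the payload.
    raw = json.dumps(payload, sort_keys=True).encode("utf-8")
    return hashlib.sha1(raw).hexdigest()


def cached_search(
    payload: Dict[str, Any],
    post: Callable[[Dict[str, Any]], Tuple[int, Optional[Dict[str, Any]]]],
    cache: Dict[str, Dict[str, Any]],
) -> Iterator[Dict[str, Any]]:
    """Yield result records, caching empty (HTTP 204) answers as {}."""
    key = cache_key_for(payload)
    data = cache.get(key)
    if data is None:
        status, body = post(payload)
        if status == 204:
            data = {}  # remember "no results" so the query is not repeated
        elif status >= 400 or body is None:
            raise RuntimeError(f"BrightQuery search failed: HTTP {status}")
        else:
            data = body
        cache[key] = data
    yield from data.get("root", {}).get("children", [])
```

The lookup tests for `None` rather than falsiness: a cached `{}` records an earlier empty answer, and a falsiness test would send the same query again.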

Source code in nomenklatura/enrich/brightquery.py
class BrightQueryEnricher(Enricher[DS]):
    """Match organizations against the BrightQuery Business Identity API,
    which covers US legal entities and their state registrations. Requires
    an API key."""

    BASE_URL = "https://apigw.brightquery.com/search/identity/org"

    def __init__(
        self,
        dataset: DS,
        cache: Cache,
        config: EnricherConfig,
        session: Optional[requests.Session] = None,
    ):
        super().__init__(dataset, cache, config, session)
        self._api_key = self.get_config_expand("api_key", "${BRIGHTQUERY_API_KEY}")
        if not self._api_key:
            raise ValueError("Missing BrightQuery API key.")

        retries = Retry(
            total=5,
            backoff_factor=40,
            backoff_max=600,
            status_forcelist=[
                504,  # 504 Gateway Timeout
                502,  # 502 Bad Gateway
            ]
            + list(Retry.RETRY_AFTER_STATUS_CODES),
            allowed_methods=frozenset(["POST"]),
        )
        adapter = HTTPAdapter(max_retries=retries)
        self.session.mount("https://apigw.brightquery.com", adapter)
        self.session.headers.update({"x-api-key": self._api_key})

        self.skip_jurisdictions = set(self.get_config_list("skip_jurisdictions"))
        """Set of jurisdiction codes to skip during enrichment because they're not covered by
        BrightQuery/opendata.org."""

    def create_proxy(
        self, entity: SE, child: Dict[str, Any]
    ) -> Generator[SE, None, None]:
        # Primary, most common name of the Organization, which equals the name of
        # the ultimate parent or sole entity that comprises the Organization.
        org_name = child.get("bq_organization_name")
        # Some records do not have Legal Entity names. Then we fall back to the org_name.
        name = child.get("bq_legal_entity_name") or org_name
        if not name:
            log.warning(
                "BrightQuery record without name: %s",
                child.get("bq_legal_entity_id"),
            )
            return
        proxy = self.make_entity(entity, "Company")
        # Unique ID of the Organization. An Organization is the concept of a company,
        # which is constructed as a collection of Legal Entities (child and parent entities)
        # and Locations (e.g., offices, stores).
        bq_org_id = child.get("bq_organization_id")
        # Unique ID of the Legal Entity. A Legal Entity is part of an Organization and is
        # registered with the Secretary of State of a jurisdiction.
        # LegalEntity is the primary object of interest for our processing.
        bq_entity_id = child.get("bq_legal_entity_id")
        if bq_entity_id is not None:
            proxy.id = slugify(bq_entity_id, "-")
        if proxy.id is None and bq_org_id is not None:
            proxy.id = f"bqo-{slugify(bq_org_id, '-')}"

        if proxy.id is None:
            log.error("BrightQuery record without IDs: %s", name)
            return

        if len(proxy.id) > registry.entity.max_length:
            log.error(
                "BrightQuery generated ID too long (%d): %s",
                len(proxy.id),
                proxy.id,
            )
            return

        if not proxy.id.startswith("bq-") and not proxy.id.startswith("bqo-"):
            log.error("BrightQuery ID without prefix: %s", proxy.id)
            return

        # Legal name of the Legal Entity
        proxy.add("name", name)
        proxy.add("brightQueryOrgId", bq_org_id)
        proxy.add("brightQueryId", bq_entity_id)
        # Link to the Organization's primary website.
        proxy.add("website", child.get("bq_website"))
        proxy.add("address", child.get("bq_legal_entity_address_summary"))
        # Jurisdiction code (two-letter state code) in which the Legal Entity is registered,
        # typically with the Secretary of State.
        proxy.add("jurisdiction", child.get("bq_legal_entity_jurisdiction_code"))
        # Date on which the Legal Entity was registered with the Secretary of State.
        founded = child.get("bq_legal_entity_date_founded")
        proxy.add("incorporationDate", founded)
        log.info("Candidate [%s]: %s", proxy.id, name)
        yield proxy

    def search(self, payload: dict[str, Any]) -> Generator[Dict[str, str], None, None]:
        cache_id = hash_data(payload)
        cache_key = f"{self.BASE_URL}:{cache_id}"

        # We have to re-implement http_post_json_cached here because the endpoint doesn't
        # return JSON when there are no results.
        resp_data = self.cache.get_json(cache_key, max_age=self.cache_days)
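        # Compare against None: a cached empty result ({}) is falsy but must
        # still count as a cache hit, otherwise the API is queried again.
        if resp_data is None: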
            log.info("BrightQuery search: %r", payload)
            response = self.session.post(self.BASE_URL, json=payload, timeout=(10, 300))
            # When no results are found, the API returns a 204 with an empty
            # response body instead of JSON.
            if response.status_code == 204:
                # Cache the empty result to avoid hitting the API again
                # for the same query.
                self.cache.set_json(cache_key, {})
                return
            response.raise_for_status()
            resp_data = response.json()
            self.cache.set_json(cache_key, resp_data)
        # Each search returns up to 10 records, sorted by revenue and employee headcount.
        for child in resp_data.get("root", {}).get("children", []):
            yield child

    def match(self, entity: SE) -> Generator[SE, None, None]:
        if not entity.schema.is_a("Organization"):
            return

        countries = entity.get_type_values(registry.country, matchable=True)
        if len(countries) > 0 and all(c in self.skip_jurisdictions for c in countries):
            log.info(
                "Skipping BrightQuery enrichment for %r: unsupported jurisdictions %r",
                entity.id,
                countries,
            )
            return

        # Search each name, combined with the longest available address (if any).
        addresses = entity.get("address")
        address = max(addresses, key=len) if len(addresses) > 0 else None
        for name in entity.get("name"):
            # If we have an address, we can search by both name and address
            payload = {"company_name": name}
            if address:
                payload["address"] = address
            for match in self.search(payload):
                yield from self.create_proxy(entity, match)

    def expand(self, entity: SE, match: SE) -> Generator[SE, None, None]:
        yield match
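The ID rules in create_proxy are spread across several checks. The sketch below restates them as a single function so the fallback order is easier to see. It is a minimal sketch, not the library's code, and it rests on these assumptions: `_slug` is a simplified, hypothetical stand-in for normality's `slugify(text, "-")` (it does no Unicode transliteration), `MAX_ID_LENGTH` is a placeholder rather than the real value of `registry.entity.max_length`, and the example IDs are invented.

```python
import re
from typing import Optional

# Placeholder for followthemoney's registry.entity.max_length (assumed value).
MAX_ID_LENGTH = 128


def _slug(text: str) -> str:
    # Hypothetical, simplified stand-in for normality.slugify(text, "-"):
    # lowercases and collapses runs of non-alphanumerics into single hyphens.
    return re.sub(r"[^a-z0-9]+", "-", text.lower()).strip("-")


def derive_id(legal_entity_id: Optional[str], org_id: Optional[str]) -> Optional[str]:
    """Return an entity ID following the BrightQuery rules, or None to drop the record."""
    entity_id: Optional[str] = None
    # Prefer the slugified Legal Entity ID (an empty slug counts as missing).
    if legal_entity_id is not None:
        entity_id = _slug(legal_entity_id) or None
    # Fall back to the Organization ID, prefixed with "bqo-".
    if entity_id is None and org_id is not None:
        entity_id = f"bqo-{_slug(org_id)}"
    if entity_id is None:
        return None  # no usable ID at all
    if len(entity_id) > MAX_ID_LENGTH:
        return None  # too long for an entity ID
    if not entity_id.startswith(("bq-", "bqo-")):
        return None  # unexpected ID format
    return entity_id


print(derive_id("BQ_123", None))  # bq-123
print(derive_id(None, "456"))  # bqo-456
```

The Organization ID is only consulted when the Legal Entity ID is missing. A Legal Entity ID that slugifies to something without the `bq-` prefix causes the record to be dropped, even if an Organization ID is present.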
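  • api_key (required): BrightQuery API key. If it is not set in the configuration, the BRIGHTQUERY_API_KEY environment variable is used.
  • skip_jurisdictions: list of country codes to exclude, intended for entities outside BrightQuery's US coverage. An entity is skipped only if it has at least one country and all of its countries appear in this list; entities without a country are always searched.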
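The skip_jurisdictions option and the query building in match work together: the first decides whether an entity is searched at all, the second decides which payloads are sent. A minimal sketch of both decisions as plain functions, assuming the entity's country values, names, and addresses are already extracted as lists of strings. The function names and example values are hypothetical; the payload keys `company_name` and `address` are the ones used in the source above.

```python
from typing import Dict, Iterable, List, Optional, Sequence


def should_skip(countries: Sequence[str], skip_jurisdictions: Iterable[str]) -> bool:
    # Skip only when the entity has countries and every one of them is
    # excluded; entities without any country are still searched.
    skip = set(skip_jurisdictions)
    return len(countries) > 0 and all(c in skip for c in countries)


def build_payloads(names: Sequence[str], addresses: Sequence[str]) -> List[Dict[str, str]]:
    # Pair every name with the longest address, if the entity has one.
    address: Optional[str] = max(addresses, key=len) if addresses else None
    payloads: List[Dict[str, str]] = []
    for name in names:
        payload = {"company_name": name}
        if address:
            payload["address"] = address
        payloads.append(payload)
    return payloads


print(should_skip(["us", "ru"], ["ru"]))  # False
```

With skip_jurisdictions set to `["ru"]`, an entity tagged with both `us` and `ru` is still searched, because not all of its countries are excluded.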