Skip to content

Wikidata

WikidataClient reads items from the Wikidata API and SPARQL endpoint, with language-aware label selection and database-backed caching of responses.

Wikidata is a useful source of structured data on politicians, companies, and other entities of interest. This module is the low-level client used by the Wikidata enricher and by crawlers that turn Wikidata items into FollowTheMoney entities. It handles the parts that are error-prone to reimplement: request throttling and retries, response caching via a nomenklatura.cache.Cache, and picking a display label from the many languages an item may carry.

The client returns items as Item objects, which expose labels, aliases, descriptions, and claims. A Claim is one property statement on an item — for example P569 (date of birth) — with its qualifiers and references. Text values are wrapped in LangText, which keeps the language tag alongside the string.

Fetching an item requires a Cache, which stores API responses in the same SQL database the rest of nomenklatura uses:

from followthemoney import Dataset
from nomenklatura.cache import Cache
from nomenklatura.db import make_session
from nomenklatura.wikidata import WikidataClient

# The dataset is handed to the Cache below (presumably it scopes the cached
# entries to this dataset -- confirm against nomenklatura.cache).
dataset = Dataset.make({"name": "wikidata_demo", "title": "Wikidata demo"})
with make_session() as session:
    # The client needs a Cache: API responses are read from and written to it.
    cache = Cache(session, dataset, create=True)
    client = WikidataClient(cache)
    item = client.fetch_item("Q7747")
    # fetch_item() returns None when the response holds no entity for the QID.
    if item is not None:
        # get_label() returns a LangText; printing it shows the label text.
        print(item.id, client.get_label(item.id))

Interface

nomenklatura.wikidata.WikidataClient

Bases: object

Read items and labels from the Wikidata API and SPARQL endpoint.

Responses are cached in a SQL-backed Cache so that crawlers and enrichers can re-run without fetching the same data again, and requests carry a descriptive user agent and retry handling to stay within Wikidata's API etiquette.

Source code in nomenklatura/wikidata/client.py
class WikidataClient(object):
    """Read items and labels from the Wikidata API and SPARQL endpoint.

    Responses are cached in a SQL-backed `Cache` so that crawlers and enrichers
    can re-run without fetching the same data again, and requests carry a
    descriptive user agent and retry handling to stay within Wikidata's API
    etiquette."""

    WD_API = "https://www.wikidata.org/w/api.php"
    QUERY_API = "https://query.wikidata.org/sparql"
    QUERY_HEADERS = {
        "Accept": "application/sparql-results+json",
        "User-Agent": USER_AGENT,
    }
    # Cache age presets (presumably in days, like `cache_days` -- confirm).
    CACHE_SHORT = 1
    CACHE_MEDIUM = CACHE_SHORT * 7
    CACHE_LONG = CACHE_SHORT * 30

    # Labels are cached under their own key prefix and maximum age.
    LABEL_PREFIX = "wd:lb:"
    LABEL_CACHE_DAYS = 100

    def __init__(
        self, cache: Cache, session: Optional[Session] = None, cache_days: int = 14
    ) -> None:
        """Set up the client.

        Args:
          cache: cache used to store and look up API responses.
          session: HTTP session to use; a configured one is created if omitted.
          cache_days: default maximum cache age used by `fetch_item`, `query`
            and name searches.
        """
        self.cache = cache
        # A bare session gets 403'd (default UA) and throttled by Wikidata, so
        # default to a configured session with a descriptive UA and retries.
        self.session = session or make_session()
        self.cache_days = cache_days
        # self.cache.preload(f"{self.LABEL_PREFIX}%")

    @lru_cache(maxsize=MEMO_SMALL)
    def fetch_item(
        self,
        qid: str,
        cache_days: Optional[int] = None,
        randomize: bool = True,
    ) -> Optional[Item]:
        """Fetch a single item through the `wbgetentities` API action.

        Args:
          qid: the Wikidata item ID to fetch.
          cache_days: maximum cache age for this call. `None` falls back to
            the client-level default; any other value, including `0`, is
            passed to the cache as given (the same rule `query()` applies).
          randomize: passed through to `Cache.get`.

        Returns:
          The item, or `None` if the response has no entity for `qid`. If the
          parsed item carries a different ID (redirected/merged item), the
          item with that ID is fetched and returned instead.

        Raises:
          json.JSONDecodeError: if the response body is not valid JSON. The
            cache entry is removed first so the bad body is not served again.
          Errors from `raise_for_status()` propagate for failed HTTP requests.
        """
        # https://www.mediawiki.org/wiki/Wikibase/API
        # https://www.wikidata.org/w/api.php?action=help&modules=wbgetentities
        params = {
            "format": "json",
            "ids": qid,
            "action": "wbgetentities",
            # Ask for sitelink URLs for proper wikipedia links:
            "props": "info|sitelinks/urls|aliases|labels|descriptions|claims|datatype",
        }
        url = build_url(self.WD_API, params=params)
        # Only an omitted value selects the default; `or` would also discard
        # an explicit 0.
        if cache_days is None:
            cache_days = self.cache_days
        raw = self.cache.get(url, max_age=cache_days, randomize=randomize)
        if raw is None:
            log.debug(
                "Cache MISS fetching Wikidata item: %s cache_days=%s", qid, cache_days
            )
            res = self.session.get(url)
            res.raise_for_status()
            raw = res.text
            self.cache.set(url, raw)
        else:
            log.debug(
                "Cache HIT fetching Wikidata item: %s cache_days=%s", qid, cache_days
            )
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            # The body was stored before parsing; evict it so the failure is
            # not replayed from the cache on every later call.
            self.cache.delete(url)
            raise
        entity = data.get("entities", {}).get(qid)
        if entity is None:
            return None
        item = Item(self, entity)
        if item.id != qid:
            # Redirected/merged item:
            return self.fetch_item(item.id, cache_days=cache_days, randomize=randomize)
        return item

    @lru_cache(maxsize=100000)
    def get_label(self, qid: str) -> LangText:
        """Return a display label for `qid`.

        The packed label is looked up in the cache under `LABEL_PREFIX + qid`
        first. On a miss the labels are requested from the API, one is chosen
        with `LangText.pick`, and the result is cached. If the item has no
        labels, the QID itself becomes the label text. The returned label's
        `original` is set to the QID.

        Returns:
          The label, or an empty `LangText(None)` (not cached) if the response
          has no entity for `qid`.
        """
        cache_key = f"{self.LABEL_PREFIX}{qid}"
        cached = self.cache.get_json(cache_key, max_age=self.LABEL_CACHE_DAYS)
        if cached is not None:
            return LangText.parse(cached)
        params = {
            "format": "json",
            "ids": qid,
            "action": "wbgetentities",
            "props": "labels",
        }
        url = build_url(self.WD_API, params=params)
        res = self.session.get(url)
        res.raise_for_status()
        data: Dict[str, Any] = res.json()
        entity = data.get("entities", {}).get(qid)
        if entity is None:
            return LangText(None)
        labels = LangText.from_dict(entity.get("labels", {}))
        label = LangText.pick(labels)
        if label is None:
            label = LangText(qid)
        label.original = qid
        self.cache.set_json(cache_key, label.pack())
        return label

    def query(
        self, query_text: str, cache_days: Optional[int] = None
    ) -> SparqlResponse:
        """Query the Wikidata SPARQL endpoint.

        The query is whitespace-squashed before use, and the request URL built
        from it is the cache key. A response that cannot be parsed as JSON is
        removed from the cache, logged, and replaced by an empty result.

        Args:
          query_text: the SPARQL query to run.
          cache_days: overrides the client-level default for this call.

        Raises:
          RuntimeError: if the query is empty after squashing whitespace.
        """
        clean_text = squash_spaces(query_text)
        if len(clean_text) == 0:
            raise RuntimeError("Invalid query: %r" % query_text)
        params = {"query": clean_text}
        url = build_url(self.QUERY_API, params=params)
        effective_cache = cache_days if cache_days is not None else self.cache_days
        raw = self.cache.get(url, max_age=effective_cache)
        if raw is None:
            res = self.session.get(url, headers=self.QUERY_HEADERS)
            res.raise_for_status()
            raw = res.text
            self.cache.set(url, raw)
        try:
            data = json.loads(raw)
        except json.JSONDecodeError as err:
            self.cache.delete(url)
            log.exception("Failed to parse JSON: %s", err)
            return SparqlResponse(
                clean_text, {"head": {"vars": []}, "results": {"bindings": []}}
            )
        return SparqlResponse(clean_text, data)

    def search_items(
        self, entity: StatementEntity, aliases: bool = False, limit: int = 7
    ) -> List[str]:
        """Find Wikidata QIDs that might be the same as an OpenSanctions entity.

        Reach for this when reconciling an OS entity against Wikidata: it runs the
        entity's names through the `wbsearchentities` API and returns candidate
        QIDs for a downstream matcher to rank. It returns only QIDs — the caller
        decides which items to fetch and how to project them — so the client stays
        decoupled from the matcher's needs.

        All `name` values are searched. With `aliases`, the search also covers
        aliases (every matchable name-type value), trading more API calls for
        better recall on transliterated or aliased names. `limit` is the per-name
        result cap (the `wbsearchentities` default is 7, max 50); raise it for
        better recall on common names.

        Returns:
          De-duplicated QIDs in the order they were first returned.
        """
        if aliases:
            names = entity.get_type_values(registry.name, matchable=True)
        else:
            names = entity.get("name", quiet=True)
        qids: List[str] = []
        seen: Set[str] = set()
        for name in names:
            for qid in self._search_name(name, limit=limit):
                if qid not in seen:
                    seen.add(qid)
                    qids.append(qid)
        return qids

    def _search_name(self, name: str, limit: int = 7) -> List[str]:
        """Search `wbsearchentities` for one name and return the matching QIDs.

        Blank names return an empty list without a request. Responses are
        cached by request URL for `self.cache_days`. Result IDs that fail
        `is_qid` are dropped.

        Raises:
          json.JSONDecodeError: if the response body is not valid JSON. The
            cache entry is removed first so the bad body is not served again.
        """
        if not name.strip():
            return []
        params = {
            "format": "json",
            "action": "wbsearchentities",
            "type": "item",
            "language": "en",
            "strictlanguage": "false",
            "limit": str(limit),
            "search": name,
        }
        url = build_url(self.WD_API, params=params)
        raw = self.cache.get(url, max_age=self.cache_days)
        if raw is None:
            res = self.session.get(url)
            res.raise_for_status()
            raw = res.text
            self.cache.set(url, raw)
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            # Same policy as for a missing `search` key below: never keep an
            # unusable response in the cache.
            self.cache.delete(url)
            raise
        results = data.get("search")
        if results is None:
            # A response without a `search` key is malformed/transient; don't
            # keep it around to be served from cache.
            self.cache.delete(url)
            log.info("Wikidata search has no results: %s", name)
            return []
        qids: List[str] = []
        for result in results:
            qid = result.get("id")
            if qid is not None and is_qid(qid):
                qids.append(qid)
        return qids

    @lru_cache(maxsize=30000)
    def _type_props(self, qid: str) -> List[str]:
        """Return the QIDs that `qid` is an instance (P31) or subclass (P279) of.

        Deprecated claims, claims without a QID value, and ended claims are
        skipped. An unknown item yields an empty list.
        """
        item = self.fetch_item(qid)
        if item is None:
            return []
        types: List[str] = []
        for claim in item.claims:
            # historical countries are always historical:
            ended = claim.is_ended and claim.qid != "Q3024240"
            if ended or claim.qid is None or claim.deprecated:
                continue
            if claim.property in ("P31", "P279"):
                types.append(claim.qid)
        return types

    def __repr__(self) -> str:
        return "<WikidataClient()>"

    def __hash__(self) -> int:
        # Constant hash. No `__eq__` is defined, so equality stays
        # identity-based and `lru_cache` entries remain per instance.
        return 42

query(query_text, cache_days=None)

Query the Wikidata SPARQL endpoint.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| cache_days | Optional[int] | Overrides the client-level default for this call. | None |
Source code in nomenklatura/wikidata/client.py
def query(
    self, query_text: str, cache_days: Optional[int] = None
) -> SparqlResponse:
    """Run a SPARQL query against the Wikidata query endpoint.

    The query text is whitespace-squashed first; the request URL built from
    it doubles as the cache key. If the body (cached or fresh) is not valid
    JSON, the cache entry is dropped, the error is logged, and an empty
    result is returned.

    Args:
      query_text: the SPARQL query to run.
      cache_days: overrides the client-level default for this call.

    Raises:
      RuntimeError: if nothing is left of the query after squashing.
    """
    clean_text = squash_spaces(query_text)
    if len(clean_text) == 0:
        raise RuntimeError("Invalid query: %r" % query_text)

    url = build_url(self.QUERY_API, params={"query": clean_text})
    max_age = self.cache_days if cache_days is None else cache_days
    body = self.cache.get(url, max_age=max_age)
    if body is None:
        # Cache miss: fetch from the endpoint and remember the raw body.
        response = self.session.get(url, headers=self.QUERY_HEADERS)
        response.raise_for_status()
        body = response.text
        self.cache.set(url, body)

    try:
        parsed = json.loads(body)
    except json.JSONDecodeError as err:
        self.cache.delete(url)
        log.exception("Failed to parse JSON: %s", err)
        empty = {"head": {"vars": []}, "results": {"bindings": []}}
        return SparqlResponse(clean_text, empty)
    return SparqlResponse(clean_text, parsed)

search_items(entity, aliases=False, limit=7)

Find Wikidata QIDs that might be the same as an OpenSanctions entity.

Reach for this when reconciling an OS entity against Wikidata: it runs the entity's names through the wbsearchentities API and returns candidate QIDs for a downstream matcher to rank. It returns only QIDs — the caller decides which items to fetch and how to project them — so the client stays decoupled from the matcher's needs.

All name values are searched. With aliases, the search also covers aliases (every matchable name-type value), trading more API calls for better recall on transliterated or aliased names. limit is the per-name result cap (the wbsearchentities default is 7, max 50); raise it for better recall on common names.

Source code in nomenklatura/wikidata/client.py
def search_items(
    self, entity: StatementEntity, aliases: bool = False, limit: int = 7
) -> List[str]:
    """Collect candidate Wikidata QIDs for an OpenSanctions entity.

    Each of the entity's names is sent through the `wbsearchentities` API and
    the resulting QIDs are merged, for a downstream matcher to rank. Only
    QIDs are returned; fetching and projecting the items is left to the
    caller, which keeps the client decoupled from the matcher's needs.

    Args:
      entity: the entity whose names are searched.
      aliases: if false, only `name` values are searched; if true, every
        matchable name-type value is searched, at the cost of more API
        calls but with better recall on transliterated or aliased names.
      limit: per-name result cap (the `wbsearchentities` default is 7,
        max 50); raise it for better recall on common names.

    Returns:
      QIDs without duplicates, ordered by first appearance.
    """
    if aliases:
        names = entity.get_type_values(registry.name, matchable=True)
    else:
        names = entity.get("name", quiet=True)

    # A dict keeps insertion order, so it serves as an ordered set here.
    found: Dict[str, None] = {}
    for name in names:
        for qid in self._search_name(name, limit=limit):
            found.setdefault(qid, None)
    return list(found)

nomenklatura.wikidata.Item

Bases: object

A wikidata item (or entity).

Source code in nomenklatura/wikidata/model.py
class Item(object):
    """A wikidata item (or entity)."""

    def __init__(self, client: "WikidataClient", data: Dict[str, Any]) -> None:
        """Build an item from one entity object of an API response.

        Fields are popped from `data` as they are read, so the dict passed in
        is modified.
        """
        self.client = client
        self.id: str = data.pop("id")
        self.modified: Optional[str] = data.pop("modified", None)

        self.labels: Set[LangText] = LangText.from_dict(data.pop("labels", {}))
        self.aliases: Set[LangText] = LangText.from_dict(data.pop("aliases", {}))
        # Only one description is kept, chosen by language preference.
        self.description = LangText.pick(
            LangText.from_dict(data.pop("descriptions", {}))
        )

        # Flatten the property -> statements mapping into one list of claims.
        raw_claims: Dict[str, List[Dict[str, Any]]] = data.pop("claims", {})
        self.claims: List[Claim] = [
            Claim(client, statement, prop)
            for prop, statements in raw_claims.items()
            for statement in statements
        ]

        # Merged pages handling: the item takes on the ID it redirects to.
        self.redirect_id = data.pop("redirects", {}).get("to", None)
        if self.redirect_id is not None:
            self.id = self.redirect_id

        # Built after the redirect handling so links carry the final ID.
        self.sitelinks: List[SiteLink] = [
            SiteLink(self.id, link) for link in data.pop("sitelinks", {}).values()
        ]

    @property
    def label(self) -> Optional[LangText]:
        """The preferred label, falling back to a preferred alias."""
        chosen = LangText.pick(self.labels)
        return chosen if chosen is not None else LangText.pick(self.aliases)

    @property
    def sorted_labels(self) -> List[LangText]:
        """All labels, ordered by `LangText.sorted`."""
        return LangText.sorted(self.labels)

    @property
    def sorted_aliases(self) -> List[LangText]:
        """All aliases, ordered by `LangText.sorted`."""
        return LangText.sorted(self.aliases)

    @property
    def wikilinks(self) -> List[SiteLink]:
        """Site links flagged `is_wiki`, without commonswiki."""
        # Skip commonswiki since it doesn't offer much more than wikidata as a wiki website.
        return [
            link
            for link in self.sitelinks
            if link.is_wiki and link.site != "commonswiki"
        ]

    def is_instance(self, qid: str) -> bool:
        """Check whether the item has a P31 (instance of) claim for `qid`."""
        return any(
            claim.property == "P31" and claim.qid == qid for claim in self.claims
        )

    def _types(self, path: List[str]) -> Set[str]:
        """Collect the last QID on `path` plus every type reachable from it.

        QIDs already on `path` are not revisited, and expansion stops once the
        path is longer than six entries.
        """
        current = path[-1]
        found = {current}
        if len(path) <= 6:
            for parent in self.client._type_props(current):
                if parent in path:
                    continue
                found |= self._types([*path, parent])
        return found

    @property
    def types(self) -> Set[str]:
        """Get all the `instance of` and `subclass of` types for an item."""
        return self._types([self.id])

    def __repr__(self) -> str:
        return f"<Item({self.id})>"

    def __hash__(self) -> int:
        return hash(self.id)

types property

Get all the "instance of" and "subclass of" types for an item.

nomenklatura.wikidata.Claim

Bases: Snak

One property statement on a Wikidata item — e.g. P569 (date of birth) on a person — including its qualifiers, references, and rank.

Source code in nomenklatura/wikidata/model.py
class Claim(Snak):
    """One property statement on a Wikidata item — e.g. `P569` (date of birth)
    on a person — including its qualifiers, references, and rank."""

    def __init__(
        self, client: "WikidataClient", data: Dict[str, Any], prop: str
    ) -> None:
        """Build a claim from one statement object of an item's `claims`.

        Args:
          client: the client handed on to the contained snaks and references.
          data: the statement object; the fields read here are popped from it.
          prop: the property ID the statement was listed under. It is used
            as the claim's property when the main snak does not provide one.
        """
        self.id = data.pop("id")
        self.rank = data.pop("rank")
        super().__init__(client, data.pop("mainsnak"))
        self.qualifiers: Dict[str, List[Snak]] = {}
        # The loop variable must not be called `prop`: that would overwrite
        # the argument and make the fallback below use a qualifier's property.
        for qualifier_prop, snaks in data.pop("qualifiers", {}).items():
            self.qualifiers[qualifier_prop] = [Snak(client, s) for s in snaks]

        self.references = [Reference(client, r) for r in data.pop("references", [])]
        self.property = self.property or prop
        self.deprecated = bool(self.rank == "deprecated")

    def get_qualifier(self, prop: str) -> List[Snak]:
        """Return the qualifier snaks for `prop`, or an empty list."""
        return self.qualifiers.get(prop, [])

    @property
    def is_ended(self) -> bool:
        """Whether the claim has at least one `P582` qualifier."""
        return bool(self.qualifiers.get("P582"))

    def __repr__(self) -> str:
        return f"<Claim({self.qid}, {self.property}, {self.value_type})>"

    def __hash__(self) -> int:
        return hash((self.qid, self.property, self.id))

nomenklatura.wikidata.LangText

Bases: object

A text value together with the language it is expressed in.

Wikidata labels and descriptions exist in many languages. Keeping the language tag with the string lets apply() write the value to an entity property with the language attached, and lets callers pick a preferred display language.

Source code in nomenklatura/wikidata/lang.py
class LangText(object):
    """A text value together with the language it is expressed in.

    Wikidata labels and descriptions exist in many languages. Keeping the
    language tag with the string lets `apply()` write the value to an entity
    property with the language attached, and lets callers pick a preferred
    display language."""

    __slots__ = ["text", "lang", "original"]

    def __init__(
        self,
        text: Optional[str],
        lang: Optional[str] = None,
        original: Optional[str] = None,
    ) -> None:
        """Normalise the text and language.

        Args:
          text: the value; blank or `None` becomes `None`, anything else is
            passed through `remove_unsafe_chars`.
          lang: language code; `MULTI_LANG` is kept as is, other codes are
            cleaned with `registry.language`. If a code is given but cannot
            be cleaned, the text is discarded.
          original: the source value; defaults to the normalised text.
        """
        if text is None or len(text.strip()) == 0:
            text = None
        if text is not None:
            text = remove_unsafe_chars(text)
        self.text = text
        self.lang: Optional[str] = None
        if lang is not None:
            if lang == MULTI_LANG:
                self.lang = MULTI_LANG
            else:
                self.lang = registry.language.clean_text(lang)
        if lang is not None and self.lang is None:
            # Language is given, but it is not one supported by the FtM ecosystem:
            self.text = None
        self.original = original or self.text

    def apply(
        self,
        entity: StatementEntity,
        prop: str,
        clean: Optional[Callable[[str], Optional[str]]] = None,
    ) -> None:
        """Add the text to `entity` as a value of `prop`.

        Nothing is added if the text is missing, or is `None` or blank after
        the optional `clean` callable has been applied. The language is passed
        along unless it is `MULTI_LANG`, and `original` is recorded as the
        original value.
        """
        if self.text is None:
            return
        clean_text = self.text if clean is None else clean(self.text)
        if clean_text is None or clean_text.strip() == "":
            return
        lang = None if self.lang == MULTI_LANG else self.lang
        entity.add(prop, clean_text, lang=lang, original_value=self.original)

    def pack(self) -> Dict[str, Optional[str]]:
        """Serialise to a dict: `t` (text), `l` (language) and, only when it
        differs from the text, `o` (original)."""
        data = {"t": self.text, "l": self.lang}
        if self.original is not None and self.original != self.text:
            data["o"] = self.original
        return data

    @classmethod
    def parse(cls, data: Dict[str, Optional[str]]) -> "LangText":
        """Rebuild an instance from the dict produced by `pack()`."""
        return cls(data["t"], data["l"], original=data.get("o"))

    @classmethod
    def pick(cls, texts: Iterable["LangText"]) -> Optional["LangText"]:
        """Choose one text, preferring languages in `PREFERRED_WD_LANGS` order.

        If none of the texts is in a preferred language, the first one in
        iteration order is returned; an empty input gives `None`.
        """
        # Materialise once: the input is scanned once per preferred language,
        # which would silently fail on a one-shot iterator.
        candidates = list(texts)
        for lang in PREFERRED_WD_LANGS:
            for lt in candidates:
                if lt.lang == lang:
                    return lt
        if candidates:
            return candidates[0]
        return None

    @classmethod
    def sorted(cls, texts: Iterable["LangText"]) -> List["LangText"]:
        """Sort texts by language preference, then alphabetically by text.

        Texts in a `PREFERRED_WD_LANGS` language come first, in that list's
        order; all other texts (including those without a language) follow.
        """

        def sort_key(lt: LangText) -> Any:
            if lt.lang is None or lt.lang not in PREFERRED_WD_LANGS:
                # One past the last preferred position, so that every
                # preferred language ranks strictly before the rest.
                index = len(PREFERRED_WD_LANGS)
            else:
                index = PREFERRED_WD_LANGS.index(lt.lang)
            return (index, lt.text or "")

        return sorted(texts, key=sort_key)

    @classmethod
    def from_dict(cls, data: Dict[str, List[Dict[str, str]]]) -> Set["LangText"]:
        """Build texts from a mapping whose values are `{"value", "language"}`
        objects, either singly or as lists.

        Entries with a `None` value, or whose text ends up empty after
        normalisation, are skipped.
        """
        langs: Set[LangText] = set()
        for objs in data.values():
            if not isinstance(objs, list):
                objs = [objs]
            for obj in objs:
                value = obj["value"]
                if value is None:
                    continue
                lang = obj["language"]
                lt = LangText(value, lang, original=value)
                if lt.text is None:
                    continue
                langs.add(lt)
        return langs

    def __str__(self) -> str:
        if self.text is None:
            return ""
        return self.text

    def __hash__(self) -> int:
        return hash((self.text, self.lang, self.original))

    def __eq__(self, other: Any) -> bool:
        # Compare the fields themselves: equal hashes do not imply equal
        # values, and unrelated objects must never compare equal.
        if not isinstance(other, LangText):
            return NotImplemented
        return (self.text, self.lang, self.original) == (
            other.text,
            other.lang,
            other.original,
        )

    def __repr__(self) -> str:
        return f"<LangText({self.text!r}, {self.lang!r}, {self.original!r})>"