Skip to content

Resolver

The resolver records judgements about whether pairs of entities are the same, and derives a canonical identifier for each cluster of merged entities.

Deduplication in nomenklatura is non-destructive: instead of rewriting entity records, every decision — "these two are the same", "these are different", "not sure yet" — is stored as an edge between two entity IDs. The Judgement on each edge can be POSITIVE, NEGATIVE, or UNSURE; candidate pairs produced by nk xref are stored as NO_JUDGEMENT edges until a human decides them. Positive judgements are transitive: if A is B and B is C, then A, B, and C form one cluster, and the resolver assigns the whole cluster one canonical ID. Source data stays untouched, and any decision can be revisited later.

The Resolver is backed by a SQL database via SQLAlchemy — by default a SQLite file named nomenklatura.db in the working directory. Set NOMENKLATURA_DB_URL to use a different database, e.g. PostgreSQL for a shared, long-running installation.

The Linker is the read-only view of the same information: a plain mapping from entity IDs to canonical IDs, holding only the positive merges. Loading a Linker (via Resolver.get_linker()) takes much less memory than the full resolver, so prefer it when applying decisions in bulk — for example when streaming statements through nk apply-statements.

To decide entity pairs programmatically rather than through the nk dedupe interface:

from nomenklatura import Resolver, Judgement
from nomenklatura.db import make_session

# Open a database session for the resolver to read from and write to.
with make_session() as session:
    # create=True asks the resolver to create its backing table.
    resolver = Resolver(session, create=True)
    # Build the in-memory indexes from the rows already stored.
    resolver.load_into_memory()
    # Record that the two source IDs refer to the same entity; decide()
    # returns the identifier it reports for the merged pair.
    canonical = resolver.decide(
        "source-a-entity-17",
        "source-b-entity-233",
        Judgement.POSITIVE,
    )
# Exiting the session block commits the decision to the database.

Interface

nomenklatura.resolver.Resolver

Bases: Linker[SE]

Store judgements about which entities are the same and derive a canonical identifier for each cluster of merged entities.

Positive judgements are transitive: clusters are the connected components of the graph formed by the positive judgements alone (negative and unsure edges never join clusters). Decisions persist in a SQL table, so they survive re-runs and can be shared between processes. For read-only bulk application of the decisions, use the leaner Linker returned by get_linker().

Source code in nomenklatura/resolver/resolver.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
class Resolver(Linker[SE]):
    """Store judgements about which entities are the same and derive a canonical
    identifier for each cluster of merged entities.

    Positive judgements are transitive: clusters are the connected components of
    the judgement graph. Decisions persist in a SQL table, so they survive
    re-runs and can be shared between processes. For read-only bulk application
    of the decisions, use the leaner `Linker` returned by `get_linker()`.
    """

    UNDECIDED = (Judgement.NO_JUDGEMENT, Judgement.UNSURE)

    def __init__(
        self,
        session: Session,
        create: bool = False,
        table_name: str = "resolver",
    ) -> None:
        """Bind the resolver to a session and describe its backing table.

        Args:
            session: Database session used for every read and write.
            create: When true, ask the session to create the table.
            table_name: Name of the table; it also prefixes the index names.
        """
        self._session = session
        # No watermark yet, so the first incremental refresh does a full load.
        self._max_ts: Optional[str] = None
        # Derived in-memory indexes: positive merges and blocking judgements.
        # NO_JUDGEMENT suggestions are never indexed and stay in the table.
        self._linker: Linker[SE] = Linker({})
        self._blockers: Dict[Tuple[str, str], Judgement] = {}

        # At most one live (not soft-deleted) row per pair: the unique index is
        # restricted to live rows on the two dialects that are handled here.
        pair_index_options: Dict[str, Any] = {"unique": True}
        if session.is_sqlite:
            pair_index_options["sqlite_where"] = text("deleted_at IS NULL")
        if session.is_postgres:
            pair_index_options["postgresql_where"] = text("deleted_at IS NULL")
        pair_index = Index(
            f"{table_name}_source_target_uniq",
            text("source"),
            text("target"),
            **pair_index_options,
        )
        # Index over (judgement, score), matching the suggestion scan that
        # filters on NO_JUDGEMENT and orders by score.
        score_index = Index(
            f"{table_name}_judgement_score",
            text("judgement"),
            text("score"),
        )
        columns = [
            Column("id", Integer(), primary_key=True),
            Column("target", Unicode(512), index=True),
            Column("source", Unicode(512), index=True),
            Column("judgement", Unicode(14), nullable=False),
            Column("score", Float, nullable=True),
            Column("user", Unicode(512), nullable=False),
            Column("created_at", Unicode(28)),
            Column("deleted_at", Unicode(28), nullable=True),
        ]
        self._table = Table(
            table_name,
            MetaData(),
            *columns,
            pair_index,
            score_index,
        )
        if create:
            session.create(self._table)

    def _index_row(self, target: str, source: str, judgement: Judgement) -> None:
        """Record one live edge in the matching in-memory index.

        Positive edges are added to the linker; negative and unsure edges are
        stored as blockers keyed by ``(target, source)``. Any other judgement
        is ignored.
        """
        if judgement == Judgement.POSITIVE:
            self._linker.add(source, target)
            return
        if judgement == Judgement.NEGATIVE or judgement == Judgement.UNSURE:
            self._blockers[(target, source)] = judgement

    def _load_all(self) -> None:
        """Discard both indexes and rebuild them from every live row.

        Also resets ``_max_ts`` to the largest ``created_at`` seen among the
        live rows, or to ``None`` when no live row carries one.
        """
        table = self._table
        self._linker = Linker({})
        self._blockers = {}
        newest: Optional[str] = None
        query = select(
            table.c.target,
            table.c.source,
            table.c.judgement,
            table.c.created_at,
        ).where(table.c.deleted_at.is_(None))
        result = self._session.execute(query)
        # Read in batches so the whole table is never held as one row list.
        while True:
            rows = result.fetchmany(10000)
            if not rows:
                break
            for target, source, raw_judgement, created_at in rows:
                if created_at is not None and (
                    newest is None or created_at > newest
                ):
                    newest = created_at
                self._index_row(target, source, Judgement(raw_judgement))
        result.close()
        self._max_ts = newest

    def _update_from_db(self) -> None:
        """Apply this session's recent writes to the indexes.

        Rows created or soft-deleted at or after the ``_max_ts`` watermark are
        re-read. Removing a positive edge may split a cluster and requires a
        full rebuild. Use ``load_into_memory`` to pick up writes from other
        sessions.
        """
        if self._max_ts is None:
            self._load_all()
            return
        stmt = select(
            self._table.c.target,
            self._table.c.source,
            self._table.c.judgement,
            self._table.c.created_at,
            self._table.c.deleted_at,
        ).where(
            or_(
                self._table.c.created_at >= self._max_ts,
                self._table.c.deleted_at >= self._max_ts,
            )
        )
        cursor = self._session.execute(stmt)
        max_ts = self._max_ts
        needs_rebuild = False
        # Live rows are applied only after every deletion has been processed.
        # Superseding a judgement leaves a soft-deleted row and a live row for
        # the same pair, and this query has no ORDER BY: if the live row came
        # back first, the stale deletion would pop the freshly indexed blocker.
        live_rows: List[Tuple[str, str, Judgement]] = []
        try:
            while batch := cursor.fetchmany(10000):
                for row in batch:
                    target, source, judgement_, created_at, deleted_at = row
                    judgement = Judgement(judgement_)
                    if created_at is not None and created_at > max_ts:
                        max_ts = created_at
                    if deleted_at is None:
                        live_rows.append((target, source, judgement))
                        continue
                    if deleted_at > max_ts:
                        max_ts = deleted_at
                    if judgement == Judgement.POSITIVE:
                        # A removed merge cannot be undone incrementally.
                        needs_rebuild = True
                    else:
                        self._blockers.pop((target, source), None)
        finally:
            cursor.close()
        if needs_rebuild:
            # The rebuild re-reads every live row and resets the watermark.
            self._load_all()
            return
        for target, source, judgement in live_rows:
            self._index_row(target, source, judgement)
        self._max_ts = max_ts

    def load_into_memory(self) -> None:
        """Rebuild the in-memory indexes from the database.

        This is the way to see writes committed by other sessions. It always
        does a full rebuild, because the order in which sessions commit need
        not match the order in which their rows were written.
        """
        self._load_all()

    def get_linker(self) -> Linker[SE]:
        """Return a linker object that can be used to resolve entities.
        This is less memory-consuming than the full resolver object.

        The linker is built from the live positive edges only, read from the
        database in ascending ``created_at`` order.
        """
        linker: Linker[SE] = Linker({})
        stmt = self._table.select()
        stmt = stmt.where(self._table.c.judgement == Judgement.POSITIVE.value)
        stmt = stmt.where(self._table.c.deleted_at.is_(None))
        # ``order_by`` returns a new statement; without re-assigning it the
        # ordering is silently dropped.
        stmt = stmt.order_by(self._table.c.created_at.asc())
        cursor = self._session.execute(stmt)
        try:
            while batch := cursor.fetchmany(20000):
                for row in batch:
                    linker.add(row.source, row.target)
        finally:
            # Release the cursor even if building the linker fails part-way.
            cursor.close()
        return linker

    def get_edge(self, left_id: StrIdent, right_id: StrIdent) -> Optional[Edge]:
        """Fetch the live row stored for exactly this pair, if there is one.

        The two identifiers are normalised with ``Identifier.pair`` first, so
        argument order does not matter. Soft-deleted rows are ignored.
        """
        target, source = Identifier.pair(left_id, right_id)
        columns = self._table.c
        query = (
            self._table.select()
            .where(columns.target == target.id)
            .where(columns.source == source.id)
            .where(columns.deleted_at.is_(None))
        )
        found = self._session.execute(query).first()
        return None if found is None else Edge.from_dict(found._mapping)

    def connected(self, node: Identifier) -> Set[Identifier]:
        """Return what the in-memory linker reports as connected to ``node``."""
        cluster = self._linker.connected(node)
        return cluster

    def get_canonical(self, entity_id: str) -> str:
        """Return the canonical identifier for the given entity ID.

        The largest identifier in the entity's cluster is used when it is a
        canonical one; otherwise the entity's own ID is returned.
        """
        node = Identifier.get(entity_id)
        top = max(self.connected(node))
        return top.id if top.canonical else node.id

    def canonicals(self) -> Generator[Identifier, None, None]:
        """Yield the canonical cluster identifiers known to the linker."""
        identifiers = self._linker.canonicals()
        return identifiers

    def get_referents(self, canonical_id: str, canonicals: bool = True) -> Set[str]:
        """Return the IDs of the other members of the given ID's cluster.

        The given identifier itself is always left out. Pass
        ``canonicals=False`` to also leave out canonical identifiers.
        """
        node = Identifier.get(canonical_id)
        return {
            member.id
            for member in self.connected(node)
            if (canonicals or not member.canonical) and not member == node
        }

    def get_resolved_edge(
        self, left_id: StrIdent, right_id: StrIdent
    ) -> Optional[Edge]:
        """Return the first stored edge found between the two clusters.

        Every member of the left cluster is tried against every member of the
        right cluster; ``None`` is returned when no pair has a live edge.
        """
        left, right = Identifier.pair(left_id, right_id)
        left_cluster = self.connected(left)
        right_cluster = self.connected(right)
        for mine in left_cluster:
            for theirs in right_cluster:
                if mine == theirs:
                    # An identifier has no edge to itself.
                    continue
                found = self.get_edge(mine, theirs)
                if found is not None:
                    return found
        return None

    def get_judgement(self, entity_id: StrIdent, other_id: StrIdent) -> Judgement:
        """Return the decision that applies to two entities, clusters included.

        Identical or already-merged identifiers are POSITIVE, two distinct
        unmerged QIDs are NEGATIVE, and otherwise the first blocker found
        between the two clusters decides. NO_JUDGEMENT means nothing applies.
        """
        entity, other = str(entity_id), str(other_id)
        if entity == other:
            return Judgement.POSITIVE
        entity_cluster = self._linker.connected_ids(entity)
        if other in entity_cluster:
            return Judgement.POSITIVE
        # The QID rule comes after the merge check on purpose: an edge may
        # declare one QID canonical for another. Rare, but it must win.
        if is_qid(entity) and is_qid(other):
            return Judgement.NEGATIVE

        # The clusters are distinct here, so no positive edge can link them;
        # only negative/unsure blockers are left to look for. Blockers are
        # keyed by an ordered pair, hence the lookup in both directions.
        other_cluster = self._linker.connected_ids(other)
        blockers = self._blockers
        for mine in entity_cluster:
            for theirs in other_cluster:
                found = blockers.get((mine, theirs))
                if found is None:
                    found = blockers.get((theirs, mine))
                if found is not None:
                    return found

        return Judgement.NO_JUDGEMENT

    def check_candidate(self, left: StrIdent, right: StrIdent) -> bool:
        """Tell whether the pair is still open, i.e. no judgement applies to it."""
        return self.get_judgement(left, right) == Judgement.NO_JUDGEMENT

    def get_judgements(
        self, limit: Optional[int] = None
    ) -> Generator[Edge, None, None]:
        """Get most recently updated edges other than NO_JUDGEMENT.

        Only live rows are returned, newest ``created_at`` first.

        Args:
            limit: Maximum number of edges to yield; ``None`` means no limit.
        """
        stmt = self._table.select()
        stmt = stmt.where(self._table.c.judgement != Judgement.NO_JUDGEMENT.value)
        stmt = stmt.where(self._table.c.deleted_at.is_(None))
        stmt = stmt.order_by(self._table.c.created_at.desc())
        if limit is not None:
            stmt = stmt.limit(limit)
        cursor = self._session.execute(stmt)
        try:
            while batch := cursor.fetchmany(25):
                for row in batch:
                    yield Edge.from_dict(row._mapping)
        finally:
            # Also runs when the consumer stops early and the generator is
            # closed, so the cursor is not left open.
            cursor.close()

    def _get_suggested(self) -> List[Edge]:
        """Return every live NO_JUDGEMENT edge, highest score first.

        Rows without a score sort last.
        """
        columns = self._table.c
        query = (
            self._table.select()
            .where(columns.judgement == Judgement.NO_JUDGEMENT.value)
            .where(columns.deleted_at.is_(None))
            .order_by(columns.score.desc().nulls_last())
        )
        result = self._session.execute(query)
        suggested: List[Edge] = []
        for row in result:
            suggested.append(Edge.from_dict(row._mapping))
        result.close()
        return suggested

    def get_candidates(
        self, limit: Optional[int] = None
    ) -> Generator[Tuple[str, str, Optional[float]], None, None]:
        """Yield ``(target_id, source_id, score)`` for suggestions still open.

        Suggestions are visited in the order ``_get_suggested`` returns them;
        pairs for which a judgement already applies are skipped.
        """
        emitted = 0
        for edge in self._get_suggested():
            if self.check_candidate(edge.source, edge.target):
                yield edge.target.id, edge.source.id, edge.score
                emitted += 1
                if limit is not None and emitted >= limit:
                    return

    def suggest(
        self,
        left_id: StrIdent,
        right_id: StrIdent,
        score: float,
        user: Optional[str] = None,
    ) -> Identifier:
        """Store a NO_JUDGEMENT edge asking a user to decide on the pair.

        If a live row already exists for the pair, only its score is replaced,
        and only when that row is itself still a NO_JUDGEMENT suggestion.

        Returns:
            The target identifier of the suggested edge.
        """
        suggestion = Edge(left_id, right_id, judgement=Judgement.NO_JUDGEMENT)
        suggestion.created_at = timestamp()
        suggestion.user = user or getpass.getuser()
        suggestion.score = score

        table = self._table
        insert_stmt = self._session.insert(table).values(suggestion.to_dict())
        upsert = insert_stmt.on_conflict_do_update(
            index_elements=[table.c.source, table.c.target],
            index_where=table.c.deleted_at.is_(None),
            set_={"score": insert_stmt.excluded.score},
            where=table.c.judgement == Judgement.NO_JUDGEMENT.value,
        )
        self._session.execute(upsert)
        return suggestion.target

    def decide(
        self,
        left_id: StrIdent,
        right_id: StrIdent,
        judgement: Judgement,
        user: Optional[str] = None,
        score: Optional[float] = None,
    ) -> Identifier:
        """Store a judgement for the pair and refresh the in-memory indexes.

        Returns:
            The identifier reported by ``_decide`` for the written edge.
        """
        outcome = self._decide(left_id, right_id, judgement, user=user, score=score)
        if judgement == Judgement.NO_JUDGEMENT:
            # Suggestions are not held in memory, so nothing needs refreshing.
            return outcome
        self._update_from_db()
        return outcome

    def _decide(
        self,
        left_id: StrIdent,
        right_id: StrIdent,
        judgement: Judgement,
        user: Optional[str] = None,
        score: Optional[float] = None,
    ) -> Identifier:
        """Write a judgement without refreshing the indexes.

        Used recursively so ``decide`` can refresh once after canonicalisation.

        Args:
            left_id: One identifier of the pair.
            right_id: The other identifier of the pair.
            judgement: The judgement to store.
            user: Author of the decision; defaults to the current OS user.
            score: Score to store on the edge. ``None`` keeps the score of an
                existing edge.

        Returns:
            The newly minted canonical identifier when a positive match had to
            be canonicalised, otherwise the target of the written edge.
        """
        edge = self.get_edge(left_id, right_id)
        if edge is None:
            edge = Edge(left_id, right_id, judgement=judgement)

        # Canonicalise positive matches, i.e. make both identifiers refer to a
        # canonical identifier, instead of making a direct link.
        if judgement == Judgement.POSITIVE:
            connected = set(self.connected(edge.target))
            connected.update(self.connected(edge.source))
            target = max(connected)
            if not target.canonical:
                canonical = Identifier.make()
                self._remove_edge(edge)
                self._decide(edge.source, canonical, judgement=judgement, user=user)
                self._decide(edge.target, canonical, judgement=judgement, user=user)
                return canonical

        edge.judgement = judgement
        edge.created_at = timestamp()
        edge.user = user or getpass.getuser()
        # Test against None rather than truthiness: ``score or edge.score``
        # would silently discard an explicit score of 0.0.
        if score is not None:
            edge.score = score
        self._register(edge)
        return edge.target

    def _register(self, edge: Edge) -> None:
        """Insert the edge as the live row for its pair.

        Any row currently live for the same pair is soft-deleted first, stamped
        with the new edge's ``created_at``. Scores are kept only on
        NO_JUDGEMENT edges.
        """
        if edge.judgement != Judgement.NO_JUDGEMENT:
            edge.score = None

        table = self._table
        supersede = (
            update(table)
            .values({"deleted_at": edge.created_at})
            .where(table.c.source == edge.source.id)
            .where(table.c.target == edge.target.id)
            .where(table.c.deleted_at.is_(None))
        )
        self._session.execute(supersede)

        self._session.execute(insert(table).values(edge.to_dict()))

    def _remove_edge(self, edge: Edge) -> None:
        """Mark the live row of the edge's pair as deleted, if one exists.

        The deletion timestamp is also written to ``edge.deleted_at``.
        """
        edge.deleted_at = timestamp()
        table = self._table
        soft_delete = (
            update(table)
            .values({"deleted_at": edge.deleted_at})
            .where(table.c.target == edge.target.id)
            .where(table.c.source == edge.source.id)
            .where(table.c.deleted_at.is_(None))
        )
        self._session.execute(soft_delete)

    def _remove_node(self, node: Identifier) -> None:
        """Mark every live edge that has the node at either end as deleted."""
        stamp = timestamp()
        table = self._table
        touches_node = or_(
            table.c.source == node.id,
            table.c.target == node.id,
        )
        soft_delete = (
            update(table)
            .values({"deleted_at": stamp})
            .where(touches_node)
            .where(table.c.deleted_at.is_(None))
        )
        self._session.execute(soft_delete)

    def remove(self, node_id: StrIdent) -> None:
        """Soft-delete all edges of the given node and refresh the indexes."""
        self._remove_node(Identifier.get(node_id))
        self._update_from_db()

    def rename_node(self, old_id: StrIdent, new_id: StrIdent) -> int:
        """Move every live edge of one identifier over to a replacement.

        Intended for identifiers merged upstream, e.g. a Wikidata QID that now
        redirects to another one, so that resolver state no longer mentions
        the old identifier.

        Returns:
            The number of live edges that touched the old identifier; ``0``
            when both identifiers are the same.
        """
        old, new = Identifier.get(old_id), Identifier.get(new_id)
        if old == new:
            return 0

        stamp = timestamp()
        table = self._table
        touches_old = or_(
            table.c.source == old.id,
            table.c.target == old.id,
        )
        rewritten = 0
        for edge in self._live_edges(touches_old):
            left = new if edge.target == old else edge.target
            right = new if edge.source == old else edge.source
            self._remove_edge(edge)
            rewritten += 1
            if left == right:
                # The edge linked old and new; after renaming it would be a
                # self-loop, so it is only removed.
                continue
            replacement = Edge(
                left_id=left,
                right_id=right,
                judgement=edge.judgement,
                score=edge.score,
                user=edge.user,
                created_at=stamp,
            )
            self._register(replacement)
        self._update_from_db()
        return rewritten

    def explode(self, node_id: StrIdent) -> Set[str]:
        """Soft-delete every edge of every member of the node's cluster.

        This is the drastic way to force a cluster to be decided again after a
        mistake has been found.

        Returns:
            The string IDs of all cluster members that were processed.
        """
        start = Identifier.get(node_id)
        affected: Set[str] = set()
        for member in self.connected(start):
            self._remove_node(member)
            affected.add(str(member))
        self._update_from_db()
        return affected

    def prune(
        self,
        cleanup_after: timedelta = timedelta(days=6 * 30),
        user: Optional[str] = None,
    ) -> None:
        """Delete suggestions and simplify the merge graph.

        Runs the three pruning passes in order and refreshes the in-memory
        indexes once at the end.

        Args:
            cleanup_after: Age a merge must exceed before the
                intermediate-merge pass considers it.
            user: Restrict the suggestion clean-up to this user's rows.
        """
        self._prune_suggestions(user=user)
        self._prune_noncanonical_targets()
        cutoff = utc_now() - cleanup_after
        # Timestamps are stored as strings of at most 28 characters.
        self._prune_intermediate_merges(cutoff.isoformat()[:28])
        self._update_from_db()

    def _live_edges(self, *conditions: Any) -> List[Edge]:
        """Return, as a list, the live edges that satisfy all given conditions."""
        query = self._table.select().where(self._table.c.deleted_at.is_(None))
        for extra in conditions:
            query = query.where(extra)
        result = self._session.execute(query)
        found: List[Edge] = []
        for row in result:
            found.append(Edge.from_dict(row._mapping))
        result.close()
        return found

    def _prune_suggestions(self, user: Optional[str] = None) -> None:
        """Delete NO_JUDGEMENT rows outright, for one user or for everyone."""
        table = self._table
        purge = delete(table).where(
            table.c.judgement == Judgement.NO_JUDGEMENT.value
        )
        if user is not None:
            purge = purge.where(table.c.user == user)
        self._session.execute(purge)

    def _prune_noncanonical_targets(self) -> None:
        """Swap positive edges between raw identifiers for canonical links.

        Both ends of the removed edge are linked to the cluster's canonical
        identifier, so the cluster stays whole even when one end already
        resolved to it.
        """
        stamp = timestamp()
        is_positive = self._table.c.judgement == Judgement.POSITIVE.value
        for edge in self._live_edges(is_positive):
            if edge.target.canonical:
                # Already points at a canonical identifier.
                continue
            winner = Identifier.get(self.get_canonical(edge.target.id))
            if not winner.canonical:
                log.warning("Invalid target: %s -> %s" % (edge.source, edge.target))
                continue
            log.info(
                "Rewriting edge: %s = %s -> %s" % (edge.target, edge.source, winner)
            )
            self._remove_edge(edge)
            for member in (edge.source, edge.target):
                link = Edge(
                    left_id=member,
                    right_id=winner,
                    judgement=Judgement.POSITIVE,
                    user=edge.user,
                    created_at=stamp,
                )
                self._register(link)

    def _prune_intermediate_merges(self, cutoff_ts: str) -> None:
        """Collapse old canonical-to-canonical merges onto the final canonical.

        For each live positive edge created before ``cutoff_ts`` whose two ends
        are both canonical identifiers, every other live edge touching the
        edge's source is re-pointed at the cluster's canonical identifier, and
        then the merge edge itself is soft-deleted.

        Args:
            cutoff_ts: Timestamp string; only edges with an earlier
                ``created_at`` are considered.
        """
        # One timestamp for every replacement edge written in this pass.
        now = timestamp()
        positive = self._table.c.judgement == Judgement.POSITIVE.value
        old = self._table.c.created_at < cutoff_ts
        # Keys of edges soft-deleted during this pass. The edge lists are
        # materialised up front, so they can still contain these edges.
        removed: Set[Pair] = set()
        for edge in self._live_edges(positive, old):
            if edge.key in removed:
                continue
            if not (edge.source.canonical and edge.target.canonical):
                continue
            # Resolved through the in-memory index, which is not refreshed
            # while this pass is running.
            canonical = Identifier.get(self.get_canonical(edge.source.id))
            log.info(
                "Removing intermediate merge: %s -> %s (%s)"
                % (edge.source, edge.target, canonical)
            )
            touching = or_(
                self._table.c.source == edge.source.id,
                self._table.c.target == edge.source.id,
            )
            for linked_edge in self._live_edges(touching):
                # Skip the merge edge itself (removed after this loop) and
                # anything already handled.
                if linked_edge.key == edge.key or linked_edge.key in removed:
                    continue
                if linked_edge.other(edge.source) == canonical:
                    # Re-pointing would link the canonical to itself, so the
                    # edge is dropped without a replacement.
                    log.warning(" -> Skipping self-referential edge: %s" % linked_edge)
                else:
                    log.info(
                        " -> Rewriting edge: %s <-> %s -> %s (%s)"
                        % (
                            edge.source,
                            linked_edge.other(edge.source),
                            canonical,
                            linked_edge.judgement,
                        )
                    )
                    # Same judgement and user, but attached to the canonical
                    # instead of the intermediate identifier.
                    nu_edge = Edge(
                        left_id=linked_edge.other(edge.source),
                        right_id=canonical,
                        judgement=linked_edge.judgement,
                        user=linked_edge.user,
                        created_at=now,
                    )
                    self._register(nu_edge)
                self._remove_edge(linked_edge)
                removed.add(linked_edge.key)
            self._remove_edge(edge)
            removed.add(edge.key)

    def apply_statement(self, stmt: Statement) -> Statement:
        """Set the statement's canonical ID and canonicalise entity-typed values.

        The given statement always gets ``canonical_id`` set when it has an
        entity ID. When its value is an entity reference that resolves to a
        different ID, the original value is remembered (if not already) and a
        clone carrying the canonical value is returned.
        """
        entity_id = stmt.entity_id
        if entity_id is not None:
            stmt.canonical_id = self.get_canonical(entity_id)
        if stmt.prop_type != registry.entity.name:
            return stmt
        canon_value = self.get_canonical(stmt.value)
        if canon_value == stmt.value:
            return stmt
        if stmt.original_value is None:
            stmt.original_value = stmt.value
        return stmt.clone(value=canon_value)

    def dump(self, path: PathLike) -> None:
        """Write every edge other than NO_JUDGEMENT suggestions to a text file.

        Edges are written in ascending ``created_at`` order, one ``to_line()``
        record per edge.

        Args:
            path: Destination file; it is overwritten.
        """
        stmt = self._table.select()
        stmt = stmt.where(self._table.c.judgement != Judgement.NO_JUDGEMENT.value)
        # NOTE(review): soft-deleted rows are not filtered out here; confirm
        # that exporting the deletion history is intended.
        # ``order_by`` returns a new statement; without re-assigning it the
        # ordering is silently dropped.
        stmt = stmt.order_by(self._table.c.created_at.asc())
        with open(path, "w") as fh:
            cursor = self._session.execute(stmt)
            for row in cursor.yield_per(20000):
                edge = Edge.from_dict(row._mapping)
                fh.write(edge.to_line())

    def load(self, path: PathLike) -> None:
        """Load edges from a file into the database and rebuild the indexes.

        Each line is parsed with ``Edge.from_line`` and written as the live row
        for its pair, superseding any existing live row.

        Args:
            path: File to read, one edge per line.
        """
        edge_count = 0
        with open(path, "r") as fh:
            for line in fh:
                edge = Edge.from_line(line)
                self._register(edge)
                edge_count += 1
                if edge_count % 10000 == 0:
                    log.info("Loaded %s edges." % edge_count)
        log.info("Done. Loaded %s edges." % edge_count)
        # Rebuild in full instead of refreshing incrementally. The incremental
        # refresh only re-reads rows stamped at or after ``_max_ts``, while
        # loaded edges keep whatever timestamps the file carries (presumably
        # historical ones; confirm against ``Edge.from_line``) and could be
        # missed.
        self._load_all()

    def __repr__(self) -> str:
        """Describe the resolver by driver, host, database and table name."""
        url = self._session.engine.url
        host = url.host or ""
        location = f"{url.drivername}://{host}/{url.database}/{self._table.name}"
        return f"<Resolver({location})>"

apply_statement(stmt)

Canonicalise Statement Entity IDs and ID values

Source code in nomenklatura/resolver/resolver.py
def apply_statement(self, stmt: Statement) -> Statement:
    """Set the statement's canonical ID and canonicalise entity-typed values.

    The given statement always gets ``canonical_id`` set when it has an
    entity ID. When its value is an entity reference that resolves to a
    different ID, the original value is remembered (if not already) and a
    clone carrying the canonical value is returned.
    """
    entity_id = stmt.entity_id
    if entity_id is not None:
        stmt.canonical_id = self.get_canonical(entity_id)
    if stmt.prop_type != registry.entity.name:
        return stmt
    canon_value = self.get_canonical(stmt.value)
    if canon_value == stmt.value:
        return stmt
    if stmt.original_value is None:
        stmt.original_value = stmt.value
    return stmt.clone(value=canon_value)

canonicals()

Return all the canonical cluster identifiers.

Source code in nomenklatura/resolver/resolver.py
def canonicals(self) -> Generator[Identifier, None, None]:
    """Yield the canonical cluster identifiers known to the linker."""
    identifiers = self._linker.canonicals()
    return identifiers

check_candidate(left, right)

Check if the two IDs could be merged, i.e. if there's no existing judgement.

Source code in nomenklatura/resolver/resolver.py
def check_candidate(self, left: StrIdent, right: StrIdent) -> bool:
    """Tell whether the pair is still open, i.e. no judgement applies to it."""
    return self.get_judgement(left, right) == Judgement.NO_JUDGEMENT

dump(path)

Write every resolver edge other than NO_JUDGEMENT suggestions to a plain-text file, one serialised record per edge.

Source code in nomenklatura/resolver/resolver.py
def dump(self, path: PathLike) -> None:
    """Write every edge other than NO_JUDGEMENT suggestions to a text file.

    Edges are written in ascending ``created_at`` order, one ``to_line()``
    record per edge.

    Args:
        path: Destination file; it is overwritten.
    """
    stmt = self._table.select()
    stmt = stmt.where(self._table.c.judgement != Judgement.NO_JUDGEMENT.value)
    # NOTE(review): soft-deleted rows are not filtered out here; confirm
    # that exporting the deletion history is intended.
    # ``order_by`` returns a new statement; without re-assigning it the
    # ordering is silently dropped.
    stmt = stmt.order_by(self._table.c.created_at.asc())
    with open(path, "w") as fh:
        cursor = self._session.execute(stmt)
        for row in cursor.yield_per(20000):
            edge = Edge.from_dict(row._mapping)
            fh.write(edge.to_line())

explode(node_id)

Dissolve all edges linked to the cluster to which the node belongs. This is the hard way to make sure we re-do context once we realise there's been a mistake.

Source code in nomenklatura/resolver/resolver.py
def explode(self, node_id: StrIdent) -> Set[str]:
    """Soft-delete every edge of every member of the node's cluster.

    This is the drastic way to force a cluster to be decided again after a
    mistake has been found.

    Returns:
        The string IDs of all cluster members that were processed.
    """
    start = Identifier.get(node_id)
    affected: Set[str] = set()
    for member in self.connected(start):
        self._remove_node(member)
        affected.add(str(member))
    self._update_from_db()
    return affected

get_canonical(entity_id)

Return the canonical identifier for the given entity ID.

Source code in nomenklatura/resolver/resolver.py
def get_canonical(self, entity_id: str) -> str:
    """Resolve an entity ID to the canonical ID of its cluster.

    The highest-ordered member of the cluster is used when it is flagged as
    canonical; otherwise the ID passed in is returned as-is.
    """
    node = Identifier.get(entity_id)
    best = max(self.connected(node))
    return best.id if best.canonical else node.id

get_judgement(entity_id, other_id)

Get the existing decision between two entities with dedupe factored in.

Source code in nomenklatura/resolver/resolver.py
def get_judgement(self, entity_id: StrIdent, other_id: StrIdent) -> Judgement:
    """Look up the effective decision for a pair, taking merges into account.

    Returns POSITIVE when both IDs are equal or already in the same cluster,
    NEGATIVE when both are QIDs that are not connected, the stored blocking
    judgement when one spans the two clusters, and NO_JUDGEMENT otherwise.
    """
    left = str(entity_id)
    right = str(other_id)
    if left == right:
        return Judgement.POSITIVE

    left_cluster = self._linker.connected_ids(left)
    if right in left_cluster:
        return Judgement.POSITIVE

    # The QID rule comes after the cluster check on purpose: an explicit edge
    # can declare one QID canonical for another. Rare, but it must win.
    if is_qid(left) and is_qid(right):
        return Judgement.NEGATIVE

    # The clusters are distinct at this point, so no positive edge can link
    # them. Only blocking (negative/unsure) edges are left to look for, and
    # they may be keyed in either direction.
    right_cluster = self._linker.connected_ids(right)
    blockers = self._blockers
    for member in left_cluster:
        for candidate in right_cluster:
            found = blockers.get((member, candidate))
            if found is None:
                found = blockers.get((candidate, member))
            if found is not None:
                return found

    return Judgement.NO_JUDGEMENT

get_judgements(limit=None)

Get most recently updated edges other than NO_JUDGEMENT.

Source code in nomenklatura/resolver/resolver.py
def get_judgements(
    self, limit: Optional[int] = None
) -> Generator[Edge, None, None]:
    """Get most recently updated edges other than NO_JUDGEMENT.

    Only rows with `deleted_at` unset are considered; they are yielded newest
    first by `created_at`.

    Args:
        limit: Maximum number of edges to yield, or None for no limit.

    Yields:
        One `Edge` per matching row.
    """
    stmt = self._table.select()
    stmt = stmt.where(self._table.c.judgement != Judgement.NO_JUDGEMENT.value)
    stmt = stmt.where(self._table.c.deleted_at.is_(None))
    stmt = stmt.order_by(self._table.c.created_at.desc())
    if limit is not None:
        stmt = stmt.limit(limit)
    cursor = self._session.execute(stmt)
    # This is a generator: the consumer may stop iterating early, or a row may
    # fail to parse. The finally clause releases the cursor in those cases as
    # well, not only after the result set is exhausted.
    try:
        while batch := cursor.fetchmany(25):
            for row in batch:
                yield Edge.from_dict(row._mapping)
    finally:
        cursor.close()

get_linker()

Return a linker object that can be used to resolve entities. This is less memory-consuming than the full resolver object.

Source code in nomenklatura/resolver/resolver.py
def get_linker(self) -> Linker[SE]:
    """Return a linker object that can be used to resolve entities.
    This is less memory-consuming than the full resolver object.

    The linker is built from every POSITIVE edge with `deleted_at` unset,
    read in `created_at` order.
    """
    linker: Linker[SE] = Linker({})
    stmt = self._table.select()
    stmt = stmt.where(self._table.c.judgement == Judgement.POSITIVE.value)
    stmt = stmt.where(self._table.c.deleted_at.is_(None))
    # order_by() returns a new statement rather than modifying this one, so
    # the result must be re-assigned for the ordering to take effect.
    stmt = stmt.order_by(self._table.c.created_at.asc())
    cursor = self._session.execute(stmt)
    # Close the cursor even if building the linker fails part-way through.
    try:
        while batch := cursor.fetchmany(20000):
            for row in batch:
                linker.add(row.source, row.target)
    finally:
        cursor.close()
    return linker

get_referents(canonical_id, canonicals=True)

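Get all the other entity identifiers in the cluster of a given canonical identifier. The identifier itself is never included; canonical identifiers in the cluster are included by default and are left out only when canonicals=False, in which case only non-canonical referents are returned.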

Source code in nomenklatura/resolver/resolver.py
def get_referents(self, canonical_id: str, canonicals: bool = True) -> Set[str]:
    """Collect the IDs of the other members of `canonical_id`'s cluster.

    The identifier itself is never part of the result. Members flagged as
    canonical are dropped as well when `canonicals` is False.
    """
    node = Identifier.get(canonical_id)
    return {
        member.id
        for member in self.connected(node)
        if (canonicals or not member.canonical) and not member == node
    }

get_resolved_edge(left_id, right_id)

Return some edge that connects the two entities, if it exists.

Source code in nomenklatura/resolver/resolver.py
def get_resolved_edge(
    self, left_id: StrIdent, right_id: StrIdent
) -> Optional[Edge]:
    """Find any one edge that links the clusters of the two entities.

    Every member of the left cluster is tried against every member of the
    right cluster and the first edge returned by `get_edge` wins. Returns
    None when no such edge exists.
    """
    first, second = Identifier.pair(left_id, right_id)
    first_cluster = self.connected(first)
    second_cluster = self.connected(second)
    for ours in first_cluster:
        for theirs in second_cluster:
            # An identifier cannot have an edge to itself.
            if not ours == theirs:
                found = self.get_edge(ours, theirs)
                if found is not None:
                    return found
    return None

load(path)

Load edges directly into the database from a file with one JSON-encoded edge per line (the format written by dump).

Source code in nomenklatura/resolver/resolver.py
def load(self, path: PathLike) -> None:
    """Read edges from a line-based file and register each in the database.

    Each line is parsed with `Edge.from_line`. Progress is logged every
    10000 edges, and the in-memory state is refreshed once at the end.
    """
    loaded = 0
    with open(path, "r") as fh:
        for loaded, line in enumerate(fh, start=1):
            self._register(Edge.from_line(line))
            if loaded % 10000 == 0:
                log.info("Loaded %s edges." % loaded)
    log.info("Done. Loaded %s edges." % loaded)
    self._update_from_db()

load_into_memory()

Load the in-memory indexes from the database.

Call this to pick up database writes made by another session. A full rebuild is required because commit order can differ from write order.

Source code in nomenklatura/resolver/resolver.py
def load_into_memory(self) -> None:
    """Rebuild the in-memory indexes from what is stored in the database.

    Use this to see writes committed by another session. The indexes are
    rebuilt in full rather than patched, since the order in which commits
    land may not match the order in which the writes were made.
    """
    rebuild = self._load_all
    rebuild()

prune(cleanup_after=timedelta(days=(6 * 30)), user=None)

Drop suggestions and simplify the merge graph.

Rewrites preserve cluster membership, allowing one refresh after all pruning passes complete.

Source code in nomenklatura/resolver/resolver.py
def prune(
    self,
    cleanup_after: timedelta = timedelta(days=6 * 30),
    user: Optional[str] = None,
) -> None:
    """Remove suggestions and compact the merge graph.

    Three passes run in order: suggestions (passing `user` through),
    non-canonical targets, then intermediate merges using a cutoff of
    "now minus `cleanup_after`". The in-memory state is refreshed once at the
    end; as documented for this method, the rewrites keep cluster membership
    intact, so a single refresh is enough.
    """
    self._prune_suggestions(user=user)
    self._prune_noncanonical_targets()
    cutoff = utc_now() - cleanup_after
    # The cutoff is handed on as an ISO timestamp string capped at 28 chars.
    self._prune_intermediate_merges(cutoff.isoformat()[:28])
    self._update_from_db()

remove(node_id)

Remove all edges linking to the given node from the graph.

Source code in nomenklatura/resolver/resolver.py
def remove(self, node_id: StrIdent) -> None:
    """Drop every edge attached to `node_id`, then refresh the in-memory
    state from the database."""
    self._remove_node(Identifier.get(node_id))
    self._update_from_db()

rename_node(old_id, new_id)

Rewrite every live edge touching an identifier to its replacement.

Use this when an external identifier has been merged upstream, for example when Wikidata redirects one QID to another and downstream resolver state should pretend the old QID never existed.

Source code in nomenklatura/resolver/resolver.py
def rename_node(self, old_id: StrIdent, new_id: StrIdent) -> int:
    """Re-point every live edge that touches `old_id` at `new_id`.

    Intended for identifiers merged upstream, for example a Wikidata QID that
    now redirects to another one, where resolver state should look as if the
    old identifier never existed.

    Returns:
        The number of edges processed (0 when both IDs are the same). An
        edge that would end up linking `new_id` to itself is removed without
        a replacement and still counts.
    """
    old = Identifier.get(old_id)
    new = Identifier.get(new_id)
    if old == new:
        return 0

    now = timestamp()
    table = self._table
    touching = or_(table.c.source == old.id, table.c.target == old.id)
    changed = 0
    for edge in self._live_edges(touching):
        left = new if edge.target == old else edge.target
        right = new if edge.source == old else edge.source
        self._remove_edge(edge)
        changed += 1
        if left == right:
            # The edge linked old to new; after renaming it would be a
            # self-loop, so it is dropped without a replacement.
            continue
        replacement = Edge(
            left_id=left,
            right_id=right,
            judgement=edge.judgement,
            score=edge.score,
            user=edge.user,
            created_at=now,
        )
        self._register(replacement)
    self._update_from_db()
    return changed

suggest(left_id, right_id, score, user=None)

Make a NO_JUDGEMENT link between two identifiers to suggest that a user should make a decision about whether they are the same or not.

Source code in nomenklatura/resolver/resolver.py
def suggest(
    self,
    left_id: StrIdent,
    right_id: StrIdent,
    score: float,
    user: Optional[str] = None,
) -> Identifier:
    """Record a NO_JUDGEMENT edge so a user gets asked to decide the pair.

    If a live row (deleted_at unset) already exists for the same
    source/target, only its score is overwritten, and only while that row is
    itself still NO_JUDGEMENT.

    Args:
        left_id: One identifier of the pair.
        right_id: The other identifier of the pair.
        score: Score to store on the suggestion.
        user: Recorded author; falls back to the current OS user.

    Returns:
        The target identifier of the edge.
    """
    edge = Edge(left_id, right_id, judgement=Judgement.NO_JUDGEMENT)
    edge.created_at = timestamp()
    edge.user = user or getpass.getuser()
    edge.score = score

    table = self._table
    # NOTE(review): `insert` is called on the session object here; presumably
    # a dialect-aware helper that supports on_conflict_do_update. Confirm
    # against the session type in use.
    insert_stmt = self._session.insert(table).values(edge.to_dict())
    upsert = insert_stmt.on_conflict_do_update(
        index_elements=[table.c.source, table.c.target],
        index_where=table.c.deleted_at.is_(None),
        set_={"score": insert_stmt.excluded.score},
        where=table.c.judgement == Judgement.NO_JUDGEMENT.value,
    )
    self._session.execute(upsert)
    return edge.target

nomenklatura.resolver.Linker

Bases: Generic[SE]

A class to manage the canonicalisation of entities. This stores only the positive merges of entities and is used as a lightweight way to apply the harmonisation post de-duplication.

Internally stores a dict[str, tuple[str, ...]] where each cluster is a sorted tuple with the canonical ID at index 0 and referents following. Every node in a cluster maps to the same shared tuple object.

Source code in nomenklatura/resolver/linker.py
class Linker(Generic[SE]):
    """Lightweight canonicaliser that only knows about positive merges.

    It is the cheap way to apply de-duplication results to entities and
    statements once the decisions have been made.

    State is a dict[str, tuple[str, ...]]: each cluster is one sorted tuple
    with the canonical ID at index 0 followed by its referents, and every
    member of the cluster maps to that same shared tuple object."""

    def __init__(self, mapping: Dict[str, Tuple[str, ...]]) -> None:
        self._mapping: Dict[str, Tuple[str, ...]] = mapping

    def add(self, left: str, right: str) -> str:
        """Join the clusters of two identifiers and return the canonical ID.

        The operation is idempotent, which lets the resolver replay database
        updates without remembering which edges were already applied.
        """
        merged: Set[str] = {left, right}
        for ident in (left, right):
            merged.update(self._mapping.get(ident, ()))
        # Identifier orders by (weight, id); descending puts the canonical
        # member in the first slot.
        ordered = sorted((Identifier.get(i) for i in merged), reverse=True)
        cluster = tuple(ident.id for ident in ordered)
        for member in cluster:
            self._mapping[member] = cluster
        return cluster[0]

    def connected(self, node: Identifier) -> Set[Identifier]:
        """Return the cluster of `node` as freshly built Identifier objects."""
        return set(map(Identifier.get, self.connected_ids(node.id)))

    def connected_ids(self, entity_id: str) -> Tuple[str, ...]:
        """Return the stored cluster tuple for an ID, or a one-element tuple
        holding just the ID when it is unknown."""
        fallback = (entity_id,)
        return self._mapping.get(entity_id, fallback)

    def get_canonical(self, entity_id: str) -> str:
        """Return the canonical ID for `entity_id`, or the ID itself when it
        belongs to no cluster."""
        if isinstance(entity_id, Identifier):
            # Kept inline so stacklevel=2 points at the caller.
            warnings.warn(
                "Passing Identifier objects to get_canonical is deprecated",
                DeprecationWarning,
                stacklevel=2,
            )
            entity_id = entity_id.id
        cluster = self._mapping.get(entity_id)
        return entity_id if cluster is None else cluster[0]

    def canonicals(self) -> Generator[Identifier, None, None]:
        """Yield each cluster head that is a canonical identifier, once."""
        emitted: Set[str] = set()
        for cluster in self._mapping.values():
            head = cluster[0]
            if head in emitted:
                continue
            ident = Identifier.get(head)
            if not ident.canonical:
                continue
            emitted.add(head)
            yield ident

    def get_referents(self, canonical_id: str, canonicals: bool = True) -> Set[str]:
        """Return the other members of the cluster of `canonical_id`.

        The ID itself is excluded. With `canonicals=False`, members that are
        canonical identifiers are excluded too. Unknown IDs give an empty set.
        """
        if isinstance(canonical_id, Identifier):
            # Kept inline so stacklevel=2 points at the caller.
            warnings.warn(
                "Passing Identifier objects to get_referents is deprecated",
                DeprecationWarning,
                stacklevel=2,
            )
            canonical_id = canonical_id.id
        cluster = self._mapping.get(canonical_id)
        if cluster is None:
            return set()
        others = {member for member in cluster if member != canonical_id}
        if canonicals:
            return others
        return {m for m in others if not Identifier.get(m).canonical}

    def apply(self, proxy: SE) -> SE:
        """Canonicalise the proxy's own ID and hand it to `apply_properties`.
        A proxy without an ID is returned untouched."""
        if proxy.id is not None:
            proxy.id = self.get_canonical(proxy.id)
            proxy = self.apply_properties(proxy)
        return proxy

    def apply_stream(self, proxy: ValueEntity) -> ValueEntity:
        """Canonicalise the ID and all entity-typed property values of a
        value-based proxy. A proxy without an ID is returned untouched."""
        if proxy.id is None:
            return proxy
        proxy.id = self.get_canonical(proxy.id)
        for prop in proxy.iterprops():
            if prop.type == registry.entity:
                # Take the values out, then add them back in canonical form.
                for value in proxy.pop(prop):
                    proxy.unsafe_add(prop, self.get_canonical(value), cleaned=True)
        return proxy

    def apply_properties(self, proxy: SE) -> SE:
        """Stamp the proxy's ID onto each of its statements as canonical_id
        and canonicalise entity-typed statement values."""
        for stmt in proxy._iter_stmt():
            if proxy.id is not None:
                stmt.canonical_id = proxy.id
            if stmt.prop_type != registry.entity.name:
                continue
            canon_value = self.get_canonical(stmt._value)
            if canon_value != stmt.value:
                # NOTE(review): the clone is bound only to the loop variable
                # and is not written back to the proxy anywhere in this
                # method. Confirm whether the rewritten value is supposed to
                # reach the proxy.
                stmt = stmt.clone(
                    value=canon_value,
                    original_value=stmt.original_value or stmt._value,
                )
        return proxy

    def apply_statement(self, stmt: Statement) -> Statement:
        """Canonicalise a single statement.

        Sets canonical_id from the statement's entity_id when present. For an
        entity-typed statement whose value changes under canonicalisation, a
        clone with the new value is returned and the previous value is kept
        as original_value unless one is already set.
        """
        if stmt.entity_id is not None:
            stmt.canonical_id = self.get_canonical(stmt.entity_id)
        if stmt.prop_type != registry.entity.name:
            return stmt
        canon_value = self.get_canonical(stmt._value)
        if canon_value != stmt._value:
            stmt = stmt.clone(
                value=canon_value,
                original_value=stmt.original_value or stmt._value,
            )
        return stmt

    def __repr__(self) -> str:
        return f"<Linker({len(self._mapping)})>"

add(left, right)

Merge two identifier clusters and return their canonical.

Idempotence lets the resolver replay database updates without tracking which edges it has already applied.

Source code in nomenklatura/resolver/linker.py
def add(self, left: str, right: str) -> str:
    """Join the clusters of two identifiers and return the canonical ID.

    The operation is idempotent, which lets the resolver replay database
    updates without remembering which edges were already applied.
    """
    merged: Set[str] = {left, right}
    for ident in (left, right):
        merged.update(self._mapping.get(ident, ()))
    # Identifier orders by (weight, id); descending puts the canonical member
    # in the first slot.
    ordered = sorted((Identifier.get(i) for i in merged), reverse=True)
    cluster = tuple(ident.id for ident in ordered)
    for member in cluster:
        self._mapping[member] = cluster
    return cluster[0]

apply(proxy)

Replace all entity references in a given proxy with their canonical identifiers. This is essentially the harmonisation post de-dupe.

Source code in nomenklatura/resolver/linker.py
def apply(self, proxy: SE) -> SE:
    """Canonicalise the proxy's own ID and hand it to `apply_properties`,
    which deals with its statements. A proxy without an ID is returned
    untouched."""
    if proxy.id is not None:
        proxy.id = self.get_canonical(proxy.id)
        proxy = self.apply_properties(proxy)
    return proxy

canonicals()

Return all the canonical cluster identifiers.

Source code in nomenklatura/resolver/linker.py
def canonicals(self) -> Generator[Identifier, None, None]:
    """Yield each cluster head that is a canonical identifier, once."""
    emitted: Set[str] = set()
    for cluster in self._mapping.values():
        head = cluster[0]
        if head in emitted:
            continue
        ident = Identifier.get(head)
        if not ident.canonical:
            continue
        emitted.add(head)
        yield ident

connected(node)

Return all entities connected to the given node. Constructs Identifier objects on the fly from the internal string representation.

Source code in nomenklatura/resolver/linker.py
def connected(self, node: Identifier) -> Set[Identifier]:
    """Return the cluster of `node` as a set of Identifier objects, built on
    demand from the string IDs held internally."""
    return set(map(Identifier.get, self.connected_ids(node.id)))

connected_ids(entity_id)

Return the stored identifiers connected to an entity ID.

Source code in nomenklatura/resolver/linker.py
def connected_ids(self, entity_id: str) -> Tuple[str, ...]:
    """Return the stored cluster tuple for an ID, or a one-element tuple
    holding just the ID when it is unknown."""
    fallback = (entity_id,)
    return self._mapping.get(entity_id, fallback)

get_canonical(entity_id)

Return the canonical identifier for the given entity ID.

Source code in nomenklatura/resolver/linker.py
def get_canonical(self, entity_id: str) -> str:
    """Return the canonical ID for `entity_id`, or the ID itself when it
    belongs to no cluster."""
    if isinstance(entity_id, Identifier):
        # Kept inline so stacklevel=2 points at the caller.
        warnings.warn(
            "Passing Identifier objects to get_canonical is deprecated",
            DeprecationWarning,
            stacklevel=2,
        )
        entity_id = entity_id.id
    cluster = self._mapping.get(entity_id)
    return entity_id if cluster is None else cluster[0]

get_referents(canonical_id, canonicals=True)

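Get all the other entity identifiers in the cluster of a given canonical identifier. The identifier itself is never included; canonical identifiers in the cluster are included by default and are left out only when canonicals=False, in which case only non-canonical referents are returned.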

Source code in nomenklatura/resolver/linker.py
def get_referents(self, canonical_id: str, canonicals: bool = True) -> Set[str]:
    """Return the other members of the cluster of `canonical_id`.

    The ID itself is excluded. With `canonicals=False`, members that are
    canonical identifiers are excluded too. Unknown IDs give an empty set.
    """
    if isinstance(canonical_id, Identifier):
        # Kept inline so stacklevel=2 points at the caller.
        warnings.warn(
            "Passing Identifier objects to get_referents is deprecated",
            DeprecationWarning,
            stacklevel=2,
        )
        canonical_id = canonical_id.id
    cluster = self._mapping.get(canonical_id)
    if cluster is None:
        return set()
    others = {member for member in cluster if member != canonical_id}
    if canonicals:
        return others
    return {m for m in others if not Identifier.get(m).canonical}

nomenklatura.judgement.Judgement

Bases: Enum

A judgement of whether two entities are the same.

Source code in nomenklatura/judgement.py
class Judgement(Enum):
    """The possible verdicts on whether two entities are the same."""

    POSITIVE = "positive"
    NEGATIVE = "negative"
    UNSURE = "unsure"
    NO_JUDGEMENT = "no_judgement"

    def __add__(self, other: "Judgement") -> "Judgement":
        """Combine two judgements into one.

        Two positives stay positive, a positive paired with a negative is
        negative, and every other combination is unsure.
        """
        combined = {self, other}
        if combined == {Judgement.POSITIVE}:
            return Judgement.POSITIVE
        if combined == {Judgement.POSITIVE, Judgement.NEGATIVE}:
            return Judgement.NEGATIVE
        return Judgement.UNSURE

    def to_dict(self) -> str:
        """Return the member's value as a plain string."""
        value = self.value
        return str(value)

nomenklatura.resolver.Identifier

Bases: object

Source code in nomenklatura/resolver/identifier.py
class Identifier(object):
    """An entity ID together with the ranking used to pick a canonical.

    Weight is 2 for IDs starting with PREFIX, 3 for QIDs (as decided by
    `is_qid`), and 1 for anything else. An identifier is canonical when its
    weight is above 1. Ordering is by (weight, id).
    """

    PREFIX = "NK-"

    __slots__ = ("id", "canonical", "weight")

    def __init__(self, id: str):
        self.id = id
        if id.startswith(self.PREFIX):
            weight = 2
        elif is_qid(id):
            weight = 3
        else:
            weight = 1
        self.weight: int = weight
        self.canonical = weight > 1

    def __eq__(self, other: Any) -> bool:
        # Compares against the string form, so plain str IDs match too.
        return self.id == str(other)

    def __lt__(self, other: Any) -> bool:
        mine = (self.weight, self.id)
        theirs = (other.weight, other.id)
        return mine < theirs

    def __str__(self) -> str:
        return self.id

    def __hash__(self) -> int:
        return hash(self.id)

    def __len__(self) -> int:
        return len(self.id)

    def __repr__(self) -> str:
        return f"<I({self.id})>"

    @classmethod
    def get(cls, id: StrIdent) -> "Identifier":
        """Wrap a string ID in an Identifier; anything else is passed
        through unchanged."""
        return cls(id) if isinstance(id, str) else id

    @classmethod
    def pair(cls, left_id: StrIdent, right_id: StrIdent) -> Pair:
        """Return the two identifiers as (higher, lower) by ordering.

        Raises:
            ResolverLogicError: If both sides are the same identifier.
        """
        left = cls.get(left_id)
        right = cls.get(right_id)
        if left == right:
            raise ResolverLogicError("%s/%s" % (left, right))
        # The IDs differ here, so (weight, id) gives a strict order.
        if left < right:
            return (right, left)
        return (left, right)

    @classmethod
    def make(cls, value: Optional[str] = None) -> "Identifier":
        """Build a PREFIX-ed identifier from `value`, or from a fresh
        shortuuid when no value is given."""
        suffix = value or shortuuid.uuid()
        return cls.get(f"{cls.PREFIX}{suffix}")

nomenklatura.resolver.Edge

Bases: object

Source code in nomenklatura/resolver/edge.py
class Edge(object):
    """A judgement stored between two identifiers.

    The pair is normalised through `Identifier.pair`, so `target` is always
    the higher-ordered identifier and `source` the lower one, whichever way
    round the IDs were passed in. `key` holds that (target, source) tuple.
    """

    __slots__ = (
        "key",
        "source",
        "target",
        "judgement",
        "score",
        "user",
        "created_at",
        "deleted_at",
    )

    def __init__(
        self,
        left_id: StrIdent,
        right_id: StrIdent,
        judgement: Judgement = Judgement.NO_JUDGEMENT,
        score: Optional[float] = None,
        user: Optional[str] = None,
        created_at: Optional[str] = None,
        deleted_at: Optional[str] = None,
    ):
        ordered = Identifier.pair(left_id, right_id)
        self.key = ordered
        self.target, self.source = ordered
        self.judgement = judgement
        self.score = score
        self.user = user
        self.created_at = created_at
        self.deleted_at = deleted_at

    def other(self, cur: Identifier) -> Identifier:
        """Return the opposite end of the edge from `cur`; the target is
        returned when `cur` is not the target."""
        return self.source if cur == self.target else self.target

    def to_dict(self) -> Dict[str, Any]:
        """Return all fields as a dict with string IDs and the judgement's
        value."""
        return {
            "target": self.target.id,
            "source": self.source.id,
            "judgement": self.judgement.value,
            "score": self.score,
            "user": self.user,
            "created_at": self.created_at,
            "deleted_at": self.deleted_at,
        }

    def to_line(self) -> str:
        """Serialise to a single JSON array line: target, source, judgement,
        score, user, created_at. `deleted_at` is not written."""
        fields = [
            self.target.id,
            self.source.id,
            self.judgement.value,
            self.score,
            self.user,
            self.created_at,
        ]
        return json.dumps(fields) + "\n"

    def __str__(self) -> str:
        return self.to_line()

    def __hash__(self) -> int:
        return hash(self.key)

    def __eq__(self, other: Any) -> bool:
        # Equality is defined purely by hash, i.e. by the identifier pair.
        return hash(self) == hash(other)

    def __lt__(self, other: Any) -> bool:
        return bool(self.key < other.key)

    def __repr__(self) -> str:
        return f"<E({self.target.id}, {self.source.id}, {self.judgement.value})>"

    @classmethod
    def from_line(cls, line: str) -> "Edge":
        """Parse a line in the `to_line` format. An optional seventh element
        is read as `deleted_at`."""
        fields = json.loads(line)
        edge = cls(
            fields[0],
            fields[1],
            judgement=Judgement(fields[2]),
            score=fields[3],
            user=fields[4],
            created_at=fields[5],
        )
        if len(fields) > 6:
            edge.deleted_at = fields[6]
        return edge

    @classmethod
    def from_dict(cls, data: Union[RowMapping, Dict[str, Any]]) -> "Edge":
        """Build an edge from a mapping keyed like `to_dict`; `created_at`
        and `deleted_at` may be absent."""
        judgement = Judgement(data["judgement"])
        return cls(
            left_id=data["target"],
            right_id=data["source"],
            judgement=judgement,
            score=data["score"],
            user=data["user"],
            created_at=data.get("created_at"),
            deleted_at=data.get("deleted_at"),
        )