"""A data model for synonyms."""
from __future__ import annotations
import builtins
import csv
import datetime
import gzip
import importlib.util
import itertools as itt
from collections import defaultdict
from collections.abc import Iterable, Mapping, Sequence
from pathlib import Path
from typing import TYPE_CHECKING, Any, Generic, Literal, NamedTuple, TypeAlias, cast, overload
from curies import NamableReference, Reference, ReferenceTuple
from curies import vocabulary as v
from pydantic import BaseModel, Field
from pydantic_extra_types.language_code import LanguageAlpha2
from pystow.utils import safe_open, safe_open_writer
from tqdm import tqdm
from typing_extensions import TypeVar
if TYPE_CHECKING:
import gilda
import pandas
#: The names that make up this module's public interface,
#: i.e., what ``from <module> import *`` exports
__all__ = [
"DEFAULT_PREDICATE",
"PREDICATES",
"GildaErrorPolicy",
"LiteralMapping",
"LiteralMappingIndex",
"LiteralMappingTuple",
"R",
"Writer",
"append_literal_mapping",
"df_to_literal_mappings",
"get_prefixes",
"group_literal_mappings",
"lint_literal_mappings",
"literal_mappings_to_df",
"literal_mappings_to_gilda",
"read_gilda_terms",
"read_literal_mappings",
"remap_literal_mappings",
"write_gilda_terms",
"write_literal_mappings",
]
#: The module spec for :mod:`pandas` if it can be found, otherwise ``None``.
#: Only its truthiness is used, to pick the default writer.
PANDAS_AVAILABLE = importlib.util.find_spec("pandas")
#: The module spec for :mod:`gilda` if it can be found, otherwise ``None``
GILDA_AVAILABLE = importlib.util.find_spec("gilda")
#: A type variable for the reference class used inside literal mappings. It is
#: bound to :class:`curies.NamableReference`, which is also its default.
R = TypeVar("R", bound=NamableReference, default=NamableReference)
[docs]
class LiteralMappingTuple(NamedTuple):
    """Represents rows in a spreadsheet.

    Every value is a plain string (or ``None`` when missing). The order of the
    fields is also the order of the columns, since the spreadsheet header is
    derived from this tuple's ``_fields``.
    """

    # the literal text, i.e., the label or synonym itself
    text: str
    # the CURIE of the entity that the text refers to
    curie: str
    # the (optional) name of the entity
    name: str | None
    # the CURIE of the predicate connecting the entity to the text
    predicate: str
    # the CURIE of the (optional) synonym type
    type: str | None
    # comma-separated CURIEs of provenance references
    provenance: str | None
    # the CURIE of the (optional) contributor
    contributor: str | None
    # the (optional) curation date, formatted as a string
    date: str | None
    # the (optional) language code
    language: str | None
    # an (optional) free text comment
    comment: str | None
    # the (optional) name of the resource where the row was curated
    source: str | None
    # the CURIE of the (optional) taxon
    taxon: str | None
#: A second name bound to the same class as :class:`LiteralMappingTuple`
#: (NOTE(review): presumably kept for backwards compatibility - confirm)
SynonymTuple = LiteralMappingTuple
#: The type of the :class:`curies.NamableReference` class itself (or a subclass of it)
NamableReferenceType: TypeAlias = type[NamableReference]
#: The header for the spreadsheet, i.e., the field names
#: of :class:`LiteralMappingTuple` in order
HEADER = list(LiteralMappingTuple._fields)
#: A list of permissible predicates
PREDICATES = [v.has_label, *v.synonym_scopes.values()]
#: The default synonym type predicate was chosen based on the OBO
#: standard - when you don't specify a scope, this is what it infers
DEFAULT_PREDICATE = v.has_related_synonym
#: The error policy when converting to/from gilda terms
GildaErrorPolicy: TypeAlias = Literal["ignore", "raise"]
[docs]
class LiteralMapping(BaseModel, Generic[R]):
    """A data model for literal mappings.

    A literal mapping states that ``reference`` (the subject) is connected by
    ``predicate`` to the string ``text`` (the object). All remaining fields are
    optional curation metadata.
    """

    # the first four fields are the core of the literal mapping
    reference: R = Field(..., description="The subject of the literal mapping")
    predicate: Reference = Field(
        default=DEFAULT_PREDICATE,
        description="The predicate that connects the term (as subject) "
        "to the textual synonym (as object)",
        examples=PREDICATES,
    )
    text: str = Field(..., description="The object of the literal mapping")
    language: LanguageAlpha2 | None = Field(
        None,
        description="The language of the synonym. If not given, typically "
        "assumed to be american english.",
    )

    type: Reference | None = Field(
        default=None,
        title="Synonym type",
        description="A qualification for the type of mapping",
        examples=list(v.synonym_types),
    )
    provenance: list[Reference] = Field(
        default_factory=list,
        description="A list of articles (e.g., from PubMed, PMC, arXiv) where this synonym appears",
    )
    contributor: Reference | None = Field(
        None,
        description="The contributor, usually given as a reference to ORCID",
        examples=[v.charlie],
    )
    comment: str | None = Field(
        None, description="An optional comment on the synonym curation or status"
    )
    source: str | None = Field(
        None, description="The name of the resource where the synonym was curated"
    )
    date: datetime.date | None = Field(None, description="The date of initial curation")
    taxon: Reference | None = Field(
        None,
        description="If taxon-specific, annotate it here. "
        "Only use `NCBITaxon` or `ncbitaxon` as the prefix.",
    )

    def __lt__(self, other: LiteralMapping[R]) -> bool:
        """Compare by :func:`_lm_sort_key`, which makes literal mappings sortable."""
        return _lm_sort_key(self) < _lm_sort_key(other)

    def get_all_references(self) -> set[Reference]:
        """Get all references made by this object.

        :returns: The subject, the predicate, all provenance references, and
            (when set) the synonym type, the contributor, and the taxon
        """
        rv: set[Reference] = {self.reference, self.predicate, *self.provenance}
        if self.type:
            rv.add(self.type)
        if self.contributor:
            rv.add(self.contributor)
        # the taxon is a reference too, so it belongs in "all references"
        if self.taxon:
            rv.add(self.taxon)
        return rv

    @property
    def name(self) -> str | None:
        """Get the reference's (optional) name."""
        return self.reference.name

    @property
    def curie(self) -> str:
        """Get the reference's CURIE."""
        return self.reference.curie

    @property
    def date_str(self) -> str:
        """Get the date as a string.

        :returns: The date formatted as ``YYYY-MM-DD``
        :raises ValueError: If no date is set
        """
        if self.date is None:
            raise ValueError("date is not set")
        return self.date.strftime("%Y-%m-%d")

    # ``builtins.type`` is spelled out in the signatures below since ``type``
    # is also the name of a field of this class

    # docstr-coverage:excused `overload`
    @overload
    @classmethod
    def from_row(
        cls,
        row: dict[str, Any],
        *,
        names: Mapping[Reference, str] | None = ...,
        reference_cls: builtins.type[R] = ...,
    ) -> LiteralMapping[R]: ...

    # docstr-coverage:excused `overload`
    @overload
    @classmethod
    def from_row(
        cls,
        row: dict[str, Any],
        *,
        names: Mapping[Reference, str] | None = ...,
        reference_cls: None = ...,
    ) -> LiteralMapping[NamableReference]: ...

    @classmethod
    def from_row(
        cls,
        row: dict[str, Any],
        *,
        names: Mapping[Reference, str] | None = None,
        reference_cls: builtins.type[R] | None = None,
    ) -> LiteralMapping[R] | LiteralMapping[NamableReference]:
        """Parse a dictionary representing a row in a TSV.

        :param row: A dictionary keyed by column name. ``text`` and ``curie``
            are required, all other columns are optional.
        :param names: An optional lookup from references to names. A name found
            here takes priority over the ``name`` column of the row.
        :param reference_cls: The class used to instantiate references. Defaults
            to :class:`curies.NamableReference`.
        :returns: A literal mapping object
        """
        if reference_cls is None:
            reference_cls = NamableReference  # type:ignore
        assert reference_cls is not None  # noqa:S101
        # parse with the base class first, only to get the prefix/identifier pair
        reference = NamableReference.from_curie(row["curie"])
        name = (names or {}).get(reference) or row.get("name")
        data = {
            "text": row["text"],
            "reference": reference_cls(
                prefix=reference.prefix, identifier=reference.identifier, name=name
            ),
            "predicate": (
                reference_cls.from_curie(predicate_curie.strip())
                if (predicate_curie := row.get("predicate"))
                else DEFAULT_PREDICATE
            ),
            # the provenance column holds comma-separated CURIEs
            "provenance": [
                reference_cls.from_curie(provenance_curie.strip())
                for provenance_curie in (row.get("provenance") or "").split(",")
                if provenance_curie.strip()
            ],
            # get("X") or None protects against empty strings
            "type": row.get("type") or None,
            "language": row.get("language") or None,
            "comment": row.get("comment") or None,
            "source": row.get("source") or None,
            "date": row.get("date") or None,
        }
        if contributor_curie := (row.get("contributor") or "").strip():
            data["contributor"] = reference_cls.from_curie(contributor_curie)
        # the taxon column is written by ``_as_row``, so it has to be read
        # back here, otherwise it gets lost when round-tripping through a file
        if taxon_curie := (row.get("taxon") or "").strip():
            data["taxon"] = reference_cls.from_curie(taxon_curie)
        return cast(LiteralMapping[NamableReference], cls.model_validate(data))

    def _as_row(self) -> LiteralMappingTuple:
        """Get the synonym as a row for writing."""
        return LiteralMappingTuple(
            text=self.text,
            curie=self.curie,
            name=self.name,
            predicate=self.predicate.curie,
            type=self.type.curie if self.type else None,
            provenance=",".join(p.curie for p in self.provenance) if self.provenance else None,
            contributor=self.contributor.curie if self.contributor is not None else None,
            date=self.date_str if self.date is not None else None,
            language=self.language or None,
            comment=self.comment or None,
            source=self.source or None,
            taxon=self.taxon.curie if self.taxon else None,
        )

    def _as_row_for_writer(self) -> Sequence[str]:
        """Get the synonym as a row where missing values are empty strings."""
        return tuple(x or "" for x in self._as_row())

    @staticmethod
    def _predicate_type_from_gilda(status: GildaStatus) -> tuple[Reference, Reference | None]:
        """Get the predicate and (optional) synonym type for a Gilda status.

        :param status: The status of a Gilda term
        :returns: A pair of the predicate and the synonym type, where the latter may be ``None``
        :raises ValueError: If the status is not one of the handled values
        """
        if status == "name":
            return v.has_label, None
        elif status == "former_name":
            return DEFAULT_PREDICATE, v.previous_name
        elif status == "synonym":
            return DEFAULT_PREDICATE, None
        elif status == "curated":
            # assume higher confidence in exact synonym
            return v.has_exact_synonym, None
        raise ValueError(f"unhandled gilda status: {status}")

    # docstr-coverage:excused `overload`
    @overload
    @classmethod
    def from_gilda(
        cls, term: gilda.Term, *, reference_cls: builtins.type[R] = ...
    ) -> LiteralMapping[R]: ...

    # docstr-coverage:excused `overload`
    @overload
    @classmethod
    def from_gilda(
        cls, term: gilda.Term, *, reference_cls: None = ...
    ) -> LiteralMapping[NamableReference]: ...

    @classmethod
    def from_gilda(
        cls, term: gilda.Term, *, reference_cls: builtins.type[R] | None = None
    ) -> LiteralMapping[R] | LiteralMapping[NamableReference]:
        """Construct a synonym from a :mod:`gilda` term.

        :param term: A Gilda term
        :param reference_cls: the class to use to instantiate references
        :returns: A literal mapping object

        .. warning::

            Gilda's data model is less detailed, so resulting synonym objects will not
            have detailed curation provenance
        """
        if reference_cls is None:
            reference_cls = NamableReference  # type:ignore
        assert reference_cls is not None  # noqa:S101
        predicate, synonym_type = cls._predicate_type_from_gilda(term.status)
        data = {
            "reference": reference_cls(prefix=term.db, identifier=term.id, name=term.entry_name),
            "predicate": predicate,
            "text": term.text,
            "type": synonym_type,
            "source": term.source,
        }
        if term.organism:
            # the organism is stored as a reference with the NCBITaxon prefix
            data["taxon"] = reference_cls(prefix="NCBITaxon", identifier=term.organism)
        return cast(LiteralMapping[NamableReference], cls.model_validate(data))

    def _get_gilda_status(self) -> GildaStatus:
        """Get the Gilda status for a synonym."""
        if self.predicate and self.predicate.pair == v.has_label.pair:
            return "name"
        if self.type and self.type.pair == v.previous_name.pair:
            return "former_name"
        return "synonym"

    def to_gilda(self) -> gilda.Term:
        """Get this synonym as a :mod:`gilda` term.

        :returns: An object that can be indexed by Gilda for NER and grounding
        :raises ValueError: If the reference has no name, or if a taxon is
            given whose prefix is not ``NCBITaxon`` (compared case-insensitively)
        """
        if not self.name:
            raise ValueError(f"can't make a Gilda term without a label for {self.reference.pair}")
        if self.taxon and self.taxon.prefix.lower() != "ncbitaxon":
            raise ValueError("NCBITaxon reference is required to convert to gilda.")
        return _gilda_term(
            text=self.text,
            reference=self.reference,
            status=self._get_gilda_status(),
            # fall back on the prefix when no source was annotated
            source=self.source or self.reference.prefix,
            ncbitaxon_id=self.taxon.identifier if self.taxon else None,
        )
#: An index from the reference to a list of mappings that use the reference.
#: This is the shape that :func:`group_literal_mappings` returns.
LiteralMappingIndex: TypeAlias = dict[R, list[LiteralMapping[R]]]
[docs]
def literal_mappings_to_gilda(
    literal_mappings: Iterable[LiteralMapping[R]], *, on_error: GildaErrorPolicy = "raise"
) -> list[gilda.Term]:
    """Convert literal mappings to gilda terms.

    :param literal_mappings: The literal mappings to convert
    :param on_error: What to do when converting a literal mapping raises a
        :class:`ValueError`. If ``"raise"``, the error is re-raised. Otherwise,
        the literal mapping is skipped.
    :returns: The gilda terms for all literal mappings that could be converted
    :raises ValueError: If a conversion fails and ``on_error`` is ``"raise"``
    """
    reraise = on_error == "raise"
    converted = []
    for mapping in literal_mappings:
        try:
            term = mapping.to_gilda()
        except ValueError:
            if reraise:
                raise
            # otherwise, silently skip the mapping that can't be converted
            continue
        converted.append(term)
    return converted
#: The values allowed for the ``status`` of a Gilda term.
#: See https://github.com/gyorilab/gilda/blob/ea328734f26c91189438e6d3408562f990f38644/gilda/term.py#L167C1-L167C69
GildaStatus: TypeAlias = Literal["name", "synonym", "curated", "former_name"]
def _gilda_term(
    *,
    text: str,
    reference: NamableReference,
    status: GildaStatus,
    source: str | None,
    ncbitaxon_id: str | None = None,
) -> gilda.Term:
    """Build a Gilda term for a text and the reference it grounds to.

    :param text: The text of the term, which also gets normalized with
        :func:`gilda.process.normalize`
    :param reference: The reference whose prefix and identifier become the
        term's ``db`` and ``id``. Its name is used as the entry name, falling
        back on the text if it has none.
    :param status: The Gilda status of the term
    :param source: The source of the term
    :param ncbitaxon_id: The identifier passed as the term's ``organism``
    :returns: A Gilda term
    """
    # gilda is imported inside the function, so it's only needed when this is called
    import gilda
    from gilda.process import normalize

    entry_name = reference.name or text
    return gilda.Term(  # type:ignore[no-untyped-call]
        normalize(text),  # type:ignore[no-untyped-call]
        text=text,
        db=reference.prefix,
        id=reference.identifier,
        entry_name=entry_name,
        status=status,
        source=source,
        organism=ncbitaxon_id,
    )
[docs]
def literal_mappings_to_df(literal_mappings: Iterable[LiteralMapping[R]]) -> pandas.DataFrame:
    """Get a pandas dataframe from the literal mappings.

    :param literal_mappings: The literal mappings, each of which becomes a row
    :returns: A dataframe whose columns follow ``HEADER``, except that columns
        that are blank in every row are left out
    """
    import pandas as pd

    rows = [literal_mapping._as_row() for literal_mapping in literal_mappings]
    df = pd.DataFrame(rows, columns=HEADER)
    # remove any columns that are fully blank
    blank_columns = [column for column in df.columns if df[column].isna().all()]
    for column in blank_columns:
        del df[column]
    return df
# Typing-only signatures: without a reference class, the resulting literal
# mappings use :class:`curies.NamableReference`.

# docstr-coverage:excused `overload`
@overload
def df_to_literal_mappings(
    df: pandas.DataFrame,
    *,
    names: Mapping[Reference, str] | None = ...,
    reference_cls: None = ...,
) -> list[LiteralMapping[NamableReference]]: ...


# ... and with a reference class, they use that class.

# docstr-coverage:excused `overload`
@overload
def df_to_literal_mappings(
    df: pandas.DataFrame,
    *,
    names: Mapping[Reference, str] | None = ...,
    reference_cls: type[R] = ...,
) -> list[LiteralMapping[R]]: ...
[docs]
def df_to_literal_mappings(
    df: pandas.DataFrame,
    *,
    names: Mapping[Reference, str] | None = None,
    reference_cls: type[R] | None = None,
) -> list[LiteralMapping[R]] | list[LiteralMapping[NamableReference]]:
    """Get mapping objects from a dataframe.

    :param df: A dataframe where each row represents a literal mapping
    :param names: An optional lookup from references to names
    :param reference_cls: The class used to instantiate references
    :returns: A list of literal mappings parsed from the rows of the dataframe
    """
    records = (record for _index, record in df.iterrows())
    # the two calls are kept apart so each one matches an overload of the parser
    if reference_cls is not None:
        return _from_dicts(records, names=names, reference_cls=reference_cls)
    return _from_dicts(records, names=names)
#: Valid writers. ``"pandas"`` writes via a dataframe and ``"csv"``
#: writes rows directly with :func:`pystow.utils.safe_open_writer`.
Writer = Literal["pandas", "csv"]
def _resolve_writer(writer: Writer | None = None) -> Writer:
if writer is None or writer == "pandas":
if PANDAS_AVAILABLE:
return "pandas"
else:
return "csv"
return writer
[docs]
def write_literal_mappings(
    literal_mappings: Iterable[LiteralMapping[R]],
    path: str | Path,
    *,
    writer: Writer | None = None,
) -> None:
    """Write literal mappings to a path.

    :param literal_mappings: The literal mappings to write
    :param path: The path of the file to write, which gets expanded and resolved
    :param writer: The writer to use. If not given, pandas is used when it's
        available and the builtin writer otherwise.
    :raises ValueError: If the writer is not one of the valid writers
    """
    resolved_path = Path(path).expanduser().resolve()
    resolved_writer = _resolve_writer(writer)
    if resolved_writer == "pandas":
        _write_pandas(literal_mappings=literal_mappings, path=resolved_path)
        return
    if resolved_writer == "csv":
        _write_builtin(literal_mappings=literal_mappings, path=resolved_path)
        return
    raise ValueError(f"invalid writer: {resolved_writer}. Choose one of {Writer}")
def _write_builtin(*, path: Path, literal_mappings: Iterable[LiteralMapping[R]]) -> None:
    """Write the header followed by one row per literal mapping, without using pandas."""
    with safe_open_writer(path) as row_writer:
        row_writer.writerow(HEADER)
        for literal_mapping in literal_mappings:
            row_writer.writerow(literal_mapping._as_row_for_writer())
def _write_pandas(*, path: Path, literal_mappings: Iterable[LiteralMapping[R]]) -> None:
    """Write the literal mappings as a tab-separated file by way of a dataframe."""
    literal_mappings_to_df(literal_mappings).to_csv(path, sep="\t", index=False)
[docs]
def append_literal_mapping(literal_mapping: LiteralMapping[R], path: str | Path) -> None:
    """Append a literal mapping to an existing file.

    The row is aligned to the header of the existing file. This matters since
    the pandas-based writer leaves out columns that are blank in every row, so
    an existing file might have fewer columns than ``HEADER``. If the file is
    missing, empty, or its first row doesn't name the ``text`` and ``curie``
    columns, then all columns of ``HEADER`` are written.

    :param literal_mapping: The literal mapping to append
    :param path: The path to a tab-separated file
    :raises ValueError: If the literal mapping has a value for a column
        that the header of the existing file does not have
    """
    path = Path(path).expanduser().resolve()
    values = dict(zip(HEADER, literal_mapping._as_row_for_writer(), strict=True))
    columns = list(HEADER)
    needs_newline = False
    if path.is_file() and path.stat().st_size > 0:
        with path.open("rb") as binary_file:
            # look at the last byte (whence=2 seeks relative to the end of
            # the file), so the new row never gets glued onto the final line
            binary_file.seek(-1, 2)
            needs_newline = binary_file.read(1) != b"\n"
        with path.open(newline="") as file:
            first_row = next(csv.reader(file, delimiter="\t"), None)
        # only trust the first row as a header if it names the required columns
        if first_row and {"text", "curie"}.issubset(first_row):
            columns = first_row
            missing = [column for column in HEADER if column not in columns and values[column]]
            if missing:
                raise ValueError(
                    f"can't append to {path} since it has no column(s) for: {', '.join(missing)}"
                )
    with path.open("a") as file:
        if needs_newline:
            file.write("\n")
        # use a CSV writer so values containing tabs, quotes, or newlines get quoted
        csv.writer(file, delimiter="\t", lineterminator="\n").writerow(
            [values.get(column, "") for column in columns]
        )
# Typing-only signatures: with a reference class, the resulting literal
# mappings use that class.

# docstr-coverage:excused `overload`
@overload
def read_literal_mappings(
    path: str | Path,
    *,
    delimiter: str | None = ...,
    names: Mapping[Reference, str] | None = ...,
    reference_cls: type[R] = ...,
    show_progress: bool = ...,
) -> list[LiteralMapping[R]]: ...


# ... and without a reference class, they use :class:`curies.NamableReference`.

# docstr-coverage:excused `overload`
@overload
def read_literal_mappings(
    path: str | Path,
    *,
    delimiter: str | None = ...,
    names: Mapping[Reference, str] | None = ...,
    reference_cls: None = ...,
    show_progress: bool = ...,
) -> list[LiteralMapping[NamableReference]]: ...
[docs]
def read_literal_mappings(
    path: str | Path,
    *,
    delimiter: str | None = None,
    names: Mapping[Reference, str] | None = None,
    reference_cls: type[R] | None = None,
    show_progress: bool = False,
) -> list[LiteralMapping[R]] | list[LiteralMapping[NamableReference]]:
    """Load literal mappings from a file.

    :param path: A local file path or URL for a biosynonyms-flavored CSV/TSV file.
        URLs ending with ``.gz`` are decompressed and local files ending with
        ``.numbers`` are parsed with :mod:`numbers_parser`.
    :param delimiter: The delimiter for the CSV/TSV file. Defaults to tab
    :param names: A pre-parsed dictionary from references (i.e., prefix-luid pairs) to
        default labels
    :param reference_cls: The class used to parse references. E.g., swap out for
        :class:`pyobo.Reference` to automatically do Bioregistry validation on
        references.
    :param show_progress: Should a progress bar be shown? Defaults to false.
    :returns: A list of literal mappings parsed from the table
    :raises requests.HTTPError: If the path is a URL and the server responds
        with an error status
    """
    if reference_cls is None:
        reference_cls = NamableReference  # type:ignore
    assert reference_cls is not None  # noqa:S101

    if isinstance(path, str) and path.startswith(("https://", "http://")):
        import requests

        if path.endswith(".gz"):
            with requests.get(path, stream=True, timeout=15) as res:
                # fail on HTTP errors instead of trying to decompress an error page
                res.raise_for_status()
                lines = gzip.decompress(res.content).decode().splitlines()
            return _from_lines(
                lines,
                delimiter=delimiter,
                names=names,
                reference_cls=reference_cls,
                show_progress=show_progress,
            )

        # the lines are fully consumed before leaving the context manager,
        # which then releases the connection
        with requests.get(path, timeout=15) as res:
            res.raise_for_status()
            if res.encoding is None:
                # without an encoding, the lines would be yielded as bytes
                # even though decoding was requested, so use the same UTF-8
                # default as for gzipped content
                res.encoding = "utf-8"
            return _from_lines(
                res.iter_lines(decode_unicode=True),
                delimiter=delimiter,
                names=names,
                reference_cls=reference_cls,
                show_progress=show_progress,
            )

    path = Path(path).expanduser().resolve()

    if path.suffix == ".numbers":
        return _parse_numbers(
            path, names=names, show_progress=show_progress, reference_cls=reference_cls
        )

    with safe_open(path) as file:
        return _from_lines(
            file,
            delimiter=delimiter,
            names=names,
            reference_cls=reference_cls,
            show_progress=show_progress,
        )
# Typing-only signatures: with a reference class, the resulting literal
# mappings use that class.

# docstr-coverage:excused `overload`
@overload
def read_gilda_terms(
    path: str | Path,
    *,
    reference_cls: type[R] = ...,
) -> list[LiteralMapping[R]]: ...


# ... and without a reference class, they use :class:`curies.NamableReference`.

# docstr-coverage:excused `overload`
@overload
def read_gilda_terms(
    path: str | Path,
    *,
    reference_cls: None = ...,
) -> list[LiteralMapping[NamableReference]]: ...
[docs]
def read_gilda_terms(
    path: str | Path,
    *,
    reference_cls: type[R] | None = None,
) -> list[LiteralMapping[R]] | list[LiteralMapping[NamableReference]]:
    """Read Gilda terms from a file.

    :param path: The path to a gzipped Gilda terms file, which has to end with ``.gz``
    :param reference_cls: The class used to instantiate references
    :returns: A list with one literal mapping per Gilda term in the file
    :raises ValueError: If the path does not end with ``.gz``
    """
    from gilda.grounder import load_entries_from_terms_file

    terms_path = _prepare_gilda_path(path)
    terms = load_entries_from_terms_file(terms_path)
    # we know the result will be homogenous, so we ignore
    return [  # type:ignore[return-value]
        LiteralMapping.from_gilda(term, reference_cls=reference_cls) for term in terms
    ]
[docs]
def write_gilda_terms(
    literal_mappings: Iterable[LiteralMapping[R]],
    path: str | Path,
    *,
    on_error: GildaErrorPolicy = "ignore",
) -> None:
    """Write Gilda terms to a file.

    :param literal_mappings: The literal mappings to convert to Gilda terms and write
    :param path: The path to write to, which has to end with ``.gz``
    :param on_error: What to do with literal mappings that can't be converted
        to Gilda terms. Note the default here is to ignore them.
    :raises ValueError: If the path does not end with ``.gz``
    """
    from gilda import dump_terms

    terms_path = _prepare_gilda_path(path)
    gilda_terms = literal_mappings_to_gilda(literal_mappings, on_error=on_error)
    dump_terms(gilda_terms, terms_path)
def _prepare_gilda_path(path: str | Path) -> Path:
path = Path(path).expanduser().resolve()
if not path.suffix.endswith(".gz"):
raise ValueError(f"gilda terms files are required to be gzipped and end with .gz: {path}")
return path
# Typing-only signatures: without a reference class, the resulting literal
# mappings use :class:`curies.NamableReference`.

# docstr-coverage:excused `overload`
@overload
def _parse_numbers(
    path: str | Path,
    *,
    names: Mapping[Reference, str] | None = ...,
    reference_cls: None = ...,
    show_progress: bool = ...,
) -> list[LiteralMapping[NamableReference]]: ...


# ... and with a reference class, they use that class.

# docstr-coverage:excused `overload`
@overload
def _parse_numbers(
    path: str | Path,
    *,
    names: Mapping[Reference, str] | None = ...,
    reference_cls: type[R] = ...,
    show_progress: bool = ...,
) -> list[LiteralMapping[R]]: ...


def _parse_numbers(
    path: str | Path,
    *,
    names: Mapping[Reference, str] | None = None,
    reference_cls: type[R] | None = None,
    show_progress: bool = False,
) -> list[LiteralMapping[R]] | list[LiteralMapping[NamableReference]]:
    """Parse literal mappings from the first table of the first sheet in a Numbers file."""
    # code example from https://pypi.org/project/numbers-parser
    import numbers_parser

    document = numbers_parser.Document(path)
    first_table = document.sheets[0].tables[0]
    # the first row holds the column names, the rest are the records
    header, *rows = first_table.rows(values_only=True)
    records = (dict(zip(header, row, strict=False)) for row in rows)
    return _from_dicts(
        records,
        names=names,
        reference_cls=reference_cls,
        show_progress=show_progress,
    )
# Typing-only signatures: without a reference class, the resulting literal
# mappings use :class:`curies.NamableReference`.

# docstr-coverage:excused `overload`
@overload
def _from_lines(
    lines: Iterable[str],
    *,
    delimiter: str | None = ...,
    names: Mapping[Reference, str] | None = ...,
    reference_cls: None = ...,
    show_progress: bool = ...,
) -> list[LiteralMapping[NamableReference]]: ...


# ... and with a reference class, they use that class.

# docstr-coverage:excused `overload`
@overload
def _from_lines(
    lines: Iterable[str],
    *,
    delimiter: str | None = ...,
    names: Mapping[Reference, str] | None = ...,
    reference_cls: type[R] = ...,
    show_progress: bool = ...,
) -> list[LiteralMapping[R]]: ...


def _from_lines(
    lines: Iterable[str],
    *,
    delimiter: str | None = None,
    names: Mapping[Reference, str] | None = None,
    reference_cls: type[R] | None = None,
    show_progress: bool = False,
) -> list[LiteralMapping[R]] | list[LiteralMapping[NamableReference]]:
    """Parse literal mappings from the lines of a delimited file, where the first line is the header."""
    # fall back on tabs if no (or an empty) delimiter is given
    reader = csv.DictReader(lines, delimiter=delimiter if delimiter else "\t")
    return _from_dicts(
        reader,
        names=names,
        reference_cls=reference_cls,
        show_progress=show_progress,
    )
# Typing-only signatures: without a reference class, the resulting literal
# mappings use :class:`curies.NamableReference`.

# docstr-coverage:excused `overload`
@overload
def _from_dicts(
    dicts: Iterable[dict[str, Any]],
    *,
    names: Mapping[Reference, str] | None = ...,
    reference_cls: None = ...,
    show_progress: bool = ...,
) -> list[LiteralMapping[NamableReference]]: ...


# ... and with a reference class, they use that class.

# docstr-coverage:excused `overload`
@overload
def _from_dicts(
    dicts: Iterable[dict[str, Any]],
    *,
    names: Mapping[Reference, str] | None = ...,
    reference_cls: type[R] = ...,
    show_progress: bool = ...,
) -> list[LiteralMapping[R]]: ...


def _from_dicts(
    dicts: Iterable[dict[str, Any]],
    *,
    names: Mapping[Reference, str] | None = None,
    reference_cls: type[R] | None = None,
    show_progress: bool = False,
) -> list[LiteralMapping[R]] | list[LiteralMapping[NamableReference]]:
    """Parse literal mappings from records keyed by column name.

    :param dicts: The records. Only entries whose key and value are both
        non-blank are kept, and the value has to be a string. Records with
        nothing left after that are skipped.
    :param names: An optional lookup from references to names
    :param reference_cls: The class used to instantiate references
    :param show_progress: Should a progress bar be shown?
    :returns: A list of literal mappings, one per non-empty record
    :raises ValueError: If a record can't be parsed, with the row number and
        the cleaned record in the message
    """
    parsed = []
    progress_bar = tqdm(
        dicts,
        unit_scale=True,
        unit="mapping",
        desc="parsing literal mappings",
        disable=not show_progress,
    )
    # counting starts at 2, presumably since the header is the first row of the file
    for row_number, raw_record in enumerate(progress_bar, start=2):
        cleaned = {
            key: value
            for key, value in raw_record.items()
            if key and value and isinstance(value, str) and key.strip() and value.strip()
        }
        if not cleaned:
            continue
        try:
            literal_mapping = LiteralMapping.from_row(
                cleaned, names=names, reference_cls=reference_cls
            )
        except ValueError as e:
            raise ValueError(f"failed on row {row_number}: {cleaned}") from e
        parsed.append(literal_mapping)
    # ignore here since we know that the types will be homogenous
    return parsed  # type:ignore[return-value]
[docs]
def group_literal_mappings(
    literal_mappings: Iterable[LiteralMapping[R]],
) -> dict[R, list[LiteralMapping[R]]]:
    """Aggregate literal mappings by reference.

    :param literal_mappings: The literal mappings to group
    :returns: A dictionary from each reference to the list of literal mappings
        that have it as their subject, in the order they were given
    """
    grouped: dict[R, list[LiteralMapping[R]]] = {}
    progress_bar = tqdm(literal_mappings, unit="literal mapping", unit_scale=True, leave=False)
    for literal_mapping in progress_bar:
        grouped.setdefault(literal_mapping.reference, []).append(literal_mapping)
    return grouped
[docs]
def get_prefixes(
    literal_mapping_index: LiteralMappingIndex[R] | list[LiteralMapping[R]],
) -> set[str]:
    """Get all prefixes appearing in a literal mapping index or iterable of literal mappings.

    :param literal_mapping_index: Either a dictionary from references to lists
        of literal mappings, or a list of literal mappings
    :returns: The prefixes of all references made by the literal mappings
    :raises TypeError: If the input is neither a dictionary nor a list
    """
    if isinstance(literal_mapping_index, dict):
        return _get_prefixes_from_index(literal_mapping_index)
    if isinstance(literal_mapping_index, list):
        return _get_prefixes_from_iterable(literal_mapping_index)
    # say what went wrong instead of raising an empty error
    raise TypeError(
        "expected a literal mapping index (dict) or a list of literal mappings, "
        f"got {type(literal_mapping_index).__name__}"
    )
def _get_prefixes_from_iterable(literal_mappings: Iterable[LiteralMapping[R]]) -> set[str]:
return {
reference.prefix
for literal_mapping in literal_mappings
for reference in literal_mapping.get_all_references()
}
def _get_prefixes_from_index(literal_mapping_index: LiteralMappingIndex[R]) -> set[str]:
    """Collect the prefixes of all references made by the literal mappings in the index's values."""
    all_literal_mappings = itt.chain.from_iterable(literal_mapping_index.values())
    return _get_prefixes_from_iterable(all_literal_mappings)
[docs]
def lint_literal_mappings(
    path: Path,
    *,
    delimiter: str | None = None,
    reference_cls: type[R] | None = None,
) -> None:
    """Lint a literal mappings file.

    The file is read, its literal mappings are sorted, and then they're
    written back to the same path.

    :param path: The path to the literal mappings file, which gets overwritten
    :param delimiter: The delimiter used for reading the file
    :param reference_cls: The class used to parse references
    """
    unsorted_mappings = read_literal_mappings(
        path, delimiter=delimiter, reference_cls=reference_cls
    )
    sorted_mappings = sorted(unsorted_mappings)
    # it's okay the type can't be ignored for this, since it doesn't matter what it is
    write_literal_mappings(sorted_mappings, path)  # type:ignore[misc]
def _lm_sort_key(lm: LiteralMapping[R]) -> tuple[str, str, str, str]:
return lm.text.casefold(), lm.text, lm.reference.curie.casefold(), lm.reference.curie
[docs]
def remap_literal_mappings(
    literal_mappings: list[LiteralMapping[R]],
    mappings: list[tuple[R, R]],
    *,
    progress: bool = False,
) -> list[LiteralMapping[R]]:
    """Use a priority mapping to re-write terms with priority groundings.

    :param literal_mappings: A list of literal mappings
    :param mappings: A list of pairs that constitute mappings, e.g. from SeMRA.
        The literal mappings for the first reference in each pair get
        re-written to use the second one.
    :param progress: Should a progress bar be shown?
    :returns: A new list of literal mapping objects that have been remapped
    """
    index = group_literal_mappings(literal_mappings)

    # build a lookup table, since the mappings coming into this function
    # might not have names associated with them, but the literal mappings do
    named_references: dict[ReferenceTuple, R] = {
        reference.pair: reference for reference in index
    }

    progress_bar = tqdm(
        mappings, unit="mapping", unit_scale=True, desc="applying mappings", disable=not progress
    )
    for source, target in progress_bar:
        # overwrite the target with a reference that has a name, if it exists
        named_target = named_references.get(target.pair, target)
        moved: list[LiteralMapping[R]] | None = index.pop(source, None)
        if not moved:
            continue
        index.setdefault(named_target, []).extend(
            _make_new_lm(literal_mapping, named_target) for literal_mapping in moved
        )

    # Unwind the terms index
    # TODO filter out duplicates?
    return list(itt.chain.from_iterable(index.values()))
def _make_new_lm(
term: LiteralMapping[R],
reference: Reference,
) -> LiteralMapping[R]:
"""Make a new literal term object by replacing the database, identifier, and name."""
new_ref: R = term.reference.__class__(
prefix=reference.prefix,
identifier=reference.identifier,
name=getattr(reference, "name", None),
)
return term.model_copy(update={"reference": new_ref})