Skip to content

Pipeline

novelentitymatcher.pipeline.discovery

Public pipeline-first discovery API.

Classes

DiscoveryPipeline(entities=None, *, matcher=None, review_storage_path='./proposals/review_records.json', config=None, **kwargs)

Bases: DiscoveryBase

Pipeline-first public entry point for discovery and promotion workflows.

Owns its own Matcher, NoveltyDetector, ScalableClusterer, and LLMClassProposer instances, and routes everything through a PipelineOrchestrator built from PipelineConfig.

Source code in src/novelentitymatcher/pipeline/discovery.py
def __init__(
    self,
    entities: list[dict[str, Any]] | None = None,
    *,
    matcher: Matcher | None = None,
    review_storage_path: str | Path = "./proposals/review_records.json",
    config: PipelineConfig | None = None,
    **kwargs: Any,
):
    """Assemble the discovery pipeline and every component it owns.

    Args:
        entities: Known entities used to build an owned Matcher. Required
            when ``matcher`` is not supplied.
        matcher: Pre-built Matcher to use instead of constructing one.
        review_storage_path: File path backing the HITL review records.
        config: Explicit PipelineConfig; when omitted, one is derived from
            the recognized keys in ``kwargs``.
        **kwargs: Loose configuration knobs (e.g. ``acceptance_threshold``,
            ``match_threshold``, ``model``, ``mode``, ``llm_api_keys``).

    Raises:
        ValueError: If neither ``matcher`` nor ``entities`` is provided.
    """
    # When no explicit config is given, salvage recognized keys from kwargs.
    self._config = config or PipelineConfig.from_dict(kwargs)
    review_storage_path = str(review_storage_path)

    # Build owned Matcher
    if matcher is not None:
        self.matcher = matcher
    else:
        if entities is None:
            raise ValueError("entities is required when matcher is not provided")
        # acceptance_threshold takes precedence over legacy match_threshold.
        threshold = kwargs.get(
            "acceptance_threshold",
            kwargs.get("match_threshold", 0.5),
        )
        self.matcher = Matcher(
            entities=entities,
            model=kwargs.get("model", "potion-32m"),
            mode=kwargs.get("mode", "zero-shot"),
            threshold=threshold,
        )

    # Prefer explicitly passed entities; otherwise mirror the matcher's set.
    self.entities = (
        entities
        if entities is not None
        else list(getattr(self.matcher, "entities", []))
    )
    # Fallback chain: kwargs -> matcher's own threshold -> 0.5.
    self.acceptance_threshold = kwargs.get(
        "acceptance_threshold",
        kwargs.get("match_threshold", getattr(self.matcher, "threshold", 0.5)),
    )

    # Build owned NoveltyDetector
    detection_config = self._build_detection_config(kwargs)
    self.detector = NoveltyDetector(config=detection_config)
    self.use_novelty_detector = self._config.ood_enabled

    # Build owned ScalableClusterer
    clustering_cfg = detection_config.clustering or ClusteringConfig(
        min_cluster_size=self._config.min_cluster_size,
    )
    self.clusterer = ScalableClusterer(
        backend=self._config.clustering_backend,
        min_cluster_size=clustering_cfg.min_cluster_size,
        min_samples=(
            self._config.clustering_min_samples
            or clustering_cfg.hdbscan_min_samples
        ),
        cluster_selection_epsilon=(
            self._config.clustering_cluster_selection_epsilon
        ),
        umap_metric=self._config.clustering_metric,
    )

    # Build owned LLMClassProposer
    self.llm_proposer = LLMClassProposer(
        primary_model=self._config.llm_model,
        provider=self._config.llm_provider,
        api_keys=kwargs.get("llm_api_keys"),
    )

    # HITL (human-in-the-loop) review bookkeeping.
    self.review_manager = ProposalReviewManager(review_storage_path)
    self.output_dir = self._config.output_dir
    self.auto_save = self._config.auto_save

    # Build orchestrator
    self._orchestrator = self._build_orchestrator()
Attributes
novel_entity_matcher property

Backward-compatible alias exposing detector/llm_proposer as if from NovelEntityMatcher.

Functions
discover(queries, *, existing_classes=None, context=None, return_metadata=True, run_llm_proposal=None) async

Run the full pipeline: match -> OOD -> cluster -> evidence -> propose.

Source code in src/novelentitymatcher/pipeline/discovery.py
async def discover(
    self,
    queries: list[str],
    *,
    existing_classes: list[str] | None = None,
    context: str | None = None,
    return_metadata: bool = True,
    run_llm_proposal: bool | None = None,
) -> NovelClassDiscoveryReport:
    """Run the full pipeline: match -> OOD -> cluster -> evidence -> propose.

    Args:
        queries: Input texts to match and scan for novel classes.
        existing_classes: Known class names handed to the proposal stage.
        context: Optional free-text context forwarded to the proposer.
        return_metadata: Kept for backward compatibility; the report is
            always built from the full pipeline result.
        run_llm_proposal: Per-call override of the configured proposal
            toggle; ``None`` keeps the configured value.

    Returns:
        A NovelClassDiscoveryReport, with review records attached when
        ``auto_create_review_records`` is enabled.
    """
    pipeline = self._build_orchestrator(
        existing_classes=existing_classes,
        context=context,
        run_llm_proposal=run_llm_proposal,
    )
    ctx = StageContext(inputs=list(queries))

    # Bug fix: always use the async entrypoint. Previously
    # ``return_metadata=False`` invoked the blocking ``pipeline.run``
    # inside this ``async def``, stalling the event loop; the flag was
    # never meant to select the execution mode.
    pipeline_result = await pipeline.run_async(ctx)

    report = self._build_discovery_report(
        pipeline_result=pipeline_result,
        detection_config_dump=self.detector.config.model_dump(),
        existing_classes=existing_classes,
        context=context,
    )

    if self._config.auto_create_review_records:
        report.review_records = self.review_manager.create_records(report)

    return self._finalize_report(report)
promote_proposal(review_id, *, promoter=None)

Promote a review record, optionally invoking a promoter callback.

If no promoter is provided, a default promoter updates the matcher's known entities from the proposal.

Source code in src/novelentitymatcher/pipeline/discovery.py
def promote_proposal(
    self,
    review_id: str,
    *,
    promoter: Callable[[ProposalReviewRecord], Any] | None = None,
) -> PromotionResult:
    """Promote the review record identified by ``review_id``.

    When ``promoter`` is omitted, the built-in default promoter is used,
    which folds the proposal back into the matcher's known entities.
    """
    if promoter is None:
        promoter = self._default_promoter
    return self.review_manager.promote(review_id, promoter=promoter)

Functions

novelentitymatcher.pipeline.config

Unified pipeline configuration for the discovery pipeline.

Classes

PipelineConfig

Bases: BaseModel

Unified configuration driving stage selection and optional capabilities.

Functions
from_dict(data) classmethod

Construct a PipelineConfig from a plain dictionary.

Source code in src/novelentitymatcher/pipeline/config.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> PipelineConfig:
    """Construct a PipelineConfig from a plain dictionary.

    Keys that are not declared model fields are silently dropped.
    """
    known = cls.model_fields
    filtered = {key: value for key, value in data.items() if key in known}
    return cls(**filtered)
to_dict()

Serialize to a plain dictionary.

Source code in src/novelentitymatcher/pipeline/config.py
def to_dict(self) -> dict[str, Any]:
    """Serialize to a plain dictionary via pydantic's ``model_dump``."""
    dumped = self.model_dump()
    return dumped
stages()

Return an ordered list of enabled stage names.

Source code in src/novelentitymatcher/pipeline/config.py
def stages(self) -> list[str]:
    """Return an ordered list of enabled stage names."""
    # Fixed pipeline order: match -> ood -> cluster -> evidence -> proposal.
    flags = (
        ("match", self.match_enabled),
        ("ood", self.ood_enabled),
        ("cluster", self.clustering_enabled),
        ("evidence", self.evidence_enabled),
        ("proposal", self.proposal_enabled),
    )
    return [name for name, on in flags if on]

novelentitymatcher.pipeline.contracts

Internal staged discovery pipeline contracts.

Classes

StageContext(inputs, artifacts=dict(), metadata=dict()) dataclass

Mutable context passed between internal pipeline stages.

Functions
artifact_summary()

Return a summary of artifact keys and their types.

Source code in src/novelentitymatcher/pipeline/contracts.py
def artifact_summary(self) -> dict[str, str]:
    """Map each artifact key to the class name of its stored value."""
    summary: dict[str, str] = {}
    for key, value in self.artifacts.items():
        summary[key] = type(value).__name__
    return summary

StageResult(stage_name, artifacts=dict(), metadata=dict(), contract_version='1.0', timing_ms=None, stage_config_snapshot=dict(), errors=list()) dataclass

Result returned by a single pipeline stage.

PipelineRunResult(context, stage_results=list(), timing_breakdown=dict()) dataclass

Terminal result for an internal pipeline run.

PipelineStage

Bases: ABC

Base contract for internal discovery stages.

Functions
run(context) abstractmethod

Execute the stage synchronously.

Source code in src/novelentitymatcher/pipeline/contracts.py
@abstractmethod
def run(self, context: StageContext) -> StageResult:
    """Execute the stage synchronously.

    Concrete stages read inputs and artifacts from ``context`` and must
    return a StageResult describing what they produced.
    """
run_async(context) async

Async entrypoint; stages can override when they have real async work.

Source code in src/novelentitymatcher/pipeline/contracts.py
async def run_async(self, context: StageContext) -> StageResult:
    """Default async wrapper that delegates to the synchronous ``run``.

    Stages with genuine async work override this method.
    """
    result = self.run(context)
    return result

novelentitymatcher.pipeline.orchestrator

Internal pipeline orchestrator.

Classes

PipelineOrchestrator(stages)

Runs an ordered list of internal stages against a shared context.

Source code in src/novelentitymatcher/pipeline/orchestrator.py
def __init__(self, stages: Iterable[PipelineStage]):
    # Materialize the iterable so stage order is fixed and re-iterable.
    self.stages: list[PipelineStage] = list(stages)

novelentitymatcher.pipeline.pipeline_builder

Pipeline builder that consolidates 5-stage discovery pipeline construction.

Classes

PipelineStageConfig(match_enabled=True, collect_sync=None, collect_async=None, detector=None, clusterer=None, llm_proposer=None, use_novelty_detector=True, clustering_enabled=True, clustering_backend='auto', similarity_threshold=0.75, min_cluster_size=5, clustering_metric='cosine', clustering_min_samples=None, clustering_cluster_selection_epsilon=0.0, evidence_enabled=True, evidence_method='tfidf', max_keywords=8, max_examples=4, token_budget=256, use_tfidf=None, run_llm_proposal=True, existing_classes_resolver=None, context_text=None, max_retries=2, prefer_cluster_level=True, ood_strategies=None, ood_calibration_mode='none', ood_calibration_alpha=0.1, ood_mahalanobis_mode='class_conditional', proposal_mode='cluster', proposal_schema_discovery=False, proposal_schema_max_attributes=10, proposal_hierarchical=True) dataclass

Configuration for a single pipeline stage.

PipelineBuilder(config=None, **kwargs)

Builds a 5-stage discovery pipeline orchestrator.

Consolidates pipeline construction logic that was previously duplicated between DiscoveryPipeline and NovelEntityMatcher.

Source code in src/novelentitymatcher/pipeline/pipeline_builder.py
def __init__(self, config: PipelineStageConfig | None = None, **kwargs: Any):
    # An explicit stage config wins; otherwise derive one from loose kwargs.
    self._cfg = config if config is not None else self._from_kwargs(kwargs)
Functions
build(*, existing_classes=None, context=None, run_llm_proposal=None)

Build the 5-stage pipeline orchestrator.

Source code in src/novelentitymatcher/pipeline/pipeline_builder.py
def build(
    self,
    *,
    existing_classes: list[str] | None = None,
    context: str | None = None,
    run_llm_proposal: bool | None = None,
) -> PipelineOrchestrator:
    """Build the 5-stage pipeline orchestrator.

    Args:
        existing_classes: Known class names used by the proposal stage's
            default resolver when the config supplies none.
        context: Free-text context for the proposal stage; falls back to
            the configured ``context_text``.
        run_llm_proposal: Per-call override of the configured proposal
            toggle; ``None`` keeps the configured value.

    Returns:
        A PipelineOrchestrator wired with match, OOD, clustering,
        evidence, and proposal stages.

    Raises:
        ValueError: If ``match_enabled`` is False, since downstream
            stages consume the matcher metadata artifacts.
    """
    cfg = self._cfg
    if not cfg.match_enabled:
        raise ValueError(
            "match_enabled=False is not supported because downstream stages "
            "require matcher metadata artifacts."
        )

    if run_llm_proposal is None:
        run_llm_proposal = cfg.run_llm_proposal

    # Lazily construct a clusterer only when clustering is enabled and none
    # was injected; degrade gracefully if the optional dependency is absent.
    clusterer = cfg.clusterer
    if clusterer is None and cfg.clustering_enabled:
        try:
            from ..novelty.clustering.scalable import ScalableClusterer

            clusterer = ScalableClusterer(
                backend=cfg.clustering_backend,
                min_cluster_size=cfg.min_cluster_size,
                # NOTE(review): falls back to min_cluster_size here, while
                # DiscoveryPipeline.__init__ falls back to
                # hdbscan_min_samples -- confirm the divergence is intended.
                min_samples=cfg.clustering_min_samples or cfg.min_cluster_size,
                cluster_selection_epsilon=cfg.clustering_cluster_selection_epsilon,
                umap_metric=cfg.clustering_metric,
            )
        except ImportError:
            from ..utils.logging_config import get_logger

            get_logger(__name__).warning(
                "ScalableClusterer not available; disabling clustering"
            )
            clusterer = None

    # Fixed stage order: match -> OOD -> cluster -> evidence -> proposal.
    stages = [
        MatcherMetadataStage(
            collect_sync=cfg.collect_sync,
            collect_async=cfg.collect_async,
        ),
        OODDetectionStage(
            detector=cfg.detector,
            enabled=cfg.use_novelty_detector,
            ood_strategies=cfg.ood_strategies,
            ood_calibration_mode=cfg.ood_calibration_mode,
            ood_calibration_alpha=cfg.ood_calibration_alpha,
            ood_mahalanobis_mode=cfg.ood_mahalanobis_mode,
        ),
        CommunityDetectionStage(
            clusterer=clusterer,
            enabled=cfg.clustering_enabled,
            similarity_threshold=cfg.similarity_threshold,
            # Clustering needs at least pairs, whatever the config says.
            min_cluster_size=max(2, cfg.min_cluster_size),
            clustering_metric=cfg.clustering_metric,
        ),
        ClusterEvidenceStage(
            enabled=cfg.evidence_enabled,
            max_keywords=cfg.max_keywords,
            max_examples=cfg.max_examples,
            token_budget=cfg.token_budget,
            evidence_method=cfg.evidence_method,
            use_tfidf=cfg.use_tfidf,
        ),
        ProposalStage(
            proposer=cfg.llm_proposer,
            existing_classes_resolver=cfg.existing_classes_resolver
            or (lambda: existing_classes or []),
            enabled=run_llm_proposal,
            context_text=context or cfg.context_text,
            max_retries=cfg.max_retries,
            force_cluster_level=cfg.prefer_cluster_level,
            proposal_mode=cfg.proposal_mode,
            proposal_schema_discovery=cfg.proposal_schema_discovery,
            proposal_schema_max_attributes=cfg.proposal_schema_max_attributes,
            proposal_hierarchical=cfg.proposal_hierarchical,
        ),
    ]

    return PipelineOrchestrator(stages=stages)
from_pipeline_config(config, *, collect_sync=None, collect_async=None, detector=None, clusterer=None, llm_proposer=None, existing_classes_resolver=None) classmethod

Factory to create PipelineBuilder from a PipelineConfig object.

Source code in src/novelentitymatcher/pipeline/pipeline_builder.py
@classmethod
def from_pipeline_config(
    cls,
    config: Any,
    *,
    collect_sync: Callable[[list[str]], tuple[Any, dict]] | None = None,
    collect_async: Callable[[list[str]], Awaitable[tuple[Any, dict]]] | None = None,
    detector: Any = None,
    clusterer: Any = None,
    llm_proposer: Any = None,
    existing_classes_resolver: Callable[[], list[str]] | None = None,
) -> PipelineBuilder:
    """Factory to create PipelineBuilder from a PipelineConfig object.

    Args:
        config: PipelineConfig-like object. Optional attributes are read
            with ``getattr`` defaults so older config objects still work.
        collect_sync: Sync matcher-metadata collector for the match stage.
        collect_async: Async matcher-metadata collector.
        detector: Pre-built novelty detector to inject.
        clusterer: Pre-built clusterer to inject.
        llm_proposer: Pre-built LLM proposer to inject.
        existing_classes_resolver: Callable returning known class names
            for the proposal stage.

    Returns:
        A PipelineBuilder wrapping a fully populated PipelineStageConfig.
    """
    return cls(
        PipelineStageConfig(
            collect_sync=collect_sync,
            collect_async=collect_async,
            match_enabled=config.match_enabled,
            detector=detector,
            clusterer=clusterer,
            llm_proposer=llm_proposer,
            use_novelty_detector=config.ood_enabled,
            clustering_enabled=config.clustering_enabled,
            clustering_backend=config.clustering_backend,
            similarity_threshold=config.similarity_threshold,
            min_cluster_size=config.min_cluster_size,
            clustering_metric=getattr(config, "clustering_metric", "cosine"),
            clustering_min_samples=getattr(config, "clustering_min_samples", None),
            clustering_cluster_selection_epsilon=getattr(
                config, "clustering_cluster_selection_epsilon", 0.0
            ),
            evidence_enabled=config.evidence_enabled,
            evidence_method=getattr(config, "evidence_method", "tfidf"),
            max_keywords=config.max_keywords,
            max_examples=config.max_examples,
            token_budget=config.token_budget,
            use_tfidf=getattr(config, "use_tfidf", None),
            run_llm_proposal=config.proposal_enabled,
            existing_classes_resolver=existing_classes_resolver,
            context_text=None,
            max_retries=config.max_retries,
            prefer_cluster_level=config.prefer_cluster_level,
            ood_strategies=getattr(config, "ood_strategies", None),
            ood_calibration_mode=getattr(config, "ood_calibration_mode", "none"),
            ood_calibration_alpha=getattr(config, "ood_calibration_alpha", 0.1),
            ood_mahalanobis_mode=getattr(
                config, "ood_mahalanobis_mode", "class_conditional"
            ),
            proposal_mode=getattr(config, "proposal_mode", "cluster"),
            proposal_schema_discovery=getattr(
                config, "proposal_schema_discovery", False
            ),
            proposal_schema_max_attributes=getattr(
                config, "proposal_schema_max_attributes", 10
            ),
            proposal_hierarchical=getattr(config, "proposal_hierarchical", True),
        )
    )

novelentitymatcher.pipeline.adapters

Adapters that route existing matcher and discovery capabilities through stages.

Classes

MatcherMetadataStage(collect_sync, collect_async)

Bases: PipelineStage

Collect rich matcher metadata and reference corpus for downstream stages.

Source code in src/novelentitymatcher/pipeline/adapters.py
def __init__(
    self,
    collect_sync: Callable[[list[str]], tuple[Any, dict[Any, Any]]] | None,
    collect_async: Callable[[list[str]], Awaitable[tuple[Any, dict[Any, Any]]]]
    | None,
):
    """Store the sync/async metadata collectors; both are mandatory."""
    missing = collect_sync is None or collect_async is None
    if missing:
        raise ValueError("collect_sync and collect_async must be provided")
    self._collect_sync = collect_sync
    self._collect_async = collect_async

OODDetectionStage(detector, enabled=True, ood_strategies=None, ood_calibration_mode='none', ood_calibration_alpha=0.1, ood_mahalanobis_mode='class_conditional')

Bases: PipelineStage

Run novelty detection against the stable matcher metadata contract.

Source code in src/novelentitymatcher/pipeline/adapters.py
def __init__(
    self,
    detector: Any,
    enabled: bool = True,
    ood_strategies: list[str] | None = None,
    ood_calibration_mode: str = "none",
    ood_calibration_alpha: float = 0.1,
    ood_mahalanobis_mode: str = "class_conditional",
):
    """Capture the detector and the OOD tuning knobs verbatim."""
    self.detector = detector
    self.enabled = enabled
    # Calibration / scoring options are stored as-is for run() to use.
    self.ood_mahalanobis_mode = ood_mahalanobis_mode
    self.ood_calibration_alpha = ood_calibration_alpha
    self.ood_calibration_mode = ood_calibration_mode
    self.ood_strategies = ood_strategies
Functions
run_async(context) async

Async entrypoint; stages can override when they have real async work.

Source code in src/novelentitymatcher/pipeline/contracts.py
async def run_async(self, context: StageContext) -> StageResult:
    """Default async wrapper that delegates to the synchronous ``run``.

    Stages with genuine async work override this method.
    """
    result = self.run(context)
    return result

CommunityDetectionStage(clusterer, *, enabled=True, similarity_threshold=0.75, min_cluster_size=2, clustering_metric='cosine')

Bases: PipelineStage

Cluster likely novel samples into discovery communities.

Source code in src/novelentitymatcher/pipeline/adapters.py
def __init__(
    self,
    clusterer: Any,
    *,
    enabled: bool = True,
    similarity_threshold: float = 0.75,
    min_cluster_size: int = 2,
    clustering_metric: str = "cosine",
):
    """Record the clusterer and community-detection settings verbatim."""
    self.clusterer = clusterer
    self.enabled = enabled
    self.clustering_metric = clustering_metric
    self.min_cluster_size = min_cluster_size
    self.similarity_threshold = similarity_threshold
Functions
run_async(context) async

Async entrypoint; stages can override when they have real async work.

Source code in src/novelentitymatcher/pipeline/contracts.py
async def run_async(self, context: StageContext) -> StageResult:
    """Default async wrapper that delegates to the synchronous ``run``.

    Stages with genuine async work override this method.
    """
    result = self.run(context)
    return result

ClusterEvidenceStage(*, enabled=True, max_keywords=8, max_examples=4, token_budget=256, use_rake=True, evidence_method='tfidf', use_tfidf=None)

Bases: PipelineStage

Extract compact evidence from clusters before proposal generation.

Source code in src/novelentitymatcher/pipeline/adapters.py
def __init__(
    self,
    *,
    enabled: bool = True,
    max_keywords: int = 8,
    max_examples: int = 4,
    token_budget: int = 256,
    use_rake: bool = True,
    evidence_method: str = "tfidf",
    use_tfidf: bool | None = None,
):
    """Store evidence-extraction limits.

    The legacy ``use_tfidf`` flag, when given, overrides
    ``evidence_method`` ("tfidf" vs "centroid"); ``self.use_tfidf`` is
    then re-derived from the effective method.
    """
    self.enabled = enabled
    self.max_keywords = max_keywords
    self.max_examples = max_examples
    self.token_budget = token_budget
    self.use_rake = use_rake
    # The legacy boolean flag wins over the string method name.
    if use_tfidf is None:
        method = evidence_method
    elif use_tfidf:
        method = "tfidf"
    else:
        method = "centroid"
    self.evidence_method = method
    self.use_tfidf = method == "tfidf"
Functions
run_async(context) async

Async entrypoint; stages can override when they have real async work.

Source code in src/novelentitymatcher/pipeline/contracts.py
async def run_async(self, context: StageContext) -> StageResult:
    """Default async wrapper that delegates to the synchronous ``run``.

    Stages with genuine async work override this method.
    """
    result = self.run(context)
    return result

ProposalStage(proposer, existing_classes_resolver, enabled=True, context_text=None, max_retries=2, force_cluster_level=True, proposal_mode='cluster', proposal_schema_discovery=False, proposal_schema_max_attributes=10, proposal_hierarchical=True)

Bases: PipelineStage

Optionally generate class proposals from a novelty report.

Source code in src/novelentitymatcher/pipeline/adapters.py
def __init__(
    self,
    proposer: Any,
    existing_classes_resolver: Callable[[], list[str]],
    enabled: bool = True,
    context_text: str | None = None,
    max_retries: int = 2,
    force_cluster_level: bool = True,
    proposal_mode: str = "cluster",
    proposal_schema_discovery: bool = False,
    proposal_schema_max_attributes: int = 10,
    proposal_hierarchical: bool = True,
):
    """Capture the proposer, class resolver, and proposal options."""
    self.proposer = proposer
    self._existing_classes_resolver = existing_classes_resolver
    self.enabled = enabled
    self.context_text = context_text
    self.max_retries = max_retries
    self.force_cluster_level = force_cluster_level
    # Proposal shaping knobs, stored verbatim.
    self.proposal_hierarchical = proposal_hierarchical
    self.proposal_mode = proposal_mode
    self.proposal_schema_discovery = proposal_schema_discovery
    self.proposal_schema_max_attributes = proposal_schema_max_attributes
Functions
run_async(context) async

Async entrypoint; stages can override when they have real async work.

Source code in src/novelentitymatcher/pipeline/contracts.py
async def run_async(self, context: StageContext) -> StageResult:
    """Default async wrapper that delegates to the synchronous ``run``.

    Stages with genuine async work override this method.
    """
    result = self.run(context)
    return result

novelentitymatcher.pipeline.discovery_support

Shared helpers for novelty-aware match and discovery orchestration.

Classes

Functions

collect_match_result_async(matcher, queries, top_k=5) async

Async helper to collect match result and reference corpus.

Consolidates duplicated logic from NovelEntityMatcher and DiscoveryPipeline.

Parameters:

Name Type Description Default
matcher Any

Matcher instance

required
queries list[str]

List of query texts

required
top_k int

Number of top candidates to retrieve

5

Returns:

Type Description
tuple[MatchResultWithMetadata, dict[str, Any]]

Tuple of (match_result, reference_corpus)

Source code in src/novelentitymatcher/pipeline/discovery_support.py
async def collect_match_result_async(
    matcher: Any,
    queries: list[str],
    top_k: int = 5,
) -> tuple[MatchResultWithMetadata, dict[str, Any]]:
    """Gather a metadata-rich match result plus the reference corpus.

    Prefers the matcher's native ``match_async`` when available; otherwise
    the synchronous ``match`` is pushed onto a worker thread so the event
    loop stays responsive.

    Args:
        matcher: Matcher instance.
        queries: List of query texts.
        top_k: Number of top candidates to retrieve.

    Returns:
        Tuple of (match_result, reference_corpus).
    """
    native_async = getattr(matcher, "match_async", None)
    if callable(native_async):
        match_result = await native_async(
            queries, return_metadata=True, top_k=top_k
        )
    else:
        match_result = await asyncio.to_thread(
            matcher.match, queries, return_metadata=True, top_k=top_k
        )
    return match_result, matcher.get_reference_corpus()

collect_match_result_sync(matcher, queries, top_k=5)

Sync helper to collect match result and reference corpus.

Consolidates duplicated logic from NovelEntityMatcher and DiscoveryPipeline.

Parameters:

Name Type Description Default
matcher Any

Matcher instance

required
queries list[str]

List of query texts

required
top_k int

Number of top candidates to retrieve

5

Returns:

Type Description
tuple[MatchResultWithMetadata, dict[str, Any]]

Tuple of (match_result, reference_corpus)

Source code in src/novelentitymatcher/pipeline/discovery_support.py
def collect_match_result_sync(
    matcher: Any,
    queries: list[str],
    top_k: int = 5,
) -> tuple[MatchResultWithMetadata, dict[str, Any]]:
    """Synchronously gather a metadata-rich match result and the corpus.

    Args:
        matcher: Matcher instance.
        queries: List of query texts.
        top_k: Number of top candidates to retrieve.

    Returns:
        Tuple of (match_result, reference_corpus).
    """
    match_result = matcher.match(queries, return_metadata=True, top_k=top_k)
    corpus = matcher.get_reference_corpus()
    return match_result, corpus

export_pipeline_metrics(*, metrics, format='json', path=None)

Export pipeline metrics to file.

Parameters:

Name Type Description Default
metrics dict[str, Any]

Key-value pairs of metric data.

required
format str

Export format ('json' or 'csv').

'json'
path str | None

Output file path (default: './metrics_{timestamp}.{ext}').

None

Returns:

Type Description
Path

Path to exported metrics file.

Raises:

Type Description
ValueError

If format is not 'json' or 'csv'.

Source code in src/novelentitymatcher/pipeline/discovery_support.py
def export_pipeline_metrics(
    *,
    metrics: dict[str, Any],
    format: str = "json",
    path: str | None = None,
) -> Path:
    """Export pipeline metrics to file.

    Args:
        metrics: Key-value pairs of metric data.
        format: Export format ('json' or 'csv').
        path: Output file path (default: './metrics_{timestamp}.{ext}').

    Returns:
        Path to exported metrics file.

    Raises:
        ValueError: If format is not 'json' or 'csv'.
    """
    if format not in {"json", "csv"}:
        raise ValueError(f"Unsupported format: {format}. Use 'json' or 'csv'.")

    if path is None:
        # Timestamped default filename in the current directory.
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        path = f"metrics_{stamp}.{format}"

    target = Path(path)

    if format == "csv":
        with open(target, "w", newline="", encoding="utf-8") as handle:
            rows = csv.writer(handle)
            rows.writerow(["metric", "value"])
            rows.writerows(metrics.items())
    else:
        payload = {
            "timestamp": datetime.now().isoformat(),
            "metrics": metrics,
        }
        target.write_text(json.dumps(payload, indent=2), encoding="utf-8")

    return target

novelentitymatcher.pipeline.match_result

Stable matcher metadata contracts used by novelty and pipeline internals.

Classes

MatchRecord(text, predicted_id, confidence, embedding, candidates=list(), raw_result=None, metadata=dict(), match_method=None, reference_embedding=None, distance=None) dataclass

Normalized per-query match metadata for downstream discovery stages.

MatchResultWithMetadata(predictions, confidences, embeddings, scores=None, metadata=None, candidate_results=list(), records=list()) dataclass

Enhanced match result with stable downstream metadata.

The legacy attributes (predictions, confidences, embeddings, metadata) remain available, while candidate_results and records provide a consistent contract for novelty and pipeline stages.

Functions

normalize_candidate_results(raw_match_results, num_queries)

Normalize raw matcher outputs into a stable list-of-lists shape.

Source code in src/novelentitymatcher/pipeline/match_result.py
def normalize_candidate_results(
    raw_match_results: Any, num_queries: int
) -> list[list[Any]]:
    """Normalize raw matcher outputs into a stable list-of-lists shape.

    Guarantees one (possibly empty) candidate list per query regardless of
    whether the matcher returned None, a scalar, a flat list, or nested
    lists.
    """
    if raw_match_results is None:
        # No results at all: one empty candidate list per query.
        return [[] for _ in range(num_queries)]

    if num_queries == 1:
        if isinstance(raw_match_results, list):
            is_dict_batch = bool(raw_match_results) and all(
                isinstance(item, dict) for item in raw_match_results
            )
            if is_dict_batch:
                # A flat list of candidate dicts for the single query.
                return [raw_match_results]
            if len(raw_match_results) == 1:
                only = raw_match_results[0]
                if isinstance(only, list):
                    return [list(only)]
                return [[only] if only is not None else []]
        # Any other payload wraps as the single query's candidates.
        return [[raw_match_results] if raw_match_results is not None else []]

    if isinstance(raw_match_results, list):
        shaped: list[list[Any]] = []
        for entry in raw_match_results:
            if entry is None:
                shaped.append([])
            elif isinstance(entry, list):
                shaped.append(list(entry))
            else:
                shaped.append([entry])
        return shaped

    # A single non-list payload is replicated across all queries.
    return [[raw_match_results] for _ in range(num_queries)]

build_match_records(texts, predictions, confidences, embeddings, candidate_results, match_method=None, reference_embeddings=None)

Build normalized per-query records for downstream pipeline stages.

Source code in src/novelentitymatcher/pipeline/match_result.py
def build_match_records(
    texts: Sequence[str],
    predictions: Sequence[str],
    confidences: np.ndarray,
    embeddings: np.ndarray,
    candidate_results: Sequence[Sequence[Any]],
    match_method: str | None = None,
    reference_embeddings: np.ndarray | None = None,
) -> list[MatchRecord]:
    """Build normalized per-query records for downstream pipeline stages.

    One MatchRecord is produced per entry in ``predictions``; out-of-range
    lookups into the other sequences fall back to safe defaults so ragged
    inputs cannot raise.

    Args:
        texts: Query texts, aligned by index with ``predictions``.
        predictions: Predicted entity ids, one per query.
        confidences: Per-query confidence scores (0.0 when missing).
        embeddings: Per-query embedding vectors.
        candidate_results: Per-query candidate lists.
        match_method: Optional label for the matching method used.
        reference_embeddings: Optional matched-reference embeddings; when
            present, a cosine distance is computed per query.

    Returns:
        List of MatchRecord, one per prediction.
    """
    records: list[MatchRecord] = []
    for idx, prediction in enumerate(predictions):
        text = texts[idx] if idx < len(texts) else ""
        candidates = (
            list(candidate_results[idx]) if idx < len(candidate_results) else []
        )
        # raw_result mirrors the matcher's original shape: a list when
        # multiple candidates exist, the bare candidate when one, else None.
        raw_result = (
            candidates
            if len(candidates) > 1
            else (candidates[0] if candidates else None)
        )
        distance: float | None = None
        ref_emb: np.ndarray | None = None
        if reference_embeddings is not None and idx < len(reference_embeddings):
            ref_emb = reference_embeddings[idx]
            if ref_emb is not None and idx < len(embeddings):
                # Cosine distance, guarded against near-zero norms.
                norm_prod = np.linalg.norm(embeddings[idx]) * np.linalg.norm(ref_emb)
                if norm_prod > 1e-12:
                    distance = float(1.0 - np.dot(embeddings[idx], ref_emb) / norm_prod)
        records.append(
            MatchRecord(
                text=text,
                predicted_id=str(prediction),
                confidence=float(confidences[idx]) if idx < len(confidences) else 0.0,
                embedding=embeddings[idx],
                candidates=candidates,
                raw_result=raw_result,
                metadata={"index": idx},
                match_method=match_method,
                reference_embedding=ref_emb,
                distance=distance,
            )
        )
    return records

build_match_result_with_metadata(texts, predictions, confidences, embeddings, raw_match_results, metadata=None, scores=None, match_method=None)

Create a stable metadata result from matcher outputs.

Source code in src/novelentitymatcher/pipeline/match_result.py
def build_match_result_with_metadata(
    texts: Sequence[str],
    predictions: Sequence[str],
    confidences: np.ndarray,
    embeddings: np.ndarray,
    raw_match_results: Any,
    metadata: dict[str, Any] | None = None,
    scores: np.ndarray | None = None,
    match_method: str | None = None,
) -> MatchResultWithMetadata:
    """Create a stable metadata result from matcher outputs.

    Normalizes the raw candidate payload, enriches the metadata dict
    without overwriting caller-supplied keys, and builds per-query
    records for downstream stages.
    """
    candidates = normalize_candidate_results(raw_match_results, len(predictions))

    # setdefault keeps any caller-provided values untouched.
    enriched = dict(metadata or {})
    enriched.setdefault("texts", list(texts))
    enriched.setdefault("raw_match_results", raw_match_results)
    enriched.setdefault("candidate_results", candidates)
    if match_method is not None:
        enriched.setdefault("match_method", match_method)

    per_query_records = build_match_records(
        texts=texts,
        predictions=predictions,
        confidences=confidences,
        embeddings=embeddings,
        candidate_results=candidates,
        match_method=match_method,
    )

    return MatchResultWithMetadata(
        predictions=list(predictions),
        confidences=confidences,
        embeddings=embeddings,
        scores=scores,
        metadata=enriched,
        candidate_results=candidates,
        records=per_query_records,
    )

convert_match_result_to_metadata(match_result, embeddings, confidences=None)

Convert standard match result to metadata-enhanced result.

Source code in src/novelentitymatcher/pipeline/match_result.py
def convert_match_result_to_metadata(
    match_result: Any,
    embeddings: np.ndarray,
    confidences: np.ndarray | None = None,
) -> MatchResultWithMetadata:
    """Convert a standard match result into a metadata-enhanced result.

    Accepts a single dict, a list of dicts, a list of scalars, or any
    other scalar payload, then delegates to
    ``build_match_result_with_metadata``.
    """
    if isinstance(match_result, dict):
        # Single candidate dict for a single query.
        predictions = [match_result.get("id", "unknown")]
        scores = np.array([match_result.get("score", 0.0)])
        raw = [match_result]
    elif isinstance(match_result, list):
        if all(isinstance(entry, dict) for entry in match_result):
            predictions = [entry.get("id", "unknown") for entry in match_result]
            scores = np.array([entry.get("score", 0.0) for entry in match_result])
        else:
            predictions = [str(entry) for entry in match_result]
            scores = None
        raw = match_result
    else:
        predictions = [str(match_result)]
        scores = None
        raw = [match_result]

    # Default to full confidence when none supplied.
    if confidences is None:
        confidences = np.ones(len(predictions))

    return build_match_result_with_metadata(
        texts=[""] * len(predictions),
        predictions=predictions,
        confidences=confidences,
        embeddings=embeddings,
        raw_match_results=raw,
        scores=scores,
    )