Exceptions & Config

novelentitymatcher.exceptions

Custom exceptions for novel_entity_matcher with helpful context and suggestions.

Classes

SemanticMatcherError

Bases: Exception

Base exception for all novel_entity_matcher errors.

ValidationError(message, *, entity=None, field=None, suggestion=None)

Bases: ValueError, SemanticMatcherError

Raised when input validation fails with helpful context.

Attributes:

- entity: The entity that failed validation (if applicable)
- field: The specific field that failed validation
- suggestion: Helpful suggestion for fixing the error

Source code in src/novelentitymatcher/exceptions.py
def __init__(
    self,
    message: str,
    *,
    entity: dict[str, Any] | None = None,
    field: str | None = None,
    suggestion: str | None = None,
):
    self.raw_message = message
    self.entity = entity
    self.field = field
    self.suggestion = suggestion
    super().__init__(self._format_message())
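
The shared SemanticMatcherError base lets callers catch every library error with one except clause while still keeping standard-library compatibility. A standalone sketch of the pattern (the `_format_message` body here is an assumption; the real implementation may format differently):

```python
class SemanticMatcherError(Exception):
    """Base exception for all matcher errors."""


class ValidationError(ValueError, SemanticMatcherError):
    """Input validation failure with helpful context."""

    def __init__(self, message, *, entity=None, field=None, suggestion=None):
        self.raw_message = message
        self.entity = entity
        self.field = field
        self.suggestion = suggestion
        super().__init__(self._format_message())

    def _format_message(self):
        # Hypothetical formatting: append field and suggestion when present.
        parts = [self.raw_message]
        if self.field is not None:
            parts.append(f"(field: {self.field})")
        if self.suggestion is not None:
            parts.append(f"Suggestion: {self.suggestion}")
        return " ".join(parts)


try:
    raise ValidationError("entity missing 'id'", field="id", suggestion="add an 'id' key")
except SemanticMatcherError as err:
    caught = err
    print(caught)
```

Because ValidationError also inherits from ValueError, existing code that catches plain ValueError keeps working.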

TrainingError(message, *, training_mode=None, details=None)

Bases: RuntimeError, SemanticMatcherError

Raised when training fails with diagnostic information.

Attributes:

- training_mode: The mode that was being trained
- details: Additional diagnostic information

Source code in src/novelentitymatcher/exceptions.py
def __init__(
    self,
    message: str,
    *,
    training_mode: str | None = None,
    details: dict[str, Any] | None = None,
):
    self.raw_message = message
    self.training_mode = training_mode
    self.details = details or {}
    super().__init__(self._format_message())

MatchingError

Bases: RuntimeError, SemanticMatcherError

Raised when matching operations fail.

ModeError(message, *, invalid_mode=None, valid_modes=None)

Bases: ValueError, SemanticMatcherError

Raised when matcher mode configuration is invalid.

Attributes:

- invalid_mode: The mode that was provided
- valid_modes: List of valid mode options

Source code in src/novelentitymatcher/exceptions.py
def __init__(
    self,
    message: str,
    *,
    invalid_mode: str | None = None,
    valid_modes: list[str] | None = None,
):
    self.raw_message = message
    self.invalid_mode = invalid_mode
    self.valid_modes = valid_modes or [
        "zero-shot",
        "head-only",
        "full",
        "hybrid",
        "auto",
    ]
    super().__init__(self._format_message())

LLMError(message, *, last_error=None, attempted_models=None)

Bases: SemanticMatcherError

Raised when LLM operations fail after all retries.

Attributes:

- last_error: The last exception that caused all models to fail
- attempted_models: List of models that were attempted

Source code in src/novelentitymatcher/exceptions.py
def __init__(
    self,
    message: str,
    *,
    last_error: Exception | None = None,
    attempted_models: list[str] | None = None,
):
    self.raw_message = message
    self.last_error = last_error
    self.attempted_models = attempted_models or []
    super().__init__(self._format_message())

novelentitymatcher.config

Classes

Config(custom_path=None)

Configuration loader with optional custom override merging.

Source code in src/novelentitymatcher/config.py
def __init__(self, custom_path: PathLike | None = None):
    self._config: dict[str, Any] = self._load_default_config()
    if custom_path:
        self._merge_custom_config(custom_path)
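
A sketch of the override merging such a loader typically performs; the `deep_merge` helper and its recursion rule are assumptions about what `_merge_custom_config` does, not the library's actual code:

```python
def deep_merge(base: dict, override: dict) -> dict:
    """Return base updated with override, merging nested dicts recursively."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


default = {"model": {"name": "distilbert-base-uncased", "max_length": 128}, "threshold": 0.7}
custom = {"model": {"max_length": 256}}
print(deep_merge(default, custom))
# {'model': {'name': 'distilbert-base-uncased', 'max_length': 256}, 'threshold': 0.7}
```

The point of a deep merge rather than `dict.update` is that a custom file overriding one nested key leaves its siblings from the default config intact.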

novelentitymatcher.config_registry

novelentitymatcher.api

Single import surface for the novel_entity_matcher public API.

Usage

from novelentitymatcher.api import *

or selective imports:

from novelentitymatcher.api import (
    Matcher,
    NovelEntityMatcher,
    DiscoveryPipeline,
    PipelineConfig,
    DetectionConfig,
    NovelSampleMetadata,
    DiscoveryCluster,
    ClassProposal,
)

Classes

BERTClassifier(labels, model_name='distilbert-base-uncased', num_epochs=3, batch_size=16, learning_rate=2e-05, max_length=128, use_fp16=True)

BERT-based text classifier using transformers library.

This classifier provides a drop-in alternative to SetFitClassifier with an identical interface. It uses fine-tuned BERT models for classification, offering superior accuracy for complex pattern-driven tasks.

Example

from novelentitymatcher.core.bert_classifier import BERTClassifier

labels = ["DE", "FR", "US"]
clf = BERTClassifier(labels=labels, model_name="distilbert-base-uncased")
training_data = [
    {"text": "Germany", "label": "DE"},
    {"text": "France", "label": "FR"},
    {"text": "USA", "label": "US"},
]
clf.train(training_data, num_epochs=3)
prediction = clf.predict("Deutschland")   # "DE"
proba = clf.predict_proba("Deutschland")  # [0.02, 0.01, 0.97]

Parameters:

- labels (list[str], required): List of class labels for classification.
- model_name (str, default 'distilbert-base-uncased'): HuggingFace model name or path.
- num_epochs (int, default 3): Number of training epochs.
- batch_size (int, default 16): Training batch size.
- learning_rate (float, default 2e-05): Learning rate for training.
- max_length (int, default 128): Maximum sequence length for tokenization.
- use_fp16 (bool, default True): Whether to use mixed precision training (faster, less memory). Only works on GPU.
Source code in src/novelentitymatcher/core/bert_classifier.py
def __init__(
    self,
    labels: list[str],
    model_name: str = "distilbert-base-uncased",
    num_epochs: int = 3,
    batch_size: int = 16,
    learning_rate: float = 2e-5,
    max_length: int = 128,
    use_fp16: bool = True,
):
    """Initialize BERTClassifier.

    Args:
        labels: List of class labels for classification.
        model_name: HuggingFace model name or path. Default: "distilbert-base-uncased".
        num_epochs: Number of training epochs. Default: 3.
        batch_size: Training batch size. Default: 16.
        learning_rate: Learning rate for training. Default: 2e-5.
        max_length: Maximum sequence length for tokenization. Default: 128.
        use_fp16: Whether to use mixed precision training (faster, less memory).
            Only works on GPU. Default: True.
    """
    if not TRANSFORMERS_AVAILABLE:
        raise ImportError(
            "transformers is required for BERTClassifier. "
            "Install with: pip install transformers torch"
        )

    self.labels = labels
    self.label2id = {label: idx for idx, label in enumerate(labels)}
    self.id2label = {idx: label for label, idx in self.label2id.items()}
    self.model_name = model_name
    self.num_epochs = num_epochs
    self.batch_size = batch_size
    self.learning_rate = learning_rate
    self.max_length = max_length
    self.use_fp16 = use_fp16

    self.model: Any | None = None
    self.tokenizer: Any | None = None
    self.is_trained = False
    self.logger = get_logger(__name__)
Functions
train(training_data, num_epochs=None, batch_size=None, show_progress=True)

Train the BERT classifier.

Parameters:

- training_data (list[dict], required): List of training examples with 'text' and 'label' keys.
- num_epochs (int | None, default None): Number of training epochs (overrides default).
- batch_size (int | None, default None): Batch size for training (overrides default).
- show_progress (bool, default True): Whether to show progress bar during training.

Raises:

- TrainingError: If training fails or data is invalid.

Source code in src/novelentitymatcher/core/bert_classifier.py
def train(
    self,
    training_data: list[dict],
    num_epochs: int | None = None,
    batch_size: int | None = None,
    show_progress: bool = True,
):
    """Train the BERT classifier.

    Args:
        training_data: List of training examples with 'text' and 'label' keys.
        num_epochs: Number of training epochs (overrides default).
        batch_size: Batch size for training (overrides default).
        show_progress: Whether to show progress bar during training.

    Raises:
        TrainingError: If training fails or data is invalid.
    """
    # Suppress third-party library logs
    suppress_third_party_loggers()

    epochs = num_epochs or self.num_epochs
    batch = batch_size or self.batch_size

    # Initialize tokenizer and model
    try:
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.model_name, use_fast=True
        )
        self.model = AutoModelForSequenceClassification.from_pretrained(
            self.model_name,
            num_labels=len(self.labels),
            id2label=self.id2label,
            label2id=self.label2id,
        )
    except (OSError, ValueError, KeyError, RuntimeError) as e:
        raise TrainingError(
            f"Failed to load model/tokenizer: {e}",
            details={"model_name": self.model_name},
        ) from e

    # Prepare dataset
    try:
        dataset = Dataset.from_list(training_data)

        # Tokenize data
        tokenizer = self.tokenizer

        def tokenize_function(examples):
            return tokenizer(
                examples["text"],
                padding="max_length",
                truncation=True,
                max_length=self.max_length,
            )

        tokenized_dataset = dataset.map(tokenize_function, batched=True)

        # Convert string labels to numeric IDs
        def format_labels(example):
            example["label"] = self.label2id[example["label"]]
            return example

        tokenized_dataset = tokenized_dataset.map(format_labels)

        # Remove text column as it's not needed for training
        tokenized_dataset = tokenized_dataset.remove_columns(["text"])
        tokenized_dataset = tokenized_dataset.rename_column("label", "labels")

        # Set format for PyTorch
        tokenized_dataset.set_format("torch")

    except (OSError, ValueError, KeyError, RuntimeError) as e:
        raise TrainingError(
            f"Failed to prepare training data: {e}",
            details={"num_examples": len(training_data)},
        ) from e

    # Determine if we should use fp16 (disable for MPS due to compatibility)
    use_fp16 = self.use_fp16
    if use_fp16:
        try:
            import torch

            # Disable fp16 on MPS (Apple Silicon) due to PyTorch version requirements
            if torch.backends.mps.is_available():
                import warnings

                warnings.warn(
                    "Disabling fp16 on MPS (Apple Silicon) due to compatibility. "
                    "This may slightly slow down training but will not affect accuracy.",
                    stacklevel=2,
                )
                use_fp16 = False
        except ImportError:
            use_fp16 = False

    # Training arguments
    training_args = TrainingArguments(
        output_dir=f".tmp/bert_classifier_{id(self)}",
        num_train_epochs=epochs,
        per_device_train_batch_size=batch,
        learning_rate=self.learning_rate,
        weight_decay=0.01,
        logging_dir=None,  # Suppress transformer logs
        logging_steps=50,
        save_strategy="no",  # Don't save checkpoints during training
        report_to="none",  # Disable wandb/tensorboard
        fp16=use_fp16,
        load_best_model_at_end=False,
    )

    # Initialize trainer
    trainer = Trainer(
        model=self.model,
        args=training_args,
        train_dataset=tokenized_dataset,
    )

    # Train with optional progress tracking
    use_tqdm = False
    if show_progress:
        try:
            from tqdm.auto import tqdm

            use_tqdm = True
        except ImportError:
            # tqdm not available, training will be silent
            pass

    if use_tqdm:
        # Wrap training with tqdm progress bar
        with tqdm(total=epochs, desc="Training BERT", unit="epoch") as pbar:
            # Store original train method
            original_train = trainer.train

            # Wrap train method to update progress bar
            def train_with_progress(*args_train, **kwargs_train):
                result = original_train(*args_train, **kwargs_train)
                pbar.update(epochs)
                return result

            trainer.train = train_with_progress
            trainer.train()
    else:
        # Silent training
        trainer.train()

    self.is_trained = True
predict(texts)

Predict labels for input text(s).

Parameters:

- texts (str | list[str], required): Single text string or list of text strings.

Returns:

- str | list[str]: Predicted label(s). A single input string returns a single label; a list input returns a list of labels.

Raises:

- TrainingError: If model is not trained yet.

Source code in src/novelentitymatcher/core/bert_classifier.py
def predict(self, texts: str | list[str]) -> str | list[str]:
    """Predict labels for input text(s).

    Args:
        texts: Single text string or list of text strings.

    Returns:
        Predicted label(s). If input is single string, returns single label.
        If input is list, returns list of labels.

    Raises:
        TrainingError: If model is not trained yet.
    """
    if not self.is_trained or self.model is None or self.tokenizer is None:
        raise TrainingError(
            "Model not trained. Call train() first.",
            details={"model_name": self.model_name},
        )

    single_input = isinstance(texts, str)
    if single_input:
        texts_list: list[str] = [texts]  # type: ignore[list-item]
    else:
        texts_list = texts  # type: ignore[assignment]

    # Tokenize
    tokenizer = self.tokenizer
    inputs = tokenizer(
        texts_list,
        padding=True,
        truncation=True,
        max_length=self.max_length,
        return_tensors="pt",
    )

    # Move to same device as model
    device = next(self.model.parameters()).device
    inputs = {k: v.to(device) for k, v in inputs.items()}

    # Predict
    with torch.no_grad():
        outputs = self.model(**inputs)
        predictions = outputs.logits.argmax(dim=-1)

    # Convert to labels
    predicted_labels = [self.id2label[pred.item()] for pred in predictions]

    if single_input:
        return predicted_labels[0]
    return predicted_labels
predict_proba(text)

Get prediction probabilities for all labels.

Parameters:

- text (str, required): Input text string.

Returns:

- ndarray: NumPy array of probabilities for each label, in the same order as self.labels.

Raises:

- TrainingError: If model is not trained yet.

Source code in src/novelentitymatcher/core/bert_classifier.py
def predict_proba(self, text: str) -> np.ndarray:
    """Get prediction probabilities for all labels.

    Args:
        text: Input text string.

    Returns:
        NumPy array of probabilities for each label, in same order as self.labels.

    Raises:
        TrainingError: If model is not trained yet.
    """
    if not self.is_trained or self.model is None or self.tokenizer is None:
        raise TrainingError(
            "Model not trained. Call train() first.",
            details={"model_name": self.model_name},
        )

    # Tokenize
    inputs = self.tokenizer(
        [text],
        padding=True,
        truncation=True,
        max_length=self.max_length,
        return_tensors="pt",
    )

    # Move to same device as model
    device = next(self.model.parameters()).device
    inputs = {k: v.to(device) for k, v in inputs.items()}

    # Predict with probabilities
    with torch.no_grad():
        outputs = self.model(**inputs)
        probs = torch.nn.functional.softmax(outputs.logits, dim=-1)

    return probs.cpu().numpy()[0]
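
The softmax over logits that predict_proba applies can be reproduced in plain NumPy (illustrative logits; shifting by the max is the standard trick to avoid overflow in exp):

```python
import numpy as np

def softmax(logits: np.ndarray) -> np.ndarray:
    """Convert raw logits into probabilities that sum to 1."""
    shifted = logits - logits.max()  # numerical stability: avoid overflow in exp
    exp = np.exp(shifted)
    return exp / exp.sum()

logits = np.array([1.0, 0.5, 3.0])
probs = softmax(logits)
assert abs(probs.sum() - 1.0) < 1e-9
assert probs.argmax() == 2  # the highest logit keeps the highest probability
```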
save(path)

Save the trained model and tokenizer.

Parameters:

- path (str, required): Directory path to save the model.

Raises:

- TrainingError: If model is not trained yet.

Source code in src/novelentitymatcher/core/bert_classifier.py
def save(self, path: str):
    """Save the trained model and tokenizer.

    Args:
        path: Directory path to save the model.

    Raises:
        TrainingError: If model is not trained yet.
    """
    if not self.is_trained or self.model is None or self.tokenizer is None:
        raise TrainingError(
            "Model not trained. Call train() first.",
            details={"model_name": self.model_name},
        )

    save_path = Path(path)
    save_path.mkdir(parents=True, exist_ok=True)

    self.model.save_pretrained(save_path)
    self.tokenizer.save_pretrained(save_path)

    # Save labels
    labels_path = save_path / "labels.txt"
    with open(labels_path, "w") as f:
        f.write("\n".join(self.labels))
load(path) classmethod

Load a trained BERTClassifier from disk.

Parameters:

- path (str, required): Directory path containing the saved model.

Returns:

- BERTClassifier: Loaded BERTClassifier instance.

Source code in src/novelentitymatcher/core/bert_classifier.py
@classmethod
def load(cls, path: str) -> "BERTClassifier":
    """Load a trained BERTClassifier from disk.

    Args:
        path: Directory path containing the saved model.

    Returns:
        Loaded BERTClassifier instance.
    """
    load_path = Path(path)

    # Load labels
    labels_path = load_path / "labels.txt"
    if not labels_path.exists():
        raise FileNotFoundError(f"Labels file not found at {labels_path}")

    with open(labels_path) as f:
        labels = f.read().splitlines()

    # Initialize classifier
    clf = cls(labels=labels)

    # Load model and tokenizer
    clf.tokenizer = AutoTokenizer.from_pretrained(load_path)
    clf.model = AutoModelForSequenceClassification.from_pretrained(load_path)
    clf.is_trained = True

    return clf
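
The labels.txt round trip that save() and load() rely on is just a newline-joined file; a standalone sketch of that contract:

```python
import tempfile
from pathlib import Path

labels = ["DE", "FR", "US"]

with tempfile.TemporaryDirectory() as tmp:
    save_path = Path(tmp)

    # save(): write one label per line
    (save_path / "labels.txt").write_text("\n".join(labels))

    # load(): read them back in the same order
    loaded = (save_path / "labels.txt").read_text().splitlines()

assert loaded == labels
```

Order matters here: the label list rebuilds label2id/id2label, so a reordered file would silently remap every prediction.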

EmbeddingMatcher(entities, model_name='sentence-transformers/paraphrase-mpnet-base-v2', threshold=0.7, normalize=True, embedding_dim=None, cache=None)

Embedding-based similarity matching without training.

Source code in src/novelentitymatcher/core/embedding_matcher.py
def __init__(
    self,
    entities: list[dict[str, Any]],
    model_name: str = "sentence-transformers/paraphrase-mpnet-base-v2",
    threshold: float = 0.7,
    normalize: bool = True,
    embedding_dim: int | None = None,
    cache: ModelCache | None = None,
):
    validate_entities(entities)
    validate_model_name(model_name)

    self.entities = entities
    self.model_name = model_name
    self.threshold = validate_threshold(threshold)
    self.normalize = normalize
    self.embedding_dim = embedding_dim

    self.normalizer = TextNormalizer() if normalize else None
    self.cache = cache if cache is not None else get_default_cache()
    self.model: EmbeddingModel | None = None
    self.entity_texts: list[str] = []
    self.entity_ids: list[str] = []
    self.embeddings: np.ndarray | None = None
    self._async_executor: Any | None = None
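
EmbeddingMatcher's core decision reduces to cosine similarity against the configured threshold. A minimal NumPy sketch of that decision (the actual scoring pipeline in embedding_matcher.py is an assumption; toy 3-d vectors stand in for sentence embeddings):

```python
import numpy as np

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine of the angle between two vectors, in [-1, 1]."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

threshold = 0.7  # matches the constructor default above
query = np.array([1.0, 0.0, 1.0])
entity = np.array([0.9, 0.1, 1.1])

score = cosine_similarity(query, entity)
is_match = score >= threshold
```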

HierarchicalScoring(hierarchy_index, alpha=0.7, beta=0.3)

Calculate hierarchy-aware confidence scores.

Combines:

- Semantic similarity (cosine similarity of embeddings)
- Hierarchical proximity boost (based on relationship type)
- Depth penalty (deeper relationships = lower scores)

Parameters:

- hierarchy_index (HierarchyIndex, required): HierarchyIndex for graph operations
- alpha (float, default 0.7): Weight for semantic similarity (0-1)
- beta (float, default 0.3): Weight for hierarchical boost (0-1)
Source code in src/novelentitymatcher/core/hierarchy.py
def __init__(
    self, hierarchy_index: HierarchyIndex, alpha: float = 0.7, beta: float = 0.3
):
    """
    Initialize hierarchical scorer.

    Args:
        hierarchy_index: HierarchyIndex for graph operations
        alpha: Weight for semantic similarity (0-1)
        beta: Weight for hierarchical boost (0-1)
    """
    self.hierarchy = hierarchy_index
    self.alpha = alpha
    self.beta = beta
Functions
compute_score(query_embedding, entity_embedding, entity_id, relationship_type='self', depth=0)

Compute hierarchical score combining semantic and hierarchical features.

Formula:

final_score = (
    semantic_similarity * alpha
    + hierarchical_boost * beta
) * depth_penalty

Parameters:

- query_embedding (ndarray, required): Query text embedding
- entity_embedding (ndarray, required): Entity text embedding
- entity_id (str, required): Entity identifier
- relationship_type (str, default 'self'): One of "self", "parent", "child", "ancestor", "descendant"
- depth (int, default 0): Relationship depth (0=self, 1=direct, etc.)

Returns:

- float: Final hierarchical score (0-1)

Source code in src/novelentitymatcher/core/hierarchy.py
def compute_score(
    self,
    query_embedding: np.ndarray,
    entity_embedding: np.ndarray,
    entity_id: str,
    relationship_type: str = "self",
    depth: int = 0,
) -> float:
    """
    Compute hierarchical score combining semantic and hierarchical features.

    Formula:
        final_score = (
            semantic_similarity * alpha +
            hierarchical_boost * beta
        ) * depth_penalty

    Args:
        query_embedding: Query text embedding
        entity_embedding: Entity text embedding
        entity_id: Entity identifier
        relationship_type: "self", "parent", "child", "ancestor", "descendant"
        depth: Relationship depth (0=self, 1=direct, etc.)

    Returns:
        Final hierarchical score (0-1)
    """
    # Compute semantic similarity
    semantic_score = self._compute_semantic_similarity(
        query_embedding, entity_embedding
    )

    # Get hierarchical boost for this relationship type
    hierarchical_boost = self._get_hierarchical_boost(relationship_type)

    # Get depth penalty
    depth_penalty = self.DEPTH_PENALTIES.get(depth, 0.4)

    # Combine scores
    final_score = (
        semantic_score * self.alpha + hierarchical_boost * self.beta
    ) * depth_penalty

    return float(final_score)
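
A worked instance of the formula with the default weights. The penalty table below is a hypothetical stand-in for DEPTH_PENALTIES (only the 0.4 fallback is visible in the source above), and the similarity/boost values are illustrative:

```python
# Hypothetical penalty table; the real values live in hierarchy.py.
DEPTH_PENALTIES = {0: 1.0, 1: 0.8, 2: 0.6}

alpha, beta = 0.7, 0.3     # default weights
semantic_score = 0.9       # cosine similarity of the two embeddings
hierarchical_boost = 0.5   # boost for, say, a "parent" relationship
depth = 1                  # direct parent/child

final_score = (
    semantic_score * alpha + hierarchical_boost * beta
) * DEPTH_PENALTIES.get(depth, 0.4)

print(round(final_score, 3))  # 0.624
```

Note how the depth penalty multiplies the whole weighted sum, so even a perfect semantic match decays as the relationship gets more distant.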

HierarchyIndex(entities)

Graph-based index for hierarchical entity relationships.

Supports:

- Multi-parent hierarchies (DAG structure)
- Weighted edges for relationship strength
- Fast ancestor/descendant queries
- Path finding and depth calculation

Parameters:

- entities (list[dict[str, Any]], required): List of entity dicts with an optional 'hierarchy' key. Hierarchy format:

      {
          'parents': ['parent_id1', 'parent_id2'],
          'children': ['child_id1', 'child_id2'],
          'level': int,
          'weights': {'parent_id': float}
      }
Source code in src/novelentitymatcher/core/hierarchy.py
def __init__(self, entities: list[dict[str, Any]]):
    """
    Build hierarchy index from entity definitions.

    Args:
        entities: List of entity dicts with optional 'hierarchy' key
                 hierarchy format: {
                     'parents': ['parent_id1', 'parent_id2'],
                     'children': ['child_id1', 'child_id2'],
                     'level': int,
                     'weights': {'parent_id': float}
                 }
    """
    self.entities = {e["id"]: e for e in entities}
    self.graph: Any = nx.DiGraph()
    self._build_graph()
    self._cache: dict[str, Any] = {}
Functions
get_ancestors(entity_id, max_depth=None)

Get all ancestor entities for a given entity.

Parameters:

- entity_id (str, required): Entity to find ancestors for
- max_depth (int | None, default None): Maximum depth to traverse (None = unlimited)

Returns:

- list[str]: List of ancestor entity IDs

Source code in src/novelentitymatcher/core/hierarchy.py
def get_ancestors(self, entity_id: str, max_depth: int | None = None) -> list[str]:
    """
    Get all ancestor entities for a given entity.

    Args:
        entity_id: Entity to find ancestors for
        max_depth: Maximum depth to traverse (None = unlimited)

    Returns:
        List of ancestor entity IDs
    """
    return self._bfs_traverse(entity_id, max_depth, self.graph.predecessors)
get_descendants(entity_id, max_depth=None)

Get all descendant entities for a given entity.

Parameters:

- entity_id (str, required): Entity to find descendants for
- max_depth (int | None, default None): Maximum depth to traverse (None = unlimited)

Returns:

- list[str]: List of descendant entity IDs

Source code in src/novelentitymatcher/core/hierarchy.py
def get_descendants(
    self, entity_id: str, max_depth: int | None = None
) -> list[str]:
    """
    Get all descendant entities for a given entity.

    Args:
        entity_id: Entity to find descendants for
        max_depth: Maximum depth to traverse (None = unlimited)

    Returns:
        List of descendant entity IDs
    """
    return self._bfs_traverse(entity_id, max_depth, self.graph.successors)
get_relationship_depth(entity_a, entity_b)

Calculate the depth of relationship between two entities.

Parameters:

- entity_a (str, required): First entity ID
- entity_b (str, required): Second entity ID

Returns:

- int: Depth (0 = same entity, 1 = direct parent/child, 2 = grandparent, etc.). Returns -1 if no relationship is found.

Source code in src/novelentitymatcher/core/hierarchy.py
def get_relationship_depth(self, entity_a: str, entity_b: str) -> int:
    """
    Calculate the depth of relationship between two entities.

    Args:
        entity_a: First entity ID
        entity_b: Second entity ID

    Returns:
        Depth (0 = same entity, 1 = direct parent/child, 2 = grandparent, etc.)
        Returns -1 if no relationship found
    """
    if entity_a == entity_b:
        return 0

    if entity_a not in self.graph or entity_b not in self.graph:
        return -1

    try:
        # Try to find shortest path in the directed graph
        path = nx.shortest_path(self.graph, entity_a, entity_b)
        return len(path) - 1
    except nx.NetworkXNoPath:
        # Try reverse direction (child to parent)
        try:
            path = nx.shortest_path(self.graph, entity_b, entity_a)
            return len(path) - 1
        except nx.NetworkXNoPath:
            return -1
get_path(from_entity, to_entity)

Get shortest path between two entities in the hierarchy.

Parameters:

- from_entity (str, required): Starting entity ID
- to_entity (str, required): Ending entity ID

Returns:

- list[str]: List of entity IDs representing the path (inclusive). Returns an empty list if no path exists.

Source code in src/novelentitymatcher/core/hierarchy.py
def get_path(self, from_entity: str, to_entity: str) -> list[str]:
    """
    Get shortest path between two entities in the hierarchy.

    Args:
        from_entity: Starting entity ID
        to_entity: Ending entity ID

    Returns:
        List of entity IDs representing the path (inclusive)
        Returns empty list if no path exists
    """
    try:
        return nx.shortest_path(self.graph, from_entity, to_entity)
    except (nx.NetworkXNoPath, nx.NodeNotFound):
        return []
is_ancestor(ancestor_id, descendant_id)

Check if ancestor_id is an ancestor of descendant_id.

Parameters:

- ancestor_id (str, required): Potential ancestor
- descendant_id (str, required): Potential descendant

Returns:

- bool: True if ancestor_id is an ancestor of descendant_id

Source code in src/novelentitymatcher/core/hierarchy.py
def is_ancestor(self, ancestor_id: str, descendant_id: str) -> bool:
    """
    Check if ancestor_id is an ancestor of descendant_id.

    Args:
        ancestor_id: Potential ancestor
        descendant_id: Potential descendant

    Returns:
        True if ancestor_id is an ancestor of descendant_id
    """
    if ancestor_id == descendant_id:
        return False

    ancestors = self.get_ancestors(descendant_id)
    return ancestor_id in ancestors

HDBSCANBackend(min_samples=5, cluster_selection_epsilon=0.0, metric='cosine', prediction_data=True)

Bases: ClusteringBackend

HDBSCAN clustering backend.

Source code in src/novelentitymatcher/novelty/clustering/backends.py
def __init__(
    self,
    min_samples: int = 5,
    cluster_selection_epsilon: float = 0.0,
    metric: str = "cosine",
    prediction_data: bool = True,
):
    self.min_samples = min_samples
    self.cluster_selection_epsilon = cluster_selection_epsilon
    self.metric = metric
    self.prediction_data = prediction_data
    self._clusterer: Any = None

SOPTICSBackend(min_samples=5, metric='cosine')

Bases: ClusteringBackend

sOPTICS (LSH-accelerated OPTICS) clustering backend.

Source code in src/novelentitymatcher/novelty/clustering/backends.py
def __init__(
    self,
    min_samples: int = 5,
    metric: str = "cosine",
):
    self.min_samples = min_samples
    self.metric = metric

UMAPHDBSCANBackend(min_samples=5, cluster_selection_epsilon=0.0, n_neighbors=15, umap_dim=10, umap_metric='cosine', prediction_data=True)

Bases: ClusteringBackend

UMAP preprocessing followed by HDBSCAN clustering backend.

Source code in src/novelentitymatcher/novelty/clustering/backends.py
def __init__(
    self,
    min_samples: int = 5,
    cluster_selection_epsilon: float = 0.0,
    n_neighbors: int = 15,
    umap_dim: int = 10,
    umap_metric: str = "cosine",
    prediction_data: bool = True,
):
    self.min_samples = min_samples
    self.cluster_selection_epsilon = cluster_selection_epsilon
    self.n_neighbors = n_neighbors
    self.umap_dim = umap_dim
    self.umap_metric = umap_metric
    self.prediction_data = prediction_data
    self._umap_model: Any = None
    self._clusterer: Any = None

ScalableClusterer(backend='auto', min_cluster_size=5, min_samples=5, cluster_selection_epsilon=0.0, n_neighbors=15, umap_dim=10, umap_metric='cosine', prediction_data=True)

Wrapper for scalable density-based clustering.

Supports:

- HDBSCAN: Standard hierarchical DBSCAN (best for <100K points)
- sOPTICS: LSH-accelerated OPTICS (for 100K-1M points)
- UMAP+HDBSCAN: UMAP dimensionality reduction before HDBSCAN
- Auto: Automatic backend selection based on dataset size

Parameters:

- backend (str, default 'auto'): Clustering backend ('hdbscan', 'soptics', 'umap_hdbscan', 'auto')
- min_cluster_size (int, default 5): Minimum points to form a cluster.
- min_samples (int, default 5): Min samples for core distance (OPTICS).
- cluster_selection_epsilon (float, default 0.0): Distance threshold for cluster selection.
- n_neighbors (int, default 15): Neighbors for UMAP (if used).
- umap_dim (int, default 10): Target dimensionality for UMAP preprocessing.
- umap_metric (str, default 'cosine'): Metric for UMAP.
- prediction_data (bool, default True): Whether to compute prediction_data for HDBSCAN.
Source code in src/novelentitymatcher/novelty/clustering/scalable.py
def __init__(
    self,
    backend: str = "auto",
    min_cluster_size: int = 5,
    min_samples: int = 5,
    cluster_selection_epsilon: float = 0.0,
    n_neighbors: int = 15,
    umap_dim: int = 10,
    umap_metric: str = "cosine",
    prediction_data: bool = True,
):
    """
    Initialize scalable clusterer.

    Args:
        backend: Clustering backend ('hdbscan', 'soptics', 'umap_hdbscan', 'auto')
        min_cluster_size: Minimum points to form a cluster.
        min_samples: Min samples for core distance (OPTICS).
        cluster_selection_epsilon: Distance threshold for cluster selection.
        n_neighbors: Neighbors for UMAP (if used).
        umap_dim: Target dimensionality for UMAP preprocessing.
        umap_metric: Metric for UMAP.
        prediction_data: Whether to compute prediction_data for HDBSCAN.
    """
    self.backend = backend
    self.min_cluster_size = min_cluster_size
    self.min_samples = min_samples
    self.cluster_selection_epsilon = cluster_selection_epsilon
    self.n_neighbors = n_neighbors
    self.umap_dim = umap_dim
    self.umap_metric = umap_metric
    self.prediction_data = prediction_data

    self._backend_instance: Any | None = None
    self._labels: np.ndarray | None = None
    self._probabilities: np.ndarray | None = None
    self._n_points: int = 0
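
The 'auto' backend picks by dataset size. A sketch of that selection using the size guidance listed above (the exact cutoffs inside `_auto_backend` are an assumption):

```python
def auto_backend(n_points: int) -> str:
    """Pick a clustering backend from dataset size (illustrative thresholds)."""
    if n_points < 100_000:       # HDBSCAN: best for <100K points
        return "hdbscan"
    if n_points < 1_000_000:     # sOPTICS: LSH-accelerated, for 100K-1M points
        return "soptics"
    return "umap_hdbscan"        # reduce dimensionality first beyond 1M points

assert auto_backend(5_000) == "hdbscan"
assert auto_backend(500_000) == "soptics"
assert auto_backend(2_000_000) == "umap_hdbscan"
```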
Attributes
labels property

Get cluster labels.

probabilities property

Get cluster membership probabilities.

Functions
fit_predict(embeddings, metric='cosine')

Fit clusterer and predict labels.

Parameters:

Name Type Description Default
embeddings ndarray

Input embeddings (n_samples, dim)

required
metric str

Distance metric ('cosine', 'euclidean', 'precomputed')

'cosine'

Returns:

Type Description
tuple[ndarray, ndarray, dict[str, Any]]

Tuple of (cluster_labels, probabilities, validation_info)

Source code in src/novelentitymatcher/novelty/clustering/scalable.py
def fit_predict(
    self,
    embeddings: np.ndarray,
    metric: str = "cosine",
) -> tuple[np.ndarray, np.ndarray, dict[str, Any]]:
    """
    Fit clusterer and predict labels.

    Args:
        embeddings: Input embeddings (n_samples, dim)
        metric: Distance metric ('cosine', 'euclidean', 'precomputed')

    Returns:
        Tuple of (cluster_labels, probabilities, validation_info)
    """
    X = np.asarray(embeddings, dtype=np.float32)
    if X.ndim != 2:
        raise ValueError(f"Expected 2D array, got {X.ndim}D")
    self._n_points = X.shape[0]

    backend_name = self.backend
    if backend_name == self.BACKEND_AUTO:
        backend_name = self._auto_backend(self._n_points)
        logger.info(
            f"Auto-selected backend: {backend_name} for {self._n_points} points"
        )

    self._backend_instance = self._create_backend(backend_name)

    labels, probabilities, backend_info = self._backend_instance.fit_predict(
        X, min_cluster_size=self.min_cluster_size, metric=metric
    )

    self._labels = labels
    self._probabilities = probabilities

    unique_clusters = sorted({int(label) for label in labels if int(label) >= 0})
    validation_info: dict[str, Any] = {
        "backend": backend_name,
        "n_points": self._n_points,
        "n_clusters": len(unique_clusters),
        "n_noise": int(np.sum(labels == -1)),
        "persistences": backend_info.get("persistences", []),
        "unique_clusters": unique_clusters,
    }

    logger.info(
        f"Clustering complete: {validation_info['n_clusters']} clusters, "
        f"{validation_info['n_noise']} noise points"
    )

    return labels, probabilities, validation_info
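The `validation_info` summary returned by `fit_predict` is derived from the label array alone; a minimal stdlib sketch (the labels here are made up):

```python
# Hypothetical label output from any clusterer: -1 marks noise.
labels = [0, 0, 1, -1, 1, 0, -1]

unique_clusters = sorted({label for label in labels if label >= 0})
validation_info = {
    "n_points": len(labels),
    "n_clusters": len(unique_clusters),
    "n_noise": sum(1 for label in labels if label == -1),
    "unique_clusters": unique_clusters,
}
print(validation_info)  # 7 points, 2 clusters, 2 noise points
```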
fit(embeddings, metric='cosine')

Fit the clusterer (alias for compatibility).

Source code in src/novelentitymatcher/novelty/clustering/scalable.py
def fit(
    self,
    embeddings: np.ndarray,
    metric: str = "cosine",
) -> ScalableClusterer:
    """Fit the clusterer (alias for compatibility)."""
    self.fit_predict(embeddings, metric=metric)
    return self
get_cluster_members(cluster_id)

Get indices of members in a specific cluster.

Source code in src/novelentitymatcher/novelty/clustering/scalable.py
def get_cluster_members(
    self,
    cluster_id: int,
) -> np.ndarray:
    """Get indices of members in a specific cluster."""
    if self._labels is None:
        raise RuntimeError("Clusterer must be fitted first")
    return np.where(self._labels == cluster_id)[0]
get_noise_points()

Get indices of noise points (label = -1).

Source code in src/novelentitymatcher/novelty/clustering/scalable.py
def get_noise_points(self) -> np.ndarray:
    """Get indices of noise points (label = -1)."""
    if self._labels is None:
        raise RuntimeError("Clusterer must be fitted first")
    return np.where(self._labels == -1)[0]
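Both accessors reduce to `np.where` over the stored labels; the equivalent index extraction in plain Python:

```python
labels = [0, 0, -1, 1, -1, 0]

# get_cluster_members(0) analogue: indices whose label matches
members_0 = [i for i, label in enumerate(labels) if label == 0]

# get_noise_points analogue: indices labelled -1
noise = [i for i, label in enumerate(labels) if label == -1]

print(members_0, noise)
```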

ClusterValidator(min_cohesion_threshold=0.45, min_persistence_threshold=0.1)

Validates clustering results for novelty detection.

Provides metrics and validation methods to assess cluster quality and determine if samples represent novel clusters.

Parameters:

Name Type Description Default
min_cohesion_threshold float

Minimum cohesion for valid clusters

0.45
min_persistence_threshold float

Minimum persistence for valid clusters

0.1
Source code in src/novelentitymatcher/novelty/clustering/validation.py
def __init__(
    self,
    min_cohesion_threshold: float = 0.45,
    min_persistence_threshold: float = 0.1,
):
    """
    Initialize the cluster validator.

    Args:
        min_cohesion_threshold: Minimum cohesion for valid clusters
        min_persistence_threshold: Minimum persistence for valid clusters
    """
    self.min_cohesion_threshold = min_cohesion_threshold
    self.min_persistence_threshold = min_persistence_threshold
Functions
compute_cohesion(embeddings, labels, cluster_id)

Compute cluster cohesion (compactness).

Cohesion is the average pairwise similarity within a cluster.

Parameters:

Name Type Description Default
embeddings ndarray

All embeddings

required
labels ndarray

Cluster labels for each embedding

required
cluster_id int

Cluster to compute cohesion for

required

Returns:

Type Description
float

Cohesion score (0-1, higher = more compact)

Source code in src/novelentitymatcher/novelty/clustering/validation.py
def compute_cohesion(
    self,
    embeddings: np.ndarray,
    labels: np.ndarray,
    cluster_id: int,
) -> float:
    """
    Compute cluster cohesion (compactness).

    Cohesion is the average pairwise similarity within a cluster.

    Args:
        embeddings: All embeddings
        labels: Cluster labels for each embedding
        cluster_id: Cluster to compute cohesion for

    Returns:
        Cohesion score (0-1, higher = more compact)
    """
    mask = labels == cluster_id
    if mask.sum() < 2:
        return 0.0

    cluster_embeddings = embeddings[mask]

    # Compute pairwise cosine similarities
    norms = np.linalg.norm(cluster_embeddings, axis=1)
    normalized = cluster_embeddings / norms[:, np.newaxis]

    # Average pairwise similarity
    similarity_matrix = np.dot(normalized, normalized.T)
    # Exclude diagonal
    np.fill_diagonal(similarity_matrix, 0)

    cohesion = similarity_matrix.sum() / (
        similarity_matrix.size - len(cluster_embeddings)
    )

    return float(cohesion)
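The computation reduces to the mean cosine similarity over off-diagonal pairs; a pure-Python restatement for intuition (not the library code):

```python
import math

def cohesion(vectors):
    # Average off-diagonal pairwise cosine similarity, mirroring compute_cohesion.
    n = len(vectors)
    if n < 2:
        return 0.0
    unit = []
    for v in vectors:
        norm = math.sqrt(sum(x * x for x in v))
        unit.append([x / norm for x in v])
    total = sum(
        sum(a * b for a, b in zip(unit[i], unit[j]))
        for i in range(n)
        for j in range(n)
        if i != j
    )
    return total / (n * n - n)  # n*n - n off-diagonal entries

print(cohesion([[1.0, 0.0], [1.0, 0.0]]))              # identical vectors -> 1.0
print(cohesion([[1.0, 0.0], [1.0, 0.0], [0.0, 1.0]]))  # one orthogonal outlier -> ~0.33
```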
compute_separation(embeddings, labels, cluster_id)

Compute cluster separation (distinctiveness from other clusters).

Separation is the minimum cosine distance from the cluster centroid to any other cluster's centroid.

Parameters:

Name Type Description Default
embeddings ndarray

All embeddings

required
labels ndarray

Cluster labels for each embedding

required
cluster_id int

Cluster to compute separation for

required

Returns:

Type Description
float

Separation score (0-1, higher = more separated)

Source code in src/novelentitymatcher/novelty/clustering/validation.py
def compute_separation(
    self,
    embeddings: np.ndarray,
    labels: np.ndarray,
    cluster_id: int,
) -> float:
    """
    Compute cluster separation (distinctiveness from other clusters).

    Separation is the minimum cosine distance from the cluster centroid
    to any other cluster's centroid.

    Args:
        embeddings: All embeddings
        labels: Cluster labels for each embedding
        cluster_id: Cluster to compute separation for

    Returns:
        Separation score (0-1, higher = more separated)
    """
    mask = labels == cluster_id
    if mask.sum() == 0:
        return 0.0

    cluster_embeddings = embeddings[mask]
    cluster_center = cluster_embeddings.mean(axis=0)

    unique_clusters = np.unique(labels)
    min_distance = float("inf")

    for other_id in unique_clusters:
        if other_id == cluster_id or other_id == -1:
            continue

        other_mask = labels == other_id
        other_embeddings = embeddings[other_mask]
        other_center = other_embeddings.mean(axis=0)

        # Cosine distance
        distance = 1.0 - np.dot(cluster_center, other_center) / (
            np.linalg.norm(cluster_center) * np.linalg.norm(other_center)
        )

        min_distance = min(min_distance, distance)

    return float(min_distance if min_distance != float("inf") else 0.0)
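Stripped of the masking, separation is the minimum centroid-to-centroid cosine distance; with made-up centroids:

```python
import math

def cosine_distance(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return 1.0 - dot / (na * nb)

# Centroids of three hypothetical clusters; separation of cluster 0 is the
# minimum cosine distance to any other centroid.
centers = {0: [1.0, 0.0], 1: [0.0, 1.0], 2: [1.0, 1.0]}
separation_0 = min(
    cosine_distance(centers[0], c) for cid, c in centers.items() if cid != 0
)
print(separation_0)  # distance to cluster 2: 1 - 1/sqrt(2) ≈ 0.293
```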
is_valid_cluster(embeddings, labels, cluster_id, min_size=5)

Determine if a cluster is valid (stable and meaningful).

Parameters:

Name Type Description Default
embeddings ndarray

All embeddings

required
labels ndarray

Cluster labels

required
cluster_id int

Cluster to validate

required
min_size int

Minimum number of samples for valid cluster

5

Returns:

Type Description
bool

True if cluster is valid

Source code in src/novelentitymatcher/novelty/clustering/validation.py
def is_valid_cluster(
    self,
    embeddings: np.ndarray,
    labels: np.ndarray,
    cluster_id: int,
    min_size: int = 5,
) -> bool:
    """
    Determine if a cluster is valid (stable and meaningful).

    Args:
        embeddings: All embeddings
        labels: Cluster labels
        cluster_id: Cluster to validate
        min_size: Minimum number of samples for valid cluster

    Returns:
        True if cluster is valid
    """
    # Check size
    mask = labels == cluster_id
    if mask.sum() < min_size:
        return False

    # Check cohesion
    cohesion = self.compute_cohesion(embeddings, labels, cluster_id)
    if cohesion < self.min_cohesion_threshold:
        return False

    return True
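Validity is the conjunction of the size and cohesion checks; stripped to its decision logic:

```python
def is_valid(size, cohesion, min_size=5, min_cohesion_threshold=0.45):
    # Mirrors the two rejection paths in is_valid_cluster.
    if size < min_size:
        return False
    if cohesion < min_cohesion_threshold:
        return False
    return True

print(is_valid(8, 0.60))   # True
print(is_valid(3, 0.90))   # False: too small
print(is_valid(10, 0.30))  # False: not cohesive enough
```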
get_cluster_statistics(embeddings, labels)

Compute statistics for all clusters.

Parameters:

Name Type Description Default
embeddings ndarray

All embeddings

required
labels ndarray

Cluster labels

required

Returns:

Type Description
dict[int, dict[str, float]]

Dict mapping cluster_id to statistics dict

Source code in src/novelentitymatcher/novelty/clustering/validation.py
def get_cluster_statistics(
    self,
    embeddings: np.ndarray,
    labels: np.ndarray,
) -> dict[int, dict[str, float]]:
    """
    Compute statistics for all clusters.

    Args:
        embeddings: All embeddings
        labels: Cluster labels

    Returns:
        Dict mapping cluster_id to statistics dict
    """
    unique_clusters = np.unique(labels)
    stats = {}

    for cluster_id in unique_clusters:
        if cluster_id == -1:  # Noise points
            continue

        mask = labels == cluster_id
        size = mask.sum()

        stats[cluster_id] = {
            "size": int(size),
            "cohesion": self.compute_cohesion(embeddings, labels, cluster_id),
            "separation": self.compute_separation(embeddings, labels, cluster_id),
            "is_valid": self.is_valid_cluster(embeddings, labels, cluster_id),
        }

    return stats

DetectionConfig

Bases: BaseModel

Main configuration for novelty detection.

This config specifies which strategies to use, their individual configurations, and how to combine their signals.

Attributes
strategies = Field(default_factory=(lambda: ['confidence', 'knn_distance', 'setfit_centroid'])) class-attribute instance-attribute

List of strategy IDs to use for novelty detection.

Available strategies:

- confidence: Confidence threshold
- knn_distance: kNN distance-based
- uncertainty: Margin/entropy uncertainty
- clustering: Clustering-based
- self_knowledge: Sparse autoencoder
- pattern: Pattern-based
- oneclass: One-Class SVM
- prototypical: Prototypical networks
- setfit: SetFit contrastive

combine_method = Field(default='weighted') class-attribute instance-attribute

Method for combining strategy signals.

Options:

- weighted: Weighted fusion of scores
- union: Flag if any strategy flags
- intersection: Flag if all strategies flag
- voting: Flag if majority of strategies flag
- meta_learner: Logistic regression meta-learner (requires training)

confidence = None class-attribute instance-attribute

Configuration for confidence strategy.

knn_distance = None class-attribute instance-attribute

Configuration for kNN distance strategy.

uncertainty = None class-attribute instance-attribute

Configuration for uncertainty strategy.

clustering = None class-attribute instance-attribute

Configuration for clustering strategy.

self_knowledge = None class-attribute instance-attribute

Configuration for self-knowledge strategy.

pattern = None class-attribute instance-attribute

Configuration for pattern strategy.

oneclass = None class-attribute instance-attribute

Configuration for One-Class SVM strategy.

prototypical = None class-attribute instance-attribute

Configuration for prototypical strategy.

setfit = None class-attribute instance-attribute

Configuration for SetFit strategy.

setfit_centroid = None class-attribute instance-attribute

Configuration for SetFit centroid distance strategy.

mahalanobis = None class-attribute instance-attribute

Configuration for Mahalanobis distance strategy.

lof = None class-attribute instance-attribute

Configuration for Local Outlier Factor strategy.

weights = None class-attribute instance-attribute

Weights for signal combination.

enable_lazy_initialization = Field(default=True) class-attribute instance-attribute

Whether to lazily initialize strategies (only when first used).

debug_mode = Field(default=False) class-attribute instance-attribute

Enable debug mode for verbose logging.

candidate_top_k = Field(default=5, ge=1) class-attribute instance-attribute

How many matcher candidates to request when collecting metadata.

allowed_maturities = Field(default_factory=(lambda: ['production', 'experimental', 'internal'])) class-attribute instance-attribute

Allowed strategy maturity levels. Strategies outside these levels are rejected during validation.

Functions
get_strategy_config(strategy_id)

Get configuration for a specific strategy.

Returns the strategy-specific config if it exists, otherwise returns a default config for that strategy.

Parameters:

Name Type Description Default
strategy_id str

The strategy identifier

required

Returns:

Type Description
Any

Strategy-specific configuration object

Source code in src/novelentitymatcher/novelty/config/base.py
def get_strategy_config(self, strategy_id: str) -> Any:
    """
    Get configuration for a specific strategy.

    Returns the strategy-specific config if it exists, otherwise
    returns a default config for that strategy.

    Args:
        strategy_id: The strategy identifier

    Returns:
        Strategy-specific configuration object
    """
    config_map = {
        "confidence": self.confidence or ConfidenceConfig(),
        "knn_distance": self.knn_distance or KNNConfig(),
        "uncertainty": self.uncertainty or UncertaintyConfig(),
        "clustering": self.clustering or ClusteringConfig(),
        "self_knowledge": self.self_knowledge or SelfKnowledgeConfig(),
        "pattern": self.pattern or PatternConfig(),
        "oneclass": self.oneclass or OneClassConfig(),
        "prototypical": self.prototypical or PrototypicalConfig(),
        "setfit": self.setfit or SetFitConfig(),
        "setfit_centroid": self.setfit_centroid or SetFitCentroidConfig(),
        "mahalanobis": self.mahalanobis or MahalanobisConfig(),
        "lof": self.lof or LOFConfig(),
    }

    return config_map.get(strategy_id)
get_weight_config()

Get the weight configuration, with defaults if not set.

Returns:

Type Description
WeightConfig

WeightConfig instance

Source code in src/novelentitymatcher/novelty/config/base.py
def get_weight_config(self) -> WeightConfig:
    """
    Get the weight configuration, with defaults if not set.

    Returns:
        WeightConfig instance
    """
    if self.weights is None:
        return WeightConfig()
    return self.weights
validate_strategies()

Validate that all configured strategies are available and allowed by maturity.

Strategies are registered at module load time via decorators. This method only validates — it does not trigger imports.

Raises:

Type Description
ValueError

If an unknown strategy is configured or maturity not allowed

Source code in src/novelentitymatcher/novelty/config/base.py
def validate_strategies(self) -> None:
    """
    Validate that all configured strategies are available and allowed by maturity.

    Strategies are registered at module load time via decorators.
    This method only validates — it does not trigger imports.

    Raises:
        ValueError: If an unknown strategy is configured or maturity not allowed
    """
    from ..core.strategies import StrategyRegistry

    for strategy_id in self.strategies:
        if not StrategyRegistry.is_registered(strategy_id):
            available = ", ".join(StrategyRegistry.list_strategies())
            raise ValueError(
                f"Unknown strategy: '{strategy_id}'. Available: {available}"
            )
        strategy_cls = StrategyRegistry.get(strategy_id)
        strategy_maturity = getattr(strategy_cls, "maturity", "experimental")
        if strategy_maturity not in self.allowed_maturities:
            raise ValueError(
                f"Strategy '{strategy_id}' has maturity '{strategy_maturity}' "
                f"which is not in allowed_maturities={self.allowed_maturities}"
            )

ClusteringConfig

Bases: BaseModel

Configuration for clustering-based strategy.

Attributes
min_cluster_size = Field(default=5, ge=1) class-attribute instance-attribute

Minimum cluster size to be considered valid.

persistence_threshold = Field(default=0.1, ge=0.0, le=1.0) class-attribute instance-attribute

Persistence threshold for cluster stability.

cohesion_threshold = Field(default=0.45, ge=0.0, le=1.0) class-attribute instance-attribute

Cohesion threshold for cluster compactness.

hdbscan_min_cluster_size = Field(default=5, ge=1) class-attribute instance-attribute

min_cluster_size parameter for HDBSCAN.

hdbscan_min_samples = Field(default=1, ge=1) class-attribute instance-attribute

min_samples parameter for HDBSCAN.

cluster_selection_epsilon = Field(default=0.0, ge=0.0) class-attribute instance-attribute

cluster_selection_epsilon for HDBSCAN.

ConfidenceConfig

Bases: BaseModel

Configuration for confidence threshold strategy.

Attributes
threshold = Field(default=0.7, ge=0.0, le=1.0) class-attribute instance-attribute

Minimum confidence threshold. Samples below this are flagged as novel.

KNNConfig

Bases: BaseModel

Configuration for kNN distance-based strategy.

Attributes
k = Field(default=20, ge=1, le=100) class-attribute instance-attribute

Number of nearest neighbors to consider.

distance_threshold = Field(default=0.55, ge=0.0, le=1.0) class-attribute instance-attribute

Threshold for kNN distance score. Samples above this are flagged.

strong_threshold = Field(default=0.85, ge=0.0, le=1.0) class-attribute instance-attribute

Strong novelty threshold for high-confidence detection.

metric = Field(default='cosine') class-attribute instance-attribute

Distance metric to use ('cosine', 'euclidean', etc.).
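How the two thresholds interact can be seen with a toy score. The real per-sample score comes from the strategy implementation; this helper and its numbers are purely illustrative:

```python
def knn_novelty(distances, k=20):
    # Mean distance to the k nearest reference embeddings.
    nearest = sorted(distances)[:k]
    return sum(nearest) / len(nearest)

score = knn_novelty([0.10, 0.20, 0.90, 0.95], k=2)  # 0.15
print(score > 0.55)  # distance_threshold default: not flagged
print(score > 0.85)  # strong_threshold default: not flagged
```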

LOFConfig

Bases: BaseModel

Configuration for Local Outlier Factor (LOF) strategy.

Attributes
n_neighbors = Field(default=20, ge=2) class-attribute instance-attribute

Number of neighbors to use for LOF.

contamination = Field(default=0.1, gt=0.0, le=0.5) class-attribute instance-attribute

Expected proportion of outliers in the reference set.

metric = Field(default='cosine') class-attribute instance-attribute

Distance metric to use ('cosine', 'euclidean', 'manhattan', etc.).

score_threshold = Field(default=0.0) class-attribute instance-attribute

LOF score threshold. Samples below this are flagged as novel.

MahalanobisConfig

Bases: BaseModel

Configuration for Mahalanobis distance-based strategy.

Attributes
threshold = Field(default=3.0, gt=0.0) class-attribute instance-attribute

Mahalanobis distance threshold. Samples above this are flagged as novel.

regularization = Field(default=0.0001, gt=0.0) class-attribute instance-attribute

Covariance matrix regularization (ridge) for numerical stability.

use_class_conditional = Field(default=True) class-attribute instance-attribute

Whether to use per-class distributions (True) or a single global distribution (False).

calibration_mode = Field(default='none') class-attribute instance-attribute

Calibration mode: 'none' for raw threshold, 'conformal' for p-value calibration.

calibration_alpha = Field(default=0.1, gt=0.0, le=1.0) class-attribute instance-attribute

Significance level for conformal prediction. Lower = stricter.

calibration_method = Field(default='split') class-attribute instance-attribute

Conformal calibration method: 'split' or 'mondrian' (class-conditional).

calibration_set_fraction = Field(default=0.2, gt=0.0, le=0.5) class-attribute instance-attribute

Fraction of reference data held out for conformal calibration.
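The strategy's class-conditional, full-covariance handling lives in the implementation; the diagonal-covariance special case is enough to show how `threshold` and `regularization` enter (the helper is illustrative, not the library code):

```python
import math

def mahalanobis_diag(x, mean, var, regularization=1e-4):
    # Diagonal-covariance Mahalanobis distance; the ridge term keeps
    # near-zero variances from blowing up the distance.
    return math.sqrt(
        sum((xi - mi) ** 2 / (vi + regularization)
            for xi, mi, vi in zip(x, mean, var))
    )

d = mahalanobis_diag([3.0, 0.0], [0.0, 0.0], [1.0, 1.0])
print(d, d > 3.0)  # just under the default threshold of 3.0
```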

OneClassConfig

Bases: BaseModel

Configuration for One-Class SVM strategy.

Attributes
nu = Field(default=0.1, ge=0.0, le=1.0) class-attribute instance-attribute

Expected outlier fraction. Lower = stricter boundary.

kernel = Field(default='rbf') class-attribute instance-attribute

SVM kernel type ('rbf', 'linear', 'poly', 'sigmoid').

gamma = Field(default='scale') class-attribute instance-attribute

Kernel coefficient ('scale', 'auto', or float).

model_name = Field(default='sentence-transformers/all-MiniLM-L6-v2') class-attribute instance-attribute

Sentence transformer model name for embeddings.

PatternConfig

Bases: BaseModel

Configuration for pattern-based strategy.

Attributes
threshold = Field(default=0.5, ge=0.0, le=1.0) class-attribute instance-attribute

Novelty score threshold for pattern-based detection.

char_ngram_n = Field(default=3, ge=1, le=5) class-attribute instance-attribute

Character n-gram size for pattern extraction.

char_4gram_n = Field(default=4, ge=1, le=5) class-attribute instance-attribute

Character 4-gram size.

prefix_suffix_n = Field(default=3, ge=1, le=5) class-attribute instance-attribute

Prefix/suffix length for distribution analysis.

PrototypicalConfig

Bases: BaseModel

Configuration for prototypical networks strategy.

Attributes
distance_threshold = Field(default=0.5, ge=0.0, le=1.0) class-attribute instance-attribute

Distance threshold for novelty detection.

model_name = Field(default='sentence-transformers/all-MiniLM-L6-v2') class-attribute instance-attribute

Sentence transformer model name for embeddings.

support_samples_per_class = Field(default=5, ge=1) class-attribute instance-attribute

Number of support samples per class for prototype computation.

SelfKnowledgeConfig

Bases: BaseModel

Configuration for sparse autoencoder strategy.

Attributes
hidden_dim = Field(default=128, ge=1) class-attribute instance-attribute

Hidden dimension for the autoencoder.

threshold = Field(default=0.5, ge=0.0, le=1.0) class-attribute instance-attribute

Reconstruction error threshold for novelty detection.

epochs = Field(default=100, ge=1) class-attribute instance-attribute

Number of training epochs.

batch_size = Field(default=32, ge=1) class-attribute instance-attribute

Training batch size.

learning_rate = Field(default=0.001, gt=0.0) class-attribute instance-attribute

Learning rate for training.

SetFitConfig

Bases: BaseModel

Configuration for SetFit contrastive strategy.

Attributes
margin = Field(default=0.5, ge=0.0) class-attribute instance-attribute

Contrastive loss margin.

model_name = Field(default='sentence-transformers/all-MiniLM-L6-v2') class-attribute instance-attribute

Sentence transformer model name.

epochs = Field(default=10, ge=1) class-attribute instance-attribute

Number of training epochs.

batch_size = Field(default=16, ge=1) class-attribute instance-attribute

Training batch size.

learning_rate = Field(default=2e-05, gt=0.0) class-attribute instance-attribute

Learning rate for fine-tuning.

threshold = Field(default=0.7, ge=0.0, le=1.0) class-attribute instance-attribute

Similarity threshold for novelty detection.

UncertaintyConfig

Bases: BaseModel

Configuration for uncertainty-based strategy.

Attributes
margin_threshold = Field(default=0.3, ge=0.0, le=1.0) class-attribute instance-attribute

Margin between top predictions. Small margin = high uncertainty.

entropy_threshold = Field(default=1.5, ge=0.0) class-attribute instance-attribute

Entropy threshold for uncertainty detection.

WeightConfig

Bases: BaseModel

Weights for signal combination from different strategies.

Each strategy's contribution to the final novelty score is weighted. Weights should sum to approximately 1.0, but this is not enforced as normalization is applied during combination.

Attributes
confidence = Field(default=0.35, ge=0.0, le=1.0) class-attribute instance-attribute

Weight for confidence threshold strategy.

uncertainty = Field(default=0.35, ge=0.0, le=1.0) class-attribute instance-attribute

Weight for uncertainty-based strategy.

knn = Field(default=0.45, ge=0.0, le=1.0) class-attribute instance-attribute

Weight for kNN distance-based strategy.

cluster = Field(default=0.2, ge=0.0, le=1.0) class-attribute instance-attribute

Weight for clustering-based strategy.

self_knowledge = Field(default=0.08, ge=0.0, le=1.0) class-attribute instance-attribute

Weight for sparse autoencoder strategy.

pattern = Field(default=0.2, ge=0.0, le=1.0) class-attribute instance-attribute

Weight for pattern-based strategy.

oneclass = Field(default=0.1, ge=0.0, le=1.0) class-attribute instance-attribute

Weight for One-Class SVM strategy.

prototypical = Field(default=0.02, ge=0.0, le=1.0) class-attribute instance-attribute

Weight for prototypical networks strategy.

setfit = Field(default=0.02, ge=0.0, le=1.0) class-attribute instance-attribute

Weight for SetFit contrastive strategy.

setfit_centroid = Field(default=0.45, ge=0.0, le=1.0) class-attribute instance-attribute

Weight for SetFit centroid distance strategy (recommended; shares the highest default weight with kNN).

mahalanobis = Field(default=0.35, ge=0.0, le=1.0) class-attribute instance-attribute

Weight for Mahalanobis distance strategy.

lof = Field(default=0.15, ge=0.0, le=1.0) class-attribute instance-attribute

Weight for Local Outlier Factor strategy.

adaptive = Field(default=False) class-attribute instance-attribute

Enable adaptive weight computation based on dataset characteristics.

novelty_threshold = Field(default=0.6, ge=0.0, le=1.0) class-attribute instance-attribute

Final novelty score threshold for flagging samples.

knn_gate_threshold = Field(default=0.45, ge=0.0, le=1.0) class-attribute instance-attribute

kNN gate threshold - samples above this are always considered novel.

strong_uncertainty_threshold = Field(default=0.85, ge=0.0, le=1.0) class-attribute instance-attribute

Strong uncertainty threshold - samples above this are always novel.

strong_knn_threshold = Field(default=0.85, ge=0.0, le=1.0) class-attribute instance-attribute

Strong kNN threshold - samples above this are always novel.
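A hedged reading of how these thresholds compose: the actual gating order lives in the signal combiner, and this function only restates the attribute descriptions above:

```python
def is_novel(final_score, knn_score, uncertainty_score,
             novelty_threshold=0.6, knn_gate_threshold=0.45,
             strong_uncertainty_threshold=0.85, strong_knn_threshold=0.85):
    # Hard gates described above: any one of these alone flags the sample.
    if knn_score > strong_knn_threshold:
        return True
    if uncertainty_score > strong_uncertainty_threshold:
        return True
    if knn_score > knn_gate_threshold:
        return True
    # Otherwise fall back to the combined weighted score.
    return final_score > novelty_threshold

print(is_novel(final_score=0.3, knn_score=0.9, uncertainty_score=0.1))  # True via strong kNN gate
print(is_novel(final_score=0.3, knn_score=0.2, uncertainty_score=0.1))  # False
```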

Functions
normalize_weights()

Normalize weights to sum to 1.0.

Returns:

Type Description
WeightConfig

A new WeightConfig with normalized weights

Source code in src/novelentitymatcher/novelty/config/weights.py
def normalize_weights(self) -> "WeightConfig":
    """
    Normalize weights to sum to 1.0.

    Returns:
        A new WeightConfig with normalized weights
    """
    strategy_weights = [
        self.confidence,
        self.uncertainty,
        self.knn,
        self.cluster,
        self.self_knowledge,
        self.pattern,
        self.oneclass,
        self.prototypical,
        self.setfit,
        self.setfit_centroid,
        self.mahalanobis,
        self.lof,
    ]

    total = sum(strategy_weights)
    if total == 0:
        return self

    factor = 1.0 / total

    return WeightConfig(
        confidence=self.confidence * factor,
        uncertainty=self.uncertainty * factor,
        knn=self.knn * factor,
        cluster=self.cluster * factor,
        self_knowledge=self.self_knowledge * factor,
        pattern=self.pattern * factor,
        oneclass=self.oneclass * factor,
        prototypical=self.prototypical * factor,
        setfit=self.setfit * factor,
        setfit_centroid=self.setfit_centroid * factor,
        mahalanobis=self.mahalanobis * factor,
        lof=self.lof * factor,
        novelty_threshold=self.novelty_threshold,
        knn_gate_threshold=self.knn_gate_threshold,
        strong_uncertainty_threshold=self.strong_uncertainty_threshold,
        strong_knn_threshold=self.strong_knn_threshold,
    )
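The normalization is plain rescaling by the weight total; the same arithmetic on a small dict of example weights:

```python
weights = {"confidence": 0.35, "knn": 0.45, "mahalanobis": 0.35}

total = sum(weights.values())  # 1.15: over-subscribed, so rescale
normalized = {name: w / total for name, w in weights.items()}

print(round(sum(normalized.values()), 10))  # 1.0
```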

MetadataBuilder()

Builds comprehensive reports for novelty detection results.

Aggregates information from all strategies and creates detailed reports with per-sample metrics and explanations.

Source code in src/novelentitymatcher/novelty/core/metadata.py
def __init__(self):
    """Initialize the metadata builder."""
Functions
build_report(texts, confidences, predicted_classes, novel_indices, novelty_scores, all_metrics, strategy_outputs, config)

Build a comprehensive novelty detection report.

Parameters:

Name Type Description Default
texts list[str]

Input texts

required
confidences ndarray

Prediction confidence scores

required
predicted_classes list[str]

Predicted class for each sample

required
novel_indices set[int]

Indices flagged as novel

required
novelty_scores dict[int, float]

Final novelty scores

required
all_metrics dict[int, dict[str, Any]]

All per-sample metrics

required
strategy_outputs dict[str, tuple[set[int], dict]]

Per-strategy outputs

required
config DetectionConfig

Detection configuration

required

Returns:

Type Description
NovelSampleReport

NovelSampleReport with all detection results

Source code in src/novelentitymatcher/novelty/core/metadata.py
def build_report(
    self,
    texts: list[str],
    confidences: np.ndarray,
    predicted_classes: list[str],
    novel_indices: set[int],
    novelty_scores: dict[int, float],
    all_metrics: dict[int, dict[str, Any]],
    strategy_outputs: dict[str, tuple[set[int], dict]],
    config: DetectionConfig,
) -> NovelSampleReport:
    """
    Build a comprehensive novelty detection report.

    Args:
        texts: Input texts
        confidences: Prediction confidence scores
        predicted_classes: Predicted class for each sample
        novel_indices: Indices flagged as novel
        novelty_scores: Final novelty scores
        all_metrics: All per-sample metrics
        strategy_outputs: Per-strategy outputs
        config: Detection configuration

    Returns:
        NovelSampleReport with all detection results
    """
    signal_counts: dict[str, int] = {}
    novel_samples: list[NovelSampleMetadata] = []

    for strategy_id, (flags, _) in strategy_outputs.items():
        signal_counts[strategy_id] = len(flags)

    for idx in sorted(novel_indices):
        metrics = all_metrics.get(idx, {})
        signals = {
            strategy_id: idx in flags
            for strategy_id, (flags, _) in strategy_outputs.items()
        }
        novel_samples.append(
            NovelSampleMetadata(
                text=texts[idx],
                index=idx,
                confidence=float(confidences[idx]),
                predicted_class=predicted_classes[idx],
                novelty_score=float(novelty_scores.get(idx, 0.0)),
                margin_score=metrics.get("margin_score"),
                entropy_score=metrics.get("entropy_score"),
                uncertainty_score=metrics.get("uncertainty_score"),
                knn_novelty_score=metrics.get("knn_novelty_score"),
                knn_mean_distance=metrics.get("knn_mean_distance"),
                knn_max_distance=metrics.get("knn_max_distance"),
                cluster_id=metrics.get("cluster_label"),
                cluster_support_score=metrics.get("cluster_support_score"),
                signals=signals,
                metrics=metrics,
            )
        )

    return NovelSampleReport(
        novel_samples=novel_samples,
        detection_strategies=list(strategy_outputs.keys()),
        config=config.model_dump() if hasattr(config, "model_dump") else {},
        signal_counts=signal_counts,
    )
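The per-sample `signals` map and the report-level `signal_counts` are simple reductions over the strategy outputs; in isolation, with made-up flag sets:

```python
# strategy_outputs maps strategy_id -> (flagged indices, per-strategy metrics)
strategy_outputs = {
    "confidence": ({0, 2}, {}),
    "knn_distance": ({2, 3}, {}),
}

signal_counts = {sid: len(flags) for sid, (flags, _) in strategy_outputs.items()}

idx = 2  # a sample flagged as novel
signals = {sid: idx in flags for sid, (flags, _) in strategy_outputs.items()}

print(signal_counts)  # {'confidence': 2, 'knn_distance': 2}
print(signals)        # {'confidence': True, 'knn_distance': True}
```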
build_summary(report)

Build a summary of the detection report.

Parameters:

Name Type Description Default
report NovelSampleReport

NovelSampleReport to summarize

required

Returns:

Type Description
dict[str, Any]

Summary dictionary with key statistics

Source code in src/novelentitymatcher/novelty/core/metadata.py
def build_summary(self, report: NovelSampleReport) -> dict[str, Any]:
    """
    Build a summary of the detection report.

    Args:
        report: NovelSampleReport to summarize

    Returns:
        Summary dictionary with key statistics
    """
    total_samples = len(report.novel_samples)
    return {
        "total_samples": total_samples,
        "novel_samples": len(report.novel_samples),
        "novel_ratio": len(report.novel_samples) / total_samples
        if total_samples
        else 0.0,
        "avg_novelty_score": np.mean(
            [
                sample.novelty_score
                for sample in report.novel_samples
                if sample.novelty_score is not None
            ]
        )
        if report.novel_samples
        else 0.0,
        "strategies_used": report.detection_strategies,
        "strategy_counts": report.signal_counts,
    }

SignalCombiner(config)

Handles signal combination from multiple strategies.

Supports several combination methods:

- weighted: Weighted fusion of strategy scores
- union: Flag if any strategy flags
- intersection: Flag if all strategies flag
- voting: Flag if a majority of strategies flag
- meta_learner: Learned fusion via a trained logistic-regression meta-learner

Parameters:

Name Type Description Default
config DetectionConfig

Detection configuration

required
Source code in src/novelentitymatcher/novelty/core/signal_combiner.py
def __init__(self, config: DetectionConfig):
    """
    Initialize the signal combiner.

    Args:
        config: Detection configuration
    """
    self.config = config
    self.weights: WeightConfig = config.get_weight_config()
    self.combine_method = config.combine_method
    self._meta_model: Any | None = None
    self._feature_names: list[str] = _SCORE_KEYS + _FLAG_KEYS
Functions
combine(strategy_outputs, all_metrics)

Combine strategy signals into final novelty decisions.

Parameters:

Name Type Description Default
strategy_outputs dict[str, tuple[set[int], dict]]

Dict mapping strategy_id to (flags, metrics)

required
all_metrics dict[int, dict[str, Any]]

Dict mapping sample index to all metrics

required

Returns:

Type Description
tuple[set[int], dict[int, float]]

(novel_indices, novelty_scores)
  • novel_indices: Set of indices flagged as novel
  • novelty_scores: Dict mapping index to final novelty score
Source code in src/novelentitymatcher/novelty/core/signal_combiner.py
def combine(
    self,
    strategy_outputs: dict[str, tuple[set[int], dict]],
    all_metrics: dict[int, dict[str, Any]],
) -> tuple[set[int], dict[int, float]]:
    """
    Combine strategy signals into final novelty decisions.

    Args:
        strategy_outputs: Dict mapping strategy_id to (flags, metrics)
        all_metrics: Dict mapping sample index to all metrics

    Returns:
        (novel_indices, novelty_scores)
        - novel_indices: Set of indices flagged as novel
        - novelty_scores: Dict mapping index to final novelty score
    """
    if self.combine_method == "weighted":
        return self._weighted_combination(strategy_outputs, all_metrics)
    elif self.combine_method == "union":
        return self._union_combination(strategy_outputs)
    elif self.combine_method == "intersection":
        return self._intersection_combination(strategy_outputs)
    elif self.combine_method == "voting":
        return self._voting_combination(strategy_outputs)
    elif self.combine_method == "meta_learner":
        return self._meta_learner_combination(strategy_outputs, all_metrics)
    else:
        raise ValueError(f"Unknown combine_method: {self.combine_method}")
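The set-based modes dispatched above are straightforward to sketch. The following is an illustrative stand-in (not the library's internal `_union_combination` / `_intersection_combination` / `_voting_combination` helpers, whose exact signatures are not shown here) that combines per-strategy flag sets:

```python
def combine_flags(strategy_flags: dict[str, set[int]], method: str) -> set[int]:
    """Combine per-strategy flag sets into a final set of novel indices."""
    flag_sets = list(strategy_flags.values())
    if not flag_sets:
        return set()
    if method == "union":
        # Flag a sample if any strategy flagged it.
        return set.union(*flag_sets)
    if method == "intersection":
        # Flag a sample only if every strategy flagged it.
        return set.intersection(*flag_sets)
    if method == "voting":
        # Flag a sample if a strict majority of strategies flagged it.
        quorum = len(flag_sets) // 2 + 1
        counts: dict[int, int] = {}
        for flags in flag_sets:
            for idx in flags:
                counts[idx] = counts.get(idx, 0) + 1
        return {idx for idx, n in counts.items() if n >= quorum}
    raise ValueError(f"Unknown method: {method}")
```

With three strategies flagging `{1, 2}`, `{2, 3}`, and `{2}`, union yields `{1, 2, 3}`, intersection `{2}`, and voting (quorum of 2) `{2}`.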
train_meta_learner(features, labels)

Train the logistic regression meta-learner.

Parameters:

Name Type Description Default
features ndarray

(n_samples, n_features) matrix of strategy scores

required
labels ndarray

(n_samples,) binary novelty labels (1=novel, 0=known)

required

Returns:

Type Description
float

Training accuracy

Source code in src/novelentitymatcher/novelty/core/signal_combiner.py
def train_meta_learner(
    self,
    features: np.ndarray,
    labels: np.ndarray,
) -> float:
    """
    Train the logistic regression meta-learner.

    Args:
        features: (n_samples, n_features) matrix of strategy scores
        labels: (n_samples,) binary novelty labels (1=novel, 0=known)

    Returns:
        Training accuracy
    """
    try:
        from sklearn.linear_model import LogisticRegression
    except ImportError:
        raise ImportError(
            "scikit-learn is required for meta-learner training. "
            "Install with: pip install scikit-learn"
        )

    self._meta_model = LogisticRegression(
        C=1.0,
        max_iter=1000,
        solver="lbfgs",
        class_weight="balanced",
    )
    self._meta_model.fit(features, labels)
    accuracy = float(self._meta_model.score(features, labels))
    logger.info("Meta-learner trained with accuracy=%.4f", accuracy)
    return accuracy
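The `features` argument is a fixed-order matrix with one column per entry in `_SCORE_KEYS + _FLAG_KEYS`. A minimal sketch of assembling such a matrix from per-sample metric dicts (the feature names below are placeholders, not the library's actual key lists):

```python
import numpy as np

def build_feature_matrix(
    all_metrics: dict[int, dict], feature_names: list[str]
) -> np.ndarray:
    """Build an (n_samples, n_features) matrix, one column per feature name.

    Missing or None-valued metrics default to 0.0; boolean flags become 0/1.
    """
    indices = sorted(all_metrics)
    rows = [
        [float(all_metrics[i].get(name, 0.0) or 0.0) for name in feature_names]
        for i in indices
    ]
    return np.asarray(rows, dtype=float)
```

The resulting matrix can be passed to `train_meta_learner` as `features`, alongside binary labels of the same length.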
save_meta_learner(path)

Persist the trained meta-learner to disk.

Source code in src/novelentitymatcher/novelty/core/signal_combiner.py
def save_meta_learner(self, path: str) -> None:
    """Persist the trained meta-learner to disk."""
    if self._meta_model is None:
        raise RuntimeError("No trained meta-learner to save")

    import joblib

    p = Path(path)
    p.mkdir(parents=True, exist_ok=True)
    joblib.dump(self._meta_model, p / "meta_learner.pkl")

    meta = {
        "feature_names": _SCORE_KEYS + _FLAG_KEYS,
        "n_features": len(_SCORE_KEYS) + len(_FLAG_KEYS),
        "novelty_threshold": self.weights.novelty_threshold,
    }
    with open(p / "meta_learner_meta.json", "w") as f:
        json.dump(meta, f, indent=2)
load_meta_learner(path)

Load a trained meta-learner from disk.

Source code in src/novelentitymatcher/novelty/core/signal_combiner.py
def load_meta_learner(self, path: str) -> None:
    """Load a trained meta-learner from disk."""
    import joblib

    p = Path(path)
    self._meta_model = joblib.load(p / "meta_learner.pkl")

    with open(p / "meta_learner_meta.json") as f:
        meta = json.load(f)
    self._feature_names = meta.get("feature_names", _SCORE_KEYS + _FLAG_KEYS)
    logger.info("Meta-learner loaded from %s", path)

StrategyRegistry

Registry for novelty detection strategies.

Strategies are registered using the @StrategyRegistry.register decorator. Once registered, they can be instantiated by their strategy_id.

Functions
register(strategy_cls) classmethod

Register a strategy class.

Usage

@StrategyRegistry.register
class MyStrategy(NoveltyStrategy):
    strategy_id = "my_strategy"
    ...

Parameters:

Name Type Description Default
strategy_cls type[NoveltyStrategy]

Strategy class to register

required

Returns:

Type Description
type[NoveltyStrategy]

The same strategy class (for decorator use)

Source code in src/novelentitymatcher/novelty/core/strategies.py
@classmethod
def register(cls, strategy_cls: type[NoveltyStrategy]) -> type[NoveltyStrategy]:
    """
    Register a strategy class.

    Usage:
        @StrategyRegistry.register
        class MyStrategy(NoveltyStrategy):
            strategy_id = "my_strategy"
            ...

    Args:
        strategy_cls: Strategy class to register

    Returns:
        The same strategy class (for decorator use)
    """
    if not hasattr(strategy_cls, "strategy_id"):
        raise ValueError(
            f"Strategy class {strategy_cls.__name__} must have a 'strategy_id' attribute"
        )

    strategy_id = strategy_cls.strategy_id
    if strategy_id in cls._strategies:
        raise ValueError(
            f"Strategy ID '{strategy_id}' is already registered "
            f"(existing: {cls._strategies[strategy_id].__name__}, "
            f"new: {strategy_cls.__name__})"
        )

    cls._strategies[strategy_id] = strategy_cls
    return strategy_cls
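The decorator pattern above can be reduced to a self-contained sketch (class and strategy names here are illustrative, not part of the library):

```python
class MiniRegistry:
    """Minimal decorator-based registry, mirroring the pattern above."""

    _strategies: dict[str, type] = {}

    @classmethod
    def register(cls, strategy_cls: type) -> type:
        sid = getattr(strategy_cls, "strategy_id", None)
        if sid is None:
            raise ValueError("strategy class must define 'strategy_id'")
        if sid in cls._strategies:
            raise ValueError(f"duplicate strategy_id: {sid!r}")
        cls._strategies[sid] = strategy_cls
        # Return the class unchanged so it also works as a decorator.
        return strategy_cls

@MiniRegistry.register
class CosineStrategy:
    strategy_id = "cosine"
```

Because the decorator returns the class unchanged, `CosineStrategy` stays usable as a normal class while also being discoverable by ID; a second registration under the same ID raises, just as in `StrategyRegistry.register`.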
get(strategy_id) classmethod

Get a strategy class by ID.

Parameters:

Name Type Description Default
strategy_id str

Unique strategy identifier

required

Returns:

Type Description
type[NoveltyStrategy]

Strategy class

Raises:

Type Description
ValueError

If strategy_id is not registered

Source code in src/novelentitymatcher/novelty/core/strategies.py
@classmethod
def get(cls, strategy_id: str) -> type[NoveltyStrategy]:
    """
    Get a strategy class by ID.

    Args:
        strategy_id: Unique strategy identifier

    Returns:
        Strategy class

    Raises:
        ValueError: If strategy_id is not registered
    """
    if strategy_id not in cls._strategies:
        available = ", ".join(cls.list_strategies())
        raise ValueError(
            f"Unknown strategy: '{strategy_id}'. Available strategies: {available}"
        )
    return cls._strategies[strategy_id]
create(strategy_id) classmethod

Create an instance of a strategy.

Parameters:

Name Type Description Default
strategy_id str

Unique strategy identifier

required

Returns:

Type Description
NoveltyStrategy

Instantiated strategy object

Source code in src/novelentitymatcher/novelty/core/strategies.py
@classmethod
def create(cls, strategy_id: str) -> NoveltyStrategy:
    """
    Create an instance of a strategy.

    Args:
        strategy_id: Unique strategy identifier

    Returns:
        Instantiated strategy object
    """
    strategy_cls = cls.get(strategy_id)
    return strategy_cls()
list_strategies(maturity=None) classmethod

List all registered strategy IDs, optionally filtered by maturity.

Parameters:

Name Type Description Default
maturity str | None

Optional maturity filter ("production", "experimental", "internal").

None

Returns:

Type Description
list[str]

List of strategy IDs in registration order

Source code in src/novelentitymatcher/novelty/core/strategies.py
@classmethod
def list_strategies(cls, maturity: str | None = None) -> list[str]:
    """
    List all registered strategy IDs, optionally filtered by maturity.

    Args:
        maturity: Optional maturity filter ("production", "experimental", "internal").

    Returns:
        List of strategy IDs in registration order
    """
    if maturity is None:
        return list(cls._strategies.keys())
    return [
        sid
        for sid, scls in cls._strategies.items()
        if getattr(scls, "maturity", "experimental") == maturity
    ]
is_registered(strategy_id) classmethod

Check if a strategy is registered.

Parameters:

Name Type Description Default
strategy_id str

Strategy identifier to check

required

Returns:

Type Description
bool

True if strategy is registered

Source code in src/novelentitymatcher/novelty/core/strategies.py
@classmethod
def is_registered(cls, strategy_id: str) -> bool:
    """
    Check if a strategy is registered.

    Args:
        strategy_id: Strategy identifier to check

    Returns:
        True if strategy is registered
    """
    return strategy_id in cls._strategies
unregister(strategy_id) classmethod

Unregister a strategy.

This is primarily useful for testing.

Parameters:

Name Type Description Default
strategy_id str

Strategy identifier to unregister

required

Raises:

Type Description
ValueError

If strategy_id is not registered

Source code in src/novelentitymatcher/novelty/core/strategies.py
@classmethod
def unregister(cls, strategy_id: str) -> None:
    """
    Unregister a strategy.

    This is primarily useful for testing.

    Args:
        strategy_id: Strategy identifier to unregister

    Raises:
        ValueError: If strategy_id is not registered
    """
    if strategy_id not in cls._strategies:
        raise ValueError(f"Cannot unregister unknown strategy: '{strategy_id}'")
    del cls._strategies[strategy_id]
clear() classmethod

Clear all registered strategies.

This is primarily useful for testing.

Source code in src/novelentitymatcher/novelty/core/strategies.py
@classmethod
def clear(cls) -> None:
    """
    Clear all registered strategies.

    This is primarily useful for testing.
    """
    cls._strategies.clear()

NovelEntityMatchResult(id, score, is_match, is_novel, novel_score=None, match_method='accepted_known', alternatives=list(), signals=dict(), predicted_id=None, metadata=dict()) dataclass

Operational result for a single novelty-aware match decision.

NoveltyEvaluator(mode='benchmark', metrics=None)

Unified evaluator for novelty detection.

Supports two modes:

- benchmark: Quick evaluation on OOD splits with core metrics
- research: Comprehensive evaluation with confusion matrices and threshold sweeping

Metrics computed:

- AUROC, AUPRC
- Detection rates at 1%, 5%, 10% FPR
- Precision, Recall, F1 at optimal threshold

Parameters:

Name Type Description Default
mode Literal['benchmark', 'research']

Evaluation mode ('benchmark' or 'research')

'benchmark'
metrics list[str] | None

List of metrics to compute (None for default based on mode)

None
Source code in src/novelentitymatcher/novelty/evaluation/evaluator.py
def __init__(
    self,
    mode: Literal["benchmark", "research"] = "benchmark",
    metrics: list[str] | None = None,
):
    """
    Initialize the evaluator.

    Args:
        mode: Evaluation mode ('benchmark' or 'research')
        metrics: List of metrics to compute (None for default based on mode)
    """
    self.mode = mode
    self.metrics = metrics or self._default_metrics_for_mode(mode)
Functions
evaluate(novelty_scores, is_novel_true, threshold=None)

Evaluate novelty detection performance.

Parameters:

Name Type Description Default
novelty_scores ndarray

Predicted novelty scores (higher = more novel)

required
is_novel_true ndarray

Ground truth novelty labels (True = novel)

required
threshold float | None

Optional threshold for discrete predictions

None

Returns:

Type Description
dict[str, float]

Dictionary of metric name -> value

Source code in src/novelentitymatcher/novelty/evaluation/evaluator.py
def evaluate(
    self,
    novelty_scores: np.ndarray,
    is_novel_true: np.ndarray,
    threshold: float | None = None,
) -> dict[str, float]:
    """
    Evaluate novelty detection performance.

    Args:
        novelty_scores: Predicted novelty scores (higher = more novel)
        is_novel_true: Ground truth novelty labels (True = novel)
        threshold: Optional threshold for discrete predictions

    Returns:
        Dictionary of metric name -> value
    """
    scores = np.asarray(novelty_scores)
    labels = np.asarray(is_novel_true, dtype=bool)

    results = {}

    # AUROC and AUPRC
    if "auroc" in self.metrics:
        results["auroc"] = compute_auroc(scores, labels)

    if "auprc" in self.metrics:
        results["auprc"] = compute_auprc(scores, labels)

    # Detection rates at various FPR thresholds
    if any(m.startswith("detection_rate_") for m in self.metrics):
        dr_metrics = [m for m in self.metrics if m.startswith("detection_rate_")]
        fpr_thresholds = []
        for m in dr_metrics:
            if m == "detection_rate_1":
                fpr_thresholds.append(0.01)
            elif m == "detection_rate_5":
                fpr_thresholds.append(0.05)
            elif m == "detection_rate_10":
                fpr_thresholds.append(0.10)

        if fpr_thresholds:
            detection_rates = compute_detection_rates(
                scores, labels, tuple(fpr_thresholds)
            )
            results.update(detection_rates)

    # Precision, Recall, F1
    if any(m in ["precision", "recall", "f1"] for m in self.metrics):
        prf_results = compute_precision_recall_f1(scores, labels, threshold)
        if "precision" in self.metrics:
            results["precision"] = prf_results["precision"]
        if "recall" in self.metrics:
            results["recall"] = prf_results["recall"]
        if "f1" in self.metrics:
            results["f1"] = prf_results["f1"]
        results["optimal_threshold"] = prf_results["threshold"]

    return results
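For intuition, AUROC can be computed without sklearn as the probability that a randomly chosen novel sample scores above a randomly chosen known one (ties counted as half). This is a pedagogical sketch, not the library's `compute_auroc`:

```python
import numpy as np

def auroc(scores, labels) -> float:
    """Rank-statistic AUROC: P(score of a novel sample > score of a known one)."""
    scores = np.asarray(scores, dtype=float)
    labels = np.asarray(labels, dtype=bool)
    pos, neg = scores[labels], scores[~labels]
    # Pairwise comparisons; ties contribute 0.5 each.
    wins = (pos[:, None] > neg[None, :]).sum()
    ties = (pos[:, None] == neg[None, :]).sum()
    return float(wins + 0.5 * ties) / (len(pos) * len(neg))
```

For example, scores `[0.9, 0.2, 0.8, 0.3]` with novelty labels `[1, 0, 0, 1]` give AUROC 0.75: three of the four novel/known pairs are correctly ordered.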
create_report(novelty_scores, is_novel_true, threshold=None)

Create a comprehensive evaluation report.

Parameters:

Name Type Description Default
novelty_scores ndarray

Predicted novelty scores (higher = more novel)

required
is_novel_true ndarray

Ground truth novelty labels (True = novel)

required
threshold float | None

Optional threshold for discrete predictions

None

Returns:

Type Description
EvaluationReport

EvaluationReport with all metrics

Source code in src/novelentitymatcher/novelty/evaluation/evaluator.py
def create_report(
    self,
    novelty_scores: np.ndarray,
    is_novel_true: np.ndarray,
    threshold: float | None = None,
) -> EvaluationReport:
    """
    Create a comprehensive evaluation report.

    Args:
        novelty_scores: Predicted novelty scores (higher = more novel)
        is_novel_true: Ground truth novelty labels (True = novel)
        threshold: Optional threshold for discrete predictions

    Returns:
        EvaluationReport with all metrics
    """
    scores = np.asarray(novelty_scores)
    labels = np.asarray(is_novel_true, dtype=bool)

    # Compute all metrics
    auroc = compute_auroc(scores, labels)
    auprc = compute_auprc(scores, labels)

    detection_rates = compute_detection_rates(scores, labels)
    dr_at_1 = detection_rates.get("detection_rate_1", 0.0)
    dr_at_5 = detection_rates.get("detection_rate_5", 0.0)
    dr_at_10 = detection_rates.get("detection_rate_10", 0.0)

    prf_results = compute_precision_recall_f1(scores, labels, threshold)
    optimal_threshold = prf_results["threshold"]

    # Confusion matrix
    cm = compute_confusion_matrix(scores, labels, optimal_threshold)

    return EvaluationReport(
        auroc=auroc,
        auprc=auprc,
        detection_rate_at_1=dr_at_1,
        detection_rate_at_5=dr_at_5,
        detection_rate_at_10=dr_at_10,
        precision=prf_results["precision"],
        recall=prf_results["recall"],
        f1=prf_results["f1"],
        optimal_threshold=optimal_threshold,
        confusion_matrix=cm,
        num_samples=len(scores),
        num_novel=int(np.sum(labels)),
        timestamp=datetime.now().isoformat(),
    )
sweep_thresholds(novelty_scores, is_novel_true, num_thresholds=100)

Sweep across thresholds and compute metrics at each.

Parameters:

Name Type Description Default
novelty_scores ndarray

Predicted novelty scores (higher = more novel)

required
is_novel_true ndarray

Ground truth novelty labels (True = novel)

required
num_thresholds int

Number of thresholds to evaluate

100

Returns:

Type Description
dict[str, ndarray]

Dict with arrays for thresholds and metrics

Source code in src/novelentitymatcher/novelty/evaluation/evaluator.py
def sweep_thresholds(
    self,
    novelty_scores: np.ndarray,
    is_novel_true: np.ndarray,
    num_thresholds: int = 100,
) -> dict[str, np.ndarray]:
    """
    Sweep across thresholds and compute metrics at each.

    Args:
        novelty_scores: Predicted novelty scores (higher = more novel)
        is_novel_true: Ground truth novelty labels (True = novel)
        num_thresholds: Number of thresholds to evaluate

    Returns:
        Dict with arrays for thresholds and metrics
    """
    from .metrics import sweep_thresholds

    thresholds = np.linspace(0, 1, num_thresholds)
    return sweep_thresholds(novelty_scores, is_novel_true, thresholds)
compare_thresholds(novelty_scores, is_novel_true, thresholds)

Compare metrics at specific thresholds.

Parameters:

Name Type Description Default
novelty_scores ndarray

Predicted novelty scores (higher = more novel)

required
is_novel_true ndarray

Ground truth novelty labels (True = novel)

required
thresholds list[float]

List of thresholds to evaluate

required

Returns:

Type Description
list[dict[str, float]]

List of dicts with metrics at each threshold

Source code in src/novelentitymatcher/novelty/evaluation/evaluator.py
def compare_thresholds(
    self,
    novelty_scores: np.ndarray,
    is_novel_true: np.ndarray,
    thresholds: list[float],
) -> list[dict[str, float]]:
    """
    Compare metrics at specific thresholds.

    Args:
        novelty_scores: Predicted novelty scores (higher = more novel)
        is_novel_true: Ground truth novelty labels (True = novel)
        thresholds: List of thresholds to evaluate

    Returns:
        List of dicts with metrics at each threshold
    """
    results = []
    for thresh in thresholds:
        metrics = self.evaluate(novelty_scores, is_novel_true, threshold=thresh)
        metrics["threshold"] = thresh
        results.append(metrics)
    return results
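The per-threshold metrics compared above reduce to standard confusion-matrix arithmetic. A hedged sketch of precision/recall/F1 at one fixed threshold (the library's `compute_precision_recall_f1` additionally searches for an optimal threshold, which this omits):

```python
import numpy as np

def prf_at_threshold(scores, labels, threshold: float) -> dict[str, float]:
    """Precision, recall, and F1 treating scores >= threshold as 'novel'."""
    scores = np.asarray(scores, dtype=float)
    labels = np.asarray(labels, dtype=bool)
    preds = scores >= threshold
    tp = int(np.sum(preds & labels))
    fp = int(np.sum(preds & ~labels))
    fn = int(np.sum(~preds & labels))
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = (
        2 * precision * recall / (precision + recall)
        if precision + recall
        else 0.0
    )
    return {"precision": precision, "recall": recall, "f1": f1}
```

Sweeping `threshold` over `np.linspace(0, 1, n)` and collecting these dicts reproduces the shape of `compare_thresholds`'s output.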

GradualNoveltySplitter(known_ratios=None, random_state=42)

Creates multiple splits with gradually increasing novelty.

Useful for testing how novelty detection performance degrades as the number of novel classes increases.

Parameters:

Name Type Description Default
known_ratios list[float] | None

List of known ratios to create splits for

None
random_state int

Random seed for reproducibility

42
Source code in src/novelentitymatcher/novelty/evaluation/splitters.py
def __init__(
    self,
    known_ratios: list[float] | None = None,
    random_state: int = 42,
):
    """
    Initialize gradual novelty splitter.

    Args:
        known_ratios: List of known ratios to create splits for
        random_state: Random seed for reproducibility
    """
    self.known_ratios = known_ratios or [0.95, 0.9, 0.8, 0.7, 0.5]
    self.random_state = random_state
Functions
create_splits(texts, labels)

Create multiple splits with different novelty levels.

Parameters:

Name Type Description Default
texts list[str]

List of input texts

required
labels list[str]

List of corresponding labels

required

Returns:

Type Description
list[dict[str, Any]]

List of split dictionaries, one per known_ratio

Source code in src/novelentitymatcher/novelty/evaluation/splitters.py
def create_splits(
    self,
    texts: list[str],
    labels: list[str],
) -> list[dict[str, Any]]:
    """
    Create multiple splits with different novelty levels.

    Args:
        texts: List of input texts
        labels: List of corresponding labels

    Returns:
        List of split dictionaries, one per known_ratio
    """
    splits = []

    for ratio in self.known_ratios:
        splitter = OODSplitter(known_ratio=ratio, random_state=self.random_state)
        split_data = splitter.create_split_with_indices(texts, labels)
        split_data["known_ratio"] = ratio
        splits.append(split_data)

    return splits
get_novelty_progression(texts, labels)

Get summary of novelty progression across splits.

Parameters:

Name Type Description Default
texts list[str]

List of input texts

required
labels list[str]

List of corresponding labels

required

Returns:

Type Description
dict[str, list]

Dict with arrays for known_ratio, n_known, n_novel

Source code in src/novelentitymatcher/novelty/evaluation/splitters.py
def get_novelty_progression(
    self,
    texts: list[str],
    labels: list[str],
) -> dict[str, list]:
    """
    Get summary of novelty progression across splits.

    Args:
        texts: List of input texts
        labels: List of corresponding labels

    Returns:
        Dict with arrays for known_ratio, n_known, n_novel
    """
    splits = self.create_splits(texts, labels)

    return {
        "known_ratios": [s["known_ratio"] for s in splits],
        "n_known": [s["n_known"] for s in splits],
        "n_novel": [s["n_novel"] for s in splits],
        "n_train": [s["n_train"] for s in splits],
        "n_test": [s["n_test"] for s in splits],
    }

OODSplitter(known_ratio=0.8, random_state=42)

Creates OOD (Out-of-Distribution) splits for novelty detection evaluation.

Splits data into known classes and unknown/novel classes to simulate the novelty detection scenario.

Parameters:

Name Type Description Default
known_ratio float

Fraction of classes to keep as known (0-1)

0.8
random_state int

Random seed for reproducibility

42
Source code in src/novelentitymatcher/novelty/evaluation/splitters.py
def __init__(
    self,
    known_ratio: float = 0.8,
    random_state: int = 42,
):
    """
    Initialize OOD splitter.

    Args:
        known_ratio: Fraction of classes to keep as known (0-1)
        random_state: Random seed for reproducibility
    """
    self.known_ratio = known_ratio
    self.random_state = random_state
Functions
create_split(texts, labels)

Create OOD train/test split.

Parameters:

Name Type Description Default
texts list[str]

List of input texts

required
labels list[str]

List of corresponding labels

required

Returns:

Type Description
tuple[list[str], list[str], list[str], list[bool]]

Tuple of (train_texts, train_labels, test_texts, test_is_novel)
  • test_is_novel: True for novel (previously unknown) classes
Source code in src/novelentitymatcher/novelty/evaluation/splitters.py
def create_split(
    self,
    texts: list[str],
    labels: list[str],
) -> tuple[list[str], list[str], list[str], list[bool]]:
    """
    Create OOD train/test split.

    Args:
        texts: List of input texts
        labels: List of corresponding labels

    Returns:
        Tuple of (train_texts, train_labels, test_texts, test_is_novel)
        - test_is_novel: True for novel (previously unknown) classes
    """
    np.random.seed(self.random_state)

    unique_labels = sorted(set(labels))
    n_classes = len(unique_labels)
    n_known = max(1, int(n_classes * self.known_ratio))

    known_classes = set(np.random.choice(unique_labels, n_known, replace=False))

    train_texts = []
    train_labels = []
    test_texts = []
    test_is_novel = []

    for text, label in zip(texts, labels, strict=False):
        if label in known_classes:
            train_texts.append(text)
            train_labels.append(label)
        else:
            test_texts.append(text)
            test_is_novel.append(True)

    return train_texts, train_labels, test_texts, test_is_novel
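The key idea above is that the split is at the *class* level, not the sample level: every sample of a held-out class becomes test data. A simplified, self-contained sketch of that logic (function name and use of `random.Random` are illustrative choices, not the library's implementation):

```python
import random

def class_level_split(labels: list[str], known_ratio: float, seed: int = 42):
    """Partition sample indices by holding out a fraction of classes as novel."""
    classes = sorted(set(labels))
    rng = random.Random(seed)
    n_known = max(1, int(len(classes) * known_ratio))
    known = set(rng.sample(classes, n_known))
    train_idx = [i for i, y in enumerate(labels) if y in known]
    test_idx = [i for i, y in enumerate(labels) if y not in known]
    return train_idx, test_idx, known
```

With four classes and `known_ratio=0.5`, two classes are kept as known; all of their samples land in the train partition and every sample of the remaining classes is "novel" test data.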
create_split_with_indices(texts, labels)

Create OOD split with additional metadata.

Parameters:

Name Type Description Default
texts list[str]

List of input texts

required
labels list[str]

List of corresponding labels

required

Returns:

Type Description
dict[str, Any]

Dict with split data and metadata

Source code in src/novelentitymatcher/novelty/evaluation/splitters.py
def create_split_with_indices(
    self,
    texts: list[str],
    labels: list[str],
) -> dict[str, Any]:
    """
    Create OOD split with additional metadata.

    Args:
        texts: List of input texts
        labels: List of corresponding labels

    Returns:
        Dict with split data and metadata
    """
    train_texts, train_labels, test_texts, test_is_novel = self.create_split(
        texts, labels
    )

    unique_labels = sorted(set(labels))
    known_classes = sorted(set(train_labels))
    novel_classes = sorted(set(unique_labels) - set(known_classes))

    return {
        "train_texts": train_texts,
        "train_labels": train_labels,
        "test_texts": test_texts,
        "test_is_novel": test_is_novel,
        "known_classes": known_classes,
        "novel_classes": novel_classes,
        "n_known": len(known_classes),
        "n_novel": len(novel_classes),
        "n_train": len(train_texts),
        "n_test": len(test_texts),
    }

BGERetriever(model_name='BAAI/bge-m3', device=None, batch_size=32)

BGE-M3 style dense retriever for examples.

Simple wrapper that uses sentence-transformers for dense retrieval of in-context examples.

Parameters:

Name Type Description Default
model_name str

Model name for sentence-transformers

'BAAI/bge-m3'
device str | None

Device to use ("cuda", "cpu", or None for auto)

None
batch_size int

Batch size for encoding

32
Source code in src/novelentitymatcher/novelty/proposal/retrieval.py
def __init__(
    self,
    model_name: str = "BAAI/bge-m3",
    device: str | None = None,
    batch_size: int = 32,
):
    """
    Initialize BGE retriever.

    Args:
        model_name: Model name for sentence-transformers
        device: Device to use ("cuda", "cpu", or None for auto)
        batch_size: Batch size for encoding
    """
    self.model_name = model_name
    self.device = device
    self.batch_size = batch_size
    self._model: Any | None = None
    self._is_initialized = False
Functions
encode(texts, batch_size=None)

Encode texts to embeddings.

Parameters:

Name Type Description Default
texts list[str]

List of texts to encode

required
batch_size int | None

Override batch size

None

Returns:

Type Description
Any

numpy array of embeddings (n, dim)

Source code in src/novelentitymatcher/novelty/proposal/retrieval.py
def encode(
    self,
    texts: list[str],
    batch_size: int | None = None,
) -> Any:
    """
    Encode texts to embeddings.

    Args:
        texts: List of texts to encode
        batch_size: Override batch size

    Returns:
        numpy array of embeddings (n, dim)
    """
    self._initialize()

    batch_size = batch_size or self.batch_size
    assert self._model is not None, "Model should be initialized"
    embeddings = self._model.encode(
        texts,
        batch_size=batch_size,
        show_progress_bar=False,
        normalize_embeddings=True,
    )
    return embeddings
similarity(query_embeddings, corpus_embeddings)

Compute similarity between query and corpus.

Parameters:

Name Type Description Default
query_embeddings Any

Query embeddings (n, dim)

required
corpus_embeddings Any

Corpus embeddings (m, dim)

required

Returns:

Type Description
ndarray

Similarity matrix (n, m)

Source code in src/novelentitymatcher/novelty/proposal/retrieval.py
def similarity(
    self,
    query_embeddings: Any,
    corpus_embeddings: Any,
) -> np.ndarray:
    """
    Compute similarity between query and corpus.

    Args:
        query_embeddings: Query embeddings (n, dim)
        corpus_embeddings: Corpus embeddings (m, dim)

    Returns:
        Similarity matrix (n, m)
    """
    from sklearn.metrics.pairwise import cosine_similarity

    return cosine_similarity(query_embeddings, corpus_embeddings)
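The same similarity matrix can be computed in plain NumPy; since `encode()` above normalizes embeddings (`normalize_embeddings=True`), cosine similarity reduces to a matrix product. A small equivalent sketch:

```python
import numpy as np

def cosine_sim(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    """Cosine similarity between rows of a (n, dim) and b (m, dim) -> (n, m)."""
    # Normalize rows to unit length, then take the inner product.
    a = a / np.linalg.norm(a, axis=1, keepdims=True)
    b = b / np.linalg.norm(b, axis=1, keepdims=True)
    return a @ b.T
```

For already-normalized inputs the two division steps are no-ops and the result matches `sklearn.metrics.pairwise.cosine_similarity` up to floating-point error.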

RetrievalAugmentedProposer(retriever=None, llm_proposer=None, k_examples=5, k_novel_per_class=3, retrieval_metric='cosine', rerank=False)

LLM class proposer enhanced with retrieval-based in-context examples.

Retrieves most relevant examples from a corpus to include in the LLM prompt, improving class naming quality.

Parameters:

Name Type Description Default
retriever EmbeddingBackend | None

Embedding backend for retrieval (e.g., BGE-M3)

None
llm_proposer Any | None

Existing LLMClassProposer to enhance

None
k_examples int

Number of in-context examples to retrieve

5
k_novel_per_class int

Number of novel examples per proposed class

3
retrieval_metric str

Similarity metric for retrieval

'cosine'
rerank bool

Whether to use reranking for better examples

False
Source code in src/novelentitymatcher/novelty/proposal/retrieval.py
def __init__(
    self,
    retriever: EmbeddingBackend | None = None,
    llm_proposer: Any | None = None,
    k_examples: int = 5,
    k_novel_per_class: int = 3,
    retrieval_metric: str = "cosine",
    rerank: bool = False,
):
    """
    Initialize retrieval-augmented proposer.

    Args:
        retriever: Embedding backend for retrieval (e.g., BGE-M3)
        llm_proposer: Existing LLMClassProposer to enhance
        k_examples: Number of in-context examples to retrieve
        k_novel_per_class: Number of novel examples per proposed class
        retrieval_metric: Similarity metric for retrieval
        rerank: Whether to use reranking for better examples
    """
    self.retriever = retriever
    self.llm_proposer = llm_proposer
    self.k_examples = k_examples
    self.k_novel_per_class = k_novel_per_class
    self.retrieval_metric = retrieval_metric
    self.rerank = rerank

    self._example_corpus: list[str] = []
    self._example_embeddings: Any | None = None
    self._is_indexed: bool = False
Attributes
is_ready property

Check if proposer is ready for use.

Functions
index_examples(examples, embeddings=None)

Index examples for retrieval.

Parameters:

Name Type Description Default
examples list[str]

List of example texts to index

required
embeddings Any | None

Pre-computed embeddings (if None, will compute)

None
Source code in src/novelentitymatcher/novelty/proposal/retrieval.py
def index_examples(
    self,
    examples: list[str],
    embeddings: Any | None = None,
) -> None:
    """
    Index examples for retrieval.

    Args:
        examples: List of example texts to index
        embeddings: Pre-computed embeddings (if None, will compute)
    """
    self._example_corpus = examples

    if embeddings is not None:
        self._example_embeddings = embeddings
    elif self.retriever is not None:
        self._example_embeddings = self.retriever.encode(examples)

    self._is_indexed = True
    logger.info(f"Indexed {len(examples)} examples for retrieval")
retrieve(query, k=None)

Retrieve k most relevant examples for a query.

Parameters:

- query (str): Query text. Required.
- k (int | None): Number of examples to retrieve (default: k_examples). Default: None

Returns:

- list[dict[str, Any]]: List of dicts with 'text', 'score', 'index'

Source code in src/novelentitymatcher/novelty/proposal/retrieval.py
def retrieve(
    self,
    query: str,
    k: int | None = None,
) -> list[dict[str, Any]]:
    """
    Retrieve k most relevant examples for a query.

    Args:
        query: Query text
        k: Number of examples to retrieve (default: k_examples)

    Returns:
        List of dicts with 'text', 'score', 'index'
    """
    if not self._is_indexed:
        raise RuntimeError("Must call index_examples() before retrieve()")

    k = k or self.k_examples

    if self.retriever is None:
        logger.warning("No retriever available, returning empty results")
        return []

    query_embedding = self.retriever.encode([query])

    from sklearn.metrics.pairwise import cosine_similarity

    similarities = cosine_similarity(
        query_embedding,
        self._example_embeddings,
    )[0]

    top_indices = sorted(
        range(len(similarities)),
        key=lambda i: similarities[i],
        reverse=True,
    )[:k]

    results = [
        {
            "text": self._example_corpus[idx],
            "score": float(similarities[idx]),
            "index": int(idx),
        }
        for idx in top_indices
    ]

    return results
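Stripped of the class machinery, retrieve() is plain cosine top-k ranking over precomputed embeddings. A minimal standalone sketch with a toy corpus (the helper name and vectors here are illustrative, not part of the library):

```python
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

def top_k_examples(query_emb, corpus_embs, corpus_texts, k=2):
    # Rank corpus entries by cosine similarity to the query, highest first.
    sims = cosine_similarity(query_emb.reshape(1, -1), corpus_embs)[0]
    order = np.argsort(-sims)[:k]
    return [
        {"text": corpus_texts[i], "score": float(sims[i]), "index": int(i)}
        for i in order
    ]

corpus = ["red apple", "green pear", "blue car"]
embs = np.array([[1.0, 0.1], [0.9, 0.2], [0.0, 1.0]])
hits = top_k_examples(np.array([1.0, 0.0]), embs, corpus, k=2)
```

The returned dicts mirror the 'text' / 'score' / 'index' shape documented above.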
retrieve_by_class(class_name, novel_samples, existing_classes)

Retrieve examples relevant to a proposed class.

Parameters:

- class_name (str): Proposed class name. Required.
- novel_samples (list[Any]): Novel samples to find examples for. Required.
- existing_classes (list[str]): List of existing class names. Required.

Returns:

- dict[str, Any]: Dict with retrieved examples and metadata

Source code in src/novelentitymatcher/novelty/proposal/retrieval.py
def retrieve_by_class(
    self,
    class_name: str,
    novel_samples: list[Any],
    existing_classes: list[str],
) -> dict[str, Any]:
    """
    Retrieve examples relevant to a proposed class.

    Args:
        class_name: Proposed class name
        novel_samples: Novel samples to find examples for
        existing_classes: List of existing class names

    Returns:
        Dict with retrieved examples and metadata
    """
    if not novel_samples:
        return {"examples": [], "class_name": class_name}

    texts = [s.text if hasattr(s, "text") else str(s) for s in novel_samples]
    query = f"{class_name}: {', '.join(texts[:3])}"

    retrieved = self.retrieve(query, k=self.k_novel_per_class)

    return {
        "class_name": class_name,
        "examples": retrieved,
        "query": query,
    }
build_prompt(novel_samples, existing_classes, context=None, use_retrieval=True)

Build prompt for LLM class proposal with retrieval.

Parameters:

- novel_samples (list[Any]): Novel samples to propose classes for. Required.
- existing_classes (list[str]): List of existing class names. Required.
- context (str | None): Optional domain context. Default: None
- use_retrieval (bool): Whether to include retrieved examples. Default: True

Returns:

- str: Formatted prompt string

Source code in src/novelentitymatcher/novelty/proposal/retrieval.py
    def build_prompt(
        self,
        novel_samples: list[Any],
        existing_classes: list[str],
        context: str | None = None,
        use_retrieval: bool = True,
    ) -> str:
        """
        Build prompt for LLM class proposal with retrieval.

        Args:
            novel_samples: Novel samples to propose classes for
            existing_classes: List of existing class names
            context: Optional domain context
            use_retrieval: Whether to include retrieved examples

        Returns:
            Formatted prompt string
        """
        sample_texts = [
            f"- {s.text if hasattr(s, 'text') else str(s)}" for s in novel_samples[:20]
        ]
        if len(novel_samples) > 20:
            sample_texts.append(f"... and {len(novel_samples) - 20} more samples")

        samples_section = "\n".join(sample_texts)

        existing_section = ", ".join(existing_classes) if existing_classes else "None"

        context_section = f"\n\nDomain Context: {context}" if context else ""

        retrieval_section = ""
        if use_retrieval and self._is_indexed and self.retriever:
            retrieved_examples = []
            for sample in novel_samples[:5]:
                text = sample.text if hasattr(sample, "text") else str(sample)
                results = self.retrieve(text, k=2)
                for r in results:
                    retrieved_examples.append(
                        f'- Example: "{r["text"]}" (relevance: {r["score"]:.2f})'
                    )

            if retrieved_examples:
                retrieval_section = "\n\nRetrieved relevant examples:\n" + "\n".join(
                    retrieved_examples[:10]
                )

        prompt = f"""You are analyzing text samples that don't fit well into existing categories.

Existing Classes: {existing_section}{context_section}{retrieval_section}

Novel Samples (detected as not fitting existing classes):
{samples_section}

Your task is to:
1. Analyze these samples to identify meaningful new categories
2. Propose concise, descriptive class names
3. Provide justifications for each proposal
4. Identify samples that should be rejected as noise

IMPORTANT RESPONSE FORMAT:
You must respond with a valid JSON object matching this schema:
{{
  "proposed_classes": [
    {{
      "name": "class name (2-4 words)",
      "description": "clear description of what this class represents",
      "confidence": 0.0-1.0,
      "sample_count": number of samples fitting this class,
      "example_samples": ["sample1", "sample2", "sample3"],
      "justification": "why this class makes sense",
      "suggested_parent": null or "parent class name if hierarchical"
    }}
  ],
  "rejected_as_noise": ["sample text to reject"],
  "analysis_summary": "brief summary of your analysis",
  "cluster_count": number of distinct clusters found
}}

Guidelines:
- Class names should be concise (2-4 words), descriptive
- Confidence should reflect how clearly the samples form a coherent category
- Only propose classes with at least 3 supporting samples
- Reject samples that appear to be noise, errors, or too diverse
- Return "proposed_classes": [] if no coherent new class should be created
- Consider hierarchical relationships if relevant to the domain

Provide your analysis as a JSON object:"""

        return prompt
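The prompt demands a strict JSON response. A consumer can sanity-check a reply against the required top-level keys and the 0-1 confidence range before trusting it; a minimal sketch (the helper name and sample reply are illustrative, not the library's own parser):

```python
import json

def check_proposal_response(raw: str) -> dict:
    # Parse the LLM reply and verify the keys the prompt schema requires.
    data = json.loads(raw)
    required = {"proposed_classes", "rejected_as_noise",
                "analysis_summary", "cluster_count"}
    missing = required - data.keys()
    if missing:
        raise ValueError(f"Response missing keys: {sorted(missing)}")
    for cls in data["proposed_classes"]:
        # Confidence must be a value in [0, 1] per the schema.
        if not 0.0 <= cls.get("confidence", -1.0) <= 1.0:
            raise ValueError(f"Bad confidence for {cls.get('name')!r}")
    return data

reply = '''{"proposed_classes": [{"name": "supply chain risk",
  "description": "mentions of supplier disruption", "confidence": 0.8,
  "sample_count": 4, "example_samples": ["s1"], "justification": "coherent",
  "suggested_parent": null}],
  "rejected_as_noise": [], "analysis_summary": "one cluster",
  "cluster_count": 1}'''
parsed = check_proposal_response(reply)
```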
propose_classes(novel_samples, existing_classes, context=None)

Propose new classes with retrieval-augmented prompting.

Parameters:

- novel_samples (list[Any]): Novel samples to propose classes for. Required.
- existing_classes (list[str]): List of existing class names. Required.
- context (str | None): Optional domain context. Default: None

Returns:

- Any | None: NovelClassAnalysis from LLM or None if unavailable

Source code in src/novelentitymatcher/novelty/proposal/retrieval.py
def propose_classes(
    self,
    novel_samples: list[Any],
    existing_classes: list[str],
    context: str | None = None,
) -> Any | None:
    """
    Propose new classes with retrieval-augmented prompting.

    Args:
        novel_samples: Novel samples to propose classes for
        existing_classes: List of existing class names
        context: Optional domain context

    Returns:
        NovelClassAnalysis from LLM or None if unavailable
    """
    if not self.llm_proposer:
        logger.warning("No LLM proposer configured")
        return None

    prompt = self.build_prompt(
        novel_samples=novel_samples,
        existing_classes=existing_classes,
        context=context,
        use_retrieval=True,
    )

    try:
        response, model_used = self._call_llm_with_fallback(prompt)
        analysis = self._parse_response(response, model_used)
        return analysis
    except (ValueError, TypeError, ConnectionError, RuntimeError) as e:
        logger.error(f"LLM proposal failed: {e}")
        return None

ClassProposal

Bases: BaseModel

A proposed class for a cluster of novel samples.

ClusterEvidence

Bases: BaseModel

Compact statistical evidence extracted for a cluster.

DiscoveryCluster

Bases: BaseModel

Community of likely novel samples discovered in a batch.

NovelClassAnalysis

Bases: BaseModel

Class proposals generated from a novelty discovery run.

NovelClassDiscoveryReport

Bases: BaseModel

End-to-end report for novelty detection and optional proposal generation.

NovelSampleMetadata

Bases: BaseModel

Metadata for a single sample flagged as novel.

NovelSampleReport

Bases: BaseModel

Novel samples found during a detection run.

ProposalReviewRecord

Bases: BaseModel

Lifecycle-aware review record for a proposed class.

DetectionReport(novelty_report, strategies_used, runtime_seconds, timestamp, additional_info=dict()) dataclass

Report from a complete detection run.

Contains the NovelSampleReport plus additional metadata about the detection run (timing, strategy performance, etc.).

Attributes
novelty_report instance-attribute

The core novelty detection report.

strategies_used instance-attribute

List of strategies that were used.

runtime_seconds instance-attribute

Time taken for detection in seconds.

timestamp instance-attribute

ISO timestamp of when detection was run.

additional_info = field(default_factory=dict) class-attribute instance-attribute

Any additional information to include in the report.

EvaluationReport(auroc, auprc, detection_rate_at_1, detection_rate_at_5, detection_rate_at_10, precision, recall, f1, optimal_threshold, confusion_matrix=None, per_class_metrics=None, num_samples=0, num_novel=0, timestamp='') dataclass

Report from evaluating novelty detection.

Contains metrics from evaluating on a labeled dataset.

Attributes
auroc instance-attribute

Area under ROC curve.

auprc instance-attribute

Area under Precision-Recall curve.

detection_rate_at_1 instance-attribute

Detection rate at 1% false positive rate.

detection_rate_at_5 instance-attribute

Detection rate at 5% false positive rate.

detection_rate_at_10 instance-attribute

Detection rate at 10% false positive rate.

precision instance-attribute

Precision at optimal threshold.

recall instance-attribute

Recall at optimal threshold.

f1 instance-attribute

F1 score at optimal threshold.

optimal_threshold instance-attribute

Threshold that maximizes F1 score.

confusion_matrix = None class-attribute instance-attribute

Confusion matrix at optimal threshold.

per_class_metrics = None class-attribute instance-attribute

Per-class metrics if available.

num_samples = 0 class-attribute instance-attribute

Total number of samples evaluated.

num_novel = 0 class-attribute instance-attribute

Number of actually novel samples.

timestamp = '' class-attribute instance-attribute

ISO timestamp of when evaluation was run.
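The detection_rate_at_N fields are the true-positive rate achieved at a fixed false-positive rate. Given binary novelty labels and scores they can be read off the ROC curve; a sketch using sklearn (this is the standard metric, not necessarily the package's own evaluation code):

```python
import numpy as np
from sklearn.metrics import roc_curve

def detection_rate_at_fpr(y_true, scores, max_fpr):
    # TPR of the best operating point whose FPR does not exceed max_fpr.
    fpr, tpr, _ = roc_curve(y_true, scores)
    ok = fpr <= max_fpr
    return float(tpr[ok].max()) if ok.any() else 0.0

y = np.array([0, 0, 0, 0, 1, 1])                # 1 = actually novel
s = np.array([0.1, 0.2, 0.3, 0.9, 0.8, 0.95])   # novelty scores
rate = detection_rate_at_fpr(y, s, max_fpr=0.01)  # detection_rate_at_1
```

Here only one of the two novel samples outranks every known sample, so at 1% FPR the detection rate is 0.5.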

SampleMetrics(index, text, predicted_class, confidence, is_novel, novelty_score, strategy_flags, raw_metrics) dataclass

Aggregated metrics for a single sample.

Contains metrics from all strategies for a specific sample.

Attributes
index instance-attribute

Sample index in the input batch.

text instance-attribute

The input text.

predicted_class instance-attribute

Predicted class for this sample.

confidence instance-attribute

Prediction confidence score.

is_novel instance-attribute

Whether this sample was flagged as novel.

novelty_score instance-attribute

Final combined novelty score.

strategy_flags instance-attribute

Which strategies flagged this sample.

raw_metrics instance-attribute

Raw metrics from each strategy.

StrategyMetrics(strategy_id, flags, metrics) dataclass

Metrics from a single strategy.

Contains the flags and per-sample metrics produced by a strategy.

Attributes
strategy_id instance-attribute

Identifier for the strategy.

flags instance-attribute

Indices flagged as novel by this strategy.

metrics instance-attribute

Per-sample metrics from this strategy.

ANNBackend

Supported ANN backends.

ANNIndex(dim, backend=ANNBackend.HNSWLIB, max_elements=100000, ef_construction=200, M=16)

Wrapper for Approximate Nearest Neighbor indexing.

Provides efficient similarity search: approximately O(log n) queries with the HNSWlib backend, a flat inner-product index with FAISS, and an exact NumPy fallback.

Parameters:

- dim (int): Dimensionality of embeddings. Required.
- backend (str): ANN backend to use ('hnswlib' or 'faiss'). Default: ANNBackend.HNSWLIB
- max_elements (int): Maximum number of elements to index. Default: 100000
- ef_construction (int): HNSW ef_construction parameter (higher = better quality). Default: 200
- M (int): HNSW M parameter (higher = better quality, more memory). Default: 16
Source code in src/novelentitymatcher/novelty/storage/index.py
def __init__(
    self,
    dim: int,
    backend: str = ANNBackend.HNSWLIB,
    max_elements: int = 100000,
    ef_construction: int = 200,
    M: int = 16,
):
    """
    Initialize ANN index.

    Args:
        dim: Dimensionality of embeddings
        backend: ANN backend to use ('hnswlib' or 'faiss')
        max_elements: Maximum number of elements to index
        ef_construction: HNSW ef_construction parameter (higher = better quality)
        M: HNSW M parameter (higher = better quality, more memory)
    """
    self.dim = dim
    self.backend = backend
    self.max_elements = max_elements
    self._index: Any = None
    self._labels: list[str] = []
    self._vector_buffer: list[np.ndarray] = []
    self._vectors: np.ndarray | None = None
    self._hnsw_params: dict = {}

    if backend == ANNBackend.HNSWLIB:
        self._init_hnswlib(ef_construction, M)
    elif backend == ANNBackend.FAISS:
        self._init_faiss()
    elif backend == ANNBackend.EXACT:
        logger.info("Initialized exact ANN fallback with dim=%s", self.dim)
    else:
        raise ValueError(f"Unsupported backend: {backend}")
Attributes
n_elements property

Get number of elements in the index.

labels property

Return the labels stored alongside indexed vectors.

Functions
add_vectors(vectors, labels=None)

Add vectors to the index.

Parameters:

- vectors (ndarray): Array of shape (n_vectors, dim). Required.
- labels (list[str] | None): Optional labels for the vectors. Default: None
Source code in src/novelentitymatcher/novelty/storage/index.py
def add_vectors(self, vectors: np.ndarray, labels: list[str] | None = None) -> None:
    """
    Add vectors to the index.

    Args:
        vectors: Array of shape (n_vectors, dim)
        labels: Optional labels for the vectors
    """
    if len(vectors) == 0:
        return

    if vectors.shape[1] != self.dim:
        raise ValueError(
            f"Vector dimension mismatch: expected {self.dim}, got {vectors.shape[1]}"
        )

    # Normalize vectors for cosine similarity
    vectors = self._normalize(vectors).astype(np.float32, copy=False)

    if self.backend == ANNBackend.HNSWLIB:
        current_count = self._index.get_current_count()
        if current_count + len(vectors) > self.max_elements:
            self._resize_hnsw_index(current_count + len(vectors))
        self._index.add_items(vectors)
    elif self.backend == ANNBackend.FAISS:
        self._index.add(vectors)

    self._vector_buffer.append(vectors)
    self._vectors = None

    if labels:
        self._labels.extend(labels)
    else:
        start = len(self._labels)
        self._labels.extend([str(i) for i in range(start, start + len(vectors))])
knn_query(query, k=5)

Find k-nearest neighbors for query vector(s).

Parameters:

- query (ndarray): Query vector or vectors of shape (n_queries, dim). Required.
- k (int): Number of neighbors to return. Default: 5

Returns:

- tuple[ndarray, ndarray]: (distances, indices)
  - distances: Array of shape (n_queries, k) with similarity scores
  - indices: Array of shape (n_queries, k) with neighbor indices
Source code in src/novelentitymatcher/novelty/storage/index.py
def knn_query(self, query: np.ndarray, k: int = 5) -> tuple[np.ndarray, np.ndarray]:
    """
    Find k-nearest neighbors for query vector(s).

    Args:
        query: Query vector or vectors of shape (n_queries, dim)
        k: Number of neighbors to return

    Returns:
        Tuple of (distances, indices)
        - distances: Array of shape (n_queries, k) with similarity scores
        - indices: Array of shape (n_queries, k) with neighbor indices
    """
    if query.ndim == 1:
        query = query.reshape(1, -1)

    # Normalize query vectors
    query = self._normalize(query)

    if self.backend == ANNBackend.HNSWLIB:
        labels, distances = self._index.knn_query(query, k=k)
        # HNSWlib returns distances (lower is better), convert to similarities
        similarities = 1 - distances
        return similarities, labels
    if self.backend == ANNBackend.FAISS:
        distances, indices = self._index.search(query, k)
        # FAISS IndexFlatIP returns similarities directly
        return distances, indices

    if self._ensure_vectors().size == 0:
        empty = np.empty((len(query), 0), dtype=np.float32)
        return empty, empty.astype(int)

    vectors = self._ensure_vectors()
    k = min(k, len(vectors))
    similarities = np.dot(query.astype(np.float32, copy=False), vectors.T)
    top_indices = np.argsort(-similarities, axis=1)[:, :k]
    top_similarities = np.take_along_axis(similarities, top_indices, axis=1)
    return top_similarities, top_indices
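The exact-backend branch at the end is just normalized dot products plus a top-k sort. The same logic in isolation, on toy vectors (names are illustrative):

```python
import numpy as np

def exact_knn(query, vectors, k):
    # L2-normalize rows so the inner product equals cosine similarity.
    q = query / np.linalg.norm(query, axis=1, keepdims=True)
    v = vectors / np.linalg.norm(vectors, axis=1, keepdims=True)
    sims = q @ v.T
    idx = np.argsort(-sims, axis=1)[:, :k]              # top-k per query row
    return np.take_along_axis(sims, idx, axis=1), idx

vecs = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]])
sims, idx = exact_knn(np.array([[2.0, 0.0]]), vecs, k=2)
```

Because of the normalization, the query's scale (2.0 vs 1.0) does not affect the ranking.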
get_distance_matrix(queries, targets=None)

Get distance matrix between queries and all indexed vectors.

Parameters:

- queries (ndarray): Query vectors of shape (n_queries, dim). Required.
- targets (ndarray | None): Optional target vectors (if None, use all indexed vectors). Default: None

Returns:

- ndarray: Distance matrix of shape (n_queries, n_targets)

Source code in src/novelentitymatcher/novelty/storage/index.py
def get_distance_matrix(
    self, queries: np.ndarray, targets: np.ndarray | None = None
) -> np.ndarray:
    """
    Get distance matrix between queries and all indexed vectors.

    Args:
        queries: Query vectors of shape (n_queries, dim)
        targets: Optional target vectors (if None, use all indexed vectors)

    Returns:
        Distance matrix of shape (n_queries, n_targets)
    """
    if queries.ndim == 1:
        queries = queries.reshape(1, -1)

    # Normalize queries
    queries = self._normalize(queries).astype(np.float32, copy=False)

    if targets is None:
        vectors = self._ensure_vectors()
        if vectors.size == 0:
            return np.zeros((len(queries), 0), dtype=np.float32)
        return np.dot(queries, vectors.T)
    else:
        # Compute direct similarity
        targets = self._normalize(targets).astype(np.float32, copy=False)
        return np.dot(queries, targets.T)
save(path)

Save index to disk.

Source code in src/novelentitymatcher/novelty/storage/index.py
def save(self, path: str | Path) -> None:
    """Save index to disk."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    labels_path = path.with_suffix(".labels.json")
    vectors_path = path.with_suffix(".vectors.npy")

    if self.backend == ANNBackend.HNSWLIB:
        self._index.save_index(str(path.with_suffix(".bin")))
        logger.info(f"Saved HNSWlib index to {path}")
    elif self.backend == ANNBackend.FAISS:
        import faiss

        faiss.write_index(self._index, str(path.with_suffix(".index")))
        logger.info(f"Saved FAISS index to {path}")
    else:
        logger.info(f"Saved exact ANN fallback index to {path}")

    labels_path.write_text(
        json.dumps(self._labels, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    np.save(vectors_path, self._ensure_vectors())
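save() fans one logical path out into several sidecar files by swapping the suffix, relying on the fact that pathlib's with_suffix accepts a multi-dot suffix. Illustrating just that behavior (the base path here is made up):

```python
from pathlib import Path

base = Path("indexes/entities")
# with_suffix replaces (or appends) the final suffix; the argument may
# itself contain a dot, which is how ".labels.json" is formed.
files = {
    "hnswlib": base.with_suffix(".bin"),
    "faiss": base.with_suffix(".index"),
    "labels": base.with_suffix(".labels.json"),
    "vectors": base.with_suffix(".vectors.npy"),
}
```

Note that a base path that already has a suffix (e.g. `entities.v2`) would have it replaced rather than extended.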
load(path)

Load index from disk.

Source code in src/novelentitymatcher/novelty/storage/index.py
def load(self, path: str | Path) -> None:
    """Load index from disk."""
    path = Path(path)
    labels_path = path.with_suffix(".labels.json")
    vectors_path = path.with_suffix(".vectors.npy")

    if self.backend == ANNBackend.HNSWLIB:
        bin_path = path.with_suffix(".bin")
        if not bin_path.exists():
            raise FileNotFoundError(f"Index file not found: {bin_path}")
        self._index.load_index(str(bin_path))
        logger.info(f"Loaded HNSWlib index from {path}")
    elif self.backend == ANNBackend.FAISS:
        import faiss

        index_path = path.with_suffix(".index")
        if not index_path.exists():
            raise FileNotFoundError(f"Index file not found: {index_path}")
        self._index = faiss.read_index(str(index_path))
        logger.info(f"Loaded FAISS index from {path}")
    else:
        logger.info(f"Loaded exact ANN fallback index from {path}")

    if labels_path.exists():
        loaded_labels = json.loads(labels_path.read_text(encoding="utf-8"))
        self._labels = [str(label) for label in loaded_labels]
    else:
        # Backward-compatible fallback for older saved indexes.
        self._labels = [str(i) for i in range(self.n_elements)]

    if vectors_path.exists():
        self._vectors = np.load(vectors_path).astype(np.float32, copy=False)
        self._vector_buffer = [self._vectors]
    else:
        self._vectors = np.empty((0, self.dim), dtype=np.float32)
        self._vector_buffer = []
clear()

Clear all elements from the index.

Source code in src/novelentitymatcher/novelty/storage/index.py
def clear(self) -> None:
    """Clear all elements from the index."""
    if self.backend == ANNBackend.HNSWLIB:
        # HNSWlib doesn't support clear, need to reinitialize
        raise NotImplementedError(
            "HNSWlib doesn't support clearing. Create a new index instead."
        )
    elif self.backend == ANNBackend.FAISS:
        import faiss

        self._index = faiss.IndexFlatIP(self.dim)
        self._labels = []
        self._vectors = np.empty((0, self.dim), dtype=np.float32)
        self._vector_buffer = []
        logger.info("Cleared FAISS index")
    else:
        self._labels = []
        self._vectors = np.empty((0, self.dim), dtype=np.float32)
        self._vector_buffer = []
        logger.info("Cleared exact ANN fallback index")

PromotionResult(review_record, entities_added=list(), index_updated=False, retrain_required=False) dataclass

Captures what happened during a promotion.

Attributes
state property

Backward-compatible alias for review_record.state.

promoted_at property

Backward-compatible alias for review_record.promoted_at.

ProposalReviewManager(storage_path='./proposals/review_records.json')

Persist and update proposal review records for HITL workflows.

Source code in src/novelentitymatcher/novelty/storage/review.py
def __init__(self, storage_path: str | Path = "./proposals/review_records.json"):
    self.storage_path = Path(storage_path)
Functions
promote_with_index_update(review_id, matcher)

Promote and automatically update the matcher's entity index.

Parameters:

- review_id (str): The review record to promote. Required.
- matcher (Any): A NovelEntityMatcher or similar object with entities and optional reindex / fit methods. Required.

Returns:

- PromotionResult: Full details of the promotion.

Source code in src/novelentitymatcher/novelty/storage/review.py
def promote_with_index_update(
    self,
    review_id: str,
    matcher: Any,
) -> PromotionResult:
    """Promote and automatically update the matcher's entity index.

    Args:
        review_id: The review record to promote.
        matcher: A NovelEntityMatcher or similar object with ``entities``
            and optional ``reindex`` / ``fit`` methods.

    Returns:
        PromotionResult with full details of the promotion.
    """
    entities = list(getattr(matcher, "entities", []))

    def index_updater(new_entities: list[dict[str, Any]]) -> None:
        matcher.entities = entities
        reindex = getattr(matcher, "reindex", None)
        if callable(reindex):
            reindex()
        else:
            fit = getattr(matcher, "fit", None)
            if callable(fit):
                fit()

    def retrain_callback() -> None:
        pass

    return self.promote(
        review_id,
        entities=entities,
        index_updater=index_updater,
        retrain_callback=retrain_callback,
    )
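The index_updater closure relies on duck typing: it prefers a reindex() method and falls back to fit() if that is absent. The pattern in isolation (the stand-in matcher class below is made up for illustration):

```python
from typing import Any

def refresh_index(matcher: Any) -> str:
    # Prefer an incremental reindex; fall back to a full refit if absent.
    reindex = getattr(matcher, "reindex", None)
    if callable(reindex):
        reindex()
        return "reindexed"
    fit = getattr(matcher, "fit", None)
    if callable(fit):
        fit()
        return "refit"
    return "noop"

class FitOnlyMatcher:
    def __init__(self):
        self.calls = []
    def fit(self):
        self.calls.append("fit")

m = FitOnlyMatcher()
result = refresh_index(m)
```

Using getattr with a callable() check keeps the promotion path working for any matcher-like object without importing its class.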

NoveltyStrategy

Bases: ABC

Base protocol for all novelty detection strategies.

Each strategy is responsible for:

1. Initializing with reference embeddings and labels
2. Detecting novel samples from a batch of inputs
3. Providing per-sample metrics for signal combination
4. Specifying its weight for signal fusion

Attributes
config_schema abstractmethod property

Return the config dataclass type for this strategy.

This is used for validation and defaults.

Functions
initialize(reference_embeddings, reference_labels, config) abstractmethod

Initialize strategy with reference data.

Parameters:

- reference_embeddings (ndarray): Embeddings of known samples. Required.
- reference_labels (list[str]): Class labels for known samples. Required.
- config (Any): Strategy-specific configuration object. Required.
Source code in src/novelentitymatcher/novelty/strategies/base.py
@abstractmethod
def initialize(
    self,
    reference_embeddings: np.ndarray,
    reference_labels: list[str],
    config: Any,
) -> None:
    """
    Initialize strategy with reference data.

    Args:
        reference_embeddings: Embeddings of known samples
        reference_labels: Class labels for known samples
        config: Strategy-specific configuration object
    """
detect(texts, embeddings, predicted_classes, confidences, **kwargs) abstractmethod

Detect novel samples.

Parameters:

- texts (list[str]): Input texts. Required.
- embeddings (ndarray): Text embeddings. Required.
- predicted_classes (list[str]): Predicted class for each sample. Required.
- confidences (ndarray): Prediction confidence scores. Required.
- **kwargs: Additional strategy-specific parameters. Default: {}

Returns:

- tuple[set[int], dict[int, dict[str, Any]]]: (flags, metrics)
  - flags: Set of indices flagged as novel
  - metrics: Dict mapping index to metric dict
Source code in src/novelentitymatcher/novelty/strategies/base.py
@abstractmethod
def detect(
    self,
    texts: list[str],
    embeddings: np.ndarray,
    predicted_classes: list[str],
    confidences: np.ndarray,
    **kwargs,
) -> tuple[set[int], dict[int, dict[str, Any]]]:
    """
    Detect novel samples.

    Args:
        texts: Input texts
        embeddings: Text embeddings
        predicted_classes: Predicted class for each sample
        confidences: Prediction confidence scores
        **kwargs: Additional strategy-specific parameters

    Returns:
        (flags, metrics) - flagged indices and per-sample metrics
        - flags: Set of indices flagged as novel
        - metrics: Dict mapping index to metric dict
    """
get_weight() abstractmethod

Return weight for signal combination.

This weight determines how much this strategy contributes to the final novelty score.

Source code in src/novelentitymatcher/novelty/strategies/base.py
@abstractmethod
def get_weight(self) -> float:
    """
    Return weight for signal combination.

    This weight determines how much this strategy contributes
    to the final novelty score.
    """
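The weight returned here feeds a weighted combination of per-strategy novelty scores. The combiner itself lives elsewhere in the package, so the following is only a plausible sketch of its shape, not the actual implementation:

```python
def combine_scores(strategy_scores, weights):
    # Weighted average of per-strategy novelty scores for one sample.
    total = sum(weights)
    if total == 0:
        return 0.0
    return sum(w * s for w, s in zip(weights, strategy_scores)) / total

# Three strategies score a sample; the first carries double weight.
score = combine_scores([0.9, 0.5, 0.2], [2.0, 1.0, 1.0])
```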
get_config()

Get the current configuration for this strategy.

Override this if your strategy stores its config differently.

Source code in src/novelentitymatcher/novelty/strategies/base.py
def get_config(self) -> Any:
    """
    Get the current configuration for this strategy.

    Override this if your strategy stores its config differently.
    """
    return getattr(self, "_config", None)

ClusteringStrategy()

Bases: NoveltyStrategy

Clustering-based strategy for novelty detection.

Uses HDBSCAN to cluster samples and flags as novel the noise points and members of small or low-cohesion clusters.

Source code in src/novelentitymatcher/novelty/strategies/clustering.py
def __init__(self):
    self._config: ClusteringConfig = None
    self._clusterer: ScalableClusterer = None
    self._validator: ClusterValidator = None
    self._reference_embeddings: np.ndarray = None
    self._reference_labels: list[str] = None
Attributes
config_schema property

Return ClusteringConfig as the config schema.

Functions
get_config()

Get the current configuration for this strategy.

Override this if your strategy stores its config differently.

Source code in src/novelentitymatcher/novelty/strategies/base.py
def get_config(self) -> Any:
    """
    Get the current configuration for this strategy.

    Override this if your strategy stores its config differently.
    """
    return getattr(self, "_config", None)
initialize(reference_embeddings, reference_labels, config)

Initialize the clustering strategy.

Parameters:

- reference_embeddings (ndarray): Embeddings of known samples. Required.
- reference_labels (list[str]): Labels of known samples. Required.
- config (ClusteringConfig): ClusteringConfig with thresholds. Required.
Source code in src/novelentitymatcher/novelty/strategies/clustering.py
def initialize(
    self,
    reference_embeddings: np.ndarray,
    reference_labels: list[str],
    config: ClusteringConfig,
) -> None:
    """
    Initialize the clustering strategy.

    Args:
        reference_embeddings: Embeddings of known samples
        reference_labels: Labels of known samples
        config: ClusteringConfig with thresholds
    """
    self._config = config or ClusteringConfig()
    self._reference_embeddings = reference_embeddings
    self._reference_labels = reference_labels

    # Initialize clusterer
    self._clusterer = ScalableClusterer(
        min_cluster_size=self._config.hdbscan_min_cluster_size,
        min_samples=self._config.hdbscan_min_samples,
        cluster_selection_epsilon=self._config.cluster_selection_epsilon,
    )

    # Initialize validator
    self._validator = ClusterValidator(
        min_cohesion_threshold=self._config.cohesion_threshold,
        min_persistence_threshold=self._config.persistence_threshold,
    )
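The detect() method below treats density-based noise (cluster label -1) as novel. HDBSCAN may not be installed everywhere, so here is the same idea demonstrated with sklearn's DBSCAN on toy data (a stand-in for illustration, not the library's ScalableClusterer):

```python
import numpy as np
from sklearn.cluster import DBSCAN

points = np.array([
    [0.0, 0.0], [0.1, 0.0], [0.0, 0.1],   # dense cluster
    [5.0, 5.0],                            # isolated point -> noise
])
# Density-based clustering labels unreachable points -1 ("noise").
labels = DBSCAN(eps=0.5, min_samples=2).fit_predict(points)
novel_indices = set(np.where(labels == -1)[0].tolist())
```

The isolated point receives label -1 and is the only index flagged as novel.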
detect(texts, embeddings, predicted_classes, confidences, **kwargs)

Detect novel samples using clustering.

Parameters:

- texts (list[str]): Input texts. Required.
- embeddings (ndarray): Text embeddings. Required.
- predicted_classes (list[str]): Predicted classes. Required.
- confidences (ndarray): Prediction confidences. Required.
- **kwargs: Additional parameters. Default: {}

Returns:

- tuple[set[int], dict[int, dict[str, Any]]]: (flags, metrics) - Flagged indices and per-sample metrics

Source code in src/novelentitymatcher/novelty/strategies/clustering.py
def detect(
    self,
    texts: list[str],
    embeddings: np.ndarray,
    predicted_classes: list[str],
    confidences: np.ndarray,
    **kwargs,
) -> tuple[set[int], dict[int, dict[str, Any]]]:
    """
    Detect novel samples using clustering.

    Args:
        texts: Input texts
        embeddings: Text embeddings
        predicted_classes: Predicted classes
        confidences: Prediction confidences
        **kwargs: Additional parameters

    Returns:
        (flags, metrics) - Flagged indices and per-sample metrics
    """
    # Combine reference and query embeddings for clustering
    all_embeddings = np.vstack([self._reference_embeddings, embeddings])

    # Fit clusterer on all embeddings
    self._clusterer.fit(all_embeddings)

    # Get cluster labels
    labels = self._clusterer.labels

    # Separate query labels (reference samples come first)
    query_labels = labels[len(self._reference_embeddings) :]

    flags = set()
    metrics = {}

    # Validate clusters and identify novel samples
    unique_labels = np.unique(query_labels)

    for label in unique_labels:
        if label == -1:  # Noise points
            # All noise points are novel
            mask = query_labels == label
            indices = np.where(mask)[0]
            for idx in indices:
                flags.add(idx)
                metrics[idx] = {
                    "cluster_label": -1,
                    "cluster_support_score": 0.0,
                    "cluster_is_novel": True,
                    "cluster_size": 1,
                }
        else:
            # Check if cluster is valid
            # Get all embeddings with this label (including reference)
            all_mask = labels == label
            _cluster_embeddings = all_embeddings[all_mask]

            is_valid = self._validator.is_valid_cluster(
                all_embeddings,
                labels,
                label,
                min_size=self._config.min_cluster_size,
            )

            # Compute support score (1 - cohesion)
            cohesion = self._validator.compute_cohesion(
                all_embeddings, labels, label
            )
            support_score = 1.0 - cohesion

            # Get query indices for this cluster
            query_mask = query_labels == label
            query_indices = np.where(query_mask)[0]

            for idx in query_indices:
                # Novel if cluster is invalid or support score is low
                is_novel = not is_valid or support_score < (
                    1.0 - self._config.cohesion_threshold
                )

                if is_novel:
                    flags.add(idx)

                metrics[idx] = {
                    "cluster_label": int(label),
                    "cluster_support_score": support_score,
                    "cluster_is_novel": is_novel,
                    "cluster_size": int(np.sum(all_mask)),
                    "cluster_cohesion": cohesion,
                }

    return flags, metrics
get_weight()

Return weight for signal combination.

Source code in src/novelentitymatcher/novelty/strategies/clustering.py
def get_weight(self) -> float:
    """Return weight for signal combination."""
    # Clustering provides complementary signal
    return 0.2
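The noise-handling rule above (cluster label `-1` ⇒ novel) can be illustrated with any density-based clusterer. A minimal sketch, substituting scikit-learn's `DBSCAN` for the HDBSCAN-based `ScalableClusterer` used by the library:

```python
import numpy as np
from sklearn.cluster import DBSCAN

# One dense reference cluster near the origin, plus two query points:
# one inside the cluster and one isolated outlier.
reference = np.random.default_rng(0).normal(0.0, 0.05, size=(30, 2))
queries = np.array([[0.0, 0.0], [3.0, 3.0]])

# Cluster reference and query embeddings together, as ClusteringStrategy does
all_points = np.vstack([reference, queries])
labels = DBSCAN(eps=0.3, min_samples=5).fit_predict(all_points)

# Reference samples come first, so the query labels are the tail
query_labels = labels[len(reference):]
novel = {i for i, lbl in enumerate(query_labels) if lbl == -1}  # noise => novel
```

The in-cluster query joins the reference cluster, while the isolated point is labeled noise and flagged. The real strategy additionally validates non-noise clusters via `ClusterValidator` cohesion and size checks before deciding.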

ConfidenceStrategy()

Bases: NoveltyStrategy

Confidence threshold strategy for novelty detection.

Flags samples as novel if their prediction confidence falls below a configured threshold.

Source code in src/novelentitymatcher/novelty/strategies/confidence.py
def __init__(self):
    self._config: ConfidenceConfig = None
(Note: the annotation should read `ConfidenceConfig | None`, since the attribute starts as `None` until `initialize` is called.)
Attributes
config_schema property

Return ConfidenceConfig as the config schema.

Functions
get_config()

Get the current configuration for this strategy.

Override this if your strategy stores its config differently.

Source code in src/novelentitymatcher/novelty/strategies/base.py
def get_config(self) -> Any:
    """
    Get the current configuration for this strategy.

    Override this if your strategy stores its config differently.
    """
    return getattr(self, "_config", None)
initialize(reference_embeddings, reference_labels, config)

Initialize the confidence strategy.

Parameters:

Name Type Description Default
reference_embeddings ndarray

Embeddings of known samples (not used)

required
reference_labels list[str]

Labels of known samples (not used)

required
config ConfidenceConfig

ConfidenceConfig with threshold parameter

required
Source code in src/novelentitymatcher/novelty/strategies/confidence.py
def initialize(
    self,
    reference_embeddings: np.ndarray,
    reference_labels: list[str],
    config: ConfidenceConfig,
) -> None:
    """
    Initialize the confidence strategy.

    Args:
        reference_embeddings: Embeddings of known samples (not used)
        reference_labels: Labels of known samples (not used)
        config: ConfidenceConfig with threshold parameter
    """
    self._config = config or ConfidenceConfig()
detect(texts, embeddings, predicted_classes, confidences, **kwargs)

Detect novel samples using confidence threshold.

Parameters:

Name Type Description Default
texts list[str]

Input texts

required
embeddings ndarray

Text embeddings (not used)

required
predicted_classes list[str]

Predicted classes (not used)

required
confidences ndarray

Prediction confidence scores

required
**kwargs

Additional parameters

{}

Returns:

Type Description
tuple[set[int], dict[int, dict[str, Any]]]

(flags, metrics) - Flagged indices and per-sample metrics

Source code in src/novelentitymatcher/novelty/strategies/confidence.py
def detect(
    self,
    texts: list[str],
    embeddings: np.ndarray,
    predicted_classes: list[str],
    confidences: np.ndarray,
    **kwargs,
) -> tuple[set[int], dict[int, dict[str, Any]]]:
    """
    Detect novel samples using confidence threshold.

    Args:
        texts: Input texts
        embeddings: Text embeddings (not used)
        predicted_classes: Predicted classes (not used)
        confidences: Prediction confidence scores
        **kwargs: Additional parameters

    Returns:
        (flags, metrics) - Flagged indices and per-sample metrics
    """
    flags = set()
    metrics = {}

    for idx, confidence in enumerate(confidences):
        is_novel = confidence < self._config.threshold

        if is_novel:
            flags.add(idx)

        metrics[idx] = {
            "confidence_score": float(confidence),
            "confidence_is_novel": is_novel,
        }

    return flags, metrics
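The threshold rule is simple enough to sketch standalone. A hypothetical helper mirroring `ConfidenceStrategy.detect` (the real strategy reads the threshold from `ConfidenceConfig`):

```python
import numpy as np

def flag_low_confidence(confidences: np.ndarray, threshold: float = 0.5):
    """Flag indices whose prediction confidence falls below the threshold."""
    flags = {i for i, c in enumerate(confidences) if c < threshold}
    metrics = {
        i: {"confidence_score": float(c), "confidence_is_novel": bool(c < threshold)}
        for i, c in enumerate(confidences)
    }
    return flags, metrics

flags, metrics = flag_low_confidence(np.array([0.9, 0.3, 0.55]), threshold=0.5)
# flags == {1}: only the 0.3-confidence sample falls below the threshold
```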
get_weight()

Return weight for signal combination.

Source code in src/novelentitymatcher/novelty/strategies/confidence.py
def get_weight(self) -> float:
    """Return weight for signal combination."""
    # Confidence is a foundational signal, give it moderate weight
    return 0.35

KNNDistanceStrategy()

Bases: NoveltyStrategy

kNN distance strategy for novelty detection.

Flags samples as novel if their average distance to k-nearest neighbors in the reference set exceeds a threshold.

Source code in src/novelentitymatcher/novelty/strategies/knn_distance.py
def __init__(self):
    self._config: KNNConfig | None = None
    self._ann_index: ANNIndex | None = None
Attributes
config_schema property

Return KNNConfig as the config schema.

Functions
get_config()

Get the current configuration for this strategy.

Override this if your strategy stores its config differently.

Source code in src/novelentitymatcher/novelty/strategies/base.py
def get_config(self) -> Any:
    """
    Get the current configuration for this strategy.

    Override this if your strategy stores its config differently.
    """
    return getattr(self, "_config", None)
initialize(reference_embeddings, reference_labels, config)

Initialize the kNN strategy with reference data.

Parameters:

Name Type Description Default
reference_embeddings ndarray

Embeddings of known samples

required
reference_labels list[str]

Labels of known samples

required
config KNNConfig

KNNConfig with k, thresholds, and metric

required
Source code in src/novelentitymatcher/novelty/strategies/knn_distance.py
def initialize(
    self,
    reference_embeddings: np.ndarray,
    reference_labels: list[str],
    config: KNNConfig,
) -> None:
    """
    Initialize the kNN strategy with reference data.

    Args:
        reference_embeddings: Embeddings of known samples
        reference_labels: Labels of known samples
        config: KNNConfig with k, thresholds, and metric
    """
    self._config = config or KNNConfig()

    # Initialize ANN index
    self._ann_index = ANNIndex(
        dim=reference_embeddings.shape[1],
        max_elements=len(reference_labels),
    )
    self._ann_index.add_vectors(reference_embeddings, reference_labels)
detect(texts, embeddings, predicted_classes, confidences, **kwargs)

Detect novel samples using kNN distance.

Parameters:

Name Type Description Default
texts list[str]

Input texts

required
embeddings ndarray

Text embeddings

required
predicted_classes list[str]

Predicted classes

required
confidences ndarray

Prediction confidences

required
**kwargs

Additional parameters

{}

Returns:

Type Description
tuple[set[int], dict[int, dict[str, Any]]]

(flags, metrics) - Flagged indices and per-sample metrics

Source code in src/novelentitymatcher/novelty/strategies/knn_distance.py
def detect(
    self,
    texts: list[str],
    embeddings: np.ndarray,
    predicted_classes: list[str],
    confidences: np.ndarray,
    **kwargs,
) -> tuple[set[int], dict[int, dict[str, Any]]]:
    """
    Detect novel samples using kNN distance.

    Args:
        texts: Input texts
        embeddings: Text embeddings
        predicted_classes: Predicted classes
        confidences: Prediction confidences
        **kwargs: Additional parameters

    Returns:
        (flags, metrics) - Flagged indices and per-sample metrics
    """
    k = min(self._config.k, self._ann_index.n_elements)

    # Query kNN
    similarities, neighbor_indices = self._ann_index.knn_query(embeddings, k=k)

    flags = set()
    metrics = {}

    for idx in range(len(embeddings)):
        metric = self._compute_knn_metrics(
            idx,
            similarities[idx],
            neighbor_indices[idx],
            predicted_classes[idx],
        )
        metrics[idx] = metric

        # Check if novelty score exceeds threshold
        if metric["knn_novelty_score"] >= self._config.distance_threshold:
            flags.add(idx)

    return flags, metrics
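The underlying distance signal can be sketched in plain NumPy. A hypothetical helper (the real strategy queries an approximate `ANNIndex` and works with similarities rather than raw distances):

```python
import numpy as np

def knn_novelty_scores(reference: np.ndarray, queries: np.ndarray, k: int = 3) -> np.ndarray:
    """Mean Euclidean distance from each query to its k nearest reference points."""
    # Pairwise distances: shape (n_queries, n_reference)
    dists = np.linalg.norm(queries[:, None, :] - reference[None, :, :], axis=-1)
    k = min(k, reference.shape[0])  # same guard as detect(): k cannot exceed index size
    nearest = np.sort(dists, axis=1)[:, :k]
    return nearest.mean(axis=1)

ref = np.zeros((10, 4))  # tight reference set at the origin
scores = knn_novelty_scores(ref, np.array([[0.0, 0.0, 0.0, 0.0],
                                           [5.0, 5.0, 5.0, 5.0]]))
# The far query scores high; flag it if its score exceeds distance_threshold
```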
get_weight()

Return weight for signal combination.

Source code in src/novelentitymatcher/novelty/strategies/knn_distance.py
def get_weight(self) -> float:
    """Return weight for signal combination."""
    # kNN is a strong signal, give it high weight
    return 0.45

LOFStrategy()

Bases: NoveltyStrategy

LOF strategy for novelty detection.

Trains a Local Outlier Factor model on reference embeddings in novelty=True mode, then scores new samples. Samples with scores below the configurable threshold are flagged as novel.

Source code in src/novelentitymatcher/novelty/strategies/lof.py
def __init__(self):
    self._config: LOFConfig | None = None
    self._lof_model: LocalOutlierFactor | None = None
    self._fallback: bool = False
Attributes
config_schema property

Return LOFConfig as the config schema.

Functions
get_config()

Get the current configuration for this strategy.

Override this if your strategy stores its config differently.

Source code in src/novelentitymatcher/novelty/strategies/base.py
def get_config(self) -> Any:
    """
    Get the current configuration for this strategy.

    Override this if your strategy stores its config differently.
    """
    return getattr(self, "_config", None)
initialize(reference_embeddings, reference_labels, config)

Initialize LOF strategy by fitting on reference embeddings.

Parameters:

Name Type Description Default
reference_embeddings ndarray

Embeddings of known samples

required
reference_labels list[str]

Labels of known samples

required
config LOFConfig

LOFConfig with n_neighbors, contamination, metric, threshold

required
Source code in src/novelentitymatcher/novelty/strategies/lof.py
def initialize(
    self,
    reference_embeddings: np.ndarray,
    reference_labels: list[str],
    config: LOFConfig,
) -> None:
    """
    Initialize LOF strategy by fitting on reference embeddings.

    Args:
        reference_embeddings: Embeddings of known samples
        reference_labels: Labels of known samples
        config: LOFConfig with n_neighbors, contamination, metric, threshold
    """
    self._config = config or LOFConfig()
    self._fallback = False

    n_ref = len(reference_embeddings)
    n_neighbors = self._config.n_neighbors

    if n_ref < n_neighbors:
        logger.warning(
            "LOF: reference set too small (%d < %d neighbors). "
            "Falling back to flagging all samples.",
            n_ref,
            n_neighbors,
        )
        self._lof_model = None
        self._fallback = True
        return

    try:
        self._lof_model = LocalOutlierFactor(
            n_neighbors=n_neighbors,
            contamination=self._config.contamination,
            metric=self._config.metric,
            novelty=True,
        )
        self._lof_model.fit(reference_embeddings)
    except (ValueError, TypeError, RuntimeError) as exc:
        logger.warning("LOF: failed to fit model: %s. Falling back.", exc)
        self._lof_model = None
        self._fallback = True
detect(texts, embeddings, predicted_classes, confidences, **kwargs)

Detect novel samples using LOF anomaly scores.

Parameters:

Name Type Description Default
texts list[str]

Input texts

required
embeddings ndarray

Text embeddings

required
predicted_classes list[str]

Predicted classes

required
confidences ndarray

Prediction confidences

required
**kwargs

Additional parameters

{}

Returns:

Type Description
tuple[set[int], dict[int, dict[str, Any]]]

(flags, metrics) - Flagged indices and per-sample metrics

Source code in src/novelentitymatcher/novelty/strategies/lof.py
def detect(
    self,
    texts: list[str],
    embeddings: np.ndarray,
    predicted_classes: list[str],
    confidences: np.ndarray,
    **kwargs,
) -> tuple[set[int], dict[int, dict[str, Any]]]:
    """
    Detect novel samples using LOF anomaly scores.

    Args:
        texts: Input texts
        embeddings: Text embeddings
        predicted_classes: Predicted classes
        confidences: Prediction confidences
        **kwargs: Additional parameters

    Returns:
        (flags, metrics) - Flagged indices and per-sample metrics
    """
    flags: set[int] = set()
    metrics: dict[int, dict[str, Any]] = {}

    if self._fallback or self._lof_model is None:
        for idx in range(len(embeddings)):
            metrics[idx] = {
                "lof_score": 0.0,
                "lof_novelty_score": 1.0,
                "lof_is_outlier": True,
            }
            flags.add(idx)
        return flags, metrics

    try:
        raw_scores = self._lof_model.score_samples(embeddings)
    except (ValueError, TypeError, RuntimeError) as exc:
        logger.warning("LOF: score_samples failed: %s. Flagging all.", exc)
        for idx in range(len(embeddings)):
            metrics[idx] = {
                "lof_score": 0.0,
                "lof_novelty_score": 1.0,
                "lof_is_outlier": True,
            }
            flags.add(idx)
        return flags, metrics

    threshold = self._config.score_threshold

    for idx in range(len(embeddings)):
        score = float(raw_scores[idx])
        novelty_score = -score
        is_outlier = score < threshold

        metrics[idx] = {
            "lof_score": score,
            "lof_novelty_score": novelty_score,
            "lof_is_outlier": is_outlier,
        }

        if is_outlier:
            flags.add(idx)

    return flags, metrics
get_weight()

Return weight for signal combination.

Source code in src/novelentitymatcher/novelty/strategies/lof.py
def get_weight(self) -> float:
    """Return weight for signal combination."""
    return 0.30
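The core fit-then-score flow uses scikit-learn's `LocalOutlierFactor` directly; with `novelty=True`, `score_samples` may be called on unseen data. A minimal sketch (the `-1.5` cutoff is a hypothetical stand-in for `LOFConfig.score_threshold`):

```python
import numpy as np
from sklearn.neighbors import LocalOutlierFactor

rng = np.random.default_rng(0)
reference = rng.normal(0.0, 0.1, size=(100, 8))  # known samples near the origin

# novelty=True enables scoring new samples after fitting on the reference set
lof = LocalOutlierFactor(n_neighbors=20, novelty=True)
lof.fit(reference)

queries = np.vstack([
    rng.normal(0.0, 0.1, size=(1, 8)),  # in-distribution query
    np.full((1, 8), 5.0),               # far outlier
])
scores = lof.score_samples(queries)  # higher = more normal; inliers hover near -1
is_novel = scores < -1.5             # hypothetical threshold
```

Scores from `score_samples` are negated LOF values, so inliers sit near `-1` and outliers fall well below; the strategy flags everything under the configured cutoff.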

MahalanobisDistanceStrategy()

Bases: NoveltyStrategy

Mahalanobis distance strategy for novelty detection.

Computes the Mahalanobis distance from each sample to the class-conditional distribution (mean + shared covariance) of its predicted class. Samples whose distance exceeds a configurable threshold are flagged as novel.

When calibration_mode="conformal", raw distances are wrapped with conformal p-values for statistically grounded routing. This is backward-compatible: calibration_mode="none" produces identical results to the original threshold-only behavior.

Source code in src/novelentitymatcher/novelty/strategies/mahalanobis.py
def __init__(self):
    self._config: MahalanobisConfig | None = None
    self._class_means: dict[str, np.ndarray] = {}
    self._cov_inv: np.ndarray | None = None
    self._dim: int = 0
    self._calibrator: Any = None
Attributes
config_schema property

Return MahalanobisConfig as the config schema.

Functions
get_config()

Get the current configuration for this strategy.

Override this if your strategy stores its config differently.

Source code in src/novelentitymatcher/novelty/strategies/base.py
def get_config(self) -> Any:
    """
    Get the current configuration for this strategy.

    Override this if your strategy stores its config differently.
    """
    return getattr(self, "_config", None)
initialize(reference_embeddings, reference_labels, config)

Initialize the Mahalanobis strategy with reference data.

Computes per-class mean vectors and a shared (pooled) covariance matrix with regularization for numerical stability.

Parameters:

Name Type Description Default
reference_embeddings ndarray

Embeddings of known samples (n_samples, dim)

required
reference_labels list[str]

Class labels for known samples

required
config MahalanobisConfig

MahalanobisConfig with threshold, regularization, etc.

required
Source code in src/novelentitymatcher/novelty/strategies/mahalanobis.py
def initialize(
    self,
    reference_embeddings: np.ndarray,
    reference_labels: list[str],
    config: MahalanobisConfig,
) -> None:
    """
    Initialize the Mahalanobis strategy with reference data.

    Computes per-class mean vectors and a shared (pooled) covariance matrix
    with regularization for numerical stability.

    Args:
        reference_embeddings: Embeddings of known samples (n_samples, dim)
        reference_labels: Class labels for known samples
        config: MahalanobisConfig with threshold, regularization, etc.
    """
    self._config = config or MahalanobisConfig()
    self._dim = reference_embeddings.shape[1]
    self._class_means = {}
    self._cov_inv = None
    self._calibrator = None

    if self._config.calibration_mode == "conformal":
        self._initialize_with_calibration(reference_embeddings, reference_labels)
    else:
        self._initialize_core(reference_embeddings, reference_labels)
detect(texts, embeddings, predicted_classes, confidences, **kwargs)

Detect novel samples using Mahalanobis distance.

When calibration_mode="conformal", flagging uses p-values instead of raw distance thresholds. A sample is flagged if p_value < calibration_alpha.

Parameters:

Name Type Description Default
texts list[str]

Input texts

required
embeddings ndarray

Text embeddings

required
predicted_classes list[str]

Predicted classes

required
confidences ndarray

Prediction confidences

required
**kwargs

Additional parameters

{}

Returns:

Type Description
tuple[set[int], dict[int, dict[str, Any]]]

(flags, metrics) - Flagged indices and per-sample metrics

Source code in src/novelentitymatcher/novelty/strategies/mahalanobis.py
def detect(
    self,
    texts: list[str],
    embeddings: np.ndarray,
    predicted_classes: list[str],
    confidences: np.ndarray,
    **kwargs,
) -> tuple[set[int], dict[int, dict[str, Any]]]:
    """
    Detect novel samples using Mahalanobis distance.

    When ``calibration_mode="conformal"``, flagging uses p-values
    instead of raw distance thresholds. A sample is flagged if
    ``p_value < calibration_alpha``.

    Args:
        texts: Input texts
        embeddings: Text embeddings
        predicted_classes: Predicted classes
        confidences: Prediction confidences
        **kwargs: Additional parameters

    Returns:
        (flags, metrics) - Flagged indices and per-sample metrics
    """
    flags = set()
    metrics = {}

    if (
        self._config.calibration_mode == "conformal"
        and self._calibrator is not None
        and self._calibrator.is_calibrated
    ):
        raw_distances = self._compute_all_distances(embeddings, predicted_classes)
        if self._config.calibration_method == "mondrian":
            p_values = self._calibrator.predict_pvalues_for_class(
                raw_distances, predicted_classes
            )
        else:
            p_values = self._calibrator.predict_pvalues(raw_distances)

        for idx in range(len(embeddings)):
            metric = self._compute_mahalanobis_metrics(
                idx,
                embeddings[idx],
                predicted_classes[idx],
            )
            metric["p_value"] = float(p_values[idx])
            metric["calibration_mode"] = "conformal"
            metrics[idx] = metric

            if p_values[idx] < self._config.calibration_alpha:
                flags.add(idx)
    else:
        for idx in range(len(embeddings)):
            metric = self._compute_mahalanobis_metrics(
                idx,
                embeddings[idx],
                predicted_classes[idx],
            )
            metrics[idx] = metric

            if metric["mahalanobis_distance"] >= self._config.threshold:
                flags.add(idx)

    return flags, metrics
get_weight()

Return weight for signal combination.

Source code in src/novelentitymatcher/novelty/strategies/mahalanobis.py
def get_weight(self) -> float:
    """Return weight for signal combination."""
    return 0.35
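The class-conditional setup described above (per-class means, pooled covariance with a regularization ridge) can be worked through in NumPy. A hedged sketch; names and the `1e-6` ridge are illustrative, not the library's exact values:

```python
import numpy as np

def mahalanobis_distance(x: np.ndarray, mean: np.ndarray, cov_inv: np.ndarray) -> float:
    """sqrt((x - mu)^T Sigma^{-1} (x - mu))"""
    d = x - mean
    return float(np.sqrt(d @ cov_inv @ d))

rng = np.random.default_rng(1)
class_a = rng.normal(0.0, 1.0, size=(200, 3))
class_b = rng.normal(4.0, 1.0, size=(200, 3))

means = {"a": class_a.mean(axis=0), "b": class_b.mean(axis=0)}

# Pooled (shared) covariance over centered samples, plus a small ridge for stability
centered = np.vstack([class_a - means["a"], class_b - means["b"]])
cov = np.cov(centered, rowvar=False) + 1e-6 * np.eye(3)
cov_inv = np.linalg.inv(cov)

d_in = mahalanobis_distance(np.zeros(3), means["a"], cov_inv)       # near class "a"
d_out = mahalanobis_distance(np.full(3, 10.0), means["a"], cov_inv)  # far from every class
# d_out >> d_in; a sample is flagged when its distance exceeds the threshold
```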

SelfKnowledgeStrategy()

Bases: NoveltyStrategy

Self-knowledge strategy for novelty detection.

Uses a sparse autoencoder to learn representations of known samples and flags high reconstruction error as novel.

Source code in src/novelentitymatcher/novelty/strategies/self_knowledge.py
def __init__(self):
    self._config: SelfKnowledgeConfig | None = None
    self._detector: SelfKnowledgeDetector | None = None
Functions
get_config()

Get the current configuration for this strategy.

Override this if your strategy stores its config differently.

Source code in src/novelentitymatcher/novelty/strategies/base.py
def get_config(self) -> Any:
    """
    Get the current configuration for this strategy.

    Override this if your strategy stores its config differently.
    """
    return getattr(self, "_config", None)

UncertaintyStrategy()

Bases: NoveltyStrategy

Uncertainty-based strategy for novelty detection.

Flags samples as novel if their prediction uncertainty exceeds configured thresholds (margin or entropy).

Source code in src/novelentitymatcher/novelty/strategies/uncertainty.py
def __init__(self):
    self._config: UncertaintyConfig | None = None
Attributes
config_schema property

Return UncertaintyConfig as the config schema.

Functions
get_config()

Get the current configuration for this strategy.

Override this if your strategy stores its config differently.

Source code in src/novelentitymatcher/novelty/strategies/base.py
def get_config(self) -> Any:
    """
    Get the current configuration for this strategy.

    Override this if your strategy stores its config differently.
    """
    return getattr(self, "_config", None)
initialize(reference_embeddings, reference_labels, config)

Initialize the uncertainty strategy.

Parameters:

Name Type Description Default
reference_embeddings ndarray

Embeddings of known samples (not used)

required
reference_labels list[str]

Labels of known samples (not used)

required
config UncertaintyConfig

UncertaintyConfig with thresholds

required
Source code in src/novelentitymatcher/novelty/strategies/uncertainty.py
def initialize(
    self,
    reference_embeddings: np.ndarray,
    reference_labels: list[str],
    config: UncertaintyConfig,
) -> None:
    """
    Initialize the uncertainty strategy.

    Args:
        reference_embeddings: Embeddings of known samples (not used)
        reference_labels: Labels of known samples (not used)
        config: UncertaintyConfig with thresholds
    """
    self._config = config or UncertaintyConfig()
detect(texts, embeddings, predicted_classes, confidences, **kwargs)

Detect novel samples using uncertainty metrics.

Parameters:

Name Type Description Default
texts list[str]

Input texts

required
embeddings ndarray

Text embeddings (not used)

required
predicted_classes list[str]

Predicted classes (not used)

required
confidences ndarray

Prediction confidence scores

required
**kwargs

Additional parameters, may include 'all_probs' for full distribution

{}

Returns:

Type Description
tuple[set[int], dict[int, dict[str, Any]]]

(flags, metrics) - Flagged indices and per-sample metrics

Source code in src/novelentitymatcher/novelty/strategies/uncertainty.py
def detect(
    self,
    texts: list[str],
    embeddings: np.ndarray,
    predicted_classes: list[str],
    confidences: np.ndarray,
    **kwargs,
) -> tuple[set[int], dict[int, dict[str, Any]]]:
    """
    Detect novel samples using uncertainty metrics.

    Args:
        texts: Input texts
        embeddings: Text embeddings (not used)
        predicted_classes: Predicted classes (not used)
        confidences: Prediction confidence scores
        **kwargs: Additional parameters, may include 'all_probs' for full distribution

    Returns:
        (flags, metrics) - Flagged indices and per-sample metrics
    """
    flags = set()
    metrics = {}

    # Check if we have full probability distributions
    all_probs = kwargs.get("all_probs", None)

    for idx, confidence in enumerate(confidences):
        metric = self._compute_uncertainty_metrics(
            idx,
            confidence,
            all_probs[idx] if all_probs is not None else None,
        )
        metrics[idx] = metric

        # Check if uncertainty exceeds thresholds
        is_novel = (
            metric["margin_score"] < self._config.margin_threshold
            or metric["entropy_score"] > self._config.entropy_threshold
        )

        if is_novel:
            flags.add(idx)

    return flags, metrics
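The margin and entropy signals consumed above can be computed with a small helper. The definitions below (top-1 minus top-2 margin; entropy normalized to [0, 1]) are one common choice and a hypothetical stand-in for `_compute_uncertainty_metrics`:

```python
import numpy as np

def uncertainty_metrics(probs: np.ndarray) -> dict:
    """Margin (top-1 minus top-2) and normalized entropy of a probability vector."""
    top2 = np.sort(probs)[::-1][:2]
    margin = float(top2[0] - top2[1])
    p = probs[probs > 0]  # avoid log(0)
    entropy = float(-(p * np.log(p)).sum() / np.log(len(probs)))  # normalized to [0, 1]
    return {"margin_score": margin, "entropy_score": entropy}

confident = uncertainty_metrics(np.array([0.90, 0.05, 0.05]))
uncertain = uncertainty_metrics(np.array([0.34, 0.33, 0.33]))
# A sample is novel when margin is low OR entropy is high
```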
get_weight()

Return weight for signal combination.

Source code in src/novelentitymatcher/novelty/strategies/uncertainty.py
def get_weight(self) -> float:
    """Return weight for signal combination."""
    # Uncertainty is a strong signal
    return 0.35
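The per-strategy weights returned by `get_weight()` (e.g. 0.45 for kNN, 0.2 for clustering) are consumed downstream when signals are combined. One plausible combination scheme, shown as an illustrative sketch rather than the library's exact logic, is a normalized weighted vote over each strategy's flag sets:

```python
def combine_flags(strategy_flags: dict[str, set[int]],
                  weights: dict[str, float],
                  n_samples: int,
                  vote_threshold: float = 0.5) -> set[int]:
    """Flag a sample as novel when its normalized weighted vote meets the threshold."""
    total = sum(weights.values())
    novel = set()
    for idx in range(n_samples):
        vote = sum(w for name, w in weights.items()
                   if idx in strategy_flags.get(name, set()))
        if vote / total >= vote_threshold:
            novel.add(idx)
    return novel

flags = {"knn": {0, 1}, "confidence": {1}, "clustering": {2}}
weights = {"knn": 0.45, "confidence": 0.35, "clustering": 0.2}
combine_flags(flags, weights, n_samples=3)
# Only sample 1 clears the threshold: (0.45 + 0.35) / 1.0 = 0.8 >= 0.5
```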

MatchRecord(text, predicted_id, confidence, embedding, candidates=list(), raw_result=None, metadata=dict(), match_method=None, reference_embedding=None, distance=None) dataclass

Normalized per-query match metadata for downstream discovery stages.

MatchResultWithMetadata(predictions, confidences, embeddings, scores=None, metadata=None, candidate_results=list(), records=list()) dataclass

Enhanced match result with stable downstream metadata.

The legacy attributes (predictions, confidences, embeddings, metadata) remain available, while candidate_results and records provide a consistent contract for novelty and pipeline stages.