
Novelty Clustering

novelentitymatcher.novelty.clustering.base

Abstract contract for clustering backends.

Classes

ClusteringBackend

Bases: ABC

Abstract contract for clustering backends.

Functions
fit_predict(embeddings, min_cluster_size=5, **kwargs) abstractmethod

Fit and predict cluster labels.

Parameters:

Name Type Description Default
embeddings ndarray

Input embeddings (n_samples, dim).

required
min_cluster_size int

Minimum points to form a cluster.

5
**kwargs Any

Backend-specific parameters.

{}

Returns:

Type Description
tuple[ndarray, ndarray, dict[str, Any]]

Tuple of (labels, probabilities, info_dict):
  • labels: array of cluster assignments (-1 for noise)
  • probabilities: array of cluster membership probabilities
  • info: dict with backend-specific metadata
Source code in src/novelentitymatcher/novelty/clustering/base.py
@abstractmethod
def fit_predict(
    self,
    embeddings: np.ndarray,
    min_cluster_size: int = 5,
    **kwargs: Any,
) -> tuple[np.ndarray, np.ndarray, dict[str, Any]]:
    """
    Fit and predict cluster labels.

    Args:
        embeddings: Input embeddings (n_samples, dim).
        min_cluster_size: Minimum points to form a cluster.
        **kwargs: Backend-specific parameters.

    Returns:
        (labels, probabilities, info_dict)
        - labels: array of cluster assignments (-1 for noise)
        - probabilities: array of cluster membership probabilities
        - info: dict with backend-specific metadata
    """
    ...
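A minimal standalone sketch of a backend that satisfies this contract. The abstract base below mirrors the documented interface for illustration, and SingleClusterBackend is a hypothetical toy (not part of the package) that assigns every point to one cluster:

```python
from abc import ABC, abstractmethod
from typing import Any

import numpy as np


class ClusteringBackend(ABC):
    """Mirror of the documented abstract contract (for illustration)."""

    @abstractmethod
    def fit_predict(
        self,
        embeddings: np.ndarray,
        min_cluster_size: int = 5,
        **kwargs: Any,
    ) -> tuple[np.ndarray, np.ndarray, dict[str, Any]]:
        ...


class SingleClusterBackend(ClusteringBackend):
    """Toy backend: one cluster, unless there are too few points."""

    def fit_predict(self, embeddings, min_cluster_size=5, **kwargs):
        n = embeddings.shape[0]
        if n < min_cluster_size:
            # Too few points to form a cluster: mark everything as noise.
            labels = np.full(n, -1, dtype=int)
            probs = np.zeros(n, dtype=np.float32)
        else:
            labels = np.zeros(n, dtype=int)
            probs = np.ones(n, dtype=np.float32)
        return labels, probs, {"backend": "single_cluster"}
```

A real backend would wrap an actual clustering algorithm; the point here is only the shape of the returned tuple.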

novelentitymatcher.novelty.clustering.backends

Concrete clustering backend implementations and registry.

Classes

ClusteringBackendRegistry

Registry for clustering backends.

HDBSCANBackend(min_samples=5, cluster_selection_epsilon=0.0, metric='cosine', prediction_data=True)

Bases: ClusteringBackend

HDBSCAN clustering backend.

Source code in src/novelentitymatcher/novelty/clustering/backends.py
def __init__(
    self,
    min_samples: int = 5,
    cluster_selection_epsilon: float = 0.0,
    metric: str = "cosine",
    prediction_data: bool = True,
):
    self.min_samples = min_samples
    self.cluster_selection_epsilon = cluster_selection_epsilon
    self.metric = metric
    self.prediction_data = prediction_data
    self._clusterer: Any = None

SOPTICSBackend(min_samples=5, metric='cosine')

Bases: ClusteringBackend

sOPTICS (LSH-accelerated OPTICS) clustering backend.

Source code in src/novelentitymatcher/novelty/clustering/backends.py
def __init__(
    self,
    min_samples: int = 5,
    metric: str = "cosine",
):
    self.min_samples = min_samples
    self.metric = metric

UMAPHDBSCANBackend(min_samples=5, cluster_selection_epsilon=0.0, n_neighbors=15, umap_dim=10, umap_metric='cosine', prediction_data=True)

Bases: ClusteringBackend

UMAP preprocessing followed by HDBSCAN clustering backend.

Source code in src/novelentitymatcher/novelty/clustering/backends.py
def __init__(
    self,
    min_samples: int = 5,
    cluster_selection_epsilon: float = 0.0,
    n_neighbors: int = 15,
    umap_dim: int = 10,
    umap_metric: str = "cosine",
    prediction_data: bool = True,
):
    self.min_samples = min_samples
    self.cluster_selection_epsilon = cluster_selection_epsilon
    self.n_neighbors = n_neighbors
    self.umap_dim = umap_dim
    self.umap_metric = umap_metric
    self.prediction_data = prediction_data
    self._umap_model: Any = None
    self._clusterer: Any = None

Functions

novelentitymatcher.novelty.clustering.scalable

Scalable density-based clustering for novelty detection.

Supports HDBSCAN, sOPTICS (accelerated), and UMAP-preprocessed clustering, scaling to roughly 1M points with subquadratic runtime.

Classes

ScalableClusterer(backend='auto', min_cluster_size=5, min_samples=5, cluster_selection_epsilon=0.0, n_neighbors=15, umap_dim=10, umap_metric='cosine', prediction_data=True)

Wrapper for scalable density-based clustering.

Supports:
  • HDBSCAN: Standard hierarchical DBSCAN (best for <100K points)
  • sOPTICS: LSH-accelerated OPTICS (for 100K-1M points)
  • UMAP+HDBSCAN: UMAP dimensionality reduction before HDBSCAN
  • Auto: Automatic backend selection based on dataset size

Parameters:

Name Type Description Default
backend str

Clustering backend ('hdbscan', 'soptics', 'umap_hdbscan', 'auto')

'auto'
min_cluster_size int

Minimum points to form a cluster.

5
min_samples int

Min samples for core distance (OPTICS).

5
cluster_selection_epsilon float

Distance threshold for cluster selection.

0.0
n_neighbors int

Neighbors for UMAP (if used).

15
umap_dim int

Target dimensionality for UMAP preprocessing.

10
umap_metric str

Metric for UMAP.

'cosine'
prediction_data bool

Whether to compute prediction_data for HDBSCAN.

True
Source code in src/novelentitymatcher/novelty/clustering/scalable.py
def __init__(
    self,
    backend: str = "auto",
    min_cluster_size: int = 5,
    min_samples: int = 5,
    cluster_selection_epsilon: float = 0.0,
    n_neighbors: int = 15,
    umap_dim: int = 10,
    umap_metric: str = "cosine",
    prediction_data: bool = True,
):
    """
    Initialize scalable clusterer.

    Args:
        backend: Clustering backend ('hdbscan', 'soptics', 'umap_hdbscan', 'auto')
        min_cluster_size: Minimum points to form a cluster.
        min_samples: Min samples for core distance (OPTICS).
        cluster_selection_epsilon: Distance threshold for cluster selection.
        n_neighbors: Neighbors for UMAP (if used).
        umap_dim: Target dimensionality for UMAP preprocessing.
        umap_metric: Metric for UMAP.
        prediction_data: Whether to compute prediction_data for HDBSCAN.
    """
    self.backend = backend
    self.min_cluster_size = min_cluster_size
    self.min_samples = min_samples
    self.cluster_selection_epsilon = cluster_selection_epsilon
    self.n_neighbors = n_neighbors
    self.umap_dim = umap_dim
    self.umap_metric = umap_metric
    self.prediction_data = prediction_data

    self._backend_instance: Any | None = None
    self._labels: np.ndarray | None = None
    self._probabilities: np.ndarray | None = None
    self._n_points: int = 0
Attributes
labels property

Get cluster labels.

probabilities property

Get cluster membership probabilities.

Functions
fit_predict(embeddings, metric='cosine')

Fit clusterer and predict labels.

Parameters:

Name Type Description Default
embeddings ndarray

Input embeddings (n_samples, dim)

required
metric str

Distance metric ('cosine', 'euclidean', 'precomputed')

'cosine'

Returns:

Type Description
tuple[ndarray, ndarray, dict[str, Any]]

Tuple of (cluster_labels, probabilities, validation_info)

Source code in src/novelentitymatcher/novelty/clustering/scalable.py
def fit_predict(
    self,
    embeddings: np.ndarray,
    metric: str = "cosine",
) -> tuple[np.ndarray, np.ndarray, dict[str, Any]]:
    """
    Fit clusterer and predict labels.

    Args:
        embeddings: Input embeddings (n_samples, dim)
        metric: Distance metric ('cosine', 'euclidean', 'precomputed')

    Returns:
        Tuple of (cluster_labels, probabilities, validation_info)
    """
    X = np.asarray(embeddings, dtype=np.float32)
    if X.ndim != 2:
        raise ValueError(f"Expected 2D array, got {X.ndim}D")
    self._n_points = X.shape[0]

    backend_name = self.backend
    if backend_name == self.BACKEND_AUTO:
        backend_name = self._auto_backend(self._n_points)
        logger.info(
            f"Auto-selected backend: {backend_name} for {self._n_points} points"
        )

    self._backend_instance = self._create_backend(backend_name)

    labels, probabilities, backend_info = self._backend_instance.fit_predict(
        X, min_cluster_size=self.min_cluster_size, metric=metric
    )

    self._labels = labels
    self._probabilities = probabilities

    unique_clusters = sorted({int(label) for label in labels if int(label) >= 0})
    validation_info: dict[str, Any] = {
        "backend": backend_name,
        "n_points": self._n_points,
        "n_clusters": len(unique_clusters),
        "n_noise": int(np.sum(labels == -1)),
        "persistences": backend_info.get("persistences", []),
        "unique_clusters": unique_clusters,
    }

    logger.info(
        f"Clustering complete: {validation_info['n_clusters']} clusters, "
        f"{validation_info['n_noise']} noise points"
    )

    return labels, probabilities, validation_info
fit(embeddings, metric='cosine')

Fit the clusterer (alias for compatibility).

Source code in src/novelentitymatcher/novelty/clustering/scalable.py
def fit(
    self,
    embeddings: np.ndarray,
    metric: str = "cosine",
) -> ScalableClusterer:
    """Fit the clusterer (alias for compatibility)."""
    self.fit_predict(embeddings, metric=metric)
    return self
get_cluster_members(cluster_id)

Get indices of members in a specific cluster.

Source code in src/novelentitymatcher/novelty/clustering/scalable.py
def get_cluster_members(
    self,
    cluster_id: int,
) -> np.ndarray:
    """Get indices of members in a specific cluster."""
    if self._labels is None:
        raise RuntimeError("Clusterer must be fitted first")
    return np.where(self._labels == cluster_id)[0]
get_noise_points()

Get indices of noise points (label = -1).

Source code in src/novelentitymatcher/novelty/clustering/scalable.py
def get_noise_points(self) -> np.ndarray:
    """Get indices of noise points (label = -1)."""
    if self._labels is None:
        raise RuntimeError("Clusterer must be fitted first")
    return np.where(self._labels == -1)[0]
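Both lookups above are thin wrappers over np.where on the stored label array; the same logic stands alone:

```python
import numpy as np

# Labels as produced by a density-based clusterer: -1 marks noise.
labels = np.array([0, 0, 1, -1, 1, -1, 0])

# Equivalent of get_cluster_members(1):
members_of_1 = np.where(labels == 1)[0]

# Equivalent of get_noise_points():
noise = np.where(labels == -1)[0]

print(members_of_1.tolist())  # [2, 4]
print(noise.tolist())         # [3, 5]
```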

Functions

compute_cluster_quality(embeddings, labels, known_embeddings=None, metric='cosine')

Compute quality metrics for discovered clusters.

Parameters:

Name Type Description Default
embeddings ndarray

Cluster member embeddings (n_cluster, dim)

required
labels ndarray

Cluster labels for all points (n_total,)

required
known_embeddings ndarray | None

Optional known entity embeddings for ratio calculation

None
metric str

Distance metric

'cosine'

Returns:

Type Description
dict[str, float]

Dictionary with quality metrics:
  • cohesion: avg pairwise distance within clusters (lower = better)
  • separation: avg distance between cluster centroids
  • silhouette: standard silhouette score
  • known_ratio: fraction of cluster close to known entities
Source code in src/novelentitymatcher/novelty/clustering/scalable.py
def compute_cluster_quality(
    embeddings: np.ndarray,
    labels: np.ndarray,
    known_embeddings: np.ndarray | None = None,
    metric: str = "cosine",
) -> dict[str, float]:
    """
    Compute quality metrics for discovered clusters.

    Args:
        embeddings: Cluster member embeddings (n_cluster, dim)
        labels: Cluster labels for all points (n_total,)
        known_embeddings: Optional known entity embeddings for ratio calculation
        metric: Distance metric

    Returns:
        Dictionary with quality metrics:
        - cohesion: avg pairwise distance within clusters (lower = better)
        - separation: avg distance between cluster centroids
        - silhouette: standard silhouette score
        - known_ratio: fraction of cluster close to known entities
    """
    unique_labels = sorted({int(label) for label in labels if int(label) >= 0})
    n_clusters = len(unique_labels)

    if n_clusters == 0:
        return {
            "cohesion": 0.0,
            "separation": 0.0,
            "silhouette": 0.0,
            "known_ratio": 0.0,
        }

    from sklearn.metrics import pairwise_distances

    cohesion_scores = []
    for cluster_id in unique_labels:
        member_indices = np.where(labels == cluster_id)[0]
        if len(member_indices) > 1:
            cluster_embeddings = embeddings[member_indices]
            pairwise_dists = pairwise_distances(cluster_embeddings, metric=metric)
            upper_tri = pairwise_dists[np.triu_indices_from(pairwise_dists, k=1)]
            cohesion_scores.append(float(np.mean(upper_tri)))

    cohesion = float(np.mean(cohesion_scores)) if cohesion_scores else 0.0

    centroids: list[Any] = []
    for cluster_id in unique_labels:
        member_indices = np.where(labels == cluster_id)[0]
        centroid = np.mean(embeddings[member_indices], axis=0)
        centroids.append(centroid)
    centroids_array = np.array(centroids)

    if len(centroids_array) > 1:
        centroid_distances = pairwise_distances(centroids_array, metric=metric)
        upper_tri = centroid_distances[np.triu_indices_from(centroid_distances, k=1)]
        separation = float(np.mean(upper_tri))
    else:
        separation = 0.0

    if len(unique_labels) > 1 and len(embeddings) > len(unique_labels):
        try:
            from sklearn.metrics import silhouette_score

            silhouette = float(silhouette_score(embeddings, labels, metric=metric))
        except (ValueError, TypeError, RuntimeError):
            silhouette = 0.0
    else:
        silhouette = 0.0

    known_ratio = 0.0
    if known_embeddings is not None and len(known_embeddings) > 0:
        known_dists = pairwise_distances(embeddings, known_embeddings, metric=metric)
        min_known_dists = np.min(known_dists, axis=1)
        threshold = np.percentile(min_known_dists, 25)
        known_ratio = float(np.mean(min_known_dists < threshold))

    return {
        "cohesion": cohesion,
        "separation": separation,
        "silhouette": silhouette,
        "known_ratio": known_ratio,
    }
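A standalone recomputation of the cohesion and separation metrics on a toy dataset, using plain numpy in place of sklearn's pairwise_distances (the cosine-distance math is the same):

```python
import numpy as np


def cosine_distances(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    """Pairwise cosine distances between rows of a and rows of b."""
    a_n = a / np.linalg.norm(a, axis=1, keepdims=True)
    b_n = b / np.linalg.norm(b, axis=1, keepdims=True)
    return 1.0 - a_n @ b_n.T


# Two tight, well-separated clusters in 2-D.
embeddings = np.array(
    [[1.0, 0.0], [0.99, 0.05],    # cluster 0
     [0.0, 1.0], [0.05, 0.99]],   # cluster 1
)
labels = np.array([0, 0, 1, 1])

# Cohesion: mean pairwise distance within each cluster (lower = better).
cohesion_scores = []
for cid in (0, 1):
    members = embeddings[labels == cid]
    d = cosine_distances(members, members)
    cohesion_scores.append(d[np.triu_indices_from(d, k=1)].mean())
cohesion = float(np.mean(cohesion_scores))

# Separation: mean distance between cluster centroids.
centroids = np.array([embeddings[labels == c].mean(axis=0) for c in (0, 1)])
separation = float(cosine_distances(centroids, centroids)[0, 1])

print(cohesion, separation)  # tight clusters: cohesion << separation
```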

validate_novel_cluster(cluster_embeddings, known_embeddings, cohesion_threshold=0.45, known_ratio_threshold=0.4, min_cluster_size=5, metric='cosine')

Validate that a cluster represents truly novel entities.

Parameters:

Name Type Description Default
cluster_embeddings ndarray

Embeddings of cluster members

required
known_embeddings ndarray

Embeddings of known entities

required
cohesion_threshold float

Max avg pairwise distance within cluster

0.45
known_ratio_threshold float

Max fraction that should be close to known

0.4
min_cluster_size int

Minimum required members

5
metric str

Distance metric

'cosine'

Returns:

Type Description
tuple[bool, float]

Tuple of (is_valid_novel, validation_score)

Source code in src/novelentitymatcher/novelty/clustering/scalable.py
def validate_novel_cluster(
    cluster_embeddings: np.ndarray,
    known_embeddings: np.ndarray,
    cohesion_threshold: float = 0.45,
    known_ratio_threshold: float = 0.4,
    min_cluster_size: int = 5,
    metric: str = "cosine",
) -> tuple[bool, float]:
    """
    Validate that a cluster represents truly novel entities.

    Args:
        cluster_embeddings: Embeddings of cluster members
        known_embeddings: Embeddings of known entities
        cohesion_threshold: Max avg pairwise distance within cluster
        known_ratio_threshold: Max fraction that should be close to known
        min_cluster_size: Minimum required members
        metric: Distance metric

    Returns:
        Tuple of (is_valid_novel, validation_score)
    """
    from sklearn.metrics import pairwise_distances

    n_members = len(cluster_embeddings)

    if n_members < min_cluster_size:
        return False, 0.0

    if len(cluster_embeddings) > 1:
        pairwise_dists = pairwise_distances(cluster_embeddings, metric=metric)
        upper_tri = pairwise_dists[np.triu_indices_from(pairwise_dists, k=1)]
        cohesion = float(np.mean(upper_tri)) if upper_tri.size > 0 else 0.0
    else:
        cohesion = 0.0

    cohesion_valid = cohesion <= cohesion_threshold

    if known_embeddings is not None and len(known_embeddings) > 0:
        known_dists = pairwise_distances(
            cluster_embeddings, known_embeddings, metric=metric
        )
        min_known_dists = np.min(known_dists, axis=1)
        known_ratio = float(np.mean(min_known_dists < cohesion_threshold))
    else:
        known_ratio = 0.0

    known_valid = known_ratio <= known_ratio_threshold

    is_valid = bool(cohesion_valid and known_valid)

    score = float(
        np.mean(
            [
                1.0 - min(cohesion / cohesion_threshold, 1.0)
                if cohesion_threshold > 0
                else 1.0,
                1.0 - min(known_ratio / known_ratio_threshold, 1.0)
                if known_ratio_threshold > 0
                else 1.0,
            ]
        )
    )

    return is_valid, score
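The validation score averages the two margins left under the thresholds. A worked instance of the formula, with hypothetical inputs chosen so each margin is exactly half used:

```python
import numpy as np

# Default thresholds from validate_novel_cluster.
cohesion_threshold = 0.45
known_ratio_threshold = 0.4

# Hypothetical measurements for a candidate cluster.
cohesion = 0.225    # half the allowed within-cluster spread
known_ratio = 0.2   # half the allowed overlap with known entities

score = float(np.mean([
    1.0 - min(cohesion / cohesion_threshold, 1.0),
    1.0 - min(known_ratio / known_ratio_threshold, 1.0),
]))

print(score)  # 0.5: each term contributes a half-used margin
```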

novelentitymatcher.novelty.clustering.params

Pydantic parameter models for clustering backends.

Provides clean, validated configuration objects for each clustering backend, supporting benchmark sweeps over parameter combinations.

Classes

HDBSCANParams

Bases: BaseModel

Parameters for the HDBSCAN clustering backend.

SOPTICSParams

Bases: BaseModel

Parameters for the sOPTICS clustering backend.

UMAPHDBSCANParams

Bases: BaseModel

Parameters for the UMAP+HDBSCAN clustering backend.

novelentitymatcher.novelty.clustering.validation

Cluster validation logic for novelty detection.

This module provides utilities for validating clustering results and assessing cluster quality for novelty detection.

Classes

ClusterValidator(min_cohesion_threshold=0.45, min_persistence_threshold=0.1)

Validates clustering results for novelty detection.

Provides metrics and validation methods to assess cluster quality and determine if samples represent novel clusters.

Parameters:

Name Type Description Default
min_cohesion_threshold float

Minimum cohesion for valid clusters

0.45
min_persistence_threshold float

Minimum persistence for valid clusters

0.1
Source code in src/novelentitymatcher/novelty/clustering/validation.py
def __init__(
    self,
    min_cohesion_threshold: float = 0.45,
    min_persistence_threshold: float = 0.1,
):
    """
    Initialize the cluster validator.

    Args:
        min_cohesion_threshold: Minimum cohesion for valid clusters
        min_persistence_threshold: Minimum persistence for valid clusters
    """
    self.min_cohesion_threshold = min_cohesion_threshold
    self.min_persistence_threshold = min_persistence_threshold
Functions
compute_cohesion(embeddings, labels, cluster_id)

Compute cluster cohesion (compactness).

Cohesion is the average pairwise similarity within a cluster.

Parameters:

Name Type Description Default
embeddings ndarray

All embeddings

required
labels ndarray

Cluster labels for each embedding

required
cluster_id int

Cluster to compute cohesion for

required

Returns:

Type Description
float

Cohesion score (0-1, higher = more compact)

Source code in src/novelentitymatcher/novelty/clustering/validation.py
def compute_cohesion(
    self,
    embeddings: np.ndarray,
    labels: np.ndarray,
    cluster_id: int,
) -> float:
    """
    Compute cluster cohesion (compactness).

    Cohesion is the average pairwise similarity within a cluster.

    Args:
        embeddings: All embeddings
        labels: Cluster labels for each embedding
        cluster_id: Cluster to compute cohesion for

    Returns:
        Cohesion score (0-1, higher = more compact)
    """
    mask = labels == cluster_id
    if mask.sum() < 2:
        return 0.0

    cluster_embeddings = embeddings[mask]

    # Compute pairwise cosine similarities
    norms = np.linalg.norm(cluster_embeddings, axis=1)
    normalized = cluster_embeddings / norms[:, np.newaxis]

    # Average pairwise similarity
    similarity_matrix = np.dot(normalized, normalized.T)
    # Exclude diagonal
    np.fill_diagonal(similarity_matrix, 0)

    cohesion = similarity_matrix.sum() / (
        similarity_matrix.size - len(cluster_embeddings)
    )

    return float(cohesion)
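Note the contrast with compute_cluster_quality above: here cohesion is a similarity, so higher means more compact. A standalone check of the same computation for a cluster of identical members, where cohesion should be exactly 1.0:

```python
import numpy as np

# Three identical unit vectors form a maximally compact cluster.
cluster_embeddings = np.array([[1.0, 0.0], [1.0, 0.0], [1.0, 0.0]])

norms = np.linalg.norm(cluster_embeddings, axis=1)
normalized = cluster_embeddings / norms[:, np.newaxis]

similarity_matrix = normalized @ normalized.T
np.fill_diagonal(similarity_matrix, 0)

# Average over the n*n - n off-diagonal entries.
cohesion = similarity_matrix.sum() / (
    similarity_matrix.size - len(cluster_embeddings)
)
print(cohesion)  # 1.0
```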
compute_separation(embeddings, labels, cluster_id)

Compute cluster separation (distinctiveness from other clusters).

Separation is the minimum average distance to another cluster.

Parameters:

Name Type Description Default
embeddings ndarray

All embeddings

required
labels ndarray

Cluster labels for each embedding

required
cluster_id int

Cluster to compute separation for

required

Returns:

Type Description
float

Separation score (0-1, higher = more separated)

Source code in src/novelentitymatcher/novelty/clustering/validation.py
def compute_separation(
    self,
    embeddings: np.ndarray,
    labels: np.ndarray,
    cluster_id: int,
) -> float:
    """
    Compute cluster separation (distinctiveness from other clusters).

    Separation is the minimum average distance to another cluster.

    Args:
        embeddings: All embeddings
        labels: Cluster labels for each embedding
        cluster_id: Cluster to compute separation for

    Returns:
        Separation score (0-1, higher = more separated)
    """
    mask = labels == cluster_id
    if mask.sum() == 0:
        return 0.0

    cluster_embeddings = embeddings[mask]
    cluster_center = cluster_embeddings.mean(axis=0)

    unique_clusters = np.unique(labels)
    min_distance = float("inf")

    for other_id in unique_clusters:
        if other_id == cluster_id or other_id == -1:
            continue

        other_mask = labels == other_id
        other_embeddings = embeddings[other_mask]
        other_center = other_embeddings.mean(axis=0)

        # Cosine distance
        distance = 1.0 - np.dot(cluster_center, other_center) / (
            np.linalg.norm(cluster_center) * np.linalg.norm(other_center)
        )

        min_distance = min(min_distance, distance)

    return float(min_distance if min_distance != float("inf") else 0.0)
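The minimum-over-other-clusters step can be illustrated in isolation: with hypothetical centroids, the nearest other center determines the separation score:

```python
import numpy as np


def cosine_dist(u: np.ndarray, v: np.ndarray) -> float:
    return 1.0 - np.dot(u, v) / (np.linalg.norm(u) * np.linalg.norm(v))


# Center of the cluster being scored, plus two other cluster centers.
center = np.array([1.0, 0.0])
others = [
    np.array([0.0, 1.0]),                 # orthogonal: distance 1.0
    np.array([1.0, 1.0]) / np.sqrt(2.0),  # 45 degrees: ~0.293
]

# Separation is the minimum distance to any other center.
separation = min(cosine_dist(center, o) for o in others)
print(round(separation, 3))  # 0.293
```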
is_valid_cluster(embeddings, labels, cluster_id, min_size=5)

Determine if a cluster is valid (stable and meaningful).

Parameters:

Name Type Description Default
embeddings ndarray

All embeddings

required
labels ndarray

Cluster labels

required
cluster_id int

Cluster to validate

required
min_size int

Minimum number of samples for valid cluster

5

Returns:

Type Description
bool

True if cluster is valid

Source code in src/novelentitymatcher/novelty/clustering/validation.py
def is_valid_cluster(
    self,
    embeddings: np.ndarray,
    labels: np.ndarray,
    cluster_id: int,
    min_size: int = 5,
) -> bool:
    """
    Determine if a cluster is valid (stable and meaningful).

    Args:
        embeddings: All embeddings
        labels: Cluster labels
        cluster_id: Cluster to validate
        min_size: Minimum number of samples for valid cluster

    Returns:
        True if cluster is valid
    """
    # Check size
    mask = labels == cluster_id
    if mask.sum() < min_size:
        return False

    # Check cohesion
    cohesion = self.compute_cohesion(embeddings, labels, cluster_id)
    if cohesion < self.min_cohesion_threshold:
        return False

    return True
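The two checks (minimum size, then cohesion against the default 0.45 threshold) reduce to a few lines; this standalone sketch reimplements them outside the class for illustration:

```python
import numpy as np


def is_valid(cluster_embeddings, min_size=5, min_cohesion=0.45):
    """Size check, then cohesion check, as in is_valid_cluster."""
    n = len(cluster_embeddings)
    if n < min_size:
        return False
    normalized = cluster_embeddings / np.linalg.norm(
        cluster_embeddings, axis=1, keepdims=True
    )
    sim = normalized @ normalized.T
    np.fill_diagonal(sim, 0)
    cohesion = sim.sum() / (sim.size - n)
    return bool(cohesion >= min_cohesion)


tight = np.tile([1.0, 0.0], (5, 1))  # 5 identical points: passes both checks
tiny = np.tile([1.0, 0.0], (3, 1))   # fewer than min_size: rejected
print(is_valid(tight), is_valid(tiny))  # True False
```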
get_cluster_statistics(embeddings, labels)

Compute statistics for all clusters.

Parameters:

Name Type Description Default
embeddings ndarray

All embeddings

required
labels ndarray

Cluster labels

required

Returns:

Type Description
dict[int, dict[str, float]]

Dict mapping cluster_id to statistics dict

Source code in src/novelentitymatcher/novelty/clustering/validation.py
def get_cluster_statistics(
    self,
    embeddings: np.ndarray,
    labels: np.ndarray,
) -> dict[int, dict[str, float]]:
    """
    Compute statistics for all clusters.

    Args:
        embeddings: All embeddings
        labels: Cluster labels

    Returns:
        Dict mapping cluster_id to statistics dict
    """
    unique_clusters = np.unique(labels)
    stats = {}

    for cluster_id in unique_clusters:
        if cluster_id == -1:  # Noise points
            continue

        mask = labels == cluster_id
        size = mask.sum()

        stats[cluster_id] = {
            "size": int(size),
            "cohesion": self.compute_cohesion(embeddings, labels, cluster_id),
            "separation": self.compute_separation(embeddings, labels, cluster_id),
            "is_valid": self.is_valid_cluster(embeddings, labels, cluster_id),
        }

    return stats