Skip to content

Novelty Storage

novelentitymatcher.novelty.storage.index

Approximate Nearest Neighbor (ANN) index wrapper for efficient similarity search.

Supports HNSWlib and FAISS backends for O(log n) similarity search.

Classes

ANNBackend

Supported ANN backends.

ANNIndex(dim, backend=ANNBackend.HNSWLIB, max_elements=100000, ef_construction=200, M=16)

Wrapper for Approximate Nearest Neighbor indexing.

Provides efficient O(log n) similarity search using HNSWlib or FAISS.

Parameters:

Name Type Description Default
dim int

Dimensionality of embeddings

required
backend str

ANN backend to use ('hnswlib' or 'faiss')

HNSWLIB
max_elements int

Maximum number of elements to index

100000
ef_construction int

HNSW ef_construction parameter (higher = better quality)

200
M int

HNSW M parameter (higher = better quality, more memory)

16
Source code in src/novelentitymatcher/novelty/storage/index.py
def __init__(
    self,
    dim: int,
    backend: str = ANNBackend.HNSWLIB,
    max_elements: int = 100000,
    ef_construction: int = 200,
    M: int = 16,
):
    """
    Initialize ANN index.

    Args:
        dim: Dimensionality of embeddings
        backend: ANN backend to use ('hnswlib' or 'faiss')
        max_elements: Maximum number of elements to index
        ef_construction: HNSW ef_construction parameter (higher = better quality)
        M: HNSW M parameter (higher = better quality, more memory)

    Raises:
        ValueError: If ``backend`` is not one of the supported ANNBackend values.
    """
    self.dim = dim
    self.backend = backend
    self.max_elements = max_elements
    # Backend-specific index object; stays None for the exact fallback.
    self._index: Any = None
    # Labels kept in insertion order, aligned 1:1 with indexed vectors.
    self._labels: list[str] = []
    # Raw vectors retained for exact search and persistence; `_vectors`
    # is a lazily consolidated view over `_vector_buffer`.
    self._vector_buffer: list[np.ndarray] = []
    self._vectors: np.ndarray | None = None
    self._hnsw_params: dict = {}

    # Dispatch backend-specific initialization; the exact fallback needs
    # no index structure beyond the buffered vectors above.
    if backend == ANNBackend.HNSWLIB:
        self._init_hnswlib(ef_construction, M)
    elif backend == ANNBackend.FAISS:
        self._init_faiss()
    elif backend == ANNBackend.EXACT:
        logger.info("Initialized exact ANN fallback with dim=%s", self.dim)
    else:
        raise ValueError(f"Unsupported backend: {backend}")
Attributes
n_elements property

Get number of elements in the index.

labels property

Return the labels stored alongside indexed vectors.

Functions
add_vectors(vectors, labels=None)

Add vectors to the index.

Parameters:

Name Type Description Default
vectors ndarray

Array of shape (n_vectors, dim)

required
labels list[str] | None

Optional labels for the vectors

None
Source code in src/novelentitymatcher/novelty/storage/index.py
def add_vectors(self, vectors: np.ndarray, labels: list[str] | None = None) -> None:
    """
    Add vectors to the index.

    Vectors are normalized before insertion (for cosine similarity) and
    buffered so the exact-search path and persistence can see them.

    Args:
        vectors: Array of shape (n_vectors, dim)
        labels: Optional labels for the vectors. When provided and
            non-empty, must contain exactly one label per vector; when
            omitted, sequential numeric string labels are generated.

    Raises:
        ValueError: If the vector dimensionality does not match the index,
            or if ``labels`` is non-empty but its length differs from the
            number of vectors.
    """
    if len(vectors) == 0:
        return

    if vectors.shape[1] != self.dim:
        raise ValueError(
            f"Vector dimension mismatch: expected {self.dim}, got {vectors.shape[1]}"
        )

    # A silent label/vector count mismatch would permanently corrupt the
    # label -> index mapping, so fail fast instead of accepting it.
    if labels and len(labels) != len(vectors):
        raise ValueError(
            f"Label count mismatch: got {len(labels)} labels for {len(vectors)} vectors"
        )

    # Normalize vectors for cosine similarity
    vectors = self._normalize(vectors).astype(np.float32, copy=False)

    if self.backend == ANNBackend.HNSWLIB:
        current_count = self._index.get_current_count()
        # Grow the HNSW index before exceeding its declared capacity.
        if current_count + len(vectors) > self.max_elements:
            self._resize_hnsw_index(current_count + len(vectors))
        self._index.add_items(vectors)
    elif self.backend == ANNBackend.FAISS:
        self._index.add(vectors)

    # Buffer the raw vectors and invalidate the consolidated cache so it
    # is rebuilt lazily on next access.
    self._vector_buffer.append(vectors)
    self._vectors = None

    if labels:
        self._labels.extend(labels)
    else:
        # Auto-generate sequential string labels continuing from the
        # current label count.
        start = len(self._labels)
        self._labels.extend([str(i) for i in range(start, start + len(vectors))])
knn_query(query, k=5)

Find k-nearest neighbors for query vector(s).

Parameters:

Name Type Description Default
query ndarray

Query vector or vectors of shape (n_queries, dim)

required
k int

Number of neighbors to return

5

Returns:

Type Description
tuple[ndarray, ndarray]

Tuple of (distances, indices):
  • distances: Array of shape (n_queries, k) with similarity scores
  • indices: Array of shape (n_queries, k) with neighbor indices
Source code in src/novelentitymatcher/novelty/storage/index.py
def knn_query(self, query: np.ndarray, k: int = 5) -> tuple[np.ndarray, np.ndarray]:
    """
    Find k-nearest neighbors for query vector(s).

    Args:
        query: Query vector or vectors of shape (n_queries, dim)
        k: Number of neighbors to return

    Returns:
        Tuple of (distances, indices)
        - distances: Array of shape (n_queries, k) with similarity scores
        - indices: Array of shape (n_queries, k) with neighbor indices
    """
    # Accept a single vector by promoting it to a batch of one.
    if query.ndim == 1:
        query = query.reshape(1, -1)

    # Normalize query vectors and cast to float32 up front: both backends
    # operate on float32, and FAISS in particular rejects float64 input.
    query = self._normalize(query).astype(np.float32, copy=False)

    if self.backend == ANNBackend.HNSWLIB:
        labels, distances = self._index.knn_query(query, k=k)
        # HNSWlib returns distances (lower is better), convert to similarities
        similarities = 1 - distances
        return similarities, labels
    if self.backend == ANNBackend.FAISS:
        distances, indices = self._index.search(query, k)
        # FAISS IndexFlatIP returns similarities directly
        return distances, indices

    # Exact brute-force fallback over the buffered vectors.
    vectors = self._ensure_vectors()
    if vectors.size == 0:
        empty = np.empty((len(query), 0), dtype=np.float32)
        return empty, empty.astype(int)

    # Clamp k so we never ask for more neighbors than exist.
    k = min(k, len(vectors))
    similarities = np.dot(query, vectors.T)
    top_indices = np.argsort(-similarities, axis=1)[:, :k]
    top_similarities = np.take_along_axis(similarities, top_indices, axis=1)
    return top_similarities, top_indices
get_distance_matrix(queries, targets=None)

Get distance matrix between queries and all indexed vectors.

Parameters:

Name Type Description Default
queries ndarray

Query vectors of shape (n_queries, dim)

required
targets ndarray | None

Optional target vectors (if None, use all indexed vectors)

None

Returns:

Type Description
ndarray

Distance matrix of shape (n_queries, n_targets)

Source code in src/novelentitymatcher/novelty/storage/index.py
def get_distance_matrix(
    self, queries: np.ndarray, targets: np.ndarray | None = None
) -> np.ndarray:
    """
    Get distance matrix between queries and all indexed vectors.

    Args:
        queries: Query vectors of shape (n_queries, dim)
        targets: Optional target vectors (if None, use all indexed vectors)

    Returns:
        Distance matrix of shape (n_queries, n_targets)
    """
    if queries.ndim == 1:
        queries = queries.reshape(1, -1)

    # Normalize queries
    queries = self._normalize(queries).astype(np.float32, copy=False)

    if targets is None:
        vectors = self._ensure_vectors()
        if vectors.size == 0:
            return np.zeros((len(queries), 0), dtype=np.float32)
        return np.dot(queries, vectors.T)
    else:
        # Compute direct similarity
        targets = self._normalize(targets).astype(np.float32, copy=False)
        return np.dot(queries, targets.T)
save(path)

Save index to disk.

Source code in src/novelentitymatcher/novelty/storage/index.py
def save(self, path: str | Path) -> None:
    """Persist the backend index plus label and vector sidecar files."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)

    if self.backend == ANNBackend.HNSWLIB:
        target = path.with_suffix(".bin")
        self._index.save_index(str(target))
        logger.info(f"Saved HNSWlib index to {path}")
    elif self.backend == ANNBackend.FAISS:
        import faiss

        target = path.with_suffix(".index")
        faiss.write_index(self._index, str(target))
        logger.info(f"Saved FAISS index to {path}")
    else:
        # Exact fallback: the vectors written below fully describe the index.
        logger.info(f"Saved exact ANN fallback index to {path}")

    # Sidecars are written for every backend so load() can restore labels
    # and support the exact-search path.
    serialized_labels = json.dumps(self._labels, ensure_ascii=False, indent=2)
    path.with_suffix(".labels.json").write_text(serialized_labels, encoding="utf-8")
    np.save(path.with_suffix(".vectors.npy"), self._ensure_vectors())
load(path)

Load index from disk.

Source code in src/novelentitymatcher/novelty/storage/index.py
def load(self, path: str | Path) -> None:
    """Restore the backend index, labels, and raw vectors from disk."""
    path = Path(path)

    if self.backend == ANNBackend.HNSWLIB:
        bin_path = path.with_suffix(".bin")
        if not bin_path.exists():
            raise FileNotFoundError(f"Index file not found: {bin_path}")
        self._index.load_index(str(bin_path))
        logger.info(f"Loaded HNSWlib index from {path}")
    elif self.backend == ANNBackend.FAISS:
        import faiss

        index_path = path.with_suffix(".index")
        if not index_path.exists():
            raise FileNotFoundError(f"Index file not found: {index_path}")
        self._index = faiss.read_index(str(index_path))
        logger.info(f"Loaded FAISS index from {path}")
    else:
        logger.info(f"Loaded exact ANN fallback index from {path}")

    # Restore labels, coercing each entry to str for safety.
    labels_path = path.with_suffix(".labels.json")
    if labels_path.exists():
        self._labels = [
            str(label) for label in json.loads(labels_path.read_text(encoding="utf-8"))
        ]
    else:
        # Backward-compatible fallback for older saved indexes.
        self._labels = [str(i) for i in range(self.n_elements)]

    # Restore raw vectors; without a sidecar, start from an empty buffer.
    vectors_path = path.with_suffix(".vectors.npy")
    if vectors_path.exists():
        self._vectors = np.load(vectors_path).astype(np.float32, copy=False)
        self._vector_buffer = [self._vectors]
    else:
        self._vectors = np.empty((0, self.dim), dtype=np.float32)
        self._vector_buffer = []
clear()

Clear all elements from the index.

Source code in src/novelentitymatcher/novelty/storage/index.py
def clear(self) -> None:
    """Remove every element from the index."""
    if self.backend == ANNBackend.HNSWLIB:
        # HNSWlib doesn't support clear, need to reinitialize
        raise NotImplementedError(
            "HNSWlib doesn't support clearing. Create a new index instead."
        )

    if self.backend == ANNBackend.FAISS:
        import faiss

        # Replacing the index object is the simplest way to empty FAISS.
        self._index = faiss.IndexFlatIP(self.dim)

    # Bookkeeping reset shared by FAISS and the exact fallback.
    self._labels = []
    self._vectors = np.empty((0, self.dim), dtype=np.float32)
    self._vector_buffer = []

    if self.backend == ANNBackend.FAISS:
        logger.info("Cleared FAISS index")
    else:
        logger.info("Cleared exact ANN fallback index")

Functions

novelentitymatcher.novelty.storage.persistence

File-based storage for novel class discovery results.

Provides utilities for saving and loading proposals in YAML format.

Classes

Functions

save_proposals(report, output_dir='./proposals', format='yaml')

Save novel class discovery report to file.

Parameters:

Name Type Description Default
report NovelClassDiscoveryReport

Discovery report to save

required
output_dir str | Path

Directory to save proposals in

'./proposals'
format str

Output format ('yaml' or 'json')

'yaml'

Returns:

Type Description
str

Path to saved file

Source code in src/novelentitymatcher/novelty/storage/persistence.py
def save_proposals(
    report: NovelClassDiscoveryReport,
    output_dir: str | Path = "./proposals",
    format: str = "yaml",
) -> str:
    """
    Serialize a novel class discovery report to a file on disk.

    Args:
        report: Discovery report to save
        output_dir: Directory to save proposals in
        format: Output format ('yaml' or 'json')

    Returns:
        Path to saved file

    Raises:
        ValueError: If ``format`` is neither 'yaml' nor 'json'.
    """
    destination = Path(output_dir)
    destination.mkdir(parents=True, exist_ok=True)

    # A timestamp alone can collide when several discovery reports are
    # saved within the same second, so the discovery id is appended to
    # keep filenames unique.
    stamp = report.timestamp.strftime("%Y%m%d-%H%M%S")
    output_path = destination / f"discovery_{stamp}_{report.discovery_id}.{format}"

    # Convert the report to a plain dict before serialization.
    data = _report_to_dict(report)

    if format == "yaml":
        with open(output_path, "w", encoding="utf-8") as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True)
    elif format == "json":
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
    else:
        raise ValueError(f"Unsupported format: {format}")

    logger.info(f"Saved discovery report to {output_path}")
    return str(output_path)

load_proposals(path)

Load novel class discovery report from file.

Parameters:

Name Type Description Default
path str | Path

Path to proposal file

required

Returns:

Type Description
NovelClassDiscoveryReport

NovelClassDiscoveryReport

Raises:

Type Description
FileNotFoundError

If file doesn't exist

ValueError

If file format is invalid

Source code in src/novelentitymatcher/novelty/storage/persistence.py
def load_proposals(path: str | Path) -> NovelClassDiscoveryReport:
    """
    Read a novel class discovery report back from disk.

    Args:
        path: Path to proposal file

    Returns:
        NovelClassDiscoveryReport

    Raises:
        FileNotFoundError: If file doesn't exist
        ValueError: If file format is invalid
    """
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(f"Proposal file not found: {path}")

    # The serialization format is inferred from the file extension.
    suffix = path.suffix.lower()
    if suffix == ".json":
        with open(path, encoding="utf-8") as f:
            data = json.load(f)
    elif suffix in (".yaml", ".yml"):
        with open(path, encoding="utf-8") as f:
            data = yaml.safe_load(f)
    else:
        raise ValueError(f"Unsupported file format: {suffix}")

    # Rehydrate the plain dict into a report object.
    report = _dict_to_report(data)
    logger.info(f"Loaded discovery report from {path}")
    return report

list_proposals(output_dir='./proposals', sort='newest')

List all discovery reports in output directory.

Parameters:

Name Type Description Default
output_dir str | Path

Directory containing proposals

'./proposals'
sort str

Sort order ('newest', 'oldest', 'name')

'newest'

Returns:

Type Description
list[dict[str, Any]]

List of proposal metadata dicts

Source code in src/novelentitymatcher/novelty/storage/persistence.py
def list_proposals(
    output_dir: str | Path = "./proposals",
    sort: str = "newest",
) -> list[dict[str, Any]]:
    """
    List all discovery reports in output directory.

    Args:
        output_dir: Directory containing proposals
        sort: Sort order ('newest', 'oldest', 'name')

    Returns:
        List of proposal metadata dicts
    """
    output_dir = Path(output_dir)
    if not output_dir.exists():
        return []

    proposals = []
    for path in output_dir.glob("discovery_*.yaml"):
        try:
            # Extract metadata from filename
            stem = path.stem  # e.g., "discovery_20250317-143000_ab12cd34"
            timestamp_str = stem.split("_", 2)[1]  # "20250317-143000"
            timestamp = datetime.strptime(timestamp_str, "%Y%m%d-%H%M%S").replace(
                tzinfo=timezone.utc
            )

            proposals.append(
                {
                    "path": str(path),
                    "filename": path.name,
                    "timestamp": timestamp,
                    "format": "yaml",
                }
            )
        except (ValueError, IndexError):
            logger.warning(f"Could not parse filename: {path.name}")
            continue

    # Also check JSON files
    for path in output_dir.glob("discovery_*.json"):
        try:
            stem = path.stem
            timestamp_str = stem.split("_", 2)[1]
            timestamp = datetime.strptime(timestamp_str, "%Y%m%d-%H%M%S").replace(
                tzinfo=timezone.utc
            )

            proposals.append(
                {
                    "path": str(path),
                    "filename": path.name,
                    "timestamp": timestamp,
                    "format": "json",
                }
            )
        except (ValueError, IndexError):
            logger.warning(f"Could not parse filename: {path.name}")
            continue

    # Sort
    if sort == "newest":
        proposals.sort(key=lambda x: str(x["timestamp"]), reverse=True)
    elif sort == "oldest":
        proposals.sort(key=lambda x: str(x["timestamp"]))
    elif sort == "name":
        proposals.sort(key=lambda x: str(x["filename"]))

    return proposals

export_summary(report, output_path, format='markdown')

Export a human-readable summary of the discovery report.

Parameters:

Name Type Description Default
report NovelClassDiscoveryReport

Discovery report to export

required
output_path str | Path

Path to save summary

required
format str

Output format ('markdown' or 'text')

'markdown'
Source code in src/novelentitymatcher/novelty/storage/persistence.py
def export_summary(
    report: NovelClassDiscoveryReport,
    output_path: str | Path,
    format: str = "markdown",
) -> None:
    """
    Export a human-readable summary of the discovery report.

    Args:
        report: Discovery report to export
        output_path: Path to save summary
        format: Output format ('markdown' or 'text')

    Note:
        NOTE(review): ``format`` is accepted but never read -- the body
        always emits markdown regardless of its value. Confirm whether a
        plain-text mode was intended.
    """
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Build summary
    # -- Header: report identity plus high-level overview counts. --
    lines = [
        "# Novel Class Discovery Report",
        "",
        f"**Discovery ID:** {report.discovery_id}",
        f"**Timestamp:** {report.timestamp.strftime('%Y-%m-%d %H:%M:%S')}",
        "",
        "## Overview",
        "",
        f"- **Novel Samples Detected:** {len(report.novel_sample_report.novel_samples)}",
        f"- **Detection Strategies:** {', '.join(report.novel_sample_report.detection_strategies)}",
        f"- **Discovery Clusters:** {len(report.discovery_clusters)}",
        "",
    ]

    # -- Proposed classes section (only present when proposals exist). --
    if report.class_proposals:
        lines.extend(
            [
                "## Proposed Classes",
                "",
                f"**Number of Proposals:** {len(report.class_proposals.proposed_classes)}",
                f"**LLM Model Used:** {report.class_proposals.model_used}",
                "",
            ]
        )
        # NOTE(review): the review-lifecycle section is nested under the
        # class-proposals check, so review records are silently omitted
        # when no proposals exist -- confirm this nesting is intentional.
        if report.review_records:
            # Tally review records by lifecycle state.
            pending = sum(
                1
                for record in report.review_records
                if record.state == "pending_review"
            )
            approved = sum(
                1 for record in report.review_records if record.state == "approved"
            )
            promoted = sum(
                1 for record in report.review_records if record.state == "promoted"
            )
            lines.extend(
                [
                    "## Review Lifecycle",
                    "",
                    f"- **Review Records:** {len(report.review_records)}",
                    f"- **Pending Review:** {pending}",
                    f"- **Approved:** {approved}",
                    f"- **Promoted:** {promoted}",
                    "",
                ]
            )

        # One numbered subsection per proposed class.
        for i, proposal in enumerate(report.class_proposals.proposed_classes, 1):
            lines.extend(
                [
                    f"### {i}. {proposal.name}",
                    "",
                    f"**Description:** {proposal.description}",
                    f"**Confidence:** {proposal.confidence:.2%}",
                    f"**Sample Count:** {proposal.sample_count}",
                    "",
                    f"**Justification:** {proposal.justification}",
                    "",
                    "**Example Samples:**",
                ]
            )
            for example in proposal.example_samples:
                lines.append(f"- {example}")
            lines.append("")

        # Clusters the proposal step dismissed as noise, if any.
        if report.class_proposals.rejected_as_noise:
            lines.extend(
                [
                    "## Rejected as Noise",
                    "",
                ]
            )
            for noise in report.class_proposals.rejected_as_noise:
                lines.append(f"- {noise}")
            lines.append("")

    # -- Per-sample detail section, capped at the first 20 samples. --
    lines.extend(
        [
            "## Novel Samples",
            "",
        ]
    )

    for sample in report.novel_sample_report.novel_samples[:20]:  # Limit to 20
        lines.extend(
            [
                f"### Sample {sample.index}",
                "",
                f"**Text:** {sample.text}",
                f"**Predicted Class:** {sample.predicted_class}",
                f"**Confidence:** {sample.confidence:.2%}",
                # Novelty score is optional; show "n/a" when absent.
                f"**Novelty Score:** {sample.novelty_score:.2%}"
                if sample.novelty_score is not None
                else "**Novelty Score:** n/a",
                f"**Signals:** {', '.join([k for k, v in sample.signals.items() if v])}",
                "",
            ]
        )

    if len(report.novel_sample_report.novel_samples) > 20:
        lines.append(
            f"... and {len(report.novel_sample_report.novel_samples) - 20} more samples"
        )

    # Write to file
    content = "\n".join(lines)
    output_path.write_text(content, encoding="utf-8")

    logger.info(f"Exported summary to {output_path}")

novelentitymatcher.novelty.storage.review

Lifecycle-aware review and promotion storage for discovery proposals.

Classes

ProposalReviewManager(storage_path='./proposals/review_records.json')

Persist and update proposal review records for HITL workflows.

Source code in src/novelentitymatcher/novelty/storage/review.py
def __init__(self, storage_path: str | Path = "./proposals/review_records.json"):
    """Create a manager that persists review records at ``storage_path``."""
    self.storage_path = Path(storage_path)
Functions
promote_with_index_update(review_id, matcher)

Promote and automatically update the matcher's entity index.

Parameters:

Name Type Description Default
review_id str

The review record to promote.

required
matcher Any

A NovelEntityMatcher or similar object with entities and optional reindex / fit methods.

required

Returns:

Type Description
PromotionResult

PromotionResult with full details of the promotion.

Source code in src/novelentitymatcher/novelty/storage/review.py
def promote_with_index_update(
    self,
    review_id: str,
    matcher: Any,
) -> PromotionResult:
    """Promote and automatically update the matcher's entity index.

    Args:
        review_id: The review record to promote.
        matcher: A NovelEntityMatcher or similar object with ``entities``
            and optional ``reindex`` / ``fit`` methods.

    Returns:
        PromotionResult with full details of the promotion.
    """
    # Snapshot of the matcher's entities taken before promotion runs.
    entities = list(getattr(matcher, "entities", []))

    def index_updater(new_entities: list[dict[str, Any]]) -> None:
        # NOTE(review): `new_entities` is ignored and the pre-promotion
        # snapshot is written back instead -- if `promote` passes an
        # updated entity list here, those updates are silently dropped.
        # Confirm this is intentional.
        matcher.entities = entities
        # Prefer a dedicated reindex hook when the matcher offers one;
        # otherwise fall back to a full refit if available.
        reindex = getattr(matcher, "reindex", None)
        if callable(reindex):
            reindex()
        else:
            fit = getattr(matcher, "fit", None)
            if callable(fit):
                fit()

    def retrain_callback() -> None:
        # Deliberate no-op: promotion here does not trigger retraining.
        pass

    return self.promote(
        review_id,
        entities=entities,
        index_updater=index_updater,
        retrain_callback=retrain_callback,
    )