Source code for bnl.ops

"""
This module provides the core algorithmic operations for transforming

The core components are:

- A generic `Strategy` base class that provides a registry pattern.
- Three specialized abstract strategy classes that inherit from `Strategy`:

  - `SalienceStrategy`: For calculating boundary importance.
  - `CleanStrategy`: For refining boundary contours.
  - `LevelStrategy`: For converting continuous salience into discrete levels.

- Concrete implementations for each strategy type, which can be extended.
"""

from __future__ import annotations

__all__ = [
    "SalienceStrategy",
    "SalByCount",
    "SalByDepth",
    "SalByProb",
    "CleanStrategy",
    "CleanByAbsorb",
    "CleanByKDE",
    "LevelStrategy",
    "LevelByUniqueSal",
    "LevelByMeanShift",
]

from abc import ABC, abstractmethod
from collections import Counter, defaultdict
from collections.abc import Callable
from typing import Any

import numpy as np
import scipy.signal
from mir_eval.hierarchy import _round
from sklearn.cluster import MeanShift
from sklearn.neighbors import KernelDensity

from .core import (
    BoundaryContour,
    BoundaryHierarchy,
    LeveledBoundary,
    MultiSegment,
    RatedBoundary,
    TimeSpan,
)

# region: Base Strategy Pattern


class Strategy(ABC):
    """Abstract base class for all strategy patterns in BNL."""

    # This is a placeholder; each subclass should have its own registry.
    _registry: dict[str, type[Strategy]] = {}

    @classmethod
    def register(cls, name: str) -> Callable[[type[Strategy]], type[Strategy]]:
        """A class method to register strategies in a central registry."""

        def decorator(strategy_cls: type[Strategy]) -> type[Strategy]:
            cls._registry[name] = strategy_cls
            return strategy_cls

        return decorator

    @abstractmethod
    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        raise NotImplementedError  # pragma: no cover


# endregion: Base Strategy Pattern


# region: Different Notions of Boundary Salience


[docs] class SalienceStrategy(Strategy): """Abstract base class for salience calculation strategies.""" _registry: dict[str, type[SalienceStrategy]] = {}
[docs] @abstractmethod def __call__(self, ms: MultiSegment) -> BoundaryContour: raise NotImplementedError # pragma: no cover
[docs] @SalienceStrategy.register("count") class SalByCount(SalienceStrategy): """Salience based on frequency of occurrence."""
[docs] def __call__(self, ms: MultiSegment) -> BoundaryHierarchy: """ The salience of each unique boundary time is the number of layers in the `MultiSegment` that it appears in. """ time_counts: Counter[float] = Counter(b.time for layer in ms.layers for b in layer.bs) return BoundaryHierarchy( bs=sorted( [LeveledBoundary(time=time, level=count) for time, count in time_counts.items()] ), name=ms.name or "Salience Hierarchy", )
[docs] @SalienceStrategy.register("depth") class SalByDepth(SalienceStrategy): """Salience based on the coarsest layer of appearance."""
[docs] def __call__(self, ms: MultiSegment) -> BoundaryHierarchy: """ Iterate from finest (last) to coarsest (first) layer. The salience is the layer's rank, starting from 1 for the finest layer. This ensures that if a boundary time exists in multiple layers, the one from the coarsest layer (with highest salience) is kept. """ boundary_map: dict[float, LeveledBoundary] = {} for salience, layer in enumerate(reversed(ms.layers), start=1): for boundary in layer.bs: boundary_map[boundary.time] = LeveledBoundary(time=boundary.time, level=salience) return BoundaryHierarchy( bs=sorted(list(boundary_map.values())), name=ms.name or "Salience Hierarchy" )
[docs] @SalienceStrategy.register("prob") class SalByProb(SalienceStrategy): """Salience weighted by layer density."""
[docs] def __call__(self, ms: MultiSegment) -> BoundaryContour: """ In layers with less boundaries, they are more important. This is because they are intrinsically in a higher level and more salient. """ time_saliences: defaultdict[float, float] = defaultdict(float) for layer in ms.layers: if len(layer.bs) > 2: # Weight is inversely proportional to the number of effective boundaries. weight = 1.0 / len(layer.bs[1:-1]) for boundary in layer.bs: time_saliences[boundary.time] += weight return BoundaryContour( name=ms.name or "Salience Contour", bs=sorted([RatedBoundary(t, s) for t, s in time_saliences.items()]), )
# endregion: Different Notions of Boundary Salience # region: Two ways to clean up boundaries closeby in time
[docs] class CleanStrategy(Strategy): """Abstract base class for boundary cleaning strategies.""" _registry: dict[str, type[CleanStrategy]] = {}
[docs] @abstractmethod def __call__(self, bc: BoundaryContour) -> BoundaryContour: """Cleans boundaries in a BoundaryContour.""" raise NotImplementedError # pragma: no cover
[docs] @CleanStrategy.register("absorb") class CleanByAbsorb(CleanStrategy): """Clean boundaries by absorbing less salient ones within a window.""" def __init__(self, window: float = 1.0) -> None: self.window = window
[docs] def __call__(self, bc: BoundaryContour) -> BoundaryContour: if len(bc.bs) <= 2: return bc inner_boundaries = sorted(bc.bs[1:-1], key=lambda b: b.salience, reverse=True) kept_boundaries = [bc.start, bc.end] for new_b in inner_boundaries: is_absorbed = any( abs(new_b.time - kept_b.time) <= self.window for kept_b in kept_boundaries ) if not is_absorbed: kept_boundaries.append(new_b) boundaries = [RatedBoundary(b.time, b.salience) for b in sorted(kept_boundaries)] return BoundaryContour(name=bc.name or "Cleaned Contour", bs=boundaries)
[docs] @CleanStrategy.register("kde") class CleanByKDE(CleanStrategy): """Clean boundaries by finding peaks in a weighted kernel density estimate.""" def __init__(self, bw: float = 1.0): self.time_kde = KernelDensity(kernel="gaussian", bandwidth=bw) def _build_time_grid(self, span: TimeSpan, frame_size: float = 0.1) -> np.ndarray: """ Build a grid of times using the same logic as mir_eval to build the ticks """ # Figure out how many frames we need by using `mir_eval`'s exact frame finding logic. n_frames = int( (_round(span.end.time, frame_size) - _round(span.start.time, frame_size)) / frame_size ) return np.arange(n_frames + 1) * frame_size + span.start.time
[docs] def __call__(self, bc: BoundaryContour) -> BoundaryContour: if len(bc.bs) < 4: # if only 3 boundaries (1 start, 1 end, 1 inner), just return return bc inner_boundaries = bc.bs[1:-1] times = np.array([b.time for b in inner_boundaries]) saliences = np.array([b.salience for b in inner_boundaries]) self.time_kde.fit(times.reshape(-1, 1), sample_weight=saliences) grid_times = self._build_time_grid(bc, frame_size=0.1) log_density = self.time_kde.score_samples(grid_times.reshape(-1, 1)) peak_indices = scipy.signal.find_peaks(log_density)[0] peak_times = grid_times.flatten()[peak_indices] peak_saliences = np.exp(log_density[peak_indices]) max_salience = np.max(peak_saliences) if peak_saliences.size > 0 else 1 new_inner_boundaries = [ RatedBoundary(t, s) for t, s in zip(peak_times, peak_saliences, strict=True) ] final_boundaries = [ RatedBoundary(bc.start.time, max_salience), *new_inner_boundaries, RatedBoundary(bc.end.time, max_salience), ] return BoundaryContour(name=bc.name or "Cleaned Contour", bs=sorted(final_boundaries))
# endregion: Two ways to clean up boundaries closeby in time # region: Stratification into levels
[docs] class LevelStrategy(Strategy): """Abstract base class for boundary level quantizing strategies.""" _registry: dict[str, type[LevelStrategy]] = {}
[docs] @abstractmethod def __call__(self, bc: BoundaryContour) -> BoundaryHierarchy: """Quantize boundaries' levels in a BoundaryContour.""" raise NotImplementedError # pragma: no cover
[docs] @LevelStrategy.register("unique") class LevelByUniqueSal(LevelStrategy): """ Find all distinct salience values and use their integer rank as level. """
[docs] def __call__(self, bc: BoundaryContour) -> BoundaryHierarchy: # Create a mapping from each unique salience value to its rank (level) unique_saliences = sorted({b.salience for b in bc.bs[1:-1]}) max_level = len(unique_saliences) if unique_saliences else 1 sal_level = {sal: lvl for lvl, sal in enumerate(unique_saliences, start=1)} # Create LeveledBoundary objects for each boundary in the contour inner_boundaries = [ LeveledBoundary(time=b.time, level=sal_level[b.salience]) for b in bc.bs[1:-1] ] return BoundaryHierarchy( bs=[ LeveledBoundary(time=bc.bs[0].time, level=max_level), *inner_boundaries, LeveledBoundary(time=bc.bs[-1].time, level=max_level), ], name=bc.name or "Unique Salience Hierarchy", )
[docs] @LevelStrategy.register("mean_shift") class LevelByMeanShift(LevelStrategy): """ Use mean shift clustering to find peaks in the salience values and clusters them into levels. """ def __init__(self, bw: float = 0.05): self.sal_ms = MeanShift(bandwidth=bw)
[docs] def __call__(self, bc: BoundaryContour) -> BoundaryHierarchy: if len(bc.bs) < 4: # if only 3 boundaries (1 start, 1 end, 1 inner), just return return bc.level(strategy="unique") inner_boundaries = bc.bs[1:-1] saliences = np.array([b.salience for b in inner_boundaries]) self.sal_ms.fit(saliences.reshape(-1, 1)) quantized_salience = self.sal_ms.cluster_centers_.flatten()[self.sal_ms.labels_] inner_boundaries = [ RatedBoundary(time=b.time, salience=s) for b, s in zip(inner_boundaries, quantized_salience) ] quantized_bc = BoundaryContour(bs=[bc.start, *inner_boundaries, bc.end], name=bc.name) return quantized_bc.level(strategy="unique")
# endregion: Stratification into levels