Skip to content

Multimodal

uncertainty_flow.multimodal

CrossModalAggregator

Bases: BaseUncertaintyModel

Train per-group models and combine their predictions.

Each feature group is trained independently using the same base model (cloned per group). Predictions are aggregated using the chosen strategy.

Parameters:

Name Type Description Default
feature_groups dict[str, list[str]]

Mapping of group name to list of feature column names.

required
aggregation str

Aggregation strategy - one of "product", "copula", "independent". Note that "copula" is accepted at construction time but is not yet implemented; predict() raises NotImplementedError for it.

'product'
random_state int | None

Random seed (forwarded to cloned models where supported).

None

Examples:

>>> from uncertainty_flow.multimodal import CrossModalAggregator
>>> groups = {"numeric": ["x1", "x2"], "lag": ["lag_1"]}
>>> agg = CrossModalAggregator(feature_groups=groups, aggregation="independent")
Source code in uncertainty_flow/multimodal/aggregator.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
class CrossModalAggregator(BaseUncertaintyModel):
    """Train per-group models and combine their predictions.

    Each feature group is trained independently using the same base model
    (cloned per group). Predictions are aggregated using the chosen strategy.

    Args:
        feature_groups: Mapping of group name to list of feature column names.
        aggregation: Aggregation strategy - one of "product", "copula",
            "independent". Note: "copula" is accepted at construction time
            but not implemented yet; predict() will raise NotImplementedError.
        random_state: Random seed (forwarded to cloned models where supported).

    Examples:
        >>> from uncertainty_flow.multimodal import CrossModalAggregator
        >>> groups = {"numeric": ["x1", "x2"], "lag": ["lag_1"]}
        >>> agg = CrossModalAggregator(feature_groups=groups, aggregation="independent")
    """

    def __init__(
        self,
        feature_groups: dict[str, list[str]],
        aggregation: str = "product",
        random_state: int | None = None,
    ):
        if aggregation not in VALID_AGGREGATIONS:
            raise ConfigurationError(
                f"Invalid aggregation '{aggregation}'. Must be one of {VALID_AGGREGATIONS}"
            )
        if not feature_groups:
            raise ConfigurationError("feature_groups cannot be empty")

        self.feature_groups = feature_groups
        self.aggregation = aggregation
        self.random_state = random_state

        # Internal state; populated by fit().
        self._fitted = False
        self._group_models: dict = {}
        self._quantile_levels: list[float] | None = None
        self._target_name: str = ""

    # ------------------------------------------------------------------
    # Fit
    # ------------------------------------------------------------------

    def fit(
        self,
        data: PolarsInput,
        target: TargetSpec | None = None,
        *,
        base_model=None,
        **kwargs,
    ) -> "CrossModalAggregator":
        """Fit a cloned base model for each feature group.

        Args:
            data: Polars DataFrame or LazyFrame with features and target.
            target: Target column name.
            base_model: An sklearn-compatible estimator (e.g. ConformalRegressor)
                to clone for each group. Required.
            **kwargs: Forwarded to each group model's fit().

        Returns:
            self

        Raises:
            ConfigurationError: If base_model is not provided or target is
                missing.
        """
        # Validate cheap preconditions before materializing a possibly
        # lazy frame.
        if base_model is None:
            raise ConfigurationError("base_model is required for CrossModalAggregator.fit()")
        if target is None:
            raise ConfigurationError("target is required for CrossModalAggregator.fit()")

        data = materialize_lazyframe(data)

        target_str = target if isinstance(target, str) else target[0]
        self._target_name = target_str

        # Reset state so repeated fit() calls do not retain stale models
        # or quantile levels from a previous base model.
        self._group_models = {}
        self._quantile_levels = None

        for group_name, feature_cols in self.feature_groups.items():
            model_clone = copy.deepcopy(base_model)
            group_data = data.select(feature_cols + [target_str])
            model_clone.fit(group_data, target=target_str, **kwargs)
            self._group_models[group_name] = model_clone

            # Capture quantile levels from the first group's prediction
            if self._quantile_levels is None:
                sample_pred = model_clone.predict(group_data.head(2).select(feature_cols))
                self._quantile_levels = list(sample_pred._levels)

        self._fitted = True
        return self

    # ------------------------------------------------------------------
    # Predict
    # ------------------------------------------------------------------

    def predict(self, data: PolarsInput) -> DistributionPrediction:
        """Generate aggregated predictions across all feature groups.

        Args:
            data: Polars DataFrame or LazyFrame with features.

        Returns:
            DistributionPrediction with group_predictions populated.

        Raises:
            ModelNotFittedError: If called before fit().
        """
        if not self._fitted:
            raise ModelNotFittedError("CrossModalAggregator")

        if self._quantile_levels is None:
            raise RuntimeError("Internal error: quantile levels not set during fitting")

        data = materialize_lazyframe(data)

        group_preds: dict[str, DistributionPrediction] = {}
        for group_name, feature_cols in self.feature_groups.items():
            model = self._group_models[group_name]
            group_preds[group_name] = model.predict(data.select(feature_cols))

        aggregated = self._aggregate(group_preds)

        return DistributionPrediction(
            quantile_matrix=aggregated,
            quantile_levels=self._quantile_levels,
            target_names=[self._target_name],
            group_predictions=group_preds,
        )

    # ------------------------------------------------------------------
    # Aggregation strategies
    # ------------------------------------------------------------------

    def _aggregate(self, group_preds: dict[str, DistributionPrediction]) -> np.ndarray:
        """Dispatch to the chosen aggregation strategy."""
        if self.aggregation == "product":
            return self._aggregate_product(group_preds)
        if self.aggregation == "copula":
            raise NotImplementedError(
                "aggregation='copula' is not implemented yet for CrossModalAggregator. "
                "Use aggregation='product' or 'independent'."
            )
        # Only remaining valid option (validated in __init__): "independent".
        return self._aggregate_independent(group_preds)

    @staticmethod
    def _aggregate_product(group_preds: dict[str, DistributionPrediction]) -> np.ndarray:
        """Product aggregation: average medians + average deviations, then sort."""
        matrices = [p._quantiles for p in group_preds.values()]
        n_samples, n_quantiles = matrices[0].shape

        # Index of the quantile level closest to the median (0.5).
        levels = np.asarray(next(iter(group_preds.values()))._levels)
        median_idx = int(np.argmin(np.abs(levels - 0.5)))

        medians = np.column_stack([m[:, median_idx] for m in matrices])
        avg_median = np.mean(medians, axis=1)

        # Average deviations from each group's median
        deviation_sum = np.zeros((n_samples, n_quantiles))
        for m in matrices:
            deviation_sum += m - m[:, median_idx : median_idx + 1]
        avg_deviations = deviation_sum / len(matrices)

        result = avg_median[:, np.newaxis] + avg_deviations

        # Sort each row to ensure non-crossing quantiles
        return np.sort(result, axis=1)

    @staticmethod
    def _aggregate_independent(group_preds: dict[str, DistributionPrediction]) -> np.ndarray:
        """Independent aggregation: simple average of quantile matrices."""
        matrices = [p._quantiles for p in group_preds.values()]
        # Average across groups, then sort rows to keep quantiles non-crossing.
        return np.sort(np.mean(matrices, axis=0), axis=1)

fit(data, target=None, *, base_model=None, **kwargs)

Fit a cloned base model for each feature group.

Parameters:

Name Type Description Default
data PolarsInput

Polars DataFrame or LazyFrame with features and target.

required
target TargetSpec | None

Target column name.

None
base_model

An sklearn-compatible estimator (e.g. ConformalRegressor) to clone for each group. Required.

None
**kwargs

Forwarded to each group model's fit().

{}

Returns:

Type Description
CrossModalAggregator

self

Raises:

Type Description
ConfigurationError

If base_model is not provided or target is missing.

Source code in uncertainty_flow/multimodal/aggregator.py
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def fit(
    self,
    data: PolarsInput,
    target: TargetSpec | None = None,
    *,
    base_model=None,
    **kwargs,
) -> "CrossModalAggregator":
    """Fit a cloned base model for each feature group.

    Args:
        data: Polars DataFrame or LazyFrame with features and target.
        target: Target column name.
        base_model: An sklearn-compatible estimator (e.g. ConformalRegressor)
            to clone for each group. Required.
        **kwargs: Forwarded to each group model's fit().

    Returns:
        self

    Raises:
        ConfigurationError: If base_model is not provided or target is
            missing.
    """
    # Validate cheap preconditions before materializing a possibly lazy frame.
    if base_model is None:
        raise ConfigurationError("base_model is required for CrossModalAggregator.fit()")
    if target is None:
        raise ConfigurationError("target is required for CrossModalAggregator.fit()")

    data = materialize_lazyframe(data)

    target_str = target if isinstance(target, str) else target[0]
    self._target_name = target_str

    # Reset state so repeated fit() calls do not retain stale models
    # or quantile levels from a previous base model.
    self._group_models = {}
    self._quantile_levels = None

    for group_name, feature_cols in self.feature_groups.items():
        model_clone = copy.deepcopy(base_model)
        group_data = data.select(feature_cols + [target_str])
        model_clone.fit(group_data, target=target_str, **kwargs)
        self._group_models[group_name] = model_clone

        # Capture quantile levels from the first group's prediction
        if self._quantile_levels is None:
            sample_pred = model_clone.predict(group_data.head(2).select(feature_cols))
            self._quantile_levels = list(sample_pred._levels)

    self._fitted = True
    return self

predict(data)

Generate aggregated predictions across all feature groups.

Parameters:

Name Type Description Default
data PolarsInput

Polars DataFrame or LazyFrame with features.

required

Returns:

Type Description
DistributionPrediction

DistributionPrediction with group_predictions populated.

Raises:

Type Description
ModelNotFittedError

If called before fit().

Source code in uncertainty_flow/multimodal/aggregator.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def predict(self, data: PolarsInput) -> DistributionPrediction:
    """Generate aggregated predictions across all feature groups.

    Args:
        data: Polars DataFrame or LazyFrame with features.

    Returns:
        DistributionPrediction with group_predictions populated.

    Raises:
        ModelNotFittedError: If called before fit().
    """
    # Guard clauses: refuse to predict on an unfitted or broken state.
    if not self._fitted:
        raise ModelNotFittedError("CrossModalAggregator")
    if self._quantile_levels is None:
        raise RuntimeError("Internal error: quantile levels not set during fitting")

    frame = materialize_lazyframe(data)

    # Run each group's model on its own feature subset.
    per_group: dict[str, DistributionPrediction] = {}
    for name, cols in self.feature_groups.items():
        per_group[name] = self._group_models[name].predict(frame.select(cols))

    return DistributionPrediction(
        quantile_matrix=self._aggregate(per_group),
        quantile_levels=self._quantile_levels,
        target_names=[self._target_name],
        group_predictions=per_group,
    )