Skip to content

Wrappers

uncertainty_flow.wrappers

Wrappers for adding uncertainty quantification to sklearn models.

AdaptiveConformalForecaster

Bases: BaseUncertaintyModel

Adaptive Conformal Inference wrapper for sequential prediction.

Wraps a fitted BaseUncertaintyModel and adjusts prediction interval width dynamically. After each observation, call :meth:update (or :meth:update_batch) to adapt the coverage level.

The adaptive rule (Gibbs & Candes 2021):

alpha_{t+1} = alpha_t + gamma * (alpha - 1(|y_t - yhat_t| > q_{1-alpha_t})), where alpha is the fixed target miscoverage level

If recent coverage is too low (errors exceed intervals), alpha shrinks, widening intervals. If coverage is too high, alpha grows, narrowing intervals.

Examples:

>>> from sklearn.ensemble import GradientBoostingRegressor
>>> from uncertainty_flow.wrappers import ConformalRegressor
>>> from uncertainty_flow.wrappers import AdaptiveConformalForecaster
>>>
>>> base = ConformalRegressor(GradientBoostingRegressor())
>>> base.fit(df_train, target="y")
>>> aci = AdaptiveConformalForecaster(model=base)
>>> aci.fit(df_calib, target="y")
>>>
>>> for t in range(n_steps):
...     pred = aci.predict(df.iloc[t:t+1])
...     aci.update(y_true[t])
Source code in uncertainty_flow/wrappers/adaptive_conformal.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
class AdaptiveConformalForecaster(BaseUncertaintyModel):
    """
    Adaptive Conformal Inference (ACI) wrapper for sequential prediction.

    Wraps a fitted ``BaseUncertaintyModel`` and adjusts prediction interval
    width dynamically. After each observation, call :meth:`update` (or
    :meth:`update_batch`) to adapt the coverage level.

    The adaptive rule (Gibbs & Candes 2021), where ``alpha`` is the fixed
    target miscoverage level supplied at construction:

        alpha_{t+1} = alpha_t + gamma * (alpha - 1(|y_t - yhat_t| > q_{1-alpha_t}))

    If recent coverage is too low (errors exceed intervals), alpha shrinks,
    widening intervals. If coverage is too high, alpha grows, narrowing
    intervals.

    Examples:
        >>> from sklearn.ensemble import GradientBoostingRegressor
        >>> from uncertainty_flow.wrappers import ConformalRegressor
        >>> from uncertainty_flow.wrappers import AdaptiveConformalForecaster
        >>>
        >>> base = ConformalRegressor(GradientBoostingRegressor())
        >>> base.fit(df_train, target="y")
        >>> aci = AdaptiveConformalForecaster(model=base)
        >>> aci.fit(df_calib, target="y")
        >>>
        >>> for t in range(n_steps):
        ...     pred = aci.predict(df.iloc[t:t+1])
        ...     aci.update(y_true[t])
    """

    def __init__(
        self,
        model: BaseUncertaintyModel,
        alpha: float = 0.1,
        gamma: float = 0.01,
    ):
        """
        Args:
            model: A fitted BaseUncertaintyModel.
            alpha: Initial and target miscoverage level (default 0.1 → 90% coverage).
            gamma: Learning rate for alpha adaptation (default 0.01).

        Raises:
            ValueError: If ``alpha`` is outside (0, 1) or ``gamma`` is not positive.
        """
        if not (0 < alpha < 1):
            raise ValueError(f"alpha must be in (0, 1), got {alpha}")
        if gamma <= 0:
            raise ValueError(f"gamma must be positive, got {gamma}")

        self.model = model
        self._initial_alpha = alpha
        self.gamma = gamma

        self._fitted = False
        self._alpha_t = alpha
        # Nonconformity score pool, seeded in fit() and grown by update()
        self._scores: list[float] = []
        self._feature_cols: list[str] = []
        self._target_col: str = ""
        # Stored from the last predict() call for use in update()
        self._last_point_pred: np.ndarray | None = None
        self._last_q_value: float | None = None
        self._last_n_targets: int = 1

    @property
    def current_alpha(self) -> float:
        """Current adaptive miscoverage level."""
        return self._alpha_t

    def fit(
        self,
        data: PolarsInput,
        target: TargetSpec | None = None,
        **kwargs,
    ) -> AdaptiveConformalForecaster:
        """
        Initialize ACI with calibration conformal scores.

        Computes absolute residuals of the wrapped model's median prediction
        on the calibration set; these seed the conformal score pool.

        Args:
            data: Calibration dataset for computing initial nonconformity scores.
            target: Target column name.

        Returns:
            self

        Raises:
            ConfigurationError: If ``target`` is not provided.
        """
        data = materialize_lazyframe(data)

        if target is None:
            from ..utils.exceptions import ConfigurationError

            raise ConfigurationError("target is required for AdaptiveConformalForecaster")
        target_str = target if isinstance(target, str) else target[0]
        self._target_col = target_str

        self._feature_cols = [c for c in data.columns if c != target_str]

        pred = self.model.predict(data)
        y_true = to_numpy_series(data[target_str])

        # Extract point predictions (median). The return shape of median()
        # differs between single- and multi-target predictions.
        if len(pred._targets) == 1:
            median_vals = pred.median()
            if isinstance(median_vals, pl.DataFrame):
                point_preds = median_vals.to_numpy().ravel()
            else:
                point_preds = median_vals.to_numpy()
        else:
            median_df = pred.median()
            point_preds = median_df[target_str].to_numpy()

        residuals = np.abs(y_true - point_preds)
        self._scores = residuals.tolist()
        self._alpha_t = self._initial_alpha
        self._fitted = True
        return self

    def predict(
        self,
        data: PolarsInput,
        steps: int = 1,
    ) -> DistributionPrediction:
        """
        Generate adaptive prediction intervals.

        Args:
            data: Input data for prediction.
            steps: Number of steps ahead (propagates alpha adjustment for
                multi-step forecasts). Default 1.

        Returns:
            DistributionPrediction with intervals reflecting current alpha_t.

        Raises:
            ModelNotFittedError: If :meth:`fit` has not been called.
        """
        if not self._fitted:
            raise ModelNotFittedError("AdaptiveConformalForecaster")

        data = materialize_lazyframe(data)
        pred = self.model.predict(data)

        n_pred = pred._n_samples
        n_quantiles = pred._n_quantiles
        n_targets = len(pred._targets)

        alphas = self._propagate_alpha(steps)
        alpha = alphas[-1]

        # Conformal quantile of the stored scores at the current adaptive
        # level. alpha is clamped to (1e-6, 1 - 1e-6) in update(), so
        # 1 - alpha is always a valid quantile level.
        score_arr = np.array(self._scores)
        if len(score_arr) > 0:
            q_value = np.quantile(score_arr, min(1 - alpha, 1.0))
        else:
            q_value = 0.0

        # Scale factors: map quantile levels to conformal interval bounds.
        # The outermost levels should map to +/- q_value around the median.
        lower_scale = max(0.5 - pred._levels[0], 1e-12)
        upper_scale = max(pred._levels[-1] - 0.5, 1e-12)

        # Loop-invariant: the median column index within a target's slice.
        median_idx = pred._find_nearest_quantile_index(0.5)

        if n_targets == 1:
            point_preds = pred._quantiles[:, median_idx].copy()

            output_quantiles = np.empty((n_pred, n_quantiles))
            for j in range(n_quantiles):
                level = pred._levels[j]
                if level < 0.5:
                    output_quantiles[:, j] = point_preds - q_value * (0.5 - level) / lower_scale
                elif level > 0.5:
                    output_quantiles[:, j] = point_preds + q_value * (level - 0.5) / upper_scale
                else:
                    output_quantiles[:, j] = point_preds
        else:
            output_quantiles = pred._quantiles.copy()
            for t_idx in range(n_targets):
                q_start = t_idx * n_quantiles
                point_preds = pred._quantiles[:, q_start + median_idx].copy()

                for j in range(n_quantiles):
                    col_idx = q_start + j
                    level = pred._levels[j]
                    if level < 0.5:
                        output_quantiles[:, col_idx] = (
                            point_preds - q_value * (0.5 - level) / lower_scale
                        )
                    elif level > 0.5:
                        output_quantiles[:, col_idx] = (
                            point_preds + q_value * (level - 0.5) / upper_scale
                        )
                    else:
                        output_quantiles[:, col_idx] = point_preds

        # Enforce non-crossing quantiles WITHIN each target's column slice.
        # A single sort across the whole row would interleave quantiles of
        # different targets in the multivariate case.
        for t_idx in range(n_targets):
            sl = slice(t_idx * n_quantiles, (t_idx + 1) * n_quantiles)
            output_quantiles[:, sl] = np.sort(output_quantiles[:, sl], axis=1)

        # Store the last prediction's point estimates and conformal quantile
        # for use in the subsequent update() call.
        if n_pred > 0:
            self._last_n_targets = n_targets
            if n_targets == 1:
                self._last_point_pred = np.array([float(point_preds[-1])])
            else:
                last_point_preds = np.empty(n_targets)
                for t_idx in range(n_targets):
                    q_start = t_idx * n_quantiles
                    last_point_preds[t_idx] = float(output_quantiles[-1, q_start + median_idx])
                self._last_point_pred = last_point_preds
            self._last_q_value = float(q_value)

        return DistributionPrediction(
            quantile_matrix=output_quantiles,
            quantile_levels=pred._levels.tolist(),
            target_names=list(pred._targets),
        )

    def update(self, y_true: float | int | np.ndarray) -> None:
        """
        Update alpha after observing a true value.

        Must be called after :meth:`predict` with the corresponding true
        observation. Updates internal conformal scores and adapts alpha.

        For univariate models, pass a scalar. For multivariate, pass an
        array-like with one value per target.

        Args:
            y_true: Observed true value (scalar for univariate, array for
                multivariate).

        Raises:
            ModelNotFittedError: If :meth:`fit` has not been called.
            RuntimeError: If called before any :meth:`predict`.
            ValueError: If ``y_true`` has the wrong number of values.
        """
        if not self._fitted:
            raise ModelNotFittedError("AdaptiveConformalForecaster")

        if self._last_point_pred is None:
            raise RuntimeError(
                "update() called before predict(). "
                "Call predict() first to generate a prediction for this time step."
            )

        y_arr = np.atleast_1d(np.asarray(y_true, dtype=float))

        if y_arr.shape[0] != self._last_n_targets:
            raise ValueError(
                f"y_true has {y_arr.shape[0]} value(s) but model has "
                f"{self._last_n_targets} target(s)."
            )

        if self._last_n_targets == 1:
            new_score = abs(float(y_arr[0]) - self._last_point_pred[0])
        else:
            # Multivariate: aggregate per-target errors into a single score.
            new_score = float(np.mean(np.abs(y_arr - self._last_point_pred)))

        q_value = self._last_q_value if self._last_q_value is not None else 0.0
        exceeded = 1.0 if new_score > q_value else 0.0

        # ACI rule (Gibbs & Candes 2021): step relative to the FIXED target
        # miscoverage level, not the current alpha_t — anchoring on alpha_t
        # would make alpha drift multiplicatively even under perfect coverage.
        self._alpha_t = self._alpha_t + self.gamma * (self._initial_alpha - exceeded)
        # Clamp so 1 - alpha stays a valid quantile level in predict().
        self._alpha_t = max(1e-6, min(self._alpha_t, 1.0 - 1e-6))

        self._scores.append(new_score)

    def update_batch(self, y_true: pl.Series | np.ndarray) -> None:
        """
        Sequentially update alpha for a batch of observations.

        Calls :meth:`update` for each observation in order. This is
        equivalent to calling ``update()`` in a loop but more convenient
        for offline replay and testing.

        .. note::
            This method assumes you have already called ``predict()`` for
            each corresponding observation and are now providing the true
            values in sequence. It does NOT call ``predict()`` internally.

        Args:
            y_true: Array or Series of observed true values.
                Shape ``(n,)`` for univariate, ``(n, n_targets)`` for
                multivariate.
        """
        if isinstance(y_true, pl.Series):
            y_arr = to_numpy_series(y_true)
        else:
            y_arr = np.asarray(y_true, dtype=float)

        if y_arr.ndim == 1:
            for y in y_arr:
                self.update(float(y))
        else:
            for row in y_arr:
                self.update(row)

    def _propagate_alpha(self, steps: int) -> list[float]:
        """Project alpha forward for multi-step prediction (no actual update).

        Without observing future values, alpha cannot be adapted. This method
        returns a constant projection (current alpha repeated), which is a
        conservative default.
        """
        return [self._alpha_t] * steps

current_alpha property

Current adaptive miscoverage level.

__init__(model, alpha=0.1, gamma=0.01)

Parameters:

Name Type Description Default
model BaseUncertaintyModel

A fitted BaseUncertaintyModel.

required
alpha float

Initial miscoverage level (default 0.1 → 90% coverage).

0.1
gamma float

Learning rate for alpha adaptation (default 0.01).

0.01
Source code in uncertainty_flow/wrappers/adaptive_conformal.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def __init__(
    self,
    model: BaseUncertaintyModel,
    alpha: float = 0.1,
    gamma: float = 0.01,
):
    """
    Args:
        model: A fitted BaseUncertaintyModel.
        alpha: Initial miscoverage level (default 0.1 → 90% coverage).
        gamma: Learning rate for alpha adaptation (default 0.01).
    """
    # Validate hyperparameters up front so a bad configuration fails fast.
    if not (0 < alpha < 1):
        raise ValueError(f"alpha must be in (0, 1), got {alpha}")
    if gamma <= 0:
        raise ValueError(f"gamma must be positive, got {gamma}")

    self.model = model
    self.gamma = gamma
    self._initial_alpha = alpha

    # Adaptive state: starts at the target level, evolves via update().
    self._alpha_t = alpha
    self._fitted = False
    self._scores: list[float] = []

    # Schema captured during fit().
    self._feature_cols: list[str] = []
    self._target_col: str = ""

    # Carried over from the most recent predict() for the next update().
    self._last_point_pred: np.ndarray | None = None
    self._last_q_value: float | None = None
    self._last_n_targets: int = 1

fit(data, target=None, **kwargs)

Initialize ACI with calibration conformal scores.

Parameters:

Name Type Description Default
data PolarsInput

Calibration dataset for computing initial nonconformity scores.

required
target TargetSpec | None

Target column name.

None

Returns:

Type Description
AdaptiveConformalForecaster

self

Source code in uncertainty_flow/wrappers/adaptive_conformal.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def fit(
    self,
    data: PolarsInput,
    target: TargetSpec | None = None,
    **kwargs,
) -> AdaptiveConformalForecaster:
    """
    Initialize ACI with calibration conformal scores.

    Seeds the conformal score pool with absolute residuals of the wrapped
    model's median prediction on the calibration data.

    Args:
        data: Calibration dataset for computing initial nonconformity scores.
        target: Target column name.

    Returns:
        self
    """
    frame = materialize_lazyframe(data)

    if target is None:
        from ..utils.exceptions import ConfigurationError

        raise ConfigurationError("target is required for AdaptiveConformalForecaster")

    name = target if isinstance(target, str) else target[0]
    self._target_col = name
    self._feature_cols = [col for col in frame.columns if col != name]

    prediction = self.model.predict(frame)
    observed = to_numpy_series(frame[name])

    # Point predictions come from the median; the container type returned
    # by median() differs between single- and multi-target predictions.
    if len(prediction._targets) != 1:
        centers = prediction.median()[name].to_numpy()
    else:
        med = prediction.median()
        if isinstance(med, pl.DataFrame):
            centers = med.to_numpy().ravel()
        else:
            centers = med.to_numpy()

    self._scores = np.abs(observed - centers).tolist()
    self._alpha_t = self._initial_alpha
    self._fitted = True
    return self

predict(data, steps=1)

Generate adaptive prediction intervals.

Parameters:

Name Type Description Default
data PolarsInput

Input data for prediction.

required
steps int

Number of steps ahead (propagates alpha adjustment for multi-step forecasts). Default 1.

1

Returns:

Type Description
DistributionPrediction

DistributionPrediction with intervals reflecting current alpha_t.

Source code in uncertainty_flow/wrappers/adaptive_conformal.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
def predict(
    self,
    data: PolarsInput,
    steps: int = 1,
) -> DistributionPrediction:
    """
    Generate adaptive prediction intervals.

    Args:
        data: Input data for prediction.
        steps: Number of steps ahead (propagates alpha adjustment for
            multi-step forecasts). Default 1.

    Returns:
        DistributionPrediction with intervals reflecting current alpha_t.

    Raises:
        ModelNotFittedError: If fit() has not been called.
    """
    if not self._fitted:
        raise ModelNotFittedError("AdaptiveConformalForecaster")

    data = materialize_lazyframe(data)
    pred = self.model.predict(data)

    n_pred = pred._n_samples
    n_quantiles = pred._n_quantiles
    n_targets = len(pred._targets)

    alphas = self._propagate_alpha(steps)
    alpha = alphas[-1]

    # Conformal quantile of stored scores at the current adaptive level.
    score_arr = np.array(self._scores)
    if len(score_arr) > 0:
        q_value = np.quantile(score_arr, min(1 - alpha, 1.0))
    else:
        q_value = 0.0

    # Scale factors: map quantile levels to conformal interval bounds.
    # The outermost levels should map to +/- q_value around the median.
    lower_scale = max(0.5 - pred._levels[0], 1e-12)
    upper_scale = max(pred._levels[-1] - 0.5, 1e-12)

    # Loop-invariant: median column index within a target's quantile slice.
    median_idx = pred._find_nearest_quantile_index(0.5)

    if n_targets == 1:
        point_preds = pred._quantiles[:, median_idx].copy()

        output_quantiles = np.empty((n_pred, n_quantiles))
        for j in range(n_quantiles):
            level = pred._levels[j]
            if level < 0.5:
                output_quantiles[:, j] = point_preds - q_value * (0.5 - level) / lower_scale
            elif level > 0.5:
                output_quantiles[:, j] = point_preds + q_value * (level - 0.5) / upper_scale
            else:
                output_quantiles[:, j] = point_preds
    else:
        output_quantiles = pred._quantiles.copy()
        for t_idx in range(n_targets):
            q_start = t_idx * n_quantiles
            point_preds = pred._quantiles[:, q_start + median_idx].copy()

            for j in range(n_quantiles):
                col_idx = q_start + j
                level = pred._levels[j]
                if level < 0.5:
                    output_quantiles[:, col_idx] = (
                        point_preds - q_value * (0.5 - level) / lower_scale
                    )
                elif level > 0.5:
                    output_quantiles[:, col_idx] = (
                        point_preds + q_value * (level - 0.5) / upper_scale
                    )
                else:
                    output_quantiles[:, col_idx] = point_preds

    # Enforce non-crossing quantiles WITHIN each target's column slice.
    # Sorting the whole row at once would interleave quantiles from
    # different targets in the multivariate case.
    for t_idx in range(n_targets):
        sl = slice(t_idx * n_quantiles, (t_idx + 1) * n_quantiles)
        output_quantiles[:, sl] = np.sort(output_quantiles[:, sl], axis=1)

    # Store the last prediction's point estimates and conformal quantile
    # for use in the subsequent update() call.
    if n_pred > 0:
        self._last_n_targets = n_targets
        if n_targets == 1:
            self._last_point_pred = np.array([float(point_preds[-1])])
        else:
            last_point_preds = np.empty(n_targets)
            for t_idx in range(n_targets):
                q_start = t_idx * n_quantiles
                last_point_preds[t_idx] = float(output_quantiles[-1, q_start + median_idx])
            self._last_point_pred = last_point_preds
        self._last_q_value = float(q_value)

    return DistributionPrediction(
        quantile_matrix=output_quantiles,
        quantile_levels=pred._levels.tolist(),
        target_names=list(pred._targets),
    )

update(y_true)

Update alpha after observing a true value.

Must be called after :meth:predict with the corresponding true observation. Updates internal conformal scores and adapts alpha.

For univariate models, pass a scalar. For multivariate, pass an array-like with one value per target.

Parameters:

Name Type Description Default
y_true float | int | ndarray

Observed true value (scalar for univariate, array for multivariate).

required
Source code in uncertainty_flow/wrappers/adaptive_conformal.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
def update(self, y_true: float | int | np.ndarray) -> None:
    """
    Update alpha after observing a true value.

    Must be called after :meth:`predict` with the corresponding true
    observation. Updates internal conformal scores and adapts alpha.

    For univariate models, pass a scalar. For multivariate, pass an
    array-like with one value per target.

    Args:
        y_true: Observed true value (scalar for univariate, array for
            multivariate).
    """
    if not self._fitted:
        raise ModelNotFittedError("AdaptiveConformalForecaster")

    if self._last_point_pred is None:
        raise RuntimeError(
            "update() called before predict(). "
            "Call predict() first to generate a prediction for this time step."
        )

    y_arr = np.atleast_1d(np.asarray(y_true, dtype=float))

    if y_arr.shape[0] != self._last_n_targets:
        raise ValueError(
            f"y_true has {y_arr.shape[0]} value(s) but model has "
            f"{self._last_n_targets} target(s)."
        )

    if self._last_n_targets == 1:
        new_score = abs(float(y_arr[0]) - self._last_point_pred[0])
    else:
        new_score = float(np.mean(np.abs(y_arr - self._last_point_pred)))

    q_value = self._last_q_value if self._last_q_value is not None else 0.0
    exceeded = 1.0 if new_score > q_value else 0.0

    self._alpha_t = self._alpha_t + self.gamma * (self._alpha_t - exceeded)
    self._alpha_t = max(1e-6, min(self._alpha_t, 1.0 - 1e-6))

    self._scores.append(new_score)

update_batch(y_true)

Sequentially update alpha for a batch of observations.

Calls :meth:update for each observation in order. This is equivalent to calling update() in a loop but more convenient for offline replay and testing.

.. note:: This method assumes you have already called predict() for each corresponding observation and are now providing the true values in sequence. It does NOT call predict() internally.

Parameters:

Name Type Description Default
y_true Series | ndarray

Array or Series of observed true values. Shape (n,) for univariate, (n, n_targets) for multivariate.

required
Source code in uncertainty_flow/wrappers/adaptive_conformal.py
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
def update_batch(self, y_true: pl.Series | np.ndarray) -> None:
    """
    Sequentially update alpha for a batch of observations.

    Calls :meth:`update` once per observation, in order. Equivalent to a
    manual ``update()`` loop, but more convenient for offline replay and
    testing.

    .. note::
        Assumes ``predict()`` has already been called for each
        corresponding observation; the true values are now supplied in
        sequence. This method does NOT call ``predict()`` internally.

    Args:
        y_true: Array or Series of observed true values.
            Shape ``(n,)`` for univariate, ``(n, n_targets)`` for
            multivariate.
    """
    values = (
        to_numpy_series(y_true)
        if isinstance(y_true, pl.Series)
        else np.asarray(y_true, dtype=float)
    )

    if values.ndim != 1:
        # 2-D: one row of target values per time step.
        for observation in values:
            self.update(observation)
        return

    # 1-D: one scalar observation per time step.
    for scalar in values:
        self.update(float(scalar))

ConformalRegressor

Bases: BaseUncertaintyModel

Wrap any scikit-learn regressor with conformal prediction.

Coverage guarantee: ✅ (exchangeability assumption) Non-crossing: ✅ (post-sort)

Examples:

>>> from sklearn.ensemble import GradientBoostingRegressor
>>> from uncertainty_flow.wrappers import ConformalRegressor
>>> import polars as pl
>>>
>>> df = pl.DataFrame({
...     "feature1": [1, 2, 3, 4, 5],
...     "feature2": [2, 4, 6, 8, 10],
...     "target": [1.5, 3.5, 5.5, 7.5, 9.5],
... })
>>> base = GradientBoostingRegressor(random_state=42)
>>> model = ConformalRegressor(base_model=base, random_state=42)
>>> model.fit(df, target="target")
>>> pred = model.predict(df)
>>> pred.interval(0.9)
Source code in uncertainty_flow/wrappers/conformal.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
class ConformalRegressor(BaseUncertaintyModel):
    """
    Wrap any scikit-learn regressor with conformal prediction.

    Implements split conformal prediction: part of the data is held out as
    a calibration set, and quantiles of the calibration residuals are added
    to the base model's point predictions to form quantile forecasts.

    Coverage guarantee: ✅ (exchangeability assumption)
    Non-crossing: ✅ (post-sort)

    Examples:
        >>> from sklearn.ensemble import GradientBoostingRegressor
        >>> from uncertainty_flow.wrappers import ConformalRegressor
        >>> import polars as pl
        >>>
        >>> df = pl.DataFrame({
        ...     "feature1": [1, 2, 3, 4, 5],
        ...     "feature2": [2, 4, 6, 8, 10],
        ...     "target": [1.5, 3.5, 5.5, 7.5, 9.5],
        ... })
        >>> base = GradientBoostingRegressor(random_state=42)
        >>> model = ConformalRegressor(base_model=base, random_state=42)
        >>> model.fit(df, target="target")
        >>> pred = model.predict(df)
        >>> pred.interval(0.9)
    """

    def __init__(
        self,
        base_model: BaseEstimator,
        calibration_method: str = "holdout",
        calibration_size: float = 0.2,
        coverage_target: float = 0.9,
        auto_tune: bool = True,
        uncertainty_features: list[str] | None = None,
        random_state: int | None = None,
    ):
        """
        Initialize ConformalRegressor.

        Args:
            base_model: Any sklearn-compatible regressor
            calibration_method: "holdout" or "cross"
            calibration_size: Fraction of data for calibration (0-1)
            coverage_target: Default coverage level for intervals
            auto_tune: Whether to tune supported hyperparameters before final fit
            uncertainty_features: Optional hint for heteroscedastic features
            random_state: Random seed
        """
        # Configuration. base_model and calibration_size may be replaced by
        # _auto_tune() during fit() when auto_tune is enabled.
        self.base_model = base_model
        self.calibration_method = calibration_method
        self.calibration_size = calibration_size
        self.coverage_target = coverage_target
        self.auto_tune = auto_tune
        self.uncertainty_features = uncertainty_features
        self.random_state = random_state

        # Fitted attributes (populated by fit())
        self._fitted = False
        self._feature_cols_: list[str] = []  # feature columns seen during fit
        self._target_col_: str = ""  # target column name
        self._quantiles_: np.ndarray | None = None  # calibration residual quantiles
        self._quantile_levels_: np.ndarray | None = None  # levels matching _quantiles_
        self._uncertainty_drivers_: pl.DataFrame | None = None  # residual correlations
        self.tuned_params_: dict[str, float | int] = {}  # best params found by _auto_tune

    def _resolve_quantile_levels(self) -> np.ndarray:
        """Return quantile levels used to compute stored residual quantiles.

        Falls back to the current DEFAULT_QUANTILES configuration for models
        fitted before levels were persisted, refusing the fallback when its
        length no longer matches the stored residual quantiles.

        Raises:
            ConfigurationError: If the fallback level count disagrees with
                the fitted residual quantile count.
        """
        if self._quantile_levels_ is not None:
            return self._quantile_levels_

        fallback_levels = np.asarray(list(DEFAULT_QUANTILES), dtype=float)
        if self._quantiles_ is not None and len(fallback_levels) != len(self._quantiles_):
            raise ConfigurationError(
                "Current config quantile count does not match fitted residual quantiles. "
                "Refit the model after setting the desired quantile configuration."
            )
        return fallback_levels

    def _auto_tune(
        self,
        data: pl.DataFrame,
        target: str,
    ) -> None:
        """Tune params using validation splits, with CV averaging when inner splits exist.

        Args:
            data: Full training frame (features + target).
            target: Target column name.

        Side effects:
            Replaces ``self.base_model`` and ``self.calibration_size`` with
            the best candidates found, and records them in ``tuned_params_``.
        """
        eval_splits = build_tune_splits(data, task_type="tabular", random_state=self.random_state)

        best_score = float("inf")
        best_params: dict[str, float | int] = {}
        best_model = clone(self.base_model)

        for base_params in estimator_param_candidates(self.base_model):
            tuned_base = clone(self.base_model)
            if base_params:
                tuned_base.set_params(**base_params)

            for calib_size in valid_calibration_candidates(
                len(eval_splits[0][0]), self.calibration_size, [0.15, 0.2, 0.25, 0.3]
            ):
                split_scores: list[float] = []
                for split_train, split_val in eval_splits:
                    # Evaluate a non-tuning clone of this wrapper to avoid
                    # recursive tuning inside candidate fits.
                    candidate = ConformalRegressor(
                        base_model=tuned_base,
                        calibration_method=self.calibration_method,
                        calibration_size=calib_size,
                        coverage_target=self.coverage_target,
                        auto_tune=False,
                        uncertainty_features=self.uncertainty_features,
                        random_state=self.random_state,
                    )
                    with warnings.catch_warnings():
                        warnings.simplefilter("ignore")
                        candidate.fit(split_train, target=target)
                        pred = candidate.predict(split_val)
                    score = score_distribution_prediction(
                        pred,
                        split_val[target],
                        [target],
                        confidence=self.coverage_target,
                    )
                    split_scores.append(score)
                avg_score = float(np.mean(split_scores))
                # Strict "<" keeps the first candidate on ties (deterministic).
                if avg_score < best_score:
                    best_score = avg_score
                    best_model = clone(tuned_base)
                    best_params = {**base_params, "calibration_size": calib_size}

        self.base_model = best_model
        self.calibration_size = float(best_params.get("calibration_size", self.calibration_size))
        self.tuned_params_ = best_params

    def fit(
        self,
        data: PolarsInput,
        target: TargetSpec | None = None,
        **kwargs,
    ) -> "ConformalRegressor":
        """
        Fit the conformal regressor.

        Trains the base model on one split and computes residual quantiles
        on a held-out calibration split; :meth:`predict` later adds those
        quantiles to point predictions.

        Args:
            data: Polars DataFrame or LazyFrame with features and target
            target: Target column name(s)
            **kwargs: Additional parameters (unused)

        Returns:
            self (for method chaining)

        Raises:
            ConfigurationError: If ``target`` is not provided.
            InvalidDataError: If the target column is missing or no feature
                columns remain after excluding the target.
        """
        # Materialize LazyFrame once at the start
        data = materialize_lazyframe(data)

        # Get feature columns
        if target is None:
            raise ConfigurationError("target is required for ConformalRegressor")
        # A sequence target is reduced to its first entry (single-target model).
        target_str = target if isinstance(target, str) else target[0]
        self._target_col_ = target_str

        if target_str not in data.columns:
            raise InvalidDataError(
                f"Target column '{target_str}' not found in data. "
                f"Available columns: {list(data.columns)}"
            )

        if self.auto_tune:
            self._auto_tune(data, target_str)

        feature_cols = [col for col in data.columns if col != target_str]
        if not feature_cols:
            raise InvalidDataError(
                f"No feature columns remaining after excluding target '{target_str}'. "
                f"Data must have at least one feature column."
            )
        self._feature_cols_ = feature_cols

        # Automatic validation split strategy selection
        plan = select_validation_plan(
            data,
            task_type="tabular",
            random_state=self.random_state,
            holdout_fraction=self.calibration_size,
        )
        train, calib = plan.outer_split

        # Convert to numpy - single collect already done above
        x_train = to_numpy(train, feature_cols)
        y_train = to_numpy_series(train[target_str]).flatten()
        x_calib = to_numpy(calib, feature_cols)
        y_calib = to_numpy_series(calib[target_str]).flatten()

        # Fit base model
        self.base_model.fit(x_train, y_train)

        # Compute conformal quantiles on calibration set
        calib_preds = self.base_model.predict(x_calib)
        residuals = y_calib - calib_preds

        # Store quantiles for prediction; levels and quantiles are always
        # the same length because they are computed together here.
        self._quantile_levels_ = np.asarray(list(DEFAULT_QUANTILES), dtype=float)
        self._quantiles_ = np.quantile(residuals, self._quantile_levels_)

        # Compute uncertainty drivers using calibration features
        calib_features = calib.select(feature_cols)
        self._uncertainty_drivers_ = compute_uncertainty_drivers(residuals, calib_features)

        self._fitted = True
        return self

    def predict(self, data: PolarsInput) -> DistributionPrediction:
        """
        Generate probabilistic predictions.

        Args:
            data: Polars DataFrame or LazyFrame with features

        Returns:
            DistributionPrediction with quantile predictions

        Raises:
            ModelNotFittedError: If called before :meth:`fit`.
        """
        if not self._fitted:
            raise ModelNotFittedError("ConformalRegressor")

        # Materialize LazyFrame if needed
        data = materialize_lazyframe(data)

        # Get predictions
        x = to_numpy(data, self._feature_cols_)
        point_preds = self.base_model.predict(x)

        # Defensive check: _quantiles_ is set in fit() before _fitted flips.
        if self._quantiles_ is None:
            raise ModelNotFittedError("ConformalRegressor")
        quantile_levels = self._resolve_quantile_levels()

        # Each output quantile is the point prediction shifted by the
        # corresponding calibration residual quantile. Broadcasting
        # (n, 1) + (k,) -> (n, k) replaces the per-quantile Python loop.
        quantile_matrix = point_preds[:, np.newaxis] + self._quantiles_[np.newaxis, :]

        return DistributionPrediction(
            quantile_matrix=quantile_matrix,
            quantile_levels=quantile_levels.tolist(),
            target_names=[self._target_col_],
        )

    @property
    def uncertainty_drivers_(self) -> pl.DataFrame | None:
        """Return residual correlation analysis results."""
        return self._uncertainty_drivers_

uncertainty_drivers_ property

Return residual correlation analysis results.

__init__(base_model, calibration_method='holdout', calibration_size=0.2, coverage_target=0.9, auto_tune=True, uncertainty_features=None, random_state=None)

Initialize ConformalRegressor.

Parameters:

Name Type Description Default
base_model BaseEstimator

Any sklearn-compatible regressor

required
calibration_method str

"holdout" or "cross"

'holdout'
calibration_size float

Fraction of data for calibration (0-1)

0.2
coverage_target float

Default coverage level for intervals

0.9
auto_tune bool

Whether to tune supported hyperparameters before final fit

True
uncertainty_features list[str] | None

Optional hint for heteroscedastic features

None
random_state int | None

Random seed

None
Source code in uncertainty_flow/wrappers/conformal.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def __init__(
    self,
    base_model: BaseEstimator,
    calibration_method: str = "holdout",
    calibration_size: float = 0.2,
    coverage_target: float = 0.9,
    auto_tune: bool = True,
    uncertainty_features: list[str] | None = None,
    random_state: int | None = None,
):
    """
    Initialize ConformalRegressor.

    Args:
        base_model: Any sklearn-compatible regressor
        calibration_method: "holdout" or "cross"
        calibration_size: Fraction of data for calibration (0-1)
        coverage_target: Default coverage level for intervals
        auto_tune: Whether to tune supported hyperparameters before final fit
        uncertainty_features: Optional hint for heteroscedastic features
        random_state: Random seed
    """
    # Configuration. base_model and calibration_size may be replaced by
    # _auto_tune() during fit() when auto_tune is enabled.
    self.base_model = base_model
    self.calibration_method = calibration_method
    self.calibration_size = calibration_size
    self.coverage_target = coverage_target
    self.auto_tune = auto_tune
    self.uncertainty_features = uncertainty_features
    self.random_state = random_state

    # Fitted attributes (populated by fit())
    self._fitted = False
    self._feature_cols_: list[str] = []  # feature columns seen during fit
    self._target_col_: str = ""  # target column name
    self._quantiles_: np.ndarray | None = None  # calibration residual quantiles
    self._quantile_levels_: np.ndarray | None = None  # levels matching _quantiles_
    self._uncertainty_drivers_: pl.DataFrame | None = None  # residual correlation analysis
    self.tuned_params_: dict[str, float | int] = {}  # best params found by _auto_tune

fit(data, target=None, **kwargs)

Fit the conformal regressor.

Parameters:

Name Type Description Default
data PolarsInput

Polars DataFrame or LazyFrame with features and target

required
target TargetSpec | None

Target column name(s)

None
**kwargs

Additional parameters (unused)

{}

Returns:

Type Description
ConformalRegressor

self (for method chaining)

Source code in uncertainty_flow/wrappers/conformal.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
def fit(
    self,
    data: PolarsInput,
    target: TargetSpec | None = None,
    **kwargs,
) -> "ConformalRegressor":
    """
    Fit the conformal regressor.

    Trains the base model on one split and computes residual quantiles on
    a held-out calibration split; ``predict`` later adds those quantiles
    to point predictions.

    Args:
        data: Polars DataFrame or LazyFrame with features and target
        target: Target column name(s)
        **kwargs: Additional parameters (unused)

    Returns:
        self (for method chaining)

    Raises:
        ConfigurationError: If ``target`` is not provided.
        InvalidDataError: If the target column is missing or no feature
            columns remain after excluding the target.
    """
    # Materialize LazyFrame once at the start
    data = materialize_lazyframe(data)

    # Get feature columns
    if target is None:
        raise ConfigurationError("target is required for ConformalRegressor")
    # A sequence target is reduced to its first entry (single-target model).
    target_str = target if isinstance(target, str) else target[0]
    self._target_col_ = target_str

    if target_str not in data.columns:
        raise InvalidDataError(
            f"Target column '{target_str}' not found in data. "
            f"Available columns: {list(data.columns)}"
        )

    # May replace self.base_model and self.calibration_size in place.
    if self.auto_tune:
        self._auto_tune(data, target_str)

    feature_cols = [col for col in data.columns if col != target_str]
    if not feature_cols:
        raise InvalidDataError(
            f"No feature columns remaining after excluding target '{target_str}'. "
            f"Data must have at least one feature column."
        )
    self._feature_cols_ = feature_cols

    # Automatic validation split strategy selection
    plan = select_validation_plan(
        data,
        task_type="tabular",
        random_state=self.random_state,
        holdout_fraction=self.calibration_size,
    )
    train, calib = plan.outer_split

    # Convert to numpy - single collect already done above
    x_train = to_numpy(train, feature_cols)
    y_train = to_numpy_series(train[target_str]).flatten()
    x_calib = to_numpy(calib, feature_cols)
    y_calib = to_numpy_series(calib[target_str]).flatten()

    # Fit base model
    self.base_model.fit(x_train, y_train)

    # Compute conformal quantiles on calibration set
    calib_preds = self.base_model.predict(x_calib)
    residuals = y_calib - calib_preds

    # Store quantiles for prediction; levels and quantiles are always the
    # same length because they are computed together here.
    self._quantile_levels_ = np.asarray(list(DEFAULT_QUANTILES), dtype=float)
    self._quantiles_ = np.quantile(residuals, self._quantile_levels_)

    # Compute uncertainty drivers using calibration features
    calib_features = calib.select(feature_cols)
    self._uncertainty_drivers_ = compute_uncertainty_drivers(residuals, calib_features)

    self._fitted = True
    return self

predict(data)

Generate probabilistic predictions.

Parameters:

Name Type Description Default
data PolarsInput

Polars DataFrame or LazyFrame with features

required

Returns:

Type Description
DistributionPrediction

DistributionPrediction with quantile predictions

Source code in uncertainty_flow/wrappers/conformal.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
def predict(self, data: PolarsInput) -> DistributionPrediction:
    """
    Generate probabilistic predictions.

    Args:
        data: Polars DataFrame or LazyFrame with features

    Returns:
        DistributionPrediction with quantile predictions

    Raises:
        ModelNotFittedError: If called before ``fit``.
    """
    if not self._fitted:
        raise ModelNotFittedError("ConformalRegressor")

    # Materialize LazyFrame if needed
    data = materialize_lazyframe(data)

    # Get predictions
    x = to_numpy(data, self._feature_cols_)
    point_preds = self.base_model.predict(x)

    # Defensive check: _quantiles_ is set in fit() before _fitted flips.
    if self._quantiles_ is None:
        raise ModelNotFittedError("ConformalRegressor")
    quantile_levels = self._resolve_quantile_levels()
    # Each column is the point prediction shifted by one calibration
    # residual quantile.
    quantile_matrix = np.zeros((len(point_preds), len(quantile_levels)))
    for i, q in enumerate(self._quantiles_):
        quantile_matrix[:, i] = point_preds + q

    return DistributionPrediction(
        quantile_matrix=quantile_matrix,
        quantile_levels=quantile_levels.tolist(),
        target_names=[self._target_col_],
    )

ConformalClassifier

Conformal classifier with Adaptive Prediction Sets.

Wraps any sklearn classifier with a predict_proba method.

Examples:

>>> from sklearn.ensemble import RandomForestClassifier
>>> import polars as pl
>>> from uncertainty_flow.wrappers import ConformalClassifier
>>>
>>> df = pl.DataFrame({
...     "x1": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
...     "x2": [2, 4, 6, 8, 10, 12, 14, 16, 18, 20],
...     "label": ["a", "a", "a", "a", "a", "b", "b", "b", "b", "b"],
... })
>>> model = ConformalClassifier(
...     base_model=RandomForestClassifier(random_state=42),
...     coverage_target=0.9,
...     random_state=42,
... )
>>> model.fit(df, target="label")
>>> pred = model.predict(df)
>>> pred.set(0)
>>> pred.size
Source code in uncertainty_flow/wrappers/conformal_classifier.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
class ConformalClassifier:
    """Conformal classifier with Adaptive Prediction Sets.

    Wraps any sklearn classifier with a ``predict_proba`` method. A random
    holdout split calibrates an APS score threshold at the requested
    coverage level; prediction sets then include the highest-probability
    classes until their cumulative probability reaches that threshold.

    Examples:
        >>> from sklearn.ensemble import RandomForestClassifier
        >>> import polars as pl
        >>> from uncertainty_flow.wrappers import ConformalClassifier
        >>>
        >>> df = pl.DataFrame({
        ...     "x1": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
        ...     "x2": [2, 4, 6, 8, 10, 12, 14, 16, 18, 20],
        ...     "label": ["a", "a", "a", "a", "a", "b", "b", "b", "b", "b"],
        ... })
        >>> model = ConformalClassifier(
        ...     base_model=RandomForestClassifier(random_state=42),
        ...     coverage_target=0.9,
        ...     random_state=42,
        ... )
        >>> model.fit(df, target="label")
        >>> pred = model.predict(df)
        >>> pred.set(0)
        >>> pred.size
    """

    def __init__(
        self,
        base_model: BaseEstimator,
        coverage_target: float = 0.9,
        calibration_size: float = 0.2,
        random_state: int | None = None,
    ):
        """
        Initialize ConformalClassifier.

        Args:
            base_model: Any sklearn-compatible classifier exposing
                ``predict_proba``.
            coverage_target: Desired coverage level, in (0, 1).
            calibration_size: Fraction of rows held out for calibration,
                in (0, 1).
            random_state: Random seed for the calibration split.

        Raises:
            ValueError: If ``coverage_target`` or ``calibration_size`` is
                outside (0, 1).
        """
        if not (0 < coverage_target < 1):
            raise ValueError(f"coverage_target must be in (0, 1), got {coverage_target}")
        if not (0 < calibration_size < 1):
            raise ValueError(f"calibration_size must be in (0, 1), got {calibration_size}")

        self.base_model = base_model
        self.coverage_target = coverage_target
        self.calibration_size = calibration_size
        self.random_state = random_state

        # Fitted state (populated by fit())
        self._fitted = False
        self._model: BaseEstimator | None = None  # fitted clone of base_model
        self._feature_cols_: list[str] = []
        self._target_col_: str = ""
        self._class_names_: list[str] = []
        self._threshold: float | None = None  # calibrated APS cumulative-probability threshold

    def fit(
        self,
        data: PolarsInput,
        target: str | None = None,
        **kwargs,
    ) -> ConformalClassifier:
        """
        Fit the base classifier and calibrate the APS threshold.

        Args:
            data: Polars DataFrame or LazyFrame with features and target.
            target: Target column name.
            **kwargs: Additional parameters (unused).

        Returns:
            self (for method chaining)

        Raises:
            ConfigurationError: If ``target`` is not provided.
            ValueError: If the target column is missing, no feature columns
                remain, or the data is too small to split into training and
                calibration sets.
        """
        data = materialize_lazyframe(data)

        if target is None:
            raise ConfigurationError("target is required for ConformalClassifier")
        target_str = target if isinstance(target, str) else target[0]
        self._target_col_ = target_str

        if target_str not in data.columns:
            raise ValueError(
                f"Target column '{target_str}' not found in data. "
                f"Available columns: {list(data.columns)}"
            )

        self._feature_cols_ = [c for c in data.columns if c != target_str]
        if not self._feature_cols_:
            raise ValueError("No feature columns found.")

        n = len(data)
        n_calib = max(1, int(n * self.calibration_size))
        n_train = n - n_calib
        # Guard against degenerate splits (e.g. a single-row frame, where
        # n_calib is forced to 1 and n_train becomes 0) that would otherwise
        # fail with a cryptic error deep inside the base model's fit().
        if n_train < 1:
            raise ValueError(
                f"Not enough rows ({n}) to form both a training and a "
                f"calibration split with calibration_size={self.calibration_size}."
            )

        # Random holdout split for calibration.
        rng = np.random.default_rng(self.random_state)
        shuffled_idx = rng.permutation(n)
        train_idx = shuffled_idx[:n_train]
        calib_idx = shuffled_idx[n_train:]

        train_data = data[train_idx.tolist()]
        calib_data = data[calib_idx.tolist()]

        x_train = to_numpy(train_data, self._feature_cols_)
        y_train = train_data[target_str].to_numpy().flatten()
        x_calib = to_numpy(calib_data, self._feature_cols_)
        y_calib = calib_data[target_str].to_numpy().flatten()

        # Fit a clone so the caller's base_model instance stays untouched.
        self._model = clone(self.base_model)
        if self.random_state is not None and "random_state" in self._model.get_params(deep=False):
            self._model.set_params(random_state=self.random_state)
        self._model.fit(x_train, y_train)

        self._class_names_ = list(self._model.classes_)

        # Calibrate the APS threshold on held-out probabilities.
        calib_probs = self._model.predict_proba(x_calib)
        miscoverage_alpha = 1.0 - self.coverage_target
        self._threshold = self._compute_aps_threshold(calib_probs, y_calib, miscoverage_alpha)

        self._fitted = True
        return self

    def predict(self, data: PolarsInput) -> PredictionSet:
        """
        Generate conformal prediction sets.

        Args:
            data: Polars DataFrame or LazyFrame with features.

        Returns:
            ``PredictionSet`` with per-row class sets and probabilities.

        Raises:
            ModelNotFittedError: If called before :meth:`fit`.
        """
        if not self._fitted:
            raise ModelNotFittedError("ConformalClassifier")
        if self._model is None:
            raise RuntimeError("Internal error: model is None after fit")
        if self._threshold is None:
            raise RuntimeError("Internal error: threshold is None after fit")

        data = materialize_lazyframe(data)
        x = to_numpy(data, self._feature_cols_)
        probs = self._model.predict_proba(x)

        class_sets = self._build_prediction_sets(probs, self._threshold, self._class_names_)

        return PredictionSet(
            class_sets=class_sets,
            class_names=self._class_names_,
            probabilities=probs,
            coverage_target=self.coverage_target,
            threshold=self._threshold,
        )

    def predict_batch(
        self,
        data: PolarsInput,
        batch_size: int = 1000,
    ) -> Iterator[PredictionSet]:
        """
        Generate prediction sets in chunks.

        Args:
            data: Feature DataFrame.
            batch_size: Rows per batch.

        Yields:
            ``PredictionSet`` per chunk.
        """
        data = materialize_lazyframe(data)
        n = len(data)
        # The final chunk may hold fewer than batch_size rows.
        for start in range(0, n, batch_size):
            yield self.predict(data[start : start + batch_size])

    @property
    def metadata(self) -> dict | None:
        """Return persisted metadata, or None for unfitted models."""
        cached_metadata = getattr(self, "_metadata", None)
        if cached_metadata is not None:
            return cached_metadata
        if not self._fitted:
            return None
        from ..core._persistence import build_metadata

        return build_metadata(self, include_metadata=True)

    def save(self, path: str | Path, include_metadata: bool = True) -> None:
        """Persist the conformal classifier via the standard .uf archive.

        Args:
            path: Destination path for the archive.
            include_metadata: Whether to build and embed model metadata.
        """
        from ..core._persistence import save_model_archive

        # Cache the metadata returned by the writer so the ``metadata``
        # property can serve it without rebuilding.
        self._metadata = save_model_archive(self, path, include_metadata=include_metadata)

    @classmethod
    def load(cls, path: str | Path, **kwargs) -> ConformalClassifier:
        """Load a conformal classifier from a .uf archive.

        Args:
            path: Path to an archive produced by :meth:`save`.
            **kwargs: Forwarded to the archive loader.

        Returns:
            The restored model instance.

        Raises:
            TypeError: If the archive contains a different model class.
        """
        from ..core._persistence import load_model_archive

        model, _ = load_model_archive(path, **kwargs)
        if not isinstance(model, cls):
            raise TypeError(f"Expected {cls.__name__}, got {type(model).__name__}")
        return model

    def _compute_aps_threshold(
        self,
        calib_probs: np.ndarray,
        y_calib: np.ndarray,
        miscoverage_alpha: float,
    ) -> float:
        """Compute APS threshold at the conformal level 1 - alpha.

        The APS score of a calibration point is the cumulative probability
        mass, in descending-probability order, up to and including the true
        class. The threshold is the finite-sample-corrected
        ``ceil((n + 1) * (1 - alpha)) / n`` empirical quantile of the scores.

        Args:
            calib_probs: (n, n_classes) predicted probabilities.
            y_calib: (n,) true labels.
            miscoverage_alpha: 1 - coverage_target.

        Returns:
            The APS cumulative-probability threshold.

        Raises:
            ValueError: If a calibration label is not among the model classes.
        """
        classes = self._model.classes_
        class_to_idx = {c: i for i, c in enumerate(classes)}

        aps_scores = np.empty(len(y_calib))
        for i in range(len(y_calib)):
            probs = calib_probs[i]
            true_idx = class_to_idx.get(y_calib[i], -1)
            if true_idx < 0:
                raise ValueError(f"Calibration label '{y_calib[i]}' not in model classes")

            # Accumulate probability in descending order until the true
            # class is reached; that cumulative mass is the APS score.
            order = np.argsort(probs)[::-1]
            cumsum = 0.0
            for cls_idx in order:
                cumsum += probs[cls_idx]
                if cls_idx == true_idx:
                    aps_scores[i] = cumsum
                    break

        n = len(aps_scores)
        quantile_idx = int(np.ceil((n + 1) * (1.0 - miscoverage_alpha)))
        quantile_idx = min(quantile_idx, n) - 1
        quantile_idx = max(quantile_idx, 0)

        sorted_scores = np.sort(aps_scores)
        return float(sorted_scores[quantile_idx])

    @staticmethod
    def _build_prediction_sets(
        probs: np.ndarray,
        threshold: float,
        class_names: list[str],
    ) -> list[list[str]]:
        """Build per-row prediction sets from probabilities and a threshold.

        Classes are added in descending probability order until their
        cumulative probability reaches ``threshold``; every set contains at
        least one class.
        """
        sets: list[list[str]] = []

        for i in range(probs.shape[0]):
            row = probs[i]
            order = np.argsort(row)[::-1]

            included: list[str] = []
            cumsum = 0.0
            for cls_idx in order:
                included.append(class_names[cls_idx])
                cumsum += row[cls_idx]
                if cumsum >= threshold:
                    break

            sets.append(included)

        return sets

metadata property

Return persisted metadata, or None for unfitted models.

predict_batch(data, batch_size=1000)

Generate prediction sets in chunks.

Parameters:

Name Type Description Default
data PolarsInput

Feature DataFrame.

required
batch_size int

Rows per batch.

1000

Yields:

Type Description
PredictionSet

PredictionSet per chunk.

Source code in uncertainty_flow/wrappers/conformal_classifier.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def predict_batch(
    self,
    data: PolarsInput,
    batch_size: int = 1000,
) -> Iterator[PredictionSet]:
    """
    Generate prediction sets in chunks.

    Each chunk is routed through ``predict``, so the same fitted model and
    threshold apply to every batch.

    Args:
        data: Feature DataFrame.
        batch_size: Rows per batch.

    Yields:
        ``PredictionSet`` per chunk.
    """
    data = materialize_lazyframe(data)
    n = len(data)
    # The final chunk may hold fewer than batch_size rows.
    for start in range(0, n, batch_size):
        yield self.predict(data[start : start + batch_size])

save(path, include_metadata=True)

Persist the conformal classifier via the standard .uf archive.

Source code in uncertainty_flow/wrappers/conformal_classifier.py
189
190
191
192
193
def save(self, path: str | Path, include_metadata: bool = True) -> None:
    """Persist the conformal classifier via the standard .uf archive.

    Args:
        path: Destination path for the archive.
        include_metadata: Whether to build and embed model metadata.
    """
    from ..core._persistence import save_model_archive

    # Cache the metadata returned by the writer so the ``metadata``
    # property can serve it without rebuilding.
    self._metadata = save_model_archive(self, path, include_metadata=include_metadata)

load(path, **kwargs) classmethod

Load a conformal classifier from a .uf archive.

Source code in uncertainty_flow/wrappers/conformal_classifier.py
195
196
197
198
199
200
201
202
203
@classmethod
def load(cls, path: str | Path, **kwargs) -> ConformalClassifier:
    """Load a conformal classifier from a .uf archive.

    Args:
        path: Path to an archive produced by ``save``.
        **kwargs: Forwarded to the archive loader.

    Returns:
        The restored model instance.

    Raises:
        TypeError: If the archive contains a different model class.
    """
    from ..core._persistence import load_model_archive

    model, _ = load_model_archive(path, **kwargs)
    if not isinstance(model, cls):
        raise TypeError(f"Expected {cls.__name__}, got {type(model).__name__}")
    return model

ConformalForecaster

Bases: BaseUncertaintyModel

Time series forecasting with conformal prediction.

Coverage guarantee: ✅ (with temporal correction) Non-crossing: ✅ (post-sort)

Examples:

>>> from sklearn.ensemble import GradientBoostingRegressor
>>> from uncertainty_flow.wrappers import ConformalForecaster
>>> import polars as pl
>>>
>>> df = pl.DataFrame({
...     "date": range(10),
...     "price": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
... })
>>> model = ConformalForecaster(
...     base_model=GradientBoostingRegressor(),
...     targets="price",
...     horizon=3,
...     lags=2,
... )
>>> model.fit(df)
>>> pred = model.predict(df)
Source code in uncertainty_flow/wrappers/conformal_ts.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
class ConformalForecaster(BaseUncertaintyModel):
    """
    Time series forecasting with conformal prediction.

    Lag features are generated per target, a trailing split is held out for
    calibration, and the empirical quantiles of the calibration residuals are
    added to the base model's point forecasts to form the predictive
    distribution. With multiple targets, a copula fitted on the stacked
    calibration residuals models inter-target residual dependence.

    Coverage guarantee: ✅ (with temporal correction)
    Non-crossing: ✅ (post-sort)

    Examples:
        >>> from sklearn.ensemble import GradientBoostingRegressor
        >>> from uncertainty_flow.wrappers import ConformalForecaster
        >>> import polars as pl
        >>>
        >>> df = pl.DataFrame({
        ...     "date": range(10),
        ...     "price": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
        ... })
        >>> model = ConformalForecaster(
        ...     base_model=GradientBoostingRegressor(),
        ...     targets="price",
        ...     horizon=3,
        ...     lags=2,
        ... )
        >>> model.fit(df)
        >>> pred = model.predict(df)
    """

    def __init__(
        self,
        base_model: BaseEstimator,
        horizon: int,
        targets: str | list[str],
        copula_family: str = "auto",
        lags: int | list[int] = 1,
        calibration_method: str = "holdout",
        calibration_size: float = 0.2,
        auto_tune: bool = True,
        uncertainty_features: list[str] | None = None,
        random_state: int | None = None,
    ):
        """
        Initialize ConformalForecaster.

        Args:
            base_model: Any sklearn-compatible regressor
            horizon: Forecast horizon (steps ahead)
            targets: Target column name(s); a single string is wrapped in a list
            copula_family: "auto" (BIC selection) or one of "gaussian",
                "clayton", "gumbel", "frank". Use "independent" for no
                inter-target correlation.
            lags: Lag order(s) to generate; a single int is wrapped in a list
            calibration_method: "holdout" or "cross"
            calibration_size: Fraction for calibration (from END)
            auto_tune: Whether to tune supported hyperparameters before final fit
            uncertainty_features: Optional hint for heteroscedastic features
            random_state: Random seed
        """
        self.base_model = base_model
        self.horizon = horizon
        self.targets = [targets] if isinstance(targets, str) else targets
        self.copula_family = copula_family
        self.lags = [lags] if isinstance(lags, int) else lags
        self.calibration_method = calibration_method
        self.calibration_size = calibration_size
        self.auto_tune = auto_tune
        self.uncertainty_features = uncertainty_features
        self.random_state = random_state

        # Fitted attributes (populated by fit(); empty/None until then)
        self._fitted = False
        self._copula: BaseCopula | None = None
        self._models_: dict[str, BaseEstimator] = {}
        self._quantiles_: dict[str, np.ndarray] = {}
        self._quantile_levels_: np.ndarray | None = None
        self._feature_cols_: dict[str, list[str]] = {}
        self._uncertainty_drivers_: pl.DataFrame | None = None
        self.tuned_params_: dict[str, float | int] = {}

    def _resolve_quantile_levels(self) -> np.ndarray:
        """Return fit-time quantile levels, with backward-compatible fallback.

        If fit() has not stored levels (e.g. state restored from an older
        version), falls back to ``DEFAULT_QUANTILES``; in that case any stored
        residual quantiles must have the same length, otherwise the state is
        inconsistent.

        Raises:
            InvalidDataError: If stored residual quantiles do not match the
                fallback level count.
        """
        if self._quantile_levels_ is not None:
            return self._quantile_levels_

        fallback_levels = np.asarray(list(DEFAULT_QUANTILES), dtype=float)
        if self._quantiles_:
            # Check the first target only; fit() builds every target's
            # quantiles from the same level array.
            first_target = next(iter(self._quantiles_))
            if len(self._quantiles_[first_target]) != len(fallback_levels):
                raise InvalidDataError(
                    "Current config quantile count does not match fitted residual quantiles. "
                    "Refit the model after setting the desired quantile configuration."
                )
        return fallback_levels

    def _auto_tune(self, data: pl.DataFrame) -> None:
        """Tune params using validation splits, with CV averaging when inner splits exist.

        Grid-searches base-estimator parameter candidates x calibration sizes
        x lag orders; each combination is scored on every (train, val) split
        and the split scores are averaged. The winning combination is written
        back onto ``self.base_model``, ``self.calibration_size`` and
        ``self.lags``, and recorded in ``self.tuned_params_``.
        """
        eval_splits = build_tune_splits(
            data, task_type="time_series", random_state=self.random_state
        )

        best_score = float("inf")
        best_params: dict[str, float | int] = {}
        best_model = clone(self.base_model)

        for base_params in estimator_param_candidates(self.base_model):
            tuned_base = clone(self.base_model)
            if base_params:
                tuned_base.set_params(**base_params)

            # Candidate calibration sizes are filtered against the first
            # split's training length so infeasible fractions are skipped.
            for calib_size in valid_calibration_candidates(
                len(eval_splits[0][0]), self.calibration_size, [0.15, 0.2, 0.25, 0.3]
            ):
                for lags in candidate_values(self.lags[0], [1, 2, 3]):
                    split_scores: list[float] = []
                    for split_train, split_val in eval_splits:
                        # Throwaway forecaster; auto_tune=False prevents
                        # recursing back into _auto_tune.
                        candidate = ConformalForecaster(
                            base_model=tuned_base,
                            horizon=self.horizon,
                            targets=self.targets,
                            copula_family=self.copula_family,
                            lags=lags,
                            calibration_method=self.calibration_method,
                            calibration_size=calib_size,
                            auto_tune=False,
                            uncertainty_features=self.uncertainty_features,
                            random_state=self.random_state,
                        )
                        with warnings.catch_warnings():
                            warnings.simplefilter("ignore")
                            candidate.fit(split_train)
                            pred = candidate.predict(split_val)
                        actuals = split_val.select(self.targets)
                        score = score_distribution_prediction(
                            pred,
                            actuals,
                            self.targets,
                            confidence=0.9,
                        )
                        split_scores.append(score)
                    avg_score = float(np.mean(split_scores))
                    # Lower is better; strict < keeps the earliest candidate
                    # on ties.
                    if avg_score < best_score:
                        best_score = avg_score
                        best_model = clone(tuned_base)
                        best_params = {
                            **base_params,
                            "calibration_size": calib_size,
                            "lags": int(lags),
                        }

        self.base_model = best_model
        self.calibration_size = float(best_params.get("calibration_size", self.calibration_size))
        self.lags = [int(best_params.get("lags", self.lags[0]))]
        self.tuned_params_ = best_params

    def _create_lag_features(
        self,
        data: pl.DataFrame,
        target: str,
    ) -> pl.DataFrame:
        """Create lag features for a target.

        Adds one ``{target}_lag_{k}`` column per configured lag, then drops
        every row containing a null — including the leading rows made null by
        the shift.
        """
        result = data
        for lag in self.lags:
            result = result.with_columns(pl.col(target).shift(lag).alias(f"{target}_lag_{lag}"))
        return result.drop_nulls()

    def fit(
        self,
        data: PolarsInput,
        target: TargetSpec | None = None,
        **kwargs,
    ) -> "ConformalForecaster":
        """
        Fit the conformal forecaster.

        Args:
            data: Polars DataFrame or LazyFrame with time series data
            target: Accepted for API compatibility but currently ignored;
                ``self.targets`` (set in ``__init__``) is always used
            **kwargs: Additional parameters (unused)

        Returns:
            self (for method chaining)
        """
        # Materialize if needed
        data = materialize_lazyframe(data)

        # Optional hyperparameter search; mutates base_model /
        # calibration_size / lags in place before the final fit below.
        if self.auto_tune:
            self._auto_tune(data)

        # Create lag features for each target
        data_with_lags = data
        self._quantile_levels_ = np.asarray(list(DEFAULT_QUANTILES), dtype=float)
        for target in self.targets:
            data_with_lags = self._create_lag_features(data_with_lags, target)

        # Automatic validation split strategy selection
        plan = select_validation_plan(
            data_with_lags,
            task_type="time_series",
            random_state=self.random_state,
            holdout_fraction=self.calibration_size,
        )
        train, calib = plan.outer_split

        # Fit per target
        residual_matrix = []

        for target in self.targets:
            # NOTE(review): the feature set is identical for every target —
            # all non-target columns, including the lag columns generated for
            # the other targets.
            feature_cols = [col for col in train.columns if col not in self.targets]
            self._feature_cols_[target] = feature_cols

            x_train = to_numpy(train, feature_cols)
            y_train = to_numpy(train, [target]).flatten()
            x_calib = to_numpy(calib, feature_cols)
            y_calib = to_numpy(calib, [target]).flatten()

            model = clone(self.base_model)
            if self.random_state is not None and "random_state" in model.get_params(deep=False):
                model.set_params(random_state=self.random_state)
            model.fit(x_train, y_train)
            self._models_[target] = model

            # Signed calibration residuals define the conformal offsets.
            calib_preds = model.predict(x_calib)
            residuals = y_calib - calib_preds
            # Defensive guard only; the levels were assigned above.
            if self._quantile_levels_ is None:
                raise RuntimeError("Internal error: _quantile_levels_ not set before calibration")
            self._quantiles_[target] = np.quantile(residuals, self._quantile_levels_)

            residual_matrix.append(residuals)

        # Fit copula if multivariate
        if len(self.targets) > 1 and self.copula_family != "independent":
            stacked_residuals = np.column_stack(residual_matrix)

            if self.copula_family == "auto":
                selected = auto_select_copula(stacked_residuals)
            elif self.copula_family in COPULA_FAMILIES:
                selected = self.copula_family
            else:
                raise InvalidDataError(
                    f"Unknown copula_family: {self.copula_family}. "
                    f"Valid options: auto, gaussian, clayton, gumbel, frank, independent"
                )

            copula_cls = COPULA_FAMILIES[selected]
            self._copula = copula_cls().fit(stacked_residuals)
        else:
            self._copula = None

        self._fitted = True
        return self

    def predict(
        self,
        data: PolarsInput,
        steps: int | None = None,
    ) -> DistributionPrediction:
        """
        Generate probabilistic forecasts.

        Args:
            data: Polars DataFrame or LazyFrame
            steps: Number of steps to forecast (default: self.horizon).
                NOTE(review): currently unused below — one forecast row is
                produced per input row regardless of this value; confirm
                whether multi-step iteration was intended.

        Returns:
            DistributionPrediction with quantile forecasts
        """
        if not self._fitted:
            raise ModelNotFittedError("ConformalForecaster")

        # NOTE(review): `or` also maps an explicit steps=0 to self.horizon,
        # and `steps` is never read after this line.
        steps = steps or self.horizon

        # Materialize if needed
        data = materialize_lazyframe(data)

        # Create lag features (must mirror fit() so feature columns line up)
        data_with_lags = data
        for target in self.targets:
            data_with_lags = self._create_lag_features(data_with_lags, target)

        # Get predictions for each target
        quantile_levels = self._resolve_quantile_levels()
        all_quantiles = []
        for target in self.targets:
            x = to_numpy(data_with_lags, self._feature_cols_[target])
            point_preds = self._models_[target].predict(x)

            # Add conformal quantiles: shift the point forecast by each
            # stored calibration-residual quantile.
            target_quantiles = self._quantiles_[target]
            if len(target_quantiles) != len(quantile_levels):
                raise InvalidDataError(
                    "Stored quantiles do not match configured quantile levels. "
                    "Refit the model to regenerate compatible quantiles."
                )
            quantile_matrix = np.zeros((len(point_preds), len(quantile_levels)))
            for i, q in enumerate(self._quantiles_[target]):
                quantile_matrix[:, i] = point_preds + q

            all_quantiles.append(quantile_matrix)

        # Stack for multivariate
        if len(self.targets) == 1:
            final_matrix = all_quantiles[0]
        else:
            # Interleave: [target1_q1, target1_q2, ..., target2_q1, target2_q2, ...]
            final_matrix = np.column_stack(
                [
                    all_quantiles[t][:, i]
                    for t in range(len(self.targets))
                    for i in range(len(quantile_levels))
                ]
            )

        return DistributionPrediction(
            quantile_matrix=final_matrix,
            quantile_levels=quantile_levels.tolist(),
            target_names=self.targets,
            copula=self._copula,
        )

    @property
    def uncertainty_drivers_(self) -> pl.DataFrame | None:
        """Return residual correlation analysis results, or None if not computed."""
        return self._uncertainty_drivers_

uncertainty_drivers_ property

Return residual correlation analysis results.

__init__(base_model, horizon, targets, copula_family='auto', lags=1, calibration_method='holdout', calibration_size=0.2, auto_tune=True, uncertainty_features=None, random_state=None)

Initialize ConformalForecaster.

Parameters:

Name Type Description Default
base_model BaseEstimator

Any sklearn-compatible regressor

required
horizon int

Forecast horizon (steps ahead)

required
targets str | list[str]

Target column name(s)

required
copula_family str

( "auto" (BIC selection) or one of "gaussian", "clayton", "gumbel", "frank". " "Use "independent" for no inter-target correlation."

'auto'
lags int | list[int]

Lag order(s) to generate

1
calibration_method str

"holdout" or "cross"

'holdout'
calibration_size float

Fraction for calibration (from END)

0.2
auto_tune bool

Whether to tune supported hyperparameters before final fit

True
uncertainty_features list[str] | None

Optional hint for heteroscedastic features

None
random_state int | None

Random seed

None
Source code in uncertainty_flow/wrappers/conformal_ts.py
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def __init__(
    self,
    base_model: BaseEstimator,
    horizon: int,
    targets: str | list[str],
    copula_family: str = "auto",
    lags: int | list[int] = 1,
    calibration_method: str = "holdout",
    calibration_size: float = 0.2,
    auto_tune: bool = True,
    uncertainty_features: list[str] | None = None,
    random_state: int | None = None,
):
    """Set up the forecaster configuration and empty fitted-state slots.

    Args:
        base_model: Any sklearn-compatible regressor.
        horizon: Forecast horizon (steps ahead).
        targets: Target column name(s); a single string is wrapped in a list.
        copula_family: "auto" (BIC selection) or one of "gaussian",
            "clayton", "gumbel", "frank". Use "independent" for no
            inter-target correlation.
        lags: Lag order(s) to generate; a single int is wrapped in a list.
        calibration_method: "holdout" or "cross".
        calibration_size: Fraction for calibration (taken from the END).
        auto_tune: Whether to tune supported hyperparameters before the
            final fit.
        uncertainty_features: Optional hint for heteroscedastic features.
        random_state: Random seed.
    """
    # Normalise scalar specs to lists so downstream code can iterate.
    if isinstance(targets, str):
        self.targets = [targets]
    else:
        self.targets = targets
    if isinstance(lags, int):
        self.lags = [lags]
    else:
        self.lags = lags

    # Configuration stored verbatim.
    self.base_model = base_model
    self.horizon = horizon
    self.copula_family = copula_family
    self.calibration_method = calibration_method
    self.calibration_size = calibration_size
    self.auto_tune = auto_tune
    self.uncertainty_features = uncertainty_features
    self.random_state = random_state

    # State populated by fit(); empty/None until then.
    self._fitted = False
    self._copula: BaseCopula | None = None
    self._models_: dict[str, BaseEstimator] = {}
    self._quantiles_: dict[str, np.ndarray] = {}
    self._quantile_levels_: np.ndarray | None = None
    self._feature_cols_: dict[str, list[str]] = {}
    self._uncertainty_drivers_: pl.DataFrame | None = None
    self.tuned_params_: dict[str, float | int] = {}

fit(data, target=None, **kwargs)

Fit the conformal forecaster.

Parameters:

Name Type Description Default
data PolarsInput

Polars DataFrame or LazyFrame with time series data

required
target TargetSpec | None

Target column name(s) - uses self.targets if not provided

None
**kwargs

Additional parameters (unused)

{}

Returns:

Type Description
ConformalForecaster

self (for method chaining)

Source code in uncertainty_flow/wrappers/conformal_ts.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
def fit(
    self,
    data: PolarsInput,
    target: TargetSpec | None = None,
    **kwargs,
) -> "ConformalForecaster":
    """
    Fit the conformal forecaster.

    Args:
        data: Polars DataFrame or LazyFrame with time series data
        target: Accepted for API compatibility but currently ignored;
            ``self.targets`` (set in ``__init__``) is always used
        **kwargs: Additional parameters (unused)

    Returns:
        self (for method chaining)
    """
    # Materialize if needed
    data = materialize_lazyframe(data)

    # Optional hyperparameter search; mutates base_model /
    # calibration_size / lags in place before the final fit below.
    if self.auto_tune:
        self._auto_tune(data)

    # Create lag features for each target
    data_with_lags = data
    self._quantile_levels_ = np.asarray(list(DEFAULT_QUANTILES), dtype=float)
    for target in self.targets:
        data_with_lags = self._create_lag_features(data_with_lags, target)

    # Automatic validation split strategy selection
    plan = select_validation_plan(
        data_with_lags,
        task_type="time_series",
        random_state=self.random_state,
        holdout_fraction=self.calibration_size,
    )
    train, calib = plan.outer_split

    # Fit per target
    residual_matrix = []

    for target in self.targets:
        # NOTE(review): the feature set is identical for every target —
        # all non-target columns, including the lag columns generated for
        # the other targets.
        feature_cols = [col for col in train.columns if col not in self.targets]
        self._feature_cols_[target] = feature_cols

        x_train = to_numpy(train, feature_cols)
        y_train = to_numpy(train, [target]).flatten()
        x_calib = to_numpy(calib, feature_cols)
        y_calib = to_numpy(calib, [target]).flatten()

        model = clone(self.base_model)
        if self.random_state is not None and "random_state" in model.get_params(deep=False):
            model.set_params(random_state=self.random_state)
        model.fit(x_train, y_train)
        self._models_[target] = model

        # Signed calibration residuals define the conformal offsets.
        calib_preds = model.predict(x_calib)
        residuals = y_calib - calib_preds
        # Defensive guard only; the levels were assigned above.
        if self._quantile_levels_ is None:
            raise RuntimeError("Internal error: _quantile_levels_ not set before calibration")
        self._quantiles_[target] = np.quantile(residuals, self._quantile_levels_)

        residual_matrix.append(residuals)

    # Fit copula if multivariate
    if len(self.targets) > 1 and self.copula_family != "independent":
        stacked_residuals = np.column_stack(residual_matrix)

        if self.copula_family == "auto":
            selected = auto_select_copula(stacked_residuals)
        elif self.copula_family in COPULA_FAMILIES:
            selected = self.copula_family
        else:
            raise InvalidDataError(
                f"Unknown copula_family: {self.copula_family}. "
                f"Valid options: auto, gaussian, clayton, gumbel, frank, independent"
            )

        copula_cls = COPULA_FAMILIES[selected]
        self._copula = copula_cls().fit(stacked_residuals)
    else:
        self._copula = None

    self._fitted = True
    return self

predict(data, steps=None)

Generate probabilistic forecasts.

Parameters:

Name Type Description Default
data PolarsInput

Polars DataFrame or LazyFrame

required
steps int | None

Number of steps to forecast (default: self.horizon)

None

Returns:

Type Description
DistributionPrediction

DistributionPrediction with quantile forecasts

Source code in uncertainty_flow/wrappers/conformal_ts.py
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
def predict(
    self,
    data: PolarsInput,
    steps: int | None = None,
) -> DistributionPrediction:
    """
    Generate probabilistic forecasts.

    Args:
        data: Polars DataFrame or LazyFrame
        steps: Number of steps to forecast (default: self.horizon).
            NOTE(review): currently unused below — one forecast row is
            produced per input row regardless of this value; confirm
            whether multi-step iteration was intended.

    Returns:
        DistributionPrediction with quantile forecasts
    """
    if not self._fitted:
        raise ModelNotFittedError("ConformalForecaster")

    # NOTE(review): `or` also maps an explicit steps=0 to self.horizon,
    # and `steps` is never read after this line.
    steps = steps or self.horizon

    # Materialize if needed
    data = materialize_lazyframe(data)

    # Create lag features (must mirror fit() so feature columns line up)
    data_with_lags = data
    for target in self.targets:
        data_with_lags = self._create_lag_features(data_with_lags, target)

    # Get predictions for each target
    quantile_levels = self._resolve_quantile_levels()
    all_quantiles = []
    for target in self.targets:
        x = to_numpy(data_with_lags, self._feature_cols_[target])
        point_preds = self._models_[target].predict(x)

        # Add conformal quantiles: shift the point forecast by each
        # stored calibration-residual quantile.
        target_quantiles = self._quantiles_[target]
        if len(target_quantiles) != len(quantile_levels):
            raise InvalidDataError(
                "Stored quantiles do not match configured quantile levels. "
                "Refit the model to regenerate compatible quantiles."
            )
        quantile_matrix = np.zeros((len(point_preds), len(quantile_levels)))
        for i, q in enumerate(self._quantiles_[target]):
            quantile_matrix[:, i] = point_preds + q

        all_quantiles.append(quantile_matrix)

    # Stack for multivariate
    if len(self.targets) == 1:
        final_matrix = all_quantiles[0]
    else:
        # Interleave: [target1_q1, target1_q2, ..., target2_q1, target2_q2, ...]
        final_matrix = np.column_stack(
            [
                all_quantiles[t][:, i]
                for t in range(len(self.targets))
                for i in range(len(quantile_levels))
            ]
        )

    return DistributionPrediction(
        quantile_matrix=final_matrix,
        quantile_levels=quantile_levels.tolist(),
        target_names=self.targets,
        copula=self._copula,
    )

EnsembleBootstrapPI

Bases: BaseUncertaintyModel

Ensemble Bootstrap Prediction Intervals (EnbPI).

Trains n_estimators bootstrap copies of a sklearn regressor, then constructs prediction intervals from the ensemble distribution with conformal calibration via stored nonconformity scores.

Usage pattern
  1. fit(train_data, target) — trains the bootstrap ensemble.
  2. predict(test_data) — returns calibrated prediction intervals.
  3. update(y_true) — after observing the true value, updates the nonconformity score pool for future predictions.

Examples:

>>> from sklearn.ensemble import GradientBoostingRegressor
>>> import polars as pl
>>> from uncertainty_flow.wrappers import EnsembleBootstrapPI
>>>
>>> df = pl.DataFrame({
...     "x": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
...     "y": [1.5, 3.5, 5.5, 7.5, 9.5, 9.0, 7.0, 5.0, 3.0, 1.0],
... })
>>> model = EnsembleBootstrapPI(
...     base_model=GradientBoostingRegressor(random_state=42),
...     n_estimators=20,
...     random_state=42,
... )
>>> model.fit(df, target="y")
>>> pred = model.predict(df)
>>> model.update(df["y"][0])
Source code in uncertainty_flow/wrappers/enbpi.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
class EnsembleBootstrapPI(BaseUncertaintyModel):
    """Ensemble Bootstrap Prediction Intervals (EnbPI).

    Trains ``n_estimators`` bootstrap copies of a sklearn regressor, then
    constructs prediction intervals from the ensemble distribution with
    conformal calibration via stored nonconformity scores.

    Usage pattern:
        1. ``fit(train_data, target)`` — trains the bootstrap ensemble.
        2. ``predict(test_data)`` — returns calibrated prediction intervals.
        3. ``update(y_true)`` — after observing the true value, updates the
           nonconformity score pool for future predictions.

    Examples:
        >>> from sklearn.ensemble import GradientBoostingRegressor
        >>> import polars as pl
        >>> from uncertainty_flow.wrappers import EnsembleBootstrapPI
        >>>
        >>> df = pl.DataFrame({
        ...     "x": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
        ...     "y": [1.5, 3.5, 5.5, 7.5, 9.5, 9.0, 7.0, 5.0, 3.0, 1.0],
        ... })
        >>> model = EnsembleBootstrapPI(
        ...     base_model=GradientBoostingRegressor(random_state=42),
        ...     n_estimators=20,
        ...     random_state=42,
        ... )
        >>> model.fit(df, target="y")
        >>> pred = model.predict(df)
        >>> model.update(df["y"][0])
    """

    def __init__(
        self,
        base_model: BaseEstimator,
        n_estimators: int = 100,
        coverage_target: float = 0.9,
        subsample_ratio: float = 1.0,
        random_state: int | None = None,
    ):
        """Configure the bootstrap ensemble; no training happens here.

        Args:
            base_model: Any sklearn-compatible regressor, cloned per
                bootstrap member.
            n_estimators: Number of bootstrap models (must be >= 2).
            coverage_target: Desired interval coverage, strictly in (0, 1).
            subsample_ratio: Bootstrap sample size as a fraction of the
                training set, in (0, 1].
            random_state: Random seed.

        Raises:
            ValueError: If any argument falls outside its valid range.
        """
        # Validate in a fixed order so the first offending argument wins.
        for bad, message in (
            (n_estimators < 2, f"n_estimators must be >= 2, got {n_estimators}"),
            (not (0 < coverage_target < 1), f"coverage_target must be in (0, 1), got {coverage_target}"),
            (not (0 < subsample_ratio <= 1), f"subsample_ratio must be in (0, 1], got {subsample_ratio}"),
        ):
            if bad:
                raise ValueError(message)

        self.base_model = base_model
        self.n_estimators = n_estimators
        self.coverage_target = coverage_target
        self.subsample_ratio = subsample_ratio
        self.random_state = random_state

        # Fitted-state slots, populated by fit()/predict().
        self._fitted = False
        self._models: list[BaseEstimator] = []
        self._feature_cols_: list[str] = []
        self._target_col_: str = ""
        self._scores: list[float] = []
        self._quantile_levels_: np.ndarray | None = None
        self._last_preds: np.ndarray | None = None

    def fit(
        self,
        data: PolarsInput,
        target: TargetSpec | None = None,
        **kwargs,
    ) -> EnsembleBootstrapPI:
        """Train the bootstrap ensemble and seed the nonconformity score pool.

        Args:
            data: Polars DataFrame or LazyFrame with features and the target.
            target: Target column name (required; first element if a list).
            **kwargs: Additional parameters (unused).

        Returns:
            self (for method chaining).

        Raises:
            ConfigurationError: If ``target`` is not provided.
            ValueError: If the target column is missing or no feature
                columns remain.
        """
        data = materialize_lazyframe(data)

        if target is None:
            raise ConfigurationError("target is required for EnsembleBootstrapPI")
        target_str = target if isinstance(target, str) else target[0]
        self._target_col_ = target_str

        if target_str not in data.columns:
            raise ValueError(
                f"Target column '{target_str}' not found in data. "
                f"Available columns: {list(data.columns)}"
            )

        # Every non-target column is treated as a feature.
        self._feature_cols_ = [c for c in data.columns if c != target_str]
        if not self._feature_cols_:
            raise ValueError("No feature columns remaining after excluding target.")

        x_all = to_numpy(data, self._feature_cols_)
        y_all = data[target_str].to_numpy().flatten()
        n = len(data)
        subsample_size = max(1, int(n * self.subsample_ratio))

        rng = np.random.default_rng(self.random_state)
        self._models = []

        for i in range(self.n_estimators):
            # Bootstrap: sample row indices with replacement.
            boot_idx = rng.integers(0, n, size=subsample_size)
            x_boot = x_all[boot_idx]
            y_boot = y_all[boot_idx]

            model = clone(self.base_model)
            # Distinct per-member seed; falls back to 42+i when random_state
            # is None so members still differ from one another.
            seed = (self.random_state if self.random_state is not None else 42) + i
            if "random_state" in model.get_params(deep=False):
                model.set_params(random_state=seed)
            model.fit(x_boot, y_boot)
            self._models.append(model)

        # NOTE(review): nonconformity scores here are in-sample — the
        # ensemble mean over ALL training rows, not out-of-bag aggregation
        # as in canonical EnbPI. This may understate residuals; confirm
        # this simplification is intended.
        calib_preds = np.mean([m.predict(x_all) for m in self._models], axis=0)
        residuals = np.abs(y_all - calib_preds)
        self._scores = residuals.tolist()
        self._quantile_levels_ = np.asarray(list(DEFAULT_QUANTILES), dtype=float)

        self._fitted = True
        return self

    def predict(
        self,
        data: PolarsInput,
    ) -> DistributionPrediction:
        """Return calibrated prediction intervals for ``data``.

        Also caches the ensemble-mean point predictions so a subsequent
        ``update(y_true)`` can append the matching residuals to the score
        pool.

        Args:
            data: Polars DataFrame or LazyFrame containing the fitted
                feature columns.

        Returns:
            DistributionPrediction over the fit-time quantile levels for
            the single target column.

        Raises:
            ModelNotFittedError: If fit() has not been called.
        """
        if not self._fitted:
            raise ModelNotFittedError("EnsembleBootstrapPI")

        data = materialize_lazyframe(data)
        x = to_numpy(data, self._feature_cols_)

        # (n_samples, n_estimators) member predictions; the point forecast
        # is the ensemble mean per row.
        all_preds = np.column_stack([m.predict(x) for m in self._models])
        point_preds = np.mean(all_preds, axis=1)

        # Store the last point predictions for update() to compute residuals
        self._last_preds = point_preds.copy()

        # Conformal width from the pooled absolute-residual scores
        # (0.0 placeholder if the pool is somehow empty).
        score_arr = np.array(self._scores) if self._scores else np.array([0.0])
        # min() is redundant given the (0, 1) check in __init__; kept as a
        # defensive clamp.
        conformal_level = min(self.coverage_target, 1.0)
        q_value = np.quantile(score_arr, conformal_level)

        if self._quantile_levels_ is None:
            raise RuntimeError("Internal error: quantile levels not set")

        quantile_matrix = np.zeros((len(point_preds), len(self._quantile_levels_)))
        ens_std = np.std(all_preds, axis=1)
        # Floor the std to avoid division by zero below.
        ens_std = np.clip(ens_std, 1e-12, None)

        for j, level in enumerate(self._quantile_levels_):
            # z: empirical quantile of the standardized member deviations,
            # pooled across all rows (np.repeat aligns each row's std with
            # its n_estimators raveled deviations, row-major).
            z = float(
                np.percentile(
                    (all_preds - point_preds[:, None]).ravel()
                    / np.repeat(ens_std, self.n_estimators),
                    level * 100,
                )
            )
            # NOTE(review): hybrid width — ensemble spread (z * ens_std)
            # plus a conformal offset scaled by (level - 0.5); confirm this
            # calibration rule is intended.
            quantile_matrix[:, j] = point_preds + z * ens_std + q_value * (level - 0.5)

        # Per-row sort guarantees non-crossing quantiles.
        quantile_matrix = np.sort(quantile_matrix, axis=1)

        return DistributionPrediction(
            quantile_matrix=quantile_matrix,
            quantile_levels=self._quantile_levels_.tolist(),
            target_names=[self._target_col_],
        )

    def update(self, y_true: float | np.ndarray) -> None:
        if not self._fitted:
            raise ModelNotFittedError("EnsembleBootstrapPI")

        y_arr = np.atleast_1d(np.asarray(y_true, dtype=float))

        if self._last_preds is None:
            raise RuntimeError(
                "update() called before predict(). "
                "Call predict() first to generate predictions, "
                "then update() with the corresponding true values."
            )

        if len(y_arr) != len(self._last_preds):
            raise ValueError(
                f"y_true length ({len(y_arr)}) must match the number of "
                f"predictions from the most recent predict() call ({len(self._last_preds)})."
            )

        for i in range(len(y_arr)):
            residual = abs(float(y_arr[i]) - float(self._last_preds[i]))
            self._scores.append(residual)