Skip to content

Calibration

uncertainty_flow.calibration

Calibration diagnostics for uncertainty models.

RecalibratedModel

Bases: BaseUncertaintyModel

Wrap a fitted model with isotonic recalibration.

Learns a monotone mapping from predicted quantile levels to empirical coverage using isotonic regression (Kuleshov et al., 2018).

Supports two modes: - Separate calibration set (default): fit the isotonic map on held-out data provided to fit(). - Cross-fitting (cross_calibrate=True): K-fold fit on the inner model's training data to avoid overfitting the isotonic map.

Examples:

>>> from sklearn.ensemble import GradientBoostingRegressor
>>> from uncertainty_flow.wrappers import ConformalRegressor
>>> from uncertainty_flow.calibration import RecalibratedModel
>>> import polars as pl
>>>
>>> base = ConformalRegressor(base_model=GradientBoostingRegressor())
>>> base.fit(df_train, target="y")
>>> recal = RecalibratedModel(model=base)
>>> recal.fit(df_calib, target="y")
>>> pred = recal.predict(df_test)
Source code in uncertainty_flow/calibration/recalibration.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
class RecalibratedModel(BaseUncertaintyModel):
    """
    Wrap a fitted model with isotonic recalibration.

    Learns a monotone mapping from predicted quantile levels to empirical
    coverage using isotonic regression (Kuleshov et al., 2018).

    Supports two modes:
    - **Separate calibration set** (default): fit the isotonic map on held-out
      data provided to ``fit()``.
    - **Cross-fitting** (``cross_calibrate=True``): K-fold fit on the inner
      model's training data to avoid overfitting the isotonic map.

    Examples:
        >>> from sklearn.ensemble import GradientBoostingRegressor
        >>> from uncertainty_flow.wrappers import ConformalRegressor
        >>> from uncertainty_flow.calibration import RecalibratedModel
        >>> import polars as pl
        >>>
        >>> base = ConformalRegressor(base_model=GradientBoostingRegressor())
        >>> base.fit(df_train, target="y")
        >>> recal = RecalibratedModel(model=base)
        >>> recal.fit(df_calib, target="y")
        >>> pred = recal.predict(df_test)
    """

    def __init__(
        self,
        model: BaseUncertaintyModel,
        quantile_levels: list[float] | None = None,
        cross_calibrate: bool = False,
        n_folds: int = 5,
        random_state: int | None = None,
    ):
        """
        Args:
            model: A fitted BaseUncertaintyModel to recalibrate.
            quantile_levels: Quantile levels for the output predictions.
                Defaults to the inner model's levels.
            cross_calibrate: If True, use K-fold cross-fitting on the
                calibration data to avoid overfitting the isotonic map.
            n_folds: Number of folds when cross_calibrate=True.
            random_state: Random seed for cross-fitting.
        """
        self.model = model
        self.quantile_levels = quantile_levels
        self.cross_calibrate = cross_calibrate
        self.n_folds = n_folds
        self.random_state = random_state

        # Fitted state, populated by fit(); None/False until then.
        self._fitted = False
        self._isotonic_regressors: list[IsotonicRegression] | None = None
        self._output_levels: np.ndarray | None = None
        self._target_names: list[str] | None = None

    def fit(
        self,
        data: PolarsInput,
        target: TargetSpec | None = None,
        **kwargs,
    ) -> RecalibratedModel:
        """
        Learn the isotonic recalibration mapping.

        Args:
            data: Calibration dataset. If cross_calibrate=True, this data
                is split into K folds for cross-fitting.
            target: Target column name(s).

        Returns:
            self
        """
        data = materialize_lazyframe(data)

        if self.cross_calibrate:
            self._fit_cross_calibrated(data, target)
        else:
            self._fit_direct(data, target)

        self._fitted = True
        return self

    def _fit_direct(
        self,
        data: pl.DataFrame,
        target: TargetSpec | None,
    ) -> None:
        """Fit one isotonic map per target on the full calibration set."""
        pred = self.model.predict(data)
        y_arr = self._extract_y(data, target, pred)

        self._target_names = pred._targets
        # Output levels default to the inner model's levels unless overridden.
        self._output_levels = (
            np.asarray(self.quantile_levels, dtype=float)
            if self.quantile_levels is not None
            else pred._levels.copy()
        )

        self._isotonic_regressors = self._fit_isotonic_map(pred, y_arr)

    def _fit_cross_calibrated(
        self,
        data: pl.DataFrame,
        target: TargetSpec | None,
    ) -> None:
        """Fit isotonic maps from K-fold-averaged empirical coverage.

        NOTE(review): the inner model is used as-is for every fold (it is
        not refit on the fold's training split); each fold only contributes
        the empirical coverage measured on its validation slice.
        """
        from sklearn.model_selection import KFold

        n = len(data)
        n_splits = min(self.n_folds, n)
        if n_splits < 2:
            # Fall back to direct fit if too few samples for cross-fitting
            self._fit_direct(data, target)
            return

        kf = KFold(n_splits=n_splits, shuffle=True, random_state=self.random_state)

        # Reference prediction fixes target names and quantile levels.
        ref_pred = self.model.predict(data)
        self._target_names = ref_pred._targets
        self._output_levels = (
            np.asarray(self.quantile_levels, dtype=float)
            if self.quantile_levels is not None
            else ref_pred._levels.copy()
        )

        n_targets = len(ref_pred._targets)
        n_quantiles = ref_pred._n_quantiles

        # Accumulate empirical coverage per fold per quantile level
        fold_empirical_sums = np.zeros((n_targets, n_quantiles))
        fold_counts = 0

        for _train_idx, val_idx in kf.split(range(n)):
            val_df = (
                data.with_row_index("__idx__")
                .filter(pl.col("__idx__").is_in(list(val_idx)))
                .drop("__idx__")
            )

            fold_pred = self.model.predict(val_df)
            y_val = self._extract_y(val_df, target, fold_pred)

            for t_idx in range(n_targets):
                # Columns [q_start, q_end) hold this target's quantiles.
                q_start = t_idx * fold_pred._n_quantiles
                q_end = q_start + fold_pred._n_quantiles
                q_slice = fold_pred._quantiles[:, q_start:q_end]
                y_col = y_val[:, t_idx] if y_val.ndim == 2 else y_val

                # Fraction of observations at or below each predicted quantile.
                empirical = np.mean(y_col[:, None] <= q_slice, axis=0)
                fold_empirical_sums[t_idx] += empirical

            fold_counts += 1

        avg_empirical = fold_empirical_sums / max(fold_counts, 1)

        self._isotonic_regressors = []
        for t_idx in range(n_targets):
            iso = IsotonicRegression(out_of_bounds="clip")
            iso.fit(ref_pred._levels, avg_empirical[t_idx])
            self._isotonic_regressors.append(iso)

    def predict(self, data: PolarsInput) -> DistributionPrediction:
        """
        Generate recalibrated predictions.

        Args:
            data: Input data for prediction.

        Returns:
            DistributionPrediction with recalibrated quantile values.

        Raises:
            ModelNotFittedError: If fit() has not been called.
        """
        if not self._fitted:
            raise ModelNotFittedError("RecalibratedModel")

        data = materialize_lazyframe(data)
        pred = self.model.predict(data)

        if self._isotonic_regressors is None:
            raise RuntimeError("Internal error: isotonic regressors not fitted")
        if self._output_levels is None:
            raise RuntimeError("Internal error: output levels not set")
        if self._target_names is None:
            raise RuntimeError("Internal error: target names not set")

        n_targets = len(pred._targets)
        n_quantiles = pred._n_quantiles
        n_out = len(self._output_levels)

        output_matrix = np.empty((pred._n_samples, n_targets * n_out))

        for t_idx in range(n_targets):
            q_start = t_idx * n_quantiles
            q_end = q_start + n_quantiles
            q_slice = pred._quantiles[:, q_start:q_end]

            iso = self._isotonic_regressors[t_idx]

            # Build inverse mapping: for desired empirical coverage,
            # find the nominal quantile level that produces it.
            # iso maps nominal_level -> empirical_coverage.
            # We evaluate on the model's levels (which are increasing) to get
            # empirical coverage at each level, then interpolate back.
            empirical_at_levels = iso.predict(pred._levels)

            out_q = np.empty((pred._n_samples, n_out))
            for j in range(n_out):
                level = self._output_levels[j]

                # Find nominal level such that empirical coverage ≈ level
                # Since both empirical_at_levels and pred._levels are
                # monotone increasing, np.interp gives the inverse.
                if empirical_at_levels[-1] <= empirical_at_levels[0]:
                    # Degenerate isotonic map (should not happen)
                    input_level = level
                else:
                    input_level = float(np.interp(level, empirical_at_levels, pred._levels))

                # Find nearest model quantile to input_level
                nearest_idx = int(np.argmin(np.abs(pred._levels - input_level)))
                out_q[:, j] = q_slice[:, nearest_idx]

            o_start = t_idx * n_out
            o_end = o_start + n_out
            output_matrix[:, o_start:o_end] = out_q

        # Enforce monotone (non-crossing) quantiles. Each output level maps
        # to a model quantile independently, so crossings can occur. Sort
        # WITHIN each target's column slice: sorting the full row
        # (np.sort(output_matrix, axis=1)) would mix quantile values across
        # different targets when n_targets > 1.
        for t_idx in range(n_targets):
            o_start = t_idx * n_out
            o_end = o_start + n_out
            output_matrix[:, o_start:o_end] = np.sort(
                output_matrix[:, o_start:o_end], axis=1
            )

        return DistributionPrediction(
            quantile_matrix=output_matrix,
            quantile_levels=self._output_levels.tolist(),
            target_names=list(self._target_names),
        )

    def _fit_isotonic_map(
        self,
        pred: DistributionPrediction,
        y_arr: np.ndarray,
    ) -> list[IsotonicRegression]:
        """Fit one IsotonicRegression (nominal level -> empirical coverage) per target."""
        n_targets = len(pred._targets)
        regressors: list[IsotonicRegression] = []

        for t_idx in range(n_targets):
            q_start = t_idx * pred._n_quantiles
            q_end = q_start + pred._n_quantiles
            q_slice = pred._quantiles[:, q_start:q_end]
            y_col = y_arr[:, t_idx] if y_arr.ndim == 2 else y_arr

            # Empirical coverage at each nominal quantile level:
            # fraction of observations where true value <= predicted quantile
            empirical_coverage = np.mean(y_col[:, None] <= q_slice, axis=0)

            iso = IsotonicRegression(out_of_bounds="clip")
            iso.fit(pred._levels, empirical_coverage)
            regressors.append(iso)

        return regressors

    @staticmethod
    def _extract_y(
        data: pl.DataFrame,
        target: TargetSpec | None,
        pred: DistributionPrediction,
    ) -> np.ndarray:
        """Extract target column(s) as a (n,) or (n, n_targets) array.

        NOTE(review): ``target`` is accepted for interface symmetry but not
        consulted; the columns read are always the prediction's target names.
        """
        targets = pred._targets
        cols = [to_numpy_series(data[t]) for t in targets]
        if len(cols) == 1:
            return cols[0]
        return np.column_stack(cols)

__init__(model, quantile_levels=None, cross_calibrate=False, n_folds=5, random_state=None)

Parameters:

Name Type Description Default
model BaseUncertaintyModel

A fitted BaseUncertaintyModel to recalibrate.

required
quantile_levels list[float] | None

Quantile levels for the output predictions. Defaults to the inner model's levels.

None
cross_calibrate bool

If True, use K-fold cross-fitting on the calibration data to avoid overfitting the isotonic map.

False
n_folds int

Number of folds when cross_calibrate=True.

5
random_state int | None

Random seed for cross-fitting.

None
Source code in uncertainty_flow/calibration/recalibration.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def __init__(
    self,
    model: BaseUncertaintyModel,
    quantile_levels: list[float] | None = None,
    cross_calibrate: bool = False,
    n_folds: int = 5,
    random_state: int | None = None,
):
    """
    Initialize the recalibration wrapper around a fitted model.

    Args:
        model: A fitted BaseUncertaintyModel to recalibrate.
        quantile_levels: Quantile levels for the output predictions;
            falls back to the inner model's levels when None.
        cross_calibrate: When True, learn the isotonic map via K-fold
            cross-fitting instead of a single held-out pass.
        n_folds: Fold count used when cross_calibrate is enabled.
        random_state: Seed controlling the cross-fitting shuffle.
    """
    # Public configuration, stored exactly as provided.
    self.model = model
    self.quantile_levels = quantile_levels
    self.cross_calibrate = cross_calibrate
    self.n_folds = n_folds
    self.random_state = random_state

    # Private fitted state; fit() fills these in.
    self._fitted = False
    self._target_names: list[str] | None = None
    self._output_levels: np.ndarray | None = None
    self._isotonic_regressors: list[IsotonicRegression] | None = None

fit(data, target=None, **kwargs)

Learn the isotonic recalibration mapping.

Parameters:

Name Type Description Default
data PolarsInput

Calibration dataset. If cross_calibrate=True, this data is split into K folds for cross-fitting.

required
target TargetSpec | None

Target column name(s).

None

Returns:

Type Description
RecalibratedModel

self

Source code in uncertainty_flow/calibration/recalibration.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def fit(
    self,
    data: PolarsInput,
    target: TargetSpec | None = None,
    **kwargs,
) -> RecalibratedModel:
    """
    Learn the isotonic recalibration mapping.

    Args:
        data: Calibration dataset. With cross_calibrate=True it is
            partitioned into K folds for cross-fitting.
        target: Target column name(s).

    Returns:
        self
    """
    frame = materialize_lazyframe(data)

    # Dispatch to the configured fitting strategy.
    strategy = (
        self._fit_cross_calibrated if self.cross_calibrate else self._fit_direct
    )
    strategy(frame, target)

    self._fitted = True
    return self

predict(data)

Generate recalibrated predictions.

Parameters:

Name Type Description Default
data PolarsInput

Input data for prediction.

required

Returns:

Type Description
DistributionPrediction

DistributionPrediction with recalibrated quantile values.

Source code in uncertainty_flow/calibration/recalibration.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
def predict(self, data: PolarsInput) -> DistributionPrediction:
    """
    Generate recalibrated predictions.

    Args:
        data: Input data for prediction.

    Returns:
        DistributionPrediction with recalibrated quantile values.

    Raises:
        ModelNotFittedError: If fit() has not been called.
    """
    if not self._fitted:
        raise ModelNotFittedError("RecalibratedModel")

    data = materialize_lazyframe(data)
    pred = self.model.predict(data)

    if self._isotonic_regressors is None:
        raise RuntimeError("Internal error: isotonic regressors not fitted")
    if self._output_levels is None:
        raise RuntimeError("Internal error: output levels not set")
    if self._target_names is None:
        raise RuntimeError("Internal error: target names not set")

    n_targets = len(pred._targets)
    n_quantiles = pred._n_quantiles
    n_out = len(self._output_levels)

    output_matrix = np.empty((pred._n_samples, n_targets * n_out))

    for t_idx in range(n_targets):
        q_start = t_idx * n_quantiles
        q_end = q_start + n_quantiles
        q_slice = pred._quantiles[:, q_start:q_end]

        iso = self._isotonic_regressors[t_idx]

        # Build inverse mapping: for desired empirical coverage,
        # find the nominal quantile level that produces it.
        # iso maps nominal_level -> empirical_coverage.
        # We evaluate on the model's levels (which are increasing) to get
        # empirical coverage at each level, then interpolate back.
        empirical_at_levels = iso.predict(pred._levels)

        out_q = np.empty((pred._n_samples, n_out))
        for j in range(n_out):
            level = self._output_levels[j]

            # Find nominal level such that empirical coverage ≈ level
            # Since both empirical_at_levels and pred._levels are
            # monotone increasing, np.interp gives the inverse.
            if empirical_at_levels[-1] <= empirical_at_levels[0]:
                # Degenerate isotonic map (should not happen)
                input_level = level
            else:
                input_level = float(np.interp(level, empirical_at_levels, pred._levels))

            # Find nearest model quantile to input_level
            nearest_idx = int(np.argmin(np.abs(pred._levels - input_level)))
            out_q[:, j] = q_slice[:, nearest_idx]

        o_start = t_idx * n_out
        o_end = o_start + n_out
        output_matrix[:, o_start:o_end] = out_q

    # Enforce monotone (non-crossing) quantiles. Each output level maps to a
    # model quantile independently, so crossings can occur. Sort WITHIN each
    # target's column slice: sorting the whole row would mix quantile values
    # across different targets when n_targets > 1.
    for t_idx in range(n_targets):
        o_start = t_idx * n_out
        o_end = o_start + n_out
        output_matrix[:, o_start:o_end] = np.sort(
            output_matrix[:, o_start:o_end], axis=1
        )

    return DistributionPrediction(
        quantile_matrix=output_matrix,
        quantile_levels=self._output_levels.tolist(),
        target_names=list(self._target_names),
    )

calibration_report(model, data, target, quantile_levels=None)

Generate calibration report for a fitted model.

Parameters:

Name Type Description Default
model BaseUncertaintyModel

Fitted uncertainty model

required
data DataFrame

Validation data

required
target str | list[str]

Target column name(s)

required
quantile_levels list[float] | None

Quantile levels to evaluate (default: [0.8, 0.9, 0.95])

None

Returns:

Type Description
DataFrame

Polars DataFrame with calibration metrics:

DataFrame
  • quantile: Quantile level
DataFrame
  • requested_coverage: Requested coverage
DataFrame
  • achieved_coverage: Actual empirical coverage
DataFrame
  • sharpness: Average interval width
DataFrame
  • winkler_score: Winkler interval score

Examples:

>>> from uncertainty_flow.wrappers import ConformalRegressor
>>> from sklearn.ensemble import GradientBoostingRegressor
>>> model = ConformalRegressor(GradientBoostingRegressor())
>>> model.fit(train_df, target="price")
>>> report = model.calibration_report(val_df, target="price")
>>> print(report)
Source code in uncertainty_flow/utils/calibration_utils.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def calibration_report(
    model: "BaseUncertaintyModel",
    data: pl.DataFrame,
    target: str | list[str],
    quantile_levels: list[float] | None = None,
) -> pl.DataFrame:
    """
    Generate calibration report for a fitted model.

    Args:
        model: Fitted uncertainty model
        data: Validation data
        target: Target column name(s)
        quantile_levels: Quantile levels to evaluate (default: [0.8, 0.9, 0.95])

    Returns:
        Polars DataFrame with calibration metrics (averaged across targets
        for multivariate models):
        - quantile: Quantile level
        - requested_coverage: Requested coverage
        - achieved_coverage: Actual empirical coverage
        - sharpness: Average interval width
        - winkler_score: Winkler interval score

    Examples:
        >>> from uncertainty_flow.wrappers import ConformalRegressor
        >>> from sklearn.ensemble import GradientBoostingRegressor
        >>> model = ConformalRegressor(GradientBoostingRegressor())
        >>> model.fit(train_df, target="price")
        >>> report = model.calibration_report(val_df, target="price")
        >>> print(report)
    """
    if quantile_levels is None:
        quantile_levels = [0.8, 0.9, 0.95]

    # Normalize to a list of target column names.
    targets = [target] if isinstance(target, str) else target
    n_targets = len(targets)

    results = []
    prediction = model.predict(data)

    for level in quantile_levels:
        intervals = prediction.interval(confidence=level)

        # Per-level accumulators, averaged over targets below.
        # (The previous implementation also built a per-target dict of
        # achieved coverages that was never read; that dead accumulator
        # has been removed.)
        sharpness_sum: float = 0.0
        winkler_sum: float = 0.0
        total_coverage: float = 0.0

        for t in targets:
            actuals = data[t]
            # Single-target intervals use plain "lower"/"upper" columns;
            # multivariate intervals prefix them with the target name.
            if n_targets == 1:
                lower = intervals["lower"]
                upper = intervals["upper"]
            else:
                lower = intervals[f"{t}_lower"]
                upper = intervals[f"{t}_upper"]

            achieved = coverage_score(actuals, lower, upper)
            sharpness_values = to_numpy_series(upper - lower)
            sharpness = float(np.mean(sharpness_values))
            winkler = winkler_score(actuals, lower, upper, level)

            sharpness_sum += sharpness
            winkler_sum += winkler
            total_coverage += achieved

        # Average across targets
        avg_coverage = total_coverage / n_targets
        avg_sharpness = sharpness_sum / n_targets
        avg_winkler = winkler_sum / n_targets

        results.append(
            {
                "quantile": level,
                "requested_coverage": level,
                "achieved_coverage": avg_coverage,
                "sharpness": avg_sharpness,
                "winkler_score": avg_winkler,
            }
        )

        # Warn if coverage gap > 5%
        if abs(avg_coverage - level) > 0.05:
            warnings.warn(
                f"Requested {level} coverage but achieved {avg_coverage:.2f}. "
                f"Model may be miscalibrated. [UF-W003]",
                UncertaintyFlowWarning,
                stacklevel=3,
            )

    return pl.DataFrame(results)

compute_uncertainty_drivers(residuals, features)

Compute correlation between features and squared residuals.

Features with high correlation to squared residuals indicate heteroscedasticity - they drive uncertainty in predictions.

Parameters:

Name Type Description Default
residuals ndarray

Model residuals (y_true - y_pred)

required
features DataFrame

Feature DataFrame

required

Returns:

Type Description
DataFrame

Polars DataFrame with columns:

DataFrame
  • feature: Feature name
DataFrame
  • residual_correlation: Pearson correlation with squared residuals
DataFrame
  • p_value: Statistical significance
DataFrame

Sorted by absolute correlation descending.

Examples:

>>> import numpy as np
>>> import polars as pl
>>> residuals = np.array([1, -2, 3, -1, 2])
>>> features = pl.DataFrame({
...     "a": [1, 2, 3, 4, 5],
...     "b": [5, 4, 3, 2, 1],
... })
>>> compute_uncertainty_drivers(residuals, features)
shape: (2, 3)
┌─────────┬──────────────────────┬─────────┐
│ feature ┆ residual_correlation ┆ p_value │
│ ---     ┆ ---                  ┆ ---     │
│ str     ┆ f64                  ┆ f64     │
╞═════════╪══════════════════════╪═════════╡
│ a       ┆ 0.89                 ┆ 0.11    │
│ b       ┆ -0.89                ┆ 0.11    │
└─────────┴──────────────────────┴─────────┘
Source code in uncertainty_flow/calibration/residual_analysis.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def compute_uncertainty_drivers(
    residuals: np.ndarray,
    features: pl.DataFrame,
) -> pl.DataFrame:
    """
    Compute correlation between features and squared residuals.

    Features with high correlation to squared residuals indicate
    heteroscedasticity - they drive uncertainty in predictions.

    Args:
        residuals: Model residuals (y_true - y_pred)
        features: Feature DataFrame

    Returns:
        Polars DataFrame with columns:
        - feature: Feature name
        - residual_correlation: Pearson correlation with squared residuals
        - p_value: Statistical significance

        Sorted by absolute correlation descending.

    Examples:
        >>> import numpy as np
        >>> import polars as pl
        >>> residuals = np.array([1, -2, 3, -1, 2])
        >>> features = pl.DataFrame({
        ...     "a": [1, 2, 3, 4, 5],
        ...     "b": [5, 4, 3, 2, 1],
        ... })
        >>> compute_uncertainty_drivers(residuals, features)
        shape: (2, 3)
        ┌─────────┬──────────────────────┬─────────┐
        │ feature ┆ residual_correlation ┆ p_value │
        │ ---     ┆ ---                  ┆ ---     │
        │ str     ┆ f64                  ┆ f64     │
        ╞═════════╪══════════════════════╪═════════╡
        │ a       ┆ 0.89                 ┆ 0.11    │
        │ b       ┆ -0.89                ┆ 0.11    │
        └─────────┴──────────────────────┴─────────┘
    """
    squared_residuals = residuals**2

    results = []
    for col in features.columns:
        feature_vals = features[col].to_numpy()

        # Skip constant features
        if np.std(feature_vals) == 0:
            continue

        # Compute correlation
        corr, p_value = stats.pearsonr(feature_vals, squared_residuals)

        results.append(
            {
                "feature": col,
                "residual_correlation": corr,
                "p_value": p_value,
            }
        )

    if not results:
        return pl.DataFrame(
            schema={
                "feature": pl.String,
                "residual_correlation": pl.Float64,
                "p_value": pl.Float64,
            }
        )

    df = pl.DataFrame(results)
    # Sort by |correlation| descending (as documented); sorting on the raw
    # signed value would push strong negative drivers to the bottom.
    df = df.sort(pl.col("residual_correlation").abs(), descending=True)

    # Warn if no significant drivers found
    if df.filter(pl.col("p_value") < 0.05).height == 0:
        warnings.warn(
            "Residual correlation analysis found no significant drivers. "
            "Intervals may be uniformly conservative. [UF-W004]",
            UncertaintyFlowWarning,
            stacklevel=3,
        )

    return df

uncertainty_shap(model, X, background=None, quantile_pairs=None)

Compute SHAP values for quantile intervals to identify interval width drivers.

Runs SHAP on the upper and lower quantile predictions separately, then computes the difference to identify what drives interval width.

Parameters:

Name Type Description Default
model

Fitted uncertainty model with predict() returning DistributionPrediction

required
X DataFrame

Feature DataFrame for SHAP evaluation

required
background DataFrame | None

Background dataset for SHAP explanation. If None, uses X[:100] as suggested in roadmap.

None
quantile_pairs list[tuple[float, float]] | None

List of (lower, upper) quantile pairs to analyze. Defaults to [(0.1, 0.9), (0.05, 0.95)].

None

Returns:

Type Description
DataFrame

Polars DataFrame with columns:

DataFrame
  • feature: Feature name
DataFrame
  • quantile_level: e.g., "Q0.1", "Q0.9"
DataFrame
  • shap_value: Mean absolute SHAP value
DataFrame
  • interval_width_contribution: Difference between upper and lower SHAP

Examples:

>>> import numpy as np
>>> import polars as pl
>>> from uncertainty_flow import DeepQuantileNet
>>> from uncertainty_flow.calibration import uncertainty_shap
>>>
>>> np.random.seed(42)
>>> n = 100
>>> df = pl.DataFrame({
...     "x1": np.random.randn(n),
...     "x2": np.random.randn(n),
...     "y": 3 * np.random.randn(n) + 5,
... })
>>> model = DeepQuantileNet(random_state=42)
>>> model.fit(df, target="y")
>>> X = df.select(["x1", "x2"])
>>> shap_df = uncertainty_shap(model, X)
>>> shap_df
Source code in uncertainty_flow/calibration/shap_values.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def uncertainty_shap(
    model,
    X: pl.DataFrame,  # noqa: N803
    background: pl.DataFrame | None = None,
    quantile_pairs: list[tuple[float, float]] | None = None,
) -> pl.DataFrame:
    """
    Compute SHAP values for quantile intervals to identify interval width drivers.

    Runs SHAP on the upper and lower quantile predictions separately,
    then computes the difference to identify what drives interval width.

    Args:
        model: Fitted uncertainty model with predict() returning DistributionPrediction
        X: Feature DataFrame for SHAP evaluation
        background: Background dataset for SHAP explanation.
                   If None, uses X[:100] as suggested in roadmap.
        quantile_pairs: List of (lower, upper) quantile pairs to analyze.
                       Defaults to [(0.1, 0.9), (0.05, 0.95)].

    Returns:
        Polars DataFrame with columns:
        - feature: Feature name
        - quantile_level: e.g., "Q0.1", "Q0.9"
        - shap_value: Mean absolute SHAP value
        - interval_width_contribution: Difference between upper and lower SHAP

    Raises:
        ImportError: If the optional ``shap`` dependency is not installed.

    Examples:
        >>> import numpy as np
        >>> import polars as pl
        >>> from uncertainty_flow import DeepQuantileNet
        >>> from uncertainty_flow.calibration import uncertainty_shap
        >>>
        >>> np.random.seed(42)
        >>> n = 100
        >>> df = pl.DataFrame({
        ...     "x1": np.random.randn(n),
        ...     "x2": np.random.randn(n),
        ...     "y": 3 * np.random.randn(n) + 5,
        ... })
        >>> model = DeepQuantileNet(random_state=42)
        >>> model.fit(df, target="y")
        >>> X = df.select(["x1", "x2"])
        >>> shap_df = uncertainty_shap(model, X)
        >>> shap_df
    """
    try:
        import shap
    except ImportError as exc:
        # Chain the original error so the failing import path is visible.
        raise ImportError(
            "shap is required for uncertainty_shap. "
            "Install with: pip install 'uncertainty-flow[ml]'"
        ) from exc

    if quantile_pairs is None:
        quantile_pairs = [(0.1, 0.9), (0.05, 0.95)]

    if background is None:
        background = X.head(100)

    background_np = to_numpy(background, background.columns)
    x_np = to_numpy(X, X.columns)
    feature_names = X.columns

    # One cheap prediction to discover the target layout.
    pred = model.predict(X.head(1))
    n_targets = len(pred.target_names)

    results = []

    for target_idx, target_name in enumerate(pred.target_names):
        for lower_q, upper_q in quantile_pairs:

            # Loop variables are bound as defaults so each closure captures
            # this iteration's values explicitly (defends against Python's
            # late-binding closure semantics).
            def lower_quantile_model(x, _q=lower_q, _t=target_idx):
                preds = model.predict(pl.DataFrame(x, schema=X.columns))
                q_matrix = preds._quantiles
                lower_idx = preds._find_nearest_quantile_index(_q)

                if n_targets == 1:
                    return q_matrix[:, lower_idx]
                else:
                    start = _t * preds._n_quantiles
                    return q_matrix[:, start + lower_idx]

            def upper_quantile_model(x, _q=upper_q, _t=target_idx):
                preds = model.predict(pl.DataFrame(x, schema=X.columns))
                q_matrix = preds._quantiles
                upper_idx = preds._find_nearest_quantile_index(_q)

                if n_targets == 1:
                    return q_matrix[:, upper_idx]
                else:
                    start = _t * preds._n_quantiles
                    return q_matrix[:, start + upper_idx]

            try:
                lower_explainer = shap.KernelExplainer(lower_quantile_model, background_np)
                lower_shap = lower_explainer.shap_values(x_np)

                upper_explainer = shap.KernelExplainer(upper_quantile_model, background_np)
                upper_shap = upper_explainer.shap_values(x_np)

                for feat_idx, feat_name in enumerate(feature_names):
                    lower_vals = lower_shap[:, feat_idx]
                    upper_vals = upper_shap[:, feat_idx]

                    results.append(
                        {
                            "feature": feat_name,
                            "target": target_name,
                            f"shap_Q{lower_q}": float(np.mean(np.abs(lower_vals))),
                            f"shap_Q{upper_q}": float(np.mean(np.abs(upper_vals))),
                            "interval_width_contribution": float(
                                np.mean(np.abs(upper_vals - lower_vals))
                            ),
                        }
                    )
            except (ValueError, RuntimeError, np.linalg.LinAlgError):
                # Best-effort: skip pairs where KernelExplainer fails.
                continue

    if not results:
        # Empty schema derived from the first requested pair so the column
        # names match what a non-empty result would have used (previously
        # hard-coded to Q0.1/Q0.9 regardless of quantile_pairs).
        first_lower, first_upper = quantile_pairs[0]
        return pl.DataFrame(
            schema={
                "feature": pl.String,
                "target": pl.String,
                f"shap_Q{first_lower}": pl.Float64,
                f"shap_Q{first_upper}": pl.Float64,
                "interval_width_contribution": pl.Float64,
            }
        )

    return pl.DataFrame(results)