Skip to content

Risk

uncertainty_flow.risk

Conformal risk control for arbitrary risk functions.

ConformalRiskControl

Conformal prediction for controlling arbitrary risk functions.

Unlike traditional conformal prediction that controls coverage probability, this class controls expected risk for arbitrary user-defined risk functions.

Parameters

base_model : BaseUncertaintyModel Fitted uncertainty model with predict() method risk_function : Callable[[np.ndarray, np.ndarray], np.ndarray] Risk function that takes (y_true, y_pred) and returns risk values target_risk : float, default=0.1 Target expected risk level to control calibration_method : str, default="quantile" Method for computing risk threshold ("quantile" or "mean") random_state : int, optional Random seed for reproducibility

Examples

import polars as pl from sklearn.ensemble import GradientBoostingRegressor from uncertainty_flow.wrappers import ConformalRegressor from uncertainty_flow.risk import ConformalRiskControl, asymmetric_loss

Base model

base_model = GradientBoostingRegressor(random_state=42) conformal_model = ConformalRegressor(base_model) conformal_model.fit(train_data, target="y")

Wrap with risk control

risk_model = ConformalRiskControl( ... base_model=conformal_model, ... risk_function=asymmetric_loss(underprediction_penalty=2.0), ... target_risk=0.1, ... )

Predictions are risk-calibrated

pred = risk_model.predict(test_data)

Source code in uncertainty_flow/risk/control.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
class ConformalRiskControl:
    """
    Conformal prediction for controlling arbitrary risk functions.

    Unlike traditional conformal prediction that controls coverage probability,
    this class controls expected risk for arbitrary user-defined risk functions.

    Parameters
    ----------
    base_model : BaseUncertaintyModel
        Fitted uncertainty model with predict() method
    risk_function : Callable[[np.ndarray, np.ndarray], np.ndarray]
        Risk function that takes (y_true, y_pred) and returns risk values
    target_risk : float, default=0.1
        Target expected risk level to control
    calibration_method : str, default="quantile"
        Method for computing risk threshold ("quantile" or "mean")
    random_state : int, optional
        Random seed for reproducibility

    Examples
    --------
    >>> import polars as pl
    >>> from sklearn.ensemble import GradientBoostingRegressor
    >>> from uncertainty_flow.wrappers import ConformalRegressor
    >>> from uncertainty_flow.risk import ConformalRiskControl, asymmetric_loss
    >>>
    >>> # Base model
    >>> base_model = GradientBoostingRegressor(random_state=42)
    >>> conformal_model = ConformalRegressor(base_model)
    >>> conformal_model.fit(train_data, target="y")
    >>>
    >>> # Wrap with risk control
    >>> risk_model = ConformalRiskControl(
    ...     base_model=conformal_model,
    ...     risk_function=asymmetric_loss(underprediction_penalty=2.0),
    ...     target_risk=0.1,
    ... )
    >>>
    >>> # Predictions are risk-calibrated
    >>> pred = risk_model.predict(test_data)
    """

    def __init__(
        self,
        base_model: "BaseUncertaintyModel",
        risk_function: Callable[[np.ndarray, np.ndarray], np.ndarray],
        target_risk: float = 0.1,
        calibration_method: str = "quantile",
        random_state: int | None = None,
    ):
        self.base_model = base_model
        self.risk_function = risk_function
        self.target_risk = target_risk
        self.calibration_method = calibration_method
        self.random_state = random_state

        # Fitted attributes
        self._risk_threshold: float | None = None
        self._calibration_risks: np.ndarray | None = None
        self._calibration_proxy: np.ndarray | None = None
        self._proxy_grid: np.ndarray | None = None
        self._risk_curve: np.ndarray | None = None

    def fit(
        self,
        data: pl.DataFrame,
        target: str,
    ) -> "ConformalRiskControl":
        """
        Fit risk calibration using calibration data.

        Computes risk threshold needed to achieve target risk level.

        Args:
            data: Calibration data with features and target
            target: Target column name

        Returns
        -------
        self
            Fitted ConformalRiskControl instance
        """
        from ..utils.polars_bridge import to_numpy

        features = data.drop(target)
        y_true = to_numpy(data, [target]).flatten()

        prediction = self.base_model.predict(features)
        y_pred = _prediction_mean(prediction)
        proxy = _interval_half_width(prediction)

        risks = self.risk_function(y_true, y_pred)
        if risks.shape != proxy.shape:
            raise ValueError(
                "risk_function must return one scalar risk value per sample. "
                f"Got shape {risks.shape}, expected {proxy.shape}."
            )

        metric_fn = self._risk_metric_fn()
        order = np.argsort(proxy)
        sorted_proxy = proxy[order]
        sorted_risks = risks[order]
        unique_proxy, unique_idx = np.unique(sorted_proxy, return_index=True)

        risk_curve = np.empty_like(unique_proxy, dtype=float)
        threshold = float(unique_proxy[0])

        for i, start_idx in enumerate(unique_idx):
            accepted_risks = sorted_risks[: start_idx + 1]
            metric_value = float(metric_fn(accepted_risks))
            risk_curve[i] = metric_value
            if metric_value <= self.target_risk:
                threshold = float(unique_proxy[i])

        self._calibration_risks = risks
        self._calibration_proxy = proxy
        self._proxy_grid = unique_proxy
        self._risk_curve = risk_curve
        self._risk_threshold = threshold

        if self._risk_threshold is None:
            raise ValueError(
                f"Unknown calibration_method: {self.calibration_method}. Use 'quantile' or 'mean'."
            )

        return self

    def predict(
        self,
        data: pl.DataFrame,
    ) -> pl.DataFrame:
        """
        Generate risk-calibrated predictions.

        Args:
            data: Feature DataFrame for prediction

        Returns
        -------
        pl.DataFrame
            DataFrame with columns:
                - prediction: Point predictions
                - risk: Expected risk for each prediction
                - exceeds_threshold: Whether risk exceeds calibrated threshold

        Raises
        ------
        InvalidDataError
            If model has not been fitted
        """
        if self._risk_threshold is None:
            raise InvalidDataError(
                "ConformalRiskControl must be fitted before prediction. Call fit() first."
            )

        prediction = self.base_model.predict(data)
        y_pred = _prediction_mean(prediction)
        proxy = _interval_half_width(prediction)
        risks = self._estimate_risk(proxy)

        return pl.DataFrame(
            {
                "prediction": y_pred,
                "risk": risks,
                "exceeds_threshold": proxy > self._risk_threshold,
            }
        )

    def _risk_metric_fn(self) -> Callable[[np.ndarray], float]:
        """Return the calibration metric used to map proxy to realized risk."""
        if self.calibration_method == "mean":
            return lambda values: float(np.mean(values))
        if self.calibration_method == "quantile":
            quantile_level = min(max(1 - self.target_risk, 0.5), 0.99)
            return lambda values: float(np.quantile(values, quantile_level))

        raise ValueError(
            f"Unknown calibration_method: {self.calibration_method}. Use 'quantile' or 'mean'."
        )

    def _estimate_risk(self, proxy: np.ndarray) -> np.ndarray:
        """Estimate realized risk from the calibration proxy curve."""
        if self._proxy_grid is None or self._risk_curve is None:
            raise InvalidDataError(
                "ConformalRiskControl must be fitted before prediction. Call fit() first."
            )

        risk_curve = self._risk_curve
        proxy_grid = self._proxy_grid
        assert risk_curve is not None and proxy_grid is not None
        return np.interp(
            proxy,
            proxy_grid,
            risk_curve,
            left=float(risk_curve[0]),
            right=float(risk_curve[-1]),
        )

    def risk_threshold(self) -> float:
        """
        Return the calibrated risk threshold.

        Returns
        -------
        float
            Risk threshold used for predictions

        Raises
        ------
        InvalidDataError
            If model has not been fitted
        """
        if self._risk_threshold is None:
            raise InvalidDataError(
                "ConformalRiskControl must be fitted before accessing risk_threshold. "
                "Call fit() first."
            )
        return self._risk_threshold

    def summary(self) -> dict[str, object]:
        """
        Return summary of the risk control configuration.

        Returns
        -------
        dict
            Dictionary with configuration and calibration results
        """
        return {
            "target_risk": self.target_risk,
            "calibration_method": self.calibration_method,
            "risk_threshold": self._risk_threshold,
            "n_calibration_samples": (
                len(self._calibration_risks) if self._calibration_risks is not None else None
            ),
        }

fit(data, target)

Fit risk calibration using calibration data.

Computes risk threshold needed to achieve target risk level.

Parameters:

Name Type Description Default
data DataFrame

Calibration data with features and target

required
target str

Target column name

required
Returns

self Fitted ConformalRiskControl instance

Source code in uncertainty_flow/risk/control.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def fit(
    self,
    data: pl.DataFrame,
    target: str,
) -> "ConformalRiskControl":
    """
    Fit risk calibration using calibration data.

    Computes risk threshold needed to achieve target risk level.

    Args:
        data: Calibration data with features and target
        target: Target column name

    Returns
    -------
    self
        Fitted ConformalRiskControl instance
    """
    from ..utils.polars_bridge import to_numpy

    features = data.drop(target)
    y_true = to_numpy(data, [target]).flatten()

    prediction = self.base_model.predict(features)
    y_pred = _prediction_mean(prediction)
    proxy = _interval_half_width(prediction)

    risks = self.risk_function(y_true, y_pred)
    if risks.shape != proxy.shape:
        raise ValueError(
            "risk_function must return one scalar risk value per sample. "
            f"Got shape {risks.shape}, expected {proxy.shape}."
        )

    metric_fn = self._risk_metric_fn()
    order = np.argsort(proxy)
    sorted_proxy = proxy[order]
    sorted_risks = risks[order]
    unique_proxy, unique_idx = np.unique(sorted_proxy, return_index=True)

    risk_curve = np.empty_like(unique_proxy, dtype=float)
    threshold = float(unique_proxy[0])

    for i, start_idx in enumerate(unique_idx):
        accepted_risks = sorted_risks[: start_idx + 1]
        metric_value = float(metric_fn(accepted_risks))
        risk_curve[i] = metric_value
        if metric_value <= self.target_risk:
            threshold = float(unique_proxy[i])

    self._calibration_risks = risks
    self._calibration_proxy = proxy
    self._proxy_grid = unique_proxy
    self._risk_curve = risk_curve
    self._risk_threshold = threshold

    if self._risk_threshold is None:
        raise ValueError(
            f"Unknown calibration_method: {self.calibration_method}. Use 'quantile' or 'mean'."
        )

    return self

predict(data)

Generate risk-calibrated predictions.

Parameters:

Name Type Description Default
data DataFrame

Feature DataFrame for prediction

required
Returns

pl.DataFrame DataFrame with columns: - prediction: Point predictions - risk: Expected risk for each prediction - exceeds_threshold: Whether risk exceeds calibrated threshold

Raises

InvalidDataError If model has not been fitted

Source code in uncertainty_flow/risk/control.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
def predict(
    self,
    data: pl.DataFrame,
) -> pl.DataFrame:
    """
    Generate risk-calibrated predictions.

    Args:
        data: Feature DataFrame for prediction

    Returns
    -------
    pl.DataFrame
        DataFrame with columns:
            - prediction: Point predictions
            - risk: Expected risk for each prediction
            - exceeds_threshold: Whether risk exceeds calibrated threshold

    Raises
    ------
    InvalidDataError
        If model has not been fitted
    """
    if self._risk_threshold is None:
        raise InvalidDataError(
            "ConformalRiskControl must be fitted before prediction. Call fit() first."
        )

    prediction = self.base_model.predict(data)
    y_pred = _prediction_mean(prediction)
    proxy = _interval_half_width(prediction)
    risks = self._estimate_risk(proxy)

    return pl.DataFrame(
        {
            "prediction": y_pred,
            "risk": risks,
            "exceeds_threshold": proxy > self._risk_threshold,
        }
    )

risk_threshold()

Return the calibrated risk threshold.

Returns

float Risk threshold used for predictions

Raises

InvalidDataError If model has not been fitted

Source code in uncertainty_flow/risk/control.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
def risk_threshold(self) -> float:
    """
    Return the calibrated risk threshold.

    Returns
    -------
    float
        Risk threshold used for predictions

    Raises
    ------
    InvalidDataError
        If model has not been fitted
    """
    if self._risk_threshold is None:
        raise InvalidDataError(
            "ConformalRiskControl must be fitted before accessing risk_threshold. "
            "Call fit() first."
        )
    return self._risk_threshold

summary()

Return summary of the risk control configuration.

Returns

dict Dictionary with configuration and calibration results

Source code in uncertainty_flow/risk/control.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
def summary(self) -> dict[str, object]:
    """
    Return summary of the risk control configuration.

    Returns
    -------
    dict
        Dictionary with configuration and calibration results
    """
    return {
        "target_risk": self.target_risk,
        "calibration_method": self.calibration_method,
        "risk_threshold": self._risk_threshold,
        "n_calibration_samples": (
            len(self._calibration_risks) if self._calibration_risks is not None else None
        ),
    }

asymmetric_loss(overprediction_penalty=1.0, underprediction_penalty=2.0)

Asymmetric loss function for different penalties on over/under prediction.

Useful when overpredictions and underpredictions have different costs.

Parameters

overprediction_penalty : float, default=1.0 Penalty coefficient for overpredictions (pred > true) underprediction_penalty : float, default=2.0 Penalty coefficient for underpredictions (pred < true)

Returns

Callable Risk function that computes asymmetric loss

Examples

import numpy as np from uncertainty_flow.risk import asymmetric_loss

risk_fn = asymmetric_loss(overprediction_penalty=1.0, underprediction_penalty=2.0) y_true = np.array([10, 20, 30]) y_pred = np.array([12, 18, 32]) risk = risk_fn(y_true, y_pred)

Source code in uncertainty_flow/risk/risk_functions.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def asymmetric_loss(
    overprediction_penalty: float = 1.0,
    underprediction_penalty: float = 2.0,
) -> Callable[[np.ndarray, np.ndarray], np.ndarray]:
    """
    Asymmetric loss function for different penalties on over/under prediction.

    Useful when overpredictions and underpredictions have different costs.

    Parameters
    ----------
    overprediction_penalty : float, default=1.0
        Penalty coefficient for overpredictions (pred > true)
    underprediction_penalty : float, default=2.0
        Penalty coefficient for underpredictions (pred < true)

    Returns
    -------
    Callable
        Risk function that computes asymmetric loss

    Examples
    --------
    >>> import numpy as np
    >>> from uncertainty_flow.risk import asymmetric_loss
    >>>
    >>> risk_fn = asymmetric_loss(overprediction_penalty=1.0, underprediction_penalty=2.0)
    >>> y_true = np.array([10, 20, 30])
    >>> y_pred = np.array([12, 18, 32])
    >>> risk = risk_fn(y_true, y_pred)
    """

    def _risk(y_true: np.ndarray, y_pred: np.ndarray) -> np.ndarray:
        errors = y_pred - y_true
        loss = np.where(
            errors > 0,
            overprediction_penalty * errors,
            -underprediction_penalty * errors,
        )
        return loss

    return _risk

financial_var(var_level=0.95, excess_penalty=10.0)

Financial Value-at-Risk (VaR) style risk function.

Penalizes predictions that exceed VaR threshold.

Parameters

var_level : float, default=0.95 VaR confidence level (e.g., 0.95 for 95% VaR) excess_penalty : float, default=10.0 Multiplier for excess loss beyond VaR threshold

Returns

Callable Risk function that computes VaR-based penalty

Examples

import numpy as np from uncertainty_flow.risk import financial_var

risk_fn = financial_var(var_level=0.95) y_true = np.array([100, 100, 100]) y_pred = np.array([95, 105, 120]) risk = risk_fn(y_true, y_pred)

Source code in uncertainty_flow/risk/risk_functions.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def financial_var(
    var_level: float = 0.95,
    excess_penalty: float = 10.0,
) -> Callable[[np.ndarray, np.ndarray], np.ndarray]:
    """
    Financial Value-at-Risk (VaR) style risk function.

    Penalizes predictions that exceed VaR threshold.

    Parameters
    ----------
    var_level : float, default=0.95
        VaR confidence level (e.g., 0.95 for 95% VaR)
    excess_penalty : float, default=10.0
        Multiplier for excess loss beyond VaR threshold

    Returns
    -------
    Callable
        Risk function that computes VaR-based penalty

    Examples
    --------
    >>> import numpy as np
    >>> from uncertainty_flow.risk import financial_var
    >>>
    >>> risk_fn = financial_var(var_level=0.95)
    >>> y_true = np.array([100, 100, 100])
    >>> y_pred = np.array([95, 105, 120])
    >>> risk = risk_fn(y_true, y_pred)
    """

    def _risk(y_true: np.ndarray, y_pred: np.ndarray) -> np.ndarray:
        losses = np.abs(y_true - y_pred)
        var_threshold = np.quantile(losses, var_level)
        excess_loss = np.maximum(losses - var_threshold, 0)
        return losses + excess_penalty * excess_loss

    return _risk

inventory_cost(holding_cost=1.0, stockout_cost=10.0)

Inventory management cost function.

Models the cost of holding excess inventory vs. stockouts. Useful for demand forecasting optimization.

Parameters

holding_cost : float, default=1.0 Cost per unit of overpredicted demand (excess inventory) stockout_cost : float, default=10.0 Cost per unit of underpredicted demand (stockout)

Returns

Callable Risk function that computes inventory cost

Examples

import numpy as np from uncertainty_flow.risk import inventory_cost

risk_fn = inventory_cost(holding_cost=1.0, stockout_cost=10.0) demand = np.array([100, 150, 200]) forecast = np.array([110, 140, 210]) cost = risk_fn(demand, forecast)

Source code in uncertainty_flow/risk/risk_functions.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def inventory_cost(
    holding_cost: float = 1.0,
    stockout_cost: float = 10.0,
) -> Callable[[np.ndarray, np.ndarray], np.ndarray]:
    """
    Inventory management cost function.

    Models the cost of holding excess inventory vs. stockouts.
    Useful for demand forecasting optimization.

    Parameters
    ----------
    holding_cost : float, default=1.0
        Cost per unit of overpredicted demand (excess inventory)
    stockout_cost : float, default=10.0
        Cost per unit of underpredicted demand (stockout)

    Returns
    -------
    Callable
        Risk function that computes inventory cost

    Examples
    --------
    >>> import numpy as np
    >>> from uncertainty_flow.risk import inventory_cost
    >>>
    >>> risk_fn = inventory_cost(holding_cost=1.0, stockout_cost=10.0)
    >>> demand = np.array([100, 150, 200])
    >>> forecast = np.array([110, 140, 210])
    >>> cost = risk_fn(demand, forecast)
    """

    def _risk(y_true: np.ndarray, y_pred: np.ndarray) -> np.ndarray:
        # Overprediction: holding cost for excess inventory
        over = np.maximum(y_pred - y_true, 0) * holding_cost
        # Underprediction: stockout cost for missed demand
        under = np.maximum(y_true - y_pred, 0) * stockout_cost
        return over + under

    return _risk

threshold_penalty(threshold, penalty_above=10.0, penalty_below=1.0)

Threshold-based penalty function.

Applies higher penalty when error exceeds threshold.

Parameters

threshold : float Error threshold for penalty escalation penalty_above : float, default=10.0 Penalty when error exceeds threshold penalty_below : float, default=1.0 Base penalty when error is within threshold

Returns

Callable Risk function that computes threshold penalty

Examples

import numpy as np from uncertainty_flow.risk import threshold_penalty

risk_fn = threshold_penalty(threshold=5.0) y_true = np.array([100, 100, 100]) y_pred = np.array([95, 105, 120]) risk = risk_fn(y_true, y_pred)

Source code in uncertainty_flow/risk/risk_functions.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def threshold_penalty(
    threshold: float,
    penalty_above: float = 10.0,
    penalty_below: float = 1.0,
) -> Callable[[np.ndarray, np.ndarray], np.ndarray]:
    """
    Threshold-based penalty function.

    Applies higher penalty when error exceeds threshold.

    Parameters
    ----------
    threshold : float
        Error threshold for penalty escalation
    penalty_above : float, default=10.0
        Penalty when error exceeds threshold
    penalty_below : float, default=1.0
        Base penalty when error is within threshold

    Returns
    -------
    Callable
        Risk function that computes threshold penalty

    Examples
    --------
    >>> import numpy as np
    >>> from uncertainty_flow.risk import threshold_penalty
    >>>
    >>> risk_fn = threshold_penalty(threshold=5.0)
    >>> y_true = np.array([100, 100, 100])
    >>> y_pred = np.array([95, 105, 120])
    >>> risk = risk_fn(y_true, y_pred)
    """

    def _risk(y_true: np.ndarray, y_pred: np.ndarray) -> np.ndarray:
        errors = np.abs(y_true - y_pred)
        loss = np.where(
            errors > threshold,
            penalty_above * (errors - threshold),
            penalty_below * errors,
        )
        return loss

    return _risk