Metrics

uncertainty_flow.metrics

Metrics for evaluating probabilistic predictions.

calibration_error(y_true, lower, upper, nominal_coverage=0.9)

Absolute deviation of empirical coverage from nominal coverage.

Parameters:

    y_true (Series | ndarray): True values. Required.
    lower (Series | ndarray): Lower bound of prediction interval. Required.
    upper (Series | ndarray): Upper bound of prediction interval. Required.
    nominal_coverage (float): Target coverage level (e.g. 0.9). Default: 0.9.

Returns:

    float: Absolute calibration error. Lower is better; 0 means perfectly calibrated.

Source code in uncertainty_flow/metrics/calibration.py
def calibration_error(
    y_true: pl.Series | np.ndarray,
    lower: pl.Series | np.ndarray,
    upper: pl.Series | np.ndarray,
    nominal_coverage: float = 0.9,
) -> float:
    """
    Absolute deviation of empirical coverage from nominal coverage.

    Args:
        y_true: True values
        lower: Lower bound of prediction interval
        upper: Upper bound of prediction interval
        nominal_coverage: Target coverage level (e.g. 0.9)

    Returns:
        Absolute calibration error (float). Lower is better. 0 = perfectly calibrated.
    """
    y_true, lower, upper = as_numpy(y_true, lower, upper)

    empirical_coverage = np.mean((y_true >= lower) & (y_true <= upper))

    return float(np.abs(empirical_coverage - nominal_coverage))
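
A minimal usage sketch. The direct import from uncertainty_flow.metrics is an assumption (the function is defined in uncertainty_flow/metrics/calibration.py):

import numpy as np
from uncertainty_flow.metrics import calibration_error  # assumed export location

rng = np.random.default_rng(0)
y_true = rng.normal(size=1000)

# Deliberately narrow intervals: +/- 1 sigma covers only ~68% of standard-normal
# draws, so empirical coverage falls well short of the 90% nominal target.
lower = np.full(1000, -1.0)
upper = np.full(1000, 1.0)

print(calibration_error(y_true, lower, upper, nominal_coverage=0.9))
# roughly 0.22, i.e. |0.68 - 0.90|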

diebold_mariano_test(errors_a, errors_b, one_sided=True)

Diebold-Mariano test for equal predictive accuracy.

Tests the null hypothesis that two sets of forecast errors have equal expected loss.

Parameters:

    errors_a (ndarray): Per-sample errors from model A. Required.
    errors_b (ndarray): Per-sample errors from model B. Required.
    one_sided (bool): If True (default), tests A < B (A has lower loss). If False, performs a two-sided test for equal loss. Default: True.

Returns:

    DataFrame: Polars DataFrame with columns dm_statistic, p_value, result (reject or not), better_model.

Source code in uncertainty_flow/metrics/comparison.py
def diebold_mariano_test(
    errors_a: np.ndarray,
    errors_b: np.ndarray,
    one_sided: bool = True,
) -> pl.DataFrame:
    """Diebold-Mariano test for equal predictive accuracy.

    Tests the null hypothesis that two sets of forecast errors have equal
    expected loss.

    Args:
        errors_a: Per-sample errors from model A.
        errors_b: Per-sample errors from model B.
        one_sided: If True (default), tests A < B (A has lower loss).
            If False, two-sided test for equal loss.

    Returns:
        Polars DataFrame with columns ``dm_statistic``, ``p_value``,
        ``result`` (reject or not), ``better_model``.
    """
    errors_a = np.asarray(errors_a, dtype=float).ravel()
    errors_b = np.asarray(errors_b, dtype=float).ravel()

    if len(errors_a) != len(errors_b):
        raise ValueError(
            f"error arrays must have same length, got {len(errors_a)} vs {len(errors_b)}"
        )

    n = len(errors_a)
    d = errors_a - errors_b
    mean_d = float(np.mean(d))
    var_d = float(np.var(d, ddof=1))

    if var_d <= 0 or n < 2:
        dm_stat = 0.0
        p_value = 1.0
    else:
        dm_stat = mean_d / np.sqrt(var_d / n)
        if one_sided:
            p_value = 1.0 - float(norm.cdf(dm_stat))
        else:
            p_value = 2.0 * (1.0 - float(norm.cdf(abs(dm_stat))))

    alpha = 0.05
    reject = p_value < alpha

    if reject:
        better = "A" if mean_d < 0 else "B"
    else:
        better = "tie"

    return pl.DataFrame(
        [
            {
                "dm_statistic": dm_stat,
                "p_value": p_value,
                "result": "reject" if reject else "not_reject",
                "better_model": better,
            }
        ]
    )
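
A small usage sketch on synthetic errors. The direct import from uncertainty_flow.metrics is an assumption (the function lives in uncertainty_flow/metrics/comparison.py):

import numpy as np
from uncertainty_flow.metrics import diebold_mariano_test  # assumed export location

rng = np.random.default_rng(42)
# Per-sample absolute errors; model A is drawn with a smaller error scale than B.
errors_a = np.abs(rng.normal(scale=1.0, size=500))
errors_b = np.abs(rng.normal(scale=1.2, size=500))

result = diebold_mariano_test(errors_a, errors_b, one_sided=True)
print(result)  # one row with dm_statistic, p_value, result, better_model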

model_confidence_set(predictions, y_true, metric='crps', alpha=0.05)

Hansen et al. (2011) Model Confidence Set.

Sequentially eliminates models that are significantly inferior to the best-performing model. Uses the Diebold-Mariano test for pairwise comparisons with a Bonferroni correction.

Parameters:

    predictions (dict[str, DistributionPrediction]): Dict mapping model names to DistributionPrediction objects. Required.
    y_true (ndarray): True values. Required.
    metric (str): Scoring metric ("crps", "mae", "rmse", "pinball"). Default: "crps".
    alpha (float): Significance level. Default: 0.05.

Returns:

    DataFrame: Polars DataFrame with columns model, score, in_set (whether the model survives in the confidence set).

Source code in uncertainty_flow/metrics/comparison.py
def model_confidence_set(
    predictions: dict[str, DistributionPrediction],
    y_true: np.ndarray,
    metric: str = "crps",
    alpha: float = 0.05,
) -> pl.DataFrame:
    """Hansen et al. (2011) Model Confidence Set.

    Sequentially eliminates models that are significantly inferior to the
    best-performing model. Uses the Diebold-Mariano test for pairwise
    comparisons with a Bonferroni correction.

    Args:
        predictions: Dict mapping model names to ``DistributionPrediction``
            objects.
        y_true: True values.
        metric: Scoring metric (``"crps"``, ``"mae"``, ``"rmse"``, ``"pinball"``).
        alpha: Significance level (default 0.05).

    Returns:
        Polars DataFrame with columns ``model``, ``score``, ``in_set``
        (whether the model survives in the confidence set).
    """
    model_names = list(predictions.keys())
    n_models = len(model_names)

    if n_models < 2:
        err = _extract_errors(predictions[model_names[0]], y_true, metric)
        results = [
            {
                "model": model_names[0],
                "score": float(np.mean(err)),
                "in_set": True,
            }
        ]
        return pl.DataFrame(results)

    errors: dict[str, np.ndarray] = {}
    scores: dict[str, float] = {}
    for name in model_names:
        err = _extract_errors(predictions[name], y_true, metric)
        errors[name] = err
        scores[name] = float(np.mean(err))

    surviving = set(model_names)

    changed = True
    while changed and len(surviving) > 1:
        changed = False
        eliminations: set[str] = set()

        surv_list = sorted(surviving)
        n_pairs = len(surv_list) * (len(surv_list) - 1) // 2
        corrected_alpha = alpha / max(1, n_pairs)

        for i in range(len(surv_list)):
            for j in range(i + 1, len(surv_list)):
                a, b = surv_list[i], surv_list[j]
                err_a = errors[a]
                err_b = errors[b]

                d = err_a - err_b
                n = len(d)
                mean_d = float(np.mean(d))
                var_d = float(np.var(d, ddof=1))

                if var_d <= 0 or n < 2:
                    continue

                dm_stat = mean_d / np.sqrt(var_d / n)
                p_value = 2.0 * (1.0 - float(norm.cdf(abs(dm_stat))))

                if p_value < corrected_alpha:
                    if mean_d > 0:
                        eliminations.add(a)
                    else:
                        eliminations.add(b)
                    changed = True

        surviving -= eliminations

    return pl.DataFrame(
        [
            {
                "model": name,
                "score": scores[name],
                "in_set": name in surviving,
            }
            for name in model_names
        ]
    )
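
The elimination rule can be illustrated standalone on plain error arrays: one pass of pairwise DM tests with a Bonferroni-corrected threshold. This is only a sketch of the logic; the function above additionally iterates until no further eliminations occur and takes DistributionPrediction inputs.

from itertools import combinations

import numpy as np
from scipy.stats import norm

rng = np.random.default_rng(0)
errors = {
    "good": np.abs(rng.normal(scale=1.0, size=400)),
    "okay": np.abs(rng.normal(scale=1.1, size=400)),
    "poor": np.abs(rng.normal(scale=2.0, size=400)),
}

surviving = set(errors)
pairs = list(combinations(sorted(errors), 2))
corrected_alpha = 0.05 / len(pairs)  # Bonferroni correction over all pairwise tests

for a, b in pairs:
    d = errors[a] - errors[b]
    dm_stat = d.mean() / np.sqrt(d.var(ddof=1) / len(d))
    p_value = 2.0 * (1.0 - norm.cdf(abs(dm_stat)))
    if p_value < corrected_alpha:
        surviving.discard(a if d.mean() > 0 else b)  # drop the model with higher mean loss

print(sorted(surviving))  # "poor" should be eliminated; the other two are likely retained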

skill_score(pred_a, pred_b, y_true, metric='crps')

Relative skill score of model A vs model B.

The skill score (SS) is defined as:

SS = 1 - score(A) / score(B)

where SS > 0 means model A outperforms model B.

Parameters:

    pred_a (DistributionPrediction): Predictions from model A. Required.
    pred_b (DistributionPrediction): Predictions from model B (baseline). Required.
    y_true (ndarray): True values. Required.
    metric (str): Scoring metric ("crps", "mae", "rmse", "pinball"). Default: "crps".

Returns:

    DataFrame: Polars DataFrame with columns metric, score_a, score_b, skill_score.

Source code in uncertainty_flow/metrics/comparison.py
def skill_score(
    pred_a: DistributionPrediction,
    pred_b: DistributionPrediction,
    y_true: np.ndarray,
    metric: str = "crps",
) -> pl.DataFrame:
    """Relative skill score of model A vs model B.

    The skill score (SS) is defined as:

        SS = 1 - score(A) / score(B)

    where SS > 0 means model A outperforms model B.

    Args:
        pred_a: Predictions from model A.
        pred_b: Predictions from model B (baseline).
        y_true: True values.
        metric: Scoring metric (``"crps"``, ``"mae"``, ``"rmse"``, ``"pinball"``).

    Returns:
        Polars DataFrame with columns ``metric``, ``score_a``, ``score_b``,
        ``skill_score``.
    """
    errors_a = _extract_errors(pred_a, y_true, metric=metric)
    errors_b = _extract_errors(pred_b, y_true, metric=metric)

    score_a = float(np.mean(errors_a))
    score_b = float(np.mean(errors_b))

    if score_b <= 0:
        ss = 0.0
    else:
        ss = 1.0 - score_a / score_b

    return pl.DataFrame(
        [
            {
                "metric": metric,
                "score_a": score_a,
                "score_b": score_b,
                "skill_score": ss,
            }
        ]
    )
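
A worked example of the formula itself, using plain numbers rather than DistributionPrediction objects: if model A's mean CRPS is 1.2 and baseline B's is 1.5, then

score_a, score_b = 1.2, 1.5
skill = 1.0 - score_a / score_b
print(round(skill, 3))  # 0.2, i.e. model A improves on the baseline by 20%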

coverage_score(y_true, lower, upper)

Fraction of true values that fall within the prediction interval.

Parameters:

    y_true (Series | ndarray): True values. Required.
    lower (Series | ndarray): Lower bound of prediction interval. Required.
    upper (Series | ndarray): Upper bound of prediction interval. Required.

Returns:

    float: Fraction of values within the interval (float in [0, 1]).

Raises:

    ValueError: If bounds are invalid.

Examples:

>>> import polars as pl
>>> y_true = pl.Series([1, 2, 3, 4, 5])
>>> lower = pl.Series([0.5, 1.5, 2.5, 3.5, 4.5])
>>> upper = pl.Series([1.5, 2.5, 3.5, 4.5, 5.5])
>>> coverage_score(y_true, lower, upper)
1.0
Source code in uncertainty_flow/metrics/coverage.py
def coverage_score(
    y_true: pl.Series | np.ndarray,
    lower: pl.Series | np.ndarray,
    upper: pl.Series | np.ndarray,
) -> float:
    """
    Fraction of true values that fall within the prediction interval.

    Args:
        y_true: True values
        lower: Lower bound of prediction interval
        upper: Upper bound of prediction interval

    Returns:
        Fraction of values within interval (float in [0, 1])

    Raises:
        ValueError: If bounds are invalid

    Examples:
        >>> import polars as pl
        >>> y_true = pl.Series([1, 2, 3, 4, 5])
        >>> lower = pl.Series([0.5, 1.5, 2.5, 3.5, 4.5])
        >>> upper = pl.Series([1.5, 2.5, 3.5, 4.5, 5.5])
        >>> coverage_score(y_true, lower, upper)
        1.0
    """
    y_true, lower, upper = as_numpy(y_true, lower, upper)

    if np.any(lower > upper):
        raise InvalidDataError("lower bound must be <= upper bound")

    within_interval = (y_true >= lower) & (y_true <= upper)

    return float(np.mean(within_interval))

crps_quantile(y_true, quantile_matrix, quantile_levels)

Exact CRPS from quantile predictions via the quantile-score decomposition.

Uses the closed-form decomposition:

CRPS = 2 * Σⱼ wⱼ * [𝟙(y < qⱼ) - τⱼ] * (qⱼ - y)

where wⱼ = (τⱼ₊₁ - τⱼ₋₁) / 2 are trapezoidal quadrature weights.

Reference: Laio & Tamea (2007); also used by properscoring and scoringrules packages.

Parameters:

    y_true (ndarray): (n,) array of true values. Required.
    quantile_matrix (ndarray): (n, k) array of predicted quantile values per sample. Required.
    quantile_levels (ndarray): (k,) array of quantile levels in (0, 1), strictly increasing. Required.

Returns:

    float: Mean CRPS across all samples. Lower is better.

Source code in uncertainty_flow/metrics/crps.py
def crps_quantile(
    y_true: np.ndarray,
    quantile_matrix: np.ndarray,
    quantile_levels: np.ndarray,
) -> float:
    """
    Exact CRPS from quantile predictions via the quantile-score decomposition.

    Uses the closed-form decomposition:

        CRPS = 2 * Σⱼ wⱼ * [𝟙(y < qⱼ) - τⱼ] * (qⱼ - y)

    where wⱼ = (τⱼ₊₁ - τⱼ₋₁) / 2 are trapezoidal quadrature weights.

    Reference: Laio & Tamea (2007); also used by ``properscoring`` and
    ``scoringrules`` packages.

    Args:
        y_true: (n,) array of true values.
        quantile_matrix: (n, k) array of predicted quantile values per sample.
        quantile_levels: (k,) array of quantile levels in (0, 1), strictly increasing.

    Returns:
        Mean CRPS across all samples. Lower is better.
    """
    y_true = np.asarray(y_true, dtype=np.float64)
    quantile_matrix = np.asarray(quantile_matrix, dtype=np.float64)
    quantile_levels = np.asarray(quantile_levels, dtype=np.float64)

    k = len(quantile_levels)
    if k < 2:
        raise ValueError("quantile_levels must have at least 2 elements")

    if np.any(np.diff(quantile_matrix, axis=1) < 0):
        warnings.warn(
            "quantile_matrix contains non-monotone columns (quantile crossing). "
            "CRPS results may be unreliable. Ensure quantile values are sorted "
            "per sample.",
            UserWarning,
            stacklevel=2,
        )

    weights = np.empty(k)
    weights[0] = (quantile_levels[1] - quantile_levels[0]) / 2.0
    weights[-1] = (quantile_levels[-1] - quantile_levels[-2]) / 2.0
    if k > 2:
        weights[1:-1] = (quantile_levels[2:] - quantile_levels[:-2]) / 2.0

    indicator = (y_true[:, None] < quantile_matrix).astype(np.float64)
    diff = indicator - quantile_levels[None, :]
    residual = quantile_matrix - y_true[:, None]

    crps_per_sample = 2.0 * np.sum(weights[None, :] * diff * residual, axis=1)

    return float(np.mean(crps_per_sample))
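
A minimal usage sketch. The direct import from uncertainty_flow.metrics is an assumption (the function is defined in uncertainty_flow/metrics/crps.py):

import numpy as np
from scipy.stats import norm
from uncertainty_flow.metrics import crps_quantile  # assumed export location

rng = np.random.default_rng(7)
levels = np.linspace(0.05, 0.95, 19)  # 19 quantile levels
y_true = rng.normal(size=300)

# Every sample forecasts the standard-normal quantiles, i.e. a well-specified
# forecast. The result should sit a little below the theoretical value
# 1/sqrt(pi) ~= 0.564 because the 19 knots truncate the tails of the integral.
q_matrix = np.tile(norm.ppf(levels), (300, 1))

print(crps_quantile(y_true, q_matrix, levels))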

crps_score(y_true, lower, upper, confidence=0.9)

Approximate CRPS from a prediction interval (Gaussian assumption).

Deprecated: use crps_quantile or DistributionPrediction.crps(y_true) instead. This function will be removed in v0.3.0.

Parameters:

    y_true (Series | ndarray): True values. Required.
    lower (Series | ndarray): Lower bound of prediction interval. Required.
    upper (Series | ndarray): Upper bound of prediction interval. Required.
    confidence (float): Confidence level for the interval. Default: 0.9.

Returns:

    float: Approximate CRPS score. Lower is better.

Source code in uncertainty_flow/metrics/crps.py
def crps_score(
    y_true: pl.Series | np.ndarray,
    lower: pl.Series | np.ndarray,
    upper: pl.Series | np.ndarray,
    confidence: float = 0.9,
) -> float:
    """
    Approximate CRPS from a prediction interval (Gaussian assumption).

    .. deprecated::
        Use :func:`crps_quantile` or ``DistributionPrediction.crps(y_true)``
        instead. This function will be removed in v0.3.0.

    Args:
        y_true: True values
        lower: Lower bound of prediction interval
        upper: Upper bound of prediction interval
        confidence: Confidence level for the interval

    Returns:
        Approximate CRPS score (float). Lower is better.
    """
    warnings.warn(
        "crps_score() uses a Gaussian approximation and will be removed in "
        "v0.3.0. Use crps_quantile() or DistributionPrediction.crps() instead.",
        FutureWarning,
        stacklevel=2,
    )

    from scipy.stats import norm

    y_true, lower, upper = as_numpy(y_true, lower, upper)

    alpha = 1 - confidence
    z = float(norm.ppf(1 - alpha / 2))

    width = upper - lower
    midpoint = (upper + lower) / 2.0

    sigma = width / (2 * z)

    z_obs = np.where(sigma > 0, (y_true - midpoint) / sigma, 0.0)

    phi_z = norm.pdf(z_obs)

    crps_values = sigma * (z_obs * (2 * norm.cdf(z_obs) - 1) + 2 * phi_z - 1 / np.sqrt(np.pi))

    crps_values = np.where(sigma > 0, crps_values, np.abs(y_true - midpoint))

    return float(np.mean(crps_values))

log_score_kde(y_true, quantile_matrix, quantile_levels, n_draw=500, random_state=None)

Non-parametric log-score via kernel density estimation.

Draws samples from the piecewise-linear CDF defined by the quantile knots, fits a Gaussian KDE, and evaluates the log-density.

Parameters:

    y_true (ndarray): (n,) array of true values. Required.
    quantile_matrix (ndarray): (n, k) array of predicted quantile values. Required.
    quantile_levels (ndarray): (k,) array of quantile levels. Required.
    n_draw (int): Samples per observation for KDE fitting. Default: 500.
    random_state (int | None): Random seed. Default: None.

Returns:

    float: Mean log-score.

Source code in uncertainty_flow/metrics/log_score.py
def log_score_kde(
    y_true: np.ndarray,
    quantile_matrix: np.ndarray,
    quantile_levels: np.ndarray,
    n_draw: int = 500,
    random_state: int | None = None,
) -> float:
    """
    Non-parametric log-score via kernel density estimation.

    Draws samples from the piecewise-linear CDF defined by the quantile
    knots, fits a Gaussian KDE, and evaluates the log-density.

    Args:
        y_true: (n,) array of true values.
        quantile_matrix: (n, k) array of predicted quantile values.
        quantile_levels: (k,) array of quantile levels.
        n_draw: Samples per observation for KDE fitting.
        random_state: Random seed.

    Returns:
        Mean log-score (float).
    """
    from scipy.stats import gaussian_kde

    y_true = np.asarray(y_true, dtype=np.float64)
    quantile_matrix = np.asarray(quantile_matrix, dtype=np.float64)
    quantile_levels = np.asarray(quantile_levels, dtype=np.float64)

    rng = np.random.default_rng(random_state)
    n = len(y_true)
    log_densities = np.empty(n)

    for i in range(n):
        qv = quantile_matrix[i]
        u = rng.uniform(0, 1, size=n_draw)
        u_clipped = np.clip(u, quantile_levels[0], quantile_levels[-1])
        samples = np.interp(u_clipped, quantile_levels, qv)

        try:
            kde = gaussian_kde(samples)
            log_densities[i] = float(kde.logpdf(y_true[i])[0])
        except (np.linalg.LinAlgError, ValueError):
            log_densities[i] = -np.inf

    finite_mask = np.isfinite(log_densities)
    return float(np.mean(log_densities[finite_mask])) if np.any(finite_mask) else -np.inf
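
A minimal usage sketch, with the same assumed import path as above:

import numpy as np
from scipy.stats import norm
from uncertainty_flow.metrics import log_score_kde  # assumed export location

rng = np.random.default_rng(3)
levels = np.linspace(0.05, 0.95, 19)
y_true = rng.normal(size=50)

# Each row forecasts the standard-normal quantiles. For a perfect Gaussian
# forecast the ideal mean log-score is about -1.42; the KDE fit and the clipped
# tails of the piecewise-linear CDF push the estimate somewhat lower.
q_matrix = np.tile(norm.ppf(levels), (50, 1))

print(log_score_kde(y_true, q_matrix, levels, n_draw=300, random_state=0))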

log_score_pooled(y_true, quantile_matrix, quantile_levels, family='auto')

Mean log-score using one pooled distribution fitted from mean quantiles.

This helper retains the previous behavior for backward-compatible comparisons.

Source code in uncertainty_flow/metrics/log_score.py
def log_score_pooled(
    y_true: np.ndarray,
    quantile_matrix: np.ndarray,
    quantile_levels: np.ndarray,
    family: str = "auto",
) -> float:
    """
    Mean log-score using one pooled distribution fitted from mean quantiles.

    This helper retains the previous behavior for backward-compatible comparisons.
    """
    y_true = np.asarray(y_true, dtype=np.float64)
    quantile_matrix = np.asarray(quantile_matrix, dtype=np.float64)
    quantile_levels = np.asarray(quantile_levels, dtype=np.float64)

    from ..core.parametric import fit_parametric

    mean_qv = np.mean(quantile_matrix, axis=0)
    dist = fit_parametric(mean_qv, quantile_levels, family=family)
    log_densities = dist.logpdf(y_true)

    finite_mask = np.isfinite(log_densities)
    return float(np.mean(log_densities[finite_mask])) if np.any(finite_mask) else -np.inf

energy_score(pred, y_true, n_samples=1000, random_state=None)

Energy score — a proper multivariate scoring rule.

    ES = E[‖X - y‖] - (1/2) E[‖X - X'‖]

where X, X' are independent draws from the forecast distribution.

Parameters:

    pred (DistributionPrediction): DistributionPrediction with ≥2 targets. Required.
    y_true: True values (DataFrame / array matching targets). Required.
    n_samples (int): Monte Carlo samples per observation. Default: 1000.
    random_state (int | None): Random seed. Default: None.

Returns:

    float: Mean energy score. Lower is better.

Source code in uncertainty_flow/metrics/multivariate.py
def energy_score(
    pred: DistributionPrediction,
    y_true,
    n_samples: int = 1000,
    random_state: int | None = None,
) -> float:
    """
    Energy score — a proper multivariate scoring rule.

    .. math::

        \\text{ES} = \\mathbb{E}[\\|X - y\\|] - \\frac{1}{2} \\mathbb{E}[\\|X - X'\\|]

    where :math:`X, X'` are independent draws from the forecast distribution.

    Args:
        pred: ``DistributionPrediction`` with ≥2 targets.
        y_true: True values (DataFrame / array matching targets).
        n_samples: Monte Carlo samples per observation.
        random_state: Random seed.

    Returns:
        Mean energy score (float). Lower is better.
    """
    y_arr = pred._coerce_y_true(y_true)
    n_obs = pred._n_samples
    d = len(pred._targets)

    if d < 2:
        raise ValueError("Energy score requires at least 2 targets.")

    rng = np.random.default_rng(random_state)
    rs1, rs2 = rng.integers(0, 2**31, size=2)

    sample_df1 = pred.sample(n_samples, random_state=int(rs1))
    sample_df2 = pred.sample(n_samples, random_state=int(rs2))

    targets = pred._targets
    sm1 = np.column_stack([sample_df1[t].to_numpy() for t in targets])
    sm1 = sm1.reshape(n_obs, n_samples, d)
    sm2 = np.column_stack([sample_df2[t].to_numpy() for t in targets])
    sm2 = sm2.reshape(n_obs, n_samples, d)

    if y_arr.ndim == 1:
        y_arr = np.tile(y_arr[:, None], (1, d))

    term1 = np.zeros(n_obs)
    term2 = np.zeros(n_obs)

    for i in range(n_obs):
        samples_i = sm1[i]
        samples_i_prime = sm2[i]
        y_i = y_arr[i]

        diff_x_y = samples_i - y_i[None, :]
        norms_x_y = np.sqrt(np.sum(diff_x_y**2, axis=1))
        term1[i] = np.mean(norms_x_y)

        diff_xx = samples_i - samples_i_prime
        norms_xx = np.sqrt(np.sum(diff_xx**2, axis=1))
        term2[i] = 0.5 * np.mean(norms_xx)

    return float(np.mean(term1 - term2))
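
The formula can also be evaluated directly on Monte Carlo samples, without going through the DistributionPrediction API. A standalone numpy sketch:

import numpy as np

def energy_score_from_samples(samples, samples_prime, y):
    """samples, samples_prime: (n_obs, n_draws, d) independent draws; y: (n_obs, d) truths."""
    # For each observation: mean ||X - y|| minus half the mean ||X - X'||.
    term1 = np.linalg.norm(samples - y[:, None, :], axis=2).mean(axis=1)
    term2 = 0.5 * np.linalg.norm(samples - samples_prime, axis=2).mean(axis=1)
    return float(np.mean(term1 - term2))

rng = np.random.default_rng(0)
y = rng.normal(size=(20, 3))              # 20 observations, 3 targets
draws = rng.normal(size=(2, 20, 500, 3))  # two independent sample sets from the forecast
print(energy_score_from_samples(draws[0], draws[1], y))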

variogram_score(pred, y_true, n_samples=1000, p=0.5, random_state=None)

Variogram score — sensitive to correlation structure.

    VS_p = Σ_{i≠j} w_ij (|y_i - y_j|^p - E[|X_i - X_j|^p])²

Parameters:

    pred (DistributionPrediction): DistributionPrediction with ≥2 targets. Required.
    y_true: True values. Required.
    n_samples (int): Monte Carlo samples per observation. Default: 1000.
    p (float): Power parameter. Default: 0.5.
    random_state (int | None): Random seed. Default: None.

Returns:

    float: Mean variogram score. Lower is better.

Source code in uncertainty_flow/metrics/multivariate.py
def variogram_score(
    pred: DistributionPrediction,
    y_true,
    n_samples: int = 1000,
    p: float = 0.5,
    random_state: int | None = None,
) -> float:
    """
    Variogram score — sensitive to correlation structure.

    .. math::

        \\text{VS}_p = \\sum_{i \\neq j} w_{ij}
            \\left(|y_i - y_j|^p - \\mathbb{E}[|X_i - X_j|^p]\\right)^2

    Args:
        pred: ``DistributionPrediction`` with ≥2 targets.
        y_true: True values.
        n_samples: Monte Carlo samples per observation.
        p: Power parameter (default 0.5).
        random_state: Random seed.

    Returns:
        Mean variogram score (float). Lower is better.
    """
    y_arr = pred._coerce_y_true(y_true)
    n_obs = pred._n_samples
    d = len(pred._targets)

    if d < 2:
        raise ValueError("Variogram score requires at least 2 targets.")

    sample_df = pred.sample(n_samples, random_state=random_state)

    targets = pred._targets
    sample_matrix = np.column_stack([sample_df[t].to_numpy() for t in targets])
    sample_matrix = sample_matrix.reshape(n_obs, n_samples, d)

    if y_arr.ndim == 1:
        y_arr = np.tile(y_arr[:, None], (1, d))

    scores = np.zeros(n_obs)

    for i in range(n_obs):
        samples_i = sample_matrix[i]
        y_i = y_arr[i]

        vs = 0.0
        for j in range(d):
            for k in range(j + 1, d):
                obs_diff = np.abs(y_i[j] - y_i[k]) ** p
                sim_diffs = np.abs(samples_i[:, j] - samples_i[:, k]) ** p
                exp_diff = np.mean(sim_diffs)
                vs += (obs_diff - exp_diff) ** 2

        scores[i] = vs

    return float(np.mean(scores))
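
As with the energy score, the formula can be computed directly from Monte Carlo samples. A standalone numpy sketch with unit weights (w_ij = 1), which is what the implementation above uses over distinct target pairs:

import numpy as np

def variogram_score_from_samples(samples, y, p=0.5):
    """samples: (n_obs, n_draws, d) forecast draws; y: (n_obs, d) true values."""
    _, _, d = samples.shape
    scores = np.zeros(len(y))
    for j in range(d):
        for k in range(j + 1, d):
            # Observed vs expected p-th power absolute difference per target pair.
            obs_diff = np.abs(y[:, j] - y[:, k]) ** p
            exp_diff = (np.abs(samples[:, :, j] - samples[:, :, k]) ** p).mean(axis=1)
            scores += (obs_diff - exp_diff) ** 2
    return float(np.mean(scores))

rng = np.random.default_rng(1)
y = rng.normal(size=(20, 3))
samples = rng.normal(size=(20, 400, 3))
print(variogram_score_from_samples(samples, y))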

pinball_loss(y_true, y_pred, quantile)

Quantile loss (pinball loss).

For quantile q, the per-sample loss is max(q * (y_true - y_pred), (q - 1) * (y_true - y_pred)). This penalizes over-prediction and under-prediction asymmetrically.

Parameters:

    y_true (Series | ndarray): True values. Required.
    y_pred (Series | ndarray): Predicted values. Required.
    quantile (float): Quantile level (e.g., 0.9 for the 90th percentile). Required.

Returns:

    float: Mean loss across all samples.

Raises:

    ValueError: If quantile is not in (0, 1).

Examples:

>>> import polars as pl
>>> y_true = pl.Series([1, 2, 3, 4, 5])
>>> y_pred = pl.Series([1.5, 2.5, 2.5, 4.5, 4.5])
>>> pinball_loss(y_true, y_pred, 0.5)
0.25
Source code in uncertainty_flow/metrics/pinball.py
def pinball_loss(
    y_true: pl.Series | np.ndarray,
    y_pred: pl.Series | np.ndarray,
    quantile: float,
) -> float:
    """
    Quantile loss (pinball loss).

    For quantile q, loss = max(q * (y_true - y_pred), (q - 1) * (y_true - y_pred)).
    Penalizes over-prediction and under-prediction asymmetrically.

    Args:
        y_true: True values
        y_pred: Predicted values
        quantile: Quantile level (e.g., 0.9 for 90th percentile)

    Returns:
        Mean loss across all samples (float)

    Raises:
        ValueError: If quantile is not in (0, 1)

    Examples:
        >>> import polars as pl
        >>> y_true = pl.Series([1, 2, 3, 4, 5])
        >>> y_pred = pl.Series([1.5, 2.5, 2.5, 4.5, 4.5])
        >>> pinball_loss(y_true, y_pred, 0.5)
        0.25
    """
    if not (0 < quantile < 1):
        raise QuantileError(f"quantile must be in (0, 1), got {quantile}")

    y_true, y_pred = as_numpy(y_true, y_pred)

    error = y_true - y_pred
    loss = np.maximum(quantile * error, (quantile - 1) * error)

    return float(np.mean(loss))

mae_score(y_true, y_pred)

Mean Absolute Error.

Parameters:

    y_true (Series | ndarray): True values. Required.
    y_pred (Series | ndarray): Predicted values (point predictions, e.g. median). Required.

Returns:

    float: Mean absolute error. Lower is better.

Source code in uncertainty_flow/metrics/point.py
def mae_score(
    y_true: pl.Series | np.ndarray,
    y_pred: pl.Series | np.ndarray,
) -> float:
    """
    Mean Absolute Error.

    Args:
        y_true: True values
        y_pred: Predicted values (point predictions, e.g. median)

    Returns:
        Mean absolute error (float). Lower is better.
    """
    y_true, y_pred = as_numpy(y_true, y_pred)
    return float(np.mean(np.abs(y_true - y_pred)))

rmse_score(y_true, y_pred)

Root Mean Squared Error.

Parameters:

    y_true (Series | ndarray): True values. Required.
    y_pred (Series | ndarray): Predicted values (point predictions, e.g. median). Required.

Returns:

    float: Root mean squared error. Lower is better.

Source code in uncertainty_flow/metrics/point.py
def rmse_score(
    y_true: pl.Series | np.ndarray,
    y_pred: pl.Series | np.ndarray,
) -> float:
    """
    Root Mean Squared Error.

    Args:
        y_true: True values
        y_pred: Predicted values (point predictions, e.g. median)

    Returns:
        Root mean squared error (float). Lower is better.
    """
    y_true, y_pred = as_numpy(y_true, y_pred)
    return float(np.sqrt(np.mean((y_true - y_pred) ** 2)))

winkler_score(y_true, lower, upper, confidence)

Winkler score for prediction intervals.

Penalizes:

- Interval width (wider intervals = higher penalty)
- Misses (if y_true falls outside the interval, the penalty is proportional to the distance)

Lower is better.

Parameters:

    y_true (Series | ndarray): True values. Required.
    lower (Series | ndarray): Lower bound of prediction interval. Required.
    upper (Series | ndarray): Upper bound of prediction interval. Required.
    confidence (float): Confidence level (e.g., 0.9 for a 90% interval). Required.

Returns:

    float: Mean Winkler score across all samples.

Raises:

    ValueError: If confidence is not in (0, 1) or if bounds are invalid.

Examples:

>>> import polars as pl
>>> y_true = pl.Series([1, 2, 3, 4, 5])
>>> lower = pl.Series([0.5, 1.5, 2.5, 3.5, 4.5])
>>> upper = pl.Series([1.5, 2.5, 3.5, 4.5, 5.5])
>>> winkler_score(y_true, lower, upper, 0.9)
1.0
Source code in uncertainty_flow/metrics/winkler.py
def winkler_score(
    y_true: pl.Series | np.ndarray,
    lower: pl.Series | np.ndarray,
    upper: pl.Series | np.ndarray,
    confidence: float,
) -> float:
    """
    Winkler score for prediction intervals.

    Penalizes:
    - Interval width (wider intervals = higher penalty)
    - Misses (if y_true outside interval, penalty proportional to distance)

    Lower is better.

    Args:
        y_true: True values
        lower: Lower bound of prediction interval
        upper: Upper bound of prediction interval
        confidence: Confidence level (e.g., 0.9 for 90% interval)

    Returns:
        Mean Winkler score across all samples (float)

    Raises:
        ValueError: If confidence is not in (0, 1) or if bounds are invalid

    Examples:
        >>> import polars as pl
        >>> y_true = pl.Series([1, 2, 3, 4, 5])
        >>> lower = pl.Series([0.5, 1.5, 2.5, 3.5, 4.5])
        >>> upper = pl.Series([1.5, 2.5, 3.5, 4.5, 5.5])
        >>> winkler_score(y_true, lower, upper, 0.9)
        1.0
    """
    if not (0 < confidence < 1):
        raise QuantileError(f"confidence must be in (0, 1), got {confidence}")

    y_true, lower, upper = as_numpy(y_true, lower, upper)

    if np.any(lower > upper):
        raise InvalidDataError("lower bound must be <= upper bound")

    alpha = 1 - confidence

    width_penalty = upper - lower

    miss_penalty = np.zeros_like(y_true)
    below_mask = y_true < lower
    above_mask = y_true > upper

    miss_penalty[below_mask] = (2 / alpha) * (lower[below_mask] - y_true[below_mask])
    miss_penalty[above_mask] = (2 / alpha) * (y_true[above_mask] - upper[above_mask])

    score = width_penalty + miss_penalty

    return float(np.mean(score))

score(pred, y_true, metric, **kwargs)

Unified metric entry point.

Dispatches to the correct metric function based on metric name, extracting the right inputs from the DistributionPrediction object.

Parameters:

    pred (DistributionPrediction): A DistributionPrediction from any model's .predict(). Required.
    y_true: True values (Polars Series/DataFrame or numpy array). Required.
    metric (str | Callable): One of "crps", "pinball", "coverage", "winkler", "mae", "rmse", "calibration_error", "log_score", "energy_score", "variogram_score", or a callable fn(pred, y_true) -> float. Required.
    **kwargs: Extra keyword arguments forwarded to the underlying metric.

Returns:

    float | dict[str, float]: Scalar for a univariate target, or a {target: value} dict for multivariate targets.

Source code in uncertainty_flow/metrics/__init__.py
def score(
    pred: DistributionPrediction,
    y_true,
    metric: str | Callable,
    **kwargs,
) -> float | dict[str, float]:
    """Unified metric entry point.

    Dispatches to the correct metric function based on ``metric`` name,
    extracting the right inputs from the ``DistributionPrediction`` object.

    Args:
        pred: A ``DistributionPrediction`` from any model's ``.predict()``.
        y_true: True values (Polars Series/DataFrame or numpy array).
        metric: One of ``"crps"``, ``"pinball"``, ``"coverage"``, ``"winkler"``,
            ``"mae"``, ``"rmse"``, ``"calibration_error"``, ``"log_score"``,
            ``"energy_score"``, ``"variogram_score"``, or a callable
            ``fn(pred, y_true) -> float``.
        **kwargs: Extra keyword arguments forwarded to the underlying metric.

    Returns:
        Scalar for univariate, or ``{target: value}`` dict for multivariate.
    """
    if callable(metric):
        return float(metric(pred, y_true, **kwargs))

    if metric not in _METRIC_NAMES:
        raise ValueError(f"Unknown metric {metric!r}. Choose from: {sorted(_METRIC_NAMES)}")

    if metric == "crps":
        return pred.crps(y_true)

    if metric == "log_score":
        return pred.log_score(y_true, **{k: v for k, v in kwargs.items() if k == "family"})

    if metric == "energy_score":
        return pred.energy_score(
            y_true,
            n_samples=kwargs.get("n_samples", 1000),
            random_state=kwargs.get("random_state", None),
        )

    if metric == "variogram_score":
        return variogram_score(
            pred,
            y_true,
            **{k: v for k, v in kwargs.items() if k in ("n_samples", "p", "random_state")},
        )

    y_arr = pred._coerce_y_true(y_true)
    confidence = kwargs.get("confidence", 0.9)
    targets = pred._targets
    n_targets = len(targets)
    results: dict[str, float] = {}

    for t_idx, target in enumerate(targets):
        y_col = y_arr[:, t_idx] if y_arr.ndim == 2 else y_arr
        median_col = (
            _median_for_target(pred.median(), target) if metric in ("mae", "rmse") else None
        )

        if metric == "mae":
            results[target] = float(mae_score(y_col, median_col.to_numpy()))
            continue

        if metric == "rmse":
            results[target] = float(rmse_score(y_col, median_col.to_numpy()))
            continue

        if metric == "pinball":
            total = 0.0
            for level in pred._levels:
                q_df = pred.quantile(float(level))
                if n_targets == 1:
                    q_vals = q_df.to_numpy().flatten()
                else:
                    q_vals = q_df[f"{target}_q_{level:.3f}"].to_numpy()
                total += pinball_loss(y_col, q_vals, float(level))
            results[target] = total / len(pred._levels)
            continue

        interval_df = pred.interval(confidence)
        if n_targets == 1:
            lower = interval_df["lower"].to_numpy()
            upper = interval_df["upper"].to_numpy()
        else:
            lower = interval_df[f"{target}_lower"].to_numpy()
            upper = interval_df[f"{target}_upper"].to_numpy()

        if metric == "coverage":
            results[target] = float(coverage_score(y_col, lower, upper))
        elif metric == "winkler":
            results[target] = float(winkler_score(y_col, lower, upper, confidence))
        elif metric == "calibration_error":
            results[target] = float(calibration_error(y_col, lower, upper, confidence))

    if n_targets == 1:
        return results[targets[0]]
    return results