Skip to content

Core

uncertainty_flow.core

Core classes for uncertainty_flow.

BaseUncertaintyModel

Bases: ABC

Base class for all uncertainty quantification models.

All models must implement fit() and predict() methods. Calibration reports are provided via default implementation.

Source code in uncertainty_flow/core/base.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
class BaseUncertaintyModel(ABC):
    """
    Abstract base class for every uncertainty quantification model.

    Concrete subclasses must supply fit() and predict(); calibration
    reporting, persistence and explanation helpers ship with default
    implementations here.
    """

    @abstractmethod
    def fit(
        self,
        data: PolarsInput,
        target: TargetSpec | None = None,
        **kwargs,
    ) -> "BaseUncertaintyModel":
        """
        Train the model on the given data.

        Args:
            data: Polars DataFrame or LazyFrame containing features and target
            target: Target column name(s) - optional for some models
            **kwargs: Extra model-specific options

        Returns:
            self (for method chaining)
        """
        ...

    @abstractmethod
    def predict(self, data: PolarsInput) -> "DistributionPrediction":
        """
        Produce probabilistic predictions for the given rows.

        Args:
            data: Polars DataFrame or LazyFrame containing features

        Returns:
            DistributionPrediction holding quantile predictions
        """
        ...

    def predict_batch(
        self,
        data: PolarsInput,
        batch_size: int = 1000,
    ) -> Iterator["DistributionPrediction"]:
        """
        Stream probabilistic predictions chunk by chunk.

        The default implementation materializes the frame, slices it and
        calls predict() once per slice.  Models with native batch / GPU
        support (e.g. torch) should override this.

        Args:
            data: Polars DataFrame or LazyFrame containing features
            batch_size: Rows per chunk (default 1000).

        Yields:
            One DistributionPrediction per chunk.
        """
        frame = materialize_lazyframe(data)
        total_rows = len(frame)

        offset = 0
        while offset < total_rows:
            yield self.predict(frame[offset : offset + batch_size])
            offset += batch_size

    def calibration_report(
        self,
        data: PolarsInput,
        target: TargetSpec | None = None,
        quantile_levels: list[float] | None = None,
    ) -> pl.DataFrame:
        """
        Build calibration diagnostics for the model.

        This default implementation may be overridden by subclasses.

        Args:
            data: Validation data
            target: Target column name(s) - optional for some models
            quantile_levels: Quantile levels to evaluate (default: [0.8, 0.9, 0.95])

        Returns:
            Polars DataFrame with calibration metrics
        """
        # Imported lazily: calibration_utils -> DistributionPrediction ->
        # this module, so a top-level import would form a cycle.
        from ..utils.calibration_utils import calibration_report as _calibration_report

        # Collect lazyframe if needed
        frame = materialize_lazyframe(data)

        return _calibration_report(self, frame, target, quantile_levels)  # type: ignore[arg-type]

    def save(
        self,
        path: str | Path,
        include_metadata: bool = True,
    ) -> None:
        """
        Persist the model as a .uf archive.

        Args:
            path: Output archive path.
            include_metadata: Whether to include extended metadata.
        """
        from ._persistence import save_model_archive

        # Keep the metadata produced during serialization on the instance.
        self._metadata = save_model_archive(self, path, include_metadata=include_metadata)

    @classmethod
    def load(
        cls,
        path: str | Path,
        *,
        expected_archive_sha256: str | None = None,
    ) -> "BaseUncertaintyModel":
        """
        Restore a model from a .uf archive.

        Args:
            path: Archive path produced by save().
            expected_archive_sha256: Optional SHA-256 hex digest expected for the archive.
                When provided, load() fails if the on-disk archive digest does not match.

        Returns:
            Loaded model instance.
        """
        from ._persistence import _class_path, load_model_archive

        model, _ = load_model_archive(path, expected_archive_sha256=expected_archive_sha256)
        # Loading through a subclass enforces that the archive holds that subclass.
        if cls is BaseUncertaintyModel or isinstance(model, cls):
            return model
        raise TypeError(
            f"Loaded archive contains {_class_path(model)}, "
            f"which is not an instance of {_class_path(cls)}."
        )

    @property
    def metadata(self) -> dict | None:
        """
        Metadata persisted with the model, or derived for a fitted one.

        Returns None for fresh unfitted models with no persisted metadata.
        """
        existing = getattr(self, "_metadata", None)
        if existing is not None:
            return existing

        if getattr(self, "_fitted", False):
            from ._persistence import build_metadata

            return build_metadata(self, include_metadata=True)

        return None

    @property
    def uncertainty_drivers_(self) -> pl.DataFrame | None:
        """
        Residual correlation analysis results, if available.

        Returns None if model has not been fitted.

        Returns:
            Polars DataFrame with feature-residual correlations, or None
        """
        return None

    def explain_interval_width(
        self,
        X: PolarsInput,  # noqa: N803
        background: PolarsInput | None = None,
        quantile_pairs: list[tuple[float, float]] | None = None,
    ) -> pl.DataFrame:
        """
        Attribute quantile interval widths to features via SHAP.

        Thin wrapper around :func:`uncertainty_shap` that highlights
        which features widen the prediction interval.

        Subclasses with native feature importance (e.g. quantile forests)
        may override this with a faster implementation.

        Args:
            X: Feature DataFrame to explain.
            background: Background dataset for SHAP. Defaults to ``X[:100]``.
            quantile_pairs: ``(lower, upper)`` quantile pairs to analyse.
                Defaults to ``[(0.1, 0.9), (0.05, 0.95)]``.

        Returns:
            Polars DataFrame with SHAP attributions per feature.
        """
        from ..calibration.shap_values import uncertainty_shap

        frame = materialize_lazyframe(X)
        bg = None if background is None else materialize_lazyframe(background)

        return uncertainty_shap(self, frame, background=bg, quantile_pairs=quantile_pairs)

    def analyze_leverage(
        self,
        X: PolarsInput,  # noqa: N803
        **kwargs,
    ) -> pl.DataFrame:
        """
        Rank features by their influence on prediction uncertainty.

        Thin wrapper around :class:`FeatureLeverageAnalyzer`.

        Args:
            X: Feature DataFrame for leverage analysis.
            **kwargs: Forwarded to ``FeatureLeverageAnalyzer`` (e.g.
                ``confidence``, ``n_perturbations``, ``random_state``).

        Returns:
            Polars DataFrame with leverage scores per feature.
        """
        from ..analysis.leverage import FeatureLeverageAnalyzer

        return FeatureLeverageAnalyzer(self, **kwargs).analyze(materialize_lazyframe(X))

metadata property

Return persisted or derived metadata for the model.

Returns None for fresh unfitted models with no persisted metadata.

uncertainty_drivers_ property

Return residual correlation analysis results.

Returns None if model has not been fitted.

Returns:

Type Description
DataFrame | None

Polars DataFrame with feature-residual correlations, or None

fit(data, target=None, **kwargs) abstractmethod

Fit the model to training data.

Parameters:

Name Type Description Default
data PolarsInput

Polars DataFrame or LazyFrame with features and target

required
target TargetSpec | None

Target column name(s) - optional for some models

None
**kwargs

Additional model-specific parameters

{}

Returns:

Type Description
BaseUncertaintyModel

self (for method chaining)

Source code in uncertainty_flow/core/base.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@abstractmethod
def fit(
    self,
    data: PolarsInput,
    target: TargetSpec | None = None,
    **kwargs,
) -> "BaseUncertaintyModel":
    """
    Train the model on the given data.

    Args:
        data: Polars DataFrame or LazyFrame containing features and target
        target: Target column name(s) - optional for some models
        **kwargs: Extra model-specific options

    Returns:
        self (for method chaining)
    """
    ...

predict(data) abstractmethod

Generate probabilistic predictions.

Parameters:

Name Type Description Default
data PolarsInput

Polars DataFrame or LazyFrame with features

required

Returns:

Type Description
DistributionPrediction

DistributionPrediction object with quantile predictions

Source code in uncertainty_flow/core/base.py
45
46
47
48
49
50
51
52
53
54
55
56
@abstractmethod
def predict(self, data: PolarsInput) -> "DistributionPrediction":
    """
    Produce probabilistic predictions for the given rows.

    Args:
        data: Polars DataFrame or LazyFrame containing features

    Returns:
        DistributionPrediction holding quantile predictions
    """
    ...

predict_batch(data, batch_size=1000)

Generate probabilistic predictions in chunks.

Default implementation slices the data into batches and yields a DistributionPrediction per batch. Models with native batch / GPU support (e.g. torch) should override this.

Parameters:

Name Type Description Default
data PolarsInput

Polars DataFrame or LazyFrame with features

required
batch_size int

Number of rows per batch (default 1000).

1000

Yields:

Type Description
DistributionPrediction

DistributionPrediction for each chunk.

Source code in uncertainty_flow/core/base.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def predict_batch(
    self,
    data: PolarsInput,
    batch_size: int = 1000,
) -> Iterator["DistributionPrediction"]:
    """
    Stream probabilistic predictions chunk by chunk.

    The default implementation materializes the frame, slices it and
    calls predict() once per slice.  Models with native batch / GPU
    support (e.g. torch) should override this.

    Args:
        data: Polars DataFrame or LazyFrame containing features
        batch_size: Rows per chunk (default 1000).

    Yields:
        One DistributionPrediction per chunk.
    """
    frame = materialize_lazyframe(data)
    total_rows = len(frame)

    offset = 0
    while offset < total_rows:
        yield self.predict(frame[offset : offset + batch_size])
        offset += batch_size

calibration_report(data, target=None, quantile_levels=None)

Generate calibration diagnostics.

Default implementation - can be overridden by subclasses.

Parameters:

Name Type Description Default
data PolarsInput

Validation data

required
target TargetSpec | None

Target column name(s) - optional for some models

None
quantile_levels list[float] | None

Quantile levels to evaluate (default: [0.8, 0.9, 0.95])

None

Returns:

Type Description
DataFrame

Polars DataFrame with calibration metrics

Source code in uncertainty_flow/core/base.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def calibration_report(
    self,
    data: PolarsInput,
    target: TargetSpec | None = None,
    quantile_levels: list[float] | None = None,
) -> pl.DataFrame:
    """
    Build calibration diagnostics for the model.

    This default implementation may be overridden by subclasses.

    Args:
        data: Validation data
        target: Target column name(s) - optional for some models
        quantile_levels: Quantile levels to evaluate (default: [0.8, 0.9, 0.95])

    Returns:
        Polars DataFrame with calibration metrics
    """
    # Imported lazily: calibration_utils -> DistributionPrediction ->
    # this module, so a top-level import would form a cycle.
    from ..utils.calibration_utils import calibration_report as _calibration_report

    # Collect lazyframe if needed
    frame = materialize_lazyframe(data)

    return _calibration_report(self, frame, target, quantile_levels)  # type: ignore[arg-type]

save(path, include_metadata=True)

Save the model to a .uf archive.

Parameters:

Name Type Description Default
path str | Path

Output archive path.

required
include_metadata bool

Whether to include extended metadata.

True
Source code in uncertainty_flow/core/base.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def save(
    self,
    path: str | Path,
    include_metadata: bool = True,
) -> None:
    """
    Persist the model as a .uf archive.

    Args:
        path: Output archive path.
        include_metadata: Whether to include extended metadata.
    """
    from ._persistence import save_model_archive

    # Keep the metadata produced during serialization on the instance.
    self._metadata = save_model_archive(self, path, include_metadata=include_metadata)

load(path, *, expected_archive_sha256=None) classmethod

Load a model from a .uf archive.

Parameters:

Name Type Description Default
path str | Path

Archive path produced by save().

required
expected_archive_sha256 str | None

Optional SHA-256 hex digest expected for the archive. When provided, load() fails if the on-disk archive digest does not match.

None

Returns:

Type Description
BaseUncertaintyModel

Loaded model instance.

Source code in uncertainty_flow/core/base.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
@classmethod
def load(
    cls,
    path: str | Path,
    *,
    expected_archive_sha256: str | None = None,
) -> "BaseUncertaintyModel":
    """
    Restore a model from a .uf archive.

    Args:
        path: Archive path produced by save().
        expected_archive_sha256: Optional SHA-256 hex digest expected for the archive.
            When provided, load() fails if the on-disk archive digest does not match.

    Returns:
        Loaded model instance.
    """
    from ._persistence import _class_path, load_model_archive

    model, _ = load_model_archive(path, expected_archive_sha256=expected_archive_sha256)
    # Loading through a subclass enforces that the archive holds that subclass.
    if cls is BaseUncertaintyModel or isinstance(model, cls):
        return model
    raise TypeError(
        f"Loaded archive contains {_class_path(model)}, "
        f"which is not an instance of {_class_path(cls)}."
    )

explain_interval_width(X, background=None, quantile_pairs=None)

Compute SHAP values for quantile interval widths.

Identifies which features drive prediction interval width. Thin wrapper around `uncertainty_shap`.

Subclasses with native feature importance (e.g. quantile forests) may override this with a faster implementation.

Parameters:

Name Type Description Default
X PolarsInput

Feature DataFrame to explain.

required
background PolarsInput | None

Background dataset for SHAP. Defaults to X[:100].

None
quantile_pairs list[tuple[float, float]] | None

(lower, upper) quantile pairs to analyse. Defaults to [(0.1, 0.9), (0.05, 0.95)].

None

Returns:

Type Description
DataFrame

Polars DataFrame with SHAP attributions per feature.

Source code in uncertainty_flow/core/base.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def explain_interval_width(
    self,
    X: PolarsInput,  # noqa: N803
    background: PolarsInput | None = None,
    quantile_pairs: list[tuple[float, float]] | None = None,
) -> pl.DataFrame:
    """
    Attribute quantile interval widths to features via SHAP.

    Thin wrapper around :func:`uncertainty_shap` that highlights
    which features widen the prediction interval.

    Subclasses with native feature importance (e.g. quantile forests)
    may override this with a faster implementation.

    Args:
        X: Feature DataFrame to explain.
        background: Background dataset for SHAP. Defaults to ``X[:100]``.
        quantile_pairs: ``(lower, upper)`` quantile pairs to analyse.
            Defaults to ``[(0.1, 0.9), (0.05, 0.95)]``.

    Returns:
        Polars DataFrame with SHAP attributions per feature.
    """
    from ..calibration.shap_values import uncertainty_shap

    frame = materialize_lazyframe(X)
    bg = None if background is None else materialize_lazyframe(background)

    return uncertainty_shap(self, frame, background=bg, quantile_pairs=quantile_pairs)

analyze_leverage(X, **kwargs)

Analyze which features most influence prediction uncertainty.

Thin wrapper around `FeatureLeverageAnalyzer`.

Parameters:

Name Type Description Default
X PolarsInput

Feature DataFrame for leverage analysis.

required
**kwargs

Forwarded to FeatureLeverageAnalyzer (e.g. confidence, n_perturbations, random_state).

{}

Returns:

Type Description
DataFrame

Polars DataFrame with leverage scores per feature.

Source code in uncertainty_flow/core/base.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
def analyze_leverage(
    self,
    X: PolarsInput,  # noqa: N803
    **kwargs,
) -> pl.DataFrame:
    """
    Rank features by their influence on prediction uncertainty.

    Thin wrapper around :class:`FeatureLeverageAnalyzer`.

    Args:
        X: Feature DataFrame for leverage analysis.
        **kwargs: Forwarded to ``FeatureLeverageAnalyzer`` (e.g.
            ``confidence``, ``n_perturbations``, ``random_state``).

    Returns:
        Polars DataFrame with leverage scores per feature.
    """
    from ..analysis.leverage import FeatureLeverageAnalyzer

    return FeatureLeverageAnalyzer(self, **kwargs).analyze(materialize_lazyframe(X))

DistributionPrediction

Holds predicted distributions for N samples.

Internal storage: NumPy arrays for efficiency. External interface: Polars DataFrames/Series.

Source code in uncertainty_flow/core/distribution.py
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
class DistributionPrediction:
    """
    Holds predicted distributions for N samples.

    Internal storage: NumPy arrays for efficiency.
    External interface: Polars DataFrames/Series.
    """

    def __init__(
        self,
        quantile_matrix: np.ndarray,
        quantile_levels: list[float],
        target_names: list[str],
        posterior: np.ndarray | dict[str, np.ndarray] | None = None,
        posterior_chains: dict[str, np.ndarray] | None = None,
        posterior_predictive: np.ndarray | None = None,
        group_predictions: dict[str, "DistributionPrediction"] | None = None,
        treatment_info: dict | None = None,
        copula: "BaseCopula | None" = None,
    ):
        """
        Validate and store predicted quantiles plus optional module extras.

        Args:
            quantile_matrix: (n_samples, n_targets * n_quantiles) finite values.
            quantile_levels: Quantile levels, one group of columns per target.
            target_names: One name per predicted target.
            posterior: Optional posterior parameter draws (array or named dict).
            posterior_chains: Optional per-chain posterior draws.
            posterior_predictive: Optional posterior predictive draws.
            group_predictions: Optional per-group sub-predictions (multi-modal).
            treatment_info: Optional causal treatment metadata.
            copula: Optional fitted copula used for joint sampling.

        Raises:
            InvalidDataError: If the matrix is non-finite, not 2D, empty, or
                its column count does not match targets x quantile levels.
        """
        if not np.all(np.isfinite(quantile_matrix)):
            raise InvalidDataError("quantile_matrix contains NaN or Inf values")
        if quantile_matrix.ndim != 2:
            raise InvalidDataError(f"quantile_matrix must be 2D, got shape {quantile_matrix.shape}")
        if quantile_matrix.shape[0] == 0:
            raise InvalidDataError("quantile_matrix must have at least one row")
        if len(target_names) == 0:
            raise InvalidDataError("target_names cannot be empty")

        n_cols_expected = len(target_names) * len(quantile_levels)
        if quantile_matrix.shape[1] != n_cols_expected:
            raise InvalidDataError(
                f"quantile_matrix has {quantile_matrix.shape[1]} columns "
                f"but expected {n_cols_expected} columns for {len(target_names)} target(s) "
                f"with {len(quantile_levels)} quantile levels each"
            )

        self._quantiles = quantile_matrix
        self._levels = np.array(quantile_levels)
        self._targets = target_names
        self._n_samples = quantile_matrix.shape[0]
        self._n_quantiles = len(quantile_levels)

        # Optional extensions consumed by the Bayesian / multi-modal / causal APIs.
        self._posterior = posterior
        self._posterior_chains = posterior_chains
        self._posterior_predictive = posterior_predictive
        self._group_predictions = group_predictions
        self._treatment_info = treatment_info
        self._copula = copula

    def quantile(self, q: float | list[float]) -> pl.DataFrame:
        """
        Extract the predicted values at specific quantile levels.

        Args:
            q: Single quantile level or list of levels.

        Returns:
            Polars DataFrame with columns like "q_0.050" (univariate) or
            "price_q_0.050" (multivariate).
        """
        requested = [q] if isinstance(q, (int, float)) else q
        idx = [self._find_nearest_quantile_index(level) for level in requested]

        if len(self._targets) == 1:
            names = [f"q_{level:.3f}" for level in requested]
            values = self._quantiles[:, idx]
        else:
            names = [f"{t}_q_{level:.3f}" for t in self._targets for level in requested]
            parts = []
            for t_idx in range(len(self._targets)):
                block = self._quantile_slice(t_idx)
                parts.extend(block[:, j] for j in idx)
            values = np.column_stack(parts)

        return pl.DataFrame(values, schema=names, orient="row")

    def interval(self, confidence: float = 0.9) -> pl.DataFrame:
        """
        Return a symmetric prediction interval.

        For 0.9 confidence this uses the 0.05 and 0.95 quantiles (nearest
        available levels are substituted).

        Args:
            confidence: Confidence level in (0, 1), e.g. 0.9 for a 90% interval.

        Returns:
            Polars DataFrame with "lower"/"upper" columns, prefixed by the
            target name for multivariate predictions.

        Raises:
            QuantileError: If confidence is outside (0, 1).
        """
        if not (0 < confidence < 1):
            raise QuantileError(f"confidence must be in (0, 1), got {confidence}")

        tail = (1 - confidence) / 2
        lo = self._find_nearest_quantile_index(tail)
        hi = self._find_nearest_quantile_index(1 - tail)

        if len(self._targets) == 1:
            names = ["lower", "upper"]
            values = np.column_stack([self._quantiles[:, lo], self._quantiles[:, hi]])
        else:
            names = []
            cols = []
            for t_idx, tgt in enumerate(self._targets):
                names += [f"{tgt}_lower", f"{tgt}_upper"]
                block = self._quantile_slice(t_idx)
                cols += [block[:, lo], block[:, hi]]
            values = np.column_stack(cols)

        return pl.DataFrame(values, schema=names, orient="row")

    def median(self) -> pl.Series | pl.DataFrame:
        """Return the (nearest available) 0.5 quantile as a point estimate."""
        idx = self._find_nearest_quantile_index(0.5)

        if len(self._targets) > 1:
            cols = [self._quantile_slice(t)[:, idx] for t in range(len(self._targets))]
            return pl.DataFrame(np.column_stack(cols), schema=self._targets, orient="row")

        return pl.Series("median", self._quantiles[:, idx])

    def crps(
        self,
        y_true: pl.Series | pl.DataFrame | np.ndarray,
    ) -> float | dict[str, float]:
        """
        Compute the exact CRPS from quantile predictions.

        Uses the quantile-score decomposition (Laio & Tamea 2007) — no
        Gaussian approximation.  Requires at least 2 quantile levels.

        Args:
            y_true: True values.  Polars Series (univariate), DataFrame
                (multivariate — one column per target), or numpy array.

        Returns:
            Float CRPS for univariate predictions, or ``{target: crps}`` dict
            for multivariate.

        Raises:
            InvalidDataError: If fewer than 2 quantile levels are stored.
        """
        from ..metrics.crps import crps_quantile

        if self._n_quantiles < 2:
            raise InvalidDataError(
                "CRPS requires at least 2 quantile levels, "
                f"but DistributionPrediction has {self._n_quantiles}"
            )

        truth = self._coerce_y_true(y_true)

        if len(self._targets) == 1:
            return crps_quantile(truth, self._quantiles, self._levels)

        return {
            target: crps_quantile(
                self._target_truth(truth, t_idx),
                self._quantile_slice(t_idx),
                self._levels,
            )
            for t_idx, target in enumerate(self._targets)
        }

    @staticmethod
    def _forward_cdf(
        quantile_values: np.ndarray,
        levels: np.ndarray,
        y: np.ndarray,
    ) -> np.ndarray:
        """
        Evaluate the piecewise-linear CDF at observed values.

        The predicted quantiles define knots ``F(quantile_values[i, j]) =
        levels[j]``; between knots the CDF is linearly interpolated.  Outside
        the knots, the lower tail mass ``levels[0]`` (resp. upper tail mass
        ``1 - levels[-1]``) is spread linearly over one width of the nearest
        segment, and the result is clipped to [0, 1].

        Args:
            quantile_values: (n_samples, n_quantiles) predicted quantile values.
            levels: (n_quantiles,) quantile levels.
            y: (n_samples,) true values.

        Returns:
            (n_samples,) PIT values in [0, 1].
        """
        n, k = quantile_values.shape
        if k == 1:
            # Single knot: CDF degenerates to a step — levels[0] at/below
            # the knot, 1.0 above it.
            return np.where(y <= quantile_values[:, 0], levels[0], 1.0)

        pit = np.empty(n)
        for i in range(n):
            qi = quantile_values[i]
            yi = y[i]
            if yi <= qi[0]:
                # Below the lowest knot.
                if qi[0] == qi[1]:
                    # Degenerate first segment: place the point halfway
                    # into the lower tail mass.
                    pit[i] = levels[0] * 0.5
                else:
                    # frac <= 0 here, so this walks down from levels[0]
                    # toward 0, scaled by the tail mass levels[0] per
                    # segment-width, then floors at 0.
                    frac = (yi - qi[0]) / (qi[1] - qi[0])
                    pit[i] = max(levels[0] + frac * levels[0], 0.0)
            elif yi >= qi[-1]:
                # Above the highest knot — mirror of the lower-tail case
                # using the last segment and the upper tail mass.
                if qi[-1] == qi[-2]:
                    pit[i] = 1.0 - (1.0 - levels[-1]) * 0.5
                else:
                    frac = (yi - qi[-1]) / (qi[-1] - qi[-2])
                    pit[i] = min(levels[-1] + frac * (1.0 - levels[-1]), 1.0)
            else:
                # Interior: locate the bracketing segment and interpolate
                # linearly between its two levels.
                j = int(np.searchsorted(qi, yi, side="right")) - 1
                j = max(0, min(j, k - 2))
                denom = qi[j + 1] - qi[j]
                if denom == 0:
                    # Flat segment (duplicate quantile values): assign the
                    # midpoint of its level range.
                    pit[i] = (levels[j] + levels[j + 1]) / 2.0
                else:
                    frac = (yi - qi[j]) / denom
                    pit[i] = levels[j] + frac * (levels[j + 1] - levels[j])
        return np.clip(pit, 0.0, 1.0)

    def _pit_values(
        self,
        y_true: pl.Series | pl.DataFrame | np.ndarray,
    ) -> np.ndarray | dict[str, np.ndarray]:
        """
        Compute PIT values F_i(y_i) for each observation.

        Args:
            y_true: True values.

        Returns:
            (n,) array for univariate, or {target: array} for multivariate.

        Raises:
            InvalidDataError: If fewer than 2 quantile levels are stored.
        """
        if self._n_quantiles < 2:
            raise InvalidDataError(
                "PIT requires at least 2 quantile levels, "
                f"but DistributionPrediction has {self._n_quantiles}"
            )

        truth = self._coerce_y_true(y_true)

        if len(self._targets) > 1:
            return {
                target: self._forward_cdf(
                    self._quantile_slice(t_idx), self._levels, self._target_truth(truth, t_idx)
                )
                for t_idx, target in enumerate(self._targets)
            }

        return self._forward_cdf(self._quantiles, self._levels, truth.ravel())

    def pit_histogram(
        self,
        y_true: pl.Series | pl.DataFrame | np.ndarray,
        n_bins: int = 10,
        chi2_test: bool = False,
    ) -> pl.DataFrame | dict[str, pl.DataFrame]:
        """
        Compute PIT histogram for calibration assessment.

        If forecasts are perfectly calibrated, PIT values ~ Uniform(0, 1),
        so each bin should contain roughly n / n_bins observations.

        Args:
            y_true: True values.
            n_bins: Number of histogram bins (default 10).
            chi2_test: If True, include a chi-squared uniformity test p-value
                as a ``chi2_pvalue`` column (default False).

        Returns:
            DataFrame with columns: bin_center, count, expected.
            If ``chi2_test=True``, also includes ``chi2_pvalue``.
            For multivariate, returns {target: DataFrame}.
        """
        pit = self._pit_values(y_true)

        if not isinstance(pit, dict):
            return self._build_pit_hist(pit, n_bins, chi2_test=chi2_test)

        return {
            name: self._build_pit_hist(values, n_bins, chi2_test=chi2_test)
            for name, values in pit.items()
        }

    @staticmethod
    def _build_pit_hist(
        pit_values: np.ndarray,
        n_bins: int,
        chi2_test: bool = False,
    ) -> pl.DataFrame:
        """Build one PIT-histogram DataFrame from precomputed PIT values."""
        total = len(pit_values)
        bin_edges = np.linspace(0.0, 1.0, n_bins + 1)
        observed, _ = np.histogram(pit_values, bins=bin_edges)
        expected = np.full(n_bins, total / n_bins)

        columns: dict[str, Any] = {
            "bin_center": (bin_edges[:-1] + bin_edges[1:]) / 2.0,
            "count": observed.astype(float),
            "expected": expected,
        }

        if chi2_test:
            # The chi-squared uniformity test is only well-defined when every
            # expected count is strictly positive.
            if total > 0 and np.all(expected > 0):
                from scipy.stats import chisquare

                _, pvalue = chisquare(observed, expected)
                columns["chi2_pvalue"] = np.full(n_bins, float(pvalue))
            else:
                columns["chi2_pvalue"] = np.full(n_bins, np.nan)

        return pl.DataFrame(columns)

    def calibration_curve(
        self,
        y_true: pl.Series | pl.DataFrame | np.ndarray,
        n_bins: int = 20,
    ) -> pl.DataFrame | dict[str, pl.DataFrame]:
        """
        Compute reliability diagram data (calibration curve).

        Bins PIT values and compares expected (nominal) coverage to observed
        (empirical) coverage at increasing probability thresholds.

        Args:
            y_true: True values.
            n_bins: Number of bins (default 20).

        Returns:
            DataFrame with columns: expected_coverage, observed_coverage.
            For multivariate, returns {target: DataFrame}.
        """
        pit = self._pit_values(y_true)

        if not isinstance(pit, dict):
            return self._build_cal_curve(pit, n_bins)

        return {name: self._build_cal_curve(values, n_bins) for name, values in pit.items()}

    @staticmethod
    def _build_cal_curve(pit_values: np.ndarray, n_bins: int) -> pl.DataFrame:
        """Build one calibration-curve DataFrame from precomputed PIT values."""
        bin_edges = np.linspace(0.0, 1.0, n_bins + 1)
        counts, _ = np.histogram(pit_values, bins=bin_edges)
        return pl.DataFrame(
            {
                # Nominal coverage at each upper bin edge.
                "expected_coverage": np.cumsum(np.diff(bin_edges)),
                # Empirical fraction of PIT values at or below each edge.
                "observed_coverage": np.cumsum(counts).astype(float) / len(pit_values),
                "bin_center": (bin_edges[:-1] + bin_edges[1:]) / 2.0,
            }
        )

    def plot_pit(
        self,
        y_true: pl.Series | pl.DataFrame | np.ndarray,
        n_bins: int = 10,
    ) -> None:
        """Plot PIT histogram with uniform reference line. Requires matplotlib.

        Args:
            y_true: True values.
            n_bins: Number of histogram bins (default 10).
        """
        # Imported lazily so matplotlib is only required when plotting.
        from ..viz import _plotting

        _plotting.plot_pit(self, y_true, n_bins=n_bins)

    def _coerce_y_true(self, y_true: pl.Series | pl.DataFrame | np.ndarray) -> np.ndarray:
        """Convert y_true input to a numpy array shaped for this prediction."""
        if isinstance(y_true, pl.DataFrame):
            # Select columns in target order so downstream indexing lines up.
            return np.column_stack([y_true[name].to_numpy() for name in self._targets])
        if isinstance(y_true, pl.Series):
            return to_numpy_series(y_true)
        return np.asarray(y_true, dtype=np.float64)

    def sample(self, n: int, random_state: int | None = None) -> pl.DataFrame:
        """
        Draw n samples per input row via piecewise-linear inverse CDF.

        For each row and each target, builds a CDF from the predicted quantile
        matrix (quantile values -> cumulative probability) and draws samples
        by inverting the CDF.  When a copula is attached and there are
        multiple targets, sampling is joint; otherwise targets are sampled
        independently.

        Args:
            n: Number of samples to draw per input row.
            random_state: Optional random seed for reproducibility.

        Returns:
            Polars DataFrame with (n * n_samples) rows and columns:
            - sample_id: index of the original input row (0 to n_samples-1, repeated n times)
            - One column per target with sampled values

        Raises:
            InvalidDataError: If n is invalid or would exceed memory limits.
        """
        if not isinstance(n, int) or n < 1:
            raise InvalidDataError(f"n must be a positive integer, got {n}")

        requested = self._n_samples * n
        if requested > MAX_TOTAL_SAMPLES:
            raise InvalidDataError(
                f"Total samples ({requested:,}) exceeds maximum ({MAX_TOTAL_SAMPLES:,}). "
                f"Reduce n (currently {n}) or number of input rows ({self._n_samples})."
            )

        rng = np.random.default_rng(random_state)
        fits_one_chunk = n <= MAX_SAMPLE_CHUNK_SIZE

        if self._copula is not None and len(self._targets) > 1:
            if fits_one_chunk:
                return self._sample_joint_chunk(n, rng)
            return self._sample_joint_chunked(n, rng)

        if fits_one_chunk:
            return self._sample_chunk(n, rng)
        return self._sample_chunked(n, rng)

    def _marginal_quantiles(self) -> np.ndarray:
        """Reshape the flat quantile matrix into [row, target, quantile]."""
        return self._quantiles.reshape(self._n_samples, len(self._targets), self._n_quantiles)

    def _sample_joint_chunk(
        self,
        n: int,
        rng: np.random.Generator,
    ) -> pl.DataFrame:
        """Sample jointly using a fitted copula (single chunk)."""
        draws = self._copula.sample(
            self._marginal_quantiles(),
            n_samples=n,
            quantile_levels=self._levels,
            random_state=rng,
        )

        # Copulas may return either a flat (rows*n, targets) matrix or a cube.
        if draws.ndim == 2:
            matrix = draws
        else:
            matrix = draws.reshape(self._n_samples * n, len(self._targets))

        frame = pl.DataFrame(matrix, schema=self._targets, orient="row")
        frame.insert_column(0, pl.Series("sample_id", np.repeat(np.arange(self._n_samples), n)))
        return frame

    def _sample_joint_chunked(
        self,
        n: int,
        rng: np.random.Generator,
    ) -> pl.DataFrame:
        """Sample jointly in chunks — delegates to _sample_joint_chunk."""
        frames = []
        left = n
        while left > 0:
            step = min(left, MAX_SAMPLE_CHUNK_SIZE)
            frames.append(self._sample_joint_chunk(step, rng))
            left -= step
        return pl.concat(frames)

    def _sample_chunk(
        self,
        n: int,
        rng: np.random.Generator,
    ) -> pl.DataFrame:
        """Sample each target independently (n <= MAX_SAMPLE_CHUNK_SIZE)."""
        # One uniform draw shared across targets, clipped to the supported
        # range of the piecewise-linear inverse CDF.
        u = rng.uniform(0, 1, size=(self._n_samples, n))
        u = np.clip(u, self._levels[0], self._levels[-1])

        columns = []
        for t_idx in range(len(self._targets)):
            drawn = self._vectorized_inverse_cdf(self._quantile_slice(t_idx), u, self._levels)
            columns.append(drawn.flatten())

        frame = pl.DataFrame(np.column_stack(columns), schema=self._targets, orient="row")
        frame.insert_column(0, pl.Series("sample_id", np.repeat(np.arange(self._n_samples), n)))
        return frame

    def _sample_chunked(
        self,
        n: int,
        rng: np.random.Generator,
    ) -> pl.DataFrame:
        """Sample with chunking for large n values — delegates to _sample_chunk."""
        frames = []
        left = n
        while left > 0:
            step = min(left, MAX_SAMPLE_CHUNK_SIZE)
            frames.append(self._sample_chunk(step, rng))
            left -= step
        return pl.concat(frames)

    @staticmethod
    def _vectorized_inverse_cdf(
        quantile_values: np.ndarray,
        uniform_clipped: np.ndarray,
        levels: np.ndarray,
    ) -> np.ndarray:
        """
        Vectorized inverse CDF sampling via piecewise-linear interpolation.

        Args:
            quantile_values: (n_samples, n_quantiles) array of quantile values
            uniform_clipped: (n_samples, n) array of uniform samples
            levels: (n_quantiles,) array of quantile levels

        Returns:
            (n_samples, n) array of sampled values
        """
        lower_idx = np.searchsorted(levels, uniform_clipped, side="right") - 1
        lower_idx = np.clip(lower_idx, 0, len(levels) - 2)
        upper_idx = lower_idx + 1

        row_idx = np.arange(quantile_values.shape[0])[:, None]
        lower_x = levels[lower_idx]
        upper_x = levels[upper_idx]
        lower_y = quantile_values[row_idx, lower_idx]
        upper_y = quantile_values[row_idx, upper_idx]

        denom = upper_x - lower_x
        denom[denom == 0] = 1.0
        weight = (uniform_clipped - lower_x) / denom
        return lower_y + weight * (upper_y - lower_y)

    def plot(
        self,
        actuals: pl.Series | pl.DataFrame | None = None,
        confidence_bands: list[float] | None = None,
        title: str | None = None,
        targets: str | list[str] = "all",
        max_targets: int = 6,
    ) -> None:
        """Fan chart of quantile bands. Requires matplotlib.

        Args:
            actuals: Optional actual values for comparison.
            confidence_bands: Confidence levels (default: [0.5, 0.8, 0.9, 0.95]).
            title: Optional plot title.
            targets: Target(s) to plot. ``"all"`` plots every target.
            max_targets: Maximum subplot panels (default 6).
        """
        # Imported lazily so matplotlib is only required when plotting.
        from ..viz._plotting import plot as fan_chart

        fan_chart(
            self,
            actuals=actuals,
            confidence_bands=confidence_bands,
            title=title,
            targets=targets,
            max_targets=max_targets,
        )

    @lru_cache(maxsize=128)
    def _find_nearest_quantile_index(self, q: float) -> int:
        """Find index of nearest quantile level. Cached for repeated lookups."""
        distances = np.abs(self._levels - q)
        return int(np.argmin(distances))

    def _quantile_slice(self, target_idx: int) -> np.ndarray:
        """Return (n_samples, n_quantiles) slice for a given target index."""
        q_start = target_idx * self._n_quantiles
        return self._quantiles[:, q_start : q_start + self._n_quantiles]

    def _interval_columns(self, target: str) -> tuple[str, str]:
        """Return (lower_col, upper_col) column name pair for a target."""
        if len(self._targets) == 1:
            return "lower", "upper"
        return f"{target}_lower", f"{target}_upper"

    def _target_truth(self, y_arr: np.ndarray, t_idx: int, i: int | None = None) -> np.ndarray:
        """Extract truth values for a single target from a full y array."""
        if y_arr.ndim == 1:
            return y_arr
        if i is not None:
            return np.array([y_arr[i, t_idx]])
        return y_arr[:, t_idx]

    def __repr__(self) -> str:
        parts = [
            f"DistributionPrediction(n={self._n_samples}, "
            f"targets={self._targets}, quantiles={len(self._levels)}"
        ]
        if self._posterior is not None:
            parts.append(", posterior=True")
        if self._posterior_predictive is not None:
            parts.append(", posterior_predictive=True")
        parts.append(")")
        return "".join(parts)

    # --- Bayesian posterior methods ---

    def posterior_samples(self) -> np.ndarray:
        """Return raw posterior parameter draws as a 2D [draw, feature] matrix.

        Raises:
            InvalidDataError: If no posterior is attached, or it is empty.
        """
        if self._posterior is None:
            raise InvalidDataError(
                "posterior_samples() requires posterior data. "
                "Use a BayesianQuantileRegressor to generate predictions with posteriors."
            )
        if isinstance(self._posterior, np.ndarray):
            return self._posterior

        # Flatten each named tensor to [draw, feature] and stitch side by side.
        blocks = []
        for tensor in self._posterior.values():
            arr = np.asarray(tensor)
            if arr.ndim == 1:
                blocks.append(arr[:, np.newaxis])
            else:
                blocks.append(arr.reshape(arr.shape[0], -1))
        if not blocks:
            raise InvalidDataError("posterior data is empty")
        return np.column_stack(blocks)

    def posterior_parameter_interval(self, confidence: float = 0.9) -> pl.DataFrame:
        """Compute parameter credible intervals from posterior draws.

        Raises:
            QuantileError: If confidence is outside (0, 1).
        """
        if not (0 < confidence < 1):
            raise QuantileError(f"confidence must be in (0, 1), got {confidence}")
        draws = self.posterior_samples()
        tail = (1 - confidence) / 2
        bounds = np.quantile(draws, [tail, 1 - tail], axis=0)
        return pl.DataFrame({"lower": bounds[0], "upper": bounds[1]}, orient="row")

    def credible_interval(self, confidence: float = 0.9) -> pl.DataFrame:
        """Compute predictive credible intervals for each prediction row.

        Falls back (with a FutureWarning) to parameter intervals when no
        posterior predictive draws are stored.

        Raises:
            QuantileError: If confidence is outside (0, 1).
        """
        import warnings

        if not (0 < confidence < 1):
            raise QuantileError(f"confidence must be in (0, 1), got {confidence}")

        if self._posterior_predictive is None:
            warnings.warn(
                "credible_interval() currently falls back to parameter intervals when "
                "posterior predictive draws are unavailable. Use "
                "posterior_parameter_interval() for explicit parameter intervals.",
                FutureWarning,
                stacklevel=2,
            )
            return self.posterior_parameter_interval(confidence)

        tail = (1 - confidence) / 2
        bounds = np.quantile(self._posterior_predictive, [tail, 1 - tail], axis=1)
        return pl.DataFrame({"lower": bounds[0], "upper": bounds[1]}, orient="row")

    def rhat(self) -> np.ndarray:
        """Compute Gelman-Rubin R-hat convergence diagnostic from true chains.

        For each named parameter tensor, computes the classic (non-split)
        R-hat ``sqrt(var_hat / W)`` where ``W`` is the mean within-chain
        variance, ``B`` the between-chain variance scaled by chain length,
        and ``var_hat = (1 - 1/L) * W + (1/L) * B`` for chain length ``L``.
        Values near 1.0 indicate convergence.

        Returns:
            1D array of R-hat values, one per scalar parameter, concatenated
            across all named tensors in ``self._posterior_chains``.

        Raises:
            InvalidDataError: If no chains are stored, a tensor has fewer
                than 2 chains or 2 draws, within-chain variance is ~0, or
                the chain dict is empty.
        """
        if self._posterior_chains is None:
            raise InvalidDataError(
                "rhat() requires posterior chain data. "
                "Fit BayesianQuantileRegressor with num_chains > 1."
            )

        all_rhat = []
        for name, arr in self._posterior_chains.items():
            chains = np.asarray(arr)
            if chains.ndim < 2:
                raise InvalidDataError(f"posterior chain array for '{name}' must be at least 2D")
            n_chains = chains.shape[0]
            chain_len = chains.shape[1]
            if n_chains < 2:
                raise InvalidDataError(
                    f"rhat() requires at least 2 chains for '{name}', got {n_chains}."
                )
            if chain_len < 2:
                raise InvalidDataError(
                    f"rhat() requires at least 2 draws per chain for '{name}', got {chain_len}."
                )
            # Flatten any trailing parameter dims into [chain, draw, param].
            reshaped = chains.reshape(n_chains, chain_len, -1)
            chain_means = reshaped.mean(axis=1)
            # B: between-chain variance of the chain means, scaled by length.
            b = chain_len * np.var(chain_means, axis=0, ddof=1)
            # W: average within-chain variance.
            w = np.mean(np.var(reshaped, axis=1, ddof=1), axis=0)
            if np.any(w < 1e-10):
                # R-hat divides by W; near-constant chains make it undefined.
                raise InvalidDataError(
                    f"Within-chain variance too close to zero for R-hat calculation for '{name}'."
                )
            # Pooled posterior-variance estimate (Gelman & Rubin 1992).
            var_hat = (1 - 1 / chain_len) * w + (1 / chain_len) * b
            all_rhat.append(np.sqrt(var_hat / w))

        if not all_rhat:
            raise InvalidDataError(
                "rhat() requires at least one posterior chain parameter to evaluate."
            )
        return np.concatenate(all_rhat)

    def posterior_summary(self) -> pl.DataFrame:
        """Return summary statistics (mean, std, 95% quantiles) of posterior samples.

        Raises:
            InvalidDataError: If no posterior is attached.
        """
        if self._posterior is None:
            raise InvalidDataError(
                "posterior_summary() requires posterior data. "
                "Use a BayesianQuantileRegressor to generate predictions with posteriors."
            )
        draws = self.posterior_samples()
        q025, q50, q975 = np.quantile(draws, [0.025, 0.5, 0.975], axis=0)
        return pl.DataFrame(
            {
                "mean": np.mean(draws, axis=0),
                "std": np.std(draws, axis=0),
                "q025": q025,
                "q50": q50,
                "q975": q975,
            }
        )

    # --- Multi-modal group methods ---

    def group_uncertainty(self) -> dict[str, float]:
        """Return per-group uncertainty contribution (mean 90% interval width).

        Raises:
            InvalidDataError: If no group predictions are attached.
        """
        if self._group_predictions is None:
            raise InvalidDataError(
                "group_uncertainty() requires group predictions. "
                "Use a CrossModalAggregator to generate predictions with groups."
            )
        widths = {}
        for name, pred in self._group_predictions.items():
            # NOTE(review): assumes each group prediction is univariate
            # ("lower"/"upper" columns); multivariate groups would expose
            # "{target}_lower" names instead — confirm against
            # CrossModalAggregator.
            bounds = pred.interval(0.9)
            gap = bounds["upper"].to_numpy() - bounds["lower"].to_numpy()
            widths[name] = float(np.mean(gap))
        return widths

    def group_intervals(self, confidence: float = 0.9) -> dict[str, pl.DataFrame]:
        """Return per-group prediction intervals at the given confidence level.

        Raises:
            InvalidDataError: If no group predictions are attached.
        """
        if self._group_predictions is None:
            raise InvalidDataError(
                "group_intervals() requires group predictions. "
                "Use a CrossModalAggregator to generate predictions with groups."
            )
        return {
            name: prediction.interval(confidence)
            for name, prediction in self._group_predictions.items()
        }

    def cross_group_correlation(self) -> np.ndarray:
        """Return cross-group correlation matrix based on group median predictions.

        Raises:
            InvalidDataError: If no group predictions are attached.
        """
        if self._group_predictions is None:
            raise InvalidDataError(
                "cross_group_correlation() requires group predictions. "
                "Use a CrossModalAggregator to generate predictions with groups."
            )
        # One median series per group; corrcoef treats each row as a variable.
        group_medians = [
            pred._quantiles[:, pred._find_nearest_quantile_index(0.5)]
            for pred in self._group_predictions.values()
        ]
        return np.corrcoef(np.vstack(group_medians))  # type: ignore

    # --- Causal treatment methods ---

    def treatment_effect(self) -> np.ndarray:
        """Return CATE point estimates."""
        if self._treatment_info is None:
            raise InvalidDataError(
                "treatment_effect() requires treatment info. "
                "Use a CausalUncertaintyEstimator to generate predictions with treatment data."
            )
        return self._treatment_info["cate"]  # type: ignore

    def average_treatment_effect(self) -> dict:
        """Return ATE with confidence interval."""
        if self._treatment_info is None:
            raise InvalidDataError(
                "Use a CausalUncertaintyEstimator to generate predictions with treatment data."
            )
        if "ate" not in self._treatment_info or "ate_ci" not in self._treatment_info:
            raise InvalidDataError(
                "average_treatment_effect() requires evaluated treatment metrics. "
                "Call CausalUncertaintyEstimator.evaluate(...) on labeled data first."
            )
        return {
            "ate": self._treatment_info["ate"],
            "ci": self._treatment_info["ate_ci"],
        }

    def heterogeneity_score(self) -> float:
        """Return CATE variance as heterogeneity measure."""
        if self._treatment_info is None:
            raise InvalidDataError(
                "heterogeneity_score() requires treatment info. "
                "Use a CausalUncertaintyEstimator to generate predictions with treatment data."
            )
        return float(np.var(self._treatment_info["cate"]))

    def uncertainty_decomposition(
        self,
        confidence: float = 0.9,
    ) -> dict[str, float]:
        """
        Return a lightweight heuristic uncertainty decomposition for the
        first target.

        Aleatoric uncertainty (data noise) is the irreducible part inherent
        in the data; epistemic uncertainty (model uncertainty) is the part
        reducible with more data/knowledge.  No ensemble is refit here —
        the split is a cheap summary of this single prediction object:

        - Aleatoric: mean prediction-interval width.
        - Epistemic: variance of interval widths across samples.

        For model-based decomposition with bootstrap refits, use
        `uncertainty_flow.decomposition.EnsembleDecomposition`.

        Args:
            confidence: Confidence level for interval width calculation (default: 0.9)

        Returns:
            Dictionary with keys ``aleatoric``, ``epistemic``, and ``total``
            (their sum).

        Examples
        --------
        >>> pred = model.predict(X_test)
        >>> decomposition = pred.uncertainty_decomposition()
        >>> print(f"Total: {decomposition['total']:.2f}")
        >>> print(f"  Aleatoric: {decomposition['aleatoric']:.2f}")
        >>> print(f"  Epistemic: {decomposition['epistemic']:.2f}")
        """
        return self._decomposition_for_target(0, confidence)

    def _decomposition_for_target(
        self, target_idx: int, confidence: float, interval: pl.DataFrame | None = None
    ) -> dict[str, float]:
        """Compute the aleatoric/epistemic width summary for one target.

        Args:
            target_idx: Index into ``self._targets``.
            confidence: Confidence level used if ``interval`` is not supplied.
            interval: Optional precomputed interval frame (avoids recompute).
        """
        if interval is None:
            interval = self.interval(confidence)
        lower_col, upper_col = self._interval_columns(self._targets[target_idx])

        spread = to_numpy_series(interval[upper_col]) - to_numpy_series(interval[lower_col])
        aleatoric = float(np.mean(spread))
        epistemic = float(np.var(spread))

        return {
            "aleatoric": aleatoric,
            "epistemic": epistemic,
            "total": aleatoric + epistemic,
        }

    def summary(self, confidence: float = 0.9) -> pl.DataFrame:
        """
        One-row-per-target overview of the prediction distribution.

        Columns: target, median, mean_width_90, mean_width_50,
        aleatoric, epistemic, total_uncertainty.

        Note: the column names are fixed regardless of ``confidence``.
        ``mean_width_90`` always holds the mean interval width at the
        *requested* ``confidence`` level (even when it is not 0.9);
        ``mean_width_50`` is the mean inter-quartile range (25th–75th
        percentile). The aleatoric/epistemic/total columns use the same
        width-based heuristic as ``uncertainty_decomposition()``.

        Args:
            confidence: Confidence level for the primary interval width (default 0.9).

        Returns:
            Polars DataFrame with one row per target.
        """
        # Resolve all needed probability levels to nearest stored quantile indices.
        median_idx = self._find_nearest_quantile_index(0.5)
        alpha = (1 - confidence) / 2
        lower_idx = self._find_nearest_quantile_index(alpha)
        upper_idx = self._find_nearest_quantile_index(1 - alpha)
        lower_50_idx = self._find_nearest_quantile_index(0.25)
        upper_50_idx = self._find_nearest_quantile_index(0.75)

        # Presumably warns when a requested level has no nearby stored quantile
        # (nearest-neighbor lookup can be far off) — confirm in _warn_if_far.
        _warn_if_far(
            self._levels,
            {
                0.5: median_idx,
                alpha: lower_idx,
                1 - alpha: upper_idx,
                0.25: lower_50_idx,
                0.75: upper_50_idx,
            },
        )

        rows = []
        # Build the interval frame once and reuse it for every target's decomposition.
        interval = self.interval(confidence)
        for t_idx, target in enumerate(self._targets):
            q_slice = self._quantile_slice(t_idx)

            median_val = float(np.mean(q_slice[:, median_idx]))
            width = float(np.mean(q_slice[:, upper_idx] - q_slice[:, lower_idx]))
            narrow = float(np.mean(q_slice[:, upper_50_idx] - q_slice[:, lower_50_idx]))

            decomp = self._decomposition_for_target(t_idx, confidence, interval)

            rows.append(
                {
                    "target": target,
                    "median": median_val,
                    "mean_width_90": width,
                    "mean_width_50": narrow,
                    "aleatoric": decomp["aleatoric"],
                    "epistemic": decomp["epistemic"],
                    "total_uncertainty": decomp["total"],
                }
            )

        return pl.DataFrame(rows)

    def fit_distribution(
        self,
        family: str = "auto",
        row_index: int | None = None,
    ) -> ParametricDistribution | list[ParametricDistribution]:
        """
        Fit a parametric distribution to the quantile predictions.

        Univariate predictions yield a single fitted distribution; for
        multivariate predictions one distribution is fitted per target.

        Args:
            family: One of ``"normal"``, ``"student_t"``, ``"lognormal"``,
                ``"gamma"``, or ``"auto"`` (best fit by KS distance).
            row_index: If given, fit only that row's quantile vector;
                otherwise fit the mean quantile vector across all rows.

        Returns:
            A single ``ParametricDistribution`` (univariate) or a list of
            them, one per target (multivariate).
        """
        from .parametric import fit_parametric

        def _vector(matrix: np.ndarray) -> np.ndarray:
            # One row's quantile vector, or the per-level mean over rows.
            return matrix[row_index] if row_index is not None else np.mean(matrix, axis=0)

        if len(self._targets) == 1:
            return fit_parametric(
                _vector(self._quantiles[:, : self._n_quantiles]), self._levels, family=family
            )

        return [
            fit_parametric(_vector(self._quantile_slice(idx)), self._levels, family=family)
            for idx in range(len(self._targets))
        ]

    def log_score(
        self,
        y_true: pl.Series | pl.DataFrame | np.ndarray,
        family: str = "auto",
    ) -> float | dict[str, float]:
        """
        Compute the mean log-score of the predictions.

        Fits a parametric distribution from the predicted quantiles, then
        evaluates the log-density at the true values.

        NOTE(review): the previous docstring described this both as a
        "negative log-likelihood" and as "higher is better (less negative)",
        which conflict. The actual sign convention is determined by
        ``uncertainty_flow.metrics.log_score`` — confirm there before
        ranking models by this value.

        Args:
            y_true: True values.
            family: Distribution family for fitting, or ``"auto"``.

        Returns:
            Mean log-score (float) for univariate, or ``{target: score}`` dict
            for multivariate.
        """
        from ..metrics.log_score import log_score as _log_score

        y_arr = self._coerce_y_true(y_true)

        if len(self._targets) == 1:
            return _log_score(
                y_arr, self._quantiles[:, : self._n_quantiles], self._levels, family=family
            )

        # One score per target, each fitted/evaluated independently.
        result = {}
        for t_idx, target in enumerate(self._targets):
            result[target] = _log_score(
                self._target_truth(y_arr, t_idx),
                self._quantile_slice(t_idx),
                self._levels,
                family=family,
            )
        return result

    def energy_score(
        self,
        y_true: pl.Series | pl.DataFrame | np.ndarray,
        n_samples: int = 1000,
        random_state: int | None = None,
    ) -> float:
        """
        Compute the energy score for multivariate predictions.

        This is a proper scoring rule generalising CRPS to the multivariate
        case; it requires at least 2 targets.

        Args:
            y_true: True values (array with columns matching targets).
            n_samples: Monte Carlo samples per observation.
            random_state: Random seed.

        Returns:
            Mean energy score (float).
        """
        from ..metrics import multivariate as _mv

        return _mv.energy_score(self, y_true, n_samples=n_samples, random_state=random_state)

    def variogram_score(
        self,
        y_true: pl.Series | pl.DataFrame | np.ndarray,
        n_samples: int = 1000,
        p: float = 0.5,
        random_state: int | None = None,
    ) -> float:
        """
        Compute the variogram score for multivariate predictions.

        This is a proper scoring rule sensitive to the correlation structure;
        it requires at least 2 targets.

        Args:
            y_true: True values (array with columns matching targets).
            n_samples: Monte Carlo samples per observation.
            p: Power parameter (default 0.5).
            random_state: Random seed.

        Returns:
            Mean variogram score (float).
        """
        from ..metrics import multivariate as _mv

        return _mv.variogram_score(
            self, y_true, n_samples=n_samples, p=p, random_state=random_state
        )

quantile(q)

Extract specific quantile levels.

Parameters:

Name Type Description Default
q float | list[float]

Single quantile level or list of levels

required

Returns:

Type Description
DataFrame

Polars DataFrame with columns like "q_0.05" or "price_q_0.05" for multivariate

Source code in uncertainty_flow/core/distribution.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def quantile(self, q: float | list[float]) -> pl.DataFrame:
    """
    Extract specific quantile levels.

    Args:
        q: Single quantile level or list of levels

    Returns:
        Polars DataFrame with columns like "q_0.05" or "price_q_0.05" for multivariate
    """
    levels = [q] if isinstance(q, (int, float)) else q
    idxs = [self._find_nearest_quantile_index(lv) for lv in levels]

    if len(self._targets) == 1:
        schema = [f"q_{lv:.3f}" for lv in levels]
        values = self._quantiles[:, idxs]
    else:
        # One column per (target, level) pair, targets outermost.
        schema = []
        parts = []
        for t_idx, target in enumerate(self._targets):
            block = self._quantile_slice(t_idx)
            for lv, i in zip(levels, idxs):
                schema.append(f"{target}_q_{lv:.3f}")
                parts.append(block[:, i])
        values = np.column_stack(parts)

    return pl.DataFrame(values, schema=schema, orient="row")

interval(confidence=0.9)

Return prediction interval.

For 0.9 confidence: uses 0.05 and 0.95 quantiles. Returns columns: lower, upper (or price_lower, price_upper for multivariate)

Parameters:

Name Type Description Default
confidence float

Confidence level (e.g., 0.9 for 90% interval)

0.9

Returns:

Type Description
DataFrame

Polars DataFrame with lower/upper bounds

Source code in uncertainty_flow/core/distribution.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def interval(self, confidence: float = 0.9) -> pl.DataFrame:
    """
    Return prediction interval.

    For 0.9 confidence: uses 0.05 and 0.95 quantiles.
    Returns columns: lower, upper (or price_lower, price_upper for multivariate)

    Args:
        confidence: Confidence level (e.g., 0.9 for 90% interval)

    Returns:
        Polars DataFrame with lower/upper bounds
    """
    if not (0 < confidence < 1):
        raise QuantileError(f"confidence must be in (0, 1), got {confidence}")

    # Symmetric tails: e.g. 0.9 -> 0.05 and 0.95 quantiles.
    tail = (1 - confidence) / 2
    lo_idx = self._find_nearest_quantile_index(tail)
    hi_idx = self._find_nearest_quantile_index(1 - tail)

    if len(self._targets) == 1:
        schema = ["lower", "upper"]
        parts = [self._quantiles[:, lo_idx], self._quantiles[:, hi_idx]]
    else:
        schema = []
        parts = []
        for t_idx, target in enumerate(self._targets):
            block = self._quantile_slice(t_idx)
            schema += [f"{target}_lower", f"{target}_upper"]
            parts += [block[:, lo_idx], block[:, hi_idx]]

    return pl.DataFrame(np.column_stack(parts), schema=schema, orient="row")

median()

Return the 0.5 quantile as a point estimate.

Source code in uncertainty_flow/core/distribution.py
169
170
171
172
173
174
175
176
177
178
179
def median(self) -> pl.Series | pl.DataFrame:
    """Return the 0.5 quantile as a point estimate."""
    mid = self._find_nearest_quantile_index(0.5)

    if len(self._targets) > 1:
        # One median column per target.
        cols = [self._quantile_slice(i)[:, mid] for i in range(len(self._targets))]
        return pl.DataFrame(np.column_stack(cols), schema=self._targets, orient="row")

    return pl.Series("median", self._quantiles[:, mid])

crps(y_true)

Compute the exact CRPS from quantile predictions.

Uses the quantile-score decomposition (Laio & Tamea 2007) — no Gaussian approximation. Requires at least 2 quantile levels.

Parameters:

Name Type Description Default
y_true Series | DataFrame | ndarray

True values. Polars Series (univariate), DataFrame (multivariate — one column per target), or numpy array.

required

Returns:

Type Description
float | dict[str, float]

Float CRPS for univariate predictions, or {target: crps} dict

float | dict[str, float]

for multivariate.

Source code in uncertainty_flow/core/distribution.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
def crps(
    self,
    y_true: pl.Series | pl.DataFrame | np.ndarray,
) -> float | dict[str, float]:
    """
    Compute the exact CRPS from quantile predictions.

    Uses the quantile-score decomposition (Laio & Tamea 2007) — no
    Gaussian approximation.  Requires at least 2 quantile levels.

    Args:
        y_true: True values.  Polars Series (univariate), DataFrame
            (multivariate — one column per target), or numpy array.

    Returns:
        Float CRPS for univariate predictions, or ``{target: crps}`` dict
        for multivariate.
    """
    from ..metrics.crps import crps_quantile

    if self._n_quantiles < 2:
        raise InvalidDataError(
            "CRPS requires at least 2 quantile levels, "
            f"but DistributionPrediction has {self._n_quantiles}"
        )

    truth = self._coerce_y_true(y_true)

    if len(self._targets) == 1:
        return crps_quantile(truth, self._quantiles, self._levels)

    # Score each target against its own truth column.
    return {
        target: crps_quantile(
            self._target_truth(truth, idx),
            self._quantile_slice(idx),
            self._levels,
        )
        for idx, target in enumerate(self._targets)
    }

pit_histogram(y_true, n_bins=10, chi2_test=False)

Compute PIT histogram for calibration assessment.

If forecasts are perfectly calibrated, PIT values ~ Uniform(0, 1), so each bin should contain roughly n / n_bins observations.

Parameters:

Name Type Description Default
y_true Series | DataFrame | ndarray

True values.

required
n_bins int

Number of histogram bins (default 10).

10
chi2_test bool

If True, include a chi-squared uniformity test p-value as a chi2_pvalue column (default False).

False

Returns:

Type Description
DataFrame | dict[str, DataFrame]

DataFrame with columns: bin_center, count, expected.

DataFrame | dict[str, DataFrame]

If chi2_test=True, also includes chi2_pvalue.

DataFrame | dict[str, DataFrame]

For multivariate, returns {target: DataFrame}.

Source code in uncertainty_flow/core/distribution.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
def pit_histogram(
    self,
    y_true: pl.Series | pl.DataFrame | np.ndarray,
    n_bins: int = 10,
    chi2_test: bool = False,
) -> pl.DataFrame | dict[str, pl.DataFrame]:
    """
    Compute PIT histogram for calibration assessment.

    If forecasts are perfectly calibrated, PIT values ~ Uniform(0, 1),
    so each bin should contain roughly n / n_bins observations.

    Args:
        y_true: True values.
        n_bins: Number of histogram bins (default 10).
        chi2_test: If True, include a chi-squared uniformity test p-value
            as a ``chi2_pvalue`` column (default False).

    Returns:
        DataFrame with columns: bin_center, count, expected.
        If ``chi2_test=True``, also includes ``chi2_pvalue``.
        For multivariate, returns {target: DataFrame}.
    """
    pit_vals = self._pit_values(y_true)

    # Univariate: a single array; multivariate: a {target: array} dict.
    if not isinstance(pit_vals, dict):
        return self._build_pit_hist(pit_vals, n_bins, chi2_test=chi2_test)

    return {
        name: self._build_pit_hist(vals, n_bins, chi2_test=chi2_test)
        for name, vals in pit_vals.items()
    }

calibration_curve(y_true, n_bins=20)

Compute reliability diagram data (calibration curve).

Bins PIT values and compares expected (nominal) coverage to observed (empirical) coverage at increasing probability thresholds.

Parameters:

Name Type Description Default
y_true Series | DataFrame | ndarray

True values.

required
n_bins int

Number of bins (default 20).

20

Returns:

Type Description
DataFrame | dict[str, DataFrame]

DataFrame with columns: expected_coverage, observed_coverage.

DataFrame | dict[str, DataFrame]

For multivariate, returns {target: DataFrame}.

Source code in uncertainty_flow/core/distribution.py
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
def calibration_curve(
    self,
    y_true: pl.Series | pl.DataFrame | np.ndarray,
    n_bins: int = 20,
) -> pl.DataFrame | dict[str, pl.DataFrame]:
    """
    Compute reliability diagram data (calibration curve).

    Bins PIT values and compares expected (nominal) coverage to observed
    (empirical) coverage at increasing probability thresholds.

    Args:
        y_true: True values.
        n_bins: Number of bins (default 20).

    Returns:
        DataFrame with columns: expected_coverage, observed_coverage.
        For multivariate, returns {target: DataFrame}.
    """
    pit_vals = self._pit_values(y_true)

    # Univariate: a single array; multivariate: a {target: array} dict.
    if not isinstance(pit_vals, dict):
        return self._build_cal_curve(pit_vals, n_bins)

    return {name: self._build_cal_curve(vals, n_bins) for name, vals in pit_vals.items()}

plot_pit(y_true, n_bins=10)

Plot PIT histogram with uniform reference line. Requires matplotlib.

Parameters:

Name Type Description Default
y_true Series | DataFrame | ndarray

True values.

required
n_bins int

Number of histogram bins (default 10).

10
Source code in uncertainty_flow/core/distribution.py
406
407
408
409
410
411
412
413
414
415
416
417
418
419
def plot_pit(
    self,
    y_true: pl.Series | pl.DataFrame | np.ndarray,
    n_bins: int = 10,
) -> None:
    """Plot PIT histogram with uniform reference line. Requires matplotlib.

    Args:
        y_true: True values.
        n_bins: Number of histogram bins (default 10).
    """
    from ..viz import _plotting

    _plotting.plot_pit(self, y_true, n_bins=n_bins)

sample(n, random_state=None)

Draw n samples per input row via piecewise-linear inverse CDF.

For each row and each target, builds a CDF from the predicted quantile matrix (quantile values -> cumulative probability) and draws samples by inverting the CDF.

Parameters:

Name Type Description Default
n int

Number of samples to draw per input row.

required
random_state int | None

Optional random seed for reproducibility.

None

Returns:

Type Description
DataFrame

Polars DataFrame with (n * n_samples) rows and columns:

DataFrame
  • sample_id: index of the original input row (0 to n_samples-1, repeated n times)
DataFrame
  • One column per target with sampled values

Raises:

Type Description
InvalidDataError

If n is invalid or would exceed memory limits.

Source code in uncertainty_flow/core/distribution.py
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
def sample(self, n: int, random_state: int | None = None) -> pl.DataFrame:
    """
    Draw n samples per input row via piecewise-linear inverse CDF.

    For each row and each target, builds a CDF from the predicted quantile
    matrix (quantile values -> cumulative probability) and draws samples
    by inverting the CDF.

    Args:
        n: Number of samples to draw per input row.
        random_state: Optional random seed for reproducibility.

    Returns:
        Polars DataFrame with (n * n_samples) rows and columns:
        - sample_id: index of the original input row (0 to n_samples-1, repeated n times)
        - One column per target with sampled values

    Raises:
        InvalidDataError: If n is invalid or would exceed memory limits.
    """
    if not isinstance(n, int) or n < 1:
        raise InvalidDataError(f"n must be a positive integer, got {n}")

    total_samples = self._n_samples * n
    if total_samples > MAX_TOTAL_SAMPLES:
        raise InvalidDataError(
            f"Total samples ({total_samples:,}) exceeds maximum ({MAX_TOTAL_SAMPLES:,}). "
            f"Reduce n (currently {n}) or number of input rows ({self._n_samples})."
        )

    rng = np.random.default_rng(random_state)

    # Joint sampling is used only when a copula was fitted over 2+ targets;
    # large n falls through to the chunked implementations.
    joint = self._copula is not None and len(self._targets) > 1
    fits_in_one_chunk = n <= MAX_SAMPLE_CHUNK_SIZE

    if joint:
        return self._sample_joint_chunk(n, rng) if fits_in_one_chunk else self._sample_joint_chunked(n, rng)
    return self._sample_chunk(n, rng) if fits_in_one_chunk else self._sample_chunked(n, rng)

plot(actuals=None, confidence_bands=None, title=None, targets='all', max_targets=6)

Fan chart of quantile bands. Requires matplotlib.

Parameters:

Name Type Description Default
actuals Series | DataFrame | None

Optional actual values for comparison.

None
confidence_bands list[float] | None

Confidence levels (default: [0.5, 0.8, 0.9, 0.95]).

None
title str | None

Optional plot title.

None
targets str | list[str]

Target(s) to plot. "all" plots every target.

'all'
max_targets int

Maximum subplot panels (default 6).

6
Source code in uncertainty_flow/core/distribution.py
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
def plot(
    self,
    actuals: pl.Series | pl.DataFrame | None = None,
    confidence_bands: list[float] | None = None,
    title: str | None = None,
    targets: str | list[str] = "all",
    max_targets: int = 6,
) -> None:
    """Fan chart of quantile bands. Requires matplotlib.

    Args:
        actuals: Optional actual values for comparison.
        confidence_bands: Confidence levels (default: [0.5, 0.8, 0.9, 0.95]).
        title: Optional plot title.
        targets: Target(s) to plot. ``"all"`` plots every target.
        max_targets: Maximum subplot panels (default 6).
    """
    from ..viz import _plotting

    _plotting.plot(
        self,
        actuals=actuals,
        confidence_bands=confidence_bands,
        title=title,
        targets=targets,
        max_targets=max_targets,
    )

posterior_samples()

Return raw posterior parameter draws as a 2D matrix.

Source code in uncertainty_flow/core/distribution.py
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
def posterior_samples(self) -> np.ndarray:
    """Return raw posterior parameter draws as a 2D matrix."""
    if self._posterior is None:
        raise InvalidDataError(
            "posterior_samples() requires posterior data. "
            "Use a BayesianQuantileRegressor to generate predictions with posteriors."
        )
    # Already a flat [draw, feature] matrix — return as-is.
    if isinstance(self._posterior, np.ndarray):
        return self._posterior

    # Flatten each named tensor to [draw, features] and stack side by side.
    flat = []
    for tensor in self._posterior.values():
        mat = np.asarray(tensor)
        flat.append(mat[:, np.newaxis] if mat.ndim == 1 else mat.reshape(mat.shape[0], -1))
    if not flat:
        raise InvalidDataError("posterior data is empty")
    return np.column_stack(flat)

posterior_parameter_interval(confidence=0.9)

Compute parameter credible intervals from posterior draws.

Source code in uncertainty_flow/core/distribution.py
671
672
673
674
675
676
677
678
679
def posterior_parameter_interval(self, confidence: float = 0.9) -> pl.DataFrame:
    """Compute parameter credible intervals from posterior draws."""
    if not (0 < confidence < 1):
        raise QuantileError(f"confidence must be in (0, 1), got {confidence}")
    draws = self.posterior_samples()
    tail = (1 - confidence) / 2
    # Both tail quantiles in one vectorized call.
    bounds = np.quantile(draws, [tail, 1 - tail], axis=0)
    return pl.DataFrame({"lower": bounds[0], "upper": bounds[1]}, orient="row")

credible_interval(confidence=0.9)

Compute predictive credible intervals for each prediction row.

Source code in uncertainty_flow/core/distribution.py
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
def credible_interval(self, confidence: float = 0.9) -> pl.DataFrame:
    """Compute predictive credible intervals for each prediction row."""
    import warnings

    if not (0 < confidence < 1):
        raise QuantileError(f"confidence must be in (0, 1), got {confidence}")

    draws = self._posterior_predictive
    if draws is None:
        # No posterior predictive draws: fall back (with a deprecation-style
        # warning) to intervals over the parameter posterior instead.
        warnings.warn(
            "credible_interval() currently falls back to parameter intervals when "
            "posterior predictive draws are unavailable. Use "
            "posterior_parameter_interval() for explicit parameter intervals.",
            FutureWarning,
            stacklevel=2,
        )
        return self.posterior_parameter_interval(confidence)

    tail = (1 - confidence) / 2
    lo = np.quantile(draws, tail, axis=1)
    hi = np.quantile(draws, 1 - tail, axis=1)
    return pl.DataFrame({"lower": lo, "upper": hi}, orient="row")

rhat()

Compute Gelman-Rubin R-hat convergence diagnostic from true chains.

Source code in uncertainty_flow/core/distribution.py
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
def rhat(self) -> np.ndarray:
    """Compute Gelman-Rubin R-hat convergence diagnostic from true chains."""
    if self._posterior_chains is None:
        raise InvalidDataError(
            "rhat() requires posterior chain data. "
            "Fit BayesianQuantileRegressor with num_chains > 1."
        )

    per_param = []
    for name, raw in self._posterior_chains.items():
        draws = np.asarray(raw)
        if draws.ndim < 2:
            raise InvalidDataError(f"posterior chain array for '{name}' must be at least 2D")
        n_chains, chain_len = draws.shape[0], draws.shape[1]
        if n_chains < 2:
            raise InvalidDataError(
                f"rhat() requires at least 2 chains for '{name}', got {n_chains}."
            )
        if chain_len < 2:
            raise InvalidDataError(
                f"rhat() requires at least 2 draws per chain for '{name}', got {chain_len}."
            )
        # Flatten trailing parameter dims: [chain, draw, param].
        flat = draws.reshape(n_chains, chain_len, -1)
        # Between-chain variance (B) scaled by chain length.
        between = chain_len * np.var(flat.mean(axis=1), axis=0, ddof=1)
        # Mean within-chain variance (W).
        within = np.mean(np.var(flat, axis=1, ddof=1), axis=0)
        if np.any(within < 1e-10):
            raise InvalidDataError(
                f"Within-chain variance too close to zero for R-hat calculation for '{name}'."
            )
        # Pooled variance estimate; R-hat = sqrt(var_hat / W).
        pooled = (1 - 1 / chain_len) * within + (1 / chain_len) * between
        per_param.append(np.sqrt(pooled / within))

    if not per_param:
        raise InvalidDataError(
            "rhat() requires at least one posterior chain parameter to evaluate."
        )
    return np.concatenate(per_param)

posterior_summary()

Return summary statistics of posterior samples.

Source code in uncertainty_flow/core/distribution.py
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
def posterior_summary(self) -> pl.DataFrame:
    """Return summary statistics of posterior samples."""
    if self._posterior is None:
        raise InvalidDataError(
            "posterior_summary() requires posterior data. "
            "Use a BayesianQuantileRegressor to generate predictions with posteriors."
        )
    draws = self.posterior_samples()
    # Per-parameter location, spread and 95% central interval endpoints.
    stats = {
        "mean": np.mean(draws, axis=0),
        "std": np.std(draws, axis=0),
        "q025": np.quantile(draws, 0.025, axis=0),
        "q50": np.quantile(draws, 0.5, axis=0),
        "q975": np.quantile(draws, 0.975, axis=0),
    }
    return pl.DataFrame(stats)

group_uncertainty()

Return per-group uncertainty contribution (interval width).

Source code in uncertainty_flow/core/distribution.py
763
764
765
766
767
768
769
770
771
772
773
774
775
776
def group_uncertainty(self, confidence: float = 0.9) -> dict[str, float]:
    """Return per-group uncertainty contribution (mean interval width).

    Args:
        confidence: Confidence level for the per-group interval
            (default 0.9, preserving the previous hard-coded behavior).

    Returns:
        Mapping of group name to the mean width of that group's
        prediction interval.

    Raises:
        InvalidDataError: If the prediction carries no group predictions.
    """
    if self._group_predictions is None:
        raise InvalidDataError(
            "group_uncertainty() requires group predictions. "
            "Use a CrossModalAggregator to generate predictions with groups."
        )
    result = {}
    for name, pred in self._group_predictions.items():
        interval = pred.interval(confidence)
        lower = interval["lower"].to_numpy()
        upper = interval["upper"].to_numpy()
        result[name] = float(np.mean(upper - lower))
    return result

group_intervals(confidence=0.9)

Return per-group prediction intervals.

Source code in uncertainty_flow/core/distribution.py
778
779
780
781
782
783
784
785
def group_intervals(self, confidence: float = 0.9) -> dict[str, pl.DataFrame]:
    """Return per-group prediction intervals."""
    if self._group_predictions is None:
        raise InvalidDataError(
            "group_intervals() requires group predictions. "
            "Use a CrossModalAggregator to generate predictions with groups."
        )
    out = {}
    for name, pred in self._group_predictions.items():
        out[name] = pred.interval(confidence)
    return out

cross_group_correlation()

Return cross-group correlation matrix based on group median predictions.

Source code in uncertainty_flow/core/distribution.py
787
788
789
790
791
792
793
794
795
796
797
798
799
800
def cross_group_correlation(self) -> np.ndarray:
    """Return cross-group correlation matrix based on group median predictions."""
    if self._group_predictions is None:
        raise InvalidDataError(
            "cross_group_correlation() requires group predictions. "
            "Use a CrossModalAggregator to generate predictions with groups."
        )
    # One median column per group, correlated across prediction rows.
    median_cols = []
    for pred in self._group_predictions.values():
        mid = pred._find_nearest_quantile_index(0.5)
        median_cols.append(pred._quantiles[:, mid])
    return np.corrcoef(np.column_stack(median_cols).T)  # type: ignore

treatment_effect()

Return CATE point estimates.

Source code in uncertainty_flow/core/distribution.py
804
805
806
807
808
809
810
811
def treatment_effect(self) -> np.ndarray:
    """Return CATE point estimates."""
    info = self._treatment_info
    if info is None:
        raise InvalidDataError(
            "treatment_effect() requires treatment info. "
            "Use a CausalUncertaintyEstimator to generate predictions with treatment data."
        )
    return info["cate"]  # type: ignore

average_treatment_effect()

Return ATE with confidence interval.

Source code in uncertainty_flow/core/distribution.py
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
def average_treatment_effect(self) -> dict:
    """Return ATE with confidence interval.

    Returns:
        Dict with keys ``ate`` (point estimate) and ``ci`` (confidence
        interval, taken from the stored ``ate_ci`` entry).

    Raises:
        InvalidDataError: If treatment info is missing, or if the ATE
            metrics have not been computed via ``evaluate(...)``.
    """
    if self._treatment_info is None:
        # Consistency fix: name the method in the error message, matching
        # the sibling accessors (treatment_effect, heterogeneity_score).
        raise InvalidDataError(
            "average_treatment_effect() requires treatment info. "
            "Use a CausalUncertaintyEstimator to generate predictions with treatment data."
        )
    if "ate" not in self._treatment_info or "ate_ci" not in self._treatment_info:
        raise InvalidDataError(
            "average_treatment_effect() requires evaluated treatment metrics. "
            "Call CausalUncertaintyEstimator.evaluate(...) on labeled data first."
        )
    return {
        "ate": self._treatment_info["ate"],
        "ci": self._treatment_info["ate_ci"],
    }

heterogeneity_score()

Return CATE variance as heterogeneity measure.

Source code in uncertainty_flow/core/distribution.py
829
830
831
832
833
834
835
836
def heterogeneity_score(self) -> float:
    """Return the variance of the CATE estimates as a heterogeneity measure.

    A larger value means treatment effects vary more across samples.

    Raises:
        InvalidDataError: If this prediction carries no treatment info.
    """
    info = self._treatment_info
    if info is None:
        raise InvalidDataError(
            "heterogeneity_score() requires treatment info. "
            "Use a CausalUncertaintyEstimator to generate predictions with treatment data."
        )
    return float(np.var(info["cate"]))

uncertainty_decomposition(confidence=0.9)

Return a lightweight heuristic uncertainty decomposition. Aleatoric uncertainty (data noise): Irreducible uncertainty inherent in the data. Epistemic uncertainty (model uncertainty): Reducible uncertainty due to limited data/knowledge.

This method does not refit or evaluate an ensemble of models. It is a cheap summary derived from this single DistributionPrediction object: - Aleatoric: Average width of prediction intervals (data uncertainty) - Epistemic: Variance of interval widths across samples (model uncertainty)

For model-based decomposition with bootstrap refits, use uncertainty_flow.decomposition.EnsembleDecomposition.

Parameters:

Name Type Description Default
confidence float

Confidence level for interval width calculation (default: 0.9)

0.9

Returns:

Type Description
dict[str, float]

Dictionary with: - aleatoric: Irreducible uncertainty (average interval width) - epistemic: Heuristic uncertainty summary (variance of interval widths) - total: Combined uncertainty

Examples

pred = model.predict(X_test) decomposition = pred.uncertainty_decomposition() print(f"Total: {decomposition['total']:.2f}") print(f" Aleatoric: {decomposition['aleatoric']:.2f}") print(f" Epistemic: {decomposition['epistemic']:.2f}")

Source code in uncertainty_flow/core/distribution.py
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
def uncertainty_decomposition(
    self,
    confidence: float = 0.9,
) -> dict[str, float]:
    """
    Heuristic split of predictive uncertainty into aleatoric and epistemic parts.

    No ensemble is refit here; the numbers are a cheap summary derived from
    this single ``DistributionPrediction`` object:

    - aleatoric: mean prediction-interval width (irreducible data noise)
    - epistemic: variance of interval widths across samples (model uncertainty)
    - total: combined uncertainty

    For a model-based decomposition with bootstrap refits, use
    ``uncertainty_flow.decomposition.EnsembleDecomposition``.

    Args:
        confidence: Confidence level used for the interval widths (default: 0.9)

    Returns:
        Dict with keys ``aleatoric``, ``epistemic``, and ``total``.

    Examples
    --------
    >>> pred = model.predict(X_test)
    >>> decomposition = pred.uncertainty_decomposition()
    >>> print(f"Total: {decomposition['total']:.2f}")
    >>> print(f"  Aleatoric: {decomposition['aleatoric']:.2f}")
    >>> print(f"  Epistemic: {decomposition['epistemic']:.2f}")
    """
    # Delegate to the per-target helper, using the first (or only) target.
    return self._decomposition_for_target(0, confidence)

summary(confidence=0.9)

One-row-per-target overview of the prediction distribution.

Columns: target, median, mean_width_90, mean_width_50, aleatoric, epistemic, total_uncertainty.

mean_width_90 is the mean width at the confidence level. mean_width_50 is the mean inter-quartile range (25th–75th percentile).

Parameters:

Name Type Description Default
confidence float

Confidence level for the primary interval width (default 0.9).

0.9

Returns:

Type Description
DataFrame

Polars DataFrame with one row per target.

Source code in uncertainty_flow/core/distribution.py
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
def summary(self, confidence: float = 0.9) -> pl.DataFrame:
    """
    One-row-per-target overview of the prediction distribution.

    Columns: target, median, mean_width_90, mean_width_50,
    aleatoric, epistemic, total_uncertainty.

    ``mean_width_90`` holds the mean interval width at the ``confidence``
    level; ``mean_width_50`` holds the mean inter-quartile range
    (25th–75th percentile).

    Args:
        confidence: Confidence level for the primary interval width (default 0.9).

    Returns:
        Polars DataFrame with one row per target.
    """
    tail = (1 - confidence) / 2
    # Resolve the quantile-grid indices backing each summary column.
    idx_med = self._find_nearest_quantile_index(0.5)
    idx_lo = self._find_nearest_quantile_index(tail)
    idx_hi = self._find_nearest_quantile_index(1 - tail)
    idx_q1 = self._find_nearest_quantile_index(0.25)
    idx_q3 = self._find_nearest_quantile_index(0.75)

    # Warn when the stored levels are far from the ones requested above.
    _warn_if_far(
        self._levels,
        {
            0.5: idx_med,
            tail: idx_lo,
            1 - tail: idx_hi,
            0.25: idx_q1,
            0.75: idx_q3,
        },
    )

    interval = self.interval(confidence)
    records = []
    for pos, name in enumerate(self._targets):
        block = self._quantile_slice(pos)
        decomp = self._decomposition_for_target(pos, confidence, interval)
        records.append(
            {
                "target": name,
                "median": float(np.mean(block[:, idx_med])),
                "mean_width_90": float(np.mean(block[:, idx_hi] - block[:, idx_lo])),
                "mean_width_50": float(np.mean(block[:, idx_q3] - block[:, idx_q1])),
                "aleatoric": decomp["aleatoric"],
                "epistemic": decomp["epistemic"],
                "total_uncertainty": decomp["total"],
            }
        )

    return pl.DataFrame(records)

fit_distribution(family='auto', row_index=None)

Fit a parametric distribution to the quantile predictions.

For univariate predictions, fits a single distribution. For multivariate, fits one distribution per target.

Parameters:

Name Type Description Default
family str

One of "normal", "student_t", "lognormal", "gamma", or "auto" (best fit by KS distance).

'auto'
row_index int | None

If given, fit only for that row. Otherwise fit for the mean quantile vector across all rows.

None

Returns:

Type Description
ParametricDistribution | list[ParametricDistribution]

A single ParametricDistribution for univariate, or a list

ParametricDistribution | list[ParametricDistribution]

of ParametricDistribution (one per target) for multivariate.

ParametricDistribution | list[ParametricDistribution]

When row_index is given, always returns a single distribution

ParametricDistribution | list[ParametricDistribution]

for univariate or a list for multivariate.

Source code in uncertainty_flow/core/distribution.py
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
def fit_distribution(
    self,
    family: str = "auto",
    row_index: int | None = None,
) -> ParametricDistribution | list[ParametricDistribution]:
    """
    Fit a parametric distribution to the quantile predictions.

    A single distribution is fitted for univariate predictions; one per
    target for multivariate predictions.

    Args:
        family: One of ``"normal"``, ``"student_t"``, ``"lognormal"``,
            ``"gamma"``, or ``"auto"`` (best fit by KS distance).
        row_index: If given, fit only that row; otherwise fit the *mean*
            quantile vector across all rows.

    Returns:
        A ``ParametricDistribution`` for univariate predictions, or a list
        of them (one per target) for multivariate — with or without
        ``row_index`` set.
    """
    from .parametric import fit_parametric

    def _knots(matrix: np.ndarray) -> np.ndarray:
        # Pick one row, or collapse all rows into their mean quantile vector.
        if row_index is not None:
            return matrix[row_index]
        return np.mean(matrix, axis=0)

    if len(self._targets) == 1:
        head = self._quantiles[:, : self._n_quantiles]
        return fit_parametric(_knots(head), self._levels, family=family)

    return [
        fit_parametric(_knots(self._quantile_slice(i)), self._levels, family=family)
        for i in range(len(self._targets))
    ]

log_score(y_true, family='auto')

Compute the mean negative log-likelihood (log-score).

Fits a parametric distribution from the predicted quantiles, then evaluates the log-density at the true values. Higher is better (less negative).

Parameters:

Name Type Description Default
y_true Series | DataFrame | ndarray

True values.

required
family str

Distribution family for fitting, or "auto".

'auto'

Returns:

Type Description
float | dict[str, float]

Mean log-score (float) for univariate, or {target: score} dict

float | dict[str, float]

for multivariate.

Source code in uncertainty_flow/core/distribution.py
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
def log_score(
    self,
    y_true: pl.Series | pl.DataFrame | np.ndarray,
    family: str = "auto",
) -> float | dict[str, float]:
    """
    Mean negative log-likelihood (log-score) of the true values.

    A parametric distribution is fitted from the predicted quantiles and its
    log-density is evaluated at ``y_true``.  Higher is better (less negative).

    Args:
        y_true: True values.
        family: Distribution family for fitting, or ``"auto"``.

    Returns:
        A float for univariate predictions, or a ``{target: score}`` mapping
        for multivariate ones.
    """
    from ..metrics.log_score import log_score as _log_score

    truth = self._coerce_y_true(y_true)

    if len(self._targets) == 1:
        head = self._quantiles[:, : self._n_quantiles]
        return _log_score(truth, head, self._levels, family=family)

    return {
        name: _log_score(
            self._target_truth(truth, pos),
            self._quantile_slice(pos),
            self._levels,
            family=family,
        )
        for pos, name in enumerate(self._targets)
    }

energy_score(y_true, n_samples=1000, random_state=None)

Compute the energy score for multivariate predictions.

A proper scoring rule that generalises CRPS to the multivariate case. Requires at least 2 targets.

Parameters:

Name Type Description Default
y_true Series | DataFrame | ndarray

True values (array with columns matching targets).

required
n_samples int

Monte Carlo samples per observation.

1000
random_state int | None

Random seed.

None

Returns:

Type Description
float

Mean energy score (float).

Source code in uncertainty_flow/core/distribution.py
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
def energy_score(
    self,
    y_true: pl.Series | pl.DataFrame | np.ndarray,
    n_samples: int = 1000,
    random_state: int | None = None,
) -> float:
    """
    Energy score for multivariate predictions.

    A proper scoring rule that generalises CRPS to several targets;
    requires at least 2 targets.

    Args:
        y_true: True values (array with columns matching targets).
        n_samples: Number of Monte Carlo samples per observation.
        random_state: Random seed.

    Returns:
        Mean energy score as a float.
    """
    from ..metrics.multivariate import energy_score as _impl

    # Thin delegate: the metric module does the Monte Carlo work.
    return _impl(self, y_true, n_samples=n_samples, random_state=random_state)

variogram_score(y_true, n_samples=1000, p=0.5, random_state=None)

Compute the variogram score for multivariate predictions.

A proper scoring rule sensitive to the correlation structure. Requires at least 2 targets.

Parameters:

Name Type Description Default
y_true Series | DataFrame | ndarray

True values (array with columns matching targets).

required
n_samples int

Monte Carlo samples per observation.

1000
p float

Power parameter (default 0.5).

0.5
random_state int | None

Random seed.

None

Returns:

Type Description
float

Mean variogram score (float).

Source code in uncertainty_flow/core/distribution.py
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
def variogram_score(
    self,
    y_true: pl.Series | pl.DataFrame | np.ndarray,
    n_samples: int = 1000,
    p: float = 0.5,
    random_state: int | None = None,
) -> float:
    """
    Variogram score for multivariate predictions.

    A proper scoring rule sensitive to the correlation structure between
    targets; requires at least 2 targets.

    Args:
        y_true: True values (array with columns matching targets).
        n_samples: Number of Monte Carlo samples per observation.
        p: Power parameter (default 0.5).
        random_state: Random seed.

    Returns:
        Mean variogram score as a float.
    """
    from ..metrics.multivariate import variogram_score as _impl

    # Thin delegate: the metric module does the Monte Carlo work.
    return _impl(self, y_true, n_samples=n_samples, p=p, random_state=random_state)

ParametricDistribution

Parametric distribution fitted from quantile predictions.

Supported families: "normal", "student_t", "lognormal", "gamma", "auto" (selects best by KS distance).

Source code in uncertainty_flow/core/parametric.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
class ParametricDistribution:
    """
    Parametric distribution fitted from quantile predictions.

    Supported families: ``"normal"``, ``"student_t"``, ``"lognormal"``,
    ``"gamma"``, ``"auto"`` (selects best by KS distance).
    """

    _FAMILIES = ("normal", "student_t", "lognormal", "gamma")

    def __init__(
        self,
        family: str,
        params: dict[str, float],
        quantile_values: np.ndarray | None = None,
        quantile_levels: np.ndarray | None = None,
    ):
        if family not in self._FAMILIES:
            raise ValueError(f"Unknown family {family!r}. Choose from {self._FAMILIES}")
        self.family = family
        self._params = params
        self._quantile_values = quantile_values
        self._quantile_levels = quantile_levels

    @property
    def mean(self) -> float:
        return float(self._rv().mean())

    @property
    def variance(self) -> float:
        return float(self._rv().var())

    @property
    def shape_params(self) -> dict[str, float]:
        return dict(self._params)

    def pdf(self, x: np.ndarray | float) -> np.ndarray:
        x = np.asarray(x, dtype=np.float64)
        return self._rv().pdf(x)

    def cdf(self, x: np.ndarray | float) -> np.ndarray:
        x = np.asarray(x, dtype=np.float64)
        return self._rv().cdf(x)

    def ppf(self, q: np.ndarray | float) -> np.ndarray:
        q = np.asarray(q, dtype=np.float64)
        return self._rv().ppf(q)

    def logpdf(self, x: np.ndarray | float) -> np.ndarray:
        x = np.asarray(x, dtype=np.float64)
        return self._rv().logpdf(x)

    def sample(self, n: int, random_state: int | None = None) -> np.ndarray:
        return self._rv().rvs(size=n, random_state=random_state)

    def _rv(self) -> stats.rv_continuous:
        return _FAMILY_REGISTRY[self.family].rv_factory(self._params)

    def __repr__(self) -> str:
        parts = ", ".join(f"{k}={v:.4g}" for k, v in self._params.items())
        return f"ParametricDistribution(family={self.family!r}, {parts})"

PredictionSet

Prediction set for conformal classification.

For each sample, stores the set of classes included at the calibrated coverage level. Analogous to DistributionPrediction for regression.

Parameters:

Name Type Description Default
class_sets list[list[str]]

List of lists — each inner list contains the class labels included in that sample's prediction set.

required
class_names list[str]

Ordered list of all class names.

required
probabilities ndarray

(n_samples, n_classes) matrix of softmax probabilities.

required
coverage_target float

The target marginal coverage level.

required
threshold float

The APS threshold used to construct the sets.

required
Source code in uncertainty_flow/core/prediction_set.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
class PredictionSet:
    """Prediction set for conformal classification.

    For each sample, stores the set of classes included at the calibrated
    coverage level — the classification analogue of
    ``DistributionPrediction``.

    Args:
        class_sets: One inner list per sample with the class labels included
            in that sample's prediction set.
        class_names: Ordered list of all class names.
        probabilities: (n_samples, n_classes) matrix of softmax probabilities.
        coverage_target: The target marginal coverage level.
        threshold: The APS threshold used to construct the sets.
    """

    def __init__(
        self,
        class_sets: list[list[str]],
        class_names: list[str],
        probabilities: np.ndarray,
        coverage_target: float,
        threshold: float,
    ):
        self._class_sets = class_sets
        self._class_names = class_names
        self._probabilities = probabilities
        self._coverage_target = coverage_target
        self._threshold = threshold
        # Cache the dimensions used by summary() and __repr__().
        self._n_samples = len(class_sets)
        self._n_classes = len(class_names)

    def set(self, sample_index: int | None = None) -> list[str] | list[list[str]]:
        """Return the prediction set for one sample, or for every sample.

        Args:
            sample_index: Index of a single sample, or ``None`` for all.

        Returns:
            One list of class labels, or a list of lists covering all samples.
        """
        if sample_index is None:
            return self._class_sets
        return self._class_sets[sample_index]

    @property
    def coverage(self) -> float:
        """Theoretical target coverage level."""
        return self._coverage_target

    @property
    def size(self) -> float:
        """Average set size across all samples."""
        sizes = [len(labels) for labels in self._class_sets]
        return float(np.mean(sizes))

    def size_by_sample(self) -> list[int]:
        """Set size for each sample, in order."""
        return [len(labels) for labels in self._class_sets]

    def probabilities(self) -> pl.DataFrame:
        """Softmax probability matrix as a Polars DataFrame.

        Returns:
            DataFrame with one ``class_<name>`` column per class.
        """
        columns = [f"class_{c}" for c in self._class_names]
        return pl.DataFrame(self._probabilities, schema=columns, orient="row")

    def summary(self) -> pl.DataFrame:
        """One-row summary of the prediction set.

        Columns: coverage_target, avg_set_size, n_samples, n_classes.
        """
        row = {
            "coverage_target": self._coverage_target,
            "avg_set_size": self.size,
            "n_samples": self._n_samples,
            "n_classes": self._n_classes,
        }
        return pl.DataFrame([row])

    def __repr__(self) -> str:
        return (
            f"PredictionSet(n_samples={self._n_samples}, "
            f"n_classes={self._n_classes}, "
            f"coverage={self._coverage_target:.2f}, "
            f"avg_size={self.size:.2f})"
        )

coverage property

Return the theoretical target coverage level.

size property

Return the average set size across all samples.

set(sample_index=None)

Return the prediction set for one or all samples.

Parameters:

Name Type Description Default
sample_index int | None

Index of a single sample, or None for all.

None

Returns:

Type Description
list[str] | list[list[str]]

Single list of class labels, or list of lists for all samples.

Source code in uncertainty_flow/core/prediction_set.py
44
45
46
47
48
49
50
51
52
53
54
55
def set(self, sample_index: int | None = None) -> list[str] | list[list[str]]:
    """Return the prediction set for one or all samples.

    Args:
        sample_index: Index of a single sample, or ``None`` for all.

    Returns:
        Single list of class labels, or list of lists for all samples.
    """
    if sample_index is not None:
        return self._class_sets[sample_index]
    return self._class_sets

size_by_sample()

Return the set size for each sample.

Source code in uncertainty_flow/core/prediction_set.py
67
68
69
def size_by_sample(self) -> list[int]:
    """Return the set size for each sample."""
    return [len(s) for s in self._class_sets]

probabilities()

Return the softmax probability matrix as a Polars DataFrame.

Returns:

Type Description
DataFrame

DataFrame with columns class_<name>.

Source code in uncertainty_flow/core/prediction_set.py
71
72
73
74
75
76
77
78
def probabilities(self) -> pl.DataFrame:
    """Return the softmax probability matrix as a Polars DataFrame.

    Returns:
        DataFrame with columns ``class_<name>``.
    """
    schema = [f"class_{c}" for c in self._class_names]
    return pl.DataFrame(self._probabilities, schema=schema, orient="row")

summary()

Return a one-row summary of the prediction set.

Columns: coverage_target, avg_set_size, n_samples, n_classes.

Source code in uncertainty_flow/core/prediction_set.py
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def summary(self) -> pl.DataFrame:
    """Return a one-row summary of the prediction set.

    Columns: coverage_target, avg_set_size, n_samples, n_classes.
    """
    return pl.DataFrame(
        [
            {
                "coverage_target": self._coverage_target,
                "avg_set_size": self.size,
                "n_samples": self._n_samples,
                "n_classes": self._n_classes,
            }
        ]
    )

get_config()

Get the global configuration instance, creating a default on first call.

Source code in uncertainty_flow/core/config.py
84
85
86
87
88
89
def get_config() -> QuantileConfig:
    """Fetch the process-wide configuration, building the default lazily.

    Returns:
        The shared ``QuantileConfig`` instance.
    """
    global _config
    if _config is None:
        # First access: materialise the default configuration.
        _config = QuantileConfig()
    return _config

reset_config()

Reset configuration to defaults.

Source code in uncertainty_flow/core/config.py
 98
 99
100
101
def reset_config() -> None:
    """Drop any custom configuration so defaults are rebuilt on next access."""
    global _config
    # None signals get_config() to recreate the default lazily.
    _config = None

set_config(config)

Set a custom global configuration.

Source code in uncertainty_flow/core/config.py
92
93
94
95
def set_config(config: QuantileConfig) -> None:
    """Install ``config`` as the process-wide configuration instance."""
    global _config
    _config = config

fit_parametric(quantile_values, quantile_levels, family='auto')

Fit a parametric distribution from quantile knots.

Parameters:

Name Type Description Default
quantile_values ndarray

1-D array of predicted quantile values.

required
quantile_levels ndarray

1-D array of quantile levels in (0, 1).

required
family str

Distribution family or "auto" for best selection.

'auto'

Returns:

Type Description
ParametricDistribution

Fitted ParametricDistribution.

Source code in uncertainty_flow/core/parametric.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
def fit_parametric(
    quantile_values: np.ndarray,
    quantile_levels: np.ndarray,
    family: str = "auto",
) -> ParametricDistribution:
    """
    Fit a parametric distribution from quantile knots.

    Args:
        quantile_values: 1-D array of predicted quantile values.
        quantile_levels: 1-D array of quantile levels in (0, 1).
        family: Distribution family or ``"auto"`` for best selection.

    Returns:
        Fitted ``ParametricDistribution``.
    """
    # Sort the knots so they are monotone in the quantile levels.
    knots = np.sort(np.asarray(quantile_values, dtype=np.float64))
    levels = np.asarray(quantile_levels, dtype=np.float64)

    if family != "auto":
        spec = _FAMILY_REGISTRY[family]
        start = spec.initial_fit(knots, levels)
        tuned = _refine_params_impl(family, start, knots, levels)
        return ParametricDistribution(family, tuned, knots, levels)

    # "auto": try every family and keep the one with the lowest KS distance.
    winner: ParametricDistribution | None = None
    winner_ks = np.inf
    for candidate in ParametricDistribution._FAMILIES:
        spec = _FAMILY_REGISTRY[candidate]
        try:
            start = spec.initial_fit(knots, levels)
            tuned = _refine_params_impl(candidate, start, knots, levels)
            ks = _ks_distance_impl(spec, tuned, knots, levels)
            if ks < winner_ks:
                winner_ks = ks
                winner = ParametricDistribution(candidate, tuned, knots, levels)
        except (ValueError, OverflowError, RuntimeError):
            # A failed candidate is simply skipped in the comparison.
            continue
    if winner is not None:
        return winner
    # Every candidate failed — fall back to a plain normal initial fit.
    fallback = _FAMILY_REGISTRY["normal"]
    return ParametricDistribution("normal", fallback.initial_fit(knots, levels), knots, levels)