Skip to content

Models

uncertainty_flow.models

Native uncertainty quantification models.

DeepQuantileNet

Bases: BaseQuantileNeuralNet, RegressorMixin

Multi-quantile neural network with shared trunk architecture (sklearn backend).

Architecture

Input → Shared MLP Trunk → Hidden Features → [Linear Head Q0, Linear Head Q1, ...]

The shared trunk is implemented by extracting the hidden layer representation from a median-trained MLP and using it as features for linear quantile heads.

Coverage guarantee: ⚠️ Empirical only. Non-crossing: ✅ (via post-prediction sorting, or joint post-fit head refinement when non_crossing_penalty > 0).

Examples:

>>> from uncertainty_flow.models import DeepQuantileNet
>>> import polars as pl
>>> import numpy as np
>>> np.random.seed(42)
>>> df = pl.DataFrame({
...     "x1": np.random.randn(100),
...     "x2": np.random.randn(100),
...     "y": 2 * np.random.randn(100) + 5,
... })
>>> model = DeepQuantileNet(
...     hidden_layer_sizes=(64, 32),
...     random_state=42,
... )
>>> model.fit(df, target="y")
>>> pred = model.predict(df)
>>> pred.interval(0.9)
Source code in uncertainty_flow/models/deep_quantile.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
class DeepQuantileNet(BaseQuantileNeuralNet, RegressorMixin):
    """
    Multi-quantile neural network with shared trunk architecture (sklearn backend).

    Architecture:
        Input → Shared MLP Trunk → Hidden Features → [Linear Head Q0, Linear Head Q1, ...]

    The shared trunk is implemented by extracting the hidden layer representation
    from a median-trained MLP and using it as features for linear quantile heads.

    Coverage guarantee: ⚠️ Empirical only
    Non-crossing: ✅ (via post-prediction sorting, or joint post-fit head
    refinement when ``non_crossing_penalty > 0``)

    Examples:
        >>> from uncertainty_flow.models import DeepQuantileNet
        >>> import polars as pl
        >>> import numpy as np

        >>> np.random.seed(42)
        >>> df = pl.DataFrame({
        ...     "x1": np.random.randn(100),
        ...     "x2": np.random.randn(100),
        ...     "y": 2 * np.random.randn(100) + 5,
        ... })
        >>> model = DeepQuantileNet(
        ...     hidden_layer_sizes=(64, 32),
        ...     random_state=42,
        ... )
        >>> model.fit(df, target="y")
        >>> pred = model.predict(df)
        >>> pred.interval(0.9)
    """

    def __init__(
        self,
        hidden_layer_sizes: tuple[int, ...] = (100, 50),
        quantile_levels: list[float] | None = None,
        calibration_size: float = 0.2,
        trunk_alpha: float = 0.0001,
        trunk_max_iter: int = 500,
        head_solver: str = "pinball",
        non_crossing_penalty: float = 0.1,
        random_state: int | None = None,
    ):
        """
        Initialize DeepQuantileNet.

        Args:
            hidden_layer_sizes: Tuple of hidden layer sizes for the trunk MLP.
                E.g., (100, 50) means two hidden layers with 100 and 50 units.
            quantile_levels: Quantile levels to predict. Defaults to DEFAULT_QUANTILES.
            calibration_size: Fraction of data held out as calibration set (0-1).
            trunk_alpha: L2 regularization parameter for the trunk MLP.
            trunk_max_iter: Maximum iterations for the trunk MLP optimizer.
            head_solver: Solver for quantile heads. Currently only "pinball" supported.
            non_crossing_penalty: When > 0, all quantile heads are jointly
                refined after fitting with a penalty term
                λ Σ max(0, q_i - q_{i+1})^2 added to the summed pinball loss,
                discouraging quantile crossing. Higher values enforce stricter
                monotonicity. Set to 0 to disable (default 0.1).
            random_state: Random seed for reproducibility.
        """
        super().__init__(
            hidden_layer_sizes=hidden_layer_sizes,
            quantile_levels=quantile_levels,
            calibration_size=calibration_size,
            random_state=random_state,
        )
        self.trunk_alpha = trunk_alpha
        self.trunk_max_iter = trunk_max_iter
        self.head_solver = head_solver
        self.non_crossing_penalty = non_crossing_penalty

    def _fit_backend(
        self,
        x: np.ndarray,
        y: np.ndarray,
        **kwargs: Any,
    ) -> None:
        """Fit the shared trunk MLP, then one linear quantile head per level.

        Args:
            x: Scaled feature matrix, shape (n_samples, n_features).
            y: Target vector, shape (n_samples,).
            **kwargs: Unused; accepted for backend-interface compatibility.
        """
        # The trunk is trained as an ordinary least-squares MLP; its hidden
        # layers are then reused as a shared feature extractor for every head.
        self._trunk_ = MLPRegressor(
            hidden_layer_sizes=self.hidden_layer_sizes,
            activation="relu",
            solver="adam",
            alpha=self.trunk_alpha,
            max_iter=self.trunk_max_iter,
            random_state=self.random_state,
        )
        self._trunk_.fit(x, y)

        self._trunk_features_ = self._extract_trunk_features(x)

        self._head_coefs_: dict[float, np.ndarray] = {}
        self._head_intercepts_: dict[float, float] = {}

        # Fit one independent linear head per quantile level on the shared
        # trunk features.
        for q in self.quantile_levels:
            head = LinearQuantileHead(solver=self.head_solver)
            head.fit(self._trunk_features_, y, quantile=q)
            assert head.coef_ is not None
            assert head.intercept_ is not None
            self._head_coefs_[q] = head.coef_
            self._head_intercepts_[q] = head.intercept_

        if self.non_crossing_penalty > 0:
            self._apply_non_crossing_training_penalty(x, y)

    def _apply_non_crossing_training_penalty(
        self,
        x: np.ndarray,
        y: np.ndarray,
    ) -> None:
        """Jointly refine all quantile heads with a non-crossing penalty.

        Minimises the sum of pinball losses across all quantiles plus a
        crossing penalty λ * Σ_samples Σ_i max(0, q_i - q_{i+1})^2, where
        λ = ``self.non_crossing_penalty``. Assuming ascending quantile
        levels, the penalty is positive only where adjacent quantile
        predictions are inverted (crossed).
        """
        from scipy.optimize import minimize

        trunk_features = self._extract_trunk_features(x)
        n_samples, n_features = trunk_features.shape
        n_quantiles = len(self.quantile_levels)
        lam = self.non_crossing_penalty

        # Flatten all head parameters into one vector:
        # [coef_q1 (n_features), intercept_q1, coef_q2 (n_features), intercept_q2, ...]
        def _pack() -> np.ndarray:
            parts = []
            for q in self.quantile_levels:
                parts.append(self._head_coefs_[q])
                parts.append(np.array([self._head_intercepts_[q]]))
            return np.concatenate(parts)

        def _unpack(params: np.ndarray) -> tuple[list[np.ndarray], list[float]]:
            coefs = []
            intercepts = []
            idx = 0
            for _ in self.quantile_levels:
                coefs.append(params[idx : idx + n_features])
                intercepts.append(float(params[idx + n_features]))
                idx += n_features + 1
            return coefs, intercepts

        def _joint_loss(params: np.ndarray) -> float:
            coefs, intercepts = _unpack(params)

            # Compute predictions for all quantiles
            preds = np.empty((n_samples, n_quantiles))
            for j in range(n_quantiles):
                preds[:, j] = trunk_features @ coefs[j] + intercepts[j]

            # Pinball loss for each quantile
            total_pinball = 0.0
            for j, q in enumerate(self.quantile_levels):
                residuals = y - preds[:, j]
                weights = np.where(residuals < 0, q, 1 - q)
                total_pinball += np.sum(weights * np.abs(residuals))

            # Crossing penalty: λ * Σ max(0, q_j - q_{j+1})^2 — positive only
            # where an upper quantile falls below the one beneath it.
            crossing_penalty = 0.0
            if n_quantiles > 1:
                diffs = preds[:, 1:] - preds[:, :-1]
                crossing_penalty = lam * np.sum(np.maximum(0, -diffs) ** 2)

            # Small L2 regularisation to keep parameters stable
            l2_penalty = 1e-6 * np.sum(params**2)

            return total_pinball + crossing_penalty + l2_penalty

        x0 = _pack()
        result = minimize(
            _joint_loss,
            x0,
            method="L-BFGS-B",
            options={"maxiter": 500},
        )

        coefs, intercepts = _unpack(result.x)
        for j, q in enumerate(self.quantile_levels):
            self._head_coefs_[q] = coefs[j]
            self._head_intercepts_[q] = intercepts[j]

    def _predict_backend_raw(self, trunk_features: np.ndarray) -> np.ndarray:
        """Predict the quantile matrix directly from trunk features.

        Returns:
            Array of shape (n_samples, n_quantiles), one column per level.
        """
        coef_matrix = np.column_stack([self._head_coefs_[q] for q in self.quantile_levels])
        intercepts = np.array([self._head_intercepts_[q] for q in self.quantile_levels])
        return trunk_features @ coef_matrix + intercepts

    def _predict_backend(self, x: np.ndarray) -> np.ndarray:
        """Predict the quantile matrix from scaled raw features.

        Delegates to ``_predict_backend_raw`` rather than duplicating the
        head arithmetic.
        """
        trunk_features = self._extract_trunk_features(x)
        return self._predict_backend_raw(trunk_features)

    def _extract_trunk_features(self, x: np.ndarray) -> np.ndarray:
        """
        Extract hidden layer features from the trunk MLP.

        Replays every layer except the final (output) one, applying the same
        ReLU activation the trunk was trained with.

        Args:
            x: Scaled input features.

        Returns:
            Hidden layer activations.
        """
        activations = x
        for coef, intercept in zip(self._trunk_.coefs_[:-1], self._trunk_.intercepts_[:-1]):
            activations = np.dot(activations, coef) + intercept
            activations = self._relu(activations)
        return activations

    def _relu(self, x: np.ndarray) -> np.ndarray:
        """ReLU activation function."""
        return np.maximum(0, x)  # type: ignore[no-any-return]

__init__(hidden_layer_sizes=(100, 50), quantile_levels=None, calibration_size=0.2, trunk_alpha=0.0001, trunk_max_iter=500, head_solver='pinball', non_crossing_penalty=0.1, random_state=None)

Initialize DeepQuantileNet.

Parameters:

Name Type Description Default
hidden_layer_sizes tuple[int, ...]

Tuple of hidden layer sizes for the trunk MLP. E.g., (100, 50) means two hidden layers with 100 and 50 units.

(100, 50)
quantile_levels list[float] | None

Quantile levels to predict. Defaults to DEFAULT_QUANTILES.

None
calibration_size float

Fraction of data held out as calibration set (0-1).

0.2
trunk_alpha float

L2 regularization parameter for the trunk MLP.

0.0001
trunk_max_iter int

Maximum iterations for the trunk MLP optimizer.

500
head_solver str

Solver for quantile heads. Currently only "pinball" supported.

'pinball'
non_crossing_penalty float

When > 0, all quantile heads are jointly refined after fitting with a penalty term λ Σ max(0, q_i - q_{i+1})^2 added to the summed pinball loss, discouraging quantile crossing. Higher values enforce stricter monotonicity. Set to 0 to disable (default 0.1).

0.1
random_state int | None

Random seed for reproducibility.

None
Source code in uncertainty_flow/models/deep_quantile.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def __init__(
    self,
    hidden_layer_sizes: tuple[int, ...] = (100, 50),
    quantile_levels: list[float] | None = None,
    calibration_size: float = 0.2,
    trunk_alpha: float = 0.0001,
    trunk_max_iter: int = 500,
    head_solver: str = "pinball",
    non_crossing_penalty: float = 0.1,
    random_state: int | None = None,
):
    """
    Initialize DeepQuantileNet.

    Args:
        hidden_layer_sizes: Tuple of hidden layer sizes for the trunk MLP.
            E.g., (100, 50) means two hidden layers with 100 and 50 units.
        quantile_levels: Quantile levels to predict. Defaults to DEFAULT_QUANTILES.
        calibration_size: Fraction of data held out as calibration set (0-1).
        trunk_alpha: L2 regularization parameter for the trunk MLP.
        trunk_max_iter: Maximum iterations for the trunk MLP optimizer.
        head_solver: Solver for quantile heads. Currently only "pinball" supported.
        non_crossing_penalty: When > 0, all quantile heads are jointly
            refined after fitting with a penalty term
            λ Σ max(0, q_i - q_{i+1})^2 added to the summed pinball loss,
            discouraging quantile crossing. Higher values enforce stricter
            monotonicity. Set to 0 to disable (default 0.1).
        random_state: Random seed for reproducibility.
    """
    super().__init__(
        hidden_layer_sizes=hidden_layer_sizes,
        quantile_levels=quantile_levels,
        calibration_size=calibration_size,
        random_state=random_state,
    )
    self.trunk_alpha = trunk_alpha
    self.trunk_max_iter = trunk_max_iter
    self.head_solver = head_solver
    self.non_crossing_penalty = non_crossing_penalty

QuantileForestForecaster

Bases: BaseUncertaintyModel

Quantile Regression Forest for time series.

Stores full leaf distributions to compute true quantiles (not just split conformal like wrappers).

Coverage guarantee: ⚠️ Empirical only. Non-crossing: ✅ (by leaf distribution construction).

Examples:

>>> from uncertainty_flow.models import QuantileForestForecaster
>>> import polars as pl
>>>
>>> df = pl.DataFrame({
...     "date": range(10),
...     "price": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
... })
>>> model = QuantileForestForecaster(
...     targets="price",
...     horizon=3,
...     random_state=42,
... )
>>> model.fit(df)
>>> pred = model.predict(df)
Source code in uncertainty_flow/models/quantile_forest.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
class QuantileForestForecaster(BaseUncertaintyModel):
    """
    Quantile Regression Forest for time series.

    Stores full leaf distributions to compute true quantiles
    (not just split conformal like wrappers).

    Coverage guarantee: ⚠️ Empirical only
    Non-crossing: ✅ (by leaf distribution construction)

    Examples:
        >>> from uncertainty_flow.models import QuantileForestForecaster
        >>> import polars as pl
        >>>
        >>> df = pl.DataFrame({
        ...     "date": range(10),
        ...     "price": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
        ... })
        >>> model = QuantileForestForecaster(
        ...     targets="price",
        ...     horizon=3,
        ...     random_state=42,
        ... )
        >>> model.fit(df)
        >>> pred = model.predict(df)
    """

    def __init__(
        self,
        targets: str | list[str],
        horizon: int,
        n_estimators: int = 200,
        min_samples_leaf: int = 5,
        max_depth: int | None = None,
        copula_family: str = "auto",
        calibration_size: float = 0.2,
        auto_tune: bool = True,
        uncertainty_features: list[str] | None = None,
        random_state: int | None = None,
    ):
        """
        Initialize QuantileForestForecaster.

        Args:
            targets: Target column name(s); a single string is wrapped in a list
            horizon: Forecast horizon
            n_estimators: Number of trees in the forest
            min_samples_leaf: Minimum samples per leaf (controls distribution richness)
            max_depth: Maximum tree depth
            copula_family: "auto" (BIC selection) or one of "gaussian", "clayton",
                "gumbel", "frank". Use "independent" for no inter-target correlation.
            calibration_size: Fraction for calibration (from END)
            auto_tune: Whether to tune supported hyperparameters before final fit
            uncertainty_features: Optional hint for heteroscedastic features
            random_state: Random seed
        """
        self.targets = [targets] if isinstance(targets, str) else targets
        self.horizon = horizon
        self.n_estimators = n_estimators
        self.min_samples_leaf = min_samples_leaf
        self.max_depth = max_depth
        self.copula_family = copula_family
        self.calibration_size = calibration_size
        self.auto_tune = auto_tune
        self.uncertainty_features = uncertainty_features
        self.random_state = random_state

        # Fitted attributes — populated by fit(); empty/None until then
        self._fitted = False
        self._copula: BaseCopula | None = None
        self._models: dict[str, RandomForestRegressor] = {}
        self._leaf_distributions: dict[str, list] = {}
        self._quantile_levels_: np.ndarray | None = None
        self._feature_cols_: dict[str, list[str]] = {}
        self._uncertainty_drivers_: pl.DataFrame | None = None
        self.tuned_params_: dict[str, float | int] = {}

    def _resolve_quantile_levels(self) -> np.ndarray:
        """Return fit-time quantile levels, with backward-compatible fallback.

        Models pickled before ``_quantile_levels_`` existed fall back to
        DEFAULT_QUANTILES; in that case the stored leaf quantile width must
        still match, otherwise the model needs a refit.

        Raises:
            InvalidDataError: If stored leaf distributions disagree with the
                fallback quantile count.
        """
        if self._quantile_levels_ is not None:
            return self._quantile_levels_

        fallback_levels = np.asarray(list(DEFAULT_QUANTILES), dtype=float)
        if self._leaf_distributions:
            first_target = next(iter(self._leaf_distributions))
            first_tree = self._leaf_distributions[first_target][0]
            if first_tree["quantiles"].shape[1] != len(fallback_levels):
                raise InvalidDataError(
                    "Current config quantile count does not match fitted leaf distributions. "
                    "Refit the model after setting the desired quantile configuration."
                )
        return fallback_levels

    def _auto_tune(self, data: pl.DataFrame) -> None:
        """Tune params using validation splits, with CV averaging when inner splits exist.

        Grid-searches ``n_estimators`` × ``min_samples_leaf`` candidates by
        fitting a non-tuning clone per split and scoring its predictions at
        90% confidence; the best (lowest average score) pair is written back
        to ``self`` and recorded in ``tuned_params_``.
        """
        plan = select_validation_plan(
            data,
            task_type="time_series",
            random_state=self.random_state,
            holdout_fraction=0.2,
            hybrid_mode=False,
        )
        eval_splits = plan.inner_splits if plan.inner_splits else [plan.outer_split]
        tune_calibration_size = valid_calibration_candidates(
            len(eval_splits[0][0]), self.calibration_size, [0.25, 0.3]
        )[0]

        best_score = float("inf")
        best_params: dict[str, float | int] = {}

        for n_estimators in candidate_values(self.n_estimators, [20, 30, 50]):
            for min_samples_leaf in candidate_values(self.min_samples_leaf, [3, 5, 10]):
                split_scores: list[float] = []
                for split_train, split_val in eval_splits:
                    # auto_tune=False prevents infinite recursion in the clone
                    candidate = QuantileForestForecaster(
                        targets=self.targets,
                        horizon=self.horizon,
                        n_estimators=n_estimators,
                        min_samples_leaf=min_samples_leaf,
                        max_depth=self.max_depth,
                        copula_family=self.copula_family,
                        calibration_size=tune_calibration_size,
                        auto_tune=False,
                        uncertainty_features=self.uncertainty_features,
                        random_state=self.random_state,
                    )
                    with warnings.catch_warnings():
                        warnings.simplefilter("ignore")
                        candidate.fit(split_train)
                        pred = candidate.predict(split_val)
                    actuals = split_val.select(self.targets)
                    score = score_distribution_prediction(
                        pred, actuals, self.targets, confidence=0.9
                    )
                    split_scores.append(score)
                avg_score = float(np.mean(split_scores))
                if avg_score < best_score:
                    best_score = avg_score
                    best_params = {
                        "n_estimators": int(n_estimators),
                        "min_samples_leaf": int(min_samples_leaf),
                    }

        self.n_estimators = int(best_params.get("n_estimators", self.n_estimators))
        self.min_samples_leaf = int(best_params.get("min_samples_leaf", self.min_samples_leaf))
        self.tuned_params_ = best_params

    def fit(
        self,
        data: PolarsInput,
        target: TargetSpec | None = None,
        **kwargs,
    ) -> "QuantileForestForecaster":
        """
        Fit the quantile forest forecaster.

        Args:
            data: Polars DataFrame or LazyFrame with time series
            target: Accepted for interface compatibility but currently
                ignored — ``self.targets`` (set in ``__init__``) is always
                used, and the name is rebound by the loop below.
            **kwargs: Additional parameters (unused)

        Returns:
            self (for method chaining)

        Raises:
            InvalidDataError: If features or targets contain NaN/Inf, or an
                unknown ``copula_family`` was configured.
        """
        # Materialize if needed
        data = materialize_lazyframe(data)
        self._quantile_levels_ = np.asarray(list(DEFAULT_QUANTILES), dtype=float)

        if self.auto_tune:
            self._auto_tune(data)

        # Automatic validation split strategy selection
        plan = select_validation_plan(
            data,
            task_type="time_series",
            random_state=self.random_state,
            holdout_fraction=self.calibration_size,
        )
        train, calib = plan.outer_split
        # One column of calibration residuals per target, for copula fitting
        residual_matrix = []

        # NOTE(review): this rebinds the `target` parameter (which is never
        # read before this point).
        for target in self.targets:
            # All non-target columns are features; identical for every target
            feature_cols = [col for col in train.columns if col not in self.targets]
            self._feature_cols_[target] = feature_cols

            x_train = to_numpy(train, feature_cols)
            y_train = to_numpy(train, [target]).flatten()
            x_calib = to_numpy(calib, feature_cols)
            y_calib = to_numpy(calib, [target]).flatten()

            if not np.all(np.isfinite(x_train)):
                raise InvalidDataError("Feature matrix contains NaN or Inf values")
            if not np.all(np.isfinite(y_train)):
                raise InvalidDataError("Target vector contains NaN or Inf values")

            rf = RandomForestRegressor(
                n_estimators=self.n_estimators,
                min_samples_leaf=self.min_samples_leaf,
                max_depth=self.max_depth,
                random_state=self.random_state,
            )
            rf.fit(x_train, y_train)
            self._models[target] = rf

            self._leaf_distributions[target] = self._extract_leaf_distributions(
                rf, x_train, y_train, self._quantile_levels_
            )
            residual_matrix.append(y_calib - rf.predict(x_calib))

        # Model inter-target dependence via a copula over calibration residuals
        if len(self.targets) > 1 and self.copula_family != "independent":
            stacked_residuals = np.column_stack(residual_matrix)

            if self.copula_family == "auto":
                selected = auto_select_copula(stacked_residuals)
            elif self.copula_family in COPULA_FAMILIES:
                selected = self.copula_family
            else:
                raise InvalidDataError(
                    f"Unknown copula_family: {self.copula_family}. "
                    f"Valid options: auto, gaussian, clayton, gumbel, frank, independent"
                )

            self._copula = COPULA_FAMILIES[selected]().fit(stacked_residuals)
        else:
            self._copula = None

        self._fitted = True
        return self

    def _extract_leaf_distributions(
        self,
        rf: RandomForestRegressor,
        x: np.ndarray,
        y: np.ndarray,
        quantile_levels: np.ndarray,
    ) -> list[dict[str, np.ndarray]]:
        """
        Extract per-leaf quantiles of the training targets for each tree.

        Args:
            rf: Fitted random forest
            x: Feature matrix
            y: Target values
            quantile_levels: Quantile levels to precompute per leaf

        Returns:
            One dict per tree with keys "leaf_ids" (sorted unique leaf ids,
            as returned by np.unique) and "quantiles" (array of shape
            (n_leaves, n_quantiles) of target quantiles within each leaf).
        """
        distributions: list[dict[str, np.ndarray]] = []

        for tree in rf.estimators_:
            leaf_ids = tree.apply(x)
            # inverse maps each training row to its position in unique_leaves
            unique_leaves, inverse = np.unique(leaf_ids, return_inverse=True)

            leaf_quantiles = np.zeros((len(unique_leaves), len(quantile_levels)))
            for leaf_idx in range(len(unique_leaves)):
                leaf_values = y[inverse == leaf_idx]
                leaf_quantiles[leaf_idx] = np.quantile(leaf_values, quantile_levels)

            distributions.append(
                {
                    "leaf_ids": unique_leaves.astype(int),
                    "quantiles": leaf_quantiles,
                }
            )

        return distributions

    def _predict_quantiles(
        self,
        rf: RandomForestRegressor,
        leaf_dists: list,
        x: np.ndarray,
        quantile_levels: list[float] | None = None,
    ) -> np.ndarray:
        """
        Predict quantiles from leaf distributions.

        Averages, across trees, the precomputed quantiles of the leaf each
        sample lands in.

        Args:
            rf: Fitted random forest
            leaf_dists: Leaf distributions from training
            x: Feature matrix
            quantile_levels: Quantile levels to compute (unused, kept for API compat)

        Returns:
            Quantile predictions shape (n_samples, n_quantiles)
        """
        del quantile_levels
        quantile_count = leaf_dists[0]["quantiles"].shape[1]
        predictions = np.zeros((len(x), quantile_count))
        # rf.apply returns shape (n_samples, n_trees); iterate per tree
        all_leaf_ids = rf.apply(x)

        for tree_idx, tree_leaf_ids in enumerate(all_leaf_ids.T):
            tree_dist = leaf_dists[tree_idx]
            # searchsorted is valid because "leaf_ids" is sorted (np.unique)
            positions = np.searchsorted(tree_dist["leaf_ids"], tree_leaf_ids)
            predictions += tree_dist["quantiles"][positions]

        predictions /= len(leaf_dists)

        return predictions

    def predict(self, data: PolarsInput) -> DistributionPrediction:
        """
        Generate probabilistic forecasts.

        Args:
            data: Polars DataFrame or LazyFrame

        Returns:
            DistributionPrediction with quantile forecasts. For multiple
            targets, columns are ordered target-major: all quantiles of
            target 0, then all quantiles of target 1, etc.

        Raises:
            ModelNotFittedError: If called before fit().
            InvalidDataError: If stored leaf quantiles do not match the
                configured quantile levels.
        """
        if not self._fitted:
            raise ModelNotFittedError("QuantileForestForecaster")

        # Materialize if needed
        data = materialize_lazyframe(data)

        all_quantiles = []
        quantile_levels = self._resolve_quantile_levels()

        for target in self.targets:
            x = to_numpy(data, self._feature_cols_[target])
            rf = self._models[target]
            leaf_dists = self._leaf_distributions[target]

            quantile_matrix = self._predict_quantiles(rf, leaf_dists, x)
            if quantile_matrix.shape[1] != len(quantile_levels):
                raise InvalidDataError(
                    "Stored leaf distribution quantiles do not match configured quantile levels. "
                    "Refit the model to regenerate compatible quantiles."
                )

            all_quantiles.append(quantile_matrix)

        if len(self.targets) == 1:
            final_matrix = all_quantiles[0]
        else:
            # Flatten to (n_samples, n_targets * n_quantiles), target-major
            final_matrix = np.column_stack(
                [
                    all_quantiles[t][:, i]
                    for t in range(len(self.targets))
                    for i in range(len(quantile_levels))
                ]
            )

        return DistributionPrediction(
            quantile_matrix=final_matrix,
            quantile_levels=quantile_levels.tolist(),
            target_names=self.targets,
            copula=self._copula,
        )

    @property
    def uncertainty_drivers_(self) -> pl.DataFrame | None:
        """Return residual correlation analysis results."""
        return self._uncertainty_drivers_

uncertainty_drivers_ property

Return residual correlation analysis results.

__init__(targets, horizon, n_estimators=200, min_samples_leaf=5, max_depth=None, copula_family='auto', calibration_size=0.2, auto_tune=True, uncertainty_features=None, random_state=None)

Initialize QuantileForestForecaster.

Parameters:

Name Type Description Default
targets str | list[str]

Target column name(s)

required
horizon int

Forecast horizon

required
n_estimators int

Number of trees in the forest

200
min_samples_leaf int

Minimum samples per leaf (controls distribution richness)

5
max_depth int | None

Maximum tree depth

None
copula_family str

"auto" (BIC selection) or one of "gaussian", "clayton", "gumbel", "frank". Use "independent" for no inter-target correlation.

'auto'
calibration_size float

Fraction for calibration (from END)

0.2
auto_tune bool

Whether to tune supported hyperparameters before final fit

True
uncertainty_features list[str] | None

Optional hint for heteroscedastic features

None
random_state int | None

Random seed

None
Source code in uncertainty_flow/models/quantile_forest.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def __init__(
    self,
    targets: str | list[str],
    horizon: int,
    n_estimators: int = 200,
    min_samples_leaf: int = 5,
    max_depth: int | None = None,
    copula_family: str = "auto",
    calibration_size: float = 0.2,
    auto_tune: bool = True,
    uncertainty_features: list[str] | None = None,
    random_state: int | None = None,
):
    """
    Create a QuantileForestForecaster with the given configuration.

    Args:
        targets: Name of the target column, or a list of names for
            multi-target forecasting.
        horizon: Forecast horizon.
        n_estimators: Number of trees in each random forest.
        min_samples_leaf: Minimum samples per leaf; smaller values yield
            richer leaf distributions.
        max_depth: Maximum tree depth (``None`` means unlimited).
        copula_family: ``"auto"`` (BIC selection) or one of ``"gaussian"``,
            ``"clayton"``, ``"gumbel"``, ``"frank"``. Use ``"independent"``
            to disable inter-target correlation modelling.
        calibration_size: Fraction of samples held out for calibration,
            taken from the end of the series.
        auto_tune: If True, tune supported hyperparameters before the
            final fit.
        uncertainty_features: Optional hint listing heteroscedastic
            feature columns.
        random_state: Seed for reproducible forests and splits.
    """
    # Normalise a single column name to a one-element list; a list is
    # stored as-is (no copy), matching the class's established behaviour.
    if isinstance(targets, str):
        self.targets = [targets]
    else:
        self.targets = targets
    self.horizon = horizon
    self.n_estimators = n_estimators
    self.min_samples_leaf = min_samples_leaf
    self.max_depth = max_depth
    self.copula_family = copula_family
    self.calibration_size = calibration_size
    self.auto_tune = auto_tune
    self.uncertainty_features = uncertainty_features
    self.random_state = random_state

    # State populated by fit(); empty/None until then.
    self._fitted = False
    self._copula: BaseCopula | None = None
    self._models: dict[str, RandomForestRegressor] = {}
    self._leaf_distributions: dict[str, list] = {}
    self._quantile_levels_: np.ndarray | None = None
    self._feature_cols_: dict[str, list[str]] = {}
    self._uncertainty_drivers_: pl.DataFrame | None = None
    self.tuned_params_: dict[str, float | int] = {}

fit(data, target=None, **kwargs)

Fit the quantile forest forecaster.

Parameters:

Name Type Description Default
data PolarsInput

Polars DataFrame or LazyFrame with time series

required
target TargetSpec | None

Target column name(s) - uses self.targets if not provided

None
**kwargs

Additional parameters (unused)

{}

Returns:

Type Description
QuantileForestForecaster

self (for method chaining)

Source code in uncertainty_flow/models/quantile_forest.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
def fit(
    self,
    data: PolarsInput,
    target: TargetSpec | None = None,
    **kwargs,
) -> "QuantileForestForecaster":
    """
    Fit the quantile forest forecaster.

    One random forest is trained per target column; calibration residuals
    from the tail holdout drive the optional inter-target copula fit.

    Args:
        data: Polars DataFrame or LazyFrame with time series
        target: Target column name(s); overrides ``self.targets`` when
            provided, otherwise the constructor value is used. (The
            previous implementation shadowed this parameter with the
            per-target loop variable and silently ignored it.)
        **kwargs: Additional parameters (unused)

    Returns:
        self (for method chaining)

    Raises:
        InvalidDataError: If features/targets contain NaN or Inf, or if
            ``copula_family`` is not a recognised option.
    """
    # Materialize if needed
    data = materialize_lazyframe(data)

    # Honor an explicit target override, as documented.
    if target is not None:
        self.targets = [target] if isinstance(target, str) else list(target)

    self._quantile_levels_ = np.asarray(list(DEFAULT_QUANTILES), dtype=float)

    if self.auto_tune:
        self._auto_tune(data)

    # Automatic validation split strategy selection (calibration taken
    # from the end of the series for time-series tasks).
    plan = select_validation_plan(
        data,
        task_type="time_series",
        random_state=self.random_state,
        holdout_fraction=self.calibration_size,
    )
    train, calib = plan.outer_split

    # The feature set is identical for every target (all non-target
    # columns), so compute it once instead of per loop iteration.
    feature_cols = [col for col in train.columns if col not in self.targets]

    residual_matrix = []
    for target_name in self.targets:
        self._feature_cols_[target_name] = feature_cols

        x_train = to_numpy(train, feature_cols)
        y_train = to_numpy(train, [target_name]).flatten()
        x_calib = to_numpy(calib, feature_cols)
        y_calib = to_numpy(calib, [target_name]).flatten()

        # Fail fast on unusable training data.
        if not np.all(np.isfinite(x_train)):
            raise InvalidDataError("Feature matrix contains NaN or Inf values")
        if not np.all(np.isfinite(y_train)):
            raise InvalidDataError("Target vector contains NaN or Inf values")

        rf = RandomForestRegressor(
            n_estimators=self.n_estimators,
            min_samples_leaf=self.min_samples_leaf,
            max_depth=self.max_depth,
            random_state=self.random_state,
        )
        rf.fit(x_train, y_train)
        self._models[target_name] = rf

        # Per-leaf empirical distributions drive quantile prediction later.
        self._leaf_distributions[target_name] = self._extract_leaf_distributions(
            rf, x_train, y_train, self._quantile_levels_
        )
        # Calibration residuals feed the copula fit below.
        residual_matrix.append(y_calib - rf.predict(x_calib))

    # Fit an inter-target copula only when there is more than one target
    # and correlation modelling has not been explicitly disabled.
    if len(self.targets) > 1 and self.copula_family != "independent":
        stacked_residuals = np.column_stack(residual_matrix)

        if self.copula_family == "auto":
            selected = auto_select_copula(stacked_residuals)
        elif self.copula_family in COPULA_FAMILIES:
            selected = self.copula_family
        else:
            raise InvalidDataError(
                f"Unknown copula_family: {self.copula_family}. "
                f"Valid options: auto, gaussian, clayton, gumbel, frank, independent"
            )

        self._copula = COPULA_FAMILIES[selected]().fit(stacked_residuals)
    else:
        self._copula = None

    self._fitted = True
    return self

predict(data)

Generate probabilistic forecasts.

Parameters:

Name Type Description Default
data PolarsInput

Polars DataFrame or LazyFrame

required

Returns:

Type Description
DistributionPrediction

DistributionPrediction with quantile forecasts

Source code in uncertainty_flow/models/quantile_forest.py
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
def predict(self, data: PolarsInput) -> DistributionPrediction:
    """
    Generate probabilistic forecasts.

    Args:
        data: Polars DataFrame or LazyFrame

    Returns:
        DistributionPrediction with quantile forecasts

    Raises:
        ModelNotFittedError: If called before ``fit``.
        InvalidDataError: If stored leaf quantiles no longer match the
            configured quantile levels.
    """
    if not self._fitted:
        raise ModelNotFittedError("QuantileForestForecaster")

    # Materialize if needed
    data = materialize_lazyframe(data)

    quantile_levels = self._resolve_quantile_levels()
    n_levels = len(quantile_levels)

    per_target_matrices = []
    for target in self.targets:
        features = to_numpy(data, self._feature_cols_[target])
        matrix = self._predict_quantiles(
            self._models[target], self._leaf_distributions[target], features
        )
        if matrix.shape[1] != n_levels:
            raise InvalidDataError(
                "Stored leaf distribution quantiles do not match configured quantile levels. "
                "Refit the model to regenerate compatible quantiles."
            )
        per_target_matrices.append(matrix)

    # Columns are laid out target-major: all levels of target 0, then
    # all levels of target 1, and so on.
    if len(per_target_matrices) == 1:
        final_matrix = per_target_matrices[0]
    else:
        final_matrix = np.hstack(per_target_matrices)

    return DistributionPrediction(
        quantile_matrix=final_matrix,
        quantile_levels=quantile_levels.tolist(),
        target_names=self.targets,
        copula=self._copula,
    )