Skip to content

Utils

uncertainty_flow.utils

Utilities for uncertainty_flow.

CalibrationError

Bases: UncertaintyFlowError

Base class for calibration-related errors.

Source code in uncertainty_flow/utils/exceptions.py
39
40
class CalibrationError(UncertaintyFlowError):
    """Common parent for every calibration-related failure in the package."""

CalibrationSizeError

Bases: CalibrationError

Raised when calibration set is too small.

Source code in uncertainty_flow/utils/exceptions.py
43
44
45
46
47
48
49
50
class CalibrationSizeError(CalibrationError):
    """Raised when calibration set is too small."""

    def __init__(self, n_samples: int, min_size: int = 20):
        # Compose the user-facing message first, then delegate to the base
        # class, which appends the machine-readable code to it.
        msg = f"Calibration set too small ({n_samples} samples). Minimum is {min_size}."
        super().__init__(msg, error_code="UF-E001")

ConfigurationError

Bases: UncertaintyFlowError

Base class for configuration-related errors.

Source code in uncertainty_flow/utils/exceptions.py
53
54
class ConfigurationError(UncertaintyFlowError):
    """Common parent for errors caused by invalid configuration."""

DataError

Bases: UncertaintyFlowError

Base class for data-related errors.

Source code in uncertainty_flow/utils/exceptions.py
28
29
class DataError(UncertaintyFlowError):
    """Common parent for errors caused by problematic input data."""

InvalidDataError

Bases: DataError

Raised when input data is invalid.

Source code in uncertainty_flow/utils/exceptions.py
32
33
34
35
36
class InvalidDataError(DataError):
    """Raised when input data is invalid."""

    def __init__(self, reason: str):
        # UF-E003 tags all malformed-input failures.
        msg = f"Invalid data: {reason}"
        super().__init__(msg, error_code="UF-E003")

ModelError

Bases: UncertaintyFlowError

Base class for model-related errors.

Source code in uncertainty_flow/utils/exceptions.py
14
15
class ModelError(UncertaintyFlowError):
    """Common parent for errors raised by model objects."""

ModelNotFittedError

Bases: ModelError

Raised when a model method is called before fitting.

Source code in uncertainty_flow/utils/exceptions.py
18
19
20
21
22
23
24
25
class ModelNotFittedError(ModelError):
    """Raised when a model method is called before fitting."""

    def __init__(self, model_name: str = "Model"):
        # UF-E002 identifies predict-before-fit failures across the package.
        msg = f"{model_name} not fitted. Call .fit() first."
        super().__init__(msg, error_code="UF-E002")

QuantileError

Bases: ConfigurationError

Raised when quantile configuration is invalid.

Source code in uncertainty_flow/utils/exceptions.py
57
58
59
60
61
62
63
64
class QuantileError(ConfigurationError):
    """Raised when quantile configuration is invalid."""

    def __init__(self, reason: str):
        # UF-E004 tags quantile-configuration problems.
        msg = f"Invalid quantile configuration: {reason}"
        super().__init__(msg, error_code="UF-E004")

UncertaintyFlowError

Bases: ValueError

Base error class for uncertainty_flow.

Source code in uncertainty_flow/utils/exceptions.py
 4
 5
 6
 7
 8
 9
10
11
class UncertaintyFlowError(ValueError):
    """Base error class for uncertainty_flow."""

    def __init__(self, message: str, error_code: str | None = None):
        self.error_code = error_code
        if error_code:
            message = f"{message} [{error_code}]"
        super().__init__(message)

UncertaintyFlowWarning

Bases: UserWarning

Base warning class for uncertainty_flow.

Source code in uncertainty_flow/utils/exceptions.py
67
68
class UncertaintyFlowWarning(UserWarning):
    """Root warning category for uncertainty_flow; filterable via the warnings module."""

BaseSplit

Bases: ABC

Base class for calibration split strategies.

Source code in uncertainty_flow/utils/split.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class BaseSplit(ABC):
    """Abstract interface for (train, calibration) split strategies."""

    @abstractmethod
    def split(
        self,
        data: pl.DataFrame,
        calibration_size: float,
    ) -> tuple[pl.DataFrame, pl.DataFrame]:
        """
        Split data into (train, calibration) sets.

        Args:
            data: Input DataFrame
            calibration_size: Fraction of data to use for calibration (0-1)

        Returns:
            Tuple of (train_data, calibration_data)

        Raises:
            ValueError: If calibration set would be too small (< 20 samples)
        """
        ...

    def _validate_calibration_size(
        self,
        n_total: int,
        n_calib: int,
    ) -> None:
        """Reject tiny calibration sets; warn on merely small ones.

        NOTE: ``n_total`` is not consulted here; it is kept so subclasses can
        pass both counts through one call.
        """
        if n_calib < 20:
            raise CalibrationSizeError(n_calib)
        if n_calib >= 50:
            return
        # 20 <= n_calib < 50: usable but likely noisy quantile estimates.
        warnings.warn(
            f"Calibration set contains only {n_calib} samples. "
            f"Consider increasing calibration size for more stable uncertainty estimates. [UF-W001]",
            UncertaintyFlowWarning,
            stacklevel=3,
        )

split(data, calibration_size) abstractmethod

Split data into (train, calibration) sets.

Parameters:

Name Type Description Default
data DataFrame

Input DataFrame

required
calibration_size float

Fraction of data to use for calibration (0-1)

required

Returns:

Type Description
tuple[DataFrame, DataFrame]

Tuple of (train_data, calibration_data)

Raises:

Type Description
ValueError

If calibration set would be too small (< 20 samples)

Source code in uncertainty_flow/utils/split.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@abstractmethod
def split(
    self,
    data: pl.DataFrame,
    calibration_size: float,
) -> tuple[pl.DataFrame, pl.DataFrame]:
    """
    Split data into (train, calibration) sets.

    Concrete strategies decide HOW rows are assigned; all implementations
    must return the two frames in (train, calibration) order.

    Args:
        data: Input DataFrame
        calibration_size: Fraction of data to use for calibration (0-1)

    Returns:
        Tuple of (train_data, calibration_data)

    Raises:
        ValueError: If calibration set would be too small (< 20 samples)
    """
    ...

RandomHoldoutSplit

Bases: BaseSplit

Random holdout for tabular data.

Source code in uncertainty_flow/utils/split.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
class RandomHoldoutSplit(BaseSplit):
    """Random holdout for tabular data."""

    def __init__(self, random_state: int | None = None):
        """
        Initialize random holdout splitter.

        Args:
            random_state: Random seed for reproducibility
        """
        self.random_state = random_state

    def split(
        self,
        data: pl.DataFrame,
        calibration_size: float,
    ) -> tuple[pl.DataFrame, pl.DataFrame]:
        """Shuffle the rows once, then carve the tail off as calibration.

        Args:
            data: Input DataFrame
            calibration_size: Fraction for calibration (0-1)

        Returns:
            Tuple of (train, calibration) DataFrames
        """
        n_total = len(data)
        n_calib = int(n_total * calibration_size)
        self._validate_calibration_size(n_total, n_calib)

        # A single full shuffle makes both contiguous slices random samples.
        shuffled = data.sample(fraction=1.0, seed=self.random_state)
        cut = n_total - n_calib
        return shuffled[:cut], shuffled[cut:]

__init__(random_state=None)

Initialize random holdout splitter.

Parameters:

Name Type Description Default
random_state int | None

Random seed for reproducibility

None
Source code in uncertainty_flow/utils/split.py
66
67
68
69
70
71
72
73
def __init__(self, random_state: int | None = None):
    """
    Initialize random holdout splitter.

    Args:
        random_state: Random seed for reproducibility; ``None`` leaves the
            shuffle unseeded.
    """
    self.random_state = random_state

split(data, calibration_size)

Split data randomly into train and calibration sets.

Parameters:

Name Type Description Default
data DataFrame

Input DataFrame

required
calibration_size float

Fraction for calibration (0-1)

required

Returns:

Type Description
tuple[DataFrame, DataFrame]

Tuple of (train, calibration) DataFrames

Source code in uncertainty_flow/utils/split.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def split(
    self,
    data: pl.DataFrame,
    calibration_size: float,
) -> tuple[pl.DataFrame, pl.DataFrame]:
    """
    Split data randomly into train and calibration sets.

    Args:
        data: Input DataFrame
        calibration_size: Fraction for calibration (0-1)

    Returns:
        Tuple of (train, calibration) DataFrames

    Raises:
        ValueError: If the calibration set would be smaller than 20 rows.
    """
    n_total = len(data)
    # int() truncates, so the calibration count rounds toward zero.
    n_calib = int(n_total * calibration_size)

    self._validate_calibration_size(n_total, n_calib)

    # Random split: shuffle the whole frame once, then slice off the tail.
    shuffled = data.sample(fraction=1.0, seed=self.random_state)
    train = shuffled[: n_total - n_calib]
    calib = shuffled[n_total - n_calib :]

    return train, calib

RollingOriginSplit

Expanding-window (rolling-origin) split for time series evaluation.

Each fold uses all data up to an origin point as training and the next horizon rows as the test set. The origin advances by step rows each fold, producing an expanding training window.

Parameters:

Name Type Description Default
n_splits int

Number of folds.

5
min_train_size int

Minimum number of rows in the first training window.

50
horizon int

Number of rows in each test set.

1
gap int

Number of rows between train end and test start (default 0).

0
step int | None

How far the origin advances per fold. Defaults to horizon.

None
Source code in uncertainty_flow/utils/split.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
class RollingOriginSplit:
    """Expanding-window (rolling-origin) split for time series evaluation.

    Each fold uses all data up to an origin point as training and the next
    ``horizon`` rows as the test set. The origin advances by ``step`` rows
    each fold, producing an expanding training window.

    Args:
        n_splits: Number of folds.
        min_train_size: Minimum number of rows in the first training window.
        horizon: Number of rows in each test set.
        gap: Number of rows between train end and test start (default 0).
        step: How far the origin advances per fold. Defaults to ``horizon``.

    Raises:
        ValueError: If any parameter is out of range.
    """

    def __init__(
        self,
        n_splits: int = 5,
        min_train_size: int = 50,
        horizon: int = 1,
        gap: int = 0,
        step: int | None = None,
    ):
        if n_splits < 1:
            raise ValueError(f"n_splits must be >= 1, got {n_splits}")
        if min_train_size < 1:
            raise ValueError(f"min_train_size must be >= 1, got {min_train_size}")
        if horizon < 1:
            raise ValueError(f"horizon must be >= 1, got {horizon}")
        if gap < 0:
            raise ValueError(f"gap must be >= 0, got {gap}")
        # BUGFIX: step was previously unvalidated — step=0 raised an opaque
        # ZeroDivisionError inside splits() and a negative step walked the
        # origin backwards. Fail fast with a clear message instead.
        if step is not None and step < 1:
            raise ValueError(f"step must be >= 1, got {step}")

        self.n_splits = n_splits
        self.min_train_size = min_train_size
        self.horizon = horizon
        self.gap = gap
        self.step = step if step is not None else horizon

    def splits(
        self,
        data: pl.DataFrame,
    ) -> list[tuple[pl.DataFrame, pl.DataFrame]]:
        """
        Generate expanding-window (train, test) pairs.

        Args:
            data: DataFrame assumed to be in temporal order.

        Returns:
            List of (train_df, test_df) tuples.

        Raises:
            ValueError: If data is too short for the requested configuration.
        """
        n = len(data)
        # Origins are 0-based indices of the LAST row in a training window.
        last_train_end = n - self.gap - self.horizon
        first_train_end = self.min_train_size - 1

        if first_train_end > last_train_end:
            raise ValueError(
                f"Data too short for RollingOriginSplit: need at least "
                f"{self.min_train_size + self.gap + self.horizon} rows, got {n}"
            )

        available_folds = (last_train_end - first_train_end) // self.step + 1
        if self.n_splits > available_folds:
            raise ValueError(
                f"Requested {self.n_splits} splits but only {available_folds} "
                f"fit in data of length {n} with min_train_size={self.min_train_size}, "
                f"horizon={self.horizon}, gap={self.gap}"
            )

        origin = first_train_end
        result: list[tuple[pl.DataFrame, pl.DataFrame]] = []
        for _ in range(self.n_splits):
            train_end = origin + 1
            test_start = origin + 1 + self.gap
            test_end = test_start + self.horizon

            # Defensive check; the available_folds guard above should make
            # this unreachable.
            if test_end > n:
                raise ValueError(f"Fold extends beyond data: test_end={test_end} > n={n}")

            result.append((data[:train_end], data[test_start:test_end]))
            origin += self.step

        return result

splits(data)

Generate expanding-window (train, test) pairs.

Parameters:

Name Type Description Default
data DataFrame

DataFrame assumed to be in temporal order.

required

Returns:

Type Description
list[tuple[DataFrame, DataFrame]]

List of (train_df, test_df) tuples.

Raises:

Type Description
ValueError

If data is too short for the requested configuration.

Source code in uncertainty_flow/utils/split.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
def splits(
    self,
    data: pl.DataFrame,
) -> list[tuple[pl.DataFrame, pl.DataFrame]]:
    """
    Generate expanding-window (train, test) pairs.

    Args:
        data: DataFrame assumed to be in temporal order.

    Returns:
        List of (train_df, test_df) tuples.

    Raises:
        ValueError: If data is too short for the requested configuration.
    """
    n = len(data)
    # Origins are 0-based indices of the LAST row in a training window.
    last_train_end = n - self.gap - self.horizon
    first_train_end = self.min_train_size - 1

    if first_train_end > last_train_end:
        raise ValueError(
            f"Data too short for RollingOriginSplit: need at least "
            f"{self.min_train_size + self.gap + self.horizon} rows, got {n}"
        )

    available_folds = (last_train_end - first_train_end) // self.step + 1
    if self.n_splits > available_folds:
        raise ValueError(
            f"Requested {self.n_splits} splits but only {available_folds} "
            f"fit in data of length {n} with min_train_size={self.min_train_size}, "
            f"horizon={self.horizon}, gap={self.gap}"
        )

    origin = first_train_end
    result: list[tuple[pl.DataFrame, pl.DataFrame]] = []
    for _ in range(self.n_splits):
        train_end = origin + 1
        test_start = origin + 1 + self.gap
        test_end = test_start + self.horizon

        # Defensive check; the available_folds guard above should make this
        # unreachable.
        if test_end > n:
            raise ValueError(f"Fold extends beyond data: test_end={test_end} > n={n}")

        result.append((data[:train_end], data[test_start:test_end]))
        origin += self.step

    return result

SlidingWindowSplit

Fixed-width sliding-window split for time series evaluation.

Each fold uses a training window of fixed train_size rows that slides forward by step rows each fold.

Parameters:

Name Type Description Default
n_splits int

Number of folds.

5
train_size int

Number of rows in each training window.

100
horizon int

Number of rows in each test set.

1
gap int

Number of rows between train end and test start (default 0).

0
step int | None

How far the window advances per fold. Defaults to horizon.

None
Source code in uncertainty_flow/utils/split.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
class SlidingWindowSplit:
    """Fixed-width sliding-window split for time series evaluation.

    Each fold uses a training window of fixed ``train_size`` rows that slides
    forward by ``step`` rows each fold.

    Args:
        n_splits: Number of folds.
        train_size: Number of rows in each training window.
        horizon: Number of rows in each test set.
        gap: Number of rows between train end and test start (default 0).
        step: How far the window advances per fold. Defaults to ``horizon``.

    Raises:
        ValueError: If any parameter is out of range.
    """

    def __init__(
        self,
        n_splits: int = 5,
        train_size: int = 100,
        horizon: int = 1,
        gap: int = 0,
        step: int | None = None,
    ):
        if n_splits < 1:
            raise ValueError(f"n_splits must be >= 1, got {n_splits}")
        if train_size < 1:
            raise ValueError(f"train_size must be >= 1, got {train_size}")
        if horizon < 1:
            raise ValueError(f"horizon must be >= 1, got {horizon}")
        if gap < 0:
            raise ValueError(f"gap must be >= 0, got {gap}")
        # BUGFIX: step was previously unvalidated — step=0 silently produced
        # n_splits IDENTICAL folds (the window never advanced) and a negative
        # step slid the window backwards. Fail fast instead.
        if step is not None and step < 1:
            raise ValueError(f"step must be >= 1, got {step}")

        self.n_splits = n_splits
        self.train_size = train_size
        self.horizon = horizon
        self.gap = gap
        self.step = step if step is not None else horizon

    def splits(
        self,
        data: pl.DataFrame,
    ) -> list[tuple[pl.DataFrame, pl.DataFrame]]:
        """
        Generate fixed-window (train, test) pairs.

        Args:
            data: DataFrame assumed to be in temporal order.

        Returns:
            List of (train_df, test_df) tuples.

        Raises:
            ValueError: If data is too short for the requested configuration.
        """
        n = len(data)
        # Cheap pre-check for fold 0; later folds are re-checked in the loop.
        if n < self.train_size + self.gap + self.horizon:
            raise ValueError(
                f"Data too short for SlidingWindowSplit: need at least "
                f"{self.train_size + self.gap + self.horizon} rows, got {n}"
            )

        result: list[tuple[pl.DataFrame, pl.DataFrame]] = []
        for i in range(self.n_splits):
            train_start = i * self.step
            train_end = train_start + self.train_size
            test_start = train_end + self.gap
            test_end = test_start + self.horizon

            if test_end > n:
                raise ValueError(
                    f"Fold {i} extends beyond data: test_end={test_end} > n={n}. "
                    f"Reduce n_splits or step."
                )

            result.append((data[train_start:train_end], data[test_start:test_end]))

        return result

splits(data)

Generate fixed-window (train, test) pairs.

Parameters:

Name Type Description Default
data DataFrame

DataFrame assumed to be in temporal order.

required

Returns:

Type Description
list[tuple[DataFrame, DataFrame]]

List of (train_df, test_df) tuples.

Raises:

Type Description
ValueError

If data is too short for the requested configuration.

Source code in uncertainty_flow/utils/split.py
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
def splits(
    self,
    data: pl.DataFrame,
) -> list[tuple[pl.DataFrame, pl.DataFrame]]:
    """
    Generate fixed-window (train, test) pairs.

    Args:
        data: DataFrame assumed to be in temporal order.

    Returns:
        List of (train_df, test_df) tuples.

    Raises:
        ValueError: If data is too short for the requested configuration.
    """
    n = len(data)
    # Cheap pre-check for fold 0; later folds are re-checked in the loop.
    if n < self.train_size + self.gap + self.horizon:
        raise ValueError(
            f"Data too short for SlidingWindowSplit: need at least "
            f"{self.train_size + self.gap + self.horizon} rows, got {n}"
        )

    result: list[tuple[pl.DataFrame, pl.DataFrame]] = []
    for i in range(self.n_splits):
        # The window keeps its width and advances by `step` rows per fold.
        train_start = i * self.step
        train_end = train_start + self.train_size
        test_start = train_end + self.gap
        test_end = test_start + self.horizon

        if test_end > n:
            raise ValueError(
                f"Fold {i} extends beyond data: test_end={test_end} > n={n}. "
                f"Reduce n_splits or step."
            )

        result.append((data[train_start:train_end], data[test_start:test_end]))

    return result

SplitPlanMetadata dataclass

Metadata describing how validation splits were selected.

Source code in uncertainty_flow/utils/split.py
327
328
329
330
331
332
333
334
335
336
337
338
@dataclass(frozen=True)
class SplitPlanMetadata:
    """Metadata describing how validation splits were selected."""

    strategy_name: str  # e.g. "temporal_holdout", "rolling_origin", "kfold_cv"
    reason: str  # human-readable justification for the chosen strategy
    n_samples: int  # total rows in the input data
    n_splits: int  # number of inner folds (1 when only the outer split exists)
    holdout_fraction: float  # fraction of rows reserved for the outer validation set
    random_state: int | None  # seed used for any randomized splitting
    hybrid_mode: bool  # whether out-of-sample inner CV was layered on top
    task_type: Literal["tabular", "time_series"]  # task family the plan targets

TemporalHoldoutSplit

Bases: BaseSplit

Holdout from END for time series (no data leakage).

Source code in uncertainty_flow/utils/split.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
class TemporalHoldoutSplit(BaseSplit):
    """Holdout from END for time series (no data leakage)."""

    def split(
        self,
        data: pl.DataFrame,
        calibration_size: float,
    ) -> tuple[pl.DataFrame, pl.DataFrame]:
        """Reserve the final rows for calibration, keeping temporal order.

        Args:
            data: Input DataFrame (assumed to be temporally ordered)
            calibration_size: Fraction for calibration (0-1)

        Returns:
            Tuple of (train, calibration) DataFrames
        """
        n_total = len(data)
        n_calib = int(n_total * calibration_size)
        self._validate_calibration_size(n_total, n_calib)

        # Calibration comes from the END so training never sees the future.
        cut = n_total - n_calib
        return data[:cut], data[cut:]

split(data, calibration_size)

Split data temporally, taking last n% for calibration.

Parameters:

Name Type Description Default
data DataFrame

Input DataFrame (assumed to be temporally ordered)

required
calibration_size float

Fraction for calibration (0-1)

required

Returns:

Type Description
tuple[DataFrame, DataFrame]

Tuple of (train, calibration) DataFrames

Source code in uncertainty_flow/utils/split.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def split(
    self,
    data: pl.DataFrame,
    calibration_size: float,
) -> tuple[pl.DataFrame, pl.DataFrame]:
    """
    Split data temporally, taking last n% for calibration.

    Args:
        data: Input DataFrame (assumed to be temporally ordered)
        calibration_size: Fraction for calibration (0-1)

    Returns:
        Tuple of (train, calibration) DataFrames

    Raises:
        ValueError: If the calibration set would be smaller than 20 rows.
    """
    n_total = len(data)
    # int() truncates, so the calibration count rounds toward zero.
    n_calib = int(n_total * calibration_size)

    self._validate_calibration_size(n_total, n_calib)

    # Take LAST n% for calibration (temporal ordering): training never sees
    # rows that come after the calibration window starts.
    train = data[: n_total - n_calib]
    calib = data[n_total - n_calib :]

    return train, calib

ValidationSplitPlan dataclass

Composable split plan with required outer split and optional inner splits.

Source code in uncertainty_flow/utils/split.py
341
342
343
344
345
346
347
@dataclass(frozen=True)
class ValidationSplitPlan:
    """Composable split plan with required outer split and optional inner splits."""

    outer_split: tuple[pl.DataFrame, pl.DataFrame]  # (train, validation) frames
    inner_splits: list[tuple[pl.DataFrame, pl.DataFrame]]  # may be empty
    metadata: SplitPlanMetadata  # how and why this plan was chosen

to_numpy(data, columns)

Convert Polars DataFrame or LazyFrame to NumPy array.

Raises:

Type Description
InvalidDataError

If any column is missing from the data.

Source code in uncertainty_flow/utils/polars_bridge.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def to_numpy(
    data: pl.DataFrame | pl.LazyFrame,
    columns: list[str],
) -> np.ndarray:
    """Convert Polars DataFrame or LazyFrame to NumPy array.

    Raises:
        InvalidDataError: If any column is missing from the data.
    """
    frame = materialize_lazyframe(data)

    # Report missing columns in the caller's requested order.
    absent = [name for name in columns if name not in frame.columns]
    if absent:
        raise InvalidDataError(f"Columns not found: {absent}")

    return frame.select(columns).to_numpy()

to_numpy_series(series)

Convert Polars Series to NumPy array, zero-copy when possible.

Falls back to regular conversion if zero-copy isn't possible.

Raises:

Type Description
InvalidDataError

If input is not a pl.Series.

Source code in uncertainty_flow/utils/polars_bridge.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def to_numpy_series(series: pl.Series) -> np.ndarray:
    """Convert Polars Series to NumPy array, zero-copy when possible.

    Falls back to a copying conversion when zero-copy isn't possible.

    Raises:
        InvalidDataError: If input is not a pl.Series.
    """
    if isinstance(series, pl.Series):
        try:
            return series.to_numpy(allow_copy=False)
        except (ValueError, RuntimeError):
            # Polars refuses a zero-copy view for some layouts; copy instead.
            return series.to_numpy()
    raise InvalidDataError(
        f"Expected pl.Series, got {type(series).__name__}. "
        "Use DataFrame[column] to select a Series."
    )

to_polars(array, columns, index=None)

Convert NumPy array back to Polars DataFrame.

Raises:

Type Description
InvalidDataError

If array shape doesn't match columns length.

Source code in uncertainty_flow/utils/polars_bridge.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def to_polars(
    array: np.ndarray,
    columns: list[str],
    index: pl.Series | None = None,
) -> pl.DataFrame:
    """Convert NumPy array back to Polars DataFrame.

    Raises:
        InvalidDataError: If array shape doesn't match columns length.
    """
    # Promote a 1D array to a single column, guarding the name count first.
    if array.ndim == 1:
        if len(columns) != 1:
            raise InvalidDataError(f"1D array requires single column name, got {len(columns)}")
        array = array.reshape(-1, 1)

    if array.shape[1] != len(columns):
        raise InvalidDataError(
            f"Array has {array.shape[1]} columns but {len(columns)} column names provided"
        )

    frame = pl.DataFrame(array, schema=columns, orient="row")
    if index is None:
        return frame

    if len(index) != len(frame):
        raise InvalidDataError(
            f"Index length {len(index)} doesn't match DataFrame length {len(frame)}"
        )
    # Map positional row numbers onto the caller-supplied index values.
    lookup = dict(enumerate(index.to_list()))
    return (
        frame.with_row_index("__index__")
        .with_columns(pl.col("__index__").replace_strict(lookup))
        .drop("__index__")
    )

rolling_origin_splits(data, n_splits=5, min_train_size=50, horizon=1, gap=0)

Convenience function for rolling-origin (expanding window) splits.

Parameters:

Name Type Description Default
data DataFrame

DataFrame in temporal order.

required
n_splits int

Number of folds.

5
min_train_size int

Minimum training rows in the first fold.

50
horizon int

Test set size per fold.

1
gap int

Rows between train end and test start.

0

Returns:

Type Description
list[tuple[DataFrame, DataFrame]]

List of (train_df, test_df) tuples.

Source code in uncertainty_flow/utils/split.py
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
def rolling_origin_splits(
    data: pl.DataFrame,
    n_splits: int = 5,
    min_train_size: int = 50,
    horizon: int = 1,
    gap: int = 0,
) -> list[tuple[pl.DataFrame, pl.DataFrame]]:
    """Convenience function for rolling-origin (expanding window) splits.

    Args:
        data: DataFrame in temporal order.
        n_splits: Number of folds.
        min_train_size: Minimum training rows in the first fold.
        horizon: Test set size per fold.
        gap: Rows between train end and test start.

    Returns:
        List of (train_df, test_df) tuples.
    """
    # Thin wrapper: build the splitter and delegate immediately.
    config = {
        "n_splits": n_splits,
        "min_train_size": min_train_size,
        "horizon": horizon,
        "gap": gap,
    }
    return RollingOriginSplit(**config).splits(data)

select_validation_plan(data, *, task_type, random_state=None, holdout_fraction=0.2, small_data_threshold=250, cv_splits=5, hybrid_mode=False, enable_logging=True, rolling_origin=False, rolling_min_train=50, rolling_horizon=1)

Select a deterministic validation split plan for tuning/evaluation.

Hybrid mode means:

- time_series: temporal outer split + random out-of-sample inner split(s) on outer-train
- tabular: random outer split + random out-of-sample inner split(s) on outer-train

When rolling_origin=True and task_type="time_series", the outer split uses a single temporal holdout (as before) and the inner splits use RollingOriginSplit instead of random K-fold.

Source code in uncertainty_flow/utils/split.py
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
def select_validation_plan(
    data: pl.DataFrame,
    *,
    task_type: Literal["tabular", "time_series"],
    random_state: int | None = None,
    holdout_fraction: float = 0.2,
    small_data_threshold: int = 250,
    cv_splits: int = 5,
    hybrid_mode: bool = False,
    enable_logging: bool = True,
    rolling_origin: bool = False,
    rolling_min_train: int = 50,
    rolling_horizon: int = 1,
) -> ValidationSplitPlan:
    """Select a deterministic validation split plan for tuning/evaluation.

    Hybrid mode means:
    - time_series: temporal outer split + random out-of-sample inner split(s) on outer-train
    - tabular: random outer split + random out-of-sample inner split(s) on outer-train

    When ``rolling_origin=True`` and ``task_type="time_series"``, the outer
    split uses a single temporal holdout (as before) and the inner splits
    use :class:`RollingOriginSplit` instead of random K-fold.

    Args:
        data: Input DataFrame (temporal order assumed for time_series).
        task_type: "tabular" or "time_series"; drives the outer-split choice.
        random_state: Seed for random holdout / K-fold shuffling.
        holdout_fraction: Fraction of rows reserved for the outer validation set.
        small_data_threshold: Tabular datasets at or below this size get K-fold CV.
        cv_splits: Upper bound on the number of inner folds.
        hybrid_mode: Layer out-of-sample inner CV on top of the outer split.
        enable_logging: Emit info/debug logs describing the chosen plan.
        rolling_origin: time_series only — use rolling-origin inner folds.
        rolling_min_train: Minimum first-window size for rolling-origin folds.
        rolling_horizon: Test rows per rolling-origin fold.

    Returns:
        ValidationSplitPlan bundling the outer split, inner splits, and metadata.
    """
    n_samples = len(data)
    # n_splits reports the inner-fold count; 1 means outer split only.
    n_splits = 1

    if task_type == "time_series":
        outer_train, outer_val = _build_temporal_holdout(data, holdout_fraction)
        inner_splits: list[tuple[pl.DataFrame, pl.DataFrame]] = []
        strategy_name = "temporal_holdout"
        reason = "time_series task defaults to temporal holdout"

        # Rolling-origin engages only when outer-train is long enough for at
        # least one fold; otherwise it silently falls through to the plain
        # temporal holdout (or hybrid CV below).
        if rolling_origin and len(outer_train) >= rolling_min_train + rolling_horizon:
            n_avail = (len(outer_train) - rolling_min_train) // rolling_horizon
            inner_splits = RollingOriginSplit(
                n_splits=min(cv_splits, max(2, n_avail)),
                min_train_size=rolling_min_train,
                horizon=rolling_horizon,
            ).splits(outer_train)
            n_splits = len(inner_splits)
            strategy_name = "rolling_origin"
            reason = "time_series task with rolling-origin evaluation"
        elif hybrid_mode:
            # Inner folds come from outer-train only, so outer_val stays
            # strictly out-of-sample.
            inner_splits = _build_kfold_splits(
                outer_train,
                n_splits=min(cv_splits, max(2, len(outer_train) // 20)),
                random_state=random_state,
            )
            n_splits = len(inner_splits)
            strategy_name = "temporal_outer_plus_oos_inner_cv"
            reason = "hybrid mode enabled: temporal outer split with out-of-sample inner CV"
    else:
        outer_train, outer_val = _build_random_holdout(data, holdout_fraction, random_state)
        inner_splits = []
        strategy_name = "random_holdout"
        reason = "tabular task defaults to random holdout"
        if n_samples <= small_data_threshold:
            # NOTE(review): these folds are built over the FULL dataset, so
            # they overlap the outer validation rows — confirm this is
            # intentional before relying on outer_val as untouched data here.
            inner_splits = _build_kfold_splits(
                data,
                n_splits=min(cv_splits, max(2, n_samples // 20)),
                random_state=random_state,
            )
            n_splits = len(inner_splits)
            strategy_name = "kfold_cv"
            reason = "small tabular dataset uses CV for more stable tuning"
        # hybrid_mode takes precedence over the small-data CV choice when
        # both conditions hold (folds rebuilt from outer_train only).
        if hybrid_mode:
            inner_splits = _build_kfold_splits(
                outer_train,
                n_splits=min(cv_splits, max(2, len(outer_train) // 20)),
                random_state=random_state,
            )
            n_splits = len(inner_splits)
            strategy_name = "random_outer_plus_oos_inner_cv"
            reason = "hybrid mode enabled: out-of-sample outer and inner validation"

    metadata = SplitPlanMetadata(
        strategy_name=strategy_name,
        reason=reason,
        n_samples=n_samples,
        n_splits=n_splits,
        holdout_fraction=holdout_fraction,
        random_state=random_state,
        hybrid_mode=hybrid_mode,
        task_type=task_type,
    )

    if enable_logging:
        # Lazy %-style args keep formatting cost off the hot path.
        logger.info(
            "validation_strategy strategy=%s reason=%s task_type=%s n_samples=%d n_splits=%d "
            "holdout_fraction=%.3f random_state=%s hybrid_mode=%s",
            metadata.strategy_name,
            metadata.reason,
            metadata.task_type,
            metadata.n_samples,
            metadata.n_splits,
            metadata.holdout_fraction,
            metadata.random_state,
            metadata.hybrid_mode,
        )
        for idx, (train_df, val_df) in enumerate(inner_splits, start=1):
            logger.debug(
                "validation_strategy_fold strategy=%s fold=%d train_rows=%d val_rows=%d",
                metadata.strategy_name,
                idx,
                len(train_df),
                len(val_df),
            )

    return ValidationSplitPlan(
        outer_split=(outer_train, outer_val),
        inner_splits=inner_splits,
        metadata=metadata,
    )