Model Pipeline Overview
Model Pipeline
The ModelPipeline class trains and evaluates forecasting models with rolling-window validation.
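As a rough illustration of rolling-window validation in general (not epftoolbox2's implementation), the sketch below walks forward one day at a time. For each test day it fits only on the window of days immediately before it and then forecasts that day. It is simplified to a single day-ahead horizon. The helper rolling_window_forecasts, its fixed window_days length, the half-open [test_start, test_end) range, and the naive per-hour-mean "model" are all hypothetical stand-ins chosen to keep the example self-contained.

```python
from datetime import date, timedelta

def rolling_window_forecasts(daily_prices, test_start, test_end, window_days):
    """Walk-forward validation sketch (hypothetical helper, not the epftoolbox2 API).

    daily_prices maps each date to a list of 24 hourly values. Test days are
    assumed to form the half-open range [test_start, test_end).
    """
    forecasts = {}
    day = test_start
    while day < test_end:
        # Training window: the window_days days strictly before the test day
        train = [daily_prices[day - timedelta(days=k)] for k in range(window_days, 0, -1)]
        # Naive stand-in model: per-hour mean over the training window
        forecasts[day] = [sum(p[h] for p in train) / len(train) for h in range(24)]
        day += timedelta(days=1)  # roll the window forward by one day
    return forecasts

# Ten days of synthetic data: every hour of day i has value i
prices = {date(2024, 1, 1) + timedelta(days=i): [float(i)] * 24 for i in range(10)}
fc = rolling_window_forecasts(prices, date(2024, 1, 4), date(2024, 1, 6), window_days=3)
print(fc[date(2024, 1, 4)][0], fc[date(2024, 1, 5)][0])  # 1.0 2.0
```

Because every forecast uses only data that precedes the day being predicted, the evaluation never leaks future information into training.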
Basic Usage
```python
from epftoolbox2.pipelines import ModelPipeline
from epftoolbox2.models import OLSModel, LassoCVModel
from epftoolbox2.evaluators import MAEEvaluator
from epftoolbox2.exporters import ExcelExporter, TerminalExporter

# `predictors` is a list of feature specifications (see Predictor Specification);
# `df` is a pandas DataFrame with a DatetimeIndex holding the target and features.
pipeline = (
    ModelPipeline()
    .add_model(OLSModel(predictors=predictors, name="OLS"))
    .add_model(LassoCVModel(predictors=predictors, name="LassoCV"))
    .add_evaluator(MAEEvaluator())
    .add_exporter(TerminalExporter())
    .add_exporter(ExcelExporter("results.xlsx"))
)

report = pipeline.run(
    data=df,
    test_start="2024-02-01",
    test_end="2024-03-01",
    target="price",
    horizon=7,
    freq="1h",  # Optional; sub-hourly frequencies such as "15min" are also supported
    save_dir="results",
)
```

Pipeline Components
Models
Evaluators
Exporters
Predictor Specification
Predictors can be specified in four ways:
Static column names:

```python
predictors = [
    "load_actual",
    "load_actual_d-1",
    "price_d-1",
]
```

Use the {horizon} placeholder, which is replaced with 1, 2, …, horizon at runtime:
```python
predictors = [
    "warsaw_temperature_2m_d+{horizon}",
    "is_monday_d+{horizon}",
    "is_holiday_d+{horizon}",
    "daylight_hours_d+{horizon}",
]
```

Callables receive either horizon (one parameter) or (horizon, hour) (two parameters):
```python
# horizon only
predictors = [
    lambda horizon: f"weather_d+{horizon}",
]

# horizon + hour: a different feature for each hour of the day
predictors = [
    lambda horizon, hour: f"load_h{hour:02d}_d+{horizon}",
]
```

Use list comprehensions for many features:
```python
predictors = [
    "load_actual",
    *[f"load_actual_h-{i}" for i in range(1, 169)],  # 168 hourly lags (one week)
    *[f"price_d-{i}" for i in range(1, 8)],  # daily price lags 1 to 7
]
```

Parallelism
Models run in a process pool, and each worker process runs its own inner thread pool. Each worker process handles one complete forecast day (24 hours × horizon at the default hourly frequency), which keeps results consistent and progress tracking stable.
| Environment variable | Default | Description |
|---|---|---|
| MAX_PROCESSES | cpu_count // THREADS_PER_PROCESS | Number of worker processes |
| THREADS_PER_PROCESS | 16 | Threads per worker process (alias: MAX_THREADS) |
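The defaults in the table can be resolved roughly as in the following sketch. Two details are assumptions rather than documented behavior: THREADS_PER_PROCESS wins if both it and its alias MAX_THREADS are set, and MAX_PROCESSES is clamped to at least 1 on machines with fewer CPUs than THREADS_PER_PROCESS. resolve_parallelism is a hypothetical helper, not part of epftoolbox2.

```python
import os

def resolve_parallelism(environ=None):
    """Hypothetical helper mirroring the documented defaults.

    Assumptions: THREADS_PER_PROCESS takes precedence over its alias
    MAX_THREADS, and the process count is never allowed to drop below 1.
    """
    env = os.environ if environ is None else environ
    threads = int(env.get("THREADS_PER_PROCESS", env.get("MAX_THREADS", 16)))
    # Documented default: cpu_count // THREADS_PER_PROCESS (clamped: an assumption)
    default_processes = max(1, (os.cpu_count() or 1) // threads)
    processes = int(env.get("MAX_PROCESSES", default_processes))
    return processes, threads

print(resolve_parallelism({"MAX_PROCESSES": "4", "MAX_THREADS": "8"}))  # (4, 8)
```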
```python
import os

# Must be set before any other imports (e.g. epftoolbox2 and NumPy)
os.environ["PYTHON_GIL"] = "0"  # Enable free-threading (Python 3.13t+)
os.environ["MAX_PROCESSES"] = "4"
os.environ["THREADS_PER_PROCESS"] = "16"  # 4 × 16 = 64 worker threads in total
os.environ["OMP_NUM_THREADS"] = "1"  # Prevent BLAS oversubscription
```

See Installation for the full setup, including platform notes.
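The following sketch shows one way the process-pool-plus-thread-pool layout could be structured with the standard library's concurrent.futures. It is an illustration under assumptions, not epftoolbox2's code. fit_predict is a hypothetical placeholder for fitting and predicting one (hour, horizon) cell, days are plain integers, and the outer executor is injectable so the same structure can be exercised with threads.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def fit_predict(day, hour, horizon):
    """Hypothetical placeholder for fitting one model and predicting one cell."""
    return float(day * 1000 + horizon * 100 + hour)

def forecast_day(day, horizon, threads):
    """Runs inside one worker: fans all 24 hours x horizon cells out to threads."""
    tasks = [(hour, h) for h in range(1, horizon + 1) for hour in range(24)]
    with ThreadPoolExecutor(max_workers=threads) as pool:
        values = pool.map(lambda t: fit_predict(day, t[0], t[1]), tasks)
        return day, dict(zip(tasks, values))

def run_days(days, horizon, processes, threads, executor_cls=ProcessPoolExecutor):
    """Outer pool: one task per forecast day, so a whole day stays in one worker."""
    with executor_cls(max_workers=processes) as pool:
        futures = [pool.submit(forecast_day, d, horizon, threads) for d in days]
        # Collect in submission order, keyed by day
        return dict(f.result() for f in futures)
```

Calling run_days(range(3), horizon=7, processes=2, threads=4) from under an if __name__ == "__main__": guard would return one entry per day, each holding 24 × 7 = 168 cells. Keeping a whole day in one worker means each day's results arrive together, which is what makes per-day progress reporting straightforward.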
Feature Scaling
Models automatically apply StandardScaler:
- Numeric features are standardized (mean = 0, std = 1).
- Binary (0/1) features are detected automatically and left unscaled.
- The target variable is scaled too, and predictions are inverse-transformed back to the original scale.
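A NumPy sketch of these three rules is shown below. It assumes the population standard deviation (as scikit-learn's StandardScaler uses) and treats a column as binary when every value is 0 or 1. The helpers fit_scaler, transform and inverse_transform are hypothetical, and the library's own detection logic may differ.

```python
import numpy as np

def fit_scaler(X):
    """Column-wise mean/std, with 0/1 columns mapped to an identity transform.

    Assumption: a column counts as binary when all its values are 0 or 1.
    """
    X = np.asarray(X, dtype=float)
    binary = np.array([np.isin(np.unique(col), [0.0, 1.0]).all() for col in X.T])
    mean = np.where(binary, 0.0, X.mean(axis=0))
    std = np.where(binary, 1.0, X.std(axis=0))  # population std (ddof=0)
    std[std == 0] = 1.0  # constant columns: avoid division by zero
    return mean, std

def transform(X, mean, std):
    return (np.asarray(X, dtype=float) - mean) / std

def inverse_transform(Z, mean, std):
    return np.asarray(Z, dtype=float) * std + mean

# Features: one numeric column, one binary column
X = np.array([[10.0, 0], [20.0, 1], [30.0, 0], [40.0, 1]])
mean, std = fit_scaler(X)
Z = transform(X, mean, std)  # column 0 standardized, column 1 unchanged

# Target: scale for training, then map (scaled) predictions back
y = np.array([100.0, 120.0, 140.0])
y_mean, y_std = fit_scaler(y[:, None])
y_scaled = transform(y[:, None], y_mean, y_std)
y_back = inverse_transform(y_scaled, y_mean, y_std)
```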
Run Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| data | DataFrame | Required | Input data with a DatetimeIndex |
| test_start | str | Required (optional with forecast_only) | Test period start |
| test_end | str | Required (optional with forecast_only) | Test period end |
| target | str | "price" | Target column name |
| horizon | int | 7 | Maximum forecast horizon (days) |
| freq | str | "1h" | Forecasting frequency (e.g., "15min") |
| save_dir | str | None | Directory for incremental results |
| forecast_only | bool | False | Skip evaluators and exporters; if omitted, test_start and test_end default to "today" |
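To make the interplay of test_start, test_end, horizon and freq concrete, here is a pandas sketch that enumerates the target timestamps each forecast day would cover. It rests on assumptions this page does not state. Test days are taken to form the half-open range [test_start, test_end), and horizon h is assumed to target the whole calendar day h days after the forecast day, matching the d+{horizon} naming of predictors. forecast_schedule is a hypothetical helper.

```python
import pandas as pd

def forecast_schedule(test_start, test_end, horizon=7, freq="1h"):
    """Hypothetical helper: list of (forecast_day, h, target_timestamps).

    Assumptions: forecast days span [test_start, test_end), and horizon h
    targets the calendar day h days after the forecast day, sampled at freq.
    """
    steps_per_day = pd.Timedelta("1D") // pd.Timedelta(freq)  # 24 for "1h", 96 for "15min"
    days = pd.date_range(test_start, test_end, freq="D", inclusive="left")
    schedule = []
    for day in days:
        for h in range(1, horizon + 1):
            start = day + pd.Timedelta(days=h)
            targets = pd.date_range(start, periods=steps_per_day, freq=freq)
            schedule.append((day, h, targets))
    return schedule

sched = forecast_schedule("2024-02-01", "2024-02-03", horizon=2, freq="15min")
print(len(sched), len(sched[0][2]))  # 4 96
```

The number of target steps per day follows directly from freq, so switching from "1h" to "15min" quadruples the per-day workload without changing the day structure.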
EvaluationReport
The pipeline returns an EvaluationReport:
```python
report.summary()          # Overall metrics
report.by_hour()          # Breakdown by hour
report.by_horizon()       # Breakdown by horizon
report.by_hour_horizon()  # Combined hour × horizon breakdown
```

YAML Serialization
Save and reload a complete pipeline configuration:
```python
pipeline.save("model_pipeline.yaml")
pipeline = ModelPipeline.load("model_pipeline.yaml")
```

See ModelPipeline Serialization for the full YAML format and serialization rules.