Workflow
Workflow ties a DataPipeline YAML, a ModelPipeline YAML, and all run parameters together into a single experiment file.
Quick Example
```python
from epftoolbox2.pipelines import Workflow

# Phase 1: save config (once, on your laptop)
Workflow(
    data_pipeline="data_pipeline.yaml",
    model_pipeline="model_pipeline.yaml",
    data_start="2023-05-01",
    data_end="2024-08-01",
    data_cache=True,
    model_test_start="2024-06-01",
    model_test_end="2024-07-01",
    model_target="price",
    model_horizon=7,
    max_processes=32,
    threads_per_process=8,
    cache_path="/tmp/scratch/$SLURM_JOB_ID/cache",
).save("experiment.yaml")

# Phase 2: run (cluster job)
report = Workflow.load("experiment.yaml").run()
print(report.summary())
```

YAML Format
```yaml
environment:
  max_processes: 32
  threads_per_process: 8
  cache_path: /tmp/scratch/$SLURM_JOB_ID/cache  # $VAR expanded at runtime

data_pipeline:
  path: data_pipeline.yaml  # relative to this file's directory
  start: "2023-05-01"
  end: "2024-08-01"
  cache: true

model_pipeline:
  path: model_pipeline.yaml
  test_start: "2024-06-01"
  test_end: "2024-07-01"
  target: price
  horizon: 7
  freq: 1h
```

Forecast-only variant: add `forecast_only: true` and use date keywords so the file runs daily without editing:
```yaml
data_pipeline:
  path: data_pipeline.yaml
  start: "now_d-735"    # ~2 years of history, relative to today
  end: "today_d+7"      # fetch weather/calendar through the forecast window
  cache: true

model_pipeline:
  path: model_pipeline.yaml
  target: price
  horizon: 7
  freq: 1h              # set to 15min for 15-minute forecasting
  forecast_only: true   # test_start/test_end default to "today"; skips evaluators and exporters
```

Constructor Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| data_start | str | Required | Data fetch start date |
| data_end | str | Required | Data fetch end date |
| model_test_start | str | Required | Test period start |
| model_test_end | str | Required | Test period end |
| data_pipeline | str \| Path | None | Path to data_pipeline.yaml |
| model_pipeline | str \| Path | None | Path to model_pipeline.yaml |
| data_cache | bool \| str | False | Data caching (True, False, or a path) |
| model_target | str | "price" | Target column name |
| model_horizon | int | 7 | Max forecast horizon (days) |
| model_freq | str | "1h" | Forecasting frequency (e.g., "15min") |
| max_processes | int | None | Worker process count |
| threads_per_process | int | None | Threads per process |
| cache_path | str | None | Cache directory (overrides data_cache if set) |
| model_index | int | None | Run only models[model_index]; all models run if omitted |
| model_forecast_only | bool | False | Skip evaluators/exporters; test_start/test_end default to "today" if omitted |
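Relative date keywords such as "today" (and the "now_d-735" / "today_d+7" forms used in the forecast-only variant above) can be resolved with plain date arithmetic. A minimal sketch of such a resolver — the keyword names come from this document, but the parser itself is hypothetical, not the library's implementation:

```python
from datetime import date, timedelta
import re

def resolve_date_keyword(value, today=None):
    """Resolve "today", "now_d-735", "today_d+7", or an absolute ISO date."""
    today = today or date.today()
    if value in ("today", "now"):
        return today
    # "now_d-735" / "today_d+7": a base keyword plus a day offset
    m = re.fullmatch(r"(?:now|today)_d([+-]\d+)", value)
    if m:
        return today + timedelta(days=int(m.group(1)))
    return date.fromisoformat(value)  # fall back to absolute dates

print(resolve_date_keyword("today_d+7", today=date(2024, 8, 1)))  # 2024-08-08
```

With keywords like these, the same experiment file can be rerun daily without editing the start/end dates.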
Environment Variables
Workflow.run() always sets these before spawning any worker processes:
| Variable | Value | Why |
|---|---|---|
| OMP_NUM_THREADS | 1 | Prevents BLAS oversubscription with multiple processes |
| MKL_NUM_THREADS | 1 | Pins Intel MKL threads |
| OPENBLAS_NUM_THREADS | 1 | Pins OpenBLAS threads |
| BLAS_NUM_THREADS | 1 | Generic BLAS thread pin |
| PYTHON_GIL | 0 | Enables free-threading on Python 3.13t+ |
| MAX_PROCESSES | from config | Applied if max_processes is set |
| THREADS_PER_PROCESS | from config | Applied if threads_per_process is set |
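Operationally, this pinning amounts to exporting the variables in the parent process before any workers are spawned, so every child inherits them. A standalone sketch of the idea — the `config` dict is hypothetical and this is not the library's code:

```python
import os

# Hard pins: always set, so BLAS libraries in every worker stay single-threaded.
PINNED = {
    "OMP_NUM_THREADS": "1",
    "MKL_NUM_THREADS": "1",
    "OPENBLAS_NUM_THREADS": "1",
    "BLAS_NUM_THREADS": "1",
    "PYTHON_GIL": "0",  # free-threading; only read at interpreter startup
}
os.environ.update(PINNED)

# Conditional exports: only applied when the workflow config sets them.
config = {"max_processes": 32, "threads_per_process": None}  # hypothetical values
if config["max_processes"] is not None:
    os.environ["MAX_PROCESSES"] = str(config["max_processes"])
if config["threads_per_process"] is not None:
    os.environ["THREADS_PER_PROCESS"] = str(config["threads_per_process"])

print(os.environ["OMP_NUM_THREADS"], os.environ["MAX_PROCESSES"])  # 1 32
```

The order matters: thread-count variables are read by BLAS libraries when they initialize inside a worker, so they must be in the environment before the worker processes start.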
Inline Component Overrides
Pipeline sections can be specified directly in the workflow YAML, overriding the corresponding section from the referenced path. If no path is given, the pipeline is built entirely from inline sections.
```yaml
data_pipeline:
  path: data_pipeline.yaml
  start: "2023-05-01"
  end: "2024-08-01"
  cache: true
  transformers:          # replaces transformers from data_pipeline.yaml
    - class: LagTransformer
      params:
        columns: [load_actual, price]
        lags: [-7, -6, -5, -4, -3, -2, -1, 0]
        freq: day

model_pipeline:
  path: model_pipeline.yaml
  test_start: "2024-06-01"
  test_end: "2024-07-01"
  models:                # replaces models from model_pipeline.yaml
    - class: OLSModel
      params:
        predictors: [load_actual, price_d-1]
        training_window: 180
        name: OLS_short
```

A data pipeline built entirely from inline sections, with no path:

```yaml
data_pipeline:
  start: "2023-05-01"
  end: "2024-08-01"
  cache: true
  sources:
    - class: EntsoeSource
      params:
        country_code: PL
        api_key: YOUR_KEY
        type: [load, price]
  transformers:
    - class: ResampleTransformer
      params: {freq: 1h}
  validators:
    - class: NullCheckValidator
      params:
        columns: [load_actual, price]
```

Override semantics: an inline section replaces the entire corresponding section from the loaded file; sections are never merged. Sections that are not overridden (in the first example, the sources and validators from `path`) are kept.
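The replace-not-merge rule can be expressed as a plain dict operation: every top-level section key given inline shadows the whole section from the loaded file. A hypothetical sketch, not the library's loader:

```python
def apply_overrides(loaded, inline):
    """Section-level override: a key present inline replaces the entire
    corresponding section from the loaded file; other sections are kept."""
    merged = dict(loaded)
    for section, value in inline.items():
        if section != "path":  # "path" points at the file, it is not a section
            merged[section] = value  # whole-section replacement, no deep merge
    return merged

# Sections as they would look after loading data_pipeline.yaml:
loaded = {
    "sources": [{"class": "EntsoeSource"}],
    "transformers": [{"class": "ResampleTransformer"}],
    "validators": [{"class": "NullCheckValidator"}],
}
# Inline overrides from the workflow YAML:
inline = {"path": "data_pipeline.yaml",
          "transformers": [{"class": "LagTransformer"}]}

merged = apply_overrides(loaded, inline)
print([t["class"] for t in merged["transformers"]])  # ['LagTransformer']
print([s["class"] for s in merged["sources"]])       # ['EntsoeSource']
```

Note that the loaded ResampleTransformer is gone entirely: an inline `transformers` list does not append to or patch the loaded one.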
Full Workflow Setup
- Build and save pipeline configs

  ```python
  import os
  os.environ["PYTHON_GIL"] = "0"

  from epftoolbox2.pipelines import DataPipeline, ModelPipeline
  from epftoolbox2.data.sources import EntsoeSource, OpenMeteoSource, CalendarSource
  from epftoolbox2.data.transformers import ResampleTransformer, LagTransformer
  from epftoolbox2.data.validators import NullCheckValidator
  from epftoolbox2.models import OLSModel, LassoCVModel
  from epftoolbox2.evaluators import MAEEvaluator
  from epftoolbox2.exporters import ExcelExporter, TerminalExporter

  predictors = [
      "load_actual",
      "is_monday_d+{horizon}", "is_tuesday_d+{horizon}",
      "is_wednesday_d+{horizon}", "is_thursday_d+{horizon}",
      "is_friday_d+{horizon}", "is_saturday_d+{horizon}",
      "is_sunday_d+{horizon}", "is_holiday_d+{horizon}",
      "daylight_hours_d+{horizon}",
      "load_actual_d-1", "load_actual_d-2",
      "price_d-1", "price_d-2",
      "warsaw_temperature_2m_d+{horizon}",
  ]

  (
      DataPipeline()
      .add_source(EntsoeSource(country_code="PL",
                               api_key=os.environ["ENTSOE_API_KEY"],
                               type=["load", "price"]))
      .add_source(OpenMeteoSource(latitude=52.2297, longitude=21.0122,
                                  horizon=7, prefix="warsaw"))
      .add_source(CalendarSource(country="PL", holidays="binary",
                                 daylight=True, weekday="onehot"))
      .add_transformer(ResampleTransformer(freq="1h"))
      .add_transformer(LagTransformer(columns=["load_actual", "price"],
                                      lags=[1, 2, 7], freq="day"))
      .add_transformer(LagTransformer(lags=range(-7, 1), freq="day",
                                      columns=["is_monday", "daylight_hours",
                                               "load_forecast", "is_holiday"]))
      .add_validator(NullCheckValidator(columns=["load_actual", "price"]))
      .save("data_pipeline.yaml")
  )

  (
      ModelPipeline()
      .add_model(OLSModel(predictors=predictors, training_window=365, name="OLS"))
      .add_model(LassoCVModel(predictors=predictors, training_window=365,
                              cv=5, name="LassoCV"))
      .add_evaluator(MAEEvaluator())
      .add_exporter(TerminalExporter())
      .add_exporter(ExcelExporter("results.xlsx"))
      .save("model_pipeline.yaml")
  )
  ```

- Save workflow config

  ```python
  from epftoolbox2.pipelines import Workflow

  Workflow(
      data_pipeline="data_pipeline.yaml",
      model_pipeline="model_pipeline.yaml",
      data_start="2023-05-01",
      data_end="2024-08-01",
      data_cache=True,
      model_test_start="2024-06-01",
      model_test_end="2024-07-01",
      model_target="price",
      model_horizon=7,
      max_processes=32,
      threads_per_process=8,
      cache_path="/tmp/cache",
  ).save("experiment.yaml")
  ```

- Run job

  ```python
  import os
  os.environ["PYTHON_GIL"] = "0"

  from epftoolbox2.pipelines import Workflow

  if __name__ == "__main__":
      report = Workflow.load("experiment.yaml").run()
      print(report.summary())
  ```
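The predictor names above embed a "{horizon}" placeholder (e.g. `is_holiday_d+{horizon}`). One plausible reading is that the placeholder is substituted with the forecast day being predicted; a hypothetical sketch of such an expansion (`expand_predictors` is illustrative, not a library function):

```python
def expand_predictors(predictors, horizon):
    """Substitute the "{horizon}" placeholder for each forecast day 1..horizon,
    yielding one concrete predictor list per horizon step."""
    return {
        h: [p.replace("{horizon}", str(h)) for p in predictors]
        for h in range(1, horizon + 1)
    }

cols = expand_predictors(["price_d-1", "is_holiday_d+{horizon}"], horizon=2)
print(cols[2])  # ['price_d-1', 'is_holiday_d+2']
```

Under this reading, lag-free predictors like `price_d-1` are shared across horizons, while templated ones resolve to a different future column for each forecast day.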
Run only one model
model_index selects a single model by its 0-based position in the models list. When omitted, all models run. This makes it easy to submit one job per model (e.g. via a SLURM job array), adding a further level of parallelization.
Workflow YAML for an example SLURM cluster:

```yaml
environment:
  max_processes: 8
  threads_per_process: 8
  cache_path: /tmp/scratch/$SLURM_JOB_ID/cache
  model_index: $SLURM_ARRAY_TASK_ID  # 0-based, expanded at load() time
```
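The `$SLURM_ARRAY_TASK_ID` expansion can be reproduced with `os.path.expandvars` over the values read from YAML. A minimal sketch of load-time expansion — hypothetical, not the library's loader:

```python
import os

def expand_env(value):
    """Recursively expand $VAR references in strings loaded from YAML."""
    if isinstance(value, str):
        return os.path.expandvars(value)
    if isinstance(value, dict):
        return {k: expand_env(v) for k, v in value.items()}
    if isinstance(value, list):
        return [expand_env(v) for v in value]
    return value

os.environ["SLURM_ARRAY_TASK_ID"] = "3"  # set by SLURM inside an array job
env_section = {"max_processes": 8, "model_index": "$SLURM_ARRAY_TASK_ID"}
expanded = expand_env(env_section)
print(int(expanded["model_index"]))  # 3
```

Expanding at load() time means each array task reads the same experiment.yaml yet runs a different model, which is exactly what the one-job-per-model pattern needs.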