
# Workflow

`Workflow` ties a `DataPipeline` YAML, a `ModelPipeline` YAML, and all run parameters together into a single experiment file.

## Quick Example

```python
from epftoolbox2.pipelines import Workflow

# Phase 1: save config (once, on your laptop)
Workflow(
    data_pipeline="data_pipeline.yaml",
    model_pipeline="model_pipeline.yaml",
    data_start="2023-05-01",
    data_end="2024-08-01",
    data_cache=True,
    model_test_start="2024-06-01",
    model_test_end="2024-07-01",
    model_target="price",
    model_horizon=7,
    max_processes=32,
    threads_per_process=8,
    cache_path="/tmp/scratch/$SLURM_JOB_ID/cache",
).save("experiment.yaml")

# Phase 2: run (cluster job)
report = Workflow.load("experiment.yaml").run()
print(report.summary())
```

## YAML Format

```yaml
environment:
  max_processes: 32
  threads_per_process: 8
  cache_path: /tmp/scratch/$SLURM_JOB_ID/cache  # $VAR expanded at runtime

data_pipeline:
  path: data_pipeline.yaml  # relative to this file's directory
  start: "2023-05-01"
  end: "2024-08-01"
  cache: true

model_pipeline:
  path: model_pipeline.yaml
  test_start: "2024-06-01"
  test_end: "2024-07-01"
  target: price
  horizon: 7
  freq: 1h
```

Forecast-only variant: add `forecast_only: true` and use date keywords, so the file can run daily without editing:

```yaml
data_pipeline:
  path: data_pipeline.yaml
  start: "now_d-735"    # ~2 years of history, relative to today
  end: "today_d+7"      # fetch weather/calendar through the forecast window
  cache: true

model_pipeline:
  path: model_pipeline.yaml
  target: price
  horizon: 7
  freq: 1h              # set to 15min for 15-minute forecasting
  forecast_only: true   # test_start/test_end default to "today"; skips evaluators and exporters
```

## Constructor Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `data_start` | `str` | Required | Data fetch start date |
| `data_end` | `str` | Required | Data fetch end date |
| `model_test_start` | `str` | Required | Test period start |
| `model_test_end` | `str` | Required | Test period end |
| `data_pipeline` | `str \| Path` | `None` | Path to `data_pipeline.yaml` |
| `model_pipeline` | `str \| Path` | `None` | Path to `model_pipeline.yaml` |
| `data_cache` | `bool \| str` | `False` | Data caching (`True`, `False`, or a path) |
| `model_target` | `str` | `"price"` | Target column name |
| `model_horizon` | `int` | `7` | Max forecast horizon (days) |
| `model_freq` | `str` | `"1h"` | Forecasting frequency (e.g., `"15min"`) |
| `max_processes` | `int` | `None` | Worker process count |
| `threads_per_process` | `int` | `None` | Threads per process |
| `cache_path` | `str` | `None` | Cache directory (overrides `data_cache` if set) |
| `model_index` | `int` | `None` | Run only `models[model_index]`; all models run if omitted |
| `model_forecast_only` | `bool` | `False` | Skip evaluators/exporters; `test_start`/`test_end` default to `"today"` if omitted |
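How the caching parameters interact can be sketched as follows — `resolve_cache` is a hypothetical helper for illustration, not library API; the real resolution happens inside `Workflow`:

```python
def resolve_cache(data_cache, cache_path=None, default="./cache"):
    """Illustrative resolution of the caching options.

    cache_path, when set, overrides data_cache; data_cache may be a
    bool (True enables a default location) or a path string.
    """
    if cache_path is not None:
        return cache_path          # explicit directory always wins
    if data_cache is True:
        return default             # caching on, default location
    if data_cache is False or data_cache is None:
        return None                # caching off
    return data_cache              # data_cache given as a path string

cache_dir = resolve_cache(True, cache_path="/tmp/scratch/cache")  # cache_path wins
```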

## Environment Variables

`Workflow.run()` always sets these before spawning any worker processes:

| Variable | Value | Why |
|---|---|---|
| `OMP_NUM_THREADS` | `1` | Prevents BLAS oversubscription with multiple processes |
| `MKL_NUM_THREADS` | `1` | Pins Intel MKL threads |
| `OPENBLAS_NUM_THREADS` | `1` | Pins OpenBLAS threads |
| `BLAS_NUM_THREADS` | `1` | Generic BLAS thread pin |
| `PYTHON_GIL` | `0` | Enables free-threading on Python 3.13t+ |
| `MAX_PROCESSES` | from config | Applied if `max_processes` is set |
| `THREADS_PER_PROCESS` | from config | Applied if `threads_per_process` is set |
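The pinning amounts to updating `os.environ` before any worker starts. An illustrative sketch, not the actual implementation inside `Workflow.run()`:

```python
import os

# Thread-pinning sketch: fixed values from the table above, plus the
# two config-driven variables when the corresponding options are set.
PINNED = {
    "OMP_NUM_THREADS": "1",
    "MKL_NUM_THREADS": "1",
    "OPENBLAS_NUM_THREADS": "1",
    "BLAS_NUM_THREADS": "1",
    "PYTHON_GIL": "0",
}

def pin_environment(max_processes=None, threads_per_process=None):
    os.environ.update(PINNED)
    if max_processes is not None:
        os.environ["MAX_PROCESSES"] = str(max_processes)
    if threads_per_process is not None:
        os.environ["THREADS_PER_PROCESS"] = str(threads_per_process)

pin_environment(max_processes=32, threads_per_process=8)
```

Setting these before spawning matters because child processes inherit the environment at fork/spawn time; changing them afterwards has no effect on already-running workers.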

## Inline Component Overrides

Pipeline sections can be specified directly in the workflow YAML, overriding the corresponding section from the referenced path. If no path is given, the pipeline is built entirely from inline sections.

```yaml
data_pipeline:
  path: data_pipeline.yaml
  start: "2023-05-01"
  end: "2024-08-01"
  cache: true
  transformers:            # replaces transformers from data_pipeline.yaml
    - class: LagTransformer
      params:
        columns: [load_actual, price]
        lags: [-7, -6, -5, -4, -3, -2, -1, 0]
        freq: day
```

Override semantics: an inline section replaces the entire corresponding section from the loaded file; it is not merged with it. In the example above, the sources and validators from `path` are kept because only `transformers` is overridden.
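The replace-not-merge rule can be illustrated with plain dicts — `apply_overrides` is a hypothetical helper, not library API:

```python
def apply_overrides(loaded: dict, inline: dict) -> dict:
    """Each inline top-level section fully replaces that section from
    the file referenced by `path`; there is no deep merge."""
    merged = dict(loaded)
    for section, value in inline.items():
        merged[section] = value  # whole-section replacement
    return merged

loaded = {   # what data_pipeline.yaml might contain
    "sources": ["EntsoeSource"],
    "transformers": ["ResampleTransformer", "LagTransformer"],
    "validators": ["NullCheckValidator"],
}
inline = {"transformers": ["LagTransformer"]}  # the override from the workflow YAML

result = apply_overrides(loaded, inline)
# sources and validators survive; transformers is replaced wholesale
```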


## Full Workflow Setup

1.  **Build and save pipeline configs**

    ```python
    import os
    os.environ["PYTHON_GIL"] = "0"

    from epftoolbox2.pipelines import DataPipeline, ModelPipeline
    from epftoolbox2.data.sources import EntsoeSource, OpenMeteoSource, CalendarSource
    from epftoolbox2.data.transformers import ResampleTransformer, LagTransformer
    from epftoolbox2.data.validators import NullCheckValidator
    from epftoolbox2.models import OLSModel, LassoCVModel
    from epftoolbox2.evaluators import MAEEvaluator
    from epftoolbox2.exporters import ExcelExporter, TerminalExporter

    predictors = [
        "load_actual",
        "is_monday_d+{horizon}", "is_tuesday_d+{horizon}",
        "is_wednesday_d+{horizon}", "is_thursday_d+{horizon}",
        "is_friday_d+{horizon}", "is_saturday_d+{horizon}",
        "is_sunday_d+{horizon}", "is_holiday_d+{horizon}",
        "daylight_hours_d+{horizon}",
        "load_actual_d-1", "load_actual_d-2",
        "price_d-1", "price_d-2",
        "warsaw_temperature_2m_d+{horizon}",
    ]

    (
        DataPipeline()
        .add_source(EntsoeSource(country_code="PL", api_key=os.environ["ENTSOE_API_KEY"], type=["load", "price"]))
        .add_source(OpenMeteoSource(latitude=52.2297, longitude=21.0122, horizon=7, prefix="warsaw"))
        .add_source(CalendarSource(country="PL", holidays="binary", daylight=True, weekday="onehot"))
        .add_transformer(ResampleTransformer(freq="1h"))
        .add_transformer(LagTransformer(columns=["load_actual", "price"], lags=[1, 2, 7], freq="day"))
        .add_transformer(LagTransformer(lags=range(-7, 1), freq="day", columns=["is_monday", "daylight_hours", "load_forecast", "is_holiday"]))
        .add_validator(NullCheckValidator(columns=["load_actual", "price"]))
        .save("data_pipeline.yaml")
    )

    (
        ModelPipeline()
        .add_model(OLSModel(predictors=predictors, training_window=365, name="OLS"))
        .add_model(LassoCVModel(predictors=predictors, training_window=365, cv=5, name="LassoCV"))
        .add_evaluator(MAEEvaluator())
        .add_exporter(TerminalExporter())
        .add_exporter(ExcelExporter("results.xlsx"))
        .save("model_pipeline.yaml")
    )
    ```
2.  **Save workflow config**

    ```python
    from epftoolbox2.pipelines import Workflow

    Workflow(
        data_pipeline="data_pipeline.yaml",
        model_pipeline="model_pipeline.yaml",
        data_start="2023-05-01",
        data_end="2024-08-01",
        data_cache=True,
        model_test_start="2024-06-01",
        model_test_end="2024-07-01",
        model_target="price",
        model_horizon=7,
        max_processes=32,
        threads_per_process=8,
        cache_path="/tmp/cache",
    ).save("experiment.yaml")
    ```
3.  **Run job**

    ```python
    import os
    os.environ["PYTHON_GIL"] = "0"

    from epftoolbox2.pipelines import Workflow

    if __name__ == "__main__":
        report = Workflow.load("experiment.yaml").run()
        print(report.summary())
    ```

## Run only one model

`model_index` selects a single model by its 0-based position in the model list. When omitted, all models run. This makes it possible to submit one job per model, allowing further parallelization across jobs.

Example workflow YAML for a SLURM array job:

```yaml
environment:
  max_processes: 8
  threads_per_process: 8
  cache_path: /tmp/scratch/$SLURM_JOB_ID/cache
  model_index: $SLURM_ARRAY_TASK_ID   # 0-based, expanded at load() time
```
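`$VAR` references behave like shell-style substitution against the process environment. An illustrative sketch using the standard library's `os.path.expandvars` — the library's exact expansion mechanism may differ:

```python
import os

# In a real job, SLURM sets this before the Python process starts.
os.environ["SLURM_ARRAY_TASK_ID"] = "3"

raw = "model_index: $SLURM_ARRAY_TASK_ID"
expanded = os.path.expandvars(raw)          # "model_index: 3"
model_index = int(expanded.split(":")[1])   # each array task gets its own model
```

Submitting the workflow as an array job (e.g. `sbatch --array=0-1` for the two models above) then runs one model per task, each with its own scratch cache.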
