Skip to content

Example Pipeline

Each stage of a DVC pipeline needs to be defined within an executable Python module, with the end-to-end pipeline defined within a YAML file. We demonstrate how to compose a two stage pipeline that first gets a dataset and then trains a model on this dataset, saving the model and metrics to local disk (so that the files can be tracked by DVC).

Shared Config

demos/dvc-pipelines/stages/config.py
"""
Pipeline stage configuration.
"""
DATASET_FILENAME = "artefacts/dataset.csv"
METRICS_FILENAME = "metrics/metrics.json"
MODEL_FILENAME = "artefacts/model.joblib"

Stage 1: Get Training Data

demos/dvc-pipelines/stages/get_data.py
"""
Stage that gets a dataset for training a ML model.
"""
import numpy as np
import pandas as pd

from config import DATASET_FILENAME


def run_stage() -> None:
    x = np.random.standard_normal(1000)
    y = 2.0 * x + 0.1 * np.random.standard_normal(1000)
    df = pd.DataFrame({"y": y, "x": x})
    df.to_csv(DATASET_FILENAME, index=False)


if __name__ == "__main__":
    run_stage()

Stage 2: Train Model

demos/dvc-pipelines/stages/train_model.py
"""
Train regression model on dataset
"""
import joblib
import json
import pandas as pd
import yaml
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split

from config import DATASET_FILENAME, METRICS_FILENAME, MODEL_FILENAME


def run_stage() -> None:
    params = yaml.safe_load(open("params.yaml"))["train"]
    data = pd.read_csv(DATASET_FILENAME)
    X_train, X_test, y_train, y_test = train_test_split(
        data[["x"]], data["y"], random_state=params["random_state"]
    )
    model = LinearRegression()
    model.fit(X_train, y_train)
    joblib.dump(model, MODEL_FILENAME)

    y_test_pred = model.predict(X_test)
    mae = mean_absolute_error(y_test, y_test_pred)
    with open(METRICS_FILENAME, "w") as metrics_file:
        json.dump({"MAE": mae}, metrics_file, indent=4)


if __name__ == "__main__":
    run_stage()

Pipeline Definition

demos/dvc-pipelines/dvc.yaml
stages:
  get_data:
    cmd: python stages/get_data.py
    deps:
      - stages/get_data.py
    outs:
      - artefacts/dataset.csv
  train_model:
    cmd: python stages/train_model.py
    deps:
      - artefacts/dataset.csv
      - stages/get_data.py
    params:
      - train.random_state
    outs:
      - artefacts/model.joblib
    metrics:
      - metrics/metrics.json:
          cache: false

Pipeline Parameters

demos/dvc-pipelines/params.yaml
train:
  random_state: 42