Example Pipeline
In this example, each stage of the DVC pipeline is defined in an executable Python module, and the end-to-end pipeline is defined in a YAML file (dvc.yaml). We demonstrate how to compose a two-stage pipeline that first gets a dataset and then trains a model on it, saving the model and metrics to local disk so that the files can be tracked by DVC.
Shared Config
demos/dvc-pipelines/stages/config.py
"""
Pipeline stage configuration.
"""
DATASET_FILENAME = "artefacts/dataset.csv"
METRICS_FILENAME = "metrics/metrics.json"
MODEL_FILENAME = "artefacts/model.joblib"
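The paths above point into the `artefacts/` and `metrics/` sub-directories, and writing to them fails if those directories do not exist. Assuming they are not already present in the repository (the demo may well ship them), a stage could create them on demand. The helper below is a minimal sketch; `ensure_parent_dir` is a hypothetical name and is not part of the original demo.

```python
from pathlib import Path


def ensure_parent_dir(filename: str) -> Path:
    """Create the parent directory of `filename` if missing (hypothetical helper)."""
    path = Path(filename)
    # parents=True builds nested directories; exist_ok=True makes repeat calls safe.
    path.parent.mkdir(parents=True, exist_ok=True)
    return path
```

A stage could then write with, for example, `df.to_csv(ensure_parent_dir(DATASET_FILENAME), index=False)`.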
Stage 1: Get Training Data
demos/dvc-pipelines/stages/get_data.py
"""
Stage that gets a dataset for training a ML model.
"""
import numpy as np
import pandas as pd
from config import DATASET_FILENAME
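def run_stage() -> None:
    # Simulate a noisy linear relationship: y = 2x + noise.
    x = np.random.standard_normal(1000)
    y = 2.0 * x + 0.1 * np.random.standard_normal(1000)
    df = pd.DataFrame({"y": y, "x": x})
    # Write to the path that dvc.yaml declares as this stage's output.
    df.to_csv(DATASET_FILENAME, index=False)


if __name__ == "__main__":
    run_stage()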
Stage 2: Train Model
demos/dvc-pipelines/stages/train_model.py
"""
Train regression model on dataset
"""
import joblib
import json
import pandas as pd
import yaml
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split
from config import DATASET_FILENAME, METRICS_FILENAME, MODEL_FILENAME
def run_stage() -> None:
    # Read the stage parameters that DVC tracks via params.yaml.
    with open("params.yaml") as params_file:
        params = yaml.safe_load(params_file)["train"]

    data = pd.read_csv(DATASET_FILENAME)
    X_train, X_test, y_train, y_test = train_test_split(
        data[["x"]], data["y"], random_state=params["random_state"]
    )
    model = LinearRegression()
    model.fit(X_train, y_train)
    joblib.dump(model, MODEL_FILENAME)

    # Evaluate on the held-out split and persist the metric for DVC.
    y_test_pred = model.predict(X_test)
    mae = mean_absolute_error(y_test, y_test_pred)
    with open(METRICS_FILENAME, "w") as metrics_file:
        json.dump({"MAE": mae}, metrics_file, indent=4)


if __name__ == "__main__":
    run_stage()
Pipeline Definition
demos/dvc-pipelines/dvc.yaml
stages:
  get_data:
    cmd: python stages/get_data.py
    deps:
      - stages/get_data.py
    outs:
      - artefacts/dataset.csv
  train_model:
    cmd: python stages/train_model.py
    deps:
      - artefacts/dataset.csv
      - stages/train_model.py
    params:
      - train.random_state
    outs:
      - artefacts/model.joblib
    metrics:
      - metrics/metrics.json:
          cache: false
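DVC works out the order of stages by matching one stage's outputs to another stage's dependencies: here `train_model` depends on `artefacts/dataset.csv`, which `get_data` produces. The sketch below illustrates that idea with a topological sort over a hand-written dict. It is not DVC's implementation; the `STAGES` dict and `stage_order` function are hypothetical, and only the data dependencies relevant to ordering are included.

```python
from graphlib import TopologicalSorter

# Hand-written stand-in for the pipeline definition (data deps and outs only).
STAGES = {
    "get_data": {"deps": [], "outs": ["artefacts/dataset.csv"]},
    "train_model": {
        "deps": ["artefacts/dataset.csv"],
        "outs": ["artefacts/model.joblib"],
    },
}


def stage_order(stages: dict) -> list[str]:
    """Order stages so that producers run before consumers (illustrative only)."""
    # Map each output file to the stage that produces it.
    producer = {out: name for name, spec in stages.items() for out in spec["outs"]}
    # For each stage, collect the stages whose outputs it depends on.
    graph = {
        name: {producer[dep] for dep in spec["deps"] if dep in producer}
        for name, spec in stages.items()
    }
    return list(TopologicalSorter(graph).static_order())


print(stage_order(STAGES))  # ['get_data', 'train_model']
```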
Pipeline Parameters
demos/dvc-pipelines/params.yaml
train:
  random_state: 42
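The `train.random_state` entry under `params` in dvc.yaml is a dot-separated path into the nested mapping in params.yaml. As a rough illustration of how such a key maps onto the nested structure, the sketch below walks a plain dict; `PARAMS` and `resolve_param` are hypothetical names, and this is not how DVC itself is implemented.

```python
from functools import reduce

# Mirrors params.yaml as a plain dict, to keep the sketch dependency-free.
PARAMS = {"train": {"random_state": 42}}


def resolve_param(params: dict, dotted_key: str):
    """Walk a nested dict along a dot-separated key (illustrative only)."""
    return reduce(lambda node, key: node[key], dotted_key.split("."), params)


print(resolve_param(PARAMS, "train.random_state"))  # 42
```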