ML Lifecycle Management¶
This notebook is based around a multi-class classification task using the iris dataset. The aim is to demonstrate how MLflow can be used to:
- track training metrics
- manage model persistence
- deploy scoring services
Imports¶
All package imports are declared in this section.
import os
import random
import shutil
from urllib.request import urlopen
import mlflow
import numpy as np
import pandas as pd
from sklearn.metrics import f1_score, balanced_accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from tqdm import tqdm
Starting the Tracking Server¶
In order to enable all MLflow features, it is necessary to start an MLflow tracking server backed by a database and a filesystem. To do this, open a shell and execute the following command:
mlflow server \
--backend-store-uri sqlite:///mlflow.db \
--default-artifact-root mlruns \
--host 127.0.0.1
This will create a SQLite database and the mlruns directory, both locally, before starting the tracking server.
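Before moving on, it is worth checking that the server is actually up. The sketch below polls it from Python; it assumes the default port of 5000 and that your MLflow version exposes a /health endpoint.
from urllib.request import urlopen

# assumes the tracking server is listening on the default port (5000)
# and that this version of MLflow exposes a /health endpoint
with urlopen('http://127.0.0.1:5000/health') as response:
    print(response.status, response.read().decode())  # expect: 200 OK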
Configuring the MLflow Client¶
- set the MLflow Python client to use the local tracking server we've just started.
- create an MLflow experiment for tracking runs and models associated with our ML task, and then set it as the global default for this session.
- define a model name constant.
mlflow.set_tracking_uri('http://127.0.0.1:5000')
mlflow.set_experiment('Iris Classification')
MODEL_NAME = 'iris_classifier'
INFO: 'Iris Classification' does not exist. Creating a new experiment
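To double-check that the experiment now exists on the tracking server, it can be retrieved by name; a minimal sketch:
# confirm that the experiment was created and is active
experiment = mlflow.get_experiment_by_name('Iris Classification')
print(experiment.experiment_id, experiment.lifecycle_stage)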
Download Data¶
The data is currently hosted as a CSV file on AWS S3 object storage.
data_url = (
'http://bodywork-ml-ops-project'
'.s3.eu-west-2.amazonaws.com/data/iris_classification_data.csv'
)
data = pd.read_csv(urlopen(data_url))
data
| | sepal length (cm) | sepal width (cm) | petal length (cm) | petal width (cm) | species |
|---|---|---|---|---|---|
| 0 | 5.1 | 3.5 | 1.4 | 0.2 | setosa |
| 1 | 4.9 | 3.0 | 1.4 | 0.2 | setosa |
| 2 | 4.7 | 3.2 | 1.3 | 0.2 | setosa |
| 3 | 4.6 | 3.1 | 1.5 | 0.2 | setosa |
| 4 | 5.0 | 3.6 | 1.4 | 0.2 | setosa |
| ... | ... | ... | ... | ... | ... |
| 145 | 6.7 | 3.0 | 5.2 | 2.3 | virginica |
| 146 | 6.3 | 2.5 | 5.0 | 1.9 | virginica |
| 147 | 6.5 | 3.0 | 5.2 | 2.0 | virginica |
| 148 | 6.2 | 3.4 | 5.4 | 2.3 | virginica |
| 149 | 5.9 | 3.0 | 5.1 | 1.8 | virginica |

150 rows × 5 columns
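Before going any further it is worth running a few basic sanity checks on the download, for example that it has the expected shape, no missing values and only the three expected species:
# basic sanity checks on the downloaded dataset
assert data.shape == (150, 5)
assert data.isnull().sum().sum() == 0
assert set(data['species'].unique()) == {'setosa', 'versicolor', 'virginica'}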
Data Preparation¶
- map species names to integer class labels
- split features from labels
feature_columns = [
'sepal length (cm)',
'sepal width (cm)',
'petal length (cm)',
'petal width (cm)'
]
label_column = 'species'
species_to_class_map = {'setosa': 0, 'versicolor': 1, 'virginica': 2}
X = data[feature_columns].values
y = data[label_column].apply(lambda e: species_to_class_map[e]).values
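For decoding predictions later on, the inverse mapping from integer class labels back to species names is also useful; a small sketch, where class_to_species_map is an illustrative name:
# invert the species-to-class mapping, so predictions can be decoded
class_to_species_map = {v: k for k, v in species_to_class_map.items()}
print(class_to_species_map)  # {0: 'setosa', 1: 'versicolor', 2: 'virginica'}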
Split Data into Train and Test Subsets¶
X_train, X_test, y_train, y_test = train_test_split(
X,
y,
test_size=0.2,
stratify=y,
random_state=42
)
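Because stratify=y was passed, each species should appear in roughly equal proportion in both subsets, which can be confirmed with np.bincount, for example:
# class frequencies should be (approximately) equal across the two splits
print(np.bincount(y_train) / len(y_train))  # ~[0.333, 0.333, 0.333]
print(np.bincount(y_test) / len(y_test))    # ~[0.333, 0.333, 0.333]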
Logging Model Performance Metrics¶
- compute the balanced accuracy and weighted F1 score
- log the metrics to the MLflow tracking server
def log_metrics(y_actual, y_predicted) -> None:
    """Compute performance metrics and log them to MLflow."""
    accuracy = balanced_accuracy_score(
        y_actual,
        y_predicted,
        adjusted=True
    )
    f1 = f1_score(
        y_actual,
        y_predicted,
        average='weighted'
    )
    mlflow.log_metric('accuracy', accuracy)
    mlflow.log_metric('f1', f1)
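As an aside, the two log_metric calls could be collapsed into a single call to mlflow.log_metrics, which accepts a dictionary of metric names and values; a sketch of an equivalent helper, where log_metrics_batch is an illustrative name:
def log_metrics_batch(y_actual, y_predicted) -> None:
    """Equivalent to log_metrics, but logging both metrics in one call."""
    metrics = {
        'accuracy': balanced_accuracy_score(y_actual, y_predicted, adjusted=True),
        'f1': f1_score(y_actual, y_predicted, average='weighted')
    }
    mlflow.log_metrics(metrics)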
Training a Model¶
We start by writing a function that will train a decision tree classifier, given a choice of two hyper-parameters.
def train_model(
    X: np.ndarray,
    y: np.ndarray,
    max_depth: int,
    random_state: int
) -> DecisionTreeClassifier:
    """Train a single model, given hyper-parameters."""
    iris_tree_classifier = DecisionTreeClassifier(
        class_weight='balanced',
        random_state=random_state,
        max_depth=max_depth
    )
    iris_tree_classifier.fit(X, y)
    return iris_tree_classifier
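As a quick check, a single candidate model can be trained and scored on the held-out test set before involving MLflow at all, for example:
# train one candidate model and inspect its weighted F1 score directly
candidate_model = train_model(X_train, y_train, max_depth=3, random_state=42)
print(f1_score(y_test, candidate_model.predict(X_test), average='weighted'))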
Hyper-Parameter Search¶
Within a single MLflow 'parent' training run, we create 'child' runs to encapsulate models trained using randomly generated hyper-parameters. Each child run logs the parameters used and the associated performance metrics to MLflow.
Once all the models are trained, we search for the best performing set of parameters and use them to train a model on the full dataset, which is then logged to the MLflow model registry and transitioned to Production. We also persist some ad hoc model metadata, in this case the feature names and the class-name-to-label mapping, as text files.
with mlflow.start_run(run_name='Best Model') as parent_run:
    for _ in tqdm(range(10)):
        with mlflow.start_run(run_name='Candidate Run', nested=True) as child_run:
            max_depth = random.randint(1, 4)
            random_state = random.randint(1, 100)
            mlflow.log_param('random_state', random_state)
            mlflow.log_param('max_depth', max_depth)
            trained_model = train_model(X_train, y_train, max_depth, random_state)
            log_metrics(y_test, trained_model.predict(X_test))

    # get the best performing set of hyper-parameters
    best_run = (
        mlflow.search_runs(parent_run.info.experiment_id)
        .sort_values(by=['metrics.f1', 'metrics.accuracy'], ascending=False)
        [:1]
    )
    best_f1 = float(best_run['metrics.f1'])
    best_accuracy = float(best_run['metrics.accuracy'])
    best_max_depth = int(best_run['params.max_depth'])
    best_random_state = int(best_run['params.random_state'])

    # train the final model using the best parameters on the full dataset
    best_model = train_model(X, y, best_max_depth, best_random_state)

    # log the best parameters and metrics against the parent run
    mlflow.log_param('best_max_depth', best_max_depth)
    mlflow.log_param('best_random_state', best_random_state)
    mlflow.log_metric('best_f1', best_f1)
    mlflow.log_metric('best_accuracy', best_accuracy)
    mlflow.set_tag('model_estimated_on_full_dataset', 'true')

    # persist additional metadata as artefacts
    with open('features.txt', 'w') as f:
        f.write(', '.join(feature_columns))
    mlflow.log_artifact('features.txt')

    with open('class_labels.txt', 'w') as f:
        f.write(', '.join(f'{k}: {v}' for k, v in species_to_class_map.items()))
    mlflow.log_artifact('class_labels.txt')

    # log the final model and register it with the model registry
    mlflow.sklearn.log_model(sk_model=best_model, artifact_path=MODEL_NAME)
    new_model_metadata = mlflow.register_model(
        model_uri=f'runs:/{parent_run.info.run_id}/{MODEL_NAME}',
        name=MODEL_NAME
    )

    # push the new model version to Production
    mlflow.tracking.MlflowClient().transition_model_version_stage(
        name=MODEL_NAME,
        version=int(new_model_metadata.version),
        stage='Production'
    )
100%|██████████| 10/10 [00:02<00:00, 4.72it/s]
Successfully registered model 'iris_classifier'.
2020/12/23 17:37:43 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: iris_classifier, version 1
Created version '1' of model 'iris_classifier'.
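The state of the model registry can also be inspected directly from Python. The sketch below lists the latest version of our model in the Production stage, assuming an MLflow version that supports stage-based queries via MlflowClient.get_latest_versions:
# list the latest model version(s) registered under MODEL_NAME in Production
client = mlflow.tracking.MlflowClient()
for model_version in client.get_latest_versions(MODEL_NAME, stages=['Production']):
    print(model_version.name, model_version.version, model_version.current_stage)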
Retrieving Models from the Registry¶
We test that the best model found in our training run has been correctly persisted to the MLflow model registry, by loading the latest version available from Production.
model = mlflow.sklearn.load_model(model_uri=f'models:/{MODEL_NAME}/Production')
model
DecisionTreeClassifier(class_weight='balanced', max_depth=3, random_state=35)
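As a quick smoke test, the retrieved model can be used to score a single flower and the predicted class decoded back to a species name, for example:
# score one example and decode the predicted class label
class_to_species_map = {v: k for k, v in species_to_class_map.items()}
prediction = model.predict(np.array([[5.1, 3.5, 1.4, 0.2]]))
print(class_to_species_map[prediction[0]])  # expect: setosa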
Serve Predictions¶
We would like to make the model available as a scoring service with a REST API. Assuming that there is a Conda-compatible version of Python installed locally, this can be achieved from a new shell using the command below:
MLFLOW_TRACKING_URI="http://127.0.0.1:5000" \
MLFLOW_CONDA_HOME="/Users/alexioannides/opt/anaconda3" \
mlflow models serve -m "models:/iris_classifier/Production" -p 5001
Test the Model-Scoring Service¶
To test the model scoring service started above, open yet another new shell and use the curl tool to issue an HTTP request:
curl http://127.0.0.1:5001/invocations -H 'Content-Type: application/json' -d '{
"columns": ["a", "b", "c", "d"],
"data": [[5.1, 3.5, 1.4, 0.2]]
}'
This will return the predicted class label from the model.
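The same request can also be issued from Python rather than curl; a minimal sketch, assuming the scoring service is listening on port 5001 as above:
import json
from urllib.request import Request, urlopen

# issue the same scoring request from Python
payload = json.dumps({
    'columns': ['a', 'b', 'c', 'd'],
    'data': [[5.1, 3.5, 1.4, 0.2]]
}).encode('utf-8')
request = Request(
    'http://127.0.0.1:5001/invocations',
    data=payload,
    headers={'Content-Type': 'application/json'}
)
with urlopen(request) as response:
    print(response.read().decode())  # e.g. [0]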
Clean-Up¶
Stop the scoring service and model tracking server and then delete:
- the SQLite database used by the tracking server.
- the local directory used by the tracking server to persist models and artefacts.
- all temporary artefact files.
os.remove('mlflow.db')
os.remove('features.txt')
os.remove('class_labels.txt')
shutil.rmtree('mlruns', ignore_errors=True)