LOCAL_MODE = True
Building Machine Learning Systems That Don’t Suck
This notebook creates a SageMaker Pipeline to build an end-to-end Machine Learning system to solve the problem of classifying penguin species. With a SageMaker Pipeline, you can create, automate, and manage end-to-end Machine Learning workflows at scale.

You can find more information about Amazon SageMaker in the Amazon SageMaker Developer Guide. The AWS Machine Learning Blog is an excellent source to stay up-to-date with SageMaker.

This example uses the Penguins dataset.

This notebook is part of the Machine Learning School program.

Session 1 - Introduction and Initial Setup

The machine learning system we’ll build during this program consists of four main pipelines: a training pipeline, an inference pipeline, a deployment pipeline, and a monitoring pipeline.

Here is an architectural diagram showing how the system is structured:

Throughout the sessions, we’ll build each of these pipelines. We’ll also build variations to show you different alternatives and best practices.

Let’s start by setting up the environment and preparing to run the notebook.

We can run this notebook in Local Mode to test some of the system components in your local environment. Unfortunately, not every component is supported in Local Mode.
Setting the LOCAL_MODE variable to True will run every supported pipeline component locally. Setting the variable to False will run the pipeline in SageMaker.
Let’s now load the environment variables we need to run the notebook.
import os

bucket = os.environ["BUCKET"]
role = os.environ["ROLE"]

COMET_API_KEY = os.environ.get("COMET_API_KEY", None)
COMET_PROJECT_NAME = os.environ.get("COMET_PROJECT_NAME", None)
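The cell above assumes these variables are already present in the environment. If you prefer to keep them in a local .env file, here is a minimal sketch of loading them before the cell above runs; python-dotenv and the .env filename are assumptions, not something this notebook requires:

# Optional: populate os.environ with BUCKET, ROLE, and the Comet variables
# from a .env file sitting next to this notebook.
# Requires `pip install python-dotenv`.
from dotenv import load_dotenv

load_dotenv()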
If you are running the pipeline in Local Mode on an ARM64 machine (for example, on Apple Silicon), you will need to use a custom Docker image to train and evaluate the model. Let’s create a variable indicating if we are running on an ARM64 machine.
# We can retrieve the architecture of the local
# computer using the `uname -m` command.
architecture = !(uname -m)

IS_ARM64_ARCHITECTURE = architecture[0] == "arm64"
Let’s create a configuration dictionary with different settings depending on whether we are running the pipeline in Local Mode. We’ll use this dictionary to configure the pipeline components.
import sagemaker
from sagemaker.workflow.pipeline_context import LocalPipelineSession, PipelineSession

pipeline_session = PipelineSession(default_bucket=bucket) if not LOCAL_MODE else None

if LOCAL_MODE:
    config = {
        "session": LocalPipelineSession(default_bucket=bucket),
        "instance_type": "local",
        # We need to use a custom Docker image when we run the pipeline
        # in Local Mode on an ARM64 machine.
        "image": (
            "sagemaker-tensorflow-toolkit-local" if IS_ARM64_ARCHITECTURE else None
        ),
    }
else:
    config = {
        "session": pipeline_session,
        "instance_type": "ml.m5.xlarge",
        "image": None,
    }

# These specific settings refer to the SageMaker
# TensorFlow container we'll use.
config["framework_version"] = "2.12"
config["py_version"] = "py310"
Let’s now initialize a few variables that we’ll need throughout the notebook:
import boto3

S3_LOCATION = f"s3://{bucket}/penguins"

sagemaker_session = sagemaker.session.Session()
sagemaker_client = boto3.client("sagemaker")
iam_client = boto3.client("iam")
region = boto3.Session().region_name
Session 2 - Exploratory Data Analysis
+Let’s run Exploratory Data Analysis on the Penguins dataset. The goal of this session is to understand the data and the problem we are trying to solve.
+Let’s load the Penguins dataset:
import numpy as np
import pandas as pd

penguins = pd.read_csv(DATA_FILEPATH)
penguins.head()
|   | species | island | culmen_length_mm | culmen_depth_mm | flipper_length_mm | body_mass_g | sex |
|---|---------|--------|------------------|-----------------|-------------------|-------------|-----|
| 0 | Adelie | Torgersen | 39.1 | 18.7 | 181.0 | 3750.0 | MALE |
| 1 | Adelie | Torgersen | 39.5 | 17.4 | 186.0 | 3800.0 | FEMALE |
| 2 | Adelie | Torgersen | 40.3 | 18.0 | 195.0 | 3250.0 | FEMALE |
| 3 | Adelie | Torgersen | NaN | NaN | NaN | NaN | NaN |
| 4 | Adelie | Torgersen | 36.7 | 19.3 | 193.0 | 3450.0 | FEMALE |
We can see the dataset contains the following columns:
- species: The species of a penguin. This is the column we want to predict.
- island: The island where the penguin was found.
- culmen_length_mm: The length of the penguin’s culmen (bill) in millimeters.
- culmen_depth_mm: The depth of the penguin’s culmen in millimeters.
- flipper_length_mm: The length of the penguin’s flipper in millimeters.
- body_mass_g: The body mass of the penguin in grams.
- sex: The sex of the penguin.
If you are curious, here is the description of a penguin’s culmen:
+Now, let’s get the summary statistics for the features in our dataset.
+="all") penguins.describe(include
|   | species | island | culmen_length_mm | culmen_depth_mm | flipper_length_mm | body_mass_g | sex |
|---|---------|--------|------------------|-----------------|-------------------|-------------|-----|
| count | 344 | 344 | 342.000000 | 342.000000 | 342.000000 | 342.000000 | 334 |
| unique | 3 | 3 | NaN | NaN | NaN | NaN | 3 |
| top | Adelie | Biscoe | NaN | NaN | NaN | NaN | MALE |
| freq | 152 | 168 | NaN | NaN | NaN | NaN | 168 |
| mean | NaN | NaN | 43.921930 | 17.151170 | 200.915205 | 4201.754386 | NaN |
| std | NaN | NaN | 5.459584 | 1.974793 | 14.061714 | 801.954536 | NaN |
| min | NaN | NaN | 32.100000 | 13.100000 | 172.000000 | 2700.000000 | NaN |
| 25% | NaN | NaN | 39.225000 | 15.600000 | 190.000000 | 3550.000000 | NaN |
| 50% | NaN | NaN | 44.450000 | 17.300000 | 197.000000 | 4050.000000 | NaN |
| 75% | NaN | NaN | 48.500000 | 18.700000 | 213.000000 | 4750.000000 | NaN |
| max | NaN | NaN | 59.600000 | 21.500000 | 231.000000 | 6300.000000 | NaN |
Let’s now display the distribution of values for the three categorical columns in our data:
+= penguins["species"].value_counts()
+ species_distribution = penguins["island"].value_counts()
+ island_distribution = penguins["sex"].value_counts()
+ sex_distribution
+print(species_distribution, end="\n\n")
+print(island_distribution, end="\n\n")
+print(sex_distribution)
species
+Adelie 152
+Gentoo 124
+Chinstrap 68
+Name: count, dtype: int64
+
+island
+Biscoe 168
+Dream 124
+Torgersen 52
+Name: count, dtype: int64
+
+sex
+MALE 168
+FEMALE 165
+. 1
+Name: count, dtype: int64
The distribution of the categories in our data is as follows:

- species: There are 3 species of penguins in the dataset: Adelie (152), Gentoo (124), and Chinstrap (68).
- island: Penguins are from 3 islands: Biscoe (168), Dream (124), and Torgersen (52).
- sex: We have 168 male penguins, 165 female penguins, and 1 penguin with an ambiguous value (.).
Let’s replace the ambiguous value in the sex column with a null value:
"sex"] = penguins["sex"].replace(".", np.nan)
+ penguins[
+# Let's display the new distribution of the column:
+= penguins["sex"].value_counts()
+ sex_distribution sex_distribution
sex
+MALE 168
+FEMALE 165
+Name: count, dtype: int64
+Next, let’s check for any missing values in the dataset.
penguins.isna().sum()
species 0
+island 0
+culmen_length_mm 2
+culmen_depth_mm 2
+flipper_length_mm 2
+body_mass_g 2
+sex 11
+dtype: int64
+Let’s get rid of the missing values. For now, we are going to replace the missing values with the most frequent value in the column. Later, we’ll use a different strategy to replace missing numeric values.
from sklearn.impute import SimpleImputer

imputer = SimpleImputer(strategy="most_frequent")
penguins.iloc[:, :] = imputer.fit_transform(penguins)

# Let's display again the number of missing values:
penguins.isna().sum()
species 0
+island 0
+culmen_length_mm 0
+culmen_depth_mm 0
+flipper_length_mm 0
+body_mass_g 0
+sex 0
+dtype: int64
+Let’s visualize the distribution of categorical features.
import matplotlib.pyplot as plt

fig, axs = plt.subplots(3, 1, figsize=(6, 10))

axs[0].bar(species_distribution.index, species_distribution.values)
axs[0].set_ylabel("Count")
axs[0].set_title("Distribution of Species")

axs[1].bar(island_distribution.index, island_distribution.values)
axs[1].set_ylabel("Count")
axs[1].set_title("Distribution of Island")

axs[2].bar(sex_distribution.index, sex_distribution.values)
axs[2].set_ylabel("Count")
axs[2].set_title("Distribution of Sex")

plt.tight_layout()
plt.show()
Let’s visualize the distribution of numerical columns.
fig, axs = plt.subplots(2, 2, figsize=(8, 6))

axs[0, 0].hist(penguins["culmen_length_mm"], bins=20)
axs[0, 0].set_ylabel("Count")
axs[0, 0].set_title("Distribution of culmen_length_mm")

axs[0, 1].hist(penguins["culmen_depth_mm"], bins=20)
axs[0, 1].set_ylabel("Count")
axs[0, 1].set_title("Distribution of culmen_depth_mm")

axs[1, 0].hist(penguins["flipper_length_mm"], bins=20)
axs[1, 0].set_ylabel("Count")
axs[1, 0].set_title("Distribution of flipper_length_mm")

axs[1, 1].hist(penguins["body_mass_g"], bins=20)
axs[1, 1].set_ylabel("Count")
axs[1, 1].set_title("Distribution of body_mass_g")

plt.tight_layout()
plt.show()
Let’s display the covariance matrix of the dataset. The “covariance” measures how changes in one variable are associated with changes in a second variable. In other words, the covariance measures the degree to which two variables are linearly associated.
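For two columns x and y with n rows, the sample covariance pandas computes is:

$$\operatorname{cov}(x, y) = \frac{1}{n - 1} \sum_{i=1}^{n} (x_i - \bar{x})(y_i - \bar{y})$$

A positive value means the two variables tend to move in the same direction; a negative value means they tend to move in opposite directions.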
penguins.cov(numeric_only=True)
|   | culmen_length_mm | culmen_depth_mm | flipper_length_mm | body_mass_g |
|---|------------------|-----------------|-------------------|-------------|
| culmen_length_mm | 29.679415 | -2.516984 | 50.260588 | 2596.971151 |
| culmen_depth_mm | -2.516984 | 3.877201 | -16.108849 | -742.660180 |
| flipper_length_mm | 50.260588 | -16.108849 | 197.269501 | 9792.552037 |
| body_mass_g | 2596.971151 | -742.660180 | 9792.552037 | 640316.716388 |
Here are three examples of what we get from interpreting the covariance matrix above:

- The positive covariance of 50.26 between culmen length and flipper length suggests that larger values of culmen length are associated with larger values of flipper length. As one increases, generally so does the other.
- The positive covariance of 2596.97 between culmen length and body mass suggests that heavier penguins generally have longer culmens. There is a tendency for these two variables to increase together.
- The negative covariance of -742.66 between culmen depth and body mass suggests a general tendency for penguins with deeper culmens to weigh less.
Let’s now display the correlation matrix. “Correlation” measures both the strength and direction of the linear relationship between two variables:
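The correlation pandas reports is the Pearson coefficient, which rescales the covariance by the standard deviations of both columns so the result always falls between -1 and 1:

$$\rho_{x, y} = \frac{\operatorname{cov}(x, y)}{\sigma_x \, \sigma_y}$$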
penguins.corr(numeric_only=True)
|   | culmen_length_mm | culmen_depth_mm | flipper_length_mm | body_mass_g |
|---|------------------|-----------------|-------------------|-------------|
| culmen_length_mm | 1.000000 | -0.234635 | 0.656856 | 0.595720 |
| culmen_depth_mm | -0.234635 | 1.000000 | -0.582472 | -0.471339 |
| flipper_length_mm | 0.656856 | -0.582472 | 1.000000 | 0.871302 |
| body_mass_g | 0.595720 | -0.471339 | 0.871302 | 1.000000 |
Here are three examples of what we get from interpreting the correlation matrix above:

- Penguins that weigh more tend to have longer flippers.
- Penguins with a shallower culmen tend to have longer flippers.
- Penguins with longer culmens tend to have longer flippers.
Let’s display the distribution of species by island:
+= penguins["species"].unique()
+ unique_species
+= plt.subplots(figsize=(6, 6))
+ fig, ax for species in unique_species:
+= penguins[penguins["species"] == species]
+ data "island"], bins=5, alpha=0.5, label=species)
+ ax.hist(data[
+"Island")
+ ax.set_xlabel("Count")
+ ax.set_ylabel("Distribution of Species by Island")
+ ax.set_title(
+ ax.legend() plt.show()
Let’s display the distribution of species by sex.
fig, ax = plt.subplots(figsize=(6, 6))

for species in unique_species:
    data = penguins[penguins["species"] == species]
    ax.hist(data["sex"], bins=3, alpha=0.5, label=species)

ax.set_xlabel("Sex")
ax.set_ylabel("Count")
ax.set_title("Distribution of Species by Sex")
ax.legend()
plt.show()
Session 3 - Splitting and Transforming the Data
In this session, we’ll build a simple SageMaker Pipeline with one step to split and transform the data.

We’ll use a Scikit-Learn Pipeline for the transformations, and a Processing Step with a SKLearnProcessor to execute a preprocessing script. Check the SageMaker Pipelines Overview for an introduction to the fundamental components of a SageMaker Pipeline.
+Step 1 - Creating the Preprocessing Script
+The first step we need in the pipeline is a Processing Step to run a script that will split and transform the data.
+This Processing Step will create a SageMaker Processing Job in the background, run the script, and upload the output to S3. You can use Processing Jobs to perform data preprocessing, post-processing, feature engineering, data validation, and model evaluation. Check the ProcessingStep SageMaker’s SDK documentation for more information.
We will store the script in a folder called processing and add it to the system path so we can later import it as a module.

(CODE_FOLDER / "processing").mkdir(parents=True, exist_ok=True)
sys.path.extend([f"./{CODE_FOLDER}/processing"])
Let’s now create the script:
script.py
import os
+import tarfile
+import tempfile
+from pathlib import Path
+
+import joblib
+import numpy as np
+import pandas as pd
+from sklearn.compose import ColumnTransformer, make_column_selector
+from sklearn.impute import SimpleImputer
+from sklearn.model_selection import train_test_split
+from sklearn.pipeline import make_pipeline
+from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, StandardScaler
+
+
+def preprocess(base_directory):
+ """Load the supplied data, split it and transform it."""
+ df = _read_data_from_input_csv_files(base_directory)
+
+ target_transformer = ColumnTransformer(
+ transformers=[("species", OrdinalEncoder(), [0])],
+ )
+
+ numeric_transformer = make_pipeline(
+ SimpleImputer(strategy="mean"),
+ StandardScaler(),
+ )
+
+ categorical_transformer = make_pipeline(
+ SimpleImputer(strategy="most_frequent"),
+ OneHotEncoder(),
+ )
+
+ features_transformer = ColumnTransformer(
+ transformers=[
+ (
+ "numeric",
+ numeric_transformer,
+ make_column_selector(dtype_exclude="object"),
+ ),
+ ("categorical", categorical_transformer, ["island"]),
+ ],
+ )
+
+ df_train, df_validation, df_test = _split_data(df)
+
+ _save_train_baseline(base_directory, df_train)
+ _save_test_baseline(base_directory, df_test)
+
+ y_train = target_transformer.fit_transform(
+ np.array(df_train.species.values).reshape(-1, 1),
+ )
+ y_validation = target_transformer.transform(
+ np.array(df_validation.species.values).reshape(-1, 1),
+ )
+ y_test = target_transformer.transform(
+ np.array(df_test.species.values).reshape(-1, 1),
+ )
+
+ df_train = df_train.drop("species", axis=1)
+ df_validation = df_validation.drop("species", axis=1)
+ df_test = df_test.drop("species", axis=1)
+
+ X_train = features_transformer.fit_transform(df_train) # noqa: N806
+ X_validation = features_transformer.transform(df_validation) # noqa: N806
+ X_test = features_transformer.transform(df_test) # noqa: N806
+
+ _save_splits(
+ base_directory,
+ X_train,
+ y_train,
+ X_validation,
+ y_validation,
+ X_test,
+ y_test,
+ )
+ _save_model(base_directory, target_transformer, features_transformer)
+
+
+def _read_data_from_input_csv_files(base_directory):
+ """Read the data from the input CSV files.
+
+ This function reads every CSV file available and
+ concatenates them into a single dataframe.
+ """
+ input_directory = Path(base_directory) / "input"
+ files = list(input_directory.glob("*.csv"))
+
+ if len(files) == 0:
        message = f"There are no CSV files in {input_directory.as_posix()}/"
+ raise ValueError(message)
+
+ raw_data = [pd.read_csv(file) for file in files]
+ df = pd.concat(raw_data)
+
+ # Shuffle the data
+ return df.sample(frac=1, random_state=42)
+
+
+def _split_data(df):
+ """Split the data into train, validation, and test."""
+ df_train, temp = train_test_split(df, test_size=0.3)
+ df_validation, df_test = train_test_split(temp, test_size=0.5)
+
+ return df_train, df_validation, df_test
+
+
+def _save_train_baseline(base_directory, df_train):
+ """Save the untransformed training data to disk.
+
+ We will need the training data to compute a baseline to
+ determine the quality of the data that the model receives
+ when deployed.
+ """
+ baseline_path = Path(base_directory) / "train-baseline"
+ baseline_path.mkdir(parents=True, exist_ok=True)
+
+ df = df_train.copy().dropna()
+
+ # To compute the data quality baseline, we don't need the
+ # target variable, so we'll drop it from the dataframe.
+ df = df.drop("species", axis=1)
+
+ df.to_csv(baseline_path / "train-baseline.csv", header=True, index=False)
+
+
+def _save_test_baseline(base_directory, df_test):
+ """Save the untransformed test data to disk.
+
+ We will need the test data to compute a baseline to
+ determine the quality of the model predictions when deployed.
+ """
+ baseline_path = Path(base_directory) / "test-baseline"
+ baseline_path.mkdir(parents=True, exist_ok=True)
+
+ df = df_test.copy().dropna()
+
+ # We'll use the test baseline to generate predictions later,
+ # and we can't have a header line because the model won't be
+ # able to make a prediction for it.
+ df.to_csv(baseline_path / "test-baseline.csv", header=False, index=False)
+
+
+def _save_splits(
+ base_directory,
+ X_train, # noqa: N803
+ y_train,
+ X_validation, # noqa: N803
+ y_validation,
+ X_test, # noqa: N803
+ y_test,
+):
+ """Save data splits to disk.
+
+ This function concatenates the transformed features
+ and the target variable, and saves each one of the split
+ sets to disk.
+ """
+ train = np.concatenate((X_train, y_train), axis=1)
+ validation = np.concatenate((X_validation, y_validation), axis=1)
+ test = np.concatenate((X_test, y_test), axis=1)
+
+ train_path = Path(base_directory) / "train"
+ validation_path = Path(base_directory) / "validation"
+ test_path = Path(base_directory) / "test"
+
+ train_path.mkdir(parents=True, exist_ok=True)
+ validation_path.mkdir(parents=True, exist_ok=True)
+ test_path.mkdir(parents=True, exist_ok=True)
+
+ pd.DataFrame(train).to_csv(train_path / "train.csv", header=False, index=False)
+ pd.DataFrame(validation).to_csv(
+ validation_path / "validation.csv",
+ header=False,
+ index=False,
+ )
+ pd.DataFrame(test).to_csv(test_path / "test.csv", header=False, index=False)
+
+
+def _save_model(base_directory, target_transformer, features_transformer):
+ """Save the Scikit-Learn transformation pipelines.
+
+ This function creates a model.tar.gz file that
+ contains the two transformation pipelines we built
+ to transform the data.
+ """
+ with tempfile.TemporaryDirectory() as directory:
+ joblib.dump(target_transformer, Path(directory) / "target.joblib")
+ joblib.dump(features_transformer, Path(directory) / "features.joblib")
+
+ model_path = Path(base_directory) / "model"
+ model_path.mkdir(parents=True, exist_ok=True)
+
+ with tarfile.open(f"{(model_path / 'model.tar.gz').as_posix()}", "w:gz") as tar:
+ tar.add(Path(directory) / "target.joblib", arcname="target.joblib")
+ tar.add(
+ Path(directory) / "features.joblib", arcname="features.joblib",
+ )
+
+
+if __name__ == "__main__":
+ preprocess(base_directory="/opt/ml/processing")
Let’s test the script to ensure everything is working as expected:
import os
import shutil
import tarfile
import tempfile

import pytest
from processing.script import preprocess


@pytest.fixture(autouse=False)
def directory():
    directory = tempfile.mkdtemp()
    input_directory = Path(directory) / "input"
    input_directory.mkdir(parents=True, exist_ok=True)
    shutil.copy2(DATA_FILEPATH, input_directory / "data.csv")

    directory = Path(directory)
    preprocess(base_directory=directory)

    yield directory

    shutil.rmtree(directory)


def test_preprocess_generates_data_splits(directory):
    output_directories = os.listdir(directory)

    assert "train" in output_directories
    assert "validation" in output_directories
    assert "test" in output_directories


def test_preprocess_generates_baselines(directory):
    output_directories = os.listdir(directory)

    assert "train-baseline" in output_directories
    assert "test-baseline" in output_directories


def test_preprocess_creates_two_models(directory):
    model_path = directory / "model"
    tar = tarfile.open(model_path / "model.tar.gz", "r:gz")

    assert "features.joblib" in tar.getnames()
    assert "target.joblib" in tar.getnames()


def test_splits_are_transformed(directory):
    train = pd.read_csv(directory / "train" / "train.csv", header=None)
    validation = pd.read_csv(directory / "validation" / "validation.csv", header=None)
    test = pd.read_csv(directory / "test" / "test.csv", header=None)

    # After transforming the data, the number of features should be 7:
    # * 3 - island (one-hot encoded)
    # * 1 - culmen_length_mm
    # * 1 - culmen_depth_mm
    # * 1 - flipper_length_mm
    # * 1 - body_mass_g
    number_of_features = 7

    # The transformed splits should have an additional column for the target
    # variable.
    assert train.shape[1] == number_of_features + 1
    assert validation.shape[1] == number_of_features + 1
    assert test.shape[1] == number_of_features + 1


def test_train_baseline_is_not_transformed(directory):
    baseline = pd.read_csv(
        directory / "train-baseline" / "train-baseline.csv",
        header=None,
    )

    island = baseline.iloc[:, 0].unique()

    assert "Biscoe" in island
    assert "Torgersen" in island
    assert "Dream" in island


def test_test_baseline_is_not_transformed(directory):
    baseline = pd.read_csv(
        directory / "test-baseline" / "test-baseline.csv", header=None
    )

    island = baseline.iloc[:, 1].unique()

    assert "Biscoe" in island
    assert "Torgersen" in island
    assert "Dream" in island


def test_train_baseline_includes_header(directory):
    baseline = pd.read_csv(directory / "train-baseline" / "train-baseline.csv")
    assert baseline.columns[0] == "island"


def test_test_baseline_does_not_include_header(directory):
    baseline = pd.read_csv(directory / "test-baseline" / "test-baseline.csv")
    assert baseline.columns[0] != "island"
Step 2 - Caching Configuration
Several SageMaker Pipeline steps support caching. When a step runs, and depending on the configured caching policy, SageMaker will try to reuse the result of a previous successful run of the same step. You can find more information about this topic in Caching Pipeline Steps.
+Let’s define a caching policy that we’ll reuse on every step:
from sagemaker.workflow.steps import CacheConfig

cache_config = CacheConfig(enable_caching=True, expire_after="15d")
Step 3 - Pipeline Configuration
+We can parameterize a SageMaker Pipeline to make it more flexible. In this case, we’ll use a parameter to pass the location of the dataset we want to process. We can execute the pipeline with different datasets by changing the value of this parameter. Check Pipeline Parameters for more information.
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig

pipeline_definition_config = PipelineDefinitionConfig(use_custom_job_prefix=True)

dataset_location = ParameterString(
    name="dataset_location",
    default_value=f"{S3_LOCATION}/data",
)
Step 4 - Setting up the Processing Step
+Let’s now define the ProcessingStep that we’ll use in the pipeline to run the script that will split and transform the data.
+A processor gives the Processing Step information about the hardware and software that SageMaker should use to launch a Processing Job. To run the script we created, we need access to Scikit-Learn, so we can use the SKLearnProcessor processor that comes out-of-the-box with the SageMaker’s Python SDK.
+SageMaker manages the infrastructure of a Processing Job. It provisions resources for the duration of the job, and cleans up when it completes. The Processing Container image that SageMaker uses to run a Processing Job can either be a SageMaker built-in image or a custom image:
+ +The Data Processing with Framework Processors page discusses other built-in processors you can use. The Docker Registry Paths and Example Code page contains information about the available framework versions for each region.
from sagemaker.sklearn.processing import SKLearnProcessor

processor = SKLearnProcessor(
    base_job_name="preprocess-data",
    framework_version="1.2-1",
    # By default, a new account doesn't have access to `ml.m5.xlarge` instances.
    # If you haven't requested a quota increase yet, you can use an
    # `ml.t3.medium` instance type instead. This will work out of the box, but
    # the Processing Job will take significantly longer than it should.
    # To get access to `ml.m5.xlarge` instances, you can request a quota
    # increase under the Service Quotas section in your AWS account.
    instance_type=config["instance_type"],
    instance_count=1,
    role=role,
    sagemaker_session=config["session"],
)
Let’s now define the Processing Step that we’ll use in the pipeline.
+This step will specify the list of inputs that we’ll access from the preprocessing script. In this case, the input is the dataset we stored in S3. We also have a few outputs that we want SageMaker to capture when the Processing Job finishes.
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

preprocessing_step = ProcessingStep(
    name="preprocess-data",
    step_args=processor.run(
        code=f"{(CODE_FOLDER / 'processing' / 'script.py').as_posix()}",
        inputs=[
            ProcessingInput(
                source=dataset_location,
                destination="/opt/ml/processing/input",
            ),
        ],
        outputs=[
            ProcessingOutput(
                output_name="train",
                source="/opt/ml/processing/train",
                destination=f"{S3_LOCATION}/preprocessing/train",
            ),
            ProcessingOutput(
                output_name="validation",
                source="/opt/ml/processing/validation",
                destination=f"{S3_LOCATION}/preprocessing/validation",
            ),
            ProcessingOutput(
                output_name="test",
                source="/opt/ml/processing/test",
                destination=f"{S3_LOCATION}/preprocessing/test",
            ),
            ProcessingOutput(
                output_name="model",
                source="/opt/ml/processing/model",
                destination=f"{S3_LOCATION}/preprocessing/model",
            ),
            ProcessingOutput(
                output_name="train-baseline",
                source="/opt/ml/processing/train-baseline",
                destination=f"{S3_LOCATION}/preprocessing/train-baseline",
            ),
            ProcessingOutput(
                output_name="test-baseline",
                source="/opt/ml/processing/test-baseline",
                destination=f"{S3_LOCATION}/preprocessing/test-baseline",
            ),
        ],
    ),
    cache_config=cache_config,
)
Step 5 - Creating the Pipeline
+We can now create the SageMaker Pipeline and submit its definition to the SageMaker Pipelines service to create the pipeline if it doesn’t exist or update it if it does.
from sagemaker.workflow.pipeline import Pipeline

session3_pipeline = Pipeline(
    name="session3-pipeline",
    parameters=[dataset_location],
    steps=[
        preprocessing_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

session3_pipeline.upsert(role_arn=role)
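The upsert call only creates or updates the pipeline definition; it doesn't run it. Here is a minimal sketch of starting an execution and overriding the dataset location parameter; the override simply repeats the default value for illustration, and waiting on the execution applies when running against SageMaker rather than Local Mode:

# Start an execution of the pipeline we just upserted. Overriding the
# parameter is optional; omit `parameters` to use the default value.
execution = session3_pipeline.start(
    parameters={"dataset_location": f"{S3_LOCATION}/data"},
)

# Block until the execution finishes and inspect the status of each step.
execution.wait()
execution.list_steps()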
Session 4 - Training the Model
+This session extends the SageMaker Pipeline with a step to train a model. Check Train a Model with TensorFlow for more information about training a model with TensorFlow.
+ +We’ll also introduce experiment tracking using Amazon SageMaker Experiments and Comet.
+Step 1 - Creating the Training Script
+Let’s create the training script. This script is responsible for training a neural network using the train data, validating the model, and saving it so we can later use it.
We will store the script in a folder called training and add it to the system path so we can later import it as a module.

(CODE_FOLDER / "training").mkdir(parents=True, exist_ok=True)
sys.path.extend([f"./{CODE_FOLDER}/training"])
We can now create the script inside the folder:
script.py
import argparse
+import json
+import os
+import tarfile
+
+from pathlib import Path
+from comet_ml import Experiment
+
+import keras
+import numpy as np
+import pandas as pd
+from keras import Input
+from keras.layers import Dense
+from keras.models import Sequential
+from keras.optimizers import SGD
+from packaging import version
+from sklearn.metrics import accuracy_score
+
+
+def train(
+ model_directory,
+ train_path,
+ validation_path,
+ pipeline_path,
+ experiment,
+ epochs=50,
+ batch_size=32,
+):
+ print(f"Keras version: {keras.__version__}")
+
+ X_train = pd.read_csv(Path(train_path) / "train.csv")
+ y_train = X_train[X_train.columns[-1]]
+ X_train = X_train.drop(X_train.columns[-1], axis=1)
+
+ X_validation = pd.read_csv(Path(validation_path) / "validation.csv")
+ y_validation = X_validation[X_validation.columns[-1]]
+ X_validation = X_validation.drop(X_validation.columns[-1], axis=1)
+
+ model = Sequential(
+ [
+ Input(shape=(X_train.shape[1],)),
+ Dense(10, activation="relu"),
+ Dense(8, activation="relu"),
+ Dense(3, activation="softmax"),
+ ]
+ )
+
+ model.compile(
+ optimizer=SGD(learning_rate=0.01),
+ loss="sparse_categorical_crossentropy",
+ metrics=["accuracy"],
+ )
+
+ model.fit(
+ X_train,
+ y_train,
+ validation_data=(X_validation, y_validation),
+ epochs=epochs,
+ batch_size=batch_size,
+ verbose=2,
+ )
+
+ predictions = np.argmax(model.predict(X_validation), axis=-1)
+ val_accuracy = accuracy_score(y_validation, predictions)
+ print(f"Validation accuracy: {val_accuracy}")
+
+ # Starting on version 3, Keras changed the model saving format.
+ # Since we are running the training script using two different versions
+ # of Keras, we need to check to see which version we are using and save
+ # the model accordingly.
+ model_filepath = (
+ Path(model_directory) / "001"
+ if version.parse(keras.__version__) < version.parse("3")
+ else Path(model_directory) / "penguins.keras"
+ )
+
+ model.save(model_filepath)
+
+ # Let's save the transformation pipelines inside the
+ # model directory so they get bundled together.
+ with tarfile.open(Path(pipeline_path) / "model.tar.gz", "r:gz") as tar:
+ tar.extractall(model_directory)
+
+ if experiment:
+ experiment.log_parameters(
+ {
+ "epochs": epochs,
+ "batch_size": batch_size,
+ }
+ )
+ experiment.log_dataset_hash(X_train)
+ experiment.log_confusion_matrix(
+ y_validation.astype(int), predictions.astype(int)
+ )
+ experiment.log_model("penguins", model_filepath.as_posix())
+
+
+if __name__ == "__main__":
+ # Any hyperparameters provided by the training job are passed to
+ # the entry point as script arguments.
+ parser = argparse.ArgumentParser()
+ parser.add_argument("--epochs", type=int, default=50)
+ parser.add_argument("--batch_size", type=int, default=32)
+ args, _ = parser.parse_known_args()
+
+ # Let's create a Comet experiment to log the metrics and parameters
+ # of this training job.
+ comet_api_key = os.environ.get("COMET_API_KEY", None)
+ comet_project_name = os.environ.get("COMET_PROJECT_NAME", None)
+
+ experiment = (
+ Experiment(
+ project_name=comet_project_name,
+ api_key=comet_api_key,
+ auto_metric_logging=True,
+ auto_param_logging=True,
+ log_code=True,
+ )
+ if comet_api_key and comet_project_name
+ else None
+ )
+
+ training_env = json.loads(os.environ.get("SM_TRAINING_ENV", {}))
+ job_name = training_env.get("job_name", None) if training_env else None
+
+ # We want to use the SageMaker's training job name as the name
+ # of the experiment so we can easily recognize it.
+ if job_name and experiment:
+ experiment.set_name(job_name)
+
+ train(
+ # This is the location where we need to save our model.
+ # SageMaker will create a model.tar.gz file with anything
+ # inside this directory when the training script finishes.
+ model_directory=os.environ["SM_MODEL_DIR"],
+ # SageMaker creates one channel for each one of the inputs
+ # to the Training Step.
+ train_path=os.environ["SM_CHANNEL_TRAIN"],
+ validation_path=os.environ["SM_CHANNEL_VALIDATION"],
+ pipeline_path=os.environ["SM_CHANNEL_PIPELINE"],
+ experiment=experiment,
+ epochs=args.epochs,
+ batch_size=args.batch_size,
+ )
Let’s test the script to ensure everything is working as expected:
import os
import shutil
import pytest
import tempfile

from processing.script import preprocess
from training.script import train


@pytest.fixture(scope="function", autouse=False)
def directory():
    directory = tempfile.mkdtemp()
    input_directory = Path(directory) / "input"
    input_directory.mkdir(parents=True, exist_ok=True)
    shutil.copy2(DATA_FILEPATH, input_directory / "data.csv")

    directory = Path(directory)

    preprocess(base_directory=directory)

    train(
        model_directory=directory / "model",
        train_path=directory / "train",
        validation_path=directory / "validation",
        pipeline_path=directory / "model",
        experiment=None,
        epochs=1,
    )

    yield directory

    shutil.rmtree(directory)


def test_train_bundles_model_assets(directory):
    bundle = os.listdir(directory / "model")
    assert "001" in bundle

    assets = os.listdir(directory / "model" / "001")
    assert "saved_model.pb" in assets


def test_train_bundles_transformation_pipelines(directory):
    bundle = os.listdir(directory / "model")
    assert "target.joblib" in bundle
    assert "features.joblib" in bundle
Step 2 - Setting up the Training Step
+We can now create a Training Step that we can add to the pipeline. This Training Step will create a SageMaker Training Job in the background, run the training script, and upload the output to S3. Check the TrainingStep SageMaker’s SDK documentation for more information.
+SageMaker manages the infrastructure of a Training Job. It provisions resources for the duration of the job, and cleans up when it completes. The Training Container image that SageMaker uses to run a Training Job can either be a SageMaker built-in image or a custom image.
+ +The Available Deep Learning Container Images page contains the list of available containers for each region.
Our training script uses Comet to track metrics from the Training Job. We need to create a requirements.txt file to install the Comet library in the training container.

requirements.txt

comet_ml
SageMaker uses the concept of an Estimator to handle end-to-end training and deployment tasks. For this example, we will use the built-in TensorFlow Estimator to run the training script we wrote before.
+Notice the list of hyperparameters defined below. SageMaker will pass these hyperparameters as arguments to the entry point of the training script.
+We are going to use Comet and SageMaker Experiments to track metrics from the Training Job. SageMaker Experiments will use the list of metric definitions to decide which metrics to track and how to parse them from the Training Job logs. For more information, check Manage Machine Learning with Amazon SageMaker Experiments and the SageMaker Experiments’ SDK documentation.
Here are the environment variables we need to set on the training container:

- COMET_API_KEY: This is your Comet API key.
- COMET_PROJECT_NAME: The name of the project where you want to track the experiments.
from sagemaker.tensorflow import TensorFlow

estimator = TensorFlow(
    base_job_name="training",
    entry_point="script.py",
    source_dir=f"{(CODE_FOLDER / 'training').as_posix()}",
    # SageMaker will pass these hyperparameters as arguments
    # to the entry point of the training script.
    hyperparameters={
        "epochs": 50,
        "batch_size": 32,
    },
    # SageMaker will create these environment variables on the
    # Training Job instance.
    environment={
        "COMET_API_KEY": COMET_API_KEY,
        "COMET_PROJECT_NAME": COMET_PROJECT_NAME,
    },
    # SageMaker will track these metrics as part of the experiment
    # associated with this pipeline. The metric definitions tell
    # SageMaker how to parse the values from the Training Job logs.
    metric_definitions=[
        {"Name": "loss", "Regex": "loss: ([0-9\\.]+)"},
        {"Name": "accuracy", "Regex": "accuracy: ([0-9\\.]+)"},
        {"Name": "val_loss", "Regex": "val_loss: ([0-9\\.]+)"},
        {"Name": "val_accuracy", "Regex": "val_accuracy: ([0-9\\.]+)"},
    ],
    image_uri=config["image"],
    framework_version=config["framework_version"],
    py_version=config["py_version"],
    instance_type=config["instance_type"],
    instance_count=1,
    disable_profiler=True,
    debugger_hook_config=False,
    sagemaker_session=config["session"],
    role=role,
)
We can now create the Training Step for the pipeline using the estimator we just configured.

This step will receive the train and validation splits from the preprocessing step as inputs.
Here, we are using three input channels: train, validation, and pipeline. SageMaker will automatically create an environment variable corresponding to each of these channels following the format SM_CHANNEL_[channel_name]:

- SM_CHANNEL_TRAIN: This environment variable will contain the path to the training data coming from the preprocessing step.
- SM_CHANNEL_VALIDATION: This environment variable will contain the path to the validation data coming from the preprocessing step.
- SM_CHANNEL_PIPELINE: This environment variable will contain the path to the transformation pipelines that we saved in the preprocessing step.
Notice that we are creating a function that we can later reuse to create a training step using a different estimator.
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


def create_training_step(estimator):
    """Create a SageMaker TrainingStep using the provided estimator."""
    return TrainingStep(
        name="train-model",
        step_args=estimator.fit(
            inputs={
                "train": TrainingInput(
                    s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                        "train"
                    ].S3Output.S3Uri,
                    content_type="text/csv",
                ),
                "validation": TrainingInput(
                    s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                        "validation"
                    ].S3Output.S3Uri,
                    content_type="text/csv",
                ),
                "pipeline": TrainingInput(
                    s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                        "model"
                    ].S3Output.S3Uri,
                    content_type="application/tar+gzip",
                ),
            },
        ),
        cache_config=cache_config,
    )


train_model_step = create_training_step(estimator)
Step 3 - Creating the Pipeline
+Let’s define the SageMaker Pipeline and submit its definition to the SageMaker Pipelines service to create the pipeline if it doesn’t exist or update it if it does.
session4_pipeline = Pipeline(
    name="session4-pipeline",
    parameters=[dataset_location],
    steps=[
        preprocessing_step,
        train_model_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

session4_pipeline.upsert(role_arn=role)
Session 5 - Custom Training Container
+This session creates a custom Docker image to train the model and have full control of the environment where the training script runs.
+For this example, we’ll run the training script using Keras 3 with a JAX backend. Check Adapting your own Docker container to work with SageMaker for more information about using your own Docker containers.
+Step 1 - Preparing the Docker Image
+The first step is to copy the training script to a folder where we’ll prepare the Docker image. We are going to reuse the training script we created before, since it’s compatible with the latest version of Keras.
import shutil

(CODE_FOLDER / "containers" / "training").mkdir(parents=True, exist_ok=True)

shutil.copy2(
    CODE_FOLDER / "training" / "script.py",
    CODE_FOLDER / "containers" / "training" / "train.py",
)
PosixPath('code/containers/training/train.py')
Since we are creating a new Docker image, we need to install the libraries the training script requires inside the container. We’ll use a requirements.txt file to install these libraries; a sketch of what it could contain is shown below. Notice that we are installing jax to run it as our backend.

The sagemaker-training library contains the common functionality necessary to create a container compatible with SageMaker and its Python SDK.
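As a sketch, the file could look something like this, assuming Keras 3 with the JAX backend plus the packages the training script imports; the exact package list and version pins are assumptions:

requirements.txt

sagemaker-training
keras>=3.0
jax[cpu]
pandas
scikit-learn
packaging
comet_ml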
We can now create the Dockerfile containing the instructions to build the training image. Notice how this image will automatically run the train.py script when it starts.

To use JAX as the backend for Keras, we need to set the KERAS_BACKEND environment variable to jax.
Dockerfile
FROM python:3.10-slim
+
+RUN apt-get -y update && apt-get install -y --no-install-recommends \
+ python3 \
+ build-essential \
+ libssl-dev
+
+# Let's install the required Python packages from
+# the requirements.txt file.
+COPY requirements.txt .
+RUN pip install --user --upgrade pip
+RUN pip3 install -r requirements.txt
+
+# We are going to be running the training script
+# as the entrypoint of this container.
+COPY train.py /opt/ml/code/train.py
+ENV SAGEMAKER_PROGRAM train.py
+
+# We want to use JAX as the backend for Keras.
+ENV KERAS_BACKEND=jax
Step 2 - Building the Docker Image
We can now build the Docker image using the docker build command. We are going to define the name of this image using the IMAGE_NAME variable.
= "keras-custom-training-container"
+ IMAGE_NAME
+if not LOCAL_MODE:
+# If we aren't running the code in Local Mode, we need
+ # to specify we want to build the Docker image for the
+ # linux/amd64 architecture before uploading it to ECR.
+ print("Building Docker image for linux/amd64 architecture...")
+
+!docker build --platform="linux/amd64" -t $IMAGE_NAME \
+ /containers/training/
+ $CODE_FOLDERelse:
+# If we are running in Local Mode, we can use the
+ # default Docker build command.
+ print("Building Docker image for arm64 architecture...")
+
+!docker build -t $IMAGE_NAME \
+ /containers/training/ $CODE_FOLDER
Step 3 - Pushing Docker Image to ECR
+We can now push the Docker image to Amazon Elastic Container Registry (ECR). This is a fully-managed Docker container registry where we can manage Docker container images. This step is necessary to make the image available to SageMaker when running the pipeline.
algorithm_name=$2
account=$(aws sts get-caller-identity --query Account --output text)

# Get the region defined in the current configuration
# (default to us-east-1 if none defined)
region=$(aws configure get region)
region=${region:-us-east-1}

repository="${account}.dkr.ecr.${region}.amazonaws.com/${algorithm_name}:latest"

# We only want to push the Docker image to ECR if
# we are not running in Local Mode.
if [ $1 = "False" ]
then
    # Create the repository if it doesn't exist in ECR
    aws ecr describe-repositories \
        --repository-names "${algorithm_name}" > /dev/null 2>&1
    if [ $? -ne 0 ]
    then
        aws ecr create-repository \
            --repository-name "${algorithm_name}" > /dev/null
    fi

    # Get the login command from ECR to run the
    # Docker push command.
    aws ecr get-login-password \
        --region ${region} | docker login \
        --username AWS --password-stdin ${repository}

    # Push the Docker image to the ECR repository
    docker tag ${algorithm_name} ${repository}
    docker push ${repository}
fi
Step 4 - Setting up the Training Step
+Let’s now compute the name of the training image we’ll use to run the Training Job.
If we are running in LOCAL_MODE, we’ll use the name of the image we built before (IMAGE_NAME). Otherwise, we’ll use the name of the image we pushed to ECR.
= boto3.client("sts").get_caller_identity().get("Account")
+ account_id = ":latest"
+ tag
+= "amazonaws.com"
+ uri_suffix if region in ["cn-north-1", "cn-northwest-1"]:
+= "amazonaws.com.cn"
+ uri_suffix
+= (
+ training_container_image
+ IMAGE_NAMEif LOCAL_MODE
+ else (f"{account_id}.dkr.ecr.{region}.amazonaws.com/{IMAGE_NAME}:latest")
+
+ )
+ training_container_image
'keras-custom-training-container'
+We can now create an Estimator and a Training Step using the function we created before.
from sagemaker.estimator import Estimator

keras_estimator = Estimator(
    image_uri=training_container_image,
    instance_count=1,
    instance_type=config["instance_type"],
    sagemaker_session=config["session"],
    role=role,
)

keras_train_model_step = create_training_step(keras_estimator)
Step 5 - Creating the Pipeline
+Let’s define the SageMaker Pipeline and submit its definition to the SageMaker Pipelines service to create the pipeline if it doesn’t exist or update it if it does.
session5_pipeline = Pipeline(
    name="session5-pipeline",
    parameters=[dataset_location],
    steps=[
        preprocessing_step,
        # This time we want to use the new training step
        # we created using the custom Docker image.
        keras_train_model_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

session5_pipeline.upsert(role_arn=role)
Session 6 - Tuning the Model
+This session extends the SageMaker Pipeline with a step to tune the model using a Hyperparameter Tuning Job.
Step 1 - Enabling Tuning

Since we could use either the Training Step or the Tuning Step to create a model, we’ll define a constant to indicate which approach we want to run. Notice that the Tuning Step is not supported in Local Mode.

USE_TUNING_STEP = False
Step 2 - Setting up a Tuning Step
+Let’s now create a Tuning Step. This Tuning Step will create a SageMaker Hyperparameter Tuning Job in the background and use the training script to train different model variants and choose the best one. Check the TuningStep SageMaker’s SDK documentation for more information.
+The Tuning Step requires a HyperparameterTuner reference to configure the Hyperparameter Tuning Job.
+Here is the configuration that we’ll use to find the best model:
- objective_metric_name: This is the name of the metric the tuner will use to determine the best model.
- objective_type: This is the objective of the tuner. It specifies whether it should minimize the metric or maximize it. In this example, since we are using the validation accuracy of the model, we want the objective to be “Maximize.” If we were using the loss of the model, we would set the objective to “Minimize.”
- metric_definitions: Defines how the tuner will determine the metric’s value by looking at the output logs of the training process.

The tuner expects the list of the hyperparameters you want to explore. You can use subclasses of the Parameter class to specify different types of hyperparameters, as sketched below. This example explores different values for the epochs hyperparameter.
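As a sketch of the other parameter types you could use, the ranges below are illustrative assumptions and are not part of this pipeline:

from sagemaker.parameter import (
    CategoricalParameter,
    ContinuousParameter,
    IntegerParameter,
)

# Hypothetical ranges showing the three parameter types;
# this pipeline only tunes `epochs`.
example_ranges = {
    "epochs": IntegerParameter(10, 50),
    "learning_rate": ContinuousParameter(0.001, 0.1),
    "batch_size": CategoricalParameter([16, 32, 64]),
}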
Finally, you can control the number of jobs and how many of them will run in parallel using the following two arguments:
- max_jobs: Defines the maximum total number of training jobs to start for the hyperparameter tuning job.
- max_parallel_jobs: Defines the maximum number of parallel training jobs to start.
from sagemaker.parameter import IntegerParameter
from sagemaker.tuner import HyperparameterTuner

tuner = HyperparameterTuner(
    estimator,
    objective_metric_name="val_accuracy",
    objective_type="Maximize",
    hyperparameter_ranges={
        "epochs": IntegerParameter(10, 50),
    },
    metric_definitions=[{"Name": "val_accuracy", "Regex": "val_accuracy: ([0-9\\.]+)"}],
    max_jobs=3,
    max_parallel_jobs=3,
)
We can now create the Tuning Step using the tuner we configured before. SageMaker will create a Hyperparameter Tuning Job in the background and use the training script to train different model variants and choose the best one.
from sagemaker.workflow.steps import TuningStep

tune_model_step = TuningStep(
    name="tune-model",
    step_args=tuner.fit(
        inputs={
            "train": TrainingInput(
                s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "pipeline": TrainingInput(
                s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                    "model"
                ].S3Output.S3Uri,
                content_type="application/tar+gzip",
            ),
        },
    ),
    cache_config=cache_config,
)
Step 3 - Creating the Pipeline
+Let’s define the SageMaker Pipeline and submit its definition to the SageMaker Pipelines service to create the pipeline if it doesn’t exist or update it if it does.
session6_pipeline = Pipeline(
    name="session6-pipeline",
    parameters=[dataset_location],
    steps=[
        preprocessing_step,
        tune_model_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

session6_pipeline.upsert(role_arn=role)
Session 7 - Evaluating the Model
+This session extends the SageMaker Pipeline with a step to evaluate the model using the holdout set we created during the preprocessing step.
Step 1 - Creating the Evaluation Script
+We’ll use a Processing Step to execute the evaluation script.
+This script is responsible for loading the model we created and evaluating it on the test set. Before finishing, this script will generate an evaluation report of the model.
We will store the script in a folder called evaluation and add it to the system path so we can later import it as a module.

(CODE_FOLDER / "evaluation").mkdir(parents=True, exist_ok=True)
sys.path.extend([f"./{CODE_FOLDER}/evaluation"])
We can now create the script inside the folder:
script.py
import json
+import tarfile
+from pathlib import Path
+
+import numpy as np
+import pandas as pd
+from sklearn.metrics import accuracy_score
+from tensorflow import keras
+
+
+def evaluate(model_path, test_path, output_path):
+ X_test = pd.read_csv(Path(test_path) / "test.csv")
+ y_test = X_test[X_test.columns[-1]]
+ X_test = X_test.drop(X_test.columns[-1], axis=1)
+
+ # Let's now extract the model package so we can load
+ # it in memory.
+ with tarfile.open(Path(model_path) / "model.tar.gz") as tar:
+ tar.extractall(path=Path(model_path))
+
+ model = keras.models.load_model(Path(model_path) / "001")
+
+ predictions = np.argmax(model.predict(X_test), axis=-1)
+ accuracy = accuracy_score(y_test, predictions)
+ print(f"Test accuracy: {accuracy}")
+
+ # Let's create an evaluation report using the model accuracy.
+ evaluation_report = {
+ "metrics": {
+ "accuracy": {"value": accuracy},
+ },
+ }
+
+ Path(output_path).mkdir(parents=True, exist_ok=True)
+ with open(Path(output_path) / "evaluation.json", "w") as f:
+ f.write(json.dumps(evaluation_report))
+
+
+if __name__ == "__main__":
+ evaluate(
+ model_path="/opt/ml/processing/model/",
+ test_path="/opt/ml/processing/test/",
+ output_path="/opt/ml/processing/evaluation/",
+ )
Let’s test the script to ensure everything is working as expected:
import json
import os
import shutil
import tarfile
import pytest
import tempfile

from processing.script import preprocess
from training.script import train
from evaluation.script import evaluate


@pytest.fixture(scope="function", autouse=False)
def directory():
    directory = tempfile.mkdtemp()
    input_directory = Path(directory) / "input"
    input_directory.mkdir(parents=True, exist_ok=True)
    shutil.copy2(DATA_FILEPATH, input_directory / "data.csv")

    directory = Path(directory)

    preprocess(base_directory=directory)

    train(
        model_directory=directory / "model",
        train_path=directory / "train",
        validation_path=directory / "validation",
        pipeline_path=directory / "model",
        experiment=None,
        epochs=1,
    )

    # After training a model, we need to prepare a package just like
    # SageMaker would. This package is what the evaluation script is
    # expecting as an input.
    with tarfile.open(directory / "model.tar.gz", "w:gz") as tar:
        tar.add(directory / "model" / "001", arcname="001")

    evaluate(
        model_path=directory,
        test_path=directory / "test",
        output_path=directory / "evaluation",
    )

    yield directory / "evaluation"

    shutil.rmtree(directory)


def test_evaluate_generates_evaluation_report(directory):
    output = os.listdir(directory)
    assert "evaluation.json" in output


def test_evaluation_report_contains_accuracy(directory):
    with open(directory / "evaluation.json", "r") as file:
        report = json.load(file)

    assert "metrics" in report
    assert "accuracy" in report["metrics"]
Step 2 - Referencing the Model Assets
One of the inputs to the evaluation step is the model coming from the Training or the Tuning Step. We can use the USE_TUNING_STEP flag to determine whether we created the model using a Training Step or a Tuning Step.
In case we are using the Tuning Step, we can use the TuningStep.get_top_model_s3_uri() function to get the model assets from the top performing training job of the Hyperparameter Tuning Job.
model_assets = train_model_step.properties.ModelArtifacts.S3ModelArtifacts

if USE_TUNING_STEP:
    model_assets = tune_model_step.get_top_model_s3_uri(
        top_k=0,
        s3_bucket=config["session"].default_bucket(),
    )
Step 3 - Mapping the Output to a Property File
+SageMaker supports mapping outputs from a Processing Step to property files. This is useful when we want to access a specific property from the pipeline.
+We’ll map the evaluation report to a property file. Check How to Build and Manage Property Files for more information.
from sagemaker.workflow.properties import PropertyFile

evaluation_report = PropertyFile(
    name="evaluation-report",
    output_name="evaluation",
    path="evaluation.json",
)
Step 4 - Setting up the Evaluation Step
+To run the evaluation script, we will use a Processing Step configured with a TensorFlowProcessor because the script needs access to TensorFlow.
from sagemaker.tensorflow import TensorFlowProcessor

evaluation_processor = TensorFlowProcessor(
    base_job_name="evaluation-processor",
    image_uri=config["image"],
    framework_version=config["framework_version"],
    py_version=config["py_version"],
    instance_type=config["instance_type"],
    instance_count=1,
    role=role,
    sagemaker_session=config["session"],
)
We are now ready to define the Processing Step that will run the evaluation script:
evaluate_model_step = ProcessingStep(
    name="evaluate-model",
    step_args=evaluation_processor.run(
        code=f"{(CODE_FOLDER / 'evaluation' / 'script.py').as_posix()}",
        inputs=[
            # The first input is the test split that we generated on
            # the first step of the pipeline when we split and
            # transformed the data.
            ProcessingInput(
                source=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                    "test"
                ].S3Output.S3Uri,
                destination="/opt/ml/processing/test",
            ),
            # The second input is the model that we generated on
            # the Training or Tuning Step.
            ProcessingInput(
                source=model_assets,
                destination="/opt/ml/processing/model",
            ),
        ],
        outputs=[
            # The output is the evaluation report that we generated
            # in the evaluation script.
            ProcessingOutput(
                output_name="evaluation",
                source="/opt/ml/processing/evaluation",
            ),
        ],
    ),
    property_files=[evaluation_report],
    cache_config=cache_config,
)
Step 5 - Creating the Pipeline
+Let’s define the SageMaker Pipeline and submit its definition to the SageMaker Pipelines service to create the pipeline if it doesn’t exist or update it if it does.
session7_pipeline = Pipeline(
    name="session7-pipeline",
    parameters=[dataset_location],
    steps=[
        preprocessing_step,
        tune_model_step if USE_TUNING_STEP else train_model_step,
        evaluate_model_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

session7_pipeline.upsert(role_arn=role)
Session 8 - Registering the Model
+This session extends the SageMaker Pipeline with a step to register the model in the SageMaker Model Registry.
+ +Step 1 - Configuring the Model Package Group
+First, let’s define the name of the group where we’ll register the model. The Model Registry uses groups to organize the versions of a model:
+= "basic-penguins" BASIC_MODEL_PACKAGE_GROUP
Step 2 - Creating the Model
+Let’s now create the model that we’ll register in the Model Registry. The model we trained uses TensorFlow, so we can use the built-in TensorFlowModel class to create an instance of the model:
from sagemaker.tensorflow.model import TensorFlowModel

tensorflow_model = TensorFlowModel(
    model_data=model_assets,
    framework_version=config["framework_version"],
    sagemaker_session=config["session"],
    role=role,
)
Step 3 - Configuring Model Metrics
+When we register a model in the Model Registry, we can attach relevant metadata to it. We’ll use the evaluation report we generated during the evaluation step to populate the metrics of this model:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.functions import Join

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=Join(
            on="/",
            values=[
                evaluate_model_step.properties.ProcessingOutputConfig.Outputs[
                    "evaluation"
                ].S3Output.S3Uri,
                "evaluation.json",
            ],
        ),
        content_type="application/json",
    ),
)
Step 4 - Registering the Model
+We can use a Model Step to register the model. Check the ModelStep SageMaker’s SDK documentation for more information.
+from sagemaker.workflow.model_step import ModelStep
+
+
def create_registration_step(
    model,
    model_package_group_name,
    approval_status="Approved",
    content_types=["text/csv"],
    response_types=["application/json"],
    model_metrics=None,
    drift_check_baselines=None,
):
    """Create a Registration Step using the supplied parameters."""
    return ModelStep(
        name="register",
        step_args=model.register(
            model_package_group_name=model_package_group_name,
            approval_status=approval_status,
            model_metrics=model_metrics,
            drift_check_baselines=drift_check_baselines,
            content_types=content_types,
            response_types=response_types,
            inference_instances=[config["instance_type"]],
            transform_instances=[config["instance_type"]],
            framework_version=config["framework_version"],
            domain="MACHINE_LEARNING",
            task="CLASSIFICATION",
            framework="TENSORFLOW",
        ),
    )


register_model_step = create_registration_step(
    tensorflow_model,
    BASIC_MODEL_PACKAGE_GROUP,
    model_metrics=model_metrics,
)
Step 5 - Creating the Pipeline
+Let’s define the SageMaker Pipeline and submit its definition to the SageMaker Pipelines service to create the pipeline if it doesn’t exist or update it if it does.
session8_pipeline = Pipeline(
    name="session8-pipeline",
    parameters=[dataset_location],
    steps=[
        preprocessing_step,
        tune_model_step if USE_TUNING_STEP else train_model_step,
        evaluate_model_step,
        register_model_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

session8_pipeline.upsert(role_arn=role)
Session 9 - Conditional Registration
+This session extends the SageMaker Pipeline with a condition to register the model only if its accuracy is above a predefined threshold.
+Here’s a high-level overview of the Condition Step:
Step 1 - Configuring the Accuracy Threshold
+Let’s define a new Pipeline Parameter to specify the minimum accuracy that the model should reach for it to be registered.
+from sagemaker.workflow.parameters import ParameterFloat
+
accuracy_threshold = ParameterFloat(name="accuracy_threshold", default_value=0.70)
Step 2 - Setting up a Fail Step
+If the model’s accuracy is not greater than or equal to our threshold, we will send the pipeline to a Fail Step with the appropriate error message. Check the FailStep SageMaker’s SDK documentation for more information.
+from sagemaker.workflow.fail_step import FailStep
+
fail_step = FailStep(
    name="fail",
    error_message=Join(
        on=" ",
        values=[
            "Execution failed because the model's accuracy was lower than",
            accuracy_threshold,
        ],
    ),
)
Step 3 - Defining the Condition
+We can use a ConditionGreaterThanOrEqualTo condition to compare the model’s accuracy with the threshold. Look at the Conditions section in the documentation for more information about the types of supported conditions.
+from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
+from sagemaker.workflow.functions import JsonGet
+
condition = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=evaluate_model_step.name,
        property_file=evaluation_report,
        json_path="metrics.accuracy.value",
    ),
    right=accuracy_threshold,
)
Step 4 - Setting up the Condition Step
+Let’s now use a Condition Step together with the evaluation report we generated to determine whether the model’s accuracy is above the threshold:
+from sagemaker.workflow.condition_step import ConditionStep
+
condition_step = ConditionStep(
    name="check-model-accuracy",
    conditions=[condition],
    if_steps=[register_model_step],
    else_steps=[fail_step],
)
Step 5 - Creating the Pipeline
+We can now define the SageMaker Pipeline and submit its definition to the SageMaker Pipelines service to create the pipeline if it doesn’t exist or update it if it does.
session9_pipeline = Pipeline(
    name="session9-pipeline",
    parameters=[dataset_location, accuracy_threshold],
    steps=[
        preprocessing_step,
        tune_model_step if USE_TUNING_STEP else train_model_step,
        evaluate_model_step,
        condition_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

session9_pipeline.upsert(role_arn=role)
Session 10 - Serving the Model
+This session builds a simple Flask application to serve the model.
+Keep in mind that, while good for development and testing, this is not the best approach for production systems.
+Step 1 - Retrieving List of Approved Models
+We want to serve the latest approved model from the Model Registry. We can use the boto3 API to get this model:
response = sagemaker_client.list_model_packages(
    ModelPackageGroupName=BASIC_MODEL_PACKAGE_GROUP,
    ModelApprovalStatus="Approved",
    SortBy="CreationTime",
    MaxResults=1,
)

package = (
    response["ModelPackageSummaryList"][0]
    if response["ModelPackageSummaryList"]
    else None
)

package
{'ModelPackageGroupName': 'basic-penguins',
+ 'ModelPackageVersion': 6,
+ 'ModelPackageArn': 'arn:aws:sagemaker:us-east-1:325223348818:model-package/basic-penguins/6',
+ 'CreationTime': datetime.datetime(2024, 3, 29, 11, 19, 48, 782000, tzinfo=tzlocal()),
+ 'ModelPackageStatus': 'Completed',
+ 'ModelApprovalStatus': 'Approved'}
+Step 2 - Downloading the Model
+Let’s now download the model assets from the location specified in the Model Registry to your local environment.
+We will store this model in a folder called serving:
(CODE_FOLDER / "serving").mkdir(parents=True, exist_ok=True)
Let’s now download the model assets into the folder:
+from sagemaker.s3 import S3Downloader
+
+if package:
    response = sagemaker_client.describe_model_package(
        ModelPackageName=package["ModelPackageArn"],
    )

    model_data = response["InferenceSpecification"]["Containers"][0]["ModelDataUrl"]
    S3Downloader.download(model_data, (CODE_FOLDER / "serving").as_posix())
Step 3 - Creating the Serving Script
+Let’s now write a simple Flask script to serve the model.
+When this application receives the first request, it will unpack and load the model into memory. From there, it will use the model to make predictions on the incoming requests.
+app.py+
import tarfile
+import tempfile
+import numpy as np
+
+from flask import Flask, request, jsonify
+from pathlib import Path
+from tensorflow import keras
+
+
+MODEL_PATH = Path(__file__).parent
+
+
+class Model:
+ model = None
+
+ def load(self):
+ """
+ Extracts the model package and loads the model in memory
+ if it hasn't been loaded yet.
+ """
+ # We want to load the model only if it is not loaded yet.
+ if not Model.model:
+ # Before we load the model, we need to extract it in
+ # a temporal directory.
+
+ with tempfile.TemporaryDirectory() as directory:
+ with tarfile.open(MODEL_PATH / "model.tar.gz") as tar:
+ tar.extractall(path=directory)
+
+ Model.model = keras.models.load_model(Path(directory) / "001")
+
+ def predict(self, data):
+ """
+ Generates predictions for the supplied data.
+ """
+ self.load()
+ return Model.model.predict(data)
+
+
+app = Flask(__name__)
+model = Model()
+
+
+@app.route("/predict/", methods=["POST"])
+def predict():
+ data = request.data.decode("utf-8")
+
+ data = np.array(data.split(",")).astype(np.float32)
+ data = np.expand_dims(data, axis=0)
+
+ predictions = model.predict(data=[data])
+
+ prediction = int(np.argmax(predictions[0], axis=-1))
+ confidence = float(predictions[0][prediction])
+
+ return jsonify({"prediction": prediction, "confidence": confidence})
Step 4 - Running the Flask Application
+We can now run the Flask application to serve the model from a terminal using the following command:
+$ flask --app program/code/serving/app.py --debug run --host=0.0.0.0 --port=4242
After the server is running, you can send a POST request to the server to get a prediction. Here is an example using the curl command:
$ curl --location --request POST 'http://localhost:4242/predict' \
+--header 'Content-Type: text/plain' \
+ --data-raw '0.6569590202313976, -1.0813829646495108, 1.2097102831892812, 0.9226343641317372, 1.0, 0.0, 0.0'
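If you prefer to test the server from Python instead of curl, here is a minimal sketch using the requests library. It assumes the Flask application from the previous step is running locally on port 4242 and sends the same transformed feature values.
import requests

# The same transformed penguin features we sent with curl.
payload = (
    "0.6569590202313976, -1.0813829646495108, 1.2097102831892812, "
    "0.9226343641317372, 1.0, 0.0, 0.0"
)

# POST the payload as plain text, just like the curl example.
response = requests.post(
    "http://localhost:4242/predict/",
    data=payload,
    headers={"Content-Type": "text/plain"},
    timeout=5,
)

# The application responds with a JSON object containing the prediction
# and its confidence.
print(response.json())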
Session 11 - Deploying the Model
+This session deploys the model from the Model Registry to an endpoint. We’ll do it manually, using the SageMaker SDK. Check Deploy to a SageMaker Endpoint for more information about deploying a model to an endpoint.
Step 1 - Configuring the Endpoint Name
+Let’s start by defining the name of the endpoint where we’ll deploy the model:
+from sagemaker.predictor import Predictor
+
ENDPOINT = "penguins-endpoint"
Step 2 - Creating a Model Package
+To deploy a model using the SageMaker’s Python SDK, we need to create a Model Package using the ARN of the model from the Model Registry. Remember we got the ARN of the latest approved model in the previous section.
+from sagemaker import ModelPackage
+
+if package:
    model_package = ModelPackage(
        model_package_arn=package["ModelPackageArn"],
        sagemaker_session=sagemaker_session,
        role=role,
    )

    print(package["ModelPackageArn"])
arn:aws:sagemaker:us-east-1:325223348818:model-package/basic-penguins/6
+Step 3 - Deploying the Model
+Let’s now deploy the model to an endpoint.
model_package.deploy(
    endpoint_name=ENDPOINT,
    initial_instance_count=1,
    instance_type=config["instance_type"],
)
Step 4 - Testing the Endpoint
+After deploying the model, we can test the endpoint to make sure it works.
+Each line of the payload we’ll send to the endpoint contains the information of a penguin. Notice the model expects data that’s already transformed. We can’t provide the original data from our dataset because the model we registered will not work with it.
+The endpoint will return the predictions for each of these lines.
payload = """
0.6569590202313976,-1.0813829646495108,1.2097102831892812,0.9226343641317372,1.0,0.0,0.0
-0.7751048801481084,0.8822689351285553,-1.2168066120762704,0.9226343641317372,0.0,1.0,0.0
-0.837387834894918,0.3386660813829646,-0.26237731892812,-1.92351941317372,0.0,0.0,1.0
"""
Let’s send the payload to the endpoint and print its response:
predictor = Predictor(endpoint_name=ENDPOINT)

try:
    response = predictor.predict(payload, initial_args={"ContentType": "text/csv"})
    response = json.loads(response.decode("utf-8"))

    print(json.dumps(response, indent=2))
    print(f"\nSpecies: {np.argmax(response['predictions'], axis=1)}")
except Exception as e:
    print(e)
An error occurred (ValidationError) when calling the InvokeEndpoint operation: Endpoint penguins-endpoint of account 325223348818 not found.
+Session 12 - Deploying From the Pipeline
+This session extends the SageMaker Pipeline with a step to automatically deploy the model to an endpoint.
+We’ll use a Lambda Step to create an endpoint and deploy the model.
+Here’s a high-level overview of the Deploy Step:
Step 1 - Configuring Data Capture Settings
+We want to enable Data Capture as part of the endpoint configuration. With Data Capture we can record the inputs and outputs of the endpoint to use them later for monitoring the model. We need two configuration settings to enable Data Capture:
- DATA_CAPTURE_PERCENTAGE: Represents the percentage of traffic that we want to capture.
- DATA_CAPTURE_DESTINATION: Specifies the S3 location where we want to store the captured data.
DATA_CAPTURE_PERCENTAGE = 100
DATA_CAPTURE_DESTINATION = f"{S3_LOCATION}/monitoring/data-capture"
Step 2 - Setting up the Lambda Function
+Let’s start by writing a Lambda function that takes the model information and deploys it to an endpoint.
+There are three components that make up a SageMaker endpoint: the model, the endpoint configuration, and the endpoint itself. The Lambda function below creates each one of them.
+We’ll store the code of the function in a folder called lambda:
(CODE_FOLDER / "lambda").mkdir(parents=True, exist_ok=True)
Let’s now write the code of the function:
+lambda.py+
import os
+import json
+import boto3
+import time
+
+sagemaker = boto3.client("sagemaker")
+
+
+def lambda_handler(event, context):
+ # If we are calling this function from EventBridge,
+ # we need to extract the model package ARN and the
+ # approval status from the event details. If we are
+ # calling this function from the pipeline, we can
+ # assume the model is approved and we can get the
+ # model package ARN as a direct parameter.
+ if "detail" in event:
+ model_package_arn = event["detail"]["ModelPackageArn"]
+ approval_status = event["detail"]["ModelApprovalStatus"]
+ else:
+ model_package_arn = event["model_package_arn"]
+ approval_status = "Approved"
+
+ print(f"Model: {model_package_arn}")
+ print(f"Approval status: {approval_status}")
+
+ if approval_status != "Approved":
+ response = {
+ "message": "Skipping deployment.",
+ "approval_status": approval_status,
+ }
+
+ print(response)
+ return {"statusCode": 200, "body": json.dumps(response)}
+
+ endpoint_name = os.environ["ENDPOINT"]
+ data_capture_percentage = int(os.environ["DATA_CAPTURE_PERCENTAGE"])
+ data_capture_destination = os.environ["DATA_CAPTURE_DESTINATION"]
+ role = os.environ["ROLE"]
+
+ timestamp = time.strftime("%m%d%H%M%S", time.localtime())
+ model_name = f"{endpoint_name}-model-{timestamp}"
+ endpoint_config_name = f"{endpoint_name}-config-{timestamp}"
+
+ sagemaker.create_model(
+ ModelName=model_name,
+ ExecutionRoleArn=role,
+ Containers=[{"ModelPackageName": model_package_arn}],
+ )
+
+ sagemaker.create_endpoint_config(
+ EndpointConfigName=endpoint_config_name,
+ ProductionVariants=[
+ {
+ "ModelName": model_name,
+ "InstanceType": "ml.m5.xlarge",
+ "InitialVariantWeight": 1,
+ "InitialInstanceCount": 1,
+ "VariantName": "AllTraffic",
+ }
+ ],
+ # We can enable Data Capture to record the inputs and outputs
+ # of the endpoint to use them later for monitoring the model.
+ DataCaptureConfig={
+ "EnableCapture": True,
+ "InitialSamplingPercentage": data_capture_percentage,
+ "DestinationS3Uri": data_capture_destination,
+ "CaptureOptions": [
+ {"CaptureMode": "Input"},
+ {"CaptureMode": "Output"},
+ ],
+ "CaptureContentTypeHeader": {
+ "CsvContentTypes": ["text/csv", "application/octet-stream"],
+ "JsonContentTypes": ["application/json", "application/octet-stream"],
+ },
+ },
+ )
+
+ response = sagemaker.list_endpoints(NameContains=endpoint_name, MaxResults=1)
+
+ if len(response["Endpoints"]) == 0:
+ # If the endpoint doesn't exist, let's create it.
+ sagemaker.create_endpoint(
+ EndpointName=endpoint_name,
+ EndpointConfigName=endpoint_config_name,
+ )
+ else:
+ # If the endpoint already exists, let's update it with the
+ # new configuration.
+ sagemaker.update_endpoint(
+ EndpointName=endpoint_name,
+ EndpointConfigName=endpoint_config_name,
+ )
+
+ return {"statusCode": 200, "body": json.dumps("Endpoint deployed successfully")}
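To make the two code paths in lambda_handler easier to follow, here are illustrative examples of the event payloads it handles; the ARNs are made up. EventBridge sends the first shape when a model package changes state, while the pipeline’s Lambda Step sends the second shape.
# Shape 1: event delivered by EventBridge when a model package changes state.
# The handler reads the ARN and the approval status from the "detail" field.
eventbridge_event = {
    "detail": {
        "ModelPackageArn": "arn:aws:sagemaker:us-east-1:111111111111:model-package/pending-penguins/1",
        "ModelApprovalStatus": "Approved",
    }
}

# Shape 2: event sent by the pipeline's Lambda Step. The handler assumes the
# model is already approved and reads the ARN directly from the payload.
pipeline_event = {
    "model_package_arn": "arn:aws:sagemaker:us-east-1:111111111111:model-package/basic-penguins/1",
}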
Step 3 - Setting up Lambda Permissions
+We need to ensure our Lambda function has permission to interact with SageMaker, so let’s create a new role with the appropriate permissions.
lambda_role_name = "lambda-deployment-role"
lambda_role_arn = None

try:
    response = iam_client.create_role(
        RoleName=lambda_role_name,
        AssumeRolePolicyDocument=json.dumps(
            {
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Principal": {
                            "Service": ["lambda.amazonaws.com", "events.amazonaws.com"],
                        },
                        "Action": "sts:AssumeRole",
                    },
                ],
            },
        ),
        Description="Lambda Endpoint Deployment",
    )

    lambda_role_arn = response["Role"]["Arn"]

    iam_client.attach_role_policy(
        PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
        RoleName=lambda_role_name,
    )

    iam_client.attach_role_policy(
        PolicyArn="arn:aws:iam::aws:policy/AmazonSageMakerFullAccess",
        RoleName=lambda_role_name,
    )

    print(f'Role "{lambda_role_name}" created with ARN "{lambda_role_arn}".')
except iam_client.exceptions.EntityAlreadyExistsException:
    response = iam_client.get_role(RoleName=lambda_role_name)
    lambda_role_arn = response["Role"]["Arn"]
    print(f'Role "{lambda_role_name}" already exists with ARN "{lambda_role_arn}".')
Step 4 - Creating the Lambda Function
+Let’s now create the Lambda function in AWS. We’ll pass the configuration settings we defined before as environment variables to the Lambda function.
+from sagemaker.lambda_helper import Lambda
+
deploy_lambda_fn = Lambda(
    function_name="deployment_fn",
    execution_role_arn=lambda_role_arn,
    script=(CODE_FOLDER / "lambda" / "lambda.py").as_posix(),
    handler="lambda.lambda_handler",
    timeout=600,
    session=sagemaker_session,
    runtime="python3.11",
    environment={
        "Variables": {
            "ENDPOINT": ENDPOINT,
            "DATA_CAPTURE_DESTINATION": DATA_CAPTURE_DESTINATION,
            "DATA_CAPTURE_PERCENTAGE": str(DATA_CAPTURE_PERCENTAGE),
            "ROLE": role,
        },
    },
)

deploy_lambda_fn_response = deploy_lambda_fn.upsert()
deploy_lambda_fn_response
Step 5 - Setting up the Lambda Step
+We can now define the Lambda Step that will run the function to deploy the model. We’ll do this in a function that we can reuse later.
+This step will send the model package ARN we want to deploy to the Lambda function as an input parameter.
+from sagemaker.workflow.lambda_step import LambdaStep
+
+
+def create_deployment_step(register_model_step):
    """Create a Deploy Step using the supplied parameters."""
    return LambdaStep(
        name="deploy",
        lambda_func=deploy_lambda_fn,
        inputs={
            "model_package_arn": register_model_step.properties.ModelPackageArn,
        },
    )


deploy_step = create_deployment_step(register_model_step)
Step 6 - Modifying the Condition Step
+We need to modify the Condition Step to include the new deployment step. If the condition succeeds, we will register and deploy the model.
condition_step = ConditionStep(
    name="check-model-accuracy",
    conditions=[condition],
    if_steps=[register_model_step, deploy_step],
    else_steps=[fail_step],
)
Step 7 - Creating the Pipeline
+We can now define the SageMaker Pipeline and submit its definition to the SageMaker Pipelines service to create the pipeline if it doesn’t exist or update it if it does.
session12_pipeline = Pipeline(
    name="session12-pipeline",
    parameters=[dataset_location, accuracy_threshold],
    steps=[
        preprocessing_step,
        train_model_step,
        evaluate_model_step,
        condition_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

session12_pipeline.upsert(role_arn=role)
Step 8 - Testing the Endpoint
+Let’s test the endpoint to make sure it works.
+The wait_for_endpoint
function will wait until the endpoint is ready to receive requests.
def wait_for_endpoint():
    """Wait for the endpoint to come in service."""
    waiter = sagemaker_client.get_waiter("endpoint_in_service")
    waiter.wait(EndpointName=ENDPOINT, WaiterConfig={"Delay": 10, "MaxAttempts": 30})


payload = "0.6569590202313976,-1.0813829646495108,1.2097102831892812,0.9226343641317372,1.0,0.0,0.0"  # noqa: E501

try:
    wait_for_endpoint()

    predictor = Predictor(endpoint_name=ENDPOINT)

    response = predictor.predict(payload, initial_args={"ContentType": "text/csv"})
    response = json.loads(response.decode("utf-8"))

    print(json.dumps(response, indent=2))
except Exception as e:
    print(e)
Waiter EndpointInService failed: Waiter encountered a terminal failure state: Matched expected service error code: ValidationException
+Session 13 - Deploying From an Event
+This session modifies the SageMaker Pipeline to register the model with PendingManualApproval status and deploys it whenever its status changes to Approved.
We will use Amazon EventBridge to trigger a Lambda function that will deploy the model whenever its status changes from “PendingManualApproval” to “Approved.”
+Step 1 - Configuring the Model Package Group
+We need to define the name of a new group where we’ll register models with PendingManualApproval status.
PENDING_MODEL_PACKAGE_GROUP = "pending-penguins"
Step 2 - Setting Up EventBridge
+We can now create an EventBridge rule that triggers the deployment process whenever a model’s approval status becomes Approved. To do this, let’s define the event pattern that will trigger the deployment process. Check Model package state change for more information.
event_pattern = f"""
{{
    "source": ["aws.sagemaker"],
    "detail-type": ["SageMaker Model Package State Change"],
    "detail": {{
        "ModelPackageGroupName": ["{PENDING_MODEL_PACKAGE_GROUP}"],
        "ModelApprovalStatus": ["Approved"]
    }}
}}
"""
Let’s now create the EventBridge rule:
rule_name = "PendingModelApprovedRule"

events_client = boto3.client("events")
rule_response = events_client.put_rule(
    Name=rule_name,
    EventPattern=event_pattern,
    State="ENABLED",
    RoleArn=role,
)
Now, we need to define the target of the rule. The target will trigger whenever the rule matches an event. In this case, we want to trigger the Lambda function we created before:
response = events_client.put_targets(
    Rule=rule_name,
    Targets=[
        {
            "Id": "1",
            "Arn": deploy_lambda_fn_response["FunctionArn"],
        },
    ],
)
Step 3 - Configuring the Lambda Permissions
+Finally, we need to give the Lambda function permissions to be triggered by the EventBridge rule:
lambda_function_name = deploy_lambda_fn_response["FunctionName"]
lambda_client = boto3.client("lambda")

try:
    response = lambda_client.add_permission(
        Action="lambda:InvokeFunction",
        FunctionName=lambda_function_name,
        Principal="events.amazonaws.com",
        SourceArn=rule_response["RuleArn"],
        StatementId="EventBridge",
    )
except lambda_client.exceptions.ResourceConflictException:
    print(f'Function "{lambda_function_name}" already has the specified permission.')
Function "deployment_fn" already has the specified permission.
+Step 4 - Registering the Model
+We need to modify the Model Step to register the model using PendingManualApproval status.
register_model_step = create_registration_step(
    tensorflow_model,
    PENDING_MODEL_PACKAGE_GROUP,
    approval_status="PendingManualApproval",
    model_metrics=model_metrics,
)
Step 5 - Modifying the Condition Step
+Let’s modify the Condition Step to include the new registration step. If the condition succeeds, we will register the model with PendingManualApproval status.
condition_step = ConditionStep(
    name="check-model-accuracy",
    conditions=[condition],
    if_steps=[register_model_step],
    else_steps=[fail_step],
)
Step 6 - Creating the Pipeline
+Let’s define the SageMaker Pipeline and submit its definition to the SageMaker Pipelines service to create the pipeline if it doesn’t exist or update it if it does.
session13_pipeline = Pipeline(
    name="session13-pipeline",
    parameters=[dataset_location, accuracy_threshold],
    steps=[
        preprocessing_step,
        train_model_step,
        evaluate_model_step,
        condition_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

session13_pipeline.upsert(role_arn=role)
Session 14 - Building an Inference Pipeline
+This session creates an inference pipeline to control the data that goes in and comes out of the endpoint.
+Deploying the model we trained directly to an endpoint doesn’t let us control the data that goes in and comes out of the endpoint. The TensorFlow model we trained requires transformed data, which makes it useless to other applications:
+To fix this, we can create an Inference Pipeline using SageMaker to control the data that goes in and comes out of the endpoint.
+Our inference pipeline will have three components:
- A preprocessing component that will transform the input data into the format the model expects. +
- The TensorFlow model. +
- A postprocessing component that will transform the output of the model into a human-readable format. +
We want our endpoint to handle unprocessed data in CSV and JSON format and return the penguin’s species. Here is an example of the payload input we want the endpoint to support:
+{
+ "island": "Biscoe",
+ "culmen_length_mm": 48.6,
+ "culmen_depth_mm": 16.0,
+ "flipper_length_mm": 230.0,
+ "body_mass_g": 5800.0
+}
+And here is an example of the output we’d like to get from the endpoint:
+{
+ "prediction": "Adelie",
+ "confidence": 0.802672
+}
+Step 1 - Creating the Preprocessing Script
+The first component of our inference pipeline will transform the input data into the format the model expects.
+We’ll use the Scikit-Learn transformer we saved when we split and transformed the data. To deploy this component as part of an inference pipeline, we need to write a script that loads the transformer, uses it to modify the input data, and returns the output in the format the TensorFlow model expects.
+We’ll store the scripts of every component in a folder called pipeline and add it to the system path so we can later import it as a module.
(CODE_FOLDER / "pipeline").mkdir(parents=True, exist_ok=True)
sys.path.extend([f"./{CODE_FOLDER}/pipeline"])
Let’s now create the script for the preprocessing component:
+preprocessing_component.py+
import os
+import pandas as pd
+import json
+import joblib
+
+from io import StringIO
+
+try:
+ from sagemaker_containers.beta.framework import worker
+except ImportError:
+ # We don't have access to the `worker` package when testing locally.
+ # We'll set it to None so we can change the way functions create
+ # a response.
+ worker = None
+
+
+TARGET_COLUMN = "species"
+FEATURE_COLUMNS = [
+ "island",
+ "culmen_length_mm",
+ "culmen_depth_mm",
+ "flipper_length_mm",
+ "body_mass_g",
+ "sex",
+]
+
+
+def model_fn(model_dir):
+ """
+ Deserializes the model that will be used in this container.
+ """
+
+ return joblib.load(os.path.join(model_dir, "features.joblib"))
+
+
+def input_fn(input_data, content_type):
+ """
+ Parses the input payload and creates a Pandas DataFrame.
+
+ This function will check whether the target column is present in the
+ input data and will remove it.
+ """
+
+ if content_type == "text/csv":
+ df = pd.read_csv(StringIO(input_data), header=None, skipinitialspace=True)
+
+ # If we find an extra column, it's probably the target
+ # feature, so let's drop it. We'll assume the target
+ # is always the first column,
+ if len(df.columns) == len(FEATURE_COLUMNS) + 1:
+ df = df.drop(df.columns[0], axis=1)
+
+ df.columns = FEATURE_COLUMNS
+ return df
+
+ if content_type == "application/json":
+ df = pd.DataFrame([json.loads(input_data)])
+
+ if TARGET_COLUMN in df.columns:
+ df = df.drop(TARGET_COLUMN, axis=1)
+
+ return df
+
+ raise ValueError(f"{content_type} is not supported!")
+
+
+def predict_fn(input_data, model):
+ """
+ Preprocess the input using the transformer.
+ """
+
+ try:
+ return model.transform(input_data)
+ except ValueError as e:
+ print("Error transforming the input data", e)
+ return None
+
+
+def output_fn(prediction, accept):
+ """
+ Formats the prediction output to generate a response.
+
+ The default accept/content-type between containers for serial inference
+ is JSON. Since this model precedes a TensorFlow model, we want to
+ return a JSON object following TensorFlow's input requirements.
+ """
+
+ if prediction is None:
+ raise Exception("There was an error transforming the input data")
+
+ instances = [p for p in prediction.tolist()]
+ response = {"instances": instances}
+ return (
+ worker.Response(json.dumps(response), mimetype=accept)
+ if worker
+ else (response, accept)
+ )
Let’s test the script to ensure everything is working as expected:
+Code
+from pipeline.preprocessing_component import input_fn, predict_fn, output_fn, model_fn
+
+
+@pytest.fixture(scope="function", autouse=False)
+def directory():
    directory = tempfile.mkdtemp()
    input_directory = Path(directory) / "input"
    input_directory.mkdir(parents=True, exist_ok=True)
    shutil.copy2(DATA_FILEPATH, input_directory / "data.csv")

    directory = Path(directory)

    preprocess(base_directory=directory)

    with tarfile.open(directory / "model" / "model.tar.gz") as tar:
        tar.extractall(path=directory / "model")

    yield directory / "model"

    shutil.rmtree(directory)


def test_input_csv_drops_target_column_if_present():
    input_data = """
    Adelie, Torgersen, 39.1, 18.7, 181, 3750, MALE
    """

    df = input_fn(input_data, "text/csv")
    assert len(df.columns) == 6 and "species" not in df.columns


def test_input_json_drops_target_column_if_present():
    input_data = json.dumps({
        "species": "Adelie",
        "island": "Torgersen",
        "culmen_length_mm": 44.1,
        "culmen_depth_mm": 18.0,
        "flipper_length_mm": 210.0,
        "body_mass_g": 4000.0,
        "sex": "MALE"
    })

    df = input_fn(input_data, "application/json")
    assert len(df.columns) == 6 and "species" not in df.columns


def test_input_csv_works_without_target_column():
    input_data = """
    Torgersen, 39.1, 18.7, 181, 3750, MALE
    """

    df = input_fn(input_data, "text/csv")
    assert len(df.columns) == 6


def test_input_json_works_without_target_column():
    input_data = json.dumps({
        "island": "Torgersen",
        "culmen_length_mm": 44.1,
        "culmen_depth_mm": 18.0,
        "flipper_length_mm": 210.0,
        "body_mass_g": 4000.0,
        "sex": "MALE"
    })

    df = input_fn(input_data, "application/json")
    assert len(df.columns) == 6


def test_output_raises_exception_if_prediction_is_none():
    with pytest.raises(Exception):
        output_fn(None, "application/json")


def test_output_returns_tensorflow_ready_input():
    prediction = np.array([
        [-1.3944109908736013, 1.15488062669371, -0.7954340636549508, -0.5536447804097907, 0.0, 1.0, 0.0],
        [1.0557485835338234, 0.5040085971987002, -0.5824506029515057, -0.5851840035995248, 0.0, 1.0, 0.0],
    ])

    response = output_fn(prediction, "application/json")

    assert response[0] == {
        "instances": [
            [-1.3944109908736013, 1.15488062669371, -0.7954340636549508, -0.5536447804097907, 0.0, 1.0, 0.0],
            [1.0557485835338234, 0.5040085971987002, -0.5824506029515057, -0.5851840035995248, 0.0, 1.0, 0.0],
        ]
    }
    assert response[1] == "application/json"


def test_predict_transforms_data(directory):
    input_data = """
    Torgersen, 39.1, 18.7, 181, 3750, MALE
    """

    model = model_fn(directory.as_posix())
    df = input_fn(input_data, "text/csv")
    response = predict_fn(df, model)
    assert type(response) is np.ndarray


def test_predict_returns_none_if_invalid_input(directory):
    input_data = """
    Invalid, 39.1, 18.7, 181, 3750, MALE
    """

    model = model_fn(directory.as_posix())
    df = input_fn(input_data, "text/csv")
    assert predict_fn(df, model) is None
Step 2 - Creating the Postprocessing Script
+The final component of our inference pipeline will transform the output from the model into a human-readable format.
+We’ll use the Scikit-Learn target transformer we saved when we split and transformed the data. To deploy this component as part of an inference pipeline, we need to write a script that loads the transformer, uses it to modify the output from the model, and returns a human-readable format.
+postprocessing_component.py+
import os
+import numpy as np
+import json
+import joblib
+
+
+try:
+ from sagemaker_containers.beta.framework import encoders, worker
+except ImportError:
+ # We don't have access to the `worker` package when testing locally.
+ # We'll set it to None so we can change the way functions create
+ # a response.
+ worker = None
+
+
+def model_fn(model_dir):
+ """
+ Deserializes the target model and returns the list of fitted categories.
+ """
+
+ model = joblib.load(os.path.join(model_dir, "target.joblib"))
+ return model.named_transformers_["species"].categories_[0]
+
+
+def input_fn(input_data, content_type):
+ if content_type == "application/json":
+ return json.loads(input_data)["predictions"]
+
+ raise ValueError(f"{content_type} is not supported.")
+
+
+def predict_fn(input_data, model):
+ """
+ Transforms the prediction into its corresponding category.
+ """
+ predictions = np.argmax(input_data, axis=-1)
+ confidence = np.max(input_data, axis=-1)
+ return [
+ (model[prediction], confidence)
+ for confidence, prediction in zip(confidence, predictions)
+ ]
+
+def output_fn(prediction, accept):
+ if accept == "text/csv":
+ return (
+ worker.Response(encoders.encode(prediction, accept), mimetype=accept)
+ if worker
+ else (prediction, accept)
+ )
+
+ if accept == "application/json":
+ response = []
+ for p, c in prediction:
+ response.append({"prediction": p, "confidence": c})
+
+ # If there's only one prediction, we'll return it
+ # as a single object.
+ if len(response) == 1:
+ response = response[0]
+
+ return (
+ worker.Response(json.dumps(response), mimetype=accept)
+ if worker
+ else (response, accept)
+ )
+
+ raise Exception(f"{accept} accept type is not supported.")
Let’s test the script to ensure everything is working as expected:
+Code
+import numpy as np
+
+from pipeline.postprocessing_component import predict_fn, output_fn
+
+
+def test_predict_returns_prediction_as_first_column():
    input_data = [
        [0.6, 0.2, 0.2],
        [0.1, 0.8, 0.1],
        [0.2, 0.1, 0.7]
    ]
    categories = ["Adelie", "Gentoo", "Chinstrap"]

    response = predict_fn(input_data, categories)

    assert response == [
        ("Adelie", 0.6),
        ("Gentoo", 0.8),
        ("Chinstrap", 0.7)
    ]


def test_output_does_not_return_array_if_single_prediction():
    prediction = [("Adelie", 0.6)]
    response, _ = output_fn(prediction, "application/json")

    assert response["prediction"] == "Adelie"


def test_output_returns_array_if_multiple_predictions():
    prediction = [("Adelie", 0.6), ("Gentoo", 0.8)]
    response, _ = output_fn(prediction, "application/json")

    assert len(response) == 2
    assert response[0]["prediction"] == "Adelie"
    assert response[1]["prediction"] == "Gentoo"
Step 3 - Setting up the Inference Pipeline
+We can now create a PipelineModel to define our inference pipeline.
+We’ll use the model we generated in the Split and Transform step as the input to the first and last components of the inference pipeline. This model.tar.gz
file contains the two transformers we need to preprocess and postprocess the data.
Let’s create a variable with the URI to this file:
transformation_pipeline_model = Join(
    on="/",
    values=[
        preprocessing_step.properties.ProcessingOutputConfig.Outputs[
            "model"
        ].S3Output.S3Uri,
        "model.tar.gz",
    ],
)
Here is the first component of the inference pipeline. It will preprocess the data before sending it to the TensorFlow model:
+from sagemaker.sklearn.model import SKLearnModel
+
preprocessing_model = SKLearnModel(
    model_data=transformation_pipeline_model,
    entry_point="preprocessing_component.py",
    source_dir=(CODE_FOLDER / "pipeline").as_posix(),
    framework_version="1.2-1",
    sagemaker_session=config["session"],
    role=role,
)
Here is the last component of the inference pipeline. It will postprocess the output from the TensorFlow model before sending it back to the user:
postprocessing_model = SKLearnModel(
    model_data=transformation_pipeline_model,
    entry_point="postprocessing_component.py",
    source_dir=(CODE_FOLDER / "pipeline").as_posix(),
    framework_version="1.2-1",
    sagemaker_session=config["session"],
    role=role,
)
We can now create the inference pipeline using the three models:
+from sagemaker.pipeline import PipelineModel
+
pipeline_model = PipelineModel(
    name="inference-model",
    models=[preprocessing_model, tensorflow_model, postprocessing_model],
    sagemaker_session=config["session"],
    role=role,
)
Step 4 - Configuring the Model Package Group
+Let’s define a new package group to register the Pipeline Model:
PIPELINE_MODEL_PACKAGE_GROUP = "pipeline-penguins"
Step 5 - Registering the Model
+We’ll modify the registration step to register the Pipeline Model in the Model Registry.
register_model_step = create_registration_step(
    pipeline_model,
    PIPELINE_MODEL_PACKAGE_GROUP,
    content_types=["text/csv", "application/json"],
    response_types=["text/csv", "application/json"],
    model_metrics=model_metrics,
)
Step 6 - Modifying the Deploy Step
+Let’s now modify the LambdaStep to use the updated Registration Step.
deploy_step = create_deployment_step(register_model_step)
Step 7 - Modifying the Condition Step
+Since we modified the Registration Step, we also need to modify the Condition Step to use the new registration:
condition_step = ConditionStep(
    name="check-model-accuracy",
    conditions=[condition],
    if_steps=[register_model_step, deploy_step],
    else_steps=[fail_step],
)
Step 8 - Creating the Pipeline
+We can now define the SageMaker Pipeline and submit its definition to the SageMaker Pipelines service to create the pipeline if it doesn’t exist or update it if it does.
session14_pipeline = Pipeline(
    name="session14-pipeline",
    parameters=[dataset_location, accuracy_threshold],
    steps=[
        preprocessing_step,
        train_model_step,
        evaluate_model_step,
        condition_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

session14_pipeline.upsert(role_arn=role)
Step 9 - Testing the Endpoint
+Let’s now test the endpoint. Notice that we can now send the raw data to the endpoint, and it will return the penguin’s species in a human-readable format.
+from sagemaker.serializers import CSVSerializer
+
predictor = Predictor(
    endpoint_name=ENDPOINT,
    serializer=CSVSerializer(),
    sagemaker_session=sagemaker_session,
)

data = pd.read_csv(DATA_FILEPATH)
data = data.drop("species", axis=1)

payload = data.iloc[:3].to_csv(header=False, index=False)
print(f"Payload:\n{payload}")

try:
    wait_for_endpoint()

    response = predictor.predict(payload, initial_args={"ContentType": "text/csv"})
    response = json.loads(response.decode("utf-8"))
    print(json.dumps(response, indent=2))
except Exception as e:
    print(e)
Payload:
+Torgersen,39.1,18.7,181.0,3750.0,MALE
+Torgersen,39.5,17.4,186.0,3800.0,FEMALE
+Torgersen,40.3,18.0,195.0,3250.0,FEMALE
+
+Waiter EndpointInService failed: Waiter encountered a terminal failure state: Matched expected service error code: ValidationException
+We can also test the endpoint by sending a JSON payload. Notice how you can use a deserializer to automatically decode the response from the model.
+from sagemaker.deserializers import JSONDeserializer
+from sagemaker.serializers import JSONSerializer
+
sample = {
    "island": "Biscoe",
    "culmen_length_mm": 48.6,
    "culmen_depth_mm": 16.0,
    "flipper_length_mm": 230.0,
    "body_mass_g": 5800.0,
    "sex": "MALE",
}

predictor = Predictor(
    endpoint_name=ENDPOINT,
    serializer=JSONSerializer(),
    deserializer=JSONDeserializer(),
    sagemaker_session=sagemaker_session,
)

try:
    response = predictor.predict(sample)
    print(response)
except Exception as e:
    print(e)
An error occurred (ValidationError) when calling the InvokeEndpoint operation: Endpoint penguins-endpoint of account 325223348818 not found.
+And now let’s send the same payload but return the prediction in CSV format:
+from sagemaker.deserializers import CSVDeserializer
+
predictor = Predictor(
    endpoint_name=ENDPOINT,
    serializer=JSONSerializer(),
    deserializer=CSVDeserializer(),
    sagemaker_session=sagemaker_session,
)

try:
    response = predictor.predict(sample, initial_args={"Accept": "text/csv"})
    print(response)
except Exception as e:
    print(e)
An error occurred (ValidationError) when calling the InvokeEndpoint operation: Endpoint penguins-endpoint of account 325223348818 not found.
+Session 15 - Custom Inference Script
+This session creates a custom inference script to control the inference process in the SageMaker endpoint. This is an alternative to creating an inference pipeline to preprocess and postprocess the data that comes in and out of the model.
+Step 1 - Creating the Inference Script
+Let’s create a script where we’ll manage the inference process in the endpoint.
+We’ll include this code as part of the model assets to control the inference process on the SageMaker endpoint. SageMaker will automatically call the handler() function for every request to the endpoint. Check How to implement the pre- and/or post-processing handler(s) for more information.
We can now create the script inside the folder.
+inference.py+
import os
+import json
+import requests
+import joblib
+import numpy as np
+import pandas as pd
+from pathlib import Path
+
+
+def handler(data, context, directory=Path("/opt/ml/model")):
+ """
+ This is the entrypoint that will be called by SageMaker
+ when the endpoint receives a request.
+ """
+ print("Handling endpoint request")
+
+ processed_input = _process_input(data, context, directory)
+ output = _predict(processed_input, context, directory) if processed_input else None
+ return _process_output(output, context, directory)
+
+
+def _process_input(data, context, directory):
+ print("Processing input data...")
+
+ if context is None:
+ # The context will be None when we are testing the code
+ # directly from a notebook. In that case, we can use the
+ # data directly.
+ endpoint_input = data
+ elif context.request_content_type in (
+ "application/json",
+ "application/octet-stream",
+ ):
+ # When the endpoint is running, we will receive a context
+ # object. We need to parse the input and turn it into
+ # JSON in that case.
+ endpoint_input = data.read().decode("utf-8")
+ else:
+ raise ValueError(
+ f"Unsupported content type: {context.request_content_type or 'unknown'}"
+ )
+
+ # Let's now transform the input data using the features pipeline.
+ try:
+ endpoint_input = json.loads(endpoint_input)
+ df = pd.json_normalize(endpoint_input)
+ features_pipeline = joblib.load(directory / "features.joblib")
+ result = features_pipeline.transform(df)
+ except Exception as e:
+ print(f"There was an error processing the input data. {e}")
+ return None
+
+ return result[0].tolist()
+
+
+def _predict(instance, context, directory):
+ print("Sending input data to model to make a prediction...")
+
+ if context is None:
+ # The context will be None when we are testing the code
+ # directly from a notebook. In that case, we want to load the
+ # model we trained and make a prediction using it.
+ import keras
+
+ model = keras.models.load_model(Path(directory) / "001")
+ predictions = model.predict(np.array([instance]))
+ result = {"predictions": predictions.tolist()}
+ else:
+ # When the endpoint is running, we will receive a context
+ # object. In that case we need to send the instance to the
+ # model to get a prediction back.
+ model_input = json.dumps({"instances": [instance]})
+ response = requests.post(context.rest_uri, data=model_input)
+
+ if response.status_code != 200:
+ raise ValueError(response.content.decode("utf-8"))
+
+ result = json.loads(response.content)
+
+ print(f"Response: {result}")
+ return result
+
+
+def _process_output(output, context, directory):
+ print("Processing prediction received from the model...")
+
+ if output:
+ prediction = np.argmax(output["predictions"][0])
+ confidence = output["predictions"][0][prediction]
+
+ target_pipeline = joblib.load(directory / "target.joblib")
+ classes = target_pipeline.named_transformers_["species"].categories_[0]
+
+ result = {
+ "prediction": classes[prediction],
+ "confidence": confidence,
+ }
+ else:
+ result = {"prediction": None}
+
+ print(result)
+
+ response_content_type = (
+ "application/json" if context is None else context.accept_header
+ )
+ return json.dumps(result), response_content_type
Let’s test the script to ensure everything is working as expected:
+Code
+import os
+import shutil
+import tarfile
+import pytest
+import tempfile
+
+from processing.script import preprocess
+from training.script import train
+from pipeline.inference import handler
+
+
+@pytest.fixture(scope="function", autouse=False)
+def directory():
    directory = tempfile.mkdtemp()
    input_directory = Path(directory) / "input"
    input_directory.mkdir(parents=True, exist_ok=True)
    shutil.copy2(DATA_FILEPATH, input_directory / "data.csv")

    directory = Path(directory)

    preprocess(base_directory=directory)

    train(
        model_directory=directory / "model",
        train_path=directory / "train",
        validation_path=directory / "validation",
        pipeline_path=directory / "model",
        experiment=None,
        epochs=1,
    )

    # After training a model, we need to prepare a package just like
    # SageMaker would. This package is what the evaluation script is
    # expecting as an input.
    with tarfile.open(directory / "model.tar.gz", "w:gz") as tar:
        tar.add(directory / "model" / "001", arcname="001")

    yield directory

    shutil.rmtree(directory)


@pytest.fixture(scope="function", autouse=False)
def payload():
    return json.dumps({
        "island": "Biscoe",
        "culmen_length_mm": 48.6,
        "culmen_depth_mm": 16.0,
        "flipper_length_mm": 230.0,
        "body_mass_g": 5800,
    }).encode("utf-8")


def test_handler_response_contains_prediction_and_confidence(directory, payload):
    response = handler(
        data=payload,
        context=None,
        directory=directory / "model",
    )

    response = json.loads(response[0])
    assert "prediction" in response
    assert "confidence" in response


def test_handler_response_includes_content_type(directory, payload):
    response = handler(
        data=payload,
        context=None,
        directory=directory / "model",
    )

    assert response[1] == "application/json"


def test_handler_response_prediction_is_categorical(directory, payload):
    response = handler(
        data=payload,
        context=None,
        directory=directory / "model",
    )

    response = json.loads(response[0])
    assert response["prediction"] in ["Adelie", "Gentoo", "Chinstrap"]


def test_handler_deals_with_an_invalid_payload(directory):
    response = handler(
        data="invalid payload",
        context=None,
        directory=directory / "model",
    )

    response = json.loads(response[0])
    assert response["prediction"] is None
Step 2 - Creating the Model
+We can now create a new TensorFlowModel including the inference.py
file.
SageMaker triggers a repack operation whenever we specify the source_dir
attribute in a model. We want that attribute to point to the local folder containing the inference.py
file. SageMaker will automatically modify the original model.tar.gz
package to include a /code
folder containing the file.
Since we need access to Scikit-Learn in our script, we can include a requirements.txt
file in the same location where the inference.py
script is, and SageMaker will install everything in it.
To repack the model assets, SageMaker will automatically include a new step in the pipeline right before registering the model.
+Here is what the new model.tar.gz
package will look like:
model/
+ |--[model_version_number]
+ |--assets/
+ |--variables/
+ |--saved_model.pb
+ |--features.joblib
+ |--target.joblib
+code/
+ |--inference.py
+ |--requirements.txt
+Let’s create a requirements.txt
file with all the libraries we want SageMaker to install in the inference container.
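Here is a minimal sketch of how you could create that file; the libraries listed are assumptions and should match the versions used to fit the transformers.
# Hypothetical requirements.txt for the inference container. SageMaker installs
# these libraries before running inference.py.
(CODE_FOLDER / "pipeline" / "requirements.txt").write_text(
    "\n".join(
        [
            "scikit-learn",  # needed to load the transformers with joblib
            "pandas",
            "numpy",
            "requests",
        ]
    )
)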
+We can now create the model using the inference.py script.
custom_tensorflow_model = TensorFlowModel(
    name="penguins",
    model_data=train_model_step.properties.ModelArtifacts.S3ModelArtifacts,
    entry_point="inference.py",
    source_dir=(CODE_FOLDER / "pipeline").as_posix(),
    framework_version=config["framework_version"],
    sagemaker_session=config["session"],
    role=role,
)
Step 3 - Configuring the Model Package Group
+Let’s define a new group where we’ll register the model using the custom inference.py script.
CUSTOM_MODEL_PACKAGE_GROUP = "custom-penguins"
Step 4 - Registering the Model
+We can now modify the registration step to register the custom model in the Model Registry.
register_model_step = create_registration_step(
    custom_tensorflow_model,
    model_package_group_name=CUSTOM_MODEL_PACKAGE_GROUP,
    content_types=["application/json"],
    response_types=["application/json"],
    model_metrics=model_metrics,
)
Step 5 - Modifying the Deploy Step
+Let’s now modify the LambdaStep to use the updated Registration Step.
deploy_step = create_deployment_step(register_model_step)
Step 6 - Modifying the Condition Step
+Since we modified the Registration Step, we also need to modify the Condition Step to use the new registration:
condition_step = ConditionStep(
    name="check-model-accuracy",
    conditions=[condition],
    if_steps=[register_model_step, deploy_step],
    else_steps=[fail_step],
)
Step 7 - Creating the Pipeline
+We can now define the SageMaker Pipeline and submit its definition to the SageMaker Pipelines service to create the pipeline if it doesn’t exist or update it if it does.
session15_pipeline = Pipeline(
    name="session15-pipeline",
    parameters=[dataset_location, accuracy_threshold],
    steps=[
        preprocessing_step,
        train_model_step,
        evaluate_model_step,
        condition_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

session15_pipeline.upsert(role_arn=role)
Step 8 - Testing the Endpoint
+Let’s test the endpoint to make sure it works.
+from sagemaker.deserializers import JSONDeserializer
+
try:
    wait_for_endpoint()

    predictor = Predictor(
        endpoint_name=ENDPOINT,
        serializer=JSONSerializer(),
        deserializer=JSONDeserializer(),
    )

    response = predictor.predict(
        {
            "island": "Dream",
            "culmen_length_mm": 46.4,
            "culmen_depth_mm": 18.6,
            "flipper_length_mm": 190.0,
            "body_mass_g": 3450.0,
        },
    )

    print(response)

except Exception as e:
    print(e)
Waiter EndpointInService failed: Waiter encountered a terminal failure state: Matched expected service error code: ValidationException
+Session 16 - Data Quality Baseline
+This session extends the SageMaker Pipeline with a Quality Check Step to compute a baseline for the data the endpoint expects.
+This step will compute statistics and constraints from the data. We’ll later use this information as the baseline to detect data drift and other data quality issues.
+Check Monitor data quality for more information about monitoring data quality in SageMaker.
+Step 1 - Configuring Baseline Location
+Let’s start by defining the location where SageMaker will store the baseline data:
DATA_QUALITY_LOCATION = f"{S3_LOCATION}/monitoring/data-quality"
Step 2 - Generating Data Quality Baseline
+Let’s configure a QualityCheck Step to compute the general statistics of the data we used to build our model.
+We can configure the instance that will run the quality check using the CheckJobConfig class, and we can use the DataQualityCheckConfig
class to configure the job.
We are running this step with the following configuration:
- skip_check = True: This parameter controls whether the step should skip checking the data against a previous baseline. Since we want to generate the baseline for the first time, we set it to True. After running the pipeline once to generate the baseline, we can set this parameter to False to ensure any new data follows the same distribution as the baseline.
- register_new_baseline = True: This parameter controls whether the new calculated baseline will be registered in the Model Registry.
For more information about these configuration parameters, check Baseline calculation and registration.
+from sagemaker.model_monitor.dataset_format import DatasetFormat
+from sagemaker.workflow.check_job_config import CheckJobConfig
+from sagemaker.workflow.quality_check_step import (
    DataQualityCheckConfig,
    QualityCheckStep,
)

data_quality_baseline_step = QualityCheckStep(
    name="generate-data-quality-baseline",
    check_job_config=CheckJobConfig(
        instance_type="ml.c5.xlarge",
        instance_count=1,
        volume_size_in_gb=20,
        sagemaker_session=config["session"],
        role=role,
    ),
    quality_check_config=DataQualityCheckConfig(
        baseline_dataset=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
            "train-baseline"
        ].S3Output.S3Uri,
        dataset_format=DatasetFormat.csv(header=True),
        output_s3_uri=DATA_QUALITY_LOCATION,
    ),
    model_package_group_name=PIPELINE_MODEL_PACKAGE_GROUP,
    skip_check=True,
    register_new_baseline=True,
    cache_config=cache_config,
)
Step 3 - Setting up Model Metrics
+We can configure a new set of ModelMetrics using the results of the Quality Step. Check Baseline and model version lifecycle and evolution with SageMaker Pipelines for an explanation of how SageMaker uses the DriftCheckBaselines
.
from sagemaker.drift_check_baselines import DriftCheckBaselines
+
data_quality_model_metrics = ModelMetrics(
    model_data_statistics=MetricsSource(
        s3_uri=data_quality_baseline_step.properties.CalculatedBaselineStatistics,
        content_type="application/json",
    ),
    model_data_constraints=MetricsSource(
        s3_uri=data_quality_baseline_step.properties.CalculatedBaselineConstraints,
        content_type="application/json",
    ),
)

data_quality_drift_check_baselines = DriftCheckBaselines(
    model_data_statistics=MetricsSource(
        s3_uri=data_quality_baseline_step.properties.BaselineUsedForDriftCheckStatistics,
        content_type="application/json",
    ),
    model_data_constraints=MetricsSource(
        s3_uri=data_quality_baseline_step.properties.BaselineUsedForDriftCheckConstraints,
        content_type="application/json",
    ),
)
Step 4 - Registering the Model
+Let’s modify the registration step to use the new metrics and the drift baseline:
register_model_step = create_registration_step(
    pipeline_model,
    PIPELINE_MODEL_PACKAGE_GROUP,
    content_types=["text/csv", "application/json"],
    response_types=["text/csv", "application/json"],
    model_metrics=data_quality_model_metrics,
    drift_check_baselines=data_quality_drift_check_baselines,
)
Step 5 - Modifying the Condition Step
+Since we modified the Registration Step, we also need to modify the Condition Step to use the new registration:
condition_step = ConditionStep(
    name="check-model-accuracy",
    conditions=[condition],
    if_steps=[register_model_step],
    else_steps=[fail_step],
)
Step 6 - Creating the Pipeline
+We can now define the SageMaker Pipeline and submit its definition to the SageMaker Pipelines service to create the pipeline if it doesn’t exist or update it if it does.
session16_pipeline = Pipeline(
    name="session16-pipeline",
    parameters=[dataset_location, accuracy_threshold],
    steps=[
        preprocessing_step,
        train_model_step,
        evaluate_model_step,
        data_quality_baseline_step,
        condition_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

session16_pipeline.upsert(role_arn=role)
Step 7 - Checking Constraints and Statistics
+Our pipeline generated data baseline statistics and constraints. We can take a look at what these values look like by downloading them from S3. You need to wait for the pipeline to finish running before these files are available.
+Here are the data quality statistics:
try:
    response = json.loads(
        S3Downloader.read_file(f"{DATA_QUALITY_LOCATION}/statistics.json"),
    )
    print(json.dumps(response["features"][0], indent=2))
except Exception:  # noqa: S110
    pass
{
+ "name": "island",
+ "inferred_type": "String",
+ "string_statistics": {
+ "common": {
+ "num_present": 236,
+ "num_missing": 0
+ },
+ "distinct_count": 3.0,
+ "distribution": {
+ "categorical": {
+ "buckets": [
+ {
+ "value": "Dream",
+ "count": 84
+ },
+ {
+ "value": "Torgersen",
+ "count": 32
+ },
+ {
+ "value": "Biscoe",
+ "count": 120
+ }
+ ]
+ }
+ }
+ }
+}
+Here are the data quality constraints:
try:
    response = json.loads(
        S3Downloader.read_file(f"{DATA_QUALITY_LOCATION}/constraints.json"),
    )
    print(json.dumps(response, indent=2))
except Exception:  # noqa: S110
    pass
{
+ "version": 0.0,
+ "features": [
+ {
+ "name": "island",
+ "inferred_type": "String",
+ "completeness": 1.0,
+ "string_constraints": {
+ "domains": [
+ "Dream",
+ "Torgersen",
+ "Biscoe"
+ ]
+ }
+ },
+ {
+ "name": "culmen_length_mm",
+ "inferred_type": "Fractional",
+ "completeness": 1.0,
+ "num_constraints": {
+ "is_non_negative": true
+ }
+ },
+ {
+ "name": "culmen_depth_mm",
+ "inferred_type": "Fractional",
+ "completeness": 1.0,
+ "num_constraints": {
+ "is_non_negative": true
+ }
+ },
+ {
+ "name": "flipper_length_mm",
+ "inferred_type": "Fractional",
+ "completeness": 1.0,
+ "num_constraints": {
+ "is_non_negative": true
+ }
+ },
+ {
+ "name": "body_mass_g",
+ "inferred_type": "Fractional",
+ "completeness": 1.0,
+ "num_constraints": {
+ "is_non_negative": true
+ }
+ },
+ {
+ "name": "sex",
+ "inferred_type": "String",
+ "completeness": 1.0,
+ "string_constraints": {
+ "domains": [
+ "FEMALE",
+ ".",
+ "MALE"
+ ]
+ }
+ }
+ ],
+ "monitoring_config": {
+ "evaluate_constraints": "Enabled",
+ "emit_metrics": "Enabled",
+ "datatype_check_threshold": 1.0,
+ "domain_content_threshold": 1.0,
+ "distribution_constraints": {
+ "perform_comparison": "Enabled",
+ "comparison_threshold": 0.1,
+ "comparison_method": "Robust",
+ "categorical_comparison_threshold": 0.1,
+ "categorical_drift_method": "LInfinity"
+ }
+ }
+}
+Session 17 - Model Quality Baseline
+This session extends the SageMaker Pipeline with a QualityCheck Step to compute a baseline for the model performance.
This step computes the performance metrics we will later use as the baseline to detect model drift.
To build this baseline, we need the model’s predictions for the test set so we can later compare these metrics against the model’s performance on production data. We can generate the predictions by running a Batch Transform Job over every sample in the test set, using a Transform Step as part of the pipeline.
Check Monitor model quality for more information about monitoring model quality in SageMaker.
+Step 1 - Configuring Baseline Location
+Let’s start by defining the location where SageMaker will store the baseline data:
+= f"{S3_LOCATION}/monitoring/model-quality" MODEL_QUALITY_LOCATION
Step 2 - Creating the Model
+The Transform Step requires a model to generate predictions, so we need a Model Step that creates a model:
create_model_step = ModelStep(
    name="create-model",
    step_args=pipeline_model.create(instance_type=config["instance_type"]),
)
Step 3 - Setting up the Transform Step
+We are going to use a Batch Transform Job to generate predictions for every sample from the test set.
This Batch Transform Job will run every sample from the test set through the model so we can compute the baseline metrics. Check Run a Batch Transform Job for more information.
+Let’s start by configuring a Transformer instance:
from sagemaker.transformer import Transformer

transformer = Transformer(
    model_name=create_model_step.properties.ModelName,
    instance_type=config["instance_type"],
    instance_count=1,
    strategy="MultiRecord",
    accept="text/csv",
    assemble_with="Line",
    output_path=f"{S3_LOCATION}/transform",
    sagemaker_session=config["session"],
)
We can now set up the Transform Step using the Transformer we configured before.
Notice the following:

- We’ll generate predictions for the baseline test data that we generated when we split and transformed the data. This baseline is the same data we used to test the model, but it’s in raw format.
- The output of this Batch Transform Job will have two fields. The first one will be the ground truth label, and the second one will be the prediction of the model.
from sagemaker.workflow.steps import TransformStep

generate_test_predictions_step = TransformStep(
    name="generate-test-predictions",
    step_args=transformer.transform(
        # We will use the baseline set we generated when we split the data.
        # This set corresponds to the test split before the transformation step.
        data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
            "test-baseline"
        ].S3Output.S3Uri,
        join_source="Input",
        split_type="Line",
        content_type="text/csv",
        # We want to output the first and the second-to-last fields from
        # the joint set. The first field corresponds to the ground truth,
        # and the second-to-last field corresponds to the prediction.
        #
        # Here is an example of the data the Transform Job will generate
        # after joining the input with the output from the model:
        #
        # Gentoo,39.1,18.7,181.0,3750.0,MALE,Gentoo,0.52
        #
        # Notice how the first field is the ground truth coming from the
        # test set. The second-to-last field is the prediction coming from
        # the model.
        output_filter="$[0,-2]",
    ),
    cache_config=cache_config,
)
Step 4 - Generating Model Quality Baseline
+Let’s now configure the Quality Check Step and feed it the data we generated in the Transform Step. This step will automatically compute the performance metrics of the model on the test set.
+We are running this step with the following configuration:
- skip_check=True: This parameter controls whether the step should skip checking the data against a previous baseline. Since we want to generate the baseline for the first time, we set it to True. After running the pipeline once to generate the baseline, we can set this parameter to False to ensure any new data follows the same distribution as the baseline.
- register_new_baseline=True: This parameter controls whether the newly calculated baseline will be registered in the Model Registry.
from sagemaker.workflow.quality_check_step import ModelQualityCheckConfig

model_quality_baseline_step = QualityCheckStep(
    name="generate-model-quality-baseline",
    check_job_config=CheckJobConfig(
        instance_type="ml.c5.xlarge",
        instance_count=1,
        volume_size_in_gb=20,
        sagemaker_session=config["session"],
        role=role,
    ),
    quality_check_config=ModelQualityCheckConfig(
        # We are going to use the output of the Transform Step to generate
        # the model quality baseline.
        baseline_dataset=generate_test_predictions_step.properties.TransformOutput.S3OutputPath,
        dataset_format=DatasetFormat.csv(header=False),
        # We need to specify the problem type and the fields where the prediction
        # and ground truth are so the process knows how to interpret the results.
        problem_type="MulticlassClassification",
        # Since the data doesn't have headers, SageMaker will auto-create headers for it.
        # _c0 corresponds to the first column, and _c1 corresponds to the second column.
        ground_truth_attribute="_c0",
        inference_attribute="_c1",
        output_s3_uri=MODEL_QUALITY_LOCATION,
    ),
    model_package_group_name=PIPELINE_MODEL_PACKAGE_GROUP,
    skip_check=True,
    register_new_baseline=True,
    cache_config=cache_config,
)
Step 5 - Setting up Model Metrics
We can configure a new set of ModelMetrics using the results of the Quality Step. Check Baseline and model version lifecycle and evolution with SageMaker Pipelines for an explanation of how SageMaker uses the DriftCheckBaselines.
from sagemaker.drift_check_baselines import DriftCheckBaselines

model_quality_model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=model_quality_baseline_step.properties.CalculatedBaselineStatistics,
        content_type="application/json",
    ),
    model_constraints=MetricsSource(
        s3_uri=model_quality_baseline_step.properties.CalculatedBaselineConstraints,
        content_type="application/json",
    ),
    model_data_statistics=MetricsSource(
        s3_uri=data_quality_baseline_step.properties.CalculatedBaselineStatistics,
        content_type="application/json",
    ),
    model_data_constraints=MetricsSource(
        s3_uri=data_quality_baseline_step.properties.CalculatedBaselineConstraints,
        content_type="application/json",
    ),
)

model_quality_drift_check_baselines = DriftCheckBaselines(
    model_statistics=MetricsSource(
        s3_uri=model_quality_baseline_step.properties.BaselineUsedForDriftCheckStatistics,
        content_type="application/json",
    ),
    model_constraints=MetricsSource(
        s3_uri=model_quality_baseline_step.properties.BaselineUsedForDriftCheckConstraints,
        content_type="application/json",
    ),
    model_data_statistics=MetricsSource(
        s3_uri=data_quality_baseline_step.properties.BaselineUsedForDriftCheckStatistics,
        content_type="application/json",
    ),
    model_data_constraints=MetricsSource(
        s3_uri=data_quality_baseline_step.properties.BaselineUsedForDriftCheckConstraints,
        content_type="application/json",
    ),
)
Step 6 - Registering the Model
+Let’s modify the registration step to use the new metrics and the drift baseline:
register_model_step = create_registration_step(
    pipeline_model,
    PIPELINE_MODEL_PACKAGE_GROUP,
    content_types=["text/csv", "application/json"],
    response_types=["text/csv", "application/json"],
    model_metrics=model_quality_model_metrics,
    drift_check_baselines=model_quality_drift_check_baselines,
)
Step 7 - Modifying the Condition Step
+We need to modify the Condition Step to include the new Registration Step and the Transform and Quality Check Steps.
condition_step = ConditionStep(
    name="check-model-accuracy",
    conditions=[condition],
    if_steps=[
        create_model_step,
        generate_test_predictions_step,
        model_quality_baseline_step,
        register_model_step,
    ],
    else_steps=[fail_step],
)
Step 8 - Creating the Pipeline
+We can now define the SageMaker Pipeline and submit its definition to the SageMaker Pipelines service to create the pipeline if it doesn’t exist or update it if it does.
session17_pipeline = Pipeline(
    name="session17-pipeline",
    parameters=[dataset_location, accuracy_threshold],
    steps=[
        preprocessing_step,
        train_model_step,
        evaluate_model_step,
        data_quality_baseline_step,
        condition_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

session17_pipeline.upsert(role_arn=role)
Step 9 - Checking Constraints
+Our pipeline generated model baseline constraints. We can take a look at what these values look like by downloading them from S3. You need to wait for the pipeline to finish running before the file is available.
try:
    response = json.loads(
        S3Downloader.read_file(f"{MODEL_QUALITY_LOCATION}/constraints.json"),
    )
    print(json.dumps(response, indent=2))
except Exception:  # noqa: S110
    pass
{
+ "version": 0.0,
+ "multiclass_classification_constraints": {
+ "accuracy": {
+ "threshold": 1.0,
+ "comparison_operator": "LessThanThreshold"
+ },
+ "weighted_recall": {
+ "threshold": 1.0,
+ "comparison_operator": "LessThanThreshold"
+ },
+ "weighted_precision": {
+ "threshold": 1.0,
+ "comparison_operator": "LessThanThreshold"
+ },
+ "weighted_f0_5": {
+ "threshold": 1.0,
+ "comparison_operator": "LessThanThreshold"
+ },
+ "weighted_f1": {
+ "threshold": 1.0,
+ "comparison_operator": "LessThanThreshold"
+ },
+ "weighted_f2": {
+ "threshold": 1.0,
+ "comparison_operator": "LessThanThreshold"
+ }
+ }
+}
+Session 18 - Data Monitoring
+This session creates a Monitoring Job to monitor the quality of the data received by the endpoint. This schedule will run periodically and check the data that goes into the endpoint against the baseline we generated before.
+Check Amazon SageMaker Model Monitor for an explanation of how to use SageMaker’s Model Monitoring functionality. Monitor models for data and model quality, bias, and explainability is a much more extensive guide to monitoring in Amazon SageMaker.
+Step 1 - Deploying the Model
+Let’s deploy the latest approved model to an endpoint.
Since we’ll need to deploy the model again later, let’s create a function we can reuse. Notice how we need to enable Data Capture to monitor the data that goes in and out of the endpoint.
from sagemaker.model_monitor import DataCaptureConfig


def deploy_model():
    """Deploy the latest model registered in the Model Registry."""
    response = sagemaker_client.list_model_packages(
        ModelPackageGroupName=PIPELINE_MODEL_PACKAGE_GROUP,
        ModelApprovalStatus="Approved",
        SortBy="CreationTime",
        MaxResults=1,
    )

    package = (
        response["ModelPackageSummaryList"][0]
        if response["ModelPackageSummaryList"]
        else None
    )

    if package:
        model_package = ModelPackage(
            model_package_arn=package["ModelPackageArn"],
            sagemaker_session=sagemaker_session,
            role=role,
        )

        model_package.deploy(
            endpoint_name=ENDPOINT,
            initial_instance_count=1,
            instance_type=config["instance_type"],
            # We must enable Data Capture to monitor the model.
            data_capture_config=DataCaptureConfig(
                enable_capture=True,
                sampling_percentage=100,
                destination_s3_uri=DATA_CAPTURE_DESTINATION,
                capture_options=["REQUEST", "RESPONSE"],
                csv_content_types=["text/csv"],
                json_content_types=["application/json"],
            ),
        )


deploy_model()
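The deployment takes a few minutes to complete. If you want to confirm the endpoint is ready before sending traffic to it, you can ask SageMaker for its status. Here is a small sketch that only assumes the ENDPOINT name we used above:
# The endpoint is ready to receive traffic once its status is "InService".
status = sagemaker_client.describe_endpoint(EndpointName=ENDPOINT)["EndpointStatus"]
print(f"Endpoint status: {status}")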
Step 2 - Generating Fake Traffic
+To test the monitoring functionality, we need to generate traffic to the endpoint. To generate traffic, we will send every sample from the dataset to the endpoint to simulate real prediction requests:
from sagemaker.serializers import JSONSerializer


def generate_fake_traffic():
    """Generate fake traffic to the endpoint."""
    try:
        for index, row in data.iterrows():
            payload = ",".join([str(x) for x in row.to_list()])
            predictor.predict(
                payload,
                initial_args={"ContentType": "text/csv", "Accept": "text/csv"},
                # The `inference_id` field is important to match
                # it later with a corresponding ground-truth label.
                inference_id=str(index),
            )
    except Exception as e:
        print(e)


generate_fake_traffic()
We can check the location where the endpoint stores the captured data, download a file, and display its content. It may take a few minutes for the first few files to show up in S3.
These files contain the data captured by the endpoint in a SageMaker-specific JSON-line format. Each inference request is captured in a single line in the jsonl file. The line contains both the input and output merged together:
files = S3Downloader.list(DATA_CAPTURE_DESTINATION)
if len(files):
    lines = S3Downloader.read_file(files[-1])
    print(f"File: {files[-1]}")
    print(json.dumps(json.loads(lines.split("\n")[0]), indent=2))
File: s3://mlschool/penguins/monitoring/data-capture/penguins-endpoint/AllTraffic/2024/03/30/17/32-02-242-191b135d-085a-484d-a119-45b26c51554c.jsonl
+{
+ "captureData": {
+ "endpointInput": {
+ "observedContentType": "text/csv",
+ "mode": "INPUT",
+ "data": "Torgersen,39.1,18.7,181.0,3750.0,MALE",
+ "encoding": "CSV"
+ },
+ "endpointOutput": {
+ "observedContentType": "text/csv; charset=utf-8",
+ "mode": "OUTPUT",
+ "data": "Adelie,0.964408875\n",
+ "encoding": "CSV"
+ }
+ },
+ "eventMetadata": {
+ "eventId": "3211434d-0db6-4ee2-8848-95ce11f6d5e6",
+ "inferenceId": "0",
+ "inferenceTime": "2024-03-30T17:32:02Z"
+ },
+ "eventVersion": "0"
+}
+Step 3 - Creating Custom Preprocessing Script
SageMaker looks for violations in the data captured by the endpoint. By default, it combines the input data with the endpoint output and compares the result with the baseline we generated before. If we let SageMaker do this, we will get a few violations, for example an “extra column check” violation because the field confidence doesn’t exist in the baseline data.
We can fix these violations by creating a preprocessing script that configures the data we want the monitoring job to use. Check Preprocessing and Postprocessing for more information about how to configure these scripts.
We’ll store the script in a folder called monitoring:
= "data_quality_preprocessor.py"
+ DATA_QUALITY_PREPROCESSOR
+/ "monitoring").mkdir(parents=True, exist_ok=True) (CODE_FOLDER
We can now define the preprocessing script. Notice that this script will return a JSON object with a name and value for each feature.
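Here is a minimal sketch of what such a script could look like. It assumes the preprocess_handler contract described in SageMaker’s preprocessing documentation and the feature names we saw in the data quality baseline; the file would be saved under CODE_FOLDER / "monitoring" before the upload in the next step:
# A sketch of data_quality_preprocessor.py, not the exact script used in the program.
# SageMaker calls `preprocess_handler` once per captured record. Returning a
# dictionary with a name and value for each feature tells the monitoring job to
# compare only these fields against the baseline, ignoring the model's output.
def preprocess_handler(inference_record):
    """Map the captured CSV input to the features from the data quality baseline."""
    input_data = inference_record.endpoint_input.data.rstrip("\n")
    island, culmen_length, culmen_depth, flipper_length, body_mass, sex = (
        input_data.split(",")
    )

    return {
        "island": island,
        "culmen_length_mm": float(culmen_length),
        "culmen_depth_mm": float(culmen_depth),
        "flipper_length_mm": float(flipper_length),
        "body_mass_g": float(body_mass),
        "sex": sex,
    }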
Step 4 - Uploading Preprocessing Script
+The monitoring schedule expects an S3 location pointing to the preprocessing script. Let’s upload the script to the default bucket.
bucket = boto3.Session().resource("s3").Bucket(config["session"].default_bucket())
prefix = Path("penguins/monitoring")

bucket.Object((prefix / DATA_QUALITY_PREPROCESSOR).as_posix()).upload_file(
    (CODE_FOLDER / "monitoring" / DATA_QUALITY_PREPROCESSOR).as_posix(),
)

data_quality_preprocessor = f"s3://{(bucket.name / prefix / DATA_QUALITY_PREPROCESSOR)}"
data_quality_preprocessor
Step 5 - Creating Monitoring Schedule
+We can now set up the Data Quality Monitoring Job using the DefaultModelMonitor class.
from sagemaker.model_monitor import DefaultModelMonitor

data_monitor = DefaultModelMonitor(
    instance_type=config["instance_type"],
    instance_count=1,
    max_runtime_in_seconds=1800,
    volume_size_in_gb=20,
    role=role,
)
INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: .
+INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
Let’s now create the monitoring schedule. Notice how we specify the record_preprocessor_script using the S3 location where we uploaded our script.
We are going to set up the monitoring schedule to run every hour. Keep in mind that SageMaker has a buffer period of 20 minutes to schedule an execution.
import time

from sagemaker.model_monitor import CronExpressionGenerator

data_monitor.create_monitoring_schedule(
    monitor_schedule_name="penguins-data-monitoring-schedule",
    endpoint_input=ENDPOINT,
    record_preprocessor_script=data_quality_preprocessor,
    statistics=f"{DATA_QUALITY_LOCATION}/statistics.json",
    constraints=f"{DATA_QUALITY_LOCATION}/constraints.json",
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    output_s3_uri=DATA_QUALITY_LOCATION,
    enable_cloudwatch_metrics=True,
)

# Let's give SageMaker some time to process the
# monitoring job before we start it.
time.sleep(10)
data_monitor.start_monitoring_schedule()
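The schedule is created asynchronously, so it’s worth confirming it is active before waiting for the first hourly execution. A quick check like this one works with the monitor we just configured:
# The schedule should report "Scheduled" once SageMaker has registered it.
schedule = data_monitor.describe_schedule()
print(schedule["MonitoringScheduleStatus"])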
Step 6 - Checking Violations
+After the monitoring schedule runs for the first time, we can check the results of the last execution. If the job completed successfully, we can check if there are any violations.
def check_execution(monitoring_schedule):
    """Check the execution of the Monitoring Job.

    This function checks the execution of the Monitoring
    Job and prints out the list of violations if the job
    completed.
    """
    try:
        executions = monitoring_schedule.list_executions()

        if executions:
            execution = executions[-1].describe()
            print(f"Processing Job Status: {execution['ProcessingJobStatus']}")

            if execution["ProcessingJobStatus"] == "Completed":
                print(f"Exit Message: \"{execution['ExitMessage']}\"")
                print(
                    f"Last Modified Time: {execution['LastModifiedTime']}",
                    end="\n\n",
                )
                print("Execution:")
                print(json.dumps(execution, default=str, indent=2), end="\n\n")

                latest_monitoring_violations = (
                    monitoring_schedule.latest_monitoring_constraint_violations()
                )
                response = json.loads(
                    S3Downloader.read_file(latest_monitoring_violations.file_s3_uri),
                )
                print("Violations:")
                print(json.dumps(response, indent=2))
    except Exception as e:
        print(e)


check_execution(data_monitor)
Processing Job Status: Completed
+Exit Message: "Completed: Job completed successfully with no violations."
+Last Modified Time: 2024-03-30 14:15:49.146000-04:00
+
+Execution:
+{
+ "ProcessingInputs": [
+ {
+ "InputName": "baseline",
+ "AppManaged": false,
+ "S3Input": {
+ "S3Uri": "s3://mlschool/penguins/monitoring/data-quality/statistics.json",
+ "LocalPath": "/opt/ml/processing/baseline/stats",
+ "S3DataType": "S3Prefix",
+ "S3InputMode": "File",
+ "S3DataDistributionType": "FullyReplicated"
+ }
+ },
+ {
+ "InputName": "constraints",
+ "AppManaged": false,
+ "S3Input": {
+ "S3Uri": "s3://mlschool/penguins/monitoring/data-quality/constraints.json",
+ "LocalPath": "/opt/ml/processing/baseline/constraints",
+ "S3DataType": "S3Prefix",
+ "S3InputMode": "File",
+ "S3DataDistributionType": "FullyReplicated"
+ }
+ },
+ {
+ "InputName": "pre_processor_script",
+ "AppManaged": false,
+ "S3Input": {
+ "S3Uri": "s3://mlschool/penguins/monitoring/data_quality_preprocessor.py",
+ "LocalPath": "/opt/ml/processing/code/preprocessing",
+ "S3DataType": "S3Prefix",
+ "S3InputMode": "File",
+ "S3DataDistributionType": "FullyReplicated"
+ }
+ },
+ {
+ "InputName": "endpoint_input_1",
+ "AppManaged": false,
+ "S3Input": {
+ "S3Uri": "s3://mlschool/penguins/monitoring/data-capture/penguins-endpoint/AllTraffic/2024/03/30/17",
+ "LocalPath": "/opt/ml/processing/input/endpoint/penguins-endpoint/AllTraffic/2024/03/30/17",
+ "S3DataType": "S3Prefix",
+ "S3InputMode": "File",
+ "S3DataDistributionType": "FullyReplicated",
+ "S3CompressionType": "None"
+ }
+ }
+ ],
+ "ProcessingOutputConfig": {
+ "Outputs": [
+ {
+ "OutputName": "result",
+ "S3Output": {
+ "S3Uri": "s3://mlschool/penguins/monitoring/data-quality/penguins-endpoint/penguins-data-monitoring-schedule/2024/03/30/18",
+ "LocalPath": "/opt/ml/processing/output",
+ "S3UploadMode": "Continuous"
+ },
+ "AppManaged": false
+ }
+ ]
+ },
+ "ProcessingJobName": "model-monitoring-202403301800-17aa1fca873fac795ffba24a",
+ "ProcessingResources": {
+ "ClusterConfig": {
+ "InstanceCount": 1,
+ "InstanceType": "ml.m5.xlarge",
+ "VolumeSizeInGB": 20
+ }
+ },
+ "StoppingCondition": {
+ "MaxRuntimeInSeconds": 1800
+ },
+ "AppSpecification": {
+ "ImageUri": "156813124566.dkr.ecr.us-east-1.amazonaws.com/sagemaker-model-monitor-analyzer"
+ },
+ "Environment": {
+ "baseline_constraints": "/opt/ml/processing/baseline/constraints/constraints.json",
+ "baseline_statistics": "/opt/ml/processing/baseline/stats/statistics.json",
+ "dataset_format": "{\"sagemakerCaptureJson\":{\"captureIndexNames\":[\"endpointInput\",\"endpointOutput\"]}}",
+ "dataset_source": "/opt/ml/processing/input/endpoint",
+ "end_time": "2024-03-30T18:00:00Z",
+ "metric_time": "2024-03-30T17:00:00Z",
+ "monitoring_input_type": "ENDPOINT_INPUT",
+ "output_path": "/opt/ml/processing/output",
+ "publish_cloudwatch_metrics": "Enabled",
+ "record_preprocessor_script": "/opt/ml/processing/code/preprocessing/data_quality_preprocessor.py",
+ "sagemaker_endpoint_name": "penguins-endpoint",
+ "sagemaker_monitoring_schedule_name": "penguins-data-monitoring-schedule",
+ "start_time": "2024-03-30T17:00:00Z"
+ },
+ "RoleArn": "arn:aws:iam::325223348818:role/service-role/AmazonSageMaker-ExecutionRole-20230312T160501",
+ "ProcessingJobArn": "arn:aws:sagemaker:us-east-1:325223348818:processing-job/model-monitoring-202403301800-17aa1fca873fac795ffba24a",
+ "ProcessingJobStatus": "Completed",
+ "ExitMessage": "Completed: Job completed successfully with no violations.",
+ "ProcessingEndTime": "2024-03-30 14:15:48.732000-04:00",
+ "ProcessingStartTime": "2024-03-30 14:14:14.760000-04:00",
+ "LastModifiedTime": "2024-03-30 14:15:49.146000-04:00",
+ "CreationTime": "2024-03-30 14:09:54.896000-04:00",
+ "MonitoringScheduleArn": "arn:aws:sagemaker:us-east-1:325223348818:monitoring-schedule/penguins-data-monitoring-schedule",
+ "ResponseMetadata": {
+ "RequestId": "4e348652-7dff-4c40-96fb-b944aa6ed83b",
+ "HTTPStatusCode": 200,
+ "HTTPHeaders": {
+ "x-amzn-requestid": "4e348652-7dff-4c40-96fb-b944aa6ed83b",
+ "content-type": "application/x-amz-json-1.1",
+ "content-length": "3233",
+ "date": "Sat, 30 Mar 2024 18:34:16 GMT"
+ },
+ "RetryAttempts": 0
+ }
+}
+
+Violations:
+{
+ "violations": []
+}
+Step 7 - Deleting Monitoring Schedule
+Once we are done with it, we can delete the Data Monitoring schedule.
try:
    data_monitor.delete_monitoring_schedule()
except Exception as e:
    print(e)
Session 19 - Model Monitoring
This session creates a Monitoring Job to monitor the quality of the model’s predictions. The schedule will run periodically, compare the predictions generated by the endpoint against their ground truth labels, and check the resulting metrics against the model quality baseline we generated before.
+Check Amazon SageMaker Model Monitor for an explanation of how to use SageMaker’s Model Monitoring functionality. Monitor models for data and model quality, bias, and explainability is a much more extensive guide to monitoring in Amazon SageMaker.
+Step 1 - Configuring Ground Truth Location
+Let’s start by defining the location where SageMaker will store the ground-truth generated by labeling the data received by the endpoint.
+= f"{S3_LOCATION}/monitoring/groundtruth" GROUND_TRUTH_LOCATION
Step 2 - Deploying the Model
+Let’s deploy the latest approved model to an endpoint.
+Here, we can reuse the function we created before to deploy the model.
+ deploy_model()
Step 3 - Generating Fake Traffic
+To test the monitoring functionality, we need to generate traffic to the endpoint. We can use the function we created before to generate fake traffic to the endpoint.
+ generate_fake_traffic()
We can check the location where the endpoint stores the captured data, download a file, and display its content. It may take a few minutes for the first few files to show up in S3.
These files contain the data captured by the endpoint in a SageMaker-specific JSON-line format. Each inference request is captured in a single line in the jsonl file. The line contains both the input and output merged together:
files = S3Downloader.list(DATA_CAPTURE_DESTINATION)
if len(files):
    lines = S3Downloader.read_file(files[-1])
    print(f"File: {files[-1]}")
    print(json.dumps(json.loads(lines.split("\n")[0]), indent=2))
File: s3://mlschool/penguins/monitoring/data-capture/penguins-endpoint/AllTraffic/2024/03/30/18/40-45-068-0f144be9-ac73-4c4e-a0c7-82b1ba7db88b.jsonl
+{
+ "captureData": {
+ "endpointInput": {
+ "observedContentType": "text/csv",
+ "mode": "INPUT",
+ "data": "Torgersen,39.1,18.7,181.0,3750.0,MALE",
+ "encoding": "CSV"
+ },
+ "endpointOutput": {
+ "observedContentType": "text/csv; charset=utf-8",
+ "mode": "OUTPUT",
+ "data": "Adelie,0.964408875\n",
+ "encoding": "CSV"
+ }
+ },
+ "eventMetadata": {
+ "eventId": "08a239af-c98c-4984-b9bf-4ea049d88617",
+ "inferenceId": "0",
+ "inferenceTime": "2024-03-30T18:40:45Z"
+ },
+ "eventVersion": "0"
+}
+Step 4 - Generating Fake Labels
+To test the performance of the model, we need to label the samples captured by the endpoint. We can simulate the labeling process by generating a random label for every sample. Check Ingest Ground Truth Labels and Merge Them With Predictions for more information about this.
import random
from datetime import datetime, timezone

from sagemaker.s3 import S3Uploader

records = []
for inference_id in range(len(data)):
    random.seed(inference_id)

    records.append(
        json.dumps(
            {
                "groundTruthData": {
                    # For testing purposes, we will generate a random
                    # label for each request.
                    "data": random.choice(["Adelie", "Chinstrap", "Gentoo"]),
                    "encoding": "CSV",
                },
                "eventMetadata": {
                    # This value should match the id of the request
                    # captured by the endpoint.
                    "eventId": str(inference_id),
                },
                "eventVersion": "0",
            },
        ),
    )

groundtruth_payload = "\n".join(records)
upload_time = datetime.now(tz=timezone.utc)
uri = f"{GROUND_TRUTH_LOCATION}/{upload_time:%Y/%m/%d/%H/%M%S}.jsonl"
S3Uploader.upload_string_as_file_body(groundtruth_payload, uri)
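If you want to confirm the labels landed where the monitoring job expects them, you can list the ground truth location. A quick sketch using the same S3Downloader helper:
# The labels are organized by year/month/day/hour under GROUND_TRUTH_LOCATION,
# so the file we just uploaded should appear at the end of this list.
S3Downloader.list(GROUND_TRUTH_LOCATION)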
Step 5 - Creating Monitoring Schedule
+To set up a Model Quality Monitoring Job, we can use the ModelQualityMonitor class.
+Check Amazon SageMaker Model Quality Monitor for a complete tutorial on how to run a Model Monitoring Job in SageMaker.
from sagemaker.model_monitor import ModelQualityMonitor

model_monitor = ModelQualityMonitor(
    instance_type=config["instance_type"],
    instance_count=1,
    max_runtime_in_seconds=1800,
    volume_size_in_gb=20,
    role=role,
)
Let’s now create the monitoring schedule. The EndpointInput instance configures the attribute the monitoring job should use to determine the prediction from the model.
+We are going to set up the monitoring schedule to run every hour. Keep in mind that SageMaker has a buffer period of 20 minutes to schedule an execution.
import time

from sagemaker.model_monitor import CronExpressionGenerator, EndpointInput

model_monitor.create_monitoring_schedule(
    monitor_schedule_name="penguins-model-monitoring-schedule",
    endpoint_input=EndpointInput(
        endpoint_name=ENDPOINT,
        # The first attribute is the prediction made
        # by the model. For example, here is a
        # potential output from the model:
        # [Adelie,0.977324724\n]
        inference_attribute="0",
        destination="/opt/ml/processing/input_data",
    ),
    problem_type="MulticlassClassification",
    ground_truth_input=GROUND_TRUTH_LOCATION,
    constraints=f"{MODEL_QUALITY_LOCATION}/constraints.json",
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    output_s3_uri=MODEL_QUALITY_LOCATION,
    enable_cloudwatch_metrics=True,
)

# Let's give SageMaker some time to process the
# monitoring job before we start it.
time.sleep(10)
model_monitor.start_monitoring_schedule()
Step 6 - Checking Violations
+After the monitoring schedule runs for the first time, we can check the results of the last execution. If the job completed successfully, we can check if there are any violations.
+ check_execution(model_monitor)
Processing Job Status: Completed
+Exit Message: "CompletedWithViolations: Job completed successfully with 5 violations."
+Last Modified Time: 2024-03-30 15:18:36.431000-04:00
+
+Execution:
+{
+ "ProcessingInputs": [
+ {
+ "InputName": "constraints",
+ "AppManaged": false,
+ "S3Input": {
+ "S3Uri": "s3://mlschool/penguins/monitoring/model-quality/constraints.json",
+ "LocalPath": "/opt/ml/processing/baseline/constraints",
+ "S3DataType": "S3Prefix",
+ "S3InputMode": "File",
+ "S3DataDistributionType": "FullyReplicated"
+ }
+ },
+ {
+ "InputName": "endpoint_input_1",
+ "AppManaged": false,
+ "S3Input": {
+ "S3Uri": "s3://mlschool/penguins/monitoring/model-quality/merge/penguins-endpoint/AllTraffic/2024/03/30/18",
+ "LocalPath": "/opt/ml/processing/input_data/penguins-endpoint/AllTraffic/2024/03/30/18",
+ "S3DataType": "S3Prefix",
+ "S3InputMode": "File",
+ "S3DataDistributionType": "FullyReplicated",
+ "S3CompressionType": "None"
+ }
+ }
+ ],
+ "ProcessingOutputConfig": {
+ "Outputs": [
+ {
+ "OutputName": "result",
+ "S3Output": {
+ "S3Uri": "s3://mlschool/penguins/monitoring/model-quality/penguins-endpoint/penguins-model-monitoring-schedule/2024/03/30/19",
+ "LocalPath": "/opt/ml/processing/output",
+ "S3UploadMode": "Continuous"
+ },
+ "AppManaged": false
+ }
+ ]
+ },
+ "ProcessingJobName": "model-quality-monitoring-202403301900-896e874cc3a809cdf37d6cc2",
+ "ProcessingResources": {
+ "ClusterConfig": {
+ "InstanceCount": 1,
+ "InstanceType": "ml.m5.xlarge",
+ "VolumeSizeInGB": 20
+ }
+ },
+ "StoppingCondition": {
+ "MaxRuntimeInSeconds": 1800
+ },
+ "AppSpecification": {
+ "ImageUri": "156813124566.dkr.ecr.us-east-1.amazonaws.com/sagemaker-model-monitor-analyzer"
+ },
+ "Environment": {
+ "analysis_type": "MODEL_QUALITY",
+ "baseline_constraints": "/opt/ml/processing/baseline/constraints/constraints.json",
+ "dataset_format": "{\"sagemakerMergeJson\":{\"captureIndexNames\":[\"endpointOutput\"],\"originalDatasetFormat\":null}}",
+ "dataset_source": "/opt/ml/processing/input_data",
+ "end_time": "2024-03-30T19:00:00Z",
+ "inference_attribute": "0",
+ "metric_time": "2024-03-30T18:00:00Z",
+ "monitoring_input_type": "ENDPOINT_INPUT",
+ "output_path": "/opt/ml/processing/output",
+ "problem_type": "MulticlassClassification",
+ "publish_cloudwatch_metrics": "Enabled",
+ "sagemaker_endpoint_name": "penguins-endpoint",
+ "sagemaker_monitoring_schedule_name": "penguins-model-monitoring-schedule",
+ "start_time": "2024-03-30T18:00:00Z"
+ },
+ "RoleArn": "arn:aws:iam::325223348818:role/service-role/AmazonSageMaker-ExecutionRole-20230312T160501",
+ "ProcessingJobArn": "arn:aws:sagemaker:us-east-1:325223348818:processing-job/model-quality-monitoring-202403301900-896e874cc3a809cdf37d6cc2",
+ "ProcessingJobStatus": "Completed",
+ "ExitMessage": "CompletedWithViolations: Job completed successfully with 5 violations.",
+ "ProcessingEndTime": "2024-03-30 15:18:35.908000-04:00",
+ "ProcessingStartTime": "2024-03-30 15:16:52.922000-04:00",
+ "LastModifiedTime": "2024-03-30 15:18:36.431000-04:00",
+ "CreationTime": "2024-03-30 15:12:22.569000-04:00",
+ "MonitoringScheduleArn": "arn:aws:sagemaker:us-east-1:325223348818:monitoring-schedule/penguins-model-monitoring-schedule",
+ "ResponseMetadata": {
+ "RequestId": "85abb737-543a-4c92-928b-4a293c599f18",
+ "HTTPStatusCode": 200,
+ "HTTPHeaders": {
+ "x-amzn-requestid": "85abb737-543a-4c92-928b-4a293c599f18",
+ "content-type": "application/x-amz-json-1.1",
+ "content-length": "2660",
+ "date": "Sat, 30 Mar 2024 19:33:23 GMT"
+ },
+ "RetryAttempts": 0
+ }
+}
+
+Violations:
+{
+ "violations": [
+ {
+ "constraint_check_type": "LessThanThreshold",
+ "description": "Metric weightedF2 with 0.3518870011147463 +/- 0.006730551075118943 was LessThanThreshold '1.0'",
+ "metric_name": "weightedF2"
+ },
+ {
+ "constraint_check_type": "LessThanThreshold",
+ "description": "Metric accuracy with 0.35755813953488375 +/- 0.007228798319401767 was LessThanThreshold '1.0'",
+ "metric_name": "accuracy"
+ },
+ {
+ "constraint_check_type": "LessThanThreshold",
+ "description": "Metric weightedRecall with 0.35755813953488375 +/- 0.007228798319401765 was LessThanThreshold '1.0'",
+ "metric_name": "weightedRecall"
+ },
+ {
+ "constraint_check_type": "LessThanThreshold",
+ "description": "Metric weightedPrecision with 0.35624627310673823 +/- 0.008910206698382583 was LessThanThreshold '1.0'",
+ "metric_name": "weightedPrecision"
+ },
+ {
+ "constraint_check_type": "LessThanThreshold",
+ "description": "Metric weightedF1 with 0.34769539574160063 +/- 0.006655863903356062 was LessThanThreshold '1.0'",
+ "metric_name": "weightedF1"
+ }
+ ]
+}
+Step 7 - Deleting Monitoring Schedule
Once we are done with it, we can delete the Model Monitoring schedule.
try:
    model_monitor.delete_monitoring_schedule()
except Exception as e:
    print(e)
Session 20 - Shadow Deployments
+This session configures an endpoint running a production and a shadow variant. Check Safely validate models in production for more information.
Step 1 - Getting The Latest Models
+We want to deploy the two latest approved models from the Model Registry to the same endpoint. The latest version of the model will act as the Shadow variant, and the previous version will act as the Production variant.
response = sagemaker_client.list_model_packages(
    ModelPackageGroupName=BASIC_MODEL_PACKAGE_GROUP,
    ModelApprovalStatus="Approved",
    SortBy="CreationTime",
    MaxResults=2,
)

if response["ModelPackageSummaryList"]:
    production_package = response["ModelPackageSummaryList"][1]["ModelPackageArn"]
    shadow_package = response["ModelPackageSummaryList"][0]["ModelPackageArn"]
else:
    production_package = None
    shadow_package = None

print(f"Production package: {production_package}")
print(f"Shadow package: {shadow_package}")
Production package: arn:aws:sagemaker:us-east-1:325223348818:model-package/basic-penguins/5
+Shadow package: arn:aws:sagemaker:us-east-1:325223348818:model-package/basic-penguins/6
+Step 2 - Creating the Models
+We want to deploy the two packages to a new endpoint. We’ll use the boto3 API to deploy these models.
+Let’s start by creating the SageMaker Models.
import time

# We'll use a different name for this endpoint.
SHADOW_DEPLOYMENT_ENDPOINT = "shadow-penguins-endpoint"

# The timestamp will help us create unique names
# for the models.
timestamp = time.strftime("%m%d%H%M%S", time.localtime())
Let’s now create the Production model.
+= f"{SHADOW_DEPLOYMENT_ENDPOINT}-production-{timestamp}"
+ production_model_name
+
+ sagemaker_client.create_model(=production_model_name,
+ ModelName=role,
+ ExecutionRoleArn=[{"ModelPackageName": production_package}],
+ Containers )
And now we can create the second model.
+= f"{SHADOW_DEPLOYMENT_ENDPOINT}-shadow-{timestamp}"
+ shadow_model_name
+
+ sagemaker_client.create_model(=shadow_model_name,
+ ModelName=role,
+ ExecutionRoleArn=[{"ModelPackageName": shadow_package}],
+ Containers )
{'ModelArn': 'arn:aws:sagemaker:us-east-1:325223348818:model/shadow-penguins-endpoint-shadow-0331125310',
+ 'ResponseMetadata': {'RequestId': '21aaeb87-98e5-49c3-8912-1143ef75f86c',
+ 'HTTPStatusCode': 200,
+ 'HTTPHeaders': {'x-amzn-requestid': '21aaeb87-98e5-49c3-8912-1143ef75f86c',
+ 'content-type': 'application/x-amz-json-1.1',
+ 'content-length': '104',
+ 'date': 'Sun, 31 Mar 2024 16:53:13 GMT'},
+ 'RetryAttempts': 0}}
+Step 3 - Creating the Endpoint Configuration
We can now create the Endpoint Configuration using the two models.
+Let’s define the location where SageMaker will output the information captured by the Shadow variant.
+= f"{S3_LOCATION}/endpoint/" SHADOW_DATA_DESTINATION
We can create the Endpoint Configuration now.
+= f"{SHADOW_DEPLOYMENT_ENDPOINT}-config-{timestamp}"
+ endpoint_config_name
+
+ sagemaker_client.create_endpoint_config(=endpoint_config_name,
+ EndpointConfigName=[
+ ProductionVariants
+ {"ModelName": production_model_name,
+ "InstanceType": "ml.m5.xlarge",
+ "InitialVariantWeight": 1,
+ "InitialInstanceCount": 1,
+ "VariantName": "ProductionTraffic",
+
+ },
+ ],=[
+ ShadowProductionVariants
+ {"ModelName": shadow_model_name,
+ "InstanceType": "ml.m5.xlarge",
+ "InitialVariantWeight": 1,
+ "InitialInstanceCount": 1,
+ "VariantName": "ShadowTraffic",
+
+ },
+ ],={
+ DataCaptureConfig"EnableCapture": True,
+ "InitialSamplingPercentage": 100,
+ "DestinationS3Uri": SHADOW_DATA_DESTINATION,
+ "CaptureOptions": [
+ "CaptureMode": "Input"},
+ {"CaptureMode": "Output"},
+ {
+ ],"CaptureContentTypeHeader": {
+ "CsvContentTypes": ["text/csv", "application/octect-stream"],
+ "JsonContentTypes": ["application/json", "application/octect-stream"],
+
+ },
+ }, )
{'EndpointConfigArn': 'arn:aws:sagemaker:us-east-1:325223348818:endpoint-config/shadow-penguins-endpoint-config-0331125310',
+ 'ResponseMetadata': {'RequestId': '24973c88-6726-4737-ae91-1138b77f5775',
+ 'HTTPStatusCode': 200,
+ 'HTTPHeaders': {'x-amzn-requestid': '24973c88-6726-4737-ae91-1138b77f5775',
+ 'content-type': 'application/x-amz-json-1.1',
+ 'content-length': '123',
+ 'date': 'Sun, 31 Mar 2024 16:53:17 GMT'},
+ 'RetryAttempts': 0}}
+Step 4 - Creating the Endpoint
+Finally, we can create the Endpoint using the Endpoint Configuration we created before.
sagemaker_client.create_endpoint(
    EndpointName=SHADOW_DEPLOYMENT_ENDPOINT,
    EndpointConfigName=endpoint_config_name,
)
{'EndpointArn': 'arn:aws:sagemaker:us-east-1:325223348818:endpoint/shadow-penguins-endpoint',
+ 'ResponseMetadata': {'RequestId': 'df5ebd20-f59f-4895-96f4-18da3beb0cc4',
+ 'HTTPStatusCode': 200,
+ 'HTTPHeaders': {'x-amzn-requestid': 'df5ebd20-f59f-4895-96f4-18da3beb0cc4',
+ 'content-type': 'application/x-amz-json-1.1',
+ 'content-length': '92',
+ 'date': 'Sun, 31 Mar 2024 16:53:21 GMT'},
+ 'RetryAttempts': 0}}
+Step 5 - Generating Traffic
+Let’s generate some traffic to the endpoint so we can test the Shadow variant.
+= """
+ payload 0.6569590202313976,-1.0813829646495108,1.2097102831892812,0.9226343641317372,1.0,0.0,0.0
+-0.7751048801481084,0.8822689351285553,-1.2168066120762704,0.9226343641317372,0.0,1.0,0.0
+-0.837387834894918,0.3386660813829646,-0.26237731892812,-1.92351941317372,0.0,0.0,1.0
+"""
+
+= Predictor(
+ predictor =SHADOW_DEPLOYMENT_ENDPOINT,
+ endpoint_name=CSVSerializer(),
+ serializer=JSONDeserializer(),
+ deserializer
+ )
+try:
+= predictor.predict(payload)
+ response print(json.dumps(response, indent=2))
+ except Exception as e:
+print(e)
{
+ "predictions": [
+ [
+ 0.0403208546,
+ 0.0210227184,
+ 0.93865639
+ ],
+ [
+ 0.689678669,
+ 0.17514421,
+ 0.135177106
+ ],
+ [
+ 0.960919619,
+ 0.0248175282,
+ 0.0142629147
+ ]
+ ]
+}
+Step 6 - Checking Captured Data
+Let’s check the location where the endpoint stores the captured data, download a file, and display its content. It may take a few minutes for the first few files to show up in S3.
+The endpoint will capture the data for both the Production and the Shadow variants.
files = S3Downloader.list(
    f"{SHADOW_DATA_DESTINATION}{SHADOW_DEPLOYMENT_ENDPOINT}/ShadowTraffic/",
)
if len(files):
    lines = S3Downloader.read_file(files[-1])
    print(f"File: {files[-1]}")
    print(json.dumps(json.loads(lines.split("\n")[0]), indent=2))
File: s3://mlschool/penguins/endpoint/shadow-penguins-endpoint/ShadowTraffic/2024/03/30/21/28-43-624-8f47e605-6bd2-44dd-bd91-293f29fd227e.jsonl
+{
+ "captureData": {
+ "endpointInput": {
+ "observedContentType": "text/csv",
+ "mode": "INPUT",
+ "data": "\n0.6569590202313976,-1.0813829646495108,1.2097102831892812,0.9226343641317372,1.0,0.0,0.0\n-0.7751048801481084,0.8822689351285553,-1.2168066120762704,0.9226343641317372,0.0,1.0,0.0\n-0.837387834894918,0.3386660813829646,-0.26237731892812,-1.92351941317372,0.0,0.0,1.0\n",
+ "encoding": "CSV"
+ },
+ "endpointOutput": {
+ "observedContentType": "application/json",
+ "mode": "OUTPUT",
+ "data": "{ \"predictions\": [[0.124825425, 0.0847824216, 0.79039216], [0.766525269, 0.220783874, 0.0126908608], [0.944253445, 0.0292692278, 0.0264772158] ]}",
+ "encoding": "JSON"
+ }
+ },
+ "eventMetadata": {
+ "eventId": "98c3c22e-20af-401c-9ca6-6d67d734a83f",
+ "invocationSource": "ShadowExperiment",
+ "inferenceTime": "2024-03-30T21:28:43Z"
+ },
+ "eventVersion": "0"
+}
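The Production variant stores its captures under a ProductionTraffic prefix next to the ShadowTraffic one, so you can list those files the same way if you want to compare both sides of the experiment. A small sketch reusing the same helpers:
# List the files captured for the Production variant so we can compare
# its responses with the ones from the Shadow variant.
production_files = S3Downloader.list(
    f"{SHADOW_DATA_DESTINATION}{SHADOW_DEPLOYMENT_ENDPOINT}/ProductionTraffic/",
)
if len(production_files):
    print(f"File: {production_files[-1]}")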
+Step 7 - Deleting the Endpoint
+Let’s now delete the endpoint.
try:
    sagemaker_client.delete_endpoint(EndpointName=SHADOW_DEPLOYMENT_ENDPOINT)
except Exception as e:
    print(e)
Running the Pipeline
+We can run any of the pipelines we defined before by enabling the cell below and specifying the pipeline we want to run.
session3_pipeline.start()
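For example, to run the pipeline we built in Session 17 and follow its progress from the notebook, a sketch like this should work; the execution object it relies on comes from the SageMaker Python SDK:
# Start an execution of the Session 17 pipeline and wait for it to finish.
execution = session17_pipeline.start()
execution.wait()

# Display the status of every step in the execution.
execution.list_steps()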
Deleting the Endpoint
+After testing the endpoint, we need to ensure we delete it.
try:
    sagemaker_client.delete_endpoint(EndpointName=ENDPOINT)
except Exception as e:
    print(e)