Automating Train-and-Deploy ML Workflows with Amazon SageMaker Pipelines

Jupyter Notebooks are excellent for exploring data and building machine learning models. However, when it is time to retrain a model on new data, manually re-running notebook cells is slow and error-prone: practitioners must execute numerous cells in the correct sequence and hope that no steps are missed.

A SageMaker Pipeline offers a solution by automating the entire workflow. It transforms manual notebook steps into a repeatable and automated process. This guide provides a complete walkthrough for building a production-grade pipeline, including all the necessary code and clear explanations for each step.

Why Use SageMaker Pipelines?

Notebooks are ideal for experimentation but not for production workflows. Relying solely on notebooks for repeated tasks can lead to several challenges:

  • Manual and Slow: Rerunning notebooks is error-prone and time-consuming. Cells can be executed out of order, or critical steps may be missed.
  • Environment Discrepancies: Differences in Python libraries between local and cloud environments can cause deployment failures.
  • Hidden Errors: Hard-coded values in feature engineering code can silently produce incorrect features, and therefore inaccurate models, when the underlying data changes.
  • Lack of Traceability: Without a formal pipeline, tracking which dataset produced which model is difficult.

A SageMaker Pipeline solves these issues by defining workflows as a sequence of connected, repeatable steps. Once the pipeline is built, SageMaker executes it consistently, ensuring reliability, traceability, and reproducibility.

Our Goal: The Automated Train-and-Deploy Workflow

We will build a simple, three-step pipeline that starts with raw data in Amazon S3 and ends with a live prediction API.

DAG View of Pipeline
  • Preprocess: Reads raw data, cleans it, and prepares it for training.
  • Train: Trains an XGBoost model on the clean data.
  • Deploy: Creates a live, real-time API endpoint from the trained model.

Prerequisites

Before proceeding, ensure the following prerequisites are in place:

  • An active AWS account.
  • Basic knowledge of Python and some MLOps concepts.
  • Familiarity with the AWS Management Console, Amazon S3, and Amazon SageMaker AI.
  • A provisioned SageMaker Studio environment.
  • An IAM (Identity and Access Management) role with AmazonSageMakerFullAccess permissions to allow the pipeline to create and manage resources.

Steps to Build the Amazon SageMaker Pipeline

Step 1: Organize Your Project Files

A clean project structure is essential for maintainability. Inside the SageMaker Studio environment, a project folder should be created with the following layout, which keeps the code organized and easy to manage.

/
└── fraud-detection-pipeline/
    │
    ├── scripts/
    │   ├── preprocess.py      # Cleans the data
    │   ├── inference.py       # Code for the live API
    │   ├── deploy.py          # Script that creates the API
    │   ├── runner.sh          # Helper script for the deployment step
    │   └── requirements.txt   # Extra libraries for the deployment step
    │
    └── Launcher.ipynb         # Our "master" notebook to start the pipeline

The raw data for this example is a simple CSV file stored in an S3 bucket.

Example Of The Dataset Stored in S3
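
For reference, a record in this CSV contains the columns that the preprocessing script in the next step relies on. The snippet is only an illustration: the column names come from scripts/preprocess.py, while the values are made up.

import pandas as pd

# Illustrative row only; the real file is generated_dataset_unified.csv in your S3 bucket
sample = pd.DataFrame([{
    "policy_purchase_date": "2023-01-15",
    "incident_date": "2023-03-02",
    "policy_type": "auto",                # categorical; label-encoded during preprocessing
    "premium_amount": 1200.0,
    "claim_amount": 4500.0,
    "incident_location": "Springfield",
    "incident_description": "Rear-end collision at an intersection",
    "is_fraud": 0,                        # binary target column
}])
print(sample.dtypes)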

Step 2: Create the Building Blocks (The Scripts)

Next, the scripts for each step of the workflow are created.

1. The Preprocessing Script

This script reads raw data, performs feature engineering, and splits the data into training and validation sets. It also saves the tools it uses, such as the LabelEncoder and TfidfVectorizer. These saved “artifacts” are essential for ensuring that live data is processed in the same way as the training data.

File: scripts/preprocess.py

import pandas as pd
import argparse
import os
import joblib
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import LabelEncoder

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-data-path", type=str, required=True)
    args = parser.parse_args()
    input_file = os.path.join(args.input_data_path, "generated_dataset_unified.csv")
    df = pd.read_csv(input_file)
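    # Parse dates and engineer features: date gaps, ratios, and per-policy-type premium comparison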
    df['policy_purchase_date'] = pd.to_datetime(df['policy_purchase_date'])
    df['incident_date'] = pd.to_datetime(df['incident_date'])
    df['days_between_purchase_and_incident'] = (df['incident_date'] - df['policy_purchase_date']).dt.days
    df['incident_weekday'] = df['incident_date'].dt.weekday
    df['incident_month'] = df['incident_date'].dt.month
    df['claim_to_premium_ratio'] = df['claim_amount'] / df['premium_amount']
    avg_premium_per_type = df.groupby('policy_type')['premium_amount'].transform('mean')
    df['premium_vs_type_avg'] = df['premium_amount'] / avg_premium_per_type
    label_encoder = LabelEncoder()
    df['policy_type_encoded'] = label_encoder.fit_transform(df['policy_type'])
    df['text_combined'] = df['incident_location'] + " " + df['incident_description']
    vectorizer = TfidfVectorizer(max_features=100)
    text_embeddings = vectorizer.fit_transform(df['text_combined']).toarray()
    embedding_df = pd.DataFrame(text_embeddings, columns=[f'embed_{i}' for i in range(text_embeddings.shape[1])])
    df = pd.concat([df.reset_index(drop=True), embedding_df], axis=1)
    df_processed = df.drop(['policy_purchase_date', 'incident_date', 'incident_location', 'incident_description', 'policy_type', 'text_combined'], axis=1)
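    # SageMaker's built-in XGBoost expects the label in the first column of a headerless CSV,
    # so move is_fraud to the front and remember the remaining feature column order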
    is_fraud_col = df_processed.pop('is_fraud')
    df_processed.insert(0, 'is_fraud', is_fraud_col)
    train_columns = list(df_processed.columns[1:])
    train_df, validation_df = train_test_split(df_processed, test_size=0.2, random_state=42)
    processed_data_dir = "/opt/ml/processing"
    train_dir = os.path.join(processed_data_dir, "train")
    validation_dir = os.path.join(processed_data_dir, "validation")
    artifacts_dir = os.path.join(processed_data_dir, "artifacts")
    os.makedirs(train_dir, exist_ok=True)
    os.makedirs(validation_dir, exist_ok=True)
    os.makedirs(artifacts_dir, exist_ok=True)
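    # Write headerless CSV splits for XGBoost and persist the fitted preprocessing objects for inference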
    train_df.to_csv(os.path.join(train_dir, "train.csv"), index=False, header=False)
    validation_df.to_csv(os.path.join(validation_dir, "validation.csv"), index=False, header=False)
    joblib.dump(label_encoder, os.path.join(artifacts_dir, "label_encoder.joblib"))
    joblib.dump(vectorizer, os.path.join(artifacts_dir, "vectorizer.joblib"))
    joblib.dump(train_columns, os.path.join(artifacts_dir, "train_columns.joblib"))
    print("Preprocessing script complete.")

2. The Inference Script

This code runs inside the live API endpoint. The deployment creates a two-container serial inference pipeline: the first container (Scikit-learn) executes this script, loading the saved artifacts and transforming incoming JSON data into the CSV string expected by the second container (XGBoost).

File: scripts/inference.py

import pandas as pd
import joblib
import os
import json

def model_fn(model_dir):
    print("Loading preprocessing artifacts...")
    artifacts = {
        "label_encoder": joblib.load(os.path.join(model_dir, "label_encoder.joblib")),
        "vectorizer": joblib.load(os.path.join(model_dir, "vectorizer.joblib")),
        "train_columns": joblib.load(os.path.join(model_dir, "train_columns.joblib")),
    }
    print("Artifacts loaded successfully.")
    return artifacts

def input_fn(request_body, request_content_type):
    if request_content_type == "application/json":
        return json.loads(request_body)
    raise ValueError(f"Unsupported content type: {request_content_type}")

def predict_fn(input_data, artifacts):
    print("Running preprocessing...")
    df = pd.DataFrame([input_data])
    df['policy_purchase_date'] = pd.to_datetime(df['policy_purchase_date'])
    df['incident_date'] = pd.to_datetime(df['incident_date'])
    df['days_between_purchase_and_incident'] = (df['incident_date'] - df['policy_purchase_date']).dt.days
    df['incident_weekday'] = df['incident_date'].dt.weekday
    df['incident_month'] = df['incident_date'].dt.month
    
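    # NOTE: a fixed stand-in for the per-policy-type average premium computed during training;
    # revisit this constant if the training data changes significantly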
    avg_premium_per_type_value = 1500
    df['premium_vs_type_avg'] = df['premium_amount'] / avg_premium_per_type_value
    df['claim_to_premium_ratio'] = df['claim_amount'] / df['premium_amount']
    df['policy_type_encoded'] = artifacts["label_encoder"].transform(df['policy_type'])
    df['text_combined'] = df['incident_location'] + " " + df['incident_description']
    text_embeddings = artifacts["vectorizer"].transform(df['text_combined']).toarray()
    embedding_df = pd.DataFrame(text_embeddings, columns=[f'embed_{i}' for i in range(text_embeddings.shape[1])])
    
    df = pd.concat([df.reset_index(drop=True), embedding_df], axis=1)
    
    df_processed = df[artifacts["train_columns"]]
    
    print("Preprocessing complete. Returning CSV output.")
    return df_processed.to_csv(header=False, index=False)
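
Because these handler functions are plain Python, they can be smoke-tested locally before deployment. The sketch below is only an illustration: it assumes the three .joblib artifacts produced by the preprocessing step have been downloaded into a folder next to scripts/, it is run from inside scripts/, and the field values are made up (policy_type must be a category that appeared in the training data).

# Run from inside the scripts/ directory so inference.py is importable
import json
from inference import model_fn, input_fn, predict_fn

# Folder holding the downloaded label_encoder/vectorizer/train_columns .joblib files (assumed location)
artifacts = model_fn("../artifacts")

record = {
    "policy_purchase_date": "2023-01-15",
    "incident_date": "2023-03-02",
    "policy_type": "auto",
    "premium_amount": 1200.0,
    "claim_amount": 4500.0,
    "incident_location": "Springfield",
    "incident_description": "Rear-end collision at an intersection",
}
payload = input_fn(json.dumps(record), "application/json")
csv_row = predict_fn(payload, artifacts)
print(csv_row)  # one CSV line, in the column order the XGBoost container expects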

3. The Deployment Script

This script drives the final pipeline step. It receives the trained model's S3 URI and the preprocessor artifacts, packages them into a PipelineModel, and then creates or updates the live endpoint.

File: scripts/deploy.py

import os
import tarfile
import argparse
import sagemaker
from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
import boto3

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--endpoint-name", type=str, required=True)
    parser.add_argument("--role", type=str, required=True)
    parser.add_argument("--region", type=str, required=True)
    parser.add_argument("--model-data-s3-uri", type=str, required=True)
    args = parser.parse_args()

    boto_session = boto3.Session(region_name=args.region)
    sagemaker_session = sagemaker.Session(boto_session=boto_session)
    sm_client = boto_session.client("sagemaker")
    
    default_bucket = sagemaker_session.default_bucket()

    source_directory = "/opt/ml/processing/code"
    artifacts_path = "/opt/ml/processing/artifacts"
    
    output_dir = "/opt/ml/processing/output"
    os.makedirs(output_dir, exist_ok=True)
    artifacts_tar_path = os.path.join(output_dir, "artifacts.tar.gz")

    print("Packaging model artifacts (.joblib files)...")
    with tarfile.open(artifacts_tar_path, "w:gz") as tar:
        for filename in os.listdir(artifacts_path):
            if filename.endswith(".joblib"):
                tar.add(os.path.join(artifacts_path, filename), arcname=filename)

    artifacts_s3_uri = sagemaker_session.upload_data(
        path=artifacts_tar_path,
        bucket=default_bucket,
        key_prefix=f"{args.endpoint_name}/preprocessor-artifacts"
    )
    print(f"Uploaded preprocessor artifacts to: {artifacts_s3_uri}")

    preprocessor_model = Model(
        image_uri=sagemaker.image_uris.retrieve("sklearn", args.region, "1.2-1"),
        model_data=artifacts_s3_uri,
        role=args.role,
        entry_point="inference.py",
        source_dir=source_directory,
        sagemaker_session=sagemaker_session
    )
    
    xgboost_model = Model(
        image_uri=sagemaker.image_uris.retrieve("xgboost", args.region, "1.5-1"),
        model_data=args.model_data_s3_uri,
        role=args.role,
        sagemaker_session=sagemaker_session,
    )

    pipeline_model = PipelineModel(
        models=[preprocessor_model, xgboost_model],
        role=args.role,
        sagemaker_session=sagemaker_session
    )

    print(f"Deploying pipeline model to real-time endpoint '{args.endpoint_name}'...")
    
    try:
        sm_client.describe_endpoint(EndpointName=args.endpoint_name)
        update_existing_endpoint = True
        print(f"Endpoint '{args.endpoint_name}' exists. Will update.")
    except sm_client.exceptions.ClientError:
        update_existing_endpoint = False
        print(f"Endpoint '{args.endpoint_name}' does not exist. Will create.")

    pipeline_model.deploy(
        initial_instance_count=1,
        instance_type="ml.t2.medium",
        endpoint_name=args.endpoint_name,
        update_endpoint=update_existing_endpoint
    )
    
    print("Deployment request submitted successfully.")

4. The Helper Scripts

These small files manage the environment setup inside the deployment step. The requirements.txt file specifies dependencies, while runner.sh ensures they are installed before the Python deployment script runs.

File: scripts/requirements.txt

sagemaker>=2.194.0,<2.205.0
xgboost
numpy<2

File: scripts/runner.sh (This file must be made executable with chmod +x scripts/runner.sh)

#!/bin/bash
set -e
SCRIPT_DIR="/opt/ml/processing/code"
echo "Installing dependencies from ${SCRIPT_DIR}/requirements.txt..."
pip install -r "${SCRIPT_DIR}/requirements.txt"
echo "Running the deployment script: ${SCRIPT_DIR}/deploy.py..."
python "${SCRIPT_DIR}/deploy.py" "$@"
echo "Script finished successfully."

Step 3: The Command Center (The Launcher Notebook)

This is the only file that needs to be run manually. This notebook defines the pipeline’s structure, connects the inputs and outputs of each step, and instructs SageMaker to execute the entire workflow.

View Before Running The Pipeline

File: Launcher.ipynb

import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.estimator import Estimator
from sagemaker.image_uris import retrieve
from sagemaker.inputs import TrainingInput

# --- 1. CONFIGURATION ---
role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name

# --- Pipeline inputs and names ---
default_bucket = sagemaker_session.default_bucket()
input_data_bucket = "your-raw-data-bucket" # IMPORTANT: Change this to the name of your S3 bucket!
pipeline_name = "Fraud-Detection-EndToEnd-Pipeline"
endpoint_name = "fraud-detection-endpoint"
base_job_prefix = "fraud-detection"

# --- 2. DEFINE PROCESSORS ---

# Use the SKLearnProcessor for our preprocessing script
preprocess_processor = SKLearnProcessor(
    framework_version="1.2-1",
    instance_type="ml.t3.medium",
    instance_count=1,
    base_job_name=f"{base_job_prefix}/preprocess",
    role=role,
)

# Use a generic ScriptProcessor for our deployment step to run the shell script
deploy_processor = ScriptProcessor(
    image_uri=retrieve("sklearn", region, "1.2-1"),
    command=["/bin/bash"],
    instance_type="ml.t3.medium",
    instance_count=1,
    base_job_name=f"{base_job_prefix}/deploy",
    role=role,
)

# --- 3. DEFINE PIPELINE STEPS ---

# STEP 3.1: PREPROCESSING
step_preprocess = ProcessingStep(
    name="PreprocessData",
    processor=preprocess_processor,
    inputs=[ProcessingInput(source=f"s3://{input_data_bucket}/unified", destination="/opt/ml/processing/input")],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="artifacts", source="/opt/ml/processing/artifacts"),
    ],
    code="scripts/preprocess.py",
    job_arguments=["--input-data-path", "/opt/ml/processing/input"],
)

# STEP 3.2: TRAINING
step_train = TrainingStep(
    name="TrainModel",
    estimator=Estimator(
        image_uri=retrieve("xgboost", region, "1.5-1"),
        role=role,
        instance_count=1,
        instance_type="ml.m4.xlarge",
        output_path=f"s3://{default_bucket}/pipeline-output/training",
        base_job_name=f"{base_job_prefix}/train",
        hyperparameters={"objective": "binary:logistic", "eval_metric": "auc", "num_round": 100},
    ),
    inputs={"train": TrainingInput(
        s3_data=step_preprocess.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
        content_type="text/csv"
    )}
)

# STEP 3.3: DEPLOYMENT
step_deploy = ProcessingStep(
    name="DeployEndpoint",
    processor=deploy_processor,
    code="scripts/runner.sh",
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model"
        ),
        ProcessingInput(
            source=step_preprocess.properties.ProcessingOutputConfig.Outputs["artifacts"].S3Output.S3Uri,
            destination="/opt/ml/processing/artifacts"
        ),
        ProcessingInput(source="scripts", destination="/opt/ml/processing/code"),
    ],
    job_arguments=[
        "--endpoint-name", endpoint_name,
        "--role", role,
        "--region", region,
        "--model-data-s3-uri", step_train.properties.ModelArtifacts.S3ModelArtifacts,
    ]
)

# --- 4. ASSEMBLE AND RUN THE FULL PIPELINE ---
pipeline = Pipeline(
    name=pipeline_name,
    steps=[step_preprocess, step_train, step_deploy],
    sagemaker_session=sagemaker_session,
)

print(f"Upserting and running the full end-to-end pipeline: '{pipeline_name}'...")
pipeline.upsert(role_arn=role)
execution = pipeline.start()

print(f"Pipeline execution started successfully.")
print("You can monitor its progress in the SageMaker console under 'Pipelines'.")
print(f"Execution ARN: {execution.arn}")

Step 4: Run the Pipeline

With all the components in place, the pipeline is ready to run.

  1. Check Permissions: Confirm that the SageMaker IAM role has AmazonSageMakerFullAccess permissions. This is a one-time setup that allows the pipeline to create endpoints and other required resources.
  2. Restart & Run: In the Launcher.ipynb notebook, execute all cells.

The automated workflow has now been launched.

Running Pipeline
Successfully Created Endpoint

Conclusion

With the workflow launched, its progress can be monitored in the SageMaker console, where each step turns green as it runs to completion. After approximately 20-30 minutes, the pipeline finishes and a live, real-time API is deployed and ready for predictions. This marks a significant step from manual experimentation to automated, production-ready MLOps: a repeatable, traceable, and robust system for training and deploying models.
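
Once the endpoint status shows InService, it can be invoked from any environment with AWS credentials. The snippet below is a minimal, illustrative test using the boto3 SageMaker runtime client; the payload values are made up, and the endpoint name matches the one defined in Launcher.ipynb.

import json
import boto3

runtime = boto3.client("sagemaker-runtime")

# Illustrative values only; policy_type must be a category seen during training
payload = {
    "policy_purchase_date": "2023-01-15",
    "incident_date": "2023-03-02",
    "policy_type": "auto",
    "premium_amount": 1200.0,
    "claim_amount": 4500.0,
    "incident_location": "Springfield",
    "incident_description": "Rear-end collision at an intersection",
}

response = runtime.invoke_endpoint(
    EndpointName="fraud-detection-endpoint",  # the name set in Launcher.ipynb
    ContentType="application/json",
    Body=json.dumps(payload),
)
print(response["Body"].read().decode())  # the model's prediction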