Building a Simple Model Registry with S3 and DynamoDB

Introduction

As ML models move from experimentation to production, you need a way to track different versions, store model artifacts, and retrieve the right model for deployment. A model registry serves as the central system for managing model versions, metadata, and artifacts.

In this post, I’ll walk through building a simple but practical model registry using AWS S3 for storing model artifacts and DynamoDB for tracking metadata. We’ll create Lambda functions to register new models, list available versions, and retrieve models for inference. This setup gives you version control, metadata tracking, and easy model retrieval without needing specialized MLOps platforms.

The complete code for this project is available on GitHub: s3-dynamodb-model-registry

What is a Model Registry?

A model registry is a system that:

  • Stores model artifacts (trained model files, weights, checkpoints)
  • Tracks metadata (version numbers, training metrics, creation dates, model descriptions)
  • Enables model retrieval (fetch specific versions for deployment or rollback)
  • Maintains version history (see what changed between versions)

For production ML systems, this helps you answer questions like: “Which model version is currently deployed?”, “What was the accuracy of version 2.3?”, or “Can I roll back to the previous model?”

Architecture Overview

Our model registry uses:

  1. S3 Bucket: Stores model artifacts (.pkl, .pt, .h5 files, etc.) organized by model name and version
  2. DynamoDB Table: Stores metadata about each model version (metrics, training date, description, S3 path)
  3. Lambda Functions: Handle registration, listing, and retrieval operations

The registry supports three main operations:

  • Register Model: Upload a model artifact to S3 and store metadata in DynamoDB
  • List Models: Query DynamoDB to see all versions of a model or all models
  • Get Model: Retrieve model metadata and generate a pre-signed S3 URL for downloading the artifact

DynamoDB Schema Design

Let’s start by designing the DynamoDB table structure. We’ll use a composite key to support querying by model name and version:

# Table: model-registry
# Partition Key: model_name (String)
# Sort Key: version (String)
# Additional attributes:
#   - s3_path (String) - Full S3 path to the model artifact
#   - training_date (String) - ISO format date
#   - metrics (Map) - Dictionary of evaluation metrics (accuracy, f1_score, etc.)
#   - description (String) - Human-readable description
#   - created_by (String) - Who registered the model
#   - is_production (Boolean) - Flag for currently deployed version

This schema allows us to:

  • Query all versions of a specific model (query by model_name)
  • Get a specific version (query by model_name and version)
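  • Add a GSI (Global Secondary Index) if we need to query by other attributes. Index key attributes must be String, Number, or Binary, so a Boolean flag like is_production would need a string or numeric representation to serve as an index key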
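
One caveat with a String sort key: DynamoDB orders it lexicographically, so "10" sorts before "9" and "1.10" before "1.9". Here is a minimal sketch of one possible workaround, assuming versions are dot-separated integers. The helper names to_sort_key and from_sort_key are hypothetical and not part of the project code.

```python
def to_sort_key(version: str, width: int = 5) -> str:
    """Zero-pad each numeric component so string order matches numeric order."""
    parts = version.split(".")
    if not all(p.isascii() and p.isdigit() for p in parts):
        raise ValueError(f"unsupported version format: {version!r}")
    return ".".join(p.zfill(width) for p in parts)


def from_sort_key(sort_key: str) -> str:
    """Strip the padding to recover the human-readable version."""
    return ".".join(str(int(p)) for p in sort_key.split("."))


versions = ["1.9", "1.10", "2.0", "1.2"]
print(sorted(versions))                   # plain string order
print(sorted(versions, key=to_sort_key))  # numeric-aware order
```

Storing the padded form as the sort key (and the readable form as a regular attribute) would keep "latest version first" queries meaningful once a model passes version 9.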

Implementation

1. Register Model Lambda

This Lambda function handles registering a new model version. It accepts a Base64-encoded model file in a JSON request body, uploads it to S3, and stores metadata in DynamoDB.

import base64
import json
import os
from datetime import datetime, timezone
from decimal import Decimal

import boto3

s3_client = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')

MODEL_BUCKET = os.environ.get('MODEL_BUCKET', 'ml-model-registry')
TABLE_NAME = os.environ.get('TABLE_NAME', 'model-registry')
table = dynamodb.Table(TABLE_NAME)

def create_response(status_code, body):
    return {
        "statusCode": status_code,
        "headers": {
            "Content-Type": "application/json",
            "Access-Control-Allow-Origin": "*",
        },
        "body": json.dumps(body)
    }

def lambda_handler(event, context):
    try:
        # Parse request body. API Gateway sends body=None for an empty request,
        # and the boto3 DynamoDB resource rejects Python floats, so JSON
        # numbers with a fractional part are parsed as Decimal.
        body = json.loads(event.get('body') or '{}', parse_float=Decimal)

        model_name = body.get('model_name')
        version = body.get('version')  # Optional, will auto-increment if not provided
        model_data = body.get('model_data')  # Base64 encoded model file
        metrics = body.get('metrics', {})
        description = body.get('description', '')
        created_by = body.get('created_by', 'system')

        if not model_name or not model_data:
            return create_response(400, {
                "error": "Missing required fields: model_name and model_data"
            })

        # If version not provided, get the latest version and increment
        if not version:
            version = get_next_version(model_name)

        # Decode base64 model data
        model_bytes = base64.b64decode(model_data)

        # Determine file extension (default to .pkl)
        file_extension = body.get('file_extension', 'pkl')
        s3_key = f"models/{model_name}/v{version}/model.{file_extension}"

        # Upload to S3
        s3_client.put_object(
            Bucket=MODEL_BUCKET,
            Key=s3_key,
            Body=model_bytes,
            ContentType='application/octet-stream'
        )

        s3_path = f"s3://{MODEL_BUCKET}/{s3_key}"

        # Store metadata in DynamoDB
        item = {
            'model_name': model_name,
            'version': version,
            's3_path': s3_path,
            # Registration time in UTC (datetime.utcnow() is deprecated)
            'training_date': datetime.now(timezone.utc).isoformat(),
            'metrics': metrics,
            'description': description,
            'created_by': created_by,
            'is_production': False
        }

        table.put_item(Item=item)
        
        return create_response(200, {
            "message": "Model registered successfully",
            "model_name": model_name,
            "version": version,
            "s3_path": s3_path
        })
        
    except Exception as e:
        return create_response(500, {
            "error": str(e)
        })

def get_next_version(model_name):
    """Return the next version string for a model, based on its latest registered version.

    The sort key is a string, so "latest" means lexicographically highest
    ("10" sorts before "9"). Query errors propagate to the caller: falling
    back to "1.0" here could overwrite an existing version.
    """
    response = table.query(
        KeyConditionExpression='model_name = :name',
        ExpressionAttributeValues={':name': model_name},
        ScanIndexForward=False,  # Descending sort-key order: highest version first
        Limit=1
    )

    if not response['Items']:
        return "1.0"

    latest_version = response['Items'][0]['version']
    try:
        # Plain integer versions: "3" -> "4"
        return str(int(latest_version) + 1)
    except ValueError:
        pass

    parts = latest_version.split('.')
    if len(parts) == 2 and parts[1].isdigit():
        # "major.minor" versions: bump the minor part, e.g. "1.0" -> "1.1"
        return f"{parts[0]}.{int(parts[1]) + 1}"
    # Anything else: append ".1", e.g. "1.0.0" -> "1.0.0.1"
    return f"{latest_version}.1"

Key Points:

  • Accepts model data as Base64 in the request body, which caps artifact size at Lambda's 6 MB synchronous payload limit (less after Base64 overhead); larger files should go to S3 directly
  • Auto-increments version numbers if not provided
  • Stores both the S3 path and metadata in DynamoDB
  • Uses a consistent S3 key structure: models/{model_name}/v{version}/model.{ext}
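
When an artifact is too large to send through API Gateway and Lambda as Base64, one common alternative is to hand the client a pre-signed PUT URL so the file goes straight to S3. The sketch below is an illustration under assumptions, not part of the project code: the function name create_upload_url and the 15-minute default expiry are hypothetical, while generate_presigned_url is the same boto3 S3 client method used by the Get Model function.

```python
def create_upload_url(s3_client, bucket, model_name, version,
                      file_extension="pkl", expires_in=900):
    """Return the S3 key and a pre-signed PUT URL for a model artifact.

    s3_client is expected to be a boto3 S3 client, e.g. boto3.client("s3").
    """
    # Same key layout as the register function
    s3_key = f"models/{model_name}/v{version}/model.{file_extension}"
    url = s3_client.generate_presigned_url(
        "put_object",
        Params={"Bucket": bucket, "Key": s3_key},
        ExpiresIn=expires_in,
    )
    return {"s3_key": s3_key, "upload_url": url}
```

The client would then upload the file with an HTTP PUT to upload_url and call the register endpoint with metadata only, so the Lambda never handles the model bytes.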

2. List Models Lambda

This function queries DynamoDB to retrieve model information. It can list all versions of a specific model or all models in the registry.

import json
import boto3
import os
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource('dynamodb')

TABLE_NAME = os.environ.get('TABLE_NAME', 'model-registry')
table = dynamodb.Table(TABLE_NAME)

def create_response(status_code, body):
    return {
        "statusCode": status_code,
        "headers": {
            "Content-Type": "application/json",
            "Access-Control-Allow-Origin": "*",
        },
        # DynamoDB returns numbers as Decimal, which json cannot serialize by default
        "body": json.dumps(body, default=float)
    }

def lambda_handler(event, context):
    try:
        query_params = event.get('queryStringParameters') or {}
        model_name = query_params.get('model_name')
        
        if model_name:
            # List all versions of a specific model
            response = table.query(
                KeyConditionExpression=Key('model_name').eq(model_name),
                ScanIndexForward=False  # Latest versions first
            )
            models = response['Items']
        else:
            # List all models (scan the table)
            # Note: a single scan call returns at most 1 MB of data; for production,
            # follow LastEvaluatedKey to paginate or use a GSI
            response = table.scan()
            models = response['Items']
            
            # Group by model_name and get latest version of each
            model_dict = {}
            for item in models:
                name = item['model_name']
                if name not in model_dict:
                    model_dict[name] = item
                else:
                    # Keep the latest version (string comparison, so "10" sorts before "9")
                    if item['version'] > model_dict[name]['version']:
                        model_dict[name] = item
            models = list(model_dict.values())
        
        # Format response (exclude S3 path for security, include in get_model instead)
        formatted_models = []
        for model in models:
            formatted_models.append({
                'model_name': model['model_name'],
                'version': model['version'],
                'training_date': model.get('training_date'),
                'metrics': model.get('metrics', {}),
                'description': model.get('description', ''),
                'is_production': model.get('is_production', False)
            })
        
        return create_response(200, {
            "models": formatted_models,
            "count": len(formatted_models)
        })
        
    except Exception as e:
        return create_response(500, {
            "error": str(e)
        })

Key Points:

  • Supports querying by model name or listing all models
  • Returns metadata without exposing S3 paths directly (security consideration)
  • Orders results with latest versions first
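
A single Scan or Query call returns at most 1 MB of data and sets LastEvaluatedKey when more items remain, so the "list all models" branch can miss items on larger tables. A minimal sketch of following that key, assuming table is a boto3 Table resource; the helper name scan_all is hypothetical.

```python
def scan_all(table, **scan_kwargs):
    """Yield every item from a DynamoDB table, following LastEvaluatedKey."""
    while True:
        response = table.scan(**scan_kwargs)
        yield from response.get("Items", [])
        last_key = response.get("LastEvaluatedKey")
        if not last_key:
            break
        # Resume the scan where the previous page stopped
        scan_kwargs["ExclusiveStartKey"] = last_key
```

In the handler, models = list(scan_all(table)) could stand in for the single table.scan() call. For an HTTP API you would more likely return one page at a time and pass the key back to the caller as a cursor.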

3. Get Model Lambda

This function retrieves a specific model version and generates a pre-signed S3 URL for downloading the artifact. Pre-signed URLs provide temporary, secure access to S3 objects without exposing bucket credentials.

import json
import boto3
import os
from botocore.exceptions import ClientError
from boto3.dynamodb.conditions import Key

s3_client = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')

MODEL_BUCKET = os.environ.get('MODEL_BUCKET', 'ml-model-registry')
TABLE_NAME = os.environ.get('TABLE_NAME', 'model-registry')
table = dynamodb.Table(TABLE_NAME)

# Pre-signed URL expiration (in seconds) - 1 hour default
URL_EXPIRATION = int(os.environ.get('URL_EXPIRATION', 3600))

def create_response(status_code, body):
    return {
        "statusCode": status_code,
        "headers": {
            "Content-Type": "application/json",
            "Access-Control-Allow-Origin": "*",
        },
        # DynamoDB returns numbers as Decimal, which json cannot serialize by default
        "body": json.dumps(body, default=float)
    }

def lambda_handler(event, context):
    try:
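        query_params = event.get('queryStringParameters') or {}
        path_params = event.get('pathParameters') or {}
        # model_name may arrive as a path parameter (GET /models/{model_name}) or in the query string
        model_name = path_params.get('model_name') or query_params.get('model_name')
        version = query_params.get('version')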
        
        if not model_name:
            return create_response(400, {
                "error": "Missing required parameter: model_name"
            })
        
        # If version not specified, get the latest
        if not version:
            response = table.query(
                KeyConditionExpression=Key('model_name').eq(model_name),
                ScanIndexForward=False,
                Limit=1
            )
            if not response['Items']:
                return create_response(404, {
                    "error": f"Model '{model_name}' not found"
                })
            item = response['Items'][0]
            version = item['version']
        else:
            # Get specific version
            response = table.get_item(
                Key={
                    'model_name': model_name,
                    'version': version
                }
            )
            if 'Item' not in response:
                return create_response(404, {
                    "error": f"Model '{model_name}' version '{version}' not found"
                })
            item = response['Item']
        
        # Extract S3 key from s3_path (format: s3://bucket/key)
        s3_path = item['s3_path']
        s3_key = s3_path.replace(f"s3://{MODEL_BUCKET}/", "")
        
        # Generate pre-signed URL
        try:
            presigned_url = s3_client.generate_presigned_url(
                'get_object',
                Params={
                    'Bucket': MODEL_BUCKET,
                    'Key': s3_key
                },
                ExpiresIn=URL_EXPIRATION
            )
        except ClientError as e:
            return create_response(500, {
                "error": f"Failed to generate download URL: {str(e)}"
            })
        
        return create_response(200, {
            "model_name": item['model_name'],
            "version": item['version'],
            "training_date": item.get('training_date'),
            "metrics": item.get('metrics', {}),
            "description": item.get('description', ''),
            "is_production": item.get('is_production', False),
            "download_url": presigned_url,
            "url_expires_in": URL_EXPIRATION
        })
        
    except Exception as e:
        return create_response(500, {
            "error": str(e)
        })

Key Points:

  • Returns model metadata along with a pre-signed S3 URL
  • If version is not specified, returns the latest version
  • Pre-signed URLs expire after a configurable time (default 1 hour)
  • Handles missing models gracefully with 404 responses
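
The handler derives the object key by stripping the s3://{MODEL_BUCKET}/ prefix, which quietly produces a wrong key if an item was registered while MODEL_BUCKET pointed at a different bucket. A small sketch of parsing both parts from the stored path instead; the helper name split_s3_path is hypothetical.

```python
def split_s3_path(s3_path):
    """Split 's3://bucket/key' into a (bucket, key) tuple."""
    prefix = "s3://"
    if not s3_path.startswith(prefix):
        raise ValueError(f"not an S3 path: {s3_path!r}")
    # Everything up to the first "/" is the bucket, the rest is the key
    bucket, _, key = s3_path[len(prefix):].partition("/")
    if not bucket or not key:
        raise ValueError(f"S3 path needs a bucket and a key: {s3_path!r}")
    return bucket, key


bucket, key = split_s3_path(
    "s3://ml-model-registry/models/sentiment-classifier/v1.0/model.pkl"
)
print(bucket)  # ml-model-registry
print(key)     # models/sentiment-classifier/v1.0/model.pkl
```

Passing the parsed bucket and key to generate_presigned_url would keep old records downloadable even after the bucket setting changes.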

4. Mark Production Lambda (Bonus)

A useful addition is a function to mark a specific model version as the production version. This helps track which model is currently deployed.

import json
import boto3
import os
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource('dynamodb')

TABLE_NAME = os.environ.get('TABLE_NAME', 'model-registry')
table = dynamodb.Table(TABLE_NAME)

def create_response(status_code, body):
    return {
        "statusCode": status_code,
        "headers": {
            "Content-Type": "application/json",
            "Access-Control-Allow-Origin": "*",
        },
        "body": json.dumps(body)
    }

def lambda_handler(event, context):
    try:
        # API Gateway sends body=None for an empty request
        body = json.loads(event.get('body') or '{}')
        model_name = body.get('model_name')
        version = body.get('version')
        
        if not model_name or not version:
            return create_response(400, {
                "error": "Missing required fields: model_name and version"
            })
        
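        # Mark the requested version first. The condition stops update_item
        # from silently creating a new item when the version does not exist.
        try:
            table.update_item(
                Key={
                    'model_name': model_name,
                    'version': version
                },
                UpdateExpression='SET is_production = :true',
                ConditionExpression='attribute_exists(model_name)',
                ExpressionAttributeValues={':true': True}
            )
        except table.meta.client.exceptions.ConditionalCheckFailedException:
            return create_response(404, {
                "error": f"Model '{model_name}' version '{version}' not found"
            })

        # Then unset is_production on every other version of this model
        response = table.query(
            KeyConditionExpression=Key('model_name').eq(model_name)
        )

        for item in response['Items']:
            if item['version'] != version and item.get('is_production'):
                table.update_item(
                    Key={
                        'model_name': model_name,
                        'version': item['version']
                    },
                    UpdateExpression='SET is_production = :false',
                    ExpressionAttributeValues={':false': False}
                )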
        
        return create_response(200, {
            "message": f"Model '{model_name}' version '{version}' marked as production"
        })
        
    except Exception as e:
        return create_response(500, {
            "error": str(e)
        })
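
The flag changes in this function are separate writes, so a failure partway through can leave a model with two production versions. DynamoDB transactions can apply both changes atomically. The sketch below only builds the request for the low-level client's transact_write_items call; the helper name build_promotion_transaction is hypothetical, and it assumes the current production version has already been looked up.

```python
def build_promotion_transaction(table_name, model_name, new_version, old_version=None):
    """Build TransactItems that promote new_version and demote old_version."""

    def update(version, flag, must_exist):
        op = {
            "TableName": table_name,
            # The low-level client needs explicit attribute types
            "Key": {"model_name": {"S": model_name}, "version": {"S": version}},
            "UpdateExpression": "SET is_production = :flag",
            "ExpressionAttributeValues": {":flag": {"BOOL": flag}},
        }
        if must_exist:
            # Fail the whole transaction if this version was never registered
            op["ConditionExpression"] = "attribute_exists(model_name)"
        return {"Update": op}

    items = [update(new_version, True, must_exist=True)]
    if old_version and old_version != new_version:
        items.append(update(old_version, False, must_exist=False))
    return items
```

The result would be passed as boto3.client("dynamodb").transact_write_items(TransactItems=items), so either both flags change or neither does.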

Setting Up the Infrastructure

1. Create S3 Bucket

aws s3 mb s3://ml-model-registry

Optionally enable versioning on the bucket to keep a history of model artifacts:

aws s3api put-bucket-versioning \
    --bucket ml-model-registry \
    --versioning-configuration Status=Enabled

2. Create DynamoDB Table

aws dynamodb create-table \
    --table-name model-registry \
    --attribute-definitions \
        AttributeName=model_name,AttributeType=S \
        AttributeName=version,AttributeType=S \
    --key-schema \
        AttributeName=model_name,KeyType=HASH \
        AttributeName=version,KeyType=RANGE \
    --billing-mode PAY_PER_REQUEST

3. Deploy Lambda Functions

For each Lambda function, you’ll need to:

  1. Create IAM Role with permissions for S3 and DynamoDB:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject"
      ],
      "Resource": "arn:aws:s3:::ml-model-registry/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:PutItem",
        "dynamodb:GetItem",
        "dynamodb:Query",
        "dynamodb:Scan",
        "dynamodb:UpdateItem"
      ],
      "Resource": "arn:aws:dynamodb:*:*:table/model-registry"
    }
  ]
}
  2. Set Environment Variables:

    • MODEL_BUCKET: ml-model-registry
    • TABLE_NAME: model-registry
    • URL_EXPIRATION: 3600 (for get_model function)
  3. Connect to API Gateway for HTTP endpoints:

    • POST /models - Register model
    • GET /models - List models
    • GET /models/{model_name} - Get specific model
    • PUT /models/production - Mark as production

Usage Examples

Register a Model

import requests
import base64

# Load your model file
with open('model.pkl', 'rb') as f:
    model_data = base64.b64encode(f.read()).decode('utf-8')

response = requests.post('https://your-api.execute-api.us-east-1.amazonaws.com/models', json={
    'model_name': 'sentiment-classifier',
    'model_data': model_data,
    'metrics': {
        'accuracy': 0.92,
        'f1_score': 0.89
    },
    'description': 'BERT-based sentiment classifier',
    'created_by': 'ml-team'
})

print(response.json())

List All Models

response = requests.get('https://your-api.execute-api.us-east-1.amazonaws.com/models')
models = response.json()
print(f"Found {models['count']} models")

Get a Specific Model

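# GET /models/{model_name} routes to the Get Model function (GET /models lists models).
# model_name is also sent as a query parameter, which the handler accepts.
response = requests.get(
    'https://your-api.execute-api.us-east-1.amazonaws.com/models/sentiment-classifier',
    params={'model_name': 'sentiment-classifier', 'version': '1.0'}
)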
result = response.json()
download_url = result['download_url']

# Download the model
import urllib.request
urllib.request.urlretrieve(download_url, 'model.pkl')

Considerations and Improvements

Security

  • Use IAM roles with least-privilege access
  • Consider encrypting model artifacts in S3 (SSE-S3 or SSE-KMS)
  • Add authentication/authorization to API endpoints (API Keys, Cognito, etc.)
  • Validate and sanitize model names and versions
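
Since model_name and version are interpolated directly into the S3 key, validating them matters. Here is a minimal sketch of what that check could look like. The allowed character set, the length limit, and the name validate_identifiers are all assumptions to adapt to your own naming rules.

```python
import re

# Assumed rules: names are 1-63 chars of letters, digits, "-" or "_";
# versions are dot-separated integers such as "3" or "1.0"
NAME_PATTERN = re.compile(r"[A-Za-z0-9][A-Za-z0-9_-]{0,62}")
VERSION_PATTERN = re.compile(r"[0-9]+(\.[0-9]+)*")


def validate_identifiers(model_name, version=None):
    """Return a list of validation errors (empty if the inputs look safe)."""
    errors = []
    if not isinstance(model_name, str) or not NAME_PATTERN.fullmatch(model_name):
        errors.append("model_name must be 1-63 chars: letters, digits, '-' or '_'")
    if version is not None and (
        not isinstance(version, str) or not VERSION_PATTERN.fullmatch(version)
    ):
        errors.append("version must be dot-separated integers such as '3' or '1.0'")
    return errors
```

A handler could call this right after parsing the request and return a 400 response when the list is non-empty, which also keeps path fragments like "../" out of S3 keys.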

Scalability

  • For large model files, upload directly to S3 (for example with pre-signed URLs or multipart uploads) instead of sending the file through Lambda
  • Add pagination to the list models endpoint
  • Use a DynamoDB GSI if you need to query by non-key attributes (to index is_production, store it as a String or Number, since Boolean attributes cannot be index keys)
  • Consider using S3 Transfer Acceleration for faster uploads

Additional Features

  • Model comparison: Compare metrics across versions
  • Automatic rollback: Roll back to the previous version if a new model performs worse
  • Model validation: Run tests before accepting a new model version
  • Lifecycle policies: Automatically archive old model versions to Glacier
  • Webhooks: Notify external systems when new models are registered
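
As an illustration of the model comparison idea, here is a small sketch that diffs the metrics maps of two registered versions. The function name compare_metrics and the sample values are hypothetical. Converting with float() means it also accepts the Decimal values DynamoDB returns.

```python
def compare_metrics(old_metrics, new_metrics):
    """Return {metric: (old, new, delta)} for metrics present in both versions."""
    shared = sorted(set(old_metrics) & set(new_metrics))
    return {
        name: (
            float(old_metrics[name]),
            float(new_metrics[name]),
            round(float(new_metrics[name]) - float(old_metrics[name]), 6),
        )
        for name in shared
    }


v1 = {"accuracy": 0.92, "f1_score": 0.89}
v2 = {"accuracy": 0.94, "f1_score": 0.88, "auc": 0.97}
for name, (old, new, delta) in compare_metrics(v1, v2).items():
    print(f"{name}: {old} -> {new} ({delta:+.2f})")
```

Metrics that exist in only one version (auc here) are skipped, since there is nothing to compare them against.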

Conclusion

This model registry provides a solid foundation for managing ML models in production. By using S3 for storage and DynamoDB for metadata, you get a scalable, cost-effective solution that integrates well with existing AWS infrastructure.

The registry handles versioning, metadata tracking, and secure model retrieval, which are core requirements for production ML systems. While specialized platforms like MLflow or SageMaker Model Registry offer more features, this custom solution gives you full control and can be extended as your needs grow.

Thanks for reading!