Introduction
I originally set up my food classifier project with a single AWS Lambda function that took an image, ran inference, and returned predictions. However, I ran into frequent intermittent issues: backend errors, long-running requests that exceeded API Gateway's 30-second timeout, and random failures under higher load.
I had also deployed the whole Lambda as a container image via ECR, so making quick inline changes to request-handling or CORS code in the console was impossible; every small tweak meant rebuilding and pushing the Docker image again, which was an unnecessary hassle.
To address these problems, I decided to re-implement the entire pipeline with an asynchronous architecture that splits responsibilities across three specialized Lambdas. This design not only handles longer-running inference tasks smoothly but also makes it easier to scale and debug each component.
In this post, I’ll walk you through the setup involving three AWS Lambda functions that work together to receive an image, queue it for inference, process it asynchronously, and allow the client to fetch the results once they’re ready. We’ll also look at how Amazon S3 stores the input and output, how Amazon SQS queues our processing jobs, and how to design a results fetcher Lambda to poll the final predictions.
Overview of Our Three Lambdas
- Request Handler
  - Receives an HTTP request with a Base64-encoded image (via API Gateway).
  - Uploads the image to an S3 “input” bucket.
  - Sends a message to an SQS queue with the job info (ID + S3 key).
  - Returns an immediate response with a `job_id` so the client can later fetch results.
- Inference Processing
  - Triggered by an SQS event (your image job is in the queue).
  - Pulls the image from S3, runs inference using a PyTorch model in Lambda.
  - Writes the inference result (predictions) to an S3 “output” bucket.
  - Marks the job as completed (so the results are now accessible).
- Results Fetcher
  - Receives an HTTP GET request with a `job_id` as a query parameter.
  - Looks in the S3 “output” bucket for the corresponding results file.
  - If found, returns the inference results with a 200 status.
  - If not found, returns a 202 status indicating the job is still processing.
  - This allows clients to keep polling until the final predictions are available.
Architecture Diagram
Below is a diagram illustrating how data flows through the redesigned architecture:
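Here’s the same flow as a rough plain-text sketch:

```
Client --POST /image (Base64 image)--> API Gateway --> Request Handler Lambda
                                                         |  1. upload image to S3 input bucket
                                                         |  2. send {job_id, s3_key} to SQS queue
                                                         v
                                                    SQS queue --> Inference Processing Lambda
                                                                    |  fetch image from input bucket
                                                                    |  run PyTorch inference
                                                                    v
                                                         S3 output bucket (results/{job_id}.json)

Client --GET /results?job_id=...--> API Gateway --> Results Fetcher Lambda --> 200 with results, or 202 if pending
```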
Step-by-Step Explanation
1. The Request Handler Lambda
Responsibility: Accept incoming requests (with a Base64 image), store the image in S3, enqueue a job in SQS, and return a job ID.
```python
import json
import base64
import uuid
import os

import boto3

s3_client = boto3.client('s3')
sqs_client = boto3.client('sqs')

# Environment variables
S3_BUCKET = os.environ.get('INPUT_BUCKET', 'pk-food-classifier-input')
SQS_QUEUE_URL = os.environ.get('SQS_QUEUE_URL', 'https://sqs.us-east-1.amazonaws.com/061924184909/pk-food-classifier-inference-queue')


def lambda_handler(event, context):
    try:
        # 1) Decode the Base64-encoded image from the request
        body = json.loads(event['body'])
        image_b64 = body.get('image')
        if not image_b64:
            raise ValueError("Missing image data in request.")
        image_data = base64.b64decode(image_b64)

        # 2) Generate a unique job ID, then upload the image to S3
        job_id = str(uuid.uuid4())
        s3_key = f"input/{job_id}.jpg"
        s3_client.put_object(
            Bucket=S3_BUCKET,
            Key=s3_key,
            Body=image_data,
            ContentType='image/jpeg'
        )

        # 3) Send a message to the SQS queue to trigger the next step
        message = json.dumps({"job_id": job_id, "s3_key": s3_key})
        sqs_client.send_message(
            QueueUrl=SQS_QUEUE_URL,
            MessageBody=message
        )

        # 4) Return an immediate response with the job ID
        return {
            "statusCode": 200,
            "headers": {
                "Access-Control-Allow-Origin": "*",
                "Access-Control-Allow-Headers": "Content-Type",
                "Access-Control-Allow-Methods": "OPTIONS,POST"
            },
            "body": json.dumps({"message": "Job submitted", "job_id": job_id})
        }

    except Exception as e:
        return {
            "statusCode": 400,
            "headers": {
                "Access-Control-Allow-Origin": "*",
                "Access-Control-Allow-Headers": "Content-Type",
                "Access-Control-Allow-Methods": "OPTIONS,POST"
            },
            "body": json.dumps({"error": str(e)})
        }
```
Key Takeaways:
- We immediately return to the caller so the client isn’t blocked while inference happens.
- We store the original image in an “input” S3 bucket.
- We push a small JSON message (`job_id`, `s3_key`) to SQS, which will wake up our inference Lambda.
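If you want to sanity-check the handler before wiring up API Gateway, you can invoke it locally with a hand-built proxy event. This is a minimal sketch; the module name `request_handler` and the `test.jpg` file are assumptions, and the S3 bucket and SQS queue referenced by the environment variables must already exist:

```python
# Minimal local smoke test for the Request Handler.
import base64
import json

from request_handler import lambda_handler  # hypothetical module name for the code above

with open("test.jpg", "rb") as f:
    image_b64 = base64.b64encode(f.read()).decode("utf-8")

# API Gateway (Lambda proxy) events deliver the JSON payload as a string in "body"
event = {"body": json.dumps({"image": image_b64})}

response = lambda_handler(event, None)
print(response["statusCode"], response["body"])
```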
2. The Inference Processing Lambda
Responsibility: Read messages from the SQS queue, fetch the image from the input bucket, run inference using a PyTorch model, and write results to the output bucket.
```python
import json
import io
import os
import base64
import logging

import boto3
import torch
import torch.nn.functional as F
from torchvision import models, transforms
from PIL import Image

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

s3_client = boto3.client('s3')

# Environment variables
INPUT_BUCKET = os.environ.get('INPUT_BUCKET', 'pk-food-classifier-input')
OUTPUT_BUCKET = os.environ.get('OUTPUT_BUCKET', 'pk-food-classifier-output')

# Preprocessing
preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

# Example class names
class_names = {...}  # truncated for brevity


class InferenceModel:
    def __init__(self):
        self.model = None
        self.device = torch.device("cpu")

    def load_model(self):
        if self.model is None:
            model = models.mobilenet_v3_large(pretrained=False)
            in_features = model.classifier[3].in_features
            model.classifier[3] = torch.nn.Linear(in_features, 101)
            model_path = os.path.join(os.getcwd(), "mobilenet_v3_food101.pt")
            model.load_state_dict(torch.load(model_path, map_location=self.device))
            model.eval()
            self.model = model
        return self.model

    def predict_image(self, image_data, return_top_k=5):
        model = self.load_model()

        # Convert image data to PIL, then to tensor
        if isinstance(image_data, str):
            image_data = base64.b64decode(image_data)
        image = Image.open(io.BytesIO(image_data)).convert("RGB")
        input_tensor = preprocess(image).unsqueeze(0)

        # Inference
        with torch.no_grad():
            outputs = model(input_tensor)
            probabilities = F.softmax(outputs, dim=1)
            top_k_prob, top_k_class = torch.topk(probabilities, return_top_k)

        # Format predictions
        top_k_prob = top_k_prob.cpu().numpy()[0] * 100
        top_k_class = top_k_class.cpu().numpy()[0]
        predictions = [
            {
                "class": class_names.get(str(idx), "unknown"),
                "probability": float(prob)
            }
            for idx, prob in zip(top_k_class, top_k_prob)
        ]
        return {
            "predicted_class": predictions[0]["class"],
            "top_5_predictions": predictions
        }


# Create a global inference model instance (reused across warm invocations)
inference_model = InferenceModel()


def process_image_from_s3(bucket, key):
    try:
        response = s3_client.get_object(Bucket=bucket, Key=key)
        return response['Body'].read()
    except Exception as e:
        logger.error(f"Error retrieving image from S3: {str(e)}")
        raise


def create_response(status_code, body):
    return {
        "statusCode": status_code,
        "headers": {
            "Content-Type": "application/json",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Credentials": True,
        },
        "body": json.dumps(body)
    }


def lambda_handler(event, context):
    try:
        # Case 1: Direct API call with base64 (not always used in this architecture)
        if event.get('body'):
            body = json.loads(event['body'])
            if "image" not in body:
                return create_response(400, {"error": "Image data not found in request"})
            predictions = inference_model.predict_image(body["image"])
            return create_response(200, predictions)

        # Case 2: Invoked by SQS event
        elif event.get('Records'):
            results = []
            for record in event['Records']:
                message = {}  # defined up front so the except block can still report a job_id
                try:
                    message = json.loads(record['body'])
                    job_id = message.get("job_id")
                    s3_key = message.get("s3_key")

                    # Retrieve image and run inference
                    image_data = process_image_from_s3(INPUT_BUCKET, s3_key)
                    predictions = inference_model.predict_image(image_data)

                    # Build result object and store in output bucket
                    result = {
                        "job_id": job_id,
                        "status": "completed",
                        "predictions": predictions
                    }
                    result_key = f"results/{job_id}.json"
                    s3_client.put_object(
                        Bucket=OUTPUT_BUCKET,
                        Key=result_key,
                        Body=json.dumps(result),
                        ContentType='application/json'
                    )
                    results.append(result)
                except Exception as e:
                    logger.error(f"Error processing record: {str(e)}")
                    results.append({
                        "job_id": message.get("job_id"),
                        "status": "failed",
                        "error": str(e)
                    })
            return create_response(200, {"results": results})

        else:
            return create_response(400, {"error": "Invalid event type"})

    except Exception as e:
        logger.error(f"Unhandled error: {str(e)}")
        return create_response(500, {"error": str(e)})
```
Key Takeaways:
- This Lambda is triggered by SQS messages (although you can call it directly if needed).
- The PyTorch model is loaded inside the Lambda (and cached across warm invocations), and images are preprocessed before inference.
- Results (the predicted class and top-5 predictions) are stored in an S3 “output” bucket.
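You can exercise this handler locally by feeding it an SQS-shaped event. A minimal sketch follows; the module name `inference_processing`, the `job_id`, and the S3 key are placeholders, and the referenced object must actually exist in the input bucket for the call to succeed:

```python
# Simulate the SQS trigger: Lambda receives a "Records" list whose items carry the message in "body".
import json

from inference_processing import lambda_handler  # hypothetical module name for the code above

event = {
    "Records": [
        {
            "body": json.dumps({
                "job_id": "123e4567-e89b-12d3-a456-426614174000",
                "s3_key": "input/123e4567-e89b-12d3-a456-426614174000.jpg"
            })
        }
    ]
}

print(lambda_handler(event, None))
```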
3. The Results Fetcher Lambda
Responsibility: Check if the results file exists in the “output” bucket. If yes, return it; if not, tell the client to wait (status 202).
```python
import json
import os

import boto3
from botocore.exceptions import ClientError

s3_client = boto3.client('s3')

OUTPUT_BUCKET = os.environ.get('OUTPUT_BUCKET', 'pk-food-classifier-output')


def create_response(status_code, body):
    return {
        "statusCode": status_code,
        "headers": {
            "Content-Type": "application/json",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Headers": "Content-Type",
            "Access-Control-Allow-Methods": "OPTIONS,GET"
        },
        "body": json.dumps(body)
    }


def lambda_handler(event, context):
    try:
        query_parameters = event.get('queryStringParameters', {})
        if not query_parameters or 'job_id' not in query_parameters:
            return create_response(400, {
                "error": "Missing job_id parameter"
            })

        job_id = query_parameters['job_id']
        result_key = f"results/{job_id}.json"

        try:
            # Attempt to fetch the results file
            response = s3_client.get_object(
                Bucket=OUTPUT_BUCKET,
                Key=result_key
            )
            result_data = json.loads(response['Body'].read().decode('utf-8'))
            return create_response(200, result_data)
        except ClientError as e:
            if e.response['Error']['Code'] == 'NoSuchKey':
                # Not ready yet
                return create_response(202, {
                    "message": "Processing",
                    "job_id": job_id,
                    "status": "PENDING"
                })
            else:
                raise

    except Exception as e:
        return create_response(500, {
            "error": str(e),
            "status": "ERROR"
        })
```
Key Takeaways:
- The client repeatedly calls this Lambda (e.g., every few seconds) with `GET /results?job_id=...`.
- If the S3 file doesn’t exist yet, the function returns a 202 Accepted status to indicate it’s still in progress.
- Once ready, the function returns a 200 status with the actual JSON results.
Why Split These Responsibilities?
API Gateway Timeout
API Gateway imposes a 30-second integration timeout. If inference might take longer, we need an asynchronous approach so the caller isn’t left hanging.
Separation of Concerns
- Request Handler does minimal work: just storing an image + enqueueing a job.
- Inference Processing does the CPU/GPU-intensive work; it’s decoupled and can be retried or scaled independently (e.g., through SQS settings).
- Results Fetcher simplifies returning final results in a “polling” pattern.
Scalability & Fault Tolerance
- If the inference Lambda fails, SQS retries the message automatically, and repeatedly failing messages can be routed to a dead-letter queue (see the sketch below).
- S3 ensures images and results are reliably stored, even if Lambdas have to restart.
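The dead-letter queue wiring is just a couple of boto3 calls. Here's a minimal sketch, not the exact setup from this project; the DLQ name and `maxReceiveCount` are assumptions, while the main queue name matches the one used earlier:

```python
# Sketch: create a dead-letter queue and attach it to the inference queue via a redrive policy.
import json

import boto3

sqs = boto3.client("sqs")

# Create the DLQ and look up its ARN
dlq_url = sqs.create_queue(QueueName="pk-food-classifier-inference-dlq")["QueueUrl"]
dlq_arn = sqs.get_queue_attributes(
    QueueUrl=dlq_url, AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]

# Point the main inference queue at the DLQ
queue_url = sqs.get_queue_url(QueueName="pk-food-classifier-inference-queue")["QueueUrl"]
sqs.set_queue_attributes(
    QueueUrl=queue_url,
    Attributes={
        # After 3 failed receives, SQS moves the message to the DLQ instead of retrying forever.
        "RedrivePolicy": json.dumps({"deadLetterTargetArn": dlq_arn, "maxReceiveCount": "3"})
    },
)
```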
S3 & SQS Considerations
- S3 Buckets:
  - An input bucket for storing raw images.
  - An output bucket for storing inference JSON results.
- SQS Queue:
  - Connect your Request Handler to an SQS queue that triggers the Inference Processing Lambda (see the event source mapping sketch after this list).
  - Make sure the Lambda function has permission to read messages from SQS, and ensure SQS has permission to invoke the Lambda.
- Security & IAM:
  - Each Lambda function should have an IAM role granting the necessary S3 and SQS permissions (and nothing more).
  - Consider adding bucket policies so that only the correct IAM roles can access specific S3 paths.
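For reference, here's a minimal boto3 sketch of wiring the queue to the inference Lambda. The function name, account ID, and batch size are placeholders/assumptions; the "Add trigger" button in the Lambda console creates the same event source mapping:

```python
# Sketch: connect the SQS queue to the Inference Processing Lambda.
import boto3

lambda_client = boto3.client("lambda")

lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:sqs:us-east-1:123456789012:pk-food-classifier-inference-queue",
    FunctionName="pk-food-classifier-inference",  # placeholder function name
    BatchSize=1,  # one image per invocation keeps memory usage predictable
    Enabled=True,
)
```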
Deploying & Testing
- Create S3 Buckets
  - Set up two buckets in your AWS account: one for input images, one for output results.
- Create an SQS Queue
  - Use the AWS console or CLI to create an SQS queue.
  - Configure the Inference Processing Lambda as a consumer (via an SQS trigger).
- Deploy Lambda Functions
  - Request Handler: hook it up to API Gateway (`POST /image`).
  - Inference Processing: triggered by SQS. Make sure to include your `mobilenet_v3_food101.pt` model file in your Lambda package, or set it up via a layer if it's large.
  - Results Fetcher: hook it up to API Gateway (`GET /results`).
- End-to-End Test
  - Make a `POST` request to `/image` with a Base64-encoded image. You'll get back a `job_id`.
  - Poll `GET /results?job_id=<YOUR_JOB_ID>` until you get a `200` with predictions instead of a `202`. A sample polling client is sketched below.
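For the end-to-end test, a small polling client is all you need. This is a sketch using the `requests` library; `BASE_URL` and `test.jpg` are placeholders for your own API Gateway stage URL and test image:

```python
# Sketch of a polling client: submit an image, then poll until the results are ready.
import base64
import time

import requests

BASE_URL = "https://<api-id>.execute-api.us-east-1.amazonaws.com/prod"  # placeholder

with open("test.jpg", "rb") as f:
    image_b64 = base64.b64encode(f.read()).decode("utf-8")

# 1) Submit the job
resp = requests.post(f"{BASE_URL}/image", json={"image": image_b64})
resp.raise_for_status()
job_id = resp.json()["job_id"]
print("Submitted job:", job_id)

# 2) Poll until the Results Fetcher returns 200 (202 means still processing)
while True:
    result = requests.get(f"{BASE_URL}/results", params={"job_id": job_id})
    if result.status_code == 200:
        print("Predictions:", result.json()["predictions"])
        break
    time.sleep(2)
```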
Conclusion
By splitting image inference across three specialized Lambdas, we:
- Offload heavy work to an asynchronous process to avoid timeouts.
- Keep each Lambda’s code focused on a single responsibility.
- Improve resilience and scalability with the combination of S3 and SQS.
The client only needs to handle two endpoints: one to submit the image (`POST /image`) and one to poll for the final results (`GET /results?job_id=XYZ`). This pattern is powerful for any job requiring more than a few seconds of processing, from ML model predictions to large data transformations.
That’s it for this walkthrough! If you have any questions about structuring your serverless inference pipelines, feel free to reach out.
Thanks for reading!