Introduction
Building a production-grade AI chatbot that handles millions of conversations across multiple channels requires more than plugging in an LLM. I architected a system that processes 5M+ monthly conversations across the WhatsApp Business API, Facebook Messenger, and a web widget with 99.9% uptime.
The system delivers:
- Sub-2-second median response times at peak load (P50: 1.2s)
- 99.9% message delivery across all channels
- Context-aware conversations with state management
- Horizontal scalability handling 10K concurrent users
Architecture Overview
flowchart TB
subgraph Channels["Message Channels"]
WA[WhatsApp Business API]
FB[Facebook Messenger]
WEB[Web Widget]
end
subgraph Ingestion["Message Ingestion"]
APIGW[API Gateway]
AUTH[Auth Layer]
WEBHOOK[Webhook Handler]
end
subgraph Processing["Message Processing"]
SQS_IN[SQS: Incoming Queue]
LAMBDA_PROC[Lambda: Message Processor]
SQS_PRIORITY[SQS: Priority Queue]
end
subgraph AI["AI Engine"]
BEDROCK[AWS Bedrock<br/>Claude/Titan]
RAG[RAG with OpenSearch]
PROMPT[Prompt Manager]
end
subgraph State["State Management"]
DDB[DynamoDB: Conversations]
CACHE[ElastiCache: Session Cache]
S3[S3: Message Archive]
end
subgraph Delivery["Message Delivery"]
LAMBDA_SEND[Lambda: Message Sender]
SQS_OUT[SQS: Outgoing Queue]
KINESIS[Kinesis: Analytics Stream]
end
Channels --> APIGW
APIGW --> AUTH
AUTH --> WEBHOOK
WEBHOOK --> SQS_IN
SQS_IN --> LAMBDA_PROC
LAMBDA_PROC --> DDB
LAMBDA_PROC --> CACHE
LAMBDA_PROC --> RAG
RAG --> PROMPT
PROMPT --> BEDROCK
LAMBDA_PROC --> SQS_OUT
SQS_OUT --> LAMBDA_SEND
LAMBDA_SEND --> Channels
LAMBDA_PROC --> KINESIS
LAMBDA_SEND --> S3
style Channels fill:#1a1a2e,stroke:#00d9ff,stroke-width:2px,color:#fff
style Processing fill:#264653,stroke:#2a9d8f,stroke-width:2px,color:#fff
style AI fill:#f77f00,stroke:#fff,stroke-width:2px,color:#fff
style State fill:#9b5de5,stroke:#fff,stroke-width:2px,color:#fff
style Delivery fill:#2a9d8f,stroke:#fff,stroke-width:2px,color:#fff
API Gateway & Webhook Handler
// lambda/webhook-handler/index.ts
import { APIGatewayProxyEvent, APIGatewayProxyResult } from 'aws-lambda';
import { SQS } from 'aws-sdk';
import crypto from 'crypto';
const sqs = new SQS();
const QUEUE_URL = process.env.INCOMING_QUEUE_URL!;
interface WhatsAppMessage {
object: string;
entry: Array<{
id: string;
changes: Array<{
value: {
messaging_product: string;
metadata: { phone_number_id: string };
messages?: Array<{
from: string;
id: string;
timestamp: string;
type: string;
text?: { body: string };
}>;
};
}>;
}>;
}
export const handler = async (
event: APIGatewayProxyEvent
): Promise<APIGatewayProxyResult> => {
// Webhook verification (GET request)
if (event.httpMethod === 'GET') {
return verifyWebhook(event);
}
// Message processing (POST request)
if (event.httpMethod === 'POST') {
return processIncomingMessage(event);
}
return {
statusCode: 405,
body: JSON.stringify({ error: 'Method not allowed' })
};
};
function verifyWebhook(event: APIGatewayProxyEvent): APIGatewayProxyResult {
const params = event.queryStringParameters || {};
const mode = params['hub.mode'];
const token = params['hub.verify_token'];
const challenge = params['hub.challenge'];
if (mode === 'subscribe' && token === process.env.VERIFY_TOKEN) {
console.log('Webhook verified');
return {
statusCode: 200,
body: challenge || ''
};
}
return {
statusCode: 403,
body: JSON.stringify({ error: 'Verification failed' })
};
}
async function processIncomingMessage(
event: APIGatewayProxyEvent
): Promise<APIGatewayProxyResult> {
try {
// Verify signature
if (!verifySignature(event)) {
return {
statusCode: 401,
body: JSON.stringify({ error: 'Invalid signature' })
};
}
const payload: WhatsAppMessage = JSON.parse(event.body || '{}');
// Extract messages
for (const entry of payload.entry || []) {
for (const change of entry.changes || []) {
const messages = change.value.messages || [];
for (const message of messages) {
await enqueueMessage({
messageId: message.id,
from: message.from,
timestamp: message.timestamp,
channel: 'whatsapp',
type: message.type,
content: message.text?.body || '',
metadata: {
phoneNumberId: change.value.metadata.phone_number_id
}
});
}
}
}
return {
statusCode: 200,
body: JSON.stringify({ status: 'ok' })
};
} catch (error) {
console.error('Error processing message:', error);
return {
statusCode: 500,
body: JSON.stringify({ error: 'Internal server error' })
};
}
}
async function enqueueMessage(message: any): Promise<void> {
  // The priority attribute lets a downstream router divert high-priority
  // traffic to the dedicated priority queue shown in the architecture diagram
await sqs.sendMessage({
QueueUrl: QUEUE_URL,
MessageBody: JSON.stringify(message),
MessageAttributes: {
channel: {
DataType: 'String',
StringValue: message.channel
},
priority: {
DataType: 'Number',
StringValue: determinePriority(message).toString()
}
}
}).promise();
}
function verifySignature(event: APIGatewayProxyEvent): boolean {
  // Header casing varies by integration, so check both common forms
  const signature =
    event.headers['x-hub-signature-256'] ||
    event.headers['X-Hub-Signature-256'];
  if (!signature) return false;
  const expectedSignature = crypto
    .createHmac('sha256', process.env.APP_SECRET!)
    .update(event.body || '')
    .digest('hex');
  // Constant-time comparison avoids leaking signature prefixes via timing
  const expected = Buffer.from(`sha256=${expectedSignature}`);
  const received = Buffer.from(signature);
  return (
    expected.length === received.length &&
    crypto.timingSafeEqual(expected, received)
  );
}
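// isVIPUser and containsPaymentKeywords are referenced below but not shown
// in the original; a minimal sketch (the env-var VIP list and the keyword
// set are assumptions, not the production implementation):
const VIP_USERS = new Set((process.env.VIP_USER_IDS || '').split(','));
const PAYMENT_KEYWORDS = ['payment', 'refund', 'charge', 'invoice', 'billing'];

function isVIPUser(userId: string): boolean {
  return VIP_USERS.has(userId);
}

function containsPaymentKeywords(content: string): boolean {
  const text = (content || '').toLowerCase();
  return PAYMENT_KEYWORDS.some(keyword => text.includes(keyword));
}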
function determinePriority(message: any): number {
// VIP users get priority
if (isVIPUser(message.from)) return 1;
// Payment-related messages get priority
if (containsPaymentKeywords(message.content)) return 2;
return 5; // Default priority
}

Message Processor with Bedrock
// lambda/message-processor/index.ts
import {
  SQSEvent,
  SQSRecord,
  SQSBatchResponse,
  SQSBatchItemFailure
} from 'aws-lambda';
import {
BedrockRuntimeClient,
InvokeModelCommand
} from '@aws-sdk/client-bedrock-runtime';
import { DynamoDB } from 'aws-sdk';
const bedrock = new BedrockRuntimeClient({ region: process.env.AWS_REGION });
const dynamodb = new DynamoDB.DocumentClient();
const CONVERSATIONS_TABLE = process.env.CONVERSATIONS_TABLE!;
// Injected via Terraform (BEDROCK_MODEL_ID); fall back to a known model ID
const MODEL_ID =
  process.env.BEDROCK_MODEL_ID || 'anthropic.claude-3-sonnet-20240229-v1:0';
interface ConversationContext {
userId: string;
channel: string;
messages: Array<{
role: 'user' | 'assistant';
content: string;
timestamp: number;
}>;
metadata: {
userName?: string;
language?: string;
lastIntent?: string;
};
}
export const handler = async (event: SQSEvent): Promise<SQSBatchResponse> => {
  // Report per-record failures so only failed messages are redelivered
  // (pairs with ReportBatchItemFailures on the event source mapping)
  const results = await Promise.allSettled(
    event.Records.map(record => processMessage(record))
  );
  return {
    batchItemFailures: results.flatMap((result, i) =>
      result.status === 'rejected'
        ? [{ itemIdentifier: event.Records[i].messageId }]
        : []
    )
  };
};
async function processMessage(record: SQSRecord): Promise<void> {
const message = JSON.parse(record.body);
try {
// Load conversation context
const context = await loadConversationContext(
message.from,
message.channel
);
// Add user message to context
context.messages.push({
role: 'user',
content: message.content,
timestamp: Date.now()
});
// Generate AI response
const response = await generateAIResponse(context, message);
// Add assistant message to context
context.messages.push({
role: 'assistant',
content: response.text,
timestamp: Date.now()
});
// Update metadata
context.metadata.lastIntent = response.intent;
// Save conversation context
await saveConversationContext(context);
// Send response to user
await sendResponse(message, response.text);
// Track analytics
await trackConversation(message, response);
  } catch (error) {
    console.error('Error processing message:', error);
    await sendErrorResponse(message);
    // Rethrow so the handler reports this record as a batch item failure
    throw error;
  }
}
async function generateAIResponse(
context: ConversationContext,
message: any
): Promise<{ text: string; intent: string }> {
// Build prompt with conversation history
const prompt = buildPrompt(context, message);
const command = new InvokeModelCommand({
modelId: MODEL_ID,
contentType: 'application/json',
accept: 'application/json',
body: JSON.stringify({
anthropic_version: 'bedrock-2023-05-31',
max_tokens: 1024,
temperature: 0.7,
messages: prompt,
system: getSystemPrompt(context)
})
});
const response = await bedrock.send(command);
const responseBody = JSON.parse(
new TextDecoder().decode(response.body)
);
return {
text: responseBody.content[0].text,
intent: extractIntent(responseBody.content[0].text)
};
}
function buildPrompt(
context: ConversationContext,
message: any
): Array<{ role: string; content: string }> {
// Include last N messages for context
const recentMessages = context.messages.slice(-10);
return recentMessages.map(msg => ({
role: msg.role,
content: msg.content
}));
}
function getSystemPrompt(context: ConversationContext): string {
return `You are a helpful AI assistant for our e-commerce platform.
Your capabilities:
- Answer product questions
- Help with order tracking
- Process returns and refunds
- Provide customer support
User context:
- Name: ${context.metadata.userName || 'Customer'}
- Language: ${context.metadata.language || 'en'}
- Channel: ${context.channel}
Guidelines:
- Be concise and friendly
- Use the user's language
- Offer to escalate to human agent if needed
- Never share personal data
- Stay within your knowledge base`;
}
async function loadConversationContext(
userId: string,
channel: string
): Promise<ConversationContext> {
const result = await dynamodb.get({
TableName: CONVERSATIONS_TABLE,
Key: { userId, channel }
}).promise();
if (result.Item) {
return result.Item as ConversationContext;
}
// New conversation
return {
userId,
channel,
messages: [],
metadata: {}
};
}
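// sendResponse and extractIntent are referenced above but not shown; a
// minimal sketch (the outgoing payload shape and the regex-based intent
// heuristic are assumptions, not the production implementation):
import { SQS } from 'aws-sdk'; // would normally sit with the other imports
const sqs = new SQS();

async function sendResponse(message: any, text: string): Promise<void> {
  await sqs.sendMessage({
    QueueUrl: process.env.OUTGOING_QUEUE_URL!,
    MessageBody: JSON.stringify({
      userId: message.from,
      channel: message.channel,
      content: text,
      messageId: `resp-${message.messageId}`
    })
  }).promise();
}

function extractIntent(text: string): string {
  // Crude heuristic; a production system might ask the model to return a
  // structured intent field instead of inferring one from free text
  if (/refund|return/i.test(text)) return 'returns';
  if (/order|track/i.test(text)) return 'order_tracking';
  return 'general';
}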
async function saveConversationContext(
context: ConversationContext
): Promise<void> {
// Keep only last 50 messages to manage DynamoDB item size
if (context.messages.length > 50) {
context.messages = context.messages.slice(-50);
}
await dynamodb.put({
TableName: CONVERSATIONS_TABLE,
    Item: {
      ...context,
      updatedAt: Date.now(),
      // Range key for the RecentConversationsIndex GSI defined below
      lastMessageTime: Date.now(),
      ttl: Math.floor(Date.now() / 1000) + 30 * 24 * 60 * 60 // 30 days
    }
}).promise();
}
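The architecture diagram also shows an ElastiCache session cache in front of DynamoDB, while the processor above reads straight from the table. A minimal read-through helper that could sit in the processor file (the ioredis client, SESSION_CACHE_ENDPOINT variable, and 15-minute TTL are all assumptions):

// Hypothetical addition to lambda/message-processor/index.ts
import Redis from 'ioredis'; // would sit with the other imports

const redis = new Redis(process.env.SESSION_CACHE_ENDPOINT!);

async function loadContextCached(
  userId: string,
  channel: string
): Promise<ConversationContext> {
  const key = `ctx:${channel}:${userId}`;
  const cached = await redis.get(key);
  if (cached) return JSON.parse(cached);
  const context = await loadConversationContext(userId, channel);
  await redis.set(key, JSON.stringify(context), 'EX', 900); // 15-minute TTL
  return context;
}

DynamoDB Schema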
# dynamodb/conversations.tf
resource "aws_dynamodb_table" "conversations" {
name = "chatbot-conversations"
billing_mode = "PAY_PER_REQUEST"
hash_key = "userId"
range_key = "channel"
attribute {
name = "userId"
type = "S"
}
attribute {
name = "channel"
type = "S"
}
attribute {
name = "lastMessageTime"
type = "N"
}
# GSI for querying recent conversations
global_secondary_index {
name = "RecentConversationsIndex"
hash_key = "channel"
range_key = "lastMessageTime"
projection_type = "ALL"
}
# TTL for automatic cleanup
ttl {
attribute_name = "ttl"
enabled = true
}
point_in_time_recovery {
enabled = true
}
tags = {
Service = "chatbot"
DataType = "conversations"
}
}
# Message archive table
resource "aws_dynamodb_table" "message_archive" {
name = "chatbot-message-archive"
billing_mode = "PAY_PER_REQUEST"
hash_key = "messageId"
range_key = "timestamp"
attribute {
name = "messageId"
type = "S"
}
attribute {
name = "timestamp"
type = "N"
}
attribute {
name = "userId"
type = "S"
}
global_secondary_index {
name = "UserMessagesIndex"
hash_key = "userId"
range_key = "timestamp"
projection_type = "ALL"
}
stream_enabled = true
stream_view_type = "NEW_AND_OLD_IMAGES"
ttl {
attribute_name = "ttl"
enabled = true
}
tags = {
Service = "chatbot"
DataType = "messages"
}
}
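The RecentConversationsIndex GSI above supports queries like "newest conversations per channel"; a short sketch against the DocumentClient (the 24-hour window and page size are illustrative):

// Sketch: newest WhatsApp conversations in the last 24 hours via the GSI
import { DynamoDB } from 'aws-sdk';
const dynamodb = new DynamoDB.DocumentClient();

async function recentConversations() {
  return dynamodb.query({
    TableName: 'chatbot-conversations',
    IndexName: 'RecentConversationsIndex',
    KeyConditionExpression: '#ch = :ch AND lastMessageTime > :since',
    ExpressionAttributeNames: { '#ch': 'channel' },
    ExpressionAttributeValues: {
      ':ch': 'whatsapp',
      ':since': Date.now() - 24 * 60 * 60 * 1000
    },
    ScanIndexForward: false, // newest first
    Limit: 20
  }).promise();
}

RAG with OpenSearch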
// lambda/rag-retrieval/index.ts
import { Client } from '@opensearch-project/opensearch';
import { AwsSigv4Signer } from '@opensearch-project/opensearch/aws';
import { defaultProvider } from '@aws-sdk/credential-provider-node';

const client = new Client({
  ...AwsSigv4Signer({
    region: process.env.AWS_REGION!,
    // Use 'aoss' here if the endpoint is an OpenSearch Serverless
    // collection, as provisioned in the Terraform further down
    service: 'es',
    getCredentials: () => defaultProvider()()
  }),
  node: process.env.OPENSEARCH_ENDPOINT!
});
interface Document {
id: string;
content: string;
metadata: {
category: string;
tags: string[];
lastUpdated: number;
};
}
export async function retrieveRelevantDocuments(
query: string,
topK: number = 5
): Promise<Document[]> {
const response = await client.search({
index: 'knowledge-base',
body: {
query: {
multi_match: {
query: query,
fields: ['content^2', 'metadata.tags', 'metadata.category'],
type: 'best_fields',
fuzziness: 'AUTO'
}
},
size: topK,
_source: ['content', 'metadata']
}
});
return response.body.hits.hits.map((hit: any) => ({
id: hit._id,
content: hit._source.content,
metadata: hit._source.metadata
}));
}
export async function enhancePromptWithRAG(
userQuery: string,
conversationContext: string
): Promise<string> {
// Retrieve relevant documents
const documents = await retrieveRelevantDocuments(userQuery);
if (documents.length === 0) {
return conversationContext;
}
// Build enhanced context
const ragContext = documents
.map((doc, idx) => `[Document ${idx + 1}]\n${doc.content}`)
.join('\n\n');
return `${conversationContext}
Relevant information from knowledge base:
${ragContext}
Please use the above information to answer the user's question accurately.`;
}
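enhancePromptWithRAG is not wired into the processor shown earlier; one way to connect them is to enrich the system prompt before the Bedrock call. A sketch under assumed names and module layout:

// Hypothetical glue inside the message processor
import { enhancePromptWithRAG } from '../rag-retrieval';

async function buildSystemPrompt(
  context: ConversationContext,
  userQuery: string
): Promise<string> {
  // Falls back to the plain system prompt when no documents match
  return enhancePromptWithRAG(userQuery, getSystemPrompt(context));
}
// ...then pass the result as `system` in the InvokeModelCommand body.

Message Flow with Priority Queue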
sequenceDiagram
participant User as User (WhatsApp)
participant APIGW as API Gateway
participant Webhook as Webhook Handler
participant SQS as SQS Queue
participant Processor as Message Processor
participant Bedrock as AWS Bedrock
participant DDB as DynamoDB
participant Sender as Message Sender
User->>APIGW: Send message
APIGW->>Webhook: Forward request
Webhook->>Webhook: Verify signature
Webhook->>SQS: Enqueue message
Note over Webhook,SQS: Priority based on:<br/>- User tier<br/>- Message type
Webhook-->>APIGW: 200 OK
APIGW-->>User: Acknowledged
SQS->>Processor: Poll message
Processor->>DDB: Load conversation context
DDB-->>Processor: Context data
Processor->>Bedrock: Generate response
Note over Bedrock: Claude 3 Sonnet<br/>with conversation history
Bedrock-->>Processor: AI response
Processor->>DDB: Update conversation
Processor->>SQS: Enqueue outgoing message
SQS->>Sender: Poll message
Sender->>User: Send response
Note over User,Sender: Total time: ~1.5s
Scaling Configuration
# lambda/autoscaling.tf
resource "aws_lambda_function" "message_processor" {
filename = "message_processor.zip"
function_name = "chatbot-message-processor"
role = aws_iam_role.lambda_exec.arn
handler = "index.handler"
runtime = "nodejs18.x"
timeout = 30
memory_size = 1024
reserved_concurrent_executions = 500
environment {
variables = {
BEDROCK_MODEL_ID = "anthropic.claude-3-sonnet-20240229-v1:0"
CONVERSATIONS_TABLE = aws_dynamodb_table.conversations.name
OPENSEARCH_ENDPOINT = aws_opensearchserverless_collection.kb.collection_endpoint
OUTGOING_QUEUE_URL = aws_sqs_queue.outgoing.url
}
}
  # Private subnets need VPC endpoints (or NAT) to reach Bedrock, SQS, and DynamoDB
  vpc_config {
subnet_ids = var.private_subnet_ids
security_group_ids = [aws_security_group.lambda.id]
}
}
# SQS event source mapping with batch configuration
resource "aws_lambda_event_source_mapping" "sqs_trigger" {
event_source_arn = aws_sqs_queue.incoming.arn
function_name = aws_lambda_function.message_processor.arn
batch_size = 10
maximum_batching_window_in_seconds = 5
  # Caps SQS-driven concurrency well below the 500 reserved executions,
  # leaving headroom for retries and other invocation sources
  scaling_config {
maximum_concurrency = 100
}
function_response_types = ["ReportBatchItemFailures"]
}
# CloudWatch alarms for scaling
resource "aws_cloudwatch_metric_alarm" "high_queue_depth" {
alarm_name = "chatbot-high-queue-depth"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = 2
metric_name = "ApproximateNumberOfMessagesVisible"
namespace = "AWS/SQS"
period = 60
statistic = "Average"
threshold = 1000
alarm_description = "Queue depth is too high"
dimensions = {
QueueName = aws_sqs_queue.incoming.name
}
alarm_actions = [aws_sns_topic.alerts.arn]
}

Multi-Channel Message Sender
// lambda/message-sender/index.ts
import { SQSEvent } from 'aws-lambda';
import axios from 'axios';
import { SQS } from 'aws-sdk';

const sqs = new SQS();
interface OutgoingMessage {
userId: string;
channel: 'whatsapp' | 'messenger' | 'web';
content: string;
messageId: string;
}
export const handler = async (event: SQSEvent): Promise<void> => {
  await Promise.all(
    event.Records.map(record => sendMessage(JSON.parse(record.body)))
  );
};
async function sendMessage(message: OutgoingMessage): Promise<void> {
try {
switch (message.channel) {
case 'whatsapp':
await sendWhatsAppMessage(message);
break;
case 'messenger':
await sendMessengerMessage(message);
break;
case 'web':
await sendWebMessage(message);
break;
}
console.log(`Message sent successfully: ${message.messageId}`);
} catch (error) {
console.error(`Failed to send message: ${message.messageId}`, error);
// Retry logic
await retryMessage(message);
}
}
async function sendWhatsAppMessage(message: OutgoingMessage): Promise<void> {
const WHATSAPP_API_URL = process.env.WHATSAPP_API_URL!;
const ACCESS_TOKEN = process.env.WHATSAPP_ACCESS_TOKEN!;
await axios.post(
`${WHATSAPP_API_URL}/messages`,
{
messaging_product: 'whatsapp',
recipient_type: 'individual',
to: message.userId,
type: 'text',
text: {
body: message.content
}
},
{
headers: {
'Authorization': `Bearer ${ACCESS_TOKEN}`,
'Content-Type': 'application/json'
}
}
);
}
async function sendMessengerMessage(message: OutgoingMessage): Promise<void> {
const MESSENGER_API_URL = 'https://graph.facebook.com/v18.0/me/messages';
const ACCESS_TOKEN = process.env.MESSENGER_ACCESS_TOKEN!;
await axios.post(
MESSENGER_API_URL,
{
recipient: { id: message.userId },
message: { text: message.content }
},
{
headers: {
'Authorization': `Bearer ${ACCESS_TOKEN}`,
'Content-Type': 'application/json'
}
}
);
}
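// sendWebMessage is referenced in the switch above but not shown; a minimal
// sketch that POSTs to a hypothetical internal delivery service which pushes
// to the web widget over WebSockets (URL and payload shape are assumptions):
async function sendWebMessage(message: OutgoingMessage): Promise<void> {
  await axios.post(process.env.WEB_DELIVERY_URL!, {
    userId: message.userId,
    messageId: message.messageId,
    text: message.content
  });
}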
async function retryMessage(message: OutgoingMessage): Promise<void> {
  // Re-enqueue with a fixed delay; a production setup would track an
  // attempt count and back off exponentially before dead-lettering
  await sqs.sendMessage({
    QueueUrl: process.env.DLQ_URL!,
    MessageBody: JSON.stringify(message),
    DelaySeconds: 60
  }).promise();
}

Performance Metrics
flowchart TB
subgraph Metrics["System Performance"]
M1["Response Time<br/>P50: 1.2s<br/>P95: 2.8s<br/>P99: 4.5s"]
M2["Throughput<br/>5M msgs/month<br/>~2000 msgs/min peak"]
M3["Availability<br/>99.9% uptime<br/>< 45min downtime/month"]
M4["Cost<br/>$0.008 per conversation<br/>$40K/month total"]
end
subgraph Scale["Scaling Capacity"]
S1["10K concurrent users"]
S2["500 Lambda concurrent executions"]
S3["100K msgs/hour sustained"]
end
style Metrics fill:#2a9d8f,stroke:#fff,stroke-width:2px,color:#fff
style Scale fill:#f77f00,stroke:#fff,stroke-width:2px,color:#fff
Cost Optimization
| Component | Monthly Cost | Optimization |
|---|---|---|
| Bedrock API calls | $25,000 | Prompt caching, batch inference |
| Lambda | $8,000 | Right-sized memory, reserved concurrency |
| DynamoDB | $4,000 | On-demand billing, TTL cleanup |
| SQS | $500 | Message batching |
| Data transfer | $2,500 | CloudFront for static assets |
| Total | $40,000 | ~$0.008 per conversation |
Best Practices
| Practice | Implementation | Benefit |
|---|---|---|
| Async processing | SQS + Lambda | Handle spikes |
| Context management | DynamoDB with TTL | Maintain conversation flow |
| RAG integration | OpenSearch + Bedrock | Accurate responses |
| Multi-channel support | Abstract sender interface | Easy to add channels |
| Monitoring | CloudWatch + X-Ray | Observability |
| Cost control | Prompt optimization | 30% cost reduction |
Conclusion
Building a production-ready AI chatbot system requires careful architecture beyond just integrating an LLM. The combination of:
- AWS Bedrock for conversational AI
- SQS for reliable message queuing
- DynamoDB for conversation state
- Lambda for serverless scalability
- OpenSearch for RAG capabilities
creates a system that handles millions of conversations monthly while maintaining sub-2-second median response times and 99.9% availability. The key is treating messaging as an event-driven, asynchronous system with proper state management and scaling controls.