AWS SQS/SNS完全ガイド
AWS SQS/SNS完全ガイド
Section titled “AWS SQS/SNS完全ガイド”AWS SQS(Simple Queue Service)とSNS(Simple Notification Service)の実践的な使い方を、実務で使える実装例とベストプラクティスとともに詳しく解説します。
1. AWS SQS/SNSとは
Section titled “1. AWS SQS/SNSとは”SQS(Simple Queue Service)
Section titled “SQS(Simple Queue Service)”SQSは、メッセージキューサービスです。非同期処理とサービス間通信を実現します。
SQSの特徴 ├─ 完全マネージドサービス ├─ 自動スケーリング ├─ 高可用性(99.99%) ├─ メッセージの永続化 └─ Dead Letter Queue対応SNS(Simple Notification Service)
Section titled “SNS(Simple Notification Service)”SNSは、パブリッシュ/サブスクライブメッセージングサービスです。複数のエンドポイントにメッセージを配信します。
SNSの特徴 ├─ 完全マネージドサービス ├─ 複数のプロトコル対応(HTTP、HTTPS、Email、SMS、SQS、Lambda) ├─ 高可用性(99.99%) ├─ 自動リトライ └─ フィルタリング対応2. SQSの基本操作
Section titled “2. SQSの基本操作”キューの作成
Section titled “キューの作成”# AWS CLIでのキュー作成aws sqs create-queue \ --queue-name my-queue \ --attributes \ VisibilityTimeout=30,MessageRetentionPeriod=345600
# キューのURL取得aws sqs get-queue-url --queue-name my-queueメッセージの送信
Section titled “メッセージの送信”# Pythonの例(boto3)import boto3import json
sqs = boto3.client('sqs', region_name='us-east-1')queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue'
# メッセージの送信response = sqs.send_message( QueueUrl=queue_url, MessageBody=json.dumps({ 'order_id': '12345', 'user_id': 'user-123', 'amount': 100.00 }), MessageAttributes={ 'OrderType': { 'StringValue': 'Standard', 'DataType': 'String' } })
print(f"MessageId: {response['MessageId']}")メッセージの受信
Section titled “メッセージの受信”# Pythonの例import boto3import json
sqs = boto3.client('sqs', region_name='us-east-1')queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue'
# メッセージの受信response = sqs.receive_message( QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=20, # Long Polling MessageAttributeNames=['All'])
if 'Messages' in response: for message in response['Messages']: body = json.loads(message['Body']) receipt_handle = message['ReceiptHandle']
# メッセージの処理 process_message(body)
# メッセージの削除 sqs.delete_message( QueueUrl=queue_url, ReceiptHandle=receipt_handle )3. SQSのキュータイプ
Section titled “3. SQSのキュータイプ”Standard Queue(標準キュー)
Section titled “Standard Queue(標準キュー)”# Standard Queueの特徴# - 無制限のスループット# - 少なくとも1回の配信保証# - ベストエフォート順序
sqs = boto3.client('sqs')queue_url = sqs.create_queue(QueueName='standard-queue')['QueueUrl']FIFO Queue(先入先出キュー)
Section titled “FIFO Queue(先入先出キュー)”# FIFO Queueの特徴# - 厳密な順序保証# - 正確に1回の配信保証# - 300メッセージ/秒のスループット
sqs = boto3.client('sqs')queue_url = sqs.create_queue( QueueName='fifo-queue.fifo', Attributes={ 'FifoQueue': 'true', 'ContentBasedDeduplication': 'true' })['QueueUrl']
# メッセージグループIDの指定sqs.send_message( QueueUrl=queue_url, MessageBody=json.dumps({'data': 'message'}), MessageGroupId='group-1', MessageDeduplicationId='dedup-1')4. Dead Letter Queue(DLQ)
Section titled “4. Dead Letter Queue(DLQ)”DLQの設定
Section titled “DLQの設定”# DLQの作成dlq_url = sqs.create_queue( QueueName='my-queue-dlq', Attributes={ 'MessageRetentionPeriod': '1209600' # 14日 })['QueueUrl']
dlq_arn = sqs.get_queue_attributes( QueueUrl=dlq_url, AttributeNames=['QueueArn'])['Attributes']['QueueArn']
# メインキューにDLQを設定main_queue_url = sqs.create_queue( QueueName='my-queue', Attributes={ 'RedrivePolicy': json.dumps({ 'deadLetterTargetArn': dlq_arn, 'maxReceiveCount': 3 }) })['QueueUrl']5. SNSの基本操作
Section titled “5. SNSの基本操作”Topicの作成
Section titled “Topicの作成”# Pythonの例(boto3)import boto3
sns = boto3.client('sns', region_name='us-east-1')
# Topicの作成response = sns.create_topic(Name='my-topic')topic_arn = response['TopicArn']
print(f"TopicArn: {topic_arn}")メッセージのパブリッシュ
Section titled “メッセージのパブリッシュ”# メッセージのパブリッシュresponse = sns.publish( TopicArn=topic_arn, Message=json.dumps({ 'default': json.dumps({ 'order_id': '12345', 'status': 'completed' }), 'email': 'Order #12345 has been completed.', 'sms': 'Your order #12345 is ready!' }), MessageStructure='json', Subject='Order Update')
print(f"MessageId: {response['MessageId']}")サブスクリプションの作成
Section titled “サブスクリプションの作成”# Emailサブスクリプションsubscription = sns.subscribe( TopicArn=topic_arn, Protocol='email', Endpoint='user@example.com')
# SQSサブスクリプションsqs_subscription = sns.subscribe( TopicArn=topic_arn, Protocol='sqs', Endpoint=sqs_queue_arn)
# Lambdaサブスクリプションlambda_subscription = sns.subscribe( TopicArn=topic_arn, Protocol='lambda', Endpoint=lambda_function_arn)6. SNSのフィルタリング
Section titled “6. SNSのフィルタリング”メッセージフィルター
Section titled “メッセージフィルター”# フィルターポリシーの設定filter_policy = { 'order_type': ['Standard', 'Premium'], 'amount': [{'numeric': ['>=', 100]}]}
sns.set_subscription_attributes( SubscriptionArn=subscription_arn, AttributeName='FilterPolicy', AttributeValue=json.dumps(filter_policy))
# フィルターに一致するメッセージのみ配信sns.publish( TopicArn=topic_arn, Message=json.dumps({ 'order_id': '12345', 'order_type': 'Standard', 'amount': 150 }), MessageAttributes={ 'order_type': { 'DataType': 'String', 'StringValue': 'Standard' }, 'amount': { 'DataType': 'Number', 'StringValue': '150' } })7. SQSとSNSの連携
Section titled “7. SQSとSNSの連携”SNS → SQS
Section titled “SNS → SQS”# SNS Topicの作成topic_arn = sns.create_topic(Name='order-topic')['TopicArn']
# SQS Queueの作成queue_url = sqs.create_queue(QueueName='order-queue')['QueueUrl']queue_arn = sqs.get_queue_attributes( QueueUrl=queue_url, AttributeNames=['QueueArn'])['Attributes']['QueueArn']
# SQS QueueにSNSからのアクセスを許可sqs.set_queue_attributes( QueueUrl=queue_url, Attributes={ 'Policy': json.dumps({ 'Version': '2012-10-17', 'Statement': [{ 'Effect': 'Allow', 'Principal': {'Service': 'sns.amazonaws.com'}, 'Action': 'SQS:SendMessage', 'Resource': queue_arn, 'Condition': { 'ArnEquals': { 'aws:SourceArn': topic_arn } } }] }) })
# SNSからSQSへのサブスクリプションsns.subscribe( TopicArn=topic_arn, Protocol='sqs', Endpoint=queue_arn)8. CloudFormationでの設定
Section titled “8. CloudFormationでの設定”SQS Queue
Section titled “SQS Queue”Resources: MyQueue: Type: AWS::SQS::Queue Properties: QueueName: my-queue VisibilityTimeoutSeconds: 30 MessageRetentionPeriod: 345600 ReceiveMessageWaitTimeSeconds: 20 RedrivePolicy: deadLetterTargetArn: !GetAtt DeadLetterQueue.Arn maxReceiveCount: 3 Tags: - Key: Environment Value: Production
DeadLetterQueue: Type: AWS::SQS::Queue Properties: QueueName: my-queue-dlq MessageRetentionPeriod: 1209600SNS Topic
Section titled “SNS Topic”Resources: MyTopic: Type: AWS::SNS::Topic Properties: TopicName: my-topic DisplayName: My Topic Subscription: - Endpoint: user@example.com Protocol: email - Endpoint: !GetAtt MyQueue.Arn Protocol: sqs
MyQueue: Type: AWS::SQS::Queue Properties: QueueName: my-queue
QueuePolicy: Type: AWS::SQS::QueuePolicy Properties: Queues: - !Ref MyQueue PolicyDocument: Statement: - Effect: Allow Principal: Service: sns.amazonaws.com Action: SQS:SendMessage Resource: !GetAtt MyQueue.Arn Condition: ArnEquals: aws:SourceArn: !Ref MyTopic9. Lambdaとの連携
Section titled “9. Lambdaとの連携”SQS → Lambda
Section titled “SQS → Lambda”# Lambda関数import jsonimport boto3
def lambda_handler(event, context): for record in event['Records']: body = json.loads(record['body'])
# メッセージの処理 process_message(body)
return { 'statusCode': 200, 'body': json.dumps('Success') }# CloudFormationResources: ProcessQueueFunction: Type: AWS::Lambda::Function Properties: FunctionName: process-queue Runtime: python3.9 Handler: index.lambda_handler Code: ZipFile: | def lambda_handler(event, context): return {'statusCode': 200} Events: SQSQueue: Type: SQS Properties: Queue: !GetAtt MyQueue.Arn BatchSize: 10SNS → Lambda
Section titled “SNS → Lambda”# Lambda関数import json
def lambda_handler(event, context): for record in event['Records']: sns_message = json.loads(record['Sns']['Message'])
# メッセージの処理 process_sns_message(sns_message)
return { 'statusCode': 200, 'body': json.dumps('Success') }10. 実践的なベストプラクティス
Section titled “10. 実践的なベストプラクティス”可視性タイムアウトの設定
Section titled “可視性タイムアウトの設定”# 処理時間に応じた可視性タイムアウトの設定# 処理時間の2倍程度に設定sqs.set_queue_attributes( QueueUrl=queue_url, Attributes={ 'VisibilityTimeout': '60' # 30秒の処理なら60秒 })# 複数メッセージの一括送信entries = [ { 'Id': '1', 'MessageBody': json.dumps({'id': 1}) }, { 'Id': '2', 'MessageBody': json.dumps({'id': 2}) }]
sqs.send_message_batch( QueueUrl=queue_url, Entries=entries)エラーハンドリング
Section titled “エラーハンドリング”import boto3from botocore.exceptions import ClientError
def send_message_safely(queue_url, message): try: response = sqs.send_message( QueueUrl=queue_url, MessageBody=json.dumps(message) ) return response['MessageId'] except ClientError as e: print(f"Error sending message: {e}") # リトライロジック raise11. 監視とアラート
Section titled “11. 監視とアラート”CloudWatchメトリクス
Section titled “CloudWatchメトリクス”# SQSメトリクスの確認import boto3
cloudwatch = boto3.client('cloudwatch')
# キューのメッセージ数response = cloudwatch.get_metric_statistics( Namespace='AWS/SQS', MetricName='ApproximateNumberOfMessages', Dimensions=[ { 'Name': 'QueueName', 'Value': 'my-queue' } ], StartTime=datetime.utcnow() - timedelta(hours=1), EndTime=datetime.utcnow(), Period=300, Statistics=['Average'])CloudWatchアラーム
Section titled “CloudWatchアラーム”Resources: QueueDepthAlarm: Type: AWS::CloudWatch::Alarm Properties: AlarmName: HighQueueDepth MetricName: ApproximateNumberOfMessages Namespace: AWS/SQS Statistic: Average Period: 300 EvaluationPeriods: 1 Threshold: 1000 ComparisonOperator: GreaterThanThreshold Dimensions: - Name: QueueName Value: my-queue AlarmActions: - !Ref SNSTopic12. よくある問題と解決方法
Section titled “12. よくある問題と解決方法”問題1: メッセージの重複処理
Section titled “問題1: メッセージの重複処理”# 解決: 冪等性の実装processed_ids = set()
def process_message(message): message_id = message.get('id')
if message_id in processed_ids: return # 既に処理済み
# 処理 do_work(message) processed_ids.add(message_id)問題2: メッセージの損失
Section titled “問題2: メッセージの損失”# 解決: DLQの設定と適切な確認応答# DLQを設定し、maxReceiveCountを適切に設定# 処理が成功したら必ずメッセージを削除問題3: パフォーマンスの低下
Section titled “問題3: パフォーマンスの低下”# 解決: バッチ処理とLong Pollingsqs.receive_message( QueueUrl=queue_url, MaxNumberOfMessages=10, # バッチサイズ WaitTimeSeconds=20 # Long Polling)AWS SQS/SNS完全ガイドのポイント:
- SQS: Standard Queue、FIFO Queue、DLQ
- SNS: Topic、Subscription、フィルタリング
- 連携: SNS → SQS、SQS → Lambda、SNS → Lambda
- CloudFormation: インフラのコード化
- ベストプラクティス: 可視性タイムアウト、バッチ処理、エラーハンドリング
- 監視: CloudWatchメトリクスとアラーム
適切なSQS/SNSの使用により、スケーラブルで信頼性の高いメッセージングシステムを構築できます。