Skip to content

AWS SQS/SNS完全ガイド

AWS SQS(Simple Queue Service)とSNS(Simple Notification Service)の実践的な使い方を、実務で使える実装例とベストプラクティスとともに詳しく解説します。

SQSは、メッセージキューサービスです。非同期処理とサービス間通信を実現します。

SQSの特徴
├─ 完全マネージドサービス
├─ 自動スケーリング
├─ 高可用性(99.99%)
├─ メッセージの永続化
└─ Dead Letter Queue対応

SNSは、パブリッシュ/サブスクライブメッセージングサービスです。複数のエンドポイントにメッセージを配信します。

SNSの特徴
├─ 完全マネージドサービス
├─ 複数のプロトコル対応(HTTP、HTTPS、Email、SMS、SQS、Lambda)
├─ 高可用性(99.99%)
├─ 自動リトライ
└─ フィルタリング対応
Terminal window
# AWS CLIでのキュー作成
aws sqs create-queue \
--queue-name my-queue \
--attributes \
VisibilityTimeout=30,MessageRetentionPeriod=345600
# キューのURL取得
aws sqs get-queue-url --queue-name my-queue
# Pythonの例(boto3)
import boto3
import json
sqs = boto3.client('sqs', region_name='us-east-1')
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue'
# メッセージの送信
response = sqs.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps({
'order_id': '12345',
'user_id': 'user-123',
'amount': 100.00
}),
MessageAttributes={
'OrderType': {
'StringValue': 'Standard',
'DataType': 'String'
}
}
)
print(f"MessageId: {response['MessageId']}")
# Pythonの例
import boto3
import json
sqs = boto3.client('sqs', region_name='us-east-1')
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue'
# メッセージの受信
response = sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=10,
WaitTimeSeconds=20, # Long Polling
MessageAttributeNames=['All']
)
if 'Messages' in response:
for message in response['Messages']:
body = json.loads(message['Body'])
receipt_handle = message['ReceiptHandle']
# メッセージの処理
process_message(body)
# メッセージの削除
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=receipt_handle
)
# Standard Queueの特徴
# - 無制限のスループット
# - 少なくとも1回の配信保証
# - ベストエフォート順序
sqs = boto3.client('sqs')
queue_url = sqs.create_queue(QueueName='standard-queue')['QueueUrl']
# FIFO Queueの特徴
# - 厳密な順序保証
# - 正確に1回の配信保証
# - 300メッセージ/秒のスループット
sqs = boto3.client('sqs')
queue_url = sqs.create_queue(
QueueName='fifo-queue.fifo',
Attributes={
'FifoQueue': 'true',
'ContentBasedDeduplication': 'true'
}
)['QueueUrl']
# メッセージグループIDの指定
sqs.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps({'data': 'message'}),
MessageGroupId='group-1',
MessageDeduplicationId='dedup-1'
)
# DLQの作成
dlq_url = sqs.create_queue(
QueueName='my-queue-dlq',
Attributes={
'MessageRetentionPeriod': '1209600' # 14日
}
)['QueueUrl']
dlq_arn = sqs.get_queue_attributes(
QueueUrl=dlq_url,
AttributeNames=['QueueArn']
)['Attributes']['QueueArn']
# メインキューにDLQを設定
main_queue_url = sqs.create_queue(
QueueName='my-queue',
Attributes={
'RedrivePolicy': json.dumps({
'deadLetterTargetArn': dlq_arn,
'maxReceiveCount': 3
})
}
)['QueueUrl']
# Pythonの例(boto3)
import boto3
sns = boto3.client('sns', region_name='us-east-1')
# Topicの作成
response = sns.create_topic(Name='my-topic')
topic_arn = response['TopicArn']
print(f"TopicArn: {topic_arn}")
# メッセージのパブリッシュ
response = sns.publish(
TopicArn=topic_arn,
Message=json.dumps({
'default': json.dumps({
'order_id': '12345',
'status': 'completed'
}),
'email': 'Order #12345 has been completed.',
'sms': 'Your order #12345 is ready!'
}),
MessageStructure='json',
Subject='Order Update'
)
print(f"MessageId: {response['MessageId']}")
# Emailサブスクリプション
subscription = sns.subscribe(
TopicArn=topic_arn,
Protocol='email',
Endpoint='user@example.com'
)
# SQSサブスクリプション
sqs_subscription = sns.subscribe(
TopicArn=topic_arn,
Protocol='sqs',
Endpoint=sqs_queue_arn
)
# Lambdaサブスクリプション
lambda_subscription = sns.subscribe(
TopicArn=topic_arn,
Protocol='lambda',
Endpoint=lambda_function_arn
)
# フィルターポリシーの設定
filter_policy = {
'order_type': ['Standard', 'Premium'],
'amount': [{'numeric': ['>=', 100]}]
}
sns.set_subscription_attributes(
SubscriptionArn=subscription_arn,
AttributeName='FilterPolicy',
AttributeValue=json.dumps(filter_policy)
)
# フィルターに一致するメッセージのみ配信
sns.publish(
TopicArn=topic_arn,
Message=json.dumps({
'order_id': '12345',
'order_type': 'Standard',
'amount': 150
}),
MessageAttributes={
'order_type': {
'DataType': 'String',
'StringValue': 'Standard'
},
'amount': {
'DataType': 'Number',
'StringValue': '150'
}
}
)
# SNS Topicの作成
topic_arn = sns.create_topic(Name='order-topic')['TopicArn']
# SQS Queueの作成
queue_url = sqs.create_queue(QueueName='order-queue')['QueueUrl']
queue_arn = sqs.get_queue_attributes(
QueueUrl=queue_url,
AttributeNames=['QueueArn']
)['Attributes']['QueueArn']
# SQS QueueにSNSからのアクセスを許可
sqs.set_queue_attributes(
QueueUrl=queue_url,
Attributes={
'Policy': json.dumps({
'Version': '2012-10-17',
'Statement': [{
'Effect': 'Allow',
'Principal': {'Service': 'sns.amazonaws.com'},
'Action': 'SQS:SendMessage',
'Resource': queue_arn,
'Condition': {
'ArnEquals': {
'aws:SourceArn': topic_arn
}
}
}]
})
}
)
# SNSからSQSへのサブスクリプション
sns.subscribe(
TopicArn=topic_arn,
Protocol='sqs',
Endpoint=queue_arn
)
Resources:
MyQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: my-queue
VisibilityTimeoutSeconds: 30
MessageRetentionPeriod: 345600
ReceiveMessageWaitTimeSeconds: 20
RedrivePolicy:
deadLetterTargetArn: !GetAtt DeadLetterQueue.Arn
maxReceiveCount: 3
Tags:
- Key: Environment
Value: Production
DeadLetterQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: my-queue-dlq
MessageRetentionPeriod: 1209600
Resources:
MyTopic:
Type: AWS::SNS::Topic
Properties:
TopicName: my-topic
DisplayName: My Topic
Subscription:
- Endpoint: user@example.com
Protocol: email
- Endpoint: !GetAtt MyQueue.Arn
Protocol: sqs
MyQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: my-queue
QueuePolicy:
Type: AWS::SQS::QueuePolicy
Properties:
Queues:
- !Ref MyQueue
PolicyDocument:
Statement:
- Effect: Allow
Principal:
Service: sns.amazonaws.com
Action: SQS:SendMessage
Resource: !GetAtt MyQueue.Arn
Condition:
ArnEquals:
aws:SourceArn: !Ref MyTopic
# Lambda関数
import json
import boto3
def lambda_handler(event, context):
for record in event['Records']:
body = json.loads(record['body'])
# メッセージの処理
process_message(body)
return {
'statusCode': 200,
'body': json.dumps('Success')
}
# CloudFormation
Resources:
ProcessQueueFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: process-queue
Runtime: python3.9
Handler: index.lambda_handler
Code:
ZipFile: |
def lambda_handler(event, context):
return {'statusCode': 200}
Events:
SQSQueue:
Type: SQS
Properties:
Queue: !GetAtt MyQueue.Arn
BatchSize: 10
# Lambda関数
import json
def lambda_handler(event, context):
for record in event['Records']:
sns_message = json.loads(record['Sns']['Message'])
# メッセージの処理
process_sns_message(sns_message)
return {
'statusCode': 200,
'body': json.dumps('Success')
}

10. 実践的なベストプラクティス

Section titled “10. 実践的なベストプラクティス”
# 処理時間に応じた可視性タイムアウトの設定
# 処理時間の2倍程度に設定
sqs.set_queue_attributes(
QueueUrl=queue_url,
Attributes={
'VisibilityTimeout': '60' # 30秒の処理なら60秒
}
)
# 複数メッセージの一括送信
entries = [
{
'Id': '1',
'MessageBody': json.dumps({'id': 1})
},
{
'Id': '2',
'MessageBody': json.dumps({'id': 2})
}
]
sqs.send_message_batch(
QueueUrl=queue_url,
Entries=entries
)
import boto3
from botocore.exceptions import ClientError
def send_message_safely(queue_url, message):
try:
response = sqs.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps(message)
)
return response['MessageId']
except ClientError as e:
print(f"Error sending message: {e}")
# リトライロジック
raise
# SQSメトリクスの確認
import boto3
cloudwatch = boto3.client('cloudwatch')
# キューのメッセージ数
response = cloudwatch.get_metric_statistics(
Namespace='AWS/SQS',
MetricName='ApproximateNumberOfMessages',
Dimensions=[
{
'Name': 'QueueName',
'Value': 'my-queue'
}
],
StartTime=datetime.utcnow() - timedelta(hours=1),
EndTime=datetime.utcnow(),
Period=300,
Statistics=['Average']
)
Resources:
QueueDepthAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
AlarmName: HighQueueDepth
MetricName: ApproximateNumberOfMessages
Namespace: AWS/SQS
Statistic: Average
Period: 300
EvaluationPeriods: 1
Threshold: 1000
ComparisonOperator: GreaterThanThreshold
Dimensions:
- Name: QueueName
Value: my-queue
AlarmActions:
- !Ref SNSTopic
# 解決: 冪等性の実装
processed_ids = set()
def process_message(message):
message_id = message.get('id')
if message_id in processed_ids:
return # 既に処理済み
# 処理
do_work(message)
processed_ids.add(message_id)
# 解決: DLQの設定と適切な確認応答
# DLQを設定し、maxReceiveCountを適切に設定
# 処理が成功したら必ずメッセージを削除
# 解決: バッチ処理とLong Polling
sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=10, # バッチサイズ
WaitTimeSeconds=20 # Long Polling
)

AWS SQS/SNS完全ガイドのポイント:

  • SQS: Standard Queue、FIFO Queue、DLQ
  • SNS: Topic、Subscription、フィルタリング
  • 連携: SNS → SQS、SQS → Lambda、SNS → Lambda
  • CloudFormation: インフラのコード化
  • ベストプラクティス: 可視性タイムアウト、バッチ処理、エラーハンドリング
  • 監視: CloudWatchメトリクスとアラーム

適切なSQS/SNSの使用により、スケーラブルで信頼性の高いメッセージングシステムを構築できます。