こんにちは、コバヤシです。
今回は、「AWS Transfer Familyで簡単!サーバーレスなCSVバッチ処理」の最後で話をしていた、
EventBridgeとSQSを使用したサーバーレスなCSV処理について書いていきたいと思います。
目指す処理の流れ
前回は、AWS Transfer Familyを使用していましたが、今回はその部分を「Amazon EventBridge」と「Amazon SQS」に置き換えます。
- CSVファイルのアップロード
PCからS3にCSVのアップロードを行います。
CyberduckやWinSCPなどのS3対応アプリであれば直接S3にアップロードできます。 - S3のイベントをEventBridgeが処理する
S3のイベントをEventBridgeに送り、ファイル作成イベントであればSQSへメッセージを送る。 - SQSのキューをLambdaで処理する
キューにメッセージが入るとLambdaが起動する。LambdaでCSVデータを加工・整形します。
前回と同じく、複数のユーザーのテスト結果を集計する処理を行いたいと思います。 - 加工済みCSVの保存
加工されたCSVデータを別のS3バケットに保存します。EventBridgeはバケット単位の設定となるので同じバケットに保存するとループしてしまいます。なので、保存は別のバケットへの保存となります。
想定するイメージは以下になります。
Amazon EventBridgeとは
Amazon EventBridge は、AWSのさまざまなサービスや外部アプリから発生するイベントを自動的にキャッチし、指定したサービスに送る仕組みです。サーバーレスアプリや分散システムの連携を簡単に設定することが出来ます。
Amazon SQSとは
Amazon SQS(Simple Queue Service)は、AWSが提供するメッセージキューサービスです。アプリケーション間でメッセージを一時的に保存し、順番に処理できるようになります。
1.S3にバケットを作成する
バケットの作成
今回は、CSVアップロード用のバケットと処理済みのファイルを保存する2つのバケットを作成します。
バケットのオプションを変更する
アップロードのバケットのオプションから、Amazon EventBridgeの編集を押します。
「このバケット内のすべてのイベントについて Amazon EventBridge に通知を送信する」をオンにします。
2.Simple Queue Serviceでキューを作成する
Simple Queue Serviceの画面に行きキューを作成します。
今回はほとんど設定を変えずに作成していますが、実際は必要に応じでFIFOやデッドレターキューの設定も行います。
名前を入力し、アクセスポリシーのアドバンストからjsonを直接編集して以下のようにEventBridgeからのメッセージ送信を許可します。
{ "Version": "2012-10-17", "Id": "__default_policy_ID", "Statement": [ { "Sid": "__owner_statement", "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::xxxxxxxxxxxx:root" }, "Action": "SQS:*", "Resource": "arn:aws:sqs:ap-northeast-1:xxxxxxxxxxxx:yyyyyyyyyy" }, { "Sid": "AllowEventBridge", "Effect": "Allow", "Principal": { "Service": "events.amazonaws.com" }, "Action": "sqs:SendMessage", "Resource": "arn:aws:sqs:ap-northeast-1:xxxxxxxxxxxx:yyyyyyyyyy" } ] }
3.EventBridgeでルールを作成
EventBridgeの画面からルールを作成します。
名前と説明を入力して次へを押します。
(ルールタイプは「イベントパターンを持つルール」のままです)
以下を入力して次へを押します。
- イベントソース:AWSのサービス
- AWSのサービス:Simple Storage Service(S3)
- イベントタイプ:S3イベント通知
- イベントタイプの仕様 1:特定のイベント
- 特定のイベント:Object Created
- イベントタイプの仕様 2:特定のバケット (名前別)
- 特定のバケット (名前別):【CSVアップロード用のバケット名】
AWS のサービスで「SQSキュー」を選択し、キューには先程作成したキューを選択して、次へを押します。
必要であればタグを入力し、次へを押します。
確認画面で、「ルールの作成」ボタンを押してルールを作成します。
4.Lambda関数の変更
バケットへの実行ロールを変更
前回作成したLambdaの実行ロールを変更して、2つのバケットにアクセスできるようにします。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "arn:aws:logs:*:*:*" }, { "Effect": "Allow", "Action": [ "s3:GetObject", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::bucket001/*", "arn:aws:s3:::bucket002/*" ] } ] }
SQSへのアクセスを追加
さらに、インラインポリシーを作成してSQSへのアクセスを許可します。
Resourceはキューのリソースを指定します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "sqs:ReceiveMessage", "sqs:DeleteMessage", "sqs:GetQueueAttributes" ], "Resource": "arn:aws:sqs:ap-northeast-1:xxxxxxxxxxxx:yyyyyy" } ] }
Lambda関数の処理を変更
前回作成したLambda関数を変更します。
今回はとりあえず動かしたかったので、ざっくりなプログラムとなっています。
import boto3 import csv import io import json from datetime import datetime, timezone, timedelta # S3クライアントを作成 s3 = boto3.client('s3') def lambda_handler(event, context): results = [] for record in event['Records']: try: # SQS メッセージの `body` を JSON にデコード message_body = json.loads(record['body']) # バケット名とオブジェクトキーを取得 bucket_name = message_body['detail']['bucket']['name'] object_key = message_body['detail']['object']['key'] except (KeyError, json.JSONDecodeError) as e: raise ValueError("入力エラー") try: # S3からCSVファイルを取得 response = s3.get_object(Bucket=bucket_name, Key=object_key) csv_content = response['Body'].read().decode('utf-8') except Exception as e: raise RuntimeError("処理に失敗しました") try: # CSVデータの処理 reader = csv.DictReader(io.StringIO(csv_content)) user_scores = {} for row in reader: user = row.get('ユーザー名') score = row.get('点数') if not user or not score: continue try: score = int(score) except ValueError: continue if user not in user_scores: user_scores[user] = 0 user_scores[user] += score # 処理後のCSVを作成 output = io.StringIO() writer = csv.writer(output) writer.writerow(['ユーザー名', '合計点']) for user, total_score in user_scores.items(): writer.writerow([user, total_score]) # 新しいファイル名を作成 base_name = object_key.split('/')[-1] jst = timezone(timedelta(hours=9)) current_time = datetime.now(jst).strftime("%Y%m%d_%H%M%S") new_file_name = f"{base_name.split('.')[0]}_processed_{current_time}.csv" upload_dir = "/".join(object_key.split('/')[:-1]) # ディレクトリ部分を取得 processed_key = f"{upload_dir}/{new_file_name}" if upload_dir else new_file_name # S3に保存 output.seek(0) s3.put_object( Bucket='bucket002', Key=processed_key, Body=output.getvalue(), ContentType='text/csv' ) results.append({ "processedFilePath": processed_key, "message": "処理が成功しました" }) except Exception as e: raise RuntimeError("処理に失敗しました") return results if results else {"message": "処理するデータがありません"}
5.実行
アップロード用のバケットに前回作成したCSVをアップロードすると、無事CSVを保存するバケットに処理されたCSVが保存されました。
中身も問題ないようです。
ユーザー名,合計点 ユーザー1,323 ユーザー2,327 ユーザー3,277 ユーザー4,321 ユーザー5,335 ユーザー6,342 ユーザー7,310 ユーザー8,335 ユーザー9,338 ユーザー10,257
まとめ
AWS Transfer Familyで実現していた処理を、EventBridgeとSQSを組み合わせて実行することができました。
Transfer Familyのときよりも関与するサービスが増えたことで、運用時にはエラー処理に少し気を使う必要がありますが、今回のような同一アカウント・同一リージョンでの利用であればEventBridgeは無料、SQSも100万件までは無料なので、コストを大きく抑えることが可能です。
今後は案件の要件に応じて、この方法も選択肢として検討していきたいと思います。