Amazon EventBridgeとAmazon SQSで簡単!サーバーレスなCSVバッチ処理

こんにちは、コバヤシです。
今回は、「AWS Transfer Familyで簡単!サーバーレスなCSVバッチ処理」の最後で話をしていた、
EventBridgeとSQSを使用したサーバーレスなCSV処理について書いていきたいと思います。

tech.arms-soft.co.jp

目指す処理の流れ

前回は、AWS Transfer Familyを使用していましたが、今回はその部分を「Amazon EventBridge」と「Amazon SQS」に置き換えます。

  1. CSVファイルのアップロード
    PCからS3にCSVのアップロードを行います。
    CyberduckやWinSCPなどのS3対応アプリであれば直接S3にアップロードできます。
  2. S3のイベントをEventBridgeが処理する
    S3のイベントをEventBridgeに送り、ファイル作成イベントであればSQSへメッセージを送る。
  3. SQSのキューをLambdaで処理する
    キューにメッセージが入るとLambdaが起動する。LambdaでCSVデータを加工・整形します。
    前回と同じく、複数のユーザーのテスト結果を集計する処理を行いたいと思います。
  4. 加工済みCSVの保存
    加工されたCSVデータを別のS3バケットに保存します。EventBridgeはバケット単位の設定となるので同じバケットに保存するとループしてしまいます。なので、保存は別のバケットへの保存となります。

想定するイメージは以下になります。

Amazon EventBridgeとは

Amazon EventBridge は、AWSのさまざまなサービスや外部アプリから発生するイベントを自動的にキャッチし、指定したサービスに送る仕組みです。サーバーレスアプリや分散システムの連携を簡単に設定することが出来ます。

aws.amazon.com

Amazon SQSとは

Amazon SQS(Simple Queue Service)は、AWSが提供するメッセージキューサービスです。アプリケーション間でメッセージを一時的に保存し、順番に処理できるようになります。

aws.amazon.com

1.S3にバケットを作成する

バケットの作成

今回は、CSVアップロード用のバケットと処理済みのファイルを保存する2つのバケットを作成します。

バケットのオプションを変更する

アップロードのバケットのオプションから、Amazon EventBridgeの編集を押します。

「このバケット内のすべてのイベントについて Amazon EventBridge に通知を送信する」をオンにします。

2.Simple Queue Serviceでキューを作成する

Simple Queue Serviceの画面に行きキューを作成します。
今回はほとんど設定を変えずに作成していますが、実際は必要に応じでFIFOやデッドレターキューの設定も行います。

名前を入力し、アクセスポリシーのアドバンストからjsonを直接編集して以下のようにEventBridgeからのメッセージ送信を許可します。

{
  "Version": "2012-10-17",
  "Id": "__default_policy_ID",
  "Statement": [
    {
      "Sid": "__owner_statement",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::xxxxxxxxxxxx:root"
      },
      "Action": "SQS:*",
      "Resource": "arn:aws:sqs:ap-northeast-1:xxxxxxxxxxxx:yyyyyyyyyy"
    },
    {
      "Sid": "AllowEventBridge",
      "Effect": "Allow",
      "Principal": {
        "Service": "events.amazonaws.com"
      },
      "Action": "sqs:SendMessage",
      "Resource": "arn:aws:sqs:ap-northeast-1:xxxxxxxxxxxx:yyyyyyyyyy"
    }
  ]
}

3.EventBridgeでルールを作成

EventBridgeの画面からルールを作成します。

名前と説明を入力して次へを押します。
(ルールタイプは「イベントパターンを持つルール」のままです)

以下を入力して次へを押します。

  • イベントソース:AWSのサービス
  • AWSのサービス:Simple Storage Service(S3)
  • イベントタイプ:S3イベント通知
  • イベントタイプの仕様 1:特定のイベント
  • 特定のイベント:Object Created
  • イベントタイプの仕様 2:特定のバケット (名前別)
  • 特定のバケット (名前別):【CSVアップロード用のバケット名】

AWS のサービスで「SQSキュー」を選択し、キューには先程作成したキューを選択して、次へを押します。

必要であればタグを入力し、次へを押します。
確認画面で、「ルールの作成」ボタンを押してルールを作成します。

4.Lambda関数の変更

バケットへの実行ロールを変更

前回作成したLambdaの実行ロールを変更して、2つのバケットにアクセスできるようにします。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::bucket001/*",
                "arn:aws:s3:::bucket002/*"
            ]
        }
    ]
}

SQSへのアクセスを追加

さらに、インラインポリシーを作成してSQSへのアクセスを許可します。
Resourceはキューのリソースを指定します。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:ReceiveMessage",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes"
            ],
            "Resource": "arn:aws:sqs:ap-northeast-1:xxxxxxxxxxxx:yyyyyy"
        }
    ]
}

Lambda関数の処理を変更

前回作成したLambda関数を変更します。
今回はとりあえず動かしたかったので、ざっくりなプログラムとなっています。

import boto3
import csv
import io
import json
from datetime import datetime, timezone, timedelta

# S3クライアントを作成
s3 = boto3.client('s3')

def lambda_handler(event, context):
    results = []

    for record in event['Records']:
        try:
            # SQS メッセージの `body` を JSON にデコード
            message_body = json.loads(record['body'])

            # バケット名とオブジェクトキーを取得
            bucket_name = message_body['detail']['bucket']['name']
            object_key = message_body['detail']['object']['key']
        except (KeyError, json.JSONDecodeError) as e:
            raise ValueError("入力エラー")

        try:
            # S3からCSVファイルを取得
            response = s3.get_object(Bucket=bucket_name, Key=object_key)
            csv_content = response['Body'].read().decode('utf-8')
        except Exception as e:
            raise RuntimeError("処理に失敗しました")

        try:
            # CSVデータの処理
            reader = csv.DictReader(io.StringIO(csv_content))
            user_scores = {}
            for row in reader:
                user = row.get('ユーザー名')
                score = row.get('点数')
                if not user or not score:
                    continue
                try:
                    score = int(score)
                except ValueError:
                    continue
                
                if user not in user_scores:
                    user_scores[user] = 0
                user_scores[user] += score

            # 処理後のCSVを作成
            output = io.StringIO()
            writer = csv.writer(output)
            writer.writerow(['ユーザー名', '合計点'])
            for user, total_score in user_scores.items():
                writer.writerow([user, total_score])

            # 新しいファイル名を作成
            base_name = object_key.split('/')[-1]
            jst = timezone(timedelta(hours=9)) 
            current_time = datetime.now(jst).strftime("%Y%m%d_%H%M%S") 
            new_file_name = f"{base_name.split('.')[0]}_processed_{current_time}.csv"

            upload_dir = "/".join(object_key.split('/')[:-1])  # ディレクトリ部分を取得
            processed_key = f"{upload_dir}/{new_file_name}" if upload_dir else new_file_name

            # S3に保存
            output.seek(0)
            s3.put_object(
                Bucket='bucket002',
                Key=processed_key,
                Body=output.getvalue(),
                ContentType='text/csv'
            )

            results.append({
                "processedFilePath": processed_key,
                "message": "処理が成功しました"
            })
        except Exception as e:
            raise RuntimeError("処理に失敗しました")

    return results if results else {"message": "処理するデータがありません"}

5.実行

アップロード用のバケットに前回作成したCSVをアップロードすると、無事CSVを保存するバケットに処理されたCSVが保存されました。

中身も問題ないようです。

ユーザー名,合計点
ユーザー1,323
ユーザー2,327
ユーザー3,277
ユーザー4,321
ユーザー5,335
ユーザー6,342
ユーザー7,310
ユーザー8,335
ユーザー9,338
ユーザー10,257

まとめ

AWS Transfer Familyで実現していた処理を、EventBridgeとSQSを組み合わせて実行することができました。
Transfer Familyのときよりも関与するサービスが増えたことで、運用時にはエラー処理に少し気を使う必要がありますが、今回のような同一アカウント・同一リージョンでの利用であればEventBridgeは無料、SQSも100万件までは無料なので、コストを大きく抑えることが可能です。
今後は案件の要件に応じて、この方法も選択肢として検討していきたいと思います。