S3→EventBridge→Step Functions→GlueでCSV取り込みを自動化した話
はじめに
こんにちは、エンジニアチームのこむたです。
本記事では、AWS Step Functions を用いて、S3にアップロードされたCSVファイルを起点に、Glueでデータ変換・DB登録を行うワークフローについて紹介します。
S3イベントをトリガーにGlueを起動する構成は、EventBridgeを使うことで比較的簡単に実現できます。
しかし、実運用を考えると「単にGlueを起動するだけ」では対応しきれないケースがありました。
今回の要件では、以下のようなポイントを満たす必要がありました。
- アップロードされた特定のファイルのみを確実に処理したい
- Glue実行後に、S3内の状態を見て後続処理を分岐したい
これらを実現するために、S3 → EventBridge → Step Functions → Glueという構成を採用しました。
本記事では、この構成にした理由と実装のポイントを紹介します。
課題
今回の要件はシンプルで、
「S3にCSVをアップロードするだけで、データベースに取り込まれる仕組みを作りたい」
というものでした。
日次でCSVファイルがアップロードされ、それらを加工してAurora MySQLに格納します。
当初は、Glue JobでS3の特定ディレクトリを参照し、その中のCSVをまとめて処理する構成を検討していました。
しかしこの構成にはいくつかの問題がありました。
まず、処理対象が曖昧になる点です。
ディレクトリ単位でCSVを処理する場合、どのファイルが今回の対象なのか分かりづらく、同じファイルを再処理してしまう可能性があります。
そのため、「アップロードされた1ファイル」を単位とした処理を行うことが難しい状態でした。
また、後続処理の制御がしづらいという課題もありました。
今回の要件では、Glueの処理が完了した後に、S3内に未処理ファイルが残っているかを確認し、その結果に応じて後続のバッチ処理(ECSなど)を実行するかどうかを制御する必要がありました。
しかし、Glue単体ではこのような処理後の分岐制御を持たせることが難しく、ワークフロー全体を統括する仕組みが必要でした。
ワークフロー構成の検討
まず、Glueのワークフロー制御としては、Glue Workflowという選択肢があります。
Glue Workflowは、Glue JobやCrawlerの実行順序を制御する用途には適しており、シンプルなETLパイプラインであれば有効な手段です。
しかし、今回の要件では以下の点が課題となりました。
- S3イベントの内容(bucket / key)をそのままGlue Jobに引き渡すことができない
- そのため、特定の1ファイルではなくディレクトリ単位で処理する構成になりやすい
- S3の状態(ファイル数など)をもとにした分岐処理をワークフロー内で実装できない
- ECSなど、Glue以外のサービスを含めた処理制御が難しい
これらを実現するには、Lambdaなどを挟んでイベントデータの加工や分岐処理を実装する必要があり、結果として処理の責務が分散し、構成が複雑になります。
そこで、イベントデータをそのまま扱いながら、処理の分岐や後続処理まで含めて一元的に制御できる方法として、Step Functionsに注目しました。
Step Functionsを用いることで、以下のような要件をシンプルに満たすことができます。
- S3イベントのbucket / keyをそのままGlue Jobに渡す
- S3の状態をもとにした分岐処理
- Glue以外のサービス(ECSなど)を含めたワークフロー制御
以上の理由から、本構成ではStep Functionsを中心としたアーキテクチャを採用しました。
アーキテクチャの構築
採用した構成は以下の通りです。
S3 → EventBridge → Step Functions → Glue
この構成により、S3にアップロードされたCSVファイルをイベントとして受け取り、その情報(bucket / key)をStep Functions経由でGlue Jobに引き渡すことで、ディレクトリ単位ではなく単一ファイル単位での処理を実現しています。
また、Glue Jobには同時実行数(Maximum concurrency)およびキューイング(Job run queuing)を設定し、同時実行数を1に制限することで、複数ファイルが同時にアップロードされた場合でも順番に処理されるようにしています。
さらに、S3にはnew/とold/のディレクトリを用意し、処理対象のファイルはnew/に配置、処理完了後にold/に移動する構成としています。
これにより、Step Functionsからnew/配下の状態を確認することで、未処理ファイルの有無に応じた分岐処理が可能になります。
処理の流れは次の通りです。
- S3にCSVファイルをアップロード
- EventBridgeがイベントを検知
- Step Functionsが起動
- Step Functionsが、イベントの bucket / key をGlueに渡して実行
- Glueが対象CSVを読み込み、Auroraに登録
- Glueが処理済みファイルを
old/に移動 - Step Functionsが
new/配下を確認 - 未処理ファイル数に応じて、ECSタスクを起動するか、処理を終了するか(次のGlue Jobを実行するか)を判定
まとめ
S3イベントからGlueを起動するだけの構成でもETLは実現できますが、実運用では「どのファイルを処理するか」「処理後にどう振る舞うか」といった制御が重要になります。
今回の構成ではStep Functionsを用いることで、
- S3イベントをそのままGlueに渡すことで処理対象を明確化
- 後続処理の制御
を可能にしたシンプルかつ拡張しやすいワークフローを実現しました。
同様に、S3イベント起点で制御を行いたい場合の参考になれば幸いです。
参考文献
Step Functions とは - AWS Step Functions