AWSにおけるSQLを使ったデータ基盤の監視
本投稿では、AWSを前提にして、SQLを使ってデータ基盤の監視を行う方法について紹介します。
以下のBigQueryを使用した事例をかなり参考にさせてもらっています。
データ基盤を運用していると、データ処理後に生成されるデータの中身を検証したいモチベーションがあります。
データ処理は成功したが、一部のデータが欠損していてデータは作られていないといったことがよくあるためです。
データの中身を検証するにあたり、SQLでデータを取得し、データを検証することで、簡単に監視の仕組みを実現できます。
なお、今回はシステムの構成として、Athenaを使用していることを前提にします。
内容
データ検証の監視プログラム
参考にさせてもらった事例と同様にPythonでSQLを実行するプログラムを用意します。
Athenaの場合は、PyAthenaというライブラリを利用すると簡単にAthenaのクエリを実行することができます。
下記のプログラムになります。
app.py
import glob import yaml import sys from pyathena import connect from pyathena.cursor import DictCursor cursor = connect( s3_staging_dir='s3://aws-athena-query-results-012345678901-ap-northeast-1/', region_name='ap-northeast-1', cursor_class=DictCursor ).cursor() files = glob.glob('sql/**/*.yaml', recursive=True) def main(): is_failed = False for file in files: print(f'Checking {file}...') with open(file, 'r') as yml: monitoring_items = yaml.safe_load(yml) for monitor in monitoring_items: query = monitor['sql'] try: cursor.execute(query) rows = cursor.fetchall() if len(rows) == 0: is_failed = True print('====================', file=sys.stderr) print(f'Not existed query result for {file}', file=sys.stderr) print(f'Executed SQL:\n{query}', file=sys.stderr) except Exception as e: print(e) if is_failed: sys.exit(1) main()
YAMLファイルに監視のためのクエリを定義します。こちらも参考にさせてもらった内容と同じ仕組みです。
YAMLを定義するだけで、監視対象を増やすことができるので便利な仕組みだと思います。
- description: データ存在チェック sql: | SELECT * FROM default.alb_logs limit 1
本Pythonのプログラムでは、YAMLに定義されているクエリを実行して、データがなければエラーを出力するといった処理にしています。
このあたりは検証したい内容によって、いろいろ工夫できるかと思います。
監視のシステム構成
前述のPythonプログラムでは、データの検証に失敗した場合、エラーを返すようにしています。
このエラーをキャッチして通知する仕組みによって、監視を行います。
そのために、以下のようなシステム構成にしました。
Pythonのプログラムを実行する環境には、ECSを採用しました。
これはLambdaだと実行時間が15分までの制限があるため、ECSにしました。
このあたりが許容できればLambdaでも問題ないと思います。
ECSでエラーをキャッチするには、ECS単体では難しいため、Step Functionsを経由してECSを実行するようにしました。
Step Functionsでは、ECSタスクの終了コードをハンドリングすることができるためです。
具体的には、以下のようにStep Functionsのワークフローを定義しています。
monitoring-data.asl.json
{ "StartAt": "monitoringData", "States": { "monitoringData": { "Type": "Task", "Resource": "arn:aws:states:::ecs:runTask.sync", "Parameters": { "LaunchType": "FARGATE", "Cluster": "${ecs_cluster_arn}", "TaskDefinition": "arn:aws:ecs:ap-northeast-1:012345678901:task-definition/monitoring-data:1", "NetworkConfiguration": { "AwsvpcConfiguration": { "Subnets": [ "${subnet_id}" ], "SecurityGroups": [ "${security_group_id}" ], "AssignPublicIp": "ENABLED" } } }, "ResultPath": "$.monitoringData", "Next": "monitoringData Success ?" }, "monitoringData Success ?": { "Type": "Choice", "Choices": [ { "Variable": "$.monitoringData.Containers[0].ExitCode", "NumericEquals": 0, "Next": "monitoringData SuccessState" } ], "Default": "monitoringData FailState" }, "monitoringData FailState": { "Type": "Fail" }, "monitoringData SuccessState": { "Type": "Succeed" } } }
Choice
Typeの分岐で$.monitoringData.Containers[0].ExitCode
を記述して、終了コードを条件にエラーであれば、Failにします。
あとは、CloudWatchのアラームでStep FunctionsのエラーをSlackなどに通知すれば、監視できます。
まとめ
今回の内容は元ネタのSQLを使って監視する方法を参考にさせてもらいました。
この監視の方法は分かりやすい仕組みになっていて、簡単に検証対象のデータを増やすことができるので、賢いやり方だと思います。