なになれ

IT系のことを記録していきます

AWSにおけるSQLを使ったデータ基盤の監視

本投稿では、AWSを前提にして、SQLを使ってデータ基盤の監視を行う方法について紹介します。
以下のBigQueryを使用した事例をかなり参考にさせてもらっています。

tech-blog.monotaro.com

データ基盤を運用していると、データ処理後に生成されるデータの中身を検証したいモチベーションがあります。
データ処理は成功したが、一部のデータが欠損していてデータは作られていないといったことがよくあるためです。
データの中身を検証するにあたり、SQLでデータを取得し、データを検証することで、簡単に監視の仕組みを実現できます。

なお、今回はシステムの構成として、Athenaを使用していることを前提にします。

内容

データ検証の監視プログラム

参考にさせてもらった事例と同様にPythonSQLを実行するプログラムを用意します。
Athenaの場合は、PyAthenaというライブラリを利用すると簡単にAthenaのクエリを実行することができます。

下記のプログラムになります。

app.py

import glob
import yaml
import sys
from pyathena import connect
from pyathena.cursor import DictCursor

cursor = connect(
    s3_staging_dir='s3://aws-athena-query-results-012345678901-ap-northeast-1/',
    region_name='ap-northeast-1',
    cursor_class=DictCursor
).cursor()

files = glob.glob('sql/**/*.yaml', recursive=True)

def main():
    is_failed = False
    for file in files:
        print(f'Checking {file}...')
        with open(file, 'r') as yml:
            monitoring_items = yaml.safe_load(yml)
            for monitor in monitoring_items:
                query = monitor['sql']
                try:
                    cursor.execute(query)
                    rows = cursor.fetchall()
                    if len(rows) == 0:
                        is_failed = True
                        print('====================', file=sys.stderr)
                        print(f'Not existed query result for {file}', file=sys.stderr)
                        print(f'Executed SQL:\n{query}', file=sys.stderr)
                except Exception as e:
                    print(e)
    if is_failed:
        sys.exit(1)

main()

YAMLファイルに監視のためのクエリを定義します。こちらも参考にさせてもらった内容と同じ仕組みです。
YAMLを定義するだけで、監視対象を増やすことができるので便利な仕組みだと思います。

sql/sample.yaml

- description: データ存在チェック
  sql: |
    SELECT * FROM default.alb_logs limit 1

Pythonのプログラムでは、YAMLに定義されているクエリを実行して、データがなければエラーを出力するといった処理にしています。
このあたりは検証したい内容によって、いろいろ工夫できるかと思います。

監視のシステム構成

前述のPythonプログラムでは、データの検証に失敗した場合、エラーを返すようにしています。
このエラーをキャッチして通知する仕組みによって、監視を行います。
そのために、以下のようなシステム構成にしました。
f:id:hi1280:20211030115031p:plain

Pythonのプログラムを実行する環境には、ECSを採用しました。
これはLambdaだと実行時間が15分までの制限があるため、ECSにしました。
このあたりが許容できればLambdaでも問題ないと思います。

ECSでエラーをキャッチするには、ECS単体では難しいため、Step Functionsを経由してECSを実行するようにしました。
Step Functionsでは、ECSタスクの終了コードをハンドリングすることができるためです。
具体的には、以下のようにStep Functionsのワークフローを定義しています。

monitoring-data.asl.json

{
  "StartAt": "monitoringData",
  "States": {
    "monitoringData": {
      "Type": "Task",
      "Resource": "arn:aws:states:::ecs:runTask.sync",
      "Parameters": {
        "LaunchType": "FARGATE",
        "Cluster": "${ecs_cluster_arn}",
        "TaskDefinition": "arn:aws:ecs:ap-northeast-1:012345678901:task-definition/monitoring-data:1",
        "NetworkConfiguration": {
          "AwsvpcConfiguration": {
            "Subnets": [
              "${subnet_id}"
            ],
            "SecurityGroups": [
              "${security_group_id}"
            ],
            "AssignPublicIp": "ENABLED"
          }
        }
      },
      "ResultPath": "$.monitoringData",
      "Next": "monitoringData Success ?"
    },
    "monitoringData Success ?": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.monitoringData.Containers[0].ExitCode",
          "NumericEquals": 0,
          "Next": "monitoringData SuccessState"
        }
      ],
      "Default": "monitoringData FailState"
    },
    "monitoringData FailState": {
      "Type": "Fail"
    },
    "monitoringData SuccessState": {
      "Type": "Succeed"
    }
  }
}

ChoiceTypeの分岐で$.monitoringData.Containers[0].ExitCodeを記述して、終了コードを条件にエラーであれば、Failにします。
あとは、CloudWatchのアラームでStep FunctionsのエラーをSlackなどに通知すれば、監視できます。

まとめ

今回の内容は元ネタのSQLを使って監視する方法を参考にさせてもらいました。
この監視の方法は分かりやすい仕組みになっていて、簡単に検証対象のデータを増やすことができるので、賢いやり方だと思います。