なになれ

IT系のことを記録していきます

AWSにおけるデータ基盤の作り方

データ活用によって新たな価値を提供したいというニーズは日に日に高まっているように思います。
そんな昨今、データ基盤の構築に関わっています。
今回はAWSで実現するデータ基盤について、いくつかの例を交えて紹介します。
AWSではデータ基盤向けのサービスがいくつもあります。利用者はそれらを取捨選択する必要があります。
今回の内容を通して、取捨選択のヒントになればと思います。

概要

データ基盤の構築にあたっては、データを蓄積する、分析する、可視化するといった用途に応じてデータを扱う環境を分けることをオススメします。
環境を適切に分けることで、データの出どころが分かりやすくなるため、データの品質をコントロールしやすくなります。
全体のシステム構成イメージとしては以下の通りです。
f:id:hi1280:20201011123517p:plain:w1000

用途ごとのデータ環境の作り方について紹介をしていきます。

データ蓄積環境を作る

データを蓄積する環境とは一般的にはデータレイクと呼ばれます。
データレイクとは、データを加工せずにそのままの形でデータを蓄積しておく環境のことです。
AWSにおいては、あらゆるデータ形式をそのまま保存することができるため、S3が適しています。
これ以降はS3に各種データを蓄積する前提での話になります。

ログを蓄積する

ログを使って、サーバへのアクセスを分析することが想定されます。
ログの出どころによって、様々なやり方が考えられます。
ここで紹介するのはほんの一例です。

ALBのログを蓄積する

ALBでは、S3にログを出力する機能があります。
Application Load Balancer のアクセスログ - Elastic Load Balancing

アプリケーションのログを蓄積する

Fluendを使うと、S3上にアプリケーションのログを蓄積することができます。
s3 - Fluentd

アプリケーションのログはJSON形式といった構造化ログ形式にしておくと、この後のデータ分析が楽になります。

RDSのデータを蓄積する

ほとんどの場合、アプリケーションのデータはRDBに溜められていると思います。
RDSの場合、スナップショットのデータをS3にエクスポートする機能があります。
Amazon S3 への DB スナップショットデータのエクスポート - Amazon Relational Database Service

エクスポートの過程でParquet形式に変換されます。Parquetはデータ分析に適したファイル形式です。

ただこのエクスポート機能には、注意すべきことがあります。
Parquet形式へのデータ変換の際にデータ型が変わります。
これにより、この後のデータ分析で扱うデータ型と差異が生じることがあります。

例えば、MySQLの場合、timestamp型の値をParquet形式にするとINT64のマイクロ秒単位でのunixtime値に変換されます。
これはAthenaのtimestamp型と扱う値の範囲が異なります。
そのため、Athenaではtimestamp型の値としてそのまま利用することはできません。
Athenaの型をBIGINT型にして、unixtimeの値でParquet形式のtimestamp値を扱う必要があります。

アプリケーションのデータをリアルタイムに蓄積する

RDBにデータを保存せずに、リアルタイムに生のデータを蓄積したいケースがあると思います。
これにはアプリケーションの修正が伴います。
ゼロからそのような仕掛けを作る場合に、オススメの方法はFirehoseを使うことです。

Firehoseの利点は以下の通りです。

  • Lambdaを使ったデータの前処理を行うことができる
  • レコードの形式をParquet形式に変換できる
  • データをバックアップすることができる

特にデータをバックアップする機能は、データの前処理に失敗した場合に役立ちます。
このようにデータ分析をするにあたって役立つ機能が揃っています。

S3へのデータ配置は、AWS SDKを使うとできます。
アプリケーションで扱うデータをFirehoseに送信することができます。
AWS SDK を使用した Kinesis Data Firehose への書き込み - Amazon Kinesis Data Firehose

データ分析環境を作る

ここでのデータ分析環境とは、データレイク環境のデータにETL処理を行い、データ分析をしやすい形にデータを配置した環境のことです。
また、分析の過程でデータレイクのデータをそのまま参照するといったことも行います。

S3上のデータを扱う

S3上のデータを扱うには、まずデータカタログを作成します。
データカタログを作成することで、S3上のデータをAthenaを使って扱うことができます。
データカタログを作るには、Glueクローラを使います。
AWS Glue コンソールでのクローラの使用 - AWS Glue

データカタログを作る方法はいくつかありますが、Glueクローラで作るとデータ型をデータの内容によって自動で決めてくれるため簡単にデータカタログを作成できます。

データカタログを作った後、そのままのデータでは扱いづらい場合にデータの変換処理を行います。
Athenaの機能であるCREATE TABLE AS SELECT(CTAS)を使うと、SELECTクエリを使用してS3上にデータを作成して新規にAthenaのテーブルを作成することができます。
ETL およびデータ分析での CTAS および INSERT INTO の使用 - Amazon Athena

データ分析のパフォーマンスを良くするためには、扱うデータの範囲、サイズをできるだけ小さくする必要があります。
そのために以下の点を考慮してテーブルを作成すると良いです。

  • データを圧縮する
  • Parquet、ORCといった列指向形式のデータに変換する
  • パーティション分割を行う

パーティション分割について

パーティション分割をすることでデータを扱う範囲を限定することができるため、クエリのパフォーマンス向上につながります。
どのようにパーティション分割するかは、どのようなクエリを実行するかが関係します。
例えば、時系列データの場合には、データの作成日時がクエリの条件になることが多いため、作成日時でパーティションを区切ると良いです。
FirehoseでデータをS3に配置すると、フォルダが日時毎に作られるので、パーティションを作りやすいです。

Glueクローラを使うとパーティション分割を行ってくれます。
ただ既にデータを蓄積する仕組みがある状態でパーティション分割するだけの場合、コストが見合わないです。
その場合、LambdaからAthenaでパーティション分割をするだけのクエリを実行するという方法をオススメします。
以下の過去記事のパーティション分割方法が具体的なやり方になります。

hi1280.hatenablog.com

データ可視化環境を作る

ここでのデータ可視化環境とは、データを集計処理して、頻繁に利用される集計値を格納する環境のことです。
BIツールを使って、データを可視化するのが主なユースケースになります。

集計値をRDBにロードする

データ可視化環境では、集計値を扱いやすい環境を作ります。
Athenaの場合、データを新規に作成することしかできません。集計値は後で再集計を行うことがあるため、更新処理をサポートしているRDBなどのデータベースを使用するのが良いです。
このデータ可視化環境を作るには、Embulkを利用します。Embulkはバッチ実行に対応したETLツールです。
各種データソースに合わせてプラグインが存在し、プラグインを介した統一的な設定によって、ETL処理を行うことができます。
Embulkを使うと、Athenaのクエリを使用してデータを取得し、RDBにデータをロードすることができます。

Embulkの使い方

ここでは、AthenaからMySQLにデータをロードするという前提でのやり方を紹介します。

インストール
Embulkを利用するには、Dockerイメージ化しておくと楽です。
Embulkを実行可能にするまでの環境をDockerで作ります。

Dockerfile

FROM openjdk:8-jre-alpine

RUN wget -q https://dl.embulk.org/embulk-0.9.23.jar -O /bin/embulk \
  && chmod +x /bin/embulk

RUN apk add --no-cache libc6-compat
RUN embulk gem install embulk-input-athena
RUN embulk gem install embulk-output-mysql

WORKDIR /work

ENTRYPOINT ["java", "-jar", "/bin/embulk"]

Embulk設定ファイル例

Embulkでは、データの取得元とロード先を設定ファイルに記述します。
下記は、AthenaからMysqlにデータをロードする設定ファイルの例です。

athena2mysql.yml.liquid

in:
  type: athena
  database: {{ env.ATHENA_DATABASE }}
  athena_url: jdbc:awsathena://athena.ap-northeast-1.amazonaws.com:443
  s3_staging_dir: {{ env.ATHENA_DIR }}
  access_key: {{ env.ATHENA_AWS_ACCESS_KEY }}
  secret_key: {{ env.ATHENA_AWS_SECRET_KEY }}
  query: |
    SELECT * FROM alb_logs
  columns:
    - {name: type, type: string}
    - {name: time, type: string}
    - {name: elb, type: string}
    - {name: client_ip, type: string}
    - {name: client_port, type: long}
    - {name: target_ip, type: string}
    - {name: target_port, type: long}
    - {name: request_processing_time, type: double}
    - {name: target_processing_time, type: double}
    - {name: response_processing_time, type: double}
out:
  type: mysql
  host: {{ env.MYSQL_HOST }}
  user: {{ env.MYSQL_USER }}
  password: {{ env.MYSQL_PASSWORD }}
  database: {{ env.MYSQL_DATABASE }}
  port: {{ env.MYSQL_PORT }}
  table: log
  mode: merge

liquidファイルにすることで、{{env.XXX}}の記述で環境変数を埋め込むことができます。
プラグインの設定値については、下記プラグインのドキュメントを参考にしてください。

実行

Embulkを実行します。

Dockerイメージをビルドします。

$ docker build -t embulk .

embulk run 設定ファイル名で実行できます。

$ docker run --rm -it -v $(pwd):/work --env-file .env embulk run athena2mysql.yml.liquid

--env-fileオプションで環境変数を定義したファイルを読み込みます。

各環境を連携する

特定の用途向けにデータの加工方法が固まるにつれて、データを蓄積する、分析する、可視化するといったプロセスを経ていきます。
これを制御するためにワークフローを定義します。
構成イメージは以下の通りです。
f:id:hi1280:20201010234150p:plain

EventBridgeとStep Functionsを使用する

AWSでは、スケジュール実行や各種AWSサービスのイベントを検知して実行をトリガーするEventBridgeがあります。
また、各種AWSサービスを連携してワークフローとして処理ができるStep Functionsがあります。
この2つを組み合わせることで、データのプロセスを適切に制御することができます。

EventBridgeはスケジュール実行でStep Functionsを実行します。
スケジュールに従ってトリガーする EventBridge ルールの作成 - Amazon EventBridge

Athenaの実行をStep Functionsに組み込むには、LambdaでAthenaのクエリを実行します。
EmbulkはDockerイメージ化しているので、ECSで実行できます。

Step Functionsでは、JSONでワークフローを定義します。
LambdaとECSを連携するワークフローは以下のように記述できます。

{
  "Comment": "workflow",
  "StartAt": "lambda-example",
  "States": {
    "lambda-example": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:000000000000:function:example",
      "ResultPath": "$.lambda-example",
      "Next": "ecs-example"
    },
    "ecs-example": {
      "Type": "Task",
      "Resource": "arn:aws:states:::ecs:runTask.sync",
      "Parameters": {
        "LaunchType": "FARGATE",
        "Cluster": "arn:aws:ecs:ap-northeast-1:000000000000:cluster/default",
        "TaskDefinition": "arn:aws:ecs:ap-northeast-1:000000000000:task-definition/example:1",
        "NetworkConfiguration": {
          "AwsvpcConfiguration": {
            "Subnets": [
              "subnet-00000000000000000",
              "subnet-00000000000000001"
            ],
            "SecurityGroups": [
              "sg-00000000000000000"
            ],
            "AssignPublicIp": "ENABLED"
          }
        }
      },
      "ResultPath": "$.ecs-example",
      "End": true
    }
  }
}

このようにワークフローを定義することで、データのETL処理から集計処理に処理をつなげることができます。

まとめ

データ分析環境では、データの内容を確認する、統計処理するといったことを行います。これはデータの探索を行うという目的です。
このようなデータの探索を通して、本当に役立つデータを見つけ出し、そのようなデータを多くの利用者に提供することが大事だと思います。
そのためには今回紹介した用途に応じたデータ環境を作ることが役立つと思います。

参考

今回はあまり紹介していませんが、GlueはAWSでデータ活用をする上で良く紹介されるサービスです。
Glueを含めてAWSでのデータ活用について網羅的に知りたい場合は、以下の書籍が参考になると思います。