なになれ

IT系のことを記録していきます

Redshift Serverlessを用いてdbtのGetting startedを実施する

本投稿では、Redshift Serverlessを用いてdbtのGetting startedを実施します。
dbtだとBigQueryを使っている事例が多い印象です。Redshiftでもできるということを確認してみたく実施してみました。

実施するのはdbtの公式ページにある下記の内容です。異なる点として、dbt Cloudではなく、dbt Coreを使用します。

docs.getdbt.com

dbtはSQLを使ってデータ変換処理を行い、データモデルを手軽に作成できるツールです。
dbtを試すにはdbtに対応したデータの環境が必要です。今回はRedshift Serverlessを使用します。
Redshift Serverlessには300ドル分の無料トライアルがありますので始めやすいと思います。
手順の詳細については参考情報も合わせてご覧ください。

手順

Redshift Serverlessを用意する

Redshift Serverlessの環境を作成する

AWSマネージメントコンソールを開いた状態です。
Amazon Redshiftの画面を開き、Redshift serverlessを選択します。
Amazon Redshift サーバーレスの使用を開始する」画面では、「デフォルト設定を使用」を選択します。
関連づけられたIAMロール→IAMロールを作成→「デフォルトのIAMロールを作成する」画面では、「任意のS3バケット」を選択し、IAMロールを作成します。そして、設定を保存します。
これでRedshift Serverlessが使えるようになります。

Redshift Serverlessに外部からアクセスできるようにする

Redshift Serverlessの画面から「default」のワークグループを選択し、設定画面を表示します。
ネットワークとセキュリティの項目で編集をします。
セキュリティグループを変更し、自身のローカルマシンからのアクセスを許可します。
「[パブリックにアクセス可能] をオンにする」に有効にして変更を保存します。

Redshift Serverlessの接続先情報をメモする

Redshift Serverlessの接続先情報をメモしておきます。
ワークグループの画面に表示されているエンドポイント名をメモしておきます。
名前空間の「default」を選択し、管理者パスワードを変更します。
管理者ユーザー名と管理者パスワードをメモしておきます。

これでRedshift Serverlessの準備は完了です。

dbtのGetting startedを実施する

dbtをインストールする

macの場合、brewでインストールできます。

$ brew update
$ brew tap dbt-labs/dbt
$ brew install dbt-redshift

バージョンが表示できればインストールできています。

$ dbt --version
Core:
  - installed: 1.2.1
  - latest:    1.2.1 - Up to date!

Plugins:
  - redshift: 1.2.1 - Up to date!
  - postgres: 1.2.1 - Up to date!

データを準備する

S3バケットを用意して、以下のファイルを該当のS3バケットにアップロードします。

Redshift Servelessの画面からRedshift query editor v2を表示します。
devデータベースに対して以下のクエリを実行します。

create schema if not exists jaffle_shop;
create schema if not exists stripe;

テーブルを作成します。

create table jaffle_shop.customers(
  id integer,
  first_name varchar(50),
  last_name varchar(50)
);

create table jaffle_shop.orders(
  id integer,
  user_id integer,
  order_date date,
  status varchar(50),
  _etl_loaded_at timestamp default current_timestamp
);

create table stripe.payment(
  id integer,
  orderid integer,
  paymentmethod varchar(50),
  status varchar(50),
  amount integer,
  created date,
  _batched_at timestamp default current_timestamp
);

データをコピーします。
fromには先ほどS3にアップロードしたファイルのS3 URIを指定します。
IAMロールにはRedshift Serverlessの環境を作成したときに同時作成したデフォルトのIAMロールのARNを指定します。

copy jaffle_shop.customers( id, first_name, last_name)
from 's3://dbt-data-lake-xxxx/jaffle_shop_customers.csv'
iam_role 'arn:aws:iam::XXXXXXXXXX:role/RoleName'
region 'ap-northeast-1'
delimiter ','
ignoreheader 1
acceptinvchars;
       
copy jaffle_shop.orders(id, user_id, order_date, status)
from 's3://dbt-data-lake-xxxx/jaffle_shop_orders.csv'
iam_role 'arn:aws:iam::XXXXXXXXXX:role/RoleName'
region 'ap-northeast-1'
delimiter ','
ignoreheader 1
acceptinvchars;

copy stripe.payment(id, orderid, paymentmethod, status, amount, created)
from 's3://dbt-data-lake-xxxx/stripe_payments.csv'
iam_role 'arn:aws:iam::XXXXXXXXXX:role/RoleName'
region 'ap-northeast-1'
delimiter ','
ignoreheader 1
Acceptinvchars;

データが存在することを確認します。

select * from jaffle_shop.customers;
select * from jaffle_shop.orders;
select * from stripe.payment;

dbt CoreでGetting startedを実施する

dbtの初期セットアップをします。

$ dbt init jaffle_shop

Redshift Serverlessの接続情報に変更します。
hostにはRedshift Serverlessのエンドポイント名を記載します。
userpasswordにはRedshift Serverlessの管理者ユーザー名と管理者パスワードを記載します。
dbnameにはdevデータベースを指定します。

~/.dbt/profiles.yml

jaffle_shop:
  outputs:
    dev:
      dbname: dev
      host: default.xxxxxxxxxx.ap-northeast-1.redshift-serverless.amazonaws.com
      password: xxxxxx
      port: 5439
      schema: dbt
      threads: 1
      type: redshift
      user: xxxxxx
  target: dev

接続できるかを確認します。

$ dbt debug

ここまででdbtの環境が作れました。以降はdbtのドキュメントどおりです。
dbtを使ってデータモデルを作ります。

models/customers.sql

with customers as (

    select
        id as customer_id,
        first_name,
        last_name

    from jaffle_shop.customers

),

orders as (

    select
        id as order_id,
        user_id as customer_id,
        order_date,
        status

    from jaffle_shop.orders

),

customer_orders as (

    select
        customer_id,

        min(order_date) as first_order_date,
        max(order_date) as most_recent_order_date,
        count(order_id) as number_of_orders

    from orders

    group by 1

),

final as (

    select
        customers.customer_id,
        customers.first_name,
        customers.last_name,
        customer_orders.first_order_date,
        customer_orders.most_recent_order_date,
        coalesce(customer_orders.number_of_orders, 0) as number_of_orders

    from customers

    left join customer_orders using (customer_id)

)

select * from final

dbtを実行します。

$ dbt run

これでデータモデルが作成されます。

データモデルの作り方を変更します。
デフォルトはViewの形式でモデルを作りますが、テーブルの形式で実態がある状態でモデルを作ることができます。

dbt_project.yml

models:
  jaffle_shop:
    +materialized: table
    example:
      +materialized: view
$ dbt run

dbt.customersテーブルが新しく作成されていることが確認できます。

別のモデルを作成し、それを再利用する形でモデルを作ります。

models/stg_customers.sql

select
    id as customer_id,
    first_name,
    last_name

from jaffle_shop.customers

models/stg_orders.sql

select
    id as order_id,
    user_id as customer_id,
    order_date,
    status

from jaffle_shop.orders

models/customers.sql

with customers as (

    select * from {{ ref('stg_customers') }}

),

orders as (

    select * from {{ ref('stg_orders') }}

),

customer_orders as (

    select
        customer_id,

        min(order_date) as first_order_date,
        max(order_date) as most_recent_order_date,
        count(order_id) as number_of_orders

    from orders

    group by 1

),

final as (

    select
        customers.customer_id,
        customers.first_name,
        customers.last_name,
        customer_orders.first_order_date,
        customer_orders.most_recent_order_date,
        coalesce(customer_orders.number_of_orders, 0) as number_of_orders

    from customers

    left join customer_orders using (customer_id)

)

select * from final

models/customers.sqlでref関数を使用してmodels/stg_customers.sqlmodels/stg_orders.sqlを利用して、モデルを作成します。

各データモデルに対して、テストを実施します。

models/schema.yml

version: 2

models:
  - name: customers
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null

  - name: stg_customers
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null

  - name: stg_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: status
        tests:
          - accepted_values:
              values: ['placed', 'shipped', 'completed', 'return_pending', 'returned']
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('stg_customers')
              field: customer_id
$ dbt test
23:29:11  Running with dbt=1.2.1
23:29:11  Found 3 models, 7 tests, 0 snapshots, 0 analyses, 289 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
23:29:11
23:29:12  Concurrency: 1 threads (target='dev')
23:29:12
23:29:12  1 of 7 START test accepted_values_stg_orders_status__placed__shipped__completed__return_pending__returned  [RUN]
23:29:12  1 of 7 PASS accepted_values_stg_orders_status__placed__shipped__completed__return_pending__returned  [PASS in 0.41s]
23:29:12  2 of 7 START test not_null_customers_customer_id ............................... [RUN]
23:29:13  2 of 7 PASS not_null_customers_customer_id ..................................... [PASS in 0.55s]
23:29:13  3 of 7 START test not_null_stg_customers_customer_id ........................... [RUN]
23:29:13  3 of 7 PASS not_null_stg_customers_customer_id ................................. [PASS in 0.23s]
23:29:13  4 of 7 START test not_null_stg_orders_order_id ................................. [RUN]
23:29:13  4 of 7 PASS not_null_stg_orders_order_id ....................................... [PASS in 0.21s]
23:29:13  5 of 7 START test unique_customers_customer_id ................................. [RUN]
23:29:14  5 of 7 PASS unique_customers_customer_id ....................................... [PASS in 0.56s]
23:29:14  6 of 7 START test unique_stg_customers_customer_id ............................. [RUN]
23:29:14  6 of 7 PASS unique_stg_customers_customer_id ................................... [PASS in 0.58s]
23:29:14  7 of 7 START test unique_stg_orders_order_id ................................... [RUN]
23:29:15  7 of 7 PASS unique_stg_orders_order_id ......................................... [PASS in 0.37s]
23:29:15
23:29:15  Finished running 7 tests in 0 hours 0 minutes and 3.58 seconds (3.58s).
23:29:15
23:29:15  Completed successfully
23:29:15
23:29:15  Done. PASS=7 WARN=0 ERROR=0 SKIP=0 TOTAL=7

各モデルのtestsに指定したuniquenot_nullの指定にしたがってテスト用のクエリが実行されます。

データモデルについてのドキュメントを作成します。

models/schema.yml

version: 2

models:
  - name: customers
    description: One record per customer
    columns:
      - name: customer_id
        description: Primary key
        tests:
          - unique
          - not_null
      - name: first_order_date
        description: NULL when a customer has not yet placed an order.

  - name: stg_customers
    description: This model cleans up customer data
    columns:
      - name: customer_id
        description: Primary key
        tests:
          - unique
          - not_null

  - name: stg_orders
    description: This model cleans up order data
    columns:
      - name: order_id
        description: Primary key
        tests:
          - unique
          - not_null
      - name: status
        tests:
          - accepted_values:
              values: ['placed', 'shipped', 'completed', 'return_pending', 'returned']

データモデルのdescriptionやカラムのdescriptionに説明を記述できます。

$ dbt docs generate
$ dbt docs serve

データモデルの説明などがわかるドキュメントサイトが表示されます。

まとめ

dbtの公式Getting startedをRedshift Serverlessで試しました。dbtの使い方を知ることができる良いGetting startedだと思いました。
dbtにおいては、SQLを再利用できる他にテストやドキュメント生成をする機能が使えそうという印象を持ちました。機会があれば本格的に利用したいと思います。
レガシーなSQLコードをどのようにdbt向けにリファクタリングするかという内容やベストプラクティスの内容など本格的に使う場合にも参考になる情報が記載されていて良いと感じました。

参考情報