こんにちは、マネックス・ラボの田代です。
前回の記事(https://blog.tech-monex.com/entry/2020/07/17/191115)で次回は開発寄りの内容の記事を...などと書いておきながら、3回連続でAWSをテーマにしてしまいました。
書きやすいので。
はじめに
データ基盤におけるETLジョブの作成を、ビジュアルエディターと呼ばれるGUI上で行える新サービス、AWS Glue Studioが2020年9月にリリースされました。
このGlue Studioによってデータの抽出や変換と言ったETL処理をGUI操作のみで開発出来るようになり、また標準で用意されていない処理についてはCustom codeを記述して自由に開発する事も可能です。
今回はこのGlue Studioの標準機能とCustom codeを使って、タイトルの通りのETLジョブを作成してみたいと思います。
aws.amazon.com
やりたいこと
上図はData source及びData targetのそれぞれのS3のフォルダー構造とファイルの内容を表しています。Input fileはid
カラムの他にランダムな数字が入ったc1
~c3
とdate
カラムを持っており、日付毎に日次で蓄積されているものとします。
Input fileのデータをOutput fileの内容に変換するジョブを作成することが今回やりたいことです。
ポイントは、
- カラム
sd
を新たに追加して、c1
~c3
の標準偏差をデータとして格納する - Partition predicateを記述して処理対象のパーティションを日付で指定する
- 出力ファイルをParquet形式に変換する
の3点です。
前提事項
当記事では新機能であるGlue Studioの紹介にフォーカスしたいので、下記については前提事項として説明は割愛したいと思います。
- Data source及びData targetに当たるS3バケットとInput fileに当たるCSVファイルが用意されていること
- ETLジョブを実行するに当たって必要なGlueデータベース、テーブル、クローラ及びIAM Roleの設定が適切に行われていること
ジョブの全体フロー
各ノードの解説とCustom codeの実装
Data source - S3 bucket
Data sourceノードでS3を選択し、Glueデータベース及びテーブルを指定することでData sourceを設定することが出来ます。
オプション項目であるPartition predicateに任意のSpark SQLを記述して読み込み対象のパーティション(フォルダー)を指定可能です。ここでは下記の通りに記述して、ジョブ実行時の時刻から見て過去7日分のパーティションを読み込むようにしてみます。(先に記載した図の通りのS3のフォルダー構造になっていれば期待する動作になるはずです)
(partition_0 >= date_format(date_sub(current_date(), 7), 'yyyy') and partition_1 >= date_format(date_sub(current_date(), 7), 'yyyyMM') and partition_2 >= date_format(date_sub(current_date(), 7), 'yyyyMMdd'))
Transform - ApplyMapping
ApplyMappingノードを使って、各カラムのカラム名及びデータ型を簡単に変換することが出来ます。
ここではyyyy-mm-dd
のフォーマットのstring型文字列が入っているdate
カラムのデータ型を、date型に変更してみます。
Transform - Custom code
Custom codeノードでは独自の関数を定義することで、自由にETL処理を行わせることが出来ます。
今回は簡単ですがタイトルの通り、カラムsd
を追加してc1
~c3
の標準偏差を格納する処理を記述してみます。
様々な方法があると思いますが、当記事ではPySparkのudfを使って実装することにします。
def add_column (glueContext, dfc) -> DynamicFrameCollection: import numpy as np from pyspark.sql.functions import udf def get_std(c1, c2, c3): return float(np.std([c1, c2, c3])) udf_get_std = udf(get_std) df = dfc.select(list(dfc.keys())[0]).toDF() df_with_std = df.withColumn("sd", udf_get_std("c1", "c2", "c3")) output_df = DynamicFrame.fromDF(df_with_std, glueContext, "output_df") return DynamicFrameCollection({"output_df": output_df}, glueContext)
コードを入力したら、Output schemaにfloat型のカラムsd
を追加して、次のノードにDynamicFrameCollectionを渡します。
Transform - SelectFromCollection
先行ノードのCustom codeノードは出力がDynamicFrameCollection固定となっていますが、このままではData targetノードが受け取ることが出来ないので、
SelectFromCollectionノードを間に挟んでDynamicFrame型に変換します。今回はここでの設定項目は特にありません。
Data target - S3 bucket
最後にData targetノードでS3を選択して、出力ファイル形式、出力先S3のパスを設定します。
パーティションについては任意項目ですが、今回はData source側の構造をそのまま保持するように設定してみます。
ETLジョブの実行
ジョブが作成出来たら、Job detailsでIAM Roleの設定等を行なった上で、実行してみましょう。
Run detailsでSucceededになっていればETL処理は成功です。
それでは、ETL処理が行われた出力ファイルをAthenaで見てみます。
カラムdate
の後ろにsd
が追加され、標準偏差が格納されていることが分かります。
念のためランダムサンプリングして計算してみましたが、計算結果としても正しそうです。
まとめ
以上のように、Glue Studioを使ってETLジョブを簡単に作成することが出来ました。
未だリリースされてから日が浅いためか動作が不安定に感じられる部分もあったのですが(設定した内容が上手く保存されない等)、
AWSでデータ基盤を構築する上で強力な機能の一つであることは間違いないと思いますし、今後にも期待したいです。
今回の記事がこれからGlue Studioを使う方の何らかの参考になれば幸いです。