こんにちは、証券企画室エンジニアの田代です。
先日、私たちが開発しているferciと言うスマホアプリに、コミュニティ機能に続いて株取引機能が追加されました。
最短4タップで株取引ができます。是非使ってみてください。
はじめに
昨今機械学習やデータ分析が大変身近なものになり、それに伴って様々なBIツールやSaaSが登場しました。
そう言った中で、技術的な理由、何らかの経緯や事情によりデータの在りかが複数のプラットフォーム上に分散してしまい、集約する必要が出てくるケースもあるのではないでしょうか。
そこで今回は、Googleが提供するBI SaaSであるBigQuery上のデータをS3上に保管するLambda関数を、SAMで構築する方法をご紹介したいと思います。
AWS SAMについて簡単に
AWS SAM(Serverless Application Model)とは、CloudFormationのテンプレートを拡張したYAML(またはJSON)ファイルに基づいて、
Lambdaをビルド、CFnスタックとしてデプロイすることができるフレームワークです。
当記事ではテンプレートの記述内容を取り上げますので、
インストールやCLIの操作方法については公式のGitHubなどをご覧頂ければと思います。
github.com
やりたいこと
BigQueryからデータを取得する方法としてExtract JobやStorage APIを使うといったいくつかのオプションが存在しますが、
今回はシンプルにPythonでクエリを実行して1つのテキストファイルとして保存する方法を取りたいと思います。
この方法は手間はかかりませんが、ファイルサイズがLambdaの/tmp領域の上限である512MBまでになる、といった制約があることに注意してください。
実装とテンプレート
# coding: utf-8 import boto3 from google.cloud import bigquery bq = bigquery.Client() s3 = boto3.resource('s3') def lambda_handler(event, context): project = event['gcp_project'] # GCPのプロジェクト名 dataset = event['bq_dataset'] # BigQueryの対象データセット名 table = event['bq_table'] # BigQueryの対象テーブル名 bucket_name = event['bucket_bame'] # 保管先のS3バケット名 # Build and execute a query string query = 'SELECT TO_JSON_STRING(t) FROM (SELECT * FROM `{}.{}.{}`) AS t'.format(project, dataset, table) # *1 res = bq.query(query).result() # Write results to a temporary file tmp_file = '/tmp/foo.txt' with open(tmp_file, mode='w', encoding='utf-8') as f: for row in res: f.write(row[0] + '\n') # Upload a file to a S3 bucket bucket = s3.Bucket(bucket_name) s3_object = 'raw/' + dataset + '/' + table + '.txt' # *2 bucket.upload_file( tmp_file, s3_object, ExtraArgs={ 'ContentType': 'text/plain', 'ACL': 'bucket-owner-full-control' # *3 } ) print('Upload completed: ' + bucket_name + '/' + s3_object) return { 'statusCode': 200, }
*1 クエリ文字列を構築します。BigQueryの入れ子構造のデータに対応するため、TO_JSON_STRING()関数を使ってJSON形式で取得します。
*2 保管先のS3のプレフィックスとファイル名を指定します。日付毎にフォルダを切りたい場合は追加で指定してください。
*3 ファイルのACLを指定します。サンプルではバケットの所有者にフルアクセスを付与していますが、望ましくない場合は適宜指定してください。
AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: Sample template Resources: SampleFunction: Type: AWS::Serverless::Function Properties: Handler: lambda_function.lambda_handler Runtime: python3.7 Environment: Variables: GOOGLE_APPLICATION_CREDENTIALS: './gcp-credential.json' Events: SampleEvent1: Type: Schedule Properties: Schedule: cron(0 0 ? * * *) Name: SampleEvent1 Description: A sample cron event Input: '{"gcp_project: "foo", "bq_dataset": "bar", "bq_table": "hoge", "bucket_name": "fuga"}' Enabled: True
line 12: PythonのGCPモジュールは環境変数GOOGLE_APPLICATION_CREDENTIALSを参照して接続用の認証情報を取得するので、認証情報を記載したファイルのパスをここで指定します。
サンプルではlambda_function.pyと同じ階層にgcp-credential.jsonが置かれている想定です。
line 17: Lambdaの起動条件としてCloudWatch EventsのcronスケジュールをUTCで指定します。
line 20: Lambdaに渡す引数をJSON形式で指定します。
boto3 google-cloud-bigquery
まとめ
上記のテンプレートを使用してビルド、デプロイすると、BigQueryのデータをS3にPutするLambda関数とCloudWatch Eventsを、Pythonの依存関係を解決しながらまとめて構築することができます。
これで、BigQuery上に存在していたデータをAWSのAthenaやGlueなどから扱えるようになりました。
実際の運用時には数GB以上のデータを扱いたい、イベントドリブンでリアルタイム処理がしたい、などの要件が出てくると思いますし、
そういったケースでは今回ご紹介した仕組みでは対応しきれなくなりますが、スモールスタートでとりあえずやってみたいといった際の参考にして頂ければと思います。