BigQueryからJSON形式で取得したデータをS3にPutするLambdaをSAMで構築する

こんにちは、証券企画室エンジニアの田代です。
先日、私たちが開発しているferciと言うスマホアプリに、コミュニティ機能に続いて株取引機能が追加されました。
最短4タップで株取引ができます。是非使ってみてください。

はじめに

昨今機械学習やデータ分析が大変身近なものになり、それに伴って様々なBIツールやSaaSが登場しました。
そう言った中で、技術的な理由、何らかの経緯や事情によりデータの在りかが複数のプラットフォーム上に分散してしまい、集約する必要が出てくるケースもあるのではないでしょうか。
そこで今回は、Googleが提供するBI SaaSであるBigQuery上のデータをS3上に保管するLambda関数を、SAMで構築する方法をご紹介したいと思います。

AWS SAMについて簡単に

AWS SAM(Serverless Application Model)とは、CloudFormationのテンプレートを拡張したYAML(またはJSON)ファイルに基づいて、 Lambdaをビルド、CFnスタックとしてデプロイすることができるフレームワークです。
当記事ではテンプレートの記述内容を取り上げますので、 インストールやCLIの操作方法については公式のGitHubなどをご覧頂ければと思います。
github.com

やりたいこと

f:id:monex_engineer:20200115170111p:plain BigQueryからデータを取得する方法としてExtract JobやStorage APIを使うといったいくつかのオプションが存在しますが、 今回はシンプルにPythonでクエリを実行して1つのテキストファイルとして保存する方法を取りたいと思います。
この方法は手間はかかりませんが、ファイルサイズがLambdaの/tmp領域の上限である512MBまでになる、といった制約があることに注意してください。

実装とテンプレート

f:id:monex_engineer:20200115170137p:plain

# coding: utf-8

import boto3
from google.cloud import bigquery


bq = bigquery.Client()
s3 = boto3.resource('s3')


def lambda_handler(event, context):
  project = event['gcp_project'] # GCPのプロジェクト名
  dataset = event['bq_dataset'] # BigQueryの対象データセット名
  table = event['bq_table'] # BigQueryの対象テーブル名
  bucket_name = event['bucket_bame'] # 保管先のS3バケット名

  # Build and execute a query string
  query = 'SELECT TO_JSON_STRING(t) FROM (SELECT * FROM `{}.{}.{}`) AS t'.format(project, dataset, table) # *1
  res = bq.query(query).result()

  # Write results to a temporary file
  tmp_file = '/tmp/foo.txt'
  with open(tmp_file, mode='w', encoding='utf-8') as f:
    for row in res:
      f.write(row[0] + '\n')

  # Upload a file to a S3 bucket
  bucket = s3.Bucket(bucket_name)
  s3_object = 'raw/' + dataset + '/' + table + '.txt' # *2
  bucket.upload_file(
    tmp_file,
    s3_object,
    ExtraArgs={
      'ContentType': 'text/plain',
      'ACL': 'bucket-owner-full-control' # *3
    }
  )
  print('Upload completed: ' + bucket_name + '/' + s3_object)

  return {
    'statusCode': 200,
    }

*1 クエリ文字列を構築します。BigQueryの入れ子構造のデータに対応するため、TO_JSON_STRING()関数を使ってJSON形式で取得します。
*2 保管先のS3のプレフィックスとファイル名を指定します。日付毎にフォルダを切りたい場合は追加で指定してください。
*3 ファイルのACLを指定します。サンプルではバケットの所有者にフルアクセスを付与していますが、望ましくない場合は適宜指定してください。

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Sample template
Resources:
  SampleFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: lambda_function.lambda_handler
      Runtime: python3.7
      Environment:
        Variables:
          GOOGLE_APPLICATION_CREDENTIALS: './gcp-credential.json'
      Events:
        SampleEvent1:
          Type: Schedule
          Properties:
            Schedule: cron(0 0 ? * * *)
            Name: SampleEvent1
            Description: A sample cron event
            Input: '{"gcp_project: "foo", "bq_dataset": "bar", "bq_table": "hoge", "bucket_name": "fuga"}'
            Enabled: True

line 12: PythonGCPモジュールは環境変数GOOGLE_APPLICATION_CREDENTIALSを参照して接続用の認証情報を取得するので、認証情報を記載したファイルのパスをここで指定します。
サンプルではlambda_function.pyと同じ階層にgcp-credential.jsonが置かれている想定です。
line 17: Lambdaの起動条件としてCloudWatch EventsのcronスケジュールをUTCで指定します。
line 20: Lambdaに渡す引数をJSON形式で指定します。

boto3
google-cloud-bigquery

まとめ

上記のテンプレートを使用してビルド、デプロイすると、BigQueryのデータをS3にPutするLambda関数とCloudWatch Eventsを、Pythonの依存関係を解決しながらまとめて構築することができます。
これで、BigQuery上に存在していたデータをAWSのAthenaやGlueなどから扱えるようになりました。
実際の運用時には数GB以上のデータを扱いたい、イベントドリブンでリアルタイム処理がしたい、などの要件が出てくると思いますし、 そういったケースでは今回ご紹介した仕組みでは対応しきれなくなりますが、スモールスタートでとりあえずやってみたいといった際の参考にして頂ければと思います。

田代 侑大証券企画室 エンジニア