CDKでAPI GatewayのアクセスログをAthenaで検索する仕組みを作る

マネックス証券 システム管理部のHです。

はじめに

あるとき、なんとなくAmazon API Gatewayでパスごとの呼び出し回数をカウントしてみたいと思いました。これができればどの機能が多く利用されているかを分析できるかなと。またバックエンドの負荷の遷移をAPI呼び出しの観点から分析することもできます。特定の時間帯に負荷が偏っているような場合には、その時間帯に呼ばれるAPIを調べることでバックエンドで負荷をかけている処理を特定することもできます。

やりたいこと

やり方はいくつかあると思いますが、今回はAPI GatewayのアクセスログをAmazon Kinesis Data Firehose経由でS3に出力し、それをAmazon Athenaでクエリを使用して検索する方法にします。

実装してみる

今回はAPI Gatewayの作成からAthenaのテーブル作成まで一通りCDKで実装します。CDKの実装はいつも通りJavaでやります。
CDKでは以下のリソースを作成します。

  • アクセスログ出力用S3バケット
  • API Gateway
  • Firehose
  • Athena データベース、テーブル

ちなみにCDKについては、この辺の記事で説明しているので、こちらも参照ください。

S3バケットの作成

アクセスログを保存するバケットを作成します。

private Bucket createS3Bucket() {
    return Bucket
        .Builder
        .create(stack, S3BUCKET_NAME)
        .bucketName(S3BUCKET_NAME)
        .removalPolicy(RemovalPolicy.DESTROY)
        .build();
}

API Gateway、Firehoseのリソース作成

API Gatewayとアクセスログ出力先のFirehoseに関するリソースを作成します。

アクセスログのフォーマット設定

アクセスログにはリクエスト時間、APIのID、リクエストID、パス、HTTPメソッド、プロトコル、ステータスコードをカンマ区切りで出力します。さらにそのままでは改行されないため、最後に改行コード(\n)を追加します。 アクセスログに出力できる項目は下記が参考になります。

docs.aws.amazon.com

private AccessLogFormat createAccessLogFormat() {
    // アクセスログの設定。CSV形式にする。
    String logFormatStr = AccessLogField.contextRequestTime() + ","
            + AccessLogField.contextApiId() + ","
            + AccessLogField.contextRequestId() + ","
            + AccessLogField.contextResourcePath() + ","
            + AccessLogField.contextHttpMethod() + ","
            + AccessLogField.contextProtocol() + ","
            + AccessLogField.contextStatus() + "\n";
     
    return AccessLogFormat.custom(logFormatStr);
}
Firehoseリソースの作成

Firehoseに設定するロールとFirehoseの配信ストリームを作成します。ロールにはS3に対するアクセス許可を設定します。
配信ストリームを作成するときには1つ注意が必要で、ストリーム名は「amazon-apigateway」で始まる必要があります。
ちなみにCDKのAPIリファレンスはサンプルコードが書いてあるので、かなり参考になります。

private IAccessLogDestination createLogDestination() {
    // S3バケットを作成
    Bucket bucket = createS3Bucket();
    // Firehose用のロールを作成
    Role firehoseRole = Role.Builder.create(stack, "firehoseRole")
        .assumedBy(new ServicePrincipal("firehose.amazonaws.com"))
        .inlinePolicies(Map.of("firehosePolicy", PolicyDocument.Builder.create()
            .statements(List.of(PolicyStatement.Builder.create()
                .effect(Effect.ALLOW)
                .actions(List.of(
                    "s3:AbortMultipartUpload", 
                    "s3:GetBucketLocation", 
                    "s3:GetObject", 
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads", 
                    "s3:PutObject"))
                .resources(List.of(
                    bucket.getBucketArn(),
                    bucket.getBucketArn() + "/*"))
                .build()))
            .build()))
        .build();

    // Firehose配信ストリーム作成
    CfnDeliveryStream firehoseStream = CfnDeliveryStream.Builder.create(stack, "firehoseStream")
        .deliveryStreamName("amazon-apigateway-eblog202401-stream")
        .s3DestinationConfiguration(S3DestinationConfigurationProperty.builder()
            .bucketArn(bucket.getBucketArn())
            .roleArn(firehoseRole.getRoleArn())
            .prefix(S3PREFIX_NAME)
            .bufferingHints(BufferingHintsProperty.builder()
                .intervalInSeconds(30)
                .build())
            .build())
        .build();

    return new FirehoseLogDestination(firehoseStream);
}
API Gatewayリソースの作成

確認用のREST APIを一つ作り、その下にパスを2つ、それぞれGETメソッドを作成します。中身はモックで固定レスポンスを返します。

private void createApiGateway() {
    String apiName = "Eblog202401";
    // Rest APIを作成
    RestApi api = RestApi.Builder.create(stack, apiName)
        .cloudWatchRole(true)
        .cloudWatchRoleRemovalPolicy(RemovalPolicy.DESTROY)
        .endpointConfiguration(EndpointConfiguration.builder()
            .types(List.of(EndpointType.REGIONAL))
            .build())
        .deploy(true)
        .deployOptions(StageOptions.builder()
            .accessLogFormat(createAccessLogFormat()) // アクセスログフォーマットの設定
            .accessLogDestination(createLogDestination()) // アクセスログの出力先を設定
            .loggingLevel(MethodLoggingLevel.INFO)
            .build())
        .description("エンジニアブログ 2024年1月用テストAPI")
        .build();
    
    // リソースとメソッド作成1
    Resource resource = api.getRoot().addResource("test");
    resource.addMethod("GET", MockIntegration.Builder.create()
        .requestTemplates(Map.of("application/json", "{\"statusCode\": 200}"))
        .integrationResponses(List.of(IntegrationResponse.builder()
            .responseTemplates(Map.of(
                "application/json", """
                {
                  \"statusCode\": 200,
                  \"message\": \"test is OK.\"
                 }
                """))
            .statusCode("200")
            .build()))
        .passthroughBehavior(PassthroughBehavior.NEVER)
        .build(), MethodOptions.builder()
        .methodResponses(List.of(MethodResponse.builder()
            .statusCode("200")
            .build()))
        .build());

    // リソースとメソッド作成2
    Resource resource2 = api.getRoot().addResource("test2");
    resource2.addMethod("GET", MockIntegration.Builder.create()
        .requestTemplates(Map.of("application/json", "{\"statusCode\": 200}"))
        .integrationResponses(List.of(IntegrationResponse.builder()
            .responseTemplates(Map.of(
                "application/json", """
                {
                  \"statusCode\": 200,
                  \"message\": \"test2 is OK.\"
                 }
                """))
            .statusCode("200")
            .build()))
        .passthroughBehavior(PassthroughBehavior.NEVER)
        .build(), MethodOptions.builder()
        .methodResponses(List.of(MethodResponse.builder()
            .statusCode("200")
            .build()))
        .build());
}

Athenaのデータベース、テーブル作成

Athenaとは書いていますが、実際にはAWS Glueのデータベースとテーブルを作成します。 テーブルですが、参照するS3上には「バケット名/プレフィックス/年/月/日/時」という風に階層が作成されてその下にオブジェクトが配置されるので、検索するデータのコストやパフォーマンスを考慮してパーティション射影を使用します。Javaで実装されている例があまりなかったので、やや苦労しました。
そんなときにはやはりAPIリファレンスは参考になります。あとはCloudformation定義ですね。CfnTableはCDKのコンストラクトで言えばL1コンストラクトになるので、Cloudformationの定義がわかれば、そのままCDKの実装に繋げられます。

   private void createAthena() {
        // データベースを作成
        String dbName = "eblog202401-db";
        CfnDatabase.Builder.create(stack, dbName)
            .catalogId(stack.getAccount())
            .databaseInput(DatabaseInputProperty.builder()
                .name(dbName)
                .build())
            .build();
        
        // テーブルを作成
        String tableName = "apigateway-accesslog";
        CfnTable.Builder.create(stack, tableName)
            .catalogId(stack.getAccount())
            .databaseName(dbName)
            .tableInput(TableInputProperty.builder()
                .description("エンジニアブログ 2024年1月用テーブル")
                .name(tableName)
                .tableType("EXTERNAL_TABLE")
                .parameters(Map.of(
                     "projection.enabled", "true",
                     "projection.datehour.type", "date",
                     "projection.datehour.format", "yyyy/MM/dd/HH",
                     "projection.datehour.range", "2024/01/01/00,NOW",
                     "projection.datehour.interval", "1",
                     "projection.datehour.interval.unit", "HOURS",
                     "storage.location.template", "s3://" + S3BUCKET_NAME + "/" + S3PREFIX_NAME + "${datehour}/"
                )) // パーティション射影の設定
                .partitionKeys(List.of(ColumnProperty.builder()
                    .name("datehour")
                    .type("string")
                    .build())) // パーティションキーの設定
                .storageDescriptor(StorageDescriptorProperty.builder()
                    // カラム設定
                    .columns(List.of(ColumnProperty.builder()
                        .name("request_time")
                        .type("string")
                        .comment("CLF 形式の要求時間 (dd/MMM/yyyy:HH:mm:ss +-hhmm)。")
                        .build(),
                        ColumnProperty.builder()
                        .name("id")
                        .type("string")
                        .comment("API Gateway が API に割り当てる識別子。")
                        .build(),
                        ColumnProperty.builder()
                        .name("request_id")
                        .type("string")
                        .comment("リクエストの ID。")
                        .build(),
                        ColumnProperty.builder()
                        .name("path")
                        .type("string")
                        .comment("リクエストパス。")
                        .build(),
                        ColumnProperty.builder()
                        .name("method")
                        .type("string")
                        .comment("使用される HTTP メソッドです。")
                        .build(),
                        ColumnProperty.builder()
                        .name("protocol")
                        .type("string")
                        .comment("HTTP/1.1 などのリクエストプロトコル。")
                        .build(),
                        ColumnProperty.builder()
                        .name("status_code")
                        .type("int")
                        .comment("メソッドレスポンスのステータス。")
                        .build()))
                    .inputFormat("org.apache.hadoop.mapred.TextInputFormat")
                    .outputFormat("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")
                    .location("s3://" + S3BUCKET_NAME + "/" + S3PREFIX_NAME)
                    .serdeInfo(SerdeInfoProperty.builder()
                        .serializationLibrary("org.apache.hadoop.hive.serde2.OpenCSVSerde")
                        .parameters(Map.of("separatorChar", ",", "quoteChar", "\"", "escapeChar", "\\"))
                        .build()) // 区切り文字(CSV)の設定
                    .build())
                .build())
            .build();
    }

AWS環境への反映

AWS環境への反映((デプロイ)は以下のコマンドを実行します。

cdk deploy

確認してみる

AWS環境にデプロイされていることを確認します。

API Gateway

APIにパスが2つ作成されています。

Firehose

S3に送信するストリームが作成されています。

Athena

以下のテーブルが作成されています。

クエリの実行

APIを実行して、Athenaでクエリを実行する。
/testに対するリクエストを3回、/test2に対するリクエストを2回実行する。

  • 全件検索
    APIのパスやステータスコードなどが確認できます。

  • パスごとの呼び出し回数をカウント
    パスごとの呼び出し回数を確認できます。SQLなので、書き方を工夫すれば呼び出し回数の多いパストップ10などを表示することも可能です。

おわりに

CDKでAthenaのテーブル作成に多少躓いたのですが、パスの呼び出し回数を確認することができました。
アクセスログに出力できる項目は今回取り上げた項目以外にもいろいろあるので、組み合わせればいろいろな分析が行えると思います。