Athenaテーブル(S3)をパーティショニングする

Athenaテーブル(S3)をパーティショニングする

前回の記事「Kinesis Firehose を使ってアプリからS3へログを保存する」
から大分間が空いてしまいましたが、アプリログ分析の続きをしたいと思います。

FirehoseからS3に置いたデータは、/yyyy/mm/dd/hh/ の形式で階層出力されるのですが、残念なことにAthenaだとそのままではパーティショニングテーブルとして利用できません。

そこで対応方法として下記を検討しまして、
(1) dt=yyyy-mm-dd のパスに変換
(2) DDLで現状のディレクトリのままパーティションを定義
今回は(2)を採用しました。

DDLは自動実行させたいので、Lambdaを使って書きました。
(パーティショニングと合わせてdtカラムの日時をJSTへと変換しています)

import datetime
import time
import boto3

DATABASE = 'DATABASE_NAME'
TABLE = 'TABLE_NAME'
BACKET = 's3://BACKET_NAME/app_logs/'
ATHENA_PARTITIONING_RESULTS = 's3://BACKET_NAME/athena-partitioning-results/'

athena = boto3.client('athena')

def lambda_handler(event, context):

    # use UTC for S3 location path
    utc = datetime.datetime.utcnow()
    utc_hour = utc.strftime('%Y/%m/%d/%H')
    
    # use JST for partitioning key(dt)
    jst = utc + datetime.timedelta(hours=9)
    jst_hour = jst.strftime('%Y-%m-%d-%H')

    query = "ALTER TABLE %s ADD IF NOT EXISTS PARTITION (dt = '%s') LOCATION '%s%s';"% (TABLE, jst_hour, BACKET, utc_hour)

    execute_query(query)

def execute_query(query):
    print('Executing query:', query)
    start_query_response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': DATABASE
        },
        ResultConfiguration={
            'OutputLocation': ATHENA_PARTITIONING_RESULTS,
        }
    )
    is_query_running = True
    while is_query_running:
        get_query_execution_response = athena.get_query_execution(
            QueryExecutionId=start_query_response['QueryExecutionId']
        )
        
        query_response = get_query_execution_response['QueryExecution']
        query_state = query_response['Status']['State']
        is_query_running = query_state == 'RUNNING'
        
        if not is_query_running and query_state != 'SUCCEEDED':
            print (query_response)
            raise Exception('Query failed. Status: ' + query_state)

        time.sleep(3)

    print('Query completed')

作成したLambda関数は、CloudWatchのルールで1時間毎に実行しています。
下記はクエリ実行例ですが、パーティション作成後はdtカラムでスキャンする範囲を絞り込まめるようになっています。

#フルスキャン
select * from app_logs;
 (Run time: 3.31 seconds, Data scanned: 54.15 KB)

#日付範囲指定
select * from app_logs where dt between '2018-11-26-00' and '2018-11-26-23';
 (Run time: 0.99 seconds, Data scanned: 0.23 KB)

一つ課題があって、Athenaのテーブル定義を変更した際はパーティショニングを全て作り直すことになります。
この場合は期間をLambda関数に渡すなどして対応する予定です。

次回はAthenaのクエリでアプリログの分析をしてみたいと思います。

TAG

  • このエントリーをはてなブックマークに追加
sawatari
sawatari sawatari

プロジェクトマネージャー。人手不足に直面し自ら実装して怒られたりしていますが、コードが書ける管理職がいてもいいよねと自分に言い聞かせています。元々はDBエンジニアだったので、ビッグデータを使った分析にチャレンジ中。