AmazonAthenaで日時バッチ処理を作成

Pocket

Amazon Athenaで日時バッチ処理を作成

はじめに

EMR on Sparkで実行していた日時処理(の一部)を、AthenaのCTASで実装し直した記事です。
何故このような対応をしたかというと、単にコスト削減&高速化のためです。

ただ、実行するクエリや処理するデータ量によっては、この対応により逆に高コスト&低速になる場合もあるので、事前に評価を行う必要があります。

この対応でどのようになったか

この処理が毎日READ/WRITEするデータのサイズは下記のとおりです。

  • READデータ:約 80 GB/日
  • WRITEデータ:約 6 GB/日

この対応により、速度とコストは下記のようになりました。

EMR Athena
速度 約20分 約24分
コスト 約$1.3 約$0.4
  • 計算式については、後述します。
  • Athenaの速度は、Sparkと同様のクエリでは約24分でしたが、クエリ内の「ORDER BY」を外すと約6分で終了しました。
  • そのため、「ORDER BY」はこの処理で行わず、別の後続の処理でソートするようにしました。(後述)

Athenaの「ORDER BY」に関する記載は下記サイトを参照いただければと思います。
https://aws.amazon.com/jp/blogs/news/top-10-performance-tuning-tips-for-amazon-athena/

変更前(EMR on Spark)

変更前は下記のようになっていました。

サービス構成

AWSサービスの構成を図1に示します。VPC回りは省略しています。

EMR

図1 EMR on Spark

ハードウェア構成

ハードウェア構成です。

  • マスターノード:m4.xlarge
  • コアノード:c5.2xlarge (16台)

※マスターノード、コアノードともにスポットインスタンスを使用しています。

処理手順

  1. 事前に、EMR上で実行するPythonファイル(PySpark)をS3上に配置します。
  2. 毎日1:00にLambda関数を実行します。(CloudWatch Events)
  3. Lambda関数で、EMRのClusterを作成し、Stepを2つ追加します。
    追加するステップの内容です。

    • Step1:S3上のPythonファイルをマスターノードの「/home/hadoop/」にコピーします。
    • Step2:Pythonファイル(PySpark)を実行します。
  4. PySparkにより日時処理が実行され、目的のデータがS3上に出力されます。

コスト

2019年6月末時点で、アジアパシフィック(東京)のスポットインスタンスおよびEMRの価格は下記のようになっていました。

  • m4.xlarge
    • スポットインスタンス:$0.0665 /1 時間
    • EMR:$0.06 /1 時間
  • c5.2xlarge
    • スポットインスタンス:$0.1331 /1 時間
    • EMR:$0.085 /1 時間

処理1回分の費用を計算します。

  • コスト:約 $1.3
    • 計算式:(0.0665 + 0.06) + (0.1331 + 0.085) * 16 * 20 / 60 = 1.2897
    • S3の金額はAthenaでも同額、またLambda, EBSは低額なので省略しています。

変更後(Athena CTAS)

変更後は下記のようになりました。

サービス構成

AWSサービスの構成を図2に示します。

EMR

図2 Athena CTAS

処理手順

  1. 毎日1:00にLambda関数を実行します。(CloudWatch Events)
    Lambda内では下記の処理を行います。

    • 前日のCTASで作成したテーブルを削除します。(DROP TABLE)
    • AthenaのCTASを実行します。
  2. CTASにより日時処理が実行され、目的のデータがS3上に出力されます。

コスト

変更後のコストを計算してみます。

  • スキャンデータ:約80BG
    • 日次データのREADファイルのサイズ

処理1回分の費用を計算します。

  • コスト:$ 0.4
    • 計算式:5 * 80 / 1000
    • S3の金額はEMRと同額なので省略

今回の変更では、コストが1/3になっていました。

実装例

実装例を下記に記載します。SQLクエリはテスト用のものです。

変更前

まずは、変更前のPySparkのスクリプトです。このスクリプトをS3に置いて、EMRのマスターノードへダウンロード後、「spark-submit –driver-memory 10g exec.py」で実行していました。

from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import DataFrame
from datetime import datetime, timedelta


def exec_query(spark: SparkSession, date: str, before_30_date):
    prefix = f"s3://mybucket/input_table/"
    df_table = spark.read.parquet(prefix)
    df_table.registerTempTable("input_table")
    sql = f"""
    select col1, col2
    from input_table
    where date between '{before_30_date}' and '{date}'
    group by col1, col2
    order by col1, col2
    """
    return spark.sql(sql)


def write_dataframe(df: DataFrame, date: str):
    df.write.parquet(f"s3://mybucket/output_table/date={date}/", mode="overwrite")


if __name__ == '__main__':
    target_day = datetime.now()
    date = f'{target_day.strftime("%Y%m%d")}'
    before_30_date = (target_day - timedelta(days=30)).strftime("%Y%m%d")

    spark = SparkSession.builder.appName("test").getOrCreate()
    df = exec_query(spark, date, before_30_date)
    write_dataframe(df, date)

変更後

変更後は、Lambda関数で実行しています。前述のとおり、高速化のためクエリから「order by col1, col2」は除外しています。

import boto3
from datetime import datetime, timedelta


def execute_query(session: boto3.session.Session, sql: str):
    client = session.client("athena")
    response = client.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={
            "Database": "my_database"
        },
        ResultConfiguration={
            "OutputLocation": "s3://mybucket/output_location/",
        },
    )
    query_execution_id = response["QueryExecutionId"]
    print(query_execution_id)
    query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
    query_execution_status = query_status["QueryExecution"]["Status"]["State"]
    print(query_execution_status)


def drop_table(session: boto3.session.Session):
    drop_table_sql = f"""
    drop table output_table_write
    """
    execute_query(session, drop_table_sql)

def ctas(session: boto3.session.Session):
    target_day = datetime.now()
    date = f"{target_day.strftime('%Y%m%d')}"
    before_30_date = (target_day - timedelta(days=30)).strftime("%Y%m%d")
    ctas_sql = f"""
    create table output_table_write
    with (
      format = 'PARQUET',
      external_location = 's3://mybucket/output_table/date={date}/'
    )
    as
    select col1, col2
    from input_table
    where date between '{before_30_date}' and '{date}'
    group by col1, col2
    """
    execute_query(session, ctas_sql)

def lambda_handler(event, context):
    session = boto3.session.Session()
    drop_table(session)
    ctas(session)
    return {
        "status": "success"
    }

なお、CTASは、既に同名のテーブルがある場合や、データの出力先(external_location)にデータがある場合は処理が失敗しますので、注意が必要です。
CTASのクエリに関する考慮事項と制約事項については、下記のサイトを参照いただければと思います。
https://docs.aws.amazon.com/ja_jp/athena/latest/ug/considerations-ctas.html

以上

Pocket

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です