Amazon Athenaで日時バッチ処理を作成
はじめに
EMR on Sparkで実行していた日時処理(の一部)を、AthenaのCTASで実装し直した記事です。
何故このような対応をしたかというと、単にコスト削減&高速化のためです。
ただ、実行するクエリや処理するデータ量によっては、この対応により逆に高コスト&低速になる場合もあるので、事前に評価を行う必要があります。
この対応でどのようになったか
この処理が毎日READ/WRITEするデータのサイズは下記のとおりです。
- READデータ:約 80 GB/日
- WRITEデータ:約 6 GB/日
この対応により、速度とコストは下記のようになりました。
EMR | Athena | |
---|---|---|
速度 | 約20分 | 約24分 |
コスト | 約$1.3 | 約$0.4 |
- 計算式については、後述します。
- Athenaの速度は、Sparkと同様のクエリでは約24分でしたが、クエリ内の「ORDER BY」を外すと約6分で終了しました。
- そのため、「ORDER BY」はこの処理で行わず、別の後続の処理でソートするようにしました。(後述)
Athenaの「ORDER BY」に関する記載は下記サイトを参照いただければと思います。
https://aws.amazon.com/jp/blogs/news/top-10-performance-tuning-tips-for-amazon-athena/
変更前(EMR on Spark)
変更前は下記のようになっていました。
サービス構成
AWSサービスの構成を図1に示します。VPC回りは省略しています。
図1 EMR on Spark
ハードウェア構成
ハードウェア構成です。
- マスターノード:m4.xlarge
- コアノード:c5.2xlarge (16台)
※マスターノード、コアノードともにスポットインスタンスを使用しています。
処理手順
- 事前に、EMR上で実行するPythonファイル(PySpark)をS3上に配置します。
- 毎日1:00にLambda関数を実行します。(CloudWatch Events)
- Lambda関数で、EMRのClusterを作成し、Stepを2つ追加します。
追加するステップの内容です。- Step1:S3上のPythonファイルをマスターノードの「/home/hadoop/」にコピーします。
- Step2:Pythonファイル(PySpark)を実行します。
- PySparkにより日時処理が実行され、目的のデータがS3上に出力されます。
コスト
2019年6月末時点で、アジアパシフィック(東京)のスポットインスタンスおよびEMRの価格は下記のようになっていました。
- m4.xlarge
- スポットインスタンス:$0.0665 /1 時間
- EMR:$0.06 /1 時間
- c5.2xlarge
- スポットインスタンス:$0.1331 /1 時間
- EMR:$0.085 /1 時間
処理1回分の費用を計算します。
- コスト:約 $1.3
- 計算式:(0.0665 + 0.06) + (0.1331 + 0.085) * 16 * 20 / 60 = 1.2897
- S3の金額はAthenaでも同額、またLambda, EBSは低額なので省略しています。
変更後(Athena CTAS)
変更後は下記のようになりました。
サービス構成
AWSサービスの構成を図2に示します。
図2 Athena CTAS
処理手順
- 毎日1:00にLambda関数を実行します。(CloudWatch Events)
Lambda内では下記の処理を行います。- 前日のCTASで作成したテーブルを削除します。(DROP TABLE)
- AthenaのCTASを実行します。
- CTASにより日時処理が実行され、目的のデータがS3上に出力されます。
コスト
変更後のコストを計算してみます。
- スキャンデータ:約80BG
- 日次データのREADファイルのサイズ
処理1回分の費用を計算します。
- コスト:$ 0.4
- 計算式:5 * 80 / 1000
- S3の金額はEMRと同額なので省略
今回の変更では、コストが1/3になっていました。
実装例
実装例を下記に記載します。SQLクエリはテスト用のものです。
変更前
まずは、変更前のPySparkのスクリプトです。このスクリプトをS3に置いて、EMRのマスターノードへダウンロード後、「spark-submit –driver-memory 10g exec.py」で実行していました。
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import DataFrame
from datetime import datetime, timedelta
def exec_query(spark: SparkSession, date: str, before_30_date):
prefix = f"s3://mybucket/input_table/"
df_table = spark.read.parquet(prefix)
df_table.registerTempTable("input_table")
sql = f"""
select col1, col2
from input_table
where date between '{before_30_date}' and '{date}'
group by col1, col2
order by col1, col2
"""
return spark.sql(sql)
def write_dataframe(df: DataFrame, date: str):
df.write.parquet(f"s3://mybucket/output_table/date={date}/", mode="overwrite")
if __name__ == '__main__':
target_day = datetime.now()
date = f'{target_day.strftime("%Y%m%d")}'
before_30_date = (target_day - timedelta(days=30)).strftime("%Y%m%d")
spark = SparkSession.builder.appName("test").getOrCreate()
df = exec_query(spark, date, before_30_date)
write_dataframe(df, date)
変更後
変更後は、Lambda関数で実行しています。前述のとおり、高速化のためクエリから「order by col1, col2」は除外しています。
import boto3
from datetime import datetime, timedelta
def execute_query(session: boto3.session.Session, sql: str):
client = session.client("athena")
response = client.start_query_execution(
QueryString=sql,
QueryExecutionContext={
"Database": "my_database"
},
ResultConfiguration={
"OutputLocation": "s3://mybucket/output_location/",
},
)
query_execution_id = response["QueryExecutionId"]
print(query_execution_id)
query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
query_execution_status = query_status["QueryExecution"]["Status"]["State"]
print(query_execution_status)
def drop_table(session: boto3.session.Session):
drop_table_sql = f"""
drop table output_table_write
"""
execute_query(session, drop_table_sql)
def ctas(session: boto3.session.Session):
target_day = datetime.now()
date = f"{target_day.strftime('%Y%m%d')}"
before_30_date = (target_day - timedelta(days=30)).strftime("%Y%m%d")
ctas_sql = f"""
create table output_table_write
with (
format = 'PARQUET',
external_location = 's3://mybucket/output_table/date={date}/'
)
as
select col1, col2
from input_table
where date between '{before_30_date}' and '{date}'
group by col1, col2
"""
execute_query(session, ctas_sql)
def lambda_handler(event, context):
session = boto3.session.Session()
drop_table(session)
ctas(session)
return {
"status": "success"
}
なお、CTASは、既に同名のテーブルがある場合や、データの出力先(external_location)にデータがある場合は処理が失敗しますので、注意が必要です。
CTASのクエリに関する考慮事項と制約事項については、下記のサイトを参照いただければと思います。
https://docs.aws.amazon.com/ja_jp/athena/latest/ug/considerations-ctas.html
以上