私も触るまで、バッチ=夜間処理?とか考えてしまいがちでしたが Azure BatchもAWS Batchも、時間のかかる処理を分散並列で処理する、分散コンピューティングを管理するサービスです。
今回は、軽い分析プログラムを実行するだけにとどめておきますが、定期的に最新のデータを利用してCNNのモデル更新を行ったりする必要があるシステムとか、GPUをガンガン利用する分析やMPI等を使用して分散並列するプログラムに良さそうです。
インスタンスを常時起動してたりすると、料金が高くなってしまうので、料金を抑えたい目的にも合うかもしれません。マッチしそうなシステムでは是非使ってみてください。
では、今回は例として、以下の簡易システムを構築するのを目標とします。
- プログラム経由で、Azure Batchサービスを呼び出しJob(実行タスク)を登録します。
- Jobが溜まるとAzure Batchが実行するインスタンスを起動します。
- Azure Filesサービス上に前もって実行プログラムを配置しておき、起動したインスタンスにマウントします。
代替案として、Gitからcloneする方法などもありえます。 - Azure Container Registerにプログラム実行環境のdockerイメージを登録しておき
インスタンス上にpullして環境のセットアップを簡略化します。 - プログラム内では、Blob StorageにあるCSVファイルを読み込み、結果を画像でBlob Storageに出力します。
コンテナ準備
今回の簡単な処理だとあまりdockerを利用する意味もなさそうですが、Azure Batchでコンテナ実行する学習と思って使用しております。
実行環境が整うまでの速度は、Azureがマーケットで準備している環境だけで実行するのが一番速いので、Azureの環境にノード起動時にインストールシェル等を実行して環境を作成するか、軽量なコンテナを利用するか、カスタムOSイメージを使うかの選択は各システムのプライオリティで選択してください。
Dockerfileの作成
本Dockerfileでは、python3.6をベースにして、日本語環境に変更後、./requirements.txtファイルで必要なライブラリをpipするのみです。
FROM python:3.6
USER root
RUN apt-get update
RUN apt-get -y install locales && \
localedef -f UTF-8 -i ja_JP ja_JP.UTF-8
ENV LANG ja_JP.UTF-8
ENV LANGUAGE ja_JP:ja
ENV LC_ALL ja_JP.UTF-8
ENV TZ JST-9
ENV TERM xterm
RUN pip install --upgrade pip
ADD requirements.txt /tmp/
WORKDIR /tmp/
RUN pip install -r ./requirements.txt
azure-common
azure-nspkg
azure-storage
pandas
matplotlib
fbprophet
Azure Container Registryサービスの作成
Dockerイメージは、Azure Container Registryにpushしておきますので、サービスを事前に作成しておきます。
以下の項目については、以後使用しますが、サンプルコード等に書かれている箇所はユーザが作成した名前に読み替えてください。
- ログインサーバー
- ユーザ名
- パスワード
では、Dockerイメージをビルドして、Azure Container Registryにpushします。
# docker build -t pg_master . →1.
# docker login --username pgrep --password {password} pgrep.azurecr.io →2.
# docker tag pg_master pgrep.azurecr.io/pg_master:0.1 →3.
# docker push pgrep.azurecr.io/pg_master:0.1 →4.
- Dockerfileとrequirements.txtがあるディレクトリで実行します。タグの名前は任意に変更してください。(例:pg_master)
- Azure Container Registryのユーザ名、パスワード、サーバを指定してログインします。
- 先ほどビルドしたpg_masterをpgrep.azurecr.io/pg_master:0.1とサーバ情報とバージョンを付けます。このタグは、Azure Batchの実行コンテナを指定するときに利用します。
- Azure Container Registryにpushします。
本作業で、作成したDocker imageがAzure Portal上から確認できます。
分析プログラムの準備
今回使用する分析プログラムは、Facebookが公開している時系列解析ライブラリ Prophetを使用して、日経平均株価の予測を行うといったものです。
ストレージアカウントの準備
プログラムでアクセスするストレージアカウントを準備します。アカウント作成後、プログラムで利用するアクセス キーを記憶しておきます。
分析データファイルの保存先であるcsvコンテナ、分析結果を保存するimagesコンテナを作成しておきます。
分析データ
実際のシステムですと、なにかしらのAPIにアクセスして、データを取得する等になると思いますが、今回はYahooファイナンスから2014/06/12~2016/06/12のデータをダウンロードして、先ほど作成したcsvコンテナにdata.csvとして保存します。
コード
Azure Blob StorageからCSVをダウンロードしてきて、Prophetを実行してAzure Blob Storageにグラフ画像を出力するといった処理を行います。今回は、Azure Batchのシステムを構築する事をメインにしておりますので、コード詳細は省略しますが、Prophetは時系列データの予測を簡単に行えるので興味のある方は調べてみてください。(要望があれば次回のネタにするかも?)
pg.shはAzure Batchから呼び出すときに直接pythonを呼び出すのではなく、後処理とかを追加する必要になったりする場合に便利なのでシェルにしております。
import pandas as pd
import matplotlib.pyplot as plt
from fbprophet import Prophet
from datetime import datetime
from azure.storage.blob import BlockBlobService
import tempfile
account_name='{account name}'
account_key='{account key}'
container_name='csv'
upload_container_name='images'
blob_name='data.csv'
with tempfile.NamedTemporaryFile() as fp:
file_name = fp.name
service = BlockBlobService(account_name=account_name, account_key=account_key)
service.get_blob_to_path(container_name,blob_name,file_name)
data = pd.DataFrame()
data2 = pd.read_csv(file_name, skiprows=1,header=None, names=['ds','Open','High','Low','Close','y','Volume'])
data = data.append(data2)
model = Prophet()
model.fit(data)
future_data = model.make_future_dataframe(periods=250, freq = 'd')
future_data = future_data[future_data['ds'].dt.weekday < 5]
forecast_data = model.predict(future_data)
upload_filename = datetime.now().strftime('%s')
with tempfile.NamedTemporaryFile() as fp:
file_name = fp.name
fig = model.plot(forecast_data).savefig(file_name)
service.create_blob_from_path(upload_container_name,upload_filename + '-1.png',file_name + ".png")
with tempfile.NamedTemporaryFile() as fp:
file_name = fp.name
model.plot_components(forecast_data).savefig(file_name)
service.create_blob_from_path(upload_container_name,upload_filename + '-2.png',file_name + ".png")
cd /pg/
python pg.py
exit 0
Azure Files上にpg.pyとpg.shを保存します。
Azure Batchの準備
リージョンを合わせておく以外には、入力に困る箇所は基本ありませんが、1点注意する箇所としては、プール割り当てモードを通常はBatchサービスを選択するのが大半ですが、Azure Reserved VM Instances を使用して Batch プールを作成する場合には、ユーザー サブスクリプション モードを選択する必要があります。
プールの作成
Azure Batchでは、起動するVMをプール単位で管理します。今回は、待機状態のタスクがあれば自動的にVMを立ち上げて、待機がなければシャットダウンするように作成します。
- コンテナが実行できるホストオペレーションシステムを選択します。オファーの項目にcontainerがあるubuntuを選択します。
- コンテナイメージには、今回作成したDockerイメージの情報を設定します。コンテナーレジストリには、Azure Container Registryサービスの情報を設定します。
- VMサイズは、任意のサイズを指定してください。今回は安いやつですが、GPU使う場合等はそれ相応のVMを指定する必要があります。
- スケール設定は、5分毎にチェックを行い、指定された数式を実行してVM数を決定します。以下の数式は、直行180秒の未実行のジョブ(タスク)の平均分、低優先度インスタンスを自動スケールする 最小インスタンス数は0、最大は2といった意味になります。
startingNumberOfVMs = 0;
maxNumberofVMs = 2;
pendingTaskSamplePercent = $PendingTasks.GetSamplePercent(180 * TimeInterval_Second);
pendingTaskSamples = pendingTaskSamplePercent < 70 ? startingNumberOfVMs : avg($PendingTasks.GetSample(180 * TimeInterval_Second));
$TargetLowPriorityNodes=min(maxNumberofVMs,pendingTaskSamples);
Azure Batchを実行するインスタンスには、低優先度インスタンスと専用インスタンスがありますが
低優先度インスタンスは、料金が安いのですがAzure内でインスタンスが足りない場合等は停止する可能性があるインスタンスとなります。実際のシステムでは、専用インスタンスになるでしょうが、料金が安いので分析プログラムの優先度によっては、良い選択になるかもしれません。
次に開始タスクを設定します。開始タスクとは、インスタンスが初めてプールに参加する場合(スケール等)に1度実行される処理の事です。
コマンドラインでは、Azure Filesを/mnt/AzureFileShare にマウントして、タイムゾーンをAsia/Tokyoに設定しています。
/bin/sh -c "apt-get -y update && apt-get install -y cifs-utils && mkdir -p /mnt/AzureFileShare && mount -t cifs //pgstg.file.core.windows.net/pgstg /mnt/AzureFileShare -o vers=3.0,username={your name},password={your password},dir_mode=0777,file_mode=0777,serverino && ls -l /mnt/AzureFileShare && timedatectl set-timezone Asia/Tokyo"
今回のプール設定では使用しませんが、Azure Batchの利点として以下の項目があります。
- プールを任意の仮想ネットワーク上に配置する事ができ、仮想ネットワークのみに閉じれないPaaSサービスに比べてセキュリティを担保しやすい。
- 1インスタンス(ノード)の1vCoreに対して最大4個までのタスクを割り当てる事ができるので、分析が遅くなるが料金を抑える事ができる。逆に、1タスクが必要なvCoreを保証する事も出来ます。
ジョブの作成
Azure Batchの実行単位にはジョブの下に、タスクがあります。1つの分析プログラムを実行するぐらいの簡単な物の場合は、前もってジョブを作成しておき、プログラムやAzure Portalからはタスクとして投入する形の方が管理しやすいです。
複雑な処理の場合(前処理、実行、後処理などの工程がある)は、ジョブ単位で管理する方がよいでしょう。
ジョブIDは一意となるIDである必要があります。プールは前もって準備したプールを選択します。
タスクの作成と投入(分析実行)
実際に分析を実行するタスクを投入します。設定箇所は以下の項目です。
- タスクID:任意の一意なID
- ユーザーID:プールのautouser管理者に対して、Azure Filesのドライブマウントを行っているために、「プール autouser、管理者」を選択。
- コマンドライン:/pgディレクトリにAzure Filesのドライブがマウントされているので、/pg/pg.shを実行。
/bin/sh -c "/pg/pg.sh"
- イメージの名前:Dockerイメージのタグ
- コンテナーの実行オプション:docker runのオプション。ドライブをリンクする。実行後コンテナは削除する。
-v /mnt/AzureFileShare:/pg --rm
送信ボタンを押下して、5分ほど経過(自動スケールチェック)すると、プールサイズが変更され、ノードがアイドル状態になってからタスクが実行されます。
タスクの詳細を表示すると、実行が完了してAzure Blob Serviceにも画像が2枚保存されていることが確認できました。(内容が当たっているかは・・)
Javaからタスクの作成と投入(分析実行)
Javaでタスクを投入する例を以下に記載します。Azure BatchのJavaライブラリがあるので、基本的にはAzure Portal上で行った設定を同じようにプログラムで設定し、createTaskで投入します。
投入後は、ループ等でタスクの状態を取得して完了・エラー処理を実施します。
String azureBatchUrl = "{azure batch url}";
String azureBatchAccount = "azure batch account";
String azureBatchKey= "azure batch key";
String jobCmdLine = "/bin/sh -c \"/pg/pg.sh\"";
String taskId= "1";
String azureBatchJobName = "normal_job";
String azureBatchImageName = "pgrep.azurecr.io/pg_master:0.1";
String azureBatchContainerOption = "-v /mnt/AzureFileShare:/pg --rm";
BatchClient client = BatchClient.open(new BatchSharedKeyCredentials(azureBatchUrl, azureBatchAccount, azureBatchKey));
try {
TaskAddParameter taskToAdd = new TaskAddParameter();
taskToAdd.withId(taskId)
.withUserIdentity(
new UserIdentity().withAutoUser(new AutoUserSpecification().withElevationLevel(ElevationLevel.ADMIN).withScope(AutoUserScope.TASK)))
.withContainerSettings(new TaskContainerSettings()
.withImageName(azureBatchImageName)
.withContainerRunOptions(azureBatchContainerOption))
.withConstraints(new TaskConstraints().withMaxTaskRetryCount(-1)).withCommandLine(jobCmdLine);
client.taskOperations().createTask(azureBatchJobName, taskToAdd);
long timeout = execJobBatchTimeout * 1000;
long startTime = System.currentTimeMillis();
// タイムアウト判定
while (true) {
if (System.currentTimeMillis() - startTime > timeout) {
break;
}
// 終了チェック
CloudTask task = client.taskOperations().getTask(azureBatchJobName, taskToAdd.id());
if (task.state() != TaskState.COMPLETED) {
Thread.sleep(100);
}
}
} catch (Exception e) {
logger.error("Batch実行時のエラー", e);
}
<!-- https://mvnrepository.com/artifact/com.microsoft.azure/azure-batch -->
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-batch</artifactId>
<version>3.3.0</version>
</dependency>
Dockerイメージを作成して、Azure Batchを利用して分析プログラムを動かす一連の設定を行ってみました。
今回は触れていませんが、ジョブやタスクの実行にはリトライを設定する事も出来ますので、分析や集計等を定期的に実行する必要がある色々な用途に使用できると考えておりますので、興味がある方は是非利用してみてください。