Digdag & ECS ワークフローエンジンでバッチ処理
前回S3へのPUTイベントをトリガーとしてLambda関数による機械学習モデルの構築と推論サービスの更新を自動化しました。あとは自動化のパイプラインの中で、仮想通貨のデータを取得してS3へ配置する作業を定期的に行いたいです。APIからデータを取得して、csv形式でS3へ保存するだけなのでシェルスクリプトとAWS CLIでも簡単に処理できますが、本稿ではワークフローエンジンであるDigdagをAWS ECS上で試してみました(EC2でなくFargateです)。ネタとしては若干古いですが個人的な備忘録として残しておきます。なお公式リポジトリにはStep Functionsで機械学習のモデル構築からデプロイまでオーケストレーションしている例があるのでそちらのほうが今っぽいと思われます。
機械学習における環境構築から開発・運用や推論を幅広くカバーするAWSのマネージドサービスであるAmazon SageMakerとLambdaを使用して仮想通貨の分足から終値価格のラベリングを予測する機械学習モデルの訓練や推論サービスのデプロイを自動化しました。Lambda関数でS3のPUTイベントを補足しましたが、公式リポジトリにはStep Functions/Lambda/CloudWatch Eventsを使用してデータをチェックし、モデルの再トレーニングや推論サービスのエンドポイントのデプロイを行うサンプルが既に存在します。[1]
Lambdaを使用する例にしても、Step Functions/Lambdaを使用する例にしても、データを取得してS3に置くという作業は必須となります。データの取得には定期的に取得する方法や、何かをトリガーとしてデータ取得をキックするなど複数の方法があると思いますが、今まで取り扱ってきた仮想通貨の分足については日次で特定の時間に取得することとして想定します。この日次のバッチ処理を行う上でどのような方法があるか考えてみました。
シェルスクリプト+AWS CLIを使用
シェルスクリプトでデータ取得や整形を行い、AWS CLIでS3へファイルを置くやり方です。作成したシェルスクリプトをcronなどのサービスで定期実行します。メリットは簡単に実装できることですが、まずエラー・失敗時の処理や排他処理が難しいです。またサーバやcronなどのサービスが動いていることを監視し、管理する必要があります。AWSの権限情報をローカルに保存する必要があるためにサーバのセキュリティも考慮する必要があります。
コンテナでワークフローエンジンを使用
Dockerのようなコンテナイメージにワークフローエンジンを導入してバッチ処理を行う方法です。Airflor/Argo/Digdagのようなワークフローエンジンはジョブの排他処理や依存関係のDAG(有向非巡回グラフ)表現ができます。エラー処理や排他処理はワークフローエンジンで実現できますが、サーバ管理を引き続き行う必要があります。AWSの権限情報をローカルに保存する必要があるためにコンテナを動作させるサーバのセキュリティも考慮する必要があります。
コンテナ管理でワークフローエンジンを使用
ECSやk8sなどのコンテナ管理・コンテナオーケストレーションツール上のコンテナ群でワークフローエンジンを動作させる方法です。Airflor/Argo/Digdagのようなワークフローエンジンはジョブの排他処理や依存関係のDAG(有向非巡回グラフ)表現ができます。エラー処理や排他処理をワークフローエンジンで吸収でき、AWS ECSで使用することでコンテナ管理から、さらにIAMロールによってAWSの権限の保存問題からも解放されます。またFargateの利用によってEC2のサーバインスタンスの管理も不要となります。
ecs-cliでAWS ECSを使ってみる
ECS(Elastic Container Service)では、docker-composeファイルを流用することでecs-cliを使う利点が広がります。ecs-cliだけで以下のようなECSへのデプロイ関連作業がほぼ可能です。今回はEC2ではなくFargateを使ってみました。[4]
- docker buildとECRへのイメージのプッシュ
- ECSタスク定義の作成と更新
- ECSタスク定義の実行
- ECSサービス定義
- ECSサービスの更新
まずはDocker本で出てくるidentidockというFlaskアプリケーションをecs-cliでAWS ECSへ乗せてみました。identidockは固有の情報(IPアドレスやユーザ名)から識別できる画像を生成するidenticonと呼ばれるサービスのDockerバージョンです。本の内容と同じですがコードをリポジトリに一式置いてあります。
$ git clone https://github.com/yuyasugano/identidock
$ docker-compose up -d
Creating network "identidock_default" with the default driver
Creating identidock_redis_1 ... done
Creating identidock_dnmonster_1 ... done
Creating identidock_identidock_1 ... done
ローカルサーバで実行して、『Donald Trump』と入力してみた例です。心なしか本人に似ているような気がします。
既にネタとなるdocker-composeはあるので、ここからecs-cliを使ってECSへサービスを乗せていきます。Amazon ECS CLIをAWS CLIとは別にインストールする必要があります。[5]
$ ecs-cli --version
ecs-cli version 1.15.1 (628b3e6)
チュートリアルを参考にFargateタスクのクラスターを作成してみました。チュートリアルの最初の方にあるタスク実行ロールの作成とタスク実行ロールのアタッチは事前に行っておく必要があります。
以下のコマンドでECS(Elastic Container Service)クラスタの設定ファイルを作成します。
$ ecs-cli configure --cluster identidock --config-name identidock --default-launch-type FARGATE --region ap-northeast-1
INFO[0000] Saved ECS CLI cluster configuration identidock.
.ecs/config
にクラスタ設定のファイルが保存されました。
version: v1
default: identidock
clusters:
identidock:
cluster: identidock
region: ap-northeast-1
default_launch_type: FARGATE
デフォルトのクラスタを identidock
に設定しておきます。
$ ecs-cli configure default --config-name identidock
ecs-cli
コマンドでクラスタを立ち上げます。クラスタの実行環境となるVPCやサブネット情報が出力されます。この値は後ほど ecs-params.yml
という設定ファイルで必要になります。
$ ecs-cli up
INFO[0001] Created cluster cluster=identidock region=ap-northeast-1
INFO[0002] Waiting for your cluster resources to be created...
INFO[0002] Cloudformation stack status stackStatus=CREATE_IN_PROGRESS
INFO[0063] Cloudformation stack status stackStatus=CREATE_IN_PROGRESS
VPC created: vpc-03f80f7e2f01300cf
Subnet created: subnet-03f8354246fc0f1e2
Subnet created: subnet-0cfd95169cc1b9541
Cluster creation succeeded.
タスクの実行に必要なセキュリティグループを作成します。この値も後ほど ecs-params.yml
で設定します。
$ aws ec2 create-security-group --group-name "identidock" --description "default security group of identidock" --vpc-id "vpc-03f80f7e2f01300cf"
{
"GroupId": "sg-0b0f8986268441bb4"
}
docker-compose.ymlファイルをそのまま利用してDockerコンテナのビルドとECR(Elastic Container Registry)へのプッシュを一挙にやってしまいます。docker-compose.ymlファイルの image
にECRリポジトリを指定しておくことで、 ecs-cli
コマンドでプッシュやタスク定義がdocker-compose.ymlファイルのままで行えるようになります。
この指定でローカルに ${AWS_ACCOUNTID}.dkr.ecr.ap-northeast-1.amazonaws.com/identidock:1.0
というタグ付きのコンテナイメージが作成でき、また名前空間やタグを変更することなくECRへプッシュできるので大変便利です。
version: "3"
services:
identidock:
build: .
image: ${AWS_ACCOUNT_ID}.dkr.ecr.ap-northeast-1.amazonaws.com/identidock:1.0
ports:
- "5000:5000"
environment:
ENV: DEV
volumes:
- ./app:/app
links:
- dnmonster
- redisdnmonster:
image: amouat/dnmonster:1.0redis:
image: redis
まずは docker-compose build
してみます。タグ付きのイメージがローカルに作成されました。
$ docker-compose build
dnmonster uses an image, skipping
redis uses an image, skipping
Building identidock
Step 1/10 : FROM python:3.6
---> 1bf411c4dc77
Step 2/10 : RUN groupadd -r uwsgi && useradd -r -g uwsgi uwsgi
---> Using cache
---> 4b5eed595214
...Removing intermediate container 776d8b61ff88
---> a24bc0e40286
Successfully built a24bc0e40286
Successfully tagged ${AWS_ACCOUNT_ID}.dkr.ecr.ap-northeast-1.amazonaws.com/identidock:1.0
このイメージをそのまま ecs-cli push
でECRへプッシュできてしまいます!! docker login
を行わずしてECRへイメージをアップロードできました。
$ ecs-cli push --region ap-northeast-1 --cluster-config identidock ${AWS_ACCOUNT_ID}.dkr.ecr.ap-northeast-1.amazonaws.com/identidock:1.0
INFO[0001] Pushing image repository=<aws-account-region>.dkr.ecr.ap-northeast-1.amazonaws.com/identidock tag=1.0
INFO[0010] Image pushed
ローカルでdocker-compose.ymlを使用していますが、ECS用にdocker-compose.production.ymlを作成しました。docker-composeファイルに加えて ecs-params.yml
を作成し、この2つのファイルからタスク定義や作成を行います。 ecs-cli
へ渡すdocker-compose.production.ymlはローカルで実行する際に比べ使用できるオプションなどに制約が多くエラーが発生することがあります(ファイル名はなんでも構いません)。
- docker-compose.production.yml
- ecs-params.yml
ecs-params.yml
は以下のようになりました。essentialというキーは、コンテナに障害があった場合にタスクを停止させるかどうかを指定しています。 true
の場合は障害があった場合にタスクを停止します。ここで作成したサブネットのサブネットIDやセキュリティグループのIDを指定しています。このidentidockアプリケーションはセキュリティグループでTCPの5000番を事前に開けておかないとアクセスができません。
タスク定義の実行が行える段階になりました。ECSではタスク定義1つにつき1つのdocker-composeファイルです。今回は1つのタスク定義だけ作成し実行しています。ECSにおけるタスクは複数のコンテナの集合体です。
$ ecs-cli compose --file docker-compose.production.yml --ecs-params ecs-params.yml up --create-log-groups
Fargateを利用する場合にはnetworkMode
を awsvpc
に設定する必要がありますが、その場合はdocker-composeファイルでのlinksはサポートされません。以下のエラーメッセージが出力されました。linksをdepends_onで置き換えましょう。
$ ecs-cli compose --file docker-compose.production.yml --ecs-params ecs-pa
rams.yml up --create-log-groups
ERRO[0000] Error registering task definition error="ClientException: Links are not supported when networkMode=awsvpc.\n\tstatus code: 400, request id: b783bb43-024c-40c5-b8a4-f2fbac927ab7" family=identidock
ALBなどのロードバランサを使用せず、タスク定義だけで走らせる場合はここまででOKです。 ecs-cli compose ps
コマンドで実行中のタスクを見ることでIPアドレスや稼働しているポートを確認可能です。ECSのコンソールからもタスク定義を確認することができました。docker-composeファイルと ecs-params.yml
を使用することでdocker-composeに慣れている人はより直感的にECSへコンテナをデプロイすることができると思います。
$ ecs-cli compose ps
WARN[0000] Skipping unsupported YAML option for service... option name=build service name=identidock
Name State Ports TaskDefinition Health
b52b0e5d-cb1c-4041-a91c-338e5fb71126/redis RUNNING identidock:1 UNKNOWN
b52b0e5d-cb1c-4041-a91c-338e5fb71126/identidock RUNNING X.X.X.X:5000->5000/tcp identidock:1 UNKNOWN
b52b0e5d-cb1c-4041-a91c-338e5fb71126/dnmonster RUNNING identidock:1 UNKNOWN
タスク定義や作成したクラスタをクリーンアップしておきます。
$ ecs-cli compose --file docker-compose.production.yml --ecs-params ecs-params.yml down
$ ecs-cli down
Are you sure you want to delete your cluster? [y/N]
y
Digdagをdocker-composeで使う
docker-composeとecs-cliを利用することでECSでのサービスやタスク定義、実行が効率良く行えることを確認しました。次はバッチ処理を行えるDigdag用のコンテナとその環境を立ち上げるdocker-composeファイルを準備します。AWSのコンテナ管理サービスであるECSやコンテナオーケストレータであるk8sを利用することで、サーバ管理も不要となりますが、運用しているサーバ上でdocker-composeをそのまま使うことも可能です。テストは手元のサーバで行っています。
Dockerコンテナのイメージを作成しました。このイメージはDigdagをサーバモードで立ち上げpostgresqlのデータベース接続を行います。Digdag実行時にオプションで渡せる他、 --config /etc/server.properties
という設定ファイルでデータベースの接続設定を読み込むことができます。docker-composeでpostgresqlを同時に立ち上げることでサーバモードでDigdagを起動させることができました。接続先のデータベースとしてコンテナ外のクラウドやオンプレ上のpostgresqlを使用することももちろん可能です。DigdagをECS上で動かす際には、RDS(Relational Database Service)のpostgresqlを使用します。
Dockerfileは以下を使用してビルドしました。
Digdagはデフォルトポート65432でアクセスできますが、ここでは自環境の外からアクセスするためにポート5000へ変更しています。docker-compose.ymlのサンプルです(ファイル名はdocker-compose.development.yml)。このサンプルでは environment
を使用して認証情報やS3の情報を渡しました。ローカルやサーバ上のDocker環境で実行するために認証情報をコンテナへ渡していますが、ECS上へ移行する際にECSのタスク実行ロールを許可することで、AWSの認証情報をコードや設定ファイルに保存しないというAWSのベストプラクティスに従うことができるようになります。
version: "3"
services:
digdag:
build: .
container_name: digdag_ecs
ports:
- "5000:65432"
depends_on:
- postgresql
environment:
ENVRONMENT: develop
AWS_ACCESS_KEY: "<Your AWS Access Key>"
AWS_SECRET_KEY: "<Your AWS Secret Key>"
AWS_S3_EXAMPLE_BUCKET: "<S3 Bucket name>"
AWS_S3_EXAMPLE_PATH: "<S3 Bucket path>"
postgresql:
image: postgres:11
container_name: postgres_ecs
volumes:
- ./fixtures/init.sql:/docker-entrypoint-initdb.d/init.sql
ports:
- "5432:5432"
docker-compose -f docker-compose.development.yml up
コマンドでDigdagとサーバモードで使用するpostgresqlを立ち上げることができます。 v0.9.39
のDigdagが立ち上がりました。
py
オペレータを使用することでpythonのコードを実行することができます。ここではクラスを使用せず <filename>.<methodname>
で関数呼び出しを行う方法を採用しました。このコンテナイメージではPython 3.6.9と以下のライブラリをインストールしています。 numpy
や pandas
がインストールされているので py
オペレータとpythonコードでデータ整形にそのまま使用できます。※出力は pip list
の一部です
awscli 1.16.139
boto3 1.9.129
botocore 1.12.129
numpy 1.17.4
pandas 0.25.3
requests 2.22.0
クラスを定義しない方法の解説です。ワークフローのdigファイルと同じ階層にPythonコードを配置することで、 <filename>.<methodname>
という記載でワークフロー側から関数を呼び出せます。[6]
サンプルで実行してみたPythonコードです。 python_tasks.py
としてワークフローと同じディレクトリへ保存しています。
サンプルのワークフロー python_sample.dig
です。 digdag push <project-name>
でディレクトリ直下の .dig
をプロジェクトへ登録し、 digdag run python_sample.dig
コマンドでワークフローを実行できます。プロジェクト登録するとDigdag UI上でもプロジェクトやワークフローが見えるようになります。プロジェクト登録しなくてもコマンドラインからは digdag run
でワークフローの実行ができました。
csvファイルへAPIからの取得情報を出力し、S3へファイルをアップロードするワークフローのサンプルです。 csv_ohlcv
や upload_tos3
は python_tasks.py
に事前に定義してあります。ワークフローのスケジューリングは digdag check
で確認可能です。システムの時刻設定とワークフロースクリプトでの timezone
のギャップに注意してください。
使用したコンテナイメージはDockerHub上にアップロードしてあります。Python 3.6.9と numpy
や pandas
入りで、Digdagのサーバモードを使用するコンテナイメージです。データベースの設定情報を変更したい場合は環境変数で渡すことができます。[7]
S3へのファイルアップロードにBoto3を使っています。Embulkを使用する場合は digdag secrets
でAWSの認証情報を暗号化し、embulk-output-s3の設定ファイルで読み込ませることが可能です。
DigdagをAWS ECSへ乗せる
Digdagをdocker-composeで実行することができたため、同様にECSでDigdagのコンテナを使用したタスクを定義します。Fargateを使用していきますので、サーバやインスタンスの管理は不要です。ECSのタスク実行ロールでS3へPUTする場合は、IAMロールの権限を変更しておく必要があります。その場合にはIAMの権限情報を渡さないように環境変数を渡す部分の削除やPythonコードの変更も必要です。まずは ecs-cli
がインストールされていることを確認します。
$ ecs-cli --version
ecs-cli version 1.15.1 (628b3e6)
ECS(Elastic Container Cluster)のクラスタ設定ファイルから作成します。
$ ecs-cli configure --cluster digdag --config-name digdag --default-launch-type FARGATE --region ap-northeast-1
INFO[0010] Saved ECS CLI cluster configuration digdag.
デフォルトのクラスタを digdag
に変更しました。
$ ecs-cli configure default --config-name digdag
ecs-cli
コマンドでクラスタを立ち上げます。このクラスタの実行環境となるVPCやサブネット情報が出力されます。
$ ecs-cli up
INFO[0001] Created cluster cluster=digdag region=ap-northeast-1
INFO[0002] Waiting for your cluster resources to be created...
INFO[0002] Cloudformation stack status stackStatus=CREATE_IN_PROGRESS
INFO[0063] Cloudformation stack status stackStatus=CREATE_IN_PROGRESS
VPC created: vpc-0202d7f7bed7a11e7
Subnet created: subnet-08a00cf09659e174a
Subnet created: subnet-07f59ab725ec7a0a1
Cluster creation succeeded
タスクの実行に必要なセキュリティグループをVPC配下に作成します。
$ aws ec2 create-security-group --group-name "digdag" --description
"default security group of digdag" --vpc-id "vpc-0202d7f7bed7a11e7"
{
"GroupId": "sg-00aaf768b16c2653c"
}
docker-compose.production.ymlというファイルを、ECS環境用として作成しました。ECR(Elastic Container Registry)へプッシュするイメージはこのdocker-compose.production.ymlから作成します。 ${AWS_ACCOUNT_ID}.dkr.ecr.ap-northeast-1.amazonaws.com/digdag-ecs:1.0
というタグ付きのコンテナイメージを作成してECRへプッシュしてください。Fargateを利用する場合ホスト側が存在しないためボリュームマウントができません。そのためpostgresqlのイメージは含めずにRDS(Relational Database Service)のpostgresqlを活用することにします。[8]
version: "3"
services:
digdag:
image: ${AWS_ACCOUNT_ID}.dkr.ecr.ap-northeast-1.amazonaws.com/digdag-ecs:1.0
container_name: digdag_ecs
ports:
- "65432:65432"
environment:
ENVRONMENT: production
AWS_ACCESS_KEY: "<Your AWS Access Key>"
AWS_SECRET_KEY: "<Your AWS Secret Key>"
AWS_S3_EXAMPLE_BUCKET: "<S3 Bucket name>"
AWS_S3_EXAMPLE_PATH: "<S3 Bucket path>"
logging:
driver: awslogs
options:
awslogs-group: digdag-ecs
awslogs-region: ap-northeast-1
awslogs-stream-prefix: digdag-ecs
docker-compose -f docker-compose.production.yml build
を実行してタグ付きのイメージをビルドし、そのまま ecs-cli push
でECRへプッシュします。 docker login
は不要です。
$ docker-compose -f docker-compose.production.yml build
$ ecs-cli push --region ap-northeast-1 --cluster-config digdag ${AWS_ACCOUNT_ID}.dkr.ecr.ap-northeast-1.amazonaws.com/digdag-ecs:1.0
INFO[0001] Creating repository repository=digdag-ecs
INFO[0001] Repository created
INFO[0001] Pushing image repository=${AWS_ACCOUNT_ID}.dkr.ecr.ap-northeast-1.amazonaws.com/digdag-ecs tag=1.0
INFO[0090] Image pushed
ECS用のdocker-compose.production.ymlと ecs-params.yml
を作成し、この2つのファイルからタスク定義を行います。ECS用のDigdagではデータベースにRDSのpostgresqlを使用します。データベース接続情報はSecrets Managerを使用してコンテナ環境変数としてECSへ渡しました。[9]
FargateのタスクがSegret Managerからシークレットを取得するためにECSのタスク実行ロールへ secretsmanager:GetSecretValue
アクションを許可しておきます。RDSでpostgresqlを立ち上げ、各設定値をシークレットとしてSecret Managerへ保存しました。シークレットの作成は以下のように aws secretsmanager create-secret
で行えます。
$ aws secretsmanager create-secret --region ap-northeast-1 --name DB_USER --secret-string digdag
...
Digdag ECS用に作成したdocker-composeファイルと ecs-params.yml
です。
- docker-compose.production.yml
- ecs-params.yml
ecs-params.yml
は以下のようになりました。 secrets
以下でSeret Managerの値を取得したものを環境変数として設定しています。 value_from
はシークレットの名前か完全ARNで指定でき、 name
はコンテナ環境変数として使用する名前そのものです。
タスク定義ができたので実行します。以下のように ecs-cli compose up
で定義したタスクが立ち上がりました。
$ ecs-cli compose --file docker-compose.production.yml --ecs-params ecs-params.yml up --create-log-group
WARN[0000] Skipping unsupported YAML option for service... option name=container_name service name=digdag
INFO[0000] Using ECS task definition TaskDefinition="digdag-ecs:1"
...INFO[0044] Started container... container=73c330a9-412b-4016-8e32-22470b1d12fd/digdag desiredStatus=RUNNING lastStatus=RUNNING taskDefinition="digdag-ecs:1"
ecs-cli compose ps
コマンドで実行中のタスクを見ることができます。ポート65432を使用しているので、Digdag UIにアクセスするためにはECSのセキュリティグループで対象のTCPポート 65432を開放しておく必要があります。
$ ecs-cli compose ps
WARN[0000] Skipping unsupported YAML option for service... option name=container_name service name=digdag
Name State Ports TaskDefinition Health
1639028c-575c-4b14-8b54-2d843cdd9efs/digdag RUNNING X.X.X.X:65432->65432/tcp digdag-ecs:1 UNKNOWN
アクセスが確認できたら、タスク定義や作成したクラスタをクリーンアップしておきます。
$ ecs-cli compose --file docker-compose.production.yml --ecs-params ecs-params.yml down
$ ecs-cli down
Are you sure you want to delete your cluster? [y/N]
y
テストしたコードは以下のリポジトリへ置いてあります。
まとめ
- ワークフローエンジンは排他処理や依存関係のある処理もうまくやってくれる、依存関係はDAG(有向非巡回グラフ)で表現される
- 2019年はAirflow/Argo/Digdagといったワークフローエンジンが比較的よく使用されている
- ECSにFargateを使用する場合は制約がある(最新のドキュメントを確認要)