Digdag & ECS ワークフローエンジンでバッチ処理

Yuya Sugano
33 min readDec 17, 2019

--

前回S3へのPUTイベントをトリガーとしてLambda関数による機械学習モデルの構築と推論サービスの更新を自動化しました。あとは自動化のパイプラインの中で、仮想通貨のデータを取得してS3へ配置する作業を定期的に行いたいです。APIからデータを取得して、csv形式でS3へ保存するだけなのでシェルスクリプトとAWS CLIでも簡単に処理できますが、本稿ではワークフローエンジンであるDigdagをAWS ECS上で試してみました(EC2でなくFargateです)。ネタとしては若干古いですが個人的な備忘録として残しておきます。なお公式リポジトリにはStep Functionsで機械学習のモデル構築からデプロイまでオーケストレーションしているがあるのでそちらのほうが今っぽいと思われます。

digdag by Treasure Data, Inc

機械学習における環境構築から開発・運用や推論を幅広くカバーするAWSのマネージドサービスであるAmazon SageMakerとLambdaを使用して仮想通貨の分足から終値価格のラベリングを予測する機械学習モデルの訓練や推論サービスのデプロイを自動化しました。Lambda関数でS3のPUTイベントを補足しましたが、公式リポジトリにはStep Functions/Lambda/CloudWatch Eventsを使用してデータをチェックし、モデルの再トレーニングや推論サービスのエンドポイントのデプロイを行うサンプルが既に存在します。[1]

Lambdaを使用する例にしても、Step Functions/Lambdaを使用する例にしても、データを取得してS3に置くという作業は必須となります。データの取得には定期的に取得する方法や、何かをトリガーとしてデータ取得をキックするなど複数の方法があると思いますが、今まで取り扱ってきた仮想通貨の分足については日次で特定の時間に取得することとして想定します。この日次のバッチ処理を行う上でどのような方法があるか考えてみました。

シェルスクリプト+AWS CLIを使用

シェルスクリプトでデータ取得や整形を行い、AWS CLIでS3へファイルを置くやり方です。作成したシェルスクリプトをcronなどのサービスで定期実行します。メリットは簡単に実装できることですが、まずエラー・失敗時の処理や排他処理が難しいです。またサーバやcronなどのサービスが動いていることを監視し、管理する必要があります。AWSの権限情報をローカルに保存する必要があるためにサーバのセキュリティも考慮する必要があります。

コンテナでワークフローエンジンを使用

Dockerのようなコンテナイメージにワークフローエンジンを導入してバッチ処理を行う方法です。Airflor/Argo/Digdagのようなワークフローエンジンはジョブの排他処理や依存関係のDAG(有向非巡回グラフ)表現ができます。エラー処理や排他処理はワークフローエンジンで実現できますが、サーバ管理を引き続き行う必要があります。AWSの権限情報をローカルに保存する必要があるためにコンテナを動作させるサーバのセキュリティも考慮する必要があります。

コンテナ管理でワークフローエンジンを使用

ECSやk8sなどのコンテナ管理・コンテナオーケストレーションツール上のコンテナ群でワークフローエンジンを動作させる方法です。Airflor/Argo/Digdagのようなワークフローエンジンはジョブの排他処理や依存関係のDAG(有向非巡回グラフ)表現ができます。エラー処理や排他処理をワークフローエンジンで吸収でき、AWS ECSで使用することでコンテナ管理から、さらにIAMロールによってAWSの権限の保存問題からも解放されます。またFargateの利用によってEC2のサーバインスタンスの管理も不要となります。

クラウド基盤やワークフローエンジンは複数あるため、ユースケースに応じて選択する必要がありますが、ワークフローエンジンについては2019年現在Airflow/Argo/Digdagが比較的よく使われているようです。k8sを使いたい場合は、Argoを選択できます。S3へ取得したデータをPUTするだけであれば、Digdagですぐに実装できそうだったのでDigdagを使用してみました。[2]

日本語での最強のDigdagまとめです。[3]

以下の流れを確認しつつ試してみました。

  1. ecs-cliでAWS ECSを使ってみる
  2. Digdagをdocker-composeで使う
  3. DigdagをAWS ECSへ乗せる

ecs-cliでAWS ECSを使ってみる

ECS(Elastic Container Service)では、docker-composeファイルを流用することでecs-cliを使う利点が広がります。ecs-cliだけで以下のようなECSへのデプロイ関連作業がほぼ可能です。今回はEC2ではなくFargateを使ってみました。[4]

  • docker buildとECRへのイメージのプッシュ
  • ECSタスク定義の作成と更新
  • ECSタスク定義の実行
  • ECSサービス定義
  • ECSサービスの更新

まずはDocker本で出てくるidentidockというFlaskアプリケーションをecs-cliでAWS ECSへ乗せてみました。identidockは固有の情報(IPアドレスやユーザ名)から識別できる画像を生成するidenticonと呼ばれるサービスのDockerバージョンです。本の内容と同じですがコードをリポジトリに一式置いてあります。

$ git clone https://github.com/yuyasugano/identidock
$ docker-compose up -d
Creating network "identidock_default" with the default driver
Creating identidock_redis_1 ... done
Creating identidock_dnmonster_1 ... done
Creating identidock_identidock_1 ... done
identidock with Donald Trump

ローカルサーバで実行して、『Donald Trump』と入力してみた例です。心なしか本人に似ているような気がします。

既にネタとなるdocker-composeはあるので、ここからecs-cliを使ってECSへサービスを乗せていきます。Amazon ECS CLIをAWS CLIとは別にインストールする必要があります。[5]

$ ecs-cli --version
ecs-cli version 1.15.1 (628b3e6)

チュートリアルを参考にFargateタスクのクラスターを作成してみました。チュートリアルの最初の方にあるタスク実行ロールの作成とタスク実行ロールのアタッチは事前に行っておく必要があります。

以下のコマンドでECS(Elastic Container Service)クラスタの設定ファイルを作成します。

$ ecs-cli configure --cluster identidock --config-name identidock --default-launch-type FARGATE --region ap-northeast-1
INFO[0000] Saved ECS CLI cluster configuration identidock.

.ecs/config にクラスタ設定のファイルが保存されました。

version: v1
default: identidock
clusters:
identidock:
cluster: identidock
region: ap-northeast-1
default_launch_type: FARGATE

デフォルトのクラスタを identidock に設定しておきます。

$ ecs-cli configure default --config-name identidock

ecs-cli コマンドでクラスタを立ち上げます。クラスタの実行環境となるVPCやサブネット情報が出力されます。この値は後ほど ecs-params.yml という設定ファイルで必要になります。

$ ecs-cli up
INFO[0001] Created cluster cluster=identidock region=ap-northeast-1
INFO[0002] Waiting for your cluster resources to be created...
INFO[0002] Cloudformation stack status stackStatus=CREATE_IN_PROGRESS
INFO[0063] Cloudformation stack status stackStatus=CREATE_IN_PROGRESS
VPC created: vpc-03f80f7e2f01300cf
Subnet created: subnet-03f8354246fc0f1e2
Subnet created: subnet-0cfd95169cc1b9541
Cluster creation succeeded.

タスクの実行に必要なセキュリティグループを作成します。この値も後ほど ecs-params.yml で設定します。

$ aws ec2 create-security-group --group-name "identidock" --description "default security group of identidock" --vpc-id "vpc-03f80f7e2f01300cf"
{
"GroupId": "sg-0b0f8986268441bb4"
}

docker-compose.ymlファイルをそのまま利用してDockerコンテナのビルドとECR(Elastic Container Registry)へのプッシュを一挙にやってしまいます。docker-compose.ymlファイルの image にECRリポジトリを指定しておくことで、 ecs-cli コマンドでプッシュやタスク定義がdocker-compose.ymlファイルのままで行えるようになります。

この指定でローカルに ${AWS_ACCOUNTID}.dkr.ecr.ap-northeast-1.amazonaws.com/identidock:1.0 というタグ付きのコンテナイメージが作成でき、また名前空間やタグを変更することなくECRへプッシュできるので大変便利です。

version: "3"
services:
identidock:
build: .
image: ${AWS_ACCOUNT_ID}.dkr.ecr.ap-northeast-1.amazonaws.com/identidock:1.0
ports:
- "5000:5000"
environment:
ENV: DEV
volumes:
- ./app:/app
links:
- dnmonster
- redis
dnmonster:
image: amouat/dnmonster:1.0
redis:
image: redis

まずは docker-compose build してみます。タグ付きのイメージがローカルに作成されました。

$ docker-compose build
dnmonster uses an image, skipping
redis uses an image, skipping
Building identidock
Step 1/10 : FROM python:3.6
---> 1bf411c4dc77
Step 2/10 : RUN groupadd -r uwsgi && useradd -r -g uwsgi uwsgi
---> Using cache
---> 4b5eed595214
...
Removing intermediate container 776d8b61ff88
---> a24bc0e40286
Successfully built a24bc0e40286
Successfully tagged ${AWS_ACCOUNT_ID}.dkr.ecr.ap-northeast-1.amazonaws.com/identidock:1.0

このイメージをそのまま ecs-cli push でECRへプッシュできてしまいます!! docker login を行わずしてECRへイメージをアップロードできました。

$ ecs-cli push --region ap-northeast-1 --cluster-config identidock ${AWS_ACCOUNT_ID}.dkr.ecr.ap-northeast-1.amazonaws.com/identidock:1.0
INFO[0001] Pushing image repository=<aws-account-region>.dkr.ecr.ap-northeast-1.amazonaws.com/identidock tag=1.0
INFO[0010] Image pushed

ローカルでdocker-compose.ymlを使用していますが、ECS用にdocker-compose.production.ymlを作成しました。docker-composeファイルに加えて ecs-params.yml を作成し、この2つのファイルからタスク定義や作成を行います。 ecs-cli へ渡すdocker-compose.production.ymlはローカルで実行する際に比べ使用できるオプションなどに制約が多くエラーが発生することがあります(ファイル名はなんでも構いません)。

  • docker-compose.production.yml
  • ecs-params.yml
identidock docker-compose ECS example

ecs-params.ymlは以下のようになりました。essentialというキーは、コンテナに障害があった場合にタスクを停止させるかどうかを指定しています。 true の場合は障害があった場合にタスクを停止します。ここで作成したサブネットのサブネットIDやセキュリティグループのIDを指定しています。このidentidockアプリケーションはセキュリティグループでTCPの5000番を事前に開けておかないとアクセスができません。

identidock ecs-params.yml ECS example

タスク定義の実行が行える段階になりました。ECSではタスク定義1つにつき1つのdocker-composeファイルです。今回は1つのタスク定義だけ作成し実行しています。ECSにおけるタスクは複数のコンテナの集合体です。

$ ecs-cli compose --file docker-compose.production.yml --ecs-params ecs-params.yml up --create-log-groups
ecs-cli compose up

Fargateを利用する場合にはnetworkModeawsvpc に設定する必要がありますが、その場合はdocker-composeファイルでのlinksはサポートされません。以下のエラーメッセージが出力されました。linksをdepends_onで置き換えましょう

$ ecs-cli compose --file docker-compose.production.yml --ecs-params ecs-pa
rams.yml up --create-log-groups
ERRO[0000] Error registering task definition error="ClientException: Links are not supported when networkMode=awsvpc.\n\tstatus code: 400, request id: b783bb43-024c-40c5-b8a4-f2fbac927ab7" family=identidock

ALBなどのロードバランサを使用せず、タスク定義だけで走らせる場合はここまででOKです。 ecs-cli compose ps コマンドで実行中のタスクを見ることでIPアドレスや稼働しているポートを確認可能です。ECSのコンソールからもタスク定義を確認することができました。docker-composeファイルと ecs-params.yml を使用することでdocker-composeに慣れている人はより直感的にECSへコンテナをデプロイすることができると思います。

$ ecs-cli compose ps
WARN[0000] Skipping unsupported YAML option for service... option name=build service name=identidock
Name State Ports TaskDefinition Health
b52b0e5d-cb1c-4041-a91c-338e5fb71126/redis RUNNING identidock:1 UNKNOWN
b52b0e5d-cb1c-4041-a91c-338e5fb71126/identidock RUNNING X.X.X.X:5000->5000/tcp identidock:1 UNKNOWN
b52b0e5d-cb1c-4041-a91c-338e5fb71126/dnmonster RUNNING identidock:1 UNKNOWN
AWS ECS Console for identidock

タスク定義や作成したクラスタをクリーンアップしておきます。

$ ecs-cli compose --file docker-compose.production.yml --ecs-params ecs-params.yml down
$ ecs-cli down
Are you sure you want to delete your cluster? [y/N]
y

Digdagをdocker-composeで使う

docker-composeとecs-cliを利用することでECSでのサービスやタスク定義、実行が効率良く行えることを確認しました。次はバッチ処理を行えるDigdag用のコンテナとその環境を立ち上げるdocker-composeファイルを準備します。AWSのコンテナ管理サービスであるECSやコンテナオーケストレータであるk8sを利用することで、サーバ管理も不要となりますが、運用しているサーバ上でdocker-composeをそのまま使うことも可能です。テストは手元のサーバで行っています。

Dockerコンテナのイメージを作成しました。このイメージはDigdagをサーバモードで立ち上げpostgresqlのデータベース接続を行います。Digdag実行時にオプションで渡せる他、 --config /etc/server.properties という設定ファイルでデータベースの接続設定を読み込むことができます。docker-composeでpostgresqlを同時に立ち上げることでサーバモードでDigdagを起動させることができました。接続先のデータベースとしてコンテナ外のクラウドやオンプレ上のpostgresqlを使用することももちろん可能です。DigdagをECS上で動かす際には、RDS(Relational Database Service)のpostgresqlを使用します。

Dockerfileは以下を使用してビルドしました。

Dockerfile example for Digdag ECS

Digdagはデフォルトポート65432でアクセスできますが、ここでは自環境の外からアクセスするためにポート5000へ変更しています。docker-compose.ymlのサンプルです(ファイル名はdocker-compose.development.yml)。このサンプルでは environment を使用して認証情報やS3の情報を渡しました。ローカルやサーバ上のDocker環境で実行するために認証情報をコンテナへ渡していますが、ECS上へ移行する際にECSのタスク実行ロールを許可することで、AWSの認証情報をコードや設定ファイルに保存しないというAWSのベストプラクティスに従うことができるようになります。

version: "3"
services:
digdag:
build: .
container_name: digdag_ecs
ports:
- "5000:65432"
depends_on:
- postgresql
environment:
ENVRONMENT: develop
AWS_ACCESS_KEY: "<Your AWS Access Key>"
AWS_SECRET_KEY: "<Your AWS Secret Key>"
AWS_S3_EXAMPLE_BUCKET: "<S3 Bucket name>"
AWS_S3_EXAMPLE_PATH: "<S3 Bucket path>"
postgresql:
image: postgres:11
container_name: postgres_ecs
volumes:
- ./fixtures/init.sql:/docker-entrypoint-initdb.d/init.sql
ports:
- "5432:5432"

docker-compose -f docker-compose.development.yml up コマンドでDigdagとサーバモードで使用するpostgresqlを立ち上げることができます。 v0.9.39 のDigdagが立ち上がりました。

docker-compose up for digdag_ecs

py オペレータを使用することでpythonのコードを実行することができます。ここではクラスを使用せず <filename>.<methodname> で関数呼び出しを行う方法を採用しました。このコンテナイメージではPython 3.6.9と以下のライブラリをインストールしています。 numpypandasがインストールされているので pyオペレータとpythonコードでデータ整形にそのまま使用できます。※出力は pip list の一部です

awscli          1.16.139
boto3 1.9.129
botocore 1.12.129
numpy 1.17.4
pandas 0.25.3
requests 2.22.0

クラスを定義しない方法の解説です。ワークフローのdigファイルと同じ階層にPythonコードを配置することで、 <filename>.<methodname> という記載でワークフロー側から関数を呼び出せます。[6]

py> operator runs a Python script

サンプルで実行してみたPythonコードです。 python_tasks.py としてワークフローと同じディレクトリへ保存しています。

python_tasks.py

サンプルのワークフロー python_sample.dig です。 digdag push <project-name> でディレクトリ直下の .dig をプロジェクトへ登録し、 digdag run python_sample.dig コマンドでワークフローを実行できます。プロジェクト登録するとDigdag UI上でもプロジェクトやワークフローが見えるようになります。プロジェクト登録しなくてもコマンドラインからは digdag run でワークフローの実行ができました。

python_sample.dig

csvファイルへAPIからの取得情報を出力し、S3へファイルをアップロードするワークフローのサンプルです。 csv_ohlcvupload_tos3python_tasks.py に事前に定義してあります。ワークフローのスケジューリングは digdag check で確認可能です。システムの時刻設定とワークフロースクリプトでの timezone のギャップに注意してください。

Digdag scheduled workflow

使用したコンテナイメージはDockerHub上にアップロードしてあります。Python 3.6.9と numpypandas 入りで、Digdagのサーバモードを使用するコンテナイメージです。データベースの設定情報を変更したい場合は環境変数で渡すことができます。[7]

https://hub.docker.com/r/suganoyuya/digdag-ecs

S3へのファイルアップロードにBoto3を使っています。Embulkを使用する場合は digdag secrets でAWSの認証情報を暗号化し、embulk-output-s3の設定ファイルで読み込ませることが可能です。

DigdagをAWS ECSへ乗せる

Digdagをdocker-composeで実行することができたため、同様にECSでDigdagのコンテナを使用したタスクを定義します。Fargateを使用していきますので、サーバやインスタンスの管理は不要です。ECSのタスク実行ロールでS3へPUTする場合は、IAMロールの権限を変更しておく必要があります。その場合にはIAMの権限情報を渡さないように環境変数を渡す部分の削除やPythonコードの変更も必要です。まずは ecs-cli がインストールされていることを確認します。

$ ecs-cli --version
ecs-cli version 1.15.1 (628b3e6)

ECS(Elastic Container Cluster)のクラスタ設定ファイルから作成します。

$ ecs-cli configure --cluster digdag --config-name digdag --default-launch-type FARGATE --region ap-northeast-1
INFO[0010] Saved ECS CLI cluster configuration digdag.

デフォルトのクラスタを digdag に変更しました。

$ ecs-cli configure default --config-name digdag

ecs-cli コマンドでクラスタを立ち上げます。このクラスタの実行環境となるVPCやサブネット情報が出力されます。

$ ecs-cli up
INFO[0001] Created cluster cluster=digdag region=ap-northeast-1
INFO[0002] Waiting for your cluster resources to be created...
INFO[0002] Cloudformation stack status stackStatus=CREATE_IN_PROGRESS
INFO[0063] Cloudformation stack status stackStatus=CREATE_IN_PROGRESS
VPC created: vpc-0202d7f7bed7a11e7
Subnet created: subnet-08a00cf09659e174a
Subnet created: subnet-07f59ab725ec7a0a1
Cluster creation succeeded

タスクの実行に必要なセキュリティグループをVPC配下に作成します。

$ aws ec2 create-security-group --group-name "digdag" --description
"default security group of digdag" --vpc-id "vpc-0202d7f7bed7a11e7"
{
"GroupId": "sg-00aaf768b16c2653c"
}

docker-compose.production.ymlというファイルを、ECS環境用として作成しました。ECR(Elastic Container Registry)へプッシュするイメージはこのdocker-compose.production.ymlから作成します。 ${AWS_ACCOUNT_ID}.dkr.ecr.ap-northeast-1.amazonaws.com/digdag-ecs:1.0 というタグ付きのコンテナイメージを作成してECRへプッシュしてください。Fargateを利用する場合ホスト側が存在しないためボリュームマウントができません。そのためpostgresqlのイメージは含めずにRDS(Relational Database Service)のpostgresqlを活用することにします。[8]

version: "3"
services:
digdag:
image: ${AWS_ACCOUNT_ID}.dkr.ecr.ap-northeast-1.amazonaws.com/digdag-ecs:1.0
container_name: digdag_ecs
ports:
- "65432:65432"
environment:
ENVRONMENT: production
AWS_ACCESS_KEY: "<Your AWS Access Key>"
AWS_SECRET_KEY: "<Your AWS Secret Key>"
AWS_S3_EXAMPLE_BUCKET: "<S3 Bucket name>"
AWS_S3_EXAMPLE_PATH: "<S3 Bucket path>"
logging:
driver: awslogs
options:
awslogs-group: digdag-ecs
awslogs-region: ap-northeast-1
awslogs-stream-prefix: digdag-ecs

docker-compose -f docker-compose.production.yml build を実行してタグ付きのイメージをビルドし、そのまま ecs-cli push でECRへプッシュします。 docker login は不要です。

$ docker-compose -f docker-compose.production.yml build
$ ecs-cli push --region ap-northeast-1 --cluster-config digdag ${AWS_ACCOUNT_ID}.dkr.ecr.ap-northeast-1.amazonaws.com/digdag-ecs:1.0
INFO[0001] Creating repository repository=digdag-ecs
INFO[0001] Repository created
INFO[0001] Pushing image repository=${AWS_ACCOUNT_ID}.dkr.ecr.ap-northeast-1.amazonaws.com/digdag-ecs tag=1.0
INFO[0090] Image pushed

ECS用のdocker-compose.production.ymlと ecs-params.yml を作成し、この2つのファイルからタスク定義を行います。ECS用のDigdagではデータベースにRDSのpostgresqlを使用します。データベース接続情報はSecrets Managerを使用してコンテナ環境変数としてECSへ渡しました。[9]

FargateのタスクがSegret Managerからシークレットを取得するためにECSのタスク実行ロールへ secretsmanager:GetSecretValue アクションを許可しておきます。RDSでpostgresqlを立ち上げ、各設定値をシークレットとしてSecret Managerへ保存しました。シークレットの作成は以下のように aws secretsmanager create-secret で行えます。

$ aws secretsmanager create-secret --region ap-northeast-1 --name DB_USER --secret-string digdag
...

Digdag ECS用に作成したdocker-composeファイルと ecs-params.yml です。

  • docker-compose.production.yml
  • ecs-params.yml
digdag docker-compose ECS example

ecs-params.ymlは以下のようになりました。 secrets 以下でSeret Managerの値を取得したものを環境変数として設定しています。 value_fromシークレットの名前か完全ARNで指定でき、 name はコンテナ環境変数として使用する名前そのものです。

digdag ecs-params.yml ECS example

タスク定義ができたので実行します。以下のように ecs-cli compose up で定義したタスクが立ち上がりました。

$ ecs-cli compose --file docker-compose.production.yml --ecs-params ecs-params.yml up --create-log-group
WARN[0000] Skipping unsupported YAML option for service... option name=container_name service name=digdag
INFO[0000] Using ECS task definition TaskDefinition="digdag-ecs:1"
...
INFO[0044] Started container... container=73c330a9-412b-4016-8e32-22470b1d12fd/digdag desiredStatus=RUNNING lastStatus=RUNNING taskDefinition="digdag-ecs:1"

ecs-cli compose ps コマンドで実行中のタスクを見ることができます。ポート65432を使用しているので、Digdag UIにアクセスするためにはECSのセキュリティグループで対象のTCPポート 65432を開放しておく必要があります。

$ ecs-cli compose ps
WARN[0000] Skipping unsupported YAML option for service... option name=container_name service name=digdag
Name State Ports TaskDefinition Health
1639028c-575c-4b14-8b54-2d843cdd9efs/digdag RUNNING X.X.X.X:65432->65432/tcp digdag-ecs:1 UNKNOWN

アクセスが確認できたら、タスク定義や作成したクラスタをクリーンアップしておきます。

$ ecs-cli compose --file docker-compose.production.yml --ecs-params ecs-params.yml down
$ ecs-cli down
Are you sure you want to delete your cluster? [y/N]
y

テストしたコードは以下のリポジトリへ置いてあります。

https://github.com/yuyasugano/digdag-ecs

まとめ

  • ワークフローエンジンは排他処理や依存関係のある処理もうまくやってくれる、依存関係はDAG(有向非巡回グラフ)で表現される
  • 2019年はAirflow/Argo/Digdagといったワークフローエンジンが比較的よく使用されている
  • ECSにFargateを使用する場合は制約がある(最新のドキュメントを確認要)

--

--

Yuya Sugano

Cloud Architect and Blockchain Enthusiast, techflare.blog, Vinyl DJ, Backpacker. ブロックチェーン・クラウド(AWS/Azure)関連の記事をパブリッシュ。バックパッカーとしてユーラシア大陸を陸路横断するなど旅が趣味。