Codepipeline 結果を Slack に通知する

今回は色々拡張もできるように Lambda を使ってやってみます。
他の方法では、下記のような実現方法があるようです。
Amazon SNS + AWS Chatbotを使ってコーディングなしでCodePipelineのデプロイステータスをSlackへ通知する

参考

前提

  • コードパイプラインが作成されている事。
  • Slack の Webhook のURLが取得済みである事。

処理の流れ

  1. コードパイプラインが実行される。
  2. EventBridge がコードパイプラインのステータス変更を検知する。
  3. EventBridge が Lambda を実行する。
  4. Lambda が Slack に通知する。

EventBridge を作成する

  1. EventBridge のページを開き「ルールを作成」をクリックする。
  2. 下記の設定をする。




    とりあえず今回は、成功と失敗のみ通知する。

    監視対象のパイプラインを選択する。パイプラインARNは「arn:aws:codepipeline:region:account:pipeline-name」の形で入力する。
  3. 「ルールの作成」をクリックする。

Lambda を作成する

  1. Lambda のページを開き「関数の作成」をクリックする。
  2. 下記の通り関数を作成する。
  3. 下記のコードを設定する。
import json
import urllib.request
SLACK_URL = "https://hooks.slack.com/services/xxx"
CODEPIPELINE_URL = "https://{0}.console.aws.amazon.com/codepipeline/home?region={0}#/view/{1}"
SLACK_MESSAGE_TEXT = '''\
*{0}* `{1}` {3} <{2}|CodePipeline>
```
execution_id: {4}
```
'''
STATE_ICONS = {
    'FAILED': ':sweat_drops:',
    'SUCCEEDED': ':rainbow::rainbow::rainbow:'
}
def lambda_handler(event, context):
    region = event["region"]
    pipeline = event["detail"]["pipeline"]
    execution_id = event["detail"]["execution-id"]
    state = event["detail"]["state"]
    url = CODEPIPELINE_URL.format(region, pipeline)
    send_data = {
        'text': SLACK_MESSAGE_TEXT.format(
            pipeline, state, url, STATE_ICONS.get(state, ''),
            execution_id),
    }
    send_text = json.dumps(send_data)
    request = urllib.request.Request(
        SLACK_URL, 
        data=send_text.encode('utf-8'), 
        method="POST"
    )
    with urllib.request.urlopen(request) as response:
        response_body = response.read().decode('utf-8')

EventBridge と Lambda を紐づける

  1. 「トリガーを追加」をクリックする。

  2. EventBridge を選択し「既存のルール」で先ほど作成した EventBridge の arn を選択する。

  3. Lambda のトリガーに EventBridge が追加されたのを確認する。