celery で非同期通信を試してみる

2022年6月19日

参考

役割

  • タスク
    • 非同期で実行させる処理内容をひとかたまりにまとめたもの。
  • キュー
    • タスクを格納する入れ物。入れ物の構造が、先に入ったタスクから順に取り出す様になっているもの。
  • プロデューサー
    • タスクを作成してブローカーに渡す役割を持ったもの。(今回この役割を Celery Client が行う。)
  • ブローカー
    • 作成されたタスクをキューに登録したり、キューに登録されているタスクをワーカーに渡したりする役割を持ったもの。(今回この役割を redis が行う。)
  • ワーカー
    • ブローカーによってキューから取り出されたタスクを実際に処理する役割を持ったもの。(今回この役割を Celery Woker が行う。)

階層

.
├── config
│   ├── __init__.py
│   ├── celery.py
│   ├── settings.py
│   ├── tasks.py
│   └── urls.py
├── manage.py
└── testapp
    ├── migrations
    ├── templates
    │   └── testapp
    │       ├── check_task.html
    │       └── register_task.html
    ├── urls.py
    └── views.py

インストール

  • apt install redis-server
  • pip install django-redis
  • pip install celery
  • pip install django-celery-results

settings.py

testapp と django_celery_results を INSTALLED_APPS に下記を追記します。

INSTALLED_APPS = [
    ...
    'testapp',
    'django_celery_results',
]

状態や結果をデータベースで管理するために必要となるので下記を追記します。

CELERY_RESULT_BACKEND = 'django-db'

redis を使用しているので接続出来るよう下記を追記します。(import os も忘れずに...)

CELERY_BROKER_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/1')

celery.py

settings.py と同じ階層に celery.py を作成します。
コードは下記のとおりです。
※ config の所は適宜変更が必要です。

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
app = Celery('config')

app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks()

init.py

settings.py と同じ階層にある init.py に下記を記載します。
Django 起動時に celery.py から app をインポートしてロードします。

from .celery import app as celery_app

__all__ = ('celery_app',)

tasks.py

settings.py と同じ階層に tasks.py を作成します。
コードは下記のとおりです。
ここには非同期で処理したいプログラムを記載します。
今回は第一引数の値が第二引数になるまで +1 するプログラムを記載します。
デコレータ「@shared_task」を付ける事でタスクとして登録されます。

from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def add(i, until):
    print("処理開始")
    while(i < until):
        i += 1
        print(i)

    print('処理完了')
    return i

views.py

register_task で前述の tasks.add を呼び出してタスクを登録しています。
GETパラメータでタスクのIDを受け取り該当する登録されたタスクの状態を取得しています。

from django.shortcuts import render
from celery.result import AsyncResult
from django_celery_results.models import TaskResult
from config.tasks import add

def register_task(request):
    task_id = add.delay(1, 100000)
    context = {'task_id': task_id}
    return render(request, 'testapp/register_task.html', context)

def check_task(request):
    task_id = request.GET.get("task_id")
    task = AsyncResult(task_id)

    # チェック中のIDを取得する。
    print(task.id)

    # タスクの状態を取得する。
    # PENDING: 保留中
    # FAILURE: 失敗
    # SUCCESS: 成功
    print(task.state)

    # 非同期処理が終了したかのステータスを取得する。
    # True: 終了
    # False: 終了していない
    print(task.ready())

    # タスクの結果を取得する。
    # None: 完了していない場合は None となる。
    print(task.result)

    context = {'task': task}
    return render(request, 'testapp/check_task.html', context)

register_task.html

登録されたタスクIDを表示します。

<html>
<head>
    <title>タスク登録結果</title>
</head>
<body>
    <h1>タスク登録結果</h1>
    <p>登録されたタスクID: {{ task_id }}</p>
</body>
</html>

check_task.html

登録されたタスクの状態を表示します。

<html>
<head>
    <title>登録タスク状況</title>
</head>
<body>
    <h1>登録タスク状況</h1>
    <p>タスクID: {{ task.id }}</p>
    <p>タスク状態: {{ task.state }}</p>
    <p>非同期処理完了: {{ task.ready }}</p>
    <p>タスク結果: {{ task.result }}</p>
</body>
</html>

urls.py

config.urls.py に下記の通り追記します。

from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('testapp/', include('testapp.urls')), # <- 追加
]

testapp.urls.py を作成し下記の通り記述します。

from django.urls import path
from . import views

app_name = 'testapp'
urlpatterns = [
    path('register_task', views.register_task, name='register_task'),
    path('check_task', views.check_task, name='check_task'),
]

migrate してテーブルを作成する

migrate して celery が管理するテーブルを作成します。

python manage.py migrate

redis を起動する

ブローカーである redis を起動します。

redis-server

ワーカーを起動する

実際にタスクをこなすワーカーを次のコマンドで起動します。
コマンドを実行する階層は manage.py がある階層です。

celery -A config worker -l info

確認してみる

次のURLにアクセスしてタスクを登録してみます。

http://localhost:8000/testapp/register_task

下記の通り登録されたタスクIDが表示されました。

次に下記のURLにアクセスしてタスクの状態を表示してみます。

http://localhost:8000/testapp/check_task?task_id=b9bd21d7-3022-4b6b-a63c-611135eb8f08

まだ処理中の場合は下記の通りのステータスが表示されるようです。

処理が終わった場合は下記の通りのステータスが表示されるようです。

データベースの値を取得する

データベースの値を取得するには下記の通りです。

# TaskResultをインポート
from django_celery_results.models import TaskResult

# TaskResultオブジェクトから実行結果を取得
result_object = TaskResult.objects.get(task_id=task_id)

しかし処理が完了していないと Object Does Not Exist となるようです。

2022年6月19日