celery で非同期通信を試してみる
参考
役割
- タスク
- 非同期で実行させる処理内容をひとかたまりにまとめたもの。
- キュー
- タスクを格納する入れ物。入れ物の構造が、先に入ったタスクから順に取り出す様になっているもの。
- プロデューサー
- タスクを作成してブローカーに渡す役割を持ったもの。(今回この役割を Celery Client が行う。)
- ブローカー
- 作成されたタスクをキューに登録したり、キューに登録されているタスクをワーカーに渡したりする役割を持ったもの。(今回この役割を redis が行う。)
- ワーカー
- ブローカーによってキューから取り出されたタスクを実際に処理する役割を持ったもの。(今回この役割を Celery Woker が行う。)
階層
.
├── config
│ ├── __init__.py
│ ├── celery.py
│ ├── settings.py
│ ├── tasks.py
│ └── urls.py
├── manage.py
└── testapp
├── migrations
├── templates
│ └── testapp
│ ├── check_task.html
│ └── register_task.html
├── urls.py
└── views.py
インストール
- apt install redis-server
- pip install django-redis
- pip install celery
- pip install django-celery-results
settings.py
testapp と django_celery_results を INSTALLED_APPS に下記を追記します。
INSTALLED_APPS = [
...
'testapp',
'django_celery_results',
]
状態や結果をデータベースで管理するために必要となるので下記を追記します。
CELERY_RESULT_BACKEND = 'django-db'
redis を使用しているので接続出来るよう下記を追記します。(import os も忘れずに...)
CELERY_BROKER_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/1')
celery.py
settings.py と同じ階層に celery.py を作成します。
コードは下記のとおりです。
※ config の所は適宜変更が必要です。
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
app = Celery('config')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
init.py
settings.py と同じ階層にある init.py に下記を記載します。
Django 起動時に celery.py から app をインポートしてロードします。
from .celery import app as celery_app
__all__ = ('celery_app',)
tasks.py
settings.py と同じ階層に tasks.py を作成します。
コードは下記のとおりです。
ここには非同期で処理したいプログラムを記載します。
今回は第一引数の値が第二引数になるまで +1 するプログラムを記載します。
デコレータ「@shared_task」を付ける事でタスクとして登録されます。
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def add(i, until):
print("処理開始")
while(i < until):
i += 1
print(i)
print('処理完了')
return i
views.py
register_task で前述の tasks.add を呼び出してタスクを登録しています。
GETパラメータでタスクのIDを受け取り該当する登録されたタスクの状態を取得しています。
from django.shortcuts import render
from celery.result import AsyncResult
from django_celery_results.models import TaskResult
from config.tasks import add
def register_task(request):
task_id = add.delay(1, 100000)
context = {'task_id': task_id}
return render(request, 'testapp/register_task.html', context)
def check_task(request):
task_id = request.GET.get("task_id")
task = AsyncResult(task_id)
# チェック中のIDを取得する。
print(task.id)
# タスクの状態を取得する。
# PENDING: 保留中
# FAILURE: 失敗
# SUCCESS: 成功
print(task.state)
# 非同期処理が終了したかのステータスを取得する。
# True: 終了
# False: 終了していない
print(task.ready())
# タスクの結果を取得する。
# None: 完了していない場合は None となる。
print(task.result)
context = {'task': task}
return render(request, 'testapp/check_task.html', context)
register_task.html
登録されたタスクIDを表示します。
<html>
<head>
<title>タスク登録結果</title>
</head>
<body>
<h1>タスク登録結果</h1>
<p>登録されたタスクID: {{ task_id }}</p>
</body>
</html>
check_task.html
登録されたタスクの状態を表示します。
<html>
<head>
<title>登録タスク状況</title>
</head>
<body>
<h1>登録タスク状況</h1>
<p>タスクID: {{ task.id }}</p>
<p>タスク状態: {{ task.state }}</p>
<p>非同期処理完了: {{ task.ready }}</p>
<p>タスク結果: {{ task.result }}</p>
</body>
</html>
urls.py
config.urls.py に下記の通り追記します。
from django.contrib import admin
from django.urls import path, include
urlpatterns = [
path('admin/', admin.site.urls),
path('testapp/', include('testapp.urls')), # <- 追加
]
testapp.urls.py を作成し下記の通り記述します。
from django.urls import path
from . import views
app_name = 'testapp'
urlpatterns = [
path('register_task', views.register_task, name='register_task'),
path('check_task', views.check_task, name='check_task'),
]
migrate してテーブルを作成する
migrate して celery が管理するテーブルを作成します。
python manage.py migrate
redis を起動する
ブローカーである redis を起動します。
redis-server
ワーカーを起動する
実際にタスクをこなすワーカーを次のコマンドで起動します。
コマンドを実行する階層は manage.py がある階層です。
celery -A config worker -l info
確認してみる
次のURLにアクセスしてタスクを登録してみます。
http://localhost:8000/testapp/register_task
下記の通り登録されたタスクIDが表示されました。
次に下記のURLにアクセスしてタスクの状態を表示してみます。
http://localhost:8000/testapp/check_task?task_id=b9bd21d7-3022-4b6b-a63c-611135eb8f08
まだ処理中の場合は下記の通りのステータスが表示されるようです。
処理が終わった場合は下記の通りのステータスが表示されるようです。
データベースの値を取得する
データベースの値を取得するには下記の通りです。
# TaskResultをインポート
from django_celery_results.models import TaskResult
# TaskResultオブジェクトから実行結果を取得
result_object = TaskResult.objects.get(task_id=task_id)
しかし処理が完了していないと Object Does Not Exist となるようです。
ディスカッション
コメント一覧
まだ、コメントがありません