How can you manage Celery tasks through Django, so that tasks can be queried, viewed, retried, and terminated from the Django Admin interface? The following is a complete step-by-step guide.
Manage Celery Worker with Django
Install Django and related packages
First, create a new virtual environment and install the required packages.
python -m venv myenv
source myenv/bin/activate  # On Windows: myenv\Scripts\activate
pip install django django-celery-results django-celery-beat celery redis
Create Django projects and applications
django-admin startproject myproject
cd myproject
django-admin startapp myapp
Configure Django and Celery
In myproject/settings.py, add the following:
INSTALLED_APPS = [
    ...,
    'django_celery_results',
    'django_celery_beat',
    'myapp',  # Make sure the app is in this list
]

CELERY_BROKER_URL = 'redis://localhost:6379/0'  # Redis as an example; change it to suit your setup
CELERY_RESULT_BACKEND = 'django-db'
CELERY_CACHE_BACKEND = 'django-cache'
# With namespace='CELERY', Celery expects the new-style setting names
CELERY_TASK_TRACK_STARTED = True
CELERY_WORKER_SEND_TASK_EVENTS = True

# Make sure the database is configured
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': BASE_DIR / 'db.sqlite3',
    }
}

# Configure the Django cache (optional)
CACHES = {
    'default': {
        # Assumption: local-memory cache; substitute the backend you actually use
        'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
    }
}
In the myproject directory, create a celery.py file:
import os

from celery import Celery

# Set the default Django settings module for Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')

# Configure Celery from Django settings (keys prefixed with CELERY_)
app.config_from_object('django.conf:settings', namespace='CELERY')

# Automatically discover tasks in all installed apps
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
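To make the namespace='CELERY' argument concrete, here is a minimal sketch of the naming convention it implies: a Django setting such as CELERY_BROKER_URL is read as Celery's lowercase broker_url option. The helper name celery_options is hypothetical and only mimics the convention; it is not Celery's own loader.

```python
def celery_options(django_settings, namespace="CELERY"):
    """Return the options a namespaced Celery app would pick up.

    Hypothetical helper for illustration: keeps only keys that start
    with "<namespace>_", strips that prefix, and lowercases the rest.
    """
    prefix = f"{namespace}_"
    return {
        key[len(prefix):].lower(): value
        for key, value in django_settings.items()
        if key.startswith(prefix)
    }


settings = {
    "CELERY_BROKER_URL": "redis://localhost:6379/0",
    "CELERY_RESULT_BACKEND": "django-db",
    "DEBUG": True,  # no CELERY_ prefix, so it is ignored
}
options = celery_options(settings)
print(options)
```

Under this convention a setting only takes effect if the part after CELERY_ matches one of Celery's lowercase option names, which is worth checking whenever a setting seems to be ignored.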
Modify myproject/__init__.py so that Django loads Celery at startup:
# This ensures the Celery app is loaded when Django starts
from .celery import app as celery_app

__all__ = ('celery_app',)
Create a Celery task
In myapp/tasks.py, create a simple Celery task:
from celery import shared_task


@shared_task
def add(x, y):
    return x + y
Register a custom TaskResultAdmin
We need a custom TaskResultAdmin, and the model's existing registration has to be removed first.
In myapp/admin.py, make the following changes:
from celery.result import AsyncResult
from django.contrib import admin
from django.shortcuts import redirect
from django.urls import path
from django_celery_results.admin import TaskResultAdmin as DefaultTaskResultAdmin
from django_celery_results.models import TaskResult

from myproject.celery import app

# Unregister the TaskResult admin registered by django_celery_results
admin.site.unregister(TaskResult)


# Custom TaskResultAdmin that inherits from the default TaskResultAdmin
class CustomTaskResultAdmin(DefaultTaskResultAdmin):
    change_list_template = "admin/celery_task_changelist.html"

    def get_urls(self):
        urls = super().get_urls()
        custom_urls = [
            path('retry/<task_id>/', self.admin_site.admin_view(self.retry_task), name='retry-task'),
            path('terminate/<task_id>/', self.admin_site.admin_view(self.terminate_task), name='terminate-task'),
        ]
        # Custom routes go first so the admin's own patterns do not shadow them
        return custom_urls + urls

    def retry_task(self, request, task_id, *args, **kwargs):
        # Assumes AsyncResult exposes reapply() in the installed Celery version
        AsyncResult(task_id, app=app).reapply()
        self.message_user(request, f'Task {task_id} retried successfully.')
        return redirect('../../')  # Back to the TaskResult changelist

    def terminate_task(self, request, task_id, *args, **kwargs):
        AsyncResult(task_id, app=app).revoke(terminate=True)
        self.message_user(request, f'Task {task_id} terminated successfully.')
        return redirect('../../')  # Back to the TaskResult changelist


# Register the custom TaskResultAdmin
admin.site.register(TaskResult, CustomTaskResultAdmin)
The TaskResult model is already registered with Django Admin automatically by django_celery_results. We therefore inherit from the TaskResultAdmin provided by django_celery_results, remove the default registration, and register our subclass in its place, which avoids registering the model twice.
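The custom admin returns custom_urls + urls from get_urls, and the order matters because URL resolution stops at the first pattern that matches. The sketch below illustrates this with plain regular expressions; the patterns are simplified stand-ins chosen for this example (including a catch-all object route like the one the admin defines), not Django's actual resolver.

```python
import re

# Simplified stand-ins for URL patterns; the regexes only approximate
# Django's path converters and are assumptions for this illustration.
ADMIN_URLS = [
    (re.compile(r"^$"), "changelist"),
    (re.compile(r"^(?P<object_id>.+)/change/$"), "change"),
    (re.compile(r"^(?P<object_id>.+)/$"), "object-redirect"),  # catch-all
]
CUSTOM_URLS = [
    (re.compile(r"^retry/(?P<task_id>[^/]+)/$"), "retry-task"),
    (re.compile(r"^terminate/(?P<task_id>[^/]+)/$"), "terminate-task"),
]


def resolve(path, patterns):
    """Return the name of the first pattern that matches the path."""
    for regex, name in patterns:
        if regex.match(path):
            return name
    return None


custom_first = resolve("retry/abc123/", CUSTOM_URLS + ADMIN_URLS)
admin_first = resolve("retry/abc123/", ADMIN_URLS + CUSTOM_URLS)
print(custom_first, admin_first)
```

With the custom routes listed first, the retry path reaches its own view; listed last, the catch-all object route would claim it.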
Create custom templates for the Django Admin interface
In the Django project, create the directory templates/admin, and inside the admin folder create celery_task_changelist.html:
{% extends "admin/change_list.html" %}

{% block result_list %}
{{ block.super }}
<script>
  function handleTask(action, task_id) {
    // Relative path, resolved against the TaskResult changelist URL
    fetch(`${action}/${task_id}/`, {
      method: 'POST',
      headers: {
        'X-CSRFToken': document.querySelector('[name=csrfmiddlewaretoken]').value,
      },
    }).then((response) => {
      if (response.ok) {
        location.reload();
      } else {
        alert('Action failed.');
      }
    });
  }
</script>
<div>
  <form method="post">
    {% csrf_token %}
    {% for result in cl.result_list %}
    <button type="button" onclick="handleTask('retry', '{{ result.task_id }}')">
      Retry
    </button>
    <button type="button" onclick="handleTask('terminate', '{{ result.task_id }}')">
      Terminate
    </button>
    {% endfor %}
  </form>
</div>
{% endblock %}
Make sure the custom template path is correct. With the default Django project layout, the template should be at myproject/templates/admin/celery_task_changelist.html. Also make sure this templates directory is listed under DIRS in the TEMPLATES setting so that Django can find it.
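The retry and terminate routes are registered by the TaskResult admin, so they live beneath its changelist URL. How the path handed to fetch resolves against the current page therefore decides whether a request reaches them. A small sketch using Python's urljoin, which applies the same relative-reference rules as a browser for these simple cases; the task ID abc123 is a placeholder, and the changelist URL assumes the admin is mounted at /admin/:

```python
from urllib.parse import urljoin

# URL of the TaskResult changelist when the admin is mounted at /admin/
CHANGELIST = "http://127.0.0.1:8000/admin/django_celery_results/taskresult/"

# A path without a leading slash is resolved relative to the current page
relative = urljoin(CHANGELIST, "retry/abc123/")
# A path with a leading slash is resolved from the site root
absolute = urljoin(CHANGELIST, "/retry/abc123/")

print(relative)
print(absolute)
```

Only the first form ends up under the admin's TaskResult URLs, where the custom routes are defined.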
Run Django and Celery
- Apply database migration:
python manage.py migrate
- Start the Django server:
python manage.py runserver
- Start Celery worker:
celery -A myproject worker -l info
Manage Celery tasks with Django Admin
Open a browser and go to http://127.0.0.1:8000/admin/. The Celery tasks are displayed in the Django admin interface, where you can query and view them, and retry or terminate them by clicking the corresponding buttons.
This completes the steps for managing Celery tasks through the Django Admin interface. The interface and its functions can be customized further if necessary.
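As one example of further customization, the buttons could be limited to the task states where they make sense. The sketch below groups Celery's task states into finished and unfinished ones and applies a policy of my own choosing (terminate unfinished tasks, retry failed or revoked ones); the function name available_actions is hypothetical and not part of Celery or django_celery_results.

```python
# Task states as reported by Celery result backends.
READY_STATES = frozenset({"SUCCESS", "FAILURE", "REVOKED"})
UNREADY_STATES = frozenset({"PENDING", "RECEIVED", "STARTED", "REJECTED", "RETRY"})


def available_actions(status):
    """Suggest which admin buttons are meaningful for a task status.

    Hypothetical policy for illustration: unfinished tasks can be
    terminated, and failed or revoked tasks can be retried.
    """
    actions = []
    if status in UNREADY_STATES:
        actions.append("terminate")
    if status in {"FAILURE", "REVOKED"}:
        actions.append("retry")
    return actions


for status in ("STARTED", "FAILURE", "SUCCESS"):
    print(status, available_actions(status))
```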
Start Django's local Celery Worker
To send tasks to the worker once it is running, and to demonstrate querying, viewing, retrying, and terminating tasks in the Django Admin interface, follow these steps:
Create a Celery task
In myapp/tasks.py, define some example tasks:
# myapp/tasks.py
import time

from celery import shared_task


@shared_task
def add(x, y):
    time.sleep(10)  # Simulate a long-running task
    return x + y


@shared_task
def long_running_task(duration):
    time.sleep(duration)
    return f"Task completed after {duration} seconds"
Create a view that triggers a task
For ease of demonstration, create some views that trigger these tasks. Update the views.py and urls.py files.
- In myapp/views.py:
# myapp/views.py
from django.http import JsonResponse

from myapp.tasks import add, long_running_task


def trigger_add_task(request):
    add.delay(3, 4)
    return JsonResponse({'status': 'Task add (3, 4) triggered'})


def trigger_long_running_task(request):
    long_running_task.delay(30)  # The task runs for 30 seconds
    return JsonResponse({'status': 'Long running task for 30 seconds triggered'})
- In myapp/urls.py:
# myapp/urls.py
from django.urls import path

from .views import trigger_add_task, trigger_long_running_task

urlpatterns = [
    path('trigger-add-task/', trigger_add_task, name='trigger-add-task'),
    path('trigger-long-task/', trigger_long_running_task, name='trigger-long-task'),
]
- In myproject/urls.py:
# myproject/urls.py
from django.contrib import admin
from django.urls import include, path

urlpatterns = [
    path('admin/', admin.site.urls),
    path('tasks/', include('myapp.urls')),
]
Update Celery configuration
Make sure Celery is configured in myproject/settings.py:
# myproject/settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'  # Redis as an example; change it to suit your setup
CELERY_RESULT_BACKEND = 'django-db'
Start Celery Worker and Django servers
Make sure the Redis service has been started:
redis-server
Then start the Django server and the Celery worker, each in its own terminal:
# Start the Django server
python manage.py runserver

# Start the Celery worker
celery -A myproject worker -l info
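If the worker cannot connect at startup, it can help to confirm that the broker address in CELERY_BROKER_URL is reachable at all. The following is a minimal standard-library sketch, assuming a redis:// URL of the form used above; the helper names broker_endpoint and is_reachable are my own, and the check only opens a TCP connection rather than speaking the Redis protocol.

```python
import socket
from urllib.parse import urlsplit


def broker_endpoint(broker_url):
    """Split a redis:// broker URL into (host, port, database number)."""
    parts = urlsplit(broker_url)
    database = int(parts.path.lstrip("/") or 0)
    return parts.hostname, parts.port or 6379, database


def is_reachable(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


host, port, database = broker_endpoint("redis://localhost:6379/0")
print(host, port, database)
print("reachable:", is_reachable(host, port))
```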
Trigger the task and view it in the Django Admin interface
Open a browser and visit the following URLs to trigger the tasks:
- http://127.0.0.1:8000/tasks/trigger-add-task/: triggers the add task
- http://127.0.0.1:8000/tasks/trigger-long-task/: triggers the long-running task
Visiting these URLs triggers the Celery tasks, which can then be queried, viewed, retried, and terminated through the Django Admin interface.
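The same two URLs can also be requested from a script instead of a browser. The sketch below uses only the standard library and assumes the development server is running at 127.0.0.1:8000 with the routes defined earlier; trigger_urls and trigger_all are hypothetical helper names.

```python
import json
from urllib.parse import urljoin
from urllib.request import urlopen

BASE_URL = "http://127.0.0.1:8000/"
TRIGGER_PATHS = ("tasks/trigger-add-task/", "tasks/trigger-long-task/")


def trigger_urls(base_url=BASE_URL):
    """Build the full trigger URLs from the base URL."""
    return [urljoin(base_url, path) for path in TRIGGER_PATHS]


def trigger_all(base_url=BASE_URL, timeout=5.0):
    """Request every trigger URL and collect the 'status' field of each reply."""
    statuses = []
    for url in trigger_urls(base_url):
        with urlopen(url, timeout=timeout) as response:
            statuses.append(json.load(response)["status"])
    return statuses


print(trigger_urls())
```

With the Django server running, calling trigger_all() queues both tasks and returns the status messages from the two views.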
View task status in the Django Admin interface
Open a browser, go to http://127.0.0.1:8000/admin/, log in to the Django Admin interface, and navigate to the Task Results section. You should see the corresponding task list, along with the retry and terminate operations defined earlier in the custom TaskResultAdmin.
With these steps, you can trigger demonstration tasks through Django and Celery, and then query, view, retry, and terminate them in the Django Admin interface.
Start a remote Celery Worker
To manage and monitor Celery Workers running separately on remote servers and maintained by separate code repositories through Django Admin, multiple independent systems need to be configured and coordinated.
Install Celery Worker
On the remote server, create a separate project (assumed here to be named worker_project) and install the required dependencies:
# On the remote server
python -m venv venv
source venv/bin/activate
pip install celery redis
Configure Celery Worker
- Configure Celery inside worker_project (worker_project/celery.py):
from celery import Celery

app = Celery('worker_project')

# Django is not installed on the worker, so load the CELERY_-prefixed
# settings directly from worker_project/settings.py
app.config_from_object('worker_project.settings', namespace='CELERY')

# Without Django's app registry, name the package whose tasks.py to load
app.autodiscover_tasks(['worker_project'])


@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
- Configure Celery in worker_project/settings.py:
CELERY_BROKER_URL = 'redis://your_redis_server:6379/0'  # Replace with the actual Redis address
CELERY_RESULT_BACKEND = 'redis://your_redis_server:6379/0'
Define tasks
Create some test tasks (worker_project/tasks.py):
import time

from celery import shared_task


@shared_task
def add(x, y):
    time.sleep(10)  # Simulate a long-running task
    return x + y


@shared_task
def long_running_task(duration):
    time.sleep(duration)
    return f"Task completed after {duration} seconds"
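One thing worth checking when the tasks live in two repositories: by default Celery names a task after the module and function that define it, and a worker only runs tasks whose names it has registered. The sketch below mimics that default naming for the two layouts described in this article; default_task_name is a hypothetical helper, and the module paths are assumptions based on the file locations given above.

```python
def default_task_name(module, function_name):
    """Mimic Celery's default naming: '<module path>.<function name>'."""
    return f"{module}.{function_name}"


# Name the Django project publishes when myapp/tasks.py defines add()
sent_by_django = default_task_name("myapp.tasks", "add")
# Name the remote worker registers when worker_project/tasks.py defines add()
known_to_worker = default_task_name("worker_project.tasks", "add")

print(sent_by_django)
print(known_to_worker)
print("names match:", sent_by_django == known_to_worker)
```

If the names differ, one way to align them is to give both definitions the same explicit name, for example @shared_task(name='tasks.add'), so that the message sent by Django refers to a task the worker knows.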
Start Celery Worker
celery -A worker_project worker -l info
Start and test
Start the local Django server:
python manage.py runserver
Make sure that the Celery Worker on the remote server is already running.
Trigger the tasks and view them in Django Admin:
- Visit http://127.0.0.1:8000/tasks/trigger-add-task/ to trigger the add task
- Visit http://127.0.0.1:8000/tasks/trigger-long-task/ to trigger the long-running task
Log in to the Django Admin interface at http://127.0.0.1:8000/admin/ and navigate to the Task Results section. Provided the worker stores its results in a backend that Django reads, you should see these tasks there and be able to manage them (for example, retry or terminate them).
The above configuration lets a local Django project manage and monitor, through Django Admin, Celery workers that run separately on remote servers, with all communication going through Redis. In a real production environment, this architecture separates responsibilities more cleanly and improves the robustness and scalability of the system.
Frequently Asked Questions about Managing Flask-Started Celery Workers with Django Admin
When a remote Celery worker is started from a Flask app and monitored and managed with Django Admin, you may find that the Django Admin interface does not display the worker's tasks or their results. There are several possible causes:
- Result backend misconfiguration: Make sure Flask and Django use the same result backend.
- Django configuration error: Make sure Django has correctly configured the Celery result backend.
- Flask app does not save results: Make sure that Flask's Celery configuration does not disable the result saving feature.
To fix this issue, check and fix settings as follows:
Check the Celery configuration of the Flask app
Make sure that Celery in Flask app is configured with the correct result backend and that the storage of task results is not disabled. For example:
# app is the Flask application object
celery_app = Celery(
    app.name,
    task_cls=FlaskTask,
    broker=app.config["CELERY_BROKER_URL"],
    backend=app.config["CELERY_BACKEND"],  # Make sure the result backend is configured
    task_ignore_result=False,  # Make sure that task results are not ignored
)
celery_app.conf.update(
    result_backend=app.config["CELERY_RESULT_BACKEND"],  # Make sure the result backend is configured
    broker_connection_retry_on_startup=True,
)
# Make sure no other setting disables result storage
Check Django's Celery configuration
In Django's settings.py, make sure the Celery result backend is defined and that the configuration is consistent with the one in Flask:
# Celery configuration
CELERY_BROKER_URL = 'redis://localhost:6379/0'  # Replace with the actual broker URL
CELERY_RESULT_BACKEND = 'django-db'  # Use the Django database as the result backend
CELERY_CACHE_BACKEND = 'django-cache'
CELERY_RESULT_PERSISTENT = True

# Installed applications
INSTALLED_APPS = [
    # Other applications
    'django_celery_results',
    'django_celery_beat',
]

# Other configurations
Synchronize the database
Make sure the Django database is consistent with the Celery result model:
python manage.py migrate django_celery_results
python manage.py migrate django_celery_beat
Make sure that the relevant model is registered in Django Admin
Make sure the django_celery_results and django_celery_beat models are registered in admin.py so that they can be viewed in the Admin interface:
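from django.contrib import admin
from django_celery_beat.models import CrontabSchedule, IntervalSchedule, PeriodicTask
from django_celery_results.models import TaskResult

# Both packages normally register their own admins, so only register
# a model that is still missing; registering twice raises an error
for model in (TaskResult, PeriodicTask, IntervalSchedule, CrontabSchedule):
    if not admin.site.is_registered(model):
        admin.site.register(model)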
Test Celery tasks
Make sure that the Celery task sent from Flask stores the results correctly:
@celery_app.task(bind=True)
def debug_task(self, *args, **kwargs):
    print(f'Request: {self.request!r}')
    return 'Test Result'
Call this task in the Flask application:
debug_task.delay()
Then check whether the task result is displayed in the Django Admin interface.
Check Celery Worker Configuration
Make sure the celery worker runs against the shared broker and backend:
celery -A your_flask_app_name worker --loglevel=info
After these steps, tasks started from the Flask application and their execution results should be displayed correctly in the Django Admin interface.
If the problem persists, check the logs and configuration for any errors and make sure all Celery configuration and database access for Flask and Django are valid and consistent.
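Checking that the two sides agree can be done mechanically. The sketch below compares the broker and result backend settings of the Django project and the worker and reports any difference; find_mismatches is a hypothetical helper, and the two dictionaries are example values standing in for the real configuration of each project.

```python
SHARED_KEYS = ("CELERY_BROKER_URL", "CELERY_RESULT_BACKEND")


def find_mismatches(django_conf, worker_conf, keys=SHARED_KEYS):
    """Return {key: (django value, worker value)} for settings that differ."""
    return {
        key: (django_conf.get(key), worker_conf.get(key))
        for key in keys
        if django_conf.get(key) != worker_conf.get(key)
    }


django_conf = {
    "CELERY_BROKER_URL": "redis://localhost:6379/0",
    "CELERY_RESULT_BACKEND": "django-db",
}
worker_conf = {
    "CELERY_BROKER_URL": "redis://localhost:6379/0",
    "CELERY_RESULT_BACKEND": "redis://localhost:6379/0",
}
mismatches = find_mismatches(django_conf, worker_conf)
print(mismatches)
```

In this example the broker matches but the result backends differ, which is the situation in which tasks run on the worker yet never appear under Task Results.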
Summary
Managing Celery Worker tasks with Django Admin is a convenient approach: with simple configuration and customization, tasks can be queried, viewed, retried, and terminated.
With the steps and examples provided in this article, you can easily integrate Celery Worker in your Django project and manage and monitor tasks through the Django Admin interface.
The above is based on personal experience; I hope it serves as a useful reference.