SoFunction
Updated on 2025-04-13

A Detailed Guide to Status Monitoring and Task Management with Django-Integrated Celery

How do you manage Celery tasks through Django, and expose query, view, retry, and terminate operations in the Django Admin interface? Here is a complete step-by-step guide.

Manage Celery Worker with Django

Install Django and related packages

First, create a new virtual environment and install the required packages.

python -m venv myenv
source myenv/bin/activate  # On Windows: myenv\Scripts\activate
pip install django django-celery-results django-celery-beat celery

Create Django projects and applications

django-admin startproject myproject
cd myproject
django-admin startapp myapp

Configure Django and Celery

In the myproject/settings.py file, add the following:

INSTALLED_APPS = [
    ...,
    'django_celery_results',
    'django_celery_beat',
    'myapp',  # Make sure the app is in this list
]

CELERY_BROKER_URL = 'redis://localhost:6379/0'  # Redis as an example; change as needed
CELERY_RESULT_BACKEND = 'django-db'
CELERY_CACHE_BACKEND = 'django-cache'

CELERY_TRACK_STARTED = True
CELERY_SEND_EVENTS = True

# Make sure the database is configured
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': BASE_DIR / 'db.sqlite3',
    }
}

# Configure the Django cache (optional)
CACHES = {
    'default': {
        # Any cache backend works; the local-memory backend is shown as an example
        'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
    }
}

In the myproject/ directory, create a celery.py file:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings

# Set the default Django settings module for Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')

# Configure Celery from Django settings, using the CELERY_ namespace prefix
app.config_from_object('django.conf:settings', namespace='CELERY')

# Automatically discover tasks in all installed apps
# app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

Modify the myproject/__init__.py file so that Django loads Celery at startup:

from __future__ import absolute_import, unicode_literals

# This will ensure the app is loaded when Django starts
from .celery import app as celery_app

__all__ = ('celery_app',)

Create a Celery task

In myapp/tasks.py, create a simple Celery task:

from celery import shared_task

@shared_task
def add(x, y):
    return x + y
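Calling add.delay(2, 3) returns an AsyncResult whose .get() blocks until a worker finishes. To make that calling pattern concrete without needing a broker, here is a stdlib-only stand-in (illustrative only; this is not Celery's actual implementation):

```python
import threading

class FakeAsyncResult:
    """Minimal, illustrative stand-in for celery.result.AsyncResult."""

    def __init__(self, fn, args):
        self._value = None
        self._done = threading.Event()
        self._thread = threading.Thread(target=self._run, args=(fn, args))
        self._thread.start()

    def _run(self, fn, args):
        # Plays the role of the worker executing the task body
        self._value = fn(*args)
        self._done.set()

    def get(self, timeout=None):
        # Block until the "worker" finishes, like AsyncResult.get()
        self._done.wait(timeout)
        return self._value

def delay(fn, *args):
    """Mimics task.delay(*args): run fn in the background, return a result handle."""
    return FakeAsyncResult(fn, args)
```

With the real task, the equivalent flow is result = add.delay(2, 3) followed by result.get().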

Register a custom TaskResultAdmin

Before registering our custom TaskResultAdmin, we must unregister the model admin that django_celery_results already registered.

In myapp/admin.py, make the following modifications:

from django.contrib import admin
from django_celery_results.models import TaskResult
from django_celery_results.admin import TaskResultAdmin as DefaultTaskResultAdmin
from django.urls import path
from django.shortcuts import redirect
from celery.result import AsyncResult
from myproject.celery import app

# Unregister the TaskResult admin that django_celery_results registered
admin.site.unregister(TaskResult)

# Create a custom admin that inherits from the default TaskResultAdmin
class CustomTaskResultAdmin(DefaultTaskResultAdmin):
    change_list_template = "admin/celery_task_changelist.html"

    def get_urls(self):
        urls = super().get_urls()
        custom_urls = [
            path('retry/<task_id>/', self.admin_site.admin_view(self.retry_task), name='retry-task'),
            path('terminate/<task_id>/', self.admin_site.admin_view(self.terminate_task), name='terminate-task'),
        ]
        return custom_urls + urls

    def retry_task(self, request, task_id, *args, **kwargs):
        # AsyncResult has no built-in retry; re-send the task by its stored name.
        # (The original arguments are not re-applied in this minimal version.)
        result = TaskResult.objects.get(task_id=task_id)
        app.send_task(result.task_name)
        self.message_user(request, f'Task {task_id} retried successfully.')
        return redirect('..')

    def terminate_task(self, request, task_id, *args, **kwargs):
        AsyncResult(task_id, app=app).revoke(terminate=True)
        self.message_user(request, f'Task {task_id} terminated successfully.')
        return redirect('..')

# Register the custom TaskResultAdmin
admin.site.register(TaskResult, CustomTaskResultAdmin)

The TaskResult model is automatically registered with Django Admin by django_celery_results.

By inheriting django_celery_results's TaskResultAdmin and unregistering the original first, we avoid a duplicate-registration error.
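If the retry view should re-apply the task's original arguments, note that django_celery_results stores task_args and task_kwargs as text. Assuming the default repr-style storage (e.g. "(3, 4)" and "{}"), a small hypothetical helper can recover them with the standard library:

```python
import ast

def parse_stored_args(task_args: str, task_kwargs: str):
    """Parse the textual args/kwargs that django_celery_results stores.

    Assumes repr-style storage, e.g. task_args == "(3, 4)" and
    task_kwargs == "{}"; empty strings mean "no arguments".
    """
    args = ast.literal_eval(task_args) if task_args else ()
    kwargs = ast.literal_eval(task_kwargs) if task_kwargs else {}
    return tuple(args), dict(kwargs)
```

The recovered tuple and dict could then be passed to app.send_task(result.task_name, args=args, kwargs=kwargs). If your deployment serializes arguments differently (for example as JSON), adjust the parsing accordingly.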

Create custom templates for the Django Admin interface

In the Django project, create a templates/admin directory, and inside the admin folder create celery_task_changelist.html:

{% extends "admin/change_list.html" %}
{% block result_list %}
{{ block.super }}
<script>
  function handleTask(action, task_id) {
    // Relative URL: resolves under the TaskResult changelist, where
    // the custom retry/terminate URLs are registered
    fetch(`${action}/${task_id}/`, {
      method: 'POST',
      headers: {
        'X-CSRFToken': document.querySelector('[name=csrfmiddlewaretoken]')
          .value,
      },
    }).then((response) => {
      if (response.ok) {
        location.reload();
      } else {
        alert('Action failed.');
      }
    });
  }
</script>
<div>
  <form method="post">
    {% csrf_token %} {% for result in cl.result_list %}
    <button type="button" onclick="handleTask('retry', '{{ result.task_id }}')">
      Retry
    </button>
    <button
      type="button"
      onclick="handleTask('terminate', '{{ result.task_id }}')"
    >
      Terminate
    </button>
    {% endfor %}
  </form>
</div>
{% endblock %}

Make sure the custom template path is correct. With the default Django template configuration, the file should live at myproject/templates/admin/celery_task_changelist.html

Run Django and Celery

  • Apply database migrations:
python manage.py migrate
  • Start the Django server:
python manage.py runserver
  • Start the Celery worker:
celery -A myproject worker -l info

Manage Celery tasks with Django Admin

Open a browser and go to http://127.0.0.1:8000/admin/. Celery tasks will appear in the Django Admin interface, where you can query, view, retry, and terminate them by clicking the buttons.

That completes the steps for managing Celery tasks through the Django Admin interface. The interface and functionality can be customized and optimized further as needed.

Start Django's local Celery Worker

To send tasks to the Worker after it starts, and to demonstrate querying, viewing, retrying, and terminating tasks in the Django Admin interface, follow these steps:

Create a Celery task

Define some example tasks in myapp/tasks.py:

# myapp/tasks.py
from celery import shared_task
import time

@shared_task
def add(x, y):
    time.sleep(10)  # Simulate a long-running task
    return x + y

@shared_task
def long_running_task(duration):
    time.sleep(duration)
    return f"Task completed after {duration} seconds"

Create a view that triggers a task

For demonstration purposes, create some views to trigger these tasks, updating the views.py and urls.py files.

  • In myapp/views.py:
# myapp/views.py
from django.http import JsonResponse
from .tasks import add, long_running_task

def trigger_add_task(request):
    add.delay(3, 4)
    return JsonResponse({'status': 'Task add (3, 4) triggered'})

def trigger_long_running_task(request):
    long_running_task.delay(30)  # The task runs for 30 seconds
    return JsonResponse({'status': 'Long running task for 30 seconds triggered'})
  • In myapp/urls.py:
# myapp/urls.py
from django.urls import path
from .views import trigger_add_task, trigger_long_running_task

urlpatterns = [
    path('trigger-add-task/', trigger_add_task, name='trigger-add-task'),
    path('trigger-long-task/', trigger_long_running_task, name='trigger-long-task'),
]
  • In myproject/urls.py:
# myproject/urls.py
from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('tasks/', include('myapp.urls')),
]

Update Celery configuration

Make sure Celery is configured in myproject/settings.py:

# myproject/settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'  # Redis as an example; change as needed
CELERY_RESULT_BACKEND = 'django-db'

Start Celery Worker and Django servers

Make sure the Redis service has been started:

redis-server

Then start the Django server and Celery Worker respectively:

# Start the Django server
python manage.py runserver

# Start the Celery worker
celery -A myproject worker -l info

Trigger the task and view it in the Django Admin interface

Open the browser and access the following URL to trigger the task:

  • http://127.0.0.1:8000/tasks/trigger-add-task/ - triggers the add task
  • http://127.0.0.1:8000/tasks/trigger-long-task/ - triggers the long-running task

These URLs trigger the Celery tasks, which can then be queried, viewed, retried, and terminated through the Django Admin interface.
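When demonstrating, it is handy to poll a task until it reaches a final state before refreshing the Admin page. A minimal stdlib sketch, where get_state stands in for something like lambda: AsyncResult(task_id).state (not imported here):

```python
import time

def wait_for(get_state, timeout=30.0, interval=0.5):
    """Poll get_state() until the task reaches a final Celery state or we time out.

    get_state is any zero-argument callable returning a Celery state string.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        state = get_state()
        if state in ("SUCCESS", "FAILURE", "REVOKED"):
            return state
        time.sleep(interval)
    return "TIMEOUT"
```

For example, wait_for(lambda: AsyncResult(task_id, app=app).state) would block until the add task finishes its simulated 10-second run.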

View task status in the Django Admin interface

Open a browser at http://127.0.0.1:8000/admin/, log in to the Django Admin interface, and navigate to the Task Results section. You should see the task list there and be able to retry and terminate tasks using the operations defined earlier in the custom TaskResultAdmin.

With these steps, you can trigger tasks through Django and Celery and demonstrate query, view, retry, and terminate operations in the Django Admin interface.
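Which buttons make sense depends on the task's state. Celery's standard states suggest a simple policy, sketched below (an illustrative choice; django_celery_results does not enforce anything like this):

```python
# Celery's built-in task states (see celery.states)
FINAL_STATES = {"SUCCESS", "FAILURE", "REVOKED"}
ACTIVE_STATES = {"PENDING", "RECEIVED", "STARTED", "RETRY"}

def allowed_actions(state: str) -> list:
    """Return which admin actions are sensible for a task in the given state.

    Policy (illustrative): retry only finished tasks, terminate only active ones.
    """
    actions = []
    if state in FINAL_STATES:
        actions.append("retry")
    if state in ACTIVE_STATES:
        actions.append("terminate")
    return actions
```

In the changelist template, a helper like this could decide which of the Retry and Terminate buttons to render per row instead of always showing both.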

Start a remote Celery Worker

To manage and monitor Celery Workers running separately on remote servers and maintained by separate code repositories through Django Admin, multiple independent systems need to be configured and coordinated.

Install Celery Worker

On the remote server, create a separate project (assume it is named worker_project) and install the required dependencies:

# On the remote server
python -m venv venv
source venv/bin/activate
pip install celery redis

Configure Celery Worker

  • Configure Celery inside worker_project (worker_project/celery.py). Since this project is a plain Celery project rather than a Django one, load the settings module directly:
from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('worker_project')
app.config_from_object('worker_project.settings', namespace='CELERY')
app.autodiscover_tasks(['worker_project'])

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
  • Configure Celery in worker_project/settings.py:
CELERY_BROKER_URL = 'redis://your_redis_server:6379/0'  # Replace with the actual Redis address
CELERY_RESULT_BACKEND = 'redis://your_redis_server:6379/0'

Define tasks

Create some test tasks (worker_project/tasks.py):

from celery import shared_task
import time

@shared_task
def add(x, y):
    time.sleep(10)  # Simulate a long-running task
    return x + y

@shared_task
def long_running_task(duration):
    time.sleep(duration)
    return f"Task completed after {duration} seconds"

Start Celery Worker

celery -A worker_project worker -l info

Start and test

Start the local Django server:

python manage.py runserver

Make sure that the Celery Worker on the remote server is already running.

Trigger the task and view it in Django Admin:

  • Visit http://127.0.0.1:8000/tasks/trigger-add-task/ - triggers the add task
  • Visit http://127.0.0.1:8000/tasks/trigger-long-task/ - triggers the long-running task

Log in to the Django Admin interface at http://127.0.0.1:8000/admin/ and navigate to the Task Results section; you should be able to see these tasks and manage them (for example, retry and terminate them).

This configuration lets a local Django project manage and monitor Celery Workers running separately on remote servers, with the two sides communicating through Redis. In a real production environment, this architecture separates responsibilities cleanly and improves the robustness and scalability of the system.

Frequently Asked Questions about Managing Flask-Started Celery Workers with Django Admin

When remote Celery Workers are started by a Flask app and monitored through Django Admin, you may find that the Django Admin interface does not display the Workers' tasks or task results. There are several possible causes:

  1. Result backend misconfiguration: Make sure Flask and Django use the same result backend.
  2. Django configuration error: Make sure Django has correctly configured the Celery result backend.
  3. Flask app does not save results: Make sure that Flask's Celery configuration does not disable result saving.

To fix this issue, check and fix settings as follows:

Check the Celery configuration of the Flask app

Make sure that Celery in Flask app is configured with the correct result backend and that the storage of task results is not disabled. For example:

celery_app = Celery(
    app.name,  # the Flask app's name
    task_cls=FlaskTask,
    broker=app.config["CELERY_BROKER_URL"],
    backend=app.config["CELERY_RESULT_BACKEND"],  # Make sure the result backend is configured
    task_ignore_result=False,  # Make sure that task results are not ignored
)

celery_app.conf.update(
    result_backend=app.config["CELERY_RESULT_BACKEND"],  # Keep the result backend consistent
    broker_connection_retry_on_startup=True,
)

# Make sure there are no stray settings that disable result storage

Check Django's Celery configuration

In settings.py, make sure the Celery result backend is defined and the configuration is consistent with Flask's:

# Celery configuration
CELERY_BROKER_URL = 'redis://localhost:6379/0'  # Replace with the actual broker URL
CELERY_RESULT_BACKEND = 'django-db'  # Use the Django database as the result backend
CELERY_CACHE_BACKEND = 'django-cache'
CELERY_RESULT_PERSISTENT = True

# Installed applications
INSTALLED_APPS = [
    # Other applications
    'django_celery_results',
    'django_celery_beat',
]

# Other configuration
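A quick way to catch a broker or result-backend mismatch between Flask and Django is to compare the raw setting strings programmatically. A hypothetical helper, where the dicts stand in for the Flask config and Django settings (note that Django's 'django-db' alias will never literally equal a Redis URL, so this simple check suits deployments where both sides share the same backend URL):

```python
DEFAULT_KEYS = ("CELERY_BROKER_URL", "CELERY_RESULT_BACKEND")

def find_mismatches(flask_cfg: dict, django_cfg: dict, keys=DEFAULT_KEYS):
    """Return the configuration keys whose values differ between Flask and Django."""
    return [k for k in keys if flask_cfg.get(k) != django_cfg.get(k)]
```

Running it against both configurations at startup (or in a test) surfaces cause 1 from the list above before any tasks go missing.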

Synchronize the database

Make sure the Django database is consistent with the Celery result model:

python  migrate django_celery_results
python  migrate django_celery_beat

Make sure that the relevant model is registered in Django Admin

Make sure the django_celery_results and django_celery_beat models are registered in admin.py so they can be viewed in the Admin interface:

from  import admin
from django_celery_results.models import TaskResult
from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule

# These models are normally auto-registered by django_celery_results and
# django_celery_beat; register them manually only if they are missing.
for model in (TaskResult, PeriodicTask, IntervalSchedule, CrontabSchedule):
    if not admin.site.is_registered(model):
        admin.site.register(model)

Test Celery tasks

Make sure that the Celery task sent from Flask stores the results correctly:

@celery_app.task(bind=True)
def debug_task(self, *args, **kwargs):
    print(f'Request: {self.request!r}')
    return 'Test Result'

Call this task in the Flask application:

debug_task.delay()

Then check whether the task result is displayed in the Django Admin interface.

Check Celery Worker Configuration

Make sure the celery worker runs against the shared broker and backend:

celery -A your_flask_app_name worker --loglevel=info

With these steps, tasks initiated through the Flask application's Celery Worker, and their execution results, should display correctly in the Django Admin interface.

If the problem persists, check the logs and configuration for any errors and make sure all Celery configuration and database access for Flask and Django are valid and consistent.

Summarize

Managing Celery Worker tasks with Django Admin is a convenient way to enable querying, viewing, retrying, and terminating tasks through simple configuration and customization.

With the steps and examples provided in this article, you can easily integrate Celery Worker in your Django project and manage and monitor tasks through the Django Admin interface.

The above is based on personal experience; I hope it serves as a useful reference.