## System requirements

- Python
- Redis server

Dependencies:
- celery==5.3.6
- redis==5.0.1
## System architecture

The system consists of the following components:
- Task definition module (`tasks.py`): contains all executable task definitions
- Main program module: responsible for task scheduling and monitoring
- Redis server: acts as both the message broker and the result backend
## Configuration instructions

### Celery configuration

```python
# Replace <password> and <host> with your actual Redis credentials
broker_url = 'redis://:<password>@<host>:6379/1'
result_backend = 'redis://:<password>@<host>:6379/1'
```
Main configuration items:
- Task serialization: JSON
- Time zone: Asia/Shanghai
- Worker concurrency: 1
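These items map onto the Celery settings used in `tasks.py`, roughly as follows (a configuration sketch, not the complete settings block):

```python
from celery import Celery

app = Celery('tasks')
app.conf.update(
    task_serializer='json',
    result_serializer='json',
    accept_content=['json'],
    timezone='Asia/Shanghai',
    enable_utc=True,
    worker_concurrency=1,  # number of worker processes
)
```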
## Functional modules

### 1. Basic arithmetic tasks
- `add(x, y)`: addition
- `multiply(x, y)`: multiplication
- `chain_calculation(numbers)`: chained statistics (sum, average, maximum, minimum)
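Ignoring the Celery machinery, the statistics that `chain_calculation` returns can be sketched in plain Python (the helper name `calc_stats` is illustrative):

```python
def calc_stats(numbers):
    """Compute the same statistics the chain_calculation task returns."""
    return {
        'sum': sum(numbers),
        'average': sum(numbers) / len(numbers),
        'max': max(numbers),
        'min': min(numbers),
        'count': len(numbers),
    }

print(calc_stats([1, 2, 3, 4]))
# → {'sum': 10, 'average': 2.5, 'max': 4, 'min': 1, 'count': 4}
```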
### 2. Text processing tasks
- `process_text(text)`: text processing (case conversion, length statistics, word count)
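The shape of the result dictionary can be sketched without the task decorator (the helper name `analyze_text` is illustrative):

```python
def analyze_text(text):
    """Mirror of the process_text task body, minus the Celery decorator."""
    return {
        'original': text,
        'upper': text.upper(),      # case conversion
        'length': len(text),        # length statistics
        'words': len(text.split()),  # word count
    }

result = analyze_text("hello celery world")
print(result['upper'])  # → HELLO CELERY WORLD
```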
### 3. System monitoring task
- `system_monitor()`: executes every 5 seconds and reports:
  - CPU usage
  - Memory usage
  - System status
### 4. Report generation tasks
- `generate_report()`: generates a real-time report
- `daily_report()`: generates a daily report at 9 a.m. every day
- `workday_task()`: runs every hour during working hours on weekdays
## Scheduled task configuration

The system contains the following scheduled (beat) tasks:
- System monitoring: every 5 seconds
- Daily report: at 9 a.m. every day
- Workday task: every hour on the hour, Monday to Friday, 9:00-18:00
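These schedules are driven by Celery beat; the corresponding `beat_schedule` entries (matching the full `tasks.py` listing later in this article) are:

```python
from celery.schedules import crontab

app.conf.beat_schedule = {
    'monitor-every-5-seconds': {
        'task': 'tasks.system_monitor',
        'schedule': 5.0,  # seconds
    },
    'daily-morning-report': {
        'task': 'tasks.daily_report',
        'schedule': crontab(hour=9, minute=0),
    },
    'workday-hourly-task': {
        'task': 'tasks.workday_task',
        'schedule': crontab(hour='9-18', minute=0, day_of_week='1-5'),
    },
}
```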
## Instructions for use

### 1. Start the system

Make sure the Redis server is running.

Start a Celery worker process:

```
celery -A tasks worker --loglevel=info
```

Start the Celery beat process (for scheduled tasks):

```
celery -A tasks beat
```

Run the main program (assuming the main module is saved as `main.py`):

```
python main.py
```
### 2. System monitoring

Once the main program is running, it automatically:
- displays system monitoring data in real time
- runs examples of the regular tasks
- exits gracefully when you press Ctrl+C
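The graceful Ctrl+C exit rests on a signal handler that flips a shared flag, which the worker loops poll on each pass. A minimal self-contained sketch:

```python
import signal

running = True

def signal_handler(signum, frame):
    """Flip the shared flag so worker loops can finish their current pass."""
    global running
    running = False

# SIGINT is what Ctrl+C delivers; the full program registers SIGTERM the same way
signal.signal(signal.SIGINT, signal_handler)

# Simulate pressing Ctrl+C by raising the signal in-process
signal.raise_signal(signal.SIGINT)
print("running =", running)  # → running = False
```

Because the handler only flips a flag, loops get a chance to clean up instead of being interrupted mid-task by a `KeyboardInterrupt`.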
## Error handling

The system implements an error-handling mechanism covering:
- capturing and logging task execution errors
- graceful program exit
- automatic retries
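In Celery itself, automatic retries are typically configured through task options such as `autoretry_for` and `max_retries`; the listings in this article do not show that configuration, but the underlying pattern can be sketched in plain Python (the wrapper name `with_retries` is illustrative):

```python
import time

def with_retries(func, max_retries=3, delay=0.01):
    """Minimal retry loop illustrating the idea behind Celery's
    autoretry_for / max_retries task options."""
    attempts = 0
    while True:
        try:
            return func()
        except Exception:
            attempts += 1
            if attempts > max_retries:
                raise  # give up after the final attempt
            time.sleep(delay)  # back off before retrying

# A flaky operation that fails twice before succeeding
calls = {'n': 0}
def flaky():
    calls['n'] += 1
    if calls['n'] < 3:
        raise RuntimeError("transient failure")
    return "ok"

print(with_retries(flaky))  # → ok (after 3 attempts)
```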
## Things to note

- Adjust the Redis connection settings (host, port, password) for your environment.
- Make sure the system time zone is set correctly.
- Increase the worker concurrency for production workloads.
- The monitoring data is currently simulated; replace it with real system metrics before production use.
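Replacing the simulated metrics is straightforward; in practice the `psutil` library is the portable choice, but on Linux a stdlib-only sketch (the helper name `real_system_metrics` is illustrative, not from the project) could look like:

```python
import os

def real_system_metrics():
    """Read real CPU and memory figures on Linux using only the stdlib."""
    # Rough CPU metric: 1-minute load average relative to the core count
    load1, _, _ = os.getloadavg()
    cpu_usage = 100.0 * load1 / (os.cpu_count() or 1)

    # Memory usage from /proc/meminfo (values are in kB)
    meminfo = {}
    with open('/proc/meminfo') as f:
        for line in f:
            key, rest = line.split(':', 1)
            meminfo[key] = int(rest.split()[0])
    used = meminfo['MemTotal'] - meminfo['MemAvailable']
    memory_usage = 100.0 * used / meminfo['MemTotal']

    return {'cpu_usage': cpu_usage, 'memory_usage': memory_usage, 'status': 'running'}

print(real_system_metrics())
```

The return dictionary keeps the same keys the simulated `system_monitor` task uses, so the rest of the program needs no changes.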
## Code examples

Task execution example:

```python
# Submit the addition task and read back its result
result = add.delay(4, 6)
print(f"Task ID: {result.id}")
if result.ready():
    print(f"Result: {result.get()}")
```
System monitoring example:

```python
# Trigger the system monitoring task and read its result
result = system_monitor.delay()
data = result.get()
print(f"CPU usage: {data['cpu_usage']:.1f}%")
print(f"Memory usage: {data['memory_usage']:.1f}%")
```

### All codes

The main program:

```python
from tasks import add, multiply, process_text, generate_report, chain_calculation, system_monitor
import time
import json
from datetime import datetime
import threading
import signal
import sys
from celery.result import AsyncResult

# Global flag controlling program run state
running = True


def signal_handler(signum, frame):
    """Handle the exit signal."""
    global running
    print("\nReceived an exit signal, shutting down...")
    running = False


def monitor_system_task():
    """Watch the execution results of the system monitoring task."""
    while running:
        try:
            # Trigger the system monitoring task
            result = system_monitor.delay()
            # Poll for the result (wait up to 4 seconds)
            for _ in range(4):
                if result.ready():
                    data = result.get()
                    if data:
                        print("\nSystem monitoring results:")
                        print(f"CPU usage: {data['cpu_usage']:.1f}%")
                        print(f"Memory usage: {data['memory_usage']:.1f}%")
                        print(f"System status: {data['status']}")
                        print("-" * 50)
                    break
                time.sleep(1)
            # Wait out the remaining time so the loop runs roughly every 5 seconds
            time.sleep(1)
        except Exception as e:
            print(f"Error in monitoring task: {e}")
            time.sleep(5)


def run_regular_task():
    """Run examples of the regular tasks."""
    while running:
        try:
            print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Running regular tasks...")

            # 1. Addition task
            result = add.delay(4, 6)
            print(f"Addition task ID: {result.id}")
            if result.ready():
                print(f"4 + 6 = {result.get()}")

            # 2. Text processing task
            text = f"This is a test message - {datetime.now()}"
            result = process_text.delay(text)
            print(f"Text processing task ID: {result.id}")
            if result.ready():
                print(json.dumps(result.get(), indent=2, ensure_ascii=False))

            # Sleep for 5 seconds, then start the next round
            for _ in range(5):
                if not running:
                    break
                time.sleep(1)
        except Exception as e:
            print(f"An error occurred while executing a task: {e}")
            time.sleep(5)


def main():
    """Main function."""
    # Register signal handlers for a graceful exit
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    print("Program starting...")
    print("Tip: press Ctrl+C to exit the program gracefully")
    print("\n=== The main program is now running ===")
    print("- System monitoring runs every 5 seconds")
    print("- Regular tasks run every 5 seconds")
    print("- All task results are displayed in real time")

    try:
        # Create and start the monitoring thread
        monitor_thread = threading.Thread(target=monitor_system_task)
        monitor_thread.daemon = True
        monitor_thread.start()

        # Create and start the regular-task thread
        task_thread = threading.Thread(target=run_regular_task)
        task_thread.daemon = True
        task_thread.start()

        # Keep the main thread alive
        while running:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nThe program is shutting down...")
    finally:
        print("The program has exited.")


if __name__ == "__main__":
    main()
```
The task definitions (`tasks.py`):

```python
from celery import Celery
from celery.schedules import crontab
import time
from datetime import datetime
import random

# Create a Celery instance
app = Celery('tasks')

# Configure Celery (replace <password> and <host> with your Redis credentials)
app.conf.update(
    broker_url='redis://:<password>@<host>:6379/1',
    result_backend='redis://:<password>@<host>:6379/1',
    task_serializer='json',
    result_serializer='json',
    accept_content=['json'],
    timezone='Asia/Shanghai',
    enable_utc=True,
    worker_pool_restarts=True,
    worker_concurrency=1,
)

# Configure scheduled (beat) tasks
app.conf.beat_schedule = {
    # Run system monitoring every 5 seconds
    'monitor-every-5-seconds': {
        'task': 'tasks.system_monitor',
        'schedule': 5.0,
    },
    # Run at 9 a.m. every day
    'daily-morning-report': {
        'task': 'tasks.daily_report',
        'schedule': crontab(hour=9, minute=0),
    },
    # Run every hour during working hours on weekdays
    'workday-hourly-task': {
        'task': 'tasks.workday_task',
        'schedule': crontab(hour='9-18', minute=0, day_of_week='1-5'),
    },
}


@app.task
def add(x, y):
    """Simple addition task."""
    time.sleep(1)
    return x + y


@app.task
def multiply(x, y):
    """Multiplication task."""
    time.sleep(2)
    return x * y


@app.task
def process_text(text):
    """Text processing task."""
    time.sleep(1)
    return {
        'original': text,
        'upper': text.upper(),
        'length': len(text),
        'words': len(text.split()),
    }


@app.task
def generate_report():
    """Report generation task."""
    time.sleep(3)
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    return {
        'timestamp': current_time,
        'temperature': random.uniform(20, 30),
        'humidity': random.uniform(40, 80),
        'status': random.choice(['normal', 'warning', 'error']),
    }


@app.task
def chain_calculation(numbers):
    """Chained calculation task."""
    time.sleep(2)
    total = sum(numbers)
    return {
        'sum': total,
        'average': total / len(numbers),
        'max': max(numbers),
        'min': min(numbers),
        'count': len(numbers),
    }


@app.task
def system_monitor():
    """System monitoring task, scheduled every 5 seconds."""
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"\n[{current_time}] Running the 5-second system monitoring task...")
    # Simulated system metrics
    data = {
        'timestamp': current_time,
        'cpu_usage': random.uniform(0, 100),
        'memory_usage': random.uniform(0, 100),
        'status': 'running',
    }
    print(f"CPU usage: {data['cpu_usage']:.1f}%")
    print(f"Memory usage: {data['memory_usage']:.1f}%")
    print(f"System status: {data['status']}")
    print("-" * 50)
    return data


@app.task
def daily_report():
    """Daily task executed at 9 a.m. every day."""
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"\n[{current_time}] Generating the daily report...")
    report = {
        'report_type': 'daily',
        'generated_at': current_time,
        'summary': 'This is an automatically generated daily report example',
        'metrics': {
            'total_tasks': random.randint(100, 1000),
            'completed_tasks': random.randint(50, 500),
            'success_rate': random.uniform(0.8, 1.0),
        },
    }
    print(f"Report type: {report['report_type']}")
    print(f"Generated at: {report['generated_at']}")
    print(f"Task success rate: {report['metrics']['success_rate']:.1%}")
    print("-" * 50)
    return report


@app.task
def workday_task():
    """Task executed every hour on weekdays."""
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"\n[{current_time}] Running the working-hours task...")
    data = {
        'task_type': 'workday task',
        'executed_at': current_time,
        'status': random.choice(['completed', 'in progress', 'planned']),
        'workload': random.randint(1, 100),
    }
    print(f"Task status: {data['status']}")
    print(f"Workload: {data['workload']}%")
    print("-" * 50)
    return data
```
The above is a detailed walkthrough of implementing a Celery task queue system in Python. For more on Python Celery task queues, please check out my other related articles!