Implementation approach
- FastAPI server
- Celery task queue
- RabbitMQ as the message broker
- Scheduled task processing
Complete steps
First create the project structure:
c:\Users\Administrator\Desktop\meitu\
├── app/
│ ├── __init__.py
│ ├──
│ ├── celery_app.py
│ ├──
│ └──
├──
└── celery_worker.py
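1. First, create the dependency file:
fastapi==0.104.1
uvicorn==0.24.0
celery==5.3.4
python-dotenv==1.0.0
requests==2.31.0
FastAPI also needs the python-multipart package to parse uploaded files, and the list above does not include it.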
2. Create a configuration file:
from dotenv import load_dotenv
import os

# Load variables from a local .env file, if one exists
load_dotenv()

# RabbitMQ configuration
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "localhost")
RABBITMQ_PORT = os.getenv("RABBITMQ_PORT", "5672")
RABBITMQ_USER = os.getenv("RABBITMQ_USER", "guest")
RABBITMQ_PASS = os.getenv("RABBITMQ_PASS", "guest")

# Celery configuration
CELERY_BROKER_URL = f"amqp://{RABBITMQ_USER}:{RABBITMQ_PASS}@{RABBITMQ_HOST}:{RABBITMQ_PORT}//"
CELERY_RESULT_BACKEND = "rpc://"

# Scheduled task configuration
CELERY_BEAT_SCHEDULE = {
    'process-images-every-hour': {
        'task': '.process_images',
        'schedule': 3600.0,  # run once an hour
    },
    'daily-cleanup': {
        'task': '.cleanup_old_images',
        'schedule': 86400.0,  # run once a day
    }
}
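The broker URL above is assembled by plain string interpolation, which can break if a password contains characters such as "@" or "/". The following is a minimal sketch of one way to guard against that, not part of the original configuration: the helper name build_broker_url and its env parameter are hypothetical, and it relies only on the standard library.

```python
import os
from urllib.parse import quote


def build_broker_url(env=None):
    """Assemble an AMQP broker URL from RABBITMQ_* settings (hypothetical helper)."""
    env = os.environ if env is None else env
    host = env.get("RABBITMQ_HOST", "localhost")
    port = env.get("RABBITMQ_PORT", "5672")
    # Percent-encode the credentials so that characters such as '@' or '/'
    # cannot be mistaken for URL delimiters.
    user = quote(env.get("RABBITMQ_USER", "guest"), safe="")
    password = quote(env.get("RABBITMQ_PASS", "guest"), safe="")
    # Same URL shape as the configuration file, including the trailing "//".
    return f"amqp://{user}:{password}@{host}:{port}//"


if __name__ == "__main__":
    # With no variables set, the defaults from the configuration file apply.
    print(build_broker_url({}))
```

Passing a dictionary instead of reading os.environ directly keeps the function easy to check in isolation; in the configuration file it could be called with no arguments after load_dotenv().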
3. Create a Celery application:
from celery import Celery
from import CELERY_BROKER_URL, CELERY_RESULT_BACKEND, CELERY_BEAT_SCHEDULE

celery_app = Celery(
    'image_processing',
    broker=CELERY_BROKER_URL,
    backend=CELERY_RESULT_BACKEND,
    include=['']
)

# Configure scheduled tasks
celery_app.conf.beat_schedule = CELERY_BEAT_SCHEDULE
celery_app.conf.timezone = 'Asia/Shanghai'
4. Create a task file:
from app.celery_app import celery_app
from import ImageWatermarker
import os
from datetime import datetime, timedelta


@celery_app.task
def add_watermark_task(image_path, text, position='center'):
    """Asynchronous watermark task"""
    watermarker = ImageWatermarker()
    try:
        result_path = watermarker.add_watermark(
            image_path=image_path,
            text=text,
            position=position
        )
        return {"status": "success", "output_path": result_path}
    except Exception as e:
        return {"status": "error", "message": str(e)}


@celery_app.task
def process_images():
    """Scheduled task: queue a watermark job for every pending image"""
    image_dir = "images/pending"
    if not os.path.exists(image_dir):
        return {"status": "error", "message": "Pending directory not found"}

    processed = 0
    for image in os.listdir(image_dir):
        # Match extensions case-insensitively
        if image.lower().endswith(('.png', '.jpg', '.jpeg')):
            add_watermark_task.delay(
                os.path.join(image_dir, image),
                "Automatically process watermarks",
                'center'
            )
            processed += 1
    return {"status": "success", "processed": processed}


@celery_app.task
def cleanup_old_images():
    """Scheduled task: delete processed images older than seven days"""
    output_dir = "images/processed"
    if not os.path.exists(output_dir):
        return {"status": "error", "message": "Output directory not found"}

    threshold_date = datetime.now() - timedelta(days=7)
    cleaned = 0
    for image in os.listdir(output_dir):
        image_path = os.path.join(output_dir, image)
        # Compare the file's modification time with the cutoff
        if os.path.getmtime(image_path) < threshold_date.timestamp():
            os.remove(image_path)
            cleaned += 1
    return {"status": "success", "cleaned": cleaned}
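The two scheduled tasks each boil down to a file-selection rule: pick image files by extension, and pick files older than seven days. The sketch below isolates those rules as plain functions so they can be tried without Celery or RabbitMQ. The function names list_pending_images and find_stale_files are hypothetical, and using the file modification time as the age criterion is an assumption.

```python
import os
import tempfile
import time

IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')
SECONDS_PER_DAY = 86400


def list_pending_images(directory):
    """Return sorted paths of image files in `directory` (case-insensitive match)."""
    return sorted(
        os.path.join(directory, name)
        for name in os.listdir(directory)
        if name.lower().endswith(IMAGE_EXTENSIONS)
    )


def find_stale_files(directory, max_age_days=7, now=None):
    """Return paths of regular files last modified more than `max_age_days` ago."""
    now = time.time() if now is None else now
    cutoff = now - max_age_days * SECONDS_PER_DAY
    stale = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        # Skip subdirectories; only regular files are candidates for removal.
        if os.path.isfile(path) and os.path.getmtime(path) < cutoff:
            stale.append(path)
    return stale


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        for name in ("a.PNG", "b.jpg", "notes.txt"):
            open(os.path.join(tmp, name), "w").close()
        # Backdate one file by ten days so that it counts as stale.
        ten_days_ago = time.time() - 10 * SECONDS_PER_DAY
        os.utime(os.path.join(tmp, "b.jpg"), (ten_days_ago, ten_days_ago))
        print([os.path.basename(p) for p in list_pending_images(tmp)])
        print([os.path.basename(p) for p in find_stale_files(tmp)])
```

Separating selection from deletion also makes a dry run possible: the cleanup task could log the result of find_stale_files before removing anything.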
5. Create a FastAPI application:
from fastapi import FastAPI, File, UploadFile, BackgroundTasks
from fastapi.responses import JSONResponse
import os
from import add_watermark_task
from app.celery_app import celery_app

app = FastAPI(title="Picture Watermark Processing Service")


@app.post("/upload/")
async def upload_image(
    file: UploadFile = File(...),
    text: str = "Watermark Text",
    position: str = "center"
):
    # Save the uploaded file
    file_path = f"images/uploads/{file.filename}"
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, "wb") as buffer:
        content = await file.read()
        buffer.write(content)

    # Queue the asynchronous watermark task
    task = add_watermark_task.delay(file_path, text, position)
    return JSONResponse({
        "status": "success",
        "message": "Image uploaded and added to the processing queue",
        "task_id": task.id
    })


@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
    task = celery_app.AsyncResult(task_id)
    if task.ready():
        return {"status": "completed", "result": task.result}
    return {"status": "processing"}


@app.get("/tasks/scheduled")
async def get_scheduled_tasks():
    return {"tasks": celery_app.conf.beat_schedule}
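The upload handler builds the destination path directly from the client-supplied file name, so a name containing "../" could point outside the upload directory, and two uploads with the same name would overwrite each other. One possible mitigation is sketched below. The helper safe_upload_path is hypothetical and not part of the original application; it uses only the standard library.

```python
import os
import uuid


def safe_upload_path(filename, upload_dir="images/uploads"):
    """Build a destination path that stays inside `upload_dir` (hypothetical helper)."""
    # Treat backslashes as separators too, then keep only the last component.
    base = os.path.basename((filename or "").replace("\\", "/"))
    stem, ext = os.path.splitext(base)
    # Keep a conservative character set; fall back to a fixed stem if empty.
    stem = "".join(c for c in stem if c.isalnum() or c in "-_") or "upload"
    # A short random suffix avoids overwriting an earlier upload of the same name.
    unique = uuid.uuid4().hex[:8]
    return os.path.join(upload_dir, f"{stem}_{unique}{ext.lower()}")


if __name__ == "__main__":
    print(safe_upload_path("holiday photo.JPG"))
    print(safe_upload_path("../../etc/passwd"))
```

In the handler, the result of safe_upload_path(file.filename) could replace the f-string that builds file_path.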
6. Create the Celery worker startup file:
from app.celery_app import celery_app

if __name__ == '__main__':
    celery_app.start()
Instructions for use
First install the dependencies:
pip install -r
Make sure the RabbitMQ service is running.
Start the FastAPI server:
uvicorn :app --reload
Start the Celery worker:
celery -A celery_worker.celery_app worker --loglevel=info
Start Celery beat (the scheduler for periodic tasks):
celery -A celery_worker.celery_app beat --loglevel=info
This system provides the following features:
- Upload images through the FastAPI interface and watermark them asynchronously
- Handle the asynchronous task queue with Celery
- Use RabbitMQ as the message broker
- Support scheduled tasks:
  - Automatically process pending images every hour
  - Clean up images older than one week once a day
- Support task status queries
- Support viewing the list of scheduled tasks
API endpoints:
- POST /upload/ - Upload an image and create a watermark task
- GET /task/{task_id} - Query task status
- GET /tasks/scheduled - View scheduled task list
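Because watermarking runs asynchronously, a client normally polls GET /task/{task_id} until the status changes from "processing" to "completed". The sketch below shows one way to do that with the standard library. The names fetch_status and wait_for_task are hypothetical, and the base URL assumes the server runs on uvicorn's default address, http://localhost:8000.

```python
import json
import time
import urllib.request

BASE_URL = "http://localhost:8000"  # assumption: uvicorn's default host and port


def fetch_status(task_id, base_url=BASE_URL):
    """Call GET /task/{task_id} and return the decoded JSON body."""
    with urllib.request.urlopen(f"{base_url}/task/{task_id}") as response:
        return json.load(response)


def wait_for_task(task_id, fetch=fetch_status, interval=1.0, timeout=30.0):
    """Poll until the task reports "completed" or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        body = fetch(task_id)
        if body.get("status") == "completed":
            return body.get("result")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task {task_id} still processing after {timeout}s")
        time.sleep(interval)
```

The fetch parameter exists so the polling loop can be exercised with a stand-in function instead of a live server. With the service running, wait_for_task(task_id) would be called with the task_id returned by POST /upload/.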