SoFunction
Updated on 2025-04-14

Implementing a distributed image watermark processing system with Python, FastAPI, Celery, and RabbitMQ

Implementation approach

  • FastAPI server
  • Celery task queue
  • RabbitMQ as the message broker
  • Scheduled task processing

Complete steps

First create the project structure:

c:\Users\Administrator\Desktop\meitu\
├── app/
│   ├── __init__.py
│   ├──
│   ├── celery_app.py
│   ├──
│   └──
├──
└── celery_worker.py

1. First, create the dependency file:

fastapi==0.104.1
uvicorn==0.24.0
celery==5.3.4
python-dotenv==1.0.0
requests==2.31.0

2. Create a configuration file:

from dotenv import load_dotenv
import os

load_dotenv()

# RabbitMQ configuration
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "localhost")
RABBITMQ_PORT = os.getenv("RABBITMQ_PORT", "5672")
RABBITMQ_USER = os.getenv("RABBITMQ_USER", "guest")
RABBITMQ_PASS = os.getenv("RABBITMQ_PASS", "guest")

# Celery configuration
CELERY_BROKER_URL = f"amqp://{RABBITMQ_USER}:{RABBITMQ_PASS}@{RABBITMQ_HOST}:{RABBITMQ_PORT}//"
CELERY_RESULT_BACKEND = "rpc://"

# Scheduled task configuration
CELERY_BEAT_SCHEDULE = {
    'process-images-every-hour': {
        'task': '.process_images',
        'schedule': 3600.0,  # Run once an hour
    },
    'daily-cleanup': {
        'task': '.cleanup_old_images',
        'schedule': 86400.0,  # Run once a day
    }
}
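
The broker URL above is assembled from four environment variables. As a minimal sketch of the same idea, the helper below (build_broker_url is a hypothetical name, not part of the article's code) builds the URL from any mapping and percent-encodes the credentials, on the assumption that a password may contain characters such as "@" or "/" that would otherwise break the URL. The trailing "//" follows Celery's convention for the default virtual host "/".

```python
import os
from urllib.parse import quote


def build_broker_url(env=None):
    """Assemble an AMQP broker URL from RABBITMQ_* variables.

    `env` is any mapping of variable names to values; it defaults to os.environ.
    (Hypothetical helper, shown for illustration only.)
    """
    env = os.environ if env is None else env
    host = env.get("RABBITMQ_HOST", "localhost")
    port = env.get("RABBITMQ_PORT", "5672")
    # Percent-encode credentials so characters such as "@" or "/" in a
    # password cannot be mistaken for URL delimiters.
    user = quote(env.get("RABBITMQ_USER", "guest"), safe="")
    password = quote(env.get("RABBITMQ_PASS", "guest"), safe="")
    return f"amqp://{user}:{password}@{host}:{port}//"


print(build_broker_url({}))
# amqp://guest:guest@localhost:5672//
print(build_broker_url({"RABBITMQ_PASS": "p@ss/word"}))
# amqp://guest:p%40ss%2Fword@localhost:5672//
```

Passing the mapping explicitly keeps the function easy to test without touching the real environment.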

3. Create a Celery application:

from celery import Celery
from  import CELERY_BROKER_URL, CELERY_RESULT_BACKEND, CELERY_BEAT_SCHEDULE

celery_app = Celery(
    'image_processing',
    broker=CELERY_BROKER_URL,
    backend=CELERY_RESULT_BACKEND,
    include=['']
)

# Configure scheduled tasks
celery_app.conf.beat_schedule = CELERY_BEAT_SCHEDULE
celery_app.conf.timezone = 'Asia/Shanghai'

4. Create a task file:

from app.celery_app import celery_app
from  import ImageWatermarker
import os
from datetime import datetime, timedelta

@celery_app.task
def add_watermark_task(image_path, text, position='center'):
    """Asynchronous watermark task"""
    watermarker = ImageWatermarker()
    try:
        result_path = watermarker.add_watermark(
            image_path=image_path,
            text=text,
            position=position
        )
        return {"status": "success", "output_path": result_path}
    except Exception as e:
        return {"status": "error", "message": str(e)}

@celery_app.task
def process_images():
    """Scheduled task: queue pending images for watermarking"""
    image_dir = "images/pending"
    if not os.path.exists(image_dir):
        return {"status": "error", "message": "Pending directory not found"}

    processed = 0
    for image in os.listdir(image_dir):
        if image.lower().endswith(('.png', '.jpg', '.jpeg')):
            add_watermark_task.delay(
                os.path.join(image_dir, image),
                "Automatically process watermarks",
                'center'
            )
            processed += 1

    return {"status": "success", "processed": processed}

@celery_app.task
def cleanup_old_images():
    """Scheduled task: delete processed images older than seven days"""
    output_dir = "images/processed"
    if not os.path.exists(output_dir):
        return {"status": "error", "message": "Output directory not found"}

    threshold_date = datetime.now() - timedelta(days=7)
    cleaned = 0

    for image in os.listdir(output_dir):
        image_path = os.path.join(output_dir, image)
        if os.path.getmtime(image_path) < threshold_date.timestamp():
            os.remove(image_path)
            cleaned += 1

    return {"status": "success", "cleaned": cleaned}
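
The tasks above pass a position argument to ImageWatermarker.add_watermark, but the article does not show that class. As a rough sketch of one thing such a class might need, the function below maps a position name to the pixel coordinates where the text would be drawn. Everything here is an assumption: the function name watermark_origin, the margin parameter, and every position name other than 'center' (the only value the article actually uses).

```python
def watermark_origin(image_size, text_size, position="center", margin=10):
    """Return the (x, y) top-left corner at which to draw the watermark text.

    `image_size` and `text_size` are (width, height) tuples in pixels.
    Hypothetical helper; position names other than "center" are assumed.
    """
    img_w, img_h = image_size
    txt_w, txt_h = text_size
    origins = {
        "center": ((img_w - txt_w) // 2, (img_h - txt_h) // 2),
        "top-left": (margin, margin),
        "top-right": (img_w - txt_w - margin, margin),
        "bottom-left": (margin, img_h - txt_h - margin),
        "bottom-right": (img_w - txt_w - margin, img_h - txt_h - margin),
    }
    if position not in origins:
        raise ValueError(f"unknown position: {position!r}")
    x, y = origins[position]
    # Clamp to zero so oversized text still starts inside the image.
    return max(x, 0), max(y, 0)


print(watermark_origin((800, 600), (200, 40)))
# (300, 280)
print(watermark_origin((800, 600), (200, 40), "bottom-right"))
# (590, 550)
```

Keeping the geometry in a pure function means it can be verified without loading an image library.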

5. Create a FastAPI application:

from fastapi import FastAPI, File, UploadFile, BackgroundTasks
from fastapi.responses import JSONResponse
import os
from  import add_watermark_task
from app.celery_app import celery_app

app = FastAPI(title="Picture Watermark Processing Service")

@app.post("/upload/")
async def upload_image(
    file: UploadFile = File(...),
    text: str = "Watermark Text",
    position: str = "center"
):
    # Save the uploaded file
    file_path = f"images/uploads/{file.filename}"
    os.makedirs(os.path.dirname(file_path), exist_ok=True)

    with open(file_path, "wb") as buffer:
        content = await file.read()
        buffer.write(content)

    # Queue the asynchronous watermark task
    task = add_watermark_task.delay(file_path, text, position)

    return JSONResponse({
        "status": "success",
        "message": "Image uploaded and added to the processing queue",
        "task_id": task.id
    })

@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
    task = celery_app.AsyncResult(task_id)
    if task.ready():
        return {"status": "completed", "result": task.result}
    return {"status": "processing"}

@app.get("/tasks/scheduled")
async def get_scheduled_tasks():
    return {"tasks": celery_app.conf.beat_schedule}
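
The upload handler writes the file under images/uploads/ using the name supplied by the client. A client-controlled name can contain directory components, so one possible hardening step is to discard them before building the path. The sketch below is an assumption layered on top of the article's code, not part of it: safe_upload_path is a hypothetical helper, and the allowed extensions simply mirror the ones the scheduled task checks for.

```python
import os
import uuid


def safe_upload_path(filename, upload_dir="images/uploads"):
    """Build a storage path that ignores any directories in `filename`.

    Hypothetical helper, shown for illustration only.
    """
    # Normalise Windows separators, then keep only the final path component.
    name = os.path.basename((filename or "").replace("\\", "/"))
    ext = os.path.splitext(name)[1].lower()
    if ext not in {".png", ".jpg", ".jpeg"}:
        raise ValueError(f"unsupported file type: {ext or '(none)'}")
    # A random stem avoids collisions between uploads that share a name.
    return os.path.join(upload_dir, f"{uuid.uuid4().hex}{ext}")


print(safe_upload_path("../../etc/photo.JPG"))
```

In the handler, the result would stand in for the f-string that currently builds file_path. The random stem changes on every call, so the printed path differs between runs.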

6. Create the Celery worker startup file:

from app.celery_app import celery_app

if __name__ == '__main__':
    celery_app.start()

Instructions for use

First install the dependencies:

pip install -r 

Make sure the RabbitMQ service is running.

Start the FastAPI server:

uvicorn :app --reload

Start Celery worker:

celery -A celery_worker.celery_app worker --loglevel=info

Start Celery beat (scheduled tasks):

celery -A celery_worker.celery_app beat --loglevel=info

This system provides the following functions:

  • Uploads images through the FastAPI interface and watermarks them asynchronously
  • Uses Celery to manage the asynchronous task queue
  • Uses RabbitMQ as the message broker
  • Supports scheduled tasks:
    • Automatically processes pending images every hour
    • Cleans up processed images older than one week every day
  • Supports task status queries
  • Supports viewing the scheduled task list
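
The daily cleanup rule in the list above (remove processed images older than one week) can be checked in isolation before it is allowed to delete anything. The following is a minimal sketch under that assumption: find_stale_files is a hypothetical helper that only reports candidates, and the demo uses a temporary directory with a backdated file rather than the real images/processed folder.

```python
import os
import tempfile
import time


def find_stale_files(directory, max_age_days=7, now=None):
    """Return sorted paths of regular files modified more than `max_age_days` ago.

    Hypothetical helper; it lists candidates and deletes nothing.
    """
    now = time.time() if now is None else now
    cutoff = now - max_age_days * 86400
    stale = []
    with os.scandir(directory) as entries:
        for entry in entries:
            # Skip subdirectories; only plain files are candidates.
            if entry.is_file() and entry.stat().st_mtime < cutoff:
                stale.append(entry.path)
    return sorted(stale)


with tempfile.TemporaryDirectory() as tmp:
    old_path = os.path.join(tmp, "old.png")
    new_path = os.path.join(tmp, "new.png")
    for path in (old_path, new_path):
        open(path, "wb").close()
    # Backdate one file by eight days so it falls past the cutoff.
    eight_days_ago = time.time() - 8 * 86400
    os.utime(old_path, (eight_days_ago, eight_days_ago))
    print([os.path.basename(p) for p in find_stale_files(tmp)])
    # ['old.png']
```

Separating "find" from "delete" makes it possible to log or review the list first; the caller can then pass each path to os.remove.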

API endpoints:

  • POST /upload/ - Upload an image and create a watermark task
  • GET /task/{task_id} - Query task status
  • GET /tasks/scheduled - View scheduled task list
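
A client of these endpoints typically uploads an image, keeps the returned task_id, and polls GET /task/{task_id} until the status becomes "completed". The sketch below shows only the polling loop. wait_for_task and its fetch_status parameter are hypothetical names, and the HTTP call is injected as a function so the loop can run here against canned responses instead of a live server.

```python
import time


def wait_for_task(fetch_status, task_id, interval=1.0, timeout=30.0, sleep=time.sleep):
    """Poll `fetch_status(task_id)` until the task completes or `timeout` elapses.

    `fetch_status` must return the decoded JSON body of GET /task/{task_id}.
    Hypothetical helper, shown for illustration only.
    """
    deadline = time.monotonic() + timeout
    while True:
        payload = fetch_status(task_id)
        if payload.get("status") == "completed":
            return payload.get("result")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task {task_id} still processing after {timeout}s")
        sleep(interval)


# Canned responses stand in for the server: one "processing", then "completed".
responses = iter([
    {"status": "processing"},
    {"status": "completed", "result": {"status": "success", "output_path": "out.png"}},
])
result = wait_for_task(lambda _task_id: next(responses), "abc123", sleep=lambda _s: None)
print(result)
# {'status': 'success', 'output_path': 'out.png'}
```

Against a running server, fetch_status could be something like lambda tid: requests.get(f"http://localhost:8000/task/{tid}").json(), assuming uvicorn's default port 8000. Note that "completed" only means the task finished; the inner result still carries its own "success" or "error" status.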
