SoFunction
Updated on 2025-04-12

Develop image data cleaning & image quality inspection tools based on Python

With more and more images collected in the data set, various problems such as very messy data format, inconsistent quality, and damage to some images have emerged.

This program provides various image data cleaning and image quality check functions to prevent various abnormalities when model training loads data.

1. Use various methods to read the image to check whether the image is damaged

2. Read the image EXIF information to prevent orientation-related labeling anomalies

3. Record image information

① Image encoding format, resolution, number of channels, and file size to facilitate judgment of other image attributes

② MD5, PHash16 hash values, used to determine whether there is duplication

③ Peak signal-to-noise ratio (PSNR), structural similarity (SSIM), etc., used to judge image quality

Complete code

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Function: Image data cleaning & image quality inspection
# Author: AYangSN
# Time: 2025-03-12
# Version: 1.0

# here is important imports
import argparse
import csv
import glob
import hashlib
import os
import shutil
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed

import cv2
import imagehash
import numpy as np
import pandas as pd
from PIL import Image, ImageOps, ExifTags
from scipy.stats import entropy
from skimage.metrics import structural_similarity as ssim
from tqdm import tqdm


def check_image_with_pil(filepath):
    """Use PIL to check if the image is corrupted.

    Args:
        filepath: Path to the image file.

    Returns:
        (True, PIL.Image.Image) when the file opens and verifies cleanly,
        (False, error message string) otherwise.
    """
    try:
        img = Image.open(filepath)
        img.verify()  # Verify image integrity; this consumes the file handle
        img = Image.open(filepath)  # Open again so the caller gets a usable image
        return True, img
    except Exception as e:
        return False, str(e)

def check_image_with_opencv(filepath):
    """Use OpenCV to check if the image is corrupted.

    Args:
        filepath: Path to the image file.

    Returns:
        (True, ndarray) when OpenCV decodes the file,
        (False, error message string) otherwise.
    """
    try:
        image = cv2.imread(filepath)
        # cv2.imread returns None (no exception) on unreadable/corrupt files
        if image is None or image.size == 0:
            return False, "OpenCV cannot load images"
        return True, image
    except Exception as e:
        return False, str(e)

def check_file_header(filepath):
    """Check whether the image format is correct by reading the file header.

    Compares the first bytes of the file against known magic numbers.

    Args:
        filepath: Path to the image file.

    Returns:
        (True, None) when a known magic number matches,
        (False, "Unknown file header") otherwise.
    """
    valid_headers = {
        'JPEG': b'\xff\xd8\xff',
        'PNG': b'\x89\x50\x4e\x47\x0d\x0a\x1a\x0a',
        'GIF87a': b'GIF87a',
        'GIF89a': b'GIF89a',
        'BMP': b'BM'
    }
    with open(filepath, 'rb') as f:
        header = f.read(8)  # 8 bytes cover the longest magic number (PNG)
    for fmt, magic in valid_headers.items():
        if header.startswith(magic):
            return True, None
    return False, "Unknown file header"


def get_exif_orientation(image):
    """Return the EXIF 'Orientation' tag value of a PIL image.

    Args:
        image: PIL image object (may lack EXIF support entirely).

    Returns:
        The integer orientation value (1-8), or None when the image has
        no EXIF data or no Orientation tag.
    """
    try:
        exif = image._getexif()
    except AttributeError:
        # Not all PIL image plugins implement _getexif
        exif = None
    if exif is None:
        return None
    # Translate numeric EXIF tag ids into human-readable tag names
    exif = {
        ExifTags.TAGS[k]: v
        for k, v in exif.items()
        if k in ExifTags.TAGS
    }
    # Obtain image direction information
    orientation = exif.get('Orientation', None)
    return orientation


def exif_update_image_files(image, orientation, image_file, output_dir):
    '''Rotate/mirror the picture according to its EXIF orientation and re-save it.

    The corrected image is written (EXIF stripped) into a subdirectory of
    output_dir named after the orientation value.

    Args:
        image: PIL image to correct.
        orientation: EXIF orientation value (2-8; 1/None means no change).
        image_file: Original file path; only the basename is reused.
        output_dir: Root directory for the corrected output.
    '''
    if orientation == 2:
        # left-to-right mirror
        image = ImageOps.mirror(image)
    elif orientation == 3:
        # rotate 180
        image = image.transpose(Image.ROTATE_180)
    elif orientation == 4:
        # top-to-bottom mirror
        image = ImageOps.flip(image)
    elif orientation == 5:
        # top-to-left mirror: rotate 270 then mirror
        image = ImageOps.mirror(image.transpose(Image.ROTATE_270))
    elif orientation == 6:
        # rotate 270
        image = image.transpose(Image.ROTATE_270)
    elif orientation == 7:
        # top-to-right mirror: rotate 90 then mirror
        image = ImageOps.mirror(image.transpose(Image.ROTATE_90))
    elif orientation == 8:
        # rotate 90
        image = image.transpose(Image.ROTATE_90)
    else:
        pass

    # Generate output path: group corrected files by orientation value
    outpath = "{}/{}".format(output_dir, orientation)
    os.makedirs(outpath, exist_ok=True)

    # Round-trip through numpy/OpenCV so the re-saved file carries no EXIF
    # information (PIL images are RGB, OpenCV expects BGR)
    img = cv2.cvtColor(np.asarray(image), cv2.COLOR_RGB2BGR)

    # Get the image name
    _, imgname = os.path.split(image_file)

    # Resave the picture
    cv2.imwrite(outpath + '/' + imgname, img)


def compute_md5(filepath):
    """Calculate the MD5 value of the file.

    Reads in 4 KiB chunks so arbitrarily large files use constant memory.

    Args:
        filepath: Path to the file.

    Returns:
        Hex digest string of the file's MD5 hash.
    """
    hash_md5 = hashlib.md5()
    with open(filepath, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()


def compute_phash(imgpath, hash_size=16):
    """Calculate the perceptual hash (pHash) of an image.

    Args:
        imgpath: Path to the image file.
        hash_size: Side length of the hash grid (16 gives a 256-bit hash).

    Returns:
        Hex string representation of the perceptual hash.
    """
    img = Image.open(imgpath)
    phash = imagehash.phash(img, hash_size=hash_size, highfreq_factor=4)
    hex_string = str(phash)
    return hex_string


def diff_phash(p1, p2, hash_size=8):
    """Return the normalized difference between two perceptual hashes.

    Args:
        p1, p2: Hash objects (or numbers) supporting subtraction, where
            `p1 - p2` yields the Hamming distance.
        hash_size: Side length of the hash grid; hash_size**2 is the bit count.

    Returns:
        Difference normalized to [0, 1] (0 means identical).
    """
    return (p1 - p2) / hash_size ** 2


def check_blur(image, ref_image=None):
    """Comprehensive evaluation of blur/quality of an image.

    Args:
        image: BGR image array (OpenCV convention).
        ref_image: Optional reference BGR image for SSIM comparison.

    Returns:
        Tuple (laplacian_var, fourier_energy, tenengrad_value,
        entropy_value, ssim_score); ssim_score is None without ref_image.
    """
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Laplacian variance: low values indicate a blurry image
    laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()

    # Fourier transform: average log-magnitude of the centered spectrum
    # NOTE(review): np.log yields -inf for zero-magnitude bins — consider
    # adding a small epsilon; kept as-is to preserve original behavior.
    f = np.fft.fft2(gray)
    fshift = np.fft.fftshift(f)
    magnitude_spectrum = 20 * np.log(np.abs(fshift))
    fourier_energy = np.sum(magnitude_spectrum) / (magnitude_spectrum.shape[0] * magnitude_spectrum.shape[1])

    # Tenengrad method: mean Sobel gradient magnitude
    gradient_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
    gradient_y = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
    gradient_magnitude = np.sqrt(gradient_x**2 + gradient_y**2)
    tenengrad_value = np.mean(gradient_magnitude)

    # Shannon entropy of the normalized grayscale histogram
    hist = cv2.calcHist([gray], [0], None, [256], [0, 256])
    hist_norm = hist.ravel() / hist.sum()
    entropy_value = entropy(hist_norm, base=2)

    # SSIM (only if a reference image is provided)
    ssim_score = None
    if ref_image is not None:
        gray_ref = cv2.cvtColor(ref_image, cv2.COLOR_BGR2GRAY)
        ssim_score, _ = ssim(gray, gray_ref, full=True)

    return laplacian_var, fourier_energy, tenengrad_value, entropy_value, ssim_score


def process_images(filepath, output_dir):
    """Run all integrity checks on one image and build its report entry.

    Corrupted files are copied into `<output_dir>/broken`; images with an
    EXIF orientation other than 1 get a rotation-corrected copy saved under
    `<output_dir>/exif/<orientation>/`.

    Args:
        filepath: Path to the image file.
        output_dir: Root directory for side outputs (broken/exif copies).

    Returns:
        Dict of per-image metrics suitable for write_to_csv.
    """
    # Get file extension
    file_extension = os.path.splitext(filepath)[1].lower()

    # Check if the image is corrupted using three independent methods
    pil_result, img_pil = check_image_with_pil(filepath)
    opencv_result, img_opencv = check_image_with_opencv(filepath)
    header_result, header_error = check_file_header(filepath)

    # If the image is not corrupted, continue processing
    if pil_result and opencv_result and header_result:

        # File size in bytes
        file_size = os.path.getsize(filepath)

        # Resolution
        width, height = img_pil.size

        # Color mode (e.g. 'RGB', 'L')
        color_mode = img_pil.mode

        # Bit depth (not all PIL image plugins expose .bits)
        bit_depth = img_pil.bits if hasattr(img_pil, 'bits') else None

        # Number of channels, inferred from the mode string length
        channels = len(color_mode) if isinstance(color_mode, str) else None

        # Compression type from format metadata, if present
        compression = img_pil.info.get('compression', 'Unknown')

        # EXIF orientation
        orientation = get_exif_orientation(img_pil)

        # Save a rotation-corrected copy when orientation needs fixing
        if not (orientation is None or orientation == 1):
            exif_update_image_files(img_pil, orientation, filepath, os.path.join(output_dir, 'exif'))

        # MD5 checksum (exact-duplicate detection)
        md5_checksum = compute_md5(filepath)

        # pHash16 checksum (near-duplicate detection)
        hex_string = compute_phash(filepath, hash_size=16)

        # Blur / quality metrics
        laplacian_var, fourier_energy, tenengrad_value, entropy_value, ssim_score = check_blur(img_opencv)

        log_entry = {
            'filename': filepath,
            'file_extension': file_extension,
            'pil_check': pil_result,
            'opencv_check': opencv_result,
            'header_check': header_result,
            'header_error': header_error,
            'file_size': file_size,
            'resolution': (width, height),
            'color_mode': color_mode,
            'bit_depth': bit_depth,
            'channels': channels,
            'compression': compression,
            'exif_data': orientation,
            'md5_checksum': md5_checksum,
            'phash16_checksum': hex_string,
            'laplacian_var': laplacian_var,
            'fourier_energy': fourier_energy,
            'tenengrad_value': tenengrad_value,
            'entropy_value': entropy_value,
            'ssim_score': ssim_score
        }
    else:
        log_entry = {
            'filename': filepath,
            'file_extension': file_extension,
            'pil_check': pil_result,
            'opencv_check': opencv_result,
            'header_check': header_result,
            'header_error': header_error,
        }
        # Copy the corrupted file to the specified output directory
        # (create the directory first, otherwise the copy targets a file)
        broken_dir = os.path.join(output_dir, 'broken')
        os.makedirs(broken_dir, exist_ok=True)
        shutil.copy(filepath, broken_dir)

    # Output result
    print(f"file name: {filepath}")
    print(f"PILexamine: {'success' if pil_result else 'fail'}")
    print(f"OpenCVexamine: {'success' if opencv_result else 'fail'}")
    print(f"文件头examine: {'success' if header_result else 'fail'} - {header_error}")
    print("-" * 40)

    return log_entry


def write_to_csv(log_entries, output_path):
    """Append log entries to the CSV report, writing a header on first use.

    Entries for broken images only contain a subset of the fields; missing
    columns are left empty by DictWriter's default restval.

    Args:
        log_entries: Iterable of per-image dicts from process_images.
        output_path: Path of the CSV report file.
    """
    fieldnames = [
        'filename', 'file_extension', 'pil_check', 'opencv_check', 'header_check', 'header_error',
        'file_size', 'resolution', 'color_mode', 'bit_depth', 'channels', 'compression', 'exif_data', 'md5_checksum', 'phash16_checksum',
        'laplacian_var', 'fourier_energy', 'tenengrad_value', 'entropy_value', 'ssim_score'
    ]
    # Append on subsequent calls so batched writes accumulate in one report
    mode = 'a' if os.path.exists(output_path) else 'w'
    with open(output_path, mode, newline='', encoding='utf-8-sig') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        if mode == 'w':
            writer.writeheader()
        for entry in log_entries:
            writer.writerow(entry)


def main(input_dir, output_dir):
    """Walk input_dir, check every image in parallel, and write a CSV report.

    Args:
        input_dir: Root directory to scan (recursively) for images.
        output_dir: Directory receiving the CSV report plus broken/exif copies.
    """
    os.makedirs(output_dir, exist_ok=True)
    output_csv_path = os.path.join(output_dir, 'image_integrity_report.csv')

    filepaths = []
    # Traverse all files in the input directory, including subdirectories
    for root, dirs, files in tqdm(os.walk(input_dir), desc='Processing Images...'):
        filepaths.extend([os.path.join(root, f) for f in files if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif', '.tiff'))])
    print(f"Found {len(filepaths)} images to process.")

    # Use a thread pool for parallel processing; flush to CSV in batches so
    # a crash part-way through does not lose all results
    batch_size = 100  # Entries buffered before each CSV write
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(process_images, fp, output_dir): fp for fp in filepaths}
        processed_entries = []
        for future in tqdm(as_completed(futures), desc='Writing CSV...'):
            try:
                log_entry = future.result()
                processed_entries.append(log_entry)
                # Write to CSV when the batch size is reached
                if len(processed_entries) >= batch_size:
                    write_to_csv(processed_entries, output_csv_path)
                    processed_entries.clear()
            except Exception as exc:
                print(f'{futures[future]} generated an exception: {exc}')

        # Write remaining data
        if processed_entries:
            write_to_csv(processed_entries, output_csv_path)

    print("Report has been generated.")


if __name__ == "__main__":
    # Example usage: replace with real paths (or wire up argparse)
    input_directory = "your_inputpath"
    output_directory = "your_outputpath"
    main(input_directory, output_directory)

This is the article about developing image data cleaning & image quality inspection tools based on Python. For more related content on Python image data cleaning and quality inspection, please search for my previous articles or continue browsing the related articles below. I hope everyone will support me in the future!