As a dataset accumulates more and more images, problems pile up: inconsistent formats, uneven quality, and outright corrupted files. This program provides a set of image data cleaning and quality inspection functions so that these anomalies are caught before they break data loading during model training.
1. Read each image with several different libraries to check whether it is corrupted
2. Read the image's EXIF information to prevent orientation-related labeling errors
3. Record image information:
① Encoding format, resolution, number of channels, and file size, to help judge other image attributes
② MD5 and PHash16 hash values, used to detect duplicate images (a deduplication sketch follows the complete code)
③ Quality metrics such as peak signal-to-noise ratio (PSNR) and structural similarity (SSIM), used to judge image quality (see the sketch right after this list)
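Both PSNR and SSIM are reference-based: they score a test image against a pristine version of the same image, so they apply when you keep originals alongside re-encoded or resized copies. A minimal sketch using OpenCV's built-in cv2.PSNR and scikit-image's SSIM; the file names original.jpg and compressed.jpg are placeholders:

import cv2
from skimage.metrics import structural_similarity as ssim

# Placeholder file names -- substitute your own reference/test pair
ref = cv2.imread("original.jpg")
test = cv2.imread("compressed.jpg")

# Both metrics require images of identical dimensions
test = cv2.resize(test, (ref.shape[1], ref.shape[0]))

# PSNR in dB: higher is better; lossy-compressed images typically land around 30-50 dB
psnr_value = cv2.PSNR(ref, test)

# SSIM on grayscale: 1.0 means structurally identical
gray_ref = cv2.cvtColor(ref, cv2.COLOR_BGR2GRAY)
gray_test = cv2.cvtColor(test, cv2.COLOR_BGR2GRAY)
ssim_value = ssim(gray_ref, gray_test)

print(f"PSNR: {psnr_value:.2f} dB, SSIM: {ssim_value:.4f}")

The complete script below computes SSIM only when a reference image is passed to check_blur(); its no-reference metrics (Laplacian variance, Fourier energy, Tenengrad, entropy) cover the common case where no original exists.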
Complete code
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Function: Image data cleaning & image quality inspection
# Author: AYangSN
# Time: 2025-03-12
# Version: 1.0

import csv
import os
import shutil
import hashlib
import cv2
import imagehash
import numpy as np
from tqdm import tqdm
from PIL import Image, ImageOps, ExifTags
from concurrent.futures import ThreadPoolExecutor, as_completed
from skimage.metrics import structural_similarity as ssim
from scipy.stats import entropy


def check_image_with_pil(filepath):
    """Use PIL to check if the image is corrupted."""
    try:
        img = Image.open(filepath)
        img.verify()  # Verify image integrity
        img = Image.open(filepath)  # Open again to make sure the image can actually be loaded
        return True, img
    except Exception as e:
        return False, str(e)


def check_image_with_opencv(filepath):
    """Use OpenCV to check if the image is corrupted."""
    try:
        image = cv2.imread(filepath)
        if image is None or image.size == 0:
            return False, "OpenCV cannot load image"
        return True, image
    except Exception as e:
        return False, str(e)


def check_file_header(filepath):
    """Check whether the image format is correct by reading the file header (magic bytes)."""
    valid_headers = {
        'JPEG': b'\xff\xd8\xff',
        'PNG': b'\x89\x50\x4e\x47\x0d\x0a\x1a\x0a',
        'GIF87a': b'GIF87a',
        'GIF89a': b'GIF89a',
        'BMP': b'BM'
    }
    with open(filepath, 'rb') as f:
        header = f.read(8)  # The first 8 bytes are enough to cover all formats above
    for fmt, magic in valid_headers.items():
        if header.startswith(magic):
            return True, None
    return False, "Unknown file header"


def get_exif_orientation(image):
    """Read the EXIF Orientation tag from a PIL image, if present."""
    try:
        exif = image._getexif()
    except AttributeError:
        exif = None
    if exif is None:
        return None
    exif = {ExifTags.TAGS[k]: v for k, v in exif.items() if k in ExifTags.TAGS}
    # Obtain image orientation information
    return exif.get('Orientation', None)


def exif_update_image_files(image, orientation, image_file, output_dir):
    """Rotate/mirror the picture according to its EXIF orientation and resave it without EXIF."""
    if orientation == 2:    # left-to-right mirror
        image = ImageOps.mirror(image)
    elif orientation == 3:  # rotate 180
        image = image.transpose(Image.Transpose.ROTATE_180)
    elif orientation == 4:  # top-to-bottom mirror
        image = ImageOps.flip(image)
    elif orientation == 5:  # top-to-left mirror
        image = ImageOps.mirror(image.transpose(Image.Transpose.ROTATE_270))
    elif orientation == 6:  # rotate 270
        image = image.transpose(Image.Transpose.ROTATE_270)
    elif orientation == 7:  # top-to-right mirror
        image = ImageOps.mirror(image.transpose(Image.Transpose.ROTATE_90))
    elif orientation == 8:  # rotate 90
        image = image.transpose(Image.Transpose.ROTATE_90)
    # Generate output path, one subdirectory per orientation value
    outpath = os.path.join(output_dir, str(orientation))
    os.makedirs(outpath, exist_ok=True)
    # Convert to OpenCV BGR and resave, which strips the EXIF information
    img = cv2.cvtColor(np.asarray(image), cv2.COLOR_RGB2BGR)
    # Get the image name
    _, imgname = os.path.split(image_file)
    # Resave the picture
    cv2.imwrite(os.path.join(outpath, imgname), img)


def compute_md5(filepath):
    """Calculate the MD5 value of the file."""
    hash_md5 = hashlib.md5()
    with open(filepath, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()


def compute_phash(imgpath, hash_size=16):
    """Calculate the perceptual hash (phash) of the image as a hex string."""
    img = Image.open(imgpath)
    phash = imagehash.phash(img, hash_size=hash_size, highfreq_factor=4)
    return str(phash)


def diff_phash(p1, p2, hash_size=8):
    """Normalized Hamming distance between two phash values (0 = identical)."""
    return (p1 - p2) / hash_size ** 2


def check_blur(image, ref_image=None):
    """Comprehensive sharpness/blur evaluation of an image."""
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    # Laplacian variance: low values indicate blur
    laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
    # Fourier transform: average spectral magnitude as a frequency-energy measure
    f = np.fft.fft2(gray)
    fshift = np.fft.fftshift(f)
    magnitude_spectrum = 20 * np.log(np.abs(fshift) + 1e-8)  # epsilon avoids log(0)
    fourier_energy = np.sum(magnitude_spectrum) / (magnitude_spectrum.shape[0] * magnitude_spectrum.shape[1])
    # Tenengrad method: mean Sobel gradient magnitude
    gradient_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
    gradient_y = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
    gradient_magnitude = np.sqrt(gradient_x**2 + gradient_y**2)
    tenengrad_value = np.mean(gradient_magnitude)
    # Entropy of the grayscale histogram
    hist = cv2.calcHist([gray], [0], None, [256], [0, 256])
    hist_norm = hist.ravel() / hist.sum()
    entropy_value = entropy(hist_norm, base=2)
    # SSIM (only if a reference image is provided)
    ssim_score = None
    if ref_image is not None:
        gray_ref = cv2.cvtColor(ref_image, cv2.COLOR_BGR2GRAY)
        ssim_score, _ = ssim(gray, gray_ref, full=True)
    return laplacian_var, fourier_energy, tenengrad_value, entropy_value, ssim_score


def process_images(filepath, output_dir):
    # Get file extension
    file_extension = os.path.splitext(filepath)[1].lower()
    # Check if the image is corrupted
    pil_result, img_pil = check_image_with_pil(filepath)
    opencv_result, img_opencv = check_image_with_opencv(filepath)
    header_result, header_error = check_file_header(filepath)
    # If the image is not corrupted, continue processing
    if pil_result and opencv_result and header_result:
        # File size in bytes
        file_size = os.path.getsize(filepath)
        # Resolution
        width, height = img_pil.size
        # Color mode
        color_mode = img_pil.mode
        # Bit depth
        bit_depth = img_pil.bits if hasattr(img_pil, 'bits') else None
        # Number of channels
        channels = len(color_mode) if isinstance(color_mode, str) else None
        # Compression type
        compression = img_pil.info.get('compression', 'Unknown')
        # EXIF orientation
        orientation = get_exif_orientation(img_pil)
        # Rewrite the image if it carries a non-trivial orientation
        if not (orientation is None or orientation == 1):
            exif_update_image_files(img_pil, orientation, filepath, os.path.join(output_dir, 'exif'))
        # MD5 checksum
        md5_checksum = compute_md5(filepath)
        # PHash16 fingerprint
        hex_string = compute_phash(filepath, hash_size=16)
        laplacian_var, fourier_energy, tenengrad_value, entropy_value, ssim_score = check_blur(img_opencv)
        log_entry = {
            'filename': filepath,
            'file_extension': file_extension,
            'pil_check': pil_result,
            'opencv_check': opencv_result,
            'header_check': header_result,
            'header_error': header_error,
            'file_size': file_size,
            'resolution': (width, height),
            'color_mode': color_mode,
            'bit_depth': bit_depth,
            'channels': channels,
            'compression': compression,
            'exif_data': orientation,
            'md5_checksum': md5_checksum,
            'phash16_checksum': hex_string,
            'laplacian_var': laplacian_var,
            'fourier_energy': fourier_energy,
            'tenengrad_value': tenengrad_value,
            'entropy_value': entropy_value,
            'ssim_score': ssim_score
        }
    else:
        log_entry = {
            'filename': filepath,
            'file_extension': file_extension,
            'pil_check': pil_result,
            'opencv_check': opencv_result,
            'header_check': header_result,
            'header_error': header_error,
        }
        # Copy the corrupted file to the specified output directory
        broken_dir = os.path.join(output_dir, 'broken')
        os.makedirs(broken_dir, exist_ok=True)
        shutil.copy(filepath, broken_dir)
    # Output result
    print(f"file name: {filepath}")
    print(f"PIL check: {'success' if pil_result else 'fail'}")
    print(f"OpenCV check: {'success' if opencv_result else 'fail'}")
    print(f"Header check: {'success' if header_result else 'fail'} - {header_error}")
    print("-" * 40)
    return log_entry


def write_to_csv(log_entries, output_path):
    fieldnames = [
        'filename', 'file_extension', 'pil_check', 'opencv_check', 'header_check', 'header_error',
        'file_size', 'resolution', 'color_mode', 'bit_depth', 'channels', 'compression', 'exif_data',
        'md5_checksum', 'phash16_checksum',
        'laplacian_var', 'fourier_energy', 'tenengrad_value', 'entropy_value', 'ssim_score'
    ]
    # Append if the report already exists, otherwise create it with a header row
    mode = 'a' if os.path.exists(output_path) else 'w'
    with open(output_path, mode, newline='', encoding='utf-8-sig') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        if mode == 'w':
            writer.writeheader()
        for entry in log_entries:
            writer.writerow(entry)


def main(input_dir, output_dir):
    os.makedirs(output_dir, exist_ok=True)
    output_csv_path = os.path.join(output_dir, 'image_integrity_report.csv')

    filepaths = []
    # Traverse all files in the input directory, including subdirectories
    for root, dirs, fs in tqdm(os.walk(input_dir), desc='Scanning Images...'):
        filepaths.extend([os.path.join(root, f) for f in fs
                          if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif', '.tiff'))])
    print(f"Found {len(filepaths)} images to process.")

    # Use a thread pool for parallel processing
    batch_size = 100  # Number of entries flushed to the CSV at a time
    processed_entries = []
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(process_images, fp, output_dir): fp for fp in filepaths}
        for future in tqdm(as_completed(futures), total=len(futures), desc='Writing CSV...'):
            try:
                log_entry = future.result()
                processed_entries.append(log_entry)
                # Flush to CSV when the batch size is reached
                if len(processed_entries) >= batch_size:
                    write_to_csv(processed_entries, output_csv_path)
                    processed_entries.clear()
            except Exception as exc:
                print(f'{futures[future]} generated an exception: {exc}')
    # Write the remaining entries
    if processed_entries:
        write_to_csv(processed_entries, output_csv_path)
    print("Report has been generated.")


if __name__ == "__main__":
    # Example usage
    input_directory = "your_inputpath"
    output_directory = "your_outputpath"
    main(input_directory, output_directory)
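The report records MD5 and PHash16 for every readable image, but deduplication itself is a separate pass over the CSV. Below is a minimal sketch of that pass; find_duplicates is a hypothetical helper (not part of the script above), it reuses the script's diff_phash(), and the 0.1 threshold is an assumption to tune on your own data:

import csv
from collections import defaultdict
import imagehash

def find_duplicates(report_csv, phash_threshold=0.1):
    """Hypothetical helper: group exact duplicates by MD5 and flag near-duplicates by phash distance."""
    with open(report_csv, encoding='utf-8-sig') as f:
        # Corrupted files have no checksum columns, so skip them
        rows = [r for r in csv.DictReader(f) if r.get('md5_checksum')]

    # Exact duplicates: identical MD5 means byte-identical files
    by_md5 = defaultdict(list)
    for r in rows:
        by_md5[r['md5_checksum']].append(r['filename'])
    exact = [files for files in by_md5.values() if len(files) > 1]

    # Near-duplicates: small normalized Hamming distance between PHash16 values
    hashes = [(r['filename'], imagehash.hex_to_hash(r['phash16_checksum'])) for r in rows]
    near = []
    for i in range(len(hashes)):
        for j in range(i + 1, len(hashes)):
            if diff_phash(hashes[i][1], hashes[j][1], hash_size=16) <= phash_threshold:
                near.append((hashes[i][0], hashes[j][0]))
    return exact, near

Identical-MD5 groups can be pruned mechanically, while the phash pairs deserve a visual check. Note the pairwise loop is O(n²); for large datasets, bucketing hashes by prefix or using a BK-tree before comparing is the usual next step.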
That is the whole of this article on building an image data cleaning and quality inspection tool with Python; I hope it proves useful in your own data pipelines.