Processing multiple files at the same time with Python multithreading
When you need to perform the same operation on a large number of files, processing them one by one is very time-consuming. In such cases, we can use Python's multithreading to greatly improve processing efficiency and reduce the overall processing time.
Background to the issue
For example, let's say we need to read all the video files in a folder and then process each video frame by frame.
Since frame-by-frame processing of a video is itself a relatively time-consuming task, handling each video file serially, one after another, is very inefficient. A simpler solution is to use Python's multithreading.
Defining a Generic Handler Function
Concurrency is well suited to processing similar tasks. For example, suppose we need to process every frame of a video, and the handler function accepts a list of video names and processes the videos in that list one by one. We can then build the following handler function:
import cv2

def func(video_names):
    for video_name in video_names:
        cap = cv2.VideoCapture(video_name)
        while True:
            ret, frame = cap.read()
            if ret:
                # process the current frame here
                pass
            else:
                break
        cap.release()
In this way, we only need to split the names of the videos to be processed into several sub-lists, one per thread, and then process the batches concurrently.
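As a concrete illustration, here is a minimal sketch of such a split. The helper split_into_chunks, the thread count NUM_THREADS, the folder name 'videos' and the '.mp4' filter are assumptions for the example, not part of the original code:

import os

def split_into_chunks(video_names, num_chunks):
    # split a list of video names into num_chunks roughly equal sub-lists
    chunk_size = (len(video_names) + num_chunks - 1) // num_chunks  # ceiling division
    return [video_names[i:i + chunk_size]
            for i in range(0, len(video_names), chunk_size)]

NUM_THREADS = 4  # assumed thread count
all_video_names = [name for name in os.listdir('videos') if name.endswith('.mp4')]
video_names_list = split_into_chunks(all_video_names, NUM_THREADS)
# video_names_list now has the form [part_names_1_list, ..., part_names_k_list]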
Multithreading with threading.Thread
Multithreading in Python is provided by the threading library. The multiprocessing module can also achieve concurrency, and there are certain differences between the two; here we use threading to process several different files concurrently.
import threading

# video_names_list = [part_names_1_list, part_names_2_list, ..., part_names_k_list]
for part_video in video_names_list:
    thread = threading.Thread(target=func, args=(part_video,))
    thread.start()
Here, the list of file names to be processed is first split into sublists, and then a separate thread is started to process each sublist.
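Putting the pieces together, a minimal end-to-end sketch could look like the following, assuming func and video_names_list are defined as above. Note that joining the threads is an addition to the original snippet; it simply makes the main program wait until every sublist has been processed:

import threading

threads = []
for part_video in video_names_list:
    t = threading.Thread(target=func, args=(part_video,))
    t.start()
    threads.append(t)

# wait for all worker threads to finish before the main program exits
for t in threads:
    t.join()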
Python multithreaded file manipulation
Here we use Python to write one million URLs from a CSV file into a MongoDB database, using multithreading.
The code is as follows:
import os
import threading
import csv
import time
import winsound
from Mongo_cache import MongoCache

NUM_THREAD = 5
COUNT = 0
lock = threading.Lock()
cache = MongoCache()  # database connection initialization

def worker():
    """func: read data from a csv file and return the data"""
    for path in os.listdir(os.getcwd()):
        #print("Current working directory", path)
        file_name = path.split('.')
        #print(file_name)
        if file_name[-1] == 'csv':
            #print("The address is:", path)
            file = open(path)
            data = csv.reader(file)
            return data
        else:
            pass

def save_info(data, i, num_retries=2):
    """func: save the data"""
    global COUNT
    global lock
    global cache
    for _, website in data:
        try:
            lock.acquire()
            #print("Thread {} is running".format(threading.current_thread().name))
            item = {'website': website}
            cache(item)
            COUNT += 1
        except:
            if num_retries > 0:
                save_info(data, i, num_retries - 1)
        finally:
            lock.release()

def main():
    """Start the worker threads"""
    print("start working")
    print("working...")
    data = worker()
    threads = []
    for i in range(NUM_THREAD):
        t = threading.Thread(target=save_info, args=(data, i))
        threads.append(t)
    for i in range(NUM_THREAD):
        threads[i].start()
    for i in range(NUM_THREAD):
        threads[i].join()
    print("all was done!")

if __name__ == '__main__':
    s_time = time.time()
    main()
    e_time = time.time()
    print("Total number of message entries:", COUNT)
    print("Total time consumed:", e_time - s_time)
    # The original script ended with a spoken announcement; the exact call was
    # garbled in the source and is kept here only as a comment:
    # speak = ('')
    # ("Good morning, eric, end of program!")
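The loop for _, website in data suggests that each row of the CSV has two columns: an index and a URL. For testing on a smaller scale, a file with that layout could be generated roughly as follows; the file name urls.csv and the example URLs are assumptions, not part of the original data:

import csv

# write a small two-column csv (index, url) matching the layout the script expects
with open('urls.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    for i in range(1000):
        writer.writerow([i, 'http://example.com/page/{}'.format(i)])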
Data Storage Module
import pickle
import zlib
from bson.binary import Binary
from datetime import datetime, timedelta
from pymongo import MongoClient
import time

class MongoCache(object):
    def __init__(self, client=None, expires=timedelta(days=30)):
        # the database/collection names below are placeholders; the originals were lost in formatting
        self.client = MongoClient('localhost', 27017) if client is None else client
        self.db = self.client.cache
        #self.db.webpage.create_index('timestamp', expireAfterSeconds=expires.total_seconds())  # set the automatic deletion time

    def __call__(self, url):
        self.db.webpage.insert(url)
        #print("Saved successfully")

    def __contains__(self, url):
        try:
            self[url]
        except KeyError:
            return False
        else:
            return True

    def __getitem__(self, url):
        record = self.db.webpage.find_one({'_id': url})
        if record:
            return pickle.loads(zlib.decompress(record['result']))
        else:
            raise KeyError(url + ' does not exist')

    def __setitem__(self, url, result):
        record = {'result': Binary(zlib.compress(pickle.dumps(result))),
                  'timestamp': datetime.utcnow()}
        self.db.webpage.update({'_id': url}, {'$set': record}, upsert=True)

    def clear(self):
        self.db.webpage.drop()
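Because MongoCache implements __setitem__, __getitem__ and __contains__, it can also be used on its own as a dictionary-like cache, independently of the threaded script above. A minimal usage sketch, where the example URL and stored value are assumptions:

cache = MongoCache()
cache['http://example.com'] = {'html': '<html>...</html>'}  # pickled, compressed and upserted
if 'http://example.com' in cache:
    print(cache['http://example.com'])  # prints the stored value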
The time taken to save the one million URLs from the CSV file to the database is:
start working
working...
all was done!
Total number of message entries: 1000000
Total time consumed: 427.4034459590912
Summary
The above is based on my personal experience. I hope it can serve as a reference for you, and I hope you will continue to support me.