1. MapReduce
The name decomposes into two stages: Map and Reduce.
Map stage: the input dataset is divided into small pieces that are processed by multiple Map tasks. Each Map task maps its share of the input data into a series of (key, value) pairs and produces intermediate results.
Reduce stage: the intermediate results are regrouped and sorted (shuffled) so that all intermediate results with the same key are passed to the same Reduce task. Each Reduce task merges and computes over the intermediate results that share a key and produces the final output.
For example, count the number of times each word appears in a long string.
from collections import defaultdict

def mapper(word):
    # Map step: emit a (word, 1) pair for each word
    return word, 1

def reducer(key_value_pair):
    # Reduce step: sum all counts collected for one key
    key, values = key_value_pair
    return key, sum(values)

def map_reduce_function(input_list, mapper, reducer):
    '''
    - input_list: list of words
    - mapper: mapping function that maps each element of the input list to a (key, value) pair
    - reducer: aggregation function that aggregates the values of each key into a single (key, value) pair
    - return: aggregated results
    '''
    map_results = map(mapper, input_list)
    # Shuffle step: group all values belonging to the same key
    shuffler = defaultdict(list)
    for key, value in map_results:
        shuffler[key].append(value)
    return map(reducer, shuffler.items())

if __name__ == "__main__":
    words = "python best language".split(" ")
    result = list(map_reduce_function(words, mapper, reducer))
    print(result)
The output is:
[('python', 1), ('best', 1), ('language', 1)]
This example does not show off the strengths of MapReduce; it only illustrates its operating principle.
2. Implementing MapReduce based on multithreading
from collections import defaultdict
import threading

class MapReduceThread(threading.Thread):
    def __init__(self, input_list, mapper, shuffler):
        super(MapReduceThread, self).__init__()
        self.input_list = input_list
        self.mapper = mapper
        self.shuffler = shuffler

    def run(self):
        # Map step for this thread's chunk, writing into the shared shuffler
        map_results = map(self.mapper, self.input_list)
        for key, value in map_results:
            self.shuffler[key].append(value)

def reducer(key_value_pair):
    key, values = key_value_pair
    return key, sum(values)

def mapper(word):
    return word, 1

def map_reduce_function(input_list, num_threads):
    shuffler = defaultdict(list)
    threads = []
    # Integer division, so an extra thread may be created when the list does not divide evenly
    chunk_size = len(input_list) // num_threads
    for i in range(0, len(input_list), chunk_size):
        chunk = input_list[i:i+chunk_size]
        thread = MapReduceThread(chunk, mapper, shuffler)
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()
    return map(reducer, shuffler.items())

if __name__ == "__main__":
    words = "python is the best language for programming and python is easy to learn".split(" ")
    result = list(map_reduce_function(words, num_threads=4))
    for i in result:
        print(i)
The essence here is exactly the same: split the word list into several chunks, hand each chunk to a separate thread for the Map step, and finally reduce the combined results. However, because of Python's GIL, threads run concurrently but not in parallel, so implementing MapReduce with threads in Python brings no real speedup for CPU-bound work. (The GIL allows only one thread to execute Python bytecode at a time, even on a multi-core CPU.)
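The limitation is easy to observe. Here is a minimal sketch, not part of the original article (the work size of 2,000,000 loop iterations is an arbitrary choice), that runs a CPU-bound function four times sequentially and then in four threads; on CPython the two timings typically come out about the same, because the threads take turns holding the GIL instead of running on separate cores.

import threading
import time

def cpu_bound(n=2_000_000):
    # Pure-Python arithmetic loop, so it is limited by the interpreter and the GIL
    total = 0
    for i in range(n):
        total += i * i
    return total

def run_sequential(tasks=4):
    start = time.perf_counter()
    for _ in range(tasks):
        cpu_bound()
    return time.perf_counter() - start

def run_threaded(tasks=4):
    start = time.perf_counter()
    threads = [threading.Thread(target=cpu_bound) for _ in range(tasks)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

if __name__ == "__main__":
    print(f"sequential: {run_sequential():.2f}s")
    print(f"threaded:   {run_threaded():.2f}s")  # expect roughly the same time under the GIL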
3. Implementing MapReduce based on multi-process
Because of Python's GIL, threads cannot achieve true parallelism. There are two ways around this: one is to use another language, such as C, which we will not consider here; the other is to use multiple processes, which can take advantage of multiple CPU cores.
from collections import defaultdict
import multiprocessing

def mapper(chunk):
    # Map step: count the words in one chunk (a list of words)
    word_count = defaultdict(int)
    for word in chunk:
        word_count[word] += 1
    return word_count

def reducer(word_counts):
    # Reduce step: merge the per-chunk counts into one dictionary
    result = defaultdict(int)
    for word_count in word_counts:
        for word, count in word_count.items():
            result[word] += count
    return result

def chunks(lst, n):
    # Yield successive n-sized chunks from lst
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

def map_reduce_function(text, num_processes):
    # Split into words first so that no word is cut in half at a chunk boundary
    words = text.split()
    chunk_size = (len(words) + num_processes - 1) // num_processes
    chunks_list = list(chunks(words, chunk_size))
    with multiprocessing.Pool(processes=num_processes) as pool:
        word_counts = pool.map(mapper, chunks_list)
    result = reducer(word_counts)
    return result

if __name__ == "__main__":
    text = "python is the best language for programming and python is easy to learn"
    num_processes = 4
    result = map_reduce_function(text, num_processes)
    for i in result:
        print(i, result[i])
Using multiple processes gives true parallelism: the data is still split into chunks, and the chunks are processed in parallel, which is where the efficiency of MapReduce comes from. In this example the difference may be negligible because the data set is tiny. In practice, if the data set is too small, MapReduce may bring no benefit at all, and the extra overhead can even degrade performance.
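One rough way to check whether the parallelism pays off is to time a plain sequential count against a pooled version on the same input. The sketch below is an illustration, not part of the original article; it uses collections.Counter instead of the hand-written mapper and reducer above, and on a string this small the sequential count usually wins because starting the processes and pickling the chunks costs more than the counting itself.

import time
import multiprocessing
from collections import Counter

def count_words(words):
    # Map step for one chunk: count the words in this chunk
    return Counter(words)

if __name__ == "__main__":
    text = "python is the best language for programming and python is easy to learn"
    words = text.split()

    # Sequential baseline
    start = time.perf_counter()
    sequential = Counter(words)
    print(f"sequential:      {time.perf_counter() - start:.6f}s")

    # Multiprocessing version: split the word list into 4 chunks, count each in its own process
    start = time.perf_counter()
    chunk_size = (len(words) + 3) // 4
    word_chunks = [words[i:i + chunk_size] for i in range(0, len(words), chunk_size)]
    with multiprocessing.Pool(processes=4) as pool:
        partial_counts = pool.map(count_words, word_chunks)
    parallel = sum(partial_counts, Counter())  # Reduce step: merge the partial counts
    print(f"multiprocessing: {time.perf_counter() - start:.6f}s")

    assert sequential == parallel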
4. Searching for data in a 100GB file
The idea of MapReduce is still used here, but two problems arise:
Slow reading speed
Solution:
Read the file in chunks. The chunks should not be too small: each chunk is serialized when it is sent to a worker process and deserialized inside that process, so many tiny chunks mean a lot of time spent on repeated serialization and deserialization. They should not be too large either, because then fewer tasks are created and the CPU's multiple cores may not be fully utilized.
Memory consumption is particularly high
Solution:
Use a generator. Instead of loading the whole file, the generator reads one chunk at a time and yields it; for example, if the file is split into 8 chunks, only one chunk needs to be read into memory at any moment, which avoids reading too much into memory at once.
from datetime import datetime
import multiprocessing

def chunked_file_reader(file_path: str, chunk_size: int):
    """Generator function: read the file content in chunks
    - file_path: file path
    - chunk_size: chunk size in characters
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        while True:
            chunk = file.read(chunk_size)
            if not chunk:
                break
            yield chunk

def search_in_chunk(chunk: str, keyword: str):
    """Search for the keyword in one file chunk
    - chunk: file chunk
    - keyword: keyword to search for
    """
    lines = chunk.split('\n')
    for line in lines:
        if keyword in line:
            print("Found:", line)

def search_in_file(file_path: str, keyword: str, chunk_size=1024*1024):
    """Search for the keyword in a file
    - file_path: file path
    - keyword: keyword to search for
    - chunk_size: file chunk size, default 1MB
    """
    with multiprocessing.Pool() as pool:
        for chunk in chunked_file_reader(file_path, chunk_size):
            pool.apply_async(search_in_chunk, args=(chunk, keyword))
        # Wait for all submitted tasks to finish before the pool is torn down
        pool.close()
        pool.join()

if __name__ == "__main__":
    start = datetime.now()
    file_path = ""
    keyword = "Zhang San"
    search_in_file(file_path, keyword)
    end = datetime.now()
    print(f"Search completed, time consuming {end - start}")
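One detail worth noting in the version above: pool.close() and pool.join() are called before the with block ends. Exiting the with block calls terminate() on the pool, so without the explicit close and join the worker processes could be killed while some chunks are still being searched.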
The final program runs in about two and a half minutes.
This concludes the example code for implementing MapReduce in Python. I hope you find it useful.