SoFunction
Updated on 2025-03-02

Sample code to implement MapReduce using Python

1. MapReduce

The name breaks down into two stages, Map and Reduce.

  • Map stage: The input dataset is divided into small pieces that are processed by multiple Map tasks. Each Map task maps its input data into a series of (key, value) pairs, which form the intermediate results.

  • Reduce stage: The intermediate results are regrouped and sorted so that all intermediate results with the same key are passed to the same Reduce task. Each Reduce task merges and aggregates the intermediate results that share a key and generates the final output.

For example, count the number of times each word appears in a long string.

from collections import defaultdict
def mapper(word):
    return word, 1
 
def reducer(key_value_pair):
    key, values = key_value_pair
    return key, sum(values)
def map_reduce_function(input_list, mapper, reducer):
    '''
     - input_list: list of words
     - mapper: Mapping function that maps each element in the input list to a key-value pair
     - reducer: Aggregation function that aggregates each key-value pair in the mapping result into a key-value pair
     - return: Aggregation results
     '''
    map_results = map(mapper, input_list)
    shuffler = defaultdict(list)
    for key, value in map_results:
        shuffler[key].append(value)
    return map(reducer, shuffler.items())
 
if __name__ == "__main__":
    words = "python best language".split(" ")
    result = list(map_reduce_function(words, mapper, reducer))
    print(result)

The output result is

[('python', 1), ('best', 1), ('language', 1)]

This version runs serially, so it does not show the main characteristic of MapReduce, which is processing the pieces in parallel. It only illustrates the operating principle.
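
To make the operating principle easier to follow, here is a minimal sketch that returns the intermediate data of each phase instead of only the final result. The function name `trace_phases` is made up for this illustration and is not part of the example above.

```python
from collections import defaultdict

def trace_phases(words):
    """Return the data produced by the map, shuffle and reduce phases."""
    # Map: emit one (word, 1) pair per input word
    mapped = [(word, 1) for word in words]
    # Shuffle: group all values that share the same key
    grouped = defaultdict(list)
    for key, value in mapped:
        grouped[key].append(value)
    # Reduce: collapse each group of values into a single count
    reduced = {key: sum(values) for key, values in grouped.items()}
    return mapped, dict(grouped), reduced

if __name__ == "__main__":
    mapped, grouped, reduced = trace_phases("a b a".split())
    print(mapped)   # [('a', 1), ('b', 1), ('a', 1)]
    print(grouped)  # {'a': [1, 1], 'b': [1]}
    print(reduced)  # {'a': 2, 'b': 1}
```

The shuffle step is the part that a distributed framework performs between machines; here it is just a dictionary of lists.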

2. Implementing MapReduce based on multithreading

from collections import defaultdict
import threading
 
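class MapReduceThread(threading.Thread):
    def __init__(self, input_list, mapper, shuffler):
        super(MapReduceThread, self).__init__()
        self.input_list = input_list
        self.mapper = mapper
        self.shuffler = shuffler  # shared by all threads

    def run(self):
        # Map this thread's chunk and add the results to the shared shuffler
        map_results = map(self.mapper, self.input_list)
        for key, value in map_results:
            self.shuffler[key].append(value)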
 
def reducer(key_value_pair):
    key, values = key_value_pair
    return key, sum(values)
def mapper(word):
    return word, 1
def map_reduce_function(input_list, num_threads):
    shuffler = defaultdict(list)
    threads = []
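    # Round up so the input is split into at most num_threads chunks
    chunk_size = max(1, (len(input_list) + num_threads - 1) // num_threads)

    for i in range(0, len(input_list), chunk_size):
        chunk = input_list[i:i+chunk_size]
        thread = MapReduceThread(chunk, mapper, shuffler)
        thread.start()
        threads.append(thread)

    # Wait for every Map thread to finish before reducing
    for thread in threads:
        thread.join()

    return map(reducer, shuffler.items())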
 
if __name__ == "__main__":
    words = "python is the best language for programming and python is easy to learn".split(" ")
    result = list(map_reduce_function(words, num_threads=4))
    for i in result:
        print(i)

The essence here is exactly the same. The word list is split into four chunks, each chunk is handed to a different thread, and the results are reduced at the end. However, because of the GIL (global interpreter lock) in standard CPython, only one thread executes Python bytecode at a time. Threads therefore run concurrently, taking turns, but not in parallel, so for CPU-bound work like this, implementing MapReduce with threads brings no speedup.
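
The effect of the GIL can be checked with a rough timing sketch like the one below. It is an assumption-laden illustration, not a benchmark: the helper names (`count_down`, `run_serial`, `run_threaded`, `timed`) and the job size are made up, and the numbers depend on the machine and the interpreter build. On a standard CPython build the threaded run is usually not faster than the serial one for this kind of pure-Python loop.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def count_down(n):
    # Pure-Python CPU-bound loop; the running thread holds the GIL
    while n > 0:
        n -= 1
    return n

def run_serial(jobs):
    # Run every job one after another in the main thread
    return [count_down(n) for n in jobs]

def run_threaded(jobs, num_threads=4):
    # Run the same jobs on a pool of threads
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        return list(executor.map(count_down, jobs))

def timed(func, *args):
    # Return the function result together with the elapsed wall-clock time
    start = time.perf_counter()
    result = func(*args)
    return result, time.perf_counter() - start

if __name__ == "__main__":
    jobs = [2_000_000] * 4
    _, serial_time = timed(run_serial, jobs)
    _, threaded_time = timed(run_threaded, jobs)
    print(f"serial:   {serial_time:.2f}s")
    print(f"threaded: {threaded_time:.2f}s")
```

Threads are still useful for I/O-bound tasks, where a thread spends most of its time waiting rather than executing bytecode.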

3. Implementing MapReduce based on multi-process

Because of the GIL, threads in Python cannot achieve true parallelism. There are two solutions. One is to use another language, such as C, which we will not consider here; the other is to use multiple processes, which the operating system can schedule on different CPU cores.

from collections import defaultdict
import multiprocessing
 
def mapper(chunk):
    word_count = defaultdict(int)
    for word in chunk.split():
        word_count[word] += 1
    return word_count
 
def reducer(word_counts):
    result = defaultdict(int)
    for word_count in word_counts:
        for word, count in word_count.items():
            result[word] += count
    return result
 
def chunks(lst, n):
    for i in range(0, len(lst), n):
        yield lst[i:i + n]
 
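def map_reduce_function(text, num_processes):
    # Chunk on word boundaries so that no word is cut in half between two chunks
    words = text.split()
    chunk_size = max(1, (len(words) + num_processes - 1) // num_processes)
    chunks_list = [" ".join(chunk) for chunk in chunks(words, chunk_size)]

    # Map: each process counts the words of one chunk
    with multiprocessing.Pool(processes=num_processes) as pool:
        word_counts = pool.map(mapper, chunks_list)

    # Reduce: merge the per-chunk counts
    result = reducer(word_counts)
    return result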
 
if __name__ == "__main__":
    text = "python is the best language for programming and python is easy to learn"
    num_processes = 4
    result = map_reduce_function(text, num_processes)
    for i in result:
        print(i, result[i])

Here multiple processes implement MapReduce, which gives true parallelism. The data is still split into chunks, but the chunks are now processed in parallel, and only in this way does the efficiency of MapReduce show. In this example the difference will hardly be noticeable because the amount of data is so small. In practice, if the dataset is too small, this approach may bring no benefit at all: creating processes and passing data between them adds overhead and can even make the program slower.
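
One way to act on this is to fall back to a plain serial count when the input is small. The sketch below assumes a hypothetical threshold, `min_parallel_size`; its default value is only illustrative and would have to be measured for a real workload. The names `count_chunk` and `count_words` are likewise made up for this example.

```python
from collections import Counter
import multiprocessing

def count_chunk(words):
    # Map step: count the words of one chunk
    return Counter(words)

def count_words(words, num_processes=4, min_parallel_size=100_000):
    # Small input: process start-up and data transfer would cost more
    # than the counting itself, so stay in the current process.
    if len(words) < min_parallel_size:
        return Counter(words)

    # Large input: split into at most num_processes chunks
    chunk_size = max(1, (len(words) + num_processes - 1) // num_processes)
    chunks = [words[i:i + chunk_size] for i in range(0, len(words), chunk_size)]
    with multiprocessing.Pool(processes=num_processes) as pool:
        partial_counts = pool.map(count_chunk, chunks)

    # Reduce step: merge the per-chunk counters
    total = Counter()
    for partial in partial_counts:
        total.update(partial)
    return total

if __name__ == "__main__":
    text = "python is the best language for programming and python is easy to learn"
    print(count_words(text.split()))
```

With the short sentence used in this article the serial branch is taken, which is the point: the result is the same either way, and only the cost differs.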

4. Search data in a 100GB file

The MapReduce idea still applies here, but there are two problems:

Slow reading speed

Solution:

Read the file in chunks, and choose the chunk size carefully. The chunks should not be too small: each chunk is serialized when it is handed to a worker process and deserialized inside it, so many tiny chunks spend a lot of time on repeated serialization and deserialization. They should not be too large either: fewer tasks are created that way, and the CPU's cores may not all be kept busy.

Memory consumption is particularly high

Solution:

Use generators and iterators so that data is read only when it is needed. For example, if the file is split into 8 chunks, the generator reads one chunk at a time and yields it, then the next, and so on, which avoids loading the whole file into memory at once.

from datetime import datetime
import multiprocessing
def chunked_file_reader(file_path:str, chunk_size:int):
    """
     Generator function: read file content in chunks
     - file_path: file path
     - chunk_size: chunk size in characters (the file is opened in text mode)
     """
    with open(file_path, 'r', encoding='utf-8') as file:
        while True:
            chunk = file.read(chunk_size)
            if not chunk:
                break
            yield chunk
 
def search_in_chunk(chunk:str, keyword:str):
    """Search for keywords in file blocks
     - chunk: File blocks
     - keyword: keywords to search
     """
    lines = chunk.split('\n')
    for line in lines:
        if keyword in line:
            print(f"Finished:", line)
 
def search_in_file(file_path:str, keyword:str, chunk_size=1024*1024):
    """Search for keywords in files
     file_path: file path
     keyword: keywords to search
     chunk_size: file chunk size, 1MB
     """
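    with multiprocessing.Pool() as pool:
        for chunk in chunked_file_reader(file_path, chunk_size):
            pool.apply_async(search_in_chunk, args=(chunk, keyword))
        # Leaving the with block terminates the pool, so wait for queued tasks first
        pool.close()
        pool.join()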
        
if __name__ == "__main__":
    start = datetime.now()
    file_path = ""
    keyword = "Zhang San"
    search_in_file(file_path, keyword)
    end = datetime.now()
    print(f"Search completed, time elapsed: {end - start}")

The final program run time is about two and a half minutes.
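
One detail the chunked reader above does not handle is a line that straddles two chunks: the keyword may then be cut in half and missed. A possible fix, sketched here under the assumption that lines are much shorter than a chunk, is to yield only complete lines and carry the unfinished tail over to the next chunk. The name `line_aligned_chunks` is made up for this illustration.

```python
def line_aligned_chunks(file_path, chunk_size=1024 * 1024):
    """Yield chunks of roughly chunk_size characters that end on a line break."""
    with open(file_path, "r", encoding="utf-8") as file:
        leftover = ""  # unfinished line carried over from the previous read
        while True:
            block = file.read(chunk_size)
            if not block:
                break
            block = leftover + block
            cut = block.rfind("\n")
            if cut == -1:
                # No line break yet: keep accumulating until one appears
                leftover = block
                continue
            # Yield everything up to and including the last line break
            yield block[:cut + 1]
            leftover = block[cut + 1:]
        # The file may end without a trailing line break
        if leftover:
            yield leftover
```

It can be used in place of chunked_file_reader in search_in_file; the chunks then vary slightly in size, but every line reaches exactly one worker in full.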
