SoFunction
Updated on 2025-04-14

Implementing chunked upload and resumable transfer of large files in Python

Summary

This article implements chunked (sliced) upload of large files in Python, with support for resumable transfer. The work is divided into four functional modules: querying the file upload status, querying the temporary folder status, uploading slices, and merging slices. The design and the source code of each module are described below.

Overall architecture and workflow

1. After the front end receives the file selected by the user, it performs the following operations:

  • Calculate the file hash: compute a hash value of the file with a fixed algorithm.
  • Pass parameters to the backend: send the file path (including the file name), the target machine id, the file hash, and the user id to the backend file status interface get_file_upload_status.
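
The hash calculation in the first step can be sketched as follows. This is only an illustration in Python: the article does not say which algorithm the front end uses, so MD5 and the 1 MB read size are assumptions, and `file_md5` is a hypothetical helper name.

```python
import hashlib


def file_md5(path, read_size=1024 * 1024):
    """Return the hex MD5 digest of a file, reading it in fixed-size pieces."""
    digest = hashlib.md5()
    with open(path, "rb") as f:
        # iter() with a sentinel stops once read() returns b"" at end of file
        for piece in iter(lambda: f.read(read_size), b""):
            digest.update(piece)
    return digest.hexdigest()
```

Reading in pieces keeps memory use constant, which matters for the multi-gigabyte files this workflow targets.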

After the backend receives the data from the front end, it processes it as follows:

  • Check whether the file already exists: if the file already exists on the target machine, report this to the front end and let the user choose whether to overwrite it.
  • Check upload records in Redis: if the file does not exist, check whether Redis holds a record showing that the file is currently being uploaded.

  • No upload record: store the upload status in Redis using the String data type. The key is "machine_id + file path" (for example, 118/tmp/) and the value is a custom dictionary:
{
'hash_code': file_hash_value,
'person_id': user_id
}
  • Existing upload record: if Redis already holds a record for the file and it belongs to the same user, the upload is resumed from the breakpoint; any other user is blocked from uploading the same file.
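
The decision described above can be sketched without a Redis server. In this illustration a plain dict stands in for Redis, the value is serialized with JSON rather than the `str(dict)` form used in the listing later in the article, and `check_upload_status` and its return values are hypothetical names.

```python
import json


def check_upload_status(store, host_id, full_path, hash_code, person_id):
    """Decide whether an upload may start, may resume, or must be rejected.

    `store` is a stand-in for Redis: any dict-like object mapping str to str.
    Returns "new", "resume", or "blocked".
    """
    key = str(host_id) + full_path
    raw = store.get(key)
    if raw is None:
        # No upload in progress: record who is uploading which content
        store[key] = json.dumps({"hash_code": hash_code, "person_id": person_id})
        return "new"
    record = json.loads(raw)
    # The same user may continue; anyone else is blocked
    return "resume" if record["person_id"] == person_id else "blocked"
```

With a real Redis connection the check and the write should happen atomically, for example with redis-py's `set(key, value, nx=True)`, otherwise two users could both see "no record" at the same moment.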

2. Once the file status check passes, the front end calls the get_file_dir_info interface, passing host_id, user_id, and the file path (including the file name). The get_file_dir_info interface does three things:

  • Check the temporary folder: determine whether the temporary folder used to store the slices exists. Its full path is built as "file path + : + user id", for example "/tmp/:12". If the folder does not exist, it is created. After creation, the temporary folder path is hashed and stored in Redis, with the hash as the key and the full folder path as the value, so that the folder can be located quickly when it is deleted later.
  • Check the upload record file: check whether the slice upload record file in the temporary folder already exists. If it exists, read its contents; if not, initialize it. The record file holds the fields of the following object:
class FileObject():
    def __init__(self, name, size, chunk, path=None, num=0):
        self.name = name    # file name
        self.size = size    # size uploaded so far
        self.chunk = chunk  # number of blocks
        self.path = path    # path
        self.num = num      # number of blocks in the folder (redundant by design)
  • Place the merge script: put the merge script in the temporary folder so that it can be used in the later merge step.
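
The record file is a small text file with one "key:value" pair per line (name, size, chunk), as the listings later in the article show. A minimal sketch of writing and reading that format follows; `dump_record` and `load_record` are hypothetical helper names, not functions from the original code.

```python
def dump_record(name, size, chunk):
    """Serialize the record as three 'key:value' lines."""
    return f"name:{name}\nsize:{size}\nchunk:{chunk}\n"


def load_record(text):
    """Parse the 'key:value' lines back into a dict with integer size and chunk."""
    record = {}
    for line in text.splitlines():
        # Split on the first colon only, so a colon inside the file name survives
        title, value = line.split(":", 1)
        record[title] = int(value) if title in ("size", "chunk") else value
    return record
```

Splitting on the first colon only is a deliberate choice here, since a file name may itself contain a colon.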

Finally, the interface lists the temporary folder to obtain the numbers of the blocks that have already been uploaded (this is what makes resumable transfer possible), and returns to the front end the record file information, the hash of the temporary folder path, and the list of uploaded blocks.
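
The two values returned here can be sketched as follows. The block file naming "person_id-short_name:block_number" is taken from the upload listing later in the article; the helper names `dir_hashcode` and `uploaded_chunks` are hypothetical.

```python
import hashlib


def dir_hashcode(tmp_dir):
    """MD5 of the temporary folder path, used as the Redis key for that folder."""
    return hashlib.md5(tmp_dir.encode("utf-8")).hexdigest()


def uploaded_chunks(file_names):
    """Extract block numbers from names of the form '<person_id>-<short_name>:<n>'."""
    chunks = []
    for name in file_names:
        parts = name.rsplit(":", 1)
        # Names without a numeric suffix (record file, merge script) are skipped
        if len(parts) == 2 and parts[1].isdecimal():
            chunks.append(int(parts[1]))
    return sorted(chunks)
```

The front end can compare the returned list with the full range of block numbers to decide which blocks still have to be sent.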

3. The front end slices the file into 10 MB blocks and calls file_chunk_upload, either resuming according to the information returned by the interface above (file name, list of uploaded blocks) or uploading from the start. Using the list, blocks that are already uploaded are skipped and blocks that were lost for any reason are sent again.

4. After the upload is completed, the front end calls get_file_dir_info again to obtain the number and list of blocks now in the folder, and compares them with its own chunks. If they match, it calls the merge interface.

5. When the front end sends the merge request, the bigfile_merge interface is called with the user id, the number of file blocks, the file path, and the temporary folder path. After receiving the request, the backend performs the following checks and operations:

  • Parameter verification: check that the incoming path and the number of file blocks are correct, and start merging only if they are.
  • Merge method selection: merging has two execution paths. First check whether the target machine can run scripts. If it can, run the script that was placed in the temporary folder to perform the merge; if not, the backend merges the blocks by operating on the target machine's files directly.
  • Resume check: before merging, check whether the target file already exists. If it does, this is a resumed merge, and the size and chunk values in the record file are read to determine how much of the file is valid and from which block to continue. If the actual file size exceeds the recorded value, the previous merge was interrupted partway through a block; the file pointer is moved back to the recorded position and merging continues from the block after the recorded one.
  • Record update: each time a block is merged successfully, the new size and chunk values are written to the record file.
  • Final cleanup: after the merge is complete, delete the temporary folder and remove the upload status records for the file from Redis.
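
The resume check and record update can be illustrated on the local file system. This is a sketch under stated assumptions, not the article's remote implementation: `merge_chunks` is a hypothetical helper, the record is a plain dict, and the truncate call is an extra precaution that the original code does not perform (it only moves the file pointer).

```python
import os


def merge_chunks(target, chunk_paths, record):
    """Append chunks to `target`, resuming from `record`.

    `chunk_paths` maps 1-based block numbers to chunk file paths.
    `record` holds 'size' (bytes merged) and 'chunk' (last block merged);
    it is updated after every block so an interrupted merge can be resumed.
    """
    # Create the target if needed, then open it for in-place writing
    if not os.path.exists(target):
        open(target, "wb").close()
    with open(target, "rb+") as out:
        # Drop any bytes written after the last recorded block, then continue there
        out.truncate(record["size"])
        out.seek(record["size"])
        for number in range(record["chunk"] + 1, len(chunk_paths) + 1):
            with open(chunk_paths[number], "rb") as src:
                data = src.read()
            out.write(data)
            record["size"] += len(data)
            record["chunk"] = number
    return record
```

Because the record is only advanced after a block has been written completely, a crash in the middle of a block leaves the record pointing at the last good position.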

This completes one file upload.
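
Returning to step 3 of the workflow, the slicing and skip logic can be sketched as follows. In the article this runs in the front end; the Python version below is only an illustration, with block numbers starting from 1 as in the merge code, and `missing_chunks` and `read_chunk` as hypothetical helper names.

```python
CHUNK_SIZE = 10 * 1024 * 1024  # 10 MB, as in the text


def missing_chunks(total_size, uploaded, chunk_size=CHUNK_SIZE):
    """Return the 1-based block numbers that still have to be sent."""
    total = (total_size + chunk_size - 1) // chunk_size  # ceiling division
    done = set(uploaded)
    return [n for n in range(1, total + 1) if n not in done]


def read_chunk(path, number, chunk_size=CHUNK_SIZE):
    """Read block `number` (1-based) of the file at `path`."""
    with open(path, "rb") as f:
        f.seek((number - 1) * chunk_size)
        return f.read(chunk_size)
```

For a 25 MB file with blocks 1 and 3 already on the server, `missing_chunks` returns only block 2, so a resumed upload sends just the block that was lost.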

Technical details

  • API

File upload status interface

get_file_upload_status checks whether the file to be uploaded already exists and whether the request is a resumed upload.

import ast
import hashlib
import logging
import os
import time

from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView

# Assumption: a module-level logger; the logging calls in the source listing lost their receiver
logger = logging.getLogger(__name__)


class FileUploadStatus(APIView):

    def post(self, request, *args, **kwargs):
        # First check whether the file exists at the target path;
        # if it does, ask whether it should be overwritten
        try:
            forms = request.data
            host_id = forms['host_id']
            full_path = forms['full_path']
            hash_code = forms['hash_code']
            person_id = forms['person_id']
        except Exception as e:
            logger.error("Failed to parse front-end parameters: %s", repr(e))
            return Response({'code': -1, 'data': "", 'msg': "Invalid request parameters"}, status=status.HTTP_400_BAD_REQUEST)
        try:
            # Get a connection to the target machine (paramiko here; other methods work too)
            err, tmp_sftp = RemoteBigFileUploadHandler.get_handle(host_id)
            if err:
                return Response({'code': -1, 'data': host_id, 'msg': "Error connecting to the target host"}, status=status.HTTP_400_BAD_REQUEST)
        except Exception as e:
            logger.error("Connecting to the target machine failed: %s", repr(e))
            return Response({'code': -1, 'data': host_id, 'msg': "Error connecting to the target host"}, status=status.HTTP_400_BAD_REQUEST)

        try:
            # Check whether the file exists on the target machine
            tmp_sftp._sftp.stat(full_path)
            return Response({'code': -1, 'data': full_path, 'msg': "The file already exists; confirm whether to overwrite it"}, status=status.HTTP_400_BAD_REQUEST)
        except IOError:
            # The file does not exist. Next check Redis, so that the same file is not
            # uploaded again while an earlier upload has not been merged yet.
            # If there is no record, store one and tell the front end to upload.
            try:
                r = get_redis()  # returns a Redis connection; implement as needed
            except Exception as e:
                logger.error("Redis acquisition failed: %s", repr(e))
                return Response({'code': -1, 'data': "", 'msg': "Redis acquisition failed"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
            # Key example: 18115/app/home/
            key = str(host_id) + full_path
            data = r.get(key)
            if not data:
                # Store hash_code and person_id under the key
                file_dict = {'hash_code': hash_code, 'person_id': person_id}
                r.set(key, str(file_dict))
                return Response({'code': 1, 'data': full_path, 'msg': "The file does not exist, can be uploaded"}, status=status.HTTP_200_OK)
            else:
                # A record exists. The host_id + path key blocks other users from uploading
                # the same file, but it would also block the same user's resumed upload,
                # so person_id is stored in the value and used to let the same user through.
                try:
                    # Convert the stored string back into a dict (literal_eval instead of eval)
                    retrieved_dict = ast.literal_eval(data.decode())
                except Exception as e:
                    logger.error("Failed to decode the Redis record of the target file: %s", repr(e))
                    return Response({'code': -1, 'data': full_path, 'msg': "Invalid upload record"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
                if person_id == retrieved_dict['person_id']:
                    return Response({'code': 1, 'data': full_path, 'msg': "Resumable upload check passed"}, status=status.HTTP_200_OK)
                else:
                    return Response({'code': 2, 'data': full_path, 'msg': "This file is being uploaded, please do not operate it multiple times"}, status=status.HTTP_200_OK)

Temporary folder status interface

get_file_dir_info returns the status of the temporary folder.

class GetFileDirInfo(APIView):

    def post(self, request, *args, **kwargs):
        forms = request.data
        host_id = forms['host_id']
        person_id = forms['person_id']
        full_path = forms['full_path']
        # Get a connection to the target machine (paramiko here; other methods work too)
        err, tmp_sftp = RemoteBigFileUploadHandler.get_handle(host_id)
        if err:
            return Response({'code': -1, 'data': host_id, 'msg': "Error connecting to the target host"}, status=status.HTTP_400_BAD_REQUEST)
        # Split directory and file name
        # Example: /home/app
        file_path, file_name = os.path.split(full_path)
        if not file_path.endswith("/"):
            file_path += "/"
        # Split base name and extension
        # Example: 2-2.jpg
        short_name, ext = os.path.splitext(file_name)
        # Build the name of the folder that stores the temporary files
        # Example: /home/app/:12/
        tmp_dir = full_path + ":" + str(person_id)
        # Hash the temporary folder path. Redis stores hash -> path, which adds
        # one more check before the folder is deleted later.
        tmp = hashlib.md5()
        tmp.update(tmp_dir.encode('utf-8'))
        hash_value = tmp.hexdigest()

        if not tmp_dir.endswith("/"):
            tmp_dir += "/"
        try:
            # Check whether the temporary folder exists
            tmp_sftp._sftp.stat(tmp_dir)
        except Exception:
            try:
                print('Create temporary folder')
                tmp_sftp._sftp.mkdir(tmp_dir)
                r = get_redis()  # get the Redis connection
                r.set(hash_value, tmp_dir)
            except Exception as e:
                logger.error("Creating temporary folder failed: %s", e)
                tmp_sftp._sftp.close()
                return Response({'code': -1, 'data': "", 'msg': "Creating temporary folder failed"}, status=status.HTTP_400_BAD_REQUEST)
        # The temporary folder exists; read the transfer record txt in it
        # Initialize the record file object
        fobject = FileObject(file_name, 0, 0, tmp_dir, 0)
        # Example: /home/app/:12/
        record_file = tmp_dir + short_name + ".txt"
        try:
            # Check whether the record file exists
            tmp_sftp._sftp.stat(record_file)
            tmp_load_file = tmp_sftp._sftp.open(record_file, 'r')
            # The record file exists: read the saved uploaded size and number of blocks
            for line in tmp_load_file.readlines():
                # Split on the first colon only, in case the file name contains one
                title, value = line.replace('\n', '').split(':', 1)
                print(title + ":" + value)
                if title == 'name':
                    fobject.name = value
                elif title == 'size':
                    fobject.size = int(value)
                elif title == 'chunk':
                    fobject.chunk = int(value)
            tmp_load_file.close()
        except IOError:
            print("The record file does not exist, create a new record file and initialize it")
            store_f = tmp_sftp._sftp.open(record_file, 'w+')
            store_f.writelines(['name:' + fobject.name + '\n', 'size:' + str(fobject.size) + '\n',
                                'chunk:' + str(fobject.chunk) + '\n'])
            print("Create record file successfully")
            store_f.close()

            serializer = RecordFileSerializer(fobject)
            data = serializer.data
            data['dir_hashcode'] = hash_value

            # Upload the script that supports asynchronous merging.
            # The script file name ('' below) is missing from the source listing.
            command = 'command -v python'
            stdin, stdout, stderr = tmp_sftp._ssh.exec_command(command)
            output = stdout.read().decode().strip()
            try:
                print('Check whether the merge script is already there')
                tmp_sftp._sftp.stat(tmp_dir + '')
            except Exception as e:
                if output:
                    print('Target machine has Python installed')
                    current_path = os.path.dirname(os.path.abspath(__file__))
                    if not current_path.endswith("/"):
                        current_path += "/"
                    try:
                        # Send the merge script
                        print('The address that needs to be transferred', current_path + '')
                        print('Destination Address', tmp_dir + '')
                        tmp_sftp._sftp.put(current_path + '', tmp_dir + '')
                    except Exception as e:
                        logger.error("Send local merge file failed: %s", e)
                        tmp_sftp._sftp.close()
                        return Response({'code': -1, 'data': "", 'msg': "Send local merge file failed"}, status=status.HTTP_400_BAD_REQUEST)
                else:
                    tmp_sftp._sftp.close()
                    return Response({'code': -1, 'data': "", 'msg': "The target machine does not have Python installed"}, status=status.HTTP_400_BAD_REQUEST)
            tmp_sftp._sftp.close()
            return Response({'code': 0, 'data': data, 'msg': "Create a new record file"}, status=status.HTTP_200_OK)
        
        file_list = tmp_sftp._sftp.listdir(tmp_dir)
        # The folder also holds files that are not blocks (the txt record file; the code
        # subtracts 2, presumably to account for the merge script as well)
        fobject.num = len(file_list) - 2
        chunk_list = []
        for file in file_list:
            parts = file.rsplit(":", 1)  # split from the right into at most two parts at the colon
            if len(parts) == 2:
                last_part = parts[1]  # the last part is the block number
                try:
                    chunk_list.append(int(last_part))
                except Exception as e:
                    continue
                print("Last value:", last_part)
            else:
                print("No colon found, skip")
        serializer = RecordFileSerializer(fobject)
        data = serializer.data
        data['dir_hashcode'] = hash_value
        data['chunk_list'] = chunk_list
        return Response({'code': 0, 'data': data, 'msg': "Record file already exists"}, status=status.HTTP_200_OK)

Slice upload

file_chunk_upload uploads a single slice.

class FileChunkUploadHandler():

    def __init__(self, redis_host, request=None, file_info=None, chunk_num=0, people_id=0):
        self.request = request
        # The accessor calls on the following lines were lost in the source listing;
        # DRF's request.data (form fields and uploaded files) is assumed here.
        # Data block content
        self.file_info = request.data.get('file_info')
        # Number of this data block
        self.chunk_num = request.data.get('chunk_num')
        self.full_path = request.data.get('full_path')
        self.person_id = request.data.get('person_id')
        self.redis_host = redis_host

    def upload_file(self):
        print("File block %s starts transfer" % self.chunk_num)
        # The source listing lost a call that wrapped this value; the raw path is used here
        full_path = self.full_path
        file_path, file_name = os.path.split(full_path)
        host_id = self.request.query_params.get('host_id', None)
        err, tmp_sftp = RemoteBigFileUploadHandler.get_handle(host_id)
        # Split base name and extension, for example 2-2 and .jpg
        short_name, ext = os.path.splitext(file_name)
        if not file_path.endswith("/"):
            file_path += "/"
        # Folder holding the temporary files. It was created by the temporary folder
        # status interface; the same naming format is used here.
        # Example: /home/app/:12
        tmp_dir = file_path + file_name + ":" + str(self.person_id)
        if not tmp_dir.endswith("/"):
            tmp_dir += "/"
        # Temporary file name
        # Example: /home/app/:12/12-2-2:1
        tmp_file_name = tmp_dir + str(self.person_id) + "-" + short_name + ":" + str(self.chunk_num)

        try:
            tmp_sftp._sftp.stat(tmp_dir)
        except IOError as e:
            logger.error("Temporary folder does not exist: %s", e)
            return 0, repr(e)

        try:
            tmp_sftp._sftp.stat(tmp_file_name)
        except IOError:
            try:
                # Upload and store the file block
                print('Create temporary file block', tmp_file_name)
                remote_file = tmp_sftp._sftp.open(tmp_file_name, mode="wb")
                my_bytes = self.file_info.read()
                remote_file.write(my_bytes)
                remote_file.close()
            except Exception as e:
                logger.error("Uploading file block failed: %s", e)
                return 0, repr(e)
            print("Write to file complete:", tmp_file_name)
        record_file = tmp_dir + short_name + ".txt"

        # Update the upload record file
        fobject = FileObject(file_name, 0, 0)
        tmp_load_file = tmp_sftp._sftp.open(record_file, 'r')
        for line in tmp_load_file.readlines():
            # Split on the first colon only, in case the file name contains one
            title, value = line.replace('\n', '').split(':', 1)
            print(title + ":" + value)
            if title == 'name':
                fobject.name = value
            elif title == 'size':
                fobject.size = int(value)
            elif title == 'chunk':
                fobject.chunk = int(value)
        tmp_load_file.close()

        try:
            tmp_sftp._sftp.stat(record_file)
            load_file = tmp_sftp._sftp.open(record_file, 'w+')
            load_file.writelines(['name:' + fobject.name + '\n', 'size:' + str(fobject.size) + '\n',
                                  'chunk:' + str(0) + '\n'])
            load_file.close()
        except Exception as e:
            logger.error(e)
        tmp_sftp.close()
        return 1, None

File merging

bigfile_merge verifies the uploaded slices and merges them into the target file.

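class RemoteBigFileUploadHandler():

    def __init__(self, redis_host, request=None, chat=False, tmp_path=None, chunk_num=0, person_id=0):
        self.request = request
        forms = request.data
        self.chat = chat
        # Data block size
        # self.chunk_size = forms['chunk_size']
        self.tmp_path = forms['tmp_path']
        # Number of data blocks (cast to int so it can be compared with counters)
        self.chunk_num = int(forms['chunk_num'])
        self.full_path = forms['full_path']
        self.person_id = forms['person_id']
        self.redis_host = redis_host

        if self.chat:
            # Attribute names current, total and chat_room are inferred from later use;
            # one further assignment at this point is unrecoverable from the source listing
            self.current = 0
            # Chat room used to push progress data
            self.channel_layer = get_channel_layer()
            self.chat_room = self.request.query_params.get('chat_room', None)
            self.total = self.request.query_params.get('total', None)
            if not self.chat_room or not self.total:
                raise KeyError('params: chat_room and total is needed')
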
    
    def file_isavailable(self):
        tmp_path = self.tmp_path
        tmp_sftp = self.get_session()
        file_list = tmp_sftp._sftp.listdir(tmp_path)
        # The folder also holds files that are not blocks (the txt record file; the code
        # subtracts 2, presumably to account for the merge script as well)
        true_num = len(file_list) - 2
        tmp_sftp._sftp.close()
        return true_num == self.chunk_num
    
    def merge_file(self):
        host_id = self.request.query_params.get('host_id', None)
        # The source listing lost a call that wrapped this value; the raw path is used here
        full_path = self.full_path
        file_path, file_name = os.path.split(full_path)
        if not file_path.endswith("/"):
            file_path += "/"
        # Split base name and extension
        short_name, ext = os.path.splitext(file_name)

        tmp_path = self.tmp_path
        tmp_sftp = self.get_session()

        file_list = tmp_sftp._sftp.listdir(tmp_path)
        try:
            tmp_sftp._sftp.stat(full_path)
            print("The file already exists")
        except IOError:
            print("The file does not exist")
            print("Create a new file")
            new_f = tmp_sftp._sftp.open(full_path, mode='a')
            new_f.write("")
            new_f.close()
        # Check whether the merge script is in the temporary folder.
        # The script file name ('' below) is missing from the source listing.
        if '' in file_list:
            com_path = tmp_path + ''
            command = f"nohup python {com_path} {self.person_id} {self.chunk_num} &"
            stdin, stdout, stderr = tmp_sftp._ssh.exec_command(command)
            merging = True

            while merging:
                if self.chat:
                    # Report progress over websocket: current size of the target file
                    self.current = tmp_sftp._sftp.stat(full_path).st_size
                    print('current', self.current)
                    print('total', self.total)
                    self.call_back()
                    if self.current >= int(self.total):
                        merging = False
                    time.sleep(1)
                else:
                    merging = False
        else:
            # Create or open the target file.
            # rb+ opens it for reading and writing, with the pointer at the start of the file
            remote_file = tmp_sftp._sftp.open(full_path, mode='rb+')

            # Block numbers start from 1
            i = 1
            # Create the record object
            fobject = FileObject(file_name, 0, 0)
            print('Create a record object successfully')
            # Record file used for resuming
            record_file = tmp_path + short_name + ".txt"
            try:
                # If the record file exists, the target file very likely exists too.
                # If either stat fails, fall through to except and recreate the record.
                tmp_sftp._sftp.stat(record_file)
                upload_file = tmp_sftp._sftp.stat(full_path)

                # Actual size of the target file
                temp_size = upload_file.st_size
                print("Record file already exists")
                print("Current file real data size:", temp_size)
                # Read the recorded number of merged blocks and the recorded size
                tmp_load_file = tmp_sftp._sftp.open(record_file, 'r')
                for line in tmp_load_file.readlines():
                    # Split on the first colon only, in case the file name contains one
                    title, value = line.replace('\n', '').split(':', 1)
                    print(title + ":" + value)
                    if title == 'name':
                        fobject.name = value
                    elif title == 'size':
                        fobject.size = int(value)
                    elif title == 'chunk':
                        fobject.chunk = int(value)
                tmp_load_file.close()
            except IOError:
                print("The record file does not exist, create a new record file and initialize it")
                temp_size = 0
                store_f = tmp_sftp._sftp.open(record_file, 'w+')
                store_f.writelines(['name:' + fobject.name + '\n', 'size:' + str(fobject.size) + '\n',
                                    'chunk:' + str(fobject.chunk) + '\n'])
                print("Create record file successfully")
                store_f.close()

            print("Real file data size", temp_size)
            print("Record file data size", fobject.size)
            if temp_size >= fobject.size:
                # The actual file is at least as large as the recorded size, so an earlier
                # merge was interrupted: continue from the recorded position
                print("Adjust pointer")
                remote_file.seek(fobject.size)
                i = fobject.chunk + 1

            tmp_file_name = str(self.person_id) + "-" + short_name + ":"
            try:
                while i <= self.chunk_num:
                    if tmp_file_name + str(i) in file_list:
                        print('true')
                        # Temporary folder + temporary file name
                        file_path = tmp_path + tmp_file_name + str(i)
                        with tmp_sftp._sftp.file(file_path, 'rb') as input_file:
                            print(file_path)
                            file_content = input_file.read()
                            remote_file.write(file_content)
                            fobject.size += len(file_content)
                    i = i + 1
                    fobject.chunk += 1
                    if self.chat:
                        # Report progress over websocket: current size of the target file
                        self.current = tmp_sftp._sftp.stat(full_path).st_size
                        print('current', self.current)
                        print('total', self.total)
                        self.call_back()
                    # Record the blocks and size written so far
                    load_file = tmp_sftp._sftp.open(record_file, 'w+')
                    load_file.writelines(['name:' + fobject.name + '\n', 'size:' + str(fobject.size) + '\n',
                                          'chunk:' + str(fobject.chunk) + '\n'])
                    load_file.close()

            except Exception as e:
                logger.error("Merge file exception: %s", e)
            remote_file.close()
        # Delete all files in the temporary folder
        try:
            for name in file_list:
                del_path = tmp_path + name
                tmp_sftp._sftp.remove(del_path)
            # Delete the now-empty temporary folder
            tmp_sftp._sftp.rmdir(tmp_path)
        except Exception as e:
            logger.error("Delete file exception: %s", e)

        # Release the connection
        tmp_sftp._sftp.close()
        # Delete the temporary records of the file in Redis. One is the folder hash, used to
        # verify and delete the target folder; the other is the host_id + path key, which
        # marks the file at this path as being transferred.
        r = self.redis_host
        data = r.get(str(host_id) + full_path)
        if data:
            r.delete(str(host_id) + full_path)
        tmp = hashlib.md5()
        tmp.update(tmp_path.encode('utf-8'))
        hash_value = tmp.hexdigest()
        r.delete(hash_value)
        print("File Merge End")
        return 1, None

Conclusion

This article presents one approach to implementing large file upload in Python. Hopefully it offers some useful ideas for your own implementation.
