Summary
This article uses Python to implement chunked ("sliced") upload of large files, with support for resuming interrupted transfers. The functionality is divided into four modules: getting the status of the file to be uploaded, getting temporary-folder status information, slice upload, and slice merging. Detailed reasoning and complete source code are provided.
Overall architecture process
Example flow: 1. After the front end receives the file the user wants to upload, it performs the following operations:
- Calculate the file hash: compute the file's hash value with a chosen algorithm.
- Pass parameters to the backend: send the file path (including the file name), target machine id, file hash value, and user id to the backend's file-status interface, get_file_upload_status.
After the backend receives the data from the front end, it processes it with the following logic:
- Check whether the file already exists: if it does, return to the front end and let the user choose whether to overwrite it.
- Check the upload record in Redis: if the file does not exist, check whether Redis has a record that the file is currently being uploaded.
- No upload record: if there is none, use the Redis String type to store the file's upload status. The key is "machine_id + file path" (for example, "118/tmp/"), and the value is a homemade dictionary:
{
    'hash_code': file_hash_value,
    'person_id': user_id
}
- Existing upload record: if Redis already records that the file is being uploaded, the same user may resume the transfer from the breakpoint; any other user is blocked from uploading it.
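As a concrete illustration, here is a minimal sketch of this status check using the redis-py client. The connection parameters and return labels are assumptions; the value is stored with str() and read back with eval(), exactly as the article's code does:

import redis

r = redis.Redis(host='localhost', port=6379)  # assumed connection parameters

def check_upload_status(host_id, full_path, hash_code, person_id):
    key = str(host_id) + full_path  # e.g. "118/tmp/bigfile.bin" (file name assumed)
    data = r.get(key)
    if not data:
        # Nobody is uploading this file yet: record the status and allow the upload
        r.set(key, str({'hash_code': hash_code, 'person_id': person_id}))
        return 'can_upload'
    record = eval(data.decode())  # as in the article; json would be a safer choice
    if person_id == record['person_id']:
        return 'resume_upload'    # same user: resume from the breakpoint
    return 'locked'               # someone else is uploading this file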
2. When the file-status check passes, the front end calls the get_file_dir_info interface, passing in host_id, user_id, and the file path (including the file name). get_file_dir_info has three main functions:
- Check the temporary folder: determine whether the "temporary folder" used to store the slices exists. Its full path is spliced together as "file path + : + user id", e.g. "/tmp/:12". If the folder does not exist, it is created automatically. After creation, the folder path is hashed and the mapping is stored in Redis with the hash value as the key and the full folder path as the value, which makes it easy to locate and verify the folder when it is deleted later.
- Check the temporary upload-record file: check whether the "slice upload record file" already exists inside the temporary folder. If it exists, read its contents; if not, initialize it. The record is modeled by the following class:
class FileObject():
    def __init__(self, name, size, chunk, path=None, num=0):
        self.name = name    # file name
        self.size = size    # number of bytes already uploaded/merged
        self.chunk = chunk  # number of blocks recorded
        self.path = path    # temporary folder path
        self.num = num      # number of blocks currently in the folder (redundant by design)
- Place the merge script: place the script file in the temporary folder to support the subsequent merge step.
Finally, the interface iterates over the temporary folder to collect the list of already-uploaded block numbers (this is what makes resuming possible) and returns the record-file information, the hash value of the temporary folder, and the uploaded-block list to the front end.
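To make the folder-naming and record-file conventions concrete, here is a minimal standalone sketch; the example values are assumptions:

import hashlib

full_path = '/tmp/bigfile.bin'  # example values, not from the article
person_id = 12

# Temporary folder: "file path + : + user id"
tmp_dir = full_path + ':' + str(person_id)              # '/tmp/bigfile.bin:12'
# Its md5 is the Redis key; the folder path is the value, so the cleanup step
# can verify the path before deleting the folder
dir_hashcode = hashlib.md5(tmp_dir.encode('utf-8')).hexdigest()

# The record file ("<short name>.txt" inside the folder) holds three lines:
record_text = 'name:bigfile.bin\nsize:0\nchunk:0\n'
record = dict(line.split(':', 1) for line in record_text.splitlines())
print(record)  # {'name': 'bigfile.bin', 'size': '0', 'chunk': '0'}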
3. The front end slices the file into 10 MB blocks and, based on the information returned by the interface above (file name, uploaded-block list), either resumes file_chunk_upload from the breakpoint or starts it from scratch. Using the block list, already-uploaded blocks are skipped and blocks lost for any other reason are re-sent (see the client sketch after this list).
4. After the upload finishes, the front end calls get_file_dir_info again to obtain the number and list of blocks currently in the temporary folder and compares them against its own chunk count. If they match, the merge interface is called.
5. When the front end initiates a merge, the bigfile_merge interface is called with the key parameters: user id, number of file blocks, file path, and temporary folder path. After receiving the data, the backend performs a series of checks and operations:
- Parameter verification: validate the incoming path and the number of file blocks; start merging only if they are correct.
- Merge-method selection: the merge function has two execution paths. First check whether the target machine can run scripts; if it can, execute the uploaded script directly to complete the merge. If it cannot, fall back to merging by operating on the target machine's files directly.
- Resume judgment: before the merge starts, first check whether the target file already exists. If it does, this is a resumed merge: read the size and chunk values from the temporary record file to determine how much of the file has already been merged and which block to continue from. If the actual file size is found to exceed the recorded value, the previous merge was interrupted; move the file pointer back to the recorded size and restart the merge from the block after the recorded chunk.
- Record update: every time a block is merged successfully, write the new size and chunk values to the temporary record file.
- Final cleanup: after the merge completes, clear the temporary folder and delete the file's upload-status record from Redis.
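The article's source code covers only the backend. As a stand-in for the front-end side of steps 3 and 4, here is a hedged Python client sketch that slices a local file into 10 MB blocks, skips the blocks already reported in chunk_list, and posts the rest. The endpoint URL and form-field names are assumptions read off the handler code further below:

import os
import requests

CHUNK_SIZE = 10 * 1024 * 1024  # 10 MB, as in the article

def upload_file(local_path, full_path, host_id, person_id, uploaded_chunks):
    total = os.path.getsize(local_path)
    chunk_num = (total + CHUNK_SIZE - 1) // CHUNK_SIZE   # ceiling division
    with open(local_path, 'rb') as f:
        for i in range(1, chunk_num + 1):                # blocks are numbered from 1
            if i in uploaded_chunks:                     # resume: skip finished blocks
                f.seek(CHUNK_SIZE, os.SEEK_CUR)
                continue
            data = f.read(CHUNK_SIZE)
            requests.post(
                f'http://example.com/file_chunk_upload?host_id={host_id}',  # assumed URL
                data={'chunk_num': i, 'full_path': full_path, 'person_id': person_id},
                files={'file_info': data},
            )
    return chunk_num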
This completes one full file-upload operation.
Technical details
- API
Get the upload file status interface
get_file_upload_status: check whether the file to be uploaded already exists, or whether an upload can be resumed
import logging

from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView

logger = logging.getLogger(__name__)

class FileUploadStatus(APIView):
    def post(self, request, *args, **kwargs):
        # First determine whether the file exists at the target path;
        # if it does, ask the front end whether it should be overwritten
        try:
            forms = request.data
            host_id = forms['host_id']
            full_path = forms['full_path']
            hash_code = forms['hash_code']
            person_id = forms['person_id']
        except Exception as e:
            logger.error("Front-end parameter parsing error: %s", repr(e))
        try:
            # Use the paramiko module to obtain a connection to the target machine;
            # readers can substitute another method here
            err, tmp_sftp = RemoteBigFileUploadHandler.get_handle(host_id)
            if err:
                return Response({'code': -1, 'data': host_id, 'msg': "Error connecting to the target host"},
                                status=status.HTTP_400_BAD_REQUEST)
        except Exception as e:
            logger.error("Connecting to the target machine failed: %s", repr(e))
        try:
            # Determine whether the file already exists on the machine
            tmp_sftp._sftp.stat(full_path)
            return Response({'code': -1, 'data': full_path, 'msg': "The file already exists; ask whether to overwrite"},
                            status=status.HTTP_400_BAD_REQUEST)
        except IOError:
            # Next check Redis, to prevent someone else from transferring the same
            # file before an earlier upload has been merged.
            # If neither exists, create a new key with the hash code in Redis
            # and tell the front end to upload.
            try:
                r = get_redis()  # Get a Redis connection instance; readers implement this themselves
            except Exception as e:
                logger.error("Redis acquisition failed: %s", repr(e))
            # Example key: 18115/app/home/
            # Fetch the record from Redis
            data = r.get(str(host_id) + full_path)
            if not data:
                # Put the key and hash code into Redis, together with person_id
                file_dict = {}
                file_dict['hash_code'] = hash_code
                file_dict['person_id'] = person_id
                r.set(str(host_id) + full_path, str(file_dict))
                return Response({'code': 1, 'data': full_path, 'msg': "The file does not exist and can be uploaded"},
                                status=status.HTTP_200_OK)
            else:
                # A record keyed on host_id + path intercepts concurrent uploads of the
                # same file on the target machine, but it would also intercept the same
                # person's resumed upload, so person_id is stored in the value and used
                # to let the same person through.
                retrieved_data = r.get(str(host_id) + full_path)
                # Convert the retrieved data back into a dictionary
                try:
                    retrieved_dict = eval(retrieved_data.decode())
                except Exception as e:
                    logger.error("Decoding the target file's record from Redis failed: %s", repr(e))
                if person_id == retrieved_dict['person_id']:
                    return Response({'code': 1, 'data': full_path, 'msg': "Resumable-upload check passed"},
                                    status=status.HTTP_200_OK)
                else:
                    return Response({'code': 2, 'data': full_path, 'msg': "This file is being uploaded; please do not operate on it multiple times"},
                                    status=status.HTTP_200_OK)
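The listings call RemoteBigFileUploadHandler.get_handle to obtain the paramiko connection, but the article leaves that part to the reader. Here is a minimal sketch of what it could look like; the host table, credentials, and the wrapper class are all assumptions:

import paramiko

# Assumed host table; in practice this would come from a database or config
HOSTS = {118: {'ip': '10.0.0.118', 'port': 22, 'user': 'app', 'password': 'secret'}}

class SftpHandle:
    def __init__(self, ssh, sftp):
        self._ssh = ssh
        self._sftp = sftp

def get_handle(host_id):
    # Return (err, handle); err is an error message or None, matching the callers above
    info = HOSTS.get(int(host_id))
    if info is None:
        return "unknown host", None
    try:
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(info['ip'], port=info['port'],
                    username=info['user'], password=info['password'])
        return None, SftpHandle(ssh, ssh.open_sftp())
    except Exception as e:
        return repr(e), None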
Get temporary folder status information interface
get_file_dir_info: get temporary-folder status information
import hashlib
import logging
import os

from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView

logger = logging.getLogger(__name__)

class GetFileDirInfo(APIView):
    def post(self, request, *args, **kwargs):
        forms = request.data
        host_id = forms['host_id']
        person_id = forms['person_id']
        full_path = forms['full_path']
        # Use the paramiko module to obtain a connection to the target machine;
        # readers can substitute another method here
        err, tmp_sftp = RemoteBigFileUploadHandler.get_handle(host_id)
        if err:
            return Response({'code': -1, 'data': host_id, 'msg': "Error connecting to the target host"},
                            status=status.HTTP_400_BAD_REQUEST)
        # Separate the path name from the file name
        # Example: '/home/app'
        file_path, file_name = os.path.split(full_path)
        if file_path[-1] != "/":
            file_path += "/"
        # Separate the file name from its extension
        # Example: 2-2.jpg -> ('2-2', '.jpg')
        short_name, ext = os.path.splitext(file_name)
        # Build the name of the folder that stores the temporary files
        # Example: /home/app/2-2.jpg:12/
        tmp_dir = full_path + ":" + str(person_id)
        # Hash the temporary folder path and store hash -> path in Redis;
        # this gives one extra layer of verification when the folder is deleted later
        tmp = hashlib.md5()
        tmp.update(tmp_dir.encode('utf-8'))
        hash_value = tmp.hexdigest()
        if tmp_dir[-1] != "/":
            tmp_dir += "/"
        try:
            # Check whether the temporary folder exists
            tmp_sftp._sftp.stat(tmp_dir)
        except Exception:
            try:
                print('Create temporary folder')
                tmp_sftp._sftp.mkdir(tmp_dir)
                r = get_redis()  # Get a Redis connection
                r.set(hash_value, tmp_dir)
            except Exception as e:
                logger.error("Creating temporary folder failed: %s", e)
                tmp_sftp._sftp.close()
                return Response({'code': -1, 'data': "", 'msg': "Creating temporary folder failed"},
                                status=status.HTTP_400_BAD_REQUEST)
        # The temporary folder exists; fetch the transfer-record txt inside it.
        # Initialize the record-file object
        fobject = FileObject(file_name, 0, 0, tmp_dir, 0)
        # Example: /home/app/2-2.jpg:12/2-2.txt
        record_file = tmp_dir + short_name + ".txt"
        try:
            # Check whether the record file exists
            tmp_sftp._sftp.stat(record_file)
            tmp_load_file = tmp_sftp._sftp.open(record_file, 'r')
            # The record file exists: read the uploaded size and block count it saved
            for line in tmp_load_file.readlines():
                title, value = line.replace('\n', '').split(':')
                print(title + ":" + value)
                if title == 'name':
                    fobject.name = value
                elif title == 'size':
                    fobject.size = int(value)
                elif title == 'chunk':
                    fobject.chunk = int(value)
            tmp_load_file.close()
        except IOError:
            print("The record file does not exist; create and initialize it")
            store_f = tmp_sftp._sftp.open(record_file, 'w+')
            store_f.writelines(['name:' + fobject.name + '\n',
                                'size:' + str(fobject.size) + '\n',
                                'chunk:' + str(fobject.chunk) + '\n'])
            print("Create record file successfully")
            store_f.close()
            serializer = RecordFileSerializer(fobject)
            data = serializer.data
            data['dir_hashcode'] = hash_value
            # Upload the script that supports asynchronous merging.
            # NOTE: the script's file name was lost from the original listing;
            # 'merge.py' is an assumed stand-in throughout.
            command = 'command -v python'
            stdin, stdout, stderr = tmp_sftp._ssh.exec_command(command)
            output = stdout.read().decode().strip()
            try:
                print('Check whether the merge script is already there')
                tmp_sftp._sftp.stat(tmp_dir + 'merge.py')
            except Exception:
                if output:
                    print('Target machine has Python installed')
                    current_path = os.path.dirname(os.path.abspath(__file__))
                    if current_path[-1] != "/":
                        current_path += "/"
                    try:
                        print('Local script to transfer', current_path + 'merge.py')
                        print('Destination address', tmp_dir + 'merge.py')
                        # Send the merge script
                        tmp_sftp._sftp.put(current_path + 'merge.py', tmp_dir + 'merge.py')
                    except Exception as e:
                        logger.error("Sending the local merge script failed: %s", e)
                        tmp_sftp._sftp.close()
                        return Response({'code': -1, 'data': "", 'msg': "Sending the local merge script failed"},
                                        status=status.HTTP_400_BAD_REQUEST)
                else:
                    tmp_sftp._sftp.close()
                    return Response({'code': -1, 'data': "", 'msg': "The target machine does not have Python installed"},
                                    status=status.HTTP_400_BAD_REQUEST)
            tmp_sftp._sftp.close()
            return Response({'code': 0, 'data': data, 'msg': "Created a new record file"},
                            status=status.HTTP_200_OK)
        file_list = tmp_sftp._sftp.listdir(tmp_dir)
        # The record txt and the merge script also live in the folder,
        # so subtract them to get the real block count
        fobject.num = len(file_list) - 2
        chunk_list = []
        for file in file_list:
            parts = file.rsplit(":", 1)  # Split on the last colon, into at most two parts
            if len(parts) == 2:
                last_part = parts[1]  # The block number after the colon
                try:
                    chunk_list.append(int(last_part))
                except Exception:
                    continue
                print("Last value:", last_part)
            else:
                print("No colon found, skip")
        serializer = RecordFileSerializer(fobject)
        data = serializer.data
        data['dir_hashcode'] = hash_value
        data['chunk_list'] = chunk_list
        return Response({'code': 0, 'data': data, 'msg': "Record file already exists"},
                        status=status.HTTP_200_OK)
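The merge script that get_file_dir_info pushes to the target machine is never listed in the article. Below is a hypothetical sketch inferred from the command bigfile_merge runs (nohup python <script> <person_id> <chunk_num> &) and from the block-naming scheme; the script name merge.py, the path recovery, and the resume behavior are all assumptions:

# merge.py -- hypothetical merge script; everything here is inferred, not from the article
import os
import sys

def main():
    person_id, chunk_num = sys.argv[1], int(sys.argv[2])
    tmp_dir = os.path.dirname(os.path.abspath(__file__))
    # The temporary folder is named "<full path>:<person id>", so the target
    # file path can be recovered by stripping the ":<person id>" suffix
    full_path = tmp_dir.rsplit(':' + person_id, 1)[0]
    short_name = os.path.splitext(os.path.basename(full_path))[0]
    # Resume support: skip the bytes that are already in the target file
    done = os.path.getsize(full_path) if os.path.exists(full_path) else 0
    mode = 'r+b' if os.path.exists(full_path) else 'wb'
    with open(full_path, mode) as out_f:
        pos = 0
        for i in range(1, chunk_num + 1):
            chunk_path = os.path.join(tmp_dir, '%s-%s:%d' % (person_id, short_name, i))
            size = os.path.getsize(chunk_path)
            if pos + size <= done:
                pos += size          # this block was fully merged before the interruption
                continue
            out_f.seek(pos)          # rewind to the last complete block boundary
            with open(chunk_path, 'rb') as chunk_f:
                out_f.write(chunk_f.read())
            pos += size
            done = pos

if __name__ == '__main__':
    main()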
Slice upload function
file_chunk_upload: the slice upload handler
import logging
import os

logger = logging.getLogger(__name__)

class FileChunkUploadHandler():
    def __init__(self, redis_host, request=None, file_info=None, chunk_num=0, people_id=0):
        self.request = request
        self.file_info = request.FILES.get('file_info')  # the uploaded block itself
        forms = request.data
        # Block information: which block this is
        self.chunk_num = forms.get('chunk_num')
        self.full_path = forms.get('full_path')
        self.person_id = forms.get('person_id')
        self.redis_host = redis_host

    def upload_file(self):
        print("File block %s starts transfer" % self.chunk_num)
        full_path = str(self.full_path)
        file_path, file_name = os.path.split(full_path)
        host_id = self.request.query_params.get('host_id', None)
        err, tmp_sftp = RemoteBigFileUploadHandler.get_handle(host_id)
        # Separate the file name from its extension
        # Example: 2-2.jpg -> ('2-2', '.jpg')
        short_name, ext = os.path.splitext(file_name)
        if file_path[-1] != "/":
            file_path += "/"
        # The folder that stores the temporary files was created by the
        # directory-status interface; rebuild its name with the same format
        # Example: /home/app/2-2.jpg:12
        tmp_dir = file_path + file_name + ":" + str(self.person_id)
        if tmp_dir[-1] != "/":
            tmp_dir += "/"
        # Temporary block file name
        # Example: /home/app/2-2.jpg:12/12-2-2:1
        tmp_file_name = tmp_dir + str(self.person_id) + "-" + short_name + ":" + str(self.chunk_num)
        try:
            tmp_sftp._sftp.stat(tmp_dir)
        except IOError as e:
            logger.error("The temporary storage folder does not exist: %s", e)
            return 0, repr(e)
        try:
            tmp_sftp._sftp.stat(tmp_file_name)
        except IOError:
            try:
                # Upload and store the file block
                print('Create temporary file block', tmp_file_name)
                remote_file = tmp_sftp._sftp.open(tmp_file_name, mode="wb")
                my_bytes = self.file_info.read()
                remote_file.write(my_bytes)
                remote_file.close()
            except Exception as e:
                logger.error("Uploading file block failed: %s", e)
                return 0, repr(e)
            print("Write to file complete:", tmp_file_name)
        record_file = tmp_dir + short_name + ".txt"
        # Refresh the upload-record file information
        fobject = FileObject(file_name, 0, 0)
        tmp_load_file = tmp_sftp._sftp.open(record_file, 'r')
        for line in tmp_load_file.readlines():
            title, value = line.replace('\n', '').split(':')
            print(title + ":" + value)
            if title == 'name':
                fobject.name = value
            elif title == 'size':
                fobject.size = int(value)
            elif title == 'chunk':
                fobject.chunk = int(value)
        tmp_load_file.close()
        try:
            tmp_sftp._sftp.stat(record_file)
            load_file = tmp_sftp._sftp.open(record_file, 'w+')
            load_file.writelines(['name:' + fobject.name + '\n',
                                  'size:' + str(fobject.size) + '\n',
                                  'chunk:' + str(0) + '\n'])  # chunk is maintained by the merge step
        except Exception as e:
            logger.error(e)
        tmp_sftp.close()
        return 1, None
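One detail worth noting: self.file_info.read() pulls the whole 10 MB block into memory at once. Django's UploadedFile also exposes a chunks() iterator, so the write could stream instead; a small sketch of that variant (the function name is mine):

def write_chunk_streaming(sftp, tmp_file_name, uploaded_file):
    # uploaded_file is Django's UploadedFile; .chunks() yields smaller pieces
    # (64 KB by default) instead of one 10 MB bytes object
    with sftp.open(tmp_file_name, mode='wb') as remote_file:
        for piece in uploaded_file.chunks():
            remote_file.write(piece)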
File merging function
bigfile_merge: the file merging and verification handler
import hashlib
import logging
import os
import time

from channels.layers import get_channel_layer

logger = logging.getLogger(__name__)

class RemoteBigFileUploadHandler():
    def __init__(self, redis_host, request=None, chat=False, tmp_path=None, chunk_num=0, person_id=0):
        self.request = request
        forms = request.data
        self.chat = chat
        # Block size
        # self.chunk_size = forms['chunk_size']
        self.tmp_path = forms['tmp_path']
        # Number of blocks
        self.chunk_num = forms['chunk_num']
        self.full_path = forms['full_path']
        self.person_id = forms['person_id']
        self.redis_host = redis_host
        if self.chat:
            self.current = 0
            # Progress is echoed to a chat room over WebSocket
            self.channel_layer = get_channel_layer()
            self.chat_room = request.query_params.get('chat_room', None)
            self.total = request.query_params.get('total', None)
            if not self.chat_room or not self.total:
                raise KeyError('params: chat_room and total is needed')

    def file_isavailable(self):
        tmp_path = self.tmp_path
        tmp_sftp = self.get_session()
        file_list = tmp_sftp._sftp.listdir(tmp_path)
        # The record txt and the merge script also live in the folder,
        # so subtract them to get the real block count
        true_num = len(file_list) - 2
        if true_num != self.chunk_num:
            tmp_sftp._sftp.close()
            return False
        else:
            tmp_sftp._sftp.close()
            return True

    def merge_file(self):
        host_id = self.request.query_params.get('host_id', None)
        full_path = str(self.full_path)
        file_path, file_name = os.path.split(full_path)
        if file_path[-1] != "/":
            file_path += "/"
        # Separate the file name from its extension
        short_name, ext = os.path.splitext(file_name)
        tmp_path = self.tmp_path
        tmp_sftp = self.get_session()
        file_list = tmp_sftp._sftp.listdir(tmp_path)
        try:
            tmp_sftp._sftp.stat(full_path)
            print("The file already exists")
        except IOError:
            print("The file does not exist")
            print("Create a new file")
            new_f = tmp_sftp._sftp.open(full_path, mode='a')
            new_f.write("")
            new_f.close()
        # Check whether the merge script exists.
        # NOTE: the script's file name was lost from the original listing;
        # 'merge.py' is an assumed stand-in throughout.
        if 'merge.py' in file_list:
            com_path = tmp_path + 'merge.py'
            command = f"nohup python {com_path} {self.person_id} {self.chunk_num} &"
            stdin, stdout, stderr = tmp_sftp._ssh.exec_command(command)
            merging = True
            while merging:
                if self.chat:
                    # Echo progress over WebSocket: poll the current size of the target file
                    self.current = tmp_sftp._sftp.stat(full_path).st_size
                    print('current', self.current)
                    print('total', self.total)
                    self.call_back()
                    if self.current >= int(self.total):
                        merging = False
                    time.sleep(1)
                else:
                    merging = False
        else:
            # Create or open the user's target file.
            # rb+ opens it for reading and writing without truncating;
            # the pointer starts at the beginning of the file
            remote_file = tmp_sftp._sftp.open(full_path, mode='rb+')
            # The current block number starts from 1 by default
            i = 1
            # Create a file-record object
            fobject = FileObject(file_name, 0, 0)
            print('Create a record object successfully')
            # Refresh the record file
            record_file = tmp_path + short_name + ".txt"
            try:
                # If the record file exists, the target file most likely exists too.
                # If opening either file fails, fall into except and recreate them
                tmp_sftp._sftp.stat(record_file)
                upload_file = tmp_sftp._sftp.stat(full_path)
                # This is also the real size of the file on the target
                temp_size = upload_file.st_size
                print("Record file already exists")
                print("Current real file size:", temp_size)
                # Read the record file to get the merged block count and merged size
                tmp_load_file = tmp_sftp._sftp.open(record_file, 'r')
                for line in tmp_load_file.readlines():
                    title, value = line.replace('\n', '').split(':')
                    print(title + ":" + value)
                    if title == 'name':
                        fobject.name = value
                    elif title == 'size':
                        fobject.size = int(value)
                    elif title == 'chunk':
                        fobject.chunk = int(value)
                tmp_load_file.close()
            except IOError:
                print("The record file does not exist; create and initialize it")
                temp_size = 0
                store_f = tmp_sftp._sftp.open(record_file, 'w+')
                store_f.writelines(['name:' + fobject.name + '\n',
                                    'size:' + str(fobject.size) + '\n',
                                    'chunk:' + str(fobject.chunk) + '\n'])
                print("Create record file successfully")
                store_f.close()
            print("Real file data size", temp_size)
            print("Recorded data size", fobject.size)
            if temp_size >= fobject.size:
                # The real file has reached (or passed) the recorded size:
                # a previous merge was interrupted, so move the pointer back
                # to the recorded size and continue from the recorded block
                print("Adjust pointer")
                remote_file.seek(fobject.size)
                i = fobject.chunk + 1
            tmp_file_name = str(self.person_id) + "-" + short_name + ":"
            try:
                while i <= self.chunk_num:
                    if tmp_file_name + str(i) in file_list:
                        print('true')
                        # temporary folder / temporary block name
                        file_path = tmp_path + tmp_file_name + str(i)
                        with tmp_sftp._sftp.file(file_path, 'rb') as input_file:
                            print(file_path)
                            file_content = input_file.read()
                            remote_file.write(file_content)
                            fobject.size += len(file_content)
                        i = i + 1
                        fobject.chunk += 1
                        if self.chat:
                            # Echo progress over WebSocket
                            self.current = tmp_sftp._sftp.stat(full_path).st_size
                            print('current', self.current)
                            print('total', self.total)
                            self.call_back()
                        # Record the blocks and size written so far
                        load_file = tmp_sftp._sftp.open(record_file, 'w+')
                        load_file.writelines(['name:' + fobject.name + '\n',
                                              'size:' + str(fobject.size) + '\n',
                                              'chunk:' + str(fobject.chunk) + '\n'])
                        load_file.close()
                    else:
                        # A block is missing; stop instead of looping forever
                        break
            except Exception as e:
                logger.error("Merge file exception: %s", e)
            remote_file.close()
        # Delete all files in the temporary folder
        try:
            for name in file_list:
                del_path = tmp_path + name
                tmp_sftp._sftp.remove(del_path)
            # Delete the now-empty temporary folder
            tmp_sftp._sftp.rmdir(tmp_path)
        except Exception as e:
            logger.error("Delete file exception: %s", e)
        # Close the SFTP resources
        tmp_sftp._sftp.close()
        # Delete the file's temporary records in Redis: one key is the folder
        # hash code, used to verify the target folder before deleting it; the
        # other is host_id + path, which marks the file as being transferred
        r = self.redis_host
        data = r.get(str(host_id) + full_path)
        if data:
            r.delete(str(host_id) + full_path)
        tmp = hashlib.md5()
        tmp.update(tmp_path.encode('utf-8'))
        hash_value = tmp.hexdigest()
        r.delete(hash_value)
        print("File Merge End")
        return 1, None
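Step 1 stores the file's hash_code in Redis, but none of the listings read it back. A natural follow-up, not shown in the article, is to verify the merged file against that hash before the Redis record is deleted; a sketch assuming the front end computed an MD5:

import hashlib

def verify_merged_file(sftp, full_path, expected_hash):
    # Recompute the remote file's MD5 in 4 MB pieces and compare it with the
    # hash_code recorded in step 1 (MD5 is an assumption; use whatever
    # algorithm the front end used)
    md5 = hashlib.md5()
    with sftp.open(full_path, 'rb') as f:
        while True:
            block = f.read(4 * 1024 * 1024)
            if not block:
                break
            md5.update(block)
    return md5.hexdigest() == expected_hash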
Summary
This article presents one approach to implementing large-file slice upload in Python; I hope it gives you some ideas and inspiration.