
Detailed explanation of how to batch clean mongodb historical data

Batch cleaning of mongodb historical data

The original cleaning program

  • Several platforms on the project team have accumulated a backlog of historical data since going live, which slows down database queries; part of that data has already been archived, so the historical data needs to be cleaned up and deleted.
  • I had previously written a quick shell script, but it was too simple, so I rewrote it in Python, added a backup step, and made it delete data for the fields and time ranges specified in a configuration file (a minimal sketch of the core logic follows below; the full script is in the next section).
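The core operation, shown here as a minimal sketch: dump the documents that match the age filter with mongodump, then remove them with delete_many. The host, database, collection, field name, and retention period below are taken from the sample configuration later in this article and should be treated as placeholders for your own environment.

#!/usr/bin/env python3
# Minimal sketch: back up matching documents, then delete everything older than a cutoff.
# Host/db/collection/field values mirror the sample config below; adjust as needed.
import subprocess
from datetime import datetime, timedelta
from pymongo import MongoClient

DEL_DAY = 97                      # delete data older than this many days
cutoff = int((datetime.now() - timedelta(days=DEL_DAY))
             .replace(hour=0, minute=0, second=0, microsecond=0).timestamp())

# 1. Back up the documents that are about to be removed.
subprocess.run([
    "mongodump", "-h", "10.130.47.197:40000",
    "-d", "20241205", "-c", "call_detail_back",
    "-q", '{"start_time": {"$lt": "%d"}}' % cutoff,
    "-o", "./data",
], check=True)

# 2. Delete everything older than the cutoff (string timestamps in this example).
client = MongoClient("10.130.47.197", 40000)
result = client["20241205"]["call_detail_back"].delete_many(
    {"start_time": {"$lt": str(cutoff)}})
print("deleted", result.deleted_count, "documents")
client.close()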

The code

#!/usr/local/python3/bin/python3

import configparser,sys,os,subprocess
import logging,logging.config
import pymongo,ast
# from pymongo import MongoClient
from datetime import datetime,timedelta
from urllib import parse

def init_mongodb(MongoDBAuth):
    if MongoDBAuth:
        username = parse.quote_plus(mongodb_user)
        password = parse.quote_plus(mongodb_passwd)
        ConnPasswd = "mongodb://" + username + ":" + password + "@" + mongodb_ip + ":" + mongodb_port + "/"
        try:
            clients = pymongo.MongoClient(ConnPasswd)
            logger.info("init mongodb conn: " + ConnPasswd)
            return clients
        except Exception as e:
            logger.error("use mongodb user pass conn err: " + str(e))
            return False
    else:
        try:
            clients = pymongo.MongoClient(mongodb_ip, int(mongodb_port))
            logger.info("init mongodb conn: " + mongodb_ip + ":" + mongodb_port)
            return clients
        except Exception as e:
            logger.error("mongodb conn err: " + str(e))
            return False

#View all dbs
def get_mongodb_dbname():
    db_names_list = []
    db_names = mongo_client.list_database_names()
    for db_name in db_names:
        db_names_list.append(db_name)
    for filter_dbname in need_filter_dbname_list:
        if filter_dbname in db_names_list:
            db_names_list.remove(filter_dbname)
            logger.info("delete need filter dbname: " + filter_dbname)
    # logger.info("get all db_name: " + str(db_names_list))
    return db_names_list

#Query all collections in a db
def get_mongodb_tables(entid):
    db_collections_list = []
    db = mongo_client[entid]
    collections = db.list_collection_names()
    for collection in collections:
        db_collections_list.append(collection)
    logger.info("get " + entid + " all collections: " + str(db_collections_list))
    return db_collections_list

#Query the indexes of a collection and whether the collection is sharded
def get_index_key_tables(entid,collection_name):
    index_list = []
    db = mongo_client[entid]
    collection = db[collection_name]
    indexes = collection.list_indexes()
    ns_name = entid + "." + collection_name
    for result in indexes:
        formatted_result = {k.upper(): v for k, v in result.items()}
        each_key = formatted_result.get("KEY")
        ns_name = formatted_result.get("NS", ns_name)
        ok_index = {key: value for key, value in each_key.items()}
        index_list.append(ok_index)
    # drop the default _id index
    index_list = [d for d in index_list if not (isinstance(d, dict) and '_id' in d and d['_id'] == 1)]

    collection_stats = db.command("collstats", collection_name)
    collection_sharded = collection_stats.get("sharded", False)
    if len(index_list) != 0:
        logger.info("get collection " + ns_name + " index: " + str(index_list))
    # logger.info("get collection " + ns_name + " sharded status: " + str(collection_sharded))
    return index_list,collection_sharded


#Create a collection index
def craete_index(entid,collection_name,index):
    db = mongo_client[entid]
    collection = db[collection_name]
    logger.info("need create index: " + entid + "." + collection_name + " : " + str(index))

    # index = (list(index.keys())[0], list(index.values())[0])
    index = [(k, v) for k, v in index.items()]
    result = collection.create_index(index)
    logger.info("mongodb " + entid + "." + collection_name + " create index return msg: " + str(result))

# Check whether the corresponding dbname is sharded (deprecated)
def is_database_sharded(database_name):
    db = mongo_client["admin"]
    sharded_databases = db.command("listshards")["shards"]
    for shard in sharded_databases:
        if database_name in db.command("listdatabases")["databases"]:
            return True
    return False

#Create a shard key for a collection
def create_sharded_func(entid, collection_name, shard_key):
    db = mongo_client["admin"]
    collection_path = '{}.{}'.format(entid, collection_name)
    logger.info("need create sharded key : " + collection_path + " : " + str(shard_key))
    sharding_colunm,sharding_type = "",""
    for key, value in shard_key.items():
        sharding_colunm = key
        sharding_type = value
    try:
        db.command('enableSharding', entid)
    except Exception as e:
        logger.error("enable dbname sharding error: return: " + str(e))

    try:
        result = db.command('shardCollection', collection_path, key={sharding_colunm: sharding_type})
        logger.info(collection_path + " create sharded key return: " + str(result))
    except Exception as e:
        logger.error("create sharded key error: return: " + str(e))

#Read the index file to obtain the index and shard-key definitions
def read_file_index(index_file):
    index_list = []
    Shard_list = []
    with open(index_file, 'r') as f:
        for line in f.readlines():
            line = line.replace(" ", "")
            # A "mongodbShard:" prefix marks lines that define a shard key; hypothetical example lines:
            #   call_detail_back={'start_time':1}
            #   mongodbShard:{'call_detail_back':{'start_time':'hashed'}}
            # print(line)
            if "mongodbShard:" not in line:
                table, key_str = line.strip().split("=")
                key = ast.literal_eval(key_str)
                index_list.append({table: key})
            else:
                Shard_key_str = line.strip().split("mongodbShard:")[1]
                Shard_key_str = ast.literal_eval(Shard_key_str)
                Shard_list.append(Shard_key_str)
    return index_list,Shard_list

#Get the 00:00:00 timestamp from the given number of days ago
def get_timestamp_days_ago(get_days):
    # Get the current date and time
    now = datetime.now()
    # Subtract the given number of days
    date_30_days_ago = now - timedelta(days=int(get_days))
    # Truncate the result to 00:00:00 of that day
    date_start_of_day = date_30_days_ago.replace(hour=0, minute=0, second=0, microsecond=0)
    # Convert the result to a timestamp
    timestamp = int(date_start_of_day.timestamp())
    return timestamp

#Judge the type and length of the sample time value and return the cutoff value to delete by
def if_string_type(data_stamp):
    del_timestamp = ""
    get_need_del_timestamp =  get_timestamp_days_ago(int(Del_day))
    if isinstance(data_stamp, str) and  len(data_stamp) == 10:
        del_timestamp = str(get_need_del_timestamp)

    if isinstance(data_stamp, str) and  len(data_stamp) == 13:
        del_timestamp = str(get_need_del_timestamp) + "000"

    if isinstance(data_stamp, int) and  len(str(data_stamp)) == 10:
        del_timestamp = get_need_del_timestamp

    if isinstance(data_stamp, int) and  len(str(data_stamp)) == 13:
        del_timestamp = int(get_need_del_timestamp) * 1000

    return del_timestamp

#Get one sample document from the collection
def get_one_data(entid,collection_name):
    db = mongo_client[entid]
    collection = db[collection_name]
    Filter_conditions_key = str(need_del_table_field)
    result = collection.find_one({}, {**{Filter_conditions_key: 1}, '_id': 0})
    if result and Filter_conditions_key in result:
        start_time_value = result.get(Filter_conditions_key)
        logger.info("get " + entid + "." + collection_name + " corresponding " + Filter_conditions_key + " field value: " + str(start_time_value))
        return start_time_value
    else:
        # logger.info("No " + Filter_conditions_key + " field found in the document. return: " + str(result))
        return False

# Delete historical data in the collection by date
def del_data(entid,collection_name,get_del_timestamp):
    db = mongo_client[entid]
    collection = db[collection_name]
    Filter_conditions_key = str(need_del_table_field)
    Filter_conditions_value = get_del_timestamp
    try:
        result = collection.delete_many({Filter_conditions_key: {"$lt": Filter_conditions_value}})
        logger.info(entid + " run sql: db." + collection_name + ".remove({" + Filter_conditions_key + ":{$lt:" + str(Filter_conditions_value) + "}})")
        if result.deleted_count > 0:
            logger.info("By date delete " + str(entid) + "." + collection_name + " less than " + str(get_del_timestamp) + " del document count: " + str(result.deleted_count))
    except Exception as e:
        logger.error("Error occurred while deleting documents: " + str(e))

# Delete all documents in the collection
def del_all_data(entid,collection_name):
    db = mongo_client[entid]
    collection = db[collection_name]
    try:
        result = collection.delete_many({})
        if result.deleted_count > 0:
            logger.info(entid + " run sql: db." + collection_name + ".remove({})")
            logger.info(entid + "." + collection_name + " del all document count: " + str(result.deleted_count))
    except Exception as e:
        logger.error(entid + "." + collection_name + " del all document error: " + str(e))

# Back up data with mongodump before deleting
def dump_mongodb_data(dbname,table,not_quiet_dump,del_time):
    status_info = ["1"]
    if is_del_bakcup_data:
        if os.path.exists(mongodump_command_path):
            run_status = " && echo $?"
            run_commnd = ""
            if not_quiet_dump:
                if mongodb_auth:
                    run_commnd = f"{mongodump_command_path} -h {mongodb_ip}:{mongodb_port} --authenticationDatabase={mongodb_auth_db} -u {mongodb_user} -p {mongodb_passwd} -d {dbname} -c {table} -q '{{\"{need_del_table_field}\": {{\"$lt\": \"{del_time}\"}}}}' -o {bakcup_dir_path}"
                else:
                    run_commnd = f"{mongodump_command_path} -h {mongodb_ip}:{mongodb_port} -d {dbname} -c {table}  -q '{{\"{need_del_table_field}\": {{\"$lt\": \"{del_time}\"}}}}' -o {bakcup_dir_path}"
            else:
                if mongodb_auth:
                    run_commnd = f"{mongodump_command_path} -h {mongodb_ip}:{mongodb_port} --authenticationDatabase={mongodb_auth_db} -u {mongodb_user} -p {mongodb_passwd} -d {dbname} -c {table}  -o {bakcup_dir_path}"
                else:
                    run_commnd = f"{mongodump_command_path} -h {mongodb_ip}:{mongodb_port} -d {dbname} -c {table}  -o {bakcup_dir_path}"
            logger.info("run command: " + run_commnd)
            try:
                # run the dump and collect stdout (mongodump logs to stderr, so stdout holds the echoed exit status)
                msg = os.popen(run_commnd + run_status)
                status_info = [line.strip() for line in msg.readlines()]
                logger.info("mongodump command result: " + str(status_info))
            except Exception as e:
                logger.error("mongodump command error: " + str(e))
        else:
            logger.error("mongodump command file not exists: " + mongodump_command_path)
    else:
        logger.info("config file not set is_del_bakcup_data = True, not dump data")
    return status_info

if __name__=="__main__":
    cfgpath = "./cfg/"
    conf = configparser.ConfigParser()
    conf.read(cfgpath)
    mongodb_ip = conf.get("main", "mongodb_ip")
    mongodb_port = conf.get("main", "mongodb_port")
    mongodb_auth = conf.getboolean("main", "mongodb_auth")
    mongodb_user = conf.get("main", "mongodb_user")
    mongodb_passwd = conf.get("main", "mongodb_passwd")
    mongodb_auth_db = conf.get("main", "mongodb_auth_db")
    need_filter_dbname = conf.get("main", "need_filter_dbname")
    is_del_bakcup_data = conf.getboolean("main", "is_del_bakcup_data")
    bakcup_dir_path = conf.get("main", "bakcup_dir_path")
    mongodump_command_path = conf.get("main", "mongodump_command_path")
    Del_day = conf.get("main", "Del_day")
    need_del_table_field = conf.get("main", "need_del_table_field")
    need_del_table_list = conf.get("main", "need_del_table_list")
    need_del_table_list = [item for item in need_del_table_list.split(",") if item != '']

    need_del_null_table_list = conf.get("main", "need_del_null_table_list")
    need_del_null_table_list = [item for item in need_del_null_table_list.split(",") if item != '']
    auth_get_entid = conf.getboolean("main", "auth_get_entid")
    need_filter_dbname_list = [item for item in need_filter_dbname.split(",") if item != '']

    #Get the configured db name list
    all_ent_id = conf.get("main", "ent_id")
    get_dbname_list = all_ent_id.split(",")
    logging.config.fileConfig("./cfg/")
    logger = logging.getLogger("rotatfile")

    # Initialize MongoDB
    mongo_client = init_mongodb(mongodb_auth)
    if mongo_client:
        logger.info("MongoDB init successfully")
    else:
        logger.error("Failed to initialize MongoDB")
        sys.exit(10)

    if auth_get_entid:
        get_dbname_list = get_mongodb_dbname()
        logger.info("get all dbname list: " + str(get_dbname_list))
    else:
        logger.info("file get dbname list: " + str(get_dbname_list))

    for dbname in get_dbname_list:
        get_end_all_table = get_mongodb_tables(dbname)
        for table in need_del_table_list:
            get_one_data_mes = get_one_data(dbname,table)
            if table in get_end_all_table:
                get_index_key_tables(dbname,table)
            else:
                logger.error(dbname + " not have table: " + table)
                continue
                # break
            #Delete data by date
            if get_one_data_mes:
                get_del_timestmap = if_string_type(get_one_data_mes)
                if dump_mongodb_data(dbname,table,True,get_del_timestmap)[0] == '0' or is_del_bakcup_data == False:
                    if get_del_timestmap:
                        del_data(dbname,table,get_del_timestmap)
                    else:
                        logger.error("get del timestmap fail")
                else:
                    if is_del_bakcup_data == False:
                        logger.error("is_del_bakcup_data is set to False, dump mongodb data fail")
                    else:
                        logger.error("dump mongodb data fail, skip deleting because backup is enabled")
        for null_table in need_del_null_table_list:
            if dump_mongodb_data(dbname,null_table,False,"1")[0] == '0'  or is_del_bakcup_data == False:
                if null_table in get_end_all_table:
                    #Delete all historical data
                    del_all_data(dbname,null_table)
                else:
                    logger.error(dbname + " not have table: " + null_table)
            else:
                if is_del_bakcup_data == False:
                    logger.error("is_del_bakcup_data is set to False, dump mongodb data fail")
                else:
                    logger.error("dump mongodb data fail, skip deleting because backup is enabled")
    mongo_client.close()
    logger.info("MongoDB closed")

Configuration file

  • Rough usage notes for the configuration items:
    • Data can be backed up before deleting records older than the specified time, controlled by the corresponding configuration items;
    • Deleting without backup, or clearing a collection entirely, is also supported via the corresponding configuration items;
    • Queries are filtered by the configured field; a short sketch of how these options are read follows the configuration file below.
[DEFAULT]
mongodb_ip = 10.130.47.197
mongodb_port = 40000
mongodb_auth = False
mongodb_user = admin
mongodb_passwd =  test@123
mongodb_auth_db = admin
#Filter dbnames that do not need to be processed from all dbnames, separated by commas
need_filter_dbname = local,config,admin
#Collections that need to be deleted by date, separated by commas
need_del_table_list = new_r_ags_e_back,call_detail_back

#Field used to filter documents when deleting by date
need_del_table_field = start_time
#Collections to be fully cleared, separated by commas
need_del_null_table_list = call_duration_cache,duration_cache

[main]
#Whether to automatically obtain all dbnames from mongodb
auth_get_entid = False
#Dbnames taken from the configuration file
ent_id = 20241205,20250107
#Delete data older than this many days
Del_day = 97
#Whether the data needs to be backed up before deleting
is_del_bakcup_data = False
#Backup directory
bakcup_dir_path = ./data
#Path to the mongodump command
mongodump_command_path = /home/devops/Python/Mongodb_del_history/mongodump
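As a quick illustration of how the script consumes this file, here is a minimal sketch using Python's configparser (the file name passed to read() is hypothetical; the script itself reads from the ./cfg/ directory): boolean switches are read with getboolean, and the comma-separated values are split into Python lists.

import configparser

conf = configparser.ConfigParser()
conf.read("./cfg/del_history.ini")   # hypothetical file name under ./cfg/

# keys placed in [DEFAULT] are also visible when reading section "main"
mongodb_ip = conf.get("main", "mongodb_ip")
mongodb_auth = conf.getboolean("main", "mongodb_auth")            # "False" -> False
is_del_bakcup_data = conf.getboolean("main", "is_del_bakcup_data")
del_day = conf.getint("main", "Del_day")

# comma-separated values become lists, empty items are dropped
need_filter_dbname_list = [x for x in conf.get("main", "need_filter_dbname").split(",") if x]
need_del_table_list = [x for x in conf.get("main", "need_del_table_list").split(",") if x]

print(mongodb_ip, mongodb_auth, is_del_bakcup_data, del_day)
print(need_filter_dbname_list, need_del_table_list)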

Script running

  • Unpack the package and run the script in the background:
[devops@db1 Mongodb_del_history]$ tar xf Mongodb_del_history.
[devops@db1 Mongodb_del_history]$ cd Mongodb_del_history
[devops@db1 Mongodb_del_history]$ nohup ./del_history_data &
2025-01-06 14:15:01 139749303605056 del_history_data.py:24 INFO init mongodb conn: 10.130.47.197:40000
2025-01-06 14:15:01 139749303605056 del_history_data.py:303 INFO MongoDB init successfully
2025-01-06 14:15:01 139749303605056 del_history_data.py:39 INFO delete need filter dbname: local
2025-01-06 14:15:01 139749303605056 del_history_data.py:310 INFO get all dbname list: ['0103290010', '0103290012', '0103290013', '0103290015']
2025-01-06 14:15:01 139749303605056 del_history_data.py:321 ERROR 0103290010 not have table: jhk_task_status
2025-01-06 14:15:01 139749303605056 del_history_data.py:321 ERROR 0103290010 not have table: sd_call_detail_back
2025-01-06 14:15:01 139749303605056 del_history_data.py:229 INFO run command: /home/devops/Python/Mongodb_del_history/mongodump -h 10.130.47.197:40000 -d 0103290010 -c call_duration_cache  -o ./data
2025-01-06 14:15:01 139749303605056 del_history_data.py:233 INFO mongodump command result: ['0']
2025-01-06 14:15:01 139749303605056 del_history_data.py:229 INFO run command: /home/devops/Python/Mongodb_del_history/mongodump -h 10.130.47.197:40000 -d 0103290010 -c duration_cache  -o ./data
2025-01-06 14:15:01 139749303605056 del_history_data.py:233 INFO mongodump command result: ['0']
2025-01-06 14:15:01 139749303605056 del_history_data.py:347 INFO MongoDB closed

Binary file program download

  • Download using the link
wget /LinuxPackage/Python/del_history_data

This concludes the article on batch cleaning of mongodb historical data. For more information on cleaning mongodb historical data, please search my previous articles or continue browsing the related articles below. I hope you will continue to support me in the future!