
Python script to implement full DataX synchronization from MySQL to Hive

Preface

When building an offline data warehouse or migrating data, we usually rely on tools such as Sqoop and DataX. Each has its own strengths, and DataX's advantage is obvious: it works in memory, so it is very fast. However, writing a JSON job file for every table during a full synchronization is tedious. Can we write a script to simplify that? Below I share a Python script that automatically generates the JSON job files needed to fully synchronize MySQL to Hive.

1. Display scripts

# coding=utf-8
import json
import getopt
import os
import sys
import pymysql

# MySQL related configuration, modify according to your actual environment
mysql_host = "XXXXXX"
mysql_port = "XXXX"
mysql_user = "XXX"
mysql_passwd = "XXXXXX"

# HDFS NameNode related configuration, modify according to your actual environment
hdfs_nn_host = "XXXXXX"
hdfs_nn_port = "XXXX"

# Target path for the generated configuration files, modify according to your actual environment
output_path = "/XXX/XXX/XXX"


def get_connection():
    # Open a connection to MySQL using the configuration above
    return pymysql.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, password=mysql_passwd)


def get_mysql_meta(database, table):
    # Read column names and data types of the table from information_schema
    connection = get_connection()
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, [database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall


def get_mysql_columns(database, table):
    return list(map(lambda x: x[0], get_mysql_meta(database, table)))


def get_hive_columns(database, table):
    def type_mapping(mysql_type):
        mappings = {
            "bigint": "bigint",
            "int": "bigint",
            "smallint": "bigint",
            "tinyint": "bigint",
            "decimal": "string",
            "double": "double",
            "float": "float",
            "binary": "string",
            "char": "string",
            "varchar": "string",
            "datetime": "string",
            "time": "string",
            "timestamp": "string",
            "date": "string",
            "text": "string"
        }
        return mappings[mysql_type]

    meta = get_mysql_meta(database, table)
    return list(map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta))


def generate_json(source_database, source_table):
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": mysql_user,
                        "password": mysql_passwd,
                        "column": get_mysql_columns(source_database, source_table),
                        "splitPk": "",
                        "connection": [{
                            "table": [source_table],
                            "jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]
                        }]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
                        "fileType": "text",
                        "path": "${targetdir}",
                        "fileName": source_table,
                        "column": get_hive_columns(source_database, source_table),
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress": "gzip"
                    }
                }
            }]
        }
    }
    # Write the job configuration to <database>.<table>.json in the output directory
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
        json.dump(job, f)


def main(args):
    source_database = ""
    source_table = ""

    options, arguments = getopt.getopt(args, 'd:t:', ['sourcedb=', 'sourcetbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--sourcedb'):
            source_database = opt_value
        if opt_name in ('-t', '--sourcetbl'):
            source_table = opt_value

    generate_json(source_database, source_table)


if __name__ == '__main__':
    main(sys.argv[1:])
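
For a full-database synchronization you usually need one JSON file per table. As a minimal sketch (not part of the original script), assuming the code above is saved as gen_import_config.py in the same directory, a small wrapper can list every table of a database and generate all of the configurations in one pass:

# gen_all_import_config.py -- hypothetical batch wrapper around the script above
import sys

# Reuse the connection settings and generator from the script above
from gen_import_config import get_connection, generate_json


def generate_all(database):
    # List every table in the database and generate one DataX JSON per table
    connection = get_connection()
    cursor = connection.cursor()
    cursor.execute("SELECT TABLE_NAME FROM information_schema.TABLES WHERE TABLE_SCHEMA=%s", [database])
    tables = [row[0] for row in cursor.fetchall()]
    cursor.close()
    connection.close()
    for table in tables:
        generate_json(database, table)


if __name__ == '__main__':
    generate_all(sys.argv[1])

Running python3 gen_all_import_config.py database_name then produces one <database>.<table>.json file per table under output_path.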

2. Preparation for use

1. Install the Python environment

Here I installed the Python 3 environment:

sudo yum install -y python3

2. Install EPEL

EPEL (Extra Packages for Enterprise Linux) is a software repository maintained by the Fedora Special Interest Group. It provides a large number of packages that are not available in the official RHEL or CentOS repositories. When you need software on CentOS or RHEL that is not in the official repositories, you usually install epel-release first.

sudo yum install -y epel-release

3. Install the third-party modules required for script execution

pip3 install pymysql
pip3 install cryptography

Here, cryptography may fail to install because pip and setuptools are out of date; upgrade them first:

pip3 install --upgrade pip
pip3 install --upgrade setuptools

Reinstall cryptography

pip3 install cryptography
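
You can optionally verify that both modules import correctly (this check is not in the original article):

python3 -c "import pymysql, cryptography; print('ok')"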

3. How to use the script

1. Configure the script

First, modify the configuration at the top of the script to match your own server.
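
For example, with purely illustrative placeholder values (replace each one with your own; 3306 and 8020 are only the common defaults for MySQL and the NameNode RPC port):

mysql_host = "192.168.10.102"    # MySQL host, placeholder
mysql_port = "3306"              # MySQL port, placeholder
mysql_user = "root"              # MySQL user, placeholder
mysql_passwd = "your_password"   # MySQL password, placeholder

hdfs_nn_host = "192.168.10.102"  # NameNode host, placeholder
hdfs_nn_port = "8020"            # NameNode RPC port, placeholder

output_path = "/opt/module/datax/job/import"  # output directory for the JSON files, placeholder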

2. Create .py file

vim /xxx/xxx/xxx/gen_import_config.py

3. Execute the script

python3 /script path/gen_import_config.py -d <database name> -t <table name>
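
For example, with a made-up database test_db and table user_info:

python3 /script path/gen_import_config.py -d test_db -t user_info

This writes a file named test_db.user_info.json into output_path.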

4. Test whether the generated JSON file works

-p"-Dtargetdir=/Table stored path in hdfs"/generated json file path

Before executing, make sure that the targetdir path already exists on HDFS; if it does not, create it first and then run the job again.
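
For example, the target directory can be created on HDFS first (the path below is a placeholder):

hadoop fs -mkdir -p /table storage path in HDFS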

This is the article about using a Python script to fully synchronize MySQL to Hive with DataX by generating the JSON job files automatically. For more related content on full DataX synchronization from MySQL to Hive with Python, please search my previous articles or continue browsing the related articles below. I hope you will continue to support me!