Preface
When building an offline data warehouse or migrating data, we usually rely on tools such as Sqoop and DataX. Each has its own strengths, and DataX's is obvious: it works in memory, so it is very fast. However, when doing a full synchronization, hand-writing a JSON job file for every table is tedious. Can we write a script to simplify that tedious work? Below I share a Python script that automatically generates the DataX JSON files for fully synchronizing MySQL to Hive.
1. The script
# coding=utf-8
import json
import getopt
import os
import sys
import pymysql

# MySQL related configuration, modify according to actual conditions
mysql_host = "XXXXXX"
mysql_port = "XXXX"
mysql_user = "XXX"
mysql_passwd = "XXXXXX"

# HDFS NameNode related configuration, modify according to actual conditions
hdfs_nn_host = "XXXXXX"
hdfs_nn_port = "XXXX"

# Target path for the generated configuration files, modify according to actual conditions
output_path = "/XXX/XXX/XXX"


def get_connection():
    return pymysql.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, password=mysql_passwd)


def get_mysql_meta(database, table):
    # Read column names and types from information_schema, in column order
    connection = get_connection()
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, [database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall


def get_mysql_columns(database, table):
    return list(map(lambda x: x[0], get_mysql_meta(database, table)))


def get_hive_columns(database, table):
    def type_mapping(mysql_type):
        # Map MySQL types to Hive types for the hdfswriter column definition
        mappings = {
            "bigint": "bigint",
            "int": "bigint",
            "smallint": "bigint",
            "tinyint": "bigint",
            "decimal": "string",
            "double": "double",
            "float": "float",
            "binary": "string",
            "char": "string",
            "varchar": "string",
            "datetime": "string",
            "time": "string",
            "timestamp": "string",
            "date": "string",
            "text": "string"
        }
        return mappings[mysql_type]

    meta = get_mysql_meta(database, table)
    return list(map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta))


def generate_json(source_database, source_table):
    job = {
        "job": {
            "setting": {
                "speed": {"channel": 3},
                "errorLimit": {"record": 0, "percentage": 0.02}
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": mysql_user,
                        "password": mysql_passwd,
                        "column": get_mysql_columns(source_database, source_table),
                        "splitPk": "",
                        "connection": [{
                            "table": [source_table],
                            "jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]
                        }]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
                        "fileType": "text",
                        "path": "${targetdir}",
                        "fileName": source_table,
                        "column": get_hive_columns(source_database, source_table),
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress": "gzip"
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
        json.dump(job, f)


def main(args):
    source_database = ""
    source_table = ""
    options, arguments = getopt.getopt(args, '-d:-t:', ['sourcedb=', 'sourcetbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--sourcedb'):
            source_database = opt_value
        if opt_name in ('-t', '--sourcetbl'):
            source_table = opt_value
    generate_json(source_database, source_table)


if __name__ == '__main__':
    main(sys.argv[1:])
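For reference, once the configuration variables at the top are filled in, generate_json can also be called directly from a Python shell; the database and table names below are purely hypothetical examples, and this assumes you start Python in the same directory as gen_import_config.py. The function writes a file named <database>.<table>.json into output_path:

# Hypothetical example; assumes mysql_*, hdfs_nn_* and output_path are already configured
from gen_import_config import generate_json

generate_json("test_db", "user_info")   # hypothetical database and table names
# -> writes {output_path}/test_db.user_info.json, ready to be passed to DataX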
2. Preparation for use
1. Install the Python environment
Here I installed the Python 3 environment:
sudo yum install -y python3
2. Install EPEL
EPEL (Extra Packages for Enterprise Linux) is a software repository maintained by the Fedora Special Interest Group, providing a large number of packages not available in the official RHEL or CentOS repositories. When you need software on CentOS or RHEL that is not in the official repositories, you usually install epel-release first.
sudo yum install -y epel-release
3. Install the third-party modules required for script execution
pip3 install pymysql
pip3 install cryptography
If cryptography fails to install here (typically because the bundled pip and setuptools are too old), upgrade pip and setuptools first.
pip3 install --upgrade pip
pip3 install --upgrade setuptools
Reinstall cryptography
pip3 install cryptography
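Before running the script, you can quickly confirm that the modules import cleanly; this sanity check is my own addition, not part of the original steps:

python3 -c "import pymysql, cryptography; print(pymysql.__version__)"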
3. How to use the script
1. Configure the script
First, modify the configuration block at the top of the script to match your own servers.
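For example, after editing, the configuration block might look like the following; all host names, ports, and paths here are made-up placeholders and must be replaced with your own values:

# MySQL connection (hypothetical values)
mysql_host = "hadoop102"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "123456"

# HDFS NameNode (hypothetical values)
hdfs_nn_host = "hadoop102"
hdfs_nn_port = "8020"

# Directory where the generated JSON files will be written (hypothetical value)
output_path = "/opt/module/datax/job/import"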
2. Create the .py file
vim /xxx/xxx/xxx/gen_import_config.py
3. Execute the script
python3 /script_path/gen_import_config.py -d <database_name> -t <table_name>
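For example, assuming the script was saved under /opt/module/gen_datax_config and you want the configuration for a hypothetical table user_info in a hypothetical database test_db:

python3 /opt/module/gen_datax_config/gen_import_config.py -d test_db -t user_info
# The file test_db.user_info.json now exists in the output_path configured in the script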
4. Test whether the generated JSON file works
-p"-Dtargetdir=/Table stored path in hdfs"/generated json file path
When executing, you must first make sure that the targetdir path exists on HDFS; if it does not, create it and then run the command again.
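Putting it together, a full test run might look like this; all paths, database, and table names below are hypothetical and should be adapted to your environment:

# 1. Create the target directory on HDFS first
hadoop fs -mkdir -p /origin_data/test_db/user_info

# 2. Run DataX with the generated JSON, passing the target directory as a parameter
python /opt/module/datax/bin/datax.py \
  -p"-Dtargetdir=/origin_data/test_db/user_info" \
  /opt/module/datax/job/import/test_db.user_info.json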
This concludes the article on using a Python script to generate the DataX configuration for fully synchronizing MySQL to Hive. For more related content on fully synchronizing MySQL to Hive with DataX, please see my previous articles. I hope everyone will continue to support me.