Updated on 2025-04-11

How to manage connections to multiple PostgreSQL databases in Python

This article shows how to connect to one of several PostgreSQL databases by looking up its connection details, by service name, in a configuration list. Two connection functions are provided: postgresql_connect, which opens a plain connection, and postgresql_connect_encode, which also sets the client encoding.

Operation steps

1. Import related libraries

from pyhive import hive
import cx_Oracle
import pymysql
import pymssql
from optparse import OptionParser
import logging
import sys
import traceback
import re
import warnings
with warnings.catch_warnings(record=True):
    import psycopg2
reload(sys)  # Python 2 only: reload is not a builtin in Python 3, so delete this line there

2. Configure PostgreSQL connection information

pgconfigs is a list of PostgreSQL service configurations. Each entry maps a service name to its database name, user, password, host, and port.

pgconfigs = [
    {"asset_factory": {"database": "asset_factory", "user": "asset", "password": "Ro20ot16", "host": "10.251.80.202", "port": "65432"}},
    {"asset_register": {"database": "asset_register", "user": "asset", "password": "Ro20ot16", "host": "10.251.80.202", "port": "65432"}},
    {"creditdb": {"database": "creditdb", "user": "credit", "password": "hHJ98#pE40Y", "host": "10.251.101.175", "port": "18923"}},
    {"asset_portal": {"database": "asset_portal", "user": "asset", "password": "Ro20ot16", "host": "10.251.80.202", "port": "65432"}},
    {"tyjh": {"database": "tyjh", "user": "tyjh", "password": "k5y2dwoKcFm&^OsW", "host": "10.251.90.35", "port": "18922"}},
    {"cwrl": {"database": "cwrl", "user": "cwjh_yaxin", "password": "eKYy4R3&MbaaY3Zy", "host": "10.251.90.34", "port": "18921"}},
    {"dc_new_pg": {"database": "dc", "user": "dc", "password": "Ojjkcy@jVxKIeo5C", "host": "10.251.90.36", "port": "18923"}},
    {"credit_data": {"database": "credit_data", "user": "credit_data", "password": "@G*1f*3$3DzTH6%o", "host": "10.251.90.134", "port": "18923"}},
    {"new_jf_pg_cd": {"database": "accthuiju", "user": "datacenter", "password": "KridU593&%rj90", "host": "133.37.116.192", "port": "18921"}},
    {"new_accthuiju": {"database": "accthuiju", "user": "itf_ods", "password": "2dH~fZ^8", "host": "10.251.64.226", "port": "18921"}},
    {"crm3huijupg": {"database": "crm3huijupg", "user": "datacenter", "password": "lp*36^YD", "host": "10.251.65.93", "port": "18921"}},
    {"crm3hispg": {"database": "crm3hispg", "user": "datacenter", "password": "~4nD_jWG", "host": "10.251.64.196", "port": "18921"}},
    {"hana": {"database": "hana", "user": "hana", "password": "gvptHKXVNKEGw7pk", "host": "10.251.90.94", "port": "18924"}},
    {"dataos_71_pg_dev": {"database": "dacp", "user": "dacp", "password": "jxFgCKv9GJw2ohS3", "host": "10.251.110.104", "port": "18921"}}
]
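Passwords kept in the source are awkward to rotate. A minimal sketch of one alternative, assuming each password may be supplied through an environment variable; the naming scheme PG_PASSWORD_<SERVICE> and the helper resolve_password are hypothetical and not part of the original script:

```python
import os


def resolve_password(servicename, pgconfig, environ=None):
    """Return a copy of pgconfig, taking the password from the environment if set.

    The variable name PG_PASSWORD_<SERVICE> is an assumption made for this sketch.
    """
    environ = os.environ if environ is None else environ
    env_name = "PG_PASSWORD_" + servicename.upper()
    resolved = dict(pgconfig)  # copy, so the shared config entry is not mutated
    if env_name in environ:
        resolved["password"] = environ[env_name]
    return resolved
```

The returned dictionary has the same keys as an entry in pgconfigs, so it can be used wherever pgconfig is read.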

3. Define connection functions

Normal connection function

def postgresql_connect(servicename):
    global job_task_content_ori
    global job_resource_conn_info
    global job_hdfs_dir_info
    try:
        for subconfig in pgconfigs:
            if servicename in subconfig:
                pgconfig = subconfig[servicename]
                conn = psycopg2.connect(database=pgconfig["database"], user=pgconfig["user"],
                                        password=pgconfig["password"], host=pgconfig["host"], port=pgconfig["port"])
                return conn
        print("servicename %s not found in config dictionary" % servicename)
        sys.exit(6)
    except Exception:
        # except Exception rather than a bare except, so the sys.exit above is not intercepted
        traceback.print_exc()
        print("connect pg error " + pgconfig["database"])
        sys.exit(6)

The postgresql_connect function above does the following:

  • Iterates over the pgconfigs list, looking for the entry whose key matches the servicename parameter.
  • Opens a PostgreSQL connection with the matching configuration and returns it.
  • If no matching service name is found, prints an error message and exits the program with status 6.
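The lookup step can also be done without a loop. A minimal sketch, assuming the same list-of-single-key-dictionaries layout as pgconfigs; the helpers index_by_service and find_pg_config are hypothetical names, and raising KeyError instead of exiting is a design choice of this sketch, not the article's behavior:

```python
def index_by_service(configs):
    """Merge a list of {servicename: settings} dictionaries into one dictionary."""
    index = {}
    for subconfig in configs:
        index.update(subconfig)
    return index


def find_pg_config(index, servicename):
    """Return the settings for servicename, or raise KeyError with a clear message."""
    try:
        return index[servicename]
    except KeyError:
        raise KeyError("servicename %s not found in config dictionary" % servicename)
```

Building the index once and reusing it keeps each lookup to a single dictionary access, and leaves the decision to exit or recover with the caller.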

Connection function with an encoding parameter

def postgresql_connect_encode(servicename, encoding='UTF-8'):
    global job_task_content_ori
    global job_resource_conn_info
    global job_hdfs_dir_info
    try:
        for subconfig in pgconfigs:
            if servicename in subconfig:
                pgconfig = subconfig[servicename]
                conn = psycopg2.connect(database=pgconfig["database"], user=pgconfig["user"],
                                        password=pgconfig["password"], host=pgconfig["host"], port=pgconfig["port"],
                                        client_encoding=encoding)
                return conn
        print("servicename %s not found in config dictionary" % servicename)
        sys.exit(6)
    except Exception:
        # except Exception rather than a bare except, so the sys.exit above is not intercepted
        traceback.print_exc()
        print("connect pg error " + pgconfig["database"])
        sys.exit(6)

The postgresql_connect_encode function is the same as the previous one, except that it accepts an optional encoding parameter (default 'UTF-8') and passes it to psycopg2.connect as client_encoding to set the client encoding.
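Since the two functions differ only in one keyword argument, the argument list can be built in one place. A minimal sketch, assuming a hypothetical helper build_connect_kwargs that is not part of the original script:

```python
def build_connect_kwargs(pgconfig, encoding=None):
    """Collect the keyword arguments for a connection from one config entry.

    client_encoding is added only when an encoding is given, so the same
    helper covers both the plain and the encoded variant.
    """
    keys = ("database", "user", "password", "host", "port")
    kwargs = {key: pgconfig[key] for key in keys}
    if encoding is not None:
        kwargs["client_encoding"] = encoding
    return kwargs
```

The result could then be passed on as psycopg2.connect(**build_connect_kwargs(pgconfig, 'UTF-8')).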

4. Key points

Configuration management: the pgconfigs list holds every database configuration in one place, so the matching entry can be found by service name.

Error handling: when a connection fails, the stack trace is printed to help with debugging and the program exits.

Encoding management: for connections that need a specific client encoding, a second function is provided that sets it.
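One detail matters for the error-handling point: sys.exit works by raising SystemExit, which a bare except clause also catches, whereas except Exception lets it pass through. The following sketch only demonstrates that language behavior; both function names are hypothetical:

```python
import sys


def caught_by_bare_except():
    """Return True if a bare except intercepts sys.exit."""
    try:
        sys.exit(6)
    except:  # deliberately bare, to show that it swallows SystemExit
        return True
    return False


def exit_code_past_except_exception():
    """Return the exit code that escapes an `except Exception` handler."""
    try:
        try:
            sys.exit(6)
        except Exception:
            return None  # not reached: SystemExit is not an Exception subclass
    except SystemExit as exc:
        return exc.code
```

A handler meant to report connection failures is therefore usually written as except Exception, so that an intentional exit inside the try block is not mistaken for a connection error.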

5. Usage example

Suppose you want to connect to the PostgreSQL service named dataos_71_pg_dev. The functions can be called as follows:

# Normal connection
conn = postgresql_connect("dataos_71_pg_dev")

# Connection with an encoding parameter
conn_with_encoding = postgresql_connect_encode("dataos_71_pg_dev", encoding='UTF-8')
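Once a connection is returned, it is typically used through a cursor. A minimal sketch, assuming a hypothetical helper fetch_all; it relies only on the DB-API 2.0 methods cursor, execute, fetchall, and close, which psycopg2 connections provide:

```python
from contextlib import closing


def fetch_all(conn, sql, params=None):
    """Run one query on an open DB-API connection and return all rows.

    The cursor is closed even if the query raises; the connection itself
    stays open and remains the caller's responsibility.
    """
    with closing(conn.cursor()) as cur:
        if params is None:
            cur.execute(sql)
        else:
            cur.execute(sql, params)
        return cur.fetchall()
```

With a connection from postgresql_connect this might be called as rows = fetch_all(conn, "SELECT version()"), followed by conn.close() when the work is done.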

Complete Python code

#!/data/apps/python2715/bin/python
# -*- coding:utf-8 -*-

from pyhive import hive
import cx_Oracle
import pymysql
import pymssql
from optparse import OptionParser
import logging
import sys
import traceback
import re
import warnings
with warnings.catch_warnings(record=True):
    import psycopg2
reload(sys)  # Python 2 only: reload is not a builtin in Python 3, so delete this line there

pgconfigs = [
    {"asset_factory": {"database": "asset_factory", "user": "asset", "password": "Ro20ot16", "host": "10.251.80.202", "port": "65432"}},
    {"asset_register": {"database": "asset_register", "user": "asset", "password": "Ro20ot16", "host": "10.251.80.202", "port": "65432"}},
    {"creditdb": {"database": "creditdb", "user": "credit", "password": "hHJ98#pE40Y", "host": "10.251.101.175", "port": "18923"}},
    {"asset_portal": {"database": "asset_portal", "user": "asset", "password": "Ro20ot16", "host": "10.251.80.202", "port": "65432"}},
    {"tyjh": {"database": "tyjh", "user": "tyjh", "password": "k5y2dwoKcFm&^OsW", "host": "10.251.90.35", "port": "18922"}},
    {"cwrl": {"database": "cwrl", "user": "cwjh_yaxin", "password": "eKYy4R3&MbaaY3Zy", "host": "10.251.90.34", "port": "18921"}},
    {"dc_new_pg": {"database": "dc", "user": "dc", "password": "Ojjkcy@jVxKIeo5C", "host": "10.251.90.36", "port": "18923"}},
    {"credit_data": {"database": "credit_data", "user": "credit_data", "password": "@G*1f*3$3DzTH6%o", "host": "10.251.90.134", "port": "18923"}},
    {"new_jf_pg_cd": {"database": "accthuiju", "user": "datacenter", "password": "KridU593&%rj90", "host": "133.37.116.192", "port": "18921"}},
    {"new_accthuiju": {"database": "accthuiju", "user": "itf_ods", "password": "2dH~fZ^8", "host": "10.251.64.226", "port": "18921"}},
    {"crm3huijupg": {"database": "crm3huijupg", "user": "datacenter", "password": "lp*36^YD", "host": "10.251.65.93", "port": "18921"}},
    {"crm3hispg": {"database": "crm3hispg", "user": "datacenter", "password": "~4nD_jWG", "host": "10.251.64.196", "port": "18921"}},
    {"hana": {"database": "hana", "user": "hana", "password": "gvptHKXVNKEGw7pk", "host": "10.251.90.94", "port": "18924"}},
    {"dataos_71_pg_dev": {"database": "dacp", "user": "dacp", "password": "jxFgCKv9GJw2ohS3", "host": "10.251.110.104", "port": "18921"}}
]


def postgresql_connect(servicename):
    global job_task_content_ori
    global job_resource_conn_info
    global job_hdfs_dir_info
    try:
        for subconfig in pgconfigs:
            if servicename in subconfig:
                pgconfig = subconfig[servicename]
                conn = psycopg2.connect(database=pgconfig["database"], user=pgconfig["user"],
                                        password=pgconfig["password"], host=pgconfig["host"], port=pgconfig["port"])
                return conn
        print("servicename %s not found in config dictionary" % servicename)
        sys.exit(6)
    except Exception:
        # except Exception rather than a bare except, so the sys.exit above is not intercepted
        traceback.print_exc()
        print("connect pg error " + pgconfig["database"])
        sys.exit(6)


def postgresql_connect_encode(servicename, encoding='UTF-8'):
    global job_task_content_ori
    global job_resource_conn_info
    global job_hdfs_dir_info
    try:
        for subconfig in pgconfigs:
            if servicename in subconfig:
                pgconfig = subconfig[servicename]
                conn = psycopg2.connect(database=pgconfig["database"], user=pgconfig["user"],
                                        password=pgconfig["password"], host=pgconfig["host"], port=pgconfig["port"],
                                        client_encoding=encoding)
                return conn
        print("servicename %s not found in config dictionary" % servicename)
        sys.exit(6)
    except Exception:
        # except Exception rather than a bare except, so the sys.exit above is not intercepted
        traceback.print_exc()
        print("connect pg error " + pgconfig["database"])
        sys.exit(6)

Summary

This code shows how to manage connections to multiple PostgreSQL databases through a single configuration list, with one function for plain connections and one that also sets the client encoding, so the same lookup can be reused for every service. Catching exceptions and printing the stack trace makes connection failures easier to diagnose.
