
Project Practice: Implementing Monitoring and Alerting for Database Tables in Python

Introduction

This article uses Python to implement monitoring and alerting for database tables, sending alert messages to a DingTalk group through a DingTalk robot.

It reproduces a small part of the data quality feature in DataWorks. DataWorks offers many built-in rule types and is convenient to use; here only two of those rules are implemented in a simple way, for reference only.

This is my first time using Python, so any advice is welcome.

Tools used: MaxCompute

1. Create the tables

1. tmp_monitor_tbl_info

CREATE TABLE IF NOT EXISTS puture_bigdata.tmp_monitor_tbl_info (
      `id`              STRING COMMENT 'Table id'
    , `tbl_name`        STRING COMMENT 'Table name'
    , `pt_format`       STRING COMMENT 'Partition format: yyyy-MM-dd, yyyyMMdd, etc.'
    , `val_type`        STRING COMMENT 'Value type: table row count, period value, etc.'
    , `monitor_flag`    INT    COMMENT 'Monitoring flag: 0: do not monitor, 1: monitor'
    , `rule_code`       INT    COMMENT 'Rule code: 1: table row count, difference from last cycle; 2: table row count, fixed value; etc.'
    , `rule_type`       STRING COMMENT 'Rule type: table row count, difference from last cycle; table row count, fixed value; compare with a fixed value; etc.'
    , `expect_val`      INT    COMMENT 'Expected value'
    , `tbl_sort_code`   INT    COMMENT 'Table category code: 0: other (dimension tables), 1: Amazon, 2: small and medium platforms, 3: market data, etc.'
    , `tbl_sort_name`   STRING COMMENT 'Table category name: 0: other (dimension tables), 1: Amazon, 2: small and medium platforms, 3: market data, etc.'
    , `pt_num`          INT    COMMENT 'Partition date offset'
) COMMENT 'Data monitoring table information'
tblproperties ("transactional"="true")
;
-- Insert data
INSERT INTO TABLE puture_bigdata_dev.tmp_monitor_tbl_info
SELECT * FROM (
  VALUES  (1 , 'ods_amazon_amz_customer_returns_df',              'yyyyMMdd', 'Table row count', 1, 1, 'Number of table rows, last cycle difference', 0,      1, 'Amazon' , -1)     
        , (2 , 'ods_amazon_amz_flat_file_all_orders_df',          'yyyyMMdd', 'Table row count', 1, 1, 'Number of table rows, last cycle difference', 0,      1, 'Amazon' , -1)         
        , (3 , 'dim_sys_salesman_info_df',                        'yyyyMMdd', 'Table row count', 1, 1, 'Number of table rows, last cycle difference', 0,      0, 'other' , -1)  
) AS table_name(id, tbl_name, pt_format, val_type, monitor_flag, rule_code, rule_type, expect_val, tbl_sort_code, tbl_sort_name, pt_num) ;

2. tmp_monitor_tbl_info_log_di

CREATE TABLE IF NOT EXISTS puture_bigdata_dev.tmp_monitor_tbl_info_log_di (
      `id`              STRING COMMENT 'Monitoring id: md5(table name_partition)_rule code_hour'
    , `tbl_name`        STRING COMMENT 'Table name'
    , `stat_time`       STRING COMMENT 'Statistics time'
    , `pt_format`       STRING COMMENT 'Partition format: yyyy-MM-dd, yyyyMMdd, etc.'
    , `stat_pt`         STRING COMMENT 'Statistics partition'
    , `val_type`        STRING COMMENT 'Value type: table row count, period value, etc.'
    , `val`             INT    COMMENT 'Statistics value'
    , `rule_code`       INT    COMMENT 'Rule code: 1: table row count, difference from last cycle; 2: table row count, fixed value; etc.'
    , `rule_type`       STRING COMMENT 'Rule type: table row count, difference from last cycle; table row count, fixed value; compare with a fixed value; etc.'
    , `expect_val`      INT    COMMENT 'Expected value'
    , `is_exc`          INT    COMMENT 'Is it abnormal: 0: no, 1: yes; default 0'
    , `tbl_sort_code`   INT    COMMENT 'Table category code: 0: other (dimension tables), 1: Amazon, 2: small and medium platforms, 3: market data, etc.'
    , `tbl_sort_name`   STRING COMMENT 'Table category name: 0: other (dimension tables), 1: Amazon, 2: small and medium platforms, 3: market data, etc.'
) COMMENT 'Data monitoring log table'
PARTITIONED BY (pt STRING COMMENT 'Data date, yyyy-MM-dd') ;
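Both DDL statements run in a MaxCompute SQL node. As a quick sanity check from PyODPS, the sketch below lists the columns of the two tables; it assumes the ODPS entry object `o` that a DataWorks PyODPS node injects automatically, and the table names used above.

# Sanity-check sketch: confirm the two monitoring tables exist and list their columns.
# Assumes the ODPS entry object `o` injected by a DataWorks PyODPS node.
for name in ('tmp_monitor_tbl_info', 'tmp_monitor_tbl_info_log_di'):
    if o.exist_table(name):
        cols = [c.name for c in o.get_table(name).table_schema.columns]
        print(name, '->', cols)
    else:
        print(name, 'does not exist yet')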

2. Program development

1. Data checker program

'''PyODPS 3
 Please make sure not to download data from MaxCompute to process it locally. Download operations typically involve the open_reader method of Table/Instance and the to_pandas method of DataFrame.
 It is recommended to use PyODPS DataFrame (created from MaxCompute tables) and MaxCompute SQL to process data.
 For more detailed content, please refer to: /document_detail/
 '''

import os
from odps import ODPS, DataFrame, options
from datetime import datetime, timedelta
from dateutil import parser

# Read SQL results through the instance tunnel
options.tunnel.use_instance_tunnel = True

# Get the current time
now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(now_time)

# Scheduling parameter: data date, yyyy-MM-dd
pt = args['date']
print(pt)
date = datetime.strptime(pt, "%Y-%m-%d")

# Monitoring table list; tbl_sort_code -> 0: others (dimension tables), 1: Amazon, 2: small and medium platforms, 3: market data
sql_tbl_info = """
SELECT * FROM puture_bigdata.tmp_monitor_tbl_info
WHERE monitor_flag = 1 AND tbl_sort_code = 3
"""

# Results (log) table
res_tbl_name = "puture_bigdata.tmp_monitor_tbl_info_log_di"

# Statistics SQL -- rule 1: table row count, difference from last cycle
def sql_upper_period_diff():
    sql = f"""
    set odps.sql.allow.fullscan=true ;

    INSERT INTO TABLE {res_tbl_name} PARTITION (pt='{pt}')
    SELECT 
          a.id
        , a.tbl_name
        , a.stat_time
        , a.pt_format
        , a.stat_pt
        , a.val_type
        , a.val
        , a.rule_code
        , a.rule_type
        , a.expect_val
        , IF (a.val = 0, 1, IF ((a.val - NVL(b.val, 0)) >= {expect_val}, 0, 1)) AS is_exc
        , a.tbl_sort_code
        , a.tbl_sort_name 
    FROM (
        SELECT 
              concat( md5(concat('{tbl_name}', '_', date_format('{date_str}' ,'{pt_format}')) ), '_', {rule_code}, '_', HOUR('{now_time}') ) AS id
            , '{tbl_name}' AS tbl_name
            , '{now_time}' AS stat_time
            , '{pt_format}' AS pt_format
            , date_format('{date_str}' ,'{pt_format}') AS stat_pt
            , '{val_type}' AS val_type
            , COUNT(1) AS val 
            , '{rule_code}' AS rule_code
            , '{rule_type}' AS rule_type
            , {expect_val} AS expect_val
            , {tbl_sort_code} AS tbl_sort_code
            , '{tbl_sort_name}' AS tbl_sort_name
        FROM puture_bigdata.{tbl_name}
        WHERE pt = date_format('{date_str}' ,'{pt_format}')
    ) a 
    LEFT JOIN 
    (
        SELECT tbl_name, val FROM (
            SELECT tbl_name, val
                , ROW_NUMBER() OVER(PARTITION BY tbl_name ORDER BY stat_time DESC ) AS rn 
            FROM {res_tbl_name}
            WHERE pt = DATE_ADD('{date_str}', -1)
        ) t WHERE rn = 1
    ) b
    ON a.tbl_name = b.tbl_name
    ;
    """
    return sql

# Rule 2: table row count, fixed value
def sql_line_fixed_val():
    sql = f"""
    set odps.sql.allow.fullscan=true ;

    INSERT INTO TABLE {res_tbl_name} PARTITION (pt='{pt}')
    SELECT 
          concat( md5(concat('{tbl_name}', '_', date_format('{date_str}' ,'{pt_format}')) ), '_', {rule_code}, '_', HOUR('{now_time}') ) AS id
        , '{tbl_name}' AS tbl_name
        , '{now_time}' AS stat_time
        , '{pt_format}' AS pt_format
        , date_format('{date_str}' ,'{pt_format}') AS stat_pt
        , '{val_type}' AS val_type
        , COUNT(1) AS val 
        , '{rule_code}' AS rule_code
        , '{rule_type}' AS rule_type
        , {expect_val} AS expect_val
        , IF (COUNT(1) >= {expect_val}, 0, 1 ) AS is_exc
        , {tbl_sort_code} AS tbl_sort_code
        , '{tbl_sort_name}' AS tbl_sort_name
    FROM puture_bigdata.{tbl_name}
    WHERE pt = date_format('{date_str}' ,'{pt_format}') ;
    """
    return sql

# Execute the monitoring statistics SQL
def ex_monitor(sql: str):
    try:
        # print(sql)
        o.execute_sql(sql, hints={'odps.sql.allow.fullscan': True, 'odps.sql.submit.mode': 'script'})
        print("{}: Run successfully".format(tbl_name) )
    except Exception as e:
        print('{}: Running exception ======> '.format(tbl_name) + str(e))


if __name__ == '__main__':
    try:
        with o.execute_sql(sql_tbl_info, hints={'odps.sql.allow.fullscan': True}).open_reader() as reader:

            for row_record in reader:
                # print(row_record)  # Print one configuration record
                tbl_name = row_record.tbl_name
                pt_format = row_record.pt_format
                val_type = row_record.val_type
                monitor_flag = row_record.monitor_flag
                rule_code = row_record.rule_code
                rule_type = row_record.rule_type
                expect_val = row_record.expect_val
                tbl_sort_code = row_record.tbl_sort_code
                tbl_sort_name = row_record.tbl_sort_name
                pt_num = row_record.pt_num
                date_str = (date + timedelta(days=pt_num)).strftime('%Y-%m-%d')
                
                if rule_code == 1 :
                    ex_monitor(sql_upper_period_diff())
                elif rule_code == 2 :
                    ex_monitor(sql_line_fixed_val())
                else :
                    print("Unknown rules!!!")
                           
    except Exception as e:
        print('abnormal ======> ' + str(e))
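A DataWorks PyODPS 3 node injects the MaxCompute entry object `o` and the scheduling-parameter dictionary `args` automatically. To try the checker outside DataWorks, these have to be created by hand; a minimal sketch, with placeholder credentials, endpoint and date:

# Local-test sketch: build the objects that a DataWorks PyODPS node normally injects.
# The AccessKey, endpoint and date below are placeholders, not real values.
from odps import ODPS, options

options.tunnel.use_instance_tunnel = True

o = ODPS(
    access_id='<your-access-key-id>',
    secret_access_key='<your-access-key-secret>',
    project='puture_bigdata',
    endpoint='<your-maxcompute-endpoint>',
)

# Simulate the scheduling parameter that DataWorks passes as args['date']
args = {'date': '2024-01-03'}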

2. Alarm information push program

'''PyODPS 3
 Please make sure not to download data from MaxCompute to process it locally. Download operations typically involve the open_reader method of Table/Instance and the to_pandas method of DataFrame.
 It is recommended to use PyODPS DataFrame (created from MaxCompute tables) and MaxCompute SQL to process data.
 For more detailed content, please refer to: /document_detail/
 '''

import json
import requests 
from datetime import datetime
import os
from odps import ODPS, DataFrame

date_str = args['date']

# Webhook address and access token of the DingTalk robot
url = 'https://oapi.dingtalk.com/robot/send?access_token=***********************'

now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(now_time)

sql_query = f"""
SELECT tbl_name, stat_time, stat_pt, val_type, val, rule_type, expect_val, is_exc
FROM (
    SELECT tbl_name, stat_time, stat_pt, val_type, val, rule_type, expect_val, is_exc
        , ROW_NUMBER() OVER(PARTITION BY tbl_name ORDER BY stat_time DESC) AS rn 
    FROM puture_bigdata_dev.tmp_monitor_tbl_info_log_di 
    WHERE pt = '{date_str}' 
         AND tbl_sort_code = 1 -- Table category: 1 = Amazon
) a
WHERE rn = 1 AND is_exc = 1 
"""

# DingTalk robot: send a text message
def dd_robot(url: str, content: str):
    HEADERS = {"Content-Type": "application/json;charset=utf-8"}
    # The keyword configured for the robot must appear in content
    data_info = {
        "msgtype": "text",
        "text": {
            "content": content
        },
        "at": {
            "isAtAll": False
            # To @ specific people, list their phone numbers here
            # , "atMobiles": ["15xxxxxx06", "18xxxxxx1"]
        }
    }
    value = json.dumps(data_info)
    response = requests.post(url, data=value, headers=HEADERS)
    if response.json()['errmsg'] != 'ok':
        print(response.text)

# Main function
if __name__ == '__main__':    # can be omitted in a PyODPS 3 node
    try:
        with o.execute_sql(sql_query, hints={'odps.sql.allow.fullscan': True}).open_reader() as reader:
            result_rows = list(reader)         # Read all result rows
            result_count = len(result_rows)    # Number of result rows
            # print("Result count:", result_count)
            if result_count > 0:
                for row in result_rows:
                    tbl_name = row.tbl_name
                    stat_time = row.stat_time
                    stat_pt = row.stat_pt
                    val_type = row.val_type
                    val = row.val
                    rule_type = row.rule_type
                    expect_val = row.expect_val
                    #print (tbl_name)
                    content = "Data quality(DQC)Verification alarm \n  "
                    content = content + "【Object Name】:" + tbl_name + " \n  "
                    content = content + "【Actual partition】:pt=" + stat_pt + " \n  "
                    content = content + "【Trigger Rule】: " + rule_type + " | Current sample value: " + val + " | Threshold: " + expect_val + " \n  "
                    content = content + now_time  + " \n  "
                    dd_robot(url, content)
            else:
                print("No abnormal records;")
    except Exception as e:
        print ('abnormal ========>' + str(e) )
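The robot above relies on the keyword security setting, so posting the plain webhook URL is enough. If the robot is configured with a signing secret instead, the request must also carry a millisecond timestamp and an HMAC-SHA256 signature; below is a minimal sketch of building the signed URL, where `secret` is a placeholder for the value shown in the robot's security settings.

# Sketch: sign the DingTalk webhook URL when the robot uses the "sign" security setting.
# `secret` is a placeholder for the signing secret from the robot's security settings.
import base64
import hashlib
import hmac
import time
import urllib.parse

def sign_url(url: str, secret: str) -> str:
    timestamp = str(round(time.time() * 1000))           # millisecond timestamp
    string_to_sign = '{}\n{}'.format(timestamp, secret)  # timestamp + newline + secret
    hmac_code = hmac.new(secret.encode('utf-8'),
                         string_to_sign.encode('utf-8'),
                         digestmod=hashlib.sha256).digest()
    sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
    return '{}&timestamp={}&sign={}'.format(url, timestamp, sign)

# Usage: dd_robot(sign_url(url, secret), content)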

3. Alarm sample

Data Quality (DQC) verification alarm
[Object Name]: dws_amazon_market_sales_stat_di
[Actual partition]: pt=20240103
[Trigger Rule]: Number of table rows, fixed value | Current sample value: 617 | Threshold: 650
  2024-01-04 02:54:44 

This concludes this article on the project practice of implementing monitoring warnings for database tables in Python. For more on monitoring database tables with Python, please search my previous articles or continue browsing the related articles below. I hope you will continue to support me!