Introduction
Use Python to implement monitoring and alerting for database tables, and push alarm messages to a DingTalk group through a DingTalk robot.
This reproduces a basic subset of the Data Quality (DQC) feature in DataWorks. DataWorks ships many rule types and is convenient to use; here only two of those rule types are implemented, for reference only.
This is my first project using Python, so suggestions are welcome.
Tools used: MaxCompute
1. Create the tables
1. tmp_monitor_tbl_info
CREATE TABLE IF NOT EXISTS puture_bigdata.tmp_monitor_tbl_info (
    `id`              STRING COMMENT 'Table id'
    , `tbl_name`      STRING COMMENT 'Table name'
    , `pt_format`     STRING COMMENT 'Partition format: yyyy-MM-dd, yyyyMMdd, etc.'
    , `val_type`      STRING COMMENT 'Value type: table row count, cycle value, etc.'
    , `monitor_flag`  INT    COMMENT 'Monitoring flag: 0: do not monitor, 1: monitor'
    , `rule_code`     INT    COMMENT 'Rule code: 1: row count, difference from last cycle; 2: row count, fixed value; etc.'
    , `rule_type`     STRING COMMENT 'Rule type: row count, difference from last cycle; row count, fixed value; compare with fixed value; etc.'
    , `expect_val`    INT    COMMENT 'Expected value'
    , `tbl_sort_code` INT    COMMENT 'Table type code: 0: other (dimension tables), 1: Amazon, 2: small and medium platforms, 3: market data, etc.'
    , `tbl_sort_name` STRING COMMENT 'Table type name: 0: other (dimension tables), 1: Amazon, 2: small and medium platforms, 3: market data, etc.'
    , `pt_num`        INT    COMMENT 'Partition date offset'
)
COMMENT 'Data monitoring table information'
tblproperties ("transactional"="true")
;
-- Insert data
INSERT INTO TABLE puture_bigdata_dev.tmp_monitor_tbl_info
SELECT *
FROM (
    VALUES
      (1, 'ods_amazon_amz_customer_returns_df', 'yyyyMMdd', 'Table row count', 1, 1, 'Number of table rows, last cycle difference', 0, 1, 'Amazon', -1)
    , (2, 'ods_amazon_amz_flat_file_all_orders_df', 'yyyyMMdd', 'Table row count', 1, 1, 'Number of table rows, last cycle difference', 0, 1, 'Amazon', -1)
    , (3, 'dim_sys_salesman_info_df', 'yyyyMMdd', 'Table row count', 1, 1, 'Number of table rows, last cycle difference', 0, 0, 'other', -1)
) AS table_name(id, tbl_name, pt_format, val_type, monitor_flag, rule_code, rule_type, expect_val, tbl_sort_code, tbl_sort_name, pt_num)
;
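To make the `pt_num` and `pt_format` columns concrete, here is a small pure-Python sketch (not part of the scheduled job) of how a business date is offset by `pt_num` days and rendered in the configured partition format. The `JAVA_TO_STRFTIME` mapping is a hypothetical helper for translating the Java-style patterns stored in the table into `strftime` codes.

```python
from datetime import datetime, timedelta

# Hypothetical helper: map the Java-style patterns stored in pt_format
# to Python strftime codes.
JAVA_TO_STRFTIME = {
    'yyyyMMdd': '%Y%m%d',
    'yyyy-MM-dd': '%Y-%m-%d',
}

def stat_partition(biz_date: str, pt_format: str, pt_num: int) -> str:
    """Offset the business date by pt_num days and render it in pt_format."""
    date = datetime.strptime(biz_date, '%Y-%m-%d') + timedelta(days=pt_num)
    return date.strftime(JAVA_TO_STRFTIME[pt_format])

print(stat_partition('2024-01-04', 'yyyyMMdd', -1))   # 20240103
print(stat_partition('2024-01-04', 'yyyy-MM-dd', 0))  # 2024-01-04
```

So a config row with `pt_num = -1` checks yesterday's partition of the target table.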
2. tmp_monitor_tbl_info_log_di
CREATE TABLE IF NOT EXISTS puture_bigdata_dev.tmp_monitor_tbl_info_log_di (
    `id`              STRING COMMENT 'Monitor id code: md5(table name_partition)_hour'
    , `tbl_name`      STRING COMMENT 'Table name'
    , `stat_time`     STRING COMMENT 'Statistics time'
    , `pt_format`     STRING COMMENT 'Partition format: yyyy-MM-dd, yyyyMMdd, etc.'
    , `stat_pt`       STRING COMMENT 'Statistics partition'
    , `val_type`      STRING COMMENT 'Value type: row count, cycle value, etc.'
    , `val`           INT    COMMENT 'Statistic value'
    , `rule_code`     INT    COMMENT 'Rule code: 1: row count, difference from last cycle; 2: row count, fixed value; etc.'
    , `rule_type`     STRING COMMENT 'Rule type: row count, difference from last cycle; row count, fixed value; compare with fixed value; etc.'
    , `expect_val`    INT    COMMENT 'Expected value'
    , `is_exc`        INT    COMMENT 'Abnormal: 0: no, 1: yes; default 0'
    , `tbl_sort_code` INT    COMMENT 'Table type code: 0: other (dimension tables), 1: Amazon, 2: small and medium platforms, 3: market data, etc.'
    , `tbl_sort_name` STRING COMMENT 'Table type name: 0: other (dimension tables), 1: Amazon, 2: small and medium platforms, 3: market data, etc.'
)
COMMENT 'Data monitoring log table'
PARTITIONED BY (pt STRING COMMENT 'Data date, yyyy-MM-dd')
;
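The `id` encoding can be illustrated with a short standalone Python sketch. The scheduled job builds it in SQL with `md5`/`concat` (including the rule code, as in the checker program later in this article); this version is only for illustration.

```python
import hashlib

def monitor_id(tbl_name: str, stat_pt: str, rule_code: int, hour: int) -> str:
    """Build the log id: md5(table name + '_' + partition), then rule code and hour."""
    digest = hashlib.md5(f"{tbl_name}_{stat_pt}".encode('utf-8')).hexdigest()
    return f"{digest}_{rule_code}_{hour}"

# One id per table, partition, rule, and run hour, so a rerun in the same
# hour produces the same key.
print(monitor_id('dim_sys_salesman_info_df', '20240103', 1, 2))
```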
2. Program development
1. Data checker program
'''PyODPS 3
Please make sure not to download data from MaxCompute for local processing. Download operations often include open_reader on Table/Instance and the to_pandas method of DataFrame. It is recommended to use PyODPS DataFrame (created from MaxCompute tables) and MaxCompute SQL to process data. For more details, please refer to: /document_detail/
'''
import os
from odps import ODPS, DataFrame, options
from datetime import datetime, timedelta
from dateutil import parser

options.tunnel.use_instance_tunnel = True

# Get the current time
now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(now_time)

pt = args['date']  # scheduling parameter, yyyy-MM-dd
print(pt)
date = datetime.strptime(pt, "%Y-%m-%d")

# Monitored table list
# tbl_sort_code -> 0: other (dimension tables), 1: Amazon, 2: small and medium platforms, 3: market data
sql_tbl_info = """
SELECT *
FROM puture_bigdata.tmp_monitor_tbl_info
WHERE monitor_flag = 1
  AND tbl_sort_code = 3
"""

# Result table
res_tbl_name = "puture_bigdata.tmp_monitor_tbl_info_log_di"

# Statistics SQL -- rule 1: table row count, difference from last cycle
# NOTE: the exact set/hint names were garbled in the original post;
# odps.sql.allow.fullscan and odps.sql.submit.mode are assumed here.
def sql_upper_period_diff():
    sql = f"""
    set odps.sql.allow.fullscan=true;
    INSERT INTO TABLE {res_tbl_name} PARTITION (pt='{pt}')
    SELECT a.id
         , a.tbl_name
         , a.stat_time
         , a.pt_format
         , a.stat_pt
         , a.val_type
         , a.val
         , a.rule_code
         , a.rule_type
         , a.expect_val
         , IF(a.val = 0, 1, IF((a.val - NVL(b.val, 0)) >= {expect_val}, 0, 1)) AS is_exc
         , a.tbl_sort_code
         , a.tbl_sort_name
    FROM (
        SELECT concat(md5(concat('{tbl_name}', '_', date_format('{date_str}', '{pt_format}'))), '_', {rule_code}, '_', HOUR('{now_time}')) AS id
             , '{tbl_name}' AS tbl_name
             , '{now_time}' AS stat_time
             , '{pt_format}' AS pt_format
             , date_format('{date_str}', '{pt_format}') AS stat_pt
             , '{val_type}' AS val_type
             , COUNT(1) AS val
             , '{rule_code}' AS rule_code
             , '{rule_type}' AS rule_type
             , {expect_val} AS expect_val
             , {tbl_sort_code} AS tbl_sort_code
             , '{tbl_sort_name}' AS tbl_sort_name
        FROM puture_bigdata.{tbl_name}
        WHERE pt = date_format('{date_str}', '{pt_format}')
    ) a
    LEFT JOIN (
        SELECT tbl_name, val
        FROM (
            SELECT tbl_name, val
                 , ROW_NUMBER() OVER(PARTITION BY tbl_name ORDER BY stat_time DESC) AS rn
            FROM {res_tbl_name}
            WHERE pt = DATE_ADD('{date_str}', -1)
        )
        WHERE rn = 1
    ) b
    ON a.tbl_name = b.tbl_name
    ;
    """
    return sql

# Rule 2: table row count, fixed value
def sql_line_fixed_val():
    sql = f"""
    set odps.sql.allow.fullscan=true;
    INSERT INTO TABLE {res_tbl_name} PARTITION (pt='{pt}')
    SELECT concat(md5(concat('{tbl_name}', '_', date_format('{date_str}', '{pt_format}'))), '_', {rule_code}, '_', HOUR('{now_time}')) AS id
         , '{tbl_name}' AS tbl_name
         , '{now_time}' AS stat_time
         , '{pt_format}' AS pt_format
         , date_format('{date_str}', '{pt_format}') AS stat_pt
         , '{val_type}' AS val_type
         , COUNT(1) AS val
         , '{rule_code}' AS rule_code
         , '{rule_type}' AS rule_type
         , {expect_val} AS expect_val
         , IF(COUNT(1) >= {expect_val}, 0, 1) AS is_exc
         , {tbl_sort_code} AS tbl_sort_code
         , '{tbl_sort_name}' AS tbl_sort_name
    FROM puture_bigdata.{tbl_name}
    WHERE pt = date_format('{date_str}', '{pt_format}')
    ;
    """
    return sql

# Execute the monitoring statistics SQL
def ex_monitor(sql: str):
    try:
        # print(sql)
        o.execute_sql(sql, hints={'odps.sql.allow.fullscan': True, 'odps.sql.submit.mode': 'script'})
        print("{}: run successfully".format(tbl_name))
    except Exception as e:
        print('{}: run exception ======> '.format(tbl_name) + str(e))

if __name__ == '__main__':
    try:
        with o.execute_sql(sql_tbl_info, hints={'odps.sql.allow.fullscan': True}).open_reader() as reader:
            for row_record in reader:
                # print(row_record)  # print one record
                tbl_name = row_record.tbl_name
                pt_format = row_record.pt_format
                val_type = row_record.val_type
                monitor_flag = row_record.monitor_flag
                rule_code = row_record.rule_code
                rule_type = row_record.rule_type
                expect_val = row_record.expect_val
                tbl_sort_code = row_record.tbl_sort_code
                tbl_sort_name = row_record.tbl_sort_name
                pt_num = row_record.pt_num
                date_str = (date + timedelta(days=pt_num)).strftime('%Y-%m-%d')
                if rule_code == 1:
                    ex_monitor(sql_upper_period_diff())
                elif rule_code == 2:
                    ex_monitor(sql_line_fixed_val())
                else:
                    print("Unknown rule!!!")
    except Exception as e:
        print('exception ======> ' + str(e))
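The pass/fail decisions of the two rules can be summarized in plain Python. This is a sketch of the `is_exc` expressions computed in the checker's SQL, not code from the job itself:

```python
from typing import Optional

def is_exc_period_diff(val: int, prev_val: Optional[int], expect_val: int) -> int:
    """Rule 1: abnormal (1) if today's count is 0, or if the growth over the
    previous cycle's count falls below the expected difference."""
    if val == 0:
        return 1
    prev = prev_val if prev_val is not None else 0  # NVL(b.val, 0)
    return 0 if (val - prev) >= expect_val else 1

def is_exc_fixed_val(val: int, expect_val: int) -> int:
    """Rule 2: abnormal (1) if the row count is below the fixed threshold."""
    return 0 if val >= expect_val else 1

print(is_exc_period_diff(1200, 1000, 0))  # 0: count grew, not abnormal
print(is_exc_period_diff(0, 1000, 0))     # 1: an empty partition is always abnormal
print(is_exc_fixed_val(617, 650))         # 1: below threshold -> alarm
```

With `expect_val = 0`, as in the sample config rows, rule 1 simply requires that today's count is non-zero and not smaller than yesterday's.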
2. Alarm information push program
'''PyODPS 3
Please make sure not to download data from MaxCompute for local processing. Download operations often include open_reader on Table/Instance and the to_pandas method of DataFrame. It is recommended to use PyODPS DataFrame (created from MaxCompute tables) and MaxCompute SQL to process data. For more details, please refer to: /document_detail/
'''
import json
import requests
from datetime import datetime
import os
from odps import ODPS, DataFrame

date_str = args['date']

# Webhook address and token information
url = 'https://oapi.dingtalk.com/robot/send?access_token=***********************'

now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(now_time)

sql_query = f"""
SELECT tbl_name, stat_time, stat_pt, val_type, val, rule_type, expect_val, is_exc
FROM (
    SELECT tbl_name, stat_time, stat_pt, val_type, val, rule_type, expect_val, is_exc
         , ROW_NUMBER() OVER(PARTITION BY tbl_name ORDER BY stat_time DESC) AS rn
    FROM puture_bigdata_dev.tmp_monitor_tbl_info_log_di
    WHERE pt = '{date_str}'
      AND tbl_sort_code = 1  -- table type
) a
WHERE rn = 1
  AND is_exc = 1
"""

# DingTalk robot: send a message
def dd_robot(url: str, content: str):
    HEADERS = {"Content-Type": "application/json;charset=utf-8"}
    # content must include the keyword configured on the robot
    data_info = {
        "msgtype": "text",
        "text": {"content": content},
        "isAtAll": False
        # To @ specific members, configure:
        # ,"at": {"atMobiles": ["15xxxxxx06", "18xxxxxx1"]}
    }
    value = json.dumps(data_info)
    response = requests.post(url, data=value, headers=HEADERS)
    if response.json()['errmsg'] != 'ok':
        print(response.text)

# Main entry
if __name__ == '__main__':
    try:
        with o.execute_sql(sql_query, hints={'odps.sql.allow.fullscan': True}).open_reader() as reader:
            result_rows = list(reader)        # read all result rows
            result_count = len(result_rows)   # number of results
            # print("Result count:", result_count)
            if result_count > 0:
                for row in result_rows:
                    tbl_name = row.tbl_name
                    stat_time = row.stat_time
                    stat_pt = row.stat_pt
                    val_type = row.val_type
                    val = row.val
                    rule_type = row.rule_type
                    expect_val = row.expect_val
                    # print(tbl_name)
                    content = "Data quality (DQC) verification alarm \n"
                    content = content + "[Object Name]: " + tbl_name + " \n"
                    content = content + "[Actual partition]: pt=" + stat_pt + " \n"
                    content = content + "[Trigger Rule]: " + rule_type + " | Current sample value: " + str(val) + " | Threshold: " + str(expect_val) + " \n"
                    content = content + now_time + " \n"
                    dd_robot(url, content)
            else:
                print("No abnormal situation;")
    except Exception as e:
        print('exception ========> ' + str(e))
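The alarm text assembled above, and the JSON payload the robot receives, can be sketched offline (the webhook URL and the HTTP call are omitted so this runs standalone; the field names follow DingTalk's `text` message type):

```python
import json

def build_alarm_content(tbl_name, stat_pt, rule_type, val, expect_val, now_time):
    """Assemble the alarm text in the same shape as the sample alarm below."""
    return (
        "Data quality (DQC) verification alarm \n"
        f"[Object Name]: {tbl_name} \n"
        f"[Actual partition]: pt={stat_pt} \n"
        f"[Trigger Rule]: {rule_type} | Current sample value: {val} | Threshold: {expect_val} \n"
        f"{now_time} \n"
    )

def build_payload(content):
    """DingTalk 'text' message body; keyword filtering is configured on the robot."""
    return json.dumps({"msgtype": "text", "text": {"content": content}, "isAtAll": False})

content = build_alarm_content('dws_amazon_market_sales_stat_di', '20240103',
                              'Number of table rows, fixed value', 617, 650,
                              '2024-01-04 02:54:44')
print(build_payload(content))
```

Formatting `val` and `expect_val` through an f-string also sidesteps the str/int concatenation error that a plain `+` would raise.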
3. Alarm sample
Data Quality (DQC) verification alarm
[Object Name]: dws_amazon_market_sales_stat_di
[Actual partition]: pt=20240103
[Trigger Rule]: Number of table rows, fixed value | Current sample value: 617 | Threshold: 650
2024-01-04 02:54:44
That concludes this practical project on monitoring and alerting for database tables with Python. For more on the topic, see my previous articles or the related articles below, and I hope you will continue to support me!