SoFunction
Updated on 2025-03-04

Sample code for wrapping SQLite3 in a Python class

Create a database instance

 from main import SQLiteDB

 # The file name below is a placeholder: the published snippet showed an empty
 # string here, which looks like a stripped file name (an empty name makes
 # SQLite use a temporary database) — replace with your own path.
 db = SQLiteDB("example.db")

Execute SQL directly using execute

 # Run a parameterized INSERT directly through SQLiteDB.execute.
 db.execute("INSERT INTO user (name, amount, createtime) VALUES (?, ?, ?)", ("Zhang San", 25.6, '2023-07-01 15:25:30'))

Create a table

 # Create a table
 db.execute("""
     CREATE TABLE users(
         id INTEGER PRIMARY KEY AUTOINCREMENT,
         name TEXT NOT NULL,
         age INTEGER,
         email TEXT UNIQUE,
         created_at TEXT DEFAULT (datetime('now', 'localtime'))
     );
     """)

Insert Example

 # Single insertion
 db.insert("user", ['name', 'amount', 'createtime'], ('Li Si', 25.6, '2023-07-01 15:25:30'))

 # Tuple list batch insertion
 fields = ['name', 'amount', 'createtime']
 values = [
 ('User 1', 22, '2024-11-12 12:13:11'),
 ('User 2', 23, '2024-11-12 12:13:11'),
 ('User 3', 24, '2024-11-12 12:13:11')
 ]
 db.batch_insert('user', fields, values)

 # Dictionary list batch insertion
 users_dict = [
 {'name': 'Xiao Ming', 'amount': 22, 'createtime': '2024-11-12 12:13:11'},
 {'name': 'Little Red', 'amount': 24, 'createtime': '2024-11-12 12:13:11'},
 {'name': 'Xiao Zhang', 'amount': 26, 'createtime': '2024-11-12 12:13:11'}
 ]
 db.insert_many_dict('user', users_dict)

Delete Example

 # Delete by condition (returns the number of affected rows, or -1 on failure)
 affected_rows = db.delete("user", "age > ?", (30,))

 # Delete by ID
 db.delete_by_id("user", 1)

 # Batch delete
 id_list = [1, 2, 3, 4, 5]
 db.delete_many("user", id_list)

 # Clear the table
 db.truncate_table("user")

Modify example

 # Basic update
 db.update('users', ['name', 'age'], ('Zhang San', 25), 'id = ?', (1,))

 # Update by ID
 db.update_by_id('users', ['name', 'age'], ('Zhang San', 25), 1)

 # Update with dictionary
 db.update_dict('users', {'name': 'Zhang San', 'age': 25}, 'id = ?', (1,))

 # Batch update of dictionary data (every dictionary must contain the ID field).
 # The e-mail domains are placeholders: the published snippet was cut off after "@".
 dict_list = [
     {'id': 1, 'name': 'Zhang San', 'age': 25, 'email': 'zhangsan@example.com'},
     {'id': 2, 'name': 'Li Si', 'age': 30, 'email': 'lisi@example.com'}
 ]
 db.batch_update_dict('users', dict_list)

 # Batch update (each tuple holds the new values followed by the row ID)
 values_list = [
     ('Zhang San', 25, 1),
     ('Li Si', 30, 2)
 ]
 db.batch_update('users', ['name', 'age'], values_list)

Query example

 # Query a single record
 result = db.fetch_one("SELECT *FROM user WHERE name = ?", ("Zhang San",))

 # Pagination query (page 3, 2 rows per page)
 db.fetch_page("SELECT * FROM user", 3, 2)

 # Query multiple records
 results = db.fetch_all("SELECT *FROM user LIMIT 5")
 for row in results:
     print(row)

 # Conditional query
 results = db.fetch_all("SELECT *FROM user WHERE amount > ?", (20,))
 for row in results:
     print(row)

Join query examples

Suppose there are two tables: user and orders

Inner join query example

 # NOTE(review): the alias-qualified column names (u.name, u.id, o.amount) were
 # lost from the published snippet and are reconstructed here — adjust them to
 # match the real schema of the user and orders tables.
 sql = """
 SELECT u.name, o.order_number, o.amount
 FROM user u
 INNER JOIN orders o ON u.id = o.user_id
 WHERE o.amount > ?
 """
 results = db.fetch_all(sql, (100,))

 for row in results:
     print(row)

Left join query example

 # NOTE(review): the alias-qualified column names (u.name, u.id, o.id) were lost
 # from the published snippet and are reconstructed here — adjust them to match
 # the real schema of the user and orders tables.
 sql = """
 SELECT u.name, COUNT(o.id) as order_count
 FROM user u
 LEFT JOIN orders o ON u.id = o.user_id
 GROUP BY u.id
 """
 results = db.fetch_all(sql)

 for row in results:
     print(row)

SQLiteDB

import sqlite3


class SQLiteDB:
    """Thin convenience wrapper around one ``sqlite3`` connection and cursor.

    Values are always passed to SQLite as bound parameters. Table names, field
    names and WHERE conditions, however, are interpolated into the SQL text
    with f-strings, so they must come from trusted code, never from user input.

    Errors raised by ``sqlite3`` are printed and reported through the return
    value (``False``, ``-1``, ``[]`` or ``None``) instead of being re-raised.
    """

    def __init__(self, db_name):
        """Open the database and create the shared cursor.

        Args:
            db_name: Database path handed to ``sqlite3.connect``.

        If connecting fails the error is printed and ``self.conn`` /
        ``self.cursor`` stay ``None``.
        """
        self.conn = None
        self.cursor = None
        try:
            self.conn = sqlite3.connect(db_name)
            self.cursor = self.conn.cursor()
        except sqlite3.Error as e:
            print(f"An error occurred while connecting to the database: {e}")

    def execute(self, sql, params=None):
        """Execute one SQL statement and commit.

        Args:
            sql: SQL statement.
            params: Optional parameters for a parameterized statement.

        Returns:
            True on success, False on failure.
        """
        try:
            if params:
                self.cursor.execute(sql, params)
            else:
                self.cursor.execute(sql)
            self.conn.commit()
            return True
        except sqlite3.Error as e:
            print(f"An error occurred while executing a query: {e}")
            # Roll back like the other write methods do, so a failed statement
            # does not leave a half-finished transaction open.
            self.conn.rollback()
            return False

    # Create a table
    def create_table(self, table_name, fields):
        """Create a table if it does not already exist.

        Args:
            table_name: Table name.
            fields: List of ``(field_name, type_definition)`` tuples.

        Returns:
            True on success, False on failure.
        """
        try:
            fields_str = ', '.join([f"{name} {definition}" for name, definition in fields])
            sql = f"CREATE TABLE IF NOT EXISTS {table_name} ({fields_str})"
            return self.execute(sql)
        except sqlite3.Error as e:
            print(f"An error occurred while creating the table: {e}")
            return False

    # Insert ##################################################################
    def insert(self, table_name, fields, values):
        """Insert a single row.

        Args:
            table_name: Table name.
            fields: List of field names, for example ``['name', 'age']``.
            values: Values in the same order, for example ``('Zhang San', 25)``.

        Returns:
            True on success, False on failure.
        """
        try:
            # Build the statement with one "?" placeholder per field.
            placeholders = ','.join(['?' for _ in fields])
            fields_str = ','.join(fields)
            sql = f"INSERT INTO {table_name} ({fields_str}) VALUES({placeholders})"
            return self.execute(sql, values)
        except sqlite3.Error as e:
            print(f"Insert data时出错: {e}")
            self.conn.rollback()  # Roll back when an error occurs
            return False

    # Batch insert 1
    def batch_insert(self, table_name, fields, values_list):
        """Insert many rows with a single ``executemany`` call.

        Args:
            table_name: Table name.
            fields: List of field names, for example ``['name', 'age']``.
            values_list: List of value tuples, for example
                ``[('Zhang San', 25), ('Li Si', 30)]``.

        Returns:
            True on success, False on failure.
        """
        try:
            # Build the statement with one "?" placeholder per field.
            placeholders = ','.join(['?' for _ in fields])
            fields_str = ','.join(fields)
            sql = f"INSERT INTO {table_name} ({fields_str}) VALUES ({placeholders})"

            # Perform the batch insertion.
            self.cursor.executemany(sql, values_list)
            self.conn.commit()
            return True
        except sqlite3.Error as e:
            print(f"批量Insert data时出错: {e}")
            self.conn.rollback()  # Roll back when an error occurs
            return False

    # Batch insert 2
    def insert_many_dict(self, table_name, dict_list):
        """Insert many rows given as a list of dictionaries.

        Args:
            table_name: Table name.
            dict_list: List of dictionaries, one per row, for example
                ``[{'name': 'Zhang San', 'age': 25}, {'name': 'Li Si', 'age': 30}]``.
                The keys of the first dictionary define the inserted fields.

        Returns:
            True on success; False on failure or when ``dict_list`` is empty.
        """
        if not dict_list:
            return False
        try:
            # Field names come from the first dictionary.
            fields = list(dict_list[0].keys())
            # Look every value up by field name so that a dictionary whose keys
            # are in a different order still lands in the right columns; a
            # missing key raises KeyError, which is reported below.
            values_list = [tuple(d[f] for f in fields) for d in dict_list]
            return self.batch_insert(table_name, fields, values_list)
        except Exception as e:
            print(f"An error occurred while processing dictionary data: {e}")
            return False

    ############################################################

    # Delete ##################################################################
    def delete(self, table_name, condition, params=None):
        """Delete the rows matching a condition.

        Args:
            table_name: Table name.
            condition: WHERE clause body, for example ``"age > ?"`` or ``"name = ?"``.
            params: Parameters for the condition, for example ``(20,)``.

        Returns:
            Number of affected rows on success, -1 on failure.
        """
        try:
            sql = f"DELETE FROM {table_name} WHERE {condition}"
            self.cursor.execute(sql, params or ())
            self.conn.commit()
            return self.cursor.rowcount
        except sqlite3.Error as e:
            print(f"An error occurred while deleting data: {e}")
            self.conn.rollback()
            return -1

    def delete_by_id(self, table_name, id_value, id_field='id'):
        """Delete the row(s) whose ID field equals ``id_value``.

        Args:
            table_name: Table name.
            id_value: ID value.
            id_field: ID field name, default ``'id'``.

        Returns:
            Number of affected rows on success, -1 on failure.
        """
        return self.delete(table_name, f"{id_field} = ?", (id_value,))

    def delete_many(self, table_name, id_list, id_field='id'):
        """Delete every row whose ID field is in ``id_list``.

        Args:
            table_name: Table name.
            id_list: List of ID values.
            id_field: ID field name, default ``'id'``.

        Returns:
            Number of affected rows on success, -1 on failure.
        """
        try:
            placeholders = ','.join(['?' for _ in id_list])
            sql = f"DELETE FROM {table_name} WHERE {id_field} IN ({placeholders})"
            self.cursor.execute(sql, id_list)
            self.conn.commit()
            return self.cursor.rowcount
        except sqlite3.Error as e:
            print(f"批量An error occurred while deleting data: {e}")
            self.conn.rollback()
            return -1

    def truncate_table(self, table_name):
        """Delete all rows of a table (the table itself is kept).

        Args:
            table_name: Table name.

        Returns:
            True on success, False on failure.
        """
        try:
            self.cursor.execute(f"DELETE FROM {table_name}")
            self.conn.commit()
            return True
        except sqlite3.Error as e:
            print(f"An error occurred while clearing table data: {e}")
            self.conn.rollback()
            return False

    ############################################################

    # Update ##################################################################
    def update(self, table_name, fields, values, condition, condition_params=None):
        """Update the rows matching a condition.

        Args:
            table_name: Table name.
            fields: List of fields to update, for example ``['name', 'age']``.
            values: New values in the same order, for example ``('Zhang San', 25)``.
            condition: WHERE clause body, for example ``"id = ?"``.
            condition_params: Parameters for the condition, for example ``(1,)``.

        Returns:
            Number of affected rows on success, -1 on failure.
        """
        try:
            # Build the SET clause.
            set_clause = ','.join([f"{field} = ?" for field in fields])
            sql = f"UPDATE {table_name} SET {set_clause} WHERE {condition}"

            # SET values come first, then the WHERE parameters.
            params = list(values)
            if condition_params:
                params.extend(condition_params)

            self.cursor.execute(sql, params)
            self.conn.commit()
            return self.cursor.rowcount
        except sqlite3.Error as e:
            print(f"Update data时出错: {e}")
            self.conn.rollback()
            return -1

    def update_by_id(self, table_name, fields, values, id_value, id_field='id'):
        """Update the row(s) whose ID field equals ``id_value``.

        Args:
            table_name: Table name.
            fields: List of fields to update, for example ``['name', 'age']``.
            values: New values in the same order, for example ``('Zhang San', 25)``.
            id_value: ID value.
            id_field: ID field name, default ``'id'``.

        Returns:
            Number of affected rows on success, -1 on failure.
        """
        return self.update(table_name, fields, values, f"{id_field} = ?", (id_value,))

    def update_dict(self, table_name, update_dict, condition, condition_params=None):
        """Update rows using a ``{field: value}`` dictionary.

        Args:
            table_name: Table name.
            update_dict: Fields and their new values, for example
                ``{'name': 'Zhang San', 'age': 25}``.
            condition: WHERE clause body, for example ``"id = ?"``.
            condition_params: Parameters for the condition, for example ``(1,)``.

        Returns:
            Number of affected rows on success, -1 on failure.
        """
        fields = list(update_dict.keys())
        values = list(update_dict.values())
        return self.update(table_name, fields, values, condition, condition_params)

    def batch_update_dict(self, table_name, dict_list, id_field='id'):
        """Update many rows given as a list of dictionaries keyed by ID.

        Args:
            table_name: Table name.
            dict_list: List of dictionaries; each must contain ``id_field``,
                for example ``[{'id': 1, 'name': 'Zhang San', 'age': 25},
                {'id': 2, 'name': 'Li Si', 'age': 30}]``. The keys of the first
                dictionary (minus the ID field) define the updated fields.
            id_field: ID field name, default ``'id'``.

        Returns:
            Number of affected rows on success, 0 when ``dict_list`` is empty,
            -1 on failure.
        """
        if not dict_list:
            return 0
        try:
            # Field names come from the first dictionary, excluding the ID field.
            fields = [f for f in dict_list[0].keys() if f != id_field]

            # Convert the dictionaries into value tuples.
            values_list = []
            for d in dict_list:
                # Every dictionary must carry the ID field.
                if id_field not in d:
                    raise ValueError(f"Missing in the dictionary {id_field} Fields")
                # New field values first, ID value last (matches batch_update).
                values = tuple(d[f] for f in fields)
                values += (d[id_field],)
                values_list.append(values)

            return self.batch_update(table_name, fields, values_list, id_field)

        except Exception as e:
            print(f"An error occurred while batch update of dictionary data: {e}")
            return -1

    def batch_update(self, table_name, fields, values_list, id_field='id'):
        """Update many rows by ID with a single ``executemany`` call.

        Args:
            table_name: Table name.
            fields: List of fields to update, for example ``['name', 'age']``.
            values_list: List of tuples holding the new values followed by the
                ID, for example ``[('Zhang San', 25, 1), ('Li Si', 30, 2)]``.
            id_field: ID field name, default ``'id'``.

        Returns:
            The cursor's ``rowcount`` after the batch on success, -1 on failure.
        """
        try:
            # Build the SET clause.
            set_clause = ','.join([f"{field} = ?" for field in fields])
            sql = f"UPDATE {table_name} SET {set_clause} WHERE {id_field} = ?"

            self.cursor.executemany(sql, values_list)
            self.conn.commit()
            return self.cursor.rowcount
        except sqlite3.Error as e:
            print(f"批量Update data时出错: {e}")
            self.conn.rollback()
            return -1

    ############################################################
    # Query
    def fetch_all(self, sql, params=None):
        """Run a query and return every row.

        Args:
            sql: SQL query.
            params: Optional parameters for a parameterized query.

        Returns:
            List of result rows; an empty list on failure.
        """
        try:
            if params:
                self.cursor.execute(sql, params)
            else:
                self.cursor.execute(sql)
            return self.cursor.fetchall()
        except sqlite3.Error as e:
            print(f"An error occurred while obtaining data: {e}")
            return []

    # Pagination query
    def fetch_page(self, sql, page_num, page_size, params=None):
        """Run a query for one page by appending a ``limit offset,count`` clause.

        Args:
            sql: SQL query without a LIMIT clause.
            page_num: 1-based page number.
            page_size: Number of rows per page.
            params: Optional parameters for a parameterized query.

        Returns:
            List of rows of the requested page; an empty list on failure.
        """
        page_sql = f" limit {(page_num - 1) * page_size},{page_size}"
        print(sql + page_sql)
        return self.fetch_all(sql + page_sql, params)

    def fetch_one(self, sql, params=None):
        """Run a query and return the first row.

        Args:
            sql: SQL query.
            params: Optional parameters for a parameterized query.

        Returns:
            The first result row, or None when there is no row or on failure.
        """
        try:
            if params:
                self.cursor.execute(sql, params)
            else:
                self.cursor.execute(sql)
            return self.cursor.fetchone()
        except sqlite3.Error as e:
            print(f"An error occurred while obtaining data: {e}")
            return None

    ############################################################

    def close(self):
        """Run VACUUM, then close the cursor and the connection.

        Safe to call more than once; later calls do nothing.
        """
        if self.conn is None:
            return
        try:
            if self.cursor is not None:
                self.cursor.execute("VACUUM;")
        except sqlite3.Error:
            # Best effort: VACUUM is refused while a transaction is open, and
            # that must not prevent the connection from being closed below.
            pass
        finally:
            try:
                if self.cursor is not None:
                    self.cursor.close()
                self.conn.close()
            except sqlite3.Error:
                pass
            self.cursor = None
            self.conn = None

    # Close the database connection when destroying an object
    def __del__(self):
        """Close the database connection when the object is garbage-collected."""
        try:
            # getattr: __init__ may never have run for this instance.
            if getattr(self, "conn", None) is not None:
                self.close()
        except Exception:
            # A finalizer must never raise; module globals may already be gone
            # during interpreter shutdown.
            pass

This concludes the article on sample code for wrapping SQLite3 in Python. For more on wrapping SQLite3 with Python, please search my previous articles or browse the related articles below. Thank you for your support!