SoFunction
Updated on 2025-03-06

A detailed example of batch-inserting data with MySqlBulkLoader

Introduction

Recently I ran into a bottleneck when inserting data in a project. Tens of thousands, hundreds of thousands, or even millions of rows had to be saved to a MySQL database, and inserting them with EF (Entity Framework) was very slow. With a very large data volume, an EF insert took dozens of minutes or even hours, which is clearly not acceptable.

After some research, I found that MySqlBulkLoader can insert data into the database in batches, and it is much faster than EF.

MySqlBulkLoader is used here as follows: convert the data to be inserted into a DataTable, write the DataTable to a CSV file, and then bulk-import the CSV file into the database.

Notes:

1). The database connection string must include AllowLoadLocalInfile=true so that local files can be imported:

Data Source = database address; Port = port; Initial Catalog = database name; User Id = user name; Password = password; AllowLoadLocalInfile=true;

2). The insert returns the number of rows inserted. If all the data looks correct and no exception is thrown, yet the returned count is 0, check whether the table has a unique index and whether the inserted data violates it.
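One way to investigate a returned count of 0 is to check the batch itself for repeated key values before loading. The following is a minimal sketch and not part of the original code: `FindDuplicateKeys` is a hypothetical helper, and it assumes the unique index covers a single column whose value can be read from each item. It cannot detect conflicts with rows that already exist in the table.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DuplicateCheck
{
    // Hypothetical helper: returns every key value that occurs more than once in the batch,
    // in the order in which each key first appears.
    public static List<TKey> FindDuplicateKeys<T, TKey>(IEnumerable<T> data, Func<T, TKey> keySelector)
    {
        return data
            .GroupBy(keySelector)
            .Where(group => group.Count() > 1)
            .Select(group => group.Key)
            .ToList();
    }
}
```

With the example entity from the complete code, a call might look like `DuplicateCheck.FindDuplicateKeys(data, x => x.CouponCode)`, assuming the unique index is on the coupon code column.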

(The following sections show the code step by step. If you only want the complete code, skip to section 5, Complete code.)

1. Convert List to DataTable

/// <summary>
/// Convert List to DataTable
/// </summary>
/// <returns></returns>
public DataTable ListToDataTable<T>(List<T> data)
{
    #region Create a DataTable with the entity name as the DataTable name
    var tableName = typeof(T).Name;
    tableName = tableName.ToSnakeCase(); /*How entity names map to table names depends on each project's conventions; it does not have to be the conversion method used here*/
    DataTable dt = new DataTable
    {
        TableName = tableName
    };

    #endregion

    #region Build the columns, using the entity's property names as the column names
    var properties = typeof(T).GetProperties();
    foreach (var item in properties)
    {
        var curFileName = item.Name;
        curFileName = curFileName.ToSnakeCase(); /*How property names map to field names depends on each project's conventions; it does not have to be the conversion method used here*/
        // Keep the property type on the column (a DataColumn defaults to string),
        // so that ToCsv can recognize DateTime columns.
        dt.Columns.Add(curFileName, Nullable.GetUnderlyingType(item.PropertyType) ?? item.PropertyType);
    }

    #endregion

    #region Fill the rows
    foreach (var item in data)
    {
        DataRow dr = dt.NewRow();
        var columns = dt.Columns;

        var curPropertyList = item.GetType().GetProperties();
        foreach (var p in curPropertyList)
        {
            var name = p.Name;
            name = name.ToSnakeCase(); /*Must use the same conversion as the column names above*/
            var curValue = p.GetValue(item);

            int i = columns.IndexOf(name);
            dr[i] = curValue ?? DBNull.Value; // a DataRow stores missing values as DBNull
        }

        dt.Rows.Add(dr);
    }

    #endregion

    return dt;
}
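To make the name conversion concrete, the sketch below applies the `ToSnakeCase` extension from the complete code in section 5 to a few names from the example entity. The `Demo` class exists only to make the snippet self-contained, and the mapping shown is just one possible convention; your project may use a different one.

```csharp
using System;
using System.Text.RegularExpressions;

public static class StringExtensions
{
    // Same conversion as in the complete code: insert "_" between a lowercase letter
    // or digit and the capital that follows it, then lowercase the whole string.
    public static string ToSnakeCase(this string input)
    {
        if (string.IsNullOrEmpty(input)) { return input; }

        var startUnderscores = Regex.Match(input, @"^_+");
        return startUnderscores + Regex.Replace(input, @"([a-z0-9])([A-Z])", "$1_$2").ToLower();
    }
}

public static class Demo
{
    public static void Main()
    {
        Console.WriteLine("CrmCouponTestDto".ToSnakeCase()); // crm_coupon_test_dto (table name)
        Console.WriteLine("UsageBillNo".ToSnakeCase());      // usage_bill_no (column name)
        Console.WriteLine("Id".ToSnakeCase());               // id
    }
}
```

The table and its columns must already exist in the database under exactly these converted names, because the loader matches columns by name.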

2. Convert DataTable to a standard CSV file

/// <summary>
/// CSV extension
/// </summary>
public static class CSVEx
{
    /// <summary>
    /// Write a DataTable to a tab-separated file for import
    /// </summary>
    /// <param name="table">Data table</param>
    /// <param name="tmpPath">Path of the file to write</param>
    public static void ToCsv(this DataTable table, string tmpPath)
    {
        // Fields are separated by a tab; an empty column still gets its separator so that it keeps its position.
        // The commented-out block below shows the standard CSV rule for comma-separated output:
        // a value that contains a comma is wrapped in double quotes, and any double quote inside it is doubled ("").
        StringBuilder sb = new StringBuilder();
        DataColumn colum;
        foreach (DataRow row in table.Rows)
        {
            for (int i = 0; i < table.Columns.Count; i++)
            {
                Type _datatype = typeof(DateTime);
                colum = table.Columns[i];
                if (i != 0) sb.Append("\t");
                //if (colum.DataType == typeof(string) && row[colum].ToString().Contains(","))
                //{
                //    sb.Append("\"" + row[colum].ToString().Replace("\"", "\"\"") + "\"");
                //}
                if (colum.DataType == _datatype && !row.IsNull(colum))
                {
                    // Write dates in a fixed format instead of the current culture's format
                    sb.Append(((DateTime)row[colum]).ToString("yyyy/MM/dd HH:mm:ss"));
                }
                else sb.Append(row[colum].ToString());
            }
            sb.Append("\r\n");
        }
        // Dispose the writer so the file is flushed and closed even if writing fails
        using (StreamWriter sw = new StreamWriter(tmpPath, false, UTF8Encoding.UTF8))
        {
            sw.Write(sb.ToString());
        }
    }

}
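The `ToCsv` method writes values as they are, so a string that contains a tab or a double quote could shift or corrupt columns during import. Below is a minimal sketch of one way to guard against this, assuming the loader settings used in this article (tab as the field terminator, `"` as both the quotation and the escape character). `EscapeField` is a hypothetical helper that is not part of the original code, and values containing line breaks are not handled here.

```csharp
using System;

public static class TsvEscaping
{
    // Hypothetical helper: wraps a value in double quotes and doubles any embedded quotes
    // when the value contains a tab or a double quote; other values are returned unchanged.
    public static string EscapeField(string value)
    {
        if (value == null) return string.Empty;

        bool needsQuoting = value.IndexOf('\t') >= 0 || value.IndexOf('"') >= 0;
        if (!needsQuoting) return value;

        return "\"" + value.Replace("\"", "\"\"") + "\"";
    }
}
```

It could be applied where `ToCsv` appends the non-date values. Whether the server reads the quoted fields back as intended depends on the loader options, so it is worth verifying with a small test import first.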

3. Import the CSV file into the database

/// <summary>
/// MySQL batch import helper class
/// </summary>
public static class MySqlHelper
{
    /// <summary>
    /// MySqlBulkLoader batch import
    /// </summary>
    /// <param name="_mySqlConnection">Database connection</param>
    /// <param name="table">DataTable whose name and column names match the target table</param>
    /// <param name="csvName">Path of the file to import</param>
    /// <returns>Number of rows inserted</returns>
    public static int BulkLoad(MySqlConnection _mySqlConnection, DataTable table, string csvName)
    {
        // Column names in the same order in which ToCsv wrote the fields
        var columns = table.Columns.Cast<DataColumn>().Select(colum => colum.ColumnName).ToList();
        MySqlBulkLoader bulk = new MySqlBulkLoader(_mySqlConnection)
        {
            FieldTerminator = "\t",
            FieldQuotationCharacter = '"',
            EscapeCharacter = '"',
            LineTerminator = "\r\n",
            FileName = csvName,
            NumberOfLinesToSkip = 0,
            TableName = table.TableName,
        };

        bulk.Columns.AddRange(columns);
        return bulk.Load();
    }
}

4. Use MySqlBulkLoader to insert data in batches

/// <summary>
/// Batch insertion of data using MySqlBulkLoader
/// </summary>
/// <typeparam name="T">Entity type</typeparam>
/// <param name="data">Data to insert</param>
/// <returns>Number of rows inserted</returns>
/// <exception cref="Exception"></exception>
public int BulkLoaderData<T>(List<T> data)
{
    if (data.Count <= 0) return 0;

    var connectString = "Database connection address";
    using (MySqlConnection connection = new MySqlConnection(connectString))
    {
        MySqlTransaction sqlTransaction = null;
        try
        {
            if (connection.State == ConnectionState.Closed)
            {
                connection.Open();
            }
            sqlTransaction = connection.BeginTransaction();

            var dt = ListToDataTable<T>(data); // Convert List to DataTable
            string tmpPath = Path.GetTempFileName();
            dt.ToCsv(tmpPath); // Convert DataTable to CSV file
            var insertCount = MySqlHelper.BulkLoad(connection, dt, tmpPath); // Insert data using MySqlBulkLoader
            sqlTransaction.Commit();

            try
            {
                if (File.Exists(tmpPath)) File.Delete(tmpPath);
            }
            catch (Exception)
            {
                // Deleting the temporary file failed
            }
            return insertCount; // Return the number of rows inserted
        }
        catch (Exception)
        {
            if (sqlTransaction != null)
            {
                sqlTransaction.Rollback();
            }
            // Rethrow with the original stack trace
            throw;
        }
    }
}

5. Complete code

using System;
using System.Collections.Generic;
using System.Data;
using System.IO;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using MySql.Data.MySqlClient; // assuming the MySql.Data (Connector/NET) package

namespace BulkLoaderDemo // placeholder name; the namespace name in the original listing was lost
{

    /// <summary>
    /// Batch insertion
    /// </summary>
    public class BulkLoader
    {

        /// <summary>
        /// Test entry point for batch insertion
        /// </summary>
        /// <returns>Number of rows inserted</returns>
        public int BrantchDataTest()
        {

            #region Simulate data
            var data = new List<CrmCouponTestDto>() {
                new CrmCouponTestDto {
                    Id = 1,
                    CouponCode = "test001",
                    CouponId = 1,
                    MemberId = 100,
                    IssueTime = DateTime.Parse("2022-06-27 14:00:00"),
                    UsageTime = DateTime.Parse("3000-12-31 00:00:00"),
                    UsageShopId = 0,
                    UsageBillNo = "",
                    EffectiveStart = DateTime.Parse("2022-06-27 14:00:00"),
                    EffectiveEnd = DateTime.Parse("2023-06-27 14:00:00"),
                    Status = 0
                },
                new CrmCouponTestDto {
                    Id = 2,
                    CouponCode = "test002",
                    CouponId = 1,
                    MemberId = 101,
                    IssueTime = DateTime.Parse("2022-06-27 14:00:00"),
                    UsageTime = DateTime.Parse("2022-06-27 14:30:00"),
                    UsageShopId = 2,
                    UsageBillNo = "CS202206271430001",
                    EffectiveStart = DateTime.Parse("2022-06-27 14:00:00"),
                    EffectiveEnd = DateTime.Parse("2023-06-27 14:00:00"),
                    Status = 1
                },
                new CrmCouponTestDto {
                    Id = 3,
                    CouponCode = "test003",
                    CouponId = 1,
                    MemberId = 102,
                    IssueTime = DateTime.Parse("2022-06-27 14:00:00"),
                    UsageTime = DateTime.Parse("3000-12-31 00:00:00"),
                    UsageShopId = 0,
                    UsageBillNo = "",
                    EffectiveStart = DateTime.Parse("2022-06-27 14:00:00"),
                    EffectiveEnd = DateTime.Parse("2023-06-27 14:00:00"),
                    Status = 0
                },
                new CrmCouponTestDto {
                    Id = 4,
                    CouponCode = "test004",
                    CouponId = 1,
                    MemberId = 103,
                    IssueTime = DateTime.Parse("2022-06-27 14:00:00"),
                    UsageTime = DateTime.Parse("3000-12-31 00:00:00"),
                    UsageShopId = 0,
                    UsageBillNo = "",
                    EffectiveStart = DateTime.Parse("2022-06-27 14:00:00"),
                    EffectiveEnd = DateTime.Parse("2023-06-27 14:00:00"),
                    Status = 0
                }
            };
            #endregion
            var result = BulkLoaderData<CrmCouponTestDto>(data);
            return result;

        }


        /// <summary>
        /// Batch insertion of data using MySqlBulkLoader
        /// </summary>
        /// <typeparam name="T">Entity type</typeparam>
        /// <param name="data">Data to insert</param>
        /// <returns>Number of rows inserted</returns>
        /// <exception cref="Exception"></exception>
        public int BulkLoaderData<T>(List<T> data)
        {
            if (data.Count <= 0) return 0;

            var connectString = "Database connection address";
            using (MySqlConnection connection = new MySqlConnection(connectString))
            {
                MySqlTransaction sqlTransaction = null;
                try
                {
                    if (connection.State == ConnectionState.Closed)
                    {
                        connection.Open();
                    }
                    sqlTransaction = connection.BeginTransaction();

                    var dt = ListToDataTable<T>(data); // Convert List to DataTable
                    string tmpPath = Path.GetTempFileName();
                    dt.ToCsv(tmpPath); // Convert DataTable to CSV file
                    var insertCount = MySqlHelper.BulkLoad(connection, dt, tmpPath); // Insert data using MySqlBulkLoader
                    sqlTransaction.Commit();

                    try
                    {
                        if (File.Exists(tmpPath)) File.Delete(tmpPath);
                    }
                    catch (Exception)
                    {
                        // Deleting the temporary file failed
                    }
                    return insertCount; // Return the number of rows inserted
                }
                catch (Exception)
                {
                    if (sqlTransaction != null)
                    {
                        sqlTransaction.Rollback();
                    }
                    // Rethrow with the original stack trace
                    throw;
                }
            }

        }


        /// <summary>
        /// Convert List to DataTable (core method)
        /// </summary>
        /// <returns></returns>
        public DataTable ListToDataTable<T>(List<T> data)
        {
            #region Create a DataTable with the entity name as the DataTable name
            var tableName = typeof(T).Name;
            tableName = tableName.ToSnakeCase(); /*How entity names map to table names depends on each project's conventions; it does not have to be the conversion method used here*/
            DataTable dt = new DataTable
            {
                TableName = tableName
            };

            #endregion

            #region Build the columns, using the entity's property names as the column names
            var properties = typeof(T).GetProperties();
            foreach (var item in properties)
            {
                var curFileName = item.Name;
                curFileName = curFileName.ToSnakeCase(); /*How property names map to field names depends on each project's conventions; it does not have to be the conversion method used here*/
                // Keep the property type on the column (a DataColumn defaults to string),
                // so that ToCsv can recognize DateTime columns.
                dt.Columns.Add(curFileName, Nullable.GetUnderlyingType(item.PropertyType) ?? item.PropertyType);
            }

            #endregion

            #region Fill the rows
            foreach (var item in data)
            {
                DataRow dr = dt.NewRow();
                var columns = dt.Columns;

                var curPropertyList = item.GetType().GetProperties();
                foreach (var p in curPropertyList)
                {
                    var name = p.Name;
                    name = name.ToSnakeCase(); /*Must use the same conversion as the column names above*/
                    var curValue = p.GetValue(item);

                    int i = columns.IndexOf(name);
                    dr[i] = curValue ?? DBNull.Value; // a DataRow stores missing values as DBNull
                }

                dt.Rows.Add(dr);
            }

            #endregion

            return dt;
        }


    }


    /// <summary>
    /// MySQL batch import helper class
    /// </summary>
    public static class MySqlHelper
    {
        /// <summary>
        /// MySqlBulkLoader batch import
        /// </summary>
        /// <param name="_mySqlConnection">Database connection</param>
        /// <param name="table">DataTable whose name and column names match the target table</param>
        /// <param name="csvName">Path of the file to import</param>
        /// <returns>Number of rows inserted</returns>
        public static int BulkLoad(MySqlConnection _mySqlConnection, DataTable table, string csvName)
        {
            // Column names in the same order in which ToCsv wrote the fields
            var columns = table.Columns.Cast<DataColumn>().Select(colum => colum.ColumnName).ToList();
            MySqlBulkLoader bulk = new MySqlBulkLoader(_mySqlConnection)
            {
                FieldTerminator = "\t",
                FieldQuotationCharacter = '"',
                EscapeCharacter = '"',
                LineTerminator = "\r\n",
                FileName = csvName,
                NumberOfLinesToSkip = 0,
                TableName = table.TableName,
            };

            bulk.Columns.AddRange(columns);
            return bulk.Load();
        }
    }


    /// <summary>
    /// CSV extension
    /// </summary>
    public static class CSVEx
    {
        /// <summary>
        /// Write a DataTable to a tab-separated file for import
        /// </summary>
        /// <param name="table">Data table</param>
        /// <param name="tmpPath">Path of the file to write</param>
        public static void ToCsv(this DataTable table, string tmpPath)
        {
            // Fields are separated by a tab; an empty column still gets its separator so that it keeps its position.
            // The commented-out block below shows the standard CSV rule for comma-separated output:
            // a value that contains a comma is wrapped in double quotes, and any double quote inside it is doubled ("").
            StringBuilder sb = new StringBuilder();
            DataColumn colum;
            foreach (DataRow row in table.Rows)
            {
                for (int i = 0; i < table.Columns.Count; i++)
                {
                    Type _datatype = typeof(DateTime);
                    colum = table.Columns[i];
                    if (i != 0) sb.Append("\t");
                    //if (colum.DataType == typeof(string) && row[colum].ToString().Contains(","))
                    //{
                    //    sb.Append("\"" + row[colum].ToString().Replace("\"", "\"\"") + "\"");
                    //}
                    if (colum.DataType == _datatype && !row.IsNull(colum))
                    {
                        // Write dates in a fixed format instead of the current culture's format
                        sb.Append(((DateTime)row[colum]).ToString("yyyy/MM/dd HH:mm:ss"));
                    }
                    else sb.Append(row[colum].ToString());
                }
                sb.Append("\r\n");
            }
            // Dispose the writer so the file is flushed and closed even if writing fails
            using (StreamWriter sw = new StreamWriter(tmpPath, false, UTF8Encoding.UTF8))
            {
                sw.Write(sb.ToString());
            }
        }

    }

    /// <summary>
    /// String conversion
    /// </summary>
    public static class StringExtensions
    {
        /// <summary>
        /// Convert a name to snake_case, for example MainKeysId to main_keys_id
        /// </summary>
        public static string ToSnakeCase(this string input)
        {
            if (string.IsNullOrEmpty(input)) { return input; }

            var startUnderscores = Regex.Match(input, @"^_+");
            return startUnderscores + Regex.Replace(input, @"([a-z0-9])([A-Z])", "$1_$2").ToLower();
        }
    }


    /// <summary>
    /// Entity
    /// </summary>
    public class CrmCouponTestDto
    {
        /// <summary>
        /// ID
        /// </summary>
        public long Id { get; set; }

        /// <summary>
        /// Coupon code
        /// </summary>
        public string CouponCode { get; set; }

        /// <summary>
        /// Coupon ID
        /// </summary>
        public int CouponId { get; set; }

        /// <summary>
        /// Member ID
        /// </summary>
        public int MemberId { get; set; }

        /// <summary>
        /// Issue time
        /// </summary>
        public DateTime IssueTime { get; set; }

        /// <summary>
        /// Usage time
        /// </summary>
        public DateTime UsageTime { get; set; }

        /// <summary>
        /// ID of the shop where the coupon was used
        /// </summary>
        public int UsageShopId { get; set; }

        /// <summary>
        /// Bill number of the order in which the coupon was used
        /// </summary>
        public string UsageBillNo { get; set; }

        /// <summary>
        /// Start of the validity period
        /// </summary>
        public DateTime EffectiveStart { get; set; }

        /// <summary>
        /// End of the validity period
        /// </summary>
        public DateTime EffectiveEnd { get; set; }

        /// <summary>
        /// Status
        /// CouponStatus coupon status:
        /// -1: Not received
        /// 0: Not used
        /// 1: Used
        /// 2: Expired
        /// 3: Voided
        /// 4: Being gifted
        /// </summary>
        public Int16 Status { get; set; }
    }
}
