SoFunction
Updated on 2025-03-08

Detailed explanation of C# queue method that can be sorted in everything

need

The product needs to push data to different customers. The original implementation is that each piece of data will be immediately pushed to customers after it is generated, and the HTTP protocol is adopted. Because each piece of data is relatively small and the frequency of data generation is relatively high, HTTP connections will be established frequently, and the service data carried in each HTTP transmission is very small, and the actual utilization rate of the network is not high. It is hoped to increase the utilization rate of the network and reduce the load on the system.

analyze

A very natural idea is to send multiple pieces of data together. Here are a few key points:

1. The aggregation logic of multiple data:Should I save up a few sends, or send them according to the time period. If you save enough data to send, it may be difficult to save enough data to be needed when the data is sparse or the frequency of generation is not so stable. At this time, an expiration time is required because the customer may not be able to accept too much delay. Since I need to use time to control it no matter what, I simply chose to send it according to the time period. The idea is: since the last time of sending, after a certain period of time, all data generated by the customer during this period will be sent.

2. Methods for determining data expiration:Since you have chosen to send according to the time period, there must be a way to determine whether the sending time has reached. A very simple idea is to poll, poll all customers and see whoever has expired, send whoever has sent it. The time complexity of this algorithm is O(N). If there are many customers, it will consume too much time. There is another method: if the customer is sorted by time, then you only need to take the data and time judgment of the customer with the earliest time. If it is satisfied, send it and search backwards until the acquired customer data does not meet the conditions, then exit the processing and then wait for a while before making the judgment processing. This requires a data structure that supports sorting, which automatically sorts when writing data. The time complexity of this data structure can generally be O(log(n)). The read and write operation of this data structure is in principle the operation method of a queue, but it is just a sortable queue.

3. Distinguish between customers:Different customers have different data receiving addresses. When sending data to a specific customer, it should be easier to aggregate his data, and it is best to get the data that needs to be sent directly. A dictionary data structure can be used to meet this requirement, and the time complexity of taking a certain customer data can be reduced to O(1).

4. Data security issues:What if the program exits before the data is successfully sent, what should I do if the data is not sent? Can you continue to send it, or just throw it away and ignore it. If you want to restore unsent data after the program restarts, you must synchronize the data to other places, such as persisting to disk. Because the data security requirements here are not high, losing some data is also allowed, so the data to be sent can be put into memory after it is received.

accomplish

As mentioned above, the sortable data structure can be used, SortedList<TKey,TValue> can be used, the key is the time, and the value is the customer identification list that generates the data. However, its read and write operations are not thread-safe and require synchronization by yourself. Here you can use locks simply.

For data from different customers, for easy access, Dictionary<TKey,TValue> is used to satisfy the data. The key is the identity of the customer and the value is the accumulated unsent customer data. This data reading and writing is not thread-safe, and can be placed in the same lock as the SortedList.

Below are their definitions:

SortedList<DateTime, List<TKey>> _queue = new SortedList<DateTime, List<TKey>>();
Dictionary<TKey, List<TValue>> _data = new Dictionary<TKey, List<TValue>>();
readonly object _lock = new object();

When inserting data, you need to write to the SortedList first, and then write to the Dictionary. The code logic is relatively simple, please see:

    public void Publish(TKey key, TValue value)
    {
        DateTime now = ;
        lock (_lock)
        {
            if (_queue.TryGetValue(now, out List<TKey>? keys))
            {
                if (!keys!.Contains(key))
                {
                    (key);
                }
            }
            else
            {
                _queue.Add(now, new List<TKey> { key });
            }

            if (_data.TryGetValue(key, out List<TValue>? values))
            {
                (value);
            }
            else
            {
                _data.Add(key, new List<TValue> { value });
            }
        }
    }

For consumption data, a data pull mode is adopted here. The logic of the method I wrote at the beginning is to read a piece of data, process it, and then delete it from the queue. However, this logic requires reading and writing to the queue, so it must be locked. Generally, it is time-consuming to process data. For example, if you want to send data through HTTP, locking may lead to a longer blocking time when writing data to the queue. So what is implemented here is to extract all the data that can be sent, then release the lock, and put the data processing outside the lock, so that the read and write performance of the queue is better.

    public List<(TKey key, List<TValue> value)> Pull(int maxNumberOfMessages)
    {
        List<(TKey, List<TValue>)> result = new List<(TKey, List<TValue>)>();
        DateTime now = ;

        lock (_lock)
        {
            int messageCount = 0;
            while (true)
            {
                if (!_queue.Any())
                {
                    break;
                }

                var first = _queue.First();
                var diffMillseconds = ().TotalMilliseconds;
                if (diffMillseconds < _valueDequeueMillseconds)
                {
                    break;
                }

                var keys = ;
                foreach (var key in keys)
                {
                    if (_data.TryGetValue(key, out List<TValue>? keyValues))
                    {
                        ((key, keyValues));
                        _data.Remove(key);
                        messageCount += keyValues!.Count;
                    }
                }
                _queue.RemoveAt(0);

                if (messageCount >= maxNumberOfMessages)
                {
                    break;
                }
            }
        }

        return result;
    }

This code is relatively long. I sorted out the logic: take the first piece of data in the queue, judge whether the time has reached the sending cycle, and exit directly if it has not been reached, and the method returns to the empty list. If the sending cycle is reached, the customer ID stored in the first piece of data is taken out, and the corresponding customer unsend data is obtained based on these identifiers, and the data is added to the return list according to the customer dimension, and the customers and their data are removed from the queue to return the list with data. A limit on the number of data to be pulled is also added here to facilitate control based on the actual situation of the business.

Let’s take a look at how to use this queue. Here we simulate multiple producers and one consumer. In fact, you can have as many producers and consumers as you like:

TimeSortedQueue<string, string> queue = new TimeSortedQueue<string, string>(3000);

List<Task> publishTasks = new List<Task>();

for (int i = 0; i < 4; i++)
{
    var j = i;
    ((() =>
    {
        int k = 0;
        while (true)
        {
            ($"key_{k}", $"value_{j}_{k}");
            (15);
            k++;
        }
    }, ));
}

(() =>
{
    while (true)
    {
        var list = (100);
        if ( <= 0)
        {
            (100);
            continue;
        }

        foreach (var item in list)
        {
            ($"{("")}:{}, {(",", )}");
        }
    }

}, );

(());

The above is a queue sorted by time implemented for this specific requirement.

A queue where everything can be sorted
It is easy to think that since it can be sorted by time, it is also possible to sort by other data types. This data structure can be applied in many scenarios, such as queues sorted by weight, queues sorted by priority, queues sorted by age, queues sorted by bank deposits, etc. This is a queue where everything can be sorted.

I'll post the main code here (please see the end of the article for the complete code and example):

public class SortedQueue<TSortKey, TKey, TValue>
where TSortKey : notnull, IComparable
where TKey : notnull
where TValue : notnull
{
    Dictionary<TKey, List<TValue>> _data = new Dictionary<TKey, List<TValue>>();

    SortedList<TSortKey, List<TKey>> _queue = new SortedList<TSortKey, List<TKey>>();

    readonly object _lock = new object();

    /// <summary>
    /// Create a new instance of SortedQueue
    /// </summary>
    public SortedQueue(int maxNumberOfMessageConsumedOnce)
    {
    }

    /// <summary>
    /// Publish a message to queue
    /// </summary>
    /// <param name="sortKey">The key in the queue for sorting. Different messages can use the same key.</param>
    /// <param name="key">The message key.</param>
    /// <param name="value">The message value.</param>
    public void Publish(TSortKey sortKey, TKey key, TValue value)
    {
        lock (_lock)
        {
            if (_queue.TryGetValue(sortKey, out List<TKey>? keys))
            {
                (key);
            }
            else
            {
                _queue.Add(sortKey, new List<TKey> { key });
            }

            if (_data.TryGetValue(key, out List<TValue>? values))
            {
                (value);
            }
            else
            {
                _data.Add(key, new List<TValue> { value });
            }
        }
    }


    /// <summary>
    /// Pull a batch of messages.
    /// </summary>
    /// <param name="maxNumberOfMessages">The maximum number of pull messages.</param>
    /// <returns></returns>
    public List<(TKey Key, List<TValue> Value)> Pull(int maxNumberOfMessages)
    {
        List<(TKey, List<TValue>)> result = new List<(TKey, List<TValue>)>();
        lock (_lock)
        {
            int messageCount = 0;
            while (true)
            {
                if (!_queue.Any())
                {
                    break;
                }

                var keys = _queue.First().Value;
                foreach (var key in keys)
                {
                    if (_data.TryGetValue(key, out List<TValue>? keyValues))
                    {
                        ((key, keyValues));
                        _data.Remove(key);
                        messageCount += keyValues!.Count;
                    }
                }
                _queue.RemoveAt(0);

                if (messageCount >= maxNumberOfMessages)
                {
                    break;
                }
            }
        }

        return result;
    }
}

The code logic is relatively simple, so I won’t go into details. If you have any questions, please leave a message to communicate.

Let's talk about data security

Because in this implementation, all the data to be processed is in memory, losing data will bring certain risks, because there is a queue in front of my program. Even if the program crashes, it will only lose a small part of the data that has not been processed, which is acceptable in business, so there is no problem doing this. If you are interested in this program, you need to carefully consider your application scenario.

Let’s take a look at two possible situations where data loss can occur:

First, the program restarts when the data is still in the queue: For this case, the aforementioned mentioned synchronizing data to other places, such as writing to Redis, writing to database, writing to disk, etc. However, because network IO and disk IO are slow, this often leads to a significant drop in throughput. To ensure a certain throughput, some sharding mechanisms have to be introduced. Because of the unreliability of the distribution, some fault-tolerant and disaster-tolerant mechanisms may also be added. It is more complicated. You can refer to Kafka.

Second, the data processing failed: In this case, the program can be retryed; but if the exception causes the program to crash and the data has been removed from memory or other storage, the data will still be lost. At this time, an ACK mechanism can be used. After the processing is successful, an ACK is sent to the queue, carrying the processed data identifier, and the queue deletes data according to the identifier. Otherwise, consumers can still consume this data.

These problems do not have to be completely solved, but depend on the business scenario. It is possible that you can persist data to Redis, or you don’t need to introduce an ACK mechanism and just record which one you processed.

The above is all the content of this article. I hope it will be helpful to everyone's study and I hope everyone will support me more.