/// <summary>
/// Queue multi-threading, T represents a single type of processing ~
/// </summary>
/// <typeparam name="T"></typeparam>
public abstract class QueueThreadBase<T>
{
#region Variables & Properties
/// <summary>
/// Pending results
/// </summary>
private class PendingResult
{
/// <summary>
///Pending value
/// </summary>
public T PendingValue { get; set; }
/// <summary>
/// Is there a value
/// </summary>
public bool IsHad { get; set; }
}
/// <summary>
/// Number of threads
/// </summary>
public int ThreadCount
{
get { return this.m_ThreadCount; }
set { this.m_ThreadCount = value; }
}
private int m_ThreadCount = 5;
/// <summary>
/// Cancel=True
/// </summary>
public bool Cancel { get; set; }
/// <summary>
/// Thread list
/// </summary>
List<Thread> m_ThreadList;
/// <summary>
/// Number of completed queues
/// </summary>
private volatile int m_CompletedCount = 0;
/// <summary>
/// Total number of queues
/// </summary>
private int m_QueueCount = 0;
/// <summary>
/// Complete all locks
/// </summary>
private object m_AllCompletedLock = new object();
/// <summary>
/// Number of completed threads
/// </summary>
private int m_CompetedCount = 0;
/// <summary>
/// Queue lock
/// </summary>
private object m_PendingQueueLock = new object();
private Queue<T> m_InnerQueue;
#endregion
#region event related
/// <summary>
/// Complete all events
/// </summary>
public event Action<CompetedEventArgs> AllCompleted;
/// <summary>
/// Single completion event
/// </summary>
public event Action<T, CompetedEventArgs> OneCompleted;
/// <summary>
/// Raise all completed events
/// </summary>
/// <param name="args"></param>
private void OnAllCompleted(CompetedEventArgs args)
{
if (AllCompleted != null)
{
try
{
AllCompleted(args);//All completed events
}
catch { }
}
}
/// <summary>
/// Raise a single completion event
/// </summary>
/// <param name="pendingValue"></param>
/// <param name="args"></param>
private void OnOneCompleted(T pendingValue, CompetedEventArgs args)
{
if (OneCompleted != null)
{
try
{
OneCompleted(pendingValue, args);
}
catch { }
}
}
#endregion
#region construct
public QueueThreadBase(IEnumerable<T> collection)
{
m_InnerQueue = new Queue<T>(collection);
this.m_QueueCount = m_InnerQueue.Count;
}
#endregion
#region Subject
/// <summary>
/// Initialize the thread
/// </summary>
private void InitThread()
{
m_ThreadList = new List<Thread>();
for (int i = 0; i < ThreadCount; i++)
{
Thread t = new Thread(new ThreadStart(InnerDoWork));
m_ThreadList.Add(t);
= true;
();
}
}
/// <summary>
/// start
/// </summary>
public void Start()
{
InitThread();
}
/// <summary>
/// Thread work
/// </summary>
private void InnerDoWork()
{
try
{
Exception doWorkEx = null;
DoWorkResult doworkResult = ;
var t = CurrentPendingQueue;
while (! && )
{
try
{
doworkResult = DoWork();
}
catch (Exception ex)
{
doWorkEx = ex;
}
m_CompletedCount++;
int precent = m_CompletedCount * 100 / m_QueueCount;
OnOneCompleted(, new CompetedEventArgs() { CompetedPrecent = precent, InnerException = doWorkEx });
if (doworkResult == )
{
= true;
break;
}
else if (doworkResult == )
{
break;
}
t = CurrentPendingQueue;
}
lock (m_AllCompletedLock)
{
m_CompetedCount++;
if (m_CompetedCount == m_ThreadList.Count)
{
OnAllCompleted(new CompetedEventArgs() { CompetedPrecent = 100 });
}
}
}
catch
{
throw;
}
}
/// <summary>
/// Subclass rewrite
/// </summary>
/// <param name="pendingValue"></param>
/// <returns></returns>
protected virtual DoWorkResult DoWork(T pendingValue)
{
return ;
}
/// <summary>
/// Get the current result
/// </summary>
private PendingResult CurrentPendingQueue
{
get
{
lock (m_PendingQueueLock)
{
PendingResult t = new PendingResult();
if (m_InnerQueue.Count != 0)
{
= m_InnerQueue.Dequeue();
= true;
}
else
{
= default(T);
= false;
}
return t;
}
}
}
#endregion
#region related classes & enumeration
/// <summary>
/// Dowork result enumeration
/// </summary>
public enum DoWorkResult
{
/// <summary>
/// Continue to run, default
/// </summary>
ContinueThread = 0,
/// <summary>
/// Terminate the current thread
/// </summary>
AbortCurrentThread = 1,
/// <summary>
/// Terminate all threads
/// </summary>
AbortAllThread = 2
}
/// <summary>
/// Complete event data
/// </summary>
public class CompetedEventArgs : EventArgs
{
public CompetedEventArgs()
{
}
/// <summary>
/// Percentage of completion
/// </summary>
public int CompetedPrecent { get; set; }
/// <summary>
/// Exception information
/// </summary>
public Exception InnerException { get; set; }
}
#endregion
}