The previous article, "Task-based Asynchronous Programming Pattern (TAP)", described how to implement asynchronous operations yourself under .NET Framework 4.5. In fact, some classes in .NET 4.5 already ship with asynchronous wrappers. For example, the Stream class gained Async methods such as ReadAsync and WriteAsync in .NET 4.5, so stream-based communication can be performed asynchronously.
1. Read file data asynchronously
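    using System;
    using System.IO;
    using System.Threading.Tasks;

    public static void TaskFromIOStreamAsync(string fileName)
    {
        int chunkSize = 4096;
        byte[] buffer = new byte[chunkSize];
        //The last argument (true) opens the file for asynchronous I/O
        FileStream fileStream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read, chunkSize, true);
        Task<int> task = fileStream.ReadAsync(buffer, 0, buffer.Length);
        task.ContinueWith((readTask) =>
        {
            int amountRead = readTask.Result;
            //The file stream must be released in ContinueWith
            fileStream.Dispose();
            Console.WriteLine($"Async(Simple) Read {amountRead} bytes");
        });
    }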
The code above performs only a single asynchronous read, and the continuation runs once that read completes. In real scenarios, however, the stream usually has to be read several times to obtain all the data, for example when the file is larger than the buffer, or when the data comes from a network stream and has not all arrived at the machine yet. To complete the asynchronous read, the code must therefore keep reading from the stream until all the required data has been obtained.
This calls for two levels of tasks. The outer Task represents the whole read and is what the caller uses. An inner Task represents each individual read operation.
The first asynchronous read returns a Task. If that Task were returned directly to the caller, code that calls Wait or ContinueWith on it would continue as soon as the first read finished. What we actually want is for the caller to continue only after all read operations are complete. The Task from the first read therefore cannot be handed to the caller. Instead, a "pseudo Task" is needed that completes only after all reads have finished.
This is solved with the TaskCompletionSource<T> class, which produces such a "pseudo Task" to return. When all the asynchronous reading is complete, call TrySetResult on the TaskCompletionSource<T> object, which lets callers of Wait or ContinueWith continue.
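As a minimal illustration of the pseudo Task idea in isolation, the sketch below hands the caller a Task that completes only when TrySetResult is called, not when any individual inner step ends. The class and method names (PseudoTaskSketch, SumInSteps) are hypothetical and exist only for this illustration; the loop stands in for a series of inner operations.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class PseudoTaskSketch
{
    // Hypothetical helper: the returned Task is controlled entirely by tcs.
    public static Task<int> SumInSteps(int steps)
    {
        var tcs = new TaskCompletionSource<int>();
        ThreadPool.QueueUserWorkItem(_ =>
        {
            int total = 0;
            for (int i = 1; i <= steps; i++)
            {
                total += i; // stands in for one inner operation
            }
            // Only now are callers of Wait or ContinueWith released.
            tcs.TrySetResult(total);
        });
        return tcs.Task;
    }
}
```

A caller would simply write PseudoTaskSketch.SumInSteps(4).ContinueWith(t => Console.WriteLine(t.Result)). The file-reading version of the same pattern follows.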
    using System.IO;
    using System.Threading.Tasks;

    public static Task<long> AsynchronousRead(string fileName)
    {
        int chunkSize = 4096;
        byte[] buffer = new byte[chunkSize];
        //Create the pseudo Task object that will be returned
        TaskCompletionSource<long> tcs = new TaskCompletionSource<long>();
        MemoryStream fileContents = new MemoryStream(); //Holds the content read so far
        FileStream fileStream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read, chunkSize, true);
        //Pre-size the buffer. This is optional: Capacity grows automatically as data is written.
        fileContents.Capacity += chunkSize;
        Task<int> task = fileStream.ReadAsync(buffer, 0, buffer.Length);
        task.ContinueWith(readTask => ContinueRead(readTask, fileStream, fileContents, buffer, tcs));
        //The read loop runs in ContinueWith; the Task of tcs completes once reading is finished
        return tcs.Task;
    }

    /// <summary>
    /// Continue reading the data
    /// </summary>
    /// <param name="task">The task that read the data</param>
    /// <param name="fileStream">File stream</param>
    /// <param name="fileContents">Where the file content is stored</param>
    /// <param name="buffer">Read buffer</param>
    /// <param name="tcs">Pseudo Task object</param>
    private static void ContinueRead(Task<int> task, FileStream fileStream, MemoryStream fileContents, byte[] buffer, TaskCompletionSource<long> tcs)
    {
        if (task.IsCompleted)
        {
            int bytesRead = task.Result;
            fileContents.Write(buffer, 0, bytesRead); //Write to memory; Capacity grows automatically
            if (bytesRead > 0)
            {
                //Not finished yet: start another asynchronous read.
                //ContinueWith chains it without blocking a thread; the continuation
                //is not guaranteed to run on the same thread.
                Task<int> newTask = fileStream.ReadAsync(buffer, 0, buffer.Length);
                //This forms the loop
                newTask.ContinueWith(readTask => ContinueRead(readTask, fileStream, fileContents, buffer, tcs));
            }
            else
            {
                //Everything has been read, so return the result
                tcs.TrySetResult(fileContents.Length);
                fileStream.Dispose();
                fileContents.Dispose(); //Release the data buffer once the data has been used
            }
        }
    }
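One gap in the loop above is that a failed read never completes the pseudo Task, so a caller could wait forever. The following is a hedged sketch of one way to forward failures through TrySetException. It is not the article's original code: FaultAwareReadSketch, CountBytesAsync and ReadNext are hypothetical names, and the sketch reads from any Stream and only counts bytes so that it stays self-contained.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class FaultAwareReadSketch
{
    // Hypothetical variant of the read loop: counts all bytes in the stream
    // and forwards a failed read to the caller through the pseudo Task.
    public static Task<long> CountBytesAsync(Stream source, int chunkSize = 4096)
    {
        var tcs = new TaskCompletionSource<long>();
        ReadNext(source, new byte[chunkSize], 0L, tcs);
        return tcs.Task;
    }

    private static void ReadNext(Stream source, byte[] buffer, long total, TaskCompletionSource<long> tcs)
    {
        Task<int> read;
        try
        {
            read = source.ReadAsync(buffer, 0, buffer.Length);
        }
        catch (Exception ex)
        {
            tcs.TrySetException(ex); // the read could not even be started
            return;
        }

        read.ContinueWith(t =>
        {
            if (t.IsFaulted)
            {
                tcs.TrySetException(t.Exception.InnerExceptions);
            }
            else if (t.IsCanceled)
            {
                tcs.TrySetCanceled();
            }
            else if (t.Result > 0)
            {
                ReadNext(source, buffer, total + t.Result, tcs); // more data may follow
            }
            else
            {
                tcs.TrySetResult(total); // end of stream reached
            }
        });
    }
}
```

With this shape, a caller that waits on the returned Task observes the exception from the failed read instead of hanging.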
2. Adapting older asynchronous methods to the Task model
Older asynchronous methods in the .NET Framework come in pairs with the "Begin" and "End" prefixes. These methods still work, and for the sake of a consistent interface they can be wrapped in a Task.
The FromAsync method takes the stream's BeginRead and EndRead methods as parameters, followed by the buffer that receives the data and the remaining BeginRead arguments. BeginRead and EndRead are executed, and once EndRead completes the continuation task is invoked and control returns to the main code. The example below repeats this until no more data is read, then closes the stream and returns the data converted to a string.
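    using System;
    using System.IO;
    using System.Text;
    using System.Threading.Tasks;

    const int ReadSize = 256;

    /// <summary>
    /// Get a string from a file
    /// </summary>
    /// <param name="path">File path</param>
    /// <returns>The file content as a string</returns>
    public static Task<string> GetStringFromFile(string path)
    {
        FileInfo file = new FileInfo(path);
        byte[] buffer = new byte[1024]; //The buffer for storing data; at most this many bytes are read
        //FileOptions.Asynchronous is needed for asynchronous I/O; SequentialScan is an assumed hint
        FileStream fileStream = new FileStream(
            path, FileMode.Open, FileAccess.Read, FileShare.Read, buffer.Length,
            FileOptions.Asynchronous | FileOptions.SequentialScan);
        //buffer, 0, ReadSize and null are the arguments required by BeginRead
        Task<int> task = Task<int>.Factory.FromAsync(fileStream.BeginRead, fileStream.EndRead, buffer, 0, ReadSize, null);
        TaskCompletionSource<string> tcs = new TaskCompletionSource<string>();
        task.ContinueWith(taskRead => OnReadBuffer(taskRead, fileStream, buffer, 0, tcs));
        return tcs.Task;
    }

    /// <summary>
    /// Read data
    /// </summary>
    /// <param name="taskRead">Read Task</param>
    /// <param name="fileStream">File stream</param>
    /// <param name="buffer">Where the data read is stored</param>
    /// <param name="offset">Read offset</param>
    /// <param name="tcs">Pseudo Task</param>
    private static void OnReadBuffer(Task<int> taskRead, FileStream fileStream, byte[] buffer, int offset, TaskCompletionSource<string> tcs)
    {
        int readLength = taskRead.Result;
        if (readLength > 0)
        {
            int newOffset = offset + readLength;
            //Never request more than the space left in the buffer
            Task<int> task = Task<int>.Factory.FromAsync(fileStream.BeginRead, fileStream.EndRead, buffer, newOffset, Math.Min(buffer.Length - newOffset, ReadSize), null);
            task.ContinueWith(callBackTask => OnReadBuffer(callBackTask, fileStream, buffer, newOffset, tcs));
        }
        else
        {
            //Decode only the bytes actually read; UTF-8 is assumed here
            tcs.TrySetResult(Encoding.UTF8.GetString(buffer, 0, offset));
            fileStream.Dispose();
        }
    }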
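To show the FromAsync wrapping on its own, without the read loop, here is a minimal sketch that turns a single BeginRead/EndRead pair into a Task<int>. FromAsyncSketch and ReadOnceAsync are hypothetical names used only for this illustration, and the helper accepts any Stream so it can be tried without a file on disk.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class FromAsyncSketch
{
    // Hypothetical helper: wraps one BeginRead/EndRead pair in a Task<int>
    // whose result is the number of bytes read into buffer.
    public static Task<int> ReadOnceAsync(Stream source, byte[] buffer)
    {
        return Task<int>.Factory.FromAsync(
            source.BeginRead, source.EndRead,
            buffer, 0, buffer.Length, // the three arguments forwarded to BeginRead
            null);                    // state object, unused here
    }
}
```

For streams, calling ReadAsync directly is usually simpler on .NET 4.5 and later; FromAsync is mainly useful for APIs that only expose a Begin/End pair.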
3. Use async and await to read data
In the following example, the async and await keywords are used to read one file asynchronously while compressing it and writing the result to another file. A synchronous version is shown first for comparison. Code before the first await runs on the caller's thread, and the code after an await runs as a continuation once the awaited operation completes. However, there are situations where these two keywords cannot be used: ① when the end time of the Task is unclear; ② when multi-level Tasks and TaskCompletionSource must be used.
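    using System.Collections.Generic;
    using System.IO;
    using System.IO.Compression;
    using System.Threading.Tasks;

    /// <summary>
    /// Synchronous compression
    /// </summary>
    /// <param name="lstFiles">File list</param>
    public static void SyncCompress(IEnumerable<string> lstFiles)
    {
        byte[] buffer = new byte[16384];
        foreach (string file in lstFiles)
        {
            using (FileStream inputStream = File.OpenRead(file))
            {
                //File.Create truncates an existing output file
                using (FileStream outputStream = File.Create(file + ".compressed"))
                {
                    using (GZipStream compressStream = new GZipStream(outputStream, CompressionMode.Compress))
                    {
                        int read = 0;
                        while ((read = inputStream.Read(buffer, 0, buffer.Length)) > 0)
                        {
                            compressStream.Write(buffer, 0, read);
                        }
                    }
                }
            }
        }
    }

    /// <summary>
    /// Asynchronous file compression
    /// </summary>
    /// <param name="lstFiles">Files to compress</param>
    /// <returns>A Task that completes when all files have been compressed</returns>
    public static async Task AsyncCompress(IEnumerable<string> lstFiles)
    {
        byte[] buffer = new byte[16384];
        foreach (string file in lstFiles)
        {
            using (FileStream inputStream = File.OpenRead(file))
            {
                using (FileStream outputStream = File.Create(file + ".compressed"))
                {
                    using (GZipStream compressStream = new GZipStream(outputStream, CompressionMode.Compress))
                    {
                        int read = 0;
                        //Each await yields while the I/O is pending instead of blocking the thread
                        while ((read = await inputStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
                        {
                            await compressStream.WriteAsync(buffer, 0, read);
                        }
                    }
                }
            }
        }
    }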
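For comparison with the AsynchronousRead loop from section 1, the sketch below expresses a similar read-until-done loop with async and await, so that no explicit TaskCompletionSource or ContinueWith chain is needed. This is an illustrative rewrite under assumptions rather than the article's code: AwaitReadSketch and ReadAllAsync are hypothetical names, and the method takes any Stream instead of a file name.

```csharp
using System.IO;
using System.Threading.Tasks;

public static class AwaitReadSketch
{
    // Hypothetical counterpart to AsynchronousRead: reads the stream to the
    // end and returns the total number of bytes read.
    public static async Task<long> ReadAllAsync(Stream source, int chunkSize = 4096)
    {
        byte[] buffer = new byte[chunkSize];
        using (var contents = new MemoryStream())
        {
            int bytesRead;
            // The compiler turns this loop into the chained continuations
            // that were written by hand in section 1.
            while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)
            {
                contents.Write(buffer, 0, bytesRead);
            }
            return contents.Length;
        }
    }
}
```

The Task<long> returned by the async method plays the role of the pseudo Task: it completes only after the final read returns zero bytes, and an exception thrown by any read is stored in that Task.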