# Chapter 27 Compute-Bound Asynchronous Operations

# Introducing the CLR’s Thread Pool

As stated in the previous chapter, creating and destroying a thread is an expensive operation in terms of time. In addition, having lots of threads wastes memory resources and also hurts performance due to the operating system having to schedule and context switch between the runnable threads. To improve this situation, the CLR contains code to manage its own thread pool. You can think of a thread pool as being a set of threads that are available for your application’s own use. There is one thread pool per CLR; this thread pool is shared by all AppDomains controlled by that CLR. If multiple CLRs load within a single process, then each CLR has its own thread pool.

When the CLR initializes, the thread pool has no threads in it. Internally, the thread pool maintains a queue of operation requests. When your application wants to perform an asynchronous operation, you call some method that appends an entry into the thread pool’s queue. The thread pool’s code will extract entries from this queue and dispatch the entry to a thread pool thread. If there are no threads in the thread pool, a new thread will be created. Creating a thread has a performance hit associated with it (as already discussed). However, when a thread pool thread has completed its task, the thread is not destroyed; instead, the thread is returned to the thread pool, where it sits idle waiting to respond to another request. Because the thread doesn’t destroy itself, there is no added performance hit.

If your application makes many requests of the thread pool, the thread pool will try to service all of the requests by using just this one thread. However, if your application is queuing up several requests faster than the thread pool thread can handle them, additional threads will be created. Your application will eventually get to a point at which all of its requests can be handled by a small number of threads, so the thread pool should have no need to create a lot of threads.
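You can observe this reuse with a small experiment. The sketch below queues many short work items and records which managed threads ran them. The class and method names are hypothetical. It blocks on a CountdownEvent only so the demo can inspect the results. The number of distinct threads it reports is not guaranteed and varies with the runtime version and the machine.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical demo class: not part of the FCL
public static class ThreadReuseDemo {
    // Queues 'requests' short work items and records, per distinct managed thread ID,
    // whether that thread was a thread pool thread.
    public static ConcurrentDictionary<Int32, Boolean> RunOnPool(Int32 requests) {
        var threads = new ConcurrentDictionary<Int32, Boolean>();
        var done = new CountdownEvent(requests);
        for (Int32 i = 0; i < requests; i++) {
            ThreadPool.QueueUserWorkItem(_ => {
                Thread current = Thread.CurrentThread;
                threads.TryAdd(current.ManagedThreadId, current.IsThreadPoolThread);
                Thread.Sleep(10); // Simulate a little compute-bound work
                done.Signal();
            });
        }
        done.Wait(); // Blocking is acceptable here only because this is a demo
        return threads;
    }
}
```

Calling `RunOnPool(50)` usually reports far fewer distinct threads than work items. This happens because each pool thread returns to the pool and picks up the next queued item instead of being destroyed.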

If your application stops making requests of the thread pool, the pool may have a lot of threads in it that are doing nothing. This is wasteful of memory resources. So when a thread pool thread has been idle with nothing to do for some period of time (subject to change with different versions of the CLR), the thread wakes itself up and kills itself to free up resources. As the thread is killing itself, there is a performance hit. However, this probably doesn’t matter, because the thread is killing itself because it has been idle, which means that your application isn’t performing a lot of work.

The great thing about the thread pool is that it manages the tension between having a few threads, to keep from wasting resources, and having more threads, to take advantage of multiprocessors, hyperthreaded processors, and multi-core processors. And the thread pool is heuristic. If your application needs to perform many tasks and CPUs are available, the thread pool creates more threads. If your application’s workload decreases, the thread pool threads kill themselves.

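💡Summary: Creating and destroying a thread is an expensive operation that takes considerable time. In addition, too many threads waste memory resources. Because the operating system must schedule the runnable threads and perform context switches, too many threads also hurt performance. To improve this situation, the CLR contains code to manage its own thread pool. There is one thread pool per CLR, and it is shared by all AppDomains controlled by that CLR. If multiple CLRs load within a single process, each CLR has its own thread pool. When the CLR initializes, the thread pool has no threads in it. Internally, the thread pool maintains a queue of operation requests. When your application wants to perform an asynchronous operation, it calls some method that appends an entry to the thread pool's queue. The thread pool's code extracts entries from this queue and dispatches each entry to a thread pool thread. When a thread pool thread completes its task, the thread is not destroyed. Instead, it returns to the thread pool and sits idle, waiting to respond to another request. Because the thread does not destroy itself, there is no added performance hit. If your application queues requests faster than the thread pool threads can handle them, additional threads are created. When a thread pool thread has been idle for some period of time (the exact period differs between CLR versions), the thread wakes itself up and terminates itself to free resources. Terminating itself incurs some performance cost. This matters little, because the thread terminates only when it has been idle, which means the application is not doing much work. The thread pool can keep just a few threads to avoid wasting resources, or more threads to take advantage of multiprocessors, hyperthreaded processors, and multi-core processors, and it moves comfortably between these two states. The thread pool is heuristic. If your application needs to perform many tasks and CPUs are available, the thread pool creates more threads. If your application's workload decreases, the thread pool threads terminate themselves.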

# Performing a Simple Compute-Bound Operation

To queue an asynchronous compute-bound operation to the thread pool, you typically call one of the following methods defined by the ThreadPool class.

```csharp
static Boolean QueueUserWorkItem(WaitCallback callBack);
static Boolean QueueUserWorkItem(WaitCallback callBack, Object state);
```

These methods queue a “work item” and optional state data to the thread pool’s queue, and then all of these methods return immediately. A work item is simply a method identified by the callback parameter that will be called by a thread pool thread. The method can be passed a single parameter specified via the state (the state data) argument. The version of QueueUserWorkItem without the state parameter passes null to the callback method. Eventually, some thread in the pool will process the work item, causing your method to be called. The callback method you write must match the System.Threading.WaitCallback delegate type, which is defined as follows.

```csharp
delegate void WaitCallback(Object state);
```
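The two overloads differ only in what the callback receives. The following sketch queues one work item with a state argument and one without, then reports what each callback observed. The QueueOverloadsDemo class is hypothetical. The CountdownEvent exists only so the demo can wait for both results.

```csharp
using System;
using System.Threading;

// Hypothetical demo class: not part of the FCL
public static class QueueOverloadsDemo {
    // Returns the state values seen by two callbacks: [0] queued with state, [1] without
    public static Object[] Run() {
        var observed = new Object[2];
        var done = new CountdownEvent(2);

        // Overload with a state argument: the callback receives 5 (a boxed Int32)
        ThreadPool.QueueUserWorkItem(state => { observed[0] = state; done.Signal(); }, 5);

        // Overload without a state argument: the callback receives null
        ThreadPool.QueueUserWorkItem(state => { observed[1] = state; done.Signal(); });

        done.Wait(); // Wait for both work items (demo only)
        return observed;
    }
}
```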

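💡Note: The WaitCallback delegate, the TimerCallback delegate (discussed in the "Performing a Periodic Compute-Bound Operation" section later in this chapter), and the ParameterizedThreadStart delegate (discussed in Chapter 26, "Thread Basics") all have identical signatures. If you define a method matching this signature, the method can be invoked by using ThreadPool.QueueUserWorkItem, a System.Threading.Timer object, or a System.Threading.Thread object.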
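The hypothetical SharedSignatureDemo class below shows the shared signature in action. It defines one Work(Object) method and passes it, unchanged, to three callers: ThreadPool.QueueUserWorkItem, a one-shot System.Threading.Timer, and a dedicated Thread. The static fields and the CountdownEvent are demo scaffolding, not part of a recommended pattern.

```csharp
using System;
using System.Threading;

// Hypothetical demo class: not part of the FCL
public static class SharedSignatureDemo {
    private static Int32 s_calls;
    private static CountdownEvent s_done;

    // Matches WaitCallback, TimerCallback, and ParameterizedThreadStart: void (Object)
    private static void Work(Object state) {
        Console.WriteLine("Work invoked via {0}", state);
        Interlocked.Increment(ref s_calls);
        s_done.Signal();
    }

    // Invokes Work through all three mechanisms and returns how many calls completed
    public static Int32 Run() {
        s_calls = 0;
        s_done = new CountdownEvent(3);

        ThreadPool.QueueUserWorkItem(Work, "ThreadPool.QueueUserWorkItem");

        // dueTime 0 = fire immediately; period Timeout.Infinite = fire only once
        using (new Timer(Work, "System.Threading.Timer", 0, Timeout.Infinite)) {
            var thread = new Thread(Work); // Binds to ParameterizedThreadStart
            thread.Start("System.Threading.Thread");
            s_done.Wait(); // Keep the timer alive until all three invocations ran
            thread.Join();
        }
        return s_calls;
    }
}
```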

The following code demonstrates how to have a thread pool thread call a method asynchronously.

```csharp
using System;
using System.Threading;

public static class Program {
    public static void Main() {
        Console.WriteLine("Main thread: queuing an asynchronous operation");
        ThreadPool.QueueUserWorkItem(ComputeBoundOp, 5);
        Console.WriteLine("Main thread: Doing other work here...");
        Thread.Sleep(10000); // Simulating other work (10 seconds)
        Console.WriteLine("Hit <Enter> to end this program...");
        Console.ReadLine();
    }

    // This method's signature must match the WaitCallback delegate
    private static void ComputeBoundOp(Object state) {
        // This method is executed by a thread pool thread
        Console.WriteLine("In ComputeBoundOp: state={0}", state);
        Thread.Sleep(1000); // Simulates other work (1 second)

        // When this method returns, the thread goes back
        // to the pool and waits for another task
    }
}
```

When I compile and run this code, I get the following output.

```
Main thread: queuing an asynchronous operation
Main thread: Doing other work here...
In ComputeBoundOp: state=5
```

And, sometimes when I run this code, I get this output.

```
Main thread: queuing an asynchronous operation
In ComputeBoundOp: state=5
Main thread: Doing other work here...
```

The difference in the order of the lines in the output is attributed to the fact that the two methods are running asynchronously with respect to one another. The Windows scheduler determines which thread to schedule first, or it may schedule them both simultaneously if the application is running on a multi-CPU machine.

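💡Note: If a callback method throws an unhandled exception, the CLR terminates the process (unless the host imposes its own policy). Unhandled exceptions are discussed in Chapter 20, "Exceptions and State Management."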
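One common way to keep a faulty callback from taking down the process is to catch exceptions inside the work item itself. The SafeWorkItem helper below is hypothetical and is not an FCL type. It wraps the caller's WaitCallback in a try/catch and forwards any exception to an onError delegate supplied by the caller.

```csharp
using System;
using System.Threading;

// Hypothetical helper: not part of the FCL
public static class SafeWorkItem {
    // Queues 'callback' so that any exception it throws is reported via 'onError'
    // instead of escaping onto the thread pool thread (which would terminate the process)
    public static Boolean Queue(WaitCallback callback, Object state, Action<Exception> onError) {
        return ThreadPool.QueueUserWorkItem(s => {
            try {
                callback(s);
            }
            catch (Exception ex) {
                onError(ex); // Report the failure; do not rethrow on the pool thread
            }
        }, state);
    }
}
```

Catching every exception like this has a cost: it also hides bugs that arguably should terminate the process. At a minimum, onError should log what it receives.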

💡Note: For Windows Store apps, the System.Threading.ThreadPool class is not publicly exposed. However, it is used indirectly when you use types in the System.Threading.Tasks namespace (see the "Tasks" section later in this chapter).

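💡Summary: To queue an asynchronous compute-bound operation to the thread pool, you typically call the ThreadPool class's QueueUserWorkItem method or one of its overloads. These methods add a "work item" and optional state data to the thread pool's queue, and then they all return immediately. A work item is simply a method, identified by the callBack parameter, that a thread pool thread will call. The method can be passed a single state argument (the state data). The version of QueueUserWorkItem without a state parameter passes null to the callback method. Eventually, some thread in the pool processes the work item, causing your method to be called.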

# Execution Contexts

Every thread has an execution context data structure associated with it. The execution context includes things such as security settings (compressed stack, Thread’s Principal property, and Windows identity), host settings (see System.Threading.HostExecutionContextManager), and logical call context data (see System.Runtime.Remoting.Messaging.CallContext’s LogicalSetData and LogicalGetData methods). When a thread executes code, some operations are affected by the values of the thread’s execution context settings. This is certainly true for the security settings. Ideally, whenever a thread uses another (helper) thread to perform tasks, the issuing thread’s execution context should flow (be copied) to the helper thread. This ensures that any operations performed by helper thread(s) are executing with the same security settings and host settings. It also ensures that any data stored in the initiating thread’s logical call context is available to the helper thread.

By default, the CLR automatically causes the initiating thread’s execution context to flow to any helper threads. This transfers context information to the helper thread, but it comes at a performance cost, because there is a lot of information in an execution context, and accumulating all of this information and then copying it for the helper thread takes a fair amount of time. If the helper thread then employs additional helper threads, then more execution context data structures have to be created and initialized as well.

In the System.Threading namespace, there is an ExecutionContext class that allows you to control how a thread’s execution context flows from one thread to another. Here is what the class looks like.

```csharp
public sealed class ExecutionContext : IDisposable, ISerializable {
    [SecurityCritical] public static AsyncFlowControl SuppressFlow();
    public static void RestoreFlow();
    public static Boolean IsFlowSuppressed();
    // Less commonly used methods are not shown
}
```

You can use this class to suppress the flowing of an execution context, thereby improving your application’s performance. The performance gains can be quite substantial for a server application. There is not much performance benefit for a client application, and the SuppressFlow method is marked with the [SecurityCritical] attribute, making it impossible to call in some client applications (like Microsoft Silverlight). Of course, you should suppress the flowing of execution context only if the helper thread does not need to access the context information. If the initiating thread’s execution context does not flow to a helper thread, the helper thread will use whatever execution context it last associated with it. Therefore, the helper thread really shouldn’t execute any code that relies on the execution context state (such as a user’s Windows identity).

Here is an example showing how suppressing the flow of execution context affects data in a thread’s logical call context when queuing a work item to the CLR’s thread pool.

```csharp
public static void Main() {
    // Put some data into the Main thread's logical call context
    CallContext.LogicalSetData("Name", "Jeffrey");

    // Initiate some work to be done by a thread pool thread
    // The thread pool thread can access the logical call context data
    ThreadPool.QueueUserWorkItem(
        state => Console.WriteLine("Name={0}", CallContext.LogicalGetData("Name")));

    // Now, suppress the flowing of the Main thread's execution context
    ExecutionContext.SuppressFlow();

    // Initiate some work to be done by a thread pool thread
    // The thread pool thread CANNOT access the logical call context data
    ThreadPool.QueueUserWorkItem(
        state => Console.WriteLine("Name={0}", CallContext.LogicalGetData("Name")));

    // Restore the flowing of the Main thread's execution context in case
    // it employs more thread pool threads in the future
    ExecutionContext.RestoreFlow();
    ...
    Console.ReadLine();
}
```

When I compile and run the preceding code, I get the following output.

```
Name=Jeffrey
Name=
```
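On .NET Core and later, CallContext is not available. AsyncLocal<T>, which is not covered in the text above, is the usual way to hold data that flows with the execution context. The sketch below substitutes it for CallContext to reproduce the same experiment. It also wraps the AsyncFlowControl returned by SuppressFlow in a using block, so that disposing it restores the flow. The sketch assumes a runtime where thread pool threads start each uncaptured work item with a clean default context. That is typical of current runtimes, but code should not rely on it. FlowSuppressionDemo is a hypothetical name.

```csharp
using System;
using System.Threading;

// Hypothetical demo class: not part of the FCL
public static class FlowSuppressionDemo {
    // AsyncLocal<T> values flow with the ExecutionContext, like CallContext logical data
    private static readonly AsyncLocal<String> s_name = new AsyncLocal<String>();

    // Returns what two pool work items saw: [0] queued normally, [1] with flow suppressed
    public static String[] Run() {
        var seen = new String[2];
        var done = new CountdownEvent(2);
        s_name.Value = "Jeffrey";

        // Execution context flows: this work item sees "Jeffrey"
        ThreadPool.QueueUserWorkItem(_ => { seen[0] = s_name.Value; done.Signal(); });

        // Disposing the AsyncFlowControl returned by SuppressFlow restores the flow
        using (ExecutionContext.SuppressFlow()) {
            // Execution context does not flow: this work item sees no value (null)
            ThreadPool.QueueUserWorkItem(_ => { seen[1] = s_name.Value; done.Signal(); });
        }

        done.Wait(); // Demo only: wait for both work items
        return seen;
    }
}
```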

Although this discussion has focused on suppressing the flow of execution context when calling ThreadPool.QueueUserWorkItem, this technique is also useful when using Task objects (discussed in the “Tasks” section of this chapter) and is also useful when initiating asynchronous I/O operations (discussed in Chapter 28, “I/O-Bound Asynchronous Operations”).

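💡Summary: Every thread has an execution context data structure associated with it. The execution context includes security settings (the compressed stack, Thread's Principal property, and the Windows identity), host settings (see System.Threading.HostExecutionContextManager), and logical call context data (see the LogicalSetData and LogicalGetData methods of System.Runtime.Remoting.Messaging.CallContext). When a thread executes code, some operations are affected by the thread's execution context settings, especially the security settings. Ideally, whenever one thread (the initiating thread) uses another thread (a helper thread) to perform tasks, the initiating thread's execution context should flow (be copied) to the helper thread. This ensures that any operations performed by the helper thread use the same security and host settings. It also ensures that any data stored in the initiating thread's logical call context is available to the helper thread. By default, the CLR automatically causes the initiating thread's execution context to flow to any helper threads. This passes the context information to the helper thread, but at a performance cost. An execution context contains a lot of information, and gathering all of it and copying it to the helper thread takes considerable time. You can use the ExecutionContext class to suppress the flow of execution context and thereby improve your application's performance. For server applications, the gain can be substantial, but client applications gain little. In addition, because the SuppressFlow method is marked with the [SecurityCritical] attribute, it cannot be called in some client applications (such as Silverlight). Of course, you should suppress the flow of execution context only if the helper thread does not need to access the context information. If the initiating thread's execution context does not flow to a helper thread, the helper thread uses whatever execution context was last associated with it. Therefore, the helper thread should not execute any code that relies on the execution context state (such as a user's Windows identity).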

# Cooperative Cancellation and Timeout

The Microsoft .NET Framework offers a standard pattern for canceling operations. This pattern is cooperative, meaning that the operation that you want to cancel has to explicitly support being canceled. In other words, the code performing the operation that you want to cancel and the code that attempts to cancel the operation must both use the types mentioned in this section. It is nice when long-running compute-bound operations offer cancellation, so you should consider adding cancellation to your own compute-bound operations. In this section, I’ll explain how to accomplish this. But, first, let me explain the two main types provided in the Framework Class Library (FCL) that are part of the standard cooperative cancellation pattern.

To cancel an operation, you must first create a System.Threading.CancellationTokenSource object. This class looks like this.

```csharp
public sealed class CancellationTokenSource : IDisposable { // A reference type
    public CancellationTokenSource();
    public Boolean IsCancellationRequested { get; }
    public CancellationToken Token { get; }
    public void Cancel(); // Internally, calls Cancel passing false
    public void Cancel(Boolean throwOnFirstException);
    ...
}
```

This object contains all the state having to do with managing cancellation. After constructing a CancellationTokenSource (a reference type), one or more CancellationToken (a value type) instances can be obtained by querying its Token property and passed around to your operations that allow themselves to be canceled. Here are the most useful members of the CancellationToken value type.

```csharp
public struct CancellationToken { // A value type
    public static CancellationToken None { get; } // Very convenient

    public Boolean IsCancellationRequested { get; } // Called by non-Task invoked operations
    public void ThrowIfCancellationRequested(); // Called by Task-invoked operations

    // WaitHandle is signaled when the CancellationTokenSource is canceled
    public WaitHandle WaitHandle { get; }
    // GetHashCode, Equals, operator== and operator!= members are not shown

    public Boolean CanBeCanceled { get; } // Rarely used

    public CancellationTokenRegistration Register(Action<Object> callback, Object state,
        Boolean useSynchronizationContext); // Simpler overloads not shown
}
```

A CancellationToken instance is a lightweight value type because it contains a single private field: a reference to its CancellationTokenSource object. A compute-bound operation’s loop can periodically call CancellationToken’s IsCancellationRequested property to know if the loop should terminate early, thereby ending the compute-bound operation. Of course, the benefit here is that CPU time is no longer being wasted on an operation whose result you know you’re not interested in. Now, let me put all this together with some sample code.

```csharp
using System;
using System.Threading;

internal static class CancellationDemo {
    public static void Main() {
        CancellationTokenSource cts = new CancellationTokenSource();

        // Pass the CancellationToken and the number-to-count-to into the operation
        ThreadPool.QueueUserWorkItem(o => Count(cts.Token, 1000));

        Console.WriteLine("Press <Enter> to cancel the operation.");
        Console.ReadLine();
        cts.Cancel(); // If Count returned already, Cancel has no effect on it
        // Cancel returns immediately, and the method continues running here...

        Console.ReadLine();
    }

    private static void Count(CancellationToken token, Int32 countTo) {
        for (Int32 count = 0; count < countTo; count++) {
            if (token.IsCancellationRequested) {
                Console.WriteLine("Count is cancelled");
                break; // Exit the loop to stop the operation
            }
            Console.WriteLine(count);
            Thread.Sleep(200); // For demo, waste some time
        }
        Console.WriteLine("Count is done");
    }
}
```

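💡Note: To execute an operation that must not be canceled, pass it the CancellationToken returned by CancellationToken's static None property. This property returns a special CancellationToken instance that is not associated with any CancellationTokenSource object (the instance's private field is null). Because there is no CancellationTokenSource, no code can call Cancel. An operation that queries this special CancellationToken's IsCancellationRequested property will therefore always get false. If you query CancellationToken's CanBeCanceled property on one of these special instances, it returns false. For all other CancellationToken instances, obtained by querying a CancellationTokenSource object's Token property, CanBeCanceled returns true.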
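You can see the difference the note describes by comparing CancellationToken.None with a token obtained from a real CancellationTokenSource. The NoneTokenDemo class is hypothetical.

```csharp
using System;
using System.Threading;

// Hypothetical demo class: not part of the FCL
public static class NoneTokenDemo {
    public static (Boolean NoneCanBeCanceled, Boolean NoneRequested, Boolean SourceTokenCanBeCanceled) Inspect() {
        // A token with no CancellationTokenSource behind it: nobody can ever cancel it
        CancellationToken none = CancellationToken.None;

        using (var cts = new CancellationTokenSource()) {
            // A token backed by a real source: cancellation is possible
            CancellationToken real = cts.Token;
            return (none.CanBeCanceled, none.IsCancellationRequested, real.CanBeCanceled);
        }
    }
}
```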

If you'd like, you can call CancellationToken's Register method to register one or more methods to be invoked when a CancellationTokenSource is canceled. To this method, you pass an Action<Object> delegate, a state value that will be passed to the callback via the delegate, and a Boolean indicating whether or not to invoke the delegate by using the calling thread's SynchronizationContext. If you pass false for the useSynchronizationContext parameter, then the thread that calls Cancel will invoke all the registered methods sequentially. If you pass true for the useSynchronizationContext parameter, then the callbacks are sent (as opposed to posted) to the captured SynchronizationContext object that governs which thread invokes the callback. The SynchronizationContext class is discussed more in the "Applications and Their Threading Models" section in Chapter 28.

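💡Note: If you register a callback method with a CancellationTokenSource that has already been canceled, the thread calling Register invokes the callback method. If you passed true for the useSynchronizationContext parameter, the invocation may go through the calling thread's SynchronizationContext.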
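The following sketch cancels a CancellationTokenSource first and registers a callback afterward. It then checks that the callback ran synchronously, on the registering thread, before Register returned. It uses the Register(Action) overload, which does not use a SynchronizationContext. LateRegistrationDemo is a hypothetical name.

```csharp
using System;
using System.Threading;

// Hypothetical demo class: not part of the FCL
public static class LateRegistrationDemo {
    // Returns true if the late-registered callback ran on the thread that called Register
    public static Boolean Run() {
        var cts = new CancellationTokenSource();
        cts.Cancel(); // Cancel first...

        Int32 callbackThreadId = -1;
        // ...then register: the callback is invoked before Register returns
        cts.Token.Register(() => callbackThreadId = Thread.CurrentThread.ManagedThreadId);

        return callbackThreadId == Thread.CurrentThread.ManagedThreadId;
    }
}
```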

If Register is called multiple times, then multiple callback methods will be invoked. These callback methods could throw an unhandled exception. If you call CancellationTokenSource’s Cancel, passing it true, then the first callback method that throws an unhandled exception stops the other callback methods from executing, and the exception thrown will be thrown from Cancel as well. If you call Cancel passing it false, then all registered callback methods are invoked. Any unhandled exceptions that occur are added to a collection. After all callback methods have executed, if any of them threw an unhandled exception, then Cancel throws an AggregateException with its InnerExceptions property set to the collection of exception objects that were thrown. If no registered callback methods threw an unhandled exception, then Cancel simply returns without throwing any exception at all.

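💡Important: There is no way to correlate an exception object in AggregateException's InnerExceptions collection with a particular operation. You only know that some operation failed, and the exception type tells you what went wrong. To track down where the failure occurred, you have to examine the exception object's StackTrace property and manually scan your source code.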
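The hypothetical CancelExceptionsDemo class makes the two Cancel behaviors concrete. It registers two callbacks that both throw, then cancels once passing false and once passing true. Because the order in which callbacks run is an implementation detail, the sketch checks only the number and type of the exceptions, not which callback threw first.

```csharp
using System;
using System.Threading;

// Hypothetical demo class: not part of the FCL
public static class CancelExceptionsDemo {
    private static CancellationTokenSource CreateWithTwoThrowingCallbacks() {
        var cts = new CancellationTokenSource();
        cts.Token.Register(() => { throw new InvalidOperationException("callback 1"); });
        cts.Token.Register(() => { throw new InvalidOperationException("callback 2"); });
        return cts;
    }

    // Cancel(false): every callback runs, then one AggregateException is thrown
    public static Int32 CancelWithoutThrowOnFirst() {
        var cts = CreateWithTwoThrowingCallbacks();
        try {
            cts.Cancel(false);
            return 0;
        }
        catch (AggregateException ae) {
            return ae.InnerExceptions.Count; // One entry per failed callback
        }
    }

    // Cancel(true): the first failing callback's exception propagates out of Cancel directly
    public static Type CancelWithThrowOnFirst() {
        var cts = CreateWithTwoThrowingCallbacks();
        try {
            cts.Cancel(true);
            return null;
        }
        catch (AggregateException) {
            return typeof(AggregateException);
        }
        catch (Exception ex) {
            return ex.GetType();
        }
    }
}
```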

CancellationToken’s Register method returns a CancellationTokenRegistration, which looks like this.

```csharp
public struct CancellationTokenRegistration :
    IEquatable<CancellationTokenRegistration>, IDisposable {
    public void Dispose();
    // GetHashCode, Equals, operator== and operator!= members are not shown
}
```

You can call Dispose to remove a registered callback from the CancellationTokenSource that it is associated with so that it does not get invoked when calling Cancel. Here is some code that demonstrates registering two callbacks with a single CancellationTokenSource.

```csharp
var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Canceled 1"));
cts.Token.Register(() => Console.WriteLine("Canceled 2"));

// To test, let's just cancel it now and have the 2 callbacks execute
cts.Cancel();
```

When I run this code, I get the following output as soon as the Cancel method is called.

```
Canceled 2
Canceled 1
```
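In the following variation, the code keeps the CancellationTokenRegistration returned by Register and disposes it before canceling, so only the remaining callback runs. Both callbacks would execute on the thread that calls Cancel, so the plain local counter needs no synchronization in this demo. UnregisterDemo is a hypothetical name.

```csharp
using System;
using System.Threading;

// Hypothetical demo class: not part of the FCL
public static class UnregisterDemo {
    // Returns a value showing which callbacks ran: 1 = first only, 10 = second only, 11 = both
    public static Int32 Run() {
        Int32 invoked = 0;
        var cts = new CancellationTokenSource();

        CancellationTokenRegistration first = cts.Token.Register(() => invoked += 1);
        cts.Token.Register(() => invoked += 10);

        first.Dispose(); // Remove the first callback before canceling
        cts.Cancel();    // Only the second callback runs (on this thread)
        return invoked;
    }
}
```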

Finally, you can create a new CancellationTokenSource object by linking a bunch of other CancellationTokenSource objects. This new CancellationTokenSource object will be canceled when any of the linked CancellationTokenSource objects are canceled. The following code demonstrates.

```csharp
// Create a CancellationTokenSource
var cts1 = new CancellationTokenSource();
cts1.Token.Register(() => Console.WriteLine("cts1 canceled"));

// Create another CancellationTokenSource
var cts2 = new CancellationTokenSource();
cts2.Token.Register(() => Console.WriteLine("cts2 canceled"));

// Create a new CancellationTokenSource that is canceled when cts1 or cts2 is canceled
var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cts1.Token, cts2.Token);
linkedCts.Token.Register(() => Console.WriteLine("linkedCts canceled"));

// Cancel one of the CancellationTokenSource objects (I chose cts2)
cts2.Cancel();

// Display which CancellationTokenSource objects are canceled
Console.WriteLine("cts1 canceled={0}, cts2 canceled={1}, linkedCts canceled={2}",
    cts1.IsCancellationRequested, cts2.IsCancellationRequested,
    linkedCts.IsCancellationRequested);
```

When I run the preceding code, I get the following output.

```
linkedCts canceled
cts2 canceled
cts1 canceled=False, cts2 canceled=True, linkedCts canceled=True
```

It is often valuable to cancel an operation after a period of time has elapsed. For example, imagine a server application that starts computing some work based on a client request. But the server application needs to respond to the client within two seconds, no matter what. In some scenarios, it is better to respond in a short period of time with an error or with partially computed results as opposed to waiting a long time for a complete result. Fortunately, CancellationTokenSource gives you a way to have it cancel itself automatically after a period of time. To take advantage of this, you can either construct a CancellationTokenSource object by using one of the constructors that accepts a delay, or you can call CancellationTokenSource's CancelAfter method.

```csharp
public sealed class CancellationTokenSource : IDisposable { // A reference type
    public CancellationTokenSource(Int32 millisecondsDelay);
    public CancellationTokenSource(TimeSpan delay);

    public void CancelAfter(Int32 millisecondsDelay);
    public void CancelAfter(TimeSpan delay);
    ...
}
```
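Here is a sketch of the deadline idea, using a much shorter timeout so it runs quickly. The hypothetical CountUntilTimeout method constructs a CancellationTokenSource with a delay and polls IsCancellationRequested. WaitForCancelAfter instead calls CancelAfter and waits on the token's WaitHandle. The loop body is a stand-in for real compute-bound work, and the class name is hypothetical.

```csharp
using System;
using System.Threading;

// Hypothetical demo class: not part of the FCL
public static class TimeoutDemo {
    // Runs a stand-in work loop until the source cancels itself; returns iterations completed
    public static Int32 CountUntilTimeout(Int32 timeoutMs) {
        using (var cts = new CancellationTokenSource(timeoutMs)) { // Self-cancels after timeoutMs
            Int32 iterations = 0;
            while (!cts.Token.IsCancellationRequested) {
                iterations++;
                Thread.Sleep(10); // Simulate a slice of compute-bound work
            }
            return iterations;
        }
    }

    // Schedules cancellation with CancelAfter; returns true if it fired within 5 seconds
    public static Boolean WaitForCancelAfter(Int32 delayMs) {
        using (var cts = new CancellationTokenSource()) {
            cts.CancelAfter(delayMs);
            // The WaitHandle is signaled when the source is canceled
            return cts.Token.WaitHandle.WaitOne(5000);
        }
    }
}
```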

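💡Summary: The Microsoft .NET Framework offers a standard pattern for canceling operations. The pattern is cooperative, meaning that the operation you want to cancel must explicitly support being canceled. In other words, both the code performing the operation and the code attempting to cancel it must use the types mentioned in this section. To cancel an operation, you first create a CancellationTokenSource object, which contains all the state having to do with managing cancellation. After constructing a CancellationTokenSource (a reference type), you can obtain one or more CancellationToken (a value type) instances from its Token property. You then pass them to your operations so that they can be canceled. A CancellationToken instance is a lightweight value type containing a single private field: a reference to its CancellationTokenSource object. A compute-bound operation's loop can periodically query CancellationToken's IsCancellationRequested property to learn whether the loop should terminate early, thereby ending the compute-bound operation. If you like, you can call CancellationToken's Register method to register one or more methods to be invoked when a CancellationTokenSource is canceled. You pass it three arguments:

- an Action<Object> delegate;
- a state value to be passed to the callback via the delegate;
- a Boolean (named useSynchronizationContext) indicating whether to invoke the delegate by using the calling thread's SynchronizationContext.

If you pass false for useSynchronizationContext, the thread that calls Cancel invokes all the registered methods sequentially. If you pass true, the callbacks are sent (as opposed to posted) to the captured SynchronizationContext object, which determines which thread invokes each callback. If Register is called multiple times, multiple callback methods are invoked, and they may throw unhandled exceptions. If you call CancellationTokenSource's Cancel method passing true, the first callback method that throws an unhandled exception stops the other callback methods from executing, and the exception is thrown from Cancel as well. If you call Cancel passing false, all registered callback methods are invoked, and any unhandled exceptions are added to a collection. After all callback methods have executed, if any of them threw an unhandled exception, Cancel throws an AggregateException whose InnerExceptions property is set to the collection of exception objects that were thrown. If no registered callback method threw an unhandled exception, Cancel simply returns without throwing. CancellationToken's Register method returns a CancellationTokenRegistration. You can call its Dispose method to remove the registered callback from the associated CancellationTokenSource, so that the callback is not invoked when Cancel is called. You can also create a new CancellationTokenSource object by linking a group of other CancellationTokenSource objects; the new object is canceled when any of the linked objects is canceled. Finally, CancellationTokenSource offers a mechanism to cancel itself automatically after a specified time. To use it, either construct a CancellationTokenSource object with one of the constructors that accepts a delay, or call CancellationTokenSource's CancelAfter method.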

# Tasks

Calling ThreadPool’s QueueUserWorkItem method to initiate an asynchronous compute-bound operation is very simple. However, this technique has many limitations. The biggest problem is that there is no built-in way for you to know when the operation has completed, and there is no way to get a return value back when the operation completes. To address these limitations and more, Microsoft introduced the concept of tasks, and you use them via types in the System.Threading.Tasks namespace.

So, instead of calling ThreadPool’s QueueUserWorkItem method, you can do the same via tasks.

```csharp
ThreadPool.QueueUserWorkItem(ComputeBoundOp, 5); // Calling QueueUserWorkItem
new Task(ComputeBoundOp, 5).Start();             // Equivalent of preceding using Task
Task.Run(() => ComputeBoundOp(5));               // Another equivalent
```

In the second line of preceding code, I am creating the Task object and then immediately calling Start to schedule the task to run. Naturally, you can create the Task object and then call Start on it later. You could imagine code that creates a Task object and then passes it to some other method that decides when to call Start to schedule the task. Because it is common to create a Task object and then immediately call Start on it, you can call Task’s convenient static Run method as shown on the last line of the preceding code.
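The difference between constructing a Task and starting it shows up in its Status property. In this sketch, a Task<Int32> stays in the Created state until Start is called, and then runs on the default TaskScheduler. TaskStartDemo is a hypothetical name.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo class: not part of the FCL
public static class TaskStartDemo {
    public static (TaskStatus BeforeStart, TaskStatus AfterWait, Int32 Result) Run() {
        // Constructing a Task does not schedule it
        var t = new Task<Int32>(() => 6 * 7);
        TaskStatus before = t.Status; // TaskStatus.Created

        t.Start(); // Schedule it on the default (thread pool) TaskScheduler
        t.Wait();  // Blocking is used here only to inspect the final state
        return (before, t.Status, t.Result);
    }
}
```

A task returned by Task.Run is never observed in the Created state, because Run schedules it before returning.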

When creating a Task, you call a constructor, passing it an Action or an Action<Object> delegate that indicates the operation that you want performed. If you pass a method that expects an Object, then you must also pass to Task's constructor the argument that you ultimately want passed to the operation. When calling Run, you can pass it an Action or Func<TResult> delegate indicating the operation you want performed. When calling a constructor or when calling Run, you can optionally pass a CancellationToken, which allows the Task to be canceled before it has been scheduled (see the "Canceling a Task" section later in this chapter).

You can also optionally pass to the constructor some TaskCreationOptions flags that control how the Task executes. TaskCreationOptions is an enumerated type defining a set of flags that you can bitwise-OR together. It is defined as follows.

```csharp
[Flags, Serializable]
public enum TaskCreationOptions {
    None = 0x0000, // The default

    // Hints to the TaskScheduler that you want this task to run sooner than later.
    PreferFairness = 0x0001,

    // Hints to the TaskScheduler that it should more aggressively create thread pool threads.
    LongRunning = 0x0002,

    // Always honored: Associates a Task with its parent Task (discussed shortly)
    AttachedToParent = 0x0004,

    // If a task attempts to attach to this parent task, it is a normal task, not a child task.
    DenyChildAttach = 0x0008,

    // Forces child tasks to use the default scheduler as opposed to the parent's scheduler.
    HideScheduler = 0x0010
}
```

Some of these flags are hints that may or may not be honored by the TaskScheduler that is being used to schedule a Task; the AttachedToParent, DenyChildAttach, and HideScheduler flags are always honored, because they have nothing to do with the TaskScheduler itself. TaskScheduler objects are discussed later in the “Task Schedulers” section.
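The always-honored AttachedToParent flag can be illustrated as follows. The sketch starts a parent with Task.Factory.StartNew and, inside it, an attached child that sleeps briefly. The parent does not complete until its attached child does. The code uses Task.Factory.StartNew instead of Task.Run because Task.Run creates its tasks with DenyChildAttach, which would turn the child into an ordinary, unattached task. ChildTaskDemo is a hypothetical name.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class: not part of the FCL
public static class ChildTaskDemo {
    // Returns true if the attached child had completed by the time the parent's Wait returned
    public static Boolean Run() {
        Task child = null;
        Task parent = Task.Factory.StartNew(() => {
            // AttachedToParent: the parent's completion now waits for this child
            child = Task.Factory.StartNew(() => Thread.Sleep(200),
                TaskCreationOptions.AttachedToParent);
        });

        parent.Wait(); // Returns only after the attached child has completed
        return child.IsCompleted;
    }
}
```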

# Waiting for a Task to Complete and Getting Its Result

With tasks, it is also possible to wait for them to complete and then get their result. Let’s say that we have a Sum method that is computationally intensive if n is a large value.

```csharp
private static Int32 Sum(Int32 n) {
    Int32 sum = 0;
    for (; n > 0; n--)
        checked { sum += n; } // if n is large, this will throw System.OverflowException
    return sum;
}
```

We can now construct a Task<TResult> object (which is derived from Task), and we pass for the generic TResult argument the compute-bound operation's return type. Now, after starting the task, we can wait for it to complete and then get its result by using the following code.

```csharp
// Create a Task (it does not start running now)
Task<Int32> t = new Task<Int32>(n => Sum((Int32)n), 1000000000);

// You can start the task sometime later
t.Start();

// Optionally, you can explicitly wait for the task to complete
t.Wait(); // FYI: Overloads exist accepting timeout/CancellationToken

// You can get the result (the Result property internally calls Wait)
Console.WriteLine("The Sum is: " + t.Result); // An Int32 value
```

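💡Important: When a thread calls the Wait method, the system checks whether the Task that the thread is waiting for has started executing. If it has, the thread calling Wait blocks until the Task has completed. If it has not started yet, the system may (depending on the TaskScheduler) execute the Task by using the thread that called Wait. In that case, the thread calling Wait does not block; it executes the Task and returns immediately. This is good, because no thread has blocked. Resource usage goes down, because no thread needs to be created to replace the blocked one. Performance improves, because no time is spent creating a thread and there is no context switch. But it can also be bad. Suppose the thread acquired a thread synchronization lock before calling Wait, and the Task then tries to acquire the same lock. The result is a deadlocked thread!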

If the compute-bound task throws an unhandled exception, the exception will be swallowed, stored in a collection, and the thread pool thread is allowed to return to the thread pool. When the Wait method or the Result property is invoked, these members will throw a System.AggregateException object.

The AggregateException type is used to encapsulate a collection of exception objects (which can happen if a parent task spawns multiple child tasks that throw exceptions). It contains an InnerExceptions property that returns a ReadOnlyCollection<Exception> object. Do not confuse the InnerExceptions property with the InnerException property, which the AggregateException class inherits from the System.Exception base class. For the preceding example, element 0 of AggregateException's InnerExceptions property would refer to the actual System.OverflowException object thrown by the compute-bound method (Sum).

As a convenience, AggregateException overrides Exception's GetBaseException method. AggregateException's implementation returns the innermost exception that is the root cause of the problem (assuming that each level of the hierarchy contains just one inner exception). AggregateException also offers a Flatten method that creates a new AggregateException, whose InnerExceptions property contains a list of exceptions produced by walking the original AggregateException's inner exception hierarchy. Finally, AggregateException also provides a Handle method that invokes a callback method for each exception contained in the AggregateException. The callback decides how to handle each exception, returning true to consider the exception handled and false if not. If at least one exception remains unhandled after Handle runs, a new AggregateException containing just the unhandled exceptions is created and thrown. Later in this chapter, I show examples using the Flatten and Handle methods.
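The following sketch runs the earlier Sum method with an argument large enough to overflow. It then inspects the resulting AggregateException with Flatten, Handle, and GetBaseException. AggregateDemo is a hypothetical name.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo class: not part of the FCL
public static class AggregateDemo {
    private static Int32 Sum(Int32 n) {
        Int32 sum = 0;
        for (; n > 0; n--)
            checked { sum += n; } // Throws OverflowException once sum exceeds Int32.MaxValue
        return sum;
    }

    // Returns the type of the root-cause exception reported for a faulted Sum task
    public static Type RootCauseOfOverflow() {
        Task<Int32> t = Task.Run(() => Sum(1000000000));
        try {
            Console.WriteLine("The Sum is: " + t.Result); // Result throws AggregateException
            return null;
        }
        catch (AggregateException ae) {
            // Flatten merges any nested AggregateExceptions into a single level
            AggregateException flat = ae.Flatten();

            // Treat OverflowException as handled; anything else would be rethrown
            // in a new AggregateException
            flat.Handle(e => e is OverflowException);

            // With a single inner exception, GetBaseException returns that exception
            return ae.GetBaseException().GetType();
        }
    }
}
```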

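💡Important: If you never call Wait or Result, and never query a Task's Exception property, your code never observes that this exception occurred. This is not good, because your program has experienced an unexpected problem and you are unaware of it. To help you detect unobserved exceptions, you can register a callback method with TaskScheduler's static UnobservedTaskException event. Whenever a Task with an unobserved exception is garbage collected, the CLR's finalizer thread raises this event. Your event handler is passed an UnobservedTaskExceptionEventArgs object containing the unobserved AggregateException.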

In addition to waiting for a single task, the Task class also offers two static methods that allow a thread to wait on an array of Task objects. Task’s static WaitAny method blocks the calling thread until any of the Task objects in the array have completed. This method returns an Int32 index into the array indicating which Task object completed, causing the thread to wake and continue running. The method returns -1 if the timeout occurs and throws an OperationCanceledException if WaitAny is canceled via a CancellationToken.

Similarly, the Task class has a static WaitAll method that blocks the calling thread until all the Task objects in the array have completed. The WaitAll method returns true if all the Task objects complete and false if a timeout occurs; an OperationCanceledException is thrown if WaitAll is canceled via a CancellationToken.
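The sketch below exercises WaitAny and WaitAll with one slow task (Task.Delay) and one fast task. WaitAny returns the index of the fast task, and WaitAll with a short timeout returns false. WaitAll with an already-canceled CancellationToken throws OperationCanceledException. WaitManyDemo is a hypothetical name, and the delays are arbitrary values chosen for the demo.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class: not part of the FCL
public static class WaitManyDemo {
    public static (Int32 AnyIndex, Boolean AllWithinTimeout, Boolean CanceledWait) Run() {
        var slowCts = new CancellationTokenSource();
        Task[] tasks = {
            Task.Delay(5000, slowCts.Token),  // index 0: slow
            Task.Run(() => Thread.Sleep(50))  // index 1: fast
        };

        // Blocks until any task completes; returns its index (-1 only on timeout overloads)
        Int32 anyIndex = Task.WaitAny(tasks);

        // Returns false because the slow task is still running after 100 ms
        Boolean allWithinTimeout = Task.WaitAll(tasks, 100);

        // An already-canceled token makes WaitAll throw OperationCanceledException
        Boolean canceledWait = false;
        var waitCts = new CancellationTokenSource();
        waitCts.Cancel();
        try { Task.WaitAll(tasks, waitCts.Token); }
        catch (OperationCanceledException) { canceledWait = true; }

        slowCts.Cancel(); // Clean up the slow Delay task
        return (anyIndex, allWithinTimeout, canceledWait);
    }
}
```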

# Canceling a Task

Of course, you can use a CancellationTokenSource to cancel a Task. First, we must revise our Sum method so that it accepts a CancellationToken.

```csharp
private static Int32 Sum(CancellationToken ct, Int32 n) {
    Int32 sum = 0;
    for (; n > 0; n--) {
        // The following line throws OperationCanceledException when Cancel
        // is called on the CancellationTokenSource referred to by the token
        ct.ThrowIfCancellationRequested();

        checked { sum += n; } // if n is large, this will throw System.OverflowException
    }
    return sum;
}
```

In this code, the compute-bound operation's loop periodically checks to see if the operation has been canceled by calling CancellationToken's ThrowIfCancellationRequested method. This method is similar to CancellationToken's IsCancellationRequested property shown earlier in the "Cooperative Cancellation and Timeout" section. However, ThrowIfCancellationRequested throws an OperationCanceledException if the CancellationTokenSource has been canceled. The reason for throwing an exception is that, unlike work items initiated with ThreadPool's QueueUserWorkItem method, tasks have the notion of having completed, and a task can even return a value. So, there needs to be a way to distinguish a completed task from a canceled task, and having the task throw an exception lets you know that the task did not run all the way to completion.

Now, we will create the CancellationTokenSource and Task objects as follows.

```csharp
CancellationTokenSource cts = new CancellationTokenSource();
Task<Int32> t = Task.Run(() => Sum(cts.Token, 1000000000), cts.Token);

// Sometime later, cancel the CancellationTokenSource to cancel the Task
cts.Cancel(); // This is an asynchronous request, the Task may have completed already

try {
    // If the task got canceled, Result will throw an AggregateException
    Console.WriteLine("The sum is: " + t.Result); // An Int32 value
}
catch (AggregateException x) {
    // Consider any OperationCanceledException objects as handled.
    // Any other exceptions cause a new AggregateException containing
    // only the unhandled exceptions to be thrown
    x.Handle(e => e is OperationCanceledException);

    // If all the exceptions were handled, the following executes
    Console.WriteLine("Sum was canceled");
}
```

When creating a Task, you can associate a CancellationToken with it by passing it to Task’s constructor or, as in the preceding code, to Task.Run. If the CancellationToken gets canceled before the Task is scheduled, the Task gets canceled and never executes at all. But if the Task has already been scheduled (by calling the Start method, or automatically when you use Task.Run), then the Task’s code must explicitly support cancellation for its operation to be cancelable while it executes. Unfortunately, while a Task object has a CancellationToken associated with it, there is no way to access it, so you must somehow get the same CancellationToken that was used to create the Task object into the Task’s code itself. The easiest way to write this code is to use a lambda expression and “pass” the CancellationToken as a closure variable (as I’ve done in the previous code example).
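Here is a minimal sketch of both cancellation paths. It assumes .NET 4.5+ and C# 7 (for the tuple return), and TaskCancellationDemo is a hypothetical name. It also shows a consequence of the rules described above. If the delegate throws OperationCanceledException for a token that was *not* also passed to Task.Run, the task ends up Faulted, not Canceled.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class illustrating the two cancellation paths.
public static class TaskCancellationDemo {
    // The token is canceled before Task.Run schedules the task, so the delegate never runs.
    public static (TaskStatus Status, Boolean Ran) CanceledBeforeScheduling() {
        var cts = new CancellationTokenSource();
        cts.Cancel();
        Boolean ran = false;
        Task t = Task.Run(() => { ran = true; }, cts.Token);
        try { t.Wait(); } catch (AggregateException) { /* holds a TaskCanceledException */ }
        return (t.Status, ran);
    }

    // The delegate observes cancellation itself via ThrowIfCancellationRequested.
    public static TaskStatus CanceledWhileRunning(Boolean passTokenToTaskRun) {
        var cts = new CancellationTokenSource();
        Action body = () => {
            cts.Cancel();                              // simulate a Cancel arriving mid-operation
            cts.Token.ThrowIfCancellationRequested();  // throws OperationCanceledException
        };
        // Only when the same token was given to Task.Run does the task end up Canceled.
        Task t = passTokenToTaskRun ? Task.Run(body, cts.Token) : Task.Run(body);
        try { t.Wait(); } catch (AggregateException) { }
        return t.Status;
    }
}
```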

# Starting a New Task Automatically When Another Task Completes

In order to write scalable software, you must not have your threads block. This means that calling Wait or querying a task’s Result property when the task has not yet finished running will most likely cause the thread pool to create a new thread, which increases resource usage and hurts performance. Fortunately, there is a better way to find out when a task has completed running. When a task completes, it can start another task. Here is a rewrite of the earlier code that doesn’t block any threads.

```csharp
// Create and start a Task, continue with another task
Task<Int32> t = Task.Run(() => Sum(CancellationToken.None, 10000));

// ContinueWith returns a Task but you usually don't care
Task cwt = t.ContinueWith(task => Console.WriteLine("The sum is: " + task.Result));
```

Now, when the task executing Sum completes, this task will start another task (also on some thread pool thread) that displays the result. The thread that executes the preceding code does not block waiting for either of these two tasks to complete; the thread is allowed to execute other code or, if it is a thread pool thread itself, it can return to the pool to perform other operations. Note that the task executing Sum could complete before ContinueWith is called. This will not be a problem because the ContinueWith method will see that the Sum task is complete and it will immediately start the task that displays the result.

Also, note that ContinueWith returns a reference to a new Task object (which my code placed in the cwt variable). Of course, you can invoke various members (like Wait, Result, or even ContinueWith) using this Task object, but usually you will ignore this Task object and will not save a reference to it in a variable.
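The following sketch (ContinueWithDemo is a hypothetical name; assumes .NET 4.5+) illustrates two points made here. First, ContinueWith works even when the antecedent has already completed. Second, a single Task can have several continuations. Task.FromResult is used only to get an already-completed task. The final Result calls block only so the values can be returned; real code would keep chaining instead of waiting.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo class.
public static class ContinueWithDemo {
    public static Int32[] RunContinuations() {
        // Task.FromResult returns a task that has already run to completion,
        // so both ContinueWith calls happen after the antecedent finished.
        Task<Int32> antecedent = Task.FromResult(21);

        // One antecedent, two independent continuations.
        Task<Int32> doubled = antecedent.ContinueWith(t => t.Result * 2);
        Task<Int32> negated = antecedent.ContinueWith(t => -t.Result);

        return new[] { doubled.Result, negated.Result };
    }
}
```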

I should also mention that Task objects internally contain a collection of ContinueWith tasks. So you can actually call ContinueWith several times using a single Task object. When the task completes, all the ContinueWith tasks will be queued to the thread pool. In addition, when calling ContinueWith, you can specify a bitwise OR’d set of TaskContinuationOptions. The first six flags—None, PreferFairness, LongRunning, AttachedToParent, DenyChildAttach, and HideScheduler—are identical to the flags offered by the TaskCreationOptions enumerated type shown earlier. Here is what the TaskContinuationOptions type looks like.

```csharp
[Flags, Serializable]
public enum TaskContinuationOptions {
    None                  = 0x0000, // The default

    // Hints to the TaskScheduler that you want this task to run sooner than later.
    PreferFairness        = 0x0001,

    // Hints to the TaskScheduler that it should more aggressively create thread pool threads.
    LongRunning           = 0x0002,

    // Always honored: Associates a Task with its parent Task (discussed shortly)
    AttachedToParent      = 0x0004,

    // If a task attempts to attach to this parent task, an InvalidOperationException is thrown.
    DenyChildAttach       = 0x0008,

    // Forces child tasks to use the default scheduler as opposed to the parent’s scheduler.
    HideScheduler         = 0x0010,

    // Prevents completion of the continuation until the antecedent has completed.
    LazyCancellation      = 0x0020,

    // This flag indicates that you want the thread that executed the first task to also
    // execute the ContinueWith task. If the first task has already completed, then the
    // thread calling ContinueWith will execute the ContinueWith task.
    ExecuteSynchronously  = 0x80000,

    // These flags indicate under what circumstances to run the ContinueWith task
    NotOnRanToCompletion  = 0x10000,
    NotOnFaulted          = 0x20000,
    NotOnCanceled         = 0x40000,

    // These flags are convenient combinations of the above three flags
    OnlyOnCanceled        = NotOnRanToCompletion | NotOnFaulted,
    OnlyOnFaulted         = NotOnRanToCompletion | NotOnCanceled,
    OnlyOnRanToCompletion = NotOnFaulted | NotOnCanceled,
}
```

When you call ContinueWith, you can indicate that you want the new task to execute only if the first task is canceled by specifying the TaskContinuationOptions.OnlyOnCanceled flag. Similarly, you can have the new task execute only if the first task throws an unhandled exception by using the TaskContinuationOptions.OnlyOnFaulted flag. And, of course, you can use the TaskContinuationOptions.OnlyOnRanToCompletion flag to have the new task execute only if the first task runs all the way to completion without being canceled or throwing an unhandled exception. By default, if you do not specify any of these flags, then the new task will run regardless of how the first task completes. When a Task completes, any of its continue-with tasks that do not run are automatically canceled. Here is an example that puts all of this together.

```csharp
// Create and start a Task, continue with multiple other tasks
Task<Int32> t = Task.Run(() => Sum(10000));

// Each ContinueWith returns a Task but you usually don't care
t.ContinueWith(task => Console.WriteLine("The sum is: " + task.Result),
    TaskContinuationOptions.OnlyOnRanToCompletion);
t.ContinueWith(task => Console.WriteLine("Sum threw: " + task.Exception.InnerException),
    TaskContinuationOptions.OnlyOnFaulted);
t.ContinueWith(task => Console.WriteLine("Sum was canceled"),
    TaskContinuationOptions.OnlyOnCanceled);
```
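This sketch shows the automatic cancellation described above (hypothetical names; assumes .NET 4.5+ and C# 7). It attaches an OnlyOnRanToCompletion continuation and an OnlyOnFaulted continuation to a task that succeeds, then inspects both continuation tasks. The one whose condition was not met ends in the Canceled state.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo class.
public static class ContinuationOptionsDemo {
    public static (TaskStatus OnSuccess, TaskStatus OnFault) StatusesAfterSuccess() {
        Task<Int32> t = Task.Run(() => 42);   // this task runs to completion

        Task onSuccess = t.ContinueWith(task => Console.WriteLine("Result: " + task.Result),
            TaskContinuationOptions.OnlyOnRanToCompletion);
        Task onFault = t.ContinueWith(task => Console.WriteLine("Faulted: " + task.Exception),
            TaskContinuationOptions.OnlyOnFaulted);

        // onFault never runs and is canceled, so WaitAll throws after both finish.
        try { Task.WaitAll(onSuccess, onFault); }
        catch (AggregateException) { }

        return (onSuccess.Status, onFault.Status);
    }
}
```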

# A Task May Start Child Tasks

Finally, tasks support parent/child relationships, as demonstrated by the following code.

```csharp
Task<Int32[]> parent = new Task<Int32[]>(() => {
    var results = new Int32[3]; // Create an array for the results

    // This task creates and starts 3 child tasks
    new Task(() => results[0] = Sum(10000), TaskCreationOptions.AttachedToParent).Start();
    new Task(() => results[1] = Sum(20000), TaskCreationOptions.AttachedToParent).Start();
    new Task(() => results[2] = Sum(30000), TaskCreationOptions.AttachedToParent).Start();

    // Returns a reference to the array (even though the elements may not be initialized yet)
    return results;
});

// When the parent and its children have run to completion, display the results
var cwt = parent.ContinueWith(
    parentTask => Array.ForEach(parentTask.Result, Console.WriteLine));

// Start the parent Task so it can start its children
parent.Start();
```

Here, the parent task creates and starts three Task objects. By default, Task objects created by another task are top-level tasks that have no relationship to the task that creates them. However, the TaskCreationOptions.AttachedToParent flag associates a Task with the Task that creates it so that the creating task is not considered finished until all its children (and grandchildren) have finished running. When creating a Task by calling the ContinueWith method, you can make the continue-with task a child by specifying the TaskContinuationOptions.AttachedToParent flag.
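One way to observe the difference is to look at the parent's Status while a child is still running. The following sketch (hypothetical names; assumes .NET 4.5+) creates the parent with the Task constructor, as the example above does. Task.Run is avoided here because it creates tasks with TaskCreationOptions.DenyChildAttach.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class.
public static class ChildTaskDemo {
    // Reports the parent's status after its own delegate has returned
    // but while its child is still blocked.
    public static TaskStatus ParentStatusWhileChildRuns(TaskCreationOptions childOptions) {
        var releaseChild = new ManualResetEventSlim(false);
        var parent = new Task(() => {
            // The child blocks until the demo releases it.
            new Task(() => releaseChild.Wait(), childOptions).Start();
        });
        parent.Start();

        // Wait until the parent's delegate has finished.
        SpinWait.SpinUntil(() => parent.IsCompleted ||
                                 parent.Status == TaskStatus.WaitingForChildrenToComplete);
        TaskStatus observed = parent.Status;

        releaseChild.Set();
        parent.Wait();
        return observed;
    }
}
```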

# Inside a Task

Each Task object has a set of fields that make up the task’s state. There is an Int32 ID (see Task’s read-only Id property), an Int32 representing the execution state of the Task, a reference to the parent task, a reference to the TaskScheduler specified when the Task was created, a reference to the callback method, a reference to the object that is to be passed to the callback method (queryable via Task’s read-only AsyncState property), a reference to an ExecutionContext, and a reference to a ManualResetEventSlim object. In addition, each Task object has a reference to some supplementary state that is created on demand. The supplementary state includes a CancellationToken, a collection of ContinueWithTask objects, a collection of Task objects for child tasks that have thrown unhandled exceptions, and more. My point is that although tasks provide you a lot of features, there is some cost to tasks because memory must be allocated for all this state. If you don’t need the additional features offered by tasks, then your program will use resources more efficiently if you use ThreadPool.QueueUserWorkItem.

The Task and Task<TResult> classes implement the IDisposable interface, allowing you to call Dispose when you are done with the Task object. Today, all the Dispose method does is close the ManualResetEventSlim object. However, it is possible to define classes derived from Task and Task<TResult>, and these classes could allocate their own resources, which would be freed in their override of the Dispose method. I recommend that developers not explicitly call Dispose on a Task object in their code; instead, just let the garbage collector clean up any resources when it determines that they are no longer in use.

You’ll notice that each Task object contains an Int32 field representing a Task’s unique ID. When you create a Task object, the field is initialized to zero. Then the first time you query Task’s read-only Id property, the property assigns a unique Int32 value to this field and returns it from the property. Task IDs start at 1 and increment by 1 as each ID is assigned. Just looking at a Task object in the Microsoft Visual Studio debugger will cause the debugger to display the Task’s ID, forcing the Task to be assigned an ID.

The idea behind the ID is that each Task can be identified by a unique value. In fact, Visual Studio shows you these task IDs in its Parallel Tasks and Parallel Stacks windows. But because you don’t assign the IDs yourself in your code, it is practically impossible to correlate an ID number with what your code is doing. While running a task’s code, you can query Task’s static CurrentId property, which returns a nullable Int32 (Int32?). You can also call this from Visual Studio’s Watch window or Immediate window while debugging to get the ID for the code that you are currently stepping through. Then you can find your task in the Parallel Tasks/Stacks windows. If you query the CurrentId property while a task is not executing, it returns null.
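The following sketch demonstrates the behaviors just described (hypothetical names; assumes .NET 4.5+). Inside a running task, CurrentId matches the task's Id. On a plain thread that is not running a task, CurrentId is null. IDs are distinct positive values assigned on first query.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class.
public static class TaskIdDemo {
    // Inside a running task, Task.CurrentId equals that task's Id.
    public static Boolean CurrentIdMatchesId() {
        var t = new Task<Int32?>(() => Task.CurrentId);
        t.Start();
        return t.Result == t.Id;
    }

    // On a plain thread that is not executing a task, CurrentId is null.
    public static Boolean CurrentIdIsNullOutsideTasks() {
        Int32? observed = -1;
        var thread = new Thread(() => observed = Task.CurrentId);
        thread.Start();
        thread.Join();
        return observed == null;
    }

    // Querying Id assigns a unique, positive value to each task.
    public static Boolean IdsAreDistinct() {
        var a = new Task(() => { });
        var b = new Task(() => { });
        return a.Id != b.Id && a.Id > 0 && b.Id > 0;
    }
}
```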

During a Task object’s existence, you can learn where it is in its lifecycle by querying Task’s read-only Status property. This property returns a TaskStatus value that is defined as follows.

```csharp
public enum TaskStatus {
    // These values indicate the state of a Task during its lifetime:
    Created,              // Task created explicitly; you can manually Start() this task
    WaitingForActivation, // Task created implicitly; it starts automatically
    WaitingToRun,         // The task was scheduled but isn’t running yet
    Running,              // The task is actually running

    // The task is waiting for children to complete before it considers itself complete
    WaitingForChildrenToComplete,

    // A task's final state is one of these:
    RanToCompletion,
    Canceled,
    Faulted
}
```

When you first construct a Task object, its status is Created. Later, when the task is started, its status changes to WaitingToRun. When the Task is actually running on a thread, its status changes to Running. When the task stops running and is waiting for any child tasks, the status changes to WaitingForChildrenToComplete. When a task is completely finished, it enters one of three final states: RanToCompletion, Canceled, or Faulted. When a Task<TResult> runs to completion, you can query the task’s result via Task<TResult>’s Result property. When a Task or Task<TResult> faults, you can obtain the unhandled exception that the task threw by querying Task’s Exception property, which always returns an AggregateException object whose InnerExceptions collection contains the set of unhandled exceptions.

For convenience, Task offers several read-only, Boolean properties: IsCanceled, IsFaulted, and IsCompleted. Note that IsCompleted returns true when the Task is in the RanToCompletion, Canceled, or Faulted state. The easiest way to determine if a Task completed successfully is to use code like the following.

```csharp
if (task.Status == TaskStatus.RanToCompletion) ...
```
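The sketch below drives tasks into each state discussed above (hypothetical names; assumes .NET 4.5+). It also checks that a faulted task's Exception property wraps the original exception in an AggregateException and that IsCompleted is true for a faulted task.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class.
public static class TaskStatusDemo {
    public static TaskStatus[] ObserveStates() {
        var notStarted = new Task(() => { });            // constructed but never started

        Task ok = Task.Run(() => { });
        Task faulted = Task.Run(new Action(() => { throw new InvalidOperationException(); }));
        var cts = new CancellationTokenSource();
        cts.Cancel();
        Task canceled = Task.Run(() => { }, cts.Token);  // canceled before it could run

        try { Task.WaitAll(ok, faulted, canceled); }
        catch (AggregateException) { /* expected: one faulted and one canceled task */ }

        return new[] { notStarted.Status, ok.Status, faulted.Status, canceled.Status };
    }

    // A faulted task's Exception is an AggregateException wrapping the original exception.
    public static Boolean FaultIsWrapped() {
        Task faulted = Task.Run(new Action(() => { throw new InvalidOperationException(); }));
        try { faulted.Wait(); } catch (AggregateException) { }
        return faulted.IsFaulted && faulted.IsCompleted
            && faulted.Exception.InnerException is InvalidOperationException;
    }
}
```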

A Task object is in the WaitingForActivation state if that Task is created by calling one of these functions: ContinueWith, ContinueWhenAll, ContinueWhenAny, or FromAsync. A Task created by constructing a TaskCompletionSource object is also created in the WaitingForActivation state. This state means that the Task’s scheduling is controlled by the task infrastructure. For example, you cannot explicitly start a Task object that was created by calling ContinueWith. This Task will start automatically when its antecedent task has finished executing.
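The following sketch shows this behavior. It assumes .NET 4.5+ and C# 7, and the names are hypothetical. A continuation whose antecedent is still running reports WaitingForActivation, and calling Start on it throws InvalidOperationException. The Task exposed by a TaskCompletionSource<TResult> reports the same state.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class.
public static class WaitingForActivationDemo {
    public static (TaskStatus Status, Boolean StartThrew) InspectContinuation() {
        var gate = new ManualResetEventSlim(false);
        Task antecedent = Task.Run(() => gate.Wait());  // held open so the continuation can't run yet
        Task continuation = antecedent.ContinueWith(_ => { });

        TaskStatus status = continuation.Status;        // scheduling is controlled by the infrastructure
        Boolean startThrew = false;
        try { continuation.Start(); }                   // not allowed on a continuation task
        catch (InvalidOperationException) { startThrew = true; }

        gate.Set();
        continuation.Wait();
        return (status, startThrew);
    }

    public static TaskStatus PromiseStatus() => new TaskCompletionSource<Int32>().Task.Status;
}
```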

# Task Factories

Occasionally, you might want to create a bunch of Task objects that share the same configuration. To keep you from having to pass the same parameters to each Task’s constructor over and over again, you can create a task factory that encapsulates the common configuration. The System.Threading.Tasks namespace defines a TaskFactory type as well as a TaskFactory<TResult> type. Both of these types are derived from System.Object; that is, they are peers of each other.

If you want to create a bunch of tasks that return void, then you will construct a TaskFactory. If you want to create a bunch of tasks that have a specific return type, then you will construct a TaskFactory<TResult> where you pass the task’s desired return type for the generic TResult argument. When you create one of these task factory classes, you pass to its constructor the defaults that you want the tasks that the factory creates to have. Specifically, you pass to the task factory the CancellationToken, TaskScheduler, TaskCreationOptions, and TaskContinuationOptions settings that you want factory-created tasks to have.

Here is some sample code demonstrating the use of a TaskFactory.

```csharp
Task parent = new Task(() => {
    var cts = new CancellationTokenSource();
    var tf = new TaskFactory<Int32>(cts.Token, TaskCreationOptions.AttachedToParent,
        TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

    // This task creates and starts 3 child tasks
    var childTasks = new[] {
        tf.StartNew(() => Sum(cts.Token, 10000)),
        tf.StartNew(() => Sum(cts.Token, 20000)),
        tf.StartNew(() => Sum(cts.Token, Int32.MaxValue))  // Too big, throws OverflowException
    };

    // If any of the child tasks throw, cancel the rest of them
    for (Int32 task = 0; task < childTasks.Length; task++)
        childTasks[task].ContinueWith(
            t => cts.Cancel(), TaskContinuationOptions.OnlyOnFaulted);

    // When all children are done, get the maximum value returned from the
    // non-faulting/canceled tasks. Then pass the maximum value to another
    // task that displays the maximum result
    tf.ContinueWhenAll(
        childTasks,
        completedTasks =>
            completedTasks.Where(t => t.Status == TaskStatus.RanToCompletion).Max(t => t.Result),
        CancellationToken.None)
    .ContinueWith(t => Console.WriteLine("The maximum is: " + t.Result),
        TaskContinuationOptions.ExecuteSynchronously);
});

// When the children are done, show any unhandled exceptions too
parent.ContinueWith(p => {
    // I put all this text in a StringBuilder and call Console.WriteLine just once
    // because this task could execute concurrently with the task above & I don't
    // want the tasks' output interspersed
    StringBuilder sb = new StringBuilder(
        "The following exception(s) occurred:" + Environment.NewLine);

    foreach (var e in p.Exception.Flatten().InnerExceptions)
        sb.AppendLine(" " + e.GetType().ToString());

    Console.WriteLine(sb.ToString());
}, TaskContinuationOptions.OnlyOnFaulted);

// Start the parent Task so it can start its children
parent.Start();
```

With this code, I am creating a TaskFactory object that I will use to create three Task objects. I want to configure the child tasks all the same way: each Task object shares the same CancellationTokenSource token, tasks are considered children of their parent, all continue-with tasks created by the TaskFactory execute synchronously, and all Task objects created by this TaskFactory use the default TaskScheduler.

Then I create an array consisting of the three child Task objects, all created by calling TaskFactory’s StartNew method. This method conveniently creates and starts each child task. In a loop, I tell each child task that throws an unhandled exception to cancel all the other child tasks that are still running. Next, using the TaskFactory, I call ContinueWhenAll, which creates a Task that runs when all the child tasks have completed running. Because this task is created with the TaskFactory, it will also be considered a child of the parent task, and it will execute synchronously using the default TaskScheduler. However, I want this task to run even if the other child tasks were canceled, so I override the TaskFactory’s CancellationToken by passing in CancellationToken.None, which prevents this task from being cancelable at all. Finally, when the task that processes all the results is complete, I create another task that displays the highest value returned from all the child tasks.

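💡Note: When calling TaskFactory’s or TaskFactory<TResult>’s ContinueWhenAll and ContinueWhenAny methods, the following TaskContinuationOptions flags are illegal: NotOnRanToCompletion, NotOnFaulted, and NotOnCanceled. Naturally, the convenience flags built from them (OnlyOnCanceled, OnlyOnFaulted, and OnlyOnRanToCompletion) are illegal as well. In other words, ContinueWhenAll and ContinueWhenAny execute the continuation task regardless of how the antecedent tasks completed.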
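Here is a minimal sketch of both halves of this note (assumes .NET 4.5+; names are hypothetical). The exact exception type is an implementation detail. The implementations I am aware of throw ArgumentOutOfRangeException, so the code catches its base class, ArgumentException.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo class.
public static class ContinueWhenAllDemo {
    // An OnlyOn*/NotOn* flag is rejected when the continuation is created.
    public static Boolean RejectsOnlyOnRanToCompletion() {
        Task[] antecedents = { Task.FromResult(1), Task.FromResult(2) };
        try {
            Task.Factory.ContinueWhenAll(antecedents, ts => { },
                TaskContinuationOptions.OnlyOnRanToCompletion);
            return false;
        }
        catch (ArgumentException) {   // ArgumentOutOfRangeException derives from this
            return true;
        }
    }

    // Without such flags, the continuation receives every antecedent.
    public static Int32 SumWhenAllDone() {
        Task<Int32>[] antecedents = { Task.FromResult(1), Task.Run(() => 2) };
        return Task.Factory.ContinueWhenAll(antecedents,
            ts => ts[0].Result + ts[1].Result).Result;
    }
}
```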

# Task Schedulers

The task infrastructure is very flexible, and TaskScheduler objects are a big part of this flexibility. A TaskScheduler object is responsible for executing scheduled tasks and also exposes task information to the Visual Studio debugger. The FCL ships with two TaskScheduler-derived types: the thread pool task scheduler and a synchronization context task scheduler. By default, all applications use the thread pool task scheduler. This task scheduler schedules tasks to the thread pool’s worker threads and is discussed in more detail in this chapter’s “How the Thread Pool Manages Its Threads” section. You can get a reference to the default task scheduler by querying TaskScheduler’s static Default property.
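The following sketch (hypothetical names; assumes .NET 4.5+) passes TaskScheduler.Default explicitly to TaskFactory.StartNew and checks that the task runs on a thread pool thread. It also checks that TaskScheduler.Current is the default scheduler when no task is executing.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class.
public static class DefaultSchedulerDemo {
    public static Boolean RunsOnThreadPool() {
        Task<Boolean> t = Task.Factory.StartNew(
            () => Thread.CurrentThread.IsThreadPoolThread,
            CancellationToken.None, TaskCreationOptions.None, TaskScheduler.Default);
        return t.Result;
    }

    // Outside of any task, TaskScheduler.Current falls back to the default scheduler.
    public static Boolean DefaultIsCurrentOutsideTasks() =>
        TaskScheduler.Current == TaskScheduler.Default;
}
```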

The synchronization context task scheduler is typically used for applications sporting a graphical user interface, such as Windows Forms, Windows Presentation Foundation (WPF), Silverlight, and Windows Store applications. This task scheduler schedules all tasks onto the application’s GUI thread so that all the task code can successfully update UI components like buttons, menu items, and so on. The synchronization context task scheduler does not use the thread pool at all. You can get a reference to a synchronization context task scheduler by calling TaskScheduler’s static FromCurrentSynchronizationContext method.

Here is a simple Windows Forms application that demonstrates the use of the synchronization context task scheduler.

```csharp
internal sealed class MyForm : Form {
    private readonly TaskScheduler m_syncContextTaskScheduler;

    public MyForm() {
        // Get a reference to a synchronization context task scheduler
        m_syncContextTaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();

        Text = "Synchronization Context Task Scheduler Demo";
        Visible = true; Width = 600; Height = 100;
    }

    private CancellationTokenSource m_cts;

    protected override void OnMouseClick(MouseEventArgs e) {
        if (m_cts != null) {    // An operation is in flight, cancel it
            m_cts.Cancel();
            m_cts = null;
        } else {                // An operation is not in flight, start it
            Text = "Operation running";
            m_cts = new CancellationTokenSource();

            // Copy the token into a local: the lambda below runs later on a thread pool
            // thread, and by then another click may have set m_cts to null
            CancellationToken token = m_cts.Token;

            // This task uses the default task scheduler and executes on a thread pool thread
            Task<Int32> t = Task.Run(() => Sum(token, 20000), token);

            // These tasks use the sync context task scheduler and execute on the GUI thread
            t.ContinueWith(task => Text = "Result: " + task.Result,
                CancellationToken.None, TaskContinuationOptions.OnlyOnRanToCompletion,
                m_syncContextTaskScheduler);
            t.ContinueWith(task => Text = "Operation canceled",
                CancellationToken.None, TaskContinuationOptions.OnlyOnCanceled,
                m_syncContextTaskScheduler);
            t.ContinueWith(task => Text = "Operation faulted",
                CancellationToken.None, TaskContinuationOptions.OnlyOnFaulted,
                m_syncContextTaskScheduler);
        }
        base.OnMouseClick(e);
    }
}
```

When you click in the client area of this form, a compute-bound task will start executing on a thread pool thread. This is good because the GUI thread is not blocked during this time and can therefore respond to other UI operations. However, the code executed by the thread pool thread should not attempt to update UI components or else an InvalidOperationException will be thrown.

When the compute-bound task is done, one of the three continue-with tasks will execute. These tasks are all issued against the synchronization context task scheduler corresponding to the GUI thread, and this task scheduler queues the tasks to the GUI thread, allowing the code executed by these tasks to update UI components successfully. All of these tasks update the form’s caption via the inherited Text property.

Because the compute-bound work (Sum) is running on a thread pool thread, the user can interact with the UI to cancel the operation. In my simple code example, I allow the user to cancel the operation by clicking in the form’s client area while an operation is running.
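FromCurrentSynchronizationContext captures whatever SynchronizationContext is current, so calling it on a thread that has none throws InvalidOperationException. The sketch below shows this on a fresh thread (hypothetical names; assumes .NET 4.5+ and C# 7). It then installs the base SynchronizationContext class, which posts work to the thread pool, as a stand-in for the GUI context that a framework such as Windows Forms installs. Finally, it runs a task through the resulting scheduler.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class.
public static class SyncContextSchedulerDemo {
    // Runs on a fresh thread so the result does not depend on the caller's context.
    public static (Boolean ThrewWithoutContext, Int32 ResultWithContext) Probe() {
        Boolean threw = false;
        Int32 result = 0;
        var thread = new Thread(() => {
            try { TaskScheduler.FromCurrentSynchronizationContext(); }
            catch (InvalidOperationException) { threw = true; }  // no context installed yet

            // Stand-in context: the base class posts callbacks to the thread pool.
            SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());
            TaskScheduler scheduler = TaskScheduler.FromCurrentSynchronizationContext();
            result = Task.Factory.StartNew(() => 6 * 7, CancellationToken.None,
                TaskCreationOptions.None, scheduler).Result;
        });
        thread.Start();
        thread.Join();
        return (threw, result);
    }
}
```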

You can, of course, define your own class derived from TaskScheduler if you have special task scheduling needs. Microsoft has provided a bunch of sample code for tasks and includes the source code for a bunch of task schedulers in the Parallel Extensions Extras package, which can be downloaded from here: http://code.msdn.microsoft.com/ParExtSamples. Here are some of the task schedulers included in this package:

  • IOTaskScheduler This task scheduler queues tasks to the thread pool’s I/O threads instead of its worker threads.

  • LimitedConcurrencyLevelTaskScheduler This task scheduler allows no more than n (a constructor parameter) tasks to execute simultaneously.

  • OrderedTaskScheduler This task scheduler allows only one task to execute at a time. This class is derived from LimitedConcurrencyLevelTaskScheduler and just passes 1 for n.

  • PrioritizingTaskScheduler This task scheduler queues tasks to the CLR’s thread pool. After this has occurred, you can call Prioritize to indicate that a Task should be processed before all normal tasks (if it hasn’t been processed already). You can call Deprioritize to make a Task be processed after all normal tasks.

  • ThreadPerTaskScheduler This task scheduler creates and starts a separate thread for each task; it does not use the thread pool at all.
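The following minimal sketch, in the spirit of ThreadPerTaskScheduler, shows what deriving from TaskScheduler involves. It is not the Parallel Extensions Extras source, and DedicatedThreadScheduler is a hypothetical name. Unlike that sample, this sketch refuses inline execution, so every task visibly runs on its own named thread.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical scheduler: one dedicated background thread per task, no thread pool.
public sealed class DedicatedThreadScheduler : TaskScheduler {
    protected override void QueueTask(Task task) {
        var thread = new Thread(() => TryExecuteTask(task)) {
            IsBackground = true,
            Name = "DedicatedThreadScheduler"
        };
        thread.Start();
    }

    // Refuse inlining so each task always runs on the thread created in QueueTask.
    protected override Boolean TryExecuteTaskInline(Task task, Boolean taskWasPreviouslyQueued) => false;

    // Tasks are handed to a thread immediately, so none are ever waiting in a queue.
    protected override IEnumerable<Task> GetScheduledTasks() => Enumerable.Empty<Task>();
}

public static class DedicatedThreadSchedulerDemo {
    public static String ThreadNameSeenByTask() {
        var scheduler = new DedicatedThreadScheduler();
        return Task.Factory.StartNew(() => Thread.CurrentThread.Name,
            CancellationToken.None, TaskCreationOptions.None, scheduler).Result;
    }
}
```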

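💡Summary: It is easy to call ThreadPool’s QueueUserWorkItem method to initiate an asynchronous compute-bound operation, but this technique has many limitations. The biggest problem is that there is no built-in way to know when the operation has completed and no way to get a return value back when it completes. To overcome these limitations (and solve some other problems), Microsoft introduced the concept of tasks, which you use via the types in the System.Threading.Tasks namespace. Whether you call a constructor or Run, you can optionally pass a CancellationToken, which allows the Task to be canceled before it has been scheduled. You can also optionally pass some TaskCreationOptions flags to the constructor to control how the Task executes. The TaskCreationOptions enumerated type defines a set of flags that can be bitwise-OR’d together. Some of the flags are only hints, which the TaskScheduler may or may not honor when scheduling a Task. However, AttachedToParent, DenyChildAttach, and HideScheduler are always honored, because they have nothing to do with the TaskScheduler itself.

If a compute-bound task throws an unhandled exception, the exception is swallowed and stored in a collection, and the thread pool thread is allowed to return to the thread pool. When the Wait method or the Result property is called, these members throw a System.AggregateException object. The AggregateException type encapsulates a collection of exception objects (the collection can contain multiple exceptions if a parent task spawned multiple child tasks and several of them threw). Its InnerExceptions property returns a ReadOnlyCollection<Exception> object. Do not confuse the InnerExceptions property with the InnerException property, which AggregateException inherits from the System.Exception base class.

In addition to waiting for a single task, the Task class offers two static methods that allow a thread to wait on an array of Task objects. Task’s static WaitAny method blocks the calling thread until any of the Task objects in the array has completed. It returns an Int32 index into the array indicating which Task object completed; the thread then wakes and continues running. The method returns -1 if a timeout occurs and throws an OperationCanceledException if WaitAny is canceled via a CancellationToken. Similarly, Task’s static WaitAll method blocks the calling thread until all the Task objects in the array have completed. WaitAll returns true if all the Task objects complete and false if a timeout occurs; it throws an OperationCanceledException if it is canceled via a CancellationToken.

You can use a CancellationTokenSource to cancel a Task. The task’s code periodically calls CancellationToken’s ThrowIfCancellationRequested method to check whether the operation has been canceled. This method is similar to CancellationToken’s IsCancellationRequested property, but if the CancellationTokenSource has been canceled, ThrowIfCancellationRequested throws an OperationCanceledException. An exception is thrown because, unlike work items initiated with ThreadPool’s QueueUserWorkItem method, tasks have a way to indicate completion and can even return a value, so there needs to be a way to distinguish a completed task from a canceled one; having the task throw an exception tells you that the task did not run all the way to completion. You associate a CancellationToken with a Task by passing it to the Task’s constructor or to Task.Run (as in the example above). If the CancellationToken is canceled before the Task is scheduled, the Task is canceled and never executes. But if the Task has already been scheduled (for example, by calling the Start method), its code must explicitly support cancellation for the operation to be canceled while it executes. Unfortunately, although a Task object has a CancellationToken associated with it, there is no way to access it, so you must get the same CancellationToken that was used to create the Task object into the Task’s code yourself. The easiest way to do this is to use a lambda expression and “pass” the CancellationToken as a closure variable.

Scalable software should not block threads. Calling Wait, or querying a task’s Result property when the task has not yet finished running, will very likely cause the thread pool to create a new thread, which increases resource consumption and hurts performance and scalability. Fortunately, there is a better way to find out when a task has finished running: when a task completes, it can start another task. The thread does not block waiting for either of the two tasks to complete; instead, it can execute other code, or, if it is itself a thread pool thread, it can return to the pool to perform other operations. In addition, a Task object internally contains a collection of ContinueWith tasks, so you can call ContinueWith several times on a single Task object. When the task completes, all the ContinueWith tasks are queued to the thread pool. When calling ContinueWith, you can also pass a bitwise-OR’d combination of TaskContinuationOptions values. The first six flags (None, PreferFairness, LongRunning, AttachedToParent, DenyChildAttach, and HideScheduler) are identical to the flags offered by the TaskCreationOptions enumerated type described earlier. By default, if you do not specify any of the OnlyOn/NotOn flags, the new task runs regardless of how the first task completes. When a Task completes, any of its continuation tasks that do not run are automatically canceled.

By default, the Task objects created by a task are top-level tasks that have no relationship to the task that creates them. However, the TaskCreationOptions.AttachedToParent flag associates a Task with the Task that creates it, so that the creating (parent) task is not considered finished until all its children (and grandchildren) have finished running. When creating a Task by calling the ContinueWith method, you can make the continuation task a child by specifying the TaskContinuationOptions.AttachedToParent flag.

Each Task object has a set of fields that make up the task’s state: an Int32 ID (see Task’s read-only Id property), an Int32 representing the execution state of the Task, a reference to the parent task, a reference to the TaskScheduler specified when the Task was created, a reference to the callback method, a reference to the object that is to be passed to the callback method (queryable via Task’s read-only AsyncState property), a reference to an ExecutionContext, and a reference to a ManualResetEventSlim object. In addition, each Task object has a reference to some supplementary state that is created on demand; it includes a CancellationToken, a collection of ContinueWithTask objects, a collection of Task objects for child tasks that have thrown unhandled exceptions, and more. The point is that if you don’t need the additional features of tasks, you get better resource utilization by using ThreadPool.QueueUserWorkItem. The Task and Task<TResult> classes implement the IDisposable interface, allowing you to call Dispose when you are done with a Task object. Today, all the Dispose method does is close the ManualResetEventSlim object, but you can define classes derived from Task and Task<TResult> that allocate their own resources and free them in their override of the Dispose method. Do not explicitly call Dispose on Task objects in your code; instead, let the garbage collector clean up any resources that are no longer needed.

Each Task object contains an Int32 field representing the Task’s unique ID. When you create a Task object, this field is initialized to zero. The first time you query Task’s read-only Id property, the property assigns a unique Int32 value to the field and returns it. Task IDs start at 1 and increase by 1 as each ID is assigned. During a Task object’s lifetime, you can query Task’s read-only Status property to learn where it is in its lifecycle. When a Task object is first constructed, its status is Created. Later, when the task is started, its status changes to WaitingToRun. When the Task actually runs on a thread, its status changes to Running. When the task stops running and is waiting for any child tasks, its status changes to WaitingForChildrenToComplete. When a task is completely finished, it enters one of three final states: RanToCompletion, Canceled, or Faulted. If it ran to completion, you can query the task’s result via Task<TResult>’s Result property. When a Task or Task<TResult> faults, you can obtain the unhandled exception that the task threw by querying Task’s Exception property, which always returns an AggregateException object whose InnerExceptions collection contains all the unhandled exceptions. For convenience, Task offers several read-only Boolean properties, including IsCanceled, IsFaulted, and IsCompleted. Note that IsCompleted returns true when the Task is in the RanToCompletion, Canceled, or Faulted state. A Task object created by calling methods such as ContinueWith, ContinueWhenAll, ContinueWhenAny, or FromAsync is in the WaitingForActivation state, which means that the Task’s scheduling is controlled by the task infrastructure. For example, you cannot explicitly start a Task object that was created by calling ContinueWith; it starts automatically when its antecedent task has finished executing.

Sometimes you need to create a group of Task objects that share the same configuration. To avoid passing the same parameters to each Task’s constructor over and over, you can create a task factory that encapsulates the common configuration. The System.Threading.Tasks namespace defines a TaskFactory type and a TaskFactory<TResult> type. Both types derive from System.Object; that is, they are peers. To create a group of tasks that return void, construct a TaskFactory; to create a group of tasks with a specific return type, construct a TaskFactory<TResult>, passing the tasks’ return type as the generic TResult argument. When creating either factory class, you pass to its constructor the defaults that you want all the tasks created by the factory to have: specifically, the CancellationToken, TaskScheduler, TaskCreationOptions, and TaskContinuationOptions settings.

The task infrastructure is very flexible, and TaskScheduler objects are a big part of this flexibility. A TaskScheduler object is responsible for executing scheduled tasks and also exposes task information to the Visual Studio debugger. The FCL ships with two TaskScheduler-derived types: the thread pool task scheduler and the synchronization context task scheduler. By default, all applications use the thread pool task scheduler, which schedules tasks to the thread pool’s worker threads. You can get a reference to the default task scheduler by querying TaskScheduler’s static Default property.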

# Parallel’s Static For, ForEach, and Invoke Methods

There are some common programming scenarios that can potentially benefit from the improved performance possible with tasks. To simplify programming, the static System.Threading.Tasks.Parallel class encapsulates these common scenarios while using Task objects internally. For example, instead of processing all the items in a collection like this:

```csharp
// One thread performs all this work sequentially
for (Int32 i = 0; i < 1000; i++) DoWork(i);
```

you can instead get multiple thread pool threads to assist in performing this work by using the Parallel class’s For method:

```csharp
// The thread pool’s threads process the work in parallel
Parallel.For(0, 1000, i => DoWork(i));
```

Similarly, if you have a collection, instead of doing this:

```csharp
// One thread performs all this work sequentially
foreach (var item in collection) DoWork(item);
```

you can do this:

```csharp
// The thread pool's threads process the work in parallel
Parallel.ForEach(collection, item => DoWork(item));
```

If you can use either For or ForEach in your code, then it is recommended that you use For because it executes faster.
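The following Parallel.For call is a small, self-contained illustration (hypothetical names; assumes .NET 4.5+) that fills an array in parallel. Each iteration writes only its own element, so no synchronization is needed. This is the kind of independent work the Parallel methods are designed for.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo class.
public static class ParallelForDemo {
    public static Int64[] Squares(Int32 count) {
        var results = new Int64[count];
        // Iterations may run on several thread pool threads (and the calling thread),
        // but each one touches a different array slot.
        Parallel.For(0, count, i => results[i] = (Int64)i * i);
        return results;
    }
}
```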

And finally, if you have several methods that you need to execute, you could execute them all sequentially, like this:

```csharp
// One thread executes all the methods sequentially
Method1();
Method2();
Method3();
```

or you could execute them in parallel, like this:

```csharp
// The thread pool’s threads execute the methods in parallel
Parallel.Invoke(
    () => Method1(),
    () => Method2(),
    () => Method3());
```

All of Parallel’s methods have the calling thread participate in the processing of the work, which is good in terms of resource usage because we wouldn’t want the calling thread to just suspend itself while waiting for thread pool threads to do all the work. However, if the calling thread finishes its work before the thread pool threads complete their part of the work, then the calling thread will suspend itself until all the work is done, which is also good because this gives you the same semantics as you’d have when using a for or foreach loop: the thread doesn’t continue running until all the work is done. Also note that if any operation throws an unhandled exception, the Parallel method you called will ultimately throw an AggregateException.
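The sketch below checks both behaviors (hypothetical names; assumes .NET 4.5+). Parallel.Invoke returns only after every delegate has run. An exception thrown by one iteration of Parallel.For surfaces as an AggregateException.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class.
public static class ParallelExceptionDemo {
    public static Int32 InvokeRunsEverything() {
        Int32 completed = 0;
        Parallel.Invoke(
            () => Interlocked.Increment(ref completed),
            () => Interlocked.Increment(ref completed),
            () => Interlocked.Increment(ref completed));
        // Invoke has returned, so all three delegates have finished.
        return completed;
    }

    public static Type InnerExceptionType() {
        try {
            Parallel.For(0, 100, i => {
                if (i == 42) throw new InvalidOperationException("item " + i);
            });
            return null;
        }
        catch (AggregateException ae) {
            return ae.InnerExceptions[0].GetType();
        }
    }
}
```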

Of course, you should not go through all your source code replacing for loops with calls to Parallel.For and foreach loops with calls to Parallel.ForEach. When you call a Parallel method, there is an assumption that it is OK for the work items to be performed concurrently. Therefore, do not use the Parallel methods if the work must be processed in sequential order. Also, avoid work items that modify any kind of shared data, because the data could get corrupted if it is manipulated by multiple threads simultaneously. Normally, you would fix this by adding thread synchronization locks around the data access, but if you do this, then only one thread at a time can access the data and you would lose the benefit of processing multiple items in parallel.

In addition, there is overhead associated with the Parallel methods; delegate objects have to be allocated, and these delegates are invoked once for each work item. If you have lots of work items that can be processed by multiple threads, then you might gain a performance increase. Also, if you have lots of work to do for each item, then the performance hit of calling through the delegate is negligible. You will actually hurt your performance if you use the Parallel methods for just a few work items or for work items that are processed very quickly.

I should mention that Parallel’s For, ForEach, and Invoke methods all have overloads that accept a ParallelOptions object, which looks like this.

public class ParallelOptions {
    public ParallelOptions();
    // Allows cancellation of the operation
    public CancellationToken CancellationToken { get; set; } // Default=CancellationToken.None
    // Allows you to specify the maximum number of work items
    // that can be operated on concurrently
    public Int32 MaxDegreeOfParallelism { get; set; } // Default=-1 (no explicit limit)
    // Allows you to specify which TaskScheduler to use
    public TaskScheduler TaskScheduler { get; set; } // Default=TaskScheduler.Default
}
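As a minimal sketch of how a ParallelOptions object might be passed to Parallel.For, the following caps concurrency with MaxDegreeOfParallelism and supplies a CancellationToken. The class ParallelOptionsSketch and its methods are hypothetical names chosen for illustration. The second method records the highest number of iterations it observes running at once, which should never exceed the cap.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Illustrative sketch; ParallelOptionsSketch and its methods are hypothetical names.
internal static class ParallelOptionsSketch {
    // Squares every i in [0, count) with at most maxDop concurrent iterations,
    // honoring the token; Parallel.For throws OperationCanceledException if it is canceled.
    public static Int64 SumOfSquares(Int32 count, Int32 maxDop, CancellationToken token) {
        var options = new ParallelOptions {
            MaxDegreeOfParallelism = maxDop,
            CancellationToken = token
        };
        var squares = new Int64[count];
        // Each iteration writes only its own array slot, so no lock is needed.
        Parallel.For(0, count, options, i => squares[i] = (Int64) i * i);
        return squares.Sum();
    }

    // Returns the highest number of iterations observed running at the same time.
    public static Int32 MaxObservedConcurrency(Int32 count, Int32 maxDop) {
        Int32 running = 0, max = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDop };
        Parallel.For(0, count, options, i => {
            Int32 now = Interlocked.Increment(ref running);
            Int32 seen;
            // Lock-free "max" update: retry if another thread changed max first
            while (now > (seen = Volatile.Read(ref max)) &&
                   Interlocked.CompareExchange(ref max, now, seen) != seen) { }
            Thread.SpinWait(10_000); // simulate a little compute-bound work
            Interlocked.Decrement(ref running);
        });
        return max;
    }
}
```

If the token is already canceled when SumOfSquares is called, Parallel.For throws OperationCanceledException instead of running the work.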

In addition, there are overloads of the For and ForEach methods that let you pass three delegates:

  • The task local initialization delegate (localInit) is invoked once for each task participating in the work. This delegate is invoked before the task is asked to process a work item.

  • The body delegate (body) is invoked once for each item being processed by the various threads participating in the work.

  • The task local finally delegate (localFinally) is invoked once for each task participating in the work. This delegate is invoked after the task has processed all the work items that will be dispatched to it. It is even invoked if the body delegate code experiences an unhandled exception.

Here is some sample code that demonstrates the use of the three delegates by adding up the bytes for all files contained within a directory.

// Requires: using System; using System.IO; using System.Threading; using System.Threading.Tasks;
private static Int64 DirectoryBytes(String path, String searchPattern,
    SearchOption searchOption) {
    var files = Directory.EnumerateFiles(path, searchPattern, searchOption);
    Int64 masterTotal = 0;
    ParallelLoopResult result = Parallel.ForEach<String, Int64>(
        files,
        () => { // localInit: Invoked once per task at start
            // Initialize that this task has seen 0 bytes
            return 0; // Set taskLocalTotal initial value to 0
        },
        (file, loopState, index, taskLocalTotal) => { // body: Invoked once per work item
            // Get this file's size and add it to this task's running total
            Int64 fileLength = 0;
            try {
                using (FileStream fs = File.OpenRead(file)) {
                    fileLength = fs.Length;
                }
            }
            catch (IOException) { /* Ignore any files we can't access */ }
            catch (UnauthorizedAccessException) { /* Also ignore files we lack permission to read */ }
            return taskLocalTotal + fileLength;
        },
        taskLocalTotal => { // localFinally: Invoked once per task at end
            // Atomically add this task's total to the "master" total
            Interlocked.Add(ref masterTotal, taskLocalTotal);
        });
    return masterTotal;
}

Each task maintains its own running total (in the taskLocalTotal variable) for the files that it is given. As each task completes its work, the master total is updated in a thread-safe way by calling the Interlocked.Add method (discussed in Chapter 29, “Primitive Thread Synchronization Constructs”). Because each task has its own running total, no thread synchronization is required during the processing of the items. Because thread synchronization would hurt performance, not requiring thread synchronization is good. It’s only after each task returns that masterTotal has to be updated in a thread-safe way, so the performance hit of calling Interlocked.Add occurs only once per task instead of once per work item.
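The same localInit/body/localFinally pattern can be sketched against an in-memory array so that the result is easy to verify. TaskLocalTotals is a hypothetical name, and Parallel.For's overload that takes the three delegates plays the role that ForEach plays in DirectoryBytes.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Sketch: TaskLocalTotals is a hypothetical name. Same three-delegate pattern as
// DirectoryBytes, but over an in-memory array so the result is easy to check.
internal static class TaskLocalTotals {
    public static Int64 Sum(Int32[] values) {
        Int64 masterTotal = 0;
        Parallel.For<Int64>(0, values.Length,
            // localInit: each task's running total starts at 0
            () => 0L,
            // body: only touches this task's own total, so no lock is taken
            (i, loopState, taskLocalTotal) => taskLocalTotal + values[i],
            // localFinally: one Interlocked.Add per task, not per item
            taskLocalTotal => Interlocked.Add(ref masterTotal, taskLocalTotal));
        return masterTotal;
    }
}
```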

You’ll notice that the body delegate is passed a ParallelLoopState object, which looks like this.

public class ParallelLoopState {
    public void Stop();
    public Boolean IsStopped { get; }
    public void Break();
    public Int64? LowestBreakIteration { get; }
    public Boolean IsExceptional { get; }
    public Boolean ShouldExitCurrentIteration { get; }
}

Each task participating in the work gets its own ParallelLoopState object, and it can use this object to interact with the other tasks participating in the work. The Stop method tells the loop to stop processing any more work, and future querying of the IsStopped property will return true. The Break method tells the loop to stop processing any items that are beyond the current item. For example, if ForEach is told to process 100 items and Break is called while processing the fifth item, the loop will make sure that the first five items are processed before ForEach returns. Note, however, that additional items may have been processed. The LowestBreakIteration property returns the lowest item number whose processing called the Break method, or null if Break was never called.

The IsExceptional property returns true if the processing of any item resulted in an unhandled exception. If the processing of an item takes a long time, your code can query the ShouldExitCurrentIteration property to see whether it should exit prematurely. This property returns true if Stop was called, if Break was called, if the CancellationTokenSource (referred to by the ParallelOptions object’s CancellationToken property) was canceled, or if the processing of an item resulted in an unhandled exception.
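Here is a small sketch of the Break semantics described above, using a hypothetical BreakSketch class in which every iteration at or beyond breakAt calls Break. Because all iterations below the lowest breaking iteration are guaranteed to run, LowestBreakIteration should end up equal to breakAt, and items 0 through breakAt - 1 should all be marked as processed.

```csharp
using System;
using System.Threading.Tasks;

// Sketch: BreakSketch and Run are hypothetical names.
internal static class BreakSketch {
    public static (ParallelLoopResult Result, Boolean[] Processed) Run(Int32 count, Int32 breakAt) {
        var processed = new Boolean[count]; // each iteration writes only its own slot
        ParallelLoopResult result = Parallel.For(0, count, (i, loopState) => {
            if (i >= breakAt) {
                loopState.Break(); // items below i must still be processed
                return;
            }
            processed[i] = true;
        });
        return (result, processed);
    }
}
```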

Parallel’s For and ForEach methods both return a ParallelLoopResult instance, which looks like this.

public struct ParallelLoopResult {
    // Returns false if the operation was ended prematurely
    public Boolean IsCompleted { get; }
    public Int64? LowestBreakIteration { get; }
}

You can examine the properties to determine the result of the loop. If IsCompleted returns true, then the loop ran to completion and all the items were processed. If IsCompleted is false and LowestBreakIteration is null, then some thread participating in the work called the Stop method. If IsCompleted is false and LowestBreakIteration is not null, then some thread participating in the work called the Break method, and the Int64 value returned from LowestBreakIteration is the index of the lowest item that called Break; all items before that index are guaranteed to have been processed. If an item throws an unhandled exception, the For or ForEach method throws an AggregateException instead of returning a ParallelLoopResult, so you should catch AggregateException in order to recover gracefully.
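The rules above for interpreting a ParallelLoopResult can be captured in a small helper. This is a sketch with hypothetical names (LoopResultSketch, Describe, RunWithStop, RunWithException); it also shows an unhandled exception in the body surfacing as an AggregateException rather than as a ParallelLoopResult.

```csharp
using System;
using System.Threading.Tasks;

// Sketch: all names here are hypothetical.
internal static class LoopResultSketch {
    public static String Describe(ParallelLoopResult r) {
        if (r.IsCompleted) return "Completed";
        if (r.LowestBreakIteration == null) return "Stopped";   // Stop was called
        return "Broke at " + r.LowestBreakIteration.Value;       // Break was called
    }

    public static String RunWithStop(Int32 count, Int32 stopAt) {
        ParallelLoopResult r = Parallel.For(0, count, (i, loopState) => {
            if (i == stopAt) loopState.Stop();
        });
        return Describe(r);
    }

    public static String RunWithException(Int32 count, Int32 failAt) {
        try {
            Parallel.For(0, count, i => {
                if (i == failAt) throw new InvalidOperationException("Item " + i + " failed");
            });
            return "No exception";
        }
        catch (AggregateException ae) {
            // Each inner exception was thrown by some work item
            return ae.InnerExceptions[0].Message;
        }
    }
}
```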

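💡Summary: Some common programming scenarios can benefit from the improved performance that tasks offer. To simplify programming, the static System.Threading.Tasks.Parallel class encapsulates these scenarios, and it uses Task objects internally. All of Parallel’s methods have the calling thread participate in the processing. From a resource-usage standpoint this is good, because we don’t want the calling thread to stop (block) and wait for thread pool threads to finish all the work before it can continue. However, if the calling thread finishes its work before the thread pool threads finish their part, the calling thread suspends itself until all the work is done. This is also good, because it provides the same semantics as ordinary for and foreach loops: the thread doesn’t continue running until all the work is complete. Note also that if any operation throws an unhandled exception, the Parallel method you called will ultimately throw an AggregateException. This does not mean you should search and replace your source code, turning for loops into calls to Parallel.For and foreach loops into calls to Parallel.ForEach. Calling Parallel’s methods carries an important precondition: the work items must be able to execute in parallel! So if the work must be executed sequentially, don’t use Parallel’s methods. Also, avoid work items that modify shared data; otherwise, simultaneous processing by multiple threads may corrupt the data. The usual fix is to add thread synchronization locks around the data access, but then only one thread at a time can access the data, and you lose the benefit of processing multiple items in parallel. In addition, Parallel’s methods have their own overhead: delegate objects must be allocated, and these delegates are invoked once per work item. If you have many work items that can be processed by multiple threads, you might gain performance. Likewise, if each item involves a lot of work, the performance cost of invoking through a delegate is negligible. But using Parallel’s methods for just a handful of work items, or for work items that are processed very quickly, costs more than it gains and actually reduces performance.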

# Parallel Language Integrated Query

Microsoft’s Language Integrated Query (LINQ) feature offers a convenient syntax for performing queries over collections of data. Using LINQ, you can easily filter items, sort items, return a projected set of items, and much more. When you use LINQ to Objects, only one thread processes all the items in your data collection sequentially; we call this a sequential query. You can potentially improve the performance of this processing by using Parallel LINQ, which can turn your sequential query into a parallel query. A parallel query internally uses tasks (queued to the default TaskScheduler) to spread the processing of the collection’s items across multiple CPUs so that multiple items are processed concurrently. As with Parallel’s methods, you will get the most benefit from Parallel LINQ if you have many items to process or if the processing of each item is a lengthy compute-bound operation.

The static System.Linq.ParallelEnumerable class (defined in System.Core.dll) implements all of the Parallel LINQ functionality, so you must import the System.Linq namespace into your source code via C#’s using directive. In particular, this class exposes parallel versions of all the standard LINQ operators, such as Where, Select, SelectMany, GroupBy, Join, OrderBy, Skip, Take, and so on. All of these methods are extension methods that extend the System.Linq.ParallelQuery<T> type. To have your LINQ to Objects query invoke the parallel versions of these methods, you must convert your sequential query (based on IEnumerable or IEnumerable<T>) to a parallel query (based on ParallelQuery or ParallelQuery<T>) by using ParallelEnumerable’s AsParallel extension method, which looks like this.

public static ParallelQuery<TSource> AsParallel<TSource>(this IEnumerable<TSource> source)
public static ParallelQuery AsParallel(this IEnumerable source)

Here is an example of a sequential query that has been converted to a parallel query. This query returns all the obsolete methods defined within an assembly.

// Requires: using System; using System.Linq; using System.Reflection;
private static void ObsoleteMethods(Assembly assembly) {
    var query =
        from type in assembly.GetExportedTypes().AsParallel()
        from method in type.GetMethods(BindingFlags.Public |
            BindingFlags.Instance | BindingFlags.Static)
        let obsoleteAttrType = typeof(ObsoleteAttribute)
        where Attribute.IsDefined(method, obsoleteAttrType)
        orderby type.FullName
        let obsoleteAttrObj = (ObsoleteAttribute)
            Attribute.GetCustomAttribute(method, obsoleteAttrType)
        select String.Format("Type={0}\nMethod={1}\nMessage={2}\n",
            type.FullName, method.ToString(), obsoleteAttrObj.Message);
    // Display the results
    foreach (var result in query) Console.WriteLine(result);
}
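To see that AsParallel is the only change needed, here is a sketch of a compute-bound query run both ways. PrimeQueries and its methods are hypothetical names, and the trial-division primality test is just a convenient stand-in for expensive per-item work.

```csharp
using System;
using System.Linq;

// Sketch: PrimeQueries and its methods are hypothetical names.
internal static class PrimeQueries {
    // Deliberately simple (and slow) trial division: compute-bound work per item
    private static Boolean IsPrime(Int32 n) {
        if (n < 2) return false;
        for (Int32 d = 2; (Int64) d * d <= n; d++)
            if (n % d == 0) return false;
        return true;
    }

    // Sequential query: one thread tests every number
    public static Int32 CountSequential(Int32 max) =>
        Enumerable.Range(0, max).Where(IsPrime).Count();

    // Parallel query: AsParallel spreads the Where work across cores
    public static Int32 CountParallel(Int32 max) =>
        Enumerable.Range(0, max).AsParallel().Where(IsPrime).Count();
}
```

Both methods should return the same count; only the number of threads doing the work differs.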

Although uncommon, within a query you can switch from performing parallel operations back to performing sequential operations by calling ParallelEnumerable’s AsSequential method.

public static IEnumerable<TSource> AsSequential<TSource>(this ParallelQuery<TSource> source)

This method basically turns a ParallelQuery<TSource> back into an IEnumerable<TSource> so that operations performed after calling AsSequential are performed by just one thread.
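A minimal sketch of AsSequential (AsSequentialSketch and SmallSquares are hypothetical names): the projection runs in parallel, AsOrdered keeps source order, and TakeWhile then runs on a single thread over an ordinary IEnumerable<Int32>.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Sketch: AsSequentialSketch and SmallSquares are hypothetical names.
internal static class AsSequentialSketch {
    public static List<Int32> SmallSquares(Int32 count, Int32 limit) {
        IEnumerable<Int32> sequentialTail =
            Enumerable.Range(0, count)
                .AsParallel()
                .AsOrdered()          // keep source order so TakeWhile is meaningful
                .Select(x => x * x)   // performed by multiple threads
                .AsSequential();      // back to IEnumerable<Int32>
        return sequentialTail.TakeWhile(sq => sq < limit).ToList(); // one thread
    }
}
```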

Normally, the resulting data produced by a LINQ query is evaluated by having some thread execute a foreach statement (as shown earlier). This means that just one thread iterates over all the query’s results. If you want to have the query’s results processed in parallel, then you should process the resulting query by using ParallelEnumerable’s ForAll method.

public static void ForAll<TSource>(this ParallelQuery<TSource> source, Action<TSource> action)

This method allows multiple threads to process the results simultaneously. I could modify my earlier code to use this method as follows.

// Display the results
query.ForAll(Console.WriteLine);

However, having multiple threads call Console.WriteLine simultaneously actually hurts performance, because the Console class internally synchronizes threads so that only one at a time can access the console window. This synchronization keeps text written by multiple threads from being interspersed, which would make the output unintelligible. Use the ForAll method when you intend to perform calculations on each result.
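As an example of using ForAll for calculation rather than output, this sketch (ForAllSketch and CountByLastDigit are hypothetical names) tallies the last digits of multiples of 3. Because several threads update the tally at once, it uses a ConcurrentDictionary, whose AddOrUpdate method is safe to call from multiple threads.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

// Sketch: ForAllSketch and CountByLastDigit are hypothetical names.
internal static class ForAllSketch {
    public static ConcurrentDictionary<Int32, Int32> CountByLastDigit(Int32 max) {
        var counts = new ConcurrentDictionary<Int32, Int32>();
        Enumerable.Range(0, max).AsParallel()
            .Where(n => n % 3 == 0)
            // ForAll runs the action on the query's worker threads; no single
            // thread has to merge and iterate over the results
            .ForAll(n => counts.AddOrUpdate(n % 10, 1, (digit, c) => c + 1));
        return counts;
    }
}
```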

Because Parallel LINQ processes items by using multiple threads, the items are processed concurrently and the results are returned in an unordered fashion. If you need to have Parallel LINQ preserve the order of items as they are processed, then you can call ParallelEnumerable’s AsOrdered method. When you call this method, threads will process items in groups and then the groups are merged back together, preserving the order; this will hurt performance. The following operators produce unordered operations: Distinct, Except, Intersect, Union, Join, GroupBy, GroupJoin, and ToLookup. If you want to enforce ordering again after one of these operators, just call the AsOrdered method.

The following operators produce ordered operations: OrderBy, OrderByDescending, ThenBy, and ThenByDescending. If you want to go back to unordered processing again to improve performance after one of these operators, just call the AsUnordered method.
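The difference between unordered and ordered processing can be sketched as follows (OrderingSketch and its methods are hypothetical names). In this sketch AsOrdered is applied immediately after AsParallel.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Sketch: OrderingSketch and its methods are hypothetical names.
internal static class OrderingSketch {
    // No ordering requested: results may come back in any order.
    public static List<Int32> Unordered(Int32 count) =>
        Enumerable.Range(0, count).AsParallel().Select(x => x * 2).ToList();

    // AsOrdered preserves source order, at some cost in performance.
    public static List<Int32> Ordered(Int32 count) =>
        Enumerable.Range(0, count).AsParallel().AsOrdered().Select(x => x * 2).ToList();
}
```

Both lists contain the same values; only Ordered guarantees they appear in source order, so the unordered result must be sorted before it can be compared element by element.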

Parallel LINQ offers some additional ParallelEnumerable methods that you can call to control how the query is processed.

public static ParallelQuery<TSource> WithCancellation<TSource>(
    this ParallelQuery<TSource> source, CancellationToken cancellationToken)
public static ParallelQuery<TSource> WithDegreeOfParallelism<TSource>(
    this ParallelQuery<TSource> source, Int32 degreeOfParallelism)
public static ParallelQuery<TSource> WithExecutionMode<TSource>(
    this ParallelQuery<TSource> source, ParallelExecutionMode executionMode)
public static ParallelQuery<TSource> WithMergeOptions<TSource>(
    this ParallelQuery<TSource> source, ParallelMergeOptions mergeOptions)

Obviously, the WithCancellation method allows you to pass a CancellationToken so that the query processing can be stopped prematurely. The WithDegreeOfParallelism method specifies the maximum number of threads allowed to process the query; it does not force the threads to be created if not all of them are necessary. Usually you will not call this method, and, by default, the query will execute using one thread per core. However, you could call WithDegreeOfParallelism, passing a number that is smaller than the number of available cores, if you want to keep some cores available for doing other work. You could also pass a number that is greater than the number of cores if the query performs synchronous I/O operations, because threads will be blocking during these operations. This uses more threads but can produce the final result in less time. You might consider doing this in a client application, but I’d highly recommend against performing synchronous I/O operations in a server application.
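A sketch of WithDegreeOfParallelism and WithCancellation (PlinqControlSketch, SumOfSquares, and CountEvens are hypothetical names). When the token passed to WithCancellation is canceled, PLINQ throws an OperationCanceledException, which this sketch turns into a null result.

```csharp
using System;
using System.Linq;
using System.Threading;

// Sketch: PlinqControlSketch and its methods are hypothetical names.
internal static class PlinqControlSketch {
    // At most 'threads' threads process the query, leaving other cores free.
    public static Int64 SumOfSquares(Int32 count, Int32 threads) =>
        Enumerable.Range(0, count).AsParallel()
            .WithDegreeOfParallelism(threads)
            .Select(x => (Int64) x * x)
            .Sum();

    // Returns null if the query was canceled through the token.
    public static Int32? CountEvens(Int32 count, CancellationToken token) {
        try {
            return Enumerable.Range(0, count).AsParallel()
                .WithCancellation(token)
                .Count(x => x % 2 == 0);
        }
        catch (OperationCanceledException) {
            return null; // the query was canceled
        }
    }
}
```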

Parallel LINQ analyzes a query and then decides how to best process it. Sometimes processing a query sequentially yields better performance. This is usually true when using any of these operations: Concat, ElementAt(OrDefault), First(OrDefault), Last(OrDefault), Skip(While), Take(While), or Zip. It is also true when using overloads of Select(Many) or Where that pass a position index into your selector or predicate delegate. However, you can force a query to be processed in parallel by calling WithExecutionMode, passing it one of the ParallelExecutionMode flags.

public enum ParallelExecutionMode {
    Default = 0,         // Let Parallel LINQ decide how best to process the query
    ForceParallelism = 1 // Force the query to be processed in parallel
}
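A sketch of WithExecutionMode (ExecutionModeSketch and FirstEvenSquares are hypothetical names). The query uses Take, one of the operators that can lead Parallel LINQ to choose sequential execution, so ForceParallelism is passed to override that choice; AsOrdered keeps Take meaningful by preserving source order.

```csharp
using System;
using System.Linq;

// Sketch: ExecutionModeSketch and FirstEvenSquares are hypothetical names.
internal static class ExecutionModeSketch {
    public static Int32[] FirstEvenSquares(Int32 count, Int32 howMany) =>
        Enumerable.Range(0, count).AsParallel().AsOrdered()
            .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
            .Where(x => x % 2 == 0)
            .Select(x => x * x)
            .Take(howMany)   // with AsOrdered, these are the first matches in source order
            .ToArray();
}
```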

As mentioned before, Parallel LINQ has multiple threads processing items, and then the results must be merged back together. You can control how the items are buffered and merged by calling WithMergeOptions, passing it one of the ParallelMergeOptions flags.

public enum ParallelMergeOptions {
    Default = 0,       // Same as AutoBuffered today (could change in the future)
    NotBuffered = 1,   // Results are processed as soon as they are ready
    AutoBuffered = 2,  // Each thread buffers some results before they are processed
    FullyBuffered = 3  // Each thread buffers all results before they are processed
}

These options basically give you some control over speed versus memory consumption. NotBuffered saves memory but processes items more slowly. FullyBuffered consumes more memory but runs fastest. AutoBuffered is a compromise between NotBuffered and FullyBuffered. Really, the best way to know which of these to choose for any given query is to try them all and compare their performance, or just accept the default, which tends to work well for many queries. See the following blog posts for more information about how Parallel LINQ partitions work across CPU cores:

  • http://blogs.msdn.com/pfxteam/archive/2009/05/28/9648672.aspx

  • http://blogs.msdn.com/pfxteam/archive/2009/06/13/9741072.aspx
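To compare the merge options on your own queries, a sketch like the following (MergeOptionsSketch and EvenNumbers are hypothetical names) runs the same ordered query under a chosen ParallelMergeOptions value and consumes it with foreach. The results are identical for every option; only the buffering, and therefore memory use and latency, differ.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Sketch: MergeOptionsSketch and EvenNumbers are hypothetical names.
internal static class MergeOptionsSketch {
    public static List<Int32> EvenNumbers(Int32 count, ParallelMergeOptions options) {
        var query = Enumerable.Range(0, count).AsParallel().AsOrdered()
            .WithMergeOptions(options)
            .Where(x => x % 2 == 0);
        var results = new List<Int32>();
        // With NotBuffered, an element can reach this loop as soon as it (and, because
        // of AsOrdered, every element before it) is ready; with FullyBuffered, the loop
        // sees nothing until the whole query has finished.
        foreach (Int32 x in query) results.Add(x);
        return results;
    }
}
```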

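💡Summary: Microsoft’s Language Integrated Query (LINQ) feature offers a concise syntax for querying collections of data. With LINQ you can easily filter, sort, project, and otherwise operate on data items. When you use LINQ to Objects, only one thread processes all the items in the data collection sequentially; we call this a sequential query. To improve processing performance, you can use Parallel LINQ, which turns a sequential query into a parallel query. Internally it uses tasks (queued to the default TaskScheduler) to spread the processing of the collection’s items across multiple CPUs so that multiple items are processed concurrently. As with Parallel’s methods, you get the most benefit from Parallel LINQ when you have many items to process or when processing each item is a lengthy compute-bound operation.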

# Performing a Periodic Compute-Bound Operation

The System.Threading namespace defines a Timer class, which you can use to have a thread pool thread call a method periodically. When you construct an instance of the Timer class, you are telling the thread pool that you want a method of yours called back at a future time that you specify. The Timer class offers several constructors, all quite similar to each other.

public sealed class Timer : MarshalByRefObject, IDisposable {
    public Timer(TimerCallback callback, Object state, Int32 dueTime, Int32 period);
    public Timer(TimerCallback callback, Object state, UInt32 dueTime, UInt32 period);
    public Timer(TimerCallback callback, Object state, Int64 dueTime, Int64 period);
    public Timer(TimerCallback callback, Object state, TimeSpan dueTime, TimeSpan period);
}

All four constructors construct a Timer object identically. The callback parameter identifies the method that you want called back by a thread pool thread. Of course, the callback method that you write must match the System.Threading.TimerCallback delegate type, which is defined as follows.

delegate void TimerCallback(Object state);

The constructor’s state parameter allows you to pass state data to the callback method each time it is invoked; you can pass null if you have no state data to pass. You use the dueTime parameter to tell the CLR how many milliseconds to wait before calling your callback method for the very first time. You can specify the number of milliseconds by using a signed or unsigned 32-bit value, a signed 64-bit value, or a TimeSpan value. If you want the callback method called immediately, specify 0 for the dueTime parameter. The last parameter, period, allows you to specify how long, in milliseconds, to wait before each successive call to the callback method. If you pass Timeout.Infinite (-1) for this parameter, a thread pool thread will call the callback method just once.
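A one-shot timer with state might be sketched like this (OneShotTimerSketch and FireOnce are hypothetical names). Passing Timeout.Infinite as period means the callback runs once; the TaskCompletionSource is only there so the demo can wait for the callback, and the using block keeps the Timer reachable until it has fired.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Sketch: OneShotTimerSketch and FireOnce are hypothetical names.
internal static class OneShotTimerSketch {
    // Fires once after dueTimeMs and returns the state object the callback received.
    public static Object FireOnce(Object state, Int32 dueTimeMs) {
        var tcs = new TaskCompletionSource<Object>();
        // period = Timeout.Infinite: the callback is invoked exactly once
        using (var timer = new Timer(s => tcs.TrySetResult(s), state, dueTimeMs, Timeout.Infinite)) {
            // Blocking here is only for the demo; the using block also keeps
            // 'timer' reachable so it cannot be garbage collected before it fires.
            return tcs.Task.Result;
        }
    }
}
```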

Internally, the thread pool has just one thread that it uses for all Timer objects. This thread knows when the next Timer object’s time is due. When the next Timer object is due, the thread wakes up and internally calls ThreadPool’s QueueUserWorkItem to add an entry to the thread pool’s queue, causing your callback method to get called. If your callback method takes a long time to execute, the timer could go off again before the callback has returned. This could cause multiple thread pool threads to be executing your callback method simultaneously. To work around this problem, I recommend the following: construct the Timer specifying Timeout.Infinite for the period parameter so that the timer fires only once. Then, in your callback method, call the Change method specifying a new due time and again specifying Timeout.Infinite for the period parameter. Here is what the Change method overloads look like.

public sealed class Timer : MarshalByRefObject, IDisposable { 
 public Boolean Change(Int32 dueTime, Int32 period); 
 public Boolean Change(UInt32 dueTime, UInt32 period); 
 public Boolean Change(Int64 dueTime, Int64 period); 
 public Boolean Change(TimeSpan dueTime, TimeSpan period); 
}

The Timer class also offers a Dispose method that allows you to cancel the timer altogether and optionally signal the kernel object identified by the notifyObject parameter when all pending callbacks for the timer have completed. Here is what the Dispose method overloads look like.

public sealed class Timer : MarshalByRefObject, IDisposable {
    public void Dispose();
    public Boolean Dispose(WaitHandle notifyObject);
}
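A sketch of canceling a periodic timer with Dispose(WaitHandle) and waiting for in-flight callbacks (TimerDisposeSketch and RunThenDispose are hypothetical names). After the wait handle is signaled, the callback count should no longer change; the method returns -1 if it does.

```csharp
using System;
using System.Threading;

// Sketch: TimerDisposeSketch and RunThenDispose are hypothetical names.
internal static class TimerDisposeSketch {
    // Runs a periodic timer for runMs, then disposes it and waits until any callback
    // that was already queued or running has completed. Returns the callback count.
    public static Int32 RunThenDispose(Int32 periodMs, Int32 runMs) {
        Int32 calls = 0;
        var timer = new Timer(_ => Interlocked.Increment(ref calls), null, 0, periodMs);
        Thread.Sleep(runMs);
        using (var allDone = new ManualResetEvent(false)) {
            // Dispose(WaitHandle) returns true if this call disposed the timer;
            // allDone is signaled once all pending callbacks have completed.
            if (timer.Dispose(allDone)) allDone.WaitOne();
        }
        Int32 final = Volatile.Read(ref calls);
        Thread.Sleep(periodMs * 3); // no further callbacks should arrive now
        return Volatile.Read(ref calls) == final ? final : -1;
    }
}
```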

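💡Important: When a Timer object is garbage collected, its finalization code tells the thread pool to cancel the timer so that it no longer fires. So when using a Timer object, make sure that a variable is keeping the Timer object alive; otherwise, calls to your callback method will stop. Section 21.1.3, “Garbage Collections and Debugging,” discusses and demonstrates this in detail.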

The following code demonstrates how to have a thread pool thread call a method starting immediately and then every two seconds thereafter.

// Requires: using System; using System.Threading;
internal static class TimerDemo {
    private static Timer s_timer;
    public static void Main() {
        Console.WriteLine("Checking status every 2 seconds");
        // Create the Timer ensuring that it never fires. This ensures that
        // s_timer refers to it BEFORE Status is invoked by a thread pool thread
        s_timer = new Timer(Status, null, Timeout.Infinite, Timeout.Infinite);
        // Now that s_timer is assigned to, we can let the timer fire knowing
        // that calling Change in Status will not throw a NullReferenceException
        s_timer.Change(0, Timeout.Infinite);
        Console.ReadLine(); // Prevent the process from terminating
    }
    // This method's signature must match the TimerCallback delegate
    private static void Status(Object state) {
        // This method is executed by a thread pool thread
        Console.WriteLine("In Status at {0}", DateTime.Now);
        Thread.Sleep(1000); // Simulates other work (1 second)
        // Just before returning, have the Timer fire again in 2 seconds
        s_timer.Change(2000, Timeout.Infinite);
        // When this method returns, the thread goes back
        // to the pool and waits for another work item
    }
}

If you have an operation you want performed periodically, there is another way you can structure your code by taking advantage of Task’s static Delay method along with C#’s async and await keywords (discussed extensively in Chapter 28). Here is a rewrite of the preceding code demonstrating this.

// Requires: using System; using System.Threading.Tasks;
internal static class DelayDemo {
    public static void Main() {
        Console.WriteLine("Checking status every 2 seconds");
        Status();
        Console.ReadLine(); // Prevent the process from terminating
    }
    // This method can take whatever parameters you desire
    private static async void Status() {
        while (true) {
            Console.WriteLine("Checking status at {0}", DateTime.Now);
            // Put code to check status here...
            // At end of loop, delay 2 seconds without blocking a thread
            await Task.Delay(2000); // await allows thread to return
            // After 2 seconds, some thread will continue after await to loop around
        }
    }
}
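If you want the Task.Delay approach but also want the loop to be stoppable and awaitable, one possible variation, sketched here as an assumption rather than as the preceding code, returns a Task and accepts a CancellationToken. DelayLoopSketch and RunPeriodicallyAsync are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Sketch: DelayLoopSketch and RunPeriodicallyAsync are hypothetical names.
internal static class DelayLoopSketch {
    // Calls 'check' every intervalMs until the token is canceled; returns how many checks ran.
    // Returning Task (instead of async void) lets callers await completion and observe exceptions.
    public static async Task<Int32> RunPeriodicallyAsync(Action check, Int32 intervalMs, CancellationToken token) {
        Int32 count = 0;
        try {
            while (true) {
                check();
                count++;
                // No thread is blocked during the delay; a canceled token ends it early
                await Task.Delay(intervalMs, token);
            }
        }
        catch (OperationCanceledException) {
            // Cancellation is the normal way to end the loop
        }
        return count;
    }
}
```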

# So Many Timers, So Little Time

Unfortunately, the FCL actually ships with several timers, and it is not clear to most programmers what makes each timer unique. Let me attempt to explain:

  • System.Threading’s Timer class: This is the timer discussed in the previous section, and it is the best timer to use when you want to perform periodic background tasks on a thread pool thread.

  • System.Windows.Forms’s Timer class: Constructing an instance of this class tells Windows to associate a timer with the calling thread (see the Win32 SetTimer function). When this timer goes off, Windows injects a timer message (WM_TIMER) into the thread’s message queue. The thread must execute a message pump that extracts these messages and dispatches them to the desired callback method. Notice that all of the work is done by just one thread: the thread that sets the timer is guaranteed to be the thread that executes the callback method. This also means that your timer method will not be executed by multiple threads concurrently.

  • System.Windows.Threading’s DispatcherTimer class: This class is the equivalent of the System.Windows.Forms’s Timer class for Silverlight and WPF applications.

  • Windows.UI.Xaml’s DispatcherTimer class: This class is the equivalent of the System.Windows.Forms’s Timer class for Windows Store apps.

  • System.Timers’s Timer class: This timer is basically a wrapper around System.Threading’s Timer class that causes the CLR to queue events into the thread pool when the timer comes due. The System.Timers.Timer class is derived from System.ComponentModel’s Component class, which allows these timer objects to be placed on a design surface in Visual Studio. Also, it exposes properties and events, allowing it to be used more easily from Visual Studio’s designer. This class was added to the FCL years ago while Microsoft was still sorting out the threading and timer stuff. This class probably should have been removed so that everyone would be using the System.Threading.Timer class instead. In fact, I never use the System.Timers.Timer class, and I’d discourage you from using it, too, unless you really want a timer on a design surface.

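💡Summary: The System.Threading namespace defines a Timer class that you can use to have a thread pool thread call a method periodically. Constructing an instance of the Timer class tells the thread pool that you want one of your methods called back at a future time that you specify. Internally, the thread pool uses just one thread for all Timer objects. This thread knows when the next Timer object is due (how long until the timer fires). When the next Timer object comes due, the thread wakes up and internally calls ThreadPool’s QueueUserWorkItem to add a work item to the thread pool’s queue, causing your callback method to be called. If your callback method takes a long time to execute, the timer could fire again (before the previous callback has finished). This could cause multiple thread pool threads to execute your callback method simultaneously. To solve this problem, I recommend the following: when constructing the Timer, specify Timeout.Infinite for the period parameter. This way, the timer fires only once. Then, in your callback method, call the Change method to specify a new dueTime, and again specify Timeout.Infinite for the period parameter. The Timer class also offers a Dispose method that allows you to cancel the timer altogether and, optionally, to signal the kernel object identified by the notifyObject parameter after all callbacks that were pending at that time have completed.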

# How the Thread Pool Manages Its Threads

Now I’d like to talk about how the thread pool code manages worker and I/O threads. However, I don’t want to go into a lot of detail, because the internal implementation has changed greatly over the years with each version of the CLR, and it will continue changing with future versions. It is best to think of the thread pool as a black box. The black box is not perfect for any one application, because it is a general-purpose thread-scheduling technology designed to work with a wide variety of applications; it will work better for some applications than for others. It works very well today, and I highly recommend that you trust it, because it would be very hard for you to produce a thread pool that works better than the one shipping in the CLR. And, over time, most applications should improve as the thread pool code internally changes how it manages threads.

# Setting Thread Pool Limits

The CLR allows developers to set a maximum number of threads that the thread pool will create. However, it turns out that thread pools should never place an upper limit on the number of threads in the pool because starvation or deadlock might occur. Imagine queuing 1,000 work items that all block on an event that is signaled by the 1,001st item. If you’ve set a maximum of 1,000 threads, the 1,001st work item won’t be executed, and all 1,000 threads will be blocked forever, forcing end users to terminate the application and lose all their work. Also, it is very unusual for developers to artificially limit the resources that they have available to their application. For example, would you ever start your application and tell the system you’d like to restrict the amount of memory that the application can use or limit the amount of network bandwidth that your application can use? Yet, for some reason, developers feel compelled to limit the number of threads that the thread pool can have.

Because customers have had starvation and deadlock issues, the CLR team has steadily increased the default maximum number of threads that the thread pool can have. The default maximum is now about 1,000 threads, which is effectively limitless because a 32-bit process has at most 2 GB of usable address space within it. After a bunch of Win32 DLLs load, the CLR DLLs load, and the native heap and the managed heap are allocated, there is approximately 1.5 GB of address space left over. Because each thread requires more than 1 MB of memory for its user-mode stack and thread environment block (TEB), the most threads you can get in a 32-bit process is about 1,360. Attempting to create more threads than this will result in an OutOfMemoryException being thrown. Of course, a 64-bit process offers 8 terabytes of address space, so you could theoretically create hundreds of thousands of threads. But allocating anywhere near this number of threads is really just a waste of resources, especially when the ideal number of threads to have is equal to the number of CPUs in the machine. What the CLR team should do is remove the limits entirely, but they can’t do this now because doing so might break some applications that expect thread pool limits to exist.

The System.Threading.ThreadPool class offers several static methods that you can call to manipulate the number of threads in the thread pool: GetMaxThreads, SetMaxThreads, GetMinThreads, SetMinThreads, and GetAvailableThreads. I highly recommend that you do not call any of these methods. Playing with thread pool limits usually results in making an application perform worse, not better. If you think that your application needs hundreds or thousands of threads, there is something seriously wrong with the architecture of your application and the way that it’s using threads. This chapter and Chapter 28 demonstrate the proper way to use threads.
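If you only want to look at the limits (for diagnostics, say) without changing them, a read-only sketch might look like this; ThreadPoolLimitsSketch and Query are hypothetical names. It calls only the Get methods mentioned above and never the Set methods.

```csharp
using System;
using System.Threading;

// Sketch: ThreadPoolLimitsSketch and Query are hypothetical names.
internal static class ThreadPoolLimitsSketch {
    // Reads (but deliberately does not change) the current thread pool limits.
    public static (Int32 MinWorker, Int32 MaxWorker, Int32 AvailableWorker,
                   Int32 MinIo, Int32 MaxIo, Int32 AvailableIo) Query() {
        ThreadPool.GetMinThreads(out Int32 minWorker, out Int32 minIo);
        ThreadPool.GetMaxThreads(out Int32 maxWorker, out Int32 maxIo);
        // "Available" is the maximum minus the threads currently in use
        ThreadPool.GetAvailableThreads(out Int32 availWorker, out Int32 availIo);
        return (minWorker, maxWorker, availWorker, minIo, maxIo, availIo);
    }
}
```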

# How Worker Threads Are Managed

Figure 27-1 shows the various data structures that make up the worker threads' part of the thread pool. The ThreadPool.QueueUserWorkItem method and the Timer class always queue work items to the global queue. Worker threads pull items from this queue using a first-in-first-out (FIFO) algorithm and process them. Because multiple worker threads can be removing items from the global queue simultaneously, all worker threads contend on a thread synchronization lock to ensure that two or more threads don’t take the same work item. This thread synchronization lock can become a bottleneck in some applications, thereby limiting scalability and performance to some degree.

[Figure 27-1: The worker-thread data structures of the CLR’s thread pool]

Now let’s talk about Task objects scheduled using the default TaskScheduler (obtained by querying TaskScheduler’s static Default property). When a non-worker thread schedules a Task, the Task is added to the global queue. But each worker thread has its own local queue, and when a worker thread schedules a Task, the Task is added to the calling thread’s local queue.

When a worker thread is ready to process an item, it always checks its local queue for a Task first. If a Task exists, the worker thread removes the Task from its local queue and processes the item. Note that a worker thread pulls tasks from its local queue by using a last-in-first-out (LIFO) algorithm. Because a worker thread is the only thread allowed to access the head of its own local queue, no thread synchronization lock is required and adding and removing Tasks from the queue is very fast. A side effect of this behavior is that Tasks are executed in the reverse order that they were queued.
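Code must not depend on whether a Task lands in a local LIFO queue or in the global FIFO queue. The sketch below (LocalQueueSketch and RunChildren are hypothetical names) queues child tasks from a thread pool thread, optionally passing TaskCreationOptions.PreferFairness, which asks the default scheduler to use the global queue instead of the worker’s local queue. It sorts what it collects because execution order is not guaranteed either way.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Sketch: LocalQueueSketch and RunChildren are hypothetical names.
internal static class LocalQueueSketch {
    public static Int32[] RunChildren(Int32 count, Boolean preferFairness) {
        var executed = new ConcurrentQueue<Int32>();
        TaskCreationOptions options =
            preferFairness ? TaskCreationOptions.PreferFairness : TaskCreationOptions.None;
        // Task.Run puts the parent on a worker thread, so its children are scheduled
        // from a worker thread (local queue unless PreferFairness is specified).
        Task parent = Task.Run(() => {
            var children = new Task[count];
            for (Int32 i = 0; i < count; i++) {
                Int32 index = i; // copy the loop variable for the closure
                children[i] = Task.Factory.StartNew(() => executed.Enqueue(index),
                    CancellationToken.None, options, TaskScheduler.Default);
            }
            Task.WaitAll(children);
        });
        parent.Wait();
        return executed.OrderBy(x => x).ToArray(); // sort: execution order is not guaranteed
    }
}
```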

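💡Important: The thread pool never guarantees the order in which queued work items are processed. This is reasonable, especially considering that multiple threads may be processing work items simultaneously. However, the side effect described above makes the problem worse. You must make sure that your application makes no assumptions about the order in which work items or Tasks execute.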

If a worker thread sees that its local queue is empty, then the worker thread will attempt to steal a Task from another worker thread’s local queue. Tasks are stolen from the tail of a local queue and require that a thread synchronization lock be taken, which hurts performance a little bit. Of course, the hope is that stealing rarely occurs, so this lock is taken rarely. If all the local queues are empty, then the worker thread will extract an item from the global queue (taking its lock) using the FIFO algorithm. If the global queue is empty, then the worker thread puts itself to sleep waiting for something to show up. If it sleeps for a long time, then it will wake itself up and destroy itself, allowing the system to reclaim the resources (kernel object, stacks, TEB) that were used by the thread.

The thread pool will quickly create worker threads so that the number of worker threads is equal to the value passed to ThreadPool’s SetMinThreads method. If you never call this method (and it’s recommended that you never call this method), then the default value is equal to the number of CPUs that your process is allowed to use, as determined by your process’s affinity mask. Usually your process is allowed to use all the CPUs on the machine, so the thread pool will quickly create worker threads up to the number of CPUs on the machine. After this many threads have been created, the thread pool monitors the completion rate of work items, and if items are taking a long time to complete (the meaning of which is not documented), it creates more worker threads. If items start completing quickly, then worker threads will be destroyed.

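💡Summary: The CLR allows developers to set a maximum number of threads that the thread pool will create. However, experience has shown that thread pools should never place an upper limit on the number of threads, because starvation or deadlock might occur. The System.Threading.ThreadPool class offers several static methods you can call to set and query the number of threads in the thread pool: GetMaxThreads, SetMaxThreads, GetMinThreads, SetMinThreads, and GetAvailableThreads. I strongly recommend that you do not call any of these methods. Limiting the number of thread pool threads usually makes an application perform worse, not better. If you think your application needs hundreds or thousands of threads, there is something seriously wrong with your application’s architecture and the way it uses threads. The ThreadPool.QueueUserWorkItem method and the Timer class always queue work items to the global queue. Worker threads pull work items from this queue using a first-in-first-out (FIFO) algorithm and process them. Because multiple worker threads can be removing items from the global queue simultaneously, all worker threads contend for a thread synchronization lock to ensure that two or more threads don’t take the same work item. This lock can become a bottleneck in some applications, limiting scalability and performance to some degree. For Task objects scheduled with the default TaskScheduler (obtained by querying TaskScheduler’s static Default property), when a non-worker thread schedules a Task, the Task is added to the global queue. But each worker thread has its own local queue, and when a worker thread schedules a Task, the Task is added to the calling thread’s local queue. When a worker thread is ready to process a work item, it always checks its local queue for a Task first. If a Task exists, the worker thread removes it from its local queue and processes it. Note that a worker thread pulls tasks from its local queue by using a last-in-first-out (LIFO) algorithm. Because a worker thread is the only thread allowed to access the head of its own local queue, no synchronization lock is required, and adding and removing Tasks from the queue is very fast. A side effect of this behavior is that Tasks execute in the reverse order in which they were queued. If a worker thread finds that its local queue is empty, it tries to “steal” a Task from another worker thread’s local queue. The Task is stolen from the tail of the local queue and requires taking a thread synchronization lock, which hurts performance slightly. Of course, the hope is that stealing rarely happens, so the lock is rarely needed. If all the local queues are empty, the worker thread extracts a work item from the global queue (taking its lock) using the FIFO algorithm. If the global queue is also empty, the worker thread puts itself to sleep, waiting for something to show up. If it sleeps for too long, it wakes itself up and destroys itself, allowing the system to reclaim the resources (kernel object, stack, TEB, and so on) used by the thread. The thread pool quickly creates worker threads so that the number of worker threads equals the value passed to ThreadPool’s SetMinThreads method. If you never call this method (and it’s recommended that you never call it), the default value equals the number of CPUs your process is allowed to use, as determined by your process’s affinity mask. Usually your process is allowed to use all the CPUs on the machine, so the thread pool quickly creates worker threads up to the number of CPUs on the machine. After creating this many threads, the thread pool monitors the completion rate of work items. If work items take too long to complete (exactly how long is not documented), the thread pool creates more worker threads. If work items start completing quickly, worker threads are destroyed.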