14.并发与异步 - 2.任务Task -《果壳中的c#》

Task.png

线程是创建并发的底层工具,因此具有一定的局限性。

  • 没有简单的方法可以从联合(Join)线程得到“返回值”。因此必须创建一些共享域。当抛出一个异常时,捕捉和处理异常也是麻烦的。
  • 线程完成之后,无法再次启动该线程。相反,只能联合(Join)它(在进程阻塞当前线程)。

与线程相比,Task是一个更高级的抽象概念,它标识一个通过或不通过线程实现的并发操作。
任务是可组合的——使用延续将它们串联在一起。它们可以使用线程池减少启动延迟,而且它们可以通过TaskCompletionSource使用回调方法,避免多个线程同时等待I/O密集操作。

14.3.1 启动任务

从Framework 4.5开始,启动一个由后台线程实现的Task,最简单的方法是使用静态方法Task.Run。调用时需要传入一个Action代理:

Task.Run(() => Console.WriteLine("hello"));

Task.Run是Framework 4.5新引入的方法,在Framework 4.0中,调用Task.Factory.StartNew,可以实现相同效果,前者相当于后者的快捷方式。

Task默认使用线程池,它们都是后台线程。意味当主线程结束时,所有任务都会随之停止。因此,要在控制台应用程序中运行这些例子,必须在启动任务之后阻塞主线程。例如,挂起(Waiting)该让你误,或者调用Console.ReadLine:

    static void Main(string[] args)
    {
        Task.Run(() => Console.WriteLine("Foo"));
        Console.ReadLine();
    }

采用这种方式调用Task.Run,与下面启动线程方式类似(唯一不同的是没有隐含使用线程池):

new Thread(() => Console.WriteLine("Foo")).Start();

Task.Run会返回一个Task对象,它可以用来监控任务执行过程,这一点与Thread对象不同。(这里没有调用Start,因为Task.Run创建是“热”任务;相反,想创建“冷”任务,必须使用Task构造函数,但这种方法在实践中很少用)

任务的Status属性可用于跟踪任务的执行状态。

1.等待(Wait)

调用Wait方法,可以阻塞任务,直至任务完成,效果等同于Thread.Join

    Task task = Task.Run(() =>
    {
        Thread.Sleep(2000);
        Console.WriteLine("Foo");
    });
    Console.WriteLine(task.IsCompleted); //False
    task.Wait();//阻塞,直至任务完成
    Console.WriteLine(task.IsCompleted); //True
    Console.ReadLine();

可以在Wait中指定一个超时时间和一个取消令牌。

2.长任务

默认情况下,CLR会运行在池化线程上,这种线程非常适合执行短计算密集作业。如果要执行长阻塞操作,则可以按下面方式避免使用池化线程:

    Task task = Task.Factory.StartNew(() =>
    {
        Console.WriteLine("Task started");
        Thread.Sleep(2000);
        Console.WriteLine("Foo");
    }, TaskCreationOptions.LongRunning);

    task.Wait();  // Blocks until task is complete

提示:
在池化线程上运行一个长任务问题并不大,但是如果要同时运行多个长任务(特别会阻塞的任务),则会对性能产生影响。在这种情况下,通常更好的方法是使用TaskCreationOptions.LongRunning:

  • 如果运行I/O密集任务,则可以使用TaskCompletionSource和异步函数,通过回调函数(延续)实现并发性,而不通过线程实现。
  • 如果是运行计算密集任务,则可以使用一个生产者/消费者队列,控制这些任务的并发数量,避免出现线程和进程阻塞的问题。

14.3.2 返回值

Task<TResult>允许任务返回一个值。调用Task.Run,传入一个Func<TResult>代理(或者兼容的Lambda表达式),代替Action,就可以获得一个Task<TResult>:

Task<int> task = Task.Run (() => { Console.WriteLine ("Foo"); return 3; });

int result = task.Result;      // Blocks if not already finished
Console.WriteLine (result);    // 3

下面的例子创建一个任务,它使用LINQ就按前3百万个整数(从2开始)中的素数个数:

    Task<int> primeNumberTask = Task.Run(() =>
        Enumerable.Range(2, 3000000).Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0)));

    Console.WriteLine("Task running...");
    Console.WriteLine("The answer is " + primeNumberTask.Result);

这段代码会打印“Task running...”,然后几秒钟后打印216815。

14.3.3 异常

与线程不同,Task可以随时抛出异常。
任务代码抛出一个未处理异常,那么这个异常会自动传递到调用Wait()的任务上或者访问Task<TResult>Result属性的代码上:

    Task task = Task.Run(() => { throw null; });
    try
    {
        task.Wait();
    }
    catch (AggregateException aex)
    {
        if (aex.InnerException is NullReferenceException)
            Console.WriteLine("Null!");
        else
            throw;
    }

CLR会将异常封装在AggregateException中,从而更适合并行编程场景;

使用Task的IsFaultedIsCanceled属性,就可以不重新抛出异常而检测出错的任务。
如果都返回false,则没有出错;
IsCanceledtrue,任务抛出 OperationCanceledOPeration
IsFaultedtrue,则任务抛出另一种异常,而Exception属性包含该错误。

1.异常和自主任务

使用静态事件 TaskScheduler.UnobservedTaskException,可以在全局范围订阅为监控的异常;处理这个事件,然后记录发生的错误,是一个很好的异常处理方法。

14.3.4 延续

延续(continuation)告诉任务在完成之后继续执行下面的操作。
延续通常由一个回调方法实现,它会在操作完成之后执行一次。

给一个任务附加延续的方法有两种

第一种是C# 5.0异步功能使用的方法GetAwaiter方法

Task<int> primeNumberTask = Task.Run (() =>
    Enumerable.Range (2, 3000000).Count (n => Enumerable.Range (2, (int)Math.Sqrt(n)-1).All (i => n % i > 0)));

//获取用于等待此 System.Threading.Tasks.Task<TResult>的等待者
var awaiter = primeNumberTask.GetAwaiter();
//将操作设置为当 System.Runtime.CompilerServices.TaskAwaiter<TResult> 对象停止等待异步任务完成时执行
awaiter.OnCompleted (() => 
{
    int result = awaiter.GetResult(); //异步任务完成后关闭等待任务
    Console.WriteLine (result);       //打印结果
});

调用GetAwaiter会返回一个等待者(awaiter)对象,它的方法会让先导(antecedent)任务(primeNumberTask)在完成(或出错)之后执行一个代理已经完成的任务也可以附加一个延续,这时延续就马上执行。

提示:
等待者可以是任意对象,但它必须包含前面所示两个方法(OnCompletedGetResult)和一个Boolean类型属性IsCompleted对象,它不需要实现包含所有这些成员的特定接口或继承特定基类。

调用GetResult()的好处在于,一旦先前的Task有异常,就会抛出该异常。而且该异常和之前演示的异常不同,它不需要经过AggregateException再包装了。

另一种附加延续的方法是调用任务的ContinueWith方法:

Task<int> primeNumberTask = Task.Run (() =>
    Enumerable.Range (2, 3000000).Count (n => Enumerable.Range (2, (int)Math.Sqrt(n)-1).All (i => n % i > 0)));

primeNumberTask.ContinueWith (antecedent => 
{
    int result = antecedent.Result;
    Console.WriteLine (result);          // Writes 123
});

ContinueWith本身返回一个Task,它非常适合添加更多延续。然而,任务出错,我们必须直接处理AggregateException,然后编写额外代码,将延续编列到UI应用程序。而非UI上下文中,如果要让延续运行在同一个线程上,则必须指定TaskContinuationOptions.ExcuteSynchronously;否则弹回线程池。

14.3.5 TaskCompletionSource

前面介绍Task.Run如何创建一个在池化(或非池化)线程运行代理的任务。另一种就是TaskCompletionSource。

TaskCompletionSource可以创建任务,不包含任何必须在后面启动和结束的操作。原理是提供一个可以手工操作的“附属”任务——和其他任务一样。然而,这个任务完全通过下面的方法由TaskCompletionSource对象控制:

public class TaskCompletionSource<TResult>
{
    public void SetCanceled();
    public void SetResult(TResult result);
    public void SetException(Exception exception);
    public bool TrySetCanceled();
    public bool TrySetException(Exception exception);
    ...
}

调用这些方法可以给任务发送信号,将任务修改为完成、异常或取消状态。
这些方法只能调用一次,如果多次调用SetCanceledSetResultSetException,将抛出异常,而Try***等方法则会返回false。

    var tcs = new TaskCompletionSource<int>();

    new Thread(() => { Thread.Sleep(5000); tcs.SetResult(42); }).Start();

    Task<int> task = tcs.Task;         // Our "slave" task.
    Console.WriteLine(task.Result);   // 42

使用TaskCompletionSource,可以编写自定义的Run方法:

        static void Main(string[] args)
        {
            Task<int> task = Run(() => { Thread.Sleep(5000); return 42; });
            Console.WriteLine(task.Result);
            Console.Read();

        }

        static Task<TResult> Run<TResult>(Func<TResult> function)
        {
            var tcs = new TaskCompletionSource<TResult>();
            new Thread(() =>
            {
                try { tcs.SetResult(function()); }
                catch (Exception ex) { tcs.SetException(ex); }
            }).Start();
            return tcs.Task;
        }

调用这个方法等同于使用TaskCreationOptions.LongRunning选项调用Task.Factory.StartNew,请求一个非池化线程

TaskCompletionSource真正作用是创建一个不绑定线程的任务。例如,假设一个任务需要等待5秒钟,然后返回数字42.我们可以使用Timer类实现,而不需要使用线程,由CLR在x毫秒之后触发一个事件:

    static void Main(string[] args)
    {
        var awaiter = GetAnswerToLife().GetAwaiter();
        awaiter.OnCompleted(() => Console.WriteLine(awaiter.GetResult()));
    }
    static Task<int> GetAnswerToLife()
    {
        var tcs = new TaskCompletionSource<int>();
        // Create a timer that fires once in 5000 ms:
        var timer = new System.Timers.Timer(5000) { AutoReset = false };
        timer.Elapsed += delegate { timer.Dispose(); tcs.SetResult(42); };
        timer.Start();
        return tcs.Task;
    }

通过给任务附加一个延续,就可以在不阻塞任何线程的前提下打印这个结果。

    var awaiter = GetAnswerToLife().GetAwaiter();
    awaiter.OnCompleted(() => Console.WriteLine(awaiter.GetResult()));

将延迟时间参数化,并且删除返回值,可以优化这段代码。并且将它变成一个通用的Delay方法。意味让它返回一个Task而不是Task<int>。然而,TaskCompletionSource没有泛型版本,因此无法创建一个非泛型任务。但变通方法很简单:因为Task<TResult>派生自Task,所以创建一个TaskCompletionSource<anything>,然后将它隐式转换为Task<anything>,就可以得到一个Task:

var tcs = new TaskCompletionSource<object>();
Task task = tcs.Task;

写出Delay方法,然后让它5秒打印“42”:

    static void Main(string[] args)
    {
        Delay(5000).GetAwaiter().OnCompleted(() => Console.WriteLine(42));
        Console.Read();
    }
    static Task Delay(int milliseconds)
    {
        var tcs = new TaskCompletionSource<object>();
        var timer = new System.Timers.Timer(milliseconds) { AutoReset = false };
        timer.Elapsed += delegate { timer.Dispose(); tcs.SetResult(null); };
        timer.Start();
        return tcs.Task;
    }

不在线程上使用TaskCompletionSource,意味着只有在延续启动时才创建线程。同时启动10000个这种操作,而不会出错或超出资源限制:

for (int i = 0; i < 10000; i++)
        Delay (5000).GetAwaiter().OnCompleted (() => Console.WriteLine (42));

14.3.6 Task.Delay

Task.DelayThread.Sleep的异步版本

Task.Delay(5000).GetAwaiter().OnCompleted(()=>Console.WriteLine(42));

或者

 Task.Delay(5000).ContinueWith(ant => Console.WriteLine(42));
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,099评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,473评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,229评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,570评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,427评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,335评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,737评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,392评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,693评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,730评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,512评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,349评论 3 314
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,750评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,017评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,290评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,706评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,904评论 2 335

推荐阅读更多精彩内容

  • 14.5.2 编写异步函数 编译器会扩展异步函数,它会将任务返回给使用TaskCompletionSource的代...
    大捕猎店阅读 1,282评论 0 2
  • 今日逛宜家,与先生同行,发现先生简直就不是凡人啊 逛宜家的时候只看床和沙发,并且一眼否定,我不要这个床,这个沙发太...
    趣悦书阅读 281评论 1 0
  • 京鲁大战历来火爆,在本场比赛开始前,两队在中超联赛一共交锋25次,国安战绩为8胜8平9负,进39球丢39球,两队...
    奋斗星人_小帆阅读 201评论 4 0
  • 回到梅州第二天,感觉还好吧,踏实。 不过,下午睡过头,跑去提款机才发现银行卡里钱还没到账,心一横,既然已经出来了,...
    恐血阅读 194评论 0 0
  • 令猴们尊重不已的大圣: 还好吗?昨天聊了许多,简单回顾了我记忆当中您初级阶段滴亮眼表现,也是我命不该绝,您救了...
    大大师阅读 309评论 0 1