Warning: error_log(/data/www/wwwroot/hmttv.cn/caches/error_log.php): failed to open stream: Permission denied in /data/www/wwwroot/hmttv.cn/phpcms/libs/functions/global.func.php on line 537 Warning: error_log(/data/www/wwwroot/hmttv.cn/caches/error_log.php): failed to open stream: Permission denied in /data/www/wwwroot/hmttv.cn/phpcms/libs/functions/global.func.php on line 537
錄
考慮到深度學(xué)習(xí)領(lǐng)域中的數(shù)據(jù)規(guī)模一般都比較大,尤其是訓(xùn)練集,這個(gè)限制條件對(duì)應(yīng)到實(shí)際編程中就意味著,我們很有可能無(wú)法將整個(gè)數(shù)據(jù)文件的內(nèi)容全部都加載到內(nèi)存中。那么就需要一些特殊的處理方式,比如:創(chuàng)建內(nèi)存映射文件來(lái)替代原始文件被加載到內(nèi)存中、預(yù)處理數(shù)據(jù)后再加載內(nèi)存中以及單次只加載文件的片段。其中關(guān)于內(nèi)存映射技術(shù)的一些應(yīng)用,在前面的這2篇博客1和博客2中有所介紹,而本文將要介紹的是從文件中只讀取特定行的內(nèi)容的3種解決方案。
在python中如果要將一個(gè)文件完全加載到內(nèi)存中,通過(guò)file.readlines()即可,但是在文件占用較高時(shí),我們是無(wú)法完整的將文件加載到內(nèi)存中的,這時(shí)候就需要用到python的file.readline()進(jìn)行迭代式的逐行讀取:
filename = 'hello.txt'
with open(filename, 'r') as file:
line = file.readline()
counts = 1
while line:
if counts >= 50000000:
break
line = file.readline()
counts += 1
這里我們的實(shí)現(xiàn)方式是先用一個(gè)with語(yǔ)句打開(kāi)一個(gè)文件,然后用readline()函數(shù)配合while循環(huán)逐行加載,最終通過(guò)一個(gè)序號(hào)標(biāo)記來(lái)結(jié)束循環(huán)遍歷,輸出文件第50000000行的內(nèi)容。該代碼的執(zhí)行效果如下:
dechin@ubuntu2004:~/projects/gitlab/dechin/$ time python3 get_line.py
real 0m10.359s
user 0m10.062s
sys 0m0.296s
雖然在python的readline函數(shù)中并沒(méi)有實(shí)現(xiàn)讀取指定行內(nèi)容的方案,但是在另一個(gè)庫(kù)linecache中是實(shí)現(xiàn)了的,由于使用的方式較為簡(jiǎn)單,這里直接放上代碼示例供參考:
filename = 'hello.txt'
import linecache
text = linecache.getline(filename, 50000000)
該代碼的執(zhí)行結(jié)果如下:
dechin@ubuntu2004:~/projects/gitlab/dechin/$ time python3 get_line.py
real 0m11.904s
user 0m5.672s
sys 0m6.231s
雖然在實(shí)現(xiàn)方式上簡(jiǎn)化了許多,但是我們發(fā)現(xiàn)這個(gè)實(shí)現(xiàn)的用時(shí)超過(guò)了11s,還不如我們自己手動(dòng)實(shí)現(xiàn)的循環(huán)遍歷方案。因此如果是對(duì)于性能有一定要求的場(chǎng)景,是不建議采用這個(gè)方案的。
我們知道用Linux系統(tǒng)本身自帶的sed指令也是可以獲取到文件指定行或者是指定行范圍的數(shù)據(jù)的,其執(zhí)行指令為:sed -n 50000000p filename即表示讀取文件的第50000000行的內(nèi)容。同時(shí)結(jié)合python的話(huà),我們可以在python代碼中執(zhí)行系統(tǒng)指令并獲取輸出結(jié)果:
filename = 'hello.txt'
import os
result = os.popen('sed -n {}p {}'.format(50000000, filename)).read()
需要注意的是,如果直接運(yùn)行os.system()是沒(méi)有返回值的,只有os.popen()是有返回值的,并且需要在尾巴加上一個(gè)read()的選項(xiàng)。該代碼的執(zhí)行結(jié)果如下:
dechin@ubuntu2004:~/projects/gitlab/dechin/$ time python3 get_line.py
real 0m2.532s
user 0m0.032s
sys 0m0.020s
可以看到直接使用sed指令的執(zhí)行速度很快,但是用這種方法并不是一本萬(wàn)利的,比如以下這個(gè)例子:
filename = 'hello.txt'
import os
result = os.popen('sed -n {}p {}'.format(500, filename)).read()
我們把讀取第50000000行內(nèi)容改為讀取第500行的內(nèi)容,再運(yùn)行一次程序:
dechin@ubuntu2004:~/projects/gitlab/dechin/$ time python3 get_line.py
real 0m2.540s
user 0m0.037s
sys 0m0.013s
然而我們發(fā)現(xiàn)這個(gè)速度并沒(méi)有因?yàn)橐x取的行數(shù)減少了而變少,而是幾乎保持不變的。
本文通過(guò)4個(gè)測(cè)試案例分析了在python中讀取文件指定行內(nèi)容的方案,并得到了一些運(yùn)行耗時(shí)的數(shù)據(jù)。從需求上來(lái)說(shuō),如果是對(duì)于小規(guī)模的數(shù)據(jù),比如幾百行規(guī)模的數(shù)據(jù),建議使用readline循環(huán)遍歷來(lái)操作,速度也相當(dāng)不錯(cuò),或者是linecache中的函數(shù)實(shí)現(xiàn)也是可以的,甚至可以直接用readlines將整個(gè)文本內(nèi)容加載到內(nèi)存中。但是對(duì)于數(shù)據(jù)規(guī)模比較大的場(chǎng)景,比如超過(guò)了千萬(wàn)行的級(jí)別,那么使用sed指令的方式對(duì)指定行內(nèi)容進(jìn)行讀取的方式,應(yīng)該是所有方式中最快速的。
本文首發(fā)鏈接為:https://www.cnblogs.com/dechinphy/p/lbl.html
作者ID:DechinPhy
更多原著文章請(qǐng)參考:https://www.cnblogs.com/dechinphy/
打賞專(zhuān)用鏈接:https://www.cnblogs.com/dechinphy/gallery/image/379634.html
騰訊云專(zhuān)欄同步:https://cloud.tencent.com/developer/column/91958
頭條號(hào)同步:https://www.toutiao.com/c/user/token/MS4wLjABAAAA0XQA38KoijOGhh-GLoqYttlkilR7BRepm5pujqXuidY/?tab=all
>微服務(wù)環(huán)境搭建本章中,我們將介紹旨在利用多核處理器的多線程 API 和構(gòu)造:
這些構(gòu)造統(tǒng)稱(chēng)為(松散地)并行 (PFX)。并行類(lèi)與任務(wù)并行性構(gòu)造一起稱(chēng)為 (TPL)。
在閱讀本章之前,您需要熟悉第 中的基礎(chǔ)知識(shí),尤其是鎖定、線程安全和 Task 類(lèi)。
.NET 提供了許多其他專(zhuān)用 API 來(lái)幫助進(jìn)行并行和異步編程:
在過(guò)去的 15 年里,CPU 制造商已經(jīng)從單核處理器轉(zhuǎn)向多核處理器。這對(duì)我們程序員來(lái)說(shuō)是有問(wèn)題的,因?yàn)閱尉€程代碼不會(huì)因?yàn)檫@些額外的內(nèi)核而自動(dòng)運(yùn)行得更快。
對(duì)于大多數(shù)服務(wù)器應(yīng)用程序來(lái)說(shuō),使用多個(gè)內(nèi)核很容易,其中每個(gè)線程都可以獨(dú)立處理單獨(dú)的客戶(hù)端請(qǐng)求,但在桌面上則更加困難,因?yàn)樗ǔP枰@取計(jì)算密集型代碼并執(zhí)行以下操作:
盡管您可以使用經(jīng)典的多線程構(gòu)造完成所有這些操作,但它很尷尬,尤其是分區(qū)和整理的步驟。另一個(gè)問(wèn)題是,當(dāng)許多線程同時(shí)處理相同的數(shù)據(jù)時(shí),通常的線程安全鎖定策略會(huì)導(dǎo)致大量爭(zhēng)用。
PFX 庫(kù)專(zhuān)為幫助這些方案而設(shè)計(jì)。
利用多核或多處理器的編程稱(chēng)為。這是更廣泛的多線程概念的子集。
在線程之間劃分工作有兩種策略:。
當(dāng)必須對(duì)許多數(shù)據(jù)值執(zhí)行一組任務(wù)時(shí),我們可以通過(guò)讓每個(gè)線程對(duì)值子集執(zhí)行(相同的)任務(wù)集來(lái)并行化。這稱(chēng)為數(shù)據(jù),因?yàn)槲覀冊(cè)诰€程之間對(duì)進(jìn)行分區(qū)。相反,使用任務(wù),我們對(duì)進(jìn)行分區(qū);換句話(huà)說(shuō),我們讓每個(gè)線程執(zhí)行不同的任務(wù)。
通常,數(shù)據(jù)并行性更容易,并且可以更好地?cái)U(kuò)展到高度并行的硬件,因?yàn)樗梢詼p少或消除共享數(shù)據(jù)(從而減少爭(zhēng)用和線程安全問(wèn)題)。此外,數(shù)據(jù)并行性利用了數(shù)據(jù)值通常比離散任務(wù)更多的事實(shí),從而增加了并行性的潛力。
數(shù)據(jù)并行性也有利于結(jié)構(gòu)化并行,這意味著并行工作單元在程序中的同一位置開(kāi)始和結(jié)束。相比之下,任務(wù)并行性往往是非結(jié)構(gòu)化的,這意味著并行工作單元可能在分散在程序中的位置開(kāi)始和結(jié)束。結(jié)構(gòu)化并行性更簡(jiǎn)單,更不容易出錯(cuò),允許您將分區(qū)和線程協(xié)調(diào)(甚至結(jié)果排序)的困難工作轉(zhuǎn)移到庫(kù)中。
PFX 包含兩層功能,如圖 所示。較高層由兩個(gè) API 組成:PLINQ 和 Parallel 類(lèi)。下層包含任務(wù)并行類(lèi),以及一組有助于并行編程活動(dòng)的附加構(gòu)造。
PLINQ 提供了最豐富的功能:它自動(dòng)執(zhí)行并行化的所有步驟,包括將工作劃分為任務(wù)、在線程上執(zhí)行這些任務(wù)以及將結(jié)果整理到單個(gè)輸出序列中。它稱(chēng)為聲明式,因?yàn)槟恍枰⑿谢墓ぷ鳎鷮⑵錁?gòu)造為 LINQ 查詢(xún)),并讓運(yùn)行時(shí)處理實(shí)現(xiàn)細(xì)節(jié)。相比之下,其他方法是的,因?yàn)槟枰@式編寫(xiě)代碼來(lái)分區(qū)或整理。如以下概要所示,對(duì)于并行類(lèi),您必須自己整理結(jié)果;使用任務(wù)并行構(gòu)造,您還必須自己對(duì)工作進(jìn)行分區(qū):
分區(qū)工作 | 整理結(jié)果 | |
普林克 | 是的 | 是的 |
并行類(lèi) | 是的 | 不 |
PFX 的任務(wù)并行性 | 不 | 不 |
并發(fā)集合和旋轉(zhuǎn)基元可幫助您進(jìn)行較低級(jí)別的并行編程活動(dòng)。這些很重要,因?yàn)?PFX 不僅適用于當(dāng)今的硬件,還適用于具有更多內(nèi)核的未來(lái)幾代處理器。如果你想移動(dòng)一堆切碎的木頭,并且你有32名工人來(lái)完成這項(xiàng)工作,那么最大的挑戰(zhàn)是在工人互不妨礙的情況下移動(dòng)木材。這與在 32 個(gè)內(nèi)核之間劃分算法相同:如果使用普通鎖來(lái)保護(hù)公共資源,則由此產(chǎn)生的阻塞可能意味著這些內(nèi)核中只有一小部分實(shí)際上同時(shí)處于繁忙狀態(tài)。并發(fā)集合專(zhuān)門(mén)針對(duì)高并發(fā)訪問(wèn)進(jìn)行了優(yōu)化,重點(diǎn)是最小化或消除阻塞。PLINQ 和 Parallel 類(lèi)本身依賴(lài)于并發(fā)集合和旋轉(zhuǎn)基元來(lái)有效地管理工作。
PFX 的主要用例是:利用多核處理器來(lái)加速計(jì)算密集型代碼。
并行編程中的一個(gè)挑戰(zhàn)是阿姆達(dá)爾定律,該定律指出并行化的最大性能改進(jìn)由必須按順序執(zhí)行的代碼部分決定。例如,如果算法的執(zhí)行時(shí)間只有三分之二是可并行化的,則即使內(nèi)核數(shù)量無(wú)限,性能提升也永遠(yuǎn)不會(huì)超過(guò)三倍。
并行編程結(jié)構(gòu)不僅可用于利用多核,還可用于其他方案:
因此,在繼續(xù)之前,有必要驗(yàn)證瓶頸是否在可并行化代碼中。同樣值得考慮的是,計(jì)算密集型 - 優(yōu)化通常是最簡(jiǎn)單、最有效的方法。但是,需要權(quán)衡的是,某些優(yōu)化技術(shù)可能會(huì)使并行化代碼變得更加困難。
最簡(jiǎn)單的收益來(lái)自所謂的問(wèn)題 - 這是當(dāng)一個(gè)作業(yè)可以很容易地劃分為可以有效地自行執(zhí)行的任務(wù)時(shí)(結(jié)構(gòu)化并行性非常適合此類(lèi)問(wèn)題)。示例包括數(shù)學(xué)或密碼學(xué)中的許多圖像處理任務(wù)、光線追蹤和暴力破解方法。非尷尬并行問(wèn)題的一個(gè)例子是實(shí)現(xiàn)快速排序算法的優(yōu)化版本 - 一個(gè)好的結(jié)果需要一些思考,并且可能需要非結(jié)構(gòu)化并行性。
PLINQ 會(huì)自動(dòng)并行處理本地 LINQ 查詢(xún)。PLINQ 的優(yōu)點(diǎn)是易于使用,因?yàn)樗鼘⒐ぷ鞣謪^(qū)和結(jié)果排序規(guī)則的負(fù)擔(dān)卸載到 .NET。
若要使用 PLINQ,只需在輸入序列上調(diào)用 AsParallel(),然后像往常一樣繼續(xù) LINQ 查詢(xún)。以下查詢(xún)計(jì)算 3 到 100,000 之間的質(zhì)數(shù),充分利用目標(biāo)計(jì)算機(jī)上的所有內(nèi)核:
// Calculate prime numbers using a simple (unoptimized) algorithm.
IEnumerable<int> numbers = Enumerable.Range (3, 100000-3);
var parallelQuery =
from n in numbers.AsParallel()
where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)
select n;
int[] primes = parallelQuery.ToArray();
AsParallel 是 System.Linq.ParallelEnumerable 中的擴(kuò)展方法。它將輸入包裝在基于 ParallelQuery<TSource> 的序列中,這會(huì)導(dǎo)致隨后調(diào)用的 LINQ 查詢(xún)運(yùn)算符綁定到在 ParallelEnumerable 中定義的一組備用擴(kuò)展方法。它們提供了每個(gè)標(biāo)準(zhǔn)查詢(xún)運(yùn)算符的并行實(shí)現(xiàn)。本質(zhì)上,它們的工作原理是將輸入序列劃分為在不同線程上執(zhí)行的塊,將結(jié)果整理回單個(gè)輸出序列以供使用,如圖 所示。
調(diào)用 AsSequential() 將解開(kāi) ParallelQuery 序列,以便后續(xù)查詢(xún)運(yùn)算符綁定到標(biāo)準(zhǔn)查詢(xún)運(yùn)算符并按順序執(zhí)行。在調(diào)用具有副作用或不是線程安全的方法之前,這是必需的。
對(duì)于接受兩個(gè)輸入序列(聯(lián)接、組聯(lián)、康卡特、并集、相交、except 和 Zip)的查詢(xún)運(yùn)算符,必須將 AsParallel() 應(yīng)用于兩個(gè)輸入序列(否則將引發(fā)異常)。但是,您不需要在查詢(xún)進(jìn)行時(shí)繼續(xù)將 AsParallel 應(yīng)用于查詢(xún),因?yàn)?PLINQ 的查詢(xún)運(yùn)算符會(huì)輸出另一個(gè) ParallelQuery 序列。事實(shí)上,再次調(diào)用 AsParallel 會(huì)導(dǎo)致效率低下,因?yàn)樗鼤?huì)強(qiáng)制合并和重新分區(qū)查詢(xún):
mySequence.AsParallel() // Wraps sequence in ParallelQuery<int>
.Where (n => n > 100) // Outputs another ParallelQuery<int>
.AsParallel() // Unnecessary - and inefficient!
.Select (n => n * n)
并非所有查詢(xún)運(yùn)算符都可以有效地并行化。對(duì)于那些不能實(shí)現(xiàn)運(yùn)算符(請(qǐng)參閱 將按順序?qū)崿F(xiàn)運(yùn)算符。如果 PLINQ 懷疑并行化的開(kāi)銷(xiāo)實(shí)際上會(huì)減慢特定查詢(xún)的速度,則 PLINQ 也可能按順序運(yùn)行。
PLINQ 僅適用于本地集合:例如,它不適用于實(shí)體框架,因?yàn)樵谶@些情況下,LINQ 會(huì)轉(zhuǎn)換為 SQL,然后在數(shù)據(jù)庫(kù)服務(wù)器上執(zhí)行。但是, PLINQ 對(duì)從數(shù)據(jù)庫(kù)查詢(xún)獲取的結(jié)果集執(zhí)行其他本地查詢(xún)。
如果 PLINQ 查詢(xún)引發(fā)異常,則會(huì)將其重新引發(fā)為 AggregateException,其 InnerExceptions 屬性包含真正的異常(或多個(gè)異常)。有關(guān)更多詳細(xì)信息,請(qǐng)參閱
與普通的 LINQ 查詢(xún)一樣,PLINQ 查詢(xún)是延遲計(jì)算的。這意味著只有在您開(kāi)始使用結(jié)果時(shí)才會(huì)觸發(fā)執(zhí)行 — 通常通過(guò) foreach 循環(huán)(盡管它也可以通過(guò)轉(zhuǎn)換運(yùn)算符(如 ToArray 或返回單個(gè)元素或值的運(yùn)算符)觸發(fā))。
但是,在枚舉結(jié)果時(shí),執(zhí)行過(guò)程與普通順序查詢(xún)的執(zhí)行方式略有不同。順序查詢(xún)完全由使用者以“拉取”方式提供支持:輸入序列中的每個(gè)元素在使用者需要時(shí)準(zhǔn)確獲取。并行查詢(xún)通常使用獨(dú)立的線程從輸入序列中獲取元素,略于使用者需要它們的時(shí)間(更像新聞閱讀器的提詞器)。然后,它通過(guò)查詢(xún)鏈并行處理元素,將結(jié)果保存在一個(gè)小緩沖區(qū)中,以便它們?yōu)榘葱枋褂谜咦龊脺?zhǔn)備。如果使用者提前暫停或中斷枚舉,查詢(xún)處理器也會(huì)暫停或停止,以免浪費(fèi) CPU 時(shí)間或內(nèi)存。
您可以通過(guò)在 AsParallel 之后調(diào)用 WithMergeOptions 來(lái)調(diào)整 PLINQ 的緩沖行為。自動(dòng)緩沖的默認(rèn)值通常可提供最佳的總體結(jié)果。NotBuffered 禁用緩沖區(qū),如果您希望盡快看到結(jié)果,它很有用;完全緩沖在將整個(gè)結(jié)果集呈現(xiàn)給使用者之前對(duì)其進(jìn)行緩存(OrderBy 和反向運(yùn)算符自然以這種方式工作,元素、聚合和轉(zhuǎn)換運(yùn)算符也是如此)。
鑒于 AsParallel 透明地并行化 LINQ 查詢(xún),問(wèn)題就來(lái)了:為什么Microsoft不簡(jiǎn)單地并行化標(biāo)準(zhǔn)查詢(xún)運(yùn)算符并將 PLINQ 設(shè)為默認(rèn)值?
方法的原因有很多。首先,要使 PLINQ 有用,必須有合理數(shù)量的計(jì)算密集型工作才能將其分配給工作線程。大多數(shù) LINQ 到對(duì)象查詢(xún)的執(zhí)行速度都非常快;因此,不僅不需要并行化,而且分區(qū)、整理和協(xié)調(diào)額外線程的開(kāi)銷(xiāo)實(shí)際上可能會(huì)減慢速度。
此外:
最后,PLINQ 提供了相當(dāng)多的鉤子用于調(diào)整和調(diào)整。給標(biāo)準(zhǔn)的 LINQ to-Objects API 增加這些細(xì)微差別的負(fù)擔(dān)會(huì)增加分心。
并行化查詢(xún)運(yùn)算符的副作用是,在整理結(jié)果時(shí),結(jié)果的提交順序不一定相同(參見(jiàn))。換句話(huà)說(shuō),LINQ 對(duì)序列的正常順序保留保證不再有效。
如果需要順序保留,可以通過(guò)在 之后調(diào)用 AsOrdered() 來(lái)強(qiáng)制保留它:
myCollection.AsParallel().AsOrdered()...
調(diào)用 AsOrdered 會(huì)導(dǎo)致大量元素的性能下降,因?yàn)?PLINQ 必須跟蹤每個(gè)元素的原始位置。
您可以通過(guò)調(diào)用 AsUnordered 來(lái)抵消查詢(xún)中稍后的 AsOrdered 的影響:這將引入一個(gè)“隨機(jī)隨機(jī)洗牌點(diǎn)”,它允許點(diǎn)開(kāi)始更有效地執(zhí)行。因此,如果只想保留前兩個(gè)查詢(xún)運(yùn)算符的輸入序列順序,則可以執(zhí)行以下操作:
inputSequence.AsParallel().AsOrdered()
.QueryOperator1()
.QueryOperator2()
.AsUnordered() // From here on, ordering doesn’t matter
.QueryOperator3()
...
AsOrdered 不是默認(rèn)值,因?yàn)閷?duì)于大多數(shù)查詢(xún),原始輸入順序無(wú)關(guān)緊要。換句話(huà)說(shuō),如果 AsOrdered 是默認(rèn)值,則需要將 AsUnordered 應(yīng)用于大多數(shù)并行查詢(xún)以獲得最佳性能,這將很麻煩。
PLINQ 可以并行化的內(nèi)容存在實(shí)際限制。默認(rèn)情況下,以下查詢(xún)運(yùn)算符會(huì)阻止并行化,除非源元素位于其原始索引位置:
Select 、SelectMany 和 ElementAt 的索引版本
大多數(shù)查詢(xún)運(yùn)算符會(huì)更改元素的索引位置(包括刪除元素的運(yùn)算符,例如 Where )。這意味著,如果要使用前面的運(yùn)算符,它們通常需要位于查詢(xún)的開(kāi)頭。
以下查詢(xún)運(yùn)算符是可并行化的,但使用昂貴的分區(qū)策略,有時(shí)可能比順序處理慢:
連接 、分組依據(jù) 、組連接 、非重復(fù) 、 并集 、 相交 和 除外
聚合運(yùn)算符在其標(biāo)準(zhǔn)化身中的重載不可并行化 — PLINQ 提供了特殊的重載來(lái)處理此問(wèn)題(請(qǐng)參閱)。
所有其他運(yùn)算符都是可并行化的,盡管使用這些運(yùn)算符并不能保證查詢(xún)將被并行化。如果 PLINQ 懷疑并行化的開(kāi)銷(xiāo)會(huì)減慢該特定查詢(xún)的速度,則 PLINQ 可能會(huì)按順序運(yùn)行查詢(xún)。您可以通過(guò)在 AsParallel() 之后調(diào)用以下內(nèi)容來(lái)覆蓋此行為并強(qiáng)制并行:
.WithExecutionMode (ParallelExecutionMode.ForceParallelism)
假設(shè)我們要編寫(xiě)一個(gè)拼寫(xiě)檢查器,該拼寫(xiě)檢查器利用所有可用內(nèi)核快速運(yùn)行非常大的文檔。通過(guò)將我們的算法表述為 LINQ 查詢(xún),我們可以很容易地將其并行化。
第一步是將英語(yǔ)單詞詞典下載到HashSet中,以便高效查找:
if (!File.Exists ("WordLookup.txt") // Contains about 150,000 words
File.WriteAllText ("WordLookup.txt",
await new HttpClient().GetStringAsync (
"http://www.albahari.com/ispell/allwords.txt"));
var wordLookup = new HashSet<string> (
File.ReadAllLines ("WordLookup.txt"),
StringComparer.InvariantCultureIgnoreCase);
然后,我們使用單詞查找創(chuàng)建一個(gè)測(cè)試“文檔”,其中包含一百萬(wàn)個(gè)隨機(jī)單詞的數(shù)組。構(gòu)建數(shù)組后,讓我們引入幾個(gè)拼寫(xiě)錯(cuò)誤:
var random = new Random();
string[] wordList = wordLookup.ToArray();
string[] wordsToTest = Enumerable.Range (0, 1000000)
.Select (i => wordList [random.Next (0, wordList.Length)])
.ToArray();
wordsToTest [12345] = "woozsh"; // Introduce a couple
wordsToTest [23456] = "wubsie"; // of spelling mistakes.
現(xiàn)在我們可以通過(guò)測(cè)試單詞到測(cè)試來(lái)執(zhí)行并行拼寫(xiě)檢查 wordLookup .PLINQ 使這變得非常簡(jiǎn)單:
var query = wordsToTest
.AsParallel()
.Select ((word, index) => new IndexedWord { Word=word, Index=index })
.Where (iword => !wordLookup.Contains (iword.Word))
.OrderBy (iword => iword.Index);
foreach (var mistake in query)
Console.WriteLine (mistake.Word + " - index = " + mistake.Index);
// OUTPUT:
// woozsh - index = 12345
// wubsie - index = 23456
索引字是一個(gè)自定義結(jié)構(gòu),我們定義如下:
struct IndexedWord { public string Word; public int Index; }
謂詞中的 wordLookup.Contains 方法為查詢(xún)提供了一些“肉”,使其值得并行化。
我們可以通過(guò)使用匿名類(lèi)型而不是 IndexedWord 結(jié)構(gòu)來(lái)稍微簡(jiǎn)化查詢(xún)。但是,這會(huì)降低性能,因?yàn)槟涿?lèi)型(類(lèi),因此是引用類(lèi)型)會(huì)產(chǎn)生基于堆的分配和后續(xù)垃圾回收的成本。
對(duì)于順序查詢(xún),這種差異可能不足以影響,但對(duì)于并行查詢(xún),支持基于堆棧的分配可能非常有利。這是因?yàn)榛诙褩5姆峙涫歉叨瓤刹⑿谢模ㄒ驗(yàn)槊總€(gè)線程都有自己的堆棧),而所有線程必須競(jìng)爭(zhēng)同一個(gè)堆 - 由單個(gè)內(nèi)存管理器和垃圾回收器管理。
讓我們通過(guò)并行創(chuàng)建隨機(jī)測(cè)試詞列表本身來(lái)擴(kuò)展我們的示例。我們將其構(gòu)造為 LINQ 查詢(xún),因此應(yīng)該很容易。這是順序版本:
string[] wordsToTest = Enumerable.Range (0, 1000000)
.Select (i => wordList [random.Next (0, wordList.Length)])
.ToArray();
不幸的是,調(diào)用隨機(jī)。Next 不是線程安全的,因此它不像將 AsParallel() 插入查詢(xún)那么簡(jiǎn)單。一個(gè)可能的解決方案是編寫(xiě)一個(gè)鎖定隨機(jī)的函數(shù)。下一個(gè);但是,這將限制并發(fā)性。更好的選擇是使用 ThreadLocal<Random>(參見(jiàn)中的創(chuàng)建一個(gè)單獨(dú)的 Random 對(duì)象。然后,我們可以并行化查詢(xún),如下所示:
var localRandom = new ThreadLocal<Random>
( () => new Random (Guid.NewGuid().GetHashCode()) );
string[] wordsToTest = Enumerable.Range (0, 1000000).AsParallel()
.Select (i => wordList [localRandom.Value.Next (0, wordList.Length)])
.ToArray();
在用于實(shí)例化隨機(jī)對(duì)象的工廠函數(shù)中,我們傳入 Guid 的哈希碼,以確保如果在短時(shí)間內(nèi)創(chuàng)建兩個(gè)隨機(jī)對(duì)象,它們將產(chǎn)生不同的隨機(jī)數(shù)序列。
由于 PLINQ 在并行線程上運(yùn)行查詢(xún),因此必須注意不要執(zhí)行線程不安全的操作。特別是,寫(xiě)入,因此線程不安全:
// The following query multiplies each element by its position.
// Given an input of Enumerable.Range(0,999), it should output squares.
int i = 0;
var query = from n in Enumerable.Range(0,999).AsParallel() select n * i++;
我們可以通過(guò)使用鎖使增量 i 線程安全,但問(wèn)題仍然存在,i 不一定對(duì)應(yīng)于輸入元素的位置。將 AsOrdered 添加到查詢(xún)中并不能解決后一個(gè)問(wèn)題,因?yàn)?AsOrdered 只能確保元素的輸出順序與按順序處理的順序一致,而實(shí)際上它并不按順序它們。
正確的解決方案是重寫(xiě)我們的查詢(xún)以使用 Select 的索引版本:
在現(xiàn)有應(yīng)用程序中搜索 LINQ 查詢(xún)并嘗試并行化它們很誘人。這通常是無(wú)效的,因?yàn)?LINQ 顯然是最佳解決方案的大多數(shù)問(wèn)題往往執(zhí)行得非常快,因此無(wú)法從并行化中受益。更好的方法是查找 CPU 密集型瓶頸,然后考慮是否可以將其表示為 LINQ 查詢(xún)。(這種重組的一個(gè)受歡迎的副作用是 LINQ 通常使代碼更小且更具可讀性。
PLINQ 非常適合令人尷尬的并行問(wèn)題。然而,對(duì)于成像來(lái)說(shuō),這可能是一個(gè)糟糕的選擇,因?yàn)閷?shù)百萬(wàn)像素整理成一個(gè)輸出序列會(huì)產(chǎn)生瓶頸。相反,最好將像素直接寫(xiě)入數(shù)組或非托管內(nèi)存塊,并使用并行類(lèi)或任務(wù)并行性來(lái)管理多線程。(但是,使用 ForAll 可以擊敗結(jié)果排序規(guī)則 — 我們?cè)谥袑?duì)此進(jìn)行了討論。如果圖像處理算法自然適合 LINQ,則這樣做是有意義的。
var query = Enumerable.Range(0,999).AsParallel().Select ((n, i) => n * i);
為了獲得最佳性能,從查詢(xún)運(yùn)算符調(diào)用的任何方法都應(yīng)該是線程安全的,因?yàn)樗粚?xiě)入字段或?qū)傩裕ǚ歉弊饔没颍H绻鼈兺ㄟ^(guò)鎖定是線程安全的,則查詢(xún)的并行性潛力將受到持續(xù)時(shí)間除以在該函數(shù)中花費(fèi)的總時(shí)間的限制。
默認(rèn)情況下,PLINQ 為正在使用的處理器選擇最佳并行度。您可以通過(guò)在 AsParallel 之后調(diào)用 WithDegreeOfParallelism 來(lái)覆蓋它:
...AsParallel().WithDegreeOfPallelism(4)...
當(dāng)并行度增加到超出核心計(jì)數(shù)時(shí),一個(gè)例子是 I/O 密集型工作(例如,一次下載多個(gè)網(wǎng)頁(yè))。然而,任務(wù)組合器和異步函數(shù)提供了一種同樣簡(jiǎn)單和的解決方案(參見(jiàn)中的)。與任務(wù) s 不同,PLINQ 無(wú)法在不阻塞線程(和線程,使情況更糟)的情況下執(zhí)行 I/O 綁定工作。
在 PLINQ 查詢(xún)中只能調(diào)用一次 WithDegreeOfParallelism。如果需要再次調(diào)用它,則必須通過(guò)在查詢(xún)中再次調(diào)用 AsParallel() 來(lái)強(qiáng)制合并和重新分區(qū)查詢(xún):
"The Quick Brown Fox"
.AsParallel().WithDegreeOfParallelism (2)
.Where (c => !char.IsWhiteSpace (c))
.AsParallel().WithDegreeOfParallelism (3) // Forces Merge + Partition
.Select (c => char.ToUpper (c))
取消 PLINQ 查詢(xún)(在 foreach 循環(huán)中使用其結(jié)果)很容易:只需脫離 foreach ,查詢(xún)將自動(dòng)取消,因?yàn)槊杜e器是隱式釋放的。
對(duì)于以轉(zhuǎn)換、元素或聚合運(yùn)算符終止的查詢(xún),可以通過(guò)取消令牌從另一個(gè)線程它(請(qǐng)參閱中的)。若要插入令牌,請(qǐng)?jiān)谡{(diào)用 AsParallel 后調(diào)用 WithCancel,傳入 CancelTokenSource 對(duì)象的 Token 屬性。然后,另一個(gè)線程可以在令牌源上調(diào)用 Cancel,這會(huì)在查詢(xún)的使用者上引發(fā) OperationCanceledException:
IEnumerable<int> million = Enumerable.Range (3, 1000000);
var cancelSource = new CancellationTokenSource();
var primeNumberQuery =
from n in million.AsParallel().WithCancellation (cancelSource.Token)
where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)
select n;
new Thread (() => {
Thread.Sleep (100); // Cancel query after
cancelSource.Cancel(); // 100 milliseconds.
}
).Start();
try
{
// Start query running:
int[] primes = primeNumberQuery.ToArray();
// We'll never get here because the other thread will cancel us.
}
catch (OperationCanceledException)
{
Console.WriteLine ("Query canceled");
}
取消后,PLINQ 會(huì)等待每個(gè)工作線程完成其當(dāng)前元素,然后再結(jié)束查詢(xún)。這意味著查詢(xún)調(diào)用的任何外部方法都將運(yùn)行完成。
PLINQ 的優(yōu)勢(shì)之一是它可以方便地將并行化工作的結(jié)果整理到單個(gè)輸出序列中。但是,有時(shí),您最終對(duì)該序列所做的只是在每個(gè)元素上運(yùn)行一次某個(gè)函數(shù):
foreach (int n in parallelQuery)
DoSomething (n);
如果是這種情況,并且您不關(guān)心元素的處理順序,則可以使用 PLINQ 的 ForAll 方法提高效率。
ForAll 方法對(duì) ParallelQuery 的每個(gè)輸出元素運(yùn)行一個(gè)委托。它直接掛接到 PLINQ 的內(nèi)部,繞過(guò)整理和枚舉結(jié)果的步驟。這里有一個(gè)簡(jiǎn)單的例子:
"abcdef".AsParallel().Select (c => char.ToUpper(c)).ForAll (Console.Write);
顯示了該過(guò)程。
整理和枚舉結(jié)果并不是一項(xiàng)成本高昂的操作,因此當(dāng)有大量快速執(zhí)行的輸入時(shí),F(xiàn)orAll 優(yōu)化會(huì)產(chǎn)生最大的收益。
PLINQ 有三種分區(qū)策略,用于將輸入元素分配給線程:
策略 | 元素分配 | 相對(duì)性能 |
區(qū)塊分區(qū) | 動(dòng)態(tài) | 平均 |
范圍分區(qū) | 靜態(tài)的 | 從差到優(yōu) |
哈希分區(qū) | 靜態(tài)的 | 窮 |
對(duì)于需要比較元素( GroupBy 、 Join 、 GroupJoin 、 相交 、 except 、 并集 和 非重復(fù) )的查詢(xún)運(yùn)算符,您別無(wú)選擇:PLINQ 始終使用。哈希分區(qū)的效率相對(duì)較低,因?yàn)樗仨氼A(yù)先計(jì)算每個(gè)元素的哈希代碼(以便可以在同一線程上處理具有相同哈希代碼的元素)。如果發(fā)現(xiàn)這太慢,唯一的選擇是調(diào)用 AsSequential 以禁用并行化。
對(duì)于所有其他查詢(xún)運(yùn)算符,您可以選擇是使用范圍分區(qū)還是塊分區(qū)。默認(rèn)情況下:
簡(jiǎn)而言之,對(duì)于長(zhǎng)序列,范圍分區(qū)更快,每個(gè)元素都需要相似的 CPU 時(shí)間來(lái)處理。否則,塊分區(qū)通常更快。
要強(qiáng)制范圍分區(qū):
ParallelEnumerable.Range 不僅僅是調(diào)用 Enumerable.Range( ... ) 的快捷方式。作為并行() .它通過(guò)激活范圍分區(qū)來(lái)更改查詢(xún)的性能。
要強(qiáng)制塊分區(qū),請(qǐng)將輸入序列包裝在對(duì) Partitioner.Create(在 System.Collection.Concurrent 中)的調(diào)用中,如下所示:
int[] numbers = { 3, 4, 5, 6, 7, 8, 9 };
var parallelQuery =
Partitioner.Create (numbers, true).AsParallel()
.Where (...)
Partitioner.Create 的第二個(gè)參數(shù)表示您希望對(duì)查詢(xún),這是您希望塊分區(qū)的另一種說(shuō)法。
塊分區(qū)的工作原理是讓每個(gè)工作線程定期從輸入序列中抓取小的元素“塊”進(jìn)行處理(參見(jiàn))。PLINQ 首先分配非常小的塊(一次一個(gè)或兩個(gè)元素)。然后,它會(huì)隨著查詢(xún)的進(jìn)行增加塊大小:這可確保小序列有效地并行化,并且大序列不會(huì)導(dǎo)致過(guò)多的往返。如果一個(gè)工人碰巧得到了“簡(jiǎn)單”的元素(這個(gè)過(guò)程很快),它最終會(huì)得到更多的塊。該系統(tǒng)使每個(gè)線程保持同樣繁忙(并且內(nèi)核“平衡”);唯一的缺點(diǎn)是從共享輸入序列中獲取元素需要同步(通常是獨(dú)占鎖),這可能會(huì)導(dǎo)致一些開(kāi)銷(xiāo)和爭(zhēng)用。
范圍分區(qū)繞過(guò)正常的輸入端枚舉,并為每個(gè)工作線程預(yù)分配相同數(shù)量的元素,從而避免對(duì)輸入序列進(jìn)行爭(zhēng)用。但是,如果某些線程碰巧獲得簡(jiǎn)單的元素并提前完成,則它們將閑置,而其余線程將繼續(xù)工作。我們?cè)缙诘乃財(cái)?shù)計(jì)算器在范圍分區(qū)方面可能表現(xiàn)不佳。當(dāng)范圍分區(qū)可以很好地工作時(shí),計(jì)算前 10 萬(wàn)個(gè)整數(shù)的平方根之和的一個(gè)例子:
ParallelEnumerable.Range (1, 10000000).Sum (i => Math.Sqrt (i))
ParallelEnumerable.Range 返回一個(gè) ParallelQuery<T> ,因此您隨后不需要調(diào)用 AsParallel 。
范圍分區(qū)不一定在塊中分配元素范圍,而是選擇“條帶化”策略。例如,如果有兩個(gè)工作器,則一個(gè)工作器可能處理奇數(shù)元素,而另一個(gè)工作器處理偶數(shù)元素。TakeWhile 運(yùn)算符幾乎肯定會(huì)觸發(fā)條帶化策略,以避免在序列的后期不必要地處理元素。
PLINQ 可以有效地并行化 Sum、平均值、最小值和最大值運(yùn)算符,無(wú)需額外干預(yù)。但是,聚合運(yùn)算符給 PLINQ 帶來(lái)了特殊的挑戰(zhàn)。如所述,聚合執(zhí)行自定義聚合。例如,下面對(duì)數(shù)字序列求和,模仿 Sum 運(yùn)算符:
int[] numbers = { 1, 2, 3 };
int sum = numbers.Aggregate (0, (total, n) => total + n); // 6
我們還在第 中看到,對(duì)于聚合,提供的委托必須是關(guān)聯(lián)和交換的。如果違反此規(guī)則,PLINQ 將給出不正確的結(jié)果,因?yàn)樗鼜妮斎胄蛄兄刑崛《鄠€(gè)種子,以便同時(shí)聚合序列的分區(qū)。
顯式種子聚合似乎是 PLINQ 的安全選項(xiàng),但不幸的是,由于依賴(lài)于單個(gè)種子,這些聚合通常按順序執(zhí)行。為了緩解此問(wèn)題,PLINQ 提供了另一個(gè)聚合重載,允許您指定多個(gè)種子,或者更確切地說(shuō),指定。對(duì)于每個(gè)線程,它執(zhí)行此函數(shù)以生成一個(gè)單獨(dú)的種子,該種子成為累加器,它在本地聚合元素。
您還必須提供一個(gè)函數(shù)來(lái)指示如何組合本地和主累加器。最后,此聚合重載(有點(diǎn)無(wú)償)期望委托對(duì)結(jié)果執(zhí)行任何最終轉(zhuǎn)換(您可以通過(guò)之后自己對(duì)結(jié)果運(yùn)行某些函數(shù)來(lái)輕松實(shí)現(xiàn)此目的)。因此,以下是四個(gè)代表,按通過(guò)順序排列:
種子工廠
返回新的本地累加器
更新累加器功能
將元素聚合到本地累加器
組合蓄能器功能
將本地蓄能器與主蓄能器相結(jié)合
結(jié)果選擇器
對(duì)最終結(jié)果應(yīng)用任何最終轉(zhuǎn)換
在簡(jiǎn)單方案中,可以指定種子工廠。當(dāng)種子是要更改的引用類(lèi)型時(shí),此策略將失敗,因?yàn)槊總€(gè)線程將共享相同的實(shí)例。
舉一個(gè)非常簡(jiǎn)單的例子,下面對(duì)數(shù)字?jǐn)?shù)組中的值求和:
numbers.AsParallel().Aggregate (
() => 0, // seedFactory
(localTotal, n) => localTotal + n, // updateAccumulatorFunc
(mainTot, localTot) => mainTot + localTot, // combineAccumulatorFunc
finalResult => finalResult) // resultSelector
這個(gè)例子是人為的,因?yàn)槲覀兛梢允褂酶?jiǎn)單的方法(例如未播種聚合,或者更好的 Sum 運(yùn)算符)同樣有效地獲得相同的答案。舉一個(gè)更現(xiàn)實(shí)的例子,假設(shè)我們要計(jì)算給定字符串中英文字母表中每個(gè)字母的頻率。一個(gè)簡(jiǎn)單的順序解決方案可能如下所示:
string text = "Let’s suppose this is a really long string";
var letterFrequencies = new int[26];
foreach (char c in text)
{
int index = char.ToUpper (c) - 'A';
if (index >= 0 && index < 26) letterFrequencies [index]++;
};
輸入文本可能很長(zhǎng)的一個(gè)例子是基因測(cè)序。然后,“字母表”將由字母,,和組成。
為了并行化這一點(diǎn),我們可以將 foreach 語(yǔ)句替換為對(duì) Parallel.ForEach 的調(diào)用(我們將在下一節(jié)中介紹),但這將讓我們處理共享陣列上的并發(fā)問(wèn)題。鎖定訪問(wèn)該陣列幾乎會(huì)扼殺并行化的潛力。
聚合提供了一個(gè)整潔的解決方案。在這種情況下,累加器是一個(gè)數(shù)組,就像我們前面示例中的 letterFrequency 數(shù)組一樣。下面是使用聚合的順序版本:
int[] result =
text.Aggregate (
new int[26], // Create the "accumulator"
(letterFrequencies, c) => // Aggregate a letter into the accumulator
{
int index = char.ToUpper (c) - 'A';
if (index >= 0 && index < 26) letterFrequencies [index]++;
return letterFrequencies;
});
現(xiàn)在并行版本,使用 PLINQ 的特殊重載:
int[] result =
text.AsParallel().Aggregate (
() => new int[26], // Create a new local accumulator
(localFrequencies, c) => // Aggregate into the local accumulator
{
int index = char.ToUpper (c) - 'A';
if (index >= 0 && index < 26) localFrequencies [index]++;
return localFrequencies;
},
// Aggregate local->main accumulator
(mainFreq, localFreq) =>
mainFreq.Zip (localFreq, (f1, f2) => f1 + f2).ToArray(),
finalResult => finalResult // Perform any final transformation
); // on the end result.
請(qǐng)注意,局部累積函數(shù) localFrequency 數(shù)組。這種執(zhí)行此優(yōu)化的能力非常重要,并且是合法的,因?yàn)?localFrequency 是每個(gè)線程的本地。
PFX 通過(guò)并行類(lèi)中的三種靜態(tài)方法提供結(jié)構(gòu)化并行的基本形式:
Parallel.Invoke
并行執(zhí)行委托數(shù)組
Parallel.For
執(zhí)行 C# for 循環(huán)的并行等效項(xiàng)
Parallel.ForEach
執(zhí)行 C# foreach 循環(huán)的并行等效項(xiàng)
所有三種方法都會(huì)阻止,直到所有工作完成。與 PLINQ 一樣,在發(fā)生未經(jīng)處理的異常后,剩余的工作線程將在當(dāng)前迭代后停止,并將異常(或多個(gè)異常)拋回調(diào)用方 — 包裝在 AggregateException 中(請(qǐng)參閱)。
Parallel.Invoke 并行執(zhí)行一組 Action 委托,然后等待它們完成。該方法的最簡(jiǎn)單版本定義如下:
public static void Invoke (params Action[] actions);
與 PLINQ 一樣,并行 .* 方法針對(duì)計(jì)算密集型工作而非 I/O 密集型工作進(jìn)行了優(yōu)化。但是,一次下載兩個(gè)網(wǎng)頁(yè)提供了一種演示 Parallel.Invoke 的簡(jiǎn)單方法:
Parallel.Invoke (
() => new WebClient().DownloadFile ("http://www.linqpad.net", "lp.html"),
() => new WebClient().DownloadFile ("http://microsoft.com", "ms.html"));
從表面上看,這似乎是創(chuàng)建和等待兩個(gè)線程綁定 Task 對(duì)象的便捷快捷方式。但有一個(gè)重要的區(qū)別:如果你傳入一個(gè)包含一百萬(wàn)個(gè)委托的數(shù)組,Parallel.Invoke 仍然有效地工作。這是因?yàn)樗鼘⒋罅吭貫榉峙浣o少數(shù)基礎(chǔ)任務(wù)的批次,而不是為每個(gè)委托創(chuàng)建單獨(dú)的任務(wù)。
與 Parallel 的所有方法一樣,在整理結(jié)果時(shí),您只能靠自己。這意味著您需要牢記線程安全性。例如,以下內(nèi)容是線程不安全的:
var data = new List<string>();
Parallel.Invoke (
() => data.Add (new WebClient().DownloadString ("http://www.foo.com")),
() => data.Add (new WebClient().DownloadString ("http://www.far.com")));
鎖定添加到列表可以解決此問(wèn)題,但如果具有更大的快速執(zhí)行委托數(shù)組,則鎖定會(huì)產(chǎn)生瓶頸。更好的解決方案是使用線程安全集合,我們將在后面的部分中介紹 - 將是理想的選擇。
Parallel.Invoke 也被重載以接受 ParallelOptions 對(duì)象:
public static void Invoke (ParallelOptions options,
params Action[] actions);
使用 ParallelOptions ,您可以插入取消令牌、限制最大并發(fā)性以及指定自定義任務(wù)計(jì)劃程序。當(dāng)您執(zhí)行的任務(wù)(大約)多于核心時(shí),取消令牌是相關(guān)的:取消后,任何未啟動(dòng)的委托都將被放棄。但是,任何已經(jīng)執(zhí)行任務(wù)的代表將繼續(xù)完成。有關(guān)如何使用取消令牌的示例,請(qǐng)參閱
Parallel.For 和 Parallel.ForEach 執(zhí)行等效的 C# for 和 foreach 循環(huán),但每次迭代并行執(zhí)行,而不是按順序執(zhí)行。以下是他們的(最簡(jiǎn)單的)簽名:
public static ParallelLoopResult For (
int fromInclusive, int toExclusive, Action<int> body)
public static ParallelLoopResult ForEach<TSource> (
IEnumerable<TSource> source, Action<TSource> body)
此順序 for 循環(huán)
for (int i = 0; i < 100; i++)
Foo (i);
像這樣并行化
Parallel.For (0, 100, i => Foo (i));
或者更簡(jiǎn)單地說(shuō):
Parallel.For (0, 100, Foo);
而這個(gè)順序的 foreach
foreach (char c in "Hello, world")
Foo (c);
像這樣并行化:
Parallel.ForEach ("Hello, world", Foo);
舉一個(gè)實(shí)際的例子,如果我們導(dǎo)入 System.Security.Cryptography 命名空間,我們可以并行生成六個(gè)公鑰/私鑰對(duì)字符串,如下所示:
var keyPairs = new string[6];
Parallel.For (0, keyPairs.Length,
i => keyPairs[i] = RSA.Create().ToXmlString (true));
與 Parallel.Invoke 一樣,我們可以為 Parallel.For 和 Parallel.ForEach 提供大量的工作項(xiàng),它們將被有效地劃分為幾個(gè)任務(wù)。
后一個(gè)查詢(xún)也可以使用 PLINQ 完成:
string[] keyPairs =
ParallelEnumerable.Range (0, 6)
.Select (i => RSA.Create().ToXmlString (true))
.ToArray();
Parallel.For 和 Parallel.ForEach 通常在外部循環(huán)而不是內(nèi)部循環(huán)上效果最好。這是因?yàn)閷?duì)于前者,您將提供更大的工作塊來(lái)并行化,從而稀釋了管理開(kāi)銷(xiāo)。通常不需要并行化內(nèi)部和外部循環(huán)。在以下示例中,我們通常需要 100 多個(gè)內(nèi)核才能從內(nèi)部并行化中受益:
Parallel.For (0, 100, i =>
{
Parallel.For (0, 50, j => Foo (i, j)); // Sequential would be better
}); // for the inner loop.
有時(shí),了解循環(huán)迭代索引很有用。使用順序 foreach ,很容易:
int i = 0;
foreach (char c in "Hello, world")
Console.WriteLine (c.ToString() + i++);
但是,在并行上下文中,遞增共享變量不是線程安全的。您必須改用以下版本的 ForEach :
public static ParallelLoopResult ForEach<TSource> (
IEnumerable<TSource> source, Action<TSource,ParallelLoopState,long> body)
我們將忽略 ParallelLoopState(我們將在下一節(jié)中介紹)。現(xiàn)在,我們對(duì) Action 的第三個(gè)類(lèi)型參數(shù) long 感興趣,它表示循環(huán)索引:
Parallel.ForEach ("Hello, world", (c, state, i) =>
{
Console.WriteLine (c.ToString() + i);
});
為了將其置于實(shí)際上下文中,讓我們重新審視一下我們使用 PLINQ 編寫(xiě)的拼寫(xiě)檢查器。以下代碼加載一個(gè)字典以及一個(gè)包含一百萬(wàn)個(gè)單詞的數(shù)組進(jìn)行測(cè)試:
if (!File.Exists ("WordLookup.txt")) // Contains about 150,000 words
new WebClient().DownloadFile (
"http://www.albahari.com/ispell/allwords.txt", "WordLookup.txt");
var wordLookup = new HashSet<string> (
File.ReadAllLines ("WordLookup.txt"),
StringComparer.InvariantCultureIgnoreCase);
var random = new Random();
string[] wordList = wordLookup.ToArray();
string[] wordsToTest = Enumerable.Range (0, 1000000)
.Select (i => wordList [random.Next (0, wordList.Length)])
.ToArray();
wordsToTest [12345] = "woozsh"; // Introduce a couple
wordsToTest [23456] = "wubsie"; // of spelling mistakes.
我們可以使用 Parallel.ForEach 的索引版本對(duì)我們的 wordsToTest 數(shù)組執(zhí)行拼寫(xiě)檢查,如下所示:
var misspellings = new ConcurrentBag<Tuple<int,string>>();
Parallel.ForEach (wordsToTest, (word, state, i) =>
{
if (!wordLookup.Contains (word))
misspellings.Add (Tuple.Create ((int) i, word));
});
請(qǐng)注意,我們必須將結(jié)果整理到線程安全集合中:與使用 PLINQ 相比,必須這樣做是缺點(diǎn)。與 PLINQ 相比,它的優(yōu)勢(shì)在于我們避免了應(yīng)用索引選擇查詢(xún)運(yùn)算符的成本,而索引 Select 查詢(xún)運(yùn)算符的效率低于索引的 ForEach。
由于并行 For 或 ForEach 中的循環(huán)主體是委托,因此不能使用 break 語(yǔ)句提前退出循環(huán)。相反,您必須在 ParallelLoopState 對(duì)象上調(diào)用 Break 或 Stop:
public class ParallelLoopState
{
public void Break();
public void Stop();
public bool IsExceptional { get; }
public bool IsStopped { get; }
public long? LowestBreakIteration { get; }
public bool ShouldExitCurrentIteration { get; }
}
獲取 ParallelLoopState 很容易:所有版本的 For 和 ForEach 都被重載以接受 Action<TSource,ParallelLoopState> 類(lèi)型的循環(huán)體。所以,為了并行化這個(gè)
foreach (char c in "Hello, world")
if (c == ',')
break;
else
Console.Write (c);
這樣做:
Parallel.ForEach ("Hello, world", (c, loopState) =>
{
if (c == ',')
loopState.Break();
else
Console.Write (c);
});
// OUTPUT: Hlloe
從輸出中可以看出,循環(huán)主體可以按隨機(jī)順序完成。除了這種差異之外,調(diào)用 Break 至少會(huì)產(chǎn)生與按順序執(zhí)行循環(huán)相同的元素:此示例將始終至少按某種 、、l、 和 。 相反,調(diào)用 Stop 而不是 Break 會(huì)強(qiáng)制所有線程在其當(dāng)前迭代后立即完成。在我們的示例中,如果另一個(gè)線程滯后,調(diào)用 Stop 可以為我們提供字母 、、l、 和 的子集。 當(dāng)您找到要查找的內(nèi)容時(shí),或者當(dāng)出現(xiàn)問(wèn)題并且您不會(huì)查看結(jié)果時(shí),呼叫 Stop 非常有用。
Parallel.For 和 Parallel.ForEach 方法返回一個(gè) ParallelLoopResult 對(duì)象,該對(duì)象公開(kāi)名為 IsDone 和 LowestBreakIteration 的屬性。這些告訴您循環(huán)是否運(yùn)行完成;如果沒(méi)有,則指示循環(huán)在哪個(gè)周期中斷。
如果 LowestBreakIteration 返回 null,則表示您在循環(huán)中調(diào)用了 Stop(而不是 Break)。
如果循環(huán)體很長(zhǎng),則可能希望其他線程在方法主體中途中斷,以防早期中斷或停止。可以通過(guò)在代碼中的不同位置輪詢(xún) ShouldExitCurrentIteration 屬性來(lái)執(zhí)行此操作;此屬性在停止后立即變?yōu)?true,或者在中斷后不久變?yōu)?true。
ShouldExitCurrentIteration 在取消請(qǐng)求后也會(huì)變?yōu)?true,或者在循環(huán)中拋出異常。
IsExceptional 可讓您知道另一個(gè)線程上是否發(fā)生了異常。任何未處理的異常都將導(dǎo)致循環(huán)在每個(gè)線程的當(dāng)前迭代后停止:若要避免這種情況,必須在代碼中顯式處理異常。
Parallel.For 和 Parallel.ForEach 都提供了一組重載,這些重載具有一個(gè)名為 TLocal 的泛型類(lèi)型參數(shù)。這些重載旨在幫助您使用迭代密集型循環(huán)優(yōu)化數(shù)據(jù)排序規(guī)則。最簡(jiǎn)單的是這樣的:
public static ParallelLoopResult For <TLocal> (
int fromInclusive,
int toExclusive,
Func <TLocal> localInit,
Func <int, ParallelLoopState, TLocal, TLocal> body,
Action <TLocal> localFinally);
在實(shí)踐中很少需要這些方法,因?yàn)樗鼈兊哪繕?biāo)場(chǎng)景主要由 PLINQ 覆蓋(這是幸運(yùn)的,因?yàn)檫@些重載有點(diǎn)嚇人!
從本質(zhì)上講,問(wèn)題是這樣的:假設(shè)我們要對(duì)數(shù)字 1 到 10,000,000 的平方根求和。計(jì)算 10 萬(wàn)個(gè)平方根很容易并行化,但對(duì)它們的值求和很麻煩,因?yàn)槲覀儽仨氭i定更新總數(shù):
object locker = new object();
double total = 0;
Parallel.For (1, 10000000,
i => { lock (locker) total += Math.Sqrt (i); });
并行化的收益被獲得 10 萬(wàn)個(gè)鎖的成本以及由此產(chǎn)生的阻塞所抵消。
然而,現(xiàn)實(shí)情況是,我們實(shí)際上不需要10萬(wàn)把鎖。想象一下,一隊(duì)志愿者撿起大量垃圾。如果所有工人共用一個(gè)垃圾桶,那么旅行和爭(zhēng)用將使該過(guò)程效率極低。顯而易見(jiàn)的解決方案是讓每個(gè)工人都有一個(gè)私人或“本地”垃圾桶,偶爾會(huì)將其倒入主垃圾箱。
For 和 ForEach 的 TLocal 版本正是以這種方式工作的。志愿者是內(nèi)部工作線程,本地垃圾桶。要使執(zhí)行此作業(yè),必須向其提供兩個(gè)指示的附加委托:
此外,而不是返回 void 的主體委托,它應(yīng)該返回本地值的新聚合。下面是重構(gòu)的示例:
object locker = new object();
double grandTotal = 0;
Parallel.For (1, 10000000,
() => 0.0, // Initialize the local value.
(i, state, localTotal) => // Body delegate. Notice that it
localTotal + Math.Sqrt (i), // returns the new local total.
localTotal => // Add the local value
{ lock (locker) grandTotal += localTotal; } // to the master value.
);
我們?nèi)匀槐仨氭i定,但只能將局部值聚合到總計(jì)。這使得該過(guò)程大大提高了效率。
如前所述,PLINQ 通常非常適合這些場(chǎng)景。我們的示例可以像這樣與 PLINQ 并行化:
ParallelEnumerable.Range (1, 10000000)
.Sum (i => Math.Sqrt (i))
(請(qǐng)注意,我們使用 ParallelEnumerable 來(lái)強(qiáng)制:在這種情況下,這可以提高性能,因?yàn)樗袛?shù)字的處理時(shí)間都相同。
在更復(fù)雜的方案中,可以使用 LINQ 的聚合運(yùn)算符而不是 Sum 。如果您提供了本地種子工廠,則情況有點(diǎn)類(lèi)似于使用 Parallel.For 提供本地值函數(shù)。
是使用 PFX 并行化的最低級(jí)別方法。用于在此級(jí)別工作的類(lèi)在 System.Threading.Tasks 命名空間中定義,包括以下內(nèi)容:
類(lèi) | 目的 |
任務(wù) | 用于管理工作單元 |
任務(wù)<TResult> | 用于管理具有返回值的工作單位 |
任務(wù)工廠 | 用于創(chuàng)建任務(wù) |
TaskFactory<TResult> | 用于創(chuàng)建具有相同返回類(lèi)型的任務(wù)和延續(xù) |
任務(wù)計(jì)劃程序 | 用于管理任務(wù)計(jì)劃 |
任務(wù)完成源 | 用于手動(dòng)控制任務(wù)的工作流 |
我們?cè)谥薪榻B了任務(wù)的基礎(chǔ)知識(shí);在本節(jié)中,我們將介紹針對(duì)并行編程的任務(wù)的高級(jí)功能:
任務(wù)并行庫(kù)允許您以最小的開(kāi)銷(xiāo)創(chuàng)建數(shù)百(甚至數(shù)千)個(gè)任務(wù)。但是,如果要?jiǎng)?chuàng)建數(shù)百萬(wàn)個(gè)任務(wù),則需要將這些任務(wù)劃分為更大的工作單元以保持效率。并行類(lèi)和 PLINQ 會(huì)自動(dòng)執(zhí)行此操作。
Visual Studio 提供了一個(gè)用于監(jiān)視任務(wù)(調(diào)試→窗口→并行任務(wù))的窗口。這等效于“線程”窗口,但用于任務(wù)。“并行堆棧”窗口還具有用于任務(wù)的特殊模式。
如所述,Task.Run創(chuàng)建并啟動(dòng)一個(gè)任務(wù)或任務(wù)<> 。此方法實(shí)際上是調(diào)用 Task.Factory.StartNew 的快捷方式,它通過(guò)額外的重載提供了更大的靈活性。
Task.Factory.StartNew 允許您指定傳遞給目標(biāo)然后,目標(biāo)方法的簽名必須包含單個(gè)對(duì)象類(lèi)型參數(shù):
var task = Task.Factory.StartNew (Greet, "Hello");
task.Wait(); // Wait for task to complete.
void Greet (object state) { Console.Write (state); } // Hello
這避免了執(zhí)行調(diào)用 Greet 的 lambda 表達(dá)式所需的閉包成本。這是一個(gè)微優(yōu)化,在實(shí)踐中很少需要,因此我們可以更好地利用對(duì)象,即為任務(wù)分配一個(gè)有意義的名稱(chēng)。然后,我們可以使用 AsyncState 屬性來(lái)查詢(xún)其名稱(chēng):
var task = Task.Factory.StartNew (state => Greet ("Hello"), "Greeting");
Console.WriteLine (task.AsyncState); // Greeting
task.Wait();
void Greet (string message) { Console.Write (message); }
Visual Studio 在“并行任務(wù)”窗口中顯示每個(gè)任務(wù)的 AsyncState,因此在此處使用有意義的名稱(chēng)可以大大簡(jiǎn)化調(diào)試。
您可以通過(guò)在調(diào)用 StartNew(或?qū)嵗蝿?wù))時(shí)指定 TaskCreationOptions 枚舉來(lái)調(diào)整任務(wù)的執(zhí)行。TaskCreationOptions 是一個(gè)具有以下(可組合)值的標(biāo)志枚舉:
LongRunning, PreferFairness, AttachedToParent
LongRun 建議調(diào)度程序?qū)⒁粋€(gè)線程專(zhuān)用于任務(wù),正如我們中所述,這對(duì)于 I/O 綁定任務(wù)和長(zhǎng)時(shí)間運(yùn)行的任務(wù)是有益的,否則這些任務(wù)可能會(huì)迫使短期運(yùn)行的任務(wù)在調(diào)度之前等待不合理的時(shí)間。
PreferFairness 指示調(diào)度程序嘗試確保按啟動(dòng)順序調(diào)度任務(wù)。它通常可能會(huì)這樣做,因?yàn)樗褂帽镜毓ぷ鞲`取隊(duì)列在內(nèi)部?jī)?yōu)化任務(wù)調(diào)度 — 這種優(yōu)化允許創(chuàng)建任務(wù),而不會(huì)產(chǎn)生單個(gè)工作隊(duì)列產(chǎn)生的爭(zhēng)用開(kāi)銷(xiāo)。通過(guò)指定“附加到父項(xiàng)”來(lái)創(chuàng)建子任務(wù)。
當(dāng)一個(gè)任務(wù)啟動(dòng)另一個(gè)任務(wù)時(shí),您可以選擇建立父子:
Task parent = Task.Factory.StartNew (() =>
{
Console.WriteLine ("I am a parent");
Task.Factory.StartNew (() => // Detached task
{
Console.WriteLine ("I am detached");
});
Task.Factory.StartNew (() => // Child task
{
Console.WriteLine ("I am a child");
}, TaskCreationOptions.AttachedToParent);
});
子任務(wù)的特殊之處在于,當(dāng)您等待任務(wù)完成時(shí),它也會(huì)等待任何子任務(wù)。此時(shí),任何子異常都會(huì)冒泡:
TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
var parent = Task.Factory.StartNew (() =>
{
Task.Factory.StartNew (() => // Child
{
Task.Factory.StartNew (() => { throw null; }, atp); // Grandchild
}, atp);
});
// The following call throws a NullReferenceException (wrapped
// in nested AggregateExceptions):
parent.Wait();
當(dāng)子任務(wù)是延續(xù)時(shí),這可能特別有用,您很快就會(huì)看到。
我們?cè)谥锌吹剑梢酝ㄟ^(guò)調(diào)用其 Wait 方法或訪問(wèn)其 Result 屬性(如果它是 Task<TResult> )來(lái)等待單個(gè)任務(wù)。您還可以通過(guò)靜態(tài)方法 Task.WaitAll(等待所有指定任務(wù)完成)和 Task.WaitAny(僅等待一個(gè)任務(wù)完成)一次等待多個(gè)任務(wù)。
WaitAll 類(lèi)似于依次等待每個(gè)任務(wù),但效率更高,因?yàn)樗ㄗ疃啵┲恍枰粋€(gè)上下文切換。此外,如果一個(gè)或多個(gè)任務(wù)引發(fā)未經(jīng)處理的異常,WaitAll 仍會(huì)等待每個(gè)任務(wù)。然后,它會(huì)重新拋出一個(gè) AggregateException,該異常累積每個(gè)錯(cuò)誤任務(wù)的異常(這是 AggregateException 真正有用的地方)。這相當(dāng)于這樣做:
// Assume t1, t2 and t3 are tasks:
var exceptions = new List<Exception>();
try { t1.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); }
try { t2.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); }
try { t3.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); }
if (exceptions.Count > 0) throw new AggregateException (exceptions);
調(diào)用 WaitAny 等效于等待 ManualResetEventSlim,該任務(wù)在完成時(shí)由每個(gè)任務(wù)發(fā)出信號(hào)。
除了超時(shí)之外,您還可以將傳遞給 Wait 方法:這允許您取消等待,。
您可以選擇在啟動(dòng)任務(wù)時(shí)傳入取消令牌。然后,如果通過(guò)該令牌取消,任務(wù)本身將進(jìn)入“已取消”狀態(tài):
var cts = new CancellationTokenSource();
CancellationToken token = cts.Token;
cts.CancelAfter (500);
Task task = Task.Factory.StartNew (() =>
{
Thread.Sleep (1000);
token.ThrowIfCancellationRequested(); // Check for cancellation request
}, token);
try { task.Wait(); }
catch (AggregateException ex)
{
Console.WriteLine (ex.InnerException is TaskCanceledException); // True
Console.WriteLine (task.IsCanceled); // True
Console.WriteLine (task.Status); // Canceled
}
TaskCanceledException 是 OperationCanceledException 的一個(gè)子類(lèi)。如果要顯式拋出 OperationCanceledException(而不是調(diào)用令牌。ThrowIfCancelRequest),您必須將取消令牌傳遞到 OperationCanceledException 的構(gòu)造函數(shù)中。如果不這樣做,任務(wù)將不會(huì)以 TaskStatus.Canceled 狀態(tài)結(jié)束,也不會(huì)觸發(fā) OnlyOnCanceled 。
如果任務(wù)在啟動(dòng)之前被取消,則不會(huì)計(jì)劃它 - 操作已取消異常將立即拋給任務(wù)。
由于取消令牌由其他 API 識(shí)別,因此您可以將它們傳遞到其他構(gòu)造中,取消將無(wú)縫傳播:
var cancelSource = new CancellationTokenSource();
CancellationToken token = cancelSource.Token;
Task task = Task.Factory.StartNew (() =>
{
// Pass our cancellation token into a PLINQ query:
var query = someSequence.AsParallel().WithCancellation (token)...
... enumerate query ...
});
在此示例中調(diào)用 Cancel on cancelSource 將取消 PLINQ 查詢(xún),該查詢(xún)將在任務(wù)正文上引發(fā) OperationCanceledException,然后取消任務(wù)。
可以傳遞到“等待”和“取消和等待”等方法中的取消令牌允許您取消操作,而不是取消任務(wù)本身。
方法在任務(wù)結(jié)束后立即執(zhí)行委托:
Task task1 = Task.Factory.StartNew (() => Console.Write ("antecedant.."));
Task task2 = task1.ContinueWith (ant => Console.Write ("..continuation"));
一旦任務(wù) 1(任務(wù))完成、失敗或取消,任務(wù) 2()就會(huì)啟動(dòng)。(如果任務(wù) 1 在第二行代碼運(yùn)行之前已完成,則任務(wù) 2 將計(jì)劃立即執(zhí)行。傳遞給延續(xù)的 lambda 表達(dá)式的 ant 參數(shù)是對(duì)先驗(yàn)任務(wù)的引用。ContinueWith 本身會(huì)返回一個(gè)任務(wù),以便輕松添加進(jìn)一步的延續(xù)。
默認(rèn)情況下,前置任務(wù)和延續(xù)任務(wù)可以在不同的線程上執(zhí)行。您可以通過(guò)指定 TaskContinuationOptions.ExecuteSync 在調(diào)用 ContinueWith 時(shí)強(qiáng)制它們?cè)谕痪€程上執(zhí)行:這可以通過(guò)減少間接性來(lái)提高非常細(xì)粒度的延續(xù)的性能。
就像普通任務(wù)一樣,延續(xù)可以是 Task<TResult> 類(lèi)型并返回?cái)?shù)據(jù)。在下面的示例中,我們使用一系列鏈?zhǔn)饺蝿?wù)計(jì)算 Math.Sqrt(8*2),然后寫(xiě)出結(jié)果:
Task.Factory.StartNew<int> (() => 8)
.ContinueWith (ant => ant.Result * 2)
.ContinueWith (ant => Math.Sqrt (ant.Result))
.ContinueWith (ant => Console.WriteLine (ant.Result)); // 4
為了簡(jiǎn)單起見(jiàn),我們的例子有些做作;在現(xiàn)實(shí)生活中,這些 lambda 表達(dá)式會(huì)調(diào)用計(jì)算密集型函數(shù)。
延續(xù)可以通過(guò)查詢(xún)前置任務(wù)的 Exception 屬性來(lái)了解前置任務(wù)是否出錯(cuò),或者只是通過(guò)調(diào)用 Result / Wait 并捕獲生成的 AggregateException。如果先前的錯(cuò)誤,而延續(xù)也不存在,則異常被視為,并且靜態(tài) TaskScheduler.UnobservedTaskException 事件在稍后對(duì)任務(wù)進(jìn)行垃圾回收時(shí)觸發(fā)。
安全的模式是重新引發(fā)先前的異常。只要延續(xù)是 Wait edon,異常就會(huì)傳播并重新拋出到 Waiter:
Task continuation = Task.Factory.StartNew (() => { throw null; })
.ContinueWith (ant =>
{
ant.Wait();
// Continue processing...
});
continuation.Wait(); // Exception is now thrown back to caller.
處理異常的另一種方法是為特殊與非異常結(jié)果指定不同的延續(xù)。這是通過(guò)任務(wù)延續(xù)選項(xiàng)完成的:
Task task1 = Task.Factory.StartNew (() => { throw null; });
Task error = task1.ContinueWith (ant => Console.Write (ant.Exception),
TaskContinuationOptions.OnlyOnFaulted);
Task ok = task1.ContinueWith (ant => Console.Write ("Success!"),
TaskContinuationOptions.NotOnFaulted);
此模式在與子任務(wù)結(jié)合使用時(shí)特別有用,您很快就會(huì)看到。
以下擴(kuò)展方法“吞噬”任務(wù)的未處理異常:
public static void IgnoreExceptions (this Task task)
{
task.ContinueWith (t => { var ignore = t.Exception; },
TaskContinuationOptions.OnlyOnFaulted);
}
(這可以通過(guò)添加代碼來(lái)記錄異常來(lái)改進(jìn)。以下是它的使用方式:
Task.Factory.StartNew (() => { throw null; }).IgnoreExceptions();
延續(xù)的一個(gè)強(qiáng)大功能是,它們僅在所有子任務(wù)完成時(shí)啟動(dòng)(參見(jiàn))。此時(shí),子項(xiàng)引發(fā)的任何異常都將封送到延續(xù)中。
在下面的示例中,我們啟動(dòng)三個(gè)子任務(wù),每個(gè)子任務(wù)拋出一個(gè) NullReferenceException 。然后,我們通過(guò)父級(jí)的延續(xù)一舉捕獲所有這些:
TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
Task.Factory.StartNew (() =>
{
Task.Factory.StartNew (() => { throw null; }, atp);
Task.Factory.StartNew (() => { throw null; }, atp);
Task.Factory.StartNew (() => { throw null; }, atp);
})
.ContinueWith (p => Console.WriteLine (p.Exception),
TaskContinuationOptions.OnlyOnFaulted);
默認(rèn)情況下,無(wú)論前置任務(wù)完成、引發(fā)異常還是取消,都會(huì)無(wú)地安排延續(xù)。您可以通過(guò) TaskContinuationOptions 枚舉中包含的一組(可組合的)標(biāo)志來(lái)更改此行為。以下是控制條件延續(xù)的三個(gè)核心標(biāo)志:
NotOnRanToCompletion = 0x10000,
NotOnFaulted = 0x20000,
NotOnCanceled = 0x40000,
這些標(biāo)志是減法的,因?yàn)槟鷳?yīng)用的越多,執(zhí)行延續(xù)的可能性就越小。為方便起見(jiàn),還有以下預(yù)組合值:
OnlyOnRanToCompletion = NotOnFaulted | NotOnCanceled,
OnlyOnFaulted = NotOnRanToCompletion | NotOnCanceled,
OnlyOnCanceled = NotOnRanToCompletion | NotOnFaulted
(組合所有 Not* 標(biāo)志 [ NotOnRanToCompletion, NotOnFaulted , NotOnCanceled ] 是荒謬的,因?yàn)樗鼤?huì)導(dǎo)致延續(xù)總是。
“RanToComplete”表示前置成功,沒(méi)有取消或未處理的異常。
“錯(cuò)誤”表示在前置項(xiàng)上拋出未處理的異常。
“已取消”是指以下兩種情況之一:
必須掌握的是,當(dāng)延續(xù)沒(méi)有通過(guò)這些標(biāo)志執(zhí)行時(shí),延續(xù)不會(huì)被遺忘或放棄 - 它會(huì)被取消。這意味著延續(xù)本身的任何延續(xù),除非您使用 NotOnCanceled 謂詞它們。例如,考慮一下:
Task t1 = Task.Factory.StartNew (...);
Task fault = t1.ContinueWith (ant => Console.WriteLine ("fault"),
TaskContinuationOptions.OnlyOnFaulted);
Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"));
就目前而言,t3 將始終被調(diào)度 — 即使 t1 沒(méi)有引發(fā)異常(參見(jiàn))。這是因?yàn)槿绻?t1 成功,故障任務(wù)將被取消,并且對(duì) t3 沒(méi)有繼續(xù)限制,t3 將無(wú)條件執(zhí)行。
如果我們希望 t3 僅在錯(cuò)誤實(shí)際運(yùn)行時(shí)執(zhí)行,我們必須這樣做:
Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"),
TaskContinuationOptions.NotOnCanceled);
(或者,我們可以指定 OnlyOnRanToCompletion;不同之處在于,如果在錯(cuò)誤中拋出異常,t3 將不會(huì)執(zhí)行。
可以使用 TaskFactory 類(lèi)中的 ContinueWhenAll 和 ContinueWhenAny 方法,根據(jù)多個(gè)前因的完成情況來(lái)安排繼續(xù)執(zhí)行。然而,隨著(WhenAll和WhenAny)中討論的任務(wù)組合器的引入,這些方法變得多余。具體來(lái)說(shuō),給定以下任務(wù)
var task1 = Task.Run (() => Console.Write ("X"));
var task2 = Task.Run (() => Console.Write ("Y"));
我們可以安排在兩者都完成后執(zhí)行的延續(xù),如下所示:
var continuation = Task.Factory.ContinueWhenAll (
new[] { task1, task2 }, tasks => Console.WriteLine ("Done"));
下面是與 WhenAll 任務(wù)組合器相同的結(jié)果:
var continuation = Task.WhenAll (task1, task2)
.ContinueWith (ant => Console.WriteLine ("Done"));
對(duì)同一任務(wù)多次調(diào)用 ContinueWith 會(huì)在單個(gè)前因上創(chuàng)建多個(gè)延續(xù)。當(dāng)前置完成時(shí),所有延續(xù)將一起開(kāi)始(除非您指定 TaskContinuationOptions.ExecuteSyncly ,在這種情況下,延續(xù)將按順序執(zhí)行)。
以下內(nèi)容等待一秒鐘,然后寫(xiě)入 XY 或 YX:
var t = Task.Factory.StartNew (() => Thread.Sleep (1000));
t.ContinueWith (ant => Console.Write ("X"));
t.ContinueWith (ant => Console.Write ("Y"));
任務(wù)計(jì)劃程序?qū)⑷蝿?wù)分配給線程,并由抽象的任務(wù)類(lèi)表示。 .NET 提供了兩個(gè)具體的實(shí)現(xiàn):與 CLR 線程池協(xié)同工作的默認(rèn)計(jì)劃程序和。 后者(主要)旨在幫助您使用 WPF 和 Windows 窗體的線程模型,該模型要求用戶(hù)界面元素和控件只能從創(chuàng)建它們的線程訪問(wèn)(請(qǐng)參見(jiàn)中的)。通過(guò)捕獲它,我們可以指示任務(wù)或延續(xù)在此上下文中執(zhí)行:
// Suppose we are on a UI thread in a Windows Forms / WPF application:
_uiScheduler = TaskScheduler.FromCurrentSynchronizationContext();
假設(shè) Foo 是一個(gè)返回字符串的計(jì)算綁定方法,并且 lblResult 是一個(gè) WPF 或 Windows 窗體標(biāo)簽,那么我們可以在操作完成后安全地更新標(biāo)簽,如下所示:
Task.Run (() => Foo())
.ContinueWith (ant => lblResult.Content = ant.Result, _uiScheduler);
當(dāng)然,C# 的異步函數(shù)更常用于這種事情。
也可以編寫(xiě)我們自己的任務(wù)調(diào)度程序(通過(guò)子類(lèi)化 任務(wù)調(diào)度程序 ),盡管這只有在非常專(zhuān)業(yè)的情況下才會(huì)這樣做。對(duì)于自定義計(jì)劃,您更常使用 TaskCompletionSource。
當(dāng)您調(diào)用 Task.Factory 時(shí),您將在 Task 上調(diào)用返回默認(rèn) TaskFactory 對(duì)象的靜態(tài)屬性。任務(wù)工廠的目的是創(chuàng)建任務(wù);具體來(lái)說(shuō),有三種任務(wù):
創(chuàng)建任務(wù)的另一種方法是實(shí)例化任務(wù)并調(diào)用 Start 。但是,這允許您僅創(chuàng)建“普通”任務(wù),而不創(chuàng)建延續(xù)。
TaskFactory 不是一個(gè)工廠:您實(shí)際上可以實(shí)例化類(lèi),當(dāng)您想要使用相同(非標(biāo)準(zhǔn))值為 TaskCreationOptions、TaskContinuationOptions 或 TaskScheduler 重復(fù)創(chuàng)建任務(wù)時(shí),這很有用。例如,如果我們想重復(fù)創(chuàng)建長(zhǎng)時(shí)間運(yùn)行的任務(wù),我們可以創(chuàng)建一個(gè)自定義工廠,如下所示:
var factory = new TaskFactory (
TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent,
TaskContinuationOptions.None);
然后,只需在工廠調(diào)用 StartNew,即可創(chuàng)建任務(wù):
Task task1 = factory.StartNew (Method1);
Task task2 = factory.StartNew (Method2);
...
自定義繼續(xù)選項(xiàng)在調(diào)用 ContinueWhenAll 和 ContinueWhenAny 時(shí)應(yīng)用。
正如我們所看到的,PLINQ、并行類(lèi)和任務(wù)會(huì)自動(dòng)將異常封送到使用者。若要了解為什么這是必不可少的,請(qǐng)考慮以下 LINQ 查詢(xún),該查詢(xún)?cè)诘谝淮蔚鷷r(shí)引發(fā) DivideByZeroException:
try
{
var query = from i in Enumerable.Range (0, 1000000)
select 100 / i;
...
}
catch (DivideByZeroException)
{
...
}
如果我們要求 PLINQ 并行化此查詢(xún),并且它忽略了異常的處理,則可能會(huì)在上拋出 DivideByZeroException,從而繞過(guò)我們的 catch 塊并導(dǎo)致應(yīng)用程序死亡。
因此,會(huì)自動(dòng)捕獲異常并將其重新拋出給調(diào)用方。但不幸的是,這并不像捕獲DivideByZeroException那么簡(jiǎn)單。由于這些庫(kù)使用許多線程,因此實(shí)際上可以同時(shí)引發(fā)兩個(gè)或多個(gè)異常。因此,為了確保報(bào)告所有異常,異常將包裝在 AggregateException 容器中,該容器公開(kāi)一個(gè)包含每個(gè)捕獲異常的 InnerExceptions 屬性:
try
{
var query = from i in ParallelEnumerable.Range (0, 1000000)
select 100 / i;
// Enumerate query
...
}
catch (AggregateException aex)
{
foreach (Exception ex in aex.InnerExceptions)
Console.WriteLine (ex.Message);
}
PLINQ 和 Parallel 類(lèi)在遇到第一個(gè)異常時(shí)都會(huì)結(jié)束查詢(xún)或循環(huán)執(zhí)行,方法是不處理任何其他元素或循環(huán)體。但是,在當(dāng)前周期完成之前,可能會(huì)引發(fā)更多異常。AggregateException 中的第一個(gè)異常在 InnerException 屬性中可見(jiàn)。
AggregateException 類(lèi)提供了幾種方法來(lái)簡(jiǎn)化異常處理:Flatten 和 Handle 。
AggregateExceptions 通常會(huì)包含其他 AggregateExceptions。發(fā)生這種情況的一個(gè)示例是子任務(wù)引發(fā)異常。您可以通過(guò)調(diào)用 Flatten 來(lái)消除任何級(jí)別的嵌套以簡(jiǎn)化處理。此方法返回一個(gè)新的 AggregateException,其中包含內(nèi)部異常的簡(jiǎn)單平面列表:
catch (AggregateException aex)
{
foreach (Exception ex in aex.Flatten().InnerExceptions)
myLogWriter.LogException (ex);
}
有時(shí),僅捕獲特定的異常類(lèi)型并重新引發(fā)其他類(lèi)型的異常類(lèi)型很有用。AggregateException 上的 Handle 方法提供了執(zhí)行此操作的快捷方式。它接受一個(gè)異常謂詞,它在每個(gè)內(nèi)部上運(yùn)行:
public void Handle (Func<Exception, bool> predicate)
如果謂詞返回 true,則認(rèn)為該異常“已處理”。委托運(yùn)行每個(gè)異常后,將發(fā)生以下情況:
例如,以下內(nèi)容最終會(huì)重新拋出另一個(gè)包含單個(gè) NullReferenceException 的 AggregateException:
var parent = Task.Factory.StartNew (() =>
{
// We’ll throw 3 exceptions at once using 3 child tasks:
int[] numbers = { 0 };
var childFactory = new TaskFactory
(TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None);
childFactory.StartNew (() => 5 / numbers[0]); // Division by zero
childFactory.StartNew (() => numbers [1]); // Index out of range
childFactory.StartNew (() => { throw null; }); // Null reference
});
try { parent.Wait(); }
catch (AggregateException aex)
{
aex.Flatten().Handle (ex => // Note that we still need to call Flatten
{
if (ex is DivideByZeroException)
{
Console.WriteLine ("Divide by zero");
return true; // This exception is "handled"
}
if (ex is IndexOutOfRangeException)
{
Console.WriteLine ("Index out of range");
return true; // This exception is "handled"
}
return false; // All other exceptions will get rethrown
});
}
.NET 在 System.Collections.Concurrent 中提供線程安全集合:
并發(fā)收集 | 非并發(fā)等效項(xiàng) |
ConcurrentStack<T> | 堆棧<T> |
ConcurrentQueue<T> | 隊(duì)列<T> |
ConcurrentBag<T> | (無(wú)) |
并發(fā)詞典<啦,電視> | 詞典<啦,電視> |
并發(fā)集合針對(duì)高并發(fā)場(chǎng)景進(jìn)行了優(yōu)化;但是,每當(dāng)需要線程安全集合(作為鎖定普通集合的替代方法)時(shí),它們也很有用。不過(guò),有一些注意事項(xiàng):
換句話(huà)說(shuō),這些集合不僅僅是使用帶鎖的普通集合的快捷方式。為了演示,如果我們?cè)诰€程上執(zhí)行以下代碼
var d = new ConcurrentDictionary<int,int>();
for (int i = 0; i < 1000000; i++) d[i] = 123;
它的運(yùn)行速度比這慢三倍:
var d = new Dictionary<int,int>();
for (int i = 0; i < 1000000; i++) lock (d) d[i] = 123;
(但是 ConcurrentDictionary 讀取速度很快,因?yàn)樽x取是。
并發(fā)集合與傳統(tǒng)集合的不同之處在于,它們公開(kāi)了執(zhí)行原子測(cè)試和操作操作的特殊方法,例如 TryPop 。這些方法中的大多數(shù)都是通過(guò)IProducerConsumerCollection<T>統(tǒng)一的。
生產(chǎn)者/使用者集合是兩個(gè)主要用例的集合:
典型的示例是堆棧和隊(duì)列。生產(chǎn)者/使用者集合在并行編程中非常重要,因?yàn)樗鼈冇欣诟咝У臒o(wú)鎖實(shí)現(xiàn)。
IProducerConsumerCollection<T> 接口表示線程安全的集合。以下類(lèi)實(shí)現(xiàn)此接口:
ConcurrentStack<T>
ConcurrentQueue<T>
ConcurrentBag<T>
IProducerConsumerCollection<T>擴(kuò)展了ICollection,增加了以下方法:
void CopyTo (T[] array, int index);
T[] ToArray();
bool TryAdd (T item);
bool TryTake (out T item);
TryAdd 和 TryTake 方法測(cè)試是否可以執(zhí)行添加/刪除操作;如果是這樣,他們將執(zhí)行添加/刪除。測(cè)試和操作以原子方式執(zhí)行,無(wú)需像在傳統(tǒng)集合周?chē)菢渔i定:
int result;
lock (myStack) if (myStack.Count > 0) result = myStack.Pop();
如果集合為空,則 TryTake 返回 false。TryAdd 始終成功,并在提供的三個(gè)實(shí)現(xiàn)中返回 true。但是,如果您編寫(xiě)了自己的并發(fā)集合,該集合禁止重復(fù),則如果元素已存在,則會(huì)使 TryAdd 返回 false(例如,如果您編寫(xiě)了并發(fā))。
TryTake 刪除的特定元素由子類(lèi)定義:
這三個(gè)具體的類(lèi)大多顯式地實(shí)現(xiàn) TryTake 和 TryAdd 方法,通過(guò)更具體命名的公共方法(如 TryDequeue 和 TryPop)公開(kāi)相同的功能。
ConcurrentBag<T> 存儲(chǔ)對(duì)象的集合(允許重復(fù))。ConcurrentBag<T> 適用于在調(diào)用 Take 或 TryTake 時(shí)您哪個(gè)元素的情況。
ConcurrentBag<T> 相對(duì)于并發(fā)隊(duì)列或堆棧的好處是,當(dāng)同時(shí)被多個(gè)線程調(diào)用時(shí),包的 Add 用。相反,在隊(duì)列或堆棧上并行調(diào)用 Add 會(huì)導(dǎo)致?tīng)?zhēng)用(盡管比鎖定集合要少得多)。在并發(fā)包上調(diào)用 Take 也非常有效——只要每個(gè)線程占用的元素不超過(guò)它添加的元素。
在并發(fā)包中,每個(gè)線程都有自己的私有鏈表。元素被添加到屬于調(diào)用 Add 的線程的私有列表中,消除了。枚舉包時(shí),枚舉器遍歷每個(gè)線程的私有列表,依次生成其每個(gè)元素。
當(dāng)您調(diào)用 Take 時(shí),包首先查看當(dāng)前線程的私有列表。如果至少有一個(gè)元素,1它可以輕松完成任務(wù),而不會(huì)爭(zhēng)用。但是,如果列表為空,則必須從另一個(gè)線程的私有列表中“竊取”元素,并引起爭(zhēng)用的可能性。
因此,準(zhǔn)確地說(shuō),調(diào)用 Take 會(huì)為您提供最近在該線程上添加的元素;如果該線程上沒(méi)有元素,它將為您提供最近在另一個(gè)線程上添加的元素,這是隨機(jī)選擇的。
當(dāng)集合上的并行操作主要包括添加元素時(shí),或者當(dāng)添加 s 和 Take s 在線程上平衡時(shí),并發(fā)袋是理想的選擇。我們之前在使用 Parallel.ForEach 實(shí)現(xiàn)并行拼寫(xiě)檢查器時(shí)看到了前者的示例:
var misspellings = new ConcurrentBag<Tuple<int,string>>();
Parallel.ForEach (wordsToTest, (word, state, i) =>
{
if (!wordLookup.Contains (word))
misspellings.Add (Tuple.Create ((int) i, word));
});
對(duì)于生產(chǎn)者/消費(fèi)者隊(duì)列來(lái)說(shuō),并發(fā)包將是一個(gè)糟糕的選擇,因?yàn)樵厥怯删€程添加和刪除的。
如果在上一節(jié)中討論的任何生產(chǎn)者/使用者集合上調(diào)用 TryTake,ConcurrentStack<T>、ConcurrentQueue<T> 和 ConcurrentBag<T> ,并且集合為空,則該方法返回 false 。有時(shí),在這種情況下,到元素可用會(huì)更有用。
PFX 的設(shè)計(jì)人員沒(méi)有使用此功能重載 TryTake 方法(在允許取消令牌和超時(shí)后會(huì)導(dǎo)致成員井噴),而是將此功能封裝到名為 BlockingCollection<T> 的包裝類(lèi)中。阻塞集合包裝實(shí)現(xiàn) IProducerConsumerCollection<T> 的任何集合,并允許您從包裝的集合中獲取元素 — 如果沒(méi)有可用的元素,則阻止。
阻止集合還允許您限制集合的總大小,如果超過(guò)該大小,則阻止。以這種方式限制的集合稱(chēng)為。
要使用 BlockingCollection<T> :
如果在不傳入集合的情況下調(diào)用構(gòu)造函數(shù),則該類(lèi)將自動(dòng)實(shí)例化 ConcurrentQueue<T> 。生成和使用方法允許您指定取消令牌和超時(shí)。如果集合大小有限,則添加和 TryAdd 可能會(huì)阻止;當(dāng)集合為空時(shí),Take 和 TryTake 塊。
使用元素的另一種方法是調(diào)用 GetConsumingEnumerable 。這將返回一個(gè)(可能)無(wú)限序列,該序列在元素可用時(shí)生成元素。您可以通過(guò)調(diào)用 CompleteAdd 來(lái)強(qiáng)制序列結(jié)束:此方法還可以防止其他元素排隊(duì)。
BlockingCollection 還提供了稱(chēng)為 AddToAny 和 TakeFromAny 的靜態(tài)方法,它們?cè)试S您在指定多個(gè)阻塞集合的同時(shí)添加或獲取元素。然后,能夠?yàn)檎?qǐng)求提供服務(wù)的第一個(gè)集合將執(zhí)行該操作。
生產(chǎn)者/使用者隊(duì)列是一種有用的結(jié)構(gòu),無(wú)論是在并行編程還是常規(guī)并發(fā)方案中。以下是它的工作原理:
生產(chǎn)者/使用者隊(duì)列可讓您精確控制一次執(zhí)行多少工作線程,這不僅可用于限制 CPU 消耗,還可用于限制其他資源。例如,如果任務(wù)執(zhí)行密集型磁盤(pán) I/O,則可以限制并發(fā)性以避免使操作系統(tǒng)和其他應(yīng)用程序匱乏。您還可以在隊(duì)列的整個(gè)生命周期內(nèi)動(dòng)態(tài)添加和刪除工作線程。CLR 的線程池本身是一種生產(chǎn)者/使用者隊(duì)列,針對(duì)短期運(yùn)行的計(jì)算綁定作業(yè)進(jìn)行了優(yōu)化。
生產(chǎn)者/使用者隊(duì)列通常包含對(duì)其執(zhí)行(相同)任務(wù)的數(shù)據(jù)項(xiàng)。例如,數(shù)據(jù)項(xiàng)可能是文件名,任務(wù)可能是加密這些文件。但是,通過(guò)將項(xiàng)目設(shè)置為委托,您可以編寫(xiě)一個(gè)更通用的生產(chǎn)者/使用者隊(duì)列,其中每個(gè)項(xiàng)目都可以執(zhí)行任何操作。
,我們展示了如何使用AutoResetEvent從頭開(kāi)始編寫(xiě)生產(chǎn)者/消費(fèi)者隊(duì)列(以及后來(lái)使用監(jiān)視器的等待和脈沖)。但是,從頭開(kāi)始編寫(xiě)生產(chǎn)者/消費(fèi)者是不必要的,因?yàn)榇蠖鄶?shù)功能都是由 BlockingCollection<T> 提供的。以下是使用它的方法:
public class PCQueue : IDisposable
{
BlockingCollection<Action> _taskQ = new BlockingCollection<Action>();
public PCQueue (int workerCount)
{
// Create and start a separate Task for each consumer:
for (int i = 0; i < workerCount; i++)
Task.Factory.StartNew (Consume);
}
public void Enqueue (Action action) { _taskQ.Add (action); }
void Consume()
{
// This sequence that we’re enumerating will block when no elements
// are available and will end when CompleteAdding is called.
foreach (Action action in _taskQ.GetConsumingEnumerable())
action(); // Perform task.
}
public void Dispose() { _taskQ.CompleteAdding(); }
}
因?yàn)槲覀儧](méi)有將任何東西傳遞到 BlockingCollection 的構(gòu)造函數(shù)中,所以它會(huì)自動(dòng)實(shí)例化一個(gè)并發(fā)隊(duì)列。如果我們?cè)贑oncurrentStack中傳遞,我們最終會(huì)得到一個(gè)生產(chǎn)者/消費(fèi)者堆棧。
我們剛剛編寫(xiě)的生產(chǎn)者/使用者是不靈活的,因?yàn)槲覀儫o(wú)法在工作項(xiàng)排隊(duì)后跟蹤它們。如果我們能做到以下幾點(diǎn),那就太好了:
一個(gè)理想的解決方案是讓 Enqueue 方法返回一些對(duì)象,為我們提供剛才描述的功能。好消息是,已經(jīng)存在一個(gè)類(lèi)來(lái)做到這一點(diǎn) - Task 類(lèi),我們可以使用 TaskCompletionSource 或通過(guò)直接實(shí)例化(創(chuàng)建未啟動(dòng)或任務(wù))來(lái)生成它:
public class PCQueue : IDisposable
{
BlockingCollection<Task> _taskQ = new BlockingCollection<Task>();
public PCQueue (int workerCount)
{
// Create and start a separate Task for each consumer:
for (int i = 0; i < workerCount; i++)
Task.Factory.StartNew (Consume);
}
public Task Enqueue (Action action, CancellationToken cancelToken
= default (CancellationToken))
{
var task = new Task (action, cancelToken);
_taskQ.Add (task);
return task;
}
public Task<TResult> Enqueue<TResult> (Func<TResult> func,
CancellationToken cancelToken = default (CancellationToken))
{
var task = new Task<TResult> (func, cancelToken);
_taskQ.Add (task);
return task;
}
void Consume()
{
foreach (var task in _taskQ.GetConsumingEnumerable())
try
{
if (!task.IsCanceled) task.RunSynchronously();
}
catch (InvalidOperationException) { } // Race condition
}
public void Dispose() { _taskQ.CompleteAdding(); }
}
在 排隊(duì) ,我們將創(chuàng)建但不啟動(dòng)的任務(wù)排隊(duì)并返回給調(diào)用方。
在 消費(fèi) 中,我們?cè)谑褂谜叩木€程上同步運(yùn)行任務(wù)。我們捕獲一個(gè) InvalidOperationException 來(lái)處理在檢查任務(wù)是否已取消和運(yùn)行它之間取消任務(wù)的不太可能的事件。
以下是我們?nèi)绾问褂么祟?lèi):
var pcQ = new PCQueue (2); // Maximum concurrency of 2
string result = await pcQ.Enqueue (() => "That was easy!");
...
因此,我們擁有任務(wù)的所有好處(異常傳播、返回值和取消),同時(shí)完全控制調(diào)度。
*請(qǐng)認(rèn)真填寫(xiě)需求信息,我們會(huì)在24小時(shí)內(nèi)與您取得聯(lián)系。