Warning: error_log(/data/www/wwwroot/hmttv.cn/caches/error_log.php): failed to open stream: Permission denied in /data/www/wwwroot/hmttv.cn/phpcms/libs/functions/global.func.php on line 537 Warning: error_log(/data/www/wwwroot/hmttv.cn/caches/error_log.php): failed to open stream: Permission denied in /data/www/wwwroot/hmttv.cn/phpcms/libs/functions/global.func.php on line 537
AssignmentManager模塊是HBase中一個(gè)非常重要的模塊,Assignment Manager(之后簡(jiǎn)稱(chēng)AM)負(fù)責(zé)了HBase中所有region的Assign,UnAssign,以及split/merge過(guò)程中region狀態(tài)變化的管理等等。在HBase-0.90之前,AM的狀態(tài)全部存在內(nèi)存中,自從HBASE-2485之后,AM把狀態(tài)持久化到了Zookeeper上。在此基礎(chǔ)上,社區(qū)對(duì)AM又修復(fù)了大量的bug和優(yōu)化(見(jiàn)此文章),最終形成了用在HBase-1.x版本上的這個(gè)AM。
相信深度使用過(guò)HBase的人一般都會(huì)被Region RIT的狀態(tài)困擾過(guò),長(zhǎng)時(shí)間的region in transition狀態(tài)簡(jiǎn)直令人抓狂。
除了一些確實(shí)是由于Region無(wú)法被RegionServer open的case,大部分的RIT,都是AM本身的問(wèn)題引起的。總結(jié)一下HBase-1.x版本中AM的問(wèn)題,主要有以下幾點(diǎn):
region狀態(tài)變化復(fù)雜
這張圖很好地展示了region在open過(guò)程中參與的組件和狀態(tài)變化。可以看到,多達(dá)7個(gè)組件會(huì)參與region狀態(tài)的變化。并且在region open的過(guò)程中多達(dá)20多個(gè)步驟!越復(fù)雜的邏輯意味著越容易出bug
region狀態(tài)多處緩存
region的狀態(tài)會(huì)緩存在多個(gè)地方,Master中RegionStates會(huì)保存Region的狀態(tài),Meta表中會(huì)保存region的狀態(tài),Zookeeper上也會(huì)保存region的狀態(tài),要保持這三者完全同步是一件很困難的事情。同時(shí),Master和RegionServer都會(huì)修改Meta表的狀態(tài)和Zookeeper的狀態(tài),非常容易導(dǎo)致?tīng)顟B(tài)的混亂。如果出現(xiàn)不一致,到底以哪里的狀態(tài)為準(zhǔn)?每一個(gè)region的transition流程都是各自為政,各自有各自的處理方法
重度依賴(lài)Zookeeper
在老的AM中,region狀態(tài)的通知完全通過(guò)Zookeeper。比如說(shuō)RegionServer打開(kāi)了一個(gè)region,它會(huì)在Zookeeper把這個(gè)region的RIT節(jié)點(diǎn)改成OPEN狀態(tài),而不去直接通知Master。Master會(huì)在Zookeeper上watch這個(gè)RIT節(jié)點(diǎn),通過(guò)Zookeeper的通知機(jī)制來(lái)通知Master這個(gè)region已經(jīng)發(fā)生變化。Master再根據(jù)Zookeeper上讀取出來(lái)的新?tīng)顟B(tài)進(jìn)行一定的操作。嚴(yán)重依賴(lài)Zookeeper的通知機(jī)制導(dǎo)致了region的上線(xiàn)/下線(xiàn)的速度存在了一定的瓶頸。特別是在region比較多的時(shí)候,Zookeeper的通知會(huì)出現(xiàn)嚴(yán)重的滯后現(xiàn)象。
正是這些問(wèn)題的存在,導(dǎo)致AM的問(wèn)題頻發(fā)。我本人就fix過(guò)多個(gè)AM導(dǎo)致region無(wú)法open的issue。比如說(shuō)這三個(gè)相互關(guān)聯(lián)的“連環(huán)”case:HBASE-17264,HBASE-17265,HBASE-17275。
面對(duì)這些問(wèn)題的存在,社區(qū)也在不斷嘗試解決這些問(wèn)題,特別是當(dāng)region的規(guī)模達(dá)到100w級(jí)別的時(shí)候,AM成為了一個(gè)嚴(yán)重的瓶頸。HBASE-11059中提出的ZK-less Region Assignment就是一個(gè)非常好的改良設(shè)計(jì)。在這個(gè)設(shè)計(jì)中,AM完全擺脫了Zookeeper的限制,在測(cè)試中,zk-less的assign比zk的assign快了一個(gè)數(shù)量級(jí)!
但是在這個(gè)設(shè)計(jì)中,它摒棄了Zookeeper這個(gè)持久化的存儲(chǔ),一些region transition過(guò)程中的中間狀態(tài)無(wú)法被保存。因此,在此基礎(chǔ)上,社區(qū)又更進(jìn)了一步,提出了Assignment Mananger V2在這個(gè)方案。在這個(gè)方案中,仍然摒棄了Zookeeper參與Assignment的整個(gè)過(guò)程。但是,它引入了ProcedureV2這個(gè)持久化存儲(chǔ)來(lái)保存Region transition中的各個(gè)狀態(tài),保證在master重啟時(shí),之前的assing/unassign,split等任務(wù)能夠從中斷點(diǎn)重新執(zhí)行。具體的來(lái)說(shuō),AMv2方案中,主要的改進(jìn)有以下幾點(diǎn):
Procedure V2
關(guān)于Procedure V2,我之后將獨(dú)立寫(xiě)文章介紹。這里,我只大概介紹下ProcedureV2和引入它所帶來(lái)的價(jià)值。
我們知道,Master中會(huì)有許多復(fù)雜的管理工作,比如說(shuō)建表,region的transition。這些工作往往涉及到非常多的步驟,如果master在做中間某個(gè)步驟的時(shí)候宕機(jī)了,這個(gè)任務(wù)就會(huì)永遠(yuǎn)停留在了中間狀態(tài)(RIT因?yàn)橹坝衂ookeeper做持久化因此會(huì)繼續(xù)從某個(gè)狀態(tài)開(kāi)始執(zhí)行)。比如說(shuō)在enable/disable table時(shí),如果master宕機(jī)了,可能表就停留在了enabling/disabling狀態(tài)。需要一些外部的手段進(jìn)行恢復(fù)。那么從本質(zhì)上來(lái)說(shuō),ProcedureV2提供了一個(gè)持久化的手段(通過(guò)ProcedureWAL,一種類(lèi)似RegionServer中WAL的日志持久化到HDFS上),使master在宕機(jī)后能夠繼續(xù)之前未完成的任務(wù)繼續(xù)完成。同時(shí),ProcedureV2提供了非常豐富的狀態(tài)轉(zhuǎn)換并支持回滾執(zhí)行,即使執(zhí)行到某一個(gè)步驟出錯(cuò),master也可以按照用戶(hù)的邏輯對(duì)之前的步驟進(jìn)行回滾。比如建表到某一個(gè)步驟失敗了,而之前已經(jīng)在HDFS中創(chuàng)建了一些新region的文件夾,那么ProcedureV2在rollback的時(shí)候,可以把這些殘留刪除掉。
Procedure中提供了兩種Procedure框架,順序執(zhí)行和狀態(tài)機(jī),同時(shí)支持在執(zhí)行過(guò)程中插入subProcedure,從而能夠支持非常豐富的執(zhí)行流程。在A(yíng)Mv2中,所有的Assign,UnAssign,TableCreate等等流程,都是基于Procedure實(shí)現(xiàn)的。
去除Zookeeper依賴(lài)
有了Procedure V2之后,所有的狀態(tài)都可以持久化在Procedure中,Procedure中每次的狀態(tài)變化,都能夠持久化到ProcedureWAL中,因此數(shù)據(jù)不會(huì)丟失,宕機(jī)后也能恢復(fù)。同時(shí),AMv2中region的狀態(tài)扭轉(zhuǎn)(OPENING,OPEN,CLOSING,CLOSE等)都會(huì)由Master記錄在Meta表中,不需要Zookeeper做持久化。再者,之前的AM使用的Zookeeper watch機(jī)制通知master region狀態(tài)的改變,而現(xiàn)在每當(dāng)RegionServer Open或者close一個(gè)region后,都會(huì)直接發(fā)送RPC給master匯報(bào),因此也不需要Zookeeper來(lái)做狀態(tài)的通知。綜合以上原因,Zookeeper已經(jīng)在A(yíng)Mv2中沒(méi)有了存在的必要。
減少狀態(tài)沖突的可能性
之前我說(shuō)過(guò),在之前的AM中,region的狀態(tài)會(huì)同時(shí)存在于meta表,Zookeeper和master的內(nèi)存狀態(tài)。同時(shí)Master和regionserver都會(huì)去修改Zookeeper和meta表,維護(hù)狀態(tài)統(tǒng)一的代價(jià)非常高,非常容易出bug。而在A(yíng)Mv2中,只有master才能去修改meta表。并在region整個(gè)transition中做為一個(gè)“權(quán)威”存在,如果regionserver匯報(bào)上來(lái)的region狀態(tài)與master看到的不一致,則master會(huì)命令RegionServer abort。Region的狀態(tài),都以master內(nèi)存中保存的RegionStates為準(zhǔn)。
除了上述這些優(yōu)化,AMv2中還有許多其他的優(yōu)化。比如說(shuō)AMv2依賴(lài)Procedure V2提供的一套locking機(jī)制,保證了對(duì)于一個(gè)實(shí)體,如一張表,一個(gè)region或者一個(gè)RegionServer同一時(shí)刻只有一個(gè)Procedure在執(zhí)行。同時(shí),在需要往RegionServer發(fā)送命令,如發(fā)送open,close等命令時(shí),AMv2實(shí)現(xiàn)了一個(gè)RemoteProcedureDispatcher來(lái)對(duì)這些請(qǐng)求做batch,批量把對(duì)應(yīng)服務(wù)器的指令一起發(fā)送等等。在代碼結(jié)構(gòu)上,之前處理相應(yīng)region狀態(tài)的代碼散落在A(yíng)ssignmentManager這個(gè)類(lèi)的各個(gè)地方,而在A(yíng)Mv2中,每個(gè)對(duì)應(yīng)的操作,都有對(duì)應(yīng)的Procedure實(shí)現(xiàn),如AssignProcedure,DisableTableProcedure,SplitTableRegionProcedure等等。這樣下來(lái),使AssignmentManager這個(gè)之前雜亂的類(lèi)變的清晰簡(jiǎn)單,代碼量從之前的4000多行減到了2000行左右。
AMv2中有太多的Procedure對(duì)應(yīng)各種不同的transition,這里不去詳細(xì)介紹每個(gè)Procedure的操作。我將以AssignProcedure為例,講解一下在A(yíng)Mv2中,一個(gè)region是怎么assign給一個(gè)RegionServer,并在對(duì)應(yīng)的RS上Open的。
AssignProcedure是一個(gè)基于Procedure實(shí)現(xiàn)的狀態(tài)機(jī)。它擁有3個(gè)狀態(tài):
AMv2中提供了一個(gè)Web頁(yè)面(Master頁(yè)面中的‘Procedures&Locks’鏈接)來(lái)展示當(dāng)前正在執(zhí)行的Procedure和持有的鎖。
其實(shí)通過(guò)log,我們也可以看到Assign的整個(gè)過(guò)程。
假設(shè),一臺(tái)server宕機(jī),此時(shí)master會(huì)產(chǎn)生一個(gè)ServerCrashProcedure 來(lái)處理,在這個(gè)Procedure中,會(huì)做一系列的工作,比如WAL的restore。當(dāng)這些前置的工作做完后,就會(huì)開(kāi)始assign之前在宕掉服務(wù)器上的region,比如56f985a727afe80a184dac75fbf6860c。此時(shí)會(huì)在ServerCrashProcedure產(chǎn)生一系列的子任務(wù):
2017-05-23 12:04:24,175 INFO [ProcExecWrkr-30] procedure2.ProcedureExecutor: Initialized subprocedures=[{pid=1178, ppid=1176, state=RUNNABLE:REGION_TRANSITION_QUEUE; AssignProcedure table=IntegrationTestBigLinkedList, region=bfd57f0b72fd3ca77e9d3c5e3ae48d76, target=ve0540.halxg.cloudera.com,16020,1495525111232}, {pid=1179, ppid=1176, state=RUNNABLE:REGION_TRANSITION_QUEUE; AssignProcedure table=IntegrationTestBigLinkedList, region=56f985a727afe80a184dac75fbf6860c, target=ve0540.halxg.cloudera.com,16020,1495525111232}]
可以看到,ServerCrashProcedure的pid(Procedure ID)為1178,在此Procedure中產(chǎn)生的assign 56f985a727afe80a184dac75fbf6860c這個(gè)region的子Procedure的pid為1179,同時(shí)他的ppid(Parent Procedure ID)為1178。在A(yíng)Mv2中,通過(guò)追蹤這些ID,就非常容易把一個(gè)region的transition整個(gè)過(guò)程全部串起來(lái)。
接下來(lái),pid=1170這個(gè)Procedure開(kāi)始執(zhí)行,首先執(zhí)行的是REGION_TRANSITION_QUEUE狀態(tài)的邏輯,然后進(jìn)入睡眠狀態(tài)。
2017-05-23 12:04:24,241 INFO [ProcExecWrkr-30] assignment.AssignProcedure: Start pid=1179, ppid=1176, state=RUNNABLE:REGION_TRANSITION_QUEUE; AssignProcedure table=IntegrationTestBigLinkedList, region=56f985a727afe80a184dac75fbf6860c, target=ve0540.halxg.cloudera.com,16020,1495525111232; rit=OFFLINE, location=ve0540.halxg.cloudera.com,16020,1495525111232; forceNewPlan=false, retain=false
當(dāng)target server被指定時(shí),Procedure進(jìn)入REGION_TRANSITION_DISPATCH狀態(tài),dispatch了region open的請(qǐng)求,同時(shí)把meta表中region的狀態(tài)改成了OPENING,然后再次進(jìn)入休眠狀態(tài)
2017-05-23 12:04:24,494 INFO [ProcExecWrkr-38] assignment.RegionStateStore: pid=1179 updating hbase:meta row=IntegrationTestBigLinkedList,H\xE3@\x8D\x964\x9D\xDF\x8F@9'\x0F\xC8\xCC\xC2,1495566261066.56f985a727afe80a184dac75fbf6860c., regionState=OPENING, regionLocation=ve0540.halxg.cloudera.com,16020,1495525111232 2017-05-23 12:04:24,498 INFO [ProcExecWrkr-38] assignment.RegionTransitionProcedure: Dispatch pid=1179, ppid=1176, state=RUNNABLE:REGION_TRANSITION_DISPATCH; AssignProcedure table=IntegrationTestBigLinkedList, region=56f985a727afe80a184dac75fbf6860c, target=ve0540.halxg.cloudera.com,16020,1495525111232; rit=OPENING, location=ve0540.halxg.cloudera.com,16020,1495525111232
最后,當(dāng)RegionServer打開(kāi)了這個(gè)region后,會(huì)發(fā)RPC通知master,那么在通知過(guò)程中,這個(gè)Procedure再次被喚醒,開(kāi)始執(zhí)行REGION_TRANSITION_FINISH的邏輯,最后更新meta表,把這個(gè)region置為打開(kāi)狀態(tài)。
2017-05-23 12:04:26,643 DEBUG [RpcServer.default.FPBQ.Fifo.handler=46,queue=1,port=16000] assignment.RegionTransitionProcedure: Received report OPENED seqId=11984985, pid=1179, ppid=1176, state=RUNNABLE:REGION_TRANSITION_DISPATCH; AssignProcedure table=IntegrationTestBigLinkedList, region=56f985a727afe80a184dac75fbf6860c, target=ve0540.halxg.cloudera.com,16020,1495525111232; rit=OPENING, location=ve0540.halxg.cloudera.com,16020,1495525111232 2017-05-23 12:04:26,643 INFO [ProcExecWrkr-9] assignment.RegionStateStore: pid=1179 updating hbase:meta row=IntegrationTestBigLinkedList,H\xE3@\x8D\x964\x9D\xDF\x8F@9'\x0F\xC8\xCC\xC2,1495566261066.56f985a727afe80a184dac75fbf6860c., regionState=OPEN, openSeqNum=11984985, regionLocation=ve0540.halxg.cloudera.com,16020,1495525111232 2017-05-23 12:04:26,836 INFO [ProcExecWrkr-9] procedure2.ProcedureExecutor: Finish suprocedure pid=1179, ppid=1176, state=SUCCESS; AssignProcedure table=IntegrationTestBigLinkedList, region=56f985a727afe80a184dac75fbf6860c, target=ve0540.halxg.cloudera.com,16020,1495525111232
一路看下來(lái),由于整個(gè)region assign的過(guò)程都是在Procedure中執(zhí)行,整個(gè)過(guò)程清晰明了,非常容易追述,也沒(méi)有了Zookeeper一些event事件的干擾。
Assignment Mananger V2依賴(lài)Procedure V2實(shí)現(xiàn)了一套清晰明了的region transition機(jī)制。去除了Zookeeper依賴(lài),減少了region狀態(tài)沖突的可能性。整體上來(lái)看,代碼的可讀性更強(qiáng),出了問(wèn)題也更好查錯(cuò)。對(duì)于解決之前AM中的一系列“頑疾”,AMv2做了很好的嘗試,也是一個(gè)非常好的方向。
AMv2之所以能保持簡(jiǎn)潔高效的一個(gè)重要原因就是重度依賴(lài)了Procedure V2,把一些復(fù)雜的邏輯都轉(zhuǎn)移到了Procedure V2中。但是這樣做的問(wèn)題是:一旦ProcedureWAL出現(xiàn)了損壞,或者Procedure本身存在bug,這個(gè)后果就是災(zāi)難性的。事實(shí)上在我們的測(cè)試環(huán)境中,就出現(xiàn)過(guò)PRocedureWAL損壞導(dǎo)致region RIT的情況。
另外需要注意的是,截止目前為止,HBCK仍然無(wú)法支持AMv2,這會(huì)導(dǎo)致一旦出現(xiàn)問(wèn)題,修復(fù)起來(lái)會(huì)比較困難。
當(dāng)然,新的事務(wù)還是要有一段成熟期,相信經(jīng)過(guò)一段時(shí)間的bug修復(fù)和完善后,我相信AMv2一定會(huì)完美解決之前的一些問(wèn)題,給HBase的運(yùn)維上帶來(lái)一些不同的體驗(yàn)。愿世界不再被HBase的RIT困擾 :-)。
阿里HBase目前已經(jīng)在阿里云提供商業(yè)化服務(wù),任何有需求的用戶(hù)都可以在阿里云端使用深入改進(jìn)的、一站式的HBase服務(wù)。云HBase版本與自建HBase相比在運(yùn)維、可靠性、性能、穩(wěn)定性、安全、成本等方面均有很多的改進(jìn)。
同時(shí),云HBase2.0 在2018年6月6日正式發(fā)布,點(diǎn)擊了解更多!
文作者為京東算法服務(wù)部的張穎和段學(xué)浩,并由 Apache Hive PMC,阿里巴巴技術(shù)專(zhuān)家李銳幫忙校對(duì)。主要內(nèi)容為:
1.背景
2.Flink SQL 的優(yōu)化
3.總結(jié)
目前,京東搜索推薦的數(shù)據(jù)處理流程如上圖所示。可以看到實(shí)時(shí)和離線(xiàn)是分開(kāi)的,離線(xiàn)數(shù)據(jù)處理大部分用的是 Hive / Spark,實(shí)時(shí)數(shù)據(jù)處理則大部分用 Flink / Storm。
這就造成了以下現(xiàn)象:在一個(gè)業(yè)務(wù)引擎里,用戶(hù)需要維護(hù)兩套環(huán)境、兩套代碼,許多共性不能復(fù)用,數(shù)據(jù)的質(zhì)量和一致性很難得到保障。且因?yàn)榱髋讓訑?shù)據(jù)模型不一致,導(dǎo)致需要做大量的拼湊邏輯;甚至為了數(shù)據(jù)一致性,需要做大量的同比、環(huán)比、二次加工等數(shù)據(jù)對(duì)比,效率極差,并且非常容易出錯(cuò)。
而支持批流一體的 Flink SQL 可以很大程度上解決這個(gè)痛點(diǎn),因此我們決定引入 Flink 來(lái)解決這種問(wèn)題。
在大多數(shù)作業(yè),特別是 Flink 作業(yè)中,執(zhí)行效率的優(yōu)化一直是 Flink 任務(wù)優(yōu)化的關(guān)鍵,在京東每天數(shù)據(jù)增量 PB 級(jí)情況下,作業(yè)的優(yōu)化顯得尤為重要。
寫(xiě)過(guò)一些 SQL 作業(yè)的同學(xué)肯定都知道,對(duì)于 Flink SQL 作業(yè),在一些情況下會(huì)造成同一個(gè) UDF 被反復(fù)調(diào)用的情況,這對(duì)一些消耗資源的任務(wù)非常不友好;此外,影響執(zhí)行效率大致可以從 shuffle、join、failover 策略等方面考慮;另外,F(xiàn)link 任務(wù)調(diào)試的過(guò)程也非常復(fù)雜,對(duì)于一些線(xiàn)上機(jī)器隔離的公司來(lái)說(shuō)尤甚。
為此,我們實(shí)現(xiàn)了內(nèi)嵌式的 Derby 來(lái)作為 Hive 的元數(shù)據(jù)存儲(chǔ)數(shù)據(jù)庫(kù) (allowEmbedded);在任務(wù)恢復(fù)方面,批式作業(yè)沒(méi)有 checkpoint 機(jī)制來(lái)實(shí)現(xiàn)failover,但是 Flink 特有的 region 策略可以使批式作業(yè)快速恢復(fù);此外,本文還介紹了對(duì)象重用等相關(guān)優(yōu)化措施。
1. UDF 重用
在 Flink SQL 任務(wù)里會(huì)出現(xiàn)以下這種情況:如果相同的 UDF 既出現(xiàn)在 LogicalProject 中,又出現(xiàn)在 Where 條件中,那么 UDF 會(huì)進(jìn)行多次調(diào)用 (見(jiàn)https://issues.apache.org/jira/browse/FLINK-20887)。但是如果該 UDF 非常耗 CPU 或者內(nèi)存,這種多余的計(jì)算會(huì)非常影響性能,為此我們希望能把 UDF 的結(jié)果緩存起來(lái)下次直接使用。在設(shè)計(jì)的時(shí)候需要考慮:(非常重要:請(qǐng)一定保證 LogicalProject 和 where 條件的 subtask chain 到一起)
根據(jù)以上考慮,我們用 guava cache 將 UDF 的結(jié)果緩存起來(lái),之后調(diào)用的時(shí)候直接去cache 里面拿數(shù)據(jù),最大可能降低任務(wù)的消耗。下面是一個(gè)簡(jiǎn)單的使用(同時(shí)設(shè)置了最大使用 size、超時(shí)時(shí)間,但是沒(méi)有寫(xiě)鎖):
public class RandomFunction extends ScalarFunction {
private static Cache<String, Integer> cache=CacheBuilder.newBuilder()
.maximumSize(2)
.expireAfterWrite(3, TimeUnit.SECONDS)
.build();
public int eval(String pvid) {
profileLog.error("RandomFunction invoked:" + atomicInteger.incrementAndGet());
Integer result=cache.getIfPresent(pvid);
if (null==result) {
int tmp=(int)(Math.random() * 1000);
cache.put("pvid", tmp);
return tmp;
}
return result;
}
@Override
public void close() throws Exception {
super.close();
cache.cleanUp();
}
}
2. 單元測(cè)試
大家可能會(huì)好奇為什么會(huì)把單元測(cè)試也放到優(yōu)化里面,大家都知道 Flink 任務(wù)調(diào)試過(guò)程非常復(fù)雜,對(duì)于一些線(xiàn)上機(jī)器隔離的公司來(lái)說(shuō)尤甚。京東的本地環(huán)境是沒(méi)有辦法訪(fǎng)問(wèn)任務(wù)服務(wù)器的,因此在初始階段調(diào)試任務(wù),我們耗費(fèi)了很多時(shí)間用來(lái)上傳 jar 包、查看日志等行為。
為了降低任務(wù)的調(diào)試時(shí)間、增加代碼開(kāi)發(fā)人員的開(kāi)發(fā)效率,實(shí)現(xiàn)了內(nèi)嵌式的 Derby 來(lái)作為 Hive 的元數(shù)據(jù)存儲(chǔ)數(shù)據(jù)庫(kù) (allowEmbedded),這算是一種優(yōu)化開(kāi)發(fā)時(shí)間的方法。具體思路如下:
首先創(chuàng)建 Hive Conf:
public static HiveConf createHiveConf() {
ClassLoader classLoader=new HiveOperatorTest().getClass().getClassLoader();
HiveConf.setHiveSiteLocation(classLoader.getResource(HIVE_SITE_XML));
try {
TEMPORARY_FOLDER.create();
String warehouseDir=TEMPORARY_FOLDER.newFolder().getAbsolutePath() + "/metastore_db";
String warehouseUri=String.format(HIVE_WAREHOUSE_URI_FORMAT, warehouseDir);
HiveConf hiveConf=new HiveConf();
hiveConf.setVar(
HiveConf.ConfVars.METASTOREWAREHOUSE,
TEMPORARY_FOLDER.newFolder("hive_warehouse").getAbsolutePath());
hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, warehouseUri);
hiveConf.set("datanucleus.connectionPoolingType", "None");
hiveConf.set("hive.metastore.schema.verification", "false");
hiveConf.set("datanucleus.schema.autoCreateTables", "true");
return hiveConf;
} catch (IOException e) {
throw new CatalogException("Failed to create test HiveConf to HiveCatalog.", e);
}
}
接下來(lái)創(chuàng)建 Hive Catalog:(利用反射的方式調(diào)用 embedded 的接口)
public static void createCatalog() throws Exception{
Class clazz=HiveCatalog.class;
Constructor c1=clazz.getDeclaredConstructor(new Class[]{String.class, String.class, HiveConf.class, String.class, boolean.class});
c1.setAccessible(true);
hiveCatalog=(HiveCatalog)c1.newInstance(new Object[]{"test-catalog", null, createHiveConf(), "2.3.4", true});
hiveCatalog.open();
}
創(chuàng)建 tableEnvironment:(同官網(wǎng))
EnvironmentSettings settings=EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv=TableEnvironment.create(settings);
TableConfig tableConfig=tableEnv.getConfig();
Configuration configuration=new Configuration();
configuration.setInteger("table.exec.resource.default-parallelism", 1);
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tableEnv.useCatalog(hiveCatalog.getName());
最后關(guān)閉 Hive Catalog:
public static void closeCatalog() {
if (hiveCatalog !=null) {
hiveCatalog.close();
}
}
此外,對(duì)于單元測(cè)試,構(gòu)建合適的數(shù)據(jù)集也是一個(gè)非常大的功能,我們實(shí)現(xiàn)了 CollectionTableFactory,允許自己構(gòu)建合適的數(shù)據(jù)集,使用方法如下:
CollectionTableFactory.reset();
CollectionTableFactory.initData(Arrays.asList(Row.of("this is a test"), Row.of("zhangying480"), Row.of("just for test"), Row.of("a test case")));
StringBuilder sbFilesSource=new StringBuilder();
sbFilesSource.append("CREATE temporary TABLE db1.`search_realtime_table_dump_p13`(" + " `pvid` string) with ('connector.type'='COLLECTION','is-bounded'='true')");
tableEnv.executeSql(sbFilesSource.toString());
3. join 方式的選擇
傳統(tǒng)的離線(xiàn) Batch SQL (面向有界數(shù)據(jù)集的 SQL) 有三種基礎(chǔ)的實(shí)現(xiàn)方式,分別是 Nested-loop Join、Sort-Merge Join 和 Hash Join。
效率 | 空間 | 備注 | |
Nested-loop Join | 差 | 占用大 | |
Sort-Merge Join | 有sort merge開(kāi)銷(xiāo) | 占用小 | 有序數(shù)據(jù)集的一種優(yōu)化措施 |
Hash Join | 高 | 占用大 | 適合大小表 |
注意:Sort-Merge Join 和 Hash Join 只適用于 Equi-Join ( Join 條件均使用等于作為比較算子)。
Flink 在 join 之上又做了一些細(xì)分,具體包括:
特點(diǎn) | 使用 | |
Repartition-Repartition strategy | 對(duì)數(shù)據(jù)集分別進(jìn)行分區(qū)和shuffle,如果數(shù)據(jù)集大的時(shí)候效率極差 | 兩個(gè)數(shù)據(jù)集相差不大 |
Broadcast-Forward strategy | 將小表的數(shù)據(jù)全部發(fā)送到大表數(shù)據(jù)的機(jī)器上 | 兩個(gè)數(shù)據(jù)集有較大的差距 |
眾所周知,batch 的 shuffle 非常耗時(shí)間。
可以通過(guò):table.optimizer.join.broadcast-threshold 來(lái)設(shè)置采用 broadcast 的 table 大小,如果設(shè)置為 “-1”,表示禁用 broadcast。
下圖為禁用前后的效果:
4. multiple input
在 Flink SQL 任務(wù)里,降低 shuffle 可以有效的提高 SQL 任務(wù)的吞吐量,在實(shí)際的業(yè)務(wù)場(chǎng)景中經(jīng)常遇到這樣的情況:上游產(chǎn)出的數(shù)據(jù)已經(jīng)滿(mǎn)足了數(shù)據(jù)分布要求 (如連續(xù)多個(gè) join 算子,其中 key 是相同的),此時(shí) Flink 的 forward shuffle 是冗余的 shuffle,我們希望將這些算子 chain 到一起。Flink 1.12 引入了 mutiple input 的特性,可以消除大部分沒(méi)必要的 forward shuffle,把 source 的算子 chain 到一起。
table.optimizer.multiple-input-enabled:true
下圖為開(kāi)了 multiple input 和沒(méi)有開(kāi)的拓?fù)鋱D ( operator chain 功能已經(jīng)打開(kāi)):?
5. 對(duì)象重用
上下游 operator 之間會(huì)經(jīng)過(guò)序列化 / 反序列化 / 復(fù)制階段來(lái)進(jìn)行數(shù)據(jù)傳輸,這種行為非常影響 Flink SQL 程序的性能,可以通過(guò)啟用對(duì)象重用來(lái)提高性能。但是這在 DataStream 里面非常危險(xiǎn),因?yàn)榭赡軙?huì)發(fā)生以下情況:在下一個(gè)算子中修改對(duì)象意外影響了上面算子的對(duì)象。
但是 Flink 的 Table / SQL API 中是非常安全的,可以通過(guò)如下方式來(lái)啟用:
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
或者是通過(guò)設(shè)置:pipeline-object-reuse:true
為什么啟用了對(duì)象重用會(huì)有這么大的性能提升?在 Blink planner 中,同一任務(wù)的兩個(gè)算子之間的數(shù)據(jù)交換最終將調(diào)用 BinaryString#copy,查看實(shí)現(xiàn)代碼,可以發(fā)現(xiàn) BinaryString#copy 需要復(fù)制底層 MemorySegment 的字節(jié),通過(guò)啟用對(duì)象重用來(lái)避免復(fù)制,可以有效提升效率。
下圖為沒(méi)有開(kāi)啟對(duì)象重用時(shí)相應(yīng)的火焰圖:
6. SQL 任務(wù)的 failover 策略
batch 任務(wù)模式下 checkpoint 以及其相關(guān)的特性全部都不可用,因此針對(duì)實(shí)時(shí)任務(wù)的基于 checkpoint 的 failover 策略是不能應(yīng)用在批任務(wù)上面的,但是 batch 任務(wù)允許 Task 之間通過(guò) Blocking Shuffle 進(jìn)行通信,當(dāng)一個(gè) Task 因?yàn)槿蝿?wù)未知的原因失敗之后,由于 Blocking Shuffle 中存儲(chǔ)了這個(gè) Task 所需要的全部數(shù)據(jù),所以只需要重啟這個(gè) Task 以及通過(guò) Pipeline Shuffle 與其相連的全部下游任務(wù)即可:
jobmanager.execution.failover-strategy:region (已經(jīng) finish 的 operator 可直接恢復(fù))
table.exec.shuffle-mode:ALL_EDGES_BLOCKING (shuffle 策略)。
7. shuffle
Flink 里的 shuffle 分為 pipeline shuffle 和 blocking shuffle。
相應(yīng)的控制參數(shù):
table.exec.shuffle-mode,該參數(shù)有多個(gè)參數(shù),默認(rèn)是 ALL_EDGES_BLOCKING,表示所有的邊都會(huì)用 blocking shuffle,不過(guò)大家可以試一下 POINTWISE_EDGES_PIPELINED,表示 forward 和 rescale edges 會(huì)自動(dòng)開(kāi)始 pipeline 模式。
taskmanager.network.sort-shuffle.min-parallelism ,將這個(gè)參數(shù)設(shè)置為小于你的并行度,就可以開(kāi)啟 sort-merge shuffle;這個(gè)參數(shù)的設(shè)置需要考慮一些其他的情況,具體的可以按照官網(wǎng)設(shè)置。
本文著重從 shuffle、join 方式的選擇、對(duì)象重用、UDF 重用等方面介紹了京東在 Flink SQL 任務(wù)方面做的優(yōu)化措施。另外,感謝京東實(shí)時(shí)計(jì)算研發(fā)部付海濤等全部同事的支持與幫助。
原文鏈接:http://click.aliyun.com/m/1000288770/
本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
.創(chuàng)建主鍵
create table ESC_STOTE.TF_B_AIR_CONFIG(
TYPE_ID VARCHAR2(20) not null,
PROVINCE_CODE VARCHAR2(4) not null,
PROVINCE_TYPE VARCHAR2(2) not null,
LIMIT_NUM VARCHAR2(2) not null,
EFFECTIVE_FALG VARCHAR2 default '1',
UPDATE_TIME DATE default sysdate,
constraint TF_B_AIR_CONFIG_PK primary key(TYPE_ID)--單列主鍵
)
create table ECS_STORE.TF_B_AIR_CONFIG(
TYPE_ID VARCHAR2(20) not null,
PROVINCE_CODE VARCHAR2(4) not null,
PARAMETER_TYPE VARCHAR2(2) not null,
LIMIT_NUM VARCHAR2(4) not null,
EFFECTIVE_FALG VARCHAR2(2) default '1',
UPDATE_TIME DATE default sysdate,
constraint TF_B_AIR_CONFIG_PK primary key(TYPE_ID , PROVINCE_CODE)--復(fù)合主鍵
)
第二種:創(chuàng)建表后,再創(chuàng)建約束
alter table table_name add constraint constraint_name primary key(col1,col2,...coln);
示例:
----創(chuàng)建TF_B_AIR_CONFIG表
create table ECS_STORE.TF_B_AIR_CONFIG(
TYPE_ID VARCHAR2(20) not null,
PROVINCE_CODE VARCHAR2(4) not null,
PARAMETER_TYPE VARCHAR2(2) not null,
LIMIT_NUM VARCHAR2(4) not null,
EFFECTIVE_FALG VARCHAR2(2) default '1',
UPDATE_TIME DATE default sysdate
)
--單列主鍵
alter table ECS_STORE.TF_B_AIR_CONFIG add constraint TF_B_AIR_CONFIG_PK primary key (TYPE_ID);
--聯(lián)合主鍵
alter table ECS_STORE.TF_B_AIR_CONFIG add constraint TF_B_AIR_CONFIG_PK primary key (TYPE_ID , PROVINCE_CODE);
2.禁用主鍵
alter table table_name disable constraint constraint_name;
alter table ECS_STORE.TF_B_AIR_CONFIG disable constraint TF_B_AIR_CONFIG_PK ;
3.啟用主鍵
alter table table_name enable constraint constraint_name;
alter table ECS_STORE.TF_B_AIR_CONFIG enable constraint TF_B_AIR_CONFIG_PK ;
4.刪除主鍵
alter table table_name drop constraint constraint_name;
*請(qǐng)認(rèn)真填寫(xiě)需求信息,我們會(huì)在24小時(shí)內(nèi)與您取得聯(lián)系。