《如何實(shí)現(xiàn)一個(gè)簡(jiǎn)單易用且可靠的消息隊(duì)列框架?》要點(diǎn):
本文介紹了如何實(shí)現(xiàn)一個(gè)簡(jiǎn)單易用且可靠的消息隊(duì)列框架?,希望對(duì)您有用。如果有疑問(wèn),可以聯(lián)系我們。
作者:李艷鵬
編輯:Gary
消息隊(duì)列在互聯(lián)網(wǎng)領(lǐng)域里得到了廣泛的應(yīng)用,它多應(yīng)用在異步處理、模塊之間的解偶和高并發(fā)的消峰等場(chǎng)景,消息隊(duì)列中表現(xiàn)最好的當(dāng)屬Apache開源項(xiàng)目Kafka,Kafka使用支持高并發(fā)的Scala語(yǔ)言開發(fā),利用操作系統(tǒng)的緩存原理達(dá)到高性能,并且天生具有可分區(qū),分布式的特點(diǎn),而且有不同語(yǔ)言的客戶端,使用起來(lái)非常的方便.
Kclient是Kafka生產(chǎn)者客戶端和消費(fèi)者客戶端的一個(gè)簡(jiǎn)單易用的框架,它具有高效集成、高性能、高穩(wěn)定的高級(jí)特點(diǎn).
在繼續(xù)閱讀kclient的功能特性、架構(gòu)設(shè)計(jì)和使用方法之前,讀者需要對(duì)Kafka進(jìn)行基本的學(xué)習(xí)和了解.如果讀者是Kafka的初學(xué)者,而且英文還不錯(cuò),也可以直接參考Kafka官方在線文檔:Kafka 0.8.2 Documentation,如果對(duì)英文不感性趣,可以在網(wǎng)上搜索Kakfa的中文資料進(jìn)行學(xué)習(xí),內(nèi)容需要涵蓋Kafka的使用向?qū)б约袄貌僮飨到y(tǒng)緩存的“空中接力”、持久化、分片機(jī)制、高可用等原理.
本文包含了背景介紹、功能特性、架構(gòu)設(shè)計(jì)、使用指南、API簡(jiǎn)介、后臺(tái)監(jiān)控和管理、消息處理機(jī)模板項(xiàng)目、以及性能壓測(cè)相關(guān)章節(jié).如果你想使用kclient快速的構(gòu)建Kafka處理機(jī)服務(wù),請(qǐng)參考消息處理機(jī)模板項(xiàng)目章節(jié); 如果你想了解kclient的其他使用方式、功能特性、監(jiān)控和管理等,請(qǐng)參考功能特性、使用指南、API簡(jiǎn)介、后臺(tái)監(jiān)控和管理等章節(jié); 如果你想更深入的理解kclient的架構(gòu)設(shè)計(jì)和性能指標(biāo),請(qǐng)參考架構(gòu)設(shè)計(jì)和性能壓測(cè)章節(jié).
簡(jiǎn)化了Kafka客戶端API的使用方法, 特別是對(duì)消費(fèi)端開發(fā),消費(fèi)端開發(fā)者只需要實(shí)現(xiàn)MessageHandler接口或者相關(guān)子類,在實(shí)現(xiàn)中處理消息完成業(yè)務(wù)邏輯,并且在主線程中啟動(dòng)封裝的消費(fèi)端服務(wù)器即可.它提供了各種常用的MessageHandler,框架自動(dòng)轉(zhuǎn)換消息到領(lǐng)域?qū)ο竽P突蛘逬SON對(duì)象等數(shù)據(jù)結(jié)構(gòu),讓開發(fā)者更專注于業(yè)務(wù)處理.如果使用服務(wù)源碼注解的方式聲明消息處理機(jī)的后臺(tái),可以將一個(gè)通用的服務(wù)方法直接轉(zhuǎn)變成具有完善功能的處理Kafka消息隊(duì)列的處理機(jī),使用起來(lái)極其簡(jiǎn)單,代碼看起來(lái)一目了然,在框架級(jí)別通過(guò)多種線程池技術(shù)保證了處理機(jī)的高性能.
在使用方面,它提供了多種使用方式:
除此之外,它基于注解提供了消息處理機(jī)的模板項(xiàng)目,可以根據(jù)模板項(xiàng)目通過(guò)配置快速開發(fā)Kafka的消息處理機(jī).
為了在不同的業(yè)務(wù)場(chǎng)景下實(shí)現(xiàn)高性能, 它提供不同的線程模型:
為了在不同的業(yè)務(wù)場(chǎng)景下實(shí)現(xiàn)高性能, 它提供不同的線程模型:
另外,異步模型中的線程池也支持確定數(shù)量線程的線程池和線程數(shù)量可伸縮的線程池.
框架級(jí)別處理了常見的異常,計(jì)入錯(cuò)誤日志,可用于錯(cuò)誤手工恢復(fù)或者洗數(shù)據(jù),并實(shí)現(xiàn)了優(yōu)雅關(guān)機(jī)和重啟等功能.
同步線程模型
在這種線程模型中,客戶端為每一個(gè)消費(fèi)者流使用一個(gè)線程,每個(gè)線程負(fù)責(zé)從Kafka隊(duì)列里消費(fèi)消息,并且在同一個(gè)線程里進(jìn)行業(yè)務(wù)處理.我們把這些線程稱為消費(fèi)線程,把這些線程所在的線程池叫做消息消費(fèi)線程池.這種模型之所以在消息消費(fèi)線程池處理業(yè)務(wù),是因?yàn)樗嘤糜谔幚磔p量級(jí)別的業(yè)務(wù),例如:緩存查詢、本地計(jì)算等.
異步線程模型
在這種線程模型中,客戶端為每一個(gè)消費(fèi)者流使用一個(gè)線程,每個(gè)線程負(fù)責(zé)從Kafka隊(duì)列里消費(fèi)消息,并且傳遞消費(fèi)得到的消息到后端的異步線程池,在異步線程池中處理業(yè)務(wù).我們?nèi)匀话亚懊尕?fù)責(zé)消費(fèi)消息的線程池稱為消息消費(fèi)線程池,把后面的異步線程池稱為異步業(yè)務(wù)線程池.這種線程模型適合重量級(jí)的業(yè)務(wù),例如:業(yè)務(wù)中有大量的IO操作、網(wǎng)絡(luò)IO操作、復(fù)雜計(jì)算、對(duì)外部系統(tǒng)的調(diào)用等.
后端的異步業(yè)務(wù)線程池又細(xì)分為所有消費(fèi)者流共享線程池和每個(gè)流獨(dú)享線程池.下面詳細(xì)介紹下.
所有消費(fèi)者流共享線程池:所有消費(fèi)者流共享線程池對(duì)比每個(gè)流獨(dú)享線程池,創(chuàng)建更少的線程池對(duì)象,能節(jié)省些許的內(nèi)存,但是,由于多個(gè)流共享同一個(gè)線程池,在數(shù)據(jù)量較大的時(shí)候,流之間的處理可能互相影響.例如,一個(gè)業(yè)務(wù)使用2個(gè)區(qū)和兩個(gè)流,他們一一對(duì)應(yīng),通過(guò)生產(chǎn)者指定定制化的散列函數(shù)替換默認(rèn)的key-hash, 實(shí)現(xiàn)一個(gè)流(區(qū))用來(lái)處理普通用戶,另外一個(gè)流(區(qū))用來(lái)處理VIP用戶,如果兩個(gè)流共享一個(gè)線程池,當(dāng)普通用戶的消息大量產(chǎn)生的時(shí)候,VIP用戶只有少量,并且排在了隊(duì)列的后頭,就會(huì)產(chǎn)生餓死的情況.這個(gè)場(chǎng)景是可以使用多個(gè)topic來(lái)解決,一個(gè)普通用戶的topic,一個(gè)VIP用戶的topic,但是這樣又要多維護(hù)一個(gè)topic,客戶端發(fā)送的時(shí)候需要顯式的進(jìn)行判斷topic目標(biāo),也沒有多少好處.
每個(gè)流獨(dú)享線程池:每個(gè)流獨(dú)享線程池使用不同的異步業(yè)務(wù)線程池來(lái)處理不同的流里面的消息,互相隔離,互相獨(dú)立,不互相影響,對(duì)于不同的流(區(qū))的優(yōu)先級(jí)不同的情況,或者消息在不同流(區(qū))不均衡的情況下表現(xiàn)會(huì)更好,當(dāng)然,創(chuàng)建多個(gè)線程池會(huì)多使用些許內(nèi)存,但是這并不是一個(gè)大問(wèn)題.
另外,異步業(yè)務(wù)線程池支持確定數(shù)量線程的線程池和線程數(shù)量可伸縮的線程池.
對(duì)于消息處理過(guò)程中產(chǎn)生的業(yè)務(wù)異常,當(dāng)前在業(yè)務(wù)處理的上層捕捉了Throwable, 在專用的錯(cuò)誤恢復(fù)日志中記錄出錯(cuò)的消息,后續(xù)可根據(jù)錯(cuò)誤恢復(fù)日志人工處理錯(cuò)誤消息,也可重做或者洗數(shù)據(jù).TODO:考慮實(shí)現(xiàn)異常Listener體系結(jié)構(gòu), 對(duì)異常處理實(shí)現(xiàn)監(jiān)聽者模式,異常處理器可插拔等,默認(rèn)打印錯(cuò)誤日志.
由于默認(rèn)的異常處理中,捕捉異常,在專用的錯(cuò)誤回復(fù)日志中記錄錯(cuò)誤,并且繼續(xù)處理下一個(gè)消息.考慮到可能上線失敗,或者上游消息格式出錯(cuò),導(dǎo)致所有消息處理都出錯(cuò),堆滿錯(cuò)誤恢復(fù)日志的情況,我們需要訴諸于報(bào)警和監(jiān)控系統(tǒng)來(lái)解決.
由于消費(fèi)者本身是一個(gè)事件驅(qū)動(dòng)的服務(wù)器,類似Tomcat,Tomcat接收HTTP請(qǐng)求返回HTTP響應(yīng),Consumer則接收Kafka消息,然后處理業(yè)務(wù)后返回,也可以將處理結(jié)果發(fā)送到下一個(gè)消息隊(duì)列.所以消費(fèi)者本身是非常復(fù)雜的,除了線程模型,異常處理,性能,穩(wěn)定性,可用性等都是需要思考點(diǎn).既然消費(fèi)者是一個(gè)后臺(tái)的服務(wù)器,我們需要考慮如何優(yōu)雅的關(guān)機(jī),也就是在消費(fèi)者服務(wù)器在處理消息的時(shí)候,如果關(guān)機(jī)才能不導(dǎo)致處理的消息中斷而丟失.
優(yōu)雅關(guān)機(jī)的重點(diǎn)在于解決如下3個(gè)問(wèn)題:
第一個(gè)問(wèn)題: 如果一個(gè)后臺(tái)程序運(yùn)行在控制臺(tái)的前臺(tái),通過(guò)Ctrl + C可以發(fā)送退出信號(hào)給JVM,也可以通過(guò)kill -2 PS_IS 或者 kill -15 PS_IS發(fā)送退出信號(hào),但是不能發(fā)送kill -9 PS_IS, 否則進(jìn)程會(huì)無(wú)條件強(qiáng)制退出.JVM收到退出信號(hào)后,會(huì)調(diào)用注冊(cè)的鉤子,我們通過(guò)的注冊(cè)的JVM退出鉤子進(jìn)行優(yōu)雅關(guān)機(jī).
第二個(gè)問(wèn)題: 線程分為Daemon線程和非Daemon線程,一個(gè)線程默認(rèn)繼承父線程的Daemon屬性,如果當(dāng)前線程池是由Daemon線程創(chuàng)建的,則Worker線程是Daemon線程.如果Worker線程是Daemon線程,我們需要在JVM退出鉤子中等待Worker線程完成當(dāng)前手頭處理的消息,再退出JVM.如果不是Daemon線程,即使JVM收到退出信號(hào),也得等待Worker線程退出后再退出,不會(huì)丟掉正在處理的消息.
第三個(gè)問(wèn)題: 在Worker線程從Kafka服務(wù)器消費(fèi)消息的時(shí)候,Worker線程可能處于阻塞,這時(shí)需要中斷線程以退出,沒有消息被丟掉.在Worker線程處理業(yè)務(wù)時(shí)有可能有阻塞,例如:IO,網(wǎng)絡(luò)IO,在指定退出時(shí)間內(nèi)沒有完成,我們也需要中斷線程退出,這時(shí)會(huì)產(chǎn)生一個(gè)InterruptedException, 在異常處理的默認(rèn)處理器中被捕捉,并寫入錯(cuò)誤日志,Worker線程隨后退出.
kclient提供了三種使用方法,對(duì)于每一種方法,按照下面的步驟可快速構(gòu)建Kafka生產(chǎn)者和消費(fèi)者程序.
1) 下載源代碼后在項(xiàng)目根目錄執(zhí)行如下命令安裝打包文件到你的Maven本地庫(kù).
mvn install
2) 在你的項(xiàng)目pom.xml文件中添加對(duì)kclient的依賴.
3) 根據(jù)Kafka官方文檔搭建Kafka環(huán)境,并創(chuàng)建兩個(gè)Topic, test1和test2.
4) 然后,從Kafka安裝目錄的config目錄下拷貝kafka-consumer.properties和kafka-producer.properties到你的項(xiàng)目類路徑下,通常是src/main/resources目錄.
Java API提供了最直接,最簡(jiǎn)單的使用kclient的方法.
構(gòu)建Producer示例:
構(gòu)建Consumer示例:
kclient可以與Spring環(huán)境無(wú)縫集成,你可以像使用Spring Bean一樣來(lái)使用KafkaProducer和KafkaConsumer.
構(gòu)建Producer示例:
構(gòu)建Consumer示例:
kclient提供了類似Spring聲明式的編程方法,使用注解聲明Kafka處理器方法,所有的線程模型、異常處理、服務(wù)啟動(dòng)和關(guān)閉等都由后臺(tái)服務(wù)自動(dòng)完成,極大程度的簡(jiǎn)化了API的使用方法,提高了開發(fā)者的工作效率.
注解聲明Kafka消息處理器:
注解啟動(dòng)程序:
注解Spring環(huán)境配置:
KafkaProducer類提供了豐富的API來(lái)發(fā)送不同類型的消息,它支持發(fā)送字符串消息,發(fā)送一個(gè)普通的Bean,以及發(fā)送JSON對(duì)象等.在這些API中可以指定發(fā)送到某個(gè)Topic,也可以不指定而使用默認(rèn)的Topic.對(duì)于發(fā)送的數(shù)據(jù),支持帶Key值的消息和不帶Key值的消息.
發(fā)送字符串消息:
發(fā)送Bean消息:
發(fā)送JSON對(duì)象消息:
KafkaConsumer類提供了豐富的構(gòu)造函數(shù)用來(lái)指定Kafka消費(fèi)者服務(wù)器的各項(xiàng)參數(shù),包括線程池策略,線程池類型,流數(shù)量等等.
使用PROPERTIES文件初始化:
使用PROPERTIES對(duì)象初始化以及消息處理器注解、消息處理機(jī)模板項(xiàng)目可以查看以下鏈接繼續(xù)閱讀:
http://www.jianshu.com/p/304f2fd8388b
Benchmark應(yīng)該覆蓋推送QPS、接收處理QPS以及單線程、多線程生產(chǎn)者的用例.
用例1: 輕量級(jí)服務(wù)同步線程模型和異步線程模型的性能對(duì)比.
用例2: 重量級(jí)服務(wù)同步線程模型和異步線程模型的性能對(duì)比.
用例3: 重量級(jí)服務(wù)異步線程模型中所有消費(fèi)者流共享線程池和每個(gè)流獨(dú)享線程池的性能對(duì)比.
用例4: 重量級(jí)服務(wù)異步線程模型中每個(gè)流獨(dú)享線程池的對(duì)比的確定數(shù)量線程的線程池和線程數(shù)量可伸縮的線程池的性能對(duì)比.
由于筆者在發(fā)文的時(shí)候還沒有時(shí)間完成前面四種場(chǎng)景的壓測(cè),暫時(shí)留給讀者親自動(dòng)手,作為一個(gè)練習(xí).
盡管本文設(shè)計(jì)和實(shí)現(xiàn)的kclient項(xiàng)目提供了許多高級(jí)功能,并且使用起來(lái)方便,而且筆者在幾家公司里在線上進(jìn)行了應(yīng)用,已經(jīng)發(fā)揮了不少的作用,但是,還有一些細(xì)節(jié)需要提高.
用例1:kclient處理器項(xiàng)目中管理Restful服務(wù)暫時(shí)只提供了獲得狀態(tài)的API,需要進(jìn)行進(jìn)一步的豐富,增加對(duì)線程池的監(jiān)控,以及消息處理性能的監(jiān)控.
用例2:當(dāng)前注解@ErrorHandler里面的exception參數(shù)為必選,完全可以使用方法第一參數(shù)進(jìn)行推導(dǎo),簡(jiǎn)化開發(fā)人員配置的工作.
用例3:模板項(xiàng)目還不完善,需要增加啟動(dòng)和關(guān)閉腳本,這樣讀者可以直接拷貝使用.
用例4:盡管線上應(yīng)用已經(jīng)證明了kclient沒有性能問(wèn)題,但是開發(fā)一款中間件系統(tǒng)是需要閉環(huán)的,需要盡快把性能壓測(cè)這塊昨晚并且形成壓測(cè)報(bào)表.
文章來(lái)自微信公眾號(hào):聊聊架構(gòu)
轉(zhuǎn)載請(qǐng)注明本頁(yè)網(wǎng)址:
http://www.snjht.com/jiaocheng/4211.html