A. 消息隊列核心原理
消息隊列已經逐漸成為分布式應用場景、內部通信、以及秒殺等高並發業務場景的核心手段,它具有低耦合、可靠投遞、廣播、流量控制、最終一致性 等一系列功能。
無論是 RabbitMQ、RocketMQ、ActiveMQ、Kafka還是其它等,都有的一些基本原理、術語、機制等,總結分享出來,希望大家在使用消息隊列技術的時候能夠快速理解。
1.消息生產者Procer:發送消息到消息隊列。
2.消息消費者Consumer:從消息隊列接收消息。
3.Broker:概念來自與Apache ActiveMQ,指MQ的服務端,幫你把消息從發送端傳送到接收端。
4.消息隊列Queue:一個先進先出的消息存儲區域。消息按照順序發送接收,一旦消息被消費處理,該消息將從隊列中刪除。
1)消息的轉儲:在更合適的時間點投遞,或者通過一系列手段輔助消息最終能送達消費機。
2)規范一種範式和通用的模式,以滿足解耦、最終一致性、錯峰等需求。
3)其實簡單理解就是一個消息轉發器,把一次RPC做成兩次RPC。發送者把消息投遞到broker,broker再將消息轉發一手到接收端。
總結起來就是兩次RPC加一次轉儲,如果要做消費確認,則是三次RPC。
點對點模型 用於 消息生產者 和 消息消費者 之間 點到點 的通信。
點對點模式包含三個角色:
發布訂閱模型包含三個角色:
生產者發送一條消息到隊列queue,只有一個消費者能收到。
發布者發送到topic的消息,只有訂閱了topic的訂閱者才會收到消息。
基於Queue消息模型,利用FIFO先進先出的特性,可以保證消息的順序性。
即消息的Ackownledge確認機制,為了保證消息不丟失,消息隊列提供了消息Acknowledge機制,即ACK機制,當Consumer確認消息已經被消費處理,發送一個ACK給消息隊列,此時消息隊列便可以刪除這個消息了。如果Consumer宕機/關閉,沒有發送ACK,消息隊列將認為這個消息沒有被處理,會將這個消息重新發送給其他的Consumer重新消費處理。
主要是用「記錄」和「補償」的方式。
1.本地事務維護業務變化和通知消息,一起落地,然後RPC到達broker,在broker成功落地後,RPC返回成功,本地消息可以刪除。否則本地消息一直靠定時任務輪詢不斷重發,這樣就保證了消息可靠落地broker。
2.broker往consumer發送消息的過程類似,一直發送消息,直到consumer發送消費成功確認。
3.我們先不理會重復消息的問題,通過兩次消息落地加補償,下游是一定可以收到消息的。然後依賴狀態機版本號等方式做判重,更新自己的業務,就實現了最終一致性。
4.如果出現消費方處理過慢消費不過來,要允許消費方主動ack error,並可以與broker約定下次投遞的時間。
5.對於broker投遞到consumer的消息,由於不確定丟失是在業務處理過程中還是消息發送丟失的情況下,有必要記錄下投遞的IP地址。決定重發之前詢問這個IP,消息處理成功了嗎?如果詢問無果,再重發。
6.事務:本地事務,本地落地,補償發送。本地事務做的,是業務落地和消息落地的事務,而不是業務落地和RPC成功的事務。消息只要成功落地,很大程度上就沒有丟失的風險。
消息的收發處理支持事務,例如:在任務中心場景中,一次處理可能涉及多個消息的接收、處理,這應該處於同一個事務范圍內,如果一個消息處理失敗,事務回滾,消息重新回到隊列中。
消息的持久化,對於一些關鍵的核心業務來說是非常重要的,啟用消息持久化後,消息隊列宕機重啟後,消息可以從持久化存儲恢復,消息不丟失,可以繼續消費處理。
在實際生產環境中,使用單個實例的消息隊列服務,如果遇到宕機、重啟等系統問題,消息隊列就無法提供服務了,因此很多場景下,我們希望消息隊列有高可用性支持,例如RabbitMQ的鏡像集群模式的高可用性方案,ActiveMQ也有基於LevelDB+ZooKeeper的高可用性方案,以及Kafka的Replication機制等。
B. 到底什麼是消息隊列Java中如何實現消息隊列
「消息隊列」是在消息的傳輸過程中保存消息的容器。和我們學過的LinkedHashMap,TreeSet等一樣,都是容器。既然是容器,就有有自己的特性,就像LinkedHashMap是以鍵值對存儲。存取順序不變。而消息隊列,看到隊列就可以知道。這個容器裡面的消息是站好隊的,一般遵從先進先出原則。
java中已經為我們封裝好了很多的消息隊列。在java 1.5版本時推出的java.util.concurrent中有很多現成的隊列供我們使用。特性繁多,種類齊全。是你居家旅遊開發必備QAQ。
下面簡單列舉這個包中的消息隊列
:阻塞隊列 BlockingQueue
數組阻塞隊列 ArrayBlockingQueue
延遲隊列 DelayQueue
鏈阻塞隊列 LinkedBlockingQueue
具有優先順序的阻塞隊列 PriorityBlockingQueue
同步隊列 SynchronousQueue
阻塞雙端隊列 BlockingDeque
鏈阻塞雙端隊列 LinkedBlockingDeque
不同的隊列不同的特性決定了隊列使用的時機,感興趣的話你可以詳細了解。具體的使用方式我就不贅述了
C. 消息隊列之zeroMQ、rabbitMQ、kafka
首先消息是網路通訊的載體,隊列可以理解是一種先進先出的數據結構,消息隊列是存放消息的容器,是分布式系統中的重要組件。消息隊列的優勢在於:解耦、非同步、削峰,把相關性不
強的模塊獨立分開視為解耦,非同步就是非必要邏輯非同步方式處理,加快響應速度,削峰是避免短期高並發導致系統問題進行緩沖隊列處理。消息隊列的缺點在於:加強系統復雜性、系統可用性降低,使
用了消息隊列系統出現問題排查的范圍就變大、需要考慮消息隊列導致的問題。
本文說明主流的消息隊列,針對使用過的zeroMQ和rabbitMQ、Kakfa:
zeroMQ :C語言開發,號稱最快的消息隊列,本著命名zero的含義,中油中間架構使用簡單,表面上是基於socket的封裝套接字API,在多個節點應用場景下非常靈活、架構的可擴展性很強,
實現N到M的協同處理;
zmq的socket模式: req、rep、push、pull、pub、sub、router、dealer。
(1)req和rep:請求回應模型,req和rep都可以請求和回答,不同的只是req是先send再rec,rep是先rec再send。支持N個請求端一個接收端,也支持N個接收端一個請求端。N個接收端采
用rr負載均衡。 哪個是「一」端,哪個就bind埠,「N」端就只能connect,所以,req+rep無論誰bind埠,肯定要有一個是「一」。
(2) router和dealer:隨時可以發送和接收的req和rep,看起來router+dealer跟 req+rep屬於同類功能。因為router和dealer可以隨時發送接收,所以它們可以用來做路由。一個router用來響
應N個req,然後它在響應處理的時候,再通過另一個socket把請求扔出去,接收者是另外的M個rep,這就做到N:M。
(3)pub和sub :訂閱和推送,對應發布者和訂閱者。
(4)push和pull:就是管道,一個只推數據,一個只拉數據。
rabbitMQ :使用erlang語言開發,高並發特點,基於AMQP(即Advanced Message Queuing Protocol)的開源高級消費隊列,AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發布/
訂閱)、可靠性、安全),企業級適應性和穩定性,並且有WEB管理界面方便用戶查看和管理。以下是rabbitMQ的結構圖:
(1)Procer:數據發送方,一般一個Message有兩個部分:payload(有效載荷)和label(標簽),payload是數據實際載體,label是exchange的名字或者一個tag,決定發給哪個Consumer;
(2)Exchange: 內部 消息交換器,exchange從生產者那收到消息後,一般會指定一個Routing Key,來指定這個消息的路由規則,當然Routing Key需要與Exchange Type及Binding key聯合使用
才能最終生效,根據路由規則,匹配查詢表中的routing key,分發消息到queue中;
(3)binding:即綁定,綁定(Binding)Exchange與Queue的同時,一般會指定一個Binding key,但不一定會生效,依賴於Exchange Type;
(4)Queue:即隊列是rabbitmq內部對象,用於存儲消息,一個message可以被同時拷貝到多個queue中,queue對load balance的處理是完美的。對於多個Consumer來說,RabbitMQ 使用循
環的方式(round-robin)的方式均衡的發送給不同的Consumer;
(5)Connection與Channel: Connection 就是一個TCP的連接,Procer和Consumer都是通過TCP連接到RabbitMQ Server, Channel 是為了節省開銷建立在上述的TCP連接中的介面,大部
分的業務操作是在Channel這個介面中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、發布消息等;
(6)Consumer:即數據的接收方,如果有多個消費者同時訂閱同一個Queue中的消息,Queue中的消息會被平攤給多個消費者;
(7)Broker: 即RabbitMQ Server,其作用是維護一條從Procer到Consumer的路線,保證數據能夠按照指定的方式進行傳輸;
(8)Virtual host:即虛擬主機,當多個不同的用戶使用同一個RabbitMQ server提供的服務時,可以劃分出多個vhost,每個用戶在自己的vhost創建exchange/queue;
rabbitMQ消息轉發中的路由轉發是重點,生產者Procer在發送消息時,都需要指定一個RoutingKey和Exchange,Exchange收到消息後可以看到消息中指定的RoutingKey,再根據當前
Exchange的ExchangeType,按一定的規則將消息轉發到相應的queue中去。三種Exchage type:
(1)Direct exchange :直接轉發路由,原理是通過消息中的routing key,與binding 中的binding-key 進行比對,若二者匹配,則將消息發送到這個消息隊列;
比如:消息生成者生成一個message(payload是1,routing key為蘋果),兩個binding(binding key分別為蘋果、香蕉);exchange比對消息的routing key和binding key後,將消息發給了queue1,消息消費者1獲得queue1的消息;
(2)Topic exchange: 通配路由,是direct exchange的通配符模式,
比如:消息生成者生成一個message(payload是1,routing key為quick.orange.rabbit),兩個binding(binding key分別為*.orange. 、 *.*.rabbit);exchange比對消息的routing key和binding key
後,exchange將消息分發給兩個queue,兩個消費者獲得queue的消息;
(3)Fanout exchange: 復制分發路由,原理是不需要routkey,當exchange收到消息後,將消息復制多份轉發給與自己綁定的消息隊列,
比如:消息生成者生成一個message(payload是1,routing key為蘋果),兩個binding(binding key分別為蘋果、香蕉);exchange將消息分發給兩個queue,兩個消費者獲得queue的消息;
rabbiMQ如何保證消息的可靠性?
(1)Message rability:消息持久化,非持久化消息保存在內存中,持久化消息寫入內存同時也寫入磁碟;
(2)Message acknowledgment:消息確認機制,可以要求消費者在消費完消息後發送一個回執給RabbitMQ,RabbitMQ收到消息回執(Message acknowledgment)後才將該消息從Queue中移
除。通過ACK。每個Message都要被acknowledged(確認,ACK)。
(3)生產者消息確認機制:AMQP事務機制、生產者消息確認機制(publisher confirm)。
最後, 對比一下zeroMQ、rabbitMQ、kafka主流的消息隊列的性能情況:
對比方向 概要
吞吐量 萬級 RabbitMQ 的吞吐量要比 十萬級甚至是百萬級Kafka 低一個數量級。ZeroMQ號稱最快的消息隊列系統,尤其針對大吞吐量的需求場景。
可用性 都可以實現高可用。RabbitMQ 都是基於主從架構實現高可用性。 kafka 也是分布式的,一個數據多個副本,少數機器宕機,不會丟失數據,不會導致不可用
時效性 RabbitMQ 基於erlang開發,所以並發能力很強,性能極其好,延時很低,達到微秒級。其他兩個個都是 ms 級。
功能支持 Kafka 功能較為簡單,主要支持簡單的MQ功能,在大數據領域實時計算以及日誌採集被大規模使用;ZeroMQ能夠 實現RabbitMQ不擅長的高級/復雜 的隊列
消息丟失 RabbitMQ有ack模型,也有事務模型,保證至少不會丟數據, Kafka 理論上不會丟失,但不排除批量情況下。
開發環境 RabbitMQ需要erlang支持、kafka基於zookeeper管理部署、zeroMQ程序編譯調用即可
封裝庫 基於c++開發,使用RabbitMQ-C,cppKafka,而zeroMQ基於C語言開發,無需封裝
D. Kafka 設計詳解之隊列
在 上文 中我們介紹了 Kafka 的網路通信,本文打算詳細分析 Kafka 的核心 — 隊列 的設計和實現,來對 Kafka 進行更深一步的了解。
隊列是一種先進先出(FIFO)的數據結構,它是 Kafka 中最重要的部分,負責收集生產者生產的消息,並將這些消息傳遞給消費者。要實現一個隊列有多種方式,Kafka 作為一個消息隊列中間件,在設計隊列時主要要考慮兩個問題:
乍一看到這個問題,我們會想,內存的讀取速度遠快於磁碟,如果追求性能,內存也充足的話,當然是將生產者產生的消息數據寫到內存(比如用一個數組或者鏈表來存儲隊列數據),供消費者消費。真的是這樣嗎?
下面我們依次分析下寫內存和寫磁碟文件的優缺點,首先,內存的優點是讀寫速度非常快,但是,如果我們的目標是設計「大數據量」下的「高吞吐量」的消息隊列,會有以下幾個問題:
接下來我們來分析一下磁碟,寫磁碟文件方式存儲隊列數據的優點就是能規避上述內存的缺點,但其有很嚴重的缺點,就是讀寫速度慢,如果純依靠磁碟,那消息隊列肯定做不到「高吞吐量」這個目標。
分析了內存跟磁碟的優缺點,好像我們還是只能選寫內存,但我們忽視了磁碟的兩個情況:一是磁碟慢是慢在隨機讀寫,如果是順序讀寫,他的速度能達到 600MB/sec(RAID-5 磁碟陣列),並不慢,如果我們盡可能地將數據的讀寫設計成順序的,可以大大提升性能。二是 現代的操作系統會(盡可能地)將磁碟里的文件進行緩存 。
有了操作系統級別的文件緩存,那用磁碟存儲隊列數據的方式就變得有優勢了。首先,磁碟文件的數據會有文件緩存,所以不必擔心隨機讀寫的性能;其次,同樣是使用內存,磁碟文件使用的是操作系統級別的內存,相比於在 Java 內存堆中存儲隊列,它沒有 GC 問題,也沒有 Java 對象的額外內存開銷,更可以規避應用重啟後的內存 load 數據耗時的問題,而且,文件緩存是操作系統提供的,因為我們只要簡單的寫磁碟文件,系統復雜性大大降低。
因此,Kafka 直接使用磁碟來存儲消息隊列的數據。
剛才我們已經決定用磁碟文件來存儲隊列數據,那麼要如何選擇數據結構呢?一般情況下,如果需要查找數據並隨機訪問,我們會用 B+ 樹來存儲數據,但其時間復雜度是 O(log N),由於我們設計的是消息隊列,我們可以完全順序的寫收到的生產者消息,消費者消費時,只要記錄下消費者當前消費的位置,往後消費就可以了,這樣可以對文件盡可能的進行順序讀寫,同時,時間復雜度是O(1)。其實,這跟我們寫日誌的方式很像,每條日誌順序 append 到日誌文件。
之前我們已經確定採用直接順序寫磁碟文件的方式來存儲隊列數據,下面我們來剖析下具體的實現細節。
在 Kafka 中,用一個文件夾存儲一條消息隊列,成為一個 Log,每條消息隊列由多個文件組成,每個文件稱為一個 LogSegment,每當一個 LogSegment 的大小到達閾值,系統就會重新生成一個 LogSegment;當舊的 LogSegment 過期需要清理時(雖然磁碟空間相對於內存會寬裕很多,我們可以保存更長時間的消息數據,比如一周,以供消費者更靈活的使用,但還是需要定期清理太老的數據),系統會根據清理策略刪除這些文件。
現在我們知道一個隊列(Log)是由多個隊列段文件(LogSegment)組成的,那麼 Kafka 是如何將這些文件邏輯上連接從而組成一條有序隊列的呢?在生成每個隊列段文件時,Kafka 用該段的初始位移來對其命名,如在新建一個隊列時,會初始化第一個隊列段文件,那麼其文件名就是0,假設每個段的大小是固定值 L,那麼第二個段文件名就是 L,第 N 個就是 (N - 1)* L。這樣,我們就可以根據文件名對段文件進行排序,排序後的順序就是整個隊列的邏輯順序。
了解了隊列的基本實現,下面我們就來分析下隊列的核心操作—讀和寫。
寫操作發生在生產者向隊列生產消息時,在上篇文章講網路通信時我們已經說到,所有的客戶端請求會根據協議轉到一個 Handler 來具體處理,負責寫操作的 Handler 叫 ProcerHandler,整個寫請求的流程如下:
之前我們說過,如果是順序寫,由於省掉了磁頭定址的時間,磁碟的性能還是很高的,我們看到 Kakfa 隊列是以順序方式寫的,所以性能很高。但是,如果一台 Kafka 伺服器有很多個隊列,而硬碟的磁頭是有限的,所以還是得在不同的隊列直接來回切換定址,性能會有所下降。
隊列的讀操作發送在消費者消費隊列數據時,由於隊列是線性的,只需要記錄消費者上次消費到了哪裡(offset),接下去消費就好了。那麼首先會有一個問題,由誰來記消費者到底消費到哪裡了?
一般情況下,我們會想到讓服務端來記錄各個消費者當前的消費位置,當消費者來拉數據,根據記錄的消費位置和隊列的當前位置,要麼返回新的待消費數據,要麼返回空。讓服務端記錄消費位置,當遇到網路異常時會有一些問題,比如服務端將消息發給消費者後,如果網路異常消費者沒有收到消息,那麼這條消息就被「跳過」了,當然我們可以借鑒二階段提交的思想,服務端將消息發送給消費者後,標記狀態為「已發送」,等消費者消費成功後,返回一個 ack 給服務端,服務端再將其標記為「成功消費」。不過這樣設計還是會有一個問題,如果消費者沒有返回 ack 給服務端,此時這條消息可能在已經被消費也可能還沒被消費,服務端無從得知,只能根據人為策略跳過(可能會漏消息)或者重發(可能存在重復數據)。另一個問題是,如果有很多消費者,服務端需要記錄每條消息的每個消費者的消費狀態,這在大數據的場景下,非常消耗性能和內存。
Kafka 將每個消費者的消費狀態記錄在消費者本身(隔一段時間將最新消費狀態同步到 zookeeper),每次消費者要拉數據,就給服務端傳遞一個 offset,告訴服務端從隊列的哪個位置開始給我數據,以及一個參數 length,告訴服務端最多給我多大的數據(批量順序讀數據,更高性能),這樣就能使服務端的設計復雜度大大降低。當然這解決不了一致性的問題,不過消費者可以根據自己程序特點,更靈活地處理事務。
下面就來分析整個讀的流程:
分布式系統中不可避免的會遇到一致性問題,主要是兩塊:生產者與隊列服務端之間的一致性問題、消費者與隊列服務端之間的一致性問題,下面依次展開。
當生產者向服務端投遞消息時,可能會由於網路或者其他問題失敗,如果要保證一致性,需要生產者在失敗後重試,不過重試又會導致消息重復的問題,一個解決方案是每個消息給一個唯一的 id,通過服務端的主動去重來避免重復消息的問題,不過這一機制目前 Kafka 還未實現。目前 Kafka 提供配置,供用戶不同場景下選擇允許漏消息(失敗後不重試)還是允許重復消息(失敗後重試)。
由於在消費者里我們可以自己控制消費位置,就可以更靈活的進行個性化設計。如果我們在拉取到消息後,先增加 offset,然後再進行消息的後續處理,如果在消息還未處理完消費者就掛掉,就會存在消息遺漏的問題;如果我們在拉取到消息後,先進行消息處理,處理成功後再增加 offset,那麼如果消息處理一半消費者掛掉,會存在重復消息的問題。要做到完全一致,最好的辦法是將 offset 的存儲與消費者放一起,每消費一條數據就將 offset+1。
本文介紹了 Kafka 的隊列實現以及其讀寫過程。Kafka 認為操作系統級別的文件緩存比 Java 的堆內存更省空間和高效,如果生產者消費者之間比較「和諧」的話,大部分的讀寫操作都會落在文件緩存,且在順序讀寫的情況下,硬碟的速度並不慢,因此選擇直接寫磁碟文件的方式存儲隊列。在隊列的讀寫過程中,Kafka 盡可能地使用順序讀寫,並使用零拷貝來優化性能。最後,Kafka 讓消費者自己控制消費位置,提供了更加靈活的數據消費方式。
E. 消息隊列會把消息 存儲到哪裡
消息隊列由內核創建,所以最後的數據存放在內核中,並由內核維護!
F. 消息隊列(mq)是什麼
消息隊列(英語:Message queue)是一種進程間通信或同一進程的不同線程間的通信方式,軟體的貯列用來處理一系列的輸入,通常是來自用戶。
消息隊列提供了非同步的通信協議,每一個貯列中的紀錄包含詳細說明的資料,包含發生的時間,輸入設備的種類,以及特定的輸入參數,也就是說:消息的發送者和接收者不需要同時與消息隊列交互。消息會保存在隊列中,直到接收者取回它。
一個WIMP環境像是Microsoft Windows,藉由優先的某些形式(通常是事件的時間或是重要性的順序)來存儲用戶產生的事件到一個事件貯列中。系統把每個事件從事件貯列中傳遞給目標的應用程序。
實現
實際上,消息隊列常常保存在鏈表結構中。擁有許可權的進程可以向消息隊列中寫入或讀取消息。
目前,有很多消息隊列有很多開源的實現,包括JBoss Messaging、JORAM、Apache ActiveMQ、Sun Open Message Queue、RabbitMQ、IBM MQ、Apache Qpid、Apache RocketMQ和HTTPSQS。
(6)主流消息隊列怎麼存儲擴展閱讀:
優缺點
消息隊列本身是非同步的,它允許接收者在消息發送很長時間後再取回消息,這和大多數通信協議是不同的。例如WWW中使用的HTTP協議(HTTP/2之前)是同步的,因為客戶端在發出請求後必須等待伺服器回應。然而,很多情況下我們需要非同步的通信協議。
比如,一個進程通知另一個進程發生了一個事件,但不需要等待回應。但消息隊列的非同步特點,也造成了一個缺點,就是接收者必須輪詢消息隊列,才能收到最近的消息。
和信號相比,消息隊列能夠傳遞更多的信息。與管道相比,消息隊列提供了有格式的數據,這可以減少開發人員的工作量。但消息隊列仍然有大小限制。
消息隊列除了可以當不同線程或進程間的緩沖外,更可以透過消息隊列當前消息數量來偵測接收線程或進程性能是否有問題。
G. 消息隊列(mq)是什麼
「消息隊列」是在消息的傳輸過程中保存消息的容器。
「消息」是在兩台計算機間傳送的數據單位。消息可以非常簡單,例如只包含文本字元串;也可以更復雜,可能包含嵌入對象。
消息被發送到隊列中。「消息隊列」是在消息的傳輸過程中保存消息的容器。消息隊列管理器在將消息從它的源中繼到它的目標時充當中間人。隊列的主要目的是提供路由並保證消息的傳遞;如果發送消息時接收者不可用,消息隊列會保留消息,直到可以成功地傳遞它。
(7)主流消息隊列怎麼存儲擴展閱讀:
隊列的介紹:循環隊列
在實際使用隊列時,為了使隊列空間能重復使用,往往對隊列的使用方法稍加改進:無論插入或刪除,一旦rear指針增1或front指針增1 時超出了所分配的隊列空間,就讓它指向這片連續空間的起始位置。
自己真從MaxSize-1增1變到0,可用取余運算rear%MaxSize和front%MaxSize來實現。這實際上是把隊列空間想像成一個環形空間,環形空間中的存儲單元循環使用,用這種方法管理的隊列也就稱為循環隊列。除了一些簡單應用之外,真正實用的隊列是循環隊列。
在循環隊列中,當隊列為空時,有front=rear,而當所有隊列空間全占滿時,也有front=rear。
為了區別這兩種情況,規定循環隊列最多隻能有MaxSize-1個隊列元素,當循環隊列中只剩下一個空存儲單元時,隊列就已經滿了。因此,隊列判空的條件時front=rear,而隊列判滿的條件時front=(rear+1)%MaxSize。
H. 消息隊列原理及選型
消息隊列(Message Queue)是一種進程間通信或同一進程的不同線程間的通信方式。
Broker(消息伺服器)
Broker的概念來自與Apache ActiveMQ,通俗的講就是MQ的伺服器。
Procer(生產者)
業務的發起方,負責生產消息傳輸給broker
Consumer(消費者)
業務的處理方,負責從broker獲取消息並進行業務邏輯處理
Topic(主題)
發布訂閱模式下的消息統一匯集地,不同生產者向topic發送消息,由MQ伺服器分發到不同的訂閱 者,實現消息的廣播
Queue(隊列)
PTP模式下,特定生產者向特定queue發送消息,消費者訂閱特定的queue完成指定消息的接收。
Message(消息體)
根據不同通信協議定義的固定格式進行編碼的數據包,來封裝業務數據,實現消息的傳輸
點對點模型用於消息生產者和消息消費者之間點到點的通信。
點對點模式包含三個角色:
每個消息都被發送到一個特定的隊列,接收者從隊列中獲取消息。隊列保留著消息,可以放在內存 中也可以持久化,直到他們被消費或超時。
特點:
發布訂閱模型包含三個角色:
多個發布者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。
特點:
AMQP即Advanced Message Queuing Protocol,是應用層協議的一個開放標准,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。AMQP 的主要特徵是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。
優點:可靠、通用
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸)是IBM開發的一個即時通訊協議,有可能成為物聯網的重要組成部分。該協議支持所有平台,幾乎可以把所有聯網物品和外部連接起來,被用來當做感測器和致動器(比如通過Twitter讓房屋聯網)的通信協議。
優點:格式簡潔、佔用帶寬小、移動端通信、PUSH、嵌入式系統
STOMP(Streaming Text Orientated Message Protocol)是流文本定向消息協議,是一種為MOM(Message Oriented Middleware,面向消息的中間件)設計的簡單文本協議。STOMP提供一個可互操作的連接格式,允許客戶端與任意STOMP消息代理(Broker)進行交互。
優點:命令模式(非topicqueue模式)
XMPP(可擴展消息處理現場協議,Extensible Messaging and Presence Protocol)是基於可擴展標記語言(XML)的協議,多用於即時消息(IM)以及在線現場探測。適用於伺服器之間的准即時操作。核心是基於XML流傳輸,這個協議可能最終允許網際網路用戶向網際網路上的其他任何人發送即時消息,即使其操作系統和瀏覽器不同。
優點:通用公開、兼容性強、可擴展、安全性高,但XML編碼格式佔用帶寬大
RabbitMQ 是實現 AMQP(高級消息隊列協議)的消息中間件的一種,最初起源於金融系統,用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。 RabbitMQ 主要是為了實現系統之間的雙向解耦而實現的。當生產者大量產生數據時,消費者無法快速消費,那麼需要一個中間層。保存這個數據。
RabbitMQ 是一個開源的 AMQP 實現,伺服器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。
Channel(通道)
道是兩個管理器之間的一種單向點對點的的通信連接,如果需要雙向交流,可以建立一對通道。
Exchange(消息交換機)
Exchange類似於數據通信網路中的交換機,提供消息路由策略。
RabbitMq中,procer不是通過信道直接將消息發送給queue,而是先發送給Exchange。一個Exchange可以和多個Queue進行綁定,procer在傳遞消息的時候,會傳遞一個ROUTING_KEY,Exchange會根據這個ROUTING_KEY按照特定的路由演算法,將消息路由給指定的queue。和Queue一樣,Exchange也可設置為持久化,臨時或者自動刪除。
Exchange有4種類型:direct(默認),fanout, topic, 和headers。
不同類型的Exchange轉發消息的策略有所區別:
Binding(綁定)
所謂綁定就是將一個特定的 Exchange 和一個特定的 Queue 綁定起來。Exchange 和Queue的綁定可以是多對多的關系。
Routing Key(路由關鍵字)
exchange根據這個關鍵字進行消息投遞。
vhost(虛擬主機)
在RabbitMq server上可以創建多個虛擬的message broker,又叫做virtual hosts (vhosts)。每一個vhost本質上是一個mini-rabbitmq server,分別管理各自的exchange,和bindings。vhost相當於物理的server,可以為不同app提供邊界隔離,使得應用安全的運行在不同的vhost實例上,相互之間不會干擾。procer和consumer連接rabbit server需要指定一個vhost。
假設P1和C1注冊了相同的Broker,Exchange和Queue。P1發送的消息最終會被C1消費。
基本的通信流程大概如下所示:
Consumer收到消息時需要顯式的向rabbit broker發送basic。ack消息或者consumer訂閱消息時設置auto_ack參數為true。
在通信過程中,隊列對ACK的處理有以下幾種情況:
即消息的Ackownledge確認機制,為了保證消息不丟失,消息隊列提供了消息Acknowledge機制,即ACK機制,當Consumer確認消息已經被消費處理,發送一個ACK給消息隊列,此時消息隊列便可以刪除這個消息了。如果Consumer宕機/關閉,沒有發送ACK,消息隊列將認為這個消息沒有被處理,會將這個消息重新發送給其他的Consumer重新消費處理。
消息的收發處理支持事務,例如:在任務中心場景中,一次處理可能涉及多個消息的接收、處理,這應該處於同一個事務范圍內,如果一個消息處理失敗,事務回滾,消息重新回到隊列中。
消息的持久化,對於一些關鍵的核心業務來說是非常重要的,啟用消息持久化後,消息隊列宕機重啟後,消息可以從持久化存儲恢復,消息不丟失,可以繼續消費處理。
fanout 模式
模式特點:
direct 模式
任何發送到Direct Exchange的消息都會被轉發到routing_key中指定的Queue。
如果一個exchange 聲明為direct,並且bind中指定了routing_key,那麼發送消息時需要同時指明該exchange和routing_key。
簡而言之就是:生產者生成消息發送給Exchange, Exchange根據Exchange類型和basic_publish中的routing_key進行消息發送 消費者:訂閱Exchange並根據Exchange類型和binding key(bindings 中的routing key) ,如果生產者和訂閱者的routing_key相同,Exchange就會路由到那個隊列。
topic 模式
前面講到direct類型的Exchange路由規則是完全匹配binding key與routing key,但這種嚴格的匹配方式在很多情況下不能滿足實際業務需求。
topic類型的Exchange在匹配規則上進行了擴展,它與direct類型的Exchage相似,也是將消息路由到binding key與routing key相匹配的Queue中,但這里的匹配規則有些不同。
它約定:
以上圖中的配置為例,routingKey=」quick.orange.rabbit」的消息會同時路由到Q1與Q2,routingKey=」lazy.orange.fox」的消息會路由到Q1,routingKey=」lazy.brown.fox」的消息會路由到Q2,routingKey=」lazy.pink.rabbit」的消息會路由到Q2(只會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都匹配);routingKey=」quick.brown.fox」、routingKey=」orange」、routingKey=」quick.orange.male.rabbit」的消息將會被丟棄,因為它們沒有匹配任何bindingKey。
RabbitMQ,部署分三種模式:單機模式,普通集群模式,鏡像集群模式。
普通集群模式
多台機器部署,每個機器放一個rabbitmq實例,但是創建的queue只會放在一個rabbitmq實例上,每個實例同步queue的元數據。
如果消費時連的是其他實例,那個實例會從queue所在實例拉取數據。這就會導致拉取數據的開銷,如果那個放queue的實例宕機了,那麼其他實例就無法從那個實例拉取,即便開啟了消息持久化,讓rabbitmq落地存儲消息的話,消息不一定會丟,但得等這個實例恢復了,然後才可以繼續從這個queue拉取數據, 這就沒什麼高可用可言,主要是提供吞吐量 ,讓集群中多個節點來服務某個queue的讀寫操作。
鏡像集群模式
queue的元數據和消息都會存放在多個實例,每次寫消息就自動同步到多個queue實例里。這樣任何一個機器宕機,其他機器都可以頂上,但是性能開銷太大,消息同步導致網路帶寬壓力和消耗很重,另外,沒有擴展性可言,如果queue負載很重,加機器,新增的機器也包含了這個queue的所有數據,並沒有辦法線性擴展你的queue。此時,需要開啟鏡像集群模式,在rabbitmq管理控制台新增一個策略,將數據同步到指定數量的節點,然後你再次創建queue的時候,應用這個策略,就會自動將數據同步到其他的節點上去了。
Kafka 是 Apache 的子項目,是一個高性能跨語言的分布式發布/訂閱消息隊列系統(沒有嚴格實現 JMS 規范的點對點模型,但可以實現其效果),在企業開發中有廣泛的應用。高性能是其最大優勢,劣勢是消息的可靠性(丟失或重復),這個劣勢是為了換取高性能,開發者可以以稍降低性能,來換取消息的可靠性。
一個Topic可以認為是一類消息,每個topic將被分成多個partition(區),每個partition在存儲層面是append log文件。任何發布到此partition的消息都會被直接追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),offset為一個long型數字,它是唯一標記一條消息。它唯一的標記一條消息。kafka並沒有提供其他額外的索引機制來存儲offset,因為在kafka中幾乎不允許對消息進行「隨機讀寫」。
Kafka和JMS(Java Message Service)實現(activeMQ)不同的是:即使消息被消費,消息仍然不會被立即刪除。日誌文件將會根據broker中的配置要求,保留一定的時間之後刪除;比如log文件保留2天,那麼兩天後,文件會被清除,無論其中的消息是否被消費。kafka通過這種簡單的手段,來釋放磁碟空間,以及減少消息消費之後對文件內容改動的磁碟IO開支。
對於consumer而言,它需要保存消費消息的offset,對於offset的保存和使用,有consumer來控制;當consumer正常消費消息時,offset將會"線性"的向前驅動,即消息將依次順序被消費。事實上consumer可以使用任意順序消費消息,它只需要將offset重置為任意值。(offset將會保存在zookeeper中,參見下文)
kafka集群幾乎不需要維護任何consumer和procer狀態信息,這些信息有zookeeper保存;因此procer和consumer的客戶端實現非常輕量級,它們可以隨意離開,而不會對集群造成額外的影響。
partitions的設計目的有多個。最根本原因是kafka基於文件存儲。通過分區,可以將日誌內容分散到多個server上,來避免文件尺寸達到單機磁碟的上限,每個partiton都會被當前server(kafka實例)保存;可以將一個topic切分多任意多個partitions,來消息保存/消費的效率。此外越多的partitions意味著可以容納更多的consumer,有效提升並發消費的能力。(具體原理參見下文)。
一個Topic的多個partitions,被分布在kafka集群中的多個server上;每個server(kafka實例)負責partitions中消息的讀寫操作;此外kafka還可以配置partitions需要備份的個數(replicas),每個partition將會被備份到多台機器上,以提高可用性。
基於replicated方案,那麼就意味著需要對多個備份進行調度;每個partition都有一個server為"leader";leader負責所有的讀寫操作,如果leader失效,那麼將會有其他follower來接管(成為新的leader);follower只是單調的和leader跟進,同步消息即可。由此可見作為leader的server承載了全部的請求壓力,因此從集群的整體考慮,有多少個partitions就意味著有多少個"leader",kafka會將"leader"均衡的分散在每個實例上,來確保整體的性能穩定。
Procers
Procer將消息發布到指定的Topic中,同時Procer也能決定將此消息歸屬於哪個partition;比如基於"round-robin"方式或者通過其他的一些演算法等。
Consumers
本質上kafka只支持Topic。每個consumer屬於一個consumer group;反過來說,每個group中可以有多個consumer。發送到Topic的消息,只會被訂閱此Topic的每個group中的一個consumer消費。
如果所有的consumer都具有相同的group,這種情況和queue模式很像;消息將會在consumers之間負載均衡。
如果所有的consumer都具有不同的group,那這就是"發布-訂閱";消息將會廣播給所有的消費者。
在kafka中,一個partition中的消息只會被group中的一個consumer消費;每個group中consumer消息消費互相獨立;我們可以認為一個group是一個"訂閱"者,一個Topic中的每個partions,只會被一個"訂閱者"中的一個consumer消費,不過一個consumer可以消費多個partitions中的消息。kafka只能保證一個partition中的消息被某個consumer消費時,消息是順序的。事實上,從Topic角度來說,消息仍不是有序的。
Kafka的設計原理決定,對於一個topic,同一個group中不能有多於partitions個數的consumer同時消費,否則將意味著某些consumer將無法得到消息。
Guarantees
Kafka就比較適合高吞吐量並且允許少量數據丟失的場景,如果非要保證「消息可靠傳輸」,可以使用JMS。
Kafka Procer 消息發送有兩種方式(配置參數 procer.type):
對於同步方式(procer.type=sync)?Kafka Procer 消息發送有三種確認方式(配置參數 acks):
kafka的設計初衷是希望作為一個統一的信息收集平台,能夠實時的收集反饋信息,並需要能夠支撐較大的數據量,且具備良好的容錯能力。
持久性
kafka使用文件存儲消息,這就直接決定kafka在性能上嚴重依賴文件系統的本身特性。且無論任何OS下,對文件系統本身的優化幾乎沒有可能。文件緩存/直接內存映射等是常用的手段。因為kafka是對日誌文件進行append操作,因此磁碟檢索的開支是較小的;同時為了減少磁碟寫入的次數,broker會將消息暫時buffer起來,當消息的個數(或尺寸)達到一定閥值時,再flush到磁碟,這樣減少了磁碟IO調用的次數。
性能
需要考慮的影響性能點很多,除磁碟IO之外,我們還需要考慮網路IO,這直接關繫到kafka的吞吐量問題。kafka並沒有提供太多高超的技巧;對於procer端,可以將消息buffer起來,當消息的條數達到一定閥值時,批量發送給broker;對於consumer端也是一樣,批量fetch多條消息。不過消息量的大小可以通過配置文件來指定。對於kafka broker端,似乎有個sendfile系統調用可以潛在的提升網路IO的性能:將文件的數據映射到系統內存中,socket直接讀取相應的內存區域即可,而無需進程再次和交換。 其實對於procer/consumer/broker三者而言,CPU的開支應該都不大,因此啟用消息壓縮機制是一個良好的策略;壓縮需要消耗少量的CPU資源,不過對於kafka而言,網路IO更應該需要考慮。可以將任何在網路上傳輸的消息都經過壓縮。kafka支持gzip/snappy等多種壓縮方式。
生產者
負載均衡: procer將會和Topic下所有partition leader保持socket連接;消息由procer直接通過socket發送到broker,中間不會經過任何「路由層「。事實上,消息被路由到哪個partition上,有procer客戶端決定。比如可以採用「random「「key-hash「「輪詢「等,如果一個topic中有多個partitions,那麼在procer端實現「消息均衡分發「是必要的。
其中partition leader的位置(host:port)注冊在zookeeper中,procer作為zookeeper client,已經注冊了watch用來監聽partition leader的變更事件。
非同步發送:將多條消息暫且在客戶端buffer起來,並將他們批量的發送到broker,小數據IO太多,會拖慢整體的網路延遲,批量延遲發送事實上提升了網路效率。不過這也有一定的隱患,比如說當procer失效時,那些尚未發送的消息將會丟失。
消費者
consumer端向broker發送「fetch」請求,並告知其獲取消息的offset;此後consumer將會獲得一定條數的消息;consumer端也可以重置offset來重新消費消息。
在JMS實現中,Topic模型基於push方式,即broker將消息推送給consumer端。不過在kafka中,採用了pull方式,即consumer在和broker建立連接之後,主動去pull(或者說fetch)消息;這中模式有些優點,首先consumer端可以根據自己的消費能力適時的去fetch消息並處理,且可以控制消息消費的進度(offset);此外,消費者可以良好的控制消息消費的數量,batch fetch。
其他JMS實現,消息消費的位置是有prodiver保留,以便避免重復發送消息或者將沒有消費成功的消息重發等,同時還要控制消息的狀態。這就要求JMS broker需要太多額外的工作。在kafka中,partition中的消息只有一個consumer在消費,且不存在消息狀態的控制,也沒有復雜的消息確認機制,可見kafka broker端是相當輕量級的。當消息被consumer接收之後,consumer可以在本地保存最後消息的offset,並間歇性的向zookeeper注冊offset。由此可見,consumer客戶端也很輕量級。
對於JMS實現,消息傳輸擔保非常直接:有且只有一次(exactly once)。
在kafka中稍有不同:
at most once: 消費者fetch消息,然後保存offset,然後處理消息;當client保存offset之後,但是在消息處理過程中出現了異常,導致部分消息未能繼續處理。那麼此後"未處理"的消息將不能被fetch到,這就是"at most once"。
at least once: 消費者fetch消息,然後處理消息,然後保存offset。如果消息處理成功之後,但是在保存offset階段zookeeper異常導致保存操作未能執行成功,這就導致接下來再次fetch時可能獲得上次已經處理過的消息,這就是"at least once",原因offset沒有及時的提交給zookeeper,zookeeper恢復正常還是之前offset狀態。
exactly once: kafka中並沒有嚴格的去實現(基於2階段提交,事務),我們認為這種策略在kafka中是沒有必要的。
通常情況下「at-least-once」是我們首選。(相比at most once而言,重復接收數據總比丟失數據要好)。
kafka高可用由多個broker組成,每個broker是一個節點;
創建一個topic,這個topic會劃分為多個partition,每個partition存在於不同的broker上,每個partition就放一部分數據。
kafka是一個分布式消息隊列,就是說一個topic的數據,是分散放在不同的機器上,每個機器就放一部分數據。
在0.8版本以前,是沒有HA機制的,就是任何一個broker宕機了,那個broker上的partition就廢了,沒法寫也沒法讀,沒有什麼高可用性可言。
0.8版本以後,才提供了HA機制,也就是就是replica副本機制。每個partition的數據都會同步到其他的機器上,形成自己的多個replica副本。然後所有replica會選舉一個leader出來,那麼生產和消費都跟這個leader打交道,然後其他replica就是follower。
寫的時候,leader會負責把數據同步到所有follower上去,讀的時候就直接讀leader上數據即可。
kafka會均勻的將一個partition的所有replica分布在不同的機器上,從而提高容錯性。
如果某個broker宕機了也沒事,它上面的partition在其他機器上都有副本的,如果這上面有某個partition的leader,那麼此時會重新選舉一個新的leader出來,大家繼續讀寫那個新的leader即可。這就有所謂的高可用性了。
寫數據的時候,生產者就寫leader,然後leader將數據落地寫本地磁碟,接著其他follower自己主動從leader來pull數據。一旦所有follower同步好數據了,就會發送ack給leader,leader收到所有follower的ack之後,就會返回寫成功的消息給生產者。
消息丟失會出現在三個環節,分別是生產者、mq中間件、消費者:
RabbitMQ
Kafka
大體和RabbitMQ相同。
Rabbitmq
需要保證順序的消息投遞到同一個queue中,這個queue只能有一個consumer,如果需要提升性能,可以用內存隊列做排隊,然後分發給底層不同的worker來處理。
Kafka
寫入一個partition中的數據一定是有序的。生產者在寫的時候 ,可以指定一個key,比如指定訂單id作為key,這個訂單相關數據一定會被分發到一個partition中去。消費者從partition中取出數據的時候也一定是有序的,把每個數據放入對應的一個內存隊列,一個partition中有幾條相關數據就用幾個內存隊列,消費者開啟多個線程,每個線程處理一個內存隊列。
I. 消息隊列(mq)是什麼
是生產者先將消息投遞一個叫隊列的容器中,然後再從這個容器中取出消息,最後再轉發給消費者。
消息隊列是 Microsoft 的消息處理技術,它在任何安裝 Microsoft Windows 的計算機組合中,為任何應用程序提供消息處理和消息隊列功能,無論這些計算機是否在同一個網路上或者是否同時聯機。
消息隊列網路是能夠相互間來回發送消息的任何一組計算機。網路中的不同計算機在確保消息順利處理的過程中扮演不同的角色。它們中有些提供路由信息以確定如何發送消息,有些保存整個網路的重要信息,而有些只是發送和接收消息。
消息隊列的類型介紹:
消息隊列目前主要有兩種類型:POSIX消息隊列以及系統V消息隊列,系統V消息隊列目前被大量使用。每個消息隊列都有一個隊列頭,用結構struct msg_queue來描述。隊列頭中包含了該消息隊列的大量信息。包括消息隊列鍵值、用戶ID、組ID、消息隊列中消息數目等等。
消息隊列就是一個消息的鏈表,可以把消息看作一個記錄,具有特定的格式以及特定的優先順序。對消息隊列有寫許可權的進程可以向消息隊列中按照一定的規則添加新消息;對消息隊列有讀許可權的進程則可以從消息隊列中讀走消息。消息隊列是隨內核持續的。