1. 關於ActiveMQ的配備怎麼解決
關於ActiveMQ的配置
目前常用的消息隊列組建無非就是MSMQ和ActiveMQ,至於他們的異同,這里不想做過多的比較。簡單來說,MSMQ內置於微軟操作系統之中,在部署上包含一個隱性條件:Server需要是微軟操作系統。(對於這點我並去調研過MSMQ是否可以部署在非微軟系統,比如:Linux,只是拍腦袋想了想,感覺上是不可以)。對於ActiveMQ,微軟系統和Linux都是可以部署的。從功能方面來說,一般最常用的就是:消息的收/發,感覺差異不大。從性能上來說,一般的說法是ActiveMQ略高。在穩定性上,個人感覺MSMQ更好。如果這兩種常用隊列都用過的同學,應該來說最大的差異在於:MSMQ如果要訪問遠程隊列(比如機器A上的程序訪問機器B上的隊列),會比較惡心。在數據量比較大的情況之下,一般來說隊列伺服器會專門的一台或者多台(多台的話,用程序去做熱備+負載比較方便,也不需要額外的硬體成本。簡單來說做法可以這樣:消息發送的時候隨機向著多台隊列伺服器上發送消息;接受的時候開多個線程去分別監聽;熱備方面,可以維護一個帶狀態的隊列連接池,如果消息收發失敗那麼將狀態置為不可用,然後起一個線程去定時監測壞的連接是否可用,這個過程一般情況下可以不用加鎖,為什麼,大家根據各自需要去取捨吧)。最近搞完了短彩信的網關連接服務,這兩種隊列我均使用了。大致的過程是這樣的:上層應用如果要發端彩信,那麼將消息直接發送至ActiveMQ(目前用的就是上面說的多台熱備+負載,因為實際中下行量非常大5千萬條/天以上),然後端彩信網關連接服務部署多套,每套均依賴本機的MSMQ。為什麼呢?用ActiveMQ的原因是:上層應用程序和網關連接服務彼此獨立,消息需要跨機訪問。用MSMQ的原因是:ActiveMQ中的數據是一條不分省的大隊列,網關連接服務需要按省流控,所以端彩信網關連接服務:首先把消息從ActiveMQ取出來,然後存至本機上的分省MSMQ,這樣做另外的一個好處就是:ActiveMQ不至於過多擠壓,他的數據會分攤到N台短彩信網關連接服務所部署的機器上的MSMQ之中,也就說MSMQ可以起到分攤數據和緩沖的作用。
在之前的隨筆中,已經介紹過MSMQ,現在先介紹一下ActiveMQ一些配置,目前好像ActiveMQ配置上的介紹還比較少。以下是自己總結一些相關資料,貼出來給大家共享
一)問題分析和解決
1)KahaDb和AMQ Message Store兩種持久方式如何選擇?
官方:
From 5.3 onwards - we recommend you use KahaDB - which offers improved scalability and recoverability over the AMQ Message Store.
The AMQ Message Store which although faster than KahaDB - does not scales as well as KahaDB and recovery times take longer.
非官方:
kaha文件系統實際上上是一個文件索引系統,有兩部分組成,一個是數據文件系統,由一個個獨立的文件組成,預設文件大小是32M大(可配置),另外一個是索引文件系統,記錄消息在數據文件中的位置信息以及數據文件中的空閑塊信息。數據文件是存儲到硬碟上的,索引文件是緩存在內存中的。所以這個存儲系統對大消息存儲有利,象我們的memberId之類的文本消息,實際上是浪費,索引比消息還大,哈。
我方分析:
推薦: Amq持久方式
理由:雖然官方推薦使用KahaDB持久方式,但其提到的優勢:可伸縮性和恢復性較好,對於我們實際的應用意義不大。從我們自己的使用經驗來看,KahaDB持久方式,Data文件是一個大文件(感覺文件過大後,造成隊列服務癱死的可能性會增大),從官網的相關配置(附錄1)也找不到哪裡可以設置數據的文件的最大Size。)而Amq持久方式可以設置Data文件最大Size,這樣可以保證即時消息積壓很多,Data文件也不至於過大。
2)錯誤:Channel was inactive for too long
解決方法:
在建立連接的Uri中加入: wireFormat.maxInactivityDuration=0
參考資源:
http://jinguo.iteye.com/blog/243153
You can do the following to fix the issues:
1) Append max inactivity ration to your Uri in the format below: wireFormat.maxInactivityDuration=0
2) Use the same Uri at the client side as well as at the server side
Regards,
如果不這樣設置,對應的錯誤會出現:
2008-05-07 09:22:56,343 [org.apache.activemq.ActiveMQConnection]-[WARN] Async exception with no exception listener: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: localhost/127.0.0.1:61616
org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: localhost/127.0.0.1:61616
ActiveMQ的tcp url:tcp://localhost:61616後面要加入?wireFormat.maxInactivityDuration=0 這樣的參數,否則當一段時間沒有消息發送時會拋出 "Channel was inactive for too long"異常
3)錯誤:Wire format negotiation timeout: peer did not send his wire format.
解決方法:
1)關閉ActiveMqLog4j
打開:conf/log4j.properties
將:log4j.rootLogger=INFO, console, logfile
修改為:log4j.rootLogger=OFF
2)在建立連接的Uri中加入: =30000
例如北京的測試環境連接Uri:
tcp://192.168.199.80:61616?wireFormat.maxInactivityDuration=0&=30000&connection.AsyncSend=true
參考資源:
http://activemq.apache.org/javaxjmsjmsexception-wire-format-negociation-timeout-peer-did-not-send-his-wire-format.html
If you get exception like this,it can mean one of three things:
1. You're connecting to the port not used by ActiveMQ TCP transport
Make sure to check that you're connecting to the appropriate host:port
2. You're using log4j JMS appender and doesn't filter out ActiveMQ log messages
Be sure to read How do I use log4j JMS appender with ActiveMQ and more importantly to never send ActiveMQ log messages to JMS appender
3. Your broker is probably under heavy load (or network connection is unreliable), so connection setup cannot be completed in a reasonable time
If you experience sporadic exceptions like this, the best solution is to use failover transport, so that your clients can try connecting again if the first attempt fails. If you're getting these kind of exceptions more frequently you can also try extending wire format negotiation period (default 10 sec). You can do that by using wireFormat. property on the connection URL in your client.
For example
tcp://localhost:61616?wireFormat.=30000
will use 30 sec timeout.(貌似有問題!!!)
4)錯誤:Out of memory
解決方法:
1) 設置Java最大內存限制為合適大小:
Bin/activemq.bat 中ACTIVEMQ_OPTS=-Xmx512M(默認是512)
2)Activemq.xml配置節:systemUsage/ systemUsage配置大小合適,並且特別注意:大於所有rable desitination設置的memoryUsage之和。
備註:
1)尖括弧:「>」代表通配符
2)ACTIVEMQ_OPTS的配置〉=memoryUsage中配置〉=所有rable desitination設置之和
3)SystemUsage配置設置了一些系統內存和硬碟容量,當系統消耗超過這些容量設置時,amq會「slow down procer」,還是很重要的。
參考資料:
http://m.oschina.net/blog/26216
參考-- http://activemq.apache.org/javalangoutofmemory.html
對於MQ的內容實用是可管理和可配置的。首先需要判斷的是MQ的哪部分系統因內存不足而導致泄漏,是JVM,broker還是消費者、生產者?
一、內存管理
JVM內存管理:
1. 用bin/activemq命令在獨立JVM中運行broker。用-Xmx和-Xss命令即可(activemq.bat文件中修改ACTIVEMQ_OPTS選項參數即可);
2. 默認情況下,MQ用512M的JVM;
broker內存管理:
1. broker使用的內存並不是由JVM的內存決定的。雖然受到JVM的限制,但broker確實獨立管理器內存;
2. systemUsage和destination的內存限制與broker內存息息相關;
3. MQ中內存的關系是:JVM->Broker->broker features;
4. 所有destination的內存總量不能超過broker的總內存;
消費者:
1. 由於消息大小可以配置,prefetch limit往往是導致內存溢出的主要原因;
2. 減少prefetch limit的大小,會減少消費者內存中存儲的消息數量;
生產者:
1. 除非消息數量超過了broker資源的限制,否則生產者不會導致內存溢出;
2. 當內存溢出後,生產者會收到broker的阻塞信息提示;
二、其他
將消息緩沖之硬碟:
1. 只有當消息在內存中存儲時,才允許消息的快速匹配與分發,而當消費者很慢或者離開時,內存可能會耗盡;
2. 當destination到達它的內存臨界值時,broker會用消息游標來緩存非持久化的消息到硬碟。
3. 臨界值在broker中通過memoryUsage和systemUsage兩個屬性配置,請參考activemq.xml;
4. 對於緩慢的消費者,當尚未耗盡內存或者轉變為生產者並發控制模式前,這個特性允許生產者繼續發送消息到broker;
5. 當有多個destination的時候,默認的內存臨界值可能被打破,而這種情況將消息緩存到硬碟就顯得很有意義;
6. precentUsage配置:使用百分比來控制內存使用情況;
多個線程:
1. 默認情況下,MQ每個destination都對應唯一的線程;
2. -Dorg.apache.activema.UseDedicatedTaskRunner=false(activemq.bat文件中修改ACTIVEMQ_OPTS選項參數即可),用線程池來限制線程的數量,從而減少內存消耗;
大數據傳輸:
1. destination policies--maxPageSize:控制進入內存中的消息數量;lazyDispatch:增加控制使用當前消費者列表的預取值;
2. 使用blogMessage或者streamsMessage類型來進行大量文件的傳輸;
泄漏JMS資源:
1. 當session或者procer或者consumer大量存在而沒有關閉的時候;
2. 使用PooledConnectionFactory;
2. rabbitmq 怎麼設置集群發送 ip
AMQP(高級消息隊列協議)是一個非同步消息傳遞所使用的應用層協議規范,作為線路層協議,而不是API(例如JMS),AMQP客戶端能夠無視消息的來源任意發送和接受信息。AMQP的原始用途只是為金融界提供一個可以彼此協作的消息協議,而現在的目標則是為通用消息隊列架構提供通用構建工具。因此,面向消息的中間件(MOM)系統,例如發布梁培/訂閱隊列,沒有作為基本元素實現。反而通過發送簡化的AMQ實體,用戶被賦予了構建例如這些實體的能力。這些實體也是規范的一部分,形成了在線路層協議頂端的一個層級:AMQP模型。這個模型統一了消息模式,諸如之前提到的發布/訂閱,隊列,事務以及流數據,並且添加了額外的特性,例如更易於擴展,基於內容的路由。AMQP當中有四個概念非常重要virtualhost,虛擬主機exchange,交換機queue,隊列binding,綁定一個虛擬主機持有一組交換機、隊列和綁定。為什麼需要多個虛擬主機呢?因為RabbitMQ當中,用戶只能在虛擬主機的粒度進行許可權控制。因此,如果需要禁止A組訪問B組的交換機/隊列/綁定,必須為A和B分別創建一個虛擬主機。每一個RabbitMQ伺服器都有一個默認的虛擬主機/。何謂虛擬主機(virtualhost),交換機(exchange),隊列(queue)和綁定(binding)隊列(Queues)是你的消息(messages)的終點,可以理解成裝消息的容器。消息就一直在裡面,直到有客戶端(也就是消費者,Consumer)連接到這個隊帆純列並且將其取走為止。不過,也可以將一個隊列配置成這樣的:一旦消息進入這個隊列,此消息就被刪除。隊列是由消費者(Consumer)通過程序建立的,不是通過配置文件或者命令行工具。這沒什麼問題,如果一個消費者試圖創建一個已經存在的隊列,RabbitMQ會直接忽略這個請求。因此我們可以將消息隊列的配置寫在應用程序的代碼裡面。而要把一個消息放進隊列前,需要有一個交換機(Exchange)。交換機(Exchange)可以理解成具有路由表的路由程序。每個消息都有一個稱為路由鍵(routingkey)的屬性,就是一個簡單的字元串。交換機當中有一系列的綁定(binding),即路由規則(routes)。(例如,指明具有路由鍵「X」的消息要到名為timbuku的隊列當中去。)消費者程序(Consumer)要負責創建你的交換機。交換機可以存在多態渣咐個,每個交換機在自己獨立的進程當中執行,因此增加多個交換機就是增加多個進程,可以充分利用伺服器上的CPU核以便達到更高的效率。例如,在一個8核的伺服器上,可以創建5個交換機來用5個核,另外3個核留下來做消息處理。類似的,在RabbitMQ的集群當中,你可以用類似的思路來擴展交換機一邊獲取更高的吞吐量。交換機如何判斷要把消息送到哪個隊列?你需要路由規則,即綁定(binding)。一個綁定就是一個類似這樣的規則:將交換機「desert(沙漠)」當中具有路由鍵「阿里巴巴」的消息送到隊列「hideout(山洞)」裡面去。換句話說,一個綁定就是一個基於路由鍵將交換機和隊列連接起來的路由規則。例如,具有路由鍵「audit」的消息需要被送到兩個隊列,「log-forever」和「alert-the-big-de」。要做到這個,就需要創建兩個綁定,每個都連接一個交換機和一個隊列,兩者都是由「audit」路由鍵觸發。在這種情況下,交換機會復制一份消息並且把它們分別發送到兩個隊列當中。交換機不過就是一個由綁定構成的路由表。交換機有多種類型。他們都是做路由的,但是它們接受不同類型的綁定。為什麼不創建一種交換機來處理所有類型的路由規則呢?因為每種規則用來做匹配分子的CPU開銷是不同的。例如,一個「topic」類型的交換機試圖將消息的路由鍵與類似「dogs.*」的模式進行匹配。匹配這種末端的通配符比直接將路由鍵與「dogs」比較(「direct」類型的交換機)要消耗的CPU。如果你不需要「topic」類型的交換機帶來的靈活性,你可以通過使用「direct」類型的交換機獲取更高的處理效率。
3. SpringBoot使用RabbitMQ看這幾篇就夠了(配置篇)!
上篇我們說到了消息隊列RabbitMQ的模式概念,那麼這里將會針對模式使用SpringBoot聯合RabbitMQ做一個案例,實現消息的生產和消費。
這一篇也是這個主題的最後一篇了,建議配合著看沖山。助於理解。
博主會將Demo工程放在Gitee上,有興簡判豎趣的可以拉下來自己試試。
Gitee地址: https://gitee.com/lemon_ant/os.git
新建SpringBoot項目
添加配置文件
添加pom文件
啟動類
[圖片上傳失敗...(image-3e7425-1591871192134)]
[圖片上傳失敗...(image-e9beeb-1591871192134)]
<font color=red>注意看時間,說明消息是輪詢分發的,一個消息只由一個消費者消費。</font>
<font color=red>注意看時間,交換機會將消息推送到所有綁定到它的隊列。</font>
[圖片上傳失敗...(image-1c2e63-1591871192134)]
[圖片上傳失敗...(image-c0c993-1591871192134)]
<font color=red>我這里測試傳的就是routing key,方便看。</font>
[圖片上傳失敗...(image-610dd9-1591871192134)]
[圖片上傳失敗...(image-d8749-1591871192134)]
[圖片上傳失敗...(image-25a6c2-1591871192134)]
<font color=red>這里用時間來區別。</font>
消息隊列在攔大這里基本就結束了,結合前面兩篇基本就能夠了解隊列的基本概念和用法了。
4. 消息隊列之RabbitMQ-分布式部署
RabbitMQ分布式部署有3種方式:
Federation與Shovel都是以插件的形式來實現,復雜性相對高,而集群是RabbitMQ的自帶屬性,相對簡單。
這三種方式並不是互斥的,可以根據需求選擇相互組合來達到目的。
RabbitMQ本身是基於Erlang編寫,Erlang語言天生具備分布式特性(通過同步Erlang集群各節點的magic cookie來實現)。
因此,RabbitMQ天然支持Clustering。這使得RabbitMQ本身不需要像ActiveMQ、Kafka那樣通過ZooKeeper分別來實現HA方案和保存集群的元數據。集群是保證可靠性的一種方式,同時可以通過水平擴展以達到增加消息吞吐量能力的目的。
我們把部署RabbitMQ的機器稱為節點,也就是broker。broker有2種類型節點: 磁碟節點 和 內存節點 。顧名思義,磁碟節點的broker把元數據存儲在磁碟中,內存節點把元數據存儲在內存中,很明顯,磁碟節點的broker在重啟後元數據可以通過讀取磁碟進行重建,保證了元數據不丟失,內存節點的broker可以獲得更高的性能,但在重啟後元數據就都丟了。
元數據包含以下內容:
單節點系統必須是磁碟節點 ,否則每次你重啟RabbitMQ之後所有的系統配置信息都會丟失。
集群中至少有一個磁碟節點 ,當節點加入和離開集群時,必須通知磁碟 節點。
如果集群中的唯一一個磁碟節點,結果這個磁碟節點還崩潰了,那會發生什麼情況?集群依然可以繼續路由消息(因為其他節點元數據在還存在),但無法做以下操作:
也就是說,如果唯一磁碟的磁碟節點崩潰, 集群是可以保持運行的,但不能更改任何東西 。為了增加可靠性,一般會在集群中設置兩個磁碟節點,只要任何一個處於工作狀態,就可以保障集群的正常服務。
RabbitMQ的集群模式分為兩種: 普通模式 與 鏡像模式 。
普通模式,也是默認的集群模式。
對於Queue來說, 消息實體只存在於其中一個節點 ,A、B兩個節點僅有相同的元數據,即隊列結構。當消息進入A節點的Queue中後,consumer從B節點拉取時,RabbitMQ會臨時在A、B間進行消息傳輸,把A中的消息實體取出並經過B發送給consumer。所以consumer應盡量連接每一個節點,從中取消息。即對於同一個邏輯隊列,要在多個節點建立物理Queue。否則無論consumer連A或B,出口總在A,會產生瓶頸。
隊列所在的節點稱為 宿主節點 。
隊列創建時,只會在宿主節點創建隊列的進程,宿主節點包含完整的隊列信息,包括元數據、狀態、內容等等。因此, 只有隊列的宿主節點才能知道隊列的所有信息 。
隊列創建後,集群只會同步隊列和交換器的元數據到集群中的其他節點,並不會同步隊列本身,因此 非宿主節點就只知道隊列的元數據和指向該隊列宿主節點的指針 。
假如現在一個客戶端需要對Queue A進行發布或者訂閱,發起與集群的連接,有兩種可能的場景:
由於節點之間存在路由轉發的情況,對延遲非常敏感,應當只在本地區域網內使用,在廣域網中不應該使用集群,而應該用Federation或者Shovel代替。
這樣的設計,保證了不論從哪個broker中均可以消費所有隊列的數據,並分擔了負載,因此,增加broker可以線性提高服務的性能和吞吐量。
但該方案也有顯著的缺陷,那就是 不能保證消息不會丟失 。當集群中某一節點崩潰時,崩潰節點所在的隊列進程和關聯的綁定都會消失,附加在那些隊列上的消費者也會丟失其訂閱信息,匹配該隊列的新消息也會丟失。比如A為宿主節點,當A節點故障後,B節點無法取到A節點中還未消費的消息實體。如果做了消息持久化,那麼得等A節點恢復,然後才可被消費;如果沒有持久化的話,然後就沒有然後了……
肯定有不少同學會問,想要實現HA方案,那將RabbitMQ集群中的所有Queue的完整數據在所有節點上都保存一份不就可以了嗎?比如類似MySQL的主主模式,任何一個節點出現故障或者宕機不可用時,那麼使用者的客戶端只要能連接至其他節點,不就能夠照常完成消息的發布和訂閱嗎?
RabbitMQ這么設計是基於性能和存儲空間上來考慮:
引入 鏡像隊列 (Mirror Queue)的機制,可以將隊列鏡像到集群中的其他Broker節點之上,如果集群中的一個節點失效了,隊列能夠自動切換到鏡像中的另一個節點上以保證服務的可用性。
一個鏡像隊列中包含有1個主節點master和若干個從節點slave。其主從節點包含如下幾個特點:
該模式和普通模式不同之處在於,消息實體會主動在鏡像節點間同步,而不是在consumer取數據時臨時拉取。該模式帶來的副作用也很明顯,除了降低系統性能外,如果鏡像隊列數量過多,加之大量的消息進入,集群內部的網路帶寬將會被這種同步通訊大大消耗掉。所以在對可靠性要求較高的場合中適用。
一個隊列想做成鏡像隊列,需要先設置policy,然後客戶端創建隊列的時候,rabbitmq集群根據隊列名稱自動設置為普通隊列還是鏡像隊列。
鏡像隊列的配置通過添加policy完成,policy添加的命令為:
例如,對隊列名稱以hello開頭的所有隊列進行鏡像,並在集群的兩個節點上完成鏡像,policy的設置命令為:
rabbitmqctl set_policy hello-ha "^hello" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
通常隊列由兩部分組成:一部分是AMQQueue,負責AMQP協議相關的消息處理,即接收生產者發布的消息、向消費者投遞消息、處理消息confirm、acknowledge等等;另一部分是BackingQueue,它提供了相關的介面供AMQQueue調用,完成消息的存儲以及可能的持久化工作等。
鏡像隊列基本上就是一個特殊的BackingQueue,它內部包裹了一個普通的BackingQueue做本地消息持久化處理,在此基礎上增加了將消息和ack復制到所有鏡像的功能。所有對mirror_queue_master的操作,會通過組播GM(下面會講到)的方式同步到各slave節點。GM負責消息的廣播,mirror_queue_slave負責回調處理,而master上的回調處理是由coordinator負責完成。mirror_queue_slave中包含了普通的BackingQueue進行消息的存儲,master節點中BackingQueue包含在mirror_queue_master中由AMQQueue進行調用。
消息的發布(除了Basic.Publish之外)與消費都是通過master節點完成。master節點對消息進行處理的同時將消息的處理動作通過GM廣播給所有的slave節點,slave節點的GM收到消息後,通過回調交由mirror_queue_slave進行實際的處理。
GM(Guarenteed Multicast) 是一種可靠的組播通訊協議,該協議能夠保證組播消息的原子性,即保證組中活著的節點要麼都收到消息要麼都收不到。它的實現大致如下:
將所有的節點形成一個循環鏈表,每個節點都會監控位於自己左右兩邊的節點,當有節點新增時,相鄰的節點保證當前廣播的消息會復制到新的節點上;當有節點失效時,相鄰的節點會接管保證本次廣播的消息會復制到所有的節點。在master節點和slave節點上的這些gm形成一個group,group(gm_group)的信息會記錄在mnesia中。不同的鏡像隊列形成不同的group。消息從master節點對於的gm發出後,順著鏈表依次傳送到所有的節點,由於所有節點組成一個循環鏈表,master節點對應的gm最終會收到自己發送的消息,這個時候master節點就知道消息已經復制到所有的slave節點了。
slave節點先從gm_group中獲取對應group的所有成員信息,然後隨機選擇一個節點並向這個節點發送請求,這個節點收到請求後,更新gm_group對應的信息,同時通知左右節點更新鄰居信息(調整對左右節點的監控)及當前正在廣播的消息,然後回復通知請求節點成功加入group。請求加入group的節點收到回復後再更新rabbit_queue中的相關信息,並根據需要進行消息的同步。
當slave節點失效時,僅僅是相鄰節點感知,然後重新調整鄰居節點信息、更新rabbit_queue、gm_group的記錄等。如果是master節點失效,"資格最老"的slave節點被提升為master節點,slave節點會創建出新的coordinator,並告知gm修改回調處理為coordinator,原來的mirror_queue_slave充當amqqueue_process處理生產者發布的消息,向消費者投遞消息等。
上面提到如果是slave節點失效,只有相鄰的節點能感知到,那麼master節點失效是不是也是只有相鄰的節點能感知到?假如是這樣的話,如果相鄰的節點不是"資格最老"的節點,怎麼通知"資格最老"的節點提升為新的master節點呢?
實際上,所有的slave節點在加入group時,mirror_queue_slave進程會對master節點的amqqueue_process進程(也可能是mirror_queue_slave進程)進行監控,如果master節點失效的話,mirror_queue_slave會感知,然後再通過gm進行廣播,這樣所有的節點最終都會知道master節點失效。當然,只有"資格最老"的節點會提升自己為新的master。
消息從master節點發出,順著節點鏈表發送。在這期間,所有的slave節點都會對消息進行緩存,當master節點收到自己發送的消息後,會再次廣播ack消息,同樣ack消息會順著節點鏈表經過所有的slave節點,其作用是通知slave節點可以清除緩存的消息,當ack消息回到master節點時對應廣播消息的生命周期結束。
下圖為一個簡單的示意圖,A節點為master節點,廣播一條內容為"test"的消息。"1"表示消息為廣播的第一條消息;"id=A"表示消息的發送者為節點A。右邊是slave節點記錄的狀態信息。
為什麼所有的節點都需要緩存一份發布的消息呢?
master發布的消息是依次經過所有slave節點,在這期間的任何時刻,有可能有節點失效,那麼相鄰的節點可能需要重新發送給新的節點。例如,A->B->C->D->A形成的循環鏈表,A為master節點,廣播消息發送給節點B,B再發送給C,如果節點C收到B發送的消息還未發送給D時異常結束了,那麼節點B感知後節點C失效後需要重新將消息發送給D。同樣,如果B節點將消息發送給C後,B,C節點中新增了E節點,那麼B節點需要再將消息發送給新增的E節點。
配置鏡像隊列的時候有個 ha-sync-mode 屬性,這個有什麼用呢?
新節點加入到group後,最多能從左邊節點獲取到當前正在廣播的消息內容,加入group之前已經廣播的消息則無法獲取到。如果此時master節點不幸失效,而新節點有恰好成為了新的master,那麼加入group之前已經廣播的消息則會全部丟失。
注意:這里的消息具體是指新節點加入前已經發布並復制到所有slave節點的消息,並且這些消息還未被消費者消費或者未被消費者確認。如果新節點加入前,所有廣播的消息被消費者消費並確認了,master節點刪除消息的同時會通知slave節點完成相應動作。這種情況等同於新節點加入前沒有發布任何消息。
避免這種問題的解決辦法就是對新的slave節點進行消息同步。當 ha-sync-mode 配置為自動同步(automatic)時,新節點加入group時會自動進行消息的同步;如果配置為manually則需要手動操作完成同步。
Federation直譯過來是聯邦,它的設計目標是使 RabbitMQ 在不同的 Broker 節點之間進行消息傳遞而無須建
立集群。具有以下特點:
那麼它到底有什麼用呢?我們可以從一個實際場景入手:
有兩個服務分別部署在國內和海外,它們之間需要通過消息隊列來通訊。
很明顯無論RabbitMQ部署在海外還是國內,另一方一定得忍受連接上的延遲。因此我們可以在海外和國內各部署一個MQ,這樣一來海外連接海外的MQ,國內連接國內,就不會有連接上的延遲了。
但這樣還會有問題,假設某生產者將消息存入海外MQ中的某個隊列 queueB , 在國內的服務想要消費 queueB 消息,消息的流轉及確認必然要忍受較大的網路延遲 ,內部編碼邏輯也會因這一因素變得更加復雜。
此外,服務可能得維護兩個MQ的配置,比如國內服務在生產消息時得使用國內MQ,消費消息時得監聽海外MQ的隊列,降低了系統的維護性。
可能有人想到可以用集群,但是RabbitMQ的集群對延遲非常敏感,一般部署在區域網內,如果部署在廣域網可能會產生網路分區等等問題。
這時候,Federation就派上用場了。它被設計成能夠容忍不穩定的網路連接情況,完全能夠滿足這樣的場景。
那使用Federation之後是怎樣的業務流程呢?
首先我們在海外MQ上定義exchangeA,它通過路由鍵「rkA」綁定著queueA。然後用Federation在exchangeA上建立一條 單向 連接到國內RabbitMQ,Federation則自動會在國內RabbitMQ建立一個exchangeA交換器(默認同名)。
這時候,如果部署在國內的client C在國內MQ上publish了一條消息,這條消息會通過 Federation link 轉發到海外MQ的交換器exchangeA中,最終消息會存入與 exchangeA 綁定的隊列 queueA 中,而client C也能立即得到返回。
實際上,Federation插件還會在國內MQ建立一個內部的交換器:exchangeA→ broker3 B(broker3是集群名),並通過路由鍵 "rkA"將它和國內MQ的exchangeA綁定起來。接下來還會在國內MQ上建立一個內部隊列federation: exchangeA->broker3 B,並與內部exchange綁定。這些操作都是內部的,對客戶端來說是透明的。
值得一提的是,Federation的連接是單向的,如果是在海外MQ的exchangeA上發送消息是不會轉到國內的。
這種在exchange上建立連接進行聯邦的,就叫做 聯邦交換器 。一個聯邦交換器接收上游(upstream)的信息,這里的上游指的是其他的MQ節點。
對比前面舉的例子,國內MQ就是上游,聯邦交換器能夠將原本發送給上游交換器的消息路由到本地的某個隊列中。
有聯邦交換器自然也有聯播隊列,聯邦隊列則允許一個本地消費者接收到來自上游隊列的消息 。
如圖,海外MQ有隊列A,給其設置一條鏈接,Federation則自動會在國內RabbitMQ建立一個隊列A(默認同名)。
當有消費者 ClinetA連接海外MQ並消費 queueA 中的消息時,如果隊列 queueA中本身有若干消息堆積,那麼 ClientA直接消費這些消息,此時海外MQ中的queueA並不會拉取國內中的 queueA 的消息;如果隊列 queueA中沒有消息堆積或者消息被消費完了,那麼它會通過 Federation link 拉取上游隊列 queueA 中的消息(如果有消息),然後存儲到本地,之後再被消費者 ClientA進行消費 。
首先開啟Federation 功能:
值得注意的是,當需要在集群中使用 Federation 功能的時候,集群中所有的節點都應該開啟 Federation 插件。
接下來我們要配置兩個東西:upstreams和Policies。
每個 upstream 用於定義與其他 Broker 建立連接的信息。
通用參數如下:
然後定義一個 Policy, 用於匹配交換器:
^exchange 意思是將匹配所有以exchange名字開頭的交換器,為它們在上游創建連接。這樣就創建了一個 Federation link。
Shovel是RabbitMQ的一個插件, 能夠可靠、持續地從一個Broker 中的隊列(作為源端,即source )拉取數據並轉發至另一個Broker 中的交換器(作為目的端,即destination )。作為源端的隊列和作為目的端的交換器可以同時位於同一個 Broker 上,也可以位於不同的 Broker 上。
使用Shovel有以下優勢:
使用Shovel時,通常源為隊列,目的為交換器:
但是,也可以源為隊列,目的為隊列。實際也是由交換器轉發,只不過這個交換器是默認交換器。配置交換器做為源也是可行的。實際上會在源端自動新建一個隊列,消息先存在這個隊列,再被Shovel移走。
使用Shovel插件命令:
Shovel 既可以部署在源端,也可以部署在目的端。有兩種方式可以部署 Shovel:
其主要差異如下:
來看一個使用Shovel治理消息堆積的案例。
當某個隊列中的消息堆積嚴重時,比如超過某個設定的閾值,就可以通過 Shovel 將隊列中的消息移交給另一個集群。