当前位置:首页 » 数据仓库 » ibmq队列如何配置
扩展阅读
webinf下怎么引入js 2023-08-31 21:54:13
堡垒机怎么打开web 2023-08-31 21:54:11

ibmq队列如何配置

发布时间: 2023-05-20 14:08:09

1. 关于ActiveMQ的配备怎么解决

关于ActiveMQ的配置
目前常用的消息队列组建无非就是MSMQ和ActiveMQ,至于他们的异同,这里不想做过多的比较。简单来说,MSMQ内置于微软操作系统之中,在部署上包含一个隐性条件:Server需要是微软操作系统。(对于这点我并去调研过MSMQ是否可以部署在非微软系统,比如:Linux,只是拍脑袋想了想,感觉上是不可以)。对于ActiveMQ,微软系统和Linux都是可以部署的。从功能方面来说,一般最常用的就是:消息的收/发,感觉差异不大。从性能上来说,一般的说法是ActiveMQ略高。在稳定性上,个人感觉MSMQ更好。如果这两种常用队列都用过的同学,应该来说最大的差异在于:MSMQ如果要访问远程队列(比如机器A上的程序访问机器B上的队列),会比较恶心。在数据量比较大的情况之下,一般来说队列服务器会专门的一台或者多台(多台的话,用程序去做热备+负载比较方便,也不需要额外的硬件成本。简单来说做法可以这样:消息发送的时候随机向着多台队列服务器上发送消息;接受的时候开多个线程去分别监听;热备方面,可以维护一个带状态的队列连接池,如果消息收发失败那么将状态置为不可用,然后起一个线程去定时监测坏的连接是否可用,这个过程一般情况下可以不用加锁,为什么,大家根据各自需要去取舍吧)。最近搞完了短彩信的网关连接服务,这两种队列我均使用了。大致的过程是这样的:上层应用如果要发端彩信,那么将消息直接发送至ActiveMQ(目前用的就是上面说的多台热备+负载,因为实际中下行量非常大5千万条/天以上),然后端彩信网关连接服务部署多套,每套均依赖本机的MSMQ。为什么呢?用ActiveMQ的原因是:上层应用程序和网关连接服务彼此独立,消息需要跨机访问。用MSMQ的原因是:ActiveMQ中的数据是一条不分省的大队列,网关连接服务需要按省流控,所以端彩信网关连接服务:首先把消息从ActiveMQ取出来,然后存至本机上的分省MSMQ,这样做另外的一个好处就是:ActiveMQ不至于过多挤压,他的数据会分摊到N台短彩信网关连接服务所部署的机器上的MSMQ之中,也就说MSMQ可以起到分摊数据和缓冲的作用。
在之前的随笔中,已经介绍过MSMQ,现在先介绍一下ActiveMQ一些配置,目前好像ActiveMQ配置上的介绍还比较少。以下是自己总结一些相关资料,贴出来给大家共享
一)问题分析和解决
1)KahaDb和AMQ Message Store两种持久方式如何选择?
官方:
From 5.3 onwards - we recommend you use KahaDB - which offers improved scalability and recoverability over the AMQ Message Store.
The AMQ Message Store which although faster than KahaDB - does not scales as well as KahaDB and recovery times take longer.
非官方:
kaha文件系统实际上上是一个文件索引系统,有两部分组成,一个是数据文件系统,由一个个独立的文件组成,缺省文件大小是32M大(可配置),另外一个是索引文件系统,记录消息在数据文件中的位置信息以及数据文件中的空闲块信息。数据文件是存储硬盘上的,索引文件是缓存在内存中的。所以这个存储系统对大消息存储有利,象我们的memberId之类的文本消息,实际上是浪费,索引比消息还大,哈。
我方分析:
推荐: Amq持久方式
理由:虽然官方推荐使用KahaDB持久方式,但其提到的优势:可伸缩性和恢复性较好,对于我们实际的应用意义不大。从我们自己的使用经验来看,KahaDB持久方式,Data文件是一个大文件(感觉文件过大后,造成队列服务瘫死的可能性会增大),从官网的相关配置(附录1)也找不到哪里可以设置数据的文件的最大Size。)而Amq持久方式可以设置Data文件最大Size,这样可以保证即时消息积压很多,Data文件也不至于过大。
2)错误:Channel was inactive for too long
解决方法:
在建立连接的Uri中加入: wireFormat.maxInactivityDuration=0
参考资源:
http://jinguo.iteye.com/blog/243153
You can do the following to fix the issues:
1) Append max inactivity ration to your Uri in the format below: wireFormat.maxInactivityDuration=0
2) Use the same Uri at the client side as well as at the server side
Regards,
如果不这样设置,对应的错误会出现:
2008-05-07 09:22:56,343 [org.apache.activemq.ActiveMQConnection]-[WARN] Async exception with no exception listener: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: localhost/127.0.0.1:61616
org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: localhost/127.0.0.1:61616
ActiveMQ的tcp url:tcp://localhost:61616后面要加入?wireFormat.maxInactivityDuration=0 这样的参数,否则当一段时间没有消息发送时会抛出 "Channel was inactive for too long"异常
3)错误:Wire format negotiation timeout: peer did not send his wire format.
解决方法:
1)关闭ActiveMqLog4j
打开:conf/log4j.properties
将:log4j.rootLogger=INFO, console, logfile
修改为:log4j.rootLogger=OFF
2)在建立连接的Uri中加入: =30000
例如北京的测试环境连接Uri:
tcp://192.168.199.80:61616?wireFormat.maxInactivityDuration=0&=30000&connection.AsyncSend=true
参考资源:
http://activemq.apache.org/javaxjmsjmsexception-wire-format-negociation-timeout-peer-did-not-send-his-wire-format.html
If you get exception like this,it can mean one of three things:
1. You're connecting to the port not used by ActiveMQ TCP transport
Make sure to check that you're connecting to the appropriate host:port
2. You're using log4j JMS appender and doesn't filter out ActiveMQ log messages
Be sure to read How do I use log4j JMS appender with ActiveMQ and more importantly to never send ActiveMQ log messages to JMS appender
3. Your broker is probably under heavy load (or network connection is unreliable), so connection setup cannot be completed in a reasonable time
If you experience sporadic exceptions like this, the best solution is to use failover transport, so that your clients can try connecting again if the first attempt fails. If you're getting these kind of exceptions more frequently you can also try extending wire format negotiation period (default 10 sec). You can do that by using wireFormat. property on the connection URL in your client.
For example
tcp://localhost:61616?wireFormat.=30000
will use 30 sec timeout.(貌似有问题!!!)
4)错误:Out of memory
解决方法:
1) 设置Java最大内存限制为合适大小:
Bin/activemq.bat 中ACTIVEMQ_OPTS=-Xmx512M(默认是512)
2)Activemq.xml配置节:systemUsage/ systemUsage配置大小合适,并且特别注意:大于所有rable desitination设置的memoryUsage之和。
备注:
1)尖括号:“>”代表通配符
2)ACTIVEMQ_OPTS的配置〉=memoryUsage中配置〉=所有rable desitination设置之和
3)SystemUsage配置设置了一些系统内存和硬盘容量,当系统消耗超过这些容量设置时,amq会“slow down procer”,还是很重要的。
参考资料:
http://m.oschina.net/blog/26216
参考-- http://activemq.apache.org/javalangoutofmemory.html
对于MQ的内容实用是可管理和可配置的。首先需要判断的是MQ的哪部分系统因内存不足而导致泄漏,是JVM,broker还是消费者、生产者?
一、内存管理
JVM内存管理:
1. 用bin/activemq命令在独立JVM中运行broker。用-Xmx和-Xss命令即可(activemq.bat文件中修改ACTIVEMQ_OPTS选项参数即可);
2. 默认情况下,MQ用512M的JVM;
broker内存管理:
1. broker使用的内存并不是由JVM的内存决定的。虽然受到JVM的限制,但broker确实独立管理器内存;
2. systemUsage和destination的内存限制与broker内存息息相关;
3. MQ中内存的关系是:JVM->Broker->broker features;
4. 所有destination的内存总量不能超过broker的总内存;
消费者:
1. 由于消息大小可以配置,prefetch limit往往是导致内存溢出的主要原因;
2. 减少prefetch limit的大小,会减少消费者内存中存储的消息数量;
生产者:
1. 除非消息数量超过了broker资源的限制,否则生产者不会导致内存溢出;
2. 当内存溢出后,生产者会收到broker的阻塞信息提示;
二、其他
将消息缓冲之硬盘:
1. 只有当消息在内存中存储时,才允许消息的快速匹配与分发,而当消费者很慢或者离开时,内存可能会耗尽;
2. 当destination到达它的内存临界值时,broker会用消息游标来缓存非持久化的消息到硬盘。
3. 临界值在broker中通过memoryUsage和systemUsage两个属性配置,请参考activemq.xml;
4. 对于缓慢的消费者,当尚未耗尽内存或者转变为生产者并发控制模式前,这个特性允许生产者继续发送消息到broker;
5. 当有多个destination的时候,默认的内存临界值可能被打破,而这种情况将消息缓存到硬盘就显得很有意义;
6. precentUsage配置:使用百分比来控制内存使用情况;
多个线程:
1. 默认情况下,MQ每个destination都对应唯一的线程;
2. -Dorg.apache.activema.UseDedicatedTaskRunner=false(activemq.bat文件中修改ACTIVEMQ_OPTS选项参数即可),用线程池来限制线程的数量,从而减少内存消耗;
大数据传输:
1. destination policies--maxPageSize:控制进入内存中的消息数量;lazyDispatch:增加控制使用当前消费者列表的预取值;
2. 使用blogMessage或者streamsMessage类型来进行大量文件的传输;
泄漏JMS资源:
1. 当session或者procer或者consumer大量存在而没有关闭的时候;
2. 使用PooledConnectionFactory;

2. rabbitmq 怎么设置集群发送 ip

AMQP(高级消息队列协议)是一个异步消息传递所使用的应用层协议规范,作为线路层协议,而不是API(例如JMS),AMQP客户端能够无视消息的来源任意发送和接受信息。AMQP的原始用途只是为金融界提供一个可以彼此协作的消息协议,而现在的目标则是为通用消息队列架构提供通用构建工具。因此,面向消息的中间件(MOM)系统,例如发布梁培/订阅队列,没有作为基本元素实现。反而通过发送简化的AMQ实体,用户被赋予了构建例如这些实体的能力。这些实体也是规范的一部分,形成了在线路层协议顶端的一个层级:AMQP模型。这个模型统一了消息模式,诸如之前提到的发布/订阅,队列,事务以及流数据,并且添加了额外的特性,例如更易于扩展,基于内容的路由。AMQP当中有四个概念非常重要virtualhost,虚拟主机exchange,交换机queue,队列binding,绑定一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?因为RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机/。何谓虚拟主机(virtualhost),交换机(exchange),队列(queue)和绑定(binding)队列(Queues)是你的消息(messages)的终点,可以理解成装消息的容器。消息就一直在里面,直到有客户端(也就是消费者,Consumer)连接到这个队帆纯列并且将其取走为止。不过,也可以将一个队列配置成这样的:一旦消息进入这个队列,此消息就被删除。队列是由消费者(Consumer)通过程序建立的,不是通过配置文件或者命令行工具。这没什么问题,如果一个消费者试图创建一个已经存在的队列,RabbitMQ会直接忽略这个请求。因此我们可以将消息队列的配置写在应用程序的代码里面。而要把一个消息放进队列前,需要有一个交换机(Exchange)。交换机(Exchange)可以理解成具有路由表的路由程序。每个消息都有一个称为路由键(routingkey)的属性,就是一个简单的字符串。交换机当中有一系列的绑定(binding),即路由规则(routes)。(例如,指明具有路由键“X”的消息要到名为timbuku的队列当中去。)消费者程序(Consumer)要负责创建你的交换机。交换机可以存在多态渣咐个,每个交换机在自己独立的进程当中执行,因此增加多个交换机就是增加多个进程,可以充分利用服务器上的CPU核以便达到更高的效率。例如,在一个8核的服务器上,可以创建5个交换机来用5个核,另外3个核留下来做消息处理。类似的,在RabbitMQ的集群当中,你可以用类似的思路来扩展交换机一边获取更高的吞吐量。交换机如何判断要把消息送到哪个队列?你需要路由规则,即绑定(binding)。一个绑定就是一个类似这样的规则:将交换机“desert(沙漠)”当中具有路由键“阿里巴巴”的消息送到队列“hideout(山洞)”里面去。换句话说,一个绑定就是一个基于路由键将交换机和队列连接起来的路由规则。例如,具有路由键“audit”的消息需要被送到两个队列,“log-forever”和“alert-the-big-de”。要做到这个,就需要创建两个绑定,每个都连接一个交换机和一个队列,两者都是由“audit”路由键触发。在这种情况下,交换机会复制一份消息并且把它们分别发送到两个队列当中。交换机不过就是一个由绑定构成的路由表。交换机有多种类型。他们都是做路由的,但是它们接受不同类型的绑定。为什么不创建一种交换机来处理所有类型的路由规则呢?因为每种规则用来做匹配分子的CPU开销是不同的。例如,一个“topic”类型的交换机试图将消息的路由键与类似“dogs.*”的模式进行匹配。匹配这种末端的通配符比直接将路由键与“dogs”比较(“direct”类型的交换机)要消耗的CPU。如果你不需要“topic”类型的交换机带来的灵活性,你可以通过使用“direct”类型的交换机获取更高的处理效率。

3. SpringBoot使用RabbitMQ看这几篇就够了(配置篇)!

上篇我们说到了消息队列RabbitMQ的模式概念,那么这里将会针对模式使用SpringBoot联合RabbitMQ做一个案例,实现消息的生产和消费。

这一篇也是这个主题的最后一篇了,建议配合着看冲山。助于理解。

博主会将Demo工程放在Gitee上,有兴简判竖趣的可以拉下来自己试试。

Gitee地址: https://gitee.com/lemon_ant/os.git

新建SpringBoot项目

添加配置文件

添加pom文件

启动类

[图片上传失败...(image-3e7425-1591871192134)]

[图片上传失败...(image-e9beeb-1591871192134)]

<font color=red>注意看时间,说明消息是轮询分发的,一个消息只由一个消费者消费。</font>

<font color=red>注意看时间,交换机会将消息推送到所有绑定到它的队列。</font>

[图片上传失败...(image-1c2e63-1591871192134)]

[图片上传失败...(image-c0c993-1591871192134)]

<font color=red>我这里测试传的就是routing key,方便看。</font>

[图片上传失败...(image-610dd9-1591871192134)]

[图片上传失败...(image-d8749-1591871192134)]

[图片上传失败...(image-25a6c2-1591871192134)]

<font color=red>这里用时间来区别。</font>

消息队列在拦大这里基本就结束了,结合前面两篇基本就能够了解队列的基本概念和用法了。

4. 消息队列之RabbitMQ-分布式部署

RabbitMQ分布式部署有3种方式:

Federation与Shovel都是以插件的形式来实现,复杂性相对高,而集群是RabbitMQ的自带属性,相对简单。

这三种方式并不是互斥的,可以根据需求选择相互组合来达到目的。

RabbitMQ本身是基于Erlang编写,Erlang语言天生具备分布式特性(通过同步Erlang集群各节点的magic cookie来实现)。

因此,RabbitMQ天然支持Clustering。这使得RabbitMQ本身不需要像ActiveMQ、Kafka那样通过ZooKeeper分别来实现HA方案和保存集群的元数据。集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐量能力的目的。

我们把部署RabbitMQ的机器称为节点,也就是broker。broker有2种类型节点: 磁盘节点 内存节点 。顾名思义,磁盘节点的broker把元数据存储在磁盘中,内存节点把元数据存储在内存中,很明显,磁盘节点的broker在重启后元数据可以通过读取磁盘进行重建,保证了元数据不丢失,内存节点的broker可以获得更高的性能,但在重启后元数据就都丢了。

元数据包含以下内容:

单节点系统必须是磁盘节点 ,否则每次你重启RabbitMQ之后所有的系统配置信息都会丢失。

集群中至少有一个磁盘节点 ,当节点加入和离开集群时,必须通知磁盘 节点。

如果集群中的唯一一个磁盘节点,结果这个磁盘节点还崩溃了,那会发生什么情况?集群依然可以继续路由消息(因为其他节点元数据在还存在),但无法做以下操作:

也就是说,如果唯一磁盘的磁盘节点崩溃, 集群是可以保持运行的,但不能更改任何东西 。为了增加可靠性,一般会在集群中设置两个磁盘节点,只要任何一个处于工作状态,就可以保障集群的正常服务。

RabbitMQ的集群模式分为两种: 普通模式 镜像模式

普通模式,也是默认的集群模式。

对于Queue来说, 消息实体只存在于其中一个节点 ,A、B两个节点仅有相同的元数据,即队列结构。当消息进入A节点的Queue中后,consumer从B节点拉取时,RabbitMQ会临时在A、B间进行消息传输,把A中的消息实体取出并经过B发送给consumer。所以consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连A或B,出口总在A,会产生瓶颈。

队列所在的节点称为 宿主节点

队列创建时,只会在宿主节点创建队列的进程,宿主节点包含完整的队列信息,包括元数据、状态、内容等等。因此, 只有队列的宿主节点才能知道队列的所有信息

队列创建后,集群只会同步队列和交换器的元数据到集群中的其他节点,并不会同步队列本身,因此 非宿主节点就只知道队列的元数据和指向该队列宿主节点的指针

假如现在一个客户端需要对Queue A进行发布或者订阅,发起与集群的连接,有两种可能的场景:

由于节点之间存在路由转发的情况,对延迟非常敏感,应当只在本地局域网内使用,在广域网中不应该使用集群,而应该用Federation或者Shovel代替。

这样的设计,保证了不论从哪个broker中均可以消费所有队列的数据,并分担了负载,因此,增加broker可以线性提高服务的性能和吞吐量。

但该方案也有显着的缺陷,那就是 不能保证消息不会丢失 。当集群中某一节点崩溃时,崩溃节点所在的队列进程和关联的绑定都会消失,附加在那些队列上的消费者也会丢失其订阅信息,匹配该队列的新消息也会丢失。比如A为宿主节点,当A节点故障后,B节点无法取到A节点中还未消费的消息实体。如果做了消息持久化,那么得等A节点恢复,然后才可被消费;如果没有持久化的话,然后就没有然后了……

肯定有不少同学会问,想要实现HA方案,那将RabbitMQ集群中的所有Queue的完整数据在所有节点上都保存一份不就可以了吗?比如类似MySQL的主主模式,任何一个节点出现故障或者宕机不可用时,那么使用者的客户端只要能连接至其他节点,不就能够照常完成消息的发布和订阅吗?

RabbitMQ这么设计是基于性能和存储空间上来考虑:

引入 镜像队列 (Mirror Queue)的机制,可以将队列镜像到集群中的其他Broker节点之上,如果集群中的一个节点失效了,队列能够自动切换到镜像中的另一个节点上以保证服务的可用性。

一个镜像队列中包含有1个主节点master和若干个从节点slave。其主从节点包含如下几个特点:

该模式和普通模式不同之处在于,消息实体会主动在镜像节点间同步,而不是在consumer取数据时临时拉取。该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉。所以在对可靠性要求较高的场合中适用。

一个队列想做成镜像队列,需要先设置policy,然后客户端创建队列的时候,rabbitmq集群根据队列名称自动设置为普通队列还是镜像队列。

镜像队列的配置通过添加policy完成,policy添加的命令为:

例如,对队列名称以hello开头的所有队列进行镜像,并在集群的两个节点上完成镜像,policy的设置命令为:

rabbitmqctl set_policy hello-ha "^hello" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

通常队列由两部分组成:一部分是AMQQueue,负责AMQP协议相关的消息处理,即接收生产者发布的消息、向消费者投递消息、处理消息confirm、acknowledge等等;另一部分是BackingQueue,它提供了相关的接口供AMQQueue调用,完成消息的存储以及可能的持久化工作等。

镜像队列基本上就是一个特殊的BackingQueue,它内部包裹了一个普通的BackingQueue做本地消息持久化处理,在此基础上增加了将消息和ack复制到所有镜像的功能。所有对mirror_queue_master的操作,会通过组播GM(下面会讲到)的方式同步到各slave节点。GM负责消息的广播,mirror_queue_slave负责回调处理,而master上的回调处理是由coordinator负责完成。mirror_queue_slave中包含了普通的BackingQueue进行消息的存储,master节点中BackingQueue包含在mirror_queue_master中由AMQQueue进行调用。

消息的发布(除了Basic.Publish之外)与消费都是通过master节点完成。master节点对消息进行处理的同时将消息的处理动作通过GM广播给所有的slave节点,slave节点的GM收到消息后,通过回调交由mirror_queue_slave进行实际的处理。

GM(Guarenteed Multicast) 是一种可靠的组播通讯协议,该协议能够保证组播消息的原子性,即保证组中活着的节点要么都收到消息要么都收不到。它的实现大致如下:

将所有的节点形成一个循环链表,每个节点都会监控位于自己左右两边的节点,当有节点新增时,相邻的节点保证当前广播的消息会复制到新的节点上;当有节点失效时,相邻的节点会接管保证本次广播的消息会复制到所有的节点。在master节点和slave节点上的这些gm形成一个group,group(gm_group)的信息会记录在mnesia中。不同的镜像队列形成不同的group。消息从master节点对于的gm发出后,顺着链表依次传送到所有的节点,由于所有节点组成一个循环链表,master节点对应的gm最终会收到自己发送的消息,这个时候master节点就知道消息已经复制到所有的slave节点了。

slave节点先从gm_group中获取对应group的所有成员信息,然后随机选择一个节点并向这个节点发送请求,这个节点收到请求后,更新gm_group对应的信息,同时通知左右节点更新邻居信息(调整对左右节点的监控)及当前正在广播的消息,然后回复通知请求节点成功加入group。请求加入group的节点收到回复后再更新rabbit_queue中的相关信息,并根据需要进行消息的同步。

当slave节点失效时,仅仅是相邻节点感知,然后重新调整邻居节点信息、更新rabbit_queue、gm_group的记录等。如果是master节点失效,"资格最老"的slave节点被提升为master节点,slave节点会创建出新的coordinator,并告知gm修改回调处理为coordinator,原来的mirror_queue_slave充当amqqueue_process处理生产者发布的消息,向消费者投递消息等。

上面提到如果是slave节点失效,只有相邻的节点能感知到,那么master节点失效是不是也是只有相邻的节点能感知到?假如是这样的话,如果相邻的节点不是"资格最老"的节点,怎么通知"资格最老"的节点提升为新的master节点呢?

实际上,所有的slave节点在加入group时,mirror_queue_slave进程会对master节点的amqqueue_process进程(也可能是mirror_queue_slave进程)进行监控,如果master节点失效的话,mirror_queue_slave会感知,然后再通过gm进行广播,这样所有的节点最终都会知道master节点失效。当然,只有"资格最老"的节点会提升自己为新的master。

消息从master节点发出,顺着节点链表发送。在这期间,所有的slave节点都会对消息进行缓存,当master节点收到自己发送的消息后,会再次广播ack消息,同样ack消息会顺着节点链表经过所有的slave节点,其作用是通知slave节点可以清除缓存的消息,当ack消息回到master节点时对应广播消息的生命周期结束。

下图为一个简单的示意图,A节点为master节点,广播一条内容为"test"的消息。"1"表示消息为广播的第一条消息;"id=A"表示消息的发送者为节点A。右边是slave节点记录的状态信息。

为什么所有的节点都需要缓存一份发布的消息呢?

master发布的消息是依次经过所有slave节点,在这期间的任何时刻,有可能有节点失效,那么相邻的节点可能需要重新发送给新的节点。例如,A->B->C->D->A形成的循环链表,A为master节点,广播消息发送给节点B,B再发送给C,如果节点C收到B发送的消息还未发送给D时异常结束了,那么节点B感知后节点C失效后需要重新将消息发送给D。同样,如果B节点将消息发送给C后,B,C节点中新增了E节点,那么B节点需要再将消息发送给新增的E节点。

配置镜像队列的时候有个 ha-sync-mode 属性,这个有什么用呢?

新节点加入到group后,最多能从左边节点获取到当前正在广播的消息内容,加入group之前已经广播的消息则无法获取到。如果此时master节点不幸失效,而新节点有恰好成为了新的master,那么加入group之前已经广播的消息则会全部丢失。

注意:这里的消息具体是指新节点加入前已经发布并复制到所有slave节点的消息,并且这些消息还未被消费者消费或者未被消费者确认。如果新节点加入前,所有广播的消息被消费者消费并确认了,master节点删除消息的同时会通知slave节点完成相应动作。这种情况等同于新节点加入前没有发布任何消息。

避免这种问题的解决办法就是对新的slave节点进行消息同步。当 ha-sync-mode 配置为自动同步(automatic)时,新节点加入group时会自动进行消息的同步;如果配置为manually则需要手动操作完成同步。

Federation直译过来是联邦,它的设计目标是使 RabbitMQ 在不同的 Broker 节点之间进行消息传递而无须建
立集群。具有以下特点:

那么它到底有什么用呢?我们可以从一个实际场景入手:

有两个服务分别部署在国内和海外,它们之间需要通过消息队列来通讯。

很明显无论RabbitMQ部署在海外还是国内,另一方一定得忍受连接上的延迟。因此我们可以在海外和国内各部署一个MQ,这样一来海外连接海外的MQ,国内连接国内,就不会有连接上的延迟了。

但这样还会有问题,假设某生产者将消息存入海外MQ中的某个队列 queueB , 在国内的服务想要消费 queueB 消息,消息的流转及确认必然要忍受较大的网络延迟 ,内部编码逻辑也会因这一因素变得更加复杂。

此外,服务可能得维护两个MQ的配置,比如国内服务在生产消息时得使用国内MQ,消费消息时得监听海外MQ的队列,降低了系统的维护性。

可能有人想到可以用集群,但是RabbitMQ的集群对延迟非常敏感,一般部署在局域网内,如果部署在广域网可能会产生网络分区等等问题。

这时候,Federation就派上用场了。它被设计成能够容忍不稳定的网络连接情况,完全能够满足这样的场景。

那使用Federation之后是怎样的业务流程呢?

首先我们在海外MQ上定义exchangeA,它通过路由键“rkA”绑定着queueA。然后用Federation在exchangeA上建立一条 单向 连接到国内RabbitMQ,Federation则自动会在国内RabbitMQ建立一个exchangeA交换器(默认同名)。

这时候,如果部署在国内的client C在国内MQ上publish了一条消息,这条消息会通过 Federation link 转发到海外MQ的交换器exchangeA中,最终消息会存入与 exchangeA 绑定的队列 queueA 中,而client C也能立即得到返回。

实际上,Federation插件还会在国内MQ建立一个内部的交换器:exchangeA→ broker3 B(broker3是集群名),并通过路由键 "rkA"将它和国内MQ的exchangeA绑定起来。接下来还会在国内MQ上建立一个内部队列federation: exchangeA->broker3 B,并与内部exchange绑定。这些操作都是内部的,对客户端来说是透明的。

值得一提的是,Federation的连接是单向的,如果是在海外MQ的exchangeA上发送消息是不会转到国内的。

这种在exchange上建立连接进行联邦的,就叫做 联邦交换器 。一个联邦交换器接收上游(upstream)的信息,这里的上游指的是其他的MQ节点。

对比前面举的例子,国内MQ就是上游,联邦交换器能够将原本发送给上游交换器的消息路由到本地的某个队列中。

有联邦交换器自然也有联播队列,联邦队列则允许一个本地消费者接收到来自上游队列的消息 。

如图,海外MQ有队列A,给其设置一条链接,Federation则自动会在国内RabbitMQ建立一个队列A(默认同名)。

当有消费者 ClinetA连接海外MQ并消费 queueA 中的消息时,如果队列 queueA中本身有若干消息堆积,那么 ClientA直接消费这些消息,此时海外MQ中的queueA并不会拉取国内中的 queueA 的消息;如果队列 queueA中没有消息堆积或者消息被消费完了,那么它会通过 Federation link 拉取上游队列 queueA 中的消息(如果有消息),然后存储到本地,之后再被消费者 ClientA进行消费 。

首先开启Federation 功能:

值得注意的是,当需要在集群中使用 Federation 功能的时候,集群中所有的节点都应该开启 Federation 插件。

接下来我们要配置两个东西:upstreams和Policies。

每个 upstream 用于定义与其他 Broker 建立连接的信息。

通用参数如下:

然后定义一个 Policy, 用于匹配交换器:

^exchange 意思是将匹配所有以exchange名字开头的交换器,为它们在上游创建连接。这样就创建了一个 Federation link。

Shovel是RabbitMQ的一个插件, 能够可靠、持续地从一个Broker 中的队列(作为源端,即source )拉取数据并转发至另一个Broker 中的交换器(作为目的端,即destination )。作为源端的队列和作为目的端的交换器可以同时位于同一个 Broker 上,也可以位于不同的 Broker 上。

使用Shovel有以下优势:

使用Shovel时,通常源为队列,目的为交换器:

但是,也可以源为队列,目的为队列。实际也是由交换器转发,只不过这个交换器是默认交换器。配置交换器做为源也是可行的。实际上会在源端自动新建一个队列,消息先存在这个队列,再被Shovel移走。

使用Shovel插件命令:

Shovel 既可以部署在源端,也可以部署在目的端。有两种方式可以部署 Shovel:

其主要差异如下:

来看一个使用Shovel治理消息堆积的案例。

当某个队列中的消息堆积严重时,比如超过某个设定的阈值,就可以通过 Shovel 将队列中的消息移交给另一个集群。