当前位置:首页 » 文件传输 » 消息缓冲队列需要互斥访问吗
扩展阅读
webinf下怎么引入js 2023-08-31 21:54:13
堡垒机怎么打开web 2023-08-31 21:54:11

消息缓冲队列需要互斥访问吗

发布时间: 2023-07-07 14:15:12

⑴ linux 进程间通信的几种方式

第一种:管道通信
两个进程利用管道进行通信时,发送信息的进程称为写进程;接收信息的进程称为读进程。管道通信方式的中间介质就是文件,通常称这种文件为管道文件,它就像管道一样将一个写进程和一个读进程连接在一起,实现两个进程之间的通信。写进程通过写入端往管道文件中写入信息;读进程通过读出端从管道文件中读取信息。两个进程协调不断地进行写和读,便会构成双方通过管道传递信息的流水线。
第二种:消息缓冲通信
多个独立的进程之间可以通过消息缓冲机制来相互通信。这种通信的实现是以消息缓冲区为中间介质,通信双方的发送和接收操作均以消息为单位。在存储器中,消息缓冲区被组织成队列,通常称之为消息队列。消息队列一旦创建后即可由多进程共享,发送消息的进程可以在任意时刻发送任意个消息到指定的消息队列上,并检查是否有接收进程在等待它所发送的消息。若有则唤醒它,而接收消息的进程可以在需要消息的时候到指定的消息队列上获取消息,如果消息还没有到来,则转入睡眠等待状态。
第三种:共享内存通信
针对消息缓冲需要占用CPU进行消息复制的缺点,OS提供了一种进程间直接进行数据交换的通信方式。共享内存,顾名思义这种通信方式允许多个进程在外部通信协议或同步,互斥机制的支持下使用同一个内存段进行通信,它是一种最有效的数据通信方式,其特点是没有中间环节,直接将共享的内存页面通过附接映射到相互通信的进程各自的虚拟地址空间中,从而使多个进程可以直接访问同一个物理内存页面。

⑵ 什么是临界区和临界资源对临界区管理的基本原则是什么

1、临界区指的是一个访问共用资源(例如:共用设备或是共用存储器)的程序片段,而这些共用资源又无法同时被多个线程访问的特性。

当有线程进入临界区段时,其他线程或是进程必须等待,有一些同步的机制必须在临界区段的进入点与离开点实现,以确保这些共用资源是被互斥获得使用。只能被单一线程访问的设备,例如:打印机。

2、临界资源:多道程序系统中存在许多进程,它们共享各种资源,然而有很多资源一次只能供一个进程使用。一次仅允许一个进程使用的资源称为临界资源。许多物理设备都属于临界资源,如输入机、打印机、磁带机等。

3、进程进入临界区的调度原则是:

(1)如果有若干进程要求进入空闲的临界区,一次仅允许一个进程进入。

(2)进入临界区的进程要在有限时间内退出,以便其它进程能及时进入自己的临界区。

(3)任何时候,处于临界区内的进程不可多于一个。

(4)如果进程不能进入自己的临界区,则应让出CPU,避免进程出现“忙等”现象。

(2)消息缓冲队列需要互斥访问吗扩展阅读

1、临界区存在的问题

临界区的退出,不会检测是否是已经进入的线程,也就是说,可以在A线程中调用进入临界区函数,在B线程调用退出临界区的函数,同样是成功。

临界区内的数据一次只能同时被一个进程使用,当一个进程使用临界区内的数据时,其他需要使用临界区数据的进程进入等待状态。

2、各进程采取互斥的方式,实现共享的资源称作临界资源。

属于临界资源的硬件有打印机、磁带机等,软件有消息缓冲队列、变量、数组、缓冲区等。 诸进程间应采取互斥方式,实现对这种资源的共享。

⑶ 消息队列之RabbitMQ-分布式部署

RabbitMQ分布式部署有3种方式:

Federation与Shovel都是以插件的形式来实现,复杂性相对高,而集群是RabbitMQ的自带属性,相对简单。

这三种方式并不是互斥的,可以根据需求选择相互组合来达到目的。

RabbitMQ本身是基于Erlang编写,Erlang语言天生具备分布式特性(通过同步Erlang集群各节点的magic cookie来实现)。

因此,RabbitMQ天然支持Clustering。这使得RabbitMQ本身不需要像ActiveMQ、Kafka那样通过ZooKeeper分别来实现HA方案和保存集群的元数据。集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐量能力的目的。

我们把部署RabbitMQ的机器称为节点,也就是broker。broker有2种类型节点: 磁盘节点 内存节点 。顾名思义,磁盘节点的broker把元数据存储在磁盘中,内存节点把元数据存储在内存中,很明显,磁盘节点的broker在重启后元数据可以通过读取磁盘进行重建,保证了元数据不丢失,内存节点的broker可以获得更高的性能,但在重启后元数据就都丢了。

元数据包含以下内容:

单节点系统必须是磁盘节点 ,否则每次你重启RabbitMQ之后所有的系统配置信息都会丢失。

集群中至少有一个磁盘节点 ,当节点加入和离开集群时,必须通知磁盘 节点。

如果集群中的唯一一个磁盘节点,结果这个磁盘节点还崩溃了,那会发生什么情况?集群依然可以继续路由消息(因为其他节点元数据在还存在),但无法做以下操作:

也就是说,如果唯一磁盘的磁盘节点崩溃, 集群是可以保持运行的,但不能更改任何东西 。为了增加可靠性,一般会在集群中设置两个磁盘节点,只要任何一个处于工作状态,就可以保障集群的正常服务。

RabbitMQ的集群模式分为两种: 普通模式 镜像模式

普通模式,也是默认的集群模式。

对于Queue来说, 消息实体只存在于其中一个节点 ,A、B两个节点仅有相同的元数据,即队列结构。当消息进入A节点的Queue中后,consumer从B节点拉取时,RabbitMQ会临时在A、B间进行消息传输,把A中的消息实体取出并经过B发送给consumer。所以consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连A或B,出口总在A,会产生瓶颈。

队列所在的节点称为 宿主节点

队列创建时,只会在宿主节点创建队列的进程,宿主节点包含完整的队列信息,包括元数据、状态、内容等等。因此, 只有队列的宿主节点才能知道队列的所有信息

队列创建后,集群只会同步队列和交换器的元数据到集群中的其他节点,并不会同步队列本身,因此 非宿主节点就只知道队列的元数据和指向该队列宿主节点的指针

假如现在一个客户端需要对Queue A进行发布或者订阅,发起与集群的连接,有两种可能的场景:

由于节点之间存在路由转发的情况,对延迟非常敏感,应当只在本地局域网内使用,在广域网中不应该使用集群,而应该用Federation或者Shovel代替。

这样的设计,保证了不论从哪个broker中均可以消费所有队列的数据,并分担了负载,因此,增加broker可以线性提高服务的性能和吞吐量。

但该方案也有显着的缺陷,那就是 不能保证消息不会丢失 。当集群中某一节点崩溃时,崩溃节点所在的队列进程和关联的绑定都会消失,附加在那些队列上的消费者也会丢失其订阅信息,匹配该队列的新消息也会丢失。比如A为宿主节点,当A节点故障后,B节点无法取到A节点中还未消费的消息实体。如果做了消息持久化,那么得等A节点恢复,然后才可被消费;如果没有持久化的话,然后就没有然后了……

肯定有不少同学会问,想要实现HA方案,那将RabbitMQ集群中的所有Queue的完整数据在所有节点上都保存一份不就可以了吗?比如类似MySQL的主主模式,任何一个节点出现故障或者宕机不可用时,那么使用者的客户端只要能连接至其他节点,不就能够照常完成消息的发布和订阅吗?

RabbitMQ这么设计是基于性能和存储空间上来考虑:

引入 镜像队列 (Mirror Queue)的机制,可以将队列镜像到集群中的其他Broker节点之上,如果集群中的一个节点失效了,队列能够自动切换到镜像中的另一个节点上以保证服务的可用性。

一个镜像队列中包含有1个主节点master和若干个从节点slave。其主从节点包含如下几个特点:

该模式和普通模式不同之处在于,消息实体会主动在镜像节点间同步,而不是在consumer取数据时临时拉取。该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉。所以在对可靠性要求较高的场合中适用。

一个队列想做成镜像队列,需要先设置policy,然后客户端创建队列的时候,rabbitmq集群根据队列名称自动设置为普通队列还是镜像队列。

镜像队列的配置通过添加policy完成,policy添加的命令为:

例如,对队列名称以hello开头的所有队列进行镜像,并在集群的两个节点上完成镜像,policy的设置命令为:

rabbitmqctl set_policy hello-ha "^hello" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

通常队列由两部分组成:一部分是AMQQueue,负责AMQP协议相关的消息处理,即接收生产者发布的消息、向消费者投递消息、处理消息confirm、acknowledge等等;另一部分是BackingQueue,它提供了相关的接口供AMQQueue调用,完成消息的存储以及可能的持久化工作等。

镜像队列基本上就是一个特殊的BackingQueue,它内部包裹了一个普通的BackingQueue做本地消息持久化处理,在此基础上增加了将消息和ack复制到所有镜像的功能。所有对mirror_queue_master的操作,会通过组播GM(下面会讲到)的方式同步到各slave节点。GM负责消息的广播,mirror_queue_slave负责回调处理,而master上的回调处理是由coordinator负责完成。mirror_queue_slave中包含了普通的BackingQueue进行消息的存储,master节点中BackingQueue包含在mirror_queue_master中由AMQQueue进行调用。

消息的发布(除了Basic.Publish之外)与消费都是通过master节点完成。master节点对消息进行处理的同时将消息的处理动作通过GM广播给所有的slave节点,slave节点的GM收到消息后,通过回调交由mirror_queue_slave进行实际的处理。

GM(Guarenteed Multicast) 是一种可靠的组播通讯协议,该协议能够保证组播消息的原子性,即保证组中活着的节点要么都收到消息要么都收不到。它的实现大致如下:

将所有的节点形成一个循环链表,每个节点都会监控位于自己左右两边的节点,当有节点新增时,相邻的节点保证当前广播的消息会复制到新的节点上;当有节点失效时,相邻的节点会接管保证本次广播的消息会复制到所有的节点。在master节点和slave节点上的这些gm形成一个group,group(gm_group)的信息会记录在mnesia中。不同的镜像队列形成不同的group。消息从master节点对于的gm发出后,顺着链表依次传送到所有的节点,由于所有节点组成一个循环链表,master节点对应的gm最终会收到自己发送的消息,这个时候master节点就知道消息已经复制到所有的slave节点了。

slave节点先从gm_group中获取对应group的所有成员信息,然后随机选择一个节点并向这个节点发送请求,这个节点收到请求后,更新gm_group对应的信息,同时通知左右节点更新邻居信息(调整对左右节点的监控)及当前正在广播的消息,然后回复通知请求节点成功加入group。请求加入group的节点收到回复后再更新rabbit_queue中的相关信息,并根据需要进行消息的同步。

当slave节点失效时,仅仅是相邻节点感知,然后重新调整邻居节点信息、更新rabbit_queue、gm_group的记录等。如果是master节点失效,"资格最老"的slave节点被提升为master节点,slave节点会创建出新的coordinator,并告知gm修改回调处理为coordinator,原来的mirror_queue_slave充当amqqueue_process处理生产者发布的消息,向消费者投递消息等。

上面提到如果是slave节点失效,只有相邻的节点能感知到,那么master节点失效是不是也是只有相邻的节点能感知到?假如是这样的话,如果相邻的节点不是"资格最老"的节点,怎么通知"资格最老"的节点提升为新的master节点呢?

实际上,所有的slave节点在加入group时,mirror_queue_slave进程会对master节点的amqqueue_process进程(也可能是mirror_queue_slave进程)进行监控,如果master节点失效的话,mirror_queue_slave会感知,然后再通过gm进行广播,这样所有的节点最终都会知道master节点失效。当然,只有"资格最老"的节点会提升自己为新的master。

消息从master节点发出,顺着节点链表发送。在这期间,所有的slave节点都会对消息进行缓存,当master节点收到自己发送的消息后,会再次广播ack消息,同样ack消息会顺着节点链表经过所有的slave节点,其作用是通知slave节点可以清除缓存的消息,当ack消息回到master节点时对应广播消息的生命周期结束。

下图为一个简单的示意图,A节点为master节点,广播一条内容为"test"的消息。"1"表示消息为广播的第一条消息;"id=A"表示消息的发送者为节点A。右边是slave节点记录的状态信息。

为什么所有的节点都需要缓存一份发布的消息呢?

master发布的消息是依次经过所有slave节点,在这期间的任何时刻,有可能有节点失效,那么相邻的节点可能需要重新发送给新的节点。例如,A->B->C->D->A形成的循环链表,A为master节点,广播消息发送给节点B,B再发送给C,如果节点C收到B发送的消息还未发送给D时异常结束了,那么节点B感知后节点C失效后需要重新将消息发送给D。同样,如果B节点将消息发送给C后,B,C节点中新增了E节点,那么B节点需要再将消息发送给新增的E节点。

配置镜像队列的时候有个 ha-sync-mode 属性,这个有什么用呢?

新节点加入到group后,最多能从左边节点获取到当前正在广播的消息内容,加入group之前已经广播的消息则无法获取到。如果此时master节点不幸失效,而新节点有恰好成为了新的master,那么加入group之前已经广播的消息则会全部丢失。

注意:这里的消息具体是指新节点加入前已经发布并复制到所有slave节点的消息,并且这些消息还未被消费者消费或者未被消费者确认。如果新节点加入前,所有广播的消息被消费者消费并确认了,master节点删除消息的同时会通知slave节点完成相应动作。这种情况等同于新节点加入前没有发布任何消息。

避免这种问题的解决办法就是对新的slave节点进行消息同步。当 ha-sync-mode 配置为自动同步(automatic)时,新节点加入group时会自动进行消息的同步;如果配置为manually则需要手动操作完成同步。

Federation直译过来是联邦,它的设计目标是使 RabbitMQ 在不同的 Broker 节点之间进行消息传递而无须建
立集群。具有以下特点:

那么它到底有什么用呢?我们可以从一个实际场景入手:

有两个服务分别部署在国内和海外,它们之间需要通过消息队列来通讯。

很明显无论RabbitMQ部署在海外还是国内,另一方一定得忍受连接上的延迟。因此我们可以在海外和国内各部署一个MQ,这样一来海外连接海外的MQ,国内连接国内,就不会有连接上的延迟了。

但这样还会有问题,假设某生产者将消息存入海外MQ中的某个队列 queueB , 在国内的服务想要消费 queueB 消息,消息的流转及确认必然要忍受较大的网络延迟 ,内部编码逻辑也会因这一因素变得更加复杂。

此外,服务可能得维护两个MQ的配置,比如国内服务在生产消息时得使用国内MQ,消费消息时得监听海外MQ的队列,降低了系统的维护性。

可能有人想到可以用集群,但是RabbitMQ的集群对延迟非常敏感,一般部署在局域网内,如果部署在广域网可能会产生网络分区等等问题。

这时候,Federation就派上用场了。它被设计成能够容忍不稳定的网络连接情况,完全能够满足这样的场景。

那使用Federation之后是怎样的业务流程呢?

首先我们在海外MQ上定义exchangeA,它通过路由键“rkA”绑定着queueA。然后用Federation在exchangeA上建立一条 单向 连接到国内RabbitMQ,Federation则自动会在国内RabbitMQ建立一个exchangeA交换器(默认同名)。

这时候,如果部署在国内的client C在国内MQ上publish了一条消息,这条消息会通过 Federation link 转发到海外MQ的交换器exchangeA中,最终消息会存入与 exchangeA 绑定的队列 queueA 中,而client C也能立即得到返回。

实际上,Federation插件还会在国内MQ建立一个内部的交换器:exchangeA→ broker3 B(broker3是集群名),并通过路由键 "rkA"将它和国内MQ的exchangeA绑定起来。接下来还会在国内MQ上建立一个内部队列federation: exchangeA->broker3 B,并与内部exchange绑定。这些操作都是内部的,对客户端来说是透明的。

值得一提的是,Federation的连接是单向的,如果是在海外MQ的exchangeA上发送消息是不会转到国内的。

这种在exchange上建立连接进行联邦的,就叫做 联邦交换器 。一个联邦交换器接收上游(upstream)的信息,这里的上游指的是其他的MQ节点。

对比前面举的例子,国内MQ就是上游,联邦交换器能够将原本发送给上游交换器的消息路由到本地的某个队列中。

有联邦交换器自然也有联播队列,联邦队列则允许一个本地消费者接收到来自上游队列的消息 。

如图,海外MQ有队列A,给其设置一条链接,Federation则自动会在国内RabbitMQ建立一个队列A(默认同名)。

当有消费者 ClinetA连接海外MQ并消费 queueA 中的消息时,如果队列 queueA中本身有若干消息堆积,那么 ClientA直接消费这些消息,此时海外MQ中的queueA并不会拉取国内中的 queueA 的消息;如果队列 queueA中没有消息堆积或者消息被消费完了,那么它会通过 Federation link 拉取上游队列 queueA 中的消息(如果有消息),然后存储到本地,之后再被消费者 ClientA进行消费 。

首先开启Federation 功能:

值得注意的是,当需要在集群中使用 Federation 功能的时候,集群中所有的节点都应该开启 Federation 插件。

接下来我们要配置两个东西:upstreams和Policies。

每个 upstream 用于定义与其他 Broker 建立连接的信息。

通用参数如下:

然后定义一个 Policy, 用于匹配交换器:

^exchange 意思是将匹配所有以exchange名字开头的交换器,为它们在上游创建连接。这样就创建了一个 Federation link。

Shovel是RabbitMQ的一个插件, 能够可靠、持续地从一个Broker 中的队列(作为源端,即source )拉取数据并转发至另一个Broker 中的交换器(作为目的端,即destination )。作为源端的队列和作为目的端的交换器可以同时位于同一个 Broker 上,也可以位于不同的 Broker 上。

使用Shovel有以下优势:

使用Shovel时,通常源为队列,目的为交换器:

但是,也可以源为队列,目的为队列。实际也是由交换器转发,只不过这个交换器是默认交换器。配置交换器做为源也是可行的。实际上会在源端自动新建一个队列,消息先存在这个队列,再被Shovel移走。

使用Shovel插件命令:

Shovel 既可以部署在源端,也可以部署在目的端。有两种方式可以部署 Shovel:

其主要差异如下:

来看一个使用Shovel治理消息堆积的案例。

当某个队列中的消息堆积严重时,比如超过某个设定的阈值,就可以通过 Shovel 将队列中的消息移交给另一个集群。

⑷ C#多线程 一个缓冲队列,一个生产者线程,一个消费者线程,这两个线程同时操作这个队列,必须加互斥锁吗

加互斥锁的目的,是解决多线程访问同一资源而产生不可预期的异常,那么你现在是一个线程只负责插入,另一线程负责查询和删除,查询和删除跟插入没有关系的话,那么显然是不需要线程锁的,你要是问隐患的话,那么由于多线程或者多用户的原因,频繁操作几张相关表,可能造成数据库的表死锁。