當前位置:首頁 » 文件傳輸 » 消息緩沖隊列需要互斥訪問嗎
擴展閱讀
webinf下怎麼引入js 2023-08-31 21:54:13
堡壘機怎麼打開web 2023-08-31 21:54:11

消息緩沖隊列需要互斥訪問嗎

發布時間: 2023-07-07 14:15:12

⑴ linux 進程間通信的幾種方式

第一種:管道通信
兩個進程利用管道進行通信時,發送信息的進程稱為寫進程;接收信息的進程稱為讀進程。管道通信方式的中間介質就是文件,通常稱這種文件為管道文件,它就像管道一樣將一個寫進程和一個讀進程連接在一起,實現兩個進程之間的通信。寫進程通過寫入端往管道文件中寫入信息;讀進程通過讀出端從管道文件中讀取信息。兩個進程協調不斷地進行寫和讀,便會構成雙方通過管道傳遞信息的流水線。
第二種:消息緩沖通信
多個獨立的進程之間可以通過消息緩沖機制來相互通信。這種通信的實現是以消息緩沖區為中間介質,通信雙方的發送和接收操作均以消息為單位。在存儲器中,消息緩沖區被組織成隊列,通常稱之為消息隊列。消息隊列一旦創建後即可由多進程共享,發送消息的進程可以在任意時刻發送任意個消息到指定的消息隊列上,並檢查是否有接收進程在等待它所發送的消息。若有則喚醒它,而接收消息的進程可以在需要消息的時候到指定的消息隊列上獲取消息,如果消息還沒有到來,則轉入睡眠等待狀態。
第三種:共享內存通信
針對消息緩沖需要佔用CPU進行消息復制的缺點,OS提供了一種進程間直接進行數據交換的通信方式。共享內存,顧名思義這種通信方式允許多個進程在外部通信協議或同步,互斥機制的支持下使用同一個內存段進行通信,它是一種最有效的數據通信方式,其特點是沒有中間環節,直接將共享的內存頁面通過附接映射到相互通信的進程各自的虛擬地址空間中,從而使多個進程可以直接訪問同一個物理內存頁面。

⑵ 什麼是臨界區和臨界資源對臨界區管理的基本原則是什麼

1、臨界區指的是一個訪問共用資源(例如:共用設備或是共用存儲器)的程序片段,而這些共用資源又無法同時被多個線程訪問的特性。

當有線程進入臨界區段時,其他線程或是進程必須等待,有一些同步的機制必須在臨界區段的進入點與離開點實現,以確保這些共用資源是被互斥獲得使用。只能被單一線程訪問的設備,例如:列印機。

2、臨界資源:多道程序系統中存在許多進程,它們共享各種資源,然而有很多資源一次只能供一個進程使用。一次僅允許一個進程使用的資源稱為臨界資源。許多物理設備都屬於臨界資源,如輸入機、列印機、磁帶機等。

3、進程進入臨界區的調度原則是:

(1)如果有若干進程要求進入空閑的臨界區,一次僅允許一個進程進入。

(2)進入臨界區的進程要在有限時間內退出,以便其它進程能及時進入自己的臨界區。

(3)任何時候,處於臨界區內的進程不可多於一個。

(4)如果進程不能進入自己的臨界區,則應讓出CPU,避免進程出現「忙等」現象。

(2)消息緩沖隊列需要互斥訪問嗎擴展閱讀

1、臨界區存在的問題

臨界區的退出,不會檢測是否是已經進入的線程,也就是說,可以在A線程中調用進入臨界區函數,在B線程調用退出臨界區的函數,同樣是成功。

臨界區內的數據一次只能同時被一個進程使用,當一個進程使用臨界區內的數據時,其他需要使用臨界區數據的進程進入等待狀態。

2、各進程採取互斥的方式,實現共享的資源稱作臨界資源。

屬於臨界資源的硬體有列印機、磁帶機等,軟體有消息緩沖隊列、變數、數組、緩沖區等。 諸進程間應採取互斥方式,實現對這種資源的共享。

⑶ 消息隊列之RabbitMQ-分布式部署

RabbitMQ分布式部署有3種方式:

Federation與Shovel都是以插件的形式來實現,復雜性相對高,而集群是RabbitMQ的自帶屬性,相對簡單。

這三種方式並不是互斥的,可以根據需求選擇相互組合來達到目的。

RabbitMQ本身是基於Erlang編寫,Erlang語言天生具備分布式特性(通過同步Erlang集群各節點的magic cookie來實現)。

因此,RabbitMQ天然支持Clustering。這使得RabbitMQ本身不需要像ActiveMQ、Kafka那樣通過ZooKeeper分別來實現HA方案和保存集群的元數據。集群是保證可靠性的一種方式,同時可以通過水平擴展以達到增加消息吞吐量能力的目的。

我們把部署RabbitMQ的機器稱為節點,也就是broker。broker有2種類型節點: 磁碟節點 內存節點 。顧名思義,磁碟節點的broker把元數據存儲在磁碟中,內存節點把元數據存儲在內存中,很明顯,磁碟節點的broker在重啟後元數據可以通過讀取磁碟進行重建,保證了元數據不丟失,內存節點的broker可以獲得更高的性能,但在重啟後元數據就都丟了。

元數據包含以下內容:

單節點系統必須是磁碟節點 ,否則每次你重啟RabbitMQ之後所有的系統配置信息都會丟失。

集群中至少有一個磁碟節點 ,當節點加入和離開集群時,必須通知磁碟 節點。

如果集群中的唯一一個磁碟節點,結果這個磁碟節點還崩潰了,那會發生什麼情況?集群依然可以繼續路由消息(因為其他節點元數據在還存在),但無法做以下操作:

也就是說,如果唯一磁碟的磁碟節點崩潰, 集群是可以保持運行的,但不能更改任何東西 。為了增加可靠性,一般會在集群中設置兩個磁碟節點,只要任何一個處於工作狀態,就可以保障集群的正常服務。

RabbitMQ的集群模式分為兩種: 普通模式 鏡像模式

普通模式,也是默認的集群模式。

對於Queue來說, 消息實體只存在於其中一個節點 ,A、B兩個節點僅有相同的元數據,即隊列結構。當消息進入A節點的Queue中後,consumer從B節點拉取時,RabbitMQ會臨時在A、B間進行消息傳輸,把A中的消息實體取出並經過B發送給consumer。所以consumer應盡量連接每一個節點,從中取消息。即對於同一個邏輯隊列,要在多個節點建立物理Queue。否則無論consumer連A或B,出口總在A,會產生瓶頸。

隊列所在的節點稱為 宿主節點

隊列創建時,只會在宿主節點創建隊列的進程,宿主節點包含完整的隊列信息,包括元數據、狀態、內容等等。因此, 只有隊列的宿主節點才能知道隊列的所有信息

隊列創建後,集群只會同步隊列和交換器的元數據到集群中的其他節點,並不會同步隊列本身,因此 非宿主節點就只知道隊列的元數據和指向該隊列宿主節點的指針

假如現在一個客戶端需要對Queue A進行發布或者訂閱,發起與集群的連接,有兩種可能的場景:

由於節點之間存在路由轉發的情況,對延遲非常敏感,應當只在本地區域網內使用,在廣域網中不應該使用集群,而應該用Federation或者Shovel代替。

這樣的設計,保證了不論從哪個broker中均可以消費所有隊列的數據,並分擔了負載,因此,增加broker可以線性提高服務的性能和吞吐量。

但該方案也有顯著的缺陷,那就是 不能保證消息不會丟失 。當集群中某一節點崩潰時,崩潰節點所在的隊列進程和關聯的綁定都會消失,附加在那些隊列上的消費者也會丟失其訂閱信息,匹配該隊列的新消息也會丟失。比如A為宿主節點,當A節點故障後,B節點無法取到A節點中還未消費的消息實體。如果做了消息持久化,那麼得等A節點恢復,然後才可被消費;如果沒有持久化的話,然後就沒有然後了……

肯定有不少同學會問,想要實現HA方案,那將RabbitMQ集群中的所有Queue的完整數據在所有節點上都保存一份不就可以了嗎?比如類似MySQL的主主模式,任何一個節點出現故障或者宕機不可用時,那麼使用者的客戶端只要能連接至其他節點,不就能夠照常完成消息的發布和訂閱嗎?

RabbitMQ這么設計是基於性能和存儲空間上來考慮:

引入 鏡像隊列 (Mirror Queue)的機制,可以將隊列鏡像到集群中的其他Broker節點之上,如果集群中的一個節點失效了,隊列能夠自動切換到鏡像中的另一個節點上以保證服務的可用性。

一個鏡像隊列中包含有1個主節點master和若干個從節點slave。其主從節點包含如下幾個特點:

該模式和普通模式不同之處在於,消息實體會主動在鏡像節點間同步,而不是在consumer取數據時臨時拉取。該模式帶來的副作用也很明顯,除了降低系統性能外,如果鏡像隊列數量過多,加之大量的消息進入,集群內部的網路帶寬將會被這種同步通訊大大消耗掉。所以在對可靠性要求較高的場合中適用。

一個隊列想做成鏡像隊列,需要先設置policy,然後客戶端創建隊列的時候,rabbitmq集群根據隊列名稱自動設置為普通隊列還是鏡像隊列。

鏡像隊列的配置通過添加policy完成,policy添加的命令為:

例如,對隊列名稱以hello開頭的所有隊列進行鏡像,並在集群的兩個節點上完成鏡像,policy的設置命令為:

rabbitmqctl set_policy hello-ha "^hello" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

通常隊列由兩部分組成:一部分是AMQQueue,負責AMQP協議相關的消息處理,即接收生產者發布的消息、向消費者投遞消息、處理消息confirm、acknowledge等等;另一部分是BackingQueue,它提供了相關的介面供AMQQueue調用,完成消息的存儲以及可能的持久化工作等。

鏡像隊列基本上就是一個特殊的BackingQueue,它內部包裹了一個普通的BackingQueue做本地消息持久化處理,在此基礎上增加了將消息和ack復制到所有鏡像的功能。所有對mirror_queue_master的操作,會通過組播GM(下面會講到)的方式同步到各slave節點。GM負責消息的廣播,mirror_queue_slave負責回調處理,而master上的回調處理是由coordinator負責完成。mirror_queue_slave中包含了普通的BackingQueue進行消息的存儲,master節點中BackingQueue包含在mirror_queue_master中由AMQQueue進行調用。

消息的發布(除了Basic.Publish之外)與消費都是通過master節點完成。master節點對消息進行處理的同時將消息的處理動作通過GM廣播給所有的slave節點,slave節點的GM收到消息後,通過回調交由mirror_queue_slave進行實際的處理。

GM(Guarenteed Multicast) 是一種可靠的組播通訊協議,該協議能夠保證組播消息的原子性,即保證組中活著的節點要麼都收到消息要麼都收不到。它的實現大致如下:

將所有的節點形成一個循環鏈表,每個節點都會監控位於自己左右兩邊的節點,當有節點新增時,相鄰的節點保證當前廣播的消息會復制到新的節點上;當有節點失效時,相鄰的節點會接管保證本次廣播的消息會復制到所有的節點。在master節點和slave節點上的這些gm形成一個group,group(gm_group)的信息會記錄在mnesia中。不同的鏡像隊列形成不同的group。消息從master節點對於的gm發出後,順著鏈表依次傳送到所有的節點,由於所有節點組成一個循環鏈表,master節點對應的gm最終會收到自己發送的消息,這個時候master節點就知道消息已經復制到所有的slave節點了。

slave節點先從gm_group中獲取對應group的所有成員信息,然後隨機選擇一個節點並向這個節點發送請求,這個節點收到請求後,更新gm_group對應的信息,同時通知左右節點更新鄰居信息(調整對左右節點的監控)及當前正在廣播的消息,然後回復通知請求節點成功加入group。請求加入group的節點收到回復後再更新rabbit_queue中的相關信息,並根據需要進行消息的同步。

當slave節點失效時,僅僅是相鄰節點感知,然後重新調整鄰居節點信息、更新rabbit_queue、gm_group的記錄等。如果是master節點失效,"資格最老"的slave節點被提升為master節點,slave節點會創建出新的coordinator,並告知gm修改回調處理為coordinator,原來的mirror_queue_slave充當amqqueue_process處理生產者發布的消息,向消費者投遞消息等。

上面提到如果是slave節點失效,只有相鄰的節點能感知到,那麼master節點失效是不是也是只有相鄰的節點能感知到?假如是這樣的話,如果相鄰的節點不是"資格最老"的節點,怎麼通知"資格最老"的節點提升為新的master節點呢?

實際上,所有的slave節點在加入group時,mirror_queue_slave進程會對master節點的amqqueue_process進程(也可能是mirror_queue_slave進程)進行監控,如果master節點失效的話,mirror_queue_slave會感知,然後再通過gm進行廣播,這樣所有的節點最終都會知道master節點失效。當然,只有"資格最老"的節點會提升自己為新的master。

消息從master節點發出,順著節點鏈表發送。在這期間,所有的slave節點都會對消息進行緩存,當master節點收到自己發送的消息後,會再次廣播ack消息,同樣ack消息會順著節點鏈表經過所有的slave節點,其作用是通知slave節點可以清除緩存的消息,當ack消息回到master節點時對應廣播消息的生命周期結束。

下圖為一個簡單的示意圖,A節點為master節點,廣播一條內容為"test"的消息。"1"表示消息為廣播的第一條消息;"id=A"表示消息的發送者為節點A。右邊是slave節點記錄的狀態信息。

為什麼所有的節點都需要緩存一份發布的消息呢?

master發布的消息是依次經過所有slave節點,在這期間的任何時刻,有可能有節點失效,那麼相鄰的節點可能需要重新發送給新的節點。例如,A->B->C->D->A形成的循環鏈表,A為master節點,廣播消息發送給節點B,B再發送給C,如果節點C收到B發送的消息還未發送給D時異常結束了,那麼節點B感知後節點C失效後需要重新將消息發送給D。同樣,如果B節點將消息發送給C後,B,C節點中新增了E節點,那麼B節點需要再將消息發送給新增的E節點。

配置鏡像隊列的時候有個 ha-sync-mode 屬性,這個有什麼用呢?

新節點加入到group後,最多能從左邊節點獲取到當前正在廣播的消息內容,加入group之前已經廣播的消息則無法獲取到。如果此時master節點不幸失效,而新節點有恰好成為了新的master,那麼加入group之前已經廣播的消息則會全部丟失。

注意:這里的消息具體是指新節點加入前已經發布並復制到所有slave節點的消息,並且這些消息還未被消費者消費或者未被消費者確認。如果新節點加入前,所有廣播的消息被消費者消費並確認了,master節點刪除消息的同時會通知slave節點完成相應動作。這種情況等同於新節點加入前沒有發布任何消息。

避免這種問題的解決辦法就是對新的slave節點進行消息同步。當 ha-sync-mode 配置為自動同步(automatic)時,新節點加入group時會自動進行消息的同步;如果配置為manually則需要手動操作完成同步。

Federation直譯過來是聯邦,它的設計目標是使 RabbitMQ 在不同的 Broker 節點之間進行消息傳遞而無須建
立集群。具有以下特點:

那麼它到底有什麼用呢?我們可以從一個實際場景入手:

有兩個服務分別部署在國內和海外,它們之間需要通過消息隊列來通訊。

很明顯無論RabbitMQ部署在海外還是國內,另一方一定得忍受連接上的延遲。因此我們可以在海外和國內各部署一個MQ,這樣一來海外連接海外的MQ,國內連接國內,就不會有連接上的延遲了。

但這樣還會有問題,假設某生產者將消息存入海外MQ中的某個隊列 queueB , 在國內的服務想要消費 queueB 消息,消息的流轉及確認必然要忍受較大的網路延遲 ,內部編碼邏輯也會因這一因素變得更加復雜。

此外,服務可能得維護兩個MQ的配置,比如國內服務在生產消息時得使用國內MQ,消費消息時得監聽海外MQ的隊列,降低了系統的維護性。

可能有人想到可以用集群,但是RabbitMQ的集群對延遲非常敏感,一般部署在區域網內,如果部署在廣域網可能會產生網路分區等等問題。

這時候,Federation就派上用場了。它被設計成能夠容忍不穩定的網路連接情況,完全能夠滿足這樣的場景。

那使用Federation之後是怎樣的業務流程呢?

首先我們在海外MQ上定義exchangeA,它通過路由鍵「rkA」綁定著queueA。然後用Federation在exchangeA上建立一條 單向 連接到國內RabbitMQ,Federation則自動會在國內RabbitMQ建立一個exchangeA交換器(默認同名)。

這時候,如果部署在國內的client C在國內MQ上publish了一條消息,這條消息會通過 Federation link 轉發到海外MQ的交換器exchangeA中,最終消息會存入與 exchangeA 綁定的隊列 queueA 中,而client C也能立即得到返回。

實際上,Federation插件還會在國內MQ建立一個內部的交換器:exchangeA→ broker3 B(broker3是集群名),並通過路由鍵 "rkA"將它和國內MQ的exchangeA綁定起來。接下來還會在國內MQ上建立一個內部隊列federation: exchangeA->broker3 B,並與內部exchange綁定。這些操作都是內部的,對客戶端來說是透明的。

值得一提的是,Federation的連接是單向的,如果是在海外MQ的exchangeA上發送消息是不會轉到國內的。

這種在exchange上建立連接進行聯邦的,就叫做 聯邦交換器 。一個聯邦交換器接收上游(upstream)的信息,這里的上游指的是其他的MQ節點。

對比前面舉的例子,國內MQ就是上游,聯邦交換器能夠將原本發送給上游交換器的消息路由到本地的某個隊列中。

有聯邦交換器自然也有聯播隊列,聯邦隊列則允許一個本地消費者接收到來自上游隊列的消息 。

如圖,海外MQ有隊列A,給其設置一條鏈接,Federation則自動會在國內RabbitMQ建立一個隊列A(默認同名)。

當有消費者 ClinetA連接海外MQ並消費 queueA 中的消息時,如果隊列 queueA中本身有若干消息堆積,那麼 ClientA直接消費這些消息,此時海外MQ中的queueA並不會拉取國內中的 queueA 的消息;如果隊列 queueA中沒有消息堆積或者消息被消費完了,那麼它會通過 Federation link 拉取上游隊列 queueA 中的消息(如果有消息),然後存儲到本地,之後再被消費者 ClientA進行消費 。

首先開啟Federation 功能:

值得注意的是,當需要在集群中使用 Federation 功能的時候,集群中所有的節點都應該開啟 Federation 插件。

接下來我們要配置兩個東西:upstreams和Policies。

每個 upstream 用於定義與其他 Broker 建立連接的信息。

通用參數如下:

然後定義一個 Policy, 用於匹配交換器:

^exchange 意思是將匹配所有以exchange名字開頭的交換器,為它們在上游創建連接。這樣就創建了一個 Federation link。

Shovel是RabbitMQ的一個插件, 能夠可靠、持續地從一個Broker 中的隊列(作為源端,即source )拉取數據並轉發至另一個Broker 中的交換器(作為目的端,即destination )。作為源端的隊列和作為目的端的交換器可以同時位於同一個 Broker 上,也可以位於不同的 Broker 上。

使用Shovel有以下優勢:

使用Shovel時,通常源為隊列,目的為交換器:

但是,也可以源為隊列,目的為隊列。實際也是由交換器轉發,只不過這個交換器是默認交換器。配置交換器做為源也是可行的。實際上會在源端自動新建一個隊列,消息先存在這個隊列,再被Shovel移走。

使用Shovel插件命令:

Shovel 既可以部署在源端,也可以部署在目的端。有兩種方式可以部署 Shovel:

其主要差異如下:

來看一個使用Shovel治理消息堆積的案例。

當某個隊列中的消息堆積嚴重時,比如超過某個設定的閾值,就可以通過 Shovel 將隊列中的消息移交給另一個集群。

⑷ C#多線程 一個緩沖隊列,一個生產者線程,一個消費者線程,這兩個線程同時操作這個隊列,必須加互斥鎖嗎

加互斥鎖的目的,是解決多線程訪問同一資源而產生不可預期的異常,那麼你現在是一個線程只負責插入,另一線程負責查詢和刪除,查詢和刪除跟插入沒有關系的話,那麼顯然是不需要線程鎖的,你要是問隱患的話,那麼由於多線程或者多用戶的原因,頻繁操作幾張相關表,可能造成資料庫的表死鎖。