A. Zookpeer是什麼在系統中如何起作用
Zookeeper分布式服務框架是Apache Hadoop的一個子項目,它主要是用來解決分布式應用中經常遇到的一些數據管理問題。如:統一命名服務、狀態同步服務、集群管理、分布式應用配置項的管理等。
我們先看看它都提供了哪些功能,然後再看看使用它的這些功能能做點什麼。
簡單的說,zookeeper=文件系統+通知機制。
Zookeeper維護一個類似文件系統的數據結構:
每個子目錄項如 NameService 都被稱作為 znode,和文件系統一樣,我們能夠自由的增加、刪除znode,在一個znode下增加、刪除子znode,唯一的不同在於znode是可以存儲數據的。
客戶端注冊監聽它關心的目錄節點,當目錄節點發生變化(數據改變、被刪除、子目錄節點增加刪除)時,zookeeper會通知客戶端。
這個似乎最簡單,在zookeeper的文件系統里創建一個目錄,即有唯一的path。在我們使用tborg無法確定上遊程序的部署機器時即可與下遊程序約定好path,通過path即能互相探索發現,不見不散了。
程序總是需要配置的,如果程序分散部署在多台機器上,要逐個改變配置就變得困難。
可以把這些配置全部放到zookeeper上去,保存在 Zookeeper 的某個目錄節點中,然後所有相關應用程序對這個目錄節點進行監聽,一旦配置信息發生變化,每個應用程序就會收到 Zookeeper 的通知,然後從 Zookeeper 獲取新的配置信息應用到系統中就好。
集群管理無在乎兩點:是否有機器退出和加入、選舉master。
對於第一點,所有機器約定在父目錄GroupMembers下創建臨時目錄節點,然後監聽父目錄節點的子節點變化消息。一旦有機器掛掉,該機器與 zookeeper的連接斷開,其所創建的臨時目錄節點被刪除,所有其他機器都收到通知:某個兄弟目錄被刪除,於是,所有人都知道:它下船了。當然又會有新機器加入,也是類似:所有機器收到通知---新兄弟目錄加入,highcount又有了,有人上船了。
對於第二點,我們假設機器創建臨時順序編號目錄節點,每次選取編號最小的機器作為master就好。
有了zookeeper的一致性文件系統,鎖的問題變得容易。鎖服務可以分為兩類,一個是保持獨占,另一個是控制時序。
對於第一類,我們將zookeeper上的一個znode看作是一把鎖,通過createznode的方式來實現。所有客戶端都去創建 /distribute_lock 節點,最終成功創建的那個客戶端也即擁有了這把鎖。廁所有言:來也沖沖,去也沖沖,用完刪除掉自己創建的distribute_lock 節點就釋放出鎖。
對於第二類, /distribute_lock 已經預先存在,所有客戶端在它下面創建臨時順序編號目錄節點,和選master一樣,編號最小的獲得鎖,用完刪除,依次方便。
兩種類型的隊列:
1、 同步隊列,當一個隊列的成員都聚齊時,這個隊列才可用,否則一直等待所有成員到達。
2、隊列按照 FIFO 方式進行入隊和出隊操作。
第一類,在約定目錄下創建臨時目錄節點,監聽節點數目是否是我們要求的數目。
第二類,和分布式鎖服務中的控制時序場景基本原理一致,入列有編號,出列按編號。
Zookeeper中的角色主要有以下三類:
系統模型如圖所示:
Zookeeper的核心是原子廣播,這個機制保證了各個Server之間的同步。實現這個機制的協議叫做Zab協議。Zab協議有兩種模式,它們分 別是恢復模式(選主)和廣播模式(同步)。當服務啟動或者在領導者崩潰後,Zab就進入了恢復模式,當領導者被選舉出來,且大多數Server完成了和 leader的狀態同步以後,恢復模式就結束了。狀態同步保證了leader和Server具有相同的系統狀態。
為了保證事務的順序一致性,zookeeper採用了遞增的事務id號(zxid)來標識事務。所有的提議(proposal)都在被提出的時候加上 了zxid。實現中zxid是一個64位的數字,它高32位是epoch用來標識leader關系是否改變,每次一個leader被選出來,它都會有一個 新的epoch,標識當前屬於那個leader的統治時期。低32位用於遞增計數。
每個Server在工作過程中有三種狀態:
當leader崩潰或者leader失去大多數的follower,這時候zk進入恢復模式,恢復模式需要重新選舉出一個新的leader,讓所有的 Server都恢復到一個正確的狀態。Zk的選舉演算法有兩種:一種是基於basic paxos實現的,另外一種是基於fast paxos演算法實現的。系統默認的選舉演算法為fast paxos。先介紹basic paxos流程:
通過流程分析我們可以得出:要使Leader獲得多數Server的支持,則Server總數必須是奇數2n+1,且存活的Server的數目不得少於n+1.
選完leader以後,zk就進入狀態同步過程。
Leader主要有三個功能:
PING消息是指Learner的心跳信息;REQUEST消息是Follower發送的提議信息,包括寫請求及同步請求;ACK消息是 Follower的對提議的回復,超過半數的Follower通過,則commit該提議;REVALIDATE消息是用來延長SESSION有效時間。
Leader的工作流程簡圖如下所示,在實際實現中,流程要比下圖復雜得多,啟動了三個線程來實現功能。
Follower主要有四個功能:
Follower的消息循環處理如下幾種來自Leader的消息:
Follower的工作流程簡圖如下所示,在實際實現中,Follower是通過5個線程來實現功能的。
https://blog.csdn.net/xinguan1267/article/details/38422149
https://blog.csdn.net/gs80140/article/details/51496925
https://www.2cto.com/kf/201708/668587.html
https://blog.csdn.net/milhua/article/details/78931672
P.S. 這篇文章是本人對網路上關於ZK的文章閱讀之後整理所得,作為入門級的了解。個人覺得看了上面的內容就能基本了解Zookeeper的作用了,後面在結合實際項目使用加深自己的了解。
end
B. Pulsar 的消息存儲機制和 Bookie 的 GC 機制原理
[TOC]
本文是 Pulsar 技術系列中的一篇,主要簡單梳理了 Pulsar 消息存儲與 BookKeeper 存儲文件的清理機制。其中,BookKeeper 可以理解為一個 NoSQL 的存儲系統,默認使用 RocksDB 存儲索引數據。
Pulsar 的消息存儲在 BookKeeper 中,BookKeeper 是一個胖客戶的系統,客戶端部分稱為 BookKeeper,伺服器端集群中的每個存儲節點稱為 bookie。Pulsar 系統的 broker 作為 BookKeeper 存儲系統的客戶端,通過 BookKeeper 提供的客戶端 SDK 將 Pulsar 的消息存儲到 bookies 集群中。
Pulsar 中的每個 topic 的每個分區(非分區 topic,可以按照分區 0 理解,分區 topic 的編號是從 0 開始的),會對應一系列的 ledger,而每個 ledger 只會存儲對應分區下的消息。對於每個分區同時只會有一個 ledger 處於 open 即可寫狀態。
Pulsar 在生產消息,存儲消息時,會先找到當前分區使用的 ledger,然後生成當前消息對應的 entry ID,entry ID 在同一個 ledger 內是遞增的。非批量生產的情況(procer 端可以配置這個參數,默認是批量的),一個 entry 中包含一條消息。批量方式下,一個 entry 可能包含多條消息。而 bookie 中只會按照 entry 維度進行寫入、查找、獲取。
因此,每個 Pulsar 下的消息的 msgID 需要有四部分組成(老版本由三部分組成),分別為(ledgerID,entryID,partition-index,batch-index),其中,partition-index 在非分區 topic 的時候為 -1,batch-index 在非批量消息的時候為 -1。
每個 ledger,當存在的時長或保存的 entry 個數超過閾值後會進行切換,同一個 partition 下的,新的消息會存儲到下一個 ledger 中。Ledger 只是一個邏輯概念,是數據的一種邏輯組裝維度,並沒有對應的實體。
BookKeeper 集群中的每個 bookie 節點收到消息後,數據會分三部分進行存儲處理,分別為:journal 文件、entryLog 文件、索引文件。
其中 journal 文件,entry 數據是按照 wal 方式寫入的到 journal 文件中,每個 journal 文件有大小限制,當超過單個文件大小限制的時候會切換到下一個文件繼續寫,因為 journal 文件是實時刷盤的,所以為了提高性能,避免相互之間的讀寫 IO 相互影響,建議存儲目錄與存儲 entrylog 的目錄區分開,並且給每個 journal 文件的存儲目錄單獨掛載一塊硬碟(建議使用 ssd 硬碟)。journal 文件只會保存保存幾個,超過配置個數的文件將會被刪除。entry 存儲到 journal 文件完全是隨機的,先到先寫入,journal 文件是為了保證消息不丟失而設計的。
如下圖所示,每個 bookie 收到增加 entry 的請求後,會根據 ledger id 映射到存儲到那個 journal 目錄和 entry log 目錄,entry 數據會存儲在對應的目錄下。目前 bookie 不支持在運行過程中變更存儲目錄(使用過程中,增加或減少目錄會導致部分的數據查找不到)。
如下圖所示,bookie 收到 entry 寫入請求後,寫入 journal 文件的同時,也會保存到 write cache 中,write cache 分為兩部分,一部分是正在寫入的 write cache, 一部分是正在正在刷盤的部分,兩部分交替使用。
write cache 中有索引數據結構,可以通過索引查找到對應的 entry,write cache 中的索引是內存級別的,基於 bookie 自己定義的 ConcurrentLongLongPairHashMap 結構實現。
另外,每個 entorylog 的存儲目錄,會對應一個 類實例對象,而每個 對象裡面會有一個基於 RocksDB 實現的索引結構,通過這個索引可以快速的查到每個 entry 存儲在哪個 entrylog 文件中。每個 write cache 在增加 entry 的時候會進行排序處理,在同一個 write cache,同一個 ledger 下的數據是相鄰有序的,這樣在 write cache 中的數據 flush 到 entrylog 文件時,使得寫入到 entrylog 文件中的數據是局部有序的,這樣的設計能夠極大的提高後續的讀取效率。
中的索引數據也會隨著 entry 的刷盤而刷盤到索引文件中。在 bookie 宕機重啟時,可以通過 journal 文件和 entry log 文件還原數據,保證數據不丟失。
Pulsar consumer 在消費數據的時候,做了多層的緩存加速處理,如下圖所示:
獲取數據的順序如下:
上面每一步,如果能獲取到數據,都會直接返回,跳過後面的步驟。如果是從磁碟文件中獲取的數據,會在返回的時候將數據存儲到 read cache 中,另外如果是讀取磁碟的操作,會多讀取一部分磁碟上的時候,因為存儲的時候有局部有序的處理,獲取相鄰數據的概率非常大,這種處理的話會極大的提高後續獲取數據的效率。
我們在使用的過程中,應盡量避免或減少出現消費過老數據即觸發讀取磁碟文件中的消息的場景,以免對整體系統的性能造成影響。
BookKeeper 中的每個 bookie 都會周期的進行數據清理操作,默認 15 分鍾檢查處理一次,清理的主要流程如下:
通過上面的流程,我們可以了解 bookie 在清理 entrylog 文件時的大體流程。
需要特別說明的是,ledger 是否是可以刪除的,完全是客戶端的觸發的,在 Pulsar 中是 broker 觸發的。
broker 端有周期的處理線程(默認 2 分鍾),清理已經消費過的消息所在的 ledger 機制,獲取 topic 中包含的 cursor 最後確認的消息,將這個 topic 包含的 ledger 列表中,在這個 id 之前的(注意不包含當前的 ledger id)全部刪除(包括 zk 中的元數據,同時通知 bookie 刪除對應的 ledger)。
在運用的過程中我們多次遇到了 bookie 磁碟空間不足的場景,bookie 中存儲了大量的 entry log 文件。比較典型的原因主要有如下兩個。
原因一:
生產消息過於分散,例如,舉個極端的場景,1w 個 topic,每個 topic 生產一條,1w 個 topic 順序生產。這樣每個 topic 對應的 ledger 短時間內不會因為時長或者存儲大小進行切換,active 狀態的 ledger id 分散在大量的 entry log 文件中。這些 entry log 文件是不能刪除或者及時壓縮的。
如果遇到這種場景,可以通過重啟,強制 ledger 進行切換進行處理。當然如果這個時候消費進行沒有跟上,消費的 last ack 位置所在的 ledger 也是處於 active 狀態的,不能進行刪除。
原因二:
GC 時間過程,如果現存的 enrylog 文件比較多,且大量符合 minor 或 major gc 閾值,這樣,單次的 minor gc 或者 major gc 時間過長,在這段時間內是不能清理過期的 entry log 文件。
這是由於單次清理流程的順序執行導致的,只有上次一輪執行完,才會執行下一次。目前,這塊也在提優化流程,避免子流程執行實現過長,對整體產生影響。
C. Zookeeper 在 Kafka 中的作用
如上圖所示,kafaka集群的 broker,和 Consumer 都需要連接 Zookeeper。
Procer 直接連接 Broker。
Procer 把數據上傳到 Broker,Procer可以指定數據有幾個分區、幾個備份。上面的圖中,數據有兩個分區 0、1,每個分區都有自己的副本:0'、 1'。
黃色的分區為 leader,白色的為 follower。
leader 處理 partition 的所有讀寫請求,與此同時,follower會被動定期地去復制leader上的數據。 如下圖所示,紅色的為 leader,綠色的為 follower,leader復制自己到其他 Broker 中:
Topic 分區被放在不同的 Broker 中,保證 Procer 和 Consumer 錯開訪問 Broker,避免訪問單個 Broker造成過度的IO壓力,使得負載均衡。
Broker是分布式部署並且相互之間相互獨立,但是需要有一個注冊系統能夠將整個集群中的Broker管理起來 ,此時就使用到了Zookeeper。在Zookeeper上會有一個專門 用來進行Broker伺服器列表記錄 的節點:
/brokers/ids
每個Broker在啟動時,都會到Zookeeper上進行注冊,即到/brokers/ids下創建屬於自己的節點,如/brokers/ids/[0...N]。
Kafka使用了全局唯一的數字來指代每個Broker伺服器,不同的Broker必須使用不同的Broker ID進行注冊,創建完節點後, 每個Broker就會將自己的IP地址和埠信息記錄 到該節點中去。其中,Broker創建的節點類型是臨時節點,一旦Broker宕機,則對應的臨時節點也會被自動刪除。
在Kafka中,同一個 Topic的消息會被分成多個分區 並將其分布在多個Broker上, 這些分區信息及與Broker的對應關系 也都是由Zookeeper在維護,由專門的節點來記錄,如:
/borkers/topics
Kafka中每個Topic都會以/brokers/topics/[topic]的形式被記錄,如/brokers/topics/login和/brokers/topics/search等。Broker伺服器啟動後,會到對應Topic節點(/brokers/topics)上注冊自己的Broker ID並寫入針對該Topic的分區總數,如/brokers/topics/login/3->2,這個節點表示Broker ID為3的一個Broker伺服器,對於"login"這個Topic的消息,提供了2個分區進行消息存儲,同樣,這個分區節點也是臨時節點。
由於同一個Topic消息會被分區並將其分布在多個Broker上,因此, 生產者需要將消息合理地發送到這些分布式的Broker上 ,那麼如何實現生產者的負載均衡,Kafka支持傳統的四層負載均衡,也支持Zookeeper方式實現負載均衡。
(1) 四層負載均衡,根據生產者的IP地址和埠來為其確定一個相關聯的Broker。通常,一個生產者只會對應單個Broker,然後該生產者產生的消息都發往該Broker。這種方式邏輯簡單,每個生產者不需要同其他系統建立額外的TCP連接,只需要和Broker維護單個TCP連接即可。但是,其無法做到真正的負載均衡,因為實際系統中的每個生產者產生的消息量及每個Broker的消息存儲量都是不一樣的,如果有些生產者產生的消息遠多於其他生產者的話,那麼會導致不同的Broker接收到的消息總數差異巨大,同時,生產者也無法實時感知到Broker的新增和刪除。
(2) 使用Zookeeper進行負載均衡,由於每個Broker啟動時,都會完成Broker注冊過程,生產者會通過該節點的變化來動態地感知到Broker伺服器列表的變更,這樣就可以實現動態的負載均衡機制。
與生產者類似,Kafka中的消費者同樣需要進行負載均衡來實現多個消費者合理地從對應的Broker伺服器上接收消息,每個消費者分組包含若干消費者, 每條消息都只會發送給分組中的一個消費者 ,不同的消費者分組消費自己特定的Topic下面的消息,互不幹擾。
消費組 (Consumer Group):
consumer group 下有多個 Consumer(消費者)。
對於每個消費者組 (Consumer Group),Kafka都會為其分配一個全局唯一的Group ID,Group 內部的所有消費者共享該 ID。訂閱的topic下的每個分區只能分配給某個 group 下的一個consumer(當然該分區還可以被分配給其他group)。
同時,Kafka為每個消費者分配一個Consumer ID,通常採用"Hostname:UUID"形式表示。
在Kafka中,規定了 每個消息分區 只能被同組的一個消費者進行消費 ,因此,需要在 Zookeeper 上記錄 消息分區 與 Consumer 之間的關系,每個消費者一旦確定了對一個消息分區的消費權力,需要將其Consumer ID 寫入到 Zookeeper 對應消息分區的臨時節點上,例如:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
其中,[broker_id-partition_id]就是一個 消息分區 的標識,節點內容就是該 消息分區 上 消費者的Consumer ID。
在消費者對指定消息分區進行消息消費的過程中, 需要定時地將分區消息的消費進度Offset記錄到Zookeeper上 ,以便在該消費者進行重啟或者其他消費者重新接管該消息分區的消息消費後,能夠從之前的進度開始繼續進行消息消費。Offset在Zookeeper中由一個專門節點進行記錄,其節點路徑為:
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
節點內容就是Offset的值。
消費者伺服器在初始化啟動時加入消費者分組的步驟如下
注冊到消費者分組。每個消費者伺服器啟動時,都會到Zookeeper的指定節點下創建一個屬於自己的消費者節點,例如/consumers/[group_id]/ids/[consumer_id],完成節點創建後,消費者就會將自己訂閱的Topic信息寫入該臨時節點。
對 消費者分組 中的 消費者 的變化注冊監聽 。每個 消費者 都需要關注所屬 消費者分組 中其他消費者伺服器的變化情況,即對/consumers/[group_id]/ids節點注冊子節點變化的Watcher監聽,一旦發現消費者新增或減少,就觸發消費者的負載均衡。
對Broker伺服器變化注冊監聽 。消費者需要對/broker/ids/[0-N]中的節點進行監聽,如果發現Broker伺服器列表發生變化,那麼就根據具體情況來決定是否需要進行消費者負載均衡。
進行消費者負載均衡 。為了讓同一個Topic下不同分區的消息盡量均衡地被多個 消費者 消費而進行 消費者 與 消息 分區分配的過程,通常,對於一個消費者分組,如果組內的消費者伺服器發生變更或Broker伺服器發生變更,會發出消費者負載均衡。
以下是kafka在zookeep中的詳細存儲結構圖:
早期版本的 kafka 用 zk 做 meta 信息存儲,consumer 的消費狀態,group 的管理以及 offse t的值。考慮到zk本身的一些因素以及整個架構較大概率存在單點問題,新版本中確實逐漸弱化了zookeeper的作用。新的consumer使用了kafka內部的group coordination協議,也減少了對zookeeper的依賴
D. Kafka核心組件之控制器和協調器
[TOC]
我們已經知道Kafka的集群由n個的broker所組成,每個broker就是一個kafka的實例或者稱之為kafka的服務。其實控制器也是一個broker,控制器也叫leader broker。
他除了具有一般broker的功能外,還負責分區leader的選取,也就是負責選舉partition的leader replica。
kafka每個broker啟動的時候,都會實例化一個KafkaController,並將broker的id注冊到zookeeper,集群在啟動過程中,通過選舉機制選舉出其中一個broker作為leader,也就是前面所說的控制器。
包括集群啟動在內,有三種情況觸發控制器選舉:
1、集群啟動
2、控制器所在代理發生故障
3、zookeeper心跳感知,控制器與自己的session過期
按照慣例,先看圖。我們根據下圖來講解集群啟動時,控制器選舉過程。
假設此集群有三個broker,同時啟動。
(一)3個broker從zookeeper獲取/controller臨時節點信息。/controller存儲的是選舉出來的leader信息。此舉是為了確認是否已經存在leader。
(二)如果還沒有選舉出leader,那麼此節點是不存在的,返回-1。如果返回的不是-1,而是leader的json數據,那麼說明已經有leader存在,選舉結束。
(三)三個broker發現返回-1,了解到目前沒有leader,於是均會觸發向臨時節點/controller寫入自己的信息。最先寫入的就會成為leader。
(四)假設broker 0的速度最快,他先寫入了/controller節點,那麼他就成為了leader。而broker1、broker2很不幸,因為晚了一步,他們在寫/controller的過程中會拋出ZkNodeExistsException,也就是zk告訴他們,此節點已經存在了。
經過以上四步,broker 0成功寫入/controller節點,其它broker寫入失敗了,所以broker 0成功當選leader。
此外zk中還有controller_epoch節點,存儲了leader的變更次數,初始值為0,以後leader每變一次,該值+1。所有向控制器發起的請求,都會攜帶此值。如果控制器和自己內存中比較,請求值小,說明kafka集群已經發生了新的選舉,此請求過期,此請求無效。如果請求值大於控制器內存的值,說明已經有新的控制器當選了,自己已經退位,請求無效。kafka通過controller_epoch保證集群控制器的唯一性及操作的一致性。
由此可見,Kafka控制器選舉就是看誰先爭搶到/controller節點寫入自身信息。
控制器的初始化,其實是初始化控制器所用到的組件及監聽器,准備元數據。
前面提到過每個broker都會實例化並啟動一個KafkaController。KafkaController和他的組件關系,以及各個組件的介紹如下圖:
圖中箭頭為組件層級關系,組件下面還會再初始化其他組件。可見控制器內部還是有些復雜的,主要有以下組件:
1、ControllerContext,此對象存儲了控制器工作需要的所有上下文信息,包括存活的代理、所有主題及分區分配方案、每個分區的AR、leader、ISR等信息。
2、一系列的listener,通過對zookeeper的監聽,觸發相應的操作,黃色的框的均為listener
3、分區和副本狀態機,管理分區和副本。
4、當前代理選舉器ZookeeperLeaderElector,此選舉器有上位和退位的相關回調方法。
5、分區leader選舉器,PartitionLeaderSelector
6、主題刪除管理器,TopicDeletetionManager
7、leader向broker批量通信的ControllerBrokerRequestBatch。緩存狀態機處理後產生的request,然後統一發送出去。
8、控制器平衡操作的KafkaScheler,僅在broker作為leader時有效。
Kafka集群的一些重要信息都記錄在ZK中,比如集群的所有代理節點、主題的所有分區、分區的副本信息(副本集、主副本、同步的副本集)。每個broker都有一個控制器,為了管理整個集群Kafka選利用zk選舉模式,為整個集群選舉一個「中央控制器」或」主控制器「,控制器其實就是一個broker節點,除了一般broker功能外,還具有分區首領選舉功能。中央控制器管理所有節點的信息,並通過向ZK注冊各種監聽事件來管理整個集群節點、分區的leader的選舉、再平衡等問題。外部事件會更新ZK的數據,ZK中的數據一旦發生變化,控制器都要做不同的響應處理。
故障轉移其實就是leader所在broker發生故障,leader轉移為其他的broker。轉移的過程就是重新選舉leader的過程。
重新選舉leader後,需要為該broker注冊相應許可權,調用的是ZookeeperLeaderElector的onControllerFailover()方法。在這個方法中初始化和啟動了一系列的組件來完成leader的各種操作。具體如下,其實和控制器初始化有很大的相似度。
1、注冊分區管理的相關監聽器
2、注冊主題管理的相關監聽
3、注冊代理變化監聽器
4、重新初始化ControllerContext,
5、啟動控制器和其他代理之間通信的ControllerChannelManager
6、創建用於刪除主題的TopicDeletionManager對象,並啟動。
7、啟動分區狀態機和副本狀態機
8、輪詢每個主題,添加監聽分區變化的
9、如果設置了分區平衡定時操作,那麼創建分區平衡的定時任務,默認300秒檢查並執行。
除了這些組件的啟動外,onControllerFailover方法中還做了如下操作:
1、/controller_epoch值+1,並且更新到ControllerContext
2、檢查是否出發分區重分配,並做相關操作
3、檢查需要將優先副本選為leader,並做相關操作
4、向kafka集群所有代理發送更新元數據的請求。
下面來看leader許可權被取消時,調用的方法onControllerResignation
1、該方法中注銷了控制器的許可權。取消在zookeeper中對於分區、副本感知的相應監聽器的監聽。
2、關閉啟動的各個組件
3、最後把ControllerContext中記錄控制器版本的數值清零,並設置當前broker為RunnignAsBroker,變為普通的broker。
通過對控制器啟動過程的學習,我們應該已經對kafka工作的原理有了了解, 核心是監聽zookeeper的相關節點,節點變化時觸發相應的操作 。
有新的broker加入集群時,稱為代理上線。反之,當broker關閉,推出集群時,稱為代理下線。
代理上線:
1、新代理啟動時向/brokers/ids寫數據
2、BrokerChangeListener監聽到變化。對新上線節點調用controllerChannelManager.addBroker(),完成新上線代理網路層初始化
3、調用KafkaController.onBrokerStartup()處理
3.5恢復因新代理上線暫停的刪除主題操作線程
代理下線:
1、查找下線節點集合
2、輪詢下線節點,調用controllerChannelManager.removeBroker(),關閉每個下線節點網路連接。清空下線節點消息隊列,關閉下線節點request請求
3、輪詢下線節點,調用KafkaController.onBrokerFailure處理
4、向集群全部存活代理發送updateMetadataRequest請求
顧名思義,協調器負責協調工作。本節所講的協調器,是用來協調消費者工作分配的。簡單點說,就是消費者啟動後,到可以正常消費前,這個階段的初始化工作。消費者能夠正常運轉起來,全有賴於協調器。
主要的協調器有如下兩個:
1、消費者協調器(ConsumerCoordinator)
2、組協調器(GroupCoordinator)
kafka引入協調器有其歷史過程,原來consumer信息依賴於zookeeper存儲,當代理或消費者發生變化時,引發消費者平衡,此時消費者之間是互不透明的,每個消費者和zookeeper單獨通信,容易造成羊群效應和腦裂問題。
為了解決這些問題,kafka引入了協調器。服務端引入組協調器(GroupCoordinator),消費者端引入消費者協調器(ConsumerCoordinator)。每個broker啟動的時候,都會創建GroupCoordinator實例,管理部分消費組(集群負載均衡)和組下每個消費者消費的偏移量(offset)。每個consumer實例化時,同時實例化一個ConsumerCoordinator對象,負責同一個消費組下各個消費者和服務端組協調器之前的通信。如下圖:
消費者協調器,可以看作是消費者做操作的代理類(其實並不是),消費者很多操作通過消費者協調器進行處理。
消費者協調器主要負責如下工作:
1、更新消費者緩存的MetaData
2、向組協調器申請加入組
3、消費者加入組後的相應處理
4、請求離開消費組
5、向組協調器提交偏移量
6、通過心跳,保持組協調器的連接感知。
7、被組協調器選為leader的消費者的協調器,負責消費者分區分配。分配結果發送給組協調器。
8、非leader的消費者,通過消費者協調器和組協調器同步分配結果。
消費者協調器主要依賴的組件和說明見下圖:
可以看到這些組件和消費者協調器擔負的工作是可以對照上的。
組協調器負責處理消費者協調器發過來的各種請求。它主要提供如下功能:
組協調器在broker啟動的時候實例化,每個組協調器負責一部分消費組的管理。它主要依賴的組件見下圖:
這些組件也是和組協調器的功能能夠對應上的。具體內容不在詳述。
下圖展示了消費者啟動選取leader、入組的過程。
消費者入組的過程,很好的展示了消費者協調器和組協調器之間是如何配合工作的。leader consumer會承擔分區分配的工作,這樣kafka集群的壓力會小很多。同組的consumer通過組協調器保持同步。消費者和分區的對應關系持久化在kafka內部主題。
消費者消費時,會在本地維護消費到的位置(offset),就是偏移量,這樣下次消費才知道從哪裡開始消費。如果整個環境沒有變化,這樣做就足夠了。但一旦消費者平衡操作或者分區變化後,消費者不再對應原來的分區,而每個消費者的offset也沒有同步到伺服器,這樣就無法接著前任的工作繼續進行了。
因此只有把消費偏移量定期發送到伺服器,由GroupCoordinator集中式管理,分區重分配後,各個消費者從GroupCoordinator讀取自己對應分區的offset,在新的分區上繼續前任的工作。
下圖展示了不提交offset到服務端的問題:
開始時,consumer 0消費partition 0 和1,後來由於新的consumer 2入組,分區重新進行了分配。consumer 0不再消費partition2,而由consumer 2來消費partition 2,但由於consumer之間是不能通訊的,所有consumer2並不知道從哪裡開始自己的消費。
因此consumer需要定期提交自己消費的offset到服務端,這樣在重分區操作後,每個consumer都能在服務端查到分配給自己的partition所消費到的offset,繼續消費。
由於kafka有高可用和橫向擴展的特性,當有新的分區出現或者新的消費入組後,需要重新分配消費者對應的分區,所以如果偏移量提交的有問題,會重復消費或者丟消息。偏移量提交的時機和方式要格外注意!!
1、自動提交偏移量
設置 enable.auto.commit為true,設定好周期,默認5s。消費者每次調用輪詢消息的poll() 方法時,會檢查是否超過了5s沒有提交偏移量,如果是,提交上一次輪詢返回的偏移量。
這樣做很方便,但是會帶來重復消費的問題。假如最近一次偏移量提交3s後,觸發了再均衡,伺服器端存儲的還是上次提交的偏移量,那麼再均衡結束後,新的消費者會從最後一次提交的偏移量開始拉取消息,此3s內消費的消息會被重復消費。
2、手動提交偏移量
設置 enable.auto.commit為false。程序中手動調用commitSync()提交偏移量,此時提交的是poll方法返回的最新的偏移量。
commitSync()是同步提交偏移量,主程序會一直阻塞,偏移量提交成功後才往下運行。這樣會限製程序的吞吐量。如果降低提交頻次,又很容易發生重復消費。
這里我們可以使用commitAsync()非同步提交偏移量。只管提交,而不會等待broker返回提交結果
commitSync只要沒有發生不可恢復錯誤,會進行重試,直到成功。而commitAsync不會進行重試,失敗就是失敗了。commitAsync不重試,是因為重試提交時,可能已經有其它更大偏移量已經提交成功了,如果此時重試提交成功,那麼更小的偏移量會覆蓋大的偏移量。那麼如果此時發生再均衡,新的消費者將會重復消費消息。
E. kafka術語和配置介紹
procer 是生產者,負責消息生產,上遊程序中按照標準的消息格式組裝(按照每個消息事件的欄位定義)發送到指定的topic。procer生產消息的時候,不會因為consumer處理能力不夠,而阻塞procer的生產。consumer會從指定的topic 拉取消息,然後處理消費,並提交offset(消息處理偏移量,消費掉的消息並不會主動刪除,而是kafka系統根據保存周期自動消除)。
topic是消費分類存儲的隊列,可以按照消息類型來分topic存儲。
replication是topic復制副本個數,用於解決數據丟失,防止leader topic宕機後,其他副本可以快代替。
broker是緩存代理,Kafka集群中的一台或多台伺服器統稱broker,用來保存procer發送的消息。Broker沒有副本機制,一旦broker宕機,該broker的消息將都不可用。
partition是topic的物理分組,在創建topic的時候,可以指定partition 數量。每個partition是邏輯有序的,保證每個消息都是順序插入的,而且每個消息的offset在不同partition的是唯一不同的
偏移量。kafka為每條在分區的消息保存一個偏移量offset,這也是消費者在分區的位置。比如一個偏移量是5的消費者,表示已經消費了從0-4偏移量的消息,下一個要消費的消息的偏移量是5。每次消息處理完後,要麼主動提交offset,要麼自動提交,把offset偏移到下一位,如處理offset=6消息。在kafka配置中,如果enable_auto_commit=True和auto_commit_interval_ms=xx,那表示每xx 毫秒自動提交偏移量
分組。是指在消費同一topic的不同consumer。每個consumer都有唯一的groupId,同一groupId 屬於同一個group。不同groupId的consumer相互不影響。對於一個topic,同一個group的consumer數量不能超過 partition數量。比如,Topic A 有 16個partition,某一個group下有2個consumer,那2個consumer分別消費8個partition,而這個group的consumer數量最多不能超過16個。
kafka的配置主要分四類,分別是zookeeper、server、consumer、procer。其他的配置可以忽略。
zk的配置比較簡單,也可以默認不改.dataDir是zk存儲節點配置的目錄地址,clientPort是zk啟動的埠,默認2181,maxClientCnxns是限制ip的連接此處,設置0表示無連接次數,一般情況根據業務部署情況,配置合理的值。
F. 如何更改手機微信的消息存儲位置
手機的設置——應用程序那 找到微信後可以選擇放在手機或者內存卡里
也可以用360軟體搬家功能
G. kafka問題求助
Kafka是由LinkedIn設計的一個高吞吐量、分布式、基於發布訂閱模式的消息系統,使用Scala編寫,它以可水平擴展、可靠性、非同步通信和高吞吐率等特性而被廣泛使用。目前越來越多的開源分布式處理系統都支持與Kafka集成,其中Spark Streaming作為後端流引擎配合Kafka作為前端消息系統正成為當前流處理系統的主流架構之一。
然而,當下越來越多的安全漏洞、數據泄露等問題的爆發,安全正成為系統選型不得不考慮的問題,Kafka由於其安全機制的匱乏,也導致其在數據敏感行業的部署存在嚴重的安全隱患。本文將圍繞Kafka,先介紹其整體架構和關鍵概念,再深入分析其架構之中存在的安全問題,最後分享下Transwarp在Kafka安全性上所做的工作及其使用方法。
Kafka架構與安全
首先,我們來了解下有關Kafka的幾個基本概念:
Topic:Kafka把接收的消息按種類劃分,每個種類都稱之為Topic,由唯一的Topic Name標識。
Procer:向Topic發布消息的進程稱為Procer。
Consumer:從Topic訂閱消息的進程稱為Consumer。
Broker:Kafka集群包含一個或多個伺服器,這種伺服器被稱為Broker。
Kafka的整體架構如下圖所示,典型的Kafka集群包含一組發布消息的Procer,一組管理Topic的Broker,和一組訂閱消息的Consumer。Topic可以有多個分區,每個分區只存儲於一個Broker。Procer可以按照一定的策略將消息劃分給指定的分區,如簡單的輪詢各個分區或者按照特定欄位的Hash值指定分區。Broker需要通過ZooKeeper記錄集群的所有Broker、選舉分區的Leader,記錄Consumer的消費消息的偏移量,以及在Consumer Group發生變化時進行relalance. Broker接收和發送消息是被動的:由Procer主動發送消息,Consumer主動拉取消息。
然而,分析Kafka框架,我們會發現以下嚴重的安全問題:
1.網路中的任何一台主機,都可以通過啟動Broker進程而加入Kafka集群,能夠接收Procer的消息,能夠篡改消息並發送給Consumer。
2.網路中的任何一台主機,都可以啟動惡意的Procer/Consumer連接到Broker,發送非法消息或拉取隱私消息數據。
3.Broker不支持連接到啟用Kerberos認證的ZooKeeper集群,沒有對存放在ZooKeeper上的數據設置許可權。任意用戶都能夠直接訪問ZooKeeper集群,對這些數據進行修改或刪除。
4.Kafka中的Topic不支持設置訪問控制列表,任意連接到Kafka集群的Consumer(或Procer)都能對任意Topic讀取(或發送)消息。
隨著Kafka應用場景越來越廣泛,特別是一些數據隱私程度較高的領域(如道路交通的視頻監控),上述安全問題的存在猶如一顆定時炸彈,一旦內網被黑客入侵或者內部出現惡意用戶,所有的隱私數據(如車輛出行記錄)都能夠輕易地被竊取,而無需攻破Broker所在的伺服器。
Kafka安全設計
基於上述分析,Transwarp從以下兩個方面增強Kafka的安全性:
身份認證(Authentication):設計並實現了基於Kerberos和基於IP的兩種身份認證機制。前者為強身份認證,相比於後者具有更好的安全性,後者適用於IP地址可信的網路環境,相比於前者部署更為簡便。
許可權控制(Authorization):設計並實現了Topic級別的許可權模型。Topic的許可權分為READ(從Topic拉取數據)、WRITE(向Topic中生產數據)、CREATE(創建Topic)和DELETE(刪除Topic)。
基於Kerberos的身份機制如下圖所示:
Broker啟動時,需要使用配置文件中的身份和密鑰文件向KDC(Kerberos伺服器)認證,認證通過則加入Kafka集群,否則報錯退出。
Procer(或Consumer)啟動後需要經過如下步驟與Broker建立安全的Socket連接:
1.Procer向KDC認證身份,通過則得到TGT(票證請求票證),否則報錯退出
2.Procer使用TGT向KDC請求Kafka服務,KDC驗證TGT並向Procer返回SessionKey(會話密鑰)和ServiceTicket(服務票證)
3.Procer使用SessionKey和ServiceTicket與Broker建立連接,Broker使用自身的密鑰解密ServiceTicket,獲得與Procer通信的SessionKey,然後使用SessionKey驗證Procer的身份,通過則建立連接,否則拒絕連接。
ZooKeeper需要啟用Kerberos認證模式,保證Broker或Consumer與其的連接是安全的。
Topic的訪問控制列表(ACL)存儲於ZooKeeper中,存儲節點的路徑為/acl/<topic>/<user>,節點數據為R(ead)、W(rite)、C(reate)、D(elete)許可權的集合,如/acl/transaction/jack節點的數據為RW,則表示用戶jack能夠對transaction這個topic進行讀和寫。
另外,kafka為特權用戶,只有kafka用戶能夠賦予/取消許可權。因此,ACL相關的ZooKeeper節點許可權為kafka具有所有許可權,其他用戶不具有任何許可權。
構建安全的Kafka服務
首先,我們為Broker啟用Kerberos認證模式,配置文件為/etc/kafka/conf/server.properties,安全相關的參數如下所示:
其中,authentication參數表示認證模式,可選配置項為simple, kerberos和ipaddress,默認為simple。當認證模式為kerberos時,需要額外配置賬戶屬性principal和對應的密鑰文件路徑keytab.
認證模式為ipaddress時,Procer和Consumer創建時不需要做任何改變。而認證模式為kerberos時,需要預先創建好相應的principal和keytab,並使用API進行登錄,樣例代碼如下所示:
public class SecureProcer extends Thread {
private final kafka.javaapi.procer.Procer<Integer, String> procer;
private final String topic;
private final Properties props = new Properties();
public SecureProcer(String topic) {
AuthenticationManager.setAuthMethod(「kerberos」);
AuthenticationManager.login(「procer1″, 「/etc/procer1.keytab」);
props.put(「serializer.class」, 「kafka.serializer.StringEncoder」);
props.put(「metadata.broker.list」,
「172.16.1.190:9092,172.16.1.192:9092,172.16.1.193:9092″);
// Use random partitioner. Don』t need the key type. Just set it to Integer.
// The message is of type String.
procer = new kafka.javaapi.procer.Procer<Integer, String>(
new ProcerConfig(props));
this.topic = topic;
}
. . .
Topic許可權管理
Topic的許可權管理主要是通過AuthorizationManager這個類來完成的,其類結構如下圖所示:
其中,resetPermission(user, Permissions, topic) 為重置user對topic的許可權。
grant(user, Permissions, topic) 為賦予user對topic許可權。
revoke(user, Permissions, topic) 為取消user對topic許可權。
isPermitted(user, Permissions, topic) 為檢查user對topic是否具有指定許可權。
調用grant或revoke進行許可權設置完成後,需要commit命令提交修改到ZooKeeper
Kerberos模式下,AuthorizationManager需要先使用AuthenticationManager.login方法登錄,與ZooKeeper建立安全的連接,再進行許可權設置。示例代碼如下所示:
public class AuthzTest {
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty(「authentication」, 「kerberos」);
props.setProperty(「zookeeper.connect」, 「172.16.2.116:2181,172.16.2.117:2181,172.16.2.118:2181″);
props.setProperty(「principal」, 「kafka/host1@TDH」);
props.setProperty(「keytab」, 「/usr/lib/kafka/config/kafka.keytab」);
ZKConfig config = new ZKConfig(props);
AuthenticationManager.setAuthMethod(config.authentication());
AuthenticationManager.login(config.principal(), config.keytab());
AuthorizationManager authzManager = new AuthorizationManager(config);
// reset permission READ and WRITE to ip 172.16.1.87 on topic test
authzManager.resetPermission(「172.16.1.87″,
new Permissions(Permissions.READ, Permissions.WRITE), 「test」);
// grant permission WRITE to ip 172.16.1.87 on topic test
authzManager.grant(「172.16.1.87″, new Permissions(Permissions.CREATE), 「test」);
// revoke permission READ from ip 172.16.1.87 on topic test
authzManager.revoke(「172.16.1.87″, new Permissions(Permissions.READ), 「test」);
// commit the permission settings
authzManager.commit();
authzManager.close();
}
}
ipaddress認證模式下,取消和賦予許可權的操作如下所示:
public class AuthzTest {
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty(「authentication」, 「ipaddress」);
props.setProperty(「zookeeper.connect」,
「172.16.1.87:2181,172.16.1.88:2181,172.16.1.89:2181″);
ZKConfig config = new ZKConfig(props);
// new authorization manager
AuthorizationManager authzManager = new AuthorizationManager(config);
// reset permission READ and WRITE to ip 172.16.1.87 on topic test
authzManager.resetPermission(「172.16.1.87″,
new Permissions(Permissions.READ, Permissions.WRITE), 「test」);
// grant permission WRITE to ip 172.16.1.87 on topic test
authzManager.grant(「172.16.1.87″, new Permissions(Permissions.CREATE), 「test」);
// revoke permission READ from ip 172.16.1.87 on topic test
authzManager.revoke(「172.16.1.87″, new Permissions(Permissions.READ), 「test」);
// commit the permission settings
authzManager.commit();
authzManager.close();
}
}
總結與展望
本文通過介紹Kafka現有架構,深入挖掘其中存在的安全問題,並給出Transwarp在Kafka安全上所做的工作及其使用方式。然而,縱觀Hadoop & Spark生態系統,安全功能還存在很多問題,各組件的許可權系統獨立混亂,缺少集中易用的賬戶管理系統。某些組件的許可權管理還很不成熟,如Spark的調度器缺少用戶的概念,不能限制具體用戶使用資源的多少。Transwarp基於開源版本,在安全方面已有相當多的積累,並持續改進開發,致力於為企業用戶提供一個易用、高效、安全和穩定的基礎數據平台。
H. kafka offset的存儲
offset即消費消息的偏移值,記錄了kafka每個consumergroup的下一個需要讀取消費位置,保障其消息的消費可靠性。
kafka0.8.1.1以前,offset保存在zk中,存放在/consumers節點下。但是由於頻繁訪問zk,zk需要一個一個節點更新offset,不能批量或分組更新,導致offset更新成了瓶頸。後續兩個過渡版本增加了參數「offsets.storage」,該參數可配置為「zookeeper」或「kafka」分別表示offset的保持位置在zk或是broker,默認保留在zk,0.9版本以後offset就默認保存在broker下。若配置的「kafka」,當設置了「al.commit.enabled」參數時,offset仍然可以提交到zk。
zk中保存offset結構為:
注意:由於kafka對客戶端client向下兼容,低版本的client仍然能夠通過鏈接zk消費數據,並提交offset數據,即使broker版本高於0.9,提交的offset仍然保存在zk;此時仍然存在offset更新瓶頸問題,所以建議盡量使用高版本client,通過鏈接broker方式消費數據。
例如:kafka broker版本2.6.0,consumer版本0.8.2.1:
構建consumer:
啟動消費者消費全部10條歷史消息,查看zk下/consumer節點的消費者信息:
可以看到group 「test_group1」對topic 「test1」的3個partition消費情況,offset分別為6,2,2。
這里kafka只記錄了每個group的消費情況,沒有對某一個consumer做單獨記錄。早期版本/ids節點記錄consumer id信息,owner節點記錄各個partition所屬consumer信息
如上所述,新版本中offset由broker維護,offset信息由一個特殊的topic 「 __consumer_offsets」來保存,offset以消息形式發送到該topic並保存在broker中。這樣consumer提交offset時,只需連接到broker,不用訪問zk,避免了zk節點更新瓶頸。
broker消息保存目錄在配置文件server.properties中:
該目錄下默認包含50個以__consumer_offsets開頭的目錄,用於存放offset:
offset的存放位置決定於groupid的hash值,其獲取方式:
其中numPartitions由offsets.topic.num.partitions參數決定,默認值即50。以groupid 「test-group」為例,計數其存儲位置為:__consumer_offsets-12,當其消費全部10條數據後,使用命令查看該目錄下消息記錄:kafka-console-consumer --bootstrap-server localhost:9092 --topic __consumer_offsets --partition 12 --from-beginning --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'
該數據結構為以groupid-topic-partition作為key,value為OffsetAndMetadata,其中包含了offset信息。可以看到group「test-group」在topic「test1」的三個partition下offset值分別為6,2,2。同保存在zk數據一樣,offset只記錄groupid的消費情況,對於具體consumer是透明的。
那麼offset具體被發送給哪個broker保存呢?
由上文可知,offset的存儲分區是通過groupid的hash值取得的,那麼offset發送的broker就是該分區的leader broker,這也符合kafka普通消息的發生邏輯。所以,每個group的offset都將發生到一個broker,broker中存在一個offset manager 實例負責接收處理offset提交請求,並返回提交操作結果。
參考:
https://wanwenli.com/kafka/2016/11/04/Kafka-Group-Coordinator.html
https://cwiki.apache.org/confluence/display/KAFKA/Offset+Management
I. qq消息記錄更改了存儲位置怎麼辦
1、 在想要存儲的區域新建文件夾,如: E:\099 Chat Data\Tencent Files\。
在這里插入圖片描述
2、 打開電腦QQ,設置——文件管理。
在這里插入圖片描述
3、 點擊瀏覽,選擇099 Chat Data下的Tencent Files,注意這里與微信的選擇不同,微信是選擇WeChat Files上一級文件夾後自動生成WeChat Files文件夾,而QQ是預先手動新建Tencent Files文件夾。
J. 如何解決doc文檔存儲後內容位置發生變化
給你發了消息,請查收一下