當前位置:首頁 » 數據倉庫 » 清空kafka資料庫
擴展閱讀
webinf下怎麼引入js 2023-08-31 21:54:13
堡壘機怎麼打開web 2023-08-31 21:54:11

清空kafka資料庫

發布時間: 2023-05-08 03:25:11

1. 軟體開發中的Kafka和資料庫的關系是什麼呢

首先明確說明Kafka不是資料庫,它沒有schema,也沒有表,更沒有索引。

1.它僅僅是生產消息流、消費消息流而已。從這個角度來說Kafka的確不像資料庫,至少不像我們熟知的關系型資料庫。

那麼到底什麼是資料庫呢?或者說什麼特性使得一個系統可以被稱為資料庫?經典的教科書是這么說的:資料庫是提供 ACID 特性的,我們依次討論下ACID。
1、持久性(rability)
我們先從最容易的持久性開始說起,因為持久性最容易理解。在80年代持久性指的是把數據寫入到磁帶中,這是一種很古老的存儲設備,現在應該已經絕跡了。目前實現持久性更常見的做法是將數據寫入到物理磁碟上,而這也只能實現單機的持久性。當演進到分布式系統時代後,持久性指的是將數據通過備份機制拷貝到多台機器的磁碟上。很多資料庫廠商都有自己的分布式系統解決方案,如GreenPlum和Oracle RAC。它們都提供了這種多機備份的持久性。和它們類似,Apache Kafka天然也是支持這種持久性的,它提供的副本機制在實現原理上幾乎和資料庫廠商的方案是一樣的。
2、原子性(atomicity)
資料庫中的原子性和多線程領域內的原子性不是一回事。我們知道在Java中有AtomicInteger這樣的類能夠提供線程安全的整數操作服務,這里的atomicity關心的是在多個線程並發的情況下如何保證正確性的問題。而在資料庫領域,原子性關心的是如何應對錯誤或異常情況,特別是對於事務的處理。如果服務發生故障,之前提交的事務要保證已經持久化,而當前運行的事務要終止(abort),它執行的所有操作都要回滾,最終的狀態就好像該事務從未運行過那樣。舉個實際的例子,
第三個方法是採用基於日誌結構的消息隊列來實現,比如使用Kafka來做,如下圖所示:
在這個架構中app僅僅是向Kafka寫入消息,而下面的資料庫、cache和index作為獨立的consumer消費這個日誌——Kafka分區的順序性保證了app端更新操作的順序性。如果某個consumer消費速度慢於其他consumer也沒關系,畢竟消息依然在Kafka中保存著。總而言之,有了Kafka所有的異質系統都能以相同的順序應用app端的更新操作,

3、隔離性(isolation)
在傳統的關系型資料庫中最強的隔離級別通常是指serializability,國內一般翻譯成可串列化或串列化。表達的思想就是連接資料庫的每個客戶端在執行各自的事務時資料庫會給它們一個假象:彷彿每個客戶端的事務都順序執行的,即執行完一個事務之後再開始執行下一個事務。其實資料庫端同時會處理多個事務,但serializability保證了它們就像單獨執行一樣。舉個例子,在一個論壇系統中,每個新用戶都需要注冊一個唯一的用戶名。一個簡單的app實現邏輯大概是這樣的:
4、一致性(consistency)
最後說說一致性。按照Kelppmann大神的原話,這是一個很奇怪的屬性:在所有ACID特性中,其他三項特性的確屬於資料庫層面需要實現或保證的,但只有一致性是由用戶來保證的。嚴格來說,它不屬於資料庫的特性,而應該屬於使用資料庫的一種方式。坦率說第一次聽到這句話時我本人還是有點震驚的,因為從沒有往這個方面考慮過,但仔細想想還真是這么回事。比如剛才的注冊用戶名的例子中我們要求每個用戶名是唯一的。這種一致性約束是由我們用戶做出的,而不是資料庫本身。資料庫本身並不關心或並不知道用戶名是否應該是唯一的。針對Kafka而言,這種一致性又意味著什麼呢?Kelppmann沒有具體展開,

希望能幫到你,謝謝!

2. 「根本就不需要 Kafka 這樣的大型分布式系統!」

作者 | Normcore Tech

譯者 | 彎月,責編 | 屠敏

出品 | CSDN(ID:CSDNnews)

以下為譯文:

可能有人沒有聽說過Kafka,這是一個非常復雜的分布式軟體,可協調多台計算機之間的數據傳輸。更具體地說,該軟體的功能是「展平」數據,然後快速地將數據從一個地方移動到另一個地方。一般來講,如果你有很多數據需要快速處理並發送到其他地方,那麼就可以考慮一下Kafka。Kafka還可以在一定期限內保留數據,比如設置數據保存2天、3天或7天,如果你的下游流程失敗,那麼你還可以利用存儲在Kafka中的數據重新處理。

許多處理匯總數據的公司(比如Facebook和Twitter等社交網路數據,以及每晚需要處理大量星體運動的天文學家,或需要快速了解車輛周圍環境數據的自動駕駛車輛公司等)都在使用Kafka,將任意地方生產的數據(即用戶通過鍵盤輸入的數據,通過望遠鏡讀取的數據,通過車輛遙測讀取的數據等)移動至下游流程進行處理和分析。

最近,WeWork更為名The We Company,他們在共享工作間領域取得了成功,其官網宣稱公司的使命為:

「提升世界的意識。」其核心業務是從房地產出租公司那裡租下辦公室,然後轉租給無法按照傳統流程租賃辦公室的個人和小公司。

為了「提升世界的意識」,該公司致力於為世界各地的個人和公司的團隊打造獨特卻又不完全相同的辦公空間。最近,該公司還開始涉足教育。

最近,因為上市,WeWork曝光了一些財務信息:

從好的方面來看,根據A xi os的數據,2018年WeWork的入住率為90%,且會員總數在不斷增加。

有人常常將WeWork視為矽谷地區的公司過高估值的完美例子。作為一家房地產企業,WeWork燒錢的速度非常快,毫無疑問他們必須努力讓公眾市場投資者相信公司有長遠的發展,同時和還要維護其作為 科技 公司的地位。

這家公司再三強調說它不是一家房地產公司(畢竟它在不斷燒錢對吧?),那麼一家消息中介技術公司究竟能提供什麼?WeWork曾宣布,它使用Kafka來實現「內部部署的物聯網需求」。這是什麼意思?

「我們的產品是物理空間,」WeWork的首席開發負責人David Fano說,他在會議期間穿著一件印有「bldgs = data」字樣的T恤。

每個辦公室都有10個環境感測器——小巧的壁掛式綠色盒子,這些感測器可跟蹤室內溫度、濕度、空氣質量、氣壓和環境光線水平。還有20個白色的壁掛式信標,呈三角形分布在公共空間(開放式辦公區和會議室),用於測量WeWork成員的室內位置(數據是匿名的)。頂部四分之一的感測器通過計算機視覺觀察成員的活動。

換句話說,WeWork會跟蹤WeWork的多個物理事件並記錄所有這些數據。但是......他們真的有必要這樣做嗎?記錄Keith Harring壁畫周圍開放區域的環境溫度能給他們帶來怎樣的競爭優勢?更重要的是,他們能否將這些信息用到重要的項目中?

對於公司而言,重要的是要了解辦公室的「單位組合」 ——私人辦公室、會議空間和開放式辦公桌——的比例,我們可以利用這些信息對下一個辦公間作出調整。

我覺得這家新聞報道機構需要建立一種思考技術的心理模型。Ben Thompson為Stratechery提供了出色的服務,他建立了聚合理論(https://stratechery .com /concepts/),我在努力為這些理論建立一個網站,如果必須從中選擇一個的話,那便是:

大多數創業公司(以及大公司)現有的技術棧都沒有必要。

在此,我想挑戰一下那些自認為可以在一個周末期間獨自建立Facebook的Hacker News上的開發人員,我認為WeWork的實際業務和架構問題在於:

WeWork需要的只不過是清點進出的人數,然後對容量規劃做優化而已,追蹤「氣壓」有什麼用?只要你有WeWork的ID,那你肯定是個人或公司。那麼,在大堂里安裝一個登記系統,並要求會議系統發放名牌,不是更簡單嗎?

第一項要求根本就不需要Kafka:

目前WeWork有280個辦公間。假設每個辦公間平均每天有1000個(有這么多嗎?)成員出入。那麼每天會產生280,000個事務。我們假設每個人在早餐時間進來一次,在午餐時間出入各一次,然後離開。那麼每個人會產生4個事務。那麼每天大約是100萬個事務,這點數據量存儲在最常用的開源關系資料庫Postgres中就可以了。保守地說,Postgres每秒可以提供10,000次寫入(如果設置得當,其寫入次數會更高)。每天100萬個事件,也就是每秒11次。根本就不是問題。

至於第二項要求,受預訂會議室人數的影響,產生的數據量可能更高,但你不需要實時傳輸數據。你完全可以等到一天結束時批量處理或收集,這同樣可以利用司空見慣的關系資料庫。

與大型Postgres(或者是BigQuery,或選擇其他關系資料庫連接到接收JSON感測器數據的Web服務)相比,Kafka的日常開銷要高出很多,因為分布式系統非常非常復雜,比傳統的系統復雜得多。

Kafka是一個非常優秀的強大的工具,但各個公司在採用該軟體時,需要三思而後行。殺雞焉用牛刀,WeWork用Kafka來記錄開放辦公間的氣壓,實屬大材小用。

雖然很多時候我們都不需要Kafka,但開發人員很喜歡推薦這個工具,因為他們可以借機積攢經驗和談資。開發人員喜歡用最尖端的技術來完成工作,有時甚至他們自己都沒意識到這一點。

過度架構真實存在。 Nemil在一篇文章中說:

在職業生涯的早期,你遇到的大量設計不良的軟體系統都要歸咎於那些傳播錯誤觀點的工程媒體。

在大學和培訓班中,你對工程的了解主要來自工程媒體,例如 Hacker News、聚會、會議、Free Code Camp和Hacker Noon等。這些網站廣泛討論的技術(比如微服務、前端框架或區塊鏈)自然會現在你的技術棧中,雖然不是很必要。

使用這些技術棧會導致各個公司承擔不必要的債務,導致他們不得不在風險投資周期中尋求更多的資金,無法邁向精益或從別人的資金中解脫出來。

這種不幸的趨勢只會持續下去,我們唯一能做的就是公之於眾。

原文:https://vicki.substack .com /p/you-dont-need-kafka

【END】

3. Kafka核心組件之控制器和協調器

[TOC]

我們已經知道Kafka的集群由n個的broker所組成,每個broker就是一個kafka的實例或者稱之為kafka的服務。其實控制器也是一個broker,控制器也叫leader broker。

他除了具有一般broker的功能外,還負責分區leader的選取,也就是負責選舉partition的leader replica。

kafka每個broker啟動的時候,都會實例化一個KafkaController,並將broker的id注冊到zookeeper,集群在啟動過程中,通過選舉機制選舉出其中一個broker作為leader,也就是前面所說的控制器。

包括集群啟動在內,有三種情況觸發控制器選舉:

1、集群啟動

2、控制器所在代理發生故障

3、zookeeper心跳感知,控制器與自己的session過期

按照慣例,先看圖。我們根據下圖來講解集群啟動時,控制器選舉過程。

假設此集群有三個broker,同時啟動。

(一)3個broker從zookeeper獲取/controller臨時節點信息。/controller存儲的是選舉出來的leader信息。此舉是為了確認是否已經存在leader。

(二)如果還沒有選舉出leader,那麼此節點是不存在的,返回-1。如果返回的不是-1,而是leader的json數據,那麼說明已經有leader存在,選舉結束。

(三)三個broker發現返回-1,了解到目前沒有leader,於是均會觸發向臨時節點/controller寫入自己的信息。最先寫入的就會成為leader。

(四)假設broker 0的速度最快,他先寫入了/controller節點,那麼他就成為了leader。而broker1、broker2很不幸,因為晚了一步,他們在寫/controller的過程中會拋出ZkNodeExistsException,也就是zk告訴他們,此節點已經存在了。

經過以上四步,broker 0成功寫入/controller節點,其它broker寫入失敗了,所以broker 0成功當選leader。

此外zk中還有controller_epoch節點,存儲了leader的變更次數,初始值為0,以後leader每變一次,該值+1。所有向控制器發起的請求,都會攜帶此值。如果控制器和自己內存中比較,請求值小,說明kafka集群已經發生了新的選舉,此請求過期,此請求無效。如果請求值大於控制器內存的值,說明已經有新的控制器當選了,自己已經退位,請求無效。kafka通過controller_epoch保證集群控制器的唯一性及操作的一致性。

由此可見,Kafka控制器選舉就是看誰先爭搶到/controller節點寫入自身信息。

控制器的初始化,其實是初始化控制器所用到的組件及監聽器,准備元數據。

前面提到過每個broker都會實例化並啟動一個KafkaController。KafkaController和他的組件關系,以及各個組件的介紹如下圖:

圖中箭頭為組件層級關系,組件下面還會再初始化其他組件。可見控制器內部還是有些復雜的,主要有以下組件:

1、ControllerContext,此對象存儲了控制器工作需要的所有上下文信息,包括存活的代理、所有主題及分區分配方案、每個分區的AR、leader、ISR等信息。

2、一系列的listener,通過對zookeeper的監聽,觸發相應的操作,黃色的框的均為listener

3、分區和副本狀態機,管理分區和副本。

4、當前代理選舉器ZookeeperLeaderElector,此選舉器有上位和退位的相關回調方法。

5、分區leader選舉器,PartitionLeaderSelector

6、主題刪除管理器,TopicDeletetionManager

7、leader向broker批量通信的ControllerBrokerRequestBatch。緩存狀態機處理後產生的request,然後統一發送出去。

8、控制器平衡操作的KafkaScheler,僅在broker作為leader時有效。

Kafka集群的一些重要信息都記錄在ZK中,比如集群的所有代理節點、主題的所有分區、分區的副本信息(副本集、主副本、同步的副本集)。每個broker都有一個控制器,為了管理整個集群Kafka選利用zk選舉模式,為整個集群選舉一個「中央控制器」或」主控制器「,控制器其實就是一個broker節點,除了一般broker功能外,還具有分區首領選舉功能。中央控制器管理所有節點的信息,並通過向ZK注冊各種監聽事件來管理整個集群節點、分區的leader的選舉、再平衡等問題。外部事件會更新ZK的數據,ZK中的數據一旦發生變化,控制器都要做不同的響應處理。

故障轉移其實就是leader所在broker發生故障,leader轉移為其他的broker。轉移的過程就是重新選舉leader的過程。

重新選舉leader後,需要為該broker注冊相應許可權,調用的是ZookeeperLeaderElector的onControllerFailover()方法。在這個方法中初始化和啟動了一系列的組件來完成leader的各種操作。具體如下,其實和控制器初始化有很大的相似度。

1、注冊分區管理的相關監聽器

2、注冊主題管理的相關監聽

3、注冊代理變化監聽器

4、重新初始化ControllerContext,

5、啟動控制器和其他代理之間通信的ControllerChannelManager

6、創建用於刪除主題的TopicDeletionManager對象,並啟動。

7、啟動分區狀態機和副本狀態機

8、輪詢每個主題,添加監聽分區變化的

9、如果設置了分區平衡定時操作,那麼創建分區平衡的定時任務,默認300秒檢查並執行。

除了這些組件的啟動外,onControllerFailover方法中還做了如下操作:

1、/controller_epoch值+1,並且更新到ControllerContext

2、檢查是否出發分區重分配,並做相關操作

3、檢查需要將優先副本選為leader,並做相關操作

4、向kafka集群所有代理發送更新元數據的請求。

下面來看leader許可權被取消時,調用的方法onControllerResignation

1、該方法中注銷了控制器的許可權。取消在zookeeper中對於分區、副本感知的相應監聽器的監聽。

2、關閉啟動的各個組件

3、最後把ControllerContext中記錄控制器版本的數值清零,並設置當前broker為RunnignAsBroker,變為普通的broker。

通過對控制器啟動過程的學習,我們應該已經對kafka工作的原理有了了解, 核心是監聽zookeeper的相關節點,節點變化時觸發相應的操作

有新的broker加入集群時,稱為代理上線。反之,當broker關閉,推出集群時,稱為代理下線。

代理上線:

1、新代理啟動時向/brokers/ids寫數據

2、BrokerChangeListener監聽到變化。對新上線節點調用controllerChannelManager.addBroker(),完成新上線代理網路層初始化

3、調用KafkaController.onBrokerStartup()處理

3.5恢復因新代理上線暫停的刪除主題操作線程

代理下線:

1、查找下線節點集合

2、輪詢下線節點,調用controllerChannelManager.removeBroker(),關閉每個下線節點網路連接。清空下線節點消息隊列,關閉下線節點request請求

3、輪詢下線節點,調用KafkaController.onBrokerFailure處理

4、向集群全部存活代理發送updateMetadataRequest請求

顧名思義,協調器負責協調工作。本節所講的協調器,是用來協調消費者工作分配的。簡單點說,就是消費者啟動後,到可以正常消費前,這個階段的初始化工作。消費者能夠正常運轉起來,全有賴於協調器。

主要的協調器有如下兩個:

1、消費者協調器(ConsumerCoordinator)

2、組協調器(GroupCoordinator)

kafka引入協調器有其歷史過程,原來consumer信息依賴於zookeeper存儲,當代理或消費者發生變化時,引發消費者平衡,此時消費者之間是互不透明的,每個消費者和zookeeper單獨通信,容易造成羊群效應和腦裂問題。

為了解決這些問題,kafka引入了協調器。服務端引入組協調器(GroupCoordinator),消費者端引入消費者協調器(ConsumerCoordinator)。每個broker啟動的時候,都會創建GroupCoordinator實例,管理部分消費組(集群負載均衡)和組下每個消費者消費的偏移量(offset)。每個consumer實例化時,同時實例化一個ConsumerCoordinator對象,負責同一個消費組下各個消費者和服務端組協調器之前的通信。如下圖:

消費者協調器,可以看作是消費者做操作的代理類(其實並不是),消費者很多操作通過消費者協調器進行處理。

消費者協調器主要負責如下工作:

1、更新消費者緩存的MetaData

2、向組協調器申請加入組

3、消費者加入組後的相應處理

4、請求離開消費組

5、向組協調器提交偏移量

6、通過心跳,保持組協調器的連接感知。

7、被組協調器選為leader的消費者的協調器,負責消費者分區分配。分配結果發送給組協調器。

8、非leader的消費者,通過消費者協調器和組協調器同步分配結果。

消費者協調器主要依賴的組件和說明見下圖:

可以看到這些組件和消費者協調器擔負的工作是可以對照上的。

組協調器負責處理消費者協調器發過來的各種請求。它主要提供如下功能:

組協調器在broker啟動的時候實例化,每個組協調器負責一部分消費組的管理。它主要依賴的組件見下圖:

這些組件也是和組協調器的功能能夠對應上的。具體內容不在詳述。

下圖展示了消費者啟動選取leader、入組的過程。

消費者入組的過程,很好的展示了消費者協調器和組協調器之間是如何配合工作的。leader consumer會承擔分區分配的工作,這樣kafka集群的壓力會小很多。同組的consumer通過組協調器保持同步。消費者和分區的對應關系持久化在kafka內部主題。

消費者消費時,會在本地維護消費到的位置(offset),就是偏移量,這樣下次消費才知道從哪裡開始消費。如果整個環境沒有變化,這樣做就足夠了。但一旦消費者平衡操作或者分區變化後,消費者不再對應原來的分區,而每個消費者的offset也沒有同步到伺服器,這樣就無法接著前任的工作繼續進行了。

因此只有把消費偏移量定期發送到伺服器,由GroupCoordinator集中式管理,分區重分配後,各個消費者從GroupCoordinator讀取自己對應分區的offset,在新的分區上繼續前任的工作。

下圖展示了不提交offset到服務端的問題:

開始時,consumer 0消費partition 0 和1,後來由於新的consumer 2入組,分區重新進行了分配。consumer 0不再消費partition2,而由consumer 2來消費partition 2,但由於consumer之間是不能通訊的,所有consumer2並不知道從哪裡開始自己的消費。

因此consumer需要定期提交自己消費的offset到服務端,這樣在重分區操作後,每個consumer都能在服務端查到分配給自己的partition所消費到的offset,繼續消費。

由於kafka有高可用和橫向擴展的特性,當有新的分區出現或者新的消費入組後,需要重新分配消費者對應的分區,所以如果偏移量提交的有問題,會重復消費或者丟消息。偏移量提交的時機和方式要格外注意!!

1、自動提交偏移量

設置 enable.auto.commit為true,設定好周期,默認5s。消費者每次調用輪詢消息的poll() 方法時,會檢查是否超過了5s沒有提交偏移量,如果是,提交上一次輪詢返回的偏移量。

這樣做很方便,但是會帶來重復消費的問題。假如最近一次偏移量提交3s後,觸發了再均衡,伺服器端存儲的還是上次提交的偏移量,那麼再均衡結束後,新的消費者會從最後一次提交的偏移量開始拉取消息,此3s內消費的消息會被重復消費。

2、手動提交偏移量

設置 enable.auto.commit為false。程序中手動調用commitSync()提交偏移量,此時提交的是poll方法返回的最新的偏移量。

commitSync()是同步提交偏移量,主程序會一直阻塞,偏移量提交成功後才往下運行。這樣會限製程序的吞吐量。如果降低提交頻次,又很容易發生重復消費。

這里我們可以使用commitAsync()非同步提交偏移量。只管提交,而不會等待broker返回提交結果

commitSync只要沒有發生不可恢復錯誤,會進行重試,直到成功。而commitAsync不會進行重試,失敗就是失敗了。commitAsync不重試,是因為重試提交時,可能已經有其它更大偏移量已經提交成功了,如果此時重試提交成功,那麼更小的偏移量會覆蓋大的偏移量。那麼如果此時發生再均衡,新的消費者將會重復消費消息。