① java工程kafka傳遞自定義對象,消費端獲取到的是null
3. 啟服務
3.1 啟zookeeper
啟zk兩種式第種使用kafka自帶zk
bin/zookeeper-server-start.sh config/zookeeper.properties&
另種使用其zookeeper位於本機位於其址種情況需要修改config面sercer.properties面zookeeper址
例zookeeper.connect=10.202.4.179:2181
3.2 啟 kafka
bin/kafka-server-start.sh config/server.properties
4.創建topic
bin/kafka-topics.sh --create --zookeeper 10.202.4.179:2181 --replication-factor 1 --partitions 1 --topic test
創建名testtopic副本區
通list命令查看剛剛創建topic
bin/kafka-topics.sh -list -zookeeper 10.202.4.179:2181
5.啟procer並發送消息啟procer
bin/kafka-console-procer.sh --broker-list localhost:9092 --topic test
啟發送消息
比
test
hello boy
按Ctrl+C退發送消息
6.啟consumer
bin/kafka-console-consumer.sh --zookeeper 10.202.4.179:2181 --topic test --from-beginning
啟consumerconsole看procer發送消息
啟兩終端發送消息接受消息
都行查看zookeeper進程kafkatopic步步排查原吧
② kafka消費者java版本讀取不到消息怎麼辦
3. 啟動服務
3.1 啟動zookeeper
啟動zk有兩種方式,第一種是使用kafka自己帶的一個zk。
bin/zookeeper-server-start.sh config/zookeeper.properties&
另一種是使用其它的zookeeper,可以位於本機也可以位於其它地址。這種情況需要修改config下面的sercer.properties裡面的zookeeper地址
。例如zookeeper.connect=10.202.4.179:2181
3.2 啟動 kafka
bin/kafka-server-start.sh config/server.properties
4.創建topic
bin/kafka-topics.sh --create --zookeeper 10.202.4.179:2181 --replication-factor 1 --partitions 1 --topic test
創建一個名為test的topic,只有一個副本,一個分區。
通過list命令查看剛剛創建的topic
bin/kafka-topics.sh -list -zookeeper 10.202.4.179:2181
5.啟動procer並發送消息啟動procer
bin/kafka-console-procer.sh --broker-list localhost:9092 --topic test
啟動之後就可以發送消息了
比如
test
hello boy
按Ctrl+C退出發送消息
6.啟動consumer
bin/kafka-console-consumer.sh --zookeeper 10.202.4.179:2181 --topic test --from-beginning
啟動consumer之後就可以在console中看到procer發送的消息了
可以開啟兩個終端,一個發送消息,一個接受消息。
如果這樣都不行的話,查看zookeeper進程和kafka的topic,一步步排查原因吧。
③ 怎麼配置sparkstreaming 讓他解析kafka中的日誌
1、KafkaUtils.createDstream構造函數KafkaUtils.createDstream(ssc, [zk], [consumer group id], [per-topic,partitions] ) 使用receivers接收數據利用Kafka高層消費者api於所receivers接收數據保存spark executors通Spark Streaming啟job處理些數據默認丟失啟用WAL志該志存儲HDFS A、創建receiverkafka進行定拉取數據sscrdd區kafkatopic區概念故增加特定主體區數僅僅增加receiver消費topic線程數並增加spark並行處理數據數量 B、於同grouptopic使用receivers創建同DStream C、啟用WAL需要設置存儲級別即KafkaUtils.createStream(….,StorageLevel.MEMORY_AND_DISK_SER)2.KafkaUtils.createDirectStream區別Receiver接收數據種式定期kafkatopic+partition查詢新偏移量再根據偏移量范圍每batch面處理數據使用kafka簡單消費者api 優點: A、 簡化並行需要kafka輸入流該創建kafka區rdd數且kafka並行讀取B、高效種式並需要WALWAL模式需要數據復制兩第kafka復制另寫wal C、恰語義(Exactly-once-semantics)傳統讀取kafka數據通kafka高層api偏移量寫入zookeeper存數據丟失能性zookeeperssc偏移量致EOS通實現kafka低層api偏移量僅僅ssc保存checkpoint消除zkssc偏移量致問題缺點使用基於zookeeperkafka監控工具怎麼配置sparkstreaming 讓他解析kafka中的日誌
④ apache kafka有哪幾個部分組成
Kafka是一個分布式發布-訂閱publish-subscribe消息傳遞系統,設計目標是快速、可伸縮和耐用冗餘。
它是一個在一個較高的抽象水平上描述的但是又非常簡單的系統,雖然當你更深地挖掘時會令人難以置信的技術細節。卡夫卡文檔出色的解釋了系統中許多設計和實現的微妙之處,所以我們不會在這里試圖解釋他們了。
像許多發布-訂閱消息傳遞系統,卡夫卡能保存源消息的數據。生產者將輸入寫入到主題topic,而消費者則從主題topic中讀取寫入數據。因為卡夫卡是一個分布式系統,所以topic主題會實現跨多個節點分區和復制。
消息是簡單byte數組,開發者能夠使用它們存在對象的任何格式,如String或JSON和Avro,每個消息能夠有一個Key,因此,生產者保證擁有相同key的消息到達同一個分區,而對主題topic的消費,可以使用多個消費者來配置消費組,每個消費組中消費者能夠從他訂閱的那個主題Topic所在分區子集中讀取消息,這樣每個消息發送給消費組中一個消費者,使用相同key的所有消息能夠到達相同的消費者。
Kafka的特點是它將每個主題topic分區都看成一個日誌log(一個有順序的消息集合),一個分區中的每個消息被分配一個唯一的偏移量offset.
Kafka並不試圖跟蹤哪個消息被哪個消費者讀取,而是只是保留未被讀取的消息。
Kafka保留所有消息的時間,而消費者負責跟蹤它們在每個日誌(日誌是一個消息序列集合代表一個topic分區)中的位置,
因此Kafka能夠支持大量的消費者,使用最小的代價保留大量的數據。
Kafka如何工作?
假設我們正在開發一個大型多人在線游戲。在這些游戲中,玩家在一個虛擬的世界中互相合作和競爭。通常玩家互相發生貿易,比如交換物品和金錢,所以游戲開發者重要的是要確保球員不會欺騙,在下面兩種情況下的交易將標記為特殊:如果貿易數量明顯大於正常的球員;如果IP玩家登錄是不同於過去的20場比賽使用的IP。除了對實時交易進行標記以外,我們也想載入數據到Apache
Hadoop,我們的數據,科學家們可以用它來訓練和測試新演算法。
基於游戲伺服器內存數據緩存進行實時事件標記是最好的,能夠讓我們達到迅速決定,特別是對那些最活躍玩家。如果我們分區游戲伺服器,我們的系統有多個游戲伺服器和數據集,,包括過去登錄的20個玩家和近20在內存的交易,。
我們的游戲伺服器必須執行兩個截然不同的角色:第一個是接受和執行用戶操作,第二是實時處理貿易信息並標記可疑事件。為了有效執行第二個角色功能,每個用戶整個貿易的歷史事件都駐留在一個單獨的伺服器內存中。因為接收用戶操作(第一個角色功能)的伺服器可能沒有他的貿易歷史,這意味著我們必須通過伺服器之間的消息實現第二個角色功能。為了讓兩個角色功能保持鬆散耦合,我們使用卡夫卡在伺服器之間傳遞消息,您將看到如下:
卡夫卡有幾個特性:
可伸縮性、數據分區,低延遲,並且能夠處理大量不同的消費者。我們為登錄和交易配置了一個Topic主題。我們把它們配置成一個topic主題的原因是:只有我們獲得已經登錄信息(我們可以確保玩家從他平時IP登錄)後,才能確保他的交易是有效的。卡夫卡可以在在一個主題topic中維護這個前後順序,而不是在兩個topic之間。
當用戶登錄後進行交易,接受伺服器立即發送事件到Kafka. 消息是將user id作為key, 事件作為值.
這能確保同一個用戶的所有交易和登錄發送到Kafka同樣的分區. 每個事件處理伺服器都運行一個Kafka消費者,
其每個消費者都被配置為同樣組的一部分,這樣,每個伺服器從少量Kafka分區讀取消息,所有關於某個特定用戶的數據都能發往相同事件處理伺服器,當事件處理伺服器從Kafka讀取一個用戶交易時,將這個事件加入到用戶事件歷史緩存本地內存緩存,這樣就無需額外網路磁碟開銷直接標記那些可以事件。
重要的是我們為每個事件處理伺服器創建一個分區,或者每個事件處理伺服器上每核對應一個多線程應用,(記住 Kafka大部分是用少於 10,000
個分區實現所有主題topic,這樣我們不能為每個用戶創建一個分區,因為用戶數是不斷增加的。)
這好像是一種迂迴方式處理事件:從游戲伺服器發送事件到Kafka,
另外一台伺服器讀取這個事件再處理它,這種設計解耦了兩種角色功能,允許我們為每個角色功能安排其需要的容量與能力。.另外,這樣做不會增加處理時間,因為Kafka是設計為高吞吐量和低延遲的,即使只有一個小的三台節點伺服器的集群環境,也能每秒處理接近一百萬個事件,平均延遲只有3ms.
當伺服器標識出一個事件作為可疑,它會發送這個標記了的事件到新的Kafka
topic,比如其主題名稱為Alerts,這時報警伺服器或儀表監控伺服器會接受到這個事件,同時另外一個單獨過程會從Alerts主題中讀取這個事件,將它寫入到Hadoop進行更進一步分析。
因為Kafka並不跟蹤確認每個消費者的消息,它就能用很少的性能影響處理成千上萬的消費者。Kafka甚至可以處理批量消費者:每一個小時處理過程喚醒激活處理一個隊列中所有消息,根本就不會影響系統的吞吐量或延遲。
⑤ 如何查看kafka消費者信息
在Kafak中國社區的qq群中,這個問題被提及的比例是相當高的,這也是Kafka用戶最常碰到的問題之一。本文結合Kafka源碼試圖對該問題相關的因素進行探討。希望對大家有所幫助。怎麼確定分區數?「我應該選擇幾個分區?」——如果你在Kafka中國社區的群里,這樣的問題你會經常碰到的。不過有些遺憾的是,我們似乎並沒有很權威的答案能夠解答這樣的問題。其實這也不奇怪,畢竟這樣的問題通常都是沒有固定答案的。Kafka官網上標榜自己是"high-",即一個高吞吐量的分布式消息引擎。那麼怎麼達到高吞吐量呢?Kafka在底層摒棄了Java堆緩存機制,採用了操作系統級別的頁緩存,同時將隨機寫操作改為順序寫,再結合Zero-Copy的特性極大地改善了IO性能。但是,這只是一個方面,畢竟單機優化的能力是有上限的。如何通過水平擴展甚至是線性擴展來進一步提升吞吐量呢?Kafka就是使用了分區partition,通過將topic的消息打散到多個分區並分布保存在不同的broker上實現了消息處理不管是procer還是consumer的高吞吐量。Kafka的生產者和消費者都可以多線程地並行操作,而每個線程處理的是一個分區的數據。因此分區實際上是調優Kafka並行度的最小單元。對於procer而言,它實際上是用多個線程並發地向不同分區所在的broker發起Socket連接同時給這些分區發送消息;而consumer呢,同一個消費組內的所有consumer線程都被指定topic的某一個分區進行消費具體如何確定consumer線程數目我們後面會詳細說明。所以說,如果一個topic分區越多,理論上整個集群所能達到的吞吐量就越大。但分區是否越多越好呢?顯然也不是,因為每個分區都有自己的開銷:一、客戶端/伺服器端需要使用的內存就越多先說說客戶端的情況。Kafka082之後推出了Java版的全新的procer,這個procer有個參數batchsize,默認是16KB。它會為每個分區緩存消息,一旦滿了就打包將消息批量發出。看上去這是個能夠提升性能的設計。不過很顯然,因為這個參數是分區級別的,如果分區數越多,這部分緩存所需的內存佔用也會。假設你有10000個分區,按照默認設置,這部分緩存需要佔用約157MB的內存。而consumer端呢?我們拋開獲取數據所需的內存不說,只說線程的開銷。如果還是假設有10000個分區,同時consumer線程數要匹配分區數大部分情況下是最佳的消費吞吐量配置的話,那麼在consumerclient就要創建10000個線程,也需要創建大約10000個Socket去獲取分區數據。這裡面的線程切換的開銷本身已經不容小覷了。伺服器端的開銷也不小,如果閱讀Kafka源碼的話可以發現,伺服器端的很多組件都在內存中維護了分區級別的緩存,比如controller,FetcherManager等,因此分區數越多,這種緩存的成本越久越大。二、文件句柄的開銷每個分區在底層文件系統都有屬於自己的一個目錄。該目錄下通常會有兩個文件:base_offsetlog和base_offsetindex。Kafak的controller和ReplicaManager會為每個broker都保存這兩個文件句柄filehandler。很明顯,如果分區數越多,所需要保持打開狀態的文件句柄數也就越多,最終可能會突破你的ulimit-n的限制。三、降低高可用性Kafka通過副本replica機制來保證高可用。具體做法就是為每個分區保存若干個副本replica_factor指定副本數。每個副本保存在不同的broker上。期中的一個副本充當leader副本,負責處理procer和consumer請求。其他副本充當follower角色,由Kafkacontroller負責保證與leader的同步。如果leader所在的broker掛掉了,contorller會檢測到然後在zookeeper的幫助下重選出新的leader——這中間會有短暫的不可用時間窗口,雖然大部分情況下可能只是幾毫秒級別。但如果你有10000個分區,10個broker,也就是說平均每個broker上有1000個分區。此時這個broker掛掉了,那麼zookeeper和controller需要立即對這1000個分區進行leader選舉。比起很少的分區leader選舉而言,這必然要花更長的時間,並且通常不是線性累加的。如果這個broker還同時是controller情況就更糟了。說了這么多「廢話」,很多人肯定已經不耐煩了。那你說到底要怎麼確定分區數呢?答案就是:視情況而定。基本上你還是需要通過一系列實驗和測試來確定。當然測試的依據應該是吞吐量。雖然LinkedIn這篇文章做了Kafka的基準測試,但它的結果其實對你意義不大,因為不同的硬體、軟體、負載情況測試出來的結果必然不一樣。我經常碰到的問題類似於,官網說每秒能到10MB,為什麼我的procer每秒才1MB?——且不說硬體條件,最後發現他使用的消息體有1KB,而官網的基準測試是用100B測出來的,因此根本沒有可比性。不過你依然可以遵循一定的步驟來嘗試確定分區數:創建一個只有1個分區的topic,然後測試這個topic的procer吞吐量和consumer吞吐量。假設它們的值分別是Tp和Tc,單位可以是MB/s。然後假設總的目標吞吐量是Tt,那麼分區數=Tt/maxTp,TcTp表示procer的吞吐量。測試procer通常是很容易的,因為它的邏輯非常簡單,就是直接發送消息到Kafka就好了。Tc表示consumer的吞吐量。測試Tc通常與應用的關系更大,因為Tc的值取決於你拿到消息之後執行什麼操作,因此Tc的測試通常也要麻煩一些。另外,Kafka並不能真正地做到線性擴展其實任何系統都不能,所以你在規劃你的分區數的時候最好多規劃一下,這樣未來擴展時候也更加方便。消息-分區的分配默認情況下,Kafka根據傳遞消息的key來進行分區的分配,即hashkey
⑥ kafka 消費者id是自動生成的嗎
手動修改meta.properties文件,在配置的log.dir下
vi ../kafka_data/meta.properties1
So, 如果你想改kafka的broker id,比如第一遍寫錯了,應該遵循兩個步驟:
1. 改server.prorperties文件配置;
2. 改meta.properties,默認情況下,應該在/tmp/kafka-logs目錄下;
同時需注意數據存在多個目錄時,需要修改多個目錄的meta.propertie。
⑦ kafka server.properties怎樣配置
Kafka是由LinkedIn設計的一個高吞吐量、分布式、基於發布訂閱模式的消息系統,使用Scala編寫,它以可水平擴展、可靠性、非同步通信和高吞吐率等特性而被廣泛使用。目前越來越多的開源分布式處理系統都支持與Kafka集成,其中Spark Streaming作為後端流引擎配合Kafka作為前端消息系統正成為當前流處理系統的主流架構之一。
然而,當下越來越多的安全漏洞、數據泄露等問題的爆發,安全正成為系統選型不得不考慮的問題,Kafka由於其安全機制的匱乏,也導致其在數據敏感行業的部署存在嚴重的安全隱患。本文將圍繞Kafka,先介紹其整體架構和關鍵概念,再深入分析其架構之中存在的安全問題,最後分享下Transwarp在Kafka安全性上所做的工作及其使用方法。
Kafka架構與安全
首先,我們來了解下有關Kafka的幾個基本概念:
Topic:Kafka把接收的消息按種類劃分,每個種類都稱之為Topic,由唯一的Topic Name標識。
Procer:向Topic發布消息的進程稱為Procer。
Consumer:從Topic訂閱消息的進程稱為Consumer。
Broker:Kafka集群包含一個或多個伺服器,這種伺服器被稱為Broker。
Kafka的整體架構如下圖所示,典型的Kafka集群包含一組發布消息的Procer,一組管理Topic的Broker,和一組訂閱消息的Consumer。Topic可以有多個分區,每個分區只存儲於一個Broker。Procer可以按照一定的策略將消息劃分給指定的分區,如簡單的輪詢各個分區或者按照特定欄位的Hash值指定分區。Broker需要通過ZooKeeper記錄集群的所有Broker、選舉分區的Leader,記錄Consumer的消費消息的偏移量,以及在Consumer Group發生變化時進行relalance. Broker接收和發送消息是被動的:由Procer主動發送消息,Consumer主動拉取消息。
然而,分析Kafka框架,我們會發現以下嚴重的安全問題:
1.網路中的任何一台主機,都可以通過啟動Broker進程而加入Kafka集群,能夠接收Procer的消息,能夠篡改消息並發送給Consumer。
2.網路中的任何一台主機,都可以啟動惡意的Procer/Consumer連接到Broker,發送非法消息或拉取隱私消息數據。
3.Broker不支持連接到啟用Kerberos認證的ZooKeeper集群,沒有對存放在ZooKeeper上的數據設置許可權。任意用戶都能夠直接訪問ZooKeeper集群,對這些數據進行修改或刪除。
4.Kafka中的Topic不支持設置訪問控制列表,任意連接到Kafka集群的Consumer(或Procer)都能對任意Topic讀取(或發送)消息。
隨著Kafka應用場景越來越廣泛,特別是一些數據隱私程度較高的領域(如道路交通的視頻監控),上述安全問題的存在猶如一顆定時炸彈,一旦內網被黑客入侵或者內部出現惡意用戶,所有的隱私數據(如車輛出行記錄)都能夠輕易地被竊取,而無需攻破Broker所在的伺服器。
Kafka安全設計
基於上述分析,Transwarp從以下兩個方面增強Kafka的安全性:
身份認證(Authentication):設計並實現了基於Kerberos和基於IP的兩種身份認證機制。前者為強身份認證,相比於後者具有更好的安全性,後者適用於IP地址可信的網路環境,相比於前者部署更為簡便。
許可權控制(Authorization):設計並實現了Topic級別的許可權模型。Topic的許可權分為READ(從Topic拉取數據)、WRITE(向Topic中生產數據)、CREATE(創建Topic)和DELETE(刪除Topic)。
基於Kerberos的身份機制如下圖所示:
Broker啟動時,需要使用配置文件中的身份和密鑰文件向KDC(Kerberos伺服器)認證,認證通過則加入Kafka集群,否則報錯退出。
Procer(或Consumer)啟動後需要經過如下步驟與Broker建立安全的Socket連接:
1.Procer向KDC認證身份,通過則得到TGT(票證請求票證),否則報錯退出
2.Procer使用TGT向KDC請求Kafka服務,KDC驗證TGT並向Procer返回SessionKey(會話密鑰)和ServiceTicket(服務票證)
3.Procer使用SessionKey和ServiceTicket與Broker建立連接,Broker使用自身的密鑰解密ServiceTicket,獲得與Procer通信的SessionKey,然後使用SessionKey驗證Procer的身份,通過則建立連接,否則拒絕連接。
ZooKeeper需要啟用Kerberos認證模式,保證Broker或Consumer與其的連接是安全的。
Topic的訪問控制列表(ACL)存儲於ZooKeeper中,存儲節點的路徑為/acl/<topic>/<user>,節點數據為R(ead)、W(rite)、C(reate)、D(elete)許可權的集合,如/acl/transaction/jack節點的數據為RW,則表示用戶jack能夠對transaction這個topic進行讀和寫。
另外,kafka為特權用戶,只有kafka用戶能夠賦予/取消許可權。因此,ACL相關的ZooKeeper節點許可權為kafka具有所有許可權,其他用戶不具有任何許可權。
構建安全的Kafka服務
首先,我們為Broker啟用Kerberos認證模式,配置文件為/etc/kafka/conf/server.properties,安全相關的參數如下所示:
其中,authentication參數表示認證模式,可選配置項為simple, kerberos和ipaddress,默認為simple。當認證模式為kerberos時,需要額外配置賬戶屬性principal和對應的密鑰文件路徑keytab.
認證模式為ipaddress時,Procer和Consumer創建時不需要做任何改變。而認證模式為kerberos時,需要預先創建好相應的principal和keytab,並使用API進行登錄,樣例代碼如下所示:
public class SecureProcer extends Thread {
private final kafka.javaapi.procer.Procer<Integer, String> procer;
private final String topic;
private final Properties props = new Properties();
public SecureProcer(String topic) {
AuthenticationManager.setAuthMethod(「kerberos」);
AuthenticationManager.login(「procer1″, 「/etc/procer1.keytab」);
props.put(「serializer.class」, 「kafka.serializer.StringEncoder」);
props.put(「metadata.broker.list」,
「172.16.1.190:9092,172.16.1.192:9092,172.16.1.193:9092″);
// Use random partitioner. Don』t need the key type. Just set it to Integer.
// The message is of type String.
procer = new kafka.javaapi.procer.Procer<Integer, String>(
new ProcerConfig(props));
this.topic = topic;
⑧ kafka的consumer.properties的group.id到底有什麼用
kafka的consumer.properties的group.id到底有什麼用,
在kafka分布式集群部署時,消費者的group.id,是否需要和consumer.properties配置的group.id一致。
我兩個不同的topic,分別使用兩個consumer消費。
其中一個consumer必須設置group.id和consumer.properties配置的group.id一致,才能消費消息。
另一個consumer必須設置group.id和consumer.properties配置的group.id不一致,才能消費消息。
⑨ kafka0.8怎麼配置高版本的zookeeper
我這里是使用的是,kafka自帶的zookeeper。以及關於kafka的日誌文件啊,都放在默認里即/tmp下,我沒修改。保存默認的 1、 [hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ jps 2625 Jps 2、 [hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ bin/zookeeper-server-start.sh config/zookeeper.properties 此刻,這時,會一直停在這,因為是前端運行。另開一窗口, 3、 [hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ bin/kafka-server-start.sh config/server.properties 也是前端運行。
⑩ 10.kafka消費者如何分配分區
從好的方面來說,引入多個consumer的初衷大多是為了提升消費性能,即提升消費的吞吐量。試想你的業務消費代碼打算消費100個分區的數據,使用一個consumer消費有很大可能使得各個分區的消費進度不均勻,且單個consumer單次poll回來的數據量是有限制的,最終消費端總的TPS也受限於單consumer的性能。
從不好的方面看,某個組內的consumer數越多,通常意味著該group經rebalance後達到穩定狀態的時間也就越長,因而你可能需要為max.poll.interval.ms設置更大的值。曾經也見過國外有個用戶發帖子抱怨說他的某個consumer group下有100個consumer,每次rebalance一次都要10分鍾。具體的原因就在於coordinator需要等待所有的組成員都發送JoinGroup請求後才會將group置於AwaitingSync狀態,然後等待leader成員分配方案並將方案發送給它,之後coordinator下發分配方案給各個成員。