① java工程kafka传递自定义对象,消费端获取到的是null
3. 启服务
3.1 启zookeeper
启zk两种式第种使用kafka自带zk
bin/zookeeper-server-start.sh config/zookeeper.properties&
另种使用其zookeeper位于本机位于其址种情况需要修改config面sercer.properties面zookeeper址
例zookeeper.connect=10.202.4.179:2181
3.2 启 kafka
bin/kafka-server-start.sh config/server.properties
4.创建topic
bin/kafka-topics.sh --create --zookeeper 10.202.4.179:2181 --replication-factor 1 --partitions 1 --topic test
创建名testtopic副本区
通list命令查看刚刚创建topic
bin/kafka-topics.sh -list -zookeeper 10.202.4.179:2181
5.启procer并发送消息启procer
bin/kafka-console-procer.sh --broker-list localhost:9092 --topic test
启发送消息
比
test
hello boy
按Ctrl+C退发送消息
6.启consumer
bin/kafka-console-consumer.sh --zookeeper 10.202.4.179:2181 --topic test --from-beginning
启consumerconsole看procer发送消息
启两终端发送消息接受消息
都行查看zookeeper进程kafkatopic步步排查原吧
② kafka消费者java版本读取不到消息怎么办
3. 启动服务
3.1 启动zookeeper
启动zk有两种方式,第一种是使用kafka自己带的一个zk。
bin/zookeeper-server-start.sh config/zookeeper.properties&
另一种是使用其它的zookeeper,可以位于本机也可以位于其它地址。这种情况需要修改config下面的sercer.properties里面的zookeeper地址
。例如zookeeper.connect=10.202.4.179:2181
3.2 启动 kafka
bin/kafka-server-start.sh config/server.properties
4.创建topic
bin/kafka-topics.sh --create --zookeeper 10.202.4.179:2181 --replication-factor 1 --partitions 1 --topic test
创建一个名为test的topic,只有一个副本,一个分区。
通过list命令查看刚刚创建的topic
bin/kafka-topics.sh -list -zookeeper 10.202.4.179:2181
5.启动procer并发送消息启动procer
bin/kafka-console-procer.sh --broker-list localhost:9092 --topic test
启动之后就可以发送消息了
比如
test
hello boy
按Ctrl+C退出发送消息
6.启动consumer
bin/kafka-console-consumer.sh --zookeeper 10.202.4.179:2181 --topic test --from-beginning
启动consumer之后就可以在console中看到procer发送的消息了
可以开启两个终端,一个发送消息,一个接受消息。
如果这样都不行的话,查看zookeeper进程和kafka的topic,一步步排查原因吧。
③ 怎么配置sparkstreaming 让他解析kafka中的日志
1、KafkaUtils.createDstream构造函数KafkaUtils.createDstream(ssc, [zk], [consumer group id], [per-topic,partitions] ) 使用receivers接收数据利用Kafka高层消费者api于所receivers接收数据保存spark executors通Spark Streaming启job处理些数据默认丢失启用WAL志该志存储HDFS A、创建receiverkafka进行定拉取数据sscrdd区kafkatopic区概念故增加特定主体区数仅仅增加receiver消费topic线程数并增加spark并行处理数据数量 B、于同grouptopic使用receivers创建同DStream C、启用WAL需要设置存储级别即KafkaUtils.createStream(….,StorageLevel.MEMORY_AND_DISK_SER)2.KafkaUtils.createDirectStream区别Receiver接收数据种式定期kafkatopic+partition查询新偏移量再根据偏移量范围每batch面处理数据使用kafka简单消费者api 优点: A、 简化并行需要kafka输入流该创建kafka区rdd数且kafka并行读取B、高效种式并需要WALWAL模式需要数据复制两第kafka复制另写wal C、恰语义(Exactly-once-semantics)传统读取kafka数据通kafka高层api偏移量写入zookeeper存数据丢失能性zookeeperssc偏移量致EOS通实现kafka低层api偏移量仅仅ssc保存checkpoint消除zkssc偏移量致问题缺点使用基于zookeeperkafka监控工具怎么配置sparkstreaming 让他解析kafka中的日志
④ apache kafka有哪几个部分组成
Kafka是一个分布式发布-订阅publish-subscribe消息传递系统,设计目标是快速、可伸缩和耐用冗余。
它是一个在一个较高的抽象水平上描述的但是又非常简单的系统,虽然当你更深地挖掘时会令人难以置信的技术细节。卡夫卡文档出色的解释了系统中许多设计和实现的微妙之处,所以我们不会在这里试图解释他们了。
像许多发布-订阅消息传递系统,卡夫卡能保存源消息的数据。生产者将输入写入到主题topic,而消费者则从主题topic中读取写入数据。因为卡夫卡是一个分布式系统,所以topic主题会实现跨多个节点分区和复制。
消息是简单byte数组,开发者能够使用它们存在对象的任何格式,如String或JSON和Avro,每个消息能够有一个Key,因此,生产者保证拥有相同key的消息到达同一个分区,而对主题topic的消费,可以使用多个消费者来配置消费组,每个消费组中消费者能够从他订阅的那个主题Topic所在分区子集中读取消息,这样每个消息发送给消费组中一个消费者,使用相同key的所有消息能够到达相同的消费者。
Kafka的特点是它将每个主题topic分区都看成一个日志log(一个有顺序的消息集合),一个分区中的每个消息被分配一个唯一的偏移量offset.
Kafka并不试图跟踪哪个消息被哪个消费者读取,而是只是保留未被读取的消息。
Kafka保留所有消息的时间,而消费者负责跟踪它们在每个日志(日志是一个消息序列集合代表一个topic分区)中的位置,
因此Kafka能够支持大量的消费者,使用最小的代价保留大量的数据。
Kafka如何工作?
假设我们正在开发一个大型多人在线游戏。在这些游戏中,玩家在一个虚拟的世界中互相合作和竞争。通常玩家互相发生贸易,比如交换物品和金钱,所以游戏开发者重要的是要确保球员不会欺骗,在下面两种情况下的交易将标记为特殊:如果贸易数量明显大于正常的球员;如果IP玩家登录是不同于过去的20场比赛使用的IP。除了对实时交易进行标记以外,我们也想加载数据到Apache
Hadoop,我们的数据,科学家们可以用它来训练和测试新算法。
基于游戏服务器内存数据缓存进行实时事件标记是最好的,能够让我们达到迅速决定,特别是对那些最活跃玩家。如果我们分区游戏服务器,我们的系统有多个游戏服务器和数据集,,包括过去登录的20个玩家和近20在内存的交易,。
我们的游戏服务器必须执行两个截然不同的角色:第一个是接受和执行用户操作,第二是实时处理贸易信息并标记可疑事件。为了有效执行第二个角色功能,每个用户整个贸易的历史事件都驻留在一个单独的服务器内存中。因为接收用户操作(第一个角色功能)的服务器可能没有他的贸易历史,这意味着我们必须通过服务器之间的消息实现第二个角色功能。为了让两个角色功能保持松散耦合,我们使用卡夫卡在服务器之间传递消息,您将看到如下:
卡夫卡有几个特性:
可伸缩性、数据分区,低延迟,并且能够处理大量不同的消费者。我们为登录和交易配置了一个Topic主题。我们把它们配置成一个topic主题的原因是:只有我们获得已经登录信息(我们可以确保玩家从他平时IP登录)后,才能确保他的交易是有效的。卡夫卡可以在在一个主题topic中维护这个前后顺序,而不是在两个topic之间。
当用户登录后进行交易,接受服务器立即发送事件到Kafka. 消息是将user id作为key, 事件作为值.
这能确保同一个用户的所有交易和登录发送到Kafka同样的分区. 每个事件处理服务器都运行一个Kafka消费者,
其每个消费者都被配置为同样组的一部分,这样,每个服务器从少量Kafka分区读取消息,所有关于某个特定用户的数据都能发往相同事件处理服务器,当事件处理服务器从Kafka读取一个用户交易时,将这个事件加入到用户事件历史缓存本地内存缓存,这样就无需额外网络磁盘开销直接标记那些可以事件。
重要的是我们为每个事件处理服务器创建一个分区,或者每个事件处理服务器上每核对应一个多线程应用,(记住 Kafka大部分是用少于 10,000
个分区实现所有主题topic,这样我们不能为每个用户创建一个分区,因为用户数是不断增加的。)
这好像是一种迂回方式处理事件:从游戏服务器发送事件到Kafka,
另外一台服务器读取这个事件再处理它,这种设计解耦了两种角色功能,允许我们为每个角色功能安排其需要的容量与能力。.另外,这样做不会增加处理时间,因为Kafka是设计为高吞吐量和低延迟的,即使只有一个小的三台节点服务器的集群环境,也能每秒处理接近一百万个事件,平均延迟只有3ms.
当服务器标识出一个事件作为可疑,它会发送这个标记了的事件到新的Kafka
topic,比如其主题名称为Alerts,这时报警服务器或仪表监控服务器会接受到这个事件,同时另外一个单独过程会从Alerts主题中读取这个事件,将它写入到Hadoop进行更进一步分析。
因为Kafka并不跟踪确认每个消费者的消息,它就能用很少的性能影响处理成千上万的消费者。Kafka甚至可以处理批量消费者:每一个小时处理过程唤醒激活处理一个队列中所有消息,根本就不会影响系统的吞吐量或延迟。
⑤ 如何查看kafka消费者信息
在Kafak中国社区的qq群中,这个问题被提及的比例是相当高的,这也是Kafka用户最常碰到的问题之一。本文结合Kafka源码试图对该问题相关的因素进行探讨。希望对大家有所帮助。怎么确定分区数?“我应该选择几个分区?”——如果你在Kafka中国社区的群里,这样的问题你会经常碰到的。不过有些遗憾的是,我们似乎并没有很权威的答案能够解答这样的问题。其实这也不奇怪,毕竟这样的问题通常都是没有固定答案的。Kafka官网上标榜自己是"high-",即一个高吞吐量的分布式消息引擎。那么怎么达到高吞吐量呢?Kafka在底层摒弃了Java堆缓存机制,采用了操作系统级别的页缓存,同时将随机写操作改为顺序写,再结合Zero-Copy的特性极大地改善了IO性能。但是,这只是一个方面,毕竟单机优化的能力是有上限的。如何通过水平扩展甚至是线性扩展来进一步提升吞吐量呢?Kafka就是使用了分区partition,通过将topic的消息打散到多个分区并分布保存在不同的broker上实现了消息处理不管是procer还是consumer的高吞吐量。Kafka的生产者和消费者都可以多线程地并行操作,而每个线程处理的是一个分区的数据。因此分区实际上是调优Kafka并行度的最小单元。对于procer而言,它实际上是用多个线程并发地向不同分区所在的broker发起Socket连接同时给这些分区发送消息;而consumer呢,同一个消费组内的所有consumer线程都被指定topic的某一个分区进行消费具体如何确定consumer线程数目我们后面会详细说明。所以说,如果一个topic分区越多,理论上整个集群所能达到的吞吐量就越大。但分区是否越多越好呢?显然也不是,因为每个分区都有自己的开销:一、客户端/服务器端需要使用的内存就越多先说说客户端的情况。Kafka082之后推出了Java版的全新的procer,这个procer有个参数batchsize,默认是16KB。它会为每个分区缓存消息,一旦满了就打包将消息批量发出。看上去这是个能够提升性能的设计。不过很显然,因为这个参数是分区级别的,如果分区数越多,这部分缓存所需的内存占用也会。假设你有10000个分区,按照默认设置,这部分缓存需要占用约157MB的内存。而consumer端呢?我们抛开获取数据所需的内存不说,只说线程的开销。如果还是假设有10000个分区,同时consumer线程数要匹配分区数大部分情况下是最佳的消费吞吐量配置的话,那么在consumerclient就要创建10000个线程,也需要创建大约10000个Socket去获取分区数据。这里面的线程切换的开销本身已经不容小觑了。服务器端的开销也不小,如果阅读Kafka源码的话可以发现,服务器端的很多组件都在内存中维护了分区级别的缓存,比如controller,FetcherManager等,因此分区数越多,这种缓存的成本越久越大。二、文件句柄的开销每个分区在底层文件系统都有属于自己的一个目录。该目录下通常会有两个文件:base_offsetlog和base_offsetindex。Kafak的controller和ReplicaManager会为每个broker都保存这两个文件句柄filehandler。很明显,如果分区数越多,所需要保持打开状态的文件句柄数也就越多,最终可能会突破你的ulimit-n的限制。三、降低高可用性Kafka通过副本replica机制来保证高可用。具体做法就是为每个分区保存若干个副本replica_factor指定副本数。每个副本保存在不同的broker上。期中的一个副本充当leader副本,负责处理procer和consumer请求。其他副本充当follower角色,由Kafkacontroller负责保证与leader的同步。如果leader所在的broker挂掉了,contorller会检测到然后在zookeeper的帮助下重选出新的leader——这中间会有短暂的不可用时间窗口,虽然大部分情况下可能只是几毫秒级别。但如果你有10000个分区,10个broker,也就是说平均每个broker上有1000个分区。此时这个broker挂掉了,那么zookeeper和controller需要立即对这1000个分区进行leader选举。比起很少的分区leader选举而言,这必然要花更长的时间,并且通常不是线性累加的。如果这个broker还同时是controller情况就更糟了。说了这么多“废话”,很多人肯定已经不耐烦了。那你说到底要怎么确定分区数呢?答案就是:视情况而定。基本上你还是需要通过一系列实验和测试来确定。当然测试的依据应该是吞吐量。虽然LinkedIn这篇文章做了Kafka的基准测试,但它的结果其实对你意义不大,因为不同的硬件、软件、负载情况测试出来的结果必然不一样。我经常碰到的问题类似于,官网说每秒能到10MB,为什么我的procer每秒才1MB?——且不说硬件条件,最后发现他使用的消息体有1KB,而官网的基准测试是用100B测出来的,因此根本没有可比性。不过你依然可以遵循一定的步骤来尝试确定分区数:创建一个只有1个分区的topic,然后测试这个topic的procer吞吐量和consumer吞吐量。假设它们的值分别是Tp和Tc,单位可以是MB/s。然后假设总的目标吞吐量是Tt,那么分区数=Tt/maxTp,TcTp表示procer的吞吐量。测试procer通常是很容易的,因为它的逻辑非常简单,就是直接发送消息到Kafka就好了。Tc表示consumer的吞吐量。测试Tc通常与应用的关系更大,因为Tc的值取决于你拿到消息之后执行什么操作,因此Tc的测试通常也要麻烦一些。另外,Kafka并不能真正地做到线性扩展其实任何系统都不能,所以你在规划你的分区数的时候最好多规划一下,这样未来扩展时候也更加方便。消息-分区的分配默认情况下,Kafka根据传递消息的key来进行分区的分配,即hashkey
⑥ kafka 消费者id是自动生成的吗
手动修改meta.properties文件,在配置的log.dir下
vi ../kafka_data/meta.properties1
So, 如果你想改kafka的broker id,比如第一遍写错了,应该遵循两个步骤:
1. 改server.prorperties文件配置;
2. 改meta.properties,默认情况下,应该在/tmp/kafka-logs目录下;
同时需注意数据存在多个目录时,需要修改多个目录的meta.propertie。
⑦ kafka server.properties怎样配置
Kafka是由LinkedIn设计的一个高吞吐量、分布式、基于发布订阅模式的消息系统,使用Scala编写,它以可水平扩展、可靠性、异步通信和高吞吐率等特性而被广泛使用。目前越来越多的开源分布式处理系统都支持与Kafka集成,其中Spark Streaming作为后端流引擎配合Kafka作为前端消息系统正成为当前流处理系统的主流架构之一。
然而,当下越来越多的安全漏洞、数据泄露等问题的爆发,安全正成为系统选型不得不考虑的问题,Kafka由于其安全机制的匮乏,也导致其在数据敏感行业的部署存在严重的安全隐患。本文将围绕Kafka,先介绍其整体架构和关键概念,再深入分析其架构之中存在的安全问题,最后分享下Transwarp在Kafka安全性上所做的工作及其使用方法。
Kafka架构与安全
首先,我们来了解下有关Kafka的几个基本概念:
Topic:Kafka把接收的消息按种类划分,每个种类都称之为Topic,由唯一的Topic Name标识。
Procer:向Topic发布消息的进程称为Procer。
Consumer:从Topic订阅消息的进程称为Consumer。
Broker:Kafka集群包含一个或多个服务器,这种服务器被称为Broker。
Kafka的整体架构如下图所示,典型的Kafka集群包含一组发布消息的Procer,一组管理Topic的Broker,和一组订阅消息的Consumer。Topic可以有多个分区,每个分区只存储于一个Broker。Procer可以按照一定的策略将消息划分给指定的分区,如简单的轮询各个分区或者按照特定字段的Hash值指定分区。Broker需要通过ZooKeeper记录集群的所有Broker、选举分区的Leader,记录Consumer的消费消息的偏移量,以及在Consumer Group发生变化时进行relalance. Broker接收和发送消息是被动的:由Procer主动发送消息,Consumer主动拉取消息。
然而,分析Kafka框架,我们会发现以下严重的安全问题:
1.网络中的任何一台主机,都可以通过启动Broker进程而加入Kafka集群,能够接收Procer的消息,能够篡改消息并发送给Consumer。
2.网络中的任何一台主机,都可以启动恶意的Procer/Consumer连接到Broker,发送非法消息或拉取隐私消息数据。
3.Broker不支持连接到启用Kerberos认证的ZooKeeper集群,没有对存放在ZooKeeper上的数据设置权限。任意用户都能够直接访问ZooKeeper集群,对这些数据进行修改或删除。
4.Kafka中的Topic不支持设置访问控制列表,任意连接到Kafka集群的Consumer(或Procer)都能对任意Topic读取(或发送)消息。
随着Kafka应用场景越来越广泛,特别是一些数据隐私程度较高的领域(如道路交通的视频监控),上述安全问题的存在犹如一颗定时炸弹,一旦内网被黑客入侵或者内部出现恶意用户,所有的隐私数据(如车辆出行记录)都能够轻易地被窃取,而无需攻破Broker所在的服务器。
Kafka安全设计
基于上述分析,Transwarp从以下两个方面增强Kafka的安全性:
身份认证(Authentication):设计并实现了基于Kerberos和基于IP的两种身份认证机制。前者为强身份认证,相比于后者具有更好的安全性,后者适用于IP地址可信的网络环境,相比于前者部署更为简便。
权限控制(Authorization):设计并实现了Topic级别的权限模型。Topic的权限分为READ(从Topic拉取数据)、WRITE(向Topic中生产数据)、CREATE(创建Topic)和DELETE(删除Topic)。
基于Kerberos的身份机制如下图所示:
Broker启动时,需要使用配置文件中的身份和密钥文件向KDC(Kerberos服务器)认证,认证通过则加入Kafka集群,否则报错退出。
Procer(或Consumer)启动后需要经过如下步骤与Broker建立安全的Socket连接:
1.Procer向KDC认证身份,通过则得到TGT(票证请求票证),否则报错退出
2.Procer使用TGT向KDC请求Kafka服务,KDC验证TGT并向Procer返回SessionKey(会话密钥)和ServiceTicket(服务票证)
3.Procer使用SessionKey和ServiceTicket与Broker建立连接,Broker使用自身的密钥解密ServiceTicket,获得与Procer通信的SessionKey,然后使用SessionKey验证Procer的身份,通过则建立连接,否则拒绝连接。
ZooKeeper需要启用Kerberos认证模式,保证Broker或Consumer与其的连接是安全的。
Topic的访问控制列表(ACL)存储于ZooKeeper中,存储节点的路径为/acl/<topic>/<user>,节点数据为R(ead)、W(rite)、C(reate)、D(elete)权限的集合,如/acl/transaction/jack节点的数据为RW,则表示用户jack能够对transaction这个topic进行读和写。
另外,kafka为特权用户,只有kafka用户能够赋予/取消权限。因此,ACL相关的ZooKeeper节点权限为kafka具有所有权限,其他用户不具有任何权限。
构建安全的Kafka服务
首先,我们为Broker启用Kerberos认证模式,配置文件为/etc/kafka/conf/server.properties,安全相关的参数如下所示:
其中,authentication参数表示认证模式,可选配置项为simple, kerberos和ipaddress,默认为simple。当认证模式为kerberos时,需要额外配置账户属性principal和对应的密钥文件路径keytab.
认证模式为ipaddress时,Procer和Consumer创建时不需要做任何改变。而认证模式为kerberos时,需要预先创建好相应的principal和keytab,并使用API进行登录,样例代码如下所示:
public class SecureProcer extends Thread {
private final kafka.javaapi.procer.Procer<Integer, String> procer;
private final String topic;
private final Properties props = new Properties();
public SecureProcer(String topic) {
AuthenticationManager.setAuthMethod(“kerberos”);
AuthenticationManager.login(“procer1″, “/etc/procer1.keytab”);
props.put(“serializer.class”, “kafka.serializer.StringEncoder”);
props.put(“metadata.broker.list”,
“172.16.1.190:9092,172.16.1.192:9092,172.16.1.193:9092″);
// Use random partitioner. Don’t need the key type. Just set it to Integer.
// The message is of type String.
procer = new kafka.javaapi.procer.Procer<Integer, String>(
new ProcerConfig(props));
this.topic = topic;
⑧ kafka的consumer.properties的group.id到底有什么用
kafka的consumer.properties的group.id到底有什么用,
在kafka分布式集群部署时,消费者的group.id,是否需要和consumer.properties配置的group.id一致。
我两个不同的topic,分别使用两个consumer消费。
其中一个consumer必须设置group.id和consumer.properties配置的group.id一致,才能消费消息。
另一个consumer必须设置group.id和consumer.properties配置的group.id不一致,才能消费消息。
⑨ kafka0.8怎么配置高版本的zookeeper
我这里是使用的是,kafka自带的zookeeper。以及关于kafka的日志文件啊,都放在默认里即/tmp下,我没修改。保存默认的 1、 [hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ jps 2625 Jps 2、 [hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ bin/zookeeper-server-start.sh config/zookeeper.properties 此刻,这时,会一直停在这,因为是前端运行。另开一窗口, 3、 [hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ bin/kafka-server-start.sh config/server.properties 也是前端运行。
⑩ 10.kafka消费者如何分配分区
从好的方面来说,引入多个consumer的初衷大多是为了提升消费性能,即提升消费的吞吐量。试想你的业务消费代码打算消费100个分区的数据,使用一个consumer消费有很大可能使得各个分区的消费进度不均匀,且单个consumer单次poll回来的数据量是有限制的,最终消费端总的TPS也受限于单consumer的性能。
从不好的方面看,某个组内的consumer数越多,通常意味着该group经rebalance后达到稳定状态的时间也就越长,因而你可能需要为max.poll.interval.ms设置更大的值。曾经也见过国外有个用户发帖子抱怨说他的某个consumer group下有100个consumer,每次rebalance一次都要10分钟。具体的原因就在于coordinator需要等待所有的组成员都发送JoinGroup请求后才会将group置于AwaitingSync状态,然后等待leader成员分配方案并将方案发送给它,之后coordinator下发分配方案给各个成员。