当前位置:首页 » 服务存储 » zk存储消息的位置发生变化
扩展阅读
webinf下怎么引入js 2023-08-31 21:54:13
堡垒机怎么打开web 2023-08-31 21:54:11

zk存储消息的位置发生变化

发布时间: 2023-01-13 08:49:57

A. Zookpeer是什么在系统中如何起作用

Zookeeper分布式服务框架是Apache Hadoop的一个子项目,它主要是用来解决分布式应用中经常遇到的一些数据管理问题。如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。

我们先看看它都提供了哪些功能,然后再看看使用它的这些功能能做点什么。

简单的说,zookeeper=文件系统+通知机制。

Zookeeper维护一个类似文件系统的数据结构:

每个子目录项如 NameService 都被称作为 znode,和文件系统一样,我们能够自由的增加、删除znode,在一个znode下增加、删除子znode,唯一的不同在于znode是可以存储数据的。

客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、被删除、子目录节点增加删除)时,zookeeper会通知客户端。

这个似乎最简单,在zookeeper的文件系统里创建一个目录,即有唯一的path。在我们使用tborg无法确定上游程序的部署机器时即可与下游程序约定好path,通过path即能互相探索发现,不见不散了。

程序总是需要配置的,如果程序分散部署在多台机器上,要逐个改变配置就变得困难。
可以把这些配置全部放到zookeeper上去,保存在 Zookeeper 的某个目录节点中,然后所有相关应用程序对这个目录节点进行监听,一旦配置信息发生变化,每个应用程序就会收到 Zookeeper 的通知,然后从 Zookeeper 获取新的配置信息应用到系统中就好。

集群管理无在乎两点:是否有机器退出和加入、选举master。

对于第一点,所有机器约定在父目录GroupMembers下创建临时目录节点,然后监听父目录节点的子节点变化消息。一旦有机器挂掉,该机器与 zookeeper的连接断开,其所创建的临时目录节点被删除,所有其他机器都收到通知:某个兄弟目录被删除,于是,所有人都知道:它下船了。当然又会有新机器加入,也是类似:所有机器收到通知---新兄弟目录加入,highcount又有了,有人上船了。

对于第二点,我们假设机器创建临时顺序编号目录节点,每次选取编号最小的机器作为master就好。

有了zookeeper的一致性文件系统,锁的问题变得容易。锁服务可以分为两类,一个是保持独占,另一个是控制时序。

对于第一类,我们将zookeeper上的一个znode看作是一把锁,通过createznode的方式来实现。所有客户端都去创建 /distribute_lock 节点,最终成功创建的那个客户端也即拥有了这把锁。厕所有言:来也冲冲,去也冲冲,用完删除掉自己创建的distribute_lock 节点就释放出锁。

对于第二类, /distribute_lock 已经预先存在,所有客户端在它下面创建临时顺序编号目录节点,和选master一样,编号最小的获得锁,用完删除,依次方便。

两种类型的队列:

1、 同步队列,当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达。

2、队列按照 FIFO 方式进行入队和出队操作。

第一类,在约定目录下创建临时目录节点,监听节点数目是否是我们要求的数目。

第二类,和分布式锁服务中的控制时序场景基本原理一致,入列有编号,出列按编号。

Zookeeper中的角色主要有以下三类:

系统模型如图所示:

Zookeeper的核心是原子广播,这个机制保证了各个Server之间的同步。实现这个机制的协议叫做Zab协议。Zab协议有两种模式,它们分 别是恢复模式(选主)和广播模式(同步)。当服务启动或者在领导者崩溃后,Zab就进入了恢复模式,当领导者被选举出来,且大多数Server完成了和 leader的状态同步以后,恢复模式就结束了。状态同步保证了leader和Server具有相同的系统状态。

为了保证事务的顺序一致性,zookeeper采用了递增的事务id号(zxid)来标识事务。所有的提议(proposal)都在被提出的时候加上 了zxid。实现中zxid是一个64位的数字,它高32位是epoch用来标识leader关系是否改变,每次一个leader被选出来,它都会有一个 新的epoch,标识当前属于那个leader的统治时期。低32位用于递增计数。

每个Server在工作过程中有三种状态:

当leader崩溃或者leader失去大多数的follower,这时候zk进入恢复模式,恢复模式需要重新选举出一个新的leader,让所有的 Server都恢复到一个正确的状态。Zk的选举算法有两种:一种是基于basic paxos实现的,另外一种是基于fast paxos算法实现的。系统默认的选举算法为fast paxos。先介绍basic paxos流程:

通过流程分析我们可以得出:要使Leader获得多数Server的支持,则Server总数必须是奇数2n+1,且存活的Server的数目不得少于n+1.

选完leader以后,zk就进入状态同步过程。

Leader主要有三个功能:

PING消息是指Learner的心跳信息;REQUEST消息是Follower发送的提议信息,包括写请求及同步请求;ACK消息是 Follower的对提议的回复,超过半数的Follower通过,则commit该提议;REVALIDATE消息是用来延长SESSION有效时间。
Leader的工作流程简图如下所示,在实际实现中,流程要比下图复杂得多,启动了三个线程来实现功能。

Follower主要有四个功能:

Follower的消息循环处理如下几种来自Leader的消息:

Follower的工作流程简图如下所示,在实际实现中,Follower是通过5个线程来实现功能的。

https://blog.csdn.net/xinguan1267/article/details/38422149
https://blog.csdn.net/gs80140/article/details/51496925
https://www.2cto.com/kf/201708/668587.html
https://blog.csdn.net/milhua/article/details/78931672

P.S. 这篇文章是本人对网络上关于ZK的文章阅读之后整理所得,作为入门级的了解。个人觉得看了上面的内容就能基本了解Zookeeper的作用了,后面在结合实际项目使用加深自己的了解。

end

B. Pulsar 的消息存储机制和 Bookie 的 GC 机制原理

[TOC]

本文是 Pulsar 技术系列中的一篇,主要简单梳理了 Pulsar 消息存储与 BookKeeper 存储文件的清理机制。其中,BookKeeper 可以理解为一个 NoSQL 的存储系统,默认使用 RocksDB 存储索引数据。

Pulsar 的消息存储在 BookKeeper 中,BookKeeper 是一个胖客户的系统,客户端部分称为 BookKeeper,服务器端集群中的每个存储节点称为 bookie。Pulsar 系统的 broker 作为 BookKeeper 存储系统的客户端,通过 BookKeeper 提供的客户端 SDK 将 Pulsar 的消息存储到 bookies 集群中。

Pulsar 中的每个 topic 的每个分区(非分区 topic,可以按照分区 0 理解,分区 topic 的编号是从 0 开始的),会对应一系列的 ledger,而每个 ledger 只会存储对应分区下的消息。对于每个分区同时只会有一个 ledger 处于 open 即可写状态。

Pulsar 在生产消息,存储消息时,会先找到当前分区使用的 ledger,然后生成当前消息对应的 entry ID,entry ID 在同一个 ledger 内是递增的。非批量生产的情况(procer 端可以配置这个参数,默认是批量的),一个 entry 中包含一条消息。批量方式下,一个 entry 可能包含多条消息。而 bookie 中只会按照 entry 维度进行写入、查找、获取。

因此,每个 Pulsar 下的消息的 msgID 需要有四部分组成(老版本由三部分组成),分别为(ledgerID,entryID,partition-index,batch-index),其中,partition-index 在非分区 topic 的时候为 -1,batch-index 在非批量消息的时候为 -1。

每个 ledger,当存在的时长或保存的 entry 个数超过阈值后会进行切换,同一个 partition 下的,新的消息会存储到下一个 ledger 中。Ledger 只是一个逻辑概念,是数据的一种逻辑组装维度,并没有对应的实体。

BookKeeper 集群中的每个 bookie 节点收到消息后,数据会分三部分进行存储处理,分别为:journal 文件、entryLog 文件、索引文件。

其中 journal 文件,entry 数据是按照 wal 方式写入的到 journal 文件中,每个 journal 文件有大小限制,当超过单个文件大小限制的时候会切换到下一个文件继续写,因为 journal 文件是实时刷盘的,所以为了提高性能,避免相互之间的读写 IO 相互影响,建议存储目录与存储 entrylog 的目录区分开,并且给每个 journal 文件的存储目录单独挂载一块硬盘(建议使用 ssd 硬盘)。journal 文件只会保存保存几个,超过配置个数的文件将会被删除。entry 存储到 journal 文件完全是随机的,先到先写入,journal 文件是为了保证消息不丢失而设计的。

如下图所示,每个 bookie 收到增加 entry 的请求后,会根据 ledger id 映射到存储到那个 journal 目录和 entry log 目录,entry 数据会存储在对应的目录下。目前 bookie 不支持在运行过程中变更存储目录(使用过程中,增加或减少目录会导致部分的数据查找不到)。

如下图所示,bookie 收到 entry 写入请求后,写入 journal 文件的同时,也会保存到 write cache 中,write cache 分为两部分,一部分是正在写入的 write cache, 一部分是正在正在刷盘的部分,两部分交替使用。

write cache 中有索引数据结构,可以通过索引查找到对应的 entry,write cache 中的索引是内存级别的,基于 bookie 自己定义的 ConcurrentLongLongPairHashMap 结构实现。

另外,每个 entorylog 的存储目录,会对应一个 类实例对象,而每个 对象里面会有一个基于 RocksDB 实现的索引结构,通过这个索引可以快速的查到每个 entry 存储在哪个 entrylog 文件中。每个 write cache 在增加 entry 的时候会进行排序处理,在同一个 write cache,同一个 ledger 下的数据是相邻有序的,这样在 write cache 中的数据 flush 到 entrylog 文件时,使得写入到 entrylog 文件中的数据是局部有序的,这样的设计能够极大的提高后续的读取效率。

中的索引数据也会随着 entry 的刷盘而刷盘到索引文件中。在 bookie 宕机重启时,可以通过 journal 文件和 entry log 文件还原数据,保证数据不丢失。

Pulsar consumer 在消费数据的时候,做了多层的缓存加速处理,如下图所示:

获取数据的顺序如下:

上面每一步,如果能获取到数据,都会直接返回,跳过后面的步骤。如果是从磁盘文件中获取的数据,会在返回的时候将数据存储到 read cache 中,另外如果是读取磁盘的操作,会多读取一部分磁盘上的时候,因为存储的时候有局部有序的处理,获取相邻数据的概率非常大,这种处理的话会极大的提高后续获取数据的效率。

我们在使用的过程中,应尽量避免或减少出现消费过老数据即触发读取磁盘文件中的消息的场景,以免对整体系统的性能造成影响。

BookKeeper 中的每个 bookie 都会周期的进行数据清理操作,默认 15 分钟检查处理一次,清理的主要流程如下:

通过上面的流程,我们可以了解 bookie 在清理 entrylog 文件时的大体流程。

需要特别说明的是,ledger 是否是可以删除的,完全是客户端的触发的,在 Pulsar 中是 broker 触发的。

broker 端有周期的处理线程(默认 2 分钟),清理已经消费过的消息所在的 ledger 机制,获取 topic 中包含的 cursor 最后确认的消息,将这个 topic 包含的 ledger 列表中,在这个 id 之前的(注意不包含当前的 ledger id)全部删除(包括 zk 中的元数据,同时通知 bookie 删除对应的 ledger)。

在运用的过程中我们多次遇到了 bookie 磁盘空间不足的场景,bookie 中存储了大量的 entry log 文件。比较典型的原因主要有如下两个。

原因一:

生产消息过于分散,例如,举个极端的场景,1w 个 topic,每个 topic 生产一条,1w 个 topic 顺序生产。这样每个 topic 对应的 ledger 短时间内不会因为时长或者存储大小进行切换,active 状态的 ledger id 分散在大量的 entry log 文件中。这些 entry log 文件是不能删除或者及时压缩的。

如果遇到这种场景,可以通过重启,强制 ledger 进行切换进行处理。当然如果这个时候消费进行没有跟上,消费的 last ack 位置所在的 ledger 也是处于 active 状态的,不能进行删除。

原因二:

GC 时间过程,如果现存的 enrylog 文件比较多,且大量符合 minor 或 major gc 阈值,这样,单次的 minor gc 或者 major gc 时间过长,在这段时间内是不能清理过期的 entry log 文件。

这是由于单次清理流程的顺序执行导致的,只有上次一轮执行完,才会执行下一次。目前,这块也在提优化流程,避免子流程执行实现过长,对整体产生影响。

C. Zookeeper 在 Kafka 中的作用

如上图所示,kafaka集群的 broker,和 Consumer 都需要连接 Zookeeper。
Procer 直接连接 Broker。

Procer 把数据上传到 Broker,Procer可以指定数据有几个分区、几个备份。上面的图中,数据有两个分区 0、1,每个分区都有自己的副本:0'、 1'。

黄色的分区为 leader,白色的为 follower。

leader 处理 partition 的所有读写请求,与此同时,follower会被动定期地去复制leader上的数据。 如下图所示,红色的为 leader,绿色的为 follower,leader复制自己到其他 Broker 中:

Topic 分区被放在不同的 Broker 中,保证 Procer 和 Consumer 错开访问 Broker,避免访问单个 Broker造成过度的IO压力,使得负载均衡。

Broker是分布式部署并且相互之间相互独立,但是需要有一个注册系统能够将整个集群中的Broker管理起来 ,此时就使用到了Zookeeper。在Zookeeper上会有一个专门 用来进行Broker服务器列表记录 的节点:

/brokers/ids

每个Broker在启动时,都会到Zookeeper上进行注册,即到/brokers/ids下创建属于自己的节点,如/brokers/ids/[0...N]。

Kafka使用了全局唯一的数字来指代每个Broker服务器,不同的Broker必须使用不同的Broker ID进行注册,创建完节点后, 每个Broker就会将自己的IP地址和端口信息记录 到该节点中去。其中,Broker创建的节点类型是临时节点,一旦Broker宕机,则对应的临时节点也会被自动删除。

在Kafka中,同一个 Topic的消息会被分成多个分区 并将其分布在多个Broker上, 这些分区信息及与Broker的对应关系 也都是由Zookeeper在维护,由专门的节点来记录,如:

/borkers/topics

Kafka中每个Topic都会以/brokers/topics/[topic]的形式被记录,如/brokers/topics/login和/brokers/topics/search等。Broker服务器启动后,会到对应Topic节点(/brokers/topics)上注册自己的Broker ID并写入针对该Topic的分区总数,如/brokers/topics/login/3->2,这个节点表示Broker ID为3的一个Broker服务器,对于"login"这个Topic的消息,提供了2个分区进行消息存储,同样,这个分区节点也是临时节点。

由于同一个Topic消息会被分区并将其分布在多个Broker上,因此, 生产者需要将消息合理地发送到这些分布式的Broker上 ,那么如何实现生产者的负载均衡,Kafka支持传统的四层负载均衡,也支持Zookeeper方式实现负载均衡。

(1) 四层负载均衡,根据生产者的IP地址和端口来为其确定一个相关联的Broker。通常,一个生产者只会对应单个Broker,然后该生产者产生的消息都发往该Broker。这种方式逻辑简单,每个生产者不需要同其他系统建立额外的TCP连接,只需要和Broker维护单个TCP连接即可。但是,其无法做到真正的负载均衡,因为实际系统中的每个生产者产生的消息量及每个Broker的消息存储量都是不一样的,如果有些生产者产生的消息远多于其他生产者的话,那么会导致不同的Broker接收到的消息总数差异巨大,同时,生产者也无法实时感知到Broker的新增和删除。

(2) 使用Zookeeper进行负载均衡,由于每个Broker启动时,都会完成Broker注册过程,生产者会通过该节点的变化来动态地感知到Broker服务器列表的变更,这样就可以实现动态的负载均衡机制。

与生产者类似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息,每个消费者分组包含若干消费者, 每条消息都只会发送给分组中的一个消费者 ,不同的消费者分组消费自己特定的Topic下面的消息,互不干扰。

消费组 (Consumer Group):
consumer group 下有多个 Consumer(消费者)。
对于每个消费者组 (Consumer Group),Kafka都会为其分配一个全局唯一的Group ID,Group 内部的所有消费者共享该 ID。订阅的topic下的每个分区只能分配给某个 group 下的一个consumer(当然该分区还可以被分配给其他group)。
同时,Kafka为每个消费者分配一个Consumer ID,通常采用"Hostname:UUID"形式表示。

在Kafka中,规定了 每个消息分区 只能被同组的一个消费者进行消费 ,因此,需要在 Zookeeper 上记录 消息分区 与 Consumer 之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其Consumer ID 写入到 Zookeeper 对应消息分区的临时节点上,例如:

/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]

其中,[broker_id-partition_id]就是一个 消息分区 的标识,节点内容就是该 消息分区 上 消费者的Consumer ID。

在消费者对指定消息分区进行消息消费的过程中, 需要定时地将分区消息的消费进度Offset记录到Zookeeper上 ,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。Offset在Zookeeper中由一个专门节点进行记录,其节点路径为:

/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]

节点内容就是Offset的值。

消费者服务器在初始化启动时加入消费者分组的步骤如下

注册到消费者分组。每个消费者服务器启动时,都会到Zookeeper的指定节点下创建一个属于自己的消费者节点,例如/consumers/[group_id]/ids/[consumer_id],完成节点创建后,消费者就会将自己订阅的Topic信息写入该临时节点。

对 消费者分组 中的 消费者 的变化注册监听 。每个 消费者 都需要关注所属 消费者分组 中其他消费者服务器的变化情况,即对/consumers/[group_id]/ids节点注册子节点变化的Watcher监听,一旦发现消费者新增或减少,就触发消费者的负载均衡。

对Broker服务器变化注册监听 。消费者需要对/broker/ids/[0-N]中的节点进行监听,如果发现Broker服务器列表发生变化,那么就根据具体情况来决定是否需要进行消费者负载均衡。

进行消费者负载均衡 。为了让同一个Topic下不同分区的消息尽量均衡地被多个 消费者 消费而进行 消费者 与 消息 分区分配的过程,通常,对于一个消费者分组,如果组内的消费者服务器发生变更或Broker服务器发生变更,会发出消费者负载均衡。

以下是kafka在zookeep中的详细存储结构图:

早期版本的 kafka 用 zk 做 meta 信息存储,consumer 的消费状态,group 的管理以及 offse t的值。考虑到zk本身的一些因素以及整个架构较大概率存在单点问题,新版本中确实逐渐弱化了zookeeper的作用。新的consumer使用了kafka内部的group coordination协议,也减少了对zookeeper的依赖

D. Kafka核心组件之控制器和协调器

[TOC]

我们已经知道Kafka的集群由n个的broker所组成,每个broker就是一个kafka的实例或者称之为kafka的服务。其实控制器也是一个broker,控制器也叫leader broker。

他除了具有一般broker的功能外,还负责分区leader的选取,也就是负责选举partition的leader replica。

kafka每个broker启动的时候,都会实例化一个KafkaController,并将broker的id注册到zookeeper,集群在启动过程中,通过选举机制选举出其中一个broker作为leader,也就是前面所说的控制器。

包括集群启动在内,有三种情况触发控制器选举:

1、集群启动

2、控制器所在代理发生故障

3、zookeeper心跳感知,控制器与自己的session过期

按照惯例,先看图。我们根据下图来讲解集群启动时,控制器选举过程。

假设此集群有三个broker,同时启动。

(一)3个broker从zookeeper获取/controller临时节点信息。/controller存储的是选举出来的leader信息。此举是为了确认是否已经存在leader。

(二)如果还没有选举出leader,那么此节点是不存在的,返回-1。如果返回的不是-1,而是leader的json数据,那么说明已经有leader存在,选举结束。

(三)三个broker发现返回-1,了解到目前没有leader,于是均会触发向临时节点/controller写入自己的信息。最先写入的就会成为leader。

(四)假设broker 0的速度最快,他先写入了/controller节点,那么他就成为了leader。而broker1、broker2很不幸,因为晚了一步,他们在写/controller的过程中会抛出ZkNodeExistsException,也就是zk告诉他们,此节点已经存在了。

经过以上四步,broker 0成功写入/controller节点,其它broker写入失败了,所以broker 0成功当选leader。

此外zk中还有controller_epoch节点,存储了leader的变更次数,初始值为0,以后leader每变一次,该值+1。所有向控制器发起的请求,都会携带此值。如果控制器和自己内存中比较,请求值小,说明kafka集群已经发生了新的选举,此请求过期,此请求无效。如果请求值大于控制器内存的值,说明已经有新的控制器当选了,自己已经退位,请求无效。kafka通过controller_epoch保证集群控制器的唯一性及操作的一致性。

由此可见,Kafka控制器选举就是看谁先争抢到/controller节点写入自身信息。

控制器的初始化,其实是初始化控制器所用到的组件及监听器,准备元数据。

前面提到过每个broker都会实例化并启动一个KafkaController。KafkaController和他的组件关系,以及各个组件的介绍如下图:

图中箭头为组件层级关系,组件下面还会再初始化其他组件。可见控制器内部还是有些复杂的,主要有以下组件:

1、ControllerContext,此对象存储了控制器工作需要的所有上下文信息,包括存活的代理、所有主题及分区分配方案、每个分区的AR、leader、ISR等信息。

2、一系列的listener,通过对zookeeper的监听,触发相应的操作,黄色的框的均为listener

3、分区和副本状态机,管理分区和副本。

4、当前代理选举器ZookeeperLeaderElector,此选举器有上位和退位的相关回调方法。

5、分区leader选举器,PartitionLeaderSelector

6、主题删除管理器,TopicDeletetionManager

7、leader向broker批量通信的ControllerBrokerRequestBatch。缓存状态机处理后产生的request,然后统一发送出去。

8、控制器平衡操作的KafkaScheler,仅在broker作为leader时有效。

Kafka集群的一些重要信息都记录在ZK中,比如集群的所有代理节点、主题的所有分区、分区的副本信息(副本集、主副本、同步的副本集)。每个broker都有一个控制器,为了管理整个集群Kafka选利用zk选举模式,为整个集群选举一个“中央控制器”或”主控制器“,控制器其实就是一个broker节点,除了一般broker功能外,还具有分区首领选举功能。中央控制器管理所有节点的信息,并通过向ZK注册各种监听事件来管理整个集群节点、分区的leader的选举、再平衡等问题。外部事件会更新ZK的数据,ZK中的数据一旦发生变化,控制器都要做不同的响应处理。

故障转移其实就是leader所在broker发生故障,leader转移为其他的broker。转移的过程就是重新选举leader的过程。

重新选举leader后,需要为该broker注册相应权限,调用的是ZookeeperLeaderElector的onControllerFailover()方法。在这个方法中初始化和启动了一系列的组件来完成leader的各种操作。具体如下,其实和控制器初始化有很大的相似度。

1、注册分区管理的相关监听器

2、注册主题管理的相关监听

3、注册代理变化监听器

4、重新初始化ControllerContext,

5、启动控制器和其他代理之间通信的ControllerChannelManager

6、创建用于删除主题的TopicDeletionManager对象,并启动。

7、启动分区状态机和副本状态机

8、轮询每个主题,添加监听分区变化的

9、如果设置了分区平衡定时操作,那么创建分区平衡的定时任务,默认300秒检查并执行。

除了这些组件的启动外,onControllerFailover方法中还做了如下操作:

1、/controller_epoch值+1,并且更新到ControllerContext

2、检查是否出发分区重分配,并做相关操作

3、检查需要将优先副本选为leader,并做相关操作

4、向kafka集群所有代理发送更新元数据的请求。

下面来看leader权限被取消时,调用的方法onControllerResignation

1、该方法中注销了控制器的权限。取消在zookeeper中对于分区、副本感知的相应监听器的监听。

2、关闭启动的各个组件

3、最后把ControllerContext中记录控制器版本的数值清零,并设置当前broker为RunnignAsBroker,变为普通的broker。

通过对控制器启动过程的学习,我们应该已经对kafka工作的原理有了了解, 核心是监听zookeeper的相关节点,节点变化时触发相应的操作

有新的broker加入集群时,称为代理上线。反之,当broker关闭,推出集群时,称为代理下线。

代理上线:

1、新代理启动时向/brokers/ids写数据

2、BrokerChangeListener监听到变化。对新上线节点调用controllerChannelManager.addBroker(),完成新上线代理网络层初始化

3、调用KafkaController.onBrokerStartup()处理

3.5恢复因新代理上线暂停的删除主题操作线程

代理下线:

1、查找下线节点集合

2、轮询下线节点,调用controllerChannelManager.removeBroker(),关闭每个下线节点网络连接。清空下线节点消息队列,关闭下线节点request请求

3、轮询下线节点,调用KafkaController.onBrokerFailure处理

4、向集群全部存活代理发送updateMetadataRequest请求

顾名思义,协调器负责协调工作。本节所讲的协调器,是用来协调消费者工作分配的。简单点说,就是消费者启动后,到可以正常消费前,这个阶段的初始化工作。消费者能够正常运转起来,全有赖于协调器。

主要的协调器有如下两个:

1、消费者协调器(ConsumerCoordinator)

2、组协调器(GroupCoordinator)

kafka引入协调器有其历史过程,原来consumer信息依赖于zookeeper存储,当代理或消费者发生变化时,引发消费者平衡,此时消费者之间是互不透明的,每个消费者和zookeeper单独通信,容易造成羊群效应和脑裂问题。

为了解决这些问题,kafka引入了协调器。服务端引入组协调器(GroupCoordinator),消费者端引入消费者协调器(ConsumerCoordinator)。每个broker启动的时候,都会创建GroupCoordinator实例,管理部分消费组(集群负载均衡)和组下每个消费者消费的偏移量(offset)。每个consumer实例化时,同时实例化一个ConsumerCoordinator对象,负责同一个消费组下各个消费者和服务端组协调器之前的通信。如下图:

消费者协调器,可以看作是消费者做操作的代理类(其实并不是),消费者很多操作通过消费者协调器进行处理。

消费者协调器主要负责如下工作:

1、更新消费者缓存的MetaData

2、向组协调器申请加入组

3、消费者加入组后的相应处理

4、请求离开消费组

5、向组协调器提交偏移量

6、通过心跳,保持组协调器的连接感知。

7、被组协调器选为leader的消费者的协调器,负责消费者分区分配。分配结果发送给组协调器。

8、非leader的消费者,通过消费者协调器和组协调器同步分配结果。

消费者协调器主要依赖的组件和说明见下图:

可以看到这些组件和消费者协调器担负的工作是可以对照上的。

组协调器负责处理消费者协调器发过来的各种请求。它主要提供如下功能:

组协调器在broker启动的时候实例化,每个组协调器负责一部分消费组的管理。它主要依赖的组件见下图:

这些组件也是和组协调器的功能能够对应上的。具体内容不在详述。

下图展示了消费者启动选取leader、入组的过程。

消费者入组的过程,很好的展示了消费者协调器和组协调器之间是如何配合工作的。leader consumer会承担分区分配的工作,这样kafka集群的压力会小很多。同组的consumer通过组协调器保持同步。消费者和分区的对应关系持久化在kafka内部主题。

消费者消费时,会在本地维护消费到的位置(offset),就是偏移量,这样下次消费才知道从哪里开始消费。如果整个环境没有变化,这样做就足够了。但一旦消费者平衡操作或者分区变化后,消费者不再对应原来的分区,而每个消费者的offset也没有同步到服务器,这样就无法接着前任的工作继续进行了。

因此只有把消费偏移量定期发送到服务器,由GroupCoordinator集中式管理,分区重分配后,各个消费者从GroupCoordinator读取自己对应分区的offset,在新的分区上继续前任的工作。

下图展示了不提交offset到服务端的问题:

开始时,consumer 0消费partition 0 和1,后来由于新的consumer 2入组,分区重新进行了分配。consumer 0不再消费partition2,而由consumer 2来消费partition 2,但由于consumer之间是不能通讯的,所有consumer2并不知道从哪里开始自己的消费。

因此consumer需要定期提交自己消费的offset到服务端,这样在重分区操作后,每个consumer都能在服务端查到分配给自己的partition所消费到的offset,继续消费。

由于kafka有高可用和横向扩展的特性,当有新的分区出现或者新的消费入组后,需要重新分配消费者对应的分区,所以如果偏移量提交的有问题,会重复消费或者丢消息。偏移量提交的时机和方式要格外注意!!

1、自动提交偏移量

设置 enable.auto.commit为true,设定好周期,默认5s。消费者每次调用轮询消息的poll() 方法时,会检查是否超过了5s没有提交偏移量,如果是,提交上一次轮询返回的偏移量。

这样做很方便,但是会带来重复消费的问题。假如最近一次偏移量提交3s后,触发了再均衡,服务器端存储的还是上次提交的偏移量,那么再均衡结束后,新的消费者会从最后一次提交的偏移量开始拉取消息,此3s内消费的消息会被重复消费。

2、手动提交偏移量

设置 enable.auto.commit为false。程序中手动调用commitSync()提交偏移量,此时提交的是poll方法返回的最新的偏移量。

commitSync()是同步提交偏移量,主程序会一直阻塞,偏移量提交成功后才往下运行。这样会限制程序的吞吐量。如果降低提交频次,又很容易发生重复消费。

这里我们可以使用commitAsync()异步提交偏移量。只管提交,而不会等待broker返回提交结果

commitSync只要没有发生不可恢复错误,会进行重试,直到成功。而commitAsync不会进行重试,失败就是失败了。commitAsync不重试,是因为重试提交时,可能已经有其它更大偏移量已经提交成功了,如果此时重试提交成功,那么更小的偏移量会覆盖大的偏移量。那么如果此时发生再均衡,新的消费者将会重复消费消息。

E. kafka术语和配置介绍

procer 是生产者,负责消息生产,上游程序中按照标准的消息格式组装(按照每个消息事件的字段定义)发送到指定的topic。procer生产消息的时候,不会因为consumer处理能力不够,而阻塞procer的生产。consumer会从指定的topic 拉取消息,然后处理消费,并提交offset(消息处理偏移量,消费掉的消息并不会主动删除,而是kafka系统根据保存周期自动消除)。

topic是消费分类存储的队列,可以按照消息类型来分topic存储。

replication是topic复制副本个数,用于解决数据丢失,防止leader topic宕机后,其他副本可以快代替。

broker是缓存代理,Kafka集群中的一台或多台服务器统称broker,用来保存procer发送的消息。Broker没有副本机制,一旦broker宕机,该broker的消息将都不可用。

partition是topic的物理分组,在创建topic的时候,可以指定partition 数量。每个partition是逻辑有序的,保证每个消息都是顺序插入的,而且每个消息的offset在不同partition的是唯一不同的

偏移量。kafka为每条在分区的消息保存一个偏移量offset,这也是消费者在分区的位置。比如一个偏移量是5的消费者,表示已经消费了从0-4偏移量的消息,下一个要消费的消息的偏移量是5。每次消息处理完后,要么主动提交offset,要么自动提交,把offset偏移到下一位,如处理offset=6消息。在kafka配置中,如果enable_auto_commit=True和auto_commit_interval_ms=xx,那表示每xx 毫秒自动提交偏移量

分组。是指在消费同一topic的不同consumer。每个consumer都有唯一的groupId,同一groupId 属于同一个group。不同groupId的consumer相互不影响。对于一个topic,同一个group的consumer数量不能超过 partition数量。比如,Topic A 有 16个partition,某一个group下有2个consumer,那2个consumer分别消费8个partition,而这个group的consumer数量最多不能超过16个。

kafka的配置主要分四类,分别是zookeeper、server、consumer、procer。其他的配置可以忽略。

zk的配置比较简单,也可以默认不改.dataDir是zk存储节点配置的目录地址,clientPort是zk启动的端口,默认2181,maxClientCnxns是限制ip的连接此处,设置0表示无连接次数,一般情况根据业务部署情况,配置合理的值。

F. 如何更改手机微信的消息存储位置

手机的设置——应用程序那 找到微信后可以选择放在手机或者内存卡里
也可以用360软件搬家功能

G. kafka问题求助

Kafka是由LinkedIn设计的一个高吞吐量、分布式、基于发布订阅模式的消息系统,使用Scala编写,它以可水平扩展、可靠性、异步通信和高吞吐率等特性而被广泛使用。目前越来越多的开源分布式处理系统都支持与Kafka集成,其中Spark Streaming作为后端流引擎配合Kafka作为前端消息系统正成为当前流处理系统的主流架构之一。
然而,当下越来越多的安全漏洞、数据泄露等问题的爆发,安全正成为系统选型不得不考虑的问题,Kafka由于其安全机制的匮乏,也导致其在数据敏感行业的部署存在严重的安全隐患。本文将围绕Kafka,先介绍其整体架构和关键概念,再深入分析其架构之中存在的安全问题,最后分享下Transwarp在Kafka安全性上所做的工作及其使用方法。
Kafka架构与安全
首先,我们来了解下有关Kafka的几个基本概念:
Topic:Kafka把接收的消息按种类划分,每个种类都称之为Topic,由唯一的Topic Name标识。
Procer:向Topic发布消息的进程称为Procer。
Consumer:从Topic订阅消息的进程称为Consumer。
Broker:Kafka集群包含一个或多个服务器,这种服务器被称为Broker。
Kafka的整体架构如下图所示,典型的Kafka集群包含一组发布消息的Procer,一组管理Topic的Broker,和一组订阅消息的Consumer。Topic可以有多个分区,每个分区只存储于一个Broker。Procer可以按照一定的策略将消息划分给指定的分区,如简单的轮询各个分区或者按照特定字段的Hash值指定分区。Broker需要通过ZooKeeper记录集群的所有Broker、选举分区的Leader,记录Consumer的消费消息的偏移量,以及在Consumer Group发生变化时进行relalance. Broker接收和发送消息是被动的:由Procer主动发送消息,Consumer主动拉取消息。

然而,分析Kafka框架,我们会发现以下严重的安全问题:
1.网络中的任何一台主机,都可以通过启动Broker进程而加入Kafka集群,能够接收Procer的消息,能够篡改消息并发送给Consumer。
2.网络中的任何一台主机,都可以启动恶意的Procer/Consumer连接到Broker,发送非法消息或拉取隐私消息数据。
3.Broker不支持连接到启用Kerberos认证的ZooKeeper集群,没有对存放在ZooKeeper上的数据设置权限。任意用户都能够直接访问ZooKeeper集群,对这些数据进行修改或删除。
4.Kafka中的Topic不支持设置访问控制列表,任意连接到Kafka集群的Consumer(或Procer)都能对任意Topic读取(或发送)消息。
随着Kafka应用场景越来越广泛,特别是一些数据隐私程度较高的领域(如道路交通的视频监控),上述安全问题的存在犹如一颗定时炸弹,一旦内网被黑客入侵或者内部出现恶意用户,所有的隐私数据(如车辆出行记录)都能够轻易地被窃取,而无需攻破Broker所在的服务器。
Kafka安全设计
基于上述分析,Transwarp从以下两个方面增强Kafka的安全性:
身份认证(Authentication):设计并实现了基于Kerberos和基于IP的两种身份认证机制。前者为强身份认证,相比于后者具有更好的安全性,后者适用于IP地址可信的网络环境,相比于前者部署更为简便。
权限控制(Authorization):设计并实现了Topic级别的权限模型。Topic的权限分为READ(从Topic拉取数据)、WRITE(向Topic中生产数据)、CREATE(创建Topic)和DELETE(删除Topic)。
基于Kerberos的身份机制如下图所示:

Broker启动时,需要使用配置文件中的身份和密钥文件向KDC(Kerberos服务器)认证,认证通过则加入Kafka集群,否则报错退出。
Procer(或Consumer)启动后需要经过如下步骤与Broker建立安全的Socket连接:
1.Procer向KDC认证身份,通过则得到TGT(票证请求票证),否则报错退出
2.Procer使用TGT向KDC请求Kafka服务,KDC验证TGT并向Procer返回SessionKey(会话密钥)和ServiceTicket(服务票证)
3.Procer使用SessionKey和ServiceTicket与Broker建立连接,Broker使用自身的密钥解密ServiceTicket,获得与Procer通信的SessionKey,然后使用SessionKey验证Procer的身份,通过则建立连接,否则拒绝连接。
ZooKeeper需要启用Kerberos认证模式,保证Broker或Consumer与其的连接是安全的。
Topic的访问控制列表(ACL)存储于ZooKeeper中,存储节点的路径为/acl/<topic>/<user>,节点数据为R(ead)、W(rite)、C(reate)、D(elete)权限的集合,如/acl/transaction/jack节点的数据为RW,则表示用户jack能够对transaction这个topic进行读和写。
另外,kafka为特权用户,只有kafka用户能够赋予/取消权限。因此,ACL相关的ZooKeeper节点权限为kafka具有所有权限,其他用户不具有任何权限。
构建安全的Kafka服务
首先,我们为Broker启用Kerberos认证模式,配置文件为/etc/kafka/conf/server.properties,安全相关的参数如下所示:

其中,authentication参数表示认证模式,可选配置项为simple, kerberos和ipaddress,默认为simple。当认证模式为kerberos时,需要额外配置账户属性principal和对应的密钥文件路径keytab.
认证模式为ipaddress时,Procer和Consumer创建时不需要做任何改变。而认证模式为kerberos时,需要预先创建好相应的principal和keytab,并使用API进行登录,样例代码如下所示:
public class SecureProcer extends Thread {
private final kafka.javaapi.procer.Procer<Integer, String> procer;
private final String topic;
private final Properties props = new Properties();
public SecureProcer(String topic) {
AuthenticationManager.setAuthMethod(“kerberos”);
AuthenticationManager.login(“procer1″, “/etc/procer1.keytab”);
props.put(“serializer.class”, “kafka.serializer.StringEncoder”);
props.put(“metadata.broker.list”,
“172.16.1.190:9092,172.16.1.192:9092,172.16.1.193:9092″);
// Use random partitioner. Don’t need the key type. Just set it to Integer.
// The message is of type String.
procer = new kafka.javaapi.procer.Procer<Integer, String>(
new ProcerConfig(props));
this.topic = topic;
}
. . .
Topic权限管理
Topic的权限管理主要是通过AuthorizationManager这个类来完成的,其类结构如下图所示:

其中,resetPermission(user, Permissions, topic) 为重置user对topic的权限。
grant(user, Permissions, topic) 为赋予user对topic权限。
revoke(user, Permissions, topic) 为取消user对topic权限。
isPermitted(user, Permissions, topic) 为检查user对topic是否具有指定权限。
调用grant或revoke进行权限设置完成后,需要commit命令提交修改到ZooKeeper
Kerberos模式下,AuthorizationManager需要先使用AuthenticationManager.login方法登录,与ZooKeeper建立安全的连接,再进行权限设置。示例代码如下所示:
public class AuthzTest {
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty(“authentication”, “kerberos”);
props.setProperty(“zookeeper.connect”, “172.16.2.116:2181,172.16.2.117:2181,172.16.2.118:2181″);
props.setProperty(“principal”, “kafka/host1@TDH”);
props.setProperty(“keytab”, “/usr/lib/kafka/config/kafka.keytab”);
ZKConfig config = new ZKConfig(props);
AuthenticationManager.setAuthMethod(config.authentication());
AuthenticationManager.login(config.principal(), config.keytab());
AuthorizationManager authzManager = new AuthorizationManager(config);
// reset permission READ and WRITE to ip 172.16.1.87 on topic test
authzManager.resetPermission(“172.16.1.87″,
new Permissions(Permissions.READ, Permissions.WRITE), “test”);
// grant permission WRITE to ip 172.16.1.87 on topic test
authzManager.grant(“172.16.1.87″, new Permissions(Permissions.CREATE), “test”);
// revoke permission READ from ip 172.16.1.87 on topic test
authzManager.revoke(“172.16.1.87″, new Permissions(Permissions.READ), “test”);
// commit the permission settings
authzManager.commit();
authzManager.close();
}
}

ipaddress认证模式下,取消和赋予权限的操作如下所示:
public class AuthzTest {
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty(“authentication”, “ipaddress”);
props.setProperty(“zookeeper.connect”,
“172.16.1.87:2181,172.16.1.88:2181,172.16.1.89:2181″);
ZKConfig config = new ZKConfig(props);
// new authorization manager
AuthorizationManager authzManager = new AuthorizationManager(config);
// reset permission READ and WRITE to ip 172.16.1.87 on topic test
authzManager.resetPermission(“172.16.1.87″,
new Permissions(Permissions.READ, Permissions.WRITE), “test”);
// grant permission WRITE to ip 172.16.1.87 on topic test
authzManager.grant(“172.16.1.87″, new Permissions(Permissions.CREATE), “test”);
// revoke permission READ from ip 172.16.1.87 on topic test
authzManager.revoke(“172.16.1.87″, new Permissions(Permissions.READ), “test”);
// commit the permission settings
authzManager.commit();
authzManager.close();
}
}
总结与展望
本文通过介绍Kafka现有架构,深入挖掘其中存在的安全问题,并给出Transwarp在Kafka安全上所做的工作及其使用方式。然而,纵观Hadoop & Spark生态系统,安全功能还存在很多问题,各组件的权限系统独立混乱,缺少集中易用的账户管理系统。某些组件的权限管理还很不成熟,如Spark的调度器缺少用户的概念,不能限制具体用户使用资源的多少。Transwarp基于开源版本,在安全方面已有相当多的积累,并持续改进开发,致力于为企业用户提供一个易用、高效、安全和稳定的基础数据平台。

H. kafka offset的存储

offset即消费消息的偏移值,记录了kafka每个consumergroup的下一个需要读取消费位置,保障其消息的消费可靠性。

kafka0.8.1.1以前,offset保存在zk中,存放在/consumers节点下。但是由于频繁访问zk,zk需要一个一个节点更新offset,不能批量或分组更新,导致offset更新成了瓶颈。后续两个过渡版本增加了参数“offsets.storage”,该参数可配置为“zookeeper”或“kafka”分别表示offset的保持位置在zk或是broker,默认保留在zk,0.9版本以后offset就默认保存在broker下。若配置的“kafka”,当设置了“al.commit.enabled”参数时,offset仍然可以提交到zk。
zk中保存offset结构为:

注意:由于kafka对客户端client向下兼容,低版本的client仍然能够通过链接zk消费数据,并提交offset数据,即使broker版本高于0.9,提交的offset仍然保存在zk;此时仍然存在offset更新瓶颈问题,所以建议尽量使用高版本client,通过链接broker方式消费数据。

例如:kafka broker版本2.6.0,consumer版本0.8.2.1:

构建consumer:

启动消费者消费全部10条历史消息,查看zk下/consumer节点的消费者信息:

可以看到group “test_group1”对topic “test1”的3个partition消费情况,offset分别为6,2,2。
这里kafka只记录了每个group的消费情况,没有对某一个consumer做单独记录。早期版本/ids节点记录consumer id信息,owner节点记录各个partition所属consumer信息

如上所述,新版本中offset由broker维护,offset信息由一个特殊的topic “ __consumer_offsets”来保存,offset以消息形式发送到该topic并保存在broker中。这样consumer提交offset时,只需连接到broker,不用访问zk,避免了zk节点更新瓶颈。
broker消息保存目录在配置文件server.properties中:

该目录下默认包含50个以__consumer_offsets开头的目录,用于存放offset:

offset的存放位置决定于groupid的hash值,其获取方式:

其中numPartitions由offsets.topic.num.partitions参数决定,默认值即50。以groupid “test-group”为例,计数其存储位置为:__consumer_offsets-12,当其消费全部10条数据后,使用命令查看该目录下消息记录:kafka-console-consumer --bootstrap-server localhost:9092 --topic __consumer_offsets --partition 12 --from-beginning --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'

该数据结构为以groupid-topic-partition作为key,value为OffsetAndMetadata,其中包含了offset信息。可以看到group“test-group”在topic“test1”的三个partition下offset值分别为6,2,2。同保存在zk数据一样,offset只记录groupid的消费情况,对于具体consumer是透明的。

那么offset具体被发送给哪个broker保存呢?
由上文可知,offset的存储分区是通过groupid的hash值取得的,那么offset发送的broker就是该分区的leader broker,这也符合kafka普通消息的发生逻辑。所以,每个group的offset都将发生到一个broker,broker中存在一个offset manager 实例负责接收处理offset提交请求,并返回提交操作结果。

参考:
https://wanwenli.com/kafka/2016/11/04/Kafka-Group-Coordinator.html
https://cwiki.apache.org/confluence/display/KAFKA/Offset+Management

I. qq消息记录更改了存储位置怎么办

1、 在想要存储的区域新建文件夹,如: E:\099 Chat Data\Tencent Files\。

在这里插入图片描述
2、 打开电脑QQ,设置——文件管理。

在这里插入图片描述
3、 点击浏览,选择099 Chat Data下的Tencent Files,注意这里与微信的选择不同,微信是选择WeChat Files上一级文件夹后自动生成WeChat Files文件夹,而QQ是预先手动新建Tencent Files文件夹。

J. 如何解决doc文档存储后内容位置发生变化

给你发了消息,请查收一下