Kafka
基础
介绍
Kafka 是一个分布式,多分区,多副本,多生产者,多订阅者的日志系统。
- 特性
- 每个 partition 内的消息顺序传输。
- 两种主要的消息传递模式:点对点传递模式、发布-订阅模式。
- 只有消息的拉取,没有推送,可以通过轮询实现消息的推送。
架构
- 消息(Message)
Kafka 的数据单元称为消息,可以类比数据库内的 ‘数据行’ 或 ‘一条记录’,消息由字节组成。消息有键,键也是一个字节数组,当消息以一种可控的方式写入不同分区时,会用到键。
- 批次(Batch)
消息被分批写入 Kafka,批次就是一组消息,这些消息属于同一个主题和分区。
- 主题(Topic)
Kafka 内的消息通过主题进行分类,主题可以看作一类消息的集合。
- 分区(Partition)
主题可以被分为若干分区,一个主题通过分区分布于 Kafka 集群中。(每个分区上的数据是不一样的,它们共同构成 Kafka 主题上的数据。)每个主题至少有一个分区。每个分区中的数据使用多个 Segment 文件存储。分区中的数据是有序的,不同分区间的数据丢失了数据的顺序。如果主题有多个分区,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将分区数目设为 1。
- 生产者(Producer)
生产者负责把消息均衡地分布到主题的所有分区上,可以通过下面几种方式:
- 直接指定消息的分区
- 根据消息的 key 散列取模计算分区
- 轮询指定分区
- 消费者(Consumer)
消费者负责消费数据,根据数据偏移量来区分消息是否已读。
- 消费组(Consumer Group)
多个消费者消费同一个主题不同分区上的数据,那么这些消费者就可以加入到同一消费组。
消费组是逻辑上的概念,是 Kafka 实现单播和广播两种消息模型的手段。
消费组可以保证一个消费组获取到特定主题的全部的消息。并且在消费组内部,若干个消费者消费主题分区的消息,同时也可以保证一个主题的每个分区只被消费组中的一个消费者消费。
- Broker
一个独立的 Kafka 服务器。
- 集群(Cluster)
多个 Kafka 服务器组成的服务。
- 副本(Replica)
为实现数据备份功能,一个主题的每个分区都有若干个副本,一个 Leader 副本和若干个 Follower 副本。
生产者
数据生产流程
- Producer 在创建的时候,会创建一个 Sender 线程并设置为守护线程。
- 生产消息时,内部其实是异步流程,消息的主要流程为:拦截器 -> 序列化器 -> 分区器,然后将消息缓存在缓冲区(该缓冲区也是在 Producer 创建时创建)。
- 缓冲区数据大小达到 batch.size 或者 linger.ms 的上限,(哪个先达到就算哪个)批次发送消息到分区。
- 批次发送消息到分区后,消息会落盘到 Broker;如果生产者配置了 retrires 参数大于 0 并且失败原因允许重试,那么客户端内部会对该消息进行重试。
- 落盘到 Broker 成功,返回生产元数据给生产者。(元数据返回有两种方式:一种是通过阻塞直接返回,另一种是通过回调返回。)
生产者配置
参考如下文档地址:
https://kafka.apachecn.org/documentation.html#producerconfigs
消费者
消费者从订阅的主题内消费数据,消费的数据偏移量保存在 Kafka 内的 __consumer_offsets 主题中。
多个从同一个主题消费的消费者可以加入到一个消费组中。
Kafka 消费数据采用的是 Pull 的方式,(如果采取 Push 的模式,那么最大的缺点就是 Broker 不清楚消费者的消费速度,并且推送速率是 Broker 控制的,这样就很容易造成消息堆积)选择 Pull 模式,这时消费者可以根据自己的情况来拉取数据,也可以进行延迟处理。
如果 Broker 没有消息,那么每次消费者拉取的数据都是空数据,会一直循环返回空数据,针对这个问题,消费者在每次消费数据的时候,都会传递一个参数 timeout ,当返回空数据的时候,会进行阻塞,需要等待 timeout 再去消费,直到数据到达。
分区策略
轮询策略
将消息顺序分配到各个分区中,假设一个主题下有三个分区,第一条消息将被发送到分区 0,第二条消息被发送到分区 1,第三条消息被发送到分区 2,以此类推,第四条消息将被发送到分区 0。
随机策略
将消息随机地放到任意一个分区上,本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以如果追求数据的均匀分布,还是使用轮询策略比较好。事实上,随机策略是老版本生产者使用的分区策略,在新版本中已经改为轮询了。
按消息键保存策略
Kafka 允许为每条消息创建消息键,称为 Key,每个 Key 可以代表具体的业务含义,如业务 Id,用户 Id 等。一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略。
再平衡
再平衡是一个协议,规定了如何让消费者组下的所有消费者来分配主题中的每一个分区。
触发再平衡的条件有三个:
- 消费者组内成员发生变更,这个变更包括了增加和减少消费者,比如消费者宕机退出消费组。
- 主题的分区数发生变更,Kafka 目前只支持增加分区,当增加的时候就会触发重平衡。
- 订阅的主题发生变化,当消费者组使用正则表达式订阅主题,而恰好又新建了对应的主题,就会触发重平衡。
要尽量避免再平衡
因为再平衡的过程中,消费者无法从 Kafka 消费消息,这对 Kafka 的性能影响极大,而如果 Kafka 集群内节点较多,比如数百个,那再平衡可能会耗时极多。
消费者故障是引起再平衡最常见的方式。
除去正常的消费者挂掉(网络问题),在实际中,会存在 Kafka 错误地认为一个正常的消费者已经挂掉的情况,应尽量避免这样的情况出现。
在分布式系统中,通常通过心跳来维持分布式系统,但由于会存在负载和网络的阻塞,所以一般需要控制心跳的超时时间,在 Kafka 内通过参数
session.timout.ms
来控制这个超时时间。同时,参数
heartbeat.interval.ms
可控制发送心跳的频率,频率越高越不容易被误判,但也会消耗更多资源。除此之外,参数
max.poll.interval.ms
也可以用来控制。消费者每次拉取数据后,都需要一些时间来处理,再进行下一次拉取,两次拉取的时间间隔如果超过这个参数的设置,那么消费者就会被踢出消费者组。
- session.timout.ms -> 控制心跳超时时间,一般配置为 6s
- heartbeat.interval.ms -> 控制心跳发送频率,一般配置为 2s
- max.poll.interval.ms -> 控制消费者两个拉取数据的时间间隔,一般配置为消费者处理最长耗时 + 60s
消费者配置
https://kafka.apachecn.org/documentation.html#newconsumerconfigs
Broker
分区
Kafka 集群包含若干个 Broker,broker.id 指定 Broker 的编号,编号不能重复;Kafka 集群上创建的主题,又包含若干个分区,每个分区又包含若干个副本,副本又包括 Leader 副本和 Follower 副本。
Kafka 分区选举的过程如下:
- Kafka 使用 Zookeeper 的分布式锁选举控制器,并在节点加入集群或者退出集群时通知控制器。
- 控制器负责在节点加入或者离开时进行分区 Leader 选举。
- 控制器使用版本号来避免脑裂。(脑裂:两个节点同时认为自己是当前控制器)
副本
- 创建主题的时候可以指定副本数(replication-factor 配置),但副本数不能超过 broker 的数量。
- 生产者负责把消息发送给 Leader,Leader 负责读写,Follower 定期到 Leader 上 pull 数据。
- Leader 会维护当前活跃副本列表(ISR),如果一个 Follower 落后太多,Leader 会将它从 ISR 中移除。(落后的意思是:Follower 长时间没有向 Leader 发送心跳(时间有 replica.lag.time.max.ms 配置,默认10000ms))
日志存储
日志文件
Kafka 的消息根据主题进行归类,每个主题又可分为一个或多个分区,每个分区各自存在一个消费数据的日志文件。
在各个分区文件中,会存在多种类型的文件,如下图:
其中重点关注以下三种类型的文件:
- .index 偏移量索引文件,用于记录消息偏移量与物理地址之间的映射关系。
- .timestamp 时间戳索引文件,根据时间戳查找对应的偏移量。
- .log 日志文件
每一个主题每个分区下的日志文件,都带有一个基准偏移量,表示当前第一条消息的 offset。(偏移量是一个 64 位的长整形数,固定是20位数字,长度未达到,用 0 进行填补,索引文件和日志文件都由该作为文件名命名规则,如 日志文件名为 00000000000000000121.log,则当前日志文件的一条数据偏移量就是121(偏移量从 0 开始)。)
- 日志文件切分的条件
(满足以下一个条件就会切分)
- 日志分段文件大小超过 broker 端参数 log.segment.bytes 配置的值。(log.segment.bytes 参数的默认值为1G。)
- 日志分段中消息最大时间戳与当前系统的时间戳的差值大于 log.roll.ms 或 log.roll.hours 参数配置的值。(如果同时配置了 log.roll.ms 和 log.roll.hours 参数,那么 log.roll.ms 的优先级高。默认情况下,只配置了 log.roll.hours 参数,其值为 7 天)
- 偏移量索引文件或时间戳索引文件的大小达到 broker 端参数 log.index.size.max.bytes配置的值。(log.index.size.max.bytes 的默认值为 10MB。)
- 追加消息的偏移量与当前日志分段的偏移量之间的差值大于 Integer.MAX_VALUE ,即要追加的消息的偏移量不能转变为相对偏移量。
经验
Kafka 到底会不会丢数据
Kafka 只对已提交的消息做最大限度的持久化保证不丢失。
- 已提交
当 Kafka 中的 Broker 成功收到一条消息并写入到日志文件后,那么这个消息在 Kafka 中就变成已经提交。(这个可以选择只要一个 Broker 成功还是所有 Broker 都成功消息才算已经提交)
- 最大限度的持久化保证不丢失
Kafka 并不能保证在任何情况下都能做到数据不丢失,不丢失数据是有前提条件的,例如消息保存在多个 Broker 上,那么这些 Broker 至少要保证有一个是存活的。
发送端
- 消息根本就没发送到 Broker 上
- 网络问题:由于网络抖动导致数据根本就没发送到 Broker 端。
- 数据原因:消息体太大超出 Broker 的承受范围导致 Broker 拒收消息。
另外可以通过配置发送端的参数 acks 来确认消息是否生产成功。
- acks = 0:数据发送后就自认为发送成功,这时如果发生网络抖动, Producer 端并不会校验数据自然也就丢了,且无法重试。
- acks = 1:消息发送给 Leader 接收成功就表示发送成功,这时只要 Leader 没有挂掉,就可以保证 Leader 不丢数据,但是如果 Leader 异常挂掉了, Follower 还未同步完数据,这时就会丢数据。
- acks = -1 或者 all: 消息发送需要等待同步列表 (ISR) 中 Leader 和 所有的 Follower 都确认收到消息才算发送成功, 可靠性最高, 但也不能保证不丢数据,比如当同步列表 (ISR) 中只剩下 Leader 了, 这样就变成 acks = 1 的情况了。
解决方案
- 使用带回调通知函数的方法进行发送消息,即这样一旦发现发送失败, 就可以做针对性处理。
- 设置 acks = -1/ all。
- 设置消息发送的重试次数(retries,2.4 版本默认为 Integer.MAX_VALUE)。
- 设置消息发送的重试时间(retry.backoff.ms 默认为 100ms)
Broker 端
Broker 接收到数据后会将数据进行持久化存储到磁盘,为了提高吞吐量和性能,采用的是异步批量刷盘的策略,也就是按照一定消息和间隔时间进行刷盘。
首先将数据存储到 PageCache 中,至于什么时候将 Cache 中的数据刷盘是由操作系统根据自己的策略决定或者调用 fsync 命令进行强制刷盘,如果此时 Broker 挂掉,且选举一个落后 Leader 很多的 Follower 成为新的 Leader ,那么落后的消息数据就会丢失。
解决方案
- 设置 unclean.leader.election.enable = false。这个参数表示有哪些 Follower 可以有资格被选举为 Leader,如果一个 Follower 的数据落后 Leader 太多,那么一旦它被选举为新的 Leader, 数据就会丢失,我们将其设置为 false,防止此类情况发生。
- 设置 replication.factor >= 3,设置分区副本个数,设置大于3,保证如果有 Leader 异常,Follower 会被选举为新的 Leader。
- 设置 min.insync.replicas > 1,该参数表示消息至少要被写入成功到同步列表(ISR) 多少个副本才算已提交,设置大于1,这样才可以提升消息持久性,保证数据不丢失。
- 另外确保分区副本的设置要大于最小同步列表的设置(replication.factor > min.insync.replicas),如果相等,只要有一个副本异常挂掉,整个分区就无法工作了。
消费端
- 拉取消息后先提交 Offset ,后处理消息。
如果此时处理消息的时候宕机,由于 Offset 已经提交,待消费者重启之后,会从已经提交的 Offset 下一个位置重新开始消费,之前未处理完成的消息不会再被处理,此时数据丢失。
- 拉取消息后先处理消息,在进行提交 Offset。
如果此时在提交之前发生异常宕机,由于没有提交成功 Offset, 待下次重启消费者后还会从上次的 Offset 重新拉取消息,不会出现消息丢失的情况, 但是会出现重复消费的情况,这里只能业务自己保证幂等性。
解决方案
- 设置 enable.auto.commit = false, 采用手动提交位移的方式。
- 拉取消息后先处理消息,在进行提交 Offset。
- 业务自己保证幂等性,确保只成功消费一次即可。
Kafka 添加认证
版本: kafka_2.13-2.7.2.jar
/kafka/config
- 修改 server.properties
# 配置监听
listeners=SASL_PLAINTEXT://172.16.0.201:9092
advertised.listeners=SASL_PLAINTEXT://172.16.0.201:9092
# 使用认证协议
security.inter.broker.protocol=SASL_PLAINTEXT
# SASL机制
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
# 完成身份验证的类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# 如果没有找到ACL(访问控制列表)配置,则允许任何操作。
allow.everyone.if.no.acl.found=true
# 配置超级用户, 作用在集群之前的通信
super.users=User:admin
- 修改 zookeeper.properties
# 开启认证
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
# 认证规则
requireClientAuthScheme=sasl
# 延长登录时间
jaasLoginRenew=3600000
- 新增 kafka_server_jaas.conf 配置文件
- username 和 password 用于集群服务器之间的通信
- 定义 data 用户,密码为 123456 用于 KafKa 服务之间的通信
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_data="123456";
};
- 新增 kafka_client_jaas.conf 配置文件
- 这里的 username 和 password 对应的是配置文件 kafka_server_jaas.conf 自定义的用于 KafKa 服务之前的通信的用户名和密码
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="data"
password="123456";
};
- 新增 zookeeper_jaas.conf 配置文件
- username 和 password 用于集群服务器之间的通信
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_data="123456";
};
/kafka/bin
- 修改 kafka-server-start.sh
- 在 exec base_dir/kafka-run-class.sh EXTRA_ARGS kafka.Kafka “$@” 之前添加如下代码
if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf"
fi
- 修改 kafka-console-producer.sh
- 在 exec (dirname 0)/kafka-run-class.sh kafka.tools.ConsoleProducer “$@” 文件之前添加如下代码
if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_client_jaas.conf"
fi
- 修改 kafka-console-consumer.sh
- 在exec base_dir/kafka-run-class.sh EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain “$@” 文件之前添加如下代码
if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_client_jaas.conf"
fi
- 修改 zookeeper-server-start.sh
- 在 base_dir/kafka-run-class.sh EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain “$@” 文件之前添加如下代码
if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/zookeeper_jaas.conf"
fi
SpringBoot 添加对应的配置
properties:
security:
protocol: SASL_PLAINTEXT
sasl:
mechanism: PLAIN
jaas:
config: org.apache.kafka.common.security.plain.PlainLoginModule required username="data" password="123456";