Kafka_高可用
为了更好的做负载均衡,Kafka 尽量将所有的 Partition 均匀分配到整个集群上。一个典型的部署方式是一个 Topic 的 Partition 数量大于 Broker 的数量。同时为了提高 Kafka 的容错能力,也需要将同一个 Partition 的 Replica 尽量分散到不同的机器。实际上,如果所有的 Replica 都在同一个 Broker 上,那一旦该 Broker 宕机,该 Partition 的所有 Replica 都无法工作,也就达不到 HA 的效果。同时,如果某个 Broker 宕机了,需要保证它上面的负载可以被均匀的分配到其它幸存的所有 Broker 上。
Kafka分配Replica的算法如下:
- 将所有
Broker(假设共n个Broker)和待分配的Partition排序。 - 将第
i个Partition分配到第(i mod n )个Broker上。 - 将第
i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上。
Producer在发布消息到某个 Partition 时,先通过 ZooKeeper 找到该 Partition 的 Leader ,然后无论该 Topic 的 ReplicationFactor 为多少,Producer 只将该消息发送到该 Partition 的 Leader 。Leader 会将该消息写入其本地 Log。每个 Follower 都从 Leader Pull 数据。这种方式上,Follower 存储的数据顺序与 Leader 保持一致。Follower 在收到该消息并写入其 Log 后,向 Leader 发送 ACK。一旦 Leader 收到了 ISR 中的所有 Replica 的ACK ,该消息就被认为已经 Commit了,Leader 将增加 HW 并且向 Producer 发送 ACK。
为了提高性能,每个 Follower 在接收到数据后就立马向 Leader 发送 ACK,而非等到数据写入 Log 中。因此,对于已经 Commit的消息,Kafka 只能保证它被存于多个 Replica 的内存中,而不能保证它们被持久化到磁盘中,也就不能完全保证异常发生后该条消息一定能被 Consumer 消费。
Consumer 读消息也是从 Leader 读取,只有被 Commit 过的消息才会暴露给 Consumer。

对于Kafka而言,定义一个Broker是否“活着”包含两个条件:
- 1是它必须维护与
ZooKeeper的Session(这个通过ZooKeeper的Heartbeat机制来实现)。 - 2是
Follower必须能够及时将Leader的消息复制过来,不能“落后太多”。
Leader会跟踪与其保持同步的 Replica 列表,该列表称为 ISR(即 In-Sync Replica )。如果一个 Follower 宕机,或者落后太多,Leader 将把它从 ISR 中移除。这里所描述的“落后太多”指 Follower 复制的消息落后于 Leader 后的条数超过预定值(该值可在$KAFKA_HOME/config/server.properties 中通过 replica.lag.max.messages 配置,其默认值是 4000)或者 Follower 超过一定时间(该值可在 $KAFKA_HOME/config/server.properties 中通过replica.lag.time.max.ms 来配置,其默认值是10000 )未向Leader 发 Fetch请求。
Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,完全同步复制要求所有能工作的 Follower 都复制完,这条消息才会被认为 Commit,这种复制方式极大的影响了吞吐率(高吞吐率是 Kafka 非常重要的一个特性)。而异步复制方式下,Follower 异步的从 Leader 复制数据,数据只要被 Leader 写入 Log就被认为已经 Commit,这种情况下如果 Follower 都复制完都落后于 Leader,而如果 Leader 突然宕机,则会丢失数据。而 Kafka 的这种使用 ISR 的方式则很好的均衡了确保数据不丢失以及吞吐率 Follower 可以批量的从 Leader 复制数据,这样极大的提高复制性能(批量写磁盘),极大减少了 Follower 与 Leader 的差距。
需要说明的是,Kafka 只解决 Fail/Recover,不处理“Byzantine ”(“拜占庭”)问题。一条消息只有被 ISR 里的所有 Follower 都从 Leader 复制过去才会被认为已提交。这样就避免了部分数据被写进了 Leader,还没来得及被任何Follower 复制就宕机了,而造成数据丢失( Consumer 无法消费这些数据)。而对于Producer 而言,它可以选择是否等待消息 Commit,这可以通过 request.required.acks 来设置。这种机制确保了只要ISR有一个或以上的 Follower,一条被Commit 的消息就不会丢失。
Leader选举本质上是一个分布式锁,有两种方式实现基于ZooKeeper的分布式锁:
- 节点名称唯一性:多个客户端创建一个节点,只有成功创建节点的客户端才能获得锁。
- 临时顺序节点:所有客户端在某个目录下创建自己的临时顺序节点,只有序号最小的才获得锁。
一种非常常用的选举 Leader的方式是“Majority Vote ”(“少数服从多数”),但 Kafka 并未采用这种方式。这种模式下,如果我们有 2f+1 个 Replica(包含 Leader 和 Follower ),那在Commit 之前必须保证有 f+1 个 Replica复制完消息,为了保证正确选出新的 Leader, Fail的 Replica 不能超过 f 个。因为在剩下的任意 f+1 个 Replica 里,至少有一个 Replica 包含有最新的所有消息。这种方式有个很大的优势,系统的 Latency 只取决于最快的几个 Broker,而非最慢那个。Majority Vote也有一些劣势,为了保证 Leader Election 的正常进行,它所能容忍的 Fail 的 Follower 个数比较少。如果要容忍 1 个Follower 挂掉,必须要有 3 个以上的 Replica,如果要容忍 2 个Follower 挂掉,必须要有 5 个以上的 Replica 。也就是说,在生产环境下为了保证较高的容错程度,必须要有大量的 Replica,而大量的 Replica 又会在大数据量下导致性能的急剧下降。这就是这种算法更多用在 ZooKeeper 这种共享集群配置的系统中而很少在需要存储大量数据的系统中使用的原因。例如 HDFS 的HA Feature 是基于 Majority-Vote-Basedjournal,但是它的数据存储并没有使用这种方式。
Kafka 在 ZooKeeper 中动态维护了一个 ISR( In-Sync Replicas),这个 ISR 里的所有 Replica 都跟上了 Leader,只有 ISR 里的成员才有被选为 Leader 的可能。在这种模式下,对于 f+1 个 Replica,一个 Partition 能在保证不丢失已经 Commit的消息的前提下容忍 f 个 Replica 的失败。在大多数使用场景中,这种模式是非常有利的。事实上,为了容忍 f 个 Replica的失败,Majority Vote 和 ISR 在 Commit 前需要等待的 Replica 数量是一样的,但是 ISR 需要的总的 Replica 的个数几乎是 Majority Vote 的一半。虽然 Majority Vote 与 ISR 相比有不需等待最慢的 Broker 这一优势,但是 Kafka 作者认为 Kafka 可以通过 Producer 选择是否被 Commit 阻塞来改善这一问题,并且节省下来的 Replica 和磁盘使得 ISR 模式仍然值得。
在 ISR 中至少有一个 Follower 时, Kafka 可以确保已经 Commit 的数据不丢失,但如果某个 Partition 的所有 Replica 都宕机了,就无法保证数据不丢失了。这种情况下有两种可行的方案:
- 1.等待
ISR中的任一个Replica“活”过来,并且选它作为Leader。 - 2.选择第一个“活”过来的
Replica(不一定是ISR中的)作为Leader。
这就需要在可用性和一致性当中作出一个简单的折衷。如果一定要等待 ISR 中的 Replica“活”过来,那不可用的时间就可能会相对较长。而且如果 ISR 中的所有 Replica 都无法“活”过来了,或者数据都丢失了,这个 Partition将永远不可用。选择第一个“活”过来的 Replica 作为 Leader ,而这个 Replica 不是 ISR 中的 Replica ,那即使它并不保证已经包含了所有已 Commit 的消息,它也会成为 Leader而作为 Consumer的数据源(前文有说明,所有读写都由 Leader完成)。Kafka0.8.* 使用了第二种方式。根据 Kafka的文档,在以后的版本中, Kafka 支持用户通过配置选择这两种方式中的一种,从而根据不同的使用场景选择高可用性还是强一致性。
最简单最直观的方案是,所有Follower 都在 ZooKeeper 上设置一个Watch ,一旦 Leader 宕机,其对应的 Ephemeral Znode 会自动删除,此时所有 Follower 都尝试创建该节点,而创建成功者(ZooKeeper 保证只有一个能创建成功)即是新的Leader,其它 Replica 即为 Follower 。
但是该方法会有3个问题:
- 1.
Split-Brain这是由ZooKeeper的特性引起的,虽然ZooKeeper能保证所有Watch按顺序触发,但并不能保证同一时刻所有Replica“看”到的状态是一样的,这就可能造成不同Replica的响应不一致。 - 2.
Herd Effect如果宕机的那个Broker上的Partition比较多,会造成多个Watch被触发,造成集群内大量的调整。 - 3.
ZooKeeper负载过重 每个Replica都要为此在ZooKeeper上注册一个Watch,当集群规模增加到几千个Partition时ZooKeeper负载会过重。
Kafka 0.8.* 的 Leader Election 方案解决了上述问题,它在所有 Broker 中选出一个 Controller ,所有 Partition 的 Leader 选举都由 Controller 决定。controller 会将 Leader 的改变直接通过RPC 的方式(比ZooKeeper Queue 的方式更高效)通知需为为此作为响应的 Broker 。同时 controller 也负责增删 Topic 以及 Replica 的重新分配。