Kafka 教程 | 基础架构

Posted by Aiden on September 10, 2019

基本架构

了解Kafka之前,您必须了解TopicBrockerProducerConsumer 等主要术语。

image.png

一个 Topic 对应一个消息队列。

Producer 主要负责将数据发送到 kafka 对应的 topic 中去。Consumer 主要负责从 Topic 中去接收消息。

Partition 概念


我们知道 Kafka 的目标是大数据,如果将消息存在一个“中心”队列中,势必缺少可伸缩性。无论是生产者/消费者数目的增加,还是消息数量的增加,都可能耗尽机器的性能或存储。

因此,Kafka 在概念上将一个 Topic 分成了多个 Partition,写入 Topic 的消息会被(平均)分配到其中一个 PartitionPartition 中会为消息保存一个Partition内唯一的 ID ,一般称为偏移量(offset) 。这样当性能/存储不足时 Kafka 就可以通过增加 Partition 实现横向扩展。

image.png

为了防止数据损坏, 每个 Partition 可以设置多个 Replication,每个Replication将分散在不同的Broker上维护。

每一个Partition有且只有一个Replica可以作为Leader,除了 Leader 以外的所有 Replica 均为follower

Leader 主要负责处理所有Producer、Consumer的请求, follower 不处理任何来自客户端的请求;只通过Fetch Request拉取leader replica的数据进行同步。

多个ProducerConsumer可以同时发布和检索消息。

image.png

Broker


Kafka 集群通常由多个Broker组成以保持负载平衡。 Kafka代理是无状态的,所以他们使用ZooKeeper来维护它们的集群状态。 一个Kafka Broker实例可以每秒处理数十万次读取和写入,每个Broker可以处理TB的消息,而没有性能影响。 Kafka leader 选举可以由ZooKeeper完成。

Zookeeper


Apache Kafka 的一个关键依赖是 Apache Zookeeper,它是一个分布式配置和同步服务。 Zookeeper 是Kafka Brocker 和 Customer 之间的协调接口。 Kafka 服务器通过Zookeeper集群共享信息。 Kafka在Zookeeper中存储基本元数据,例如关于 Topic,Brocker,Custermer 偏移(队列读取器)等的信息。 由于所有关键信息存储在Zookeeper中,并且它通常在其整体上复制此数据,因此Kafka代理/ Zookeeper的故障不会影响Kafka集群的状态。 Kafka将恢复状态,一旦Zookeeper重新启动。 这为Kafka带来了零停机时间。 Kafka代理之间的领导者选举也通过使用Zookeeper在领导者失败的情况下完成。

Producer


多个Producer可以同时将数据Push给Broker。 Producer 会将每一条消息发送到指定Topic 的指定 partition 中去。

关于 partition 的获取分为三种情况 :

  1. 指明 partition 的情况下,直接将指明的值直接作为 partiton 值
  2. 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值
  3. 既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法。

生产者支持同步异步的消息写入方式,提供了快速写入的能力。

Consumers


Consumer 采用 Pull 的方式从集群中拉取消息

因为 Kafka Broker 是无状态的,这意味着消费者必须通过使用分区偏移来维护已经消耗了多少消息。 如果消费者确认特定的消息偏移,则意味着消费者已经消费了所有先前的消息。消费者可以简单地通过提供偏移值来快退或跳到分区中的任何点。

对于消费者来说,当消费者消费 Topic 消息时, Kafka 只会保证在 Partition 内消息是有序的,而不管全局的情况。

在Kafka 中引入了一个叫 Consumer Group(消费组) 的概念, 每一个消费组都是按照 发布-订阅消息模型 来消费消息。 而在同一个消费组的内部,每一个消费者是按照消息队列模型来消费消息的。

  • 队列模式(也叫点对点模式)。多个消费者共同消费一个队列,每条消息只发送给一个消费者。
  • 发布/订阅模式。多个消费者订阅主题,每个消息会发布给所有的消费者。

image.png


Kafka 读写原理与存储结构

那么,对于写入一条消息,它的基本流程是怎样的呢?

下面,我们讲沿着:Producer发布消息 -> 消息存储格式 -> Consumer消费消息这三个步骤展开。

在消息发布之前首先要创建一个 topic, 创建时用户可以指定 topic 名称分区数副本数 等。

Controller 控制器


Kafka集群中的其中一个Broker会被选举为 Controller ,主要负责Partition管理和副本状态管理,也会执行类似于重分配Partition之类的管理任务。

而创建Partition,就需要Controller的协调。 创建topic的过程,客户端的操作很简单,它甚至不需要有Kafka集群的存在,因为kafka集群只负责存数据,至于它存的是哪个topic的数据,估计它自己都不知道,这些对应信息都存储在Zookeeper中。

因此客户端创建一个topic时,只需要和zookeeper通信,在对应的znode节点新建一个子节点即可。 但是有了topiic,还需要分配topic的partition,还要选出partition的leader,以及ISR等,而这些工作,都是由controller来完成。

1. controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被创建,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
2. controller 从 /brokers/ids 读取当前所有可用的 broker 列表,对于 set_p 中的每一个 partition:
 
 2.1 从分配给该 partition 的所有 replica(称为AR)中任选一个可用的 broker 作为新的 leader,并将AR设置为新的 ISR
 2.2 将新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state

3. controller 通过 RPC 向相关的 broker 发送 LeaderAndISRRequest。

从上术的流程可以看出,它利用了zookeeper的watcher机制,动态监听topic节点的改变,从而进行分配partition,选择leader,设置ISR并反写到zookeeper中。

Producer发布消息


我们来回顾一下,要发送一条消息,首先一定要指定一个topic,表明他是那一种类的消息.

然后一条消息要发送到一个broker上的某一个partition中,由于需要支持HA,所以对这条消息进行持久化的时候,肯定要同时写入多个partition中,成功之后在回ack给client。

每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。

image.png

写入消息分为下面几个步骤:

  1. 确定partition

这是第一步,由于一个topic对应这多个partitions,首先要确定和哪个partition进行通信并存储,发送到broker时,分区算法选择将其存储到哪一个 partition. 他遵循以下规则:

  • 指定了 patition,则直接使用;
  • 未指定 patition 但指定 key,通过对 key 的 value 进行hash 选出一个 patition
  • patition 和 key 都未指定,使用轮询选出一个 patition。
  1. 找到partition的通信地址

由于系统实现了HA,因此,一个partition有多个replication,Producer 先从 Zookeeper 的 /brokers/…/state 节点找到该 partition 的 leader, 之后就只与该leader通信。

注意,这里的leader是指partition的leader,而事实上,kafka集群不像zookeeper一样,有leader管理,发起提议等,它是没有leader的。

  1. 数据传输
  • producer 将消息发送给该 leader
  • leader 将消息写入本地 log
  • followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
  • leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK

这里我们一定想到了各种意外的情况,比如说网络中断,某个partition宕机了等,因此,保证消息的传输有几种方式:

  • At most once 消息可能会丢,但绝不会重复传输
  • At least one 消息绝不会丢,但可能会重复传输
  • Exactly once 每条消息肯定会被传输一次且仅传输一次

消息存储格式


一个 topic 有一个 partition ?和 broker 数一样?

这配置文件里会进行设置: server.properties 中的 num.partitions=3

一条消息的格式如下:

消息长度:     4 bytes (value: 1+4+n)   
版本号:       1 byte  
CRC校验码:    4 bytes  
具体的消息:   n bytes

一个topic的一个partition,物理上对应着一个文件夹,里面存储这所有该partition的消息。

每条消息都可以对应一个64位的offest,offset标注了这条消息在发送到这个分区的消息流中的起始位置,每个日志文件的名称都是这个文件第一条日志的offset.

所以第一个日志文件的名字就是00000000000.kafka.所以每相邻的两个文件名字的差就是一个数字S,S差不多就是配置文件中指定的日志文件的最大容量。

image.png

文件夹下有一个索引文件,和若干个.kafka文件。

Consumer订阅消息


消费者要比生产者复杂得多。由于kafka不会为消费者维护offset,因此,要么自己(Consumer)维护offset,低级API,要么交给Zookeeper维护, 高级API。

此外还有 Consumer Group 的概念,一个 Consumer Group 有多个Consumer,一条消息只会被其中的一个Consumer消费。

一般的,Consumer的数量不要大于Partition的数量,这是因为一个Partition只能被一个Consumer消费,Consumer多了就会有的轮空。

  1. Low Level API:

这种API维持了单一个broker的连接,它是没有状态的,因此每次去消费数据,都有带上offset,因此它可以通过重新设置offset多次消费同一份数据。 这种操作方式更灵活,但是也更繁琐。一般的,即使使用Low Level API,也会把offset手动同步到zookeeper上,当我们处理失败了,就不去更新offset。

此外,选择Partition的Leader,处理Leader的故障转移等也需要客户端去实现。

  1. High Level API:

更自动化的一种操作。consumer的offset会自动同步到zookeeper上,也会自动的去选取leader,leader失效时重试等。

但是它也有缺点,就是它保证了每次都取的都是下一条消息,不能够回搠去读,因此,无论处理这条消息是否成功,都不能重复读了,这显然有些不合理。

Consumer与Partition的关系


如果consumer比partition多,会阻塞,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化