本文介绍: 消息中间件处理节点,一个 Kafka 节点就是一个 Broker,一个或者多个 Broker 可以组成一个 Kafka 集群。

消息中间件处理节点,一个 Kafka 节点就是一个 Broker,一个或者多个 Broker 可以组成一个 Kafka 集群。

Kafka 的消息通过 Topic 主题来分类,Topic类似于关系型数据库中的表,每个 Topic 包含一个或多(Partition)分区。

4、LogSegment

  • 首先用二分查找确定它是在哪个Segment文件中,其中0000000000000000000.index为最开始的文件,第二个文件为0000000000000170410.index(起始偏移为170410+1 = 170411),而第三个文件为0000000000000239430.index(起始偏移为239430+1 = 239431)。所以这个offset = 170417就落在第二个文件中。其他后续文件可以依此类推,以起始偏移量命名并排列这些文件,然后根据二分查找法就可以快速定位到具体文件位置。
  • 用该offset减去索引文件的编号,即170417 – 170410 = 7,也用二分查找法找到索引文件中等于或者小于7的最大的那个编号。可以看出我们能够找到[4,476]这组数据,476即offset=170410 + 4 = 170414的消息在log文件中的偏移量。
  • 打开数据文件(0000000000000170410.log),从位置为476的那个地方开始顺序扫描直到找到offset为170417的那条Message。
  1. 多Partition分布式存储,利于集群数据的均衡。
  2. 并发读写,加快读写速度。
  3. 加快数据恢复的速率:当某台机器挂了,每个Topic仅需恢复一部分的数据,多机器并发。
  1. 指明partition的情况下,使用指定的partition;
  2. 没有指明partition,但是有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;
  3. 既没有指定partition,也没有key的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与topic可用的partition数取余得到partition值,也就是常说的round-robin算法。
  4. 自定义分区策略:实现 org.apache.kafka.clients.consumer.internals.PartitionAssignor接口。
  1. kafka可以保证分区消息的顺序。如果使用同一个生产者往同一个分区写入消息,而且消息B在消息A之后写入,那么kafka可以保证消息B的偏移量比消息A的偏移量大,而且消费者会先读取到消息A再读取消息B。
  2. 只有当消息被写入分区的所有副本时,它才被认为是“已提交”的。生产者可以选择接收不同类型的确认,比如在消息被完全提交时的确认、在消息被写入分区首领时的确认,或者在消息被发送到网络时的确认。
  3. 只要还有一个副本是活跃的,那么已经提交的信息就不会丢失。
  4. 消费者只能读取到已经提交的消息。
  • 与zookeeper之间有一个活跃的会话,也就是说,它在过去的6s(可配置)内向zookeeper发送过心跳。
  • 在过去的10s(可配置)内从首领那里获取过最新的数据。
  • 0: producer不等待broker的ack,这一操作提供了一个最低的延迟,broker一接收到还没写入磁盘就已经返回,当broker故障时可能丢失数据;
  • 1: producer等待leader的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据;
  • **-1(all):**producer等待broker的ack,partition的leader和ISR里的follower全部落盘成功后才返回ack。但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成重复数据。(极端情况下也有可能丢数据:ISR中只有一个Leader时,相当于1的情况)。
  • follower故障
  • leader故障
  • **batch.size:**只有数据积累到batch.size后,sender才会发送数据。(单位:字节,注意:不是消息个数)。
  • linger.ms**:**如果数据迟迟未达到batch.size,sender等待 linger.ms之后也会发送数据。(单位:毫秒)。
  • client.id**:**该参数可以是任意字符串,服务器会用它来识别消息的来源,还可用用在日志和配额指标里。
  • max.in**.flight.requests.per.connection:**该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设置为1可以保证消息时按发送的顺序写入服务器的,即使发生了重试。
  • send方法使用同步发送,或者使用带回调的方法
  • 设置重试次数retries 合适的值,一般是3
  • 使用自动提交offset时,当消费者挂了,也会丢失。可以设置为手动提交offset,会带来消息被重新消费的问题,需要根据业务做幂等处理。
  • 设置 acks = all,所有副本都要接收到该消息之后该消息才算真正成功被发送。
  • 设置 replication.factor >= 3,为了保证 leader 副本能有 follower 副本能同步消息,我们一般会为 topic 设置 replication.factor >= 3。这样就可以保证每个 分区(partition) 至少有 3 个副本。虽然造成了数据冗余,但是带来了数据的安全性。
  • 设置 min.insync.replicas > 1,这样配置代表消息至少要被写入到 2 个副本才算是被成功发送。min.insync.replicas 的默认值为 1 ,在实际生产中应尽量避免默认值 1。但是,为了保证整个 Kafka 服务的高可用性,你需要确保 replication.factor > min.insync.replicas 。为什么呢?设想一下加入两者相等的话,只要是有一个副本挂掉,整个分区就无法正常工作了。这明显违反高可用性!一般推荐设置成 replication.factor = min.insync.replicas + 1
  • 设置 unclean.leader.election.enable = false,当 leader 副本发生故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样降低了消息丢失的可能性。
  • partition 并行处理
  • 顺序写磁盘,充分利用磁盘特性
  • 利用了现代操作系统分页存储 Page Cache 来利用内存提高 I/O 效率
  • 采用了零拷贝技术
  • Producer 生产的数据持久化到 broker,采用 mmap 文件映射,实现顺序的快速写入
  • Customer 从 broker 读取数据,采用 sendfile,将磁盘文件读到 OS 内核缓冲区后,转到 NIO buffer进行网络发送,减少 CPU 消耗
  • 脑裂:虽然ZooKeeper能保证注册到节点上的所有监听器都会按顺序被触发,但并不能保证同一个时刻所有副本看到的状态是一样的,可能造成不同副本的响应不一致
  • 羊群效应:如果宕机的那个Broker的Partition数量很多,会造成多个Watch被触发,引起集群内大量的调整
  • 每个副本都要在ZK的Partition上注册Watcher,当集群内Partition数量很多时,会造成ZooKeeper负载过重
    目前版本:不过每个broker还是会对/controller节点添加监听器的,以此来监听此节点的数据变化(参考ZkClient中的IZkDataListener)。
  • Leader的变化从监听器改为由Controller管理
  • 控制器负责检测Broker的失败,并为每个受影响的Partition选举新的Leader
  • 控制器会将每个Leader的变化事件发送给受影响的每个Broker
  • 控制器和Broker之间的通信采用直接的RPC,而不是通过ZK队列
  • 因为Leader管理被更加集中地管理,比较容易调试问题
  • Leader变化针对ZK的读写可以批量操作,减少在failover过程中端到端的延迟
  • 更少的ZooKeeper监听器
  • 使用直接RPC协议相比队列实现的ZK,能够更加高效地在节点之间通信
  • Partition的Leader变化事件
  • 新创建或删除一个topic
  • 重新分配Partition
  • 管理分区的状态机和副本的状态机
  • coordinator协调者组件完成订阅主题分区的分配过程中,该消费者所有的实例不能消费任何消息
  • 消费者很多的话,rebalance很慢,对业务产生影响
  • rebalance效率不高,所有的消费者都要参与进来,0.11版本提供了sticky assignor,尽量保留之前的分配方案,实现分区变动最小
  • 新的消费者加入消费者组
  • 消费者从消费者组退出
  • 消费者宕机下线。比如长时间GC、网络延迟导致长时间未向groupcoordinate发送心跳
  • 消费者消费超时,没有在指定的时间内提交offset
  • 消费者组对应得GroupCoordinate节点发生变化
  • 消费者组订阅得主题或者主题得分区数发生变化
  • session.timeout.ms 设置了超时时间,consumer和broker的心跳超时时间,默认10s
  • heartbeat.interval.ms 心跳时间间隔,consumer和broker的心跳检测时间,默认3s
  • max.poll.interval.ms 每次消费的处理时间,两次poll的最大时间间隔,默认5分钟,超时则触发重平衡
  • max.poll.records 每次消费的消息数,默认500条
  • 消费者端频繁地full gc

七、kafka为什么这么快

  1. 第一个是Kafka对顺序I/O的依赖。
  2. 赋予 Kafka 性能优势的第二个设计选择是它对效率的关注:零复制原则。

该图说明了数据如何在生产者和消费者之间传输,以及零拷贝的含义。

  • 步骤1.1 – 1.3:生产者将数据写入磁盘
  • 第2步:Consumer无需零拷贝读取数据

2.1 数据从磁盘加载到OS缓存

2.2 数据从OS缓存复制到Kafka应用程序

2.3 Kafka应用程序将数据复制到socket缓冲区

2.4 将数据从socket buffer复制到网卡

2.5 网卡将数据发送给消费者

  • 步骤3:消费者以零拷贝方式读取数据

3.1:数据从磁盘加载到OS缓存 3.2 OS缓存通过sendfile()命令直接将数据复制到网卡 3.3 网卡将数据发送给消费者

零拷贝是在应用程序上下文和内核上下文之间保存多个数据副本的快捷方式。

 

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注