kafka 是一种高吞吐量,分布式,基于发布/订阅的消息系统。

特点

  • 吞吐量高、延迟低
    • 每个topic可以支持consumer group(多个consumer)partition进行操作
  • 扩展性好
    • 集群支持热扩展
  • 持久性、可靠性好
    • 消息数据是被持久化到本地磁盘的,并且支持数据备份
  • 高并发
    • 支持千级别客户端同时读写

核心概念

  • Broker
    • kafka 服务器,负责消息存储和转发
  • Topic
    • 消息主题,用于分类消息
  • Partition
    • topic的分区(sharding),每个topic可以有多个partition,对应topic的消息保存在这些partition上,单partition的消息是有序的。同一个topic的多个partition不保证有序。
  • Offset
    • 消息在partition上的位置,也表示消息的唯一序号。
  • Producer
    • 消息生产者
  • Consumer Group
    • 消息消费者consumer的组

Controller Broker

Kafka使用Zookeeper保存了集群的brokertopicpartition等元数据。另外,Zookeeper负责从broker中选举一个作为Controller,并确保唯一性,当Controller宕机时,会重新选举一个新的。

Controller Broker本身就是一个普通的broker,只不过它需要多负载一些额外的工作

  • 创建、删除topic
  • 增加partition,并分配leader partition
  • 集群broker管理,包括:新增brokerbroker主动关闭,broker故障

存储

todo

提交 offset

consumer通过提交offset来记录当前消费的最后位置,以便consumer发生崩溃或者有新的consumer加入consumer group时触发分区再均衡操作,每个consumer重新分配新的分区可以获取到该分区当前消费的最后位置。

kafka 通过enable.auto.commit控制offset提交方式。

  • enable.auto.commit
    • =true
      • 通过后台线程周期性提交,默认周期是5s,参数auto.commit.interval.ms
    • =false
      • 不自动提交

自动提交

大部分自动提交是定时任务,pool()close()也会提交当前最大偏移量。

  • 缺点
    • 有丢失消息风险
      • consumer取到消息处理比较耗时,此时自动提交了offset,若此时consumer或处理程序崩溃便会导致当前消息丢失,因为在broker已经将此消息标识为处理完成了。因为当前consumer崩溃,所以其负责的partition被重新负载到其他consumer时,将从最新偏移量开始消费消息。
    • 有重复消费风险
      • consumer取到消息处理到一半,因为auto.commit.interval.ms配置时间过长,若此时consumer或处理程序崩溃,由于未提交offset,此消息会在负载再平衡后被其他consumer消费处理。

手动提交

为了避免消息丢失、重复消费,可以使用手动提交。

Consumer 数据重复场景及解决方案

原因

数据消费完,没有及时提交 offset 到 broker,consumer 异常重启了。

方案

业务幂等性设计

如何保证不丢消息

Producer 丢失情况

场景

当 Producer 调用send方法发送消息后,消息可能会因为网络问题发送失败。

方案

为了防止此种情况出现,我们需要关注send的结果,判断结果如果失败了,需要重新发送。一般重试次数会设置一个比较合理的值,比如 3,但也要根据业务场景做取舍。

Consumer 丢失情况

场景

Consumer拉取到消息后,Consumer自动提交了OffsetConsumer突然挂掉,但此时Consumer的业务处理逻辑尚未执行完成

方案

cd
关闭自动提交offset,处理完逻辑手动提交。

当处理完业务逻辑,Consumer突然挂掉,尚未手动提交offset,会导致消息被重复消费。需要Consumer的幂等性。

Kafka 弄丢了消息

kafka 分区为多副本机制,分区中的多个副本会有一个leader副本,其他副本统称为follower。当我们发送消息时,其实是发送到leader 副本,然后 follower 副本从 leader 副本拉取信息进行同步。ProducerConsumer只与leader副本进行交互。

场景

leader副本所在的broker突然挂掉,就会从follower副本中选出一个新的leader,但是如果挂掉的那个broker上的副本数据没有没follower完全同步的话,就会造成消息丢失。

方案

  • 设置 acks = all

    • acks 默认值为1。表示消息被leader副本接收之后就算成功,acks = all 表示所有副本都要接收到该消息之后才算成功。
  • 设置 replication.factor >= 3

    • 保证每个分区的副本个数量,此做法虽然会造成数据冗余,但也带来了数据的安全性。
  • 设置 min.insync.replicas > 1

    • 默认值为1,设置为大于1表示消息至少被写入2个副本才算发送成功