kafka基础

什么是Kafka?

Apache Kafka 是一款开源的消息引擎系统,也是一个分布式的流式处理平台,还能被用作分布式存储系统。根据维基百科的定义,消息引擎系统是一组规范。企业利用这组规范在不同系统之间传递语义准确的消息,实现松耦合的异步式数据传递。

消息引擎要设定具体的传输协议,即我用什么方法将消息传输出去,常用的方法有2种:

  1. 点对点模式。
  2. 发布/订阅模式。

Kafka同时支持两种模式。

名词解释

  • 消息:Record。Kafka 是消息引擎嘛,这里的消息就是指 Kafka 处理的主要对象。

  • 主题:Topic。主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。

  • Kafka服务器端:Broker。Kafka 的服务器端由被称为 Broker 的服务进程构成,即一个 Kafka 集群由多个 Broker 组成,Broker 负责接收和处理客户端发送过来的请求,以及对消息进行持久化。

  • 分区:Partition。一个有序不变的消息序列。每个主题下可以有多个分区。afka 中的分区机制指的是将每个主题划分成多个分区(Partition),每个分区是一组有序的消息日志。生产者生产的每条消息只会被发送到一个分区中,也就是说如果向一个双分区的主题发送一条消息,这条消息要么在分区 0 中,要么在分区 1 中。如你所见,Kafka 的分区编号是从 0 开始的,如果 Topic 有 100 个分区,那么它们的分区号就是从 0 到 99。

  • 消息位移:Offset。表示分区中每条消息的位置信息,是一个单调递增且不变的值。一旦消息被成功写入到一个分区上,它的位移值就是固定的了。

  • 副本:Replica。Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者副本和追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。

  • 生产者:Producer。向主题发布新消息的应用程序。

  • 消费者:Consumer。从主题订阅新消息的应用程序。

  • 消费者位移:Consumer Offset。表征消费者消费进度,每个消费者都有自己的消费者位移,消费者位移是随时变化的,毕竟它是消费者消费进度的指示器,每个消费者有着自己的消费者位移。

  • 消费者组:Consumer Group。 多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐。

  • 重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

Kafka Broker是如何持久化数据的:总的来说,Kafka 使用消息日志(Log)来保存数据,一个日志就是磁盘上一个只能追加写(Append-only)消息的物理文件。因为只能追加写入,故避免了缓慢的随机 I/O 操作,改为性能较好的顺序 I/O 写操作,这也是实现 Kafka 高吞吐量特性的一个重要手段。不过如果你不停地向一个日志写入消息,最终也会耗尽所有的磁盘空间,因此 Kafka 必然要定期地删除消息以回收磁盘。怎么删除呢?简单来说就是通过日志段(Log Segment)机制。在 Kafka 底层,一个日志又进一步细分成多个日志段,消息被追加写到当前最新的日志段中,当写满了一个日志段后,Kafka 会自动切分出一个新的日志段,并将老的日志段封存起来。Kafka 在后台还有定时任务会定期地检查老的日志段是否能够被删除,从而实现回收磁盘空间的目的。

Kafka的种类

  • Apache Kafka,也称社区版 Kafka。优势在于迭代速度快,社区响应度高,使用它可以让你有更高的把控度;缺陷在于仅提供基础核心组件,缺失一些高级的特性。

  • Confluent Kafka,Confluent 公司提供的 Kafka。优势在于集成了很多高级特性且由 Kafka 原班人马打造,质量上有保证;缺陷在于相关文档资料不全,普及率较低,没有太多可供参考的范例。

  • CDH/HDP Kafka,大数据云公司提供的 Kafka,内嵌 Apache Kafka。优势在于操作简单,节省运维成本;缺陷在于把控度低,演进速度较慢。

Kafka的基本使用-基本参数

Broker 端参数

存储信息:

  • log.dirs:这是非常重要的参数,指定了 Broker 需要使用的若干个文件目录路径。要知道这个参数是没有默认值的,这说明什么?这说明它必须由你亲自指定。

  • log.dir:注意这是 dir,结尾没有 s,说明它只能表示单个路径,它是补充上一个参数用的。

线上只要设置log.dirs,即第一个参数就好了,不要设置log.dir。而且更重要的是,在线上生产环境中一定要为log.dirs配置多个路径,好处有:

  1. 提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。
  2. 能够实现故障转移:即 Failover。例如一块磁盘挂掉了,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,而且 Broker 还能正常工作。

ZooKeeper 相关:

ZooKeeper是一个分布式协调框架,负责协调管理并保存 Kafka 集群的所有元数据信息,比如集群都有哪些 Broker 在运行、创建了哪些 Topic,每个 Topic 都有多少分区以及这些分区的 Leader 副本都在哪些机器上等信息。

  • zookeeper.connect:配置与zooker的连接信息,例如zk1:2181,zk2:2181,zk3:2181。

Broker 连接相关:

客户端程序或其他 Broker 如何与该 Broker 进行通信的设置。有以下三个参数:

  • listeners:学名叫监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。

  • advertised.listeners:和 listeners 相比多了个 advertised。Advertised 的含义表示宣称的、公布的,就是说这组监听器是 Broker 用于对外发布的。(外网访问场景,你的Kafka Broker机器上配置了双网卡,一块网卡用于内网访问(即我们常说的内网IP);另一个块用于外网访问。那么你可以配置listeners为内网IP,advertised.listeners为外网IP。)

  • host.name/port:列出这两个参数就是想说你把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了。

Topic 管理;

  • auto.create.topics.enable:是否允许自动创建 Topic。(建议最好设置成 false,即不允许自动创建 Topic。在我们的线上环境里面有很多名字稀奇古怪的 Topic,我想大概都是因为该参数被设置成了 true 的缘故。)

  • unclean.leader.election.enable:是否允许 Unclean Leader 选举。(多个副本只有保存数据比较多的那些副本才有资格竞选,那些落后进度太多的副本没资格做这件事。如果设置成 false,那么就坚持之前的原则,坚决不能让那些落后太多的副本竞选 Leader。这样做的后果是这个分区就不可用了,因为没有 Leader 了。反之如果是 true,那么 Kafka 允许你从那些“跑得慢”的副本中选一个出来当 Leader。这样做的后果是数据有可能就丢失了,因为这些副本保存的数据本来就不全,当了 Leader 之后它本人就变得膨胀了,认为自己的数据才是权威的。建设fasle)

  • auto.leader.rebalance.enable:是否允许定期进行 Leader 选举。(设置它的值为 true 表示允许 Kafka 定期地对一些 Topic 分区进行 Leader 重选举,当然这个重选举不是无脑进行的,它要满足一定的条件才会发生。严格来说它与上一个参数中 Leader 选举的最大不同在于,它不是选 Leader,而是换 Leader!换一次 Leader 代价很高,建议false)

数据留存相关:

  • log.retention.{hours|minutes|ms}:这是个“三兄弟”,都是控制一条消息数据被保存多长时间。从优先级上来说 ms 设置最高、minutes 次之、hours 最低。(通常情况下我们还是设置 hours 级别的多一些,比如log.retention.hours=168表示默认保存 7 天的数据,自动删除 7 天前的数据。)

  • log.retention.bytes:这是指定 Broker 为消息保存的总磁盘容量大小。(默认是 -1,表明你想在这台 Broker 上保存多少数据都可以)

  • message.max.bytes:控制 Broker 能够接收的最大消息大小。(默认的 1000012 太少了,还不到 1MB。实际场景中突破 1MB 的消息都是屡见不鲜的,因此在线上环境中设置一个比较大的值还是比较保险的做法。)

Topic 级别参数

如果同时设置了 Topic 级别参数和全局 Broker 参数,到底听谁的呢?哪个说了算呢?答案就是 Topic 级别参数会覆盖全局 Broker 参数的值,而每个 Topic 都能设置自己的参数值,这就是所谓的 Topic 级别参数。

保存消息方面:

  • retention.ms:规定了该 Topic 消息被保存的时长。默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值。

  • retention.bytes:规定了要为该 Topic 预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1,表示可以无限使用磁盘空间。

处理的消息大小:

max.message.bytes:它决定了 Kafka Broker 能够正常接收该 Topic 的最大消息大小。

、、、
bin/kafka-configs.sh –zookeeper localhost:2181 –entity-type topics –entity-name transaction –alter –add-config max.message.bytes=10485760
、、、

JVM 参数

  • 堆大小这个参数:将你的 JVM 堆大小设置成 6GB 吧,这是目前业界比较公认的一个合理值。我见过很多人就是使用默认的 Heap Size 来跑 Kafka,说实话默认的 1GB 有点小,毕竟 Kafka Broker 在与客户端进行交互时会在 JVM 堆上创建大量的 ByteBuffer 实例,Heap Size 不能太小。

GC设置:

Java7: 如果 Broker 所在机器的 CPU 资源非常充裕,建议使用 CMS 收集器。启用方法是指定-XX:+UseCurrentMarkSweepGC。否则,使用吞吐量收集器。开启方法是指定-XX:+UseParallelGC。

Java8:使用G1。在没有任何调优的情况下,G1 表现得要比 CMS 出色,主要体现在更少的 Full GC。

  • KAFKA_HEAP_OPTS:指定堆大小。

  • KAFKA_JVM_PERFORMANCE_OPTS:指定 GC 参数。

操作系统参数

  • 文件描述符限制:ulimit -n。我觉得任何一个 Java 项目最好都调整下这个值。实际上,文件描述符系统资源并不像我们想象的那样昂贵,你不用太担心调大此值会有什么不利的影响。通常情况下将它设置成一个超大的值是合理的做法,比如ulimit -n 1000000。不设置的话后果很严重,比如你会经常看到“Too many open files”的错误。

  • 文件系统类型:这里所说的文件系统指的是如 ext3、ext4 或 XFS 这样的日志型文件系统。根据官网的测试报告,XFS 的性能要强于 ext4,所以生产环境最好还是使用 XFS。

  • Swap 的调优:网上很多文章都提到设置其为 0,将 swap 完全禁掉以防止 Kafka 进程使用 swap 空间。(建议设置成一个较小的值。为什么呢?因为一旦设置成 0,当物理内存耗尽时,操作系统会触发 OOM killer 这个组件,它会随机挑选一个进程然后 kill 掉,即根本不给用户任何的预警。但如果设置成一个比较小的值,当开始使用 swap 空间时,你至少能够观测到 Broker 性能开始出现急剧下降,从而给你进一步调优和诊断问题的时间。)

  • Flush 落盘时间:向 Kafka 发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了,随后操作系统根据 LRU 算法会定期将页缓存上的“脏”数据落盘到物理磁盘上。这个定期就是由提交时间来确定的,默认是 5 秒。一般情况下我们会认为这个时间太频繁了,可以适当地增加提交间隔来降低物理磁盘的写操作。当然你可能会有这样的疑问:如果在页缓存中的数据在写入到磁盘前机器宕机了,那岂不是数据就丢失了。的确,这种情况数据确实就丢失了,但鉴于 Kafka 在软件层面已经提供了多副本的冗余机制,因此这里稍微拉大提交间隔去换取性能还是一个合理的做法。

最后更新: 2022年03月06日 20:35

原始链接: https://jjw-story.github.io/2022/03/06/Kafka-基础/

× 请我吃糖~
打赏二维码