kafka架构探索及安装


1. 发展历程

https://kafka.apache.org/downloads
在这里插入图片描述

1.1 版本命名

Kafka在1.0.0版本前的命名规则是4位,比如0.8.2.1,0.8是大版本号,2是小版本号,1表示打过1个补丁。

现在的版本号命名规则是3位,格式是“大版本号”+“小版本号”+“修订补丁数”,比如2.5.0,前面的2代表的是大版本号,中间的5代表的是小版本号,0表示没有打过补丁。

我们所看到的下载包,前面是scala编译器的版本,后面才是真正的kafka版本。比如
在这里插入图片描述

1.2 演进历史

0.7版本 只提供了最基础的消息队列功能。

0.8版本 引入了副本机制,至此Kafka成为了一个真正意义上完备的分布式高可靠消息队列解决方案。

0.9版本 增加权限和认证,使用Java重写了新的consumer API,Kafka Connect功能;不建议使用consumer API;

0.10版本 引入Kafka Streams功能,正式升级成分布式流处理平台;建议版本0.10.2.2;建议使用新版consumer API

0.11版本 producer API幂等,事务API,消息格式重构;建议版本0.11.0.3;谨慎对待消息格式变化

1.0和2.0版本 Kafka Streams改进;建议版本2.0;

2. 单机安装

前提是zookeeper已经安装并启动了。

本次安装kafka的版本为2.8.0

2.1 下载 kafka

地址: http://kafka.apache.org/downloads.html

 Source download: kafka-2.5.0-src.tgz (asc, sha512)  # 此为源代码, 需要自己编译
Binary downloads:
    Scala 2.12  - kafka_2.12-2.5.0.tgz (asc, sha512)  # 已编译好的, 前面的2.12为 Scala版本
    Scala 2.13  - kafka_2.13-2.5.0.tgz (asc, sha512)

2.2 解压安装包

tar -zxvf kafka_2.13-2.5.0.tgz

2.3 修改配置文件

进入到kafka的config目录下

vim server.properties

其实主要就修改zookeeper的链接配置,和启用监听器。其余都不用动
在这里插入图片描述
在这里插入图片描述

2.4 启动 kafka

在kafka根目录下执行以下命令来后台运行kafka

nohup ./bin/kafka-server-start.sh config/server.properties &

如果启动的时候出现错误:
在这里插入图片描述
也就是找不到你的jdk。
这个时候去修改bin/kafka-run-class.sh

vim ./bin/kafka-run-class.sh

搜索JAVA_HOME,并且将红色框内容替换为JDK的绝对路径。
在这里插入图片描述

2.5 验证

使用jps命令可以看到是否启动成功:
在这里插入图片描述

3. 组件使用

3.1 创建 topic

2.x版本

在kafka根目录下执行:

# 创建 1个分区 1个副本的 topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hello

在这里插入图片描述

3.x版本

在kafka根目录下执行:

# 创建 1个分区 1个副本的 topic
./bin/kafka-topics.sh --create --bootstrap-server 192.168.80.16:9092 --replication-factor 1 --partitions 1 --topic testTopic(主题名称)

在这里插入图片描述

注意的点:--bootstrap-server后不能使用localhost,必须填写ip

而且还需要注意:

对于版本 2.*,您必须使用 Zookeper 创建主题,并使用默认端口 2181 作为参数。

对于版本 3.*,zookeeper 不再是参数,您应该使用 --bootstrap-server 使用 localhost 或服务器的 IP 地址和默认端口 9092

3.2 查看 topic

./bin/kafka-topics.sh --list --zookeeper localhost:2181

在这里插入图片描述

3.3 发送消息

发送消息默认使用的是 9092 端口, kafka 链接 zookeeper 默认端口是 2181 这两个是不一样的, 要注意.

./bin/kafka-console-producer.sh --broker-list  192.168.80.16:9092 --topic hello

不能用localhost

在这里插入图片描述

3.4 消费消息

复制一个新的窗口进行消息消费。

./bin/kafka-console-consumer.sh --bootstrap-server  192.168.80.16:9092 --topic hello

不能用localhost

此时在发送端进行发送,再来看接收端
在这里插入图片描述

3.5 删除 topic

./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic hello

在这里插入图片描述
再来看看topic列表发现hello已经被删除了
在这里插入图片描述

3.6 主题详情

./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic testTopic

在这里插入图片描述

3.7 分组消费

启动两个consumer时,如果不指定group信息,消息被广播
在这里插入图片描述
指定相同的group,发现只有一个消费者可以获得消息
在这里插入图片描述

注意,此时的testTopic的分区数为:1个,所以同一group的2个消费者会有一个闲置。
在这里插入图片描述


新创建一个主题hi,分区为2
在这里插入图片描述
此时可以看到两个消费者是交替进行消费的
在这里插入图片描述

总结:如果同一group下的 ( 消费者数量 > 分区数量 ) 那么就会有消费者闲置。

3.8 指定分区

指定消费者消费分区通过参数 --partition,注意!需要去掉group。
指定分区的意义在于,保障消息传输的顺序性。

./bin/kafka-console-consumer.sh --bootstrap-server  192.168.80.16:9092 --topic hi --partition 0

./bin/kafka-console-consumer.sh --bootstrap-server  192.168.80.16:9092 --topic hi --partition 1

在这里插入图片描述
发送1-4条消息,交替出现。说明消息被均分到各个分区中投递


默认的发送是没有指定key的,要指定分区发送,就需要定义key。那么相同的key被路由到同一个分区。

./bin/kafka-console-producer.sh --broker-list  192.168.80.16:9092 --topic hi --property parese.key=true

携带key再发送,注意key和value之间用tab分割
在这里插入图片描述
结果:相同的key被同一个consumer消费掉

3.9 偏移量

偏移量决定了消息从哪开始消费,支持:开头,还是末尾。

  • earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
  • latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
  • none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
./bin/kafka-console-consumer.sh --bootstrap-server  192.168.80.16:9092 --topic hi --partition 0 --offset earliest

在这里插入图片描述
结果:之前发送的消息,从头又消费了一遍

4. zookeeper分析

zk存储了kafka集群的相关信息,本节来探索内部的秘密。
在zk客户端输入命令

ls /

在这里插入图片描述
得到以下配置信息
在这里插入图片描述

4.1 broker信息

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

4.2 主题与分区

分区节点路径
在这里插入图片描述
分区信息,leader所在的机器id,isr列表等
在这里插入图片描述

4.3 消费者与偏移量

在这里插入图片描述
居然是空的??原来现在kafka已经自身去维护 group 的消费 偏移量了。

查看方式:
上面的消费用的是控制台工具,这个工具使用–bootstrap-server,不经过zk,也就不会记录到/consumers下。

其消费者的offset会更新到一个kafka自带的topic【__consumer_offsets】下面。

消费消息后,查看TOPIC可以发现多了一个__consumer_offsets
在这里插入图片描述
我们之前用过一个group:aaa

输入命令查看消费者及偏移量情况:

./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.80.16:9092 --list

在这里插入图片描述
查看偏移量详情:

./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.80.16:9092 --describe --group aaa

在这里插入图片描述


如果当前与LEO保持一致,说明消息都完整的被消费过:
在这里插入图片描述
停掉consumer后,往provider中再发几条记录,offset开始滞后:
在这里插入图片描述
重新启动consumer,消费到最新的消息,同时再返回看偏移量,消息得到同步:
在这里插入图片描述

4.4 controller

当前集群中的主控节点是谁
在这里插入图片描述


文章作者: fFee-ops
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 fFee-ops !
评论
  目录