Kafka教程
Kafka简介
1.1 消息队列
- 消息队列– 用于存放消息的组件
- 程序员可以将消息放入到队列中,也可以从消息队列中获取消息
- 很多时候消息队列不是一个永久性的存储, 是作为临时存储存在的(设定一个期限: 设置消息在MQ中保存10
天) - 消息队列中间件:消息队列的组件,例如: Kafka、 Active MQ、RabbitMQ、 RocketMQ、 ZeroMQ
1.2 Kafka的应用场景
- 异步处理
- 可以将一些耗时的操作放在其他系统中,通过消息队列将需要进行处理的消息进行存储,其他系统可以消费消息队列中的数据
- 比较常见的:发送短信验证码、发送邮件
- 系统解耦
- 原先一个微服务是通过接口(Http)调用另一个微服务,这时候耦合严重,只要接口发送变化就会导致系统不可用。
- 使用消息队列可以将系统进行解耦合,现在第一个微服务可以将消息放入到消息队列中,另一个微服务可
以从消息队列中把消息取出来进行处理。进行系统解耦
- 流量削峰
- 因为消息队列是低延迟,高可靠、高吞吐、可以应对大量并发
- 日志处理
- 可以使用消息队列作为临时存储,或者一种通信管道
1.3 消息队列的两种模型
- 生产者、消费者模型
- 生产者负责将消息生产到MQ中
- 消费者负责从MQ中获取消息
- 生产者和消费者是解耦的,可能是生产者一个程序、消费者是另外一个程序
- 消息队列的模式
- 点对点:一个消费者消费一个消息
- 发布订阅:多个消费者可以消费一个消息
Kafka集群搭建
- Kafka集群是必须要有ZooKeeper的
注意:
- 每一个Kafka的节点都需要修改broker.id(每个节点的标识,不能重复)
- log.dir数据存储目录需要配置
2.1 搭建Kafaka集群
- 将Kafka的安装包上传到虚拟机,并解压
cd /opt/software/
tar -xvzf tar -xvzf kafka_2.12-2.4.1.tgz -C ../module/
cd /opt/module/kafka_2.12-2.4.1/
- 修改 server.properties
cd /opt/module/server/kafka_2.12-2.4.1/config
vim server.properties
# 指定broker的id
broker.id=0
# 指定Kafka数据的位置
log.dirs=/opt/module/kafka_2.12-2.4.1/data
# 配置zk的三个节点
zookeeper.connect=node1.itcast.cn:2181,node2.itcast.cn:2181,node3.itcast.cn:2181
- 将安装好的kafka复制到另外两台服务器
cd /opt/modules
scp -r kafka_2.12-2.4.1/ node2.itcast.cn:$PWD
scp -r kafka_2.12-2.4.1/ node3.itcast.cn:$PWD
修改另外两个节点的broker.id分别为1和2
---------node2.itcast.cn--------------
cd /opt/modules/kafka_2.12-2.4.1/config
vim erver.properties
broker.id=1
--------node3.itcast.cn--------------
cd /opt/modules/kafka_2.12-2.4.1/config
vim server.properties
broker.id=2
- 配置KAFKA_HOME环境变量
vim /etc/profile
export KAFKA_HOME=/opt/modules/kafka_2.12-2.4.1
export PATH=:$PATH:${KAFKA_HOME}
分发到各个节点
scp /etc/profile node2.itcast.cn:$PWD
scp /etc/profile node3.itcast.cn:$PWD
每个节点加载环境变量
source /etc/profile
- 启动服务器
# 启动ZooKeeper
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
# 启动Kafka
cd /opt/modules/kafka_2.12-2.4.1
nohup bin/kafka-server-start.sh config/server.properties &
# 测试Kafka集群是否启动成功
bin/kafka-topics.sh --bootstrap-server node1.itcast.cn:9092 --list
基础操作
3.1 创建topic
创建一个topic (主题)。Kafka中所有的消息都是保存在主题中,要生产消息到Kafka,首先必须要有一个确定的主题。
# 创建名为test的主题
bin/kafka-topics.sh --create --bootstrap-server node1.itcast.cn:9092 --topic test
# 查看目前Kafka中的主题
bin/kafka-topics.sh --list --bootstrap-server node1.itcast.cn:9092
3.2 生产消息到Kafka
使用Kafka内置的测试程序,生产一些消息到Kafka的test主题中。
bin/kafka-console-producer.sh --broker-list node1.itcast.cn:9092 --topic test
3.3 从Kafka消费消息
使用下面的命令来消费 test 主题中的消息。
bin/kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic test --from-beginning
3.4 Kafka的生产者/消费者/工具
- 安装Kafka集群,可以测试以下
- 创建一个topic主题 (消息都是存放在topic中,类似mysq|建表的过程)
- 基于kafka的内置测试生产者脚本来读取标准输入(键盘输入)的数据,并放入到topic中
- 基于kafka的内置测试消费者脚本来消费topic中的数据
- 推荐大家开发的使用Kafka Tool
- 浏览Kafka集群节点多少个topic、 多少个分区
- 创建topic/删除topic
- 浏览ZooKeeper中的数据
Kafka基准测试
4.1 基准测试
4.1.1 基于1个分区1个副本的基准测试
测试步骤:
- 启动Kafka集群
- 创建一个1个分区1个副本的topic: benchmark
- 同时运行生产者、消费者基准测试程序
- 观察结果
4.1.1.1 创建topic
bin/kafka-topics.sh --zookeeper node1.itcast.cn:2181 --create --topic benchmark --partitions 1 --replication-factor 1
4.1.1.2 生产消息基准测试
在生产环境中,推荐使用生产5000W消息,这样会性能数据会更准确些。为了方便测试,课程上演示测试500W的消息作为基准测试。
bin/kafka-producer-perf-test.sh --topic benchmark --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1.itcast.cn:9092,node2.itcast.cn:9092,node3.itcast.cn:9092 acks=1