Kafka教程


Kafka教程

Kafka简介

1.1 消息队列

  • 消息队列– 用于存放消息的组件
  • 程序员可以将消息放入到队列中,也可以从消息队列中获取消息
  • 很多时候消息队列不是一个永久性的存储, 是作为临时存储存在的(设定一个期限: 设置消息在MQ中保存10
    天)
  • 消息队列中间件:消息队列的组件,例如: Kafka、 Active MQ、RabbitMQ、 RocketMQ、 ZeroMQ

1.2 Kafka的应用场景

  • 异步处理
    • 可以将一些耗时的操作放在其他系统中,通过消息队列将需要进行处理的消息进行存储,其他系统可以消费消息队列中的数据
    • 比较常见的:发送短信验证码、发送邮件
  • 系统解耦
    • 原先一个微服务是通过接口(Http)调用另一个微服务,这时候耦合严重,只要接口发送变化就会导致系统不可用。
    • 使用消息队列可以将系统进行解耦合,现在第一个微服务可以将消息放入到消息队列中,另一个微服务可
      以从消息队列中把消息取出来进行处理。进行系统解耦
  • 流量削峰
    • 因为消息队列是低延迟,高可靠、高吞吐、可以应对大量并发
  • 日志处理
    • 可以使用消息队列作为临时存储,或者一种通信管道

1.3 消息队列的两种模型

  • 生产者、消费者模型
    • 生产者负责将消息生产到MQ中
    • 消费者负责从MQ中获取消息
    • 生产者和消费者是解耦的,可能是生产者一个程序、消费者是另外一个程序
  • 消息队列的模式
    • 点对点:一个消费者消费一个消息
    • 发布订阅:多个消费者可以消费一个消息

Kafka集群搭建

  • Kafka集群是必须要有ZooKeeper的

注意:

  • 每一个Kafka的节点都需要修改broker.id(每个节点的标识,不能重复)
  • log.dir数据存储目录需要配置

2.1 搭建Kafaka集群

  1. 将Kafka的安装包上传到虚拟机,并解压
cd /opt/software/
tar -xvzf tar -xvzf kafka_2.12-2.4.1.tgz -C ../module/
cd /opt/module/kafka_2.12-2.4.1/
  1. 修改 server.properties
cd /opt/module/server/kafka_2.12-2.4.1/config
vim server.properties
# 指定broker的id
broker.id=0
# 指定Kafka数据的位置
log.dirs=/opt/module/kafka_2.12-2.4.1/data
# 配置zk的三个节点
zookeeper.connect=node1.itcast.cn:2181,node2.itcast.cn:2181,node3.itcast.cn:2181
  1. 将安装好的kafka复制到另外两台服务器
cd /opt/modules
scp -r kafka_2.12-2.4.1/ node2.itcast.cn:$PWD
scp -r kafka_2.12-2.4.1/ node3.itcast.cn:$PWD

修改另外两个节点的broker.id分别为1和2
---------node2.itcast.cn--------------
cd /opt/modules/kafka_2.12-2.4.1/config
vim erver.properties
broker.id=1
--------node3.itcast.cn--------------
cd /opt/modules/kafka_2.12-2.4.1/config
vim server.properties
broker.id=2
  1. 配置KAFKA_HOME环境变量
vim /etc/profile
export KAFKA_HOME=/opt/modules/kafka_2.12-2.4.1
export PATH=:$PATH:${KAFKA_HOME}

分发到各个节点
scp /etc/profile node2.itcast.cn:$PWD
scp /etc/profile node3.itcast.cn:$PWD
每个节点加载环境变量
source /etc/profile
  1. 启动服务器
# 启动ZooKeeper
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
# 启动Kafka
cd /opt/modules/kafka_2.12-2.4.1
nohup bin/kafka-server-start.sh config/server.properties &
# 测试Kafka集群是否启动成功
bin/kafka-topics.sh --bootstrap-server node1.itcast.cn:9092 --list

基础操作

3.1 创建topic

创建一个topic (主题)。Kafka中所有的消息都是保存在主题中,要生产消息到Kafka,首先必须要有一个确定的主题。

# 创建名为test的主题
bin/kafka-topics.sh --create --bootstrap-server node1.itcast.cn:9092 --topic test
# 查看目前Kafka中的主题
bin/kafka-topics.sh --list --bootstrap-server node1.itcast.cn:9092

3.2 生产消息到Kafka

使用Kafka内置的测试程序,生产一些消息到Kafka的test主题中。

bin/kafka-console-producer.sh --broker-list node1.itcast.cn:9092 --topic test

3.3 从Kafka消费消息

使用下面的命令来消费 test 主题中的消息。

bin/kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic test --from-beginning

3.4 Kafka的生产者/消费者/工具

  • 安装Kafka集群,可以测试以下
    • 创建一个topic主题 (消息都是存放在topic中,类似mysq|建表的过程)
    • 基于kafka的内置测试生产者脚本来读取标准输入(键盘输入)的数据,并放入到topic中
    • 基于kafka的内置测试消费者脚本来消费topic中的数据
  • 推荐大家开发的使用Kafka Tool
    • 浏览Kafka集群节点多少个topic、 多少个分区
    • 创建topic/删除topic
    • 浏览ZooKeeper中的数据

Kafka基准测试

4.1 基准测试

4.1.1 基于1个分区1个副本的基准测试

测试步骤:

  1. 启动Kafka集群
  2. 创建一个1个分区1个副本的topic: benchmark
  3. 同时运行生产者、消费者基准测试程序
  4. 观察结果
4.1.1.1 创建topic
bin/kafka-topics.sh --zookeeper node1.itcast.cn:2181 --create --topic benchmark --partitions 1 --replication-factor 1
4.1.1.2 生产消息基准测试

在生产环境中,推荐使用生产5000W消息,这样会性能数据会更准确些。为了方便测试,课程上演示测试500W的消息作为基准测试。

bin/kafka-producer-perf-test.sh --topic benchmark --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1.itcast.cn:9092,node2.itcast.cn:9092,node3.itcast.cn:9092 acks=1

文章作者:
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 !
  目录