前言

这是kafka学习笔记的第一篇文章,主要的内容是kafka的概述以及安装,对应着尚硅谷2022版Kafka3.x教程(P1~P7)

kafka概述

定义

  • kafka传统定义:Kafka是一个分布式的基于发布/订阅模式的消息队列(MessageQueue),主要应用于大数据实时处理领域。

  • kafka最新定义:Kafka是 一个开源的分布式事件流平台(Event StreamingPlatform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。

消息队列

  • 目 前企 业中比 较常 见的 消息 队列产 品主 要有 Kafka、ActiveMQ 、RabbitMQ 、RocketMQ 等。

  • 在大数据场景主要采用 Kafka 作为消息队列。在 JavaEE 开发中主要采用 ActiveMQ、RabbitMQ、RocketMQ。

传统消息队列的应用场景

传统的消息队列的主要应用场景包括:缓存/消峰解耦异步通信

  • 缓冲/消峰:有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
  • 解耦:允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
  • 异步通信:允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们。

消息队列的两种模式

  • 点对点模式:

    • 消费者主动拉取数据,消息收到后清除消息

  • 发布/订阅模式:

    • 可以有多个topic主题(浏览、点赞、收藏、评论等)
    • 消费者消费数据后,不删除数据
    • 每个消费者互相独立,都可以消费到数据

kafka基础架构

Producer

消息生产者,就是向 Kafka broker 发消息的客户端。

Consumer

消息消费者,向 Kafka broker 取消息的客户端。

Consumer Group(CG)

消费者组,由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

Broker

一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个broker 可以容纳多个 topic。

Topic

可以理解为一个队列,生产者和消费者面向的都是一个 topic

Partition

为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个topic可以分为多个partition,每个 partition 是一个有序的队列。

Replica

副本,一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个Follower

Leader

每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 Leader。

Follower

每个分区多个副本中的“从”,实时从 Leader 中同步数据,保持和Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的 Leader。

kafka集群安装部署

kafka在2.8.0版本之前安装部署需要依赖zookeeper,2.8.0之后的版本可以不依赖,但是大多数的企业还没用到这么新的版本。

集群规划

Hadoop102 Hadoop103 Hadoop104
Zk Zk Zk
Kafka Kafka Kafka

安装部署

官方下载地址:http://kafka.apache.org/downloads.html

解压安装包

1
tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/

修改解压后的文件名称

1
mv kafka_2.12-3.0.0/ kafka

修改配置文件

1
2
cd config/
vim server.properties

修改以下内容:

1
2
3
4
5
6
7
8
9
10
11
#broker 的全局唯一编号,不能重复,只能是数字。
- broker.id=
+ broker.id=0

#kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
- log.dirs=
+ log.dirs=/opt/module/kafka/datas

#配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
- zookeeper.connect=
+ zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

分发安装包

1
2
3
4
5
# 分发到hadoop103
scp -r kafka root@hadoop103:/opt/module/

# 分发到hadoop104
scp -r kafka root@hadoop104:/opt/module/

修改103,104配置文件

分别在 hadoop103 和 hadoop104 上修改配置文件/opt/module/kafka/config/server.properties中的 broker.id=1、broker.id=2

其中broker.id不能重复,整个集群中唯一
1
2
3
4
5
6
7
8
9
# 修改hadoop103 kafka配置文件
vim kafka/config/server.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

# 修改hadoop104 kafka配置文件
vim kafka/config/server.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2

启动集群

依次在 hadoop102、hadoop103、hadoop104 节点上启动 Kafka。

1
bin/kafka-server-start.sh -daemon config/server.properties

停止集群

依次在 hadoop102、hadoop103、hadoop104 节点上停止 Kafka。

1
bin/kafka-server-stop.sh

集群启停脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
vim kf.sh

# 脚本内容如下:

#! /bin/bash
case $1 in
"start"){
for i in hadoop102 hadoop103 hadoop104
do
echo " --------启动 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -
daemon /opt/module/kafka/config/server.properties"
done
};;
"stop"){
for i in hadoop102 hadoop103 hadoop104
do
echo " --------停止 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
done
};;
esac


添加执行权限

1
chmod +x kf.sh

使用:

1
2
3
4
5
# 启动集群命令
kf.sh start

# 停止集群命令
kf.sh stop