安装Kafka

下载地址
Apache Kafka
解压文件
tar -zxvf kafka_2.12-3.0.0.tgz
cd kafka_2.12-3.0.0

启动Kafka环境

注意:您的本地环境必须安装 Java 8+。
Apache Kafka可以使用ZooKeeper或KRaft启动。要开始使用任一配置,请遵循以下部分之一,但不能同时遵循两者。

ZooKeeper启动

运行以下命令以正确的顺序启动所有服务:

 cd bin
 bin/zookeeper-server-start.sh ../config/zookeeper.properties

打开另一个终端会话并运行:

#先进入KafKa bin目录
./kafka-server-start.sh ../config/server.properties

成功启动所有服务后,您将拥有一个基本的 Kafka 环境运行并可供使用。

Kafka-Kraft启动

cd config

生成集群 UUID

../bin/kafka-storage.sh random-uuid
#执行命令后会生成一个uuid EQE2bU8TTg-RIDHgtqRdUw 
#格式化目录的时候要用到这个uuid

格式化日志目录

../bin/kafka-storage.sh format -t EQE2bU8TTg-RIDHgtqRdUw -c ./kraft/server.properties
#默认目录 Formatting /tmp/kraft-combined-logs

启动 Kafka 服务器

../bin/kafka-server-start.sh ./kraft/server.properties
#关闭Kafka	ctrl+c

一旦 Kafka 服务器成功启动,您将拥有一个基本的 Kafka 环境运行并可供使用。

后台不间断运行

nohup ../bin/kafka-server-start.sh ./kraft/server.properties >/dev/null 2>&1 &
#关闭后台运行
../bin/kafka-server-stop.sh

主题命令行操作

1)查看操作主题命令参数

.bin/kafka-topics.sh

微信截图_20230617120603.png
2)查看当前服务器中的所有 topic

./kafka-topics.sh --bootstrap-server localhost:9092 --list

3)创建 first topic

./kafka-topics.sh --create --topic first --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
选项说明: 
--topic 定义 topic 名 
--replication-factor 定义副本数 
--partitions 定义分区数

4)查看 first 主题的详情

./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic first

5)修改分区数(注意:分区数只能增加,不能减少)

./kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic first --partitions 2

6)删除 topic

./kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic first

生产者命令行操作

1)查看操作生产者命令参数

./kafka-console-producer.sh

20230617123240.png
2)发送消息

./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first

消费者命令行操作

1)查看操作消费者命令参数

./kafka-console-consumer.sh

20230617123407.png
2)消费消息
(1)消费 first 主题中的数据。

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first

(2)把主题中所有的数据都读取出来(包括历史数据)。

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic first

使用docker-compose搭建Kafka

搭建kafka环境的docker-compose.yml文件

version: '3'
services:
 zookepper:
 image: wurstmeister/zookeeper                    # 原镜像`wurstmeister/zookeeper`
 container_name: zookeeper                        # 容器名为'zookeeper'
 volumes:                                         # 数据卷挂载路径设置,将本机目录映射到容器目录
 - "/etc/localtime:/etc/localtime"
 ports:                                           # 映射端口
 - "2181:2181"
 kafka:
 image: wurstmeister/kafka                                # 原镜像`wurstmeister/kafka`
 container_name: kafka                                    # 容器名为'kafka'
 volumes:                                                 # 数据卷挂载路径设置,将本机目录映射到容器目录
 - "/etc/localtime:/etc/localtime"
 environment:                                                       # 设置环境变量,相当于docker run命令中的-e
 KAFKA_BROKER_ID: 0                                               # 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://ip:9092 # TODO 将kafka的地址端口注册给zookeeper
 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092                        # 配置kafka的监听端口
 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 
 KAFKA_CREATE_TOPICS: "hello_world"
 KAFKA_HEAP_OPTS: -Xmx1G -Xms256M
 ports:                              # 映射端口
 - "9092:9092"
 depends_on:                         # 解决容器依赖启动先后问题
 - zookepper
 kafka-manager:
 image: sheepkiller/kafka-manager                         # 原镜像`sheepkiller/kafka-manager`
 container_name: kafka-manager                            # 容器名为'kafka-manager'
 environment:                        # 设置环境变量,相当于docker run命令中的-e
 ZK_HOSTS: zookeeper:2181 
 APPLICATION_SECRET: xxxxx
 KAFKA_MANAGER_AUTH_ENABLED: "true"  # 开启kafka-manager权限校验
 KAFKA_MANAGER_USERNAME: admin       # 登陆账户
 KAFKA_MANAGER_PASSWORD: 123456      # 登陆密码
 ports:                              # 映射端口
 - "9000:9000"
 depends_on:                         # 解决容器依赖启动先后问题
 - kafka

Q.E.D.