安装Kafka
下载地址
Apache Kafka
解压文件
tar -zxvf kafka_2.12-3.0.0.tgz
cd kafka_2.12-3.0.0
启动Kafka环境
注意:您的本地环境必须安装 Java 8+。
Apache Kafka可以使用ZooKeeper或KRaft启动。要开始使用任一配置,请遵循以下部分之一,但不能同时遵循两者。
ZooKeeper启动
运行以下命令以正确的顺序启动所有服务:
cd bin
bin/zookeeper-server-start.sh ../config/zookeeper.properties
打开另一个终端会话并运行:
#先进入KafKa bin目录
./kafka-server-start.sh ../config/server.properties
成功启动所有服务后,您将拥有一个基本的 Kafka 环境运行并可供使用。
Kafka-Kraft启动
cd config
生成集群 UUID
../bin/kafka-storage.sh random-uuid
#执行命令后会生成一个uuid EQE2bU8TTg-RIDHgtqRdUw
#格式化目录的时候要用到这个uuid
格式化日志目录
../bin/kafka-storage.sh format -t EQE2bU8TTg-RIDHgtqRdUw -c ./kraft/server.properties
#默认目录 Formatting /tmp/kraft-combined-logs
启动 Kafka 服务器
../bin/kafka-server-start.sh ./kraft/server.properties
#关闭Kafka ctrl+c
一旦 Kafka 服务器成功启动,您将拥有一个基本的 Kafka 环境运行并可供使用。
后台不间断运行
nohup ../bin/kafka-server-start.sh ./kraft/server.properties >/dev/null 2>&1 &
#关闭后台运行
../bin/kafka-server-stop.sh
主题命令行操作
1)查看操作主题命令参数
.bin/kafka-topics.sh
2)查看当前服务器中的所有 topic
./kafka-topics.sh --bootstrap-server localhost:9092 --list
3)创建 first topic
./kafka-topics.sh --create --topic first --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
选项说明:
--topic 定义 topic 名
--replication-factor 定义副本数
--partitions 定义分区数
4)查看 first 主题的详情
./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic first
5)修改分区数(注意:分区数只能增加,不能减少)
./kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic first --partitions 2
6)删除 topic
./kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic first
生产者命令行操作
1)查看操作生产者命令参数
./kafka-console-producer.sh
2)发送消息
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first
消费者命令行操作
1)查看操作消费者命令参数
./kafka-console-consumer.sh
2)消费消息
(1)消费 first 主题中的数据。
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
(2)把主题中所有的数据都读取出来(包括历史数据)。
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic first
使用docker-compose搭建Kafka
搭建kafka环境的docker-compose.yml
文件
version: '3'
services:
zookepper:
image: wurstmeister/zookeeper # 原镜像`wurstmeister/zookeeper`
container_name: zookeeper # 容器名为'zookeeper'
volumes: # 数据卷挂载路径设置,将本机目录映射到容器目录
- "/etc/localtime:/etc/localtime"
ports: # 映射端口
- "2181:2181"
kafka:
image: wurstmeister/kafka # 原镜像`wurstmeister/kafka`
container_name: kafka # 容器名为'kafka'
volumes: # 数据卷挂载路径设置,将本机目录映射到容器目录
- "/etc/localtime:/etc/localtime"
environment: # 设置环境变量,相当于docker run命令中的-e
KAFKA_BROKER_ID: 0 # 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://ip:9092 # TODO 将kafka的地址端口注册给zookeeper
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 # 配置kafka的监听端口
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "hello_world"
KAFKA_HEAP_OPTS: -Xmx1G -Xms256M
ports: # 映射端口
- "9092:9092"
depends_on: # 解决容器依赖启动先后问题
- zookepper
kafka-manager:
image: sheepkiller/kafka-manager # 原镜像`sheepkiller/kafka-manager`
container_name: kafka-manager # 容器名为'kafka-manager'
environment: # 设置环境变量,相当于docker run命令中的-e
ZK_HOSTS: zookeeper:2181
APPLICATION_SECRET: xxxxx
KAFKA_MANAGER_AUTH_ENABLED: "true" # 开启kafka-manager权限校验
KAFKA_MANAGER_USERNAME: admin # 登陆账户
KAFKA_MANAGER_PASSWORD: 123456 # 登陆密码
ports: # 映射端口
- "9000:9000"
depends_on: # 解决容器依赖启动先后问题
- kafka
Q.E.D.