kafka shell脚本命令
Kafka运维Shell脚本命令
话题管理
查看当前kafka所有话题
./kafka-topics.sh --zookeeper localhost:2181 --list
查看具体话题配置
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic 话题名
创建话题
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic 话题名
删除话题,kafka是标记删除并且服务端需配置server.properties delete.topic.enable=true
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --delete --topic 话题名
增加话题分区数,只能增加不能减少
./kafka-topics.sh --zookeeper localhost:2181 .--alter --topic 话题名 --partitions 10
收发消息
发送消息
./kafka-console-producer.sh --broker-list localhost:9092 --request-required-acks -1 --producer-property compression.type=gzip --topic 话题名
接收消息
offset 与partition组合使用,--group指定分组id
--offset latest|erliest|offset 指定消费最新、最早、位移
-- partition 指定分区
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 话题名 --group 分组id --consumer-property enable.auto.commit=false
Kafka Consumer重置Offset
在Kafka Version为0.11.0.0之后,Consumer的Offset信息不再默认保存在Zookeeper上,而是选择用Topic的形式保存下来。
在Kafka Version为0.11.0.0之后,Consumer的Offset信息不再默认保存在Zookeeper上,而是选择用Topic的形式保存下来。
在命令行中可以使用kafka-consumer-groups的脚本实现Offset的相关操作。
更新Offset由三个维度决定:Topic的作用域、重置策略、执行方案。
Topic的作用域
--all-topics
:为consumer group下所有topic的所有分区调整位移)--topic t1 --topic t2
:为指定的若干个topic的所有分区调整位移--topic t1:0,1,2
:为指定的topic分区调整位移
重置策略
--to-earliest
:把位移调整到分区当前最小位移--to-latest
:把位移调整到分区当前最新位移--to-current
:把位移调整到分区当前位移--to-offset <offset>
: 把位移调整到指定位移处--shift-by N
: 把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动--to-datetime <datetime>
:把位移调整到大于给定时间的最早位移处,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000--by-duration <duration>
:把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,比如PT0H5M0S--from-file <file>
:从CSV文件中读取调整策略
确定执行方案
- 什么参数都不加:只是打印出位移调整方案,不具体执行
--execute
:执行真正的位移调整--export
:把位移调整方案按照CSV格式打印,方便用户成csv文件,供后续直接使用
注意事项
- consumer group状态必须是inactive的,即不能是处于正在工作中的状态
- 不加执行方案,默认是只做打印操作
常用示例
更新到当前group最初的offset位置
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-earliest --execute
更新到指定的offset位置
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-offset 500000 --execute
更新到当前offset位置(解决offset的异常)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-current --execute
offset位置按设置的值进行位移
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --shift-by -100000 --execute
offset设置到指定时刻开始
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-datetime 2017-08-04T14:30:00.00x
授权
查看用户授权
./kafka-acls.sh --list --authorizer-properties zookeeper.connect=localhost:2181
动态创建用户
./kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=用户密码],SCRAM-SHA-512=[password=用户密码]' --entity-type users --entity-name 用户名
分配用户权限
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:用户名 --producer --consumer --group=分组id --topic 话题名
版权声明:
本站所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自
编程之家!
喜欢就支持一下吧