kafka集群与使用参数调优
Kafka常见参数调优
服务器
操作系统
- 操作系统配置可打开文件限制数据为100000
root soft nofile 100000
root hard nofile 100000
- 调整内核读写缓冲区大小,修改/etc/sysctl.conf文件
// 单个 socket buffer 的最大内存限制
net.core.optmem_max = 33554432
//单个 socket receive buffer 默认内存限制
net.core.rmem_default = 33554432
//单个 socket receive buffer 最大内存限制
net.core.rmem_max = 67108864
- 配置文件内存映射最大值,修改/etc/syctl.conf文件
vm.max_map_count=262144
JVM配置
-Xmx6g -Xms6g
-XX:MetaspaceSize=96m
-XX:+UseG1GC
-XX:MaxGCPauseMillis=20
-XX:InitiatingHeapOccupancyPercent=35
-XX:G1HeapRegionSize=16M
-XX:MinMetaspaceFreeRatio=50
-XX:MaxMetaspaceFreeRatio=80
-XX:+ExplicitGCInvokesConcurrent
Kafka Borker配置
- kafka listeners与advertised.listeners区别,listeners配置是用来绑定主机IP跟端口地址,并且可以配置监听地址的认证协议。advertised.listners用于对外部公开的网络IP和端口,用于告知客户端如何连接到kafka broker,一般用于nginx代理等情况。
listeners: INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9094
advertised_listeners: INTERNAL://内网IP:9092,EXTERNAL://公网IP:端口
kafka_listener_security_protocol_map: "INTERNAL:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT"
- 在生产环境把自动创建topic关闭,便于运维管理
auto.create.topics.enable=false
- 消息保持时间,容量,最大消息长度
log.retention.{hour|minutes|ms}:控制一条消息保存多少时间
log.retention.bytes:指定 Broker 为消息保存磁盘容量的大小
message.max.bytes:控制 Broker 能够接受的最大消息
log.retention.hour=168 //7天
message.max.bytes=209715200 //200MB,设置大点,防止过长消息丢失
- 落后的副本不参与竞选leader,防止数据丢失
unclean.leader.election.enable=false
- 禁止定期进行leader自动选举,这里设置为false,避免kafka集群性能防止抖动
auto.leader.rebalance.enable=false
``
5. borker处理客户端请求最大线程数,参考cpu核心数据
num.network.threads=4
- broker处理磁盘IO线程数,数值应该大于客户端请求线程数
num.io.threads=8
生产者
max.request.size=10485760 #最大发送消息大小
bootstrap.servers=localhost:9092 #kafka集群地址
retries=1 #发生异常重试次数
max.in.flight.requests.per.connection=1 #该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
acks=0|1|all #0代表不管brokers端是否接收到、1代表分区leader接收到、all代表分区leader和所有副本都接收到了,才代表发送成功
retry.backoff.ms=200 #消息重试间隔时间
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer.class #key序列化方式
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer.class #value序列化方式
compression.type=gzip #消息压缩算法
linger.ms=10 #一个Batch被创建之后,最多等待延迟时间,到点时间后不管这个Batch有没有写满,消息都必须发送出去
batch.size=16384 #batch.size默认值是16KB,kafka Producer凑够16KB的数据才会发送
max.request.size=1638 #客户端能够发送的最大消息大小
buffer.memory=67108864 #KafkaProducer能够使用的内存缓冲的大小的,默认值32MB,buffer.memory设置的太小,可能导致的问题是:消息快速的写入内存缓冲里,但Sender线程来不及把Request发送到Kafka服务器,会造成内存缓冲很快就被写满。而一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了。
消费者
group.id=log #消费者组id
enable.auto.commit=false #禁止自动提交位移
session.timeout.ms=60000 #向消费者所消费的分区的broker的coordinator提交心跳信息最大间隔,避免被认为挂掉
heartbeat.interval.ms=100 #发送心跳时间的频率
max.poll.interval.ms=30000 #consumer两次调用poll的最大间隔,超过这个间隔则认为该消费者挂掉
auto.offset.reset=earliest|latest|none #earliest 已经消费过数据,从上次消费处接着消费。未消费从最早一条消费,lastest:已经消费过数据,从上次消费处接着消费。未消费从最新一条开始消费,none:各分区都存在已提交的offset时,从提交的offest处开始消费;只要有一个分区不存在已提交的offset,则抛出异常
max.poll.records=10 #一次poll拉取消息的个数
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer.class #key的反序化方法
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer.class #value的反序列化方法
fetch.min|max.bytes=1024*1024*5 #一次拉取最少|最多消息大小
版权声明:
本站所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自
编程之家!
喜欢就支持一下吧