in kafka 监控 monitor consumer group ~ read.

一个简单的kafka consumer group的监控系统

当kafka第一次在生产环境上线的时候我们需要对kafka的状态进行监控,如果公司已经有现成的监控报警系统的话事情就会非常简单,只需要将kafka的metrics收集到监控系统中并设置一些报警指标即可。但现实是不是每一家公司都有监控报警系统的,或者现有的监控报警系统由于某种原因暂时无法支持kafka的监控。此时如果安排专人定时去生产环境检查kafka的状态,如果有问题则立即报告给相关人员,这是非常大的成本,姑且不说是否有人,即时有人的时候也不一定就不会出错。程序员的特点之一即是将一切手工的体力活自动化起来,下面我们来介绍如果使用简单的shell脚本去监控kafka的consumer group的状态。

监控脚本

  • 新建脚本如下,脚本命名为kafka-monitor.sh:
#!/bin/bash

# 修改此处指向正确的kafka home
KAFKA_GROUP_COMMAND="/{kafka_home}/kafka-consumer-groups.sh --bootstrap-server kafka1:9092"

# 获取kafka的consumer group
KAFKA_GROUP_LIST=$($KAFKA_GROUP_COMMAND --list)

# 将上面获取到的consumer group转换为数组
eval $(echo $KAFKA_GROUP_LIST | awk '{split($0, a, " ");for(i in a) print "KAFKA_GROUP_ARRAY["i"]="a[i]}')

len=${#KAFKA_GROUP_ARRAY[*]}

# 判断consumer group个数是否符合预期(此处是5),如果不符合发出报警邮件
if [ $len -ne 5 ]  
then  
  echo "kafka groups should be 5, but only found: "$KAFKA_GROUP_LIST | mail -s "kafka-warning" email@address.com
fi

echo "Totally $len groups"  
echo "GROUP   CURRENT-OFFSET  LOG-END-OFFSET    LAG"

# 循环检查每个consumer group的状态(此处只检查lag是否小于一万),如果不正常则发出报警邮件
i=0  
while [ $i -lt $len ]  
do  
  ((i++))
  group=${KAFKA_GROUP_ARRAY[$i]}

  $KAFKA_GROUP_COMMAND --describe --group $group | tail -1 | awk '{print $1" "$4" "$5" "$6}'

  result=$($KAFKA_GROUP_COMMAND --describe --group $group | tail -1 | awk '{print $6}')
  if [ $result -gt 10000 ]
  then
    echo $group": lag is great than 10000"
    echo "lag of kafka group["$group"] is "$result" now, please check its status" | mail -s "kafka-warning" email@address.com
  fi
done  
  • 通过以下命令将该脚本修改为可执行命令:
chmod 700 kafka-monitor.sh  

配置定期执行脚本

  • 修改/etc/crontab文件中的PATH为与你本机的PATH相同(可使用echo $PATH命令查看),这是因为kafka提供的脚本是封装的java调用,而cron默认的PATH是不包含JDK和其他一些潜在依赖项的。如下所示
PATH=/usr/local/kafka/bin::/usr/local/jdk/bin:/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/root/bin  
  • 在/etc/crontab文件中通过添加以下行来将上述新建的shell脚本定期执行。以下是指每小时的第一分钟只用root用户执行路径为/root/kafka-monitor.sh的shell命令,请根据需求进行修改。
01 * * * * root /root/kafka-monitor.sh  
  • 执行以下命令让cron任务生效
service crond reload  

到此配置完毕,当consumer group消费消息的offset低于kafka中最新消息的offset的时候就会有报警邮件自动发送出来,这说明这个kafka的消费者消费速度过低或者突然有大量消息进入kafka,此时请密切关注kafka及相关业务系统的的运行状态以避免出现重大问题。

分享按钮