目录
kafka的启动脚本
kafka集群没有脚本能直接启动所有的节点,所以为了繁杂去每台机器启动,所以编写了脚本启动所有
注意参照替换
vim kafka-startall.sh
1 |
|
kafka的shell命令
jps -ml 查看kafka的运行情况
启动
1 | nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties > /dev/null 2>&1 & |
创建 topic
1 | $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper zk1:2181,zk2:2181,zk3:2181 --replication-factor 2 --partitions 3 --topic shuaige |
查看消费位置
1 | $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk1:2181 --group testGroup |
查看某个 Topic 的详情
1 | $KAFKA_HOME/bin/kafka-topics.sh --topic test --describe --zookeeper zk1:2181,zk2:2181,zk3:2181 |
producer创建命令行消息生产者
1 | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic shuaige |
Consumer创建命令行消费者
也可以说是查看message
注意:从 kafka-0.9 版本及以后,kafka 的消费者组和 offset 信息就不存 zookeeper 了,而是存到 broker 服务器上,所以,如果你为某个消费者指定了一个消费者组名称(group.id),那么,一旦这个消费者启动,这个消费者组名和它要消费的那个 topic 的 offset 信息就会被记录在 broker 服务器上。
old版本
1 | $KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper zk1:2181,zk2:2181,zk3:2181 --topic shuaige --from-beginning --group testGroup |
new版本
1 | $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic shuaige --from-beginning --group testGroup |
消费者要从头开始消费某个 topic 的全量数据,需要满足 2 个条件(spring-kafka):
1 | (1)使用一个全新的"group.id"(就是之前没有被任何消费者使用过); |
对应的 spring-kafka 消费者客户端配置参数为:
1 | <!-- 指定消费组名 --> |
查看所有 topic
zk 表示 zk 的地址 地址表示 zookeeper 的地址
1 | $KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper zk1:2181,zk2:2181,zk3:2181 |
删除 topic
1 | sh $KAFKA_HOME/bin/kafka-topics.sh --delete --zookeeper zk1:2181,zk2:2181,zk3:2181 --topic test |