用户工具


安装

准备环境

虚拟机:

  • 192.168.0.2 master
  • 192.168.0.3 slave1
  • 192.168.0.4 slave2
  • jave版本:1.7.0-80。oracle官网下载
  • kafka版本:kafka_2.11-0.8.2.1

开始安装

  1. 官网下载kafka
  2. 解压到/usr/local下
  3. 添加环境变量
    export KAFKA_HOME=/usr/local/kafka_2.11-0.8.2.1
    export PATH=$KAFKA_HOME/bin:$PATH

  4. 将kafka复制到slave1,slave2

    scp -r ~/.bashrc hadoop@slave1:/home/hadoop/
    scp -r ~/.bashrc hadoop@slave2:/home/hadoop/
    scp -r /usr/local/kafka_2.11-0.8.2.1 hadoop@slave1:/usr/local/
    scp -r /usr/local/kafka_2.11-0.8.2.1 hadoop@slave2:/usr/local/

  5. 修改每个节点的配置文件vim $KAFKA_HOME/config/server.properties
    1. 配置zookeeper.connect=master:2181,slave1:2181,slave2:2181
    2. 参数详解

      一定要保证每个节点broker.id=的值唯一

常用命令

kafka-server-start.sh -daemon /usr/local/kafka_2.11-0.8.2.1/config/server.properties
pkill -9 -f server.properties
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test
kafka-topics.sh --list --zookeeper localhost:2181
kafka-console-producer.sh --broker-list localhost:9092 --topic test

crtl + c 结束 
kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

crtl + c 结束 
kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

下面解释一下这些输出。第一行是对所有分区的一个描述,然后每个分区都会对应一行,因为我们只有一个分区所以下面就只加了一行。

  • leader:负责处理消息的读和写,leader是从所有节点中随机选择的.
  • replicas:列出了所有的副本节点,不管节点是否在服务中.
  • isr:是正在服务中的节点.
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name
登陆zookeeper 删除test的元数据
rmr /brokers/topics/test
删除数据文件(log.dirs 参数指定)
rm -rf /tmp/kafka-logs/test
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group nirvana_enterprise --topic fang_topic --zookeeper 10.57.19.201:2181,10.57.19.180:2181,10.57.19.248:2181