===== 安装 ===== ==== 准备环境 ==== 虚拟机: * 192.168.0.2 master * 192.168.0.3 slave1 * 192.168.0.4 slave2 * jave版本:1.7.0-80。oracle官网下载 * kafka版本:kafka_2.11-0.8.2.1 ==== 开始安装 ==== - 官网下载kafka - 解压到/usr/local下 - 添加环境变量 export KAFKA_HOME=/usr/local/kafka_2.11-0.8.2.1 export PATH=$KAFKA_HOME/bin:$PATH - 将kafka复制到slave1,slave2 scp -r ~/.bashrc hadoop@slave1:/home/hadoop/ scp -r ~/.bashrc hadoop@slave2:/home/hadoop/ scp -r /usr/local/kafka_2.11-0.8.2.1 hadoop@slave1:/usr/local/ scp -r /usr/local/kafka_2.11-0.8.2.1 hadoop@slave2:/usr/local/ - 修改每个节点的配置文件vim $KAFKA_HOME/config/server.properties - 配置zookeeper.connect=master:2181,slave1:2181,slave2:2181 - [[http://my.oschina.net/frankwu/blog/305010|参数详解]] 一定要保证每个节点broker.id=的值唯一 ==== 常用命令 ==== **启动(每台机器执行)** kafka-server-start.sh -daemon /usr/local/kafka_2.11-0.8.2.1/config/server.properties **关闭(每台机器执行)** pkill -9 -f server.properties **创建一个叫做“test”的topic,它只有3个分区,一个副本** kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test **令查看所有创建的topic** kafka-topics.sh --list --zookeeper localhost:2181 **测试运行producer并在控制台中输一些消息,这些消息将被发送到服务端** kafka-console-producer.sh --broker-list localhost:9092 --topic test crtl + c 结束 **测试运行consumer读取消息并输出到标准输出** kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning crtl + c 结束 **查看topic信息** kafka-topics.sh --describe --zookeeper localhost:2181 --topic test 下面解释一下这些输出。第一行是对所有分区的一个描述,然后每个分区都会对应一行,因为我们只有一个分区所以下面就只加了一行。 * leader:负责处理消息的读和写,leader是从所有节点中随机选择的. * replicas:列出了所有的副本节点,不管节点是否在服务中. * isr:是正在服务中的节点. **删除topic** bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name 登陆zookeeper 删除test的元数据 rmr /brokers/topics/test 删除数据文件(log.dirs 参数指定) rm -rf /tmp/kafka-logs/test **查看lag** ./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group nirvana_enterprise --topic fang_topic --zookeeper 10.57.19.201:2181,10.57.19.180:2181,10.57.19.248:2181