===== 安装 =====
==== 准备环境 ====
虚拟机:
* 192.168.0.2 master
* 192.168.0.3 slave1
* 192.168.0.4 slave2
* jave版本:1.7.0-80。oracle官网下载
* kafka版本:kafka_2.11-0.8.2.1
==== 开始安装 ====
- 官网下载kafka
- 解压到/usr/local下
- 添加环境变量
export KAFKA_HOME=/usr/local/kafka_2.11-0.8.2.1
export PATH=$KAFKA_HOME/bin:$PATH
- 将kafka复制到slave1,slave2
scp -r ~/.bashrc hadoop@slave1:/home/hadoop/
scp -r ~/.bashrc hadoop@slave2:/home/hadoop/
scp -r /usr/local/kafka_2.11-0.8.2.1 hadoop@slave1:/usr/local/
scp -r /usr/local/kafka_2.11-0.8.2.1 hadoop@slave2:/usr/local/
- 修改每个节点的配置文件vim $KAFKA_HOME/config/server.properties
- 配置zookeeper.connect=master:2181,slave1:2181,slave2:2181
- [[http://my.oschina.net/frankwu/blog/305010|参数详解]]
一定要保证每个节点broker.id=的值唯一
==== 常用命令 ====
**启动(每台机器执行)**
kafka-server-start.sh -daemon /usr/local/kafka_2.11-0.8.2.1/config/server.properties
**关闭(每台机器执行)**
pkill -9 -f server.properties
**创建一个叫做“test”的topic,它只有3个分区,一个副本**
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test
**令查看所有创建的topic**
kafka-topics.sh --list --zookeeper localhost:2181
**测试运行producer并在控制台中输一些消息,这些消息将被发送到服务端**
kafka-console-producer.sh --broker-list localhost:9092 --topic test
crtl + c 结束
**测试运行consumer读取消息并输出到标准输出**
kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
crtl + c 结束
**查看topic信息**
kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
下面解释一下这些输出。第一行是对所有分区的一个描述,然后每个分区都会对应一行,因为我们只有一个分区所以下面就只加了一行。
* leader:负责处理消息的读和写,leader是从所有节点中随机选择的.
* replicas:列出了所有的副本节点,不管节点是否在服务中.
* isr:是正在服务中的节点.
**删除topic**
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name
登陆zookeeper 删除test的元数据
rmr /brokers/topics/test
删除数据文件(log.dirs 参数指定)
rm -rf /tmp/kafka-logs/test
**查看lag**
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group nirvana_enterprise --topic fang_topic --zookeeper 10.57.19.201:2181,10.57.19.180:2181,10.57.19.248:2181