Environment
ip | hostname | server_id |
---|---|---|
192.168.1.111 | UAT04 | 2 |
192.168.1.112 | UAT03 | 1 |
192.168.1.102 | UAT05 | 3 |
Configure hosts (add these entries to /etc/hosts on each machine):

    192.168.1.112 UAT03
    192.168.1.111 UAT04
    192.168.1.102 UAT05
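Since the same entries have to be added on every node, it can help to add them idempotently so that re-running the step does not duplicate lines. A minimal sketch, assuming bash; `add_host_entry` is a hypothetical helper name, not something shipped with any package:

```shell
#!/bin/bash
# Append "IP NAME" to a hosts file only if NAME is not already mapped there.
# add_host_entry is a hypothetical helper used for illustration.
add_host_entry() {
    local file="$1" ip="$2" name="$3"
    # Look for NAME as a whole whitespace-delimited field on a non-comment line.
    if ! grep -Eq "^[^#]*[[:space:]]${name}([[:space:]]|\$)" "$file" 2>/dev/null; then
        printf '%s %s\n' "$ip" "$name" >> "$file"
    fi
}

# Usage (as root, on each node):
#   add_host_entry /etc/hosts 192.168.1.112 UAT03
#   add_host_entry /etc/hosts 192.168.1.111 UAT04
#   add_host_entry /etc/hosts 192.168.1.102 UAT05
```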
Install the JDK

    # rpm -ivh jdk-8u65-linux-x64.rpm
    # java -version
    java version "1.8.0_65"
    Java(TM) SE Runtime Environment (build 1.8.0_65-b17)
    Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode)
Download the packages

    # wget http://apache.fayea.com/zookeeper/zookeeper-3.4.8/zookeeper-3.4.8.tar.gz
    # wget http://apache.fayea.com/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz
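Install Kafka and ZooKeeper

    # tar -xf zookeeper-3.4.8.tar.gz -C /usr/local/
    # tar -xf kafka_2.11-0.10.0.1.tgz -C /usr/local/

The archives unpack into versioned directories, while the later steps use /usr/local/zookeeper and /usr/local/kafka, so rename them:

    # mv /usr/local/zookeeper-3.4.8 /usr/local/zookeeper
    # mv /usr/local/kafka_2.11-0.10.0.1 /usr/local/kafka

Do this on all three machines.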
Configure ZooKeeper

    # cd /usr/local/zookeeper/conf/
    # cp zoo_sample.cfg zoo.cfg
    # vim zoo.cfg
    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/usr/local/zookeeper/data
    clientPort=2181
    dataLogDir=/usr/local/zookeeper/logs
    server.1 = 192.168.1.112:3888:4888
    server.2 = 192.168.1.111:3888:4888
    server.3 = 192.168.1.102:3888:4888
    # mkdir /usr/local/zookeeper/{data,logs}
    # echo 1 > /usr/local/zookeeper/data/myid

Copy the whole directory to the other two machines. On each of them, the number in myid must match that machine's server_id (the N of its server.N line), so adjust myid, server_id, and hostname accordingly.
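Because myid has to agree with the `server.N` line for the local machine, one option is to derive it from zoo.cfg instead of typing it by hand. A minimal sketch, assuming the `server.N = IP:port:port` layout shown above; `zk_id_for_ip` is a hypothetical helper name:

```shell
#!/bin/bash
# Print the N of the "server.N = IP:port:port" line in zoo.cfg that matches IP.
# Prints nothing if no line matches. zk_id_for_ip is a hypothetical helper.
zk_id_for_ip() {
    local cfg="$1" ip="$2"
    # Split on spaces, tabs, "=" and ":" so field 1 is "server.N" and field 2 is the IP.
    awk -v ip="$ip" -F'[ \t=:]+' '
        $1 ~ /^server\.[0-9]+$/ && $2 == ip {
            sub(/^server\./, "", $1)
            print $1
        }' "$cfg"
}

# Usage (on 192.168.1.111, which would write 2):
#   zk_id_for_ip /usr/local/zookeeper/conf/zoo.cfg 192.168.1.111 \
#       > /usr/local/zookeeper/data/myid
```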
Configure Kafka

    # vim /usr/local/kafka/config/server.properties
    broker.id=1
    port=9092
    host.name=UAT03
    advertised.host.name=UAT03
    num.network.threads=9
    num.io.threads=16
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/tmp/kafka-logs
    num.partitions=16
    default.replication.factor=2
    num.recovery.threads.per.data.dir=1
    log.flush.interval.messages=10000
    log.flush.interval.ms=1000
    log.retention.hours=720
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    log.cleaner.enable=false
    zookeeper.connect=192.168.1.112:2181,192.168.1.111:2181,192.168.1.102:2181
    zookeeper.connection.timeout.ms=6000

Copy the whole kafka directory to the other two machines as well, and change broker.id, host.name, and advertised.host.name on each.
Copy the zookeeper and kafka directories to the other machines

    # cd /usr/local
    # scp -r zookeeper/ kafka/ 192.168.1.111:/usr/local/
    # scp -r zookeeper/ kafka/ 192.168.1.102:/usr/local/

On 192.168.1.111:

    # echo 2 > /usr/local/zookeeper/data/myid

On 192.168.1.102:

    # echo 3 > /usr/local/zookeeper/data/myid

Edit /usr/local/kafka/config/server.properties on 192.168.1.111:

    broker.id=2
    host.name=UAT04
    advertised.host.name=UAT04

Edit /usr/local/kafka/config/server.properties on 192.168.1.102:

    broker.id=3
    host.name=UAT05
    advertised.host.name=UAT05
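The per-node edits to server.properties can also be scripted rather than made in an editor. A minimal sketch, assuming simple `key=value` lines as in the file above; `set_prop` is a hypothetical helper name, and it is not safe for values containing `|` or `&`:

```shell
#!/bin/bash
# Set key=value in a properties file: replace the existing line, or append one.
# set_prop is a hypothetical helper. The key is used as a regex, so "." in a key
# matches any character, which is harmless for the keys used here.
set_prop() {
    local file="$1" key="$2" value="$3"
    if grep -q "^${key}=" "$file"; then
        # Write to a temp file and move it into place (avoids relying on sed -i).
        sed "s|^${key}=.*|${key}=${value}|" "$file" > "${file}.tmp" &&
            mv "${file}.tmp" "$file"
    else
        printf '%s=%s\n' "$key" "$value" >> "$file"
    fi
}

# Usage (on 192.168.1.111):
#   f=/usr/local/kafka/config/server.properties
#   set_prop "$f" broker.id 2
#   set_prop "$f" host.name UAT04
#   set_prop "$f" advertised.host.name UAT04
```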
Start the ZooKeeper and Kafka cluster

Run on all three machines:

    # /usr/local/zookeeper/bin/zkServer.sh start &

Check the ZooKeeper cluster status:

    [root@UAT-03 ~]# /usr/local/zookeeper/bin/zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
    Mode: follower
    [root@UAT-04 ~]# /usr/local/zookeeper/bin/zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
    Mode: leader
    [root@UAT-05 ~]# /usr/local/zookeeper/bin/zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
    Mode: follower

Run on all three machines:

    # /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

Each broker logs a line like one of these once it is up:

    [2016-08-16 18:34:04,139] INFO [Kafka Server 1], started (kafka.server.KafkaServer)
    [2016-08-16 18:34:36,712] INFO [Kafka Server 2], started (kafka.server.KafkaServer)
    [2016-08-16 18:34:42,738] INFO [Kafka Server 3], started (kafka.server.KafkaServer)
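In a script, the role of a node can be read from the `Mode:` line of the status output shown above. A minimal sketch; `zk_mode` is a hypothetical helper name that only parses text on stdin and does not talk to ZooKeeper itself:

```shell
#!/bin/bash
# Read `zkServer.sh status` output on stdin and print the value after "Mode:".
# Exits non-zero if no "Mode:" line is present (for example, server not running).
# zk_mode is a hypothetical helper used for illustration.
zk_mode() {
    awk -F': *' '/^Mode:/ { print $2; found = 1 } END { exit !found }'
}

# Usage:
#   /usr/local/zookeeper/bin/zkServer.sh status 2>&1 | zk_mode
```

A healthy three-node ensemble should report exactly one leader and two followers across the machines.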
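Started as above, the processes depend on the current session and are killed when the session ends and the shell exits. Start them as follows instead:

    # nohup /usr/local/zookeeper/bin/zkServer.sh start &
    # nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &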
Quick usage test

Create a topic:

    # /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.1.111:2181 --replication-factor 1 --partitions 1 --topic test

List topics:

    # /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.1.111:2181
    test
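The list command prints one topic name per line, so a script can check for a topic with an exact-line match. A minimal sketch; `has_topic` is a hypothetical helper name that only inspects text on stdin:

```shell
#!/bin/bash
# Read `kafka-topics.sh --list` output on stdin; succeed if some line is exactly $1.
# has_topic is a hypothetical helper used for illustration.
has_topic() {
    # -F: literal string, -x: whole-line match, -q: no output, only exit status.
    grep -Fxq -- "$1"
}

# Usage:
#   /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.1.111:2181 \
#       | has_topic test && echo "topic exists"
```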
Web monitoring

    # mkdir /usr/local/kafka_monitor
    # cd /usr/local/kafka_monitor

Upload the downloaded jar to this directory, then create the start script:

    # vim mobile_start_en.sh
    #!/bin/bash
    java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk 192.168.1.112:2181,192.168.1.111:2181,192.168.1.102:2181 --port 8086 --refresh 10.seconds --retain 7.days 1>mobile-logs/stdout.log 2>mobile-logs/stderr.log &
    # mkdir mobile-logs
    # chmod +x mobile_start_en.sh
    # ./mobile_start_en.sh

Open http://192.168.1.112:8086/ in a browser.
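The `--zk` argument takes the same comma-separated host:port list as `zookeeper.connect`, and every entry has to use the ZooKeeper clientPort (2181 here). Building the string from a host list keeps the port consistent. A minimal sketch; `zk_connect` is a hypothetical helper name:

```shell
#!/bin/bash
# Build a ZooKeeper connect string: zk_connect PORT HOST [HOST...]
# zk_connect is a hypothetical helper used for illustration.
zk_connect() {
    local port="$1"; shift
    local out="" host
    for host in "$@"; do
        # ${out:+$out,} adds a comma separator only when out is non-empty.
        out="${out:+$out,}${host}:${port}"
    done
    printf '%s\n' "$out"
}

# Usage:
#   ZK=$(zk_connect 2181 192.168.1.112 192.168.1.111 192.168.1.102)
#   java ... com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk "$ZK" --port 8086 ...
```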
ZooKeeper as a systemd service

    [Unit]
    Description=zookeeper.service
    After=network.target

    [Service]
    User=zk
    Type=forking
    Environment=ZOO_LOG_DIR=/data/zk/zookeeper_logs
    Environment=JAVA_HOME=/usr/java/jdk1.8.0_121
    ExecStart=/usr/local/zookeeper/bin/zkServer.sh start
    ExecStop=/usr/local/zookeeper/bin/zkServer.sh stop
    ExecReload=/usr/local/zookeeper/bin/zkServer.sh restart

    [Install]
    WantedBy=multi-user.target

Reload systemd, then start and enable the service:

    systemctl daemon-reload
    systemctl restart zookeeper
    systemctl enable zookeeper
Kafka as a systemd service

    [Unit]
    Description=kafka.service
    After=network.target

    [Service]
    User=zk
    Type=simple
    PIDFile=/var/run/kafka.pid
    Environment=KAFKA_LOG_DIR=/data/zk/kafka_logs
    ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
    ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh

    [Install]
    WantedBy=multi-user.target

systemd does not expand `$ExecStop` or `$ExecStart` inside a unit file, so the unit defines no ExecReload; restart the broker with systemctl restart instead.

    systemctl daemon-reload
    systemctl restart kafka
    systemctl enable kafka
Test producing and consuming

Produce:

    /usr/local/kafka/bin/kafka-console-producer.sh --broker-list zk1:9092,zk2:9092,zk3:9092 --topic test1

Consume:

    /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper zk1:2181,zk2:2181,zk3:2181 --from-beginning --topic test1
Kafka message monitoring
kafka-manager can be used to manage the cluster.