每台机器都需要kafka先安装kafka到h15,其他2台机器安装一样的kafka,但是需要修改 broker.id=?(保证?不和其他kafka机器重复)
1、进入kafka的zip所在目录下解压: unzip kafka_2.10-0.8.2.1.zip
2、进入目录/home/kafka_2.10-0.8.2.1/config/,修改文件server.propertis
注意参数:
broker.id=15 //kafka机器id,不能重复,必须从0开始
port=9092
log.dirs=/kafka-logs //消息文件所在的目录
num.partitions=1
log.retention.hours=168 //默认销毁数据时间是7天,公司中需要修改
zookeeper.connect=h15:2181,h16:2181,h17:2181
delete.topic.enable=true //默认是false,那么删除topic时,不会删除zookeeper上topic的元数据信息
Log.segment.bytes=1073741824 //默认的segment大小是1G,
注:不建议segment过大,因为过大会导致即使过期了,如果没有达到1G或者其他自定义数量,也不会删除,占用磁盘
3、启动zookeeper集群、再修改kafka的bin目录可执行权限:chmod +x /bin/*
分别启动h15\h16\h17 上的kafka #bash startkafka.sh
startkafka.sh文件内容:nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
修改h16\h17的server.propertis文件中的 broker.id=16 broker.id=17
4、插入topic,注意添加备份数量和partition数量 、查看topic
配置好了配置文件之后,启动zookeeper,然后bash startkafka.sh这个脚本,看到jps里面返回来有kafka这个进程就OKAY了!
# sh kafka-topics.sh --list --zookeeper h15:2181,h16:2181,h17:2181
#sh kafka-topics.sh --describe --zookeeper h15:2181,h16:2181,h17:2181
#sh kafka-topics.sh --create --topic 20160510a --zookeeper h15:2181,h16:2181,h17:2181 --partitions 2 --replication-factor 2
#sh kafka-topics.sh --describe --zookeeper h15:2181,h16:2181,h17:2181 --topic 20160510a
注:后台执行的语句
#nohup sh ??? &
一、在服务器上
暂时删除topic
#kafka-topics.sh --delete --zookeeper h15:2181,h16:2181,h17:2181 --topic 20160510a
Topic 20160510x is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
分别在两个窗口中启动生产者和消费者
#sh kafka-console-producer.sh --topic 20160510a --broker-list h15:9092,h16:9092,h17:9092
#sh kafka-console-consumer.sh --zookeeper h15:2181 --topic 20160510a
如果在生产者端输入数据后,在消费者端数据同步,说明正确!
错误:
kafka.common.KafkaException: fetching topic metadata for topics [Set(20160510a)] from broker [ArrayBuffer(id:1,host:h16,port:9020, id:0,host:h15,port:9020, id:2,host:h17,port:9020)] failed
ERROR Failed to send requests for topics 20160510a with correlation ids in [9,16] (kafka.producer.async.DefaultEventHandler)
原因:生产者端口不正确,更改9020为9092
二、写Java代码
对应项目:testkafka_jk01\src\kafka\examples的类进行测试
* @author root
* kafka生产者端
* 测试通过
* 测试步骤:
* 1\开启h15\h16\h17的zookeeper集群
* 2\再开启h15\h16\h17的kafka机器
* 3\先启动消费者端Consumer.java
* 4\再启动本程序,如果打印正常,那么正确,否则不正确
*
*/
/**
*
* @author root
* 消费者端启动
* 测试通过
*/
public class Consumer extends Thread
准备机器h18
1\上传Redis-3.2.0.tar.gz文件到/home目录下
2\解压文件
3\进入解压目录下
#cd redis-3.2.0
执行
#make
4\待编译安装完成后,启动服务器端
#src/redis-server
5\进入客户端,可以进行操作
#src/redis-cli
6\使用默认数据库(有16个默认数据库,数据库名称是0--->15)
#select 0
插入数据
#set cassie apple
查询数据
#get cassie
注意:最新版本的特点--》九宫格的Geohash算法,有位置信息的索引建立
CIO之家 www.ciozj.com 公众号:imciow