Producer :消息生产者,向broker发消息的客户端。
Consumer :消息消费者,向broker取消息的客户端
Topic :一个队列,主题。
Message:消息是kafka处理的对象,在kafka中,消息是被发布到broker的topic中。而consumer也是从相应的topic中拿数据。也就是说,message是按topic存储的
Consumer Group :将topic消息的广播发给consumer的手段。一个topic可以有多个CG。
Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。
Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka
安装过程:
(1)下载解压。官网下载kafka,http://kafka.apache.org/
解压到安装目录下 tar -xcvf
(2)修改配置文件/usr/local/kafka/config/server.properties,修改如下内容
broker.id=0
host.name=hadoop1
zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181,hadoop4:2181,hadoop5:2181,hadoop6:2181
(3)修改完配置文件即可将整个文件夹传输到其他节点 scp -r 。。。。
(4)传输完之后修改每个节点的broker.id的编号,递增。
(5)启动zookeeper。
这边可以使用kafka自带的zookeeper,也可以使用自己安装的zookeeper。
启动自己安装的zookeeper : /app/zookeeper-3.4.6/bin/zkServer.sh start
各个节点均启动完成之后,可以查看zk的状态 /app/zookeeper-3.4.6/bin/zkServer.sh status
(6)启动kafka
/app/kafka_2.9.2-0.8.2.1/bin/kafka-server-start.sh /app/kafka_2.9.2-0.8.2.1/config/server.properties &
尾部加上&的作用是可以启动完之后直接按回车退出,继续下一步操作。也可不加&
(7)创建topic
/app/kafka_2.9.2-0.8.2.1/bin/kafka-topics.sh --create --topic idoall_testTopic --replication-factor 6 --partitions 2 --zookeeper hadoop1:2181
可通过指令查看所有的topic /app/kafka_2.9.2-0.8.2.1/bin/kafka-topics.sh --list --zookeeper hadoop2:2181
(8)发送消息
/app/kafka_2.9.2-0.8.2.1/bin/kafka-console-producer.sh --broker-list hadoop3:9092 --sync --topic idoall_testTopic
(9)消费消息。重新打开一个终端执行指令
/app/kafka_2.9.2-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper hadoop1:2181 --topic idoall_testTopic --from-beginning
在原终端中输入消息,新终端中会显示出输入的消息。
关闭kafka指令。 /app/kafka_2.9.2-0.8.2.1/bin/kafka-server-stop.sh /app/kafka_2.9.2-0.8.2.1/config/server.properties &
java调用kafka
首先创建一个java project,网上很多说需创建maven工程,本人经过测验,发现maven工程和普通的java project均可调用。
需要注意,工程所采用的jar包,可以在相应版本的kafka安装文件夹的lib目录下引用,不同版本的jar包可能不通用,
如果出现java.lang.NoClassDefFoundError: scala/reflect/ClassManifest的报错,可能是由于jar包不匹配引起的。
Producer端代码:
package com;
import java.util.Date;
import java.util.Properties;
import java.text.SimpleDateFormat;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class Producertest {
public static void main(String[] args) {
Properties props = new Properties();
props.put("zk.connect", "hadoop1:2181/kafka,hadoop2:2181/kafka,hadoop3:2181/kafka,hadoop4:2181/kafka,hadoop5:2181/kafka,hadoop6:2181/kafka");
// serializer.class为消息的序列化类
props.put("serializer.class", "kafka.serializer.StringEncoder");
// 配置metadata.broker.list, 为了高可用, 最好配两个broker实例
props.put("metadata.broker.list", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
// 设置Partition类, 对队列进行合理的划分
//props.put("partitioner.class", "idoall.testkafka.Partitionertest");
// ACK机制, 消息发送需要kafka服务端确认
props.put("request.required.acks", "1");
props.put("num.partitions", "6");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
for (int i = 0; i < 10; i++)
{
// KeyedMessage<K, V>
// K对应Partition Key的类型
// V对应消息本身的类型
// topic: "test", key: "key", message: "message"
SimpleDateFormat formatter = new SimpleDateFormat ("yyyy年MM月dd日 HH:mm:ss SSS");
Date curDate = new Date(System.currentTimeMillis());//获取当前时间
String str = formatter.format(curDate);
String msg = "idoall.org" + i+"="+str;
String key = i+"";
producer.send(new KeyedMessage<String, String>("idoall_testTopic",key, msg));
}
}
}
Consumer端代码:
package com;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class Consumertest extends Thread{
private final ConsumerConnector consumer;
private final String topic;
public static void main(String[] args) {
Consumertest consumerThread = new Consumertest("idoall_testTopic");
consumerThread.start();
}
public Consumertest(String topic) {
consumer =kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
this.topic =topic;
}
private static ConsumerConfig createConsumerConfig() {
Properties props = new Properties();
// 设置zookeeper的链接地址
props.put("zookeeper.connect","hadoop1,hadoop2,hadoop3,hadoop4,hadoop5,hadoop6:2181");
// 设置group id
props.put("group.id", "1");
// kafka的group 消费记录是保存在zookeeper上的, 但这个信息在zookeeper上不是实时更新的, 需要有个间隔时间更新
props.put("auto.commit.interval.ms", "1000");
props.put("zookeeper.session.timeout.ms","10000");
return new ConsumerConfig(props);
}
public void run(){
//设置Topic=>Thread Num映射关系, 构建具体的流
Map<String,Integer> topickMap = new HashMap<String, Integer>();
topickMap.put(topic, 1);
Map<String, List<KafkaStream<byte[],byte[]>>> streamMap=consumer.createMessageStreams(topickMap);
KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);
ConsumerIterator<byte[],byte[]> it =stream.iterator();
System.out.println("*********Results********");
while(it.hasNext()){
System.err.println("get data:" +new String(it.next().message()));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
之后运行代码。
运行Producer端代码,会发现之前的客户端中会接收到代码中发送的消息。
运行consumer端代码,在终端中输入消息,eclipse中会读取到发送的消息,打印出来。
至此,简单的java调用kafka操作完成。
如果将Kafka在zookeeper的默认目录,修改为自定义目录时,在运行过程中会报出java.lang.IllegalArgumentException: Path length must be > 0”错误
网上找了好久,发现别人说这是一个Bug,由于initZk()方法没有对路径进行处理导致
原代码:
private def initZk(): ZkClient = {
info("Connecting to zookeeper on " + config.zkConnect)
val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
ZkUtils.setupCommonPaths(zkClient)
zkClient
}
解决代码:
private def initZk(): ZkClient = {
info("Connecting to zookeeper on " + config.zkConnect)
val chroot = { if (config.zkConnect.indexOf("/") > 0)
config.zkConnect.substring(config.zkConnect.indexOf("/")) else
""
} if (chroot.length > 1) {
val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/"))
val zkClientForChrootCreation = new ZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
ZkUtils.makeSurePersistentPathExists(zkClientForChrootCreation, chroot)
info("Created zookeeper path " + chroot)
zkClientForChrootCreation.close()
}
val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
ZkUtils.setupCommonPaths(zkClient)
zkClient
}
如果无法修改,那么可以将自定义目录修改成原来的默认目录,则不会报错。
CIO之家 www.ciozj.com 公众号:imciow