Kafka是一个分布式的数据流平台,常用来作为消息传送中间件。
本篇介绍Kafka的使用方法,以Linux系统为例 (windows系统只需将下面的命令"bin/"都改为"bin\windows\",脚本扩展名“.sh”改为“.bat”),适合刚接触Kafka和zookeeper的新手。原文http://kafka.apache.org/documentation#quickstart
1. 下载软件包,并解压
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.1.0/kafka_2.11-0.10.1.0.tgz
> tar -xzf kafka_2.11-0.10.1.0.tgz
> cd kafka_2.11-0.10.1.0
2. 启动服务
kafka自带了zookeeper,可以直接使用它建立一个单节点的zookeeper 实例。也可以自己安装配置zookeeper
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
注意:windows上因为文件夹空格问题可能导致启动zookeeper失败,我直接启动就报错
错误: 找不到或无法加载主类 Files\Java\jdk1.8.0_51\lib;D:\Program:
解决方法:修改bin\windows\kafka-run-class.bat 文件里面142行,给%CLASSPATH%加上双引号:
[html] view plain copy 在CODE上查看代码片派生到我的代码片
set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp "%CLASSPATH%" %KAFKA_OPTS% %*
启动kafka server
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
3. 创建topic
topic 名字为test,1个分块和1个副本
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看topic列表
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
4. producer发送消息
kafka的命令行客户端可以将文件或标准输入作为消息来发送到kafka集群。默认情况下每一行都是独立一个消息。
发送消息时要运行producer。
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
按ctrl+c退出消息发送。
5. consumer接收消息
consumer订阅了topic test就可以接收上面的消息。命令行运行consumer将接收到的消息显示在终端:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
你可以在两个终端分别运行producer和consumer,更直观地在这边发送消息,另一边接收消息。
6. 设置多个代理(multi-broker)集群
以上我们是在一个代理来传送消息,实际上一个代理也是集群,只不过是单节点。下面我们就创建两个代理实例来看一下它们是怎么协作的。
在只有一台机器的情况下,可以开放不同的端口来区分不同的代理。
先将server.properties拷贝两个副本:
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
编辑副本:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
broker.id属性对于每个节点必须是唯一的。使用不同的log目录是为了每个节点存储自己的日志文件,不会互相覆盖。
启动这两个节点:
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...
创建一个新的topic,有1个分块和3个副本:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
执行describe topics命令来观察哪一个代理在处理该topic。
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topicPartitionCount:1ReplicationFactor:3Configs:
Topic: my-replicated-topicPartition: 0Leader: 1Replicas: 1,2,0Isr: 1,2,0
第一行是topic概观,描述该topic的信息。接下来每一行显示该topic一个分块的信息。因为我们这个topic只有一个分块,所以只有一行。
Leader:负责该分块的读写,随机选举出来的节点;
replica:复制分块的节点;
isr:是replica的子集,leader的候选集,即slave。
在该例子中,node 1是my-replicated-topic唯一一个分块的leader。
接下来我们向my-replicated-topic发送一些消息:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C
通过消费者接受消息:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
容错测试。我们kill作为leader的Broker 1:
> ps aux | grep server-1.properties
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564
执行describe topics:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topicPartitionCount:1ReplicationFactor:3Configs:
Topic: my-replicated-topicPartition: 0Leader: 2Replicas: 1,2,0Isr: 2,0
可以看到node 2 成为了新的leader,备胎集Isr里面也没了node 1。这样的变动完全不影响消费者取数据:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
7. kafka connect输入/输出数据
只从终端来输入发送消息和显示接受消息显然不能满足大家的需求。kafka提供了一个连接外部系统的工具kafka connector。kafka connector可以将各种数据源作为消息输入(包括文件、程序输出、log),也可以将接受到的消息写入文件。
本例以读写文件为例,创建文件:
> echo -e "foo\nbar" > test.txt
接着运行两个connectors。
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
三个配置文件:
第一个是kafka链接过程,包括一些通用的配置如连接的broker、数据序列化的格式;
第二个和第三个配置文件各指定了一个connector,里面的配置项包括唯一的连接名、连接类等。source connector读文件,将每一行作为一个消息发送出去。sink connector从kafka接收消息并逐行写入文件。
source connector的配置:
[plain] view plain copy 在CODE上查看代码片派生到我的代码片
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
sink connector的配置:
[plain] view plain copy 在CODE上查看代码片派生到我的代码片
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
source创建了topic “connect-test” ,sink订阅topic “connect-test”。两个connector启动过后,可以在kafka根目录下生成了test.sink.txt,验证内容是不是正确:
> cat test.sink.txt
foo
bar
当然也可以从终端消费者读取topic connect-test
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...
动态地往test.txt里面追加内容,消费者能接近实时地获取消息。
> echo "Another line" >> test.txt
追加的这一行会在test.sink.txt和终端都显示出来。
CIO之家 www.ciozj.com 公众号:imciow