本文基于Kafka 0.8
1. 引言
Kafka是LinkedIn开发并开源出来的一个高吞吐的分布式消息系统。其具有以下特点:
1) 支持高Throughput的应用
2) scale out:无需停机即可扩展机器
3) 持久化:通过将数据持久化到硬盘以及replication防止数据丢失
4) 支持online和offline的场景。
2. 介绍
kafka使用scala开发,支持多语言客户端(c++、java、python、go等)其架构如下[2]:
Producer:消息发布者
Broker:消息中间件处理结点,一个kafka节点就是一个broker
Consumer:消息订阅者
kafka的消息分几个层次:
1) Topic:一类消息,例如page view日志,click日志等都可以以topic的形式存在,kafka集群能够同时负责多个topic的分发
2) Partition: Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。
3) Message:消息,最小订阅单元
具体流程:
1. Producer根据指定的partition方法(round-robin、hash等),将消息发布到指定topic的partition里面
2. kafka集群接收到Producer发过来的消息后,将其持久化到硬盘,并保留消息指定时长(可配置),而不关注消息是否被消费。
3. Consumer从kafka集群pull数据,并控制获取消息的offset
3. 设计
ThroughPut
High Throughput是kafka需要实现的核心目标之一,为此kafka做了以下一些设计:
1)数据磁盘持久化:消息不在内存中cache,直接写入到磁盘,充分利用磁盘的顺序读写性能
2)zero-copy:减少IO操作步骤
3)数据批量发送
4)数据压缩
5)Topic划分为多个partition,提高parallelism
load balance&HA
1) producer根据用户指定的算法,将消息发送到指定的partition
2) 存在多个partiiton,每个partition有自己的replica,每个replica分布在不同的Broker节点上
3) 多个partition需要选取出lead partition,lead partition负责读写,并由zookeeper负责fail over
4) 通过zookeeper管理broker与consumer的动态加入与离开
pull-based system
由于kafka broker会持久化数据,broker没有内存压力,因此,consumer非常适合采取pull的方式消费数据,具有以下几点好处:
1)简化kafka设计
2)consumer根据消费能力自主控制消息拉取速度
3)consumer根据自身情况自主选择消费模式,例如批量,重复消费,从尾端开始消费等
Scale Out
当需要增加broker结点时,新增的broker会向zookeeper注册,而producer及consumer会根据注册在zookeeper上的watcher感知这些变化,并及时作出调整。
Reference
[1] Apache Kafka
[2] Kafka: a Distributed Messaging System for Log Processing
[3] Kafka Client
Kafka(二):环境搭建&测试
在一台机器上构建一个3个节点的kafka集群,并测试producer、consumer在正常情况下的行为,以及在lead broker/follow broker失效情况下的行为
1.下载并解压kafka 0.8.0 release
$ mkdir kafka
$ wget http://apache.dataguru.cn/kafka/0.8.0/kafka_2.8.0-0.8.0.tar.gz
$ tar -zxvf kafka_2.8.0-0.8.0.tar.gz
$ cd kafka_2.8.0-0.8.0
$ ll
total 2560
drwxr-xr-x 6 root root 4096 Dec 17 17:44 ./
drwxr-xr-x 4 root root 4096 Dec 17 18:20 ../
drwxr-xr-x 3 root root 4096 Dec 17 18:16 bin/
drwxr-xr-x 2 root root 4096 Dec 17 17:43 config/
-rw-r--r-- 1 root root 2520145 Nov 27 06:21 kafka_2.8.0-0.8.0.jar
drwxr-xr-x 2 root root 4096 Nov 27 06:21 libs/
-rw-r--r-- 1 root root 12932 Nov 27 06:21 LICENSE
drwxr-xr-x 2 root root 4096 Dec 17 18:00 logs/
-rw------- 1 root root 47165 Dec 17 18:10 nohup.out
-rw-r--r-- 1 root root 162 Nov 27 06:21 NOTICE
2.启动一个单节点的zookeeper
$ nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
3. 准备启动一个3个broker节点的kafka集群,因此做如下配置
$ cp config/server.properties config/server-1.properties
$ cp config/server.properties config/server-2.properties
并做如下修改:
config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
说明:
broker.id: broker节点的唯一标识
port: broker节点使用端口号
log.dir: 消息目录位置
4. 启动3个broker节点
$ JMX_PORT=9997 bin/kafka-server-start.sh config/server-1.properties &
$ JMX_PORT=9998 bin/kafka-server-start.sh config/server-2.properties &
$ JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties &
5. 创建topic并查看
$ bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 3 --partition 1 --topic 3test
creation succeeded!
$ bin/kafka-list-topic.sh --zookeeper localhost:2181
topic: 3test partition: 0 leader: 2 replicas: 2,1,0 isr: 2,1,0
topic: test partition: 0 leader: 0 replicas: 0 isr: 0
topic: test_topic partition: 0 leader: 1 replicas: 0,1,2 isr: 1,2,0
说明:
partiton: partion id,由于此处只有一个partition,因此partition id 为0
leader:当前负责读写的lead broker id
relicas:当前partition的所有replication broker list
isr:relicas的子集,只包含出于活动状态的broker
6.启动consumer & producer,并在producer启动后的console输入一些信息
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic 3test
message1
message3
message2
$ bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic 3test
message1
message3
message2
producer发送的数据consumer都能正常消费
7. 干掉follow broker
杀掉一个非lead broker(lead broker id为2)
$ pkill -9 -f server-1.properties
查看topic:
$ bin/kafka-list-topic.sh --zookeeper localhost:2181
topic: 3test partition: 0 leader: 2 replicas: 2,1,0 isr: 2,0
topic: test partition: 0 leader: 0 replicas: 0 isr: 0
topic: test_topic partition: 0 leader: 2 replicas: 0,1,2 isr: 2,0
此时,存活的broker只有2,0
测试:produce发送消息,consumer能正常接收到
8. 继续干掉leader broker
干掉leader broker后,连续查看topic状态
$ pkill -9 -f server-2.properties
$ bin/kafka-list-topic.sh --zookeeper localhost:2181
topic: 3test partition: 0 leader: 2 replicas: 2,1,0 isr: 2,0
topic: test partition: 0 leader: 0 replicas: 0 isr: 0
topic: test_topic partition: 0 leader: 2 replicas: 0,1,2 isr: 2,0
$ bin/kafka-list-topic.sh --zookeeper localhost:2181
topic: 3test partition: 0 leader: 2 replicas: 2,1,0 isr: 2,0
topic: test partition: 0 leader: 0 replicas: 0 isr: 0
topic: test_topic partition: 0 leader: 2 replicas: 0,1,2 isr: 2,0
$ bin/kafka-list-topic.sh --zookeeper localhost:2181
topic: 3test partition: 0 leader: 0 replicas: 2,1,0 isr: 0
topic: test partition: 0 leader: 0 replicas: 0 isr: 0
topic: test_topic partition: 0 leader: 0 replicas: 0,1,2 isr: 0
$ bin/kafka-list-topic.sh --zookeeper localhost:2181
topic: 3test partition: 0 leader: 0 replicas: 2,1,0 isr: 0
topic: test partition: 0 leader: 0 replicas: 0 isr: 0
topic: test_topic partition: 0 leader: 0 replicas: 0,1,2 isr: 0
杀掉leader broker过了一会,broker 0成为新的leader broker
测试:produce发送消息,consumer能正常接收到
参考:Kafka QuickStart
CIO之家 www.ciozj.com 公众号:imciow