大众点评ETL数据传输平台设计与实现
lvzhuyiyi CSDN

1.1 功能分析

1.1.1 调度系统

 image.png

              图 3.1  调度系统用例图

调度系统负责ETL数据传输平台的运行、监控和资源分配(如图3.1)。

1) 定时调度。ETL任务无论是抽取还是转换类型都必须每天在特定时间(一般是晚上)被调度起来完成当天增量数据的传输,所以一般ETL传输平台必须有定时调度的功能。

2) 支持集群和并发。现在工业生产环境单机性能已经不能满足大中型企业的ETL需求,所以ETL传输平台必须能够运行在多台机器上,数据的抽取和写入目标表一般是ETL任务的时间瓶颈所在,加入并发机制能够大大缩短ETL任务时间,提供资源的利用效率。

3) 监控和日志。ETL任务抽取和转换过程中很可能出现部分失败或者超时现象,甚至异常终止也是常见的。或者数据ETL的结果存在错误与源数据并不一致。一个好的数据传输平台必须有好的告警、日志机制。并且每个任务都要数据一致性检查和部分失败容忍度,不合格则认定此次任务传输失败。

4) 资源管理。无论是执行机资源还是公共资源如Hadoop集群,Hive和Mysql等,他们的量或者并发数都是有限的,调度机在给准备运行的任务分配资源时,必须检查资源是否超过一定的限度,否则一旦任务依赖的资源超出限度,容易引起大批量的任务运行异常。

5) 任务隔离。任务运行环境是共用执行机环境,当一个耗资源的任务在运行时将会极大的影响其他任务运行,这对中小任务来说是不公平的(因为没有证据表明数据量大的任务重要性就高),而且ETL任务时间要求并不是那么紧迫。所以任务的运行环境隔离是非常有必要的。

1.1.2 数据交换

 image.png

图 3.2  数据交换模块用例图

数据交换模块负责ETL传输平台的数据同步(如图3.2)。

1) 异构连接:数据交换模块应该能够连接到绝大部分的种类的数据源和文  件格式并提供以下的基本功能:从各种关系型和NoSql数据库获取数据,如常见的Mysql、Oracle、Hive、Hbase、Redis等;从有分隔符或者固定格式的文本文件中获取数据,如Hive Text文件或者Lzo文件,Hbase的Hfile,前后端的日志文件等。此外,有的ETL工具还能够从办公软件中获取数据,但在国内这一般能通过以上两种方式解决。

2) 数据同步:数据交换模块应该能够在不同数据库之间同步数据。同步时数据传输方式有增量传输和全量传输方式。增量传输方式依赖于CDC(增量数据捕获)机制,CDC的实现有基于自增字段或时间戳字段等。

3) 合理分片。数据交换工具由于要运行多线程技术读写数据,就必须更具数据源和目标的类型特点进行读写任务分片,合理的分片机制能够有效提高数据交换的效率。

4) 预、后处理。在数据传输开始之前,我们必须先做一些如连接、数据源是否有有效数据等检查工作。写入数据目的地之前、我们必须先连接然后清理目标库中可能与本次传输重复的数据然后再写入。写入完成后,在做一些如中间表清理等工作。这样才能保证多次重复传输不产生一致性问题。

5) 过程统计。一次数据传输是一个复杂的过程,只有对整个数据传输过程进行监控并有效统计才能了解任务传输实时状态、总体情况和数据量大小。特别的,一次传输任务是否成功也要根据读与写成功的行数之差是否在可容忍的范围内判断。

1.1.3 开发前台

 image.png

图 3.3  开发前台模块用例图

开发前台模块负责ETL解决方案的任务设计、实例状态查询、代码上线、日志查看等(如图3.3)。

1) 任务设计:任务设计就是在图形化界面上配置ETL任务。这里的任务或者是一次传输、或者是一次转换(这里转换可以是多个转换的组合)。还可以配置多个上下游任务组合成任务流,一个任务流对应一个完整的ETL过程。

2) 代码上线:一般的ETL解决方案的代码通过用户的设计的任务自动生成,但是对有专业数据团队的来说,除了传输类型的任务、任务的代码可以自己编写脚本实现,然后上传到我们的执行机上。

3) 任务管理:用户可以查看和修改自己的任务配置,修改后的配置只对任务的下次运行生效。由于任务在企业里是团队级别的,所以你还可以查询同一团队的任务。

4) 实例管理:用户可以查看实例及其上下游的运行状态,如果运行异常,还可以查看实例的运行日志进行调试。同时用户可以对实例进行管理,如挂起,重跑置为成功等。

5) 任务预跑:任务运行起来有两种可能,一种是到指定触发时间被调度系统调度起来。二是预跑模式。即立刻被调度系统调度而不等待。这种运行模式主要是为了一次补足历史数据,这种运行模式数据的传输方式是全量传输。

6) 工作流的强、弱依赖。工作流里的任务上下游关系具有强依赖关系和弱依赖关系两种。强依赖关系的任务之间的执行顺序必须予以保证,弱依赖任务关系的任务之间的执行顺序只是建议保证。弱依赖用于有非必需数据的任务依赖关系。

7) 用户权限管理。用户在开发前台进行数据开发必须申请成为平台开发者,有些功能由于涉及数据安全或者紧缺资源还必须区分管理员权限和一般开发者权限。

8) 元数据管理。元数据管理是任何ETL工具都必须具有的基本功能。元数据包括三种:数据元数据、转换元数据和过程元数据。数据元数据是指输入输出数据源的连接信息、字段和类型。转换元数据是指转换过程中字段与字段之间的映射规则和血缘关系。过程元数据是指在ETL过程中的统计,如传输成功、失败的行数,传输的时间以及传输的效率等。

1.2 非功能分析

一个企业级的ETL解决方案需要运行良好,除了强大的功能外,还需要满足一些非功能的因素。

1) 平台独立。一个ETL解决方案必须在任何平台上运行,包括各种操作系统等。然而要完全满足这一要求会使系统很臃肿且难以维护,因此有一个折中方案是解决方案的Client端平台独立,但Server端的生产环境统一使用Linux集群。

2) 设计灵活性。ETL解决方案是为了提高ETL团队的开发效率,但如果过于自动化而忽略了现实的复杂性,固定的套路可能使你的工具脱离生产实践而备受限制,最后被淘汰。

3) 拓展性。任何一款ETL解决方案都不可能考虑了所有可能的因素,因此提供一个整体的框架并预留扩展的办法是明智的选择。将来你可能会有新的数据源类型、或者新的功能要加入你的解决方案,措手不及将让你的代码偏离原来的初衷而变得笨拙。

4) UI需求。完全的手工操作是笨拙的,低效的。配置和管理ETL任务提供必要的图形界面具有必要性。甚至查看日志分析调试,数据传输质量检查等工作也可以放在界面上给用户提供服务。

5) 模块低耦合。一个ETL解决方案可以由多部分构成,如果各部分耦合太高,对一个部分进行改进和升级就要对整体进行替换,这大大提高了升级和维护的成本。因此如果ETL平台各部分耦合较低,则可以由专门团队维护,升级一个模块对其他模块并无影响,这就有效提高了开发效率和降低了维护难度。

6) 高效传输。虽然ETL解决方案是非业务系统,对传输速度的要求比不是特别高。然而传输速度的提升还是很有意义的,它能够有效的提高集群等计算机资源的利用效率,提高数据平台各项数据工作的效率,因此一个好的ETL解决方案必须有较高的数据传输和处理效率

2.1 平台整体架构

该企业的数据传输平台主要由三部分构成:

1) 一部分是基于Quartz和Docker的调度系统。主要功能是负责调度和运行ETL任务。监视和变换当前任务实例的状态,当实例状态发生变化或者符合变化条件时主动更新数据库和内存中实例状态。

2) 一部分是数据交换工具Wormhole。主要用来多个数据库之间数据的交换,是ETL平台的基础。这个工具我们一般把它集成到Docker镜像中,同时我们还集成了各种脚本任务和Java运行环境的镜像,具体的选择取决于任务的类型。

3) 最后是基于Web的图形化工具。主要用来帮助用户手工配置任务、监控任务运行状态和日志查看进行调试。对任务和实例进行管理。管理操作包括挂起,预跑,置为成功,修改,杀死正在运行的实例等。

image.png

如上图(图4.1)所示,用户通过Web界面来创建和管理任务,把数据存储到数据库中,而Quartz调度系统的定时任务会把状态为新建的任务读出,初始化为实例,接着检查上游实例和并发状况,并申请资源,接下来就是根据负载均衡选择执行机,远程启动执行机上的容器,并把容器和实例映射关系存入数据库中。执行机上Docker守护进程收到启动命令后,还需要从远程仓库中根据任务类型下载镜像,如果是传输任务就下载Wormhole镜像,并设置启动命令为Wormhole启动命令(其他任务类型类似)。容器运行后,有定时任务会向每台执行机查询容器列表,并逐一检查容器的状态,更新实例和容器在数据库中的状态。这样,整个ETL传输平台就正常运转了起来。

1.2 调度系统Kepler架构

image.png

调度系统Kepler机制的关键就是调度机上的三个定时任务(如图4.2):Init任务、Ready任务和Running任务。

1) Init定时任务的运行频率是10分钟一次,用途是从数据库中获得所有有效任务检查下一次触发时间是否在,根据任务新建它的实例,并把这些实例保存到实例表中和内存Init队列中。

2) Ready定时任务运行频率是1分钟一次,负责把Init队列中的任务实例,检查运行条件(如前置任务的实例是否运行完成,是否有同时并发运行的实例等)是否满足,然后申请一些共享资源,主要是Hive、Mysql等并发资源,这一切成功后就把实例的状态改为Ready并把它从Init队列移到Ready队列中,并在数据库中更改实例状态为Ready。

3) Running进程的频率是30秒一次,主要如任务是定时通过Zookeeper查询在线的机器以免出现分配新的任务到已下线的机器上,以及选择被占Slot(槽位,对应一个Docker容器用的标准资源量)数最少的机器为任务运行的目标机器。然后根据任务类型确定镜像名称,利用Docker Client API远程创建和启动Docker容器。

另外,调度系统的三个比较重要的组件:资源管理器,负责统计所有任务依赖的共享资源,保证它们的总和不达到上限;状态管理器,维护这各种状态的实例列表;容器调度器,维护当前运行的容器列表和存活的执行机列表。

1.3 数据交换工具Wormhole架构

下面介绍一下Wormhole的架构,这里有两张图:

image.png

image.png


图4.3所示的是整个Wormhole的架构,但Splitter的作用不是很明显。图4.4说明了Splitter的作用。可以看出传输任务和数据源的关系是一对一,而与输出目的地的关系是一对多。

Wormhole可以在各种数据存储类型之间高速交换数据。整体上是Framework + Plugin架构[15],Framework提供了以行为单位的数据流缓冲机制,分片和多线程读、写机制,读写分离的数据交换队列等高性能数据传输技术。框架为读写、分片、预处理等插件都提供了通用的接口,针对每个数据类型的插件实现类负责处理具体的连接和读写等规则。一个数据传输任务对应一个进程,全部数据传输都在内存中进行。

框架为读写、分片、预处理等插件都提供了通用的接口,这些接口分Reader和Writer两类,如果你需要开发面向某种类型数据源的插件,只需实现这些接口即可。比如数据源是Oracle,数据传输目标是Mysql,那么对应要开发的读写插件就是出OracleReader和MysqlWriter插件,分片是OracleReaderSplitter和MysqlWriterSplitter。预处理是OracleReaderPeriphery和MysqlWriterPeriphery。把这些类实现相关接口即可,就算是加入了框架中。。

在生产实践中,每个数据输入类型除了要开发Reader和Writer,考虑到传输过程的性能和复杂性,还要为每个Reader和Writer开发一个Splitter和Periphery来做分段读、写和读写预处理工作。

Wormhole的数据交换主要使用了两种技术:读写双缓冲队列与线程池。下面详细介绍。线程池是一种复用线程对象、减少创建线程花销的技术手段[16]。现在线程池技术应用广泛。无论是主流的Web服务器TCP连接、数据库访问连接、文件、邮件之类的连接都有使用。这主要是因为两点:第一,这些服务器访问都有一个共同的特点:访问量大、频率高但每次连接的时间短。第二,线程池相对其他多线程技术也有如下的优点:1)线程数可以预先设定,并在实际使用中控制在预定数的一定范围内。这样就能有效控制创建多个线程带来的内存消耗,同时也减轻了JVM在垃圾回收上的压力。2)复用预先创建或已经存在的线程,提高了资源的利用效率。多个访问连接复用线程,这就大大节省了线程对象创建的时间,并且节省了系统资源,防止资源浪费[17]。3)提高系统响应速度。现在有资料表明,现代服务器在短时间内处理大量访问请求会创建大量线程,线程的创建和销毁时间会成为系统性能的瓶颈。因此复用线程对象能够降低服务器访问的延迟。

在互联网经典TCP协议服务器的请求处理逻辑中,监听TCP连接、数据发送和接收等事件的是主线程,而具体数据的收发则由Handler线程处理。于是就需要一个队列在主线程和各个Handler线程之间交换数据,类似于经典的生产者-消费者模式。这个队列读写都需要加锁,在实际处理过程中实际并发性能并不特别好,如果我们要提高并发性能就要用到双端缓冲队列。

双端缓冲队列是读写分离的两个队列,发送数据的线程把数据插入写队列,而读取数据的线程则从读队列中读取数据[18]。如果读数据时读队列为空且写队列不为空,则交换两个队列,否则阻塞等待。这个过程中有两把锁发挥作用:一是写锁。写锁用于写线程把数据插入写队列时以及读队列和写队列进行交换时。不过队列交换时,读线程必须具有读写两把锁,否则会死锁。二是读锁。读锁只用于读线程从读队列中获取数据时。最后还要读写缓冲队列长度的问题,队列短时,能够保证数据交换的及时性,但如果太短,队列交换频繁会降低并发性能。所以一般数据量大时队列长一些,反之则短一些。另外,双端缓冲队列还有两种实现策略使用于不同场景:

1) 读优先。数据消费者发现读队列为空时尝试交换读写队列。这种情况适合读比写速度慢的情况下。

2) 写优先。数据生产者发现写队列满时尝试交换读写队列。这种情况适合写比读慢的情况下。

1.4 开发前台Galaxy架构

开发前台是MVC(模型-视图-控制器)的架构。用户在视图层次配置管理任务,控制层负责处理业务逻辑和转发请求,模型层(数据访问层)进行持久化管理。考虑到系统的灵活性,我们并没把控制层和数据访问层放在同一服务器集群上访问,而是进行的分离,中间采用了该企业的服务框架中间件,进行远程RPC访问。下面我们来看看Galaxy和Galaxy-Halley的后端架构图(如图4.5)。

image.png

可以看出,用户在Web上配置和管理自己的任务发起Http请求,请求从Nginx服务器通过负载均衡交替选择服务器,由于没有Session Sticky,所以必须在Web前端存储用户登录信息(内部系统,并不存在安全问题)。Galaxy服务器在处理完逻辑后,就需要向Galaxy-Halley数据库服务发起远程RPC调用,获取或者修改实例信息。

image.png

下面我们重点讨论任务代码上线模块。模块的架构如上图(如图4.6)。

用户有些文件必须从本地上传,有些代码则是项目的代码,可以从Github上拉取项目打包结果,发布到执行机上。整个发布流程大概是:用户把文件上传到Galaxy Server或者Galaxy Server拉取Github的打包结果成功后,再把文件上传到Hdfs。利用Zookeeper实现的、个数与执行机个数相当的分布式消息队列发布消息给执行机,运行在执行机上的消息消费程序接收到消息后,立刻到Hdfs上的指定地址下载代码到该任务在执行机上的预订目录,无论下载成功还是失败,立刻往数据库中插入一条消费状态信息。Galaxy通过间隔一段时间轮询数据库来获取整个发布消费过程中各执行机的消费状况,一旦有机器消费失败或者时间超时,则返回失败。

1.1 Kepler生产实践

1.1.1Kepler生产实践

调度系统Kepler自从开发使用以来。调度的任务数量从最开始的几百个到后来的一万五左右,在投入使用的几年里,调度本身还算稳定,基本没有出过较大的事故。调度系统从比较粗糙到日趋完善也经历了很多次完善。

开始时,调度系统被调度起来的任务信息都会暂存到Zookeeper集群中,但随着传输平台在公司的推广、业务大规模的扩张。Zookeeper集群已经暂存不下同时调度起来的任务数据,这些数据会慢慢拖垮Zookeeper集群[22]。于是改成了从数据库读取,并对数据库进行读写分离,准备了只用于读的备库。大大提高了数据库的访问效率,并且调度系统的稳定性也随之提高[23]。

但调度系统还存在很多问题,一是资源管理的问题,主要是如果同时发起Hive连接或者Mysql连接的任务过多会导致有些任务连接数据库超时而导致任务失败,现在每晚2点左右是调度高峰期,同时起来几千个任务会导致很多共享资源出现并发问题。二是大任务抢夺集群资源,中小任务因为资源不足运行超时的问题经常发生,但其实对各部门来说,大小任务本身是同样重要的,集群应该采取一视同仁的策略,进行资源隔离是有必要的。

考虑到上述第一个问题,该企业对调度系统增加了一个资源管理器,统计对公共资源的并发数。当并发数超过一定数量时,就暂时不为状态为Init的任务分配资源,让其等待。以保证整体任务运行的稳定性。针对上述第二个问题,我们对每个任务运行都启用一个Docker容器,由于容器的资源量都是一定的,因此就保证大小任务的运行的公平性。

当然,该企业的调度系统依然存在不足,比如调度机器的单点故障,如何搭建HA(High Availablity)机制[24]提高调度系统的安全性和可靠性仍然是以后需要主要考虑的问题。

1.1.2Kepler性能测试

调度和执行机测试环境是32核64G内存、硬盘从240GB到2TB不等。调度机一台,执行机八台。目前Kepler调度系统总共管理了5000多个传输任务,1万多个计算任务。我们可以用这些任务来测试调度系统性能。调度机每十分钟从数据库中读取到触发时间的实例,没有出现问题。调度队列正常值在600个任务左右,此时系统运行良好,但当调度队列超过峰值1200个任务时,调度系统出现响应缓慢的情况,任务的执行状态会出现长时间凝滞的情况。

任务正常运行情况统计如下,任务成功率高达99%,任务失败的主要原因来自高峰时段Mysql数据库连接的失效或者Hadoop集群的抗压问题。另外一个主要原因是任务超时(超过3个小时),一般是因为数据量太大,无效数据较多的原因。计算任务的平均运行时间是1小时,传输任务的运行时间是1.5小时。

表 6.1  Kepler调度系统测试数据

任务数

平均运行时间

调度队列平均值

调度队列峰值

任务成功率

15000+

1.33小时

600个

1200个

99%

 

1.2Wormhole生产实践

1.2.1Wormhole生产实践

Wormhole开发测试完毕后,我们先整个数据中心推广,然后就是作业的按计划的迁移使用新工具。Wormhole的成功推广大大提高了ETL过程的全局可控性和管理性,同时传输效率大大提高、传输可靠性得以保证。比如,我们能够更有效的面对Mysql集群的变化对ETL任务的影响,也可以对所有的数据传输任务进行监控和日志采集。现在我们能通过Wormhole统计整个平台的数据传输规模,以及传输效率。这在以前分散传输的情况下是不可能的。

目前Wormhole已经在整个企业数据中心普遍使用,在整个数据传输平台中,我们大概每天运行有5000多道数据传输任务运行于每天的不同时段,每个传输任务量级都在600多万条记录。每天为该企业传输的数据量接近1T,占整个企业线上到线下传输数据量的60%(剩下的是Kafka拉取的日志和Storm实时分析产生的数据)。可以说是Wormhole为整个该企业数据传输作出了巨大的贡献。

Wormhole整体架构比较优秀,在生产实践中,我们后续只做了一些Mysql,Hive等数据库读写splitter(分片)类的二次开发,让数据库的读写的并发数能够更有效合理的,有效改善了以前读取时一个分片的单线程状况,大大提高一些瓶颈任务的传输时间。双端缓冲队列的并发性能得到了生产的检验。

 表6.2  Wormhole传输效率分析

传输类型

源工具

新工具

速度提升

平均速度

Mysql-Hive

Datax

Wormhole

130%

3.5万行/s

Hive-Mysql

getMerge+jdbcdump

Wormhole

115%

2.3万行/s

Hbase-Mysql

bulkLoader+jdbcdump

Wormhole

89%

2.7万行/s

                  

1.2.2Wormhole性能测试

Wormhole测试机器参数与Kepler相同,都是32核64G内存、硬盘从240GB到2TB不等。经过测试wormhole可适用多种数据库,其中包括Mysql,Hive,Hbase,Greenplum等。测试的多个任务的传输成功率达99%,失败原因与上文相同。传输性能根据源、目的数据库的种类分别在2到6万行/每秒。持续运行的稳定性在三天以上,实际上大部分任务运行不超过三个小时。最佳传输数据量在100TB以内。

表 6.3  Wormhole测试数据

数据源种类

传输成功率

传输速度峰值

传输速度均值

稳定运行时间

最佳传输数据量

符合要求

99%

6万行/秒

2.7万行/秒

 72小时

100TB以内

 

1.3Galaxy生产实践

1.3.1Galaxy生产实践

在没有任务开发前台前,许多任务都是手工配置插入数据库,很多任务配置不规范,产生很多遗留问题。而且手工配任务的方式也阻碍了数据传输平台本身的推广和其他团队的认可。

为了能够自动化配置任务,我们开发了任务配置、任务管理、任务监控、任务状态分析和数据质量等一系列一条龙服务。用户可以在我们的平台上配置、管理和监控自己的任务。大大节省了用户的开发时间。

目前Galaxy前台大概有500多名活跃的开发者,总共管理了5000多个传输任务,1万多个计算任务。而且每天还在以几十个任务左右的数量递增。日均PV,UV都在千级。而且后来还接入Storm的实时计算平台,Flume+Kafka日志抓取平台等,可以说Galaxy前台已经是该企业数据平台最有影响力的管理工具。大部分的数据传输,加载和转换工作都通过此平台进行。

表 6.4  Galaxy平台规模的扩张

指标

开发者

PV

UV

任务数

最初规模

30+

100+

20+

500+

现在规模

500+

2000+

400+

15000+

1.3.2Galaxy性能测试

Galaxy和Galaxy-Halley的测试环境都是32核64G内存,硬盘1TB。机器各位两台。Galaxy能支持的日均PV数峰值为10万左右,日均UV峰值为5万左右,每秒请求数峰值是1千次。Galaxy-Halley能支持的RPC请求峰值为每秒1300次。Ngix负载均衡服务器运行良好,承载量在日均10万PV以上。

表 6.5  Galaxy平台性能测试

Galaxy QPS

PV峰值

UV峰值

Galaxy-Halley QPS

请求负载均衡瓶颈

1000次

10万

5万

1300次

10万PV

 


CIO之家 www.ciozj.com 公众号:imciow
关联的文档
也许您喜欢