如今大数据在各行业的应用越来越广泛:运营基于数据关注运营效果,产品基于数据分析关注转化率情况,开发基于数据衡量系统优化效果等。
美图公司有美拍、美图秀秀、美颜相机等十几个 app,每个 app 都会基于数据做个性化推荐、搜索、报表分析、反作弊、广告等,整体对数据的业务需求比较多、应用也比较广泛。
因此美图数据技术团队的业务背景主要体现在:业务线多以及应用比较广泛。这也是促使我们搭建数据平台的一个最主要的原因,由业务驱动。
美图数据平台整体架构
如图所示是我们数据平台的整体架构。在数据收集这部分,我们构建一套采集服务端日志系统 Arachnia,支持各 app 集成的客户端 SDK,负责收集 app 客户端数据;同时也有基于 DataX 实现的数据集成(导入导出);Mor 爬虫平台支持可配置的爬取公网数据的任务开发。
数据存储层主要是根据业务特点来选择不同的存储方案,目前主要有用到 HDFS、MongoDB、Hbase、ES 等。在数据计算部分,当前离线计算主要还是基于 Hive&MR、实时流计算是 Storm 、 Flink 以及还有另外一个自研的 bitmap 系统 Naix。
在数据开发这块我们构建了一套数据工坊、数据总线分发、任务调度等平台。数据可视化与应用部分主要是基于用户需求构建一系列数据应用平台,包括:A/B 实验平台、渠道推广跟踪平台、数据可视化平台、用户画像等等。
右侧展示的是一些各组件都可能依赖的基础服务,包括地理位置、元数据管理、唯一设备标识等。
如下图所示是基本的数据架构流图,典型的 lamda 架构,从左端数据源收集开始,Arachnia、AppSDK 分别将服务端、客户端数据上报到代理服务 collector,通过解析数据协议,把数据写到 kafka,然后实时流会经过一层数据分发,最终业务消费 kafka 数据进行实时计算。
离线会由 ETL 服务负责从 Kafka dump 数据到 HDFS,然后异构数据源(比如 MySQL、Hbase 等)主要基于 DataX 以及 Sqoop 进行数据的导入导出,最终通过 hive、kylin、spark 等计算把数据写入到各类的存储层,最后通过统一的对外 API 对接业务系统以及我们自己的可视化平台等。
数据平台的阶段性发展
企业级数据平台建设主要分三个阶段:
刚开始是基本使用免费的第三方平台,这个阶段的特点是能快速集成并看到 app 的一些统计指标,但是缺点也很明显,没有原始数据除了那些第三方提供的基本指标其他分析、推荐等都无法实现。所以有从 0 到 1 的过程,让我们自己有数据可以用;
在有数据可用后,因为业务线、需求量的爆发,需要提高开发效率,让更多的人参与数据开发、使用到数据,而不仅仅局限于数据研发人员使用,所以就涉及到把数据、计算存储能力开放给各个业务线,而不是握在自己手上;
在当数据开放了以后,业务方会要求数据任务能否跑得更快,能否秒出,能否更实时;另外一方面,为了满足业务需求集群的规模越来越大,因此会开始考虑满足业务的同时,如何实现更节省资源。
美图现在是处于第二与第三阶段的过渡期,在不断完善数据开放的同时,也逐步提升查询分析效率,以及开始考虑如何进行优化成本。接下来会重点介绍 0 到 1 以及数据开放这两个阶段我们平台的实践以及优化思路。
从 0 到 1
从 0 到 1 解决从数据采集到最终可以使用数据。如图 4 所示是数据收集的演进过程,从刚开始使用类似 umeng、flurry 这类的免费第三方平台,到后面快速使用 rsync 同步日志到一台服务器上存储、计算,再到后面快速开发了一个简单的python脚本支持业务服务器上报日志,最终我们开发了服务端日志采集系统 Arachnia 以及客户端 AppSDK。
数据采集是数据的源头,在整个数据链路中是相对重要的环节,需要更多关注:数据是否完整、数据是否支持实时上报、数据埋点是否规范准确、以及维护管理成本。因此我们的日志采集系统需要满足以下需求:
能集成管理维护,包括 Agent 能自动化部署安装升级卸载、配置热更、延迟方面的监控;
在可靠性方面至少需要保证 at least once;
美图现在有多 IDC 的情况,需要能支持多个 IDC 数据采集汇总到数据中心;
在资源消耗方面尽量小,尽量做到不影响业务。
基于以上需求我们没有使用 flume、scribe、fluentd,最终选择自己开发一套采集系统 Arachnia。
上图是 Arachnia 的简易架构图,它通过系统大脑进行集中式管理。puppet 模块主要作为单个 IDC 内统一汇总 Agent 的 metrics,中转转发的 metrics 或者配置热更命令。采集器 Agent 主要是运维平台负责安装、启动后从 brain 拉取到配置,并开始采集上报数据到 collector。
接着看 Arachnia 的实践优化,首先是 at least once 的可靠性保证。不少的系统都是采用把上报失败的数据通过 WAL 的方式记录下来,重试再上报,以免上报失败丢失。我们的实践是去掉 WAL,增加了 coordinator 来统一的分发管理 tx 状态。
开始采集前会从 coordinator 发出 txid,source 接收到信号后开始采集,并交由 sink 发送数据,发送后会ack tx,告诉 coordinator 已经 commit。coordinator 会进行校验确认,然后再发送 commit 的信号给 source、sink 更新状态,最终 tx 完 source 会更新采集进度到持久层(默认是本地 file)。该方式如果在前面 3 步有问题,则数据没有发送成功,不会重复执行;如果后面 4 个步骤失败,则数据会重复,该 tx 会被重放。
基于上文的 at least once 可靠性保证,有些业务方是需要唯一性的,我们这边支持为每条日志生成唯一 ID 标识。另外一个数据采集系统的主要实践是:唯一定位一个文件以及给每条日志做唯一的 MsgID,方便业务方可以基于 MsgID 在发生日志重复时能在后面做清洗。
我们一开始是使用 filename,后面发现 filename 很多业务方都会变更,所以改为 inode,但是 inode linux 会回收重复利用,最后是以 inode & 文件头部内容做 hash 来作为fileID。而 MsgID 是通过 agentID & fileID & offset 来唯一确认。
数据上报之后由 collector 负责解析协议推送数据到 Kafka,那么 Kafka 如何落地到 HDFS 呢? 首先看美图的诉求:
支持分布式处理;
涉及到较多业务线因此有多种数据格式,所以需要支持多种数据格式的序列化,包括 json、avro、特殊分隔符等;
支持因为机器故障、服务问题等导致的数据落地失败重跑,而且需要能有比较快速的重跑能力,因为一旦这块故障,会影响到后续各个业务线的数据使用;
支持可配置的 HDFS 分区策略,能支持各个业务线相对灵活的、不一样的分区配置;
支持一些特殊的业务逻辑处理,包括:数据校验、过期过滤、测试数据过滤、注入等;
基于上述诉求痛点,美图从 Kafka 落地到 HDFS 的数据服务实现方式如图 7 所示。
基于 Kafka 和 MR 的特点,针对每个 kafka topic 的 partition,组装 mapper 的 inputsplit,然后起一个 mapper 进程处理消费这个批次的 kafka 数据,经过数据解析、业务逻辑处理、校验过滤、最终根据分区规则落地写到目标 HDFS 文件。
落地成功后会把这次处理的 meta 信息(包括 topic、partition、开始的 offset、结束的offset)存储到 MySQL。下次再处理的时候,会从上次处理的结束的 offset 开始读取消息,开始新一批的数据消费落地。
实现了基本功能后难免会遇到一些问题,比如不同的业务 topic 的数据量级是不一样的,这样会导致一次任务需要等待 partition 数据量最多以及处理时间最长的 mapper 结束,才能结束整个任务。那我们怎么解决这个问题呢?系统设计中有个不成文原则是:分久必合、合久必分,针对数据倾斜的问题我们采用了类似的思路。
首先对数据量级较小的 partition 合并到一个 inputsplit,达到一个 mapper 可以处理多个业务的 partition 数据,最终落地写多份文件。
另外对数据量级较大的 partition 支持分段拆分,平分到多个 mapper 处理同一个 partition,这样就实现了更均衡的 mapper 处理,能更好地应对业务量级的突增。
除了数据倾斜的问题,还出现各种原因导致数据 dump 到 HDFS 失败的情况,比如因为 kafka 磁盘问题、hadoop 集群节点宕机、网络故障、外部访问权限等导致该 ETL 程序出现异常,最终可能导致因为未 close HDFS 文件导致文件损坏等,需要重跑数据。那我们的数据时间分区基本都是以天为单位,用原来的方式可能会导致一个天粒度的文件损坏,解析无法读取。
我们采用了分两阶段处理的方式:mapper 1 先把数据写到一个临时目录,mapper 2 把 Hdfs 的临时目录的数据 append 到目标文件。这样当 mapper1 失败的时候可以直接重跑这个批次,而不用重跑整天的数据;当 mapper2 失败的时候能直接从临时目录 merge 数据替换最终文件,减少了重新 ETL 天粒度的过程。
在数据的实时分发订阅写入到 kafka1 的数据基本是每个业务的全量数据,但是针对需求方大部分业务都只关注某个事件、某小类别的数据,而不是任何业务都消费全量数据做处理,所以我们增加了一个实时分发 Databus 来解决这个问题。
Databus 支持业务方自定义分发 rules 往下游的 kafka 集群写数据,方便业务方订阅处理自己想要的数据,并且支持更小粒度的数据重复利用。
上图可以看出 Databus 的实现方式,它的主体基于 Storm 实现了 databus topology。Databus 有两个 spout,一个支持拉取全量以及新增的 rules,然后更新到下游的分发 bolt 更新缓存规则,另外一个是从 kafka 消费的 spout。而 distributionbolt 主要是负责解析数据、规则 match,以及把数据往下游的 kafka 集群发送。
数据开放
有了原始数据并且能做离线、实时的数据开发以后,随之而来的是数据开发需求的井喷,数据研发团队应接不暇。所以我们通过数据平台的方式开放数据计算、存储能力,赋予业务方有数据开发的能力。
对实现元数据管理、任务调度、数据集成、DAG 任务编排、可视化等不一一赘述,主要介绍数据开放后,美图对稳定性方面的实践心得。
数据开放和系统稳定性是相爱相杀的关系:一方面,开放了之后不再是有数据基础的研发人员来做,经常会遇到提交非法、高资源消耗等问题的数据任务,给底层的计算、存储集群的稳定性造成了比较大的困扰;另外一方面,其实也是因为数据开放,才不断推进我们必须提高系统稳定性。
针对不少的高资源、非法的任务,我们首先考虑能否在 HiveSQL 层面能做一些校验、限制。如图 13 所示是 HiveSQL 的整个解析编译为可执行的 MR 的过程:
首先基于 Antlr 做语法的解析,生成 AST,接着做语义解析,基于AST 会生成 JAVA 对象 QueryBlock。基于 QueryBlock 生成逻辑计划后做逻辑优化,最后生成物理计划,进行物理优化后,最终转换为一个可执行的 MR 任务。
我们主要在语义解析阶段生成 QueryBlock 后,拿到它做了不少的语句校验,包括:非法操作、查询条件限制、高资源消耗校验判断等。
第二个在稳定性方面的实践,主要是对集群的优化,包括:
我们完整地对 Hive、Hadoop 集群做了一次升级。主要是因为之前在低版本有 fix 一些问题以及合并一些社区的 patch,在后面新版本都有修复;另外一个原因是新版本的特性以及性能方面的优化。我们把 Hive 从 0.13 版本升级到 2.1 版本,Hadoop 从 2.4 升级到 2.7;
对 Hive 做了 HA 的部署优化。我们把 HiveServer 和 MetaStoreServer 拆分开来分别部署了多个节点,避免合并在一个服务部署运行相互影响;
之前执行引擎基本都是 On MapReduce 的,我们也在做 Hive On Spark 的迁移,逐步把线上任务从 Hive On MR 切换到 Hive On Spark;
拉一个内部分支对平时遇到的一些问题做 bugfix 或合并社区 patch 的特性;
在平台稳定性方面的实践最后一部分是提高权限、安全性,防止对集群、数据的非法访问、攻击等。提高权限主要分两块:API 访问与集群。
API Server :上文提到我们有 OneDataAPI,提供给各个业务系统访问数据的统一 API。这方面主要是额外实现了一个统一认证 CA 服务,业务系统必须接入 CA 拿到 token 后来访问OneDataAPI,OneDataAPI 在 CA 验证过后,合法的才允许真正访问数据,从而防止业务系统可以任意访问所有数据指标。
集群:目前主要是基于 Apache Ranger 来统一各类集群,包括 Kafka、Hbase、Hadoop 等做集群的授权管理和维护;
以上就是美图在搭建完数据平台并开放给各个业务线使用后,对平台稳定性做的一些实践和优化。
总结
首先在搭建数据平台之前,一定要先了解业务,看业务的整体体量是否比较大、业务线是否比较广、需求量是否多到严重影响我们的生产力。如果都是肯定答案,那可以考虑尽快搭建数据平台,以更高效、快速提高数据的开发应用效率。如果本身的业务量级、需求不多,不一定非得套大数据或者搭建多么完善的数据平台,以快速满足支撑业务优先。
在平台建设过程中,需要重点关注数据质量、平台的稳定性,比如关注数据源采集的完整性、时效性、设备的唯一标识,多在平台的稳定性方面做优化和实践,为业务方提供一个稳定可靠的平台。
在提高分析决策效率以及规模逐渐扩大后需要对成本、资源做一些优化和思考。
CIO之家 www.ciozj.com 公众号:imciow