大数据平台的数据同步服务实践

来源:知乎 作者:lfyzjck

引言

在大数据系统中,我们往往无法直接对在线系统中的数据直接进行检索和计算。在线系统所使用关系型数据库、缓存数据库存储数据的方式都非常不同,很多存储系统并不适合分析型(OLAP)的查询,也不允许分析查询影响到在线业务的稳定性。从数仓建设的角度思考,数据仓库需要依赖于稳定和规范的数据源,数据需要经过采集加工后才能真正被数仓所使用。推动数据同步服务的平台化,才有可能从源头规范数据的产出。数据同步服务不像数据挖掘一样可以直接产生价值,但它更像是连接在线系统和离线系统的高速公路,好的同步工具可以很大程度上提升数据开发的效率。

本文主要介绍知乎在数据同步这方面的建设,工具选型和平台化的实践。

业务场景及架构

由于在线业务的数据库在知乎内部还是以 MySQL 为主,在数据同步的数据源方面主要考虑 MySQL 和 Hive 的互相同步,后续可以考虑支持 HBase。早期数据同步使用 Oozie + Sqoop 来完成,基本满足业务需求。但是随着数据同步任务的不断变多,出现了很多重复同步的例子,对同步任务的负载管理也是空白。凌晨同步数据高峰导致 MySQL 不断报警,DBA 苦不堪言。对于业务来说,哪些表已经被同步了,哪些表还没有也是一个黑盒子,依赖其他业务方的数据都只能靠口头的约定。为了解决这些问题,决定对数据同步做一个统一的平台,简化同步任务的配置,调度平衡负载,管理元信息等等。到现在为止,数据同步平台支撑了上千张表的同步,每天同步的数据量超过 10TB。

技术选型

数据同步工具市面上有很多解决方案,面向批的主要有 Apache Sqoop 和阿里开源的 DataX,其他商业的数据同步工具不在本文讨论范围。下面主要对比这两种数据同步工具。

Sqoop

Pros:

  • 基于 MapReduce 实现,容易并行和利用现有集群的计算资源

  • 和 Hive 兼容性好,支持 Parquet,ORC 等格式

  • 支持自动迁移 Schema

  • 社区强大,遇到的问题容易解决

Cons:

  • 支持的数据源不算太丰富(比如 ES),扩展难度大

  • 不支持限速,容易对 MySQL 造成压力

DataX

Pros:

  • 支持的数据源丰富尤其是支持从非关系型数据库到关系型数据库的同步

  • 支持限速

  • 扩展方便,插件开发难度低

Cons:

  • 需要额外的运行资源,当任务比较多的时候费机器

  • 没有原生支持导出到 Hive,需要做很多额外的工作才能满足需求

考虑到同步本身要消耗不少的计算和带宽资源,Sqoop 可以更好的利用 Hadoop 集群的资源,而且和 Hive 适配的更好,最终选择了 Sqoop 作为数据同步的工具。

平台化及实践

平台化的目标是构建一个相对通用的数据同步平台,更好的支持新业务的接入,和公司内部的系统集成,满足业务需求。平台初期设计的目标有以下几个:

  • 简单的任务配置界面,方便新的任务接入

  • 监控和报警

  • 屏蔽 MySQL DDL 造成的影响

  • 可扩展新数据源

整体系统架构如下图:

image.png

API Server 用于提供用户界面和 RESTFul API。数据源中心存储数据源信息,并从真实的数据源定期更新,保持比较新的数据。Scheduler 负责规划任务的执行资源,保护 MySQL 集群避免负载过高。Worker 真实的执行任务,分布在多个节点上。

简化任务接入

平台不应该要求每个用户都理解底层数据同步的原理,对用户而言,应该是描述数据源 (Source) 和目标存储 (Sink),还有同步周期等配置。所有提供的同步任务应该经过审核,防止未经许可的数据被同步,或者同步配置不合理,增加平台负担。最后暴露给用户的 UI 大概如下图。

增量同步

对于数据量非常大的数据源,如果每次同步都是全量,对于 MySQL 的压力会特别大,同步需要的时间也会很长。因此需要一种可以每次只同步新增数据的机制,减少对于 MySQL 端的压力。但是增量同步不是没有代价的,它要求业务在设计业务逻辑和表结构的时候,满足下面任意条件:

  • 只插入新数据,不做删除和修改(类似日志)

  • 只有插入和更新操作,删除操作通过一个标志位的更新做软删除代替,同时数据库有一个字段用来标记该行记录最后更新的时间戳

如果满足上面条件,数据量比较大的表就可以采用增量同步的方式拉取。小数据量的表不需要考虑增量同步,因为数据和合并也需要时间,如果收益不大就不应该引入额外的复杂性。一个经验值是行数 <= 2000w 的都属于数据量比较小的表,具体还取决于存储的数据内容(比如有很多 Text 类型的字段)。

处理 Schema 变更

做数据同步永远回避不掉的一个问题就是 Schema 的变更,对 MySQL 来说,Schema 变更就是数据库的 DDL 操作。数据同步平台应该尽可能屏蔽 MySQL DDL 对同步任务的影响,并且对兼容的变更,及时变更推送到目标存储。

数据同步平台会通过数据源中心定时的扫描每个同步任务上游的数据源,保存当前 Schema 的快照,如果发现 Schema 发生变化,就通知下游做出一样的变更。绝大部分的 DDL 还是增加字段,对于这种情况数据同步平台可以很好屏蔽变更对数仓的影响。对于删除字段的操作原则上禁止的,如果一定要做,需要走变更流程,通知到依赖该表的业务方,进行 Schema 同步的调整。

和调度平台的集成

MySQL 的数据通常会作为后续 ETL 的数据源,位于整个数据链路的最顶端。知乎内部自研了离线任务调度器,根据数据的依赖关系自动解析任务的依赖,按照合理的顺序启动 ETL 任务。数据同步平台和调度平台打通后,可以在每个同步任务结束后,通知调度器启动下游的后继任务,而不用依赖平台和用户口头约定启动时间。如果数据同步出现延时,调度器也可以很好的屏蔽这个问题。待数据同步恢复后,数据链路也随之恢复。

监控和报警

根据 USE 原则,大概整理出下面几个需要监控的指标:

  • MySQL 机器的负载,IOPS,出入带宽

  • 调度队列长度,Yarn 提交队列长度

  • 任务执行错误数

报警更多是针对队列饱和度和同步错误进行的

平台优化和实践

资源管理

当同步任务越来越多时,单纯的按照任务启动时间来触发同步任务已经不能满足需求。数据同步应该保证对于线上业务没有影响,在此基础上速度越快越好。这里本质上是让 Sqoop 充分又不过度利用 MySQL 的 IOPS,快速拉取数据同时避免资源过度竞争。为了避免数据同步对线上服务的影响,对于需要数据同步的 MySQL 单独建立一个从节点,隔离线上流量,只提供给数据同步和业务离线查询使用。除此之外,需要一个调度策略来决定一个任务何时执行。由于任务的总数量并不多,但是每个任务可能会执行非常长的时间,对调度器的压力并不大,最终决定采用类似 YARN 的一个中央式的资源调度器,调度器的状态都持久化在数据库中,方便重启或者故障恢复。最终架构图如下

image.png

最终任务的调度流程如下:

  1. 每个 MySQL 实例是调度器的一个队列,根据同步的元信息决定该任务属于哪个队列

  2. 根据要同步数据量预估资源消耗,向调度器申请该队列对应大小的资源

  3. 调度器将任务提交到执行队列,没有意外的话会立刻开始执行

  4. Monitor 定时向调度器汇报 MySQL 节点的负载,如果负载过高就停止向该队列提交新的任务

  5. 任务结束后向调度器释放资源

从早期依靠 Crontab 调度任务到引入调度器,MySQL 集群的资源被更充分的利用。在数据同步高峰期基本不会出现负载空置的情况,任务的平均执行时间只有原先的一半。对 DBA 来说,MySQL 集群的负载报警也大幅减少。

存储格式

Hive 默认的格式是 Textfile,这是一种类似 CSV 的存储方式,所有数据以文本的形式存储,但是对于 OLAP 查询来说压缩比太低,查询性能不好。通常我们会选择一些列式存储提高存储和检索的效率。Hive 中比较成熟的列式存储格式有 Parquet 和 ORC。这两个存储的查询性能相差不大,但是 ORC 和 Hive 集成更好而且对于非嵌套数据结构查询性能是优于 Parquet 的。但是知乎内部因为也用了 Impala,早期的 Impala 版本不支持 ORC 格式的文件,为了兼容 Impala 最终选择了 Parquet 作为默认的存储格式。

关于列式存储的原理和 Benchmark,可以参考这个 Slide

针对不同的数据源选择合适的并发数

Sqoop 是基于 MapReduce 实现的,提交任务前先会生成 MapReduce 代码,然后提交到 Hadoop 集群。Job 整体的并发度就取决于 Mapper 的个数。Sqoop 默认的并发数是 4,对于数据量比较大的表的同步显然是不够的,对于数据量比较小的任务又太多了,这个参数一定要在运行时根据数据源的元信息去动态决定。

优化 Distributed Cache 避免任务启动对 HDFS 的压力

在平台上线后,随着任务越来越多,发现如果 HDFS 的性能出现抖动,对同步任务整体的执行时间影响非常大,导致夜间的很多后继任务受到影响。开始推测是数据写入 HDFS 性能慢导致同步出现延时,但是任务大多数会卡在提交阶段。随着进一步排查,发现 MapReduce 为了解决不同作业依赖问题,引入了 Distributed Cache 机制可以将 Job 依赖的 lib 上传到 HDFS,然后再启动作业。Sqoop 也使用了类似的机制,会依赖 Hive 的相关 lib,这些依赖加起来有好几十个文件,总大小接近 150MB,虽然对于 HDFS 来说是很小数字,但是当同步任务非常多的时候,集群一点点的性能抖动都会导致调度器的吞吐大幅度下降,最终同步的产出会有严重延时。最后的解决方法是将 Sqoop 安装到集群中,然后通过 Sqoop 的参数 --skip-distcache避免在任务提交阶段上传依赖的 jar。

关闭推测执行(Speculative Execution)

所谓推测执行是这样一种机制:在集群环境下运行 MapReduce,一个 job 下的多个 task 执行速度不一致,比如有的任务已经完成,但是有些任务可能只跑了10%,这些任务将会成为整个 job 的短板。推测执行会对运行慢的 task 启动备份任务,然后以先运行完成的 task 的结果为准,kill 掉另外一个 task。这个策略可以提升 job 的稳定性,在一些极端情况下加快 job 的执行速度。

Sqoop 默认的分片策略是按照数据库的主键和 Mapper 数量来决定每个分片拉取的数据量。如果主键不是单调递增或者递增的步长有大幅波动,分片就会出现数据倾斜。对于一个数据量较大的表来说,适度的数据倾斜是一定会存在的情况,当 Mapper 结束时间不均而触发推测执行机制时,MySQL 的数据被重复且并发的读取,占用了大量 io 资源,也会影响到其他同步的任务。在一个 Hadoop 集群中,我们仍然认为一个节点不可用导致整个 MapReduce 失败仍然是小概率事件,对这种错误,在调度器上增加重试就可以很好的解决问题而不是依赖推测执行机制。

展望

数据同步发展到比较多的任务后,新增的同步任务越来越多,删除的速度远远跟不上新增的速度,总体来说同步的压力会越来越大,需要一个更好的机制去发现无用的同步任务并通知业务删除,减轻平台的压力。

另外就是数据源的支持不够,Hive 和 HBase、ElasticSearch 互通已经成了一个呼声很强烈的需求。Hive 虽然可以通过挂外部表用 SQL 的方式写入数据,但是效率不高有很难控制并发,很容易影响到线上集群,需要有更好的实现方案才能在生产环境真正的运行起来。

另外这里没有谈到的一个话题就是流式数据如何做同步,一个典型的场景就是 Kafka 的日志实时落地然后实时进行 OLAP 的查询,或者通过 MySQL binlog 实时更新 Kudu 或者 ElasticSearch。关于这块的基础设置知乎也在快速建设中,非常欢迎感兴趣同学投简历到 data-arch@zhihu.com ,加入知乎大数据架构组。


相关文档推荐

大语言模型服务管理的实践.PDF

1744103522 马元元 8.6MB 27页 积分5

百度构建人机协同新范式的实践.PDF

1744103389 牛万鹏 2.72MB 33页 积分6

质量大模型及其在接口测试场景下的实践.PDF

1744026767 李庆泉 2.18MB 21页 积分5

基于GenAI的混合云智能运维实践.PDF

1744026734 周彩钦 2.29MB 23页 积分5

AI ChecklistQUNAR测试域结合AIGC提效实践.PDF

1744026643 崔宸 2.73MB 30页 积分6

大模型驱动的手工测试用例生成的探索与实践.PDF

1743985422 张克鹏 3.12MB 17页 积分5

人工智能技术发展与应用实践.PDF

1743586449 史树明 5.88MB 35页 积分6

离散制造破局之道主数据管理平台重构.PDF

1742450737 詹慧超 4.6MB 37页 积分6

Database Copilot在数据库领域的落地.PDF

1741937032 李粒 6.08MB 59页 积分6

相关文章推荐