Hudi 实践 | 使用 Apache Hudi 构建下一代数据湖 | 第十六期社群图文直播

by June 2021-11-22

智领云第16次社群图文技术直播文字回放:本次直播由大数据后台开发工程师David,为大家带来了主题分享《Hudi 实践 | 使用 Apache Hudi 构建下一代数据湖》,主要内容包括:什么是数据湖?Apache Hudi 核心优势、Apache Hudi 设计与架构最强解读、Apache Hudi 最佳实践。

Data Lakehouse产生的背景

过去十年随着物联网、云应用、社交媒体和机器学习的发展,公司收集的数据量呈指数级增长,同时对高质量数据的需求从几天和几小时的频率变为几分钟甚至几秒钟的时间。

数年来数据湖作为存储原始和丰富数据的存储库发挥了重要作用。但是随着它们的成熟,企业意识到维护高质量、最新和一致的数据是非常复杂的。除了摄取增量数据的复杂性之外,填充数据湖还需业务环境和高度依赖批处理。以下是现代数据湖的主要挑战:

  • 基于查询的变更数据捕获:提取增量源数据的最常见方法是依赖定义过滤条件的查询。当表没有有效字段来增量提取数据时,在源数据库上添加额外负载或无法捕获数据库的每一次变更,基于查询的CDC不包括已删除的记录,因为没有简单的方法来确定是否已删除了记录。基于日志的CDC是首选方法,可以解决上述挑战。本文将进一步讨论该方法。
  • 数据湖中的增量数据处理:负责更新数据湖的ETL作业必须读取数据湖中的所有文件进行更改,并将整个数据集重写为新文件(因为没有简单的方法更新记录所在的文件)。
  • 缺少对ACID事务的支持:如果同时存在读写,不遵从ACID事务会导致结果不一致。

数据体量的增加和保持最新数据使上述挑战更加复杂。Uber、Databricks和Netflix提出了旨在解决数据工程师面临的挑战的解决方案的数据湖处理框架,旨在在分布式文件系统(例如S3、OSS或HDFS)上的数据湖中执行插入和删除操作。下一代Data Lakes旨在以可扩展性、适应性和可靠的方式提供最新数据,即Data Lakehouse。

什么是Data Lakehouse

简而言之:Data Lakehouse = Data Lake + Data Warehouse

传统数据仓库旨在提供一个用于存储已针对特定用例/数据进行了转换/聚合的历史数据平台,以便与BI工具结合使用获取见解。通常数据仓库仅包含结构化数据,成本效益不高,使用批处理ETL作业加载。

Data Lakes 可以克服其中一些限制,即通过低成本存储支持结构化,半结构化和非结构化数据,以及使用批处理和流传输管道。与数据仓库相比,数据湖包含多种存储格式的原始数据,可用于当前和将来的用例。但是数据湖仍然存在局限性,包括事务支持(很难使数据湖保持最新状态)和ACID合规性(不支持并发读写)。

数据湖中心可利用S3,OSS,GCS,Azure Blob对象存储的数据湖低成本存储优势,以及数据仓库的数据结构和数据管理功能。支持ACID事务并确保并发读取和更新数据的一致性来克服数据湖的限制。此外与传统的数据仓库相比,Lakehouse能够以更低的延迟和更高的速度消费数据,因为可以直接从Lakehouse查询数据。

Lakehouse的主要特性如下

  • 事务支持
  • Schema enforcement and governance(模式实施和治理)
  • BI支持
  • 存储与计算分离
  • 开放性
  • 支持从非结构化数据到结构化数据的多种数据类型
  • 支持各种工作负载
  • 端到端流

为了构建Lakehouse,需要一个增量数据处理框架,例如Apache Hudi。

什么是Apache Hudi
简介

Apache Hudi(简称:Hudi)使得您能在hadoop兼容的存储之上存储大量数据,同时它还提供两种原语,使得除了经典的批处理之外,还可以在数据湖上进行流处理。这两种原语分别是:

  • Update/Delete记录:Hudi使用细粒度的文件/记录级别索引来支持Update/Delete记录,同时还提供写操作的事务保证。查询会处理最后一个提交的快照,并基于此输出结果。
  • 变更流:Hudi对获取数据变更提供了一流的支持:可以从给定的时间点获取给定表中已updated/inserted/deleted的所有记录的增量流,并解锁新的查询姿势(类别)。

这些原语紧密结合,解锁了基于DFS抽象的流/增量处理能力。如果您熟悉流处理,那么这和从kafka主题消费事件,然后使用状态存储逐步累加中间结果类似。

设计原则

  • 流式读/写:Hudi借鉴了数据库设计的原理,从零设计,应用于大型数据集记录流的输入和输出。为此,Hudi提供了索引实现,可以将记录的键快速映射到其所在的文件位置。同样,对于流式输出数据,Hudi通过其特殊列添加并跟踪记录级的元数据,从而可以提供所有发生变更的精确增量流。
  • 自管理:Hudi注意到用户可能对数据新鲜度(写友好)与查询性能(读/查询友好)有不同的期望,它支持了三种查询类型,这些类型提供实时快照,增量流以及稍早的纯列数据。在每一步,Hudi都努力做到自我管理(例如自动优化编写程序的并行性,保持文件大小)和自我修复(例如:自动回滚失败的提交),即使这样做会稍微增加运行时成本(例如:在内存中缓存输入数据已分析工作负载)。如果没有这些内置的操作杠杆/自我管理功能,这些大型流水线的运营成本通常会翻倍。
  • 万物皆日志:Hudi还具有 append only、云数据友好的设计,该设计实现了日志结构化存储系统的原理,可以无缝管理所有云提供商的数据。
  • 键-值数据模型:在写方面,Hudi表被建模为键值对数据集,其中每条记录都有一个唯一的记录键。此外,一个记录键还可以包括分区路径,在该路径下,可以对记录进行分区和存储。这通常有助于减少索引查询的搜索空间。

表设计

了解了Hudi项目的关键技术动机后,现在让我们更深入地研究Hudi系统本身的设计。在较高的层次上,用于写Hudi表的组件使用了一种受支持的方式嵌入到Apache Spark作业中,它会在支持DFS的存储上生成代表Hudi表的一组文件。然后,在具有一定保证的情况下,诸如Apache Spark、Presto、Apache Hive之类的查询引擎可以查询该表。 

Hudi表的三个主要组件:

  • 有序的时间轴元数据。类似于数据库事务日志。
  • 分层布局的数据文件:实际写入表中的数据。
  • 索引(多种实现方式):映射包含指定记录的数据集。

Hudi提供了以下功能来对基础数据进行写入、查询,这使其成为大型数据湖的重要模块:

  • 支持快速,可插拔索引的upsert();
  • 高效、只扫描新数据的增量查询;
  • 原子性的数据发布和回滚,支持恢复的Savepoint;
  • 使用mvcc(多版本并发控制)风格设计的读和写快照隔离;
  • 使用统计信息管理文件大小;
  • 已有记录update/delta的自管理压缩;
  • 审核数据修改的时间轴元数据;
  • 满足GDPR(通用数据保护条例)、数据删除功能。

表类型

  • Copy On Write表 

COW表写的时候数据直接写入basefile,(parquet)不写log文件。所以COW表的文件片只包含basefile(一个parquet文件构成一个文件片)。这种的存储方式的Spark DAG相对简单。关键目标是是使用partitioner将tagged Hudi记录RDD(所谓的tagged是指已经通过索引查询,标记每条输入记录在表中的位置)分成一些列的updates和inserts.为了维护文件大小,我们先对输入进行采样,获得一个工作负载profile,这个profile记录了输入记录的insert和update、以及在分区中的分布等信息。把数据从新打包,这样:1)  对于updates, 该文件ID的最新版本都将被重写一次,并对所有已更改的记录使用新值2)  对于inserts.记录首先打包到每个分区路径中的最小文件中,直到达到配置的最大大小。之后的所有剩余记录将再次打包到新的文件组,新的文件组也会满足最大文件大小要求。

  • Merge On Read表

MOR表写数据时,记录首先会被快速的写进日志文件,稍后会使用时间轴上的压缩操作将其与基础文件合并。根据查询是读取日志中的合并快照流还是变更流,还是仅读取未合并的基础文件,MOR表支持多种查询类型。在高层次上,MOR writer在读取数据时会经历与COW writer 相同的阶段。这些更新将追加到最新文件篇的最新日志文件中,而不会合并。

对于insert,Hudi支持两种模式:1)插入到日志文件:有可索引日志文件的表会执行此操作(HBase索引);2)插入parquet文件:没有索引文件的表(例如布隆索引)与写时复制(COW)一样,对已标记位置的输入记录进行分区,以便将所有发往相同文件id的upsert分到一组。这批upsert会作为一个或多个日志块写入日志文件。Hudi允许客户端控制日志文件大小。对于写时复制(COW)和读时合并(MOR)writer来说,Hudi的WriteClient是相同的。几轮数据的写入将会累积一个或多个日志文件。这些日志文件与基本的parquet文件(如有)一起构成一个文件片,而这个文件片代表该文件的一个完整版本。这种表是用途最广、最高级的表。为写(可以指定不同的压缩策略,吸收突发写流量)和查询(例如权衡数据的新鲜度和查询性能)提供了很大的灵活性。同时它包含一个学习曲线,以便在操作上掌控他。 

查询

鉴于这种灵活而全面的数据布局和丰富的时间线,Hudi能够支持三种不同的查询表方式,具体取决于表的类型。

  • 快照查询

可查看给定delta commit或者commit即时操作后表的最新快照。在读时合并(MOR)表的情况下,它通过即时合并最新文件片的基本文件和增量文件来提供近实时表(几分钟)。对于写时复制(COW),它可以替代现有的parquet表(或相同基本文件类型的表),同时提供upsert/delete和其他写入方面的功能。

  • 增量查询

可查看自给定commit/delta commit即时操作以来新写入的数据。有效的提供变更流来启用增量数据管道。

  • 读优化查询

可查看给定的commit/compact即时操作的表的最新快照。仅将最新文件片的基本/列文件暴露给查询,并保证与非Hudi表相同的列查询性能。

使用用例

  • 使用变更数据捕获来更新/删除目标数据

变更数据捕获(CDC)是指识别和捕获源数据库中所做的变更。它将更改从源数据库复制到目的端(在本例中为数据湖)。这对于将插入、更新和删除操作捕获到目标表中特别重要。

基于日志的CDC为将更改从源数据库复制到数据湖库是最好的方案,因为它能够捕获已删除的记录,而这些记录无法使用基于查询的CDC捕获,因为如果在源数据中不存在删除标志,则无法捕获。

Hudi可以通过在现有数据集中找到适当的文件并重写它们来处理更改数据捕获。此外它还提供了基于特定时间点查询数据集的功能,并提供了时间旅行。

使用CDC工具(例如Oracle GoldenGate,Qlik Replicate(以前称为Attunity Replicate)和DMS)进行近实时数据摄取非常通用,将这些变更应用于现有数据集至关重要。

  • 隐私条例

GDPR等最新的隐私法规要求公司必须具有记录级别的更新和删除功能。通过支持Hudi数据集中的删除,可以大大简化针对特定用户或在特定时间范围内更新或删除信息的过程。

Hudi的优势

  • 解决数据质量问题,例如重复记录,延迟到达的更新等,这些问题通常在传统的增量批处理ETL管道中存在。
  • 提供对实时管道的支持。
  • 异常检测,机器学习用例,实时检测等
  • 创建可用于跟踪更改的机制(时间轴)。
  • 提供对通过Hive和Presto进行查询的原生支持。


解决方案案例

使用基于日志的CDC工具(Oracle GoldenGate),Apache Kafka和增量数据处理框架(在AWS上运行的Apache Hudi),在AWS S3上构建了数据湖,以减少延迟,改善数据质量并支持ACID事务。

架构

环境

  • Oracle大数据Oracle GoldenGate:19c
  • Confluent Kafka:5.5.0
  • Apache Spark:2.4.3
  • Apache Hudi:0.5.3

Oracle GoldenGate被用作基于日志的CDC工具从源系统日志中提取数据,日志实时地复制到Kafka,从中读取消息并以Hudi格式写入数据湖。

Apache Hudi与AWS EMR和Athena集成,是增量数据处理框架的理想选择。

实施步骤

  • 使用Oracle GoldenGate复制源数据
  • 在Kafka中捕获复制的数据
  • 从Kafka读取数据并以Hudi格式写入S3

结论

该方案成功解决了传统数据湖所面临的挑战:

  • 基于日志的CDC是用于捕获数据库事务/事件的更可靠的机制。
  • Apache Hudi负责(以前由数据平台所有者所有)通过管理大规模Lakehouse所需的索引和相关元数据来更新数据湖库中的目标数据。
  • ACID事务的支持消除了并发操作的麻烦,因为Apache Hudi API支持并发读写,而不会产生不一致的结果。

随着越来越多的企业采用数据平台并增强其数据分析/机器学习功能,必须为服务于数据的基础CDC工具和管道的重要性不断发展以应对一些最普遍面临的挑战。数据延迟和交付给下游消费者的数据的整体质量的改善表明Data Lakehouse范式是下一代数据平台。这将成为企业从其数据中获得更大价值和洞察力的基础。

留言

评论

${{item['author_name']}} 回复 ${{idToContentMap[item.parent] !== undefined ? idToContentMap[item.parent]['author_name'] : ''}} · ${{item.date.slice(0, 10)}} 回复

暂时还没有一条评论.