Analyzing and Comparing Lakehouse Storage Systems

背景

What Is A Lakehouse

Lakehouse是基于开放格式的数据管理系统,它运行在低成本存储上,提供丰富的管理功能,比如事务,数据版本控制,审计和索引等功能,同时可提供多个计算引擎访问。Lakehouse架构不仅被Databricks,还被各大公司(Meta,Uber,Netflix等)和云厂商(Redshift,EMR,Dataproc,Synapse)广泛采用。与Data Lake相比,lakehouse提供了丰富的数据管理功能,比如事务和元数据管理,与传统Data Warehouse相比,lakehouse让数据可以被任何引擎直接使用,比如BI,ML或者DS工具。目前,业界的Lakehouse系统基本上都是基于Hudi,Iceberg和Delta Lake三种表格式来实现的。

graph TD
L1-1(Compute Tier 

SQL Engine: Spark, Presto ... ML Engine: Tensorflow, Pytorch ... Others: BI, Data Science ...) --> L2 L2(Lakehouse Data Management Tier

Table Format: Apache Hudi, Apache Iceberg, Delta Lake

Providing Transactions, Metadata, Indexing, etc) L2 --> L3 L3(Storage Tier

Open Format: Parquet, ORC, AVRO

Data Lake: AWS S3, Microsoft Azure Storage, Google Cloud Storage)
classDiagram
direction BT
class Evolving
class Experimental
class PartitionScanTask {
<>
}
class Scan {
<>
}
class SparkBatchQueryScan
class SparkPartitioningAwareScan~T~
class SparkScan
class SupportsReportPartitioning {
<>
}
class SupportsReportStatistics {
<>
}
class SupportsRuntimeFiltering {
<>
}

Evolving  ..  Scan 
SparkBatchQueryScan  -->  SparkPartitioningAwareScan~T~ 
SparkBatchQueryScan  ..>  SupportsRuntimeFiltering 
SparkPartitioningAwareScan~T~  ..>  PartitionScanTask 
SparkPartitioningAwareScan~T~  -->  SparkScan 
SparkPartitioningAwareScan~T~  ..>  SupportsReportPartitioning 
SparkScan  ..>  Scan 
SparkScan  ..>  SupportsReportStatistics 
Evolving  ..  SupportsReportPartitioning 
SupportsReportPartitioning  -->  Scan 
Evolving  ..  SupportsReportStatistics 
SupportsReportStatistics  -->  Scan 
Experimental  ..  SupportsRuntimeFiltering 
SupportsRuntimeFiltering  -->  Scan

Lakehouse系统面临的挑战

如上所述,首先,Lakehouse系统运行在低成本的data lake上,与传统的数据仓库相比,有相对较高的延迟,弱事务保证;然后,Lakehouse系统旨在支持更广泛的工作负载,从PB级大规模的ELT工作负载到亚秒级延迟的小表查询;最后,Lakehouse系统旨在提供开放接口给各个计算引擎使用。这些挑战给上述三种Lakehouse系统带来设计上的权衡,关键的挑战与设计问题包括:

  • How to coordinate transactions
  • Where to store metadata & how to query metadata
  • How to efficiently handle updates

Lakehouse Systems Design

上文提到,业界的Lakehouse Systems基本上都是基于Hudi,Iceberg和Delta Lake三种表格式来实现的,他们面临基本相同的挑战,在设计上有类似和不同的选择,按照事务协调,元数据管理和更新策略来对比他们设计上的选择。

Transaction Coordination

三种系统都宣称对单表的读写提供ACID保证,但是每个系统的事务实现与保证都不相同,如下表所示。

Table Metadata Transaction Atomicity Isolation Levels
Delta Lake Transaction Log + Metadata Checkpoints Atomic Log Appendes Write serializable, Serializable
Hudi Transaction Log + Metadata Table Table-Level Lock Snapshot Isolation
Iceberg Hierarchical Files Table-Level Lock Snapshot Isolation, Serializable

Delta Lake,Hudi和Iceberg都是使用MVCC(Multi-Version Concurrency Control)实现事务。元数据结构定义了哪些文件版本属于该表。当事务开始时,它会读取此元数据结构以获得表的快照,然后从该快照执行所有读取。事务通过原子更新元数据结构来提交。Delta Lake依赖底层存储服务通过put-if-absent等操作提供原子性(通过 DynamoDB 协调不支持合适操作的存储层),而Hudi和Iceberg使用 ZooKeeper中实现的表级锁、Hive MetaStore或 DynamoDB(AWS CTO在其博客文章Diving Deep on S3 Consistency博客上提到,Netflix使用S3mper,通过DynamoDB as a consistent store做协调,也提到S3要支持强一致了)。

为了提供事务之间的隔离,Delta Lake、Hudi 和 Iceberg都使用了乐观并发控制。Iceberg和Hudi都采用了Adya术语快照隔离:事务始终从在它们开始时有效的已提交数据的快照中读取数据,并且只有在提交时没有已提交事务写入它们打算写入的数据时才能提交。Delta Lake与Iceberg都提供了Serializable的隔离级别,Iceberg默认是Snapshot Isolation,Delta Lake默认是其特有的Write serializable隔离级别

Metadata Management

查询规划时需要访问表的元数据(比如,所有表文件的names,sizes,每个文件的列统计信息),对象存储速度限制了查询速度,比如,S3的LIST操作一次只返回1000个keys。三种Lakehouse系统都将元数据文件在与实际数据文件一起存储。Listing元数据文件(数量比数据文件少得多)并从中读取元数据比直接从S3 listing数据文件(传统数据湖元数据在数据文件中)可以缩短查询规划的时间。 使用了两种元数据组织格式:表格格式和分层格式。

Tabular Format Hierarchical Format
Delta Lake, Hudi Iceberg

Each table’s metadata stored in
a separate Parquet/Avro table.
Each table’s metadata stored in a tree
of manifest files (in Avro).

Query planning is distributed. Queries planned on a single node.

Delta Lake和Hudi采用tabular format,lakehouse表的元数据存储在另一个特殊的表中:Hudi的元数据表和Delta Lake的transaction log checkpoint(Parquet和JSON格式的组合)。事务不直接写这种表,而是写日志记录,周期性的使用MOR(merge-on-read)策略将其合并到元数据表中。Iceberg采用hierarchical format,元数据分层存储在manifest文件中,底层中的每个文件存储一组数据文件的元数据,上层中的每个文件包含下层中一组清单文件的聚合元数据。这类似于表格格式,但上层充当表格索引。

ice

批处理作业必须扫描元数据表以查找查询中涉及的所有文件,以便将它们用于查询规划。Delta Lake和Hudi采用分布式的方式进行查询规划,而Iceberg在单个节点上进行查询规划,该节点使用清单层次结构的上层作为索引,以最大限度地减少它必须对下层进行的读取次数

Data Update Strategies

这三种表格式都支持CoW策略,因为大多数lakehouse工作负载都支持高读取性能。Iceberg和Hudi目前支持MoR,Delta也计划支持它Iceberg(以及未来的 Delta)MoR 实现使用辅助“墓碑”文件来标记Parquet/ORC数据文件中的记录。在查询时,这些逻辑删除记录被过滤掉。记录更新是通过对现有记录进行逻辑删除并将更新后的记录写入Parquet/ORC 文件来实现的。相比之下,MoR的Hudi实现将所有记录级别的插入、删除和更新存储在基于行的 Avro 文件中。查询时Hudi在从Parquet文件读取数据时协调这些更改。值得一提的事,Hudi 默认情况下会通过键对摄取的数据进行重复数据删除和排序,因此即使在使用 MoR 时也会产生额外的写入延迟。

Copy On Write Merge On Read
Delta Lake, Iceberg, Hudi Iceberg, Hude, Delta Lake (coming)

Identify files containing records that
need updates, then eagerly rewrite them.
Write changes to auxiliary files,
reconcile at query time.

High write amplification, no read amplification. Low write amplification, high read amplification.

Benchmarking Lakehouse Systems

Load and Query Performance

Delta、Hudi和Iceberg在3TB TPC-DS上的端到端比较,将每个查询运行三次并报告运行时间中值。结果显示,Hudi的数据加载速度几乎慢了10倍。这是因为Hudi 针对keyed upsert更新插入进行了优化,而不是批量数据摄取,并且在数据加载期间进行了耗时的预处理,包括密钥唯一性检查和密钥重新分配。 总体而言,TPC-DS在Delta Lake上的查询速度比在Hudi上快1.4倍,在Delta Lake 上比在Iceberg上快 1.7 倍。

3tb-tpcds-dec2022

重点分析两个重要的查询(Q90 & Q67),在Q90这个查询上,Delta Lake与Hudi比有特别大的性能优势。Q67是总体上最重的查询(占总运行时间的8%)。Spark在三种表格式上为这个两个查询生成了相同的查询计划,这3种lakehouse表格式之间的性能差异几乎完全由数据读取时间决定的

Table Format Q90 read cost (min)
Delta Lake 6.5
Iceberg 18.6
Hudi 18.8

Delta Lake优于Hudi,因为Hudi表的目标文件较小。 例如,Store Sales表的单个分区存储在Delta Lake中的一个128 MB文件中,但在Hudi中存储了22个8.3 MB文件。 这降低了列压缩的效率,并增加了TPC-DS 中常见的大表扫描的开销;例如,要读取Q90中的Web Sales Table,在Delta Lake中读取2128个文件(138 GB),在Hudi中必须读取18443个文件(186 GB)。

Delta Lake优于Iceberg,虽然两者读取的字节数相同,Iceberg在Spark中使用定制的Parquet Reader,它比Delta Lake和Hudi使用的默认Spark Parquet Reader慢很多(因为Iceberg对列删除和重命名的支持所需的定制化功能不在Apache Spark的内置 Parquet Reader中,所以Iceberg使用其定制的reader)。

对于Q68这样非常小的查询,性能瓶颈往往是查询规划中使用的元数据操作。由于我们报告的结果是连续三个运行的中值,这些查询在Hudi中是最快的,因为它缓存了查询计划。此外,由于Iceberg 使用Spark Data Source v2 (DSv2) API而不是Delta Lake和Hudi使用的Data Source v1 API (DSv1),因此Spark偶尔会对存储在Iceberg中的数据生成与存储在Delta Lake或Hudi中的数据不同的查询计划。DSv2 API不太成熟,不会报告一些对查询计划有用的指标,因此这通常会导致Iceberg的查询计划性能较低。例如,在Q9中,Spark在Delta Lake和Hudi 中优化了一个具有cross-join的复杂聚合,但在 Iceberg中却没有,这导致了所有TPC-DS中最大的相对性能差异。

Update Strategies

为了评估MoR于CoW更新策略的性能,运行了一个基于TPC-DS端到端的data refresh maintenance基准测试,以及一个综合的具有不同合并源大小的微基准测试。

TPC-DS Refresh Benchmark

TPC-DS基准规范提供了一组模拟数据仓库维护的数据刷新操作。针对100 GB TPC-DS数据集评估了 Delta、Hudi和Iceberg。在所有系统中测试CoW,在Hudi和Iceberg中测试 MoR,因为Delta 2.2.0没有实现 MoR。该基准测试过程如下,

  1. 首先加载100 GB TPC-DS基础数据集
  2. 然后运行五个示例查询(Q3、Q9、Q34、Q42 和 Q59)
  3. 使用MERGE INTO操作更新行,共运行10次刷新(每次刷新原始数据集的3%)
  4. 它在更新的表上重新运行五个示例查询。

tpcds-refresh-dec2022

结果如上图所示,显示了每个系统中刷新基准测试每个阶段的延迟,Hudi和Delta结果使用默认的EMR配置运行,没有任何变化。Iceberg 1.1.0 MoR在此基准测试中始终遇到S3连接超时错误,导致运行时间非常长。使用MoR的Iceberg 0.14.0后,它在连接限制增加的情况下表现良好。

  • Hudi MoR中的合并比Hudi CoW中的合并快 1.3倍,代价是合并后查询速度慢3.2倍
    -Hudi CoW和MoR 在初始加载期间的写入性能都很差,这是由于额外的预处理,以按key分布数据并重新平衡写入文件大小
  • Delta在合并和读取方面的性能具有竞争力,尽管仅使用 CoW,这是由于生成更少的文件、更快的扫描和更优化的MERGE命令的组合
  • Iceberg 版本 0.14.0中带有MoR的合并比CoW快1.4 倍
  • 表模式之间(MoR vs CoW)的合并后查询性能保持相似

Merge Microbenchmark

生成的TPC-DS 刷新数据没有可配置的比例参数,为了更好地理解刷新大小对合并和查询性能的影响,使用独立的微基准测试Iceberg CoW 1.1.0和Iceberg MoR 1.1.0。 加载一个包含四列的合成表,然后从一个随机抽样的表中应用单个合并,该表具有基表大小的可配置部分。除了合并操作的延迟之外,还比较了合并后查询的减速。对于评估的每个合并规模,50% 的行是插入行,50%是更新行。

micro-merge-dec2022

上图显示了100 GB基准测试的结果。此测试使用单个表中包含四列的合成数据集。在这次运行中,生成了一个 100GB 的数据集,比较了一系列更新大小的合并时间和合并后查询时间。更新大小的范围从数据集大小的0.0001%到数据集大小的0.1%。MoR合并时间在更新100,000行时开始优于CoW。正如预期的那样,由于读取放大,合并后的查询延迟对于读取时合并比写入时复制要高得多。

Impact of Distributed Metadata Processing

为了评估Lakehouse元数据处理策略对存储在许多文件中的大表的性能影响。我们生成TPC-DS数据(来自 store_sales表)并将其存储在Delta Lake和Iceberg的不同数量的10 MB文件(1K到 200K文件,或10 GB到2 TB的总数据)中。选择这两个系统来对比它们不同的元数据访问策略:Delta Lake通过在Spark上运行分布式元数据处理,而Iceberg在单个节点上运行它。为了隔离元数据操作的影响,我们使用了三个具有高选择性的查询:一个访问单个行,一个访问单个分区,一个只访问包含特定值的行(这允许两个系统最小化使用扫描的文件数量zone maps)。测量查询启动时间(定义为提交查询到第一个数据扫描作业开始执行之间经过的时间)和总查询执行时间。 所有测量值均取自热启动,作为三个运行的中值。在下图中绘制了结果。对于这些选择性查询,元数据访问策略对性能有很大影响,并且是更大表的瓶颈。具体来说,虽然Iceberg的单节点查询计划对于较小的表表现更好,但Delta Lake的分布式计划扩展性更好,并且对于 200K 文件表的性能提高了7-12 倍。

large-file-count-dec2022

References