Apache Iceberg: An Architectural Look Under the Covers

Introduction

数据湖的构建旨在实现数据的民主化,即允许越来越多的人、工具和应用程序利用越来越多的数据。实现这一目标所需的关键能力是将底层数据结构和物理数据存储的复杂性对用户隐藏起来。之前实现这一目标的事实上的标准是Hive表格式,但其在数据、用户和应用程序规模方面存在不足。那么答案是什么呢?答案是Apache Iceberg。

在本文中,我们将介绍以下内容:

  • 表格格式的定义,因为传统上表格格式的概念被嵌入在“Hive”的范畴之下,是隐含的。
  • 长期以来事实上的标准——Hive表格格式的详细信息,包括其优点和缺点。我们将看到这些问题如何导致了对全新表格格式定义的需求。
  • Apache Iceberg表格格式是如何应对这一需求而创建的。我们还将深入探讨Iceberg表格的架构结构,包括从规范的角度和逐步查看Iceberg表格中进行创建、读取、更新和删除(CRUD)操作时发生的情况。
  • 最后,我们将展示这种架构如何实现设计的结果所带来的好处。

What’s a Table Format?

一个定义表格格式的好方法是将数据集的文件组织起来,以将它们呈现为一个单一的“表格”。从用户的角度来看,另一个相对简单的定义是回答“这个表格中有什么数据?”这个问题的方式。对于这个问题的一个单一答案,允许多个人、团体和工具同时与表格中的数据进行交互,无论是向表格写入数据还是从表格读取数据。表格格式的主要目标是为人们和工具提供表格的抽象,并允许它们高效地与表格的底层数据进行交互。

A Brief History

在2009年,Facebook意识到虽然Hadoop解决了许多需求,如规模和成本效益,但在改善数据对非技术专家的民主化方面仍存在一些问题:

  1. 任何想使用数据的用户都必须弄清楚如何将他们的问题适应MapReduce编程模型,然后编写Java代码来实现它。
  2. 没有定义数据集的元数据,比如其模式。

为了让更多用户使用数据并解决这些问题,他们构建了Hive。为了解决问题1,他们意识到需要提供更通用的编程模型和人们熟悉的语言来访问数据,即SQL。他们将构建Hive,将用户的SQL查询转换为MapReduce作业,以便他们能够获得答案。作为解决问题1的要求以及解决问题2的需求,需要定义数据集的schema以及如何在用户的SQL查询中以表格的形式引用该数据集。为了满足第二个需求,定义了Hive表格格式(通过白皮书中的3个要点和Java实现),并且自那时以来一直成为事实上的标准。

The Hive Table Format

在Hive表格格式中,一个表格被定义为一个或多个目录的全部内容,实际上就是一个或多个目录的ls结果。对于非分区表格,这是一个单独的目录。对于分区表格,在实际世界中更常见,表格由许多目录组成,每个分区一个目录。组成表格的数据在目录级别进行跟踪,并且这种跟踪是在Hive元数据存储中完成的。分区值通过目录路径来定义,形式为/path/to/table/partition_column=partition_value

下面是一个按照列k1和k2进行分区的Hive表格的示例架构图。

hive-table-format

Pros

  • 它与几乎所有处理引擎兼容,因为它是当时唯一的表格格式——自从大数据的广泛采用以来,它一直是事实上的标准。
  • 多年来,它不断发展并提供了机制,使得Hive表格能够提供比每次查询都进行全表扫描更高效的访问模式,例如分区和桶
  • 它与文件格式无关,这使得公司和社区能够开发更适合分析的文件格式(例如Parquet、ORC),并且不需要在将数据放入Hive表格之前进行转换(例如Avro、CSV/TSV)。
  • Hive metastore,存储以Hive表格式排列(layout
    )的表为所有需要与表格进行交互的工具生态系统提供了一个单一的中心答案,即“这个表格中有什么数据?”无论是在读取方面还是写入方面。
  • 它提供了以整个分区为单位对表格中的数据进行原子级别更改的能力,通过Hive元数据存储库中的原子交换,从而实现了对外界的一致视图。

Cons

  1. 当在更大规模的数据、用户和应用程序中使用Hive表格式时,许多问题变得越来越严重
    • 数据的更改效率低下:由于分区存储在事务性存储中(Hive元数据存储库,由关系型数据库支持),您可以以事务性的方式添加和删除分区。然而,由于文件的跟踪是在不提供事务性能力的文件系统中进行的,因此无法以事务性的方式在文件级别上添加和删除数据。
    • 一般的解决方法是在分区级别上解决这个问题,通过在后台将整个分区复制到新位置,同时在复制分区时进行更新/删除操作,然后更新元数据存储库中分区的位置为新位置。
    • 这种方法效率低下,特别是当分区很大、在分区中更改的数据量相对较小和/或频繁进行更改时。
  2. 没有一种安全的方法可以在一个操作中安全地更改多个分区中的数据
    • 因为唯一具有事务一致性的操作是交换单个分区,所以无法以一致的方式同时更改多个分区中的数据。即使是像将文件添加到两个分区这样简单的操作也无法以事务一致的方式完成。因此,用户看到的是一个不一致的世界观,他们在做出正确决策时遇到问题,并且对数据的可信度也存在问题。
  3. 在实践中,多个作业同时修改同一数据集是不安全的操作
    • 在表格格式中,没有一种被广泛采用的方法来处理多个进程/人同时更新数据的情况。虽然有一种方法,但它非常限制,并且会引发问题,只有Hive才能遵守。这要么导致对谁可以何时写入数据进行严格控制,这需要组织自行定义和协调,要么导致多个进程同时更改数据,从而导致数据丢失,因为最后一次写入的数据会覆盖之前的写入。
  4. 对于大型表格,获取所有目录列表需要很长时间
    • 因为您没有一个列出所有分区目录中文件的列表,所以需要在运行时获取这个列表。获取所需的所有目录列表的响应通常需要很长时间,比如查询规划的时候。
  5. 用户必须了解表格的物理布局
    • 如果一个表格按事件发生的时间进行分区,通常会使用多级分区——首先是事件的年份,然后是事件的月份,然后是事件的日期,有时还会有更低的粒度。但是,当用户面对事件时,以获取某个时间点之后的事件的直观方式看起来像是 WHERE event_ts >= '2021-05-10 12:00:00'。在这种情况下,查询引擎会进行全表扫描,这比使用可用的分区剪枝来限制数据的时间要长得多。
    • 这种全表扫描发生的原因是事件的时间戳(用户所知道的2021-05-10 12:00:00)与物理分区方案(年份=2021,然后月份=05,然后日期=10)之间没有映射关系。
    • 相反,所有用户都需要了解分区方案,并将查询编写为 WHERE event_ts >= '2021-05-10 12:00:00' AND event_year >= '2021' AND event_month >= '05' AND (event_day >= '10' OR event_month >= '06')(如果要查看2020年5月之后的事件,这个分区剪枝查询会变得更加复杂)。
  6. Hive表格的统计信息通常是过时的
    • 因为表格统计信息是在异步周期性读取作业中收集的,所以统计信息经常是过时的。此外,由于收集这些统计信息需要昂贵的读取作业,需要大量的扫描和计算,这些作业很少运行,如果有的话。
    • 由于这两个方面,Hive中的表格统计信息通常是过时的,如果存在的话,这会导致优化器选择计划不佳,甚至有些引擎完全忽略Hive中的任何统计信息。
  7. 文件系统布局在云对象存储上的性能较差
    • 每当您想要读取一些数据时,云对象存储(例如S3、GCS)的架构规定这些读取应该具有尽可能多的不同前缀,以便由云对象存储中的不同节点处理。然而,在Hive表格格式中,一个分区中的所有数据具有相同的前缀,通常您会读取一个分区中的所有数据(或者至少读取一个分区中的所有Parquet/ORC页脚),这些都会命中同一个云对象存储节点,降低了读取操作的性能。

在Hive表格格式上再加上一些临时措施并不是解决方案,而是需要一种新的表格格式。

如何解决这些问题的呢?

Hive表格式的大部分问题都源于一个看起来可能相当次要的方面,但最终产生了重大后果——表格中的数据是以文件夹级别进行跟踪的。解决Hive表格格式引发的主要问题的关键是改为以文件级别跟踪表格中的数据。他们不再将表格指向一个目录或一组目录,而是将表格定义为文件的规范列表。文件级别的跟踪不仅可以解决他们在Hive表格格式中遇到的问题,还可以为实现更广泛的分析目标奠定基础:

  • 提供始终正确和始终一致的表格视图
  • 实现更快的查询计划和执行
  • 为用户提供良好的响应时间,而无需他们了解数据的物理布局
  • 实现更好和更安全的表格演进
  • 在数据、用户和应用规模上实现上述所有目标

The Iceberg Table Format

What Iceberg is What Iceberg is not
- A table format specification
- A set of APIs and libraries for
engines to interact with tables
following that specification
- A storage engine
- An execution engine
- A service

Iceberg表的架构

image-ice-arc

Iceberg组件

现在,让我们逐个介绍上图中的每个组件。在介绍它们的同时,我们还将逐步介绍SELECT查询通过这些组件读取Iceberg表中的数据的过程。您将在下面用带有此图标标记的方框中看到这些步骤:

Iceberg表的架构包括3个层级:

  • Iceberg Catalog
  • Metadata Layer,包含元metadata file,manifest list和manifest file
  • Data Layer

image-ice-1

Iceberg Catalog

读取表格第一步是找到当前元数据指针的位置,当前元数据指针在Iceberg Catalog。Iceberg Catalog的主要要求是必须支持原子操作来更新当前元数据指针(例如,HDFS、Hive Metastore、Nessie)。这就是允许Iceberg表上的事务具有原子性并提供正确性保证的原因。在Catalog中,对于每个表格,都有一个引用或指针指向该表格的当前元数据文件。例如,在上面显示的图表中,有2个元数据文件。目录中表格的当前元数据指针的值是右侧元数据文件的位置。这些数据的具体形式取决于使用的Iceberg目录是什么。以下是一些示例:

  • 使用HDFS Catalog时,表格的元数据文件夹中有一个名为version-hint.text的文件,其内容是当前元数据文件的版本号。
  • 使用Hive Metastore Catalog时,元数据存储中的表格条目具有一个表属性,用于存储当前元数据文件的位置。
  • 使用Nessie Catalog时,Nessie存储了表格的当前元数据文件的位置。

是的,当一个SELECT查询读取一个Iceberg表时,查询引擎首先访问Iceberg目录,然后检索要读取的表格的当前元数据文件位置的条目,然后打开该文件。

Metadata file

正如名称所示,元数据文件存储有关表格的元数据。这包括表格的schema信息、分区信息、快照以及哪个快照是当前快照。虽然上面是一个简化的示例,用于说明目的,但以下是一个完整的元数据文件内容示例:

v3.metadata.json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
{
"format-version" : 1,
"table-uuid" : "4b96b6e8-9838-48df-a111-ec1ff6422816",
"location" : "/home/hadoop/warehouse/db2/part_table2",
"last-updated-ms" : 1611694436618,
"last-column-id" : 3,
"schema" : {
"type" : "struct",
"fields" : [ {
"id" : 1,
"name" : "id",
"required" : true,
"type" : "int"
}, {
"id" : 2,
"name" : "ts",
"required" : false,
"type" : "timestamptz"
}, {
"id" : 3,
"name" : "message",
"required" : false,
"type" : "string"
} ]
},
"partition-spec" : [ {
"name" : "ts_hour",
"transform" : "hour",
"source-id" : 2,
"field-id" : 1000
} ],
"default-spec-id" : 0,
"partition-specs" : [ {
"spec-id" : 0,
"fields" : [ {
"name" : "ts_hour",
"transform" : "hour",
"source-id" : 2,
"field-id" : 1000
} ]
} ],
"default-sort-order-id" : 0,
"sort-orders" : [ {
"order-id" : 0,
"fields" : [ ]
} ],
"properties" : {
"owner" : "hadoop"
},
"current-snapshot-id" : 1257424822184505371,
"snapshots" : [ {
"snapshot-id" : 8271497753230544300,
"timestamp-ms" : 1611694406483,
"summary" : {
"operation" : "append",
"spark.app.id" : "application_1611687743277_0002",
"added-data-files" : "1",
"added-records" : "1",
"added-files-size" : "960",
"changed-partition-count" : "1",
"total-records" : "1",
"total-data-files" : "1",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "/home/hadoop/warehouse/db2/part_table2/metadata/snap-8271497753230544300-1-d8a778f9-ad19-4e9c-88ff-28f49ec939fa.avro"
},
{
"snapshot-id" : 1257424822184505371,
"parent-snapshot-id" : 8271497753230544300,
"timestamp-ms" : 1611694436618,
"summary" : {
"operation" : "append",
"spark.app.id" : "application_1611687743277_0002",
"added-data-files" : "1",
"added-records" : "1",
"added-files-size" : "973",
"changed-partition-count" : "1",
"total-records" : "2",
"total-data-files" : "2",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "/home/hadoop/warehouse/db2/part_table2/metadata/snap-1257424822184505371-1-eab8490b-8d16-4eb1-ba9e-0dede788ff08.avro"
} ],
"snapshot-log" : [ {
"timestamp-ms" : 1611694406483,
"snapshot-id" : 8271497753230544300
},
{
"timestamp-ms" : 1611694436618,
"snapshot-id" : 1257424822184505371
} ],
"metadata-log" : [ {
"timestamp-ms" : 1611694097253,
"metadata-file" : "/home/hadoop/warehouse/db2/part_table2/metadata/v1.metadata.json"
},
{
"timestamp-ms" : 1611694406483,
"metadata-file" : "/home/hadoop/warehouse/db2/part_table2/metadata/v2.metadata.json"
} ]
}

当一个SELECT查询读取一个Iceberg表,并在从catalog中的表格Catalog中获取其location后打开当前元数据文件时,查询引擎会读取current-snapshot-id的值。然后,它使用这个值在快照数组中找到该快照的条目,然后检索该快照的manifest-list条目的值,并打开该位置指向的清单列表。

image

Manifest list

另一个恰如其名的文件是清单列表(manifest list),它是一个清单文件的列表。清单列表包含了组成该快照的每个清单文件的信息,例如清单文件的位置、它作为一部分添加的快照,以及它所属的分区和数据文件跟踪的分区列的下限和上限信息。

以下是一个清单列表文件的完整内容示例:
snap-1257424822184505371-1-eab8490b-8d16-4eb1-ba9e-0dede788ff08.avro(转换为JSON格式)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
{
"manifest_path": "/home/hadoop/warehouse/db2/part_table2/metadata/eab8490b-8d16-4eb1-ba9e-0dede788ff08-m0.avro",
"manifest_length": 4884,
"partition_spec_id": 0,
"added_snapshot_id": {
"long": 1257424822184505300
},
"added_data_files_count": {
"int": 1
},
"existing_data_files_count": {
"int": 0
},
"deleted_data_files_count": {
"int": 0
},
"partitions": {
"array": [ {
"contains_null": false,
"lower_bound": {
"bytes": "¹Ô\\u0006\\u0000"
},
"upper_bound": {
"bytes": "¹Ô\\u0006\\u0000"
}
} ]
},
"added_rows_count": {
"long": 1
},
"existing_rows_count": {
"long": 0
},
"deleted_rows_count": {
"long": 0
}
}
{
"manifest_path": "/home/hadoop/warehouse/db2/part_table2/metadata/d8a778f9-ad19-4e9c-88ff-28f49ec939fa-m0.avro",
"manifest_length": 4884,
"partition_spec_id": 0,
"added_snapshot_id": {
"long": 8271497753230544000
},
"added_data_files_count": {
"int": 1
},
"existing_data_files_count": {
"int": 0
},
"deleted_data_files_count": {
"int": 0
},
"partitions": {
"array": [ {
"contains_null": false,
"lower_bound": {
"bytes": "¸Ô\\u0006\\u0000"
},
"upper_bound": {
"bytes": "¸Ô\\u0006\\u0000"
}
} ]
},
"added_rows_count": {
"long": 1
},
"existing_rows_count": {
"long": 0
},
"deleted_rows_count": {
"long": 0
}
}

当一个SELECT查询读取一个Iceberg表,并在从元数据文件获取快照的位置后打开清单列表时,查询引擎会读取manifest-path条目的值,并打开清单文件。在这个阶段,它还可以进行一些优化,比如使用行计数或使用分区信息对数据进行过滤。

image

Manifest file

清单文件(Manifest file)用于跟踪数据文件以及每个文件的附加细节和统计信息。正如前面提到的,Iceberg能够解决Hive表格格式的问题的主要区别在于在文件级别跟踪数据 - 清单文件就是实现这一目标的关键。

每个清单文件在规模化时跟踪一部分数据文件,以实现并行处理和重用效率。它们包含了大量有用的信息,用于在从这些数据文件读取数据时提高效率和性能,例如关于分区成员资格、记录计数以及列的下限和上限的详细信息。这些统计信息在写操作期间为每个清单的数据文件子集编写,并且比Hive中的统计信息更有可能存在、准确且最新。

以下是一个清单文件的完整内容示例:
eab8490b-8d16-4eb1-ba9e-0dede788ff08-m0.avro(转换为JSON格式)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
{
"status": 1,
"snapshot_id": {
"long": 1257424822184505300
},
"data_file": {
"file_path": "/home/hadoop/warehouse/db2/part_table2/data/ts_hour=2021-01-26-01/00000-6-7c6cf3c0-8090-4f15-a4cc-3a3a562eed7b-00001.parquet",
"file_format": "PARQUET",
"partition": {
"ts_hour": {
"int": 447673
}
},
"record_count": 1,
"file_size_in_bytes": 973,
"block_size_in_bytes": 67108864,
"column_sizes": {
"array": [ {
"key": 1,
"value": 47
},
{
"key": 2,
"value": 57
},
{
"key": 3,
"value": 60
} ]
},
"value_counts": {
"array": [ {
"key": 1,
"value": 1
},
{
"key": 2,
"value": 1
},
{
"key": 3,
"value": 1
} ]
},
"null_value_counts": {
"array": [ {
"key": 1,
"value": 0
},
{
"key": 2,
"value": 0
},
{
"key": 3,
"value": 0
} ]
},
"lower_bounds": {
"array": [ {
"key": 1,
"value": "\\u0002\\u0000\\u0000\\u0000"
},
{
"key": 2,
"value": "\\u0000„ ,ù\\u0005\\u0000"
},
{
"key": 3,
"value": "test message 2"
} ]
},
"upper_bounds": {
"array": [ {
"key": 1,
"value": "\\u0002\\u0000\\u0000\\u0000"
},
{
"key": 2,
"value": "\\u0000„ ,ù\\u0005\\u0000"
},
{
"key": 3,
"value": "test message 2"
} ]
},
"key_metadata": null,
"split_offsets": {
"array": [
4
]
}
}
}

当一个SELECT查询读取一个Iceberg表,并在从清单列表获取清单文件的位置后打开清单文件时,查询引擎会读取每个数据文件对象的file-path条目的值,并打开数据文件。在这个阶段,它还可以进行一些优化,比如使用行计数或使用分区或列统计信息对数据进行过滤。

通过了解Iceberg表的不同组件以及任何访问Iceberg表中数据的引擎或工具所采取的路径,现在让我们更深入地了解在Iceberg表上执行CRUD操作时发生的内部过程。

A Look Under the Covers

CREATE TABLE

First, let’s create a table in our environment.

1
2
3
4
5
6
7
8
CREATE TABLE table1 (
order_id BIGINT,
customer_id BIGINT,
order_amount DECIMAL(10, 2),
order_ts TIMESTAMP
)
USING iceberg
PARTITIONED BY ( HOUR(order_ts) );

执行后如下图所示:
image

在上面的例子中,我们在数据库db1中创建了一个名为table1的表。该表有4个列,并以order_ts时间戳列的小时粒度进行分区(稍后会详细介绍)。

当执行上述查询时,在元数据层创建了一个名为s0的快照的元数据文件(快照s0不指向任何清单列表,因为表中还没有数据)。然后,更新了db1.table1的当前元数据指针的目录条目,使其指向这个新元数据文件的路径。

INSERT

1
2
3
4
5
6
INSERT INTO table1 VALUES (
123,
456,
36.17,
'2021-01-26 08:10:23'
);

image

当我们执行这个INSERT语句时,会发生以下过程:

  • 首先,以Parquet文件的形式创建数据 - table1/data/order_ts_hour=2021-01-26-08/00000-5-cae2d.parquet
  • 然后,创建一个指向该数据文件的清单文件(包括附加的细节和统计信息)- table1/metadata/d8f9-ad19-4e.avro
  • 然后,创建一个指向该清单文件的清单列表(包括附加的细节和统计信息)- table1/metadata/snap-2938-1-4103.avro
  • 然后,基于先前的当前元数据文件创建一个新的元数据文件,其中包含一个新的快照s1,并跟踪先前的快照s0,指向这个清单列表(包括附加的细节和统计信息)- table1/metadata/v2.metadata.json
  • 最后,在目录中以原子方式更新db1.table1的当前元数据指针的值,使其指向这个新的元数据文件。

在所有这些步骤中,任何读取该表的人都会继续读取第一个元数据文件,直到原子步骤#5完成,这意味着使用数据的任何人都不会看到表的状态和内容的不一致视图。

MERGE INTO / UPSERT

现在,让我们逐步介绍MERGE INTO / UPSERT操作。

假设我们已经将一些数据导入到我们在后台创建的暂存表中。在这个简单的例子中,每次订单发生变化时都会记录信息,我们希望保持该表显示每个订单的最新详细信息,因此如果订单ID已经存在于表中,我们会更新订单金额。如果我们还没有该订单的记录,我们希望插入一条新订单的记录。

在这个例子中,暂存表包括对已经存在于表中的订单(order_id=123)的更新,以及一个尚未存在于表中的新订单,该订单发生在2021年1月27日10:21:46。

1
2
3
4
5
6
7
MERGE INTO table1
USING ( SELECT * FROM table1_stage ) s
ON table1.order_id = s.order_id
WHEN MATCHED THEN
UPDATE table1.order_amount = s.order_amount
WHEN NOT MATCHED THEN
INSERT *

image

当我们执行这个MERGE INTO语句时,会发生以下过程:

  1. 按照之前详细介绍的读取路径,确定table1table1_stage中具有相同order_id的所有记录。
  2. table1中读取包含order_id=123的记录的文件进入查询引擎的内存(00000-5-cae2d.parquet),然后将该内存副本中order_id=123的记录的order_amount字段更新为反映table1_stage中匹配记录的新order_amount。然后,将修改后的原始文件副本写入一个新的Parquet文件 - table1/data/order_ts_hour=2021-01-26-08/00000-1-aef71.parquet。即使文件中还有其他不符合order_id更新条件的记录,整个文件仍然会被复制,并且在复制时更新匹配的记录,并将新文件写出 - 这种策略被称为写时复制(copy-on-write)。Iceberg即将推出一种名为merge-on-read的新的数据更改策略,它在内部的行为方式不同,但仍提供相同的更新和删除功能。
  3. table1_stage中没有与table1中的任何记录匹配的记录以新的Parquet文件的形式写入,因为它属于与匹配记录不同的分区 - table1/data/order_ts_hour=2021-01-27-10/00000-3-0fa3a.parquet
  4. 然后,创建一个指向这两个数据文件的新清单文件(包括附加的细节和统计信息)- table1/metadata/0d9a-98fa-77.avro
    在这种情况下,快照s1中唯一的数据文件中的唯一记录发生了更改,因此没有重用清单文件或数据文件。通常情况下,清单文件和数据文件会在快照之间进行重用。
  5. 接着,创建一个指向这个清单文件的新清单列表(包括附加的细节和统计信息)- table1/metadata/snap-9fa1-3-16c3.avro
  6. 然后,基于先前的当前元数据文件创建一个新的元数据文件,其中包含一个新的快照s2,并跟踪先前的快照s0和s1,指向这个清单列表(包括附加的细节和统计信息)- table1/metadata/v3.metadata.json
  7. 最后,在目录中以原子方式更新db1.table1的当前元数据指针的值,使其指向这个新的元数据文件。

虽然这个过程有多个步骤,但它发生得非常快速。例如,Adobe进行了一些基准测试,发现他们每分钟可以实现15次提交。

在上面的图表中,我们还展示了在执行此MERGE INTO之前,后台进行了垃圾回收作业以清理未使用的元数据文件 - 请注意,我们在创建表时的第一个快照s0的元数据文件已经不存在了。因为每个新的元数据文件也包含了来自先前文件的重要信息,所以可以安全地清理它们。未使用的清单列表、清单文件和数据文件也可以通过垃圾回收进行清理。

SELECT

1
2
SELECT *
FROM db1.table1

image

当执行这个SELECT语句时,会发生以下过程:

  1. 查询引擎访问Iceberg目录。
  2. 然后,检索db1.table1的当前元数据文件位置条目。
  3. 然后,打开这个元数据文件,并检索当前快照s2的清单列表位置条目。
  4. 然后,打开这个清单列表,并检索唯一的清单文件的位置。
  5. 然后,打开这个清单文件,并检索两个数据文件的位置。
  6. 然后,读取这些数据文件,并且由于是SELECT *,将数据返回给客户端。

隐藏分区

Hive表格式的一个问题,即用户需要了解表的物理布局,以避免非常慢的查询。假设用户想要查看某一天的所有记录,比如2021年1月26日,他们发出以下查询:

1
2
3
SELECT *
FROM table1
WHERE order_ts = DATE '2021-01-26'

在Hive表中,将其按照订单首次发生的时间戳的小时级别进行了分区。在Hive中,这个查询通常会导致对整个表进行扫描。让我们来看看Iceberg如何解决这个问题,并为用户提供以直观方式与表进行交互的能力,同时仍然实现良好的性能,避免对整个表进行扫描。

image

当执行这个SELECT语句时,会发生以下过程:

  1. 查询引擎访问Iceberg目录。
  2. 然后,检索db1.table1的当前元数据文件位置条目。
  3. 然后,打开这个元数据文件,并检索当前快照s2的清单列表位置条目。它还查找文件中的分区规范,并看到表是按照order_ts字段的小时级别进行分区的。
  4. 然后,打开这个清单列表,并检索唯一的清单文件的位置。
  5. 然后,打开这个清单文件,并查看每个数据文件的条目,将数据文件所属的分区值与用户查询请求的分区值进行比较。该文件中的值对应于自Unix纪元以来的小时数,查询引擎使用这个值确定只有一个数据文件中的事件发生在2021年1月26日(或者换句话说,在2021年1月26日的00:00:00到2021年1月26日的23:59:59之间)。
    • 具体来说,唯一匹配的事件是我们插入的第一个事件,因为它发生在2021年1月26日的08:10:23。另一个数据文件的订单时间戳是2021年1月27日的10:21:46,即不是在2021年1月26日,所以它不符合过滤条件。
  6. 然后,只读取匹配的数据文件,并且由于是SELECT *,将数据返回给客户端。

时间旅行

Iceberg表格式提供的另一个关键功能是所谓的“时间旅行”。为了追踪表在时间上的状态,以满足合规性、报告或可重现性的需求,传统上需要编写和管理作业,在特定时间点创建和管理表的副本。相反,Iceberg提供了开箱即用的功能,可以查看过去不同时间点的表的状态。

例如,假设今天用户需要查看我们的表在2021年1月28日的内容,由于这是一篇静态的文本文章,假设在这之前插入了2021年1月27日的订单,并且在我们上面进行的UPSERT操作中,2021年1月26日的订单金额已经更新。他们的查询将如下所示:

1
2
3
SELECT *
FROM table1 AS OF '2021-01-28 00:00:00'
-- (timestamp is from before UPSERT operation)

image

当执行这个SELECT语句时,会发生以下过程:

  1. 查询引擎访问Iceberg目录。
  2. 然后,检索db1.table1的当前元数据文件位置条目。
  3. 然后,打开这个元数据文件,并查看快照数组中的条目(其中包含快照创建时的毫秒级Unix纪元时间,因此成为最新的快照),确定在请求的时间点(2021年1月28日午夜)时活动的快照,并检索该快照的清单列表位置条目,即s1。
  4. 然后,打开这个清单列表,并检索唯一的清单文件的位置。
  5. 然后,打开这个清单文件,并检索两个数据文件的位置。
  6. 然后,读取这些数据文件,并且由于是SELECT *,将数据返回给客户端。

请注意,在上面的图表中的文件结构中,虽然旧的清单列表、清单文件和数据文件在表的当前状态下没有使用,但它们仍然存在于数据湖中,并可供使用。

当然,保留旧的元数据和数据文件在这些用例中提供了价值,但在某个时候,您将拥有不再访问的元数据和数据文件,或者允许人们访问它们的价值超过保留它们的成本。因此,有一个异步的后台进程来清理旧文件,称为垃圾回收。垃圾回收策略可以根据业务需求进行配置,并且是在存储旧文件所需的存储空间和提供多长时间以及以何种粒度进行权衡的结果。

Compaction

Iceberg设计的另一个关键功能是压缩,它有助于平衡写入和读取之间的权衡。

在Iceberg中,压缩是一个异步的后台进程,将一组小文件压缩成更少的大文件。由于它是异步的并在后台进行,对用户没有负面影响。实际上,它基本上是一个特定类型的普通Iceberg写入作业,其输入和输出具有相同的记录,但在写入作业提交事务后,文件大小和属性在分析方面得到了显著改善。

在处理数据时,总会有权衡需要考虑,而一般来说,写入和读取方面的激励是相互矛盾的。

  • 在写入方面,通常希望低延迟 - 尽快使数据可用,这意味着在获得记录后可能立即写入,甚至不需要将其转换为列格式。但是,如果对每条记录都这样做,最终会得到每个文件一个记录的情况(小文件问题的最极端形式)。
  • 在读取方面,通常希望高吞吐量 - 在单个文件中有许多记录,并且以列格式存储,这样数据相关的可变成本(读取数据)超过了固定成本(记录保留的开销,打开每个文件等)。通常也希望使用最新的数据,但这会增加读取操作的成本。

压缩有助于平衡写入和读取之间的权衡 - 您可以在获得数据后几乎立即写入数据,极端情况下每个文件中有1条以行格式存储的记录,读取器可以立即查看和使用,同时后台压缩进程定期将所有这些小文件合并为更少、更大、以列格式存储的文件。

通过压缩,读取器始终以他们想要的高吞吐量形式持续拥有99%的数据,但仍以低延迟低吞吐量形式查看最新的1%数据。

对于这种用例,还需要注意压缩作业的输入文件格式和输出文件格式可以是不同的文件类型。一个很好的例子是从流式写入中写入Avro格式,然后将其压缩成用于分析的更大的Parquet文件。

另一个重要的注意事项是,由于Iceberg不是一个引擎或工具,调度/触发和实际的压缩工作是由与Iceberg集成的其他工具和引擎完成的。

Design Benefits of the Format

现在,让我们将我们之前讨论的内容应用到架构和设计提供的更高级价值上。

  • 事务的快照隔离

    • Iceberg表上的读取和写入不会相互干扰。
    • Iceberg通过乐观并发控制提供了并发写入的能力。
    • 所有写入都是原子性的。
  • 更快的规划和执行

    • 这两个好处都源于在写入路径上记录了关于写入内容的详细信息,而不是在读取路径上获取这些信息。
    • 因为文件列表是在对表进行更改时写入的,所以在运行时不需要进行昂贵的文件系统列表操作,这意味着运行时的工作量和等待时间大大减少。
    • 因为关于文件中数据的统计信息是在写入方面进行的,所以统计信息不会缺失、错误或过时,这意味着基于成本的优化器可以在决定哪个查询计划提供最快响应时间时做出更好的决策。
    • 因为关于文件中数据的统计信息是在文件级别进行跟踪的,所以统计信息不是粗粒度的,这意味着引擎可以进行更多的数据修剪,处理更少的数据,从而实现更快的响应时间。
    • 在本文早期提到的Ryan Blue的演示中,他分享了Netflix的一个示例用例的结果:
      • 对于一个Hive表的查询,仅规划查询就需要9.6分钟。
      • 对于相同的Iceberg表的查询,仅需要42秒来规划和执行查询。
  • 抽象物理层,暴露逻辑视图

    • 在本文早些时候,我们看到使用Hive表时,用户通常需要了解表的潜在不直观的物理布局,以实现良好的性能。
    • Iceberg提供了持续向用户提供逻辑视图的能力,将逻辑交互点与数据的物理布局解耦。我们看到了隐藏分区和压缩等功能是如何极其有用的。
    • Iceberg通过模式演化、分区演化和排序顺序演化的能力,提供了透明地随时间演化表的能力。关于这些能力的更多详细信息可以在Iceberg文档网站上找到。
    • 对于数据工程师来说,更容易在幕后尝试不同的、潜在更好的表布局。一旦提交,更改将立即生效,而无需用户更改其应用程序代码或查询。如果实验结果变得更糟,可以回滚事务,用户将返回到之前的体验。使实验更安全可以进行更多的实验,从而找到更好的做事方式。
  • 所有引擎立即看到更改
    因为组成表内容的文件是在写入方面定义的,一旦文件列表发生变化,所有新的读取器都会指向这个新的列表(通过从目录开始的读取流程),所以当写入者对表进行更改时,所有使用该表的新查询立即看到新的数据。

  • 事件监听器
    Iceberg具有一个框架,允许其他服务在Iceberg表上发生事件时收到通知。目前,这个功能处于早期阶段,只能在扫描表时触发事件。然而,这个框架为未来的功能提供了可能性,比如保持缓存、物化视图和索引与原始数据同步。

  • 高效地进行较小的更新
    因为数据是在文件级别进行跟踪的,所以可以更高效地对数据集进行较小的更新。

References