微信号:ai-front

介绍:面向AI爱好者、开发者和科学家,提供最新最全AI领域技术资讯、一线业界实践案例、搜罗整理业界技术分享干货、最新AI论文解读。每周一节技术分享公开课,助力你全面拥抱人工智能技术。

Flink关系型API解读:Table API 与SQL

2018-08-14 17:58 王剑

  

作者 | 王剑
编辑 | Vincent
AI 前线导读:

本篇文章主要介绍 Flink的关系型 API,整个文章主要分为下面几个部分来介绍:

一、    什么是 Flink 关系型 API 

二、    Flink 关系型 API 的各版本演进

三、    Flink 关系型 API 执行原理

四、    Flink 关系型 API 目前适用场景

五、    Table&SQL API 介绍

六、    Table&SQL API 举例

七、    动态表

八、    总结

更多优质内容请关注微信公众号“AI 前线”(ID:ai-front)
一、什么是 Flink 关系型 API

当我们在使用 Flink 做流式和批式任务计算的时候,往往会想到几个问题:  

  1. 需要熟悉两套 API : DataStream/DataSetAPI,API 有一定难度,开发人员无法 focus on business

  2. 需要有 Java 或 Scala 的开发经验

  3. Flink 同时支持批任务与流任务,如何做到 API 层的统一

Flink 已经拥有了强大的 DataStream/DataSetAPI,满足流计算和批计算中的各种场景需求,但是关于以上几个问题,我们还需要提供一种关系型的 API 来实现 Flink API 层的流与批的统一,那么这就是 Flink 的 Table & SQL API。

首先 Table & SQL API 是一种关系型 API,用户可以像操作 mysql 数据库表一样的操作数据,而不需要写 java 代码完成 Flink Function,更不需要手工的优化 java 代码调优。另外,SQL 作为一个非程序员可操作的语言,学习成本很低,如果一个系统提供 SQL 支持,将很容易被用户接受。

总结来说,关系型 API 的好处:  

  1. 关系型 API 是声明式的

  2. 查询能够被有效的优化

  3. 查询可以高效的执行

  4. “Everybody” knows SQL  

Table&SQL API 是流处理和批处理统一的 API 层,如下图。Flink 在 runtime 层是统一的,因为 Flink 将批任务看做流的一种特例来执行,然而在 API 层,Flink 为批和流提供了两套 API(DataSet 和 DataStream)。所以 Table&SQL API 就统一了 Flink 的 API 层,批数据上的查询会随着输入数据的结束而结束并生成 DataSet,流数据的查询会一直运行并生成结果流。Table&SQL API 做到了批与流上的查询具有同样的语法语义,因此不用改代码就能同时在批和流上执行。

关于 DataSet API 和 DataStream API 对应的 Table 如下图:

二、Flink 关系型 API 的各版本演进

关于 Table&SQL API,Flink 在 0.9 版本的时候,引进了 Table API,支持 Java 和 Scala 两种语言,是一个类似于 LINQ 模式的 API。用于对关系型数据进行处理。这系列 Table API 的操作对象就是能够进行简单的关系型操作的结构化数据流。结构如下图。然而 0.9 版本的 Table&SQL API 有着很大的局限性,0.9 版本 Table API 不能单独使用,必须嵌入到 DataSet 或者 DataStream 的程序中,对于批处理表的查询并不支持 outer join、order by 等操作。在流处理 Table 中只支持 filters、union,不支持 aggregations 以及 joins。并且,这个转化处理过程没有查询优化。整体来说 0.9 版本的 Flink Table API 还不是十分好用。

在后续的版本中,1.1.0 引入了 SQL,因此在 1.1.0 版本以后,Flink 提供了两个语义的关系型 API:语言内嵌的 Table API(用于 Java 和 Scala)以及标准 SQL。这两种 API 被设计用于在流和批的任务中处理数据 在 API 层的统一,这意味着无论输入是批处理数据还是流数据,查询产生完全相同的结果。

在 1.20 版本之后逐渐增加 SQL 的功能,并对 Table API 做了大量的 Enhancement 了。在 1.2.0 版本中,Flink 的关系 API 在数据流中,支持关系操作包括投影、过滤和窗口聚合。

在 1.30 版本中开始支持各种流上 SQL 操作,例如 SELECT, FROM, WHERE,UNION、aggregation 和 UDF 能力。在 2017 年 3 月 2 日进行的 Flink Meetup 与 2017 年 5 月 24 日 Strata 会议,Flink 都有相应的 topic 讨论,未来在 Flink SQL 方面会支持更细粒度的 join 操作和对 dynamic table 的支持。

三、Flink 关系型 API 执行原理

Flink 使用基于 Apache Calcite 这个 SQL 解析器做 SQL 语义解析。利用 Calcite 的查询优化框架与 SQL 解释器来进行 SQL 的解析、查询优化、逻辑树生成,得到 Calcite 的 RelRoot 类的一颗逻辑执行计划树,并最终生成 Flink 的 Table。Table 里的执行计划会转化成 DataSet 或 DataStream 的计算,经历物理执行计划优化等步骤。但是,Table API 和 SQL 最终还是基于 Flink 的已有的 DataStream API 和 DataSet API,任何对于 DataStream API 和 DataSet API 的性能调优提升都能够自动地提升 Table API 或者 SQL 查询的效率。这两种 API 的查询都会用包含注册过的 Table 的 catalog 进行验证,然后转换成统一 Calcite 的 logical plan。再利用 Calcite 的优化器优化转换规则和 logical plan。根据数据源的性质 (流和批) 使用不同的规则进行优化。最终优化后的 plan 转传成常规的 Flink DataSet 或 DataStream 程序。结构如下图:

3.1 Translation to Logical Plan

每次调用 Table&SQL API,就会生成 Flink 逻辑计划的节点。比如对 groupBy 和 select 的调用会生成节点 Project、Aggregate、Project,而 filter 的调用会生成节点 Filter。这些节点的逻辑关系,就会组成下图的一个 Flink 自身数据结构表达的一颗逻辑树 ; 根据这个已经生成的 Flink 的 logical Plan,将它转换成 calcite 的 logical Plan,这样我们才能用到 calcite 强大的优化规则。Flink 由上往下依次调用各个节点的 construct 方法,将 Flink 节点转换成 calcite 的 RelNode 节点。

3.2 Translation to DataStream Plan

优化逻辑计划并转换成 Flink 的物理计划,Flink 的这部分实现统一封装在 optimize 方法里头。这部分涉及到多个阶段,每个阶段都是用 Rule 对逻辑计划进行优化和改进。声明定义于派生 RelOptRule 的一个类,然后再构造函数中要求传入 RelOptRuleOperand 对象,该对象需要传入这个 Rule 将要匹配的节点类型。如果这个自定义的 Rule 只用于 LogicalTableScan 节点,那么这个 operand 对象应该是 operand(LogicalTableScan.class, any())。通过以上代码对逻辑计划进行了优化和转换,最后会将逻辑计划的每个节点转换成 Flink Node,既可物理计划。

3.3     Translation to Flink Program

四、Flink 关系型 API 目前适用场景
4.1 目前支持范围

Batch SQL & Table API 支持:

  • Selection, Projection, Sort, Inner & Outer Joins, Set operations

  • Windows for Slide, Tumble, Session

Streaming Table API 支持:

  • Selection, Projection, Union

  • Windows for Slide, Tumble, Session

Streaming SQL:

  • Selection, Projection, Union, Tumble

4.2 目前使用场景

五、Table&SQL API 介绍

Table API 一般与 DataSet 或者 DataStream 紧密关联,可以通过一个 DataSet 或 DataStream 创建出一个 Table,再用类似于 filter, join, 或者 select 关系型转化操作来转化为一个新的 Table 对象。最后将一个 Table 对象转回一个 DataSet 或 DataStream。从内部实现上来说,所有应用于 Table 的转化操作都变成一棵逻辑表操作树,在 Table 对象被转化回 DataSet 或者 DataStream 之后,转化器会将逻辑表操作树转化为对等的 DataSet 或者 DataStream 操作符。

5.1 Table&SQL API 的简单介绍
1.   Create a TableEnvironment

TableEnvironment 对象是 Table API 和 SQL 集成的一个核心,支持以下场景:

  • 注册一个 Table

  • 注册一个外部的 catalog

  • 执行 SQL 查询

  • 注册一个用户自定义的 function

  • 将 DataStream 或 DataSet 转成 Table

一个查询中只能绑定一个指定的 TableEnvironment,TableEnvironment 可以通过来配置 TableConfig 来配置,通过 TableConfig 可以自定义查询优化以及 translation 的进程。

TableEnvironment 执行过程如下:

(1)TableEnvironment.sql() 为调用入口

(2)Flink 实现了个 FlinkPlannerImpl,执行 parse(sql),validate(sqlNode),rel(sqlNode) 操作

(3)生成 Table

其中,LogicalRelNode 是 Flink 执行计算树里的叶子节点。

源码如下图:

 2.    Register a Table

(1)将一个 Table 注册给 TableEnvironment

(2)将一个 TableSource 注册给 TableEnvironment, 这里的 TableSource 指的是将数据存储系统的作为 Table,例如 mysql,hbase,CSV,Kakfa,RabbitMQ 等等。

(3)将一个外部的 Catalog 注册给 TableEnvironment,访问外部系统的数据或文件。

(4)将 DataStream 或 DataSet 注册为 Table

 3.    Query a Table

(1)    Table API

Table API 是一个 Scala 和 Java 的集成查询序言。与 SQL 不同的是,Table API 的查询不是一个指定的 sql 字符串,而是调用指定的 API 方法。Table API 中的每一个方法输入都是一个 Table,输出也是一个新的 Table。

通过 table API 来提交任务的话,也会经过 calcite 优化等阶段,基本流程和直接运行 sql 类似:

  1. table API parser: Flink 会把 table API 表达的计算逻辑也表示成逻辑树,用 treeNode 表示 ;

  2. 在这棵树上的每个节点的计算逻辑用 Expression 来表示。

  3. Validate: 会结合 catalog 将树的每个节点的 Unresolved  Expression 进行绑定,生成 Resolved Expression;

  4. 生成 Logical Plan: 依次遍历数的每个节点,调用 construct 方法将原先用 treeNode 表达的节点转成成用 calcite 内部的数据结构 relNode 来表达。即生成了 LogicalPlan, 用 relNode 表示 ;

  5. 生成 optimized Logical Plan: 先基于 calcite rules 去优化 logical Plan,

  6. 再基于 Flink 定制的一些优化 rules 去优化 logical Plan;

  7. 生成 Flink Physical Plan: 这里也是基于 Flink 里头的 rules 将,将 optimized LogicalPlan 转成成 Flink 的物理执行计划;

  8. 将物理执行计划转成 Flink Execution Plan: 就是调用相应的 tanslateToPlan 方法转换。

(2)    SQL

Flink SQL 是基于 Apache Calcite 的实现的,Calcite 实现了 SQL 标准解析。SQL 查询是一个完整的 sql 字符串来查询。一条 stream sql 从提交到 calcite 解析、优化最后到 Flink 引擎执行,一般分为以下几个阶段:

  1. Sql Parser: 将 sql 语句解析成一个逻辑树, 在 calcite 中用 SqlNode 表示逻辑树 ;

  2. Sql Validator: 结合 catalog 去验证 sql 语法;

  3. 生成 Logical Plan: 将 sqlNode 表示的逻辑树转换成 Logical Plan, 用 relNode 表示 ;

  4. 生成 optimized Logical Plan: 先基于 calcite rules 去优化 logical Plan,

  5. 再基于 Flink 定制的一些优化 rules 去优化 logical Plan;

  6. 生成 Flink Physical Plan: 这里也是基于 Flink 里头的 rules 将,将 optimized Logical Plan 转成成 Flink 的物理执行计划;

  7. 将物理执行计划转成 Flink Execution Plan: 就是调用相应的 tanslateToPlan 方法转换。

(3)Table&SQL API 混合使用

Table API 和 SQL 查询可以很容易的混合使用,因为它们的返回结果都是 Table 对象。一个基于 Table API 的查询可以基于一个 SQL 查询的结果。同样地,一个 SQL 查询可以被定义一个 Table API 注册 TableEnvironment 作为 Table 的查询结果。

 4.    输出 Table

为了将 Table 进行输出,我们可以使用 TableSink。TableSink 是一个通用的接口,支持各种各样的文件格式 (e.g. CSV, Apache Parquet, Apache Avro),也支持各种各样的外部系统 (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch),同样支持各种各样的 MQ(e.g., Apache Kafka, RabbitMQ)。

批数据的导出 Table 使用 BatchTableSink, 流数据的导出 Table 使用的是 AppendStreamTableSink,RetractStreamTableSink 和 UpsertStreamTableSink.

 5.    解析 Query 并执行

Table&SQL API 查询被解析成 DataStream 或 DataSet 程序。一次查询就是一个 logical query plan,解析这个 logical query plan 分为两步:  

  1. 优化 logical plan,

  2. 将 logical plan 转为 DataStream 或 DataSet

一旦 Table&SQL API 解析完毕, Table& SQL API 的查询就会被当做普通 DataStream 或 DataSet 被执行。

5.2 Table 转为 DataStream 或 DataSet


5.3 Convert a Table into a DataSet


5.4 Table&SQL API 与 Window

Window 种类

  • tumbling window (GROUP BY)

  • sliding window (window functions)

(1)    Tumbling Window

(2)    Sliding Window

5.5 Table&SQL API 与 Stream Join


六、Table&SQL API 举例

首先需要引入 Flink 关系型 api 和 scala 的相关 jar 包:

6.1 批数据的相关代码

6.2 流数据的相关代码

七、动态表

Flink1.3 以后,在 Flink sql 上支持动态表查询,也就是说动态表是持续更新,并且能够像常规的静态表一样查询的表。但是,与批处理表查询终止后返回一个静态表作为结果不同的是,动态表中的查询会持续运行,并根据输入表的修改产生一个持续更新的表。因此,结果表也是动态的。在动态表中运行查询并产生一个新的动态表,这是因为流和动态表是可以相互转换的。

流被转换为动态表,动态表使用一个持续查询进行查询,产生一个新的动态表。最后,结果表被转换成流。上面所说的只是逻辑模型,并不意味着实际执行的查询查询也是这个步骤。实际上,持续查询在内部被转换成传统的 DataStream 程序去执行。

动态表查询步骤如下:

  1. 在流中定义动态表

  2. 查询动态表

  3. 生成动态表

7.1 在流中定义动态表

动态表上的 SQL 查询的第一步是在流中定义一个动态表。这意味着我们必须指定流中的记录如何修改现有的动态表。流携带的记录必须具有映射到表的关系模式。在流中定义动态表有两种模式:append 模式和 update 模式。

在 append 模式中,流中的每条记录是对动态表的插入修改。因此,流中的所有记录都 append 到动态表中,使得它的大小不断增长并且无限大。下图说明了 append 模式。append 模式如下图。

在 update 模式中,流中的记录可以作为动态表的插入、更新或者删除修改(append 模式实际上是一种特殊的 update 模式)。当在流中通过 update 模式定义一个动态表时,我们可以在表中指定一个唯一的键属性。在这种情况下,更新和删除操作会带着键属性一起执行。更新模式如下图所示。

7.2 查询动态表

一旦我们定义了动态表,我们可以在上面执行查询。由于动态表随着时间进行改变,我们必须定义查询动态表的意义。假定我们有一个特定时间的动态表的 snapshot,这个 snapshot 可以作为一个标准的静态批处理表。我们将动态表 A 在点 t 的 snapshot 表示为 A[t],可以使用 SQL 查询来查询 snapshot,该查询产生了一个标准的静态表作为结果,我们把在时间 t 对动态表 A 做的查询 q 的结果表示为 q(A[t])。如果我们反复在动态表的 snapshot 上计算查询结果,以获取进度时间点,我们将获得许多静态结果表,它们随着时间的推移而改变,并且有效的构成了一个动态表。我们在动态表的查询中定义如下语义。

查询 q 在动态表 A 上产生了一个动态表 R,它在每个时间点 t 等价于在 A[t] 上执行 q 的结果,即 R[t]=q(A[t])。该定义意味着在批处理表和流表上执行相同的查询 q 会产生相同的结果。

在下面的例子中,我们给出了两个例子来说明动态表查询的语义。在下图中,我们看到左侧的动态输入表 A,定义成 append 模式。在时间 t=8 时,A 由 6 行(标记成蓝色)组成。在时间 t=9 和 t=12 时,有一行追加到 A(分别用绿色和橙色标记)。我们在表 A 上运行一个如图中间所示的简单查询,这个查询根据属性 k 分组,并统计每组的记录数。在右侧我们看到了 t=8(蓝色),t=9(绿色)和 t=12(橙色)时查询 q 的结果。在每个时间点 t,结果表等价于在时间 t 时再动态表 A 上执行批查询。

这个例子中的查询是一个简单的分组(但是没有窗口)聚合查询。因此,结果表的大小依赖于输入表的分组键的数量。此外,这个查询会持续更新之前产生的结果行,而不只是添加新行。

第二个例子展示了一个类似的查询,但是有一个很重要的差异。除了对属性 k 分组以外,查询还将记录每 5 秒钟分组为一个滚动窗口,这意味着它每 5 秒钟计算一次 k 的总数。 我们使用 Calcite 的分组窗口函数来指定这个查询。在图的左侧,我们看到输入表 A ,以及它在 append 模式下随着时间而改变。在右侧,我们看到结果表,以及它随着时间演变。

与第一个例子的结果不同的是,这个结果表随着时间而增长,例如每 5 秒钟计算出新的结果行。虽然非窗口查询更新结果表的行,但是窗口聚合查询只追加新行到结果表中。无论输入表什么时候更新,都不可能计算查询的完整结果。相反,查询编译成流应用,根据输入的变化持续更新它的结果。这意味着不是所有的有效 SQL 都支持,只有那些持续性的、递增的和高效计算的被支持。

7.3 生成动态表

查询动态表生成的动态表,其相当于查询结果。根据查询和它的输入表,结果表会通过插入、更新和删除持续更改,就像普通的 mysql 数据表一样。它可能是一个不断被更新的单行表,或是一个只插入不更新的表。

传统的 mysql 数据库在故障和复制的时候,通过日志重建表。比如 UNDO、REDO 和 UNDO/REDO 日志。UNDO 日志记录被修改元素之前的值来回滚不完整的事务,REDO 日志记录元素修改的新值来重做已完成事务丢失的改变,UNDO/REDO 日志同时记录了被修改元素的旧值和新值来撤销未完成的事务,并重做已完成事务丢失的改变。基于这些日志,动态表可以转换成两类更改日志流:REDO 流和 REDO+UNDO 流。

通过将表中的修改转换为流消息,动态表被转换为 redo+undo 流。插入修改生成一条新行的插入消息,删除修改生成一条旧行的删除消息,更新修改生成一条旧行的删除消息以及一条新行的插入消息。行为如下图所示。

左侧显示了一个维护在 append 模式下的动态表,作为中间查询的输入。查询的结果转换为显示在底部的 redo+undo 流。输入表的第一条记录 (1,A) 作为结果表的一条新纪录,因此插入了一条消息 +(A,1) 到流中。第二条输入记录 k=‘A’(4,A) 导致了结果表中 (A,1) 记录的更新,从而产生了一条删除消息 -(A,1) 和一条插入消息 +(A,2)。所有的下游操作或数据汇总都需要能够正确处理这两种类型的消息。

在两种情况下,动态表会转换成 redo 流:要么它只是一个 append 表(即只有插入修改),要么它有一个唯一的键属性。动态表上的每一个插入修改会产生一条新行的插入消息到 redo 流。由于 redo 流的限制,只有带有唯一键的表能够进行更新和删除修改。如果一个键从动态表中删除,要么是因为行被删除,要么是因为行的键属性值被修改了,所以一条带有被移除键的删除消息发送到 redo 流。更新修改生成带有更新的更新消息,比如新行。由于删除和更新修改根据唯一键来定义,下游操作需要能够根据键来访问之前的值。下图描述如何将上述相同查询的结果表转换为 redo 流。

插入到动态表的 (1,A) 产生了 +(A,1) 插入消息。产生更新的 (4,A) 生成了 *(A,2) 的更新消息。Redo 流的通常做法是将查询结果写到仅 append 的存储系统,比如滚动文件或者 Kafka topic ,或者是基于 key 访问的数据存储,比如 Cassandra、关系型 mysql。

切换到动态表发生的改变

在 1.2 版本中,Flink 关系 API 的所有流操作,例如过滤和分组窗口聚合,只会产生新行,并且不能更新先前发布的结果。 相比之下,动态表能够处理更新和删除修改。

1.2 版本中的处理模型是动态表模型的一个子集, 通过附加模式将流转换为动态表,即一个无限增长的表。 由于所有操作仅接受插入更改并在其结果表上生成插入更改(即,产生新行),因此所有在动态 append 表上已经支持的查询,将使用重做模型转换回 DataStreams,仅用于 append 表。最后,值得注意的是在开发代码中,我们无论是使用 Table API 还是 SQL,优化和转换程序并不知道查询是通过 Table API 还是 SQL 来定义的。由于 Table API 和 SQL 在语义方面等同,只是在样式上有些区别而已。

八、总结

本篇文章整理了 Flink 关系型 API 的相关知识,整体上来说,Flink 关系型 API 还是很好用的,其原理与实现结构清晰,存在很多可借鉴的地方。

Reference:  

  1. http://flink.apache.org/

  1. Flink 社区

  2. 2017 Strata Meeting

  3. 2017 Flink Forward Meeting

 作者简介

王剑,东北大学硕士,资深大数据工程师、资深 java 工程师,多年 JAVA 架构设计研发,大数据统计分析平台设计与研发经验。曾就职于美国硅谷公司 Zuora 和国内大数据领域独角兽 TalkingData,现就职于新加坡互联网公司 sea,擅长高并发、高可用、分布式系统设计的后端服务、支付领域业务开发以及大数据流式处理有较深入研究。


如果你喜欢这篇文章,或希望看到更多类似优质报道,记得给我留言和点赞哦!

 
AI前线 更多文章 谷歌曝重大隐私问题,安卓苹果无一幸免! 百度谷歌都作恶,但到底哪家技术更厉害? 我数学不好、不爱刷题,如何入门机器学习? 开源Featuretools:机器学习开发提速10倍! 集Python、C++、R为一体!Julia 1.0重磅发布
猜您喜欢 Spark专辑:从Scala开始(三),函数式编程 socket.inet_aton在Python2和3中引发的血案 苹果发布会,你想知道的都在这里 BoCloud博云云计算产品荣获IT产品信息安全认证 字符串hash函数的本质