欢迎来到天剑厚德,这里有最新的大数据技术原创分享。

16 Flink SQL开发

flink healthsun 0评论

16.1. 什么是flink关系型API

当我们在使用flink做流式和批式任务计算的时候,往往会想到几个问题:

  1. 需要熟悉两套API : DataStream/DataSet API,API有一定难度,开发人员无法集中精力到具体业务的开发
  2. 需要有Java或Scala的开发经验
  3. flink同时支持批任务与流任务,如何做到API层的统一

flink已经拥有了强大的DataStream/DataSetAPI,满足流计算和批计算中的各种场景需求,但是关于以上几个问题,我们还需要提供一种关系型的API来实现flink API层的流与批的统一,那么这就是flink的Table API & SQL

首先Table API& SQL 是一种关系型API,用户可以像操作MySQL数据库表一样的操作数据,而不需要写Java代码完成flink function,更不需要手工的优化Java代码调优。另外,SQL作为一个非程序员可操作的语言,学习成本很低,如果一个系统提供SQL支持,将很容易被用户接受。

总结来说,关系型API的好处:

  1. 关系型API是声明式的
  2. 查询能够被有效的优化
  3. 查询可以高效的执行
  4. “Everybody” knows SQL

Table API& SQL 是流处理和批处理统一的API层,如下图。

  • flink在runtime层是统一的,因为flink将批任务看做流的一种特例来执行
  • 在API层,flink为批和流提供了两套API(DataSet和DataStream)
  • Table API & SQL就统一了flink的API层,批数据上的查询会随着输入数据的结束而结束并生成DataSet,流数据的查询会一直运行并生成结果流。
  • Table API & SQL做到了批与流上的查询具有同样的语法语义,因此不用改代码就能同时在批和流上执行。

Table API的特点 Table API和SQL都是Apache Flink中高等级的分析API,SQL所具备的特点Table API也都具有,如下:

  • 声明式 – 用户只关心做什么,不用关心怎么做;
  • 高性能 – 支持查询优化,可以获取最好的执行性能;
  • 流批统一 – 相同的统计逻辑,既可以流模式运行,也可以批模式运行;
  • 标准稳定 – 语义遵循SQL标准,语法语义明确,不易变动。

当然除了SQL的特性,因为Table API是在Flink中专门设计的,所以Table API还具有自身的特点:

  • 表达方式的扩展性 – 在Flink中可以为Table API开发很多便捷性功能,如:Row.flatten(), map/flatMap 等
  • 功能的扩展性 – 在Flink中可以为Table API扩展更多的功能,如:Iteration,flatAggregate 等新功能
  • 编译检查 – Table API支持java和scala语言开发,支持IDE中进行编译检查。

Table API和SQL捆绑在flink-table Maven工件中。必须将以下依赖项添加到你的项目才能使用Table API和SQL:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.8.1</version>
</dependency>

另外,你需要为Flink的Scala批处理或流式API添加依赖项。对于批量查询,您需要添加:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.8.1</version>
</dependency>

16.2. Table API和SQL程序的结构

Table API一般与DataSet或者DataStream紧密关联,可以通过一个DataSet或DataStream创建出一个Table,再用类似于filter, join, 或者 select关系型转化操作来转化为一个新的Table对象。最后将一个Table对象转回一个DataSet或DataStream。从内部实现上来说,所有应用于Table的转化操作都变成一棵逻辑表操作树,在Table对象被转化回DataSet或者DataStream之后,转化器会将逻辑表操作树转化为对等的DataSet或者DataStream操作符。

Flink的批处理和流处理的Table API和SQL程序遵循相同的模式;所以我们只需要使用一种来演示即可要想执行flink的SQL语句,首先需要获取SQL的执行环境:两种方式(batch和streaming):

批处理:

val bEnv = ExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for batch queries
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)

流处理:

val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for streaming queries
val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)

通过getTableEnvironment可以获取TableEnviromment;这个TableEnviromment是Table API和SQL集成的核心概念。它负责:

  • 在内部目录中注册一个表
  • 注册外部目录
  • 执行SQL查询
  • 注册用户定义的(标量,表格或聚合)函数
  • 转换DataStream或DataSet成Table
  • 持有一个ExecutionEnvironment或一个参考StreamExecutionEnvironment

16.3. 在内部目录中注册一个表

TableEnvironment维护一个按名称注册的表的目录。有两种类型的表格,输入表格和输出表格。输入表可以在Table API和SQL查询中引用并提供输入数据。输出表可用于将表API或SQL查询的结果发送到外部系统输入表可以从各种来源注册:

  • 现有Table对象,通常是表API或SQL查询的结果。
  • TableSource,它访问外部数据,例如文件,数据库或消息传递系统。
  • DataStream或DataSet来自DataStream或DataSet程序。

输出表可以使用注册TableSink。

16.3.1. 注册一个表

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// register the Table projTable as table "projectedX"
tableEnv.registerTable("projectedTable", projTable)
// Table is the result of a simple projection query 
val projTable: Table = tableEnv.scan("projectedTable ").select(...)

16.3.2. 注册一个TableSource

TableSource提供对存储在诸如数据库(MySQL,HBase等),具有特定编码(CSV,Apache [Parquet,Avro,ORC],…)的文件的存储系统中的外部数据的访问或者消息传送系统(Apache Kafka,RabbitMQ,…)

// get a TableEnvironment 
val tableEnv = TableEnvironment.getTableEnvironment(env) 
// create a TableSource
val csvSource: TableSource = CsvTableSource.builder().path("./data/score.csv")...
// register the TableSource as table "CsvTable" 
tableEnv.registerTableSource("CsvTable", csvSource)

16.3.3. 注册一个TableSink

注册TableSink可用于将表API或SQL查询的结果发送到外部存储系统,如数据库,键值存储,消息队列或文件系统(使用不同的编码,例如CSV,Apache [Parquet ,Avro,ORC],…)

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// create a TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
// define the field names and types
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)

16.4. Table和DataStream和DataSet的集成

16.4.1. 将DataStream或DataSet转换为Table

在上面的例子讲解中,直接使用的是:registerTableSource注册表

对于flink来说,还有更灵活的方式:比如直接注册DataStream或者DataSet转换为一张表。

然后DataStream或者DataSet就相当于表,这样可以继续使用SQL来操作流或者批次的数据

语法:

// get TableEnvironment 
// registration of a DataSet is equivalent
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = ...

// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream)

示例

处理方式,加载下列数据,并注册为表,查询所有数据,写入到CSV文件中。

开发步骤

  1. 获取流处理环境
  2. 获取TableEnvironment
  3. 加载本地集合
  4. 根据数据注册表
  5. 执行SQL
  6. 写入CSV文件中
  7. 执行任务

代码

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.table.sinks.CsvTableSink

object DataSet_DataStreamToTable {
  def main(args: Array[String]): Unit = {

    // 1. 获取流处理环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. 获取TableEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    // 3. 加载本地集合
    val orderA: DataStream[Order1] = env.fromCollection(Seq(
      Order1(1L, "beer", 3),
      Order1(1L, "diaper", 4),
      Order1(3L, "rubber", 2)))

    // 4. 根据数据注册表
    tableEnv.registerDataStream("OrderA", orderA)
    // 5. 执行SQL
    val result = tableEnv.sqlQuery("SELECT * FROM OrderA")
    // 6. 写入CSV文件中
    result.writeToSink(new CsvTableSink("./data/ccc", ",", 1, FileSystem.WriteMode.OVERWRITE))
    // 7. 执行任务
    env.execute()
  }
}

// 定义样例类
case class Order1(user: Long, product: String, amount: Int)

16.4.2. 将Table转换为DataStream或DataSet

Table可以转换为DataStream或者DataSet,这样的话,自定义的DataStream或者DataSet程序就可以基于Table API或者SQL查询的结果来执行了。

当将一个Table转换为DataStream或者DataSet时,你需要指定生成的DataStream或者DataSet的数据类型,即需要转换表的行的数据类型,通常最方便的转换类型是Row,下面列表概述了不同选项的功能:

  • Row:字段通过位置映射、可以是任意数量字段,支持空值,非类型安全访问   
  • POJO:字段通过名称(POJO字段作为Table字段时,必须命名)映射,可以是任意数量字段,支持空值,类型安全访问   
  • Case Class:字段通过位置映射,不支持空值,类型安全访问   
  • Tuple:字段通过位置映射,不得多于22(Scala)或者25(Java)个字段,不支持空值,类型安全访问   
  • Atomic Type:Table必须有一个字段,不支持空值,类型安全访问。

将Table转换为DataStream

流式查询的结果Table会被动态地更新,即每个新的记录到达输入流时结果就会发生变化。因此,转换此动态查询的DataStream需要对表的更新进行编码。

有两种模式可以将 Table转换为DataStream:

1:Append Mode:这种模式只适用于当动态表仅由INSERT更改修改时,即仅附加,之前发送的结果不会被更新。

2:Retract Mode:始终都可以使用此模式,它使用一个boolean标识来编码INSERT和DELETE更改。

语法格式:

// get TableEnvironment. 
// registration of a DataSet is equivalent
// ge val tableEnv = TableEnvironment.getTableEnvironment(env)

// Table with two fields (String name, Integer age)
val table: Table = ...

// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)

// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] = tableEnv.toAppendStream[(String, Int)](table)

// convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream[(Boolean, X)]. 
//   The boolean field indicates the type of the change. 
//   True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

示例

使用Flink流式环境, 加载下列集合数据, 转换为Table, 并将Table转换为DataStream

List(
      (1L, 1, "Hello"),
      (2L, 2, "Hello"),
      (6L, 6, "Hello"),
      (7L, 7, "Hello World"),
      (8L, 8, "Hello World"),
      (20L, 20, "Hello World"))
    )

开发步骤

  1. 获取流处理环境
  2. 设置并行度
  3. 获取Table运行环境
  4. 加载本地集合
  5. 转换DataStream为Table
  6. 将table转换为DataStream—-将一个表附加到流上Append Mode
  7. 将table转换为DataStream—-Retract Mode true代表添加消息,false代表撤销消息
  8. 打印输出
  9. 执行任务

代码

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.table.sinks.CsvTableSink


object TableTODataStream {
  def main(args: Array[String]): Unit = {
    // 1. 获取流处理环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. 设置并行度
    env.setParallelism(1)
    // 3. 获取Table运行环境
    val tEnv = TableEnvironment.getTableEnvironment(env)
    // 4. 加载本地集合
    val stream = env.fromCollection(
      List(
      (1L, 1, "Hello"),
      (2L, 2, "Hello"),
      (6L, 6, "Hello"),
      (7L, 7, "Hello World"),
      (8L, 8, "Hello World"),
      (20L, 20, "Hello World"))
    )
    // 5. 转换DataStream为Table
    val table: Table = tEnv.fromDataStream(stream)
    // 6. 将table转换为DataStream----将一个表附加到流上Append Mode
    val appendStream: DataStream[(Long, Int, String)] = tEnv.toAppendStream[(Long, Int, String)](table)
    // 7. 将table转换为DataStream----Retract Mode true代表添加消息,false代表撤销消息
    val retractStream: DataStream[(Boolean, (Long, Int, String))] = tEnv.toRetractStream[(Long, Int, String)](table)
    // 8. 打印输出
    appendStream.print()
    retractStream.print()
    // 9. 执行任务
    env.execute()
  }
}

将Table转换为DataSet

语法格式:

// get TableEnvironment 
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into a DataSet of Row
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)
// convert the Table into a DataSet of Tuple2[String, Int]
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

示例

使用Flink批处理环境, 加载下列集合数据, 转换为Table, 并将Table转换为DataSet

List(
      (1L, 1, "Hello"),
      (2L, 2, "Hello"),
      (6L, 6, "Hello"),
      (7L, 7, "Hello World"),
      (8L, 8, "Hello World"),
      (20L, 20, "Hello World"))
    )

开发步骤

  1. 获取批处理环境
  2. 设置并行度
  3. 获取Table运行环境
  4. 加载本地集合
  5. DataSet转换为Table
  6. table转换为dataSet
  7. 打印输出
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}

object TableToDataSet{
  def main(args: Array[String]): Unit = {
    //1. 批处理环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    //2. 设置并行度
    env.setParallelism(1)
    //3. 获取table运行环境
    val tableEnvironment = TableEnvironment.getTableEnvironment(env)
    //4. 加载本地集合
    val collection: DataSet[(Long, Int, String)] = env.fromCollection(List(
      (1L, 1, "Hello"),
      (2L, 2, "Hello"),
      (3L, 3, "Hello"),
      (7L, 7, "Hello World"),
      (8L, 8, "Hello World"),
      (20L, 20, "Hello World")))
    //5. DataSet转换为Table
    val table: Table = tableEnvironment.fromDataSet(collection)
    //6. table转换为dataSet
    val toDataSet: DataSet[(Long, Int, String)] = tableEnvironment.toDataSet[(Long, Int, String)](table)
    //7.打印数据
    toDataSet.print()
//    env.execute()
  }
}

如果你觉得这篇文章或者我分享的主题对你有帮助,请支持我继续更新网站和主题 !捐赠本站
喜欢 (0)
发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址