欢迎来到天剑厚德,这里有最新的大数据技术原创分享。

17 flink数据案例分析

flink healthsun 0评论

17.1 批处理案例1

示例

使用Flink SQL统计用户消费订单的总金额、最大金额、最小金额、订单总数。

订单id用户名订单日期消费金额
1zhangsan2018-10-20 15:30358.5

测试数据(订单ID、用户名、订单日期、订单金额)

(1,”zhangsan”,”2018-10-20 15:30″,358.5),
(2,”zhangsan”,”2018-10-20 16:30″,131.5),
(3,”lisi”,”2018-10-20 16:30″,127.5),
(4,”lisi”,”2018-10-20 16:30″,328.5),
(5,”lisi”,”2018-10-20 16:30″,432.5),
(6,”zhaoliu”,”2018-10-20 22:30″,451.0),
(7,”zhaoliu”,”2018-10-20 22:30″,362.0),
(8,”zhaoliu”,”2018-10-20 22:30″,364.0),
(9,”zhaoliu”,”2018-10-20 22:30″,341.0)

步骤

  1. 获取一个批处理运行环境
  2. 获取一个Table运行环境
  3. 创建一个样例类Order用来映射数据(订单名、用户名、订单日期、订单金额)
  4. 基于本地Order集合创建一个DataSet source
  5. 使用Table运行环境将DataSet注册为一张表
  6. 使用SQL语句来操作数据(统计用户消费订单的总金额、最大金额、最小金额、订单总数)
  7. 使用TableEnv.toDataSet将Table转换为DataSet
  8. 打印测试

参考代码

import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.types.Row

object BatchFlinkSqlDemo {

  // 创建一个样例类Order用来映射数据(订单名、用户名、订单日期、订单金额)
  case class Order(id:Int, userName:String, createTime:String, money:Double)

  def main(args: Array[String]): Unit = {
    // 获取一个批处理运行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    // 获取一个Table运行环境
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // 基于本地Order集合创建一个DataSet source
    val orderDataSet = env.fromCollection(List(
      Order(1, "zhangsan", "2018-10-20 15:30", 358.5),
      Order(2, "zhangsan", "2018-10-20 16:30", 131.5),
      Order(3, "lisi", "2018-10-20 16:30", 127.5),
      Order(4, "lisi", "2018-10-20 16:30", 328.5),
      Order(5, "lisi", "2018-10-20 16:30", 432.5),
      Order(6, "zhaoliu", "2018-10-20 22:30", 451.0),
      Order(7, "zhaoliu", "2018-10-20 22:30", 362.0),
      Order(8, "zhaoliu", "2018-10-20 22:30", 364.0),
      Order(9, "zhaoliu", "2018-10-20 22:30", 341.0)
    ))

    // 使用Table运行环境将DataSet注册为一张表
    tableEnv.registerDataSet("t_order", orderDataSet)

    // 使用SQL语句来操作数据(统计用户消费订单的总金额、最大金额、最小金额、订单总数)
    val sql =
      """
        |select
        | userName,
        | sum(money) as totalMoney,
        | max(money) as maxMoney,
        | min(money) as minMoney,
        | count(1) as totalCount
        |from
        | t_order
        |group by
        | userName
      """.stripMargin

    val table: Table = tableEnv.sqlQuery(sql)
    // 打印输出的这个表格的定义(哪些列、列的类型)
    table.printSchema()

    // 使用TableEnv.toDataSet将Table转换为DataSet
    val resultDataSet: DataSet[Row] = tableEnv.toDataSet[Row](table)

    // 打印测试
    resultDataSet.print()
  }
}

17.2 批处理案例2

示例

读取CSV文件,以table api的方式查询name张三的数据

1,张三,1,98
2,张三,2,77.5
3,张三,3,89
4,张三,4,65
5,张三,5,78
6,张三,6,70
7,李四,1,78
8,李四,2,58
9,李四,3,65
10,李四,4,78

步骤

  1. 获取批处理运行环境
  2. 获取Table运行环境
  3. 加载外部CSV文件
  4. 将外部数据构建成表
  5. 使用table方式查询数据
  6. 打印表结构
  7. 将数据落地到新的CSV文件中
  8. 执行任务

代码

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.table.api.{Table, TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource

object BatchTableDemo {

  def main(args: Array[String]): Unit = {
    //创建batch执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    //创建table环境用于batch查询
    val tableEnvironment = TableEnvironment.getTableEnvironment(env)
    //加载外部CSV数据
    val csvTableSource:CsvTableSource = CsvTableSource.builder()
      .path("./data/score.csv") //文件路径
      .field("id", Types.INT) //第一列数据
      .field("name", Types.STRING) //第二列数据
      .field("subjectId", Types.INT) //第三列数据
      .field("score", Types.DOUBLE) //第三列数据
      .fieldDelimiter(",") //列分隔符,默认是","
      .lineDelimiter("\n") //换行符
      .ignoreFirstLine() //忽略第一行
      .ignoreParseErrors() //忽略解析错误
      .build()
    //将外部数据构建成表
    tableEnvironment.registerTableSource("tableA", csvTableSource)

    //TODO 1:使用table方式查询数据
    val table: Table = tableEnvironment.scan("tableA").select("id , name , subjectId,score").filter("name == '张三'")
    table.printSchema()
    //将数据写出去
    table.writeToSink(new CsvTableSink("./data/table.csv", ",", 1, FileSystem.WriteMode.OVERWRITE))
    //TODO 2:使用sql方式
    //    val sqlResult = tableEnvironment.sqlQuery("select id,name,subjectId,score from tableA where name='张三'")
    //    //将数据写出去
    //    sqlResult.writeToSink(new CsvTableSink("./data/table.csv", ",", 1, FileSystem.WriteMode.OVERWRITE))
    env.execute()
  }
}

17.3 流数据处理案例

流处理中也可以支持SQL。但是需要注意以下几点:

  1. 要使用流处理的SQL,必须要添加水印时间
  2. 使用registerDataStream注册表的时候,使用'来指定字段
  3. 注册表的时候,必须要指定一个rowtime,否则无法在SQL中使用窗口
  4. 必须要导入import org.apache.flink.table.api.scala._隐式参数
  5. SQL中使用tumble(时间列名, interval '时间' sencond)来进行定义窗口

示例

使用Flink SQL来统计5秒内用户的订单总数、订单的最大金额、订单的最小金额。

相关SQL

select
 userId,
 count(1) as totalCount,
 max(money) as maxMoney,
 min(money) as minMoney
from
 t_order
group by
 tumble(createTime, interval '5' second),
 userId

步骤

  1. 获取流处理运行环境
  2. 获取Table运行环境
  3. 设置处理时间为EventTime
  4. 创建一个订单样例类Order,包含四个字段(订单ID、用户ID、订单金额、时间戳)
  5. 创建一个自定义数据源
    • 使用for循环生成1000个订单
    • 随机生成订单ID(UUID)
    • 随机生成用户ID(0-2)
    • 随机生成订单金额(0-100)
    • 时间戳为当前系统时间
    • 每隔1秒生成一个订单
  6. 添加水印,允许延迟2秒
  7. 导入import org.apache.flink.table.api.scala._隐式参数
  8. 使用registerDataStream注册表,并分别指定字段,还要指定rowtime字段
  9. 编写SQL语句统计用户订单总数、最大金额、最小金额
    • 分组时要使用tumble(时间列, interval '窗口时间' second)来创建窗口
  10. 使用tableEnv.sqlQuery执行sql语句
  11. 将SQL的执行结果转换成DataStream再打印出来
  12. 启动流处理程序

代码

import java.util.UUID
import java.util.concurrent.TimeUnit

import org.apache.commons.lang.time.FastDateFormat
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

import scala.util.Random

object StreamFlinkSqlDemo {
  // 创建一个订单样例类Order,包含四个字段(订单ID、用户ID、订单金额、时间戳)
  case class Order(id:String, userId:Int, money:Long, createTime:Long)

  def main(args: Array[String]): Unit = {
    // 获取流处理运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 设置处理时间为EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 获取Table运行环境
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // 创建一个自定义数据源
    val orderDataStream = env.addSource(new RichSourceFunction[Order] {
      override def run(ctx: SourceFunction.SourceContext[Order]): Unit = {
        // 使用for循环生成1000个订单
        for(i <- 0 until 1000) {
          // 随机生成订单ID(UUID)
          val id = UUID.randomUUID().toString
          // 随机生成用户ID(0-2)
          val userId = Random.nextInt(3)
          // 随机生成订单金额(0-100)
          val money = Random.nextInt(101)
          // 时间戳为当前系统时间
          val timestamp = System.currentTimeMillis()

          // 收集数据
          ctx.collect(Order(id, userId, money, timestamp))
          // 每隔1秒生成一个订单
          TimeUnit.SECONDS.sleep(1)
        }
      }

      override def cancel(): Unit = ()
    })

    // 添加水印,允许延迟2秒
    val watermarkDataStream: DataStream[Order] = orderDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Order] {
      var currentTimestamp: Long = _
      // 允许延迟2秒
      val delayTime = 2000

      // 生成一个水印数据
      override def getCurrentWatermark: Watermark = {
        // 减去两秒中,表示让window窗口延迟两秒计算
        val watermark = new Watermark(currentTimestamp - delayTime)
        val formater = FastDateFormat.getInstance("HH:mm:ss")

        println(s"水印时间: ${formater.format(watermark.getTimestamp)},事件时间:${formater.format(currentTimestamp)}, 系统时间:${formater.format(System.currentTimeMillis())}")
        watermark
      }

      // 表示从Order中获取对应的时间戳
      override def extractTimestamp(element: Order, previousElementTimestamp: Long): Long = {
        // 获取到Order订单事件的时间戳
        val timestamp = element.createTime
        // 表示时间轴不会往前推,不能因为某些数据延迟了,导致整个window数据得不到计算
        currentTimestamp = Math.max(currentTimestamp, timestamp)
        currentTimestamp
      }
    })

    // 使用registerDataStream注册表,并分别指定字段,还要指定rowtime字段
    // 导入import org.apache.flink.table.api.scala._隐式参数
    tableEnv.registerDataStream("t_order", watermarkDataStream, 'id, 'userId, 'money, 'createTime.rowtime)

    // 编写SQL语句统计用户订单总数、最大金额、最小金额
    // 分组时要使用tumble(时间列, interval '窗口时间' second)来创建窗口
    val sql =
      """
        |select
        | userId,
        | count(1) as totalCount,
        | max(money) as maxMoney,
        | min(money) as minMoney
        |from
        | t_order
        |group by
        | tumble(createTime, interval '5' second),
        | userId
      """.stripMargin

    // 使用tableEnv.sqlQuery执行sql语句
    val table: Table = tableEnv.sqlQuery(sql)
    table.printSchema()

    // 将SQL的执行结果转换成DataStream再打印出来
    tableEnv.toAppendStream[Row](table).print()

    // 启动流处理程序
    env.execute("StreamSQLApp")
  }
}

在SQL语句中,不要将名字取成SQL中的关键字,例如:timestamp。

如果你觉得这篇文章或者我分享的主题对你有帮助,请支持我继续更新网站和主题 !捐赠本站
喜欢 (0)
发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址