欢迎来到天剑厚德,这里有最新的大数据技术原创分享。

Flink电商指标分析项目4

flink healthsun 0评论

1. Flink实时数据同步系统开发

1.1. binlog日志格式分析


测试日志数据

{
    "emptyCount": 2,
    "logFileName": "mysql-bin.000002",
    "dbName": "pyg",
    "logFileOffset": 250,
    "eventType": "INSERT",
    "columnValueList": [
        {
            "columnName": "commodityId",
            "columnValue": "1",
            "isValid": "true"
        },
        {
            "columnName": "commodityName",
            "columnValue": "耐克",
            "isValid": "true"
        },
        {
            "columnName": "commodityTypeId",
            "columnValue": "1",
            "isValid": "true"
        },
        {
            "columnName": "originalPrice",
            "columnValue": "888.0",
            "isValid": "true"
        },
        {
            "columnName": "activityPrice",
            "columnValue": "820.0",
            "isValid": "true"
        }
    ],
    "tableName": "commodity",
    "timestamp": 1553741346000
}

格式分析

字段以及说明

字段名称说明示例
emptyCount操作序号(第几条记录)12
logFileNamebinlog文件名mysql-bin.000001
dbName数据库名称pyg
logFileOffsetbinlog文件偏移位置100
eventType操作类型INSERT或UPDATE或DELETE
columnValueList列值列表{ “columnName”: “列名”,”columnValue”: “列值”,”isValid”: “是否有效”}
tableName表名commodity
timestamp执行时间戳1553701139000

1.2. Flink实时同步应用开发

整体架构

具体架构

  1. Flink对接Kafka
  2. 对数据进行预处理
  3. 将数据落地到hbase

数据同步说明

要确保hbase中的rowkey是唯一的,数据落地不能被覆盖

1.3. 实时数据同步项目初始化

在sync-db项目的scala目录中,创建以下包结构:

包名说明
com.itheima.syncdb.task存放所有flink任务代码
com.itheima.syncdb.util工具类
com.itheima.syncdb.bean存放实体类

步骤

  1. 资料\工具类\04.Flink数据同步系统目录的pom.xml文件中的依赖导入到sync-db项目的pom.xml
  2. sync-db模块添加scala支持
  3. main和test创建scala文件夹,并标记为源代码和测试代码目录
  4. 资料\工具类\04.Flink数据同步系统目录的application.conflog4j.properties配置文件
  5. 复制之前Flink项目中的GlobalConfigUtilHBaseUtil

1.4. Flink程序开发

步骤

  1. 编写App.scala,初始化Flink环境
  2. 运行Flink程序,测试是否能够消费到kafka中topic为canal的数据

App.scala

object FlinkUtils {

 // ---------------初始化FLink的流式环境--------------
  def initFlinkStreamEnv():StreamExecutionEnvironment={
   
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 设置处理的时间为EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 设置并行度
    env.setParallelism(1)
    // 添加Checkpoint的支持
    // 5s钟启动一次checkpoint
    env.enableCheckpointing(5000)
    // 设置checkpoint只checkpoint一次
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // 设置两次checkpoint的最小时间间隔
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)
    // 设置checkpoint的超时时长
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    // 最大并行度
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    // 当程序关闭时,触发额外的checkpoint
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // 设置checkpoint的地址
    env.setStateBackend(new FsStateBackend("hdfs://node01:8020/flink-checkpoint/"))

    env
  }

  /**
    * Kafka属性设置
    * @return
    */
  def getKafkaProperties(): Properties ={

    val properties = new Properties()
    properties.setProperty("bootstrap.servers" , GlobalConfigUtil.bootstrapServers)
    properties.setProperty("zookeeper.connect" , GlobalConfigUtil.zookeeperConnect)
    properties.setProperty("group.id" , GlobalConfigUtil.groupId)
    properties.setProperty("enable.auto.commit" , GlobalConfigUtil.enableAutoCommit)
    properties.setProperty("auto.commit.interval.ms" , GlobalConfigUtil.autoCommitIntervalMs)

    // 配置下次重新消费的话,从哪里开始消费
    // latest:从上一次提交的offset位置开始的
    // earlist:从头开始进行(重复消费数据)
    properties.setProperty("auto.offset.reset" , GlobalConfigUtil.autoOffsetReset)
    // 配置序列化和反序列化
    properties.setProperty("key.serializer" , "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer")

    properties
  }
}

1.4.1. 定义原始Canal消息样例类

步骤

  1. bean包下创建Canal原始消息映射样例类
  2. Cannal样例类中编写apply方法,使用FastJSON来解析数据,转换为Cannal样例类对象
  3. 编写main方法测试是否能够成功构建样例类对象

参考代码

case class Canal(var emptyCount:Long,
                 var logFileName:String,
                 var dbName:String,
                 var logFileOffset:Long,
                 var eventType:String,
                 var columnValueList:String,
                 var tableName:String,
                 var timestamp:Long)

object Canal {
  def apply(json:String): Canal = {
    val canal: Canal = JSON.parseObject[Canal](json,classOf[Canal])
    canal
  }

  def main(args: Array[String]): Unit = {
    val json = "{\"emptyCount\":2,\"logFileName\":\"mysql-bin.000002\",\"dbName\":\"pyg\",\"logFileOffset\":250,\"eventType\":\"INSERT\",\"columnValueList\":[{\"columnName\":\"commodityId\",\"columnValue\":\"1\",\"isValid\":\"true\"},{\"columnName\":\"commodityName\",\"columnValue\":\"耐克\",\"isValid\":\"true\"},{\"columnName\":\"commodityTypeId\",\"columnValue\":\"1\",\"isValid\":\"true\"},{\"columnName\":\"originalPrice\",\"columnValue\":\"888.0\",\"isValid\":\"true\"},{\"columnName\":\"activityPrice\",\"columnValue\":\"820.0\",\"isValid\":\"true\"}],\"tableName\":\"commodity\",\"timestamp\":1553741346000}"
    println( Canal(json).dbName)
  }
}

1.4.2. 解析Kafka数据流为Canal样例类

步骤

  1. map算子将消息转换为Canal样例类对象
  2. 打印测试,如果能输出以下信息,表示成功
Canal(mysql-bin.000002,1893,pyg,commodity,DELETE,[{"isValid":"false","columnValue":"5","columnName":"commodityId"},{"isValid":"false","columnValue":"索菲亚","columnName":"commodityName"},{"isValid":"false","columnValue":"3","columnName":"commodityTypeId"},{"isValid":"false","columnValue":"35000.0","columnName":"originalPrice"},{"isValid":"false","columnValue":"30100.0","columnName":"activityPrice"}],3,1553743567000)
HBaseOperation(DELETE,commodity,info,5,,)

代码:

// 将kafka中拿到的JSON字符串canal消息,转换为Canal样例类
val canalDataStream: DataStream[Canal] = kafkaDataStream.map {
    json =>
    	Canal(json)
}

1.4.3. 添加水印支持

步骤

  1. 使用Canal中的timestamp字段,生成水印数据
  2. 重新运行Flink,打印添加水印后的数据
// -----------添加水印支持--------------------
var watermarkDataStrem = canalDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Canal] {
    var currentTimeStamp = 0l
    // 延迟时间
    var maxDelayTime = 2000l

    // 获取当前时间戳
    override def getCurrentWatermark: Watermark = {
        new Watermark(currentTimeStamp - maxDelayTime)
    }

    // 获取事件时间
    override def extractTimestamp(element: Canal, previousElementTimestamp: Long): Long = {
        currentTimeStamp = Math.max(element.timestamp, previousElementTimestamp)
        currentTimeStamp
    }
})

1.4.4. 定义HBaseOperation样例类

HbaseOperation样例类主要封装对Hbase的操作,主要封装以下字段:

  • 操作类型(opType)= INSERT/DELETE/UPDATE
  • 表名(tableName)= mysql.binlog数据库名.binlog表名
  • 列族名(cfName)= 固定为info
  • rowkey = 唯一主键(取binlog中列数据的第一个)
  • 列名(colName)= binlog中列名
  • 列值(colValue)= binlog中列值

参考代码

/**
  * HBase操作样例类
  * 
  * 操作类型(opType) = INSERT/DELETE/UPDATE
  * 表名(tableName)= binlog数据库名_binlog表名
  * 列蔟名(cfName)= 固定为info
  * rowkey = 唯一主键(去binlog中列数据的第一个)
  * 列名(colName)= binlog中列名
  * 列值(colValue)= binlog中列值
  */
case class HBaseOperation(
                           var opType: String,
                           var tableName: String,
                           var cfName: String,
                           var rowkey: String,
                           var colName: String,
                           var colValue: String
                         )

1.4.5. 将Canal样例类转换为HBaseOperation样例类

一个binlog消息中,有会有多个列的操作。它们的映射关系如下:

我们可以使用flatMap算子,来生成一组HBaseOperation操作

步骤

  1. 创建一个预处理任务对象
  2. 使用flatMap对水印数据流转换为HBaseOperation
    • 根据eventType分别处理HBaseOperation列表
    • 生成的表名为mysql.数据库名.表名
    • rowkey就是第一个列的值
    • INSERT操作 -> 将所有列值转换为HBaseOperation
    • UPDATE操作 -> 过滤掉isValid字段为false的列,再转换为HBaseOperation
    • DELETE操作 -> 只生成一条DELETE的HBaseOperation的List

INSERT操作记录

序号说明
eventTypeINSERT表示这是一个插入操作
columnValueListJSON数组columnValueList中,isValid总是为true

UPDATE操作记录

序号说明
eventTypeUPDATE表示这是一个更新操作
columnValueListJSON数组columnValueList中,更新的isValid字段为true,未更新的为false

DELETE操作记录

序号说明
eventTypeDELETE表示这是一个删除操作
columnValueListJSON数组columnValueList中,isValid总是为false

实现

  1. task包下创建PreprocessTask单例对象,添加process方法
  2. 使用flatMap对Canal样例类进行扩展
  3. 使用FastJSON解析Canal样例类中的列值列表数据,并存储到一个Seq中
  4. 遍历集合,构建HBaseOperation样例类对象
  5. 打印测试
  6. 启动Flink验证程序是否正确处理

JSON字符串转List

List<T> parseArray(String text, Class<T> clazz)

classOf[T]: 获取class对象

Java的List转Scala的集合

注意要导入: import scala.collection.JavaConverters._

var scalaList: mutable.Buffer[T] = javaList.asScala

参考代码

package com.abc.syncdb.task

import java.util
import com.alibaba.fastjson.{JSON, JSONArray}
import com.itheima.syncdb.bean.{Canal, HBaseOperation}
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.api.scala._
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

/**
  * 封装列名、列值、是否有效
  *
  * @param columnName     列名
  * @param columnValue    列值
  * @param isValid        是否有效
  */
case class ColumnValuePair(
                            var columnName: String,
                            var columnValue: String,
                            var isValid: Boolean
                          )

/**
  * 预处理任务
  * 把Canal-> HBaseOperation
  */
object PreprocessTask {

  def process(watermarkDataStream: DataStream[Canal]) = {

    watermarkDataStream.flatMap {
      canal => {
        // 转换json串为集合
        val valuePairs: util.List[ColumnValuePair] = JSON.parseArray[ColumnValuePair](canal.columnValueList, classOf[ColumnValuePair])
        // java的集合转换为scala集合 需要import scala.collection.JavaConverters._
        var colValueList: mutable.Buffer[ColumnValuePair] = valuePairs.asScala

        // 获取操作类型
        val opType = canal.eventType
        // 生成的表名为mysql.数据库名.表名
        val tableName = s"mysql.${canal.dbName}.${canal.tableName}"
        // 列族名称
        val cfName = "info"
        // 获取mysql中的主键,获取第一列的值
        val rowkey = colValueList(0).columnValue

        canal.eventType match {
          case "INSERT" =>
            // 如果是INSERT操作,将每一个列值对转换为一个HBaseOperation
            colValueList.map{
              colValue =>
                HBaseOperation(opType, tableName, cfName, rowkey, colValue.columnName, colValue.columnValue)
            }
          case "UPDATE" =>
            // UPDATE操作 -> 过滤出来isValid字段为true的列,再转换为HBaseOperation
            colValueList.filter(_.isValid).map {
              colValue =>
                HBaseOperation(opType, tableName, cfName, rowkey, colValue.columnName, colValue.columnValue)
            }
          case "DELETE" =>
            // DELETE操作 -> 只生成一条DELETE的HBaseOperation的List
            List(HBaseOperation(opType, tableName, cfName, rowkey, "", ""))
        }
      }
    }
  }
}

1.4.6. Flink数据同步到hbase

步骤

  1. 分两个落地实现,一个是delete,一个是insert/update(因为hbase中只有一个put操作,所以只要是insert/update都转换为put操作)
  2. 启动hbase
  3. 启动flink测试

参考代码

// 落地HBase    
val operationDataStream: DataStream[HBaseOperation] = PreprocessTask.process(watermarkData)
operationDataStream.addSink(new SinkFunction[HBaseOperation] {
    override def invoke(value: HBaseOperation): Unit = {
        value.opType match {
            case "DELETE" =>
            HBaseUtil.deleteData(value.tableName, value.rowkey, value.cfName)
            case _ =>  // INSERT/UPDATE
            HBaseUtil.putData(value.tableName, value.rowkey, value.cfName, value.colName, value.colValue)
        }
    }
})

1.4.7. 验证Flink同步数据功能

步骤

  1. 启动mysql
  2. 启动canal
  3. 启动zookeeper集群
  4. 启动kafka集群
  5. 启动hdfs集群
  6. 启动hbase集群
  7. 启动Flink数据同步程序
  8. 启动Canal数据同步程序
  9. 在mysql中执行insert、update、delete语句,查看hbase数据是否落地

2. Flink离线分析系统开发

2.1. 案例介绍

  • 使用之前开发的数据库实时同步系统,将mysql中的数据实时同步到hbase
  • 使用Flink程序开发批处理应用,对hbase中的数据进行离线分析

2.2. 主要业务数据

订单业务数据表orderRecord介绍

字段名称说明
orderId订单ID
userId⽤户ID
merchantId商家ID
orderAmount下单金额
payAmount支付金额
payMethod支付方式
payTime支付时间
benefitAmount红包金额
voucherAmount代⾦券金额
commodityId产品id
activityNum活动编号(大于0代表有活动)
createTime创建时间

2.3. 业务目标

  • 按照天、月、年维度统计不同支付方式订单数量支付金额
  • 按照天、月、年维度统计不同商家订单数量支付金额

2.4. 构建Flink批处理项目结构

2.4.1. 导入Maven依赖

  • 导入flink离线计算案例集\资料\工具类中的pom.xml依赖到batch-process模块中
  • 分别在maintest目录创建scala目录,并分别标记为源码目录和测试源码目录

2.4.2. 创建包结构

包名说明
com.itheima.batch_process.task存放离线分析业务任务
com.itheima.batch_process.bean存在样例类
com.itheima.batch_process.util存放工具类

2.5. 初始化Flink批处理运行环境

步骤

  1. 获取Flink批处理运行环境ExecutionEnvironment
  2. 使用fromCollection构建测试数据
  3. 调用print操作
  4. 执行测试
// 构建一个批处理的运行环境
val env = ExecutionEnvironment.getExecutionEnvironment
// 设置并行度
env.setParallelism(1)
// 测试输出
val value: DataSet[String] = env.fromCollection(List("1","2","3"))
value.print()

2.7. 导入测试数据

步骤

  1. 启动Flink实时同步系统
  2. 启动canal
  3. 导入资料\sql脚本中的测试数据
  4. 检查数据是否已经成功导入到hbase中

2.8. 整合HBase

步骤

  1. 导入资料\工具类中的log4j.propertieshbase-site.xml文件
  2. 导入HBaseUtil.scala工具类`
  3. 导入ItheimaAbstractTableInputFormat.java
  4. 导入 ItheimaTableInputFormat.java
  5. 导入ItheimaTableInputSplit.java
  6. 编写HBaseTableInputFormat.scala去读取HBase数据
  7. 使用env.createInput构建一个HBase的DataSet,将orderRecord表中的数据读取出来
  8. 测试打印是否能够正确读取到HBase表中的数据

实现

HBaseTableInputFormat.scala

package com.abc.batch_process.util

import com.alibaba.fastjson.serializer.SerializerFeature
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.hadoop.hbase.{CellUtil}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.flink.api.java.tuple.Tuple2

/**
  * 操作HBase的表
  * 需要继承 TableInputFormat
  * 由于flink-hbase_2.11_1.6.1 jar包引用的是hbase1.4.3版本,我们现在用的是hbase2.0,版本不匹配,我们重写了相关类
  *
  * @param tableName
  */
class HBaseTableInputFormat(var tableName: String) extends ItheimaTableInputFormat[Tuple2[String, String]] {


  /**
    * 返回操作HBase表的Scan对象
    */
  override def getScanner: Scan = {
    scan = new Scan()
    scan
  }

  /**
    * 返回表名
    */
  override def getTableName: String = {
    tableName
  }

  /**
    *
    * @param result
    */
  override def mapResultToTuple(result: Result): Tuple2[String, String] = {
    // 获取rowkey
    val rowkey = Bytes.toString(result.getRow)
    // 获取列单元格
    val cellArray = result.rawCells()
	// 构造JSONObject
    val jsonObject = new JSONObject
    for (i <- 0 until cellArray.size) {
      // 列族名
      val columnFamilyName = Bytes.toString(CellUtil.cloneFamily(cellArray(i)))
      // 列名
      val columnName = Bytes.toString(CellUtil.cloneQualifier(cellArray(i)))
      // 列值
      val value = Bytes.toString(CellUtil.cloneValue(cellArray(i)))
      // 插入列名和列值到JSONObject
      jsonObject.put(columnName, value)
    }

	// 返回元组
    new Tuple2[String, String](rowkey, jsonObject.toString)
  }

  override def close(): Unit = {
    if (table != null) table.close()
  }
}

App.scala

// 加载批处理环境
val env = ExecutionEnvironment.getExecutionEnvironment
// 构建DataSet
val tupleDataSet: DataSet[tuple.Tuple2[String, String]] = env.createInput(new HBaseTableInputFormat("orderRecord"))
tupleDataSet.print()

2.9. JSON数据封装

将整合好HBase后获取到的JSON字符串,通过转换映射成scala的样例类

步骤

  1. 创建样例类OrderRecord用来封装对接HBase后获取到的JSON数据
  2. 并在OrderRecord伴生对象中创建apply方法解析JSON数据
  3. 编写main方法测试
  4. 使用map操作将数据转换为样例类
  5. 启动Flink程序测试

实现

OrderRecord样例类

package com.abc.batch_process.bean

import com.alibaba.fastjson.JSON

case class OrderRecord(benefitAmount:Double,  // 红包金额
                       orderAmount:Double,    // 订单金额
                       payAmount:Double,      // 支付金额
                       activityNum:String,    // 获取ID
                       createTime:String,     // 订单创建时间
                       merchantId:String,     // 商家ID
                       orderId:String,        // 订单ID
                       payTime:String,        // 支付时间
                       payMethod:String,      // 支付方式
                       voucherAmount:String,  // 优惠券金额
                       commodityId:String,    // 产品ID
                       userId:String)        // 用户ID

object OrderRecord {
  def apply(json:String): OrderRecord = {
    JSON.parseObject[OrderRecord](json,classOf[OrderRecord])
  }

  def main(args: Array[String]): Unit = {
    // 测试apply方法是否能够封装JSON数据
    val json = "{\"benefitAmount\":\"20.0\",\"orderAmount\":\"300.0\",\"payAmount\":\"457.0\",\"activityNum\":\"0\",\"createTime\":\"2018-08-13 00:00:06\",\"merchantId\":\"1\",\"orderId\":\"99\",\"payTime\":\"2018-08-13 00:00:06\",\"payMethod\":\"1\",\"voucherAmount\":\"20.0\",\"commodityId\":\"1101\",\"userId\":\"4\"}"
    println(OrderRecord(json))
  }
}

App.scala

val env = ExecutionEnvironment.getExecutionEnvironment

val tupleDataSet: DataSet[tuple.Tuple2[String, String]] = env.createInput(new HBaseTableInputFormat("orderRecord"))

val orderDataSet: DataSet[OrderRecord] = tupleDataSet.map {
    tuple => OrderRecord(tuple.f1)
}
orderDataSet.print()

2.10. 一般业务流程

2.11. 数据预处理

以为后续需要按照不同的时间维度来对订单数据进行分析,需要拓宽以下字段

字段名说明
yearMonthDay年/月/天
yearMonth年/月
year

步骤

  1. 创建OrderRecordWide样例类,添加上述需要拓宽的字段
  2. 创建PreprocessTask单例对象,编写process方法来进行字段拓宽
  3. 创建formatDateTime方法,用于进行时间格式转换
    • 将字符串时间转换为时间戳
    • 再转换为其他类型格式
  4. 重新运行Flink程序测试

参考代码

PreProcessTask.scala

object PreProcessTask {

  def process(orderDataSet:DataSet[OrderRecord]) = {
    orderDataSet.map{
      order => {
          OrderRecordWide(order.orderId,
              order.userId,
              order.merchantId,
              order.orderAmount,
              order.payAmount,
              order.payMethod,
              order.payTime,
              order.benefitAmount,
              order.voucherAmount,
              order.commodityId,
              order.activityNum,
              order.createTime,
            formatDateTime(order.payTime, "yyyyMMdd"),
            formatDateTime(order.payTime, "yyyyMM"),
            formatDateTime(order.payTime, "yyyy")
          )
      }
    }
  }

  def formatDateTime(date:String, format:String) = {
    val timestampFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

    val timestamp = timestampFormat.parse(date).getTime

    val formatDate = FastDateFormat.getInstance(format)
    formatDate.format(timestamp)
  }

  def main(args: Array[String]): Unit = {
    println(formatDateTime("2018-11-28 00:00:00", "yyyy-MM"))
    println(formatDateTime("2018-11-28 00:00:00", "yyyy-MM-dd"))
    println(formatDateTime("2018-11-28 00:00:00", "yyyy-MM-dd-HH"))
  }
}

2.12. 统计不同支付方式的商家的订单数量、支付金额

按照天、月、年的维度来统计出不同支付方式的订单数量、支付金额

统计结果

时间支付方式订单数量支付金额
2020-11-11支付宝12381238
2020-11微信123551234555
2020银联1555511235555555

步骤

  1. 创建PaymethodMoneyCountTask单例对象
  2. 创建样例类PaymethodMoneyCount,包含以下字段:支付方式、时间、订单数、支付金额
  3. 使用flatMap操作生成不同维度的数据
  4. 使用groupBy按照支付方式日期进行分组
  5. 使用reduceGroup进行聚合计算
  6. 测试:打印结果
  7. 将使用collect收集计算结果,并转换为List
  8. 使用foreach将数据下沉到HBase的analysis_payment表中
package com.abc.batch_process.task

import com.abc.batch_process.bean.OrderRecordWide
import com.abc.batch_process.util.HBaseUtil
import org.apache.flink.api.scala.DataSet
import org.apache.flink.api.scala._

// 设计一个样例类来保存统计的数据
case class PaymethodMoneyCount(paymethod:String, date:String, money:Double, count:Long)

/**
  * 按照不同的时间维度来统计不同支付方式的订单金额、订单数量
  */
object PaymethodMoneyCountTask {

  def process(orderRecordWide:DataSet[OrderRecordWide]) = {
    // Money => Mny
    // Count => cnt
    // Message => msg
    val paymethodMnyCnt = orderRecordWide.flatMap{
      order =>
        // 因为当前要统计不同时间维度的数据
        // 使用flatMap来将一条数据转换为不同维度的多条数据
        List(
          PaymethodMoneyCount(order.payMethod, order.yearMonthDay, order.payAmount, 1), // 天维度
          PaymethodMoneyCount(order.payMethod, order.yearMonth, order.payAmount, 1), // 月维度
          PaymethodMoneyCount(order.payMethod, order.year, order.payAmount, 1)   // 年维度
        )
    }

    // 按照指定的字段进行分组
    val groupedDataSet = paymethodMnyCnt.groupBy{
      pay =>
        pay.paymethod + pay.date
    }

    // 进行聚合计算
    val reducedDataSet = groupedDataSet.reduce{
      (p1, p2) =>
        PaymethodMoneyCount(p2.paymethod, p2.date, p1.money + p2.money, p1.count + p2.count)
    }

    // 打印测试
    reducedDataSet.print()

    // 将结果数据保存到hbase中
    reducedDataSet.collect().foreach{
      pay =>
        // 构建hbase所必须的参数
        val tableName = "analysis_payment"
        val rowkey = pay.paymethod + ":" + pay.date
        val cfName = "info"
        val paymethodColName = "paymethod"
        val dateColName = "date"
        val totalMoneyColName = "totalMoney"
        val totalCountColName = "totalCount"

        // 将Flink计算后的结果写入到hbase
        HBaseUtil.putMapData(tableName, rowkey, cfName, Map(
          paymethodColName -> pay.paymethod,
          dateColName -> pay.date,
          totalMoneyColName -> pay.money,
          totalCountColName -> pay.count
        ))
    }
  }
}

2.13. 统计天、月、年维度商家的订单数量、支付金额

根据不同商家,按照天、月、年、所有维度来进行统计订单数量、支付金额

统计结果

商家ID时间订单数量支付金额
12018-10-0112381238
22018-10123551234555
320181555511235555555

步骤

  1. 创建MerchantCountMoneyTask单例对象
  2. 创建样例类MerchantCountMoney,包含以下字段:商家ID、时间、订单数、支付金额
  3. 使用flatMap操作生成不同维度的数据
  4. 使用groupBy按照商家ID日期进行分组
  5. 使用reduceGroup进行聚合计算
  6. 测试:打印结果
  7. 将使用collect收集计算结果,并转换为List
  8. 使用foreach将数据下沉到HBase的analysis_merchant表中

参考代码

package com.abc.batch_process.task

import com.abc.batch_process.bean.OrderRecordWide
import com.abc.batch_process.util.HBaseUtil
import org.apache.flink.api.scala.DataSet
import org.apache.flink.api.scala._

/**
  * 1. 创建样例类 MerchantCountMoney
  * 商家ID, 时间,支付金额,订单数量,
  *
  * 2. 编写process方法  *
  * 转换  flatmap  *
  * 分组 groupby  *
  * 聚合 reduce  *
  * 落地保存到HBase  *
  */

case class MerchantCountMoney(
                               var merchantId: String,
                               var date: String,
                               var amount: Double,
                               var count: Long
                             )

object MerchantCountMoneyTask {

  def process(wideDataSet: DataSet[OrderRecordWide]) = {
    //转换  flatmap
    val mapDataSet: DataSet[MerchantCountMoney] = wideDataSet.flatMap {
      orderRecordWide => {
        List(
          MerchantCountMoney(orderRecordWide.merchantId, orderRecordWide.yearMonthDay, orderRecordWide.payAmount.toDouble, 1),
          MerchantCountMoney(orderRecordWide.merchantId, orderRecordWide.yearMonth, orderRecordWide.payAmount.toDouble, 1),
          MerchantCountMoney(orderRecordWide.merchantId, orderRecordWide.year, orderRecordWide.payAmount.toDouble, 1)
        )
      }
    }

    //分组 groupby
    val groupDataSet: GroupedDataSet[MerchantCountMoney] = mapDataSet.groupBy {
      merchant => (merchant.merchantId + merchant.date)
    }

    // 聚合
    val reduceDataSet: DataSet[MerchantCountMoney] = groupDataSet.reduce {
      (p1, p2) => {
        MerchantCountMoney(p1.merchantId, p1.date, p1.amount + p2.amount, p1.count + p2.count)
      }
    }

    // 保存到HBase中
    reduceDataSet.collect().foreach {
      merchant => {
        // HBase相关字段
        val tableName = "analysis_merchant"
        val rowkey = merchant.merchantId + ":" + merchant.date
        val clfName = "info"

        val merchantIdColumn = "merchantId"
        val dateColumn = "date"
        val amountColumn = "amount"
        val countColumn = "count"

        HBaseUtil.putMapData(tableName, rowkey, clfName, Map(
          merchantIdColumn -> merchant.merchantId,
          dateColumn -> merchant.date,
          amountColumn -> merchant.amount,
          countColumn -> merchant.count
        ))
      }
    }
  }
}

3. 项目总结

项目简介

项目名称: 品优购电商指标分析

开发语言: Java、scala

开发框架: Spring Boot、Hadoop、HBase、Kafka、Flink、Canal

子模块: 上报服务系统、实时分析系统、数据库同步系统、实时同步系统、离线分析系统

流程图

模块概述

上报服务系统

  • 上报服务系统是由SpringBoot技术开发的JavaWeb应用,SpringBoot是一个快速开发Spring应用的框架
  • 电商的WEB页面会通过HTTP的方式提交用户的点击流日志到上报服务系统
  • 接收用户数据经过转换处理之后发送到Kafka

实时分析系统

  • 使用Flink的流式计算开发的业务分析系统
  • 实时读取Kafka的数据,进行处理
  • 内部处理方式: 原始数据拓宽->转换->分组->时间窗口->聚合->落地HBase

数据库同步系统

  • 使用Java开发, 主要连接canal和kafka,进行mysql数据的传输
  • 通过canal取读取mysql的binlog文件
  • 解析binlog,转换成JSON格式的数据存入Kafka

实时同步系统

  • 使用Java开发
  • 实时接收kafka的数据
  • 获取到mysql的数据
  • 转换存储到HBase

离线分析系统

  • 使用Flink的批处理方式
  • 批量读取HBase的数据,进行业务分析
  • 内部处理方式: 原始数据拓宽->转换->分组->聚合->落地HBase

Web可视化平台

如果你觉得这篇文章或者我分享的主题对你有帮助,请支持我继续更新网站和主题 !捐赠本站
喜欢 (1)
发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址