欢迎来到天剑厚德,这里有最新的大数据技术原创分享。

Flink电商指标分析项目2

flink healthsun 0评论

1. HBaseUtil工具类

前面我们实现了Flink整合Kafka,可以从Kafka中获取数据进行分析,分析之后我们要把结果存入HBase中,为了方便操作,我们先提前编写一个操作HBase的工具类。HBase作为一个数据库,我们肯定要进行数据的增删改查,那么我们就围绕这几个操作进行开发.

1.1. API介绍

先来看下我们要编写的工具类的所有方法,我们先明确了目标,再进行开发。

方法名用途参数说明返回值
getTable创建/获取表tableNameStr:表名 columnFamily:列族名HBase Table对象
putData插入/更新一列数据tableNameStr: 表名 rowkey:String rowkey columnFamily:列族名 column:String 列名 data:String 列值
putMapData插入/更新多个列数据tableNameStr: 表名 rowkey:String rowkey columnFamily:列族名 mapData:列名/列值
getData根据rowkey,列族+列名 获取单列数据tableNameStr: 表名 rowkey:String rowkey columnFamily:列族名 column:列名列对应的数据 String类型
getMapData根据rowkey,列族+列名集合 获取多列数据tableNameStr: 表名 rowkey:String rowkey columnFamily:列族名 column:列名集合列对应的数据 Map[列名, 列值]
deleteData根据rowkey删除一条数据tableNameStr: 表名 rowkey:rowkey columnFamily: 列族名

HBase操作基本类

类名用途获取方式
ConfigurationHBase的配置类HBaseConfiguration.create
Connection连接ConnectionFactory.createConnection(conf)
AdminHBase的操作APIConnection.getAdmin
Table用来链接HBase的单表Connection.getTable()
Get用来查询HBase的单行数据new Get(rowkey.getBytes())
Put保存单行数据new Put(rowkey.getBytes())
Delete删除单行数据new Delete(rowkey.getBytes())

1.2. 获取表

开发步骤:

  1. 将导入hbase-site.xml配置文件到resources目录
  2. util包中添加HBaseUtil
    • 使用HBaseConfiguration.create获取配置对象Configuration,该配置对象会自动加载hbase-site.xml
    • 使用ConnectionFactory.createConnection获取hbase连接
    • 使用Connection.getAdmin获取与master的连接
  3. 创建getTable方法
    • 构建TableName
    • 构建TableDescriptorBuilder
    • 构建ColumnFamilyDescriptor
    • 添加列族
    • 检查表是否存在,若不存在,则创建表

源码解析:

我们可以围绕Admin来展开源码的查看,请观察下图,通过源码的注释,我们可以很简单的找到该编写的代码。

示例代码:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client._

object HBaseUtil {

  // 读取配置文件
  val conf: Configuration = HBaseConfiguration.create

  // 创建连接
  val conn: Connection = ConnectionFactory.createConnection(conf)

  // 创建HBase管理类
  val admin: Admin = conn.getAdmin

  /**
    * 获得表,如果表不存在则创建
    *
    * @param tableNameStr     表名
    * @param columnFamilyName 列族名
    * @return
    */
  def getTable(tableNameStr: String, columnFamilyName: String): Table = {

    // 获取 TableName
    val tableName: TableName = TableName.valueOf(tableNameStr)

    //判断表是否可用
    if (!admin.tableExists(tableName)) {
      // 构建TableDescriptorBuilder
      val tableDescriptor: TableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName)

      // 构建列族 ColumnFamilyDescriptor
      val columnFamily: ColumnFamilyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(columnFamilyName.getBytes).build()
      tableDescriptor.setColumnFamily(columnFamily)
      // 创建表
      admin.createTable(tableDescriptor.build())
    }

    // 返回表
    conn.getTable(tableName)
  }

1.3. 存储数据

创建putData方法

  • 调用getTable获取表
  • 构建Put对象
  • 添加列、列值
  • 对table执行put操作
  • 启动编写main进行测试
/**
    * 保存数据
    *
    * @param tableNameStr     表名
    * @param rowkey           rowkey
    * @param columnFamilyName 列族名
    * @param columnName       列名
    * @param columnValue      列值
    */
def putData(tableNameStr: String,
            rowkey: String,
            columnFamilyName: String,
            columnName: String,
            columnValue: String
           ) = {
    val table = getTable(tableNameStr, columnFamilyName)

    try {
        // Put
        val put = new Put(rowkey.getBytes())
        // 参数1: 列族 参数2: 列名 参数3: 列值
        put.addColumn(columnFamilyName.getBytes, columnName.getBytes, columnValue.getBytes)
        // 保存数据
        table.put(put)
    } catch {
        case ex: Exception => {
            ex.printStackTrace()
        }
    } finally {
        // 关闭table
        table.close()
    }
}

1.4. 获取数据

  1. 使用Connection获取表
  2. 创建getData方法
    • 调用getTable获取表
    • 构建Get对象
    • 对table执行get操作,获取result
    • 使用Result.getValue获取列族列对应的值
    • 捕获异常
    • 关闭表
  3. 启动hhbase
  4. 启动编写main进行测试
/**
    * 查询数据
    *
    * @param tableNameStr     表名
    * @param rowkey           rowkey
    * @param columnFamilyName 列族名
    * @param columnName       列名
    */
def getData(tableNameStr: String,
            rowkey: String,
            columnFamilyName: String,
            columnName: String) = {

    val table: Table = getTable(tableNameStr, columnFamilyName)

    try {

        // Get对象
        val get: Get = new Get(rowkey.getBytes)
        // 查询结果
        val result = table.get(get)
        // 结果不为空, 并且结果包含我们要查询的列
        if (result != null && result.containsColumn(columnFamilyName.getBytes, columnName.getBytes)) {
            // - 使用Result.getValue获取列族列对应的值
            val colValueBytes: Array[Byte] = result.getValue(columnFamilyName.getBytes, columnName.getBytes)
            Bytes.toString(colValueBytes)
        }else {
        ""
      	}
    } catch {
        case ex: Exception =>
        ex.printStackTrace()
        ""
    } finally {
        // - 关闭表
        table.close()
    }
}

1.5. 批量存储数据

创建putMapData方法

  • 调用getTable获取表
  • 构建Put对象
  • 添加Map中的列、列值
  • 对table执行put操作
  • 捕获异常
  • 关闭表
  • 启动编写main进行测试
/**
    * 批量存储数据
    *
    * @param tableNameStr     表名
    * @param rowkey           rowkey
    * @param columnFamilyName 列族名
    * @param map              key/value集合
    */
def putMapData(tableNameStr: String,
               rowkey: String,
               columnFamilyName: String,
               map: Map[String, Any]
              ) = {
    val table = getTable(tableNameStr, columnFamilyName)

    try {
        //- 添加Map中的列、列值
        for ((columnName, columnValue) <- map) {
            // Put
            val put = new Put(rowkey.getBytes())
            put.addColumn(columnFamilyName.getBytes, columnName.getBytes, columnValue.toString.getBytes)
            table.put(put)
        }

    } catch {
        case ex: Exception => {
            ex.printStackTrace()
        }
    } finally {
        table.close()
    }
}

1.6. 批量获取数据

创建putMapData方法

  • 调用getTable获取表
  • 构建Get对象
  • 根据Get对象查询表
  • 构建可变Map
  • 遍历查询各个列的列值
  • 过滤掉不符合的结果
  • 把结果转换为Map返回
  • 捕获异常
  • 关闭表
  • 启动编写main进行测试
/**
    * 批量获取列的数据
    *
    * @param tableNameStr 表名
    * @param rowkey       rowkey
    * @param columnFamily 列族
    * @param columnList   列的名字列表
    * @return
    */
def getMapData(tableNameStr: String, rowkey: String, columnFamily: String, columnList: List[String]): Map[String, String] = {
    // 根据表名和列族名获取表
    val table: Table = getTable(tableNameStr, columnFamily)

    try {
        // 创建Get对象
        val get: Get = new Get(rowkey.getBytes)

        // 根据Get对象查询表
        val result: Result = table.get(get)

        // 构建可变Map
        val valueMap = collection.mutable.Map[String, String]()

        // 遍历要查询的列集合
        columnList.map {
            col =>
            // 从表查询结果中,根据列族,列名提取出列值
            val values: Array[Byte] = result.getValue(columnFamily.getBytes, col.getBytes)

            // 列值不为空 , 则设置 k-> v
            if (values != null && values.size > 0) {
                col -> Bytes.toString(values)
            }
            else {
                "" -> ""
            }
        }.filter(_._1 != "").toMap  // 过滤掉key为""的内容,并转换为Map类型
    } catch {
        case e: Exception =>
        e.printStackTrace()
        Map[String, String]()
    } finally {
        table.close()
    }
}

1.7. 删除数据

创建deleteData方法

  • 调用getTable获取表
  • 构建Delete对象
  • 对table执行delete操作
  • 捕获异常
  • 关闭表
  • 启动编写main进行测试
/**
    * 删除数据
    *
    * @param tableNameStr     表名
    * @param columnFamilyName 列族名
    * @param rowkey           rowkey
    */
def deleteData(tableNameStr: String,
               columnFamilyName: String,
               rowkey: String
              ) = {
    // 获取表
    val table = getTable(tableNameStr, columnFamilyName)
    try {
        // 创建Delete对象
        val delete = new Delete(rowkey.getBytes)
        table.delete(delete)
    } catch {
        case ex: Exception =>
        ex.printStackTrace()
    } finally {
        table.close()
    }
}

main方法测试代码

object HBaseUtil {
    
    def main(args: Array[String]): Unit = {
    // 测试存入单列数据
    putData("test", "123", "info", "t1", "hello world")

    // 测试存入多列数据
    val map = Map(
      "t2" -> "scala",
      "t3" -> "hive",
      "t4" -> "sqoop"
    )
    putMapData("test", "123", "info", map)

    println(getData("test", "123", "info", "t1"))
    println(getData("test", "123", "info", "t2"))
    println(getData("test", "123", "info", "t3"))
    println(getData("test", "123", "info", "t4"))
        
    println(getMapData("test", "123", "info", List("t1", "t2","t3","t4")))

    deleteData("test", "123", "info")
  }
}

2. 实时数据分析业务目标

  • 完成点击流日志数据预处理业务开发
  • 完成实时频道热点分析业务开发
  • 完成实时频道PV/UV分析业务开发
  • 完成实时频道用户新鲜度分析业务开发
  • 完成实时频道地域分析业务开发

3. 业务开发一般流程

一般流程

  1. 先对原始数据进行拓宽预处理
  2. 将拓宽后的数据转换为要进行分析业务字段
  3. 按照指定字段进行分组
  4. 将组内数据划分到窗口中
  5. 聚合计算
  6. 将数据落地到hbase

预处理的过程非常关键,它是业务的源头,如果预处理做得很好,后续业务的开发会变得简单很多

4. 点击流日志实时数据预处理

4.1. 业务分析

为了方便后续分析,我们需要对点击流日志,使用Flink进行实时预处理。在原有点击流日志的基础上添加一些字段,方便进行后续业务功能的统计开发。

以下为Kafka中消费得到的原始点击流日志字段:

字段名说明
channelID频道ID
categoryID产品类别ID
produceID产品ID
country国家
province省份
city城市
network网络方式
source来源方式
browserType浏览器类型
entryTime进入网站时间
leaveTime离开网站时间
userID用户的ID

我们需要在原有点击流日志字段基础上,再添加以下字段:

字段名说明
count用户访问的次数
timestamp用户访问的时间
address国家省份城市(拼接)
yearMonth年月
yearMonthDay年月日
yearMonthDayHour年月日时
isNew是否为访问某个频道的新用户
isHourNew在某一小时内是否为某个频道的新用户
isDayNew在某一天是否为某个频道的新用户
isMonthNew在某一个月是否为某个频道的新用户

我们不能直接从点击流日志中,直接计算得到上述后4个字段的值。而是需要在hbase中有一个历史记录表,来保存用户的历史访问状态才能计算得到。

该历史记录表(user_history表)结构如下:

列名说明示例
rowkey用户ID:频道ID10:220
userid用户ID10
channelid频道ID220
lastVisitedTime最后访问时间(时间戳)1553653555

4.2. 创建ClickLogWide样例类

我们使用ClickLogWide样例类来保存拓宽后的点击流日志数据。直接复制原有的ClickLog样例类,然后给它加上下列额外的字段:

步骤

  1. bean包中创建ClickLogWide样例类
  2. 复制原有ClickLog样例类
  3. 添加以下字段
    • 用户访问的次数(count)
    • 用户访问的时间(timestamp)
    • 国家省份城市(拼接)(address)
    • 年月(yearMonth)
    • 年月日(yearMonthDay)
    • 年月日时(yearMonthDayHour)
    • 是否为访问某个频道的新用户(isNew)——0表示否,1表示是
    • 在某一小时内是否为某个频道的新用户(isHourNew)——0表示否
    • 在某一天是否为某个频道的新用户(isDayNew)——0表示否,1表示是,
    • 在某一个月是否为某个频道的新用户(isMonthNew)——0表示否,1表示是

参考代码

ClickLogWide.scala

// 频道ID(channelID)
// 产品类别ID(categoryID)
// 产品ID(produceID)
// 国家(country)
// 省份(province)
// 城市(city)
// 网络方式(network)
// 来源方式(source)
// 浏览器类型(browserType)
// 进入网站时间(entryTime)
// 离开网站时间(leaveTime)
// 用户的ID(userID)
// 用户访问的次数(count)
// 用户访问的时间(timestamp)
// 国家省份城市(拼接)(address)
// 年月(yearMonth)
// 年月日(yearMonthDay)
// 年月日时(yearMonthDayHour)
// 是否为访问某个频道的新用户(isNew)——0表示否,1表示是
// 在某一小时内是否为某个频道的新用户(isHourNew)——0表示否
// 在某一天是否为某个频道的新用户(isDayNew)——0表示否,1表示是,
// 在某一个月是否为某个频道的新用户(isMonthNew)——0表示否,1表示是
case class ClickLog(
                     var channelID: String,
                     var categoryID: String,
                     var produceID: String,
                     var country: String,
                     var province: String,
                     var city: String,
                     var network: String,
                     var source: String,
                     var browserType: String,
                     var entryTime: String,
                     var leaveTime: String,
                     var userID: String,
                     var count: Long,
                     var timestamp: Long,
                     var address: String,
                     var yearMonth: String,
                     var yearMonthDay: String,
                     var yearMonthDayHour: String,
                     var isNew: Int,
                     var isHourNew: Int,
                     var isDayNew: Int,
                     var isMonthNew: Int
                   )

4.3. 预处理:地址、时间字段拓宽

为了方便后续进行数据分析,我们需要对地址、时间字段进行拓宽。我们先来拓宽以下几个字段:

字段名说明
address国家、省份、城市(拼接)
yearMonth年月
yearMonthDay年月日
yearMonthDayHour年月日时

步骤

  1. 对水印数据流进行预处理
  2. 拼接拓宽字段
  3. 使用SimpleDateFormat+FastDateFormat来获取格式化的时间(FastDateFormat是线程安全的)
  4. 构建ClickLogWide样例类
  5. 打印预处理数据
  6. 测试

实现

  • task包下创建PreprocessTask
  • 添加process接收水印流数据
  • 使用map算子进行数据转换
  • 构建以下拓宽字段
    • 国家省份城市(拼接)(address)
    • 年月(yearMonth)
    • 年月日(yearMonthDay)
    • 年月日时(yearMonthDayHour)
  • 编写工具方法timestamp2Str,传入时间戳和日期格式,使用FastDateFormat将时间戳转换为指定格式的日期
  • 分别使用timestamp2Str获取年月年月日年月日时字段
  • 构建ClickLogWide样例类
  • App中调用预处理任务的process方法,并打印测试

示例代码

object PreprocessTask {

  def process(watermarkData: DataStream[Message]) = {
    val clickWideDataStream: DataStream[ClickLogWide] = watermarkData.map {
      msg =>
        // 转换时间
        val yearMonth = FastDateFormat.getInstance("yyyyMM").format(msg.timeStamp)
        val yearMonthDay = FastDateFormat.getInstance("yyyyMMdd").format(msg.timeStamp)
        val yearMonthDayHour = FastDateFormat.getInstance("yyyyMMddHH").format(msg.timeStamp)

        // 拼接地址
        val address = msg.clickLog.country + msg.clickLog.province + msg.clickLog.city

        ClickLogWide(
          msg.clickLog.channelID,
          msg.clickLog.categoryID,
          msg.clickLog.produceID,
          msg.clickLog.country,
          msg.clickLog.province,
          msg.clickLog.city,
          msg.clickLog.network,
          msg.clickLog.source,
          msg.clickLog.browserType,
          msg.clickLog.entryTime,
          msg.clickLog.leaveTime,
          msg.clickLog.userID,
          msg.count,
          msg.timeStamp,
          address,
          yearMonth,
          yearMonthDay,
          yearMonthDayHour,
          0,
          0,
          0,
          0
        )
    }

    clickWideDataStream
  }
    
    /**
    * 将时间戳格式化为日期时间格式
    *
    * @param timestamp 时间戳
    * @param format    日期时间格式
    */
    def timestamp2Str(timestamp: Long, format: String) = {
        FastDateFormat.getInstance(format).format(timestamp)
    }
}

App.scala中添加执行任务代码

 val clickWideDataStream: DataStream[ClickLogWide] = PreprocessTask.process(watermarkDatastream)

4.4. 预处理:isNew字段处理

isNew字段是判断某个用户ID,是否已经访问过某个频道

实现思路

  1. 先把要拓宽的字段isNewisHourNewisDayNewisMonthNew都创建出来,初始化为0
  2. 从hbase查询rowkey为userid:channlid查询user_historyuserid列的数据
  3. 判断userid列数据是否为空
  • 如果为空
    • 设置isNew字段为1,表示是新用户
    • 将其他的isHourNewisDayNewisMonthNew字段都设置为1
    • 将该用户数据添加到user_history表中
  • 如果不为空
    • 设置isNew字段为0,表示是老用户
    • 更新user_history表中的lastVisitedTime

user_history表的列

  • 用户ID:频道ID(rowkey)
  • 用户ID(userid)
  • 频道ID(channelid)
  • 最后访问时间(时间戳)(lastVisitedTime)

步骤

  1. 创建一个object RealPreprogressTask,用来编写预处理的scala代码
  2. 创建一个方法process,接收水印流数据
  3. 使用map算子按照实现思路进行处理,计算得到isNew字段,然后返回
  4. 打印测试,查看hbase中是否已经成功创建user_history表并保存了用户数据

4.5. 预处理:isHourNew字段处理

isHourNew字段表示,用户虽然不是第一次访问某个频道,但是用户在某一个小时内,是第一次访问该频道。

我们通过时间比较来判断:

  1. 如果当前用户的访问时间是:2020-03-27 11点
  2. 历史记录表中,最后一次的访问时间是:2020-03-27 10点,isHourNew为1(11点确实没访问)
  3. 历史记录表中,最后一次的访问时间是:2020-03-27 11点,isHourNew为0

所以,我们根据时间来进行判断。

步骤

  1. user_history表中获取lastVisitedTime字段
  2. lastVisitedTime字段格式化为年月日时格式
  3. 和当前点击流日志的年/月/日/时字段比较
    • 如果当前时间字段 > lastVisited,则isHourNew为1
    • 否则isHourNew为0

4.6. 预处理:isDayNew字段处理

isHourNew字段处理类似,只不过需要使用年月日来进行比较

4.7. isMonthNew字段处理

isHourNew字段处理类似,只不过需要使用年月来进行比较

参考代码

package com.abc.realprocess.task

import com.abc.realprocess.bean.{ClickLogWide, Message}
import com.abc.realprocess.util.HBaseUtil
import org.apache.commons.lang.StringUtils
import org.apache.commons.lang.time.FastDateFormat
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.api.scala._

object PreprocessTask {
  def process(watermarkData: DataStream[Message]) = {
    val clickWideDataStream: DataStream[ClickLogWide] = watermarkData.map {
      msg =>
        // 转换时间
        val yearMonth = FastDateFormat.getInstance("yyyyMM").format(msg.timeStamp)
        val yearMonthDay = FastDateFormat.getInstance("yyyyMMdd").format(msg.timeStamp)
        val yearMonthDayHour = FastDateFormat.getInstance("yyyyMMddHH").format(msg.timeStamp)

        // 判断各个时间段是否为新用户
        val isNewTuple = isNewProcess(msg)

        // 拼接地址
        val address = msg.clickLog.country + msg.clickLog.province + msg.clickLog.city

        ClickLogWide(
          msg.clickLog.channelID,
          msg.clickLog.categoryID,
          msg.clickLog.produceID,
          msg.clickLog.country,
          msg.clickLog.province,
          msg.clickLog.city,
          msg.clickLog.network,
          msg.clickLog.source,
          msg.clickLog.browserType,
          msg.clickLog.entryTime,
          msg.clickLog.leaveTime,
          msg.clickLog.userID,
          msg.count,
          msg.timeStamp,
          address,
          yearMonth,
          yearMonthDay,
          yearMonthDayHour,
          isNewTuple._1,
          isNewTuple._2,
          isNewTuple._3,
          isNewTuple._4
        )
    }

    clickWideDataStream
  }

  /**
    * 构建isNew、isHourNew、isDayNew、isMonthNew字段
    * 判断用户是否为新用户、是否一个小时/天/月之内是新用户
    *
    * @param message
    */
  private def isNewProcess(message: Message) = {
    // 1. 先把要拓宽的字段`isNew`、`isHourNew`、`isDayNew`、`isMonthNew`都创建出来,初始化为0
    var isNew = 0
    var isHourNew = 0
    var isDayNew = 0
    var isMonthNew = 0

    // 2. 从hbase查询rowkey为`userid:channlid`查询`user_history`中`userid`列数据

    // 表名, 渠道用户历史表
    val tableName = "channel_user_history"
    // 列族名
    val cfName = "info"
    // 拼接rowkey ------> 用户ID:频道ID(rowkey)
    val rowkey: String = message.clickLog.userID + ":" + message.clickLog.channelID
    // 用户ID(userid)
    val userIdColName = "userid"
    // 频道ID(channelid)
    val channelIdColName = "channelId"
    // 最后访问时间(时间戳)(lastVisitedTime)
    val lastVisitedTimeColName = "lastVisitedTime"

    val useridInHbase: String = HBaseUtil.getData(tableName, rowkey, cfName, userIdColName)

    //   3. 判断`userid`列数据是否为空 , 为空则认为是新用户,插入数据即可
    if (StringUtils.isBlank(useridInHbase)) {
      // - 如果为空
      // - 设置`isNew`字段为`1`,表示是新用户,
      isNew = 1
      // - 将其他的`isHourNew`、`isDayNew`、`isMonthNew`字段都设置为1
      isHourNew = 1
      isDayNew = 1
      isMonthNew = 1

      // - 将该用户数据添加到`user_history`表中
      HBaseUtil.putMapData(tableName, rowkey, cfName, Map(
        userIdColName -> message.clickLog.userID,
        channelIdColName -> message.clickLog.channelID,
        lastVisitedTimeColName -> message.timeStamp.toString
      ))

    }
    else {
      // - 如果不为空
      // - 设置`isNew`字段为0,表示是老用户
      isNew = 0

      // 1. 从`user_history`表中获取`lastVisitedTime`字段
      val lastVisitedTime: String = HBaseUtil.getData(tableName, rowkey, cfName, lastVisitedTimeColName)

      //   2. 将`lastVisitedTime`字段格式化为`年月日时`格式
      isHourNew = compareDate(message.timeStamp, lastVisitedTime.toLong, "yyyyMMddHH")
      isDayNew = compareDate(message.timeStamp, lastVisitedTime.toLong, "yyyyMMdd")
      isMonthNew = compareDate(message.timeStamp, lastVisitedTime.toLong, "yyyyMM")

      // - 更新`user_history`表中的`lastVisitedTime`列
      HBaseUtil.putData(tableName, rowkey, cfName, lastVisitedTimeColName, message.timeStamp.toString)
    }

    (isNew, isHourNew, isDayNew, isMonthNew)
  }

  /**
    * 将时间戳格式化为日期时间格式
    *
    * @param timestamp 时间戳
    * @param format    日期时间格式
    */
  def timestamp2Str(timestamp: Long, format: String) = {
    FastDateFormat.getInstance(format).format(timestamp)
  }


  /**
    * 比较时间
    * @param currentTime    当前时间
    * @param historyTime    历史时间
    * @param format         时间格式化方式
    * @return
    *         1 : 当前时间大于历史时间返回
    *         0 : 当前时间小于等于历史时间
    */
  def compareDate(currentTime: Long, historyTime: Long, format: String): Int = {

    var result = timestamp2Str(historyTime, format).compareTo(timestamp2Str(currentTime, format))

    if (result > 0) {
      result = 1
    } else {
      result = 0
    }
    result
  }

  def main(args: Array[String]): Unit = {
    println(compareDate(1558104383523L, 1568304383523L, "yyyyMMdd"))
  }
}

注意:
日期格式
yyyy : 年
MM : 月
dd: 日
HH: 24小时

5. 实时频道热点分析业务开发

5.1. 业务介绍

频道热点,就是要统计频道被访问(点击)的数量。

分析得到以下的数据:

频道ID访问数量
频道ID1128
频道ID2401
频道ID3501

需要将历史的点击数据进行累加

5.2. 业务开发

步骤

  1. 创建实时热点样例类,专门用来计算实时热点的数据
  2. 将预处理后的数据,转换为要分析出来的数据(频道、访问次数)样例类
  3. 按照频道进行分组(分流)
  4. 划分时间窗口(3秒一个窗口)
  5. 进行合并计数统计
  6. 打印测试
  7. 将计算后的数据下沉到Hbase

实现

  1. 创建一个ChannelRealHotTask单例对象
  2. 添加一个ChannelRealHot样例类,它封装要统计的两个业务字段:频道ID(channelID)、访问数量(visited)
  3. ChannelRealHotTask中编写一个process方法,接收预处理后的DataStream
  4. 使用map算子,将ClickLog对象转换为ChannelRealHot
  5. 按照频道ID进行分流
  6. 划分时间窗口(3秒一个窗口)
  7. 执行reduce合并计算
  8. 将合并后的数据下沉到hbase
    • 判断hbase中是否已经存在结果记录
    • 若存在,则获取后进行累加
    • 若不存在,则直接写入

参考代码

// 添加一个`ChannelRealHot`样例类,它封装要统计的两个业务字段:频道ID(channelID)、访问数量(visited)
case class ChannelRealHot(var channelID: String, var visited: Long)

object ChannelRealHotTask {

	/**
    * 点击流日志宽表数据流
    *
    * 分组
    * 划分时间窗口
    * 聚合
    * 落地HBase
    *
    * @param clickLogWideDataStream
    * @return
    */
  def process(clickLogWideDataStream: DataStream[ClickLogWide]) = {

    // 1. 遍历日志宽表,转换为 样例类[ChannelRealHot]
    val channelRealHotDataStream: DataStream[ChannelRealHot] = clickLogWideDataStream.map {
      clicklog: ClickLogWide =>
        ChannelRealHot(clicklog.channelID, clicklog.count)
    }

    // 2. 根据渠道ID进行分组
    val groupedDataStream: KeyedStream[ChannelRealHot, String] = channelRealHotDataStream.keyBy(line => line.channelID)

    // 3. 设置时间窗口
    val windowDataStream: WindowedStream[ChannelRealHot, String, TimeWindow] = groupedDataStream.timeWindow(Time.seconds(3))

    // 4. 执行reduce合并计算
    val reduceRealHotDataStream: DataStream[ChannelRealHot] = windowDataStream.reduce {
      // 上一次reduce之后的对象,当前待 reduce的对象
      (channelRealHotTotal: ChannelRealHot, channelRealHot: ChannelRealHot) =>
        ChannelRealHot(channelRealHot.channelID, channelRealHotTotal.visited + channelRealHot.visited)
    }

    // 5. 将合并后的数据下沉到hbase
    reduceRealHotDataStream.addSink(new SinkFunction[ChannelRealHot] {
      override def invoke(value: ChannelRealHot): Unit = {
        // hbase的相关字段名创建出来
        val tableName = "channel"
        val cfName = "info"
        // 频道ID(channelID)、访问数量(visited)
        val channelIDColName = "channelID"
        val visitedColName = "visited"
        val rowkey = value.channelID

        // - 判断hbase中是否已经存在结果记录
        val visitedValue: String = HBaseUtil.getData(tableName, rowkey, cfName, visitedColName)
        var totalCount: Long = 0

        // - 若存在,则获取后进行累加
        if (!StringUtils.isBlank(visitedValue)) {
          totalCount = value.visited + visitedValue.toLong
        }
        else {
          // - 若不存在,则直接写入
          totalCount = value.visited
        }

        // 保存数据
        HBaseUtil.putMapData(tableName, rowkey, cfName, Map(
          channelIDColName -> value.channelID,
          visitedColName -> totalCount.toString
        ))

      }
    })
  }
}

6. 实时频道PV/UV分析

针对频道的PV、UV进行不同时间维度的分析。有以下三个维度:

  • 小时

我们会分别来实现不同维度的分析。

6.1. 业务介绍

PV(访问量)

即Page View,页面刷新一次算一次。

UV(独立访客)

即Unique Visitor,指定时间内相同的客户端只被计算一次

统计分析后得到的数据如下所示:

频道ID时间PVUV
频道120170101161230350
频道220170101171251330
频道320170101185512610

6.2. 小时维度PV/UV业务开发

步骤

  1. 创建频道PV、UV样例类
  2. 将预处理后的数据,转换为要分析出来的数据(频道、PV、UV)样例类
  3. 按照频道时间进行分组(分流)
  4. 划分时间窗口(3秒一个窗口)
  5. 进行合并计数统计
  6. 打印测试
  7. 将计算后的数据下沉到Hbase

实现

  1. 创建一个ChannelPvUvTask单例对象
  2. 添加一个ChannelPvUv样例类,它封装要统计的四个业务字段:频道ID(channelID)、年月日时、PV、UV
  3. ChannelPvUvTask中编写一个processHourDim方法,接收预处理后的DataStream
  4. 使用map算子,将ClickLog对象转换为ChannelPvUv
  5. 按照频道ID年月日时进行分流
  6. 划分时间窗口(3秒一个窗口)
  7. 执行reduce合并计算
  8. 打印测试
  9. 将合并后的数据下沉到hbase
    • 判断hbase中是否已经存在结果记录
    • 若存在,则获取后进行累加
    • 若不存在,则直接写入

参考代码

// 抽取一个公共的trait, 所有的任务都来实现它
trait BaseTask {
  def process(clickLogWideDataStream: DataStream[ClickLogWide]):Any
}


//添加一个`ChannelPvUv`样例类,它封装要统计的四个业务字段:频道ID(channelID)、年月日时、PV、UV
case class ChannelPvUv(
                        val channelID: String,
                        var yearMonthDayHour: String,
                        var pv: Long,
                        var uv: Long
                      )

object ChannelPvUvTask extends BaseTask {

  override def process(clickLogWideDataStream: DataStream[ClickLogWide]) = {

    // 1. 使用map算子,将`ClickLog`对象转换为`ChannelPvUv`
    val channelPvUvDataStream: DataStream[ChannelPvUv] = clickLogWideDataStream.map {
      clickLogWide =>
        ChannelPvUv(clickLogWide.channelID, clickLogWide.yearMonthDayHour, clickLogWide.count, clickLogWide.isHourNew)
    }

    // 2. 根据channelID+年月日时 分组 keyBy
    val keyedStream: KeyedStream[ChannelPvUv, String] = channelPvUvDataStream.keyBy {
      channelPvUv => channelPvUv.channelID + channelPvUv.yearMonthDayHour
    }

    // 3. 设置时间窗口
    val windowDataStream: WindowedStream[ChannelPvUv, String, TimeWindow] = keyedStream.timeWindow(Time.seconds(3))

    // 4. 聚合
    val reduceDataStream: DataStream[ChannelPvUv] = windowDataStream.reduce {
      (t1, t2) =>
        ChannelPvUv(t1.channelID, t1.yearMonthDayHour, t1.pv + t2.pv, t1.uv + t2.uv)
    }

    // 5. 落地HBase
    reduceDataStream.addSink(new SinkFunction[ChannelPvUv] {

      override def invoke(value: ChannelPvUv): Unit = {
        val tableName = "channel_pvuv"
        val cfName = "info"
        val channelIdColName = "channelID"
        val yearMonthDayHourColName = "yearMonthDayHour"
        val pvColName = "pv"
        val uvColName = "uv"
        val rowkey = value.channelID + ":" + value.yearMonthDayHour

        var totalPv = 0L
        var totalUv = 0L

        //   - 判断hbase中是否已经存在结果记录
        val pvValue: String = HBaseUtil.getData(tableName, rowkey, cfName, pvColName)
        val uvValue: String = HBaseUtil.getData(tableName, rowkey, cfName, uvColName)
        // - 若存在,则获取后进行累加
        if(!StringUtils.isBlank(pvValue)) {
          totalPv = pvValue.toLong + value.pv
        }
        // - 若不存在,则直接写入
        else {
          totalPv = value.pv
        }

        // - 若存在,则获取后进行累加
        if(!StringUtils.isBlank(uvValue)) {
          totalUv = uvValue.toLong + value.uv
        }
        // - 若不存在,则直接写入
        else {
          totalUv = value.uv
        }

        HBaseUtil.putMapData(tableName,rowkey,cfName,Map(
          channelIdColName -> value.channelID,
          yearMonthDayHourColName -> value.yearMonthDayHour,
          pvColName -> totalPv.toString,
          uvColName -> totalPv.toString
        ))

      }
    })
  }
}

6.3. 天维度PV/UV业务开发

按天的维度来统计PV、UV与按小时维度类似,就是分组字段不一样。可以直接复制按小时维度的PV/UV,然后修改即可。

6.4. 小时/天/月维度PV/UV业务开发

但是,其实上述代码,都是一样的。我们可以将小时三个时间维度的数据放在一起来进行分组

思路

  1. 每一条ClickLog生成三个维度的ChannelPvUv,分别用于三个维度的统计
  • ChannelPvUv –> 小时维度
  • ChannelPvUv –> 天维度
  • ChannelPvUv –> 月维度

实现

  1. 使用flatmap算子,将ClickLog转换为三个ChannelPvUv
  2. 重新运行测试

参考代码

def process(clicklogWideDataStream:DataStream[ClickLogWide]) = {
    ...
    val channelPvUvDataStream: DataStream[ChannelPvUv] = clicklogWideDataStream.flatMap {
      clicklog =>
        List(
          ChannelPvUv(clicklog.channelID, clicklog.yearMonthDayHour, clicklog.count, clicklog.isHourNew),
          ChannelPvUv(clicklog.channelID, clicklog.yearMonthDay, clicklog.count, clicklog.isDayNew),
          ChannelPvUv(clicklog.channelID, clicklog.yearMonth, clicklog.count, clicklog.isMonthNew)
        )
    }
    ...
  }

7. 实时频道用户新鲜度分析

7.1. 业务介绍

用户新鲜度即分析网站每小时、每天、每月活跃的新老用户占比

可以通过新鲜度:

  • 从宏观层面上了解每天的新老用户比例以及来源结构
  • 当天新增用户与当天推广行为是否相关

统计分析要得到的数据如下:

频道ID时间新用户老用户
频道1201703512144
频道1201703184114123
频道120170318103424412

7.2. 业务开发

步骤

  1. 创建频道新鲜度样例类,包含以下字段(频道、时间、新用户、老用户)
  2. 将预处理后的数据,转换为新鲜度样例类
  3. 按照频道时间进行分组(分流)
  4. 划分时间窗口(3秒一个窗口)
  5. 进行合并计数统计
  6. 打印测试
  7. 将计算后的数据下沉到Hbase

实现

  1. 创建一个ChannelFreshnessTask单例对象
  2. 添加一个ChannelFreshness样例类,它封装要统计的四个业务字段:频道ID(channelID)、日期(date)、新用户(newCount)、老用户(oldCount)
  3. ChannelFreshnessTask中编写一个process方法,接收预处理后的DataStream
  4. 使用flatMap算子,将ClickLog对象转换为三个不同时间维度ChannelFreshness
  5. 按照频道ID日期进行分流
  6. 划分时间窗口(3秒一个窗口)
  7. 执行reduce合并计算
  8. 打印测试
  9. 将合并后的数据下沉到hbase
    • 准备hbase的表名、列族名、rowkey名、列名
    • 判断hbase中是否已经存在结果记录
    • 若存在,则获取后进行累加
    • 若不存在,则直接写入

注意:

这个地方,老用户需要注意处理,因为如果不进行判断,就会计算重复的一些用户访问数据

  1. 新用户就是根据clicklog拓宽后的isNew来判断
  2. 老用户需要判断
  • 如果isNew是0,且isHourNew为1/isDayNew为1、isMonthNew为1,则进行老用户为1
  • 否则为0

参考代码

// 1. 添加一个`ChannelFreshness`样例类,它封装要统计的四个业务字段:频道ID(channelID)、日期(date)、新用户(newCount)、老用户(oldCount)
case class ChannelFreshness(var channelID:String,
                            var date:String,
                            var newCount:Long,
                            var oldCount:Long)



object ChannelFreshnessTask {
  // 2. 在`ChannelFreshnessTask`中编写一个`process`方法,接收预处理后的`DataStream`
  def process(clicklogWideDataStream:DataStream[ClickLogWide]) = {

    // 3. 使用flatMap算子,将`ClickLog`对象转换为`ChannelFreshness`
    val channelFreshnessDataStream: DataStream[ChannelFreshness] = clicklogWideDataStream.flatMap {
      clicklog =>
        val isOld = (isNew: Int, isDateNew:Int) => if (isNew == 0 && isDateNew == 1) 1 else 0

        List(
          ChannelFreshness(clicklog.channelID, clicklog.yearMonthDayHour, clicklog.isNew, isOld(clicklog.isNew, clicklog.isHourNew)),
          ChannelFreshness(clicklog.channelID, clicklog.yearMonthDay, clicklog.isNew, isOld(clicklog.isDayNew, clicklog.isDayNew)),
          ChannelFreshness(clicklog.channelID, clicklog.yearMonth, clicklog.isNew, isOld(clicklog.isMonthNew, clicklog.isMonthNew))
        )
    }

    // 4. 按照`频道ID`、`日期`进行分流

    val groupedDateStream: KeyedStream[ChannelFreshness, String] = channelFreshnessDataStream.keyBy {
      freshness =>
        freshness.channelID + freshness.date
    }

    // 5. 划分时间窗口(3秒一个窗口)
    val windowStream: WindowedStream[ChannelFreshness, String, TimeWindow] = groupedDateStream.timeWindow(Time.seconds(3))

    // 6. 执行reduce合并计算
    val reduceDataStream: DataStream[ChannelFreshness] = windowStream.reduce {
      (freshness1, freshness2) =>
        ChannelFreshness(freshness2.channelID, freshness2.date, freshness1.newCount + freshness2.newCount, freshness1.oldCount + freshness2.oldCount)
    }

    // 打印测试
    reduceDataStream.print()

    // 7. 将合并后的数据下沉到hbase
    reduceDataStream.addSink(new SinkFunction[ChannelFreshness] {
      override def invoke(value: ChannelFreshness): Unit = {
        val tableName = "channel_freshness"
        val cfName = "info"
        // 频道ID(channelID)、日期(date)、新用户(newCount)、老用户(oldCount)
        val channelIdColName = "channelID"
        val dateColName = "date"
        val newCountColName = "newCount"
        val oldCountColName = "oldCount"
        val rowkey = value.channelID + ":" + value.date

        // - 判断hbase中是否已经存在结果记录
        val newCountOldCountMap = HBaseUtil.getData(tableName, rowkey, cfName, List(newCountColName, oldCountColName))

        var totalNewCount = 0L
        var totalOldCount = 0L

        // - 若存在,则获取后进行累加
        if(newCountOldCountMap != null && StringUtils.isNotBlank(newCountOldCountMap.getOrElse(newCountColName, ""))) {
          totalNewCount = value.newCount + newCountOldCountMap(newCountColName).toLong
        }
        else {
          totalNewCount = value.newCount
        }
        // - 若不存在,则直接写入

        HBaseUtil.putMapData(tableName, rowkey, cfName, Map(
          channelIdColName -> value.channelID,
          dateColName -> value.date,
          newCountColName -> totalNewCount.toString,
          oldCountColName -> totalOldCount.toString
        ))
      }
    })
  }
}

7.3 模板方法提取公共类

模板方法模式是在父类中定义算法的骨架,把具体实延迟到子类中去,可以在不改变一个算法的结构时可重定义该算法的某些步骤。

BaseTask.scala

package com.abc.realprocess.task

import com.abc.realprocess.bean.ClickLogWide
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// 抽取一个公共的trait, 所有的任务都来实现它
trait BaseTask[T] {

  /**
    * 对原始日志数据流 进行map转换 分组 时间窗口 聚合 落地HBase
    * @param clickLogWideDataStream
    * @return
    */
  def process(clickLogWideDataStream: DataStream[ClickLogWide]):Any={
    val mapDataStream:DataStream[T] = map(clickLogWideDataStream)
    val keyedStream:KeyedStream[T, String] = keyBy(mapDataStream)
    val windowedStream: WindowedStream[T, String, TimeWindow] = timeWindow(keyedStream)
    val reduceDataStream: DataStream[T] = reduce(windowedStream)
    sink2HBase(reduceDataStream)
  }

  // Map转换数据流
  def map(source:DataStream[ClickLogWide]):DataStream[T]

  // 分组
  def keyBy(mapDataStream: DataStream[T]):KeyedStream[T,String]

  // 时间窗口
  def timeWindow(keyedStream: KeyedStream[T, String]):WindowedStream[T, String, TimeWindow]

  // 聚合  
  def reduce(windowedStream: WindowedStream[T, String, TimeWindow]): DataStream[T]

  // 落地HBase
  def sink2HBase(reduceDataStream: DataStream[T])
}

改造后的代码:

// 添加一个`ChannelFreshness`样例类,它封装要统计的四个业务字段:频道ID(channelID)、日期(date)、新用户(newCount)、老用户(oldCount)
case class ChannelFreshness(var channelID: String,
                            var date: String,
                            var newCount: Long,
                            var oldCount: Long)


object ChannelFreshnessTask extends BaseTask[ChannelFreshness] {

  // 1. 使用flatMap算子,将`ClickLog`对象转换为`ChannelFreshness`
  override def map(source: DataStream[ClickLogWide]): DataStream[ChannelFreshness] = {
    source.flatMap {
      clicklog =>
        val isOld = (isNew: Int, isDateNew: Int) => if (isNew == 0 && isDateNew == 1) 1 else 0

        List(
          ChannelFreshness(clicklog.channelID, clicklog.yearMonthDayHour, clicklog.isNew, isOld(clicklog.isNew, clicklog.isHourNew)),
          ChannelFreshness(clicklog.channelID, clicklog.yearMonthDay, clicklog.isNew, isOld(clicklog.isDayNew, clicklog.isDayNew)),
          ChannelFreshness(clicklog.channelID, clicklog.yearMonth, clicklog.isNew, isOld(clicklog.isMonthNew, clicklog.isMonthNew))
        )
    }
  }

  override def keyBy(mapDataStream: DataStream[ChannelFreshness]): KeyedStream[ChannelFreshness, String] = {
    mapDataStream.keyBy {
      freshness =>
        freshness.channelID + freshness.date
    }
  }

  override def timeWindow(keyedStream: KeyedStream[ChannelFreshness, String]): WindowedStream[ChannelFreshness, String, TimeWindow] = {
    keyedStream.timeWindow(Time.seconds(3))
  }

  override def reduce(windowedStream: WindowedStream[ChannelFreshness, String, TimeWindow]): DataStream[ChannelFreshness] = {
    windowedStream.reduce {
      (freshness1, freshness2) =>
        ChannelFreshness(freshness2.channelID, freshness2.date, freshness1.newCount + freshness2.newCount, freshness1.oldCount + freshness2.oldCount)
    }
  }

  override def sink2HBase(reduceDataStream: DataStream[ChannelFreshness]) = {
    reduceDataStream.addSink {
      value => {
        val tableName = "channel_freshness"
        val cfName = "info"
        // 频道ID(channelID)、日期(date)、新用户(newCount)、老用户(oldCount)
        val channelIdColName = "channelID"
        val dateColName = "date"
        val newCountColName = "newCount"
        val oldCountColName = "oldCount"
        val rowkey = value.channelID + ":" + value.date

        // - 判断hbase中是否已经存在结果记录
        val newCountInHBase = HBaseUtil.getData(tableName, rowkey, cfName, newCountColName)
        val oldCountInHBase = HBaseUtil.getData(tableName, rowkey, cfName, oldCountColName)

        var totalNewCount = 0L
        var totalOldCount = 0L

        // 判断hbase中是否有历史的指标数据
        if (StringUtils.isNotBlank(newCountInHBase)) {
          totalNewCount = newCountInHBase.toLong + value.newCount
        }
        else {
          totalNewCount = value.newCount
        }

        if (StringUtils.isNotBlank(oldCountInHBase)) {
          totalOldCount = oldCountInHBase.toLong + value.oldCount
        }
        else {
          totalOldCount = value.oldCount
        }

        // 将合并累计的数据写入到hbase中
        HBaseUtil.putMapData(tableName, rowkey, cfName, Map(
          channelIdColName -> value.channelID,
          dateColName -> value.date,
          newCountColName -> totalNewCount,
          oldCountColName -> totalOldCount
        ))
      }
    }
  }
}

8. 实时频道地域分析业务开发

8.1. 业务介绍


通过地域分析,可以帮助查看地域相关的PV/UV、用户新鲜度。

需要分析出来指标

  • PV
  • UV
  • 新用户
  • 老用户

需要分析的维度

  • 地域(国家省市)——这里为了节省时间,只分析市级的地域维度,其他维度大家可以自己来实现
  • 时间维度(时、天、月)

统计分析后的结果如下:

频道ID地域(国/省/市)时间PVUV新用户老用户
频道1中国北京市朝阳区2018091000300123171
频道1中国北京市朝阳区2018091051212323100
频道1中国北京市朝阳区2018091010100411130

8.2. 业务开发

步骤

  1. 创建频道地域分析样例类(频道、地域(国省市)、时间、PV、UV、新用户、老用户)
  2. 将预处理后的数据,使用flatMap转换为样例类
  3. 按照频道时间地域进行分组(分流)
  4. 划分时间窗口(3秒一个窗口)
  5. 进行合并计数统计
  6. 打印测试
  7. 将计算后的数据下沉到Hbase

实现

  1. 创建一个ChannelAreaTask单例对象
  2. 添加一个ChannelArea样例类,它封装要统计的四个业务字段:频道ID(channelID)、地域(area)、日期(date)pv、uv、新用户(newCount)、老用户(oldCount)
  3. ChannelAreaTask中编写一个process方法,接收预处理后的DataStream
  4. 使用flatMap算子,将ClickLog对象转换为三个不同时间维度ChannelArea
  5. 按照频道ID时间地域进行分流
  6. 划分时间窗口(3秒一个窗口)
  7. 执行reduce合并计算
  8. 打印测试
  9. 将合并后的数据下沉到hbase
    • 准备hbase的表名、列族名、rowkey名、列名
    • 判断hbase中是否已经存在结果记录
    • 若存在,则获取后进行累加
    • 若不存在,则直接写入

参考代码

ChannelFreshnessTask.scala

package com.abc.realprocess.task

import com.abc.realprocess.bean.ClickLogWide
import com.abc.realprocess.util.HBaseUtil
import org.apache.commons.lang.StringUtils
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, WindowedStream}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow


// 添加一个`ChannelFreshness`样例类,它封装要统计的四个业务字段:频道ID(channelID)、日期(date)、新用户(newCount)、老用户(oldCount)
case class ChannelFreshness(var channelID: String,
                            var date: String,
                            var newCount: Long,
                            var oldCount: Long)

object ChannelFreshnessTask extends BaseTask[ChannelFreshness] {

  // 1. 使用flatMap算子,将`ClickLog`对象转换为`ChannelFreshness`
  override def map(source: DataStream[ClickLogWide]): DataStream[ChannelFreshness] = {
    source.flatMap {
      clicklog =>
        val isOld = (isNew: Int, isDateNew: Int) => if (isNew == 0 && isDateNew == 1) 1 else 0

        List(
          ChannelFreshness(clicklog.channelID, clicklog.yearMonthDayHour, clicklog.isNew, isOld(clicklog.isNew, clicklog.isHourNew)),
          ChannelFreshness(clicklog.channelID, clicklog.yearMonthDay, clicklog.isNew, isOld(clicklog.isDayNew, clicklog.isDayNew)),
          ChannelFreshness(clicklog.channelID, clicklog.yearMonth, clicklog.isNew, isOld(clicklog.isMonthNew, clicklog.isMonthNew))
        )
    }
  }

  override def keyBy(mapDataStream: DataStream[ChannelFreshness]): KeyedStream[ChannelFreshness, String] = {
    mapDataStream.keyBy {
      freshness =>
        freshness.channelID + freshness.date
    }
  }

  override def timeWindow(keyedStream: KeyedStream[ChannelFreshness, String]): WindowedStream[ChannelFreshness, String, TimeWindow] = {
    keyedStream.timeWindow(Time.seconds(3))
  }

  override def reduce(windowedStream: WindowedStream[ChannelFreshness, String, TimeWindow]): DataStream[ChannelFreshness] = {
    windowedStream.reduce {
      (freshness1, freshness2) =>
        ChannelFreshness(freshness2.channelID, freshness2.date, freshness1.newCount + freshness2.newCount, freshness1.oldCount + freshness2.oldCount)
    }
  }

  override def sink2HBase(reduceDataStream: DataStream[ChannelFreshness]) = {
    reduceDataStream.addSink {
      value => {
        val tableName = "channel_freshness"
        val cfName = "info"
        // 频道ID(channelID)、日期(date)、新用户(newCount)、老用户(oldCount)
        val channelIdColName = "channelID"
        val dateColName = "date"
        val newCountColName = "newCount"
        val oldCountColName = "oldCount"
        val rowkey = value.channelID + ":" + value.date

        // - 判断hbase中是否已经存在结果记录
        val newCountInHBase = HBaseUtil.getData(tableName, rowkey, cfName, newCountColName)
        val oldCountInHBase = HBaseUtil.getData(tableName, rowkey, cfName, oldCountColName)

        var totalNewCount = 0L
        var totalOldCount = 0L

        // 判断hbase中是否有历史的指标数据
        if (StringUtils.isNotBlank(newCountInHBase)) {
          totalNewCount = newCountInHBase.toLong + value.newCount
        }
        else {
          totalNewCount = value.newCount
        }

        if (StringUtils.isNotBlank(oldCountInHBase)) {
          totalOldCount = oldCountInHBase.toLong + value.oldCount
        }
        else {
          totalOldCount = value.oldCount
        }

        // 将合并累计的数据写入到hbase中
        HBaseUtil.putMapData(tableName, rowkey, cfName, Map(
          channelIdColName -> value.channelID,
          dateColName -> value.date,
          newCountColName -> totalNewCount,
          oldCountColName -> totalOldCount
        ))
      }
    }
  }
}

9. 实时运营商分析业务开发

9.1. 业务介绍

分析出来中国移动、中国联通、中国电信等运营商的指标。来分析,流量的主要来源是哪个运营商的,这样就可以进行较准确的网络推广。

需要分析出来指标

  • PV
  • UV
  • 新用户
  • 老用户

需要分析的维度

  • 运营商
  • 时间维度(时、天、月)

统计分析后的结果如下:

频道ID运营商时间PVUV新用户老用户
频道120180910003000300
频道1中国联通20180910123101
频道1中国电信201809101055220

9.2. 业务开发

步骤

  1. 将预处理后的数据,转换为要分析出来数据(频道、运营商、时间、PV、UV、新用户、老用户)样例类
  2. 按照频道时间运营商进行分组(分流)
  3. 划分时间窗口(3秒一个窗口)
  4. 进行合并计数统计
  5. 打印测试
  6. 将计算后的数据下沉到Hbase

实现

  1. 创建一个ChannelNetworkTask单例对象
  2. 添加一个ChannelNetwork样例类,它封装要统计的四个业务字段:频道ID(channelID)、运营商(network)、日期(date)pv、uv、新用户(newCount)、老用户(oldCount)
  3. ChannelNetworkTask中编写一个process方法,接收预处理后的DataStream
  4. 使用flatMap算子,将ClickLog对象转换为三个不同时间维度ChannelNetwork
  5. 按照频道ID时间运营商进行分流
  6. 划分时间窗口(3秒一个窗口)
  7. 执行reduce合并计算
  8. 打印测试
  9. 将合并后的数据下沉到hbase
    • 准备hbase的表名、列族名、rowkey名、列名
    • 判断hbase中是否已经存在结果记录
    • 若存在,则获取后进行累加
    • 若不存在,则直接写入

参考代码

package com.abc.realprocess.task

import com.abc.realprocess.bean.ClickLogWide
import com.abc.realprocess.util.HBaseUtil
import org.apache.commons.lang.StringUtils
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, WindowedStream}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.windowing.time.Time

// 2. 添加一个`ChannelNetwork`样例类,它封装要统计的四个业务字段:频道ID(channelID)、运营商(network)、日期(date)pv、uv、新用户(newCount)、老用户(oldCount)
case class ChannelNetwork(var channelID: String,
                          var network: String,
                          var date: String,
                          var pv: Long,
                          var uv: Long,
                          var newCount: Long,
                          var oldCount: Long)

object ChannelNetworkTask extends BaseTask[ChannelNetwork] {

  override def map(source: DataStream[ClickLogWide]): DataStream[ChannelNetwork] = {

    source.flatMap {
      clicklog =>
        val isOld = (isNew: Int, isDateNew: Int) => if (isNew == 0 && isDateNew == 1) 1 else 0

        List(
          ChannelNetwork(clicklog.channelID,
            clicklog.network,
            clicklog.yearMonthDayHour,
            clicklog.count,
            clicklog.isHourNew,
            clicklog.isNew,
            isOld(clicklog.isNew, clicklog.isHourNew)), // 小时维度
          ChannelNetwork(clicklog.channelID,
            clicklog.network,
            clicklog.yearMonthDay,
            clicklog.count,
            clicklog.isDayNew,
            clicklog.isNew,
            isOld(clicklog.isNew, clicklog.isDayNew)), // 天维度
          ChannelNetwork(clicklog.channelID,
            clicklog.network,
            clicklog.yearMonth,
            clicklog.count,
            clicklog.isMonthNew,
            clicklog.isNew,
            isOld(clicklog.isNew, clicklog.isMonthNew)) // 月维度
        )
    }
  }

  override def keyBy(mapDataStream: DataStream[ChannelNetwork]): KeyedStream[ChannelNetwork, String] = {
    mapDataStream.keyBy {
      network =>
        network.channelID + network.date + network.network
    }
  }

  override def timeWindow(keyedStream: KeyedStream[ChannelNetwork, String]): WindowedStream[ChannelNetwork, String, TimeWindow] = {
    keyedStream.timeWindow(Time.seconds(3))
  }

  override def reduce(windowedStream: WindowedStream[ChannelNetwork, String, TimeWindow]): DataStream[ChannelNetwork] = {
    windowedStream.reduce {
      (network1, network2) =>
        ChannelNetwork(network2.channelID,
          network2.network,
          network2.date,
          network1.pv + network2.pv,
          network1.uv + network2.uv,
          network1.newCount + network2.newCount,
          network1.oldCount + network2.oldCount)
    }
  }

  override def sink2HBase(reduceDataStream: DataStream[ChannelNetwork]): Unit = {
    reduceDataStream.addSink(new SinkFunction[ChannelNetwork] {
      override def invoke(value: ChannelNetwork): Unit = {
        // - 准备hbase的表名、列族名、rowkey名、列名
        val tableName = "channel_network"
        val cfName = "info"
        // 频道ID(channelID)、运营商(network)、日期(date)pv、uv、新用户(newCount)、老用户(oldCount)
        val rowkey = s"${value.channelID}:${value.date}:${value.network}"
        val channelIdColName = "channelID"
        val networkColName = "network"
        val dateColName = "date"
        val pvColName = "pv"
        val uvColName = "uv"
        val newCountColName = "newCount"
        val oldCountColName = "oldCount"

        // - 判断hbase中是否已经存在结果记录
        val resultMap: Map[String, String] = HBaseUtil.getMapData(tableName, rowkey, cfName, List(
          pvColName,
          uvColName,
          newCountColName,
          oldCountColName
        ))

        var totalPv = 0L
        var totalUv = 0L
        var totalNewCount = 0L
        var totalOldCount = 0L

        if(resultMap != null && resultMap.size > 0 && StringUtils.isNotBlank(resultMap(pvColName))) {
          totalPv = resultMap(pvColName).toLong + value.pv
        }
        else {
          totalPv = value.pv
        }

        if(resultMap != null && resultMap.size > 0 && StringUtils.isNotBlank(resultMap(uvColName))) {
          totalUv = resultMap(uvColName).toLong + value.uv
        }
        else {
          totalUv = value.uv
        }

        if(resultMap != null && resultMap.size > 0 && StringUtils.isNotBlank(resultMap(newCountColName))) {
          totalNewCount = resultMap(newCountColName).toLong + value.newCount
        }
        else {
          totalNewCount = value.newCount
        }

        if(resultMap != null && resultMap.size > 0 && StringUtils.isNotBlank(resultMap(oldCountColName))) {
          totalOldCount = resultMap(oldCountColName).toLong + value.oldCount
        }
        else {
          totalOldCount = value.oldCount
        }

        // 频道ID(channelID)、运营商(network)、日期(date)pv、uv、新用户(newCount)、老用户(oldCount)
        HBaseUtil.putMapData(tableName, rowkey, cfName, Map(
          channelIdColName -> value.channelID,
          networkColName -> value.network,
          dateColName -> value.date,
          pvColName -> totalPv.toString,
          uvColName -> totalUv.toString,
          newCountColName -> totalNewCount.toString,
          oldCountColName -> totalOldCount.toString
        ))
      }
    })
  }
}

10. 实时频道浏览器分析业务开发

10.1. 业务介绍

需要分别统计不同浏览器(或者客户端)的占比

需要分析出来指标

  • PV
  • UV
  • 新用户
  • 老用户

需要分析的维度

  • 浏览器
  • 时间维度(时、天、月)

统计分析后的结果如下:

频道ID浏览器时间PVUV新用户老用户
频道1360浏览器20180910003000300
频道1IE20180910123101
频道1Chrome201809101055220

10.2. 业务开发

步骤

  1. 创建频道浏览器分析样例类(频道、浏览器、时间、PV、UV、新用户、老用户)
  2. 将预处理后的数据,使用flatMap转换为要分析出来数据样例类
  3. 按照频道时间浏览器进行分组(分流)
  4. 划分时间窗口(3秒一个窗口)
  5. 进行合并计数统计
  6. 打印测试
  7. 将计算后的数据下沉到Hbase

实现

  1. 创建一个ChannelBrowserTask单例对象
  2. 添加一个ChannelBrowser样例类,它封装要统计的四个业务字段:频道ID(channelID)、浏览器(browser)、日期(date)pv、uv、新用户(newCount)、老用户(oldCount)
  3. ChannelBrowserTask中编写一个process方法,接收预处理后的DataStream
  4. 使用flatMap算子,将ClickLog对象转换为三个不同时间维度ChannelBrowser
  5. 按照频道ID时间浏览器进行分流
  6. 划分时间窗口(3秒一个窗口)
  7. 执行reduce合并计算
  8. 打印测试
  9. 将合并后的数据下沉到hbase
    • 准备hbase的表名、列族名、rowkey名、列名
    • 判断hbase中是否已经存在结果记录
    • 若存在,则获取后进行累加
    • 若不存在,则直接写入

参考代码

package com.abc.realprocess.task

import com.abc.realprocess.bean.ClickLogWide
import com.abc.realprocess.util.HBaseUtil
import org.apache.commons.lang.StringUtils
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, WindowedStream}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.windowing.time.Time

// 2. 添加一个`ChannelBrowser`样例类,它封装要统计的四个业务字段:频道ID(channelID)、浏览器(browser)、日期(date)pv、uv、新用户(newCount)、老用户(oldCount)
case class ChannelBrowser(var channelID: String,
                          var browser: String,
                          var date: String,
                          var pv: Long,
                          var uv: Long,
                          var newCount: Long,
                          var oldCount: Long)


object ChannelBrowserTask extends BaseTask[ChannelBrowser] {

  override def map(source: DataStream[ClickLogWide]): DataStream[ChannelBrowser] = {

    source.flatMap {
      clicklog =>
        val isOld = (isNew: Int, isDateNew: Int) => if (isNew == 0 && isDateNew == 1) 1 else 0

        List(
          ChannelBrowser(clicklog.channelID,
            clicklog.browserType,
            clicklog.yearMonthDayHour,
            clicklog.count,
            clicklog.isHourNew,
            clicklog.isNew,
            isOld(clicklog.isNew, clicklog.isHourNew)), // 小时维度
          ChannelBrowser(clicklog.channelID,
            clicklog.browserType,
            clicklog.yearMonthDayHour,
            clicklog.count,
            clicklog.isDayNew,
            clicklog.isNew,
            isOld(clicklog.isNew, clicklog.isDayNew)), // 天维度
          ChannelBrowser(clicklog.channelID,
            clicklog.browserType,
            clicklog.yearMonth,
            clicklog.count,
            clicklog.isMonthNew,
            clicklog.isNew,
            isOld(clicklog.isNew, clicklog.isMonthNew)) // 月维度
        )
    }
  }

  override def keyBy(mapDataStream: DataStream[ChannelBrowser]): KeyedStream[ChannelBrowser, String] = {
    mapDataStream.keyBy {
      broswer =>
        broswer.channelID + broswer.date + broswer.browser
    }
  }

  override def timeWindow(keyedStream: KeyedStream[ChannelBrowser, String]): WindowedStream[ChannelBrowser, String, TimeWindow] = {
    keyedStream.timeWindow(Time.seconds(3))
  }

  override def reduce(windowedStream: WindowedStream[ChannelBrowser, String, TimeWindow]): DataStream[ChannelBrowser] = {
    windowedStream.reduce {
      (broswer1, broswer2) =>
        ChannelBrowser(broswer2.channelID,
          broswer2.browser,
          broswer2.date,
          broswer1.pv + broswer2.pv,
          broswer1.uv + broswer2.uv,
          broswer1.newCount + broswer2.newCount,
          broswer1.oldCount + broswer2.oldCount)
    }
  }

  override def sink2HBase(reduceDataStream: DataStream[ChannelBrowser]): Unit = {

    reduceDataStream.addSink(new SinkFunction[ChannelBrowser] {
      override def invoke(value: ChannelBrowser): Unit = {
        // - 准备hbase的表名、列族名、rowkey名、列名
        val tableName = "channel_broswer"
        val cfName = "info"
        // 频道ID(channelID)、浏览器(browser)、日期(date)pv、uv、新用户(newCount)、老用户(oldCount)
        val rowkey = s"${value.channelID}:${value.date}:${value.browser}"
        val channelIDColName = "channelID"
        val broswerColName = "browser"
        val dateColName = "date"
        val pvColName = "pv"
        val uvColName = "uv"
        val newCountColName = "newCount"
        val oldCountColName = "oldCount"

        var totalPv = 0L
        var totalUv = 0L
        var totalNewCount = 0L
        var totalOldCount = 0L

        val resultMap: Map[String, String] = HBaseUtil.getMapData(tableName, rowkey, cfName, List(
          pvColName,
          uvColName,
          newCountColName,
          oldCountColName
        ))

        // 计算PV,如果Hbase中存在pv数据,就直接进行累加

        if (resultMap != null && resultMap.size > 0 && StringUtils.isNotBlank(resultMap(pvColName))) {
          totalPv = resultMap(pvColName).toLong + value.pv
        }
        else {
          totalPv = value.pv
        }

        if (resultMap != null && resultMap.size > 0 && StringUtils.isNotBlank(resultMap(uvColName))) {
          totalUv = resultMap(uvColName).toLong + value.uv
        }
        else {
          totalUv = value.uv
        }


        // - 判断hbase中是否已经存在结果记录
        // - 若存在,则获取后进行累加
        // - 若不存在,则直接写入
        if (resultMap != null && resultMap.size > 0 && StringUtils.isNotBlank(resultMap(newCountColName))) {
          totalNewCount = resultMap(newCountColName).toLong + value.newCount
        }
        else {
          totalNewCount = value.newCount
        }

        if (resultMap != null && resultMap.size > 0 && StringUtils.isNotBlank(resultMap(oldCountColName))) {
          totalOldCount = resultMap(oldCountColName).toLong + value.oldCount
        }
        else {
          totalOldCount = value.oldCount
        }

        // 频道ID(channelID)、浏览器(browser)、日期(date)pv、uv、新用户(newCount)、老用户(oldCount)
        HBaseUtil.putMapData(tableName, rowkey, cfName, Map(
          channelIDColName -> value.channelID,
          broswerColName -> value.browser,
          dateColName -> value.date,
          pvColName -> totalPv.toString,
          uvColName -> totalUv.toString,
          newCountColName -> totalNewCount.toString,
          oldCountColName -> totalOldCount.toString
        ))
      }
    })
  }
}
如果你觉得这篇文章或者我分享的主题对你有帮助,请支持我继续更新网站和主题 !捐赠本站
喜欢 (0)
发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址