
Flink E-Commerce Metrics Analysis Project (Part 3)


1. Extracting a Common Base Class with the Template Method

Template method

The template method pattern defines the skeleton of an algorithm in a parent class and defers the concrete implementation to subclasses, so that individual steps of the algorithm can be redefined without changing its overall structure.

We have already written the analysis code for three business metrics, and each one follows the same five-part structure. We can optimize this by extracting a template class, so that every task class executes its steps in the order the template defines.

BaseTask.scala

package com.abc.realprocess.task

import com.abc.realprocess.bean.ClickLogWide
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Extract a common trait that every task implements
trait BaseTask[T] {
  /**
    * Runs the raw click-log stream through map, keyBy, time window, reduce and the HBase sink
    * @param clickLogWideDataStream the preprocessed wide click-log stream
    * @return
    */
  def process(clickLogWideDataStream: DataStream[ClickLogWide]): Any = {
    val mapDataStream: DataStream[T] = map(clickLogWideDataStream)
    val keyedStream: KeyedStream[T, String] = keyBy(mapDataStream)
    val windowedStream: WindowedStream[T, String, TimeWindow] = timeWindow(keyedStream)
    val reduceDataStream: DataStream[T] = reduce(windowedStream)
    sink2HBase(reduceDataStream)
  }

  // Map/convert the stream
  def map(source: DataStream[ClickLogWide]): DataStream[T]

  // Group (keyBy)
  def keyBy(mapDataStream: DataStream[T]): KeyedStream[T, String]

  // Time window
  def timeWindow(keyedStream: KeyedStream[T, String]): WindowedStream[T, String, TimeWindow]

  // Aggregate
  def reduce(windowedStream: WindowedStream[T, String, TimeWindow]): DataStream[T]

  // Sink to HBase
  def sink2HBase(reduceDataStream: DataStream[T])
}
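
With this trait in place, every metric task plugs into the driver program in the same way. The snippet below is a minimal sketch (not taken from the project) of how the driver object might invoke the tasks, assuming clickLogWideDataStream is the preprocessed DataStream[ClickLogWide] built earlier in the series; ChannelAreaTask, ChannelNetworkTask and ChannelBrowserTask are introduced in the sections that follow.

// Hypothetical helper inside the driver object: every metric follows the same BaseTask lifecycle,
// so adding a new metric is a single call.
def runRealtimeTasks(clickLogWideDataStream: DataStream[ClickLogWide]): Unit = {
  ChannelFreshnessTask.process(clickLogWideDataStream)
  ChannelAreaTask.process(clickLogWideDataStream)     // section 2
  ChannelNetworkTask.process(clickLogWideDataStream)  // section 3
  ChannelBrowserTask.process(clickLogWideDataStream)  // section 4
}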

The refactored code (ChannelFreshnessTask.scala):

package com.abc.realprocess.task

import com.abc.realprocess.bean.ClickLogWide
import com.abc.realprocess.util.HBaseUtil
import org.apache.commons.lang.StringUtils
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Add a `ChannelFreshness` case class wrapping the four business fields to aggregate:
// channel ID (channelID), date (date), new users (newCount), returning users (oldCount)
case class ChannelFreshness(var channelID: String,
                            var date: String,
                            var newCount: Long,
                            var oldCount: Long)


object ChannelFreshnessTask extends BaseTask[ChannelFreshness] {

  // 1. Use the flatMap operator to convert each `ClickLogWide` into `ChannelFreshness` records
  override def map(source: DataStream[ClickLogWide]): DataStream[ChannelFreshness] = {
    source.flatMap {
      clicklog =>
        val isOld = (isNew: Int, isDateNew: Int) => if (isNew == 0 && isDateNew == 1) 1 else 0

        List(
          ChannelFreshness(clicklog.channelID, clicklog.yearMonthDayHour, clicklog.isNew, isOld(clicklog.isNew, clicklog.isHourNew)),
          ChannelFreshness(clicklog.channelID, clicklog.yearMonthDay, clicklog.isNew, isOld(clicklog.isNew, clicklog.isDayNew)),
          ChannelFreshness(clicklog.channelID, clicklog.yearMonth, clicklog.isNew, isOld(clicklog.isNew, clicklog.isMonthNew))
        )
    }
  }

  override def keyBy(mapDataStream: DataStream[ChannelFreshness]): KeyedStream[ChannelFreshness, String] = {
    mapDataStream.keyBy {
      freshness =>
        freshness.channelID + freshness.date
    }
  }

  override def timeWindow(keyedStream: KeyedStream[ChannelFreshness, String]): WindowedStream[ChannelFreshness, String, TimeWindow] = {
    keyedStream.timeWindow(Time.seconds(3))
  }

  override def reduce(windowedStream: WindowedStream[ChannelFreshness, String, TimeWindow]): DataStream[ChannelFreshness] = {
    windowedStream.reduce {
      (freshness1, freshness2) =>
        ChannelFreshness(freshness2.channelID, freshness2.date, freshness1.newCount + freshness2.newCount, freshness1.oldCount + freshness2.oldCount)
    }
  }

  override def sink2HBase(reduceDataStream: DataStream[ChannelFreshness]) = {
    reduceDataStream.addSink {
      value => {
        val tableName = "channel_freshness"
        val cfName = "info"
        // channel ID (channelID), date (date), new users (newCount), returning users (oldCount)
        val channelIdColName = "channelID"
        val dateColName = "date"
        val newCountColName = "newCount"
        val oldCountColName = "oldCount"
        val rowkey = value.channelID + ":" + value.date

        // Check whether a result record already exists in HBase
        val newCountInHBase = HBaseUtil.getData(tableName, rowkey, cfName, newCountColName)
        val oldCountInHBase = HBaseUtil.getData(tableName, rowkey, cfName, oldCountColName)

        var totalNewCount = 0L
        var totalOldCount = 0L

        // If HBase already holds historical metric values, accumulate on top of them
        if (StringUtils.isNotBlank(newCountInHBase)) {
          totalNewCount = newCountInHBase.toLong + value.newCount
        }
        else {
          totalNewCount = value.newCount
        }

        if (StringUtils.isNotBlank(oldCountInHBase)) {
          totalOldCount = oldCountInHBase.toLong + value.oldCount
        }
        else {
          totalOldCount = value.oldCount
        }

        // Write the accumulated values back to HBase
        HBaseUtil.putMapData(tableName, rowkey, cfName, Map(
          channelIdColName -> value.channelID,
          dateColName -> value.date,
          newCountColName -> totalNewCount,
          oldCountColName -> totalOldCount
        ))
      }
    }
  }
}
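
All of the sinks in this article delegate to the HBaseUtil helper written in an earlier part of the series. For reference, the signatures this code relies on look roughly like the following; this is a sketch inferred from the call sites above, not the actual implementation.

// Sketch of the HBaseUtil API assumed by the sinks (inferred from usage; implementations omitted)
object HBaseUtil {
  // Read one column of one row; returns a blank string when the cell does not exist
  def getData(tableName: String, rowkey: String, cfName: String, colName: String): String = ???

  // Read several columns of one row as a columnName -> value map
  def getMapData(tableName: String, rowkey: String, cfName: String, colNames: List[String]): Map[String, String] = ???

  // Write several columns of one row in a single put
  def putMapData(tableName: String, rowkey: String, cfName: String, map: Map[String, Any]): Unit = ???
}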

2. Real-Time Channel Region Analysis

2.1. Business overview

Region analysis lets us look at PV/UV and user freshness by geographic region.

Metrics to compute

  • PV
  • UV
  • New users
  • Returning users

Dimensions to analyze

  • Region (country/province/city). To save time, only the city-level dimension is analyzed here; the other levels can be implemented the same way
  • Time (hour, day, month)

The aggregated result looks like this:

Channel ID | Region (country/province/city) | Time       | PV   | UV  | New users | Returning users
Channel 1  | China / Beijing / Chaoyang     | 201809     | 1000 | 300 | 123       | 171
Channel 1  | China / Beijing / Chaoyang     | 20180910   | 512  | 123 | 23        | 100
Channel 1  | China / Beijing / Chaoyang     | 2018091010 | 100  | 41  | 11        | 30

2.2. Development

Steps

  1. Create a channel-region case class (channel, region (country/province/city), time, PV, UV, new users, returning users)
  2. Use flatMap to convert the preprocessed data into the case class
  3. Group (key) the stream by channel, time and region
  4. Divide the stream into time windows (one window every 3 seconds)
  5. Merge and count within each window
  6. Print for testing
  7. Sink the computed data to HBase

Implementation

  1. Create a ChannelAreaTask singleton object
  2. Add a ChannelArea case class wrapping the business fields to aggregate: channel ID (channelID), region (area), date (date), PV, UV, new users (newCount), returning users (oldCount)
  3. ChannelAreaTask receives the preprocessed DataStream through its process method
  4. Use the flatMap operator to convert each ClickLogWide into a ChannelArea for each of the three time dimensions
  5. Key the stream by channel ID, time and region
  6. Divide the stream into time windows (one window every 3 seconds)
  7. Execute the reduce merge computation
  8. Print for testing
  9. Sink the merged data to HBase
    • Prepare the HBase table name, column family, rowkey and column names
    • Check whether a result record already exists in HBase
    • If it exists, read it and accumulate
    • If not, write the new value directly

Reference code

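The code below is a sketch of ChannelAreaTask that follows the steps above and mirrors the structure of the other tasks in this article. The HBase table name (channel_area) and the way the region string is assembled from ClickLogWide (assumed country, province and city fields) are illustrative; adjust them to your actual bean and schema.

ChannelAreaTask.scala

package com.abc.realprocess.task

import com.abc.realprocess.bean.ClickLogWide
import com.abc.realprocess.util.HBaseUtil
import org.apache.commons.lang.StringUtils
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Case class wrapping the business fields: channel ID (channelID), region (area), date (date),
// pv, uv, new users (newCount), returning users (oldCount)
case class ChannelArea(var channelID: String,
                       var area: String,
                       var date: String,
                       var pv: Long,
                       var uv: Long,
                       var newCount: Long,
                       var oldCount: Long)

object ChannelAreaTask extends BaseTask[ChannelArea] {

  override def map(source: DataStream[ClickLogWide]): DataStream[ChannelArea] = {
    source.flatMap {
      clicklog =>
        val isOld = (isNew: Int, isDateNew: Int) => if (isNew == 0 && isDateNew == 1) 1 else 0
        // Assumed region fields -- replace with the actual fields of your ClickLogWide bean
        val area = clicklog.country + clicklog.province + clicklog.city

        List(
          ChannelArea(clicklog.channelID, area, clicklog.yearMonthDayHour, clicklog.count,
            clicklog.isHourNew, clicklog.isNew, isOld(clicklog.isNew, clicklog.isHourNew)),  // hour dimension
          ChannelArea(clicklog.channelID, area, clicklog.yearMonthDay, clicklog.count,
            clicklog.isDayNew, clicklog.isNew, isOld(clicklog.isNew, clicklog.isDayNew)),    // day dimension
          ChannelArea(clicklog.channelID, area, clicklog.yearMonth, clicklog.count,
            clicklog.isMonthNew, clicklog.isNew, isOld(clicklog.isNew, clicklog.isMonthNew)) // month dimension
        )
    }
  }

  override def keyBy(mapDataStream: DataStream[ChannelArea]): KeyedStream[ChannelArea, String] = {
    mapDataStream.keyBy(a => a.channelID + a.date + a.area)
  }

  override def timeWindow(keyedStream: KeyedStream[ChannelArea, String]): WindowedStream[ChannelArea, String, TimeWindow] = {
    keyedStream.timeWindow(Time.seconds(3))
  }

  override def reduce(windowedStream: WindowedStream[ChannelArea, String, TimeWindow]): DataStream[ChannelArea] = {
    windowedStream.reduce {
      (a1, a2) =>
        ChannelArea(a2.channelID, a2.area, a2.date,
          a1.pv + a2.pv, a1.uv + a2.uv, a1.newCount + a2.newCount, a1.oldCount + a2.oldCount)
    }
  }

  override def sink2HBase(reduceDataStream: DataStream[ChannelArea]): Unit = {
    reduceDataStream.addSink(new SinkFunction[ChannelArea] {
      override def invoke(value: ChannelArea): Unit = {
        val tableName = "channel_area" // assumed table name
        val cfName = "info"
        val rowkey = s"${value.channelID}:${value.date}:${value.area}"

        // Read any existing totals for this rowkey and accumulate on top of them
        val resultMap: Map[String, String] = HBaseUtil.getMapData(tableName, rowkey, cfName,
          List("pv", "uv", "newCount", "oldCount"))

        def total(col: String, current: Long): Long =
          if (resultMap != null && resultMap.contains(col) && StringUtils.isNotBlank(resultMap(col)))
            resultMap(col).toLong + current
          else current

        HBaseUtil.putMapData(tableName, rowkey, cfName, Map(
          "channelID" -> value.channelID,
          "area" -> value.area,
          "date" -> value.date,
          "pv" -> total("pv", value.pv).toString,
          "uv" -> total("uv", value.uv).toString,
          "newCount" -> total("newCount", value.newCount).toString,
          "oldCount" -> total("oldCount", value.oldCount).toString
        ))
      }
    })
  }
}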

3. Real-Time Carrier Analysis

3.1. Business overview

Compute the metrics broken down by carrier (China Mobile, China Unicom, China Telecom, and so on). Knowing which carrier most of the traffic comes from allows much more targeted network promotion.

Metrics to compute

  • PV
  • UV
  • New users
  • Returning users

Dimensions to analyze

  • Carrier
  • Time (hour, day, month)

The aggregated result looks like this:

Channel ID | Carrier       | Time       | PV   | UV  | New users | Returning users
Channel 1  | China Unicom  | 201809     | 1000 | 300 | 0         | 300
Channel 1  | China Unicom  | 20180910   | 123  | 1   | 0         | 1
Channel 1  | China Telecom | 2018091010 | 55   | 2   | 2         | 0

3.2. Development

Steps

  1. Convert the preprocessed data into the target case class (channel, carrier, time, PV, UV, new users, returning users)
  2. Group (key) the stream by channel, time and carrier
  3. Divide the stream into time windows (one window every 3 seconds)
  4. Merge and count within each window
  5. Print for testing
  6. Sink the computed data to HBase

Implementation

  1. Create a ChannelNetworkTask singleton object
  2. Add a ChannelNetwork case class wrapping the business fields to aggregate: channel ID (channelID), carrier (network), date (date), PV, UV, new users (newCount), returning users (oldCount)
  3. ChannelNetworkTask receives the preprocessed DataStream through its process method
  4. Use the flatMap operator to convert each ClickLogWide into a ChannelNetwork for each of the three time dimensions
  5. Key the stream by channel ID, time and carrier
  6. Divide the stream into time windows (one window every 3 seconds)
  7. Execute the reduce merge computation
  8. Print for testing
  9. Sink the merged data to HBase
    • Prepare the HBase table name, column family, rowkey and column names
    • Check whether a result record already exists in HBase
    • If it exists, read it and accumulate
    • If not, write the new value directly

Reference code (ChannelNetworkTask.scala)

package com.abc.realprocess.task

import com.abc.realprocess.bean.ClickLogWide
import com.abc.realprocess.util.HBaseUtil
import org.apache.commons.lang.StringUtils
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, WindowedStream}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.windowing.time.Time

// 2. Add a `ChannelNetwork` case class wrapping the business fields to aggregate: channel ID (channelID), carrier (network), date (date), pv, uv, new users (newCount), returning users (oldCount)
case class ChannelNetwork(var channelID: String,
                          var network: String,
                          var date: String,
                          var pv: Long,
                          var uv: Long,
                          var newCount: Long,
                          var oldCount: Long)

object ChannelNetworkTask extends BaseTask[ChannelNetwork] {

  override def map(source: DataStream[ClickLogWide]): DataStream[ChannelNetwork] = {

    source.flatMap {
      clicklog =>
        val isOld = (isNew: Int, isDateNew: Int) => if (isNew == 0 && isDateNew == 1) 1 else 0

        List(
          ChannelNetwork(clicklog.channelID,
            clicklog.network,
            clicklog.yearMonthDayHour,
            clicklog.count,
            clicklog.isHourNew,
            clicklog.isNew,
            isOld(clicklog.isNew, clicklog.isHourNew)), // hour dimension
          ChannelNetwork(clicklog.channelID,
            clicklog.network,
            clicklog.yearMonthDay,
            clicklog.count,
            clicklog.isDayNew,
            clicklog.isNew,
            isOld(clicklog.isNew, clicklog.isDayNew)), // day dimension
          ChannelNetwork(clicklog.channelID,
            clicklog.network,
            clicklog.yearMonth,
            clicklog.count,
            clicklog.isMonthNew,
            clicklog.isNew,
            isOld(clicklog.isNew, clicklog.isMonthNew)) // month dimension
        )
    }
  }

  override def keyBy(mapDataStream: DataStream[ChannelNetwork]): KeyedStream[ChannelNetwork, String] = {
    mapDataStream.keyBy {
      network =>
        network.channelID + network.date + network.network
    }
  }

  override def timeWindow(keyedStream: KeyedStream[ChannelNetwork, String]): WindowedStream[ChannelNetwork, String, TimeWindow] = {
    keyedStream.timeWindow(Time.seconds(3))
  }

  override def reduce(windowedStream: WindowedStream[ChannelNetwork, String, TimeWindow]): DataStream[ChannelNetwork] = {
    windowedStream.reduce {
      (network1, network2) =>
        ChannelNetwork(network2.channelID,
          network2.network,
          network2.date,
          network1.pv + network2.pv,
          network1.uv + network2.uv,
          network1.newCount + network2.newCount,
          network1.oldCount + network2.oldCount)
    }
  }

  override def sink2HBase(reduceDataStream: DataStream[ChannelNetwork]): Unit = {
    reduceDataStream.addSink(new SinkFunction[ChannelNetwork] {
      override def invoke(value: ChannelNetwork): Unit = {
        // Prepare the HBase table name, column family, rowkey and column names
        val tableName = "channel_network"
        val cfName = "info"
        // channel ID (channelID), carrier (network), date (date), pv, uv, new users (newCount), returning users (oldCount)
        val rowkey = s"${value.channelID}:${value.date}:${value.network}"
        val channelIdColName = "channelID"
        val networkColName = "network"
        val dateColName = "date"
        val pvColName = "pv"
        val uvColName = "uv"
        val newCountColName = "newCount"
        val oldCountColName = "oldCount"

        // Check whether a result record already exists in HBase
        val resultMap: Map[String, String] = HBaseUtil.getMapData(tableName, rowkey, cfName, List(
          pvColName,
          uvColName,
          newCountColName,
          oldCountColName
        ))

        var totalPv = 0L
        var totalUv = 0L
        var totalNewCount = 0L
        var totalOldCount = 0L

        if(resultMap != null && resultMap.size > 0 && StringUtils.isNotBlank(resultMap(pvColName))) {
          totalPv = resultMap(pvColName).toLong + value.pv
        }
        else {
          totalPv = value.pv
        }

        if(resultMap != null && resultMap.size > 0 && StringUtils.isNotBlank(resultMap(uvColName))) {
          totalUv = resultMap(uvColName).toLong + value.uv
        }
        else {
          totalUv = value.uv
        }

        if(resultMap != null && resultMap.size > 0 && StringUtils.isNotBlank(resultMap(newCountColName))) {
          totalNewCount = resultMap(newCountColName).toLong + value.newCount
        }
        else {
          totalNewCount = value.newCount
        }

        if(resultMap != null && resultMap.size > 0 && StringUtils.isNotBlank(resultMap(oldCountColName))) {
          totalOldCount = resultMap(oldCountColName).toLong + value.oldCount
        }
        else {
          totalOldCount = value.oldCount
        }

        // channel ID (channelID), carrier (network), date (date), pv, uv, new users (newCount), returning users (oldCount)
        HBaseUtil.putMapData(tableName, rowkey, cfName, Map(
          channelIdColName -> value.channelID,
          networkColName -> value.network,
          dateColName -> value.date,
          pvColName -> totalPv.toString,
          uvColName -> totalUv.toString,
          newCountColName -> totalNewCount.toString,
          oldCountColName -> totalOldCount.toString
        ))
      }
    })
  }
}

4. Real-Time Channel Browser Analysis

4.1. Business overview

We need to compute the share of traffic coming from each browser (or client).

Metrics to compute

  • PV
  • UV
  • New users
  • Returning users

Dimensions to analyze

  • Browser
  • Time (hour, day, month)

The aggregated result looks like this:

Channel ID | Browser     | Time       | PV   | UV  | New users | Returning users
Channel 1  | 360 Browser | 201809     | 1000 | 300 | 0         | 300
Channel 1  | IE          | 20180910   | 123  | 1   | 0         | 1
Channel 1  | Chrome      | 2018091010 | 55   | 2   | 2         | 0

4.2. Development

Steps

  1. Create a channel-browser case class (channel, browser, time, PV, UV, new users, returning users)
  2. Use flatMap to convert the preprocessed data into the case class
  3. Group (key) the stream by channel, time and browser
  4. Divide the stream into time windows (one window every 3 seconds)
  5. Merge and count within each window
  6. Print for testing
  7. Sink the computed data to HBase

Implementation

  1. Create a ChannelBrowserTask singleton object
  2. Add a ChannelBrowser case class wrapping the business fields to aggregate: channel ID (channelID), browser (browser), date (date), PV, UV, new users (newCount), returning users (oldCount)
  3. ChannelBrowserTask receives the preprocessed DataStream through its process method
  4. Use the flatMap operator to convert each ClickLogWide into a ChannelBrowser for each of the three time dimensions
  5. Key the stream by channel ID, time and browser
  6. Divide the stream into time windows (one window every 3 seconds)
  7. Execute the reduce merge computation
  8. Print for testing
  9. Sink the merged data to HBase
    • Prepare the HBase table name, column family, rowkey and column names
    • Check whether a result record already exists in HBase
    • If it exists, read it and accumulate
    • If not, write the new value directly

Reference code (ChannelBrowserTask.scala)

package com.abc.realprocess.task

import com.abc.realprocess.bean.ClickLogWide
import com.abc.realprocess.util.HBaseUtil
import org.apache.commons.lang.StringUtils
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, WindowedStream}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.windowing.time.Time

// 2. Add a `ChannelBrowser` case class wrapping the business fields to aggregate: channel ID (channelID), browser (browser), date (date), pv, uv, new users (newCount), returning users (oldCount)
case class ChannelBrowser(var channelID: String,
                          var browser: String,
                          var date: String,
                          var pv: Long,
                          var uv: Long,
                          var newCount: Long,
                          var oldCount: Long)


object ChannelBrowserTask extends BaseTask[ChannelBrowser] {

  override def map(source: DataStream[ClickLogWide]): DataStream[ChannelBrowser] = {

    source.flatMap {
      clicklog =>
        val isOld = (isNew: Int, isDateNew: Int) => if (isNew == 0 && isDateNew == 1) 1 else 0

        List(
          ChannelBrowser(clicklog.channelID,
            clicklog.browserType,
            clicklog.yearMonthDayHour,
            clicklog.count,
            clicklog.isHourNew,
            clicklog.isNew,
            isOld(clicklog.isNew, clicklog.isHourNew)), // hour dimension
          ChannelBrowser(clicklog.channelID,
            clicklog.browserType,
            clicklog.yearMonthDay,
            clicklog.count,
            clicklog.isDayNew,
            clicklog.isNew,
            isOld(clicklog.isNew, clicklog.isDayNew)), // day dimension
          ChannelBrowser(clicklog.channelID,
            clicklog.browserType,
            clicklog.yearMonth,
            clicklog.count,
            clicklog.isMonthNew,
            clicklog.isNew,
            isOld(clicklog.isNew, clicklog.isMonthNew)) // month dimension
        )
    }
  }

  override def keyBy(mapDataStream: DataStream[ChannelBrowser]): KeyedStream[ChannelBrowser, String] = {
    mapDataStream.keyBy {
      browser =>
        browser.channelID + browser.date + browser.browser
    }
  }

  override def timeWindow(keyedStream: KeyedStream[ChannelBrowser, String]): WindowedStream[ChannelBrowser, String, TimeWindow] = {
    keyedStream.timeWindow(Time.seconds(3))
  }

  override def reduce(windowedStream: WindowedStream[ChannelBrowser, String, TimeWindow]): DataStream[ChannelBrowser] = {
    windowedStream.reduce {
      (browser1, browser2) =>
        ChannelBrowser(browser2.channelID,
          browser2.browser,
          browser2.date,
          browser1.pv + browser2.pv,
          browser1.uv + browser2.uv,
          browser1.newCount + browser2.newCount,
          browser1.oldCount + browser2.oldCount)
    }
  }

  override def sink2HBase(reduceDataStream: DataStream[ChannelBrowser]): Unit = {

    reduceDataStream.addSink(new SinkFunction[ChannelBrowser] {
      override def invoke(value: ChannelBrowser): Unit = {
        // Prepare the HBase table name, column family, rowkey and column names
        val tableName = "channel_broswer"
        val cfName = "info"
        // channel ID (channelID), browser (browser), date (date), pv, uv, new users (newCount), returning users (oldCount)
        val rowkey = s"${value.channelID}:${value.date}:${value.browser}"
        val channelIDColName = "channelID"
        val browserColName = "browser"
        val dateColName = "date"
        val pvColName = "pv"
        val uvColName = "uv"
        val newCountColName = "newCount"
        val oldCountColName = "oldCount"

        var totalPv = 0L
        var totalUv = 0L
        var totalNewCount = 0L
        var totalOldCount = 0L

        val resultMap: Map[String, String] = HBaseUtil.getMapData(tableName, rowkey, cfName, List(
          pvColName,
          uvColName,
          newCountColName,
          oldCountColName
        ))

        // Compute PV: if HBase already holds a pv value, accumulate on top of it

        if (resultMap != null && resultMap.size > 0 && StringUtils.isNotBlank(resultMap(pvColName))) {
          totalPv = resultMap(pvColName).toLong + value.pv
        }
        else {
          totalPv = value.pv
        }

        if (resultMap != null && resultMap.size > 0 && StringUtils.isNotBlank(resultMap(uvColName))) {
          totalUv = resultMap(uvColName).toLong + value.uv
        }
        else {
          totalUv = value.uv
        }


        // Check whether a result record already exists in HBase
        // If it exists, read it and accumulate
        // If not, write the new value directly
        if (resultMap != null && resultMap.size > 0 && StringUtils.isNotBlank(resultMap(newCountColName))) {
          totalNewCount = resultMap(newCountColName).toLong + value.newCount
        }
        else {
          totalNewCount = value.newCount
        }

        if (resultMap != null && resultMap.size > 0 && StringUtils.isNotBlank(resultMap(oldCountColName))) {
          totalOldCount = resultMap(oldCountColName).toLong + value.oldCount
        }
        else {
          totalOldCount = value.oldCount
        }

        // channel ID (channelID), browser (browser), date (date), pv, uv, new users (newCount), returning users (oldCount)
        HBaseUtil.putMapData(tableName, rowkey, cfName, Map(
          channelIDColName -> value.channelID,
          browserColName -> value.browser,
          dateColName -> value.date,
          pvColName -> totalPv.toString,
          uvColName -> totalUv.toString,
          newCountColName -> totalNewCount.toString,
          oldCountColName -> totalOldCount.toString
        ))
      }
    })
  }
}

5. Real-Time Data Synchronization System

5.1. Goals

  • Understand the canal data synchronization solution
  • Install canal
  • Implement the Flink data synchronization system

5.2. Requirements analysis

5.2.1. Business scenario

A large e-commerce site needs to analyze each day's transaction volume. Running this analysis directly on MySQL would be very slow and could even bring MySQL down. To analyze data at this scale, the MySQL data has to be synchronized to a big-data store such as HDFS or HBase. So how do we export it?

5.2.2. Solution 1: Sqoop

  1. Use Sqoop to periodically export MySQL data to HBase or HDFS
  2. To export the data, Sqoop has to query MySQL with SQL statements and then perform the export

Problems

MySQL is already under heavy load; when Sqoop exports it runs additional SQL queries, which increases the load further and slows MySQL down even more.

5.2.3. Solution 2: Logstash

  1. Use Logstash to extract MySQL data into Kafka
  2. Logstash queries MySQL with hand-written SQL statements

Problems

Logstash also has to run SQL against MySQL, so it too increases the load and slows MySQL down.

5.2.4. Solution 3: Canal

  1. Canal obtains the data by parsing MySQL's binlog
  2. No SQL queries against MySQL are needed, so it adds no extra load on MySQL

6. Canal Data Collection Platform

6.1. Installing MySQL

6.2. Creating the test tables in MySQL

Steps

  1. Create the pyg database
  2. Create the tables

Implementation

SQLyog is recommended for creating the database and tables.

  1. Create the pyg database
  2. Paste the 创建表.sql script from 资料\mysql脚本\ into SQLyog and execute it to create the tables

6.3. About the binlog

  • Records insert, update and delete operations in MySQL
  • SELECT operations are not written to the binlog
  • The binlog feature must be enabled in MySQL before any binlog files are generated
  • The binlog is simply a series of binary files:
    -rw-rw---- 1 mysql mysql   669 Nov 10 21:29 mysql-bin.000001
    -rw-rw---- 1 mysql mysql   126 Nov 10 22:06 mysql-bin.000002
    -rw-rw---- 1 mysql mysql 11799 Nov 15 18:17 mysql-bin.00000

6.4. Enabling the binlog

Steps

  1. Edit the MySQL configuration file to enable the binlog
  2. Restart MySQL and check that the binlog has been configured successfully

Implementation

  1. Open /etc/my.cnf with vi
  2. Add the following configuration
[mysqld]
log-bin=/var/lib/mysql/mysql-bin 
binlog-format=ROW
server_id=1

Configuration notes

# Store the binlog files under /var/lib/mysql, with file names starting with mysql-bin
log-bin=/var/lib/mysql/mysql-bin

# Record every row-level change in full detail
binlog-format=ROW

# Server ID of this machine (must be unique within a MySQL cluster)
server_id=1

3. Restart MySQL

service mysql restart

4. Log in with mysql -u root -p and run the following command

show variables like '%log_bin%';

5. If MySQL prints the following, the binlog has been enabled successfully

+---------------------------------+--------------------------------+
| Variable_name                   | Value                          |
+---------------------------------+--------------------------------+
| log_bin                         | ON                             |
| log_bin_basename                | /var/lib/mysql/mysql-bin       |
| log_bin_index                   | /var/lib/mysql/mysql-bin.index |
| log_bin_trust_function_creators | OFF                            |
| log_bin_use_v1_row_events       | OFF                            |
| sql_log_bin                     | ON                             |
+---------------------------------+--------------------------------+
6 rows in set (0.00 sec)

6. Change into /var/lib/mysql and check that mysql-bin.000001 has been generated

6.5. Installing Canal

6.5.1. About Canal

  • Canal is an open-source Alibaba project written in Java
  • It is built specifically for database synchronization
  • It currently supports MySQL (and MariaDB)

6.5.2. How Canal works

  1. Canal emulates the MySQL slave protocol and presents itself to the master as a MySQL slave
  2. It sends a dump request to the MySQL master
  3. The master receives the dump request and starts pushing the binary log to the slave (Canal)
  4. Canal parses the binary log byte stream

6.5.3. Installation

Steps

  1. Upload the Canal installation package
  2. Extract Canal
  3. Configure Canal
  4. Start Canal

Implementation

  1. Upload 资料\软件包\canal.deployer-1.0.24.tar.gz to the /export/software directory
  2. Create a canal directory under /export/servers; Canal will be extracted straight into it
cd /export/servers
mkdir canal

3. Extract Canal into the /export/servers/canal directory

tar -xvzf canal.deployer-1.0.24.tar.gz -C ../servers/canal

4. Edit the instance.properties file in the canal/conf/example directory

## mysql serverId
canal.instance.mysql.slaveId = 1234

# position info
canal.instance.master.address = cdh1:3306

canal.instance.dbUsername = root
canal.instance.dbPassword = 000000

1. canal.instance.mysql.slaveId must not be the same as the `server_id` configured earlier in my.cnf
2. canal.instance.master.address is the host name and port of the machine where MySQL is installed

5. Run startup.sh in the /export/servers/canal/bin directory to start Canal

cd /export/servers/canal/bin
./startup.sh

6. If the console prints output like the following, Canal has started successfully

cd to /export/servers/canal/bin for workaround relative path
LOG CONFIGURATION : /export/servers/canal/bin/../conf/logback.xml
canal conf : /export/servers/canal/bin/../conf/canal.properties
CLASSPATH :/export/servers/canal/bin/../conf:/export/servers/canal/bin/../lib/zookeeper-
...
cd to /export/servers/canal/bin for continue
Note:

Canal's remote connection port defaults to `11111`; it can be changed in `canal.properties` if needed.

6.6. Canal data collection system – project initialization


Steps

  1. Import the Maven dependencies
  2. Copy the dependencies from the pom.xml in 资料\工具类\03.Canal数据采集系统 into the canal-kafka project's pom.xml
  3. Copy the log4j.properties configuration file from 资料\工具类\03.Canal数据采集系统
  4. Copy the application.properties file from 资料\工具类\03.Canal数据采集系统

6.7. Building the Canal collection program

Use Java to parse the binlog entries delivered by Canal and write them to Kafka.

Create the following package structure in the canal-kafka project's java directory:

Package                      | Purpose
com.itheima.canal_kafka      | Code that works with the MySQL binlog
com.itheima.canal_kafka.util | Source code of the utility classes

6.7.1. Writing the configuration loader

Steps

  1. Create a GlobalConfigUtil utility class that reads the canal and kafka settings from application.properties
  2. Add a main method to check that the settings are read correctly

Implementation

  1. Create GlobalConfigUtil in the util package to read the configuration from application.properties. Configuration values can be read with code like the following:

 ResourceBundle bundle = ResourceBundle.getBundle("config file name", Locale.ENGLISH);
 String host = bundle.getString("property key");

Read the canal and kafka settings out of application.properties in this way.

  2. Write a main method to check that the settings are read correctly

Reference code

GlobalConfigUtil.java

import java.util.Locale;
import java.util.ResourceBundle;

public class GlobalConfigUtil {
    public static ResourceBundle bundle = ResourceBundle.getBundle("application", Locale.ENGLISH);

    public static String canalHost = bundle.getString("canal.host");
    public static String canalPort = bundle.getString("canal.port");
    public static String canalInstance = bundle.getString("canal.instance");
    public static String mysqlUsername = bundle.getString("mysql.username");
    public static String mysqlPassword = bundle.getString("mysql.password");

    public static String kafkaBootstrapServers = bundle.getString("kafka.bootstrap.servers");
    public static String kafkaZookeeperConnect = bundle.getString("kafka.zookeeper.connect");
    public static String kafkaInputTopic = bundle.getString("kafka.input.topic");

    public static void main(String[] args) {
        System.out.println(canalHost);
        System.out.println(canalPort);
        System.out.println(canalInstance);
        System.out.println(mysqlUsername);
        System.out.println(mysqlPassword);

        System.out.println(kafkaBootstrapServers);
        System.out.println(kafkaZookeeperConnect);
        System.out.println(kafkaInputTopic);
    }
}

Note:
ResourceBundle.getBundle("application", Locale.ENGLISH) reads application.properties; do not include the file extension in the name.
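
For reference, the application.properties file this class reads would contain entries like the following. The keys match the fields above; the values are placeholders based on the setup used in this article (adjust hosts and ports to your environment):

canal.host=cdh1
canal.port=11111
canal.instance=example
mysql.username=root
mysql.password=000000
kafka.bootstrap.servers=cdh1:9092
kafka.zookeeper.connect=cdh1:2181
kafka.input.topic=canal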

6.7.2. Importing the Kafka utility class

Import the KafkaSender.java utility class from 资料\工具类\03.Canal数据采集系统.

6.7.3. Importing the Canal binlog-parsing utility class

  • Parses the binlog entries from MySQL
  • Writes the parsed data to Kafka

Import the CanalClient.java utility class from 资料\工具类\03.Canal数据采集系统.

6.7.4. Testing the utility classes

Steps

  1. Start MySQL
  2. Start Canal
  3. Start the ZooKeeper cluster
  4. Start the Kafka cluster
  5. Create a canal topic in Kafka
bin/kafka-topics.sh --create --zookeeper cdh1:2181 --replication-factor 2 --partitions 3 --topic canal

6. Start the Kafka console consumer

bin/kafka-console-consumer.sh --zookeeper cdh1:2181 --from-beginning --topic canal

7. Start the Canal synchronization program (the utility classes imported above)

8. Open SQLyog and insert some data into MySQL

INSERT INTO commodity(commodityId , commodityName , commodityTypeId , originalPrice , activityPrice) VALUES (1 , '耐克' , 1 , 888.00 , 820.00);
INSERT INTO commodity(commodityId , commodityName , commodityTypeId , originalPrice , activityPrice) VALUES (2 , '阿迪达斯' , 1 , 900.00 , 870.00);
INSERT INTO commodity(commodityId , commodityName , commodityTypeId , originalPrice , activityPrice) VALUES (3 , 'MacBook Pro' , 2 , 18000.00 , 17500.00);
INSERT INTO commodity(commodityId , commodityName , commodityTypeId , originalPrice , activityPrice) VALUES (4 , '联想' , 2 , 5500.00 , 5320.00);
INSERT INTO commodity(commodityId , commodityName , commodityTypeId , originalPrice , activityPrice) VALUES (5 , '索菲亚' , 3 , 35000.00 , 30100.00);
INSERT INTO commodity(commodityId , commodityName , commodityTypeId , originalPrice , activityPrice) VALUES (6 , '欧派' , 3 , 43000.00 , 40000.00);

9. If messages like the following are printed in Kafka, Canal is working correctly

{"emptyCount":2,"logFileName":"mysql-bin.000002","dbName":"pyg","logFileOffset":250,"eventType":"INSERT","columnValueList":[{"columnName":"commodityId","columnValue":"1","isValid":"true"},{"columnName":"commodityName","columnValue":"耐克","isValid":"true"},{"columnName":"commodityTypeId","columnValue":"1","isValid":"true"},{"columnName":"originalPrice","columnValue":"888.0","isValid":"true"},{"columnName":"activityPrice","columnValue":"820.0","isValid":"true"}],"tableName":"commodity","timestamp":1553741346000}