欢迎来到天剑厚德,这里有最新的大数据技术原创分享。

Flink电商指标分析项目1

flink healthsun 0评论

1. 项目背景

1.1. 项目简介

公司有一个正在运营中的电商网站,名称叫做【品优购】。这是一个B2B2C的电商平台,类似京东。

现在我们想了解一下该电商网站的各种用户行为(访问行为、购物行为、点击行为等),统计出PV、UV等数据。

针对这样的大数据分析项目,我们可以采用MapReduce、Spark或者Flink来进行开发。

由于本项目会对实时数据和静态数据进行分析,所以我们采用性能更加优越的Flink来开发。

业务目标

  • 帮助产品经理、数据分析师以及管理人员分析现有产品的情况
  • 根据用户行为分析结果持续改进产品的设计
  • 调整公司战略和业务
  • 用大数据技术来帮助提升业绩、营业额以及市场占有率

常见电商模式

C2C–个人对个人 案例:淘宝、瓜子二手车

B2B–企业对企业 案例:阿里巴巴、慧聪网

B2C–企业对个人 案例:唯品会、乐蜂网

B2B2C -企业-企业-个人 案例:京东商城、天猫商城

C2B–个人对企业 案例:海尔商城、 尚品宅配

O2O–线上到线下 案例:美团、饿了么

1.2. 多维度指标分析

用户指标

商品指标

电商小概念:
SPU = Standard Product Unit (标准产品单位)SPU是商品信息聚合的最小单位,是一组可复用、易检索的标准化信息的集合,该集合描述了一个产品的特性。 例如:iPhone X 可以确定一个产品即为一个SPU。
SKU = Stock Keeping Unit(库存量单位)。一款商品有多个颜色,则是有多个SKU,例:一件衣服,有红色、白色、蓝色,则SKU编码也不相同,如相同则会出现混淆,发错货。 例如:iPhone X 64G 银色 则是一个SKU。

2. 项目整体介绍

2.1. 使用到的技术

  • 语言Java、scala
  • 框架Spring Boot、Hadoop、HBase、Kafka、Flink、Canal

2.2. 项目整体流程

  1. 上报服务系统将商城访问日志推送到kafka
  2. 数据库同步系统将mysql数据推送到kafka
  3. 实时分析系统消费kafka数据,经过分析后,下沉到HBase
  4. 实时同步系统消费kafka数据,下沉到HBase
  5. 批处理分析系统从hbase取出数据,进行数据分析
  6. web可视化平台展示HBase中的分析结果数据

3. 项目的技术选型

为什么要选择架构中的技术?

  • Kafka
  • Hbase
  • Canal
  • Flink

3.1. Kafka

Kafka是一种高吞吐量的分布式发布订阅消息系统。

  • 吞吐量高
    • 每秒达到几十万的数据
  • 速度快
    • 内存接力(消费者直接消费PageCache内存区数据)
  • 数据安全
    • 冗余机制可以有效避免消息丢失错误
    • 其中几台实例挂掉也可以继续工作
  • 高并发
    • 突发型流量可以将峰值的数据持久化,等到流量正常或低估时再取出消费

3.2. HBase

HBase是一个分布式的、面向列的开源数据库。

  • 业务不复杂
    • PV/UV、页面点击量、新鲜度(没有复杂SQL查询)
  • 实时电商平台
    • 存取海量的数据,一个表可以有上亿行,上百万列
  • 社区活跃Hbase社区非常大,Facebook、小米、网易等公司都在使用HBase
  • 高可用没有单点故障,高可用性高

3.3. Canal

数据库同步常用的有两种方案:

  • 方案1mysql --> logstash --> kafka --> flink --> hbase
  • 方案2mysql --> sqoop --> kafka --> flink-->hbase

上述方案存在的问题

logstash、sqoop还是需要使用SQL语句查询mysql,会给mysql增加压力,如果要跑大量数据的同步,会拖垮mysql

  • 解决方案mysql --> cannal(binlog) --> kafka --> flink --> hbase

cannal

Canal可以实时解析mysql的binlog日志,通过读取binlog日志,将数据输出到Kafka。不需要执行SQL语句,不会增加mysql压力

3.4. Flink

  • 速度要比Spark、MapReduce更快
  • 保证EXACTLY_ONCE
  • 容错更轻量级
  • 自动程序调优,自动避免性能消耗较大的操作(例如:shuffle、sort)
  • 高吞吐

问题:

为什么要选择基于canal来进行数据库同步技术?

4. 项目整体工程搭建

4.1. 工程结构

本项目采用Maven构建,下面是我们的项目的整体工程架构。总工程为pyg,下面包含我们要开发的5个子模块。

IDEA创建工程

  1. 在指定目录创建父模块pyg删除总工程的src目录

在父模块配置Java版本为1.8

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
  1. 创建report模块
  2. 创建real-process实时处理Maven模块
  3. 创建canal-kafka数据采集Maven模块
  4. 创建sync-db数据库同步处理Maven模块
  5. 创建batch-process批数据处理Maven模块

5. 上报服务系统开发

上报服务系统是一个Java Web工程,为了快速开发web项目,我们采用时下JavaWeb最流行的技术Spring Boot

5.1. Spring Boot简介

Spring Boot是一个基于Spring之上的快速应用构建框架。使用Spring Boot可以快速开发出基于Spring的应用。Spring Boot主要解决两方面的问题。

  • 依赖太多问题
    • 轻量级JavaEE开发,需要导入大量的依赖
    • 依赖之间还存在版本冲突
  • 配置太多问题
    • 大量的XML配置

Spring Boot内部整合了大量的依赖,而且经过大量测试,选择的依赖都是没有版本冲突的。Spring Boot简化了大量的配置,通过少量的配置,就可以让程序工作。

开发Spring Boot程序的基本步骤

  • 导入Spring Boot依赖(起步依赖)
  • 编写application.properties配置文件
  • 编写Application入口程序

5.2. 配置Maven本地仓库

因为网络下载慢的原因,所以建议使用提供给大家的本地仓库,直接使用来开发项目

  1. 资料中的maven_本地仓库.zip解压
  2. 并在Maven、IDEA中配置该本地仓库

5.3. 导入Maven依赖

资料/工具类/01.上报服务系统目录中的pom.xml中拷贝依赖到上报服务系统项目

主要导入以下依赖:

  1. 导入Spring Boot依赖
  2. 操作JSON导入FastJSON依赖
  3. 导入Kafka依赖

5.4. 创建项目包结构

包名说明
com.itheima.report.controller存放Spring MVC的controller
com.itheima.report.bean存放相关的Java Bean实体类
com.itheima.report.util用来存放相关的工具类

5.5. 验证Spring Boot工程是否创建成功

步骤

  1. 创建SpringBoot入口程序Application
  2. 创建application.properties配置文件
  3. 编写一个简单Spring MVC Controller/Handler,接收浏览器请求参数并打印回显
  4. 打开浏览器测试

实现

  1. 创建SpringBoot入口程序ReportApplication,用来启动SpringBoot程序
    • 在类上添加以下注解@SpringBootApplication
    • 在main方法中添加以下代码,用来运行Spring Boot程序SpringApplication.run(ReportApplication.class);
  2. 在创建一个TestController注意在类上要添加@RestController注解
@RestController
public class TestController {

}

3.编写一个test Handler,从浏览器上接收一个叫做json参数,并打印回显

@RequestMapping("/test")
public String test(String json) {
    System.out.println(json);
    return json;
}

4. 编写application.properties配置文件

  • 配置端口号(8888)
server.port=8888

5. 启动Spring Boot程序

6. 打开浏览器测试Handler是否能够接收到数据

  http://localhost:8888/test?json=123123

5.6. 安装Kafka-Manager


Kafka-manager是Yahoo!开源的一款Kafka监控管理工具。

安装步骤

  1. 上传资料\软件包中的kafka-manager-1.3.3.7.tar.gz
  2. 解压到/export/serverstar -zxf kafka-manager-1.3.3.7.tar.gz -C /export/servers/
  3. 修改conf/application.confkafka-manager.zkhosts=”node01:2181,node02:2181,node03:2181″
  4. 启动zookeepercd /export/servers/zookeeper-3.4.9/
    bin/zkServer.sh start
  5. 启动kafka# 启动3台机器的Kafka
    cd /export/servers/kafka_2.11-0.10.1.0
    bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &
  6. 直接运行bin/kafka-managercd /export/servers/kafka-manager-1.3.3.7
    nohup bin/kafka-manager 2>&1 &
  7. 浏览器中使用node01:9000访问即可

默认kafka-manager的端口号为9000,如果该端口被占用,请使用下面的命令修改端口

bin/kafka-manager -Dconfig.file=/export/servers/kafka-manager-1.3.3.7/conf/application.conf -Dhttp.port=10086

5.7. 编写Kafka生产者配置工具类


由于我们项目要操作Kafka, 我们先来构建出KafkaTemplate, 这是一个Kafka的模板对象, 通过它我们可以很方便的发送消息到Kafka.

开发步骤

  1. 编写Kafka生产者配置
  2. 编写Kafka生产者SpringBoot配置工具类KafkaProducerConfig,构建KafkaTemplate

实现

  1. 导入Kafka生产者配置文件将下面的代码拷贝到application.properties
#
# kakfa
#
#kafka的服务器地址
kafka.bootstrap_servers_config=node01:9092,node02:9092,node03:9092
#如果出现发送失败的情况,允许重试的次数
kafka.retries_config=0
#每个批次发送多大的数据
kafka.batch_size_config=4096
#定时发送,达到1ms发送
kafka.linger_ms_config=1
#缓存的大小
kafka.buffer_memory_config=40960
#TOPIC的名字
kafka.topic=pyg
定时定量**
1. kafka生产者发送一批数据的大小:kafka.producer.batch.size=4096  (单位:字节)
   实际环境可以调大一些,提高效率
2. 定时发送:kafka.producer.linger=1
   达到一毫秒后发送

2. 编写KafkaProducerConfig,主要创建KafkaTemplate,用于发送Kafka消息

  • 使用@Value(“${配置项}“)来读取配置
  • 构建DefaultKafkaProducerFactory
  • 构建KafkaTemplate
@Configuration
public class KafkaConfig {

    @Value("${kafka.bootstrap_servers_config}")
    private String bootstrap_servers_config;

    @Value("${kafka.retries_config}")
    private int retries_config;

    @Value("${kafka.batch_size_config}")
    private int batch_size_config;

    @Value("${kafka.linger_ms_config}")
    private int linger_ms_config;

    @Value("${kafka.buffer_memory_config}")
    private int buffer_memory_config;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers_config);
        configs.put(ProducerConfig.RETRIES_CONFIG, retries_config);
        configs.put(ProducerConfig.BATCH_SIZE_CONFIG, batch_size_config);
        configs.put(ProducerConfig.LINGER_MS_CONFIG, linger_ms_config);
        configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, buffer_memory_config);
        configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(configs);

        return new KafkaTemplate<String, String>(producerFactory);
    }
}

3. 在test测试源码中创建一个Junit测试用例

  • 整合Spring Boot Test
  • 注入KafkaTemplate
  • 测试发送100条消息到testtopic
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkTest {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Test
    public void sendTest01() {
        for(int i = 0; i < 100; i++) {
            kafkaTemplate.send("test", "key", "test msg!");
        }
    }
}

4. 在KafkaManager创建testtopic,三个分区、两个副本

5. 启动kafka-console-consumer

bin/kafka-console-consumer.sh --zookeeper node01:2181 --from-beginning --topic test

6. 打开kafka-manager的consumer监控页面,查看对应的logsize参数,消息是否均匀的分布在不同的分区中

5.8. 均匀分区

编写RoundRobbinPartitioner,实现Partitioner接口,确保消息能够发送到Kafka的每个分区

  • 实现Partitioner接口的partition方法
  • 创建一个AtomicInteger变量,用来保存当前的计数器,每次生产一条消息加1
  • 使用计数器变量模除以分区数量得到当前消息需要发送的分区号

参考代码

RoundRobinPartitioner.java

public class RoundRobinPartitioner implements Partitioner {

    // 计数器,每次生产一条消息+1
    private AtomicInteger counter = new AtomicInteger();
    private String topic = "";

    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        // 获取分区数量
        Integer partitions = cluster.partitionCountForTopic(s);

        int curPartition = counter.incrementAndGet() % partitions;

        if(counter.get() > 65535) {
            counter.set(0);
        }

        return curPartition;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> map) {}
}

5.9. 上报服务开发

上报服务系统要能够接收http请求,并将http请求中的数据写入到kafka

步骤

  1. 创建Message实体类对象所有的点击流消息都会封装到Message实体类中
  2. 设计一个Controller来接收http请求
  3. 将http请求发送的消息封装到一个Message实体类对象
  4. 使用FastJSONMessage实体类对象转换为JSON字符串
  5. 将JSON字符串使用kafkaTemplate写入到kafka
  6. 返回给客户端一个写入结果JSON串

实现

  1. 创建Message实体类
    • 包含以下字段:消息次数(count)、消息时间(timeStamp)、消息体(message)
    • 生成getter/setter、toString方法

Message.java

/**
   * 因为Kafka中pyg的topic会保存所有类型的消息(用户浏览、订单等)
   * 所以都使用一个Message实体类来封装
   */
public class Message {
    private int count ; 		//消息的次数
    private Long timeStamp ; 	//消息的时间
    private String message ; 	//消息体

    // getter/setter

    @Override
    public String toString() {
        return "Message{" +
            "count=" + count +
            ", timeStamp=" + timeStamp +
            ", message='" + message + '\'' +
            '}';
    }
}
  • com.abc.report.controller包下创建ReportController
    • 编写receiveDataHandler接收从客户端JSON数据,并将响应结果封装到Map结构中,返回给客户端 注意:接收JSON数据要在参数前面添加@RequestBody注解
    • 将接收的参数封装到Message实体类
    • 使用FastJSONMessage实体类对象转换为JSON字符串
    • 将JSON字符串发送到Kafka的pygtopic
    • 将响应结果封装到Map结构中,返回给客户端注意:
    1. 在ReportController类上要添加@RestController注解
    2. 需要添加@AutoWired注解来注入KafkaTemplate
    3. 请求参数上要加上@RequestBody注解

ReportController.java

@RestController
public class ReportController {

    @Value("${kafka.topic}")
    private String topic;

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @RequestMapping("/receive")
    public Map receive(@RequestBody String json) {
        Map<String, String> result = new HashMap<>();

        try {
            Message msg = new Message();

            msg.setMsg(json);
            msg.setCount(1L);
            msg.setTimestamp(System.currentTimeMillis());

            String msgJSON = JSON.toJSONString(msg);

            kafkaTemplate.send(topic, msgJSON);
            System.out.println(msgJSON);

            result.put("success", "true");
        } catch (Exception e) {
            e.printStackTrace();
            result.put("success", "false");
        }

        return result;
    }
}

5.10. 模拟生产点击流日志消息到Kafka

为了方便进行测试,我们可以使用一个消息生成工具来生成点击流日志,然后发送给上报服务系统。该消息生成工具可以一次生成100条ClickLog信息,并转换成JSON,通过HttpClient把消息内容发送到我们编写好的ReportController。

步骤

  1. 导入资料/工具类/01.上报服务系统中的ClickLog实体类(ClickLog.java)
  2. 导入资料/工具类/01.上报服务系统中的点击流日志生成器(ClickLogGenerator.java)

点击流日志字段

字段说明
channelID频道ID
categoryID产品的类别ID
produceID产品ID
country国家
province省份
city城市
network网络方式(移动、联通、电信…)
source来源方式
browserType浏览器类型
entryTime进入网站时间
leaveTime离开网站时间
userID用户ID

5.11. 验证测试代码

步骤

  1. 创建Kafka的topic(pyg
  2. 使用kafka-console-consumer.sh消费 topic中的数据
  3. 启动上报服务
  4. 执行ClickLogGenerator的main方法,生成一百条用户浏览消息到Kafka

实现

  • 创建kafka topic
bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --partitions 3 --topic pyg
  • 启动kafka消费者
bin/kafka-console-consumer.sh --zookeeper node01:2181 --from-beginning --topic pyg

6. Flink实时数据分析系统开发

前边我们已经开发完毕了上报服务系统, 我们可以通过上报服务系统把电商页面中的点击流数据发送到Kafka中, 那么接下来我们就来开发Flink实时分析系统, 通过流的方式读取Kafka中的消息, 进而分析数据。

业务

  • 实时分析频道热点
  • 实时分析频道PV/UV
  • 实时分析频道新鲜度
  • 实时分析频道地域分布
  • 实时分析运营商平台
  • 实时分析浏览器类型

技术

  • Flink实时处理算子
  • 使用CheckPoint水印解决Flink生产上遇到的问题(网络延迟、丢数据)
  • Flink整合Kafka
  • Flink整合HBase

6.1. 搭建【Flink实时数据分析系统】项目环境

6.1.1. 导入Maven项目依赖

  1. 资料\工具类\02.Flink实时分析系统目录的pom.xml文件中的依赖导入到real-process项目的pom.xml
  2. real-process模块添加scala支持
  3. main和test创建scala文件夹,并标记为源代码和测试代码目录

6.1.2. 创建项目包结构

包名说明
com.itheima.realprocess.util存放工具类
com.itheima.realprocess.bean存放实体类
com.itheima.realprocess.task存放具体的分析任务 每一个业务都是一个任务,对应的分析处理都写在这里

6.1.3. 导入实时系统Kafka/Hbase配置

  1. 资料\工具类\02.Flink实时分析系统目录中的application.conf导入到resources目录
  2. 资料\工具类\02.Flink实时分析系统目录中的log4j.propertiesresources目录下

注意修改kafka服务器hbase服务器的机器名称

6.1.4. 获取配置文件API介绍

ConfigFactory.load()介绍

  • 使用ConfigFactory.load()可以自动加载配置文件中的application.conf文件(注意:名字一定不要写错,否则无法加载),并返回一个Config对象
  • 使用Config对象可以获取到配置文件中的配置项
  • application.conf文件是一个properties文件,存放key-value键值对的数据。

常用API

方法名说明
getString("key")获取配置文件中指定key的值对应的字符串
getInt(“key”)获取配置文件中指定key的值对应的整型数字
getLong(“key”)同上
getBoolean(“key”)同上

6.1.5. 编写scala代码读取配置工具类

com.abc.realprocess.util包下创建GlobalConfigUtil单例对象(object)

步骤

  1. 使用ConfigFactory.load获取配置对象
  2. 编写方法加载application.conf配置
  3. 添加一个main方法测试,工具类是否能够正确读取出配置项。

示例代码:

GlobalConfigUtil.scala

/**
  * 加载配置文件工具类
  */
object GlobalConfigUtil {

  private val config: Config = ConfigFactory.load()

  /*------------------
   * Kafka配置
   *------------------*/

  val bootstrapServers = config.getString("bootstrap.servers")
  val zookeeperConnect = config.getString("zookeeper.connect")
  val inputTopic = config.getString("input.topic")
  val groupId = config.getString("group.id")
  val enableAutoCommit = config.getString("enable.auto.commit")
  val autoCommitIntervalMs = config.getString("auto.commit.interval.ms")
  val autoOffsetReset = config.getString("auto.offset.reset")
    
  
  // 测试配置文件读取类
  def main(args: Array[String]): Unit = {
    println(bootstrapServers)
    println(zookeeperConnect)
    println(inputTopic)
    println(groupId)
    println(enableAutoCommit)
    println(autoCommitIntervalMs)
    println(autoOffsetReset)
  }
}

6.2 初始化Flink流式计算环境

步骤

  1. 创建App单例对象
  2. 创建main方法,获取StreamExecutionEnvironment运行环境
  3. 设置流处理的时间为EventTime,使用数据发生的时间来进行数据处理
  4. 设置Flink的并行度
  5. 编写测试代码,测试Flink程序是否能够正确执行

参考代码

// 初始化Flink运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置流处理的时间为EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 设置并行度
env.setParallelism(1)

// 从本地集合中创建DataStream
val testDataStream: DataStream[String] = env.fromCollection(
    List("hadoop", "hive", "spark")
)
// 打印测试结果
testDataStream.print()

env.execute("App")

注意:
一定要导入import org.apache.flink.api.scala._隐式转换,否则Flink程序无法执行
到导入org.apache.flink.streaming.api下的TimeCharacteristic,否则没有EventTime

6.3. Flink添加checkpoint容错支持

Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些Snapshot进行恢复,从而修正因为故障带来的程序数据状态中断。

步骤

  1. Flink envrionment中添加checkpoint支持
  2. 运行Flink程序测试checkpoint是否配置成功(检查HDFS中是否已经保存snapshot数据)

实现

  • 在Flink流式处理环境中,添加以下checkpoint的支持,确保Flink的高容错性,数据不丢失。
////////
//
// 保证程序长时间运行的安全性进行checkpoint操作
//
// 5秒启动一次checkpoint
env.enableCheckpointing(5000)
// 设置checkpoint只checkpoint一次
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 设置两次checkpoint的最小时间间隔
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)
// checkpoint超时的时长
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 允许的最大checkpoint并行度
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// 当程序关闭的时,触发额外的checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
// 设置checkpoint的地址
env.setStateBackend(new FsStateBackend("hdfs://node01:8020/flink-checkpoint/"))
  • 启动HDFS
  • 启动Flink程序测试
  • 如果测试成功,在HDFS中应该生成如下几个目录

6.4. Flink整合Kafka

6.4.1. Flink读取Kafa数据

步骤

  1. 配置Kafka连接属性
  2. 使用FlinkKafkaConsumer010整合Kafka
  3. 添加一个source到当前Flink环境
  4. 启动zookeeper
  5. 启动kafka
  6. 运行Flink程序测试是否能够从Kafka中消费到数据

实现

  • 配置Kafka连接属性
////////
//
// 整合Kafka
//
val properties = new Properties()

// # Kafka集群地址
properties.setProperty("bootstrap.servers",GlobalConfigUtil.bootstrapServers)
// # ZooKeeper集群地址
properties.setProperty("zookeeper.connect",GlobalConfigUtil.zookeeperConnect)
// # Kafka Topic名称
properties.setProperty("input.topic",GlobalConfigUtil.inputTopic)
// # 消费组ID
properties.setProperty("group.id",GlobalConfigUtil.groupId)
// # 自动提交拉取到消费端的消息offset到kafka
properties.setProperty("enable.auto.commit",GlobalConfigUtil.enableAutoCommit)
// # 自动提交offset到zookeeper的时间间隔单位(毫秒)
properties.setProperty("auto.commit.interval.ms",GlobalConfigUtil.autoCommitIntervalMs)
// # 每次消费最新的数据
properties.setProperty("auto.offset.reset",GlobalConfigUtil.autoOffsetReset)

val consumer = new FlinkKafkaConsumer010[String](
    GlobalConfigUtil.inputTopic,
    new SimpleStringSchema(),
    properties
)
  • 添加一个source到当前Flink环境
val kafkaDataStream: DataStream[String] = env.addSource(consumer)
  • 打印DataStream中的数据
kafkaDataStream.print()
  • 启动zookeeper
  • 启动kafka
  • 运行Flink程序
  • 运行上报服务系统
  • 启动消息生成器, 测试是否能够从Kafka中消费到数据

如果Flink从Kafka消费成功会打印以下数据,就证明我们的代码是正确的。

  {
      "count": 1,
      "message": "{\"browserType\":\"谷歌浏览器\",\"categoryID\":6,\"channelID\":4,\"city\":\"America\",\"country\":\"china\",\"entryTime\":1544601660000,\"leaveTime\":1544634060000,\"network\":\"联通\",\"produceID\":4,\"province\":\"china\",\"source\":\"百度跳转\",\"userID\":13}",
      "timeStamp": 1553188417573
  }

6.4.2. Kafka消息解析为元组

步骤

  • 使用map算子,遍历kafka中消费到的数据
  • 使用FastJSON转换为JSON对象
  • 将JSON的数据解析成一个元组
  • 打印map映射后的元组数据
  • 测试是否能够正确解析

代码

  1. 使用map算子,将kafka中消费到的数据,使用FastJSON准换为JSON对象
  2. 将JSON的数据解析成一个元组

object App {
def main(args: Array[String]): Unit = {

// 使用map算子,将kafka中消费到的数据
val tupleDataStream = kafkaDataStream.map {
  msgJson =>
    // 使用FastJSON转换为JSON对象
    val jsonObject = JSON.parseObject(msgJson)
    val count = jsonObject.getLong("count")
    val message = jsonObject.getString("message")
    val timestamp = jsonObject.getLong("timeStamp")

    // 将JSON的数据解析封装到元组中
   (count, message,timestamp)
}
tupleDataStream.print()

...
}
}

打印经过map映射后的元组数据,测试是否能够正确解析

(1,{"browserType":"360浏览器","categoryID":6,"channelID":15,"city":"ShiJiaZhuang","country":"china","entryTime":1544619660000,"leaveTime":1544634060000,"network":"移动","produceID":2,"province":"Beijing","source":"直接输入","userID":8},1557474908321)

6.4.3. Flink封装点击流消息为样例类

步骤

  1. 创建一个ClickLog样例类中来封装消息
  2. 使用map算子将数据封装到ClickLog样例类

代码

  1. 在bean包中,创建ClickLog样例类,添加以下字段
  • 频道ID(channelID)
  • 产品类别ID(categoryID)
  • 产品ID(produceID)
  • 国家(country)
  • 省份(province)
  • 城市(city)
  • 网络方式(network)
  • 来源方式(source)
  • 浏览器类型(browserType)
  • 进入网站时间(entryTime)
  • 离开网站时间(leaveTime)
  • 用户的ID(userID)
  1. ClickLog伴生对象中实现apply方法
  2. 使用FastJSON的JSON.parseObject方法将JSON字符串构建一个ClickLog实例对象
  3. 使用map算子将数据封装到ClickLog样例类
  4. 在样例类中编写一个main方法,传入一些JSON字符串测试是否能够正确解析
  5. 重新运行Flink程序,测试数据是否能够完成封装

使用JSONObject.get(“key”)获取到的数据是一个Any类型,要调用toString方法转换为String类型

参考代码

/**
  * 使用ClickLog样例类来封装点击流日志
  */
// - 频道ID(channelID)
// - 产品类别ID(categoryID)
// - 产品ID(produceID)
// - 国家(country)
// - 省份(province)
// - 城市(city)
// - 网络方式(network)
// - 来源方式(source)
// - 浏览器类型(browserType)
// - 进入网站时间(entryTime)
// - 离开网站时间(leaveTime)
// - 用户的ID(userID)
case class ClickLog (
    var channelID:String,
    var categoryID:String,
    var produceID:String,
    var country:String,
    var province:String,
    var city:String,
    var network:String,
    var source:String,
    var browserType:String,
    var entryTime:String,
    var leaveTime:String,
    var userI:String
)

object ClickLog {
  def apply(json:String): ClickLog = {
    val jsonObject = JSON.parseObject(json)

    val channelID = jsonObject.getString("channelID")
    val categoryID = jsonObject.getString("categoryID")
    val produceID = jsonObject.getString("produceID")
    val country = jsonObject.getString("country")
    val province = jsonObject.getString("province")
    val city = jsonObject.getString("city")
    val network = jsonObject.getString("network")
    val source = jsonObject.getString("source")
    val browserType = jsonObject.getString("browserType")
    val entryTime = jsonObject.getString("entryTime")
    val leaveTime = jsonObject.getString("leaveTime")
    val userID = jsonObject.getString("userID")

    ClickLog(
      channelID,
      categoryID,
      produceID,
      country,
      province,
      city,
      network,
      source,
      browserType,
      entryTime,
      leaveTime,
      userID
    )

  def main(args: Array[String]): Unit = {
    val clickLog = ClickLog("{\"browserType\":\"火狐\",\"categoryID\":20,\"channelID\":19,\"city\":\"ZhengZhou\",\"country\":\"china\",\"entryTime\":1544605260000,\"leaveTime\":1544634060000,\"network\":\"电信\",\"produceID\":7,\"province\":\"HeBeijing\",\"source\":\"百度跳转\",\"userID\":6}")
    println(clickLog)
  }
}

6.4.4. 封装Kafka消息为Message样例类

步骤

  1. 创建一个Message样例类,将ClickLog、时间戳、数量封装
  2. 将Kafka中的数据整个封装到Message类中
  3. 运行Flink测试

参考代码

Message.scala

/**
  * 封装Kafka中的消息
  * @param clicklog 点击流浏览数据Bean对象
  * @param count 数量
  * @param timestamp 时间戳
  */
case class Message(var clicklog:ClickLog, var count:Long, var timestamp:Long)

App.scala

object App {
  def main(args: Array[String]): Unit = {
	...
    // 使用map算子,将kafka中消费到的数据
    val messageDataStream = kafkaDataStream.map {
      msgJson =>
        // 使用FastJSON转换为JSON对象
        val jsonObject = JSON.parseObject(msgJson)
        val count = jsonObject.getLong("count")
        val message = jsonObject.getString("message")
        val timestamp = jsonObject.getLong("timestamp")

        // 将JSON的数据解析封装到Message样例类中
        // 将数据封装到ClickLog样例类
        Message(count, timestamp, ClickLog(message))
    }
    ...
  }
}

6.5. Flink添加水印支持

水印(watermark)就是一个时间戳,Flink可以给数据流添加水印,可以理解为:Flink收到一条消息后,额外给这个消息添加了一个时间字段,这就是添加水印

  • 水印并不会影响原有Eventtime
  • 一般会设置水印时间,比Eventtime小几秒钟
  • 当数据流添加水印后,会按照水印时间来触发窗口计算
  • 当接收到的水印时间 >= 窗口的endTime,则触发计算

观察下图:

60号消息的EventTime为10:10:00, 正好符合10:00:00到10:10:00这个时间窗口的endtime。正常情况, 该消息应该被这个窗口计算的。但是当发生网络延迟的时候,该消息可能会晚到几秒钟,那么当它到达flink时,该窗口已经运算完毕。为了解决该问题,我们为该消息设置watermark时间10:09:57,当它到达flink时,会把该watermark时间设置为窗口的当前时间,由于小于endtime,所以该消息到达后并不会立即计算。直到一个携带watermark时间大于或者等于endtime的时候,窗口计算才会被触发。这样就可以有效的解决由于网络延迟造成的数据计算不精确的情况。

  • App.scala中添加水印支持
tupleDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Message]{
    // 当前时间戳
    var currentTimestamp = 0l
    //延迟时间
    var maxDelayTime = 2000l

    // 获取水印时间
    override def getCurrentWatermark: Watermark = {
        new Watermark(currentTimestamp - maxDelayTime)
    }

    // 获取EventTime
    override def extractTimestamp(element: Message, previousElementTimestamp: Long): Long = {

        // 比较当前消息的时间戳和上一个消息的时间戳,取最大值
        currentTimestamp = Math.max(element.timeStamp,previousElementTimestamp)
        currentTimestamp
    }
})
  • 启动执行测试,观察输出
Message(1,1557475915909,ClickLog(火狐,9,14,ShiJiaZhuang,china,1544612460000,1544634060000,移动,2,HeNan,必应跳转,11))
如果你觉得这篇文章或者我分享的主题对你有帮助,请支持我继续更新网站和主题 !捐赠本站
喜欢 (0)
发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址