欢迎来到天剑厚德,这里有最新的大数据技术原创分享。

第三天-Spark Streaming

spark healthsun 0评论

0.简介

streaming data的实际例子还包括处理市场提要、天气数据、电子股票交易数据等等。所有这些应用程序都以非常快的速度生成大量数据,并且需要对数据进行特殊处理,以便您能够实时地从数据中获得洞察力。

streaming 的一个重要用例是复杂事件处理(CEP)。在CEP中,重要的是控制正在处理的数据的范围。这个范围称为窗口,它可以基于时间或大小。基于时间的窗口的一个例子是分析最近1分钟内的数据。一个基于大小的窗口的例子可以是给定股票最近100次交易的平均要价。

Spark Streaming是Spark的库,它支持处理实时数据。这个流可以来自任何来源,比如Twitter、Kafka或Flume。

1.典型的spark streaming

StreamingContext 是一个包装类,他是spark streaming 功能的入口点,进行时间分割,指定时间分片,RDD与批是一一对应,下面看一下持续的流数据被分割成批:

理想情况下,您希望处理数据的速度至少与接收数据的速度一样快;Spark流收集批处理间隔(例如,2秒)内的数据。当这2秒间隔结束时,在这2秒间隔内收集的数据将被发送给Spark用于处理,流将集中于收集下一批间隔的数据。就是说这个间隔时间的确定要基于数据源产生的数据时间。

Spark流中的连续RDDs流需要以抽象的形式表示,通过抽象可以处理它。这种抽象称为离散流(DStream)。应用于DStream的任何操作都会导致底层RDDs上的操作。

每个输入,DStream都与一个接收器相关联(除了文件流)。接收器接收来自输入源的数据并将其存储在Spark的内存中。有两种类型的流媒体资源:

  • basic source,如file和socket连接
  • advanced的来源,如kafka和flumeSpark流还提供了窗口计算,您可以将转换应用于数据的滑动窗口。滑动窗口操作基于两个参数:
    • 窗口长度:这是窗口的持续时间。例如,如果您想获得对最后1分钟数据的分析,窗口长度将为1分钟。
    • 滑动间隔:这描述了执行操作的频率。例如,您希望每10秒执行一次操作。这意味着10秒,窗口的1分钟将有50秒的数据,这对于最后一个窗口也很常见,以及10秒的新数据。

2.结构化的数据流

当前用于Spark streaming的API基于DataFrames。本章将重点介绍这个称为Structured Streaming的新API。强烈建议您使用此功能,但由于它处于beta测试阶段,需要谨慎行事。

1.基于structured streaming的Wordcount

我们怎么做呢?

1.开启一个kafka shell
$ spark-shell
2.创建DataFrame从8585这个端口读取:
scala> val lines = spark.readStream.format("socket").option("host","localhost").option("port",8585).load
3.将DataFrame中的行转换为具有String数据类型的Dataset,然后将其压平:
scala> val words = lines.as[String].flatMap(_.split(" "))
4.计算word count
scala> val wordCounts = words.groupBy("value").count()
5.开启netcat服务
nc -lk 8585
6.回到之前的控制台打印完整的counts集合,通过每次的更新
scala> val query=wordCounts.writeStream.outputMode("complete").format("console").start()
7.返回终端,在netcat输入不同的行,比如 to be or not to be:
to be or not to be
8.实时从控制台看到下面的结果
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
| not| 1|
| be| 2|
| hi| 1|
| or| 1|
| to| 2|
+-----+-----+

2.仔细看看structured streaming

结构化流本质上是一个构建在Spark SQL引擎之上的流处理引擎。查看流数据的另一种方法是将其视为一个无限/无界表,当新数据到达时,该表将不断地追加。

结构化流有四个概念:

  • Input table:输入表
  • Trigger:表更新的频率
  • Result table:每次触发器更新后的最终表
  • Output table:每次触发后要写入存储的结果的哪一部分。

查询可能只对新添加的数据(自上次查询以来)、所有已更新的数据(显然包括添加的数据)或整个表感兴趣;这就引出了结构化流媒体的三种输出模式:

  • Append
  • Update
  • Complete

用于有界表的DataFrame/Dataset API在这里的工作方式完全相同,这意味着可以使用常规SQL或DSL的命令。让我们来做一个示例,看看它与静态数据aframe有何不同。

我们应该怎么做呢?

1.开启spark shell
$ spark-shell
2.从s3中读一个糖尿病数据作为DataFrame(静态):
scala> val sdf = spark.read.format("libsvm").load("s3a://sparkcookbook/patientdata")
3.抽取schema从sdf
scala> val schema =sdf.schema
4.从DataFrame读取糖尿病的数据
scala> val st_df=spark.readStream.schema(schema).format("libsvm").load("s3a://sparkcoobook/patientdata")
5.筛选出糖尿病人的数据
scala> val result=st_df.where($"label"===1.0)
6.写入数据到hdfs
scala>result.writeStream.option("checkpointLocation","cp").format("parquet").start("tested-positive")

3.基于kafka的数据流

由于Kafka是一个分布式消息服务,所以它有一个代理集群,在功能上充当一个Kafka代理,如下所示:

对于每个主题,Kafka维护分区日志。这个分区日志由分布在集群中的一个或多个分区组成,如下图所示:

以类似的方式,在Kafka分区中,记录以键-值对的形式存储和检索,其中键是一个名为offset的连续ID号,值是实际的message。

只要读者记住书签(当前offset),他/她就可以自由地以任何方式阅读。

为了在传统的消息传递系统中提供类似于发布/订阅和PTP(队列)的功能,Kafka有消费者组的概念。消费者组是一组消费者,Kafka集群将其视为一个单元。在消费者组中,只有一个消费者需要接收消息。在下面的图中,如果C1消费者接收到主题T1的第一条消息那么该topic上的所有以下消息也将被传递给该消费者。使用这种策略,Kafka保证了给定topic的消息传递顺序

怎么做呢?

$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic oscars
1.开启spark shell
$ spark-shell --packages org.apache.spark:spark-sqlkafka-
0-10_2.11:2.1.1
2.创建oscars topic 用于监听当前数据流
scala> val data =spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","oscars").load()
3.当前这个数据是不是Streaming DataFrame
scala> data.isStreaming
4.获取data DataFrame的schema
scala> data.printSchema
5.创建一个流用于监听oscars topic的数据
scala> val data=spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","oscars").load()
6.转换stream到String 数据类型
scala> val kvstream = data.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
7.写入数据流到控制台
scala> val feed=kvstream.writeStream.format("console").start
scala>feed.awaitTermination
8.在另外一个窗口生成数据到oscars topic
$ kafka-console-producer.sh --broker-list localhost:9092 --topic oscars
9.分发数据,写入数据,基于第8步
10.在Spark shell中看到刚刚生产的数据。

4.理解数据流的挑战

1. 延迟到达/无序的数据

在Streaming中有两个时间概念:

  • Event time
  • Processing time 在流处理应用程序中,事件时间和处理时间之间的时间延迟是不同的,这将导致数据延迟或无序。造成这种延误的原因有很多,例如:
    • 网络延迟
    • 数据负载的方差
    • 批处理的事件
2.在批次之间维护状态

在基于微批处理的流系统中,例如Spark,有时需要在批之间维护和/或更新状态。在当前的Spark实现中,有多种实现方法,例如窗口和updateStateByKey。在所有这些方法中,基本上都是对批执行join操作来维护状态。如果窗口长度很长,这将非常昂贵。另一个选项是在数据库中维护状态。内存中有一些数据库,比如MemSQL,但是它们需要维护另一个数据库系统的开销。

结构化流重写了状态管理,以在内存中维护这个正在运行的中间状态,支持文件系统中的提前写日志(WAL),以实现容错。

3.消息传递的可靠性

有一些消息传递语义是基于消息传递保证(或缺乏保证)的:

  • 最多一次传递: 传递一次或者零次,这种实现上最容易,但是消息可能丢失;
  • 至少一次传递:传递一次或者多次,这种方式消息可能会重复传递;
  • 恰好一次传递:保证了数据仅一次传递,不多不少,不重复不漏掉。 结构化流提供了一种端到端的消息传递保证,基于以下特性:
    • 在WAL中 offset的记录
    • 状态管理(用内存中的状态和WAL)
    • 容错的source和sink(用WAL)
4. streaming不是一个孤岛

Streaming 应用不是独立的应用,它属于一个大的生态系统的一部分,首先,流应用程序可能从复杂数据源摄取数据。数据可能来自关系源和非关系源。数据可以是各种格式的,比如.json和.avro。数据可以是不同级别的标准化。此外,每个实际数据源都有大量的脏数据;这可能导致大量的数据清理和数据准备工作。

流媒体应用程序可能与交互式分析应用程序或机器学习应用程序相接口。这意味着在每种情况下都需要以不同的方式解决吞吐量和可伸缩性方面的挑战。这里有一些技巧可以帮助优化:

  • 数据温度的感知数据的温度(数据的最新程度)和数据的重要性是相互关联的。
  • 避免在磁盘上持久化在磁盘上持久化/检查点数据是一项昂贵的操作,它会杀死MapReduce(除了其他因素)。最大限度地利用内存。
如果你觉得这篇文章或者我分享的主题对你有帮助,请支持我继续更新网站和主题 !捐赠本站
喜欢 (0)
发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址