欢迎来到天剑厚德,这里有最新的大数据技术原创分享。

第二天-Spark SQL

spark healthsun 0评论

1.理解schema进化

DataFrames

Spark SQL使用一个名为DataFrame的编程抽象。它是一个分布式的数据集合,组织在指定的列中。DataFrame相当于数据库表,但提供了更精细的优化级别。DataFrame API还确保Spark在不同语言绑定之间的性能是一致的。

让我们将数据流与RDDs进行比较。RDD是不透明的对象集合,不知道底层数据的格式。相反,DataFrames有一个与之关联的模式。您还可以将数据aframes视为添加了模式的RDDs。事实上,在Spark 1.2之前,有一个名为SchemaRDD的构件,现在它已经演化为DataFrame API。它们提供的功能比schemardd丰富得多。

关于模式的这些额外信息使您能够进行许多优化,否则是不可能的。

DataFrames还可以使用JDBC透明地加载来自各种数据源的数据,比如Hive表、Parquet文件、JSON文件和外部数据库。数据流可以看作是行对象的RDDs,允许用户调用过程性Spark api,比如map。

DataFrame API可以在Scala、Java、Python和R中使用,从Spark 1.4开始。

用户可以使用领域特定语言(DSL)对数据流执行关系操作。DataFrames支持所有常见的关系操作符,它们在有限的DSL中获取表达式对象,从而允许Spark捕获表达式的结构。

我们将从触发SQL的入口点开始,即SQLContext。我们还将介绍HiveContext,它是一个围绕SQLContext的包装器,以支持Hive功能。注意,HiveContext经过了更多的战斗测试,并且提供了更丰富的功能,因此强烈建议您使用它,即使您不打算连接到Hive。慢慢地,SQLContext将达到与HiveContext相同的功能级别。

有两种方法可以将模式与RDDs关联起来创建数据流。简单的方法是利用Scala的case类,我们将首先介绍这些类。Spark使用Java反射从case类推断模式。还有一种方法可以通过编程为高级需求指定模式,我们将在接下来讨论。Spark SQL提供了一种加载和保存拼花文件的简单方法,这也将介绍。最后,我们将介绍如何从JSON加载和保存数据。

从2.0版开始,直接调用SQLContext或HiveContext的需求已经大大减少,现在大多数功能都可以通过直接处理数据aframes来完成。

DataSets

在Spark 1.6中添加了数据集,以便为数据流提供强类型。在Spark 2.0中,Dataset和DataFrame API被合并为一个抽象。在底层,可以将数据aframe视为行对象的数据集。这将在不同的地方得到增强,您将在后面的章节中看到(例如,在机器学习中,一个数据集(label, BestVector))。

数据集提供了一组丰富的转换api,比如RDDs。建议尽可能多地使用datasets。

支持模式的文件格式

仅拥有模式感知的构建块(如DataFrames)是不够的。在尝试找出模式并在读取数据时应用它(schema-on-read)之后应用模式是很痛苦的。

最好的方法是在数据存储时将模式嵌入其中。这就是模式感知文件格式出现的地方,这也是它们成为标准的原因。

大数据世界中最流行的schema感知文件格式是Parquet。JSON (XML杀手)在web领域非常流行。优化的行列(ORC)格式是Hortonworks和微软对Twitter和Cloudera的Parquet的回应。

2.理解Catalyst优化器

2.1优化器目标

优化器主要有两个方面的目标:

1.使添加新的优化技术变得容易

2.使外部开发人员能够扩展优化器

2.2SparkSQL 用Catalyst转换框架四个阶段

2.2.1.分析用于解析引用的逻辑计划

1.分析阶段分为两大部分,第一部分包括

a.查看SQL查询或DataFrame/Dataset

b.确定没有语法错误

c.创建一个逻辑执行计划

d.逻辑执行计划未解决(因为所引用的列可能不存在或数据类型错误)

2.第二部分包括

a.解决这个执行计划用catalyst对象(链接物理数据源)

b.创建逻辑执行计划,如下图所示

2.2.2.逻辑执行计划优化

这里要特别注意谓词下拉规则。这个概念很简单:如果您在一个地方发出一个查询来运行一些大型数据,而这些数据又在另一个地方,那么它可能导致许多不必要的数据在网络上移动。 ​ 如果我们可以下推存储数据的查询部分,从而过滤掉不必要的数据,那么将显著减少网络流量。下图说明了这一点:

2.2.3.物理执行计划

在物理计划阶段,Spark SQL接受一个逻辑计划并生成一个或多个物理计划。 ​ 然后它度量每个物理计划的代价,并根据它生成一个物理计划。参考下图说明了这一点:

2.2.4.代码生成,将查询部分编译成Java字节码。

查询优化的最后阶段包括生成要在每台机器上运行的Java字节码。它使用一种称为准引号(Quasi quotes)的特殊Scala特性来实现这一点。下图说明了这一点:

3.用样例类推理schema

接下来的两个小节将介绍如何将模式附加到原始数据。 ​ Case类可以很好地将数据封装为对象。

我们应该怎么做:
1.开启spark-shell
$ spark-shell
2.创建person样例类
scala> case class Person(first_name:String, last_name:String,age:Int)
3.导入数据到Person
scala> val p = spark.read.textFile("s3a://sparkcookbook/person")
4.检查第一项
scala> p.first
5.将每一行分割成一个字符串数组,以逗号作为分隔符:
scala> val pmap = p.map( line => line.split(","))
6.将Array[String]数据集转换为Person case对象数据集:
scala> val personDS = pmap.map( p => Person(p(0),p(1),p(2).toInt))
7.注册personDS成为一个视图:
scala> personDS.createOrReplaceTempView("person")
8.运行一个SQL查询:
scala> val people = spark.sql("select * from person")
9.获取people的输出
scala> people.show
还有以下这种:
这部分是关于学习如何使用用例类来附加模式,这里我们使用的输入数据是CSV格式的。此外,还有另一种加载数据的方法:
scala> val people = spark.read.format("csv").load("s3a://sparkcookbook/person")
下面看起来像这样
+------+-------+---+
| _c0| _c1|_c2|
+------+-------+---+
|Barack| Obama| 55|
|George| Bush| 70|
| Bill|Clinton| 70|
+------+-------+---+
有两种方法可以解决这个问题。第一种方法是提供自定义列名,例如:
scala> val people = spark.read.format("csv").load("s3a://sparkcookbook/person").select($"_c0".as("first_name"),$"_c1".as("last_name"),$"_c2".as(age))
                                                                                     
第二种方法是加载已经有列标头的数据,例如:
scala> val p = spark.read.format("csv").option("header","true").load("s3a://sparkcookbook/people")       我们怎么看模式呢:
                                                                                      scala> p.printSchema
root
|-- First Name: string (nullable = true)
|-- Last Name: string (nullable = true)
|-- Age: string (nullable = true)
它起了作用,但不像我们预期的那样。我们期望 *年龄* 是个整数。所以让我们来解决这个问题:                       val p =spark.read.option("delimiter",",").option("header","true").option("inferschema","true").csv("s3a://sparkcookbook/people")

4.以编程方式指定schema

在一些情况下,样例类可能无法工作;其中一种情况是case类不能包含超过22个字段;另一种情况是您事先不知道模式。在这种方法中,数据作为行对象的RDD加载。方法分别创建模式StructType和StructField对象,它们分别表示一个表和一个字段。模式应用于行RDD以创建dataframe。

怎么工作呢?
1.开启spark shell
$ spark-shell

2.导入spark SQL datatypes 和 row 对象:
scala> import org.apache.spark.sql._
scala> import org.apache.spark.sql.types._

3.使用StructType和StructField对象创建模式。StructField对象以param名称、param类型和可空性的形式接受参数:
scala> val schema = StructType(Array(StructField("first_name",StringType,true),
StructField("last_name",StringType,true),StructField("age",IntegerType,true)))
4.加载数据集中的person数据,并将模式附加到其中:
scala> val personDS =spark.read.schema(schema).csv("s3a://sparkcookbook/person")

5.注册personDS作为一个视图:
scala> personDS.createOrReplaceTempView("person")

6.运行SQL查询:
scala> val persons = spark.sql("select * from person")

7.获取Person的输出值:
scala> persons.show

StructType对象定义模式。您可以将其视为关系世界中的表或行。StructType接收StructField对象的数组,如下面的签名所示:

StructType(fields: Array[StructField])

StructField对象具有以下签名:

StructField(name: String, dataType: DataType, nullable: Boolean = true, metadata: Metadata = Metadata.empty)

以下是一些关于使用的参数的更多信息:

name:表示字段的名称。 数据类型:这显示了字段的数据类型。

允许使用以下数据类型: IntegerType

FloatType BooleanType

ShortType LongType

ByteType

DoubleType

StringType

nullable:这显示字段是否可以为空。 metadata:这显示了字段的元数据。元数据是Map (String, Any)的包装器,因此它可以包含任意元数据。

5.理解Parquet 格式

在Parquet格式中,单个列中的数据是连续存储的。传统关系模式,例如,如果有一个表,有100列,并且您主要以基于行的格式存取10列,您必须加载所有的100列,因为粒度级别在行级别。Parquet格式存储解决了我们快速查询和尽可能的低成本的问题,尤其在公有云方面。

应该怎么做呢?

1.开启spark-shell
$ spark-shell
2.创建Person样例类
scala> case class Person(firstName:String,lastName:String,age:Int)
3.导入Person数据作为DataSet和映射到Person样例类
scala> val personDS= spark.read.textFile("s3a://sparkcookbook/person").map(line=>line.split(",")).map(str=>Person(str(0),str(1),str(2).toInt))
4.注册person DataSet作为一个临时表以至于SQL查询能够运行它,不强制要求表名和Dataframe名一致:
scala> personDS.createOrReplaceTempView("person")
5.选择一个大于60岁的所有persons:
scala> val sixtyPlus = spark.sql("select * from person where age>60")
6.打印结果
scala> sixtyPlus.show
7.保存sixtyPlus DataFrame到Parquet格式
scala> sixtyPlus.write.parquet("hdfs://192.168.139.150:9000/usr/hduser/sp.parquet")
压缩的方式存储
scala> sixtyPlus.write.option("compression","snappy").parquet("h
dfs://192.168.139.150:9000/usr/hduser/sp.parquet")
8.前面的步骤在HDFS根目录中创建了一个名为sp.parquet的目录。您可以在另一个shell中运行hdfs dfs -ls命令,以确保它已创建:
$ hdfs dfs -ls sp.parquet
9.在Spark shell中加载Parquet文件的内容:
scala> val parquetDF = spark.read.parquet("hdfs://192.168.139.150:9000/usr/hduser/sp.parquet")
10.注册导入Parquet DataFrame作为临时试图:
scala> parquetDF.createOrReplaceTempView("sixty_plus")
11.运行这个临时表:
scala> spark.sql("select * from sixty_plus")

行组:这显示数据的水平分区为行。行组由列块组成。 列块:列块具有行组中给定列的数据。列块总是物理上连续的。行组每列只有一个列块。

页眉:每个列块都有一个共享的页眉。 Page:列块被划分为多个页面。页面是存储单元,不能进一步划分。页面在一个列块中背对背地写。页面上的数据可以压缩。

如果列有空值,则不存储它。如果一个列块有null值,那么即使这个列块也不会被存储。Parquet中的数据不仅被压缩,而且被编码。这将显著减少磁盘占用、网络I/O占用和内存占用。由于需要压缩/解压和编码/解码数据,CPU时钟占用略有增加。

分区

Parquet内置了对分区的支持。当通过选择列(例如,按年、月、日或小时)对数据进行分区时,这种方法非常有效。另一方面,它使即席查询变慢。

谓词下推

Parquet格式作为文件系统(比如HDFS S3)的存储格式,Parquet只提取为行组、列块和选择分区过滤的数据。这使得查询既快速又轻松。

Parquest和Hive的互操作性

如果在Hive表中有一些数据,可以直接保存Parquet格式通过下面的步骤:

1.创建名称为person_parquet的schema:
hive> create table person_parquet like person stored as parquet
2.插入数据到person_parquet从person表里:
hive> insert overwrite table person_parquet select * from person;
3.有时候,从其他源(比如Impala)导入的数据会以二进制形式保存字符串。要在读取时将其转换为字符串,请在SparkConf中设置以下属性,如下面的代码所示:
scala> spark.setConf("spark.sql.parquet.binaryAsString","true")

6.用JSON格式导入和保存数据

Json解决了XML的比较重的情况,JSON是一种轻量级的数据交换格式。

介绍JSON的语义如下规则:

1.数据是以键值对形式表示:
"firstName":"Bill"
2.在JSON种有四种数据类型
* String ("firstName" : "Barack")
* Number ("age" : 56)
* Boolean ("alive": true)
* null ("manager" : null)
3.数据被逗号分割
4.一对大括号{}代表一个对象
{ "firstName" : "Bill", "lastName": "Clinton", "age": 70 }
5.中括号[]代表一个数组
[{ "firstName" : "Bill", "lastName": "Clinton", "age": 70 }
{"firstName": "Barack","lastName": "Obama", "age": 55}]

我们应该怎么做呢?

1.打开一个终端,创建JSON格式person数据:
$ mkdir jsondata
$ vim jsondata/person.json
{"first_name" : "Barack", "last_name" : "Obama", "age" : 55}
{"first_name" : "George", "last_name" : "Bush", "age" : 70 }
{"first_name" : "Bill", "last_name" : "Clinton", "age" : 70 }
2.上传jsondata到HDFS:
$ hdfs dfs -put jsondata /user/hduser/jsondata
3.开启一个spark-shell
$ spark-shell
4.导入jsondata目录到HDFS:
scala> val person = spark.read.json("hdfs://192.168.139.150:9000/user/hdusr/jsondata")
5.注册person DataFrame(df)为一个临时表作为SQL查询
scala> jsondata.createOrReplaceTempView("person")
6.选择所有大于60岁的人
scala> val sixtyPlus = spark.sql("select * from person where age>60")
7.打印值
scala> sixtyPlus.show
8.保存sixtyPlus DataFraem 到json格式
scala> sixtyPlus.write.format("json").save("hdfs://192.168.139.150:9000/user/hdusr/sp")
9.最后一步,在hdfs根目录创建目录sp,运行hdfs dfs -ls 命令确保创建成功
$ hdfs ddfs -ls sp

spark.read.json内部使用TextInputFormat,它一次处理一行。因此,一个JSON记录不能在多行上。如果您使用多行,它将是一个有效的JSON格式,但是它不能与Spark一起工作,并且会引发异常。

它允许在一行中有多个对象。例如,您可以将两个人的信息以数组的形式放在一行中,如下所示:

[{"firstName":"Barack", "lastName":"Obama"},{"firstName":"Bill",
"lastName":"Clinton"}]

7.从关系数据库中导入数据

应该怎么做?

1.创建person表在mysql用下面DDL脚本
CREATE TABLE 'person' (
'person_id' int(11) NOT NULL AUTO_INCREMENT,
'first_name' varchar(30) DEFAULT NULL,
'last_name' varchar(30) DEFAULT NULL,
'gender' char(1) DEFAULT NULL,
'age' tinyint(4) DEFAULT NULL,
PRIMARY KEY ('person_id')
)
2.插入数据
Insert into person values('Barack','Obama','M',55);
Insert into person values('Bill','Clinton','M',70);
Insert into person values('Hillary','Clinton','F',69);
Insert into person values('Bill','Gates','M',61);
Insert into person values('Michelle','Obama','F',52);
3.下载Download mysql-connector-java-x.x.xx-bin.jar 从以下网站
http://dev.mysql.com/downloads/connector/j/.
4.使MYSql驱动对spark-shell可用并启动
$ spark-shell --driver-class-path/path-to-mysql-jar/mysqlconnector-
java-5.1.34-bin.jar
5.构建JDBC URL:
scala> val url="dbc:mysql://localhost:3306/hadoopdb"
6.创建连接属性通过用户名和密码
scala> val prop = new java.util.Properties
scala> prop.setProperty("user","root")
scala> prop.setProperty("passsword","123456")
7.导入DataFrame通过jdbc数据源(url,table name和属性):
scala> val people = sqlContext.read.jdbc(url,"person",prop)
8.展示结果
scala> people.show
9.导入整张表,或者只导入males,运行以下命令:
scala> val males = spark.read.jdbc(url,"person",Array("gender='M'"),prop)
scala> males.show
10.仅显示firstName通过执行以下命令:
scala> val first_names = people.select("first_name")
scala> first_names.show
11.显示小于60岁的人:
scala> val below60 = people.filter(peopel("age")<60)
scala> below60.show
12.通过gender分组
scala> val grouped = people.groupBy("gender")
13.找出男女性别人数
scala> val gender_count = grouped.count
scala> gender_count.show
14.找出男女平均年龄
scala> avg_age = grouped.avg("age")
scala> avg_age.show
15.保存avg_age到新表
scala> gender_count.write.jdbc(url,"gender_count",prop)
16.保存people DataFrame在Parquet格式
scala>people.write.parquet("people.parquet")
17.保存people DataFrame在JSON格式
scala> people.write.json("people.parquet")

8.从任意数据源导入和保存数据

到目前为止,我们已经介绍了内置在DataFrames中的五个数据源:Parquet (默认)、text、json、csv和jdbc。

我们应该怎么做呢?

1.启动spark-shell
$ spark-shell
2.导入Parquet数据,默认数据源格式,可以不指定格式
scala> val people=spark.read.load("hdfs://192.168.139.150:9000/usr/hduser/people.parquet")
3.从parquet导入数据手动指定格式
scala> val people=spark.read.format("parquet").load("hdfs://192.168.139.150:9000/usr/hduser/people.parquet")
4.内建dataTypes,不需要指定完整格式名,仅指定"parquet","json",或者"jdbc"
scala> val people=spark.read.format("parquet").load("hdfs://192.168.139.150:9000/usr/hduser/people.parquet")
5.保存数据到JSON Append模式
scala> val people=people.write.format("json").mode("append").save("hdfs://192.168.139.150:9000/usr/hduser/people.json")

9.理解joins

SQL join是基于一个公共列组合两个数据集的过程。

1.开启spark-shell
$ spark-shell
2.加载业务数据
scala> val businesses = spark.read.format("json").load("s3a://sparkcookbook/yelpdata/yelp_academic_dataset_business.json")
3.导入review数据表到DataFrame:
scala> val reviews = spark.read.format("json").load("s3a://sparkcookbook/yelpdata/yelp_academic_dataset_review.json")
4.导入user表到DataFrame:
scala> val users = spark.read.format("json").load("s3a://sparkcookbook/yelpdata/yelp_academic_dataset_user.json")
5.导入tips表到DataFrame:
scala> val tips = spark.read.format("json").load("s3a://sparkcookbook/yelpdata/yelp_academic_dataset_tip.json")
6.导入reviews和tips作为临时表
scala>reviews.createOrReplaceTempView("reviews")
scala>tips.createOrReplaceTempView("tips")
7.做一个shuffle hash join在两个表上
scala> val reviews_tips = spark.sql("select * from reviews join tips on reviews.user_id=tips.user_id and reviews.business_id=tips.business_id")
8.我们要所有的reviews,不管是否有tips的评论
scala> val all_reviews_tips = spark.sql("select * from reviews LEFT JOIN tips ON reviews.user_id=tips.user_id and reviews.business_id = tips.business_id")

Spark中主要有三种连接类型:

shuffle hash join(默认)

Shuffle hash连接是最基本的连接类型,它是从join in派生而来的MapReduce。假设我们想要加入每个用户的reviews数据和tips数据。一个Shuffle散列连接将经过以下步骤:

  1. 使用user_id、business_id作为键映射审查数据aframe。
  2. 使用user_id、business_id作为键映射tip DataFrame。
  3. 根据user_id、business_id重新洗牌检查数据。
  4. 根据user_id、business_id重新洗牌提示数据。
  5. 使用reduce阶段连接两个数据集。具有相同密钥的数据将在同一台机器上进行排序。在MapReduce中,当数据没有倾斜并均匀分布在键之间时,Shuffle散列连接工作得最好。
Broadcast hash join

最简单的优化是,如果其中一个数据集足够小,可以装入内存,那么它应该广播到每个计算节点。这种用例非常常见,因为始终需要将数据与次要数据(比如字典)结合起来。由于太多的数据在网络上被打乱,大多数连接都很慢。与广播连接,将较小的数据集复制到所有工作节点,从而保持较大Dataframe的原始并行性。

Cartesian join(笛卡尔积)

当一个数据aframe的每一行与另一个数据aframe的每一行连接时,就会发生笛卡尔连接。这会使行数急剧增加。两个数据流,每个只有100 K行,可以产生100亿行笛卡尔连接。

10.分析嵌套结构

在使用Spark时候,会使用DataFrame及SQLContext,而在众多的数据格式中,很可能会遇到JSON格式数据,此数据还可能包含嵌套关系,比如像如下的JSON数据:

{"name":"Michael", "age":25,"myScore":[{"score1":19,"score2":23},{"score1":58,"score2":50}]}
{"name":"Andy", "age":30,"myScore":[{"score1":29,"score2":33},{"score1":38,"score2":52},{"score1":88,"score2":71}]}
{"name":"Justin", "age":19,"myScore":[{"score1":39,"score2":43},{"score1":28,"score2":53}]}

如果我们直接用DataFrame的show方法可以看到:

+---+--------------------+-------+
|age|             myScore|   name|
+---+--------------------+-------+
| 25| [[23,19], [50,58]]|Michael|
| 30|[[33,29], [52,38]...|   Andy|
| 19| [[43,39], [53,28]]| Justin|
+---+--------------------+-------+

root
|-- age: long (nullable = true)
|-- myScore: array (nullable = true)
|   |-- element: struct (containsNull = true)
|   |   |-- score2: long (nullable = true)
|   |   |-- score1: long (nullable = true)
|-- name: string (nullable = true)

由于myScore是一个数组,所以,在上述show得到的表中,我们不能直接使用sql来查询或聚合,那么如何才能将myScore的数组类型展开呢? 我们可以考虑使用explode函数,如下:

val df = sqlContext.read.json("hdfs://master:9000/test/people_Array.json")
df.show()
df.printSchema()
val dfScore = df.select(df("name"),explode(df("myScore"))).toDF("name","myScore")
val dfMyScore = dfScore.select("name","myScore.score1", "myScore.score2")
dfScore.show()

此时,会得到如下结果,这个时候的表,就跟我们平时看到的关系型数据庫的表是一样的了,接下来,我们就可以执行相关的sql查询了。

+-------+-----------------+------------------+
|   name|           score1|           score2|
+-------+-----------------+------------------+
|Michael|               19|               23|
|Michael|               58|               50|
|   Andy|               29|               33|
|   Andy|               38|               52|
|   Andy|               88|               71|
| Justin|               39|               43|
| Justin|               28|               53|
+-------+-----------------+------------------+

代码参考如下:

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions._
import org.junit.{After, Before, Test}
import org.junit.Assert.assertEquals
​
class Test {
  @transient var sc: SparkContext = _
  @transient var sqlContext:SQLContext = _
  @Before
  def init(): Unit ={
    val conf = new SparkConf().setAppName("Test").setMaster("spark://master:7077")
    sc = new SparkContext(conf)
    sqlContext = new org.apache.spark.sql.SQLContext(sc)
  }
​
  @Test
  def TestMapFun(): Unit ={
    val df = sqlContext.read.json("hdfs://master:9000/test/people_Array.json")
    df.show()
    df.printSchema()
​
    val dfScore = df.select(df("name"),explode(df("myScore"))).toDF("name","myScore")
    val dfMyScore = dfScore.select("name","myScore.score1", "myScore.score2")
    dfMyScore.show()
​
    dfMyScore.registerTempTable("table1")
    val result = sqlContext.sql("select name,avg(hwScore_Std),avg(exScore_Std) from table1")
    assertEquals(7,dfMyScore.count())
  }
}
如果你觉得这篇文章或者我分享的主题对你有帮助,请支持我继续更新网站和主题 !捐赠本站
喜欢 (0)
发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址