Shang Silicon Valley SparkSQL 3.0

Chapter 1 Overview of SparkSQL

1.1 What is SparkSQL

Spark SQL is the Spark module for processing structured data. "Structured data" here means two-dimensional tables with rows and columns, much like the tables in a relational database.

1.2 Hive and SparkSQL

The predecessor of SparkSQL is Shark, which provided a quick-to-learn SQL tool for engineers who were familiar with RDBMS and back-end development but did not understand MapReduce.


1.3 Features of SparkSQL


1.4 What is a DataFrame


In a database, the schema (pronounced "skee-muh" or "skee-mah") is the organization and structure of the database; both "schemas" and "schemata" are acceptable plural forms. A schema contains schema objects such as tables, columns, data types, views, stored procedures, relationships, primary keys, foreign keys, and so on. A database schema can be represented by a visual diagram that shows the database objects and the relationships between them.
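In Spark SQL the same idea applies: every DataFrame carries a schema describing its column names and types. A minimal sketch (assuming a SparkSession named spark and a JSON file input/test.json with fields username and age, the same sample file used later in 2.6.2):

val df = spark.read.json("input/test.json")
df.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- username: string (nullable = true)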


1.5 What is a DataSet


DataFrame usage example


Chapter 2 SparkSQL Core Programming

This chapter focuses on how to program with the DataFrame and DataSet models provided by Spark SQL and on understanding the relationship and conversions between them; mastering the concrete SQL syntax is the early stage of learning.

2.1 A new starting point


2.2 DataFrame


2.2.1 Create DataFrame

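A minimal sketch of creating a DataFrame from a data source in the spark-shell, assuming a JSON file input/test.json whose lines look like {"username": "lisi","age": 18} (the path is illustrative); spark.read also supports csv, jdbc, parquet, orc, text and other formats:

val df = spark.read.json("input/test.json")
df.show()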

2.2.2 SQL syntax

To write SQL, you first need a table (or view) to query.

Note: df.createOrReplaceTempView("people") creates a view. Databases have both the concept of a view and the concept of a table; both hold tabular data, but a view is read-only and cannot be modified, whereas a table is both readable and writable.


A temporary view is session-scoped: it is valid only within the SparkSession (connection) that created it and cannot be accessed from other connections.
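A minimal sketch of the SQL-style workflow, assuming the DataFrame df created in 2.2.1; when a view must be visible across sessions, a global temporary view can be used instead, and it is always accessed through the global_temp database:

// Session-scoped temporary view
df.createOrReplaceTempView("people")
spark.sql("select * from people").show()

// Global temporary view, shared by all sessions of the application
df.createGlobalTempView("people")
spark.sql("select * from global_temp.people").show()
spark.newSession().sql("select * from global_temp.people").show()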


2.2.3 DSL Syntax

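A minimal sketch of DSL-style queries, assuming the DataFrame df with columns username and age and that import spark.implicits._ is in effect (needed for the $ and ' column syntax); whenever a column appears in an expression, every column reference must use $"col" or 'col:

df.printSchema()
df.select("username").show()
df.select($"username", $"age" + 1).show()
df.select('username, 'age + 1).show()
df.filter($"age" > 20).show()
df.groupBy("age").count().show()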

2.2.4 Convert RDD to DataFrame

To convert an RDD to a DataFrame: the data in an RDD has no schema, so you must tell Spark the structure of the DataFrame (column names and types) before the conversion can be performed.
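A minimal sketch, assuming import spark.implicits._ is in scope (required for toDF); the column names are supplied explicitly here, but when the RDD contains case-class instances they are taken from the case-class fields:

val rdd = spark.sparkContext.makeRDD(List((1, "zhangsan", 30), (2, "lisi", 28)))
val df = rdd.toDF("id", "name", "age")
df.show()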


2.2.5 Convert DataFrame to RDD

Converting a DataFrame to an RDD is straightforward: an RDD does not need schema information, so the conversion can be done directly and yields an RDD[Row].
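A minimal sketch, assuming the df built above; the resulting RDD has element type Row, whose fields are accessed by index starting at 0:

val rowRDD = df.rdd          // RDD[Row]
rowRDD.collect().foreach(row => println(row(0)))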


2.3 DataSet

A DataSet is a strongly typed collection of data, so the corresponding type information must be provided; for example, a DataSet can be built from a Seq, as shown in 2.3.1 below.

2.3.1 Create a DataSet

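A minimal sketch of the two common ways to create a DataSet (assuming import spark.implicits._): from a Seq of case-class instances and from a Seq of basic types:

case class Person(name: String, age: Long)

// 1) From a sequence of case-class instances
val caseClassDS = Seq(Person("zhangsan", 20)).toDS()
caseClassDS.show()

// 2) From a sequence of basic types
val ds = Seq(1, 2, 3, 4, 5).toDS()
ds.show()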

2.3.2 Convert RDD to DataSet

Note: in practice it is rare to convert a plain sequence into a DataSet; far more often a DataSet is obtained from an RDD.
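A minimal sketch, assuming a top-level case class and import spark.implicits._; when the RDD contains case-class instances, SparkSQL takes the structure of the DataSet from the case-class fields:

case class User(name: String, age: Int)

val ds = spark.sparkContext
  .makeRDD(List(("zhangsan", 30), ("lisi", 49)))
  .map { case (name, age) => User(name, age) }
  .toDS()
ds.show()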


2.3.3 Convert DataSet to RDD

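Calling .rdd on a DataSet returns an RDD whose element type is the DataSet's own type; a minimal sketch, assuming the ds of User built in 2.3.2:

val rdd = ds.rdd        // RDD[User]
rdd.collect().foreach(println)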

2.4 DataFrame and DataSet conversion

The main difference between a DataFrame and a DataSet is that a DataSet carries a concrete element type; a DataFrame can be converted into a DataSet by specifying that type:

val ds = df.as[User]
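The reverse direction needs no extra type information, because a DataFrame is simply a DataSet[Row]; a minimal sketch, assuming the ds from the line above:

val df2 = ds.toDF()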


2.5 The relationship between RDD, DataFrame, and DataSet


2.5.1 The commonality of the three


2.5.2 The difference between the three


2.5.3 Mutual conversion of the three

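As a summary sketch of the conversion APIs between the three (assuming an rdd of type RDD[(Int, String, Int)] as in 2.6.2, a case class User(id, name, age), and import spark.implicits._):

// RDD <=> DataFrame
val df = rdd.toDF("id", "name", "age")
val rowRDD = df.rdd                      // RDD[Row]

// RDD <=> DataSet
val ds = rdd.map { case (id, name, age) => User(id, name, age) }.toDS()
val userRDD = ds.rdd                     // RDD[User]

// DataFrame <=> DataSet
val ds2 = df.as[User]
val df2 = ds2.toDF()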

2.6 IDEA develops SparkSQL

In real-world development, SparkSQL programs are written and run in IDEA (IntelliJ IDEA).

2.6.1 Add dependency

<dependency>
 <groupId>org.apache.spark</groupId>
 <artifactId>spark-sql_2.12</artifactId>
 <version>3.0.0</version>
</dependency>

2.6.2 Code Implementation

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object SparkSQL01_Demo {
  def main(args: Array[String]): Unit = {
    // Create the context configuration object
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo")
    // Create the SparkSession object
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    // RDD=>DataFrame=>DataSet conversions need the implicit conversion rules; without this import they fail
    // "spark" here is not a package name but the name of the context object
    import spark.implicits._
    // Read a JSON file to create a DataFrame  {"username": "lisi","age": 18}
    val df: DataFrame = spark.read.json("input/test.json")
    //df.show()
    // SQL-style syntax
    df.createOrReplaceTempView("user")
    //spark.sql("select avg(age) from user").show
    // DSL-style syntax
    //df.select("username","age").show()
    //*****RDD=>DataFrame=>DataSet*****
    // RDD
    val rdd1: RDD[(Int, String, Int)] =
      spark.sparkContext.makeRDD(List((1, "zhangsan", 30), (2, "lisi", 28), (3, "wangwu", 20)))
    // DataFrame
    val df1: DataFrame = rdd1.toDF("id", "name", "age")
    //df1.show()
    // DataSet
    val ds1: Dataset[User] = df1.as[User]
    //ds1.show()
    //*****DataSet=>DataFrame=>RDD*****
    // DataFrame
    val df2: DataFrame = ds1.toDF()
    // The returned RDD has element type Row; its getXXX methods retrieve field values,
    // much like processing a JDBC result set, except that the index starts at 0
    val rdd2: RDD[Row] = df2.rdd
    //rdd2.foreach(a=>println(a.getString(1)))
    //*****RDD=>DataSet*****
    rdd1.map {
      case (id, name, age) => User(id, name, age)
    }.toDS()
    //*****DataSet=>RDD*****
    ds1.rdd
    // Release resources
    spark.stop()
  }
}
// Sample (case) class
case class User(id: Int, name: String, age: Int)

2.7 User-defined functions


2.7.1 UDF

package com.atguigu.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Spark02_SparkSQL_UDF {

    def main(args: Array[String]): Unit = {

        // TODO Create the SparkSQL runtime environment
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()
        import spark.implicits._

        val df = spark.read.json("datas/user.json")
        df.createOrReplaceTempView("user")

        // Register a custom function named prefixName that takes a parameter name: String
        spark.udf.register("prefixName", (name:String) => {
            "Name: " + name
        })
        // Use the custom function
        spark.sql("select age, prefixName(username) from user").show
        
        // TODO Close the environment
        spark.close()
    }
}


2.7.2 UDAF



Implementation: strongly typed UDAF
Requirement: compute the average age

package com.atguigu.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession, TypedColumn, functions}

object Spark03_SparkSQL_UDAF2 {

    def main(args: Array[String]): Unit = {

        // TODO Create the SparkSQL runtime environment
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()
        import spark.implicits._
        val df = spark.read.json("datas/user.json")

        // In earlier Spark versions, strongly typed UDAF operations could not be used in SQL statements
        // SQL & DSL
        // Early strongly typed UDAF aggregate functions were used through the DSL syntax
        val ds: Dataset[User] = df.as[User]

        // Convert the UDAF function into a column object that can be queried
        val udafCol: TypedColumn[User, Long] = new MyAvgUDAF().toColumn

        ds.select(udafCol).show


        // TODO Close the environment
        spark.close()
    }
    /*
     Custom aggregate function class: compute the average age
     1. Extend org.apache.spark.sql.expressions.Aggregator and define the type parameters
         IN  : input data type   User
         BUF : buffer data type  Buff
         OUT : output data type  Long
     2. Override the 6 methods
     */
    case class User(username:String, age:Long)
    case class Buff( var total:Long, var count:Long )
    class MyAvgUDAF extends Aggregator[User, Buff, Long]{
        // z & zero : the initial (zero) value
        // Initialize the buffer
        override def zero: Buff = {
            Buff(0L,0L)
        }

        // Update the buffer with the input data
        override def reduce(buff: Buff, in: User): Buff = {
            buff.total = buff.total + in.age
            buff.count = buff.count + 1
            buff
        }

        // Distributed computation produces multiple buffers, so they must be merged (i.e., combine the results of each partition)
        override def merge(buff1: Buff, buff2: Buff): Buff = {
            buff1.total = buff1.total + buff2.total
            buff1.count = buff1.count + buff2.count
            buff1
        }

        // Compute the final business result from the merged buffer
        override def finish(buff: Buff): Long = {
            buff.total / buff.count
        }

        // Encoder for the buffer
        override def bufferEncoder: Encoder[Buff] = Encoders.product

        // Encoder for the output
        override def outputEncoder: Encoder[Long] = Encoders.scalaLong
    }
}


2.8 Loading and saving of data

2.8.1 General loading and saving methods

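A minimal sketch of the generic load and save API (file paths are illustrative). The default data source format is parquet (configurable through spark.sql.sources.default), format(...) selects another source, mode(...) picks a SaveMode (error/errorifexists by default, plus append, overwrite and ignore), and a file can even be queried directly in SQL with the format.`path` syntax:

// Generic load (parquet by default)
val df = spark.read.load("input/users.parquet")

// Explicitly specify the format
val jsonDF = spark.read.format("json").load("input/test.json")

// Query a file directly in SQL
spark.sql("select * from json.`input/test.json`").show

// Generic save, with format and save mode
jsonDF.write.format("json").mode("append").save("output")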

2.8.2 Parquet

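Parquet is Spark SQL's default columnar storage format, so reading and writing parquet needs no explicit format; a minimal sketch (paths illustrative):

val df = spark.read.load("input/users.parquet")
df.show
df.write.mode("append").save("output")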

2.8.3 JSON

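Spark SQL can automatically infer the schema of a JSON dataset and load it as a Dataset[Row], but note that the expected input is not a conventional pretty-printed JSON document: each line of the file must be a complete, self-contained JSON object. A minimal sketch (path illustrative):

// input/test.json, one JSON object per line, e.g.
// {"username": "lisi", "age": 18}
val df = spark.read.json("input/test.json")
df.createOrReplaceTempView("user")
spark.sql("select username, age from user where age >= 18").show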

2.8.4 CSV

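A minimal sketch of reading a CSV file and configuring the common options: sep sets the field delimiter, header treats the first line as column names, and inferSchema asks Spark to infer column types (path illustrative):

val df = spark.read.format("csv")
  .option("sep", ";")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("input/user.csv")
df.show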

2.8.5 MySQL



1) Import dependencies

<dependency>
 <groupId>mysql</groupId>
 <artifactId>mysql-connector-java</artifactId>
 <version>5.1.27</version>
</dependency>

2) Read data

import java.util.Properties

val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")
// Create the SparkSession object
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._

// Method 1: read with the generic load method
spark.read.format("jdbc")
  .option("url", "jdbc:mysql://linux1:3306/spark-sql")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("user", "root")
  .option("password", "123123")
  .option("dbtable", "user")
  .load().show

// Method 2: read with the generic load method, passing the options as a Map
spark.read.format("jdbc")
  .options(Map(
    "url" -> "jdbc:mysql://linux1:3306/spark-sql?user=root&password=123123",
    "dbtable" -> "user",
    "driver" -> "com.mysql.jdbc.Driver"))
  .load().show

// Method 3: read with the jdbc method
val props: Properties = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "123123")
val df: DataFrame = spark.read.jdbc("jdbc:mysql://linux1:3306/spark-sql", "user", props)
df.show

// Release resources
spark.stop()

3) Write data

case class User2(name: String, age: Long)
...
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")
// Create the SparkSession object
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._

val rdd: RDD[User2] = spark.sparkContext.makeRDD(List(User2("lisi", 20), User2("zs", 30)))
val ds: Dataset[User2] = rdd.toDS

// Method 1: generic write, specifying the output type with format
ds.write
  .format("jdbc")
  .option("url", "jdbc:mysql://linux1:3306/spark-sql")
  .option("user", "root")
  .option("password", "123123")
  .option("dbtable", "user")
  .mode(SaveMode.Append)
  .save()

// Method 2: write with the jdbc method
val props: Properties = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "123123")
ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://linux1:3306/spark-sql", "user", props)

// Release resources
spark.stop()

2.8.6 Hive


1) Embedded HIVE


2) External HIVE

5) Operating Hive from code

1) Import dependencies

<dependency>
 <groupId>org.apache.spark</groupId>
 <artifactId>spark-hive_2.12</artifactId>
 <version>3.0.0</version>
</dependency>
<dependency>
 <groupId>org.apache.hive</groupId>
 <artifactId>hive-exec</artifactId>
 <version>1.2.1</version>
</dependency>
<dependency>
 <groupId>mysql</groupId>
 <artifactId>mysql-connector-java</artifactId>
 <version>5.1.27</version>
</dependency>

2) Copy the hive-site.xml file into the project's resources directory, then implement the code

// Create the SparkSession
val spark: SparkSession = SparkSession
  .builder()
  .enableHiveSupport()
  .master("local[*]")
  .appName("sql")
  .getOrCreate()

package com.atguigu.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql._

object Spark05_SparkSQL_Hive {

    def main(args: Array[String]): Unit = {
        System.setProperty("HADOOP_USER_NAME", "root")
        // TODO Create the SparkSQL runtime environment
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
        // Enable Hive support
        val spark = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()

        // Use SparkSQL to connect to an external Hive
        // 1. Copy the hive-site.xml file to the classpath
        // 2. Enable Hive support
        // 3. Add the required dependencies (including the MySQL driver)
        spark.sql("show tables").show

        // TODO Close the environment
        spark.close()
    }
}

Chapter 3 SparkSQL Project Practice

3.1 Data preparation

All the data for our Spark SQL operations comes from Hive, so first create the tables in Hive and import the data.
There are 3 tables in total: a user behavior table, a city table, and a product table.

package com.atguigu.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql._

object Spark06_SparkSQL_Test {

    def main(args: Array[String]): Unit = {
        System.setProperty("HADOOP_USER_NAME", "root")

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
        val spark = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()

        spark.sql("use atguigu")




        // Prepare the data
        // Create the tables in Hive, writing the SQL across multiple lines
        spark.sql(
            """
              |CREATE TABLE `user_visit_action`(
              |  `date` string,
              |  `user_id` bigint,
              |  `session_id` string,
              |  `page_id` bigint,
              |  `action_time` string,
              |  `search_keyword` string,
              |  `click_category_id` bigint,
              |  `click_product_id` bigint,
              |  `order_category_ids` string,
              |  `order_product_ids` string,
              |  `pay_category_ids` string,
              |  `pay_product_ids` string,
              |  `city_id` bigint)
              |row format delimited fields terminated by '\t'
            """.stripMargin)

        // Load local data
        spark.sql(
            """
              |load data local inpath 'datas/user_visit_action.txt' into table atguigu.user_visit_action
            """.stripMargin)

        spark.sql(
            """
              |CREATE TABLE `product_info`(
              |  `product_id` bigint,
              |  `product_name` string,
              |  `extend_info` string)
              |row format delimited fields terminated by '\t'
            """.stripMargin)

        spark.sql(
            """
              |load data local inpath 'datas/product_info.txt' into table atguigu.product_info
            """.stripMargin)

        spark.sql(
            """
              |CREATE TABLE `city_info`(
              |  `city_id` bigint,
              |  `city_name` string,
              |  `area` string)
              |row format delimited fields terminated by '\t'
            """.stripMargin)

        spark.sql(
            """
              |load data local inpath 'datas/city_info.txt' into table atguigu.city_info
            """.stripMargin)

        spark.sql("""select * from city_info""").show


        spark.close()
    }
}

3.2.1 Introduction to requirements


3.2.2 Demand analysis


3.2.3 Function realization

package com.atguigu.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Aggregator

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

object Spark06_SparkSQL_Test2 {

    def main(args: Array[String]): Unit = {
        System.setProperty("HADOOP_USER_NAME", "root")

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
        val spark = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()

        spark.sql("use atguigu")

        // Query the base data
        spark.sql(
            """
              |  select
              |     a.*,
              |     p.product_name,
              |     c.area,
              |     c.city_name
              |  from user_visit_action a
              |  join product_info p on a.click_product_id = p.product_id
              |  join city_info c on a.city_id = c.city_id
              |  where a.click_product_id > -1
            """.stripMargin).createOrReplaceTempView("t1")

        // Aggregate the data by area and product
        spark.udf.register("cityRemark", functions.udaf(new CityRemarkUDAF()))
        spark.sql(
            """
              |  select
              |     area,
              |     product_name,
              |     count(*) as clickCnt,
              |     cityRemark(city_name) as city_remark
              |  from t1 group by area, product_name
            """.stripMargin).createOrReplaceTempView("t2")

        // Rank by click count within each area
        spark.sql(
            """
              |  select
              |      *,
              |      rank() over( partition by area order by clickCnt desc ) as rank
              |  from t2
            """.stripMargin).createOrReplaceTempView("t3")

        // Take the top 3
        spark.sql(
            """
              | select
              |     *
              | from t3 where rank <= 3
            """.stripMargin).show(false)

        spark.close()
    }
    case class Buffer( var total : Long, var cityMap:mutable.Map[String, Long] )
    // Custom aggregate function: implement the city remark feature
    // 1. Extend Aggregator and define the type parameters
    //    IN  : city name
    //    BUF : Buffer => [total click count, Map[(city, cnt), (city, cnt)]]
    //    OUT : remark string
    // 2. Override the 6 methods
    class CityRemarkUDAF extends Aggregator[String, Buffer, String]{
        // Initialize the buffer
        override def zero: Buffer = {
            Buffer(0, mutable.Map[String, Long]())
        }

        // Update the buffer
        override def reduce(buff: Buffer, city: String): Buffer = {
            buff.total += 1
            val newCount = buff.cityMap.getOrElse(city, 0L) + 1
            buff.cityMap.update(city, newCount)
            buff
        }

        // Distributed computation produces multiple buffers, so they must be merged (i.e., combine the results of each partition)
        override def merge(buff1: Buffer, buff2: Buffer): Buffer = {
            buff1.total += buff2.total

            val map1 = buff1.cityMap
            val map2 = buff2.cityMap

            // Option 1 for merging the two Maps
//            buff1.cityMap = map1.foldLeft(map2) {
//                case ( map, (city, cnt) ) => {
//                    val newCount = map.getOrElse(city, 0L) + cnt
//                    map.update(city, newCount)
//                    map
//                }
//            }
            // Option 2 for merging the two Maps
            map2.foreach{
                case (city , cnt) => {
                    val newCount = map1.getOrElse(city, 0L) + cnt
                    map1.update(city, newCount)
                }
            }
            buff1.cityMap = map1
            buff1
        }
        // Based on the final result, run the concrete business logic: turn the statistics into the remark string
        override def finish(buff: Buffer): String = {
            val remarkList = ListBuffer[String]()

            val totalcnt = buff.total
            val cityMap = buff.cityMap

            // Sort by click count in descending order
            val cityCntList = cityMap.toList.sortWith(
                (left, right) => {
                    left._2 > right._2
                }
            ).take(2)

            val hasMore = cityMap.size > 2
            var rsum = 0L
            cityCntList.foreach{
                case ( city, cnt ) => {
                    val r = cnt * 100 / totalcnt
                    remarkList.append(s"${city} ${r}%")
                    rsum += r
                }
            }
            if ( hasMore ) {
                remarkList.append(s"其他 ${100 - rsum}%")
            }

            remarkList.mkString(", ")
        }

        override def bufferEncoder: Encoder[Buffer] = Encoders.product

        override def outputEncoder: Encoder[String] = Encoders.STRING
    }
}