Spark Streaming implements ranking of website hot words



Ranking a website's hot words reveals users' content preferences, which lets the site add more of the content users are interested in and thereby increase traffic. Using Spark Streaming, you can implement this hot-word ranking requirement programmatically and write the top three hot words to a MySQL table for storage.

1. Create a database to save the data

mysql -uroot -pabc123456
show databases;
create database hotword;
use hotword;
create table searchKeyWord(insert_time date, keyword varchar(30), search_count integer);
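
To sanity-check the schema before running the streaming job, you can insert and read back a row by hand (a sketch; the sample values are made up):

```sql
-- manual smoke test of the table; values are illustrative only
insert into searchKeyWord values (curdate(), 'hadoop', 3);
select * from searchKeyWord;
delete from searchKeyWord;
```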

2. Create HotWordBySort.scala to implement the ranking

import java.sql.{DriverManager, Statement}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object HotWordBySort {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf object
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("HotWordBySort").setMaster("local[2]")
    // 2. Create the SparkContext object
    val sc: SparkContext = new SparkContext(sparkConf)
    // 3. Set the log level
    sc.setLogLevel("WARN")
    // 4. Create the StreamingContext; it takes two parameters: the SparkContext and the batch interval
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
    // 5. Connect to the socket service; requires the socket host, port, and storage level (default used here)
    val dstream: ReceiverInputDStream[String] = ssc
      .socketTextStream("192.168.142.128", 9999)
  
    // 6. Split each line on the comma and map the first field (the keyword) to a count of 1
    val itemPairs: DStream[(String, Int)] = dstream.map(line => (line
      .split(",")(0), 1))
    // 7. Call reduceByKeyAndWindow, which takes three parameters
    val itemCount: DStream[(String, Int)] = itemPairs.reduceByKeyAndWindow((v1: Int, v2: Int) => 
      v1 + v2, Seconds(60), Seconds(10)) // count keyword occurrences over a 60s window, sliding every 10s
    // 8. DStream has no sortByKey, so sort inside transform; false = descending, take(3) keeps the top three
    val hotWord = itemCount.transform(itemRDD => {
      val top3: Array[(String, Int)] = itemRDD.map(pair => (pair._2, pair._1))
        .sortByKey(false).map(pair => (pair._2, pair._1)).take(3)
      // 9. Turn the local collection (the top-three hot words) back into an RDD
      ssc.sparkContext.makeRDD(top3)
    })
    
    // 10. Call foreachRDD to save the output to the MySQL table
    hotWord.foreachRDD(rdd => {
      val url = "jdbc:mysql://192.168.142.128:3306/hotword"
      val user = "root"
      val password = "abc123456"
      Class.forName("com.mysql.jdbc.Driver")
      val conn1 = DriverManager.getConnection(url, user, password)
      conn1.prepareStatement("delete from searchKeyWord where 1=1")
        .executeUpdate()
      conn1.close()
      rdd.foreachPartition(partitionOfRecords => {
        val url = "jdbc:mysql://192.168.142.128:3306/hotword"
        val user = "root"
        val password = "abc123456"
        Class.forName("com.mysql.jdbc.Driver")
        val conn2 = DriverManager.getConnection(url, user, password)
        conn2.setAutoCommit(false)
        val stat: Statement = conn2.createStatement()
        partitionOfRecords.foreach(record => {
          stat.addBatch("insert into searchKeyWord(insert_time, keyword, search_count) values (now(), '" + record._1 + "', '" + record._2 + "')")
        })
        stat.executeBatch()
        conn2.commit()
        conn2.close()
      })
    })
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
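
As a quick illustration of what steps 6 through 9 compute, the same map → count → sort-descending → take(3) logic can be reproduced on a plain Scala collection, without Spark. This sketch is not part of the original job; `Top3Demo` and its sample data are hypothetical:

```scala
object Top3Demo {
  // Count keyword occurrences (the first comma-separated field) and keep the top three.
  def top3(lines: Seq[String]): Seq[(String, Int)] =
    lines.map(_.split(",")(0))                              // step 6: keyword is the first field
      .groupBy(identity)                                    // step 7: group equal keywords
      .map { case (word, occurrences) => (word, occurrences.size) }
      .toSeq
      .sortBy(-_._2)                                        // step 8: sort by count, descending
      .take(3)                                              // keep the top three

  def main(args: Array[String]): Unit = {
    val sample = Seq("hadoop,111", "spark,222", "hadoop,222", "hadoop,222", "hive,222", "hive,333")
    println(top3(sample)) // List((hadoop,3), (hive,2), (spark,1))
  }
}
```

The streaming job does the same thing, except the "batch" is whatever arrived in the sliding 60-second window, recomputed every 10 seconds.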

Execute the program.

3. Run nc -lk 9999 on the master node

Enter the command: nc -lk 9999 (on the master node)

Input data:
hadoop,111        
spark,222      
hadoop,222
hadoop,222
hive,222
hive,333
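
Instead of typing the lines by hand, a small shell sketch can generate the same sample input; pipe its output straight into the listener (the host and port match the ones assumed above):

```shell
# Print the sample "keyword,id" lines, one per line.
# On the master node, feed them to the listener with:
#   printf '%s\n' hadoop,111 spark,222 hadoop,222 hadoop,222 hive,222 hive,333 | nc -lk 9999
printf '%s\n' hadoop,111 spark,222 hadoop,222 hadoop,222 hive,222 hive,333
```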


Log in to MySQL and query the data table; the top three hot words appear in the table.