Spark Streaming in practice: real-time calculation of PV and UV (hardcore)


Recently there was a requirement for real-time PV and UV statistics. The results are displayed by date, hour, pv and uv; statistics are accumulated per day and start over the next day. In practice PV and UV also have to be broken down by a type field, that is, displayed by date, hour, pv, uv and type. This article covers only the most basic PV and UV display.


PV (page views) counts every page request; UV (unique visitors) counts distinct visitors, identified here by guid.

1. Project process

Log data is collected by Flume and written to HDFS for other offline jobs. It is also sunk to Kafka. Spark Streaming pulls the data from Kafka and calculates PV and UV, using a Redis set to deduplicate for UV, and finally writes the results to a MySQL database for front-end display.

2. Implementation details

1) Calculation of pv

There are two ways to pull data from Kafka: the receiver-based approach and the direct approach. The direct approach is used here, and the mapWithState operator is used to keep the state. This operator serves the same purpose as updateStateByKey but performs better. Of course, real data has to be cleaned and filtered before it can be used.

Define a state function

// State update function for real-time traffic
val mapFunction = (datehour: String, pv: Option[Long], state: State[Long]) => {
  val accuSum = pv.getOrElse(0L) + state.getOption().getOrElse(0L)
  val output = (datehour, accuSum)
  state.update(accuSum)
  output
}

In this way, pv can be easily calculated.
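To make the accumulation concrete, here is a minimal plain-Scala model of what the state function does across micro-batches. It does not use Spark: the Map stands in for Spark's per-key State[Long], and the names PvModel and updateBatch as well as the key format are assumptions made for illustration. In Spark itself a function of this shape is passed to mapWithState through StateSpec.function.

```scala
// Plain-Scala model of the running PV total; no Spark required.
object PvModel {
  // Stand-in for Spark's State[Long]: one running total per date-hour key.
  type PvState = Map[String, Long]

  // Fold one micro-batch of (datehour, count) pairs into the state and
  // return the new state plus the records that would be emitted downstream.
  def updateBatch(state: PvState,
                  batch: Seq[(String, Long)]): (PvState, Vector[(String, Long)]) =
    batch.foldLeft((state, Vector.empty[(String, Long)])) {
      case ((st, out), (datehour, pv)) =>
        // Same arithmetic as mapFunction: new count plus the saved total.
        val accuSum = pv + st.getOrElse(datehour, 0L)
        (st.updated(datehour, accuSum), out :+ (datehour -> accuSum))
    }
}
```

Feeding two batches in sequence, the second call starts from the state returned by the first, so a key seen in both batches keeps growing while a new key starts from zero.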

2) Calculation of uv

UV has to be deduplicated across the whole day. Doing this with the native reduceByKey or groupByKey on every incoming batch demands too much from a modestly configured cluster, so we applied for a 93 GB Redis instance to handle the deduplication. For each incoming record, the date is used as the key and the guid is added to a Redis set. Every 20 seconds the size of the set is read and the database is updated.

helper_data.foreachRDD(rdd => {
  rdd.foreachPartition(eachPartition => {
    // Get a Redis connection
    val jedis = getJedis
    eachPartition.foreach(x => {
      // some code omitted...
      jedis.sadd(key, x._2)
      // Set an expiry on each day's set so that Redis does not run out of
      // capacity; this way each day's set is deleted automatically
      jedis.expire(key, ConfigFactory.rediskeyexists)
    })
    // Close the connection
    closeJedis(jedis)
  })
})
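The set-based deduplication can be sketched without a Redis server. The following in-memory model is only an illustration: UvModel, its method names and the key format are assumptions, and the two methods merely mirror the behaviour of the Redis SADD and SCARD commands (add a member to a set, read the set's size).

```scala
import scala.collection.mutable

// In-memory stand-in for the Redis sets used for UV deduplication.
object UvModel {
  private val sets = mutable.Map.empty[String, mutable.Set[String]]

  // Mirrors SADD: returns true only when the guid is new for that day's key.
  def sadd(dateKey: String, guid: String): Boolean =
    sets.getOrElseUpdate(dateKey, mutable.Set.empty[String]).add(guid)

  // Mirrors SCARD: the number of distinct guids under the key, i.e. the UV.
  def scard(dateKey: String): Long =
    sets.get(dateKey).map(_.size.toLong).getOrElse(0L)
}
```

Adding the same guid twice under one date key leaves the size unchanged, which is why reading the set size periodically yields the day's UV directly.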

3) Save the results to the database

The results are saved to a MySQL database, which is refreshed every 10 seconds. Each time the front end refreshes, it queries the database again, which gives a real-time display of PV and UV.

/**
  * Insert data
  *
  * @param data (addTab(datehour)+helperversion)
  * @param tbName
  * @param colNames
  */
def insertHelper(data: DStream[(String, Long)], tbName: String, colNames: String*): Unit = {
  data.foreachRDD(rdd => {
    val tmp_rdd = rdd.map(x => x._1.substring(11, 13).toInt)
    if (!rdd.isEmpty()) {
      // Largest hour in the current result; useful during data recovery
      val hour_now = tmp_rdd.max()
      rdd.foreachPartition(eachPartition => {
        try {
          val jedis = getJedis
          val conn = MysqlPoolUtil.getConnection()
          conn.setAutoCommit(false)
          val stmt = conn.createStatement()
          eachPartition.foreach(x => {
            // val sql = ....
            // some code omitted
            stmt.addBatch(sql)
          })
          closeJedis(jedis)
          stmt.executeBatch() // execute the SQL statements as a batch
          conn.commit()
          conn.close()
        } catch {
          case e: Exception => {
            logger.error(e)
            logger2.error(HelperHandle.getClass.getSimpleName + e)
          }
        }
      })
    }
  })
}

// Time from now until midnight of the next day, in milliseconds
def resetTime = {
  val now = new Date()
  val todayEnd = Calendar.getInstance
  todayEnd.set(Calendar.HOUR_OF_DAY, 23) // Calendar.HOUR is the 12-hour clock
  todayEnd.set(Calendar.MINUTE, 59)
  todayEnd.set(Calendar.SECOND, 59)
  todayEnd.set(Calendar.MILLISECOND, 999)
  todayEnd.getTimeInMillis - now.getTime
}
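As an alternative sketch of resetTime, the same duration can be computed with java.time. This is not the author's code: the object and method names are assumptions, and it measures up to exactly 00:00:00.000 of the next day, whereas the Calendar version above stops at 23:59:59.999. Passing the current time in as a parameter makes the function easy to check.

```scala
import java.time.{Duration, ZonedDateTime}

object ResetTime {
  // Milliseconds from `now` until the start of the next day in now's time zone.
  def millisUntilNextMidnight(now: ZonedDateTime): Long = {
    val nextMidnight = now.toLocalDate.plusDays(1).atStartOfDay(now.getZone)
    Duration.between(now, nextMidnight).toMillis
  }
}
```

In a job like the one above it would be called as ResetTime.millisUntilNextMidnight(ZonedDateTime.now()).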

4) Data fault tolerance

A streaming job that consumes Kafka has to guard against data loss. The consumed offsets can generally be saved to any storage system, such as MySQL, HDFS, HBase, Redis or ZooKeeper. Here, Spark Streaming's built-in checkpoint mechanism is used to recover data when the application restarts.


With checkpointing, after a failure or restart the application can pick up the tasks that were not completed last time and resume reading from the corresponding Kafka offsets.

// Initialize the configuration
ConfigFactory.initConfig()

val conf = new SparkConf().setAppName(ConfigFactory.sparkstreamname)
conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
conf.set("spark.streaming.kafka.maxRatePerPartition", consumeRate)
conf.set("spark.default.parallelism", "24")
val sc = new SparkContext(conf)

while (true) {
  val ssc = StreamingContext.getOrCreate(ConfigFactory.checkpointdir + DateUtil.getDay(0), getStreamingContext _)
  ssc.start()
  ssc.awaitTerminationOrTimeout(resetTime)
  ssc.stop(false, true)
}

A separate checkpoint directory is used for each day. The StreamingContext object is destroyed on schedule in the early morning of the next day, and PV and UV are counted afresh.
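The per-day directory comes from appending a day string to the configured base path. The source does not show DateUtil.getDay, so the following is only a guess at its shape: the object name, the extra today parameter and the yyyyMMdd format are all assumptions.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

object CheckpointDir {
  // Assumed day format; the real DateUtil may use a different one.
  private val dayFormat = DateTimeFormatter.ofPattern("yyyyMMdd")

  // Hypothetical equivalent of DateUtil.getDay(offset): the day that lies
  // `offset` days after `today`, rendered as a string.
  def getDay(today: LocalDate, offset: Int): String =
    today.plusDays(offset.toLong).format(dayFormat)

  // Checkpoint path for a given day: base directory plus the day suffix.
  def forDay(baseDir: String, today: LocalDate): String =
    baseDir + getDay(today, 0)
}
```

Because the path changes at midnight, the next StreamingContext.getOrCreate call finds no checkpoint there and builds a fresh context, which is what resets the daily counts.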

Note: ssc.stop(false, true) stops the StreamingContext gracefully and leaves the SparkContext running. ssc.stop(true, true) also stops the SparkContext, so the program exits directly.

Application migration or program upgrade

During this process we also upgraded the application. For example, when a feature was poorly written or contained a logic error, the code had to be changed and the jar rebuilt. If the program is stopped at this point, the new application will still read the old checkpoint, which can cause two problems:

  1. The old program is executed, because the checkpoint also contains serialized code;
  2. Execution fails outright, because deserialization fails.

In fact, modified code sometimes takes effect directly without deleting the checkpoint. After many tests I found that if the data-filtering logic changes, or what the stateful operation saves is modified, the restart fails and the only fix is to delete the checkpoint. In practice, however, deleting the checkpoint loses the unfinished tasks and the consumed Kafka offsets, which leads directly to data loss. In this case I usually proceed as follows.

The new program is started on another cluster, or with a different checkpoint directory. We keep the code separate from the configuration file, so changing the checkpoint location in the configuration is easy. The two programs then run side by side. Apart from the checkpoint directory, which differs and is rebuilt for the new program, both write to the same database. After they have run together for a while, the old program is simply stopped. I had read this advice in the official documentation before but remembered it only vaguely; it was only when doing it myself that I had to think through how to keep the data accurate.

5) Save the offset to mysql

If the offsets are saved to MySQL, then pv, uv and the offset can be written to MySQL in a single statement, so the results and the offset are committed together and exactly-once semantics can be guaranteed.

var messages: InputDStream[ConsumerRecord[String, String]] = null
if (tpMap.nonEmpty) {
  messages = KafkaUtils.createDirectStream[String, String](
    ssc
    , LocationStrategies.PreferConsistent
    , ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, tpMap.toMap)
  )
} else {
  messages = KafkaUtils.createDirectStream[String, String](
    ssc
    , LocationStrategies.PreferConsistent
    , ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
  )
}

messages.foreachRDD(rdd => {
  ....
})

Read the offsets from MySQL and parse them:

/**
  * Query the offsets from MySQL
  *
  * @param tbName
  * @return
  */
def getLastOffsets(tbName: String): mutable.HashMap[TopicPartition, Long] = {
  val sql = s"select offset from ${tbName} where id = (select max(id) from ${tbName})"
  val conn = MysqlPool.getConnection(config)
  val psts = conn.prepareStatement(sql)
  val res = psts.executeQuery()
  var tpMap: mutable.HashMap[TopicPartition, Long] = mutable.HashMap[TopicPartition, Long]()
  while (res.next()) {
    val o = res.getString(1)
    val jSONArray = JSON.parseArray(o)
    jSONArray.toArray().foreach(offset => {
      val json = JSON.parseObject(offset.toString)
      val topicAndPartition = new TopicPartition(json.getString("topic"), json.getInteger("partition"))
      tpMap.put(topicAndPartition, json.getLong("untilOffset"))
    })
  }
  MysqlPool.closeCon(res, psts, conn)
  tpMap
}
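The writing side is not shown in the source. As a sketch, the offsets could be serialized into the JSON array that getLastOffsets expects (objects with topic, partition and untilOffset fields). Everything here is an assumption made for illustration: the Offset case class is a stand-in for Spark's OffsetRange, and the strings are built by hand, which is only safe because Kafka topic names are limited to ASCII letters, digits, '.', '_' and '-'.

```scala
object OffsetJson {
  // Hypothetical stand-in for Spark's OffsetRange, reduced to the three
  // fields that getLastOffsets reads back.
  final case class Offset(topic: String, partition: Int, untilOffset: Long)

  // Render the offsets as a JSON array of objects. No escaping is done,
  // on the assumption that topic names contain no quotes or backslashes.
  def toJson(offsets: Seq[Offset]): String =
    offsets.map { o =>
      s"""{"topic":"${o.topic}","partition":${o.partition},"untilOffset":${o.untilOffset}}"""
    }.mkString("[", ",", "]")
}
```

In a real job the offsets of each batch are available from the RDD through the HasOffsetRanges interface of the Kafka integration, and the resulting string would be written to the offset column in the same transaction as the pv and uv rows.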

6) Log

Logging uses log4j2. Logs are kept locally, and ERROR-level logs are also sent by email to a mobile phone. If there are too many errors the inbox gets flooded, so this needs some care.

val logger = LogManager.getLogger(HelperHandle.getClass.getSimpleName)
// Logger for ERROR-level email alerts
val logger2 = LogManager.getLogger("email")
