Advanced Spark Streaming
Published: 2019-05-24


  • Stateful operator: updateStateByKey
  • Hands-on: counting the cumulative word occurrences so far and writing them to MySQL
  • Window-based statistics
  • Hands-on: blacklist filtering
  • Hands-on: integrating Spark Streaming with Spark SQL

The updateStateByKey operator

Requirement: count the cumulative number of occurrences of each word so far (the previous state must be kept across batches).

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Stateful word count with Spark Streaming.
 */
object StatefulWordCount {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("StatefulWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // A checkpoint directory is required when using stateful operators.
    // In production, point the checkpoint at a directory on HDFS.
    ssc.checkpoint(".")

    val lines = ssc.socketTextStream("192.168.0.230", 6789)
    val result = lines.flatMap(_.split(" ")).map((_, 1))
    val state = result.updateStateByKey[Int](updateFunction _)
    state.print()

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Merge the values of the current batch into the existing (old) state.
   *
   * @param currentValues values for this key in the current batch
   * @param preValues     previously accumulated state for this key
   * @return              the new state
   */
  def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
    val current = currentValues.sum
    val pre = preValues.getOrElse(0)
    Some(current + pre)
  }
}
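
As a quick sanity check of the update semantics (outside of any streaming job, assuming the object above is on the classpath): the new state is simply the sum of the current batch's values plus whatever was accumulated before.

// batch 1: a word appears 3 times, no previous state yet
StatefulWordCount.updateFunction(Seq(1, 1, 1), None)   // => Some(3)
// batch 2: the same word appears 2 more times, previous state was 3
StatefulWordCount.updateFunction(Seq(1, 1), Some(3))   // => Some(5)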

Writing the cumulative word counts to MySQL

  • Compute the word counts with Spark Streaming
  • Write the Spark Streaming results to MySQL
import java.sql.DriverManager

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Word count with Spark Streaming, writing the results to MySQL.
 */
object ForeachRDDApp {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("192.168.0.230", 6789)
    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // result.print()  // this would only print the counts to the console

    // TODO... write the results to MySQL.
    // The naive version below does NOT work: the connection is created at the
    // driver but used inside rdd.foreach, which runs on the executors, so the
    // (non-serializable) connection would have to be shipped over the network.
    // result.foreachRDD(rdd => {
    //   val connection = createConnection()  // executed at the driver
    //   rdd.foreach { record =>
    //     val sql = "insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"
    //     connection.createStatement().execute(sql)
    //   }
    // })

    result.print()

    // Correct approach: create one connection per partition, on the executors.
    result.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        val connection = createConnection()
        partitionOfRecords.foreach(record => {
          val sql = "insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"
          connection.createStatement().execute(sql)
        })
        connection.close()
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Obtain a MySQL connection.
   */
  def createConnection() = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://192.168.0.230:3306/imooc_spark", "root", "root")
  }
}
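
Concatenating values into the SQL string is fine for a demo, but it breaks on words containing quotes and re-parses the statement for every record. A minimal sketch of the same per-partition write using a PreparedStatement (it assumes the result DStream, the createConnection helper, and the wordcount table from the code above) could look like this:

result.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
    val connection = createConnection()
    // one prepared statement per partition, reused for every record
    val pstmt = connection.prepareStatement("insert into wordcount(word, wordcount) values(?, ?)")
    partitionOfRecords.foreach(record => {
      pstmt.setString(1, record._1)
      pstmt.setInt(2, record._2)
      pstmt.executeUpdate()
    })
    pstmt.close()
    connection.close()
  })
})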

Window-based statistics

Window operations: at a fixed interval, process the data that falls within a given time range.

window length: the duration of the window.

sliding interval: the interval at which the window operation is performed.

Both parameters must be multiples of the DStream's batch interval.

"How often do we compute, over how much data?" For example: every 10 seconds, compute the word count over the previous 10 minutes.

==> every sliding interval, aggregate the data of the previous window length. For example:

val windowedWordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
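
A minimal sketch of how this line fits into a complete job (host, port, and durations are placeholders; with a 5-second batch interval, both Seconds(30) and Seconds(10) are valid multiples):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowWordCount {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("WindowWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))   // batch interval: 5 seconds

    val lines = ssc.socketTextStream("192.168.0.230", 6789)
    val pairs = lines.flatMap(_.split(" ")).map((_, 1))

    // every 10 seconds, count the words received during the last 30 seconds
    val windowedWordCounts =
      pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
    windowedWordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}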


Blacklist filtering

  • Using the transform operator
  • Combining a DStream with an RDD in Spark Streaming

Requirement:

Access log ==> DStream

201808085,ml
201892505,ml
201581512,zg

==> (ml, "201808085,ml")
    (ml, "201892505,ml")
    (zg, "201581512,zg")

Blacklist ==> RDD

ml

==> (ml, true)

left outer join:

(ml, (<201808085,ml>, <true>))    filtered out
(ml, (<201892505,ml>, <true>))    filtered out
(zg, (<201581512,zg>, <false>))   kept

Expected output ==> 201581512,zg

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Blacklist filtering.
 */
object TransformApp {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

    // A StreamingContext takes two parameters: a SparkConf and the batch interval.
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Build the blacklist as an RDD of (name, true) pairs.
    val blacks = List("zs", "ls")
    val blacksRDD = ssc.sparkContext.parallelize(blacks).map(x => (x, true))

    val lines = ssc.socketTextStream("192.168.0.230", 6789)

    // Key each log line by its user field, join it against the blacklist RDD,
    // and keep only the records whose user is not blacklisted.
    val clicklog = lines.map(x => (x.split(",")(1), x)).transform(rdd => {
      rdd.leftOuterJoin(blacksRDD)
        .filter(x => x._2._2.getOrElse(false) != true)
        .map(x => x._2._1)
    })

    clicklog.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
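
The join-and-filter logic inside transform can be checked on a plain RDD, outside of any streaming job. A minimal sketch with made-up log lines in the same "id,user" format, using the same zs/ls blacklist as the code above:

import org.apache.spark.{SparkConf, SparkContext}

object TransformLogicCheck {

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("TransformLogicCheck"))

    val blacksRDD = sc.parallelize(List("zs", "ls")).map(x => (x, true))
    // made-up access-log lines in the "id,user" format used above
    val logs = sc.parallelize(List("20180808,zs", "20180808,ls", "20180808,ww"))

    val survivors = logs.map(x => (x.split(",")(1), x))
      .leftOuterJoin(blacksRDD)
      .filter(x => !x._2._2.getOrElse(false))
      .map(x => x._2._1)

    survivors.collect().foreach(println)   // prints only "20180808,ww"
    sc.stop()
  }
}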

Integrating Spark Streaming with Spark SQL for word count

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

/**
 * Word count combining Spark Streaming with Spark SQL.
 */
object SqlNetworkWordCount {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("localhost", 6789)
    val words = lines.flatMap(_.split(" "))

    // Convert the RDDs of the words DStream to DataFrames and run a SQL query
    words.foreachRDD { (rdd: RDD[String], time: Time) =>
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Create a temporary view using the DataFrame
      wordsDataFrame.createOrReplaceTempView("words")

      // Do the word count on the view using SQL and print the result
      val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    }

    ssc.start()
    ssc.awaitTermination()
  }

  /** Case class for converting an RDD to a DataFrame */
  case class Record(word: String)

  /** Lazily instantiated singleton instance of SparkSession */
  object SparkSessionSingleton {

    @transient private var instance: SparkSession = _

    def getInstance(sparkConf: SparkConf): SparkSession = {
      if (instance == null) {
        instance = SparkSession
          .builder
          .config(sparkConf)
          .getOrCreate()
      }
      instance
    }
  }
}
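
The SparkSessionSingleton wrapper avoids rebuilding a session on every micro-batch inside foreachRDD. For comparison, the SQL query above can also be expressed with the DataFrame API; a minimal sketch (the wordCounts helper name is made up, and the input DataFrame is assumed to have a single word column like the one built in foreachRDD above):

import org.apache.spark.sql.DataFrame

// Equivalent of "select word, count(*) as total from words group by word",
// written with the DataFrame API instead of SQL.
def wordCounts(wordsDataFrame: DataFrame): DataFrame =
  wordsDataFrame.groupBy("word").count().withColumnRenamed("count", "total")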

Reposted from: http://vcwki.baihongyu.com/
