The updateStateByKey operator
Requirement: count the cumulative number of occurrences of each word seen so far (the state from previous batches must be kept).

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Stateful word count with Spark Streaming
 */
object StatefulWordCount {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("StatefulWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // A checkpoint directory is mandatory when using stateful operators.
    // In production, point the checkpoint at a directory on HDFS.
    ssc.checkpoint(".")

    val lines = ssc.socketTextStream("192.168.0.230", 6789)

    val result = lines.flatMap(_.split(" ")).map((_, 1))
    val state = result.updateStateByKey[Int](updateFunction _)

    state.print()

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Merge the data of the current batch into the existing (old) state.
   * @param currentValues values for this key in the current batch
   * @param preValues     previously accumulated value for this key
   * @return the new accumulated value
   */
  def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
    val current = currentValues.sum
    val pre = preValues.getOrElse(0)
    Some(current + pre)
  }
}
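As the comment notes, the local checkpoint directory (".") is only suitable for local testing. A minimal sketch of the production setting, assuming a hypothetical HDFS namenode address and path:

// Hypothetical namenode address and checkpoint path -- adjust to your cluster.
ssc.checkpoint("hdfs://namenode:8020/spark/checkpoints/stateful-wordcount")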
Counting the cumulative number of words seen so far and writing the result to MySQL
import java.sql.DriverManager

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Word count with Spark Streaming, writing the results to a MySQL database
 */
object ForeachRDDApp {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("192.168.0.230", 6789)

    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // result.print()  // this only prints the counts to the console

    // TODO... write the results to MySQL
    // The naive version below does not work: the connection is created at the driver,
    // but rdd.foreach runs on the executors and the connection is not serializable.
    // result.foreachRDD(rdd => {
    //   val connection = createConnection()  // executed at the driver
    //   rdd.foreach { record =>
    //     val sql = "insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"
    //     connection.createStatement().execute(sql)
    //   }
    // })

    result.print()

    result.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        // One connection per partition, created on the executor side
        val connection = createConnection()
        partitionOfRecords.foreach(record => {
          val sql = "insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"
          connection.createStatement().execute(sql)
        })
        connection.close()
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Get a MySQL connection
   */
  def createConnection() = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://192.168.0.230:3306/imooc_spark", "root", "root")
  }
}
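Note that the insert statement above builds SQL by string concatenation and appends a new row for every word in every batch. A sketch of an alternative body for foreachPartition using a PreparedStatement with an upsert; it assumes (not stated in the original) that the wordcount table has a primary key on word, so counts accumulate across batches:

// Sketch only: assumes `word` is the primary key of the wordcount table.
val upsert = "insert into wordcount(word, wordcount) values(?, ?) " +
  "on duplicate key update wordcount = wordcount + values(wordcount)"
val stmt = connection.prepareStatement(upsert)
partitionOfRecords.foreach { record =>
  stmt.setString(1, record._1)
  stmt.setInt(2, record._2)
  stmt.executeUpdate()
}
stmt.close()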
window: run a computation over a time range of data, at a fixed interval
window length: the duration of the window
sliding interval: the interval at which the window operation is performed; both parameters must be multiples of the batch interval
How often do we compute, and over what range of data? For example, every 10 seconds compute the word count over the previous 10 minutes ==> every sliding interval, compute over the last window length of data
val windowedWordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
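A minimal runnable sketch putting the line above into context, assuming the same socket source as the other examples (the object name, host and durations are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowWordCount {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("WindowWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val pairs = ssc.socketTextStream("192.168.0.230", 6789)
      .flatMap(_.split(" "))
      .map((_, 1))

    // Every 10 seconds (sliding interval), sum the counts over the last 30 seconds (window length).
    // Both durations must be multiples of the 5-second batch interval.
    val windowedWordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
    windowedWordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}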
Blacklist filtering

Requirement: filter out access-log records whose user is on a blacklist.

Access log ==> DStream
201808085,ml 201892505,ml 201581512,zg
==> (ml, "201808085,ml") (ml, "201892505,ml") (zg, "201581512,zg")

Blacklist ==> RDD
ml
==> (ml, true)

Expected output after filtering ==> 201581512,zg

leftjoin
(ml, (<201808085,ml>, <true>)) (ml, (<201892505,ml>, <true>)) (zg, (<201581512,zg>, <false>)) ==> keep only the records that did not join to true

(In the code below the sample blacklist is zs and ls rather than ml, but the mechanics are identical.)

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Blacklist filtering
 */
object TransformApp {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

    /**
     * Creating a StreamingContext takes two parameters: a SparkConf and the batch interval
     */
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    /**
     * Build the blacklist
     */
    val blacks = List("zs", "ls")
    val blacksRDD = ssc.sparkContext.parallelize(blacks).map(x => (x, true))

    val lines = ssc.socketTextStream("192.168.0.230", 6789)
    val clicklog = lines.map(x => (x.split(",")(1), x)).transform(rdd => {
      rdd.leftOuterJoin(blacksRDD)
        .filter(x => x._2._2.getOrElse(false) != true)
        .map(x => x._2._1)
    })

    clicklog.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
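transform is what makes this join possible: it exposes each batch of the DStream as a plain RDD, so any RDD-to-RDD operation (here a leftOuterJoin against the static blacksRDD) can be applied. Records whose join value is Some(true) are blacklisted and dropped; everything else is kept and mapped back to the original log line.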
Integrating Spark Streaming with Spark SQL for word counting
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

/**
 * Word count with Spark Streaming integrated with Spark SQL
 */
object SqlNetworkWordCount {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("localhost", 6789)
    val words = lines.flatMap(_.split(" "))

    // Convert RDDs of the words DStream to DataFrame and run SQL query
    words.foreachRDD { (rdd: RDD[String], time: Time) =>
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Creates a temporary view using the DataFrame
      wordsDataFrame.createOrReplaceTempView("words")

      // Do word count on table using SQL and print it
      val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    }

    ssc.start()
    ssc.awaitTermination()
  }

  /** Case class for converting RDD to DataFrame */
  case class Record(word: String)

  /** Lazily instantiated singleton instance of SparkSession */
  object SparkSessionSingleton {

    @transient private var instance: SparkSession = _

    def getInstance(sparkConf: SparkConf): SparkSession = {
      if (instance == null) {
        instance = SparkSession
          .builder
          .config(sparkConf)
          .getOrCreate()
      }
      instance
    }
  }
}
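All of the examples above read from a TCP socket via socketTextStream; one common way to exercise them locally is to start a netcat listener on the configured host and port (for example nc -lk 6789) and type words into it while the streaming job is running.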