Project author: anjijava16

Project description:
Spark Structured Streaming
Language: Scala
Repository: git://github.com/anjijava16/Spark_Structured_Streaming.git
Created: 2019-12-23T04:08:50Z
Project page: https://github.com/anjijava16/Spark_Structured_Streaming

License:



Spark Structured Streaming

Source :

  1. Kafka, File Systems (CSV, delimited text, Parquet, ORC, Avro, JSON), Socket

Target:

  1. Kafka, Console, Memory, foreach

IMP: A schema definition is mandatory to process the data.

By default, the incoming payload lands in a single column named value.

Structured Streaming is a stream processing engine built on the Spark SQL engine.

StructuredNetworkWordCount maintains a running word count of text data received from a TCP socket. The DataFrame lines represents an unbounded table containing the streaming text. The table has a single string column named value, and each line of the streaming text becomes a row in the table. The DataFrame is converted to a Dataset of String using .as[String], and flatMap then splits each line into a Dataset of words. Finally, the wordCounts DataFrame is defined by grouping on the word and counting; wordCounts is a streaming DataFrame representing the running word counts.
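A minimal sketch of that word count, following the standard StructuredNetworkWordCount example (the host, port, and console sink are illustrative assumptions):

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()
import spark.implicits._

// Unbounded table of lines: a single string column named "value"
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost") // assumed host
  .option("port", 9999)        // assumed port
  .load()

// Split each line into words, then keep a running count per word
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

// Print the running counts to the console
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
```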

1. Input sources:

i. Read From Socket Stream

```
val socketDF = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

socketDF.isStreaming // Returns true for DataFrames that have streaming sources
socketDF.printSchema
```

ii. Read from Files (CSV, JSON, Parquet, ORC, ...)

```
import org.apache.spark.sql.types.StructType

// Read all the csv files written atomically in a directory
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
  .readStream
  .option("sep", ";")
  .schema(userSchema)        // Specify schema of the csv files
  .csv("/path/to/directory") // Equivalent to format("csv").load("/path/to/directory")
```

```
val outputPathDir = workingDir + "/output.parquet" // A subdirectory for our output
val checkpointPath = workingDir + "/checkpoint"    // A subdirectory for our checkpoint & write-ahead logs
val myStreamName = "lesson02_ss"                   // An arbitrary name for the stream
```

Output Modes

| Mode | Example | Notes |
| --- | --- | --- |
| Complete | dsw.outputMode("complete") | The entire updated Result Table is written to the sink. The individual sink implementation decides how to handle writing the entire table. |
| Append | dsw.outputMode("append") | Only the new rows appended to the Result Table since the last trigger are written to the sink. |
| Update | dsw.outputMode("update") | Only the rows in the Result Table that were updated since the last trigger are written to the sink. Available since Spark 2.1.1. |

In the example below, we write to a Parquet directory, which only supports append mode:
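A sketch of such a query, reusing outputPathDir, checkpointPath, and myStreamName from above (streamingDF stands in for whatever streaming DataFrame is being written and is assumed to exist):

```
// Append-mode write to a Parquet directory; file sinks require a checkpoint location
val streamingQuery = streamingDF
  .writeStream
  .queryName(myStreamName)                      // name the query for monitoring
  .format("parquet")
  .option("checkpointLocation", checkpointPath) // checkpoint & write-ahead logs
  .outputMode("append")                         // file sinks only support append
  .start(outputPathDir)                         // same as .option("path", outputPathDir).start()
```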

Managing Streaming Queries

  1. id: unique identifier of the running query that persists across restarts from checkpoint data
  2. runId: unique id of this run of the query, generated at every start/restart
  3. name: the auto-generated or user-specified name of the query
  4. explain(): print detailed explanations of the query
  5. stop(): stop the query
  6. awaitTermination(): block until the query is terminated, either by stop() or by an error
  7. exception: the exception if the query terminated with an error
  8. recentProgress: an array of the most recent progress updates for this query
  9. lastProgress: the most recent progress update of this streaming query

Output Sinks

DataStreamWriter.format accepts the following values, among others:

| Output Sink | Example | Notes |
| --- | --- | --- |
| File | dsw.format("parquet"), dsw.format("csv")... | Dumps the Result Table to a file. Supports Parquet, JSON, CSV, etc. |
| Kafka | dsw.format("kafka") | Writes the output to one or more topics in Kafka |
| Console | dsw.format("console") | Prints data to the console (useful for debugging) |
| Memory | dsw.format("memory") | Updates an in-memory table, which can be queried through Spark SQL or the DataFrame API |
| foreach | dsw.foreach(writer: ForeachWriter) | The "escape hatch", allowing you to write your own type of sink |
| Delta | dsw.format("delta") | A proprietary sink |
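For the foreach escape hatch, a minimal sketch of a custom ForeachWriter is shown below; the println target is a stand-in and streamingDF is an assumed streaming DataFrame. A real writer would usually open a connection in open() and release it in close().

```
import org.apache.spark.sql.{ForeachWriter, Row}

// Toy sink: prints every row it receives
val printWriter = new ForeachWriter[Row] {
  override def open(partitionId: Long, epochId: Long): Boolean = true // accept every partition/epoch
  override def process(row: Row): Unit = println(row)                 // write one row
  override def close(errorOrNull: Throwable): Unit = ()               // nothing to release
}

val query = streamingDF.writeStream
  .foreach(printWriter)
  .start()
```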

iii. Read From Kafka Source (Kafka Server)

```
// Subscribe to 1 topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
```

Note: each row read from the Kafka source is a stream record with the following schema:

| Column | Type |
| --- | --- |
| key | binary |
| value | binary |
| topic | string |
| partition | int |
| offset | long |
| timestamp | long |
| timestampType | int |

```
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
```

Here key and value are cast from binary to string and exposed as columns of the stream record.
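Because the payload arrives as a single value column, a schema is typically applied with from_json to obtain typed columns. A minimal sketch, assuming a hypothetical JSON payload with name and age fields:

```
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Hypothetical schema of the JSON carried in the Kafka value column
val payloadSchema = new StructType()
  .add("name", StringType)
  .add("age", IntegerType)

val parsedDF = df
  .selectExpr("CAST(value AS STRING) AS json")
  .select(from_json(col("json"), payloadSchema).alias("parsed"))
  .select("parsed.*") // name and age become top-level columns
```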

3. The output can be written in one of three modes:

Complete Mode - The entire Result Table is written.
Append Mode - Only newly appended rows are written (assumes existing rows do not change).
Update Mode - Only rows in the Result Table that were updated since the last trigger are written.

4. Selection, Projection, Aggregation

```
case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)

val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData] // streaming Dataset with IOT device data

// Select the devices which have signal more than 10
df.select("device").where("signal > 10") // using untyped APIs
ds.filter(_.signal > 10).map(_.device)   // using typed APIs

// Running count of the number of updates for each device type
df.groupBy("deviceType").count() // using untyped API

// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal)) // using typed API
```

5. Window operations on event time

We want a 10-minute window that reports every 5 minutes:

```
import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()
```
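Since words above is elided, here is a self-contained sketch of the same windowing pattern using the built-in rate source (which emits timestamp and value columns); the row rate and console sink are illustrative assumptions:

```
import org.apache.spark.sql.functions.window
import spark.implicits._

// The rate source generates rows of (timestamp: Timestamp, value: Long)
val rateDF = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5") // assumed rate
  .load()

// 10-minute windows sliding every 5 minutes, counting rows per window
val windowedRateCounts = rateDF.groupBy(
  window($"timestamp", "10 minutes", "5 minutes")
).count()

windowedRateCounts.writeStream
  .outputMode("complete")
  .format("console")
  .option("truncate", false)
  .start()
```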

6. Handling Late Data and Watermarking

Late-arriving data can still be aggregated into the correct window. In update mode, rows in the Result Table can be updated; to bound the amount of intermediate in-memory state that must be maintained, a watermark is defined as a threshold on how late data may arrive.

```
import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window($"timestamp", "10 minutes", "5 minutes"),
    $"word")
  .count()
```

Some sinks (such as files) do not support the updates that update mode requires. In append mode, new rows are not written until the watermark period has passed, so late-arriving data can still be accounted for.

7. Join operations

Static DataFrames can be joined with streaming DataFrames:

```
val staticDf = spark.read. ...
val streamingDf = spark.readStream. ...

streamingDf.join(staticDf, "type")                // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer")  // left outer join with a static DF (right outer is not supported in this direction)
```
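A more concrete sketch of a stream-static join, assuming a hypothetical static lookup table and the built-in rate source as the streaming side:

```
import org.apache.spark.sql.functions.expr
import spark.implicits._

// Hypothetical static lookup table: maps a numeric key to a label
val staticLookupDf = Seq((0L, "even"), (1L, "odd")).toDF("key", "label")

// Streaming side: the rate source, keyed by value % 2
val streamingRateDf = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "1")
  .load()
  .withColumn("key", expr("value % 2"))

// Inner equi-join on the shared "key" column enriches each streaming row
val enriched = streamingRateDf.join(staticLookupDf, "key")

enriched.writeStream.format("console").start()
```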

8. Streaming Deduplication

To filter duplicate records:

```
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...

// Without watermark, using the guid column
streamingDf.dropDuplicates("guid")

// With watermark, using the guid and eventTime columns
streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("guid", "eventTime")
```

9. Sinks:

i. File-based

```
writeStream
  .format("parquet") // can be "orc", "json", "csv", etc.
  .option("path", "path/to/destination/dir")
  .start()
```

ii. For debugging

```
// Write to console
writeStream
  .format("console")
  .start()
```

iii. To memory

```
writeStream
  .format("memory")
  .queryName("tableName")
  .start()
```
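The in-memory table can then be queried by the name given in queryName, for example:

```
// Interactive query against the in-memory sink (table name comes from queryName above)
spark.sql("SELECT * FROM tableName").show()
```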

Asynchronous offset commits (commitAsync) in Spark Streaming:

https://stackoverflow.com/questions/49375384/spark-kafka-streaming-multi-partition-commitasync-issue
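The linked question concerns the older DStream-based Kafka integration rather than Structured Streaming. A minimal sketch of committing offsets asynchronously after each batch with the spark-streaming-kafka-0-10 API, where ssc is an existing StreamingContext and the broker, group id, topic, and per-record processing are placeholders:

```
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "host1:9092",              // placeholder broker
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",                    // placeholder consumer group
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("topic1"), kafkaParams))

stream.foreachRDD { rdd =>
  // Capture the offset ranges for this batch before any shuffle or repartition
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  rdd.foreach(record => println(record.value))      // placeholder per-record processing

  // Commit only after the batch has been processed; the commit is asynchronous
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```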

iv. Write into a Kafka Topic:

```
// streamctl is an existing StructType describing the JSON payload
spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "kafkaTopic")
  .option("startingOffsets", "latest")
  .load()
  .selectExpr("CAST(value AS STRING) AS json", "timestamp")
  .select(from_json(col("json"), streamctl).alias("parsed"))
  .select("parsed.*")
  .withColumn("results", explode($"results"))
  .select("results.user.username")
  .withColumn("value", regexp_replace(col("username"), "([0-9])", ""))
  .select("value")
  .writeStream
  .option("checkpointLocation", "D:/dev_path/checkpoint")
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("topic", "spark-report-topic")
  .start()
  .awaitTermination()
```

Write into foreachBatch (MongoDB)

```
ageAverage
  .writeStream
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .outputMode("complete")
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    val df = batchDf.withColumn("batchId", lit(batchId))
    df.printSchema()
    df.write.format("mongo").mode(SaveMode.Append)
      .option("uri", MongoDBConstants.spark_mongodb_output_uri)
      .option("database", MongoDBConstants.mongodb_database)
      .option("collection", MongoDBConstants.mongodb_collection_tbl)
      .save()
    df.show(20, false)
  }
  .start()
```

Write into foreachBatch (MySQL DB)

```
ageAverage
  .writeStream
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .outputMode("complete")
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    val df = batchDf.withColumn("batchId", lit(batchId))
    df.printSchema()
    df.write.mode(SaveMode.Append).jdbc(url, "meetup_rsvp_tbl", prop)
    df.show(20, false)
  }
  .start()
```
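The url and prop values used above are assumed to be defined elsewhere in the project; a sketch of what they might look like for a local MySQL instance (host, database, and credentials are placeholders):

```
import java.util.Properties

// Placeholder JDBC connection details for the sink above
val url = "jdbc:mysql://localhost:3306/meetup_db"
val prop = new Properties()
prop.setProperty("user", "spark_user")
prop.setProperty("password", "spark_password")
prop.setProperty("driver", "com.mysql.cj.jdbc.Driver")
```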

10. Managing Streaming Queries

```
val query = df.writeStream.format("console").start() // get the query object

query.id                 // unique identifier of the running query that persists across restarts from checkpoint data
query.runId              // unique id of this run of the query, generated at every start/restart
query.name               // the auto-generated or user-specified name of the query
query.explain()          // print detailed explanations of the query
query.stop()             // stop the query
query.awaitTermination() // block until the query is terminated, by stop() or by an error
query.exception          // the exception if the query has been terminated with an error
query.recentProgress     // an array of the most recent progress updates for this query
query.lastProgress       // the most recent progress update of this streaming query
```

11. Asynchronous API

Query listener:

```
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._ // QueryStartedEvent, QueryProgressEvent, QueryTerminatedEvent

val spark: SparkSession = ...

spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
    println("Query started: " + queryStarted.id)
  }
  override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
    println("Query terminated: " + queryTerminated.id)
  }
  override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
    println("Query made progress: " + queryProgress.progress)
  }
})
```

12. Recovering from Failures with Checkpointing

```
aggDF
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start()
```