![LearnAI Header](https://coursematerial.blob.core.windows.net/assets/LearnAI_header.png)

# Applying a pipeline to structured streaming data

## Overview (see also [Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html))

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write-Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
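
To make the "incremental" idea concrete, here is a conceptual sketch in plain Python. It is not how Spark is implemented, and the function and variable names are illustrative. A running count that is updated one micro-batch at a time ends up identical to a batch count over all of the data, which is why the same query can describe both.

```python
from collections import Counter
from typing import Iterable, List, Tuple

Row = Tuple[int, float]  # (label, prediction)


def batch_count(rows: Iterable[Row]) -> Counter:
    """Batch view: count every (label, prediction) pair in one pass over static data."""
    return Counter(rows)


def incremental_count(micro_batches: Iterable[List[Row]]) -> Counter:
    """Streaming view: keep a running result table and fold in each micro-batch as it arrives."""
    state = Counter()
    for batch in micro_batches:
        state.update(batch)  # only the new rows are processed; earlier counts are kept as state
    return state


data = [(0, 0.0), (1, 1.0), (1, 1.0), (0, 1.0), (1, 0.0), (1, 1.0)]
micro_batches = [data[0:2], data[2:5], data[5:]]

print(incremental_count(micro_batches) == batch_count(data))  # True
```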

Internally, by default, Structured Streaming queries are processed using a micro-batch processing engine. This engine processes data streams as a series of small batch jobs and achieves end-to-end latencies as low as 100 milliseconds with exactly-once fault-tolerance guarantees. Since Spark 2.3, Spark also offers an experimental low-latency mode called Continuous Processing, which can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees. You choose the mode based on your application requirements without rewriting the Dataset/DataFrame operations in your query. However, continuous mode supports only map-like operations such as `select` and `filter`, not aggregations like the one in this lab.

## Load previously saved model

Let's load the pipeline model we saved earlier and apply it to some streaming data!

```python
from pyspark.ml.pipeline import PipelineModel

# Path of the fitted pipeline saved earlier
fileName = "my_pipeline"
pipelineModel = PipelineModel.load(fileName)
```

## Initiate Data Stream

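Here we simulate streaming data by reading the DataFrame saved in the previous lab (Parquet files under `dbfs:/gwDF`) and serving it to our pipeline as a stream. Setting `maxFilesPerTrigger` to 1 makes each micro-batch pick up a single new file, so the data arrives gradually instead of all at once.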

**Note**: You must specify a schema when creating a streaming source DataFrame from files. Why?

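```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# Schema of the Parquet files written in the previous lab
schema = StructType([
  StructField("tweet", StringType()),
  StructField("existence", IntegerType()),
  StructField("confidence", FloatType())])

streamingData = (spark
                 .readStream
                 .schema(schema)
                 .option("maxFilesPerTrigger", 1)  # one file per micro-batch simulates data arriving over time
                 .parquet("dbfs:/gwDF"))
```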
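
Conceptually, `maxFilesPerTrigger` caps how many new files each micro-batch takes from the source directory. The plain-Python sketch below shows the effect: with a cap of 1, a directory of N files is consumed over N micro-batches, which makes a static dataset look like a stream. It is a simplified model rather than Spark's code, and the file names are made up. Spark orders files by modification time; ordering by name here is an assumption for simplicity.

```python
from typing import List


def plan_micro_batches(files: List[str], max_files_per_trigger: int) -> List[List[str]]:
    """Group not-yet-processed files into micro-batches of at most `max_files_per_trigger` files."""
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be >= 1")
    pending = sorted(files)  # simplification (assumption): process files in name order
    return [pending[i:i + max_files_per_trigger]
            for i in range(0, len(pending), max_files_per_trigger)]


files = ["part-00000.parquet", "part-00001.parquet", "part-00002.parquet"]
print(plan_micro_batches(files, 1))  # [['part-00000.parquet'], ['part-00001.parquet'], ['part-00002.parquet']]
print(plan_micro_batches(files, 2))  # [['part-00000.parquet', 'part-00001.parquet'], ['part-00002.parquet']]
```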

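Now we use our `pipelineModel` to transform the `streamingData`, then count each combination of true label (`existence`) and predicted label (`prediction`). The result, `stream`, is a confusion matrix for evaluating the model's performance. Its counts update as each new file arrives.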

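```python
stream = (pipelineModel
          .transform(streamingData)           # same fitted pipeline as in batch, now on a streaming DataFrame
          .groupBy("existence", "prediction")
          .count()                            # running count per (label, prediction) pair
          .sort("existence", "prediction"))   # sorting a stream is allowed only after an aggregation

display(stream)
```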

| existence | prediction | count |
|---|---|---|
| 0 | 0.0 | 890 |
| 0 | 1.0 | 185 |
| 1 | 0.0 | 58 |
| 1 | 1.0 | 2997 |
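
The counts in the confusion matrix can be turned into summary metrics. The plain-Python sketch below copies the final counts shown above and treats `1` as the positive class (an assumption about which class matters). While the stream is running, these values change as more data arrives.

```python
# Counts copied from the confusion matrix above: (existence, prediction) -> count
counts = {(0, 0.0): 890, (0, 1.0): 185, (1, 0.0): 58, (1, 1.0): 2997}

tn, fp = counts[(0, 0.0)], counts[(0, 1.0)]
fn, tp = counts[(1, 0.0)], counts[(1, 1.0)]

accuracy = (tp + tn) / (tp + tn + fp + fn)
precision = tp / (tp + fp)  # of the tweets predicted 1, the share truly labeled 1
recall = tp / (tp + fn)     # of the tweets labeled 1, the share predicted 1

print(f"accuracy={accuracy:.3f} precision={precision:.3f} recall={recall:.3f}")
# accuracy=0.941 precision=0.942 recall=0.981
```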


## Optimization

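Why is this stream taking so long? Each micro-batch shuffles data for the `groupBy` and the `sort`, and Spark SQL defaults to 200 shuffle partitions. As a result, every trigger launches hundreds of tiny tasks for only a few thousand rows. Let's check that setting and lower it.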

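```python
# Number of partitions Spark SQL uses for shuffles (aggregations, joins, sorts); Spark's default is "200"
spark.conf.get("spark.sql.shuffle.partitions")
```

```python
# A few partitions are plenty for a few thousand rows on a small cluster
spark.conf.set("spark.sql.shuffle.partitions", "8")
```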

> See this [post](https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/sparksqlshufflepartitions_draft.html) for a detailed look at how to estimate the size of your data and choose the right number of partitions.
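
As a rough illustration of that kind of estimate, here is a hypothetical rule of thumb in plain Python. It is not taken from the linked post. The 128 MB target partition size and the rounding up to a multiple of the cluster's cores are both assumptions, so treat the result as a starting point for tuning.

```python
import math


def suggest_shuffle_partitions(shuffle_bytes: int, total_cores: int,
                               target_partition_bytes: int = 128 * 1024 * 1024) -> int:
    """Hypothetical rule of thumb: enough partitions to keep each near the target size,
    rounded up to a whole multiple of the core count so each wave of tasks uses every core."""
    by_size = max(1, math.ceil(shuffle_bytes / target_partition_bytes))
    return math.ceil(by_size / total_cores) * total_cores


# A tiny aggregation on an 8-core cluster: size suggests 1 partition, rounded up to 8.
print(suggest_shuffle_partitions(shuffle_bytes=2 * 1024 * 1024, total_cores=8))  # 8
# About 50 GiB of shuffle data on 16 cores: 400 partitions by size, already a multiple of 16.
print(suggest_shuffle_partitions(shuffle_bytes=50 * 1024**3, total_cores=16))  # 400
```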

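Let's try this again. The new value applies to queries started after the change, so we rebuild and restart the stream. (A query that restarts from an existing checkpoint keeps the partition count recorded in that checkpoint.)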

```python
stream = (pipelineModel
          .transform(streamingData)
          .groupBy("existence", "prediction")
          .count()
          .sort("existence", "prediction"))

display(stream)
```

| existence | prediction | count |
|---|---|---|
| 0 | 0.0 | 890 |
| 0 | 1.0 | 185 |
| 1 | 0.0 | 58 |
| 1 | 1.0 | 2997 |


## Save the output

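We can write the output of the processed stream to a sink. Here we use the memory sink, which keeps the result in an in-memory table, named by `queryName`, that we can query with SQL. The memory sink is meant for debugging small outputs, because the whole table is held in the driver's memory.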

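```python
# Name of the in-memory table that will hold the results; use a unique name (for example, your username)
streamingView = "username"
# Directory where Spark records the query's progress (offsets and state)
checkpointFile = "checkPoint"
dbutils.fs.rm(checkpointFile, True)  # clear out the checkpoint directory so the query starts fresh

query = (stream
         .writeStream
         .format("memory")                              # keep results as an in-memory table
         .option("checkpointLocation", checkpointFile)
         .outputMode("complete")                        # re-emit the full, sorted result table on every trigger
         .queryName(streamingView)                      # table name to query with SQL
         .start())
```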
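
The query above uses `outputMode("complete")`, which rewrites the entire result table on every trigger. This is also the only mode Spark allows when a streaming aggregation is followed by `sort`. `append` mode is not an option for this running count, because the counts keep changing and there is no event-time watermark to finalize them. The plain-Python model below contrasts `complete` with `update` mode, which emits only the rows whose values changed in that trigger. The names are illustrative, and this is not how Spark implements the modes internally.

```python
from collections import Counter
from typing import Dict, Hashable, List


def run_trigger(state: Counter, batch: List[Hashable], mode: str) -> Dict[Hashable, int]:
    """Fold one micro-batch into the running counts and return the rows the sink receives."""
    before = dict(state)
    state.update(batch)
    if mode == "complete":
        return dict(state)  # the whole result table, every trigger
    if mode == "update":
        return {k: v for k, v in state.items() if before.get(k) != v}  # only rows that changed
    raise ValueError(f"unsupported mode: {mode}")


complete_state, update_state = Counter(), Counter()
batches = [[(0, 0.0), (1, 1.0)], [(1, 1.0)]]
for batch in batches:
    full = run_trigger(complete_state, batch, "complete")
    changed = run_trigger(update_state, batch, "update")

print(full)     # {(0, 0.0): 1, (1, 1.0): 2}
print(changed)  # {(1, 1.0): 2}
```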

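```python
# The table fills in as micro-batches complete; rerun this cell to see the latest counts
display(spark.sql("select * from " + streamingView))
```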

Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.