![LearnAI Header](https://coursematerial.blob.core.windows.net/assets/LearnAI_header.png)

# Introduction to Azure Databricks

## Use-case

Over the course of this AI Airlift, we will develop an end-to-end (E2E) solution for [predictive maintenance](https://en.wikipedia.org/wiki/Predictive_maintenance) (PdM). For this purpose, we are using a dataset that contains data for 100 different manufacturing machines, collected over the course of one year.
    
One important detail here is that we are dealing with time series data, because data points are collected once an hour. Using time series data offers an opportunity to apply some feature engineering methods unique to this kind of data. Further into the data science process for this use-case, we will see that dealing with time series data also has important implications for machine learning experimentation.

## Learning Goals

This hands-on lab offers an introduction to Azure Databricks (ADB), with a focus on DataFrames. We will

- Learn how to mount your data in Azure Blob storage.
- Understand the relationship between DataFrames, Datasets, and RDDs.
- Use *actions*, such as `show()`, `display()`, and `count()`.
- Use *transformations*, such as `limit()`, `select()`, and `drop()`.
- Understand the difference between `actions` and `transformations`.
- Know how to find documentation on Spark.
- Do some basic data cleansing and description.
- Select and drop columns of your data.
- Convert between SQL and DataFrames.

## Documentation

The [Spark API Documentation](https://spark.apache.org/docs/latest/api/python/index.html) is essential for getting started with Azure Databricks; we will be using it throughout our labs.

## Mount Azure Blob storage in Azure Databricks

For this and most other labs, we stored the data in [Azure Blob storage](https://azure.microsoft.com/en-au/services/storage/blobs/).

There are two things you can take away from how we mount the data:
1. The next cell demonstrates how to run another notebook from this notebook.  This can be very useful for creating [Notebook Workflows](https://docs.databricks.com/user-guide/notebooks/notebook-workflows.html). Here the `mnt_blob` notebook is stored in the subdirectory `includes` in the parent directory to the present notebook.
2. You can learn how to configure a `Shared Access Signature` (SAS) to provide secure access to data stored in Azure Blob storage. See the Databricks [documentation](https://docs.databricks.com/spark/latest/data-sources/azure/azure-storage.html#mount-azure-blob-storage-containers-with-dbfs) for more details.

If you are curious, look at the contents of the notebook `../includes/mnt_blob` to see what happens there.

```
%run "../includes/mnt_blob"
```

## The Data Source


The relevant data sources for predictive maintenance (PdM) include, but are not limited to:
- **Machine operating conditions (telemetry.csv):** data on the equipment's health over time (usually sensor-based and streaming). We will refer to this data as machine telemetry data.
- **Error history (errors.csv):** logs of non-breaking errors that happen throughout a machine's operation, and which parts of the machine they came from.
- **Failure history (failures.csv):** logs of severe errors that broke the machine down (requiring maintenance to operate again), and the parts of the machine that caused them.
- **Maintenance/repair history (maintenance.csv):** what parts were fixed or replaced (as part of scheduled maintenance or due to failure), and when.
- **Machine information (machines.csv):** equipment metadata, i.e. anything we know about the equipment and its parts (such as make, model, etc.). Here, this is the model number of each machine and its age.
    
We can use the magic command `%fs` to list the files in a folder. In general, `%fs` allows you to use `dbutils` filesystem commands. For more information, see [Access DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html#dbfs-dbutils) with `dbutils`.

There are other magic commands for [mixing languages](https://docs.databricks.com/user-guide/notebooks/notebook-use.html#mix-languages) within a notebook.

```
%fs ls /mnt/data/telemetry
```

| path | name | size (bytes) |
|------|------|--------------|
| dbfs:/mnt/data/telemetry/X_test.csv | X_test.csv | 26072742 |
| dbfs:/mnt/data/telemetry/X_train.csv | X_train.csv | 90539541 |
| dbfs:/mnt/data/telemetry/anoms.csv | anoms.csv | 3501773 |
| dbfs:/mnt/data/telemetry/errors.csv | errors.csv | 125671 |
| dbfs:/mnt/data/telemetry/failures.csv | failures.csv | 23500 |
| dbfs:/mnt/data/telemetry/machines.csv | machines.csv | 1376 |
| dbfs:/mnt/data/telemetry/maintenance.csv | maintenance.csv | 101529 |
| dbfs:/mnt/data/telemetry/telemetry.csv | telemetry.csv | 81198917 |
| dbfs:/mnt/data/telemetry/y_test.csv | y_test.csv | 500816 |
| dbfs:/mnt/data/telemetry/y_train.csv | y_train.csv | 1745616 |


## Create a DataFrame

Let's read a CSV file into a `DataFrame`.

* We'll start with the `spark` object, an instance of the [SparkSession](https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/SparkSession.html) class and the entry point to Spark applications.
* From there we can access the `read` object, which gives us an instance of the class [DataFrameReader](https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrameReader.html). This class offers different methods for different types of data, such as `csv`, `json`, and `parquet`; here we use `csv`.

Make sure you understand what the optional arguments to the `csv` method are. Investigate how changes to those arguments affect the output in the following two cells.

Closely look at the output of running the following cell. Can you see the type of object that is returned? What columns are in the data, and what are the data types of each column? Are you satisfied with the encoding of the first column?

```python
telemetry_file = "/mnt/data/telemetry/telemetry.csv"

telemetryDF = (spark  # our SparkSession & Entry Point
  .read               # our DataFrameReader
  .csv(telemetry_file, header=True, inferSchema=True)      # returns an instance of DataFrame
)
```

## Let's take a peek at the data

Let's continue by taking a look at the type of data we have. 

We can do this with the `printSchema()` command.

```python
telemetryDF.printSchema()
```

We can now see that we have six columns of data:
* **datetime** (*string*): The time at which sensor data was collected. (In another notebook, we will ensure that this column is properly encoded as a *timestamp*, rather than a *string*.)
* **machineID** (*integer*): The ID of a machine (from 1 to 100).
* **volt** (*double*): Input voltage to the machine.
* **rotate** (*double*): Rotation produced by the machine.
* **pressure** (*double*): Pressure produced by the machine.
* **vibration** (*double*): Vibration produced by the machine.
  
Next, let's look at the first 1000 rows of data, using `display`.

## Actions and Transformations

At first sight, the distinction between actions and transformations might be confusing.

[Transformations](https://databricks.com/glossary/what-are-transformations) instruct Spark how you would like to modify the DataFrame you have into the one that you want.  The key thing to understand about transformations is that they don't actually do the transformation at the time they're specified. They describe a transformation that will be done. Another thing to understand is that you can "pile up" transformations, one after the other.

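*Actions* are operations that trigger execution: when you call an action, Spark runs all of the pending transformations and returns a result to the driver (or writes output). Actions are often taken after a transformation, or a sequence of transformations, to show the results of those transformations.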

For example `count()` is an action that triggers a job to process the request and return a value: the count of all records in our `DataFrame`.

```python
total = telemetryDF.count()

print("Record Count: {0:,}".format( total ))
```

That tells us that there are around 900K rows in the `DataFrame`.

## Spark API Documentation

You just encountered one method available on the `DataFrame` class, namely `DataFrame.count()`.

Try to find the documentation for `count()`. Hint: There are two ways to find the documentation on this action:
- Go to the online Spark API documentation for [count](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=printSchema#pyspark.sql.DataFrame.count).
- Create a new cell below with the code `help(telemetryDF.count)` and execute the cell.
  
Take a look at the API to see what other operations we have available.

1. Go to the [Spark API Documentation](https://spark.apache.org/docs/latest/api/python/index.html).
1. Look up the documentation for `pyspark.sql.DataFrame`.
   1. In the lower-left-hand corner, type **DataFrame** into the search field.
   1. Hit **[Enter]**.
   1. The search results should appear in the right-hand pane.
   1. Click on **pyspark.sql.DataFrame (Python class, in pyspark.sql module)**.
   1. The documentation should open in the right-hand pane.

## Caching data

Before we take a closer look at the contents of the `DataFrame`, let us introduce a technique that speeds up processing.

The ability to cache data is one technique for achieving better performance with Apache Spark. 

This is because, without caching, every action requires Spark to read the data again from its source (Azure Blob, Amazon S3, HDFS, etc.), whereas caching keeps the data in the memory of the cluster's executors for much faster access by later actions.

>`cache()` is shorthand for `persist()` with the default storage level. `persist()` additionally lets you choose a storage level, for example memory only or disk only.

Let's demonstrate this by calling `cache()` and then running the action `count()` twice. Note that `cache()` is lazy: it only marks the `DataFrame` for caching, and the first `count()` is what actually materializes the cache. The second `count()` should therefore run much faster.

```python
(telemetryDF
  .cache()         # mark the DataFrame as cached
  .count()         # materialize the cache
)
```

If you re-run that command, it should take significantly less time.

```python
telemetryDF.count()
```

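As a quick side note, you can remove a `DataFrame` from the cache by calling its `unpersist()` method. This is not strictly necessary, because Spark automatically evicts cached data (least recently used first) when executors need the memory, but unpersisting explicitly frees those resources as soon as you no longer need them.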

```python
telemetryDF.unpersist()
```

Note, however, that in practice caching data can be suboptimal for very large datasets, and [better approaches](https://docs.databricks.com/delta/delta-cache.html#delta-cache) exist.

## show()

What we want to look for next is a function that will allow us to print the data to the console.

In the API docs for `DataFrame`/`Dataset`, find the docs for the `show()` command.

In the case of Python, we have one method with optional parameters.  
In the case of Scala, we have several overloaded methods.

In either case, the two most commonly used optional parameters of `show()` are:
- `n`: The number of records to print to the console; the default is 20.
- `truncate`: If `True` (the default), strings longer than 20 characters are truncated. In Python you can also pass an integer to truncate to that many characters, or `False` to disable truncation. (Recent versions also accept a `vertical` parameter.)

Let's take a look at the data in our `DataFrame` with the `show()` command:

```python
telemetryDF.show(n=20, truncate=20) # Equivalent to the defaults (n=20, truncate=True)
```

In the cell above, change the parameters of the show command to:
* print only the first five records
* disable truncation
* print only the first ten records and disable truncation

**Note:** The function `show(..)` is an **action** which triggers a job.

## display()

The `show()` command is part of the core Spark API and simply prints the results to the console.

Databricks notebooks offer a slightly more elegant alternative.

Instead of calling `show()` on an existing `DataFrame` we can instead pass our `DataFrame` to the `display()` command:

```python
display(telemetryDF)
```

| datetime | machineID | volt | rotate | pressure | vibration |
|----------|-----------|------|--------|----------|-----------|
| 1/1/2015 6:00:00 AM | 1 | 176.217853015625 | 418.504078221616 | 113.077935462083 | 45.0876857639276 |
| 1/1/2015 7:00:00 AM | 1 | 162.87922289706 | 402.747489565395 | 95.4605253823187 | 43.4139726834815 |
| 1/1/2015 8:00:00 AM | 1 | 170.989902405567 | 527.349825452291 | 75.2379048586662 | 34.1788471214451 |
| 1/1/2015 9:00:00 AM | 1 | 162.462833264092 | 346.149335043074 | 109.248561276504 | 41.1221440884256 |
| 1/1/2015 10:00:00 AM | 1 | 157.61002119306 | 435.376873016938 | 111.886648210168 | 25.9905109982024 |
| 1/1/2015 11:00:00 AM | 1 | 172.504839196295 | 430.323362106675 | 95.9270416939636 | 35.6550173268837 |
| 1/1/2015 12:00:00 PM | 1 | 156.556030606329 | 499.071623068962 | 111.755684290096 | 42.7539196974773 |
| 1/1/2015 1:00:00 PM | 1 | 172.522780814836 | 409.624717000438 | 101.00108276407 | 35.4820086610704 |
| 1/1/2015 2:00:00 PM | 1 | 175.324523915223 | 398.648780707752 | 110.624360548654 | 45.4822868466294 |
| 1/1/2015 3:00:00 PM | 1 | 169.218423246933 | 460.850669930244 | 104.848229967003 | 39.9017354356787 |


### show() vs display(...)
* `show()` is part of core Spark - `display(...)` is specific to our notebooks.
* `show()` is ugly - `display(...)` is pretty.
* `show()` has parameters for truncating both columns and rows - `display(...)` does not.
* `show()` is a function of the `DataFrame`/`Dataset` class - `display(...)` works with a number of different objects.
* `display()` is more powerful - with it, you can...
  * Download the results as CSV
  * Render line charts, bar chart & other graphs, maps and more.
  * See up to 1000 records at a time.
  
For the most part, the difference between the two is going to come down to preference.

Like `DataFrame.show()`, `display(...)` is an **action** which triggers a job.

## Transformations

Both `show()` and `display(...)` are **actions** that trigger jobs (though in slightly different ways).

If you recall, `show()` has a parameter to control how many records are printed, but `display(...)` does not.

We can address that difference with our first *transformation*, `limit()`.

### limit()

If you look at the API docs, `limit()` is described like this:
> Returns a new Dataset by taking the first n rows...

`show()`, like many actions, does not return anything. 

On the other hand, transformations like `limit()` return a **new** `DataFrame`:

```python
limitedDF = telemetryDF.limit(5) # "limit" the number of records to the first 5
```

Notice how "nothing" happened; that is, no job was triggered.

This is because we are simply defining the second step in our sequence of transformations.
  1. Read in the CSV file (represented by **telemetryDF**).
  1. Limit those records to just the first 5 (represented by **limitedDF**).

It's not until we induce an action that a job is triggered and the data is processed.

We can do this with either the `show()` or the `display(...)` actions.

For example, we can `show` the first 100 rows of the DataFrame `limitedDF`, which only has 5 rows.

```python
limitedDF.show(100, False) #show up to 100 records and don't truncate the columns
```

We can use `display` to achieve a similar, but prettier result.

```python
display(limitedDF) # defaults to the first 1000 records
```

| datetime | machineID | volt | rotate | pressure | vibration |
|----------|-----------|------|--------|----------|-----------|
| 1/1/2015 6:00:00 AM | 1 | 176.217853015625 | 418.504078221616 | 113.077935462083 | 45.0876857639276 |
| 1/1/2015 7:00:00 AM | 1 | 162.87922289706 | 402.747489565395 | 95.4605253823187 | 43.4139726834815 |
| 1/1/2015 8:00:00 AM | 1 | 170.989902405567 | 527.349825452291 | 75.2379048586662 | 34.1788471214451 |
| 1/1/2015 9:00:00 AM | 1 | 162.462833264092 | 346.149335043074 | 109.248561276504 | 41.1221440884256 |
| 1/1/2015 10:00:00 AM | 1 | 157.61002119306 | 435.376873016938 | 111.886648210168 | 25.9905109982024 |


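### select()

Our `DataFrame` has six columns, but we may only need a few of them. Let's look at the schema again:

```python
telemetryDF.printSchema()
```

For example, suppose the customer suspects that low voltage (`volt`) causes machines to produce too little `pressure`. (In this data, there is no strong relationship between the two.)

In this case, we could select just those two columns, plus the `datetime` and `machineID` columns that identify each reading, which saves time and space in later steps.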

```python
# Transform the data by selecting only four columns
onlyThreeDF = (telemetryDF
  .select("datetime", "machineID", "volt", "pressure")
)
# Now let's take a look at what the schema looks like
onlyThreeDF.printSchema()
```

Again, notice how the call to `select()` does not trigger a job.

That's because `select()` is a transformation. It's just one more step in a long list of transformations.

Let's go ahead and invoke the action `show()` and take a look at the result.

```python
# And lastly, show the first five records.
onlyThreeDF.show(5, False)
```

The `select()` command is one of the most powerful and most commonly used transformations. 

We will see plenty of other examples of its usage as we progress.

If you look at the API docs, `select()` is described like this:
> Returns a new Dataset by computing the given Column expression for each element.

The "Column expression" referred to there is where the true power of this operation shows up. Again, we will go deeper on these later.

Just like `limit()`, `select()` 
- does not trigger a job
- returns a new `DataFrame`
- simply defines the next transformation in a sequence of transformations.

## drop()

As a side note, you will quickly discover that there are many ways to accomplish the same task.

Take the transformation `drop()` for example - instead of selecting everything we wanted, `drop()` allows us to specify the columns we don't want.

If you look at the API docs, `drop()` is described like this:
> Returns a new Dataset with a column dropped.

And we can see that we can produce the same result as the last exercise this way:

```python
# Transform the data by dropping the two columns we don't need
droppedDF = (telemetryDF
  .drop("rotate", "vibration") # Our second transformation after the initial read (6 columns down to 4)
)
# Now let's take a look at what the schema looks like
droppedDF.printSchema()
```

Again, `drop()` is just one more transformation; that is, no job is triggered.

```python
# And lastly, show the first five records again.
droppedDF.show(5, False)
```

## distinct() & dropDuplicates()

When called without arguments, these two transformations do the same thing. In fact, `distinct()` is simply an alias for `dropDuplicates()`.
* You can see this by looking at the source code for these two methods
* ```def distinct(): Dataset[T] = dropDuplicates()```
* See <a href="https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala" target="_blank">Dataset.scala</a>

The difference between them has everything to do with the programmer and their perspective.
- The name **distinct** will resonate with developers, analysts and DB admins with a background in SQL.
- The name **dropDuplicates** will resonate with developers that have a background or experience in functional programming.

As you become more familiar with the various APIs, you will see this pattern reassert itself.

The designers of the API are trying to make the API as approachable as possible for multiple target audiences.

If you look at the API docs, both `distinct(..)` and `dropDuplicates(..)` are described like this:
> Returns a new Dataset that contains only the unique rows from this Dataset....

With this transformation, we can now tackle our first business question: how many distinct machines are in the telemetry data?

If you recall, our original `DataFrame` has this schema:

```python
telemetryDF.printSchema()
```

The transformation `distinct()` is applied to the row as a whole. In other words, data in all columns will affect this evaluation.

To get the distinct list of machines, and only machines, we need to reduce the number of columns to just the one column, **machineID**. 

We can do this with the `select()` transformation and then we can introduce the `distinct()` transformation.

```python
distinctDF = (telemetryDF     # Our original DataFrame from spark.read.csv()
  .select("machineID")        # Drop all columns except the "machineID" column
  .distinct()                 # Reduce the records to the distinct machine IDs
)
```

Just to reinforce, we have three steps:
1. Read the data (now represented by `telemetryDF`).
1. Select just the one column.
1. Reduce the records to a distinct set.

No job is triggered until we perform an action like `show()`:

```python
# There will not be more than 100 machines
distinctDF.show(100, False)
```

You can count those if you like.

But, it would be easier to ask the `DataFrame` for the `count()`:

```python
total = distinctDF.count()
print("Distinct machine IDs: {0:,}".format( total ))
```

## dropDuplicates()

The method `dropDuplicates()` has a second variant that accepts a list of one or more column names.
* Duplicates are not determined across the entire record, as they are with `distinct()` or with `dropDuplicates()` called without arguments.
* Instead, duplicates are determined based only on the specified columns.
* This allows us to keep all the original columns in our `DataFrame`, with one row for each distinct value (or combination of values) in the specified columns.

## Recap

Our code is spread out over many cells which can make this a little hard to follow.

Let's take a look at the same code in a single cell.

```python
telemetry_file = "/mnt/data/telemetry/telemetry.csv"

telemetryDF = (spark       # Our SparkSession & Entry Point
  .read                    # Our DataFrameReader
  .csv(telemetry_file, header=True, inferSchema=True)           # Returns an instance of DataFrame
)
(telemetryDF               # Only if we are running multiple queries
  .cache()                 # mark the DataFrame as cacheable
  .count()                 # materialize the cache
)
distinctDF = (telemetryDF  # Our original DataFrame from spark.read.csv(..)
  .select("machineID")     # Drop all columns except the "machineID" column
  .distinct()              # Reduce the records to the distinct machine IDs
)
total = distinctDF.count()
print("Distinct machineIDs: {0:,}".format( total ))
```

## DataFrames vs SQL & Temporary Views


This might also be a good time to read up on the history and difference between [RDDs, DataFrames, and Datasets](https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html). 

The `DataFrame` API is built upon a SQL engine.

As such we can "convert" a `DataFrame` into a temporary view (or table) and then use it in "standard" SQL.

Let's start by creating a temporary view from a previous `DataFrame`.

```python
telemetryDF.createOrReplaceTempView("telemetry")
```

Now that we have a temporary view (or table) we can start expressing our queries and transformations in SQL:

```sql
%sql

SELECT * FROM telemetry
```

| datetime | machineID | volt | rotate | pressure | vibration |
|----------|-----------|------|--------|----------|-----------|
| 1/1/2015 6:00:00 AM | 1 | 176.217853015625 | 418.504078221616 | 113.077935462083 | 45.0876857639276 |
| 1/1/2015 7:00:00 AM | 1 | 162.87922289706 | 402.747489565395 | 95.4605253823187 | 43.4139726834815 |
| 1/1/2015 8:00:00 AM | 1 | 170.989902405567 | 527.349825452291 | 75.2379048586662 | 34.1788471214451 |
| 1/1/2015 9:00:00 AM | 1 | 162.462833264092 | 346.149335043074 | 109.248561276504 | 41.1221440884256 |
| 1/1/2015 10:00:00 AM | 1 | 157.61002119306 | 435.376873016938 | 111.886648210168 | 25.9905109982024 |
| 1/1/2015 11:00:00 AM | 1 | 172.504839196295 | 430.323362106675 | 95.9270416939636 | 35.6550173268837 |
| 1/1/2015 12:00:00 PM | 1 | 156.556030606329 | 499.071623068962 | 111.755684290096 | 42.7539196974773 |
| 1/1/2015 1:00:00 PM | 1 | 172.522780814836 | 409.624717000438 | 101.00108276407 | 35.4820086610704 |
| 1/1/2015 2:00:00 PM | 1 | 175.324523915223 | 398.648780707752 | 110.624360548654 | 45.4822868466294 |
| 1/1/2015 3:00:00 PM | 1 | 169.218423246933 | 460.850669930244 | 104.848229967003 | 39.9017354356787 |


And we can just as easily express in SQL the distinct list of machine IDs, and, just because we can, we'll sort that list:

```sql
%sql

SELECT DISTINCT machineID FROM telemetry ORDER BY machineID
```

| machineID |
|-----------|
| 1 |
| 2 |
| 3 |
| 4 |
| 5 |
| 6 |
| 7 |
| 8 |
| 9 |
| 10 |


And converting from SQL back to a `DataFrame` is just as easy:

```python
tableDF = spark.sql("SELECT DISTINCT machineID FROM telemetry ORDER BY machineID")
display(tableDF)
```

| machineID |
|-----------|
| 1 |
| 2 |
| 3 |
| 4 |
| 5 |
| 6 |
| 7 |
| 8 |
| 9 |
| 10 |


```python
# You can ignore this code; we use it for testing our notebooks.
assert tableDF.count() == 100
```

Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.