![LearnAI Header](https://coursematerial.blob.core.windows.net/assets/LearnAI_header.png)

# Getting started with Machine Learning for Predictive Maintenance

In this lab, we will create our first Machine Learning solution for predictive maintenance. We will rely on a simple but powerful algorithm: [Logistic Regression](https://en.wikipedia.org/wiki/Logistic_regression).

## Reading the data

We begin by reading the data that we finished pre-processing in a prior Notebook.

> *Note:* If you get an error message about a non-existent file, run the *feature_engineering* notebook from day 1 again. Unfortunately, this will take a couple of minutes.

In [5]:
df = spark.read.parquet("dbfs:/FileStore/tables/preprocessed").cache()
df = df.withColumnRenamed("y_0", "label") # this is the label we're going to use

display(df)

machineID,datetime,age,diff_error_0,diff_error_1,diff_error_2,diff_error_3,diff_error_4,diff_fail_0,diff_fail_1,diff_fail_2,diff_fail_3,diff_maint_0,diff_maint_1,diff_maint_2,diff_maint_3,pressure_ma_3,pressure_sd_3,rotate_ma_3,rotate_sd_3,vibration_ma_3,vibration_sd_3,volt_ma_3,volt_sd_3,label,y_1,y_2,y_3
32,2015-02-23T21:00:00.000+0000,15,1095.0,166.0,409.0,1387.0,375.0,1071.0,1387.0,1387.0,351.0,711.0,1387.0,1387.0,351.0,99.47241720086753,4.027627389831299,451.3898073666962,37.143235255898006,36.43797824633008,5.246946040863718,175.77508414243,27.49122867502091,0,0,0,0
32,2015-02-23T22:00:00.000+0000,15,1096.0,167.0,410.0,1388.0,376.0,1072.0,1388.0,1388.0,352.0,712.0,1388.0,1388.0,352.0,100.09894109745454,3.794141420379903,435.17321536069,39.335699599244414,39.2308973568302,4.176148610708063,180.83567404661,23.740298384283044,0,0,0,0
32,2015-02-23T23:00:00.000+0000,15,1097.0,168.0,411.0,1389.0,377.0,1073.0,1389.0,1389.0,353.0,713.0,1389.0,1389.0,353.0,99.5443989022908,3.475883016729516,421.66272855994526,15.279105023168729,41.71121231384191,0.9869915716793244,169.48155636960098,21.84432149813136,0,0,0,0
32,2015-02-24T00:00:00.000+0000,15,1098.0,169.0,412.0,1390.0,378.0,1074.0,1390.0,1390.0,354.0,714.0,1390.0,1390.0,354.0,97.68908923167596,2.826179881000501,416.56100286652577,14.313070539126413,43.106649846394575,2.4962971618940304,168.40722316791698,19.97804980820004,0,0,0,0
32,2015-02-24T01:00:00.000+0000,15,1099.0,170.0,413.0,1391.0,379.0,1075.0,1391.0,1391.0,355.0,715.0,1391.0,1391.0,355.0,96.91916094721388,4.036157929664655,428.15124045533855,23.48933395357947,42.8987242528714,2.771261929609087,172.9402295425665,16.124035462907294,0,0,0,0
32,2015-02-24T02:00:00.000+0000,15,1100.0,171.0,414.0,1392.0,380.0,1076.0,1392.0,1392.0,356.0,716.0,1392.0,1392.0,356.0,98.60820305322396,6.429119080028224,444.72279625965575,26.72521659029804,43.55483208223855,2.8184999168524905,166.95757356868276,19.57722439213211,0,0,0,0
32,2015-02-24T03:00:00.000+0000,15,1101.0,172.0,415.0,1393.0,381.0,1077.0,1393.0,1393.0,357.0,717.0,1393.0,1393.0,357.0,97.75829917013183,6.35926449953383,455.000717946228,30.80270066186616,46.0032332920517,5.284176271790364,171.5146198226785,18.02024867702125,0,0,0,0
32,2015-02-24T04:00:00.000+0000,15,1102.0,173.0,416.0,1394.0,382.0,1078.0,1394.0,1394.0,358.0,718.0,1394.0,1394.0,358.0,97.46746997114926,6.507590246654807,464.6340331415173,13.781545445572954,44.00402649786548,6.376598773584249,163.08366794565802,10.130385844311215,0,0,0,0
32,2015-02-24T05:00:00.000+0000,15,1103.0,174.0,417.0,1395.0,383.0,1079.0,1395.0,1395.0,359.0,719.0,1395.0,1395.0,359.0,103.47412855441392,9.726682380322783,481.54989358335945,31.29419249089553,41.575592514816776,9.504942586795329,164.627163319354,11.547022705888777,0,0,0,0
32,2015-02-24T06:00:00.000+0000,15,1104.0,175.0,418.0,1396.0,384.0,1080.0,1396.0,1396.0,360.0,720.0,0.0,1396.0,360.0,99.97419460128005,10.652800910349333,479.901478683917,32.041360774820994,39.010816091632826,9.76504784353832,168.14560647024325,6.929237687861469,0,0,0,0


In [6]:
keys = ['machineID', 'datetime']
X_keep = ['diff_maint_1', 'diff_error_1', 'volt_sd_3', 'diff_fail_3', 'pressure_ma_3', 'pressure_sd_3', 'diff_fail_1', 'diff_fail_0', 'age', 'vibration_ma_3', 'rotate_ma_3', 'diff_error_2', 'diff_fail_2', 'diff_error_3', 'diff_maint_2', 'volt_ma_3', 'diff_maint_0', 'vibration_sd_3', 'diff_maint_3', 'rotate_sd_3', 'diff_error_0', 'diff_error_4']
Y_keep = ['label', 'y_1', 'y_2', 'y_3'] # y_0 was renamed to label above

Let's begin by dividing the data into training and test sets. With time-series data, we usually split on a time cut-off, and to avoid **leakage** we also leave a gap (2 weeks in this case) between the training and test data. The data is collected hourly; if we do not want to model at such a high frequency, another option is to also sample every n-th row of the data.

In [8]:
from datetime import datetime # standard library; recent pandas versions no longer re-export datetime
from pyspark.sql.functions import col, hour

# train on data before 2015-10-01 and test on data after 2015-10-15 (a 2-week gap)
# the commented-out `hour` condition would additionally keep only every 3rd hour
df_train = df.filter((col('datetime') < datetime(2015, 10, 1))) # & (hour(col('datetime')) % 3 == 0))
df_test = df.filter(col('datetime') > datetime(2015, 10, 15))
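
To make the cut-off logic concrete, here is a minimal plain-Python sketch (no Spark required) of how a single timestamp would be assigned to a split. The names `TRAIN_END`, `GAP`, `TEST_START`, `assign_split`, and `every_nth_hour` are illustrative and not part of the notebook; the dates mirror the filters above.

```python
from datetime import datetime, timedelta

TRAIN_END = datetime(2015, 10, 1)   # training rows must be strictly before this
GAP = timedelta(weeks=2)            # buffer between the two sets to limit leakage
TEST_START = TRAIN_END + GAP        # test rows must be strictly after this

def assign_split(ts, every_nth_hour=1):
    """Return 'train', 'test', or None (row is in the gap or dropped by subsampling)."""
    if ts.hour % every_nth_hour != 0:
        return None  # skipped when we only keep every n-th hour
    if ts < TRAIN_END:
        return "train"
    if ts > TEST_START:
        return "test"
    return None  # falls inside the 2-week gap

print(assign_split(datetime(2015, 2, 23, 21)))   # a row from the sample shown earlier
print(assign_split(datetime(2015, 10, 7, 12)))   # a row inside the gap
```

With `every_nth_hour=3`, only rows recorded at hours 0, 3, 6, and so on would be kept, which is what the commented-out `hour` condition expresses.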

Let's look at some summary statistics for the features and labels in the training data.

In [10]:
display(df_train.describe())

summary,machineID,age,diff_error_0,diff_error_1,diff_error_2,diff_error_3,diff_error_4,diff_fail_0,diff_fail_1,diff_fail_2,diff_fail_3,diff_maint_0,diff_maint_1,diff_maint_2,diff_maint_3,pressure_ma_3,pressure_sd_3,rotate_ma_3,rotate_sd_3,vibration_ma_3,vibration_sd_3,volt_ma_3,volt_sd_3,label,y_1,y_2,y_3
count,654600.0,654600.0,654600.0,654600.0,654600.0,654600.0,654600.0,654600.0,654600.0,654600.0,654600.0,654600.0,654600.0,654600.0,654600.0,654600.0,654600.0,654600.0,654600.0,654600.0,654600.0,654600.0,654600.0,654600.0,654600.0,654600.0,654600.0
mean,50.5,11.33,767.0613458600673,998.2097525206232,933.592835319279,1020.7992407577146,1615.1251894286588,2133.154399633364,1935.9317461044916,2652.421269477544,2349.709482126489,1113.8832447296058,1109.771384051329,1144.651457378552,1120.9993217231895,100.82513452018438,9.23999324497866,446.6710800808024,46.207254381075714,40.3964837222776,4.619880429306799,170.79153282266068,13.81533582506386,0.0146379468377635,0.0188496791934005,0.0108203483043079,0.0150733272227314
stddev,28.86609209638016,5.8276197440095325,763.1257842489293,955.9490918887524,994.1383169517088,995.1194154164116,1427.1820509876295,1655.1293083917917,1552.799216609115,1935.571758820081,1866.5733315385492,1000.6749291181276,1023.5238622627412,1038.5993590886062,1038.328199929404,6.78311055919519,3.916499473522474,29.659338215200368,19.48655531583032,3.1722981813853623,1.9533044798825747,8.448016911894165,5.845427117512882,0.1200987068394556,0.1359941066395189,0.1034566803921302,0.1218447565916889
min,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,76.0053324677532,0.0936970591161633,198.770337377762,0.2159233438022444,22.9732894818614,0.0164138199561251,135.9794565589355,0.0299919681797135,0.0,0.0,0.0,0.0
max,100.0,20.0,5482.0,6645.0,6645.0,6617.0,6645.0,6645.0,6645.0,6645.0,6645.0,6645.0,6645.0,6645.0,6645.0,164.87532404477776,32.69926439338247,576.923563024815,164.91776461098644,67.31208520010537,15.995336982084046,233.13577630387576,54.584229250416726,1.0,1.0,1.0,1.0


Let's make sure we don't have any null values in our DataFrame.

In [12]:
recordCount = df_train.count()
noNullsRecordCount = df_train.na.drop().count()

print("We have {} records that contain null values.".format(recordCount - noNullsRecordCount))

In [13]:
display(df_train.groupBy("label").count())

| label | count |
|-------|-------|
| 1 | 9582 |
| 0 | 645018 |
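
These counts show how rare the positive class is. As a quick sanity check, the plain-Python sketch below computes the positive rate and the accuracy of a trivial model that always predicts `0`; the counts are copied from the output above, and the variable names are illustrative.

```python
# Label counts copied from the groupBy("label") output above
counts = {1: 9582, 0: 645018}

total = sum(counts.values())
positive_rate = counts[1] / total

# Accuracy of a trivial model that always predicts the majority class (0)
majority_baseline_accuracy = counts[0] / total

print("positive rate: {:.4f}".format(positive_rate))
print("majority-class baseline accuracy: {:.4f}".format(majority_baseline_accuracy))
```

The positive rate agrees with the mean of the `label` column in the summary statistics, and the high baseline accuracy is a reminder that accuracy alone says little about a model on this data.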


## Train a Logistic Regression Model

Let's now start training our logistic regression model. It's always a good idea to double-check the schema first.

In [15]:
df_train.printSchema()

## Setting up the model

We set the label column of the LogisticRegression model to `label`, and the features column to `norm_features`, the output of the `StandardScaler`.

In [17]:
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

vassembler = VectorAssembler(inputCols = X_keep, outputCol = "features")
stndscaler = StandardScaler(inputCol = "features", outputCol = "norm_features")

lr = (LogisticRegression()
     .setLabelCol("label")
     .setFeaturesCol("norm_features"))
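
For intuition, a fitted binary logistic regression model turns a feature vector into a probability by passing a weighted sum through the logistic (sigmoid) function and then applying a threshold. The sketch below is plain Python, not the Spark implementation; the weights, intercept, and two-feature input are hypothetical values chosen only for illustration, and the 0.5 threshold is assumed to match the model's default.

```python
import math

def sigmoid(z):
    """Map a real-valued score to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

def predict(features, weights, intercept, threshold=0.5):
    """Return (probability of class 1, predicted label) for one feature vector."""
    z = intercept + sum(w * x for w, x in zip(weights, features))
    p = sigmoid(z)
    return p, 1.0 if p > threshold else 0.0

# Hypothetical values for illustration only; the real weights are learned by `fit`.
weights = [0.8, -1.2]
intercept = -3.0

prob, label = predict([0.5, 0.1], weights, intercept)
print("probability={:.4f} prediction={}".format(prob, label))
```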

### Hands-on lab
Create a pipeline with three stages: the vector assembler, the standard scaler, and the model we created above. Then fit the pipeline to the training data and use the fitted model to `transform` the test data. Finally, return the counts of TP, TN, FP, and FN.

In [19]:
# put your solution here

# pipeline = Pipeline(stages = [])
# assert len(pipeline.getStages()) == 3 # make sure there are exactly three stages

# fit the pipeline
# get predictions on the test data
# group data by predictions and labels to get counts for TP, TN, FP, FN

In [20]:
# maximize this cell to see the solution:

pipeline = Pipeline(stages = [vassembler, stndscaler, lr])

assert len(pipeline.getStages()) == 3 # make sure there are exactly three stages
print(pipeline.getStages())

lr_model = pipeline.fit(df_train)

We can now run our pipeline on the test data to obtain predictions.

In [22]:
# maximize this cell to see the solution:

df_pred = lr_model.transform(df_test).drop("features", "norm_features") # apply the model to our held-out test set
display(df_pred.groupBy("label", "prediction").count())

| label | prediction | count |
|-------|------------|-------|
| 1 | 0.0 | 2004 |
| 0 | 0.0 | 185558 |
| 1 | 1.0 | 84 |
| 0 | 1.0 | 154 |
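
The four counts above are the cells of a confusion matrix, so the usual summary metrics follow directly from them. The sketch below is plain Python with the counts copied from the output above; the variable names are illustrative.

```python
# Counts copied from the (label, prediction) output above
tp = 84      # label 1, predicted 1
fn = 2004    # label 1, predicted 0
fp = 154     # label 0, predicted 1
tn = 185558  # label 0, predicted 0

precision = tp / (tp + fp)   # how many predicted positives are real
recall = tp / (tp + fn)      # how many real positives are found
f1 = 2 * precision * recall / (precision + recall)
accuracy = (tp + tn) / (tp + tn + fp + fn)

print("precision={:.3f} recall={:.3f} f1={:.3f} accuracy={:.3f}".format(
    precision, recall, f1, accuracy))
```

Precision comes out at roughly 0.35 and recall at roughly 0.04, while accuracy is about 0.99: the model looks accurate mostly because negatives dominate the test set.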


### End of lab

In [24]:
df_pred.printSchema()

## Evaluate the Model

In [26]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator()
print(evaluator.explainParams())

In [27]:
evaluator.setLabelCol("label")
evaluator.setRawPredictionCol('rawPrediction')

metricName = evaluator.getMetricName()
metricVal = evaluator.evaluate(df_pred)

print("{}: {}".format(metricName, metricVal))
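
The evaluator's default metric is `areaUnderROC`. One way to read it: it is the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The sketch below computes this directly in plain Python on a tiny made-up dataset; it is meant for intuition only and is not how Spark computes the metric internally. The function name `auroc_pairwise` and the toy labels and scores are illustrative.

```python
def auroc_pairwise(labels, scores):
    """AUROC as the fraction of (positive, negative) pairs ranked correctly.

    Ties count as half a correct pair. This is O(n_pos * n_neg), so it is
    only suitable for small illustrative inputs.
    """
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

# Toy data: 3 of the 4 positive/negative pairs are ranked correctly
toy_labels = [0, 0, 1, 1]
toy_scores = [0.1, 0.4, 0.35, 0.8]
print(auroc_pairwise(toy_labels, toy_scores))  # → 0.75
```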

We could wrap this in a function to make it easier to report multiple metrics. For now, we set the metric name explicitly and compute the area under the ROC curve.

In [29]:
evaluator = BinaryClassificationEvaluator()
evaluator.setLabelCol("label")
evaluator.setRawPredictionCol("rawPrediction")

auroc = evaluator.setMetricName("areaUnderROC").evaluate(df_pred)

print("AUROC: {}".format(auroc))
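
One way to report several metrics at once is a small helper that loops over metric names. The sketch below is an assumption about how such a helper might look, not part of the original notebook; `evaluate_metrics` is a hypothetical name. It relies only on `setMetricName` returning the evaluator and `evaluate` returning a number, and on `areaUnderROC` and `areaUnderPR` being the metric names that `BinaryClassificationEvaluator` accepts.

```python
def evaluate_metrics(df_pred, evaluator, metric_names=("areaUnderROC", "areaUnderPR")):
    """Evaluate `df_pred` once per metric name and return {metric name: value}.

    `evaluator` is expected to behave like BinaryClassificationEvaluator:
    `setMetricName` returns the evaluator itself and `evaluate` returns a float.
    """
    return {name: evaluator.setMetricName(name).evaluate(df_pred)
            for name in metric_names}

# Usage in the notebook (assumes `evaluator` and `df_pred` from the cells above):
# for name, value in evaluate_metrics(df_pred, evaluator).items():
#     print("{}: {}".format(name, value))
```

Passing the evaluator in as an argument keeps the helper independent of how the label and raw prediction columns were configured.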

Let's now save the fitted pipeline in case we need it later.

In [31]:
lr_model.write().overwrite().save("/dbfs/tmp/lr_model_ipeline")

## Conclusion
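Hmm... our results are not great yet: on the test set, the model flags only 84 of the 2,088 actual positives while raising 154 false alarms. We'll look into how to improve our results later.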

In [33]:
# You can ignore this code, we use it for testing our notebooks.
assert auroc > .8

Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.