Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.

-sandbox
# Extending PdM

In this execise, we address the following concerns:
  - **A different feature set**: We go back to the original telemetry data, and this time instead of computing features consisting of rolling means and standard deviations, we instead run PCA (principal component analysis) on the telemetry data and then run the K-means clustering algorithm on the PCs (principal components). This will give us a set of K clusters based on the telemetry data. Our hope is that some of the clusters will represent cases where one or more telemetries "go off the charts". So we can use the clusters (after one-hot-encoding it) as features into the classification model instead of the original telemetries or the rolling means and standard deviations. If we are successful, we can argue that we have found a more simple feature set for the model.
  - **Multi-class classification**: We extend the problem of binary classification into multi-class classification. Recall that the PdM data flags failure *by component*, so we know which component of a machine failed at any time. In previous notebooks, we built binary classifiers for predicting failure for one component, but now we extend this to a all the components. Our model should be able to predict which component fails given the machine's telemetries, meta-data, and time elapsed since the last maintenance and failure (for each component).

## Getting Started

Run the following cell to configure our "classroom."

In [None]:
%run "../includes/setup_env"

Reading the data

We begin by reading the raw data which has the telemetry.

In [None]:
keys = ['machineID', 'datetime']
keep_left = ['volt', 'rotate', 'pressure', 'vibration']
df_raw = spark.read.parquet("dbfs:/FileStore/tables/raw").select(*keys + keep_left).cache()
display(df_raw)

We also load the data we finished pre-processing in a prior Notebook, but we ignore the moving average and standard deviation features.

In [None]:
df_processed = spark.read.parquet("dbfs:/FileStore/tables/processed").cache()
keep_right = ['age'] + [c for c in df_processed.columns if c.startswith('diff_') or c.startswith('y_')]
df_processed = df_processed.select(*keys + keep_right).cache()
display(df_processed)

We now join the two datasets into one.

In [None]:
df = df_raw.join(df_processed, on = keys, how = 'inner').cache()
display(df)

To perform multi-class classification, we need to create a single column that encodes the four classes. We will call it `label` and let `label = 1` when `y_0 = 1` (component 1 fails), `label = 2` when `y_1 = 1` (component 2 fails), `label = 3` when `y_2 = 1` (component 3 fails), `label = 4` when `y_3 = 1` (component 4 fails), and `label = 0` when no component fails.

In [None]:
from pyspark.sql.functions import when, lit, col

df = df.withColumn("label", lit(0))
for i in range(4): # iterate over the four components
    label = 'y_' + str(i) # name of target column (one per component)
    find_labels = when((col(label) == 1), lit(i+1)).otherwise(col("label"))
    df = df.withColumn("label", find_labels)

df = df.drop("y_0", "y_1", "y_2", "y_3").cache()
display(df)

Let's look at some summary statistics for the labels in the data.

In [None]:
display(df.groupBy("label").count())

Let's begin by dividing the data into training and test sets. With time-series data, we usually divide the data based on a time cut-off and to avoid **leakage** we also put a gap (2 weeks in this case) between the training and test data. Another option we have is to sample every n-th row of the data. The data is collected hourly, and if we do not wish to use such a high frequency for modeling, we can sample every n-th row of the data.

In [None]:
# from pyspark.sql.types import DateType
from pandas import datetime
from pyspark.sql.functions import col, hour

# we sample every nth row of the data using the `hour` function
df_train = df.filter((col('datetime') < datetime(2015, 10, 1))) # & (hour(col('datetime')) % 3 == 0))
df_test = df.filter(col('datetime') > datetime(2015, 10, 15))

Let's make sure we don't have any null values in our DataFrame.

In [None]:
recordCount = df_train.count()
noNullsRecordCount = df_train.na.drop().count()

print("We have {} records that contain null values.".format(recordCount - noNullsRecordCount))

In [None]:
display(df_train)

## Feature engineering using PCA and K-Means

In this section, we will learn to use two un-supervised learning algorithms, namely [PCA (principal component analysis)](https://en.wikipedia.org/wiki/Principal_component_analysis) and [K-means Clustering](https://www.google.com/search?client=firefox-b-1-ab&q=k-means+clustering) in order to expand on what we learned about feature engineering so far. Prior to running this exercise, we invite you to learn more about these algorithms if you are new to using them.

- [Documentation for `KMeans`](https://spark.apache.org/docs/2.4.0/ml-clustering.html#k-means)
- [Documentation for `PCA`](https://spark.apache.org/docs/2.4.0/ml-features.html)

In [None]:
from pyspark.ml.feature import StandardScaler, VectorAssembler, OneHotEncoderEstimator, MinMaxScaler
from pyspark.ml.feature import PCA
from pyspark.ml.clustering import KMeans

from pyspark.ml import Pipeline

PCA_features = ['volt', 'rotate', 'pressure', 'vibration']
diff_features = [c for c in df.columns if c.startswith('diff_')]

stages = []
# create a single vector feature from telemetry data
stages.append(VectorAssembler(inputCols = PCA_features, outputCol = "pca_raw_features"))
# extract principal components form the telemetry data (we chose k = 4 so there's no dimensionality reduction, just orthogonalization)
stages.append(PCA(k = 4, inputCol = "pca_raw_features", outputCol = "pca_features"))
# rescale principal components prior to running k-means
stages.append(StandardScaler(inputCol = "pca_features", outputCol="scaled_pca_features", withStd = True, withMean = False))
# run k-means on rescaled principal components (we chose K = 3 to keep it simple for now)
stages.append(KMeans(featuresCol = "scaled_pca_features", predictionCol = "cluster").setK(3).setSeed(1))
# run one-hot encoding on cluster feature
stages.append(OneHotEncoderEstimator(inputCols = ["cluster"], outputCols = ["cluster_vec"], dropLast = False))
# combine all "time-elapsed-since" features into single vector
stages.append(VectorAssembler(inputCols = diff_features, outputCol = "diff_features"))
# rescale all "time-elapsed-since" features
stages.append(MinMaxScaler(inputCol = "diff_features", outputCol="scaled_diff_features"))
# create one vector with all final features
stages.append(VectorAssembler(inputCols = ['scaled_diff_features', 'age', 'cluster_vec'], outputCol = "final_features"))

data_pipeline = Pipeline(stages = stages)
print(data_pipeline.getStages())

In [None]:
featurizer = data_pipeline.fit(df_train)

df_kmeans = featurizer.transform(df_train).select(*keys + PCA_features + ["label", "cluster", "final_features"])
display(df_kmeans)

In [None]:
display(df_kmeans.groupBy("cluster").mean())

## Train a Logistic Regression Model

Let's build some of the transformations we'll need in our pipeline.

[Logistic Regression Docs](https://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression)

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

lr = (LogisticRegression()
     .setLabelCol("label")
     .setFeaturesCol("final_features"))

model_pipeline = Pipeline(stages = [lr])
assert len(model_pipeline.getStages()) == 1 # make sure it's one stage only
print(model_pipeline.getStages())

lr_model = model_pipeline.fit(df_kmeans)

df_pred = lr_model.transform(featurizer.transform(df_test)) # apply the model to our held-out test set
display(df_pred)

## Evaluate the Model

We need to make sure that we use `MulticlassClassificationEvaluator`, not `BinaryClassificationEvaluator`. As we can see below, the evaluation metrics for the multi-class case are different from the binary case.

In [None]:
from pyspark.ml.evaluation import  MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator()
print(evaluator.explainParams())

In [None]:
def printEval(df, labelCol = "label"):
  evaluator = MulticlassClassificationEvaluator()
  evaluator.setLabelCol(labelCol)
  
  wrecall = evaluator.setMetricName("weightedPrecision").evaluate(df)
  wprecis = evaluator.setMetricName("weightedPrecision").evaluate(df)
  print("weighted recall: {}\nweighted precision: {}".format(wrecall, wprecis))

In [None]:
printEval(df_pred)

## Train a Random Forest Model

Let's now compare this to what we get if we use a random forest with cross-validation instead.

In [None]:
from pyspark.ml.classification import RandomForestClassifier

rf = (RandomForestClassifier()
      .setLabelCol("label")
      .setFeaturesCol("final_features")
      .setSeed(27))

# print(rf.explainParams())

We set up a single-stage pipeline for the model.

In [None]:
from pyspark.ml import Pipeline

model_pipeline = Pipeline(stages = [rf])

model_pipeline.getStages()

# model_pipeline.getStages()[0].extractParamMap()

We perform a grid search to find the optimal combination of `maxDepth` and `numTrees` for the random forest, and use cross validation to evaluate the algorithm's performance.

In [None]:
from pyspark.ml.tuning import ParamGridBuilder

paramGrid = (ParamGridBuilder()\
            .addGrid(rf.maxDepth, [5, 10, 20]) \
            .addGrid(rf.numTrees, [20, 50]) \
            .build())

from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import  MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator().setMetricName("weightedPrecision")

cv = (CrossValidator()
      .setEstimator(model_pipeline)
      .setEvaluator(evaluator)
      .setEstimatorParamMaps(paramGrid)
      .setNumFolds(3)
      .setSeed(27))

cv_model = cv.fit(df_kmeans)

We can see the result of the grid search and the average (cross-validated) evaluation metric here:

In [None]:
ll = list(zip(cv_model.getEstimatorParamMaps(), cv_model.avgMetrics))
[(list(ll[i][0].values()), ll[i][1]) for i in range(len(ll))]

## Evaluate the Model

Finally, here's the final model's performance.

In [None]:
df_pred = cv_model.transform(featurizer.transform(df_test))

printEval(df_pred)

Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.