![LearnAI Header](https://coursematerial.blob.core.windows.net/assets/LearnAI_header.png)

# Data preparation and feature engineering for PdM

The previous lab introduced Azure Databricks (ADB) and DataFrames. This lab builds on it to prepare the data and engineer features for predictive maintenance (PdM).

You will learn best practices for data preparation with ADB by building a data preprocessing pipeline with essential transformations and estimators on DataFrames.

## Mount the data

As usual, we start with mounting the data. At this point, the data are probably already mounted, because you ran previous labs. In this case, the next cell should run very fast and simply print "Already mounted."

In [3]:
%run "../includes/mnt_blob"

> ### Language conventions 
> From now on we will adopt the following consistent terminology to avoid confusion:
> 
> - A system as a whole will be called a **machine** and its parts are called **components**.
> - A machine can experience **errors** when anomalies happen. Errors do **NOT** result in shutdown, and they are **NOT** tied to any particular component, but they can cause one or several components to eventually fail.
> - A machine can experience **failure** when one of its components shuts down. This requires the component to be replaced before the machine can be operational again.
> - For our purposes, **maintenance** means a component was replaced. This can be either as part of a routine schedule or unscheduled maintenance (due to component failure).

### Telemetry data

Let's begin by looking at the file format of our data. All data are stored in CSV files. For example, `telemetry.csv` contains the data collected once an hour by four sensors on each machine ("volt", "rotate", "pressure", "vibration").

We can use the cell magic `%sh` to see how the data are formatted.

In [6]:
%sh head /dbfs/mnt/data/telemetry/telemetry.csv

From this output, we can tell that there are 6 columns in this file, separated by commas. We can also see that the file includes column names (a header row).

We can now load the data into a DataFrame. On this first attempt, we will ask the read method to infer the schema of our data.

In [8]:
df_telemetry = spark.read.csv("/mnt/data/telemetry/telemetry.csv", header = "true", inferSchema = "true") # telemetry data

Let's look at the schema to check our column types. Notice that the `datetime` column has a `string` type. This will later need to be changed to `timestamp` so we can perform date-time operations on it.

In [10]:
df_telemetry.printSchema()

### Load other data

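Besides the telemetry, we have four other data sources (files): the failure log, the error log, the maintenance history and the machine metadata. Let's load those as well and, as our first step of data preparation, combine everything into one DataFrame. We do so by left-joining each source onto the telemetry data using the keys `machineID` and `datetime` (the machine metadata is joined on `machineID` alone).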

We then convert the type of the `datetime` from `string` to `timestamp`.

In [12]:
df_failures = spark.read.csv("/mnt/data/telemetry/failures.csv", header = "true", inferSchema = "true") # failure log
df_errors = spark.read.csv("/mnt/data/telemetry/errors.csv", header = "true", inferSchema = "true") # critical error log
df_maint = spark.read.csv("/mnt/data/telemetry/maintenance.csv", header = "true", inferSchema = "true") # maintenance history
df_machines = spark.read.csv("/mnt/data/telemetry/machines.csv", header = "true", inferSchema = "true") # info about machine (model, age ..)

In [13]:
keys = ["machineID", "datetime"]

join_type = "left"

df = df_telemetry.join(df_errors.withColumnRenamed("errorID", "error"), keys, join_type) \
                 .join(df_failures.withColumnRenamed("failure", "fail"), keys, join_type) \
                 .join(df_maint.withColumnRenamed("comp", "maint"), keys, join_type) \
                 .join(df_machines, "machineID", join_type) \
                 .cache()

from pyspark.sql.functions import to_timestamp

df = df.withColumn("datetime", to_timestamp("datetime", "MM/dd/yyyy hh:mm:ss aa")) \
       .orderBy(keys) \
       .dropDuplicates(keys)

In [14]:
display(df)

machineID,datetime,volt,rotate,pressure,vibration,error,fail,maint,model,age
1,2015-01-01T06:00:00.000+0000,176.217853015625,418.504078221616,113.077935462083,45.0876857639276,,,,model3,18
1,2015-01-01T07:00:00.000+0000,162.87922289706,402.747489565395,95.4605253823187,43.4139726834815,,,,model3,18
1,2015-01-01T08:00:00.000+0000,170.989902405567,527.349825452291,75.2379048586662,34.1788471214451,,,,model3,18
1,2015-01-01T09:00:00.000+0000,162.462833264092,346.149335043074,109.248561276504,41.1221440884256,,,,model3,18
1,2015-01-01T10:00:00.000+0000,157.61002119306,435.376873016938,111.886648210168,25.9905109982024,,,,model3,18
1,2015-01-01T11:00:00.000+0000,172.504839196295,430.323362106675,95.9270416939636,35.6550173268837,,,,model3,18
1,2015-01-01T12:00:00.000+0000,156.556030606329,499.071623068962,111.755684290096,42.7539196974773,,,,model3,18
1,2015-01-01T13:00:00.000+0000,172.522780814836,409.624717000438,101.00108276407,35.4820086610704,,,,model3,18
1,2015-01-01T14:00:00.000+0000,175.324523915223,398.648780707752,110.624360548654,45.4822868466294,,,,model3,18
1,2015-01-01T15:00:00.000+0000,169.218423246933,460.850669930244,104.848229967003,39.9017354356787,,,,model3,18
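
The empty `error`, `fail` and `maint` cells above are what a left join produces: every telemetry row is kept, and columns from the right-hand table are null wherever no row matches on the keys. Here is a minimal plain-Python sketch of that behavior, using made-up rows (the `error1` event below is hypothetical, not taken from the data):

```python
# Hypothetical rows for illustration only; the real data live in the CSV files.
telemetry = [
    {"machineID": 1, "datetime": "2015-01-01 06:00:00", "volt": 176.2},
    {"machineID": 1, "datetime": "2015-01-01 07:00:00", "volt": 162.9},
]
errors = [
    {"machineID": 1, "datetime": "2015-01-01 07:00:00", "error": "error1"},
]

keys = ("machineID", "datetime")

# Index the right-hand table by the join keys.
errors_by_key = {tuple(row[k] for k in keys): row for row in errors}

# Left join: keep every telemetry row, fill "error" with None when unmatched.
joined = []
for row in telemetry:
    match = errors_by_key.get(tuple(row[k] for k in keys))
    joined.append({**row, "error": match["error"] if match else None})

for row in joined:
    print(row)
```

Unlike this sketch, a real join emits one output row per matching right-hand row, which is presumably why the lab follows the joins with `dropDuplicates(keys)`.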


Let's check our column types again. Can you confirm that the time of data collection is encoded correctly?

In [16]:
df.printSchema()
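
To reason about the format pattern passed to `to_timestamp`, it can help to parse a sample string outside Spark. The sketch below uses Python's `datetime.strptime` with a format that mirrors `MM/dd/yyyy hh:mm:ss aa` (12-hour clock plus AM/PM marker). The sample strings are assumptions about what the raw file contains, not values copied from it.

```python
from datetime import datetime

# Python equivalent of the Spark pattern "MM/dd/yyyy hh:mm:ss aa":
# 12-hour clock (%I) plus an AM/PM marker (%p).
PY_FORMAT = "%m/%d/%Y %I:%M:%S %p"

# Hypothetical raw strings; the exact text in telemetry.csv is assumed here.
morning = datetime.strptime("01/01/2015 06:00:00 AM", PY_FORMAT)
afternoon = datetime.strptime("01/01/2015 03:00:00 PM", PY_FORMAT)

print(morning.isoformat())    # 2015-01-01T06:00:00
print(afternoon.isoformat())  # 2015-01-01T15:00:00
```

If the AM/PM marker were ignored, afternoon readings would collapse onto morning hours, so it is worth checking that the parsed hours span the full 0 to 23 range.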

## Rolling Averages and Standard Deviations

Our first feature-engineering task is to compute rolling averages and standard deviations of the telemetry readings. Rolling statistics are calculated over a given time interval (or window), which we set to 3 hours here; the appropriate window size depends on the use case.

We now create a temporary view from the DataFrame, which will allow us to run SQL commands on the data.

In [18]:
df.createOrReplaceTempView("df")

### Hands-on lab

Use SQL to create a new table called `df_roll` that in addition to the original features also contains moving averages and standard deviations over a 3-hour window for the telemetry data (voltage, rotation, pressure and vibration). To do this you will need to use the `over` clause along with aggregate functions `mean` and `stddev`.

You will have to `partition by` *machineID*, make sure to `order by` *datetime*, and operate over an interval of 3 hours.

Here is an example of how to calculate a running average over a 3-hour time window for the voltage sensor on machine 1.

In [20]:
%sql 

drop table if exists df_roll;

create table df_roll as
select *,
  mean(volt) over (order by datetime range between interval 3 hours preceding and current row) as volt_ma_3
from df
where machineid == 1
order by datetime

To create your solution, you have to change the code so that it executes over all machines. Then add lines for doing the same for `stddev`, as well as the other sensors.

> Hint: While you are iteratively developing your solution, it may be useful to delete the results of previous attempts.  You can do this by adding the line `drop table if exists df_roll;` towards the top of the cell.

In [22]:
%sql
-- put your solution here

In [23]:
%sql
-- maximize this cell (click the + button on the right) to see the solution:

drop table if exists df_roll;

create table df_roll as
select *,
  mean(volt) over (partition by machineid order by datetime range between interval 3 hours preceding and current row) as volt_ma_3,
  mean(rotate) over (partition by machineid order by datetime range between interval 3 hours preceding and current row) as rotate_ma_3,
  mean(pressure) over (partition by machineid order by datetime range between interval 3 hours preceding and current row) as pressure_ma_3,
  mean(vibration) over (partition by machineid order by datetime range between interval 3 hours preceding and current row) as vibration_ma_3,
  stddev(volt) over (partition by machineid order by datetime range between interval 3 hours preceding and current row) as volt_sd_3,
  stddev(rotate) over (partition by machineid order by datetime range between interval 3 hours preceding and current row) as rotate_sd_3,
  stddev(pressure) over (partition by machineid order by datetime range between interval 3 hours preceding and current row) as pressure_sd_3,
  stddev(vibration) over (partition by machineid order by datetime range between interval 3 hours preceding and current row) as vibration_sd_3
from df 
order by machineID, datetime

Create a time series plot comparing the original voltage telemetry to its 3-hour-window moving average for the machine with ID 1. Does the moving average appear smoother than the original telemetry? How can we make it even smoother?

In [25]:
%sql
-- maximize this cell (click the + button on the right) to see the solution:
select * from df_roll where machineID = 1 order by datetime;

machineID,datetime,volt,rotate,pressure,vibration,error,fail,maint,model,age,volt_ma_3,rotate_ma_3,pressure_ma_3,vibration_ma_3,volt_sd_3,rotate_sd_3,pressure_sd_3,vibration_sd_3
1,2015-01-01T06:00:00.000+0000,176.217853015625,418.504078221616,113.077935462083,45.0876857639276,,,,model3,18,176.217853015625,418.504078221616,113.077935462083,45.0876857639276,,,,
1,2015-01-01T07:00:00.000+0000,162.87922289706,402.747489565395,95.4605253823187,43.4139726834815,,,,model3,18,169.5485379563425,410.6257838935055,104.26923042220083,44.25082922370455,9.431835808576428,11.141590687180878,12.457390134345571,1.183493868944067
1,2015-01-01T08:00:00.000+0000,170.989902405567,527.349825452291,75.2379048586662,34.1788471214451,,,,model3,18,170.02899277275068,449.533797746434,94.59212190102262,40.893501856284736,6.721032201366223,67.84959946425158,18.93495636307687,5.874969789283372
1,2015-01-01T09:00:00.000+0000,162.462833264092,346.149335043074,109.248561276504,41.1221440884256,,,,model3,18,168.13745289558602,423.687682070594,98.25623174489296,40.95066241431995,6.665323822752415,75.77025942779387,17.10919398372915,4.798254821728534
1,2015-01-01T10:00:00.000+0000,157.61002119306,435.376873016938,111.886648210168,25.9905109982024,,,,model3,18,163.48549493994477,427.90588076942447,97.95840993191425,36.17636872288865,5.545290069168625,75.85510590665875,16.772240129910625,7.84400822693189
1,2015-01-01T11:00:00.000+0000,172.504839196295,430.323362106675,95.9270416939636,35.6550173268837,,,,model3,18,165.8918990147535,434.79984890474447,98.07503900982545,34.2366298837392,7.072684747044087,74.03779138108905,16.7506889137327,6.256302004747778
1,2015-01-01T12:00:00.000+0000,156.556030606329,499.071623068962,111.755684290096,42.7539196974773,,,,model3,18,162.283431064944,427.73029830891227,107.2044838676829,36.38039802774725,7.283617351762186,62.74347846728939,7.615664616430348,7.562668484654668
1,2015-01-01T13:00:00.000+0000,172.522780814836,409.624717000438,101.00108276407,35.4820086610704,,,,model3,18,164.79841795263,443.5991437982533,105.1426142395744,34.970364170908454,8.91935549150083,38.623437396668166,7.985272902780372,6.8787602969052335
1,2015-01-01T14:00:00.000+0000,175.324523915223,398.648780707752,110.624360548654,45.4822868466294,,,,model3,18,169.22704363317075,434.4171207209568,104.8270423241959,39.8433081330152,8.550626960261598,45.05922385409984,7.647722010749782,5.060713176766235
1,2015-01-01T15:00:00.000+0000,169.218423246933,460.850669930244,104.848229967003,39.9017354356787,,,,model3,18,168.40543964583026,442.048947676849,107.05733939245576,40.90498766021395,8.284436459410381,46.690454043878944,5.0450138258297015,4.273384052152158
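
The first few values above can be checked by hand. The range `3 hours preceding and current row` is inclusive at both ends, so with hourly data it covers up to four readings, not three. The sketch below recomputes two cells in plain Python from the voltage values shown in the table. It uses the sample standard deviation, which is undefined for a single reading and is consistent with the empty `_sd_3` cells in the first row. The helper `window_values` is a name introduced here for illustration.

```python
from datetime import datetime, timedelta
from statistics import mean, stdev

# First four voltage readings for machine 1, copied from the table above.
readings = [
    (datetime(2015, 1, 1, 6), 176.217853015625),
    (datetime(2015, 1, 1, 7), 162.87922289706),
    (datetime(2015, 1, 1, 8), 170.989902405567),
    (datetime(2015, 1, 1, 9), 162.462833264092),
]

def window_values(rows, current_time, hours=3):
    """Return values whose timestamp lies in [current_time - hours, current_time]."""
    start = current_time - timedelta(hours=hours)
    return [value for ts, value in rows if start <= ts <= current_time]

at_nine = window_values(readings, datetime(2015, 1, 1, 9))
at_seven = window_values(readings, datetime(2015, 1, 1, 7))

print(len(at_nine))     # 4 readings fall inside the 3-hour range
print(mean(at_nine))    # compare with volt_ma_3 at 09:00
print(stdev(at_seven))  # compare with volt_sd_3 at 07:00
```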


In [26]:
%sql
-- put your solution here

### End of lab

In the above example, we saw how Spark SQL can be used to compute the rolling averages. For illustration, let's now do the same computation using the DataFrame APIs. This allows us to compare and contrast the two.

In [29]:
# this is the alternative (probably better) way of calculating the rolling averages

from pyspark.sql.functions import avg, stddev, col
from pyspark.sql.window import Window

w = (Window.partitionBy(col("machineID")).orderBy(col("datetime").cast('long')).rangeBetween(-3 * 60 * 60, 0))

df_roll = df.withColumn('volt_ma_3', avg("volt").over(w)) \
            .withColumn('rotate_ma_3', avg("rotate").over(w)) \
            .withColumn('pressure_ma_3', avg("pressure").over(w)) \
            .withColumn('vibration_ma_3', avg("vibration").over(w)) \
            .withColumn('volt_sd_3', stddev("volt").over(w)) \
            .withColumn('rotate_sd_3', stddev("rotate").over(w)) \
            .withColumn('pressure_sd_3', stddev("pressure").over(w)) \
            .withColumn('vibration_sd_3', stddev("vibration").over(w)) \
            .cache()

df_roll.createOrReplaceTempView("df_roll")
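
In the window definition above, `cast('long')` turns each timestamp into seconds since the Unix epoch, which is why the range is written as `-3 * 60 * 60` rather than as an interval. A small plain-Python sketch of the same arithmetic (the UTC timestamps are chosen here only for illustration):

```python
from datetime import datetime, timezone

WINDOW_SECONDS = 3 * 60 * 60  # same literal as in rangeBetween above

def epoch_seconds(ts):
    """Seconds since the Unix epoch, analogous to casting a timestamp to long."""
    return int(ts.timestamp())

current = epoch_seconds(datetime(2015, 1, 1, 9, tzinfo=timezone.utc))
candidates = [datetime(2015, 1, 1, hour, tzinfo=timezone.utc) for hour in range(5, 11)]

# A row is in the window when its offset from the current row is in [-10800, 0].
in_window = [
    ts.hour for ts in candidates
    if -WINDOW_SECONDS <= epoch_seconds(ts) - current <= 0
]
print(in_window)  # [6, 7, 8, 9]
```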

To be safe, let's re-create the last time series plot to make sure that the two look similar.

In [31]:
display(df_roll.filter(col("machineID") == 1).orderBy(col("datetime")))

machineID,datetime,volt,rotate,pressure,vibration,error,fail,maint,model,age,volt_ma_3,rotate_ma_3,pressure_ma_3,vibration_ma_3,volt_sd_3,rotate_sd_3,pressure_sd_3,vibration_sd_3
1,2015-01-01T06:00:00.000+0000,176.217853015625,418.504078221616,113.077935462083,45.0876857639276,,,,model3,18,176.217853015625,418.504078221616,113.077935462083,45.0876857639276,,,,
1,2015-01-01T07:00:00.000+0000,162.87922289706,402.747489565395,95.4605253823187,43.4139726834815,,,,model3,18,169.5485379563425,410.6257838935055,104.26923042220083,44.25082922370455,9.431835808576428,11.141590687180878,12.457390134345571,1.183493868944067
1,2015-01-01T08:00:00.000+0000,170.989902405567,527.349825452291,75.2379048586662,34.1788471214451,,,,model3,18,170.02899277275068,449.533797746434,94.59212190102262,40.893501856284736,6.721032201366223,67.84959946425158,18.93495636307687,5.874969789283372
1,2015-01-01T09:00:00.000+0000,162.462833264092,346.149335043074,109.248561276504,41.1221440884256,,,,model3,18,168.13745289558602,423.687682070594,98.25623174489296,40.95066241431995,6.665323822752415,75.77025942779387,17.10919398372915,4.798254821728534
1,2015-01-01T10:00:00.000+0000,157.61002119306,435.376873016938,111.886648210168,25.9905109982024,,,,model3,18,163.48549493994477,427.90588076942447,97.95840993191425,36.17636872288865,5.545290069168625,75.85510590665875,16.772240129910625,7.84400822693189
1,2015-01-01T11:00:00.000+0000,172.504839196295,430.323362106675,95.9270416939636,35.6550173268837,,,,model3,18,165.8918990147535,434.79984890474447,98.07503900982545,34.2366298837392,7.072684747044087,74.03779138108905,16.7506889137327,6.256302004747778
1,2015-01-01T12:00:00.000+0000,156.556030606329,499.071623068962,111.755684290096,42.7539196974773,,,,model3,18,162.283431064944,427.73029830891227,107.2044838676829,36.38039802774725,7.283617351762186,62.74347846728939,7.615664616430348,7.562668484654668
1,2015-01-01T13:00:00.000+0000,172.522780814836,409.624717000438,101.00108276407,35.4820086610704,,,,model3,18,164.79841795263,443.5991437982533,105.1426142395744,34.970364170908454,8.91935549150083,38.623437396668166,7.985272902780372,6.8787602969052335
1,2015-01-01T14:00:00.000+0000,175.324523915223,398.648780707752,110.624360548654,45.4822868466294,,,,model3,18,169.22704363317075,434.4171207209568,104.8270423241959,39.8433081330152,8.550626960261598,45.05922385409984,7.647722010749782,5.060713176766235
1,2015-01-01T15:00:00.000+0000,169.218423246933,460.850669930244,104.848229967003,39.9017354356787,,,,model3,18,168.40543964583026,442.048947676849,107.05733939245576,40.90498766021395,8.284436459410381,46.690454043878944,5.0450138258297015,4.273384052152158


Let's now look at the distribution of errors for each machine. This helps us understand the errors a little better. Recall that errors don't break the machine immediately, but can eventually lead to downtime. We also want errors to be relatively evenly distributed across machines; otherwise our model may fail to generalize well.

In [33]:
# count each error type per machine, ignoring rows without an error
df1 = df_roll.select("machineID", "error").filter(col("error").isNotNull()).groupBy("machineID", "error").count().orderBy("machineID", "error")

In [34]:
display(df1)

machineID,error,count
1,error1,11
1,error2,4
1,error3,9
1,error4,5
1,error5,4
2,error1,4
2,error2,7
2,error3,5
2,error4,5
2,error5,3
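
Rather than scanning the table by eye, the coverage check can be made explicit. The sketch below does so in plain Python for the ten rows displayed above (machines 1 and 2 only). Applying it to the full result would require collecting all rows first.

```python
from collections import defaultdict

# (machineID, error, count) rows copied from the output above (machines 1 and 2 only).
rows = [
    (1, "error1", 11), (1, "error2", 4), (1, "error3", 9), (1, "error4", 5), (1, "error5", 4),
    (2, "error1", 4), (2, "error2", 7), (2, "error3", 5), (2, "error4", 5), (2, "error5", 3),
]

all_errors = {error for _, error, _ in rows}

errors_by_machine = defaultdict(set)
for machine_id, error, _ in rows:
    errors_by_machine[machine_id].add(error)

# Machines that never logged at least one of the error types.
missing = {
    machine_id: sorted(all_errors - seen)
    for machine_id, seen in errors_by_machine.items()
    if seen != all_errors
}
print(missing)  # {} means every listed machine saw every error type
```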


Good news: all error types are distributed across all machines.

## Feature engineering for error, failure and maintenance data

It's time for us to do some feature engineering with the error, failure and maintenance history. At a high level, our feature engineering will consist of three important steps:

- Create one-hot-encoded features (i.e. "dummy variables") from `error`, `fail` and `maint`. This is an intermediate step because we will not use these features for modeling. Instead we use them to compute the following set of features.
- Create features that measure for any timestamp how many hours have elapsed since the last time an error or a failure happened, and how many hours have elapsed since the last time maintenance was performed.
- Use `fail` to create a target variable.

Let's first take a look at how `error`, `fail` and `maint` break down:

In [36]:
df_roll.groupBy("error").count().sort("count").show()
df_roll.groupBy("fail").count().sort("count").show()
df_roll.groupBy("maint").count().sort("count").show()

As we can see, failure and maintenance are recorded component by component. So for any machine, a given component can fail at a given time and eventually be replaced at some other time. On the other hand, error is NOT component by component but concerns the machine as a whole. So we know that a machine at a given time can experience one of 5 kinds of errors.

To get to our features, we first need to run one-hot encoding on the error, failure and maintenance columns. Prior to that we need to replace any missing values, which we encode here using the string "zzzzz" so that the missing-value category sorts last alphabetically.

In [38]:
# we fill NAs for errors prior to one-hot-encoding, and use 'zzzzz' so we can sort alphabetically
df_roll = df_roll.fillna("zzzzz", subset = ["error", "fail", "maint"]) \
                 .drop("volt", "rotate","pressure","vibration")

We can now create a pipeline that will do one-hot encoding on the above features. Since they are of `string` type, we begin by using `StringIndexer` to index the features. We then pass the results to `OneHotEncoderEstimator`, which runs one-hot encoding on the indexed features and stores the results as **sparse vectors** in columns of `vector` type.

In [40]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator
stages = []
cat_cols = ["error", "fail", "maint"]

for cat_col in cat_cols:
  indexer = StringIndexer(inputCol = cat_col, outputCol = cat_col + "_index", stringOrderType = "alphabetAsc")
  encoder = OneHotEncoderEstimator(inputCols = [indexer.getOutputCol()], outputCols = [cat_col + "_vec"], dropLast = True)
  stages += [indexer, encoder]
  
pipeline = Pipeline(stages=stages)
df_encoded = pipeline.fit(df_roll).transform(df_roll)
df_encoded = df_encoded.drop("error").drop("fail").drop("maint")
display(df_encoded)

machineID,datetime,model,age,volt_ma_3,rotate_ma_3,pressure_ma_3,vibration_ma_3,volt_sd_3,rotate_sd_3,pressure_sd_3,vibration_sd_3,error_index,error_vec,fail_index,fail_vec,maint_index,maint_vec
31,2015-01-01T06:00:00.000+0000,model1,11,174.475534906489,483.60866481675,107.143516076057,31.6301315950459,,,,,5.0,"List(0, 5, List(), List())",4.0,"List(0, 4, List(), List())",4.0,"List(0, 4, List(), List())"
31,2015-01-01T07:00:00.000+0000,model1,11,174.454327382408,470.894921134,101.27896231174824,37.6822063090014,0.0299919681797135,17.9799487446803,8.293731470751615,8.558926140971138,5.0,"List(0, 5, List(), List())",4.0,"List(0, 4, List(), List())",4.0,"List(0, 4, List(), List())"
31,2015-01-01T08:00:00.000+0000,model1,11,169.0217324941,492.8754784185956,98.71704481096768,41.876980734419966,9.409554262582695,40.138185983609645,7.354131811225906,9.456003689917669,5.0,"List(0, 5, List(), List())",4.0,"List(0, 4, List(), List())",4.0,"List(0, 4, List(), List())"
31,2015-01-01T09:00:00.000+0000,model1,11,171.14787696081373,491.21770319704274,99.69031701896976,44.08921516250103,8.781140897040395,32.93997860924226,6.312253028414933,8.898685025420304,5.0,"List(0, 5, List(), List())",4.0,"List(0, 4, List(), List())",4.0,"List(0, 4, List(), List())"
31,2015-01-01T10:00:00.000+0000,model1,11,169.954787423027,487.661865549241,97.95456294820826,47.98673762432493,8.497946949324412,34.752968170368845,4.171025309037839,3.2337541221105783,5.0,"List(0, 5, List(), List())",4.0,"List(0, 4, List(), List())",4.0,"List(0, 4, List(), List())"
31,2015-01-01T11:00:00.000+0000,model1,11,179.40239854790374,470.40056866276046,98.04344404956107,47.689153233518006,23.282396249501677,61.29092948065508,4.10207186764863,3.7664259123574335,5.0,"List(0, 5, List(), List())",4.0,"List(0, 4, List(), List())",4.0,"List(0, 4, List(), List())"
31,2015-01-01T12:00:00.000+0000,model1,11,182.06919814906976,452.5768724519912,99.88796209255094,44.00059211415587,20.47994342334167,43.23910170593667,2.92344794954377,6.576959997525865,5.0,"List(0, 5, List(), List())",4.0,"List(0, 4, List(), List())",4.0,"List(0, 4, List(), List())"
31,2015-01-01T13:00:00.000+0000,model1,11,178.11043905951652,454.03031597803,96.951094473378,41.206868232054696,23.023315116141376,44.81696205822597,4.661354626050784,4.936877814234024,5.0,"List(0, 5, List(), List())",4.0,"List(0, 4, List(), List())",4.0,"List(0, 4, List(), List())"
31,2015-01-01T14:00:00.000+0000,model1,11,177.50046909330877,454.8516093037292,102.21110777295176,40.328539701053,23.350298336110928,45.22043121466925,13.340825578720503,3.6570581175676895,5.0,"List(0, 5, List(), List())",4.0,"List(0, 4, List(), List())",4.0,"List(0, 4, List(), List())"
31,2015-01-01T15:00:00.000+0000,model1,11,164.594565260634,467.7073543571013,100.02749165845351,39.47835922882047,4.057672401800658,21.286289610664905,15.315258823139631,3.3530066179621087,5.0,"List(0, 5, List(), List())",4.0,"List(0, 4, List(), List())",4.0,"List(0, 4, List(), List())"
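
The `error_index` of 5.0 and the empty sparse vectors in the rows above follow from two choices in the pipeline: alphabetical indexing puts the `zzzzz` placeholder last, and `dropLast = True` encodes the last category as an all-zero vector. The following plain-Python sketch mimics that logic for the `error` column. It is an illustration of the idea, not Spark's implementation, and the function name `one_hot_drop_last` is made up for this example.

```python
# Error labels seen in the data, plus the "zzzzz" placeholder for "no error".
labels = ["error3", "zzzzz", "error1", "error5", "error2", "error4"]

# Alphabetical indexing, analogous to stringOrderType="alphabetAsc".
index_of = {label: i for i, label in enumerate(sorted(labels))}

def one_hot_drop_last(label):
    """Dense one-hot vector with the last category dropped (all zeros for it)."""
    size = len(index_of) - 1
    vector = [0] * size
    position = index_of[label]
    if position < size:
        vector[position] = 1
    return vector

print(index_of["zzzzz"])            # 5
print(one_hot_drop_last("zzzzz"))   # [0, 0, 0, 0, 0]
print(one_hot_drop_last("error1"))  # [1, 0, 0, 0, 0]
```

Note that indices start at 0, so under this scheme `error1` maps to index 0 and `error5` to index 4.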


In [41]:
df_encoded.printSchema()

### Time since last error, failure, maintenance

In order to calculate the time elapsed since the last error, failure and maintenance event, we first need to take our one-hot-encoded sparse vector features and break them up into individual binary features for each event.

In [43]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType
cat_cols = ["error", "fail", "maint"]

for cc in cat_cols:
  indexes = df_encoded.select(cc + '_index').distinct().rdd.flatMap(lambda x : x).collect()
  indexes.sort()
  print(indexes)
  for index in indexes[:-1]: # we remove the last index
      function = udf(lambda item: 1 if item == index else 0, IntegerType())
      new_column_name = cc + '_' + str(int(index))
      df_encoded = df_encoded.withColumn(new_column_name, function(col(cc + '_index')))

We can now drop the original sparse vector features.

In [45]:
cols_drop = cat_cols + [c + "_vec" for c in cat_cols]
# use `c` as the loop variable so the imported `col` function is not shadowed
df_encoded = df_encoded.select([c for c in df_encoded.columns if c not in cols_drop])
display(df_encoded)

machineID,datetime,model,age,volt_ma_3,rotate_ma_3,pressure_ma_3,vibration_ma_3,volt_sd_3,rotate_sd_3,pressure_sd_3,vibration_sd_3,error_index,fail_index,maint_index,error_0,error_1,error_2,error_3,error_4,fail_0,fail_1,fail_2,fail_3,maint_0,maint_1,maint_2,maint_3
31,2015-01-01T06:00:00.000+0000,model1,11,174.475534906489,483.60866481675,107.143516076057,31.6301315950459,,,,,5.0,4.0,4.0,0,0,0,0,0,0,0,0,0,0,0,0,0
31,2015-01-01T07:00:00.000+0000,model1,11,174.454327382408,470.894921134,101.27896231174824,37.6822063090014,0.0299919681797135,17.9799487446803,8.293731470751615,8.558926140971138,5.0,4.0,4.0,0,0,0,0,0,0,0,0,0,0,0,0,0
31,2015-01-01T08:00:00.000+0000,model1,11,169.0217324941,492.8754784185956,98.71704481096768,41.876980734419966,9.409554262582695,40.138185983609645,7.354131811225906,9.456003689917669,5.0,4.0,4.0,0,0,0,0,0,0,0,0,0,0,0,0,0
31,2015-01-01T09:00:00.000+0000,model1,11,171.14787696081373,491.21770319704274,99.69031701896976,44.08921516250103,8.781140897040395,32.93997860924226,6.312253028414933,8.898685025420304,5.0,4.0,4.0,0,0,0,0,0,0,0,0,0,0,0,0,0
31,2015-01-01T10:00:00.000+0000,model1,11,169.954787423027,487.661865549241,97.95456294820826,47.98673762432493,8.497946949324412,34.752968170368845,4.171025309037839,3.2337541221105783,5.0,4.0,4.0,0,0,0,0,0,0,0,0,0,0,0,0,0
31,2015-01-01T11:00:00.000+0000,model1,11,179.40239854790374,470.40056866276046,98.04344404956107,47.689153233518006,23.282396249501677,61.29092948065508,4.10207186764863,3.7664259123574335,5.0,4.0,4.0,0,0,0,0,0,0,0,0,0,0,0,0,0
31,2015-01-01T12:00:00.000+0000,model1,11,182.06919814906976,452.5768724519912,99.88796209255094,44.00059211415587,20.47994342334167,43.23910170593667,2.92344794954377,6.576959997525865,5.0,4.0,4.0,0,0,0,0,0,0,0,0,0,0,0,0,0
31,2015-01-01T13:00:00.000+0000,model1,11,178.11043905951652,454.03031597803,96.951094473378,41.206868232054696,23.023315116141376,44.81696205822597,4.661354626050784,4.936877814234024,5.0,4.0,4.0,0,0,0,0,0,0,0,0,0,0,0,0,0
31,2015-01-01T14:00:00.000+0000,model1,11,177.50046909330877,454.8516093037292,102.21110777295176,40.328539701053,23.350298336110928,45.22043121466925,13.340825578720503,3.6570581175676895,5.0,4.0,4.0,0,0,0,0,0,0,0,0,0,0,0,0,0
31,2015-01-01T15:00:00.000+0000,model1,11,164.594565260634,467.7073543571013,100.02749165845351,39.47835922882047,4.057672401800658,21.286289610664905,15.315258823139631,3.3530066179621087,5.0,4.0,4.0,0,0,0,0,0,0,0,0,0,0,0,0,0


Let's take a look at an example of one of the one-hot-encoded features we created: `error_4` is a binary feature that is set to 1 for any timestamp at which the machine logged the error category with index 4 (and 0 otherwise). Because indexing is alphabetical and starts at 0, index 4 should correspond to the label `error5`. We can see in the result below how many such errors occurred in the overall data (across all machines).

In [47]:
df_encoded.groupBy("error_4").count().show()

We now use the binary features we created in the previous step to create a new set of features. If `error_4` is a binary feature that represents an occurrence of the error with index 4 for a given machine and timestamp, we can use it to create a corresponding feature called `diff_error_4`, which measures the number of hours elapsed since the last time that error occurred.

In [49]:
from pyspark.sql.functions import avg, lag, when, lit, sum, row_number
from pyspark.sql.window import Window

w = Window.partitionBy(col("machineID")).orderBy(col("datetime"))

# first compute a lagged feature for datetime and convert both datetime and the lagged datetime to numbers
df_diff = df_encoded.withColumn('datetime_lag', lag(df_encoded['datetime']).over(w)) \
                    .withColumn('dt_long', col('datetime').cast('long')) \
                    .withColumn('dt_long_lag', col('datetime_lag').cast('long')) \
                    .drop('datetime_lag')

cat_vars = ["error", "fail", "maint"]

for cat_var in cat_vars:
  # find how many categories there are in each categorical feature
  indexes = df_encoded.select(cat_var + '_index').distinct().rdd.flatMap(lambda x : x).collect()
  indexes.sort()
  for index in indexes[:-1]: # we remove the last category (these are the non-events)
    cat_num = str(int(index))
    cat_col = cat_var + '_' + cat_num
    dif_col = 'd_' + cat_var + '_' + cat_num
    cum_col = 'diff_' + cat_var + '_' + cat_num
    print(cum_col)

    # compute an intermediate feature used for grouping results
    find_cum_grp = sum(col(cat_col)).over(w)
    df_diff = df_diff.withColumn('cum_grp', find_cum_grp).cache()

    # compute the number of hours elapsed since the last event (error, failure, or maintenance)
    ww = Window.partitionBy(col("machineID"), col("cum_grp")).orderBy(col("datetime"))
    find_lag_diff = when(col(cat_col) == 1, lit(0)).otherwise((col('dt_long') - col('dt_long_lag')) / (60*60))
    # for the first occurrence, we don't know when the last event was, so for now we just assume 100 hours
    df_diff = df_diff.withColumn(dif_col, find_lag_diff) \
                     .fillna(100, subset = [dif_col]).cache()
    
    # reset the time elapsed back to 0 every time an event occurs (otherwise compute cumulative sum)
    find_cum_diff = when(col(cat_col) == 1, lit(0)).otherwise(sum(col(dif_col)).over(ww))
    df_diff = df_diff.withColumn(cum_col, find_cum_diff) \
                     .drop('cum_grp').drop(dif_col).drop(cat_col).cache() # without cache() here, an out-of-memory error occurs
  
  df_diff = df_diff.drop(cat_var)


df_diff = df_diff.drop('dt_long').drop('dt_long_lag').orderBy('machineID', 'datetime')
display(df_diff)

machineID,datetime,model,age,volt_ma_3,rotate_ma_3,pressure_ma_3,vibration_ma_3,volt_sd_3,rotate_sd_3,pressure_sd_3,vibration_sd_3,error_index,fail_index,maint_index,diff_error_0,diff_error_1,diff_error_2,diff_error_3,diff_error_4,diff_fail_0,diff_fail_1,diff_fail_2,diff_fail_3,diff_maint_0,diff_maint_1,diff_maint_2,diff_maint_3
1,2015-01-01T06:00:00.000+0000,model3,18,176.217853015625,418.504078221616,113.077935462083,45.0876857639276,,,,,5.0,4.0,4.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0
1,2015-01-01T07:00:00.000+0000,model3,18,169.5485379563425,410.6257838935055,104.26923042220083,44.25082922370455,9.431835808576428,11.141590687180878,12.457390134345571,1.183493868944067,5.0,4.0,4.0,101.0,101.0,101.0,101.0,101.0,101.0,101.0,101.0,101.0,101.0,101.0,101.0,101.0
1,2015-01-01T08:00:00.000+0000,model3,18,170.02899277275068,449.533797746434,94.59212190102262,40.893501856284736,6.721032201366223,67.84959946425158,18.93495636307687,5.874969789283372,5.0,4.0,4.0,102.0,102.0,102.0,102.0,102.0,102.0,102.0,102.0,102.0,102.0,102.0,102.0,102.0
1,2015-01-01T09:00:00.000+0000,model3,18,168.13745289558602,423.687682070594,98.25623174489296,40.95066241431995,6.665323822752415,75.77025942779387,17.10919398372915,4.798254821728534,5.0,4.0,4.0,103.0,103.0,103.0,103.0,103.0,103.0,103.0,103.0,103.0,103.0,103.0,103.0,103.0
1,2015-01-01T10:00:00.000+0000,model3,18,163.48549493994477,427.90588076942447,97.95840993191425,36.17636872288865,5.545290069168625,75.85510590665875,16.772240129910625,7.84400822693189,5.0,4.0,4.0,104.0,104.0,104.0,104.0,104.0,104.0,104.0,104.0,104.0,104.0,104.0,104.0,104.0
1,2015-01-01T11:00:00.000+0000,model3,18,165.8918990147535,434.79984890474447,98.07503900982545,34.2366298837392,7.072684747044087,74.03779138108905,16.7506889137327,6.256302004747778,5.0,4.0,4.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0
1,2015-01-01T12:00:00.000+0000,model3,18,162.283431064944,427.73029830891227,107.2044838676829,36.38039802774725,7.283617351762186,62.74347846728939,7.615664616430348,7.562668484654668,5.0,4.0,4.0,106.0,106.0,106.0,106.0,106.0,106.0,106.0,106.0,106.0,106.0,106.0,106.0,106.0
1,2015-01-01T13:00:00.000+0000,model3,18,164.79841795263,443.5991437982533,105.1426142395744,34.970364170908454,8.91935549150083,38.623437396668166,7.985272902780372,6.8787602969052335,5.0,4.0,4.0,107.0,107.0,107.0,107.0,107.0,107.0,107.0,107.0,107.0,107.0,107.0,107.0,107.0
1,2015-01-01T14:00:00.000+0000,model3,18,169.22704363317075,434.4171207209568,104.8270423241959,39.8433081330152,8.550626960261598,45.05922385409984,7.647722010749782,5.060713176766235,5.0,4.0,4.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0
1,2015-01-01T15:00:00.000+0000,model3,18,168.40543964583026,442.048947676849,107.05733939245576,40.90498766021395,8.284436459410381,46.690454043878944,5.0450138258297015,4.273384052152158,5.0,4.0,4.0,109.0,109.0,109.0,109.0,109.0,109.0,109.0,109.0,109.0,109.0,109.0,109.0,109.0


We can visualize our new features to get a better sense of what they look like. In this example, we look at the maintenance history. Recall that there are 4 components to each machine and maintenance happens component by component. In the plot below, components are represented by colors. As we move along the x-axis, we generally see a linear increase that represents time elapsing until there's a sharp drop back to 0 whenever maintenance is performed, at which point we reset the timer.

In [51]:
display(df_diff.filter(col('machineID') == 1))

machineID,datetime,model,age,volt_ma_3,rotate_ma_3,pressure_ma_3,vibration_ma_3,volt_sd_3,rotate_sd_3,pressure_sd_3,vibration_sd_3,error_index,fail_index,maint_index,diff_error_0,diff_error_1,diff_error_2,diff_error_3,diff_error_4,diff_fail_0,diff_fail_1,diff_fail_2,diff_fail_3,diff_maint_0,diff_maint_1,diff_maint_2,diff_maint_3
1,2015-01-01T06:00:00.000+0000,model3,18,176.217853015625,418.504078221616,113.077935462083,45.0876857639276,,,,,5.0,4.0,4.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0
1,2015-01-01T07:00:00.000+0000,model3,18,169.5485379563425,410.6257838935055,104.26923042220083,44.25082922370455,9.431835808576428,11.141590687180878,12.457390134345571,1.183493868944067,5.0,4.0,4.0,101.0,101.0,101.0,101.0,101.0,101.0,101.0,101.0,101.0,101.0,101.0,101.0,101.0
1,2015-01-01T08:00:00.000+0000,model3,18,170.02899277275068,449.533797746434,94.59212190102262,40.893501856284736,6.721032201366223,67.84959946425158,18.93495636307687,5.874969789283372,5.0,4.0,4.0,102.0,102.0,102.0,102.0,102.0,102.0,102.0,102.0,102.0,102.0,102.0,102.0,102.0
1,2015-01-01T09:00:00.000+0000,model3,18,168.13745289558602,423.687682070594,98.25623174489296,40.95066241431995,6.665323822752415,75.77025942779387,17.10919398372915,4.798254821728534,5.0,4.0,4.0,103.0,103.0,103.0,103.0,103.0,103.0,103.0,103.0,103.0,103.0,103.0,103.0,103.0
1,2015-01-01T10:00:00.000+0000,model3,18,163.48549493994477,427.90588076942447,97.95840993191425,36.17636872288865,5.545290069168625,75.85510590665875,16.772240129910625,7.84400822693189,5.0,4.0,4.0,104.0,104.0,104.0,104.0,104.0,104.0,104.0,104.0,104.0,104.0,104.0,104.0,104.0
1,2015-01-01T11:00:00.000+0000,model3,18,165.8918990147535,434.79984890474447,98.07503900982545,34.2366298837392,7.072684747044087,74.03779138108905,16.7506889137327,6.256302004747778,5.0,4.0,4.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0
1,2015-01-01T12:00:00.000+0000,model3,18,162.283431064944,427.73029830891227,107.2044838676829,36.38039802774725,7.283617351762186,62.74347846728939,7.615664616430348,7.562668484654668,5.0,4.0,4.0,106.0,106.0,106.0,106.0,106.0,106.0,106.0,106.0,106.0,106.0,106.0,106.0,106.0
1,2015-01-01T13:00:00.000+0000,model3,18,164.79841795263,443.5991437982533,105.1426142395744,34.970364170908454,8.91935549150083,38.623437396668166,7.985272902780372,6.8787602969052335,5.0,4.0,4.0,107.0,107.0,107.0,107.0,107.0,107.0,107.0,107.0,107.0,107.0,107.0,107.0,107.0
1,2015-01-01T14:00:00.000+0000,model3,18,169.22704363317075,434.4171207209568,104.8270423241959,39.8433081330152,8.550626960261598,45.05922385409984,7.647722010749782,5.060713176766235,5.0,4.0,4.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0
1,2015-01-01T15:00:00.000+0000,model3,18,168.40543964583026,442.048947676849,107.05733939245576,40.90498766021395,8.284436459410381,46.690454043878944,5.0450138258297015,4.273384052152158,5.0,4.0,4.0,109.0,109.0,109.0,109.0,109.0,109.0,109.0,109.0,109.0,109.0,109.0,109.0,109.0

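The sawtooth pattern described above (a counter that grows by one per hour and drops back to 0 at each maintenance event) can be reproduced on a toy example. This is a minimal pandas sketch, not the code that built `df_diff`; the column names `maint` and `hours_since_maint` and the data are hypothetical.

```python
import pandas as pd

# Toy hourly timeline for one machine and one component (hypothetical data).
# maint == 1 marks an hour in which the component was replaced.
toy = pd.DataFrame({"maint": [0, 0, 0, 1, 0, 0, 1, 0]})

# Each maintenance event starts a new segment: the cumulative sum of the flag
# gives every row the id of the segment it belongs to.
segment = toy["maint"].cumsum()

# Counting rows within each segment yields a timer that resets to 0
# on the row where maintenance happens.
toy["hours_since_maint"] = toy.groupby(segment).cumcount()

print(toy)
```

Plotting `hours_since_maint` against the row index would give the linear ramps and sharp drops that the paragraph describes.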

## Target Labels - lag features

The last step in data prep is to create targets for the PdM model. You might wonder why we don't just use `fail_0` through `fail_3` as our targets, since they indicate when a component (and hence the whole machine) failed. We could, but PdM is not about predicting when a machine fails; it is about predicting when it is **about to** fail. So it's better to create labels that indicate the state of the machine shortly **prior to failure**. How far back to go is something we need to determine use case by use case.

One way to pick the window size prior to failure is to base it on how long it takes to fix failures after they happen. For example, if we expect a 3-day downtime, then we flag everything up to 3 days prior to failure.

In [53]:
import pandas as pd
from pyspark.sql.functions import col, lit, when

df_all = df_diff.cache()

for i in range(4): # iterate over the four components
    # find all the times a component failed for a given machine
    df_temp = df_diff.filter(col('fail_index') == i).select('machineID', 'datetime').toPandas()
    label = 'y_' + str(i) # name of target column (one per component)
    # create a new target column for each component and set to 0 by default
    df_all = df_all.withColumn(label, lit(0))
    for n in range(df_temp.shape[0]): # iterate over all the failure times
        machineID, datetime = df_temp.iloc[n, :]
        dt_end = datetime - pd.Timedelta('10 minutes') # from 10 minutes prior to failure
        dt_start = datetime - pd.Timedelta('3 days') # up to 3 days prior to failure
        if n % 500 == 0:
            print("failure on machine {0} at {1}, so {2} is between {4} and {3}".format(machineID, datetime, label, dt_end, dt_start))
        # every time a failure happens, set the target feature to 1 within the provided window
        find_labels = when((col('machineID') == int(machineID)) & (col('datetime').between(dt_start, dt_end)), lit(1)).otherwise(col(label))
        df_all = df_all.withColumn(label, find_labels)
        
display(df_all)

machineID,datetime,model,age,volt_ma_3,rotate_ma_3,pressure_ma_3,vibration_ma_3,volt_sd_3,rotate_sd_3,pressure_sd_3,vibration_sd_3,error_index,fail_index,maint_index,diff_error_0,diff_error_1,diff_error_2,diff_error_3,diff_error_4,diff_fail_0,diff_fail_1,diff_fail_2,diff_fail_3,diff_maint_0,diff_maint_1,diff_maint_2,diff_maint_3,y_0,y_1,y_2,y_3
1,2015-01-01T06:00:00.000+0000,model3,18,176.217853015625,418.504078221616,113.077935462083,45.0876857639276,,,,,5.0,4.0,4.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,0,0,0,0
1,2015-01-01T07:00:00.000+0000,model3,18,169.5485379563425,410.6257838935055,104.26923042220083,44.25082922370455,9.431835808576428,11.141590687180878,12.457390134345571,1.183493868944067,5.0,4.0,4.0,101.0,101.0,101.0,101.0,101.0,101.0,101.0,101.0,101.0,101.0,101.0,101.0,101.0,0,0,0,0
1,2015-01-01T08:00:00.000+0000,model3,18,170.02899277275068,449.533797746434,94.59212190102262,40.893501856284736,6.721032201366223,67.84959946425158,18.93495636307687,5.874969789283372,5.0,4.0,4.0,102.0,102.0,102.0,102.0,102.0,102.0,102.0,102.0,102.0,102.0,102.0,102.0,102.0,0,0,0,0
1,2015-01-01T09:00:00.000+0000,model3,18,168.13745289558602,423.687682070594,98.25623174489296,40.95066241431995,6.665323822752415,75.77025942779387,17.10919398372915,4.798254821728534,5.0,4.0,4.0,103.0,103.0,103.0,103.0,103.0,103.0,103.0,103.0,103.0,103.0,103.0,103.0,103.0,0,0,0,0
1,2015-01-01T10:00:00.000+0000,model3,18,163.48549493994477,427.90588076942447,97.95840993191425,36.17636872288865,5.545290069168625,75.85510590665875,16.772240129910625,7.84400822693189,5.0,4.0,4.0,104.0,104.0,104.0,104.0,104.0,104.0,104.0,104.0,104.0,104.0,104.0,104.0,104.0,0,0,0,0
1,2015-01-01T11:00:00.000+0000,model3,18,165.8918990147535,434.79984890474447,98.07503900982545,34.2366298837392,7.072684747044087,74.03779138108905,16.7506889137327,6.256302004747778,5.0,4.0,4.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,0,0,0,0
1,2015-01-01T12:00:00.000+0000,model3,18,162.283431064944,427.73029830891227,107.2044838676829,36.38039802774725,7.283617351762186,62.74347846728939,7.615664616430348,7.562668484654668,5.0,4.0,4.0,106.0,106.0,106.0,106.0,106.0,106.0,106.0,106.0,106.0,106.0,106.0,106.0,106.0,0,0,0,0
1,2015-01-01T13:00:00.000+0000,model3,18,164.79841795263,443.5991437982533,105.1426142395744,34.970364170908454,8.91935549150083,38.623437396668166,7.985272902780372,6.8787602969052335,5.0,4.0,4.0,107.0,107.0,107.0,107.0,107.0,107.0,107.0,107.0,107.0,107.0,107.0,107.0,107.0,0,0,0,0
1,2015-01-01T14:00:00.000+0000,model3,18,169.22704363317075,434.4171207209568,104.8270423241959,39.8433081330152,8.550626960261598,45.05922385409984,7.647722010749782,5.060713176766235,5.0,4.0,4.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,0,0,0,0
1,2015-01-01T15:00:00.000+0000,model3,18,168.40543964583026,442.048947676849,107.05733939245576,40.90498766021395,8.284436459410381,46.690454043878944,5.0450138258297015,4.273384052152158,5.0,4.0,4.0,109.0,109.0,109.0,109.0,109.0,109.0,109.0,109.0,109.0,109.0,109.0,109.0,109.0,0,0,0,0

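To check the windowing rule in isolation, here is a small pandas sketch for a single, assumed failure on an hourly timeline. The timestamps are hypothetical; the window bounds mirror the `dt_start` and `dt_end` definitions in the cell above, and `Series.between` is inclusive on both ends by default, as is Spark's `Column.between`.

```python
import pandas as pd

# Hypothetical hourly timeline for one machine: 5 days of readings.
start = pd.Timestamp("2015-01-01 00:00")
times = pd.Series([start + pd.Timedelta(hours=h) for h in range(120)])

# Assumed failure timestamp (row 96 of the timeline).
failure_time = pd.Timestamp("2015-01-05 00:00")
dt_end = failure_time - pd.Timedelta("10 minutes")  # stop just before the failure
dt_start = failure_time - pd.Timedelta("3 days")    # go back 3 days

# Flag every reading that falls inside [dt_start, dt_end].
y = times.between(dt_start, dt_end).astype(int)
n_flagged = int(y.sum())

print(n_flagged)
```

With hourly data, an isolated failure flags the 72 readings that precede it, and the reading at the failure time itself stays at 0 because `dt_end` stops 10 minutes short of it.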

As one example, we can tabulate `y_0` by machine. Notice how rare the positive labels (`y_0 = 1`) are compared with the negatives.

In [55]:
display(df_all.crosstab("machineID", "y_0"))

machineID_y_0,0,1
69,8761,0
88,8617,144
5,8473,288
10,8689,72
56,8545,216
42,8617,144
24,8593,168
37,8689,72
25,8689,72
52,8761,0

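To put a number on that rarity, the counts from the crosstab can be turned into positive rates. The sketch below copies the ten rows shown above into pandas; the column names `n_neg`, `n_pos` and `pos_rate` are made up for this illustration, and the overall figure covers only these ten machines.

```python
import pandas as pd

# Counts copied from the crosstab output above: (machineID, y_0 == 0, y_0 == 1).
rows = [(69, 8761, 0), (88, 8617, 144), (5, 8473, 288), (10, 8689, 72),
        (56, 8545, 216), (42, 8617, 144), (24, 8593, 168), (37, 8689, 72),
        (25, 8689, 72), (52, 8761, 0)]
ct = pd.DataFrame(rows, columns=["machineID", "n_neg", "n_pos"])

# Share of positive labels per machine.
ct["pos_rate"] = ct["n_pos"] / (ct["n_neg"] + ct["n_pos"])

# Share of positive labels over the ten machines combined.
overall = ct["n_pos"].sum() / (ct["n_neg"] + ct["n_pos"]).sum()

print(ct.sort_values("pos_rate", ascending=False))
print(round(overall, 4))
```

For these rows the positive class stays below 4% on every machine, which is worth keeping in mind when choosing evaluation metrics later.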

The difficult part of our data prep is over. Now we just have some housekeeping to do before we hand the data over to the machine learning algorithms. Let's begin by checking whether any missing data are present.

In [57]:
from pyspark.sql.functions import col, sum
# count nulls per column; note that isNull() does not flag NaN values, so NaNs are not counted here
display(df_all.select(*(sum((col(c).isNull()).cast("int")).alias(c) for c in df_all.columns)))

machineID,datetime,model,age,volt_ma_3,rotate_ma_3,pressure_ma_3,vibration_ma_3,volt_sd_3,rotate_sd_3,pressure_sd_3,vibration_sd_3,error_index,fail_index,maint_index,diff_error_0,diff_error_1,diff_error_2,diff_error_3,diff_error_4,diff_fail_0,diff_fail_1,diff_fail_2,diff_fail_3,diff_maint_0,diff_maint_1,diff_maint_2,diff_maint_3,y_0,y_1,y_2,y_3
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [58]:
from pyspark.ml.feature import Imputer
impute_cols = [c for c in df_all.columns if c.endswith('_sd_3')]
print(impute_cols)
imputer = Imputer(inputCols = impute_cols, outputCols = impute_cols)
model = imputer.fit(df_all)
# model.surrogateDF.show()
df_all = model.transform(df_all)
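
Spark's `Imputer` uses the column mean as its default strategy and treats both null and NaN entries as missing. The effect on one `_sd_3` column can be sketched in pandas as follows; the values are made up for illustration, and this is a stand-in for the idea rather than the Spark implementation.

```python
import numpy as np
import pandas as pd

# Toy stand-in for one `_sd_3` column: the first row has no rolling
# standard deviation yet, so it is missing (hypothetical values).
pdf = pd.DataFrame({"volt_sd_3": [np.nan, 9.0, 7.0, 5.0]})

impute_cols = [c for c in pdf.columns if c.endswith("_sd_3")]

# Replace missing entries with the column mean computed over the
# non-missing rows (here (9 + 7 + 5) / 3 = 7).
pdf[impute_cols] = pdf[impute_cols].fillna(pdf[impute_cols].mean())

print(pdf)
```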

In [59]:
X_drop = ['error_index', 'fail_index', 'maint_index', 'f_1', 'f_2', 'f_3', 'f_4', 'y_0', 'y_1', 'y_2', 'y_3', 'model']
Y_keep = ['y_0', 'y_1', 'y_2', 'y_3']

keys = ['machineID', 'datetime']
X_keep = list(set(df_all.columns) - set(X_drop + Y_keep + keys))

df = df_all.select(keys + sorted(X_keep + Y_keep))

display(df)

machineID,datetime,age,diff_error_0,diff_error_1,diff_error_2,diff_error_3,diff_error_4,diff_fail_0,diff_fail_1,diff_fail_2,diff_fail_3,diff_maint_0,diff_maint_1,diff_maint_2,diff_maint_3,pressure_ma_3,pressure_sd_3,rotate_ma_3,rotate_sd_3,vibration_ma_3,vibration_sd_3,volt_ma_3,volt_sd_3,y_0,y_1,y_2,y_3
1,2015-01-01T06:00:00.000+0000,18,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,113.077935462083,9.24388313341626,418.504078221616,46.240595083501134,45.0876857639276,4.618630790388552,176.217853015625,13.83274107709071,0,0,0,0
1,2015-01-01T07:00:00.000+0000,18,101.0,101.0,101.0,101.0,101.0,101.0,101.0,101.0,101.0,101.0,101.0,101.0,101.0,104.26923042220083,12.457390134345571,410.6257838935055,11.141590687180878,44.25082922370455,1.183493868944067,169.5485379563425,9.431835808576428,0,0,0,0
1,2015-01-01T08:00:00.000+0000,18,102.0,102.0,102.0,102.0,102.0,102.0,102.0,102.0,102.0,102.0,102.0,102.0,102.0,94.59212190102262,18.93495636307687,449.533797746434,67.84959946425158,40.893501856284736,5.874969789283372,170.02899277275068,6.721032201366223,0,0,0,0
1,2015-01-01T09:00:00.000+0000,18,103.0,103.0,103.0,103.0,103.0,103.0,103.0,103.0,103.0,103.0,103.0,103.0,103.0,98.25623174489296,17.10919398372915,423.687682070594,75.77025942779387,40.95066241431995,4.798254821728534,168.13745289558602,6.665323822752415,0,0,0,0
1,2015-01-01T10:00:00.000+0000,18,104.0,104.0,104.0,104.0,104.0,104.0,104.0,104.0,104.0,104.0,104.0,104.0,104.0,97.95840993191425,16.772240129910625,427.90588076942447,75.85510590665875,36.17636872288865,7.84400822693189,163.48549493994477,5.545290069168625,0,0,0,0
1,2015-01-01T11:00:00.000+0000,18,105.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,98.07503900982545,16.7506889137327,434.79984890474447,74.03779138108905,34.2366298837392,6.256302004747778,165.8918990147535,7.072684747044087,0,0,0,0
1,2015-01-01T12:00:00.000+0000,18,106.0,106.0,106.0,106.0,106.0,106.0,106.0,106.0,106.0,106.0,106.0,106.0,106.0,107.2044838676829,7.615664616430348,427.73029830891227,62.74347846728939,36.38039802774725,7.562668484654668,162.283431064944,7.283617351762186,0,0,0,0
1,2015-01-01T13:00:00.000+0000,18,107.0,107.0,107.0,107.0,107.0,107.0,107.0,107.0,107.0,107.0,107.0,107.0,107.0,105.1426142395744,7.985272902780372,443.5991437982533,38.623437396668166,34.970364170908454,6.8787602969052335,164.79841795263,8.91935549150083,0,0,0,0
1,2015-01-01T14:00:00.000+0000,18,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,104.8270423241959,7.647722010749782,434.4171207209568,45.05922385409984,39.8433081330152,5.060713176766235,169.22704363317075,8.550626960261598,0,0,0,0
1,2015-01-01T15:00:00.000+0000,18,109.0,109.0,109.0,109.0,109.0,109.0,109.0,109.0,109.0,109.0,109.0,109.0,109.0,107.05733939245576,5.0450138258297015,442.048947676849,46.690454043878944,40.90498766021395,4.273384052152158,168.40543964583026,8.284436459410381,0,0,0,0

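The column order in the output above follows from the selection logic: the set difference is unordered, and sorting features and targets together makes the order deterministic. A plain-Python sketch with a shortened, hypothetical column list (lowercase variable names are used here only to keep the sketch separate from the notebook's variables):

```python
# Shortened, hypothetical column list standing in for df_all.columns.
columns = ['machineID', 'datetime', 'model', 'age', 'volt_ma_3', 'volt_sd_3',
           'error_index', 'fail_index', 'maint_index', 'diff_fail_0', 'y_0', 'y_1']

x_drop = ['error_index', 'fail_index', 'maint_index', 'model']
y_keep = ['y_0', 'y_1']
keys = ['machineID', 'datetime']

# The set difference removes keys, targets and dropped columns;
# the resulting list has no guaranteed order.
x_keep = list(set(columns) - set(x_drop + y_keep + keys))

# Sorting features and targets together gives a reproducible order.
# With the names used here, the `y_` targets sort after all features.
selected = keys + sorted(x_keep + y_keep)

print(selected)
```

Keeping the keys first and the targets last makes it easy to slice features and labels by position downstream, although that relies on the naming convention rather than on anything Spark enforces.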

In [61]:
dbutils.fs.rm("dbfs:/FileStore/tables/preprocessed", recurse = True)

df.write.parquet("dbfs:/FileStore/tables/preprocessed")

In [62]:
# You can ignore this code, we use it for testing our notebooks.
assert len(df.columns) == 28

Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.