![LearnAI Header](https://coursematerial.blob.core.windows.net/assets/LearnAI_header.png)

# Model deployment

In this lab, you will learn how to deploy your machine learning solution as a webservice for real-time scoring.

> Please ensure you have run all previous notebooks in sequence before running this.

Let's begin by loading our Azure ML Workspace.

In [4]:
from azureml.core import Workspace
import azureml.core
import os

# Check core SDK version number
print("SDK version:", azureml.core.VERSION)

config_path = '/dbfs/tmp/'

ws = Workspace.from_config(path=os.path.join(config_path, 'aml_config', 'config.json'))
print('Workspace name: ' + ws.name, 
      'Azure region: ' + ws.location, 
      'Resource group: ' + ws.resource_group, sep = '\n')

Recall that in an earlier Notebook we trained a logistic regression model and saved the pipeline object for it on disk. Let's now load the pipeline object into our current session.

In [6]:
from pyspark.ml import PipelineModel
model_name = "PdM_logistic_regression"
lr_model = PipelineModel.load(model_name)

NOTE: The model is registered (and later deployed) from a local path in the current working directory, so we first copy it there from DBFS.

In [8]:
# the model was saved to DBFS; copy it (recursively, since a saved pipeline is a directory)
# to the driver's local current working directory
print("copy model from dbfs to local")
model_local = "file:" + os.getcwd() + "/" + model_name
dbutils.fs.cp(model_name, model_local, True)

We now register this model to our Azure ML Workspace. Once a model is registered, it is visible from the portal. If a model with the same name is registered, it will automatically be versioned.

In [10]:
# register the model
from azureml.core.model import Model
mymodel = Model.register(model_path = model_name, # local path of the saved model, relative to the current working directory
                       model_name = model_name, # name to register the model under; here we use the same value as the path
                       description = "ADB trained model by an amazing data scientist",
                       workspace = ws)

print(mymodel.name, mymodel.description, mymodel.version)

## Converting your data to and from JSON

The most common way to interact with a webservice is using a [REST](https://en.wikipedia.org/wiki/Representational_state_transfer) API, sending and receiving [JSON](https://en.wikipedia.org/wiki/JSON) data.  

We therefore need to convert our dataframe to JSON to send it to the webservice, and the webservice has to then convert it back into a dataframe so that we can use our pyspark model to score the data.

Very often this is straightforward, because Spark infers the schema of JSON data correctly. However, this is not always the case. Our use case is an example where we need to help Spark by explicitly providing the schema when converting the JSON data back into a DataFrame.

Let's start with an example to illustrate that.

  **Note**: Explicitly providing the schema of data is generally good practice, because it can speed up reading data and avoids surprises. This is true not only when working with Spark, but also, e.g., in *R* or *scikit-learn*.
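To see why type information gets lost, here is a deliberately simplified, hypothetical model of JSON type inference in plain Python. It mimics the general idea behind Spark's JSON reader (whole numbers become `long`, fractional numbers `double`, text `string`) but it is not Spark's implementation. Because JSON has a single number type, a column stored as a 32-bit integer in the training data can only come back as the wider `long`. The sorted column order mirrors the alphabetical column order visible in the reconstructed data further down.

```python
import json

def infer_type(value):
    # Hypothetical, simplified inference rule (not Spark's actual code).
    if isinstance(value, bool):   # check bool first: bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "long"             # JSON cannot say whether this was a 32- or 64-bit integer
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"           # e.g. timestamps arrive as plain text
    return "unknown"

def infer_schema(json_str):
    """Map each field of one JSON record to an inferred type, sorted by column name."""
    record = json.loads(json_str)
    return {name: infer_type(value) for name, value in sorted(record.items())}

# One record shaped like the sample data in this notebook (a subset of its columns).
record = '{"machineID": 68, "datetime": "2015-06-11T00:00:00.000Z", "age": 10, "volt_ma_3": 168.40829768863074, "y_0": 0}'
print(infer_schema(record))
```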

In [12]:
keys = ['machineID', 'datetime']
X_keep = ['diff_maint_1', 'diff_error_1', 'volt_sd_3', 'diff_fail_3', 'pressure_ma_3', 'pressure_sd_3', 'diff_fail_1', 'diff_fail_0', 'age', 'vibration_ma_3', 'rotate_ma_3', 'diff_error_2', 'diff_fail_2', 'diff_error_3', 'diff_maint_2', 'volt_ma_3', 'diff_maint_0', 'vibration_sd_3', 'diff_maint_3', 'rotate_sd_3', 'diff_error_0', 'diff_error_4']
Y_keep = ['y_0', 'y_1', 'y_2', 'y_3']

Let's take a very small sample from the data and use it for testing our deployment. We will pretend that this is new data coming in for which we need to obtain predictions.

In [14]:
df = spark.read.parquet("dbfs:/FileStore/tables/preprocessed").cache()

df_new = df.sample(fraction = 0.00001) # let's pretend this is new data coming in (pass seed=... for a reproducible sample)
display(df_new)

machineID,datetime,age,diff_error_0,diff_error_1,diff_error_2,diff_error_3,diff_error_4,diff_fail_0,diff_fail_1,diff_fail_2,diff_fail_3,diff_maint_0,diff_maint_1,diff_maint_2,diff_maint_3,pressure_ma_3,pressure_sd_3,rotate_ma_3,rotate_sd_3,vibration_ma_3,vibration_sd_3,volt_ma_3,volt_sd_3,y_0,y_1,y_2,y_3
68,2015-06-11T00:00:00.000+0000,10,1100.0,416.0,1002.0,651.0,774.0,3958.0,1674.0,3958.0,3958.0,3834.0,1674.0,954.0,234.0,101.36787575531508,11.663934322580976,448.8813617566855,56.97296052552811,40.15481060046628,8.006951895336918,168.40829768863074,12.103602281291836,0,0,0,0
8,2015-03-23T00:00:00.000+0000,16,1146.0,1843.0,402.0,1129.0,42.0,2038.0,378.0,2038.0,18.0,1458.0,378.0,2038.0,18.0,106.97302399308325,2.962033606220988,439.655044610217,75.2039455934235,40.63333303324509,7.551693348820905,164.63642883764575,7.868736995819589,0,0,0,0
90,2015-02-21T17:00:00.000+0000,2,191.0,1134.0,83.0,497.0,1335.0,1335.0,59.0,1335.0,1335.0,419.0,59.0,1335.0,1335.0,99.20757569871394,10.16646052155249,442.6870849798343,76.8983557007575,34.72684309443692,9.04816005270136,168.88435631214998,8.540623889980933,0,0,0,0
48,2015-09-21T15:00:00.000+0000,10,134.0,5365.0,114.0,145.0,2674.0,321.0,1761.0,6421.0,6421.0,2481.0,321.0,1041.0,2841.0,92.4350711558527,3.061661579095558,461.95082176152505,64.25454416065492,34.20775892481438,5.136393852597367,174.27754872924248,14.49769987622925,0,0,0,0
44,2015-12-22T12:00:00.000+0000,7,2102.0,1454.0,718.0,923.0,1654.0,2790.0,3510.0,8626.0,8626.0,2430.0,270.0,630.0,2790.0,97.06027101219576,7.873252215072386,442.2793943734803,57.61793924427742,40.43819508673082,3.1574629361689515,175.0656889103545,16.989503817984126,0,0,0,0
90,2015-12-30T00:00:00.000+0000,2,760.0,672.0,714.0,1434.0,8806.0,2130.0,690.0,1410.0,8806.0,2130.0,690.0,1050.0,1410.0,92.73861186036191,16.109686427568136,456.5199641724365,30.679369170200037,42.9760341253346,1.9622930363638376,188.8771888444425,17.734110205156426,1,0,0,0
15,2015-12-24T13:00:00.000+0000,14,840.0,978.0,63.0,806.0,1303.0,3439.0,8675.0,8675.0,1279.0,2359.0,199.0,2719.0,1279.0,103.99974832404044,7.832419000976322,459.8489608105112,43.08099178998005,41.448786823157526,7.870970729922127,173.67847480836176,16.503019899383567,0,0,0,0
62,2015-10-09T21:00:00.000+0000,20,234.0,6859.0,1035.0,406.0,3255.0,1431.0,2511.0,6859.0,3231.0,1431.0,2511.0,2151.0,351.0,99.68158799488592,11.204825638525362,461.2238140300205,26.9229047108972,45.4723488587489,2.4661343580114496,183.6003598731925,26.390545550039235,0,0,0,0
23,2015-08-16T18:00:00.000+0000,17,543.0,3126.0,708.0,348.0,2569.0,3204.0,684.0,324.0,3564.0,1044.0,684.0,324.0,2484.0,87.96710674223742,13.558190351863413,480.1268520747572,11.636745448864469,40.74064986005583,6.187781142763204,166.97156410529675,14.697528620778836,0,0,0,0
13,2015-09-15T08:00:00.000+0000,15,5196.0,998.0,497.0,276.0,1994.0,6270.0,890.0,530.0,1970.0,530.0,170.0,1970.0,1250.0,102.6941114250169,10.07416502361119,495.1027615171543,52.39285769838519,35.75264084074375,5.876410127025472,167.985717395481,20.29199416403676,0,0,0,0


Before we proceed, let's make sure that we can run the trained model's pipeline on the data to obtain predictions. We already checked this in previous Notebooks but as a cautionary step, we check it once more.

In [16]:
display(lr_model.transform(df_new).drop("features", "norm_features"))

machineID,datetime,age,diff_error_0,diff_error_1,diff_error_2,diff_error_3,diff_error_4,diff_fail_0,diff_fail_1,diff_fail_2,diff_fail_3,diff_maint_0,diff_maint_1,diff_maint_2,diff_maint_3,pressure_ma_3,pressure_sd_3,rotate_ma_3,rotate_sd_3,vibration_ma_3,vibration_sd_3,volt_ma_3,volt_sd_3,y_0,y_1,y_2,y_3,rawPrediction,probability,prediction
34,2015-01-07T09:00:00.000+0000,10,247.0,247.0,247.0,247.0,247.0,247.0,247.0,247.0,247.0,247.0,147.0,247.0,247.0,103.99020776871572,11.829439310781217,433.9843170202576,89.91995378170073,45.17059553083417,2.3727341754490645,171.00677008930924,20.57081112324662,0,0,0,0,"List(1, 2, List(), List(4.245327312275322, -4.245327312275322))","List(1, 2, List(), List(0.9858714348775504, 0.01412856512244956))",0.0
8,2015-12-07T14:00:00.000+0000,16,5118.0,436.0,448.0,1400.0,822.0,8268.0,5528.0,8268.0,1208.0,1568.0,2648.0,128.0,848.0,104.32530757372224,2.8505601258311,449.311138225812,25.825460569795798,40.3556466827378,8.665288196430602,172.62070348407275,11.209223804760786,0,0,0,0,"List(1, 2, List(), List(4.34346344287238, -4.34346344287238))","List(1, 2, List(), List(0.9871751583219684, 0.01282484167803171))",0.0
73,2015-08-05T08:00:00.000+0000,20,506.0,808.0,3392.0,549.0,2666.0,482.0,5165.0,2282.0,2642.0,122.0,5286.0,842.0,2642.0,98.9153337635842,4.820750051673996,482.309840781884,46.825840454363686,39.3530766721399,2.6453299178837626,172.83360694287325,17.61381612638803,0,0,0,0,"List(1, 2, List(), List(4.235966827641628, -4.235966827641628))","List(1, 2, List(), List(0.9857404584431667, 0.014259541556833443))",0.0
9,2015-05-19T03:00:00.000+0000,7,479.0,2143.0,1387.0,1150.0,927.0,1821.0,3409.0,3409.0,3409.0,21.0,2181.0,1821.0,741.0,127.16780978416053,13.018887062023817,405.4229199955832,44.13352710008984,39.5170714069431,5.361516286464295,169.80185427994627,19.754616192279617,0,0,0,0,"List(1, 2, List(), List(4.3961560366742125, -4.3961560366742125))","List(1, 2, List(), List(0.9878254227227465, 0.012174577277253388))",0.0
9,2015-05-26T08:00:00.000+0000,7,652.0,2316.0,1560.0,1323.0,1100.0,1994.0,3582.0,3582.0,3582.0,194.0,2354.0,1994.0,914.0,104.63893688077472,13.01536623100278,469.7518959301863,30.988395284016832,39.781378607069,4.0238045071580855,162.24751754999926,10.314359026530887,0,0,0,0,"List(1, 2, List(), List(4.538732155516334, -4.538732155516334))","List(1, 2, List(), List(0.9894260558295408, 0.010573944170459293))",0.0
59,2015-04-07T10:00:00.000+0000,17,219.0,2264.0,51.0,1634.0,76.0,2287.0,2408.0,2408.0,52.0,412.0,2408.0,1852.0,52.0,95.47720069462405,12.735617812688796,427.1478250158435,33.33579040975496,42.46550520462797,1.8639014159918104,175.374919059263,13.242717446372938,0,0,0,0,"List(1, 2, List(), List(4.20747284660078, -4.20747284660078))","List(1, 2, List(), List(0.985334347775585, 0.014665652224414977))",0.0
77,2015-03-13T10:00:00.000+0000,12,970.0,164.0,1808.0,377.0,1808.0,1808.0,1808.0,1808.0,1808.0,1808.0,748.0,1468.0,28.0,105.49732227514212,10.889062591105324,461.90450702357657,44.349249784241024,37.55828339443645,6.46614679711183,164.99293466940526,8.724637415355184,0,0,0,0,"List(1, 2, List(), List(4.327581035284983, -4.327581035284983))","List(1, 2, List(), List(0.9869725175669309, 0.013027482433069029))",0.0
20,2015-12-25T08:00:00.000+0000,16,266.0,46.0,986.0,1057.0,1952.0,242.0,962.0,1322.0,3122.0,242.0,2042.0,602.0,2762.0,106.97798273877044,20.65181110969677,441.473730102749,72.96570101247417,48.065215585702326,8.958916750757377,168.83536548823375,8.83797295320501,0,0,0,0,"List(1, 2, List(), List(4.34113648296126, -4.34113648296126))","List(1, 2, List(), List(0.9871456647381766, 0.012854335261823419))",0.0
29,2015-02-08T07:00:00.000+0000,3,83.0,347.0,1013.0,299.0,78.0,1013.0,1013.0,1013.0,1013.0,1013.0,889.0,169.0,1013.0,106.15520708847868,10.201045962507989,402.9772347435328,68.04544744655568,37.524402785177,7.722355287746372,159.984346055672,17.287060587185003,0,0,0,0,"List(1, 2, List(), List(4.422188454502734, -4.422188454502734))","List(1, 2, List(), List(0.98813455466243, 0.011865445337569927))",0.0
29,2015-02-12T11:00:00.000+0000,3,183.0,447.0,1113.0,17.0,178.0,1113.0,1113.0,1113.0,1113.0,1113.0,989.0,269.0,1113.0,98.16680263060478,5.547111819192368,471.4771493105455,23.748086301132044,37.470690738764176,6.157130888224884,168.0748903139995,18.287681958073843,0,0,0,0,"List(1, 2, List(), List(4.214649406680703, -4.214649406680703))","List(1, 2, List(), List(0.9854376927682456, 0.014562307231754525))",0.0


Now we convert the "new data" from above to JSON, because in practice this is the format in which new data comes in. The raw result is not pretty to look at: `df_new.toJSON().collect()` returns a list of JSON strings (one per row), and `json.dumps` encodes each of those strings with its quotes escaped, so the output is one long Python string full of backslashes. With a few string replacements, we remove the escape characters and the quotes surrounding each row, which leaves a properly formatted JSON array of records. You can copy the output below into https://jsonformatter.org/ or a similar site to validate it and display it nicely.

In [18]:
import json

df_new_str = json.dumps(df_new.toJSON().collect())
df_new_str = df_new_str.replace('\\', '').replace("\"{", '{').replace("}\"", '}')
print(df_new_str)
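The string replacements above work for this data, but they also strip any backslash that belongs to a value itself. A sketch of an alternative, assuming the rows are the JSON strings returned by `toJSON().collect()`, is to parse each row and serialize the resulting list of dicts once. The rows below are hypothetical stand-ins so the example runs without Spark.

```python
import json

def clean_via_replace(rows):
    # The string-replacement approach used in the cell above.
    s = json.dumps(rows)
    return s.replace('\\', '').replace('"{', '{').replace('}"', '}')

def clean_via_parse(rows):
    # Parse each row, then serialize the list of dicts as one JSON array.
    return json.dumps([json.loads(r) for r in rows])

# Hypothetical rows standing in for df_new.toJSON().collect().
rows = ['{"machineID":68,"age":10}', '{"machineID":8,"age":16}']
assert json.loads(clean_via_replace(rows)) == json.loads(clean_via_parse(rows))

# A (hypothetical) text field containing escaped quotes breaks the replace approach.
tricky = ['{"machineID":1,"note":"a \\"quoted\\" word"}']
print(json.loads(clean_via_parse(tricky)))
try:
    json.loads(clean_via_replace(tricky))
except json.JSONDecodeError as err:
    print("replace-based cleanup produced invalid JSON:", err)
```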

Let's now go in the opposite direction: we reconstruct the original data from its JSON version, which mimics what happens when data arrives at a Spark application. Because Spark is a distributed framework, it needs to turn the incoming data into a DataFrame, its native, distributed, and efficient data format.

In [20]:
df_new_json = json.loads(df_new_str)
df_new_rdd = sc.parallelize(df_new_json)
df_new_from_json = spark.read.json(df_new_rdd)
display(df_new_from_json)

age,datetime,diff_error_0,diff_error_1,diff_error_2,diff_error_3,diff_error_4,diff_fail_0,diff_fail_1,diff_fail_2,diff_fail_3,diff_maint_0,diff_maint_1,diff_maint_2,diff_maint_3,machineID,pressure_ma_3,pressure_sd_3,rotate_ma_3,rotate_sd_3,vibration_ma_3,vibration_sd_3,volt_ma_3,volt_sd_3,y_0,y_1,y_2,y_3
10,2015-06-11T00:00:00.000Z,1100.0,416.0,1002.0,651.0,774.0,3958.0,1674.0,3958.0,3958.0,3834.0,1674.0,954.0,234.0,68,101.36787575531508,11.663934322580976,448.8813617566855,56.97296052552811,40.15481060046628,8.006951895336918,168.40829768863074,12.103602281291836,0,0,0,0
16,2015-03-23T00:00:00.000Z,1146.0,1843.0,402.0,1129.0,42.0,2038.0,378.0,2038.0,18.0,1458.0,378.0,2038.0,18.0,8,106.97302399308325,2.962033606220988,439.655044610217,75.2039455934235,40.63333303324509,7.551693348820905,164.63642883764575,7.868736995819589,0,0,0,0
2,2015-02-21T17:00:00.000Z,191.0,1134.0,83.0,497.0,1335.0,1335.0,59.0,1335.0,1335.0,419.0,59.0,1335.0,1335.0,90,99.20757569871394,10.16646052155249,442.6870849798343,76.8983557007575,34.72684309443692,9.04816005270136,168.88435631214998,8.540623889980933,0,0,0,0
10,2015-09-21T15:00:00.000Z,134.0,5365.0,114.0,145.0,2674.0,321.0,1761.0,6421.0,6421.0,2481.0,321.0,1041.0,2841.0,48,92.4350711558527,3.061661579095558,461.95082176152505,64.25454416065492,34.20775892481438,5.136393852597367,174.27754872924248,14.49769987622925,0,0,0,0
7,2015-12-22T12:00:00.000Z,2102.0,1454.0,718.0,923.0,1654.0,2790.0,3510.0,8626.0,8626.0,2430.0,270.0,630.0,2790.0,44,97.06027101219576,7.873252215072386,442.2793943734803,57.61793924427742,40.43819508673082,3.1574629361689515,175.0656889103545,16.989503817984126,0,0,0,0
2,2015-12-30T00:00:00.000Z,760.0,672.0,714.0,1434.0,8806.0,2130.0,690.0,1410.0,8806.0,2130.0,690.0,1050.0,1410.0,90,92.73861186036191,16.109686427568136,456.5199641724365,30.679369170200037,42.9760341253346,1.9622930363638376,188.8771888444425,17.734110205156426,1,0,0,0
14,2015-12-24T13:00:00.000Z,840.0,978.0,63.0,806.0,1303.0,3439.0,8675.0,8675.0,1279.0,2359.0,199.0,2719.0,1279.0,15,103.99974832404044,7.832419000976322,459.8489608105112,43.08099178998005,41.448786823157526,7.870970729922127,173.67847480836176,16.503019899383567,0,0,0,0
20,2015-10-09T21:00:00.000Z,234.0,6859.0,1035.0,406.0,3255.0,1431.0,2511.0,6859.0,3231.0,1431.0,2511.0,2151.0,351.0,62,99.68158799488592,11.204825638525362,461.2238140300205,26.9229047108972,45.4723488587489,2.4661343580114496,183.6003598731925,26.390545550039235,0,0,0,0
17,2015-08-16T18:00:00.000Z,543.0,3126.0,708.0,348.0,2569.0,3204.0,684.0,324.0,3564.0,1044.0,684.0,324.0,2484.0,23,87.96710674223742,13.558190351863413,480.1268520747572,11.636745448864469,40.74064986005583,6.187781142763204,166.97156410529675,14.697528620778836,0,0,0,0
15,2015-09-15T08:00:00.000Z,5196.0,998.0,497.0,276.0,1994.0,6270.0,890.0,530.0,1970.0,530.0,170.0,1970.0,1250.0,13,102.6941114250169,10.07416502361119,495.1027615171543,52.39285769838519,35.75264084074375,5.876410127025472,167.985717395481,20.29199416403676,0,0,0,0


So it seems like everything worked! We were able to convert the "new data" from a DataFrame to JSON and back to a DataFrame. Not so fast, though: let's also check our column types. Here's one way to compare the schema of the original data `df_new` with that of the reconstructed data `df_new_from_json`:

In [22]:
print("This is the schema of the original data frame:")
df_new.printSchema()

print("This is the schema of our data frame after converting it to/from JSON:")
df_new_from_json.printSchema()

try:
  assert(df_new.schema == df_new_from_json.schema)
except AssertionError:
  print("Sadly, the schemas of the two data frames are not the same.")

## Hands-on Lab

Help Spark by explicitly providing the schema when reading the JSON data.

This requires two parts:
1. Create a schema definition that Spark can use when reading the JSON data, so that it matches the original training data
2. Tell Spark to use that schema definition when reading the JSON data

Manually comparing the schemas is a bit tedious. Also, strictly speaking, we only care that the schemas match for the columns the model needs to make predictions (the "features"), not for any other columns in the data. So let Python do the search for us: write a Python script that returns any column mismatches it finds.

In [25]:
# put your solution here

# here are some things that can help
print(X_keep) # columns needed to make a prediction
print(df_new.schema) # how to print the schema of a DataFrame
print(df_new.schema['machineID']) # how to print the schema for just one column in a DataFrame
print(df_new.columns) # how to get a list of column names in a DataFrame

# now write a Python script that prints out the name of a feature when its schema doesn't match between the two datasets

In [26]:
# expand this cell to view the solution

good_schema = df_new.schema
bad_schema = df_new_from_json.schema

["For column `{}` the schema on the training data is {}, but the schema on the JSON data is {}."
 .format(var, str(good_schema[var]), str(bad_schema[var]))
 for var in df_new.columns # loop over the columns
 if var in X_keep # only consider the features (columns needed for prediction)
 and good_schema[var] != bad_schema[var]] # if there's a mismatch in the schema, report it
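The comprehension above would raise a `KeyError` if a feature were missing from the JSON data entirely. A reusable variant is sketched below against plain dicts of column name to type name, so it runs without Spark; with PySpark such a dict could be built from a schema as `{f.name: f.dataType.simpleString() for f in schema.fields}`. The helper name and both type maps are hypothetical.

```python
def schema_mismatches(expected, actual, features):
    """Return (column, expected_type, actual_type) for every feature whose type differs.

    A feature missing from `actual` is reported with actual_type None
    instead of raising a KeyError.
    """
    return [
        (name, expected.get(name), actual.get(name))
        for name in features
        if expected.get(name) != actual.get(name)
    ]

# Hypothetical type maps (Spark simpleString names: bigint = long, int = integer).
training_types = {"machineID": "bigint", "age": "int", "volt_ma_3": "double", "y_0": "int"}
json_types = {"machineID": "bigint", "age": "bigint", "volt_ma_3": "double", "y_0": "bigint"}

# y_0 differs too, but it is a label rather than a feature, so it is not checked.
for name, want, got in schema_mismatches(training_types, json_types, ["age", "volt_ma_3"]):
    print(f"For column `{name}` the training schema has {want}, but the JSON schema has {got}.")
```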

You can use the search function of the PySpark API [documentation](https://spark.apache.org/docs/latest/api/python/index.html) to find where the schema classes used below (`StructType`, `StructField`, and data types such as `LongType`) are defined. To make things easy, we created a schema object in the next cell and formatted it neatly. However, this schema matches what we see in the data reconstructed from JSON, and we need to change it so it conforms to the schema of the data used for training. There are two things you need to do:

1. Load the required library and functions (see the documentation link we provided).
2. Based on the results of the last cell, go to any columns where there's a mismatch and fix it.

Note: Schema objects like the one below can also be created from JSON templates. In general, because production code calls for extra caution, it's a good idea to load schemas from such templates.
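As a sketch of the template idea, assuming you keep the schema as JSON text: the dict below follows the layout that PySpark's `StructType.jsonValue()` produces, and `StructType.fromJson()` can turn such a dict back into a schema. The column list is a hypothetical subset, and the PySpark call is left as a comment so the example runs without Spark.

```python
import json

# Hypothetical subset of the columns; type names follow Spark's JSON schema format.
columns = [("datetime", "string"), ("machineID", "long"), ("age", "integer"), ("volt_ma_3", "double")]

def schema_template(cols):
    """Build a dict shaped like the output of PySpark's StructType.jsonValue()."""
    return {
        "type": "struct",
        "fields": [
            {"name": name, "type": dtype, "nullable": True, "metadata": {}}
            for name, dtype in cols
        ],
    }

# Serialize the template (e.g. to store it in a file next to the score script) ...
template_text = json.dumps(schema_template(columns), indent=2)
# ... and load it back, as a score script's init() could do.
loaded = json.loads(template_text)

# With PySpark available, the loaded dict becomes a schema object:
#   from pyspark.sql.types import StructType
#   schema = StructType.fromJson(loaded)
print([(field["name"], field["type"]) for field in loaded["fields"]])
```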

In [28]:
# uncomment and modify this script
# load the required library and functions

# myschema = StructType([
#     StructField("datetime",StringType()),
#     StructField("machineID",LongType()),
#     StructField("age",LongType()),
#     StructField("diff_error_0",DoubleType()),
#     StructField("diff_error_1",DoubleType()),
#     StructField("diff_error_2",DoubleType()),
#     StructField("diff_error_3",DoubleType()),
#     StructField("diff_error_4",DoubleType()),
#     StructField("diff_fail_0",DoubleType()),
#     StructField("diff_fail_1",DoubleType()),
#     StructField("diff_fail_2",DoubleType()),
#     StructField("diff_fail_3",DoubleType()),
#     StructField("diff_maint_0",DoubleType()),
#     StructField("diff_maint_1",DoubleType()),
#     StructField("diff_maint_2",DoubleType()),
#     StructField("diff_maint_3",DoubleType()),
#     StructField("pressure_ma_3",DoubleType()),
#     StructField("pressure_sd_3",DoubleType()),
#     StructField("rotate_ma_3",DoubleType()),
#     StructField("rotate_sd_3",DoubleType()),
#     StructField("vibration_ma_3",DoubleType()),
#     StructField("vibration_sd_3",DoubleType()),
#     StructField("volt_ma_3",DoubleType()),
#     StructField("volt_sd_3",DoubleType()),
#     StructField("y_0",LongType()),
#     StructField("y_1",LongType()),
#     StructField("y_2",LongType()),
#     StructField("y_3",LongType())
# ])

In [29]:
# expand this cell to view the solution
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, DoubleType, LongType

myschema = StructType([
    StructField("datetime",StringType()),
    StructField("machineID",LongType()),
    StructField("age",IntegerType()),
    StructField("diff_error_0",DoubleType()),
    StructField("diff_error_1",DoubleType()),
    StructField("diff_error_2",DoubleType()),
    StructField("diff_error_3",DoubleType()),
    StructField("diff_error_4",DoubleType()),
    StructField("diff_fail_0",DoubleType()),
    StructField("diff_fail_1",DoubleType()),
    StructField("diff_fail_2",DoubleType()),
    StructField("diff_fail_3",DoubleType()),
    StructField("diff_maint_0",DoubleType()),
    StructField("diff_maint_1",DoubleType()),
    StructField("diff_maint_2",DoubleType()),
    StructField("diff_maint_3",DoubleType()),
    StructField("pressure_ma_3",DoubleType()),
    StructField("pressure_sd_3",DoubleType()),
    StructField("rotate_ma_3",DoubleType()),
    StructField("rotate_sd_3",DoubleType()),
    StructField("vibration_ma_3",DoubleType()),
    StructField("vibration_sd_3",DoubleType()),
    StructField("volt_ma_3",DoubleType()),
    StructField("volt_sd_3",DoubleType()),
    StructField("y_0",IntegerType()),
    StructField("y_1",IntegerType()),
    StructField("y_2",IntegerType()),
    StructField("y_3",IntegerType())
])

Now that you have defined the schema, tell Spark to use it when reading the JSON data.

Instead of simply writing `spark.read.json(input_rdd)`, tell Spark to use your schema while reading the data.

Use this [documentation](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=spark%20read%20schema#pyspark.sql.DataFrameReader.schema) for a hint on how to do this.
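`DataFrameReader.schema` also accepts a DDL-formatted string instead of a `StructType`. A minimal sketch with a hypothetical helper `ddl_schema` and a subset of the columns; backticks quote each column name so none is mistaken for a SQL keyword.

```python
def ddl_schema(cols):
    """Build a DDL schema string such as "`age` INT, `volt_ma_3` DOUBLE" (hypothetical helper)."""
    return ", ".join(f"`{name}` {dtype}" for name, dtype in cols)

ddl = ddl_schema([("datetime", "STRING"), ("machineID", "BIGINT"), ("age", "INT"), ("volt_ma_3", "DOUBLE")])
print(ddl)

# With Spark available, the string can be passed to the reader in place of a StructType:
#   df_new_from_json = spark.read.schema(ddl).json(df_new_rdd)
```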

In [31]:
# modify this script

df_new_json = json.loads(df_new_str)
df_new_rdd = sc.parallelize(df_new_json)
df_new_from_json = spark.read.json(df_new_rdd) # TODO: modify this line 

In [32]:
# expand this cell to view the solution

df_new_json = json.loads(df_new_str)
df_new_rdd = sc.parallelize(df_new_json)
df_new_from_json = spark.read.schema(myschema).json(df_new_rdd)

Now, let's see whether you were successful.

In [34]:
good_schema = df_new.schema
bad_schema = df_new_from_json.schema
mismatches = [var for var in df_new.columns if var in X_keep and good_schema[var] != bad_schema[var]]
assert len(mismatches) == 0, "Schema mismatch for features: {}".format(mismatches)

## End of lab

## Create a score file

The next step in creating a webservice is to write a score script that defines what the webservice does.

A typical score script defines two functions:
- `init` is executed once, when the webservice starts
- `run` is executed every time a user calls the webservice to score data
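The `init`/`run` contract can be sketched in plain Python. Here a hypothetical rule on `age` stands in for the Spark pipeline so that the example runs anywhere; the output format (comma-joined predictions under `result`, or a message under `error`) mirrors the score script used in this notebook.

```python
import json

model = None  # filled in once by init()

def init():
    # Runs once when the service starts: load expensive resources into globals.
    global model
    model = lambda row: 1.0 if row["age"] > 15 else 0.0  # hypothetical stand-in, not the trained model

def run(input_json):
    # Runs on every request: JSON string in, JSON string out.
    try:
        rows = json.loads(input_json)
        preds = [str(model(row)) for row in rows]
        return json.dumps({"result": ",".join(preds)})
    except Exception as e:
        # Report failures to the caller instead of crashing the service.
        return json.dumps({"error": str(e)})

init()
print(run('[{"age": 10}, {"age": 16}]'))
print(run('not json'))
```

Doing the expensive work in `init` means it is paid once per container rather than once per request.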

Look at the score script below. Can you see where we made the changes related to explicitly providing the schema when reading JSON data?

There are several places:
1. Importing the modules for defining the schema
1. Defining a global variable for holding the schema
1. Defining the schema
1. Using the schema when reading the data

In [37]:
score_sparkml = """
import json

def init():
    # One-time initialization of PySpark and predictive model
    import pyspark
    from azureml.core.model import Model
    from pyspark.ml import PipelineModel
    # types used to define the schema of the incoming JSON data
    from pyspark.sql.types import StructField, StructType, IntegerType, StringType, DoubleType, LongType

    global trainedModel
    global spark
    global schema
    
    spark = pyspark.sql.SparkSession.builder.appName("ADB and AML notebook by an amazing data scientist").getOrCreate()
    model_name = "{model_name}" #interpolated
    model_path = Model.get_model_path(model_name)
    trainedModel = PipelineModel.load(model_path)

    schema = StructType([
        StructField("datetime",StringType()),
        StructField("machineID",LongType()),
        StructField("age",IntegerType()),
        StructField("diff_error_0",DoubleType()),
        StructField("diff_error_1",DoubleType()),
        StructField("diff_error_2",DoubleType()),
        StructField("diff_error_3",DoubleType()),
        StructField("diff_error_4",DoubleType()),
        StructField("diff_fail_0",DoubleType()),
        StructField("diff_fail_1",DoubleType()),
        StructField("diff_fail_2",DoubleType()),
        StructField("diff_fail_3",DoubleType()),
        StructField("diff_maint_0",DoubleType()),
        StructField("diff_maint_1",DoubleType()),
        StructField("diff_maint_2",DoubleType()),
        StructField("diff_maint_3",DoubleType()),
        StructField("pressure_ma_3",DoubleType()),
        StructField("pressure_sd_3",DoubleType()),
        StructField("rotate_ma_3",DoubleType()),
        StructField("rotate_sd_3",DoubleType()),
        StructField("vibration_ma_3",DoubleType()),
        StructField("vibration_sd_3",DoubleType()),
        StructField("volt_ma_3",DoubleType()),
        StructField("volt_sd_3",DoubleType()),
        StructField("y_0",IntegerType()),
        StructField("y_1",IntegerType()),
        StructField("y_2",IntegerType()),
        StructField("y_3",IntegerType())
    ])
    
def run(input_json):
    if isinstance(trainedModel, Exception):
        return json.dumps({{"trainedModel":str(trainedModel)}})
      
    try:
        sc = spark.sparkContext
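        input_list = json.loads(input_json)
        # spark.read.json expects an RDD of JSON strings, so re-serialize each record
        input_rdd = sc.parallelize([json.dumps(record) for record in input_list])
        input_df = spark.read.schema(schema).json(input_rdd)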
        
        # Compute prediction
        prediction = trainedModel.transform(input_df)
        #result = prediction.first().prediction
        predictions = prediction.collect()

        #Get each scored result
        preds = [str(x['prediction']) for x in predictions]
        result = ",".join(preds)
        # you can return any data type as long as it is JSON-serializable
        return json.dumps({{"result":result}})        
    except Exception as e:
        result = str(e)
        return json.dumps({{"error":result}})
    
""".format(model_name=model_name)

exec(score_sparkml) # quick sanity check: the generated script runs and defines init() and run()

with open("score_sparkml.py", "w") as file:
    file.write(score_sparkml)

Run the next cell to view the file and make sure it was created. Notice that `{model_name}` was a placeholder in the script above: in the file that was actually written, it was replaced by the value of the Python variable `model_name`. Likewise, the doubled braces `{{` and `}}` in the template were written to the file as single braces.

In [39]:
%sh cat score_sparkml.py
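Because the score script is built with `str.format`, every literal brace in the template has to be doubled, which is why the script above writes `{{"result":result}}`. A small, self-contained illustration with a hypothetical template, followed by a compile check of the generated code:

```python
# Literal braces are doubled ({{ and }}); {model_name} is a placeholder.
template = """
MODEL = "{model_name}"

def run():
    return {{"result": MODEL}}
"""

script = template.format(model_name="PdM_logistic_regression")
print(script)

# Make sure the generated code at least compiles before shipping it.
compile(script, "score_sparkml.py", "exec")
```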

Creating a webservice requires creating a Docker container in which to run our score script.

This can all be done with the Azure ML Python SDK.

First we create a conda environment file, which makes sure that all the Python dependencies are installed in the Docker container. Then we create the container.

In [41]:
from azureml.core.conda_dependencies import CondaDependencies 

myacienv = CondaDependencies.create(conda_packages=['scikit-learn','numpy','pandas']) #showing how to add libs as an example - not needed for this model.

with open("mydeployenv.yml","w") as f:
    f.write(myacienv.serialize_to_string())

In [42]:
with open("mydeployenv.yml","r") as f:
  print(f.read())

The registered model can be downloaded and passed to the scoring script along with new data. The scoring script is the code we need to execute to get the model's prediction on the new data. The Conda environment is the Python runtime in which this code will execute. We can now package up all of the above into a Docker image.

In [44]:
# this will take 5 minutes to finish

service_name = "myaci"
image_name = 'myimage'
runtime = "spark-py" 
driver_file = "score_sparkml.py"
my_conda_file = "mydeployenv.yml"

# image creation
from azureml.core.image import ContainerImage
myimage_config = ContainerImage.image_configuration(execution_script = driver_file, 
                                    runtime = runtime, 
                                    conda_file = my_conda_file)

# Create container Image
myimage = ContainerImage.create(
  workspace=ws, 
  name=image_name,
  models = [mymodel],
  image_config = myimage_config)

myimage.wait_for_creation(show_output=True)

In [45]:
help(ContainerImage)

Now we create the actual webservice, using the Docker image that is stored in the Azure Container Registry. 

Before you continue, try to find your container image in the Azure portal.

In [47]:
# deploy to ACI
from azureml.core.webservice import AciWebservice, Webservice

myaci_config = AciWebservice.deploy_configuration(
    cpu_cores = 2, 
    memory_gb = 2, 
    tags = {'name':'Databricks Azure ML ACI'}, 
    description = 'This is for ADB and AML example. Azure Databricks & Azure ML SDK demo with ACI.',
    location='westus2')

# webservice creation
myservice = Webservice.deploy_from_image(
  workspace=ws, 
  name=service_name,
  image=myimage,
  deployment_config = myaci_config)

myservice.wait_for_deployment(show_output=True)

Let's see what we created above. Here is a summary.

In [49]:
print(myservice.serialize())

You can also print individual properties of your webservice, for example the URL used by the webservice.

In [51]:
#for using the Web HTTP API 
print(myservice.scoring_uri)

## Test Webservice

Time to put the service to the test by passing it data. We will pass it `df_new_str`, which we obtained earlier by converting `df_new` into a JSON string. If everything works, here's what should happen:

1. The string is passed to the scoring script where it is converted into a JSON object, and from there into a Spark DataFrame.
2. The trained model is loaded and the `transform` method is called to obtain predictions for the Spark DataFrame.
3. The predictions are re-packaged into a JSON object and returned to us as a string.

In [54]:
# send the JSON string df_new_str that we created above to the webservice
myservice.run(input_data = df_new_str)
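The score script returns its predictions as a single comma-separated string. A small helper to turn the reply into numbers might look like this. It assumes the reply is either the JSON string produced by `run` or an already-decoded dict, since the exact return type of `myservice.run` may depend on the SDK version; the responses shown are hypothetical.

```python
import json

def parse_predictions(response):
    """Turn the score script's output into a list of floats (hypothetical helper)."""
    payload = json.loads(response) if isinstance(response, str) else response
    if "error" in payload:
        # The score script reports failures under the "error" key.
        raise RuntimeError(payload["error"])
    return [float(p) for p in payload["result"].split(",") if p]

print(parse_predictions('{"result": "0.0,1.0,0.0"}'))
```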

## Testing against a REST client (optional)

We were able to successfully test the scoring script from Python by calling the `run` method on it. But what happened in the background? We executed a REST call to the web application. Let's now re-do this from a REST client to confirm that everything works.

1. Install a REST client for your browser such as *RESTClient, a debugger for RESTful web services.* (Firefox)
2. In the section called **Request Headers** set Name to **Content-Type** and Attribute Value to **application/json**.
3. Copy the content of `df_new_str` to https://jsonformatter.org/ and validate it to make sure it's clean.
4. Paste the clean json into the section of the REST client called **Body**, set method to **POST** and add the API address (what we get when we run `myservice.scoring_uri`) to the section called **URL**.
5. Finally, hit **SEND** and scroll down to see your results in the tab called **Response**.
6. Note how you can see the original POST call, which looks like `curl -X POST -H 'Content-Type: application/json' -i 'http://<SCORING_URI>:80/score' --data ...`.
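The same POST request can also be built with Python's standard library instead of a browser plugin. This sketch uses `urllib.request` with a placeholder URL (substitute `myservice.scoring_uri`) and a hypothetical one-record body. It only builds the request; the commented lines show how it could be sent to a running service.

```python
import json
import urllib.request

def build_score_request(scoring_uri, records):
    """Build (but do not send) a JSON POST request like the one made in the REST client."""
    body = json.dumps(records).encode("utf-8")
    return urllib.request.Request(
        scoring_uri,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# Placeholder URL; use the value of myservice.scoring_uri instead.
req = build_score_request("http://example.com/score", [{"machineID": 68, "age": 10}])
print(req.get_method(), req.full_url, req.get_header("Content-type"))

# To actually call the service (needs network access to a running webservice):
# with urllib.request.urlopen(req) as resp:
#     print(resp.read().decode("utf-8"))
```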

**ATTENTION:** To avoid unnecessary charges, we run the command below to delete the scoring service. Do not run this cell if you do not want to delete the scoring service yet.

In [57]:
# comment out the line below to keep the web service running
myservice.delete()

In [58]:
assert isinstance(myservice, azureml.core.webservice.aci.AciWebservice)

Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.