# load python
library(reticulate)
use_python('C:/Users/Andrew/Anaconda3/python.exe')
use_condaenv(condaenv='my_ml', required=TRUE)
library(knitr)

Introduction

Azure Machine Learning is an enterprise-grade machine learning service for building and deploying models. In this guide, we will walk through an end-to-end process that: (1) instantiates a workspace and compute target, (2) loads a tabular data set for prediction, (3) runs a single experiment, (4) scales a hyperparameter search across multiple VMs with HyperDrive, (5) deploys a model as a web service for inference, and (6) shows how to send new input data and retrieve predictions.

Instantiate a Workspace and Compute Target

# required AML packages
from azureml.core import Workspace, Datastore, Dataset, Experiment, ScriptRunConfig
from azureml.core.authentication import ServicePrincipalAuthentication
from azureml.core.runconfig import RunConfiguration, DEFAULT_CPU_IMAGE
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep
from azureml.pipeline.core import Pipeline
from azureml.core.environment import Environment
from azureml.train.hyperdrive import BayesianParameterSampling, HyperDriveConfig, PrimaryMetricGoal, choice, uniform
from azureml.core.webservice import AciWebservice
from azureml.core.model import Model, InferenceConfig

# python version
from platform import python_version
print(python_version())

# python packages
import os
import uuid
import requests
import pandas as pd
import numpy as np
import json

# environment variables
from dotenv import load_dotenv  # pip install python-dotenv
load_dotenv('.env') # load .env file with sp info
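# a .env file for this setup might look like (hypothetical placeholder values):
#   AML_TENANT_ID=<your-tenant-id>
#   AML_PRINCIPAL_ID=<service-principal-application-id>
#   AML_PRINCIPAL_PASS=<service-principal-secret>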
# instantiate the service principal
sp = ServicePrincipalAuthentication(tenant_id=os.environ['AML_TENANT_ID'],
                                    service_principal_id=os.environ['AML_PRINCIPAL_ID'],
                                    service_principal_password=os.environ['AML_PRINCIPAL_PASS'])
# instantiate a workspace
ws = Workspace(subscription_id = "2c3b88a1-7aa0-4107-b413-d4c701e0afc8",
               resource_group = "rg_chie_training",
               auth=sp,  # use service principal auth
               workspace_name = "training_aml")

print("Found workspace {} at location {}".format(ws.name, ws.location))
# choose a name for your CPU cluster
compute_name = "hypercluster-cpu"

# verify that cluster does not exist already
if compute_name in ws.compute_targets:
    compute_target = ws.compute_targets[compute_name]
    if compute_target and isinstance(compute_target, AmlCompute):
        print('found existing compute target: ' + compute_name)
else:
    print('creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(vm_size='Standard_DS3_v2',
                                                                idle_seconds_before_scaledown=400,
                                                                min_nodes=0,
                                                                max_nodes=10)
    # create the cluster
    compute_target = ComputeTarget.create(
        ws, compute_name, provisioning_config)

    # can poll for a minimum number of nodes and for a specific timeout.
    # if no min node count is provided it will use the scale settings for the cluster
    compute_target.wait_for_completion(
        show_output=True, min_node_count=None, timeout_in_minutes=20)
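# optional check (not in the original flow): inspect the cluster's provisioning state
print(compute_target.get_status().serialize())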

Generate Environment

# generate an environment with necessary packages
env = Environment('titanic-env')
cd = CondaDependencies.create(
    conda_packages=['pip', 'pandas', 'numpy', 'python==3.6'], 
    pip_packages=['azureml-core', 'azureml-sdk', 'azureml-dataset-runtime[pandas,fuse]', 'scikit-learn', 'azureml-defaults'])

# attach dependencies to the environment
env.python.conda_dependencies = cd

# register environment to re-use later
env.register(workspace=ws);
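# a registered environment can be fetched by name in later sessions, e.g.:
# env = Environment.get(workspace=ws, name='titanic-env')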

Load Tabular Data

# take a look at available datastores
ws.datastores
# place data on the blob datastore
ws.datastores['blob_datastore'].upload_files(files=['data/Titanic.csv'],  # upload Titanic
                                             target_path='data/',  # set folder path on the datastore
                                             overwrite=True,
                                             show_progress=True)
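# the tabular dataset used below still needs to be created from the uploaded CSV and
# registered; a minimal sketch of that step (assuming the 'data/Titanic.csv' path above
# and the dataset name 'titanic_ds'):
titanic_ds = Dataset.Tabular.from_delimited_files(
    path=(ws.datastores['blob_datastore'], 'data/Titanic.csv'))
titanic_ds = titanic_ds.register(workspace=ws, name='titanic_ds', create_new_version=True)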
# check the registration
ws.datasets
# look at the first three rows
titanic_ds.take(3).to_pandas_dataframe()

Prepare Single Experiment

# create central ScriptRunConfig
src = ScriptRunConfig(source_directory='scripts/',
                      script='train.py', 
                      arguments=['--regularization', 0.01,  # L2 default
                                 '--dataset', titanic_ds.as_named_input('titanic_ds')],
                      compute_target=compute_name,
                      environment=env)
%%writefile scripts/train.py
# create a training script for ML
import argparse
import os
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score
import joblib

from azureml.core import Run


# generate args
def arg_parsing():
    # init argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('--regularization', type=float, default=0.01, help='L2')
    parser.add_argument("--dataset", type=str, help='dataset')
    args = parser.parse_args()
    return args

def split_prepare(dataset):
    # trim to the vars of interest
    dataset = dataset.loc[:, ~dataset.columns.isin(['PassengerId', 'Name', 'Ticket', 'Cabin'])]

    # drop nan
    dataset = dataset.dropna().reset_index(drop=True)

    # prepare X, y
    y = dataset['Survived'].values
    X = dataset.loc[:, dataset.columns.isin(['Pclass', 'Age', 'SibSp', 'Parch', 'Fare'])]
    X_cats = pd.get_dummies(dataset[['Sex', 'Embarked']], drop_first=True)  # one-hot; drop the base
    X = pd.concat([X, X_cats], axis=1).values  # combine X again

    # create a train and test set
    train_test_set = train_test_split(X, y, test_size=0.15, random_state=333)
    
    return train_test_set

def main(args):
    # get_context communicates to containers where metrics, files (artifacts), and models are contained
    run = Run.get_context()  # a run is a single trial
    # access input data
    titanic_ds = run.input_datasets["titanic_ds"].to_pandas_dataframe()
    # send to train-test split function
    X_train, X_test, y_train, y_test = split_prepare(titanic_ds)

    # prepare hyperparameter logging
    LAMBDA = args.regularization
    run.log('L2', LAMBDA)

    # instantiate logistic regression
    C = 1 / LAMBDA  # C = inverse of lambda
    lr_model = LogisticRegression(solver='liblinear', penalty='l2', C=C).fit(X_train, y_train)

    # get y_hat
    y_hat = lr_model.predict(X_test)
    # calc F1
    f1s = f1_score(y_test, y_hat, average='weighted')
    # also log f1
    run.log('F1', float(f1s))  # builtin float; np.float is deprecated

    # where output training data will be placed
    os.makedirs('outputs', exist_ok=True)
    # note file saved in the outputs folder is automatically uploaded into experiment record
    joblib.dump(value=lr_model, filename='outputs/titanic_lr.joblib')

    # an experiment is completed by calling:
    run.complete()

    # END

if __name__ == '__main__':
    args = arg_parsing()
    main(args)

Execute Single Experiment

# instantiate an experiment
experiment = Experiment(workspace=ws, name="titanic-lr-experiment")

# send the job to the cluster
run = experiment.submit(config=src)

# monitor the progress
run.wait_for_completion(show_output=True)

# ensure it's done
assert run.get_status() == "Completed"
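# optionally pull the trained model down from the run's outputs (a convenience, not in the original):
# run.download_file(name='outputs/titanic_lr.joblib', output_file_path='titanic_lr.joblib')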

View Experiment Results and Register the Model

# get the results
print('Metrics:\n', run.get_metrics(), '\n')

# files associated with the run
print('Run Files:\n', run.get_file_names())
# register model so we can later query, examine, and deploy this model
run.register_model(model_name='sklearn_lr_titanic',
                   model_path='outputs/titanic_lr.joblib',  # see the .get_file_names() output above
                   tags={'Training context': 'Run', 'F1': run.get_metrics()['F1']})  # tag info

HyperDrive

# sample L2 values
params = BayesianParameterSampling(
    {
        # uniform distribution from which samples are taken
        '--regularization': uniform(0.01, 5.0),
    }
)
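# note: 'choice' (imported above but unused) handles discrete values instead, e.g.:
# params = BayesianParameterSampling({'--regularization': choice(0.01, 0.1, 1.0, 10.0)})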

# hyperdrive settings
hyperdriven = HyperDriveConfig(run_config=src,  # src from above
                               hyperparameter_sampling=params,  # params to sample
                               policy=None,  # early stopping (not supported with Bayesian sampling)
                               primary_metric_name='F1',  # use a logged metric
                               primary_metric_goal=PrimaryMetricGoal.MAXIMIZE,  # maximize F1
                               max_total_runs=25,  # total number of runs
                               max_concurrent_runs=10)  # run on all 10 nodes

HyperDrive Experiment

# instantiate an experiment
experiment = Experiment(workspace=ws, name="titanic-lr-hyperdrive-experiment")

# send the job to the cluster
run = experiment.submit(config=hyperdriven)

# monitor the progress
run.wait_for_completion(show_output=True)

# ensure it's done
assert run.get_status() == "Completed"

HyperDrive Experiment: Results

# get the best result
best_run = run.get_best_run_by_primary_metric()
best_run_metrics = best_run.get_metrics()
# get the results
print('Metrics:\n', best_run_metrics, '\n')

# files associated with the run
print('Run Files:\n', best_run.get_file_names())
# show hyperdrive results in notebook
for child_run in run.get_children_sorted_by_primary_metric():
    print(child_run)
# register the best_run hyperdrive model so we can later query, examine, and deploy this model
best_run.register_model(model_name='titanic-lr-hyperdrive-experiment',
                        model_path='outputs/titanic_lr.joblib',  # see the .get_file_names() output above
                        tags={'Training context': 'Hyperdrive', 'F1': best_run_metrics['F1']})  # tag info
# look at registered models
ws.models

Deploy a Model for Inference

%%writefile scripts/inference.py
# create an inference script for ML
import os
import json
import joblib
import numpy as np
from azureml.core.model import Model


def init():
    global model
    # AZUREML_MODEL_DIR is an environment variable created during deployment;
    # it points at the directory containing the registered model file
    model_path = os.path.join(os.getenv("AZUREML_MODEL_DIR"), "titanic_lr.joblib")
    model = joblib.load(model_path)

def run(raw_data):
    # parse the payload once
    payload = json.loads(raw_data)
    # receive the input data as np.array
    data = np.array(payload["data"])
    # check the requested method
    method = payload["method"]
    # get labels
    if method == 'predict':
        # make prediction
        y_hat = model.predict(data)
        # map 0/1 predictions onto class labels
        classnames = ['Died', 'Survived']
        predicted_classes = [classnames[int(pred)] for pred in y_hat]
        return predicted_classes

    # predicted probabilities
    if method == 'predict_proba':
        # make prediction
        y_hat = model.predict_proba(data)
        # round
        y_hat = np.round(y_hat, decimals=4)
        return y_hat.tolist()
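# optional local smoke test before deploying (hypothetical 8-feature row matching the
# training columns; assumes outputs/titanic_lr.joblib exists locally):
# os.environ['AZUREML_MODEL_DIR'] = 'outputs'
# init()
# print(run(json.dumps({'data': [[3, 22.0, 1, 0, 7.25, 1, 0, 1]], 'method': 'predict'})))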
# configure a lightweight Azure Container Instances (ACI) deployment
aciconfig = AciWebservice.deploy_configuration(
    cpu_cores=1,
    memory_gb=1,
    tags={"data": "Titanic", "method": "sklearn-lr"},
    description="Predict survival with logistic regression",
)

# create an inference config i.e. the scoring script and environment
inference_config = InferenceConfig(entry_script="scripts/inference.py", environment=env)

# get the registered model
deploying_model = Model(ws, "titanic-lr-hyperdrive-experiment")

# deploy the service
service_name = "sklearn-titanic-lr-hyper" + str(uuid.uuid4())[:4]
service = Model.deploy(
    workspace=ws,
    name=service_name,
    models=[deploying_model],
    inference_config=inference_config,
    deployment_config=aciconfig,
    overwrite=True
)

service.wait_for_deployment(show_output=True)  # check status on left menu; Endpoints
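# confirm the service is healthy and grab its REST endpoint
print(service.state)
print(service.scoring_uri)
# if deployment fails, the container logs usually explain why:
# print(service.get_logs())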

Generate Test Data for Inference Model

import pandas as pd
import numpy as np
import json
# grab rows the model has likely already seen, just to test functionality
titanic_pd = titanic_ds.to_pandas_dataframe()
# trim to the vars of interest
titanic_pd = titanic_pd.loc[:, ~titanic_pd.columns.isin(['PassengerId', 'Name', 'Ticket', 'Cabin'])]
# drop nan
titanic_pd = titanic_pd.dropna().reset_index(drop=True)
# get y
y = titanic_pd['Survived'].iloc[333:335]
# build X exactly as in train.py so the column order matches what the model expects
X = titanic_pd.loc[:, titanic_pd.columns.isin(['Pclass', 'Age', 'SibSp', 'Parch', 'Fare'])]
# one-hot encode the categorical data, dropping the base level
X_cats = pd.get_dummies(titanic_pd[['Sex', 'Embarked']], drop_first=True)
# combine the data and take two rows
X = pd.concat([X, X_cats], axis=1).iloc[333:335]

Send Test Data to Model

# create a payload to predict
input_payload = json.dumps({
    'data': X.values.tolist(),
    'method': 'predict_proba'
    })

output = service.run(input_payload)
print(output)
# create a payload to predict
input_payload = json.dumps({
    'data': X.values.tolist(),
    'method': 'predict'
    })

output = service.run(input_payload)
print(output)
# use HTTP POST
headers = {'Content-Type':'application/json'}
resp = requests.post(url=service.scoring_uri, data=input_payload, headers=headers)
print("POST to url", service.scoring_uri)
print("label:", y)
print("prediction:", resp.text)

Delete the Service

# drop the service
service.delete()