# load python
library(reticulate)
use_python('C:/Users/Andrew/Anaconda3/')
use_condaenv(condaenv='my`_ml', required=TRUE)
library(knitr)
Azure Machine Learning is an enterprise-grade machine learning service that builds and deploy models. In this guide, we will go through an end-to-end process that: (1) instantiates a work and compute space, (2) loads a tabular data set for prediction, (3) runs a single experiment, (4) scales a hyperparameter search using multiple VMs with HyperDrive
, (5) deploys a model for inference to the web, and (6) show how to send new input data and retrieve predictions.
# required AML packages
from azureml.core import Workspace, Datastore, Dataset, Experiment, ScriptRunConfig
from azureml.core.authentication import ServicePrincipalAuthentication
from azureml.core.runconfig import RunConfiguration, DEFAULT_CPU_IMAGE
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep
from azureml.pipeline.core import Pipeline
from azureml.core.environment import Environment
from azureml.train.hyperdrive import BayesianParameterSampling, HyperDriveConfig, PrimaryMetricGoal, choice, uniform
from azureml.core.webservice import AciWebservice
from azureml.core.model import Model, InferenceConfig
# python data
from platform import python_version
print(python_version())
# python packages
import os
import uuid
import requests
import pandas as pd
import numpy as np
import json
# environment data
from dotenv import load_dotenv # pip install python-dotenv
load_dotenv('.env') # load .env file with sp info
# instantiate the service principal
sp = ServicePrincipalAuthentication(tenant_id=os.environ['AML_TENANT_ID'],
service_principal_id=os.environ['AML_PRINCIPAL_ID'],
service_principal_password=os.environ['AML_PRINCIPAL_PASS'])
# instantiate a workspace
ws = Workspace(subscription_id = "2c3b88a1-7aa0-4107-b413-d4c701e0afc8",
resource_group = "rg_chie_training",
auth=sp, # use service principal auth
workspace_name = "training_aml")
print("Found workspace {} at location {}".format(ws.name, ws.location))
# choose a name for your CPU cluster
compute_name = "hypercluster-cpu"
# verify that cluster does not exist already
if compute_name in ws.compute_targets:
compute_target = ws.compute_targets[compute_name]
if compute_target and type(compute_target) is AmlCompute:
print('found compute target. just use it. ' + compute_name)
else:
print('creating a new compute target...')
provisioning_config = AmlCompute.provisioning_configuration(vm_size='Standard_DS3_v2',
idle_seconds_before_scaledown='400',
min_nodes=0,
max_nodes=10)
# create the cluster
compute_target = ComputeTarget.create(
ws, compute_name, provisioning_config)
# can poll for a minimum number of nodes and for a specific timeout.
# if no min node count is provided it will use the scale settings for the cluster
compute_target.wait_for_completion(
show_output=True, min_node_count=None, timeout_in_minutes=20)
# generate an environment with necessary packages
env = Environment('titanic-env')
cd = CondaDependencies.create(
conda_packages=['pip', 'pandas', 'numpy', 'python==3.6'],
pip_packages=['azureml-core', 'azureml-sdk', 'azureml-dataset-runtime[pandas,fuse]', 'scikit-learn', 'azureml-defaults'])
# attach dependencies to the environment
env.python.conda_dependencies = cd
# register environment to re-use later
env.register(workspace=ws);
# take a look at available datastores
ws.datastores
# place data on the blob datastore
ws.datastores['blob_datastore'].upload_files(files=['data/Titanic.csv'], # upload Titanic
target_path='data/', # set folder path on data store
overwrite=True,
show_progress=True)
# check the registration
ws.datasets
# look at the first three rows
titanic_ds.take(3).to_pandas_dataframe()
# create central ScriptRunConfig
src = ScriptRunConfig(source_directory='scripts/',
script='train.py',
arguments=['--regularization', 0.01, # L2 default
'--dataset', titanic_ds.as_named_input('titanic_ds')],
compute_target=compute_name,
environment=env)
%%writefile scripts/train.py
# create a training script for ML
import argparse
import os
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score
import joblib
from azureml.core import Run
# generate args
def arg_parsing():
# init argparse
parser = argparse.ArgumentParser()
parser.add_argument('--regularization', type=float, default=0.01, help='L2')
parser.add_argument("--dataset", type=str, help='dataset')
args = parser.parse_args()
return args
def split_prepare(dataset):
# trim to the vars of interest
dataset = dataset.loc[:, ~dataset.columns.isin(['PassengerId', 'Name', 'Ticket', 'Cabin'])]
# drop nan
dataset = dataset.dropna().reset_index(drop=True)
# prepare X, y
y = dataset['Survived'].values
X = dataset.loc[:, dataset.columns.isin(['Pclass', 'Age', 'SibSp', 'Parch', 'Fare'])]
X_cats = pd.get_dummies(dataset[['Sex', 'Embarked']], drop_first=True) # one-hot; drop the base
X = pd.concat([X, X_cats], axis=1).values # combine X again
# create a train and test set
train_test_set = train_test_split(X, y, test_size=0.15, random_state=333)
return train_test_set
def main(arg_parsing):
# get_context communicates to containers where metrics, files (artifacts), and models are contained
run = Run.get_context() # a run is a single trial
# access input data
titanic_ds = run.input_datasets["titanic_ds"].to_pandas_dataframe()
# send to train-test split function
X_train, X_test, y_train, y_test = split_prepare(titanic_ds)
# prepare hyperparameter logging
LAMBDA = arg_parsing.regularization
run.log('L2', LAMBDA)
# instantiate logistic regression
C = 1 / LAMBDA # C = inverse of lambda
lr_model = LogisticRegression(solver='liblinear', penalty='l2', C=C).fit(X_train, y_train)
# get y_hat
y_hat = lr_model.predict(X_test)
# calc F1
f1s = f1_score(y_test, y_hat, average='weighted')
# also log f1
run.log('F1', np.float(f1s))
# where output training data will be placed
os.makedirs('outputs', exist_ok=True)
# note file saved in the outputs folder is automatically uploaded into experiment record
joblib.dump(value=lr_model, filename='outputs/titanic_lr.joblib')
# an experiment is completed by calling:
run.complete()
# END
if __name__ == '__main__':
args = arg_parsing()
main(args)
# instantiate an experiment
experiment = Experiment(workspace=ws, name="titanic-lr-experiment")
# send the job to the cluster
run = experiment.submit(config=src)
# monitor the progress
run.wait_for_completion(show_output=True)
# ensure its done
assert(run.get_status() == "Completed")
# get the results
print('Metrics:\n', run.get_metrics(), '\n')
# files associated with the run
print('Run Files:\n', run.get_file_names())
# register model so we can later query, examine, and deploy this model
run.register_model(model_name='sklearn_lr_titanic',
model_path='outputs/titanic_lr.joblib', # look at .get_file_names() method
tags={'Training context':'Run', 'F1': run.get_metrics()['F1']}) # tag info
# sample L2 values
params = BayesianParameterSampling(
{
# uniform distribution from which samples are taken
'--regularization': uniform(0.01, 5.0),
}
)
# hyperdrive settings
hyperdriven = HyperDriveConfig(run_config=src, # src from above
hyperparameter_sampling=params, # params to sample
policy=None, # early stopping policy
primary_metric_name='F1', # use a logged metric
primary_metric_goal=PrimaryMetricGoal.MAXIMIZE, # maximize f1
max_total_runs=25, # total number of runs
max_concurrent_runs=10) # run on all 10 nodes
# instantiate an experiment
experiment = Experiment(workspace=ws, name="titanic-lr-hyperdrive-experiment")
# send the job to the cluster
run = experiment.submit(config=hyperdriven)
# monitor the progress
run.wait_for_completion(show_output=True)
# ensure its done
assert(run.get_status() == "Completed")
# get the best result
best_run = run.get_best_run_by_primary_metric()
best_run_metrics = best_run.get_metrics()
# get the results
print('Metrics:\n', best_run.get_metrics(), '\n')
# files associated with the run
print('Run Files:\n', best_run.get_file_names())
# show hyperdrive results in notebook
for child_run in run.get_children_sorted_by_primary_metric():
print(child_run)
# register the best_run hyperdrive model so we can later query, examine, and deploy this model
best_run.register_model(model_name='titanic-lr-hyperdrive-experiment',
model_path='outputs/titanic_lr.joblib', # look at .get_file_names() method
tags={'Training context':'Hyperdrive', 'F1': best_run_metrics['F1']}) # tag info
# look at registered models
ws.models
%%writefile scripts/inference.py
# create a inference script for ML
import os
import json
import joblib
import numpy as np
from azureml.core.model import Model
def init():
global model
# AZUREML_MODEL_DIR is an environment variable created during deployment
model_path = os.path.join(os.getenv("AZUREML_MODEL_DIR"), "/var/azureml-app/azureml-models/titanic-lr-hyperdrive-experiment/5/titanic_lr.joblib")
model = joblib.load(model_path)
def run(raw_data):
# receive the input data as np.array
data = np.array(json.loads(raw_data)["data"])
# check method
method = json.loads(raw_data)["method"]
# get labels
if method == 'predict':
# make prediction
y_hat = model.predict(data)
# generate some class labels
classnames = ['Died', 'Survived']
predicted_classes = []
for pred in y_hat:
predicted_classes.append(classnames[pred])
return predicted_classes
# predicted probabilities
if method == 'predict_proba':
# make prediction
y_hat = model.predict_proba(data)
# round
y_hat = np.round(y_hat, decimals=4)
return y_hat.tolist()
# create a light deployment cluster
aciconfig = AciWebservice.deploy_configuration(
cpu_cores=1,
memory_gb=1,
tags={"data": "Titanic", "method": "sklearn-lr"},
description="Predict survival with logistic regression",
)
# create an inference config i.e. the scoring script and environment
inference_config = InferenceConfig(entry_script="scripts/inference.py", environment=env)
# get the registered model
deploying_model = Model(ws, "titanic-lr-hyperdrive-experiment")
# deploy the service
service_name = "sklearn-titanic-lr-hyper" + str(uuid.uuid4())[:4]
service = Model.deploy(
workspace=ws,
name=service_name,
models=[deploying_model],
inference_config=inference_config,
deployment_config=aciconfig,
overwrite=True
)
service.wait_for_deployment(show_output=True) # check status on left menu; Endpoints
import pandas as pd
import numpy as np
import json
# get a row its seen probably already to test functionality
titanic_pd = titanic_ds.to_pandas_dataframe()
# trim to the vars of interest
titanic_pd = titanic_pd.loc[:, ~titanic_pd.columns.isin(['PassengerId', 'Name', 'Ticket', 'Cabin'])]
# drop nan
titanic_pd = titanic_pd.dropna().reset_index(drop=True)
# get y
y = titanic_pd['Survived'].iloc[333:335]
# grab these cols
X = titanic_pd.loc[:, titanic_pd.columns.isin(['Age', 'Fare'])]
# get categorical data
X_cats = pd.get_dummies(titanic_pd[['Sex', 'Embarked', 'Pclass', 'SibSp', 'Parch']], drop_first=True)
# combine the data
X = pd.concat([X, X_cats], axis=1).iloc[333:335]
# create a payload to predict
input_payload = json.dumps({
'data': X.values.tolist(),
'method': 'predict_proba'
})
output = service.run(input_payload)
print(output)
# create a payload to predict
input_payload = json.dumps({
'data': X.values.tolist(),
'method': 'predict'
})
output = service.run(input_payload)
print(output)
# use HTTP POST
headers = {'Content-Type':'application/json'}
resp = requests.post(url=service.scoring_uri, data=input_payload, headers=headers)
print("POST to url", service.scoring_uri)
print("label:", y)
print("prediction:", resp.text)
# drop the service
service.delete()