```r
# load python
library(reticulate)
use_python('C:/Users/Andrew/Anaconda3/')
use_condaenv(condaenv='my_ml', required=TRUE)
library(knitr)
```
Azure Synapse Analytics is a limitless analytics service that brings together data integration, enterprise data warehousing, and big data analytics. It gives you the freedom to query data on your terms, using either serverless or dedicated resources—at scale. Azure Synapse brings these worlds together with a unified experience to ingest, explore, prepare, manage, and serve data for immediate BI and machine learning needs.
In this demo, we will walk through the necessary procedures to:

1. Create an Azure Synapse workspace and link it to an Azure SQL Database
2. Copy SQL tables into an ADLS Gen2 Data Lake as parquet files
3. Process that data with an AzureML pipeline and publish the pipeline for reuse
4. Access the Data Lake from Azure Databricks
5. Back up our Synapse work with Azure DevOps

In order to create an Azure Synapse environment, several preparatory objects are needed: a Synapse workspace, a storage account, and a file system name. In the image below, we create all three in one single effort.
We can now launch Azure Synapse Analytics, which looks like the image below:
We will first create a basic SQL server and database with AdventureWorks data so that we can gain experience setting one up before we extract information from it. When creating the SQL server, ensure that you access the `Networking` tab and select `Allow Azure services and resources to access this server`.

Azure makes it easy to load a SQL database with sample data. Once the SQL server is provisioned, we need to create an Azure SQL Database in a similar fashion. Ensure that on the `Additional settings` tab, you select `Sample` from `Use existing data`.
Next, we need to create some specific permissions for Synapse so that it can connect to our SQL database. This requires that we gain access to our SQL server to execute a few commands. There are two ways to do this:

1. Select `Set server firewall` on the SQL server page. Find your IP address and specify it here; hit `Save`. Now you can select `Query editor` on the SQL Database page and log in with the username and password you instantiated.
2. Select `Query editor` and log in with Azure Active Directory.

Now that we have access to our SQL server, we need to execute a few commands to give our Synapse application `Managed Identity` access to it.
```sql
create user [synapseaftemp] from external provider;
exec sp_addrolemember 'db_datareader', 'synapseaftemp';
```
These commands should run successfully. Now we are ready to instantiate a link to our SQL database with Azure Synapse.
To use a modern data warehousing technology like a Data Lake, we need to fill it with data. Data Lakes store data of any size, shape, and speed, and support all types of processing and analytics across platforms and languages. Synapse can fill our Data Lake with data from just about any source we can imagine, but to do that, we must set up connections between the sources of our data (e.g., SQL tables) and the sink (our Data Lake) so that we can copy the data and have our own versions for analysis.
Our first task is to connect to an Azure SQL Database which has the commonly known AdventureWorks data set already loaded. We take the following steps:

1. Select the `Data` tab
2. Select the `Linked` tab
3. Select the `+` sign
4. Select `Integration dataset`
5. Select `Azure`, then `Azure SQL Database` (the data source we want a connection with)
6. Name the dataset `aw_tables`
7. Select `+` under `Linked service`, as it's our first time establishing a connection here
8. Name the linked service `sqldatabase_adventureworks`
9. `Server name`: `awtempserver`
10. `Database name`: `adventureworks`
11. `Authentication type`: `Managed Identity` (the SQL commands above granted this privilege)
12. `Import schema`: `None`

We now have a connection established to our SQL database; select `Publish all`!
Now we are ready to retrieve data from our SQL database. We proceed with the following steps:

1. Select the `Integrate` tab on the left of Synapse
2. Select the `+` sign
3. Select `Copy Data tool`
4. Select `Built-in copy task`, then `Next`
5. On `Source data store`, select the `Connection`: `sqldatabase_adventureworks`
6. Select the tables `SalesLT.Address` and `SalesLT.Customer`, then `Next`
7. Select `Next`
8. On `Destination data store`, select `Connection`: `synapseaftemp-WorkspaceDefaultStorage` (our ADLSGen2), `Folder path`: `root/RAW/adventure_works` (where the parquet files will be stored), and `Folder name suffix`: replace `.txt` with `.parquet` (or the extension of the file you want to use in the next step)
9. On `File format settings`, select `Parquet` from `File format`, then `Next`
10. Name the pipeline `aw_datacopy_pipeline`
11. Review the `Summary`, select `Next`, and then `Finish`

Now we can take a peek into our data lake and see that our pipeline successfully extracted our tables and placed them into the `root/RAW/adventure_works` folder.
Next, let's see how we can access and analyze the information in our data lake in conjunction with AzureML. At this stage, we assume that you have created an AzureML resource, created a compute cluster to use, and created a new notebook.

Next, we need to create a Service Principal. Service Principals manage authentication and come in handy when we want to automate workflows. This type of authentication decouples the authentication process from any specific user and allows for managed access control. To create a Service Principal, we:

1. Go to `App registrations` in the Azure portal
2. Select `New registration`
3. Give it a name and select `Register`
Next, we need to retrieve information of interest from the app registration page: the directory (tenant) ID, the application (client) ID, and a secret. The secret is obtained by selecting `Certificates & secrets` and generating a new client secret. Save this information, as follows, to a file named `.env`.

```
AML_TENANT_ID=Directory (tenant) ID
AML_PRINCIPAL_ID=Application (client) ID
AML_PRINCIPAL_PASS=Secret
```

We do this in order to hide this information away from our notebooks. We will store this file on AzureML and load it with the Python package `python-dotenv`.
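To make the `.env` mechanism concrete, here is a minimal stdlib-only sketch of what `python-dotenv`'s `load_dotenv()` does; the file name and tenant ID below are hypothetical stand-ins:

```python
import os

# Minimal sketch of load_dotenv(): read KEY=VALUE lines from a .env
# file into os.environ (skipping blanks and comments).
def load_env_file(path=".env"):
    with open(path) as f:
        for line in f:
            line = line.strip()
            if line and not line.startswith("#") and "=" in line:
                key, _, value = line.partition("=")
                os.environ.setdefault(key.strip(), value.strip())

# demo: write a .env like the one above, then load it
with open(".env", "w") as f:
    f.write("AML_TENANT_ID=00000000-0000-0000-0000-000000000000\n")
load_env_file()
print(os.environ["AML_TENANT_ID"])
```

In practice, `from dotenv import load_dotenv; load_dotenv()` replaces all of this.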
First, run these commands in a Jupyter notebook:

```python
!pip install --upgrade azureml-core
!pip install --upgrade azureml-pipeline-core
!pip install --upgrade azureml-sdk
!pip install --upgrade azureml-defaults
!pip install python-dotenv
```
```python
import os
from dotenv import load_dotenv
from azureml.core import Workspace
from azureml.core.authentication import ServicePrincipalAuthentication

# load the credentials stored in .env
load_dotenv()

# instantiate the service principal
sp = ServicePrincipalAuthentication(tenant_id=os.environ['AML_TENANT_ID'],
                                    service_principal_id=os.environ['AML_PRINCIPAL_ID'],
                                    service_principal_password=os.environ['AML_PRINCIPAL_PASS'])

# instantiate a workspace
ws = Workspace(subscription_id="2c3b88a1-7aa0-4107-b413-d4c701e0afc8",
               resource_group="rg_chie_training",
               workspace_name="training_aml",
               auth=sp)  # use service principal auth
print("Found workspace {} at location {}".format(ws.name, ws.location))
```
```python
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.compute_target import ComputeTargetException

# choose a name for your CPU cluster
cpu_cluster_name = "cpu-cluster"

# verify that the cluster does not already exist
try:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    compute_config = AmlCompute.provisioning_configuration(vm_size='Standard_DS3_v2',
                                                           idle_seconds_before_scaledown=400,
                                                           min_nodes=0,
                                                           max_nodes=4)
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
cpu_cluster.wait_for_completion(show_output=True)
```
```python
from azureml.core.runconfig import RunConfiguration, DEFAULT_CPU_IMAGE
from azureml.core.conda_dependencies import CondaDependencies

# create a new runconfig object
aml_run_config = RunConfiguration()

# use the aml_compute you created above
aml_run_config.target = cpu_cluster

# set docker base image to the default CPU-based image
aml_run_config.environment.docker.base_image = DEFAULT_CPU_IMAGE

# let AzureML manage the conda environment in the Docker image
aml_run_config.environment.python.user_managed_dependencies = False

# specify a CondaDependencies object and add the necessary packages
aml_run_config.environment.python.conda_dependencies = CondaDependencies.create(
    conda_packages=['pip', 'pandas', 'numpy', 'python==3.6'],
    pip_packages=['azureml-core==1.31.0', 'azureml-sdk', 'python-dotenv'])
print("Run configuration created.")
```
To set up a link to the Synapse ADLSGen2 Data Lake, we need to instantiate the code below and retrieve several pieces of information:

- `subscription_id`: the subscription ID of the ADLSGen2 account, found on the Azure portal -> Containers
- `resource_group`: the resource group that is paying for the ADLSGen2 account
- `account_name`: the name of the storage account associated with the ADLSGen2, found on the Azure portal -> Containers
- `tenant_id`: the tenant ID of your organization
- `client_id`: the Service Principal client ID. You must go to the Azure portal, click App registrations, and create a new app registration for the ADLSGen2 account. The client ID for this app registration goes here.
- `client_secret`: the secret for the app registration. To create one, go to: Azure Active Directory -> App registrations (for the `client_id` registration above) -> Certificates & secrets

```python
import os
from azureml.core import Datastore

# set ADLS Gen2 storage account alias in AML
adlsgen2_datastore_name = 'my_lake'

# subscription id of ADLS account
subscription_id = os.getenv("ADL_SUBSCRIPTION", "redacted")
# resource group of ADLS account
resource_group = os.getenv("ADL_RESOURCE_GROUP", "redacted")
# ADLS Gen2 account name
account_name = os.getenv("ADLSGEN2_ACCOUNTNAME", "aflake")
# tenant id of service principal -> Azure Active Directory
tenant_id = os.getenv("ADLSGEN2_TENANT", "redacted")
# client id of service principal -> Azure Active Directory -> App Registration
client_id = os.getenv("ADLSGEN2_CLIENTID", "redacted")
# the secret of service principal -> App Registration -> Certificates & secrets
client_secret = os.getenv("ADLSGEN2_CLIENT_SECRET", "redacted")

adlsgen2_datastore = Datastore.register_azure_data_lake_gen2(workspace=ws,
                                                             datastore_name=adlsgen2_datastore_name,
                                                             account_name=account_name,  # ADLS Gen2 account name
                                                             filesystem='root',          # ADLS Gen2 filesystem
                                                             tenant_id=tenant_id,        # tenant id of sp
                                                             client_id=client_id,        # client id of sp
                                                             client_secret=client_secret)  # the secret of sp
```
Notice that the location information below ignores `root`, the name of the container for the ADLSGen2 account. This is necessary because we already specified the filesystem above in `adlsgen2_datastore`.
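To see why the container name is dropped, here is a hypothetical helper (not part of the AzureML SDK) showing how the datastore's filesystem and the relative dataset path compose into the full ADLS Gen2 URI:

```python
# Hypothetical illustration: the datastore already carries the
# filesystem ("root"), so dataset paths are given relative to it.
def abfss_uri(account, filesystem, path):
    return f"abfss://{filesystem}@{account}.dfs.core.windows.net{path}"

print(abfss_uri("aflake", "root", "/RAW/adventure_works/SalesLTAddress.parquet"))
# abfss://root@aflake.dfs.core.windows.net/RAW/adventure_works/SalesLTAddress.parquet
```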
```python
from azureml.core import Dataset
from azureml.data import OutputFileDatasetConfig

# create a Dataset object from parquet files located on the data lake
my_dataset = Dataset.Tabular.from_parquet_files(
    [(adlsgen2_datastore, '/RAW/adventure_works/SalesLTAddress.parquet')])

# location to write pipeline output to
step1_output_data_adlsgen2 = OutputFileDatasetConfig(name="processed_data",
                                                     destination=(adlsgen2_datastore, "/RAW/out")
                                                     ).as_upload(overwrite=True)
```
Next, we instantiate a `PythonScriptStep`, which is the workhorse of an AzureML pipeline.
```python
from azureml.pipeline.steps import PythonScriptStep

# pipeline step 1
step1 = PythonScriptStep(
    name="generate_data",
    inputs=[my_dataset.as_named_input('raw_data')],
    script_name="scripts/do_stuff.py",
    arguments=["--save", step1_output_data_adlsgen2],
    runconfig=aml_run_config,
    allow_reuse=False
)
```
To do things to our parquet data, we need to prepare our `scripts/do_stuff.py` file located in AzureML very carefully. Below is a minimal working example:
```python
import argparse
import os

import pandas as pd
from azureml.core import Run

# load argparse
# note that --save refers to --save in PythonScriptStep
parser = argparse.ArgumentParser()
parser.add_argument("--save", type=str)
args = parser.parse_args()

# access input data
# note that 'raw_data' refers to my_dataset.as_named_input('raw_data') in PythonScriptStep
run = Run.get_context()
raw_data = run.input_datasets["raw_data"]

# turn the data into a dataframe
df1 = raw_data.to_pandas_dataframe()

# do something to the dataframe here

# finish by writing the processed data to storage
if args.save is not None:
    os.makedirs(args.save, exist_ok=True)
    print("%s created" % args.save)
    path = args.save + "/processed.parquet"  # file name for the parquet in ADLSGen2
    df1.to_parquet(path)
```
Now we can execute the pipeline. If successful, we will move on to publish it so we can continually trigger it as our needs require.
```python
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

# build the pipeline
pipeline1 = Pipeline(workspace=ws, steps=[step1])

# validate the pipeline
pipeline1.validate()

# submit a pipeline run
pipeline_run1 = Experiment(ws, 'run_synapse_data').submit(pipeline1)

# run and wait for completion to check its results
pipeline_run1.wait_for_completion(show_output=True)
```
If we want to run this pipeline repeatedly through Synapse or AzureML, we need to publish it. The endpoint below provides the unique identifiers needed to select the pipeline from among the many we create.
```python
# publish the pipeline for use in Synapse
published_pipeline = pipeline_run1.publish_pipeline(
    name='synapse_aml_pipeline',
    description='Endpoint for transforming synapse data and uploading to blob storage',
    version='1.0')
print(published_pipeline.endpoint)
```
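A published pipeline can also be triggered over plain HTTPS: POST to the endpoint with an AAD bearer token and an experiment name. The sketch below only builds the request; the endpoint URL and token are hypothetical placeholders, and actually sending it would use a library such as `requests`:

```python
# Build the HTTP request that triggers a published AzureML pipeline.
# The endpoint and token below are placeholders for illustration.
def build_trigger_request(endpoint, bearer_token, experiment_name):
    return {
        "url": endpoint,
        "headers": {"Authorization": "Bearer " + bearer_token,
                    "Content-Type": "application/json"},
        "json": {"ExperimentName": experiment_name},
    }

req = build_trigger_request("https://example.invalid/pipeline-endpoint",
                            "<token>", "run_synapse_data")
print(req["json"])  # {'ExperimentName': 'run_synapse_data'}
```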
To close the Synapse -> AzureML pipeline loop, we need to establish a link between AzureML and Synapse so we can hook up our pipeline. To do so, we need to go into our AzureML resource and give our Synapse application a `Contributor` role through `Access control (IAM)`. Once complete, we must do the following in Synapse to establish a connection:

1. Select the `Integrate` tab on the left of Synapse
2. In the `Activities` column, select `Machine Learning` and drag `Machine Learning Execute Pipeline` onto the pipeline space
3. Select `Machine Learning Execute Pipeline` and select the `Settings` tab

With a connection in hand, we can now connect our Synapse pipeline to AzureML such that after Synapse brings fresh data into our data lake, we can have AzureML run some arbitrarily complex processing on it and save its output back to our data lake. To do so, we need to:
1. Select `Machine Learning Execute Pipeline`, then `Settings`
2. Under `Machine Learning pipeline name`, choose our published pipeline name: `synapse_aml_pipeline`
3. Under `Machine learning pipeline ID`, choose the most recent result, or the value returned from the earlier AzureML command `print(published_pipeline.endpoint)`

We are now ready to trigger the full pipeline, which will copy fresh data from our SQL database into the data lake, run our AzureML processing on it, and write the results to the `out` folder. To run this operation, we need to select the `Integrate` tab on the left and select `Add trigger` -> `Trigger Now`.
Our next logical task is to use our data in analysis with Databricks for parallel computing. To begin this process, we want to create an Azure Databricks workspace through the Azure Services homepage.

With our Databricks instance created, our next step is to create the security architecture that will allow Databricks to interact with our Data Lake.
Our first step is to create a key vault named `dbkvdemo` using Azure `Key vaults`. Next, we access Azure `App registrations` and create a new registration called `db_app`. We want to extract a couple of important items from our registration once it is completed: the application (client) ID and the directory (tenant) ID.

Next, we need to create a client secret inside our `App registrations` entry for `db_app`. We do this by selecting `Certificates & secrets` in the left pane, clicking `+` for `New client secret`, adding a description `db_secret`, and setting an appropriate expiration. We then want to copy the value for `db_secret`, which should now be visible and will no longer be available after we leave this window: h_tVc.b2Z-Pq-alr~WU56mNm8V00Sez8l6.
Now, we need to return to our `Key vaults`, where we select our key vault `dbkvdemo` and select `Secrets` on the left pane. We then select `+ Generate/Import`, give it a name, `dbappkey`, and give it the value from `App registrations` -> `Certificates & secrets` -> `Client secrets` -> `Value` (h_tVc.b2Z-Pq-alr~WU56mNm8V00Sez8l6).
Now, we need to create a `Secret Scope` in Databricks. We can access that by opening our Databricks connection and editing its URL, like so: https://adb-368901377989901.1.azuredatabricks.net/?o=368901377989901, such that we append #secrets/createScope to the end. We set our `ScopeName` to `db_scope`. Ensure that `Manage Principal` reads `All Users`, as `Creator` requires the premium subscription. For the `DNS Name`, we can get that information from opening our `Key vault` -> `Properties` -> `Vault URI` (https://dbkvdemo.vault.azure.net/). The `Resource ID` is found on the same page under `Resource ID` and starts with `/subscriptions/`.
With our `Secret Scope` created, we can now create a new notebook and try accessing our Data Lake. To instantiate the access, we want to fill out the following code:
```python
configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<application-id>",
           "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="<scope-name>", key="<service-credential-key-name>"),
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<directory-id>/oauth2/token"}

# Optionally, you can add <directory-name> to the source URI of your mount point.
dbutils.fs.mount(
    source="abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/",
    mount_point="/mnt/<mount-name>",
    extra_configs=configs)
```
Here we replace the following fields with this data:

- `<application-id>`: the application (client) ID value found in our `App registrations`
- `<scope-name>`: our Databricks `Secret Scope` name: `db_scope`
- `<service-credential-key-name>`: the name of the secret created under `Secrets` in Azure `Key vaults`; in our case, `dbappkey`
- `<directory-id>`: the directory (tenant) ID found in our `App registrations`
- `<container-name>`: the name of our container for our Data Lake: `demolake` (found in Azure `Storage accounts`)
- `<storage-account-name>`: the name of the Azure storage account: `demoaf`
- `<mount-name>`: we will remove this and mount at the base of our Data Lake

Lastly, we need to provide some access to our Data Lake. To do so, navigate to the Azure `Storage accounts` and select `Access Control (IAM)`. Next, we select `Add role assignment` and choose `Storage Blob Data Contributor`, because we want to read and write data through our Databricks application. We need to assign this privilege to a member, so we click `+ Select members` and, in the search box, enter the `App registration` name we made earlier in this process: `db_app`. We then review and assign the assignment. It may take a few minutes for the security to take effect.
Now we can inject all of this data into our Databricks notebook, like so:

And we can see that we now have access to the parquet files located in our Data Lake, and we can proceed with cleaning our data.
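Because we mount at the base of the lake, container paths map one-to-one onto `/mnt` paths. A hypothetical helper (not a Databricks API) illustrating the mapping:

```python
# Hypothetical illustration: map a path inside the demolake container
# to where it appears once the container is mounted at /mnt.
def mount_path(lake_path, mount_point="/mnt"):
    return mount_point + lake_path

print(mount_path("/RAW/adventure_works/SalesLTAddress.parquet"))
# /mnt/RAW/adventure_works/SalesLTAddress.parquet
```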
Lastly, we want to create and set up Azure DevOps inside Synapse for a few reasons: first, to instantiate version control, and second, to back up our work.

Our first step is to access Azure DevOps (https://dev.azure.com/), which is hosted outside of the usual Azure services cloud portal. Inside DevOps, we want to create a new project called `synapse_demo` and initiate a repo.
Next, in Azure Synapse, we want to select the `Manage` tab on the left, select `Git configuration`, and connect to our recently created repo on DevOps. For the `collaboration_branch`, we should create a new branch called `dev` (unlike the picture below) to separate our work from `main`.

We now have version control working in Synapse. Next, we will hit `Publish` at the top to send our work to DevOps, and we will notice that in the `dev` branch, Synapse has pushed our pipelines and integrated dataset connections to DevOps. Our work is now backed up.
Within Microsoft, Kusto servers are often used to store and retrieve data. Yet extracting bulk data from Kusto servers can be tricky, owing to their limits on result sizes and memory. The code below, set in the `Source` section of our copy data tool, lets us extract any amount of information we desire and copy it to our data lake.

```kusto
set query_fanout_nodes_percent = 50;
let startTime = ago(3d);
let endTime = ago(0d);
cluster("kusto.windows.net").database("db1").nodecpuinfo
| where TIMESTAMP between (startTime .. endTime)
| project TIMESTAMP, Var1, Var2, Var3
| take 1750000;
```
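If a single `take` still runs into service limits, one common workaround is to split the extraction window into smaller slices and run one copy per slice. Below is a hypothetical Python helper (the table and column names mirror the query above) that generates per-day KQL queries:

```python
from datetime import date, timedelta

# Hypothetical helper: split a large extraction window into one KQL
# query per day, so each result stays under Kusto's size limits.
def daily_queries(start, end, table="nodecpuinfo"):
    queries = []
    day = start
    while day < end:
        nxt = day + timedelta(days=1)
        queries.append(
            f"{table} | where TIMESTAMP between (datetime({day}) .. datetime({nxt})) "
            "| project TIMESTAMP, Var1, Var2, Var3"
        )
        day = nxt
    return queries

qs = daily_queries(date(2021, 6, 1), date(2021, 6, 4))
print(len(qs))  # 3
```

Each generated query can then be dropped into the copy data tool's `Source` section (or parameterized in a ForEach activity) in place of the single large `take`.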