# load python
library(reticulate)
use_python('C:/Users/Andrew/Anaconda3/')
use_condaenv(condaenv='my_ml', required=TRUE)
library(knitr)

Introduction

Azure Synapse Analytics is a limitless analytics service that brings together data integration, enterprise data warehousing, and big data analytics. It gives you the freedom to query data on your terms, using either serverless or dedicated resources—at scale. Azure Synapse brings these worlds together with a unified experience to ingest, explore, prepare, manage, and serve data for immediate BI and machine learning needs.

In this demo, we will walk through the procedures needed to create an Azure Synapse environment, load a sample SQL database, copy its data into a data lake, and integrate the result with AzureML, Databricks, and Azure DevOps.

Create an Azure Synapse Environment

In order to create an Azure Synapse environment, several preparatory objects are needed: a Synapse workspace, a storage account, and a file system name.

In the image below, we create all three in a single step.

Initiate Azure Synapse Analytics

We can now launch Azure Synapse Analytics, which looks like the image below:

Create SQL Server and Database

We will first create a basic SQL server and database loaded with AdventureWorks data so that we gain experience setting these up before we extract information from them. When creating the SQL server, ensure that you open the Networking tab and select: Allow Azure services and resources to access this server.

Create AdventureWorks Database

Azure makes it easy to load a SQL database with sample data. Once the SQL server is provisioned, we need to create an Azure SQL Database in a similar fashion. Ensure that on the Additional settings tab, you select Sample from Use existing data.

Giving Synapse Access to the SQL Server

Next, we need to create some specific permissions for Synapse so that it can connect to our SQL database. This requires that we gain access to our SQL server to execute a few commands. There are two ways to do this:

  1. Set a firewall allowance for your IP. On the SQL Database page, select Set server firewall. Find your IP address, specify it here, and hit Save. Now you can select Query editor on the SQL Database page and log in with the username and password you created.

  2. Set yourself as an Azure Active Directory admin over the SQL server. Then return to the SQL Database, select Query editor, and log in with Azure Active Directory.

Query Editor Commands

Now that we have access to our SQL server, we need to execute a few commands to let our Synapse application have Managed Identity access to it.

-- create a contained database user for the Synapse workspace's managed identity
create user [synapseaftemp] from external provider;
-- grant that identity read access to the database
exec sp_addrolemember 'db_datareader', 'synapseaftemp';

This should successfully run. Now we are ready to instantiate a link to our SQL database with Azure Synapse.

Create a Connection to Azure SQL Database

To use a modern data warehousing technology like a Data Lake, we need to fill it with data. Data Lakes store data of any size, shape, and speed, and support all types of processing and analytics across platforms and languages. Synapse can fill our Data Lake with data from just about any source we can imagine, but to do that, we must set up connections between the sources of our data (e.g., SQL tables) and the sink (our Data Lake) so that we can copy the data and have our own versions for analysis.

Our first task is to connect to an Azure SQL Database which has the commonly known AdventureWorks data set already loaded. We take the following steps:

  1. Select the Data tab
  2. Select the Linked tab
  3. Press the + sign
  4. Select Integration dataset
  5. Select Azure
  6. Select Azure SQL Database – the data source we want a connection with
  7. Provide a unique name for the connection; here we use aw_tables
  8. Select + under Linked Service, as it's our first time establishing a connection here
  9. Provide another name for the linked service; here we use sqldatabase_adventureworks
  10. Select the server from subscription (or manually)
  11. Select the Server name: awtempserver
  12. Select the Database name: adventureworks
  13. Select Authentication type: Managed Identity (the SQL commands above gave it this privilege)
  14. Test the connection and create it if successful
  15. Choose no specific table name (unless desired)
  16. Select the Import schema: None

We now have a connection established to our SQL database; select Publish all!

Copy SQL Database Data to Azure Data Lake Storage Gen2

Now we are ready to retrieve data from our SQL database. We proceed with the following steps:

  1. Select the Integrate tab on the left of Synapse
  2. Press the + sign
  3. Select Copy Data tool
  4. Select Built-in copy task
  5. Press Next
  6. For Source data store, select the Connection: sqldatabase_adventureworks
  7. Select the tables we want to retrieve: SalesLT.Address and SalesLT.Customer
  8. Press Next
  9. Apply no filter – Press Next
  10. For Destination data store, select Connection: synapseaftemp-WorkspaceDefaultStorage (our ADLSGen2), Folder path: root/RAW/adventure_works (where the parquet files will be stored), Folder name suffix: replace .txt with .parquet (or the extension of the file you want to use in the next step)
  11. For File format settings, select Parquet from File format, select Next
  12. Specify our data copy pipeline with an interpretable name: aw_datacopy_pipeline
  13. Review the Summary and select Next, and then Finish

Now we can take a peek into our data lake and see that our pipeline successfully extracted our tables and placed them into the root/RAW/adventure_works folder.

Integrate Synapse with AzureML

Generate Service Principal Security

Next, let's see how we can access and analyze the information in our data lake in conjunction with AzureML. At this stage, we assume that you have created an AzureML resource, created a compute cluster to use, and created a new notebook.

Next, we need to create a Service Principal. Service Principals manage authentication and come in handy when we want to automate workflows. This type of authentication decouples the authentication process from any specific user and allows for managed access control. To create a Service Principal, we:

  1. Enter Azure Active Directory from the Azure Portal
  2. Select App registrations
  3. Select new registration
  4. Give it a unique and task familiar name, e.g., AzureML SP CHIECXO
  5. Assign it a single tenant
  6. Select Register

Next, we need to retrieve information of interest from the app registration page:

  1. Application (client) ID
  2. Directory (tenant) ID
  3. Secret

The secret is obtained by selecting Certificates & secrets and generating a new client secret. Save this information, as follows, to a file named .env.

AML_TENANT_ID=Directory (tenant) ID
AML_PRINCIPAL_ID=Application (client) ID
AML_PRINCIPAL_PASS=Secret

We do this to keep this information out of our notebooks. We will store this file on AzureML and load it with the Python package python-dotenv.
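
A minimal sketch of how that load works (python-dotenv is installed in the next section):

# read the .env file in the working directory and expose its entries as environment variables
import os
from dotenv import load_dotenv

load_dotenv()
# the credentials are now available without appearing in the notebook itself
assert all(k in os.environ for k in ('AML_TENANT_ID', 'AML_PRINCIPAL_ID', 'AML_PRINCIPAL_PASS'))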

Ensure AzureML is Updated

First, run these commands in a jupyter notebook:

!pip install --upgrade azureml-core
!pip install --upgrade azureml-pipeline-core
!pip install --upgrade azureml-sdk
!pip install --upgrade azureml-defaults
!pip install python-dotenv
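
The notebook cells in the rest of this walkthrough assume the relevant AzureML classes have been imported. Below is a minimal sketch of an import cell covering everything referenced in the snippets that follow:

# imports assumed by the cells below (all classes come from the azureml-sdk installed above)
import os
from dotenv import load_dotenv
from azureml.core import Workspace, Experiment, Dataset, Datastore
from azureml.core.authentication import ServicePrincipalAuthentication
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.runconfig import RunConfiguration, DEFAULT_CPU_IMAGE
from azureml.core.conda_dependencies import CondaDependencies
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.core import Pipeline
from azureml.pipeline.steps import PythonScriptStep

load_dotenv()  # read the AML_* credentials from the .env file created earlier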

Establish the Service Principal

# instantiate the service principal
sp = ServicePrincipalAuthentication(tenant_id=os.environ['AML_TENANT_ID'],
                                    service_principal_id=os.environ['AML_PRINCIPAL_ID'],
                                    service_principal_password=os.environ['AML_PRINCIPAL_PASS'])

Establish a Workspace

# instantiate a workspace
ws = Workspace(subscription_id = "2c3b88a1-7aa0-4107-b413-d4c701e0afc8",
               resource_group = "rg_chie_training",
               auth=sp,  # use service principal auth
               workspace_name = "training_aml")

print("Found workspace {} at location {}".format(ws.name, ws.location))

Establish a Compute Cluster

# choose a name for your CPU cluster
cpu_cluster_name = "cpu-cluster"

# Verify that cluster does not exist already
try:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    compute_config = AmlCompute.provisioning_configuration(vm_size='Standard_DS3_v2',
                                                            idle_seconds_before_scaledown=400,
                                                            min_nodes=0,
                                                            max_nodes=4)
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)

cpu_cluster.wait_for_completion(show_output=True)

Create Compute Package Dependencies

# create a new runconfig object
aml_run_config = RunConfiguration()

# use the aml_compute you created above. 
aml_run_config.target = cpu_cluster

# set docker base image to the default CPU-based image
aml_run_config.environment.docker.base_image = DEFAULT_CPU_IMAGE

# Use conda_dependencies.yml to create a conda environment in the Docker image for execution
aml_run_config.environment.python.user_managed_dependencies = False

# Specify CondaDependencies obj, add necessary packages
aml_run_config.environment.python.conda_dependencies = CondaDependencies.create(
    conda_packages=['pip', 'pandas', 'numpy', 'python==3.6'], 
    pip_packages=['azureml-core==1.31.0', 'azureml-sdk', 'python-dotenv'])

print("Run configuration created.")

Generate Access to the ADLSGen2 Data Lake

To set up a link to the Synapse ADLSGen2 Data Lake, we need to run the code below, which requires gathering the following pieces of information:

  1. subscription_id: the subscription ID of the ADLSGen2 account found on the Azure portal -> Containers
  2. resource_group: the resource group that is paying for the ADLSGen2 account
  3. account_name: the name of the storage account associated with the ADLSGen2 found on Azure portal -> Containers
  4. tenant_id: the tenant ID of your organization
  5. client_id: the Service Principal client ID. You must go to the Azure portal, click App Registration, and create a new app registration for the ADLSGen2 account. The client ID for this app registration goes here.
  6. client_secret: the secret for the app registration. To create one, go to: Azure Active Directory -> App Registration (for #5 above) -> Certificates & secrets

# set ADLS Gen2 storage account alias in AML
adlsgen2_datastore_name = 'my_lake'

# subscription id of ADLS account
subscription_id=os.getenv("ADL_SUBSCRIPTION", "redacted")
# resource group of ADLS account
resource_group=os.getenv("ADL_RESOURCE_GROUP", "redacted")

# ADLS Gen2 account name
account_name=os.getenv("ADLSGEN2_ACCOUNTNAME", "aflake")
# tenant id of service principal -> Azure Active Directory
tenant_id=os.getenv("ADLSGEN2_TENANT", "redacted")
# client id of service principal -> Azure Active Directory -> App Registration
client_id=os.getenv("ADLSGEN2_CLIENTID", "redacted")
# the secret of service principal -> Azure Active Directory -> App Registration -> Certificates & secrets
client_secret=os.getenv("ADLSGEN2_CLIENT_SECRET", "redacted") 

adlsgen2_datastore = Datastore.register_azure_data_lake_gen2(workspace=ws,
                                                             datastore_name=adlsgen2_datastore_name,
                                                             account_name=account_name, # ADLS Gen2 account name
                                                             filesystem='root', # ADLS Gen2 filesystem
                                                             tenant_id=tenant_id, # tenant id of sp
                                                             client_id=client_id, # client id of sp
                                                             client_secret=client_secret) # the secret of sp

Pipeline Preparation

Notice that the location information below ignores root, the name of the container for the ADLSGen2 account. This is necessary because we already specify the filesystem above in adlsgen2_datastore.

# create a Dataset object from parquet files located on the data lake
my_dataset = Dataset.Tabular.from_parquet_files([(adlsgen2_datastore, '/RAW/adventure_works/SalesLTAddress.parquet')])
# location to write pipeline output to
step1_output_data_adlsgen2 = OutputFileDatasetConfig(name="processed_data",
                                                    destination=(adlsgen2_datastore, "/RAW/out")
                                                    ).as_upload(overwrite=True)

Next, we instantiate a PythonScriptStep, which is the workhorse of an AzureML pipeline.

# pipeline step 1
step1 = PythonScriptStep(
    name="generate_data",
    inputs=[my_dataset.as_named_input('raw_data')],
    script_name="scripts/do_stuff.py",
    arguments=["--save", step1_output_data_adlsgen2],
    runconfig=aml_run_config,
    allow_reuse=False
)

Create do_stuff.py Pipeline Script

To do things to our parquet data, we need to prepare our scripts/do_stuff.py file, located in AzureML, very carefully. Below is a minimal working example:

#
import os
from azureml.core import Run
import argparse
import pandas as pd

# load argparse
# note that --save refers to --save in PythonScriptStep
parser = argparse.ArgumentParser()
parser.add_argument("--save", type=str)
args = parser.parse_args()

# access input data
# note that 'raw_data' refers to my_dataset.as_named_input('raw_data') in PythonScriptStep
run = Run.get_context()
raw_data = run.input_datasets["raw_data"]

# turn the data into a dataframe
df1 = raw_data.to_pandas_dataframe()

# do something to the dataframe here



# finish by writing the processed data to storage
if not (args.save is None):
    os.makedirs(args.save, exist_ok=True)
    print("%s created" % args.save)
    path = args.save + "/processed.parquet"  # file name for the parquet in ADLSGen2
    df1.to_parquet(path)  # write the processed dataframe; it is uploaded to the output destination
    
#

Execute the Pipeline

Now we can execute the pipeline. If successful, we will move on to publish it so we can continually trigger it as our needs require.

# build the pipeline
pipeline1 = Pipeline(workspace=ws, steps=[step1])

# validate the pipeline
pipeline1.validate()

# submit a pipeline run
pipeline_run1 = Experiment(ws, 'run_synapse_data').submit(pipeline1)

# run and wait for completion to check its results
pipeline_run1.wait_for_completion(show_output=True)

Publish the Pipeline

If we want to run this pipeline repeatedly through Synapse or AzureML, we need to publish it. The endpoint below provides the unique identifiers needed to select the pipeline from among the many we create.

# publish the pipeline for use in Synapse
published_pipeline = pipeline_run1.publish_pipeline(
    name='synapse_aml_pipeline',
    description='Endpoint for transforming synapse data and uploading to blob storage',
    version='1.0')

print(published_pipeline.endpoint)
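
If we later need to look this pipeline up again, for example to find its ID for the Synapse settings described below, a minimal sketch using the SDK:

from azureml.pipeline.core import PublishedPipeline

# list every published pipeline in the workspace with its ID and REST endpoint
for p in PublishedPipeline.list(ws):
    print(p.name, p.id, p.endpoint)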

Create a Synapse-AzureML Pipeline

To close the Synapse -> AzureML pipeline loop, we need to establish a link between AzureML and Synapse so we can hook up our pipeline. To do so, we need to go into our AzureML resource and give our Synapse application a contributor role through Access control (IAM). Once complete, we must do the following in Synapse to establish a connection:

  1. Select the Integrate tab on the left of Synapse
  2. Under the Activities column, select Machine Learning and drag Machine Learning Execute Pipeline onto the pipeline space.
  3. Click Machine Learning Execute Pipeline and select the Settings tab.
  4. Establish a new Azure Machine Learning linked service
  5. Test the connection and save.

Run AzureML Pipeline after Synapse Copies Fresh Data to the ADLSGen2

With a connection in hand, we can hook our Synapse pipeline up to AzureML so that, after Synapse brings fresh data into our data lake, AzureML runs some arbitrarily complex processing on it and saves its output back to the lake. To do so, we need to:

  1. Select the Machine Learning Execute Pipeline
  2. Select Settings
  3. For Machine Learning pipeline name, choose our published pipeline name: synapse_aml_pipeline
  4. For Machine learning pipeline ID, choose the most recent result or the value returned from the command above in AzureML: print(published_pipeline.endpoint)
  5. Publish

Trigger the Synapse-AzureML Pipeline

We are now ready to trigger the full pipeline which will:

  1. Copy fresh data from our SQL database and place it into our data lake
  2. Have AzureML read and arbitrarily transform the data, then save it back to the data lake in the out folder

To run this operation, we need to select the Integrate tab on the left and select Add trigger -> Trigger Now.

Connect Azure Data Lake Storage Gen2 to Azure Databricks

Our next logical task is to use our data in analysis with Databricks for parallel computing. To begin this process, we want to create an Azure Databricks workspace through the Azure Services homepage.

With our Databricks workspace created, our next step is to create the security architecture that will allow Databricks to interact with our Data Lake.

Our first step is to create a key vault named dbkvdemo using Azure Key vaults. Next, we access Azure App registrations and create a new registration called db_app. Once the registration is completed, we want to extract a couple of important items from it: the Application (client) ID and the Directory (tenant) ID.

Next, we need to create a Client secret inside our App registration for db_app. We do this by selecting Certificates & secrets in the left pane, clicking + for New client secret, adding the description db_secret, and setting an appropriate expiration. We then want to copy the value for db_secret, which is visible now but will no longer be available after we leave this window: h_tVc.b2Z-Pq-alr~WU56mNm8V00Sez8l6.

Now, we need to return to our Key vaults, where we select our key vault dbkvdemo and select Secrets in the left pane. We then select + Generate/Import, name it dbappkey, and give it the value from App registrations -> Certificates & secrets -> Client secrets -> Value (h_tVc.b2Z-Pq-alr~WU56mNm8V00Sez8l6).

Now, we need to create a Secret Scope in Databricks. We can reach the creation page by opening our Databricks workspace and appending #secrets/createScope to the end of its URL, e.g., https://adb-368901377989901.1.azuredatabricks.net/?o=368901377989901#secrets/createScope. We set our Scope Name to db_scope. Ensure that Manage Principal reads All Users, as the Creator option requires the premium tier. For the DNS Name, we use the value from our Key vault -> Properties -> Vault URI (https://dbkvdemo.vault.azure.net/). The Resource ID is found on the same page under Resource ID and starts with /subscriptions/.
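
Before wiring up the mount, it can be worth confirming from a Databricks notebook that the scope and key are visible; a minimal sketch (the scope and key names are the ones created above):

# confirm the secret scope and key are visible from the Databricks workspace
print(dbutils.secrets.listScopes())       # should include db_scope
print(dbutils.secrets.list("db_scope"))   # should include dbappkey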

With our Secret Scope created, we can now create a new notebook and try accessing our Data Lake. To set up the access, we fill out the following code:

configs = {"fs.azure.account.auth.type": "OAuth",
          "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
          "fs.azure.account.oauth2.client.id": "<application-id>",
          "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="<scope-name>",key="<service-credential-key-name>"),
          "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<directory-id>/oauth2/token"}

# Optionally, you can add <directory-name> to the source URI of your mount point.
dbutils.fs.mount(
  source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/",
  mount_point = "/mnt/<mount-name>",
  extra_configs = configs)

Where we replace the placeholder fields with our own values:

  1. <application-id>: the Application (client) ID of the db_app registration
  2. <scope-name>: db_scope, the Secret Scope we just created
  3. <service-credential-key-name>: dbappkey, the name of the secret in our key vault
  4. <directory-id>: the Directory (tenant) ID of the db_app registration
  5. <container-name>: root, the ADLSGen2 filesystem created with our Synapse workspace
  6. <storage-account-name>: the name of our ADLSGen2 storage account
  7. <mount-name>: a name of our choosing for the mount point

Lastly, we need to grant some access to our Data Lake. To do so, navigate to Azure Storage accounts and select Access Control (IAM). Next, we select Add role assignment and choose Storage Blob Data Contributor, because we want to read and write data through our Databricks application. We need to assign this privilege to a member, so we click + Select members and, in the search box, enter the name of the App registration we made earlier: db_app. We then review and assign the role. It may take a few minutes for the assignment to take effect.

Now we can inject all of this data into our Databricks notebook like so:
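
A minimal sketch of what that notebook cell might look like, assuming the placeholder values above were filled in and the mount point was named datalake (a hypothetical name):

# list the files the Synapse pipeline copied into the lake
display(dbutils.fs.ls("/mnt/datalake/RAW/adventure_works"))

# read one of the copied tables into a Spark DataFrame
df_address = spark.read.parquet("/mnt/datalake/RAW/adventure_works/SalesLTAddress.parquet")
display(df_address)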

We can now see that we have access to the parquet files located in our Data Lake, and we can proceed with cleaning our data.

Setup Azure DevOps in Synapse for Git control

Lastly, we want to set up Azure DevOps inside Synapse for a few reasons. First, we want to put our work under version control, and second, we want to back up our work.

Our first step is to access Azure DevOps (https://dev.azure.com/), which is hosted outside of the usual Azure services cloud portal. Inside DevOps, we want to create a new project called synapse_demo and initialize a repo.

Next, in Azure Synapse, we want to select the Manage tab on the left, select Git configuration, and connect to our recently created repo on DevOps. For the collaboration branch, we should create a new branch called dev (unlike the picture below) to separate our work from main.

We now have version control working in Synapse. Next, we hit Publish at the top to send our work to DevOps, and we will notice that Synapse has pushed our pipelines and integration dataset connections to the dev branch. Our work is now backed up.

Copy Data From Kusto Servers to Data Lake

Within Microsoft, Kusto servers are often used to store and retrieve data. Yet extracting bulk data from Kusto servers can be tricky, owing to their file size and memory limitations. The query below, set in the Source section of our copy data tool, lets us extract as much information as we desire and copy it to our data lake.

// allow the query to fan out across up to 50% of the cluster's nodes
set query_fanout_nodes_percent = 50;
// define the time window to extract
let startTime = ago(3d);
let endTime = ago(0d);
cluster("kusto.windows.net").database("db1").nodecpuinfo
| where TIMESTAMP between (startTime .. endTime)
| project TIMESTAMP, Var1, Var2, Var3
| take 1750000;  // cap the number of rows copied in this run