If your organization doesn’t have enough data to require Azure SQL Warehouse with Polybase loading from data lakes, you might have observed that loading much data with Azure SQL databases can take some time. Today I show an example of how to use Databricks delta together with stored procedures to speed this up.
The case for today is the Norwegian public registry of enterprises, which is offering a publicly available dataset of about one million rows. Not the biggest dataset, but big enough to make a difference, and not so big that I have to scale up my Azure environment… The API is offering both a full data dump, and a update service where you can get updated entities.
The setup for loading the files are quite simple. I set a timestamp for when to look for updates and use a web activity to get the updated entities. I get a list here of all of them, so afterwards I have to loop through each one to collect the actual values through a copy activity. This copy activity uses the timestamp so that I can filter later in my Databricks notebook.
dbutils.secrets.get(scope = “keyvault-datahelge”, key = “adlsAccessKey”) )
val path = “abfss://firstname.lastname@example.org/delta/brreg_delta_” + dbutils.widgets.get(“date”) + “_*.json”
val inputDf = spark.read
Reading in data in Databricks with filter on date
CREATE OR REPLACE TEMPORARY VIEW tMainUnits ASCreating a view with English column names, and setting version by using Databricks delta
organisasjonsnummer AS OrganizationNumber,
— and more columns here
“Delta” AS Change,
current_timestamp as InsertTimestamp,
current_timestamp as UpdateTimestamp,
b.version AS Version
FROM inputJSON AS a
CROSS JOIN (select max(version)+1 as version from (describe history BrregUnits)) AS b;
MERGE INTO BrregUnitsMerging data in databricks
USING tMainUnits AS updates
ON BrregUnits.OrganizationNumber == updates.OrganizationNumber
OrganizationName = updates.OrganizationName,
Change = updates.Change,
UpdateTimestamp = updates.UpdateTimestamp,
Version = updates.Version
WHEN NOT MATCHED
%scalaOutputting only changed data to parquet
val outputdelta = spark.sql(“select * from brregunits where version = (select max(version) as version from (describe history BrregUnits))”)
val pathdelta = “abfss://email@example.com/processed/Delta”
The notebook is quite simple. The magic is done with the merge statement where I update or insert records. I have selected to include a column that will have the version in Databricks delta that these records belong to; (select max(version)+1 as version from (describe history BrregUnits)). This means that in the end of the script I can output the rows with the latest version to a parquet file, and this will again be my source file for Azure Data Factory.
You can find the documentation for setting up this type of Azure SQL sink in the documentation.
So, what were the results?
– Loading initial data took nearly 8 minutes, and gave me a recommendation to scale up my database
– Loading delta data only takes a few seconds
This was no way a scientific performance test, but it shows how Databricks delta can be combined to speed up some data loads.