Using Databricks delta to speed up Azure SQL load

If your organization doesn’t have enough data to require Azure SQL Warehouse with Polybase loading from data lakes, you might have observed that loading much data with Azure SQL databases can take some time. Today I show an example of how to use Databricks delta together with stored procedures to speed this up.

The case for today is the Norwegian public registry of enterprises, which is offering a publicly available dataset of about one million rows. Not the biggest dataset, but big enough to make a difference, and not so big that I have to scale up my Azure environment… The API is offering both a full data dump, and a update service where you can get updated entities.

Loading the delta files.

The setup for loading the files are quite simple. I set a timestamp for when to look for updates and use a web activity to get the updated entities. I get a list here of all of them, so afterwards I have to loop through each one to collect the actual values through a copy activity. This copy activity uses the timestamp so that I can filter later in my Databricks notebook.

%scala
spark.conf.set(
“fs.azure.account.key.storagedatahelge.dfs.core.windows.net”,
dbutils.secrets.get(scope = “keyvault-datahelge”, key = “adlsAccessKey”) )


val path = “abfss://brreg@storagedatahelge.dfs.core.windows.net/delta/brreg_delta_” + dbutils.widgets.get(“date”) + “_*.json”
val inputDf = spark.read
.json(path)


inputDf.createOrReplaceTempView(“inputJSON”)

Reading in data in Databricks with filter on date

CREATE OR REPLACE TEMPORARY VIEW tMainUnits AS
SELECT
organisasjonsnummer AS OrganizationNumber,
— and more columns here
“Delta” AS Change,
current_timestamp as InsertTimestamp,
current_timestamp as UpdateTimestamp,
b.version AS Version
FROM inputJSON AS a
CROSS JOIN (select max(version)+1 as version from (describe history BrregUnits)) AS b;

Creating a view with English column names, and setting version by using Databricks delta

MERGE INTO BrregUnits
USING tMainUnits AS updates
ON BrregUnits.OrganizationNumber == updates.OrganizationNumber
WHEN MATCHED
THEN
UPDATE SET
OrganizationName = updates.OrganizationName,
Change = updates.Change,
UpdateTimestamp = updates.UpdateTimestamp,
Version = updates.Version
WHEN NOT MATCHED
THEN
INSERT (OrganizationNumber,…,Change,InsertTimestamp,UpdateTimestamp,Version)
VALUES
(updates.OrganizationNumber,…,updates.Change,updates.InsertTimestamp,updates.UpdateTimestamp,updates.Version)

Merging data in databricks

%scala
val outputdelta = spark.sql(“select * from brregunits where version = (select max(version) as version from (describe history BrregUnits))”)
val pathdelta = “abfss://brreg@storagedatahelge.dfs.core.windows.net/processed/Delta”
outputdelta
.write.format(“parquet”)
.mode(“overwrite”)
.save(pathdelta)

Outputting only changed data to parquet

The notebook is quite simple. The magic is done with the merge statement where I update or insert records. I have selected to include a column that will have the version in Databricks delta that these records belong to; (select max(version)+1 as version from (describe history BrregUnits)). This means that in the end of the script I can output the rows with the latest version to a parquet file, and this will again be my source file for Azure Data Factory.

Writing to Azure SQL Database with a stored procedure

You can find the documentation for setting up this type of Azure SQL sink in the documentation.

So, what were the results?
– Loading initial data took nearly 8 minutes, and gave me a recommendation to scale up my database
– Loading delta data only takes a few seconds

This was no way a scientific performance test, but it shows how Databricks delta can be combined to speed up some data loads.

Leave a Reply

Your email address will not be published.

This site uses Akismet to reduce spam. Learn how your comment data is processed.