Using Databricks Delta to speed up Azure SQL load

If your organization doesn’t have enough data to require Azure SQL Data Warehouse with PolyBase loading from a data lake, you might have observed that loading large amounts of data into an Azure SQL Database can take some time. Today I show an example of how to use Databricks Delta together with stored procedures to speed this up.

The case for today is the Norwegian public registry of enterprises, which offers a publicly available dataset of about one million rows. Not the biggest dataset, but big enough to make a difference, and not so big that I have to scale up my Azure environment… The API offers both a full data dump and an update service where you can get the updated entities.

Loading the delta files

The setup for loading the files is quite simple. I set a timestamp for when to look for updates and use a web activity to get the updated entities. This gives me a list of all of them, so afterwards I loop through each one and collect the actual values with a copy activity. The copy activity puts the timestamp in the output file name so that I can filter on the date later in my Databricks notebook.

%scala
spark.conf.set(
  "fs.azure.account.key.storagedatahelge.dfs.core.windows.net",
  dbutils.secrets.get(scope = "keyvault-datahelge", key = "adlsAccessKey"))

val path = "abfss://brreg@storagedatahelge.dfs.core.windows.net/delta/brreg_delta_" + dbutils.widgets.get("date") + "_*.json"
val inputDf = spark.read.json(path)

inputDf.createOrReplaceTempView("inputJSON")

Reading in data in Databricks with filter on date
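
The view and merge below assume that the BrregUnits Delta table already exists from an initial load of the full data dump. Here is a minimal sketch of how that table could have been created; the path under /full/, the 'Full' change label and the column list are my assumptions, not taken from the original notebook.

-- Sketch only: assumed location of the full dump and assumed initial values
CREATE TEMPORARY VIEW fullDump
USING json
OPTIONS (path "abfss://brreg@storagedatahelge.dfs.core.windows.net/full/*.json");

CREATE TABLE BrregUnits
USING DELTA
AS SELECT
  organisasjonsnummer AS OrganizationNumber,
  -- and more columns here
  'Full' AS Change,
  current_timestamp AS InsertTimestamp,
  current_timestamp AS UpdateTimestamp,
  0 AS Version
FROM fullDump;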

CREATE OR REPLACE TEMPORARY VIEW tMainUnits AS
SELECT
  organisasjonsnummer AS OrganizationNumber,
  -- and more columns here
  'Delta' AS Change,
  current_timestamp AS InsertTimestamp,
  current_timestamp AS UpdateTimestamp,
  b.version AS Version
FROM inputJSON AS a
CROSS JOIN (SELECT max(version) + 1 AS version FROM (DESCRIBE HISTORY BrregUnits)) AS b;

Creating a view with English column names, and setting the version by using Databricks Delta

MERGE INTO BrregUnits
USING tMainUnits AS updates
ON BrregUnits.OrganizationNumber == updates.OrganizationNumber
WHEN MATCHED
THEN
UPDATE SET
OrganizationName = updates.OrganizationName,
Change = updates.Change,
UpdateTimestamp = updates.UpdateTimestamp,
Version = updates.Version
WHEN NOT MATCHED
THEN
INSERT (OrganizationNumber,…,Change,InsertTimestamp,UpdateTimestamp,Version)
VALUES
(updates.OrganizationNumber,…,updates.Change,updates.InsertTimestamp,updates.UpdateTimestamp,updates.Version)

Merging data in Databricks
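
If you want to check which Delta version the merge actually produced, and that it matches the version number written into tMainUnits, you can look at the table history right after the merge; a small check like this should do:

-- The newest history entry should be the MERGE operation, with the version used above
DESCRIBE HISTORY BrregUnits LIMIT 1;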

%scala
val outputdelta = spark.sql("select * from brregunits where version = (select max(version) as version from (describe history BrregUnits))")
val pathdelta = "abfss://brreg@storagedatahelge.dfs.core.windows.net/processed/Delta"
outputdelta
  .write.format("parquet")
  .mode("overwrite")
  .save(pathdelta)

Outputting only changed data to parquet

The notebook is quite simple. The magic is done with the merge statement, where I update or insert records. I have chosen to include a column that holds the Databricks Delta version these records belong to: (select max(version)+1 as version from (describe history BrregUnits)). This means that at the end of the script I can output the rows with the latest version to a parquet file, which in turn becomes my source file for Azure Data Factory.
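
An alternative to carrying a Version column would be to diff the latest table version against the previous one with Delta time travel, something like the sketch below (not from the post; 11 and 12 are placeholder version numbers). The Version column used in the notebook has the advantage that the output query never needs to know the previous version number.

-- Sketch of a time travel alternative; 11 and 12 are placeholders for previous/latest version
SELECT * FROM BrregUnits VERSION AS OF 12
EXCEPT
SELECT * FROM BrregUnits VERSION AS OF 11;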

Writing to Azure SQL Database with a stored procedure

You can find instructions for setting up this type of Azure SQL sink in the Azure Data Factory documentation.
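
The post doesn’t show the SQL side, so here is a minimal sketch of what the sink objects could look like: a user-defined table type and a stored procedure that the copy activity calls for each batch of rows. All object names, columns and data types here are my assumptions; the T-SQL merge simply mirrors the Databricks one.

-- Sketch only: names, columns and data types are assumptions
CREATE TYPE dbo.BrregUnitsType AS TABLE (
    OrganizationNumber NVARCHAR(20),
    OrganizationName   NVARCHAR(255),
    Change             NVARCHAR(10),
    InsertTimestamp    DATETIME2,
    UpdateTimestamp    DATETIME2,
    Version            BIGINT
);
GO

CREATE PROCEDURE dbo.spUpsertBrregUnits @BrregUnits dbo.BrregUnitsType READONLY
AS
BEGIN
    -- Upsert the incoming rows from Data Factory into the target table
    MERGE dbo.BrregUnits AS target
    USING @BrregUnits AS source
        ON target.OrganizationNumber = source.OrganizationNumber
    WHEN MATCHED THEN
        UPDATE SET OrganizationName = source.OrganizationName,
                   Change           = source.Change,
                   UpdateTimestamp  = source.UpdateTimestamp,
                   Version          = source.Version
    WHEN NOT MATCHED THEN
        INSERT (OrganizationNumber, OrganizationName, Change, InsertTimestamp, UpdateTimestamp, Version)
        VALUES (source.OrganizationNumber, source.OrganizationName, source.Change,
                source.InsertTimestamp, source.UpdateTimestamp, source.Version);
END

In the copy activity sink you then point the stored procedure name at dbo.spUpsertBrregUnits and the table type at dbo.BrregUnitsType; the documentation linked above shows the exact sink properties.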

So, what were the results?
– Loading the initial data took nearly 8 minutes, and gave me a recommendation to scale up my database
– Loading the delta data takes only a few seconds

This was in no way a scientific performance test, but it shows how Databricks Delta can be combined with stored procedures to speed up some data loads.

3 comments on “Using Databricks Delta to speed up Azure SQL load”

  1. Hi
    Can you explain, by showing the pipeline activities, how to load only new data using a copy activity in a Data Factory pipeline? At present, the copy activity takes data from the source database and loads it into data lake storage, which takes a lot of time. I want to take only new data from the source database so that I do not need to load the whole table into ADLA storage again.
    The following is the current flow of the pipeline:
    CopyData1 -> Until activity -> U-SQL -> CopyData2

    In the U-SQL stage, we take the full JSON file, transform it, truncate the ADLA table, and finally insert the data into that table.
    I want to take only new data in the CopyData1 stage so that only new records come in, and then in the U-SQL stage I do not have to truncate the table to load the full data again, only insert the new records.

    If you could show this with a picture, that would be a great help.
    Thanks

    1. Hi

      When copying from a source database, the only way to include just the new data is if the source has a column you can filter on based on your previous load. In this blog post I use a source that supports this, as I can ask for new or updated records since my last load.

      As you are using U-SQL, this is what I recommend:
      1. Add a new U-SQL script (or modify your existing one) to create a file with the last run date
      2. Use a Lookup activity to find this date
      3. Filter the source SQL query with this date.

      1: U-SQL script
      DECLARE EXTERNAL @lastRunDate string = "2019-08-24T15:27:00";
      @lrd = SELECT * FROM (VALUES(@lastRunDate)) AS T(LastRunDate);
      OUTPUT @lrd TO "/output/LastRunDate.csv" USING Outputters.Csv(outputHeader:true);

      For the U-SQL activity you pass @pipeline().TriggerTime in as the lastRunDate parameter. This will give you a file containing the last time you started the job.

      2. Add a Lookup activity in ADF on this file before CopyData1

      3. Alter the SQL query for CopyData1 to something like @concat('SELECT * FROM <yourTable> WHERE LastModifiedDate >= ''', activity('LookupLastRunDate').output.firstRow.LastRunDate, ''''), where the Lookup activity from step 2 is named LookupLastRunDate and has "First row only" enabled

      Something like that should work, at least if you have a column for the last modified date in the source. I don’t have a U-SQL account at the moment, so I couldn’t create screenshots 🙂

