Stelo Technical Documents

Using Stelo Data Lake Sink Connector for Azure Databricks

Last Update: 26 March 2024
Product: StarQuest Data Replicator
Version: SQDR 6.20 or later
Article ID: SQV00DR051

Abstract

Replicating data to Databricks and Data Lake storage can be accomplished in several ways:

  1. Using the traditional SQDR four tier model, where Data Lake Storage (tier 4) is accessed using only the Simba Spark ODBC driver (provided by Databricks) running on tier 3.
  2. Using SQDR Streaming Support to send data via Kafka from tier 2 through Azure Event Hubs or another Kafka broker to a consumer application running as Scala code in a notebook in the Databricks cluster; this code will apply data to the Delta Lake. See Using SQDR Streaming Support with Azure Databricks for details.
  3. Running Scala code in a notebook in the Databricks cluster to pull changes from the tier 2 staging database.

Note that the Simba Spark ODBC driver may also be used in the latter two methods (e.g. for creating the target table and performing a baseline) but is not directly involved with change data.

The subject of this technical document is scenario #3 (running Scala code in a notebook in the Databricks cluster to pull changes from the tier 2 staging database).

When implementing the Scala code, there are two methods to choose from:

  • ApplyJdbc.apply is intended to support multiple Tier 2 Agents, and is based upon a configuration of the connection to the Tier 3 control database (i.e. SQDRC). It is intended to support “continuous” mode and is the preferred method for ease of configuration and low latency results. However, it may also be more expensive as it involves running the Tier 4 cluster continuously to achieve low latency.
  • ApplyJdbc.run is intended to support the "Run once and exit" mode of the Scala code. It supports a single Tier 2 Agent (i.e. the configuration points to a specific SQDRPn database). ApplyJdbc.run can be used to minimize the cost of Tier 4 processing when low latency is not the primary requirement.

See Reference below for details.

Prerequisites

  • SQDR and SQDR Plus 6.20 or later; combined tier environment (i.e. both tiers residing on the same system).
  • The control database for SQDR should be a local IBM Db2 LUW database named SQDRC, with control tables located in the schema SQDR (i.e. default configuration). The use of Microsoft SQL Server or remote control databases is not currently supported.
  • An Azure account to which Databricks support can be added.
  • Network connectivity (typically port 50000) from the Databricks environment to Db2 LUW running on the SQDR system is required; a connectivity check is sketched after this list. A VPN may need to be configured or a firewall exception may be required. See SSL for configuring TLS/SSL connectivity.
  • Verify that the password for the Windows user sqdr is known.
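
Before installing anything in Databricks, you can optionally verify this connectivity from a notebook cell once the Db2 JDBC driver JARs have been installed on the cluster (see Databricks Setup below). The following is a minimal sketch only; the hostname, port, and credentials are placeholders:

%scala
import java.sql.DriverManager

// Placeholder connection values - replace with the address and credentials of the SQDR system
val url = "jdbc:db2://MY-IP:50000/SQDRC"
val conn = DriverManager.getConnection(url, "SQDR", "mypassword")
println("Connected to: " + conn.getMetaData.getDatabaseProductVersion)
conn.close()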

Databricks Setup

  • Access the Azure Databricks Service by entering Databricks in the Search dialog.
  • Create a Databricks workspace:
    • Select the Pricing Tier. Premium is required if you want to use Unity Catalog.
    • On the Networking panel, you can choose the type of networking. In our case, we left both choices at the default of No (No Secure Cluster Connectivity = Public IP; no Virtual Network)
  • Go to the Databricks resource and Launch the Workspace. Sign in to Databricks if necessary.
  • Select the Compute icon in the left column to access clusters.



  • Create a cluster and specify its characteristics. For functional testing (when performance is not necessarily a concern), you can create a minimal cluster:
    • Cluster mode: single node
    • Choose a Databricks runtime version e.g. 11.3 LTS (Scala 2.12, Spark 3.3.0)
    • Terminate after 15 minutes of inactivity (default 120)
    • Standard_DS3_v2
  • You can enter additional configuration options in the Spark Config field under Advanced Options. The following options have been suggested to improve performance of the Scala code, but have not been verified:

spark.databricks.delta.autoOptimize.optimizeWrite true
spark.databricks.delta.preview.enabled true
spark.databricks.delta.optimize.maxFileSize 33554432
spark.databricks.delta.autoCompact.enabled false
spark.databricks.delta.optimizeWrite.enabled true
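
The same (unverified) settings can also be tried at session scope from the notebook itself rather than in the cluster's Spark Config. This is a sketch only; whether each property takes effect at session scope depends on the Databricks runtime:

%scala
// Unverified performance settings, applied at session scope instead of the cluster Spark Config
spark.conf.set("spark.databricks.delta.autoOptimize.optimizeWrite", "true")
spark.conf.set("spark.databricks.delta.preview.enabled", "true")
spark.conf.set("spark.databricks.delta.optimize.maxFileSize", "33554432")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "false")
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")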




Creating the cluster takes about 5 minutes.

After the cluster is created:

  • Select Libraries/Install New/Upload/Jar
  • Drop the JAR file SqdrSparkApply.jar (from C:\Program Files\StarQuest\sqdrplus\Tools)
    Click Install.
  • Similarly, install the JAR files db2jcc4.jar and db2jcc_license_cu.jar (from C:\Program Files\IBM\SQLLIB\java).
  • Or, if you are updating to a newer version of SqdrSparkApply.jar, delete the library from the cluster, restart the cluster, and then add the new jar file.



Create a SQL Notebook and Destination Schema

If desired, you can create a destination schema (rather than using default) by creating a Notebook with language SQL and entering and running the command CREATE SCHEMA MYSCHEMA. The SQL notebook can be used later for verifying replication results.
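
If you prefer to keep everything in the Scala notebook rather than creating a separate SQL notebook, the same schema can be created from a Scala cell; this sketch uses the example schema name MYSCHEMA from above:

%scala
// Create the destination schema (example name MYSCHEMA) and list existing schemas
spark.sql("CREATE SCHEMA IF NOT EXISTS MYSCHEMA")
spark.sql("SHOW SCHEMAS").show()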

Databricks Scala Notebook Setup

Select the + sign in the left column and select Create Notebook. In recent versions of the Databricks GUI, the label New may appear next to the + sign.

 

  • Give it a name
  • Default Language: Scala
  • Cluster - select the appropriate cluster

 

 

  • Paste in the following Scala code and change the appropriate values. Also choose which method to use (ApplyJdbc.run or ApplyJdbc.apply) and comment/uncomment the appropriate sections. See Reference below for more information about the two methods. See SSL below to connect to the staging database using TLS/SSL.

%scala

import org.apache.spark.sql.SparkSession
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}

import com.starquest.sqdr.spark.scala.{ApplyJdbc}

val jdbcHostname = "MY-IP" // or a hostname, e.g. "myhost"
val jdbcPort = 50000 // optional for ApplyJdbc.apply
val jdbcUsername = "SQDR"
val jdbcPassword = "mypassword"
val unityValue=true // optional - default is false
val continValue=false // optional - default is true

// ApplyJdbc.run method - connect directly to the staging database

// val jdbcDatabase = "SQDRP0"
// val controlDbSchema = "SQDR"
// val destinationId = null // optional - default is null

// ApplyJdbc.run(spark, jdbcHostname, jdbcPort, jdbcDatabase,
//   jdbcUsername, jdbcPassword, controlDbSchema, unity=unityValue, continuousOperation=continValue, incrementalThreads = 5, snapshotThreads = 5)

// ApplyJdbc.apply - connect to the T3 control database SQDRC

val jdbcDatabase = "SQDRC" // optional - default is "SQDRC"
val controlDbSchema = "SQDR" // optional - default is "SQDR"
val destinationName = "MyDestination" // required - get name from drmgr

ApplyJdbc.apply(spark, destinationName, jdbcUsername, jdbcPassword, jdbcHostname, jdbcPort, jdbcDatabase, unity=unityValue, continuousOperation=continValue, incrementalThreads = 10, snapshotThreads = 10)

// simplest invocation (using defaults)
// ApplyJdbc.apply(spark, destinationName, jdbcUsername,
// jdbcPassword, jdbcHostname)

Running the Scala code

In the upper right corner of the work area, next to the control containing the word Scala, click the Run symbol and select Run Cell, or press Shift+Enter.

Observe that the version number of SqdrSparkApply.jar is displayed at the bottom of the screen. Watch this area for job progress.

If there are any errors, fix them.

Otherwise, the job is running.

Note that unless continuousOperation=false is specified, the job runs continuously, keeping the compute cluster active (i.e. the 15 minute inactivity timer will not be effective), incurring non-trivial charges. If there is no work to perform, stop the job by selecting Stop Execution at the top of the screen.

In a typical scenario, you may want to set up a job to run the notebook on a scheduled timer.

To view the resulting data:

  • Create a new notebook of type SQL and enter a SQL statement.

or

  • Select Data from the left column and navigate to Database and Table. You can view the schema, sample data, creation and modification dates.
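
Alternatively, the same checks can be made from a Scala cell (for example, in a separate notebook attached to the same cluster). This is a sketch only; myschema.mytable is a placeholder for one of your destination tables:

%scala
// Placeholder schema/table name - replace with one of your destination tables
spark.sql("SELECT COUNT(*) AS row_count FROM myschema.mytable").show()
spark.sql("DESCRIBE DETAIL myschema.mytable").show(truncate = false)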

To return to the notebook later, select Compute from the left column, open the cluster (which should be running), and select Notebooks.

Creating an SQDR destination using the Simba ODBC driver

If you are interested in replicating only incremental change data to Databricks, using the Simba ODBC driver is optional; instead, you can use any destination (e.g. the SQDRC control database) as a placeholder destination.

However, installing and configuring the Simba ODBC driver to communicate with Databricks also allows SQDR to handle creation of the destination tables. In addition, you can use the driver to replicate the baseline, though running the baseline through the Sink Connector is recommended.

Obtain connection information from the Databricks workspace:

  • Generate a token (to be used in place of a password):
    • Select your userID in the upper right corner.
    • Select User Settings.
    • Go to the Access Tokens tab.
    • Select the Generate New Token button. Be sure to save the token, as there is no way to recover it other than generating a new token.
  • Obtain the connection string:
    • Select Compute icon in the sidebar.
    • Select your cluster.
    • Select Advanced Options.
    • Select the JDBC/ODBC tab.

 

Copy the JDBC connection string and modify it as described below, or use this sed script to convert the JDBC connection string into an ODBC connection.

Starting with the JDBC connection string obtained from Databricks:

Example:
jdbc:spark://adb-nnnnn.0.azuredatabricks.net:443/default; transportMode=http;
ssl=1;httpPath=sql/protocolv1/o/555548385517300/0555-075538-pmvjaabc;AuthMech=3;
UID=token;PWD=<personal-access-token>

  1. Remove the UID & PWD parameters
  2. Convert the JDBC URL to host, port and database parameters, changing

jdbc:spark://<hostname>:443/default
to
host=<hostname>;Port=443;Database=default

  3. Change

transportMode=http
to
ThriftTransport=2

resulting in an ODBC connection string that can be used in Data Replicator Manager:

host=adb-nnnnn.0.azuredatabricks.net;Port=443;Database=default;ThriftTransport=2;ssl=1;httpPath=sql/protocolv1/o/555548385517300/0555-075538-pmvjaabc;AuthMech=3
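
If you prefer not to edit the string by hand, the three transformations above can also be scripted. The following Scala sketch is an illustration only (it is not part of the product and can be run in any Scala environment, such as a scratch notebook cell); the JDBC string shown is the placeholder example from above:

// Illustration only: apply the three edits described above to a pasted JDBC connection string
val jdbcUrl = "jdbc:spark://adb-nnnnn.0.azuredatabricks.net:443/default;transportMode=http;ssl=1;httpPath=sql/protocolv1/o/555548385517300/0555-075538-pmvjaabc;AuthMech=3;UID=token;PWD=<personal-access-token>"
val odbcUrl = jdbcUrl
  .replaceAll("(?i);?(UID|PWD)=[^;]*", "")                                           // 1. remove the UID & PWD parameters
  .replaceAll("^jdbc:spark://([^:]+):(\\d+)/([^;]+)", "host=$1;Port=$2;Database=$3") // 2. convert the URL to host/Port/Database
  .replace("transportMode=http", "ThriftTransport=2")                                // 3. change the transport parameter
println(odbcUrl)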

 

Create a destination in Data Replicator Manager, paste in the connection string as above, specify the name token as the user, and provide the token generated above as the password.

If you prefer to use an ODBC data source, extract the parameters from the connection string or from the JDBC/ODBC display and specify:

  • Host adb-895148384217555.0.azuredatabricks.net
  • Port 443
  • Mechanism: select User Name and Password from the dropdown
  • ThriftTransport: select HTTP from the dropdown
  • Next choose the SSL button and select the Enable SSL checkbox
  • Select HTTP Options and enter HTTPPath: sql/protocolv1/o/895558384217300/0520-210341-dvybvgij
  • Database default or a schema that you have created (or it can be left blank)

 

On the Advanced panel of the Destination, change the default schema from token to the name of an existing Databricks schema.

Select the checkbox Stream (Using Apply extensions), and optionally specify a directive to be appended to the Create Table statement, e.g.

USING DELTA LOCATION '/FileStore/tables/^f'

Specify ^~d instead of ^f if you do not want quotes around the table name. Select the Help button to see the options for this parameter.

 

Select the Default Parameters and enter the string className=com.starquest.sqdr.apply.ApplyPullProcessor as shown below.

Configuring the SQDR Incremental Group

  1. Create the I/R group

Select the Parameters option of the Advanced dialog of the incremental group

and enter the following string:

className=com.starquest.sqdr.apply.ApplyPullProcessor

We recommend increasing the Row Limit to a much larger number (e.g. 2 million) and verifying that the DML Replication type is Batch.

Creating a Subscription

If the goal is incremental change data only, then configure the destination panel of the subscription, specifying Baseline Replication Options as Append replicated rows to existing data and Null synchronization - DDL only, as shown here. The destination panel should be configured as Insert using ODBC.

If you want baselines to be performed using the Sink Connector, specify Baseline Replication Options as Truncate or Append replicated rows (as desired) and Use native-loader function.

When creating subscriptions, we recommend using lower case for destination table names.

After creating the subscriptions, if Automatic Snapshot (on the Advanced panel of the group) is enabled (the default), the group runs automatically. Otherwise, enable capture & apply by running the subscriptions or the group.

Agent Parameters

Though the setup/configuration for this feature is typically done in Data Replicator Manager (Tier 3) or Databricks (Tier 4) as described in this tech doc, there are some advanced agent properties (maxStagedRows, maxStagedBlob, maxStagedClob, stableKeys) that can be used to adjust the behavior of the agent. See the Configuration Reference topic (under Reference and Troubleshooting) of the SQDR Plus help for details.

Spark Connector Status

In SQDR Plus version 6.30 or later, a new status object has been added to the Summary view of the agent to show the current state of the Scala notebook. An example of the Spark connector status object can be seen below:

The states of the Spark connector status are described below:

  • Starting - Shows when the notebook has started to run
  • Active - The notebook is currently running and updates the timestamp of the Spark connector status every time the connector polls for new work.
    Note: If the Databricks cluster crashes, the connector status will not change and will continue to show Active with the last updated timestamp unless the notebook has been re-run after the Databricks cluster comes back up.
  • Failed - Shows when the notebook has encountered a failure
  • Stopped - Shows when the Scala notebook has been stopped

SSL/TLS Connectivity to Tier 2 Db2 LUW staging database

To connect to the Tier 2 Db2 LUW staging database over a secure connection, modify the Scala code, using the port defined for Db2 or StarPipes SSL communications and adding the property extraJdbcProperties, e.g.:

val jdbcSSLPort = 50448
ApplyJdbc.apply(spark, destinationName, jdbcUsername, jdbcPassword, jdbcHostname, jdbcSSLPort, continuousOperation = true, useAppend=true, unity=false, exitOnError = false, extraJdbcProperties = "sslConnection=true;")

If you are using a private Certificate Authority, you will also need to supply the CA certificate to Databricks. Use a Databricks init script to create the file /usr/local/share/ca-certificates/myca.crt and reference that file with extraJdbcProperties e.g.

val jdbcSSLPort = 50448
ApplyJdbc.apply(spark, destinationName, jdbcUsername, jdbcPassword, jdbcHostname, jdbcSSLPort, continuousOperation = true, useAppend=true, unity=false, exitOnError = false, extraJdbcProperties = "sslConnection=true;sslCertLocation=/usr/local/share/ca-certificates/myca.crt;")

See How to import a custom CA certificate and What are init scripts? for details. Note that only the first part of the sample script (creating the .crt file) is needed for our use case e.g.

#!/bin/bash
cat << 'EOF' > /usr/local/share/ca-certificates/myca.crt
-----BEGIN CERTIFICATE-----
<contents of CA certificate>
-----END CERTIFICATE-----
EOF

Reference

There are two methods - ApplyJdbc.run and ApplyJdbc.apply - that can be used to connect to the Tier 2 staging database. These are configured slightly differently.

Method signatures

Parameters that are listed without a default value are required, and must be supplied in the order shown in the definition.

Parameters that are listed with a default value are optional, and can appear in any order after the required parameters. You must supply the keyword as well as the value (e.g. unity=true); do not depend on position for the optional parameters.

ApplyJdbc.run(spark, jdbcHostname, jdbcPort, jdbcDatabase,
jdbcUsername, jdbcPassword, controlDbSchema, continuousOperation=true, destinationId=null, unity=false, catalogFilter=null, useAppend=true, incrementalThreads=20, snapshotThreads=20, exitOnError=false, name=null)

ApplyJdbc.apply(spark, destinationName, jdbcUsername, jdbcPassword, jdbcHostname, jdbcPort=50000, jdbcDatabase="SQDRC", controlDbSchema="SQDR", unity=false, catalogFilter=null, continuousOperation=true, useAppend=true, incrementalThreads=10, snapshotThreads=10, exitOnError=false, name=null)

ApplyJdbc.run

  • Intended to support the "Run once and exit" mode of the Scala code.
  • Supports a single Tier 2 Agent.
  • Connects directly to the Tier 2 staging database.
  • The database and schema name must be supplied (e.g. SQDRP0/SQDR). This information can be obtained from the list of Agents at the top level of SQDR Control Center.
  • (Optional) destinationId - default is null.
  • Used to minimize the cost of Tier 4 processing when low latency is not the primary requirement.
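
For example, a run-once invocation against a single staging database might look like the following sketch; the hostname, credentials, and thread counts are placeholders, matching the commented-out section of the notebook code shown earlier:

// Run-once invocation against the staging database SQDRP0 (placeholder values)
ApplyJdbc.run(spark, "MY-IP", 50000, "SQDRP0", "SQDR", "mypassword", "SQDR",
  continuousOperation = false, incrementalThreads = 5, snapshotThreads = 5)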

ApplyJdbc.apply

  • Intended to support “continuous” mode.
  • Supports multiple Tier 2 Agents.
  • Connects to the Tier 3 control database (typically SQDRC).
  • The database and schema names and jdbcPort are optional. Defaults are database SQDRC, schema SQDR, and port 50000.
  • destinationName is a required parameter and is used to programmatically identify the Tier 2 staging database.
  • Preferred method for ease of configuration and low latency results.
  • May be more expensive as it involves running the Tier 4 cluster continuously to achieve low latency.

Optional parameters

  • name - The default depends on the method used to run the notebook. If the method is apply, then the destination name is used. If the method is run, then Databricks is the name that's used. Use this parameter to change the name of the Spark connector that appears in the Spark connector status.
  • exitOnError - default is false. Set to true if the desired behavior for the notebook is to exit upon encountering an error.
  • unity - default is false. Set to true if using Unity Catalog (3 part naming)
  • catalogFilter - default is null. Set to the name of a schema to limit traffic to a specific schema. Requires the use of unity.
  • useAppend - default is true.
  • incrementalThreads and snapshotThreads control the maximum number of active threads and (indirectly) the amount of total memory used. Both parameters are optional and are independent of one another.

    Tuning these values can lower the resource utilization and avoid a garbage collection issue when the notebook is run on compute clusters with fewer resources:

    The defaults are incrementalThreads=20 and snapshotThreads=20 for ApplyJdbc.run, and incrementalThreads=10 and snapshotThreads=10 for ApplyJdbc.apply.

    The default values for ApplyJdbc.apply are lower, because each limit applies to each unique agent defined on Tier 2 that exploits a given Databricks notebook running on Tier 4. ApplyJdbc.run supports only a single Tier 2 Agent.
  • continuousOperation - default is true. Set to false to configure the Scala job to terminate after 5 minutes of inactivity. This is useful if the Scala job is launched on a scheduled basis - e.g. once a day - to pick up the changes that have accumulated since the previous run, avoiding non-trivial charges and optimizing the MERGE operations. As of version 6.30, setting the parameter to false in the apply method will allow the Scala notebook to terminate. A combined example using several of these optional parameters appears after this list.
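
The following invocation illustrates combining several of these optional parameters for a scheduled, run-and-exit style job on a small cluster. It is a sketch only; the destination name, credentials, hostname, connector name, and thread counts are placeholders rather than recommendations:

// Placeholder values throughout; a run-and-exit invocation tuned for a small cluster
ApplyJdbc.apply(spark, "MyDestination", "SQDR", "mypassword", "MY-IP",
  continuousOperation = false, incrementalThreads = 5, snapshotThreads = 5,
  exitOnError = true, name = "NightlyDatabricks")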

Troubleshooting

Issue:

When creating a subscription with the Simba Spark ODBC driver, you receive a warning that the destination table does not exist, even when it does.

Solution: Table names in Databricks/Spark are typically lower case. Instruct SQDR to use lower case by unchecking Fold Identifiers on the Destination panel of the subscription and confirming that the destination table name is lower case.

Issue:

The Scala job fails to connect to the staging database with the error

Error opening socket to server {1} on port {2} with message: {3}. ERRORCODE=-4296, SQLSTATE=08001

Solution:

This indicates that the connection has been rejected. Verify that Db2 LUW on the Tier 2 system is listening on the port and IP address specified and that connectivity from Databricks is not being blocked by a firewall or an inactive VPN.

Issue:

Creating a new subscription fails with the error:

Service interface SQLQualifiers failed. ODBC message: SQLSTATE HY000, native error 35, [Simba][Hardy] (35) Error from server: error code: '0' error message: 'Error operating GET_CATALOGS [RequestId=8b3897c6-76c5-443b-8f48-8a47336ada4b ErrorClass=FEATURE_DISABLED] Unity Catalog is not available for feature tier STANDARD_TIER.'.

Solution:

Either confirm that the unity parameter is set to false (default) or upgrade the pricing tier of the Databricks workspace to Premium.

Hints & FAQ

Derived Columns

When using com.starquest.sqdr.apply.ApplyPullProcessor (the subject of this technical document) we recommend using an Expression (the upper left field of the Derived Column dialog) that is acceptable for Db2 LUW.

In the typical scenario of handling both baselines and incremental changes through Tier 2, all derived columns are processed in the Db2 LUW staging database rather than at the Source.

Specifying different expressions in the Expression field (source database) and Staging Expression field (Db2 LUW staging database) is only applicable when baselines are obtained directly from the Source by Tier 3 i.e. when the Destination panel of the subscription specifies Insert Using ODBC rather than Use Native Loader Function.



DISCLAIMER

The information in technical documents comes without any warranty or applicability for a specific purpose. The author(s) or distributor(s) will not accept responsibility for any damage incurred directly or indirectly through use of the information contained in these documents. The instructions may need to be modified to be appropriate for the hardware and software that has been installed and configured within a particular organization.  The information in technical documents should be considered only as an example and may include information from various sources, including IBM, Microsoft, and other organizations.