AZURE DATABRICKS: DEMYSTIFYING THE USE OF SPARK 3 CONNECTOR FOR AZURE COSMOS DB — LEVEL — INTERMEDIATE

Accelerate big data workloads using the Spark 3 Connector for Azure Cosmos DB

RK Iyer
7 min read · Aug 28, 2021

✐ Co-Author — Amit Damle


❑ Background

One of the most pervasive scenarios that customers come across while solving advanced analytics, data science, and blazing-fast IoT problems is the need to quickly persist or read large amounts of fast-changing data between a massively parallel processing engine like Spark and a NoSQL database like Cosmos DB, with guaranteed SLAs for consistency, availability, low latency, and throughput.

To address this critical requirement, azure-cosmosdb-spark, the official Java-based connector for Azure Cosmos DB and Apache Spark, was introduced. Please refer to the Apache Spark Cosmos connector documentation for more details. This connector allows you to easily create a lambda architecture for batch processing, stream processing, and building a serving layer with minimum latency. It supported Apache Spark versions 2.2.1, 2.3.x, and 2.4.x.

Now, with Apache Spark 3.0 introduced in June 2020, the Azure Cosmos DB Spark 3 OLTP connector, a new Cosmos DB Spark connector for Spark 3, is generally available (GA). In this blog, we will learn how the Spark 3 connector communicates with Cosmos DB and how to use the Spark 3 connector library, along with its features and best practices.

❑ How does the Spark connector communicate with Cosmos DB?

Spark Connector communication with Cosmos DB

The data flow is as follows:

➀ A connection is made from the Spark master node to the Cosmos DB gateway node to obtain the partition map.

Note: the user only specifies the Spark and Cosmos DB connections; the fact that the connector talks to the respective master and gateway nodes is transparent to the user.

➁ This information is provided back to the Spark master node. At this point, we should be able to parse the query to determine which partitions (and their locations) within Cosmos DB we need to access.

➂ This information is transmitted to the Spark worker nodes.

➃ The Spark worker nodes connect directly to the Cosmos DB partitions to extract the data that is needed and bring it back to the Spark partitions within the Spark worker nodes.

Note — The important point is that communication between Spark and Cosmos DB is significantly faster because the data moves directly between the Spark worker nodes and the Cosmos DB data nodes (partitions). This contrasts with the pyDocumentDB approach, where communication is limited to the Spark master node and the Cosmos DB gateway nodes, resulting in significantly lower performance.

❑ Azure Cosmos DB Spark 3 OLTP connector features

Some important features of the Azure Cosmos DB Spark 3 OLTP connector are:

  • The Cosmos DB Spark connector is based on Spark 3.1.x; you can use it with any other Spark 3.1.1 offering as well.
  • The connector is developed from the ground up using the Cosmos DB Java V4 SDK.
  • It adds support for different partitioning strategies (see the read-configuration sketch after this list).
  • You should be able to use any language supported by Spark (PySpark, Scala, Java, etc.), or any Spark interface you are familiar with (Jupyter Notebook, Livy, etc.).
  • Currently, the Azure Cosmos DB Spark 3 OLTP connector supports only the Azure Cosmos DB Core (SQL) API.
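
For example, the read behaviour can be tuned through the connector's partitioning strategy. The snippet below is a minimal PySpark sketch: the endpoint, key, database, and container names are placeholders, and the option names and values should be verified against the connector's configuration reference.

# Minimal read-configuration sketch (all account details are placeholders)
readCfg = {
    "spark.cosmos.accountEndpoint": "https://REPLACEME.documents.azure.com:443/",
    "spark.cosmos.accountKey": "REPLACEME",
    "spark.cosmos.database": "REPLACEME",
    "spark.cosmos.container": "REPLACEME",
    # Partitioning strategy for reads; other documented values include
    # "Restrictive", "Aggressive" and "Custom"
    "spark.cosmos.read.partitioning.strategy": "Default",
}

df = spark.read.format("cosmos.oltp").options(**readCfg).load()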

❑ Common Data Access Patterns

❐ High-volume/high-frequency writes to Cosmos DB

Cosmos DB writes with the Cosmos DB Spark 3 connector

❐ Processing change feeds using Spark
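
As a preview of this pattern (the next blog will cover it in detail), the change feed can be consumed as a structured stream. The PySpark snippet below is only a sketch: the account details are placeholders and the option names should be checked against the connector's configuration reference.

# Sketch: read the Cosmos DB change feed as a structured stream
changeFeedCfg = {
    "spark.cosmos.accountEndpoint": "https://REPLACEME.documents.azure.com:443/",
    "spark.cosmos.accountKey": "REPLACEME",
    "spark.cosmos.database": "REPLACEME",
    "spark.cosmos.container": "REPLACEME",
    "spark.cosmos.read.inferSchema.enabled": "true",
    # Start reading the change feed from the beginning of the container
    "spark.cosmos.changeFeed.startFrom": "Beginning",
}

df_changes = spark.readStream \
    .format("cosmos.oltp.changeFeed") \
    .options(**changeFeedCfg) \
    .load()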

❑ Quick Start Guide

The steps below show how you can use the Spark 3 OLTP connector for Azure Cosmos DB with Azure Databricks to ingest and read data.

Scenario — Read a dataset from Azure Blob storage and store it in an Azure Cosmos DB container using a Spark job running in an Azure Databricks workspace.

❐ Prerequisites

🗹 An active Azure account.

🗹 Azure Databricks Runtime 8.0 with Spark 3.1.1.

🗹 (Optional) An SLF4J binding, used to associate a specific logging framework with SLF4J.

❐ Steps

1. Create a Cosmos DB instance and get the connection details

Get the URI endpoint and the primary key of the Cosmos DB account that we are going to use.

Get the Cosmos DB URL
Get the Primary key

2. Upload the file to Blob storage

I have downloaded the open-source green taxi trip data for the month of January from TLC Trip Record Data — TLC (nyc.gov). Please note that I have used just 1,000 records. You can find the file there.

The green taxi trip records include fields capturing pick-up and drop-off dates/times, pick-up and drop-off locations, trip distances, itemized fares, rate types, payment types, and driver-reported passenger counts.

3. Prepare the Azure Databricks cluster

Create a two-node cluster with Databricks Runtime 8.0 (which includes Apache Spark 3.1.1 and Scala 2.12). Please refer here for instructions on how to create an Azure Databricks cluster.

4. Install the Cosmos DB Spark 3 Connector

Once the cluster is set up, add the Spark 3 connector library from the Maven repository. Click on Libraries and then select Maven as the library source.

Note — The latest version (4.3.0) has been published to Maven. Please check the release history for the latest release.

You can install the new version via Maven coordinates.

com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12:4.3.0

5. Ingest data into Azure Cosmos DB

i. Get the credentials necessary for Databricks to connect to your blob container

ii. Configure Databricks to read the file

Configure your Spark session to use the credentials for your blob container:

# Storage account details used to access the blob container
storage_account_name = 'nycdatasetwasb'
storage_account_access_key = 'bxxxxxxxxxxxxxxxxxxxxxxxxxx=='
spark.conf.set('fs.azure.account.key.' + storage_account_name + '.blob.core.windows.net', storage_account_access_key)

iii. Build the file path in the blob container and read the file as a Spark data frame

blob_container = 'nyccontainer'
filePath = "wasbs://" + blob_container + "@" + storage_account_name + ".blob.core.windows.net/green_tripdata_2020-01.csv"
df_rawInput = spark.read.format("csv").load(filePath, inferSchema=True, header=True)
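
Optionally, run a quick sanity check to confirm that the file was read correctly before moving on:

# Optional sanity check: inspect the inferred schema and the row count
df_rawInput.printSchema()
print("Rows read from blob: ", df_rawInput.count())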

iv. Create the database and collection using the Catalog API

cosmosEndpoint = "https://REPLACEME.documents.azure.com:443/"
cosmosMasterKey = "REPLACEME"
cosmosDatabaseName = "NYCDB"
cosmosContainerName = "GreentripContainer"

spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)

spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format(cosmosDatabaseName))
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id', manualThroughput = '1100')".format(cosmosDatabaseName, cosmosContainerName))

v. Every resource within an Azure Cosmos DB database account needs to have a unique identifier, so we add an id column with unique values.

import time
import uuid
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, LongType

# Add an id column with unique values and an insertedAt timestamp (epoch milliseconds)
uuidUdf = udf(lambda: str(uuid.uuid4()), StringType())
nowUdf = udf(lambda: int(time.time() * 1000), LongType())
df_input_withId = df_rawInput \
    .withColumn("id", uuidUdf()) \
    .withColumn("insertedAt", nowUdf())

vi. Provide the write configuration and ingest the data into Cosmos DB. The write strategy can be "ItemAppend" to ignore conflicts or "ItemOverwrite" to overwrite existing items. The concurrency of the write operation is determined by the number of available Spark executors.

import datetime

print("Starting ingestion: ", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"))

# Write configuration for the target Cosmos DB container
writeCfg = {
    "spark.cosmos.accountEndpoint": cosmosEndpoint,
    "spark.cosmos.accountKey": cosmosMasterKey,
    "spark.cosmos.database": cosmosDatabaseName,
    "spark.cosmos.container": cosmosContainerName,
    "spark.cosmos.write.strategy": "ItemOverwrite",
}

df_input_withId \
    .write \
    .format("cosmos.oltp") \
    .mode("Append") \
    .options(**writeCfg) \
    .save()

print("Finished ingestion: ", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"))

vii. Confirm that the data is ingested into Cosmos DB

Confirm data Ingestion
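
If you also want to verify from the notebook itself, a simple read through the connector gives a quick document count. This is a sketch that reuses the connection variables from step iv; the schema-inference option name should be verified against the connector's configuration reference.

# Read the container back through the connector and count the ingested documents
verifyCfg = {
    "spark.cosmos.accountEndpoint": cosmosEndpoint,
    "spark.cosmos.accountKey": cosmosMasterKey,
    "spark.cosmos.database": cosmosDatabaseName,
    "spark.cosmos.container": cosmosContainerName,
    "spark.cosmos.read.inferSchema.enabled": "true",
}

df_check = spark.read.format("cosmos.oltp").options(**verifyCfg).load()
print("Documents in container: ", df_check.count())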

Note - It is important to estimate and provision the right amount of throughput/Request Units (RU/s) for your workload to optimize cost and performance. We will have a detailed blog on estimating the number of RUs.

❑ Reference

Azure/azure-cosmosdb-spark: Apache Spark Connector for Azure Cosmos DB (github.com)

Azure Cosmos DB Apache Spark 3 OLTP Connector for SQL API release notes and resources | Microsoft Docs

azure-sdk-for-java/quick-start.md at main · Azure/azure-sdk-for-java (github.com)

azure-sdk-for-java/01_Batch.ipynb at main · Azure/azure-sdk-for-java (github.com)

❑ Acknowledgement

I would like to thank Fabian Meiswinkel & Manish Sharma for their guidance.

I have used a simple example to demonstrate how to use the Azure Cosmos DB Spark 3 OLTP connector, and I hope this blog helped you understand the basics of the Spark 3 connector and get started.

The next blog will cover how to perform real-time stream processing of your Cosmos DB changes in Databricks with Spark 3.

Till then, Happy Learning!!!

Please Note — All opinions expressed here are my personal views and not those of my employer.

Thought of the moment-

When you lose a PARENT you gain a GOD….
