Databricks: fetch data from a Delta table and create an ORC file using PySpark readStream and writeStream

Databricks PySpark to create an ORC file

Objective:

  • We will read the data from a Delta table and create an ORC file using Spark readStream and writeStream.

  • For this blog I used the Databricks Community edition (free).

This approach can also be extended to Azure Data Lake Storage (ADLS).

  • To use ADLS Gen2, we need to mount it to the Databricks cluster, as sketched below.
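A minimal sketch of the standard service-principal mount (the client id, secret scope, tenant, storage account, and container below are all placeholders - substitute your own values):
configs = {
  "fs.azure.account.auth.type": "OAuth",
  "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
  "fs.azure.account.oauth2.client.id": "<application-id>",
  "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="<scope>", key="<service-credential-key>"),
  "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenant-id>/oauth2/token"
}

# Mount the ADLS Gen2 container so it is reachable under /mnt/datalake
dbutils.fs.mount(
  source="abfss://<container>@<storage-account>.dfs.core.windows.net/",
  mount_point="/mnt/datalake",
  extra_configs=configs
)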

Pre-requisites:

Create a CSV file with the below sample data and upload it to the Databricks Community edition.

StudentId,Name,Subject,Mark
1,Ram,science,80
1,Ram,maths,90
1,Ram,language,85
2,Tom,science,72
2,Tom,maths,75
2,Tom,science,79
3,Kim,science,81
3,Kim,maths,92
3,Kim,language,89

Load csv data to Delta table

  • Create the database using the below command
 %sql create database if not exists demo_db
  • Read the CSV file into a DataFrame using Spark
  • Write the data in the DataFrame to the specified path using the Delta format
# Read the uploaded CSV into a DataFrame, with header and schema inference
dataframe = spark.read \
   .options(header='True', inferSchema='True') \
   .csv('dbfs:/FileStore/sample_data/sampleStudent.csv')

# Way to convert the DataFrame schema into a JSON string.
# This is just for information - not used in this example.
schemaJson = dataframe.schema.json()
print(schemaJson)
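As an aside, the JSON string can be turned back into a StructType if the schema ever needs to be re-applied (a sketch, not used in this example):
# Rebuild the schema object from the JSON string produced above
import json
from pyspark.sql.types import StructType

rebuiltSchema = StructType.fromJson(json.loads(schemaJson))
print(rebuiltSchema)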
  • Create a Delta table student under the demo_db database.
  • The DataFrame with the data read from the CSV file above will be used to create the Delta files.
  • Finally, the table will be created on top of those Delta files using the CREATE TABLE command.
tableName='demo_db.student'
# Note: If this file or directory already exists, delete it using %fs rm -r '/tmp/'
savePath='/tmp/delta'
sourceType='delta'

dataframe.write \
  .format(sourceType) \
  .save(savePath)

# Create the table.
spark.sql("CREATE TABLE " + tableName + " USING DELTA LOCATION '" + savePath + "'")  

# Display the table content
display(spark.sql("SELECT * FROM " + tableName))
  • On executing the above command, the output will be as shown in the below snapshot

[image: query output showing the student table contents]
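Side note: instead of save() followed by CREATE TABLE ... USING DELTA LOCATION, the same data could be written as a managed Delta table in a single step (a sketch, not used in this example):
# One-step alternative: creates the managed table demo_db.student directly
dataframe.write \
  .format('delta') \
  .mode('overwrite') \
  .saveAsTable('demo_db.student')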

Code to read the Delta table using readStream

  • When using the Community edition with the table created above, load from '/tmp/delta'; if the Delta files live elsewhere (for example a mounted ADLS path), change the path in load() accordingly.
data_delta_readstream = ( 
    spark.readStream
    .format("delta")
    .load("/tmp/delta")                # This path can be the ADLS path if mounted to cluster
)
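A quick sanity check before wiring up the sink - readStream returns a streaming DataFrame:
# Should print True for a streaming DataFrame
print(data_delta_readstream.isStreaming)
data_delta_readstream.printSchema()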

Code that creates the ORC file

  • Batch loader function used by the write stream (via foreachBatch) to write the data; repartition(1) collapses each micro-batch into a single output file.
def batchLoader(df, batch_id):
  (
    df
    .repartition(1)                                        # one output file per micro-batch
    .write
    .format("orc")
    .mode("append")
    .save('/dbfs/FileStore/sample_data/student_data/')     # ADLS path can be used
  )

Code to write content to the ORC file using writeStream

  • We specify the checkpoint location, where the stream keeps transactional information about the data it has already written.
data_delta_writestream = ( 
    data_delta_readstream
    .repartition(1)
    .writeStream
    .trigger(once=True)
    .option("checkpointLocation", "/dbfs/FileStore/sample_data/student_data/_checkpoints/student")
    .foreachBatch(batchLoader)
    .start()
)
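  • Because the trigger is once=True, the query processes all available data in a single batch and then stops; if a following cell depends on the output, we can block until it finishes:
# Wait for the single triggered batch to complete
data_delta_writestream.awaitTermination()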
  • Once the above write stream has executed successfully, you should see that 9 records were written.

[image: write stream output showing 9 records]

  • The ORC file is created under the specified path.

[image: ORC file under the specified path]
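To verify, the ORC output can be read back from the same path used in batchLoader above:
# Read the ORC files back into a DataFrame and display them
orc_df = spark.read.orc('/dbfs/FileStore/sample_data/student_data/')
display(orc_df)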

Note:

  • When creating the Delta table, I faced the below issue:
...from `dbfs:/tmp/delta` using Databricks Delta, but there is no transaction log present at `dbfs:/tmp/delta/_delta_log
  • This issue was due to the fact that I executed the command multiple times without any changes.
  • To resolve it, simply delete the /tmp folder using the %fs rm -r '/tmp/' command.
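  • The same cleanup can also be done from Python with dbutils:
# Python equivalent of %fs rm -r '/tmp/' - recursively delete the folder
dbutils.fs.rm('/tmp/', True)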

Tips:

Below is a set of commands useful for working with Delta tables

  • Within the notebook, if we need to use a specific database
%sql use database demo_db
  • Since we executed the above command, the below query without the database name should render the data in the Delta table
%sql select * from student
  • To describe the database info
%sql describe database demo_db
  • To describe the table info
%sql describe table student
  • To drop the table
%sql drop table demo_db.student