Databricks: read from an ORC file and upsert data to a Delta table using PySpark readStream and writeStream

Objective:

  • Using the ORC file created from the Delta table in the earlier post, we will now read that ORC file and upsert the Delta table score (which has the same schema as the student table mentioned in that post).

Initial setup

Create the database

%sql create database if not exists demo_db

Load the sample CSV content to the Databricks cluster

  • Create a CSV file using the sample content below
StudentId,Name,Subject,Mark
1,Ram,science,80
1,Ram,maths,90
1,Ram,language,85
2,Tom,science,72
2,Tom,maths,75
2,Tom,science,79
3,Kim,science,81
3,Kim,maths,92
3,Kim,language,89
  • Upload the file to the Databricks workspace file store (DBFS), or create it from a notebook as shown in the sketch below

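If you prefer to create the file from a notebook instead of the upload UI, here is a minimal sketch using dbutils.fs.put (the target path is an assumption; adjust it to your workspace):
# Sketch (assumed path): write the sample CSV straight to DBFS from a notebook.
csv_content = """StudentId,Name,Subject,Mark
1,Ram,science,80
1,Ram,maths,90
1,Ram,language,85
2,Tom,science,72
2,Tom,maths,75
2,Tom,science,79
3,Kim,science,81
3,Kim,maths,92
3,Kim,language,89"""
dbutils.fs.put('dbfs:/FileStore/sample_data/sampleStudent.csv', csv_content, True)  # True = overwrite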

Read the data from the CSV file using Spark

dataframe = (spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('dbfs:/FileStore/sample_data/sampleStudent.csv'))
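
A quick check that the header and column types were inferred as expected:
# Inspect the inferred schema and preview the rows.
dataframe.printSchema()
display(dataframe)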

Write the data to a Delta file and create a table using the Delta file

  • The snippet below creates the table score and loads the data
tableName = 'demo_db.score'
savePath = '/tmp/demo_score'
sourceType = 'delta'

# Write the DataFrame out in Delta format.
dataframe.write \
  .format(sourceType) \
  .save(savePath)

# Create the table on top of the Delta location.
spark.sql(f"CREATE TABLE {tableName} USING DELTA LOCATION '{savePath}'")
display(spark.sql(f"SELECT * FROM {tableName}"))
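
As an aside, the write-and-register steps above can be collapsed into a single call; a sketch of that alternative (it creates a managed table rather than one at an explicit location, and is not what this post uses):
# Alternative sketch: create a managed Delta table in one step
# (instead of save() followed by CREATE TABLE ... LOCATION).
dataframe.write.format('delta').saveAsTable('demo_db.score')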

Delete the loaded content from the score table

  • The content of the table is deleted so that we start with an empty table.
%sql delete from demo_db.score

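To confirm we are starting from an empty table:
# Sanity check: the score table should report zero rows after the delete.
display(spark.sql("SELECT COUNT(*) AS row_count FROM demo_db.score"))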

As mentioned in this post, data was loaded into the student Delta table and an ORC file was created at a specific location. We will use that same ORC file to load the score table (which has the same schema as the student table).
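
For context, a minimal sketch of how such an ORC export could be produced from the student table (the table name and output path here are assumptions; see the earlier post for the authoritative version):
# Sketch (assumptions: table demo_db.student, ORC output path below):
# export the student Delta table as ORC files that the stream will read.
(spark.table('demo_db.student')
    .write
    .format('orc')
    .mode('overwrite')
    .save('dbfs:/FileStore/sample_data/student_data/'))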

Code to read from the ORC file using readStream

# Schema is defined explicitly (required for a file-source stream);
# a JSON-serialized schema could be used here instead.
rdSchema = ("StudentId int, " +
            "Name string, " +
            "Subject string, " +
            "Mark int")

data_readstream = (
    spark.readStream
     .schema(rdSchema)
     .format("orc")
     .load('dbfs:/FileStore/sample_data/student_data/')  # path to the ORC files
)
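
The DataFrame returned by readStream is a streaming DataFrame; a quick way to confirm that before wiring up the sink:
# A streaming DataFrame reports isStreaming = True.
print(data_readstream.isStreaming)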

Code that defines the batch loader to merge the micro-batch DataFrame and upsert the Delta table score

def upsertDataUsingBatchLoader(df, batch_id):
  # Register the micro-batch DataFrame as a temp view so it can be
  # referenced from SQL.
  df.createOrReplaceTempView("temp_student_tbl")

  # Use the SparkSession associated with the micro-batch to run the merge.
  # (On Spark >= 3.3 this can be written as df.sparkSession.sql(...).)
  # Note: matching on StudentId alone assumes at most one target row per
  # student; repeated runs against populated data would need a composite
  # key such as (StudentId, Subject).
  df._jdf.sparkSession().sql("""
    MERGE INTO demo_db.score mdt
    USING temp_student_tbl tst
    ON mdt.StudentId = tst.StudentId
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

Code to write the data to the Delta table using writeStream and foreachBatch

data_writestream = (
    data_readstream
    .repartition(1)
    .writeStream
    .trigger(once=True)    # process all available data in a single batch, then stop
    .option("checkpointLocation", "dbfs:/FileStore/sample_data/update_tbl/")
    .foreachBatch(upsertDataUsingBatchLoader)    # pass in the function that uses the batch loader
    .start()               # no .format() is needed: foreachBatch defines the sink
)
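
Since the query uses trigger(once=True), it processes the available files in a single batch and stops; we can wait for it to finish and then inspect the table:
# Block until the one-shot streaming query completes, then check the result.
data_writestream.awaitTermination()
display(spark.sql("SELECT * FROM demo_db.score ORDER BY StudentId"))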

Output

  • After executing the write stream above, the streaming query runs a single batch and the batch loader merges the ORC data into the score table.
  • Finally, the score table is updated with the data.