Databricks: fetch data from a Delta table and create an ORC file using PySpark readStream and writeStream
Databricks PySpark to create an ORC file
Objective:
We will read the data from a Delta table and create an ORC file using Spark readStream and writeStream.
- For this blog I used the Databricks Community Edition (free).
- This approach can also be extended to Azure Data Lake Storage.
- In order to use ADLS Gen2, we need to mount it to the Databricks cluster; a minimal mount sketch follows this list.
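The snippet below is only a rough sketch of such a mount using OAuth with a service principal; the container, storage account, secret scope, key, and tenant values are placeholders and are not part of the original example.
# Sketch: mount an ADLS Gen2 container to DBFS (all <...> values are placeholders).
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "<application-id>",
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="<scope-name>", key="<client-secret-key>"),
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenant-id>/oauth2/token"
}

dbutils.fs.mount(
    source="abfss://<container>@<storage-account>.dfs.core.windows.net/",
    mount_point="/mnt/datalake",
    extra_configs=configs
)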
Pre-requisites:
Create a CSV file with the sample data below and upload it to the Databricks Community Edition (uploaded here as dbfs:/FileStore/sample_data/sampleStudent.csv).
StudentId,Name,Subject,Mark
1,Ram,science,80
1,Ram,maths,90
1,Ram,language,85
2,Tom,science,72
2,Tom,maths,75
2,Tom,science,79
3,Kim,science,81
3,Kim,maths,92
3,Kim,language,89
Load CSV data to a Delta table
- Create the database using the command below
%sql create database if not exists demo_db
- Read the CSV file into a dataframe using Spark
- Write the data in the dataframe to a Delta table, using the delta format and the specified path
dataframe = spark.read \
    .options(header='True', inferSchema='True') \
    .csv('dbfs:/FileStore/sample_data/sampleStudent.csv')

# One way to convert the dataframe schema into a JSON string.
# This is just for information - it is not used in this example.
schemaJson = dataframe.schema.json()
print(schemaJson)
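As a side note, here is a small sketch of how that JSON schema string could be reused, for example to re-read the same CSV with an explicit schema; this step is not part of the example and the variable names are illustrative only.
import json
from pyspark.sql.types import StructType

# Rebuild a StructType from the JSON string printed above.
rebuiltSchema = StructType.fromJson(json.loads(schemaJson))

# Re-read the CSV with the explicit schema instead of inferSchema.
dataframe_with_schema = spark.read \
    .options(header='True') \
    .schema(rebuiltSchema) \
    .csv('dbfs:/FileStore/sample_data/sampleStudent.csv')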
- Create a Delta table student under the demo_db database.
- The dataframe with the data read from the CSV file above will be written out in the delta format.
- Finally the table will be created on top of the written delta files, using the create table command.
tableName = 'demo_db.student'
# Note: if this file or directory already exists, delete it using %fs rm -r '/tmp/'
savePath = '/tmp/delta'
sourceType = 'delta'

# Write the dataframe in delta format to the specified path.
dataframe.write \
    .format(sourceType) \
    .save(savePath)

# Create the table on top of the delta files.
spark.sql("CREATE TABLE " + tableName + " USING DELTA LOCATION '" + savePath + "'")

# To display the table content
display(spark.sql("SELECT * FROM " + tableName))
- On executing the above commands, the output will be as shown in the snapshot below.
Code to read the Delta table using readStream
- When using the Community Edition with the table created above, the data is loaded from '/tmp/delta'; if ADLS is mounted to the cluster, change load('/tmp/delta') to the mounted path.
data_delta_readstream = (
    spark.readStream
    .format("delta")
    .load("/tmp/delta")  # This path can be the ADLS path if mounted to the cluster
)
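As a quick sanity check (not in the original post), the isStreaming property confirms that this is a streaming dataframe:
# Should print True, since readStream returns a streaming dataframe.
print(data_delta_readstream.isStreaming)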
Code that creates the ORC file
- Batch loader function used by the write stream to write each micro-batch of data
def batchLoader(df, batch_id):
    (
        df
        .repartition(1)
        .write
        .format("orc")
        .mode("append")
        .save('/dbfs/FileStore/sample_data/student_data/')  # ADLS path can be used
    )
Code to write content to the ORC file using writeStream
- We specify the checkpoint location, where the streaming query keeps transactional information about the data it has already processed and written.
data_delta_writestream = (
    data_delta_readstream
    .repartition(1)
    .writeStream
    .trigger(once=True)
    .option("checkpointLocation", "/dbfs/FileStore/sample_data/student_data/_checkpoints/student")
    .foreachBatch(batchLoader)
    .start()
)
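Optionally (not in the original post), awaitTermination can be used to block until the trigger-once run has finished before inspecting the output:
# Block until the trigger-once run finishes writing the batch.
data_delta_writestream.awaitTermination()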
- Once the above write stream has executed successfully, you should be able to see that 9 records were written.
- The ORC file is created under the specified path; a quick check is sketched below.
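A small sanity check, assuming the same output path used in batchLoader above, is to read the ORC files back into a dataframe:
# Read the ORC files back to verify the 9 student records were written.
orc_check = spark.read.format("orc").load('/dbfs/FileStore/sample_data/student_data/')
display(orc_check)
print(orc_check.count())  # Expected: 9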
Note:
- When creating the Delta table, I faced the issue below:
...from `dbfs:/tmp/delta` using Databricks Delta, but there is no transaction log present at `dbfs:/tmp/delta/_delta_log`
- This issue was caused by executing the command multiple times without any changes.
- To resolve the issue, simply delete the /tmp folder using the %fs rm -r '/tmp/' command.
Tips:
Below is a set of commands useful for working with Delta tables
- Within the notebook, to use a specific database
%sql use database demo_db
- Since we have executed the above command, the query below, without the database name, should render the data in the Delta table.
%sql select * from student
- To describe the database
%sql describe database demo_db
- To describe the table
%sql describe table student
- To drop the table
%sql drop table demo_db.student