Working with Dataframes (PySpark) in Databricks
Table of contents
- 1. Creating a Dataframe programmatically with data and a schema
- 2. Using the Column Name header directly as the Schema
- 3. Reading directly from files like CSV, Parquet, ORC, etc.
- 4. Converting JSON to a Dataframe
- 4.1 Converting the Dataframe into a Temp Table
- 5. Adding rows with null/None values to a Dataframe
1. Creating a Dataframe programmatically with data and a schema
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

dataValues = [("Tim", "100", "Sales", 4050),
              ("Don", "102", "Accounts", 5500),
              ("Rob", "103", "Security", 4500)]
schema = StructType([
    StructField("name", StringType(), True),
    StructField("id", StringType(), True),
    StructField("department", StringType(), True),
    StructField("salary", IntegerType(), True)
])
df = spark.createDataFrame(data=dataValues, schema=schema)
df.printSchema()
df.show(truncate=False)
- Output
root
|-- name: string (nullable = true)
|-- id: string (nullable = true)
|-- department: string (nullable = true)
|-- salary: integer (nullable = true)
+----+---+----------+------+
|name|id |department|salary|
+----+---+----------+------+
|Tim |100|Sales |4050 |
|Don |102|Accounts |5500 |
|Rob |103|Security |4500 |
+----+---+----------+------+
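The same Dataframe can also be created by passing a DDL-style schema string instead of a StructType (supported in recent Spark versions). Below is a minimal sketch reusing dataValues from above; ddlSchema and df2 are just illustrative names.
# Equivalent Dataframe built from a DDL-style schema string instead of StructType
ddlSchema = "name string, id string, department string, salary int"
df2 = spark.createDataFrame(data=dataValues, schema=ddlSchema)
df2.printSchema()
df2.show(truncate=False)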
2. Using the Column Name header directly as the Schema
- Note: When only the column names are provided, each column's data type is inferred from the data; since all of the values here are strings, every column is inferred as String.
data = [("1","Tom","Pass"),
("2","Jim","Pass"),
("3","Will","Fail")
]
columns = ["StudentId","Name","Grade"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)
- Output:
root
|-- StudentId: string (nullable = true)
|-- Name: string (nullable = true)
|-- Grade: string (nullable = true)
+---------+----+-----+
|StudentId|Name|Grade|
+---------+----+-----+
|1 |Tom |Pass |
|2 |Jim |Pass |
|3 |Will|Fail |
+---------+----+-----+
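If actual numeric types are needed, the inferred String columns can be cast afterwards. Below is a minimal sketch that casts StudentId to an integer; typedDF is just an illustrative name.
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# cast the inferred string column to an integer after the Dataframe is created
typedDF = df.withColumn("StudentId", col("StudentId").cast(IntegerType()))
typedDF.printSchema()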
3. Reading directly from files like CSV, Parquet, ORC, etc.
- Note: The example below reads a CSV file that was uploaded to the Databricks Community Edition.
employee_df = spark.read.csv("/FileStore/sample_data/sampleEmployee.csv", header=True, inferSchema=True)
employee_df.show()
- Output:
+----------+----+----------+-------+
|EmployeeId|Name|Department| Salary|
+----------+----+----------+-------+
| 1| Tim| Accounts|10000.0|
| 2| Tom| Sales|12000.0|
| 3| Bob| IT|15000.0|
| 4| Rob| null| 100.0|
| 5|null| Security| 3000.0|
+----------+----+----------+-------+
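Parquet and ORC files are read the same way with spark.read.parquet() and spark.read.orc(); a minimal sketch is below (the file paths are hypothetical placeholders, not files used in this example).
# Parquet and ORC store their own schema, so header/inferSchema options are not needed
parquet_df = spark.read.parquet("/FileStore/sample_data/sampleEmployee.parquet")  # hypothetical path
orc_df = spark.read.orc("/FileStore/sample_data/sampleEmployee.orc")  # hypothetical path
parquet_df.show()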
4. Converting JSON to a Dataframe
- The sample JSON is passed as a string; it is not read from a file.
- The schema for the Dataframe is defined using pyspark.sql.types.
- The data from the JSON is converted to the target types using a custom function, convertJsonToDataFrameRow().
- Finally, the converted row and the schema are passed to createDataFrame() to build the Dataframe.
Additionally, this code example also shows:
- Schemas for array and map (object) types
- How to use a timestamp type
from pyspark.sql import SparkSession, functions as F, types as T
from datetime import datetime
import json
jsonInput = """
{
"employeeName":"dummy user1",
"department" : ["Finance","Accounts"],
"userName":"myusername",
"joinDate": "2018-09-15T19:55:00.000+0000",
"addressInfo":{
"City": "Seattle"
},
"isActive" : true,
"employeeId": 300
}
"""
# parse the JSON string using the json library
inputJson = json.loads(jsonInput)
# for debugging: print the parsed json
print(inputJson)
# for debugging: print the join date
dateInfo = inputJson.get('joinDate')
print(f"date: {dateInfo}")
# Define the schema for the json data
inputSchema = T.StructType([
    T.StructField('name', T.StringType(), False),  # last argument is whether the field is nullable
    T.StructField('department', T.ArrayType(T.StringType(), False), True),
    T.StructField('loginId', T.StringType(), False),
    T.StructField('joiningDate', T.TimestampType(), False),
    T.StructField('address', T.MapType(T.StringType(), T.StringType(), True), False),
    T.StructField('active', T.BooleanType(), False),
    T.StructField('id', T.IntegerType(), False)
])
# simply return the date format string with timezone offset
def getdateFormat():
    return "%Y-%m-%dT%H:%M:%S.%f%z"
addressDictionary = inputJson.get('addressInfo')
print(addressDictionary)
# check whether any of the expected address keys are present in the input
inputHasAddress = any(addressInfo in addressDictionary for addressInfo in ('City', 'streetName'))
print(f"contains address in input? - {inputHasAddress}")
# convert the parsed json into a row dict whose keys match the schema field names
def convertJsonToDataFrameRow():
    return {
        'name': inputJson.get('employeeName'),
        'department': inputJson.get('department'),
        'loginId': inputJson.get('userName'),
        'joiningDate': datetime.strptime(inputJson.get('joinDate'), getdateFormat()),
        'active': inputJson.get('isActive'),
        'address': inputJson.get('addressInfo'),
        'id': inputJson.get('employeeId')
    }
# Method to return the provided address or a default (empty) one
def fetchAddress(isDefault):
    if isDefault:
        return inputJson.get('addressInfo')
    else:
        return {
            "streetName": "",
            "city": ""
        }
# convert to the dataframe for later use
inputDF = spark.createDataFrame([convertJsonToDataFrameRow()], inputSchema)
# display(inputDF)
inputDF.show()
- Output:
{'employeeName': 'dummy user1', 'department': ['Finance', 'Accounts'], 'userName': 'myusername', 'joinDate': '2018-09-15T19:55:00.000+0000', 'addressInfo': {'City': 'Seattle'}, 'isActive': True, 'employeeId': 300}
date: 2018-09-15T19:55:00.000+0000
{'City': 'Seattle'}
contains address in input? - True
+-----------+-------------------+----------+-------------------+-----------------+------+---+
| name| department| loginId| joiningDate| address|active| id|
+-----------+-------------------+----------+-------------------+-----------------+------+---+
|dummy user1|[Finance, Accounts]|myusername|2018-09-15 19:55:00|{City -> Seattle}| true|300|
+-----------+-------------------+----------+-------------------+-----------------+------+---+
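As an alternative to converting the JSON by hand, Spark can parse the JSON string itself with from_json. Below is a minimal sketch; rawSchema, rawDF and parsedDF are illustrative names, and the schema here mirrors the raw JSON key names rather than the renamed fields of inputSchema (a further select/alias step would be needed to rename them).
from pyspark.sql import functions as F, types as T

# schema that mirrors the raw JSON key names
rawSchema = T.StructType([
    T.StructField('employeeName', T.StringType()),
    T.StructField('department', T.ArrayType(T.StringType())),
    T.StructField('userName', T.StringType()),
    T.StructField('joinDate', T.StringType()),
    T.StructField('addressInfo', T.MapType(T.StringType(), T.StringType())),
    T.StructField('isActive', T.BooleanType()),
    T.StructField('employeeId', T.IntegerType())
])
# put the raw JSON string into a single-column Dataframe and parse it with from_json
rawDF = spark.createDataFrame([(jsonInput,)], ["value"])
parsedDF = rawDF.select(F.from_json(F.col("value"), rawSchema).alias("j")).select("j.*")
parsedDF.show(truncate=False)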
4.1 Converting the Dataframe into a Temp Table
Using createGlobalTempView("tablename")
we can register a Dataframe as a global temp view (table).
Below is an example where we create a temp view from the Dataframe and use spark.sql()
to create a new Dataframe by querying the temp view.
# converting the Dataframe to a temp table
TABLENAME = "employee"
try:
    inputDF.createGlobalTempView(TABLENAME)
    tableDF = spark.sql(f"SELECT * FROM global_temp.{TABLENAME}")
    display(tableDF)
except Exception as e:
    print(f"Exception occurred: {e}")
    raise e
finally:
    spark.catalog.dropGlobalTempView(TABLENAME)
- Output
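If the view only needs to live for the current Spark session (and should be queried without the global_temp prefix), createOrReplaceTempView() can be used instead; a minimal sketch is below (the view name employee_session and sessionDF are just illustrations).
# session-scoped temp view: no global_temp database prefix is needed
inputDF.createOrReplaceTempView("employee_session")
sessionDF = spark.sql("SELECT name, id FROM employee_session")
sessionDF.show()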
5. Adding rows with null/None values to a Dataframe
from pyspark.sql.functions import lit, col, when

# Define columns to add an additional data set directly to the existing data frame.
# Note: in this case I didn't want to update the CSV file, so the rows are added in code.
# It is not easy to put a null directly into the data frame while types are being inferred,
# so I am using '' (blank) values and then converting them to None/null with a Spark function.
columns = ['EmployeeId', 'Name', 'Department', 'Salary']
# The Salary field can be given directly as None, but if we later union this with a
# dataframe where Salary is a double, at least one row must contain a double value
# (20.0 below) so that the column type can be inferred.
data = [(6, '', '', None), (7, '', '', 20.0)]
newEmployeeRow_df = spark.createDataFrame(data, columns)
# in each row, replace blank values with None
newEmployeeRow_df = newEmployeeRow_df.select(
    [when(col(c) == "", None).otherwise(col(c)).alias(c) for c in newEmployeeRow_df.columns]
)
newEmployeeRow_df.show()
Note:
# using the data below instead will throw an error, because the Salary type
# cannot be inferred when every value is None
data = [(6,'','',None), (7,'','',None)]
ValueError: Some of types cannot be determined after inferring
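One way around that error is to pass an explicit schema so nothing has to be inferred; a minimal sketch using a DDL-style schema string is below (schemaDDL and allNoneSalary_df are illustrative names). The output shown next is still from the original example above.
# with an explicit schema, every Salary value can be None
schemaDDL = "EmployeeId int, Name string, Department string, Salary double"
allNoneSalary_df = spark.createDataFrame([(6, '', '', None), (7, '', '', None)], schemaDDL)
allNoneSalary_df.show()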
- Output:
+----------+----+----------+------+
|EmployeeId|Name|Department|Salary|
+----------+----+----------+------+
| 6|null| null| null|
| 7|null| null| 20.0|
+----------+----+----------+------+
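These new rows can then be appended to the employee_df read from CSV in section 3, and any remaining nulls replaced with defaults via fillna(); a minimal sketch is below, assuming both Dataframes have the same column order and compatible types (the default values used are arbitrary illustrations).
# append the new rows to the existing employee Dataframe (columns must line up by position)
combined_df = employee_df.union(newEmployeeRow_df)
# replace remaining nulls with per-column default values
cleaned_df = combined_df.fillna({"Name": "unknown", "Department": "unassigned", "Salary": 0.0})
cleaned_df.show()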