Working with DataFrames (PySpark) in Databricks

1. Creating a DataFrame programmatically from data and a schema

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
dataValues = [("Tim","100","Sales",4050),
    ("Don","102","Accounts",5500),
    ("Rob","103","Security",4500)
  ]

schema = StructType([
    StructField("name", StringType(), True),
    StructField("id", StringType(), True),
    StructField("department", StringType(), True),
    StructField("salary", IntegerType(), True)
  ])

df = spark.createDataFrame(data=dataValues,schema=schema)
df.printSchema()
df.show(truncate=False)
  • Output:
root
 |-- name: string (nullable = true)
 |-- id: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: integer (nullable = true)

+----+---+----------+------+
|name|id |department|salary|
+----+---+----------+------+
|Tim |100|Sales     |4050  |
|Don |102|Accounts  |5500  |
|Rob |103|Security  |4500  |
+----+---+----------+------+
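
As a variation (a minimal sketch, not from the original notebook), the same DataFrame can also be created by passing the schema as a DDL-formatted string instead of building a StructType:

# same columns as above, schema expressed as a DDL string
ddl_df = spark.createDataFrame(
  data=dataValues,
  schema="name STRING, id STRING, department STRING, salary INT"
)
ddl_df.printSchema()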

2. Using the column name headers directly as the schema

  • Note: when only column names are passed as the schema, the data types are inferred from the data; here every value is a string, so all columns come out as String (a casting sketch follows the output below).
data = [("1","Tom","Pass"),
    ("2","Jim","Pass"),
    ("3","Will","Fail")
  ]
columns = ["StudentId","Name","Grade"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)
  • Output:
root
 |-- StudentId: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Grade: string (nullable = true)

+---------+----+-----+
|StudentId|Name|Grade|
+---------+----+-----+
|1        |Tom |Pass |
|2        |Jim |Pass |
|3        |Will|Fail |
+---------+----+-----+
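
Since every column comes out as String, columns can be cast afterwards; a minimal sketch (the target type here is an assumption for illustration):

from pyspark.sql.functions import col

# cast StudentId to an integer; Name and Grade stay as strings
typed_df = df.withColumn("StudentId", col("StudentId").cast("int"))
typed_df.printSchema()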

3. Reading directly from files such as CSV, Parquet, ORC, etc.

  • Note: the example below reads a CSV file that was uploaded to Databricks Community Edition; a sketch of the Parquet/ORC variants follows the output.
employee_df = spark.read.csv("/FileStore/sample_data/sampleEmployee.csv", header=True, inferSchema=True)
employee_df.show()
  • Output:
+----------+----+----------+-------+
|EmployeeId|Name|Department| Salary|
+----------+----+----------+-------+
|         1| Tim|  Accounts|10000.0|
|         2| Tom|     Sales|12000.0|
|         3| Bob|        IT|15000.0|
|         4| Rob|      null|  100.0|
|         5|null|  Security| 3000.0|
+----------+----+----------+-------+
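
The other file formats use the same reader API; a minimal sketch, assuming files exist at these hypothetical paths:

# Parquet and ORC files carry their own schema, so no inferSchema option is needed
parquet_df = spark.read.parquet("/FileStore/sample_data/sampleEmployee.parquet")
orc_df = spark.read.orc("/FileStore/sample_data/sampleEmployee.orc")
parquet_df.show()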

4. Converting JSON to a DataFrame

  • The sample JSON is passed as a string, not read from a file.
  • The schema for the DataFrame is defined using pyspark.sql.types.
  • The data from the JSON is converted to the expected field names and types using the custom function convertJsonToDataFrameRow().
  • Finally, the converted row and the schema are passed to spark.createDataFrame() to build the DataFrame.

Additionally, this code example also shows:

  • Schemas for array and map (object) types
  • How to use a timestamp column
from pyspark.sql import SparkSession, functions as F, types as T
from datetime import datetime
import json

jsonInput = """
{
  "employeeName":"dummy user1",
  "department" : ["Finance","Accounts"],
  "userName":"myusername",
  "joinDate": "2018-09-15T19:55:00.000+0000",
  "addressInfo":{ 
     "City": "Seattle"
  },
  "isActive" : true,
  "employeeId": 300
}
"""

# parse the JSON string with the json library
inputJson = json.loads(jsonInput)

# For debugging the json
print(inputJson)

# for debugging purpose
dateInfo = inputJson.get('joinDate')
print(f"date: {dateInfo}")

# Define the schema for json data
inputSchema = T.StructType([
  T.StructField('name', T.StringType(), False), # last argument is nullable or not
  T.StructField('department', T.ArrayType(T.StringType(),False),True),
  T.StructField('loginId',T.StringType(), False),
  T.StructField('joiningDate',T.TimestampType(),False),
  T.StructField('address',T.MapType(T.StringType(),T.StringType(),True), False),
  T.StructField('active',T.BooleanType(),False),
  T.StructField('id',T.IntegerType(),False)
])

# return the datetime format string matching the joinDate timestamps
def getdateFormat():
  return "%Y-%m-%dT%H:%M:%S.%f%z"

addressDictionary = inputJson.get('addressInfo')
print(addressDictionary)

# check whether any of the expected address keys are present in the input
inputHasAddress = any(addressInfo in addressDictionary for addressInfo in ('City', 'streetName'))

print(f"contains address in input? - {inputHasAddress}")

# convert the parsed JSON into a row dict whose key names and value types match the schema
def convertJsonToDataFrameRow():
  return {
    'name': inputJson.get('employeeName'),
    'department' : inputJson.get('department'),
    'loginId' : inputJson.get('userName'),
    'joiningDate': datetime.strptime(inputJson.get('joinDate'),getdateFormat()),
    'active' : inputJson.get('isActive'),
    'address' : inputJson.get('addressInfo'),
    'id':inputJson.get('employeeId')
  }

# Method to return the provided address or a default (empty) one
def fetchAddress(isDefault):
  if isDefault:
    return inputJson.get('addressInfo')
  else:
    return {
      "streetName": "",
      "city": ""
    }

# create the DataFrame from the converted row and the schema, and use it going forward
inputDF = spark.createDataFrame([convertJsonToDataFrameRow()], inputSchema)
# display(inputDF)
inputDF.show()
  • Output:
{'employeeName': 'dummy user1', 'department': ['Finance', 'Accounts'], 'userName': 'myusername', 'joinDate': '2018-09-15T19:55:00.000+0000', 'addressInfo': {'City': 'Seattle'}, 'isActive': True, 'employeeId': 300}
date: 2018-09-15T19:55:00.000+0000
{'City': 'Seattle'}
contains address in input? - True
+-----------+-------------------+----------+-------------------+-----------------+------+---+
|       name|         department|   loginId|        joiningDate|          address|active| id|
+-----------+-------------------+----------+-------------------+-----------------+------+---+
|dummy user1|[Finance, Accounts]|myusername|2018-09-15 19:55:00|{City -> Seattle}|  true|300|
+-----------+-------------------+----------+-------------------+-----------------+------+---+
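
As a side note (a minimal sketch, not part of the original example), Spark can also parse the JSON string itself and infer a schema, at the cost of losing the column renaming and the timestamp conversion done above:

# let Spark infer the schema directly from the JSON string;
# joinDate stays a string and addressInfo becomes a struct
inferred_df = spark.read.json(spark.sparkContext.parallelize([jsonInput]))
inferred_df.printSchema()
inferred_df.show(truncate=False)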

4.1 Converting the DataFrame into a Temp Table

Using createGlobalTempView("tablename"), we can register a DataFrame as a global temporary view (temp table) that can be queried with SQL.

Below is an example where we create a temp view from the DataFrame and then use spark.sql() to build a new DataFrame by querying that view.

# registering the DataFrame as a global temp view (temp table)
TABLENAME = "employee"
try:
  inputDF.createGlobalTempView(TABLENAME)
  tableDF = spark.sql(f"SELECT * FROM global_temp.{TABLENAME}")
  display(tableDF)
except Exception as e:
  print(f"Exception occurred: {e}")
  raise e
finally:
  spark.catalog.dropGlobalTempView(TABLENAME)
  • Output: display(tableDF) shows the same single-row employee DataFrame as above.
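
If a session-scoped view is enough, createOrReplaceTempView() can be used instead of a global view; a minimal sketch (the view name here is an assumption):

# session-scoped temp view: no global_temp prefix needed when querying
inputDF.createOrReplaceTempView("employee_session")
spark.sql("SELECT name, loginId FROM employee_session").show()
spark.catalog.dropTempView("employee_session")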

5. Adding rows with null/None values to a DataFrame

from pyspark.sql.functions import lit,col,when

# Define columns to add an additional data set directly as a new DataFrame.
# Note: in this case I didn't want to update the CSV file, so I use a Spark
# function to introduce the nulls instead. It is not easy to put a null
# straight into the data, so I am using '' (blank) values and converting
# them to None/null afterwards.

columns = ['EmployeeId','Name','Department','Salary']

# The Salary field can be None directly, but if we are performing a union with
# a DataFrame where Salary is double, then at least one row must provide a
# double value so the type can be inferred - hence the 20.0 below.
data = [(6,'','',None), (7,'','',20.0)]

newEmployeeRow_df = spark.createDataFrame(data, columns)

# in each row, replace blank values with None
newEmployeeRow_df = newEmployeeRow_df.select(
  [when(col(c) == "", None).otherwise(col(c)).alias(c) for c in newEmployeeRow_df.columns]
)

newEmployeeRow_df.show()

Note:

# using the data below instead will throw the error shown, because Spark
# cannot infer the type of the Salary column when every value is None
data = [(6,'','',None), (7,'','',None)]

ValueError: Some of types cannot be determined after inferring
  • Output:
+----------+----+----------+------+
|EmployeeId|Name|Department|Salary|
+----------+----+----------+------+
|         6|null|      null|  null|
|         7|null|      null|  20.0|
+----------+----+----------+------+
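
A way around the ValueError above (a minimal sketch, assuming employee_df from section 3 is still available) is to supply an explicit schema, so columns that are entirely None no longer need type inference:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

newRowSchema = StructType([
  StructField("EmployeeId", IntegerType(), True),
  StructField("Name", StringType(), True),
  StructField("Department", StringType(), True),
  StructField("Salary", DoubleType(), True)
])

# with an explicit schema, all-None values need no type inference
extraRows_df = spark.createDataFrame([(6, None, None, None), (7, None, None, None)], newRowSchema)

# append the new rows to the CSV-based DataFrame from section 3
employee_df.union(extraRows_df).show()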