PySpark DataFrames
- RDDs (Resilient Distributed Datasets) handle structured data poorly and lack built-in query optimization; DataFrames address both shortcomings.
- DataFrames are like tables in a relational database or data frames in R/Python: they organize data into named columns and carry a schema.
- Both RDDs and DataFrames share features like immutability, in-memory storage, resilience, and distribution, and an RDD can be converted directly into a DataFrame, as sketched after this list.
- DataFrames in PySpark support both SQL queries (SELECT * FROM table) and expression methods (df.select()).
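A minimal sketch of that conversion, assuming a local SparkSession and hypothetical sample rows (any RDD of tuples works the same way):
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PySparkDataFrames").master("local").getOrCreate()

# Hypothetical rows for illustration only
rdd = spark.sparkContext.parallelize([("Alice", 1), ("Bob", 2)])
df = spark.createDataFrame(rdd, ["name", "id"])  # name the columns to get a DataFrame
df.show()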
Expression Methods
1. Create a DataFrame from reading a CSV:
data = spark.read.csv("EmployeeData.csv", header=True, inferSchema=True)
data.show()
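Since inferSchema only samples the file to guess column types, a quick schema check (not part of the original example) confirms what was inferred:
data.printSchema()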
2. Selecting Columns:
# Find first and last name of all Employees
df1 = data.select(data.first_name, data.last_name)
df1.show()
3. Use Alias:
# Find fore and sur name of all Employees
df2 = data.select(data.first_name.alias("fore_name"), data.last_name.alias("sur_name"))
df2.show()
4. Ordering Data:
# Find all Employees ordered by salary (ascending order)
df3 = data.orderBy(data.salary, ascending=True)
df3.show()
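For descending order, pass ascending=False or use the column's desc() method; a minimal sketch on the same DataFrame:
df3_desc = data.orderBy(data.salary.desc())
df3_desc.show()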
5. Filtering Data:
# Find all male Employees
df4 = data.filter(data.sex == 'M')
df4.show()
# Find all Employees from branch 2 with sex F
df5 = data.filter((data.branch_id == 2) & (data.sex == 'F'))
df5.show()
6. Finding Distinct Values:
# Find all distinct values of sex
df6 = data.select(data.sex).distinct()
df6.show()
7. Counting Rows:
# Find the number of Employees
emp_count = data.count()  # count() returns an int, not a DataFrame
print("Number of Employees:", emp_count)
8. Aggregating Data - Average:
# Find average of all Employee salaries
from pyspark.sql.functions import avg
df8 = data.select(avg(data.salary).alias("avg_salary"))
df8.show()
9. Counting Distinct Values:
# Find the number of distinct sex values in the Employee table
distinct_sex_count = data.select(data.sex).distinct().count()
print("Number of Distinct Sex Values:", distinct_sex_count)
10. Grouping Data:
# Count Employees per sex
df10 = data.groupBy(data.sex).count()
df10.show()
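groupBy also pairs with agg for other per-group statistics; a minimal sketch computing the average salary per sex (reusing the avg import from step 8):
df_avg_by_sex = data.groupBy(data.sex).agg(avg(data.salary).alias("avg_salary"))
df_avg_by_sex.show()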
11. Joining Data:
# Find each client's total sales greater than 56000, with client name
from pyspark.sql.functions import sum  # note: this shadows Python's built-in sum
Data1 = spark.read.csv("ClientsData.csv", header=True, inferSchema=True)
Data2 = spark.read.csv("WorksOnData.csv", header=True, inferSchema=True)
joined = Data1.join(Data2, Data1.client_id == Data2.client_id).select(Data1.client_name, Data2.total_sales)
grouped = joined.groupBy("client_name")
totals = grouped.agg(sum("total_sales").alias("total_sales"))
df11 = totals.filter(totals.total_sales > 56000)
df11.show()
# Other aggregates work the same way: avg("total_sales"), min("total_sales"), max("total_sales"), count()
# For left-join: df1.join(df2, df1["join_column"] == df2["join_column"], "left")
# For right-join: df1.join(df2, df1["join_column"] == df2["join_column"], "right")
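As a concrete sketch of a left join with the same Data1 and Data2 from step 11, clients without matching WorksOnData rows appear with null sales columns:
left_joined = Data1.join(Data2, Data1.client_id == Data2.client_id, "left")
left_joined.show()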
12. Use between:
from pyspark.sql.functions import col
filtered_df_between = data.filter(col("salary").between(60000, 80000))
filtered_df_between.show()
13. Use isin:
filtered_df_isin = data.filter(data["branch_id"].isin([1, 2]))
filtered_df_isin.show()
14. Use ~isin:
filtered_df_not_isin = data.filter(~data["branch_id"].isin([1, 2]))
filtered_df_not_isin.show()
15. Use isnull:
filtered_df_isnull = data.filter(data["super_id"].isNull())
filtered_df_isnull.show()
16. Use isnotnull:
filtered_df_isnotnull = data.filter(data["branch_id"].isNotNull())
filtered_df_isnotnull.show()
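These filter helpers chain with the earlier methods; a minimal sketch combining filter, select, and orderBy on the same DataFrame (reusing the salary range from step 12):
(data.filter(col("salary").between(60000, 80000))
     .select(data.first_name, data.last_name, data.salary)
     .orderBy(data.salary, ascending=False)
     .show())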
SQL Queries
SQL queries in PySpark leverage the structured data processing capabilities of Spark, enabling users to interact with large-scale datasets using familiar SQL syntax.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SQL Queries").master("local").getOrCreate()
df = spark.read.csv("EmployeeData.csv", header=True, inferSchema=True)
df.createOrReplaceTempView("EmployeeData")
# Find all Employees
sql_cmd = "SELECT * FROM EmployeeData"
df_result = spark.sql(sql_cmd)
df_result.show()
# Find first and last name of all Employees
sql_cmd = "SELECT first_name, last_name FROM EmployeeData"
df_result = spark.sql(sql_cmd)
df_result.show()
# Find fore and sur name of all Employees
sql_cmd = "SELECT first_name AS fore_name, last_name AS sur_name FROM EmployeeData"
df_result = spark.sql(sql_cmd)
df_result.show()
# Find all Employees ordered by salary (ascending order)
sql_cmd = "SELECT * FROM EmployeeData ORDER BY salary ASC"
df_result = spark.sql(sql_cmd)
df_result.show()
# Find all male Employees
sql_cmd = "SELECT * FROM EmployeeData WHERE sex ='M'"
df_result = spark.sql(sql_cmd)
df_result.show()
# Find average of all Employee salaries
sql_cmd = "SELECT AVG(salary) FROM EmployeeData"
df_result = spark.sql(sql_cmd)
df_result.show()
df1 = spark.read.csv("ClientsData.csv", header=True, inferSchema=True)
df1.createOrReplaceTempView("ClientsData")
df2 = spark.read.csv("WorksOnData.csv", header=True, inferSchema=True)
df2.createOrReplaceTempView("WorksOnData")
# Find each client's total sales, with client name
sql_cmd = '''SELECT ClientsData.client_name, SUM(WorksOnData.total_sales) AS Sales
FROM ClientsData
JOIN WorksOnData ON ClientsData.client_id = WorksOnData.client_id
GROUP BY ClientsData.client_name'''
df_result = spark.sql(sql_cmd)
df_result.show()
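The expression-method filter on aggregated sales from step 11 maps to a HAVING clause in SQL; a sketch against the same views:
sql_cmd = '''SELECT ClientsData.client_name, SUM(WorksOnData.total_sales) AS Sales
FROM ClientsData
JOIN WorksOnData ON ClientsData.client_id = WorksOnData.client_id
GROUP BY ClientsData.client_name
HAVING SUM(WorksOnData.total_sales) > 56000'''
spark.sql(sql_cmd).show()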
spark.stop()