Apache SPARK: PySpark DataFrames & SQL

Mannan Ul Haq
0

PySpark DataFrames

  • RDDs (Resilient Distributed Datasets) are not good at handling structured data and lack built-in optimization. The solution to this is DataFrames.
  • DataFrames are like tables in a database or in R/Python. They organize data into named columns and have a structure.
  • Both RDDs and DataFrames share features like immutability, in-memory storage, resilience, and distribution.
  • DataFrames in PySpark support both SQL queries (SELECT * from table) or expression methods (df.select()).

Expression Methods

1. Create a DataFrame from reading a CSV:

data = spark.read.csv("EmployeeData.csv", header=True, inferSchema=True)
data.show()


2. Selecting Columns:

# Find first and last name of all Employees
RDD1 = data.select(data.first_name, data.last_name)
RDD1.show()


3. Use Alias:

# Find fore and sur name of all Employees
RDD2 = data.select(data.first_name.alias("fore_name"), data.last_name.alias("sur_name"))
RDD2.show()


4. Ordering Data:

# Find all Employees ordered by salary (ascending order)
RDD3 = data.orderBy(data.salary, ascending=True)
RDD3.show()


5. Filtering Data:

# Find all male Employees
RDD4 = data.filter(data.sex == 'M')
RDD4.show()

# Find all Employees form Branch 2 and Sex F
RDD5 = data.filter((data.branch_id == 2) & (data.sex == 'F'))
RDD5.show()


6. Finding Distinct Values:

# Find out all distinct Gender
RDD6 = data.select(data.sex).distinct()
RDD6.show()


7. Counting Rows:

# Find number of Employee
RDD7 = data.count()
print("Number of Employees:", RDD7)


8. Aggregating Data - Average:

# Find average of all Employee salaries
from pyspark.sql.functions import avg
RDD8 = data.select(avg(data.salary))
RDD8.show()


9. Counting Distinct Values:

# Find number of distinct sex in employee table
RDD10 = data.select(data.sex).distinct().count()
print("Number of Distinct Sex:", RDD10)


10. Grouping Data:

# Find how many sex in Employee Table
RDD11 = data.groupBy(data.sex).count()
RDD11.show()


11. Joining Data:

# Find out total sales greater than 560000 of each with Client Name
from pyspark.sql.functions import sum

Data1 = spark.read.csv("ClientsData.csv", header = True, inferSchema = True)
Data2 = spark.read.csv("WorksOnData.csv", header = True, inferSchema = True)

rdd1 = Data1.join(Data2, Data1.client_id == Data2.client_id).select(Data1.client_name, Data2.total_sales)
rdd2 = rdd1.groupBy(Data1.client_name)
rdd3 = rdd2.agg(sum("total_sales").alias("total_sales"))
rdd4 = rdd3.filter(rdd3.total_sales > 56000)
rdd4.show()

# avg("total_sales")
# min("total_sales")
# max("total_sales")
# count()

RDD12.show()

# For left-join: df1.join(df2, df1["join_column"] == df2["join_column"], "left")
# For right-join:  df1.join(df2, df1["join_column"] == df2["join_column"], "right")


12. Use between:

filtered_df_between = df.filter(col("salary").between(60000, 80000))
filtered_df_between.show()


13. Use isin:

filtered_df_isin = df.filter(df["branch_id"].isin([1, 2]))
filtered_df_isin.show()


14. Use ~isin:

filtered_df_not_isin = df.filter(~df["branch_id"].isin([1, 2]))
filtered_df_not_isin.show()


15. Use isnull:

filtered_df_isnull = df.filter(df["super_id"].isNull())
filtered_df_isnull.show()


16. Use isnotnull:

filtered_df_isnotnull = df.filter(df["branch_id"].isNotNull())
filtered_df_isnotnull.show()


SQL Queries

SQL queries in PySpark leverage the structured data processing capabilities of Spark, enabling users to interact with large-scale datasets using familiar SQL syntax.

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().setAppName("SQL Queries").setMaster("local")

sc = SparkContext.getOrCreate(conf = conf)
spark = SparkSession(sc)

df = spark.read.csv("EmployeeData.csv", header = True, inferSchema = True)
df.createOrReplaceTempView("EmployeeData")

# Find all Employees
sql_cmd = "SELECT * FROM EmployeeData"
df_result = spark.sql(sql_cmd)
df_result.show()

# Find first and last name of all Employees
sql_cmd = "SELECT first_name, last_name FROM EmployeeData"
df_result = spark.sql(sql_cmd)
df_result.show()

# Find fore and sur name of all Employees
sql_cmd = "SELECT first_name AS fore_name, last_name AS sur_name FROM EmployeeData"
df_result = spark.sql(sql_cmd)
df_result.show()

# Find all Employees ordered by salary (ascending order)
sql_cmd = "SELECT * FROM EmployeeData ORDER BY salary ASC"
df_result = spark.sql(sql_cmd)
df_result.show()

# Find all male Employees
sql_cmd = "SELECT * FROM EmployeeData WHERE sex ='M'"
df_result = spark.sql(sql_cmd)
df_result.show()

# Find average of all Employee salaries
sql_cmd = "SELECT AVG(salary) FROM EmployeeData"
df_result = spark.sql(sql_cmd)
df_result.show()

df1 = spark.read.csv("ClientsData.csv", header = True, inferSchema = True)
df1.createOrReplaceTempView("ClientsData")
df2 = spark.read.csv("WorksOnData.csv", header = True, inferSchema = True)
df2.createOrReplaceTempView("WorksOnData")

# Find out total sales of each with Client Name
sql_cmd = '''SELECT ClientsData.client_name, SUM(WorksOnData.total_sales) AS Sales
             FROM ClientsData
             JOIN WorksOnData ON ClientsData.client_id = WorksOnData.client_id
             GROUP BY ClientsData.client_name'''
df_result = spark.sql(sql_cmd)
df_result.show()

sc.stop()


Post a Comment

0Comments

Post a Comment (0)

#buttons=(Accept !) #days=(20)

Our website uses cookies to enhance your experience. Check Now
Accept !