What is RDD?
RDD stands for Resilient Distributed Dataset. It is a fundamental abstraction in Apache Spark that represents an immutable, partitioned collection of data distributed across a cluster of machines. Here's a breakdown of its key characteristics:
- Resilient: RDDs are resilient because they can automatically recover from failures. Spark keeps track of the lineage of transformations applied to an RDD, so in case of a failure, it can reconstruct lost partitions by reapplying those transformations.
- Distributed: RDDs distribute data across multiple nodes in a cluster, allowing parallel processing of data. Each RDD is divided into partitions, with each partition being processed by a separate task on different nodes in the cluster.
- Dataset: RDD represents a collection of data elements that can be operated on in parallel.
PySpark
PySpark is the Python API for Apache Spark, an open-source, distributed computing system designed for large-scale data processing. PySpark allows Python developers to harness the power of Spark using the familiar syntax and libraries of Python.
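The RDD examples in this article use a SparkContext named sc. A minimal way to obtain one (a sketch assuming a local PySpark installation; the application name is arbitrary) is:
from pyspark import SparkConf, SparkContext
# Create (or reuse) a SparkContext; the examples below refer to it as `sc`
conf = SparkConf().setAppName("RDD Examples").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf = conf)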
Creating RDDs:
There are two common ways to create RDDs: using the parallelize() method and using the textFile() method.
1. parallelize() Method
The parallelize() method is used to create an RDD from a collection of objects that exist on the driver node. This method takes a collection (such as a list or array) and distributes the data across the cluster to form an RDD.
Here's an example of using the parallelize() method in Python with PySpark:
# Create a collection of data
data = [1, 2, 3, 4, 5]
# Parallelize the collection to create an RDD
rdd = sc.parallelize(data)
# Collect the results back to the driver program
result = rdd.collect()
print(result) # Output: [1, 2, 3, 4, 5]
2. textFile() Method
The textFile() method is used to read data from a text file on the local file system, HDFS, or any Hadoop-supported file system URI. This method returns an RDD of strings, with each element being a line from the text file.
Here's an example of using the textFile() method in Python with PySpark:
# Read a text file into an RDD
rdd = sc.textFile("file:///path/to/your/textfile.txt")
# Collect the results back to the driver program
lines = rdd.collect()
for line in lines:
    print(line)
# Save the RDD as a text file
rdd.saveAsTextFile("file:///path/to/output")
Data partitioning:
Data partitioning in Spark involves dividing data across partitions to optimize performance and reduce network traffic. Spark chooses a default number of partitions based on the cluster configuration, but users can specify the number of partitions explicitly.
1. parallelize() Method
numRDD = sc.parallelize(range(10), numSlices = 6)  # parallelize() takes numSlices, not minPartitions
2. textFile() Method
fileRDD = sc.textFile("README.md", minPartitions = 6)
The number of partitions in an RDD can be found by using the getNumPartitions() method.
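For example, continuing with the two RDDs created above:
print(numRDD.getNumPartitions())   # 6, the number of slices requested
print(fileRDD.getNumPartitions())  # at least 6; textFile() treats minPartitions as a lower bound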
RDDs support two types of operations:
- Transformations: These are operations that produce a new RDD from an existing RDD. Transformations are lazy, meaning they do not compute their results immediately. Instead, they build up a directed acyclic graph (DAG) representing the computation, which is executed only when an action is called.
- Actions: These are operations that trigger the execution of the DAG and return a result to the driver program or write data to external storage.
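A minimal sketch of this laziness (the RDD and variable names here are illustrative):
numbers = sc.parallelize([1, 2, 3, 4])
squares = numbers.map(lambda x: x * x)  # transformation only: nothing is computed yet
print(squares.collect())                # action: triggers execution and prints [1, 4, 9, 16]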
Transformations
1. Map:
data = sc.parallelize(["value1,value2,value3", "value4,value5,value6", ...])
each_line_data = data.map(lambda line: line.split(","))
# Output:
# each_line_data = [['value1', 'value2', 'value3'], ['value4', 'value5', 'value6'], ...]
Explanation: In this example, the RDD data is created using sc.parallelize() with a list of strings, each representing a line of comma-separated values. The map transformation then splits each line on the commas. The resulting RDD, each_line_data, consists of lists, one per input line, each containing that line's values.
2. FlatMap:
data = sc.parallelize(["value1,value2,value3", "value4,value5,value6", ...])
flat_mapped_data = data.flatMap(lambda line: line.split(","))
# Output:
# flat_mapped_data = ['value1', 'value2', 'value3', 'value4', 'value5', 'value6', ...]
3. Filter:
data = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
filtered_data = data.filter(lambda x: x % 2 == 0)
# Output:
# filtered_data = [2, 4, 6, 8, 10]
4. ReduceByKey:
data = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5)])
reduced_data = data.reduceByKey(lambda x, y: x + y)
# Output:
# reduced_data = [("a", 4), ("b", 6), ("c", 5)]
Explanation: In this example, the RDD data is created using sc.parallelize() with a list of key-value pairs. The reduceByKey transformation is then applied to combine values with the same key. The lambda function lambda x, y: x + y is used to sum up the values associated with each key. The resultant RDD, reduced_data, contains unique keys with their associated aggregated values.
# To Get Max Value
max_data = data.reduceByKey(lambda x, y: max(x, y))
# To Get Min Value
min_data = data.reduceByKey(lambda x, y: min(x, y))
5. SortBy:
data = sc.parallelize([("b", 2), ("a", 1), ("c", 3)])
sorted_data = data.sortBy(lambda x: x[0], ascending = True)
# sorted_data = [("a", 1), ("b", 2), ("c", 3)]
Explanation: In this example, the RDD data is created using sc.parallelize() with a list of key-value pairs. The sortBy transformation is then applied to sort the elements of the RDD based on the key, which is accessed through the lambda function lambda x: x[0]. This sorts the RDD lexicographically based on the keys. The resultant RDD, sorted_data, contains elements sorted by their keys.
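The same transformation can also sort by the value; for example, in descending order (a small extension of the snippet above):
sorted_by_value = data.sortBy(lambda x: x[1], ascending = False)
# sorted_by_value = [("c", 3), ("b", 2), ("a", 1)]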
6. Union:
RDD3 = RDD1.union(RDD2)
7. Intersection:
common_elements_RDD = RDD1.intersection(RDD2)
8. Subtract:
unique_elements_RDD = RDD1.subtract(RDD2)
9. Distinct:
distinct_elements_RDD = input_RDD.distinct()
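A minimal sketch of these four set-style transformations on two small illustrative RDDs (the inputs are assumptions, not from the original):
RDD1 = sc.parallelize([1, 2, 3, 4])
RDD2 = sc.parallelize([3, 4, 5])
RDD1.union(RDD2).collect()             # [1, 2, 3, 4, 3, 4, 5] -- duplicates are kept
RDD1.intersection(RDD2).collect()      # [3, 4] (order may vary)
RDD1.subtract(RDD2).collect()          # [1, 2] (order may vary)
RDD1.union(RDD2).distinct().collect()  # [1, 2, 3, 4, 5] (order may vary)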
10. MapValues:
data = sc.parallelize([("a", 1), ("b", 2), ("c", 3)])
mapped_data = data.mapValues(lambda x: x * 2)
# Output:
# mapped_data = [("a", 2), ("b", 4), ("c", 6)]
11. GroupByKey:
data = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5)])
grouped_data = data.groupByKey()
# Output (groupByKey returns an iterable of values per key; shown here as lists for readability):
# grouped_data = [("a", [1, 3]), ("b", [2, 4]), ("c", [5])]
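Because groupByKey returns the values for each key as an iterable rather than a plain list, a common follow-up, shown here as a small sketch, is to materialize them with mapValues:
grouped_as_lists = grouped_data.mapValues(list)
# grouped_as_lists = [("a", [1, 3]), ("b", [2, 4]), ("c", [5])]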
12. Join Statements:
Join:
Purpose: Combines elements from two RDDs based on matching keys.
rdd1 = sc.parallelize([(1, 'a'), (2, 'b')])
rdd2 = sc.parallelize([(1, 'x'), (3, 'y')])
joined_rdd = rdd1.join(rdd2)
# Output:
# [(1, ('a', 'x'))]
Left Join:
rdd1 = sc.parallelize([(1, 'a'), (2, 'b')])
rdd2 = sc.parallelize([(1, 'x'), (3, 'y')])
left_joined_rdd = rdd1.leftOuterJoin(rdd2)
# Output:
# [(1, ('a', 'x')), (2, ('b', None))]
Right Join:
rdd1 = sc.parallelize([(1, 'a'), (2, 'b')])
rdd2 = sc.parallelize([(1, 'x'), (3, 'y')])
right_joined_rdd = rdd1.rightOuterJoin(rdd2)
# Output:
# [(1, ('a', 'x')), (3, (None, 'y'))]
13. FlatMapValues:
new_rdd = rdd.flatMapValues(lambda value: iterable_function(value))
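As a concrete sketch (the input pairs and the splitting function below are illustrative assumptions):
rdd = sc.parallelize([("a", "1,2"), ("b", "3")])
new_rdd = rdd.flatMapValues(lambda value: value.split(","))
# new_rdd = [("a", "1"), ("a", "2"), ("b", "3")]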
Actions
1. take:
Purpose: Retrieves the first n elements from an RDD and returns them as a list.
taken_elements = data.take(5)
2. count:
Purpose: The count action returns the total number of elements in an RDD.
total_count = input_RDD.count()
3. countByValue:
Purpose: The countByValue action in Apache Spark counts the occurrences of each unique value in an RDD and returns the result as a dictionary where the keys are the distinct values and the values are their respective counts.
Example:
data = sc.parallelize([1, 2, 3, 1, 2, 1, 3, 4, 5])
value_counts = data.countByValue()
# Output:
# value_counts = {1: 3, 2: 2, 3: 2, 4: 1, 5: 1}
Here's an example of Spark code in Python using PySpark:
You have a dataset transaction.csv:
Find the total transaction amount for each customer for the year 2023.
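The code below assumes (this layout is inferred from how the columns are indexed in the code; it is not stated in the original) that each row of transaction.csv has the form customer_id,amount,date:
# Assumed (illustrative) layout of transaction.csv:
# customer_id,amount,date
# 101,250,2023-01-15
# 102,400,2022-11-03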
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf = SparkConf().setAppName("TASK 1").setMaster("local")
sc = SparkContext.getOrCreate(conf = conf)
spark = SparkSession(sc)
# Read the transactions file and split each line into its comma-separated fields
Data = sc.textFile("transaction.csv")
each_line_Data = Data.map(lambda line: line.split(","))
# Keep only the rows whose date column (row[2]) contains "2023"
Data_2023 = each_line_Data.filter(lambda row: "2023" in row[2])
# Build (customer_id, amount) pairs and sum the amounts per customer
Mapped_Data = Data_2023.map(lambda row: (int(row[0]), int(row[1])))
Total_Transactions_2023 = Mapped_Data.reduceByKey(lambda x, y: x + y)
# Convert the result to a DataFrame for a tabular display
DataFrame = spark.createDataFrame(Total_Transactions_2023, ["Customer_Id", "Total_Amount"])
DataFrame.show()
sc.stop()