Apache Spark: Programming with PySpark RDDs

Mannan Ul Haq

What is RDD?

RDD stands for Resilient Distributed Dataset. It is a fundamental abstraction in Apache Spark that represents an immutable, partitioned collection of data distributed across a cluster of machines. Here's a breakdown of its key characteristics:

  1. Resilient: RDDs are resilient because they can automatically recover from failures. Spark keeps track of the lineage of transformations applied to an RDD, so in case of a failure, it can reconstruct lost partitions by reapplying those transformations.
  2. Distributed: RDDs distribute data across multiple nodes in a cluster, allowing parallel processing of data. Each RDD is divided into partitions, with each partition being processed by a separate task on different nodes in the cluster.
  3. Dataset: RDD represents a collection of data elements that can be operated on in parallel.

PySpark

PySpark is the Python API for Apache Spark, an open-source, distributed computing system designed for large-scale data processing. PySpark allows Python developers to harness the power of Spark using the familiar syntax and libraries of Python.
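
All of the examples below assume that a SparkContext named sc is already available (the PySpark shell creates one automatically). If you are running a standalone script, a minimal setup might look like the following sketch; the application name and master URL here are placeholder values:

from pyspark import SparkConf, SparkContext

# Minimal local configuration; "RDDExamples" and "local[*]" are placeholder values
conf = SparkConf().setAppName("RDDExamples").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)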


Creating RDD:

There are two common ways to create RDDs: using the parallelize() method and using the textFile() method.


1. parallelize() Method

The parallelize() method is used to create an RDD from a collection of objects that exist on the driver node. This method takes a collection (such as a list or array) and distributes the data across the cluster to form an RDD.


Here's an example of using the parallelize() method in Python with PySpark:

# Create a collection of data
data = [1, 2, 3, 4, 5]

# Parallelize the collection to create an RDD
rdd = sc.parallelize(data)

# Collect the results back to the driver program
result = rdd.collect()
print(result)  # Output: [1, 2, 3, 4, 5]


2. textFile() Method

The textFile() method is used to read data from a text file on the local file system, HDFS, or any Hadoop-supported file system URI. This method returns an RDD of strings, with each element being a line from the text file.


Here's an example of using the textFile() method in Python with PySpark:

# Read a text file into an RDD
rdd = sc.textFile("file:///path/to/your/textfile.txt")

# Collect the results back to the driver program
lines = rdd.collect()
for line in lines:
    print(line)
    
# Save the RDD as a text file
rdd.saveAsTextFile("file:///path/to/output")


Data partitioning:

Data partitioning in Spark involves dividing data across partitions to optimize performance and reduce network traffic. Spark chooses a default number of partitions based on the cluster configuration, but users can specify the number of partitions explicitly.


1. parallelize() Method

numRDD = sc.parallelize(range(10), numSlices=6)

2. textFile() Method

fileRDD = sc.textFile("README.md", minPartitions=6)

The number of partitions in an RDD can be found by using the getNumPartitions() method.
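
For example, continuing with the RDDs created above:

print(numRDD.getNumPartitions())   # Output: 6
print(fileRDD.getNumPartitions())  # Output: typically 6 or more (minPartitions is only a suggested minimum)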


RDDs support two types of operations:

  1. Transformations: These are operations that produce a new RDD from an existing RDD. Transformations are lazy, meaning they do not compute their results immediately. Instead, they build up a directed acyclic graph (DAG) representing the computation, which is executed only when an action is called.
  2. Actions: These are operations that trigger the execution of the DAG and return a result to the driver program or write data to external storage.
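
To make this distinction concrete, here is a minimal sketch (assuming the SparkContext sc from earlier): the map call returns immediately without processing any data, and the work happens only when collect() is called.

numbers = sc.parallelize([1, 2, 3, 4, 5])

# Transformation: nothing is executed yet; Spark only records the lineage
squared = numbers.map(lambda x: x * x)

# Action: triggers execution of the DAG and returns the result to the driver
print(squared.collect())  # Output: [1, 4, 9, 16, 25]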

Transformations

1. Map:

Purpose: The map transformation applies a specified function to each element of the RDD, producing a new RDD containing the transformed results.

Example:

data = sc.parallelize(["value1,value2,value3", "value4,value5,value6", ...])
each_line_data = data.map(lambda line: line.split(","))

# Output:
# each_line_data = [['value1', 'value2', 'value3'], ['value4', 'value5', 'value6'], ...]

Explanation: In this example, the RDD data is created using sc.parallelize() with a list of strings, each representing a line of comma-separated values. The map transformation then splits each line on commas. The resulting RDD, each_line_data, consists of lists, where each list holds the individual values from one line.


2. FlatMap:

Purpose: The flatMap transformation is similar to map but differs in its output. It applies a function to each element in the RDD and returns a new RDD by flattening the results. This means that each element of the input RDD can produce zero or more elements in the output RDD.

Example:

data = sc.parallelize(["value1,value2,value3", "value4,value5,value6", ...])
flat_mapped_data = data.flatMap(lambda line: line.split(","))

# Output:
# flat_mapped_data = ['value1', 'value2', 'value3', 'value4', 'value5', 'value6', ...]

Explanation: In this example, the RDD data is created using sc.parallelize() with a list of strings representing lines of values separated by commas. Then, the flatMap transformation is utilized to split each line within the RDD data based on comma separation and flatten the resulting list of lists into a single list. The resultant RDD, flat_mapped_data, comprises individual values rather than lists of values.

3. Filter:

Purpose: The filter transformation is used to select elements from an RDD that satisfy a given predicate function, returning a new RDD containing only those elements.

Example:

data = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
filtered_data = data.filter(lambda x: x % 2 == 0)

# Output:
# filtered_data = [2, 4, 6, 8, 10]

Explanation: In this example, the RDD data is created using sc.parallelize() with a list of integers. The filter transformation is then applied to select only the elements from data that satisfy the predicate function x % 2 == 0, which checks if the element is even. The resultant RDD, filtered_data, contains only the even numbers from the original RDD.

4. ReduceByKey:

Purpose: The reduceByKey transformation is used specifically on key-value pair RDDs. It combines values with the same key using a specified associative function.

Example:

data = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5)])
reduced_data = data.reduceByKey(lambda x, y: x + y)

# Output:
# reduced_data = [("a", 4), ("b", 6), ("c", 5)]

Explanation: In this example, the RDD data is created using sc.parallelize() with a list of key-value pairs. The reduceByKey transformation is then applied to combine values with the same key. The lambda function lambda x, y: x + y is used to sum up the values associated with each key. The resultant RDD, reduced_data, contains unique keys with their associated aggregated values.

# To Get Max Value
max_data = data.reduceByKey(lambda x, y: max(x, y))

# To Get Min Value
min_data = data.reduceByKey(lambda x, y: min(x, y))


5. SortBy:

Purpose: The sortBy transformation is used to sort the elements of an RDD based on a specified key or a custom sorting function.

Example:

data = sc.parallelize([("b", 2), ("a", 1), ("c", 3)])
sorted_data = data.sortBy(lambda x: x[0], ascending = True)

# sorted_data = [("a", 1), ("b", 2), ("c", 3)]

Explanation: In this example, the RDD data is created using sc.parallelize() with a list of key-value pairs. The sortBy transformation is then applied to sort the elements of the RDD based on the key, which is accessed through the lambda function lambda x: x[0]. This sorts the RDD lexicographically based on the keys. The resultant RDD, sorted_data, contains elements sorted by their keys.


6. Union:

Purpose: Combines two RDDs into a single RDD containing all the elements from both RDDs.

Example:

RDD3 = RDD1.union(RDD2)


7. Intersection:

Purpose: intersection returns an RDD that contains the intersection of elements present in both input RDDs.

Example:

common_elements_RDD = RDD1.intersection(RDD2)


8. Subtract:

Purpose: The subtract operation in PySpark returns an RDD that contains elements present in the first RDD but not in the second RDD. It essentially subtracts the elements of the second RDD from the first RDD.

Example:

unique_elements_RDD = RDD1.subtract(RDD2)


9. Distinct:

Purpose: The distinct transformation removes duplicate elements from an RDD, returning a new RDD with only unique elements.

Example:

distinct_elements_RDD = input_RDD.distinct()

Explanation: Creates a new RDD, distinct_elements_RDD, containing only the unique elements of input_RDD. Duplicate elements are removed, leaving one occurrence of each.
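
Here is a small sketch that illustrates the four set-style transformations above (the input values are made up for illustration, and the order of the collected results may vary):

RDD1 = sc.parallelize([1, 2, 3, 4])
RDD2 = sc.parallelize([3, 4, 5, 5])

print(RDD1.union(RDD2).collect())         # [1, 2, 3, 4, 3, 4, 5, 5] (duplicates are kept)
print(RDD1.intersection(RDD2).collect())  # [3, 4]
print(RDD1.subtract(RDD2).collect())      # [1, 2]
print(RDD2.distinct().collect())          # [3, 4, 5]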

10. MapValues:

Purpose: The mapValues transformation is used to apply a function to the values of each key-value pair in an RDD without changing the keys.

Example:

data = sc.parallelize([("a", 1), ("b", 2), ("c", 3)])
mapped_data = data.mapValues(lambda x: x * 2)

# Output:
# mapped_data = [("a", 2), ("b", 4), ("c", 6)]

Explanation: In this example, the RDD data is created using sc.parallelize() with a list of key-value pairs. The mapValues transformation is then applied to double the values of each key-value pair. The keys remain unchanged. The resultant RDD, mapped_data, contains key-value pairs with the values doubled.

11. GroupByKey:

Purpose: The groupByKey transformation is used to group together values associated with the same key in an RDD of key-value pairs.

Example:

data = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5)])
grouped_data = data.groupByKey()

# Output:
# grouped_data = [("a", [1, 3]), ("b", [2, 4]), ("c", [5])]

Explanation: In this example, the RDD data is created using sc.parallelize() with a list of key-value pairs. The groupByKey transformation is then applied to group together values associated with the same key. The resultant RDD, grouped_data, contains key-value pairs where each key is associated with an iterable of its corresponding values. Note that when collected, the grouped values appear as iterable objects rather than plain lists; apply mapValues(list) if you want to materialize them as lists.

12. Join Statements:

Join:

Purpose: Combines elements from two RDDs based on matching keys.
Behavior: Retains only the key-value pairs for which there are matching keys in both RDDs.

Example:

rdd1 = sc.parallelize([(1, 'a'), (2, 'b')])
rdd2 = sc.parallelize([(1, 'x'), (3, 'y')])
joined_rdd = rdd1.join(rdd2)

# Output:
# [(1, ('a', 'x'))]


Left Join:

Purpose: Combines elements from two RDDs, retaining all elements from the left RDD.
Behavior: If a key in the left RDD has no match in the right RDD, the corresponding value in the result will be None.

Example:

rdd1 = sc.parallelize([(1, 'a'), (2, 'b')])
rdd2 = sc.parallelize([(1, 'x'), (3, 'y')])
left_joined_rdd = rdd1.leftOuterJoin(rdd2)

# Output:
# [(1, ('a', 'x')), (2, ('b', None))]


Right Join:

Purpose: Combines elements from two RDDs, retaining all elements from the right RDD.
Behavior: If a key in the right RDD has no match in the left RDD, the corresponding value in the result will be None.

Example:

rdd1 = sc.parallelize([(1, 'a'), (2, 'b')])
rdd2 = sc.parallelize([(1, 'x'), (3, 'y')])
right_joined_rdd = rdd1.rightOuterJoin(rdd2)

# Output:
# [(1, ('a', 'x')), (3, (None, 'y'))]


13. FlatMapValues:

Purpose: The flatMapValues transformation in PySpark is used to transform each value of a key-value pair RDD into zero or more output values, while retaining the original keys. It is similar to the flatMap transformation but operates specifically on the values of key-value pairs.

Syntax:

new_rdd = rdd.flatMapValues(lambda value: iterable_function(value))
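
Example (the input pairs here are made up for illustration):

data = sc.parallelize([("a", "1,2"), ("b", "3")])
flat_mapped_data = data.flatMapValues(lambda value: value.split(","))

# Output:
# flat_mapped_data = [("a", "1"), ("a", "2"), ("b", "3")]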


Actions

1. take:

Purpose: Retrieves the first n elements from an RDD and returns them as a list.

Example:

taken_elements = data.take(5)
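
For instance, with a concrete RDD (the values here are made up for illustration):

data = sc.parallelize([10, 20, 30, 40, 50, 60])
taken_elements = data.take(5)

# Output:
# taken_elements = [10, 20, 30, 40, 50]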


2. count:

Purpose: The count action returns the total number of elements in an RDD.

Example:

total_count = input_RDD.count()
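
For instance (the input values here are made up for illustration):

data = sc.parallelize(["a", "b", "c", "d"])
total_count = data.count()

# Output:
# total_count = 4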


3. countByValue:

Purpose: The countByValue action in Apache Spark counts the occurrences of each unique value in an RDD and returns the result as a dictionary where the keys are the distinct values and the values are their respective counts.


Example:

data = sc.parallelize([1, 2, 3, 1, 2, 1, 3, 4, 5])
value_counts = data.countByValue()

# Output:
# value_counts = {1: 3, 2: 2, 3: 2, 4: 1, 5: 1}


Here's an example of Spark code in Python using PySpark

You have a dataset, transaction.csv, where each line contains a customer ID, a transaction amount, and a transaction date (the code below reads it as Task1_Input.csv).

Task: Find the total transaction amount for each customer for the year 2023.

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Configure and create the SparkContext and SparkSession
conf = SparkConf().setAppName("TASK 1").setMaster("local")

sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

# Read the CSV file as an RDD of lines and split each line on commas
Data = sc.textFile("Task1_Input.csv")
each_line_Data = Data.map(lambda line: line.split(","))

# Keep only the rows whose date column (row[2]) falls in 2023
Data_2023 = each_line_Data.filter(lambda row: "2023" in row[2])

# Build (customer_id, amount) pairs and sum the amounts per customer
Mapped_Data = Data_2023.map(lambda row: (int(row[0]), int(row[1])))
Total_Transactions_2023 = Mapped_Data.reduceByKey(lambda x, y: x + y)

# Convert the result to a DataFrame for a tabular display
DataFrame = spark.createDataFrame(Total_Transactions_2023, ["Customer_Id", "Total_Amount"])
DataFrame.show()

sc.stop()

