Apache Spark: Machine Learning with PySpark

Mannan Ul Haq

Introduction

Apache Spark is a powerful open-source engine for distributed data processing, enabling large-scale data analytics. PySpark, the Python API for Spark, lets you perform data analysis, machine learning, and big data processing in Python. In this guide, we focus on machine learning with PySpark, building a linear regression model and a random forest classifier.


Steps

1. Spark Session:

Create a Spark session, which is the entry point for using Spark:

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("Machine Learning with PySpark").getOrCreate()


2. Understanding the Dataset:

For this tutorial, let's create a small dataset that includes both integer and categorical features. The dataset will represent some basic employee information:

data = [
    (1, "Male", "Sales", 34, 50000),
    (2, "Female", "HR", 29, 60000),
    (3, "Male", "IT", 45, 100000),
    (4, "Female", "Sales", 28, 65000),
    (5, "Male", "HR", 36, 52000),
    (6, "Female", "IT", 41, 95000)
]

columns = ["id", "gender", "department", "age", "salary"]

# Create DataFrame
df = spark.createDataFrame(data, columns)
df.show()

This dataset contains:

  • id: A unique identifier for each employee (integer).
  • gender: The employee's gender (categorical).
  • department: The department where the employee works (categorical).
  • age: The employee's age (integer).
  • salary: The employee's salary (integer).



3. Data Preprocessing:

1. Handling Categorical Features:

StringIndexer is a PySpark estimator that converts categorical string values into numerical indices. This is essential because machine learning models work with numerical data, so categorical strings like "Male", "Female", "Sales", and "HR" must be transformed into a numeric format. By default, the most frequent label receives index 0.

from pyspark.ml.feature import StringIndexer

# Indexing gender column
gender_indexer = StringIndexer(inputCol="gender", outputCol="gender_index")
df = gender_indexer.fit(df).transform(df)

# Indexing department column
department_indexer = StringIndexer(inputCol="department", outputCol="department_index")
df = department_indexer.fit(df).transform(df)

df.show()
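
One caveat: by default, StringIndexer raises an error at transform time if it encounters a label it did not see during fit(). If new data may contain unseen categories, you can keep them instead (a sketch; handleInvalid accepts "error", "skip", or "keep"):

# handleInvalid="keep" assigns unseen labels an extra index instead of raising an error
gender_indexer = StringIndexer(inputCol="gender", outputCol="gender_index", handleInvalid="keep")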



2. Assembling Features:

VectorAssembler is a PySpark transformer that combines multiple columns into a single vector column. This is crucial because machine learning algorithms in Spark expect the input features to be a single vector column rather than separate columns. Note that id and salary are left out: id is just an identifier, and salary is the label we want to predict.

from pyspark.ml.feature import VectorAssembler

# Assembling all features into a single vector
assembler = VectorAssembler(
    inputCols=["gender_index", "department_index", "age"],
    outputCol="features"
)
df = assembler.transform(df)
df.show(truncate=False)
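
As the number of preprocessing steps grows, it is more idiomatic to chain them into a single Pipeline rather than fitting each stage by hand. A minimal sketch reusing the stages defined above (raw_df is a hypothetical name for a DataFrame before any indexing):

from pyspark.ml import Pipeline

# Each stage is fit in order, and its output feeds the next stage
pipeline = Pipeline(stages=[gender_indexer, department_indexer, assembler])
# prepared_df = pipeline.fit(raw_df).transform(raw_df)  # raw_df: hypothetical un-indexed DataFrame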



4. Splitting the Data:

Before training the models, we need to split our data into training and test sets. This helps in evaluating the model's performance on unseen data. We also create the binary label (high_salary) that the random forest classifier will use in step 5, so that it exists in both splits:

# Create a binary label for classification (salary above 75000 or not)
df = df.withColumn("high_salary", (df.salary > 75000).cast("integer"))

# Split the data into training and test sets
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)
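
With only six rows, a 70/30 random split can easily come out lopsided, so it is worth checking the sizes before training:

# Sanity-check the split sizes (tiny datasets can split unevenly)
print(train_data.count(), test_data.count())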


5. Building Machine Learning Models:

1. Linear Regression:

Let's train a linear regression model to predict salary based on the features we created.

from pyspark.ml.regression import LinearRegression

# Initialize Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="salary")

# Train the model
lr_model = lr.fit(train_data)

# Make predictions on test data
lr_predictions = lr_model.transform(test_data)
lr_predictions.select("features", "salary", "prediction").show()
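
You can also inspect the parameters the regression learned:

# One weight per feature in the assembler's inputCols, plus the intercept
print("Coefficients:", lr_model.coefficients)
print("Intercept:", lr_model.intercept)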


2. Random Forest Classifier:

Let's train a random forest classifier to predict a categorical target: the binary high_salary label we created in step 4 (whether an employee earns above 75,000).

from pyspark.ml.classification import RandomForestClassifier

# Train the Random Forest model
rf = RandomForestClassifier(featuresCol="features", labelCol="high_salary")
rf_model = rf.fit(train_data)

# Make predictions on test data
rf_predictions = rf_model.transform(test_data)
rf_predictions.select("features", "high_salary", "prediction").show()
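
Random forest models also expose per-feature importances (ordered as in the assembler's inputCols), which help sanity-check what the model relies on:

# Importance scores for gender_index, department_index, and age
print(rf_model.featureImportances)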


6. Evaluating the Models:

1. Evaluating Linear Regression:

# Evaluate the linear regression model
from pyspark.ml.evaluation import RegressionEvaluator

# Predictions for evaluation
lr_predictions = lr_model.transform(test_data)

# Initialize the evaluator
reg_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="salary", metricName="rmse")
rmse = reg_evaluator.evaluate(lr_predictions)

reg_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="salary", metricName="r2")
r2 = reg_evaluator.evaluate(lr_predictions)

print(f"Linear Regression RMSE: {rmse}")
print(f"Linear Regression R²: {r2}")

2. Evaluating Random Forest Classifier:

# Evaluate the random forest model
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Predictions for evaluation
rf_predictions = rf_model.transform(test_data)

# Initialize the evaluator
rf_evaluator = MulticlassClassificationEvaluator(labelCol="high_salary", predictionCol="prediction", metricName="accuracy")
accuracy = rf_evaluator.evaluate(rf_predictions)

print(f"Random Forest Classifier Accuracy: {accuracy}")

Example output (the exact values depend on the random split, and with a dataset this small the metrics are illustrative rather than meaningful):

Linear Regression RMSE: 10000.0
Linear Regression R²: 0.85
Random Forest Classifier Accuracy: 0.83
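
When you are finished, stop the Spark session to release its resources:

# Shut down the Spark session
spark.stop()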

