Created the functionality to split the columns for issue #85 #92

Merged (13 commits) on Oct 7, 2023
Changes from 8 commits
47 changes: 47 additions & 0 deletions quinn/split_columns.py
@@ -0,0 +1,47 @@
from typing import List
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, split, when, length


def split_col(df: DataFrame, col_name: str, delimiter: str,
              new_col_names: List[str], mode: str = "permissive") -> DataFrame:
    """
    Splits the given column on the delimiter and creates new columns with the split values.

    :param df: The input DataFrame
    :type df: pyspark.sql.DataFrame
    :param col_name: The name of the column to split
    :type col_name: str
    :param delimiter: The delimiter to split the column on
    :type delimiter: str
    :param new_col_names: A list of two strings for the new column names
    :type new_col_names: List[str]
    :param mode: The split mode. Can be "strict" or "permissive". Default is "permissive"
    :type mode: str
    :return: The resulting DataFrame with the split columns
    :rtype: pyspark.sql.DataFrame
    """
    if col_name not in df.columns:
        raise ValueError(f"Column '{col_name}' not found in DataFrame.")

    if not isinstance(delimiter, str):
        raise TypeError("Delimiter must be a string.")

    if not isinstance(new_col_names, list) or len(new_col_names) != 2:
        raise ValueError("New column names must be a list of two strings.")

    split_col_expr = split(col(col_name), delimiter)

    if mode == "strict":
        # Keep every row; the second new column is null when the split yields no second value.
        df = df.select("*", split_col_expr.getItem(0).alias(new_col_names[0]),
                       split_col_expr.getItem(1).alias(new_col_names[1])) \
            .drop(col_name)
    elif mode == "permissive":
        # Null out second values shorter than two characters, then drop rows where they are null.
        df = df.select("*", split_col_expr.getItem(0).alias(new_col_names[0]),
                       when(length(split_col_expr.getItem(1)) > 1, split_col_expr.getItem(1))
                       .alias(new_col_names[1])) \
            .filter(col(new_col_names[1]).isNotNull()) \
            .drop(col_name)
    else:
        raise ValueError(f"Invalid mode: {mode}")

    return df
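
A minimal usage sketch (not part of the diff) contrasting the two modes; it assumes an active SparkSession bound to `spark`, and the column and DataFrame names are illustrative only:

import quinn

data = [("johnXXdoe",), ("prince",)]
df = spark.createDataFrame(data, ["full_name"])

# Permissive (the default): rows whose second split value is missing or a
# single character are filtered out, so the "prince" row is dropped.
quinn.split_col(df, "full_name", "XX", ["first", "last"]).show()

# Strict: every row is kept and the missing second value comes through as null.
quinn.split_col(df, "full_name", "XX", ["first", "last"], mode="strict").show()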
25 changes: 25 additions & 0 deletions tests/test_split_columns.py
@@ -0,0 +1,25 @@
import quinn
from tests.conftest import auto_inject_fixtures


@auto_inject_fixtures("spark")
def test_split_columns(spark):
    # Create Spark DataFrame
    data = [("chrisXXmoe", 2025, "bio"),
            ("davidXXcross", 2026, "physics"),
            ("sophiaXXraul", 2022, "bio"),
            ("fredXXli", 2025, "physics"),
            ("someXXperson", 2023, "math"),
            ("liXXyao", 2025, "physics")]

    df = spark.createDataFrame(data, ["student_name", "graduation_year", "major"])

    # Define the delimiter
    delimiter = "XX"

    # New column names
    new_col_names = ["student_first_name", "student_last_name"]

    col_name = "student_name"

    # Call split_col() function to split "student_name" column
    new_df = quinn.split_col(df, col_name, delimiter, new_col_names)
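
The test as shown ends at the function call, without checking the result. A hedged sketch of assertions one might append inside test_split_columns (not part of the PR at this commit), based on what split_col does in its default permissive mode:

    # Illustrative checks only: every last name here has at least two characters,
    # so all six rows survive; the original column is dropped and the two new
    # columns are appended after the remaining ones.
    assert new_df.count() == 6
    assert new_df.columns == ["graduation_year", "major",
                              "student_first_name", "student_last_name"]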