import math
import multiprocessing
import os
from typing import Any, Dict, List, Set, Tuple, Union
import pyspark
# CONFIG
def config_spark_local(autoset: bool = True) -> None:
"""Configures Spark for local execution with optimized settings based on system resources.
Automatically calculates and sets optimal Spark configuration parameters based on:
- Available CPU cores
- System memory
- Executor allocation
- Memory distribution
Args:
autoset (bool, optional): Whether to automatically apply the configuration.
If False, only prints recommended settings. Defaults to True.
Examples:
>>> config_spark_local()
Here are the current computer specs ...
executor_per_node: 1
spark_executor_instances: 1
total_executor_memory: 30
spark_executor_memory: 27
memory_overhead: 3
spark_default_parallelism: 10
spark.sql.execution.arrow.pyspark.enabled recommended by Koalas ...
spark auto-configured ...
Note:
Configuration includes:
- Executor cores and memory
- Driver cores and memory
- Memory overhead
- Default parallelism
- Shuffle partitions
- Arrow optimization for PySpark
"""
def round_down_or_one(x):
if math.floor(x) == 0:
return 1
else:
return math.floor(x)
print("Here is the current computer specs ...")
vcore_per_node = multiprocessing.cpu_count()
spark_executor_cores = 5
number_of_nodes = 1
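# Total physical RAM in GiB via POSIX sysconf (os.sysconf is not available on Windows).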
total_ram_per_node_gb = (
os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES") / (1024.0**3)
)
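# One executor slot is held back for the driver, about 1 GB of RAM is left to
# the OS, each executor gets ~90% of its share as heap and ~10% as overhead,
# and default parallelism targets roughly two tasks per executor core.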
executor_per_node = round_down_or_one((vcore_per_node - 1) / spark_executor_cores)
spark_executor_instances = round_down_or_one(
(executor_per_node * number_of_nodes) - 1
)
total_executor_memory = round_down_or_one(
(total_ram_per_node_gb - 1) / executor_per_node
)
spark_executor_memory = round_down_or_one(total_executor_memory * 0.9)
memory_overhead = round_down_or_one(total_executor_memory * 0.1)
spark_default_parallelism = round_down_or_one(
spark_executor_instances * spark_executor_cores * 2
)
print(f"executor_per_node: {executor_per_node}")
print(f"spark_executor_instances: {spark_executor_instances}")
print(f"total_executor_memory: {total_executor_memory}")
print(f"spark_executor_memory: {spark_executor_memory}")
print(f"memory_overhead: {memory_overhead}")
print(f"spark_default_parallelism: {spark_default_parallelism}")
if autoset:
spark = (
pyspark.sql.SparkSession.builder.master("local")
.config("spark.executor.cores", str(spark_executor_cores))
.config("spark.driver.cores", str(spark_executor_cores))
.config("spark.executor.instances", str(spark_executor_instances))
.config("spark.executor.memory", f"{spark_executor_memory}g")
.config("spark.driver.memory", f"{spark_executor_memory}g")
.config("spark.executor.memoryOverhead", f"{memory_overhead}g")
.config("spark.default.parallelism", str(spark_default_parallelism))
.config("spark.sql.shuffle.partitions", str(spark_default_parallelism))
.getOrCreate()
)
print("spark.sql.execution.arrow.pyspark.enabled recommended by Koalas ...")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)
spark.conf.get("spark.sql.execution.arrow.pyspark.enabled")
print("spark auto-configured ...")
else:
print("Here is the recommended command to execute:")
text = f"""
spark = pyspark.sql.SparkSession.builder.master("local") \\
.config("spark.executor.cores", "{spark_executor_cores}") \\
.config("spark.driver.cores", "{spark_executor_cores}") \\
.config("spark.executor.instances", "{spark_executor_instances}") \\
.config("spark.executor.memory", "{spark_executor_memory}g") \\
.config("spark.driver.memory", "{spark_executor_memory}g") \\
.config("spark.executor.memoryOverhead", "{memory_overhead}g") \\
.config("spark.default.parallelism", "{spark_default_parallelism}") \\
.config("spark.sql.shuffle.partitions", "{spark_default_parallelism}") \\
.getOrCreate()
"""
print(text)
print("config_spark_local exited ...")
# COLUMNS
def add_dummy_columns(
dataframe: pyspark.sql.DataFrame, columns: List[str], value: str
) -> pyspark.sql.DataFrame:
"""Adds new columns with default values to a Spark DataFrame.
Args:
dataframe (pyspark.sql.DataFrame): Input Spark DataFrame
columns (List[str]): List of column names to add
value (str): Default value for the new columns
Returns:
pyspark.sql.DataFrame: DataFrame with added columns
Raises:
TypeError: If arguments are not of correct type
- dataframe must be a Spark DataFrame
- columns must be a list
- value must be a string
Examples:
>>> df = spark.createDataFrame([("Alice", 1)], ["name", "id"])
>>> new_df = add_dummy_columns(df, ["age", "city"], "unknown")
>>> new_df.show()
+-----+---+-------+-------+
| name| id|    age|   city|
+-----+---+-------+-------+
|Alice|  1|unknown|unknown|
+-----+---+-------+-------+
"""
import pyspark.sql.functions as F
if not isinstance(dataframe, pyspark.sql.dataframe.DataFrame):
raise TypeError("Argument must be a Pyspark dataframe ...")
if not isinstance(columns, list):
raise TypeError("Argument must be a list ...")
if not isinstance(value, str):
raise TypeError("Argument must be a str ...")
df = dataframe
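# Only add the requested columns that do not already exist in the dataframe.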
dummy_columns = set(columns) - set(dataframe.columns)
for column in dummy_columns:
df = df.withColumn(column, F.lit(value))
return df
def column_into_list(dataframe: pyspark.sql.DataFrame, column: str) -> List[Any]:
"""Extracts values from a DataFrame column into a Python list.
Args:
dataframe (pyspark.sql.DataFrame): Input Spark DataFrame
column (str): Name of the column to extract
Returns:
List[Any]: List containing all values from the specified column,
including duplicates
Raises:
TypeError: If dataframe is not a Spark DataFrame or column is not a string
Examples:
>>> df = spark.createDataFrame([(1,), (2,), (2,)], ["value"])
>>> column_into_list(df, "value")
[1, 2, 2]
"""
if not isinstance(dataframe, pyspark.sql.dataframe.DataFrame):
raise TypeError("Argument must be a Pyspark dataframe ...")
if not isinstance(column, str):
raise TypeError("Argument must be a str ...")
if column in dataframe.columns:
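# toPandas() collects the selected column to the driver, so this is intended
# for columns that comfortably fit in driver memory.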
list_ = dataframe.select(column).toPandas()[column].values.tolist()
return list_
def column_into_set(dataframe: pyspark.sql.DataFrame, column: str) -> Set[Any]:
"""Extracts unique values from a DataFrame column into a Python set.
Args:
dataframe (pyspark.sql.DataFrame): Input Spark DataFrame
column (str): Name of the column to extract
Returns:
Set[Any]: Set containing unique values from the specified column
Raises:
TypeError: If dataframe is not a Spark DataFrame or column is not a string
Examples:
>>> df = spark.createDataFrame([(1,), (2,), (2,)], ["value"])
>>> column_into_set(df, "value")
{1, 2}
"""
if not isinstance(dataframe, pyspark.sql.dataframe.DataFrame):
raise TypeError("Argument must be a Pyspark dataframe ...")
if not isinstance(column, str):
raise TypeError("Argument must be a str ...")
set_ = set(column_into_list(dataframe, column))
return set_
def columns_prefix(
dataframe: pyspark.sql.DataFrame, prefix: str
) -> pyspark.sql.DataFrame:
"""Adds a prefix to all column names in a DataFrame.
Args:
dataframe (pyspark.sql.DataFrame): Input Spark DataFrame
prefix (str): Prefix to add to column names
Returns:
pyspark.sql.DataFrame: DataFrame with renamed columns
Raises:
TypeError: If dataframe is not a Spark DataFrame or prefix is not a string
Examples:
>>> df = spark.createDataFrame([("Alice", 1)], ["name", "id"])
>>> new_df = columns_prefix(df, "user_")
>>> new_df.show()
+---------+-------+
|user_name|user_id|
+---------+-------+
|    Alice|      1|
+---------+-------+
"""
if not isinstance(dataframe, pyspark.sql.dataframe.DataFrame):
raise TypeError("Argument must be a Pyspark dataframe ...")
if not isinstance(prefix, str):
raise TypeError("Argument must be a str ...")
df = dataframe
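# Skip columns that already carry the prefix so repeated calls are idempotent.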
for column in dataframe.columns:
if not column.startswith(prefix):
df = df.withColumnRenamed(column, prefix + column)
return df
def columns_statistics(
dataframe: pyspark.sql.DataFrame, n: int = 10
) -> Tuple[List[str], List[str]]:
"""Analyzes column statistics and identifies empty and single-value columns.
Performs comprehensive analysis of each column including:
- Value counts
- Empty value detection
- Single value detection
- Basic statistics
Args:
dataframe (pyspark.sql.DataFrame): Input Spark DataFrame
n (int, optional): Number of top values to display for each column.
Defaults to 10.
Returns:
Tuple[List[str], List[str]]: Two lists containing:
- List of empty column names
- List of single-value column names
Raises:
TypeError: If dataframe is not a Spark DataFrame
Examples:
>>> df = spark.createDataFrame([
... ("Alice", None),
... ("Alice", None)
... ], ["name", "email"])
>>> empty_cols, single_cols = columns_statistics(df)
>>> print(f"Empty columns: {empty_cols}")
Empty columns: ['email']
>>> print(f"Single value columns: {single_cols}")
Single value columns: ['name', 'email']
"""
if not isinstance(dataframe, pyspark.sql.dataframe.DataFrame):
raise TypeError("Argument must be a Pyspark dataframe ...")
describe(dataframe)
empty_columns, single_columns = [], []
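# Profile every column: show its top-n value counts, then flag columns that
# hold a single distinct value or whose dominant value is null-like.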
for column in dataframe.columns:
df = group_count(dataframe=dataframe, columns=column, n=n)
print(column)
df.show(n=n)
if df.count() == 1:
single_columns.append(column)
print(f"!!!!! {column} is a candidate to drop !!!!!\n\n")
# Flag the column as empty when its most frequent value is null-like:
# None / empty, or the literal strings "none" / "null" in any letter case.
if (
not df.first()[0]
or str(df.first()[0]).casefold() in ("none", "null")
):
empty_columns.append(column)
print(
f"There are {len(single_columns)} single-value columns; they are: {single_columns}"
)
print(
f"There are {len(empty_columns)} null-value columns; they are: {empty_columns}"
)
return empty_columns, single_columns
# DATAFRAME
def rename(
dataframe: pyspark.sql.DataFrame, columns: Dict[str, str]
) -> pyspark.sql.DataFrame:
"""Renames multiple columns in a DataFrame using a mapping dictionary.
Args:
dataframe (pyspark.sql.DataFrame): Input Spark DataFrame
columns (Dict[str, str]): Dictionary mapping old column names to new names
Returns:
pyspark.sql.DataFrame: DataFrame with renamed columns
Raises:
TypeError: If dataframe is not a Spark DataFrame or columns is not a dict
Examples:
>>> df = spark.createDataFrame([("Alice", 1)], ["_1", "_2"])
>>> new_df = rename(df, {"_1": "name", "_2": "id"})
>>> new_df.show()
+-----+---+
| name| id|
+-----+---+
|Alice|  1|
+-----+---+
"""
import pyspark.sql.functions as F
if not isinstance(dataframe, pyspark.sql.dataframe.DataFrame):
raise TypeError("Argument must be a Pyspark dataframe ...")
if not isinstance(columns, dict):
raise TypeError("Argument must be a dict ...")
df = dataframe.select(
[F.col(c).alias(columns.get(c, c)) for c in dataframe.columns]
)
return df
# STATISTICS
def describe(dataframe: pyspark.sql.DataFrame) -> None:
"""Prints comprehensive information about a DataFrame.
Displays:
- DataFrame type
- Number of columns
- Number of rows
- Schema information
Args:
dataframe (pyspark.sql.DataFrame): Input Spark DataFrame
Raises:
TypeError: If dataframe is not a Spark DataFrame
Examples:
>>> df = spark.createDataFrame([("Alice", 1)], ["name", "id"])
>>> describe(df)
The dataframe: <class 'pyspark.sql.dataframe.DataFrame'>
Number of columns: 2
Number of rows: 1
root
|-- name: string (nullable = true)
|-- id: long (nullable = true)
"""
if not isinstance(dataframe, pyspark.sql.dataframe.DataFrame):
raise TypeError("Argument must be a Pyspark dataframe ...")
print(f"The dataframe: {type(dataframe)}")
print(f"Number of columns: {len(dataframe.columns)}")
print(f"Number of rows: {dataframe.count()}")
dataframe.printSchema()
def group_count(
dataframe: pyspark.sql.DataFrame,
columns: Union[str, List[str]],
n: Union[int, float] = 10,
) -> pyspark.sql.DataFrame:
"""Performs group by operation and calculates count and percentage for each group.
Args:
dataframe (pyspark.sql.DataFrame): Input Spark DataFrame
columns (Union[str, List[str]]): Column(s) to group by
n (Union[int, float], optional): Number of top groups to return.
Use float('inf') for all groups. Defaults to 10.
Returns:
pyspark.sql.DataFrame: DataFrame with columns:
- Group by column(s)
- count: Count of records in each group
- percent: Percentage of total records in each group
Raises:
TypeError: If arguments are not of correct type
Examples:
>>> df = spark.createDataFrame([
... (1, 'A'), (1, 'B'), (2, 'A')
... ], ["id", "category"])
>>> group_count(df, ["id"]).show()
+---+-----+-------+
| id|count|percent|
+---+-----+-------+
|  1|    2| 66.667|
|  2|    1| 33.333|
+---+-----+-------+
"""
import pyspark.sql.functions as F
if not isinstance(dataframe, pyspark.sql.dataframe.DataFrame):
raise TypeError("Argument must be a Pyspark dataframe ...")
if not isinstance(columns, (str, list)):
raise TypeError("Argument must be a str or a list ...")
df = dataframe.groupBy(columns).count().orderBy("count", ascending=False)
row_count = dataframe.count()
# Express each group's count as a percentage of all rows (3 decimal places),
# using a plain column expression instead of a Python UDF.
df = df.withColumn(
"percent", F.round(F.col("count") * 100 / row_count, 3)
)
if n != float("inf"):
df = df.limit(int(n))
return df
if __name__ == "__main__":
import doctest
doctest.testmod()