Transforming a DataSet to another schema
The basics
We often come across the following pattern:
[1]:
from pyspark.sql.types import IntegerType, StringType
from typedspark import Column, Schema, DataSet

class Person(Schema):
    name: Column[StringType]
    job_id: Column[IntegerType]

class Job(Schema):
    id: Column[IntegerType]
    function: Column[StringType]
    hourly_rate: Column[IntegerType]

class PersonWithJob(Person, Job):
    id: Column[IntegerType]
    name: Column[StringType]
    job_name: Column[StringType]
    rate: Column[IntegerType]

def get_plumbers(persons: DataSet[Person], jobs: DataSet[Job]) -> DataSet[PersonWithJob]:
    return DataSet[PersonWithJob](
        jobs.filter(Job.function == "plumber")
        .join(persons, Job.id == Person.job_id)
        .withColumn(PersonWithJob.job_name.str, Job.function)
        .withColumn(PersonWithJob.rate.str, Job.hourly_rate)
        .select(*PersonWithJob.all_column_names())
    )
We can write this much more concisely with transform_to_schema():
[2]:
from typedspark import transform_to_schema

def get_plumbers(persons: DataSet[Person], jobs: DataSet[Job]) -> DataSet[PersonWithJob]:
    return transform_to_schema(
        jobs.filter(
            Job.function == "plumber",
        ).join(
            persons,
            Job.id == Person.job_id,
        ),
        PersonWithJob,
        {
            PersonWithJob.job_name: Job.function,
            PersonWithJob.rate: Job.hourly_rate,
        },
    )
Specifically, transform_to_schema() has the following benefits:
No more need to cast every return statement using DataSet[Schema](...)
No more need to drop the columns that are not in the schema using select(*Schema.all_column_names())
Less verbose syntax compared to .withColumn(...)
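To make the benefits listed above concrete, here is a minimal plain-Python sketch of the idea, using lists of dictionaries in place of Spark dataframes. It is a conceptual model only, not typedspark's implementation: the function name transform_rows, the sample row, and the choice to evaluate every transformation against the original row are all assumptions made for illustration, and the type-casting step is left out.

```python
from typing import Any, Callable

Row = dict[str, Any]


def transform_rows(
    rows: list[Row],
    schema_columns: list[str],
    transformations: dict[str, Callable[[Row], Any]],
) -> list[Row]:
    """Hypothetical stand-in for transform_to_schema() on plain dictionaries."""
    result = []
    for row in rows:
        updated = dict(row)
        # Step 1: compute each requested column (the .withColumn(...) calls).
        for column, compute in transformations.items():
            updated[column] = compute(row)
        # Step 2: keep only the schema's columns (the .select(...) call).
        result.append({column: updated[column] for column in schema_columns})
    return result


# Illustrative row, shaped like the output of joining jobs and persons.
joined = [{"id": 1, "function": "plumber", "hourly_rate": 40, "name": "Ann", "job_id": 1}]

plumbers = transform_rows(
    joined,
    ["id", "name", "job_name", "rate"],
    {
        "job_name": lambda row: row["function"],
        "rate": lambda row: row["hourly_rate"],
    },
)
print(plumbers)
```

The caller only names the target columns and the expressions that differ from the source; dropping the leftover columns happens as part of the same call.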
Unique keys required
The transformations dictionary in transform_to_schema(..., transformations) requires each key to be a column with a unique name. The following pattern, which uses Job.hourly_rate as a key twice, raises an exception.
[3]:
from pyspark.sql import SparkSession
spark = SparkSession.Builder().config("spark.ui.showConsoleProgress", "false").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
[4]:
from typedspark import create_partially_filled_dataset

df = create_partially_filled_dataset(spark, Job, {Job.hourly_rate: [10, 20, 30]})

try:
    transform_to_schema(
        df,
        Job,
        {
            Job.hourly_rate: Job.hourly_rate + 3,
            Job.hourly_rate: Job.hourly_rate * 2,
        },
    )
except ValueError as e:
    print(e)
[CANNOT_CONVERT_COLUMN_INTO_BOOL] Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
Instead, use one entry per column and combine the operations into a single expression:
[5]:
transform_to_schema(
    df,
    Job,
    {
        Job.hourly_rate: (Job.hourly_rate + 3) * 2,
    },
).show()
+----+--------+-----------+
| id|function|hourly_rate|
+----+--------+-----------+
|NULL| NULL| 26|
|NULL| NULL| 46|
|NULL| NULL| 66|
+----+--------+-----------+
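The CANNOT_CONVERT_COLUMN_INTO_BOOL message printed by the failing example can look unrelated to duplicate keys. A plausible explanation, sketched below in plain Python, is that a dictionary compares keys with equal hashes using ==, and comparing two columns yields an expression rather than True or False. ToyColumn and ToyExpression are hypothetical stand-ins written for this sketch; that typedspark columns hash by name and compare this way is an assumption, not something the text above confirms.

```python
class ToyExpression:
    """Stands in for a lazily evaluated comparison; it has no truth value."""

    def __bool__(self) -> bool:
        raise ValueError("Cannot convert column into bool")


class ToyColumn:
    """Hypothetical column: hashed by name, with == building an expression."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __hash__(self) -> int:
        return hash(self.name)

    def __eq__(self, other: object) -> ToyExpression:  # type: ignore[override]
        return ToyExpression()


try:
    # Two distinct key objects with the same hash: to decide whether they are
    # the same key, the dict needs the truth value of `key1 == key2`.
    {ToyColumn("hourly_rate"): "+ 3", ToyColumn("hourly_rate"): "* 2"}
    error_message = None
except ValueError as exc:
    error_message = str(exc)

print(error_message)
```

In this model the exception is raised while the dictionary literal is being built, before any function receives it.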
Filling missing columns and nested fields with null
transform_to_schema() accepts two flags:
fill_unspecified_columns_with_nulls. When True, it adds null for any top-level column that is present in the target schema but absent from the source dataframe.
fill_unspecified_inner_fields_with_nulls. When True, it handles the complementary case: the top-level column exists in both, but the nested type inside it (a struct, an array of structs, or a map of structs) is missing fields required by the target schema. Those missing fields are filled with null in place, recursively.
Use both together when the source data is structurally incomplete at any level relative to the target schema.
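The difference between the two flags can be illustrated with a small plain-Python model in which a row is a nested dictionary and a schema maps each field name to either None (a leaf) or another dictionary (a struct). This is only a sketch of the semantics described above, not typedspark's code: the helper names fill_columns and fill_inner_fields and the sample data are hypothetical, and arrays and maps of structs are left out for brevity.

```python
from typing import Any, Optional

# A schema maps each field name to None (leaf) or to a nested schema (struct).
SchemaSpec = dict[str, Optional[dict]]
Row = dict[str, Any]


def fill_columns(row: Row, schema: SchemaSpec) -> Row:
    """Models fill_unspecified_columns_with_nulls: add None for missing top-level columns."""
    missing = {name: None for name in schema if name not in row}
    return {**row, **missing}


def fill_inner_fields(row: Row, schema: SchemaSpec) -> Row:
    """Models fill_unspecified_inner_fields_with_nulls: fill structs present in both, recursively."""
    result = dict(row)
    for name, inner in schema.items():
        if name in row and isinstance(inner, dict) and isinstance(row[name], dict):
            # Add the struct's own missing fields, then descend into its sub-structs.
            struct = fill_columns(row[name], inner)
            result[name] = fill_inner_fields(struct, inner)
    return result


target: SchemaSpec = {
    "id": None,
    "address": {"street": None, "geo": {"lat": None, "lon": None}},
}
source_row: Row = {"address": {"geo": {"lat": 52.1}}}

only_columns = fill_columns(source_row, target)
only_inner = fill_inner_fields(source_row, target)
both = fill_inner_fields(fill_columns(source_row, target), target)
print(both)
```

With only the first helper, id is added but address stays incomplete. With only the second, address is completed but id is still missing. Applying both yields a row that matches the target at every level.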