Dynamically generate schemas from an existing DataFrame
Besides loading a DataSet from an existing table, we also provide create_schema(), which generates a Schema from a DataFrame that you have in memory. This allows you to get autocomplete on DataSets that you create on-the-fly. We’ll first create some data, and then we’ll dive into an example of create_schema() using a pivot table.
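To make the idea concrete without Spark, here is a minimal plain-Python sketch of what generating a schema class at runtime amounts to: a class is built from a mapping of column names to type names with the built-in type(). The ColumnRef class and make_schema function are hypothetical illustrations, not typedspark's implementation. Notebook autocomplete generally inspects live objects, which is why attributes of a class created this way can still be suggested.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class ColumnRef:
    """Hypothetical stand-in for a typed column: just a name and a type label."""

    name: str
    dtype: str


def make_schema(class_name: str, columns: dict[str, str]) -> type:
    """Build a class at runtime with one ColumnRef attribute per column."""
    attributes = {name: ColumnRef(name, dtype) for name, dtype in columns.items()}
    return type(class_name, (), attributes)


# Column names and types as they appear in the pivot example further down.
Pivot = make_schema(
    "DynamicallyLoadedSchema",
    {"pet_id": "LongType", "influenza": "DateType", "lyme": "DateType", "rabies": "DateType"},
)

print(Pivot.__name__)   # DynamicallyLoadedSchema
print(Pivot.influenza)  # ColumnRef(name='influenza', dtype='DateType')
```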
Creating data
[1]:
from pyspark.sql import SparkSession
spark = SparkSession.Builder().config("spark.ui.showConsoleProgress", "false").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
[2]:
from datetime import timedelta, datetime
from pyspark.sql.functions import first
from pyspark.sql.types import LongType, StringType, DateType
from typedspark import Column, Schema, create_partially_filled_dataset, create_schema
date = datetime(2023, 10, 2)
class Vaccinations(Schema):
    vaccination_id: Column[LongType]
    pet_id: Column[LongType]
    vaccine_name: Column[StringType]
    vaccine_date: Column[DateType]
    next_due_date: Column[DateType]
vaccinations = create_partially_filled_dataset(
    spark,
    Vaccinations,
    {
        Vaccinations.vaccination_id: [1, 2, 3, 4, 5, 6, 7],
        Vaccinations.pet_id: [1, 2, 3, 1, 3, 2, 3],
        Vaccinations.vaccine_name: [
            "rabies",
            "rabies",
            "rabies",
            "lyme",
            "lyme",
            "influenza",
            "influenza",
        ],
        Vaccinations.next_due_date: [
            date + timedelta(days=32),
            date + timedelta(days=6),
            date + timedelta(days=12),
            date + timedelta(days=15),
            date + timedelta(days=2),
            date + timedelta(days=1),
            date + timedelta(days=3),
        ],
        Vaccinations.vaccine_date: [
            date + timedelta(days=32) - timedelta(days=365),
            date + timedelta(days=6) - timedelta(days=365),
            date + timedelta(days=12) - timedelta(days=365),
            date + timedelta(days=15) - timedelta(days=365),
            date + timedelta(days=2) - timedelta(days=365),
            date + timedelta(days=1) - timedelta(days=365),
            date + timedelta(days=3) - timedelta(days=365),
        ],
    },
)
vaccinations.show()
+--------------+------+------------+------------+-------------+
|vaccination_id|pet_id|vaccine_name|vaccine_date|next_due_date|
+--------------+------+------------+------------+-------------+
| 1| 1| rabies| 2022-11-03| 2023-11-03|
| 2| 2| rabies| 2022-10-08| 2023-10-08|
| 3| 3| rabies| 2022-10-14| 2023-10-14|
| 4| 1| lyme| 2022-10-17| 2023-10-17|
| 5| 3| lyme| 2022-10-04| 2023-10-04|
| 6| 2| influenza| 2022-10-03| 2023-10-03|
| 7| 3| influenza| 2022-10-05| 2023-10-05|
+--------------+------+------------+------------+-------------+
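The dates in this table follow one rule: each next_due_date is an offset in days from 2 October 2023, and each vaccine_date lies exactly 365 days earlier. The plain-Python sketch below recomputes both columns with the datetime module alone, which can serve as a quick sanity check when building test data by hand; the variable names are illustrative only.

```python
from datetime import date, timedelta

# Reference date and day offsets used for next_due_date in the cell above.
reference = date(2023, 10, 2)
offsets = [32, 6, 12, 15, 2, 1, 3]

next_due = [reference + timedelta(days=d) for d in offsets]
# vaccine_date is exactly 365 days before next_due_date.
vaccine = [due - timedelta(days=365) for due in next_due]

for given, due in zip(vaccine, next_due):
    print(given.isoformat(), due.isoformat())
```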
Example using a pivot table
Let’s pivot this table and run create_schema()!
[3]:
pivot = (
    vaccinations.groupby(Vaccinations.pet_id)
    .pivot(Vaccinations.vaccine_name.str)
    .agg(first(Vaccinations.next_due_date))
)
pivot, Pivot = create_schema(pivot)
pivot.show()
+------+----------+----------+----------+
|pet_id| influenza| lyme| rabies|
+------+----------+----------+----------+
| 1| NULL|2023-10-17|2023-11-03|
| 2|2023-10-03| NULL|2023-10-08|
| 3|2023-10-05|2023-10-04|2023-10-14|
+------+----------+----------+----------+
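What the pivot computes can be spelled out in plain Python: group the rows by pet_id, turn every distinct vaccine_name into a column (in the output above they appear in alphabetical order), and fill each cell with the first next_due_date seen for that pair, or None (shown by Spark as NULL) when the pair has no row. The pivot_first function below is a hypothetical illustration of those semantics on a list of tuples, not how Spark executes the query. Note that Spark's first() is in general non-deterministic after a shuffle; that doesn't matter here because every (pet, vaccine) pair occurs only once.

```python
# (pet_id, vaccine_name, next_due_date) rows from the vaccinations table.
rows = [
    (1, "rabies", "2023-11-03"),
    (2, "rabies", "2023-10-08"),
    (3, "rabies", "2023-10-14"),
    (1, "lyme", "2023-10-17"),
    (3, "lyme", "2023-10-04"),
    (2, "influenza", "2023-10-03"),
    (3, "influenza", "2023-10-05"),
]


def pivot_first(rows):
    """Group by the first field, pivot on the second, keep the first third value."""
    columns = sorted({name for _, name, _ in rows})
    groups = {}
    for pet_id, name, due in rows:
        cells = groups.setdefault(pet_id, {})
        cells.setdefault(name, due)  # like first(): keep the first value seen for this pair
    # Pairs without any row become None, the plain-Python counterpart of NULL.
    table = {pet_id: [cells.get(c) for c in columns] for pet_id, cells in groups.items()}
    return columns, table


columns, table = pivot_first(rows)
print(["pet_id", *columns])
for pet_id in sorted(table):
    print([pet_id, *table[pet_id]])
```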
We can use the resulting schema as usual:
[4]:
Pivot
[4]:
from pyspark.sql.types import DateType, LongType
from typedspark import Column, Schema
class DynamicallyLoadedSchema(Schema):
    pet_id: Column[LongType]
    influenza: Column[DateType]
    lyme: Column[DateType]
    rabies: Column[DateType]
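The printed Pivot is ordinary Python source: the imports it needs, followed by one Column[...] annotation per column. A minimal sketch of producing text of that shape from (name, type) pairs is shown below; render_schema is a hypothetical function that only mimics the output above and is not typedspark's actual code. Because the printout is valid Python, one option is to copy it into a module if you later want a static schema.

```python
from __future__ import annotations


def render_schema(class_name: str, columns: list[tuple[str, str]]) -> str:
    """Render a typedspark-style schema class definition as source text."""
    # Import each Spark type once, in alphabetical order.
    type_names = sorted({dtype for _, dtype in columns})
    lines = [
        f"from pyspark.sql.types import {', '.join(type_names)}",
        "from typedspark import Column, Schema",
        "",
        f"class {class_name}(Schema):",
    ]
    lines += [f"    {name}: Column[{dtype}]" for name, dtype in columns]
    return "\n".join(lines)


source = render_schema(
    "DynamicallyLoadedSchema",
    [("pet_id", "LongType"), ("influenza", "DateType"), ("lyme", "DateType"), ("rabies", "DateType")],
)
print(source)
```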
[5]:
pivot.filter(Pivot.influenza.isNotNull()).show()
+------+----------+----------+----------+
|pet_id| influenza| lyme| rabies|
+------+----------+----------+----------+
| 2|2023-10-03| NULL|2023-10-08|
| 3|2023-10-05|2023-10-04|2023-10-14|
+------+----------+----------+----------+
Monkeypatch
We also support doing the above directly in a method chain, using the to_typedspark() method that typedspark monkeypatches onto the DataFrame class.
[6]:
pivot, Pivot = (
    vaccinations.groupby(Vaccinations.pet_id)
    .pivot(Vaccinations.vaccine_name.str)
    .agg(first(Vaccinations.next_due_date))
    .to_typedspark()
)
pivot.show()
+------+----------+----------+----------+
|pet_id| influenza| lyme| rabies|
+------+----------+----------+----------+
| 1| NULL|2023-10-17|2023-11-03|
| 2|2023-10-03| NULL|2023-10-08|
| 3|2023-10-05|2023-10-04|2023-10-14|
+------+----------+----------+----------+
[7]:
Pivot
[7]:
from pyspark.sql.types import DateType, LongType
from typedspark import Column, Schema
class DynamicallyLoadedSchema(Schema):
    pet_id: Column[LongType]
    influenza: Column[DateType]
    lyme: Column[DateType]
    rabies: Column[DateType]
Using the monkeypatched form comes with pros and cons. The pros:
- No need for intermediate variables
- No need to import create_schema()
Both contribute to a more straightforward workflow.
And the cons:
- The to_typedspark() method is attached to the DataFrame class at runtime, once we import anything from typedspark. This often means that to_typedspark() will not show up during autocomplete, so you’ll have to type it yourself. For the same reason, type checkers may report an error.
- Finally, it only works if you’ve already imported something from typedspark. This shouldn’t be a major problem (you’ll likely have imported Catalogs, for example), but it’s something to be aware of.
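The trade-offs above follow from how monkeypatching works: a function is assigned onto an existing class at runtime, typically as a side effect of importing a module, so the method exists for the interpreter but not in the class definition that static tools read. The toy sketch below reproduces this with a stand-in Frame class and a hypothetical to_summary() method; it illustrates the general mechanism and is not typedspark's own patching code.

```python
class Frame:
    """Stand-in for a third-party class we cannot edit (think DataFrame)."""

    def __init__(self, columns):
        self.columns = list(columns)


def _to_summary(self):
    """Method body to attach later; returns the frame plus a description, as a pair."""
    return self, f"{len(self.columns)} columns: {', '.join(self.columns)}"


print(hasattr(Frame, "to_summary"))  # False: not part of the class definition

# The patch itself. In a library, a line like this would run when the module is imported.
Frame.to_summary = _to_summary

# A static type checker only sees the class body above, so it may flag this call,
# and editors relying on static analysis won't offer to_summary() for completion.
frame, summary = Frame(["pet_id", "influenza"]).to_summary()
print(summary)  # 2 columns: pet_id, influenza
```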