Dynamically generate schemas from an existing DataFrame

Besides loading a DataSet from an existing table, we also provide create_schema(), which generates a Schema from a DataFrame that you already have in memory. This gives you autocomplete on DataSets that you create on the fly. We’ll first create some data, and then walk through an example of create_schema() using a pivot table.

Creating data

[1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.ui.showConsoleProgress", "false").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
[2]:
from datetime import timedelta, datetime
from pyspark.sql.functions import first
from pyspark.sql.types import LongType, StringType, DateType
from typedspark import Column, Schema, create_partially_filled_dataset, create_schema

date = datetime(2023, 10, 2)


class Vaccinations(Schema):
    vaccination_id: Column[LongType]
    pet_id: Column[LongType]
    vaccine_name: Column[StringType]
    vaccine_date: Column[DateType]
    next_due_date: Column[DateType]


vaccinations = create_partially_filled_dataset(
    spark,
    Vaccinations,
    {
        Vaccinations.vaccination_id: [1, 2, 3, 4, 5, 6, 7],
        Vaccinations.pet_id: [1, 2, 3, 1, 3, 2, 3],
        Vaccinations.vaccine_name: [
            "rabies",
            "rabies",
            "rabies",
            "lyme",
            "lyme",
            "influenza",
            "influenza",
        ],
        Vaccinations.next_due_date: [
            date + timedelta(days=32),
            date + timedelta(days=6),
            date + timedelta(days=12),
            date + timedelta(days=15),
            date + timedelta(days=2),
            date + timedelta(days=1),
            date + timedelta(days=3),
        ],
        Vaccinations.vaccine_date: [
            date + timedelta(days=32) - timedelta(days=365),
            date + timedelta(days=6) - timedelta(days=365),
            date + timedelta(days=12) - timedelta(days=365),
            date + timedelta(days=15) - timedelta(days=365),
            date + timedelta(days=2) - timedelta(days=365),
            date + timedelta(days=1) - timedelta(days=365),
            date + timedelta(days=3) - timedelta(days=365),
        ],
    },
)
vaccinations.show()
+--------------+------+------------+------------+-------------+
|vaccination_id|pet_id|vaccine_name|vaccine_date|next_due_date|
+--------------+------+------------+------------+-------------+
|             1|     1|      rabies|  2022-11-03|   2023-11-03|
|             2|     2|      rabies|  2022-10-08|   2023-10-08|
|             3|     3|      rabies|  2022-10-14|   2023-10-14|
|             4|     1|        lyme|  2022-10-17|   2023-10-17|
|             5|     3|        lyme|  2022-10-04|   2023-10-04|
|             6|     2|   influenza|  2022-10-03|   2023-10-03|
|             7|     3|   influenza|  2022-10-05|   2023-10-05|
+--------------+------+------------+------------+-------------+

Example using a pivot table

Let’s pivot this table and run create_schema(), which returns both the typed DataSet and the generated Schema class.

[3]:
pivot = (
    vaccinations.groupby(Vaccinations.pet_id)
    .pivot(Vaccinations.vaccine_name.str)
    .agg(first(Vaccinations.next_due_date))
)

pivot, Pivot = create_schema(pivot)
pivot.show()
+------+----------+----------+----------+
|pet_id| influenza|      lyme|    rabies|
+------+----------+----------+----------+
|     1|      NULL|2023-10-17|2023-11-03|
|     2|2023-10-03|      NULL|2023-10-08|
|     3|2023-10-05|2023-10-04|2023-10-14|
+------+----------+----------+----------+

We can use the resulting schema as usual:

[4]:
Pivot
[4]:

from pyspark.sql.types import DateType, LongType

from typedspark import Column, Schema


class DynamicallyLoadedSchema(Schema):
    pet_id: Column[LongType]
    influenza: Column[DateType]
    lyme: Column[DateType]
    rabies: Column[DateType]
[5]:
pivot.filter(Pivot.influenza.isNotNull()).show()
+------+----------+----------+----------+
|pet_id| influenza|      lyme|    rabies|
+------+----------+----------+----------+
|     2|2023-10-03|      NULL|2023-10-08|
|     3|2023-10-05|2023-10-04|2023-10-14|
+------+----------+----------+----------+
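
To make the idea of a dynamically generated schema more concrete, here is a minimal pure-Python sketch of building a class from column names at runtime. It is an illustration only, not typedspark’s implementation: Schema, LongType, DateType and build_schema() below are hypothetical stand-ins defined in the snippet itself, so it runs without Spark.

```python
from typing import Dict


class Schema:
    """Hypothetical stand-in for a schema base class (not typedspark's Schema)."""


class LongType:
    """Hypothetical stand-in for a Spark long type."""


class DateType:
    """Hypothetical stand-in for a Spark date type."""


def build_schema(name: str, columns: Dict[str, type]) -> type:
    """Create a Schema subclass whose annotations mirror the given columns."""
    cls = type(name, (Schema,), {})  # a new class, created at runtime
    cls.__annotations__ = dict(columns)  # one annotation per column
    return cls


# Column names and types as they appear in the pivot table above.
Pivot = build_schema(
    "DynamicallyLoadedSchema",
    {"pet_id": LongType, "influenza": DateType, "lyme": DateType, "rabies": DateType},
)

print(Pivot.__name__)
print(list(Pivot.__annotations__))
```

Because the class only exists once the code has run, tools can offer its column names in an interactive session such as a notebook, but not through static analysis of the source file.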

Monkeypatch

We also support doing the above directly in a method chain, using a to_typedspark() method that is monkeypatched onto the DataFrame class.

[6]:
pivot, Pivot = (
    vaccinations.groupby(Vaccinations.pet_id)
    .pivot(Vaccinations.vaccine_name.str)
    .agg(first(Vaccinations.next_due_date))
    .to_typedspark()
)
pivot.show()
+------+----------+----------+----------+
|pet_id| influenza|      lyme|    rabies|
+------+----------+----------+----------+
|     1|      NULL|2023-10-17|2023-11-03|
|     2|2023-10-03|      NULL|2023-10-08|
|     3|2023-10-05|2023-10-04|2023-10-14|
+------+----------+----------+----------+

[7]:
Pivot
[7]:

from pyspark.sql.types import DateType, LongType

from typedspark import Column, Schema


class DynamicallyLoadedSchema(Schema):
    pet_id: Column[LongType]
    influenza: Column[DateType]
    lyme: Column[DateType]
    rabies: Column[DateType]

Using the monkeypatched form comes with pros and cons. The pros:

  • No need for intermediate variables

  • No need to import create_schema()

  • Together, these make for a more straightforward workflow

And cons:

  • The to_typedspark() function is attached to the DataFrame class once we import anything from typedspark.

  • This often means that to_typedspark() will not show up during autocomplete, so you’ll have to type it yourself.

  • For the same reason, type checkers and linters may report an error.

  • And finally, it only works if you’ve already imported something from typedspark. This shouldn’t be a major problem (you’ll likely have imported Catalogs, for example), but it’s something to be aware of.
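
The cons above follow from how monkeypatching works in general. The sketch below shows the mechanism in plain Python, under stated assumptions: the DataFrame class and the to_typedspark() function here are hypothetical stand-ins defined in the snippet, not the real pyspark or typedspark objects.

```python
class DataFrame:
    """Hypothetical stand-in for pyspark.sql.DataFrame."""

    def __init__(self, columns):
        self.columns = list(columns)


def to_typedspark(self):
    """Stand-in: return the DataFrame together with a placeholder schema name."""
    return self, "DynamicallyLoadedSchema"


df = DataFrame(["pet_id", "influenza"])

# Before the patch, the method does not exist on the class or its instances.
before = hasattr(df, "to_typedspark")

# The patch itself. In this sketch it plays the role of the side effect
# that importing the patching library would have.
DataFrame.to_typedspark = to_typedspark

# After the patch, existing instances pick up the method as well.
after = hasattr(df, "to_typedspark")
result, schema_name = df.to_typedspark()

print(before, after, schema_name)  # False True DynamicallyLoadedSchema
```

Since the method is attached at runtime rather than declared in the class body, static tools that only read the class definition have no way of knowing it exists, which is why autocomplete and type checkers may miss it.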