Loading DataSets in notebooks

The basics

Using typedspark, you can dynamically load a DataSet and its corresponding Schema from an existing table. This provides the following benefits:

  • Autocomplete on table names

  • Autocomplete on column names

  • Generating a Schema from the table

    • Which you can copy-paste into your IDE when you want to add a new DataSet to your pipelines

    • Works even when the underlying table has not been serialized using typedspark!

To illustrate these points, let us first make a temporary table that we can load later.

[1]:
from pyspark.sql import SparkSession

spark = SparkSession.Builder().config("spark.ui.showConsoleProgress", "false").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
[2]:
import pandas as pd

df = spark.createDataFrame(
    pd.DataFrame(
        dict(
            name=["Jack", "John", "Jane"],
            age=[20, 30, 40],
        )
    )
)
df.createOrReplaceTempView("person_table")

Catalogs

The easiest way to load a DataSet is using Catalogs.

[3]:
from typedspark import Catalogs

db = Catalogs()

After running the above cell, we can use db to load our table. Notice that you’ll get autocomplete here!

[4]:
persons, Person = db.spark_catalog.default.person_table()

We can use the DataSet and Schema just as we would in the IDE.

[5]:
persons.show()
+----+---+
|name|age|
+----+---+
|Jack| 20|
|John| 30|
|Jane| 40|
+----+---+

[6]:
Person
[6]:

from pyspark.sql.types import LongType, StringType

from typedspark import Column, Schema


class PersonTable(Schema):
    name: Column[StringType]
    age: Column[LongType]
[7]:
persons.filter(Person.age > 25).show()
+----+---+
|name|age|
+----+---+
|John| 30|
|Jane| 40|
+----+---+

And of course, while we’re typing the above statement, we’ll get autocomplete on the columns of persons!
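Autocomplete on a chain like db.spark_catalog.default.person_table is possible because catalogs, databases and tables are exposed as ordinary attributes, which tab completion can list. The sketch below illustrates that idea in plain Python with types.SimpleNamespace. It is a simplification under stated assumptions, not typedspark's implementation: the METADATA dictionary (including the orders table) is hypothetical, build_namespace is a hypothetical helper, and each table is represented by a string rather than something that loads a DataSet.

```python
from types import SimpleNamespace
from typing import Dict, List

# Hypothetical metadata (catalog -> database -> table names). typedspark gets
# this from Spark; here it is hard-coded so the sketch runs on its own.
METADATA: Dict[str, Dict[str, List[str]]] = {
    "spark_catalog": {"default": ["person_table", "orders"]},
}


def build_namespace(metadata: Dict[str, Dict[str, List[str]]]) -> SimpleNamespace:
    """Expose catalogs, databases and tables as nested attributes."""
    root = SimpleNamespace()
    for catalog_name, databases in metadata.items():
        catalog = SimpleNamespace()
        for db_name, tables in databases.items():
            # Each table is a plain string here; in typedspark the attribute is
            # something you call to load the DataSet and its Schema.
            database = SimpleNamespace(
                **{table: f"{catalog_name}.{db_name}.{table}" for table in tables}
            )
            setattr(catalog, db_name, database)
        setattr(root, catalog_name, catalog)
    return root


ns = build_namespace(METADATA)
print(ns.spark_catalog.default.person_table)  # spark_catalog.default.person_table
# Public attribute names, roughly what tab completion would offer:
print([name for name in dir(ns.spark_catalog.default) if not name.startswith("_")])
# ['orders', 'person_table']
```

Because every level is a real attribute, the notebook can suggest names at each dot without any custom completion logic.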

Dynamically loaded schemas are also useful when we're migrating existing tables to typedspark: we can generate the corresponding schemas and copy them into the repository. In that case, we may also want to generate documentation.

[8]:
Person.print_schema(include_documentation=True)
from typing import Annotated

from pyspark.sql.types import LongType, StringType

from typedspark import Column, ColumnMeta, Schema


class PersonTable(Schema):
    """Add documentation here."""

    name: Annotated[Column[StringType], ColumnMeta(comment="")]
    age: Annotated[Column[LongType], ColumnMeta(comment="")]
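To make the mapping from table metadata to printed code concrete, here is a hypothetical render_schema helper that produces the same text as the output above from (column name, Spark type name) pairs. It is only a sketch of the idea: the real print_schema() works from the loaded Schema, whereas here the column types are passed in as strings so the example runs without Spark.

```python
from typing import List, Tuple


def render_schema(
    class_name: str,
    columns: List[Tuple[str, str]],
    include_documentation: bool = False,
) -> str:
    """Render Schema source code from (column name, Spark type name) pairs.

    Hypothetical helper for illustration; not typedspark's print_schema().
    """
    # Import only the Spark types that are used, in alphabetical order.
    type_names = sorted({spark_type for _, spark_type in columns})
    lines: List[str] = []
    if include_documentation:
        lines += ["from typing import Annotated", ""]
    lines += [f"from pyspark.sql.types import {', '.join(type_names)}", ""]
    meta = "ColumnMeta, " if include_documentation else ""
    lines += [f"from typedspark import Column, {meta}Schema", "", ""]
    lines.append(f"class {class_name}(Schema):")
    if include_documentation:
        # Placeholder docstring, to be filled in by hand.
        lines += ['    """Add documentation here."""', ""]
    for name, spark_type in columns:
        annotation = f"Column[{spark_type}]"
        if include_documentation:
            # Wrap the column type so a comment can be attached to it.
            annotation = f'Annotated[{annotation}, ColumnMeta(comment="")]'
        lines.append(f"    {name}: {annotation}")
    return "\n".join(lines)


print(
    render_schema(
        "PersonTable",
        [("name", "StringType"), ("age", "LongType")],
        include_documentation=True,
    )
)
```

Without include_documentation, the same helper yields the plain schema shown for cell [6].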

Of note, Catalogs() automatically runs register_schema_to_dataset() on the resulting DataSet and Schema, thereby resolving potential column ambiguities (such as those that arise when joining DataSets that share column names).

Databases

Catalogs is often the only class you need. But if loading all catalogs takes too long, or if you only want to use one catalog anyway, you can use Databases instead. Use Databases(spark, catalog_name=...) to specify which catalog to load, or omit this parameter to load the default catalog (often spark_catalog or hive_metastore).

[9]:
from typedspark import Databases

db = Databases()
[10]:
persons, Person = db.default.person_table()

Database

If we just want to load the tables from a single database, we can use Database. Once again, we can either specify the database (through Database(spark, db_name=...)) or leave it blank to load the default database.

[11]:
from typedspark import Database

db = Database()
[12]:
person, Person = db.person_table.load()

Names starting with an underscore

For Catalogs(), Databases() and Database(), names starting with an underscore are problematic for autocomplete. For example, suppose we create this table:

[13]:
df.write.saveAsTable("default._person")

Then we won't get autocomplete for db._person, because the notebook assumes we don't want autocomplete on private attributes. To work around this, the attribute is renamed as follows:

[14]:
db = Database()
[15]:
persons, Person = db.u_person()

The underlying table is not renamed; only the class attribute used for autocomplete is.

When renaming the attribute leads to a naming conflict (e.g. because u_person already exists), we resolve the conflict by adding more underscores (e.g. _person would then become u__person).
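The renaming rule can be sketched in a few lines. The autocomplete_names function below is hypothetical, not typedspark's code; it assumes that only names starting with an underscore are renamed and that conflicts are checked against all existing table names. The keys are the attribute names offered for autocomplete, and the values are the unchanged table names.

```python
from typing import Dict, Iterable


def autocomplete_names(table_names: Iterable[str]) -> Dict[str, str]:
    """Map autocomplete-friendly attribute names to the original table names.

    Hypothetical sketch of the renaming rule: a leading underscore gets a "u"
    prefix, and naming conflicts are resolved by adding extra underscores.
    """
    names = list(table_names)
    # Reserve every existing table name so a renamed attribute never shadows one.
    taken = set(names)
    mapping: Dict[str, str] = {}
    for table in names:
        if not table.startswith("_"):
            mapping[table] = table
            continue
        candidate = "u" + table  # "_person" -> "u_person"
        while candidate in taken:
            # "u_person" -> "u__person" -> "u___person" -> ...
            candidate = "u_" + candidate[1:]
        taken.add(candidate)
        mapping[candidate] = table
    return mapping


mapping = autocomplete_names(["person_table", "_person", "u_person"])
print(mapping)
# {'person_table': 'person_table', 'u__person': '_person', 'u_person': 'u_person'}
# A completer that hides names starting with "_" still offers every key:
print([name for name in mapping if not name.startswith("_")])
```

Reserving all existing names up front means the result does not depend on the order in which tables are listed.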

Loading a single DataSet

If you really only want to load one DataSet, you can use load_table().

[16]:
from typedspark import load_table

person, Person = load_table(spark, "person_table")
Person
[16]:

from pyspark.sql.types import LongType, StringType

from typedspark import Column, Schema


class DynamicallyLoadedSchema(Schema):
    name: Column[StringType]
    age: Column[LongType]

If you’d like to have the schema name properly displayed, you can use the optional schema_name argument.

[17]:
person, Person = load_table(spark, "person_table", schema_name="Person")
Person
[17]:

from pyspark.sql.types import LongType, StringType

from typedspark import Column, Schema


class Person(Schema):
    name: Column[StringType]
    age: Column[LongType]
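The default name DynamicallyLoadedSchema hints that the schema class is created at runtime. As a rough illustration only (not typedspark's actual code), Python's built-in type(name, bases, namespace) creates a class whose __name__ is whatever name it was given, which is the role schema_name appears to play. The Schema class and make_schema function below are hypothetical stand-ins, and column types are plain strings so the sketch runs without Spark.

```python
from typing import Dict, List, Tuple


class Schema:
    """Hypothetical stand-in for typedspark's Schema base class."""


def make_schema(
    columns: List[Tuple[str, str]],
    schema_name: str = "DynamicallyLoadedSchema",
) -> type:
    """Create a Schema subclass at runtime (illustrative sketch only)."""
    # Column annotations keyed by column name; types are plain strings here.
    annotations: Dict[str, str] = dict(columns)
    # type(name, bases, namespace): the first argument becomes __name__.
    return type(schema_name, (Schema,), {"__annotations__": annotations})


Default = make_schema([("name", "StringType"), ("age", "LongType")])
Named = make_schema([("name", "StringType"), ("age", "LongType")], schema_name="Person")
print(Default.__name__)  # DynamicallyLoadedSchema
print(Named.__name__)  # Person
```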