Loading DataSets in notebooks
The basics
Using typedspark, you can dynamically load a DataSet and its corresponding Schema from an existing table. This provides the following benefits:
Autocomplete on table names
Autocomplete on column names
Generating a Schema from the table, which you can copy-paste into your IDE when you want to add a new DataSet to your pipelines
Works even when the underlying table has not been serialized using typedspark!
To illustrate these points, let us first make a temporary table that we can load later.
[1]:
from pyspark.sql import SparkSession
spark = SparkSession.Builder().config("spark.ui.showConsoleProgress", "false").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
[2]:
import pandas as pd
df = spark.createDataFrame(
    pd.DataFrame(
        dict(
            name=["Jack", "John", "Jane"],
            age=[20, 30, 40],
        )
    )
)
df.createOrReplaceTempView("person_table")
Catalogs
The easiest way to load a DataSet is using Catalogs.
[3]:
from typedspark import Catalogs
db = Catalogs()
After running the above cell, we can use db to load our table. Notice that you’ll get autocomplete here!
[4]:
persons, Person = db.spark_catalog.default.person_table()
We can use the DataSet and Schema just as we would do in the IDE.
[5]:
persons.show()
+----+---+
|name|age|
+----+---+
|Jack| 20|
|John| 30|
|Jane| 40|
+----+---+
[6]:
Person
[6]:
from pyspark.sql.types import LongType, StringType
from typedspark import Column, Schema
class PersonTable(Schema):
    name: Column[StringType]
    age: Column[LongType]
[7]:
persons.filter(Person.age > 25).show()
+----+---+
|name|age|
+----+---+
|John| 30|
|Jane| 40|
+----+---+
And of course, while we’re typing the above statement, we’ll get autocomplete on the columns of persons!
Another application of the above is migrating existing tables to typedspark, where we want to generate the corresponding schemas so that we can copy them into the repository. In that case, we may also want to generate documentation.
[8]:
Person.print_schema(include_documentation=True)
from typing import Annotated
from pyspark.sql.types import LongType, StringType
from typedspark import Column, ColumnMeta, Schema
class PersonTable(Schema):
    """Add documentation here."""
    name: Annotated[Column[StringType], ColumnMeta(comment="")]
    age: Annotated[Column[LongType], ColumnMeta(comment="")]
Of note, Catalogs() automatically runs register_schema_to_dataset() on the resulting DataSet and Schema, which resolves potential column ambiguities.
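To make the generated definitions shown above more concrete, the sketch below renders the same kind of text from a table name and a mapping of column names to type names. It is a toy illustration in plain Python, not typedspark's implementation: to_class_name and render_schema are hypothetical helpers, and the snake_case to CamelCase rule is only inferred from person_table being displayed as PersonTable.

```python
from typing import Mapping


def to_class_name(table_name: str) -> str:
    """Hypothetical helper: turn a snake_case table name into CamelCase."""
    return "".join(part.capitalize() for part in table_name.split("_") if part)


def render_schema(table_name: str, columns: Mapping[str, str]) -> str:
    """Hypothetical helper: render a Schema definition as copy-pastable text.

    `columns` maps column names to Spark type names, e.g. {"age": "LongType"}.
    """
    # Import every distinct type once, in alphabetical order.
    type_names = ", ".join(sorted(set(columns.values())))
    lines = [
        f"from pyspark.sql.types import {type_names}",
        "from typedspark import Column, Schema",
        "",
        "",
        f"class {to_class_name(table_name)}(Schema):",
    ]
    # One annotated attribute per column, in the table's column order.
    lines += [f"    {name}: Column[{dtype}]" for name, dtype in columns.items()]
    return "\n".join(lines)


print(render_schema("person_table", {"name": "StringType", "age": "LongType"}))
```

The sketch only builds a string, so it runs without a Spark session. In a notebook you would rely on Person or Person.print_schema() instead.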
Databases
Catalogs is often the only class you need. But if loading all catalogs takes too long, or if you only want to use one catalog anyway, you can use Databases instead. Use Databases(spark, catalog_name=...) to specify which catalog to load, or omit this parameter to load the default catalog (often spark_catalog or hive_metastore).
[9]:
from typedspark import Databases
db = Databases()
[10]:
persons, Person = db.default.person_table()
Database
If we just want to load the tables from a single database, we can use Database. Once again, we can either specify the database (through Database(spark, db_name=...)) or leave it blank to load the default database.
[11]:
from typedspark import Database
db = Database()
[12]:
person, Person = db.person_table.load()
Names starting with an underscore
For Catalogs(), Databases() and Database(), names starting with an underscore are problematic for auto-complete. For example, suppose we create this table:
[13]:
df.write.saveAsTable("default._person")
Then we won’t get auto-complete using db._person, since our notebook will assume we don’t want auto-complete on private variables. To circumvent this, we rename the attribute as such:
[14]:
db = Database()
[15]:
persons, Person = db.u_person()
The underlying table is not renamed, solely the class attribute used for autocomplete.
When renaming the attribute leads to a naming conflict (e.g. because u_person already exists), we resolve the conflict by adding more underscores (e.g. _person would then become u__person).
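The renaming rule described above can be sketched in a few lines of plain Python. This is a minimal illustration based only on the behaviour described here, not typedspark's actual code. The function name autocomplete_name and its arguments are hypothetical.

```python
from typing import Iterable


def autocomplete_name(name: str, existing: Iterable[str]) -> str:
    """Hypothetical helper: pick the attribute name exposed for a table.

    Names without a leading underscore are kept as-is. Names with one get a
    "u" prefix, and extra underscores are added until the result is unique.
    """
    if not name.startswith("_"):
        return name

    taken = set(existing)
    prefix = "u"
    while prefix + name in taken:
        prefix += "_"  # conflict: u_person is taken, so try u__person, etc.
    return prefix + name


print(autocomplete_name("_person", ["person_table"]))              # u_person
print(autocomplete_name("_person", ["person_table", "u_person"]))  # u__person
```

Only the attribute name changes in this sketch. The string passed in still identifies the underlying table, which matches the note that the table itself is not renamed.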
Loading a single DataSet
If you really only want to load one DataSet, you can use load_table().
[16]:
from typedspark import load_table
person, Person = load_table(spark, "person_table")
Person
[16]:
from pyspark.sql.types import LongType, StringType
from typedspark import Column, Schema
class DynamicallyLoadedSchema(Schema):
    name: Column[StringType]
    age: Column[LongType]
If you’d like to have the schema name properly displayed, you can use the optional schema_name argument.
[17]:
person, Person = load_table(spark, "person_table", schema_name="Person")
Person
[17]:
from pyspark.sql.types import LongType, StringType
from typedspark import Column, Schema
class Person(Schema):
    name: Column[StringType]
    age: Column[LongType]