API Documentation

Typedspark: column-wise type annotations for pyspark DataFrames.

class typedspark.ArrayType

Bases: Generic[_ValueType], TypedSparkDataType

Allows for type annotations such as:

class Basket(Schema):
    items: Column[ArrayType[StringType]]

class typedspark.Catalogs(spark: SparkSession | None = None, silent: bool = False)

Bases: object

Loads all catalogs, databases and tables in a SparkSession.

class typedspark.Column(name: str, dataframe: DataFrame | None = None, curid: int | None = None, dtype: T | None = None, parent: DataFrame | Column | None = None, alias: str | None = None)

Bases: Column, Generic[T]

Represents a Column in a Schema. Can be used as:

class A(Schema):
    a: Column[IntegerType]
    b: Column[StringType]

property dtype: T

Get the datatype of the column, e.g. Column[IntegerType] -> IntegerType.

property full_path: str

Full path of the column including parent structure. Example:

from pyspark.sql.types import IntegerType, StringType
from typedspark import DataSet, StructType, Schema, Column

class Values(Schema):
    name: Column[StringType]
    severity: Column[IntegerType]

class Actions(Schema):
    consequences: Column[StructType[Values]]

Actions.consequences.dtype.schema.severity.full_path will yield the name of the field severity including the full path: consequences.severity

class typedspark.ColumnMeta(comment: str | None = None)

Bases: object

Contains the metadata for a Column. Used as:

class A(Schema):
    a: Annotated[
        Column[IntegerType],
        ColumnMeta(
            comment="This is a comment",
        )
    ]

comment: str | None = None
get_metadata() Dict[str, str] | None

Returns the metadata of this column.

class typedspark.DataSet(dataframe: DataFrame)

Bases: DataSetImplements[_Schema, _Schema]

DataSet subclasses the pyspark DataFrame, so it offers all the same functionality, plus the ability to specify a schema.

class Person(Schema):
    name: Column[StringType]
    age: Column[LongType]

def foo(df: DataSet[Person]) -> DataSet[Person]:
    # do stuff
    return df

alias(alias: str) DataSet[_Schema]
cache() DataSet[_Schema]
checkpoint(eager: bool = True) DataSet[_Schema]
coalesce(numPartitions: int) DataSet[_Schema]
distinct() DataSet[_Schema]
dropDuplicates(subset: List[str] | None = None) DataSet[_Schema]
dropDuplicatesWithinWatermark(subset: List[str] | None = None) DataSet[_Schema]
drop_duplicates(subset: List[str] | None = None) DataSet[_Schema]
dropna(how: str = 'any', thresh: int | None = None, subset: str | Tuple[str, ...] | List[str] | None = None) DataSet[_Schema]
exceptAll(other: DataSet[_Schema]) DataSet[_Schema]
exceptAll(other: DataFrame) DataFrame
fillna(value: bool | float | int | str, subset: str | Tuple[str, ...] | List[str] | None = None) DataSet[_Schema]
fillna(value: Dict[str, bool | float | int | str]) DataSet[_Schema]
filter(condition: Column | str) DataSet[_Schema]

Filters rows using the given condition.

hint(name: str, *parameters) DataSet[_Schema]
intersect(other: DataSet[_Schema]) DataSet[_Schema]
intersect(other: DataFrame) DataFrame
intersectAll(other: DataSet[_Schema]) DataSet[_Schema]
intersectAll(other: DataFrame) DataFrame
join(other: DataFrame, on: str | List[str] | Column | List[Column] | None = None, how: None = None) DataFrame
join(other: DataFrame, on: str | List[str] | Column | List[Column] | None = None, how: Literal['semi'] = None) DataSet[_Schema]
join(other: DataFrame, on: str | List[str] | Column | List[Column] | None = None, how: str | None = None) DataFrame
limit(num: int) DataSet[_Schema]
localCheckpoint(eager: bool = True, storageLevel: StorageLevel | None = None) DataSet[_Schema]
observe(*args, **kwargs) DataSet[_Schema]
orderBy(*args, **kwargs) DataSet[_Schema]
persist(storageLevel: StorageLevel = StorageLevel(True, True, False, True, 1)) DataSet[_Schema]
repartition(numPartitions: int, *cols: Column | str) DataSet[_Schema]
repartition(*cols: Column | str) DataSet[_Schema]
repartitionByRange(numPartitions: int, *cols: Column | str) DataSet[_Schema]
repartitionByRange(*cols: Column | str) DataSet[_Schema]
replace(to_replace: bool | float | int | str, value: bool | float | int | str | None, subset: List[str] | None = None) DataSet[_Schema]
replace(to_replace: List[bool | float | int | str], value: List[bool | float | int | str | None], subset: List[str] | None = None) DataSet[_Schema]
replace(to_replace: Dict[bool | float | int | str, bool | float | int | str | None], subset: List[str] | None = None) DataSet[_Schema]
replace(to_replace: List[bool | float | int | str], value: bool | float | int | str | None, subset: List[str] | None = None) DataSet[_Schema]
sample(*args, **kwargs) DataSet[_Schema]
sampleBy(col, fractions, seed: int | None = None) DataSet[_Schema]
sort(*cols, **kwargs) DataSet[_Schema]
sortWithinPartitions(*cols, **kwargs) DataSet[_Schema]
subtract(other: DataSet[_Schema]) DataSet[_Schema]
subtract(other: DataFrame) DataFrame
transform(func: Callable[Concatenate[DataSet[_Schema], P], _ReturnType], *args: P.args, **kwargs: P.kwargs) _ReturnType
union(other: DataSet[_Schema]) DataSet[_Schema]
union(other: DataFrame) DataFrame
unionAll(other: DataSet[_Schema]) DataSet[_Schema]
unionAll(other: DataFrame) DataFrame
unionByName(other: DataSet[_Schema], allowMissingColumns: Literal[False] = False) DataSet[_Schema]
unionByName(other: DataFrame, allowMissingColumns: bool = False) DataFrame
unpersist(blocking: bool = False) DataSet[_Schema]
where(condition: Column | str) DataSet[_Schema]

Filters rows using the given condition.

class typedspark.DataSetImplements(*args, **kwargs)

Bases: DataFrame, Generic[_Protocol, _Implementation]

DataSetImplements allows us to define functions that work on any DataSet whose schema implements a given protocol, for example a protocol Age. Such a function:

  1. Takes as input DataSetImplements[Age, T]: a DataSet that implements the protocol Age as T.

  2. Returns a DataSet[T]: a DataSet of the same type as the one that was provided.

DataSetImplements should be used solely as a type annotation; it is never initialized.
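The pattern of accepting anything that implements a protocol and returning the same concrete type can be illustrated without Spark. The following is only a plain-Python analogy, not typedspark code: HasAge, PersonRecord and birthday are hypothetical names, and a frozen dataclass stands in for a DataSet. The parameter annotation plays the role of DataSetImplements[Age, T] and the return annotation the role of DataSet[T].

```python
from dataclasses import dataclass, replace
from typing import Protocol, TypeVar


class HasAge(Protocol):
    """Hypothetical protocol: anything that carries an integer age."""

    age: int


# T is the concrete type that was passed in; it must implement HasAge.
T = TypeVar("T", bound=HasAge)


def birthday(record: T) -> T:
    """Return a copy of the record with age increased by one, keeping its type."""
    return replace(record, age=record.age + 1)


@dataclass(frozen=True)
class PersonRecord:
    """Hypothetical stand-in for one row of a DataSet[Person]."""

    name: str
    age: int


older = birthday(PersonRecord(name="John", age=30))
```

Because the return type is T rather than HasAge, the caller keeps access to fields such as name that the protocol itself does not mention.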

alias(alias: str) DataSet[_Implementation]
cache() DataSet[_Implementation]
checkpoint(eager: bool = True) DataSet[_Implementation]
coalesce(numPartitions: int) DataSet[_Implementation]
distinct() DataSet[_Implementation]
dropDuplicates(subset: List[str] | None = None) DataSet[_Implementation]
dropDuplicatesWithinWatermark(subset: List[str] | None = None) DataSet[_Implementation]
drop_duplicates(subset: List[str] | None = None) DataSet[_Implementation]
dropna(how: str = 'any', thresh: int | None = None, subset: str | Tuple[str, ...] | List[str] | None = None) DataSet[_Implementation]
exceptAll(other: DataSet[_Implementation]) DataSet[_Implementation]
exceptAll(other: DataFrame) DataFrame
fillna(value: bool | float | int | str, subset: str | Tuple[str, ...] | List[str] | None = None) DataSet[_Implementation]
fillna(value: Dict[str, bool | float | int | str]) DataSet[_Implementation]
filter(condition: Column | str) DataSet[_Implementation]

Filters rows using the given condition.

hint(name: str, *parameters) DataSet[_Implementation]
intersect(other: DataSet[_Implementation]) DataSet[_Implementation]
intersect(other: DataFrame) DataFrame
intersectAll(other: DataSet[_Implementation]) DataSet[_Implementation]
intersectAll(other: DataFrame) DataFrame
join(other: DataFrame, on: str | List[str] | Column | List[Column] | None = None, how: None = None) DataFrame
join(other: DataFrame, on: str | List[str] | Column | List[Column] | None = None, how: Literal['semi'] = None) DataSet[_Implementation]
join(other: DataFrame, on: str | List[str] | Column | List[Column] | None = None, how: str | None = None) DataFrame
limit(num: int) DataSet[_Implementation]
localCheckpoint(eager: bool = True, storageLevel: StorageLevel | None = None) DataSet[_Implementation]
observe(*args, **kwargs) DataSet[_Implementation]
orderBy(*args, **kwargs) DataSet[_Implementation]
persist(storageLevel: StorageLevel = StorageLevel(True, True, False, True, 1)) DataSet[_Implementation]
repartition(numPartitions: int, *cols: Column | str) DataSet[_Implementation]
repartition(*cols: Column | str) DataSet[_Implementation]
repartitionByRange(numPartitions: int, *cols: Column | str) DataSet[_Implementation]
repartitionByRange(*cols: Column | str) DataSet[_Implementation]
replace(to_replace: bool | float | int | str, value: bool | float | int | str | None, subset: List[str] | None = None) DataSet[_Implementation]
replace(to_replace: List[bool | float | int | str], value: List[bool | float | int | str | None], subset: List[str] | None = None) DataSet[_Implementation]
replace(to_replace: Dict[bool | float | int | str, bool | float | int | str | None], subset: List[str] | None = None) DataSet[_Implementation]
replace(to_replace: List[bool | float | int | str], value: bool | float | int | str | None, subset: List[str] | None = None) DataSet[_Implementation]
sample(*args, **kwargs) DataSet[_Implementation]
sampleBy(col, fractions, seed: int | None = None) DataSet[_Implementation]
sort(*cols, **kwargs) DataSet[_Implementation]
sortWithinPartitions(*cols, **kwargs) DataSet[_Implementation]
subtract(other: DataSet[_Implementation]) DataSet[_Implementation]
subtract(other: DataFrame) DataFrame
transform(func: Callable[Concatenate[DataSet[_Implementation], P], _ReturnType], *args: P.args, **kwargs: P.kwargs) _ReturnType
property typedspark_schema: Type[_Implementation]

Returns the Schema of the DataSet.

union(other: DataSet[_Implementation]) DataSet[_Implementation]
union(other: DataFrame) DataFrame
unionAll(other: DataSet[_Implementation]) DataSet[_Implementation]
unionAll(other: DataFrame) DataFrame
unionByName(other: DataSet[_Implementation], allowMissingColumns: Literal[False] = False) DataSet[_Implementation]
unionByName(other: DataFrame, allowMissingColumns: bool = False) DataFrame
unpersist(blocking: bool = False) DataSet[_Implementation]
where(condition: Column | str) DataSet[_Implementation]

Filters rows using the given condition.

class typedspark.Database(spark: SparkSession | None = None, db_name: str = 'default', catalog_name: str | None = None)

Bases: object

Loads all tables in a database.

property str: str

Returns the database name.

class typedspark.Databases(spark: SparkSession | None = None, silent: bool = False, catalog_name: str | None = None)

Bases: object

Loads all databases and tables in a SparkSession.

class typedspark.DayTimeIntervalType

Bases: Generic[_StartField, _EndField], TypedSparkDataType

Allows for type annotations such as:

class TimeInterval(Schema):
    interval: Column[DayTimeIntervalType[IntervalType.HOUR, IntervalType.SECOND]]

class typedspark.DecimalType

Bases: Generic[_Precision, _Scale], TypedSparkDataType

Allows for type annotations such as:

class Numbers(Schema):
    number: Column[DecimalType[Literal[10], Literal[0]]]

class typedspark.IntervalType

Bases: LiteralType

Interval types for DayTimeIntervalType.

DAY

alias of Literal[0]

HOUR

alias of Literal[1]

MINUTE

alias of Literal[2]

SECOND

alias of Literal[3]
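A short sketch of how such Literal aliases can be read back with the standard typing module. IntervalTypeSketch is a hypothetical stand-in that mirrors the four aliases listed above, so the snippet runs without pyspark or typedspark installed; literal_value is likewise an illustrative helper and not part of the library.

```python
from typing import Any, Literal, get_args


class IntervalTypeSketch:
    """Hypothetical stand-in mirroring the aliases documented for IntervalType."""

    DAY = Literal[0]
    HOUR = Literal[1]
    MINUTE = Literal[2]
    SECOND = Literal[3]


def literal_value(alias: Any) -> int:
    """Extract the single integer wrapped by a Literal[...] alias."""
    (value,) = get_args(alias)
    return value


# The start and end fields of an HOUR-to-SECOND interval, as plain integers.
bounds = (
    literal_value(IntervalTypeSketch.HOUR),
    literal_value(IntervalTypeSketch.SECOND),
)
```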

class typedspark.MapType

Bases: Generic[_KeyType, _ValueType], TypedSparkDataType

Allows for type annotations such as:

class Basket(Schema):
    items: Column[MapType[StringType, StringType]]

class typedspark.MetaSchema(name: str, bases: Any, dct: Dict[str, Any])

Bases: _ProtocolMeta

MetaSchema is the metaclass of Schema.

It implements essentially all of the functionality of Schema. Because classes are typically considered more convenient than metaclasses, Schema is provided as the public interface.

class A(Schema):
    a: Column[IntegerType]
    b: Column[StringType]

DataSet[A](df)

The class methods of Schema are described here.

all_column_names() List[str]

Returns all column names for a given schema.

all_column_names_except_for(except_for: List[str]) List[str]

Returns all column names for a given schema except for the columns specified in the except_for parameter.
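The behaviour described for these two methods can be sketched in plain Python by reading a class's annotations. This is an illustration only, under the assumption that column names correspond to annotated attributes in declaration order; PersonSketch and both helper functions are hypothetical and are not typedspark's implementation.

```python
from typing import List


class PersonSketch:
    """Hypothetical schema-like class; plain types stand in for Column[...]."""

    name: str
    age: int
    job: str


def all_column_names(schema: type) -> List[str]:
    """Return the annotated attribute names in declaration order."""
    return list(schema.__annotations__)


def all_column_names_except_for(schema: type, except_for: List[str]) -> List[str]:
    """Return the annotated attribute names, skipping those in except_for."""
    return [name for name in all_column_names(schema) if name not in except_for]


remaining = all_column_names_except_for(PersonSketch, ["age"])
```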

get_dlt_kwargs(name: str | None = None) DltKwargs

Creates a representation of the Schema to be used by Delta Live Tables.

@dlt.table(**DimPatient.get_dlt_kwargs())
def table_definition() -> DataSet[DimPatient]:
    <your table definition here>

get_docstring() str | None

Returns the docstring of the schema.

get_metadata() dict[str, dict[str, Any]]

Returns the metadata of each of the columns in the schema.

get_schema_definition_as_string(schema_name: str | None = None, include_documentation: bool = False, generate_imports: bool = True, add_subschemas: bool = True) str

Return the code for the Schema as a string.

get_schema_name()

Returns the name with which the schema was initialized.

get_snake_case() str

Returns the class name transformed into snake case.
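One common way to convert a class name to snake case is a regular expression that inserts an underscore before each uppercase letter. The helper below is a minimal sketch of that technique; to_snake_case is a hypothetical name, and the library's own conversion may treat edge cases differently.

```python
import re


def to_snake_case(class_name: str) -> str:
    """Insert "_" before every uppercase letter except the first, then lowercase."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", class_name).lower()


snake = to_snake_case("PersonWithJob")
```

Note that this simple rule splits every capital, so a run of capitals such as an acronym is broken into single letters.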

get_structtype() StructType

Creates the spark StructType for the schema.

print_schema(schema_name: str | None = None, include_documentation: bool = False, generate_imports: bool = True, add_subschemas: bool = False)

Print the code for the Schema.

class typedspark.Schema

Bases: Protocol

class typedspark.StructType(schema: Type[_Schema], parent: Column[Any])

Bases: Generic[_Schema], TypedSparkDataType

Allows for type annotations such as:

class Job(Schema):
    position: Column[StringType]
    salary: Column[LongType]

class Person(Schema):
    job: Column[StructType[Job]]

schema = None

typedspark.create_empty_dataset(spark: SparkSession, schema: Type[T], n_rows: int = 3) DataSet[T]

Creates a DataSet with Schema schema, containing n_rows rows, filled with None values.

class Person(Schema):
    name: Column[StringType]
    age: Column[LongType]

df = create_empty_dataset(spark, Person)

typedspark.create_partially_filled_dataset(spark: SparkSession, schema: Type[T], data: Dict[Column[Any], List[Any]] | List[Dict[Column[Any], Any]]) DataSet[T]

Creates a DataSet with Schema schema, where data can be defined in either of the following two ways:

class Person(Schema):
    name: Column[StringType]
    age: Column[LongType]
    job: Column[StringType]

df = create_partially_filled_dataset(
    spark,
    Person,
    {
        Person.name: ["John", "Jack", "Jane"],
        Person.age: [30, 40, 50],
    }
)

Or:

df = create_partially_filled_dataset(
    spark,
    Person,
    [
        {Person.name: "John", Person.age: 30},
        {Person.name: "Jack", Person.age: 40},
        {Person.name: "Jane", Person.age: 50},
    ]
)

Any columns in the schema that are not present in the data will be initialized with None values.
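The two input shapes describe the same rows. The sketch below shows, in plain Python and without Spark, how a column-wise mapping and a row-wise list can be normalised to identical rows, with missing columns filled with None. It assumes all column lists have equal length and uses string keys instead of Column objects; to_full_rows is a hypothetical helper, not the library's implementation.

```python
from typing import Any, Dict, List, Union

ColumnWise = Dict[str, List[Any]]
RowWise = List[Dict[str, Any]]


def to_full_rows(columns: List[str], data: Union[ColumnWise, RowWise]) -> List[Dict[str, Any]]:
    """Normalise either input shape to rows that contain every schema column."""
    if isinstance(data, dict):
        # Column-wise input: zip the value lists together to form rows.
        names = list(data.keys())
        rows = [dict(zip(names, values)) for values in zip(*data.values())]
    else:
        # Row-wise input: already a list of rows.
        rows = data
    # Columns absent from a row are filled with None.
    return [{column: row.get(column) for column in columns} for row in rows]


schema_columns = ["name", "age", "job"]
from_columns = to_full_rows(
    schema_columns,
    {"name": ["John", "Jack", "Jane"], "age": [30, 40, 50]},
)
from_rows = to_full_rows(
    schema_columns,
    [
        {"name": "John", "age": 30},
        {"name": "Jack", "age": 40},
        {"name": "Jane", "age": 50},
    ],
)
```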

typedspark.create_schema(dataframe: DataFrame, schema_name: str | None = None) Tuple[DataSet[Schema], Type[Schema]]

This function infers a Schema in a notebook based on the provided DataFrame.

This allows for autocompletion on column names, amongst other things.

df, Person = create_schema(df)

typedspark.create_structtype_row(schema: Type[T], data: Dict[Column[Any], Any]) Row

Creates a Row with StructType schema, where data is a mapping from column to data in the respective column.

typedspark.load_table(spark: SparkSession, table_name: str, schema_name: str | None = None) Tuple[DataSet[Schema], Type[Schema]]

This function loads a DataSet, along with its inferred Schema, in a notebook.

This allows for autocompletion on column names, amongst other things.

df, Person = load_table(spark, "path.to.table")

typedspark.register_schema_to_dataset(dataframe: DataSet[T], schema: Type[T]) Type[T]

Helps combat column ambiguity. For example:

class Person(Schema):
    id: Column[IntegerType]
    name: Column[StringType]

class Job(Schema):
    id: Column[IntegerType]
    salary: Column[IntegerType]

class PersonWithJob(Person, Job):
    pass

def foo(df_a: DataSet[Person], df_b: DataSet[Job]) -> DataSet[PersonWithJob]:
    return DataSet[PersonWithJob](
        df_a.join(
            df_b,
            Person.id == Job.id
        )
    )

Calling foo() would result in an AnalysisException, because Spark can’t figure out whether id belongs to df_a or df_b. To deal with this, you need to register your Schema to the DataSet.

from typedspark import register_schema_to_dataset

def foo(df_a: DataSet[Person], df_b: DataSet[Job]) -> DataSet[PersonWithJob]:
    person = register_schema_to_dataset(df_a, Person)
    job = register_schema_to_dataset(df_b, Job)
    return DataSet[PersonWithJob](
        df_a.join(
            df_b,
            person.id == job.id
        )
    )

typedspark.register_schema_to_dataset_with_alias(dataframe: DataSet[T], schema: Type[T], alias: str) Tuple[DataSet[T], Type[T]]

When dealing with self-joins, running register_schema_to_dataset() is not enough.

Instead, we’ll need register_schema_to_dataset_with_alias(), e.g.:

class Person(Schema):
    id: Column[IntegerType]
    name: Column[StringType]

df_a, person_a = register_schema_to_dataset_with_alias(df, Person, alias="a")
df_b, person_b = register_schema_to_dataset_with_alias(df, Person, alias="b")

df_a.join(df_b, person_a.id == person_b.id)

typedspark.structtype_column(schema: Type[Schema], transformations: Dict[Column[Any], Column] | None = None, fill_unspecified_columns_with_nulls: bool = False) Column

Helps with creating new StructType columns of a certain schema, for example:

transform_to_schema(
    df,
    Output,
    {
        Output.values: structtype_column(
            Value,
            {
                Value.a: Input.a + 2,
                ...
            }
        )
    }
)

typedspark.transform_to_schema(dataframe: DataFrame, schema: Type[T], transformations: Dict[Column[Any], Column] | None = None, fill_unspecified_columns_with_nulls: bool = False, fill_unspecified_inner_fields_with_nulls: bool = False, run_sequentially: bool = True) DataSet[T]

Performs the transformations (if provided) on the provided DataFrame, and subsequently subsets the resulting DataFrame to the columns specified in schema.

transform_to_schema(
    df_a.join(df_b, A.a == B.f),
    AB,
    {
        AB.a: A.a + 3,
        AB.b: A.b + 7,
        AB.i: B.i - 5,
        AB.j: B.j + 1,
    }
)
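The "transform, then subset" behaviour can be pictured on a single row represented as a dict. This is a plain-Python analogy only: transform_row_to_columns is a hypothetical helper, the lambdas stand in for Spark column expressions, and every transformation here is evaluated against the original row, which is an assumption of the sketch and says nothing about how the library orders its transformations.

```python
from typing import Any, Callable, Dict, List, Optional

Row = Dict[str, Any]


def transform_row_to_columns(
    row: Row,
    columns: List[str],
    transformations: Optional[Dict[str, Callable[[Row], Any]]] = None,
) -> Row:
    """Apply the transformations to the row, then keep only the schema columns."""
    derived = {name: fn(row) for name, fn in (transformations or {}).items()}
    merged = {**row, **derived}
    return {name: merged[name] for name in columns}


# A joined row with columns from both inputs, plus one the target schema lacks.
joined_row = {"a": 1, "b": 2, "f": 1, "i": 10, "j": 20}
result = transform_row_to_columns(
    joined_row,
    ["a", "b", "i", "j"],
    {
        "a": lambda r: r["a"] + 3,
        "b": lambda r: r["b"] + 7,
        "i": lambda r: r["i"] - 5,
        "j": lambda r: r["j"] + 1,
    },
)
```

The column f is used only for the join and is dropped because it is not part of the target column list.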