API Documentation
Typedspark: column-wise type annotations for pyspark DataFrames.
- class typedspark.ArrayType
Bases: Generic[_ValueType], TypedSparkDataType
Allows for type annotations such as:
    class Basket(Schema):
        items: Column[ArrayType[StringType]]
- class typedspark.Catalogs(spark: SparkSession | None = None, silent: bool = False)
Bases: object
Loads all catalogs, databases and tables in a SparkSession.
- class typedspark.Column(name: str, dataframe: DataFrame | None = None, curid: int | None = None, dtype: T | None = None, parent: DataFrame | Column | None = None, alias: str | None = None)
Bases: Column, Generic[T]
Represents a Column in a Schema. Can be used as:
    class A(Schema):
        a: Column[IntegerType]
        b: Column[StringType]
- property dtype: T
Get the datatype of the column, e.g. Column[IntegerType] -> IntegerType.
- property full_path: str
Full path of the column including parent structure. Example:
    from pyspark.sql.types import IntegerType, StringType
    from typedspark import DataSet, StructType, Schema, Column

    class Values(Schema):
        name: Column[StringType]
        severity: Column[IntegerType]

    class Actions(Schema):
        consequences: Column[StructType[Values]]
Actions.consequences.dtype.schema.severity.full_path will yield the name of the field severity including the full path: consequences.severity
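The dotted path in the example above can be pictured as a walk up a chain of parent links. The following is a minimal sketch of that idea in plain Python; `FieldNode` is a hypothetical stand-in, not a typedspark class, and it makes no claim about how typedspark builds the path internally.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FieldNode:
    """Hypothetical stand-in for a column that may sit inside a struct column."""

    name: str
    parent: Optional["FieldNode"] = None

    @property
    def full_path(self) -> str:
        # Collect names from this field up to the outermost parent,
        # then join them from outermost to innermost.
        parts = []
        node: Optional["FieldNode"] = self
        while node is not None:
            parts.append(node.name)
            node = node.parent
        return ".".join(reversed(parts))


consequences = FieldNode("consequences")
severity = FieldNode("severity", parent=consequences)
print(severity.full_path)  # consequences.severity
```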
- class typedspark.ColumnMeta(comment: str | None = None)
Bases: object
Contains the metadata for a Column. Used as:
    class A(Schema):
        a: Annotated[
            Column[IntegerType],
            ColumnMeta(
                comment="This is a comment",
            )
        ]
- class typedspark.DataSet(dataframe: DataFrame)
Bases: DataSetImplements[_Schema, _Schema]
DataSet subclasses the pyspark DataFrame and hence has all of its functionality, plus the ability to define a schema.
    class Person(Schema):
        name: Column[StringType]
        age: Column[LongType]

    def foo(df: DataSet[Person]) -> DataSet[Person]:
        # do stuff
        return df
- dropna(how: str = 'any', thresh: int | None = None, subset: str | Tuple[str, ...] | List[str] | None = None) DataSet[_Schema]
- fillna(value: bool | float | int | str, subset: str | Tuple[str, ...] | List[str] | None = None) DataSet[_Schema]
- fillna(value: Dict[str, bool | float | int | str]) DataSet[_Schema]
- join(other: DataFrame, on: str | List[str] | Column | List[Column] | None = None, how: None = None) DataFrame
- join(other: DataFrame, on: str | List[str] | Column | List[Column] | None = None, how: Literal['semi'] = None) DataSet[_Schema]
- join(other: DataFrame, on: str | List[str] | Column | List[Column] | None = None, how: str | None = None) DataFrame
- repartition(numPartitions: int, *cols: Column | str) DataSet[_Schema]
- repartition(*cols: Column | str) DataSet[_Schema]
- repartitionByRange(numPartitions: int, *cols: Column | str) DataSet[_Schema]
- repartitionByRange(*cols: Column | str) DataSet[_Schema]
- replace(to_replace: bool | float | int | str, value: bool | float | int | str | None, subset: List[str] | None = None) DataSet[_Schema]
- replace(to_replace: List[bool | float | int | str], value: List[bool | float | int | str | None], subset: List[str] | None = None) DataSet[_Schema]
- replace(to_replace: Dict[bool | float | int | str, bool | float | int | str | None], subset: List[str] | None = None) DataSet[_Schema]
- replace(to_replace: List[bool | float | int | str], value: bool | float | int | str | None, subset: List[str] | None = None) DataSet[_Schema]
- transform(func: Callable[[Concatenate[DataSet[_Schema], P]], _ReturnType], *args: P, **kwargs: P) _ReturnType
- class typedspark.DataSetImplements(*args, **kwargs)
Bases: DataFrame, Generic[_Protocol, _Implementation]
DataSetImplements is used to annotate functions that:
1. Take as input a DataSetImplements[Age, T]: a DataSet that implements the protocol Age as T.
2. Return a DataSet[T]: a DataSet of the same type as the one that was provided.
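The pattern described here (accept anything that satisfies a protocol, return the same concrete type) can be illustrated with plain `typing.Protocol` and `TypeVar`, without Spark. This is an analogy only: `HasAge`, `Person` and `birthday` are hypothetical names, and the sketch operates on a single record rather than on a DataSet.

```python
import copy
from dataclasses import dataclass
from typing import Protocol, TypeVar


class HasAge(Protocol):
    """Hypothetical protocol: anything with an integer `age` attribute."""

    age: int


# T is whatever concrete type the caller passes in, as long as it satisfies HasAge.
T = TypeVar("T", bound=HasAge)


def birthday(record: T) -> T:
    # Only `age` is touched; every other attribute of the concrete type is preserved.
    older = copy.copy(record)
    older.age += 1
    return older


@dataclass
class Person:
    name: str
    age: int


jane = birthday(Person("Jane", 30))
print(jane)  # Person(name='Jane', age=31)
```

A type checker infers `Person` as the return type of `birthday(Person(...))`, which mirrors the "same type as the one that was provided" guarantee described above.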
DataSetImplements should solely be used as a type annotation; it is never initialized.
- dropna(how: str = 'any', thresh: int | None = None, subset: str | Tuple[str, ...] | List[str] | None = None) DataSet[_Implementation]
- exceptAll(other: DataSet[_Implementation]) DataSet[_Implementation]
- exceptAll(other: DataFrame) DataFrame
- fillna(value: bool | float | int | str, subset: str | Tuple[str, ...] | List[str] | None = None) DataSet[_Implementation]
- fillna(value: Dict[str, bool | float | int | str]) DataSet[_Implementation]
- intersect(other: DataSet[_Implementation]) DataSet[_Implementation]
- intersect(other: DataFrame) DataFrame
- intersectAll(other: DataSet[_Implementation]) DataSet[_Implementation]
- intersectAll(other: DataFrame) DataFrame
- join(other: DataFrame, on: str | List[str] | Column | List[Column] | None = None, how: None = None) DataFrame
- join(other: DataFrame, on: str | List[str] | Column | List[Column] | None = None, how: Literal['semi'] = None) DataSet[_Implementation]
- join(other: DataFrame, on: str | List[str] | Column | List[Column] | None = None, how: str | None = None) DataFrame
- localCheckpoint(eager: bool = True, storageLevel: StorageLevel | None = None) DataSet[_Implementation]
- persist(storageLevel: StorageLevel = StorageLevel(True, True, False, True, 1)) DataSet[_Implementation]
- repartition(numPartitions: int, *cols: Column | str) DataSet[_Implementation]
- repartition(*cols: Column | str) DataSet[_Implementation]
- repartitionByRange(numPartitions: int, *cols: Column | str) DataSet[_Implementation]
- repartitionByRange(*cols: Column | str) DataSet[_Implementation]
- replace(to_replace: bool | float | int | str, value: bool | float | int | str | None, subset: List[str] | None = None) DataSet[_Implementation]
- replace(to_replace: List[bool | float | int | str], value: List[bool | float | int | str | None], subset: List[str] | None = None) DataSet[_Implementation]
- replace(to_replace: Dict[bool | float | int | str, bool | float | int | str | None], subset: List[str] | None = None) DataSet[_Implementation]
- replace(to_replace: List[bool | float | int | str], value: bool | float | int | str | None, subset: List[str] | None = None) DataSet[_Implementation]
- subtract(other: DataSet[_Implementation]) DataSet[_Implementation]
- subtract(other: DataFrame) DataFrame
- transform(func: Callable[[Concatenate[DataSet[_Implementation], P]], _ReturnType], *args: P, **kwargs: P) _ReturnType
- property typedspark_schema: Type[_Implementation]
Returns the Schema of the DataSet.
- unionAll(other: DataSet[_Implementation]) DataSet[_Implementation]
- unionAll(other: DataFrame) DataFrame
- class typedspark.Database(spark: SparkSession | None = None, db_name: str = 'default', catalog_name: str | None = None)
Bases: object
Loads all tables in a database.
- class typedspark.Databases(spark: SparkSession | None = None, silent: bool = False, catalog_name: str | None = None)
Bases: object
Loads all databases and tables in a SparkSession.
- class typedspark.DayTimeIntervalType
Bases: Generic[_StartField, _EndField], TypedSparkDataType
Allows for type annotations such as:
    class TimeInterval(Schema):
        interval: Column[DayTimeIntervalType[IntervalType.HOUR, IntervalType.SECOND]]
- class typedspark.DecimalType
Bases: Generic[_Precision, _Scale], TypedSparkDataType
Allows for type annotations such as:
    class Numbers(Schema):
        number: Column[DecimalType[Literal[10], Literal[0]]]
- class typedspark.IntervalType
Bases: LiteralType
Interval types for DayTimeIntervalType.
- DAY
alias of Literal[0]
- HOUR
alias of Literal[1]
- MINUTE
alias of Literal[2]
- SECOND
alias of Literal[3]
- class typedspark.MapType
Bases: Generic[_KeyType, _ValueType], TypedSparkDataType
Allows for type annotations such as:
    class Basket(Schema):
        items: Column[MapType[StringType, StringType]]
- class typedspark.MetaSchema(name: str, bases: Any, dct: Dict[str, Any])
Bases: _ProtocolMeta
MetaSchema is the metaclass of Schema.
It basically implements all functionality of Schema. But since classes are typically considered more convenient than metaclasses, we provide Schema as the public interface.
    class A(Schema):
        a: Column[IntegerType]
        b: Column[StringType]

    DataSet[A](df)
The class methods of Schema are described here.
- all_column_names_except_for(except_for: List[str]) List[str]
Returns all column names for a given schema except for the columns specified in the except_for parameter.
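The behaviour described above amounts to filtering a schema's declared column names against an exclusion list. A minimal sketch in plain Python follows; it uses an ordinary annotated class as a hypothetical stand-in for a Schema and a free function rather than the class method, so it shows the idea, not typedspark's implementation.

```python
from typing import List


class Person:
    """Hypothetical plain class; its annotations stand in for schema columns."""

    name: str
    age: int
    job: str


def all_column_names_except_for(schema: type, except_for: List[str]) -> List[str]:
    # Keep the declaration order and drop every name listed in except_for.
    return [name for name in schema.__annotations__ if name not in except_for]


print(all_column_names_except_for(Person, ["age"]))  # ['name', 'job']
```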
- get_dlt_kwargs(name: str | None = None) DltKwargs
Creates a representation of the Schema to be used by Delta Live Tables.
    @dlt.table(**DimPatient.get_dlt_kwargs())
    def table_definition() -> DataSet[DimPatient]:
        <your table definition here>
- get_metadata() dict[str, dict[str, Any]]
Returns the metadata of each of the columns in the schema.
- get_schema_definition_as_string(schema_name: str | None = None, include_documentation: bool = False, generate_imports: bool = True, add_subschemas: bool = True) str
Return the code for the Schema as a string.
- get_schema_name()
Returns the name with which the schema was initialized.
- get_structtype() StructType
Creates the spark StructType for the schema.
- class typedspark.Schema
Bases: Protocol
- class typedspark.StructType(schema: Type[_Schema], parent: Column[Any])
Bases: Generic[_Schema], TypedSparkDataType
Allows for type annotations such as:
    class Job(Schema):
        position: Column[StringType]
        salary: Column[LongType]

    class Person(Schema):
        job: Column[StructType[Job]]
- schema = None
- typedspark.create_empty_dataset(spark: SparkSession, schema: Type[T], n_rows: int = 3) DataSet[T]
Creates a DataSet with Schema schema, containing n_rows rows, filled with None values.
    class Person(Schema):
        name: Column[StringType]
        age: Column[LongType]

    df = create_empty_dataset(spark, Person)
- typedspark.create_partially_filled_dataset(spark: SparkSession, schema: Type[T], data: Dict[Column[Any], List[Any]] | List[Dict[Column[Any], Any]]) DataSet[T]
Creates a DataSet with Schema schema, where data can be defined in either of the following two ways:
    class Person(Schema):
        name: Column[StringType]
        age: Column[LongType]
        job: Column[StringType]

    df = create_partially_filled_dataset(
        spark,
        Person,
        {
            Person.name: ["John", "Jack", "Jane"],
            Person.age: [30, 40, 50],
        }
    )
Or:
    df = create_partially_filled_dataset(
        spark,
        Person,
        [
            {Person.name: "John", Person.age: 30},
            {Person.name: "Jack", Person.age: 40},
            {Person.name: "Jane", Person.age: 50},
        ]
    )
Any columns in the schema that are not present in the data will be initialized with None values.
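The two accepted input shapes (one dict of column to list, or a list of per-row dicts) carry the same information, and columns missing from the data end up as None. The sketch below shows that equivalence in plain Python; `to_rows` is a hypothetical helper keyed by column name strings, and it does not reflect how typedspark builds the DataSet.

```python
from typing import Any, Dict, List, Union

ColumnData = Dict[str, List[Any]]
RowData = List[Dict[str, Any]]


def to_rows(columns: List[str], data: Union[ColumnData, RowData]) -> List[Dict[str, Any]]:
    """Normalise either input shape into full rows; missing columns become None."""
    if isinstance(data, dict):
        # Column-oriented input: every list must have the same length.
        lengths = {len(values) for values in data.values()}
        if len(lengths) > 1:
            raise ValueError("all columns must have the same number of values")
        n_rows = lengths.pop() if lengths else 0
        data = [{col: values[i] for col, values in data.items()} for i in range(n_rows)]
    # Row-oriented from here on: emit every schema column, defaulting to None.
    return [{col: row.get(col) for col in columns} for row in data]


schema_columns = ["name", "age", "job"]
by_column = to_rows(schema_columns, {"name": ["John", "Jack"], "age": [30, 40]})
by_row = to_rows(schema_columns, [{"name": "John", "age": 30}, {"name": "Jack", "age": 40}])
print(by_column == by_row)  # True
print(by_column[0])  # {'name': 'John', 'age': 30, 'job': None}
```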
- typedspark.create_schema(dataframe: DataFrame, schema_name: str | None = None) Tuple[DataSet[Schema], Type[Schema]]
This function infers a Schema in a notebook based on the provided DataFrame.
This allows for autocompletion on column names, amongst other things.
    df, Person = create_schema(df)
- typedspark.create_structtype_row(schema: Type[T], data: Dict[Column[Any], Any]) Row
Creates a Row with StructType schema, where data is a mapping from column to data in the respective column.
- typedspark.load_table(spark: SparkSession, table_name: str, schema_name: str | None = None) Tuple[DataSet[Schema], Type[Schema]]
This function loads a DataSet, along with its inferred Schema, in a notebook.
This allows for autocompletion on column names, amongst other things.
    df, Person = load_table(spark, "path.to.table")
- typedspark.register_schema_to_dataset(dataframe: DataSet[T], schema: Type[T]) Type[T]
Helps combat column ambiguity. For example:
    class Person(Schema):
        id: Column[IntegerType]
        name: Column[StringType]

    class Job(Schema):
        id: Column[IntegerType]
        salary: Column[IntegerType]

    class PersonWithJob(Person, Job):
        pass

    def foo(df_a: DataSet[Person], df_b: DataSet[Job]) -> DataSet[PersonWithJob]:
        return DataSet[PersonWithJob](
            df_a.join(
                df_b,
                Person.id == Job.id
            )
        )
Calling foo() would result in an AnalysisException, because Spark can’t figure out whether id belongs to df_a or df_b. To deal with this, you need to register your Schema to the DataSet.
    from typedspark import register_schema_to_dataset

    def foo(df_a: DataSet[Person], df_b: DataSet[Job]) -> DataSet[PersonWithJob]:
        person = register_schema_to_dataset(df_a, Person)
        job = register_schema_to_dataset(df_b, Job)
        return DataSet[PersonWithJob](
            df_a.join(
                df_b,
                person.id == job.id
            )
        )
- typedspark.register_schema_to_dataset_with_alias(dataframe: DataSet[T], schema: Type[T], alias: str) Tuple[DataSet[T], Type[T]]
When dealing with self-joins, running register_schema_to_dataset() is not enough.
Instead, we’ll need register_schema_to_dataset_with_alias(), e.g.:
    class Person(Schema):
        id: Column[IntegerType]
        name: Column[StringType]

    df_a, person_a = register_schema_to_dataset_with_alias(df, Person, alias="a")
    df_b, person_b = register_schema_to_dataset_with_alias(df, Person, alias="b")
    df_a.join(df_b, person_a.id == person_b.id)
- typedspark.structtype_column(schema: Type[Schema], transformations: Dict[Column[Any], Column] | None = None, fill_unspecified_columns_with_nulls: bool = False) Column
Helps with creating new StructType columns of a certain schema, for example:
    transform_to_schema(
        df,
        Output,
        {
            Output.values: structtype_column(
                Value,
                {
                    Value.a: Input.a + 2,
                    ...
                }
            )
        }
    )
- typedspark.transform_to_schema(dataframe: DataFrame, schema: Type[T], transformations: Dict[Column[Any], Column] | None = None, fill_unspecified_columns_with_nulls: bool = False, fill_unspecified_inner_fields_with_nulls: bool = False, run_sequentially: bool = True) DataSet[T]
On the provided DataFrame df, it performs the transformations (if provided), and subsequently subsets the resulting DataFrame to the columns specified in schema.
    transform_to_schema(
        df_a.join(df_b, A.a == B.f),
        AB,
        {
            AB.a: A.a + 3,
            AB.b: A.b + 7,
            AB.i: B.i - 5,
            AB.j: B.j + 1,
        }
    )
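The two steps described above (apply the transformations, then keep only the schema's columns) can be mimicked on plain dict rows. This is a rough sketch under loud assumptions: `transform_rows_to_schema` is a hypothetical helper, columns are plain strings, and transformations are applied one after another in dict order; it says nothing about how typedspark or Spark actually evaluates them.

```python
from typing import Any, Callable, Dict, List, Optional

Row = Dict[str, Any]
Transformation = Callable[[Row], Any]


def transform_rows_to_schema(
    rows: List[Row],
    schema_columns: List[str],
    transformations: Optional[Dict[str, Transformation]] = None,
) -> List[Row]:
    """Hypothetical helper: apply per-column functions, then keep only schema columns."""
    transformed = []
    for row in rows:
        updated = dict(row)  # copy so the input rows are left untouched
        for column, func in (transformations or {}).items():
            # Each transformation sees the row as updated so far.
            updated[column] = func(updated)
        # Subset to the schema's columns, in schema order.
        transformed.append({column: updated[column] for column in schema_columns})
    return transformed


joined = [{"a": 1, "b": 2, "f": 1, "i": 10, "j": 20}]
output = transform_rows_to_schema(
    joined,
    ["a", "b", "i", "j"],
    {
        "a": lambda r: r["a"] + 3,
        "b": lambda r: r["b"] + 7,
        "i": lambda r: r["i"] - 5,
        "j": lambda r: r["j"] + 1,
    },
)
print(output)  # [{'a': 4, 'b': 9, 'i': 5, 'j': 21}]
```

The join key `f` is dropped because it is not among the schema columns, which corresponds to the subsetting step in the description.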