StructType Columns

The basics

We can define StructType columns in typedspark as follows:

[1]:
from pyspark.sql.types import IntegerType, StringType
from typedspark import DataSet, StructType, Schema, Column


class Values(Schema):
    name: Column[StringType]
    severity: Column[IntegerType]


class Actions(Schema):
    consequeces: Column[StructType[Values]]

We can get auto-complete (and refactoring) of the sub-columns by using:

[2]:
def get_high_severity_actions(df: DataSet[Actions]) -> DataSet[Actions]:
    return df.filter(Actions.consequeces.dtype.schema.severity > 5)

Transform to schema

You can use the following syntax to add StructType columns in transform_to_schema().

[3]:
from pyspark.sql import SparkSession

spark = SparkSession.Builder().config("spark.ui.showConsoleProgress", "false").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
[4]:
from typedspark import create_partially_filled_dataset, transform_to_schema, structtype_column


class Input(Schema):
    a: Column[StringType]
    b: Column[IntegerType]


df = create_partially_filled_dataset(
    spark,
    Input,
    {
        Input.a: ["a", "b", "c"],
        Input.b: [1, 2, 3],
    },
)

transform_to_schema(
    df,
    Actions,
    {
        Actions.consequeces: structtype_column(
            Actions.consequeces.dtype.schema,
            {
                Actions.consequeces.dtype.schema.name: Input.a,
                Actions.consequeces.dtype.schema.severity: Input.b,
            },
        )
    },
).show()
+-----------+
|consequeces|
+-----------+
|     {a, 1}|
|     {b, 2}|
|     {c, 3}|
+-----------+

Note that just like in transform_to_schema(), the transformations dictionary in structtype_column(..., transformations) requires columns with unique names as keys.

Generating DataSets

We can generate DataSets with StructType columns as follows:

[5]:
from typedspark import create_partially_filled_dataset

values = create_partially_filled_dataset(
    spark,
    Values,
    {
        Values.severity: [1, 2, 3],
    },
)

actions = create_partially_filled_dataset(
    spark,
    Actions,
    {
        Actions.consequeces: values.collect(),
    },
)
actions.show()
+-----------+
|consequeces|
+-----------+
|  {NULL, 1}|
|  {NULL, 2}|
|  {NULL, 3}|
+-----------+

Or in row-wise format:

[6]:
from typedspark import create_structtype_row

create_partially_filled_dataset(
    spark,
    Actions,
    [
        {
            Actions.consequeces: create_structtype_row(
                Values, {Values.name: "a", Values.severity: 1}
            ),
        },
        {
            Actions.consequeces: create_structtype_row(
                Values, {Values.name: "b", Values.severity: 2}
            ),
        },
        {
            Actions.consequeces: create_structtype_row(
                Values, {Values.name: "c", Values.severity: 3}
            ),
        },
    ],
).show()
+-----------+
|consequeces|
+-----------+
|     {a, 1}|
|     {b, 2}|
|     {c, 3}|
+-----------+