{ "cells": [ { "attachments": {}, "cell_type": "markdown", "id": "72077ffb", "metadata": {}, "source": [ "# StructType Columns\n", "\n", "## The basics\n", "\n", "We can define `StructType` columns in `typedspark` as follows:" ] }, { "cell_type": "code", "execution_count": 1, "id": "934c2a2d", "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.types import IntegerType, StringType\n", "from typedspark import DataSet, StructType, Schema, Column\n", "\n", "\n", "class Values(Schema):\n", " name: Column[StringType]\n", " severity: Column[IntegerType]\n", "\n", "\n", "class Actions(Schema):\n", " consequeces: Column[StructType[Values]]" ] }, { "attachments": {}, "cell_type": "markdown", "id": "a6cd9cb4", "metadata": {}, "source": [ "We can get auto-complete (and refactoring) of the sub-columns by using:" ] }, { "cell_type": "code", "execution_count": 2, "id": "89b3f661", "metadata": {}, "outputs": [], "source": [ "def get_high_severity_actions(df: DataSet[Actions]) -> DataSet[Actions]:\n", " return df.filter(Actions.consequeces.dtype.schema.severity > 5)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "276c6561", "metadata": {}, "source": [ "## Transform to schema\n", "\n", "You can use the following syntax to add `StructType` columns in `transform_to_schema()`." ] }, { "cell_type": "code", "execution_count": 3, "id": "e0f4da52", "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "\n", "spark = SparkSession.Builder().config(\"spark.ui.showConsoleProgress\", \"false\").getOrCreate()\n", "spark.sparkContext.setLogLevel(\"ERROR\")" ] }, { "cell_type": "code", "execution_count": 4, "id": "cf40dcde", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----------+\n", "|consequeces|\n", "+-----------+\n", "| {a, 1}|\n", "| {b, 2}|\n", "| {c, 3}|\n", "+-----------+\n", "\n" ] } ], "source": [ "from typedspark import create_partially_filled_dataset, transform_to_schema, structtype_column\n", "\n", "\n", "class Input(Schema):\n", " a: Column[StringType]\n", " b: Column[IntegerType]\n", "\n", "\n", "df = create_partially_filled_dataset(\n", " spark,\n", " Input,\n", " {\n", " Input.a: [\"a\", \"b\", \"c\"],\n", " Input.b: [1, 2, 3],\n", " },\n", ")\n", "\n", "transform_to_schema(\n", " df,\n", " Actions,\n", " {\n", " Actions.consequeces: structtype_column(\n", " Actions.consequeces.dtype.schema,\n", " {\n", " Actions.consequeces.dtype.schema.name: Input.a,\n", " Actions.consequeces.dtype.schema.severity: Input.b,\n", " },\n", " )\n", " },\n", ").show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "271da325", "metadata": {}, "source": [ "Note that just like in `transform_to_schema()`, the `transformations` dictionary in `structtype_column(..., transformations)` requires columns with unique names as keys.\n", "\n", "## Generating DataSets\n", "\n", "We can generate `DataSets` with `StructType` columns as follows:" ] }, { "cell_type": "code", "execution_count": 5, "id": "3ccb56ff", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----------+\n", "|consequeces|\n", "+-----------+\n", "| {NULL, 1}|\n", "| {NULL, 2}|\n", "| {NULL, 3}|\n", "+-----------+\n", "\n" ] } ], "source": [ "from typedspark import create_partially_filled_dataset\n", "\n", "values = create_partially_filled_dataset(\n", " spark,\n", " Values,\n", " {\n", " Values.severity: [1, 2, 3],\n", " },\n", ")\n", "\n", "actions = create_partially_filled_dataset(\n", " spark,\n", " Actions,\n", " {\n", " Actions.consequeces: values.collect(),\n", " },\n", ")\n", "actions.show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "8a2232b3", "metadata": {}, "source": [ "Or in row-wise format:" ] }, { "cell_type": "code", "execution_count": 6, "id": "04a8e9d7", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----------+\n", "|consequeces|\n", "+-----------+\n", "| {a, 1}|\n", "| {b, 2}|\n", "| {c, 3}|\n", "+-----------+\n", "\n" ] } ], "source": [ "from typedspark import create_structtype_row\n", "\n", "create_partially_filled_dataset(\n", " spark,\n", " Actions,\n", " [\n", " {\n", " Actions.consequeces: create_structtype_row(\n", " Values, {Values.name: \"a\", Values.severity: 1}\n", " ),\n", " },\n", " {\n", " Actions.consequeces: create_structtype_row(\n", " Values, {Values.name: \"b\", Values.severity: 2}\n", " ),\n", " },\n", " {\n", " Actions.consequeces: create_structtype_row(\n", " Values, {Values.name: \"c\", Values.severity: 3}\n", " ),\n", " },\n", " ],\n", ").show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "33d83e5f", "metadata": {}, "source": [] }, { "attachments": {}, "cell_type": "markdown", "id": "0f3ab181", "metadata": {}, "source": [] } ], "metadata": { "kernelspec": { "display_name": "typedspark", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.9" } }, "nbformat": 4, "nbformat_minor": 5 }