{ "cells": [ { "attachments": {}, "cell_type": "markdown", "id": "76ffac97", "metadata": {}, "source": [ "# Subclassing schemas\n", "Subclassing schemas is a useful pattern for pipelines where every next function adds a few columns." ] }, { "cell_type": "code", "execution_count": 1, "id": "7725965f", "metadata": {}, "outputs": [], "source": [ "from typedspark import Column, Schema, DataSet\n", "from pyspark.sql.types import LongType, StringType\n", "from pyspark.sql.functions import lit\n", "\n", "\n", "class Person(Schema):\n", " id: Column[LongType]\n", " name: Column[StringType]\n", "\n", "\n", "class PersonWithAge(Person):\n", " age: Column[LongType]\n", "\n", "\n", "def foo(df: DataSet[Person]) -> DataSet[PersonWithAge]:\n", " return DataSet[PersonWithAge](\n", " df.withColumn(PersonWithAge.age, lit(42)),\n", " )" ] }, { "attachments": {}, "cell_type": "markdown", "id": "cc265385", "metadata": {}, "source": [ "Similarly, you can use this pattern when merging (or joining or concatenating) two datasets together." ] }, { "cell_type": "code", "execution_count": 2, "id": "c0d6b6a6", "metadata": {}, "outputs": [], "source": [ "class PersonA(Schema):\n", " id: Column[LongType]\n", " name: Column[StringType]\n", "\n", "\n", "class PersonB(Schema):\n", " id: Column[LongType]\n", " age: Column\n", "\n", "\n", "class PersonAB(PersonA, PersonB):\n", " pass\n", "\n", "\n", "def foo(df_a: DataSet[PersonA], df_b: DataSet[PersonB]) -> DataSet[PersonAB]:\n", " return DataSet[PersonAB](\n", " df_a.join(df_b, PersonAB.id),\n", " )" ] }, { "cell_type": "markdown", "id": "b24f0e09", "metadata": {}, "source": [] } ], "metadata": { "kernelspec": { "display_name": "typedspark", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.9" } }, "nbformat": 4, "nbformat_minor": 5 }