{ "cells": [ { "attachments": {}, "cell_type": "markdown", "id": "b83251c2", "metadata": {}, "source": [ "# Advanced usage of type hints\n", "## Functions that do not affect the schema\n", "\n", "There are a number of functions in `DataSet` which do not affect the schema. For example:" ] }, { "cell_type": "code", "execution_count": 1, "id": "bdf75d36", "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "\n", "spark = SparkSession.Builder().config(\"spark.ui.showConsoleProgress\", \"false\").getOrCreate()\n", "spark.sparkContext.setLogLevel(\"ERROR\")" ] }, { "cell_type": "code", "execution_count": 2, "id": "c84b26e9", "metadata": {}, "outputs": [], "source": [ "from typedspark import Column, Schema, DataSet, create_partially_filled_dataset\n", "from pyspark.sql.types import StringType\n", "\n", "\n", "class A(Schema):\n", " a: Column[StringType]\n", "\n", "\n", "df = create_partially_filled_dataset(\n", " spark,\n", " A,\n", " {\n", " A.a: [\"a\", \"b\", \"c\"],\n", " },\n", ")\n", "res = df.filter(A.a == \"a\")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "01675118", "metadata": {}, "source": [ "In the above example, `filter()` will not actually make any changes to the schema, hence we have implemented the return type of `DataSet.filter()` to be a `DataSet` of the same `Schema` that you started with. In other words, a linter will see that `res` is of the type `DataSet[A]`.\n", "\n", "This allows you to skip casting steps in many cases and instead define functions as:" ] }, { "cell_type": "code", "execution_count": 3, "id": "b0fe9344", "metadata": {}, "outputs": [], "source": [ "def foo(df: DataSet[A]) -> DataSet[A]:\n", " return df.filter(A.a == \"a\")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "4ed27a7d", "metadata": {}, "source": [ "The functions for which this is currently implemented include:\n", "\n", "* `filter()`\n", "* `distinct()`\n", "* `orderBy()`\n", "* `where()`\n", "* `alias()`\n", "* `persist()`\n", "* `unpersist()`\n", "\n", "## Functions applied to two DataSets of the same schema\n", "\n", "Similarly, some functions return a `DataSet[A]` when they take two `DataSet[A]` as an input. For example, here a linter will see that `res` is of the type `DataSet[A]`." ] }, { "cell_type": "code", "execution_count": 4, "id": "3abfb0d8", "metadata": {}, "outputs": [], "source": [ "df_a = create_partially_filled_dataset(spark, A, {A.a: [\"a\", \"b\", \"c\"]})\n", "df_b = create_partially_filled_dataset(spark, A, {A.a: [\"d\", \"e\", \"f\"]})\n", "\n", "res = df_a.unionByName(df_b)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "dfa33e61", "metadata": {}, "source": [ "The functions in this category include:\n", "\n", "* `unionByName()`\n", "* `join(..., how=\"semi\")`\n", "\n", "## Transformations\n", "\n", "Finally, the `transform()` function can also be typed. In the following example, a linter will see that `res` is of the type `DataSet[B]`." ] }, { "cell_type": "code", "execution_count": 5, "id": "5b0df362", "metadata": {}, "outputs": [], "source": [ "from typedspark import transform_to_schema\n", "from pyspark.sql.functions import lit\n", "\n", "\n", "class B(A):\n", " b: Column[StringType]\n", "\n", "\n", "def foo(df: DataSet[A]) -> DataSet[A]:\n", " return transform_to_schema(\n", " df,\n", " B,\n", " {\n", " B.b: lit(\"hi\"),\n", " },\n", " )\n", "\n", "\n", "res = create_partially_filled_dataset(\n", " spark,\n", " A,\n", " {\n", " A.a: [\"a\", \"b\", \"c\"],\n", " },\n", ").transform(foo)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "4221be22", "metadata": {}, "source": [ "## Did we miss anything?\n", "\n", "There are likely more functions that we did not yet cover. Feel free to make an issue and reach out when there is one that you'd like typedspark to support!" ] }, { "attachments": {}, "cell_type": "markdown", "id": "d21ad841", "metadata": {}, "source": [] } ], "metadata": { "kernelspec": { "display_name": "typedspark", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.9" } }, "nbformat": 4, "nbformat_minor": 5 }