{ "cells": [ { "attachments": {}, "cell_type": "markdown", "id": "ab6cce16", "metadata": {}, "source": [ "# Transforming a DataSet to another schema\n", "\n", "## The basics\n", "\n", "We often come across the following pattern:" ] }, { "cell_type": "code", "execution_count": 1, "id": "44b1c3bb", "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.types import IntegerType, StringType\n", "from typedspark import Column, Schema, DataSet\n", "\n", "\n", "class Person(Schema):\n", " name: Column[StringType]\n", " job_id: Column[IntegerType]\n", "\n", "\n", "class Job(Schema):\n", " id: Column[IntegerType]\n", " function: Column[StringType]\n", " hourly_rate: Column[IntegerType]\n", "\n", "\n", "class PersonWithJob(Person, Job):\n", " id: Column[IntegerType]\n", " name: Column[StringType]\n", " job_name: Column[StringType]\n", " rate: Column[IntegerType]\n", "\n", "\n", "def get_plumbers(persons: DataSet[Person], jobs: DataSet[Job]) -> DataSet[PersonWithJob]:\n", " return DataSet[PersonWithJob](\n", " jobs.filter(Job.function == \"plumber\")\n", " .join(persons, Job.id == Person.job_id)\n", " .withColumn(PersonWithJob.job_name.str, Job.function)\n", " .withColumn(PersonWithJob.rate.str, Job.hourly_rate)\n", " .select(*PersonWithJob.all_column_names())\n", " )" ] }, { "attachments": {}, "cell_type": "markdown", "id": "2ba1ded0", "metadata": {}, "source": [ "We can make that quite a bit more condensed:" ] }, { "cell_type": "code", "execution_count": 2, "id": "a08d2a1f", "metadata": {}, "outputs": [], "source": [ "from typedspark import transform_to_schema\n", "\n", "\n", "def get_plumbers(persons: DataSet[Person], jobs: DataSet[Job]) -> DataSet[PersonWithJob]:\n", " return transform_to_schema(\n", " jobs.filter(\n", " Job.function == \"plumber\",\n", " ).join(\n", " persons,\n", " Job.id == Person.job_id,\n", " ),\n", " PersonWithJob,\n", " {\n", " PersonWithJob.job_name: Job.function,\n", " PersonWithJob.rate: Job.hourly_rate,\n", " },\n", " )" ] }, { "attachments": {}, "cell_type": "markdown", "id": "98d00a0f", "metadata": {}, "source": [ "Specifically, `transform_to_schema()` has the following benefits:\n", "\n", "* No more need to cast every return statement using `DataSet[Schema](...)`\n", "* No more need to drop the columns that are not in the schema using `select(*Schema.all_column_names())`\n", "* Less verbose syntax compared to `.withColumn(...)`\n", "\n", "## Unique keys required\n", "\n", "The `transformations` dictionary in `transform_to_schema(..., transformations)` requires columns with unique names as keys. The following pattern will throw an exception." ] }, { "cell_type": "code", "execution_count": 3, "id": "916f8122", "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "\n", "spark = SparkSession.Builder().config(\"spark.ui.showConsoleProgress\", \"false\").getOrCreate()\n", "spark.sparkContext.setLogLevel(\"ERROR\")" ] }, { "cell_type": "code", "execution_count": 4, "id": "756995ae", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[CANNOT_CONVERT_COLUMN_INTO_BOOL] Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.\n" ] } ], "source": [ "from typedspark import create_partially_filled_dataset\n", "\n", "df = create_partially_filled_dataset(spark, Job, {Job.hourly_rate: [10, 20, 30]})\n", "\n", "try:\n", " transform_to_schema(\n", " df,\n", " Job,\n", " {\n", " Job.hourly_rate: Job.hourly_rate + 3,\n", " Job.hourly_rate: Job.hourly_rate * 2,\n", " },\n", " )\n", "except ValueError as e:\n", " print(e)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "67b9285c", "metadata": {}, "source": [ "Instead, use one line per column" ] }, { "cell_type": "code", "execution_count": 5, "id": "46c8833f", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----+--------+-----------+\n", "| id|function|hourly_rate|\n", "+----+--------+-----------+\n", "|NULL| NULL| 26|\n", "|NULL| NULL| 46|\n", "|NULL| NULL| 66|\n", "+----+--------+-----------+\n", "\n" ] } ], "source": [ "transform_to_schema(\n", " df,\n", " Job,\n", " {\n", " Job.hourly_rate: (Job.hourly_rate + 3) * 2,\n", " },\n", ").show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "6546f023", "metadata": {}, "source": [ "## Filling missing columns and nested fields with null\n", "\n", "`transform_to_schema()` accepts two flags:\n", "- `fill_unspecified_columns_with_nulls`. When True, it adds null for any top-level column present in the target schema but absent from the source dataframe.\n", "\n", "- `fill_unspecified_inner_fields_with_nulls`. When True handles the complementary case: the top-level column exists in both, but the nested type inside it (structs, array of structs, or map of structs) is missing fields required by the target schema. Those missing fields are filled with null in-place, recursively.\n", "\n", "Use both together when the source data is structurally incomplete at any level relative to the target schema.\n" ] }, { "metadata": {}, "cell_type": "code", "outputs": [], "execution_count": null, "source": "", "id": "e0caa88ff0e2054e" } ], "metadata": { "kernelspec": { "display_name": "typedspark", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.9" } }, "nbformat": 4, "nbformat_minor": 5 }