{ "cells": [ { "attachments": {}, "cell_type": "markdown", "id": "a836183d", "metadata": {}, "source": [ "# Dealing with column ambiguity\n", "\n", "## The basics\n", "\n", "Column references can be ambiguous when two DataFrames in a join share a column name, for example:" ] }, { "cell_type": "code", "execution_count": 1, "id": "ee6a97e4", "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "\n", "spark = SparkSession.Builder().config(\"spark.ui.showConsoleProgress\", \"false\").getOrCreate()\n", "spark.sparkContext.setLogLevel(\"ERROR\")" ] }, { "cell_type": "code", "execution_count": 2, "id": "6206d284", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[AMBIGUOUS_REFERENCE] Reference `id` is ambiguous, could be: [`id`, `id`].\n" ] } ], "source": [ "from typedspark import Column, Schema, create_partially_filled_dataset\n", "from pyspark.sql.types import IntegerType, StringType\n", "\n", "\n", "class Person(Schema):\n", " id: Column[IntegerType]\n", " name: Column[StringType]\n", " age: Column[IntegerType]\n", "\n", "\n", "class Job(Schema):\n", " id: Column[IntegerType]\n", " salary: Column[IntegerType]\n", "\n", "\n", "df_a = create_partially_filled_dataset(spark, Person, {Person.id: [1, 2, 3]})\n", "df_b = create_partially_filled_dataset(spark, Job, {Job.id: [1, 2, 3]})\n", "\n", "try:\n", " df_a.join(df_b, Person.id == Job.id)\n", "except Exception as e:\n", " print(e)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "09be224a", "metadata": {}, "source": [ "The above results in an `AnalysisException`, because Spark can't figure out whether `id` belongs to `df_a` or `df_b`. To deal with this, you need to register your `Schema` to the `DataSet` using `register_schema_to_dataset()`."
] }, { "cell_type": "code", "execution_count": 3, "id": "459a5e06", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+----+----+---+------+\n", "| id|name| age| id|salary|\n", "+---+----+----+---+------+\n", "| 1|NULL|NULL| 1| NULL|\n", "| 2|NULL|NULL| 2| NULL|\n", "| 3|NULL|NULL| 3| NULL|\n", "+---+----+----+---+------+\n", "\n" ] } ], "source": [ "from typedspark import register_schema_to_dataset\n", "\n", "person = register_schema_to_dataset(df_a, Person)\n", "job = register_schema_to_dataset(df_b, Job)\n", "(\n", " df_a.join(\n", " df_b,\n", " person.id == job.id,\n", " ).show()\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "8359bd08", "metadata": {}, "source": [ "## Ambiguous columns and transform_to_schema()\n", "\n", "When you use `transform_to_schema()` in a setting with ambiguous columns, you may run into the following error:" ] }, { "cell_type": "code", "execution_count": 4, "id": "e0f1a644", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Columns {'id'} are ambiguous.\n", "Please specify the transformations for these columns explicitly, for example:\n", "\n", "schema_a = register_schema_to_dataset(df_a, A)\n", "schema_b = register_schema_to_dataset(df_b, B)\n", "\n", "transform_to_schema(\n", " df_a.join(\n", " df_b,\n", " schema_a.id == schema_b.id\n", " ),\n", " C,\n", " {\n", " C.id: schema_a.id,\n", " }\n", ")\n", "\n" ] } ], "source": [ "from typedspark import transform_to_schema\n", "\n", "\n", "class PersonWithJob(Schema):\n", " id: Column[IntegerType]\n", " name: Column[StringType]\n", " age: Column[IntegerType]\n", " salary: Column[IntegerType]\n", "\n", "\n", "try:\n", " (\n", " transform_to_schema(\n", " df_a.join(\n", " df_b,\n", " person.id == job.id,\n", " ),\n", " PersonWithJob,\n", " ).show()\n", " )\n", "except ValueError as e:\n", " print(e)" ] }, { "cell_type": "markdown", "id": "5b08a13f", "metadata": {}, "source": [ "The problem is that 
typedspark doesn't know whether `PersonWithJob.id` should be set to `person.id` or to `job.id`. Let's solve it as the error message suggests!" ] }, { "cell_type": "code", "execution_count": 5, "id": "f39ff425", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+----+----+------+\n", "| id|name| age|salary|\n", "+---+----+----+------+\n", "| 1|NULL|NULL| NULL|\n", "| 2|NULL|NULL| NULL|\n", "| 3|NULL|NULL| NULL|\n", "+---+----+----+------+\n", "\n" ] } ], "source": [ "(\n", " transform_to_schema(\n", " df_a.join(\n", " df_b,\n", " person.id == job.id,\n", " ),\n", " PersonWithJob,\n", " {\n", " PersonWithJob.id: person.id,\n", " },\n", " ).show()\n", ")" ] }, { "cell_type": "markdown", "id": "675b4dfc", "metadata": {}, "source": [ "## Self-joins\n", "\n", "When dealing with self-joins, running `register_schema_to_dataset()` is not enough, because both sides of the join originate from the same DataFrame. Instead, we'll need `register_schema_to_dataset_with_alias()`, which returns an aliased `DataSet` together with the registered schema." ] }, { "cell_type": "code", "execution_count": 6, "id": "8ea83b65", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+-------+---+---+-------+---+\n", "| id| name|age| id| name|age|\n", "+---+-------+---+---+-------+---+\n", "| 1| Alice| 20| 1| Alice| 20|\n", "| 2| Bob| 30| 2| Bob| 30|\n", "| 3|Charlie| 40| 3|Charlie| 40|\n", "+---+-------+---+---+-------+---+\n", "\n" ] } ], "source": [ "from typedspark import register_schema_to_dataset_with_alias\n", "\n", "df = create_partially_filled_dataset(\n", " spark,\n", " Person,\n", " {\n", " Person.id: [1, 2, 3],\n", " Person.name: [\"Alice\", \"Bob\", \"Charlie\"],\n", " Person.age: [20, 30, 40],\n", " },\n", ")\n", "\n", "df_a, person_a = register_schema_to_dataset_with_alias(df, Person, alias=\"a\")\n", "df_b, person_b = register_schema_to_dataset_with_alias(df, Person, alias=\"b\")\n", "\n", "df_a.join(df_b, person_a.id == person_b.id).show()" ] }, { "cell_type": "markdown", "id": "1a8dcdd1", "metadata": {}, "source": [] } ], "metadata": {
"kernelspec": { "display_name": "typedspark", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.9" } }, "nbformat": 4, "nbformat_minor": 5 }