Dealing with column ambiguity

The basics

When two DataSets share a column name, references to that column can become ambiguous after a join. For example:

[1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.ui.showConsoleProgress", "false").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
[2]:
from typedspark import Column, Schema, create_partially_filled_dataset
from pyspark.sql.types import IntegerType, StringType


class Person(Schema):
    id: Column[IntegerType]
    name: Column[StringType]
    age: Column[IntegerType]


class Job(Schema):
    id: Column[IntegerType]
    salary: Column[IntegerType]


df_a = create_partially_filled_dataset(spark, Person, {Person.id: [1, 2, 3]})
df_b = create_partially_filled_dataset(spark, Job, {Job.id: [1, 2, 3]})

try:
    df_a.join(df_b, Person.id == Job.id)
except Exception as e:
    print(e)
[AMBIGUOUS_REFERENCE] Reference `id` is ambiguous, could be: [`id`, `id`].

The join above raises an AnalysisException because Spark can’t tell whether id refers to the column in df_a or the one in df_b. To resolve this, register each Schema to its DataSet with register_schema_to_dataset():

[3]:
from typedspark import register_schema_to_dataset

person = register_schema_to_dataset(df_a, Person)
job = register_schema_to_dataset(df_b, Job)
(
    df_a.join(
        df_b,
        person.id == job.id,
    ).show()
)
+---+----+----+---+------+
| id|name| age| id|salary|
+---+----+----+---+------+
|  1|NULL|NULL|  1|  NULL|
|  2|NULL|NULL|  2|  NULL|
|  3|NULL|NULL|  3|  NULL|
+---+----+----+---+------+

Ambiguous columns and transform_to_schema()

When you use transform_to_schema() in a setting with ambiguous columns, you may run into the following error:

[4]:
from typedspark import transform_to_schema


class PersonWithJob(Schema):
    id: Column[IntegerType]
    name: Column[StringType]
    age: Column[IntegerType]
    salary: Column[IntegerType]


try:
    (
        transform_to_schema(
            df_a.join(
                df_b,
                person.id == job.id,
            ),
            PersonWithJob,
        ).show()
    )
except ValueError as e:
    print(e)
Columns {'id'} are ambiguous.
Please specify the transformations for these columns explicitly, for example:

schema_a = register_schema_to_dataset(df_a, A)
schema_b = register_schema_to_dataset(df_b, B)

transform_to_schema(
    df_a.join(
        df_b,
        schema_a.id == schema_b.id
    ),
    C,
    {
        C.id: schema_a.id,
    }
)

The problem is that typedspark doesn’t know whether PersonWithJob.id should be set to person.id or to job.id. Let’s solve it as the error message suggests, by passing an explicit mapping for id:

[5]:
(
    transform_to_schema(
        df_a.join(
            df_b,
            person.id == job.id,
        ),
        PersonWithJob,
        {
            PersonWithJob.id: person.id,
        },
    ).show()
)
+---+----+----+------+
| id|name| age|salary|
+---+----+----+------+
|  1|NULL|NULL|  NULL|
|  2|NULL|NULL|  NULL|
|  3|NULL|NULL|  NULL|
+---+----+----+------+

Self-joins

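When dealing with self-joins, register_schema_to_dataset() is not enough, because both sides of the join come from the same DataSet. Instead, use register_schema_to_dataset_with_alias(), which gives each side its own alias and returns the aliased DataSet together with its registered schema.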

[6]:
from typedspark import register_schema_to_dataset_with_alias

df = create_partially_filled_dataset(
    spark,
    Person,
    {
        Person.id: [1, 2, 3],
        Person.name: ["Alice", "Bob", "Charlie"],
        Person.age: [20, 30, 40],
    },
)

df_a, person_a = register_schema_to_dataset_with_alias(df, Person, alias="a")
df_b, person_b = register_schema_to_dataset_with_alias(df, Person, alias="b")

df_a.join(df_b, person_a.id == person_b.id).show()
+---+-------+---+---+-------+---+
| id|   name|age| id|   name|age|
+---+-------+---+---+-------+---+
|  1|  Alice| 20|  1|  Alice| 20|
|  2|    Bob| 30|  2|    Bob| 30|
|  3|Charlie| 40|  3|Charlie| 40|
+---+-------+---+---+-------+---+