Dealing with column ambiguity
The basics
Columns can be ambiguous when two DataFrames share a column name, for example:
[1]:
from pyspark.sql import SparkSession
spark = SparkSession.Builder().config("spark.ui.showConsoleProgress", "false").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
[2]:
from typedspark import Column, Schema, create_partially_filled_dataset
from pyspark.sql.types import IntegerType, StringType
class Person(Schema):
    id: Column[IntegerType]
    name: Column[StringType]
    age: Column[IntegerType]

class Job(Schema):
    id: Column[IntegerType]
    salary: Column[IntegerType]

df_a = create_partially_filled_dataset(spark, Person, {Person.id: [1, 2, 3]})
df_b = create_partially_filled_dataset(spark, Job, {Job.id: [1, 2, 3]})

try:
    df_a.join(df_b, Person.id == Job.id)
except Exception as e:
    print(e)
[AMBIGUOUS_REFERENCE] Reference `id` is ambiguous, could be: [`id`, `id`].
The above resulted in an AnalysisException, because Spark can’t figure out whether id belongs to df_a or df_b. To deal with this, you need to register each Schema to its DataSet, so that person.id and job.id each refer to one specific DataSet.
[3]:
from typedspark import register_schema_to_dataset
person = register_schema_to_dataset(df_a, Person)
job = register_schema_to_dataset(df_b, Job)
(
    df_a.join(
        df_b,
        person.id == job.id,
    ).show()
)
+---+----+----+---+------+
| id|name| age| id|salary|
+---+----+----+---+------+
|  1|NULL|NULL|  1|  NULL|
|  2|NULL|NULL|  2|  NULL|
|  3|NULL|NULL|  3|  NULL|
+---+----+----+---+------+
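To see why a reference that knows its dataset removes the ambiguity, here is a toy model in plain Python. It is only a sketch of the idea, not how Spark's analyzer or typedspark is implemented; `BoundColumn`, `resolve_by_name`, and `resolve_bound` are hypothetical names used purely for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BoundColumn:
    """A column reference that remembers which dataset it came from (hypothetical)."""

    source: str
    name: str


# Columns of a joined result, tagged with the dataset each one originated from.
joined_columns = [
    BoundColumn("df_a", "id"),
    BoundColumn("df_a", "name"),
    BoundColumn("df_a", "age"),
    BoundColumn("df_b", "id"),
    BoundColumn("df_b", "salary"),
]


def resolve_by_name(name: str) -> BoundColumn:
    """Resolve a bare column name; fails when several datasets provide it."""
    matches = [c for c in joined_columns if c.name == name]
    if not matches:
        raise LookupError(f"Reference `{name}` not found")
    if len(matches) > 1:
        raise LookupError(f"Reference `{name}` is ambiguous")
    return matches[0]


def resolve_bound(column: BoundColumn) -> BoundColumn:
    """Resolve a reference that carries its source; name clashes no longer matter."""
    if column not in joined_columns:
        raise LookupError(f"Cannot resolve {column}")
    return column
```

In this model, `resolve_by_name("id")` raises because two inputs provide `id`, while `resolve_bound(BoundColumn("df_a", "id"))` succeeds. The registered `person.id` and `job.id` play the role of the bound references.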
Ambiguous columns and transform_to_schema()
When you use transform_to_schema() in a setting with ambiguous columns, you may run into the following error:
[4]:
from typedspark import transform_to_schema
class PersonWithJob(Schema):
    id: Column[IntegerType]
    name: Column[StringType]
    age: Column[IntegerType]
    salary: Column[IntegerType]

try:
    (
        transform_to_schema(
            df_a.join(
                df_b,
                person.id == job.id,
            ),
            PersonWithJob,
        ).show()
    )
except ValueError as e:
    print(e)
Columns {'id'} are ambiguous.
Please specify the transformations for these columns explicitly, for example:
schema_a = register_schema_to_dataset(df_a, A)
schema_b = register_schema_to_dataset(df_b, B)

transform_to_schema(
    df_a.join(
        df_b,
        schema_a.id == schema_b.id
    ),
    C,
    {
        C.id: schema_a.id,
    }
)
The problem is that typedspark doesn’t know whether PersonWithJob.id should be set to person.id or to job.id. Let’s solve it as the error message suggests!
[5]:
(
    transform_to_schema(
        df_a.join(
            df_b,
            person.id == job.id,
        ),
        PersonWithJob,
        {
            PersonWithJob.id: person.id,
        },
    ).show()
)
+---+----+----+------+
| id|name| age|salary|
+---+----+----+------+
|  1|NULL|NULL|  NULL|
|  2|NULL|NULL|  NULL|
|  3|NULL|NULL|  NULL|
+---+----+----+------+
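The check behind that error can be pictured roughly as follows. This is a plain-Python sketch under the assumption that a target column counts as ambiguous when its name appears more than once in the joined data and no explicit transformation is given for it. `find_ambiguous_columns` is a hypothetical helper, not a typedspark function, and the real implementation may differ.

```python
from collections import Counter


def find_ambiguous_columns(joined_columns, schema_columns, transformations):
    """Return the schema columns that occur more than once in the joined
    data and have no explicit transformation (assumed rule, see above)."""
    counts = Counter(joined_columns)
    return {
        name
        for name in schema_columns
        if counts[name] > 1 and name not in transformations
    }


# Column names after joining Person and Job, and the target schema's columns.
joined = ["id", "name", "age", "id", "salary"]
target = ["id", "name", "age", "salary"]

# Without an explicit mapping, `id` cannot be resolved.
print(find_ambiguous_columns(joined, target, {}))  # {'id'}

# Supplying a mapping for `id` leaves nothing ambiguous.
print(find_ambiguous_columns(joined, target, {"id": "person.id"}))  # set()
```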
Self-joins
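When dealing with self-joins, running register_schema_to_dataset() is not enough, because both sides of the join are the same DataFrame. Instead, we’ll need register_schema_to_dataset_with_alias(), which returns an aliased DataFrame together with a schema registered to it.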
[6]:
from typedspark import register_schema_to_dataset_with_alias
df = create_partially_filled_dataset(
    spark,
    Person,
    {
        Person.id: [1, 2, 3],
        Person.name: ["Alice", "Bob", "Charlie"],
        Person.age: [20, 30, 40],
    },
)
df_a, person_a = register_schema_to_dataset_with_alias(df, Person, alias="a")
df_b, person_b = register_schema_to_dataset_with_alias(df, Person, alias="b")
df_a.join(df_b, person_a.id == person_b.id).show()
+---+-------+---+---+-------+---+
| id|   name|age| id|   name|age|
+---+-------+---+---+-------+---+
|  1|  Alice| 20|  1|  Alice| 20|
|  2|    Bob| 30|  2|    Bob| 30|
|  3|Charlie| 40|  3|Charlie| 40|
+---+-------+---+---+-------+---+