{ "cells": [ { "attachments": {}, "cell_type": "markdown", "id": "0bac925e", "metadata": {}, "source": [ "# Loading DataSets in notebooks\n", "\n", "## The basics\n", "\n", "Using typedspark, you can dynamically load a `DataSet` and its corresponding `Schema` from an existing table. This provides the following benefits:\n", "\n", "* Autocomplete on table names\n", "* Autocomplete on column names\n", "* Generating a `Schema` from the table\n", " * Which you can copy-paste into your IDE when you want to add a new `DataSet` to your pipelines\n", " * Works even when the underlying table has not been serialized using typedspark!\n", "\n", "To illustrate these points, let us first make a temporary table that we can load later." ] }, { "cell_type": "code", "execution_count": 1, "id": "88feb784", "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "\n", "spark = SparkSession.Builder().config(\"spark.ui.showConsoleProgress\", \"false\").getOrCreate()\n", "spark.sparkContext.setLogLevel(\"ERROR\")" ] }, { "cell_type": "code", "execution_count": 2, "id": "d56535f4", "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "\n", "df = spark.createDataFrame(\n", " pd.DataFrame(\n", " dict(\n", " name=[\"Jack\", \"John\", \"Jane\"],\n", " age=[20, 30, 40],\n", " )\n", " )\n", ")\n", "df.createOrReplaceTempView(\"person_table\")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "aa94840a", "metadata": {}, "source": [ "## Catalogs\n", "\n", "The easiest way to load a `DataSet` is using `Catalogs`." ] }, { "cell_type": "code", "execution_count": 3, "id": "3ae12618", "metadata": {}, "outputs": [], "source": [ "from typedspark import Catalogs\n", "\n", "db = Catalogs()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "033c28d6", "metadata": {}, "source": [ "After running the above cell, we can use `db` to load our table. Notice that you'll get autocomplete here!" ] }, { "cell_type": "code", "execution_count": 4, "id": "4adf417f", "metadata": {}, "outputs": [], "source": [ "persons, Person = db.spark_catalog.default.person_table()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "03140ede", "metadata": {}, "source": [ "We can use the `DataSet` and `Schema` just as we would do in the IDE." ] }, { "cell_type": "code", "execution_count": 5, "id": "74e81f87", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----+---+\n", "|name|age|\n", "+----+---+\n", "|Jack| 20|\n", "|John| 30|\n", "|Jane| 40|\n", "+----+---+\n", "\n" ] } ], "source": [ "persons.show()" ] }, { "cell_type": "code", "execution_count": 6, "id": "f54ef94a", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\n", "from pyspark.sql.types import LongType, StringType\n", "\n", "from typedspark import Column, Schema\n", "\n", "\n", "class PersonTable(Schema):\n", " name: Column[StringType]\n", " age: Column[LongType]" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "Person" ] }, { "cell_type": "code", "execution_count": 7, "id": "8b6d809d", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----+---+\n", "|name|age|\n", "+----+---+\n", "|John| 30|\n", "|Jane| 40|\n", "+----+---+\n", "\n" ] } ], "source": [ "persons.filter(Person.age > 25).show()" ] }, { "cell_type": "markdown", "id": "a34fcedb", "metadata": {}, "source": [ "And of course, while we're typing the above statement, we'll get autocomplete on the columns of `persons`!\n", "\n", "Another application of the above is if we're migrating to existing tables to typedspark, and we want to generate the corresponding schemas such that we can copy them to the repository. In that case, we may also want to generate documentation." ] }, { "cell_type": "code", "execution_count": 8, "id": "167f1072", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "from typing import Annotated\n", "\n", "from pyspark.sql.types import LongType, StringType\n", "\n", "from typedspark import Column, ColumnMeta, Schema\n", "\n", "\n", "class PersonTable(Schema):\n", " \"\"\"Add documentation here.\"\"\"\n", "\n", " name: Annotated[Column[StringType], ColumnMeta(comment=\"\")]\n", " age: Annotated[Column[LongType], ColumnMeta(comment=\"\")]\n", "\n" ] } ], "source": [ "Person.print_schema(include_documentation=True)" ] }, { "cell_type": "markdown", "id": "63e3cc44", "metadata": {}, "source": [ "Of note, `Catalog()` automatically runs `register_schema_to_dataset()` on the resulting `DataSet` and `Schema`, hence resolving potential column disambiguities." ] }, { "attachments": {}, "cell_type": "markdown", "id": "f9cee952", "metadata": {}, "source": [ "## Databases\n", "\n", "`Catalogs` is often the only class you need. But if loading all catalogs takes too long, or if you only want to use only one catalog anyway, you can use `Databases` instead. We can use `Databases(spark, catalog_name=...)` to specify which catalog we want to load. Or we can omit this parameter to load the default catalog (often `spark_catalog` or `hive_metastore`)." ] }, { "cell_type": "code", "execution_count": 9, "id": "ac42d191", "metadata": {}, "outputs": [], "source": [ "from typedspark import Databases\n", "\n", "db = Databases()" ] }, { "cell_type": "code", "execution_count": 10, "id": "df585e65", "metadata": {}, "outputs": [], "source": [ "persons, Person = db.default.person_table()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "e8606fb4", "metadata": {}, "source": [ "## Database\n", "\n", "If we just want to load the tables from a single database, we can use `Database`. Once again, we can either specify the database (through `Database(spark, db_name=...)`) or leave it blank to load the default database." ] }, { "cell_type": "code", "execution_count": 11, "id": "1233a5fe", "metadata": {}, "outputs": [], "source": [ "from typedspark import Database\n", "\n", "db = Database()" ] }, { "cell_type": "code", "execution_count": 12, "id": "9b2c02ac", "metadata": {}, "outputs": [], "source": [ "person, Person = db.person_table.load()" ] }, { "cell_type": "markdown", "id": "071cd277", "metadata": {}, "source": [ "# Names starting with an underscore\n", "\n", "For `Catalogs()`, `Databases()` and `Database()`, names starting with an underscore are problematic for auto-complete. For example, suppose we'd make this table" ] }, { "cell_type": "code", "execution_count": 13, "id": "acfd55af", "metadata": {}, "outputs": [], "source": [ "df.write.saveAsTable(\"default._person\")" ] }, { "cell_type": "markdown", "id": "7fcf12ef", "metadata": {}, "source": [ "Then we won't get auto-complete using `db._person`, since our notebook will assume we don't want auto-complete on private variables. To circumvent this, we rename the attribute as such:" ] }, { "cell_type": "code", "execution_count": 14, "id": "b0491ad1", "metadata": {}, "outputs": [], "source": [ "db = Database()" ] }, { "cell_type": "code", "execution_count": 15, "id": "f7f41731", "metadata": {}, "outputs": [], "source": [ "persons, Person = db.u_person()" ] }, { "cell_type": "markdown", "id": "5e1b3aaa", "metadata": {}, "source": [ "The underlying table is not renamed, solely the class attribute used for autocomplete.\n", "\n", "When renaming the attribute leads to a naming conflict (e.g. because `u_person` already exists), we resolve the conflict by adding more underscores (e.g. `_person` would then become `u__person`)." ] }, { "cell_type": "markdown", "id": "e0187268", "metadata": {}, "source": [ "## Loading a single DataSet\n", "\n", "If you really only want to load one DataSet, you can use `load_table()`." ] }, { "cell_type": "code", "execution_count": 16, "id": "264c2957", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\n", "from pyspark.sql.types import LongType, StringType\n", "\n", "from typedspark import Column, Schema\n", "\n", "\n", "class DynamicallyLoadedSchema(Schema):\n", " name: Column[StringType]\n", " age: Column[LongType]" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from typedspark import load_table\n", "\n", "person, Person = load_table(spark, \"person_table\")\n", "Person" ] }, { "cell_type": "markdown", "id": "ef2ec0e3", "metadata": {}, "source": [ "If you’d like to have the schema name properly displayed, you can use the optional schema_name argument." ] }, { "cell_type": "code", "execution_count": 17, "id": "7199bee0", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\n", "from pyspark.sql.types import LongType, StringType\n", "\n", "from typedspark import Column, Schema\n", "\n", "\n", "class Person(Schema):\n", " name: Column[StringType]\n", " age: Column[LongType]" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" } ], "source": [ "person, Person = load_table(spark, \"person_table\", schema_name=\"Person\")\n", "Person" ] }, { "cell_type": "markdown", "id": "15e97d2d", "metadata": {}, "source": [] } ], "metadata": { "kernelspec": { "display_name": "typedspark", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.9" } }, "nbformat": 4, "nbformat_minor": 5 }