{ "cells": [ { "attachments": {}, "cell_type": "markdown", "id": "a006aaea", "metadata": {}, "source": [ "# Dynamically generate schemas from an existing DataFrame \n", "\n", "Besides loading a `DataSet` from an existing table, we also provide `create_schema()`, which generates a `Schema` from a `DataFrame` that you have in memory. This allows you to get autocomplete on `DataSets` that you create on-the-fly. We'll first create some data, and then we'll dive into an example of `create_schema()` using a pivot table.\n", "\n", "## Creating data" ] }, { "cell_type": "code", "execution_count": 1, "id": "5442cec0", "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "\n", "spark = SparkSession.Builder().config(\"spark.ui.showConsoleProgress\", \"false\").getOrCreate()\n", "spark.sparkContext.setLogLevel(\"ERROR\")" ] }, { "cell_type": "code", "execution_count": 2, "id": "0df943fb", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------+------+------------+------------+-------------+\n", "|vaccination_id|pet_id|vaccine_name|vaccine_date|next_due_date|\n", "+--------------+------+------------+------------+-------------+\n", "| 1| 1| rabies| 2022-11-03| 2023-11-03|\n", "| 2| 2| rabies| 2022-10-08| 2023-10-08|\n", "| 3| 3| rabies| 2022-10-14| 2023-10-14|\n", "| 4| 1| lyme| 2022-10-17| 2023-10-17|\n", "| 5| 3| lyme| 2022-10-04| 2023-10-04|\n", "| 6| 2| influenza| 2022-10-03| 2023-10-03|\n", "| 7| 3| influenza| 2022-10-05| 2023-10-05|\n", "+--------------+------+------------+------------+-------------+\n", "\n" ] } ], "source": [ "from datetime import timedelta, datetime\n", "from pyspark.sql.functions import first\n", "from pyspark.sql.types import LongType, StringType, DateType\n", "from typedspark import Column, Schema, create_partially_filled_dataset, create_schema\n", "\n", "date = datetime(2023, 10, 2)\n", "\n", "\n", "class Vaccinations(Schema):\n", " vaccination_id: Column[LongType]\n", " pet_id: Column[LongType]\n", " vaccine_name: Column[StringType]\n", " vaccine_date: Column[DateType]\n", " next_due_date: Column[DateType]\n", "\n", "\n", "vaccinations = create_partially_filled_dataset(\n", " spark,\n", " Vaccinations,\n", " {\n", " Vaccinations.vaccination_id: [1, 2, 3, 4, 5, 6, 7],\n", " Vaccinations.pet_id: [1, 2, 3, 1, 3, 2, 3],\n", " Vaccinations.vaccine_name: [\n", " \"rabies\",\n", " \"rabies\",\n", " \"rabies\",\n", " \"lyme\",\n", " \"lyme\",\n", " \"influenza\",\n", " \"influenza\",\n", " ],\n", " Vaccinations.next_due_date: [\n", " date + timedelta(days=32),\n", " date + timedelta(days=6),\n", " date + timedelta(days=12),\n", " date + timedelta(days=15),\n", " date + timedelta(days=2),\n", " date + timedelta(days=1),\n", " date + timedelta(days=3),\n", " ],\n", " Vaccinations.vaccine_date: [\n", " date + timedelta(days=32) - timedelta(days=365),\n", " date + timedelta(days=6) - timedelta(days=365),\n", " date + timedelta(days=12) - timedelta(days=365),\n", " date + timedelta(days=15) - timedelta(days=365),\n", " date + timedelta(days=2) - timedelta(days=365),\n", " date + timedelta(days=1) - timedelta(days=365),\n", " date + timedelta(days=3) - timedelta(days=365),\n", " ],\n", " },\n", ")\n", "vaccinations.show()" ] }, { "cell_type": "markdown", "id": "2de6c45c", "metadata": {}, "source": [ "## Example using a pivot table\n", "\n", "Let's pivot this table and run `create_schema()`!" ] }, { "cell_type": "code", "execution_count": 3, "id": "f29d8015", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------+----------+----------+----------+\n", "|pet_id| influenza| lyme| rabies|\n", "+------+----------+----------+----------+\n", "| 1| NULL|2023-10-17|2023-11-03|\n", "| 2|2023-10-03| NULL|2023-10-08|\n", "| 3|2023-10-05|2023-10-04|2023-10-14|\n", "+------+----------+----------+----------+\n", "\n" ] } ], "source": [ "pivot = (\n", " vaccinations.groupby(Vaccinations.pet_id)\n", " .pivot(Vaccinations.vaccine_name.str)\n", " .agg(first(Vaccinations.next_due_date))\n", ")\n", "\n", "pivot, Pivot = create_schema(pivot)\n", "pivot.show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "10524400", "metadata": {}, "source": [ "We can use the resulting `schema` as usual:" ] }, { "cell_type": "code", "execution_count": 4, "id": "974459f3", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\n", "from pyspark.sql.types import DateType, LongType\n", "\n", "from typedspark import Column, Schema\n", "\n", "\n", "class DynamicallyLoadedSchema(Schema):\n", " pet_id: Column[LongType]\n", " influenza: Column[DateType]\n", " lyme: Column[DateType]\n", " rabies: Column[DateType]" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "Pivot" ] }, { "cell_type": "code", "execution_count": 5, "id": "b6c659c6", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------+----------+----------+----------+\n", "|pet_id| influenza| lyme| rabies|\n", "+------+----------+----------+----------+\n", "| 2|2023-10-03| NULL|2023-10-08|\n", "| 3|2023-10-05|2023-10-04|2023-10-14|\n", "+------+----------+----------+----------+\n", "\n" ] } ], "source": [ "pivot.filter(Pivot.influenza.isNotNull()).show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "18ea295c", "metadata": {}, "source": [ "## Monkeypatch\n", "\n", "We also support doing the above directly in a function-chain using a monkeypatch. " ] }, { "cell_type": "code", "execution_count": 6, "id": "50d02f7e", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------+----------+----------+----------+\n", "|pet_id| influenza| lyme| rabies|\n", "+------+----------+----------+----------+\n", "| 1| NULL|2023-10-17|2023-11-03|\n", "| 2|2023-10-03| NULL|2023-10-08|\n", "| 3|2023-10-05|2023-10-04|2023-10-14|\n", "+------+----------+----------+----------+\n", "\n" ] } ], "source": [ "pivot, Pivot = (\n", " vaccinations.groupby(Vaccinations.pet_id)\n", " .pivot(Vaccinations.vaccine_name.str)\n", " .agg(first(Vaccinations.next_due_date))\n", " .to_typedspark()\n", ")\n", "pivot.show()" ] }, { "cell_type": "code", "execution_count": 7, "id": "a2cbe246", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\n", "from pyspark.sql.types import DateType, LongType\n", "\n", "from typedspark import Column, Schema\n", "\n", "\n", "class DynamicallyLoadedSchema(Schema):\n", " pet_id: Column[LongType]\n", " influenza: Column[DateType]\n", " lyme: Column[DateType]\n", " rabies: Column[DateType]" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "Pivot" ] }, { "cell_type": "markdown", "id": "e6c8b2cd", "metadata": {}, "source": [ "Using the monkeypatched form comes with pros and cons. The pros:\n", "\n", "* No need for intermediate variables\n", "* No need to import `create_schema()`\n", "* Both contribute to a more straightforward workflow\n", "\n", "And cons:\n", "\n", "* The `to_typedspark()` function is glued against against the `DataFrame` class once we import anything from `typedspark`.\n", "* This often means that `to_typedspark()` will not show up during autocomplete, so you'll have to type it yourself.\n", "* For the same reason, typecheckers may raise a linting error.\n", "* And finally, it only works if you've imported something from typedspark already. Shouldn't be a major problem (you'll likely have imported `Catalogs`, for example), but it's something to be aware of.\n" ] } ], "metadata": { "kernelspec": { "display_name": "typedspark", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.9" } }, "nbformat": 4, "nbformat_minor": 5 }