diff --git a/articles/20230605_pyspark_fu/.ipynb_checkpoints/20230605_pyspark_fu-checkpoint.ipynb b/articles/20230605_pyspark_fu/.ipynb_checkpoints/20230605_pyspark_fu-checkpoint.ipynb new file mode 100644 index 0000000..902d3a7 --- /dev/null +++ b/articles/20230605_pyspark_fu/.ipynb_checkpoints/20230605_pyspark_fu-checkpoint.ipynb @@ -0,0 +1,1610 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "1a22b5d9", + "metadata": {}, + "source": [ + "# Pyspark Fu\n", + "\n", + "- [1. Initialising the Spark Session](#1-initialising-the-spark-session)\n", + "- [2. Create a simple dataframe for debugging](#2-create-a-simple-dataframe-for-debugging)\n", + "- [3. Joins](#3-joins)\n", + " - [3.1. Avoid duplicate column names](#31-avoid-duplicate-column-names)\n", + " - [3.1.2 Join using list of names](#312-join-using-list-of-names)\n", + " - [3.1.3 Dataframe aliasing is a bit weird](#313-dataframe-aliasing-is-a-bit-weird)\n", + "- [4. The Schema format](#4-the-schema-format)\n", + " - [4.1 Simple Schema Utility Class](#41-simple-schema-utility-class)\n", + " - [4.2 Schema Inference Debacles](#42-schema-inference-debacles)\n" + ] + }, + { + "cell_type": "markdown", + "id": "6e2e9929", + "metadata": {}, + "source": [ + "## 1. Initialising the Spark Session" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "938cae9a-282a-47ea-9828-0d082d94774c", + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.conf import SparkConf\n", + "from pyspark.sql import SparkSession\n", + "\n", + "CONF = {\n", + " 'spark.ui.showConsoleProgress': 'false',\n", + " 'spark.ui.dagGraph.retainedRootRDDs': '1',\n", + " 'spark.ui.retainedJobs': '1',\n", + " 'spark.ui.retainedStages': '1',\n", + " 'spark.ui.retainedTasks': '1',\n", + " 'spark.sql.ui.retainedExecutions': '1',\n", + " 'spark.worker.ui.retainedExecutors': '1',\n", + " 'spark.worker.ui.retainedDrivers': '1',\n", + " 'spark.executor.instances': '1',\n", + "}\n", + "\n", + "def spark_session() -> SparkSession:\n", + " '''\n", + " - set a bunch of spark config variables that help lighten the load\n", + " - local[1] locks the spark runtime to a single core\n", + " - silence noisy warning logs\n", + " '''\n", + " conf = SparkConf().setAll([(k,v) for k,v in CONF.items()])\n", + "\n", + " sc = SparkSession.builder.master('local[1]').config(conf=conf).getOrCreate()\n", + " sc.sparkContext.setLogLevel('ERROR')\n", + " return sc" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "5a9afc91-dafc-4e2c-98f4-ecc8e3b876ce", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Setting default log level to \"WARN\".\n", + "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n", + "23/08/30 18:40:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n" + ] + } + ], + "source": [ + "spark = spark_session()" + ] + }, + { + "cell_type": "markdown", + "id": "e7dbdeb5", + "metadata": {}, + "source": [ + "## 2. 
Create a simple dataframe for debugging\n", + "\n", + "\n", + "- The pyspark official docs don't often \"create\" the dataframe that the code examples refer to" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "d4a27d70-e893-4810-92a6-7ffa43b11c15", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+---------------------------+\n", + "|a |n |\n", + "+---+---------------------------+\n", + "|b |{a -> b} |\n", + "|c |{y -> b, z -> x} |\n", + "|d |{2 -> 3, t -> a, o -> null}|\n", + "+---+---------------------------+\n", + "\n" + ] + } + ], + "source": [ + "df = spark.createDataFrame([\n", + " {'a': 'b', 'n': {'a': 'b'}},\n", + " {'a': 'c', 'n': {'z': 'x', 'y': 'b'}},\n", + " {'a': 'd', 'n': {'o': None, 't': 'a', '2': 3}}\n", + "])\n", + "\n", + "df.show(truncate=False)" + ] + }, + { + "cell_type": "markdown", + "id": "05f0f0b4-a002-4947-b340-cb38912be8aa", + "metadata": {}, + "source": [ + "## 3. Joins\n", + "\n", + "### 3.1. Avoid duplicate column names" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "54888af5", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+---------+\n", + "| id| name|\n", + "+---+---------+\n", + "|123| pikachu|\n", + "|999| evee|\n", + "|007|charizard|\n", + "+---+---------+\n", + "\n", + "+---+-----+\n", + "| id| name|\n", + "+---+-----+\n", + "|123| ash|\n", + "|999|chloe|\n", + "|007| ash|\n", + "+---+-----+\n", + "\n" + ] + }, + { + "data": { + "text/plain": [ + "(None, None)" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Let's construct two dataframes that share a column to join on\n", + "\n", + "df1 = spark.createDataFrame([\n", + " {'id': '123', 'name': 'pikachu'},\n", + " {'id': '999', 'name': 'evee'},\n", + " {'id': '007', 'name': 'charizard'},\n", + "])\n", + "df2 = spark.createDataFrame([\n", + " {'id': '123', 'name': 'ash'},\n", + " {'id': '999', 'name': 'chloe'},\n", + " {'id': '007', 'name': 'ash'},\n", + "])\n", + "\n", + "df1.show(), df2.show()" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "6eb73a7d", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+---------+---+-----+\n", + "| id| name| id| name|\n", + "+---+---------+---+-----+\n", + "|007|charizard|007| ash|\n", + "|123| pikachu|123| ash|\n", + "|999| evee|999|chloe|\n", + "+---+---------+---+-----+\n", + "\n" + ] + } + ], + "source": [ + "# Now, lets join them together into a combined pokemon-and-trainer table\n", + "joined = df1.join(\n", + " df2,\n", + " on=df1['id'] == df2['id'],\n", + " how='inner',\n", + ")\n", + "joined.show()" + ] + }, + { + "cell_type": "markdown", + "id": "af898b40", + "metadata": {}, + "source": [ + "This _seems_ fine initially, but spark blows up as soon as you try and use the 'id' column in an expression\n", + "\n", + "This example will produce the error:\n", + "\n", + "`[AMBIGUOUS_REFERENCE] Reference `id` is ambiguous, could be: [`id`, `id`].`\n", + "\n", + "This can be particularly annoying as the error will only appear when you attempt to use the columns, but will go undetected if this doesn't happen" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "5cab4eb2", + "metadata": {}, + "outputs": [], + "source": [ + "import pyspark.sql.utils\n", + "from pyspark.sql import DataFrame\n", + "from typing import List\n", + "\n", + "def try_select(df: DataFrame, cols: 
List[str]):\n", + " try:\n", + " df.select(*cols).show()\n", + "\n", + " except pyspark.sql.utils.AnalysisException as e:\n", + " print('select failed!', e)" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "34f0c2ac", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "select failed! [AMBIGUOUS_REFERENCE] Reference `id` is ambiguous, could be: [`id`, `id`].\n" + ] + } + ], + "source": [ + "try_select(joined, ['id', 'name', 'trainer'])" + ] + }, + { + "cell_type": "markdown", + "id": "012d4744", + "metadata": {}, + "source": [ + "The solution: use a different parameter for the `on` columns\n", + "\n", + "### 3.1.2 Join using list of names" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "c0bc54b2", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+---------+-----+\n", + "| id| name| name|\n", + "+---+---------+-----+\n", + "|007|charizard| ash|\n", + "|123| pikachu| ash|\n", + "|999| evee|chloe|\n", + "+---+---------+-----+\n", + "\n", + "select failed! [AMBIGUOUS_REFERENCE] Reference `name` is ambiguous, could be: [`name`, `name`].\n" + ] + } + ], + "source": [ + "joined = df1.join(\n", + " df2,\n", + " on=['id'],\n", + " how='inner',\n", + ")\n", + "joined.show()\n", + "\n", + "# Now let's try that same select again\n", + "try_select(joined, ['id', 'name', 'trainer'])" + ] + }, + { + "cell_type": "markdown", + "id": "414bf5ac", + "metadata": {}, + "source": [ + "### 3.1.3 Dataframe aliasing is a bit weird" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "8b46a846", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+---------+\n", + "| id| name|\n", + "+---+---------+\n", + "|123| pikachu|\n", + "|999| evee|\n", + "|007|charizard|\n", + "+---+---------+\n", + "\n" + ] + } + ], + "source": [ + "df1.alias('pokemon').select('*').show()" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "ccae01f4", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+---------+---+-----+\n", + "| id| name| id| name|\n", + "+---+---------+---+-----+\n", + "|007|charizard|007| ash|\n", + "|123| pikachu|123| ash|\n", + "|999| evee|999|chloe|\n", + "+---+---------+---+-----+\n", + "\n" + ] + }, + { + "data": { + "text/plain": [ + "['id', 'name', 'id', 'name']" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import pyspark.sql.functions as F\n", + "\n", + "joined = df1.alias('pokemon').join(\n", + " df2.alias('trainers'),\n", + " on=F.col('pokemon.id') == F.col('trainers.id'),\n", + " how='inner',\n", + ")\n", + "joined.show()\n", + "joined.columns" + ] + }, + { + "cell_type": "markdown", + "id": "19620ae6", + "metadata": {}, + "source": [ + "Now, our error message is much better, as it contains the dataframe aliases identifying which table the duplicate column name is from" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "0d0a82b2", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "select failed! 
[AMBIGUOUS_REFERENCE] Reference `id` is ambiguous, could be: [`pokemon`.`id`, `trainers`.`id`].\n" + ] + } + ], + "source": [ + "try_select(joined, ['id'])" + ] + }, + { + "cell_type": "markdown", + "id": "6d393943", + "metadata": {}, + "source": [ + "Confusingly, using `Dataframe.columns` does not show the aliases, but they are usable when selecting" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "3be334bf", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "['id', 'name', 'id', 'name']\n", + "+---+\n", + "| id|\n", + "+---+\n", + "|007|\n", + "|123|\n", + "|999|\n", + "+---+\n", + "\n" + ] + } + ], + "source": [ + "print(joined.columns)\n", + "\n", + "try_select(joined, ['pokemon.id'])" + ] + }, + { + "cell_type": "markdown", + "id": "ffc67166", + "metadata": {}, + "source": [ + "## 4. The Schema format" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "f6fd3f01", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+---------------------------+\n", + "|a |n |\n", + "+---+---------------------------+\n", + "|b |{a -> b} |\n", + "|c |{y -> b, z -> x} |\n", + "|d |{2 -> 3, t -> a, o -> null}|\n", + "+---+---------------------------+\n", + "\n" + ] + } + ], + "source": [ + "df = spark.createDataFrame([\n", + " {'a': 'b', 'n': {'a': 'b'}},\n", + " {'a': 'c', 'n': {'z': 'x', 'y': 'b'}},\n", + " {'a': 'd', 'n': {'o': None, 't': 'a', '2': 3}}\n", + "])\n", + "\n", + "df.show(truncate=False)" + ] + }, + { + "cell_type": "markdown", + "id": "ed1c6c0f", + "metadata": {}, + "source": [ + "Every dataframe has a schema attached, which is a nested object using `StructType` as its root" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "0602fafb", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "StructType([StructField('a', StringType(), True), StructField('n', MapType(StringType(), StringType(), True), True)])" + ] + }, + "execution_count": 14, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df.schema" + ] + }, + { + "cell_type": "markdown", + "id": "685d28e1", + "metadata": {}, + "source": [ + "This form is fine to use, but can't really stored inside a config file, or passed between systems as parameters. 
In order to do this, its possible to convert a pyspark schema `StructType` object to JSON, and back again" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "47eacbe0", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\"fields\":[{\"metadata\":{},\"name\":\"a\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"n\",\"nullable\":true,\"type\":{\"keyType\":\"string\",\"type\":\"map\",\"valueContainsNull\":true,\"valueType\":\"string\"}}],\"type\":\"struct\"}\n" + ] + } + ], + "source": [ + "# Convert schema to JSON string\n", + "json_string = df.schema.json()\n", + "print(json_string)" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "a364e739", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "StructType([StructField('a', StringType(), True), StructField('n', MapType(StringType(), StringType(), True), True)])" + ] + }, + "execution_count": 16, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Convert JSON string to schema\n", + "from pyspark.sql import types as T\n", + "import json\n", + "\n", + "T.StructType.fromJson(json.loads(json_string))" + ] + }, + { + "cell_type": "markdown", + "id": "4baa1603", + "metadata": {}, + "source": [ + "> _You may notice that although the method is called `fromJson`, the method actually accepts a `dictionary`, not a JSON string!_" + ] + }, + { + "cell_type": "markdown", + "id": "8e7abd85", + "metadata": {}, + "source": [ + "### 4.1 Simple Schema Utility Class\n", + "\n", + "Bundling a few helper methods into a utility class is very handy when dealing with pyspark schemas!\n", + "\n", + "Generally speaking, you'll often need to\n", + "\n", + "- Grab a schema from an existing dataframe, to inspect or store\n", + "- Convert that schema to JSON, so that it can be stored or passed around easily\n", + "- Create a pyspark schema from input JSON\n", + "- Print the schema in a human-readable form (multi-line JSON works best)" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "50006bd4", + "metadata": {}, + "outputs": [], + "source": [ + "from dataclasses import dataclass\n", + "import json\n", + "\n", + "from pyspark.sql import DataFrame\n", + "\n", + "@dataclass\n", + "class Schema:\n", + " schema: T.StructType\n", + " \n", + " def from_json(j): return Schema(T.StructType.fromJson(json.loads(j)))\n", + " def from_df(df): return Schema(df.schema)\n", + "\n", + " def as_json(self): return self.schema.json()\n", + " def as_dict(self): return json.loads(self.as_json())\n", + " def show(self): print(json.dumps(self.as_dict(), indent=2))" + ] + }, + { + "cell_type": "markdown", + "id": "a4c6092e", + "metadata": {}, + "source": [ + "> This class is instantiated with a `df.schema`, however it's a good idea to provide some static methods like `from_df` and `from_json` so that it's flexible" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "10f2516f", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"a\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"n\",\n", + " \"nullable\": true,\n", + " \"type\": {\n", + " \"keyType\": \"string\",\n", + " \"type\": \"map\",\n", + " \"valueContainsNull\": true,\n", + " \"valueType\": \"string\"\n", + " }\n", + " }\n", + " ],\n", + " \"type\": 
\"struct\"\n", + "}\n" + ] + } + ], + "source": [ + "Schema.from_df(df).show()" + ] + }, + { + "cell_type": "markdown", + "id": "289aa5fb", + "metadata": {}, + "source": [ + "### 4.2 Schema Inference Debacles\n", + "\n", + "I've found that pyspark will infer different schemas depending on how the DataFrame is initialised, namely when\n", + "\n", + "- using `spark.read.json`\n", + "- using `spark.createDataFrame`" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "69daf2e4", + "metadata": {}, + "outputs": [], + "source": [ + "row = {'id': 123, 'key': 'yolo', 'attrs': {'a': 1}}" + ] + }, + { + "cell_type": "markdown", + "id": "76af69f1", + "metadata": {}, + "source": [ + "#### 4.2.1 Inferring schema when reading JSON" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "id": "495ce8a7", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"attrs\",\n", + " \"nullable\": true,\n", + " \"type\": {\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"a\",\n", + " \"nullable\": true,\n", + " \"type\": \"long\"\n", + " }\n", + " ],\n", + " \"type\": \"struct\"\n", + " }\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"id\",\n", + " \"nullable\": true,\n", + " \"type\": \"long\"\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"key\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " }\n", + " ],\n", + " \"type\": \"struct\"\n", + "}\n" + ] + } + ], + "source": [ + "import json\n", + "\n", + "# write the row to a file\n", + "json.dump(row, open('not_there.json', 'w'))\n", + "\n", + "Schema(spark.read.json('not_there.json').schema).show()" + ] + }, + { + "cell_type": "markdown", + "id": "2593458e", + "metadata": {}, + "source": [ + "Here, `attrs` has been detected as a `struct` object, which contains a field `a` of type `long`" + ] + }, + { + "cell_type": "markdown", + "id": "7577e2d0", + "metadata": {}, + "source": [ + "#### 4.2.2 Inferring schema when using createDataFrame\n", + "\n", + "Here, `attrs` has been detected as a `map` object, which contains string keys, with a value of type `long`" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "id": "ad644671", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"attrs\",\n", + " \"nullable\": true,\n", + " \"type\": {\n", + " \"keyType\": \"string\",\n", + " \"type\": \"map\",\n", + " \"valueContainsNull\": true,\n", + " \"valueType\": \"long\"\n", + " }\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"id\",\n", + " \"nullable\": true,\n", + " \"type\": \"long\"\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"key\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " }\n", + " ],\n", + " \"type\": \"struct\"\n", + "}\n" + ] + } + ], + "source": [ + "Schema.from_df(\n", + " spark.createDataFrame([row])\n", + ").show()" + ] + }, + { + "cell_type": "markdown", + "id": "e30ac195", + "metadata": {}, + "source": [ + "#### 4.2.3 Bonus\n", + "\n", + "To carry the madness forward, it's possible to force either `spark.read.json` or `spark.createDataFrame` to use the schema produced by the other, seemingly without consequences (that I've found so far!)" + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "id": "a37183b0", + "metadata": 
{}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"attrs\",\n", + " \"nullable\": true,\n", + " \"type\": {\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"a\",\n", + " \"nullable\": true,\n", + " \"type\": \"long\"\n", + " }\n", + " ],\n", + " \"type\": \"struct\"\n", + " }\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"id\",\n", + " \"nullable\": true,\n", + " \"type\": \"long\"\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"key\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " }\n", + " ],\n", + " \"type\": \"struct\"\n", + "}\n" + ] + } + ], + "source": [ + "# Use the inferred schema from reading the file with createDataFrame\n", + "\n", + "Schema.from_df(\n", + " spark.createDataFrame([row], schema=spark.read.json('not_there.json').schema)\n", + ").show()" + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "id": "5e9dec7a", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"attrs\",\n", + " \"nullable\": true,\n", + " \"type\": {\n", + " \"keyType\": \"string\",\n", + " \"type\": \"map\",\n", + " \"valueContainsNull\": true,\n", + " \"valueType\": \"long\"\n", + " }\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"id\",\n", + " \"nullable\": true,\n", + " \"type\": \"long\"\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"key\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " }\n", + " ],\n", + " \"type\": \"struct\"\n", + "}\n" + ] + } + ], + "source": [ + "# Use the inferred schema from createDataFrame when reading the file\n", + "\n", + "Schema.from_df(\n", + " spark.read.json('not_there.json', schema=spark.createDataFrame([row]).schema)\n", + ").show()" + ] + }, + { + "cell_type": "markdown", + "id": "7e8bdf9d-87c3-4347-9712-e49a22fea92c", + "metadata": {}, + "source": [ + "## 5. Default empty DataFrames\n", + "\n", + "Sometimes it's handy to be able to instantiate an \"empty\" dataframe in the case that a file/some source data is missing" + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "id": "f70151d6-b305-45ff-a680-81f3a7d0c3a7", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[PATH_NOT_FOUND] Path does not exist: file:/Users/tomm/dev/tmck-code.github.io/articles/20230605_pyspark_fu/optional_source.json.\n" + ] + } + ], + "source": [ + "# This will result in an AnalysisException complaining that \n", + "# the file did not exist\n", + "from pyspark.errors.exceptions.captured import AnalysisException\n", + "\n", + "try:\n", + " spark.read.json('optional_source.json')\n", + "except AnalysisException as e:\n", + " print(e)" + ] + }, + { + "cell_type": "markdown", + "id": "e673e8c9-9bf9-47a1-9926-d3d94fdfcb57", + "metadata": {}, + "source": [ + "We can mitigate this by catching the exception, and creating a dataframe that matches the schema, but has 0 rows.\n", + "\n", + "This ensures that any queries on the dataframe will still work, as all the columns will exist with the correct type.\n", + "\n", + "_**This requires that we know the schema of the optional file**_\n", + "\n", + "\n", + "The easiest way to create a schema is usually to create a single-line file containing a valid line that matches the expected schema. 
Then, read that file into a dataframe and capture the schema for re-use (read: copy/paste)" + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "id": "6d76f75b-ebe8-47d4-a14b-67ade411e5ef", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"attrs\",\n", + " \"nullable\": true,\n", + " \"type\": {\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"a\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " }\n", + " ],\n", + " \"type\": \"struct\"\n", + " }\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"id\",\n", + " \"nullable\": true,\n", + " \"type\": \"long\"\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"key\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " }\n", + " ],\n", + " \"type\": \"struct\"\n", + "}\n" + ] + } + ], + "source": [ + "import json\n", + "\n", + "with open('not_there.json', 'w') as ostream:\n", + " ostream.write(json.dumps({\n", + " 'id': 123, 'key': 'yolo', 'attrs': {'a': 'b'}\n", + " }))\n", + "\n", + "Schema(spark.read.json('not_there.json').schema).show()" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "id": "d523bafc", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"attrs\",\n", + " \"nullable\": true,\n", + " \"type\": {\n", + " \"keyType\": \"string\",\n", + " \"type\": \"map\",\n", + " \"valueContainsNull\": true,\n", + " \"valueType\": \"string\"\n", + " }\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"id\",\n", + " \"nullable\": true,\n", + " \"type\": \"long\"\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"key\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " }\n", + " ],\n", + " \"type\": \"struct\"\n", + "}\n" + ] + } + ], + "source": [ + "Schema.from_df(spark.createDataFrame([\n", + " {'id': 123, 'key': 'yolo', 'attrs': {'a': 'b'}}\n", + "])).show()" + ] + }, + { + "cell_type": "markdown", + "id": "bf55be1d-bf2b-4368-87c0-2a28974dc262", + "metadata": {}, + "source": [ + "I've never found a way (using StringIO or similar) to achieve this without writing a file - if you find a way then let me know!\n", + "\n", + "Let's bundle this up into a method that tidies up after itself:" + ] + }, + { + "cell_type": "code", + "execution_count": 27, + "id": "99aca1da-147e-4ab4-ada4-c2642c066830", + "metadata": {}, + "outputs": [], + "source": [ + "import json\n", + "import os\n", + "\n", + "def guess_schema(row: dict, tmp_fpath: str = 'tmp.json') -> dict:\n", + " with open(tmp_fpath, 'w') as ostream:\n", + " ostream.write(json.dumps({\n", + " 'id': 123, 'key': 'yolo', 'attrs': {'a': 'b'}\n", + " })) \n", + " schema = json.loads(spark.read.json('not_there.json').schema.json())\n", + " os.remove(tmp_fpath)\n", + "\n", + " return schema" + ] + }, + { + "cell_type": "code", + "execution_count": 28, + "id": "fb184808-7c85-4506-ab9d-f7f2849eee70", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"attrs\",\n", + " \"nullable\": true,\n", + " \"type\": {\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"a\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " }\n", + " ],\n", + 
" \"type\": \"struct\"\n", + " }\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"id\",\n", + " \"nullable\": true,\n", + " \"type\": \"long\"\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"key\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " }\n", + " ],\n", + " \"type\": \"struct\"\n", + "}\n" + ] + } + ], + "source": [ + "schema = guess_schema(\n", + " {'id': 123, 'key': 'yolo', 'attrs': {'a': 'b'}}\n", + ")\n", + "print(json.dumps(schema, indent=2))" + ] + }, + { + "cell_type": "markdown", + "id": "45033389-3134-42cb-aafd-2a6b38161cea", + "metadata": {}, + "source": [ + "As you can see from this quick demo, it isn't quick to craft pyspark schemas from hand! In my experience it's prone to much human error and frustrating debugging, especially as schemas can grow large very quickly!\n", + "\n", + "Now, we can tie this into the method to safely load/create a dataframe" + ] + }, + { + "cell_type": "code", + "execution_count": 29, + "id": "05059828-375c-4a63-b8b4-766bcbf6568d", + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.errors.exceptions.captured import AnalysisException\n", + "import pyspark.sql.types as T\n", + "\n", + "def safe_load(fpath: str, schema: dict):\n", + " try:\n", + " return spark.read.json(fpath)\n", + " except AnalysisException as e:\n", + " print(e)\n", + " return spark.createDataFrame([], schema=T.StructType.fromJson(schema))" + ] + }, + { + "cell_type": "markdown", + "id": "838b9778-c361-49d0-a111-b04858069fc6", + "metadata": {}, + "source": [ + "> Side note: the method to convert a dict to a StructType (schema) is confusingly named `fromJson` despite the fact that the method accepts a dict, not a JSON string" + ] + }, + { + "cell_type": "code", + "execution_count": 30, + "id": "1a81a746-f9b6-4ed7-b34f-8e0c0375b511", + "metadata": {}, + "outputs": [], + "source": [ + "df = safe_load('not_there.json', schema)" + ] + }, + { + "cell_type": "code", + "execution_count": 31, + "id": "1649aefc-7d18-48fe-9022-cbd0296bda5e", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-----+---+----+\n", + "|attrs| id| key|\n", + "+-----+---+----+\n", + "| {b}|123|yolo|\n", + "+-----+---+----+\n", + "\n" + ] + } + ], + "source": [ + "df.show()" + ] + }, + { + "cell_type": "markdown", + "id": "ce755e33-0f8e-4fd8-8e52-d6650fd23f78", + "metadata": {}, + "source": [ + "After the initial generation, the schema can be stored in a file and loaded or just defined directly in the code, rather than \"guessed\" every time" + ] + }, + { + "cell_type": "markdown", + "id": "94b81a00", + "metadata": {}, + "source": [ + "## 6. Generating JSON output with dynamic keys" + ] + }, + { + "cell_type": "markdown", + "id": "707a4444", + "metadata": {}, + "source": [ + "To demonstrate the problem, we will\n", + "1. read in a JSON file matching the dataframe above, with a few different nested types (e.g. strings, numbers, and null)\n", + "2. remove any key/values pairs with null values\n", + "3. 
write a JSON file where all nested types match the input" + ] + }, + { + "cell_type": "code", + "execution_count": 32, + "id": "ac71e1c5", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-----+---+----+\n", + "|attrs|id |key |\n", + "+-----+---+----+\n", + "|{b} |123|yolo|\n", + "+-----+---+----+\n", + "\n" + ] + } + ], + "source": [ + "example_df = spark.createDataFrame([\n", + " {'a': 'b', 'n': {'a': 'b'}},\n", + " {'a': 'c', 'n': {'z': 'x', 'y': 'b'}},\n", + " {'a': 'd', 'n': {'o': None, 't': 'a', '2': 3}}\n", + "])\n", + "\n", + "df.show(truncate=False)" + ] + }, + { + "cell_type": "code", + "execution_count": 33, + "id": "629ec980", + "metadata": {}, + "outputs": [], + "source": [ + "data = [\n", + " {'a': 'b', 'n': {'a': True, 'z': ['1', 7, True]}},\n", + " {'a': 'c', 'n': {'a': 'b', 'z': 'x', 't': None}},\n", + " {'a': 'd', 'n': {'a': 3, 'z': None}},\n", + "]\n", + "print('\\n'.join(json.dumps(r) for r in data), file=open('tmp.json', 'w'))" + ] + }, + { + "cell_type": "code", + "execution_count": 34, + "id": "f2e5e5de", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\"a\": \"b\", \"n\": {\"a\": true, \"z\": [\"1\", 7, true]}}\n", + "{\"a\": \"c\", \"n\": {\"a\": \"b\", \"z\": \"x\", \"t\": null}}\n", + "{\"a\": \"d\", \"n\": {\"a\": 3, \"z\": null}}\n" + ] + } + ], + "source": [ + "cat tmp.json" + ] + }, + { + "cell_type": "markdown", + "id": "bc13635c", + "metadata": {}, + "source": [ + "Now, to read the file with spark" + ] + }, + { + "cell_type": "code", + "execution_count": 35, + "id": "d28ed31f", + "metadata": { + "scrolled": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+--------------------------+\n", + "|a |n |\n", + "+---+--------------------------+\n", + "|b |{true, null, [\"1\",7,true]}|\n", + "|c |{b, null, x} |\n", + "|d |{3, null, null} |\n", + "+---+--------------------------+\n", + "\n" + ] + } + ], + "source": [ + "df = spark.read.json('tmp.json')\n", + "df.show(truncate=False)" + ] + }, + { + "cell_type": "markdown", + "id": "06bd8002", + "metadata": {}, + "source": [ + "This looks a little weird! 
This is due to the fact that spark.createDataFrame and spark.read.json can be given an identical input table and infer different schemas" + ] + }, + { + "cell_type": "code", + "execution_count": 36, + "id": "bfa038b5", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"a\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"n\",\n", + " \"nullable\": true,\n", + " \"type\": {\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"a\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"t\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"z\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " }\n", + " ],\n", + " \"type\": \"struct\"\n", + " }\n", + " }\n", + " ],\n", + " \"type\": \"struct\"\n", + "}\n" + ] + } + ], + "source": [ + "Schema(df.schema).show()" + ] + }, + { + "cell_type": "markdown", + "id": "fcfa1d9b", + "metadata": {}, + "source": [ + "You might notice that spark has inferred all " + ] + }, + { + "cell_type": "code", + "execution_count": 37, + "id": "e48f5b25", + "metadata": { + "scrolled": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+------------------------------+\n", + "|a |n |\n", + "+---+------------------------------+\n", + "|b |{a -> true, z -> [\"1\",7,true]}|\n", + "|c |{a -> b, z -> x, t -> null} |\n", + "|d |{a -> 3, z -> null} |\n", + "+---+------------------------------+\n", + "\n" + ] + } + ], + "source": [ + "df = spark.read.json('tmp.json', schema=example_df.schema)\n", + "df.show(truncate=False)" + ] + }, + { + "cell_type": "code", + "execution_count": 38, + "id": "501cd405", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+------------------------------+\n", + "|a |n |\n", + "+---+------------------------------+\n", + "|b |{a -> true, z -> [\"1\",7,true]}|\n", + "|c |{a -> b, z -> x} |\n", + "|d |{a -> 3} |\n", + "+---+------------------------------+\n", + "\n" + ] + } + ], + "source": [ + "import pyspark.sql.functions as F\n", + "\n", + "df_filtered = df.select(\n", + " F.col('a'),\n", + " F.map_filter(F.col('n'), lambda k,v: v.isNotNull()).alias('n')\n", + ")\n", + "df_filtered.show(truncate=False)" + ] + }, + { + "cell_type": "markdown", + "id": "f8dcd270", + "metadata": {}, + "source": [ + "The performance impact of using a python UDF can be mitigated by using a pure SQL statement" + ] + }, + { + "cell_type": "code", + "execution_count": 39, + "id": "24efbc9c", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+------------------------------+\n", + "|a |custom |\n", + "+---+------------------------------+\n", + "|b |{a -> true, z -> [\"1\",7,true]}|\n", + "|c |{a -> b, z -> x} |\n", + "|d |{a -> 3} |\n", + "+---+------------------------------+\n", + "\n" + ] + } + ], + "source": [ + "df.createOrReplaceTempView(\"df\")\n", + "df_filtered = spark.sql(\n", + " \"select a, map_filter(n, (k,v) -> v is not null) custom from df\"\n", + ")\n", + "df_filtered.show(truncate=False)" + ] + }, + { + "cell_type": "code", + "execution_count": 40, + "id": "049d7363", + "metadata": {}, + "outputs": [], + "source": [ + 
"df_filtered.write.format('json').mode('overwrite').save('tmp_out.json')" + ] + }, + { + "cell_type": "code", + "execution_count": 41, + "id": "665c9b31", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\"a\":\"b\",\"custom\":{\"a\":\"true\",\"z\":\"[\\\"1\\\",7,true]\"}}\n", + "{\"a\":\"c\",\"custom\":{\"a\":\"b\",\"z\":\"x\"}}\n", + "{\"a\":\"d\",\"custom\":{\"a\":\"3\"}}\n" + ] + } + ], + "source": [ + "cat tmp_out.json/part*" + ] + }, + { + "cell_type": "markdown", + "id": "9b8d80d5", + "metadata": {}, + "source": [ + "As we can see here, this is very close! All of the types have been _somewhat_ preserved:\n", + "\n", + "- All strings are still strings\n", + "- All other types are JSON-encoded strings" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "01b26da2", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.4" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/articles/20230605_pyspark_fu/20230605_pyspark_fu.ipynb b/articles/20230605_pyspark_fu/20230605_pyspark_fu.ipynb index 902d3a7..13e729d 100644 --- a/articles/20230605_pyspark_fu/20230605_pyspark_fu.ipynb +++ b/articles/20230605_pyspark_fu/20230605_pyspark_fu.ipynb @@ -73,7 +73,7 @@ "text": [ "Setting default log level to \"WARN\".\n", "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n", - "23/08/30 18:40:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n" + "23/11/22 11:24:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable\n" ] } ], @@ -1270,7 +1270,7 @@ }, { "cell_type": "code", - "execution_count": 32, + "execution_count": 147, "id": "ac71e1c5", "metadata": {}, "outputs": [ @@ -1278,43 +1278,45 @@ "name": "stdout", "output_type": "stream", "text": [ - "+-----+---+----+\n", - "|attrs|id |key |\n", - "+-----+---+----+\n", - "|{b} |123|yolo|\n", - "+-----+---+----+\n", + "+---+---------------------------+\n", + "|a |custom |\n", + "+---+---------------------------+\n", + "|b |{a -> b} |\n", + "|c |{y -> b, z -> x} |\n", + "|d |{2 -> 3, t -> a, o -> null}|\n", + "+---+---------------------------+\n", "\n" ] } ], "source": [ "example_df = spark.createDataFrame([\n", - " {'a': 'b', 'n': {'a': 'b'}},\n", - " {'a': 'c', 'n': {'z': 'x', 'y': 'b'}},\n", - " {'a': 'd', 'n': {'o': None, 't': 'a', '2': 3}}\n", + " {'a': 'b', 'custom': {'a': 'b'}},\n", + " {'a': 'c', 'custom': {'z': 'x', 'y': 'b'}},\n", + " {'a': 'd', 'custom': {'o': None, 't': 'a', '2': 3}}\n", "])\n", "\n", - "df.show(truncate=False)" + "example_df.show(truncate=False)" ] }, { "cell_type": "code", - "execution_count": 33, + "execution_count": 148, "id": "629ec980", "metadata": {}, "outputs": [], "source": [ "data = [\n", - " {'a': 'b', 'n': {'a': True, 'z': ['1', 7, True]}},\n", - " {'a': 'c', 'n': {'a': 'b', 'z': 'x', 't': None}},\n", - " {'a': 'd', 'n': {'a': 3, 'z': None}},\n", + " {'a': 'b', 'custom': {'a': True, 'z': ['1', 7, True, [1, 2]]}},\n", + " {'a': 'c', 'custom': {'a': 'b', 'z': 'x', 't': None}},\n", + " {'a': 'd', 'custom': {'a': 3, 'z': [True]}},\n", "]\n", "print('\\n'.join(json.dumps(r) for r in data), file=open('tmp.json', 'w'))" ] }, { "cell_type": "code", - "execution_count": 34, + "execution_count": 149, "id": "f2e5e5de", "metadata": {}, "outputs": [ @@ -1322,9 +1324,9 @@ "name": "stdout", "output_type": "stream", "text": [ - "{\"a\": \"b\", \"n\": {\"a\": true, \"z\": [\"1\", 7, true]}}\n", - "{\"a\": \"c\", \"n\": {\"a\": \"b\", \"z\": \"x\", \"t\": null}}\n", - "{\"a\": \"d\", \"n\": {\"a\": 3, \"z\": null}}\n" + "{\"a\": \"b\", \"custom\": {\"a\": true, \"z\": [\"1\", 7, true, [1, 2]]}}\n", + "{\"a\": \"c\", \"custom\": {\"a\": \"b\", \"z\": \"x\", \"t\": null}}\n", + "{\"a\": \"d\", \"custom\": {\"a\": 3, \"z\": [true]}}\n" ] } ], @@ -1342,7 +1344,7 @@ }, { "cell_type": "code", - "execution_count": 35, + "execution_count": 151, "id": "d28ed31f", "metadata": { "scrolled": true @@ -1352,13 +1354,13 @@ "name": "stdout", "output_type": "stream", "text": [ - "+---+--------------------------+\n", - "|a |n |\n", - "+---+--------------------------+\n", - "|b |{true, null, [\"1\",7,true]}|\n", - "|c |{b, null, x} |\n", - "|d |{3, null, null} |\n", - "+---+--------------------------+\n", + "+---+--------------------------------+\n", + "|a |custom |\n", + "+---+--------------------------------+\n", + "|b |{true, null, [\"1\",7,true,[1,2]]}|\n", + "|c |{b, null, x} |\n", + "|d |{3, null, [true]} |\n", + "+---+--------------------------------+\n", "\n" ] } @@ -1378,7 +1380,7 @@ }, { "cell_type": "code", - "execution_count": 36, + "execution_count": 152, "id": "bfa038b5", "metadata": {}, "outputs": [ @@ -1396,7 +1398,7 @@ " },\n", " {\n", " \"metadata\": {},\n", - " \"name\": \"n\",\n", + " \"name\": \"custom\",\n", " \"nullable\": true,\n", " \"type\": {\n", " \"fields\": [\n", @@ -1432,6 +1434,14 @@ "Schema(df.schema).show()" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "8864f513-7b56-4367-bfd7-3d542a85f712", + "metadata": {}, + "outputs": [], + "source": [] + }, { 
"cell_type": "markdown", "id": "fcfa1d9b", @@ -1442,7 +1452,7 @@ }, { "cell_type": "code", - "execution_count": 37, + "execution_count": 153, "id": "e48f5b25", "metadata": { "scrolled": true @@ -1452,13 +1462,13 @@ "name": "stdout", "output_type": "stream", "text": [ - "+---+------------------------------+\n", - "|a |n |\n", - "+---+------------------------------+\n", - "|b |{a -> true, z -> [\"1\",7,true]}|\n", - "|c |{a -> b, z -> x, t -> null} |\n", - "|d |{a -> 3, z -> null} |\n", - "+---+------------------------------+\n", + "+---+------------------------------------+\n", + "|a |custom |\n", + "+---+------------------------------------+\n", + "|b |{a -> true, z -> [\"1\",7,true,[1,2]]}|\n", + "|c |{a -> b, z -> x, t -> null} |\n", + "|d |{a -> 3, z -> [true]} |\n", + "+---+------------------------------------+\n", "\n" ] } @@ -1470,7 +1480,7 @@ }, { "cell_type": "code", - "execution_count": 38, + "execution_count": 137, "id": "501cd405", "metadata": {}, "outputs": [ @@ -1478,13 +1488,21 @@ "name": "stdout", "output_type": "stream", "text": [ - "+---+------------------------------+\n", - "|a |n |\n", - "+---+------------------------------+\n", - "|b |{a -> true, z -> [\"1\",7,true]}|\n", - "|c |{a -> b, z -> x} |\n", - "|d |{a -> 3} |\n", - "+---+------------------------------+\n", + "+---+------------------------------------+\n", + "|a |custom |\n", + "+---+------------------------------------+\n", + "|b |{a -> true, z -> [\"1\",7,true,[1,2]]}|\n", + "|c |{a -> b, z -> x, t -> null} |\n", + "|d |{a -> 3, z -> [true]} |\n", + "+---+------------------------------------+\n", + "\n", + "+---+------------------------------------+\n", + "|a |custom |\n", + "+---+------------------------------------+\n", + "|b |{a -> true, z -> [\"1\",7,true,[1,2]]}|\n", + "|c |{a -> b, z -> x} |\n", + "|d |{a -> 3, z -> [true]} |\n", + "+---+------------------------------------+\n", "\n" ] } @@ -1492,9 +1510,10 @@ "source": [ "import pyspark.sql.functions as F\n", "\n", + "df.show(truncate=False)\n", "df_filtered = df.select(\n", " F.col('a'),\n", - " F.map_filter(F.col('n'), lambda k,v: v.isNotNull()).alias('n')\n", + " F.map_filter(F.col('custom'), lambda k,v: v.isNotNull()).alias('custom')\n", ")\n", "df_filtered.show(truncate=False)" ] @@ -1509,7 +1528,7 @@ }, { "cell_type": "code", - "execution_count": 39, + "execution_count": 138, "id": "24efbc9c", "metadata": {}, "outputs": [ @@ -1517,13 +1536,13 @@ "name": "stdout", "output_type": "stream", "text": [ - "+---+------------------------------+\n", - "|a |custom |\n", - "+---+------------------------------+\n", - "|b |{a -> true, z -> [\"1\",7,true]}|\n", - "|c |{a -> b, z -> x} |\n", - "|d |{a -> 3} |\n", - "+---+------------------------------+\n", + "+---+------------------------------------+\n", + "|a |custom |\n", + "+---+------------------------------------+\n", + "|b |{a -> true, z -> [\"1\",7,true,[1,2]]}|\n", + "|c |{a -> b, z -> x} |\n", + "|d |{a -> 3, z -> [true]} |\n", + "+---+------------------------------------+\n", "\n" ] } @@ -1531,24 +1550,40 @@ "source": [ "df.createOrReplaceTempView(\"df\")\n", "df_filtered = spark.sql(\n", - " \"select a, map_filter(n, (k,v) -> v is not null) custom from df\"\n", + " \"select a, map_filter(custom, (k,v) -> v is not null) custom from df\"\n", ")\n", "df_filtered.show(truncate=False)" ] }, { "cell_type": "code", - "execution_count": 40, + "execution_count": 139, "id": "049d7363", "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + 
"{\"a\":\"b\",\"custom\":{\"a\":\"true\",\"z\":\"[\\\"1\\\",7,true,[1,2]]\"}}\n", + "{\"a\":\"c\",\"custom\":{\"a\":\"b\",\"z\":\"x\"}}\n", + "{\"a\":\"d\",\"custom\":{\"a\":\"3\",\"z\":\"true\"}}\n", + "{\"a\":\"b\",\"custom\":{\"a\":\"true\",\"z\":\"[\\\"1\\\",7,true,[1,2]]\"}}\n", + "{\"a\":\"c\",\"custom\":{\"a\":\"b\",\"z\":\"x\",\"t\":null}}\n", + "{\"a\":\"d\",\"custom\":{\"a\":\"3\",\"z\":\"[true]\"}}\n" + ] + } + ], "source": [ + "! cat tmp_out.json/part*\n", + "df.write.format('json').mode('overwrite').save('tmp_out.json')\n", + "! cat tmp_out.json/part*\n", "df_filtered.write.format('json').mode('overwrite').save('tmp_out.json')" ] }, { "cell_type": "code", - "execution_count": 41, + "execution_count": 140, "id": "665c9b31", "metadata": {}, "outputs": [ @@ -1556,9 +1591,9 @@ "name": "stdout", "output_type": "stream", "text": [ - "{\"a\":\"b\",\"custom\":{\"a\":\"true\",\"z\":\"[\\\"1\\\",7,true]\"}}\n", + "{\"a\":\"b\",\"custom\":{\"a\":\"true\",\"z\":\"[\\\"1\\\",7,true,[1,2]]\"}}\n", "{\"a\":\"c\",\"custom\":{\"a\":\"b\",\"z\":\"x\"}}\n", - "{\"a\":\"d\",\"custom\":{\"a\":\"3\"}}\n" + "{\"a\":\"d\",\"custom\":{\"a\":\"3\",\"z\":\"[true]\"}}\n" ] } ], @@ -1579,9 +1614,90 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 143, "id": "01b26da2", "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+---------------------------------+\n", + "|a |custom |\n", + "+---+---------------------------------+\n", + "|b |{\"a\":true,\"z\":[\"1\",7,true,[1,2]]}|\n", + "|c |{\"a\":\"b\",\"z\":\"x\",\"t\":null} |\n", + "|d |{\"a\":3,\"z\":[true]} |\n", + "+---+---------------------------------+\n", + "\n", + "StructType([StructField('a', StringType(), True), StructField('custom', StringType(), True)])\n" + ] + }, + { + "data": { + "text/plain": [ + "[Row(from_json(custom)=Row(z=['1', '7', 'true', '[1,2]'])),\n", + " Row(from_json(custom)=Row(z=None)),\n", + " Row(from_json(custom)=Row(z=['true']))]" + ] + }, + "execution_count": 143, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from pyspark.sql import functions as F\n", + "from pyspark.sql import types as T\n", + "df = spark.read.json('tmp.json', schema=T.StructType(\n", + " [\n", + " T.StructField(\n", + " name='a', dataType = T.StringType()\n", + " ),\n", + " T.StructField(\n", + " name='custom',\n", + " dataType=T.StringType()\n", + " )\n", + " ]\n", + "))\n", + "df.show(truncate=False)\n", + "print(df.schema)\n", + "df.select(\n", + " F.from_json(\n", + " F.col('custom'),\n", + " schema=T.StructType(\n", + " [\n", + " T.StructField(\n", + " name='z',\n", + " dataType=T.ArrayType(T.BooleanType())\n", + " )\n", + " ]\n", + " )\n", + " )\n", + ").collect()\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fc822e9d-65b0-477d-8f1a-016bbcaf374f", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "86583c85-dce3-428e-9641-468346d9ed21", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "55ec7c51-98c1-4874-ab21-ff73f580720a", + "metadata": {}, "outputs": [], "source": [] } diff --git a/articles/20230605_pyspark_fu/not_there.json b/articles/20230605_pyspark_fu/not_there.json new file mode 100644 index 0000000..135ba47 --- /dev/null +++ b/articles/20230605_pyspark_fu/not_there.json @@ -0,0 +1 @@ +{"id": 123, "key": "yolo", "attrs": {"a": "b"}} \ No newline at end of file diff --git 
a/articles/20230605_pyspark_fu/tmp.json b/articles/20230605_pyspark_fu/tmp.json new file mode 100644 index 0000000..f999fd5 --- /dev/null +++ b/articles/20230605_pyspark_fu/tmp.json @@ -0,0 +1,3 @@ +{"a": "b", "custom": {"a": true, "z": ["1", 7, true, [1, 2]]}} +{"a": "c", "custom": {"a": "b", "z": "x", "t": null}} +{"a": "d", "custom": {"a": 3, "z": [true]}} diff --git a/articles/20230605_pyspark_fu/tmp_out.json/part-00000-39f4fd2b-6eae-4445-9d8e-03f27cf106e9-c000.json b/articles/20230605_pyspark_fu/tmp_out.json/part-00000-39f4fd2b-6eae-4445-9d8e-03f27cf106e9-c000.json new file mode 100644 index 0000000..2d557ff --- /dev/null +++ b/articles/20230605_pyspark_fu/tmp_out.json/part-00000-39f4fd2b-6eae-4445-9d8e-03f27cf106e9-c000.json @@ -0,0 +1,3 @@ +{"a":"b","custom":{"a":"true","z":"[\"1\",7,true,[1,2]]"}} +{"a":"c","custom":{"a":"b","z":"x"}} +{"a":"d","custom":{"a":"3","z":"[true]"}}
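One possible answer to the earlier note about not having found a way to infer a schema without writing a file: `spark.read.json` also accepts an RDD of JSON strings, so the schema of an in-memory row can be inferred directly, and the result can be stored as JSON and loaded back with `StructType.fromJson` rather than re-guessed every run. A minimal sketch, assuming the same local Spark session and example row used above (`schema.json` is just an illustrative filename):

```python
import json

from pyspark.sql import SparkSession
from pyspark.sql import types as T

spark = SparkSession.builder.master('local[1]').getOrCreate()

row = {'id': 123, 'key': 'yolo', 'attrs': {'a': 'b'}}

# spark.read.json accepts an RDD of JSON strings as well as file paths,
# so the schema can be inferred without a temporary file on disk
inferred = spark.read.json(spark.sparkContext.parallelize([json.dumps(row)])).schema

# store the schema once, then load it instead of re-inferring it every run
with open('schema.json', 'w') as ostream:
    json.dump(json.loads(inferred.json()), ostream, indent=2)

with open('schema.json') as istream:
    stored = T.StructType.fromJson(json.load(istream))

# the stored schema can back the "empty dataframe" fallback from section 5
empty_df = spark.createDataFrame([], schema=stored)
empty_df.printSchema()
```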