diff --git a/articles/20230605_pyspark_fu/.ipynb_checkpoints/20230605_pyspark_fu-checkpoint.ipynb b/articles/20230605_pyspark_fu/.ipynb_checkpoints/20230605_pyspark_fu-checkpoint.ipynb
deleted file mode 100644
index 748c542..0000000
--- a/articles/20230605_pyspark_fu/.ipynb_checkpoints/20230605_pyspark_fu-checkpoint.ipynb
+++ /dev/null
@@ -1,119 +0,0 @@
-{
- "cells": [
-  {
-   "cell_type": "code",
-   "execution_count": 1,
-   "id": "938cae9a-282a-47ea-9828-0d082d94774c",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "from pyspark.conf import SparkConf\n",
-    "from pyspark.sql import SparkSession\n",
-    "\n",
-    "CONF = {\n",
-    "    \"spark.ui.showConsoleProgress\": \"false\",\n",
-    "    \"spark.ui.dagGraph.retainedRootRDDs\": \"1\",\n",
-    "    \"spark.ui.retainedJobs\": \"1\",\n",
-    "    \"spark.ui.retainedStages\": \"1\",\n",
-    "    \"spark.ui.retainedTasks\": \"1\",\n",
-    "    \"spark.sql.ui.retainedExecutions\": \"1\",\n",
-    "    \"spark.worker.ui.retainedExecutors\": \"1\",\n",
-    "    \"spark.worker.ui.retainedDrivers\": \"1\",\n",
-    "    \"spark.executor.instances\": \"1\",\n",
-    "}\n",
-    "\n",
-    "def spark_session() -> SparkSession:\n",
-    "    '''\n",
-    "    - set a bunch of spark config variables that help lighten the load\n",
-    "    - local[1] locks the spark runtime to a single core\n",
-    "    - silence noisy warning logs\n",
-    "    '''\n",
-    "    conf = SparkConf().setAll([(k,v) for k,v in CONF.items()])\n",
-    "\n",
-    "    sc = SparkSession.builder.master(\"local[1]\").config(conf=conf).getOrCreate()\n",
-    "    sc.sparkContext.setLogLevel(\"ERROR\")\n",
-    "    return sc"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 2,
-   "id": "5a9afc91-dafc-4e2c-98f4-ecc8e3b876ce",
-   "metadata": {},
-   "outputs": [
-    {
-     "name": "stderr",
-     "output_type": "stream",
-     "text": [
-      "Setting default log level to \"WARN\".\n",
-      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
-      "23/06/05 17:25:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n"
-     ]
-    }
-   ],
-   "source": [
-    "spark = spark_session()"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 3,
-   "id": "d4a27d70-e893-4810-92a6-7ffa43b11c15",
-   "metadata": {},
-   "outputs": [
-    {
-     "name": "stdout",
-     "output_type": "stream",
-     "text": [
-      "+---+---------------------------+\n",
-      "|a  |n                          |\n",
-      "+---+---------------------------+\n",
-      "|b  |{a -> b}                   |\n",
-      "|c  |{y -> b, z -> x}           |\n",
-      "|d  |{2 -> 3, t -> a, o -> null}|\n",
-      "+---+---------------------------+\n",
-      "\n"
-     ]
-    }
-   ],
-   "source": [
-    "df = spark.createDataFrame([\n",
-    "    {'a': 'b', 'n': {'a': 'b'}},\n",
-    "    {'a': 'c', 'n': {'z': 'x', 'y': 'b'}},\n",
-    "    {'a': 'd', 'n': {'o': None, 't': 'a', '2': 3}}\n",
-    "])\n",
-    "\n",
-    "df.show(truncate=False)\n"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "05f0f0b4-a002-4947-b340-cb38912be8aa",
-   "metadata": {},
-   "outputs": [],
-   "source": []
-  }
- ],
- "metadata": {
-  "kernelspec": {
-   "display_name": "Python 3 (ipykernel)",
-   "language": "python",
-   "name": "python3"
-  },
-  "language_info": {
-   "codemirror_mode": {
-    "name": "ipython",
-    "version": 3
-   },
-   "file_extension": ".py",
-   "mimetype": "text/x-python",
-   "name": "python",
-   "nbconvert_exporter": "python",
-   "pygments_lexer": "ipython3",
-   "version": "3.11.4"
-  }
- },
- "nbformat": 4,
- "nbformat_minor": 5
-}
diff --git a/articles/20230605_pyspark_fu/.ipynb_checkpoints/20230605_pyspark_fu-checkpoint.md b/articles/20230605_pyspark_fu/.ipynb_checkpoints/20230605_pyspark_fu-checkpoint.md
deleted file mode 100644
index 626b631..0000000
--- a/articles/20230605_pyspark_fu/.ipynb_checkpoints/20230605_pyspark_fu-checkpoint.md
+++ /dev/null
@@ -1,112 +0,0 @@
-# 20230605 PySpark Fu
-
-When working with PySpark, I often find myself needing to extra research to find solutions to
-slightly-harder-than-usual problems. This is a collection of those solutions, and the journey.
-
-- [20230605 PySpark Fu](#20230605-pyspark-fu)
-  - [Instantiate spark](#instantiate-spark)
-  - [Convert DataFrame to List of Dicts](#convert-dataframe-to-list-of-dicts)
-  - [Problem 2: Filter nested key/values](#problem-2-filter-nested-keyvalues)
-
----
-
-## Instantiate spark
-
-> In particular, for testing small code snippets, I find it's good to use `local[1]` which forces Spark to run using only 1 core.
-> This makes the results faster and minimises the impact on other processes.
-
-```python
-from pyspark.sql import SparkSession
-
-def spark_session() -> SparkSession:
-    # local[1] locks the spark runtime to a single core
-    return SparkSession.builder.master("local[1]").getOrCreate()
-```
-
-## Convert DataFrame to List of Dicts
-
-```python
-from pyspark.sql import DataFrame
-from typing import List, Dict
-
-def collect_to_dict(df: DataFrame) -> List[Dict]:
-    return [r.asDict(recursive=True) for r in df.collect()]
-```
-
-## Problem 2: Filter nested key/values
-
-```python
-from test.helper import spark_session
-spark = spark_session()
-
-df = spark.createDataFrame([
-    {'a': 'b', 'n': {'a': 'b'}},
-    {'a': 'c', 'n': {'z': 'x', 'y': 'b'}},
-    {'a': 'd', 'n': {'o': None, 't': 'a', '2': 3}}
-])
-
-df.show(truncate=False)
-
-# +---+---------------------------+
-# |a  |n                          |
-# +---+---------------------------+
-# |b  |{a -> b}                   |
-# |c  |{y -> b, z -> x}           |
-# |d  |{2 -> 3, t -> a, o -> null}|
-# +---+---------------------------+
-```
-
-```python
-df2 = df.select(
-    'a',
-    F.explode(F.col('n'))
-).filter(
-    F.col('value').isNotNull()
-).select(
-    'a',
-    F.create_map(F.col('key'), F.col('value')).alias('n')
-).groupBy(
-    'a'
-).agg(
-    F.collect_list('n').alias('maps')
-).select(
-    'a',
-    F.expr('aggregate(slice(maps, 2, size(maps)), maps[0], (acc, element) -> map_concat(acc, cast(element, int)))').alias('n')
-)
-```
-
-```python
-import pyspark.sql.functions as F
-df = spark.createDataFrame([{'a': 'b', 'n': {'a': 'b'}}, {'a': 'c', 'n': {'z': 'x'}}, {'a': 'd', 'n': {'o': None, 't': 'a', '4': 3}}])
-
- #Filter out the key-value pairs with null values
-filtered_df = df.withColumn("n", F.map_filter(F.col("n"), lambda k, v: v.isNotNull()))
-
-df.display()
-filtered_df.display()
-```
-
-```python
-In [91]: df.show(truncate=False)
-+---+-----------+
-|a  |n          |
-+---+-----------+
-|b  |{a -> b}   |
-|c  |{z -> x}   |
-|d  |{o -> null, t -> a}|
-+---+-----------+
-
-
-In [92]: df_desired.show(truncate=False)
-+---+--------+
-|a  |n       |
-+---+--------+
-|b  |{a -> b}|
-|c  |{z -> x}|
-|d  |{t -> a}|
-+---+--------+
-
-# e.g. write a function with the following interface or similar
-#
-In [93]: assert my_filter_func(df, primary='a', struct='n') == df_desired
-```
\ No newline at end of file
diff --git a/articles/20230605_pyspark_fu/20230605_pyspark_fu.ipynb b/articles/20230605_pyspark_fu/20230605_pyspark_fu.ipynb
index 72751d1..902d3a7 100644
--- a/articles/20230605_pyspark_fu/20230605_pyspark_fu.ipynb
+++ b/articles/20230605_pyspark_fu/20230605_pyspark_fu.ipynb
@@ -1083,9 +1083,9 @@
     }
    ],
    "source": [
-    "Schema.from_df(spark.createDataFrame([{\n",
-    "    'id': 123, 'key': 'yolo', 'attrs': {'a': 'b'}\n",
-    "}])).show()"
+    "Schema.from_df(spark.createDataFrame([\n",
+    "    {'id': 123, 'key': 'yolo', 'attrs': {'a': 'b'}}\n",
+    "])).show()"
    ]
   },
   {
diff --git a/articles/20230605_pyspark_fu/f.json b/articles/20230605_pyspark_fu/f.json
deleted file mode 100644
index 3812f03..0000000
--- a/articles/20230605_pyspark_fu/f.json
+++ /dev/null
@@ -1,3 +0,0 @@
-{"a": "b", "n": {"a": "b"}}
-{"a": "c", "n": {"z": "x", "y": "b"}}
-{"a": "d", "n": {"o": null, "t": "a", "2": 3}}