-
Notifications
You must be signed in to change notification settings - Fork 908
/
Copy pathpyspark-create-dataframe-dictionary.py
52 lines (44 loc) · 1.61 KB
/
pyspark-create-dataframe-dictionary.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# -*- coding: utf-8 -*-
"""
author SparkByExamples.com
"""
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
dataDictionary = [
('James',{'hair':'black','eye':'brown'}),
('Michael',{'hair':'brown','eye':None}),
('Robert',{'hair':'red','eye':'black'}),
('Washington',{'hair':'grey','eye':'grey'}),
('Jefferson',{'hair':'brown','eye':''})
]
df = spark.createDataFrame(data=dataDictionary, schema = ['name','properties'])
df.printSchema()
df.show(truncate=False)
# Using StructType schema
from pyspark.sql.types import StructField, StructType, StringType, MapType,IntegerType
schema = StructType([
StructField('name', StringType(), True),
StructField('properties', MapType(StringType(),StringType()),True)
])
df2 = spark.createDataFrame(data=dataDictionary, schema = schema)
df2.printSchema()
df2.show(truncate=False)
df3=df.rdd.map(lambda x: \
(x.name,x.properties["hair"],x.properties["eye"])) \
.toDF(["name","hair","eye"])
df3.printSchema()
df3.show()
df.withColumn("hair",df.properties.getItem("hair")) \
.withColumn("eye",df.properties.getItem("eye")) \
.drop("properties") \
.show()
df.withColumn("hair",df.properties["hair"]) \
.withColumn("eye",df.properties["eye"]) \
.drop("properties") \
.show()
# Functions
from pyspark.sql.functions import explode,map_keys,col
keysDF = df.select(explode(map_keys(df.properties))).distinct()
keysList = keysDF.rdd.map(lambda x:x[0]).collect()
keyCols = list(map(lambda x: col("properties").getItem(x).alias(str(x)), keysList))
df.select(df.name, *keyCols).show()