forked from rb-one/Cuso_Introductorio_de_Spark
-
Notifications
You must be signed in to change notification settings - Fork 0
/
codeExample.py
44 lines (40 loc) · 1.55 KB
/
codeExample.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import sys
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import count
if __name__ == "__main__":
    # Script entry point: read an M&M CSV file into a Spark DataFrame, print
    # per-(State, Color) totals for every state, then the same totals
    # restricted to California.
    #
    # Exactly one command-line argument is expected: the path to the CSV file.
    if len(sys.argv) != 2:
        print("Usage: mnmcount <file>", file=sys.stderr)
        sys.exit(-1)

    spark = (SparkSession
             .builder
             .appName("PythonMnMCount")
             .getOrCreate())

    try:
        # get the M&M data set file name
        mnm_file = sys.argv[1]

        # read the file into a Spark DataFrame; the first line supplies the
        # column names and the column types are inferred from the data
        mnm_df = (spark.read.format("csv")
                  .option("header", "true")
                  .option("inferSchema", "true")
                  .load(mnm_file))
        mnm_df.show(n=5, truncate=False)

        # group by state and color, aggregate each group into "Total",
        # and order by "Total" descending
        # NOTE(review): count("Count") counts the rows in each group that
        # have a non-null Count; it does not add up the Count values. If a
        # sum of the Count column was intended, use sum() instead - confirm.
        count_mnm_df = (mnm_df.select("State", "Color", "Count")
                        .groupBy("State", "Color")
                        .agg(count("Count").alias("Total"))
                        .orderBy("Total", ascending=False))

        # show the resulting aggregation for all the states and colors
        count_mnm_df.show(n=60, truncate=False)
        print(f"Total Rows = {count_mnm_df.count()}")

        # find the aggregate count for California by filtering on State
        # before applying the same grouping and ordering as above
        ca_count_mnm_df = (mnm_df.where(mnm_df.State == 'CA')
                           .groupBy("State", "Color")
                           .agg(count("Count").alias("Total"))
                           .orderBy("Total", ascending=False))

        # show the resulting aggregation for California
        ca_count_mnm_df.show(n=10, truncate=False)
    finally:
        # release the session's resources even if reading or aggregating
        # the data failed
        spark.stop()