# Cleaning_Data_with_PySpark.py
#####################
# DataFrame details #
#####################
# Shared imports and SparkSession used throughout these exercises
import time
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col
from pyspark.sql.types import *
spark = SparkSession.builder.getOrCreate()
### Defining a schema
# Define a new schema using the StructType method
people_schema = StructType([
    # Define a StructField for each field
    StructField('name', StringType(), False),
    StructField('age', IntegerType(), False),
    StructField('city', StringType(), False)
])
### Using lazy processing
# Load the CSV file
aa_dfw_df = spark.read.format('csv').options(header=True).load('AA_DFW_2018.csv.gz')
# Add the airport column using the F.lower() method
aa_dfw_df = aa_dfw_df.withColumn('airport', F.lower(aa_dfw_df['Destination Airport']))
# Drop the Destination Airport column
aa_dfw_df = aa_dfw_df.drop(aa_dfw_df['Destination Airport'])
# Show the DataFrame
aa_dfw_df.show()
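### Sketch: observing lazy evaluation (illustrative)
# A minimal sketch: the withColumn() and drop() calls above return immediately
# because transformations only extend the query plan; an action such as count()
# or show() is what actually triggers reading and processing the data.
start = time.time()
lazy_df = aa_dfw_df.withColumn('airport_upper', F.upper(aa_dfw_df['airport']))
print("Transformation defined in %f seconds (no data processed yet)" % (time.time() - start))
print("Row count (forces execution): %d" % lazy_df.count())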
### Saving a DataFrame in Parquet format
# View the row count of df1 and df2
print("df1 Count: %d" % df1.count())
print("df2 Count: %d" % df2.count())
# Combine the DataFrames into one
df3 = df1.union(df2)
# Save the df3 DataFrame in Parquet format
df3.write.parquet('AA_DFW_ALL.parquet', mode='overwrite')
# Read the Parquet file into a new DataFrame and run a count
print(spark.read.parquet('AA_DFW_ALL.parquet').count())
### SQL and Parquet
# Read the Parquet file into flights_df
flights_df = spark.read.parquet('AA_DFW_ALL.parquet')
# Register the temp table
flights_df.createOrReplaceTempView('flights')
# Run a SQL query of the average flight duration
avg_duration = spark.sql('SELECT avg(flight_duration) from flights').collect()[0]
print('The average flight time is: %d' % avg_duration)
#############################################
# Manipulating DataFrames in the real world #
#############################################
### Filtering column content with Python
# Show the distinct VOTER_NAME entries
voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)
# Filter voter_df where the VOTER_NAME is 1-20 characters in length
voter_df = voter_df.filter('length(VOTER_NAME) > 0 and length(VOTER_NAME) < 20')
# Filter out voter_df where the VOTER_NAME contains an underscore
voter_df = voter_df.filter(~ F.col('VOTER_NAME').contains('_'))
# Show the distinct VOTER_NAME entries again
voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)
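### Sketch: the same filters as column expressions (illustrative)
# The SQL-string filter above can also be written with F.length() and F.col();
# the two forms are equivalent, this is just an alternative style.
voter_df_expr = voter_df.filter(
    (F.length(F.col('VOTER_NAME')) > 0) & (F.length(F.col('VOTER_NAME')) < 20)
)
voter_df_expr = voter_df_expr.filter(~ F.col('VOTER_NAME').contains('_'))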
### Modifying DataFrame columns
# Add a new column called splits separated on whitespace
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, r'\s+'))
# Create a new column called first_name based on the first item in splits
voter_df = voter_df.withColumn('first_name', voter_df.splits.getItem(0))
# Get the last entry of the splits list and create a column called last_name
voter_df = voter_df.withColumn('last_name', voter_df.splits.getItem(F.size('splits') - 1))
# Drop the splits column
voter_df = voter_df.drop('splits')
# Show the voter_df DataFrame
voter_df.show()
### when() example
# Add a column to voter_df for any voter with the title **Councilmember**
voter_df = voter_df.withColumn('random_val',
                               when(voter_df.TITLE == 'Councilmember', F.rand()))
# Add a column to voter_df for a voter based on their position
voter_df = voter_df.withColumn('random_val',
                               when(voter_df.TITLE == 'Councilmember', F.rand())
                               .when(voter_df.TITLE == 'Mayor', 2)
                               .otherwise(0))
### When / Otherwise
# Add a column to voter_df for a voter based on their position
voter_df = voter_df.withColumn('random_val',
                               when(voter_df.TITLE == 'Councilmember', F.rand())
                               .when(voter_df.TITLE == 'Mayor', 2)
                               .otherwise(0))
# Show some of the DataFrame rows
voter_df.show()
# Use the .filter() clause with random_val
voter_df.filter(voter_df.random_val == 0).show()
### Using user defined functions in Spark
def getFirstAndMiddle(names):
    # Return a space separated string of names
    return ' '.join(names[:-1])
# Define the method as a UDF
udfFirstAndMiddle = F.udf(getFirstAndMiddle, StringType())
# Create a new column using your UDF (this step assumes the 'splits' column from
# the earlier exercise is still present)
voter_df = voter_df.withColumn('first_and_middle_name', udfFirstAndMiddle(voter_df.splits))
# Show the DataFrame
voter_df.show()
### Adding an ID Field
# Select all the unique council voters
voter_df = df.select(df["VOTER NAME"]).distinct()
# Count the rows in voter_df
print("\nThere are %d rows in the voter_df DataFrame.\n" % voter_df.count())
# Add a ROW_ID
voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())
# Show the rows with 10 highest IDs in the set
voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)
### IDs with different partitions
# Print the number of partitions in each DataFrame
print("\nThere are %d partitions in the voter_df DataFrame.\n" % voter_df.rdd.getNumPartitions())
print("\nThere are %d partitions in the voter_df_single DataFrame.\n" % voter_df_single.rdd.getNumPartitions())
# Add a ROW_ID field to each DataFrame
voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())
voter_df_single = voter_df_single.withColumn('ROW_ID', F.monotonically_increasing_id())
# Show the top 10 IDs in each DataFrame
voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)
voter_df_single.orderBy(voter_df_single.ROW_ID.desc()).show(10)
### More ID tricks
# Determine the highest ROW_ID and save it in previous_max_ID
previous_max_ID = voter_df_march.select('ROW_ID').rdd.max()[0]
# Add a ROW_ID column to voter_df_april starting at the desired value
voter_df_april = voter_df_april.withColumn('ROW_ID', F.monotonically_increasing_id() + previous_max_ID)
# Show the ROW_ID from both DataFrames and compare
voter_df_march.select('ROW_ID').show()
voter_df_april.select('ROW_ID').show()
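### Sketch: why the generated IDs are not consecutive (illustrative)
# monotonically_increasing_id() packs the partition ID into the upper bits of a
# 64-bit integer, so IDs are unique and increasing but leave large gaps between
# partitions. Coalescing to a single partition yields consecutive IDs starting
# at 0, at the cost of parallelism.
sequential_df = voter_df.coalesce(1).withColumn('SEQ_ID', F.monotonically_increasing_id())
sequential_df.select('SEQ_ID').show(10)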
#########################
# Improving Performance #
#########################
### Caching a DataFrame
start_time = time.time()
# Add caching to the unique rows in departures_df
departures_df = departures_df.distinct().cache()
# Count the unique rows in departures_df, noting how long the operation takes
print("Counting %d rows took %f seconds" % (departures_df.count(), time.time() - start_time))
# Count the rows again, noting the variance in time of a cached DataFrame
start_time = time.time()
print("Counting %d rows again took %f seconds" % (departures_df.count(), time.time() - start_time))
### Removing a DataFrame from cache
# Determine if departures_df is in the cache
print("Is departures_df cached?: %s" % departures_df.is_cached)
print("Removing departures_df from cache")
# Remove departures_df from the cache
departures_df.unpersist()
# Check the cache status again
print("Is departures_df cached?: %s" % departures_df.is_cached)
### File import performance
# Import the full and split files into DataFrames
full_df = spark.read.csv('departures_full.txt.gz')
split_df = spark.read.csv('departures_*.txt.gz')
# Print the count and run time for each DataFrame
start_time_a = time.time()
print("Total rows in full DataFrame:\t%d" % full_df.count())
print("Time to run: %f" % (time.time() - start_time_a))
start_time_b = time.time()
print("Total rows in split DataFrame:\t%d" % split_df.count())
print("Time to run: %f" % (time.time() - start_time_b))
### Reading Spark configurations
# Name of the Spark application instance
app_name = spark.conf.get('spark.app.name')
# Driver TCP port
driver_tcp_port = spark.conf.get('spark.driver.port')
# Number of join partitions
num_partitions = spark.conf.get('spark.sql.shuffle.partitions')
# Show the results
print("Name: %s" % app_name)
print("Driver TCP port: %s" % driver_tcp_port)
print("Number of partitions: %s" % num_partitions)
### Writing Spark configurations
# Store the number of partitions in variable
before = departures_df.rdd.getNumPartitions()
# Configure Spark to use 500 partitions
spark.conf.set('spark.sql.shuffle.partitions', 500)
# Recreate the DataFrame using the departures data file
departures_df = spark.read.csv('departures.txt.gz').distinct()
# Print the number of partitions for each instance
print("Partition count before change: %d" % before)
print("Partition count after change: %d" % departures_df.rdd.getNumPartitions())
### Normal joins
# Join the flights_df and airports_df DataFrames
normal_df = flights_df.join(airports_df,
                            flights_df["Destination Airport"] == airports_df["IATA"])
# Show the query plan
normal_df.explain()
### Using broadcasting on Spark joins
# Import the broadcast method from pyspark.sql.functions
from pyspark.sql.functions import broadcast
# Join the flights_df and airports_df DataFrames using broadcasting
broadcast_df = flights_df.join(broadcast(airports_df),
                               flights_df["Destination Airport"] == airports_df["IATA"])
# Show the query plan and compare against the original
broadcast_df.explain()
### Comparing broadcast vs normal joins
start_time = time.time()
# Count the number of rows in the normal DataFrame
normal_count = normal_df.count()
normal_duration = time.time() - start_time
start_time = time.time()
# Count the number of rows in the broadcast DataFrame
broadcast_count = broadcast_df.count()
broadcast_duration = time.time() - start_time
# Print the counts and the duration of the tests
print("Normal count:\t\t%d\tduration: %f" % (normal_count, normal_duration))
print("Broadcast count:\t%d\tduration: %f" % (broadcast_count, broadcast_duration))
#########################################
# Complex processing and data pipelines #
#########################################
### Quick pipeline
# Import the data to a DataFrame
departures_df = spark.read.csv('2015-departures.csv.gz', header=True)
# Remove any duration of 0
departures_df = departures_df.filter(departures_df[3] != '0')
# Add an ID column
departures_df = departures_df.withColumn('id', F.monotonically_increasing_id())
# Write the file out to JSON format
departures_df.write.json('output.json', mode='overwrite')
### Removing commented lines
# Import the file to a DataFrame and perform a row count
annotations_df = spark.read.csv('annotations.csv.gz', sep='|')
full_count = annotations_df.count()
# Count the number of rows beginning with '#'
comment_count = annotations_df.filter(col('_c0').startswith('#')).count()
# Import the file to a new DataFrame, without commented rows
no_comments_df = spark.read.csv('annotations.csv.gz', sep='|', comment='#')
# Count the new DataFrame and verify the difference is as expected
no_comments_count = no_comments_df.count()
print("Full count: %d\nComment count: %d\nRemaining count: %d" % (full_count, comment_count, no_comments_count))
### Removing invalid rows
# Split _c0 on the tab character and store the list in a variable
tmp_fields = F.split(annotations_df['_c0'], '\t')
# Create the colcount column on the DataFrame
annotations_df = annotations_df.withColumn('colcount', F.size(tmp_fields))
# Remove any rows containing fewer than 5 fields
annotations_df_filtered = annotations_df.filter(~ (annotations_df['colcount'] < 5))
# Count the number of rows before and after filtering
initial_count = annotations_df.count()
final_count = annotations_df_filtered.count()
print("Initial count: %d\nFinal count: %d" % (initial_count, final_count))
### Splitting into columns
# Split the content of _c0 on the tab character (aka, '\t')
split_cols = F.split(annotations_df['_c0'], '\t')
# Add the columns folder, filename, width, and height
split_df = annotations_df.withColumn('folder', split_cols.getItem(0))
split_df = split_df.withColumn('filename', split_cols.getItem(1))
split_df = split_df.withColumn('width', split_cols.getItem(2))
split_df = split_df.withColumn('height', split_cols.getItem(3))
# Add split_cols as a column
split_df = split_df.withColumn('split_cols', split_cols)
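### Sketch: casting the split fields to numeric types (illustrative)
# getItem() returns strings, so width and height are string columns at this point;
# casting them to integers up front avoids repeating the cast in later arithmetic.
typed_df = split_df.withColumn('width', split_df['width'].cast(IntegerType()))
typed_df = typed_df.withColumn('height', typed_df['height'].cast(IntegerType()))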
### Further parsing
def retriever(cols, colcount):
    # Return a list of dog data
    return cols[4:colcount]
# Define the method as a UDF
udfRetriever = F.udf(retriever, ArrayType(StringType()))
# Create a new column using your UDF
split_df = split_df.withColumn('dog_list', udfRetriever(split_df.split_cols, split_df.colcount))
# Remove the original column, split_cols, and the colcount
split_df = split_df.drop('_c0').drop('split_cols').drop('colcount')
### Validate rows via join
# Rename the column in valid_folders_df
valid_folders_df = valid_folders_df.withColumnRenamed('_c0', 'folder')
# Count the number of rows in split_df
split_count = split_df.count()
# Join the DataFrames
joined_df = split_df.join(F.broadcast(valid_folders_df), "folder")
# Compare the number of rows remaining
joined_count = joined_df.count()
print("Before: %d\nAfter: %d" % (split_count, joined_count))
### Examining invalid rows
# Determine the row counts for each DataFrame
split_count = split_df.count()
joined_count = joined_df.count()
# Create a DataFrame containing the invalid rows
invalid_df = split_df.join(F.broadcast(joined_df), 'folder', 'left_anti')
# Validate the count of the new DataFrame is as expected
invalid_count = invalid_df.count()
print(" split_df:\t%d\n joined_df:\t%d\n invalid_df: \t%d" % (split_count, joined_count, invalid_count))
# Determine the number of distinct folder rows removed
invalid_folder_count = invalid_df.select('folder').distinct().count()
print("%d distinct invalid folders found" % invalid_folder_count)
### Dog parsing
# Select the dog details and show 10 untruncated rows
joined_df.select("dog_list").show(10, truncate=False)
# Define a schema type for the details in the dog list
DogType = StructType([
    StructField("breed", StringType(), False),
    StructField("start_x", IntegerType(), False),
    StructField("start_y", IntegerType(), False),
    StructField("end_x", IntegerType(), False),
    StructField("end_y", IntegerType(), False)
])
### Per image count
# Create a function to return the number and type of dogs as a tuple
def dogParse(doglist):
    dogs = []
    for dog in doglist:
        (breed, start_x, start_y, end_x, end_y) = dog.split(',')
        dogs.append((breed, int(start_x), int(start_y), int(end_x), int(end_y)))
    return dogs
# Create a UDF
udfDogParse = F.udf(dogParse, ArrayType(DogType))
# Use the UDF to create the list of dogs and drop the old column
joined_df = joined_df.withColumn('dogs', udfDogParse('dog_list')).drop('dog_list')
# Show the number of dogs in the first 10 rows
joined_df.select(F.size('dogs')).show(10)
### Percentage dog pixels
# Define a UDF to determine the number of pixels per image
def dogPixelCount(doglist):
    totalpixels = 0
    for dog in doglist:
        totalpixels += (dog[3] - dog[1]) * (dog[4] - dog[2])
    return totalpixels
# Define a UDF for the pixel count
udfDogPixelCount = F.udf(dogPixelCount, IntegerType())
joined_df = joined_df.withColumn('dog_pixels', udfDogPixelCount(joined_df.dogs))
# Create a column representing the percentage of pixels
joined_df = joined_df.withColumn('dog_percent', (joined_df.dog_pixels / (joined_df.width.cast("integer") * joined_df.height.cast("integer"))) * 100)
# Show the first 10 annotations with more than 60% dog
joined_df.filter(joined_df.dog_percent > 60).show(10)
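### Sketch: persisting the cleaned result (illustrative; the output path is an assumption)
# Writing the filtered annotations back to Parquet keeps the parsed schema,
# including the nested dog structs, for later analysis.
joined_df.filter(joined_df.dog_percent > 60).write.parquet('dog_images_over_60_percent.parquet', mode='overwrite')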