diff --git a/examples/data_cleaning_and_lambda.md b/examples/data_cleaning_and_lambda.md
index ef711bb..db5c7c6 100644
--- a/examples/data_cleaning_and_lambda.md
+++ b/examples/data_cleaning_and_lambda.md
@@ -41,26 +41,29 @@ and run your code there. You can do this in the AWS Glue console, as described
 Begin by pasting some boilerplate into the DevEndpoint notebook to import the
 AWS Glue libraries we'll need and set up a single `GlueContext`.
 
-    import sys
-    from awsglue.utils import getResolvedOptions
-    from pyspark.context import SparkContext
-    from awsglue.context import GlueContext
-    from awsglue.dynamicframe import DynamicFrame
-    from awsglue.job import Job
-
-    glueContext = GlueContext(SparkContext.getOrCreate())
+```python
+import sys
+from awsglue.utils import getResolvedOptions
+from pyspark.context import SparkContext
+from awsglue.context import GlueContext
+from awsglue.dynamicframe import DynamicFrame
+from awsglue.job import Job
+glueContext = GlueContext(SparkContext.getOrCreate())
+```
 
 ### 4. Data-type variations
 
 First, let's see what the schema looks like using Spark DataFrames:
 
-    medicare = spark.read.format(
-        "com.databricks.spark.csv").option(
-        "header", "true").option(
-        "inferSchema", "true").load(
-        's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv')
-    medicare.printSchema()
+```python
+medicare = spark.read.format(
+    "com.databricks.spark.csv").option(
+    "header", "true").option(
+    "inferSchema", "true").load(
+    's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv')
+medicare.printSchema()
+```
 
 The output from `printSchema` is:
@@ -81,10 +84,12 @@ The output from `printSchema` is:
 Now, let's see what the schema looks like after we load the data into a
 DynamicFrame, starting from the metadata that the crawler put in the AWS Glue
 Data Catalog:
 
-    medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog(
-        database = "payments",
-        table_name = "medicare")
-    medicare_dynamicframe.printSchema()
+```python
+medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog(
+    database = "payments",
+    table_name = "medicare")
+medicare_dynamicframe.printSchema()
+```
 
 The output from `printSchema` this time is:
@@ -121,8 +126,10 @@ To query the `provider id` column, we first need to resolve the choice. With Dyn
 we can try to convert those `string` values to `long` values using the
 `resolveChoice` transform method with a `cast:long` option:
 
-    medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')])
-    medicare_res.printSchema()
+```python
+medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')])
+medicare_res.printSchema()
+```
 
 The output of the `printSchema` call is now:
@@ -146,7 +153,9 @@ Another option is to convert the `choice` type to a `struct`, which keeps both t
 
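+For comparison, resolving into a `struct` is the same `resolveChoice` call with
+a `make_struct` action. A sketch (the `medicare_struct` name is illustrative):
+
+```python
+# Keep both interpretations nested under a struct instead of casting.
+medicare_struct = medicare_dynamicframe.resolveChoice(
+    specs = [('provider id', 'make_struct')])
+medicare_struct.printSchema()
+```
+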
 Let's take a look at the rows that were anomalous:
 
-    medicare_res.toDF().where("`provider id` is NULL").show()
+```python
+medicare_res.toDF().where("`provider id` is NULL").show()
+```
 
 What we see is:
@@ -159,9 +168,10 @@ What we see is:
 
 Let's remove those malformed records now:
 
-    medicare_dataframe = medicare_res.toDF()
-    medicare_dataframe = medicare_dataframe.where("`provider id` is NOT NULL")
-
+```python
+medicare_dataframe = medicare_res.toDF()
+medicare_dataframe = medicare_dataframe.where("`provider id` is NOT NULL")
+```
 
 ### 5. Lambda functions (aka Python UDFs) and ApplyMapping
 
@@ -173,18 +183,20 @@ features of DynamicFrames.
 Let's turn the payment information into numbers, so analytic engines like
 Amazon Redshift or Amazon Athena can do their number crunching faster:
 
-    from pyspark.sql.functions import udf
-    from pyspark.sql.types import StringType
-
-    chop_f = udf(lambda x: x[1:], StringType())
-    medicare_dataframe = medicare_dataframe.withColumn(
-        "ACC", chop_f(
-            medicare_dataframe["average covered charges"])).withColumn(
-        "ATP", chop_f(
-            medicare_dataframe["average total payments"])).withColumn(
-        "AMP", chop_f(
-            medicare_dataframe["average medicare payments"]))
-    medicare_dataframe.select(['ACC', 'ATP', 'AMP']).show()
+```python
+from pyspark.sql.functions import udf
+from pyspark.sql.types import StringType
+
+chop_f = udf(lambda x: x[1:], StringType())
+medicare_dataframe = medicare_dataframe.withColumn(
+    "ACC", chop_f(
+        medicare_dataframe["average covered charges"])).withColumn(
+    "ATP", chop_f(
+        medicare_dataframe["average total payments"])).withColumn(
+    "AMP", chop_f(
+        medicare_dataframe["average medicare payments"]))
+medicare_dataframe.select(['ACC', 'ATP', 'AMP']).show()
+```
 
 The output from the `show` call is:
@@ -219,19 +231,21 @@ These are all still strings in the data. We can use the DynamicFrame's powerful
 the data so that other data programming languages and systems can
 easily access it:
 
-    from awsglue.dynamicframe import DynamicFrame
-    medicare_tmp_dyf = DynamicFrame.fromDF(medicare_dataframe, glueContext, "nested")
-    medicare_nest_dyf = medicare_tmp_dyf.apply_mapping([('drg definition', 'string', 'drg', 'string'),
-        ('provider id', 'long', 'provider.id', 'long'),
-        ('provider name', 'string', 'provider.name', 'string'),
-        ('provider city', 'string', 'provider.city', 'string'),
-        ('provider state', 'string', 'provider.state', 'string'),
-        ('provider zip code', 'long', 'provider.zip', 'long'),
-        ('hospital referral region description', 'string','rr', 'string'),
-        ('ACC', 'string', 'charges.covered', 'double'),
-        ('ATP', 'string', 'charges.total_pay', 'double'),
-        ('AMP', 'string', 'charges.medicare_pay', 'double')])
-    medicare_nest_dyf.printSchema()
+```python
+from awsglue.dynamicframe import DynamicFrame
+medicare_tmp_dyf = DynamicFrame.fromDF(medicare_dataframe, glueContext, "nested")
+medicare_nest_dyf = medicare_tmp_dyf.apply_mapping([('drg definition', 'string', 'drg', 'string'),
+    ('provider id', 'long', 'provider.id', 'long'),
+    ('provider name', 'string', 'provider.name', 'string'),
+    ('provider city', 'string', 'provider.city', 'string'),
+    ('provider state', 'string', 'provider.state', 'string'),
+    ('provider zip code', 'long', 'provider.zip', 'long'),
+    ('hospital referral region description', 'string','rr', 'string'),
+    ('ACC', 'string', 'charges.covered', 'double'),
+    ('ATP', 'string', 'charges.total_pay', 'double'),
+    ('AMP', 'string', 'charges.medicare_pay', 'double')])
+medicare_nest_dyf.printSchema()
+```
 
 The `printSchema` output is:
@@ -251,7 +265,9 @@ The `printSchema` output is:
 
 Turning the data back into a DataFrame, we can show what it now looks like:
 
-    medicare_nest_dyf.toDF().show()
+```python
+medicare_nest_dyf.toDF().show()
+```
 
 The output is:
@@ -283,12 +299,13 @@ The output is:
 
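+Once the data is a DataFrame again, the nested struct fields can be addressed
+with dotted paths. A quick sketch, using the names mapped above:
+
+```python
+# Project a few nested fields to confirm the new layout.
+medicare_nest_dyf.toDF().select("drg", "provider.id", "charges.covered").show(5)
+```
+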
 Finally, let's write the data out in an optimized Parquet format for Redshift
 Spectrum or Athena:
 
-    glueContext.write_dynamic_frame.from_options(
-        frame = medicare_nest_dyf,
-        connection_type = "s3",
-        connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"},
-        format = "parquet")
-
+```python
+glueContext.write_dynamic_frame.from_options(
+    frame = medicare_nest_dyf,
+    connection_type = "s3",
+    connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"},
+    format = "parquet")
+```
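+
+As a quick sanity check, we could read the Parquet back with plain Spark (same
+output path as above; just a sketch):
+
+```python
+# Read the freshly written Parquet back in and eyeball a few rows.
+spark.read.parquet("s3://glue-sample-target/output-dir/medicare_parquet").show(5)
+```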