# Added markdown syntax highlighting #86

129 changes: 73 additions & 56 deletions in `examples/data_cleaning_and_lambda.md`

*(unchanged lines collapsed in the diff)*

and run your code there. You can do this in the AWS Glue console, as described in the Developer Guide.

Begin by pasting some boilerplate into the DevEndpoint notebook to import the
AWS Glue libraries we'll need and set up a single `GlueContext`.

```python
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())
```
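
The `getResolvedOptions` and `Job` imports aren't exercised in a notebook session, but they matter once this code moves into a scheduled Glue job. A minimal sketch of that job boilerplate (assumed, not part of the original; it relies only on the standard `JOB_NAME` argument that Glue passes to every job):

```python
# Hypothetical job-script wrapper; in a DevEndpoint notebook you can skip it.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# ... transformations from the sections below go here ...

job.commit()
```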

### 4. Data-type variations

First, let's see what the schema looks like using Spark DataFrames:

```python
medicare = spark.read.format(
"com.databricks.spark.csv").option(
"header", "true").option(
"inferSchema", "true").load(
's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv')
medicare.printSchema()
```

The output from `printSchema` is:

*(unchanged lines collapsed in the diff)*
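
As an aside (not part of the original example): on Spark 2.0 and later, the same load can be written with the built-in CSV reader, which supersedes the external `com.databricks.spark.csv` package:

```python
# Equivalent load using Spark's native CSV support (Spark 2.0+).
medicare = spark.read.csv(
    's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv',
    header=True,
    inferSchema=True)
```
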
Now, let's see what the schema looks like after we load the data into a DynamicFrame,
starting from the metadata that the crawler put in the AWS Glue Data Catalog:

```python
medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog(
database = "payments",
table_name = "medicare")
medicare_dynamicframe.printSchema()
```

The output from `printSchema` this time is:

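*(unchanged lines collapsed in the diff)*

As an aside (not in the original walkthrough): if the dataset had not been crawled into the Data Catalog, a DynamicFrame could also be built straight from S3 with `from_options`; a sketch, with `medicare_dyf_s3` as an illustrative name:

```python
# Read the raw CSV from S3 directly, bypassing the Data Catalog.
medicare_dyf_s3 = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [
        "s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv"]},
    format="csv",
    format_options={"withHeader": True})
```
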
To query the `provider id` column, we first need to resolve the choice. With DynamicFrames,
we can try to convert those `string` values to `long` values using the `resolveChoice`
transform method with a `cast:long` option:

```python
medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')])
medicare_res.printSchema()
```

The output of the `printSchema` call is now:

Another option is to convert the `choice` type to a `struct`, which keeps both types.
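
A minimal sketch of that `struct` route, using the same `resolveChoice` transform with the `make_struct` action (`medicare_struct` is an illustrative name):

```python
# Keep both the long and string variants side by side in a struct
# instead of casting one away.
medicare_struct = medicare_dynamicframe.resolveChoice(
    specs=[('provider id', 'make_struct')])
medicare_struct.printSchema()
```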

Let's take a look at the rows that were anomalous:

```python
medicare_res.toDF().where("`provider id` is NULL").show()
```

What we see is:

*(unchanged lines collapsed in the diff)*

Let's remove those malformed records now:

```python
medicare_dataframe = medicare_res.toDF()
medicare_dataframe = medicare_dataframe.where("`provider id` is NOT NULL")
```
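
A quick sanity check we can add here (not in the original): the row count should drop by exactly the number of malformed records shown above.

```python
# Compare row counts before and after the filter.
print(medicare_res.toDF().count(), medicare_dataframe.count())
```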

### 5. Lambda functions (aka Python UDFs) and ApplyMapping

*(unchanged lines collapsed in the diff)*

features of DynamicFrames.

Let's turn the payment information into numbers, so analytic engines like Amazon
Redshift or Amazon Athena can do their number crunching faster:

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

chop_f = udf(lambda x: x[1:], StringType())
medicare_dataframe = medicare_dataframe.withColumn(
"ACC", chop_f(
medicare_dataframe["average covered charges"])).withColumn(
"ATP", chop_f(
medicare_dataframe["average total payments"])).withColumn(
"AMP", chop_f(
medicare_dataframe["average medicare payments"]))
medicare_dataframe.select(['ACC', 'ATP', 'AMP']).show()
```

The output from the `show` call is:

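*(unchanged lines collapsed in the diff)*

As an aside (not in the original): the same dollar-sign chop can be done with Spark's built-in `regexp_replace`, which avoids the Python UDF round trip and usually runs faster; `alt_df` and `ACC2` are illustrative names:

```python
from pyspark.sql.functions import regexp_replace

# Strip a leading '$' with a native column expression instead of a UDF.
alt_df = medicare_dataframe.withColumn(
    "ACC2", regexp_replace("average covered charges", r"^\$", ""))
alt_df.select("ACC2").show(5)
```
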
These are all still strings in the data. We can use the DynamicFrame's powerful `apply_mapping` transform to reshape the data so that other data programming languages and systems can easily access it:

```python
from awsglue.dynamicframe import DynamicFrame
medicare_tmp_dyf = DynamicFrame.fromDF(medicare_dataframe, glueContext, "nested")
medicare_nest_dyf = medicare_tmp_dyf.apply_mapping([('drg definition', 'string', 'drg', 'string'),
('provider id', 'long', 'provider.id', 'long'),
('provider name', 'string', 'provider.name', 'string'),
('provider city', 'string', 'provider.city', 'string'),
('provider state', 'string', 'provider.state', 'string'),
('provider zip code', 'long', 'provider.zip', 'long'),
('hospital referral region description', 'string','rr', 'string'),
('ACC', 'string', 'charges.covered', 'double'),
('ATP', 'string', 'charges.total_pay', 'double'),
('AMP', 'string', 'charges.medicare_pay', 'double')])
medicare_nest_dyf.printSchema()
```

The `printSchema` output is:

*(unchanged lines collapsed in the diff)*
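
As an aside (not in the original): once the frame is converted back to a DataFrame, the nested fields can be addressed with dot paths:

```python
# Select a couple of the newly nested fields; the column paths follow the
# apply_mapping targets defined above.
medicare_nest_dyf.toDF().select("provider.id", "charges.covered").show(5)
```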

Turning the data back into a DataFrame, we can show what it now looks like:

```python
medicare_nest_dyf.toDF().show()
```

The output is:

*(unchanged lines collapsed in the diff)*

Finally, let's write the data out in an optimized Parquet format for Redshift Spectrum or Athena:

```python
glueContext.write_dynamic_frame.from_options(
frame = medicare_nest_dyf,
connection_type = "s3",
connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"},
format = "parquet")
```
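
To confirm the write, one option (assumed, not part of the original) is to read the Parquet back and check that the nested schema survived:

```python
# Read back the Parquet output written above and inspect its schema.
spark.read.parquet(
    "s3://glue-sample-target/output-dir/medicare_parquet").printSchema()
```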