Merge pull request #7 from Canadian-Geospatial-Platform/dev
v1-1-1-release
xinli-cai authored Aug 30, 2023
2 parents 82726b9 + 0312322 commit a580dce
Showing 9 changed files with 146 additions and 80 deletions.
24 changes: 24 additions & 0 deletions aws-lambda/Preprocessing_lambda/Dockerfile
@@ -0,0 +1,24 @@
# Use a base image provided by AWS for Python 3.9 runtime
FROM public.ecr.aws/lambda/python:3.9-x86_64

# Set the working directory
WORKDIR /var/task

# Copy requirements.txt
COPY requirements.txt ./

# Copy function code
COPY app.py ./

# Install the specified packages
RUN pip install -r requirements.txt

# Download the NLTK 'punkt' tokenizer and 'stopwords' data
RUN python -m nltk.downloader -d /var/task/nltk_data punkt stopwords

# Set the NLTK_DATA environment variable to use the included data
ENV NLTK_DATA=/var/task/nltk_data

# Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile)
CMD [ "app.lambda_handler" ]
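# Because the image bakes the 'punkt' tokenizer and 'stopwords' corpus into /var/task/nltk_data and points NLTK_DATA at that path, the handler can tokenize and strip stop words without calling nltk.download() at runtime, which is why those calls are commented out in app.py below. A minimal sketch of that pattern (the preprocess helper and sample text are illustrative, not part of this commit):

from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

def preprocess(text):
    # punkt and stopwords are resolved via NLTK_DATA=/var/task/nltk_data, set in the Dockerfile
    tokens = word_tokenize(text.lower())
    return [t for t in tokens if t.isalpha() and t not in stopwords.words('english')]

print(preprocess("A quick illustrative sentence about geospatial metadata"))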

26 changes: 17 additions & 9 deletions aws-lambda/Preprocessing_lambda/app.py
@@ -8,31 +8,39 @@
import nltk
from nltk.corpus import stopwords # module for stop words that come with NLTK
from nltk.stem import PorterStemmer # module for stemming
from nltk.tokenize import word_tokenize # module for tokenizing strings
nltk.download('punkt')
nltk.download('stopwords')
from nltk.tokenize import word_tokenize # module for tokenizing strings

#nltk.download('punkt')
#nltk.download('stopwords')

import re

# Environment variables
# environment variables for lambda
file_name = os.environ['FILE_NAME']
bucket_name = os.environ['BUCKET_NAME']
bucket_name_nlp = os.environ['BUCKET_NAME_NLP']

"""
#dev setting -- comment out for release
file_name = "records.parquet"
bucket_name = "webpresence-geocore-geojson-to-parquet-dev"
bucket_name_nlp='nlp-data-preprocessing'

selected_var = ['features_properties_id', 'features_properties_title_en', 'features_properties_title_en','features_properties_description_en','features_properties_keywords_en']
"""

def lambda_handler(event, context):

#Change directory to /tmp folder
os.chdir('/tmp') #This is important
"""
#Make a directory
if not os.path.exists(os.path.join('mydir')):
os.makedirs('mydir')
"""
df = open_S3_file_as_df(bucket_name, file_name)
print(f'The shape of the raw metadata parquet dataset is {df.shape}')

# Select key columns, currently only english
df_en = df[selected_var]
df_en = df[['features_properties_id', 'features_properties_title_en','features_properties_title_fr','features_properties_description_en','features_properties_keywords_en']]
# Replace NaN and "Not Available; Indisponible" with empty string
print("The NaN values in the English columns are \n")
df_en = df_en.fillna('')
@@ -58,7 +66,7 @@ def lambda_handler(event, context):
#print(duplicateRowsDF['features_properties_id'].unique())

# Save to temp folder, see https://iotespresso.com/temporary-storage-during-aws-lambda-runtime-python/
save_path = os.path.join(os.getcwd(), 'mydir', 'duplicateRowsDF')
save_path = os.path.join(os.getcwd(), 'duplicateRowsDF')
duplicateRowsDF.to_csv(save_path)
df_fetched= pd.read_csv(save_path)

3 changes: 2 additions & 1 deletion aws-lambda/Preprocessing_lambda/requirements.txt
@@ -2,4 +2,5 @@ pandas
nltk
pyarrow
fastparquet
requests
requests
urllib3<2
17 changes: 17 additions & 0 deletions aws-lambda/Preprocessing_lambda/zip-deployment.sh
@@ -0,0 +1,17 @@
#!/bin/bash
# Build the CloudFormation zip deployment package for the Similarity Engine data preprocessing Lambda
# Author: Xinli Cai
# Date: August 29, 2023


# Navigate to the directory containing `requirements.txt`
cd similarity-engine-data-process/

# Install required packages
pip install -t similarity-engine-data-preprocess-20230822-2200/ -r requirements.txt

# Change to the build directory
cd similarity-engine-data-preprocess-20230822-2200/

# Zip the necessary files
zip -r similarity-engine-data-preprocess-20230822-2200.zip ../app.py ../__init__.py ./*
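One way to publish the resulting archive, sketched here with boto3; the function name below is a placeholder, not something defined in this repository:

import boto3

# Hypothetical function name -- replace with the actual preprocessing Lambda
FUNCTION_NAME = "similarity-engine-data-preprocess"

client = boto3.client("lambda")
with open("similarity-engine-data-preprocess-20230822-2200.zip", "rb") as f:
    client.update_function_code(FunctionName=FUNCTION_NAME, ZipFile=f.read())

# Wait until the new code is active before invoking the function
client.get_waiter("function_updated").wait(FunctionName=FUNCTION_NAME)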
19 changes: 19 additions & 0 deletions aws-lambda/Word2Vec_lambda/Dockerfile
@@ -0,0 +1,19 @@
# Use a base image provided by AWS for Python 3.9 runtime
FROM public.ecr.aws/lambda/python:3.9-x86_64

# Set the working directory
WORKDIR /var/task

# Copy requirements.txt
COPY requirements.txt ./

# Copy function code
COPY app.py ./
COPY dynamodb.py ./

# Install the specified packages
RUN pip install -r requirements.txt

# Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile)
CMD [ "app.lambda_handler" ]
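# This Lambda is packaged as a container image here. Assuming the image has been built and pushed to ECR (not covered by this diff), pointing the function at the new image could look like the following boto3 sketch; the function name and image URI are placeholders:

import boto3

# Placeholder identifiers -- substitute the real function name and ECR image URI
FUNCTION_NAME = "similarity-engine-word2vec-model"
IMAGE_URI = "123456789012.dkr.ecr.ca-central-1.amazonaws.com/word2vec-lambda:latest"

client = boto3.client("lambda")
client.update_function_code(FunctionName=FUNCTION_NAME, ImageUri=IMAGE_URI)
client.get_waiter("function_updated").wait(FunctionName=FUNCTION_NAME)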

6 changes: 0 additions & 6 deletions aws-lambda/Word2Vec_lambda/README.md

This file was deleted.

97 changes: 38 additions & 59 deletions aws-lambda/Word2Vec_lambda/app.py
@@ -1,52 +1,54 @@

import boto3
import logging
from botocore.exceptions import ClientError
import pandas as pd


import numpy as np
import json
import datetime

import io
import os

from gensim.models import Word2Vec
from sklearn.metrics.pairwise import cosine_similarity
from gensim import matutils

from tqdm import tqdm
from dynamodb import *


#dev setting
# environment variables for lambda
file_name = os.environ['FILE_NAME']
file_name_origianl = os.environ['FILE_NAME_ORIGINAL']
bucket_name_nlp = os.environ['BUCKET_NAME_NLP']
bucket_name = os.environ['BUCKET_NAME']

"""
#dev setting -- comment out for release
file_name = "Processed_records.parquet"
bucket_name_nlp = "nlp-data-preprocessing"
file_name_origianl = "records.parquet"
bucket_name = "webpresence-geocore-geojson-to-parquet-dev"
"""

def lambda_handler(event, context):
"""
#Change directory to /tmp folder, this is required if new files are created for lambda
os.chdir('/tmp') #This is important
#Make a directory
if not os.path.exists(os.path.join('mydir')):
os.makedirs('mydir')

"""
# Read the preprocessed data from S3
df_en = open_S3_file_as_df(bucket_name_nlp, file_name)
try:
df_en = open_S3_file_as_df(bucket_name_nlp, file_name)
except ClientError as e:
print('Accessing S3 failed on line 47 when calling df_en = open_S3_file_as_df(bucket_name_nlp, file_name)')
print(e.response['Error']['Message'])

# Get a sample of 500 rows as the training data
df = df_en[['features_properties_id', 'features_properties_title_en', 'metadata_en_processed']]
#df = df.sample(n=500, random_state=1)
# Use all data to train the model
df.head()
print(df.shape)


# Use all data to train the model
df = df_en[['features_properties_id', 'features_properties_title_en', 'features_properties_title_fr','metadata_en_processed']]
print(f'The shape of the preprocessed df is {df.shape}')
# Replace the missing value in the 'features_properties_title_en' column with an empty string
df['features_properties_title_en'].fillna('', inplace=True)


# Prepare the input for the Word2Vec model
sentences = df['metadata_en_processed'].apply(lambda x: x.split(' ')).tolist()
@@ -58,55 +60,21 @@ def lambda_handler(event, context):

# Convert each sentence in 'metadata_preprocessed' into a vector
vectors = df['metadata_en_processed'].apply(sentence_to_vector, model=model)
# Replace the missing value in the 'features_properties_title_en' column with an empty string
df['features_properties_title_en'].fillna('', inplace=True)


# Calculate similarity between each vector and all others
similarity_matrix = cosine_similarity(np.array(vectors.tolist()))


# Initialize new columns for the top 5 similar texts
df['sim1'], df['sim2'], df['sim3'], df['sim4'], df['sim5'] = "", "", "", "", ""

# For each text, find the top 5 most similar texts and append their 'features_properties_title_en' as new columns
df.reset_index(drop=True, inplace=True)
for i in tqdm(range(similarity_matrix.shape[0])):
top_5_similar = np.argsort(-similarity_matrix[i, :])[1:6] # Exclude the text itself
df.loc[i, ['sim1', 'sim2', 'sim3', 'sim4', 'sim5']] = df.loc[top_5_similar, 'features_properties_title_en'].values

# Read the original parquet file and merge by features_properties_id
df_original = open_S3_file_as_df(bucket_name, file_name_origianl)
merged_df = df_original.merge(df[['features_properties_id', 'sim1', 'sim2', 'sim3', 'sim4', 'sim5']], on='features_properties_id', how='left')
""" Option 1: merge the similar results with records.parquet directly
# Initialize new columns for the top 10 similar texts
df['sim1'], df['sim2'], df['sim3'], df['sim4'], df['sim5'],df['sim6'], df['sim7'], df['sim8'], df['sim9'], df['sim10'] = "", "", "", "", "","", "", "", "", ""

# For each text, find the top 10 most similar texts and append their 'features_properties_title_en' as new columns
df.reset_index(drop=True, inplace=True)
for i in tqdm(range(similarity_matrix.shape[0])):
top_10_similar = np.argsort(-similarity_matrix[i, :])[1:11] # Exclude the text itself
df.loc[i, ['sim1', 'sim2', 'sim3', 'sim4', 'sim5','sim6', 'sim7', 'sim8', 'sim9', 'sim10']] = df.loc[top_10_similar, 'features_properties_id'].values
# Read the original parquet file and merge by features_properties_id
df_original = open_S3_file_as_df(bucket_name, file_name_origianl)
merged_df = df_original.merge(df[['features_properties_id', 'sim1', 'sim2', 'sim3', 'sim4', 'sim5','sim6', 'sim7', 'sim8', 'sim9', 'sim10']],
on='features_properties_id', how='left')
print(f'the shape of original parquet file is {df_original.shape}')
# Save to temp folder, see https://iotespresso.com/temporary-storage-during-aws-lambda-runtime-python/
save_path = os.path.join(os.getcwd(), 'mydir', 'merged_df')
merged_df.to_csv(save_path)
df_fetched= pd.read_csv(save_path)
print(f'the shape of merged parquet file is {merged_df.shape}')
# upload merged dataframe to S3
upload_dataframe_to_s3_as_parquet(df=df_fetched, bucket_name=bucket_name_nlp, file_key='sim_word2vec_records.parquet')
"""
# Upload the similarity results to an AWS DynamoDB table
"""
The parquet lambda function has been modified to merge the similarity table with records.parquet every time records.parquet is updated.
#Option 2: upload the similarity results to a DynamoDB table, and merge the table with records.parquet every time records.parquet is updated
"""
df['similarity'] = np.nan # Initialize the column
# For each text, find the top 10 most similar texts and save them as a JSON array object in the 'similarity' column
df.reset_index(drop=True, inplace=True)
for i in tqdm(range(similarity_matrix.shape[0])):
for i in range(similarity_matrix.shape[0]):
top_10_similar = np.argsort(-similarity_matrix[i, :])[1:11] # Exclude the text itself
sim_array = []
for j, idx in enumerate(top_10_similar):
@@ -137,7 +105,6 @@ def lambda_handler(event, context):
delete_table(TableName='similarity')
waiter = client.get_waiter('table_not_exists')
waiter.wait(TableName='similarity')
print('Before create')
except ClientError as e:
print(e)
#Create table
@@ -148,8 +115,20 @@
waiter.wait(TableName='similarity')
except ClientError as e:
print(e)

"""DEBUG
#Check if empty string in the primary key before scan the table
empty_string_rows = df[df['features_properties_id'] == '']
print(f'Number of NA values in the df id column is \n {empty_string_rows}')
"""
#Remove rows with an empty string in 'features_properties_id'; the primary key cannot be empty in a DynamoDB table
df_cleaned = df[df['features_properties_id']!='']
rows_removed = df.shape[0] - df_cleaned.shape[0]
print(f'Removed {rows_removed} rows with empty string in features_properties_id')
#Batch write to table
batch_write_items_into_table(df, TableName='similarity')
batch_write_items_into_table(df_cleaned, TableName='similarity')



# Function to read the parquet file as pandas dataframe
def open_S3_file_as_df(bucket_name, file_name):
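sentence_to_vector and batch_write_items_into_table are called above but defined outside the visible hunks (the latter comes from dynamodb.py, which is not shown in this commit). A plausible minimal sketch of both, assuming sentence_to_vector averages the Word2Vec vectors of in-vocabulary tokens and the batch write relies on boto3's batch_writer; the column and table names follow the code above, everything else is illustrative and may differ from the real helpers:

import boto3
import numpy as np
from gensim.models import Word2Vec

def sentence_to_vector(sentence, model: Word2Vec):
    # Average the vectors of tokens the model knows; zero vector if none are in vocabulary
    words = [w for w in sentence.split(' ') if w in model.wv]
    if not words:
        return np.zeros(model.vector_size)
    return np.mean([model.wv[w] for w in words], axis=0)

def batch_write_items_into_table(df, TableName='similarity'):
    # Write one item per row; batch_writer handles batching and retrying unprocessed items
    table = boto3.resource('dynamodb').Table(TableName)
    with table.batch_writer() as batch:
        for _, row in df.iterrows():
            batch.put_item(Item={
                'features_properties_id': row['features_properties_id'],
                'similarity': row['similarity'],
            })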
12 changes: 7 additions & 5 deletions aws-lambda/Word2Vec_lambda/requirements.txt
@@ -1,5 +1,7 @@
gensim==4.1.2
scikit-learn==1.0
pyarrow
fastparquet
tqdm
pandas
numpy
scikit-learn
transformers
torch
pyarrow
fastparquet
22 changes: 22 additions & 0 deletions aws-lambda/Word2Vec_lambda/zip-deployment.sh
@@ -0,0 +1,22 @@
#!/bin/bash
# Build the CloudFormation zip deployment package for the Similarity Engine Word2Vec Lambda
# Author: Xinli Cai
# Date: August 29, 2023


# Cloud Formation Package for Similarity Engine using Word2Vec

# Step 1: Navigate to the directory containing `requirements.txt`
cd similarity-engine-word2vec-model-dev/

# Step 2: Install the required packages
pip install -t similarity-engine-word2vec-model-build/ -r requirements.txt

# Step 3: Change to the build directory
cd similarity-engine-word2vec-model-build/

# Step 4: Zip the necessary files
zip -r similarity-engine-word2vec-model.zip ../app.py ../dynamodb.py ../__init__.py ./*

# Restore to the initial directory
cd ..
