Skip to content

Commit

Permalink
added aws lambdas and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
minimike86 committed Dec 1, 2024
1 parent 0a1c447 commit bd700ab
Show file tree
Hide file tree
Showing 4 changed files with 1,156 additions and 51 deletions.
86 changes: 59 additions & 27 deletions aws/lambda/refresh_twitch_access_token.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
import json
import logging
import os
from typing import Tuple

import boto3
import urllib3
import requests
from botocore.exceptions import ClientError
from urllib3 import encode_multipart_formdata

logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)-8s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)


def get_parameter(parameter_name):
ssm = boto3.client("ssm")
Expand All @@ -19,22 +29,22 @@ def get_secret(parameter):
raise


def refresh_token(_refresh_token):
def refresh_token(_refresh_token: str) -> Tuple[bool, any]:
# Retrieve client id from AWS Parameter Store
# Retrieve client secret from AWS Parameter Store
# TODO: Don't hardcode secret name arn needs to fetch from env_var
client_id_param_arn = (
"arn:aws:ssm:eu-west-2:339713094915:parameter/twitch/client_id"
)
client_secret_param_arn = (
"arn:aws:ssm:eu-west-2:339713094915:parameter/twitch/client_secret"
)
try:
client_id = get_secret(
"arn:aws:ssm:eu-west-2:339713094915:parameter/twitch/client_id"
) # TODO: Don't hardcode secret name arn
client_secret = get_secret(
"arn:aws:ssm:eu-west-2:339713094915:parameter/twitch/client_secret"
) # TODO: Don't hardcode secret name arn
except Exception as e:
return {
"statusCode": 500,
"body": json.dumps({"message": f"Error retrieving secret: {str(e)}"}),
}
http = urllib3.PoolManager()
client_id = get_secret(client_id_param_arn)
client_secret = get_secret(client_secret_param_arn)
except Exception as exception:
raise exception

try:
url = "https://id.twitch.tv/oauth2/token"
data = {
Expand All @@ -45,28 +55,41 @@ def refresh_token(_refresh_token):
}
encoded_data = encode_multipart_formdata(data)[1]
headers = {"Content-Type": "application/x-www-form-urlencoded}"}
response = http.request("POST", url, body=encoded_data, headers=headers)
response = requests.post(url, data=encoded_data, headers=headers)

# Check if request was successful
if response.status == 200:
return True, json.loads(response.data.decode("utf-8"))
if response.status_code == 200:
return True, json.loads(response.json())
else:
return False, None
finally:
http.clear()
return False, json.loads(response.json())

except Exception as exception:
raise exception


def store_in_dynamodb(_refresh_token, refresh_response):
"""
Stores or updates the refresh and access tokens in DynamoDB.
Parameters:
- _refresh_token: The current refresh token held by the system.
- refresh_response: The result of a web request for a new token, containing the access token,
refresh token, scope, and token type.
Returns:
- A dictionary with the HTTP status code and a message about the operation.
"""
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("MSecBot_User") # TODO: Don't hardcode dynamodb.Table name
table_name = os.getenv("DYNAMODB_USER_TABLE_NAME")
table = dynamodb.Table(table_name)

try:
# Check if the item exists
response = table.get_item(Key={"refresh_token": _refresh_token})
if "Item" in response:
get_item_outcome = table.get_item(Key={"refresh_token": _refresh_token})
if "Item" in get_item_outcome:
# Item exists, update it
table.update_item(
Key={"refresh_token": _refresh_token},
Key={"id": int(get_item_outcome["Item"]["id"])},
UpdateExpression="set access_token=:a, refresh_token=:r, scope=:s, token_type=:t",
ExpressionAttributeValues={
":a": refresh_response.get("access_token"),
Expand Down Expand Up @@ -103,10 +126,11 @@ def store_in_dynamodb(_refresh_token, refresh_response):
}
)
return {"statusCode": 200, "body": json.dumps("Token stored successfully!")}
except ClientError as e:
except ClientError as client_error:
logger.error(f"[client_error]: {client_error.response['Error']['Message']}")
return {
"statusCode": 500,
"body": json.dumps(f'Error: {e.response["Error"]["Message"]}'),
"body": json.dumps(f'Error: {client_error.response["Error"]["Message"]}'),
}


Expand All @@ -122,7 +146,15 @@ def lambda_handler(event, _context):
}

# Validate access token
is_refreshed, refresh_response = _refresh_token(refresh_token)
try:
is_refreshed, refresh_response = refresh_token(_refresh_token)
except Exception as exception:
return {
"statusCode": 500,
"body": json.dumps(
{"message": f"Error retrieving secret: {str(exception)}"}
),
}

if is_refreshed:
# Update user refresh token in DynamoDB
Expand Down
66 changes: 42 additions & 24 deletions aws/lambda/store_oauth_authorize_code.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import json
import os

import boto3
import urllib3
import requests
from botocore.exceptions import ClientError
from moto.ssm.exceptions import ParameterNotFound


def get_parameter(parameter_name):
Expand All @@ -12,24 +14,24 @@ def get_parameter(parameter_name):


def validate_token(access_token):
http = urllib3.PoolManager()
try:
url = "https://id.twitch.tv/oauth2/validate"
headers = {"Authorization": f"OAuth {access_token}"}
response = http.request("GET", url, headers=headers)
response = requests.get(url=url, headers=headers)

# Check if request was successful
if response.status == 200:
return True, json.loads(response.data.decode("utf-8"))
if response.status_code == 200:
return True, response.json()
else:
return False, None
finally:
http.clear()
return False, response.json()
except Exception as exc_info:
return False, {"status": 500, "message": str(exc_info)}


def store_in_dynamodb(token_response, validation_response):
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("MSecBot_User") # TODO: Don't hardcode dynamodb.Table name
table_name = os.getenv("DYNAMODB_USER_TABLE_NAME")
table = dynamodb.Table(table_name)

try:
# Check if the item exists
Expand Down Expand Up @@ -66,10 +68,16 @@ def store_in_dynamodb(token_response, validation_response):
"scopes": validation_response.get("scopes"),
}
)
except ClientError as e:

except TypeError as type_error:
return {
"statusCode": 500,
"body": json.dumps(f"Error: {type_error}"),
}
except ClientError as client_error:
return {
"statusCode": 500,
"body": json.dumps(f'Error: {e.response["Error"]["Message"]}'),
"body": json.dumps(f'Error: {client_error.response["Error"]["Message"]}'),
}


Expand Down Expand Up @@ -100,21 +108,32 @@ def lambda_handler(event, _context):
client_secret = get_secret(
"arn:aws:ssm:eu-west-2:339713094915:parameter/twitch/client_secret"
) # TODO: Don't hardcode secret name arn
except Exception as e:
except ParameterNotFound as parameter_not_found:
return {
"statusCode": 500,
"body": json.dumps(
{"message": f"Error retrieving secret: {str(parameter_not_found)}"}
),
}
except Exception as exception:
return {
"statusCode": 500,
"body": json.dumps({"message": f"Error retrieving secret: {str(e)}"}),
"body": json.dumps(
{"message": f"Error retrieving secret: {str(exception)}"}
),
}

# Retrieve redirect_uri from AWS Parameter Store
try:
redirect_uri = get_parameter(
"arn:aws:ssm:eu-west-2:339713094915:parameter/twitch/oath2/redirect_url"
) # TODO: Don't hardcode twitch/oath2/redirect_url arn
except Exception as e:
except Exception as exception:
return {
"statusCode": 500,
"body": json.dumps({"message": f"Error retrieving redirect uri: {str(e)}"}),
"body": json.dumps(
{"message": f"Error retrieving redirect uri: {str(exception)}"}
),
}

# Define parameters for POST request
Expand All @@ -128,16 +147,15 @@ def lambda_handler(event, _context):
encoded_params = json.dumps(params).encode("utf-8")

# Make POST request to Twitch API
http = urllib3.PoolManager()
try:
url = "https://id.twitch.tv/oauth2/token"
headers = {"Content-Type": "application/json"}
response = http.request("POST", url, body=encoded_params, headers=headers)
response = requests.post(url=url, json=encoded_params, headers=headers)

# Check if request was successful
if response.status == 200:
# Parse the JSON response
token_response = json.loads(response.data.decode("utf-8"))
if response.status_code == 200:
# Parse the JSON response=
token_response = json.loads(response.json().decode("utf-8"))

# Check received token is valid and grab extra metadata
is_valid, validation_response = validate_token(
Expand All @@ -148,7 +166,7 @@ def lambda_handler(event, _context):
# Store validation response in DynamoDB
store_in_dynamodb(token_response, validation_response)

return {"statusCode": 200, "body": response.data.decode("utf-8")}
return {"statusCode": 200, "body": json.dumps(token_response)}

else:
return {
Expand All @@ -158,9 +176,9 @@ def lambda_handler(event, _context):

else:
return {
"statusCode": response.status,
"statusCode": response.status_code,
"body": json.dumps({"message": "Failed to retrieve access token"}),
}

finally:
http.clear()
except Exception as exception:
return {"statusCode": 500, "body": json.dumps({"error": str(exception)})}
Loading

0 comments on commit bd700ab

Please sign in to comment.