-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
87 lines (75 loc) · 2.99 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import os
import json
import urllib.request
import boto3
from datetime import datetime, timedelta, timezone
def format_game_data(game):
status = game.get("Status", "Unknown")
away_team = game.get("AwayTeam", "Unknown")
home_team = game.get("HomeTeam", "Unknown")
final_score = f"{game.get('AwayTeamScore', 'N/A')}-{game.get('HomeTeamScore', 'N/A')}"
start_time = game.get("DateTime", "Unknown")
channel = game.get("Channel", "Unknown")
quarters = game.get("Quarters", [])
quarter_scores = ', '.join([f"Q{q['Number']}: {q.get('AwayScore', 'N/A')}-{q.get('HomeScore', 'N/A')}" for q in quarters])
if status == "Final":
return (
f"Game Status: {status}\n"
f"{away_team} vs {home_team}\n"
f"Final Score: {final_score}\n"
f"Start Time: {start_time}\n"
f"Channel: {channel}\n"
f"Quarter Scores: {quarter_scores}\n"
)
elif status == "InProgress":
last_play = game.get("LastPlay", "N/A")
return (
f"Game Status: {status}\n"
f"{away_team} vs {home_team}\n"
f"Current Score: {final_score}\n"
f"Last Play: {last_play}\n"
f"Channel: {channel}\n"
)
elif status == "Scheduled":
return (
f"Game Status: {status}\n"
f"{away_team} vs {home_team}\n"
f"Start Time: {start_time}\n"
f"Channel: {channel}\n"
)
else:
return (
f"Game Status: {status}\n"
f"{away_team} vs {home_team}\n"
f"Details are unavailable at the moment.\n"
)
def lambda_handler(event, context):
api_key = os.getenv("NBA_API_KEY")
sns_topic_arn = os.getenv("SNS_TOPIC_ARN")
sns_client = boto3.client("sns")
utc_now = datetime.now(timezone.utc)
east_african_time = utc_now + timedelta(hours=3)
today_date = east_african_time.strftime("%Y-%m-%d")
print(f"Fetching games for date: {today_date}")
api_url = f"https://api.sportsdata.io/v3/nba/scores/json/GamesByDate/{today_date}?key={api_key}"
print(today_date)
try:
with urllib.request.urlopen(api_url) as response:
data = json.loads(response.read().decode())
print(json.dumps(data, indent=4))
except Exception as e:
print(f"Error fetching data from API: {e}")
return {"statusCode": 500, "body": "Error fetching data"}
messages = [format_game_data(game) for game in data]
final_message = "\n---\n".join(messages) if messages else "No games available for today."
try:
sns_client.publish(
TopicArn=sns_topic_arn,
Message=final_message,
Subject="NBA Game Updates"
)
print("Message published to SNS successfully.")
except Exception as e:
print(f"Error publishing to SNS: {e}")
return {"statusCode": 500, "body": "Error publishing to SNS"}
return {"statusCode": 200, "body": "Data processed and sent to SNS"}