-
-
Notifications
You must be signed in to change notification settings - Fork 2
/
spot.py
100 lines (85 loc) · 3.13 KB
/
spot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#!/bin/env python3
import os
import logging
from time import time, sleep
from requests import get
from slack_sdk import WebClient
# Configure logging
logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
class Spot(object):
"""
A class to monitor AWS EC2 Spot Instance interruptions and notify via Slack.
Attributes:
SLACK_API_TOKEN (str): Slack API token for authentication.
SLACK_CHANNEL (str): Slack channel ID where notifications will be sent.
EC2_META_DATA (str): URL to fetch EC2 instance metadata.
SPOT_META_URL (str): URL to check for EC2 Spot Instance termination notices.
SLEEP (int): Time in seconds to wait between checks for termination notices.
CLUSTER (str): Name of the cluster the instance belongs to.
"""
def __init__(self):
"""
Initializes the Spot instance with environment variables and constants.
"""
self.SLACK_API_TOKEN = os.environ.get('SLACK_API_TOKEN', None)
self.SLACK_CHANNEL = os.environ.get('SLACK_CHANNEL', None)
self.EC2_META_DATA = 'http://169.254.169.254/latest/dynamic/instance-identity/document/'
self.SPOT_META_URL = 'http://169.254.169.254/latest/meta-data/spot/termination-time'
self.SLEEP = 5
self.CLUSTER = os.environ.get('CLUSTER', None)
def instance_details(self):
"""
Fetches and returns the EC2 instance details.
Returns:
dict: A dictionary containing the EC2 instance details.
"""
return get(self.EC2_META_DATA, timeout=3).json()
def payload(self, m):
"""
Constructs the payload to be sent to Slack.
Args:
m (str): The message to be included in the payload.
Returns:
list: A list containing the payload dictionary.
"""
details = self.instance_details()
CLUSTER = 'Default' if self.CLUSTER is None else self.CLUSTER
return [{
"fallback": m,
"color": "#a30b24",
"pretext": "",
"author_name": "Mr Bot",
"author_icon": "http://ohai.mr-bot.co/assets/mrbot-500-5a2319d6ea6fa0362f73f3334805e012.png",
"title": "Spot Instance Termination Notice",
"title_link": "https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/spot-interruptions.html",
"text": "Cluster: {}, instanceId: {}, accountId: {}, AZ: {}, instanceType: {}".format(CLUSTER, details['instanceId'], details['accountId'], details['availabilityZone'], details['instanceType']),
"fields": [{
"title": "Priority",
"value": "High",
"short": "false"
}],
"footer": "SpotInstaceWatcher",
"ts": time()
}]
def slackit(self):
"""
Sends a notification to Slack using the constructed payload.
"""
slack = WebClient(token=self.SLACK_API_TOKEN)
slack.api_call(
"chat.postMessage",
channel=self.SLACK_CHANNEL,
attachments=self.payload('terminated!')
)
def watcher(self):
"""
Monitors for EC2 Spot Instance termination notices and sends notifications.
"""
while get(self.SPOT_META_URL).status_code != 200:
logging.info(f"Instance {self.instance_details().get('instanceId')} still alive, looping ...")
sleep(self.SLEEP)
self.slackit()
if __name__ == '__main__':
spot = Spot()
spot.watcher()