# failure_detector.py
import socket
import sys
import logging
import time
import random
import math
import threading
import pickle
def getmid(mlist, i):
    """Return the member id string for entry ``i`` of the membership list.

    The id has the form ``'host/port/timestamp'``: host and port come from
    ``mlist.lst[i]`` and the timestamp from ``mlist.timestamps[i]``.
    """
    entry = mlist.lst[i]
    fields = (entry['host'], entry['port'], mlist.timestamps[i])
    return '%s/%d/%s' % fields
def getmel(mid):
    """Parse a ``'host/port[/...]'`` member id into a membership-list element.

    Returns a dict with the keys ``'host'`` (str) and ``'port'`` (int); any
    fields after the port (such as the timestamp) are ignored.
    """
    pieces = mid.split('/')
    member = {}
    member['host'] = pieces[0]
    member['port'] = int(pieces[1])
    return member
class FailureDetector:
    """Ping/ack failure detector that piggybacks membership changes.

    Every ping and ack carries the keys currently held in ``buffer_recent``.
    A key is ``'01_<member id>'`` for a failed or departed node and
    ``'10_<member id>'`` for a newly seen node, where a member id is the
    ``'host/port/timestamp'`` string produced by ``getmid``.  The value stored
    for a key is the number of messages it will still be piggybacked on.
    """

    def __init__(self, mlist, host, port, fail_queue):
        """Store the collaborators and start with an empty dissemination buffer.

        mlist      -- membership list object; this class uses its ``lst``,
                      ``timestamps`` and ``time`` attributes and its
                      ``add(member, timestamp)`` / ``remove(member)`` methods.
        host, port -- this node's own address, sent in every ping.
        fail_queue -- object with ``put()``; receives the id of every member
                      removed from the membership list.
        """
        self.buffer_recent = {}
        self.mlist = mlist
        self.timestamp = self.mlist.time
        self.host = host
        self.port = port
        self.fail_queue = fail_queue
        # Created here (not only in run()) so recv_ping() is usable before
        # the ping thread has been started.
        self.lock = threading.Lock()

    def form_piggyback_packet(self, func_identifier, msg_type):
        """Build ``'<msg_type>,<key>,<key>...'`` from the live buffer entries.

        Every key whose remaining count is positive is appended and its count
        is decremented by one.  ``func_identifier`` only tags the log lines.
        The caller is expected to hold the lock.
        """
        parts = [msg_type]
        # Iterate over a snapshot; the values are rewritten inside the loop.
        for key, val in list(self.buffer_recent.items()):
            if val > 0:
                parts.append(key)
                logging.info('%s Form package Fail/new node Information of %s',
                             func_identifier, key)
                self.buffer_recent[key] = val - 1
        return ','.join(parts)

    def update_buffer_list(self, func_identifier, address_id_list):
        """Add unseen keys to the buffer with a fresh dissemination count.

        The count is ``ceil(log2(n))`` for a membership list of ``n >= 2``
        entries and 1 otherwise.  Keys already in the buffer keep their
        current count.  The caller is expected to hold the lock.
        """
        size = len(self.mlist.lst)
        if size >= 2:
            dissemination_cnt = int(math.ceil(math.log(size, 2)))
        else:
            dissemination_cnt = 1
        for address_id in address_id_list:
            logging.info('%s Check Recent Buffer for %s',
                         func_identifier, address_id)
            if address_id not in self.buffer_recent:
                self.buffer_recent[address_id] = dissemination_cnt
                logging.info('%s Write to dictionary key %s value %s',
                             func_identifier, address_id, dissemination_cnt)

    def update_server_list(self):
        """Apply buffered failures/joins to the membership list, then prune.

        ``'01_<id>'`` removes the member (if present) and reports the id on
        ``fail_queue``; ``'10_<id>'`` adds the member (if absent).  Entries
        whose count has reached 0 are dropped from the buffer.  Keys with an
        unknown prefix are left to expire; keys whose member id cannot be
        parsed are discarded at once.  The caller is expected to hold the lock.
        """
        # Snapshot the items: entries are popped below, and resizing a dict
        # while iterating over it raises RuntimeError on Python 3.
        for key, val in list(self.buffer_recent.items()):
            # Split on the first '_' only, so an underscore inside the member
            # id (host name or timestamp) does not truncate it.
            prefix, _, member_id = key.partition('_')
            if prefix in ('01', '10'):
                try:
                    mel = getmel(member_id)
                except (IndexError, ValueError):
                    # Keys arrive over the network; one malformed entry must
                    # not kill the ping thread or be gossiped any further.
                    logging.warning('Dropping malformed buffer entry %s', key)
                    self.buffer_recent.pop(key, None)
                    continue
                if prefix == '01':
                    # Failed or departed node: remove it if still listed.
                    if mel in self.mlist.lst:
                        self.mlist.remove(mel)
                        self.fail_queue.put(member_id)
                        logging.info(
                            'Update membership list with removal of%s',
                            member_id)
                elif mel not in self.mlist.lst:
                    # New node: the id's last field is its timestamp.
                    self.mlist.add(mel, member_id.split('/')[-1])
                    logging.info(
                        'Update membership list with addition of%s',
                        member_id)
            # Garbage collection for fully disseminated entries.
            if val == 0:
                self.buffer_recent.pop(key, None)

    def send_ping(self, lock):
        """Ping loop; never returns.  Intended as the target of a thread.

        ``lock`` guards ``buffer_recent`` and the membership list.
        """
        while True:
            if not self._ping_round(lock):
                # Nobody to ping (empty list, or every member already marked
                # failed): wait instead of spinning on the lock.
                time.sleep(0.120)

    def _ping_round(self, lock):
        """Apply pending updates, then ping each live member once in random
        order.  Returns the number of members that were pinged."""
        with lock:
            self.update_server_list()
            addresses = [getmid(self.mlist, idx)
                         for idx in range(len(self.mlist.lst))]
        random.shuffle(addresses)
        size_mlist = len(addresses)
        if size_mlist > 0:
            # Per-ping ack timeout shrinks as the membership list grows.
            swim_timeout = 2.8 / (2 * size_mlist - 1)
        else:
            swim_timeout = 0.120
        pinged = 0
        for address in addresses:
            # Do not ping a node that is already recorded as failed.
            with lock:
                already_failed = ('01_' + address) in self.buffer_recent
            if already_failed:
                continue
            self._ping_member(address, swim_timeout, lock)
            pinged += 1
        return pinged

    def _ping_member(self, address, swim_timeout, lock):
        """Send one UDP ping to ``address`` ('host/port/timestamp') and record
        the member as failed if no reply arrives within ``swim_timeout``."""
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        except socket.error as err_msg:
            # No socket was created, so there is nothing to close.
            logging.error("Socket Error")
            logging.exception(err_msg)
            return
        try:
            sock.settimeout(swim_timeout)
            with lock:
                ping_message = self.form_piggyback_packet('send_ping', 'p')
            addr = address.split('/')
            # NOTE(review): the peer presumably unpickles this datagram;
            # pickle.loads on network input is unsafe -- confirm the receiver.
            pkt = pickle.dumps({
                'cmd': 'ping',
                'data': ping_message,
                'sender_host': self.host,
                'sender_port': self.port,
                'sender_timestamp': self.timestamp,
            })
            sock.sendto(pkt, (addr[0], int(addr[1])))
            try:
                # Any reply counts as the ack; its payload is not inspected.
                sock.recvfrom(8192)
            except socket.timeout:
                logging.info('ACK not received within timeout from node : '
                             + address)
                with lock:
                    self.update_buffer_list('send_ping', ['01_' + address])
        except (socket.error, socket.gaierror) as err_msg:
            logging.error("Socket Error")
            logging.exception(err_msg)
        finally:
            sock.close()

    def recv_ping(self, buf, sock, sender, sender_id):
        """Acknowledge a received ping and absorb its piggybacked keys.

        buf       -- the ping's comma-separated data string; the first field
                     is the message type and the rest are buffer keys.
        sock      -- socket used to send the ack.
        sender    -- address handed to ``sock.sendto`` for the ack.
        sender_id -- the sender's member id ('host/port/timestamp').
        """
        lock = self.lock
        with lock:
            ack_message = self.form_piggyback_packet('recv_ping', 'a')
        # sendto() needs bytes on Python 3; on Python 2 str already is bytes.
        if not isinstance(ack_message, bytes):
            ack_message = ack_message.encode('utf-8')
        sock.sendto(ack_message, sender)
        data = buf.split(',')
        with lock:
            # A ping from a node missing from the list announces a new member.
            smel = getmel(sender_id)
            if smel not in self.mlist.lst:
                data.append('10_' + sender_id)
            self.update_buffer_list('recv_ping', data[1:])

    def run(self):
        """Start the ping loop in a daemon thread and return immediately."""
        try:
            # Reuse the instance lock so a repeated run() never swaps the lock
            # out from under an already running thread or recv_ping().
            lock = self.lock
            ping_thread = threading.Thread(target=self.send_ping,
                                           args=(lock,))
            ping_thread.daemon = True
            ping_thread.start()
        except (KeyboardInterrupt, SystemExit):
            print("exiting all threads and main program")
# Script entry point: set up file logging, then start the failure detector.
if __name__ == "__main__":
    FORMAT = '%(asctime)-15s %(message)s'
    logging.basicConfig(
        level=logging.INFO,
        filename="faildetector.log",
        filemode="w",
        format=FORMAT,
    )
    # NOTE(review): FailureDetector.__init__ takes (mlist, host, port,
    # fail_queue); called with no arguments, as here, it raises TypeError.
    # The membership list type is not defined in this file -- confirm how
    # this script is meant to be wired up.
    fail_detect = FailureDetector()
    # NOTE(review): run() only starts a daemon thread and returns, so the
    # script would exit right after this call -- confirm intent.
    fail_detect.run()