Skip to content

Commit

Permalink
moving asyncserver to tornado
Browse files Browse the repository at this point in the history
  • Loading branch information
Fabio Gibson committed May 16, 2017
1 parent 818cc92 commit 7e38b88
Show file tree
Hide file tree
Showing 14 changed files with 254 additions and 408 deletions.
4 changes: 1 addition & 3 deletions livesync/asyncserver/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,2 @@
# This code was originally written by Johan Hanssen Seferidis
# available at https://github.com/Pithikos/python-websocket-server
from .server import WebsocketServer
from .server import LiveSyncSocketServer
from .hub import Hub
6 changes: 1 addition & 5 deletions livesync/asyncserver/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,10 @@


def dispatch(event):
uri = "ws://{host}:{port}".format(
uri = "ws://localhost:{port}".format(
host=settings.DJANGO_LIVESYNC['HOST'],
port=settings.DJANGO_LIVESYNC['PORT'])

connection = create_connection(uri)
connection.send(json.dumps(EVENTS[event]))
connection.close()


def dispatch_async(event):
Timer(1, dispatch, args=(event,)).start()
208 changes: 41 additions & 167 deletions livesync/asyncserver/handler.py
Original file line number Diff line number Diff line change
@@ -1,180 +1,54 @@
import re
import sys
import struct
from base64 import b64encode
from hashlib import sha1
from .utils import try_decode_UTF8, encode_to_UTF8
import uuid
import json
from tornado.websocket import WebSocketHandler
from .hub import Hub


if sys.version_info[0] < 3:
from SocketServer import StreamRequestHandler
else:
from socketserver import StreamRequestHandler
class LiveSyncSocketHandler(WebSocketHandler):
current_url = None
session_id = uuid.uuid4()

def __init__(self, *args, **kwargs):
super(LiveSyncSocketHandler, self).__init__(*args, **kwargs)
self.id = None

FIN = 0x80
OPCODE = 0x0f
MASKED = 0x80
PAYLOAD_LEN = 0x7f
PAYLOAD_LEN_EXT16 = 0x7e
PAYLOAD_LEN_EXT64 = 0x7f
def check_origin(self, origin):
return True

OPCODE_CONTINUATION = 0x0
OPCODE_TEXT = 0x1
OPCODE_BINARY = 0x2
OPCODE_CLOSE_CONN = 0x8
OPCODE_PING = 0x9
OPCODE_PONG = 0xA
@property
def url(self):
return self.request.headers.get("Origin", "") + '/'

def open(self):
self.id = self.get_query_argument('client_id', uuid.uuid4())
Hub.register(self)

class WebSocketHandler(StreamRequestHandler):
def __init__(self, socket, addr, server):
self.keep_alive = True
self.valid_client = False
self.handshake_done = False
self.handlers = {
OPCODE_TEXT: self.receive_message,
OPCODE_PING: self.send_pong,
OPCODE_PONG: self.receive_pong,
}
StreamRequestHandler.__init__(self, socket, addr, server)
if str(self.session_id) != self.get_query_argument('session_id', None):
self.write_message(json.dumps({
'action': 'welcome',
'payload': {
'client_id': str(self.id),
'session_id': str(self.session_id),
'current_url': self.current_url,
},
}))
else:
self.write_message(json.dumps({
'action': 'rejoin',
'payload': {},
}))

def handle(self):
while self.keep_alive:
if not self.handshake_done:
self.handshake_done = handshake(self.request)
self.valid_client = True
Hub.register(self)
elif self.valid_client:
self.read_next_message()
if not LiveSyncSocketHandler.current_url:
LiveSyncSocketHandler.current_url = self.url

def read_next_message(self):
try:
b1, b2 = read_bytes(self.rfile, 2)
except ValueError:
b1, b2 = 0, 0
def on_connection_close(self):
Hub.remove(self.id)

opcode = b1 & OPCODE
masked = b2 & MASKED
payload_length = b2 & PAYLOAD_LEN
def on_message(self, message):
message_json = json.loads(message)
action = message_json.get('action')

if not b1 or opcode == OPCODE_CLOSE_CONN or not masked:
self.keep_alive = 0
return
if action == "redirect":
LiveSyncSocketHandler.current_url = message_json.get('url')

opcode_handler = self.handlers.get(opcode)
decoded = decode_message(self.rfile, payload_length)
opcode_handler(decoded)

def receive_pong(self, msg):
pass

def receive_message(self, message):
Hub.echo(sender=self, msg=message)

def send_pong(self, message):
self.send_text(message, OPCODE_PONG)

def send_text(self, message, opcode=OPCODE_TEXT):
payload = create_payload(message)

if not payload:
return

header = create_header(len(payload), opcode)
self.request.send(header + payload)

def finish(self):
Hub.remove(self)


def handshake(request):
message = request.recv(1024).decode().strip()
upgrade = re.search(r'\nupgrade[\s]*:[\s]*websocket', message.lower())

if not upgrade:
return False

key = re.search(r'\n[sS]ec-[wW]eb[sS]ocket-[kK]ey[\s]*:[\s]*(.*)\r\n', message)

if key:
key = key.group(1)
else:
return False

response = make_handshake_response(key)
return request.send(response.encode())


def create_payload(message):
# Validate message
if isinstance(message, bytes):
message = try_decode_UTF8(message) # this is slower but ensures we have UTF-8
if not message:
return
elif isinstance(message, str) or isinstance(message, unicode):
pass
else:
return

return encode_to_UTF8(message)


def create_header(payload_length, opcode):
header = bytearray()
header.append(FIN | opcode)

# Normal payload
if payload_length <= 125:
header.append(payload_length)
# Extended payload
elif payload_length >= 126 and payload_length <= 65535:
header.append(PAYLOAD_LEN_EXT16)
header.extend(struct.pack(">H", payload_length))
# Huge extended payload
elif payload_length < 18446744073709551616:
header.append(PAYLOAD_LEN_EXT64)
header.extend(struct.pack(">Q", payload_length))
else:
raise Exception("Message is too big. Consider breaking it into chunks.")

return header


def decode_message(rfile, payload_length):
if payload_length == 126:
payload_length = struct.unpack(">H", rfile.read(2))[0]

elif payload_length == 127:
payload_length = struct.unpack(">Q", rfile.read(8))[0]

masks = read_bytes(rfile, 4)
decoded = ""

for char in read_bytes(rfile, payload_length):
char ^= masks[len(decoded) % 4]
decoded += chr(char)

return decoded


def read_bytes(rfile, num):
_bytes = rfile.read(num)
return _bytes if sys.version_info[0] > 2 else [ord(b) for b in _bytes]


def make_handshake_response(key):
return \
'HTTP/1.1 101 Switching Protocols\r\n'\
'Upgrade: websocket\r\n' \
'Connection: Upgrade\r\n' \
'Sec-WebSocket-Accept: %s\r\n' \
'\r\n' % calculate_response_key(key)


def calculate_response_key(key):
uid = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
hash_key = sha1(key.encode() + uid.encode())
response_key = b64encode(hash_key.digest()).strip()
return response_key.decode('ASCII')
Hub.echo(self.id, message)
22 changes: 13 additions & 9 deletions livesync/asyncserver/hub.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
class Hub(object):
clients = set()
@staticmethod
def clients():
if not hasattr(Hub, '_clients'):
Hub._clients = dict()
return Hub._clients

@staticmethod
def register(client):
Hub.clients.add(client)
Hub.clients()[client.id] = client

@staticmethod
def remove(client):
if client in Hub.clients:
Hub.clients.remove(client)
def remove(client_id):
if client_id in Hub.clients():
del Hub.clients()[client_id]

@staticmethod
def echo(sender, msg):
for client in Hub.clients:
if client != sender:
client.send_text(msg)
def echo(sender_id, msg):
for client in Hub.clients().values():
if client.id != sender_id:
client.write_message(msg)
60 changes: 27 additions & 33 deletions livesync/asyncserver/server.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,35 @@
import sys
from livesync.asyncserver.handler import WebSocketHandler
from tornado.web import Application
from tornado.ioloop import IOLoop
from .handler import LiveSyncSocketHandler
from .dispatcher import dispatch
from socket import error
import time

if sys.version_info[0] < 3:
from SocketServer import ThreadingMixIn, TCPServer
else:
from socketserver import ThreadingMixIn, TCPServer

class LiveSyncSocketServer(Application):
def __init__(self,port=9001):
self.port = port
super(LiveSyncSocketServer, self).__init__([(r"/", LiveSyncSocketHandler)])

class WebsocketServer(ThreadingMixIn, TCPServer):
"""
A websocket server waiting for clients to connect.
def server_close(self):
IOLoop.instance().stop()
IOLoop.clear_instance()

Args:
port(int): Port to bind to
host(str): Hostname or IP to listen for connections. By default 127.0.0.1
is being used. To accept connections from any client, you should use
0.0.0.0.
Properties:
clients(list): A list of connected clients. A client is a dictionary
like below.
{
'id' : id,
'handler' : handler,
'address' : (addr, port)
}
"""
allow_reuse_address = True
daemon_threads = True # comment to keep threads alive until finished

def __init__(self, port=9001, host='127.0.0.1'):
TCPServer.__init__(self, (host, port), WebSocketHandler)

def run_forever(self):
def start(self, started_event=None):
try:
self.serve_forever()
self.listen(self.port)
if started_event:
# inform the main thread the server could be started.
started_event.set()
IOLoop.instance().start()
except KeyboardInterrupt:
self.server_close()
except:
exit(1)
sys.exit(0)
except error as err:
if err.errno == 98:
time.sleep(1)
dispatch('refresh')
sys.exit(0)
else:
sys.exit(1)
12 changes: 0 additions & 12 deletions livesync/asyncserver/utils.py

This file was deleted.

Loading

0 comments on commit 7e38b88

Please sign in to comment.