Skip to content

Commit

Permalink
Introduce protocol versioning (#29)
Browse files Browse the repository at this point in the history
Introduce protocol versioning and update to protocol v3
  • Loading branch information
papr authored and mkassner committed May 19, 2017
1 parent 799e184 commit 6ae5ec0
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 60 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ __pycache__/
# C extensions
*.so
*.c
*.cpp

# Distribution / packaging
.Python
Expand Down
22 changes: 12 additions & 10 deletions examples/uvc-ndsi-bridge-host.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import struct
import time

from pyre import Pyre, PyreEvent
from pyre import Pyre
import uvc
import hashlib

Expand All @@ -20,6 +20,8 @@

sequence_limit = 2**32-1

GROUP = 'pupil-mobile-v3'


def has_data(socket):
return socket.get(zmq.EVENTS) & zmq.POLLIN
Expand All @@ -39,6 +41,7 @@ def __init__(self, uvc_id):

# init pyre
self.network = Pyre(socket.gethostname()+self.cap.name[-4:])
self.network.join(GROUP)
self.network.start()
logger.info('Bridging under "{}"'.format(self.network.name()))

Expand All @@ -52,7 +55,7 @@ def __init__(self, uvc_id):

def loop(self):
logger.info('Entering bridging loop...')
self.network.shout('pupil-mobile', self.sensor_attach_json().encode())
self.network.shout(GROUP, self.sensor_attach_json().encode())
try:
while True:
self.poll_network()
Expand All @@ -65,36 +68,35 @@ def loop(self):
import traceback
traceback.print_exc()
finally:
self.network.shout('pupil-mobile', json.dumps({
self.network.shout(GROUP, json.dumps({
'subject': 'detach',
'sensor_uuid': self.network.uuid().hex
}).encode())
logger.info('Leaving bridging loop...')

def publish_frame(self):
frame = self.cap.get_frame_robust()
now = int(time.time()*1000000)
now = time.time()
index = self.data_seq
self.data_seq += 1
self.data_seq %= sequence_limit

jpeg_buffer = frame.jpeg_buffer
m = hashlib.md5(jpeg_buffer)
lower_end = int(m.hexdigest(), 16) % 0x100000000
meta_data = struct.pack('<LLLLQLL', 0x10, frame.width, frame.height, index, now, jpeg_buffer.size, lower_end)
meta_data = struct.pack('<LLLLdLL', 0x10, frame.width, frame.height, index, now, jpeg_buffer.size, lower_end)
self.data.send_multipart([self.network.uuid().hex.encode(), meta_data, jpeg_buffer])

def poll_network(self):
while has_data(self.network.socket()):
event = PyreEvent(self.network)
if event.type == 'JOIN' and event.group == 'pupil-mobile':
for event in self.network.recent_events():
if event.type == 'JOIN' and event.group == GROUP:
self.network.whisper(event.peer_uuid, self.sensor_attach_json().encode())

def poll_cmd_socket(self):
while has_data(self.cmd):
sensor, cmd_str = self.cmd.recv_multipart()
try:
cmd = json.loads(cmd_str)
cmd = json.loads(cmd_str.decode())
except Exception as e:
logger.debug('Could not parse received cmd: {}'.format(cmd_str))
else:
Expand Down Expand Up @@ -155,7 +157,7 @@ def frame_size_control_json(self):
"readonly": False,
"map": [{
'value': idx,
'caption': '{:i}x{:i}'.format(*fs)
'caption': '{:d}x{:d}'.format(*fs)
} for idx, fs in enumerate(self.cap.frame_sizes)]
}
})
Expand Down
60 changes: 26 additions & 34 deletions NDSI-CommSpecs.md → ndsi-commspec.md
Original file line number Diff line number Diff line change
@@ -1,55 +1,47 @@
# Network Device Sensor Interface Protocol Specification v2.13
# Network Device Sensor Interface Protocol Specification

Status: draft
Protocol version: v3
Protocol status: draft


Network Device Sensor Interface protocol specifies the communication
between *Pupil Mobile* and the *Network Device Sensor Interface*.
between a set of hosts that provide sensor information to a set of clients.
Examples for these include [Pupil Mobile](https://github.com/pupil-labs/pupil-mobile-app) as host and [Pupil Capture](https://github.com/pupil-labs/pupil) as client.


The key words "MUST", "MUST NOT", "REQUIRED", "SHALL", "SHALL NOT", "SHOULD", "SHOULD NOT", "RECOMMENDED", "MAY", and "OPTIONAL" in this document are to be interpreted as described in [RFC 2119](https://tools.ietf.org/html/rfc2119).

## Control

### Host vs Clients
### Actors

NDSI actors (**Hosts** and **Clients**) find each other using the [ZeroMQ Realtime Exchange Protocol](https://rfc.zeromq.org/spec:36/ZRE). We recommend the usage of existing libraries (e.g. [zyre](https://github.com/zeromq/zyre), [Pyre](https://github.com/zeromq/pyre)) that implement the ZRE protocol. See the protocol spec for definitons of SHOUT, WHISPER, and join.

All actors MUST join the ZRE group `pupil-mobile-v3` -- hereinafter referred to as _GROUP_.

**Hosts** (e.g. Android app):

- **Hosts** SHOULD NOT join `pupil-mobile`.
- **Hosts** MUST SHOUT `<attach>` and `<detach>` notifications to
`pupil-mobile`.
- **Hosts** MUST WHISPER all currently available `sensor`s as a series
of `<attach>` notifications when a **client** joins `pupil-mobile`.
- **Hosts** SHOULD ignore incoming SHOUT and WHISPER messages.
- **Hosts** MUST SHOUT `<attach>` and `<detach>` notifications to the GROUP when sensors become available or unavailable respectively.
- **Hosts** MUST WHISPER all currently available `sensor`s as a series of `<attach>` notifications when a **client** joins the GROUP.
- **Hosts** MUST open at least one socket for each following type:
- **Notify** `zmq.PUB` socket, publishes `sensor` specific control
notifications (`update` and `remove`), randomly choosen port
- **Command** `zmq.PULL` socket, receives `sensor` specific commands,
randomly choosen port
- **Data** `zmq.PUB` socket, publishes stream data, format depends on
`sensor` type, randomly choosen port
- **Notify** `zmq.PUB` socket, publishes `sensor` specific control notifications (`update` and `remove`), randomly choosen port
- **Command** `zmq.PULL` socket, receives `sensor` specific commands, randomly choosen port
- **Data** `zmq.PUB` socket, publishes stream data, format depends on `sensor` type, randomly choosen port
- All messages send over these sockets MUST follow the format described below in **Sensor Messages**
- **Hosts** MUST listen for messages on the **command** socket.
- **Hosts** MUST publish all `control` state changes over its **notify**
socket.
- **Hosts** MUST respond to `<refresh_controls>` by publishing all available
`control` states as a series of `<control_update>`
- **Hosts** MUST publish all `control` state changes over its **notify** socket.
- **Hosts** MUST respond to `<refresh_controls>` by publishing all available `control` states as a series of `<control_update>`

**Clients** (e.g. the `ndsi` library)
**Clients** (e.g. the `pyndsi` library)

- **Clients** MUST join `pupil-mobile`.
- **Clients** MUST listen to incoming SHOUT and WHISPER messages.
- Messages including invalid `json` SHOULD be dropped (silently).
- **Clients** SHOULD maintain a list of available `sensor`s including
their static information (this includes especially the unique identifier
defined by the **host**, see **Sensor Messages** below).
- **Clients** SHOULD maintain a list of available `sensor`s including their static information (this includes especially the unique identifier defined by the **host**, see **Sensor Messages** below).
- To receive control updates of a specific `sensor`, **Clients** MUST:
1. Create a `zmq.SUB` socket, connected to
`notify_endpoint`,
2. Subscribe to the `sensor`s unique identifier
(`zmq_setsockopt(<socket>,ZMQ_SUBSCRIBE,<unique identifier>)`) and
start listening for *update* and *remove* notifications.
3. Create a `zmq.PUSH` socket, connected to `command_endpoint` (see
`<attach>` below), send `<refresh_controls>` command.
1. Create a `zmq.SUB` socket, connected to `notify_endpoint`,
2. Subscribe to the `sensor`s unique identifier (`zmq_setsockopt(<socket>,ZMQ_SUBSCRIBE,<unique identifier>)`) and start listening for *update* and *remove* notifications.
3. Create a `zmq.PUSH` socket, connected to `command_endpoint` (see `<attach>` below), send `<refresh_controls>` command.

- All messages send over these sockets MUST follow the format described below in **Sensor Messages**

Expand All @@ -74,7 +66,7 @@ Each host has multiple `sensor` instances. These can be of different types. The

All sensor related messages MUST be zeromq multi-part messages with at least
two frames. The first frame MUST include the `sensor`'s unique identifier and
the second the content of a notification or a command. The unique identfier MUST be formatted as an unicode string.
the second the [`json`](http://www.json.org/)-encoded content of a notification or a command. The unique identfier MUST be formatted as an unicode string.

Note: The corresponding value of missing message keys SHALL be handled as `null`.

Expand Down Expand Up @@ -278,7 +270,7 @@ typedef struct publish_header {
uint32_t width_le;
uint32_t height_le;
uint32_t sequence_le;
int64_t presentation_time_us_le;
float64_t presentation_time_s_le;
uint32_t data_bytes_le;
uint32_t reserved_le;
} __attribute__ ((packed)) publish_header_t;
Expand Down Expand Up @@ -316,7 +308,7 @@ typedef struct audio_header {
uint32_t format_le; // PCM8, PCM16, etc., usually use PCM8 on most of Android devices.
uint32_t channel_le; // 1 or 2, but most of Android devices just support 1
uint32_t sequence_le;
int64_t presentation_time_us_le;
float64_t presentation_time_s_le;
uint32_t data_bytes_le;
} __attribute__ ((packed)) audio_header;
```
Expand Down
3 changes: 2 additions & 1 deletion ndsi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
----------------------------------------------------------------------------------~(*)
'''

NDS_PROTOCOL_VERSION = '0.2.16'
__version__ = '0.3.1'
__protocol_version__ = '3'


class CaptureError(Exception):
Expand Down
8 changes: 2 additions & 6 deletions ndsi/frame.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,6 @@ class InitError(CaptureError):
super(InitError, self).__init__(message)
self.message = message

@staticmethod
def unpack_metadata(packed_metadata):
return struct.unpack("<LLLLQL", packed_metadata)


cdef class JPEGFrame(object):
'''
Expand Down Expand Up @@ -107,7 +103,7 @@ cdef class JPEGFrame(object):
self._index = index
self._buffer_len = data_len
self._raw_data = zmq_frame
self.timestamp = (<double>timestamp)/1000000
self.timestamp = timestamp

cdef attach_tj_context(self, turbojpeg.tjhandle ctx):
self.tj_context = ctx
Expand Down Expand Up @@ -307,7 +303,7 @@ cdef class H264Frame(object):
self._buffer_len = data_len
self._yuv_buffer = yuv_buffer
self._h264_buffer = np.fromstring(h264_buffer, dtype=np.uint8)
self.timestamp = (<double>timestamp)/1000000
self.timestamp = timestamp

cdef attach_tj_context(self, turbojpeg.tjhandle ctx):
self.tj_context = ctx
Expand Down
3 changes: 2 additions & 1 deletion ndsi/network.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ cdef class Network(object):
cdef object context
cdef unicode name
cdef object headers
cdef bint _warned_once_older_version, _warned_once_newer_version
cdef readonly list callbacks
cdef readonly dict sensors
cdef readonly object pyre_node
cdef readonly object pyre_node
30 changes: 24 additions & 6 deletions ndsi/network.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import traceback as tb
import zmq
from pyre import Pyre, PyreEvent, zhelper

from . import NDS_PROTOCOL_VERSION
from . import __protocol_version__
from .sensor cimport Sensor

import logging
Expand All @@ -28,16 +28,21 @@ cdef class Network(object):
Creates Pyre node and handles all communication.
'''

group = 'pupil-mobile-v{}'.format(__protocol_version__)

def __cinit__(self, *args, **kwargs):
pass

def __init__(self, context=None, name=None, headers=(), callbacks=()):
self.name = name
self.headers = [('nds-protocol-version', NDS_PROTOCOL_VERSION)]+list(headers)
self.headers = headers
self.pyre_node = None
self.context = context or zmq.Context()
self.sensors = {}
self.callbacks = [self.on_event]+list(callbacks)
self._warned_once_older_version = False
self._warned_once_newer_version = False

def start(self):
# Setup node
Expand Down Expand Up @@ -95,6 +100,23 @@ cdef class Network(object):
logger.debug('Unknown host message: {}'.format(msg))
return
self.execute_callbacks(msg)
elif event.type == 'JOIN':
# possible values for `group_version`
# - [<unrelated group>]
# - [<unrelated group>, <unrelated version>]
# - ['pupil-mobile']
# - ['pupil-mobile', <version>]
group_version = event.group.split('-v')
group = group_version[0]
version = group_version[1] if len(group_version) > 1 else '0'
if group == 'pupil-mobile':
if not self._warned_once_older_version and version < __protocol_version__:
logger.warning('Devices with outdated NDSI version found. Please update these devices.')
self._warned_once_older_version = True
elif not self._warned_once_newer_version and version > __protocol_version__:
logger.warning('Devices with newer NDSI version found. You should update.')
self._warned_once_newer_version = True

elif event.type == 'EXIT':
gone_peer = event.peer_uuid.hex
for sensor_uuid in list(self.sensors.keys()):
Expand Down Expand Up @@ -141,7 +163,3 @@ cdef class Network(object):
property running:
def __get__(self):
return bool(self.pyre_node)

property group:
def __get__(self):
return 'pupil-mobile'
2 changes: 1 addition & 1 deletion ndsi/sensor.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ cdef class Sensor(object):
if self.data_sub.poll(timeout=timeout):
while self.has_data:
data_msg = self.get_data(copy=True)
meta_data = py_struct.unpack("<LLLLQLL", data_msg[1])
meta_data = py_struct.unpack("<LLLLdLL", data_msg[1])
if meta_data[0] == VIDEO_FRAME_FORMAT_MJPEG:
return create_jpeg_frame(data_msg[2], meta_data)
elif meta_data[0] == VIDEO_FRAME_FORMAT_H264:
Expand Down
25 changes: 24 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,34 @@
import platform
import numpy
import glob
import os
import io
import re

from distutils.core import setup
from distutils.extension import Extension
from Cython.Build import cythonize


def read(*names, **kwargs):
with io.open(
os.path.join(os.path.dirname(__file__), *names),
encoding=kwargs.get("encoding", "utf8")
) as fp:
return fp.read()


# pip's single-source version method as described here:
# https://python-packaging-user-guide.readthedocs.io/single_source_version/
def find_version(*file_paths):
version_file = read(*file_paths)
version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]",
version_file, re.M)
if version_match:
return version_match.group(1)
raise RuntimeError("Unable to find version string.")


libs = []
library_dirs = []
include_dirs = []
Expand Down Expand Up @@ -83,7 +106,7 @@
language='c++')]

setup(name="ndsi",
version="0.2.16", # make sure this is the same as in ndsi/__init__.py
version=find_version('ndsi', '__init__.py'),
description="Remote Device Sensor Interface",
packages=['ndsi'],
ext_modules=cythonize(extensions))

0 comments on commit 6ae5ec0

Please sign in to comment.