Multiple connections for boards with multiple ethernet connected chips #142

Open
mundya opened this issue May 15, 2015 · 10 comments · May be fixed by #224
Comments

@mundya (Member) commented May 15, 2015

Not immediately required (it would be nice to have for Canada, possibly) but definitely necessary for Spaun.

As far as I can tell, the IP address of a chip can be read from the sv struct (ip_addr) and is, I assume, formatted the same way as IP addresses for IP tags (i.e., !4B, bewaring that the default appears to be 255.255.0.0).
The real challenge is in determining the co-ordinates of likely chips, though I guess a safe approach would be to always jump 8* (north or east) from the last found ethernet chip and then read eth_addr to get the co-ordinates of the nearest ethernet connected chip.
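
For reference, a minimal sketch of that assumption (reading sv->ip_addr as four raw bytes and unpacking with !4B; this agrees with the byte-order note further down the thread, since the first dotted-quad field is then the first byte in memory):

import struct


def sv_ip_bytes_to_str(raw):
    """Render a 4-byte sv->ip_addr value as a dotted-quad string.

    Assumes the four bytes are stored in the same order as the fields of a
    conventionally written IP address (the "!4B" layout assumed above).
    """
    return ".".join(str(b) for b in struct.unpack("!4B", raw))


# e.g. sv_ip_bytes_to_str(b"\xc0\xa8\x01\x01") == "192.168.1.1"
# A value of "255.255.0.0" appears to be the default mentioned above.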

I initially thought that this process should be part of get_machine... but now I'm not so sure.

Once we have a map of co-ordinates to ethernet addresses it becomes a drop in replacement to MachineController.get_connection, though we should also expose the mapping so that other software (like Nengo) can transmit SDP to SpiNNaker efficiently.

* or 7?

Also: "bewaring" is an awesome word.

@mundya added this to the 1.0 milestone May 15, 2015
@mossblaser (Member)

There is of course the issue of how you parallelise all your reads/writes (i.e. using multiple Ethernets at once) with a nice-ish API... Asyncio-type approaches seem to be the main candidate here but slooowww!

That aside, you may find the following functions from the defunct asyncio branch useful for getting the set of candidate ethernet-connected chips:

rig/rig/geometry.py, lines 176 to 254 in b666e80:

def spinn5_eth_coords(width, height):
    """Generate a list of board coordinates with Ethernet connectivity in a
    SpiNNaker machine.

    Specifically, generates the coordinates for the ethernet connected chips
    of SpiNN-5 boards arranged in a standard torus topology.

    Parameters
    ----------
    width : int
        Width of the system in chips.
    height : int
        Height of the system in chips.
    """
    # Internally, work with the width and height rounded up to the next
    # multiple of 12
    w = ((width + 11) // 12) * 12
    h = ((height + 11) // 12) * 12

    for x in range(0, w, 12):
        for y in range(0, h, 12):
            for dx, dy in ((0, 0), (4, 8), (8, 4)):
                nx = (x + dx) % w
                ny = (y + dy) % h
                # Skip points which are outside the range available
                if nx < width and ny < height:
                    yield (nx, ny)


def spinn5_local_eth_coord(x, y, w, h):
    """Get the coordinates of a chip's local ethernet connected chip.

    .. note::
        This function assumes the system is constructed from SpiNN-5 boards
        and returns the coordinates of the ethernet connected chip on the
        current board.

    Parameters
    ----------
    x : int
    y : int
    w : int
        Width of the system in chips.
    h : int
        Height of the system in chips.
    """
    dx, dy = SPINN5_ETH_OFFSET[y % 12][x % 12]
    return ((x + dx) % w), ((y + dy) % h)


SPINN5_ETH_OFFSET = np.array([
    [(vx - x, vy - y) for x, (vx, vy) in enumerate(row)]
    for y, row in enumerate([
        # Below is an enumeration of the absolute coordinates of the nearest
        # ethernet connected chip. Note that the above list comprehension
        # changes these into offsets to the nearest chip.
        # X:   0         1         2         3         4         5         6         7         8         9         10        11        # noqa Y:
        [(+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+4, -4), (+4, -4), (+4, -4), (+4, -4), (+4, -4), (+4, -4), (+4, -4)],  # noqa 0
        [(+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+4, -4), (+4, -4), (+4, -4), (+4, -4), (+4, -4), (+4, -4)],  # noqa 1
        [(+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+4, -4), (+4, -4), (+4, -4), (+4, -4), (+4, -4)],  # noqa 2
        [(+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+4, -4), (+4, -4), (+4, -4), (+4, -4)],  # noqa 3
        [(-4, +4), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+8, +4), (+8, +4), (+8, +4), (+8, +4)],  # noqa 4
        [(-4, +4), (-4, +4), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+8, +4), (+8, +4), (+8, +4), (+8, +4)],  # noqa 5
        [(-4, +4), (-4, +4), (-4, +4), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+8, +4), (+8, +4), (+8, +4), (+8, +4)],  # noqa 6
        [(-4, +4), (-4, +4), (-4, +4), (-4, +4), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+8, +4), (+8, +4), (+8, +4), (+8, +4)],  # noqa 7
        [(-4, +4), (-4, +4), (-4, +4), (-4, +4), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+8, +4), (+8, +4), (+8, +4)],  # noqa 8
        [(-4, +4), (-4, +4), (-4, +4), (-4, +4), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+8, +4), (+8, +4)],  # noqa 9
        [(-4, +4), (-4, +4), (-4, +4), (-4, +4), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+8, +4)],  # noqa 10
        [(-4, +4), (-4, +4), (-4, +4), (-4, +4), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8)]   # noqa 11
    ])
])
"""SpiNN-5 ethernet connected chip lookup.

Used by :py:func:`.spinn5_local_eth_coord`. Given an x and y chip position
modulo 12, return the offset of the board's bottom-left chip from the chip's
position.

Note: the order of indexes: ``SPINN5_ETH_OFFSET[y][x]``!
"""

When I was playing with this, I built the following machine discovery implementation:

def discover_connections(self):
    """Attempt to discover and use all available connections to a machine.

    After calling this method, MachineController will attempt to use a
    connection local to the destinations of commands. Additionally,
    multiple connections may be used simultaneously when possible.

    .. note::
        A connection to chip (0, 0) must already be established and the
        system must be booted for this command to succeed.

    .. note::
        At present this method only discovers Ethernet connections in
        systems of SpiNN-5 boards.

    Returns
    -------
    int
        The number of new connections established.
    """
    w, h = yield From(self.get_dimensions(async=True))

    # Get the set of live chips so we don't bother using Ethernet connected
    # chips which are dead or isolated from the network.
    p2p_tables = yield From(self.get_p2p_routing_table(0, 0, async=True))
    dead_chips = set(coord for (coord, route) in iteritems(p2p_tables)
                     if route == consts.P2PTableEntry.none)

    # Check which of the potentially Ethernet connected chips have live
    # Ethernet connections.
    eth_up_cmds = []
    for x, y in spinn5_eth_coords(w, h):
        if (x, y) not in dead_chips:
            eth_up_cmds.append(
                self.read_struct_field("sv", "eth_up", x=x, y=y,
                                       async=True))
    eth_ups = yield From(gather_fail(*eth_up_cmds, loop=self.loop))

    # Get IPs of the Ethernet connections which are up
    xys = []
    ip_addr_cmds = []
    eth_up_iter = iter(eth_ups)
    for x, y in spinn5_eth_coords(w, h):
        if (x, y) not in dead_chips and next(eth_up_iter):
            xys.append((x, y))
            ip_addr_cmds.append(
                self.read_struct_field("sv", "ip_addr", x=x, y=y,
                                       async=True))
    ip_addrs = yield From(gather_fail(*ip_addr_cmds, loop=self.loop))

    # Build a dictionary {coord: ip_addr, ...} of IP addresses of Ethernet
    # connected chips. Chips in xys are already known to have a live
    # Ethernet connection.
    hosts = {}
    for (x, y), ip_addr in zip(xys, ip_addrs):
        # Convert IP addresses into string format (least-significant byte
        # first)
        hosts[(x, y)] = ".".join(str((ip_addr >> (i * 8)) & 0xFF)
                                 for i in range(4))

    # Strip out connections which are already present
    for coord in self.connections:
        hosts.pop(coord, None)

    # Attempt to open connections to each
    connect_cmds = []
    for coord, host in iteritems(hosts):
        connect_cmds.append(
            self.connect_to_hosts({coord: host}, async=True))
    connect_results = yield From(trollius.gather(
        *connect_cmds, loop=self.loop, return_exceptions=True))
    bad_hosts = []
    for (coord, host), connect_result in zip(iteritems(hosts),
                                             connect_results):
        if isinstance(connect_result, Exception):
            bad_hosts.append(coord)
    for bad_host in bad_hosts:
        hosts.pop(bad_host, None)

    # Make sure we can communicate via each new connection
    test_cmds = []
    for coord, host in iteritems(hosts):
        test_cmds.append(
            self.get_software_version(coord[0], coord[1], async=True))
    test_results = yield From(trollius.gather(
        *test_cmds, loop=self.loop, return_exceptions=True))
    bad_hosts = []
    for (coord, host), test_result in zip(iteritems(hosts), test_results):
        if isinstance(test_result, Exception):
            bad_hosts.append(coord)

    # Close any connections which turned out bad
    for bad_host in bad_hosts:
        hosts.pop(bad_host, None)
        self.connections.pop(bad_host).close()

    # Return number of new, verified connections
    raise Return(len(hosts))

If you're worrying about IP address and ethernet connectivity stuff, this code should have you covered. Note the sv->eth_up field for detecting up-ness, and that the least-significant eight bits of the IP returned are the first field in a conventionally rendered IP address.
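
As a standalone illustration of that byte order (matching the conversion inside discover_connections above):

def ip_int_to_str(ip_addr):
    """Render sv->ip_addr, read back as an integer, as a dotted-quad string.

    The least-significant byte becomes the first field of the address.
    """
    return ".".join(str((ip_addr >> (i * 8)) & 0xFF) for i in range(4))


assert ip_int_to_str(0x0101A8C0) == "192.168.1.1"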

@mundya (Member, Author) commented May 15, 2015

Awesome, thank you!

There is of course the issue of how you parallelise all your reads/writes (i.e. using multiple Ethernets at once) with a nice-ish API... Asyncio-type approaches seem to be the main candidate here but slooowww!

On my "to test" list is making SCPConnection thread safe and seeing what hellish results come from writing in two threads at once.

@mossblaser (Member)

Following from a recent offline discussion with @mundya about speeding up I/O for large machines using multiple Ethernet connections, the following conclusions were reached:

  • As ever, in principle, the only operation worth speeding up is read/write, since all other operations are either broadcast (and thus already fast enough) or diagnostics-only (and thus for interactive-style use, where they are already fast enough).
  • The hard bit is extending the existing file-like metaphor without requiring apps to be reorganised. Using AsyncIO is probably not a workable option.

Possible options:

  • For writing:
    • The buffering mechanism can potentially be extended to allow a global "flush()" call to be made which uses as many Ethernet connections as available. Disadvantage is we end up having everything we want to load placed in memory at once.
    • To work around the above disadvantage, we could add a call-back-on-write option which allows writes to be defined by an address, a length and a callback which is called when the data is actually required (see the sketch after this list). This complicates life somewhat, of course.
  • For reading:
    • Conventional read interfaces clearly aren't going to cut it.
    • Could do call-back-on-read or read-into-buffer; obviously one is messier, the other requires everything being in memory at once and sounds just a little bit dangerous too.
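
To make the call-back-on-write/read records above a little more concrete, a minimal sketch (names purely illustrative, not an API proposal):

from collections import namedtuple

# A deferred write supplies its data lazily via get_data() only when the
# transfer actually happens; a deferred read hands the fetched bytes to
# on_data(data) once they arrive. Neither needs the payload held in memory
# up-front.
DeferredWrite = namedtuple("DeferredWrite", "x y address length get_data")
DeferredRead = namedtuple("DeferredRead", "x y address length on_data")

# The centralised state mentioned below would then be something like one
# list of these records per Ethernet connection, drained by a global flush().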

Clearly all this requires some new centralised state:

  • Queues of outstanding reads/writes (one per chip or per connection) and what to do with them

This is going to prove faffy to get right...

@mossblaser (Member)

The following is an informal design document for multi-connection support. Feedback highly encouraged.

High-Throughput IO Plans

Getting data in and out of SpiNNaker machines with high throughput is very
important for large multi-board systems with potentially around six gigabytes
of data per board to be loaded and only a single fairly measly 100MBit/s
interface each. Rig's new SCP I/O implementation with windowing enables the
best possible throughput into a single Ethernet connection. In this PR I hope
to extend this so that connections to several Ethernet connected chips can be
used at once. It is not anticipated that anything but bulk memory
read/writes will be required to perform high throughput I/O since all other
operations are either broadcast-like or diagnostic (and thus fast enough at the
moment).

How many connections?

Though ideally Rig should be able to drive all the Ethernet connections to an
arbitrarily large machine at once, in practice this is simply infeasible for
the following reasons:

  • The Python SCP implementation is likely to become CPU bound once a sufficient
    number of SCP connections are in use since individual SCP packets are very
    small.
  • The host's single Ethernet connection is more-or-less going to offer 1 GBit/s
    of I/O, which is only enough to saturate ten boards' 100 MBit/s connections
    (and in practice board Ethernet bandwidth is severely constrained anyway).

I'm not sure which of these two issues will strike first. Initially I'll hope
for the best and assume we get I/O bound before CPU bound.

Which connection should be used?

I would suggest that loading is always performed via the Ethernet connection on
the same board as the target processor. This choice has the advantage of being
easily resolved via a lookup table. When a connection is not present, several
options exist:

  1. Fall back on (0, 0)
  2. Fall back on the nearest working connection
  3. Fall back on a randomly chosen connection
  4. Fall back on a cyclically chosen connection

Option 1 is very easy but may lead to significant load imbalance: in principle
N dead Ethernet connections could mean N times longer load times.

Option 2 in principle is ideal but in practice is expensive since it requires
a fancy spatial lookup function to discover. A lookup table could be used but
this seems wasteful...

Options 3 and 4 potentially work reasonably well and probably differ mainly in
expense of computation. It probably isn't worth worrying about the difference
in cost...

Overall, option 3 is probably the easiest to implement and stands the best
chance of working.
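
As a sketch of option 3, assuming the spinn5_local_eth_coord helper quoted earlier in this thread and a dict of open connections keyed by Ethernet-chip coordinate (everything else is made up for illustration):

import random


def connection_for_chip(connections, x, y, w, h):
    """Pick the connection used to reach chip (x, y).

    Prefer the Ethernet connection on the chip's own board; if that
    connection is absent (e.g. a dead Ethernet link), fall back on a
    randomly chosen working connection (option 3 above).
    """
    eth_coord = spinn5_local_eth_coord(x, y, w, h)
    if eth_coord in connections:
        return connections[eth_coord]
    return random.choice(list(connections.values()))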

How should multiple connections be used?

In principle it could be done with a single select statement and some clever
logic. Alternatively, we can run many instances of the current SCPConnection
objects in separate threads. If we're truly I/O bound these two approaches
differ only in the relative costs of Python vs OS level dispatching
performance.

If this were C, the select option would be the way to go, but since this is
Python there's a decent chance the two approaches would perform comparably.
The threading option is also potentially much easier to implement
so I'll be going with that. If the SCP sending code proves too CPU intensive
(sigh...) multiprocessing may be an option if there turns out to be some
moderately efficient way to share memory.

I propose having a thread pool with each thread allocated a particular SCP
connection. When more SCP connections are present than threads, the remaining
connections (and their associated blocks of data) should be queued and given to
threads which have exhausted the traffic associated with their connection.

As for how to select the number of threads; initially this can just be fixed
but eventually some kind of control system could be set up...
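
A rough standard-library sketch of that pooling scheme (process_request stands in for whatever actually drives an SCPConnection; nothing here is tied to the real rig internals):

import queue
import threading


def drain_connections(connection_queues, num_threads, process_request):
    """Drive several SCP connections from a fixed-size pool of threads.

    connection_queues: list of (connection, [requests...]) pairs, one per
    Ethernet connection. When there are more connections than threads, an
    idle thread simply takes the next unstarted connection from the shared
    queue once it has exhausted its current one.
    """
    pending = queue.Queue()
    for item in connection_queues:
        pending.put(item)

    def worker():
        while True:
            try:
                connection, requests = pending.get_nowait()
            except queue.Empty:
                return
            for request in requests:
                process_request(connection, request)
            pending.task_done()

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()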

How should data be supplied?

Two obvious possibilities exist:

  1. Data/free memory could be supplied in full up-front.
  2. Callbacks could be arranged which return/handle data as it is fetched.

The former is potentially the easiest to implement and has the cleanest API.
Unfortunately it may require your PC to have vast amounts of memory for very
large problems. Further, there is lots of potential for subtle bugs arising
from accidentally using the same memory twice or accessing uninitialised memory
buffers.

The latter is much more fiddly to implement but is obviously more scalable since
clever users can process their data as it arrives. It would be possible to wrap
this approach to look like the first approach for ease of implementation. This
approach may also be marginally more costly due to the added function call
overhead.

I think due to safety and scalability I'll regrettably have to go with the
second option as first class citizen and the first as an alternative API.

How should errors be treated?

For a large portion of applications, a single error during read/write is
non-recoverable. This is a semi-easy case to deal with, made super-easy if any
data received so far can be safely ignored, e.g. if all that will happen is an
error message and an exit.

Sadly, for larger-scale applications, a partial failure may not be as bad as
complete failure. For example, when reading back the results of an experiment,
if a single core has died during the experiment, the results on other cores may
still be valid and worth downloading. In such a circumstance it is still
important to be able to determine what failed afterwards.

The correct solution here is probably to have two alternative modes of
operation.

How should the API look?

Files on Linux are famously poor at dealing with multiple asynchronous
reads/writes. On platforms where this is supported it is largely via
unorthodox APIs. However this is done, it is probably going to result in the
file-like API being extended beyond typical file-like behaviour.

AsyncIO/Trollius could be used here; however, it has famously dreadful
performance and even more dreadful documentation. This type of abstraction also
has a tendency to leak out into applications where it is unwanted.

An idea:

# Thin-wrapper around the current interface; overrides mc.read and mc.write
# such that they queue up reads/writes. For reads they also return a new
# buffer with garbage in it.
mc.begin_burst()

mc.write(...)
data0 = mc.read(...)

file_like.write(...)
file_like.write(...)
file_like.flush()

data1 = file_like.read()

# All the above actions actually take place now and in parallel. If
# something goes wrong, you get an exception and the above actions
# may/may-not have been called.
mc.end_burst()

The above works nicely for upgrading existing applications where you can afford
the memory overhead of having everything in RAM at once. To extend the APIs
somewhat we could make:

  • write() accept a callable and length in place of some bytes.
  • read() accept a callable as an extra argument and not return any data.

With this modified API, callback style usage could be achieved at the expense
of the file-like API being no-longer file-like. To properly support this, the
write buffering/coalescing behaviour of MemoryIO will need modifying to support
coalescing callbacks. Outside burst-mode, the callbacks will simply get called
immediately.
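
Sketched usage with those two extensions (signatures entirely hypothetical):

def get_chunk():
    # Called by the burst machinery only when the data is actually needed.
    return b"\x00" * 1024


def handle_result(data):
    # Called once the corresponding read has completed.
    print("read", len(data), "bytes")


mc.begin_burst()

# write() accepting a callable and a length in place of a bytestring:
mc.write(0x60000000, get_chunk, 1024, x=1, y=2)

# read() accepting a callback and returning nothing:
mc.read(0x70000000, 1024, x=1, y=2, callback=handle_result)

mc.end_burst()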

By default, if an error occurs (including in a callback function) the end_burst
method will simply propagate that error stopping all other activity as cleanly
and quickly as possible. Some callbacks may not be called and some read data
may not have been placed in the reserved buffers (leaving their contents
undefined).

Alternatively, end_burst could accumulate exceptions and tracebacks, continuing
as far as possible, and return a list of (callback, exception, traceback)
tuples. Obviously, the callback object won't make sense unless you're using
your own callback system but given that this is a sufficiently advanced mode of
operation, this is probably what should be used anyway.

What will happen to the internals?

  1. The begin_burst function will simply set a flag.
  2. A set of read_into() methods will need to be added to allow creation of the
    read buffer in advance.
  3. For each connection there will be a queue of read/write requests.
  4. All read/write requests should be wrapped in a lambda(?) and placed in the
    buffer. For conventional reads, a buffer should be created and then
    read_into called with that buffer.
  5. When not in a burst, the buffer will be emptied immediately (or a fast path
    used...).
  6. The end_burst function will spawn some threads and in each thread work on
    processing the queues.
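
Roughly, the queueing side of steps 1 and 3 to 6 might boil down to something like this (purely a sketch; names hypothetical, with a thread per connection as discussed above):

import threading


class BurstState(object):
    """Hypothetical sketch of the internal burst machinery."""

    def __init__(self):
        self.bursting = False   # Step 1: begin_burst just sets this flag.
        self.queues = {}        # Step 3: {connection: [request, ...], ...}

    def submit(self, connection, request):
        # Step 4: requests arrive wrapped as zero-argument callables.
        if self.bursting:
            self.queues.setdefault(connection, []).append(request)
        else:
            request()           # Step 5: fast path outside a burst.

    def end_burst(self):
        # Step 6: spawn a thread per connection and drain each queue.
        threads = [
            threading.Thread(target=lambda reqs=reqs: [r() for r in reqs])
            for reqs in self.queues.values()]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        self.bursting = False
        self.queues.clear()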

@mossblaser (Member)

An alternative API idea:

  • MachineController.read/write could accept an argument which optionally postpones the read/write.
  • When MemoryIOs are created, the above argument can be chosen which then applies to all uses of that memoryio.

This alternative has the following advantages:

  • Internal uses of read/write (e.g. reading struct variables) don't need to be modified to assert that burst mode is not being used
  • It makes the whole thing a lot more explicit without too much hassle (and can even be enabled via a context manager; see the sketch after this list!). This is good since things like reads producing garbage are a ripe source of bugs for beginners!
  • Reduces the amount of "leaky" state (added state is now reduced to just the set of queues)
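
For illustration, the context-manager form could be as thin as this (attribute and method names are hypothetical):

from contextlib import contextmanager


@contextmanager
def postponed_io(mc):
    """Hypothetical sketch: inside the block, reads/writes default to being
    postponed; on exit everything queued is carried out using all available
    connections."""
    mc.default_postpone = True
    try:
        yield mc
    finally:
        mc.default_postpone = False
        mc.run_postponed()  # drain the queues over every connection


# Usage (hypothetical):
#     with postponed_io(mc):
#         mc.write(0x60000000, data, x=1, y=2)       # queued, not sent yet
#         buf = mc.read(0x70000000, 128, x=1, y=2)   # returns a placeholder
#     # ...here buf has been filled in and the write has completed.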

@mundya linked a pull request Feb 23, 2016 that will close this issue
@mundya (Member, Author) commented Mar 15, 2016

An alternative API idea:

  • MachineController.read/write could accept an argument which optionally postpones the read/write.
  • When MemoryIOs are created, the above argument can be chosen which then applies to all uses of that memoryio.

I like this, with the modification that the additional argument to read and write is a callback function. If the function is None (the default) then the read/write is blocking, otherwise the read/write is asynchronous and the callback is called on completion with the read data/number of bytes written. MemoryIO can just expose these arguments.

We can add flush, or similar, to the machine controller to block until all queued events are processed.

Thoughts? The advantages are the same as above but with fewer API changes (and with complete backwards compatibility).
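
Sketched usage under that scheme (keyword name and exact signatures hypothetical; blocking behaviour is unchanged when no callback is given):

results = {}


def on_read(data):
    # Called once the asynchronous read completes, with the bytes read.
    results["recording"] = data


# Blocking, exactly as now (callback defaults to None):
header = mc.read(0x60000000, 16, x=1, y=2)

# Asynchronous: both calls return immediately and the callbacks fire later.
mc.read(0x60000000, 1024, x=1, y=2, callback=on_read)
mc.write(0x70000000, b"\x00" * 1024, x=1, y=2,
         callback=lambda n_written: None)

# Block until every queued read/write has completed.
mc.flush()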

@mossblaser (Member)

What would your use-case for the callback function be? My understanding of most loads is that a sync/flush function is the simplest thing which could possibly work and should result in minimal changes to existing code. Certainly much simpler than callbacks etc. for this style of application.

My reluctance to expose a callback is that the "write and flush/sync later" API does not imply any sort of asynchronous programming mechanism, while a callback does, and I don't have a good feeling about that given Python's lack of a good de facto asynchronous programming mechanism.

@mundya (Member, Author) commented Mar 15, 2016

Which callback? The read callback I can see being particularly handy (over, say read_into and sync, where I have to suspend a load of operations until the sync completes).

The write callback was mostly for symmetry over just saying write(..., async=True). I suspect that in most cases the write callback probably would be a NOP but some particularly pedantic people might want to check that they did actually write as much as they intended.

@mundya (Member, Author) commented Mar 15, 2016

[I guess write(..., immediate=False) might be nearer the truth...]

@mossblaser (Member)

The read callback I can see being particularly handy (over, say read_into and sync, where I have to suspend a load of operations until the sync completes).

Indeed, but if we provide the callback we are introducing an implied concurrency model to the user's application. This is not a step I'd take too lightly and as mentioned there is no clear "good solution" here. I can't think of an application where you'd want to be notified when some (but not all) of your reads have completed -- what sort of thing are you thinking of?

The write callback was mostly for symmetry over just saying write(..., async=True). I suspect that in most cases the write callback probably would be a NOP but some particularly pedantic people might want to check that they did actually write as much as they intended.

Fair, though my plan was to have exceptions in read/writes to pop out of the sync function to prevent silent errors in most apps. Having to write a NOP function is also vaguely irritating ;).

@mundya modified the milestone: 1.0 Mar 17, 2016