pure python implementation, many application layer protocols already implemented
say, there are 3 tasks that need to be performed
- synchronous model
~~~~~~~~~~~~~~~~~~~~
the tasks are done one after the other. this is the simplest model, but slow
- multi-thread/processes
~~~~~~~~~~~~~~~~~~~~~~~~~
each task runs in its own thread and all of them execute concurrently (if there are fewer cores than threads, the OS interleaves the threads, but that is another topic). the tasks can also be run as separate processes
- communication and coordination b/w threads is a little tricky
- asynchronous model
~~~~~~~~~~~~~~~~~~~~~
one thread, with the tasks’ execution interleaved. only one task executes at any point in time. (this is essentially what happens in the 2nd model too, if your pc has only one core)
here, a task runs until it explicitly relinquishes control to the other tasks (unlike the 2nd model, where the os decides which thread is active and for how long)
the 3rd model will be faster than the synchronous one and provide a better ux if the 3 tasks have a lot of blocking calls (eg, i/o)
~~~~~
a simple blocking server in python
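    # needs: import socket, time
    listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listen_sock.bind((options.iface, options.port or 0))
    listen_sock.listen(5)

    def serve(listen_sock, poetry_file, num_bytes, delay):
        while True:
            # a separate name, so the listening socket is not overwritten
            sock, addr = listen_sock.accept()
            print('Somebody at %s wants poetry!' % (addr,))
            send_poetry(sock, poetry_file, num_bytes, delay)

    def send_poetry(sock, poetry_file, num_bytes, delay):
        inputf = open(poetry_file, 'rb')
        while True:
            data = inputf.read(num_bytes)
            if not data:  # no more poetry :(
                sock.close()
                inputf.close()
                return
            print('Sending %d bytes' % len(data))
            try:
                sock.sendall(data)  # this is a blocking call
            except socket.error:
                sock.close()
                inputf.close()
                return
            time.sleep(delay)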
we need to provide the file containing the poetry
these are the same typical socket calls we used for devknox (socket(), setsockopt(), bind(), listen(), sendall() inside a try/except socket.error), just that here this is a server listening for client connections and sending data to the client
you can do netcat localhost <port_no> and it will send the poem to the terminal
by default it listens on the local “loopback” interface (which is the localhost)
now, a client to get the poem from the server
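    def get_poetry(address):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect(address)
        poem = b''
        while True:
            data = sock.recv(1024)  # blocks until the server sends something
            if not data:  # empty read: the server closed the connection
                sock.close()
                break
            poem += data
        return poem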
if you give it multiple servers to get the data from, it will get it first from one server completely, then move on to the next one. even if there is a long blocking call (the server just sleeps for some time, the client waits)
this is a blocking client. now the non-blocking client
say, you start it by giving it a list of servers (port numbers) to get the data from. for each port number, get a socket connection
    sockets = []
    for address in addresses:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect(address)
        sock.setblocking(0)  # the socket is made non-blocking
        sockets.append(sock)
now, we maintain a dict for the poem from each server
    poems = dict.fromkeys(sockets, b'')  # socket -> accumulated poem
    # (aside: dict.fromkeys(range(3)) gives {0: None, 1: None, 2: None})
    sock2task = dict((s, i + 1) for i, s in enumerate(sockets))  # socket -> task number
    sockets = list(sockets)  # make a copy
    while sockets:
        rlist, _, _ = select.select(sockets, [], [])
        for sock in rlist:
            data = b''
            while True:
                try:
                    new_data = sock.recv(1024)
                except socket.error as e:
                    if e.args[0] == errno.EWOULDBLOCK:
                        break  # we got all the data available so far
                    raise
                else:
                    if not new_data:
                        break
                    data += new_data
            task_num = sock2task[sock]
            if not data:
                sockets.remove(sock)
                sock.close()
                print('Task %d finished' % task_num)
            else:
                # format_address is a helper that is not shown in these notes
                addr_fmt = format_address(sock.getpeername())
                msg = 'Task %d: got %d bytes of poetry from %s'
                print(msg % (task_num, len(data), addr_fmt))
            poems[sock] += data
    return poems
this won’t wait on a slow server; it does the tasks asynchronously
to be pedantic, the print statement is a blocking call; we should use non-blocking i/o there as well. twisted has features for that
the main thing here is the client connects to all three at a single time (it has 3 sockets which it uses to get the data from the 3 servers)
the sockets are placed in non-blocking mode by sock.setblocking(0). so, when a server has nothing to send yet (it is sleeping), recv raises the EWOULDBLOCK error instead of waiting. we break out of the read loop and move on to some other socket that is ready (select.select() tells us which ones are)
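a small sketch of that EWOULDBLOCK behaviour, using only the stdlib. assumptions: this is python 3 (where the error shows up as BlockingIOError), a socket.socketpair() stands in for the real client/server connection, and try_recv is a made-up helper name.

```python
import errno
import socket

# A connected pair of sockets stands in for the client/server connection.
client, server = socket.socketpair()
client.setblocking(False)  # same effect as setblocking(0) in the notes


def try_recv(sock, size=1024):
    """Return the received bytes, or None if reading would block."""
    try:
        return sock.recv(size)
    except BlockingIOError as e:
        # Python 3 raises BlockingIOError; its errno is EWOULDBLOCK/EAGAIN.
        assert e.errno in (errno.EWOULDBLOCK, errno.EAGAIN)
        return None


nothing_yet = try_recv(client)   # the server has sent nothing so far
server.sendall(b"roses are red")
got_data = try_recv(client)      # data is waiting now
server.close()
at_eof = try_recv(client)        # an empty read means the peer closed
client.close()
```

the three cases (would block, data, end of stream) are exactly the three branches of the read loop in the client above.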
documentation for select.select
    select(rlist, wlist, xlist[, timeout]) -> (rlist, wlist, xlist)

    Wait until one or more file descriptors are ready for some kind of I/O.
    The first three arguments are sequences of file descriptors to be waited for:
    rlist -- wait until ready for reading
    wlist -- wait until ready for writing
    xlist -- wait for an "exceptional condition"
so, it just blocks until we have a socket that can read from the server. this repeats until all the sockets have been closed
this non-blocking client is much faster
there is an outer loop that waits on the sockets, all at once and when one blocks, puts another one in to get data
this is the reactor pattern: the use of a loop that waits for events to happen, and then handles them. it is called a reactor because it waits and then reacts to events. (it is aka event loop)
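the loop described above, boiled down to a sketch (python 3, stdlib only). assumptions: socket.socketpair() pairs play the role of the three servers, and read_all is a made-up name. this is not how twisted implements its reactor, just the same wait-then-react idea.

```python
import select
import socket


def read_all(sockets):
    """Collect bytes from several sockets at once using one select loop."""
    received = {s: b"" for s in sockets}
    pending = list(sockets)
    while pending:
        # Wait: block until at least one socket is readable.
        readable, _, _ = select.select(pending, [], [])
        # React: handle every socket that has an event.
        for s in readable:
            data = s.recv(1024)
            if data:
                received[s] += data
            else:  # empty read: the peer closed the connection
                pending.remove(s)
                s.close()
    return received


# Three fake "servers": each sends a short poem and closes.
pairs = [socket.socketpair() for _ in range(3)]
for i, (_, server_end) in enumerate(pairs):
    server_end.sendall(b"poem %d" % i)
    server_end.close()

poems = read_all([client_end for client_end, _ in pairs])
results = sorted(poems.values())
```

the outer while loop is the reactor; the body of the for loop is what would become a callback.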
twisted is just that: a reactor pattern with a lot of popular protocols implemented and ready to work out of the box
we can write this client using twisted. but first some demo programs:
- run the reactor
    from twisted.internet import reactor
    reactor.run()
normally, we would give it a list of sockets to monitor for i/o. so, the reactor is just a fancy select.select!
we just import the reactor, we don’t need to create it. it is a singleton: there is only one reactor object, which is created when you import it
there are many types of reactors, eg: twisted.internet.pollreactor (it uses the poll system call, man poll). it has to be installed before twisted.internet.reactor is imported
    from twisted.internet import pollreactor
    pollreactor.install()
- callWhenRunning
    def hello():
        print('Hello from the reactor loop!')

    from twisted.internet import reactor
    reactor.callWhenRunning(hello)
    print('Starting the reactor.')
    reactor.run()
this will print
    Starting the reactor.
    Hello from the reactor loop!
(and then it just sits there)
note we add the hello function as the callback
the reactor pattern is single-threaded. the reactor is in control of the thread, and our code is executed only when some event happens for which our function is registered as the callback.
there is only one thread, so while our callback function runs, the reactor loop doesn’t. so, don’t have any blocking calls in the callback!
for example, if you want to launch a subprocess, don’t use os.system or the subprocess module, because they block; use the twisted api for doing that (reactor.spawnProcess)
nice example: say, you need to count down from 5 in your callback. don’t block like this:
    def blah():
        for i in range(5, 0, -1):
            print(i)
            time.sleep(1)
WRONG (above), RIGHT (below):
    class Blah(object):
        i = 5

        def blah(self):
            if self.i == 0:
                reactor.stop()
            else:
                print(self.i)
                self.i -= 1
                reactor.callLater(1, self.blah)
- stop the reactor using reactor.stop()
once stopped, it cannot be restarted
how is the callLater implemented? the 1 second is used as a timeout, and when it times out, that is an event for the reactor and the function is called
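to make the timeout idea concrete, here is a toy sketch (python 3, stdlib only). assumptions: ToyReactor and Countdown are made-up classes, and the toy handles timed calls only; a real reactor would hand the remaining time to select/poll as the timeout, so socket events still get handled while it waits.

```python
import heapq
import itertools
import time


class ToyReactor:
    """Hypothetical stand-in for a reactor; handles timed calls only."""

    def __init__(self):
        self._timers = []              # heap of (due_time, seq, func, args)
        self._seq = itertools.count()  # tie-breaker for equal due times
        self._running = False

    def callLater(self, delay, func, *args):
        due = time.monotonic() + delay
        heapq.heappush(self._timers, (due, next(self._seq), func, args))

    def stop(self):
        self._running = False

    def run(self):
        self._running = True
        while self._running and self._timers:
            due, _, func, args = heapq.heappop(self._timers)
            # Sleep until the earliest timer is due, then treat the
            # timeout as an event and call the registered function.
            time.sleep(max(0.0, due - time.monotonic()))
            func(*args)


class Countdown:
    def __init__(self, reactor, start):
        self.reactor = reactor
        self.i = start
        self.seen = []

    def tick(self):
        if self.i == 0:
            self.reactor.stop()
            return
        self.seen.append(self.i)
        self.i -= 1
        self.reactor.callLater(0.01, self.tick)  # re-schedule, don't sleep


reactor = ToyReactor()
counter = Countdown(reactor, 3)
reactor.callLater(0, counter.tick)
reactor.run()
```

the callback never sleeps; it only asks to be called again later, which is the same shape as the RIGHT countdown above.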
- since our code is only called in the form of callbacks, when our code raises an
error, the exception is reported but the reactor isn’t stopped
writing the twisted client to grab poetry asynchronously
we will still use the sockets to connect, but replace the select.select call with the twisted reactor
for each socket, we do this:
    sockets = [PoetrySocket(i + 1, addr) for i, addr in enumerate(addresses)]

    class PoetrySocket(object):
        poem = b''

        def __init__(self, task_num, address):
            self.task_num = task_num
            self.address = address
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.connect(address)
            self.sock.setblocking(0)
            # tell the twisted reactor to monitor this socket for reading
            from twisted.internet import reactor
            reactor.addReader(self)
so, we add the socket to the reactor to watch out for.
the addReader method is defined in the IReactorFDSet Interface. and the reactor (the reactor class) implements that Interface. that is why the reactor has the addReader method.
    def addReader(reader):
        """
        I add reader to the set of file descriptors to get read events for.

        @param reader: An L{IReadDescriptor} provider that will be checked for
                       read events until it is removed from the reactor with
                       L{removeReader}.
        @return: C{None}.
        """
the docstring for the addReader method (as laid out in the interface IReactorFDSet; the reactor class has this method too, with the actual implementation code) says that the reader, which is the argument, must implement the IReadDescriptor interface
    class IReadDescriptor(IFileDescriptor):
        def doRead():
            """
            Some data is available for reading on your descriptor.
            """
so, the reader must have the doRead method. also, the methods defined in IFileDescriptor (connectionLost(reason), fileno()). in turn, IFileDescriptor extends ILoggingContext, and so we need its method too (logPrefix())
note, we didn’t have to explicitly say that the PoetrySocket class implements the IReadDescriptor interface
hence, the doRead method is really our callback - but in twisted, we don’t give the callback function directly. we give the object of the class that implements the given interface (because that is mandated by the reactor api)
let’s look at doRead
    from twisted.internet import main

    def doRead(self):
        data = b''
        while True:
            try:
                bytesread = self.sock.recv(1024)
                if not bytesread:
                    break
                else:
                    data += bytesread
            except socket.error as e:
                if e.args[0] == errno.EWOULDBLOCK:
                    break  # no more data available right now
                return main.CONNECTION_LOST  # any other socket error

        if not data:
            print('Task %d finished' % self.task_num)
            return main.CONNECTION_DONE
        else:
            msg = 'Task %d: got %d bytes of poetry from %s'
            print(msg % (self.task_num, len(data), self.format_addr()))

        self.poem += data
so, doRead has to return special values depending on what happened. to know what to return, we would have to look at the documentation of the doRead method and the interface that mandates it.
if our callback was blocking here (for eg, by not calling setblocking(0) on the sockets), twisted would have behaved just like the synchronous client. twisted can’t interrupt our code. so, no blocking calls in our callbacks.
twisted also has the complementary Writer equivalents to the Reader, which monitor the file descriptors (sockets) we want to send data to
~~~~~~~
in the last twisted program, we used twisted only to let us know which file descriptor is ready to receive data but we still used the sockets in our callbacks. we can use twisted apis there as well.
the interface defining how an object should behave is an abstraction (for eg, the IReadDescriptor is an abstraction for “file descriptor you can read bytes from”)
We have some new abstractions that we will use
Transports
~~~~~~~~~~
defined by the ITransport interface. they represent a single connection that can send and/or receive bytes. “the Transports are abstracting the TCP connections”. so, we don’t need to create the sockets and make the connections ourselves (the socket.connect calls).
a Transport can also represent UDP sockets, UNIX pipes etc
so, the Transport handles the connection and the details of asynchronous i/o
Protocols
~~~~~~~~~
they are abstractions of the different application protocols. twisted already implements many of them, so we don’t have to implement those ourselves; for our own protocol, we write a class that implements the required interface (IProtocol), and objects of that class speak that protocol.
each connection (Transport object) requires a Protocol object with it. hence, we can use the Protocol object to store the data (maintain the state) of the Transport objects (the various connections)
the IProtocol interface, which the Protocol class implements, says that there must be a makeConnection method which takes a Transport instance as the argument.
Protocol factories
~~~~~~~~~~~~~~~~~~
since each connection needs a protocol instance, we need a way to make the appropriate protocol instance “on demand” whenever a new connection (Transport object) is made. this is done by Protocol factories. the Protocol factories are an example of the Factory design pattern. they simply have a buildProtocol method that returns a new Protocol instance each time it is called. twisted uses this method for each new connection
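a plain-python imitation of the factory idea (no twisted needed to run it). assumptions: ToyProtocol and ToyFactory are made-up names; they only mimic the shape of a protocol class and a factory with a buildProtocol method, nothing more.

```python
class ToyProtocol:
    """Per-connection state lives here."""
    factory = None

    def __init__(self):
        self.poem = ""

    def dataReceived(self, data):
        self.poem += data


class ToyFactory:
    """Builds one protocol instance per connection."""
    protocol = ToyProtocol  # the class to instantiate on demand

    def __init__(self):
        self.built = 0

    def buildProtocol(self, addr):
        p = self.protocol()
        p.factory = self  # every protocol can reach its factory
        self.built += 1
        return p


factory = ToyFactory()
conn_a = factory.buildProtocol(("127.0.0.1", 10001))
conn_b = factory.buildProtocol(("127.0.0.1", 10002))
conn_a.dataReceived("first poem")
conn_b.dataReceived("second poem")
```

each connection gets its own state (the poem), while anything shared lives on the single factory object.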
so, now in our new twisted client, we won’t use sockets directly
we again have a list of port numbers, with the address being localhost. for each of them we do
reactor.connectTCP(host, port, factory)
- the host is always 127.0.0.1
- port is 1001, 1002, 1003
- factory is an instance of a class that subclasses ClientFactory (twisted.internet.protocol.ClientFactory)
this is the Protocol factory that allows us to spawn Protocol objects on demand for each connection (Transport instance). it has the attribute protocol, which is assigned the PoetryProtocol class
the PoetryProtocol class implements the Protocol interface. it is very clean, with the methods dataReceived and connectionLost
the protocol factory also has the clientConnectionFailed method.
the ClientFactory is a subclass of twisted.internet.protocol.Factory, specialized for clients (the buildProtocol method is implemented for us, by the Factory base class to be exact)
so, in twisted, interfaces dictate everything. and we can have base classes that implement some of the methods required by the interfaces for us
now, when the reactor is started, it uses the PF to create protocol instances for the connections it is asked to make, and starts interacting as per the Protocol defined. we don’t even need to mention the Transport class, it is all taken care of by twisted.
we use the Protocol to maintain state; look at the dataReceived method.
    class PoetryProtocol(Protocol):
        poem = b''

        def dataReceived(self, data):
            self.poem += data

        def connectionLost(self, reason):
            self.poemReceived(self.poem)

        def poemReceived(self, poem):
            self.factory.poem_finished(poem)
we can refer to the PF from the protocol using self.factory
so, there is a single connection to each address that the reactor is given. and each connection has a protocol instance. the client factory sets the factory attribute of each protocol to point to the PF itself.
all the protocols can share state through their common factory instance, by accessing it as self.factory
we know that the protocol instance is associated with the connection, the Transport instance. this is done by the makeConnection method (from twisted.internet.protocol.BaseProtocol)
    def makeConnection(self, transport):
        """Make a connection to a transport and a server.

        This sets the 'transport' attribute of this Protocol, and calls the
        connectionMade() callback.
        """
        self.connected = 1
        self.transport = transport
        self.connectionMade()
it sets the connected flag to 1, points the transport attribute to the transport object, and then calls connectionMade()
understand Twisted code from the bottom up. read how the tcp is implemented and then go up. twisted builds on the underlying abstractions to get more abstractions.
now our client is pretty robust, but it can still improve. our ClientFactory (our PF, i.e.) is burdened with the task of maintaining the poem count etc, which isn’t its job. all it must be concerned with is making PoetryProtocols, collecting the finished poems from them and sending them to the code that needed them. let the business of checking if all the poems are received be outside the PF.
    def main():
        from twisted.internet import reactor
        poems = []  # addresses comes from the command line arguments

        def got_poem(poem):
            poems.append(poem)
            if len(poems) == len(addresses):
                reactor.stop()

        for address in addresses:
            host, port = address
            get_poetry(host, port, got_poem)

        reactor.run()

    def get_poetry(host, port, callback):
        """
        Download a poem from the given host and port and invoke
          callback(poem)
        when the poem is complete.
        """
        from twisted.internet import reactor
        factory = PoetryClientFactory(callback)
        reactor.connectTCP(host, port, factory)

    class PoetryClientFactory(ClientFactory):
        protocol = PoetryProtocol

        def __init__(self, callback):
            self.callback = callback

        def poem_finished(self, poem):
            self.callback(poem)
the PF now does just what it is supposed to do and is much cleaner. we pass it a callback that it must call when it gets a poem (to send the data back to the code that requested it)
this is better code, as the PF can now be used elsewhere, where the context is a little different.
if there is an error connecting to the server, the PF has the clientConnectionFailed method that we need to implement. the default implementation is blank, so, in case of errors with the connection, the reactor will just sit there, doing nothing.
**the methods mandated by the interfaces are not all necessary to implement. they are just the hooks that allow us to customize the program using the twisted apis and do what we want it to do. the base classes provide default implementations, and those are blank**
we didn’t implement the clientConnectionFailed method because what to do on failure is context specific. so, we can outsource that too. we can do:
    def poem_finished(self, poem):
        self.callback(poem)

    def clientConnectionFailed(self, connector, reason):
        self.callback(reason)
so, the callback is called with the poem if we have it, or with the error
this is a little odd: it overloads the callback, because it has to deal with both success and failure. we need separate callbacks for errors and for normal execution. how about callback(poem) and errback(err)
    class PoetryClientFactory(ClientFactory):
        protocol = PoetryProtocol

        def __init__(self, callback, errback):
            self.callback = callback
            self.errback = errback

        def poem_finished(self, poem):
            self.callback(poem)

        def clientConnectionFailed(self, connector, reason):
            self.errback(reason)
of course, we do this for each server we wish to connect to:
    from twisted.internet import reactor
    factory = PoetryClientFactory(callback, errback)
    reactor.connectTCP(host, port, factory)
but does that mean that for every api, we will need to write 2 extra functions, callback and errback? twisted has an abstraction to handle this
if we don’t catch the exception in asynchronous programs, like ones written using Twisted, the reactor will sit there, doing nothing, it won’t crash. hence, we need to make sure to ask Twisted to call our errbacks in case of exceptions
Twisted wraps the exception in a Failure object, which is an abstraction for exceptions and tracebacks
So, since callbacks are so fundamental to asynchronous programming, and using them can have us need to write another callback or errback, twisted abstracts them as well - The Deferred
A deferred contains a pair of callback chains - one chain for normal results, one for errors
    from twisted.internet.defer import Deferred

    def got_poem(res):
        print('Your poem is served:')
        print(res)

    def poem_failed(err):
        print('No poetry for you.')

    d = Deferred()
    d.addCallbacks(got_poem, poem_failed)
    d.callback('This poem is short.')
the deferred doesn’t need the reactor. we create the deferred object, add the callbacks using addCallbacks(callback, errback) and fire the deferred by calling its callback/errback method. to fire the error chain: d.errback(Exception('this poem is short')). technically you need to pass a Failure object, but the deferred accepts an Exception as well and wraps it. the errback function is always invoked with a Failure object.
here, we still have to write 2 functions for our callback, but it is easy now to chain them: add multiple callbacks, errbacks etc and also link them together; manage them in general
a deferred cannot be fired more than once. this is because they help us solve the problem of managing what to do when we can get the poem and when we cannot. in synchronous programming we used:
    try:
        # success
    except:
        # failure
and only one of the two branches ran, only once. here, too, we use deferreds to make sure the callbacks/errbacks are called only once.
we can start firing the deferred chains like so: reactor.callWhenRunning(d.callback, ‘a poem I am’)
using them, we can rewrite the twisted client
    for address in addresses:
        host, port = address
        d = get_poetry(host, port)
        d.addCallbacks(got_poem, poem_failed)  # in the callback/errback, we append the poems/errors
        d.addBoth(poem_done)  # here, we check if len(poems) + len(errors) == len(addresses); if so, stop the reactor

    def get_poetry(host, port):
        """
        Download a poem from the given host and port. This function
        returns a Deferred which will be fired with the complete text of
        the poem or a Failure if the poem could not be downloaded.
        """
        d = defer.Deferred()
        from twisted.internet import reactor
        factory = PoetryClientFactory(d)  # the PF is now passed the deferred, not the callback/errback pair
        reactor.connectTCP(host, port, factory)
        return d
we don’t fire the deferred, the reactor calls the appropriate function (clientConnectionFailed for eg, and we fire the errback there)
methods from the PF:
    def poem_finished(self, poem):
        if self.deferred is not None:
            # drop our reference to the deferred so that we don't accidentally fire it twice
            d, self.deferred = self.deferred, None
            d.callback(poem)

    def clientConnectionFailed(self, connector, reason):
        if self.deferred is not None:
            d, self.deferred = self.deferred, None
            d.errback(reason)
a deferred refers to an asynchronous result, the result that is on its way
when using deferreds, our callbacks/errbacks are simply called (by the reactor) in reply to an event. so, we shouldn’t use blocking calls in our callbacks.
add just a callback function to the deferred - d.addCallback()
let’s add some more features to the client. it has the “byronification” service now: the downloaded poem is passed to the byronify function. if it successfully processes the poem, pass it along; if it throws an error, raise an exception.
the logic in synchronous code:
    try:
        poem = get_poetry(host, port)
    except:
        print('error getting the poem')
    else:
        try:
            poem = engine.byronificate(poem)
        except GibberishError:
            print('error getting the poem')
        except:
            print(poem)  # some other error: print the original poem
        else:
            print(poem)
trivia: the exception in synchronous code, it moves from the low level to the high level, from general purpose deep function calls to the more specific calls, leading up to the place where your function was called.
an errback in the chain is called either when the callback/errback before it fails (raises an exception or returns a Failure), or when the deferred itself is fired through its errback method.
if the last callback/errback in the chain fails, nobody is left to handle the error, and twisted logs it as an unhandled error
if any callback/errback wants to raise/re-raise an exception in asynchronous code, it can:
- raise any kind of exception
- return a Failure object
also, the first argument of any errback is always a Failure object. (But when calling the errback, you can pass it an Exception)
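the routing rules can be tried out with a toy model (python 3, stdlib only). loud assumptions: ToyDeferred is a made-up class and NOT twisted’s Deferred; plain exceptions stand in for Failure objects, and callbacks added after firing are ignored here, whereas real deferreds do run them.

```python
class ToyDeferred:
    """Toy model of callback/errback routing; NOT Twisted's Deferred."""

    def __init__(self):
        self._pairs = []  # list of (callback, errback) pairs
        self.called = False
        self.result = None

    def addCallbacks(self, callback, errback=None):
        passthru = lambda result: result
        self._pairs.append((callback or passthru, errback or passthru))
        return self

    def addCallback(self, callback):
        return self.addCallbacks(callback, None)

    def addErrback(self, errback):
        return self.addCallbacks(None, errback)

    def addBoth(self, func):
        return self.addCallbacks(func, func)

    def callback(self, result):
        self._fire(result)

    def errback(self, error):
        self._fire(error)

    def _fire(self, result):
        if self.called:
            raise RuntimeError("already fired")
        self.called = True
        for callback, errback in self._pairs:
            # Exceptions stand in for Failure objects in this sketch.
            handler = errback if isinstance(result, Exception) else callback
            try:
                result = handler(result)
            except Exception as e:
                result = e  # a raise moves us onto the errback chain
        self.result = result


log = []


def explode(poem):
    raise ValueError("gibberish")


def recover(err):
    log.append("errback got %s" % type(err).__name__)
    return "fallback poem"  # a normal return switches back to callbacks


d = ToyDeferred()
d.addCallback(explode)
d.addCallback(lambda poem: log.append("skipped"))  # never runs
d.addErrback(recover)
d.addBoth(lambda result: log.append("finally: %s" % result))
d.callback("real poem")
```

the second callback is skipped because the first one raised; the errback recovers, and the addBoth function then runs on the callback side.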
4 ways to add callbacks:
- addCallbacks
- addCallback
- addErrback
- addBoth --> the function given here runs in both cases (it is added as the callback and as the errback); it’s like the finally clause in try/except
so, we have now, to implement the above logic:
    for address in addresses:
        host, port = address
        d = get_poetry(host, port)  # this gives us a deferred, and gets the poem using the PF and the Protocol
        d.addCallback(try_to_cummingsify)  # once we have the poem, we try to cummingsify it
        d.addCallbacks(got_poem, poem_failed)  # print it if we get it, else say we couldn't get it
        d.addBoth(poem_done)  # stop the reactor

    def try_to_cummingsify(poem):
        try:
            return cummingsify(poem)
        except GibberishError:
            raise
        except:
            print('Cummingsify failed!')
            return poem

    def cummingsify(poem):
        def success():
            return poem.lower()

        def gibberish():
            raise GibberishError()

        def bug():
            raise ValueError()

        return random.choice([success, gibberish, bug])()
we are using try except here, we can add cummingsify as the callback directly. and add an errback for when it fails.
Failure.value –> the Exception object
hence, we can choose either to use try/except statements or let deferreds route the results/errors directly
let’s write the server in twisted as well
    class PoetryProtocol(Protocol):
        def connectionMade(self):
            self.transport.write(self.factory.poem)
            self.transport.loseConnection()

    class PoetryFactory(ServerFactory):
        protocol = PoetryProtocol

        def __init__(self, poem):
            self.poem = poem

    def main():
        poem = open(poetry_file).read()
        factory = PoetryFactory(poem)
        from twisted.internet import reactor
        port = reactor.listenTCP(options.port or 0, factory, interface=options.iface)
        print('Serving %s on %s.' % (poetry_file, port.getHost()))
        reactor.run()
the protocol we define is simple: when the connection is made, write the poem and close the connection
the connectionMade method is a callback that is invoked by the reactor when the protocol is connected to the transport. for each new connection, a protocol instance and a transport instance are created
note our PF subclasses the ServerFactory. we are using the listenTCP method and the factory argument should be an instance of the ServerFactory
here, whenever there is a new connection, the PF creates an instance of the Protocol, and a new Transport and assigns both to each other. then, the connectionMade is called and the data is transferred and the connection is closed.
from this high level code, you can check the documentation to learn about the hooks twisted provides to modify the behavior. for eg, there must be a method that is called when some client wants to connect to the server. we can use this method to check the ip of the client and deny/accept the connection request. or better, there may be a method to do this already.
up until now, the poetry transformation was done by the client. let’s move that to the server. also, there are various types of transformations possible now. according to our protocol, our client has to send a string in the form transform_name.text_of_poem
this is a remote procedure call RPC - it is a protocol that one program can use to request a service from another program (client from server) without having to understand the network details. you are calling a remote procedure (procedure == function == subroutine)
what we do is: in the Protocol, in the stringReceived method, we split the string on the first “.” and call self.factory.transform(transform_name, text_of_poem). the transform method checks if the factory has a method for that transformation name; if yes, it passes the poem there, else it returns None. that factory method, in turn, sends the poem text to the class providing the service, which lives outside of the PF.
**why do we have an extra step here, why not call the service directly from the transform method? this is to guard access to the services we want to provide. if not for this, the client could possibly make us execute an arbitrary method (present in the class) by providing its name - this is how remote code execution happens! also, we have one more place to perform any protocol-specific adaptation to the API of the service object if need be.**
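a sketch of that guarded lookup in plain python (no twisted needed to run it). assumptions: the class names, the xform_ prefix convention and the handle_request helper are made up for illustration; the notes only say that transform looks for a matching method and returns None otherwise.

```python
class TransformService:
    """Business logic; knows nothing about the network."""

    def cummingsify(self, poem):
        return poem.lower()


class TransformFactory:
    """Hypothetical factory that limits what clients may call."""

    def __init__(self, service):
        self.service = service

    def transform(self, xform_name, poem):
        # Only methods with the xform_ prefix are reachable from the wire.
        handler = getattr(self, "xform_%s" % xform_name, None)
        if handler is None:
            return None
        return handler(poem)

    def xform_cummingsify(self, poem):
        return self.service.cummingsify(poem)


def handle_request(factory, request):
    # Protocol format from the notes: transform_name.text_of_poem
    if "." not in request:
        return None
    xform_name, poem = request.split(".", 1)  # the poem may contain dots
    return factory.transform(xform_name, poem)


factory = TransformFactory(TransformService())
ok = handle_request(factory, "cummingsify.Hello World.")
blocked = handle_request(factory, "__init__.whatever")
hidden = handle_request(factory, "service.whatever")
```

because of the prefix, a client asking for __init__ or service gets None instead of reaching an attribute it was never meant to touch.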
note how there is separation of concerns - the protocol doesn’t have the services, nor does the PF. the services (the business logic here, so to say) live in another class, which is just called by the PF
this server needs the poem to be sent to it by the client (or another server)
moral: appreciate how we separated the functional logic from the Protocol and PF using a separate Service class. this is good coding practice. this way, we can add more protocol by which we can serve clients, without having to touch the service class etc. this is decoupling in action.
if we have a deferred which returns another deferred, it behaves exactly how you would expect it to behave eg:
    def callback_1(res):
        print('callback_1 got %s' % res)
        return 1

    def callback_2(res):
        print('callback_2 got %s' % res)
        return 2

    def callback_3(res):
        print('callback_3 got %s' % res)
        return 3

    def callback_2_async(res):
        print('callback_2 got %s' % res)
        global deferred_2  # never do this in a real program
        deferred_2 = Deferred()
        return deferred_2

    d = Deferred()
    d.addCallback(callback_1)
    d.addCallback(callback_2_async)
    d.addCallback(callback_3)

    d.callback(0)
    # at this moment, we get:
    #   callback_1 got 0
    #   callback_2 got 1
    # that's it, callback_3 did not execute, because the deferred that was returned hasn't been fired

    deferred_2.callback(2)  # here, we manually fire it
    #   callback_3 got 2
    # and so, the outer deferred's callback chain resumes firing.
earlier, the transformation service resided inside the client. now, since our server provides it, we will use the server to do that
we use it like this:
    python twisted-client-6/get-poetry.py 10001 10002 10003
this client will get the poems from ports 10002 and 10003 and use the server at 10001 for the transformation
what happens is: we take the first port number and assign it to the
what if our result was sometimes synchronous and asynchronous other times?
~~~~
this can be the case if we have a proxy before the server and making all the requests pass thru the proxy. the proxy can either send the poem directly if it has it or download it from the server and then send it
to handle this situation, we will use a deferred, but we will fire it before we return it to the caller. you can add callback chains to a deferred after it has been fired
the errback is fired if the previous callback/errback returned a Failure or raised an Exception
you can pause a deferred and it won’t fire its callbacks until you unpause it (this is used internally to pause the outer deferred when one callback returns a deferred)
the server now has:
- ProxyService - this returns the poem if it is cached, else it connects to the main server and returns a deferred
- PoetryProxyFactory - the user connects here, with the PoetryProxyProtocol. in connectionMade we get a deferred from the service’s get_poem; get_poem can either connect to the server or return the cached poem
we use maybeDeferred(some_function) --> this will return a deferred:
- the same deferred, if some_function returns a deferred
- a deferred which is already fired with callback, if some_function returns a value
- a deferred which is already fired with errback, if some_function raises an exception or returns a Failure
succeed(poem) --> returns a deferred that has already been fired with callback(poem)
fail(err) --> returns a deferred that has already been fired with errback, with the given exception
so, use maybeDeferred or succeed/fail to make your synchronous code return deferreds, just like asynchronous code does
the key to a good program is to solve it in pairs, after making a high level blueprint for the same
you can either pass the PF a deferred when initializing it or assign one for itself in the __init__ method also, generally the Service has the deferred, or else the PF
testing is via the testing framework “trial”
since python has a lot of namespacing, you can use several things to pass information
- nested functions - the inner function has access to the outer function’s variables
    def one():
        a = 1
        def two():
            print(a)
        two()

    one()  # prints 1
- functions of a class can use self.<attribute_names>
~~~~~~~
twisted can be run as a daemon process: logging can be sent to syslog, and the pid can be stored in a file so that the admin can easily send signals to the daemon - all using the twistd script
we need to use the IService interface to define a named service that can be started and stopped. the interface mandates that the service have
- “name” --> the name of the service
- “running” --> boolean for the running status of the service
also, we have - startService(), stopService()
service can be sorted into collections, that are started or stopped together
also we have setServiceParent to add a service to a collection. the services can be organized into hierarchies. for the collections we have IServiceCollection. the collection is just a plain container class with methods to - get a service by name (getServiceNamed), iterate over services, add/remove a service from the collection
Application --> doesn’t have an interface of its own; it implements IService and IServiceCollection. all services are children or grandchildren of the Application
logging is handled by twisted.python.log
so, to go from frontend app to daemon, just define the service hierarchy and you are done
read more if we need to implement this as a daemon
~~~~~~~~
inline callbacks
python’s generators: they use yield, and not return. they are resumed from the last yield
generators return an iterator that you can iterate over, only once though. generators and iterators are lazily-created sequences of values
example:

    def my_generator():
        print('starting up')
        yield 1
        print("workin'")
        yield 2
        print("still workin'")
        yield 3
        print('done')

    for n in my_generator():
        print(n)

my_generator yields 1, 2, 3. when one pass has been made, it raises the StopIteration exception, which the for loop handles for us. doing the same by hand:

    gen = my_generator()
    while True:
        try:
            n = next(gen)
        except StopIteration:
            break
        print(n)

the generator function offers an analogy to the reactor. the whole generator is called by the reactor. it keeps running until it returns control to the reactor using yield. so, the code b/w the various yields (the print statements) are the callbacks. the nice thing here is that they are organized in a nice sequential manner and we can see which ones will be called, in which order
the generator can send/receive values b/w successive calls. example:

    def my_generator():
        print('starting up')
        val1 = yield 1
        print("workin'", val1)
        val2 = yield 2
        print("still workin'", val2)
        val3 = yield 3
        print('done', val3)

    gen = my_generator()
    print(gen.__next__())
    print(gen.send(10))  # give val1 value of 10
    print(gen.send(20))  # give val2 value of 20
    print(gen.throw(Exception))  # Exception is raised at "yield 3" and not caught, so the program stops, done thingy not printed

output:

    starting up
    1
    workin' 10
    2
    still workin' 20
    3
    Traceback (most recent call last):
      File "inline-callbacks/gen-1.py", line 16, in <module>
        print(gen.throw(Exception))
      File "inline-callbacks/gen-1.py", line 8, in my_generator
        val3 = yield 3
    Exception

so, the generator is just a series of callbacks, just like in deferreds. just like the callbacks, the generators can also receive results/Failures.
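a small pure-python sketch of that last point, using a hypothetical worker generator: send() plays the role of a callback receiving a result, and throw() plays the role of an errback receiving a Failure, which the generator can handle with try/except.

```python
def worker():
    try:
        value = yield "need input"
        print("got result:", value)
    except ValueError as e:
        print("got failure:", e)
    yield "finished"


ok = worker()
next(ok)  # run up to the first yield
ok_out = ok.send(42)  # like a callback receiving a result

bad = worker()
next(bad)
bad_out = bad.throw(ValueError("boom"))  # like an errback receiving a Failure

print(ok_out, bad_out)
```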
we can write our callback functions in a series, just like with the generator eg:

    from twisted.internet.defer import inlineCallbacks, Deferred

    @inlineCallbacks
    def my_callbacks():
        from twisted.internet import reactor

        print('first callback')
        result = yield 1  # yielded values that aren't deferred come right back

        print('second callback got', result)
        d = Deferred()
        reactor.callLater(5, d.callback, 2)
        result = yield d  # yielded deferreds will pause the generator

        print('third callback got', result)  # the result of the deferred

        d = Deferred()
        reactor.callLater(5, d.errback, Exception(3))

        try:
            yield d
        except Exception as e:
            result = e

        print('fourth callback got', repr(result))  # the exception from the deferred

        reactor.stop()

    from twisted.internet import reactor
    reactor.callWhenRunning(my_callbacks)
    reactor.run()

hence, bottomline: the generator behind inlineCallbacks is driven roughly like this:

    while True:
        something = next(generator)  # something is what the generator “yield”ed

now, if this something is a deferred, the generator stays paused until that deferred fires, and the result of the deferred is then sent back into the generator
if it is just a normal value, like integer 5, the generator is resumed right away with that value
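a toy pure-python sketch of such a driver loop. this is not how twisted implements it: Pending is a hypothetical stand-in for a deferred, and drive computes the result on the spot, whereas the real thing attaches a callback to the deferred and goes back to the reactor.

```python
class Pending:
    """Hypothetical stand-in for a Deferred: wraps a function producing the result."""

    def __init__(self, compute):
        self.compute = compute


def drive(gen):
    """Run gen to completion, feeding the result of each yield back in."""
    step, arg = gen.send, None  # the first send(None) starts the generator
    while True:
        try:
            something = step(arg)
        except StopIteration as stop:
            return stop.value  # the generator's return value
        step, arg = gen.send, something  # plain values come right back
        if isinstance(something, Pending):
            try:
                arg = something.compute()  # the "deferred" result
            except Exception as exc:
                step, arg = gen.throw, exc  # deliver the error at the yield


def callbacks():
    a = yield 5  # plain value
    b = yield Pending(lambda: a * 2)  # result is sent back in
    try:
        yield Pending(lambda: 1 / 0)  # a failing operation
    except ZeroDivisionError:
        b += 1
    return a + b


total = drive(callbacks())
print(total)
```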
the decorator inlineCallbacks is used to turn a generator into a series of asynchronous callbacks. the generator can yield deferreds, and the results of those deferreds are sent back into the generator
so, if the generator yields a non-deferred, say, integer 1, it is like yielding a deferred that calls its passthru callback with the value 1. the result of that deferred is just the value passed on, so the generator gets the integer value 1 right back
calling the decorated generator gives you a deferred. so, we get a deferred whose callbacks we can see clearly and which may in turn yield deferred values. we can attach callbacks/errbacks to this deferred. it will fire when the generator has finished executing. the callback is fired if the generator returns a normal value (use defer.returnValue, or a plain return on python 3, to return success results), the errback if it raises an exception or returns a failure
so, inlineCallbacks is just a nice way of arranging the callbacks in one place so that it becomes easy to see what is going on.
we can take the deferred returned by calling the inlineCallbacks function and attach more callbacks to it
the advantages of using inlineCallbacks:
- all the callbacks share a namespace, since they are part of one function
- callback order is easier to see
- easy to pass results/errors to next in line callbacks
- errors can be handled by try/except
downsides – the callbacks cannot be called independently, you can only attach this series of callbacks to a deferred. makes code reuse difficult
we can also group deferreds using the DeferredList. this allows us to see their results in one place and also to check if everything finished executing. we can start a bunch of asynchronous operations and get notified when all of them have finished executing
a DeferredList is created from a python list made up of only Deferred objects
initializing the DeferredList gives us a deferred. you can add callbacks/errbacks to it. once all the member deferreds have fired, the callback is called with a result list of the same size as the DeferredList. each element is a tuple: first True/False, second the actual result, i.e. (True, result) or (False, Failure)
the DL fires only when all the Ds in it have fired. since lists are ordered, the results in the DL’s result list are in the same order as the Ds in the DL (not the order in which they fired)
you can consume the errors of the Ds (if any) using consumeErrors=True
now, to check that we have downloaded poems from all servers, we can do:

    ds = []
    for host, port in addresses:
        d = get_transformed_poem(host, port)
        d.addCallbacks(got_poem)
        ds.append(d)

    dlist = DeferredList(ds, consumeErrors=True)
    dlist.addCallback(lambda res: reactor.stop())


    from twisted.internet import defer

    def got_results(res):
        print('We got:', res)

    d1 = defer.Deferred()
    d2 = defer.Deferred()
    d3 = defer.Deferred()
    d = defer.DeferredList([d1, d2, d3])
    d.addCallback(got_results)
    d1.callback('d1 result')
    d2.callback('d2 result')
    d3.callback('d3 result')

output:

    We got: [(True, 'd1 result'), (True, 'd2 result'), (True, 'd3 result')]

we can also cancel the deferred. this will cause its errback chain to run with the custom Failure –> CancelledError
cancelling the deferred after it has fired has no effect
firing a deferred after cancelling it does not change the outcome: the result is still the same CancelledError
cancelling a deferred may not cancel the actual asynchronous operation
our control flow as in twisted-server-4
start with the main method. the client connection is made to the proxy service. its connectionMade method has a deferred that calls the get_poem of the service (which is tied to the PF of the proxy service). after we get the poem, the deferred has additional callbacks to send the poem to the client (self.transport.write) and then to close the connection. get_poem checks its self.poem and returns it if it has it. else, it employs another deferred to get the poem from the server (that deferred has the callbacks to close the connection with the server and set self.poem)
the cancel is used for the connectionLost of the proxyserviceprotocol (the proxy service)
correction: I wrote somewhere that initializing the deferred with a function, eg: d = Deferred(some_fn), adds it as the first callback. that is absolutely incorrect. the function is the canceller. here is the doc for __init__ on Deferred
def __init__(self, canceller=None):
Parameters: canceller –> a callable used to stop the pending operation scheduled by this Deferred when Deferred.cancel is invoked. The canceller will be passed the deferred whose cancelation is requested (i.e., self).
If a canceller is not given, or does not invoke its argument’s callback or errback method, Deferred.cancel will invoke Deferred.errback with a CancelledError.
so, the function you pass is called when you cancel the deferred. if no function is given, the errback will be called on d.cancel
when an outer deferred’s callback returns an inner deferred and you cancel the outer deferred, the cancel is passed on to the inner deferred. this runs the inner deferred’s canceller (if provided) or else its errback chain. after that is done, the outer deferred’s errback chain is run.