What is Twisted?
Twisted is an event-driven networking framework written in Python. It supports many protocols, including UDP, TCP, and TLS, as well as application-layer protocols such as HTTP, SMTP, NNTP, IRC, and XMPP/Jabber. One of the best things about Twisted is that it ships implementations of many application-layer protocols, so developers can use them directly. It is, for example, very easy to modify Twisted's SSH server-side implementation. In many cases, developers just need to implement a protocol class.
A Twisted program consists of a main loop started by the reactor and a set of callback functions. When an event occurs, such as a client connecting to the server, the corresponding callback on the server side is triggered.
Writing a Simple TCP Server with Twisted
The following code is a TCP server that logs the data sent by its clients.
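    import sys

    from twisted.internet.protocol import ServerFactory
    from twisted.protocols.basic import LineReceiver
    from twisted.python import log
    from twisted.internet import reactor


    class CmdProtocol(LineReceiver):

        # Lines are delimited by a bare newline, as sent by clients such as netcat
        delimiter = b'\n'

        def connectionMade(self):
            self.client_ip = self.transport.getPeer().host
            log.msg("Client connection from %s" % self.client_ip)
            if len(self.factory.clients) >= self.factory.clients_max:
                log.msg("Too many connections. bye !")
                self.client_ip = None
                self.transport.loseConnection()
            else:
                self.factory.clients.append(self.client_ip)

        def connectionLost(self, reason):
            log.msg('Lost client connection. Reason: %s' % reason)
            if self.client_ip:
                self.factory.clients.remove(self.client_ip)

        def lineReceived(self, line):
            # line arrives as bytes; decode it before logging
            log.msg('Cmd received from %s : %s'
                    % (self.client_ip, line.decode('utf-8', 'replace')))


    class MyFactory(ServerFactory):

        protocol = CmdProtocol

        def __init__(self, clients_max=10):
            self.clients_max = clients_max
            self.clients = []


    log.startLogging(sys.stdout)
    reactor.listenTCP(9999, MyFactory(2))
    reactor.run()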
The following code is crucial:
    from twisted.internet import reactor
    reactor.run()
These two lines of code start the reactor's main loop.
In the code above we created the MyFactory class, a subclass of ServerFactory, which is responsible for returning instances of CmdProtocol. Each connection is handled by its own CmdProtocol instance, which Twisted creates automatically when a TCP connection is made. As you can see, each method of the protocol class handles one kind of event.
When a client connects to the server, the connectionMade method is triggered. This is where you can perform authentication and similar checks, and also limit the total number of client connections. Each protocol instance holds a reference to its factory, which is available as self.factory.
CmdProtocol is a subclass of LineReceiver, which splits the data sent by the client at line breaks and calls the lineReceived method once for each complete line. Later we can extend lineReceived to parse commands.
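The line splitting described above can be sketched with a small buffer class. This is a toy model using only the standard library, not Twisted's actual LineReceiver code; the LineBuffer name and its feed method are hypothetical and exist only for this illustration. It shows why a line is delivered only once its delimiter has arrived, even when the data comes in arbitrary chunks.

```python
class LineBuffer:
    """Toy model of delimiter-based framing (not Twisted's implementation)."""

    def __init__(self, delimiter=b"\n"):
        self.delimiter = delimiter
        self._pending = b""  # bytes received so far that do not yet end a line

    def feed(self, data):
        """Add a chunk of received bytes and return the lines it completes."""
        self._pending += data
        # Everything before the last delimiter is complete; the rest waits.
        *lines, self._pending = self._pending.split(self.delimiter)
        return lines


buf = LineBuffer()
first = buf.feed(b"hel")             # no delimiter yet, nothing is delivered
second = buf.feed(b"lo server\nex")  # completes one line, keeps b"ex" pending
third = buf.feed(b"it\n")            # completes the second line
```

In the real server, each completed line would be handed to lineReceived instead of being returned in a list.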
Twisted implements its own logging system; here we configure it to write the log to stdout.
Finally, we bind the factory to port 9999 and start listening.
    user@lab:~/TMP$ python
    2011-08-29 13:32:32+0200 [-] Log opened.
    2011-08-29 13:32:32+0200 [-] __main__.MyFactory starting on 9999
    2011-08-29 13:32:32+0200 [-] Starting factory <__main__.MyFactory instance at 0x227e320>
    2011-08-29 13:32:35+0200 [__main__.MyFactory] Client connection from 127.0.0.1
    2011-08-29 13:32:38+0200 [CmdProtocol,0,127.0.0.1] Cmd received from 127.0.0.1 : hello server
Using Twisted to Call External Processes
Let's add a command to the previous server that lets us read the last lines of /var/log/syslog.
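    import sys

    from twisted.internet.protocol import ServerFactory, ProcessProtocol
    from twisted.protocols.basic import LineReceiver
    from twisted.python import log
    from twisted.internet import reactor


    class TailProtocol(ProcessProtocol):

        def __init__(self, write_callback):
            self.write = write_callback

        def outReceived(self, data):
            # Called with each chunk of bytes the child writes to stdout
            self.write(b"Begin lastlog\n")
            data = [line for line in data.split(b'\n') if not line.startswith(b'==')]
            for d in data:
                self.write(d + b'\n')
            self.write(b"End lastlog\n")

        def processEnded(self, reason):
            # Called when the child process exits
            if reason.value.exitCode != 0:
                log.msg(reason)


    class CmdProtocol(LineReceiver):

        delimiter = b'\n'

        def processCmd(self, line):
            if line.startswith(b'lastlog'):
                tailProtocol = TailProtocol(self.transport.write)
                reactor.spawnProcess(tailProtocol, '/usr/bin/tail',
                                     args=['/usr/bin/tail', '-10', '/var/log/syslog'])
            elif line.startswith(b'exit'):
                self.transport.loseConnection()
            else:
                self.transport.write(b'Command not found.\n')

        def connectionMade(self):
            self.client_ip = self.transport.getPeer().host
            log.msg("Client connection from %s" % self.client_ip)
            if len(self.factory.clients) >= self.factory.clients_max:
                log.msg("Too many connections. bye !")
                self.client_ip = None
                self.transport.loseConnection()
            else:
                self.factory.clients.append(self.client_ip)

        def connectionLost(self, reason):
            log.msg('Lost client connection. Reason: %s' % reason)
            if self.client_ip:
                self.factory.clients.remove(self.client_ip)

        def lineReceived(self, line):
            log.msg('Cmd received from %s : %s'
                    % (self.client_ip, line.decode('utf-8', 'replace')))
            self.processCmd(line)


    class MyFactory(ServerFactory):

        protocol = CmdProtocol

        def __init__(self, clients_max=10):
            self.clients_max = clients_max
            self.clients = []


    log.startLogging(sys.stdout)
    reactor.listenTCP(9999, MyFactory(2))
    reactor.run()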
In the code above, each line received from the client is passed to the processCmd method. If the line is the exit command, the server closes the connection. If it is lastlog, the server spawns a child process that runs the tail command and redirects its output to the client. To do this we implement a ProcessProtocol subclass and override its outReceived and processEnded methods: outReceived is called whenever the tail command produces output, and processEnded is called when the process exits.
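As the number of commands grows, the if/elif chain in processCmd could be replaced by a lookup table. The following is a minimal sketch of that idea using only the standard library; parse_command, dispatch, comphash, and HANDLERS are hypothetical names introduced here, and the handlers return text instead of writing to a transport so the example stays self-contained. It also works on str for brevity, whereas lines received from the network would first need decoding.

```python
def parse_command(line):
    """Split a received line into (command, list of arguments)."""
    parts = line.strip().split()
    if not parts:
        return None, []
    return parts[0], parts[1:]


def dispatch(line, handlers):
    """Look up the handler for the command and return its reply text."""
    command, args = parse_command(line)
    handler = handlers.get(command)
    if handler is None:
        return "Command not found.\n"
    return handler(args)


def comphash(args):
    # Hypothetical handler: only validates its argument in this sketch
    if len(args) != 1:
        return "Please provide a path.\n"
    return "would hash %s\n" % args[0]


HANDLERS = {
    "comphash": comphash,
    "exit": lambda args: "closing connection\n",
}

reply = dispatch("test", HANDLERS)  # unknown command falls through to the default
```

Unlike the startswith checks in the server, this sketch matches the first word exactly, so a line such as "exitnow" would be rejected instead of being treated as exit.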
The following is a sample execution result:
    user@lab:~/TMP$ python
    2011-08-29 15:13:38+0200 [-] Log opened.
    2011-08-29 15:13:38+0200 [-] __main__.MyFactory starting on 9999
    2011-08-29 15:13:38+0200 [-] Starting factory <__main__.MyFactory instance at 0x1a5a3f8>
    2011-08-29 15:13:47+0200 [__main__.MyFactory] Client connection from 127.0.0.1
    2011-08-29 15:13:58+0200 [CmdProtocol,0,127.0.0.1] Cmd received from 127.0.0.1 : test
    2011-08-29 15:14:02+0200 [CmdProtocol,0,127.0.0.1] Cmd received from 127.0.0.1 : lastlog
    2011-08-29 15:14:05+0200 [CmdProtocol,0,127.0.0.1] Cmd received from 127.0.0.1 : exit
    2011-08-29 15:14:05+0200 [CmdProtocol,0,127.0.0.1] Lost client connection. Reason: [Failure instance: Traceback (failure with no frames): <class ''>: Connection was closed cleanly.
The commands can be sent from a client with netcat:
    user@lab:~$ netcat 127.0.0.1 9999
    test
    Command not found.
    lastlog
    Begin lastlog
    Aug 29 15:02:03 lab sSMTP[5919]: Unable to locate mail
    Aug 29 15:02:03 lab sSMTP[5919]: Cannot open mail:25
    Aug 29 15:02:03 lab CRON[4945]: (CRON) error (grandchild #4947 failed with exit status 1)
    Aug 29 15:02:03 lab sSMTP[5922]: Unable to locate mail
    Aug 29 15:02:03 lab sSMTP[5922]: Cannot open mail:25
    Aug 29 15:02:03 lab CRON[4945]: (logcheck) MAIL (mailed 1 byte of output; but got status 0x0001, #012)
    Aug 29 15:05:01 lab CRON[5925]: (root) CMD (command -v debian-sa1 > /dev/null && debian-sa1 1 1)
    Aug 29 15:10:01 lab CRON[5930]: (root) CMD (test -x /usr/lib/atsar/atsa1 && /usr/lib/atsar/atsa1)
    Aug 29 15:10:01 lab CRON[5928]: (CRON) error (grandchild #5930 failed with exit status 1)
    Aug 29 15:13:21 lab pulseaudio[3361]: : 387 events suppressed
    End lastlog
    exit
Using Deferred Objects
The reactor is a loop that waits for events to occur. An event can be the completion of a database operation or of a long computation, as long as the operation returns a Deferred object. When the event occurs, the Deferred object automatically triggers its callback functions. Because everything runs inside the reactor's single loop, code that blocks also blocks the reactor, which is why long-running operations should return a Deferred instead of making the loop wait.
Now we're going to use a Deferred object to calculate the SHA1 hash of a file.
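To make the callback idea concrete, here is a toy stand-in for a Deferred written with only the standard library. It is a deliberately simplified model, not Twisted's class: ToyDeferred, add_callback, add_errback, and fire are hypothetical names, errors are plain exceptions instead of Twisted's Failure objects, and callbacks must be registered before the result is delivered.

```python
class ToyDeferred:
    """Toy model of a result that arrives later plus a callback chain."""

    def __init__(self):
        self._chain = []  # (callback, errback) pairs, run in registration order

    def add_callback(self, fn):
        self._chain.append((fn, None))
        return self

    def add_errback(self, fn):
        self._chain.append((None, fn))
        return self

    def fire(self, result):
        """Deliver a result (or an Exception instance) and run the chain."""
        for callback, errback in self._chain:
            handler = errback if isinstance(result, Exception) else callback
            if handler is None:
                continue  # nothing registered for this case, pass the result on
            try:
                result = handler(result)
            except Exception as exc:
                result = exc  # a raising handler switches to the error path
        return result


seen = []
d = ToyDeferred()
d.add_callback(lambda value: value * 2)
d.add_callback(seen.append)
d.fire(21)  # the "event": the result becomes available

failed = ToyDeferred()
failed.add_callback(lambda value: value * 2)
failed.add_errback(lambda exc: "recovered from %s" % exc)
outcome = failed.fire(ValueError("boom"))
```

The code that registers the callbacks returns immediately; the handlers run only when the result is delivered, which is the property that lets the reactor keep serving other events in the meantime.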
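    import sys
    import hashlib

    from twisted.internet.protocol import ServerFactory, ProcessProtocol
    from twisted.protocols.basic import LineReceiver
    from twisted.python import log
    from twisted.internet import reactor, threads


    class TailProtocol(ProcessProtocol):

        def __init__(self, write_callback):
            self.write = write_callback

        def outReceived(self, data):
            self.write(b"Begin lastlog\n")
            data = [line for line in data.split(b'\n') if not line.startswith(b'==')]
            for d in data:
                self.write(d + b'\n')
            self.write(b"End lastlog\n")

        def processEnded(self, reason):
            if reason.value.exitCode != 0:
                log.msg(reason)


    class HashCompute(object):

        def __init__(self, path, write_callback):
            self.path = path
            self.write_callback = write_callback

        def blockingMethod(self):
            # Runs in a worker thread, so blocking file I/O is acceptable here
            with open(self.path, 'rb') as f:
                data = f.read()
            # uncomment to add more delay
            # import time
            # time.sleep(10)
            return hashlib.sha1(data).hexdigest()

        def compute(self):
            d = threads.deferToThread(self.blockingMethod)
            d.addCallback(self.ret)
            d.addErrback(self.err)

        def ret(self, hdata):
            self.write_callback(("File hash is : %s\n" % hdata).encode('utf-8'))

        def err(self, failure):
            self.write_callback(("An error occurred : %s\n"
                                 % failure.getErrorMessage()).encode('utf-8'))


    class CmdProtocol(LineReceiver):

        delimiter = b'\n'

        def processCmd(self, line):
            if line.startswith(b'lastlog'):
                tailProtocol = TailProtocol(self.transport.write)
                reactor.spawnProcess(tailProtocol, '/usr/bin/tail',
                                     args=['/usr/bin/tail', '-10', '/var/log/syslog'])
            elif line.startswith(b'comphash'):
                try:
                    useless, path = line.split(b' ')
                except ValueError:
                    self.transport.write(b'Please provide a path.\n')
                    return
                hc = HashCompute(path, self.transport.write)
                hc.compute()
            elif line.startswith(b'exit'):
                self.transport.loseConnection()
            else:
                self.transport.write(b'Command not found.\n')

        def connectionMade(self):
            self.client_ip = self.transport.getPeer().host
            log.msg("Client connection from %s" % self.client_ip)
            if len(self.factory.clients) >= self.factory.clients_max:
                log.msg("Too many connections. bye !")
                self.client_ip = None
                self.transport.loseConnection()
            else:
                self.factory.clients.append(self.client_ip)

        def connectionLost(self, reason):
            log.msg('Lost client connection. Reason: %s' % reason)
            if self.client_ip:
                self.factory.clients.remove(self.client_ip)

        def lineReceived(self, line):
            log.msg('Cmd received from %s : %s'
                    % (self.client_ip, line.decode('utf-8', 'replace')))
            self.processCmd(line)


    class MyFactory(ServerFactory):

        protocol = CmdProtocol

        def __init__(self, clients_max=10):
            self.clients_max = clients_max
            self.clients = []


    log.startLogging(sys.stdout)
    reactor.listenTCP(9999, MyFactory(2))
    reactor.run()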
blockingMethod reads a file from the filesystem and calculates its SHA1 hash. Here we use Twisted's deferToThread function, which runs the method in a separate thread and returns a Deferred object immediately, so the main loop can continue to handle other events. As soon as the method passed to deferToThread finishes, the callback is triggered. If blockingMethod raises an exception, the errback err is called with the failure; if it succeeds, the result is passed to the callback ret as hdata.
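blockingMethod loads the whole file into memory before hashing it. For large files, a variant that hashes in fixed-size chunks may be preferable. The sketch below uses only the standard library; sha1_of_file and its chunk_size parameter are hypothetical names, and the temporary file exists only to make the example self-contained. A function like this could be handed to deferToThread in the same way as blockingMethod.

```python
import hashlib
import os
import tempfile


def sha1_of_file(path, chunk_size=65536):
    """Return the SHA1 hex digest of a file, reading it chunk by chunk."""
    digest = hashlib.sha1()
    with open(path, "rb") as handle:
        # iter() with a sentinel keeps calling read() until it returns b""
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


# Demonstration on a throwaway file larger than one chunk
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"abc" * 100000)
    demo_path = tmp.name
try:
    demo_hash = sha1_of_file(demo_path)
finally:
    os.remove(demo_path)
```

The digest is the same as hashing the full contents in one call, because update accumulates the chunks in order.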
Recommended reading material for Twisted
/documents/current/core/howto/ /documents/current/core/howto/ /documents/current/core/howto/
API Documentation:
/documents/current/api/