]> git.proxmox.com Git - mirror_novnc.git/blob - utils/websocket.py
Merge pull request #444 from samhed/extdesktop
[mirror_novnc.git] / utils / websocket.py
1 #!/usr/bin/env python
2
3 '''
4 Python WebSocket library with support for "wss://" encryption.
5 Copyright 2011 Joel Martin
6 Licensed under LGPL version 3 (see docs/LICENSE.LGPL-3)
7
8 Supports following protocol versions:
9 - http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-07
10 - http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-10
11 - http://tools.ietf.org/html/rfc6455
12
13 You can make a cert/key with openssl using:
14 openssl req -new -x509 -days 365 -nodes -out self.pem -keyout self.pem
15 as taken from http://docs.python.org/dev/library/ssl.html#certificates
16
17 '''
18
19 import os, sys, time, errno, signal, socket, select, logging
20 import array, struct
21 from base64 import b64encode, b64decode
22
# Imports that vary by python version

# python 3.0 differences
# b2s: bytes -> native str, s2b: native str -> bytes, s2a: str -> sequence
# of integer byte values.  On Python 2 str already is the byte string type,
# so only s2a has to do any work there.
if sys.hexversion > 0x3000000:
    b2s = lambda buf: buf.decode('latin_1')
    s2b = lambda s: s.encode('latin_1')
    s2a = lambda s: s
else:
    b2s = lambda buf: buf  # No-op
    s2b = lambda s: s      # No-op
    s2a = lambda s: [ord(c) for c in s]

# Only a missing module should trigger a fallback; a bare "except:" would
# also swallow KeyboardInterrupt/SystemExit raised while importing.
try:
    from io import StringIO
except ImportError:
    from cStringIO import StringIO
try:
    from http.server import SimpleHTTPRequestHandler
except ImportError:
    from SimpleHTTPServer import SimpleHTTPRequestHandler

# python 2.6 differences
try:
    from hashlib import sha1
except ImportError:
    from sha import sha as sha1

# python 2.5 differences
try:
    from struct import pack, unpack_from
except ImportError:
    from struct import pack

    def unpack_from(fmt, buf, offset=0):
        """ Fallback for struct.unpack_from: unpack fmt from buf at offset. """
        view = buffer(buf, offset, struct.calcsize(fmt))
        return struct.unpack(fmt, view)

# Degraded functionality if these imports are missing.  Each module name is
# always bound at module level: to the module when importable, else to None,
# so later code can simply test "if numpy:", "if ssl:", ...
for mod, msg in [('numpy', 'HyBi protocol will be slower'),
                 ('ssl', 'TLS/SSL/wss is disabled'),
                 ('multiprocessing', 'Multi-Processing is disabled'),
                 ('resource', 'daemonizing is disabled')]:
    try:
        globals()[mod] = __import__(mod)
    except ImportError:
        globals()[mod] = None
        print("WARNING: no '%s' module, %s" % (mod, msg))

if multiprocessing and sys.platform == 'win32':
    # make sockets pickle-able/inheritable
    import multiprocessing.reduction
66
67
68 # HTTP handler with WebSocket upgrade support
class WebSocketRequestHandler(SimpleHTTPRequestHandler):
    """
    WebSocket Request Handler Class, derived from SimpleHTTPRequestHandler.
    Must be sub-classed with new_websocket_client method definition.
    The request handler can be configured by setting optional
    attributes on the server object:

    * only_upgrade: If true, SimpleHTTPRequestHandler will not be enabled,
      only websocket is allowed.
    * verbose: If true, verbose logging is activated.
    * daemon: Running as daemon, do not write to console etc
    * record: Record raw frame data as JavaScript array into specified filename
    * run_once: Handle a single request
    * handler_id: A sequence number for this connection, appended to record filename
    * file_only: If true, directory listings are answered with a 404
    * traffic: If true, write a traffic indicator token to stdout per frame
    * logger: Logger to use (defaults to WebSocketServer.get_logger())
    """
    # Maximum number of bytes requested from the socket per recv() call
    buffer_size = 65536

    # Appended to the client's key when computing Sec-WebSocket-Accept
    GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

    server_version = "WebSockify"

    protocol_version = "HTTP/1.1"

    # An exception while the WebSocket client was connected
    class CClose(Exception):
        pass

    def __init__(self, req, addr, server):
        """ Copy the optional settings from the server object, then hand
        over to SimpleHTTPRequestHandler.__init__. """
        # Retrieve a few configuration variables from the server
        self.only_upgrade = getattr(server, "only_upgrade", False)
        self.verbose = getattr(server, "verbose", False)
        self.daemon = getattr(server, "daemon", False)
        self.record = getattr(server, "record", False)
        self.run_once = getattr(server, "run_once", False)
        self.rec = None
        self.handler_id = getattr(server, "handler_id", False)
        self.file_only = getattr(server, "file_only", False)
        self.traffic = getattr(server, "traffic", False)

        self.logger = getattr(server, "logger", None)
        if self.logger is None:
            self.logger = WebSocketServer.get_logger()

        SimpleHTTPRequestHandler.__init__(self, req, addr, server)

    @staticmethod
    def _to_bytes(arr):
        """ Serialize a numpy array or an array.array into a byte string. """
        # tostring() was removed from array.array in Python 3.9 and is
        # deprecated in numpy, while tobytes() is missing from very old
        # versions of both, so probe instead of assuming either one.
        if hasattr(arr, 'tobytes'):
            return arr.tobytes()
        return arr.tostring()

    @staticmethod
    def _byte_array(raw):
        """ Load a byte string into an array.array of unsigned bytes. """
        arr = array.array('B')
        if hasattr(arr, 'frombytes'):
            arr.frombytes(raw)
        else:
            # Python 2 only knows fromstring()
            arr.fromstring(raw)
        return arr

    @staticmethod
    def unmask(buf, hlen, plen):
        """ Return the unmasked payload of a masked frame.

        buf  - byte string holding the frame
        hlen - offset in buf of the 4 byte masking key
        plen - number of payload bytes following the masking key
        """
        to_bytes = WebSocketRequestHandler._to_bytes
        pstart = hlen + 4
        pend = pstart + plen
        if numpy:
            # Empty byte string of the same type as buf
            b = c = buf[0:0]
            if plen >= 4:
                # XOR all complete 32-bit words in one vectorized step
                mask = numpy.frombuffer(buf, dtype=numpy.dtype('<u4'),
                                        offset=hlen, count=1)
                data = numpy.frombuffer(buf, dtype=numpy.dtype('<u4'),
                                        offset=pstart, count=plen // 4)
                b = to_bytes(numpy.bitwise_xor(data, mask))

            if plen % 4:
                # The 1-3 trailing bytes start on a multiple of 4 within
                # the payload, so they pair with the first mask bytes
                mask = numpy.frombuffer(buf, dtype=numpy.dtype('B'),
                                        offset=hlen, count=(plen % 4))
                data = numpy.frombuffer(buf, dtype=numpy.dtype('B'),
                                        offset=pend - (plen % 4),
                                        count=(plen % 4))
                c = to_bytes(numpy.bitwise_xor(data, mask))
            return b + c
        else:
            # Slower fallback
            to_array = WebSocketRequestHandler._byte_array
            mask = to_array(buf[hlen:pstart])
            data = to_array(buf[pstart:pend])
            for i in range(len(data)):
                data[i] ^= mask[i % 4]
            return to_bytes(data)

    @staticmethod
    def encode_hybi(buf, opcode, base64=False):
        """ Encode a HyBi style WebSocket frame.
        Optional opcode:
            0x0 - continuation
            0x1 - text frame (base64 encode buf)
            0x2 - binary frame (use raw buf)
            0x8 - connection close
            0x9 - ping
            0xA - pong

        Returns (frame, header_length, tail_length); the frame is never
        masked and the tail length is always 0.
        """
        if base64:
            buf = b64encode(buf)

        b1 = 0x80 | (opcode & 0x0f)  # FIN + opcode
        payload_len = len(buf)
        if payload_len <= 125:
            header = pack('>BB', b1, payload_len)
        elif payload_len < 65536:
            # 126 announces a 16-bit extended payload length
            header = pack('>BBH', b1, 126, payload_len)
        else:
            # 127 announces a 64-bit extended payload length
            header = pack('>BBQ', b1, 127, payload_len)

        return header + buf, len(header), 0

    @staticmethod
    def decode_hybi(buf, base64=False, logger=None):
        """ Decode HyBi style WebSocket packets.
        Returns:
            {'fin'          : 0_or_1,
             'opcode'       : number,
             'masked'       : boolean,
             'hlen'         : header_bytes_number,
             'length'       : payload_bytes_number,
             'payload'      : decoded_buffer,
             'left'         : bytes_left_number,
             'close_code'   : number,
             'close_reason' : string}

        'payload' stays None when buf does not yet hold a complete frame;
        in that case 'left' is the length of buf.
        """

        f = {'fin'          : 0,
             'opcode'       : 0,
             'masked'       : False,
             'hlen'         : 2,
             'length'       : 0,
             'payload'      : None,
             'left'         : 0,
             'close_code'   : 1000,
             'close_reason' : ''}

        if logger is None:
            logger = WebSocketServer.get_logger()

        blen = len(buf)
        f['left'] = blen

        if blen < f['hlen']:
            return f  # Incomplete frame header

        b1, b2 = unpack_from(">BB", buf)
        f['opcode'] = b1 & 0x0f
        f['fin'] = (b1 & 0x80) >> 7
        f['masked'] = (b2 & 0x80) >> 7

        f['length'] = b2 & 0x7f

        if f['length'] == 126:
            f['hlen'] = 4
            if blen < f['hlen']:
                return f  # Incomplete frame header
            (f['length'],) = unpack_from('>xxH', buf)
        elif f['length'] == 127:
            f['hlen'] = 10
            if blen < f['hlen']:
                return f  # Incomplete frame header
            (f['length'],) = unpack_from('>xxQ', buf)

        # 'masked' is 0 or 1, so it doubles as a multiplier for the key size
        full_len = f['hlen'] + f['masked'] * 4 + f['length']

        if blen < full_len:
            return f  # Incomplete frame

        # Number of bytes that are part of the next frame(s)
        f['left'] = blen - full_len

        # Process 1 frame
        if f['masked']:
            # unmask payload
            f['payload'] = WebSocketRequestHandler.unmask(buf, f['hlen'],
                                                          f['length'])
        else:
            logger.debug("Unmasked frame: %s", repr(buf))
            f['payload'] = buf[(f['hlen'] + f['masked'] * 4):full_len]

        if base64 and f['opcode'] in [1, 2]:
            try:
                f['payload'] = b64decode(f['payload'])
            except Exception:
                logger.exception("Exception while b64decoding buffer: %s" %
                                 (repr(buf)))
                raise

        if f['opcode'] == 0x08:
            if f['length'] >= 2:
                f['close_code'] = unpack_from(">H", f['payload'])[0]
            # Anything after the 2 byte status code is the reason, even
            # when it is only a single byte long
            if f['length'] > 2:
                f['close_reason'] = f['payload'][2:]

        return f

    #
    # WebSocketRequestHandler logging/output functions
    #

    def print_traffic(self, token="."):
        """ Show traffic flow mode. """
        if self.traffic:
            sys.stdout.write(token)
            sys.stdout.flush()

    def msg(self, msg, *args, **kwargs):
        """ Output message with handler_id prefix. """
        prefix = "% 3d: " % self.handler_id
        self.logger.log(logging.INFO, "%s%s" % (prefix, msg), *args, **kwargs)

    def vmsg(self, msg, *args, **kwargs):
        """ Same as msg() but as debug. """
        prefix = "% 3d: " % self.handler_id
        self.logger.log(logging.DEBUG, "%s%s" % (prefix, msg), *args, **kwargs)

    def warn(self, msg, *args, **kwargs):
        """ Same as msg() but as warning. """
        prefix = "% 3d: " % self.handler_id
        self.logger.log(logging.WARN, "%s%s" % (prefix, msg), *args, **kwargs)

    #
    # Main WebSocketRequestHandler methods
    #
    def send_frames(self, bufs=None):
        """ Encode and send WebSocket frames. Any frames already
        queued will be sent first. If bufs is not set then only queued
        frames will be sent. Returns the number of pending frames that
        could not be fully sent. If returned pending frames is greater
        than 0, then the caller should call again when the socket is
        ready. """

        tdelta = int(time.time()*1000) - self.start_time

        if bufs:
            for buf in bufs:
                if self.base64:
                    encbuf, lenhead, lentail = self.encode_hybi(
                        buf, opcode=1, base64=True)
                else:
                    encbuf, lenhead, lentail = self.encode_hybi(
                        buf, opcode=2, base64=False)

                if self.rec:
                    # b2s() keeps this a str + str concatenation on
                    # Python 3, where the encoded frame is bytes
                    payload = b2s(encbuf[lenhead:len(encbuf)-lentail])
                    self.rec.write("%s,\n" %
                                   repr("{%s{" % tdelta + payload))

                self.send_parts.append(encbuf)

        while self.send_parts:
            # Send pending frames
            buf = self.send_parts.pop(0)
            sent = self.request.send(buf)

            if sent == len(buf):
                self.print_traffic("<")
            else:
                # Partial send: requeue the unsent tail and stop for now
                self.print_traffic("<.")
                self.send_parts.insert(0, buf[sent:])
                break

        return len(self.send_parts)

    def recv_frames(self):
        """ Receive and decode WebSocket frames.

        Returns:
            (bufs_list, closed)

        closed is False while the connection stays open, otherwise a
        dict with the 'code' and 'reason' of the close.
        """

        closed = False
        bufs = []
        tdelta = int(time.time()*1000) - self.start_time

        buf = self.request.recv(self.buffer_size)
        if len(buf) == 0:
            closed = {'code': 1000, 'reason': "Client closed abruptly"}
            return bufs, closed

        if self.recv_part:
            # Add partially received frames to current read buffer
            buf = self.recv_part + buf
            self.recv_part = None

        while buf:
            frame = self.decode_hybi(buf, base64=self.base64,
                                     logger=self.logger)

            if frame['payload'] is None:
                # Incomplete/partial frame
                self.print_traffic("}.")
                if frame['left'] > 0:
                    self.recv_part = buf[-frame['left']:]
                break

            if frame['opcode'] == 0x8:  # connection close
                closed = {'code': frame['close_code'],
                          'reason': frame['close_reason']}
                break

            self.print_traffic("}")

            if self.rec:
                # Record the payload as it was on the wire (unmasked but
                # not base64 decoded)
                if frame['masked']:
                    recbuf = WebSocketRequestHandler.unmask(
                        buf, frame['hlen'], frame['length'])
                else:
                    recbuf = buf[frame['hlen']:frame['hlen'] +
                                 frame['length']]
                # b2s() keeps this a str + str concatenation on Python 3
                self.rec.write("%s,\n" %
                               repr("}%s}" % tdelta + b2s(recbuf)))

            bufs.append(frame['payload'])

            if frame['left']:
                buf = buf[-frame['left']:]
            else:
                buf = ''

        return bufs, closed

    def send_close(self, code=1000, reason=''):
        """ Send a WebSocket orderly close frame.

        reason may be a native string or a byte string.
        """
        # struct's 's' format needs a byte string on Python 3, so convert
        # text reasons; byte strings are passed through untouched.
        if not isinstance(reason, type(s2b(''))):
            reason = s2b(reason)
        msg = pack(">H%ds" % len(reason), code, reason)
        buf, h, t = self.encode_hybi(msg, opcode=0x08, base64=False)
        self.request.send(buf)

    def do_websocket_handshake(self):
        """ Validate the upgrade request headers and answer them.

        Sets self.version and self.base64 and sends the 101 response on
        success. Sends a 400 error and returns False otherwise.
        """
        h = self.headers

        prot = 'WebSocket-Protocol'
        # The header is a comma separated list, usually with blanks
        # after the commas; strip them so membership tests work
        protocols = [p.strip() for p in
                     h.get('Sec-'+prot, h.get(prot, '')).split(',')]

        ver = h.get('Sec-WebSocket-Version')
        if not ver:
            self.send_error(400, "Missing Sec-WebSocket-Version header. Hixie protocols not supported.")
            return False

        # HyBi/IETF version of the protocol

        # HyBi-07 report version 7
        # HyBi-08 - HyBi-12 report version 8
        # HyBi-13 reports version 13
        if ver in ['7', '8', '13']:
            self.version = "hybi-%02d" % int(ver)
        else:
            self.send_error(400, "Unsupported protocol version %s" % ver)
            return False

        key = h.get('Sec-WebSocket-Key')
        if not key:
            # Reject instead of failing while computing the accept hash
            self.send_error(400, "Missing Sec-WebSocket-Key header")
            return False

        # Choose binary if client supports it
        if 'binary' in protocols:
            self.base64 = False
        elif 'base64' in protocols:
            self.base64 = True
        else:
            self.send_error(400, "Client must support 'binary' or 'base64' protocol")
            return False

        # Generate the hash value for the accept header
        accept = b64encode(sha1(s2b(key + self.GUID)).digest())

        self.send_response(101, "Switching Protocols")
        self.send_header("Upgrade", "websocket")
        self.send_header("Connection", "Upgrade")
        self.send_header("Sec-WebSocket-Accept", b2s(accept))
        if self.base64:
            self.send_header("Sec-WebSocket-Protocol", "base64")
        else:
            self.send_header("Sec-WebSocket-Protocol", "binary")
        self.end_headers()
        return True

    def handle_websocket(self):
        """Upgrade a connection to Websocket, if requested. If this succeeds,
        new_websocket_client() will be called. Otherwise, False is returned.
        """
        upgrade = self.headers.get('upgrade')
        if not (upgrade and upgrade.lower() == 'websocket'):
            return False

        if not self.do_websocket_handshake():
            return False

        # Indicate to server that a Websocket upgrade was done
        self.server.ws_connection = True
        # Initialize per client settings
        self.send_parts = []
        self.recv_part = None
        self.start_time = int(time.time()*1000)

        # client_address is empty with, say, UNIX domain sockets
        client_addr = ""
        is_ssl = False
        try:
            client_addr = self.client_address[0]
            # Third element is the SSL flag appended by the server
            is_ssl = self.client_address[2]
        except IndexError:
            pass

        if is_ssl:
            self.stype = "SSL/TLS (wss://)"
        else:
            self.stype = "Plain non-SSL (ws://)"

        self.log_message("%s: %s WebSocket connection", client_addr,
                         self.stype)
        self.log_message("%s: Version %s, base64: '%s'", client_addr,
                         self.version, self.base64)
        if self.path != '/':
            self.log_message("%s: Path: '%s'", client_addr, self.path)

        if self.record:
            # Record raw frame data as JavaScript array
            fname = "%s.%s" % (self.record,
                               self.handler_id)
            self.log_message("opening record file: %s", fname)
            # Closed again in finish()
            self.rec = open(fname, 'w+')
            encoding = "binary"
            if self.base64:
                encoding = "base64"
            self.rec.write("var VNC_frame_encoding = '%s';\n"
                           % encoding)
            self.rec.write("var VNC_frame_data = [\n")

        try:
            self.new_websocket_client()
        except self.CClose:
            # Close the client
            _, exc, _ = sys.exc_info()
            self.send_close(exc.args[0], exc.args[1])
        return True

    def do_GET(self):
        """Handle GET request. Calls handle_websocket(). If unsuccessful,
        and web server is enabled, SimpleHTTPRequestHandler.do_GET will be called."""
        if not self.handle_websocket():
            if self.only_upgrade:
                self.send_error(405, "Method Not Allowed")
            else:
                SimpleHTTPRequestHandler.do_GET(self)

    def list_directory(self, path):
        """ Refuse directory listings with a 404 when file_only is set. """
        if self.file_only:
            self.send_error(404, "No such file")
        else:
            return SimpleHTTPRequestHandler.list_directory(self, path)

    def new_websocket_client(self):
        """ Do something with a WebSockets client connection. """
        raise Exception("WebSocketRequestHandler.new_websocket_client() must be overloaded")

    def do_HEAD(self):
        """ Answer HEAD with a 405 unless the web server is enabled. """
        if self.only_upgrade:
            self.send_error(405, "Method Not Allowed")
        else:
            SimpleHTTPRequestHandler.do_HEAD(self)

    def finish(self):
        """ Terminate and close the record file, if one was opened. """
        # NOTE(review): the base class finish() is not called here;
        # confirm that skipping it is intentional.
        if self.rec:
            self.rec.write("'EOF'];\n")
            self.rec.close()

    def handle(self):
        """ Handle one request (run_once) or defer to the base class. """
        # When using run_once, we have a single process, so
        # we cannot loop in BaseHTTPRequestHandler.handle; we
        # must return and handle new connections
        if self.run_once:
            self.handle_one_request()
        else:
            SimpleHTTPRequestHandler.handle(self)

    def log_request(self, code='-', size='-'):
        """ Log requests only in verbose mode. """
        if self.verbose:
            SimpleHTTPRequestHandler.log_request(self, code, size)
545
546
class WebSocketServer(object):
    """
    WebSockets server class.
    As an alternative, the standard library SocketServer can be used
    """

    policy_response = """<cross-domain-policy><allow-access-from domain="*" to-ports="*" /></cross-domain-policy>\n"""
    log_prefix = "websocket"

    # An exception before the WebSocket connection was established
    class EClose(Exception):
        pass

    # Raised to make start_server() leave its accept loop
    class Terminate(Exception):
        pass

    def __init__(self, RequestHandlerClass, listen_host='',
                 listen_port=None, source_is_ipv6=False,
                 verbose=False, cert='', key='', ssl_only=None,
                 daemon=False, record='', web='',
                 file_only=False,
                 run_once=False, timeout=0, idle_timeout=0, traffic=False,
                 tcp_keepalive=True, tcp_keepcnt=None, tcp_keepidle=None,
                 tcp_keepintvl=None):
        """ Store the settings, validate them and log the configuration.

        RequestHandlerClass is instantiated for every accepted connection.
        cert, key, web and record are made absolute paths; when web is set
        the process also changes into that directory. Raises Exception when
        ssl_only is requested without the 'ssl' module, or daemon without
        the 'resource' module.

        NOTE(review): file_only is accepted but not stored on the server
        here - confirm whether handlers are expected to see it.
        """

        # settings
        self.RequestHandlerClass = RequestHandlerClass
        self.verbose = verbose
        self.listen_host = listen_host
        self.listen_port = listen_port
        self.prefer_ipv6 = source_is_ipv6
        self.ssl_only = ssl_only
        self.daemon = daemon
        self.run_once = run_once
        self.timeout = timeout
        self.idle_timeout = idle_timeout
        self.traffic = traffic

        self.launch_time = time.time()
        self.ws_connection = False
        self.handler_id = 1

        self.logger = self.get_logger()
        self.tcp_keepalive = tcp_keepalive
        self.tcp_keepcnt = tcp_keepcnt
        self.tcp_keepidle = tcp_keepidle
        self.tcp_keepintvl = tcp_keepintvl

        # Make paths settings absolute
        self.cert = os.path.abspath(cert)
        self.key = self.web = self.record = ''
        if key:
            self.key = os.path.abspath(key)
        if web:
            self.web = os.path.abspath(web)
        if record:
            self.record = os.path.abspath(record)

        if self.web:
            os.chdir(self.web)
        # Without a web root only WebSocket upgrades are served
        self.only_upgrade = not self.web

        # Sanity checks
        if not ssl and self.ssl_only:
            raise Exception("No 'ssl' module and SSL-only specified")
        if self.daemon and not resource:
            raise Exception("Module 'resource' required to daemonize")

        # Show configuration
        self.msg("WebSocket server settings:")
        self.msg("  - Listen on %s:%s",
                 self.listen_host, self.listen_port)
        self.msg("  - Flash security policy server")
        if self.web:
            self.msg("  - Web server. Web root: %s", self.web)
        if ssl:
            if os.path.exists(self.cert):
                self.msg("  - SSL/TLS support")
                if self.ssl_only:
                    self.msg("  - Deny non-SSL/TLS connections")
            else:
                self.msg("  - No SSL/TLS support (no cert file)")
        else:
            self.msg("  - No SSL/TLS support (no 'ssl' module)")
        if self.daemon:
            self.msg("  - Backgrounding (daemon)")
        if self.record:
            self.msg("  - Recording to '%s.*'", self.record)

    #
    # WebSocketServer static methods
    #

    @staticmethod
    def get_logger():
        """ Return the logger shared by the server and its handlers. """
        # NOTE(review): __class__ of a class object is `type`, so the
        # logger name is "websocket.type" rather than
        # "websocket.WebSocketServer". Left as is because the name is
        # visible to logging configuration - confirm before changing.
        return logging.getLogger("%s.%s" % (
            WebSocketServer.log_prefix,
            WebSocketServer.__class__.__name__))

    @staticmethod
    def _wrap_ssl(sock, server_side=False, certfile=None, keyfile=None):
        """ SSL wrap sock, with or without the legacy ssl.wrap_socket(). """
        if hasattr(ssl, 'wrap_socket'):
            return ssl.wrap_socket(sock, server_side=server_side,
                                   certfile=certfile, keyfile=keyfile)

        # ssl.wrap_socket() was removed in Python 3.12
        if server_side:
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
            context.load_cert_chain(certfile, keyfile)
        else:
            # NOTE: peer certificates are not verified, which matches what
            # a plain ssl.wrap_socket(sock) call did.
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
        return context.wrap_socket(sock, server_side=server_side)

    @staticmethod
    def socket(host, port=None, connect=False, prefer_ipv6=False,
               unix_socket=None, use_ssl=False, tcp_keepalive=True,
               tcp_keepcnt=None, tcp_keepidle=None, tcp_keepintvl=None):
        """ Resolve a host (and optional port) to an IPv4 or IPv6
        address. Create a socket. Connect to it if connect is set,
        otherwise bind and listen on it. Return the socket.

        When unix_socket is given, a UNIX domain socket is connected to
        that path instead. The socket is closed again if setting it up
        fails.
        """
        flags = 0
        if host == '':
            host = None
        if connect and not (port or unix_socket):
            raise Exception("Connect mode requires a port")
        if use_ssl and not ssl:
            raise Exception("SSL socket requested but Python SSL module not loaded.")
        if not connect and use_ssl:
            raise Exception("SSL only supported in connect mode (for now)")
        if not connect:
            flags = flags | socket.AI_PASSIVE

        if not unix_socket:
            addrs = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM,
                                       socket.IPPROTO_TCP, flags)
            if not addrs:
                raise Exception("Could not resolve host '%s'" % host)
            # Order by address family; reversed when IPv6 is preferred
            addrs.sort(key=lambda x: x[0])
            if prefer_ipv6:
                addrs.reverse()
            sock = socket.socket(addrs[0][0], addrs[0][1])

            try:
                if tcp_keepalive:
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                    if tcp_keepcnt:
                        sock.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT,
                                        tcp_keepcnt)
                    if tcp_keepidle:
                        sock.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE,
                                        tcp_keepidle)
                    if tcp_keepintvl:
                        sock.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL,
                                        tcp_keepintvl)

                if connect:
                    sock.connect(addrs[0][4])
                    if use_ssl:
                        sock = WebSocketServer._wrap_ssl(sock)
                else:
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                    sock.bind(addrs[0][4])
                    sock.listen(100)
            except Exception:
                # Do not leak the descriptor when setup fails
                sock.close()
                raise
        else:
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            try:
                sock.connect(unix_socket)
            except Exception:
                # Do not leak the descriptor when the connect fails
                sock.close()
                raise

        return sock

    @staticmethod
    def daemonize(keepfd=None, chdir='/'):
        """ Detach the process into the background.

        keepfd - file descriptor that is left open
        chdir  - directory to change into ('/' when empty)
        """
        os.umask(0)
        if chdir:
            os.chdir(chdir)
        else:
            os.chdir('/')
        os.setgid(os.getgid())  # relinquish elevations
        os.setuid(os.getuid())  # relinquish elevations

        # Double fork to daemonize
        if os.fork() > 0:
            os._exit(0)  # Parent exits
        os.setsid()      # Obtain new process group
        if os.fork() > 0:
            os._exit(0)  # Parent exits

        # Signal handling
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        # Close open files
        maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
        if maxfd == resource.RLIM_INFINITY:
            maxfd = 256
        for fd in reversed(range(maxfd)):
            try:
                if fd != keepfd:
                    os.close(fd)
            except OSError:
                _, exc, _ = sys.exc_info()
                # Descriptors that were never open are expected
                if exc.errno != errno.EBADF:
                    raise

        # Redirect I/O to /dev/null
        os.dup2(os.open(os.devnull, os.O_RDWR), sys.stdin.fileno())
        os.dup2(os.open(os.devnull, os.O_RDWR), sys.stdout.fileno())
        os.dup2(os.open(os.devnull, os.O_RDWR), sys.stderr.fileno())

    def do_handshake(self, sock, address):
        """
        do_handshake does the following:
        - Wait up to 3 seconds for data, then peek at the first bytes
          from the socket without consuming them.
        - If the connection is Flash policy request then answer it
          and raise EClose.
        - If the connection is an HTTPS/SSL/TLS connection then SSL
          wrap the socket.
        - Instantiate RequestHandlerClass on the (possibly wrapped)
          socket to process the connection.
        - Return the (possibly wrapped) socket.

        Raises EClose for connections that are ignored or refused.
        """
        ready = select.select([sock], [], [], 3)[0]

        if not ready:
            raise self.EClose("ignoring socket not ready")
        # Peek, but do not read the data so that we have a opportunity
        # to SSL wrap the socket first
        handshake = sock.recv(1024, socket.MSG_PEEK)

        # An emptiness test works for both str (Python 2) and bytes
        # (Python 3); comparing against "" never matches bytes.
        if not handshake:
            raise self.EClose("ignoring empty handshake")

        elif handshake.startswith(s2b("<policy-file-request/>")):
            # Answer Flash policy request
            handshake = sock.recv(1024)
            sock.send(s2b(self.policy_response))
            raise self.EClose("Sending flash policy response")

        # First byte is a str on Python 2 and an int on Python 3
        elif handshake[0] in ("\x16", "\x80", 22, 128):
            # SSL wrap the connection
            if not ssl:
                raise self.EClose("SSL connection but no 'ssl' module")
            if not os.path.exists(self.cert):
                raise self.EClose("SSL connection but '%s' not found"
                                  % self.cert)
            retsock = None
            try:
                # An unset key is passed as None so that the key is
                # looked up in the certificate file
                retsock = self._wrap_ssl(
                    sock,
                    server_side=True,
                    certfile=self.cert,
                    keyfile=self.key or None)
            except ssl.SSLError:
                _, x, _ = sys.exc_info()
                if x.args[0] == ssl.SSL_ERROR_EOF:
                    if len(x.args) > 1:
                        raise self.EClose(x.args[1])
                    else:
                        raise self.EClose("Got SSL_ERROR_EOF")
                else:
                    raise

        elif self.ssl_only:
            raise self.EClose("non-SSL connection received but disallowed")

        else:
            retsock = sock

        # If the address is like (host, port), we are extending it
        # with a flag indicating SSL. Not many other options
        # available...
        if len(address) == 2:
            address = (address[0], address[1], (retsock != sock))

        self.RequestHandlerClass(retsock, address, self)

        # Return the WebSockets socket which may be SSL wrapped
        return retsock

    #
    # WebSocketServer logging/output functions
    #

    def msg(self, *args, **kwargs):
        """ Output message as info """
        self.logger.log(logging.INFO, *args, **kwargs)

    def vmsg(self, *args, **kwargs):
        """ Same as msg() but as debug. """
        self.logger.log(logging.DEBUG, *args, **kwargs)

    def warn(self, *args, **kwargs):
        """ Same as msg() but as warning. """
        self.logger.log(logging.WARN, *args, **kwargs)

    #
    # Events that can/should be overridden in sub-classes
    #
    def started(self):
        """ Called after WebSockets startup """
        self.vmsg("WebSockets server started")

    def poll(self):
        """ Run periodically while waiting for connections. """
        pass

    def terminate(self):
        """ Raise Terminate to stop the server loop. """
        raise self.Terminate()

    def multiprocessing_SIGCHLD(self, sig, stack):
        """ SIGCHLD handler used when multiprocessing is available. """
        self.vmsg('Reaing zombies, active child count is %s', len(multiprocessing.active_children()))

    def fallback_SIGCHLD(self, sig, stack):
        """ SIGCHLD handler used when children are created by os.fork(). """
        # Reap zombies when using os.fork() (python 2.4)
        self.vmsg("Got SIGCHLD, reaping zombies")
        try:
            result = os.waitpid(-1, os.WNOHANG)
            while result[0]:
                self.vmsg("Reaped child process %s" % result[0])
                result = os.waitpid(-1, os.WNOHANG)
        except (OSError):
            pass

    def do_SIGINT(self, sig, stack):
        """ SIGINT handler: log and terminate. """
        self.msg("Got SIGINT, exiting")
        self.terminate()

    def do_SIGTERM(self, sig, stack):
        """ SIGTERM handler: log and terminate. """
        self.msg("Got SIGTERM, exiting")
        self.terminate()

    def top_new_client(self, startsock, address):
        """ Do something with a WebSockets client connection. """
        # handler process
        client = None
        try:
            try:
                client = self.do_handshake(startsock, address)
            except self.EClose:
                _, exc, _ = sys.exc_info()
                # Connection was not a WebSockets connection
                if exc.args[0]:
                    self.msg("%s: %s" % (address[0], exc.args[0]))
            except WebSocketServer.Terminate:
                raise
            except Exception:
                _, exc, _ = sys.exc_info()
                self.msg("handler exception: %s" % str(exc))
                self.vmsg("exception", exc_info=True)
        finally:

            if client and client != startsock:
                # Close the SSL wrapped socket
                # Original socket closed by caller
                client.close()

    def start_server(self):
        """
        Daemonize if requested. Listen for for connections. Run
        do_handshake() method for each connection. If the connection
        is a WebSockets client then call new_websocket_client() method (which must
        be overridden) for each new client connection.
        """
        lsock = self.socket(self.listen_host, self.listen_port, False,
                            self.prefer_ipv6,
                            tcp_keepalive=self.tcp_keepalive,
                            tcp_keepcnt=self.tcp_keepcnt,
                            tcp_keepidle=self.tcp_keepidle,
                            tcp_keepintvl=self.tcp_keepintvl)

        if self.daemon:
            self.daemonize(keepfd=lsock.fileno(), chdir=self.web)

        self.started()  # Some things need to happen after daemonizing

        # SIGCHLD does not exist on every platform
        have_sigchld = getattr(signal, 'SIGCHLD', None) is not None

        # Allow override of signals
        original_signals = {
            signal.SIGINT: signal.getsignal(signal.SIGINT),
            signal.SIGTERM: signal.getsignal(signal.SIGTERM),
        }
        if have_sigchld:
            original_signals[signal.SIGCHLD] = signal.getsignal(
                signal.SIGCHLD)
        signal.signal(signal.SIGINT, self.do_SIGINT)
        signal.signal(signal.SIGTERM, self.do_SIGTERM)
        if have_sigchld:
            if not multiprocessing:
                # os.fork() (python 2.4) child reaper
                signal.signal(signal.SIGCHLD, self.fallback_SIGCHLD)
            else:
                # make sure that _cleanup is called when children die
                # by calling active_children on SIGCHLD
                signal.signal(signal.SIGCHLD, self.multiprocessing_SIGCHLD)

        last_active_time = self.launch_time
        try:
            while True:
                # Bound before the try so the finally clause below can
                # always inspect it
                startsock = None
                try:
                    try:
                        pid = err = 0
                        child_count = 0

                        if multiprocessing:
                            # Collect zombie child processes
                            child_count = len(multiprocessing.active_children())

                        time_elapsed = time.time() - self.launch_time
                        if self.timeout and time_elapsed > self.timeout:
                            self.msg('listener exit due to --timeout %s'
                                     % self.timeout)
                            break

                        if self.idle_timeout:
                            idle_time = 0
                            if child_count == 0:
                                idle_time = time.time() - last_active_time
                            else:
                                idle_time = 0
                                last_active_time = time.time()

                            if idle_time > self.idle_timeout and child_count == 0:
                                self.msg('listener exit due to --idle-timeout %s'
                                         % self.idle_timeout)
                                break

                        try:
                            self.poll()

                            ready = select.select([lsock], [], [], 1)[0]
                            if lsock in ready:
                                startsock, address = lsock.accept()
                            else:
                                continue
                        except self.Terminate:
                            raise
                        except Exception:
                            _, exc, _ = sys.exc_info()
                            if hasattr(exc, 'errno'):
                                err = exc.errno
                            elif hasattr(exc, 'args'):
                                err = exc.args[0]
                            else:
                                err = exc[0]
                            if err == errno.EINTR:
                                self.vmsg("Ignoring interrupted syscall")
                                continue
                            else:
                                raise

                        if self.run_once:
                            # Run in same process if run_once
                            self.top_new_client(startsock, address)
                            if self.ws_connection:
                                self.msg('%s: exiting due to --run-once'
                                         % address[0])
                                break
                        elif multiprocessing:
                            self.vmsg('%s: new handler Process' % address[0])
                            p = multiprocessing.Process(
                                target=self.top_new_client,
                                args=(startsock, address))
                            p.start()
                            # child will not return
                        else:
                            # python 2.4
                            self.vmsg('%s: forking handler' % address[0])
                            pid = os.fork()
                            if pid == 0:
                                # child handler process
                                self.top_new_client(startsock, address)
                                break  # child process exits

                        # parent process
                        self.handler_id += 1

                    except (self.Terminate, SystemExit, KeyboardInterrupt):
                        self.msg("In exit")
                        break
                    except Exception:
                        # Fetch the exception being handled; no name is
                        # bound to it by the except clause itself
                        _, exc, _ = sys.exc_info()
                        self.msg("handler exception: %s", str(exc))
                        self.vmsg("exception", exc_info=True)

                finally:
                    if startsock:
                        startsock.close()
        finally:
            # Close listen port
            self.vmsg("Closing socket listening at %s:%s",
                      self.listen_host, self.listen_port)
            lsock.close()

            # Restore signals
            for sig, func in original_signals.items():
                signal.signal(sig, func)
1029
1030