]> git.proxmox.com Git - mirror_novnc.git/blame - utils/websocket.py
Merge pull request #444 from samhed/extdesktop
[mirror_novnc.git] / utils / websocket.py
CommitLineData
e9155818 1#!/usr/bin/env python
95ef30a1
JM
2
3'''
4Python WebSocket library with support for "wss://" encryption.
8c305c60 5Copyright 2011 Joel Martin
31407abc 6Licensed under LGPL version 3 (see docs/LICENSE.LGPL-3)
95ef30a1 7
7d146027 8Supports following protocol versions:
c3c51ed3 9 - http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-07
48f26d79 10 - http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-10
c3c51ed3 11 - http://tools.ietf.org/html/rfc6455
7d146027 12
95ef30a1
JM
13You can make a cert/key with openssl using:
14openssl req -new -x509 -days 365 -nodes -out self.pem -keyout self.pem
15as taken from http://docs.python.org/dev/library/ssl.html#certificates
16
17'''
18
082027dc 19import os, sys, time, errno, signal, socket, select, logging
f2d85676 20import array, struct
95ef30a1 21from base64 import b64encode, b64decode
8c305c60
JM
22
23# Imports that vary by python version
f2d85676
JM
24
# python 3.0 differences
#
# b2s: byte string -> native str
# s2b: native str  -> byte string
# s2a: byte string -> sequence of integer byte values
if sys.hexversion > 0x3000000:
    b2s = lambda buf: buf.decode('latin_1')
    s2b = lambda s: s.encode('latin_1')
    s2a = lambda s: s  # indexing bytes already yields integers
else:
    b2s = lambda buf: buf  # No-op
    s2b = lambda s: s      # No-op
    s2a = lambda s: [ord(c) for c in s]

# Only a missing module/name should trigger the fallbacks below; anything
# else (including KeyboardInterrupt) must propagate.
try:
    from io import StringIO
except ImportError:
    from cStringIO import StringIO
try:
    from http.server import SimpleHTTPRequestHandler
except ImportError:
    from SimpleHTTPServer import SimpleHTTPRequestHandler

# python 2.6 differences
try:
    from hashlib import sha1
except ImportError:
    from sha import sha as sha1

# python 2.5 differences
try:
    from struct import pack, unpack_from
except ImportError:
    from struct import pack

    def unpack_from(fmt, buf, offset=0):
        """ Minimal stand-in for struct.unpack_from(). """
        view = buffer(buf, offset, struct.calcsize(fmt))
        return struct.unpack(fmt, view)

# Degraded functionality if these imports are missing.  Each module is
# bound to a global of the same name, or to None when it is unavailable,
# so the rest of the file can test e.g. "if numpy:".
for mod, msg in [('numpy', 'HyBi protocol will be slower'),
                 ('ssl', 'TLS/SSL/wss is disabled'),
                 ('multiprocessing', 'Multi-Processing is disabled'),
                 ('resource', 'daemonizing is disabled')]:
    try:
        globals()[mod] = __import__(mod)
    except ImportError:
        globals()[mod] = None
        print("WARNING: no '%s' module, %s" % (mod, msg))

if multiprocessing and sys.platform == 'win32':
    # make sockets pickle-able/inheritable
    import multiprocessing.reduction
8c305c60 66
95ef30a1 67
082027dc 68# HTTP handler with WebSocket upgrade support
class WebSocketRequestHandler(SimpleHTTPRequestHandler):
    """
    WebSocket Request Handler Class, derived from SimpleHTTPRequestHandler.
    Must be sub-classed with new_websocket_client method definition.
    The request handler can be configured by setting optional
    attributes on the server object:

    * only_upgrade: If true, SimpleHTTPRequestHandler will not be enabled,
      only websocket is allowed.
    * verbose: If true, verbose logging is activated.
    * daemon: Running as daemon, do not write to console etc
    * record: Record raw frame data as JavaScript array into specified filename
    * run_once: Handle a single request
    * handler_id: A sequence number for this connection, appended to record filename
    * file_only: If true, directory listings are answered with a 404
    * traffic: If true, traffic flow tokens are written to stdout
    * logger: Logger to use; defaults to WebSocketServer.get_logger()
    """
    buffer_size = 65536

    GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

    server_version = "WebSockify"

    protocol_version = "HTTP/1.1"

    # An exception while the WebSocket client was connected.
    # handle_websocket() expects it to be raised as CClose(code, reason).
    class CClose(Exception):
        pass

    def __init__(self, req, addr, server):
        # Retrieve a few configuration variables from the server.  This has
        # to happen before the base class constructor runs because that
        # constructor already handles the request.
        self.only_upgrade = getattr(server, "only_upgrade", False)
        self.verbose = getattr(server, "verbose", False)
        self.daemon = getattr(server, "daemon", False)
        self.record = getattr(server, "record", False)
        self.run_once = getattr(server, "run_once", False)
        self.rec = None
        self.handler_id = getattr(server, "handler_id", False)
        self.file_only = getattr(server, "file_only", False)
        self.traffic = getattr(server, "traffic", False)

        self.logger = getattr(server, "logger", None)
        if self.logger is None:
            self.logger = WebSocketServer.get_logger()

        SimpleHTTPRequestHandler.__init__(self, req, addr, server)

    @staticmethod
    def _tobytes(arr):
        """ Serialize a numpy array or array.array to a byte string. """
        # tostring() is deprecated in NumPy and was removed from
        # array.array in Python 3.9; use tobytes() whenever it exists and
        # only fall back to tostring() on old interpreters/libraries.
        if hasattr(arr, 'tobytes'):
            return arr.tobytes()
        return arr.tostring()

    @staticmethod
    def unmask(buf, hlen, plen):
        """ Return the unmasked payload of a masked HyBi frame.

        buf holds the frame, hlen is the header length (the 4 byte mask
        follows the header) and plen is the payload length in bytes.
        """
        pstart = hlen + 4
        pend = pstart + plen
        if numpy:
            b = c = s2b('')
            if plen >= 4:
                # XOR whole 32-bit words against the mask in one go
                mask = numpy.frombuffer(buf, dtype=numpy.dtype('<u4'),
                                        offset=hlen, count=1)
                data = numpy.frombuffer(buf, dtype=numpy.dtype('<u4'),
                                        offset=pstart, count=plen // 4)
                b = WebSocketRequestHandler._tobytes(
                        numpy.bitwise_xor(data, mask))

            if plen % 4:
                # The trailing 1-3 bytes start on a word boundary, so they
                # line up with the first bytes of the mask.
                mask = numpy.frombuffer(buf, dtype=numpy.dtype('B'),
                                        offset=hlen, count=(plen % 4))
                data = numpy.frombuffer(buf, dtype=numpy.dtype('B'),
                                        offset=pend - (plen % 4),
                                        count=(plen % 4))
                c = WebSocketRequestHandler._tobytes(
                        numpy.bitwise_xor(data, mask))
            return b + c
        else:
            # Slower fallback
            mask = s2a(buf[hlen:hlen+4])
            # The array constructor accepts a byte string on every Python
            # version, unlike fromstring()/frombytes().
            data = array.array('B', buf[pstart:pend])
            for i in range(len(data)):
                data[i] ^= mask[i % 4]
            return WebSocketRequestHandler._tobytes(data)

    @staticmethod
    def encode_hybi(buf, opcode, base64=False):
        """ Encode a HyBi style WebSocket frame.
        Optional opcode:
            0x0 - continuation
            0x1 - text frame (base64 encode buf)
            0x2 - binary frame (use raw buf)
            0x8 - connection close
            0x9 - ping
            0xA - pong

        Returns (frame, header_length, tail_length); the tail length is
        always 0.
        """
        if base64:
            buf = b64encode(buf)

        b1 = 0x80 | (opcode & 0x0f) # FIN + opcode
        payload_len = len(buf)
        if payload_len <= 125:
            header = pack('>BB', b1, payload_len)
        elif payload_len < 65536:
            header = pack('>BBH', b1, 126, payload_len)
        else:
            header = pack('>BBQ', b1, 127, payload_len)

        return header + buf, len(header), 0

    @staticmethod
    def decode_hybi(buf, base64=False, logger=None):
        """ Decode HyBi style WebSocket packets.
        Returns:
            {'fin'          : 0_or_1,
             'opcode'       : number,
             'masked'       : 0_or_1,
             'hlen'         : header_bytes_number,
             'length'       : payload_bytes_number,
             'payload'      : decoded_buffer,
             'left'         : bytes_left_number,
             'close_code'   : number,
             'close_reason' : string}

        'payload' is None when buf does not yet hold a complete frame; in
        that case 'left' is the length of buf.
        """

        f = {'fin'          : 0,
             'opcode'       : 0,
             'masked'       : False,
             'hlen'         : 2,
             'length'       : 0,
             'payload'      : None,
             'left'         : 0,
             'close_code'   : 1000,
             'close_reason' : ''}

        if logger is None:
            logger = WebSocketServer.get_logger()

        blen = len(buf)
        f['left'] = blen

        if blen < f['hlen']:
            return f # Incomplete frame header

        b1, b2 = unpack_from(">BB", buf)
        f['opcode'] = b1 & 0x0f
        f['fin'] = (b1 & 0x80) >> 7
        f['masked'] = (b2 & 0x80) >> 7

        f['length'] = b2 & 0x7f

        if f['length'] == 126:
            f['hlen'] = 4
            if blen < f['hlen']:
                return f # Incomplete frame header
            (f['length'],) = unpack_from('>xxH', buf)
        elif f['length'] == 127:
            f['hlen'] = 10
            if blen < f['hlen']:
                return f # Incomplete frame header
            (f['length'],) = unpack_from('>xxQ', buf)

        full_len = f['hlen'] + f['masked'] * 4 + f['length']

        if blen < full_len:
            return f # Incomplete frame

        # Number of bytes that are part of the next frame(s)
        f['left'] = blen - full_len

        # Process 1 frame
        if f['masked']:
            # unmask payload
            f['payload'] = WebSocketRequestHandler.unmask(buf, f['hlen'],
                                                          f['length'])
        else:
            logger.debug("Unmasked frame: %s" % repr(buf))
            f['payload'] = buf[(f['hlen'] + f['masked'] * 4):full_len]

        if base64 and f['opcode'] in [1, 2]:
            try:
                f['payload'] = b64decode(f['payload'])
            except Exception:
                logger.exception("Exception while b64decoding buffer: %s" %
                                 (repr(buf)))
                raise

        if f['opcode'] == 0x08:
            if f['length'] >= 2:
                f['close_code'] = unpack_from(">H", f['payload'])[0]
            # Everything after the two byte status code is the reason,
            # even if it is a single byte.
            if f['length'] > 2:
                f['close_reason'] = f['payload'][2:]

        return f


    #
    # WebSocketRequestHandler logging/output functions
    #

    def print_traffic(self, token="."):
        """ Show traffic flow mode. """
        if self.traffic:
            sys.stdout.write(token)
            sys.stdout.flush()

    def _log(self, level, msg, args, kwargs):
        """ Log msg at level, prefixed with the handler_id. """
        prefix = "% 3d: " % self.handler_id
        self.logger.log(level, "%s%s" % (prefix, msg), *args, **kwargs)

    def msg(self, msg, *args, **kwargs):
        """ Output message with handler_id prefix. """
        self._log(logging.INFO, msg, args, kwargs)

    def vmsg(self, msg, *args, **kwargs):
        """ Same as msg() but as debug. """
        self._log(logging.DEBUG, msg, args, kwargs)

    def warn(self, msg, *args, **kwargs):
        """ Same as msg() but as warning. """
        self._log(logging.WARN, msg, args, kwargs)

    #
    # Main WebSocketRequestHandler methods
    #
    def send_frames(self, bufs=None):
        """ Encode and send WebSocket frames. Any frames already
        queued will be sent first. If buf is not set then only queued
        frames will be sent. Returns the number of pending frames that
        could not be fully sent. If returned pending frames is greater
        than 0, then the caller should call again when the socket is
        ready. """

        tdelta = int(time.time()*1000) - self.start_time

        if bufs:
            for buf in bufs:
                if self.base64:
                    encbuf, lenhead, lentail = self.encode_hybi(
                            buf, opcode=1, base64=True)
                else:
                    encbuf, lenhead, lentail = self.encode_hybi(
                            buf, opcode=2, base64=False)

                if self.rec:
                    # b2s() is needed on Python 3, where the frame is
                    # bytes and cannot be appended to the str prefix.
                    payload = b2s(encbuf[lenhead:len(encbuf)-lentail])
                    self.rec.write("%s,\n" %
                                   repr("{%s{" % tdelta + payload))

                self.send_parts.append(encbuf)

        while self.send_parts:
            # Send pending frames
            buf = self.send_parts.pop(0)
            sent = self.request.send(buf)

            if sent == len(buf):
                self.print_traffic("<")
            else:
                # Partial send: requeue the remainder and let the caller
                # retry once the socket is writable again.
                self.print_traffic("<.")
                self.send_parts.insert(0, buf[sent:])
                break

        return len(self.send_parts)

    def recv_frames(self):
        """ Receive and decode WebSocket frames.

        Returns:
            (bufs_list, closed_string)

        closed is False while the connection is open, otherwise a dict
        with 'code' and 'reason' keys.
        """

        closed = False
        bufs = []
        tdelta = int(time.time()*1000) - self.start_time

        buf = self.request.recv(self.buffer_size)
        if len(buf) == 0:
            closed = {'code': 1000, 'reason': "Client closed abruptly"}
            return bufs, closed

        if self.recv_part:
            # Add partially received frames to current read buffer
            buf = self.recv_part + buf
            self.recv_part = None

        while buf:
            frame = self.decode_hybi(buf, base64=self.base64,
                                     logger=self.logger)

            if frame['payload'] is None:
                # Incomplete/partial frame
                self.print_traffic("}.")
                if frame['left'] > 0:
                    self.recv_part = buf[-frame['left']:]
                break

            if frame['opcode'] == 0x8: # connection close
                closed = {'code': frame['close_code'],
                          'reason': frame['close_reason']}
                break

            self.print_traffic("}")

            if self.rec:
                if frame['masked']:
                    recbuf = WebSocketRequestHandler.unmask(buf, frame['hlen'],
                                                            frame['length'])
                else:
                    recbuf = buf[frame['hlen']:frame['hlen'] +
                                               frame['length']]
                # b2s(): see send_frames()
                self.rec.write("%s,\n" %
                               repr("}%s}" % tdelta + b2s(recbuf)))

            bufs.append(frame['payload'])

            if not frame['left']:
                break
            buf = buf[-frame['left']:]

        return bufs, closed

    def send_close(self, code=1000, reason=''):
        """ Send a WebSocket orderly close frame. """

        # struct needs a byte string for 's'.  Text has an encode()
        # method, byte strings on Python 3 do not and are passed through.
        if hasattr(reason, 'encode'):
            reason = s2b(reason)
        msg = pack(">H%ds" % len(reason), code, reason)
        buf, h, t = self.encode_hybi(msg, opcode=0x08, base64=False)
        self.request.send(buf)

    def do_websocket_handshake(self):
        """ Validate the upgrade request and send the handshake response.

        Sets self.version and self.base64.  Returns True if the 101
        response was sent, False if a 400 error was sent instead.
        """
        h = self.headers

        prot = 'WebSocket-Protocol'
        # Clients may separate the offered protocols with ", "
        protocols = [p.strip() for p in
                     h.get('Sec-'+prot, h.get(prot, '')).split(',')]

        ver = h.get('Sec-WebSocket-Version')
        if not ver:
            self.send_error(400, "Missing Sec-WebSocket-Version header. Hixie protocols not supported.")
            return False

        # HyBi/IETF version of the protocol

        # HyBi-07 report version 7
        # HyBi-08 - HyBi-12 report version 8
        # HyBi-13 reports version 13
        if ver in ['7', '8', '13']:
            self.version = "hybi-%02d" % int(ver)
        else:
            self.send_error(400, "Unsupported protocol version %s" % ver)
            return False

        key = h.get('Sec-WebSocket-Key')
        if not key:
            self.send_error(400, "Missing Sec-WebSocket-Key header")
            return False

        # Choose binary if client supports it
        if 'binary' in protocols:
            self.base64 = False
        elif 'base64' in protocols:
            self.base64 = True
        else:
            self.send_error(400, "Client must support 'binary' or 'base64' protocol")
            return False

        # Generate the hash value for the accept header
        accept = b64encode(sha1(s2b(key + self.GUID)).digest())

        self.send_response(101, "Switching Protocols")
        self.send_header("Upgrade", "websocket")
        self.send_header("Connection", "Upgrade")
        self.send_header("Sec-WebSocket-Accept", b2s(accept))
        if self.base64:
            self.send_header("Sec-WebSocket-Protocol", "base64")
        else:
            self.send_header("Sec-WebSocket-Protocol", "binary")
        self.end_headers()
        return True

    def handle_websocket(self):
        """Upgrade a connection to Websocket, if requested. If this succeeds,
        new_websocket_client() will be called. Otherwise, False is returned.
        """
        upgrade = self.headers.get('upgrade')
        if not (upgrade and upgrade.lower() == 'websocket'):
            return False

        if not self.do_websocket_handshake():
            return False

        # Indicate to server that a Websocket upgrade was done
        self.server.ws_connection = True
        # Initialize per client settings
        self.send_parts = []
        self.recv_part = None
        self.start_time = int(time.time()*1000)

        # client_address is empty with, say, UNIX domain sockets
        client_addr = ""
        is_ssl = False
        try:
            client_addr = self.client_address[0]
            # Third element is the SSL flag added by the server
            is_ssl = self.client_address[2]
        except IndexError:
            pass

        if is_ssl:
            self.stype = "SSL/TLS (wss://)"
        else:
            self.stype = "Plain non-SSL (ws://)"

        self.log_message("%s: %s WebSocket connection", client_addr,
                         self.stype)
        self.log_message("%s: Version %s, base64: '%s'", client_addr,
                         self.version, self.base64)
        if self.path != '/':
            self.log_message("%s: Path: '%s'", client_addr, self.path)

        if self.record:
            # Record raw frame data as JavaScript array; the file is
            # terminated and closed in finish().
            fname = "%s.%s" % (self.record,
                               self.handler_id)
            self.log_message("opening record file: %s", fname)
            self.rec = open(fname, 'w+')
            encoding = "binary"
            if self.base64: encoding = "base64"
            self.rec.write("var VNC_frame_encoding = '%s';\n"
                           % encoding)
            self.rec.write("var VNC_frame_data = [\n")

        try:
            self.new_websocket_client()
        except self.CClose:
            # Close the client
            _, exc, _ = sys.exc_info()
            self.send_close(exc.args[0], exc.args[1])
        return True

    def do_GET(self):
        """Handle GET request. Calls handle_websocket(). If unsuccessful,
        and web server is enabled, SimpleHTTPRequestHandler.do_GET will be called."""
        if not self.handle_websocket():
            if self.only_upgrade:
                self.send_error(405, "Method Not Allowed")
            else:
                SimpleHTTPRequestHandler.do_GET(self)

    def list_directory(self, path):
        """ Refuse directory listings when file_only is set. """
        if self.file_only:
            self.send_error(404, "No such file")
        else:
            return SimpleHTTPRequestHandler.list_directory(self, path)

    def new_websocket_client(self):
        """ Do something with a WebSockets client connection. """
        raise Exception("WebSocketRequestHandler.new_websocket_client() must be overloaded")

    def do_HEAD(self):
        """ Handle HEAD request; refused when only upgrades are allowed. """
        if self.only_upgrade:
            self.send_error(405, "Method Not Allowed")
        else:
            SimpleHTTPRequestHandler.do_HEAD(self)

    def finish(self):
        """ Terminate and close the record file, if one was opened. """
        if self.rec:
            self.rec.write("'EOF'];\n")
            self.rec.close()

    def handle(self):
        # When using run_once, we have a single process, so
        # we cannot loop in BaseHTTPRequestHandler.handle; we
        # must return and handle new connections
        if self.run_once:
            self.handle_one_request()
        else:
            SimpleHTTPRequestHandler.handle(self)

    def log_request(self, code='-', size='-'):
        """ Log accepted requests only in verbose mode. """
        if self.verbose:
            SimpleHTTPRequestHandler.log_request(self, code, size)
545
546
class WebSocketServer(object):
    """
    WebSockets server class.
    As an alternative, the standard library SocketServer can be used
    """

    policy_response = """<cross-domain-policy><allow-access-from domain="*" to-ports="*" /></cross-domain-policy>\n"""
    log_prefix = "websocket"

    # An exception before the WebSocket connection was established
    class EClose(Exception):
        pass

    # Raised by terminate() to unwind out of start_server()
    class Terminate(Exception):
        pass

    def __init__(self, RequestHandlerClass, listen_host='',
                 listen_port=None, source_is_ipv6=False,
                 verbose=False, cert='', key='', ssl_only=None,
                 daemon=False, record='', web='',
                 file_only=False,
                 run_once=False, timeout=0, idle_timeout=0, traffic=False,
                 tcp_keepalive=True, tcp_keepcnt=None, tcp_keepidle=None,
                 tcp_keepintvl=None):
        """ Store the settings, validate them and log the configuration.

        Note that the process changes directory to web when it is set.
        Raises Exception if ssl_only is requested without the 'ssl'
        module, or daemon without the 'resource' module.
        """

        # settings
        self.RequestHandlerClass = RequestHandlerClass
        self.verbose = verbose
        self.listen_host = listen_host
        self.listen_port = listen_port
        self.prefer_ipv6 = source_is_ipv6
        self.ssl_only = ssl_only
        self.daemon = daemon
        self.run_once = run_once
        self.timeout = timeout
        self.idle_timeout = idle_timeout
        self.traffic = traffic
        # The request handler reads this attribute off the server; without
        # it the file_only option would silently be ignored.
        self.file_only = file_only

        self.launch_time = time.time()
        self.ws_connection = False
        self.handler_id = 1

        self.logger = self.get_logger()
        self.tcp_keepalive = tcp_keepalive
        self.tcp_keepcnt = tcp_keepcnt
        self.tcp_keepidle = tcp_keepidle
        self.tcp_keepintvl = tcp_keepintvl

        # Make paths settings absolute
        self.cert = os.path.abspath(cert)
        self.key = self.web = self.record = ''
        if key:
            self.key = os.path.abspath(key)
        if web:
            self.web = os.path.abspath(web)
        if record:
            self.record = os.path.abspath(record)

        if self.web:
            os.chdir(self.web)
        self.only_upgrade = not self.web

        # Sanity checks
        if not ssl and self.ssl_only:
            raise Exception("No 'ssl' module and SSL-only specified")
        if self.daemon and not resource:
            raise Exception("Module 'resource' required to daemonize")

        # Show configuration
        self.msg("WebSocket server settings:")
        self.msg("  - Listen on %s:%s",
                 self.listen_host, self.listen_port)
        self.msg("  - Flash security policy server")
        if self.web:
            self.msg("  - Web server. Web root: %s", self.web)
        if ssl:
            if os.path.exists(self.cert):
                self.msg("  - SSL/TLS support")
                if self.ssl_only:
                    self.msg("  - Deny non-SSL/TLS connections")
            else:
                self.msg("  - No SSL/TLS support (no cert file)")
        else:
            self.msg("  - No SSL/TLS support (no 'ssl' module)")
        if self.daemon:
            self.msg("  - Backgrounding (daemon)")
        if self.record:
            self.msg("  - Recording to '%s.*'", self.record)

    #
    # WebSocketServer static methods
    #

    @staticmethod
    def get_logger():
        """ Return the logger shared by the server and its handlers. """
        # NOTE: __class__ of a class is its metaclass, so the suffix is
        # "type" rather than "WebSocketServer".  Kept as is so that
        # existing logging configuration keyed on the name keeps working.
        return logging.getLogger("%s.%s" % (
            WebSocketServer.log_prefix,
            WebSocketServer.__class__.__name__))

    @staticmethod
    def _ssl_wrap(sock, server_side=False, certfile=None, keyfile=None):
        """ SSL/TLS wrap sock and return the wrapped socket.

        Uses ssl.wrap_socket() where it exists (unchanged behaviour) and
        an explicit SSLContext otherwise.
        """
        if hasattr(ssl, 'wrap_socket'):
            if server_side:
                return ssl.wrap_socket(sock,
                                       server_side=True,
                                       certfile=certfile,
                                       keyfile=keyfile)
            return ssl.wrap_socket(sock)

        # ssl.wrap_socket() was removed in Python 3.12
        if server_side:
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
            # An empty key path means the key lives in the cert file
            context.load_cert_chain(certfile, keyfile or None)
        else:
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            # Same as ssl.wrap_socket(sock): the peer is not verified
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
        return context.wrap_socket(sock, server_side=server_side)

    @staticmethod
    def socket(host, port=None, connect=False, prefer_ipv6=False,
               unix_socket=None, use_ssl=False, tcp_keepalive=True,
               tcp_keepcnt=None, tcp_keepidle=None, tcp_keepintvl=None):
        """ Resolve a host (and optional port) to an IPv4 or IPv6
        address. Create a socket. Connect to the address if connect is
        set, otherwise bind to it and listen. Return the socket.

        If unix_socket is given, a UNIX domain socket is connected to
        that path instead and host/port are not resolved.
        """
        flags = 0
        if host == '':
            host = None
        if connect and not (port or unix_socket):
            raise Exception("Connect mode requires a port")
        if use_ssl and not ssl:
            raise Exception("SSL socket requested but Python SSL module not loaded.")
        if not connect and use_ssl:
            raise Exception("SSL only supported in connect mode (for now)")
        if not connect:
            flags = flags | socket.AI_PASSIVE

        if unix_socket:
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            sock.connect(unix_socket)
            return sock

        addrs = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM,
                                   socket.IPPROTO_TCP, flags)
        if not addrs:
            raise Exception("Could not resolve host '%s'" % host)
        # Order by address family and use the first entry; reversing
        # the order makes the other end of the list win.
        addrs.sort(key=lambda x: x[0])
        if prefer_ipv6:
            addrs.reverse()
        sock = socket.socket(addrs[0][0], addrs[0][1])

        if tcp_keepalive:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            if tcp_keepcnt:
                sock.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT,
                                tcp_keepcnt)
            if tcp_keepidle:
                sock.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE,
                                tcp_keepidle)
            if tcp_keepintvl:
                sock.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL,
                                tcp_keepintvl)

        if connect:
            sock.connect(addrs[0][4])
            if use_ssl:
                sock = WebSocketServer._ssl_wrap(sock)
        else:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind(addrs[0][4])
            sock.listen(100)

        return sock

    @staticmethod
    def daemonize(keepfd=None, chdir='/'):
        """ Detach into the background, keeping only keepfd open. """
        os.umask(0)
        if chdir:
            os.chdir(chdir)
        else:
            os.chdir('/')
        os.setgid(os.getgid())  # relinquish elevations
        os.setuid(os.getuid())  # relinquish elevations

        # Double fork to daemonize
        if os.fork() > 0: os._exit(0)  # Parent exits
        os.setsid()                    # Obtain new process group
        if os.fork() > 0: os._exit(0)  # Parent exits

        # Signal handling
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        # Close open files
        maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
        if maxfd == resource.RLIM_INFINITY: maxfd = 256
        for fd in reversed(range(maxfd)):
            try:
                if fd != keepfd:
                    os.close(fd)
            except OSError:
                _, exc, _ = sys.exc_info()
                # Descriptors that were never open are expected
                if exc.errno != errno.EBADF: raise

        # Redirect I/O to /dev/null
        os.dup2(os.open(os.devnull, os.O_RDWR), sys.stdin.fileno())
        os.dup2(os.open(os.devnull, os.O_RDWR), sys.stdout.fileno())
        os.dup2(os.open(os.devnull, os.O_RDWR), sys.stderr.fileno())

    def do_handshake(self, sock, address):
        """
        do_handshake does the following:
        - Peek at the first few bytes from the socket.
        - If the connection is Flash policy request then answer it
          and raise EClose.
        - If the connection is an HTTPS/SSL/TLS connection then SSL
          wrap the socket.
        - Instantiate RequestHandlerClass on the (possibly wrapped)
          socket, which handles the request.
        - Return the socket for this client, which may be SSL wrapped.

        Raises EClose for connections that are ignored or refused.
        """
        ready = select.select([sock], [], [], 3)[0]

        if not ready:
            raise self.EClose("ignoring socket not ready")
        # Peek, but do not read the data so that we have a opportunity
        # to SSL wrap the socket first
        handshake = sock.recv(1024, socket.MSG_PEEK)

        # recv() returns bytes on Python 3, so test for emptiness rather
        # than comparing against "" (which would never match and lead to
        # an IndexError on handshake[0] below).
        if not handshake:
            raise self.EClose("ignoring empty handshake")

        elif handshake.startswith(s2b("<policy-file-request/>")):
            # Answer Flash policy request
            handshake = sock.recv(1024)
            sock.send(s2b(self.policy_response))
            raise self.EClose("Sending flash policy response")

        # First byte is a str on Python 2 and an int on Python 3
        elif handshake[0] in ("\x16", "\x80", 22, 128):
            # SSL wrap the connection
            if not ssl:
                raise self.EClose("SSL connection but no 'ssl' module")
            if not os.path.exists(self.cert):
                raise self.EClose("SSL connection but '%s' not found"
                                  % self.cert)
            retsock = None
            try:
                retsock = self._ssl_wrap(
                        sock,
                        server_side=True,
                        certfile=self.cert,
                        keyfile=self.key)
            except ssl.SSLError:
                _, x, _ = sys.exc_info()
                if x.args[0] == ssl.SSL_ERROR_EOF:
                    if len(x.args) > 1:
                        raise self.EClose(x.args[1])
                    else:
                        raise self.EClose("Got SSL_ERROR_EOF")
                else:
                    raise

        elif self.ssl_only:
            raise self.EClose("non-SSL connection received but disallowed")

        else:
            retsock = sock

        # If the address is like (host, port), we are extending it
        # with a flag indicating SSL. Not many other options
        # available...
        if len(address) == 2:
            address = (address[0], address[1], (retsock != sock))

        self.RequestHandlerClass(retsock, address, self)

        # Return the WebSockets socket which may be SSL wrapped
        return retsock

    #
    # WebSocketServer logging/output functions
    #

    def msg(self, *args, **kwargs):
        """ Output message as info """
        self.logger.log(logging.INFO, *args, **kwargs)

    def vmsg(self, *args, **kwargs):
        """ Same as msg() but as debug. """
        self.logger.log(logging.DEBUG, *args, **kwargs)

    def warn(self, *args, **kwargs):
        """ Same as msg() but as warning. """
        self.logger.log(logging.WARN, *args, **kwargs)


    #
    # Events that can/should be overridden in sub-classes
    #
    def started(self):
        """ Called after WebSockets startup """
        self.vmsg("WebSockets server started")

    def poll(self):
        """ Run periodically while waiting for connections. """
        pass

    def terminate(self):
        """ Unwind the server loop by raising Terminate. """
        raise self.Terminate()

    def multiprocessing_SIGCHLD(self, sig, stack):
        # active_children() is called for its side effect of joining
        # finished child processes.
        self.vmsg('Reaping zombies, active child count is %s', len(multiprocessing.active_children()))

    def fallback_SIGCHLD(self, sig, stack):
        # Reap zombies when using os.fork() (python 2.4)
        self.vmsg("Got SIGCHLD, reaping zombies")
        try:
            result = os.waitpid(-1, os.WNOHANG)
            while result[0]:
                self.vmsg("Reaped child process %s" % result[0])
                result = os.waitpid(-1, os.WNOHANG)
        except (OSError):
            # No children left to wait for
            pass

    def do_SIGINT(self, sig, stack):
        self.msg("Got SIGINT, exiting")
        self.terminate()

    def do_SIGTERM(self, sig, stack):
        self.msg("Got SIGTERM, exiting")
        self.terminate()

    def top_new_client(self, startsock, address):
        """ Do something with a WebSockets client connection. """
        # handler process
        client = None
        try:
            try:
                client = self.do_handshake(startsock, address)
            except self.EClose:
                _, exc, _ = sys.exc_info()
                # Connection was not a WebSockets connection
                if exc.args[0]:
                    self.msg("%s: %s" % (address[0], exc.args[0]))
            except WebSocketServer.Terminate:
                raise
            except Exception:
                _, exc, _ = sys.exc_info()
                self.msg("handler exception: %s" % str(exc))
                self.vmsg("exception", exc_info=True)
        finally:

            if client and client != startsock:
                # Close the SSL wrapped socket
                # Original socket closed by caller
                client.close()

    def start_server(self):
        """
        Daemonize if requested. Listen for connections. Run
        do_handshake() method for each connection. If the connection
        is a WebSockets client then call new_websocket_client() method (which must
        be overridden) for each new client connection.
        """
        lsock = self.socket(self.listen_host, self.listen_port, False,
                            self.prefer_ipv6,
                            tcp_keepalive=self.tcp_keepalive,
                            tcp_keepcnt=self.tcp_keepcnt,
                            tcp_keepidle=self.tcp_keepidle,
                            tcp_keepintvl=self.tcp_keepintvl)

        if self.daemon:
            self.daemonize(keepfd=lsock.fileno(), chdir=self.web)

        self.started()  # Some things need to happen after daemonizing

        # Allow override of signals
        original_signals = {
            signal.SIGINT: signal.getsignal(signal.SIGINT),
            signal.SIGTERM: signal.getsignal(signal.SIGTERM),
        }
        # SIGCHLD does not exist on every platform (e.g. Windows)
        sigchld = getattr(signal, 'SIGCHLD', None)
        if sigchld is not None:
            original_signals[sigchld] = signal.getsignal(sigchld)

        signal.signal(signal.SIGINT, self.do_SIGINT)
        signal.signal(signal.SIGTERM, self.do_SIGTERM)
        if sigchld is not None:
            if not multiprocessing:
                # os.fork() (python 2.4) child reaper
                signal.signal(sigchld, self.fallback_SIGCHLD)
            else:
                # make sure that _cleanup is called when children die
                # by calling active_children on SIGCHLD
                signal.signal(sigchld, self.multiprocessing_SIGCHLD)

        last_active_time = self.launch_time
        try:
            while True:
                try:
                    try:
                        startsock = None
                        pid = err = 0
                        child_count = 0

                        if multiprocessing:
                            # Collect zombie child processes
                            child_count = len(multiprocessing.active_children())

                        time_elapsed = time.time() - self.launch_time
                        if self.timeout and time_elapsed > self.timeout:
                            self.msg('listener exit due to --timeout %s'
                                     % self.timeout)
                            break

                        if self.idle_timeout:
                            idle_time = 0
                            if child_count == 0:
                                idle_time = time.time() - last_active_time
                            else:
                                last_active_time = time.time()

                            if idle_time > self.idle_timeout and child_count == 0:
                                self.msg('listener exit due to --idle-timeout %s'
                                         % self.idle_timeout)
                                break

                        try:
                            self.poll()

                            ready = select.select([lsock], [], [], 1)[0]
                            if lsock in ready:
                                startsock, address = lsock.accept()
                            else:
                                continue
                        except self.Terminate:
                            raise
                        except Exception:
                            _, exc, _ = sys.exc_info()
                            # select.error on Python 2 carries the errno
                            # in args[0] instead of an errno attribute
                            if hasattr(exc, 'errno'):
                                err = exc.errno
                            elif getattr(exc, 'args', None):
                                err = exc.args[0]
                            else:
                                err = None
                            if err == errno.EINTR:
                                self.vmsg("Ignoring interrupted syscall")
                                continue
                            else:
                                raise

                        if self.run_once:
                            # Run in same process if run_once
                            self.top_new_client(startsock, address)
                            if self.ws_connection:
                                self.msg('%s: exiting due to --run-once'
                                         % address[0])
                                break
                        elif multiprocessing:
                            self.vmsg('%s: new handler Process' % address[0])
                            p = multiprocessing.Process(
                                    target=self.top_new_client,
                                    args=(startsock, address))
                            p.start()
                            # child will not return
                        else:
                            # python 2.4
                            self.vmsg('%s: forking handler' % address[0])
                            pid = os.fork()
                            if pid == 0:
                                # child handler process
                                self.top_new_client(startsock, address)
                                break   # child process exits

                        # parent process
                        self.handler_id += 1

                    except (self.Terminate, SystemExit, KeyboardInterrupt):
                        self.msg("In exit")
                        break
                    except Exception:
                        # Bind the exception here; nothing guarantees an
                        # inner handler has set it for this iteration.
                        _, exc, _ = sys.exc_info()
                        self.msg("handler exception: %s", str(exc))
                        self.vmsg("exception", exc_info=True)

                finally:
                    if startsock:
                        startsock.close()
        finally:
            # Close listen port
            self.vmsg("Closing socket listening at %s:%s",
                      self.listen_host, self.listen_port)
            lsock.close()

            # Restore signals
            for sig, func in original_signals.items():
                signal.signal(sig, func)
96bc3d30 1029
96bc3d30 1030