]> git.proxmox.com Git - mirror_ovs.git/blob - python/ovs/stream.py
c15be4b3e53cbc3c5908306d495e907cdf9c2dc2
[mirror_ovs.git] / python / ovs / stream.py
1 # Copyright (c) 2010, 2011, 2012 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import errno
16 import os
17 import socket
18 import sys
19
20 import ovs.poller
21 import ovs.socket_util
22 import ovs.vlog
23
24 import six
25
26 try:
27 from OpenSSL import SSL
28 except ImportError:
29 SSL = None
30
31 if sys.platform == 'win32':
32 import ovs.winutils as winutils
33 import pywintypes
34 import win32event
35 import win32file
36 import win32pipe
37
# Module-wide logger, created under the name "stream"; the classes below use
# it for their warning, error and debug messages.
vlog = ovs.vlog.Vlog("stream")
39
40
def stream_or_pstream_needs_probes(name):
    """Tell whether the [p]stream called 'name' needs periodic probes.

    Probes are used to verify connectivity: without them it can take a long
    time to notice that such a connection was dropped.

    Returns True if probes are needed, False if they are not, and None if
    'name' is neither a registered active stream name nor a valid passive
    stream name."""
    # Active stream types are looked up first; each registered class answers
    # for itself.
    stream_cls = Stream._find_method(name)
    if stream_cls:
        return stream_cls.needs_probes()

    # Passive streams decide from the name alone.
    if PassiveStream.is_valid_name(name):
        return PassiveStream.needs_probes(name)

    return None
54
55
class Stream(object):
    """Bidirectional byte stream.  Unix domain sockets, tcp and ssl
    are implemented.

    A Stream wraps either a socket ('self.socket') or, on Windows only, a
    named pipe handle ('self.pipe').  Concrete transports are subclasses
    that implement _open() and are registered with register_method()."""

    # States.
    __S_CONNECTING = 0
    __S_CONNECTED = 1
    __S_DISCONNECTED = 2

    # Kinds of events that one might wait for.
    W_CONNECT = 0               # Connect complete (success or failure).
    W_RECV = 1                  # Data received.
    W_SEND = 2                  # Send buffer room available.

    # Maps a "TYPE:" prefix to the Stream subclass that implements it.
    _SOCKET_METHODS = {}

    _SSL_private_key_file = None
    _SSL_certificate_file = None
    _SSL_ca_cert_file = None

    # Windows only
    _write = None  # overlapped for write operation
    _read = None  # overlapped for read operation
    _write_pending = False
    _read_pending = False
    _retry_connect = False

    @staticmethod
    def register_method(method, cls):
        """Registers 'cls' as the implementation of stream names that begin
        with "METHOD:"."""
        Stream._SOCKET_METHODS[method + ":"] = cls

    @staticmethod
    def _find_method(name):
        """Returns the class registered for the prefix of 'name', or None if
        no registered prefix matches."""
        # dict.items() behaves the same on Python 2 and Python 3 here.
        for method, cls in Stream._SOCKET_METHODS.items():
            if name.startswith(method):
                return cls
        return None

    @staticmethod
    def is_valid_name(name):
        """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
        TYPE is a supported stream type ("unix:", "tcp:" and "ssl:"),
        otherwise False."""
        return bool(Stream._find_method(name))

    def __init__(self, socket, name, status, pipe=None, is_server=False):
        """Wraps an already opened 'socket' (or, on Windows, a named pipe
        handle 'pipe') as a stream called 'name'.

        'status' selects the initial state: errno.EAGAIN means the connection
        is still in progress, 0 means it is established, and any other value
        is the errno with which the connection failed.  'is_server' is only
        recorded for Windows pipes."""
        self.socket = socket
        self.pipe = pipe
        if sys.platform == 'win32':
            if pipe is not None:
                # Flag to check if fd is a server HANDLE.  In the case of a
                # server handle we have to issue a disconnect before closing
                # the actual handle.
                self._server = is_server
                suffix = name.split(":", 1)[1]
                suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
                self._pipename = winutils.get_pipe_name(suffix)
                self._read = pywintypes.OVERLAPPED()
                self._read.hEvent = winutils.get_new_event()
                self._write = pywintypes.OVERLAPPED()
                self._write.hEvent = winutils.get_new_event()
            else:
                self._wevent = winutils.get_new_event(bManualReset=False,
                                                      bInitialState=False)

        self.name = name
        self.error = 0
        if status == errno.EAGAIN:
            self.state = Stream.__S_CONNECTING
        elif status == 0:
            self.state = Stream.__S_CONNECTED
        else:
            self.state = Stream.__S_DISCONNECTED
            # Remember why the stream is disconnected; otherwise connect()
            # would report 0 (success) for a stream that never connected.
            self.error = status

    # Default value of dscp bits for connection between controller and manager.
    # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined
    # in <netinet/ip.h> is used.
    IPTOS_PREC_INTERNETCONTROL = 0xc0
    DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2

    @staticmethod
    def open(name, dscp=DSCP_DEFAULT):
        """Attempts to connect a stream to a remote peer.  'name' is a
        connection name in the form "TYPE:ARGS", where TYPE is an active stream
        class's name and ARGS are stream class-specific.  The supported TYPEs
        include "unix", "tcp", and "ssl".

        Returns (error, stream): on success 'error' is 0 and 'stream' is the
        new Stream, on failure 'error' is a positive errno value and 'stream'
        is None.

        Never returns errno.EAGAIN or errno.EINPROGRESS.  Instead, returns 0
        and a new Stream.  The connect() method can be used to check for
        successful connection completion."""
        cls = Stream._find_method(name)
        if not cls:
            return errno.EAFNOSUPPORT, None

        suffix = name.split(":", 1)[1]
        if name.startswith("unix:"):
            suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
            if sys.platform == 'win32':
                pipename = winutils.get_pipe_name(suffix)

                if len(suffix) > 255:
                    # The name is too long; report it as ENOENT.
                    return errno.ENOENT, None

                try:
                    # In case of "unix:" argument, the assumption is that
                    # there is a file created in the path (suffix).
                    # (Inside this function 'open' is the builtin.)
                    open(suffix, 'r').close()
                except Exception:
                    return errno.ENOENT, None

                try:
                    npipe = winutils.create_file(pipename)
                    try:
                        winutils.set_pipe_mode(npipe,
                                               win32pipe.PIPE_READMODE_BYTE)
                    except pywintypes.error:
                        return errno.ENOENT, None
                except pywintypes.error as e:
                    if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
                        # Pipe is busy: hand back a stream in the connecting
                        # state and retry during connect().  The flag is set
                        # on this instance only, so that other streams are
                        # not affected and so that connect() can clear it.
                        stream = cls(None, name, errno.EAGAIN,
                                     pipe=win32file.INVALID_HANDLE_VALUE,
                                     is_server=False)
                        stream._retry_connect = True
                        return 0, stream
                    return errno.ENOENT, None
                return 0, cls(None, name, 0, pipe=npipe, is_server=False)

        error, sock = cls._open(suffix, dscp)
        if error:
            return error, None

        err = ovs.socket_util.check_connection_completion(sock)
        if err == errno.EAGAIN or err == errno.EINPROGRESS:
            # Still connecting: not an error from the caller's point of view.
            return 0, cls(sock, name, errno.EAGAIN)
        if err:
            # The connection already failed.  Honor the documented contract:
            # release the socket and return no stream.
            sock.close()
            return err, None
        return 0, cls(sock, name, 0)

    @staticmethod
    def _open(suffix, dscp):
        """Opens the transport for 'suffix' (the ARGS part of the stream
        name).  Subclasses return an (error, socket) tuple."""
        raise NotImplementedError("This method must be overrided by subclass")

    @staticmethod
    def open_block(error_stream, timeout=None):
        """Blocks until a Stream completes its connection attempt, either
        succeeding or failing, but no more than 'timeout' milliseconds.
        (error, stream) should be the tuple returned by Stream.open().
        Negative value of 'timeout' means infinite waiting.
        Returns a tuple of the same form.

        Typical usage:
        error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""

        # Py3 doesn't support tuple parameter unpacking - PEP 3113
        error, stream = error_stream
        if not error:
            deadline = None
            if timeout is not None and timeout >= 0:
                deadline = ovs.timeval.msec() + timeout
            while True:
                error = stream.connect()
                if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
                    # WSAEWOULDBLOCK would be the equivalent on Windows
                    # for EAGAIN on Unix.
                    error = errno.EAGAIN
                if error != errno.EAGAIN:
                    break
                if deadline is not None and ovs.timeval.msec() > deadline:
                    error = errno.ETIMEDOUT
                    break
                stream.run()
                poller = ovs.poller.Poller()
                stream.run_wait(poller)
                stream.connect_wait(poller)
                if deadline is not None:
                    poller.timer_wait_until(deadline)
                poller.block()
            if stream.socket is not None:
                assert error != errno.EINPROGRESS

        if error and stream:
            stream.close()
            stream = None
        return error, stream

    def close(self):
        """Closes the underlying socket and, on Windows, the pipe handle and
        its overlapped event handles."""
        if self.socket is not None:
            self.socket.close()
        if self.pipe is not None:
            if self._server:
                # Flush the pipe to allow the client to read the pipe
                # before disconnecting.
                win32pipe.FlushFileBuffers(self.pipe)
                win32pipe.DisconnectNamedPipe(self.pipe)
            winutils.close_handle(self.pipe, vlog.warn)
            winutils.close_handle(self._read.hEvent, vlog.warn)
            winutils.close_handle(self._write.hEvent, vlog.warn)

    def __scs_connecting(self):
        """Advances a stream in the connecting state: moves it to connected
        on success, to disconnected (recording the errno) on failure, and
        leaves it untouched while the attempt is still in progress."""
        if self.socket is not None:
            retval = ovs.socket_util.check_connection_completion(self.socket)
            assert retval != errno.EINPROGRESS
        elif sys.platform == 'win32':
            if self._retry_connect:
                try:
                    self.pipe = winutils.create_file(self._pipename)
                    self._retry_connect = False
                    retval = 0
                except pywintypes.error as e:
                    if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
                        retval = errno.EAGAIN
                    else:
                        self._retry_connect = False
                        retval = errno.ENOENT
            else:
                # If _retry_connect is false, it means it's already
                # connected so we can set the value of retval to 0
                retval = 0
        else:
            # Neither a socket nor a pipe: nothing can ever complete.  Fail
            # cleanly instead of hitting an unbound 'retval' below.
            retval = errno.ENOTCONN

        if retval == 0:
            self.state = Stream.__S_CONNECTED
        elif retval != errno.EAGAIN:
            self.state = Stream.__S_DISCONNECTED
            self.error = retval

    def connect(self):
        """Tries to complete the connection on this stream.  If the connection
        is complete, returns 0 if the connection was successful or a positive
        errno value if it failed.  If the connection is still in progress,
        returns errno.EAGAIN."""

        if self.state == Stream.__S_CONNECTING:
            self.__scs_connecting()

        if self.state == Stream.__S_CONNECTING:
            return errno.EAGAIN
        elif self.state == Stream.__S_CONNECTED:
            return 0
        else:
            assert self.state == Stream.__S_DISCONNECTED
            return self.error

    def recv(self, n):
        """Tries to receive up to 'n' bytes from this stream.  Returns a
        (error, string) tuple:

            - If successful, 'error' is zero and 'string' contains between 1
              and 'n' bytes of data.

            - On error, 'error' is a positive errno value.

            - If the connection has been closed in the normal fashion or if 'n'
              is 0, the tuple is (0, "").

        The recv function will not block waiting for data to arrive.  If no
        data have been received, it returns (errno.EAGAIN, "") immediately."""

        retval = self.connect()
        if retval != 0:
            return (retval, "")
        elif n == 0:
            return (0, "")

        if sys.platform == 'win32' and self.socket is None:
            return self.__recv_windows(n)

        try:
            return (0, self.socket.recv(n))
        except socket.error as e:
            return (ovs.socket_util.get_exception_errno(e), "")

    def __recv_windows(self, n):
        """Named pipe (Windows) implementation of recv(), built on overlapped
        reads.  Returns the same (error, data) tuple as recv()."""
        if self._read_pending:
            # A previous read did not finish; check on it rather than
            # issuing a new one.
            try:
                nBytesRead = winutils.get_overlapped_result(self.pipe,
                                                            self._read,
                                                            False)
                self._read_pending = False
            except pywintypes.error as e:
                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
                    # The operation is still pending, try again
                    self._read_pending = True
                    return (errno.EAGAIN, "")
                elif e.winerror in winutils.pipe_disconnected_errors:
                    # If the pipe was disconnected, return 0.
                    return (0, "")
                else:
                    return (errno.EINVAL, "")
        else:
            (errCode, self._read_buffer) = winutils.read_file(self.pipe,
                                                              n,
                                                              self._read)
            if errCode:
                if errCode == winutils.winerror.ERROR_IO_PENDING:
                    self._read_pending = True
                    return (errno.EAGAIN, "")
                elif errCode in winutils.pipe_disconnected_errors:
                    # If the pipe was disconnected, return 0.
                    return (0, "")
                else:
                    return (errCode, "")

            try:
                nBytesRead = winutils.get_overlapped_result(self.pipe,
                                                            self._read,
                                                            False)
                winutils.win32event.SetEvent(self._read.hEvent)
            except pywintypes.error as e:
                if e.winerror in winutils.pipe_disconnected_errors:
                    # If the pipe was disconnected, return 0.
                    return (0, "")
                else:
                    return (e.winerror, "")

        recvBuffer = self._read_buffer[:nBytesRead]
        # recvBuffer will have the type memoryview in Python3.
        # We can use bytes to convert it to type bytes which works on
        # both Python2 and Python3.
        return (0, bytes(recvBuffer))

    def send(self, buf):
        """Tries to send 'buf' on this stream.

        If successful, returns the number of bytes sent, between 1 and
        len(buf).  0 is only a valid return value if len(buf) is 0.

        On error, returns a negative errno value.

        Will not block.  If no bytes can be immediately accepted for
        transmission, returns -errno.EAGAIN immediately."""

        retval = self.connect()
        if retval != 0:
            return -retval
        elif len(buf) == 0:
            return 0

        # We must have bytes for sending.
        if isinstance(buf, six.text_type):
            buf = buf.encode('utf-8')

        if sys.platform == 'win32' and self.socket is None:
            return self.__send_windows(buf)

        try:
            return self.socket.send(buf)
        except socket.error as e:
            return -ovs.socket_util.get_exception_errno(e)

    def __send_windows(self, buf):
        """Named pipe (Windows) implementation of send(), built on overlapped
        writes.  Returns the same value as send()."""
        if self._write_pending:
            # A previous write did not finish; check on it rather than
            # issuing a new one.
            try:
                nBytesWritten = winutils.get_overlapped_result(self.pipe,
                                                               self._write,
                                                               False)
                self._write_pending = False
            except pywintypes.error as e:
                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
                    # The operation is still pending, try again.  It is the
                    # write that is pending, so the write flag is the one
                    # to keep set.
                    self._write_pending = True
                    return -errno.EAGAIN
                elif e.winerror in winutils.pipe_disconnected_errors:
                    # If the pipe was disconnected, return connection reset.
                    return -errno.ECONNRESET
                else:
                    return -errno.EINVAL
        else:
            (errCode, nBytesWritten) = winutils.write_file(self.pipe,
                                                           buf,
                                                           self._write)
            if errCode:
                if errCode == winutils.winerror.ERROR_IO_PENDING:
                    self._write_pending = True
                    return -errno.EAGAIN
                if (not nBytesWritten and
                        errCode in winutils.pipe_disconnected_errors):
                    # If the pipe was disconnected, return connection reset.
                    return -errno.ECONNRESET
        return nBytesWritten

    def run(self):
        """Hook for periodic work; the base class does nothing."""
        pass

    def run_wait(self, poller):
        """Hook to register wakeups needed by run(); the base class does
        nothing."""
        pass

    def wait(self, poller, wait):
        """Arranges for 'poller' to wake up when the event 'wait' (one of
        W_CONNECT, W_RECV, W_SEND) can make progress on this stream."""
        assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)

        if self.state == Stream.__S_DISCONNECTED:
            # Nothing will ever happen on its own; wake up right away so
            # that the caller sees the error.
            poller.immediate_wake()
            return

        if self.state == Stream.__S_CONNECTING:
            # While connecting, only connection completion can be awaited.
            wait = Stream.W_CONNECT

        if sys.platform == 'win32':
            self.__wait_windows(poller, wait)
            return

        if wait == Stream.W_RECV:
            poller.fd_wait(self.socket, ovs.poller.POLLIN)
        else:
            poller.fd_wait(self.socket, ovs.poller.POLLOUT)

    def __wait_windows(self, poller, wait):
        """Windows implementation of wait(), for both sockets and pipes."""
        if self.socket is not None:
            if wait == Stream.W_RECV:
                mask = (win32file.FD_READ |
                        win32file.FD_ACCEPT |
                        win32file.FD_CLOSE)
                event = ovs.poller.POLLIN
            else:
                mask = (win32file.FD_WRITE |
                        win32file.FD_CONNECT |
                        win32file.FD_CLOSE)
                event = ovs.poller.POLLOUT

            try:
                win32file.WSAEventSelect(self.socket,
                                         self._wevent,
                                         mask)
            except pywintypes.error as e:
                vlog.err("failed to associate events with socket: %s"
                         % e.strerror)
            poller.fd_wait(self._wevent, event)
        else:
            if wait == Stream.W_RECV:
                if self._read:
                    poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
            elif wait == Stream.W_SEND:
                if self._write:
                    poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT)
            elif wait == Stream.W_CONNECT:
                return

    def connect_wait(self, poller):
        """Shorthand for wait(poller, Stream.W_CONNECT)."""
        self.wait(poller, Stream.W_CONNECT)

    def recv_wait(self, poller):
        """Shorthand for wait(poller, Stream.W_RECV)."""
        self.wait(poller, Stream.W_RECV)

    def send_wait(self, poller):
        """Shorthand for wait(poller, Stream.W_SEND)."""
        self.wait(poller, Stream.W_SEND)

    def __del__(self):
        # Don't delete the file: we might have forked.
        if self.socket is not None:
            self.socket.close()
        if self.pipe is not None:
            # Check if there are any remaining valid handles and close them
            if self.pipe:
                winutils.close_handle(self.pipe)
            if self._read.hEvent:
                winutils.close_handle(self._read.hEvent)
            if self._write.hEvent:
                winutils.close_handle(self._write.hEvent)

    @staticmethod
    def ssl_set_private_key_file(file_name):
        """Records the private key file used for SSL streams."""
        Stream._SSL_private_key_file = file_name

    @staticmethod
    def ssl_set_certificate_file(file_name):
        """Records the certificate file used for SSL streams."""
        Stream._SSL_certificate_file = file_name

    @staticmethod
    def ssl_set_ca_cert_file(file_name):
        """Records the CA certificate file used for SSL streams."""
        Stream._SSL_ca_cert_file = file_name
535
536
class PassiveStream(object):
    """Listening endpoint that hands out Stream objects through accept().

    It wraps either a listening socket ('self.socket') or, on Windows only,
    a named pipe handle ('self.pipe')."""

    # Windows only
    connect = None  # overlapped for read operation
    connect_pending = False

    @staticmethod
    def needs_probes(name):
        """Returns False for "punix:" names and True for any other name."""
        return not name.startswith("punix:")

    @staticmethod
    def is_valid_name(name):
        """Returns True if 'name' is a passive stream name in the form
        "TYPE:ARGS" and TYPE is a supported passive stream type (currently
        "punix:" or "ptcp"), otherwise False."""
        return name.startswith(("punix:", "ptcp:"))

    def __init__(self, sock, name, bind_path, pipe=None):
        """Wraps an already listening 'sock' (or, on Windows, named pipe
        handle 'pipe') called 'name'.  'bind_path', if not None, is the file
        that close() removes."""
        self.name = name
        self.pipe = pipe
        self.socket = sock
        if pipe is not None:
            self.connect = pywintypes.OVERLAPPED()
            self.connect.hEvent = winutils.get_new_event()
            self.connect_pending = False
            suffix = name.split(":", 1)[1]
            suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
            self._pipename = winutils.get_pipe_name(suffix)

        self.bind_path = bind_path

    @staticmethod
    def open(name):
        """Attempts to start listening for remote stream connections.  'name'
        is a connection name in the form "TYPE:ARGS", where TYPE is an passive
        stream class's name and ARGS are stream class-specific.  Currently the
        supported values for TYPE are "punix" and "ptcp".

        Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
        new PassiveStream, on failure 'error' is a positive errno value and
        'pstream' is None."""
        if not PassiveStream.is_valid_name(name):
            return errno.EAFNOSUPPORT, None

        if name.startswith("punix:"):
            bind_path = name[len("punix:"):]
            bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path)
            if sys.platform != 'win32':
                error, sock = ovs.socket_util.make_unix_socket(
                    socket.SOCK_STREAM, True, bind_path, None)
                if error:
                    return error, None
            else:
                # Branch used only on Windows
                try:
                    # (Inside this function 'open' is the builtin.)
                    open(bind_path, 'w').close()
                except Exception:
                    return errno.ENOENT, None

                pipename = winutils.get_pipe_name(bind_path)
                if len(pipename) > 255:
                    # The name is too long; report it as ENOENT.
                    return errno.ENOENT, None

                npipe = winutils.create_named_pipe(pipename)
                if not npipe:
                    return errno.ENOENT, None
                return 0, PassiveStream(None, name, bind_path, pipe=npipe)

        elif name.startswith("ptcp:"):
            # A TCP listener has no file system entry; leaving bind_path as
            # None keeps close() from trying to unlink a bogus path derived
            # from the address.
            bind_path = None
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            remote = name.split(':')
            try:
                sock.bind((remote[1], int(remote[2])))
            except socket.error as e:
                # Report bind failures like listen failures, and do not
                # leak the socket.
                error = ovs.socket_util.get_exception_errno(e)
                vlog.err("%s: bind: %s" % (name, os.strerror(error)))
                sock.close()
                return error, None

        else:
            raise Exception('Unknown connection string')

        try:
            sock.listen(10)
        except socket.error as e:
            error = ovs.socket_util.get_exception_errno(e)
            vlog.err("%s: listen: %s" % (name, os.strerror(error)))
            sock.close()
            return error, None

        return 0, PassiveStream(sock, name, bind_path)

    def close(self):
        """Closes this PassiveStream."""
        if self.socket is not None:
            self.socket.close()
        if self.pipe is not None:
            winutils.close_handle(self.pipe, vlog.warn)
            winutils.close_handle(self.connect.hEvent, vlog.warn)
        if self.bind_path is not None:
            ovs.fatal_signal.unlink_file_now(self.bind_path)
            self.bind_path = None

    def accept(self):
        """Tries to accept a new connection on this passive stream.  Returns
        (error, stream): if successful, 'error' is 0 and 'stream' is the new
        Stream object, and on failure 'error' is a positive errno value and
        'stream' is None.

        Will not block waiting for a connection.  If no connection is ready to
        be accepted, returns (errno.EAGAIN, None) immediately."""
        if sys.platform == 'win32' and self.socket is None:
            return self.__accept_windows()

        try:
            sock, addr = self.socket.accept()
            ovs.socket_util.set_nonblocking(sock)
            if (sys.platform != 'win32' and sock.family == socket.AF_UNIX):
                return 0, Stream(sock, "unix:%s" % addr, 0)
            return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0],
                                                   str(addr[1])), 0)
        except socket.error as e:
            error = ovs.socket_util.get_exception_errno(e)
            if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
                # WSAEWOULDBLOCK would be the equivalent on Windows
                # for EAGAIN on Unix.
                error = errno.EAGAIN
            if error != errno.EAGAIN:
                # XXX rate-limit
                vlog.dbg("accept: %s" % os.strerror(error))
            return error, None

    def __accept_windows(self):
        """Named pipe (Windows) implementation of accept().  On success the
        connected pipe is handed to a new Stream and a fresh pipe instance
        takes its place for the next client."""
        if self.connect_pending:
            try:
                winutils.get_overlapped_result(self.pipe, self.connect, False)
            except pywintypes.error as e:
                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
                    # The operation is still pending, try again
                    self.connect_pending = True
                    return errno.EAGAIN, None
                else:
                    if self.pipe:
                        win32pipe.DisconnectNamedPipe(self.pipe)
                    return errno.EINVAL, None
            self.connect_pending = False

        error = winutils.connect_named_pipe(self.pipe, self.connect)
        if error:
            if error == winutils.winerror.ERROR_IO_PENDING:
                self.connect_pending = True
                return errno.EAGAIN, None
            elif error != winutils.winerror.ERROR_PIPE_CONNECTED:
                if self.pipe:
                    win32pipe.DisconnectNamedPipe(self.pipe)
                self.connect_pending = False
                return errno.EINVAL, None
            else:
                win32event.SetEvent(self.connect.hEvent)

        npipe = winutils.create_named_pipe(self._pipename)
        if not npipe:
            return errno.ENOENT, None

        old_pipe = self.pipe
        self.pipe = npipe
        winutils.win32event.ResetEvent(self.connect.hEvent)
        # NOTE(review): the accepted pipe is wrapped with the default
        # is_server=False, so Stream.close() skips its flush/disconnect
        # step -- confirm that this is intended.
        return 0, Stream(None, self.name, 0, pipe=old_pipe)

    def wait(self, poller):
        """Arranges for 'poller' to wake up when a connection may be ready
        to be accepted."""
        if sys.platform != 'win32' or self.socket is not None:
            poller.fd_wait(self.socket, ovs.poller.POLLIN)
        else:
            poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)

    def __del__(self):
        # Don't delete the file: we might have forked.
        if self.socket is not None:
            self.socket.close()
        if self.pipe is not None:
            # Check if there are any remaining valid handles and close them
            if self.pipe:
                winutils.close_handle(self.pipe)
            # The overlapped structure of a passive stream is 'connect';
            # there is no '_connect' or '_read' attribute on this class.
            if self.connect.hEvent:
                winutils.close_handle(self.connect.hEvent)
716
717
def usage(name):
    """Returns help text listing the connection methods handled by this
    module.  'name' is inserted in both section headings."""
    # "ptcp" is listed because PassiveStream accepts it alongside "punix".
    return """
Active %s connection methods:
  unix:FILE               Unix domain socket named FILE
  tcp:HOST:PORT           TCP socket to HOST with port no of PORT
  ssl:HOST:PORT           SSL socket to HOST with port no of PORT

Passive %s connection methods:
  punix:FILE              Listen on Unix domain socket FILE
  ptcp:HOST:PORT          Listen on TCP socket bound to HOST and PORT""" % (
        name, name)
727
728
class UnixStream(Stream):
    """Active stream type registered under the "unix" method name."""

    @staticmethod
    def needs_probes():
        """This stream type does not need connectivity probes."""
        return False

    @staticmethod
    def _open(suffix, dscp):
        """Opens the socket for 'suffix' and returns the result of
        make_unix_socket().  'dscp' is accepted but not used here."""
        # NOTE(review): the positional arguments look like (style, nonblock,
        # bind_path, connect_path) -- confirm against ovs.socket_util.
        return ovs.socket_util.make_unix_socket(
            socket.SOCK_STREAM, True, None, suffix)


Stream.register_method("unix", UnixStream)
742
743
class TCPStream(Stream):
    """Active stream type registered under the "tcp" method name."""

    @staticmethod
    def needs_probes():
        """This stream type needs connectivity probes."""
        return True

    @staticmethod
    def _open(suffix, dscp):
        """Opens an active connection for 'suffix' and sets TCP_NODELAY on
        it.  Returns an (error, socket) tuple; the socket is None if setting
        the option failed."""
        error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM,
                                                       suffix, 0, dscp)
        if error:
            # Pass the failure through exactly as reported.
            return error, sock

        try:
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        except socket.error as exc:
            sock.close()
            return ovs.socket_util.get_exception_errno(exc), None
        return error, sock


Stream.register_method("tcp", TCPStream)
763
764
class SSLStream(Stream):
    """Active stream type registered under the "ssl" method name when the
    OpenSSL module could be imported.  It wraps the socket opened by
    TCPStream in an SSL connection."""

    @staticmethod
    def needs_probes():
        """This stream type needs connectivity probes."""
        return True

    @staticmethod
    def verify_cb(conn, cert, errnum, depth, ok):
        """Verification callback installed on the SSL context; it returns
        the 'ok' value it is given, unchanged."""
        return ok

    @staticmethod
    def _open(suffix, dscp):
        """Opens a TCP connection for 'suffix' and wraps it in an SSL
        connection set to the client (connect) state.  Returns an
        (error, socket) tuple."""
        error, tcp_sock = TCPStream._open(suffix, dscp)
        if error:
            return error, None

        # Create an SSL context
        context = SSL.Context(SSL.SSLv23_METHOD)
        context.set_verify(SSL.VERIFY_PEER, SSLStream.verify_cb)
        context.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3)
        # If the client has not set the SSL configuration files
        # exception would be raised.
        context.use_privatekey_file(Stream._SSL_private_key_file)
        context.use_certificate_file(Stream._SSL_certificate_file)
        context.load_verify_locations(Stream._SSL_ca_cert_file)

        connection = SSL.Connection(context, tcp_sock)
        connection.set_connect_state()
        return error, connection

    def connect(self):
        """Completes the underlying connection and then performs the SSL
        handshake.  Returns 0 on success, errno.EAGAIN while either step is
        still in progress, or another positive errno value on failure."""
        status = super(SSLStream, self).connect()
        if status:
            return status

        # TCP Connection is successful.  Now do the SSL handshake
        try:
            self.socket.do_handshake()
        except SSL.WantReadError:
            return errno.EAGAIN
        except SSL.SysCallError as exc:
            return ovs.socket_util.get_exception_errno(exc)
        else:
            return 0

    def recv(self, n):
        """Same contract as Stream.recv(), with SSL exceptions translated
        into that contract's return values."""
        try:
            result = super(SSLStream, self).recv(n)
        except SSL.WantReadError:
            result = (errno.EAGAIN, "")
        except SSL.SysCallError as exc:
            result = (ovs.socket_util.get_exception_errno(exc), "")
        except SSL.ZeroReturnError:
            # Reported the same way as a normally closed connection.
            result = (0, "")
        return result

    def send(self, buf):
        """Same contract as Stream.send(), with SSL exceptions translated
        into negative errno values."""
        try:
            result = super(SSLStream, self).send(buf)
        except SSL.WantWriteError:
            result = -errno.EAGAIN
        except SSL.SysCallError as exc:
            result = -ovs.socket_util.get_exception_errno(exc)
        return result


if SSL:
    # Register SSL only if the OpenSSL module is available
    Stream.register_method("ssl", SSLStream)