]> git.proxmox.com Git - ovs.git/blob - python/ovs/stream.py
57e7a6eef86b8fe69dad112e14a4c124eba9f3d4
[ovs.git] / python / ovs / stream.py
1 # Copyright (c) 2010, 2011, 2012 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import errno
16 import os
17 import socket
18 import sys
19
20 import ovs.poller
21 import ovs.socket_util
22 import ovs.vlog
23
24 import six
25
26 try:
27 from OpenSSL import SSL
28 except ImportError:
29 SSL = None
30
31 if sys.platform == 'win32':
32 import ovs.winutils as winutils
33 import pywintypes
34 import win32event
35 import win32file
36 import win32pipe
37
38 vlog = ovs.vlog.Vlog("stream")
39
40
def stream_or_pstream_needs_probes(name):
    """Reports whether the connection named 'name' needs periodic probes.

    'name' may be either an active stream name or a passive stream name.
    Connections that need probes can take a long time to notice that the
    peer has gone away, so callers should verify connectivity themselves.

    Returns whatever the matching class's needs_probes() returns (True or
    False), or None if 'name' is neither a registered active stream name nor
    a valid passive stream name."""
    active_cls = Stream._find_method(name)
    if active_cls:
        return active_cls.needs_probes()

    if PassiveStream.is_valid_name(name):
        return PassiveStream.needs_probes(name)

    return None
54
55
class Stream(object):
    """Bidirectional byte stream.  Unix domain sockets, tcp and ssl
    are implemented.

    A Stream wraps either a socket ('self.socket') or, on Windows for
    "unix:" names, a named pipe handle ('self.pipe') driven with overlapped
    I/O.  All operations are non-blocking and report failures as errno
    values rather than by raising."""

    # States.
    __S_CONNECTING = 0
    __S_CONNECTED = 1
    __S_DISCONNECTED = 2

    # Kinds of events that one might wait for.
    W_CONNECT = 0               # Connect complete (success or failure).
    W_RECV = 1                  # Data received.
    W_SEND = 2                  # Send buffer room available.

    # Maps "TYPE:" prefixes to the Stream subclass that implements them.
    _SOCKET_METHODS = {}

    _SSL_private_key_file = None
    _SSL_certificate_file = None
    _SSL_ca_cert_file = None

    # Windows only
    _write = None               # overlapped for write operation
    _read = None                # overlapped for read operation
    _write_pending = False
    _read_pending = False
    # True while a named pipe open must be retried from connect() because
    # the pipe was busy.  Set per instance, never on the class.
    _retry_connect = False

    @staticmethod
    def register_method(method, cls):
        """Registers 'cls' as the implementation of stream type 'method'
        (given without the trailing colon, e.g. "tcp")."""
        Stream._SOCKET_METHODS[method + ":"] = cls

    @staticmethod
    def _find_method(name):
        """Returns the class registered for the "TYPE:" prefix of 'name',
        or None if no registered prefix matches."""
        for method, cls in Stream._SOCKET_METHODS.items():
            if name.startswith(method):
                return cls
        return None

    @staticmethod
    def is_valid_name(name):
        """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
        TYPE is a supported stream type ("unix:", "tcp:" and "ssl:"),
        otherwise False."""
        return bool(Stream._find_method(name))

    def __init__(self, socket, name, status, pipe=None, is_server=False):
        """Initializes a stream named 'name' on top of 'socket' or 'pipe'.

        'status' is the result of the connection attempt so far: 0 means
        connected, errno.EAGAIN means the connection is still in progress,
        and any other value is the errno with which the attempt failed.

        'pipe' and 'is_server' are only meaningful on Windows, where 'pipe'
        is a named pipe handle and 'is_server' says whether it is the server
        end of that pipe."""
        self.socket = socket
        self.pipe = pipe
        if sys.platform == 'win32':
            self._read = pywintypes.OVERLAPPED()
            self._read.hEvent = winutils.get_new_event()
            self._write = pywintypes.OVERLAPPED()
            self._write.hEvent = winutils.get_new_event()
            if pipe is not None:
                # Flag to check if fd is a server HANDLE.  In the case of a
                # server handle we have to issue a disconnect before closing
                # the actual handle.
                self._server = is_server
                suffix = name.split(":", 1)[1]
                suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
                self._pipename = winutils.get_pipe_name(suffix)

        self.name = name
        self.error = 0
        if status == errno.EAGAIN:
            self.state = Stream.__S_CONNECTING
        elif status == 0:
            self.state = Stream.__S_CONNECTED
        else:
            self.state = Stream.__S_DISCONNECTED
            # Remember why the connection failed so that connect(), recv()
            # and send() report the failure instead of claiming success.
            self.error = status

    # Default value of dscp bits for connection between controller and manager.
    # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined
    # in <netinet/ip.h> is used.
    IPTOS_PREC_INTERNETCONTROL = 0xc0
    DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2

    @staticmethod
    def open(name, dscp=DSCP_DEFAULT):
        """Attempts to connect a stream to a remote peer.  'name' is a
        connection name in the form "TYPE:ARGS", where TYPE is an active stream
        class's name and ARGS are stream class-specific.  The supported TYPEs
        include "unix", "tcp", and "ssl".

        Returns (error, stream): on success 'error' is 0 and 'stream' is the
        new Stream, on failure 'error' is a positive errno value and 'stream'
        is None.

        Never returns errno.EAGAIN or errno.EINPROGRESS.  Instead, returns 0
        and a new Stream.  The connect() method can be used to check for
        successful connection completion."""
        cls = Stream._find_method(name)
        if not cls:
            return errno.EAFNOSUPPORT, None

        suffix = name.split(":", 1)[1]
        if name.startswith("unix:"):
            suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
            if sys.platform == 'win32':
                pipename = winutils.get_pipe_name(suffix)

                if len(suffix) > 255:
                    # The name is too long to be a usable path; report it as
                    # a nonexistent file.
                    return errno.ENOENT, None

                try:
                    # In case of "unix:" argument, the assumption is that
                    # there is a file created in the path (suffix).
                    open(suffix, 'r').close()
                except Exception:
                    return errno.ENOENT, None

                try:
                    npipe = winutils.create_file(pipename)
                except pywintypes.error as e:
                    if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
                        # Pipe is busy: hand back a stream that is still
                        # connecting and let connect() retry the open.  The
                        # retry flag belongs to this stream only.
                        stream = cls(None, name, errno.EAGAIN,
                                     pipe=win32file.INVALID_HANDLE_VALUE,
                                     is_server=False)
                        stream._retry_connect = True
                        return 0, stream
                    return errno.ENOENT, None

                try:
                    winutils.set_pipe_mode(npipe,
                                           win32pipe.PIPE_READMODE_BYTE)
                except pywintypes.error:
                    # Do not leak the handle we just opened.
                    winutils.close_handle(npipe, vlog.warn)
                    return errno.ENOENT, None
                return 0, cls(None, name, 0, pipe=npipe, is_server=False)

        error, sock = cls._open(suffix, dscp)
        if error:
            return error, None
        else:
            status = ovs.socket_util.check_connection_completion(sock)
            return 0, cls(sock, name, status)

    @staticmethod
    def _open(suffix, dscp):
        """Opens the underlying socket for 'suffix' (the ARGS part of the
        stream name).  Returns (error, socket).  Subclasses must override."""
        raise NotImplementedError("This method must be overrided by subclass")

    @staticmethod
    def open_block(error_stream):
        """Blocks until a Stream completes its connection attempt, either
        succeeding or failing.  (error, stream) should be the tuple returned by
        Stream.open().  Returns a tuple of the same form.

        Typical usage:
        error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""

        # Py3 doesn't support tuple parameter unpacking - PEP 3113
        error, stream = error_stream
        if not error:
            while True:
                error = stream.connect()
                if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
                    # WSAEWOULDBLOCK would be the equivalent on Windows
                    # for EAGAIN on Unix.
                    error = errno.EAGAIN
                if error != errno.EAGAIN:
                    break
                stream.run()
                poller = ovs.poller.Poller()
                stream.run_wait(poller)
                stream.connect_wait(poller)
                poller.block()
            if stream.socket is not None:
                assert error != errno.EINPROGRESS

        if error and stream:
            stream.close()
            stream = None
        return error, stream

    def close(self):
        """Closes the socket and, on Windows, the pipe and event handles."""
        if self.socket is not None:
            self.socket.close()
        if self.pipe is not None:
            if self._server:
                # Flush the pipe to allow the client to read the pipe
                # before disconnecting.
                win32pipe.FlushFileBuffers(self.pipe)
                win32pipe.DisconnectNamedPipe(self.pipe)
            winutils.close_handle(self.pipe, vlog.warn)
            winutils.close_handle(self._read.hEvent, vlog.warn)
            winutils.close_handle(self._write.hEvent, vlog.warn)

    def __scs_connecting(self):
        """Advances a connecting stream: moves it to the connected or
        disconnected state once the outcome of the attempt is known."""
        if self.socket is not None:
            retval = ovs.socket_util.check_connection_completion(self.socket)
            assert retval != errno.EINPROGRESS
        elif sys.platform == 'win32':
            if self._retry_connect:
                try:
                    self.pipe = winutils.create_file(self._pipename)
                    self._retry_connect = False
                    retval = 0
                except pywintypes.error as e:
                    if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
                        retval = errno.EAGAIN
                    else:
                        self._retry_connect = False
                        retval = errno.ENOENT
            else:
                # If _retry_connect is false, it means it's already
                # connected so we can set the value of retval to 0
                retval = 0

        if retval == 0:
            self.state = Stream.__S_CONNECTED
        elif retval != errno.EAGAIN:
            self.state = Stream.__S_DISCONNECTED
            self.error = retval

    def connect(self):
        """Tries to complete the connection on this stream.  If the connection
        is complete, returns 0 if the connection was successful or a positive
        errno value if it failed.  If the connection is still in progress,
        returns errno.EAGAIN."""

        if self.state == Stream.__S_CONNECTING:
            self.__scs_connecting()

        if self.state == Stream.__S_CONNECTING:
            return errno.EAGAIN
        elif self.state == Stream.__S_CONNECTED:
            return 0
        else:
            assert self.state == Stream.__S_DISCONNECTED
            return self.error

    def recv(self, n):
        """Tries to receive up to 'n' bytes from this stream.  Returns a
        (error, string) tuple:

            - If successful, 'error' is zero and 'string' contains between 1
              and 'n' bytes of data.

            - On error, 'error' is a positive errno value.

            - If the connection has been closed in the normal fashion or if 'n'
              is 0, the tuple is (0, "").

        The recv function will not block waiting for data to arrive.  If no
        data have been received, it returns (errno.EAGAIN, "") immediately."""

        retval = self.connect()
        if retval != 0:
            return (retval, "")
        elif n == 0:
            return (0, "")

        if sys.platform == 'win32' and self.socket is None:
            return self.__recv_windows(n)

        try:
            return (0, self.socket.recv(n))
        except socket.error as e:
            return (ovs.socket_util.get_exception_errno(e), "")

    def __recv_windows(self, n):
        """Named pipe implementation of recv() using overlapped reads."""
        if self._read_pending:
            # A previous read is outstanding; collect its result instead of
            # issuing a new one.
            try:
                nBytesRead = winutils.get_overlapped_result(self.pipe,
                                                            self._read,
                                                            False)
                self._read_pending = False
                recvBuffer = self._read_buffer[:nBytesRead]

                return (0, winutils.get_decoded_buffer(recvBuffer))
            except pywintypes.error as e:
                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
                    # The operation is still pending, try again
                    self._read_pending = True
                    return (errno.EAGAIN, "")
                elif e.winerror in winutils.pipe_disconnected_errors:
                    # If the pipe was disconnected, return 0.
                    return (0, "")
                else:
                    return (errno.EINVAL, "")

        (errCode, self._read_buffer) = winutils.read_file(self.pipe,
                                                          n,
                                                          self._read)
        if errCode:
            if errCode == winutils.winerror.ERROR_IO_PENDING:
                self._read_pending = True
                return (errno.EAGAIN, "")
            elif errCode in winutils.pipe_disconnected_errors:
                # If the pipe was disconnected, return 0.
                return (0, "")
            else:
                return (errCode, "")

        try:
            nBytesRead = winutils.get_overlapped_result(self.pipe,
                                                        self._read,
                                                        False)
            winutils.win32event.SetEvent(self._read.hEvent)
        except pywintypes.error as e:
            if e.winerror in winutils.pipe_disconnected_errors:
                # If the pipe was disconnected, return 0.
                return (0, "")
            else:
                return (e.winerror, "")

        recvBuffer = self._read_buffer[:nBytesRead]
        return (0, winutils.get_decoded_buffer(recvBuffer))

    def send(self, buf):
        """Tries to send 'buf' on this stream.

        If successful, returns the number of bytes sent, between 1 and
        len(buf).  0 is only a valid return value if len(buf) is 0.

        On error, returns a negative errno value.

        Will not block.  If no bytes can be immediately accepted for
        transmission, returns -errno.EAGAIN immediately."""

        retval = self.connect()
        if retval != 0:
            return -retval
        elif len(buf) == 0:
            return 0

        if sys.platform == 'win32' and self.socket is None:
            return self.__send_windows(buf)

        try:
            # The socket needs bytes.  Encode text only: data that is
            # already a byte string must go out unchanged (encoding it again
            # fails on non-ASCII bytes under Python 2).
            if isinstance(buf, six.text_type):
                buf = buf.encode('utf-8')
            return self.socket.send(buf)
        except socket.error as e:
            return -ovs.socket_util.get_exception_errno(e)

    def __send_windows(self, buf):
        """Named pipe implementation of send() using overlapped writes."""
        if self._write_pending:
            # A previous write is outstanding; collect its result instead of
            # issuing a new one.
            try:
                nBytesWritten = winutils.get_overlapped_result(self.pipe,
                                                               self._write,
                                                               False)
                self._write_pending = False
                return nBytesWritten
            except pywintypes.error as e:
                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
                    # The operation is still pending, try again
                    self._write_pending = True
                    return -errno.EAGAIN
                elif e.winerror in winutils.pipe_disconnected_errors:
                    # If the pipe was disconnected, return connection reset.
                    return -errno.ECONNRESET
                else:
                    return -errno.EINVAL

        buf = winutils.get_encoded_buffer(buf)
        self._write_pending = False
        (errCode, nBytesWritten) = winutils.write_file(self.pipe,
                                                       buf,
                                                       self._write)
        if errCode:
            if errCode == winutils.winerror.ERROR_IO_PENDING:
                self._write_pending = True
                return -errno.EAGAIN
            if (not nBytesWritten and
                    errCode in winutils.pipe_disconnected_errors):
                # If the pipe was disconnected, return connection reset.
                return -errno.ECONNRESET
        return nBytesWritten

    def run(self):
        """Hook for periodic work; the base class has none."""
        pass

    def run_wait(self, poller):
        """Hook to register wakeups for run(); the base class has none."""
        pass

    def wait(self, poller, wait):
        """Arranges for 'poller' to wake up when the event 'wait' (one of
        W_CONNECT, W_RECV or W_SEND) can make progress on this stream."""
        assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)

        if self.state == Stream.__S_DISCONNECTED:
            poller.immediate_wake()
            return

        if self.state == Stream.__S_CONNECTING:
            # Nothing can be sent or received before the connection
            # completes, so wait for that instead.
            wait = Stream.W_CONNECT

        if sys.platform == 'win32':
            self.__wait_windows(poller, wait)
            return

        if wait == Stream.W_RECV:
            poller.fd_wait(self.socket, ovs.poller.POLLIN)
        else:
            poller.fd_wait(self.socket, ovs.poller.POLLOUT)

    def __wait_windows(self, poller, wait):
        """Windows implementation of wait() for sockets and named pipes."""
        if self.socket is not None:
            if wait == Stream.W_RECV:
                read_flags = (win32file.FD_READ |
                              win32file.FD_ACCEPT |
                              win32file.FD_CLOSE)
                try:
                    win32file.WSAEventSelect(self.socket,
                                             self._read.hEvent,
                                             read_flags)
                except pywintypes.error as e:
                    vlog.err("failed to associate events with socket: %s"
                             % e.strerror)
                poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
            else:
                write_flags = (win32file.FD_WRITE |
                               win32file.FD_CONNECT |
                               win32file.FD_CLOSE)
                try:
                    win32file.WSAEventSelect(self.socket,
                                             self._write.hEvent,
                                             write_flags)
                except pywintypes.error as e:
                    vlog.err("failed to associate events with socket: %s"
                             % e.strerror)
                poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT)
        else:
            if wait == Stream.W_RECV:
                if self._read:
                    poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
            elif wait == Stream.W_SEND:
                if self._write:
                    poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT)
            elif wait == Stream.W_CONNECT:
                return

    def connect_wait(self, poller):
        """Wakes 'poller' when the connection attempt completes."""
        self.wait(poller, Stream.W_CONNECT)

    def recv_wait(self, poller):
        """Wakes 'poller' when data can be received."""
        self.wait(poller, Stream.W_RECV)

    def send_wait(self, poller):
        """Wakes 'poller' when data can be sent."""
        # wait() already handles Windows through the write event.  A Stream
        # has no connect event of its own ('connect' is a method), so there
        # is nothing else to register here.
        self.wait(poller, Stream.W_SEND)

    def __del__(self):
        # Don't delete the file: we might have forked.
        if self.socket is not None:
            self.socket.close()
        if self.pipe is not None:
            # Check if there are any remaining valid handles and close them
            if self.pipe:
                winutils.close_handle(self.pipe)
            if self._read.hEvent:
                winutils.close_handle(self._read.hEvent)
            if self._write.hEvent:
                winutils.close_handle(self._write.hEvent)

    @staticmethod
    def ssl_set_private_key_file(file_name):
        """Sets the private key file used by subsequently opened SSL
        streams."""
        Stream._SSL_private_key_file = file_name

    @staticmethod
    def ssl_set_certificate_file(file_name):
        """Sets the certificate file used by subsequently opened SSL
        streams."""
        Stream._SSL_certificate_file = file_name

    @staticmethod
    def ssl_set_ca_cert_file(file_name):
        """Sets the CA certificate file used by subsequently opened SSL
        streams."""
        Stream._SSL_ca_cert_file = file_name
527
528
class PassiveStream(object):
    """Listening endpoint that accepts incoming connections as Streams.

    Backed by a listening socket ('self.socket') or, on Windows for
    "punix:" names, by a named pipe handle ('self.pipe')."""

    # Windows only
    connect = None              # overlapped for the pending pipe connect
    connect_pending = False

    @staticmethod
    def needs_probes(name):
        """Returns False for "punix:" names and True for any other name."""
        return not name.startswith("punix:")

    @staticmethod
    def is_valid_name(name):
        """Returns True if 'name' is a passive stream name in the form
        "TYPE:ARGS" and TYPE is a supported passive stream type (currently
        "punix:" or "ptcp:"), otherwise False."""
        return name.startswith(("punix:", "ptcp:"))

    def __init__(self, sock, name, bind_path, pipe=None):
        """Initializes a passive stream named 'name' listening on 'sock' or
        on the Windows named pipe 'pipe'.  'bind_path' is the file to unlink
        when the stream is closed, or None if there is none."""
        self.name = name
        self.pipe = pipe
        self.socket = sock
        if pipe is not None:
            self.connect = pywintypes.OVERLAPPED()
            self.connect.hEvent = winutils.get_new_event(bManualReset=True)
            self.connect_pending = False
            suffix = name.split(":", 1)[1]
            suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
            self._pipename = winutils.get_pipe_name(suffix)

        self.bind_path = bind_path

    @staticmethod
    def open(name):
        """Attempts to start listening for remote stream connections.  'name'
        is a connection name in the form "TYPE:ARGS", where TYPE is an passive
        stream class's name and ARGS are stream class-specific.  Currently the
        supported values for TYPE are "punix" and "ptcp".

        Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
        new PassiveStream, on failure 'error' is a positive errno value and
        'pstream' is None."""
        if not PassiveStream.is_valid_name(name):
            return errno.EAFNOSUPPORT, None

        if name.startswith("punix:"):
            bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, name[6:])
            if sys.platform != 'win32':
                error, sock = ovs.socket_util.make_unix_socket(
                    socket.SOCK_STREAM, True, bind_path, None)
                if error:
                    return error, None
            else:
                # Branch used only on Windows
                try:
                    open(bind_path, 'w').close()
                except Exception:
                    return errno.ENOENT, None

                pipename = winutils.get_pipe_name(bind_path)
                if len(pipename) > 255:
                    # The name is too long to be a usable pipe name; report
                    # it as a nonexistent file.
                    return errno.ENOENT, None

                npipe = winutils.create_named_pipe(pipename)
                if not npipe:
                    return errno.ENOENT, None
                return 0, PassiveStream(None, name, bind_path, pipe=npipe)

        elif name.startswith("ptcp:"):
            # A TCP listener has no file on disk, so there is nothing for
            # close() to unlink.
            bind_path = None
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            remote = name.split(':')
            sock.bind((remote[1], int(remote[2])))

        else:
            raise Exception('Unknown connection string')

        try:
            sock.listen(10)
        except socket.error as e:
            error = ovs.socket_util.get_exception_errno(e)
            vlog.err("%s: listen: %s" % (name, os.strerror(error)))
            sock.close()
            return error, None

        return 0, PassiveStream(sock, name, bind_path)

    def close(self):
        """Closes this PassiveStream."""
        if self.socket is not None:
            self.socket.close()
        if self.pipe is not None:
            winutils.close_handle(self.pipe, vlog.warn)
            winutils.close_handle(self.connect.hEvent, vlog.warn)
        if self.bind_path is not None:
            ovs.fatal_signal.unlink_file_now(self.bind_path)
            self.bind_path = None

    def accept(self):
        """Tries to accept a new connection on this passive stream.  Returns
        (error, stream): if successful, 'error' is 0 and 'stream' is the new
        Stream object, and on failure 'error' is a positive errno value and
        'stream' is None.

        Will not block waiting for a connection.  If no connection is ready to
        be accepted, returns (errno.EAGAIN, None) immediately."""
        if sys.platform == 'win32' and self.socket is None:
            return self.__accept_windows()

        try:
            sock, addr = self.socket.accept()
            ovs.socket_util.set_nonblocking(sock)
            if (sys.platform != 'win32' and sock.family == socket.AF_UNIX):
                return 0, Stream(sock, "unix:%s" % addr, 0)
            return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0],
                                                   str(addr[1])), 0)
        except socket.error as e:
            error = ovs.socket_util.get_exception_errno(e)
            if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
                # WSAEWOULDBLOCK would be the equivalent on Windows
                # for EAGAIN on Unix.
                error = errno.EAGAIN
            if error != errno.EAGAIN:
                # XXX rate-limit
                vlog.dbg("accept: %s" % os.strerror(error))
            return error, None

    def __accept_windows(self):
        """Named pipe implementation of accept() using an overlapped
        connect."""
        if self.connect_pending:
            try:
                winutils.get_overlapped_result(self.pipe, self.connect, False)
            except pywintypes.error as e:
                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
                    # The operation is still pending, try again
                    self.connect_pending = True
                    return errno.EAGAIN, None
                else:
                    if self.pipe:
                        win32pipe.DisconnectNamedPipe(self.pipe)
                    return errno.EINVAL, None
            self.connect_pending = False

        error = winutils.connect_named_pipe(self.pipe, self.connect)
        if error:
            if error == winutils.winerror.ERROR_IO_PENDING:
                self.connect_pending = True
                return errno.EAGAIN, None
            elif error != winutils.winerror.ERROR_PIPE_CONNECTED:
                if self.pipe:
                    win32pipe.DisconnectNamedPipe(self.pipe)
                self.connect_pending = False
                return errno.EINVAL, None
            else:
                win32event.SetEvent(self.connect.hEvent)

        # The connected pipe instance is handed to the new Stream; create a
        # fresh instance to keep listening on.
        npipe = winutils.create_named_pipe(self._pipename)
        if not npipe:
            return errno.ENOENT, None

        old_pipe = self.pipe
        self.pipe = npipe
        winutils.win32event.ResetEvent(self.connect.hEvent)
        return 0, Stream(None, self.name, 0, pipe=old_pipe)

    def wait(self, poller):
        """Arranges for 'poller' to wake up when a connection can be
        accepted."""
        if sys.platform != 'win32' or self.socket is not None:
            poller.fd_wait(self.socket, ovs.poller.POLLIN)
        else:
            poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)

    def __del__(self):
        # Don't delete the file: we might have forked.
        if self.socket is not None:
            self.socket.close()
        if self.pipe is not None:
            # Check if there are any remaining valid handles and close them
            if self.pipe:
                winutils.close_handle(self.pipe)
            if self.connect.hEvent:
                winutils.close_handle(self.connect.hEvent)
708
709
def usage(name):
    """Returns help text listing the supported active and passive
    connection methods, with 'name' substituted into both headings."""
    template = """
Active %s connection methods:
unix:FILE Unix domain socket named FILE
tcp:IP:PORT TCP socket to IP with port no of PORT
ssl:IP:PORT SSL socket to IP with port no of PORT

Passive %s connection methods:
punix:FILE Listen on Unix domain socket FILE"""
    return template % (name, name)
719
720
class UnixStream(Stream):
    """Active stream registered for "unix:" names."""

    @staticmethod
    def needs_probes():
        """Unix domain sockets are reported as not needing probes."""
        return False

    @staticmethod
    def _open(suffix, dscp):
        """Opens a stream socket connected to the path 'suffix'.  'dscp' is
        accepted for interface compatibility and is not used.  Returns the
        result of ovs.socket_util.make_unix_socket() unchanged."""
        return ovs.socket_util.make_unix_socket(
            socket.SOCK_STREAM, True, None, suffix)


Stream.register_method("unix", UnixStream)
734
735
class TCPStream(Stream):
    """Active stream registered for "tcp:" names."""

    @staticmethod
    def needs_probes():
        """TCP connections are reported as needing probes."""
        return True

    @staticmethod
    def _open(suffix, dscp):
        """Starts a TCP connection to 'suffix' with the given 'dscp' value.
        Returns (error, socket) as produced by
        ovs.socket_util.inet_open_active(); on success TCP_NODELAY is
        enabled on the socket."""
        result = ovs.socket_util.inet_open_active(socket.SOCK_STREAM,
                                                  suffix, 0, dscp)
        error, sock = result
        if error:
            return error, sock

        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        return error, sock


Stream.register_method("tcp", TCPStream)
751
752
class SSLStream(Stream):
    """Active stream registered for "ssl:" names: a TCP connection wrapped
    in a pyOpenSSL connection.  Only registered when the OpenSSL module
    could be imported."""

    @staticmethod
    def needs_probes():
        """SSL connections are reported as needing probes."""
        return True

    @staticmethod
    def verify_cb(conn, cert, errnum, depth, ok):
        """Certificate verification callback: accepts OpenSSL's own
        verdict 'ok' unchanged."""
        return ok

    @staticmethod
    def _open(suffix, dscp):
        """Starts a TCP connection to 'suffix' and wraps it in an SSL
        connection in client (connect) state.

        Returns (error, socket).  Raises whatever pyOpenSSL raises if the
        SSL configuration files have not been set or cannot be loaded; the
        TCP socket is closed before the exception propagates."""
        error, sock = TCPStream._open(suffix, dscp)
        if error:
            return error, None

        try:
            # Create an SSL context
            ctx = SSL.Context(SSL.SSLv23_METHOD)
            ctx.set_verify(SSL.VERIFY_PEER, SSLStream.verify_cb)
            ctx.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3)
            # If the client has not set the SSL configuration files
            # exception would be raised.
            ctx.use_privatekey_file(Stream._SSL_private_key_file)
            ctx.use_certificate_file(Stream._SSL_certificate_file)
            ctx.load_verify_locations(Stream._SSL_ca_cert_file)

            ssl_sock = SSL.Connection(ctx, sock)
            ssl_sock.set_connect_state()
        except Exception:
            # Do not leak the TCP socket when SSL setup fails.
            sock.close()
            raise
        return error, ssl_sock

    def connect(self):
        """Completes the TCP connection and then the SSL handshake.
        Returns 0 on success, errno.EAGAIN while either step is still in
        progress, or a positive errno value on failure."""
        retval = super(SSLStream, self).connect()

        if retval:
            return retval

        # TCP Connection is successful.  Now do the SSL handshake
        try:
            self.socket.do_handshake()
        except (SSL.WantReadError, SSL.WantWriteError):
            # The non-blocking handshake needs more I/O in either
            # direction; the caller should retry.
            return errno.EAGAIN
        except SSL.SysCallError as e:
            return ovs.socket_util.get_exception_errno(e)

        return 0

    def recv(self, n):
        """Same contract as Stream.recv(), with SSL conditions translated
        to errno values; a clean SSL shutdown yields (0, "")."""
        try:
            return super(SSLStream, self).recv(n)
        except SSL.WantReadError:
            return (errno.EAGAIN, "")
        except SSL.SysCallError as e:
            return (ovs.socket_util.get_exception_errno(e), "")
        except SSL.ZeroReturnError:
            return (0, "")

    def send(self, buf):
        """Same contract as Stream.send(), with SSL conditions translated
        to negative errno values."""
        try:
            if isinstance(buf, six.text_type):
                # Convert to byte stream if the buffer is string type/unicode.
                # pyopenssl version 0.14 expects the buffer to be byte string.
                buf = buf.encode('utf-8')
            return super(SSLStream, self).send(buf)
        except SSL.WantWriteError:
            return -errno.EAGAIN
        except SSL.SysCallError as e:
            return -ovs.socket_util.get_exception_errno(e)


if SSL:
    # Register SSL only if the OpenSSL module is available
    Stream.register_method("ssl", SSLStream)