]> git.proxmox.com Git - mirror_ovs.git/blob - python/ovs/stream.py
DNS: Add basic support for asynchronous DNS resolving
[mirror_ovs.git] / python / ovs / stream.py
1 # Copyright (c) 2010, 2011, 2012 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import errno
16 import os
17 import socket
18 import sys
19
20 import ovs.poller
21 import ovs.socket_util
22 import ovs.vlog
23
24 import six
25
26 try:
27 from OpenSSL import SSL
28 except ImportError:
29 SSL = None
30
31 if sys.platform == 'win32':
32 import ovs.winutils as winutils
33 import pywintypes
34 import win32event
35 import win32file
36 import win32pipe
37
38 vlog = ovs.vlog.Vlog("stream")
39
40
41 def stream_or_pstream_needs_probes(name):
42 """ True if the stream or pstream specified by 'name' needs periodic probes
43 to verify connectivity. For [p]streams which need probes, it can take a
44 long time to notice the connection was dropped. Returns False if probes
45 aren't needed, and None if 'name' is invalid"""
46
47 cls = Stream._find_method(name)
48 if cls:
49 return cls.needs_probes()
50 elif PassiveStream.is_valid_name(name):
51 return PassiveStream.needs_probes(name)
52 else:
53 return None
54
55
56 class Stream(object):
57 """Bidirectional byte stream. Unix domain sockets, tcp and ssl
58 are implemented."""
59
60 # States.
61 __S_CONNECTING = 0
62 __S_CONNECTED = 1
63 __S_DISCONNECTED = 2
64
65 # Kinds of events that one might wait for.
66 W_CONNECT = 0 # Connect complete (success or failure).
67 W_RECV = 1 # Data received.
68 W_SEND = 2 # Send buffer room available.
69
70 _SOCKET_METHODS = {}
71
72 _SSL_private_key_file = None
73 _SSL_certificate_file = None
74 _SSL_ca_cert_file = None
75
76 # Windows only
77 _write = None # overlapped for write operation
78 _read = None # overlapped for read operation
79 _write_pending = False
80 _read_pending = False
81 _retry_connect = False
82
83 @staticmethod
84 def register_method(method, cls):
85 Stream._SOCKET_METHODS[method + ":"] = cls
86
87 @staticmethod
88 def _find_method(name):
89 for method, cls in six.iteritems(Stream._SOCKET_METHODS):
90 if name.startswith(method):
91 return cls
92 return None
93
94 @staticmethod
95 def is_valid_name(name):
96 """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
97 TYPE is a supported stream type ("unix:", "tcp:" and "ssl:"),
98 otherwise False."""
99 return bool(Stream._find_method(name))
100
101 def __init__(self, socket, name, status, pipe=None, is_server=False):
102 self.socket = socket
103 self.pipe = pipe
104 if sys.platform == 'win32':
105 if pipe is not None:
106 # Flag to check if fd is a server HANDLE. In the case of a
107 # server handle we have to issue a disconnect before closing
108 # the actual handle.
109 self._server = is_server
110 suffix = name.split(":", 1)[1]
111 suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
112 self._pipename = winutils.get_pipe_name(suffix)
113 self._read = pywintypes.OVERLAPPED()
114 self._read.hEvent = winutils.get_new_event()
115 self._write = pywintypes.OVERLAPPED()
116 self._write.hEvent = winutils.get_new_event()
117 else:
118 self._wevent = winutils.get_new_event(bManualReset=False,
119 bInitialState=False)
120
121 self.name = name
122 if status == errno.EAGAIN:
123 self.state = Stream.__S_CONNECTING
124 elif status == 0:
125 self.state = Stream.__S_CONNECTED
126 else:
127 self.state = Stream.__S_DISCONNECTED
128
129 self.error = 0
130
131 # Default value of dscp bits for connection between controller and manager.
132 # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined
133 # in <netinet/ip.h> is used.
134 IPTOS_PREC_INTERNETCONTROL = 0xc0
135 DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2
136
137 @staticmethod
138 def open(name, dscp=DSCP_DEFAULT):
139 """Attempts to connect a stream to a remote peer. 'name' is a
140 connection name in the form "TYPE:ARGS", where TYPE is an active stream
141 class's name and ARGS are stream class-specific. The supported TYPEs
142 include "unix", "tcp", and "ssl".
143
144 Returns (error, stream): on success 'error' is 0 and 'stream' is the
145 new Stream, on failure 'error' is a positive errno value and 'stream'
146 is None.
147
148 Never returns errno.EAGAIN or errno.EINPROGRESS. Instead, returns 0
149 and a new Stream. The connect() method can be used to check for
150 successful connection completion."""
151 cls = Stream._find_method(name)
152 if not cls:
153 return errno.EAFNOSUPPORT, None
154
155 suffix = name.split(":", 1)[1]
156 if name.startswith("unix:"):
157 suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
158 if sys.platform == 'win32':
159 pipename = winutils.get_pipe_name(suffix)
160
161 if len(suffix) > 255:
162 # Return invalid argument if the name is too long
163 return errno.ENOENT, None
164
165 try:
166 # In case of "unix:" argument, the assumption is that
167 # there is a file created in the path (suffix).
168 open(suffix, 'r').close()
169 except:
170 return errno.ENOENT, None
171
172 try:
173 npipe = winutils.create_file(pipename)
174 try:
175 winutils.set_pipe_mode(npipe,
176 win32pipe.PIPE_READMODE_BYTE)
177 except pywintypes.error as e:
178 return errno.ENOENT, None
179 except pywintypes.error as e:
180 if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
181 # Pipe is busy, set the retry flag to true and retry
182 # again during the connect function.
183 Stream.retry_connect = True
184 return 0, cls(None, name, errno.EAGAIN,
185 pipe=win32file.INVALID_HANDLE_VALUE,
186 is_server=False)
187 return errno.ENOENT, None
188 return 0, cls(None, name, 0, pipe=npipe, is_server=False)
189
190 error, sock = cls._open(suffix, dscp)
191 if error:
192 return error, None
193 else:
194 status = ovs.socket_util.check_connection_completion(sock)
195 return 0, cls(sock, name, status)
196
197 @staticmethod
198 def _open(suffix, dscp):
199 raise NotImplementedError("This method must be overrided by subclass")
200
201 @staticmethod
202 def open_block(error_stream):
203 """Blocks until a Stream completes its connection attempt, either
204 succeeding or failing. (error, stream) should be the tuple returned by
205 Stream.open(). Returns a tuple of the same form.
206
207 Typical usage:
208 error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
209
210 # Py3 doesn't support tuple parameter unpacking - PEP 3113
211 error, stream = error_stream
212 if not error:
213 while True:
214 error = stream.connect()
215 if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
216 # WSAEWOULDBLOCK would be the equivalent on Windows
217 # for EAGAIN on Unix.
218 error = errno.EAGAIN
219 if error != errno.EAGAIN:
220 break
221 stream.run()
222 poller = ovs.poller.Poller()
223 stream.run_wait(poller)
224 stream.connect_wait(poller)
225 poller.block()
226 if stream.socket is not None:
227 assert error != errno.EINPROGRESS
228
229 if error and stream:
230 stream.close()
231 stream = None
232 return error, stream
233
234 def close(self):
235 if self.socket is not None:
236 self.socket.close()
237 if self.pipe is not None:
238 if self._server:
239 # Flush the pipe to allow the client to read the pipe
240 # before disconnecting.
241 win32pipe.FlushFileBuffers(self.pipe)
242 win32pipe.DisconnectNamedPipe(self.pipe)
243 winutils.close_handle(self.pipe, vlog.warn)
244 winutils.close_handle(self._read.hEvent, vlog.warn)
245 winutils.close_handle(self._write.hEvent, vlog.warn)
246
247 def __scs_connecting(self):
248 if self.socket is not None:
249 retval = ovs.socket_util.check_connection_completion(self.socket)
250 assert retval != errno.EINPROGRESS
251 elif sys.platform == 'win32':
252 if self.retry_connect:
253 try:
254 self.pipe = winutils.create_file(self._pipename)
255 self._retry_connect = False
256 retval = 0
257 except pywintypes.error as e:
258 if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
259 retval = errno.EAGAIN
260 else:
261 self._retry_connect = False
262 retval = errno.ENOENT
263 else:
264 # If retry_connect is false, it means it's already
265 # connected so we can set the value of retval to 0
266 retval = 0
267
268 if retval == 0:
269 self.state = Stream.__S_CONNECTED
270 elif retval != errno.EAGAIN:
271 self.state = Stream.__S_DISCONNECTED
272 self.error = retval
273
274 def connect(self):
275 """Tries to complete the connection on this stream. If the connection
276 is complete, returns 0 if the connection was successful or a positive
277 errno value if it failed. If the connection is still in progress,
278 returns errno.EAGAIN."""
279
280 if self.state == Stream.__S_CONNECTING:
281 self.__scs_connecting()
282
283 if self.state == Stream.__S_CONNECTING:
284 return errno.EAGAIN
285 elif self.state == Stream.__S_CONNECTED:
286 return 0
287 else:
288 assert self.state == Stream.__S_DISCONNECTED
289 return self.error
290
291 def recv(self, n):
292 """Tries to receive up to 'n' bytes from this stream. Returns a
293 (error, string) tuple:
294
295 - If successful, 'error' is zero and 'string' contains between 1
296 and 'n' bytes of data.
297
298 - On error, 'error' is a positive errno value.
299
300 - If the connection has been closed in the normal fashion or if 'n'
301 is 0, the tuple is (0, "").
302
303 The recv function will not block waiting for data to arrive. If no
304 data have been received, it returns (errno.EAGAIN, "") immediately."""
305
306 retval = self.connect()
307 if retval != 0:
308 return (retval, "")
309 elif n == 0:
310 return (0, "")
311
312 if sys.platform == 'win32' and self.socket is None:
313 return self.__recv_windows(n)
314
315 try:
316 return (0, self.socket.recv(n))
317 except socket.error as e:
318 return (ovs.socket_util.get_exception_errno(e), "")
319
320 def __recv_windows(self, n):
321 if self._read_pending:
322 try:
323 nBytesRead = winutils.get_overlapped_result(self.pipe,
324 self._read,
325 False)
326 self._read_pending = False
327 except pywintypes.error as e:
328 if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
329 # The operation is still pending, try again
330 self._read_pending = True
331 return (errno.EAGAIN, "")
332 elif e.winerror in winutils.pipe_disconnected_errors:
333 # If the pipe was disconnected, return 0.
334 return (0, "")
335 else:
336 return (errno.EINVAL, "")
337 else:
338 (errCode, self._read_buffer) = winutils.read_file(self.pipe,
339 n,
340 self._read)
341 if errCode:
342 if errCode == winutils.winerror.ERROR_IO_PENDING:
343 self._read_pending = True
344 return (errno.EAGAIN, "")
345 elif errCode in winutils.pipe_disconnected_errors:
346 # If the pipe was disconnected, return 0.
347 return (0, "")
348 else:
349 return (errCode, "")
350
351 try:
352 nBytesRead = winutils.get_overlapped_result(self.pipe,
353 self._read,
354 False)
355 winutils.win32event.SetEvent(self._read.hEvent)
356 except pywintypes.error as e:
357 if e.winerror in winutils.pipe_disconnected_errors:
358 # If the pipe was disconnected, return 0.
359 return (0, "")
360 else:
361 return (e.winerror, "")
362
363 recvBuffer = self._read_buffer[:nBytesRead]
364 # recvBuffer will have the type memoryview in Python3.
365 # We can use bytes to convert it to type bytes which works on
366 # both Python2 and Python3.
367 return (0, bytes(recvBuffer))
368
369 def send(self, buf):
370 """Tries to send 'buf' on this stream.
371
372 If successful, returns the number of bytes sent, between 1 and
373 len(buf). 0 is only a valid return value if len(buf) is 0.
374
375 On error, returns a negative errno value.
376
377 Will not block. If no bytes can be immediately accepted for
378 transmission, returns -errno.EAGAIN immediately."""
379
380 retval = self.connect()
381 if retval != 0:
382 return -retval
383 elif len(buf) == 0:
384 return 0
385
386 # We must have bytes for sending.
387 if isinstance(buf, six.text_type):
388 buf = buf.encode('utf-8')
389
390 if sys.platform == 'win32' and self.socket is None:
391 return self.__send_windows(buf)
392
393 try:
394 return self.socket.send(buf)
395 except socket.error as e:
396 return -ovs.socket_util.get_exception_errno(e)
397
398 def __send_windows(self, buf):
399 if self._write_pending:
400 try:
401 nBytesWritten = winutils.get_overlapped_result(self.pipe,
402 self._write,
403 False)
404 self._write_pending = False
405 except pywintypes.error as e:
406 if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
407 # The operation is still pending, try again
408 self._read_pending = True
409 return -errno.EAGAIN
410 elif e.winerror in winutils.pipe_disconnected_errors:
411 # If the pipe was disconnected, return connection reset.
412 return -errno.ECONNRESET
413 else:
414 return -errno.EINVAL
415 else:
416 (errCode, nBytesWritten) = winutils.write_file(self.pipe,
417 buf,
418 self._write)
419 if errCode:
420 if errCode == winutils.winerror.ERROR_IO_PENDING:
421 self._write_pending = True
422 return -errno.EAGAIN
423 if (not nBytesWritten and
424 errCode in winutils.pipe_disconnected_errors):
425 # If the pipe was disconnected, return connection reset.
426 return -errno.ECONNRESET
427 return nBytesWritten
428
429 def run(self):
430 pass
431
432 def run_wait(self, poller):
433 pass
434
435 def wait(self, poller, wait):
436 assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
437
438 if self.state == Stream.__S_DISCONNECTED:
439 poller.immediate_wake()
440 return
441
442 if self.state == Stream.__S_CONNECTING:
443 wait = Stream.W_CONNECT
444
445 if sys.platform == 'win32':
446 self.__wait_windows(poller, wait)
447 return
448
449 if wait == Stream.W_RECV:
450 poller.fd_wait(self.socket, ovs.poller.POLLIN)
451 else:
452 poller.fd_wait(self.socket, ovs.poller.POLLOUT)
453
454 def __wait_windows(self, poller, wait):
455 if self.socket is not None:
456 if wait == Stream.W_RECV:
457 mask = (win32file.FD_READ |
458 win32file.FD_ACCEPT |
459 win32file.FD_CLOSE)
460 event = ovs.poller.POLLIN
461 else:
462 mask = (win32file.FD_WRITE |
463 win32file.FD_CONNECT |
464 win32file.FD_CLOSE)
465 event = ovs.poller.POLLOUT
466
467 try:
468 win32file.WSAEventSelect(self.socket,
469 self._wevent,
470 mask)
471 except pywintypes.error as e:
472 vlog.err("failed to associate events with socket: %s"
473 % e.strerror)
474 poller.fd_wait(self._wevent, event)
475 else:
476 if wait == Stream.W_RECV:
477 if self._read:
478 poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
479 elif wait == Stream.W_SEND:
480 if self._write:
481 poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT)
482 elif wait == Stream.W_CONNECT:
483 return
484
485 def connect_wait(self, poller):
486 self.wait(poller, Stream.W_CONNECT)
487
488 def recv_wait(self, poller):
489 self.wait(poller, Stream.W_RECV)
490
491 def send_wait(self, poller):
492 self.wait(poller, Stream.W_SEND)
493
494 def __del__(self):
495 # Don't delete the file: we might have forked.
496 if self.socket is not None:
497 self.socket.close()
498 if self.pipe is not None:
499 # Check if there are any remaining valid handles and close them
500 if self.pipe:
501 winutils.close_handle(self.pipe)
502 if self._read.hEvent:
503 winutils.close_handle(self._read.hEvent)
504 if self._write.hEvent:
505 winutils.close_handle(self._write.hEvent)
506
507 @staticmethod
508 def ssl_set_private_key_file(file_name):
509 Stream._SSL_private_key_file = file_name
510
511 @staticmethod
512 def ssl_set_certificate_file(file_name):
513 Stream._SSL_certificate_file = file_name
514
515 @staticmethod
516 def ssl_set_ca_cert_file(file_name):
517 Stream._SSL_ca_cert_file = file_name
518
519
520 class PassiveStream(object):
521 # Windows only
522 connect = None # overlapped for read operation
523 connect_pending = False
524
525 @staticmethod
526 def needs_probes(name):
527 return False if name.startswith("punix:") else True
528
529 @staticmethod
530 def is_valid_name(name):
531 """Returns True if 'name' is a passive stream name in the form
532 "TYPE:ARGS" and TYPE is a supported passive stream type (currently
533 "punix:" or "ptcp"), otherwise False."""
534 return name.startswith("punix:") | name.startswith("ptcp:")
535
536 def __init__(self, sock, name, bind_path, pipe=None):
537 self.name = name
538 self.pipe = pipe
539 self.socket = sock
540 if pipe is not None:
541 self.connect = pywintypes.OVERLAPPED()
542 self.connect.hEvent = winutils.get_new_event()
543 self.connect_pending = False
544 suffix = name.split(":", 1)[1]
545 suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
546 self._pipename = winutils.get_pipe_name(suffix)
547
548 self.bind_path = bind_path
549
550 @staticmethod
551 def open(name):
552 """Attempts to start listening for remote stream connections. 'name'
553 is a connection name in the form "TYPE:ARGS", where TYPE is an passive
554 stream class's name and ARGS are stream class-specific. Currently the
555 supported values for TYPE are "punix" and "ptcp".
556
557 Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
558 new PassiveStream, on failure 'error' is a positive errno value and
559 'pstream' is None."""
560 if not PassiveStream.is_valid_name(name):
561 return errno.EAFNOSUPPORT, None
562
563 bind_path = name[6:]
564 if name.startswith("punix:"):
565 bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path)
566 if sys.platform != 'win32':
567 error, sock = ovs.socket_util.make_unix_socket(
568 socket.SOCK_STREAM, True, bind_path, None)
569 if error:
570 return error, None
571 else:
572 # Branch used only on Windows
573 try:
574 open(bind_path, 'w').close()
575 except:
576 return errno.ENOENT, None
577
578 pipename = winutils.get_pipe_name(bind_path)
579 if len(pipename) > 255:
580 # Return invalid argument if the name is too long
581 return errno.ENOENT, None
582
583 npipe = winutils.create_named_pipe(pipename)
584 if not npipe:
585 return errno.ENOENT, None
586 return 0, PassiveStream(None, name, bind_path, pipe=npipe)
587
588 elif name.startswith("ptcp:"):
589 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
590 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
591 remote = name.split(':')
592 sock.bind((remote[1], int(remote[2])))
593
594 else:
595 raise Exception('Unknown connection string')
596
597 try:
598 sock.listen(10)
599 except socket.error as e:
600 vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
601 sock.close()
602 return e.error, None
603
604 return 0, PassiveStream(sock, name, bind_path)
605
606 def close(self):
607 """Closes this PassiveStream."""
608 if self.socket is not None:
609 self.socket.close()
610 if self.pipe is not None:
611 winutils.close_handle(self.pipe, vlog.warn)
612 winutils.close_handle(self.connect.hEvent, vlog.warn)
613 if self.bind_path is not None:
614 ovs.fatal_signal.unlink_file_now(self.bind_path)
615 self.bind_path = None
616
617 def accept(self):
618 """Tries to accept a new connection on this passive stream. Returns
619 (error, stream): if successful, 'error' is 0 and 'stream' is the new
620 Stream object, and on failure 'error' is a positive errno value and
621 'stream' is None.
622
623 Will not block waiting for a connection. If no connection is ready to
624 be accepted, returns (errno.EAGAIN, None) immediately."""
625 if sys.platform == 'win32' and self.socket is None:
626 return self.__accept_windows()
627 while True:
628 try:
629 sock, addr = self.socket.accept()
630 ovs.socket_util.set_nonblocking(sock)
631 if (sys.platform != 'win32' and sock.family == socket.AF_UNIX):
632 return 0, Stream(sock, "unix:%s" % addr, 0)
633 return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0],
634 str(addr[1])), 0)
635 except socket.error as e:
636 error = ovs.socket_util.get_exception_errno(e)
637 if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
638 # WSAEWOULDBLOCK would be the equivalent on Windows
639 # for EAGAIN on Unix.
640 error = errno.EAGAIN
641 if error != errno.EAGAIN:
642 # XXX rate-limit
643 vlog.dbg("accept: %s" % os.strerror(error))
644 return error, None
645
646 def __accept_windows(self):
647 if self.connect_pending:
648 try:
649 winutils.get_overlapped_result(self.pipe, self.connect, False)
650 except pywintypes.error as e:
651 if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
652 # The operation is still pending, try again
653 self.connect_pending = True
654 return errno.EAGAIN, None
655 else:
656 if self.pipe:
657 win32pipe.DisconnectNamedPipe(self.pipe)
658 return errno.EINVAL, None
659 self.connect_pending = False
660
661 error = winutils.connect_named_pipe(self.pipe, self.connect)
662 if error:
663 if error == winutils.winerror.ERROR_IO_PENDING:
664 self.connect_pending = True
665 return errno.EAGAIN, None
666 elif error != winutils.winerror.ERROR_PIPE_CONNECTED:
667 if self.pipe:
668 win32pipe.DisconnectNamedPipe(self.pipe)
669 self.connect_pending = False
670 return errno.EINVAL, None
671 else:
672 win32event.SetEvent(self.connect.hEvent)
673
674 npipe = winutils.create_named_pipe(self._pipename)
675 if not npipe:
676 return errno.ENOENT, None
677
678 old_pipe = self.pipe
679 self.pipe = npipe
680 winutils.win32event.ResetEvent(self.connect.hEvent)
681 return 0, Stream(None, self.name, 0, pipe=old_pipe)
682
683 def wait(self, poller):
684 if sys.platform != 'win32' or self.socket is not None:
685 poller.fd_wait(self.socket, ovs.poller.POLLIN)
686 else:
687 poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
688
689 def __del__(self):
690 # Don't delete the file: we might have forked.
691 if self.socket is not None:
692 self.socket.close()
693 if self.pipe is not None:
694 # Check if there are any remaining valid handles and close them
695 if self.pipe:
696 winutils.close_handle(self.pipe)
697 if self._connect.hEvent:
698 winutils.close_handle(self._read.hEvent)
699
700
701 def usage(name):
702 return """
703 Active %s connection methods:
704 unix:FILE Unix domain socket named FILE
705 tcp:HOST:PORT TCP socket to HOST with port no of PORT
706 ssl:HOST:PORT SSL socket to HOST with port no of PORT
707
708 Passive %s connection methods:
709 punix:FILE Listen on Unix domain socket FILE""" % (name, name)
710
711
712 class UnixStream(Stream):
713 @staticmethod
714 def needs_probes():
715 return False
716
717 @staticmethod
718 def _open(suffix, dscp):
719 connect_path = suffix
720 return ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
721 True, None, connect_path)
722
723
724 Stream.register_method("unix", UnixStream)
725
726
727 class TCPStream(Stream):
728 @staticmethod
729 def needs_probes():
730 return True
731
732 @staticmethod
733 def _open(suffix, dscp):
734 error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM,
735 suffix, 0, dscp)
736 if not error:
737 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
738 return error, sock
739
740
741 Stream.register_method("tcp", TCPStream)
742
743
744 class SSLStream(Stream):
745 @staticmethod
746 def needs_probes():
747 return True
748
749 @staticmethod
750 def verify_cb(conn, cert, errnum, depth, ok):
751 return ok
752
753 @staticmethod
754 def _open(suffix, dscp):
755 error, sock = TCPStream._open(suffix, dscp)
756 if error:
757 return error, None
758
759 # Create an SSL context
760 ctx = SSL.Context(SSL.SSLv23_METHOD)
761 ctx.set_verify(SSL.VERIFY_PEER, SSLStream.verify_cb)
762 ctx.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3)
763 # If the client has not set the SSL configuration files
764 # exception would be raised.
765 ctx.use_privatekey_file(Stream._SSL_private_key_file)
766 ctx.use_certificate_file(Stream._SSL_certificate_file)
767 ctx.load_verify_locations(Stream._SSL_ca_cert_file)
768
769 ssl_sock = SSL.Connection(ctx, sock)
770 ssl_sock.set_connect_state()
771 return error, ssl_sock
772
773 def connect(self):
774 retval = super(SSLStream, self).connect()
775
776 if retval:
777 return retval
778
779 # TCP Connection is successful. Now do the SSL handshake
780 try:
781 self.socket.do_handshake()
782 except SSL.WantReadError:
783 return errno.EAGAIN
784 except SSL.SysCallError as e:
785 return ovs.socket_util.get_exception_errno(e)
786
787 return 0
788
789 def recv(self, n):
790 try:
791 return super(SSLStream, self).recv(n)
792 except SSL.WantReadError:
793 return (errno.EAGAIN, "")
794 except SSL.SysCallError as e:
795 return (ovs.socket_util.get_exception_errno(e), "")
796 except SSL.ZeroReturnError:
797 return (0, "")
798
799 def send(self, buf):
800 try:
801 return super(SSLStream, self).send(buf)
802 except SSL.WantWriteError:
803 return -errno.EAGAIN
804 except SSL.SysCallError as e:
805 return -ovs.socket_util.get_exception_errno(e)
806
807
808 if SSL:
809 # Register SSL only if the OpenSSL module is available
810 Stream.register_method("ssl", SSLStream)