]> git.proxmox.com Git - mirror_ovs.git/blame - python/ovs/stream.py
DNS: Add basic support for asynchronous DNS resolving
[mirror_ovs.git] / python / ovs / stream.py
CommitLineData
e0edde6f 1# Copyright (c) 2010, 2011, 2012 Nicira, Inc.
99155935
BP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at:
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import errno
99155935 16import os
99155935 17import socket
03947eb7 18import sys
99155935
BP
19
20import ovs.poller
21import ovs.socket_util
3a656eaf
EJ
22import ovs.vlog
23
6c7050b5 24import six
25
d90ed7d6
NS
26try:
27 from OpenSSL import SSL
28except ImportError:
29 SSL = None
30
03947eb7
AB
31if sys.platform == 'win32':
32 import ovs.winutils as winutils
33 import pywintypes
34 import win32event
35 import win32file
36 import win32pipe
37
3a656eaf 38vlog = ovs.vlog.Vlog("stream")
99155935 39
26bb0f31 40
def stream_or_pstream_needs_probes(name):
    """Reports whether the stream or pstream called 'name' needs periodic
    probes to verify connectivity.

    For [p]streams which need probes, it can take a long time to notice
    that the connection was dropped.

    Returns the answer of the matching active stream class when 'name'
    selects a registered one, otherwise the PassiveStream answer when 'name'
    is a valid passive stream name, and None when 'name' is neither."""
    active_cls = Stream._find_method(name)
    if active_cls:
        return active_cls.needs_probes()

    if PassiveStream.is_valid_name(name):
        return PassiveStream.needs_probes(name)

    return None
f1936eb6
EJ
54
55
class Stream(object):
    """Bidirectional byte stream.  Unix domain sockets, tcp and ssl
    are implemented.

    Concrete transports subclass Stream, provide _open() and needs_probes(),
    and make themselves known through register_method()."""

    # States.
    __S_CONNECTING = 0
    __S_CONNECTED = 1
    __S_DISCONNECTED = 2

    # Kinds of events that one might wait for.
    W_CONNECT = 0               # Connect complete (success or failure).
    W_RECV = 1                  # Data received.
    W_SEND = 2                  # Send buffer room available.

    # Maps a "TYPE:" prefix to the Stream subclass that implements it.
    _SOCKET_METHODS = {}

    _SSL_private_key_file = None
    _SSL_certificate_file = None
    _SSL_ca_cert_file = None

    # Windows only
    _write = None  # overlapped for write operation
    _read = None  # overlapped for read operation
    _write_pending = False
    _read_pending = False
    # Per-instance flag: the named pipe was busy when opened and the open
    # must be retried from connect().
    _retry_connect = False

    @staticmethod
    def register_method(method, cls):
        """Registers 'cls' as the implementation of names that start with
        'method' followed by a colon."""
        Stream._SOCKET_METHODS[method + ":"] = cls

    @staticmethod
    def _find_method(name):
        """Returns the class registered for the "TYPE:" prefix of 'name', or
        None if no registered prefix matches."""
        for method, cls in Stream._SOCKET_METHODS.items():
            if name.startswith(method):
                return cls
        return None

    @staticmethod
    def is_valid_name(name):
        """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
        TYPE is a supported stream type ("unix:", "tcp:" and "ssl:"),
        otherwise False."""
        return bool(Stream._find_method(name))

    def __init__(self, socket, name, status, pipe=None, is_server=False):
        """Wraps an already created 'socket' (or, on Windows, a named 'pipe').

        'status' is the result of the connection attempt so far: 0 if the
        connection is established, errno.EAGAIN if it is still in progress,
        or any other positive errno value if it already failed."""
        self.socket = socket
        self.pipe = pipe
        if sys.platform == 'win32':
            if pipe is not None:
                # Flag to check if fd is a server HANDLE.  In the case of a
                # server handle we have to issue a disconnect before closing
                # the actual handle.
                self._server = is_server
                suffix = name.split(":", 1)[1]
                suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
                self._pipename = winutils.get_pipe_name(suffix)
                self._read = pywintypes.OVERLAPPED()
                self._read.hEvent = winutils.get_new_event()
                self._write = pywintypes.OVERLAPPED()
                self._write.hEvent = winutils.get_new_event()
            else:
                self._wevent = winutils.get_new_event(bManualReset=False,
                                                      bInitialState=False)

        self.name = name
        self.error = 0
        if status == errno.EAGAIN:
            self.state = Stream.__S_CONNECTING
        elif status == 0:
            self.state = Stream.__S_CONNECTED
        else:
            self.state = Stream.__S_DISCONNECTED
            # Remember why the connection failed so that connect() reports
            # the failure instead of 0 ("connected").
            self.error = status

    # Default value of dscp bits for connection between controller and manager.
    # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined
    # in <netinet/ip.h> is used.
    IPTOS_PREC_INTERNETCONTROL = 0xc0
    DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2

    @staticmethod
    def open(name, dscp=DSCP_DEFAULT):
        """Attempts to connect a stream to a remote peer.  'name' is a
        connection name in the form "TYPE:ARGS", where TYPE is an active stream
        class's name and ARGS are stream class-specific.  The supported TYPEs
        include "unix", "tcp", and "ssl".

        Returns (error, stream): on success 'error' is 0 and 'stream' is the
        new Stream, on failure 'error' is a positive errno value and 'stream'
        is None.

        Never returns errno.EAGAIN or errno.EINPROGRESS.  Instead, returns 0
        and a new Stream.  The connect() method can be used to check for
        successful connection completion."""
        cls = Stream._find_method(name)
        if not cls:
            return errno.EAFNOSUPPORT, None

        suffix = name.split(":", 1)[1]
        if name.startswith("unix:"):
            suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
            if sys.platform == 'win32':
                pipename = winutils.get_pipe_name(suffix)

                if len(suffix) > 255:
                    # The name is too long; this is reported as ENOENT.
                    return errno.ENOENT, None

                try:
                    # In case of "unix:" argument, the assumption is that
                    # there is a file created in the path (suffix).
                    # ('open' here is the builtin, not this method.)
                    open(suffix, 'r').close()
                except Exception:
                    return errno.ENOENT, None

                try:
                    npipe = winutils.create_file(pipename)
                except pywintypes.error as e:
                    if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
                        # Pipe is busy, set the retry flag on the new stream
                        # and retry again during the connect function.
                        stream = cls(None, name, errno.EAGAIN,
                                     pipe=win32file.INVALID_HANDLE_VALUE,
                                     is_server=False)
                        stream._retry_connect = True
                        return 0, stream
                    return errno.ENOENT, None

                try:
                    winutils.set_pipe_mode(npipe,
                                           win32pipe.PIPE_READMODE_BYTE)
                except pywintypes.error:
                    # Do not leak the handle that was just opened.
                    winutils.close_handle(npipe, vlog.warn)
                    return errno.ENOENT, None
                return 0, cls(None, name, 0, pipe=npipe, is_server=False)

        error, sock = cls._open(suffix, dscp)
        if error:
            return error, None
        else:
            status = ovs.socket_util.check_connection_completion(sock)
            return 0, cls(sock, name, status)

    @staticmethod
    def _open(suffix, dscp):
        """Creates the transport-specific socket for 'suffix' (the ARGS part
        of the stream name).  Returns (error, socket)."""
        raise NotImplementedError("This method must be overrided by subclass")

    @staticmethod
    def open_block(error_stream):
        """Blocks until a Stream completes its connection attempt, either
        succeeding or failing.  (error, stream) should be the tuple returned by
        Stream.open().  Returns a tuple of the same form.

        Typical usage:
        error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""

        # Py3 doesn't support tuple parameter unpacking - PEP 3113
        error, stream = error_stream
        if not error:
            while True:
                error = stream.connect()
                if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
                    # WSAEWOULDBLOCK would be the equivalent on Windows
                    # for EAGAIN on Unix.
                    error = errno.EAGAIN
                if error != errno.EAGAIN:
                    break
                stream.run()
                poller = ovs.poller.Poller()
                stream.run_wait(poller)
                stream.connect_wait(poller)
                poller.block()
            if stream.socket is not None:
                assert error != errno.EINPROGRESS

        if error and stream:
            stream.close()
            stream = None
        return error, stream

    def close(self):
        """Closes the socket, if any, and on Windows the pipe handle together
        with its overlapped event handles."""
        if self.socket is not None:
            self.socket.close()
        if self.pipe is not None:
            if self._server:
                # Flush the pipe to allow the client to read the pipe
                # before disconnecting.
                win32pipe.FlushFileBuffers(self.pipe)
                win32pipe.DisconnectNamedPipe(self.pipe)
            winutils.close_handle(self.pipe, vlog.warn)
            winutils.close_handle(self._read.hEvent, vlog.warn)
            winutils.close_handle(self._write.hEvent, vlog.warn)

    def __scs_connecting(self):
        """Advances a stream in the CONNECTING state: polls the socket (or
        retries opening a busy Windows pipe) and updates 'state' and
        'error' accordingly."""
        if self.socket is not None:
            retval = ovs.socket_util.check_connection_completion(self.socket)
            assert retval != errno.EINPROGRESS
        elif sys.platform == 'win32':
            if self._retry_connect:
                try:
                    self.pipe = winutils.create_file(self._pipename)
                    self._retry_connect = False
                    retval = 0
                except pywintypes.error as e:
                    if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
                        retval = errno.EAGAIN
                    else:
                        self._retry_connect = False
                        retval = errno.ENOENT
            else:
                # If _retry_connect is false, it means it's already
                # connected so we can set the value of retval to 0
                retval = 0

        if retval == 0:
            self.state = Stream.__S_CONNECTED
        elif retval != errno.EAGAIN:
            self.state = Stream.__S_DISCONNECTED
            self.error = retval

    def connect(self):
        """Tries to complete the connection on this stream.  If the connection
        is complete, returns 0 if the connection was successful or a positive
        errno value if it failed.  If the connection is still in progress,
        returns errno.EAGAIN."""

        if self.state == Stream.__S_CONNECTING:
            self.__scs_connecting()

        if self.state == Stream.__S_CONNECTING:
            return errno.EAGAIN
        elif self.state == Stream.__S_CONNECTED:
            return 0
        else:
            assert self.state == Stream.__S_DISCONNECTED
            return self.error

    def recv(self, n):
        """Tries to receive up to 'n' bytes from this stream.  Returns a
        (error, string) tuple:

            - If successful, 'error' is zero and 'string' contains between 1
              and 'n' bytes of data.

            - On error, 'error' is a positive errno value.

            - If the connection has been closed in the normal fashion or if 'n'
              is 0, the tuple is (0, "").

        The recv function will not block waiting for data to arrive.  If no
        data have been received, it returns (errno.EAGAIN, "") immediately."""

        retval = self.connect()
        if retval != 0:
            return (retval, "")
        elif n == 0:
            return (0, "")

        if sys.platform == 'win32' and self.socket is None:
            return self.__recv_windows(n)

        try:
            return (0, self.socket.recv(n))
        except socket.error as e:
            return (ovs.socket_util.get_exception_errno(e), "")

    def __recv_windows(self, n):
        """Named-pipe implementation of recv() using overlapped I/O.  Returns
        the same (error, data) tuple as recv()."""
        if self._read_pending:
            try:
                nBytesRead = winutils.get_overlapped_result(self.pipe,
                                                            self._read,
                                                            False)
                self._read_pending = False
            except pywintypes.error as e:
                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
                    # The operation is still pending, try again
                    self._read_pending = True
                    return (errno.EAGAIN, "")
                elif e.winerror in winutils.pipe_disconnected_errors:
                    # If the pipe was disconnected, return 0.
                    return (0, "")
                else:
                    return (errno.EINVAL, "")
        else:
            (errCode, self._read_buffer) = winutils.read_file(self.pipe,
                                                              n,
                                                              self._read)
            if errCode:
                if errCode == winutils.winerror.ERROR_IO_PENDING:
                    self._read_pending = True
                    return (errno.EAGAIN, "")
                elif errCode in winutils.pipe_disconnected_errors:
                    # If the pipe was disconnected, return 0.
                    return (0, "")
                else:
                    return (errCode, "")

            try:
                nBytesRead = winutils.get_overlapped_result(self.pipe,
                                                            self._read,
                                                            False)
                winutils.win32event.SetEvent(self._read.hEvent)
            except pywintypes.error as e:
                if e.winerror in winutils.pipe_disconnected_errors:
                    # If the pipe was disconnected, return 0.
                    return (0, "")
                else:
                    return (e.winerror, "")

        recvBuffer = self._read_buffer[:nBytesRead]
        # recvBuffer will have the type memoryview in Python3.
        # We can use bytes to convert it to type bytes which works on
        # both Python2 and Python3.
        return (0, bytes(recvBuffer))

    def send(self, buf):
        """Tries to send 'buf' on this stream.

        If successful, returns the number of bytes sent, between 1 and
        len(buf).  0 is only a valid return value if len(buf) is 0.

        On error, returns a negative errno value.

        Will not block.  If no bytes can be immediately accepted for
        transmission, returns -errno.EAGAIN immediately."""

        retval = self.connect()
        if retval != 0:
            return -retval
        elif len(buf) == 0:
            return 0

        # We must have bytes for sending.
        if isinstance(buf, six.text_type):
            buf = buf.encode('utf-8')

        if sys.platform == 'win32' and self.socket is None:
            return self.__send_windows(buf)

        try:
            return self.socket.send(buf)
        except socket.error as e:
            return -ovs.socket_util.get_exception_errno(e)

    def __send_windows(self, buf):
        """Named-pipe implementation of send() using overlapped I/O.  Returns
        the same value as send()."""
        if self._write_pending:
            try:
                nBytesWritten = winutils.get_overlapped_result(self.pipe,
                                                               self._write,
                                                               False)
                self._write_pending = False
            except pywintypes.error as e:
                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
                    # The write is still pending, try again.  (Only the
                    # write side is affected; the read state is untouched.)
                    self._write_pending = True
                    return -errno.EAGAIN
                elif e.winerror in winutils.pipe_disconnected_errors:
                    # If the pipe was disconnected, return connection reset.
                    return -errno.ECONNRESET
                else:
                    return -errno.EINVAL
        else:
            (errCode, nBytesWritten) = winutils.write_file(self.pipe,
                                                           buf,
                                                           self._write)
            if errCode:
                if errCode == winutils.winerror.ERROR_IO_PENDING:
                    self._write_pending = True
                    return -errno.EAGAIN
                if (not nBytesWritten and
                        errCode in winutils.pipe_disconnected_errors):
                    # If the pipe was disconnected, return connection reset.
                    return -errno.ECONNRESET
        return nBytesWritten

    def run(self):
        """Hook for periodic work; the base implementation does nothing."""
        pass

    def run_wait(self, poller):
        """Hook to arrange a wakeup for run(); the base implementation does
        nothing."""
        pass

    def wait(self, poller, wait):
        """Arranges for 'poller' to wake up when the event 'wait' (one of
        W_CONNECT, W_RECV, W_SEND) can make progress on this stream.

        A disconnected stream wakes the poller immediately, and a stream
        that is still connecting always waits for connection completion."""
        assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)

        if self.state == Stream.__S_DISCONNECTED:
            poller.immediate_wake()
            return

        if self.state == Stream.__S_CONNECTING:
            wait = Stream.W_CONNECT

        if sys.platform == 'win32':
            self.__wait_windows(poller, wait)
            return

        if wait == Stream.W_RECV:
            poller.fd_wait(self.socket, ovs.poller.POLLIN)
        else:
            poller.fd_wait(self.socket, ovs.poller.POLLOUT)

    def __wait_windows(self, poller, wait):
        """Windows implementation of wait() for both sockets and pipes."""
        if self.socket is not None:
            if wait == Stream.W_RECV:
                mask = (win32file.FD_READ |
                        win32file.FD_ACCEPT |
                        win32file.FD_CLOSE)
                event = ovs.poller.POLLIN
            else:
                mask = (win32file.FD_WRITE |
                        win32file.FD_CONNECT |
                        win32file.FD_CLOSE)
                event = ovs.poller.POLLOUT

            try:
                win32file.WSAEventSelect(self.socket,
                                         self._wevent,
                                         mask)
            except pywintypes.error as e:
                vlog.err("failed to associate events with socket: %s"
                         % e.strerror)
            poller.fd_wait(self._wevent, event)
        else:
            if wait == Stream.W_RECV:
                if self._read:
                    poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
            elif wait == Stream.W_SEND:
                if self._write:
                    poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT)
            elif wait == Stream.W_CONNECT:
                return

    def connect_wait(self, poller):
        """Wakes 'poller' when the connection attempt completes."""
        self.wait(poller, Stream.W_CONNECT)

    def recv_wait(self, poller):
        """Wakes 'poller' when data can be received."""
        self.wait(poller, Stream.W_RECV)

    def send_wait(self, poller):
        """Wakes 'poller' when data can be sent."""
        self.wait(poller, Stream.W_SEND)

    def __del__(self):
        # Don't delete the file: we might have forked.
        if self.socket is not None:
            self.socket.close()
        if self.pipe is not None:
            # Check if there are any remaining valid handles and close them
            if self.pipe:
                winutils.close_handle(self.pipe)
            if self._read.hEvent:
                winutils.close_handle(self._read.hEvent)
            if self._write.hEvent:
                winutils.close_handle(self._write.hEvent)

    @staticmethod
    def ssl_set_private_key_file(file_name):
        """Sets the private key file used by SSL streams opened later."""
        Stream._SSL_private_key_file = file_name

    @staticmethod
    def ssl_set_certificate_file(file_name):
        """Sets the certificate file used by SSL streams opened later."""
        Stream._SSL_certificate_file = file_name

    @staticmethod
    def ssl_set_ca_cert_file(file_name):
        """Sets the CA certificate file used by SSL streams opened later."""
        Stream._SSL_ca_cert_file = file_name
518
26bb0f31 519
class PassiveStream(object):
    """Listening endpoint that accepts incoming Stream connections.

    "punix:" (Unix domain socket, or a named pipe on Windows) and "ptcp:"
    (TCP) names are supported."""

    # Windows only
    connect = None  # overlapped for the pending pipe connect operation
    connect_pending = False

    @staticmethod
    def needs_probes(name):
        """Returns False for "punix:" names and True for anything else."""
        return not name.startswith("punix:")

    @staticmethod
    def is_valid_name(name):
        """Returns True if 'name' is a passive stream name in the form
        "TYPE:ARGS" and TYPE is a supported passive stream type (currently
        "punix:" or "ptcp"), otherwise False."""
        return name.startswith(("punix:", "ptcp:"))

    def __init__(self, sock, name, bind_path, pipe=None):
        """Wraps a listening 'sock' (or, on Windows, a named 'pipe').

        'bind_path' is the file to unlink when the pstream is closed, or
        None if there is no such file."""
        self.name = name
        self.pipe = pipe
        self.socket = sock
        if pipe is not None:
            self.connect = pywintypes.OVERLAPPED()
            self.connect.hEvent = winutils.get_new_event()
            self.connect_pending = False
            suffix = name.split(":", 1)[1]
            suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
            self._pipename = winutils.get_pipe_name(suffix)

        self.bind_path = bind_path

    @staticmethod
    def open(name):
        """Attempts to start listening for remote stream connections.  'name'
        is a connection name in the form "TYPE:ARGS", where TYPE is an passive
        stream class's name and ARGS are stream class-specific.  Currently the
        supported values for TYPE are "punix" and "ptcp".

        Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
        new PassiveStream, on failure 'error' is a positive errno value and
        'pstream' is None."""
        if not PassiveStream.is_valid_name(name):
            return errno.EAFNOSUPPORT, None

        if name.startswith("punix:"):
            bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, name[6:])
            if sys.platform != 'win32':
                error, sock = ovs.socket_util.make_unix_socket(
                    socket.SOCK_STREAM, True, bind_path, None)
                if error:
                    return error, None
            else:
                # Branch used only on Windows
                try:
                    # 'open' here is the builtin, not this method.
                    open(bind_path, 'w').close()
                except Exception:
                    return errno.ENOENT, None

                pipename = winutils.get_pipe_name(bind_path)
                if len(pipename) > 255:
                    # The name is too long; this is reported as ENOENT.
                    return errno.ENOENT, None

                npipe = winutils.create_named_pipe(pipename)
                if not npipe:
                    return errno.ENOENT, None
                return 0, PassiveStream(None, name, bind_path, pipe=npipe)

        elif name.startswith("ptcp:"):
            # A TCP listener has no file to unlink on close().
            bind_path = None
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                remote = name.split(':')
                sock.bind((remote[1], int(remote[2])))
            except (IndexError, ValueError, OverflowError):
                # Missing or malformed HOST:PORT.
                sock.close()
                return errno.EINVAL, None
            except socket.error as e:
                error = ovs.socket_util.get_exception_errno(e)
                vlog.err("%s: bind: %s" % (name, os.strerror(error)))
                sock.close()
                return error, None

        else:
            raise Exception('Unknown connection string')

        try:
            sock.listen(10)
        except socket.error as e:
            error = ovs.socket_util.get_exception_errno(e)
            vlog.err("%s: listen: %s" % (name, os.strerror(error)))
            sock.close()
            return error, None

        return 0, PassiveStream(sock, name, bind_path)

    def close(self):
        """Closes this PassiveStream."""
        if self.socket is not None:
            self.socket.close()
        if self.pipe is not None:
            winutils.close_handle(self.pipe, vlog.warn)
            winutils.close_handle(self.connect.hEvent, vlog.warn)
        if self.bind_path is not None:
            ovs.fatal_signal.unlink_file_now(self.bind_path)
            self.bind_path = None

    def accept(self):
        """Tries to accept a new connection on this passive stream.  Returns
        (error, stream): if successful, 'error' is 0 and 'stream' is the new
        Stream object, and on failure 'error' is a positive errno value and
        'stream' is None.

        Will not block waiting for a connection.  If no connection is ready to
        be accepted, returns (errno.EAGAIN, None) immediately."""
        if sys.platform == 'win32' and self.socket is None:
            return self.__accept_windows()
        while True:
            try:
                sock, addr = self.socket.accept()
                ovs.socket_util.set_nonblocking(sock)
                if (sys.platform != 'win32' and sock.family == socket.AF_UNIX):
                    return 0, Stream(sock, "unix:%s" % addr, 0)
                return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0],
                                                       str(addr[1])), 0)
            except socket.error as e:
                error = ovs.socket_util.get_exception_errno(e)
                if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
                    # WSAEWOULDBLOCK would be the equivalent on Windows
                    # for EAGAIN on Unix.
                    error = errno.EAGAIN
                if error != errno.EAGAIN:
                    # XXX rate-limit
                    vlog.dbg("accept: %s" % os.strerror(error))
                return error, None

    def __accept_windows(self):
        """Named-pipe implementation of accept() using overlapped I/O.

        On success the connected pipe is handed to a new Stream and a fresh
        pipe instance is created to keep listening."""
        if self.connect_pending:
            try:
                winutils.get_overlapped_result(self.pipe, self.connect, False)
            except pywintypes.error as e:
                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
                    # The operation is still pending, try again
                    self.connect_pending = True
                    return errno.EAGAIN, None
                else:
                    if self.pipe:
                        win32pipe.DisconnectNamedPipe(self.pipe)
                    return errno.EINVAL, None
            self.connect_pending = False

        error = winutils.connect_named_pipe(self.pipe, self.connect)
        if error:
            if error == winutils.winerror.ERROR_IO_PENDING:
                self.connect_pending = True
                return errno.EAGAIN, None
            elif error != winutils.winerror.ERROR_PIPE_CONNECTED:
                if self.pipe:
                    win32pipe.DisconnectNamedPipe(self.pipe)
                self.connect_pending = False
                return errno.EINVAL, None
            else:
                win32event.SetEvent(self.connect.hEvent)

        npipe = winutils.create_named_pipe(self._pipename)
        if not npipe:
            return errno.ENOENT, None

        old_pipe = self.pipe
        self.pipe = npipe
        winutils.win32event.ResetEvent(self.connect.hEvent)
        # NOTE(review): the accepted pipe is a server-side handle but is
        # passed with the default is_server=False -- confirm whether
        # Stream.close() should flush/disconnect it.
        return 0, Stream(None, self.name, 0, pipe=old_pipe)

    def wait(self, poller):
        """Arranges for 'poller' to wake up when a connection may be ready to
        accept."""
        if sys.platform != 'win32' or self.socket is not None:
            poller.fd_wait(self.socket, ovs.poller.POLLIN)
        else:
            poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)

    def __del__(self):
        # Don't delete the file: we might have forked.
        if self.socket is not None:
            self.socket.close()
        if self.pipe is not None:
            # Check if there are any remaining valid handles and close them
            if self.pipe:
                winutils.close_handle(self.pipe)
            if self.connect.hEvent:
                winutils.close_handle(self.connect.hEvent)
99155935 699
26bb0f31 700
ec394dad
EJ
def usage(name):
    """Returns help text that lists the active and passive connection
    methods, using 'name' to label both sections."""
    template = "\n".join((
        "",
        "Active %(name)s connection methods:",
        "  unix:FILE               Unix domain socket named FILE",
        "  tcp:HOST:PORT           TCP socket to HOST with port no of PORT",
        "  ssl:HOST:PORT           SSL socket to HOST with port no of PORT",
        "",
        "Passive %(name)s connection methods:",
        "  punix:FILE              Listen on Unix domain socket FILE",
    ))
    return template % {"name": name}
e06d06a7
IY
710
711
class UnixStream(Stream):
    """Active stream over a Unix domain socket ("unix:FILE")."""

    @staticmethod
    def needs_probes():
        """Unix domain socket streams do not need connectivity probes."""
        return False

    @staticmethod
    def _open(suffix, dscp):
        """Creates a stream socket connected to the path 'suffix'.  'dscp'
        is not used by this transport.  Returns (error, socket)."""
        return ovs.socket_util.make_unix_socket(
            socket.SOCK_STREAM, True, None, suffix)


Stream.register_method("unix", UnixStream)
e06d06a7
IY
725
726
class TCPStream(Stream):
    """Active stream over TCP ("tcp:HOST:PORT")."""

    @staticmethod
    def needs_probes():
        """TCP streams need periodic connectivity probes."""
        return True

    @staticmethod
    def _open(suffix, dscp):
        """Starts an active TCP connection to 'suffix' with the given
        'dscp' and disables Nagle's algorithm on the new socket.  Returns
        (error, socket)."""
        result = ovs.socket_util.inet_open_active(socket.SOCK_STREAM,
                                                  suffix, 0, dscp)
        error, sock = result
        if error:
            return error, sock

        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        return error, sock


Stream.register_method("tcp", TCPStream)
d90ed7d6
NS
742
743
class SSLStream(Stream):
    """Active stream over SSL ("ssl:HOST:PORT"), layered on a TCP socket by
    means of pyOpenSSL."""

    @staticmethod
    def needs_probes():
        """SSL streams need periodic connectivity probes."""
        return True

    @staticmethod
    def verify_cb(conn, cert, errnum, depth, ok):
        """Certificate verification callback: accepts whatever verdict the
        SSL library already reached ('ok')."""
        return ok

    @staticmethod
    def _open(suffix, dscp):
        """Opens a TCP connection to 'suffix' and wraps it in a client-side
        SSL connection.  Returns (error, socket).

        Raises whatever the SSL library raises if the SSL configuration
        files have not been set or cannot be loaded."""
        error, sock = TCPStream._open(suffix, dscp)
        if error:
            return error, None

        try:
            # Create an SSL context
            ctx = SSL.Context(SSL.SSLv23_METHOD)
            ctx.set_verify(SSL.VERIFY_PEER, SSLStream.verify_cb)
            ctx.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3)
            # If the client has not set the SSL configuration files
            # exception would be raised.
            ctx.use_privatekey_file(Stream._SSL_private_key_file)
            ctx.use_certificate_file(Stream._SSL_certificate_file)
            ctx.load_verify_locations(Stream._SSL_ca_cert_file)

            ssl_sock = SSL.Connection(ctx, sock)
            ssl_sock.set_connect_state()
        except Exception:
            # Do not leak the TCP socket when the SSL setup fails; the
            # original exception still propagates to the caller.
            sock.close()
            raise
        return error, ssl_sock

    def connect(self):
        """Completes the TCP connection and then the SSL handshake.  Returns
        0 on success, errno.EAGAIN while either is still in progress, or
        another positive errno value on failure."""
        retval = super(SSLStream, self).connect()

        if retval:
            return retval

        # TCP Connection is successful.  Now do the SSL handshake
        try:
            self.socket.do_handshake()
        except (SSL.WantReadError, SSL.WantWriteError):
            # The handshake needs more I/O in either direction; retry later.
            return errno.EAGAIN
        except SSL.SysCallError as e:
            return ovs.socket_util.get_exception_errno(e)

        return 0

    def recv(self, n):
        """Same contract as Stream.recv(), with SSL-level conditions mapped
        onto it: "retry later" becomes errno.EAGAIN and a clean SSL shutdown
        becomes (0, "")."""
        try:
            return super(SSLStream, self).recv(n)
        except (SSL.WantReadError, SSL.WantWriteError):
            return (errno.EAGAIN, "")
        except SSL.SysCallError as e:
            return (ovs.socket_util.get_exception_errno(e), "")
        except SSL.ZeroReturnError:
            return (0, "")

    def send(self, buf):
        """Same contract as Stream.send(), with SSL "retry later" conditions
        reported as -errno.EAGAIN."""
        try:
            return super(SSLStream, self).send(buf)
        except (SSL.WantWriteError, SSL.WantReadError):
            return -errno.EAGAIN
        except SSL.SysCallError as e:
            return -ovs.socket_util.get_exception_errno(e)


if SSL:
    # Register SSL only if the OpenSSL module is available
    Stream.register_method("ssl", SSLStream)