]>
Commit | Line | Data |
---|---|---|
e0edde6f | 1 | # Copyright (c) 2010, 2011, 2012 Nicira, Inc. |
99155935 BP |
2 | # |
3 | # Licensed under the Apache License, Version 2.0 (the "License"); | |
4 | # you may not use this file except in compliance with the License. | |
5 | # You may obtain a copy of the License at: | |
6 | # | |
7 | # http://www.apache.org/licenses/LICENSE-2.0 | |
8 | # | |
9 | # Unless required by applicable law or agreed to in writing, software | |
10 | # distributed under the License is distributed on an "AS IS" BASIS, | |
11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 | # See the License for the specific language governing permissions and | |
13 | # limitations under the License. | |
14 | ||
15 | import errno | |
99155935 | 16 | import os |
99155935 | 17 | import socket |
03947eb7 | 18 | import sys |
99155935 BP |
19 | |
20 | import ovs.poller | |
21 | import ovs.socket_util | |
3a656eaf EJ |
22 | import ovs.vlog |
23 | ||
6c7050b5 | 24 | import six |
25 | ||
d90ed7d6 NS |
26 | try: |
27 | from OpenSSL import SSL | |
28 | except ImportError: | |
29 | SSL = None | |
30 | ||
03947eb7 AB |
31 | if sys.platform == 'win32': |
32 | import ovs.winutils as winutils | |
33 | import pywintypes | |
34 | import win32event | |
35 | import win32file | |
36 | import win32pipe | |
37 | ||
3a656eaf | 38 | vlog = ovs.vlog.Vlog("stream") |
99155935 | 39 | |
26bb0f31 | 40 | |
f1936eb6 | 41 | def stream_or_pstream_needs_probes(name): |
e7dce33f GL |
42 | """ True if the stream or pstream specified by 'name' needs periodic probes |
43 | to verify connectivity. For [p]streams which need probes, it can take a | |
44 | long time to notice the connection was dropped. Returns False if probes | |
45 | aren't needed, and None if 'name' is invalid""" | |
46 | ||
47 | cls = Stream._find_method(name) | |
48 | if cls: | |
49 | return cls.needs_probes() | |
50 | elif PassiveStream.is_valid_name(name): | |
51 | return PassiveStream.needs_probes(name) | |
f1936eb6 | 52 | else: |
e7dce33f | 53 | return None |
f1936eb6 EJ |
54 | |
55 | ||
99155935 | 56 | class Stream(object): |
d90ed7d6 | 57 | """Bidirectional byte stream. Unix domain sockets, tcp and ssl |
99155935 | 58 | are implemented.""" |
99155935 BP |
59 | |
60 | # States. | |
61 | __S_CONNECTING = 0 | |
62 | __S_CONNECTED = 1 | |
63 | __S_DISCONNECTED = 2 | |
64 | ||
65 | # Kinds of events that one might wait for. | |
66 | W_CONNECT = 0 # Connect complete (success or failure). | |
67 | W_RECV = 1 # Data received. | |
68 | W_SEND = 2 # Send buffer room available. | |
69 | ||
e06d06a7 IY |
70 | _SOCKET_METHODS = {} |
71 | ||
d90ed7d6 NS |
72 | _SSL_private_key_file = None |
73 | _SSL_certificate_file = None | |
74 | _SSL_ca_cert_file = None | |
75 | ||
03947eb7 AB |
76 | # Windows only |
77 | _write = None # overlapped for write operation | |
78 | _read = None # overlapped for read operation | |
79 | _write_pending = False | |
80 | _read_pending = False | |
81 | _retry_connect = False | |
82 | ||
e06d06a7 | 83 | @staticmethod |
f7b7ee97 IY |
84 | def register_method(method, cls): |
85 | Stream._SOCKET_METHODS[method + ":"] = cls | |
e06d06a7 IY |
86 | |
87 | @staticmethod | |
88 | def _find_method(name): | |
cb96c1b2 | 89 | for method, cls in six.iteritems(Stream._SOCKET_METHODS): |
e06d06a7 IY |
90 | if name.startswith(method): |
91 | return cls | |
92 | return None | |
93 | ||
99155935 BP |
94 | @staticmethod |
95 | def is_valid_name(name): | |
96 | """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and | |
d90ed7d6 | 97 | TYPE is a supported stream type ("unix:", "tcp:" and "ssl:"), |
e06d06a7 IY |
98 | otherwise False.""" |
99 | return bool(Stream._find_method(name)) | |
99155935 | 100 | |
03947eb7 | 101 | def __init__(self, socket, name, status, pipe=None, is_server=False): |
99155935 | 102 | self.socket = socket |
03947eb7 AB |
103 | self.pipe = pipe |
104 | if sys.platform == 'win32': | |
03947eb7 AB |
105 | if pipe is not None: |
106 | # Flag to check if fd is a server HANDLE. In the case of a | |
107 | # server handle we have to issue a disconnect before closing | |
108 | # the actual handle. | |
109 | self._server = is_server | |
110 | suffix = name.split(":", 1)[1] | |
111 | suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix) | |
112 | self._pipename = winutils.get_pipe_name(suffix) | |
fef22d68 AB |
113 | self._read = pywintypes.OVERLAPPED() |
114 | self._read.hEvent = winutils.get_new_event() | |
115 | self._write = pywintypes.OVERLAPPED() | |
116 | self._write.hEvent = winutils.get_new_event() | |
117 | else: | |
118 | self._wevent = winutils.get_new_event(bManualReset=False, | |
119 | bInitialState=False) | |
03947eb7 | 120 | |
99155935 | 121 | self.name = name |
99155935 BP |
122 | if status == errno.EAGAIN: |
123 | self.state = Stream.__S_CONNECTING | |
124 | elif status == 0: | |
125 | self.state = Stream.__S_CONNECTED | |
126 | else: | |
127 | self.state = Stream.__S_DISCONNECTED | |
128 | ||
129 | self.error = 0 | |
130 | ||
e06d06a7 IY |
131 | # Default value of dscp bits for connection between controller and manager. |
132 | # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined | |
133 | # in <netinet/ip.h> is used. | |
134 | IPTOS_PREC_INTERNETCONTROL = 0xc0 | |
135 | DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2 | |
136 | ||
99155935 | 137 | @staticmethod |
e06d06a7 | 138 | def open(name, dscp=DSCP_DEFAULT): |
99155935 BP |
139 | """Attempts to connect a stream to a remote peer. 'name' is a |
140 | connection name in the form "TYPE:ARGS", where TYPE is an active stream | |
e7c640c3 GL |
141 | class's name and ARGS are stream class-specific. The supported TYPEs |
142 | include "unix", "tcp", and "ssl". | |
99155935 BP |
143 | |
144 | Returns (error, stream): on success 'error' is 0 and 'stream' is the | |
145 | new Stream, on failure 'error' is a positive errno value and 'stream' | |
146 | is None. | |
147 | ||
148 | Never returns errno.EAGAIN or errno.EINPROGRESS. Instead, returns 0 | |
149 | and a new Stream. The connect() method can be used to check for | |
150 | successful connection completion.""" | |
e06d06a7 IY |
151 | cls = Stream._find_method(name) |
152 | if not cls: | |
99155935 BP |
153 | return errno.EAFNOSUPPORT, None |
154 | ||
e06d06a7 | 155 | suffix = name.split(":", 1)[1] |
2c487bc8 PR |
156 | if name.startswith("unix:"): |
157 | suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix) | |
03947eb7 AB |
158 | if sys.platform == 'win32': |
159 | pipename = winutils.get_pipe_name(suffix) | |
160 | ||
161 | if len(suffix) > 255: | |
162 | # Return invalid argument if the name is too long | |
163 | return errno.ENOENT, None | |
164 | ||
165 | try: | |
166 | # In case of "unix:" argument, the assumption is that | |
167 | # there is a file created in the path (suffix). | |
168 | open(suffix, 'r').close() | |
169 | except: | |
170 | return errno.ENOENT, None | |
171 | ||
172 | try: | |
173 | npipe = winutils.create_file(pipename) | |
174 | try: | |
175 | winutils.set_pipe_mode(npipe, | |
176 | win32pipe.PIPE_READMODE_BYTE) | |
177 | except pywintypes.error as e: | |
178 | return errno.ENOENT, None | |
179 | except pywintypes.error as e: | |
180 | if e.winerror == winutils.winerror.ERROR_PIPE_BUSY: | |
181 | # Pipe is busy, set the retry flag to true and retry | |
182 | # again during the connect function. | |
183 | Stream.retry_connect = True | |
184 | return 0, cls(None, name, errno.EAGAIN, | |
185 | pipe=win32file.INVALID_HANDLE_VALUE, | |
186 | is_server=False) | |
187 | return errno.ENOENT, None | |
188 | return 0, cls(None, name, 0, pipe=npipe, is_server=False) | |
189 | ||
e06d06a7 | 190 | error, sock = cls._open(suffix, dscp) |
99155935 BP |
191 | if error: |
192 | return error, None | |
193 | else: | |
194 | status = ovs.socket_util.check_connection_completion(sock) | |
d90ed7d6 | 195 | return 0, cls(sock, name, status) |
99155935 | 196 | |
e06d06a7 IY |
197 | @staticmethod |
198 | def _open(suffix, dscp): | |
199 | raise NotImplementedError("This method must be overrided by subclass") | |
200 | ||
99155935 | 201 | @staticmethod |
3ab76c56 | 202 | def open_block(error_stream): |
99155935 | 203 | """Blocks until a Stream completes its connection attempt, either |
91c63796 | 204 | succeeding or failing. (error, stream) should be the tuple returned by |
99155935 BP |
205 | Stream.open(). Returns a tuple of the same form. |
206 | ||
207 | Typical usage: | |
91c63796 | 208 | error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))""" |
99155935 | 209 | |
3ab76c56 TW |
210 | # Py3 doesn't support tuple parameter unpacking - PEP 3113 |
211 | error, stream = error_stream | |
99155935 BP |
212 | if not error: |
213 | while True: | |
214 | error = stream.connect() | |
03947eb7 AB |
215 | if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK: |
216 | # WSAEWOULDBLOCK would be the equivalent on Windows | |
217 | # for EAGAIN on Unix. | |
218 | error = errno.EAGAIN | |
99155935 BP |
219 | if error != errno.EAGAIN: |
220 | break | |
221 | stream.run() | |
222 | poller = ovs.poller.Poller() | |
5796d9cf | 223 | stream.run_wait(poller) |
99155935 BP |
224 | stream.connect_wait(poller) |
225 | poller.block() | |
03947eb7 AB |
226 | if stream.socket is not None: |
227 | assert error != errno.EINPROGRESS | |
26bb0f31 | 228 | |
99155935 BP |
229 | if error and stream: |
230 | stream.close() | |
231 | stream = None | |
232 | return error, stream | |
233 | ||
234 | def close(self): | |
03947eb7 AB |
235 | if self.socket is not None: |
236 | self.socket.close() | |
237 | if self.pipe is not None: | |
238 | if self._server: | |
09e192cd AS |
239 | # Flush the pipe to allow the client to read the pipe |
240 | # before disconnecting. | |
241 | win32pipe.FlushFileBuffers(self.pipe) | |
03947eb7 AB |
242 | win32pipe.DisconnectNamedPipe(self.pipe) |
243 | winutils.close_handle(self.pipe, vlog.warn) | |
244 | winutils.close_handle(self._read.hEvent, vlog.warn) | |
245 | winutils.close_handle(self._write.hEvent, vlog.warn) | |
99155935 BP |
246 | |
247 | def __scs_connecting(self): | |
03947eb7 AB |
248 | if self.socket is not None: |
249 | retval = ovs.socket_util.check_connection_completion(self.socket) | |
250 | assert retval != errno.EINPROGRESS | |
251 | elif sys.platform == 'win32': | |
252 | if self.retry_connect: | |
253 | try: | |
254 | self.pipe = winutils.create_file(self._pipename) | |
255 | self._retry_connect = False | |
256 | retval = 0 | |
257 | except pywintypes.error as e: | |
258 | if e.winerror == winutils.winerror.ERROR_PIPE_BUSY: | |
259 | retval = errno.EAGAIN | |
260 | else: | |
261 | self._retry_connect = False | |
262 | retval = errno.ENOENT | |
263 | else: | |
264 | # If retry_connect is false, it means it's already | |
265 | # connected so we can set the value of retval to 0 | |
266 | retval = 0 | |
267 | ||
99155935 BP |
268 | if retval == 0: |
269 | self.state = Stream.__S_CONNECTED | |
270 | elif retval != errno.EAGAIN: | |
271 | self.state = Stream.__S_DISCONNECTED | |
272 | self.error = retval | |
273 | ||
274 | def connect(self): | |
275 | """Tries to complete the connection on this stream. If the connection | |
276 | is complete, returns 0 if the connection was successful or a positive | |
277 | errno value if it failed. If the connection is still in progress, | |
278 | returns errno.EAGAIN.""" | |
dcb66dae BP |
279 | |
280 | if self.state == Stream.__S_CONNECTING: | |
281 | self.__scs_connecting() | |
282 | ||
283 | if self.state == Stream.__S_CONNECTING: | |
284 | return errno.EAGAIN | |
285 | elif self.state == Stream.__S_CONNECTED: | |
286 | return 0 | |
287 | else: | |
288 | assert self.state == Stream.__S_DISCONNECTED | |
289 | return self.error | |
99155935 BP |
290 | |
291 | def recv(self, n): | |
292 | """Tries to receive up to 'n' bytes from this stream. Returns a | |
293 | (error, string) tuple: | |
26bb0f31 | 294 | |
99155935 BP |
295 | - If successful, 'error' is zero and 'string' contains between 1 |
296 | and 'n' bytes of data. | |
297 | ||
298 | - On error, 'error' is a positive errno value. | |
299 | ||
300 | - If the connection has been closed in the normal fashion or if 'n' | |
301 | is 0, the tuple is (0, ""). | |
26bb0f31 | 302 | |
99155935 BP |
303 | The recv function will not block waiting for data to arrive. If no |
304 | data have been received, it returns (errno.EAGAIN, "") immediately.""" | |
305 | ||
306 | retval = self.connect() | |
307 | if retval != 0: | |
308 | return (retval, "") | |
309 | elif n == 0: | |
310 | return (0, "") | |
311 | ||
03947eb7 AB |
312 | if sys.platform == 'win32' and self.socket is None: |
313 | return self.__recv_windows(n) | |
314 | ||
99155935 BP |
315 | try: |
316 | return (0, self.socket.recv(n)) | |
3ab76c56 | 317 | except socket.error as e: |
99155935 BP |
318 | return (ovs.socket_util.get_exception_errno(e), "") |
319 | ||
03947eb7 AB |
320 | def __recv_windows(self, n): |
321 | if self._read_pending: | |
322 | try: | |
323 | nBytesRead = winutils.get_overlapped_result(self.pipe, | |
324 | self._read, | |
325 | False) | |
326 | self._read_pending = False | |
03947eb7 AB |
327 | except pywintypes.error as e: |
328 | if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE: | |
329 | # The operation is still pending, try again | |
330 | self._read_pending = True | |
331 | return (errno.EAGAIN, "") | |
332 | elif e.winerror in winutils.pipe_disconnected_errors: | |
333 | # If the pipe was disconnected, return 0. | |
334 | return (0, "") | |
335 | else: | |
336 | return (errno.EINVAL, "") | |
ba953e1e AB |
337 | else: |
338 | (errCode, self._read_buffer) = winutils.read_file(self.pipe, | |
339 | n, | |
340 | self._read) | |
341 | if errCode: | |
342 | if errCode == winutils.winerror.ERROR_IO_PENDING: | |
343 | self._read_pending = True | |
344 | return (errno.EAGAIN, "") | |
345 | elif errCode in winutils.pipe_disconnected_errors: | |
346 | # If the pipe was disconnected, return 0. | |
347 | return (0, "") | |
348 | else: | |
349 | return (errCode, "") | |
03947eb7 | 350 | |
ba953e1e AB |
351 | try: |
352 | nBytesRead = winutils.get_overlapped_result(self.pipe, | |
353 | self._read, | |
354 | False) | |
355 | winutils.win32event.SetEvent(self._read.hEvent) | |
356 | except pywintypes.error as e: | |
357 | if e.winerror in winutils.pipe_disconnected_errors: | |
358 | # If the pipe was disconnected, return 0. | |
359 | return (0, "") | |
360 | else: | |
361 | return (e.winerror, "") | |
03947eb7 AB |
362 | |
363 | recvBuffer = self._read_buffer[:nBytesRead] | |
2254074e AB |
364 | # recvBuffer will have the type memoryview in Python3. |
365 | # We can use bytes to convert it to type bytes which works on | |
366 | # both Python2 and Python3. | |
367 | return (0, bytes(recvBuffer)) | |
03947eb7 | 368 | |
99155935 BP |
369 | def send(self, buf): |
370 | """Tries to send 'buf' on this stream. | |
371 | ||
372 | If successful, returns the number of bytes sent, between 1 and | |
373 | len(buf). 0 is only a valid return value if len(buf) is 0. | |
374 | ||
375 | On error, returns a negative errno value. | |
376 | ||
377 | Will not block. If no bytes can be immediately accepted for | |
378 | transmission, returns -errno.EAGAIN immediately.""" | |
379 | ||
380 | retval = self.connect() | |
381 | if retval != 0: | |
382 | return -retval | |
383 | elif len(buf) == 0: | |
384 | return 0 | |
385 | ||
28058c29 JS |
386 | # We must have bytes for sending. |
387 | if isinstance(buf, six.text_type): | |
2254074e AB |
388 | buf = buf.encode('utf-8') |
389 | ||
03947eb7 AB |
390 | if sys.platform == 'win32' and self.socket is None: |
391 | return self.__send_windows(buf) | |
392 | ||
99155935 BP |
393 | try: |
394 | return self.socket.send(buf) | |
3ab76c56 | 395 | except socket.error as e: |
99155935 BP |
396 | return -ovs.socket_util.get_exception_errno(e) |
397 | ||
03947eb7 AB |
398 | def __send_windows(self, buf): |
399 | if self._write_pending: | |
400 | try: | |
401 | nBytesWritten = winutils.get_overlapped_result(self.pipe, | |
402 | self._write, | |
403 | False) | |
404 | self._write_pending = False | |
03947eb7 AB |
405 | except pywintypes.error as e: |
406 | if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE: | |
407 | # The operation is still pending, try again | |
408 | self._read_pending = True | |
409 | return -errno.EAGAIN | |
410 | elif e.winerror in winutils.pipe_disconnected_errors: | |
411 | # If the pipe was disconnected, return connection reset. | |
412 | return -errno.ECONNRESET | |
413 | else: | |
414 | return -errno.EINVAL | |
ba953e1e AB |
415 | else: |
416 | (errCode, nBytesWritten) = winutils.write_file(self.pipe, | |
417 | buf, | |
418 | self._write) | |
419 | if errCode: | |
420 | if errCode == winutils.winerror.ERROR_IO_PENDING: | |
421 | self._write_pending = True | |
422 | return -errno.EAGAIN | |
423 | if (not nBytesWritten and | |
424 | errCode in winutils.pipe_disconnected_errors): | |
425 | # If the pipe was disconnected, return connection reset. | |
426 | return -errno.ECONNRESET | |
03947eb7 AB |
427 | return nBytesWritten |
428 | ||
99155935 BP |
429 | def run(self): |
430 | pass | |
431 | ||
432 | def run_wait(self, poller): | |
433 | pass | |
434 | ||
435 | def wait(self, poller, wait): | |
436 | assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND) | |
437 | ||
438 | if self.state == Stream.__S_DISCONNECTED: | |
439 | poller.immediate_wake() | |
440 | return | |
441 | ||
442 | if self.state == Stream.__S_CONNECTING: | |
443 | wait = Stream.W_CONNECT | |
03947eb7 AB |
444 | |
445 | if sys.platform == 'win32': | |
446 | self.__wait_windows(poller, wait) | |
447 | return | |
448 | ||
8ba31bf1 | 449 | if wait == Stream.W_RECV: |
bb1c9a65 | 450 | poller.fd_wait(self.socket, ovs.poller.POLLIN) |
8ba31bf1 | 451 | else: |
bb1c9a65 | 452 | poller.fd_wait(self.socket, ovs.poller.POLLOUT) |
99155935 | 453 | |
03947eb7 AB |
454 | def __wait_windows(self, poller, wait): |
455 | if self.socket is not None: | |
456 | if wait == Stream.W_RECV: | |
0024e9e9 AB |
457 | mask = (win32file.FD_READ | |
458 | win32file.FD_ACCEPT | | |
459 | win32file.FD_CLOSE) | |
460 | event = ovs.poller.POLLIN | |
03947eb7 | 461 | else: |
0024e9e9 AB |
462 | mask = (win32file.FD_WRITE | |
463 | win32file.FD_CONNECT | | |
464 | win32file.FD_CLOSE) | |
465 | event = ovs.poller.POLLOUT | |
466 | ||
467 | try: | |
468 | win32file.WSAEventSelect(self.socket, | |
469 | self._wevent, | |
470 | mask) | |
471 | except pywintypes.error as e: | |
472 | vlog.err("failed to associate events with socket: %s" | |
473 | % e.strerror) | |
474 | poller.fd_wait(self._wevent, event) | |
03947eb7 AB |
475 | else: |
476 | if wait == Stream.W_RECV: | |
477 | if self._read: | |
478 | poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN) | |
479 | elif wait == Stream.W_SEND: | |
480 | if self._write: | |
481 | poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT) | |
482 | elif wait == Stream.W_CONNECT: | |
483 | return | |
484 | ||
99155935 BP |
485 | def connect_wait(self, poller): |
486 | self.wait(poller, Stream.W_CONNECT) | |
26bb0f31 | 487 | |
99155935 BP |
488 | def recv_wait(self, poller): |
489 | self.wait(poller, Stream.W_RECV) | |
26bb0f31 | 490 | |
99155935 BP |
491 | def send_wait(self, poller): |
492 | self.wait(poller, Stream.W_SEND) | |
26bb0f31 | 493 | |
99155935 BP |
494 | def __del__(self): |
495 | # Don't delete the file: we might have forked. | |
03947eb7 AB |
496 | if self.socket is not None: |
497 | self.socket.close() | |
498 | if self.pipe is not None: | |
499 | # Check if there are any remaining valid handles and close them | |
500 | if self.pipe: | |
501 | winutils.close_handle(self.pipe) | |
502 | if self._read.hEvent: | |
503 | winutils.close_handle(self._read.hEvent) | |
504 | if self._write.hEvent: | |
505 | winutils.close_handle(self._write.hEvent) | |
99155935 | 506 | |
d90ed7d6 NS |
507 | @staticmethod |
508 | def ssl_set_private_key_file(file_name): | |
509 | Stream._SSL_private_key_file = file_name | |
510 | ||
511 | @staticmethod | |
512 | def ssl_set_certificate_file(file_name): | |
513 | Stream._SSL_certificate_file = file_name | |
514 | ||
515 | @staticmethod | |
516 | def ssl_set_ca_cert_file(file_name): | |
517 | Stream._SSL_ca_cert_file = file_name | |
518 | ||
26bb0f31 | 519 | |
99155935 | 520 | class PassiveStream(object): |
03947eb7 AB |
521 | # Windows only |
522 | connect = None # overlapped for read operation | |
523 | connect_pending = False | |
524 | ||
e7dce33f GL |
525 | @staticmethod |
526 | def needs_probes(name): | |
527 | return False if name.startswith("punix:") else True | |
528 | ||
99155935 BP |
529 | @staticmethod |
530 | def is_valid_name(name): | |
531 | """Returns True if 'name' is a passive stream name in the form | |
af358237 OBY |
532 | "TYPE:ARGS" and TYPE is a supported passive stream type (currently |
533 | "punix:" or "ptcp"), otherwise False.""" | |
534 | return name.startswith("punix:") | name.startswith("ptcp:") | |
99155935 | 535 | |
03947eb7 | 536 | def __init__(self, sock, name, bind_path, pipe=None): |
99155935 | 537 | self.name = name |
03947eb7 | 538 | self.pipe = pipe |
99155935 | 539 | self.socket = sock |
03947eb7 AB |
540 | if pipe is not None: |
541 | self.connect = pywintypes.OVERLAPPED() | |
0024e9e9 | 542 | self.connect.hEvent = winutils.get_new_event() |
03947eb7 AB |
543 | self.connect_pending = False |
544 | suffix = name.split(":", 1)[1] | |
545 | suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix) | |
546 | self._pipename = winutils.get_pipe_name(suffix) | |
547 | ||
99155935 BP |
548 | self.bind_path = bind_path |
549 | ||
550 | @staticmethod | |
551 | def open(name): | |
552 | """Attempts to start listening for remote stream connections. 'name' | |
553 | is a connection name in the form "TYPE:ARGS", where TYPE is an passive | |
af358237 OBY |
554 | stream class's name and ARGS are stream class-specific. Currently the |
555 | supported values for TYPE are "punix" and "ptcp". | |
99155935 BP |
556 | |
557 | Returns (error, pstream): on success 'error' is 0 and 'pstream' is the | |
558 | new PassiveStream, on failure 'error' is a positive errno value and | |
559 | 'pstream' is None.""" | |
560 | if not PassiveStream.is_valid_name(name): | |
561 | return errno.EAFNOSUPPORT, None | |
562 | ||
563 | bind_path = name[6:] | |
2c487bc8 PR |
564 | if name.startswith("punix:"): |
565 | bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path) | |
03947eb7 AB |
566 | if sys.platform != 'win32': |
567 | error, sock = ovs.socket_util.make_unix_socket( | |
568 | socket.SOCK_STREAM, True, bind_path, None) | |
569 | if error: | |
570 | return error, None | |
571 | else: | |
572 | # Branch used only on Windows | |
573 | try: | |
574 | open(bind_path, 'w').close() | |
575 | except: | |
576 | return errno.ENOENT, None | |
577 | ||
578 | pipename = winutils.get_pipe_name(bind_path) | |
579 | if len(pipename) > 255: | |
580 | # Return invalid argument if the name is too long | |
581 | return errno.ENOENT, None | |
582 | ||
583 | npipe = winutils.create_named_pipe(pipename) | |
584 | if not npipe: | |
585 | return errno.ENOENT, None | |
586 | return 0, PassiveStream(None, name, bind_path, pipe=npipe) | |
af358237 OBY |
587 | |
588 | elif name.startswith("ptcp:"): | |
589 | sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
590 | sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
591 | remote = name.split(':') | |
592 | sock.bind((remote[1], int(remote[2]))) | |
593 | ||
594 | else: | |
595 | raise Exception('Unknown connection string') | |
99155935 BP |
596 | |
597 | try: | |
598 | sock.listen(10) | |
3ab76c56 | 599 | except socket.error as e: |
3a656eaf | 600 | vlog.err("%s: listen: %s" % (name, os.strerror(e.error))) |
99155935 BP |
601 | sock.close() |
602 | return e.error, None | |
603 | ||
604 | return 0, PassiveStream(sock, name, bind_path) | |
605 | ||
606 | def close(self): | |
607 | """Closes this PassiveStream.""" | |
03947eb7 AB |
608 | if self.socket is not None: |
609 | self.socket.close() | |
610 | if self.pipe is not None: | |
611 | winutils.close_handle(self.pipe, vlog.warn) | |
612 | winutils.close_handle(self.connect.hEvent, vlog.warn) | |
99155935 BP |
613 | if self.bind_path is not None: |
614 | ovs.fatal_signal.unlink_file_now(self.bind_path) | |
615 | self.bind_path = None | |
616 | ||
617 | def accept(self): | |
618 | """Tries to accept a new connection on this passive stream. Returns | |
619 | (error, stream): if successful, 'error' is 0 and 'stream' is the new | |
620 | Stream object, and on failure 'error' is a positive errno value and | |
621 | 'stream' is None. | |
622 | ||
623 | Will not block waiting for a connection. If no connection is ready to | |
624 | be accepted, returns (errno.EAGAIN, None) immediately.""" | |
03947eb7 AB |
625 | if sys.platform == 'win32' and self.socket is None: |
626 | return self.__accept_windows() | |
99155935 BP |
627 | while True: |
628 | try: | |
629 | sock, addr = self.socket.accept() | |
630 | ovs.socket_util.set_nonblocking(sock) | |
03947eb7 | 631 | if (sys.platform != 'win32' and sock.family == socket.AF_UNIX): |
af358237 OBY |
632 | return 0, Stream(sock, "unix:%s" % addr, 0) |
633 | return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0], | |
634 | str(addr[1])), 0) | |
3ab76c56 | 635 | except socket.error as e: |
99155935 | 636 | error = ovs.socket_util.get_exception_errno(e) |
03947eb7 AB |
637 | if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK: |
638 | # WSAEWOULDBLOCK would be the equivalent on Windows | |
639 | # for EAGAIN on Unix. | |
640 | error = errno.EAGAIN | |
99155935 BP |
641 | if error != errno.EAGAIN: |
642 | # XXX rate-limit | |
3a656eaf | 643 | vlog.dbg("accept: %s" % os.strerror(error)) |
99155935 BP |
644 | return error, None |
645 | ||
03947eb7 AB |
646 | def __accept_windows(self): |
647 | if self.connect_pending: | |
648 | try: | |
649 | winutils.get_overlapped_result(self.pipe, self.connect, False) | |
650 | except pywintypes.error as e: | |
651 | if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE: | |
652 | # The operation is still pending, try again | |
653 | self.connect_pending = True | |
654 | return errno.EAGAIN, None | |
655 | else: | |
656 | if self.pipe: | |
657 | win32pipe.DisconnectNamedPipe(self.pipe) | |
658 | return errno.EINVAL, None | |
659 | self.connect_pending = False | |
660 | ||
661 | error = winutils.connect_named_pipe(self.pipe, self.connect) | |
662 | if error: | |
663 | if error == winutils.winerror.ERROR_IO_PENDING: | |
664 | self.connect_pending = True | |
665 | return errno.EAGAIN, None | |
666 | elif error != winutils.winerror.ERROR_PIPE_CONNECTED: | |
667 | if self.pipe: | |
668 | win32pipe.DisconnectNamedPipe(self.pipe) | |
669 | self.connect_pending = False | |
670 | return errno.EINVAL, None | |
671 | else: | |
672 | win32event.SetEvent(self.connect.hEvent) | |
673 | ||
674 | npipe = winutils.create_named_pipe(self._pipename) | |
675 | if not npipe: | |
676 | return errno.ENOENT, None | |
677 | ||
678 | old_pipe = self.pipe | |
679 | self.pipe = npipe | |
680 | winutils.win32event.ResetEvent(self.connect.hEvent) | |
681 | return 0, Stream(None, self.name, 0, pipe=old_pipe) | |
682 | ||
99155935 | 683 | def wait(self, poller): |
03947eb7 AB |
684 | if sys.platform != 'win32' or self.socket is not None: |
685 | poller.fd_wait(self.socket, ovs.poller.POLLIN) | |
686 | else: | |
687 | poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN) | |
99155935 BP |
688 | |
689 | def __del__(self): | |
690 | # Don't delete the file: we might have forked. | |
03947eb7 AB |
691 | if self.socket is not None: |
692 | self.socket.close() | |
693 | if self.pipe is not None: | |
694 | # Check if there are any remaining valid handles and close them | |
695 | if self.pipe: | |
696 | winutils.close_handle(self.pipe) | |
697 | if self._connect.hEvent: | |
698 | winutils.close_handle(self._read.hEvent) | |
99155935 | 699 | |
26bb0f31 | 700 | |
ec394dad EJ |
701 | def usage(name): |
702 | return """ | |
703 | Active %s connection methods: | |
704 | unix:FILE Unix domain socket named FILE | |
771680d9 YS |
705 | tcp:HOST:PORT TCP socket to HOST with port no of PORT |
706 | ssl:HOST:PORT SSL socket to HOST with port no of PORT | |
ec394dad EJ |
707 | |
708 | Passive %s connection methods: | |
709 | punix:FILE Listen on Unix domain socket FILE""" % (name, name) | |
e06d06a7 IY |
710 | |
711 | ||
e06d06a7 | 712 | class UnixStream(Stream): |
e7dce33f GL |
713 | @staticmethod |
714 | def needs_probes(): | |
715 | return False | |
716 | ||
e06d06a7 IY |
717 | @staticmethod |
718 | def _open(suffix, dscp): | |
719 | connect_path = suffix | |
a0631d92 RB |
720 | return ovs.socket_util.make_unix_socket(socket.SOCK_STREAM, |
721 | True, None, connect_path) | |
884e0dfe DDP |
722 | |
723 | ||
f7b7ee97 | 724 | Stream.register_method("unix", UnixStream) |
e06d06a7 IY |
725 | |
726 | ||
e06d06a7 | 727 | class TCPStream(Stream): |
e7dce33f GL |
728 | @staticmethod |
729 | def needs_probes(): | |
730 | return True | |
731 | ||
e06d06a7 IY |
732 | @staticmethod |
733 | def _open(suffix, dscp): | |
734 | error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM, | |
735 | suffix, 0, dscp) | |
736 | if not error: | |
737 | sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) | |
738 | return error, sock | |
884e0dfe DDP |
739 | |
740 | ||
f7b7ee97 | 741 | Stream.register_method("tcp", TCPStream) |
d90ed7d6 NS |
742 | |
743 | ||
744 | class SSLStream(Stream): | |
e7dce33f GL |
745 | @staticmethod |
746 | def needs_probes(): | |
747 | return True | |
d90ed7d6 NS |
748 | |
749 | @staticmethod | |
750 | def verify_cb(conn, cert, errnum, depth, ok): | |
751 | return ok | |
752 | ||
753 | @staticmethod | |
754 | def _open(suffix, dscp): | |
755 | error, sock = TCPStream._open(suffix, dscp) | |
756 | if error: | |
757 | return error, None | |
758 | ||
759 | # Create an SSL context | |
760 | ctx = SSL.Context(SSL.SSLv23_METHOD) | |
761 | ctx.set_verify(SSL.VERIFY_PEER, SSLStream.verify_cb) | |
762 | ctx.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3) | |
d90ed7d6 NS |
763 | # If the client has not set the SSL configuration files |
764 | # exception would be raised. | |
765 | ctx.use_privatekey_file(Stream._SSL_private_key_file) | |
766 | ctx.use_certificate_file(Stream._SSL_certificate_file) | |
767 | ctx.load_verify_locations(Stream._SSL_ca_cert_file) | |
768 | ||
769 | ssl_sock = SSL.Connection(ctx, sock) | |
770 | ssl_sock.set_connect_state() | |
771 | return error, ssl_sock | |
772 | ||
773 | def connect(self): | |
774 | retval = super(SSLStream, self).connect() | |
775 | ||
776 | if retval: | |
777 | return retval | |
778 | ||
779 | # TCP Connection is successful. Now do the SSL handshake | |
780 | try: | |
781 | self.socket.do_handshake() | |
782 | except SSL.WantReadError: | |
783 | return errno.EAGAIN | |
2dc7e5ec GL |
784 | except SSL.SysCallError as e: |
785 | return ovs.socket_util.get_exception_errno(e) | |
d90ed7d6 NS |
786 | |
787 | return 0 | |
788 | ||
789 | def recv(self, n): | |
790 | try: | |
791 | return super(SSLStream, self).recv(n) | |
792 | except SSL.WantReadError: | |
793 | return (errno.EAGAIN, "") | |
2dc7e5ec GL |
794 | except SSL.SysCallError as e: |
795 | return (ovs.socket_util.get_exception_errno(e), "") | |
796 | except SSL.ZeroReturnError: | |
797 | return (0, "") | |
d90ed7d6 NS |
798 | |
799 | def send(self, buf): | |
800 | try: | |
d90ed7d6 NS |
801 | return super(SSLStream, self).send(buf) |
802 | except SSL.WantWriteError: | |
871a3876 GL |
803 | return -errno.EAGAIN |
804 | except SSL.SysCallError as e: | |
805 | return -ovs.socket_util.get_exception_errno(e) | |
d90ed7d6 NS |
806 | |
807 | ||
808 | if SSL: | |
809 | # Register SSL only if the OpenSSL module is available | |
810 | Stream.register_method("ssl", SSLStream) |