# Copyright (c) 2010, 2011, 2012 Nicira, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import errno
import os
import socket
import sys

import ovs.poller
import ovs.socket_util
import ovs.vlog

import six

try:
    from OpenSSL import SSL
except ImportError:
    SSL = None

if sys.platform == 'win32':
    import ovs.winutils as winutils
    import pywintypes
    import win32event
    import win32file
    import win32pipe

vlog = ovs.vlog.Vlog("stream")


def stream_or_pstream_needs_probes(name):
    """Returns True if the stream or pstream specified by 'name' needs
    periodic probes to verify connectivity.  For [p]streams which need probes,
    it can take a long time to notice the connection was dropped.  Returns
    False if probes aren't needed, and None if 'name' is invalid."""

    cls = Stream._find_method(name)
    if cls:
        return cls.needs_probes()
    elif PassiveStream.is_valid_name(name):
        return PassiveStream.needs_probes(name)
    else:
        return None


class Stream(object):
    """Bidirectional byte stream.  Unix domain sockets, tcp and ssl
    are implemented.

    On Windows, "unix:" streams are carried over named pipes using overlapped
    I/O; in that case 'socket' is None and 'pipe' holds the pipe handle."""

    # States.
    __S_CONNECTING = 0
    __S_CONNECTED = 1
    __S_DISCONNECTED = 2

    # Kinds of events that one might wait for.
    W_CONNECT = 0               # Connect complete (success or failure).
    W_RECV = 1                  # Data received.
    W_SEND = 2                  # Send buffer room available.

    # Maps a "TYPE:" prefix to the Stream subclass implementing it.
    _SOCKET_METHODS = {}

    _SSL_private_key_file = None
    _SSL_certificate_file = None
    _SSL_ca_cert_file = None

    # Windows only
    _write = None  # overlapped for write operation
    _read = None  # overlapped for read operation
    _write_pending = False
    _read_pending = False
    # True while a named-pipe connect must be retried because the pipe was
    # busy.  This is only a class-level default: open() sets it on the
    # individual stream instance that needs the retry.
    _retry_connect = False

    @staticmethod
    def register_method(method, cls):
        """Registers 'cls' as the implementation of stream type 'method'
        (given without the trailing colon, e.g. "tcp")."""
        Stream._SOCKET_METHODS[method + ":"] = cls

    @staticmethod
    def _find_method(name):
        """Returns the registered class whose "TYPE:" prefix starts 'name',
        or None if no registered type matches."""
        for method, cls in six.iteritems(Stream._SOCKET_METHODS):
            if name.startswith(method):
                return cls
        return None

    @staticmethod
    def is_valid_name(name):
        """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
        TYPE is a supported stream type ("unix:", "tcp:" and "ssl:"),
        otherwise False."""
        return bool(Stream._find_method(name))

    def __init__(self, socket, name, status, pipe=None, is_server=False):
        """Initializes a stream named 'name' around 'socket' or, for Windows
        named pipes, around the handle 'pipe'.

        'status' selects the initial state: errno.EAGAIN means the connection
        is still in progress, 0 means connected, and anything else means
        disconnected.  'is_server' marks 'pipe' as a server-side handle."""
        self.socket = socket
        self.pipe = pipe
        if sys.platform == 'win32':
            if pipe is not None:
                # Flag to check if fd is a server HANDLE.  In the case of a
                # server handle we have to issue a disconnect before closing
                # the actual handle.
                self._server = is_server
                suffix = name.split(":", 1)[1]
                suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
                self._pipename = winutils.get_pipe_name(suffix)
                self._read = pywintypes.OVERLAPPED()
                self._read.hEvent = winutils.get_new_event()
                self._write = pywintypes.OVERLAPPED()
                self._write.hEvent = winutils.get_new_event()
            else:
                # Event associated with the socket in __wait_windows().
                self._wevent = winutils.get_new_event(bManualReset=False,
                                                      bInitialState=False)

        self.name = name
        if status == errno.EAGAIN:
            self.state = Stream.__S_CONNECTING
        elif status == 0:
            self.state = Stream.__S_CONNECTED
        else:
            self.state = Stream.__S_DISCONNECTED

        self.error = 0

    # Default value of dscp bits for connection between controller and manager.
    # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined
    # in <netinet/ip.h> is used.
    IPTOS_PREC_INTERNETCONTROL = 0xc0
    DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2

    @staticmethod
    def open(name, dscp=DSCP_DEFAULT):
        """Attempts to connect a stream to a remote peer.  'name' is a
        connection name in the form "TYPE:ARGS", where TYPE is an active stream
        class's name and ARGS are stream class-specific.  The supported TYPEs
        include "unix", "tcp", and "ssl".

        Returns (error, stream): on success 'error' is 0 and 'stream' is the
        new Stream, on failure 'error' is a positive errno value and 'stream'
        is None.

        Never returns errno.EAGAIN or errno.EINPROGRESS.  Instead, returns 0
        and a new Stream.  The connect() method can be used to check for
        successful connection completion."""
        cls = Stream._find_method(name)
        if not cls:
            return errno.EAFNOSUPPORT, None

        suffix = name.split(":", 1)[1]
        if name.startswith("unix:"):
            suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
            if sys.platform == 'win32':
                pipename = winutils.get_pipe_name(suffix)

                if len(suffix) > 255:
                    # The name is too long to be usable; report it the same
                    # way as a missing file.
                    return errno.ENOENT, None

                try:
                    # In case of "unix:" argument, the assumption is that
                    # there is a file created in the path (suffix).
                    open(suffix, 'r').close()
                except Exception:
                    return errno.ENOENT, None

                try:
                    npipe = winutils.create_file(pipename)
                except pywintypes.error as e:
                    if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
                        # Pipe is busy: mark this particular stream so that
                        # connect() retries opening the pipe.
                        stream = cls(None, name, errno.EAGAIN,
                                     pipe=win32file.INVALID_HANDLE_VALUE,
                                     is_server=False)
                        stream._retry_connect = True
                        return 0, stream
                    return errno.ENOENT, None

                try:
                    winutils.set_pipe_mode(npipe,
                                           win32pipe.PIPE_READMODE_BYTE)
                except pywintypes.error:
                    # Don't leak the handle we just opened.
                    winutils.close_handle(npipe, vlog.warn)
                    return errno.ENOENT, None
                return 0, cls(None, name, 0, pipe=npipe, is_server=False)

        error, sock = cls._open(suffix, dscp)
        if error:
            return error, None
        else:
            status = ovs.socket_util.check_connection_completion(sock)
            return 0, cls(sock, name, status)

    @staticmethod
    def _open(suffix, dscp):
        """Opens the underlying socket for 'suffix' (the ARGS part of the
        connection name).  Returns (error, socket).  Subclasses must
        override this."""
        raise NotImplementedError("This method must be overrided by subclass")

    @staticmethod
    def open_block(error_stream):
        """Blocks until a Stream completes its connection attempt, either
        succeeding or failing.  (error, stream) should be the tuple returned by
        Stream.open().  Returns a tuple of the same form.

        Typical usage:
        error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""

        # Py3 doesn't support tuple parameter unpacking - PEP 3113
        error, stream = error_stream
        if not error:
            while True:
                error = stream.connect()
                if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
                    # WSAEWOULDBLOCK would be the equivalent on Windows
                    # for EAGAIN on Unix.
                    error = errno.EAGAIN
                if error != errno.EAGAIN:
                    break
                stream.run()
                poller = ovs.poller.Poller()
                stream.run_wait(poller)
                stream.connect_wait(poller)
                poller.block()
            if stream.socket is not None:
                assert error != errno.EINPROGRESS

        if error and stream:
            stream.close()
            stream = None
        return error, stream

    def close(self):
        """Closes the socket and, on Windows, the pipe and its event
        handles."""
        if self.socket is not None:
            self.socket.close()
        if self.pipe is not None:
            if self._server:
                # Flush the pipe to allow the client to read the pipe
                # before disconnecting.
                win32pipe.FlushFileBuffers(self.pipe)
                win32pipe.DisconnectNamedPipe(self.pipe)
            winutils.close_handle(self.pipe, vlog.warn)
            winutils.close_handle(self._read.hEvent, vlog.warn)
            winutils.close_handle(self._write.hEvent, vlog.warn)
            # Forget the handle so that __del__ (or a second close()) does
            # not close the same handles again.
            self.pipe = None

    def __scs_connecting(self):
        """Advances the state machine while in the CONNECTING state."""
        if self.socket is not None:
            retval = ovs.socket_util.check_connection_completion(self.socket)
            assert retval != errno.EINPROGRESS
        elif sys.platform == 'win32':
            if self._retry_connect:
                try:
                    self.pipe = winutils.create_file(self._pipename)
                    self._retry_connect = False
                    retval = 0
                except pywintypes.error as e:
                    if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
                        retval = errno.EAGAIN
                    else:
                        self._retry_connect = False
                        retval = errno.ENOENT
            else:
                # If _retry_connect is false, it means it's already
                # connected so we can set the value of retval to 0
                retval = 0

        if retval == 0:
            self.state = Stream.__S_CONNECTED
        elif retval != errno.EAGAIN:
            self.state = Stream.__S_DISCONNECTED
            self.error = retval

    def connect(self):
        """Tries to complete the connection on this stream.  If the connection
        is complete, returns 0 if the connection was successful or a positive
        errno value if it failed.  If the connection is still in progress,
        returns errno.EAGAIN."""

        if self.state == Stream.__S_CONNECTING:
            self.__scs_connecting()

        if self.state == Stream.__S_CONNECTING:
            return errno.EAGAIN
        elif self.state == Stream.__S_CONNECTED:
            return 0
        else:
            assert self.state == Stream.__S_DISCONNECTED
            return self.error

    def recv(self, n):
        """Tries to receive up to 'n' bytes from this stream.  Returns a
        (error, string) tuple:

            - If successful, 'error' is zero and 'string' contains between 1
              and 'n' bytes of data.

            - On error, 'error' is a positive errno value.

            - If the connection has been closed in the normal fashion or if 'n'
              is 0, the tuple is (0, "").

        The recv function will not block waiting for data to arrive.  If no
        data have been received, it returns (errno.EAGAIN, "") immediately."""

        retval = self.connect()
        if retval != 0:
            return (retval, "")
        elif n == 0:
            return (0, "")

        if sys.platform == 'win32' and self.socket is None:
            return self.__recv_windows(n)

        try:
            return (0, self.socket.recv(n))
        except socket.error as e:
            return (ovs.socket_util.get_exception_errno(e), "")

    def __recv_windows(self, n):
        """Named-pipe implementation of recv() using overlapped reads."""
        if self._read_pending:
            # A previous read was left pending; check whether it finished.
            try:
                nBytesRead = winutils.get_overlapped_result(self.pipe,
                                                            self._read,
                                                            False)
                self._read_pending = False
            except pywintypes.error as e:
                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
                    # The operation is still pending, try again
                    self._read_pending = True
                    return (errno.EAGAIN, "")
                elif e.winerror in winutils.pipe_disconnected_errors:
                    # If the pipe was disconnected, return 0.
                    return (0, "")
                else:
                    return (errno.EINVAL, "")
        else:
            (errCode, self._read_buffer) = winutils.read_file(self.pipe,
                                                              n,
                                                              self._read)
            if errCode:
                if errCode == winutils.winerror.ERROR_IO_PENDING:
                    self._read_pending = True
                    return (errno.EAGAIN, "")
                elif errCode in winutils.pipe_disconnected_errors:
                    # If the pipe was disconnected, return 0.
                    return (0, "")
                else:
                    return (errCode, "")

            try:
                nBytesRead = winutils.get_overlapped_result(self.pipe,
                                                            self._read,
                                                            False)
                winutils.win32event.SetEvent(self._read.hEvent)
            except pywintypes.error as e:
                if e.winerror in winutils.pipe_disconnected_errors:
                    # If the pipe was disconnected, return 0.
                    return (0, "")
                else:
                    return (e.winerror, "")

        recvBuffer = self._read_buffer[:nBytesRead]
        # recvBuffer will have the type memoryview in Python3.
        # We can use bytes to convert it to type bytes which works on
        # both Python2 and Python3.
        return (0, bytes(recvBuffer))

    def send(self, buf):
        """Tries to send 'buf' on this stream.

        If successful, returns the number of bytes sent, between 1 and
        len(buf).  0 is only a valid return value if len(buf) is 0.

        On error, returns a negative errno value.

        Will not block.  If no bytes can be immediately accepted for
        transmission, returns -errno.EAGAIN immediately."""

        retval = self.connect()
        if retval != 0:
            return -retval
        elif len(buf) == 0:
            return 0

        # The transport needs bytes.  Encode text (str on Python 3, unicode
        # on Python 2) and pass anything that is already a byte string
        # through untouched, so that non-ASCII byte strings are not
        # re-encoded.
        if isinstance(buf, six.text_type):
            buf = buf.encode('utf-8')

        if sys.platform == 'win32' and self.socket is None:
            return self.__send_windows(buf)

        try:
            return self.socket.send(buf)
        except socket.error as e:
            return -ovs.socket_util.get_exception_errno(e)

    def __send_windows(self, buf):
        """Named-pipe implementation of send() using overlapped writes."""
        if self._write_pending:
            # A previous write was left pending; check whether it finished.
            try:
                nBytesWritten = winutils.get_overlapped_result(self.pipe,
                                                               self._write,
                                                               False)
                self._write_pending = False
            except pywintypes.error as e:
                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
                    # The operation is still pending, try again
                    self._write_pending = True
                    return -errno.EAGAIN
                elif e.winerror in winutils.pipe_disconnected_errors:
                    # If the pipe was disconnected, return connection reset.
                    return -errno.ECONNRESET
                else:
                    return -errno.EINVAL
        else:
            (errCode, nBytesWritten) = winutils.write_file(self.pipe,
                                                           buf,
                                                           self._write)
            if errCode:
                if errCode == winutils.winerror.ERROR_IO_PENDING:
                    self._write_pending = True
                    return -errno.EAGAIN
                if (not nBytesWritten and
                        errCode in winutils.pipe_disconnected_errors):
                    # If the pipe was disconnected, return connection reset.
                    return -errno.ECONNRESET
        return nBytesWritten

    def run(self):
        """Does nothing in the base class (called by open_block())."""
        pass

    def run_wait(self, poller):
        """Does nothing in the base class (called by open_block())."""
        pass

    def wait(self, poller, wait):
        """Arranges for 'poller' to wake up when the event 'wait' (one of
        W_CONNECT, W_RECV, W_SEND) can make progress on this stream."""
        assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)

        if self.state == Stream.__S_DISCONNECTED:
            poller.immediate_wake()
            return

        if self.state == Stream.__S_CONNECTING:
            wait = Stream.W_CONNECT

        if sys.platform == 'win32':
            self.__wait_windows(poller, wait)
            return

        if wait == Stream.W_RECV:
            poller.fd_wait(self.socket, ovs.poller.POLLIN)
        else:
            poller.fd_wait(self.socket, ovs.poller.POLLOUT)

    def __wait_windows(self, poller, wait):
        """Windows implementation of wait() for sockets and named pipes."""
        if self.socket is not None:
            if wait == Stream.W_RECV:
                mask = (win32file.FD_READ |
                        win32file.FD_ACCEPT |
                        win32file.FD_CLOSE)
                event = ovs.poller.POLLIN
            else:
                mask = (win32file.FD_WRITE |
                        win32file.FD_CONNECT |
                        win32file.FD_CLOSE)
                event = ovs.poller.POLLOUT

            try:
                win32file.WSAEventSelect(self.socket,
                                         self._wevent,
                                         mask)
            except pywintypes.error as e:
                vlog.err("failed to associate events with socket: %s"
                         % e.strerror)
            poller.fd_wait(self._wevent, event)
        else:
            if wait == Stream.W_RECV:
                if self._read:
                    poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
            elif wait == Stream.W_SEND:
                if self._write:
                    poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT)
            elif wait == Stream.W_CONNECT:
                return

    def connect_wait(self, poller):
        """Equivalent to wait(poller, Stream.W_CONNECT)."""
        self.wait(poller, Stream.W_CONNECT)

    def recv_wait(self, poller):
        """Equivalent to wait(poller, Stream.W_RECV)."""
        self.wait(poller, Stream.W_RECV)

    def send_wait(self, poller):
        """Equivalent to wait(poller, Stream.W_SEND)."""
        self.wait(poller, Stream.W_SEND)

    def __del__(self):
        # Don't delete the file: we might have forked.
        if self.socket is not None:
            self.socket.close()
        if self.pipe is not None:
            # Check if there are any remaining valid handles and close them
            if self.pipe:
                winutils.close_handle(self.pipe)
            if self._read.hEvent:
                winutils.close_handle(self._read.hEvent)
            if self._write.hEvent:
                winutils.close_handle(self._write.hEvent)

    @staticmethod
    def ssl_set_private_key_file(file_name):
        """Sets the private key file used by subsequently opened "ssl:"
        streams."""
        Stream._SSL_private_key_file = file_name

    @staticmethod
    def ssl_set_certificate_file(file_name):
        """Sets the certificate file used by subsequently opened "ssl:"
        streams."""
        Stream._SSL_certificate_file = file_name

    @staticmethod
    def ssl_set_ca_cert_file(file_name):
        """Sets the CA certificate file used by subsequently opened "ssl:"
        streams."""
        Stream._SSL_ca_cert_file = file_name


class PassiveStream(object):
    """Listener that accepts incoming Stream connections ("punix:" and
    "ptcp:")."""

    # Windows only
    connect = None  # overlapped for the pending connect operation
    connect_pending = False

    @staticmethod
    def needs_probes(name):
        """Returns False for "punix:" names and True for anything else."""
        return not name.startswith("punix:")

    @staticmethod
    def is_valid_name(name):
        """Returns True if 'name' is a passive stream name in the form
        "TYPE:ARGS" and TYPE is a supported passive stream type (currently
        "punix:" or "ptcp:"), otherwise False."""
        return name.startswith(("punix:", "ptcp:"))

    def __init__(self, sock, name, bind_path, pipe=None):
        """Initializes a listener named 'name' around the listening socket
        'sock' or, on Windows, the named pipe handle 'pipe'.  'bind_path' is
        the file to unlink in close(), or None."""
        self.name = name
        self.pipe = pipe
        self.socket = sock
        if pipe is not None:
            self.connect = pywintypes.OVERLAPPED()
            self.connect.hEvent = winutils.get_new_event()
            self.connect_pending = False
            suffix = name.split(":", 1)[1]
            suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
            self._pipename = winutils.get_pipe_name(suffix)

        self.bind_path = bind_path

    @staticmethod
    def open(name):
        """Attempts to start listening for remote stream connections.  'name'
        is a connection name in the form "TYPE:ARGS", where TYPE is an passive
        stream class's name and ARGS are stream class-specific.  Currently the
        supported values for TYPE are "punix" and "ptcp".

        Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
        new PassiveStream, on failure 'error' is a positive errno value and
        'pstream' is None."""
        if not PassiveStream.is_valid_name(name):
            return errno.EAFNOSUPPORT, None

        if name.startswith("punix:"):
            bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR,
                                               name[len("punix:"):])
            if sys.platform != 'win32':
                error, sock = ovs.socket_util.make_unix_socket(
                    socket.SOCK_STREAM, True, bind_path, None)
                if error:
                    return error, None
            else:
                # Branch used only on Windows
                try:
                    open(bind_path, 'w').close()
                except Exception:
                    return errno.ENOENT, None

                pipename = winutils.get_pipe_name(bind_path)
                if len(pipename) > 255:
                    # The name is too long to be usable; report it the same
                    # way as a missing file.
                    return errno.ENOENT, None

                npipe = winutils.create_named_pipe(pipename)
                if not npipe:
                    return errno.ENOENT, None
                return 0, PassiveStream(None, name, bind_path, pipe=npipe)

        elif name.startswith("ptcp:"):
            # A TCP listener has no file system path to unlink on close().
            bind_path = None
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            remote = name.split(':')
            try:
                sock.bind((remote[1], int(remote[2])))
            except socket.error as e:
                error = ovs.socket_util.get_exception_errno(e)
                vlog.err("%s: bind: %s" % (name, os.strerror(error)))
                sock.close()
                return error, None

        else:
            raise Exception('Unknown connection string')

        try:
            sock.listen(10)
        except socket.error as e:
            error = ovs.socket_util.get_exception_errno(e)
            vlog.err("%s: listen: %s" % (name, os.strerror(error)))
            sock.close()
            return error, None

        return 0, PassiveStream(sock, name, bind_path)

    def close(self):
        """Closes this PassiveStream."""
        if self.socket is not None:
            self.socket.close()
        if self.pipe is not None:
            winutils.close_handle(self.pipe, vlog.warn)
            winutils.close_handle(self.connect.hEvent, vlog.warn)
            # Forget the handle so that __del__ (or a second close()) does
            # not close the same handles again.
            self.pipe = None
        if self.bind_path is not None:
            ovs.fatal_signal.unlink_file_now(self.bind_path)
            self.bind_path = None

    def accept(self):
        """Tries to accept a new connection on this passive stream.  Returns
        (error, stream): if successful, 'error' is 0 and 'stream' is the new
        Stream object, and on failure 'error' is a positive errno value and
        'stream' is None.

        Will not block waiting for a connection.  If no connection is ready to
        be accepted, returns (errno.EAGAIN, None) immediately."""
        if sys.platform == 'win32' and self.socket is None:
            return self.__accept_windows()

        try:
            sock, addr = self.socket.accept()
            ovs.socket_util.set_nonblocking(sock)
            if (sys.platform != 'win32' and sock.family == socket.AF_UNIX):
                return 0, Stream(sock, "unix:%s" % addr, 0)
            return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0],
                                                   str(addr[1])), 0)
        except socket.error as e:
            error = ovs.socket_util.get_exception_errno(e)
            if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
                # WSAEWOULDBLOCK would be the equivalent on Windows
                # for EAGAIN on Unix.
                error = errno.EAGAIN
            if error != errno.EAGAIN:
                # XXX rate-limit
                vlog.dbg("accept: %s" % os.strerror(error))
            return error, None

    def __accept_windows(self):
        """Named-pipe implementation of accept() using an overlapped
        connect."""
        if self.connect_pending:
            try:
                winutils.get_overlapped_result(self.pipe, self.connect, False)
            except pywintypes.error as e:
                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
                    # The operation is still pending, try again
                    self.connect_pending = True
                    return errno.EAGAIN, None
                else:
                    if self.pipe:
                        win32pipe.DisconnectNamedPipe(self.pipe)
                    return errno.EINVAL, None
            self.connect_pending = False

        error = winutils.connect_named_pipe(self.pipe, self.connect)
        if error:
            if error == winutils.winerror.ERROR_IO_PENDING:
                self.connect_pending = True
                return errno.EAGAIN, None
            elif error != winutils.winerror.ERROR_PIPE_CONNECTED:
                if self.pipe:
                    win32pipe.DisconnectNamedPipe(self.pipe)
                self.connect_pending = False
                return errno.EINVAL, None
            else:
                win32event.SetEvent(self.connect.hEvent)

        # Hand the connected pipe instance to the new Stream and create a
        # fresh instance on which to listen for the next client.
        npipe = winutils.create_named_pipe(self._pipename)
        if not npipe:
            return errno.ENOENT, None

        old_pipe = self.pipe
        self.pipe = npipe
        winutils.win32event.ResetEvent(self.connect.hEvent)
        # NOTE(review): 'old_pipe' is a server-side handle but is_server is
        # left at its default of False, so Stream.close() will not flush and
        # disconnect it -- confirm whether that is intended.
        return 0, Stream(None, self.name, 0, pipe=old_pipe)

    def wait(self, poller):
        """Arranges for 'poller' to wake up when a connection may be ready
        to accept()."""
        if sys.platform != 'win32' or self.socket is not None:
            poller.fd_wait(self.socket, ovs.poller.POLLIN)
        else:
            poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)

    def __del__(self):
        # Don't delete the file: we might have forked.
        if self.socket is not None:
            self.socket.close()
        if self.pipe is not None:
            # Check if there are any remaining valid handles and close them
            if self.pipe:
                winutils.close_handle(self.pipe)
            if self.connect.hEvent:
                winutils.close_handle(self.connect.hEvent)


def usage(name):
    """Returns help text listing the connection methods, with 'name'
    substituted into the section headings."""
    return """
Active %s connection methods:
  unix:FILE               Unix domain socket named FILE
  tcp:IP:PORT             TCP socket to IP with port no of PORT
  ssl:IP:PORT             SSL socket to IP with port no of PORT

Passive %s connection methods:
  punix:FILE              Listen on Unix domain socket FILE""" % (name, name)


class UnixStream(Stream):
    """Active stream over a Unix domain socket ("unix:FILE")."""

    @staticmethod
    def needs_probes():
        return False

    @staticmethod
    def _open(suffix, dscp):
        """Connects a stream socket to the path 'suffix'; 'dscp' is
        unused."""
        connect_path = suffix
        return ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
                                                True, None, connect_path)


Stream.register_method("unix", UnixStream)


class TCPStream(Stream):
    """Active stream over TCP ("tcp:IP:PORT")."""

    @staticmethod
    def needs_probes():
        return True

    @staticmethod
    def _open(suffix, dscp):
        """Starts a TCP connection to 'suffix' and enables TCP_NODELAY on
        the resulting socket."""
        error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM,
                                                       suffix, 0, dscp)
        if not error:
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        return error, sock


Stream.register_method("tcp", TCPStream)


class SSLStream(Stream):
    """Active stream over SSL on top of TCP ("ssl:IP:PORT").

    Only registered when the OpenSSL module could be imported."""

    @staticmethod
    def needs_probes():
        return True

    @staticmethod
    def verify_cb(conn, cert, errnum, depth, ok):
        """Certificate verification callback: returns 'ok' unchanged."""
        return ok

    @staticmethod
    def _open(suffix, dscp):
        """Starts a TCP connection to 'suffix' and wraps it in an SSL
        connection configured from the Stream.ssl_set_*() files.

        Returns (error, socket).  Raises whatever OpenSSL raises if the SSL
        configuration files have not been set or cannot be loaded."""
        error, sock = TCPStream._open(suffix, dscp)
        if error:
            return error, None

        try:
            # Create an SSL context
            ctx = SSL.Context(SSL.SSLv23_METHOD)
            ctx.set_verify(SSL.VERIFY_PEER, SSLStream.verify_cb)
            ctx.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3)
            # If the client has not set the SSL configuration files
            # exception would be raised.
            ctx.use_privatekey_file(Stream._SSL_private_key_file)
            ctx.use_certificate_file(Stream._SSL_certificate_file)
            ctx.load_verify_locations(Stream._SSL_ca_cert_file)

            ssl_sock = SSL.Connection(ctx, sock)
            ssl_sock.set_connect_state()
        except Exception:
            # Don't leak the TCP socket when SSL setup fails.
            sock.close()
            raise
        return error, ssl_sock

    def connect(self):
        """Completes the TCP connection and then the SSL handshake.  Returns
        0 on success, errno.EAGAIN while either is still in progress, or
        another positive errno value on failure."""
        retval = super(SSLStream, self).connect()

        if retval:
            return retval

        # TCP Connection is successful. Now do the SSL handshake
        try:
            self.socket.do_handshake()
        except (SSL.WantReadError, SSL.WantWriteError):
            # On a non-blocking socket the handshake may need to wait for
            # either direction before it can proceed.
            return errno.EAGAIN
        except SSL.SysCallError as e:
            return ovs.socket_util.get_exception_errno(e)

        return 0

    def recv(self, n):
        """Same contract as Stream.recv(), translating SSL exceptions into
        (error, "") tuples."""
        try:
            return super(SSLStream, self).recv(n)
        except (SSL.WantReadError, SSL.WantWriteError):
            return (errno.EAGAIN, "")
        except SSL.SysCallError as e:
            return (ovs.socket_util.get_exception_errno(e), "")
        except SSL.ZeroReturnError:
            # The peer shut the SSL connection down cleanly.
            return (0, "")

    def send(self, buf):
        """Same contract as Stream.send(), translating SSL exceptions into
        negative errno values.

        Text buffers are converted to bytes by Stream.send(); pyopenssl
        version 0.14 expects the buffer to be a byte string."""
        try:
            return super(SSLStream, self).send(buf)
        except (SSL.WantWriteError, SSL.WantReadError):
            return -errno.EAGAIN
        except SSL.SysCallError as e:
            return -ovs.socket_util.get_exception_errno(e)


if SSL:
    # Register SSL only if the OpenSSL module is available
    Stream.register_method("ssl", SSLStream)