1# Copyright (c) 2010, 2011, 2012 Nicira, Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at:
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import errno
16import os
17import socket
18import sys
19
20import ovs.poller
21import ovs.socket_util
22import ovs.vlog
23
24import six
25
26try:
27    from OpenSSL import SSL
28except ImportError:
29    SSL = None
30
31if sys.platform == 'win32':
32    import ovs.winutils as winutils
33    import pywintypes
34    import win32event
35    import win32file
36    import win32pipe
37
38vlog = ovs.vlog.Vlog("stream")
39
40
41def stream_or_pstream_needs_probes(name):
42    """ True if the stream or pstream specified by 'name' needs periodic probes
43    to verify connectivity.  For [p]streams which need probes, it can take a
44    long time to notice the connection was dropped.  Returns False if probes
45    aren't needed, and None if 'name' is invalid"""
46
47    cls = Stream._find_method(name)
48    if cls:
49        return cls.needs_probes()
50    elif PassiveStream.is_valid_name(name):
51        return PassiveStream.needs_probes(name)
52    else:
53        return None
54
55
56class Stream(object):
57    """Bidirectional byte stream.  Unix domain sockets, tcp and ssl
58    are implemented."""
59
60    # States.
61    __S_CONNECTING = 0
62    __S_CONNECTED = 1
63    __S_DISCONNECTED = 2
64
65    # Kinds of events that one might wait for.
66    W_CONNECT = 0               # Connect complete (success or failure).
67    W_RECV = 1                  # Data received.
68    W_SEND = 2                  # Send buffer room available.
69
70    _SOCKET_METHODS = {}
71
72    _SSL_private_key_file = None
73    _SSL_certificate_file = None
74    _SSL_ca_cert_file = None
75
76    # Windows only
77    _write = None                # overlapped for write operation
78    _read = None                 # overlapped for read operation
79    _write_pending = False
80    _read_pending = False
81    _retry_connect = False
82
83    @staticmethod
84    def register_method(method, cls):
85        Stream._SOCKET_METHODS[method + ":"] = cls
86
87    @staticmethod
88    def _find_method(name):
89        for method, cls in six.iteritems(Stream._SOCKET_METHODS):
90            if name.startswith(method):
91                return cls
92        return None
93
94    @staticmethod
95    def is_valid_name(name):
96        """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
97        TYPE is a supported stream type ("unix:", "tcp:" and "ssl:"),
98        otherwise False."""
99        return bool(Stream._find_method(name))
100
101    def __init__(self, socket, name, status, pipe=None, is_server=False):
102        self.socket = socket
103        self.pipe = pipe
104        if sys.platform == 'win32':
105            if pipe is not None:
106                # Flag to check if fd is a server HANDLE.  In the case of a
107                # server handle we have to issue a disconnect before closing
108                # the actual handle.
109                self._server = is_server
110                suffix = name.split(":", 1)[1]
111                suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
112                self._pipename = winutils.get_pipe_name(suffix)
113                self._read = pywintypes.OVERLAPPED()
114                self._read.hEvent = winutils.get_new_event()
115                self._write = pywintypes.OVERLAPPED()
116                self._write.hEvent = winutils.get_new_event()
117            else:
118                self._wevent = winutils.get_new_event(bManualReset=False,
119                                                      bInitialState=False)
120
121        self.name = name
122        if status == errno.EAGAIN:
123            self.state = Stream.__S_CONNECTING
124        elif status == 0:
125            self.state = Stream.__S_CONNECTED
126        else:
127            self.state = Stream.__S_DISCONNECTED
128
129        self.error = 0
130
131    # Default value of dscp bits for connection between controller and manager.
132    # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined
133    # in <netinet/ip.h> is used.
134    IPTOS_PREC_INTERNETCONTROL = 0xc0
135    DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2
136
137    @staticmethod
138    def open(name, dscp=DSCP_DEFAULT):
139        """Attempts to connect a stream to a remote peer.  'name' is a
140        connection name in the form "TYPE:ARGS", where TYPE is an active stream
141        class's name and ARGS are stream class-specific.  The supported TYPEs
142        include "unix", "tcp", and "ssl".
143
144        Returns (error, stream): on success 'error' is 0 and 'stream' is the
145        new Stream, on failure 'error' is a positive errno value and 'stream'
146        is None.
147
148        Never returns errno.EAGAIN or errno.EINPROGRESS.  Instead, returns 0
149        and a new Stream.  The connect() method can be used to check for
150        successful connection completion."""
151        cls = Stream._find_method(name)
152        if not cls:
153            return errno.EAFNOSUPPORT, None
154
155        suffix = name.split(":", 1)[1]
156        if name.startswith("unix:"):
157            suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
158            if sys.platform == 'win32':
159                pipename = winutils.get_pipe_name(suffix)
160
161                if len(suffix) > 255:
162                    # Return invalid argument if the name is too long
163                    return errno.ENOENT, None
164
165                try:
166                    # In case of "unix:" argument, the assumption is that
167                    # there is a file created in the path (suffix).
168                    open(suffix, 'r').close()
169                except:
170                    return errno.ENOENT, None
171
172                try:
173                    npipe = winutils.create_file(pipename)
174                    try:
175                        winutils.set_pipe_mode(npipe,
176                                               win32pipe.PIPE_READMODE_BYTE)
177                    except pywintypes.error as e:
178                        return errno.ENOENT, None
179                except pywintypes.error as e:
180                    if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
181                        # Pipe is busy, set the retry flag to true and retry
182                        # again during the connect function.
183                        Stream.retry_connect = True
184                        return 0, cls(None, name, errno.EAGAIN,
185                                      pipe=win32file.INVALID_HANDLE_VALUE,
186                                      is_server=False)
187                    return errno.ENOENT, None
188                return 0, cls(None, name, 0, pipe=npipe, is_server=False)
189
190        error, sock = cls._open(suffix, dscp)
191        if error:
192            return error, None
193        else:
194            status = ovs.socket_util.check_connection_completion(sock)
195            return 0, cls(sock, name, status)
196
197    @staticmethod
198    def _open(suffix, dscp):
199        raise NotImplementedError("This method must be overrided by subclass")
200
201    @staticmethod
202    def open_block(error_stream):
203        """Blocks until a Stream completes its connection attempt, either
204        succeeding or failing.  (error, stream) should be the tuple returned by
205        Stream.open().  Returns a tuple of the same form.
206
207        Typical usage:
208        error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
209
210        # Py3 doesn't support tuple parameter unpacking - PEP 3113
211        error, stream = error_stream
212        if not error:
213            while True:
214                error = stream.connect()
215                if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
216                    # WSAEWOULDBLOCK would be the equivalent on Windows
217                    # for EAGAIN on Unix.
218                    error = errno.EAGAIN
219                if error != errno.EAGAIN:
220                    break
221                stream.run()
222                poller = ovs.poller.Poller()
223                stream.run_wait(poller)
224                stream.connect_wait(poller)
225                poller.block()
226            if stream.socket is not None:
227                assert error != errno.EINPROGRESS
228
229        if error and stream:
230            stream.close()
231            stream = None
232        return error, stream
233
234    def close(self):
235        if self.socket is not None:
236            self.socket.close()
237        if self.pipe is not None:
238            if self._server:
239                # Flush the pipe to allow the client to read the pipe
240                # before disconnecting.
241                win32pipe.FlushFileBuffers(self.pipe)
242                win32pipe.DisconnectNamedPipe(self.pipe)
243            winutils.close_handle(self.pipe, vlog.warn)
244            winutils.close_handle(self._read.hEvent, vlog.warn)
245            winutils.close_handle(self._write.hEvent, vlog.warn)
246
247    def __scs_connecting(self):
248        if self.socket is not None:
249            retval = ovs.socket_util.check_connection_completion(self.socket)
250            assert retval != errno.EINPROGRESS
251        elif sys.platform == 'win32':
252            if self.retry_connect:
253                try:
254                    self.pipe = winutils.create_file(self._pipename)
255                    self._retry_connect = False
256                    retval = 0
257                except pywintypes.error as e:
258                    if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
259                        retval = errno.EAGAIN
260                    else:
261                        self._retry_connect = False
262                        retval = errno.ENOENT
263            else:
264                # If retry_connect is false, it means it's already
265                # connected so we can set the value of retval to 0
266                retval = 0
267
268        if retval == 0:
269            self.state = Stream.__S_CONNECTED
270        elif retval != errno.EAGAIN:
271            self.state = Stream.__S_DISCONNECTED
272            self.error = retval
273
274    def connect(self):
275        """Tries to complete the connection on this stream.  If the connection
276        is complete, returns 0 if the connection was successful or a positive
277        errno value if it failed.  If the connection is still in progress,
278        returns errno.EAGAIN."""
279
280        if self.state == Stream.__S_CONNECTING:
281            self.__scs_connecting()
282
283        if self.state == Stream.__S_CONNECTING:
284            return errno.EAGAIN
285        elif self.state == Stream.__S_CONNECTED:
286            return 0
287        else:
288            assert self.state == Stream.__S_DISCONNECTED
289            return self.error
290
291    def recv(self, n):
292        """Tries to receive up to 'n' bytes from this stream.  Returns a
293        (error, string) tuple:
294
295            - If successful, 'error' is zero and 'string' contains between 1
296              and 'n' bytes of data.
297
298            - On error, 'error' is a positive errno value.
299
300            - If the connection has been closed in the normal fashion or if 'n'
301              is 0, the tuple is (0, "").
302
303        The recv function will not block waiting for data to arrive.  If no
304        data have been received, it returns (errno.EAGAIN, "") immediately."""
305
306        retval = self.connect()
307        if retval != 0:
308            return (retval, "")
309        elif n == 0:
310            return (0, "")
311
312        if sys.platform == 'win32' and self.socket is None:
313            return self.__recv_windows(n)
314
315        try:
316            return (0, self.socket.recv(n))
317        except socket.error as e:
318            return (ovs.socket_util.get_exception_errno(e), "")
319
320    def __recv_windows(self, n):
321        if self._read_pending:
322            try:
323                nBytesRead = winutils.get_overlapped_result(self.pipe,
324                                                            self._read,
325                                                            False)
326                self._read_pending = False
327            except pywintypes.error as e:
328                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
329                    # The operation is still pending, try again
330                    self._read_pending = True
331                    return (errno.EAGAIN, "")
332                elif e.winerror in winutils.pipe_disconnected_errors:
333                    # If the pipe was disconnected, return 0.
334                    return (0, "")
335                else:
336                    return (errno.EINVAL, "")
337        else:
338            (errCode, self._read_buffer) = winutils.read_file(self.pipe,
339                                                              n,
340                                                              self._read)
341            if errCode:
342                if errCode == winutils.winerror.ERROR_IO_PENDING:
343                    self._read_pending = True
344                    return (errno.EAGAIN, "")
345                elif errCode in winutils.pipe_disconnected_errors:
346                    # If the pipe was disconnected, return 0.
347                    return (0, "")
348                else:
349                    return (errCode, "")
350
351            try:
352                nBytesRead = winutils.get_overlapped_result(self.pipe,
353                                                            self._read,
354                                                            False)
355                winutils.win32event.SetEvent(self._read.hEvent)
356            except pywintypes.error as e:
357                if e.winerror in winutils.pipe_disconnected_errors:
358                    # If the pipe was disconnected, return 0.
359                    return (0, "")
360                else:
361                    return (e.winerror, "")
362
363        recvBuffer = self._read_buffer[:nBytesRead]
364        # recvBuffer will have the type memoryview in Python3.
365        # We can use bytes to convert it to type bytes which works on
366        # both Python2 and Python3.
367        return (0, bytes(recvBuffer))
368
369    def send(self, buf):
370        """Tries to send 'buf' on this stream.
371
372        If successful, returns the number of bytes sent, between 1 and
373        len(buf).  0 is only a valid return value if len(buf) is 0.
374
375        On error, returns a negative errno value.
376
377        Will not block.  If no bytes can be immediately accepted for
378        transmission, returns -errno.EAGAIN immediately."""
379
380        retval = self.connect()
381        if retval != 0:
382            return -retval
383        elif len(buf) == 0:
384            return 0
385
386        # Python 3 has separate types for strings and bytes.  We must have
387        # bytes here.
388        if six.PY3 and not isinstance(buf, bytes):
389            buf = bytes(buf, 'utf-8')
390        elif six.PY2:
391            buf = buf.encode('utf-8')
392
393        if sys.platform == 'win32' and self.socket is None:
394            return self.__send_windows(buf)
395
396        try:
397            return self.socket.send(buf)
398        except socket.error as e:
399            return -ovs.socket_util.get_exception_errno(e)
400
401    def __send_windows(self, buf):
402        if self._write_pending:
403            try:
404                nBytesWritten = winutils.get_overlapped_result(self.pipe,
405                                                               self._write,
406                                                               False)
407                self._write_pending = False
408            except pywintypes.error as e:
409                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
410                    # The operation is still pending, try again
411                    self._read_pending = True
412                    return -errno.EAGAIN
413                elif e.winerror in winutils.pipe_disconnected_errors:
414                    # If the pipe was disconnected, return connection reset.
415                    return -errno.ECONNRESET
416                else:
417                    return -errno.EINVAL
418        else:
419            (errCode, nBytesWritten) = winutils.write_file(self.pipe,
420                                                           buf,
421                                                           self._write)
422            if errCode:
423                if errCode == winutils.winerror.ERROR_IO_PENDING:
424                    self._write_pending = True
425                    return -errno.EAGAIN
426                if (not nBytesWritten and
427                        errCode in winutils.pipe_disconnected_errors):
428                    # If the pipe was disconnected, return connection reset.
429                    return -errno.ECONNRESET
430        return nBytesWritten
431
432    def run(self):
433        pass
434
435    def run_wait(self, poller):
436        pass
437
438    def wait(self, poller, wait):
439        assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
440
441        if self.state == Stream.__S_DISCONNECTED:
442            poller.immediate_wake()
443            return
444
445        if self.state == Stream.__S_CONNECTING:
446            wait = Stream.W_CONNECT
447
448        if sys.platform == 'win32':
449            self.__wait_windows(poller, wait)
450            return
451
452        if wait == Stream.W_RECV:
453            poller.fd_wait(self.socket, ovs.poller.POLLIN)
454        else:
455            poller.fd_wait(self.socket, ovs.poller.POLLOUT)
456
457    def __wait_windows(self, poller, wait):
458        if self.socket is not None:
459            if wait == Stream.W_RECV:
460                mask = (win32file.FD_READ |
461                        win32file.FD_ACCEPT |
462                        win32file.FD_CLOSE)
463                event = ovs.poller.POLLIN
464            else:
465                mask = (win32file.FD_WRITE |
466                        win32file.FD_CONNECT |
467                        win32file.FD_CLOSE)
468                event = ovs.poller.POLLOUT
469
470            try:
471                win32file.WSAEventSelect(self.socket,
472                                         self._wevent,
473                                         mask)
474            except pywintypes.error as e:
475                vlog.err("failed to associate events with socket: %s"
476                         % e.strerror)
477            poller.fd_wait(self._wevent, event)
478        else:
479            if wait == Stream.W_RECV:
480                if self._read:
481                    poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
482            elif wait == Stream.W_SEND:
483                if self._write:
484                    poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT)
485            elif wait == Stream.W_CONNECT:
486                return
487
488    def connect_wait(self, poller):
489        self.wait(poller, Stream.W_CONNECT)
490
491    def recv_wait(self, poller):
492        self.wait(poller, Stream.W_RECV)
493
494    def send_wait(self, poller):
495        self.wait(poller, Stream.W_SEND)
496
497    def __del__(self):
498        # Don't delete the file: we might have forked.
499        if self.socket is not None:
500            self.socket.close()
501        if self.pipe is not None:
502            # Check if there are any remaining valid handles and close them
503            if self.pipe:
504                winutils.close_handle(self.pipe)
505            if self._read.hEvent:
506                winutils.close_handle(self._read.hEvent)
507            if self._write.hEvent:
508                winutils.close_handle(self._write.hEvent)
509
510    @staticmethod
511    def ssl_set_private_key_file(file_name):
512        Stream._SSL_private_key_file = file_name
513
514    @staticmethod
515    def ssl_set_certificate_file(file_name):
516        Stream._SSL_certificate_file = file_name
517
518    @staticmethod
519    def ssl_set_ca_cert_file(file_name):
520        Stream._SSL_ca_cert_file = file_name
521
522
523class PassiveStream(object):
524    # Windows only
525    connect = None                  # overlapped for read operation
526    connect_pending = False
527
528    @staticmethod
529    def needs_probes(name):
530        return False if name.startswith("punix:") else True
531
532    @staticmethod
533    def is_valid_name(name):
534        """Returns True if 'name' is a passive stream name in the form
535        "TYPE:ARGS" and TYPE is a supported passive stream type (currently
536        "punix:" or "ptcp"), otherwise False."""
537        return name.startswith("punix:") | name.startswith("ptcp:")
538
539    def __init__(self, sock, name, bind_path, pipe=None):
540        self.name = name
541        self.pipe = pipe
542        self.socket = sock
543        if pipe is not None:
544            self.connect = pywintypes.OVERLAPPED()
545            self.connect.hEvent = winutils.get_new_event()
546            self.connect_pending = False
547            suffix = name.split(":", 1)[1]
548            suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
549            self._pipename = winutils.get_pipe_name(suffix)
550
551        self.bind_path = bind_path
552
553    @staticmethod
554    def open(name):
555        """Attempts to start listening for remote stream connections.  'name'
556        is a connection name in the form "TYPE:ARGS", where TYPE is an passive
557        stream class's name and ARGS are stream class-specific. Currently the
558        supported values for TYPE are "punix" and "ptcp".
559
560        Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
561        new PassiveStream, on failure 'error' is a positive errno value and
562        'pstream' is None."""
563        if not PassiveStream.is_valid_name(name):
564            return errno.EAFNOSUPPORT, None
565
566        bind_path = name[6:]
567        if name.startswith("punix:"):
568            bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path)
569            if sys.platform != 'win32':
570                error, sock = ovs.socket_util.make_unix_socket(
571                    socket.SOCK_STREAM, True, bind_path, None)
572                if error:
573                    return error, None
574            else:
575                # Branch used only on Windows
576                try:
577                    open(bind_path, 'w').close()
578                except:
579                    return errno.ENOENT, None
580
581                pipename = winutils.get_pipe_name(bind_path)
582                if len(pipename) > 255:
583                    # Return invalid argument if the name is too long
584                    return errno.ENOENT, None
585
586                npipe = winutils.create_named_pipe(pipename)
587                if not npipe:
588                    return errno.ENOENT, None
589                return 0, PassiveStream(None, name, bind_path, pipe=npipe)
590
591        elif name.startswith("ptcp:"):
592            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
593            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
594            remote = name.split(':')
595            sock.bind((remote[1], int(remote[2])))
596
597        else:
598            raise Exception('Unknown connection string')
599
600        try:
601            sock.listen(10)
602        except socket.error as e:
603            vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
604            sock.close()
605            return e.error, None
606
607        return 0, PassiveStream(sock, name, bind_path)
608
609    def close(self):
610        """Closes this PassiveStream."""
611        if self.socket is not None:
612            self.socket.close()
613        if self.pipe is not None:
614            winutils.close_handle(self.pipe, vlog.warn)
615            winutils.close_handle(self.connect.hEvent, vlog.warn)
616        if self.bind_path is not None:
617            ovs.fatal_signal.unlink_file_now(self.bind_path)
618            self.bind_path = None
619
620    def accept(self):
621        """Tries to accept a new connection on this passive stream.  Returns
622        (error, stream): if successful, 'error' is 0 and 'stream' is the new
623        Stream object, and on failure 'error' is a positive errno value and
624        'stream' is None.
625
626        Will not block waiting for a connection.  If no connection is ready to
627        be accepted, returns (errno.EAGAIN, None) immediately."""
628        if sys.platform == 'win32' and self.socket is None:
629            return self.__accept_windows()
630        while True:
631            try:
632                sock, addr = self.socket.accept()
633                ovs.socket_util.set_nonblocking(sock)
634                if (sys.platform != 'win32' and sock.family == socket.AF_UNIX):
635                    return 0, Stream(sock, "unix:%s" % addr, 0)
636                return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0],
637                                                       str(addr[1])), 0)
638            except socket.error as e:
639                error = ovs.socket_util.get_exception_errno(e)
640                if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
641                    # WSAEWOULDBLOCK would be the equivalent on Windows
642                    # for EAGAIN on Unix.
643                    error = errno.EAGAIN
644                if error != errno.EAGAIN:
645                    # XXX rate-limit
646                    vlog.dbg("accept: %s" % os.strerror(error))
647                return error, None
648
649    def __accept_windows(self):
650        if self.connect_pending:
651            try:
652                winutils.get_overlapped_result(self.pipe, self.connect, False)
653            except pywintypes.error as e:
654                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
655                    # The operation is still pending, try again
656                    self.connect_pending = True
657                    return errno.EAGAIN, None
658                else:
659                    if self.pipe:
660                        win32pipe.DisconnectNamedPipe(self.pipe)
661                    return errno.EINVAL, None
662            self.connect_pending = False
663
664        error = winutils.connect_named_pipe(self.pipe, self.connect)
665        if error:
666            if error == winutils.winerror.ERROR_IO_PENDING:
667                self.connect_pending = True
668                return errno.EAGAIN, None
669            elif error != winutils.winerror.ERROR_PIPE_CONNECTED:
670                if self.pipe:
671                    win32pipe.DisconnectNamedPipe(self.pipe)
672                self.connect_pending = False
673                return errno.EINVAL, None
674            else:
675                win32event.SetEvent(self.connect.hEvent)
676
677        npipe = winutils.create_named_pipe(self._pipename)
678        if not npipe:
679            return errno.ENOENT, None
680
681        old_pipe = self.pipe
682        self.pipe = npipe
683        winutils.win32event.ResetEvent(self.connect.hEvent)
684        return 0, Stream(None, self.name, 0, pipe=old_pipe)
685
686    def wait(self, poller):
687        if sys.platform != 'win32' or self.socket is not None:
688            poller.fd_wait(self.socket, ovs.poller.POLLIN)
689        else:
690            poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
691
692    def __del__(self):
693        # Don't delete the file: we might have forked.
694        if self.socket is not None:
695            self.socket.close()
696        if self.pipe is not None:
697            # Check if there are any remaining valid handles and close them
698            if self.pipe:
699                winutils.close_handle(self.pipe)
700            if self._connect.hEvent:
701                winutils.close_handle(self._read.hEvent)
702
703
704def usage(name):
705    return """
706Active %s connection methods:
707  unix:FILE               Unix domain socket named FILE
708  tcp:IP:PORT             TCP socket to IP with port no of PORT
709  ssl:IP:PORT             SSL socket to IP with port no of PORT
710
711Passive %s connection methods:
712  punix:FILE              Listen on Unix domain socket FILE""" % (name, name)
713
714
715class UnixStream(Stream):
716    @staticmethod
717    def needs_probes():
718        return False
719
720    @staticmethod
721    def _open(suffix, dscp):
722        connect_path = suffix
723        return ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
724                                                True, None, connect_path)
725
726
727Stream.register_method("unix", UnixStream)
728
729
730class TCPStream(Stream):
731    @staticmethod
732    def needs_probes():
733        return True
734
735    @staticmethod
736    def _open(suffix, dscp):
737        error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM,
738                                                       suffix, 0, dscp)
739        if not error:
740            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
741        return error, sock
742
743
744Stream.register_method("tcp", TCPStream)
745
746
747class SSLStream(Stream):
748    @staticmethod
749    def needs_probes():
750        return True
751
752    @staticmethod
753    def verify_cb(conn, cert, errnum, depth, ok):
754        return ok
755
756    @staticmethod
757    def _open(suffix, dscp):
758        error, sock = TCPStream._open(suffix, dscp)
759        if error:
760            return error, None
761
762        # Create an SSL context
763        ctx = SSL.Context(SSL.SSLv23_METHOD)
764        ctx.set_verify(SSL.VERIFY_PEER, SSLStream.verify_cb)
765        ctx.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3)
766        # If the client has not set the SSL configuration files
767        # exception would be raised.
768        ctx.use_privatekey_file(Stream._SSL_private_key_file)
769        ctx.use_certificate_file(Stream._SSL_certificate_file)
770        ctx.load_verify_locations(Stream._SSL_ca_cert_file)
771
772        ssl_sock = SSL.Connection(ctx, sock)
773        ssl_sock.set_connect_state()
774        return error, ssl_sock
775
776    def connect(self):
777        retval = super(SSLStream, self).connect()
778
779        if retval:
780            return retval
781
782        # TCP Connection is successful. Now do the SSL handshake
783        try:
784            self.socket.do_handshake()
785        except SSL.WantReadError:
786            return errno.EAGAIN
787        except SSL.SysCallError as e:
788            return ovs.socket_util.get_exception_errno(e)
789
790        return 0
791
792    def recv(self, n):
793        try:
794            return super(SSLStream, self).recv(n)
795        except SSL.WantReadError:
796            return (errno.EAGAIN, "")
797        except SSL.SysCallError as e:
798            return (ovs.socket_util.get_exception_errno(e), "")
799        except SSL.ZeroReturnError:
800            return (0, "")
801
802    def send(self, buf):
803        try:
804            if isinstance(buf, six.text_type):
805                # Convert to byte stream if the buffer is string type/unicode.
806                # pyopenssl version 0.14 expects the buffer to be byte string.
807                buf = buf.encode('utf-8')
808            return super(SSLStream, self).send(buf)
809        except SSL.WantWriteError:
810            return -errno.EAGAIN
811        except SSL.SysCallError as e:
812            return -ovs.socket_util.get_exception_errno(e)
813
814
815if SSL:
816    # Register SSL only if the OpenSSL module is available
817    Stream.register_method("ssl", SSLStream)
818