1# Copyright 2014,2016 Christoph Reiter
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation; either version 2 of the License, or
6# (at your option) any later version.
7
8import os
9import threading
10import ctypes
11
12if os.name == "nt":
13    from . import winapi
14
15from gi.repository import GLib
16
17
def write_pipe(pipe_name, data):
    """Send `data` (bytes) to the named pipe called `pipe_name`.

    Raises EnvironmentError if the pipe can't be opened or written to.
    """

    assert isinstance(data, bytes)

    # XXX: probing the pipe first works around failures seen when the pipe
    # is opened many times in a row; the reason is unknown.
    pipe_exists(pipe_name)

    path = NamedPipeServer._get_filename(pipe_name)
    with open(path, "wb") as pipe:
        pipe.write(data)
29
30
def pipe_exists(pipe_name):
    """Check whether the named pipe called 'pipe_name' currently exists.

    Returns True if WaitNamedPipeW() succeeds for the pipe (waiting at most
    one millisecond) and False if it fails.
    """

    path = NamedPipeServer._get_filename(pipe_name)
    wait_ms = 1

    try:
        available = winapi.WaitNamedPipeW(path, wait_ms) != 0
        if not available:
            raise ctypes.WinError()
    except WindowsError:
        return False
    else:
        return True
43
44
class NamedPipeServerError(Exception):
    """Raised by NamedPipeServer.start() if the pipe could not be set up."""
47
48
class NamedPipeServer(threading.Thread):
    """A named pipe server for Windows.

    The pipe is served on this thread; received data is passed to the
    callback through GLib.idle_add().

    * server:
        server = NamedPipeServer("foo", lambda data: ...)
        server.start()
        glib_loop()
        server.stop()

    * client:
        write_pipe("foo", b"Hello World")

    """

    # Win32 error code reported by ConnectNamedPipe() when the client
    # connected before the call was made (the connection is usable).
    _ERROR_PIPE_CONNECTED = 535

    def __init__(self, name, callback):
        """name is the name of the pipe file (should be unique I guess)
        callback will be called with new data until stop() is called.
        """

        super(NamedPipeServer, self).__init__()
        # set by run() once pipe creation has either succeeded or failed
        self._event = threading.Event()
        self._filename = self._get_filename(name)
        self._callback = callback
        self._stopped = False

    @classmethod
    def _get_filename(cls, name):
        """Returns the full pipe path for the pipe called `name`"""

        return u"\\\\.\\pipe\\%s" % name

    def _process(self, data):
        """Schedules the callback with `data` using GLib.idle_add().

        The callback is skipped if the server got stopped in the meantime.
        """

        def idle_process(data):
            if not self._stopped:
                self._callback(data)
            # returning False removes the idle source again
            return False

        GLib.idle_add(idle_process, data)

    def start(self):
        """Starts the server thread and waits until the pipe is set up.

        Raises NamedPipeServerError if creating the pipe failed.
        """

        super(NamedPipeServer, self).start()
        # make sure we can use write_pipe() immediately after this returns
        self._event.wait()
        if self._stopped:
            # something went wrong (maybe another instance is running)
            raise NamedPipeServerError("Setting up named pipe failed")

    def _read_client(self, handle, buffer_size, data):
        """Waits for one client, appends everything it sends to the
        bytearray `data` and disconnects it again.

        Raises WindowsError if connecting or disconnecting fails. A failing
        read only ends the transfer.
        """

        if winapi.ConnectNamedPipe(handle, None) == 0:
            error = ctypes.WinError()
            # A client which connected between CreateNamedPipe() and
            # ConnectNamedPipe() makes the call return 0 with
            # ERROR_PIPE_CONNECTED even though the connection is fine.
            if error.winerror != self._ERROR_PIPE_CONNECTED:
                raise error

        readbuf = ctypes.create_string_buffer(buffer_size)
        bytesread = winapi.DWORD()
        while True:
            try:
                if winapi.ReadFile(
                        handle, readbuf, buffer_size,
                        ctypes.byref(bytesread), None) == 0:
                    raise ctypes.WinError()
            except WindowsError:
                # read failed (e.g. the client closed its end): done
                break

            # in-place, so the caller sees partial data even on errors
            data += readbuf[:bytesread.value]

        if winapi.DisconnectNamedPipe(handle) == 0:
            raise ctypes.WinError()

    def run(self):
        """Thread body: creates the pipe and serves one client at a time.

        Everything a client writes before disconnecting is passed to
        _process() as one bytes object. The loop ends once stop() was
        called or connecting/disconnecting a client failed.
        """

        buffer_size = 4096

        try:
            handle = winapi.CreateNamedPipeW(
                self._filename,
                (winapi.PIPE_ACCESS_INBOUND |
                 winapi.FILE_FLAG_FIRST_PIPE_INSTANCE),
                (winapi.PIPE_TYPE_BYTE | winapi.PIPE_READMODE_BYTE |
                 winapi.PIPE_WAIT | winapi.PIPE_REJECT_REMOTE_CLIENTS),
                winapi.PIPE_UNLIMITED_INSTANCES,
                buffer_size,
                buffer_size,
                winapi.NMPWAIT_USE_DEFAULT_WAIT,
                None)

            if handle == winapi.INVALID_HANDLE_VALUE:
                raise ctypes.WinError()

        except WindowsError:
            # due to FILE_FLAG_FIRST_PIPE_INSTANCE and not the first instance
            self._stopped = True
            self._event.set()
            return

        self._event.set()

        try:
            while True:
                data = bytearray()
                failed = False
                try:
                    self._read_client(handle, buffer_size, data)
                except WindowsError:
                    # better not loop forever..
                    failed = True

                if self._stopped:
                    # data received at this point is the wake-up message
                    # written by stop(), don't forward it
                    break
                if data:
                    self._process(bytes(data))
                if failed:
                    break
        finally:
            # ignore errors here..
            winapi.CloseHandle(handle)

    def stop(self):
        """After this returns the callback will no longer be called.
        Can be called multiple times.

        Does nothing if the server was never started.
        """

        if self.ident is None:
            # The thread never ran, so the setup event will never be set
            # and waiting for it below would block forever.
            return

        self._event.wait()
        if self._stopped:
            return

        self._stopped = True
        try:
            # connect as a client so the server thread wakes up from
            # waiting for a connection and notices that it got stopped
            with open(self._filename, "wb") as h:
                h.write(b"stop!")
        except EnvironmentError:
            # best effort; the server thread might be gone already
            pass

        self._callback = None

        self.join()
176