# Copyright 2014,2016 Christoph Reiter
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

import os
import threading
import ctypes

if os.name == "nt":
    from . import winapi

from gi.repository import GLib


# Win32 error code reported by ConnectNamedPipe() when a client connected
# between CreateNamedPipe() and ConnectNamedPipe(). The connection is valid.
_ERROR_PIPE_CONNECTED = 535


def write_pipe(pipe_name, data):
    """Writes the data to the pipe or raises EnvironmentError.

    pipe_name is the bare pipe name (not the full pipe path), data must
    be a bytes instance.
    """

    assert isinstance(data, bytes)

    # XXX: otherwise many consecutive open fail, no idea..
    pipe_exists(pipe_name)

    filename = NamedPipeServer._get_filename(pipe_name)
    with open(filename, "wb") as h:
        h.write(data)


def pipe_exists(pipe_name):
    """Returns True if the named pipe named 'pipe_name' currently exists.

    Waits at most one millisecond for the pipe; any WindowsError is
    reported as "does not exist".
    """

    timeout_ms = 1
    filename = NamedPipeServer._get_filename(pipe_name)

    try:
        if winapi.WaitNamedPipeW(filename, timeout_ms) == 0:
            raise ctypes.WinError()
    except WindowsError:
        return False
    return True


class NamedPipeServerError(Exception):
    """Raised by NamedPipeServer.start() if the pipe could not be set up"""

    pass


class NamedPipeServer(threading.Thread):
    """A named pipe for Windows.

    * server:
        server = NamedPipeServer("foo", lambda data: ...)
        server.start()
        glib_loop()
        server.stop()

    * client:
        write_pipe("foo", b"Hello World")

    """

    def __init__(self, name, callback):
        """name is the name of the pipe file (should be unique I guess)
        callback will be called with new data until stop() is called.
        """

        super(NamedPipeServer, self).__init__()
        # set by run() once pipe creation has either succeeded or failed
        self._event = threading.Event()
        self._filename = self._get_filename(name)
        self._callback = callback
        # True after stop() was called or if creating the pipe failed
        self._stopped = False

    @classmethod
    def _get_filename(cls, name):
        """Returns the full pipe path for the pipe name"""

        return u"\\\\.\\pipe\\%s" % name

    def _process(self, data):
        """Schedules the callback with data through GLib.idle_add().

        The callback is skipped if the server got stopped in the meantime.
        """

        def idle_process(data):
            if not self._stopped:
                self._callback(data)
            # run only once
            return False

        GLib.idle_add(idle_process, data)

    def start(self):
        """Starts the server thread and waits until the pipe is set up.

        Raises NamedPipeServerError if creating the pipe failed.
        """

        super(NamedPipeServer, self).start()
        # make sure we can use write_pipe() immediately after this returns
        self._event.wait()
        if self._stopped:
            # something went wrong (maybe another instance is running)
            raise NamedPipeServerError("Setting up named pipe failed")

    def _serve_client(self, handle, buffer_size, data):
        """Waits for one client and appends everything it sends to data.

        data is a bytearray which gets extended in place, so the caller
        keeps whatever was received even if this raises WindowsError
        (connecting or disconnecting failed).
        """

        if winapi.ConnectNamedPipe(handle, None) == 0:
            error = ctypes.WinError()
            # A client which connected before we got here is reported as
            # a "failure" with ERROR_PIPE_CONNECTED, but the connection
            # is good, so only give up on any other error.
            if error.winerror != _ERROR_PIPE_CONNECTED:
                raise error

        while True:
            readbuf = ctypes.create_string_buffer(buffer_size)
            bytesread = winapi.DWORD()
            try:
                if winapi.ReadFile(
                        handle, readbuf, buffer_size,
                        ctypes.byref(bytesread), None) == 0:
                    raise ctypes.WinError()
            except WindowsError:
                # a failing read ends the message (e.g. the client is done)
                break

            data += readbuf[:bytesread.value]

        if winapi.DisconnectNamedPipe(handle) == 0:
            raise ctypes.WinError()

    def run(self):
        """Thread body: creates the pipe and serves clients until stopped.

        Each client connection yields one message which gets passed to
        the callback through _process(). The loop ends once stop() was
        called or a pipe operation fails.
        """

        buffer_size = 4096

        try:
            handle = winapi.CreateNamedPipeW(
                self._filename,
                (winapi.PIPE_ACCESS_INBOUND |
                 winapi.FILE_FLAG_FIRST_PIPE_INSTANCE),
                (winapi.PIPE_TYPE_BYTE | winapi.PIPE_READMODE_BYTE |
                 winapi.PIPE_WAIT | winapi.PIPE_REJECT_REMOTE_CLIENTS),
                winapi.PIPE_UNLIMITED_INSTANCES,
                buffer_size,
                buffer_size,
                winapi.NMPWAIT_USE_DEFAULT_WAIT,
                None)

            if handle == winapi.INVALID_HANDLE_VALUE:
                raise ctypes.WinError()

        except WindowsError:
            # due to FILE_FLAG_FIRST_PIPE_INSTANCE and not the first instance
            self._stopped = True
            self._event.set()
            return

        self._event.set()

        try:
            while True:
                data = bytearray()
                try:
                    self._serve_client(handle, buffer_size, data)
                except WindowsError:
                    # better not loop forever..
                    pipe_ok = False
                else:
                    pipe_ok = True

                if self._stopped:
                    # this was the wake-up message written by stop()
                    break
                if data:
                    self._process(bytes(data))
                if not pipe_ok:
                    break
        finally:
            # ignore errors here..
            winapi.CloseHandle(handle)

    def stop(self):
        """After this returns the callback will no longer be called.
        Can be called multiple times.
        """

        self._event.wait()
        if self._stopped:
            return

        self._stopped = True
        try:
            # wake up the server thread which is blocked waiting for a
            # client, so it notices the stopped flag
            with open(self._filename, "wb") as h:
                h.write(b"stop!")
        except EnvironmentError:
            pass

        self._callback = None

        self.join()