1 /* $NetBSD: event_iocp.c,v 1.1.1.1 2013/04/11 16:43:19 christos Exp $ */ 2 /* 3 * Copyright (c) 2009-2012 Niels Provos, Nick Mathewson 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 18 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 19 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 20 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 21 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 22 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 23 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #ifndef _WIN32_WINNT 29 /* Minimum required for InitializeCriticalSectionAndSpinCount */ 30 #define _WIN32_WINNT 0x0403 31 #endif 32 #include <winsock2.h> 33 #include <windows.h> 34 #include <process.h> 35 #include <stdio.h> 36 #include <mswsock.h> 37 38 #include "event2/util.h" 39 #include "util-internal.h" 40 #include "iocp-internal.h" 41 #include "log-internal.h" 42 #include "mm-internal.h" 43 #include "event-internal.h" 44 #include "evthread-internal.h" 45 46 #define NOTIFICATION_KEY ((ULONG_PTR)-1) 47 48 void 49 event_overlapped_init(struct event_overlapped *o, iocp_callback cb) 50 { 51 memset(o, 0, sizeof(struct event_overlapped)); 52 o->cb = cb; 53 } 54 55 static void 56 handle_entry(OVERLAPPED *o, ULONG_PTR completion_key, DWORD nBytes, int ok) 57 { 58 struct event_overlapped *eo = 59 EVUTIL_UPCAST(o, struct event_overlapped, overlapped); 60 eo->cb(eo, completion_key, nBytes, ok); 61 } 62 63 static void 64 loop(void *_port) 65 { 66 struct event_iocp_port *port = _port; 67 long ms = port->ms; 68 HANDLE p = port->port; 69 70 if (ms <= 0) 71 ms = INFINITE; 72 73 while (1) { 74 OVERLAPPED *overlapped=NULL; 75 ULONG_PTR key=0; 76 DWORD bytes=0; 77 int ok = GetQueuedCompletionStatus(p, &bytes, &key, 78 &overlapped, ms); 79 EnterCriticalSection(&port->lock); 80 if (port->shutdown) { 81 if (--port->n_live_threads == 0) 82 ReleaseSemaphore(port->shutdownSemaphore, 1, 83 NULL); 84 LeaveCriticalSection(&port->lock); 85 return; 86 } 87 LeaveCriticalSection(&port->lock); 88 89 if (key != NOTIFICATION_KEY && overlapped) 90 handle_entry(overlapped, key, bytes, ok); 91 else if (!overlapped) 92 break; 93 } 94 event_warnx("GetQueuedCompletionStatus exited with no event."); 95 EnterCriticalSection(&port->lock); 96 if (--port->n_live_threads == 0) 97 ReleaseSemaphore(port->shutdownSemaphore, 1, NULL); 98 LeaveCriticalSection(&port->lock); 99 } 100 101 int 102 event_iocp_port_associate(struct event_iocp_port *port, evutil_socket_t fd, 103 ev_uintptr_t key) 104 { 105 HANDLE h; 106 h = CreateIoCompletionPort((HANDLE)fd, port->port, key, port->n_threads); 107 if (!h) 108 return -1; 109 return 0; 110 } 111 112 static void * 113 get_extension_function(SOCKET s, const GUID *which_fn) 114 { 115 void *ptr = NULL; 116 DWORD bytes=0; 117 WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, 118 (GUID*)which_fn, sizeof(*which_fn), 119 &ptr, sizeof(ptr), 120 &bytes, NULL, NULL); 121 122 /* No need to detect errors here: if ptr is set, then we have a good 123 function pointer. Otherwise, we should behave as if we had no 124 function pointer. 125 */ 126 return ptr; 127 } 128 129 /* Mingw doesn't have these in its mswsock.h. The values are copied from 130 wine.h. Perhaps if we copy them exactly, the cargo will come again. 131 */ 132 #ifndef WSAID_ACCEPTEX 133 #define WSAID_ACCEPTEX \ 134 {0xb5367df1,0xcbac,0x11cf,{0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92}} 135 #endif 136 #ifndef WSAID_CONNECTEX 137 #define WSAID_CONNECTEX \ 138 {0x25a207b9,0xddf3,0x4660,{0x8e,0xe9,0x76,0xe5,0x8c,0x74,0x06,0x3e}} 139 #endif 140 #ifndef WSAID_GETACCEPTEXSOCKADDRS 141 #define WSAID_GETACCEPTEXSOCKADDRS \ 142 {0xb5367df2,0xcbac,0x11cf,{0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92}} 143 #endif 144 145 static void 146 init_extension_functions(struct win32_extension_fns *ext) 147 { 148 const GUID acceptex = WSAID_ACCEPTEX; 149 const GUID connectex = WSAID_CONNECTEX; 150 const GUID getacceptexsockaddrs = WSAID_GETACCEPTEXSOCKADDRS; 151 SOCKET s = socket(AF_INET, SOCK_STREAM, 0); 152 if (s == INVALID_SOCKET) 153 return; 154 ext->AcceptEx = get_extension_function(s, &acceptex); 155 ext->ConnectEx = get_extension_function(s, &connectex); 156 ext->GetAcceptExSockaddrs = get_extension_function(s, 157 &getacceptexsockaddrs); 158 closesocket(s); 159 } 160 161 static struct win32_extension_fns the_extension_fns; 162 static int extension_fns_initialized = 0; 163 164 const struct win32_extension_fns * 165 event_get_win32_extension_fns(void) 166 { 167 return &the_extension_fns; 168 } 169 170 #define N_CPUS_DEFAULT 2 171 172 struct event_iocp_port * 173 event_iocp_port_launch(int n_cpus) 174 { 175 struct event_iocp_port *port; 176 int i; 177 178 if (!extension_fns_initialized) 179 init_extension_functions(&the_extension_fns); 180 181 if (!(port = mm_calloc(1, sizeof(struct event_iocp_port)))) 182 return NULL; 183 184 if (n_cpus <= 0) 185 n_cpus = N_CPUS_DEFAULT; 186 port->n_threads = n_cpus * 2; 187 port->threads = mm_calloc(port->n_threads, sizeof(HANDLE)); 188 if (!port->threads) 189 goto err; 190 191 port->port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 192 n_cpus); 193 port->ms = -1; 194 if (!port->port) 195 goto err; 196 197 port->shutdownSemaphore = CreateSemaphore(NULL, 0, 1, NULL); 198 if (!port->shutdownSemaphore) 199 goto err; 200 201 for (i=0; i<port->n_threads; ++i) { 202 ev_uintptr_t th = _beginthread(loop, 0, port); 203 if (th == (ev_uintptr_t)-1) 204 goto err; 205 port->threads[i] = (HANDLE)th; 206 ++port->n_live_threads; 207 } 208 209 InitializeCriticalSectionAndSpinCount(&port->lock, 1000); 210 211 return port; 212 err: 213 if (port->port) 214 CloseHandle(port->port); 215 if (port->threads) 216 mm_free(port->threads); 217 if (port->shutdownSemaphore) 218 CloseHandle(port->shutdownSemaphore); 219 mm_free(port); 220 return NULL; 221 } 222 223 static void 224 _event_iocp_port_unlock_and_free(struct event_iocp_port *port) 225 { 226 DeleteCriticalSection(&port->lock); 227 CloseHandle(port->port); 228 CloseHandle(port->shutdownSemaphore); 229 mm_free(port->threads); 230 mm_free(port); 231 } 232 233 static int 234 event_iocp_notify_all(struct event_iocp_port *port) 235 { 236 int i, r, ok=1; 237 for (i=0; i<port->n_threads; ++i) { 238 r = PostQueuedCompletionStatus(port->port, 0, NOTIFICATION_KEY, 239 NULL); 240 if (!r) 241 ok = 0; 242 } 243 return ok ? 0 : -1; 244 } 245 246 int 247 event_iocp_shutdown(struct event_iocp_port *port, long waitMsec) 248 { 249 DWORD ms = INFINITE; 250 int n; 251 252 EnterCriticalSection(&port->lock); 253 port->shutdown = 1; 254 LeaveCriticalSection(&port->lock); 255 event_iocp_notify_all(port); 256 257 if (waitMsec >= 0) 258 ms = waitMsec; 259 260 WaitForSingleObject(port->shutdownSemaphore, ms); 261 EnterCriticalSection(&port->lock); 262 n = port->n_live_threads; 263 LeaveCriticalSection(&port->lock); 264 if (n == 0) { 265 _event_iocp_port_unlock_and_free(port); 266 return 0; 267 } else { 268 return -1; 269 } 270 } 271 272 int 273 event_iocp_activate_overlapped( 274 struct event_iocp_port *port, struct event_overlapped *o, 275 ev_uintptr_t key, ev_uint32_t n) 276 { 277 BOOL r; 278 279 r = PostQueuedCompletionStatus(port->port, n, key, &o->overlapped); 280 return (r==0) ? -1 : 0; 281 } 282 283 struct event_iocp_port * 284 event_base_get_iocp(struct event_base *base) 285 { 286 #ifdef WIN32 287 return base->iocp; 288 #else 289 return NULL; 290 #endif 291 } 292