1 /*
2  * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 43    Windows AIOPS */
10 
11 #include "squid.h"
12 #include "DiskIO/DiskThreads/CommIO.h"
13 #include "DiskThreads.h"
14 #include "fd.h"
15 #include "mem/Pool.h"
16 #include "SquidConfig.h"
17 #include "SquidTime.h"
18 #include "Store.h"
19 
20 #include <cerrno>
21 #include <csignal>
22 #include <sys/stat.h>
23 #include <fcntl.h>
24 #include <dirent.h>
25 
26 #define RIDICULOUS_LENGTH   4096
27 
/* States a worker thread records in squidaio_thread_t::status. */
enum _squidaio_thread_status {
    _THREAD_STARTING = 0,   /* assigned by squidaio_init() before CreateThread() */
    _THREAD_WAITING,        /* assigned at the top of every worker loop iteration */
    _THREAD_BUSY,           /* assigned once a request has been dequeued */
    _THREAD_FAILED,         /* thread creation failed, or a queue mutex release failed */
    _THREAD_DONE            /* assigned after the dequeued request was handled */
};

typedef enum _squidaio_thread_status squidaio_thread_status;
36 
37 typedef struct squidaio_request_t {
38 
39     struct squidaio_request_t *next;
40     squidaio_request_type request_type;
41     int cancelled;
42     char *path;
43     int oflag;
44     mode_t mode;
45     int fd;
46     char *bufferp;
47     char *tmpbufp;
48     size_t buflen;
49     off_t offset;
50     int whence;
51     int ret;
52     int err;
53 
54     struct stat *tmpstatp;
55 
56     struct stat *statp;
57     squidaio_result_t *resultp;
58 } squidaio_request_t;
59 
60 typedef struct squidaio_request_queue_t {
61     HANDLE mutex;
62     HANDLE cond; /* See Event objects */
63     squidaio_request_t *volatile head;
64     squidaio_request_t *volatile *volatile tailp;
65     unsigned long requests;
66     unsigned long blocked;  /* main failed to lock the queue */
67 } squidaio_request_queue_t;
68 
69 typedef struct squidaio_thread_t squidaio_thread_t;
70 
71 struct squidaio_thread_t {
72     squidaio_thread_t *next;
73     HANDLE thread;
74     DWORD dwThreadId; /* thread ID */
75     squidaio_thread_status status;
76 
77     struct squidaio_request_t *current_req;
78     unsigned long requests;
79     int volatile exit;
80 };
81 
82 static void squidaio_queue_request(squidaio_request_t *);
83 static void squidaio_cleanup_request(squidaio_request_t *);
84 static DWORD WINAPI squidaio_thread_loop( LPVOID lpParam );
85 static void squidaio_do_open(squidaio_request_t *);
86 static void squidaio_do_read(squidaio_request_t *);
87 static void squidaio_do_write(squidaio_request_t *);
88 static void squidaio_do_close(squidaio_request_t *);
89 static void squidaio_do_stat(squidaio_request_t *);
90 static void squidaio_do_unlink(squidaio_request_t *);
91 #if AIO_OPENDIR
92 static void *squidaio_do_opendir(squidaio_request_t *);
93 #endif
94 static void squidaio_debug(squidaio_request_t *);
95 static void squidaio_poll_queues(void);
96 
97 static squidaio_thread_t *threads = NULL;
98 static int squidaio_initialised = 0;
99 
/*
 * Buffer size classes, in bytes, used to pick a memory pool.
 * Each expansion is parenthesized so the macros behave as single values
 * inside larger expressions (e.g. AIO_MEDIUM_BUFS + 1).
 */
#define AIO_LARGE_BUFS  16384
#define AIO_MEDIUM_BUFS (AIO_LARGE_BUFS >> 1)
#define AIO_SMALL_BUFS  (AIO_LARGE_BUFS >> 2)
#define AIO_TINY_BUFS   (AIO_LARGE_BUFS >> 3)
#define AIO_MICRO_BUFS  128
105 
106 static MemAllocator *squidaio_large_bufs = NULL;    /* 16K */
107 static MemAllocator *squidaio_medium_bufs = NULL;   /* 8K */
108 static MemAllocator *squidaio_small_bufs = NULL;    /* 4K */
109 static MemAllocator *squidaio_tiny_bufs = NULL; /* 2K */
110 static MemAllocator *squidaio_micro_bufs = NULL;    /* 128K */
111 
112 static int request_queue_len = 0;
113 static MemAllocator *squidaio_request_pool = NULL;
114 static MemAllocator *squidaio_thread_pool = NULL;
115 static squidaio_request_queue_t request_queue;
116 
117 static struct {
118     squidaio_request_t *head, **tailp;
119 }
120 
121 request_queue2 = {
122 
123     NULL, &request_queue2.head
124 };
125 static squidaio_request_queue_t done_queue;
126 
127 static struct {
128     squidaio_request_t *head, **tailp;
129 }
130 
131 done_requests = {
132 
133     NULL, &done_requests.head
134 };
135 
136 static HANDLE main_thread;
137 
138 static MemAllocator *
squidaio_get_pool(int size)139 squidaio_get_pool(int size)
140 {
141     if (size <= AIO_LARGE_BUFS) {
142         if (size <= AIO_MICRO_BUFS)
143             return squidaio_micro_bufs;
144         else if (size <= AIO_TINY_BUFS)
145             return squidaio_tiny_bufs;
146         else if (size <= AIO_SMALL_BUFS)
147             return squidaio_small_bufs;
148         else if (size <= AIO_MEDIUM_BUFS)
149             return squidaio_medium_bufs;
150         else
151             return squidaio_large_bufs;
152     }
153 
154     return NULL;
155 }
156 
157 void *
squidaio_xmalloc(int size)158 squidaio_xmalloc(int size)
159 {
160     void *p;
161     MemAllocator *pool;
162 
163     if ((pool = squidaio_get_pool(size)) != NULL) {
164         p = pool->alloc();
165     } else
166         p = xmalloc(size);
167 
168     return p;
169 }
170 
171 static char *
squidaio_xstrdup(const char * str)172 squidaio_xstrdup(const char *str)
173 {
174     char *p;
175     int len = strlen(str) + 1;
176 
177     p = (char *)squidaio_xmalloc(len);
178     strncpy(p, str, len);
179 
180     return p;
181 }
182 
183 void
squidaio_xfree(void * p,int size)184 squidaio_xfree(void *p, int size)
185 {
186     MemAllocator *pool;
187 
188     if ((pool = squidaio_get_pool(size)) != NULL) {
189         pool->freeOne(p);
190     } else
191         xfree(p);
192 }
193 
194 static void
squidaio_xstrfree(char * str)195 squidaio_xstrfree(char *str)
196 {
197     MemAllocator *pool;
198     int len = strlen(str) + 1;
199 
200     if ((pool = squidaio_get_pool(len)) != NULL) {
201         pool->freeOne(str);
202     } else
203         xfree(str);
204 }
205 
206 void
squidaio_init(void)207 squidaio_init(void)
208 {
209     int i;
210     squidaio_thread_t *threadp;
211 
212     if (squidaio_initialised)
213         return;
214 
215     if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
216                          GetCurrentThread(),  /* pseudo handle to copy */
217                          GetCurrentProcess(), /* pseudo handle, don't close */
218                          &main_thread,
219                          0,                   /* required access */
220                          FALSE,               /* child process's don't inherit the handle */
221                          DUPLICATE_SAME_ACCESS)) {
222         /* spit errors */
223         fatal("Couldn't get current thread handle");
224     }
225 
226     /* Initialize request queue */
227     if ((request_queue.mutex = CreateMutex(NULL,    /* no inheritance */
228                                            FALSE,   /* start unowned (as per mutex_init) */
229                                            NULL)    /* no name */
230         ) == NULL) {
231         fatal("Failed to create mutex");
232     }
233 
234     if ((request_queue.cond = CreateEvent(NULL,     /* no inheritance */
235                                           FALSE,    /* auto signal reset - which I think is pthreads like ? */
236                                           FALSE,    /* start non signaled */
237                                           NULL)     /* no name */
238         ) == NULL) {
239         fatal("Failed to create condition variable");
240     }
241 
242     request_queue.head = NULL;
243 
244     request_queue.tailp = &request_queue.head;
245 
246     request_queue.requests = 0;
247 
248     request_queue.blocked = 0;
249 
250     /* Initialize done queue */
251 
252     if ((done_queue.mutex = CreateMutex(NULL,  /* no inheritance */
253                                         FALSE, /* start unowned (as per mutex_init) */
254                                         NULL)  /* no name */
255         ) == NULL) {
256         fatal("Failed to create mutex");
257     }
258 
259     if ((done_queue.cond = CreateEvent(NULL,  /* no inheritance */
260                                        TRUE,  /* manually signaled - which I think is pthreads like ? */
261                                        FALSE, /* start non signaled */
262                                        NULL)  /* no name */
263         ) == NULL) {
264         fatal("Failed to create condition variable");
265     }
266 
267     done_queue.head = NULL;
268 
269     done_queue.tailp = &done_queue.head;
270 
271     done_queue.requests = 0;
272 
273     done_queue.blocked = 0;
274 
275     // Initialize the thread I/O pipes before creating any threads
276     // see bug 3189 comment 5 about race conditions.
277     CommIO::Initialize();
278 
279     /* Create threads and get them to sit in their wait loop */
280     squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));
281 
282     assert(NUMTHREADS);
283 
284     for (i = 0; i < NUMTHREADS; ++i) {
285         threadp = (squidaio_thread_t *)squidaio_thread_pool->alloc();
286         threadp->status = _THREAD_STARTING;
287         threadp->current_req = NULL;
288         threadp->requests = 0;
289         threadp->next = threads;
290         threads = threadp;
291 
292         if ((threadp->thread = CreateThread(NULL,                   /* no security attributes */
293                                             0,                      /* use default stack size */
294                                             squidaio_thread_loop,   /* thread function */
295                                             threadp,                /* argument to thread function */
296                                             0,                      /* use default creation flags */
297                                             &(threadp->dwThreadId)) /* returns the thread identifier */
298             ) == NULL) {
299             fprintf(stderr, "Thread creation failed\n");
300             threadp->status = _THREAD_FAILED;
301             continue;
302         }
303 
304         /* Set the new thread priority above parent process */
305         SetThreadPriority(threadp->thread,THREAD_PRIORITY_ABOVE_NORMAL);
306     }
307 
308     /* Create request pool */
309     squidaio_request_pool = memPoolCreate("aio_request", sizeof(squidaio_request_t));
310 
311     squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);
312 
313     squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);
314 
315     squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);
316 
317     squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);
318 
319     squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);
320 
321     squidaio_initialised = 1;
322 }
323 
324 void
squidaio_shutdown(void)325 squidaio_shutdown(void)
326 {
327     squidaio_thread_t *threadp;
328     int i;
329     HANDLE * hthreads;
330 
331     if (!squidaio_initialised)
332         return;
333 
334     /* This is the same as in squidaio_sync */
335     do {
336         squidaio_poll_queues();
337     } while (request_queue_len > 0);
338 
339     hthreads = (HANDLE *) xcalloc (NUMTHREADS, sizeof (HANDLE));
340 
341     threadp = threads;
342 
343     for (i = 0; i < NUMTHREADS; ++i) {
344         threadp->exit = 1;
345         hthreads[i] = threadp->thread;
346         threadp = threadp->next;
347     }
348 
349     ReleaseMutex(request_queue.mutex);
350     ResetEvent(request_queue.cond);
351     ReleaseMutex(done_queue.mutex);
352     ResetEvent(done_queue.cond);
353     Sleep(0);
354 
355     WaitForMultipleObjects(NUMTHREADS, hthreads, TRUE, 2000);
356 
357     for (i = 0; i < NUMTHREADS; ++i) {
358         CloseHandle(hthreads[i]);
359     }
360 
361     CloseHandle(main_thread);
362     CommIO::NotifyIOClose();
363 
364     squidaio_initialised = 0;
365     xfree(hthreads);
366 }
367 
368 static DWORD WINAPI
squidaio_thread_loop(LPVOID lpParam)369 squidaio_thread_loop(LPVOID lpParam)
370 {
371     squidaio_thread_t *threadp = (squidaio_thread_t *)lpParam;
372     squidaio_request_t *request;
373     HANDLE cond; /* local copy of the event queue because win32 event handles
374                               * don't atomically release the mutex as cond variables do. */
375 
376     /* lock the thread info */
377 
378     if (WAIT_FAILED == WaitForSingleObject(request_queue.mutex, INFINITE)) {
379         fatal("Can't get ownership of mutex\n");
380     }
381 
382     /* duplicate the handle */
383     if (!DuplicateHandle(GetCurrentProcess(),    /* pseudo handle, don't close */
384                          request_queue.cond,     /* handle to copy */
385                          GetCurrentProcess(),    /* pseudo handle, don't close */
386                          &cond,
387                          0,                      /* required access */
388                          FALSE,                  /* child process's don't inherit the handle */
389                          DUPLICATE_SAME_ACCESS))
390         fatal("Can't duplicate mutex handle\n");
391 
392     if (!ReleaseMutex(request_queue.mutex)) {
393         CloseHandle(cond);
394         fatal("Can't release mutex\n");
395     }
396 
397     Sleep(0);
398 
399     while (1) {
400         DWORD rv;
401         threadp->current_req = request = NULL;
402         request = NULL;
403         /* Get a request to process */
404         threadp->status = _THREAD_WAITING;
405 
406         if (threadp->exit) {
407             CloseHandle(request_queue.mutex);
408             CloseHandle(cond);
409             return 0;
410         }
411 
412         rv = WaitForSingleObject(request_queue.mutex, INFINITE);
413 
414         if (rv == WAIT_FAILED) {
415             CloseHandle(cond);
416             return 1;
417         }
418 
419         while (!request_queue.head) {
420             if (!ReleaseMutex(request_queue.mutex)) {
421                 CloseHandle(cond);
422                 threadp->status = _THREAD_FAILED;
423                 return 1;
424             }
425 
426             Sleep(0);
427             rv = WaitForSingleObject(cond, INFINITE);
428 
429             if (rv == WAIT_FAILED) {
430                 CloseHandle(cond);
431                 return 1;
432             }
433 
434             rv = WaitForSingleObject(request_queue.mutex, INFINITE);
435 
436             if (rv == WAIT_FAILED) {
437                 CloseHandle(cond);
438                 return 1;
439             }
440         }
441 
442         request = request_queue.head;
443 
444         if (request)
445             request_queue.head = request->next;
446 
447         if (!request_queue.head)
448             request_queue.tailp = &request_queue.head;
449 
450         if (!ReleaseMutex(request_queue.mutex)) {
451             CloseHandle(cond);
452             return 1;
453         }
454 
455         Sleep(0);
456 
457         /* process the request */
458         threadp->status = _THREAD_BUSY;
459 
460         request->next = NULL;
461 
462         threadp->current_req = request;
463 
464         errno = 0;
465 
466         if (!request->cancelled) {
467             switch (request->request_type) {
468 
469             case _AIO_OP_OPEN:
470                 squidaio_do_open(request);
471                 break;
472 
473             case _AIO_OP_READ:
474                 squidaio_do_read(request);
475                 break;
476 
477             case _AIO_OP_WRITE:
478                 squidaio_do_write(request);
479                 break;
480 
481             case _AIO_OP_CLOSE:
482                 squidaio_do_close(request);
483                 break;
484 
485             case _AIO_OP_UNLINK:
486                 squidaio_do_unlink(request);
487                 break;
488 
489 #if AIO_OPENDIR         /* Opendir not implemented yet */
490 
491             case _AIO_OP_OPENDIR:
492                 squidaio_do_opendir(request);
493                 break;
494 #endif
495 
496             case _AIO_OP_STAT:
497                 squidaio_do_stat(request);
498                 break;
499 
500             default:
501                 request->ret = -1;
502                 request->err = EINVAL;
503                 break;
504             }
505         } else {        /* cancelled */
506             request->ret = -1;
507             request->err = EINTR;
508         }
509 
510         threadp->status = _THREAD_DONE;
511         /* put the request in the done queue */
512         rv = WaitForSingleObject(done_queue.mutex, INFINITE);
513 
514         if (rv == WAIT_FAILED) {
515             CloseHandle(cond);
516             return 1;
517         }
518 
519         *done_queue.tailp = request;
520         done_queue.tailp = &request->next;
521 
522         if (!ReleaseMutex(done_queue.mutex)) {
523             CloseHandle(cond);
524             return 1;
525         }
526 
527         CommIO::NotifyIOCompleted();
528         Sleep(0);
529         ++ threadp->requests;
530     }               /* while forever */
531 
532     CloseHandle(cond);
533 
534     return 0;
535 }               /* squidaio_thread_loop */
536 
537 static void
squidaio_queue_request(squidaio_request_t * request)538 squidaio_queue_request(squidaio_request_t * request)
539 {
540     static int high_start = 0;
541     debugs(43, 9, "squidaio_queue_request: " << request << " type=" << request->request_type << " result=" << request->resultp);
542     /* Mark it as not executed (failing result, no error) */
543     request->ret = -1;
544     request->err = 0;
545     /* Internal housekeeping */
546     request_queue_len += 1;
547     request->resultp->_data = request;
548     /* Play some tricks with the request_queue2 queue */
549     request->next = NULL;
550 
551     if (WaitForSingleObject(request_queue.mutex, 0) == WAIT_OBJECT_0) {
552         if (request_queue2.head) {
553             /* Grab blocked requests */
554             *request_queue.tailp = request_queue2.head;
555             request_queue.tailp = request_queue2.tailp;
556         }
557 
558         /* Enqueue request */
559         *request_queue.tailp = request;
560 
561         request_queue.tailp = &request->next;
562 
563         if (!SetEvent(request_queue.cond))
564             fatal("Couldn't push queue");
565 
566         if (!ReleaseMutex(request_queue.mutex)) {
567             /* unexpected error */
568             fatal("Couldn't push queue");
569         }
570 
571         Sleep(0);
572 
573         if (request_queue2.head) {
574             /* Clear queue of blocked requests */
575             request_queue2.head = NULL;
576             request_queue2.tailp = &request_queue2.head;
577         }
578     } else {
579         /* Oops, the request queue is blocked, use request_queue2 */
580         *request_queue2.tailp = request;
581         request_queue2.tailp = &request->next;
582     }
583 
584     if (request_queue2.head) {
585         static uint64_t filter = 0;
586         static uint64_t filter_limit = 8196;
587 
588         if (++filter >= filter_limit) {
589             filter_limit += filter;
590             filter = 0;
591             debugs(43, DBG_IMPORTANT, "squidaio_queue_request: WARNING - Queue congestion (growing to " << filter_limit << ")");
592         }
593     }
594 
595     /* Warn if out of threads */
596     if (request_queue_len > MAGIC1) {
597         static int last_warn = 0;
598         static int queue_high, queue_low;
599 
600         if (high_start == 0) {
601             high_start = (int)squid_curtime;
602             queue_high = request_queue_len;
603             queue_low = request_queue_len;
604         }
605 
606         if (request_queue_len > queue_high)
607             queue_high = request_queue_len;
608 
609         if (request_queue_len < queue_low)
610             queue_low = request_queue_len;
611 
612         if (squid_curtime >= (last_warn + 15) &&
613                 squid_curtime >= (high_start + 5)) {
614             debugs(43, DBG_IMPORTANT, "squidaio_queue_request: WARNING - Disk I/O overloading");
615 
616             if (squid_curtime >= (high_start + 15))
617                 debugs(43, DBG_IMPORTANT, "squidaio_queue_request: Queue Length: current=" <<
618                        request_queue_len << ", high=" << queue_high <<
619                        ", low=" << queue_low << ", duration=" <<
620                        (long int) (squid_curtime - high_start));
621 
622             last_warn = (int)squid_curtime;
623         }
624     } else {
625         high_start = 0;
626     }
627 
628     /* Warn if seriously overloaded */
629     if (request_queue_len > RIDICULOUS_LENGTH) {
630         debugs(43, DBG_CRITICAL, "squidaio_queue_request: Async request queue growing uncontrollably!");
631         debugs(43, DBG_CRITICAL, "squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
632         squidaio_sync();
633         debugs(43, DBG_CRITICAL, "squidaio_queue_request: Synced");
634     }
635 }               /* squidaio_queue_request */
636 
637 static void
squidaio_cleanup_request(squidaio_request_t * requestp)638 squidaio_cleanup_request(squidaio_request_t * requestp)
639 {
640     squidaio_result_t *resultp = requestp->resultp;
641     int cancelled = requestp->cancelled;
642 
643     /* Free allocated structures and copy data back to user space if the */
644     /* request hasn't been cancelled */
645 
646     switch (requestp->request_type) {
647 
648     case _AIO_OP_STAT:
649 
650         if (!cancelled && requestp->ret == 0)
651             memcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));
652 
653         squidaio_xfree(requestp->tmpstatp, sizeof(struct stat));
654 
655         squidaio_xstrfree(requestp->path);
656 
657         break;
658 
659     case _AIO_OP_OPEN:
660         if (cancelled && requestp->ret >= 0)
661             /* The open() was cancelled but completed */
662             close(requestp->ret);
663 
664         squidaio_xstrfree(requestp->path);
665 
666         break;
667 
668     case _AIO_OP_CLOSE:
669         if (cancelled && requestp->ret < 0)
670             /* The close() was cancelled and never got executed */
671             close(requestp->fd);
672 
673         break;
674 
675     case _AIO_OP_UNLINK:
676 
677     case _AIO_OP_OPENDIR:
678         squidaio_xstrfree(requestp->path);
679 
680         break;
681 
682     case _AIO_OP_READ:
683         break;
684 
685     case _AIO_OP_WRITE:
686         break;
687 
688     default:
689         break;
690     }
691 
692     if (resultp != NULL && !cancelled) {
693         resultp->aio_return = requestp->ret;
694         resultp->aio_errno = requestp->err;
695     }
696 
697     squidaio_request_pool->freeOne(requestp);
698 }               /* squidaio_cleanup_request */
699 
700 int
squidaio_cancel(squidaio_result_t * resultp)701 squidaio_cancel(squidaio_result_t * resultp)
702 {
703     squidaio_request_t *request = (squidaio_request_t *)resultp->_data;
704 
705     if (request && request->resultp == resultp) {
706         debugs(43, 9, "squidaio_cancel: " << request << " type=" << request->request_type << " result=" << request->resultp);
707         request->cancelled = 1;
708         request->resultp = NULL;
709         resultp->_data = NULL;
710         resultp->result_type = _AIO_OP_NONE;
711         return 0;
712     }
713 
714     return 1;
715 }               /* squidaio_cancel */
716 
717 int
squidaio_open(const char * path,int oflag,mode_t mode,squidaio_result_t * resultp)718 squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t * resultp)
719 {
720     squidaio_init();
721     squidaio_request_t *requestp;
722 
723     requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
724 
725     requestp->path = (char *) squidaio_xstrdup(path);
726 
727     requestp->oflag = oflag;
728 
729     requestp->mode = mode;
730 
731     requestp->resultp = resultp;
732 
733     requestp->request_type = _AIO_OP_OPEN;
734 
735     requestp->cancelled = 0;
736 
737     resultp->result_type = _AIO_OP_OPEN;
738 
739     squidaio_queue_request(requestp);
740 
741     return 0;
742 }
743 
744 static void
squidaio_do_open(squidaio_request_t * requestp)745 squidaio_do_open(squidaio_request_t * requestp)
746 {
747     requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
748     requestp->err = errno;
749 }
750 
751 int
squidaio_read(int fd,char * bufp,size_t bufs,off_t offset,int whence,squidaio_result_t * resultp)752 squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
753 {
754     squidaio_request_t *requestp;
755 
756     requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
757 
758     requestp->fd = fd;
759 
760     requestp->bufferp = bufp;
761 
762     requestp->buflen = bufs;
763 
764     requestp->offset = offset;
765 
766     requestp->whence = whence;
767 
768     requestp->resultp = resultp;
769 
770     requestp->request_type = _AIO_OP_READ;
771 
772     requestp->cancelled = 0;
773 
774     resultp->result_type = _AIO_OP_READ;
775 
776     squidaio_queue_request(requestp);
777 
778     return 0;
779 }
780 
781 static void
squidaio_do_read(squidaio_request_t * requestp)782 squidaio_do_read(squidaio_request_t * requestp)
783 {
784     lseek(requestp->fd, requestp->offset, requestp->whence);
785 
786     if (!ReadFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
787                   requestp->buflen, (LPDWORD)&requestp->ret, NULL)) {
788         WIN32_maperror(GetLastError());
789         requestp->ret = -1;
790     }
791 
792     requestp->err = errno;
793 }
794 
795 int
squidaio_write(int fd,char * bufp,size_t bufs,off_t offset,int whence,squidaio_result_t * resultp)796 squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
797 {
798     squidaio_request_t *requestp;
799 
800     requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
801 
802     requestp->fd = fd;
803 
804     requestp->bufferp = bufp;
805 
806     requestp->buflen = bufs;
807 
808     requestp->offset = offset;
809 
810     requestp->whence = whence;
811 
812     requestp->resultp = resultp;
813 
814     requestp->request_type = _AIO_OP_WRITE;
815 
816     requestp->cancelled = 0;
817 
818     resultp->result_type = _AIO_OP_WRITE;
819 
820     squidaio_queue_request(requestp);
821 
822     return 0;
823 }
824 
825 static void
squidaio_do_write(squidaio_request_t * requestp)826 squidaio_do_write(squidaio_request_t * requestp)
827 {
828     if (!WriteFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
829                    requestp->buflen, (LPDWORD)&requestp->ret, NULL)) {
830         WIN32_maperror(GetLastError());
831         requestp->ret = -1;
832     }
833 
834     requestp->err = errno;
835 }
836 
837 int
squidaio_close(int fd,squidaio_result_t * resultp)838 squidaio_close(int fd, squidaio_result_t * resultp)
839 {
840     squidaio_request_t *requestp;
841 
842     requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
843 
844     requestp->fd = fd;
845 
846     requestp->resultp = resultp;
847 
848     requestp->request_type = _AIO_OP_CLOSE;
849 
850     requestp->cancelled = 0;
851 
852     resultp->result_type = _AIO_OP_CLOSE;
853 
854     squidaio_queue_request(requestp);
855 
856     return 0;
857 }
858 
859 static void
squidaio_do_close(squidaio_request_t * requestp)860 squidaio_do_close(squidaio_request_t * requestp)
861 {
862     if ((requestp->ret = close(requestp->fd)) < 0) {
863         debugs(43, DBG_CRITICAL, "squidaio_do_close: FD " << requestp->fd << ", errno " << errno);
864         close(requestp->fd);
865     }
866 
867     requestp->err = errno;
868 }
869 
870 int
871 
squidaio_stat(const char * path,struct stat * sb,squidaio_result_t * resultp)872 squidaio_stat(const char *path, struct stat *sb, squidaio_result_t * resultp)
873 {
874     squidaio_init();
875     squidaio_request_t *requestp;
876 
877     requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
878 
879     requestp->path = (char *) squidaio_xstrdup(path);
880 
881     requestp->statp = sb;
882 
883     requestp->tmpstatp = (struct stat *) squidaio_xmalloc(sizeof(struct stat));
884 
885     requestp->resultp = resultp;
886 
887     requestp->request_type = _AIO_OP_STAT;
888 
889     requestp->cancelled = 0;
890 
891     resultp->result_type = _AIO_OP_STAT;
892 
893     squidaio_queue_request(requestp);
894 
895     return 0;
896 }
897 
898 static void
squidaio_do_stat(squidaio_request_t * requestp)899 squidaio_do_stat(squidaio_request_t * requestp)
900 {
901     requestp->ret = stat(requestp->path, requestp->tmpstatp);
902     requestp->err = errno;
903 }
904 
905 int
squidaio_unlink(const char * path,squidaio_result_t * resultp)906 squidaio_unlink(const char *path, squidaio_result_t * resultp)
907 {
908     squidaio_init();
909     squidaio_request_t *requestp;
910 
911     requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
912 
913     requestp->path = squidaio_xstrdup(path);
914 
915     requestp->resultp = resultp;
916 
917     requestp->request_type = _AIO_OP_UNLINK;
918 
919     requestp->cancelled = 0;
920 
921     resultp->result_type = _AIO_OP_UNLINK;
922 
923     squidaio_queue_request(requestp);
924 
925     return 0;
926 }
927 
928 static void
squidaio_do_unlink(squidaio_request_t * requestp)929 squidaio_do_unlink(squidaio_request_t * requestp)
930 {
931     requestp->ret = unlink(requestp->path);
932     requestp->err = errno;
933 }
934 
#if AIO_OPENDIR
/* XXX squidaio_opendir NOT implemented yet.. */

/*
 * Placeholder for an asynchronous opendir(). Nothing is queued: the result
 * slot is tagged with _AIO_OP_OPENDIR and -1 is returned. No request object
 * is allocated, because it would never be queued or freed.
 */
int
squidaio_opendir(const char *path, squidaio_result_t * resultp)
{
    resultp->result_type = _AIO_OP_OPENDIR;

    return -1;
}

/* Placeholder worker-side handler for opendir requests. */
static void
squidaio_do_opendir(squidaio_request_t * requestp)
{
    /* NOT IMPLEMENTED */
}

#endif
958 
959 static void
squidaio_poll_queues(void)960 squidaio_poll_queues(void)
961 {
962     /* kick "overflow" request queue */
963 
964     if (request_queue2.head &&
965             (WaitForSingleObject(request_queue.mutex, 0 )== WAIT_OBJECT_0)) {
966         *request_queue.tailp = request_queue2.head;
967         request_queue.tailp = request_queue2.tailp;
968 
969         if (!SetEvent(request_queue.cond))
970             fatal("couldn't push queue\n");
971 
972         if (!ReleaseMutex(request_queue.mutex)) {
973             /* unexpected error */
974         }
975 
976         Sleep(0);
977         request_queue2.head = NULL;
978         request_queue2.tailp = &request_queue2.head;
979     }
980 
981     /* poll done queue */
982     if (done_queue.head &&
983             (WaitForSingleObject(done_queue.mutex, 0)==WAIT_OBJECT_0)) {
984 
985         struct squidaio_request_t *requests = done_queue.head;
986         done_queue.head = NULL;
987         done_queue.tailp = &done_queue.head;
988 
989         if (!ReleaseMutex(done_queue.mutex)) {
990             /* unexpected error */
991         }
992 
993         Sleep(0);
994         *done_requests.tailp = requests;
995         request_queue_len -= 1;
996 
997         while (requests->next) {
998             requests = requests->next;
999             request_queue_len -= 1;
1000         }
1001 
1002         done_requests.tailp = &requests->next;
1003     }
1004 }
1005 
1006 squidaio_result_t *
squidaio_poll_done(void)1007 squidaio_poll_done(void)
1008 {
1009     squidaio_request_t *request;
1010     squidaio_result_t *resultp;
1011     int cancelled;
1012     int polled = 0;
1013 
1014 AIO_REPOLL:
1015     request = done_requests.head;
1016 
1017     if (request == NULL && !polled) {
1018         CommIO::ResetNotifications();
1019         squidaio_poll_queues();
1020         polled = 1;
1021         request = done_requests.head;
1022     }
1023 
1024     if (!request) {
1025         return NULL;
1026     }
1027 
1028     debugs(43, 9, "squidaio_poll_done: " << request << " type=" << request->request_type << " result=" << request->resultp);
1029     done_requests.head = request->next;
1030 
1031     if (!done_requests.head)
1032         done_requests.tailp = &done_requests.head;
1033 
1034     resultp = request->resultp;
1035 
1036     cancelled = request->cancelled;
1037 
1038     squidaio_debug(request);
1039 
1040     debugs(43, 5, "DONE: " << request->ret << " -> " << request->err);
1041 
1042     squidaio_cleanup_request(request);
1043 
1044     if (cancelled)
1045         goto AIO_REPOLL;
1046 
1047     return resultp;
1048 }               /* squidaio_poll_done */
1049 
1050 int
squidaio_operations_pending(void)1051 squidaio_operations_pending(void)
1052 {
1053     return request_queue_len + (done_requests.head ? 1 : 0);
1054 }
1055 
1056 int
squidaio_sync(void)1057 squidaio_sync(void)
1058 {
1059     /* XXX This might take a while if the queue is large.. */
1060 
1061     do {
1062         squidaio_poll_queues();
1063     } while (request_queue_len > 0);
1064 
1065     return squidaio_operations_pending();
1066 }
1067 
1068 int
squidaio_get_queue_len(void)1069 squidaio_get_queue_len(void)
1070 {
1071     return request_queue_len;
1072 }
1073 
1074 static void
squidaio_debug(squidaio_request_t * request)1075 squidaio_debug(squidaio_request_t * request)
1076 {
1077     switch (request->request_type) {
1078 
1079     case _AIO_OP_OPEN:
1080         debugs(43, 5, "OPEN of " << request->path << " to FD " << request->ret);
1081         break;
1082 
1083     case _AIO_OP_READ:
1084         debugs(43, 5, "READ on fd: " << request->fd);
1085         break;
1086 
1087     case _AIO_OP_WRITE:
1088         debugs(43, 5, "WRITE on fd: " << request->fd);
1089         break;
1090 
1091     case _AIO_OP_CLOSE:
1092         debugs(43, 5, "CLOSE of fd: " << request->fd);
1093         break;
1094 
1095     case _AIO_OP_UNLINK:
1096         debugs(43, 5, "UNLINK of " << request->path);
1097         break;
1098 
1099     default:
1100         break;
1101     }
1102 }
1103 
1104 void
squidaio_stats(StoreEntry * sentry)1105 squidaio_stats(StoreEntry * sentry)
1106 {
1107     squidaio_thread_t *threadp;
1108     int i;
1109 
1110     if (!squidaio_initialised)
1111         return;
1112 
1113     storeAppendPrintf(sentry, "\n\nThreads Status:\n");
1114 
1115     storeAppendPrintf(sentry, "#\tID\t# Requests\n");
1116 
1117     threadp = threads;
1118 
1119     for (i = 0; i < NUMTHREADS; ++i) {
1120         storeAppendPrintf(sentry, "%i\t0x%lx\t%ld\n", i + 1, threadp->dwThreadId, threadp->requests);
1121         threadp = threadp->next;
1122     }
1123 }
1124 
1125