1 /* Copyright(C) 2019 MariaDB Corporation.
2 
This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6 
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10 GNU General Public License for more details.
11 
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
15 
16 #include "tpool_structs.h"
17 #include <algorithm>
18 #include <assert.h>
19 #include <condition_variable>
20 #include <iostream>
21 #include <limits.h>
22 #include <mutex>
23 #include <queue>
24 #include <stack>
25 #include <thread>
26 #include <vector>
27 #include <tpool.h>
28 
29 namespace tpool
30 {
31 
32 /*
33   Windows AIO implementation, completion port based.
34   A single thread collects the completion notification with
35   GetQueuedCompletionStatus(), and forwards io completion callback
36   the worker threadpool
37 */
38 class tpool_generic_win_aio : public aio
39 {
40   /* Thread that does collects completion status from the completion port. */
41   std::thread m_thread;
42 
43   /* IOCP Completion port.*/
44   HANDLE m_completion_port;
45 
46   /* The worker pool where completion routine is executed, as task. */
47   thread_pool* m_pool;
48 public:
tpool_generic_win_aio(thread_pool * pool,int max_io)49   tpool_generic_win_aio(thread_pool* pool, int max_io) : m_pool(pool)
50   {
51     m_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
52     m_thread = std::thread(aio_completion_thread_proc, this);
53   }
54 
55   /**
56    Task to be executed in the work pool.
57   */
io_completion_task(void * data)58   static void io_completion_task(void* data)
59   {
60     auto cb = (aiocb*)data;
61     cb->execute_callback();
62   }
63 
completion_thread_work()64   void completion_thread_work()
65   {
66     for (;;)
67     {
68       DWORD n_bytes;
69       aiocb* aiocb;
70       ULONG_PTR key;
71       if (!GetQueuedCompletionStatus(m_completion_port, &n_bytes, &key,
72         (LPOVERLAPPED*)& aiocb, INFINITE))
73         break;
74 
75       aiocb->m_err = 0;
76       aiocb->m_ret_len = n_bytes;
77 
78       if (n_bytes != aiocb->m_len)
79       {
80         if (GetOverlappedResult(aiocb->m_fh, aiocb,
81           (LPDWORD)& aiocb->m_ret_len, FALSE))
82         {
83            aiocb->m_err = GetLastError();
84         }
85       }
86       aiocb->m_internal_task.m_func = aiocb->m_callback;
87       aiocb->m_internal_task.m_arg = aiocb;
88       aiocb->m_internal_task.m_group = aiocb->m_group;
89       m_pool->submit_task(&aiocb->m_internal_task);
90     }
91   }
92 
aio_completion_thread_proc(tpool_generic_win_aio * aio)93   static void aio_completion_thread_proc(tpool_generic_win_aio* aio)
94   {
95     aio->completion_thread_work();
96   }
97 
~tpool_generic_win_aio()98   ~tpool_generic_win_aio()
99   {
100     if (m_completion_port)
101       CloseHandle(m_completion_port);
102     m_thread.join();
103   }
104 
submit_io(aiocb * cb)105   virtual int submit_io(aiocb* cb) override
106   {
107     memset((OVERLAPPED *)cb, 0, sizeof(OVERLAPPED));
108     cb->m_internal = this;
109     ULARGE_INTEGER uli;
110     uli.QuadPart = cb->m_offset;
111     cb->Offset = uli.LowPart;
112     cb->OffsetHigh = uli.HighPart;
113 
114     BOOL ok;
115     if (cb->m_opcode == aio_opcode::AIO_PREAD)
116       ok = ReadFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb);
117     else
118       ok = WriteFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb);
119 
120     if (ok || (GetLastError() == ERROR_IO_PENDING))
121       return 0;
122     return -1;
123   }
124 
125   // Inherited via aio
bind(native_file_handle & fd)126   virtual int bind(native_file_handle& fd) override
127   {
128     return CreateIoCompletionPort(fd, m_completion_port, 0, 0) ? 0
129       : GetLastError();
130   }
unbind(const native_file_handle & fd)131   virtual int unbind(const native_file_handle& fd) override { return 0; }
132 };
133 
create_win_aio(thread_pool * pool,int max_io)134 aio* create_win_aio(thread_pool* pool, int max_io)
135 {
136   return new tpool_generic_win_aio(pool, max_io);
137 }
138 
139 } // namespace tpool
140