/* Copyright (C) 2019 MariaDB Corporation.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA*/

#include "tpool_structs.h"
#include <algorithm>
#include <assert.h>
#include <condition_variable>
#include <iostream>
#include <limits.h>
#include <mutex>
#include <queue>
#include <stack>
#include <string.h>
#include <thread>
#include <vector>
#include "tpool.h"

namespace tpool
{

/*
  Windows AIO implementation, completion port based.
  A single thread collects completion notifications with
  GetQueuedCompletionStatus() and forwards the io completion callback to
  the worker threadpool.
*/
class tpool_generic_win_aio : public aio
{
  /* Thread that collects completion status from the completion port. */
  std::thread m_thread;

  /* IOCP completion port. */
  HANDLE m_completion_port;

  /* The worker pool where the completion routine is executed, as a task. */
  thread_pool* m_pool;

public:
  tpool_generic_win_aio(thread_pool* pool, int max_io) : m_pool(pool)
  {
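    /* Passing INVALID_HANDLE_VALUE with no existing port creates a new,
    empty completion port. */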
    m_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
    m_thread = std::thread(aio_completion_thread_proc, this);
  }

  /**
   Task to be executed in the work pool.
  */
  static void io_completion_task(void* data)
  {
    auto cb = (aiocb*) data;
    cb->execute_callback();
  }

  void completion_thread_work()
  {
    for (;;)
    {
      DWORD n_bytes;
      aiocb* aiocb;
      ULONG_PTR key;
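      /* Wait for the next completion packet; the call fails once the
      completion port handle is closed, which ends the collector loop. */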
      if (!GetQueuedCompletionStatus(m_completion_port, &n_bytes, &key,
                                     (LPOVERLAPPED*) &aiocb, INFINITE))
        break;

      aiocb->m_err = 0;
      aiocb->m_ret_len = n_bytes;

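      /* Short transfer: retrieve the real error code. With bWait=FALSE,
      GetOverlappedResult() does not block here, since the operation has
      already completed. */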
      if (n_bytes != aiocb->m_len)
      {
        if (!GetOverlappedResult(aiocb->m_fh, aiocb,
                                 (LPDWORD) &aiocb->m_ret_len, FALSE))
        {
          aiocb->m_err = GetLastError();
        }
      }
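      /* Hand the user callback over to the worker pool as a task; the
      aiocb itself doubles as the task argument. */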
      aiocb->m_internal_task.m_func = aiocb->m_callback;
      aiocb->m_internal_task.m_arg = aiocb;
      aiocb->m_internal_task.m_group = aiocb->m_group;
      m_pool->submit_task(&aiocb->m_internal_task);
    }
  }

  static void aio_completion_thread_proc(tpool_generic_win_aio* aio)
  {
    aio->completion_thread_work();
  }

  ~tpool_generic_win_aio()
  {
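    /* Closing the port makes GetQueuedCompletionStatus() in the collector
    thread fail, so the thread leaves its loop and can be joined. */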
    if (m_completion_port)
      CloseHandle(m_completion_port);
    m_thread.join();
  }

  virtual int submit_io(aiocb* cb) override
  {
    memset((OVERLAPPED*) cb, 0, sizeof(OVERLAPPED));
    cb->m_internal = this;
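    /* OVERLAPPED stores the 64-bit file offset split into two 32-bit
    halves. */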
    ULARGE_INTEGER uli;
    uli.QuadPart = cb->m_offset;
    cb->Offset = uli.LowPart;
    cb->OffsetHigh = uli.HighPart;

    BOOL ok;
    if (cb->m_opcode == aio_opcode::AIO_PREAD)
      ok = ReadFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb);
    else
      ok = WriteFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb);

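    /* For a handle bound to a completion port, ERROR_IO_PENDING means the
    request was queued successfully; completion arrives on the port. */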
    if (ok || (GetLastError() == ERROR_IO_PENDING))
      return 0;
    return -1;
  }

  // Inherited via aio
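  /* Associate the file handle with the completion port; this must happen
  once per handle before submit_io() is used on it. */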
  virtual int bind(native_file_handle& fd) override
  {
    return CreateIoCompletionPort(fd, m_completion_port, 0, 0) ?
           0 : GetLastError();
  }
  virtual int unbind(const native_file_handle& fd) override { return 0; }
};

aio* create_win_aio(thread_pool* pool, int max_io)
{
  return new tpool_generic_win_aio(pool, max_io);
}
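
/*
  A minimal usage sketch (kept as a comment: the full aiocb definition
  lives in tpool.h, and the field names below simply follow their usage
  in this file):

    static char buf[4096];

    static void on_read_done(void* data)
    {
      aiocb* cb= (aiocb*) data;
      // On success cb->m_err is 0 and cb->m_ret_len bytes of buf are
      // valid; otherwise cb->m_err holds a GetLastError() value.
    }

    void example(thread_pool* pool, native_file_handle& fh)
    {
      aio* io= create_win_aio(pool, 100);
      io->bind(fh);                        // associate fh with the port

      aiocb cb{};
      cb.m_fh= fh;
      cb.m_opcode= aio_opcode::AIO_PREAD;
      cb.m_offset= 0;
      cb.m_buffer= buf;
      cb.m_len= sizeof(buf);
      cb.m_callback= on_read_done;
      io->submit_io(&cb);                  // on_read_done runs on the pool
    }
*/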

} // namespace tpool