1 /* Copyright (C) 2019, 2020, MariaDB Corporation.
2
This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA*/
15
16 #include "tpool_structs.h"
17 #include "tpool.h"
18
19 #ifdef LINUX_NATIVE_AIO
20 # include <thread>
21 # include <atomic>
22 # include <libaio.h>
23 # include <sys/syscall.h>
24
25 /**
26 Invoke the io_getevents() system call, without timeout parameter.
27
28 @param ctx context from io_setup()
29 @param min_nr minimum number of completion events to wait for
30 @param nr maximum number of completion events to collect
31 @param ev the collected events
32
33 In https://pagure.io/libaio/c/7cede5af5adf01ad26155061cc476aad0804d3fc
34 the io_getevents() implementation in libaio was "optimized" so that it
35 would elide the system call when there are no outstanding requests
36 and a timeout was specified.
37
38 The libaio code for dereferencing ctx would occasionally trigger
39 SIGSEGV if io_destroy() was concurrently invoked from another thread.
40 Hence, we have to use the raw system call.
41
42 WHY are we doing this at all?
43 Because we want io_destroy() from another thread to interrupt io_getevents().
44
45 And, WHY do we want io_destroy() from another thread to interrupt
46 io_getevents()?
47
48 Because there is no documented, libaio-friendly and race-condition-free way to
49 interrupt io_getevents(). io_destroy() coupled with raw syscall seemed to work
50 for us so far.
51
Historical note: in the past, we used io_getevents with timeouts. We'd wake
up periodically, check for the shutdown flag, and return from the main routine.
54 This was admittedly safer, yet it did cost periodic wakeups, which we are not
55 willing to do anymore.
56
57 @note we also rely on the undocumented property, that io_destroy(ctx)
58 will make this version of io_getevents return EINVAL.
59 */
my_getevents(io_context_t ctx,long min_nr,long nr,io_event * ev)60 static int my_getevents(io_context_t ctx, long min_nr, long nr, io_event *ev)
61 {
62 int saved_errno= errno;
63 int ret= syscall(__NR_io_getevents, reinterpret_cast<long>(ctx),
64 min_nr, nr, ev, 0);
65 if (ret < 0)
66 {
67 ret= -errno;
68 errno= saved_errno;
69 }
70 return ret;
71 }
72 #endif
73
74
75 /*
Linux AIO implementation, based on native AIO.
Needs libaio.h at compile time and -laio at link time.
78
79 io_submit() is used to submit async IO.
80
81 A single thread will collect the completion notification
82 with io_getevents() and forward io completion callback to
83 the worker threadpool.
84 */
85 namespace tpool
86 {
87 #ifdef LINUX_NATIVE_AIO
88
89 class aio_linux final : public aio
90 {
91 thread_pool *m_pool;
92 io_context_t m_io_ctx;
93 std::thread m_getevent_thread;
94 static std::atomic<bool> shutdown_in_progress;
95
getevent_thread_routine(aio_linux * aio)96 static void getevent_thread_routine(aio_linux *aio)
97 {
98 /*
99 We collect events in small batches to hopefully reduce the
100 number of system calls.
101 */
102 constexpr unsigned MAX_EVENTS= 256;
103
104 io_event events[MAX_EVENTS];
105 for (;;)
106 {
107 switch (int ret= my_getevents(aio->m_io_ctx, 1, MAX_EVENTS, events)) {
108 case -EINTR:
109 continue;
110 case -EINVAL:
111 if (shutdown_in_progress)
112 return;
113 /* fall through */
114 default:
115 if (ret < 0)
116 {
117 fprintf(stderr, "io_getevents returned %d\n", ret);
118 abort();
119 return;
120 }
121 for (int i= 0; i < ret; i++)
122 {
123 const io_event &event= events[i];
124 aiocb *iocb= static_cast<aiocb*>(event.obj);
125 if (static_cast<int>(event.res) < 0)
126 {
127 iocb->m_err= -event.res;
128 iocb->m_ret_len= 0;
129 }
130 else
131 {
132 iocb->m_ret_len= event.res;
133 iocb->m_err= 0;
134 }
135 iocb->m_internal_task.m_func= iocb->m_callback;
136 iocb->m_internal_task.m_arg= iocb;
137 iocb->m_internal_task.m_group= iocb->m_group;
138 aio->m_pool->submit_task(&iocb->m_internal_task);
139 }
140 }
141 }
142 }
143
144 public:
aio_linux(io_context_t ctx,thread_pool * pool)145 aio_linux(io_context_t ctx, thread_pool *pool)
146 : m_pool(pool), m_io_ctx(ctx),
147 m_getevent_thread(getevent_thread_routine, this)
148 {
149 }
150
~aio_linux()151 ~aio_linux()
152 {
153 shutdown_in_progress= true;
154 io_destroy(m_io_ctx);
155 m_getevent_thread.join();
156 shutdown_in_progress= false;
157 }
158
submit_io(aiocb * cb)159 int submit_io(aiocb *cb) override
160 {
161 io_prep_pread(static_cast<iocb*>(cb), cb->m_fh, cb->m_buffer, cb->m_len,
162 cb->m_offset);
163 if (cb->m_opcode != aio_opcode::AIO_PREAD)
164 cb->aio_lio_opcode= IO_CMD_PWRITE;
165 iocb *icb= static_cast<iocb*>(cb);
166 int ret= io_submit(m_io_ctx, 1, &icb);
167 if (ret == 1)
168 return 0;
169 errno= -ret;
170 return -1;
171 }
172
bind(native_file_handle &)173 int bind(native_file_handle&) override { return 0; }
unbind(const native_file_handle &)174 int unbind(const native_file_handle&) override { return 0; }
175 };
176
/** Shutdown flag: set to true in ~aio_linux() before io_destroy(), and
checked by getevent_thread_routine() to treat EINVAL as a clean exit. */
std::atomic<bool> aio_linux::shutdown_in_progress;
178
create_linux_aio(thread_pool * pool,int max_io)179 aio *create_linux_aio(thread_pool *pool, int max_io)
180 {
181 io_context_t ctx;
182 memset(&ctx, 0, sizeof ctx);
183 if (int ret= io_setup(max_io, &ctx))
184 {
185 fprintf(stderr, "io_setup(%d) returned %d\n", max_io, ret);
186 return nullptr;
187 }
188 return new aio_linux(ctx, pool);
189 }
190 #else
191 aio *create_linux_aio(thread_pool*, int) { return nullptr; }
192 #endif
193 }
194