/* Copyright (C) 2019, 2020, MariaDB Corporation.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA*/

#include "tpool_structs.h"
#include "tpool.h"

#ifdef LINUX_NATIVE_AIO
# include <thread>
# include <atomic>
# include <libaio.h>
# include <sys/syscall.h>

/**
  Invoke the io_getevents() system call, without a timeout parameter.

  @param ctx     context from io_setup()
  @param min_nr  minimum number of completion events to wait for
  @param nr      maximum number of completion events to collect
  @param ev      the collected events

  In https://pagure.io/libaio/c/7cede5af5adf01ad26155061cc476aad0804d3fc
  the io_getevents() implementation in libaio was "optimized" so that it
  would elide the system call when there are no outstanding requests
  and a timeout was specified.

  The libaio code for dereferencing ctx would occasionally trigger
  SIGSEGV if io_destroy() was concurrently invoked from another thread.
  Hence, we have to use the raw system call.

  WHY are we doing this at all?
  Because we want io_destroy() from another thread to interrupt io_getevents().

  And, WHY do we want io_destroy() from another thread to interrupt
  io_getevents()?

  Because there is no documented, libaio-friendly and race-condition-free way
  to interrupt io_getevents(). io_destroy() coupled with the raw system call
  has worked for us so far.

  Historical note: in the past, we used io_getevents() with timeouts. We would
  wake up periodically, check the shutdown flag and, if it was set, return
  from the main routine. This was admittedly safer, yet it cost periodic
  wakeups, which we are no longer willing to pay.

  @note we also rely on the undocumented property that io_destroy(ctx)
  will make this version of io_getevents() return -EINVAL.
*/
static int my_getevents(io_context_t ctx, long min_nr, long nr, io_event *ev)
{
  int saved_errno= errno;
  int ret= syscall(__NR_io_getevents, reinterpret_cast<long>(ctx),
                   min_nr, nr, ev, 0);
  if (ret < 0)
  {
    ret= -errno;
    errno= saved_errno;
  }
  return ret;
}
#endif


/*
  Linux AIO implementation, based on native AIO.
  Needs libaio.h to compile, and -laio to link.

  io_submit() is used to submit async IO.

  A single thread collects the completion notifications
  with io_getevents() and forwards the IO completion callbacks to
  the worker thread pool.
*/
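
/*
  Rough usage sketch (illustrative only; the aiocb fields shown are the
  ones this file references, everything else is assumed context):

    thread_pool *pool= ...;                // pool that runs the callbacks
    aio *aio_impl= create_linux_aio(pool, 256);

    aiocb cb;
    cb.m_opcode= aio_opcode::AIO_PREAD;    // or the corresponding write opcode
    cb.m_fh= fh;                           // native file handle
    cb.m_buffer= buf;
    cb.m_len= len;
    cb.m_offset= off;
    cb.m_callback= on_io_completed;        // runs on a worker thread,
                                           // receives the aiocb as argument
    aio_impl->submit_io(&cb);              // 0 on success, -1 + errno on error

    delete aio_impl;                       // joins the getevent thread
*/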
namespace tpool
{
#ifdef LINUX_NATIVE_AIO

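/** AIO implementation, based on Linux native AIO (libaio). */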
class aio_linux final : public aio
{
  thread_pool *m_pool;
  io_context_t m_io_ctx;
  std::thread m_getevent_thread;
  static std::atomic<bool> shutdown_in_progress;

  static void getevent_thread_routine(aio_linux *aio)
  {
    /*
      We collect events in small batches to hopefully reduce the
      number of system calls.
    */
    constexpr unsigned MAX_EVENTS= 256;

    io_event events[MAX_EVENTS];
    for (;;)
    {
      switch (int ret= my_getevents(aio->m_io_ctx, 1, MAX_EVENTS, events)) {
      case -EINTR:
        continue;
      case -EINVAL:
        if (shutdown_in_progress)
          return;
        /* fall through */
      default:
        if (ret < 0)
        {
          fprintf(stderr, "io_getevents returned %d\n", ret);
          abort();
          return;
        }
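        /* Dispatch the completion callback of each finished request
        to the worker thread pool. */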
        for (int i= 0; i < ret; i++)
        {
          const io_event &event= events[i];
          aiocb *iocb= static_cast<aiocb*>(event.obj);
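          /* event.res holds the number of bytes transferred on success,
          or a negative errno value on failure. */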
          if (static_cast<int>(event.res) < 0)
          {
            iocb->m_err= -event.res;
            iocb->m_ret_len= 0;
          }
          else
          {
            iocb->m_ret_len= event.res;
            iocb->m_err= 0;
          }
          iocb->m_internal_task.m_func= iocb->m_callback;
          iocb->m_internal_task.m_arg= iocb;
          iocb->m_internal_task.m_group= iocb->m_group;
          aio->m_pool->submit_task(&iocb->m_internal_task);
        }
      }
    }
  }

public:
  aio_linux(io_context_t ctx, thread_pool *pool)
    : m_pool(pool), m_io_ctx(ctx),
    m_getevent_thread(getevent_thread_routine, this)
  {
  }

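  /* Stop the getevent thread: io_destroy() makes the pending
  my_getevents() call return -EINVAL, which, together with
  shutdown_in_progress, is treated as a request to exit. */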
  ~aio_linux()
  {
    shutdown_in_progress= true;
    io_destroy(m_io_ctx);
    m_getevent_thread.join();
    shutdown_in_progress= false;
  }

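  /* Submit an asynchronous read or write. The control block is prepared
  as a read with io_prep_pread(), and the opcode is then flipped to
  IO_CMD_PWRITE for writes. Returns 0 on success; on failure returns -1
  and stores the error in errno. */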
  int submit_io(aiocb *cb) override
  {
    io_prep_pread(static_cast<iocb*>(cb), cb->m_fh, cb->m_buffer, cb->m_len,
                  cb->m_offset);
    if (cb->m_opcode != aio_opcode::AIO_PREAD)
      cb->aio_lio_opcode= IO_CMD_PWRITE;
    iocb *icb= static_cast<iocb*>(cb);
    int ret= io_submit(m_io_ctx, 1, &icb);
    if (ret == 1)
      return 0;
    errno= -ret;
    return -1;
  }

  int bind(native_file_handle&) override { return 0; }
  int unbind(const native_file_handle&) override { return 0; }
};

std::atomic<bool> aio_linux::shutdown_in_progress;

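/** Create the native AIO implementation.
@param pool    thread pool that runs the completion callbacks
@param max_io  maximum number of concurrently outstanding requests,
               passed to io_setup()
@return the created instance, or nullptr if io_setup() failed */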
aio *create_linux_aio(thread_pool *pool, int max_io)
{
  io_context_t ctx;
  memset(&ctx, 0, sizeof ctx);
  if (int ret= io_setup(max_io, &ctx))
  {
    fprintf(stderr, "io_setup(%d) returned %d\n", max_io, ret);
    return nullptr;
  }
  return new aio_linux(ctx, pool);
}
#else
aio *create_linux_aio(thread_pool*, int) { return nullptr; }
#endif
}