1 /*
2 Copyright (c) 2011, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "WakeupHandler.hpp"
26 #include "Ndb.hpp"
27 #include "NdbImpl.hpp"
28 #include "trp_client.hpp"
29
30 // ***** Multiwait handler ****
31
32 /**
33 * An instance of this class is used when a single thread
34 * wants to wait for the asynchronous completion of transactions
35 * on multiple Ndb objects.
36 * When the thread starts waiting, all Ndb objects are checked
37 * for CompletedTransactions, and their wakeHandler is set to
38 * poin to the same MultiNdbWakeupHandler object. The thread
39 * is then put to sleep / polls on a designated Ndb object.
40 *
41 * As transactions complete, the MultiNdbWakeHandler object
42 * moves their Ndb objects to the start of the passed Ndb
43 * object list and determines whether enough have completed
44 * to wake the waiting thread.
45 * When enough have completed, the waiting thread is woken via
46 * the designated Ndb object.
47 *
48 * The design only supports one instance of the MultiNdbWakeupHandler
49 * object per ndb cluster connection and this can only be used from
50 * one thread.
51 */
52
MultiNdbWakeupHandler(Ndb * _wakeNdb)53 MultiNdbWakeupHandler::MultiNdbWakeupHandler(Ndb* _wakeNdb)
54 : wakeNdb(_wakeNdb)
55 {
56 localWakeupMutexPtr = NdbMutex_Create();
57 assert(localWakeupMutexPtr);
58 /* Register the waiter Ndb to receive wakeups for all Ndbs in the group */
59 PollGuard pg(* wakeNdb->theImpl);
60 ignore_wakeups();
61 bool rc = wakeNdb->theImpl->m_transporter_facade->registerForWakeup(wakeNdb->theImpl);
62 require(rc);
63 wakeNdb->theImpl->wakeHandler = this;
64 }
65
66
~MultiNdbWakeupHandler()67 MultiNdbWakeupHandler::~MultiNdbWakeupHandler()
68 {
69 if (localWakeupMutexPtr)
70 {
71 NdbMutex_Destroy(localWakeupMutexPtr);
72 localWakeupMutexPtr = NULL;
73 }
74 PollGuard pg(* wakeNdb->theImpl);
75 bool rc = wakeNdb->theImpl->m_transporter_facade->
76 unregisterForWakeup(wakeNdb->theImpl);
77 require(rc);
78 }
79
80
finalize_wait(int * nready)81 void MultiNdbWakeupHandler::finalize_wait(int *nready)
82 {
83 Uint32 num_completed_trans = 0;
84 for (Uint32 i = 0; i < cnt; i++)
85 {
86 Ndb *obj = objs[i];
87
88 NdbMutex_Lock(obj->theImpl->m_mutex);
89 if (obj->theNoOfCompletedTransactions)
90 {
91 swapNdbsInArray(i, num_completed_trans);
92 num_completed_trans++;
93 }
94 unregisterNdb(obj);
95 NdbMutex_Unlock(obj->theImpl->m_mutex);
96 }
97 *nready = num_completed_trans;
98 }
99
100
registerNdb(Ndb * obj,Uint32 pos)101 void MultiNdbWakeupHandler::registerNdb(Ndb *obj, Uint32 pos)
102 {
103 NdbMutex_Lock(obj->theImpl->m_mutex);
104 obj->theImpl->wakeHandler = this;
105 /* It may already have some completed transactions */
106 if (obj->theNoOfCompletedTransactions)
107 {
108 NdbMutex_Lock(localWakeupMutexPtr);
109 numNdbsWithCompletedTrans++;
110 NdbMutex_Unlock(localWakeupMutexPtr);
111 }
112 NdbMutex_Unlock(obj->theImpl->m_mutex);
113 }
114
115
unregisterNdb(Ndb * obj)116 void MultiNdbWakeupHandler::unregisterNdb(Ndb *obj)
117 {
118 obj->theImpl->wakeHandler = 0;
119 }
120
121
waitForInput(Ndb ** _objs,int _cnt,int min_req,int timeout_millis,int * nready)122 int MultiNdbWakeupHandler::waitForInput(Ndb** _objs,
123 int _cnt,
124 int min_req,
125 int timeout_millis,
126 int *nready)
127 {
128 /**
129 Initialise object for waiting.
130
131 numNdbsWithCompletedTrans:
132 Keeps track of number of transactions completed and is protected by
133 localWakeupMutexPtr-mutex. It can be set to 0 without mutex protection
134 since the poll owner thread will not access it until we have registered
135 at least one NDB object.
136
137 minNdbsToWake:
138 This is used by both notifyWakeup and notifyTransactionsCompleted to
139 see whether we're currently waiting to be woken up. We always access
140 it protected by the Ndb mutex on the waiter object.
141
142 objs:
143 This is a local array set when waitForInput is called. It is only
144 manipulated by the thread calling waitForInput. So it doesn't need
145 any protection when used.
146
147 cnt:
148 This is a local counter of how many objects we're waiting for, only
149 used by the thread calling waitForInput, so no need to protect it.
150
151 woken:
152 This is set by notifyWakeup to indicate we should wake up even if no
153 NDB objects are done. This is protected by the Ndb mutex on the waiter
154 object.
155 */
156
157 numNdbsWithCompletedTrans = 0;
158 cnt = (Uint32)_cnt;
159 objs = _objs;
160
161 NdbMutex_Lock(wakeNdb->theImpl->m_mutex);
162 ignore_wakeups();
163 NdbMutex_Unlock(wakeNdb->theImpl->m_mutex);
164
165 /*
166 Before sleeping, we register each Ndb, and check whether it already
167 has any completed transactions.
168 */
169 for (Uint32 i = 0; i < cnt; i++)
170 {
171 /* Register the Ndb's */
172 registerNdb(objs[i], i);
173 }
174
175 int ret = -1;
176 bool first = true;
177 const NDB_TICKS start = NdbTick_getCurrentTicks();
178 const int maxTime = timeout_millis;
179 {
180 PollGuard pg(*wakeNdb->theImpl);
181 do
182 {
183 if (first)
184 {
185 set_wakeup(min_req);
186 if (isReadyToWake()) // already enough
187 {
188 pg.wait_for_input(0);
189 // woken = false;
190 ignore_wakeups();
191 ret = 0;
192 break;
193 }
194 wakeNdb->theImpl->theWaiter.set_node(0);
195 wakeNdb->theImpl->theWaiter.set_state(WAIT_TRANS);
196 first = false;
197 }
198 /* PollGuard will put us to sleep until something relevant happens */
199 pg.wait_for_input(timeout_millis > 10 ? 10 : timeout_millis);
200 wakeNdb->theImpl->incClientStat(Ndb::WaitExecCompleteCount, 1);
201
202 if (isReadyToWake())
203 {
204 // woken = false;
205 ignore_wakeups();
206 ret = 0;
207 break;
208 }
209 const NDB_TICKS now = NdbTick_getCurrentTicks();
210 timeout_millis = (maxTime - (int)NdbTick_Elapsed(start,now).milliSec());
211 if (timeout_millis <= 0)
212 {
213 ignore_wakeups();
214 break;
215 }
216 } while (1);
217 }
218 finalize_wait(nready);
219 return ret;
220 }
221
222
swapNdbsInArray(Uint32 indexA,Uint32 indexB)223 void MultiNdbWakeupHandler::swapNdbsInArray(Uint32 indexA, Uint32 indexB)
224 {
225 /* Generally used to move an Ndb object down the list
226 * (bubble sort), so that it is part of a contiguous
227 * list of Ndbs with completed transactions to return
228 * to caller.
229 * If it's already in the given position, no effect
230 */
231 assert(indexA < cnt);
232 assert(indexB < cnt);
233
234 Ndb* a = objs[indexA];
235 Ndb* b = objs[indexB];
236
237 objs[indexA] = b;
238 objs[indexB] = a;
239 }
240
241
notifyTransactionCompleted(Ndb * from)242 void MultiNdbWakeupHandler::notifyTransactionCompleted(Ndb* from)
243 {
244 Uint32 num_completed_trans;
245 if (!wakeNdb->theImpl->is_locked_for_poll())
246 {
247 wakeNdb->theImpl->lock_client();
248 }
249
250 assert(wakeNdb->theImpl->wakeHandler == this);
251 assert(from != wakeNdb);
252
253 /* Some Ndb object has just completed another transaction.
254 Ensure that it's in the completed Ndbs list
255 */
256 NdbMutex_Lock(localWakeupMutexPtr);
257 numNdbsWithCompletedTrans++;
258 num_completed_trans = numNdbsWithCompletedTrans;
259 NdbMutex_Unlock(localWakeupMutexPtr);
260
261 if (!is_wakeups_ignored() && num_completed_trans >= minNdbsToWake)
262 {
263 wakeNdb->theImpl->theWaiter.signal(NO_WAIT); // wakeup client thread
264 }
265 return;
266 }
267
268
notifyWakeup()269 void MultiNdbWakeupHandler::notifyWakeup()
270 {
271 if (!wakeNdb->theImpl->is_locked_for_poll())
272 {
273 wakeNdb->theImpl->lock_client();
274 }
275 assert(wakeNdb->theImpl->wakeHandler == this);
276
277 woken = true;
278 /* Wakeup client thread, using 'waiter' Ndb */
279 if (!is_wakeups_ignored())
280 {
281 wakeNdb->theImpl->theWaiter.signal(NO_WAIT);
282 }
283 }
284
285
ignore_wakeups()286 void MultiNdbWakeupHandler::ignore_wakeups()
287 {
288 /**
289 We set minNdbsToWake to MAX value to ensure there won't be any
290 attempts to wake us up until we're ready to be woken.
291 */
292 minNdbsToWake = ~Uint32(0);
293 }
294
is_wakeups_ignored()295 bool MultiNdbWakeupHandler::is_wakeups_ignored()
296 {
297 return (minNdbsToWake == (~Uint32(0)));
298 }
299
300
set_wakeup(Uint32 wakeup_count)301 void MultiNdbWakeupHandler::set_wakeup(Uint32 wakeup_count)
302 {
303 minNdbsToWake = wakeup_count;
304 }
305
306
307
isReadyToWake() const308 bool MultiNdbWakeupHandler::isReadyToWake() const
309 {
310 NdbMutex_Lock(localWakeupMutexPtr);
311 bool ret = ((numNdbsWithCompletedTrans >= minNdbsToWake) || woken);
312 NdbMutex_Unlock(localWakeupMutexPtr);
313 return ret;
314 }
315