1 /*
2    Copyright (c) 2011, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "WakeupHandler.hpp"
26 #include "Ndb.hpp"
27 #include "NdbImpl.hpp"
28 #include "trp_client.hpp"
29 
30 // ***** Multiwait handler ****
31 
32 /**
33  * An instance of this class is used when a single thread
34  * wants to wait for the asynchronous completion of transactions
35  * on multiple Ndb objects.
36  * When the thread starts waiting, all Ndb objects are checked
37  * for CompletedTransactions, and their wakeHandler is set to
38  * poin to the same MultiNdbWakeupHandler object.  The thread
39  * is then put to sleep / polls on a designated Ndb object.
40  *
41  * As transactions complete, the MultiNdbWakeHandler object
42  * moves their Ndb objects to the start of the passed Ndb
43  * object list and determines whether enough have completed
44  * to wake the waiting thread.
45  * When enough have completed, the waiting thread is woken via
46  * the designated Ndb object.
47  *
48  * The design only supports one instance of the MultiNdbWakeupHandler
49  * object per ndb cluster connection and this can only be used from
50  * one thread.
51  */
52 
MultiNdbWakeupHandler(Ndb * _wakeNdb)53 MultiNdbWakeupHandler::MultiNdbWakeupHandler(Ndb* _wakeNdb)
54   : wakeNdb(_wakeNdb)
55 {
56   localWakeupMutexPtr = NdbMutex_Create();
57   assert(localWakeupMutexPtr);
58   /* Register the waiter Ndb to receive wakeups for all Ndbs in the group */
59   PollGuard pg(* wakeNdb->theImpl);
60   ignore_wakeups();
61   bool rc = wakeNdb->theImpl->m_transporter_facade->registerForWakeup(wakeNdb->theImpl);
62   require(rc);
63   wakeNdb->theImpl->wakeHandler = this;
64 }
65 
66 
~MultiNdbWakeupHandler()67 MultiNdbWakeupHandler::~MultiNdbWakeupHandler()
68 {
69   if (localWakeupMutexPtr)
70   {
71     NdbMutex_Destroy(localWakeupMutexPtr);
72     localWakeupMutexPtr = NULL;
73   }
74   PollGuard pg(* wakeNdb->theImpl);
75   bool rc = wakeNdb->theImpl->m_transporter_facade->
76     unregisterForWakeup(wakeNdb->theImpl);
77   require(rc);
78 }
79 
80 
finalize_wait(int * nready)81 void MultiNdbWakeupHandler::finalize_wait(int *nready)
82 {
83   Uint32 num_completed_trans = 0;
84   for (Uint32 i = 0; i < cnt; i++)
85   {
86     Ndb *obj = objs[i];
87 
88     NdbMutex_Lock(obj->theImpl->m_mutex);
89     if (obj->theNoOfCompletedTransactions)
90     {
91       swapNdbsInArray(i, num_completed_trans);
92       num_completed_trans++;
93     }
94     unregisterNdb(obj);
95     NdbMutex_Unlock(obj->theImpl->m_mutex);
96   }
97   *nready = num_completed_trans;
98 }
99 
100 
registerNdb(Ndb * obj,Uint32 pos)101 void MultiNdbWakeupHandler::registerNdb(Ndb *obj, Uint32 pos)
102 {
103   NdbMutex_Lock(obj->theImpl->m_mutex);
104   obj->theImpl->wakeHandler = this;
105   /* It may already have some completed transactions */
106   if (obj->theNoOfCompletedTransactions)
107   {
108     NdbMutex_Lock(localWakeupMutexPtr);
109     numNdbsWithCompletedTrans++;
110     NdbMutex_Unlock(localWakeupMutexPtr);
111   }
112   NdbMutex_Unlock(obj->theImpl->m_mutex);
113 }
114 
115 
unregisterNdb(Ndb * obj)116 void MultiNdbWakeupHandler::unregisterNdb(Ndb *obj)
117 {
118   obj->theImpl->wakeHandler = 0;
119 }
120 
121 
waitForInput(Ndb ** _objs,int _cnt,int min_req,int timeout_millis,int * nready)122 int MultiNdbWakeupHandler::waitForInput(Ndb** _objs,
123                                         int _cnt,
124                                         int min_req,
125                                         int timeout_millis,
126                                         int *nready)
127 {
128   /**
129     Initialise object for waiting.
130 
131     numNdbsWithCompletedTrans:
132     Keeps track of number of transactions completed and is protected by
133     localWakeupMutexPtr-mutex. It can be set to 0 without mutex protection
134     since the poll owner thread will not access it until we have registered
135     at least one NDB object.
136 
137     minNdbsToWake:
138     This is used by both notifyWakeup and notifyTransactionsCompleted to
139     see whether we're currently waiting to be woken up. We always access
140     it protected by the Ndb mutex on the waiter object.
141 
142     objs:
143     This is a local array set when waitForInput is called. It is only
144     manipulated by the thread calling waitForInput. So it doesn't need
145     any protection when used.
146 
147     cnt:
148     This is a local counter of how many objects we're waiting for, only
149     used by the thread calling waitForInput, so no need to protect it.
150 
151     woken:
152     This is set by notifyWakeup to indicate we should wake up even if no
153     NDB objects are done. This is protected by the Ndb mutex on the waiter
154     object.
155   */
156 
157   numNdbsWithCompletedTrans = 0;
158   cnt = (Uint32)_cnt;
159   objs = _objs;
160 
161   NdbMutex_Lock(wakeNdb->theImpl->m_mutex);
162   ignore_wakeups();
163   NdbMutex_Unlock(wakeNdb->theImpl->m_mutex);
164 
165   /*
166     Before sleeping, we register each Ndb, and check whether it already
167     has any completed transactions.
168   */
169   for (Uint32 i = 0; i < cnt; i++)
170   {
171     /* Register the Ndb's */
172     registerNdb(objs[i], i);
173   }
174 
175   int ret = -1;
176   bool first = true;
177   const NDB_TICKS start = NdbTick_getCurrentTicks();
178   const int maxTime = timeout_millis;
179   {
180     PollGuard pg(*wakeNdb->theImpl);
181     do
182     {
183       if (first)
184       {
185         set_wakeup(min_req);
186         if (isReadyToWake())  // already enough
187         {
188           pg.wait_for_input(0);
189           // woken = false;
190           ignore_wakeups();
191           ret = 0;
192           break;
193         }
194         wakeNdb->theImpl->theWaiter.set_node(0);
195         wakeNdb->theImpl->theWaiter.set_state(WAIT_TRANS);
196         first = false;
197       }
198       /* PollGuard will put us to sleep until something relevant happens */
199       pg.wait_for_input(timeout_millis > 10 ? 10 : timeout_millis);
200       wakeNdb->theImpl->incClientStat(Ndb::WaitExecCompleteCount, 1);
201 
202       if (isReadyToWake())
203       {
204         // woken = false;
205         ignore_wakeups();
206         ret = 0;
207         break;
208       }
209       const NDB_TICKS now = NdbTick_getCurrentTicks();
210       timeout_millis = (maxTime - (int)NdbTick_Elapsed(start,now).milliSec());
211       if (timeout_millis <= 0)
212       {
213         ignore_wakeups();
214         break;
215       }
216     } while (1);
217   }
218   finalize_wait(nready);
219   return ret;
220 }
221 
222 
swapNdbsInArray(Uint32 indexA,Uint32 indexB)223 void MultiNdbWakeupHandler::swapNdbsInArray(Uint32 indexA, Uint32 indexB)
224 {
225   /* Generally used to move an Ndb object down the list
226    * (bubble sort), so that it is part of a contiguous
227    * list of Ndbs with completed transactions to return
228    * to caller.
229    * If it's already in the given position, no effect
230    */
231   assert(indexA < cnt);
232   assert(indexB < cnt);
233 
234   Ndb* a = objs[indexA];
235   Ndb* b = objs[indexB];
236 
237   objs[indexA] = b;
238   objs[indexB] = a;
239 }
240 
241 
notifyTransactionCompleted(Ndb * from)242 void MultiNdbWakeupHandler::notifyTransactionCompleted(Ndb* from)
243 {
244   Uint32 num_completed_trans;
245   if (!wakeNdb->theImpl->is_locked_for_poll())
246   {
247     wakeNdb->theImpl->lock_client();
248   }
249 
250   assert(wakeNdb->theImpl->wakeHandler == this);
251   assert(from != wakeNdb);
252 
253   /* Some Ndb object has just completed another transaction.
254      Ensure that it's in the completed Ndbs list
255   */
256   NdbMutex_Lock(localWakeupMutexPtr);
257   numNdbsWithCompletedTrans++;
258   num_completed_trans = numNdbsWithCompletedTrans;
259   NdbMutex_Unlock(localWakeupMutexPtr);
260 
261   if (!is_wakeups_ignored() && num_completed_trans >= minNdbsToWake)
262   {
263     wakeNdb->theImpl->theWaiter.signal(NO_WAIT);    // wakeup client thread
264   }
265   return;
266 }
267 
268 
notifyWakeup()269 void MultiNdbWakeupHandler::notifyWakeup()
270 {
271   if (!wakeNdb->theImpl->is_locked_for_poll())
272   {
273     wakeNdb->theImpl->lock_client();
274   }
275   assert(wakeNdb->theImpl->wakeHandler == this);
276 
277   woken = true;
278   /* Wakeup client thread, using 'waiter' Ndb */
279   if (!is_wakeups_ignored())
280   {
281     wakeNdb->theImpl->theWaiter.signal(NO_WAIT);
282   }
283 }
284 
285 
ignore_wakeups()286 void MultiNdbWakeupHandler::ignore_wakeups()
287 {
288   /**
289     We set minNdbsToWake to MAX value to ensure there won't be any
290     attempts to wake us up until we're ready to be woken.
291   */
292   minNdbsToWake = ~Uint32(0);
293 }
294 
is_wakeups_ignored()295 bool MultiNdbWakeupHandler::is_wakeups_ignored()
296 {
297   return (minNdbsToWake == (~Uint32(0)));
298 }
299 
300 
set_wakeup(Uint32 wakeup_count)301 void MultiNdbWakeupHandler::set_wakeup(Uint32 wakeup_count)
302 {
303   minNdbsToWake = wakeup_count;
304 }
305 
306 
307 
isReadyToWake() const308 bool MultiNdbWakeupHandler::isReadyToWake() const
309 {
310   NdbMutex_Lock(localWakeupMutexPtr);
311   bool ret = ((numNdbsWithCompletedTrans >= minNdbsToWake) || woken);
312   NdbMutex_Unlock(localWakeupMutexPtr);
313   return ret;
314 }
315