1 /*
2 Copyright (c) 2013, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24 #include <ndb_global.h>
25 #include <stdlib.h>
26 #include "NdbWaitGroup.hpp"
27 #include "WakeupHandler.hpp"
28 #include "ndb_cluster_connection.hpp"
29 #include "TransporterFacade.hpp"
30 #include "ndb_cluster_connection_impl.hpp"
31 #include "NdbImpl.hpp"
32
33
/* Round num up to a multiple of factor.
   A num that is already a positive multiple of factor is returned unchanged.
   Note that with factor > 1, num == 0 yields factor (not 0), because
   (num - 1) % factor is then -1.
*/
int round_up(int num, int factor)
{
  const int below = num - 1;
  const int remainder = below % factor;
  return below - remainder + factor;
}
38
39
NdbWaitGroup(Ndb_cluster_connection * _conn,int ndbs)40 NdbWaitGroup::NdbWaitGroup(Ndb_cluster_connection *_conn, int ndbs) :
41 m_pos_new(0),
42 m_pos_wait(0),
43 m_pos_ready(0),
44 m_multiWaitHandler(0),
45 m_pos_overflow(0),
46 m_nodeId(0),
47 m_active_version(0),
48 m_conn(_conn)
49 {
50 const int pointers_per_cache_line = NDB_CL / sizeof(Ndb *);
51
52 /* round array size up to a whole cache line */
53 m_array_size = round_up(ndbs, pointers_per_cache_line);
54
55 /* m_pos is used in the version 1 api */
56 m_pos = m_array_size;
57
58 /* overflow list is 1/8 of array, also rounded up */
59 m_overflow_size = m_array_size / 8;
60 m_overflow_size = round_up(m_overflow_size, pointers_per_cache_line);
61
62 /* Return point is somewhere in the array */
63 m_pos_return = m_array_size / 3;
64
65 /* Allocate the main array and the overflow list */
66 m_array = (Ndb **) calloc(m_array_size, sizeof(Ndb *));
67 m_overflow = (Ndb **) calloc(m_overflow_size, sizeof(Ndb *));
68
69 /* Call into the TransporterFacade to set up wakeups */
70 bool rc = m_conn->m_impl.m_transporter_facade->setupWakeup();
71 require(rc);
72
73 /* Get a new Ndb object to be the dedicated "wakeup object" for the group */
74 m_wakeNdb = new Ndb(m_conn);
75 require(m_wakeNdb);
76 m_wakeNdb->init(1);
77 m_nodeId = m_wakeNdb->theNode;
78
79 /* Get a wakeup handler */
80 m_multiWaitHandler = new MultiNdbWakeupHandler(m_wakeNdb);
81 require(m_multiWaitHandler);
82 }
83
84
~NdbWaitGroup()85 NdbWaitGroup::~NdbWaitGroup()
86 {
87 delete m_multiWaitHandler;
88 delete m_wakeNdb;
89 free(m_array);
90 free(m_overflow);
91 }
92
93
wakeup()94 void NdbWaitGroup::wakeup()
95 {
96 m_conn->m_impl.m_transporter_facade->requestWakeup();
97 }
98
99
100 /* Old-API addNdb()
101 */
addNdb(Ndb * ndb)102 bool NdbWaitGroup::addNdb(Ndb *ndb)
103 {
104 if(unlikely(ndb->theNode != Uint32(m_nodeId)))
105 {
106 return false; // Ndb belongs to wrong ndb_cluster_connection
107 }
108
109 if(unlikely(m_pos == 0))
110 {
111 return false; // array is full
112 }
113
114 m_array[--m_pos] = ndb;
115 return true;
116 }
117
118
119 /* Old-API version of wait().
120 It is single-threaded without any concurrent push().
121 */
wait(Ndb ** & arrayHead,Uint32 timeout_millis,int min_ndbs)122 int NdbWaitGroup::wait(Ndb ** & arrayHead /* out */,
123 Uint32 timeout_millis,
124 int min_ndbs)
125 {
126 int nready;
127 int nwait = m_array_size - m_pos;
128 Ndb ** ndblist = m_array + m_pos;
129 arrayHead = NULL;
130 m_active_version = 1;
131 int wait_rc = m_multiWaitHandler->waitForInput(ndblist,
132 nwait,
133 min_ndbs,
134 timeout_millis,
135 &nready);
136 if(wait_rc == 0)
137 {
138 arrayHead = ndblist;
139 m_pos += nready;
140 return nready;
141 }
142 return wait_rc ? -1 : nready;
143 }
144
145
/*  Version 2 API */

/*
   QUEUE

   A   = Array                                                   m_array
   MAX = Array Size                                              m_array_size
   RETURNPOINT = Some point between 0 and MAX                    m_pos_return
   N = New (recently pushed to list)       NC = New Cursor       m_pos_new
   W = Waiting (on NDB network i/o)        WC = Waiting Cursor   m_pos_wait
   R = Returned (from NDB, ready to poll)  RC = Returned Cursor  m_pos_ready

   init:  NC = WC = RC = 0.

   push:  A[NC] = X
          NC += 1                 # NC is index of next new item
          If(NC == MAX) List is full

   wait:  # Maintenance tasks:
          (1) If list is full, resize
          (2) If NC > RETURNPOINT, shift list downward so A[WC] becomes A[0]
          # Wait for all the newly arrived items
          nwait = NC - WC
          nready = waitForInput(WC, nwait)
          WC += nready            # WC is start index of the next wait

   pop:   IF (RC != WC)
            RETURNVAL = A[RC]
            RC += 1               # RC is index of next ready item

   Many threads can push and pop; only one thread can use wait.
*/
178
push(Ndb * ndb)179 int NdbWaitGroup::push(Ndb *ndb)
180 {
181 if(unlikely(ndb->theNode != Uint32(m_nodeId)))
182 {
183 return -1;
184 }
185
186 lock();
187 if(unlikely(m_pos_new == m_array_size)) // array is full
188 {
189 if(unlikely(m_pos_overflow == m_overflow_size)) // overflow list is full
190 {
191 m_overflow_size *= 2;
192 assert(m_overflow_size < NDBWAITGROUP_MAX_SIZE);
193 m_overflow = (Ndb **) realloc(m_overflow, m_overflow_size * sizeof(Ndb*));
194 }
195 m_overflow[m_pos_overflow++] = ndb;
196 }
197 else
198 {
199 m_array[m_pos_new++] = ndb; // common case
200 }
201 unlock();
202
203 return 0;
204 }
205
206
207 /* wait() takes the lock before and after wait (not during).
208 In 7.2, shifting or resizing the list requires a PollGuard,
209 but in 7.3, the underlying wakeupHandler will only touch the
210 array during wait() so no lock is needed.
211 */
wait(Uint32 timeout_millis,int pct_ready)212 int NdbWaitGroup::wait(Uint32 timeout_millis, int pct_ready)
213 {
214 int nready, nwait;
215 m_active_version = 2;
216 assert(pct_ready >=0 && pct_ready <= 100);
217
218 lock();
219
220 /* Resize list if full */
221 if(unlikely(m_pos_new == m_array_size))
222 {
223 resize_list();
224 }
225
226 /* On last pop, if list has advanced past return point, shift back to 0 */
227 if(m_pos_ready && /* Not at zero */
228 m_pos_ready == m_pos_wait && /* Cannot currently pop */
229 m_pos_new > m_pos_return) /* NC > RETURNPOINT */
230 {
231 for(Uint32 i = m_pos_wait; i < m_pos_new; i++)
232 {
233 m_array[i - m_pos_wait] = m_array[i];
234 }
235 m_pos_new -= m_pos_wait;
236 m_pos_ready = m_pos_wait = 0;
237 }
238
239 /* Number of items to wait for */
240 nwait = m_pos_new - m_pos_wait;
241 unlock();
242
243 /********** ENTER WAIT **********/
244 int min_ndbs = nwait * pct_ready / 100 ;
245 if(min_ndbs == 0 && pct_ready > 0) min_ndbs = 1;
246 Ndb ** arrayHead = m_array + m_pos_wait;
247 m_multiWaitHandler->waitForInput(arrayHead,
248 nwait,
249 min_ndbs,
250 timeout_millis,
251 &nready);
252 /********** EXIT WAIT *********/
253
254 lock();
255 m_pos_wait += nready;
256 unlock();
257
258 return nready;
259 }
260
261
pop()262 Ndb * NdbWaitGroup::pop()
263 {
264 Ndb * r = 0;
265
266 lock();
267 if(m_pos_ready < m_pos_wait)
268 {
269 r = m_array[m_pos_ready++];
270 }
271 unlock();
272
273 return r;
274 }
275
276
277 /* Private internal methods */
278
resize_list()279 void NdbWaitGroup::resize_list()
280 {
281 Uint32 size_required = m_array_size + m_pos_overflow + 1;
282 while(m_array_size < size_required)
283 {
284 m_array_size *= 2;
285 m_pos_return *= 2;
286 }
287 assert(m_array_size < NDBWAITGROUP_MAX_SIZE);
288
289 /* Reallocate */
290 m_array = (Ndb **) realloc(m_array, m_array_size * sizeof(Ndb *));
291
292 /* Copy from the overflow list to the new list. */
293 while(m_pos_overflow)
294 {
295 m_array[m_pos_new++] = m_overflow[--m_pos_overflow];
296 }
297 }
298
299