1 /*
2    Copyright (c) 2013, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 #include <ndb_global.h>
25 #include <stdlib.h>
26 #include "NdbWaitGroup.hpp"
27 #include "WakeupHandler.hpp"
28 #include "ndb_cluster_connection.hpp"
29 #include "TransporterFacade.hpp"
30 #include "ndb_cluster_connection_impl.hpp"
31 #include "NdbImpl.hpp"
32 
33 
/* Round num up to a multiple of factor.
   A value that is already a positive multiple is returned unchanged;
   num == 0 yields factor (never 0), so sizes derived from it are non-zero.
*/
int round_up(int num, int factor)
{
  /* How far (num - 1) lies past the previous multiple of factor */
  const int past_boundary = (num - 1) % factor;
  const int padding = factor - 1 - past_boundary;
  return num + padding;
}
38 
39 
NdbWaitGroup(Ndb_cluster_connection * _conn,int ndbs)40 NdbWaitGroup::NdbWaitGroup(Ndb_cluster_connection *_conn, int ndbs) :
41   m_pos_new(0),
42   m_pos_wait(0),
43   m_pos_ready(0),
44   m_multiWaitHandler(0),
45   m_pos_overflow(0),
46   m_nodeId(0),
47   m_active_version(0),
48   m_conn(_conn)
49 {
50   const int pointers_per_cache_line = NDB_CL / sizeof(Ndb *);
51 
52   /* round array size up to a whole cache line */
53   m_array_size = round_up(ndbs, pointers_per_cache_line);
54 
55   /* m_pos is used in the version 1 api */
56   m_pos = m_array_size;
57 
58   /* overflow list is 1/8 of array, also rounded up */
59   m_overflow_size = m_array_size / 8;
60   m_overflow_size = round_up(m_overflow_size, pointers_per_cache_line);
61 
62   /* Return point is somewhere in the array */
63   m_pos_return = m_array_size / 3;
64 
65   /* Allocate the main array and the overflow list */
66   m_array = (Ndb **) calloc(m_array_size, sizeof(Ndb *));
67   m_overflow = (Ndb **) calloc(m_overflow_size, sizeof(Ndb *));
68 
69   /* Call into the TransporterFacade to set up wakeups */
70   bool rc = m_conn->m_impl.m_transporter_facade->setupWakeup();
71   require(rc);
72 
73   /* Get a new Ndb object to be the dedicated "wakeup object" for the group */
74   m_wakeNdb = new Ndb(m_conn);
75   require(m_wakeNdb);
76   m_wakeNdb->init(1);
77   m_nodeId = m_wakeNdb->theNode;
78 
79   /* Get a wakeup handler */
80   m_multiWaitHandler = new MultiNdbWakeupHandler(m_wakeNdb);
81   require(m_multiWaitHandler);
82 }
83 
84 
~NdbWaitGroup()85 NdbWaitGroup::~NdbWaitGroup()
86 {
87   delete m_multiWaitHandler;
88   delete m_wakeNdb;
89   free(m_array);
90   free(m_overflow);
91 }
92 
93 
wakeup()94 void NdbWaitGroup::wakeup()
95 {
96   m_conn->m_impl.m_transporter_facade->requestWakeup();
97 }
98 
99 
100 /*  Old-API addNdb()
101 */
addNdb(Ndb * ndb)102 bool NdbWaitGroup::addNdb(Ndb *ndb)
103 {
104   if(unlikely(ndb->theNode != Uint32(m_nodeId)))
105   {
106     return false; // Ndb belongs to wrong ndb_cluster_connection
107   }
108 
109   if(unlikely(m_pos == 0))
110   {
111     return false; // array is full
112   }
113 
114   m_array[--m_pos] = ndb;
115   return true;
116 }
117 
118 
119 /*  Old-API version of wait().
120     It is single-threaded without any concurrent push().
121 */
wait(Ndb ** & arrayHead,Uint32 timeout_millis,int min_ndbs)122 int NdbWaitGroup::wait(Ndb ** & arrayHead    /* out */,
123                        Uint32 timeout_millis,
124                        int min_ndbs)
125 {
126   int nready;
127   int nwait = m_array_size - m_pos;
128   Ndb ** ndblist = m_array + m_pos;
129   arrayHead = NULL;
130   m_active_version = 1;
131   int wait_rc = m_multiWaitHandler->waitForInput(ndblist,
132                                                  nwait,
133                                                  min_ndbs,
134                                                  timeout_millis,
135                                                  &nready);
136   if(wait_rc == 0)
137   {
138     arrayHead = ndblist;
139     m_pos += nready;
140     return nready;
141   }
142   return wait_rc ? -1 : nready;
143 }
144 
145 
146 /* Version 2 API */
147 
148 /*
149     QUEUE
150 
151     A = Array                                                       m_array
152     MAX = Array Size                                                m_array_size
153     RETURNPOINT = Some point between 0 and MAX                      m_pos_return
154     N = New (recently pushed to list)       NC = New Cursor         m_pos_new
155     W = Waiting (on NDB network i/o)        WC = Waiting Cursor     m_pos_wait
156     R = Returned (from NDB, ready to poll)  RC = Returned Cursor    m_pos_ready
157 
158     init:  NC = WC = RC = 0.
159 
160     push:  A[NC] = X
161            NC += 1                      # NC is index of next new item
162            If(NC == MAX) List is full
163 
164     wait:  # Maintenance tasks:
165               (1) If list is full, resize
              (2) If NC > RETURNPOINT, shift list downward so A[WC] becomes A[0]
167            # Wait for all the newly arrived items
168            nwait = NC - WC
169            nready = waitForInput(WC, nwait)
170            WC += nready                 # WC is start index of the next wait
171 
172     pop:   IF (RC != WC)
173              RETURNVAL = A[RC]
174              RC += 1                    # RC is index of next ready item
175 
176     Many threads can push and pop; only one thread can use wait.
177 */
178 
push(Ndb * ndb)179 int NdbWaitGroup::push(Ndb *ndb)
180 {
181   if(unlikely(ndb->theNode != Uint32(m_nodeId)))
182   {
183     return -1;
184   }
185 
186   lock();
187   if(unlikely(m_pos_new == m_array_size))  // array is full
188   {
189     if(unlikely(m_pos_overflow == m_overflow_size))  // overflow list is full
190     {
191       m_overflow_size *= 2;
192       assert(m_overflow_size < NDBWAITGROUP_MAX_SIZE);
193       m_overflow = (Ndb **) realloc(m_overflow, m_overflow_size * sizeof(Ndb*));
194     }
195     m_overflow[m_pos_overflow++] = ndb;
196   }
197   else
198   {
199     m_array[m_pos_new++] = ndb;   // common case
200   }
201   unlock();
202 
203   return 0;
204 }
205 
206 
207 /* wait() takes the lock before and after wait (not during).
208    In 7.2, shifting or resizing the list requires a PollGuard,
209    but in 7.3, the underlying wakeupHandler will only touch the
210    array during wait() so no lock is needed.
211 */
wait(Uint32 timeout_millis,int pct_ready)212 int NdbWaitGroup::wait(Uint32 timeout_millis, int pct_ready)
213 {
214   int nready, nwait;
215   m_active_version = 2;
216   assert(pct_ready >=0 && pct_ready <= 100);
217 
218   lock();
219 
220   /* Resize list if full */
221   if(unlikely(m_pos_new == m_array_size))
222   {
223     resize_list();
224   }
225 
226   /* On last pop, if list has advanced past return point, shift back to 0 */
227   if(m_pos_ready &&                  /* Not at zero */
228      m_pos_ready == m_pos_wait &&    /* Cannot currently pop */
229      m_pos_new > m_pos_return)       /* NC > RETURNPOINT */
230   {
231     for(Uint32 i = m_pos_wait; i < m_pos_new; i++)
232     {
233       m_array[i - m_pos_wait] = m_array[i];
234     }
235     m_pos_new -= m_pos_wait;
236     m_pos_ready = m_pos_wait = 0;
237   }
238 
239   /* Number of items to wait for */
240   nwait = m_pos_new - m_pos_wait;
241   unlock();
242 
243   /********** ENTER WAIT **********/
244   int min_ndbs = nwait * pct_ready / 100 ;
245   if(min_ndbs == 0 && pct_ready > 0) min_ndbs = 1;
246   Ndb ** arrayHead = m_array + m_pos_wait;
247   m_multiWaitHandler->waitForInput(arrayHead,
248                                    nwait,
249                                    min_ndbs,
250                                    timeout_millis,
251                                    &nready);
252   /********** EXIT WAIT *********/
253 
254   lock();
255   m_pos_wait += nready;
256   unlock();
257 
258   return nready;
259 }
260 
261 
pop()262 Ndb * NdbWaitGroup::pop()
263 {
264   Ndb * r = 0;
265 
266   lock();
267   if(m_pos_ready < m_pos_wait)
268   {
269     r = m_array[m_pos_ready++];
270   }
271   unlock();
272 
273   return r;
274 }
275 
276 
277 /* Private internal methods */
278 
resize_list()279 void NdbWaitGroup::resize_list()
280 {
281   Uint32 size_required = m_array_size + m_pos_overflow + 1;
282   while(m_array_size < size_required)
283   {
284     m_array_size *= 2;
285     m_pos_return *= 2;
286   }
287   assert(m_array_size < NDBWAITGROUP_MAX_SIZE);
288 
289   /* Reallocate */
290   m_array = (Ndb **) realloc(m_array, m_array_size * sizeof(Ndb *));
291 
292   /* Copy from the overflow list to the new list. */
293   while(m_pos_overflow)
294   {
295     m_array[m_pos_new++] = m_overflow[--m_pos_overflow];
296   }
297 }
298 
299