1 /*
2 Copyright (c) 2013, 2016, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "adapter_global.h"
26 #include "NdbWrapperErrors.h"
27 #include "AsyncNdbContext.h"
28 #include "AsyncMethodCall.h"
29 #include "TransactionImpl.h"
30
31 /* Thread starter, for pthread_create()
32 */
run_ndb_listener_thread(void * v)33 PTHREAD_RETURN_TYPE run_ndb_listener_thread(void *v) {
34 AsyncNdbContext * ctx = (AsyncNdbContext *) v;
35 ctx->runListenerThread();
36 return PTHREAD_RETURN_VAL;
37 }
38
39 /* ioCompleted will run in the JavaScript main thread.
40 If the waiter thread called uv_async_send() more than once within an
41 interval, libuv might coalesce those into a single call here.
42 */
ioCompleted(uv_async_t * ndbWaitLoop)43 void ioCompleted(uv_async_t *ndbWaitLoop) {
44 AsyncNdbContext * ctx = (AsyncNdbContext *) ndbWaitLoop->data;
45 ctx->completeCallbacks();
46 }
47
48 /* Class AsyncExecCall
49 */
50 class AsyncExecCall : public AsyncAsyncCall<int, NdbTransaction> {
51 public:
AsyncExecCall(NdbTransaction * tx,v8::Handle<v8::Function> jsCallback)52 AsyncExecCall(NdbTransaction *tx, v8::Handle<v8::Function> jsCallback) :
53 AsyncAsyncCall<int, NdbTransaction>(tx, jsCallback,
54 getNdbErrorIfLessThanZero<int, NdbTransaction>) {};
55 TransactionImpl * closeContext;
56
closeTransaction()57 void closeTransaction() {
58 if(closeContext) {
59 DEBUG_PRINT("Closing");
60 closeContext->closeTransaction();
61 closeContext->registerClose();
62 }
63 }
64 };
65
66 /* ndbTxCompleted is the callback on tx->executeAsynch().
67 Cast the void pointer back to AsyncExecCall and set its return value.
68 TODO: Is a HandleScope needed and if so who owns it??
69 */
ndbTxCompleted(int status,NdbTransaction * tx,void * v)70 void ndbTxCompleted(int status, NdbTransaction *tx, void *v) {
71 DEBUG_PRINT("ndbTxCompleted: %d %p %p", status, tx, v);
72 AsyncExecCall * mcallptr = (AsyncExecCall *) v;
73 mcallptr->return_val = status;
74 mcallptr->handleErrors();
75 mcallptr->closeTransaction();
76 tx->getNdb()->setCustomData(mcallptr);
77 }
78
79
80 /* ====== Class AsyncNdbContext ====== */
81
82 /* Constructor
83 */
AsyncNdbContext(Ndb_cluster_connection * conn)84 AsyncNdbContext::AsyncNdbContext(Ndb_cluster_connection *conn) :
85 connection(conn),
86 shutdown_flag()
87 {
88 DEBUG_MARKER(UDEB_DEBUG);
89
90 /* Create the multi-wait group */
91 waitgroup = connection->create_ndb_wait_group(WAIT_GROUP_SIZE);
92
93 /* Register the completion function */
94 uv_async_init(uv_default_loop(), & async_handle, ioCompleted);
95
96 /* Store some context in the uv_async_t */
97 async_handle.data = (void *) this;
98
99 /* Start the listener thread. */
100 uv_thread_create(& listener_thread_id, run_ndb_listener_thread, (void *) this);
101 }
102
103
104 /* Destructor
105 */
~AsyncNdbContext()106 AsyncNdbContext::~AsyncNdbContext()
107 {
108 uv_thread_join(& listener_thread_id);
109 connection->release_ndb_wait_group(waitgroup);
110 }
111
112
113 /* Methods
114 */
115
116 /* This could run in a UV worker thread (JavaScript async execution)
117 or possibly in the JavaScript thread (JavaScript sync execution)
118 */
executeAsynch(TransactionImpl * txc,NdbTransaction * tx,int execType,int abortOption,int forceSend,v8::Handle<v8::Function> jsCallback)119 int AsyncNdbContext::executeAsynch(TransactionImpl *txc,
120 NdbTransaction *tx,
121 int execType,
122 int abortOption,
123 int forceSend,
124 v8::Handle<v8::Function> jsCallback) {
125
126 /* Create a container to help pass return values up the JS callback stack */
127 AsyncExecCall * mcallptr = new AsyncExecCall(tx, jsCallback);
128
129 Ndb * ndb = tx->getNdb();
130 DEBUG_PRINT("NdbTransaction:%p:executeAsynch(%d,%d) -- Push: %p",
131 mcallptr->native_obj, execType, abortOption, ndb);
132
133 /* The NdbTransaction should be closed unless execType is NoCommit */
134 mcallptr->closeContext = (execType == NdbTransaction::NoCommit) ? 0 : txc;
135
136 /* send the transaction to NDB */
137 tx->executeAsynch((NdbTransaction::ExecType) execType,
138 ndbTxCompleted,
139 mcallptr,
140 (NdbOperation::AbortOption) abortOption,
141 forceSend);
142
143 #ifdef USE_OLD_MULTIWAIT_API
144 sent_queue.produce(new ListNode<Ndb>(ndb));
145 #else
146 waitgroup->push(ndb);
147 #endif
148
149 /* Notify the waitgroup that there is a new Ndb to wait on */
150 /* TODO: This could depend on forceSend? */
151 waitgroup->wakeup();
152
153 return 1;
154 }
155
156
157 #ifndef USE_OLD_MULTIWAIT_API
158
runListenerThread()159 void * AsyncNdbContext::runListenerThread() {
160 DEBUG_MARKER(UDEB_DEBUG);
161 int wait_timeout_millisec = 100;
162 int pct_ready = 50;
163 bool running = true;
164
165 while(running) {
166 if(shutdown_flag.test()) {
167 DEBUG_PRINT("MULTIWAIT LISTENER GOT SHUTDOWN.");
168 pct_ready = 100; /* One final read of all outstanding items */
169 wait_timeout_millisec = 200;
170 running = false;
171 }
172
173 /* Wait for ready Ndbs */
174 if(waitgroup->wait(wait_timeout_millisec, pct_ready) > 0) {
175 uv_async_send(& async_handle); // => ioCompleted() => completeCallbacks()
176 }
177 }
178
179 return 0;
180 }
181
shutdown()182 void AsyncNdbContext::shutdown() {
183 DEBUG_MARKER(UDEB_DEBUG);
184 shutdown_flag.set();
185 waitgroup->wakeup();
186 }
187
188
completeCallbacks()189 void AsyncNdbContext::completeCallbacks() {
190 AsyncExecCall * mcallptr;
191 Ndb * ndb = waitgroup->pop();
192
193 while(ndb) {
194 DEBUG_PRINT(" -- Pop: %p", ndb);
195 ndb->pollNdb(0, 1); /* runs ndbTxCompleted() */
196 mcallptr = (AsyncExecCall *) ndb->getCustomData();
197 ndb->setCustomData(0);
198 main_thd_complete_async_call(mcallptr);
199 ndb = waitgroup->pop();
200 }
201 }
202
203 #else /* Old Multiwait */
204
/* ====== Signals ===== */
/* Value stored in ListNode::signalinfo to mark the node that tells the
   listener thread to shut down.  Declared const: it is only ever read.
*/
static const int SignalShutdown = 1;
207
runListenerThread()208 void * AsyncNdbContext::runListenerThread() {
209 DEBUG_MARKER(UDEB_DEBUG);
210 ListNode<Ndb> * sentNdbs, * completedNdbs, * currentNode;
211 Ndb * ndb;
212 Ndb ** ready_list;
213 int wait_timeout_millisec = 5000;
214 int min_ready, nwaiting, npending = 0;
215 bool running = true;
216
217 while(running) { // Listener thread main loop
218
219 /* Add new Ndbs to the wait group */
220 sentNdbs = sent_queue.consumeAll();
221 while(sentNdbs != 0) {
222 currentNode = sentNdbs;
223 sentNdbs = sentNdbs->next;
224
225 if(currentNode->signalinfo == SignalShutdown) {
226 running = false;
227 }
228 else {
229 waitgroup->addNdb(currentNode->item);
230 npending++;
231 DEBUG_PRINT("Listener: %d pending", npending);
232 }
233 delete currentNode; // Frees the ListNode from executeAsynch()
234 }
235
236 /* What's the minimum number of ready Ndb's to wake up for? */
237 if(! running) {
238 min_ready = npending; // Wait one final time for all outstanding Ndbs
239 wait_timeout_millisec = 200;
240 }
241 else {
242 int n = npending / 4;
243 min_ready = n > 0 ? n : 1;
244 }
245
246 /* Wait until something is ready to poll */
247 nwaiting = waitgroup->wait(ready_list, wait_timeout_millisec, min_ready);
248
249 completedNdbs = 0;
250 if(nwaiting > 0) {
251 /* Poll the ones that are ready */
252 DEBUG_PRINT("Listener: %d ready", nwaiting);
253 for(int i = 0 ; i < nwaiting ; i++) {
254 npending--;
255 assert(npending >= 0);
256 ndb = ready_list[i];
257 ndb->pollNdb(0, 1); /* runs ndbTxCompleted() */
258 currentNode = new ListNode<Ndb>(ndb);
259 currentNode->next = completedNdbs;
260 completedNdbs = currentNode;
261 }
262
263 /* Publish the completed ones */
264 completed_queue.produce(completedNdbs);
265
266 /* Notify the main thread */
267 uv_async_send(& async_handle);
268 }
269 } // Listener thread main loop
270
271 return 0;
272 }
273
274
275 /* Shut down the context
276 */
shutdown()277 void AsyncNdbContext::shutdown() {
278 DEBUG_MARKER(UDEB_DEBUG);
279 ListNode<Ndb> * finalNode = new ListNode<Ndb>((Ndb *) 0);
280 finalNode->signalinfo = SignalShutdown;
281
282 /* Queue the shutdown node, and wake up the listener thread for it */
283 sent_queue.produce(finalNode);
284 waitgroup->wakeup();
285 }
286
287
288 /* This runs in the JavaScript main thread, at most once per uv_async_send().
289 It dispatches JavaScript callbacks for completed operations.
290 */
completeCallbacks()291 void AsyncNdbContext::completeCallbacks() {
292 ListNode<Ndb> * completedNdbs, * currentNode;
293
294 completedNdbs = completed_queue.consumeAll();
295
296 while(completedNdbs != 0) {
297 currentNode = completedNdbs;
298 Ndb * ndb = currentNode->item;
299 AsyncExecCall * mcallptr = static_cast<AsyncExecCall *>(ndb->getCustomData());
300 ndb->setCustomData(0);
301
302 main_thd_complete_async_call(mcallptr);
303 completedNdbs = currentNode->next;
304
305 delete currentNode; // Frees the ListNode from runListenerThread()
306 }
307 }
308
309 #endif
310