1 /*
2  Copyright (c) 2013, 2016, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License, version 2.0,
6  as published by the Free Software Foundation.
7 
8  This program is also distributed with certain software (including
9  but not limited to OpenSSL) that is licensed under separate terms,
10  as designated in a particular file or component or in included license
11  documentation.  The authors of MySQL hereby grant you an additional
12  permission to link the program and your derivative works with the
13  separately licensed software that they have included with MySQL.
14 
15  This program is distributed in the hope that it will be useful,
16  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  GNU General Public License, version 2.0, for more details.
19 
20  You should have received a copy of the GNU General Public License
21  along with this program; if not, write to the Free Software
22  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23  */
24 
25 #include "adapter_global.h"
26 #include "NdbWrapperErrors.h"
27 #include "AsyncNdbContext.h"
28 #include "AsyncMethodCall.h"
29 #include "TransactionImpl.h"
30 
31 /* Thread starter, for pthread_create()
32 */
run_ndb_listener_thread(void * v)33 PTHREAD_RETURN_TYPE run_ndb_listener_thread(void *v) {
34   AsyncNdbContext * ctx = (AsyncNdbContext *) v;
35   ctx->runListenerThread();
36   return PTHREAD_RETURN_VAL;
37 }
38 
39 /* ioCompleted will run in the JavaScript main thread.
40    If the waiter thread called uv_async_send() more than once within an
41    interval, libuv might coalesce those into a single call here.
42 */
ioCompleted(uv_async_t * ndbWaitLoop)43 void ioCompleted(uv_async_t *ndbWaitLoop) {
44   AsyncNdbContext * ctx = (AsyncNdbContext *) ndbWaitLoop->data;
45   ctx->completeCallbacks();
46 }
47 
48 /* Class AsyncExecCall
49 */
50 class AsyncExecCall : public AsyncAsyncCall<int, NdbTransaction> {
51 public:
AsyncExecCall(NdbTransaction * tx,v8::Handle<v8::Function> jsCallback)52   AsyncExecCall(NdbTransaction *tx, v8::Handle<v8::Function> jsCallback) :
53     AsyncAsyncCall<int, NdbTransaction>(tx, jsCallback,
54       getNdbErrorIfLessThanZero<int, NdbTransaction>)                        {};
55   TransactionImpl * closeContext;
56 
closeTransaction()57   void closeTransaction() {
58     if(closeContext) {
59       DEBUG_PRINT("Closing");
60       closeContext->closeTransaction();
61       closeContext->registerClose();
62     }
63   }
64 };
65 
66 /* ndbTxCompleted is the callback on tx->executeAsynch().
67    Cast the void pointer back to AsyncExecCall and set its return value.
68    TODO: Is a HandleScope needed and if so who owns it??
69 */
ndbTxCompleted(int status,NdbTransaction * tx,void * v)70 void ndbTxCompleted(int status, NdbTransaction *tx, void *v) {
71   DEBUG_PRINT("ndbTxCompleted: %d %p %p", status, tx, v);
72   AsyncExecCall * mcallptr = (AsyncExecCall *) v;
73   mcallptr->return_val = status;
74   mcallptr->handleErrors();
75   mcallptr->closeTransaction();
76   tx->getNdb()->setCustomData(mcallptr);
77 }
78 
79 
80 /* ====== Class AsyncNdbContext ====== */
81 
82 /* Constructor
83 */
AsyncNdbContext(Ndb_cluster_connection * conn)84 AsyncNdbContext::AsyncNdbContext(Ndb_cluster_connection *conn) :
85   connection(conn),
86   shutdown_flag()
87 {
88   DEBUG_MARKER(UDEB_DEBUG);
89 
90   /* Create the multi-wait group */
91   waitgroup = connection->create_ndb_wait_group(WAIT_GROUP_SIZE);
92 
93   /* Register the completion function */
94   uv_async_init(uv_default_loop(), & async_handle, ioCompleted);
95 
96   /* Store some context in the uv_async_t */
97   async_handle.data = (void *) this;
98 
99   /* Start the listener thread. */
100   uv_thread_create(& listener_thread_id, run_ndb_listener_thread, (void *) this);
101 }
102 
103 
104 /* Destructor
105 */
~AsyncNdbContext()106 AsyncNdbContext::~AsyncNdbContext()
107 {
108   uv_thread_join(& listener_thread_id);
109   connection->release_ndb_wait_group(waitgroup);
110 }
111 
112 
113 /* Methods
114 */
115 
116 /* This could run in a UV worker thread (JavaScript async execution)
117    or possibly in the JavaScript thread (JavaScript sync execution)
118 */
executeAsynch(TransactionImpl * txc,NdbTransaction * tx,int execType,int abortOption,int forceSend,v8::Handle<v8::Function> jsCallback)119 int AsyncNdbContext::executeAsynch(TransactionImpl *txc,
120                                    NdbTransaction *tx,
121                                    int execType,
122                                    int abortOption,
123                                    int forceSend,
124                                    v8::Handle<v8::Function> jsCallback) {
125 
126   /* Create a container to help pass return values up the JS callback stack */
127   AsyncExecCall * mcallptr = new AsyncExecCall(tx, jsCallback);
128 
129   Ndb * ndb = tx->getNdb();
130   DEBUG_PRINT("NdbTransaction:%p:executeAsynch(%d,%d) -- Push: %p",
131               mcallptr->native_obj, execType, abortOption, ndb);
132 
133   /* The NdbTransaction should be closed unless execType is NoCommit */
134   mcallptr->closeContext = (execType == NdbTransaction::NoCommit) ? 0 : txc;
135 
136   /* send the transaction to NDB */
137   tx->executeAsynch((NdbTransaction::ExecType) execType,
138                     ndbTxCompleted,
139                     mcallptr,
140                     (NdbOperation::AbortOption) abortOption,
141                     forceSend);
142 
143 #ifdef USE_OLD_MULTIWAIT_API
144   sent_queue.produce(new ListNode<Ndb>(ndb));
145 #else
146   waitgroup->push(ndb);
147 #endif
148 
149   /* Notify the waitgroup that there is a new Ndb to wait on */
150   /* TODO: This could depend on forceSend? */
151   waitgroup->wakeup();
152 
153   return 1;
154 }
155 
156 
157 #ifndef USE_OLD_MULTIWAIT_API
158 
runListenerThread()159 void * AsyncNdbContext::runListenerThread() {
160   DEBUG_MARKER(UDEB_DEBUG);
161   int wait_timeout_millisec = 100;
162   int pct_ready = 50;
163   bool running = true;
164 
165   while(running) {
166     if(shutdown_flag.test()) {
167       DEBUG_PRINT("MULTIWAIT LISTENER GOT SHUTDOWN.");
168       pct_ready = 100;    /* One final read of all outstanding items */
169       wait_timeout_millisec = 200;
170       running = false;
171     }
172 
173     /* Wait for ready Ndbs */
174     if(waitgroup->wait(wait_timeout_millisec, pct_ready) > 0) {
175       uv_async_send(& async_handle);  // => ioCompleted() => completeCallbacks()
176     }
177   }
178 
179   return 0;
180 }
181 
shutdown()182 void AsyncNdbContext::shutdown() {
183   DEBUG_MARKER(UDEB_DEBUG);
184   shutdown_flag.set();
185   waitgroup->wakeup();
186 }
187 
188 
completeCallbacks()189 void AsyncNdbContext::completeCallbacks() {
190   AsyncExecCall * mcallptr;
191   Ndb * ndb = waitgroup->pop();
192 
193   while(ndb) {
194     DEBUG_PRINT("                                           -- Pop:  %p", ndb);
195     ndb->pollNdb(0, 1);  /* runs ndbTxCompleted() */
196     mcallptr = (AsyncExecCall *) ndb->getCustomData();
197     ndb->setCustomData(0);
198     main_thd_complete_async_call(mcallptr);
199     ndb = waitgroup->pop();
200   }
201 }
202 
203 #else     /* Old Multiwait */
204 
205 /* ====== Signals ===== */
206 static int SignalShutdown = 1;
207 
runListenerThread()208 void * AsyncNdbContext::runListenerThread() {
209   DEBUG_MARKER(UDEB_DEBUG);
210   ListNode<Ndb> * sentNdbs, * completedNdbs, * currentNode;
211   Ndb * ndb;
212   Ndb ** ready_list;
213   int wait_timeout_millisec = 5000;
214   int min_ready, nwaiting, npending = 0;
215   bool running = true;
216 
217   while(running) {  // Listener thread main loop
218 
219     /* Add new Ndbs to the wait group */
220     sentNdbs = sent_queue.consumeAll();
221     while(sentNdbs != 0) {
222       currentNode = sentNdbs;
223       sentNdbs = sentNdbs->next;
224 
225       if(currentNode->signalinfo == SignalShutdown) {
226         running = false;
227       }
228       else {
229         waitgroup->addNdb(currentNode->item);
230         npending++;
231         DEBUG_PRINT("Listener: %d pending", npending);
232       }
233       delete currentNode;   // Frees the ListNode from executeAsynch()
234     }
235 
236     /* What's the minimum number of ready Ndb's to wake up for? */
237     if(! running) {
238       min_ready = npending;  // Wait one final time for all outstanding Ndbs
239       wait_timeout_millisec = 200;
240     }
241     else {
242       int n = npending / 4;
243       min_ready = n > 0 ? n : 1;
244     }
245 
246     /* Wait until something is ready to poll */
247     nwaiting = waitgroup->wait(ready_list, wait_timeout_millisec, min_ready);
248 
249     completedNdbs = 0;
250     if(nwaiting > 0) {
251       /* Poll the ones that are ready */
252       DEBUG_PRINT("Listener: %d ready", nwaiting);
253       for(int i = 0 ; i < nwaiting ; i++) {
254         npending--;
255         assert(npending >= 0);
256         ndb = ready_list[i];
257         ndb->pollNdb(0, 1);  /* runs ndbTxCompleted() */
258         currentNode = new ListNode<Ndb>(ndb);
259         currentNode->next = completedNdbs;
260         completedNdbs = currentNode;
261       }
262 
263       /* Publish the completed ones */
264       completed_queue.produce(completedNdbs);
265 
266       /* Notify the main thread */
267       uv_async_send(& async_handle);
268     }
269   } // Listener thread main loop
270 
271   return 0;
272 }
273 
274 
275 /* Shut down the context
276 */
shutdown()277 void AsyncNdbContext::shutdown() {
278   DEBUG_MARKER(UDEB_DEBUG);
279   ListNode<Ndb> * finalNode = new ListNode<Ndb>((Ndb *) 0);
280   finalNode->signalinfo = SignalShutdown;
281 
282   /* Queue the shutdown node, and wake up the listener thread for it */
283   sent_queue.produce(finalNode);
284   waitgroup->wakeup();
285 }
286 
287 
288 /* This runs in the JavaScript main thread, at most once per uv_async_send().
289    It dispatches JavaScript callbacks for completed operations.
290 */
completeCallbacks()291 void AsyncNdbContext::completeCallbacks() {
292   ListNode<Ndb> * completedNdbs, * currentNode;
293 
294   completedNdbs = completed_queue.consumeAll();
295 
296   while(completedNdbs != 0) {
297     currentNode = completedNdbs;
298     Ndb * ndb = currentNode->item;
299     AsyncExecCall * mcallptr = static_cast<AsyncExecCall *>(ndb->getCustomData());
300     ndb->setCustomData(0);
301 
302     main_thd_complete_async_call(mcallptr);
303     completedNdbs = currentNode->next;
304 
305     delete currentNode;  // Frees the ListNode from runListenerThread()
306   }
307 }
308 
309 #endif
310