1 /*
2 Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #ifndef TransporterFacade_H
26 #define TransporterFacade_H
27
28 #include <kernel_types.h>
29 #include <ndb_limits.h>
30 #include <NdbThread.h>
31 #include <TransporterRegistry.hpp>
32 #include <NdbMutex.h>
33 #include "DictCache.hpp"
34 #include <BlockNumbers.h>
35 #include <mgmapi.h>
36
/* Forward declarations of types referenced by the declarations in this header. */
class ArbitMgr;
class ClusterMgr;
class Ndb;
class NdbApiSignal;
class NdbWaiter;
class trp_client;
struct ndb_mgm_configuration;

/*
 * C-linkage functions taking and returning void*.
 * NOTE(review): presumably the entry points of the send and receive
 * threads -- confirm against the implementation file.
 */
extern "C" void* runSendRequest_C(void*);
extern "C" void* runReceiveResponse_C(void*);
50
51 class TransporterFacade : public TransporterCallback
52 {
53 public:
54 /**
55 * Max number of Ndb objects.
56 * (Ndb objects should not be shared by different threads.)
57 */
58 STATIC_CONST( MAX_NO_THREADS = 4711 );
59 TransporterFacade(GlobalDictCache *cache);
60 virtual ~TransporterFacade();
61
62 int start_instance(NodeId, const ndb_mgm_configuration*);
63 void stop_instance();
64
65 /*
66 (Re)configure the TransporterFacade
67 to a specific configuration
68 */
69 bool configure(NodeId, const ndb_mgm_configuration *);
70
71 /**
72 * Register this block for sending/receiving signals
73 * @blockNo block number to use, -1 => any blockNumber
74 * @return BlockNumber or -1 for failure
75 */
76 Uint32 open_clnt(trp_client*, int blockNo = -1);
77 int close_clnt(trp_client*);
78
79 Uint32 get_active_ndb_objects() const;
80
81 // Only sends to nodes which are alive
82 private:
83 int sendSignal(const NdbApiSignal * signal, NodeId nodeId);
84 int sendSignal(const NdbApiSignal*, NodeId,
85 const LinearSectionPtr ptr[3], Uint32 secs);
86 int sendSignal(const NdbApiSignal*, NodeId,
87 const GenericSectionPtr ptr[3], Uint32 secs);
88 int sendFragmentedSignal(const NdbApiSignal*, NodeId,
89 const LinearSectionPtr ptr[3], Uint32 secs);
90 int sendFragmentedSignal(const NdbApiSignal*, NodeId,
91 const GenericSectionPtr ptr[3], Uint32 secs);
92 public:
93
94 /**
95 * These are functions used by ndb_mgmd
96 */
97 void ext_set_max_api_reg_req_interval(Uint32 ms);
98 void ext_update_connections();
99 struct in_addr ext_get_connect_address(Uint32 nodeId);
100 void ext_forceHB();
101 bool ext_isConnected(NodeId aNodeId);
102 void ext_doConnect(int aNodeId);
103
104 // Is node available for running transactions
105 private:
106 bool get_node_alive(NodeId nodeId) const;
107 bool getIsNodeSendable(NodeId nodeId) const;
108
109 public:
110 Uint32 getMinDbNodeVersion() const;
111
112 // My own processor id
113 NodeId ownId() const;
114
115 void connected();
116
117 void doConnect(int NodeId);
118 void reportConnected(int NodeId);
119 void doDisconnect(int NodeId);
120 void reportDisconnected(int NodeId);
121
122 NodeId get_an_alive_node();
123 void trp_node_status(NodeId, Uint32 event);
124
125 /**
126 * Send signal to each registered object
127 */
128 void for_each(trp_client* clnt,
129 const NdbApiSignal* aSignal, const LinearSectionPtr ptr[3]);
130
131 void lock_mutex();
132 void unlock_mutex();
133
134 // Improving the API performance
135 void forceSend(Uint32 block_number);
136 void checkForceSend(Uint32 block_number);
137
get_registry()138 TransporterRegistry* get_registry() { return theTransporterRegistry;};
139
140 /*
141 When a thread has sent its signals and is ready to wait for reception
142 of these it does normally always wait on a conditional mutex and
143 the actual reception is handled by the receiver thread in the NDB API.
144 With the below new methods and variables each thread has the possibility
145 of becoming owner of the "right" to poll for signals. Effectually this
146 means that the thread acts temporarily as a receiver thread.
147 For the thread that succeeds in grabbing this "ownership" it will avoid
148 a number of expensive calls to conditional mutex and even more expensive
149 context switches to wake up.
150 When an owner of the poll "right" has completed its own task it is likely
151 that there are others still waiting. In this case we pick one of the
152 threads as new owner of the poll "right". Since we want to switch owner
153 as seldom as possible we always pick the last thread which is likely to
154 be the last to complete its reception.
155 */
156 void start_poll(trp_client*);
157 void do_poll(trp_client* clnt, Uint32 wait_time);
158 void complete_poll(trp_client*);
159 void wakeup(trp_client*);
160
161 void external_poll(Uint32 wait_time);
162
get_poll_owner(bool) const163 trp_client* get_poll_owner(bool) const { return m_poll_owner;}
164 trp_client* remove_last_from_poll_queue();
165 void add_to_poll_queue(trp_client* clnt);
166 void remove_from_poll_queue(trp_client* clnt);
167
168 trp_client * m_poll_owner;
169 trp_client * m_poll_queue_head; // First in queue
170 trp_client * m_poll_queue_tail; // Last in queue
171 /* End poll owner stuff */
172
173 // heart beat received from a node (e.g. a signal came)
174 void hb_received(NodeId n);
175 void set_auto_reconnect(int val);
176 int get_auto_reconnect() const;
177
178 /* TransporterCallback interface. */
179 void deliver_signal(SignalHeader * const header,
180 Uint8 prio,
181 Uint32 * const signalData,
182 LinearSectionPtr ptr[3]);
183 int checkJobBuffer();
184 void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
185 void reportReceiveLen(NodeId nodeId, Uint32 count, Uint64 bytes);
186 void reportConnect(NodeId nodeId);
187 void reportDisconnect(NodeId nodeId, Uint32 errNo);
188 void reportError(NodeId nodeId, TransporterError errorCode,
189 const char *info = 0);
190 void transporter_recv_from(NodeId node);
get_bytes_to_send_iovec(NodeId node,struct iovec * dst,Uint32 max)191 Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max)
192 {
193 return theTransporterRegistry->get_bytes_to_send_iovec(node, dst, max);
194 }
bytes_sent(NodeId node,Uint32 bytes)195 Uint32 bytes_sent(NodeId node, Uint32 bytes)
196 {
197 return theTransporterRegistry->bytes_sent(node, bytes);
198 }
has_data_to_send(NodeId node)199 bool has_data_to_send(NodeId node)
200 {
201 return theTransporterRegistry->has_data_to_send(node);
202 }
reset_send_buffer(NodeId node,bool should_be_empty)203 void reset_send_buffer(NodeId node, bool should_be_empty)
204 {
205 theTransporterRegistry->reset_send_buffer(node, should_be_empty);
206 }
207
208 private:
209
210 friend class trp_client;
211 friend class ClusterMgr;
212 friend class ArbitMgr;
213 friend class Ndb_cluster_connection;
214 friend class Ndb_cluster_connection_impl;
215
216 bool isConnected(NodeId aNodeId);
217 void doStop();
218
219 TransporterRegistry* theTransporterRegistry;
220 SocketServer m_socket_server;
221 int sendPerformedLastInterval;
222 NodeId theOwnId;
223 NodeId theStartNodeId;
224
225 ClusterMgr* theClusterMgr;
226
227 // Improving the API response time
228 int checkCounter;
229 Uint32 currentSendLimit;
230
231 void calculateSendLimit();
232
233 // Declarations for the receive and send thread
234 int theStopReceive;
235
236 void threadMainSend(void);
237 NdbThread* theSendThread;
238 void threadMainReceive(void);
239 NdbThread* theReceiveThread;
240
241 friend void* runSendRequest_C(void*);
242 friend void* runReceiveResponse_C(void*);
243
244 bool do_connect_mgm(NodeId, const ndb_mgm_configuration*);
245
246 /**
247 * Block number handling
248 */
249 private:
250
251 struct ThreadData {
252 STATIC_CONST( ACTIVE = (1 << 16) | 1 );
253 STATIC_CONST( INACTIVE = (1 << 16) );
254 STATIC_CONST( END_OF_LIST = MAX_NO_THREADS + 1 );
255
256 ThreadData(Uint32 initialSize = 32);
257
258 Uint32 m_use_cnt;
259 Uint32 m_firstFree;
260 Vector<Uint32> m_statusNext;
261 Vector<trp_client*> m_objectExecute;
262
263 int open(trp_client*);
264 int close(int number);
265 void expand(Uint32 size);
266
getTransporterFacade::ThreadData267 inline trp_client* get(Uint16 blockNo) const {
268 blockNo -= MIN_API_BLOCK_NO;
269 if(likely (blockNo < m_objectExecute.size()))
270 {
271 return m_objectExecute.getBase()[blockNo];
272 }
273 return 0;
274 }
275 } m_threads;
276
277 Uint32 m_fixed2dynamic[NO_API_FIXED_BLOCKS];
278 Uint32 m_fragmented_signal_id;
279
280 public:
281 NdbMutex* theMutexPtr;
282
283 public:
284 GlobalDictCache *m_globalDictCache;
285 };
286
287 inline
288 void
lock_mutex()289 TransporterFacade::lock_mutex()
290 {
291 NdbMutex_Lock(theMutexPtr);
292 }
293
294 inline
295 void
unlock_mutex()296 TransporterFacade::unlock_mutex()
297 {
298 NdbMutex_Unlock(theMutexPtr);
299 }
300
301 #include "ClusterMgr.hpp"
302 #include "ndb_cluster_connection_impl.hpp"
303
304 inline
get_connect_count() const305 unsigned Ndb_cluster_connection_impl::get_connect_count() const
306 {
307 if (m_transporter_facade->theClusterMgr)
308 return m_transporter_facade->theClusterMgr->m_connect_count;
309 return 0;
310 }
311
312 inline
get_min_db_version() const313 unsigned Ndb_cluster_connection_impl::get_min_db_version() const
314 {
315 return m_transporter_facade->getMinDbNodeVersion();
316 }
317
318 inline
319 bool
get_node_alive(NodeId n) const320 TransporterFacade::get_node_alive(NodeId n) const {
321 if (theClusterMgr)
322 {
323 return theClusterMgr->getNodeInfo(n).m_alive;
324 }
325 return 0;
326 }
327
328 inline
329 void
hb_received(NodeId n)330 TransporterFacade::hb_received(NodeId n) {
331 theClusterMgr->hb_received(n);
332 }
333
334 inline
335 Uint32
getMinDbNodeVersion() const336 TransporterFacade::getMinDbNodeVersion() const
337 {
338 if (theClusterMgr)
339 return theClusterMgr->minDbVersion;
340 else
341 return 0;
342 }
343
344 inline
345 const trp_node &
getNodeInfo(Uint32 nodeId) const346 trp_client::getNodeInfo(Uint32 nodeId) const
347 {
348 return m_facade->theClusterMgr->getNodeInfo(nodeId);
349 }
350
351 /**
352 * LinearSectionIterator
353 *
354 * This is an implementation of GenericSectionIterator
355 * that iterates over one linear section of memory.
356 * The iterator is used by the transporter at signal
357 * send time to obtain all of the relevant words for the
358 * signal section
359 */
360 class LinearSectionIterator: public GenericSectionIterator
361 {
362 private :
363 const Uint32* data;
364 Uint32 len;
365 bool read;
366 public :
LinearSectionIterator(const Uint32 * _data,Uint32 _len)367 LinearSectionIterator(const Uint32* _data, Uint32 _len)
368 {
369 data= (_len == 0)? NULL:_data;
370 len= _len;
371 read= false;
372 }
373
~LinearSectionIterator()374 ~LinearSectionIterator()
375 {};
376
reset()377 void reset()
378 {
379 /* Reset iterator */
380 read= false;
381 }
382
getNextWords(Uint32 & sz)383 const Uint32* getNextWords(Uint32& sz)
384 {
385 if (likely(!read))
386 {
387 read= true;
388 sz= len;
389 return data;
390 }
391 sz= 0;
392 return NULL;
393 }
394 };
395
396
397 /**
398 * SignalSectionIterator
399 *
400 * This is an implementation of GenericSectionIterator
401 * that uses chained NdbApiSignal objects to store a
402 * signal section.
403 * The iterator is used by the transporter at signal
404 * send time to obtain all of the relevant words for the
405 * signal section
406 */
407 class SignalSectionIterator: public GenericSectionIterator
408 {
409 private :
410 NdbApiSignal* firstSignal;
411 NdbApiSignal* currentSignal;
412 public :
SignalSectionIterator(NdbApiSignal * signal)413 SignalSectionIterator(NdbApiSignal* signal)
414 {
415 firstSignal= currentSignal= signal;
416 }
417
~SignalSectionIterator()418 ~SignalSectionIterator()
419 {};
420
reset()421 void reset()
422 {
423 /* Reset iterator */
424 currentSignal= firstSignal;
425 }
426
427 const Uint32* getNextWords(Uint32& sz);
428 };
429
430 /*
431 * GenericSectionIteratorReader
432 * Helper class to simplify reading data from
433 * GenericSectionIterator implementations
434 */
435
436 class GSIReader
437 {
438 private :
439 GenericSectionIterator* gsi;
440 const Uint32* chunkPtr;
441 Uint32 chunkRemain;
442 public :
GSIReader(GenericSectionIterator * _gsi)443 GSIReader(GenericSectionIterator* _gsi)
444 {
445 gsi = _gsi;
446 chunkPtr = NULL;
447 chunkRemain = 0;
448 }
449
copyNWords(Uint32 * dest,Uint32 n)450 void copyNWords(Uint32* dest, Uint32 n)
451 {
452 while (n)
453 {
454 if (chunkRemain == 0)
455 {
456 /* Get next contiguous stretch of words from
457 * the iterator
458 */
459 chunkPtr = gsi->getNextWords(chunkRemain);
460 if (!chunkRemain)
461 abort(); // Must have the words the caller asks for
462 }
463 else
464 {
465 /* Have some words from the iterator, copy some/
466 * all of them
467 */
468 Uint32 wordsToCopy = MIN(chunkRemain, n);
469 memcpy(dest, chunkPtr, wordsToCopy << 2);
470 chunkPtr += wordsToCopy;
471 chunkRemain -= wordsToCopy;
472
473 dest += wordsToCopy;
474 n -= wordsToCopy;
475 }
476 }
477 }
478 };
479
480
481
482
483 #endif // TransporterFacade_H
484