1 /*
2    Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #ifndef TransporterFacade_H
26 #define TransporterFacade_H
27 
28 #include <kernel_types.h>
29 #include <ndb_limits.h>
30 #include <NdbThread.h>
31 #include <TransporterRegistry.hpp>
32 #include <NdbMutex.h>
33 #include "DictCache.hpp"
34 #include <BlockNumbers.h>
35 #include <mgmapi.h>
36 
37 class ClusterMgr;
38 class ArbitMgr;
39 struct ndb_mgm_configuration;
40 
41 class Ndb;
42 class NdbApiSignal;
43 class NdbWaiter;
44 class trp_client;
45 
/*
  Entry points with C linkage. Each takes and returns an opaque void*,
  the usual shape of a thread start routine. Both are declared friends
  of TransporterFacade (see the friend declarations inside the class),
  so they can reach its private threadMainSend()/threadMainReceive().
  Presumably they are the start routines for theSendThread and
  theReceiveThread -- confirm in the implementation file.
*/
extern "C" {
  void* runSendRequest_C(void*);
  void* runReceiveResponse_C(void*);
}
50 
/**
 * TransporterFacade
 *
 * Sits between the NDB API client objects (trp_client) and the
 * TransporterRegistry held in theTransporterRegistry.
 *
 * It does three things:
 *  - implements the TransporterCallback interface (deliver_signal,
 *    reportConnect, reportDisconnect, ...);
 *  - registers and unregisters clients (open_clnt/close_clnt);
 *  - coordinates the poll-ownership protocol described further down.
 *
 * Several helpers (sendSignal, get_node_alive, ...) are private and are
 * reached through the friend classes declared in the private section.
 */
class TransporterFacade : public TransporterCallback
{
public:
  /**
   * Max number of Ndb objects.
   * (Ndb objects should not be shared by different threads.)
   * Also bounds ThreadData::END_OF_LIST below (MAX_NO_THREADS + 1).
   */
  STATIC_CONST( MAX_NO_THREADS = 4711 );
  TransporterFacade(GlobalDictCache *cache);
  virtual ~TransporterFacade();

  int start_instance(NodeId, const ndb_mgm_configuration*);
  void stop_instance();

  /*
    (Re)configure the TransporterFacade
    to a specific configuration
  */
  bool configure(NodeId, const ndb_mgm_configuration *);

  /**
   * Register a client for sending/receiving signals.
   * @param blockNo block number to use, -1 => any block number
   * @return the assigned block number, or "-1" on failure.
   * NOTE(review): the return type is Uint32, so a -1 failure value would
   * surface to callers as 0xFFFFFFFF -- confirm the failure convention
   * against the implementation before relying on it.
   */
  Uint32 open_clnt(trp_client*, int blockNo = -1);
  // Presumably the counterpart of open_clnt() -- confirm return semantics.
  int close_clnt(trp_client*);

  Uint32 get_active_ndb_objects() const;

  // Only sends to nodes which are alive.
  // These are private; callers reach them through the friend classes
  // declared in the private section below.
private:
  int sendSignal(const NdbApiSignal * signal, NodeId nodeId);
  int sendSignal(const NdbApiSignal*, NodeId,
                 const LinearSectionPtr ptr[3], Uint32 secs);
  int sendSignal(const NdbApiSignal*, NodeId,
                 const GenericSectionPtr ptr[3], Uint32 secs);
  int sendFragmentedSignal(const NdbApiSignal*, NodeId,
                           const LinearSectionPtr ptr[3], Uint32 secs);
  int sendFragmentedSignal(const NdbApiSignal*, NodeId,
                           const GenericSectionPtr ptr[3], Uint32 secs);
public:

  /**
   * These are functions used by ndb_mgmd
   */
  void ext_set_max_api_reg_req_interval(Uint32 ms);
  void ext_update_connections();
  struct in_addr ext_get_connect_address(Uint32 nodeId);
  void ext_forceHB();
  bool ext_isConnected(NodeId aNodeId);
  void ext_doConnect(int aNodeId);

  // Is node available for running transactions
private:
  bool   get_node_alive(NodeId nodeId) const;
  bool   getIsNodeSendable(NodeId nodeId) const;

public:
  // Inline below: returns ClusterMgr::minDbVersion, or 0 without a ClusterMgr.
  Uint32 getMinDbNodeVersion() const;

  // My own processor id
  NodeId ownId() const;

  void connected();

  void doConnect(int NodeId);
  void reportConnected(int NodeId);
  void doDisconnect(int NodeId);
  void reportDisconnected(int NodeId);

  NodeId get_an_alive_node();
  void trp_node_status(NodeId, Uint32 event);

  /**
   * Send signal to each registered object
   */
  void for_each(trp_client* clnt,
                const NdbApiSignal* aSignal, const LinearSectionPtr ptr[3]);

  // Lock/unlock theMutexPtr (inline definitions below).
  void lock_mutex();
  void unlock_mutex();

  // Improving the API performance
  void forceSend(Uint32 block_number);
  void checkForceSend(Uint32 block_number);

  // Direct access to the underlying registry (no ownership transfer implied).
  TransporterRegistry* get_registry() { return theTransporterRegistry;};

/*
  When a thread has sent its signals and is ready to wait for the
  replies, it normally waits on a condition variable while the actual
  reception is handled by the receiver thread in the NDB API.
  The methods and variables below give each thread the possibility of
  becoming owner of the "right" to poll for signals. Effectively this
  means that the thread temporarily acts as the receiver thread.
  A thread that succeeds in grabbing this "ownership" avoids a number of
  expensive condition-variable calls and the even more expensive context
  switches needed to wake it up.
  When the owner of the poll "right" has completed its own task, it is
  likely that other threads are still waiting. In that case one of them
  is picked as the new owner of the poll "right". Since we want to switch
  owner as seldom as possible, we always pick the last thread, which is
  likely to be the last one to complete its reception.
*/
  void start_poll(trp_client*);
  void do_poll(trp_client* clnt, Uint32 wait_time);
  void complete_poll(trp_client*);
  void wakeup(trp_client*);

  void external_poll(Uint32 wait_time);

  // The bool argument is unnamed and ignored by this inline body.
  trp_client* get_poll_owner(bool) const { return m_poll_owner;}
  trp_client* remove_last_from_poll_queue();
  void add_to_poll_queue(trp_client* clnt);
  void remove_from_poll_queue(trp_client* clnt);

  // NOTE(review): poll-ownership state is public data; presumably it is
  // only modified while holding theMutexPtr -- confirm at the call sites.
  trp_client * m_poll_owner;
  trp_client * m_poll_queue_head; // First in queue
  trp_client * m_poll_queue_tail; // Last in queue
  /* End poll owner stuff */

  // heart beat received from a node (e.g. a signal came)
  void hb_received(NodeId n);
  void set_auto_reconnect(int val);
  int get_auto_reconnect() const;

  /* TransporterCallback interface. */
  void deliver_signal(SignalHeader * const header,
                      Uint8 prio,
                      Uint32 * const signalData,
                      LinearSectionPtr ptr[3]);
  int checkJobBuffer();
  void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
  void reportReceiveLen(NodeId nodeId, Uint32 count, Uint64 bytes);
  void reportConnect(NodeId nodeId);
  void reportDisconnect(NodeId nodeId, Uint32 errNo);
  void reportError(NodeId nodeId, TransporterError errorCode,
                   const char *info = 0);
  void transporter_recv_from(NodeId node);
  // The four send-buffer helpers below forward unchanged to
  // theTransporterRegistry.
  Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max)
  {
    return theTransporterRegistry->get_bytes_to_send_iovec(node, dst, max);
  }
  Uint32 bytes_sent(NodeId node, Uint32 bytes)
  {
    return theTransporterRegistry->bytes_sent(node, bytes);
  }
  bool has_data_to_send(NodeId node)
  {
    return theTransporterRegistry->has_data_to_send(node);
  }
  void reset_send_buffer(NodeId node, bool should_be_empty)
  {
    theTransporterRegistry->reset_send_buffer(node, should_be_empty);
  }

private:

  // These classes may use the private send/status helpers and data members.
  friend class trp_client;
  friend class ClusterMgr;
  friend class ArbitMgr;
  friend class Ndb_cluster_connection;
  friend class Ndb_cluster_connection_impl;

  bool isConnected(NodeId aNodeId);
  void doStop();

  TransporterRegistry* theTransporterRegistry;
  SocketServer m_socket_server;
  int sendPerformedLastInterval;
  NodeId theOwnId;
  NodeId theStartNodeId;

  // May be null: the inline accessors below check it before use.
  ClusterMgr* theClusterMgr;

  // Improving the API response time
  int checkCounter;
  Uint32 currentSendLimit;

  void calculateSendLimit();

  // Declarations for the receive and send thread
  // Presumably a stop flag observed by the worker threads -- confirm.
  int  theStopReceive;

  void threadMainSend(void);
  NdbThread* theSendThread;
  void threadMainReceive(void);
  NdbThread* theReceiveThread;

  friend void* runSendRequest_C(void*);
  friend void* runReceiveResponse_C(void*);

  bool do_connect_mgm(NodeId, const ndb_mgm_configuration*);

  /**
   * Block number handling
   */
private:

  /*
    Table of registered clients, indexed by (block number -
    MIN_API_BLOCK_NO). m_statusNext presumably holds either an ACTIVE /
    INACTIVE status or the next free slot (END_OF_LIST terminated), with
    m_firstFree as the free-list head -- confirm in the implementation.
  */
  struct ThreadData {
    STATIC_CONST( ACTIVE = (1 << 16) | 1 );
    STATIC_CONST( INACTIVE = (1 << 16) );
    STATIC_CONST( END_OF_LIST = MAX_NO_THREADS + 1 );

    ThreadData(Uint32 initialSize = 32);

    Uint32 m_use_cnt;
    Uint32 m_firstFree;
    Vector<Uint32> m_statusNext;
    Vector<trp_client*> m_objectExecute;

    int open(trp_client*);
    int close(int number);
    void expand(Uint32 size);

    /*
      Map a block number to its registered client; returns 0 when the
      slot index is outside m_objectExecute.
      blockNo is a Uint16, so for blockNo < MIN_API_BLOCK_NO the
      subtraction wraps around. Such values are expected to fail the
      size check, which holds as long as the table stays smaller than
      the wrapped value.
    */
    inline trp_client* get(Uint16 blockNo) const {
      blockNo -= MIN_API_BLOCK_NO;
      if(likely (blockNo < m_objectExecute.size()))
      {
        return m_objectExecute.getBase()[blockNo];
      }
      return 0;
    }
  } m_threads;

  // Presumably maps fixed API block numbers to dynamically assigned ones.
  Uint32 m_fixed2dynamic[NO_API_FIXED_BLOCKS];
  Uint32 m_fragmented_signal_id;

public:
  // Locked/unlocked by lock_mutex()/unlock_mutex().
  NdbMutex* theMutexPtr;

public:
  // Presumably the cache passed to the constructor -- confirm.
  GlobalDictCache *m_globalDictCache;
};
286 
287 inline
288 void
lock_mutex()289 TransporterFacade::lock_mutex()
290 {
291   NdbMutex_Lock(theMutexPtr);
292 }
293 
294 inline
295 void
unlock_mutex()296 TransporterFacade::unlock_mutex()
297 {
298   NdbMutex_Unlock(theMutexPtr);
299 }
300 
301 #include "ClusterMgr.hpp"
302 #include "ndb_cluster_connection_impl.hpp"
303 
304 inline
get_connect_count() const305 unsigned Ndb_cluster_connection_impl::get_connect_count() const
306 {
307   if (m_transporter_facade->theClusterMgr)
308     return m_transporter_facade->theClusterMgr->m_connect_count;
309   return 0;
310 }
311 
312 inline
get_min_db_version() const313 unsigned Ndb_cluster_connection_impl::get_min_db_version() const
314 {
315   return m_transporter_facade->getMinDbNodeVersion();
316 }
317 
318 inline
319 bool
get_node_alive(NodeId n) const320 TransporterFacade::get_node_alive(NodeId n) const {
321   if (theClusterMgr)
322   {
323     return theClusterMgr->getNodeInfo(n).m_alive;
324   }
325   return 0;
326 }
327 
328 inline
329 void
hb_received(NodeId n)330 TransporterFacade::hb_received(NodeId n) {
331   theClusterMgr->hb_received(n);
332 }
333 
334 inline
335 Uint32
getMinDbNodeVersion() const336 TransporterFacade::getMinDbNodeVersion() const
337 {
338   if (theClusterMgr)
339     return theClusterMgr->minDbVersion;
340   else
341     return 0;
342 }
343 
344 inline
345 const trp_node &
getNodeInfo(Uint32 nodeId) const346 trp_client::getNodeInfo(Uint32 nodeId) const
347 {
348   return m_facade->theClusterMgr->getNodeInfo(nodeId);
349 }
350 
351 /**
352  * LinearSectionIterator
353  *
354  * This is an implementation of GenericSectionIterator
355  * that iterates over one linear section of memory.
356  * The iterator is used by the transporter at signal
357  * send time to obtain all of the relevant words for the
358  * signal section
359  */
360 class LinearSectionIterator: public GenericSectionIterator
361 {
362 private :
363   const Uint32* data;
364   Uint32 len;
365   bool read;
366 public :
LinearSectionIterator(const Uint32 * _data,Uint32 _len)367   LinearSectionIterator(const Uint32* _data, Uint32 _len)
368   {
369     data= (_len == 0)? NULL:_data;
370     len= _len;
371     read= false;
372   }
373 
~LinearSectionIterator()374   ~LinearSectionIterator()
375   {};
376 
reset()377   void reset()
378   {
379     /* Reset iterator */
380     read= false;
381   }
382 
getNextWords(Uint32 & sz)383   const Uint32* getNextWords(Uint32& sz)
384   {
385     if (likely(!read))
386     {
387       read= true;
388       sz= len;
389       return data;
390     }
391     sz= 0;
392     return NULL;
393   }
394 };
395 
396 
397 /**
398  * SignalSectionIterator
399  *
400  * This is an implementation of GenericSectionIterator
401  * that uses chained NdbApiSignal objects to store a
402  * signal section.
403  * The iterator is used by the transporter at signal
404  * send time to obtain all of the relevant words for the
405  * signal section
406  */
407 class SignalSectionIterator: public GenericSectionIterator
408 {
409 private :
410   NdbApiSignal* firstSignal;
411   NdbApiSignal* currentSignal;
412 public :
SignalSectionIterator(NdbApiSignal * signal)413   SignalSectionIterator(NdbApiSignal* signal)
414   {
415     firstSignal= currentSignal= signal;
416   }
417 
~SignalSectionIterator()418   ~SignalSectionIterator()
419   {};
420 
reset()421   void reset()
422   {
423     /* Reset iterator */
424     currentSignal= firstSignal;
425   }
426 
427   const Uint32* getNextWords(Uint32& sz);
428 };
429 
430 /*
431  * GenericSectionIteratorReader
432  * Helper class to simplify reading data from
433  * GenericSectionIterator implementations
434  */
435 
436 class GSIReader
437 {
438 private :
439   GenericSectionIterator* gsi;
440   const Uint32* chunkPtr;
441   Uint32 chunkRemain;
442 public :
GSIReader(GenericSectionIterator * _gsi)443   GSIReader(GenericSectionIterator* _gsi)
444   {
445     gsi = _gsi;
446     chunkPtr = NULL;
447     chunkRemain = 0;
448   }
449 
copyNWords(Uint32 * dest,Uint32 n)450   void copyNWords(Uint32* dest, Uint32 n)
451   {
452     while (n)
453     {
454       if (chunkRemain == 0)
455       {
456         /* Get next contiguous stretch of words from
457          * the iterator
458          */
459         chunkPtr = gsi->getNextWords(chunkRemain);
460         if (!chunkRemain)
461           abort(); // Must have the words the caller asks for
462       }
463       else
464       {
465         /* Have some words from the iterator, copy some/
466          * all of them
467          */
468         Uint32 wordsToCopy = MIN(chunkRemain, n);
469         memcpy(dest, chunkPtr, wordsToCopy << 2);
470         chunkPtr += wordsToCopy;
471         chunkRemain -= wordsToCopy;
472 
473         dest += wordsToCopy;
474         n -= wordsToCopy;
475       }
476     }
477   }
478 };
479 
480 
481 
482 
483 #endif // TransporterFacade_H
484