1 /*
2    Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 //****************************************************************************
26 //
27 //  NAME
28 //      TransporterRegistry
29 //
30 //  DESCRIPTION
//      TransporterRegistry (singleton) is the interface to the
32 //      transporter layer. It handles transporter states and
33 //      holds the transporter arrays.
34 //
35 //***************************************************************************/
36 #ifndef TransporterRegistry_H
37 #define TransporterRegistry_H
38 
39 #if defined(HAVE_EPOLL_CREATE)
40 #include <sys/epoll.h>
41 #endif
42 #include "TransporterDefinitions.hpp"
43 #include "TransporterCallback.hpp"
44 #include <SocketServer.hpp>
45 #include <SocketClient.hpp>
46 
47 #include <NdbTCP.h>
48 
49 #include <mgmapi/mgmapi.h>
50 
51 #include <NodeBitmask.hpp>
52 
/*
 * IOState: every transporter is in exactly one of these states.
 *
 * NoHalt is the initial state and means there are no restrictions on
 * sending or receiving.  Per the prepareSend() contract, HaltOutput and
 * HaltIO stop signals from being sent (except to the remote CMVMI block);
 * HaltInput presumably does the same for receiving.  The values form a
 * two-bit set: HaltIO is HaltInput and HaltOutput combined.
 */
enum IOState {
  NoHalt     = 0,
  HaltInput  = 1 << 0,
  HaltOutput = 1 << 1,
  HaltIO     = HaltInput | HaltOutput
};
62 
63 
/*
 * Human-readable description of each TransporterRegistry::PerformState,
 * indexed by the enum value (CONNECTED = 0 ... DISCONNECTING = 3) by
 * TransporterRegistry::getPerformStateString().  Keep the entries in the
 * same order as the enum.
 *
 * Both the pointers and the characters are const: the table is read-only,
 * and a const namespace-scope object defined in a header is not reported
 * as "defined but not used" by GCC's -Wall in C++ translation units that
 * never reference it (a non-const static array is).
 */
static const char * const performStateString[] =
  { "is connected",
    "is trying to connect",
    "does nothing",
    "is trying to disconnect" };
69 
70 class Transporter;
71 class TCP_Transporter;
72 class SCI_Transporter;
73 class SHM_Transporter;
74 
75 class TransporterRegistry;
76 class SocketAuthenticator;
77 
78 class TransporterService : public SocketServer::Service {
79   SocketAuthenticator * m_auth;
80   TransporterRegistry * m_transporter_registry;
81 public:
TransporterService(SocketAuthenticator * auth=0)82   TransporterService(SocketAuthenticator *auth= 0)
83   {
84     m_auth= auth;
85     m_transporter_registry= 0;
86   }
setTransporterRegistry(TransporterRegistry * t)87   void setTransporterRegistry(TransporterRegistry *t)
88   {
89     m_transporter_registry= t;
90   }
91   SocketServer::Session * newSession(NDB_SOCKET_TYPE socket);
92 };
93 
94 /**
95  * @class TransporterRegistry
96  * @brief ...
97  */
98 class TransporterRegistry : private TransporterSendBufferHandle {
99   friend class SHM_Transporter;
100   friend class SHM_Writer;
101   friend class Transporter;
102   friend class TransporterService;
103 public:
104  /**
105   * Constructor
106   */
107   TransporterRegistry(TransporterCallback *callback,
108                       bool use_default_send_buffer = true,
109 		      unsigned maxTransporters = MAX_NTRANSPORTERS,
110 		      unsigned sizeOfLongSignalMemory = 100);
111 
112   /**
113    * this handle will be used in the client connect thread
114    * to fetch information on dynamic ports.  The old handle
115    * (if set) is destroyed, and this is destroyed by the destructor
116    */
117   void set_mgm_handle(NdbMgmHandle h);
get_mgm_handle(void)118   NdbMgmHandle get_mgm_handle(void) { return m_mgm_handle; };
119 
120   bool init(NodeId localNodeId);
121 
122   /**
123      Handle the handshaking with a new client connection
124      on the server port.
125      NOTE! Connection should be closed if function
126      returns false
127   */
128   bool connect_server(NDB_SOCKET_TYPE sockfd, BaseString& errormsg) const;
129 
130   bool connect_client(NdbMgmHandle *h);
131 
132   /**
133    * Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter
134    * and returns the socket.
135    */
136   NDB_SOCKET_TYPE connect_ndb_mgmd(SocketClient *sc);
137 
138   /**
139    * Given a connected NdbMgmHandle, turns it into a transporter
140    * and returns the socket.
141    */
142   NDB_SOCKET_TYPE connect_ndb_mgmd(NdbMgmHandle *h);
143 
144   /**
145    * Remove all transporters
146    */
147   void removeAll();
148 
149   /**
150    * Disconnect all transporters
151    */
152   void disconnectAll();
153 
154   /**
155    * Stops the server, disconnects all the transporter
156    * and deletes them and remove it from the transporter arrays
157    */
158   virtual ~TransporterRegistry();
159 
160   bool start_service(SocketServer& server);
161   struct NdbThread* start_clients();
162   bool stop_clients();
163   void start_clients_thread();
164   void update_connections();
165 
166   /**
167    * Start/Stop receiving
168    */
169   void startReceiving();
170   void stopReceiving();
171 
172   /**
173    * Start/Stop sending
174    */
175   void startSending();
176   void stopSending();
177 
178   // A transporter is always in a PerformState.
179   // PerformIO is used initially and as long as any of the events
180   // PerformConnect, ...
181   enum PerformState {
182     CONNECTED         = 0,
183     CONNECTING        = 1,
184     DISCONNECTED      = 2,
185     DISCONNECTING     = 3
186   };
getPerformStateString(NodeId nodeId) const187   const char *getPerformStateString(NodeId nodeId) const
188   { return performStateString[(unsigned)performStates[nodeId]]; };
189 
getPerformState(NodeId nodeId) const190   PerformState getPerformState(NodeId nodeId) const { return performStates[nodeId]; }
191 
192   /**
193    * Get and set methods for PerformState
194    */
195   void do_connect(NodeId node_id);
196   void do_disconnect(NodeId node_id, int errnum = 0);
is_connected(NodeId node_id)197   bool is_connected(NodeId node_id) { return performStates[node_id] == CONNECTED; };
198   void report_connect(NodeId node_id);
199   void report_disconnect(NodeId node_id, int errnum);
200   void report_error(NodeId nodeId, TransporterError errorCode,
201                     const char *errorInfo = 0);
202 
203   /**
204    * Get and set methods for IOState
205    */
206   IOState ioState(NodeId nodeId);
207   void setIOState(NodeId nodeId, IOState state);
208 
209 private:
210 
211   bool createTCPTransporter(TransporterConfiguration * config);
212   bool createSCITransporter(TransporterConfiguration * config);
213   bool createSHMTransporter(TransporterConfiguration * config);
214 
215 public:
216   /**
217    *   configureTransporter
218    *
219    *   Configure a transporter, ie. create new if it
220    *   does not exist otherwise try to reconfigure it
221    *
222    */
223   bool configureTransporter(TransporterConfiguration * config);
224 
225   /**
226    * Allocate send buffer for default send buffer handling.
227    *
228    * Upper layer that implements their own TransporterSendBufferHandle do not
229    * use this, instead they manage their own send buffers.
230    *
231    * Argument is the value of config parameter TotalSendBufferMemory. If 0,
232    * a default will be used of sum(max send buffer) over all transporters.
233    */
234   void allocate_send_buffers(Uint64 total_send_buffer);
235 
236   /**
237    * Get sum of max send buffer over all transporters, to be used as a default
238    * for allocate_send_buffers eg.
239    *
240    * Must be called after creating all transporters for returned value to be
241    * correct.
242    */
get_total_max_send_buffer()243   Uint64 get_total_max_send_buffer() { return m_total_max_send_buffer; }
244 
get_using_default_send_buffer() const245   bool get_using_default_send_buffer() const{ return m_use_default_send_buffer;}
246 
247   /**
248    * Set or clear overloaded bit.
249    * Query if any overloaded bit is set.
250    */
251   void set_status_overloaded(Uint32 nodeId, bool val);
252   const NodeBitmask& get_status_overloaded() const;
253 
254   /**
255    * prepareSend
256    *
257    * When IOState is HaltOutput or HaltIO do not send or insert any
258    * signals in the SendBuffer, unless it is intended for the remote
259    * CMVMI block (blockno 252)
260    * Perform prepareSend on the transporter.
261    *
262    * NOTE signalHeader->xxxBlockRef should contain block numbers and
263    *                                not references
264    */
265   SendStatus prepareSend(TransporterSendBufferHandle *sendHandle,
266                          const SignalHeader * const signalHeader, Uint8 prio,
267 			 const Uint32 * const signalData,
268 			 NodeId nodeId,
269 			 const LinearSectionPtr ptr[3]);
270 
271   SendStatus prepareSend(TransporterSendBufferHandle *sendHandle,
272                          const SignalHeader * const signalHeader, Uint8 prio,
273 			 const Uint32 * const signalData,
274 			 NodeId nodeId,
275 			 class SectionSegmentPool & pool,
276 			 const SegmentedSectionPtr ptr[3]);
277   SendStatus prepareSend(TransporterSendBufferHandle *sendHandle,
278                          const SignalHeader * const signalHeader, Uint8 prio,
279                          const Uint32 * const signalData,
280                          NodeId nodeId,
281                          const GenericSectionPtr ptr[3]);
282   /**
283    * Backwards compatiple methods with default send buffer handling.
284    */
prepareSend(const SignalHeader * const signalHeader,Uint8 prio,const Uint32 * const signalData,NodeId nodeId,const LinearSectionPtr ptr[3])285   SendStatus prepareSend(const SignalHeader * const signalHeader, Uint8 prio,
286 			 const Uint32 * const signalData,
287 			 NodeId nodeId,
288 			 const LinearSectionPtr ptr[3])
289   {
290     return prepareSend(this, signalHeader, prio, signalData, nodeId, ptr);
291   }
prepareSend(const SignalHeader * const signalHeader,Uint8 prio,const Uint32 * const signalData,NodeId nodeId,class SectionSegmentPool & pool,const SegmentedSectionPtr ptr[3])292   SendStatus prepareSend(const SignalHeader * const signalHeader, Uint8 prio,
293 			 const Uint32 * const signalData,
294 			 NodeId nodeId,
295 			 class SectionSegmentPool & pool,
296 			 const SegmentedSectionPtr ptr[3])
297   {
298     return prepareSend(this, signalHeader, prio, signalData, nodeId, pool, ptr);
299   }
prepareSend(const SignalHeader * const signalHeader,Uint8 prio,const Uint32 * const signalData,NodeId nodeId,const GenericSectionPtr ptr[3])300   SendStatus prepareSend(const SignalHeader * const signalHeader, Uint8 prio,
301                          const Uint32 * const signalData,
302                          NodeId nodeId,
303                          const GenericSectionPtr ptr[3])
304   {
305     return prepareSend(this, signalHeader, prio, signalData, nodeId, ptr);
306   }
307 
308   /**
309    * external_IO
310    *
311    * Equal to: poll(...); perform_IO()
312    *
313    */
314   void external_IO(Uint32 timeOutMillis);
315 
pollReceive(Uint32 timeOutMillis)316   inline Uint32 pollReceive(Uint32 timeOutMillis) {
317     return pollReceive(timeOutMillis, m_has_data_transporters);
318   }
319   Uint32 pollReceive(Uint32 timeOutMillis, NodeBitmask& mask);
320   void performReceive();
321   int performSend(NodeId nodeId);
322   void performSend();
323 
324   /**
325    * Force sending if more than or equal to sendLimit
326    * number have asked for send. Returns 0 if not sending
327    * and 1 if sending.
328    */
329   int forceSendCheck(int sendLimit);
330 
331 #ifdef DEBUG_TRANSPORTER
332   void printState();
333 #endif
334 
335 #ifdef ERROR_INSERT
336   /* Utils for testing latency issues */
337   bool isBlocked(NodeId nodeId);
338   void blockReceive(NodeId nodeId);
339   void unblockReceive(NodeId nodeId);
340 #endif
341 
342   class Transporter_interface {
343   public:
344     NodeId m_remote_nodeId;
345     int m_s_service_port;			// signed port number
346     const char *m_interface;
347   };
348   Vector<Transporter_interface> m_transporter_interface;
349   void add_transporter_interface(NodeId remoteNodeId, const char *interf,
350 		  		 int s_port);	// signed port. <0 is dynamic
351   Transporter* get_transporter(NodeId nodeId);
352   struct in_addr get_connect_address(NodeId node_id) const;
353 protected:
354 
355 private:
356   TransporterCallback *callbackObj;
357 
358   NdbMgmHandle m_mgm_handle;
359 
360   struct NdbThread   *m_start_clients_thread;
361   bool                m_run_start_clients_thread;
362 
363   int sendCounter;
364   NodeId localNodeId;
365   unsigned maxTransporters;
366   int nTransporters;
367   int nTCPTransporters;
368   int nSCITransporters;
369   int nSHMTransporters;
370 
371 #ifdef ERROR_INSERT
372   Bitmask<MAX_NTRANSPORTERS/32> m_blocked;
373   Bitmask<MAX_NTRANSPORTERS/32> m_blocked_with_data;
374   Bitmask<MAX_NTRANSPORTERS/32> m_blocked_disconnected;
375   int m_disconnect_errors[MAX_NTRANSPORTERS];
376 #endif
377 
378   /**
379    * Bitmask of transporters that has data "carried over" since
380    *   last performReceive
381    */
382   NodeBitmask m_has_data_transporters;
383 #if defined(HAVE_EPOLL_CREATE)
384   int m_epoll_fd;
385   struct epoll_event *m_epoll_events;
386   bool change_epoll(TCP_Transporter *t, bool add);
387 #endif
388   /**
389    * Arrays holding all transporters in the order they are created
390    */
391   TCP_Transporter** theTCPTransporters;
392   SCI_Transporter** theSCITransporters;
393   SHM_Transporter** theSHMTransporters;
394 
395   /**
396    * Array, indexed by nodeId, holding all transporters
397    */
398   TransporterType* theTransporterTypes;
399   Transporter**    theTransporters;
400 
401   /**
402    * State arrays, index by host id
403    */
404   PerformState* performStates;
405   int*          m_disconnect_errnum;
406   IOState*      ioStates;
407   struct ErrorState {
408     TransporterError m_code;
409     const char *m_info;
410   };
411   struct ErrorState *m_error_states;
412 
413   /**
414    * Overloaded bits, for fast check.
415    */
416   NodeBitmask m_status_overloaded;
417 
418   /**
419    * Unpack signal data.
420    *
421    * Defined in Packer.cpp.
422    */
423   Uint32 unpack(Uint32 * readPtr,
424 		Uint32 bufferSize,
425 		NodeId remoteNodeId,
426 		IOState state);
427 
428   Uint32 * unpack(Uint32 * readPtr,
429 		  Uint32 * eodPtr,
430 		  NodeId remoteNodeId,
431 		  IOState state);
432 
433   static Uint32 unpack_length_words(const Uint32 *readPtr, Uint32 maxWords);
434   /**
435    * Disconnect the transporter and remove it from
436    * theTransporters array. Do not allow any holes
437    * in theTransporters. Delete the transporter
438    * and remove it from theIndexedTransporters array
439    */
440   void removeTransporter(NodeId nodeId);
441 
442   /**
443    * Used in polling if exists TCP_Transporter
444    */
445   int tcpReadSelectReply;
446   ndb_socket_poller m_socket_poller;
447 
448   Uint32 poll_TCP(Uint32 timeOutMillis, NodeBitmask&);
449   Uint32 poll_SCI(Uint32 timeOutMillis, NodeBitmask&);
450   Uint32 poll_SHM(Uint32 timeOutMillis, NodeBitmask&);
451 
452   int m_shm_own_pid;
453   int m_transp_count;
454 
455 public:
456   bool setup_wakeup_socket();
457   void wakeup();
458 private:
459   bool m_has_extra_wakeup_socket;
460   NDB_SOCKET_TYPE m_extra_wakeup_sockets[2];
461   void consume_extra_sockets();
462 
463 
464   Uint32 *getWritePtr(TransporterSendBufferHandle *handle,
465                       NodeId node, Uint32 lenBytes, Uint32 prio);
466   void updateWritePtr(TransporterSendBufferHandle *handle,
467                       NodeId node, Uint32 lenBytes, Uint32 prio);
468 
469   /**
470    * TransporterSendBufferHandle implementation.
471    *
472    * Used for default send buffer handling, when the upper layer does not
473    * want to do special buffer handling itself.
474    */
475   virtual Uint32 *getWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio,
476                               Uint32 max_use);
477   virtual Uint32 updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio);
478   virtual bool forceSend(NodeId node);
479 
480 private:
481   /* Send buffer pages. */
482   struct SendBufferPage {
483     /* This is the number of words that will fit in one page of send buffer. */
484     static const Uint32 PGSIZE = 32768;
max_data_bytesTransporterRegistry::SendBufferPage485     static Uint32 max_data_bytes()
486     {
487       return PGSIZE - offsetof(SendBufferPage, m_data);
488     }
489 
490     /* Send buffer for one transporter is kept in a single-linked list. */
491     struct SendBufferPage *m_next;
492 
493     /* Bytes of send data available in this page. */
494     Uint16 m_bytes;
495     /* Start of unsent data */
496     Uint16 m_start;
497 
498     /* Data; real size is to the end of one page. */
499     char m_data[2];
500   };
501 
502   /* Send buffer for one transporter. */
503   struct SendBuffer {
504     /* Total size of data in buffer, from m_offset_start_data to end. */
505     Uint32 m_used_bytes;
506     /* Linked list of active buffer pages with first and last pointer. */
507     SendBufferPage *m_first_page;
508     SendBufferPage *m_last_page;
509   };
510 
511   SendBufferPage *alloc_page();
512   void release_page(SendBufferPage *page);
513 
514 private:
515   /* True if we are using the default send buffer implementation. */
516   bool m_use_default_send_buffer;
517   /* Send buffers. */
518   SendBuffer *m_send_buffers;
519   /* Linked list of free pages. */
520   SendBufferPage *m_page_freelist;
521   /* Original block of memory for pages (so we can free it at exit). */
522   unsigned char *m_send_buffer_memory;
523   /**
524    * Sum of max transporter memory for each transporter.
525    * Used to compute default send buffer size.
526    */
527   Uint64 m_total_max_send_buffer;
528 
529 public:
530   Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max);
531   Uint32 bytes_sent(NodeId node, Uint32 bytes);
532   bool has_data_to_send(NodeId node);
533 
534   void reset_send_buffer(NodeId node, bool should_be_empty);
535 
536   void print_transporters(const char* where, NdbOut& out = ndbout);
537 
538 };
539 
540 inline void
set_status_overloaded(Uint32 nodeId,bool val)541 TransporterRegistry::set_status_overloaded(Uint32 nodeId, bool val)
542 {
543   assert(nodeId < MAX_NODES);
544   m_status_overloaded.set(nodeId, val);
545 }
546 
547 inline const NodeBitmask&
get_status_overloaded() const548 TransporterRegistry::get_status_overloaded() const
549 {
550   return m_status_overloaded;
551 }
552 
553 #endif // Define of TransporterRegistry_H
554