1 /*
2 Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 //****************************************************************************
26 //
27 // NAME
28 // TransporterRegistry
29 //
30 // DESCRIPTION
//      TransporterRegistry (singleton) is the interface to the
//      transporter layer. It handles transporter states and
//      holds the transporter arrays.
34 //
35 //***************************************************************************/
36 #ifndef TransporterRegistry_H
37 #define TransporterRegistry_H
38
39 #if defined(HAVE_EPOLL_CREATE)
40 #include <sys/epoll.h>
41 #endif
42 #include "TransporterDefinitions.hpp"
43 #include "TransporterCallback.hpp"
44 #include <SocketServer.hpp>
45 #include <SocketClient.hpp>
46
47 #include <NdbTCP.h>
48
49 #include <mgmapi/mgmapi.h>
50
51 #include <NodeBitmask.hpp>
52
/**
 * I/O restriction state of a transporter.
 *
 * Every transporter is always in exactly one IOState. NoHalt is the
 * initial state and stays in effect for as long as there are no
 * restrictions on sending or receiving. HaltIO is the combination of
 * HaltInput and HaltOutput.
 */
enum IOState {
  NoHalt     = 0,
  HaltInput  = 1,
  HaltOutput = 2,
  HaltIO     = HaltInput | HaltOutput   /* == 3 */
};
62
63
/**
 * Human readable descriptions of the transporter perform states.
 *
 * The table is indexed by the numeric value of
 * TransporterRegistry::PerformState (CONNECTED = 0, CONNECTING = 1,
 * DISCONNECTED = 2, DISCONNECTING = 3), so the order of the entries must
 * follow the order of that enum.
 *
 * Both the characters and the pointers are const: this header is included
 * in many translation units and nobody should be able to redirect an entry.
 */
static const char * const performStateString[] =
  { "is connected",
    "is trying to connect",
    "does nothing",
    "is trying to disconnect" };
69
/* Forward declarations: these types are only used by pointer or as
 * friends in this header, so their full definitions are not needed. */

/* The registry itself and the authenticator used by TransporterService. */
class TransporterRegistry;
class SocketAuthenticator;

/* Transporter types held in the registry's transporter arrays. */
class Transporter;
class TCP_Transporter;
class SCI_Transporter;
class SHM_Transporter;
77
78 class TransporterService : public SocketServer::Service {
79 SocketAuthenticator * m_auth;
80 TransporterRegistry * m_transporter_registry;
81 public:
TransporterService(SocketAuthenticator * auth=0)82 TransporterService(SocketAuthenticator *auth= 0)
83 {
84 m_auth= auth;
85 m_transporter_registry= 0;
86 }
setTransporterRegistry(TransporterRegistry * t)87 void setTransporterRegistry(TransporterRegistry *t)
88 {
89 m_transporter_registry= t;
90 }
91 SocketServer::Session * newSession(NDB_SOCKET_TYPE socket);
92 };
93
94 /**
95 * @class TransporterRegistry
96 * @brief ...
97 */
98 class TransporterRegistry : private TransporterSendBufferHandle {
99 friend class SHM_Transporter;
100 friend class SHM_Writer;
101 friend class Transporter;
102 friend class TransporterService;
103 public:
104 /**
105 * Constructor
106 */
107 TransporterRegistry(TransporterCallback *callback,
108 bool use_default_send_buffer = true,
109 unsigned maxTransporters = MAX_NTRANSPORTERS,
110 unsigned sizeOfLongSignalMemory = 100);
111
112 /**
113 * this handle will be used in the client connect thread
114 * to fetch information on dynamic ports. The old handle
115 * (if set) is destroyed, and this is destroyed by the destructor
116 */
117 void set_mgm_handle(NdbMgmHandle h);
get_mgm_handle(void)118 NdbMgmHandle get_mgm_handle(void) { return m_mgm_handle; };
119
120 bool init(NodeId localNodeId);
121
122 /**
123 Handle the handshaking with a new client connection
124 on the server port.
125 NOTE! Connection should be closed if function
126 returns false
127 */
128 bool connect_server(NDB_SOCKET_TYPE sockfd, BaseString& errormsg) const;
129
130 bool connect_client(NdbMgmHandle *h);
131
132 /**
133 * Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter
134 * and returns the socket.
135 */
136 NDB_SOCKET_TYPE connect_ndb_mgmd(SocketClient *sc);
137
138 /**
139 * Given a connected NdbMgmHandle, turns it into a transporter
140 * and returns the socket.
141 */
142 NDB_SOCKET_TYPE connect_ndb_mgmd(NdbMgmHandle *h);
143
144 /**
145 * Remove all transporters
146 */
147 void removeAll();
148
149 /**
150 * Disconnect all transporters
151 */
152 void disconnectAll();
153
154 /**
155 * Stops the server, disconnects all the transporter
156 * and deletes them and remove it from the transporter arrays
157 */
158 virtual ~TransporterRegistry();
159
160 bool start_service(SocketServer& server);
161 struct NdbThread* start_clients();
162 bool stop_clients();
163 void start_clients_thread();
164 void update_connections();
165
166 /**
167 * Start/Stop receiving
168 */
169 void startReceiving();
170 void stopReceiving();
171
172 /**
173 * Start/Stop sending
174 */
175 void startSending();
176 void stopSending();
177
178 // A transporter is always in a PerformState.
179 // PerformIO is used initially and as long as any of the events
180 // PerformConnect, ...
181 enum PerformState {
182 CONNECTED = 0,
183 CONNECTING = 1,
184 DISCONNECTED = 2,
185 DISCONNECTING = 3
186 };
getPerformStateString(NodeId nodeId) const187 const char *getPerformStateString(NodeId nodeId) const
188 { return performStateString[(unsigned)performStates[nodeId]]; };
189
getPerformState(NodeId nodeId) const190 PerformState getPerformState(NodeId nodeId) const { return performStates[nodeId]; }
191
192 /**
193 * Get and set methods for PerformState
194 */
195 void do_connect(NodeId node_id);
196 void do_disconnect(NodeId node_id, int errnum = 0);
is_connected(NodeId node_id)197 bool is_connected(NodeId node_id) { return performStates[node_id] == CONNECTED; };
198 void report_connect(NodeId node_id);
199 void report_disconnect(NodeId node_id, int errnum);
200 void report_error(NodeId nodeId, TransporterError errorCode,
201 const char *errorInfo = 0);
202
203 /**
204 * Get and set methods for IOState
205 */
206 IOState ioState(NodeId nodeId);
207 void setIOState(NodeId nodeId, IOState state);
208
209 private:
210
211 bool createTCPTransporter(TransporterConfiguration * config);
212 bool createSCITransporter(TransporterConfiguration * config);
213 bool createSHMTransporter(TransporterConfiguration * config);
214
215 public:
216 /**
217 * configureTransporter
218 *
219 * Configure a transporter, ie. create new if it
220 * does not exist otherwise try to reconfigure it
221 *
222 */
223 bool configureTransporter(TransporterConfiguration * config);
224
225 /**
226 * Allocate send buffer for default send buffer handling.
227 *
228 * Upper layer that implements their own TransporterSendBufferHandle do not
229 * use this, instead they manage their own send buffers.
230 *
231 * Argument is the value of config parameter TotalSendBufferMemory. If 0,
232 * a default will be used of sum(max send buffer) over all transporters.
233 */
234 void allocate_send_buffers(Uint64 total_send_buffer);
235
236 /**
237 * Get sum of max send buffer over all transporters, to be used as a default
238 * for allocate_send_buffers eg.
239 *
240 * Must be called after creating all transporters for returned value to be
241 * correct.
242 */
get_total_max_send_buffer()243 Uint64 get_total_max_send_buffer() { return m_total_max_send_buffer; }
244
get_using_default_send_buffer() const245 bool get_using_default_send_buffer() const{ return m_use_default_send_buffer;}
246
247 /**
248 * Set or clear overloaded bit.
249 * Query if any overloaded bit is set.
250 */
251 void set_status_overloaded(Uint32 nodeId, bool val);
252 const NodeBitmask& get_status_overloaded() const;
253
254 /**
255 * prepareSend
256 *
257 * When IOState is HaltOutput or HaltIO do not send or insert any
258 * signals in the SendBuffer, unless it is intended for the remote
259 * CMVMI block (blockno 252)
260 * Perform prepareSend on the transporter.
261 *
262 * NOTE signalHeader->xxxBlockRef should contain block numbers and
263 * not references
264 */
265 SendStatus prepareSend(TransporterSendBufferHandle *sendHandle,
266 const SignalHeader * const signalHeader, Uint8 prio,
267 const Uint32 * const signalData,
268 NodeId nodeId,
269 const LinearSectionPtr ptr[3]);
270
271 SendStatus prepareSend(TransporterSendBufferHandle *sendHandle,
272 const SignalHeader * const signalHeader, Uint8 prio,
273 const Uint32 * const signalData,
274 NodeId nodeId,
275 class SectionSegmentPool & pool,
276 const SegmentedSectionPtr ptr[3]);
277 SendStatus prepareSend(TransporterSendBufferHandle *sendHandle,
278 const SignalHeader * const signalHeader, Uint8 prio,
279 const Uint32 * const signalData,
280 NodeId nodeId,
281 const GenericSectionPtr ptr[3]);
282 /**
283 * Backwards compatiple methods with default send buffer handling.
284 */
prepareSend(const SignalHeader * const signalHeader,Uint8 prio,const Uint32 * const signalData,NodeId nodeId,const LinearSectionPtr ptr[3])285 SendStatus prepareSend(const SignalHeader * const signalHeader, Uint8 prio,
286 const Uint32 * const signalData,
287 NodeId nodeId,
288 const LinearSectionPtr ptr[3])
289 {
290 return prepareSend(this, signalHeader, prio, signalData, nodeId, ptr);
291 }
prepareSend(const SignalHeader * const signalHeader,Uint8 prio,const Uint32 * const signalData,NodeId nodeId,class SectionSegmentPool & pool,const SegmentedSectionPtr ptr[3])292 SendStatus prepareSend(const SignalHeader * const signalHeader, Uint8 prio,
293 const Uint32 * const signalData,
294 NodeId nodeId,
295 class SectionSegmentPool & pool,
296 const SegmentedSectionPtr ptr[3])
297 {
298 return prepareSend(this, signalHeader, prio, signalData, nodeId, pool, ptr);
299 }
prepareSend(const SignalHeader * const signalHeader,Uint8 prio,const Uint32 * const signalData,NodeId nodeId,const GenericSectionPtr ptr[3])300 SendStatus prepareSend(const SignalHeader * const signalHeader, Uint8 prio,
301 const Uint32 * const signalData,
302 NodeId nodeId,
303 const GenericSectionPtr ptr[3])
304 {
305 return prepareSend(this, signalHeader, prio, signalData, nodeId, ptr);
306 }
307
308 /**
309 * external_IO
310 *
311 * Equal to: poll(...); perform_IO()
312 *
313 */
314 void external_IO(Uint32 timeOutMillis);
315
pollReceive(Uint32 timeOutMillis)316 inline Uint32 pollReceive(Uint32 timeOutMillis) {
317 return pollReceive(timeOutMillis, m_has_data_transporters);
318 }
319 Uint32 pollReceive(Uint32 timeOutMillis, NodeBitmask& mask);
320 void performReceive();
321 int performSend(NodeId nodeId);
322 void performSend();
323
324 /**
325 * Force sending if more than or equal to sendLimit
326 * number have asked for send. Returns 0 if not sending
327 * and 1 if sending.
328 */
329 int forceSendCheck(int sendLimit);
330
331 #ifdef DEBUG_TRANSPORTER
332 void printState();
333 #endif
334
335 #ifdef ERROR_INSERT
336 /* Utils for testing latency issues */
337 bool isBlocked(NodeId nodeId);
338 void blockReceive(NodeId nodeId);
339 void unblockReceive(NodeId nodeId);
340 #endif
341
342 class Transporter_interface {
343 public:
344 NodeId m_remote_nodeId;
345 int m_s_service_port; // signed port number
346 const char *m_interface;
347 };
348 Vector<Transporter_interface> m_transporter_interface;
349 void add_transporter_interface(NodeId remoteNodeId, const char *interf,
350 int s_port); // signed port. <0 is dynamic
351 Transporter* get_transporter(NodeId nodeId);
352 struct in_addr get_connect_address(NodeId node_id) const;
353 protected:
354
355 private:
356 TransporterCallback *callbackObj;
357
358 NdbMgmHandle m_mgm_handle;
359
360 struct NdbThread *m_start_clients_thread;
361 bool m_run_start_clients_thread;
362
363 int sendCounter;
364 NodeId localNodeId;
365 unsigned maxTransporters;
366 int nTransporters;
367 int nTCPTransporters;
368 int nSCITransporters;
369 int nSHMTransporters;
370
371 #ifdef ERROR_INSERT
372 Bitmask<MAX_NTRANSPORTERS/32> m_blocked;
373 Bitmask<MAX_NTRANSPORTERS/32> m_blocked_with_data;
374 Bitmask<MAX_NTRANSPORTERS/32> m_blocked_disconnected;
375 int m_disconnect_errors[MAX_NTRANSPORTERS];
376 #endif
377
378 /**
379 * Bitmask of transporters that has data "carried over" since
380 * last performReceive
381 */
382 NodeBitmask m_has_data_transporters;
383 #if defined(HAVE_EPOLL_CREATE)
384 int m_epoll_fd;
385 struct epoll_event *m_epoll_events;
386 bool change_epoll(TCP_Transporter *t, bool add);
387 #endif
388 /**
389 * Arrays holding all transporters in the order they are created
390 */
391 TCP_Transporter** theTCPTransporters;
392 SCI_Transporter** theSCITransporters;
393 SHM_Transporter** theSHMTransporters;
394
395 /**
396 * Array, indexed by nodeId, holding all transporters
397 */
398 TransporterType* theTransporterTypes;
399 Transporter** theTransporters;
400
401 /**
402 * State arrays, index by host id
403 */
404 PerformState* performStates;
405 int* m_disconnect_errnum;
406 IOState* ioStates;
407 struct ErrorState {
408 TransporterError m_code;
409 const char *m_info;
410 };
411 struct ErrorState *m_error_states;
412
413 /**
414 * Overloaded bits, for fast check.
415 */
416 NodeBitmask m_status_overloaded;
417
418 /**
419 * Unpack signal data.
420 *
421 * Defined in Packer.cpp.
422 */
423 Uint32 unpack(Uint32 * readPtr,
424 Uint32 bufferSize,
425 NodeId remoteNodeId,
426 IOState state);
427
428 Uint32 * unpack(Uint32 * readPtr,
429 Uint32 * eodPtr,
430 NodeId remoteNodeId,
431 IOState state);
432
433 static Uint32 unpack_length_words(const Uint32 *readPtr, Uint32 maxWords);
434 /**
435 * Disconnect the transporter and remove it from
436 * theTransporters array. Do not allow any holes
437 * in theTransporters. Delete the transporter
438 * and remove it from theIndexedTransporters array
439 */
440 void removeTransporter(NodeId nodeId);
441
442 /**
443 * Used in polling if exists TCP_Transporter
444 */
445 int tcpReadSelectReply;
446 ndb_socket_poller m_socket_poller;
447
448 Uint32 poll_TCP(Uint32 timeOutMillis, NodeBitmask&);
449 Uint32 poll_SCI(Uint32 timeOutMillis, NodeBitmask&);
450 Uint32 poll_SHM(Uint32 timeOutMillis, NodeBitmask&);
451
452 int m_shm_own_pid;
453 int m_transp_count;
454
455 public:
456 bool setup_wakeup_socket();
457 void wakeup();
458 private:
459 bool m_has_extra_wakeup_socket;
460 NDB_SOCKET_TYPE m_extra_wakeup_sockets[2];
461 void consume_extra_sockets();
462
463
464 Uint32 *getWritePtr(TransporterSendBufferHandle *handle,
465 NodeId node, Uint32 lenBytes, Uint32 prio);
466 void updateWritePtr(TransporterSendBufferHandle *handle,
467 NodeId node, Uint32 lenBytes, Uint32 prio);
468
469 /**
470 * TransporterSendBufferHandle implementation.
471 *
472 * Used for default send buffer handling, when the upper layer does not
473 * want to do special buffer handling itself.
474 */
475 virtual Uint32 *getWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio,
476 Uint32 max_use);
477 virtual Uint32 updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio);
478 virtual bool forceSend(NodeId node);
479
480 private:
481 /* Send buffer pages. */
482 struct SendBufferPage {
483 /* This is the number of words that will fit in one page of send buffer. */
484 static const Uint32 PGSIZE = 32768;
max_data_bytesTransporterRegistry::SendBufferPage485 static Uint32 max_data_bytes()
486 {
487 return PGSIZE - offsetof(SendBufferPage, m_data);
488 }
489
490 /* Send buffer for one transporter is kept in a single-linked list. */
491 struct SendBufferPage *m_next;
492
493 /* Bytes of send data available in this page. */
494 Uint16 m_bytes;
495 /* Start of unsent data */
496 Uint16 m_start;
497
498 /* Data; real size is to the end of one page. */
499 char m_data[2];
500 };
501
502 /* Send buffer for one transporter. */
503 struct SendBuffer {
504 /* Total size of data in buffer, from m_offset_start_data to end. */
505 Uint32 m_used_bytes;
506 /* Linked list of active buffer pages with first and last pointer. */
507 SendBufferPage *m_first_page;
508 SendBufferPage *m_last_page;
509 };
510
511 SendBufferPage *alloc_page();
512 void release_page(SendBufferPage *page);
513
514 private:
515 /* True if we are using the default send buffer implementation. */
516 bool m_use_default_send_buffer;
517 /* Send buffers. */
518 SendBuffer *m_send_buffers;
519 /* Linked list of free pages. */
520 SendBufferPage *m_page_freelist;
521 /* Original block of memory for pages (so we can free it at exit). */
522 unsigned char *m_send_buffer_memory;
523 /**
524 * Sum of max transporter memory for each transporter.
525 * Used to compute default send buffer size.
526 */
527 Uint64 m_total_max_send_buffer;
528
529 public:
530 Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max);
531 Uint32 bytes_sent(NodeId node, Uint32 bytes);
532 bool has_data_to_send(NodeId node);
533
534 void reset_send_buffer(NodeId node, bool should_be_empty);
535
536 void print_transporters(const char* where, NdbOut& out = ndbout);
537
538 };
539
540 inline void
set_status_overloaded(Uint32 nodeId,bool val)541 TransporterRegistry::set_status_overloaded(Uint32 nodeId, bool val)
542 {
543 assert(nodeId < MAX_NODES);
544 m_status_overloaded.set(nodeId, val);
545 }
546
547 inline const NodeBitmask&
get_status_overloaded() const548 TransporterRegistry::get_status_overloaded() const
549 {
550 return m_status_overloaded;
551 }
552
553 #endif // Define of TransporterRegistry_H
554