1 /*
2 Copyright (c) 2003, 2019, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #ifndef Transporter_H
26 #define Transporter_H
27
28 #include <ndb_global.h>
29
30 #include <SocketClient.hpp>
31
32 #include <TransporterRegistry.hpp>
33 #include <TransporterCallback.hpp>
34 #include "TransporterDefinitions.hpp"
35 #include "Packer.hpp"
36
37 #include <NdbMutex.h>
38 #include <NdbThread.h>
39
40 #include "ndb_socket.h"
41
/**
 * Decide from a socket call result whether the connection should be treated
 * as disconnected.
 *
 * Evaluates to true when sz is 0, and otherwise to true unless sz is -1 and
 * e is one of SOCKET_EAGAIN, SOCKET_EWOULDBLOCK or SOCKET_EINTR.
 * NOTE(review): presumably sz is the return value of a socket read/write and
 * e the accompanying socket error code - confirm against the callers.
 *
 * Both arguments are parenthesized in the expansion so that arbitrary
 * expressions (e.g. a conditional expression) can be passed safely. Each
 * argument may be evaluated more than once, so avoid side effects.
 */
#define DISCONNECT_ERRNO(e, sz) ( \
                ((sz) == 0) || \
                (!(((sz) == -1) && \
                   (((e) == SOCKET_EAGAIN) || \
                    ((e) == SOCKET_EWOULDBLOCK) || \
                    ((e) == SOCKET_EINTR)))))
48
49 class Transporter {
50 friend class TransporterRegistry;
51 friend class Multi_Transporter;
52 friend class Qmgr;
53 public:
54 virtual bool initTransporter() = 0;
55
56 /**
57 * Destructor
58 */
59 virtual ~Transporter();
60
61
62 /**
63 * Disconnect node/socket
64 */
65 bool do_disconnect(int err, bool send_source);
66
67 /**
68 * Clear any data buffered in the transporter.
69 * Should only be called in a disconnected state.
70 */
resetBuffers()71 virtual void resetBuffers() {}
72
73 /**
74 * Is this transporter part of a multi transporter.
75 * It is a real transporter, but can be connected
76 * when the node is in the state connected.
77 */
isPartOfMultiTransporter()78 virtual bool isPartOfMultiTransporter()
79 {
80 return (m_multi_transporter_instance != 0);
81 }
82
get_multi_transporter_instance()83 Uint32 get_multi_transporter_instance()
84 {
85 return m_multi_transporter_instance;
86 }
isMultiTransporter()87 virtual bool isMultiTransporter()
88 {
89 return false;
90 }
91
set_multi_transporter_instance(Uint32 val)92 void set_multi_transporter_instance(Uint32 val)
93 {
94 m_multi_transporter_instance = val;
95 }
96
get_bytes_sent() const97 virtual Uint64 get_bytes_sent() const
98 {
99 return m_bytes_sent;
100 }
101
get_bytes_received() const102 virtual Uint64 get_bytes_received() const
103 {
104 return m_bytes_received;
105 }
106
107 /**
108 * In most cases we only use transporter per node connection.
109 * But in cases where the transporter is heavily loaded we can
110 * have multiple transporters to send for one node connection.
111 * In this case theNodeIdTransporters points to a Multi_Transporter
112 * object that has implemented a hash algorithm for
113 * get_send_transporter based on sending thread and receiving
114 * thread.
115 */
get_send_transporter(Uint32 recBlock,Uint32 sendBlock)116 virtual Transporter* get_send_transporter(Uint32 recBlock, Uint32 sendBlock)
117 {
118 (void)recBlock;
119 (void)sendBlock;
120 return this;
121 }
122
123 /**
124 * None blocking
125 * Use isConnected() to check status
126 */
127 virtual bool connect_client();
128 bool connect_client(NDB_SOCKET_TYPE sockfd);
129 bool connect_server(NDB_SOCKET_TYPE socket, BaseString& errormsg);
130
131 /**
132 * Returns socket used (sockets are used for all transporters to ensure
133 * we can wake up also shared memory transporters and other types of
134 * transporters in consistent manner.
135 */
136 NDB_SOCKET_TYPE getSocket() const;
137
138 /**
139 * Blocking
140 */
141 void doDisconnect();
142
143 /**
144 * Are we currently connected
145 */
146 bool isConnected() const;
147
148 /**
149 * Remote Node Id
150 */
151 NodeId getRemoteNodeId() const;
152
153 /**
154 * Index into allTransporters array.
155 */
156 TrpId getTransporterIndex() const;
157 void setTransporterIndex(TrpId);
158 /**
159 * Local (own) Node Id
160 */
161 NodeId getLocalNodeId() const;
162
163 /**
164 * Get port we're connecting to (signed)
165 */
get_s_port() const166 int get_s_port() const {
167 return m_s_port;
168 }
169
170 /**
171 * Set port to connect to (signed)
172 */
set_s_port(int port)173 void set_s_port(int port) {
174 m_s_port = port;
175 }
176
update_status_overloaded(Uint32 used)177 void update_status_overloaded(Uint32 used)
178 {
179 m_transporter_registry.set_status_overloaded(remoteNodeId,
180 used >= m_overload_limit);
181 m_transporter_registry.set_status_slowdown(remoteNodeId,
182 used >= m_slowdown_limit);
183 }
184
185 virtual bool doSend(bool need_wakeup = true) = 0;
186
187 /* Get the configured maximum send buffer usage. */
get_max_send_buffer()188 Uint32 get_max_send_buffer() { return m_max_send_buffer; }
189
get_connect_count()190 Uint32 get_connect_count() { return m_connect_count; }
191
inc_overload_count()192 void inc_overload_count() { m_overload_count++; }
get_overload_count()193 Uint32 get_overload_count() { return m_overload_count; }
inc_slowdown_count()194 void inc_slowdown_count() { m_slowdown_count++; }
get_slowdown_count()195 Uint32 get_slowdown_count() { return m_slowdown_count; }
set_recv_thread_idx(Uint32 recv_thread_idx)196 void set_recv_thread_idx (Uint32 recv_thread_idx)
197 {
198 m_recv_thread_idx = recv_thread_idx;
199 }
set_transporter_active(bool active)200 void set_transporter_active(bool active)
201 {
202 m_is_active = active;
203 }
get_recv_thread_idx()204 Uint32 get_recv_thread_idx() { return m_recv_thread_idx; }
205
206 TransporterType getTransporterType() const;
207
208 /**
209 * Only applies to TCP transporter, abort on any other object.
210 * Used as part of shutting down transporter when switching to
211 * multi socket setup.
212 * Shut down only for writes when all data have been sent.
213 */
shutdown()214 virtual void shutdown() { abort();}
215
216 protected:
217 Transporter(TransporterRegistry &,
218 TrpId transporter_index,
219 TransporterType,
220 const char *lHostName,
221 const char *rHostName,
222 int s_port,
223 bool isMgmConnection,
224 NodeId lNodeId,
225 NodeId rNodeId,
226 NodeId serverNodeId,
227 int byteorder,
228 bool compression,
229 bool checksum,
230 bool signalId,
231 Uint32 max_send_buffer,
232 bool _presend_checksum,
233 Uint32 spintime);
234
235 virtual bool configure(const TransporterConfiguration* conf);
236 virtual bool configure_derived(const TransporterConfiguration* conf) = 0;
237
238 /**
239 * Blocking, for max timeOut milli seconds
240 * Returns true if connect succeded
241 */
242 virtual bool connect_server_impl(NDB_SOCKET_TYPE sockfd) = 0;
243 virtual bool connect_client_impl(NDB_SOCKET_TYPE sockfd) = 0;
pre_connect_options(NDB_SOCKET_TYPE sockfd)244 virtual int pre_connect_options(NDB_SOCKET_TYPE sockfd) { return 0;}
245
246 /**
247 * Blocking
248 */
249 virtual void disconnectImpl() = 0;
250
251 /**
252 * Remote host name/and address
253 */
254 char remoteHostName[256];
255 char localHostName[256];
256
257 int m_s_port;
258
259 Uint32 m_spintime;
get_spintime()260 Uint32 get_spintime()
261 {
262 return m_spintime;
263 }
264 const NodeId remoteNodeId;
265 const NodeId localNodeId;
266
267 TrpId m_transporter_index;
268 const bool isServer;
269
270 int byteOrder;
271 bool compressionUsed;
272 bool checksumUsed;
273 bool check_send_checksum;
274 bool signalIdUsed;
275 Packer m_packer;
276 Uint32 m_max_send_buffer;
277 /* Overload limit, as configured with the OverloadLimit config parameter. */
278 Uint32 m_overload_limit;
279 Uint32 m_slowdown_limit;
280 void resetCounters();
281 Uint64 m_bytes_sent;
282 Uint64 m_bytes_received;
283 Uint32 m_connect_count;
284 Uint32 m_overload_count;
285 Uint32 m_slowdown_count;
286
287 // Sending/Receiving socket used by both client and server
288 NDB_SOCKET_TYPE theSocket;
289 private:
290 SocketClient *m_socket_client;
291 struct in_addr m_connect_address;
292
293 virtual bool send_is_possible(int timeout_millisec) const = 0;
294 virtual bool send_limit_reached(int bufsize) = 0;
295
296 void update_connect_state(bool connected);
297
298 protected:
299 /**
300 * means that we transform an MGM connection into
301 * a transporter connection
302 */
303 bool isMgmConnection;
304
305 Uint32 m_multi_transporter_instance;
306 Uint32 m_recv_thread_idx;
307 bool m_is_active;
308
309 Uint32 m_os_max_iovec;
310 Uint32 m_timeOutMillis;
311 bool m_connected; // Are we connected
312 TransporterType m_type;
313
314 /**
315 * Statistics
316 */
317 Uint32 reportFreq;
318 Uint32 receiveCount;
319 Uint64 receiveSize;
320 Uint32 sendCount;
321 Uint64 sendSize;
322
323 TransporterRegistry &m_transporter_registry;
get_callback_obj()324 TransporterCallback *get_callback_obj() { return m_transporter_registry.callbackObj; }
report_error(enum TransporterError err,const char * info=0)325 void report_error(enum TransporterError err, const char *info = 0)
326 { m_transporter_registry.report_error(remoteNodeId, err, info); }
327
328 Uint32 fetch_send_iovec_data(struct iovec dst[], Uint32 cnt);
329 void iovec_data_sent(int nBytesSent);
330
331 void set_get(NDB_SOCKET_TYPE fd,
332 int level,
333 int optval,
334 const char *optname,
335 int val);
336 /*
337 * Keep checksum state for Protocol6 messages over a byte stream.
338 */
339 class checksum_state {
340 enum cs_states
341 {
342 CS_INIT,
343 CS_MSG_CHECK,
344 CS_MSG_NOCHECK
345 };
346 cs_states state;
347 Uint32 chksum; // of already sent bytes, rotated so next byte to process matches first byte of chksum
348 Uint16 pending; // remaining bytes before state change
349 public:
350 bool computev(const struct iovec *iov, int iovcnt, size_t bytecnt = SIZE_T_MAX);
checksum_state()351 checksum_state(): state(CS_INIT), chksum(0), pending(4) {}
init()352 void init() { state = CS_INIT; chksum = 0; pending = 4; }
353 private:
354 bool compute(const void* bytes, size_t len);
355 static void static_asserts(); // container of static asserts, not to be called
356 void dumpBadChecksumInfo(Uint32 inputSum,
357 Uint32 badSum,
358 size_t offset,
359 Uint32 remaining,
360 const void* buf,
361 size_t len) const;
362
363 };
364 checksum_state send_checksum_state;
365 };
366
367 inline
368 NDB_SOCKET_TYPE
getSocket() const369 Transporter::getSocket() const {
370 return theSocket;
371 }
372
373 inline
374 TransporterType
getTransporterType() const375 Transporter::getTransporterType() const
376 {
377 return m_type;
378 }
379
380 inline
381 bool
isConnected() const382 Transporter::isConnected() const {
383 return m_connected;
384 }
385
386 inline
387 NodeId
getRemoteNodeId() const388 Transporter::getRemoteNodeId() const {
389 return remoteNodeId;
390 }
391
392 inline
393 TrpId
getTransporterIndex() const394 Transporter::getTransporterIndex() const {
395 return m_transporter_index;
396 }
397
398 inline
399 void
setTransporterIndex(TrpId val)400 Transporter::setTransporterIndex(TrpId val)
401 {
402 m_transporter_index = val;
403 }
404
405 inline
406 NodeId
getLocalNodeId() const407 Transporter::getLocalNodeId() const {
408 return localNodeId;
409 }
410
411 /**
412 * Get data to send (in addition to data possibly remaining from previous
413 * partial send).
414 */
415 inline
416 Uint32
fetch_send_iovec_data(struct iovec dst[],Uint32 cnt)417 Transporter::fetch_send_iovec_data(struct iovec dst[], Uint32 cnt)
418 {
419 return get_callback_obj()->get_bytes_to_send_iovec(remoteNodeId,
420 m_transporter_index,
421 dst,
422 cnt);
423 }
424
425 inline
426 void
iovec_data_sent(int nBytesSent)427 Transporter::iovec_data_sent(int nBytesSent)
428 {
429 Uint32 used_bytes = get_callback_obj()->bytes_sent(remoteNodeId,
430 m_transporter_index,
431 nBytesSent);
432 update_status_overloaded(used_bytes);
433 }
434
435 inline
436 void
static_asserts()437 Transporter::checksum_state::static_asserts()
438 {
439 STATIC_ASSERT(MAX_SEND_MESSAGE_BYTESIZE == (Uint16)MAX_SEND_MESSAGE_BYTESIZE);
440 STATIC_ASSERT(SIZE_T_MAX == (size_t)SIZE_T_MAX);
441 }
442
443 inline
444 bool
compute(const void * buf,size_t len)445 Transporter::checksum_state::compute(const void* buf, size_t len)
446 {
447 const Uint32 inputSum = chksum;
448 Uint32 off = 0;
449 unsigned char* psum = static_cast<unsigned char*>(static_cast<void*>(&chksum));
450 const unsigned char* bytes = static_cast<const unsigned char*>(buf);
451
452 while (off < len)
453 {
454 const Uint32 available = len - off;
455 switch (state)
456 {
457 case CS_INIT:
458 {
459 assert(pending <= 4);
460 assert(chksum == 0 || pending < 4);
461 const Uint32 nb = MIN(pending, available);
462 memcpy(psum + (4-pending), bytes + off, nb);
463 off+= nb;
464 pending-= nb;
465
466 if (pending == 0)
467 {
468 /* Msg header word 0 complete, parse to determine msg length */
469 assert(Protocol6::getMessageLength(chksum) <= (MAX_SEND_MESSAGE_BYTESIZE >> 2));
470 assert(Protocol6::getMessageLength(chksum) >= 2);
471 pending = (Protocol6::getMessageLength(chksum) * 4) - 4; /* Word 0 eaten */
472 state = (Protocol6::getCheckSumIncluded(chksum)? CS_MSG_CHECK : CS_MSG_NOCHECK);
473 }
474 break;
475 }
476 case CS_MSG_CHECK:
477 case CS_MSG_NOCHECK:
478 {
479 if (available < pending)
480 {
481 /* Only part of current msg body present */
482 if (state == CS_MSG_CHECK)
483 {
484 /* Add available content to the checksum */
485 chksum = computeXorChecksumBytes(bytes + off, available, chksum);
486 }
487 off += available;
488 pending -= available;
489 }
490 else
491 {
492 /* All of current msg body present, consume and check it */
493 if (state == CS_MSG_CHECK)
494 {
495 chksum = computeXorChecksumBytes(bytes + off, pending, chksum);
496 if (chksum != 0)
497 {
498 dumpBadChecksumInfo(inputSum,
499 chksum,
500 off,
501 pending,
502 buf,
503 len);
504 return false;
505 }
506 }
507 off += pending;
508
509 /* Now we are ready for the next msg */
510 pending = 4;
511 state = CS_INIT;
512 }
513 break;
514 }
515 }
516 } // while (off < len)
517
518 return true;
519 }
520
521
522 inline
523 bool
computev(const struct iovec * iov,int iovcnt,size_t bytecnt)524 Transporter::checksum_state::computev(const struct iovec *iov, int iovcnt, size_t bytecnt)
525 {
526 // bytecnt is SIZE_T_MAX implies use all iovec
527 size_t off = 0;
528 bool ok = true;
529 for(int iovi = 0; ok && bytecnt > off && iovi < iovcnt; iovi ++)
530 {
531 int nb = iov[iovi].iov_len;
532 if (bytecnt < off + nb)
533 {
534 nb = bytecnt - off;
535 }
536 if (!compute(iov[iovi].iov_base, nb))
537 {
538 fprintf(stderr,
539 "Transporter::checksum_state::computev() failed on IOV %u/%u "
540 "byteCount %llu off %llu nb %u\n",
541 iovi,
542 iovcnt,
543 Uint64(bytecnt),
544 Uint64(off),
545 nb);
546 /* TODO : Dump more IOV + bytecnt details */
547 return false;
548 }
549 off += nb;
550 }
551 if (bytecnt != SIZE_T_MAX && bytecnt != off)
552 {
553 fprintf(stderr,
554 "Transporter::checksum_state::computev() failed : "
555 "bytecnt %llu off %llu\n",
556 Uint64(bytecnt),
557 Uint64(off));
558 ok = false;
559 }
560 return ok;
561 }
562
563 #endif // Define of Transporter_H
564