1 /*
2 Copyright (c) 2003, 2019, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25
26 #include <ndb_global.h>
27
28 #include "SHM_Transporter.hpp"
29 #include "TransporterInternalDefinitions.hpp"
30 #include <TransporterCallback.hpp>
31 #include <NdbSleep.h>
32 #include <NdbOut.hpp>
33 #include <NdbMutex.h>
34 #include <ndb_localtime.h>
35
36 #include <InputStream.hpp>
37 #include <OutputStream.hpp>
38
39 #include <EventLogger.hpp>
40 extern EventLogger * g_eventLogger;
41
42 #if 0
43 #define DEBUG_FPRINTF(arglist) do { fprintf arglist ; } while (0)
44 #else
45 #define DEBUG_FPRINTF(a)
46 #endif
47
SHM_Transporter(TransporterRegistry & t_reg,TrpId transporter_index,const char * lHostName,const char * rHostName,int r_port,bool isMgmConnection_arg,NodeId lNodeId,NodeId rNodeId,NodeId serverNodeId,bool checksum,bool signalId,key_t _shmKey,Uint32 _shmSize,bool preSendChecksum,Uint32 _spintime,Uint32 _send_buffer_size)48 SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,
49 TrpId transporter_index,
50 const char *lHostName,
51 const char *rHostName,
52 int r_port,
53 bool isMgmConnection_arg,
54 NodeId lNodeId,
55 NodeId rNodeId,
56 NodeId serverNodeId,
57 bool checksum,
58 bool signalId,
59 key_t _shmKey,
60 Uint32 _shmSize,
61 bool preSendChecksum,
62 Uint32 _spintime,
63 Uint32 _send_buffer_size) :
64 Transporter(t_reg, transporter_index, tt_SHM_TRANSPORTER,
65 lHostName, rHostName, r_port, isMgmConnection_arg,
66 lNodeId, rNodeId, serverNodeId,
67 0, false, checksum, signalId,
68 _send_buffer_size,
69 preSendChecksum,
70 _spintime),
71 shmKey(_shmKey),
72 shmSize(_shmSize)
73 {
74 #ifndef _WIN32
75 shmId= 0;
76 #endif
77 _shmSegCreated = false;
78 _attached = false;
79
80 shmBuf = 0;
81 reader = 0;
82 writer = 0;
83
84 setupBuffersDone = false;
85 m_server_locked = false;
86 m_client_locked = false;
87 #ifdef DEBUG_TRANSPORTER
88 printf("shm key (%d - %d) = %d\n", lNodeId, rNodeId, shmKey);
89 #endif
90 m_signal_threshold = 262144;
91 }
92
SHM_Transporter(TransporterRegistry & t_reg,const SHM_Transporter * t)93 SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,
94 const SHM_Transporter* t)
95 :
96 Transporter(t_reg,
97 0,
98 tt_SHM_TRANSPORTER,
99 t->localHostName,
100 t->remoteHostName,
101 t->m_s_port,
102 t->isMgmConnection,
103 t->localNodeId,
104 t->remoteNodeId,
105 t->isServer ? t->localNodeId : t->remoteNodeId,
106 0,
107 false,
108 t->checksumUsed,
109 t->signalIdUsed,
110 t->m_max_send_buffer,
111 t->check_send_checksum,
112 t->m_spintime)
113 {
114 shmKey = t->shmKey;
115 shmSize = t->shmSize;
116 #ifndef NDB_WIN32
117 shmId= 0;
118 #endif
119 _shmSegCreated = false;
120 _attached = false;
121
122 shmBuf = 0;
123 reader = 0;
124 writer = 0;
125
126 setupBuffersDone = false;
127 m_server_locked = false;
128 m_client_locked = false;
129 #ifdef DEBUG_TRANSPORTER
130 printf("shm key (%d - %d) = %d\n",
131 t->localNodeId, t->remoteNodeId, shmKey);
132 #endif
133 m_signal_threshold = 262144;
134 send_checksum_state.init();
135 }
136
137
138 bool
configure_derived(const TransporterConfiguration * conf)139 SHM_Transporter::configure_derived(const TransporterConfiguration* conf)
140 {
141 if ((key_t)conf->shm.shmKey == shmKey &&
142 (int)conf->shm.shmSize == shmSize)
143 return true; // No change
144 return false; // Can't reconfigure
145 }
146
147
~SHM_Transporter()148 SHM_Transporter::~SHM_Transporter()
149 {
150 DEBUG_FPRINTF((stderr, "(%u)doDisconnect(%u), line: %d\n",
151 localNodeId, remoteNodeId, __LINE__));
152 doDisconnect();
153 }
154
155 void
resetBuffers()156 SHM_Transporter::resetBuffers()
157 {
158 assert(!isConnected());
159 DEBUG_FPRINTF((stderr, "(%u)resetBuffers(%u), line: %d\n",
160 localNodeId, remoteNodeId, __LINE__));
161 detach_shm(true);
162 send_checksum_state.init();
163 }
164
165 bool
initTransporter()166 SHM_Transporter::initTransporter()
167 {
168 return true;
169 }
170
171 bool
setupBuffers()172 SHM_Transporter::setupBuffers()
173 {
174 Uint32 sharedSize = 0;
175 sharedSize += 64;
176 sharedSize += sizeof(NdbMutex);
177
178 const Uint32 slack = MAX(MAX_RECV_MESSAGE_BYTESIZE,
179 MAX_SEND_MESSAGE_BYTESIZE);
180
181 /**
182 * NOTE: There is 7th shared variable in Win2k (sharedCountAttached).
183 */
184 Uint32 sizeOfBuffer = shmSize;
185 sizeOfBuffer -= 2*sharedSize;
186 sizeOfBuffer /= 2;
187
188 Uint32 * base1 = (Uint32*)shmBuf;
189
190 Uint32 * sharedReadIndex1 = base1;
191 Uint32 * sharedWriteIndex1 = base1 + 1;
192 serverStatusFlag = base1 + 4;
193 serverAwakenedFlag = base1 + 5;
194 serverUpFlag = base1 + 6;
195 serverMutex = (NdbMutex*)(base1 + 16);
196 char * startOfBuf1 = shmBuf+sharedSize;
197
198 Uint32 * base2 = (Uint32*)(shmBuf + sizeOfBuffer + sharedSize);
199 Uint32 * sharedReadIndex2 = base2;
200 Uint32 * sharedWriteIndex2 = base2 + 1;
201 clientStatusFlag = base2 + 4;
202 clientAwakenedFlag = base2 + 5;
203 clientUpFlag = base2 + 6;
204 clientMutex = (NdbMutex*)(base2 + 16);
205 char * startOfBuf2 = ((char *)base2)+sharedSize;
206
207 if (isServer)
208 {
209 if ((NdbMutex_Init_Shared(serverMutex) != 0) ||
210 (NdbMutex_Init_Shared(clientMutex) != 0))
211 {
212 return true;
213 }
214 * serverAwakenedFlag = 0;
215 * clientAwakenedFlag = 0;
216 * serverUpFlag = 1;
217 * clientUpFlag = 0;
218 }
219 else
220 {
221 NdbMutex_Lock(serverMutex);
222 * clientUpFlag = 1;
223 NdbMutex_Unlock(serverMutex);
224 }
225
226 if (reader != 0)
227 {
228 DEBUG_FPRINTF((stderr, "(%u)reader = %p, m_shm_reader: %p (%u) LINE:%d",
229 localNodeId, reader, &m_shm_reader, remoteNodeId, __LINE__));
230 }
231 assert(reader == 0);
232 assert(writer == 0);
233 if(isServer)
234 {
235 * serverStatusFlag = 0;
236 reader = new (&m_shm_reader)
237 SHM_Reader(startOfBuf1,
238 sizeOfBuffer,
239 slack,
240 sharedReadIndex1,
241 sharedWriteIndex1);
242
243 writer = new (&m_shm_writer)
244 SHM_Writer(startOfBuf2,
245 sizeOfBuffer,
246 slack,
247 sharedReadIndex2,
248 sharedWriteIndex2);
249
250 * sharedReadIndex1 = 0;
251 * sharedWriteIndex1 = 0;
252
253 * sharedReadIndex2 = 0;
254 * sharedWriteIndex2 = 0;
255
256 * serverStatusFlag = 1;
257
258 #ifdef DEBUG_TRANSPORTER
259 printf("-- (%d - %d) - Server -\n", localNodeId, remoteNodeId);
260 printf("Reader at: %ld (%p)\n", startOfBuf1 - shmBuf, startOfBuf1);
261 printf("sharedReadIndex1 at %ld (%p) = %d\n",
262 (char*)sharedReadIndex1-shmBuf,
263 sharedReadIndex1, *sharedReadIndex1);
264 printf("sharedWriteIndex1 at %ld (%p) = %d\n",
265 (char*)sharedWriteIndex1-shmBuf,
266 sharedWriteIndex1, *sharedWriteIndex1);
267
268 printf("Writer at: %ld (%p)\n", startOfBuf2 - shmBuf, startOfBuf2);
269 printf("sharedReadIndex2 at %ld (%p) = %d\n",
270 (char*)sharedReadIndex2-shmBuf,
271 sharedReadIndex2, *sharedReadIndex2);
272 printf("sharedWriteIndex2 at %ld (%p) = %d\n",
273 (char*)sharedWriteIndex2-shmBuf,
274 sharedWriteIndex2, *sharedWriteIndex2);
275
276 printf("sizeOfBuffer = %d\n", sizeOfBuffer);
277 #endif
278 }
279 else
280 {
281 * clientStatusFlag = 0;
282 reader = new (&m_shm_reader)
283 SHM_Reader(startOfBuf2,
284 sizeOfBuffer,
285 slack,
286 sharedReadIndex2,
287 sharedWriteIndex2);
288
289 writer = new (&m_shm_writer)
290 SHM_Writer(startOfBuf1,
291 sizeOfBuffer,
292 slack,
293 sharedReadIndex1,
294 sharedWriteIndex1);
295
296 * sharedReadIndex2 = 0;
297 * sharedWriteIndex1 = 0;
298
299 * clientStatusFlag = 1;
300 #ifdef DEBUG_TRANSPORTER
301 printf("-- (%d - %d) - Client -\n", localNodeId, remoteNodeId);
302 printf("Reader at: %ld (%p)\n", startOfBuf2 - shmBuf, startOfBuf2);
303 printf("sharedReadIndex2 at %ld (%p) = %d\n",
304 (char*)sharedReadIndex2-shmBuf,
305 sharedReadIndex2, *sharedReadIndex2);
306 printf("sharedWriteIndex2 at %ld (%p) = %d\n",
307 (char*)sharedWriteIndex2-shmBuf,
308 sharedWriteIndex2, *sharedWriteIndex2);
309
310 printf("Writer at: %ld (%p)\n", startOfBuf1 - shmBuf, startOfBuf1);
311 printf("sharedReadIndex1 at %ld (%p) = %d\n",
312 (char*)sharedReadIndex1-shmBuf,
313 sharedReadIndex1, *sharedReadIndex1);
314 printf("sharedWriteIndex1 at %ld (%p) = %d\n",
315 (char*)sharedWriteIndex1-shmBuf,
316 sharedWriteIndex1, *sharedWriteIndex1);
317
318 printf("sizeOfBuffer = %d\n", sizeOfBuffer);
319 #endif
320 }
321 #ifdef DEBUG_TRANSPORTER
322 printf("Mapping from %p to %p\n", shmBuf, shmBuf+shmSize);
323 #endif
324 return false;
325 }
326
327 bool
connect_server_impl(NDB_SOCKET_TYPE sockfd)328 SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
329 {
330 DBUG_ENTER("SHM_Transporter::connect_server_impl");
331 DEBUG_FPRINTF((stderr, "(%u)connect_server_impl(%u)\n",
332 localNodeId, remoteNodeId));
333 SocketOutputStream s_output(sockfd);
334 SocketInputStream s_input(sockfd);
335
336 // Create
337 if (!_shmSegCreated)
338 {
339 if (!ndb_shm_create())
340 {
341 DEBUG_FPRINTF((stderr, "(%u)connect_server_impl failed LINE:%d,"
342 " to remote node %d\n",
343 localNodeId, __LINE__, remoteNodeId));
344 DBUG_RETURN(false);
345 }
346 _shmSegCreated = true;
347 DEBUG_FPRINTF((stderr, "(%u)ndb_shm_create()(%u)\n",
348 localNodeId, remoteNodeId));
349 }
350
351 // Attach
352 if (!_attached)
353 {
354 if (!ndb_shm_attach())
355 {
356 DEBUG_FPRINTF((stderr, "(%u)connect_server_impl failed LINE:%d,"
357 " to remote node %d\n",
358 localNodeId, __LINE__, remoteNodeId));
359 DBUG_RETURN(false);
360 }
361 _attached = true;
362 DEBUG_FPRINTF((stderr, "(%u)ndb_shm_attach()(%u)\n",
363 localNodeId, remoteNodeId));
364 }
365
366 require(!setupBuffersDone);
367 {
368 DEBUG_FPRINTF((stderr, "(%u)setupBuffers(%u) Line:%d\n",
369 localNodeId, remoteNodeId, __LINE__));
370 if (setupBuffers())
371 {
372 fprintf(stderr, "Shared memory not supported on this platform\n");
373 detach_shm(false);
374 DBUG_RETURN(false);
375 }
376 setupBuffersDone=true;
377 }
378
379 // Send ok to client
380 s_output.println("shm server 1 ok: %d",
381 m_transporter_registry.m_shm_own_pid);
382
383 // Wait for ok from client
384 char buf[256];
385 DBUG_PRINT("info", ("Wait for ok from client"));
386 if (s_input.gets(buf, sizeof(buf)) == 0)
387 {
388 DEBUG_FPRINTF((stderr, "(%u)connect_server_impl failed LINE:%d,"
389 " to remote node %d\n",
390 localNodeId, __LINE__, remoteNodeId));
391 detach_shm(false);
392 DBUG_RETURN(false);
393 }
394
395 if (sscanf(buf, "shm client 1 ok: %d", &m_remote_pid) != 1)
396 {
397 DEBUG_FPRINTF((stderr, "(%u)connect_server_impl failed LINE:%d,"
398 " to remote node %d\n",
399 localNodeId, __LINE__, remoteNodeId));
400 detach_shm(false);
401 DBUG_RETURN(false);
402 }
403
404 DEBUG_FPRINTF((stderr, "(%u)connect_common()(%u)\n",
405 localNodeId, remoteNodeId));
406 int r= connect_common(sockfd);
407
408 if (r)
409 {
410 // Send ok to client
411 s_output.println("shm server 2 ok");
412 // Wait for ok from client
413 if (s_input.gets(buf, 256) == 0)
414 {
415 DEBUG_FPRINTF((stderr, "(%u)connect_server_impl failed LINE:%d,"
416 " to remote node %d\n",
417 localNodeId, __LINE__, remoteNodeId));
418 detach_shm(false);
419 DBUG_RETURN(false);
420 }
421 DBUG_PRINT("info", ("Successfully connected server to node %d",
422 remoteNodeId));
423 }
424 DEBUG_FPRINTF((stderr, "(%u)set_socket()(%u)\n",
425 localNodeId, remoteNodeId));
426 set_socket(sockfd);
427 DBUG_RETURN(r);
428 }
429
430 void
set_socket(NDB_SOCKET_TYPE sockfd)431 SHM_Transporter::set_socket(NDB_SOCKET_TYPE sockfd)
432 {
433 set_get(sockfd, IPPROTO_TCP, TCP_NODELAY, "TCP_NODELAY", 1);
434 set_get(sockfd, SOL_SOCKET, SO_KEEPALIVE, "SO_KEEPALIVE", 1);
435 ndb_socket_nonblock(sockfd, true);
436 get_callback_obj()->lock_transporter(remoteNodeId, m_transporter_index);
437 theSocket = sockfd;
438 send_checksum_state.init();
439 get_callback_obj()->unlock_transporter(remoteNodeId, m_transporter_index);
440 }
441
442 bool
connect_client_impl(NDB_SOCKET_TYPE sockfd)443 SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
444 {
445 DBUG_ENTER("SHM_Transporter::connect_client_impl");
446 DEBUG_FPRINTF((stderr, "(%u)connect_client_impl(%u)\n",
447 localNodeId, remoteNodeId));
448 SocketInputStream s_input(sockfd);
449 SocketOutputStream s_output(sockfd);
450 char buf[256];
451
452 // Wait for server to create and attach
453 DBUG_PRINT("info", ("Wait for server to create and attach"));
454 if (s_input.gets(buf, 256) == 0)
455 {
456 DEBUG_FPRINTF((stderr, "(%u)connect_client_impl failed LINE:%d,"
457 " to remote node %d\n",
458 localNodeId, __LINE__, remoteNodeId));
459
460 DBUG_PRINT("error", ("Server id %d did not attach",
461 remoteNodeId));
462 DBUG_RETURN(false);
463 }
464
465 if(sscanf(buf, "shm server 1 ok: %d", &m_remote_pid) != 1)
466 {
467 DEBUG_FPRINTF((stderr, "(%u)connect_client_impl failed LINE:%d,"
468 " to remote node %d\n",
469 localNodeId, __LINE__, remoteNodeId));
470 DBUG_RETURN(false);
471 }
472
473 // Create
474 if(!_shmSegCreated)
475 {
476 if (!ndb_shm_get())
477 {
478 DEBUG_FPRINTF((stderr, "(%u)connect_client_impl failed LINE:%d,"
479 " to remote node %d\n",
480 localNodeId, __LINE__, remoteNodeId));
481 DBUG_PRINT("error", ("Failed create of shm seg to node %d",
482 remoteNodeId));
483 DBUG_RETURN(false);
484 }
485 _shmSegCreated = true;
486 DEBUG_FPRINTF((stderr, "(%u)ndb_shm_get(%u)\n",
487 localNodeId, remoteNodeId));
488 }
489
490 // Attach
491 if (!_attached)
492 {
493 if (!ndb_shm_attach())
494 {
495 DEBUG_FPRINTF((stderr, "(%u)connect_client_impl failed LINE:%d,"
496 " to remote node %d\n",
497 localNodeId, __LINE__, remoteNodeId));
498 DBUG_PRINT("error", ("Failed attach of shm seg to node %d",
499 remoteNodeId));
500 DBUG_RETURN(false);
501 }
502 _attached = true;
503 DEBUG_FPRINTF((stderr, "(%u)ndb_shm_attach(%u)\n",
504 localNodeId, remoteNodeId));
505 }
506
507 require(!setupBuffersDone);
508 {
509 DEBUG_FPRINTF((stderr, "(%u)setupBuffers(%u) Line:%d\n",
510 localNodeId, remoteNodeId, __LINE__));
511 if (setupBuffers())
512 {
513 fprintf(stderr, "Shared memory not supported on this platform\n");
514 detach_shm(false);
515 DBUG_RETURN(false);
516 }
517 else
518 {
519 setupBuffersDone=true;
520 }
521 }
522
523 // Send ok to server
524 s_output.println("shm client 1 ok: %d",
525 m_transporter_registry.m_shm_own_pid);
526
527 DEBUG_FPRINTF((stderr, "(%u)connect_common(%u)\n",
528 localNodeId, remoteNodeId));
529 int r = connect_common(sockfd);
530
531 if (r)
532 {
533 // Wait for ok from server
534 DBUG_PRINT("info", ("Wait for ok from server"));
535 if (s_input.gets(buf, 256) == 0)
536 {
537 DEBUG_FPRINTF((stderr, "(%u)connect_client_impl failed LINE:%d,"
538 " to remote node %d\n",
539 localNodeId, __LINE__, remoteNodeId));
540 DBUG_PRINT("error", ("No ok from server node %d",
541 remoteNodeId));
542 detach_shm(false);
543 DBUG_RETURN(false);
544 }
545 // Send ok to server
546 s_output.println("shm client 2 ok");
547 DBUG_PRINT("info", ("Successfully connected client to node %d",
548 remoteNodeId));
549 }
550 else
551 {
552 DEBUG_FPRINTF((stderr, "(%u)connect_client_impl failed LINE:%d,"
553 " to remote node %d\n",
554 localNodeId, __LINE__, remoteNodeId));
555 detach_shm(false);
556 }
557 set_socket(sockfd);
558 DEBUG_FPRINTF((stderr, "(%u)set_socket(%u)\n",
559 localNodeId, remoteNodeId));
560 DBUG_RETURN(r);
561 }
562
563 bool
connect_common(NDB_SOCKET_TYPE sockfd)564 SHM_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
565 {
566 if (!checkConnected())
567 {
568 DEBUG_FPRINTF((stderr, "(%u)checkConnected failed(%u)\n",
569 localNodeId, remoteNodeId));
570 return false;
571 }
572 if (isServer)
573 {
574 DEBUG_FPRINTF((stderr, "(%u)ndb_shm_destroy(%u)\n",
575 localNodeId, remoteNodeId));
576 ndb_shm_destroy();
577 }
578
579 require(setupBuffersDone);
580 Uint32 waited = 0;
581 while (waited < m_timeOutMillis)
582 {
583 if (*serverStatusFlag == 1 && *clientStatusFlag == 1)
584 return true;
585 NdbSleep_MilliSleep(10);
586 waited += 10;
587 }
588 DEBUG_FPRINTF((stderr, "(%u)setupBuffers(%u) failed\n",
589 localNodeId, remoteNodeId));
590 DBUG_PRINT("error", ("Failed to set up buffers to node %d",
591 remoteNodeId));
592 return false;
593 }
594
595 void
remove_mutexes()596 SHM_Transporter::remove_mutexes()
597 {
598 if (ndb_socket_valid(theSocket))
599 {
600 NdbMutex_Deinit(serverMutex);
601 NdbMutex_Deinit(clientMutex);
602 }
603 }
604
setupBuffersUndone()605 void SHM_Transporter::setupBuffersUndone()
606 {
607 if (setupBuffersDone)
608 {
609 NdbMutex_Lock(serverMutex);
610 NdbMutex_Lock(clientMutex);
611 setupBuffersDone = false;
612 DEBUG_FPRINTF((stderr, "(%u)setupBuffersUndone(%u)\n",
613 localNodeId, remoteNodeId));
614 NdbMutex_Unlock(serverMutex);
615 NdbMutex_Unlock(clientMutex);
616 }
617 }
618
619 void
disconnect_socket()620 SHM_Transporter::disconnect_socket()
621 {
622 get_callback_obj()->lock_transporter(remoteNodeId, m_transporter_index);
623
624 NDB_SOCKET_TYPE sock = theSocket;
625 ndb_socket_invalidate(&theSocket);
626
627
628 if(ndb_socket_valid(sock))
629 {
630 if(ndb_socket_close(sock) < 0){
631 report_error(TE_ERROR_CLOSING_SOCKET);
632 }
633 }
634 setupBuffersUndone();
635 get_callback_obj()->unlock_transporter(remoteNodeId, m_transporter_index);
636 }
637
638 /**
639 * This method is used when we need to wake up other side to
640 * ensure that the messages we transported in shared memory
641 * transporter is quickly handled.
642 *
643 * The first step is to grab a mutex from the shared memory segment,
644 * next we check the status of the transporter on the other side. If
645 * this transporter is asleep we will simply send 1 byte, it doesn't
646 * matter what the byte value is. We set it to 0 just to ensure it
647 * has defined value for potential future use.
648 *
649 * If we discover that the other side is awake there is no need to
650 * do anything, the other side will check the shared memory before
651 * it goes to sleep.
652 */
653 void
wakeup()654 SHM_Transporter::wakeup()
655 {
656 Uint32 one_more_try = 5;
657 char buf[1];
658 int iovcnt = 1;
659 struct iovec iov[1];
660
661 lock_reverse_mutex();
662 bool awake_state = handle_reverse_awake_state();
663 unlock_reverse_mutex();
664 if (awake_state)
665 {
666 return;
667 }
668 iov[0].iov_len = 1;
669 iov[0].iov_base = &buf[0];
670 buf[0] = 0;
671 do
672 {
673 one_more_try--;
674 int nBytesSent = (int)ndb_socket_writev(theSocket, iov, iovcnt);
675 if (nBytesSent != 1)
676 {
677 require(nBytesSent < 0); //Should not be possible with any other value
678 int err = ndb_socket_errno();
679 if (DISCONNECT_ERRNO(err, nBytesSent))
680 {
681 do_disconnect(err, true);
682 }
683 }
684 else
685 {
686 return;
687 }
688 } while (one_more_try);
689 }
690
691 void
doReceive()692 SHM_Transporter::doReceive()
693 {
694 bool one_more_try;
695 char buf[128];
696 do
697 {
698 one_more_try = false;
699 const int nBytesRead = (int)ndb_recv(theSocket, buf, sizeof(buf), 0);
700 if (unlikely(nBytesRead <= 0))
701 {
702 int err;
703 if (nBytesRead == 0)
704 {
705 err = 0;
706 }
707 else
708 {
709 err = ndb_socket_errno();
710 }
711 if (DISCONNECT_ERRNO(err, nBytesRead))
712 {
713 do_disconnect(err, false);
714 }
715 else
716 {
717 one_more_try = false;
718 }
719 }
720 else if (unlikely(nBytesRead == sizeof(buf)))
721 {
722 one_more_try = true;
723 }
724 } while (one_more_try);
725 }
726
727 /**
728 * The need_wakeup flag is always set except when called from
729 * forceSend in mt.cpp, in this case we only send to try to
730 * free up some send buffers. So there is no need to ensure
731 * that the other side is awakened in this special case.
732 */
733 bool
doSend(bool need_wakeup)734 SHM_Transporter::doSend(bool need_wakeup)
735 {
736 struct iovec iov[64];
737 Uint32 cnt = fetch_send_iovec_data(iov, NDB_ARRAY_SIZE(iov));
738
739 if (!setupBuffersDone)
740 {
741 DEBUG_FPRINTF((stderr, "(%u)doSend(%u)\n", localNodeId, remoteNodeId));
742 return false;
743 }
744 if (cnt == 0)
745 {
746 /**
747 * Need to handle the wakeup flag, even when there is nothing to
748 * send. We can call doSend in an attempt to do an emergency send.
749 * In this case we could register a pending send even with an
750 * empty send buffer. So this could lead to a later doSend call
751 * that have no data to send. So the idea is to delay the wakeup
752 * until end of execution even if the send buffer is full in the
753 * middle of executing signals.
754 */
755 if (need_wakeup)
756 {
757 wakeup();
758 }
759 return false;
760 }
761
762 Uint32 sum = 0;
763 for(Uint32 i = 0; i<cnt; i++)
764 {
765 assert(iov[i].iov_len);
766 sum += iov[i].iov_len;
767 }
768
769 int nBytesSent = writer->writev(iov, cnt);
770 #if 0
771 time_t curr_time;
772 tm tm_buf;
773 curr_time = ::time((time_t*)NULL);
774 ndb_localtime_r(&curr_time, &tm_buf);
775 Uint32 minute = tm_buf.tm_min;
776 Uint32 second = tm_buf.tm_sec;
777 Uint64 millis = NdbTick_CurrentMillisecond();
778 DEBUG_FPRINTF((stderr, "%u.%u.%llu (%u)W:writev(%u),"
779 " sent: %d, free: %u"
780 ", w_inx: %u, r_inx: %u\n",
781 minute, second,
782 millis % Uint64(1000),
783 localNodeId, remoteNodeId, nBytesSent,
784 writer->get_free_buffer(),
785 writer->getWriteIndex(),
786 writer->getReadIndex()));
787 #endif
788 if (nBytesSent > 0)
789 {
790 iovec_data_sent(nBytesSent);
791 m_bytes_sent += nBytesSent;
792 sendCount++;
793 sendSize += nBytesSent;
794 if (sendCount >= reportFreq)
795 {
796 get_callback_obj()->reportSendLen(remoteNodeId, sendCount, sendSize);
797 sendCount = 0;
798 sendSize = 0;
799 }
800
801 if (need_wakeup)
802 {
803 wakeup();
804 }
805 if (Uint32(nBytesSent) == sum &&
806 (cnt != NDB_ARRAY_SIZE(iov)) &&
807 need_wakeup)
808 {
809 return false;
810 }
811 return true;
812 }
813 return true;
814 }
815
816 /*
817 * We need the extra m_client_locked and m_server_locked
818 * variables to ensure that we don't unlock something
819 * that was never locked. The timing of the setting up
820 * of buffers and locking of mutexes isn't perfect,
821 * therefore we protect those calls through these variables.
822 */
823 void
lock_mutex()824 SHM_Transporter::lock_mutex()
825 {
826 if (setupBuffersDone)
827 {
828 if (isServer)
829 {
830 NdbMutex_Lock(serverMutex);
831 m_server_locked = true;
832 }
833 else
834 {
835 NdbMutex_Lock(clientMutex);
836 m_client_locked = true;
837 }
838 }
839 }
840
841 void
unlock_mutex()842 SHM_Transporter::unlock_mutex()
843 {
844 if (setupBuffersDone)
845 {
846 if (isServer)
847 {
848 if (m_server_locked)
849 NdbMutex_Unlock(serverMutex);
850 }
851 else
852 {
853 if (m_client_locked)
854 NdbMutex_Unlock(clientMutex);
855 }
856 }
857 }
858
859 void
lock_reverse_mutex()860 SHM_Transporter::lock_reverse_mutex()
861 {
862 if (setupBuffersDone)
863 {
864 if (isServer)
865 {
866 NdbMutex_Lock(clientMutex);
867 m_client_locked = true;
868 }
869 else
870 {
871 NdbMutex_Lock(serverMutex);
872 m_server_locked = true;
873 }
874 }
875 }
876
877 void
unlock_reverse_mutex()878 SHM_Transporter::unlock_reverse_mutex()
879 {
880 if (setupBuffersDone)
881 {
882 if (isServer)
883 {
884 if (m_client_locked)
885 NdbMutex_Unlock(clientMutex);
886 }
887 else
888 {
889 if (m_server_locked)
890 NdbMutex_Unlock(serverMutex);
891 }
892 }
893 }
894
895 void
set_awake_state(Uint32 awake_state)896 SHM_Transporter::set_awake_state(Uint32 awake_state)
897 {
898 if (setupBuffersDone)
899 {
900 if (isServer)
901 {
902 *serverStatusFlag = awake_state;
903 *serverAwakenedFlag = 0;
904 }
905 else
906 {
907 *clientStatusFlag = awake_state;
908 *clientAwakenedFlag = 0;
909 }
910 }
911 }
912
913 bool
handle_reverse_awake_state()914 SHM_Transporter::handle_reverse_awake_state()
915 {
916 /**
917 * We are sending to the other side. We need to understand if we
918 * should send a wakeup byte to the other side. If we already did
919 * so and the other side still hasn't woke up, we need not do it
920 * again. If the other side is awake we also need not send any
921 * wakeup byte.
922 */
923 if (setupBuffersDone)
924 {
925 if (isServer)
926 {
927 if (*clientStatusFlag == 1 || *clientAwakenedFlag == 1)
928 {
929 return true;
930 }
931 else
932 {
933 *clientAwakenedFlag = 1;
934 return false;
935 }
936 }
937 else
938 {
939 if (*serverStatusFlag == 1 || *serverAwakenedFlag == 1)
940 {
941 return true;
942 }
943 else
944 {
945 *serverAwakenedFlag = 1;
946 return false;
947 }
948 }
949 }
950 else
951 {
952 return true;
953 }
954 }
955
956 void
updateReceivePtr(TransporterReceiveHandle & recvdata,Uint32 * ptr)957 SHM_Transporter::updateReceivePtr(TransporterReceiveHandle& recvdata,
958 Uint32 *ptr)
959 {
960 Uint32 size_read = reader->updateReadPtr(ptr);
961 #if 0
962 time_t curr_time;
963 tm tm_buf;
964 curr_time = ::time((time_t*)NULL);
965 ndb_localtime_r(&curr_time, &tm_buf);
966 Uint32 minute = tm_buf.tm_min;
967 Uint32 second = tm_buf.tm_sec;
968 Uint64 millis = NdbTick_CurrentMillisecond();
969 DEBUG_FPRINTF((stderr, "%u.%u.%llu (%u)updateReadPtr(%u),"
970 " sz_read: %u, r_inx: %u"
971 ", w_inx: %u\n",
972 minute, second,
973 millis % Uint64(1000),
974 localNodeId, remoteNodeId,
975 size_read,
976 reader->getReadIndex(),
977 reader->getWriteIndex()));
978 #endif
979 receiveCount++;
980 receiveSize += size_read;
981 m_bytes_received += size_read;
982 if (receiveCount == reportFreq)
983 {
984 recvdata.reportReceiveLen(remoteNodeId,
985 receiveCount,
986 receiveSize);
987 receiveCount = 0;
988 receiveSize = 0;
989 }
990 }
991
992 /**
993 * send_is_possible is only called in situations with high load.
994 * So it is not critical to use the mutex protection here.
995 */
996 bool
send_is_possible(int timeout_millisec) const997 SHM_Transporter::send_is_possible(int timeout_millisec) const
998 {
999 do
1000 {
1001 if (setupBuffersDone)
1002 {
1003 if (writer->get_free_buffer() > MAX_SEND_MESSAGE_BYTESIZE)
1004 {
1005 return true;
1006 }
1007 if (timeout_millisec > 0)
1008 {
1009 DEBUG_FPRINTF((stderr, "send_is_possible, wait 10ms\n"));
1010 NdbSleep_MilliSleep(timeout_millisec);
1011 timeout_millisec = 0;
1012 }
1013 DEBUG_FPRINTF((stderr, "send_is_possible, timed out\n"));
1014 return false;
1015 }
1016 else
1017 {
1018 break;
1019 }
1020 } while (1);
1021 return true;
1022 }
1023