1 /* Copyright (c) 2003-2005, 2007 MySQL AB
2 Use is subject to license terms
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
16
17 #include <ndb_global.h>
18
19 #include "SCI_Transporter.hpp"
20 #include <NdbOut.hpp>
21 #include <NdbSleep.h>
22 #include <NdbTick.h>
23 #include <NdbTick.h>
24
25 #include "TransporterInternalDefinitions.hpp"
26 #include <TransporterCallback.hpp>
27
28 #include <InputStream.hpp>
29 #include <OutputStream.hpp>
30
31 #define FLAGS 0
32 #define DEBUG_TRANSPORTER
SCI_Transporter(TransporterRegistry & t_reg,const char * lHostName,const char * rHostName,int r_port,bool isMgmConnection,Uint32 packetSize,Uint32 bufferSize,Uint32 nAdapters,Uint16 remoteSciNodeId0,Uint16 remoteSciNodeId1,NodeId _localNodeId,NodeId _remoteNodeId,NodeId serverNodeId,bool chksm,bool signalId,Uint32 reportFreq)33 SCI_Transporter::SCI_Transporter(TransporterRegistry &t_reg,
34 const char *lHostName,
35 const char *rHostName,
36 int r_port,
37 bool isMgmConnection,
38 Uint32 packetSize,
39 Uint32 bufferSize,
40 Uint32 nAdapters,
41 Uint16 remoteSciNodeId0,
42 Uint16 remoteSciNodeId1,
43 NodeId _localNodeId,
44 NodeId _remoteNodeId,
45 NodeId serverNodeId,
46 bool chksm,
47 bool signalId,
48 Uint32 reportFreq) :
49 Transporter(t_reg, tt_SCI_TRANSPORTER,
50 lHostName, rHostName, r_port, isMgmConnection, _localNodeId,
51 _remoteNodeId, serverNodeId, 0, false, chksm, signalId)
52 {
53 DBUG_ENTER("SCI_Transporter::SCI_Transporter");
54 m_PacketSize = (packetSize + 3)/4 ;
55 m_BufferSize = bufferSize;
56 m_sendBuffer.m_buffer = NULL;
57
58 m_RemoteSciNodeId = remoteSciNodeId0;
59
60 if(remoteSciNodeId0 == 0 || remoteSciNodeId1 == 0)
61 m_numberOfRemoteNodes=1;
62 else
63 m_numberOfRemoteNodes=2;
64
65 m_RemoteSciNodeId1 = remoteSciNodeId1;
66
67
68 m_initLocal=false;
69 m_failCounter=0;
70 m_remoteNodes[0]=remoteSciNodeId0;
71 m_remoteNodes[1]=remoteSciNodeId1;
72 m_adapters = nAdapters;
73 m_ActiveAdapterId=0;
74 m_StandbyAdapterId=1;
75
76 m_mapped = false;
77 m_sciinit=false;
78
79 sciAdapters= new SciAdapter[nAdapters* (sizeof (SciAdapter))];
80 if(sciAdapters==NULL) {
81 }
82 m_SourceSegm= new sourceSegm[nAdapters* (sizeof (sourceSegm))];
83 if(m_SourceSegm==NULL) {
84 }
85 m_TargetSegm= new targetSegm[nAdapters* (sizeof (targetSegm))];
86 if(m_TargetSegm==NULL) {
87 }
88 m_reportFreq= reportFreq;
89
90 //reset all statistic counters.
91 #ifdef DEBUG_TRANSPORTER
92 i1024=0;
93 i2048=0;
94 i2049=0;
95 i10242048=0;
96 i20484096=0;
97 i4096=0;
98 i4097=0;
99 #endif
100 DBUG_VOID_RETURN;
101 }
102
disconnectImpl()103 void SCI_Transporter::disconnectImpl()
104 {
105 DBUG_ENTER("SCI_Transporter::disconnectImpl");
106 sci_error_t err;
107 if(m_mapped){
108 setDisconnect();
109 DBUG_PRINT("info", ("connect status = %d, remote node = %d",
110 (int)getConnectionStatus(), remoteNodeId));
111 disconnectRemote();
112 disconnectLocal();
113 }
114
115 // Empty send buffer
116
117 m_sendBuffer.m_dataSize = 0;
118
119 m_initLocal=false;
120 m_mapped = false;
121
122 if(m_sciinit) {
123 for(Uint32 i=0; i<m_adapters ; i++) {
124 SCIClose(sciAdapters[i].scidesc, FLAGS, &err);
125
126 if(err != SCI_ERR_OK) {
127 report_error(TE_SCI_UNABLE_TO_CLOSE_CHANNEL);
128 DBUG_PRINT("error",
129 ("Cannot close channel to the driver. Error code 0x%x",
130 err));
131 }
132 }
133 }
134 m_sciinit=false;
135
136 #ifdef DEBUG_TRANSPORTER
137 ndbout << "total: " << i1024+ i10242048 + i2048+i2049 << endl;
138 ndbout << "<1024: " << i1024 << endl;
139 ndbout << "1024-2047: " << i10242048 << endl;
140 ndbout << "==2048: " << i2048 << endl;
141 ndbout << "2049-4096: " << i20484096 << endl;
142 ndbout << "==4096: " << i4096 << endl;
143 ndbout << ">4096: " << i4097 << endl;
144 #endif
145 DBUG_VOID_RETURN;
146 }
147
148
initTransporter()149 bool SCI_Transporter::initTransporter() {
150 DBUG_ENTER("SCI_Transporter::initTransporter");
151 if(m_BufferSize < (2*MAX_MESSAGE_SIZE + 4096)){
152 m_BufferSize = 2 * MAX_MESSAGE_SIZE + 4096;
153 }
154
155 // Allocate buffers for sending, send buffer size plus 2048 bytes for avoiding
156 // the need to send twice when a large message comes around. Send buffer size is
157 // measured in words.
158 Uint32 sz = 4 * m_PacketSize + MAX_MESSAGE_SIZE;;
159
160 m_sendBuffer.m_sendBufferSize = 4 * ((sz + 3) / 4);
161 m_sendBuffer.m_buffer = new Uint32[m_sendBuffer.m_sendBufferSize / 4];
162 m_sendBuffer.m_dataSize = 0;
163
164 DBUG_PRINT("info",
165 ("Created SCI Send Buffer with buffer size %d and packet size %d",
166 m_sendBuffer.m_sendBufferSize, m_PacketSize * 4));
167 if(!getLinkStatus(m_ActiveAdapterId) ||
168 (m_adapters > 1 &&
169 !getLinkStatus(m_StandbyAdapterId))) {
170 DBUG_PRINT("error",
171 ("The link is not fully operational. Check the cables and the switches"));
172 //NDB should terminate
173 report_error(TE_SCI_LINK_ERROR);
174 DBUG_RETURN(false);
175 }
176 DBUG_RETURN(true);
177 } // initTransporter()
178
179
180
getLocalNodeId(Uint32 adapterNo)181 Uint32 SCI_Transporter::getLocalNodeId(Uint32 adapterNo)
182 {
183 sci_query_adapter_t queryAdapter;
184 sci_error_t error;
185 Uint32 _localNodeId;
186
187 queryAdapter.subcommand = SCI_Q_ADAPTER_NODEID;
188 queryAdapter.localAdapterNo = adapterNo;
189 queryAdapter.data = &_localNodeId;
190
191 SCIQuery(SCI_Q_ADAPTER,(void*)(&queryAdapter),(Uint32)NULL,&error);
192
193 if(error != SCI_ERR_OK)
194 return 0;
195 return _localNodeId;
196 }
197
198
getLinkStatus(Uint32 adapterNo)199 bool SCI_Transporter::getLinkStatus(Uint32 adapterNo)
200 {
201 sci_query_adapter_t queryAdapter;
202 sci_error_t error;
203 int linkstatus;
204 queryAdapter.subcommand = SCI_Q_ADAPTER_LINK_OPERATIONAL;
205
206 queryAdapter.localAdapterNo = adapterNo;
207 queryAdapter.data = &linkstatus;
208
209 SCIQuery(SCI_Q_ADAPTER,(void*)(&queryAdapter),(Uint32)NULL,&error);
210
211 if(error != SCI_ERR_OK) {
212 DBUG_PRINT("error", ("error %d querying adapter", error));
213 return false;
214 }
215 if(linkstatus<=0)
216 return false;
217 return true;
218 }
219
220
221
initLocalSegment()222 sci_error_t SCI_Transporter::initLocalSegment() {
223 DBUG_ENTER("SCI_Transporter::initLocalSegment");
224 Uint32 segmentSize = m_BufferSize;
225 Uint32 offset = 0;
226 sci_error_t err;
227 if(!m_sciinit) {
228 for(Uint32 i=0; i<m_adapters ; i++) {
229 SCIOpen(&(sciAdapters[i].scidesc), FLAGS, &err);
230 sciAdapters[i].localSciNodeId=getLocalNodeId(i);
231 DBUG_PRINT("info", ("SCInode iD %d adapter %d\n",
232 sciAdapters[i].localSciNodeId, i));
233 if(err != SCI_ERR_OK) {
234 DBUG_PRINT("error",
235 ("Cannot open an SCI virtual device. Error code 0x%x",
236 err));
237 DBUG_RETURN(err);
238 }
239 }
240 }
241
242 m_sciinit=true;
243
244 SCICreateSegment(sciAdapters[0].scidesc,
245 &(m_SourceSegm[0].localHandle),
246 hostSegmentId(localNodeId, remoteNodeId),
247 segmentSize,
248 0,
249 0,
250 0,
251 &err);
252
253 if(err != SCI_ERR_OK) {
254 DBUG_PRINT("error", ("Error creating segment, err = 0x%x", err));
255 DBUG_RETURN(err);
256 } else {
257 DBUG_PRINT("info", ("created segment id : %d",
258 hostSegmentId(localNodeId, remoteNodeId)));
259 }
260
261 /** Prepare the segment*/
262 for(Uint32 i=0; i < m_adapters; i++) {
263 SCIPrepareSegment((m_SourceSegm[0].localHandle),
264 i,
265 FLAGS,
266 &err);
267
268 if(err != SCI_ERR_OK) {
269 DBUG_PRINT("error",
270 ("Local Segment is not accessible by an SCI adapter. Error code 0x%x\n",
271 err));
272 DBUG_RETURN(err);
273 }
274 }
275
276
277 m_SourceSegm[0].mappedMemory =
278 SCIMapLocalSegment((m_SourceSegm[0].localHandle),
279 &(m_SourceSegm[0].lhm[0].map),
280 offset,
281 segmentSize,
282 NULL,
283 FLAGS,
284 &err);
285
286
287
288 if(err != SCI_ERR_OK) {
289 DBUG_PRINT("error", ("Cannot map area of size %d. Error code 0x%x",
290 segmentSize,err));
291 doDisconnect();
292 DBUG_RETURN(err);
293 }
294
295
296 /** Make the local segment available*/
297 for(Uint32 i=0; i < m_adapters; i++) {
298 SCISetSegmentAvailable((m_SourceSegm[0].localHandle),
299 i,
300 FLAGS,
301 &err);
302
303 if(err != SCI_ERR_OK) {
304 DBUG_PRINT("error",
305 ("Local Segment is not available for remote connections. Error code 0x%x\n",
306 err));
307 DBUG_RETURN(err);
308 }
309 }
310 setupLocalSegment();
311 DBUG_RETURN(err);
312
313 } // initLocalSegment()
314
315
doSend()316 bool SCI_Transporter::doSend() {
317 #ifdef DEBUG_TRANSPORTER
318 NDB_TICKS startSec=0, stopSec=0;
319 Uint32 startMicro=0, stopMicro=0, totalMicro=0;
320 #endif
321 sci_error_t err;
322 Uint32 retry=0;
323
324 const char * const sendPtr = (char*)m_sendBuffer.m_buffer;
325 const Uint32 sizeToSend = 4 * m_sendBuffer.m_dataSize; //Convert to number of bytes
326
327 if (sizeToSend > 0){
328 #ifdef DEBUG_TRANSPORTER
329 if(sizeToSend < 1024 )
330 i1024++;
331 if(sizeToSend > 1024 && sizeToSend < 2048 )
332 i10242048++;
333 if(sizeToSend==2048)
334 i2048++;
335 if(sizeToSend>2048 && sizeToSend < 4096)
336 i20484096++;
337 if(sizeToSend==4096)
338 i4096++;
339 if(sizeToSend==4097)
340 i4097++;
341 #endif
342
343 tryagain:
344 retry++;
345 if (retry > 3) {
346 DBUG_PRINT("error", ("SCI Transfer failed"));
347 report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
348 return false;
349 }
350 Uint32 * insertPtr = (Uint32 *)
351 (m_TargetSegm[m_ActiveAdapterId].writer)->getWritePtr(sizeToSend);
352
353 if(insertPtr != 0) {
354
355 const Uint32 remoteOffset=(Uint32)
356 ((char*)insertPtr -
357 (char*)(m_TargetSegm[m_ActiveAdapterId].mappedMemory));
358
359 SCIMemCpy(m_TargetSegm[m_ActiveAdapterId].sequence,
360 (void*)sendPtr,
361 m_TargetSegm[m_ActiveAdapterId].rhm[m_ActiveAdapterId].map,
362 remoteOffset,
363 sizeToSend,
364 SCI_FLAG_ERROR_CHECK,
365 &err);
366
367 if (err != SCI_ERR_OK) {
368 if (err == SCI_ERR_OUT_OF_RANGE ||
369 err == SCI_ERR_SIZE_ALIGNMENT ||
370 err == SCI_ERR_OFFSET_ALIGNMENT) {
371 DBUG_PRINT("error", ("Data transfer error = %d", err));
372 report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
373 return false;
374 }
375 if(err == SCI_ERR_TRANSFER_FAILED) {
376 if(getLinkStatus(m_ActiveAdapterId))
377 goto tryagain;
378 if (m_adapters == 1) {
379 DBUG_PRINT("error", ("SCI Transfer failed"));
380 report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
381 return false;
382 }
383 m_failCounter++;
384 Uint32 temp=m_ActiveAdapterId;
385 if (getLinkStatus(m_StandbyAdapterId)) {
386 failoverShmWriter();
387 SCIStoreBarrier(m_TargetSegm[m_StandbyAdapterId].sequence,0);
388 m_ActiveAdapterId=m_StandbyAdapterId;
389 m_StandbyAdapterId=temp;
390 DBUG_PRINT("error", ("Swapping from adapter %u to %u",
391 m_StandbyAdapterId, m_ActiveAdapterId));
392 } else {
393 report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
394 DBUG_PRINT("error", ("SCI Transfer failed"));
395 }
396 }
397 } else {
398 SHM_Writer * writer = (m_TargetSegm[m_ActiveAdapterId].writer);
399 writer->updateWritePtr(sizeToSend);
400
401 Uint32 sendLimit = writer->getBufferSize();
402 sendLimit -= writer->getWriteIndex();
403
404 m_sendBuffer.m_dataSize = 0;
405 m_sendBuffer.m_forceSendLimit = sendLimit;
406 }
407 } else {
408 /**
409 * If we end up here, the SCI segment is full.
410 */
411 DBUG_PRINT("error", ("the segment is full for some reason"));
412 return false;
413 } //if
414 }
415 return true;
416 } // doSend()
417
418
419
failoverShmWriter()420 void SCI_Transporter::failoverShmWriter() {
421 #if 0
422 (m_TargetSegm[m_StandbyAdapterId].writer)
423 ->copyIndexes((m_TargetSegm[m_StandbyAdapterId].writer));
424 #endif
425 } //failoverShm
426
427
setupLocalSegment()428 void SCI_Transporter::setupLocalSegment()
429 {
430 DBUG_ENTER("SCI_Transporter::setupLocalSegment");
431 Uint32 sharedSize = 0;
432 sharedSize =4096; //start of the buffer is page aligend
433
434 Uint32 sizeOfBuffer = m_BufferSize;
435
436 sizeOfBuffer -= sharedSize;
437
438 Uint32 * localReadIndex =
439 (Uint32*)m_SourceSegm[m_ActiveAdapterId].mappedMemory;
440 Uint32 * localWriteIndex = (Uint32*)(localReadIndex+ 1);
441 m_localStatusFlag = (Uint32*)(localReadIndex + 3);
442
443 char * localStartOfBuf = (char*)
444 ((char*)m_SourceSegm[m_ActiveAdapterId].mappedMemory+sharedSize);
445
446 * localReadIndex = 0;
447 * localWriteIndex = 0;
448
449 const Uint32 slack = MAX_MESSAGE_SIZE;
450
451 reader = new SHM_Reader(localStartOfBuf,
452 sizeOfBuffer,
453 slack,
454 localReadIndex,
455 localWriteIndex);
456
457 reader->clear();
458 DBUG_VOID_RETURN;
459 } //setupLocalSegment
460
setupRemoteSegment()461 void SCI_Transporter::setupRemoteSegment()
462 {
463 DBUG_ENTER("SCI_Transporter::setupRemoteSegment");
464 Uint32 sharedSize = 0;
465 sharedSize =4096; //start of the buffer is page aligned
466
467 Uint32 sizeOfBuffer = m_BufferSize;
468 const Uint32 slack = MAX_MESSAGE_SIZE;
469 sizeOfBuffer -= sharedSize;
470
471 Uint32 *segPtr = (Uint32*) m_TargetSegm[m_ActiveAdapterId].mappedMemory ;
472
473 Uint32 * remoteReadIndex = (Uint32*)segPtr;
474 Uint32 * remoteWriteIndex = (Uint32*)(segPtr + 1);
475 m_remoteStatusFlag = (Uint32*)(segPtr + 3);
476
477 char * remoteStartOfBuf = ( char*)((char*)segPtr+(sharedSize));
478
479 writer = new SHM_Writer(remoteStartOfBuf,
480 sizeOfBuffer,
481 slack,
482 remoteReadIndex,
483 remoteWriteIndex);
484
485 writer->clear();
486
487 m_TargetSegm[0].writer=writer;
488
489 m_sendBuffer.m_forceSendLimit = writer->getBufferSize();
490
491 if(createSequence(m_ActiveAdapterId)!=SCI_ERR_OK) {
492 report_error(TE_SCI_UNABLE_TO_CREATE_SEQUENCE);
493 DBUG_PRINT("error", ("Unable to create sequence on active"));
494 doDisconnect();
495 }
496 if (m_adapters > 1) {
497 segPtr = (Uint32*) m_TargetSegm[m_StandbyAdapterId].mappedMemory ;
498
499 Uint32 * remoteReadIndex2 = (Uint32*)segPtr;
500 Uint32 * remoteWriteIndex2 = (Uint32*) (segPtr + 1);
501 m_remoteStatusFlag2 = (Uint32*)(segPtr + 3);
502
503 char * remoteStartOfBuf2 = ( char*)((char *)segPtr+sharedSize);
504
505 /**
506 * setup a writer. writer2 is used to mirror the changes of
507 * writer on the standby
508 * segment, so that in the case of a failover, we can switch
509 * to the stdby seg. quickly.*
510 */
511 writer2 = new SHM_Writer(remoteStartOfBuf2,
512 sizeOfBuffer,
513 slack,
514 remoteReadIndex2,
515 remoteWriteIndex2);
516
517 * remoteReadIndex = 0;
518 * remoteWriteIndex = 0;
519 writer2->clear();
520 m_TargetSegm[1].writer=writer2;
521 if(createSequence(m_StandbyAdapterId)!=SCI_ERR_OK) {
522 report_error(TE_SCI_UNABLE_TO_CREATE_SEQUENCE);
523 DBUG_PRINT("error", ("Unable to create sequence on standby"));
524 doDisconnect();
525 }
526 }
527 DBUG_VOID_RETURN;
528 } //setupRemoteSegment
529
530 bool
init_local()531 SCI_Transporter::init_local()
532 {
533 DBUG_ENTER("SCI_Transporter::init_local");
534 if(!m_initLocal) {
535 if(initLocalSegment()!=SCI_ERR_OK){
536 NdbSleep_MilliSleep(10);
537 //NDB SHOULD TERMINATE AND COMPUTER REBOOTED!
538 report_error(TE_SCI_CANNOT_INIT_LOCALSEGMENT);
539 DBUG_RETURN(false);
540 }
541 m_initLocal=true;
542 }
543 DBUG_RETURN(true);
544 }
545
546 bool
init_remote()547 SCI_Transporter::init_remote()
548 {
549 DBUG_ENTER("SCI_Transporter::init_remote");
550 sci_error_t err;
551 Uint32 offset = 0;
552 if(!m_mapped ) {
553 DBUG_PRINT("info", ("Map remote segments"));
554 for(Uint32 i=0; i < m_adapters ; i++) {
555 m_TargetSegm[i].rhm[i].remoteHandle=0;
556 SCIConnectSegment(sciAdapters[i].scidesc,
557 &(m_TargetSegm[i].rhm[i].remoteHandle),
558 m_remoteNodes[i],
559 remoteSegmentId(localNodeId, remoteNodeId),
560 i,
561 0,
562 0,
563 0,
564 0,
565 &err);
566
567 if(err != SCI_ERR_OK) {
568 NdbSleep_MilliSleep(10);
569 DBUG_PRINT("error", ("Error connecting segment, err 0x%x", err));
570 DBUG_RETURN(false);
571 }
572 }
573 // Map the remote memory segment into program space
574 for(Uint32 i=0; i < m_adapters ; i++) {
575 m_TargetSegm[i].mappedMemory =
576 SCIMapRemoteSegment((m_TargetSegm[i].rhm[i].remoteHandle),
577 &(m_TargetSegm[i].rhm[i].map),
578 offset,
579 m_BufferSize,
580 NULL,
581 FLAGS,
582 &err);
583
584 if(err!= SCI_ERR_OK) {
585 DBUG_PRINT("error",
586 ("Cannot map a segment to the remote node %d. Error code 0x%x",
587 m_RemoteSciNodeId, err));
588 //NDB SHOULD TERMINATE AND COMPUTER REBOOTED!
589 report_error(TE_SCI_CANNOT_MAP_REMOTESEGMENT);
590 DBUG_RETURN(false);
591 }
592 }
593 m_mapped=true;
594 setupRemoteSegment();
595 setConnected();
596 DBUG_PRINT("info", ("connected and mapped to segment, remoteNode: %d",
597 remoteNodeId));
598 DBUG_PRINT("info", ("remoteSegId: %d",
599 remoteSegmentId(localNodeId, remoteNodeId)));
600 DBUG_RETURN(true);
601 } else {
602 DBUG_RETURN(getConnectionStatus());
603 }
604 }
605
606 bool
connect_client_impl(NDB_SOCKET_TYPE sockfd)607 SCI_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
608 {
609 SocketInputStream s_input(sockfd);
610 SocketOutputStream s_output(sockfd);
611 char buf[256];
612 DBUG_ENTER("SCI_Transporter::connect_client_impl");
613 // Wait for server to create and attach
614 if (s_input.gets(buf, 256) == 0) {
615 DBUG_PRINT("error", ("No initial response from server in SCI"));
616 NDB_CLOSE_SOCKET(sockfd);
617 DBUG_RETURN(false);
618 }
619 if (!init_local()) {
620 NDB_CLOSE_SOCKET(sockfd);
621 DBUG_RETURN(false);
622 }
623
624 // Send ok to server
625 s_output.println("sci client 1 ok");
626
627 if (!init_remote()) {
628 NDB_CLOSE_SOCKET(sockfd);
629 DBUG_RETURN(false);
630 }
631 // Wait for ok from server
632 if (s_input.gets(buf, 256) == 0) {
633 DBUG_PRINT("error", ("No second response from server in SCI"));
634 NDB_CLOSE_SOCKET(sockfd);
635 DBUG_RETURN(false);
636 }
637 // Send ok to server
638 s_output.println("sci client 2 ok");
639
640 NDB_CLOSE_SOCKET(sockfd);
641 DBUG_PRINT("info", ("Successfully connected client to node %d",
642 remoteNodeId));
643 DBUG_RETURN(true);
644 }
645
646 bool
connect_server_impl(NDB_SOCKET_TYPE sockfd)647 SCI_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
648 {
649 SocketOutputStream s_output(sockfd);
650 SocketInputStream s_input(sockfd);
651 char buf[256];
652 DBUG_ENTER("SCI_Transporter::connect_server_impl");
653
654 if (!init_local()) {
655 NDB_CLOSE_SOCKET(sockfd);
656 DBUG_RETURN(false);
657 }
658 // Send ok to client
659 s_output.println("sci server 1 ok");
660
661 // Wait for ok from client
662 if (s_input.gets(buf, 256) == 0) {
663 DBUG_PRINT("error", ("No response from client in SCI"));
664 NDB_CLOSE_SOCKET(sockfd);
665 DBUG_RETURN(false);
666 }
667
668 if (!init_remote()) {
669 NDB_CLOSE_SOCKET(sockfd);
670 DBUG_RETURN(false);
671 }
672 // Send ok to client
673 s_output.println("sci server 2 ok");
674 // Wait for ok from client
675 if (s_input.gets(buf, 256) == 0) {
676 DBUG_PRINT("error", ("No second response from client in SCI"));
677 NDB_CLOSE_SOCKET(sockfd);
678 DBUG_RETURN(false);
679 }
680
681 NDB_CLOSE_SOCKET(sockfd);
682 DBUG_PRINT("info", ("Successfully connected server to node %d",
683 remoteNodeId));
684 DBUG_RETURN(true);
685 }
686
createSequence(Uint32 adapterid)687 sci_error_t SCI_Transporter::createSequence(Uint32 adapterid) {
688 sci_error_t err;
689 SCICreateMapSequence((m_TargetSegm[adapterid].rhm[adapterid].map),
690 &(m_TargetSegm[adapterid].sequence),
691 SCI_FLAG_FAST_BARRIER,
692 &err);
693 return err;
694 } // createSequence()
695
disconnectLocal()696 bool SCI_Transporter::disconnectLocal()
697 {
698 DBUG_ENTER("SCI_Transporter::disconnectLocal");
699 sci_error_t err;
700 m_ActiveAdapterId=0;
701
702 /** Free resources used by a local segment
703 */
704
705 SCIUnmapSegment(m_SourceSegm[0].lhm[0].map,0,&err);
706 if(err!=SCI_ERR_OK) {
707 report_error(TE_SCI_UNABLE_TO_UNMAP_SEGMENT);
708 DBUG_PRINT("error", ("Unable to unmap segment"));
709 DBUG_RETURN(false);
710 }
711
712 SCIRemoveSegment((m_SourceSegm[m_ActiveAdapterId].localHandle),
713 FLAGS,
714 &err);
715
716 if(err!=SCI_ERR_OK) {
717 report_error(TE_SCI_UNABLE_TO_REMOVE_SEGMENT);
718 DBUG_PRINT("error", ("Unable to remove segment"));
719 DBUG_RETURN(false);
720 }
721 DBUG_PRINT("info", ("Local memory segment is unmapped and removed"));
722 DBUG_RETURN(true);
723 } // disconnectLocal()
724
725
disconnectRemote()726 bool SCI_Transporter::disconnectRemote() {
727 DBUG_ENTER("SCI_Transporter::disconnectRemote");
728 sci_error_t err;
729 for(Uint32 i=0; i<m_adapters; i++) {
730 /**
731 * Segment unmapped, disconnect from the remotely connected segment
732 */
733 SCIUnmapSegment(m_TargetSegm[i].rhm[i].map,0,&err);
734 if(err!=SCI_ERR_OK) {
735 report_error(TE_SCI_UNABLE_TO_UNMAP_SEGMENT);
736 DBUG_PRINT("error", ("Unable to unmap segment"));
737 DBUG_RETURN(false);
738 }
739
740 SCIDisconnectSegment(m_TargetSegm[i].rhm[i].remoteHandle,
741 FLAGS,
742 &err);
743 if(err!=SCI_ERR_OK) {
744 report_error(TE_SCI_UNABLE_TO_DISCONNECT_SEGMENT);
745 DBUG_PRINT("error", ("Unable to disconnect segment"));
746 DBUG_RETURN(false);
747 }
748 DBUG_PRINT("info", ("Remote memory segment is unmapped and disconnected"));
749 }
750 DBUG_RETURN(true);
751 } // disconnectRemote()
752
753
~SCI_Transporter()754 SCI_Transporter::~SCI_Transporter() {
755 DBUG_ENTER("SCI_Transporter::~SCI_Transporter");
756 // Close channel to the driver
757 doDisconnect();
758 if(m_sendBuffer.m_buffer != NULL)
759 delete[] m_sendBuffer.m_buffer;
760 DBUG_VOID_RETURN;
761 } // ~SCI_Transporter()
762
closeSCI()763 void SCI_Transporter::closeSCI() {
764 // Termination of SCI
765 sci_error_t err;
766 DBUG_ENTER("SCI_Transporter::closeSCI");
767
768 // Disconnect and remove remote segment
769 disconnectRemote();
770
771 // Unmap and remove local segment
772
773 disconnectLocal();
774
775 // Closes an SCI virtual device
776 SCIClose(activeSCIDescriptor, FLAGS, &err);
777
778 if(err != SCI_ERR_OK) {
779 DBUG_PRINT("error",
780 ("Cannot close SCI channel to the driver. Error code 0x%x",
781 err));
782 }
783 SCITerminate();
784 DBUG_VOID_RETURN;
785 } // closeSCI()
786
787 Uint32 *
getWritePtr(Uint32 lenBytes,Uint32 prio)788 SCI_Transporter::getWritePtr(Uint32 lenBytes, Uint32 prio)
789 {
790
791 Uint32 sci_buffer_remaining = m_sendBuffer.m_forceSendLimit;
792 Uint32 send_buf_size = m_sendBuffer.m_sendBufferSize;
793 Uint32 curr_data_size = m_sendBuffer.m_dataSize << 2;
794 Uint32 new_curr_data_size = curr_data_size + lenBytes;
795 if ((curr_data_size >= send_buf_size) ||
796 (curr_data_size >= sci_buffer_remaining)) {
797 /**
798 * The new message will not fit in the send buffer. We need to
799 * send the send buffer before filling it up with the new
800 * signal data. If current data size will spill over buffer edge
801 * we will also send to ensure correct operation.
802 */
803 if (!doSend()) {
804 /**
805 * We were not successfull sending, report 0 as meaning buffer full and
806 * upper levels handle retries and other recovery matters.
807 */
808 return 0;
809 }
810 }
811 /**
812 * New signal fits, simply fill it up with more data.
813 */
814 Uint32 sz = m_sendBuffer.m_dataSize;
815 return &m_sendBuffer.m_buffer[sz];
816 }
817
818 void
updateWritePtr(Uint32 lenBytes,Uint32 prio)819 SCI_Transporter::updateWritePtr(Uint32 lenBytes, Uint32 prio){
820
821 Uint32 sz = m_sendBuffer.m_dataSize;
822 Uint32 packet_size = m_PacketSize;
823 sz += ((lenBytes + 3) >> 2);
824 m_sendBuffer.m_dataSize = sz;
825
826 if(sz > packet_size) {
827 /**-------------------------------------------------
828 * Buffer is full and we are ready to send. We will
829 * not wait since the signal is already in the buffer.
830 * Force flag set has the same indication that we
831 * should always send. If it is not possible to send
832 * we will not worry since we will soon be back for
833 * a renewed trial.
834 *-------------------------------------------------
835 */
836 doSend();
837 }
838 }
839
840 enum SciStatus {
841 SCIDISCONNECT = 1,
842 SCICONNECTED = 2
843 };
844
845 bool
getConnectionStatus()846 SCI_Transporter::getConnectionStatus() {
847 if(*m_localStatusFlag == SCICONNECTED &&
848 (*m_remoteStatusFlag == SCICONNECTED ||
849 ((m_adapters > 1) &&
850 *m_remoteStatusFlag2 == SCICONNECTED)))
851 return true;
852 else
853 return false;
854 }
855
856 void
setConnected()857 SCI_Transporter::setConnected() {
858 *m_remoteStatusFlag = SCICONNECTED;
859 if (m_adapters > 1) {
860 *m_remoteStatusFlag2 = SCICONNECTED;
861 }
862 *m_localStatusFlag = SCICONNECTED;
863 }
864
865 void
setDisconnect()866 SCI_Transporter::setDisconnect() {
867 if(getLinkStatus(m_ActiveAdapterId))
868 *m_remoteStatusFlag = SCIDISCONNECT;
869 if (m_adapters > 1) {
870 if(getLinkStatus(m_StandbyAdapterId))
871 *m_remoteStatusFlag2 = SCIDISCONNECT;
872 }
873 }
874
875 bool
checkConnected()876 SCI_Transporter::checkConnected() {
877 if (*m_localStatusFlag == SCIDISCONNECT) {
878 return false;
879 }
880 else
881 return true;
882 }
883
884 static bool init = false;
885
886 bool
initSCI()887 SCI_Transporter::initSCI() {
888 DBUG_ENTER("SCI_Transporter::initSCI");
889 if(!init){
890 sci_error_t error;
891 // Initialize SISCI library
892 SCIInitialize(0, &error);
893 if(error != SCI_ERR_OK) {
894 DBUG_PRINT("error", ("Cannot initialize SISCI library."));
895 DBUG_PRINT("error",
896 ("Inconsistency between SISCI library and SISCI driver. Error code 0x%x",
897 error));
898 DBUG_RETURN(false);
899 }
900 init = true;
901 }
902 DBUG_RETURN(true);
903 }
904
905 Uint32
get_free_buffer() const906 SCI_Transporter::get_free_buffer() const
907 {
908 return (m_TargetSegm[m_ActiveAdapterId].writer)->get_free_buffer();
909 }
910
911