1 /*
2    Copyright (c) 2003, 2019, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #define DBTUP_C
26 #define DBTUP_BUFFER_CPP
27 #include "Dbtup.hpp"
28 #include <RefConvert.hpp>
29 #include <ndb_limits.h>
30 #include <pc.hpp>
31 #include <signaldata/TransIdAI.hpp>
32 
33 #define JAM_FILE_ID 410
34 
35 
execSEND_PACKED(Signal * signal)36 void Dbtup::execSEND_PACKED(Signal* signal)
37 {
38   Uint16 hostId;
39   Uint32 i;
40   Uint32 TpackedListIndex= cpackedListIndex;
41   bool present = false;
42   for (i= 0; i < TpackedListIndex; i++) {
43     jam();
44     hostId= cpackedList[i];
45     ndbrequire((hostId - 1) < (MAX_NODES - 1)); // Also check not zero
46     HostBuffer* const buffer = &hostBuffer[hostId];
47     Uint32 TpacketTA= buffer->noOfPacketsTA;
48     if (TpacketTA != 0) {
49       jamDebug();
50 
51       if (ERROR_INSERTED(4037))
52       {
53         /* Delay a SEND_PACKED signal for 10 calls to execSEND_PACKED */
54         jam();
55         if (!present)
56         {
57           /* First valid packed data in this pass */
58           jamDebug();
59           present = true;
60           cerrorPackedDelay++;
61 
62           if ((cerrorPackedDelay % 10) != 0)
63           {
64             /* Skip it */
65             jamDebug();
66             return;
67           }
68         }
69       }
70       const BlockReference TBref= numberToRef(API_PACKED, hostId);
71       const Uint32 TpacketLen= buffer->packetLenTA;
72       MEMCOPY_NO_WORDS(&signal->theData[0],
73                        &buffer->packetBufferTA[0],
74                        TpacketLen);
75       sendSignal(TBref, GSN_TRANSID_AI, signal, TpacketLen, JBB);
76       buffer->noOfPacketsTA= 0;
77       buffer->packetLenTA= 0;
78     }
79     buffer->inPackedList= false;
80   }//for
81   cpackedListIndex= 0;
82 }
83 
/**
 * Copy a TRANSID_AI signal, which already has its header constructed in
 * 'signal', into a packed buffer structure.
 *
 * Prereq:
 *  - Signal should be sufficiently small to allow it to be 'packed'
 *  - Buffer should have sufficient free space for the signal.
 */
bufferTRANSID_AI(Signal * signal,BlockReference aRef,const Uint32 * dataBuf,Uint32 lenOfData)92 void Dbtup::bufferTRANSID_AI(Signal* signal, BlockReference aRef,
93                              const Uint32 *dataBuf,
94                              Uint32 lenOfData)
95 {
96   ndbassert(lenOfData > 0);
97   ndbassert(TransIdAI::HeaderLength+lenOfData+1 <= 25);
98 
99   const Uint32 hostId= refToNode(aRef);
100   HostBuffer* const buffer = &hostBuffer[hostId];
101   const Uint32 TpacketLen= buffer->packetLenTA;
102 
103   // ----------------------------------------------------------------
104   // There should always be space in the buffer.
105   // ----------------------------------------------------------------
106   ndbassert((TpacketLen + 1+TransIdAI::HeaderLength+lenOfData) <= 25);
107 
108   // ----------------------------------------------------------------
109   // Copy the header + TRANSID_AI signal into the buffer
110   // ----------------------------------------------------------------
111   Uint32* const packedBuffer = &buffer->packetBufferTA[TpacketLen];
112   const Uint32 Theader= ((refToBlock(aRef) << 16)+lenOfData);
113   packedBuffer[0] = Theader;
114 
115   MEMCOPY_NO_WORDS(&packedBuffer[1], signal->theData, TransIdAI::HeaderLength);
116   MEMCOPY_NO_WORDS(&packedBuffer[1+TransIdAI::HeaderLength], dataBuf, lenOfData);
117 
118   buffer->packetLenTA= TpacketLen + 1+TransIdAI::HeaderLength+lenOfData;
119   buffer->noOfPacketsTA++;
120   updatePackedList(hostId);
121 }
122 
updatePackedList(Uint16 hostId)123 void Dbtup::updatePackedList(Uint16 hostId)
124 {
125   if (hostBuffer[hostId].inPackedList == false) {
126     Uint32 TpackedListIndex= cpackedListIndex;
127     jamDebug();
128     hostBuffer[hostId].inPackedList= true;
129     cpackedList[TpackedListIndex]= hostId;
130     cpackedListIndex= TpackedListIndex + 1;
131   }
132 }
133 
/**
 * Send a TRANSID_AI signal to an API node. If sufficiently small, the signal
 * is buffered so that it can later be sent as part of an API_PACKED signal.
 * When required, the packed buffer is flushed to the destination API node.
 *
 * Prereq:
 *  - The destination node must be an API node.
 *  - We must be connected to the API node.
 */
sendAPI_TRANSID_AI(Signal * signal,Uint32 recBlockRef,const Uint32 * dataBuf,Uint32 lenOfData)143 void Dbtup::sendAPI_TRANSID_AI(Signal* signal,
144                                Uint32 recBlockRef,
145                                const Uint32 *dataBuf,
146                                Uint32 lenOfData)
147 {
148   const Uint32 nodeId= refToNode(recBlockRef);
149 
150   // Test prerequisites:
151   ndbassert(getNodeInfo(nodeId).m_connected);
152   ndbassert(getNodeInfo(nodeId).m_type >= NodeInfo::API && getNodeInfo(nodeId).m_type <= NodeInfo::MGM);
153 
154   ndbrequire(nodeId < MAX_NODES);
155   HostBuffer* const buffer = &hostBuffer[nodeId];
156   const Uint32 TpacketLen= buffer->packetLenTA;
157 
158   /**
159    * Check if the packed buffers has to be flushed first.
160    * Note that even if we will not use them for this (too large) signal,
161    * it has to be flushed now in order to maintain the order of TRANSID_AIs
162    */
163   if (TpacketLen > 0 &&
164       TpacketLen + 1+TransIdAI::HeaderLength+lenOfData > 25)
165   {
166     jamDebug();
167     TransIdAI *transIdAI = (TransIdAI *)signal->getDataPtrSend();
168 
169     // Save prepare TRANSID_AI header
170     const Uint32 sig0= transIdAI->connectPtr;
171     const Uint32 sig1= transIdAI->transId[0];
172     const Uint32 sig2= transIdAI->transId[1];
173 
174     if (dataBuf != &signal->theData[25])
175     {
176       jamDebug();
177       /**
178        * TUP incorrectly guessed that it could prepare the signal
179        * to be EXECUTE_DIRECT'ly. Has to move it away for sendSignal()
180        * needing the low 25 signal-words to send the packed buffers.
181        * (Use memmove as src & dest may overlap)
182        */
183       memmove(&signal->theData[25], dataBuf, lenOfData*sizeof(Uint32));
184       dataBuf = &signal->theData[25];
185     }
186 
187     // Send already buffered TRANSID_AI(s) preceeding this TRANSID_AI
188     const BlockReference TBref = numberToRef(API_PACKED, nodeId);
189     MEMCOPY_NO_WORDS(&signal->theData[0], &buffer->packetBufferTA[0], TpacketLen);
190     sendSignal(TBref, GSN_TRANSID_AI, signal, TpacketLen, JBB);
191     buffer->noOfPacketsTA = 0;
192     buffer->packetLenTA = 0;
193 
194     // Reconstruct the current TRANSID_AI header
195     transIdAI->connectPtr = sig0;
196     transIdAI->transId[0] = sig1;
197     transIdAI->transId[1] = sig2;
198   }
199 
200   if (lenOfData <= TransIdAI::DataLength)
201   {
202     /**
203      * Short signal, buffer it, or send directly
204      * 1) Buffer signal if we can pack at least
205      *    this signal + another 1-word signal into buffers.
206      * 2) else, short-signal is sent immediately.
207      *
208      * Note that the check for fitting a 1-word signal in addition
209      * to this signal serves dual purposes:
210      * - The 1-word signal is the smalles possible signal which
211      *   can either be added later, or already is buffered.
212      * - So failing to also add a 1-word signal implies that any
213      *   previously buffered signals were flushed above.
214      *   Thus, 'packetLenTA' is also known to be '== 0' in
215      *   the non-buffered sendSignal further below.
216      */
217 #ifndef NDB_NO_DROPPED_SIGNAL
218     if (1+TransIdAI::HeaderLength + lenOfData +  // this TRANSID_AI
219         1+TransIdAI::HeaderLength + 1 <= 25)     // 1 word TRANSID_AI
220     {
221       jamDebug();
222       bufferTRANSID_AI(signal, recBlockRef, dataBuf, lenOfData);
223     }
224     else
225 #endif
226     {
227       jamDebug();
228       ndbassert(buffer->packetLenTA == 0);
229       if (dataBuf != &signal->theData[TransIdAI::HeaderLength])
230       {
231         MEMCOPY_NO_WORDS(&signal->theData[TransIdAI::HeaderLength], dataBuf, lenOfData);
232       }
233       sendSignal(recBlockRef, GSN_TRANSID_AI, signal,
234                  TransIdAI::HeaderLength+lenOfData, JBB);
235     }
236   }
237   else
238   {
239     jamDebug();
240     /**
241      * Send to API as a long signal.
242      */
243     LinearSectionPtr ptr[3];
244     ptr[0].p= const_cast<Uint32*>(dataBuf);
245     ptr[0].sz= lenOfData;
246     sendSignal(recBlockRef, GSN_TRANSID_AI, signal,
247                TransIdAI::HeaderLength, JBB, ptr, 1);
248   }
249 }
250 
251 /* ---------------------------------------------------------------- */
252 /* ----------------------- SEND READ ATTRINFO --------------------- */
253 /* ---------------------------------------------------------------- */
sendReadAttrinfo(Signal * signal,KeyReqStruct * req_struct,Uint32 ToutBufIndex)254 void Dbtup::sendReadAttrinfo(Signal* signal,
255                              KeyReqStruct *req_struct,
256                              Uint32 ToutBufIndex)
257 {
258   if(ToutBufIndex == 0)
259     return;
260 
261   const BlockReference recBlockref= req_struct->rec_blockref;
262   const Uint32 nodeId= refToNode(recBlockref);
263 
264   bool connectedToNode= getNodeInfo(nodeId).m_connected;
265   const Uint32 type= getNodeInfo(nodeId).m_type;
266   const bool is_api= (type >= NodeInfo::API && type <= NodeInfo::MGM);
267 
268   if (ERROR_INSERTED(4006) && (nodeId != getOwnNodeId())){
269     // Use error insert to turn routing on
270     jam();
271     connectedToNode= false;
272   }
273 
274   Uint32 sig0= req_struct->tc_operation_ptr;
275   Uint32 sig1= req_struct->trans_id1;
276   Uint32 sig2= req_struct->trans_id2;
277 
278   TransIdAI * transIdAI=  (TransIdAI *)signal->getDataPtrSend();
279   transIdAI->connectPtr= sig0;
280   transIdAI->transId[0]= sig1;
281   transIdAI->transId[1]= sig2;
282 
283   const Uint32 routeBlockref= req_struct->TC_ref;
284   /**
285    * If we are not connected to the destination block, we may reach it
286    * indirectly by sending a TRANSID_AI_R signal to routeBlockref. Only
287    * TC can handle TRANSID_AI_R signals. The 'ndbrequire' below should
288    * check that there is no chance of sending TRANSID_AI_R to a block
289    * that cannot handle it.
290    */
291   ndbassert (refToMain(routeBlockref) == DBTC ||
292              /**
293               * routeBlockref will point to SPJ for operations initiated by
294               * that block. TRANSID_AI_R should not be sent to SPJ, as
295               * SPJ will do its own internal error handling to compensate
296               * for the lost TRANSID_AI signal.
297               */
298              refToMain(routeBlockref) == DBSPJ ||
299              /**
300               * A node should always be connected to itself. So we should
301               * never need to send TRANSID_AI_R in this case.
302               */
303              (nodeId == getOwnNodeId() && connectedToNode));
304 
305   /**
306    * If a previous read_pseudo executed a 'FLUSH_AI', we may
307    * already have sent a TRANSID_AI signal with the result row
308    * to the API node. The result size was then already recorded
309    * in 'read_length' and we should not add the size of this
310    * row as it is not part of the 'result' .
311    */
312   if (req_struct->read_length != 0)
313   {
314     ndbassert(!is_api);  // API result already FLUSH_AI'ed
315   }
316   else
317   {
318     // No API-result produced yet, record this
319     req_struct->read_length = ToutBufIndex;
320   }
321 
322   if (connectedToNode){
323     /**
324      * Own node -> execute direct
325      */
326     if(nodeId != getOwnNodeId())
327     {
328       jamDebug();
329       if (is_api)
330       {
331         sendAPI_TRANSID_AI(signal, recBlockref,
332                            &signal->theData[25], ToutBufIndex);
333       }
334 
335       /**
336        * Send long signal if 'long' data.
337        * Note that SPJ can *only* handle long signals
338        */
339       else if (ToutBufIndex > TransIdAI::DataLength ||
340                refToMain(recBlockref) == DBSPJ)
341       {
342         jam();
343         /**
344          * Receiver block doesn't support packed 'short' signals.
345          */
346         LinearSectionPtr ptr[3];
347         ptr[0].p= &signal->theData[25];
348         ptr[0].sz= ToutBufIndex;
349         sendSignal(recBlockref, GSN_TRANSID_AI, signal,
350                    TransIdAI::HeaderLength, JBB, ptr, 1);
351       }
352       else
353       {
354         jam();
355         ndbassert(ToutBufIndex <= TransIdAI::DataLength);
356         /**
357          * Data is 'short', send short signal
358          */
359         MEMCOPY_NO_WORDS(&signal->theData[TransIdAI::HeaderLength],
360                          &signal->theData[25], ToutBufIndex);
361         sendSignal(recBlockref, GSN_TRANSID_AI, signal,
362                    TransIdAI::HeaderLength+ToutBufIndex, JBB);
363       }
364       return;
365     } //nodeId != getOwnNodeId()
366 
367     /**
368      * BACKUP, LQH & SUMA run in our thread, so we can EXECUTE_DIRECT().
369      *
370      * The UTIL/TC blocks are in another thread (in multi-threaded ndbd), so
371      * must use sendSignal().
372      *
373      * In MT LQH only LQH and BACKUP are in same thread, and BACKUP only
374      * in LCP case since user-backup uses single worker.
375      */
376     const bool sameInstance = refToInstance(recBlockref) == instance();
377     const Uint32 blockNumber= refToMain(recBlockref);
378     if (sameInstance &&
379         (blockNumber == BACKUP ||
380          blockNumber == DBLQH ||
381          blockNumber == SUMA))
382     {
383       static_assert(MAX_TUPLE_SIZE_IN_WORDS + MAX_ATTRIBUTES_IN_TABLE <=
384                       NDB_ARRAY_SIZE(signal->theData) - TransIdAI::HeaderLength,
385                     "");
386       ndbrequire(TransIdAI::HeaderLength + ToutBufIndex <= NDB_ARRAY_SIZE(signal->theData));
387       EXECUTE_DIRECT(blockNumber, GSN_TRANSID_AI, signal,
388                      TransIdAI::HeaderLength + ToutBufIndex);
389       jamEntryDebug();
390     }
391     else
392     {
393       jam();
394       LinearSectionPtr ptr[3];
395       ptr[0].p= &signal->theData[TransIdAI::HeaderLength];
396       ptr[0].sz= ToutBufIndex;
397       if (ERROR_INSERTED(4038))
398       {
399         /* Copy data to Seg-section for delayed send */
400         Uint32 sectionIVal = RNIL;
401         ndbrequire(appendToSection(sectionIVal, ptr[0].p, ptr[0].sz));
402         SectionHandle sh(this, sectionIVal);
403 
404         sendSignalWithDelay(recBlockref, GSN_TRANSID_AI, signal, 10,
405                             TransIdAI::HeaderLength, &sh);
406       }
407       else
408       {
409         /**
410          * We are sending to the same node, it is important that we maintain
411          * signal order with SCAN_FRAGCONF and other signals. So we make sure
412          * that TRANSID_AI is sent at the same priority level as the
413          * SCAN_FRAGCONF will be sent at.
414          *
415          * One case for this is Backups, the receiver is the first LDM thread
416          * which could have raised priority of scan executions to Priority A.
417          * To ensure that TRANSID_AI arrives there before SCAN_FRAGCONF we
418          * send also TRANSID_AI on priority A if the signal is sent on prio A.
419          */
420         JobBufferLevel prioLevel = req_struct->m_prio_a_flag ? JBA : JBB;
421         sendSignal(recBlockref,
422                    GSN_TRANSID_AI,
423                    signal,
424                    TransIdAI::HeaderLength,
425                    prioLevel,
426                    ptr,
427                    1);
428       }
429     }
430     return;
431   }
432 
433   /**
434    * If this node does not have a direct connection
435    * to the receiving node, we want to send the signals
436    * routed via the node that controls this read
437    */
438   // TODO is_api && !old_dest){
439   if (refToNode(recBlockref) == refToNode(routeBlockref))
440   {
441     jam();
442     /**
443      * Signal's only alternative route is direct - cannot be delivered,
444      * drop it. (Expected behavior if recBlockRef is an SPJ block.)
445      */
446     return;
447   }
448   // Only TC can handle TRANSID_AI_R signals.
449   ndbrequire(refToMain(routeBlockref) == DBTC);
450   transIdAI->attrData[0]= recBlockref;
451   LinearSectionPtr ptr[3];
452   ptr[0].p= &signal->theData[25];
453   ptr[0].sz= ToutBufIndex;
454   sendSignal(routeBlockref, GSN_TRANSID_AI_R, signal,
455              TransIdAI::HeaderLength+1, JBB, ptr, 1);
456 }
457