1 /*
2 Copyright (c) 2003, 2019, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #define DBTUP_C
26 #define DBTUP_BUFFER_CPP
27 #include "Dbtup.hpp"
28 #include <RefConvert.hpp>
29 #include <ndb_limits.h>
30 #include <pc.hpp>
31 #include <signaldata/TransIdAI.hpp>
32
33 #define JAM_FILE_ID 410
34
35
execSEND_PACKED(Signal * signal)36 void Dbtup::execSEND_PACKED(Signal* signal)
37 {
38 Uint16 hostId;
39 Uint32 i;
40 Uint32 TpackedListIndex= cpackedListIndex;
41 bool present = false;
42 for (i= 0; i < TpackedListIndex; i++) {
43 jam();
44 hostId= cpackedList[i];
45 ndbrequire((hostId - 1) < (MAX_NODES - 1)); // Also check not zero
46 HostBuffer* const buffer = &hostBuffer[hostId];
47 Uint32 TpacketTA= buffer->noOfPacketsTA;
48 if (TpacketTA != 0) {
49 jamDebug();
50
51 if (ERROR_INSERTED(4037))
52 {
53 /* Delay a SEND_PACKED signal for 10 calls to execSEND_PACKED */
54 jam();
55 if (!present)
56 {
57 /* First valid packed data in this pass */
58 jamDebug();
59 present = true;
60 cerrorPackedDelay++;
61
62 if ((cerrorPackedDelay % 10) != 0)
63 {
64 /* Skip it */
65 jamDebug();
66 return;
67 }
68 }
69 }
70 const BlockReference TBref= numberToRef(API_PACKED, hostId);
71 const Uint32 TpacketLen= buffer->packetLenTA;
72 MEMCOPY_NO_WORDS(&signal->theData[0],
73 &buffer->packetBufferTA[0],
74 TpacketLen);
75 sendSignal(TBref, GSN_TRANSID_AI, signal, TpacketLen, JBB);
76 buffer->noOfPacketsTA= 0;
77 buffer->packetLenTA= 0;
78 }
79 buffer->inPackedList= false;
80 }//for
81 cpackedListIndex= 0;
82 }
83
84 /**
85 * Copy a TRANSID_AI signal, which alread has its header constructed in 'signal',
86 * into a packed buffer structure.
87 *
88 * Prereq:
89 * - Signal should be sufficiently small to allow it to be 'packed'
90 * - Buffer should have sufficient free space for the signal.
91 */
bufferTRANSID_AI(Signal * signal,BlockReference aRef,const Uint32 * dataBuf,Uint32 lenOfData)92 void Dbtup::bufferTRANSID_AI(Signal* signal, BlockReference aRef,
93 const Uint32 *dataBuf,
94 Uint32 lenOfData)
95 {
96 ndbassert(lenOfData > 0);
97 ndbassert(TransIdAI::HeaderLength+lenOfData+1 <= 25);
98
99 const Uint32 hostId= refToNode(aRef);
100 HostBuffer* const buffer = &hostBuffer[hostId];
101 const Uint32 TpacketLen= buffer->packetLenTA;
102
103 // ----------------------------------------------------------------
104 // There should always be space in the buffer.
105 // ----------------------------------------------------------------
106 ndbassert((TpacketLen + 1+TransIdAI::HeaderLength+lenOfData) <= 25);
107
108 // ----------------------------------------------------------------
109 // Copy the header + TRANSID_AI signal into the buffer
110 // ----------------------------------------------------------------
111 Uint32* const packedBuffer = &buffer->packetBufferTA[TpacketLen];
112 const Uint32 Theader= ((refToBlock(aRef) << 16)+lenOfData);
113 packedBuffer[0] = Theader;
114
115 MEMCOPY_NO_WORDS(&packedBuffer[1], signal->theData, TransIdAI::HeaderLength);
116 MEMCOPY_NO_WORDS(&packedBuffer[1+TransIdAI::HeaderLength], dataBuf, lenOfData);
117
118 buffer->packetLenTA= TpacketLen + 1+TransIdAI::HeaderLength+lenOfData;
119 buffer->noOfPacketsTA++;
120 updatePackedList(hostId);
121 }
122
updatePackedList(Uint16 hostId)123 void Dbtup::updatePackedList(Uint16 hostId)
124 {
125 if (hostBuffer[hostId].inPackedList == false) {
126 Uint32 TpackedListIndex= cpackedListIndex;
127 jamDebug();
128 hostBuffer[hostId].inPackedList= true;
129 cpackedList[TpackedListIndex]= hostId;
130 cpackedListIndex= TpackedListIndex + 1;
131 }
132 }
133
134 /**
135 * Send a TRANSID_AI signal to an API node. If sufficiently small, the signal is
136 * buffered for later being sent as a API_PACKED-signal. When required the
137 * packed buffer is flushed to the destination API-node.
138 *
139 * Prereq:
140 * - The destination node must be an API node.
141 * - We must be connected to the API node.
142 */
sendAPI_TRANSID_AI(Signal * signal,Uint32 recBlockRef,const Uint32 * dataBuf,Uint32 lenOfData)143 void Dbtup::sendAPI_TRANSID_AI(Signal* signal,
144 Uint32 recBlockRef,
145 const Uint32 *dataBuf,
146 Uint32 lenOfData)
147 {
148 const Uint32 nodeId= refToNode(recBlockRef);
149
150 // Test prerequisites:
151 ndbassert(getNodeInfo(nodeId).m_connected);
152 ndbassert(getNodeInfo(nodeId).m_type >= NodeInfo::API && getNodeInfo(nodeId).m_type <= NodeInfo::MGM);
153
154 ndbrequire(nodeId < MAX_NODES);
155 HostBuffer* const buffer = &hostBuffer[nodeId];
156 const Uint32 TpacketLen= buffer->packetLenTA;
157
158 /**
159 * Check if the packed buffers has to be flushed first.
160 * Note that even if we will not use them for this (too large) signal,
161 * it has to be flushed now in order to maintain the order of TRANSID_AIs
162 */
163 if (TpacketLen > 0 &&
164 TpacketLen + 1+TransIdAI::HeaderLength+lenOfData > 25)
165 {
166 jamDebug();
167 TransIdAI *transIdAI = (TransIdAI *)signal->getDataPtrSend();
168
169 // Save prepare TRANSID_AI header
170 const Uint32 sig0= transIdAI->connectPtr;
171 const Uint32 sig1= transIdAI->transId[0];
172 const Uint32 sig2= transIdAI->transId[1];
173
174 if (dataBuf != &signal->theData[25])
175 {
176 jamDebug();
177 /**
178 * TUP incorrectly guessed that it could prepare the signal
179 * to be EXECUTE_DIRECT'ly. Has to move it away for sendSignal()
180 * needing the low 25 signal-words to send the packed buffers.
181 * (Use memmove as src & dest may overlap)
182 */
183 memmove(&signal->theData[25], dataBuf, lenOfData*sizeof(Uint32));
184 dataBuf = &signal->theData[25];
185 }
186
187 // Send already buffered TRANSID_AI(s) preceeding this TRANSID_AI
188 const BlockReference TBref = numberToRef(API_PACKED, nodeId);
189 MEMCOPY_NO_WORDS(&signal->theData[0], &buffer->packetBufferTA[0], TpacketLen);
190 sendSignal(TBref, GSN_TRANSID_AI, signal, TpacketLen, JBB);
191 buffer->noOfPacketsTA = 0;
192 buffer->packetLenTA = 0;
193
194 // Reconstruct the current TRANSID_AI header
195 transIdAI->connectPtr = sig0;
196 transIdAI->transId[0] = sig1;
197 transIdAI->transId[1] = sig2;
198 }
199
200 if (lenOfData <= TransIdAI::DataLength)
201 {
202 /**
203 * Short signal, buffer it, or send directly
204 * 1) Buffer signal if we can pack at least
205 * this signal + another 1-word signal into buffers.
206 * 2) else, short-signal is sent immediately.
207 *
208 * Note that the check for fitting a 1-word signal in addition
209 * to this signal serves dual purposes:
210 * - The 1-word signal is the smalles possible signal which
211 * can either be added later, or already is buffered.
212 * - So failing to also add a 1-word signal implies that any
213 * previously buffered signals were flushed above.
214 * Thus, 'packetLenTA' is also known to be '== 0' in
215 * the non-buffered sendSignal further below.
216 */
217 #ifndef NDB_NO_DROPPED_SIGNAL
218 if (1+TransIdAI::HeaderLength + lenOfData + // this TRANSID_AI
219 1+TransIdAI::HeaderLength + 1 <= 25) // 1 word TRANSID_AI
220 {
221 jamDebug();
222 bufferTRANSID_AI(signal, recBlockRef, dataBuf, lenOfData);
223 }
224 else
225 #endif
226 {
227 jamDebug();
228 ndbassert(buffer->packetLenTA == 0);
229 if (dataBuf != &signal->theData[TransIdAI::HeaderLength])
230 {
231 MEMCOPY_NO_WORDS(&signal->theData[TransIdAI::HeaderLength], dataBuf, lenOfData);
232 }
233 sendSignal(recBlockRef, GSN_TRANSID_AI, signal,
234 TransIdAI::HeaderLength+lenOfData, JBB);
235 }
236 }
237 else
238 {
239 jamDebug();
240 /**
241 * Send to API as a long signal.
242 */
243 LinearSectionPtr ptr[3];
244 ptr[0].p= const_cast<Uint32*>(dataBuf);
245 ptr[0].sz= lenOfData;
246 sendSignal(recBlockRef, GSN_TRANSID_AI, signal,
247 TransIdAI::HeaderLength, JBB, ptr, 1);
248 }
249 }
250
251 /* ---------------------------------------------------------------- */
252 /* ----------------------- SEND READ ATTRINFO --------------------- */
253 /* ---------------------------------------------------------------- */
sendReadAttrinfo(Signal * signal,KeyReqStruct * req_struct,Uint32 ToutBufIndex)254 void Dbtup::sendReadAttrinfo(Signal* signal,
255 KeyReqStruct *req_struct,
256 Uint32 ToutBufIndex)
257 {
258 if(ToutBufIndex == 0)
259 return;
260
261 const BlockReference recBlockref= req_struct->rec_blockref;
262 const Uint32 nodeId= refToNode(recBlockref);
263
264 bool connectedToNode= getNodeInfo(nodeId).m_connected;
265 const Uint32 type= getNodeInfo(nodeId).m_type;
266 const bool is_api= (type >= NodeInfo::API && type <= NodeInfo::MGM);
267
268 if (ERROR_INSERTED(4006) && (nodeId != getOwnNodeId())){
269 // Use error insert to turn routing on
270 jam();
271 connectedToNode= false;
272 }
273
274 Uint32 sig0= req_struct->tc_operation_ptr;
275 Uint32 sig1= req_struct->trans_id1;
276 Uint32 sig2= req_struct->trans_id2;
277
278 TransIdAI * transIdAI= (TransIdAI *)signal->getDataPtrSend();
279 transIdAI->connectPtr= sig0;
280 transIdAI->transId[0]= sig1;
281 transIdAI->transId[1]= sig2;
282
283 const Uint32 routeBlockref= req_struct->TC_ref;
284 /**
285 * If we are not connected to the destination block, we may reach it
286 * indirectly by sending a TRANSID_AI_R signal to routeBlockref. Only
287 * TC can handle TRANSID_AI_R signals. The 'ndbrequire' below should
288 * check that there is no chance of sending TRANSID_AI_R to a block
289 * that cannot handle it.
290 */
291 ndbassert (refToMain(routeBlockref) == DBTC ||
292 /**
293 * routeBlockref will point to SPJ for operations initiated by
294 * that block. TRANSID_AI_R should not be sent to SPJ, as
295 * SPJ will do its own internal error handling to compensate
296 * for the lost TRANSID_AI signal.
297 */
298 refToMain(routeBlockref) == DBSPJ ||
299 /**
300 * A node should always be connected to itself. So we should
301 * never need to send TRANSID_AI_R in this case.
302 */
303 (nodeId == getOwnNodeId() && connectedToNode));
304
305 /**
306 * If a previous read_pseudo executed a 'FLUSH_AI', we may
307 * already have sent a TRANSID_AI signal with the result row
308 * to the API node. The result size was then already recorded
309 * in 'read_length' and we should not add the size of this
310 * row as it is not part of the 'result' .
311 */
312 if (req_struct->read_length != 0)
313 {
314 ndbassert(!is_api); // API result already FLUSH_AI'ed
315 }
316 else
317 {
318 // No API-result produced yet, record this
319 req_struct->read_length = ToutBufIndex;
320 }
321
322 if (connectedToNode){
323 /**
324 * Own node -> execute direct
325 */
326 if(nodeId != getOwnNodeId())
327 {
328 jamDebug();
329 if (is_api)
330 {
331 sendAPI_TRANSID_AI(signal, recBlockref,
332 &signal->theData[25], ToutBufIndex);
333 }
334
335 /**
336 * Send long signal if 'long' data.
337 * Note that SPJ can *only* handle long signals
338 */
339 else if (ToutBufIndex > TransIdAI::DataLength ||
340 refToMain(recBlockref) == DBSPJ)
341 {
342 jam();
343 /**
344 * Receiver block doesn't support packed 'short' signals.
345 */
346 LinearSectionPtr ptr[3];
347 ptr[0].p= &signal->theData[25];
348 ptr[0].sz= ToutBufIndex;
349 sendSignal(recBlockref, GSN_TRANSID_AI, signal,
350 TransIdAI::HeaderLength, JBB, ptr, 1);
351 }
352 else
353 {
354 jam();
355 ndbassert(ToutBufIndex <= TransIdAI::DataLength);
356 /**
357 * Data is 'short', send short signal
358 */
359 MEMCOPY_NO_WORDS(&signal->theData[TransIdAI::HeaderLength],
360 &signal->theData[25], ToutBufIndex);
361 sendSignal(recBlockref, GSN_TRANSID_AI, signal,
362 TransIdAI::HeaderLength+ToutBufIndex, JBB);
363 }
364 return;
365 } //nodeId != getOwnNodeId()
366
367 /**
368 * BACKUP, LQH & SUMA run in our thread, so we can EXECUTE_DIRECT().
369 *
370 * The UTIL/TC blocks are in another thread (in multi-threaded ndbd), so
371 * must use sendSignal().
372 *
373 * In MT LQH only LQH and BACKUP are in same thread, and BACKUP only
374 * in LCP case since user-backup uses single worker.
375 */
376 const bool sameInstance = refToInstance(recBlockref) == instance();
377 const Uint32 blockNumber= refToMain(recBlockref);
378 if (sameInstance &&
379 (blockNumber == BACKUP ||
380 blockNumber == DBLQH ||
381 blockNumber == SUMA))
382 {
383 static_assert(MAX_TUPLE_SIZE_IN_WORDS + MAX_ATTRIBUTES_IN_TABLE <=
384 NDB_ARRAY_SIZE(signal->theData) - TransIdAI::HeaderLength,
385 "");
386 ndbrequire(TransIdAI::HeaderLength + ToutBufIndex <= NDB_ARRAY_SIZE(signal->theData));
387 EXECUTE_DIRECT(blockNumber, GSN_TRANSID_AI, signal,
388 TransIdAI::HeaderLength + ToutBufIndex);
389 jamEntryDebug();
390 }
391 else
392 {
393 jam();
394 LinearSectionPtr ptr[3];
395 ptr[0].p= &signal->theData[TransIdAI::HeaderLength];
396 ptr[0].sz= ToutBufIndex;
397 if (ERROR_INSERTED(4038))
398 {
399 /* Copy data to Seg-section for delayed send */
400 Uint32 sectionIVal = RNIL;
401 ndbrequire(appendToSection(sectionIVal, ptr[0].p, ptr[0].sz));
402 SectionHandle sh(this, sectionIVal);
403
404 sendSignalWithDelay(recBlockref, GSN_TRANSID_AI, signal, 10,
405 TransIdAI::HeaderLength, &sh);
406 }
407 else
408 {
409 /**
410 * We are sending to the same node, it is important that we maintain
411 * signal order with SCAN_FRAGCONF and other signals. So we make sure
412 * that TRANSID_AI is sent at the same priority level as the
413 * SCAN_FRAGCONF will be sent at.
414 *
415 * One case for this is Backups, the receiver is the first LDM thread
416 * which could have raised priority of scan executions to Priority A.
417 * To ensure that TRANSID_AI arrives there before SCAN_FRAGCONF we
418 * send also TRANSID_AI on priority A if the signal is sent on prio A.
419 */
420 JobBufferLevel prioLevel = req_struct->m_prio_a_flag ? JBA : JBB;
421 sendSignal(recBlockref,
422 GSN_TRANSID_AI,
423 signal,
424 TransIdAI::HeaderLength,
425 prioLevel,
426 ptr,
427 1);
428 }
429 }
430 return;
431 }
432
433 /**
434 * If this node does not have a direct connection
435 * to the receiving node, we want to send the signals
436 * routed via the node that controls this read
437 */
438 // TODO is_api && !old_dest){
439 if (refToNode(recBlockref) == refToNode(routeBlockref))
440 {
441 jam();
442 /**
443 * Signal's only alternative route is direct - cannot be delivered,
444 * drop it. (Expected behavior if recBlockRef is an SPJ block.)
445 */
446 return;
447 }
448 // Only TC can handle TRANSID_AI_R signals.
449 ndbrequire(refToMain(routeBlockref) == DBTC);
450 transIdAI->attrData[0]= recBlockref;
451 LinearSectionPtr ptr[3];
452 ptr[0].p= &signal->theData[25];
453 ptr[0].sz= ToutBufIndex;
454 sendSignal(routeBlockref, GSN_TRANSID_AI_R, signal,
455 TransIdAI::HeaderLength+1, JBB, ptr, 1);
456 }
457