1 /*
2    Copyright (c) 2003, 2013, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "API.hpp"
26 #include "NdbOut.hpp"
27 #include <NdbBlob.hpp>
28 
29 #include <Interpreter.hpp>
30 #include <NdbInterpretedCode.hpp>
31 #include <AttributeHeader.hpp>
32 #include <signaldata/TcKeyReq.hpp>
33 
34 /*****************************************************************************
35  * int insertTuple();
36  *****************************************************************************/
37 int
insertTuple()38 NdbOperation::insertTuple()
39 {
40   NdbTransaction* tNdbCon = theNdbCon;
41   int tErrorLine = theErrorLine;
42   if (theStatus == Init) {
43     theStatus = OperationDefined;
44     theOperationType = InsertRequest;
45     tNdbCon->theSimpleState = 0;
46     theErrorLine = tErrorLine++;
47     theLockMode = LM_Exclusive;
48     m_abortOption = AbortOnError;
49     return 0;
50   } else {
51     setErrorCode(4200);
52     return -1;
53   }//if
54 }//NdbOperation::insertTuple()
55 /******************************************************************************
56  * int updateTuple();
57  *****************************************************************************/
58 int
updateTuple()59 NdbOperation::updateTuple()
60 {
61   NdbTransaction* tNdbCon = theNdbCon;
62   int tErrorLine = theErrorLine;
63   if (theStatus == Init) {
64     theStatus = OperationDefined;
65     tNdbCon->theSimpleState = 0;
66     theOperationType = UpdateRequest;
67     theErrorLine = tErrorLine++;
68     theLockMode = LM_Exclusive;
69     m_abortOption = AbortOnError;
70     return 0;
71   } else {
72     setErrorCode(4200);
73     return -1;
74   }//if
75 }//NdbOperation::updateTuple()
76 /*****************************************************************************
77  * int writeTuple();
78  *****************************************************************************/
79 int
writeTuple()80 NdbOperation::writeTuple()
81 {
82   NdbTransaction* tNdbCon = theNdbCon;
83   int tErrorLine = theErrorLine;
84   if (theStatus == Init) {
85     theStatus = OperationDefined;
86     tNdbCon->theSimpleState = 0;
87     theOperationType = WriteRequest;
88     theErrorLine = tErrorLine++;
89     theLockMode = LM_Exclusive;
90     m_abortOption = AbortOnError;
91     return 0;
92   } else {
93     setErrorCode(4200);
94     return -1;
95   }//if
96 }//NdbOperation::writeTuple()
97 /*****************************************************************************
98  * int deleteTuple();
99  *****************************************************************************/
100 int
deleteTuple()101 NdbOperation::deleteTuple()
102 {
103   NdbTransaction* tNdbCon = theNdbCon;
104   int tErrorLine = theErrorLine;
105   if (theStatus == Init) {
106     theStatus = OperationDefined;
107     tNdbCon->theSimpleState = 0;
108     theOperationType = DeleteRequest;
109     theErrorLine = tErrorLine++;
110     theLockMode = LM_Exclusive;
111     m_abortOption = AbortOnError;
112     return 0;
113   } else {
114     setErrorCode(4200);
115     return -1;
116   }//if
117 }//NdbOperation::deleteTuple()
118 
119 /******************************************************************************
120  * int readTuple();
121  *****************************************************************************/
122 int
readTuple(NdbOperation::LockMode lm)123 NdbOperation::readTuple(NdbOperation::LockMode lm)
124 {
125   switch(lm) {
126   case LM_Read:
127     return readTuple();
128     break;
129   case LM_Exclusive:
130     return readTupleExclusive();
131     break;
132   case LM_CommittedRead:
133     return committedRead();
134     break;
135   case LM_SimpleRead:
136     return simpleRead();
137   default:
138     return -1;
139   };
140 }
141 /******************************************************************************
142  * int readTuple();
143  *****************************************************************************/
144 int
readTuple()145 NdbOperation::readTuple()
146 {
147   NdbTransaction* tNdbCon = theNdbCon;
148   int tErrorLine = theErrorLine;
149   if (theStatus == Init) {
150     theStatus = OperationDefined;
151     tNdbCon->theSimpleState = 0;
152     theOperationType = ReadRequest;
153     theErrorLine = tErrorLine++;
154     theLockMode = LM_Read;
155     m_abortOption = AO_IgnoreError;
156     return 0;
157   } else {
158     setErrorCode(4200);
159     return -1;
160   }//if
161 }//NdbOperation::readTuple()
162 
163 /******************************************************************************
164  * int readTupleExclusive();
165  *****************************************************************************/
166 int
readTupleExclusive()167 NdbOperation::readTupleExclusive()
168 {
169   NdbTransaction* tNdbCon = theNdbCon;
170   int tErrorLine = theErrorLine;
171   if (theStatus == Init) {
172     theStatus = OperationDefined;
173     tNdbCon->theSimpleState = 0;
174     theOperationType = ReadExclusive;
175     theErrorLine = tErrorLine++;
176     theLockMode = LM_Exclusive;
177     m_abortOption = AO_IgnoreError;
178     return 0;
179   } else {
180     setErrorCode(4200);
181     return -1;
182   }//if
183 }//NdbOperation::readTupleExclusive()
184 
185 /*****************************************************************************
186  * int simpleRead();
187  *****************************************************************************/
188 int
simpleRead()189 NdbOperation::simpleRead()
190 {
191   NdbTransaction* tNdbCon = theNdbCon;
192   int tErrorLine = theErrorLine;
193   if (theStatus == Init) {
194     theStatus = OperationDefined;
195     theOperationType = ReadRequest;
196     theSimpleIndicator = 1;
197     theDirtyIndicator = 0;
198     theErrorLine = tErrorLine++;
199     theLockMode = LM_SimpleRead;
200     m_abortOption = AO_IgnoreError;
201     tNdbCon->theSimpleState = 0;
202     return 0;
203   } else {
204     setErrorCode(4200);
205     return -1;
206   }//if
207 }//NdbOperation::simpleRead()
208 
209 /*****************************************************************************
210  * int dirtyRead();
211  *****************************************************************************/
212 int
dirtyRead()213 NdbOperation::dirtyRead()
214 {
215   return committedRead();
216 }//NdbOperation::dirtyRead()
217 
218 /*****************************************************************************
219  * int committedRead();
220  *****************************************************************************/
221 int
committedRead()222 NdbOperation::committedRead()
223 {
224   int tErrorLine = theErrorLine;
225   if (theStatus == Init) {
226     theStatus = OperationDefined;
227     theOperationType = ReadRequest;
228     theSimpleIndicator = 1;
229     theDirtyIndicator = 1;
230     theErrorLine = tErrorLine++;
231     theLockMode = LM_CommittedRead;
232     m_abortOption = AO_IgnoreError;
233     return 0;
234   } else {
235     setErrorCode(4200);
236     return -1;
237   }//if
238 }//NdbOperation::committedRead()
239 
240 /*****************************************************************************
241  * int dirtyUpdate();
242  ****************************************************************************/
243 int
dirtyUpdate()244 NdbOperation::dirtyUpdate()
245 {
246   NdbTransaction* tNdbCon = theNdbCon;
247   int tErrorLine = theErrorLine;
248   if (theStatus == Init) {
249     theStatus = OperationDefined;
250     theOperationType = UpdateRequest;
251     tNdbCon->theSimpleState = 0;
252     theSimpleIndicator = 1;
253     theDirtyIndicator = 1;
254     theErrorLine = tErrorLine++;
255     theLockMode = LM_CommittedRead;
256     m_abortOption = AbortOnError;
257     return 0;
258   } else {
259     setErrorCode(4200);
260     return -1;
261   }//if
262 }//NdbOperation::dirtyUpdate()
263 
264 /******************************************************************************
265  * int dirtyWrite();
266  *****************************************************************************/
267 int
dirtyWrite()268 NdbOperation::dirtyWrite()
269 {
270   NdbTransaction* tNdbCon = theNdbCon;
271   int tErrorLine = theErrorLine;
272   if (theStatus == Init) {
273     theStatus = OperationDefined;
274     theOperationType = WriteRequest;
275     tNdbCon->theSimpleState = 0;
276     theSimpleIndicator = 1;
277     theDirtyIndicator = 1;
278     theErrorLine = tErrorLine++;
279     theLockMode = LM_CommittedRead;
280     m_abortOption = AbortOnError;
281     return 0;
282   } else {
283     setErrorCode(4200);
284     return -1;
285   }//if
286 }//NdbOperation::dirtyWrite()
287 
288 /******************************************************************************
289  * int interpretedUpdateTuple();
290  ****************************************************************************/
291 int
interpretedUpdateTuple()292 NdbOperation::interpretedUpdateTuple()
293 {
294   NdbTransaction* tNdbCon = theNdbCon;
295   int tErrorLine = theErrorLine;
296   if (theStatus == Init) {
297     theStatus = OperationDefined;
298     tNdbCon->theSimpleState = 0;
299     theOperationType = UpdateRequest;
300     theAI_LenInCurrAI = 25;
301     theLockMode = LM_Exclusive;
302     theErrorLine = tErrorLine++;
303     m_abortOption = AbortOnError;
304     initInterpreter();
305     return 0;
306   } else {
307     setErrorCode(4200);
308     return -1;
309   }//if
310 }//NdbOperation::interpretedUpdateTuple()
311 
312 /*****************************************************************************
313  * int interpretedDeleteTuple();
314  *****************************************************************************/
315 int
interpretedDeleteTuple()316 NdbOperation::interpretedDeleteTuple()
317 {
318   NdbTransaction* tNdbCon = theNdbCon;
319   int tErrorLine = theErrorLine;
320   if (theStatus == Init) {
321     theStatus = OperationDefined;
322     tNdbCon->theSimpleState = 0;
323     theOperationType = DeleteRequest;
324 
325     theErrorLine = tErrorLine++;
326     theAI_LenInCurrAI = 25;
327     theLockMode = LM_Exclusive;
328     m_abortOption = AbortOnError;
329     initInterpreter();
330     return 0;
331   } else {
332     setErrorCode(4200);
333     return -1;
334   }//if
335 }//NdbOperation::interpretedDeleteTuple()
336 
337 void
setReadLockMode(LockMode lockMode)338 NdbOperation::setReadLockMode(LockMode lockMode)
339 {
340   /* We only support changing lock mode for read operations at this time. */
341   assert(theOperationType == ReadRequest || theOperationType == ReadExclusive);
342   switch (lockMode) {
343   case LM_CommittedRead: /* TODO, check theNdbCon->theSimpleState */
344     theOperationType= ReadRequest;
345     theSimpleIndicator= 1;
346     theDirtyIndicator= 1;
347     break;
348   case LM_SimpleRead: /* TODO, check theNdbCon->theSimpleState */
349     theOperationType= ReadRequest;
350     theSimpleIndicator= 1;
351     theDirtyIndicator= 0;
352     break;
353   case LM_Read:
354     theNdbCon->theSimpleState= 0;
355     theOperationType= ReadRequest;
356     theSimpleIndicator= 0;
357     theDirtyIndicator= 0;
358     break;
359   case LM_Exclusive:
360     theNdbCon->theSimpleState= 0;
361     theOperationType= ReadExclusive;
362     theSimpleIndicator= 0;
363     theDirtyIndicator= 0;
364     break;
365   default:
366     /* Not supported / invalid. */
367     assert(false);
368   }
369   theLockMode= lockMode;
370 }
371 
372 
373 /******************************************************************************
374  * int getValue(AttrInfo* tAttrInfo, char* aRef )
375  *
376  * Return Value   Return 0 : GetValue was successful.
377  *                Return -1: In all other case.
378  * Parameters:    tAttrInfo : Attribute object of the retrieved attribute
379  *                            value.
380  * Remark:        Define an attribute to retrieve in query.
381  *****************************************************************************/
382 NdbRecAttr*
getValue_impl(const NdbColumnImpl * tAttrInfo,char * aValue)383 NdbOperation::getValue_impl(const NdbColumnImpl* tAttrInfo, char* aValue)
384 {
385   NdbRecAttr* tRecAttr;
386   if ((tAttrInfo != NULL) &&
387       (theStatus != Init)){
388     if (tAttrInfo->m_storageType == NDB_STORAGETYPE_DISK)
389     {
390       m_flags &= ~Uint8(OF_NO_DISK);
391     }
392     if (theStatus != GetValue) {
393       if (theStatus == UseNdbRecord)
394         /* This path for extra GetValues for NdbRecord */
395         return getValue_NdbRecord(tAttrInfo, aValue);
396       if (theInterpretIndicator == 1) {
397 	if (theStatus == FinalGetValue) {
398 	  ; // Simply continue with getValue
399 	} else if (theStatus == ExecInterpretedValue) {
400 	  if (insertATTRINFO(Interpreter::EXIT_OK) == -1)
401 	    return NULL;
402 	  theInterpretedSize = theTotalCurrAI_Len -
403 	    (theInitialReadSize + 5);
404 	} else if (theStatus == SetValueInterpreted) {
405 	  theFinalUpdateSize = theTotalCurrAI_Len -
406 	    (theInitialReadSize + theInterpretedSize + 5);
407 	} else {
408 	  setErrorCodeAbort(4230);
409 	  return NULL;
410 	}//if
411         /* Final read, after running interpreted instructions. */
412 	theStatus = FinalGetValue;
413       } else {
414 	setErrorCodeAbort(4230);
415 	return NULL;
416       }//if
417     }//if
418     AttributeHeader ah(tAttrInfo->m_attrId, 0);
419     if (insertATTRINFO(ah.m_value) != -1) {
420       // Insert Attribute Id into ATTRINFO part.
421 
422       /************************************************************************
423        * Get a Receive Attribute object and link it into the operation object.
424        ***********************************************************************/
425       if((tRecAttr = theReceiver.getValue(tAttrInfo, aValue)) != 0){
426 	theErrorLine++;
427 	return tRecAttr;
428       } else {
429 	setErrorCodeAbort(4000);
430 	return NULL;
431       }
432     } else {
433       return NULL;
434     }//if insertATTRINFO failure
435   } else {
436     if (tAttrInfo == NULL) {
437       setErrorCodeAbort(4004);
438       return NULL;
439     }//if
440   }//if
441   setErrorCodeAbort(4200);
442   return NULL;
443 }
444 
445 NdbRecAttr*
getValue_NdbRecord(const NdbColumnImpl * tAttrInfo,char * aValue)446 NdbOperation::getValue_NdbRecord(const NdbColumnImpl* tAttrInfo, char* aValue)
447 {
448   NdbRecAttr* tRecAttr;
449 
450   if (tAttrInfo->m_storageType == NDB_STORAGETYPE_DISK)
451   {
452     m_flags &= ~Uint8(OF_NO_DISK);
453   }
454 
455   /*
456     For getValue with NdbRecord operations, we just allocate the NdbRecAttr,
457     the signal data will be constructed later.
458   */
459   if((tRecAttr = theReceiver.getValue(tAttrInfo, aValue)) != 0) {
460     theErrorLine++;
461     return tRecAttr;
462   } else {
463     setErrorCodeAbort(4000);
464     return NULL;
465   }
466 }
467 
468 /*****************************************************************************
469  * int setValue(AttrInfo* tAttrInfo, char* aValue, Uint32 len)
470  *
471  * Return Value:  Return 0 : SetValue was succesful.
472  *                Return -1: In all other case.
473  * Parameters:    tAttrInfo : Attribute object where the attribute
474  *                            info exists.
475  *                aValue : Reference to the variable with the new value.
476  *		  len    : Length of the value
477  * Remark:        Define a attribute to set in a query.
478 ******************************************************************************/
479 int
setValue(const NdbColumnImpl * tAttrInfo,const char * aValuePassed)480 NdbOperation::setValue( const NdbColumnImpl* tAttrInfo,
481 			const char* aValuePassed)
482 {
483   DBUG_ENTER("NdbOperation::setValue");
484   DBUG_PRINT("enter", ("col: %s  op:%d  val: 0x%lx",
485                        tAttrInfo ? tAttrInfo->m_name.c_str() : "NULL",
486                        theOperationType, (long) aValuePassed));
487 
488   int tReturnCode;
489   Uint32 tAttrId;
490   Uint32 tData;
491   Uint32 tempData[ NDB_MAX_TUPLE_SIZE_IN_WORDS ];
492   OperationType tOpType = theOperationType;
493   OperationStatus tStatus = theStatus;
494 
495 
496   if ((tOpType == UpdateRequest) ||
497       (tOpType == WriteRequest)) {
498     if (theInterpretIndicator == 0) {
499       if (tStatus == SetValue) {
500         ;
501       } else {
502         setErrorCodeAbort(4234);
503         DBUG_RETURN(-1);
504       }//if
505     } else {
506       if (tStatus == GetValue) {
507         theInitialReadSize = theTotalCurrAI_Len - 5;
508       } else if	(tStatus == ExecInterpretedValue) {
509 	//--------------------------------------------------------------------
510 	// We insert an exit from interpretation since we are now starting
511 	// to set values in the tuple by setValue.
512 	//--------------------------------------------------------------------
513         if (insertATTRINFO(Interpreter::EXIT_OK) == -1){
514           DBUG_RETURN(-1);
515 	}
516         theInterpretedSize = theTotalCurrAI_Len -
517           (theInitialReadSize + 5);
518       } else if (tStatus == SetValueInterpreted) {
519         ; // Simply continue adding new setValue
520       } else {
521 	//--------------------------------------------------------------------
522 	// setValue used in the wrong context. Application coding error.
523 	//-------------------------------------------------------------------
524         setErrorCodeAbort(4234); //Wrong error code
525         DBUG_RETURN(-1);
526       }//if
527       theStatus = SetValueInterpreted;
528     }//if
529   } else if (tOpType == InsertRequest) {
530     if ((theStatus != SetValue) && (theStatus != OperationDefined)) {
531       setErrorCodeAbort(4234);
532       DBUG_RETURN(-1);
533     }//if
534   } else if (tOpType == ReadRequest || tOpType == ReadExclusive) {
535     setErrorCodeAbort(4504);
536     DBUG_RETURN(-1);
537   } else if (tOpType == DeleteRequest) {
538     setErrorCodeAbort(4504);
539     DBUG_RETURN(-1);
540   } else if (tOpType == OpenScanRequest || tOpType == OpenRangeScanRequest) {
541     setErrorCodeAbort(4228);
542     DBUG_RETURN(-1);
543   } else {
544     //---------------------------------------------------------------------
545     // setValue with undefined operation type.
546     // Probably application coding error.
547     //---------------------------------------------------------------------
548     setErrorCodeAbort(4108);
549     DBUG_RETURN(-1);
550   }//if
551   if (tAttrInfo == NULL) {
552     setErrorCodeAbort(4004);
553     DBUG_RETURN(-1);
554   }//if
555   if (tAttrInfo->m_pk) {
556     if (theOperationType == InsertRequest) {
557       DBUG_RETURN(equal_impl(tAttrInfo, aValuePassed));
558     } else {
559       setErrorCodeAbort(4202);
560       DBUG_RETURN(-1);
561     }//if
562   }//if
563 
564   // Insert Attribute Id into ATTRINFO part.
565   tAttrId = tAttrInfo->m_attrId;
566   if (tAttrInfo->m_storageType == NDB_STORAGETYPE_DISK)
567   {
568     m_flags &= ~Uint8(OF_NO_DISK);
569   }
570   const char *aValue = aValuePassed;
571   if (aValue == NULL) {
572     if (tAttrInfo->m_nullable) {
573       AttributeHeader ah(tAttrId, 0);
574       ah.setNULL();
575       insertATTRINFO(ah.m_value);
576       // Insert Attribute Id with the value
577       // NULL into ATTRINFO part.
578       DBUG_RETURN(0);
579     } else {
580       /***********************************************************************
581        * Setting a NULL value on a NOT NULL attribute is not allowed.
582        **********************************************************************/
583       setErrorCodeAbort(4203);
584       DBUG_RETURN(-1);
585     }//if
586   }//if
587 
588   Uint32 len;
589   if (! tAttrInfo->get_var_length(aValue, len)) {
590     setErrorCodeAbort(4209);
591     DBUG_RETURN(-1);
592   }
593 
594   const Uint32 sizeInBytes = len;
595   const Uint32 bitsInLastWord = 8 * (sizeInBytes & 3) ;
596 
597   const int attributeSize = sizeInBytes;
598   const int slack = sizeInBytes & 3;
599 
600   if (((UintPtr)aValue & 3) != 0 || (slack != 0)){
601     memcpy(&tempData[0], aValue, attributeSize);
602     aValue = (char*)&tempData[0];
603     if(slack != 0) {
604       char * tmp = (char*)&tempData[0];
605       memset(&tmp[attributeSize], 0, (4 - slack));
606     }//if
607   }//if
608 
609   // Excluding bits in last word
610   const Uint32 sizeInWords = sizeInBytes / 4;
611   AttributeHeader ah(tAttrId, sizeInBytes);
612   insertATTRINFO( ah.m_value );
613 
614   /***********************************************************************
615    * Check if the pointer of the value passed is aligned on a 4 byte boundary.
616    * If so only assign the pointer to the internal variable aValue.
617    * If it is not aligned then we start by copying the value to tempData and
618    * use this as aValue instead.
619    *************************************************************************/
620 
621   tReturnCode = insertATTRINFOloop((Uint32*)aValue, sizeInWords);
622   if (tReturnCode == -1) {
623     DBUG_RETURN(tReturnCode);
624   }//if
625   if (bitsInLastWord != 0) {
626     tData = *(Uint32*)(aValue + sizeInWords*4);
627     tData = convertEndian(tData);
628     tData = tData & ((1 << bitsInLastWord) - 1);
629     tData = convertEndian(tData);
630     tReturnCode = insertATTRINFO(tData);
631     if (tReturnCode == -1) {
632       DBUG_RETURN(tReturnCode);
633     }//if
634   }//if
635   theErrorLine++;
636   DBUG_RETURN(0);
637 }//NdbOperation::setValue()
638 
639 
640 int
setAnyValue(Uint32 any_value)641 NdbOperation::setAnyValue(Uint32 any_value)
642 {
643   OperationType tOpType = theOperationType;
644 
645   if (theStatus == UseNdbRecord)
646   {
647     /* Method not allowed for NdbRecord, use OperationOptions or
648        ScanOptions structure instead */
649     setErrorCodeAbort(4515);
650     return -1;
651   }
652 
653   const NdbColumnImpl* impl =
654     &NdbColumnImpl::getImpl(* NdbDictionary::Column::ANY_VALUE);
655 
656   switch(tOpType){
657   case DeleteRequest:{
658     Uint32 ah;
659     AttributeHeader::init(&ah, AttributeHeader::ANY_VALUE, 4);
660     if (insertATTRINFO(ah) != -1 && insertATTRINFO(any_value) != -1 )
661     {
662       return 0;
663     }
664   }
665   default:
666     return setValue(impl, (const char *)&any_value);
667   }
668 
669   setErrorCodeAbort(4000);
670   return -1;
671 }
672 
673 int
setOptimize(Uint32 options)674 NdbOperation::setOptimize(Uint32 options)
675 {
676   return setValue(&NdbColumnImpl::getImpl(*NdbDictionary::Column::OPTIMIZE),
677                   (const char*)&options);
678 }
679 
680 /* Non-const variant of getBlobHandle - can return existing blob
681  * handles, or create new ones for non-NdbRecord operations
682  */
683 NdbBlob*
getBlobHandle(NdbTransaction * aCon,const NdbColumnImpl * tAttrInfo)684 NdbOperation::getBlobHandle(NdbTransaction* aCon, const NdbColumnImpl* tAttrInfo)
685 {
686   NdbBlob* tBlob = theBlobList;
687   NdbBlob* tLastBlob = NULL;
688   while (tBlob != NULL) {
689     if (tBlob->theColumn == tAttrInfo)
690       return tBlob;
691     tLastBlob = tBlob;
692     tBlob = tBlob->theNext;
693   }
694 
695   /*
696    * For NdbRecord PK, unique index and scan operations, we only fetch existing
697    * blob handles here, creation must be done by requesting the blob in the
698    * NdbRecord and mask when creating the operation.
699    * For NdbRecAttr PK, IK and scan operations, we allow Blob handles
700    * to be created here.  Note that NdbRecAttr PK and unique index ops are handled
701    * differently to NdbRecAttr scan operations.
702    */
703   if (m_attribute_record)
704   {
705     setErrorCodeAbort(4288);
706     return NULL;
707   }
708 
709   /* Check key fully defined for key operations */
710   switch (theStatus)
711   {
712   case TupleKeyDefined:
713   case GetValue:
714   case SetValue:
715   case FinalGetValue:
716   case ExecInterpretedValue:
717   case SetValueInterpreted:
718     /* All ok states to create a Blob Handle in */
719     break;
720   default:
721   {
722     /* Unexpected state to be obtaining Blob handle */
723     /* Invalid usage of blob attribute */
724     setErrorCodeAbort(4264);
725     return NULL;
726   }
727   }
728 
729   tBlob = theNdb->getNdbBlob();
730   if (tBlob == NULL)
731     return NULL;
732   if (tBlob->atPrepare(aCon, this, tAttrInfo) == -1) {
733     theNdb->releaseNdbBlob(tBlob);
734     return NULL;
735   }
736   if (tLastBlob == NULL)
737     theBlobList = tBlob;
738   else
739     tLastBlob->theNext = tBlob;
740   tBlob->theNext = NULL;
741   theNdbCon->theBlobFlag = true;
742   return tBlob;
743 }
744 
745 /* const variant of getBlobHandle - only returns existing blob handles */
746 NdbBlob*
getBlobHandle(NdbTransaction * aCon,const NdbColumnImpl * tAttrInfo) const747 NdbOperation::getBlobHandle(NdbTransaction* aCon, const NdbColumnImpl* tAttrInfo) const
748 {
749   NdbBlob* tBlob = theBlobList;
750   while (tBlob != NULL) {
751     if (tBlob->theColumn == tAttrInfo)
752       return tBlob;
753     tBlob = tBlob->theNext;
754   }
755 
756   /*
757     Const method - cannot create a new BLOB handle, NdbRecord
758     or NdbRecAttr
759   */
760   setErrorCodeAbort(4288);
761   return NULL;
762 }
763 
764 /*
765   This is used to set up a blob handle for an NdbRecord operation.
766 
767   It allocates the NdbBlob object, initialises it, and links it into the
768   operation.
769 
770   There are two cases for how to set up the primary key info:
771     1. Normal primary key or hash index key operations. The keyinfo argument
772        is passed as NULL, and the key value is read from the NdbRecord and
773        row passed from the application.
774     2. Take-over scan operation. The keyinfo argument points to a buffer
775        containing KEYINFO20 data.
776 
777   For a scan operation, there is no key info to set up at prepare time.
778 */
779 NdbBlob *
linkInBlobHandle(NdbTransaction * aCon,const NdbColumnImpl * column,NdbBlob * & lastPtr)780 NdbOperation::linkInBlobHandle(NdbTransaction *aCon,
781                                const NdbColumnImpl *column,
782                                NdbBlob * & lastPtr)
783 {
784   int res;
785 
786   NdbBlob *bh= theNdb->getNdbBlob();
787   if (bh == NULL)
788     return NULL;
789 
790   if (theOperationType == OpenScanRequest ||
791       theOperationType == OpenRangeScanRequest)
792   {
793     res= bh->atPrepareNdbRecordScan(aCon, this, column);
794   }
795   else if (m_key_record == NULL)
796   {
797     /* This means that we have a scan take-over operation, and we should
798        obtain the key from KEYINFO20 data.
799     */
800     res= bh->atPrepareNdbRecordTakeover(aCon, this, column,
801                                         m_key_row, m_keyinfo_length*4);
802   }
803   else
804   {
805     res= bh->atPrepareNdbRecord(aCon, this, column, m_key_record, m_key_row);
806   }
807   if (res == -1)
808   {
809     theNdb->releaseNdbBlob(bh);
810     return NULL;
811   }
812   if (lastPtr)
813     lastPtr->theNext= bh;
814   else
815     theBlobList= bh;
816   lastPtr= bh;
817   bh->theNext= NULL;
818   theNdbCon->theBlobFlag= true;
819 
820   return bh;
821 }
822 
823 /*
824  * Setup blob handles for an NdbRecord operation.
825  *
826  * Create blob handles for all requested blob columns.
827  *
828  * For read request, store the pointers to blob handles in the row.
829  */
830 int
getBlobHandlesNdbRecord(NdbTransaction * aCon,const Uint32 * m_read_mask)831 NdbOperation::getBlobHandlesNdbRecord(NdbTransaction* aCon,
832                                       const Uint32 * m_read_mask)
833 {
834   NdbBlob *lastBlob= NULL;
835 
836   for (Uint32 i= 0; i<m_attribute_record->noOfColumns; i++)
837   {
838     const NdbRecord::Attr *col= &m_attribute_record->columns[i];
839     if (!(col->flags & NdbRecord::IsBlob))
840       continue;
841 
842     Uint32 attrId= col->attrId;
843     if (!BitmaskImpl::get((NDB_MAX_ATTRIBUTES_IN_TABLE+31)>>5,
844                           m_read_mask, attrId))
845       continue;
846 
847     const NdbColumnImpl *tableColumn= m_currentTable->getColumn(attrId);
848     assert(tableColumn != NULL);
849 
850     NdbBlob *bh= linkInBlobHandle(aCon, tableColumn, lastBlob);
851     if (bh == NULL)
852       return -1;
853 
854     if (theOperationType == ReadRequest || theOperationType == ReadExclusive)
855     {
856       /*
857        * For read request, it is safe to cast away const-ness for the
858        * m_attribute_row.
859        */
860       memcpy((char *)&m_attribute_row[col->offset], &bh, sizeof(bh));
861     }
862   }
863 
864   return 0;
865 }
866 
867 /*
868   For a delete, we need to create blob handles for all table blob columns,
869   so that we can be sure to delete all blob parts for the row.
870   If checkReadset is true, we also check that the caller is not asking to
871   read any blobs as part of the delete.
872 */
873 int
getBlobHandlesNdbRecordDelete(NdbTransaction * aCon,bool checkReadSet,const Uint32 * m_read_mask)874 NdbOperation::getBlobHandlesNdbRecordDelete(NdbTransaction* aCon,
875                                             bool checkReadSet,
876                                             const Uint32 * m_read_mask)
877 {
878   NdbBlob *lastBlob= NULL;
879 
880   assert(theOperationType == DeleteRequest);
881 
882   for (Uint32 i= 0; i < m_currentTable->m_columns.size(); i++)
883   {
884     const NdbColumnImpl* c= m_currentTable->m_columns[i];
885     assert(c != 0);
886     if (!c->getBlobType())
887       continue;
888 
889     if (checkReadSet &&
890         (BitmaskImpl::get((NDB_MAX_ATTRIBUTES_IN_TABLE+31)>>5,
891                           m_read_mask, c->m_attrId)))
892     {
893       /* Blobs are not allowed in NdbRecord delete result record */
894       setErrorCodeAbort(4511);
895       return -1;
896     }
897 
898     NdbBlob *bh= linkInBlobHandle(aCon, c, lastBlob);
899     if (bh == NULL)
900       return -1;
901   }
902 
903   return 0;
904 }
905 
906 NdbRecAttr*
getVarValue(const NdbColumnImpl * tAttrInfo,char * aBareValue,Uint16 * aLenLoc)907 NdbOperation::getVarValue(const NdbColumnImpl* tAttrInfo,
908                           char* aBareValue, Uint16* aLenLoc)
909 {
910   NdbRecAttr* ra = getValue(tAttrInfo, aBareValue);
911   if (ra != NULL) {
912     assert(aLenLoc != NULL);
913     ra->m_getVarValue = aLenLoc;
914   }
915   return ra;
916 }
917 
918 int
setVarValue(const NdbColumnImpl * tAttrInfo,const char * aBareValue,const Uint16 & aLen)919 NdbOperation::setVarValue(const NdbColumnImpl* tAttrInfo,
920                           const char* aBareValue, const Uint16& aLen)
921 {
922   DBUG_ENTER("NdbOperation::setVarValue");
923   DBUG_PRINT("info", ("aLen=%u", (Uint32)aLen));
924 
925   // wl3717_todo not optimal..
926   const Uint32 MaxTupleSizeInLongWords= (NDB_MAX_TUPLE_SIZE + 7)/ 8;
927   Uint64 buf[ MaxTupleSizeInLongWords ];
928   assert( aLen < (NDB_MAX_TUPLE_SIZE - 2) );
929   unsigned char* p = (unsigned char*)buf;
930   p[0] = (aLen & 0xff);
931   p[1] = (aLen >> 8);
932   memcpy(&p[2], aBareValue, aLen);
933   if (setValue(tAttrInfo, (char*)buf) == -1)
934     DBUG_RETURN(-1);
935   DBUG_RETURN(0);
936 }
937 
938 /****************************************************************************
939  * int insertATTRINFO( Uint32 aData );
940  *
941  * Return Value:   Return 0 : insertATTRINFO was succesful.
942  *                 Return -1: In all other case.
943  * Parameters:     aData: the data to insert into ATTRINFO.
944  * Remark:         Puts the the data into either TCKEYREQ signal or
945  *                 ATTRINFO signal.
946  *****************************************************************************/
947 int
insertATTRINFO(Uint32 aData)948 NdbOperation::insertATTRINFO( Uint32 aData )
949 {
950   NdbApiSignal* tSignal;
951   register Uint32 tAI_LenInCurrAI = theAI_LenInCurrAI;
952   register Uint32* tAttrPtr = theATTRINFOptr;
953   register Uint32 tTotCurrAILen = theTotalCurrAI_Len;
954 
955   if (tAI_LenInCurrAI >= 25) {
956     Ndb* tNdb = theNdb;
957     NdbApiSignal* tFirstAttrinfo = theFirstATTRINFO;
958     tAI_LenInCurrAI = 3;
959     tSignal = tNdb->getSignal();
960     if (tSignal != NULL) {
961       tSignal->setSignal(m_attrInfoGSN, refToBlock(theNdbCon->m_tcRef));
962       tAttrPtr = &tSignal->getDataPtrSend()[3];
963       if (tFirstAttrinfo == NULL) {
964         tSignal->next(NULL);
965         theFirstATTRINFO = tSignal;
966         theCurrentATTRINFO = tSignal;
967       } else {
968         NdbApiSignal* tCurrentAttrinfoBeforeUpdate = theCurrentATTRINFO;
969         tSignal->next(NULL);
970         theCurrentATTRINFO = tSignal;
971         tCurrentAttrinfoBeforeUpdate->next(tSignal);
972       }//if
973     } else {
974       goto insertATTRINFO_error1;
975     }//if
976   }//if
977   *tAttrPtr = aData;
978   tAttrPtr++;
979   tTotCurrAILen++;
980   tAI_LenInCurrAI++;
981   theTotalCurrAI_Len = tTotCurrAILen;
982   theAI_LenInCurrAI = tAI_LenInCurrAI;
983   theATTRINFOptr = tAttrPtr;
984   return 0;
985 
986 insertATTRINFO_error1:
987   setErrorCodeAbort(4000);
988   return -1;
989 
990 }//NdbOperation::insertATTRINFO()
991 
992 /*****************************************************************************
993  * int insertATTRINFOloop(Uint32* aDataPtr, Uint32 aLength );
994  *
995  * Return Value:  Return 0 : insertATTRINFO was succesful.
996  *                Return -1: In all other case.
997  * Parameters:    aDataPtr: Pointer to the data to insert into ATTRINFO.
998  *                aLength: Length of data to be copied
999  * Remark:        Puts the the data into either TCKEYREQ signal or
1000  *                ATTRINFO signal.
1001  *****************************************************************************/
1002 int
insertATTRINFOloop(register const Uint32 * aDataPtr,register Uint32 aLength)1003 NdbOperation::insertATTRINFOloop(register const Uint32* aDataPtr,
1004 				 register Uint32 aLength)
1005 {
1006   NdbApiSignal* tSignal;
1007   register Uint32 tAI_LenInCurrAI = theAI_LenInCurrAI;
1008   register Uint32 tTotCurrAILen = theTotalCurrAI_Len;
1009   register Uint32* tAttrPtr = theATTRINFOptr;
1010   Ndb* tNdb = theNdb;
1011 
1012   while (aLength > 0) {
1013     if (tAI_LenInCurrAI >= 25) {
1014       NdbApiSignal* tFirstAttrinfo = theFirstATTRINFO;
1015       tAI_LenInCurrAI = 3;
1016       tSignal = tNdb->getSignal();
1017       if (tSignal != NULL) {
1018         tSignal->setSignal(m_attrInfoGSN, refToBlock(theNdbCon->m_tcRef));
1019         tAttrPtr = &tSignal->getDataPtrSend()[3];
1020         if (tFirstAttrinfo == NULL) {
1021           tSignal->next(NULL);
1022           theFirstATTRINFO = tSignal;
1023           theCurrentATTRINFO = tSignal;
1024         } else {
1025           NdbApiSignal* tCurrentAttrinfoBeforeUpdate = theCurrentATTRINFO;
1026           tSignal->next(NULL);
1027           theCurrentATTRINFO = tSignal;
1028           tCurrentAttrinfoBeforeUpdate->next(tSignal);
1029         }//if
1030       } else {
1031         goto insertATTRINFO_error1;
1032       }//if
1033     }//if
1034     {
1035       register Uint32 tData = *aDataPtr;
1036       aDataPtr++;
1037       aLength--;
1038       tAI_LenInCurrAI++;
1039       *tAttrPtr = tData;
1040       tAttrPtr++;
1041       tTotCurrAILen++;
1042     }
1043   }//while
1044   theATTRINFOptr = tAttrPtr;
1045   theTotalCurrAI_Len = tTotCurrAILen;
1046   theAI_LenInCurrAI = tAI_LenInCurrAI;
1047   return 0;
1048 
1049 insertATTRINFO_error1:
1050   setErrorCodeAbort(4000);
1051   return -1;
1052 
1053 }//NdbOperation::insertATTRINFOloop()
1054 
1055 NdbOperation::AbortOption
getAbortOption() const1056 NdbOperation::getAbortOption() const
1057 {
1058   return (AbortOption)m_abortOption;
1059 }
1060 
1061 int
setAbortOption(AbortOption ao)1062 NdbOperation::setAbortOption(AbortOption ao)
1063 {
1064   if (theStatus == UseNdbRecord)
1065   {
1066     /* Method not allowed for NdbRecord, use OperationOptions or
1067        ScanOptions structure instead */
1068     setErrorCodeAbort(4515);
1069     return -1;
1070   }
1071 
1072   switch(ao)
1073   {
1074     case AO_IgnoreError:
1075     case AbortOnError:
1076       m_abortOption= ao;
1077       return 0;
1078     default:
1079       return -1;
1080   }
1081 }
1082 
1083 
1084 int
prepareGetLockHandleNdbRecord()1085 NdbOperation::prepareGetLockHandleNdbRecord()
1086 {
1087   /* This method is used to perform the correct actions
1088    * when the OO_LOCKHANDLE flag is set on an NdbRecord
1089    * operation.
1090    */
1091   assert(theLockHandle == NULL);
1092   theLockHandle = theNdbCon->getLockHandle();
1093   if (!theLockHandle)
1094   {
1095     return 4000; /* Memory allocation issue */
1096   }
1097 
1098   assert(! theLockHandle->isLockRefValid());
1099 
1100   assert(m_attribute_record);
1101   theLockHandle->m_table = m_attribute_record->table;
1102   assert(theLockHandle->m_table);
1103 
1104   NdbRecAttr* ra =
1105     getValue_NdbRecord(&NdbColumnImpl::getImpl(*NdbDictionary::Column::LOCK_REF),
1106                        (char*) &theLockHandle->m_lockRef);
1107 
1108   if (!ra)
1109   {
1110     /* Assume error code set */
1111     assert(theError.code);
1112     return theError.code;
1113   }
1114 
1115   theLockHandle->m_state = NdbLockHandle::PREPARED;
1116 
1117   return 0;
1118 }
1119 
1120 /*
1121  * handleOperationOptions
1122  * static member for setting operation options
1123  * Called when defining operations, from NdbTransaction and
1124  * NdbScanOperation
1125  */
1126 int
handleOperationOptions(const OperationType type,const OperationOptions * opts,const Uint32 sizeOfOptions,NdbOperation * op)1127 NdbOperation::handleOperationOptions (const OperationType type,
1128                                       const OperationOptions *opts,
1129                                       const Uint32 sizeOfOptions,
1130                                       NdbOperation *op)
1131 {
1132   /* Check options size for versioning... */
1133   if (unlikely((sizeOfOptions != 0) &&
1134                (sizeOfOptions != sizeof(OperationOptions))))
1135   {
1136     // Handle different sized OperationOptions
1137     // Probably smaller is old version, larger is new version.
1138 
1139     // No other versions currently supported
1140     // Invalid or unsupported OperationOptions structure
1141     return 4297;
1142   }
1143 
1144   bool isScanTakeoverOp = (op->m_key_record == NULL);
1145 
1146   if (opts->optionsPresent & OperationOptions::OO_ABORTOPTION)
1147   {
1148     /* User defined operation abortoption : Allowed for
1149      * any operation
1150      */
1151     switch (opts->abortOption)
1152     {
1153     case AO_IgnoreError:
1154     case AbortOnError:
1155     {
1156       op->m_abortOption=opts->abortOption;
1157       break;
1158     }
1159     default:
1160       // Non-specific abortoption
1161       // Invalid AbortOption
1162       return 4296;
1163     }
1164   }
1165 
1166   if ((opts->optionsPresent & OperationOptions::OO_GETVALUE) &&
1167       (opts->numExtraGetValues > 0))
1168   {
1169     if (opts->extraGetValues == NULL)
1170     {
1171       // Incorrect combination of OperationOptions optionsPresent,
1172       // extraGet/SetValues ptr and numExtraGet/SetValues
1173       return 4512;
1174     }
1175 
1176     // Only certain operation types allow extra GetValues
1177     // Update could be made to support it in future
1178     if (type == ReadRequest ||
1179         type == ReadExclusive ||
1180         type == DeleteRequest)
1181     {
1182       // Could be readTuple(), or lockCurrentTuple().
1183       // We perform old-school NdbRecAttr reads on
1184       // these values.
1185       for (unsigned int i=0; i < opts->numExtraGetValues; i++)
1186       {
1187         GetValueSpec *pvalSpec
1188           = &(opts->extraGetValues[i]);
1189 
1190         pvalSpec->recAttr=NULL;
1191 
1192         if (pvalSpec->column == NULL)
1193         {
1194           // Column is NULL in Get/SetValueSpec structure
1195           return 4295;
1196         }
1197 
1198         NdbRecAttr *pra=
1199           op->getValue_NdbRecord(&NdbColumnImpl::getImpl(*pvalSpec->column),
1200                                  (char *) pvalSpec->appStorage);
1201 
1202         if (pra == NULL)
1203         {
1204           return -1;
1205         }
1206 
1207         pvalSpec->recAttr = pra;
1208       }
1209     }
1210     else
1211     {
1212       // Bad operation type for GetValue
1213       switch (type)
1214       {
1215       case WriteRequest :
1216       case UpdateRequest :
1217       {
1218         return 4502;
1219         // GetValue not allowed in Update operation
1220       }
1221       case InsertRequest :
1222       {
1223         return 4503;
1224         // GetValue not allowed in Insert operation
1225       }
1226       default :
1227         return 4118;
1228         // Parameter error in API call
1229       }
1230     }
1231   }
1232 
1233   if ((opts->optionsPresent & OperationOptions::OO_SETVALUE) &&
1234       (opts->numExtraSetValues > 0))
1235   {
1236     if (opts->extraSetValues == NULL)
1237     {
1238       // Incorrect combination of OperationOptions optionsPresent,
1239       // extraGet/SetValues ptr and numExtraGet/SetValues
1240       return 4512;
1241     }
1242 
1243     if ((type == InsertRequest) ||
1244         (type == UpdateRequest) ||
1245         (type == WriteRequest))
1246     {
1247       /* Could be insert/update/writeTuple() or
1248        * updateCurrentTuple()
1249        */
1250       // Validate SetValuesSpec
1251       for (Uint32 i=0; i< opts->numExtraSetValues; i++)
1252       {
1253         const NdbDictionary::Column *pcol=opts->extraSetValues[i].column;
1254         const void *pvalue=opts->extraSetValues[i].value;
1255 
1256         if (pcol == NULL)
1257         {
1258           // Column is NULL in Get/SetValueSpec structure
1259           return 4295;
1260         }
1261 
1262         if (type == UpdateRequest && pcol->getPrimaryKey())
1263         {
1264           // It is not possible to update a primary key column.
1265           // It can be set like this for insert and write (but it
1266           // still needs to be included in the key NdbRecord and row).
1267           return 4202;
1268         }
1269 
1270         if (pvalue == NULL)
1271         {
1272           if (!pcol->getNullable())
1273           {
1274             // Trying to set a NOT NULL attribute to NULL
1275             return 4203;
1276           }
1277         }
1278 
1279         NdbDictionary::Column::Type colType=pcol->getType();
1280 
1281         if ((colType == NdbDictionary::Column::Blob) ||
1282             (colType == NdbDictionary::Column::Text))
1283         {
1284           // Invalid usage of blob attribute
1285           return 4264;
1286         }
1287       }
1288 
1289       // Store details of extra set values for later
1290       op->m_extraSetValues = opts->extraSetValues;
1291       op->m_numExtraSetValues = opts->numExtraSetValues;
1292     }
1293     else
1294     {
1295       // Set value and Read/Delete etc is incompatible
1296       return 4204;
1297     }
1298   }
1299 
1300   if (opts->optionsPresent & OperationOptions::OO_PARTITION_ID)
1301   {
1302     /* Should not have any blobs defined at this stage */
1303     assert(op->theBlobList == NULL);
1304 
1305     /* Not allowed for scan takeover ops */
1306     if (unlikely(isScanTakeoverOp))
1307     {
1308       return 4510;
1309       /* User-specified partition id not allowed for scan
1310        * takeover operation
1311        */
1312     }
1313     /* Only allowed for pk ops on user defined partitioned tables
1314      * or when defining an unlock operation
1315      */
1316     if (unlikely( ! (((op->m_attribute_record->flags &
1317                        NdbRecord::RecHasUserDefinedPartitioning) &&
1318                       (op->m_key_record->table->m_index == NULL)) ||
1319                      (type == UnlockRequest))))
1320     {
1321       /* Explicit partitioning info not allowed for table and operation*/
1322       return 4546;
1323     }
1324     op->theDistributionKey=opts->partitionId;
1325     op->theDistrKeyIndicator_= 1;
1326   }
1327 
1328   if (opts->optionsPresent & OperationOptions::OO_INTERPRETED)
1329   {
1330     /* Check the operation type is valid */
1331     if (! ((type == ReadRequest)   ||
1332            (type == ReadExclusive) ||
1333            (type == UpdateRequest) ||
1334            (type == DeleteRequest)))
1335       /* NdbInterpretedCode not supported for operation type */
1336       return 4539;
1337 
1338     /* Check the program's for the same table as the
1339      * operation, within a major version number
1340      * Perhaps NdbInterpretedCode should not contain the table
1341      */
1342     const NdbDictionary::Table* codeTable= opts->interpretedCode->getTable();
1343     if (codeTable != NULL)
1344     {
1345       NdbTableImpl* impl= &NdbTableImpl::getImpl(*codeTable);
1346 
1347       if ((impl->m_id != (int) op->m_attribute_record->tableId) ||
1348           (table_version_major(impl->m_version) !=
1349            table_version_major(op->m_attribute_record->tableVersion)))
1350         return 4524; // NdbInterpretedCode is for different table`
1351     }
1352 
1353     /* Check the program's finalised */
1354     if ((opts->interpretedCode->m_flags &
1355          NdbInterpretedCode::Finalised) == 0)
1356       return 4519; // NdbInterpretedCode::finalise() not called.
1357 
1358     op->m_interpreted_code = opts->interpretedCode;
1359   }
1360 
1361   if (opts->optionsPresent & OperationOptions::OO_ANYVALUE)
1362   {
1363     /* Any operation can have an ANYVALUE set */
1364     op->m_any_value = opts->anyValue;
1365     op->m_flags |= OF_USE_ANY_VALUE;
1366   }
1367 
1368   if (opts->optionsPresent & OperationOptions::OO_CUSTOMDATA)
1369   {
1370     /* Set the operation's customData ptr */
1371     op->m_customData = opts->customData;
1372   }
1373 
1374   if (opts->optionsPresent & OperationOptions::OO_LOCKHANDLE)
1375   {
1376     if (unlikely(op->theNdb->getMinDbNodeVersion() <
1377                  NDBD_UNLOCK_OP_SUPPORTED))
1378     {
1379       /* Function not implemented yet */
1380       return 4003;
1381     }
1382 
1383     /* Check that this is a pk read with a lock
1384      * No need to worry about Blob lock upgrade issues as
1385      * Blobs have not been handled at this stage
1386      */
1387     if (((type != ReadRequest) &&
1388          (type != ReadExclusive)) ||
1389         (op->m_key_record &&
1390          (op->m_key_record->flags & NdbRecord::RecIsIndex)) ||
1391         ((op->theLockMode != LM_Read) &&
1392          (op->theLockMode != LM_Exclusive)))
1393     {
1394       return 4549; /* getLockHandle only supported for primary key read with a lock */
1395     }
1396 
1397     int prepareRc = op->prepareGetLockHandleNdbRecord();
1398     if (prepareRc != 0)
1399     {
1400       return prepareRc;
1401     }
1402   }
1403 
1404   if (opts->optionsPresent & OperationOptions::OO_QUEUABLE)
1405   {
1406     op->m_flags |= OF_QUEUEABLE;
1407   }
1408 
1409   if (opts->optionsPresent & OperationOptions::OO_NOT_QUEUABLE)
1410   {
1411     op->m_flags &= ~Uint8(OF_QUEUEABLE);
1412   }
1413 
1414   if (opts->optionsPresent & OperationOptions::OO_DEFERRED_CONSTAINTS)
1415   {
1416     op->m_flags |= OF_DEFERRED_CONSTRAINTS;
1417   }
1418 
1419   if (opts->optionsPresent & OperationOptions::OO_DISABLE_FK)
1420   {
1421     op->m_flags |= OF_DISABLE_FK;
1422   }
1423 
1424   return 0;
1425 }
1426