1 /*****************************************************************************
2 
3 Copyright (c) 2014, 2021, Oracle and/or its affiliates.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /**************************************************//**
28 @file lock/lock0prdt.cc
29 The transaction lock system
30 
31 Created 9/7/2013 Jimmy Yang
32 *******************************************************/
33 
34 #define LOCK_MODULE_IMPLEMENTATION
35 
36 #include "lock0lock.h"
37 #include "lock0priv.h"
38 #include "lock0prdt.h"
39 
40 #ifdef UNIV_NONINL
41 #include "lock0lock.ic"
42 #include "lock0priv.ic"
43 #include "lock0prdt.ic"
44 #endif
45 
46 #include "ha_prototypes.h"
47 #include "usr0sess.h"
48 #include "trx0purge.h"
49 #include "dict0mem.h"
50 #include "dict0boot.h"
51 #include "trx0sys.h"
52 #include "srv0mon.h"
53 #include "ut0vec.h"
54 #include "btr0btr.h"
55 #include "dict0boot.h"
56 #include <set>
57 
58 /*********************************************************************//**
59 Get a minimum bounding box from a Predicate
60 @return	the minimum bounding box */
61 UNIV_INLINE
62 rtr_mbr_t*
prdt_get_mbr_from_prdt(const lock_prdt_t * prdt)63 prdt_get_mbr_from_prdt(
64 /*===================*/
65 	const lock_prdt_t*	prdt)	/*!< in: the lock predicate */
66 {
67 	rtr_mbr_t*	mbr_loc = reinterpret_cast<rtr_mbr_t*>(prdt->data);
68 
69 	return(mbr_loc);
70 }
71 
72 /*********************************************************************//**
73 Get a predicate from a lock
74 @return	the predicate */
75 lock_prdt_t*
lock_get_prdt_from_lock(const lock_t * lock)76 lock_get_prdt_from_lock(
77 /*====================*/
78 	const lock_t*	lock)	/*!< in: the lock */
79 {
80 	lock_prdt_t*	prdt = reinterpret_cast<lock_prdt_t*>(
81 				&((reinterpret_cast<byte*>(
82 					const_cast<lock_t*>(&lock[1])))[
83 						UNIV_WORD_SIZE]));
84 
85 	return(prdt);
86 }
87 
88 /*********************************************************************//**
89 Get a minimum bounding box directly from a lock
90 @return	the minimum bounding box*/
91 UNIV_INLINE
92 rtr_mbr_t*
lock_prdt_get_mbr_from_lock(const lock_t * lock)93 lock_prdt_get_mbr_from_lock(
94 /*========================*/
95 	const lock_t*	lock)	/*!< in: the lock */
96 {
97 	ut_ad(lock->type_mode & LOCK_PREDICATE);
98 
99 	lock_prdt_t*	prdt = lock_get_prdt_from_lock(lock);
100 
101 	rtr_mbr_t*	mbr_loc = prdt_get_mbr_from_prdt(prdt);
102 
103 	return(mbr_loc);
104 }
105 
106 /*********************************************************************//**
107 Append a predicate to the lock */
108 void
lock_prdt_set_prdt(lock_t * lock,const lock_prdt_t * prdt)109 lock_prdt_set_prdt(
110 /*===============*/
111 	lock_t*			lock,	/*!< in: lock */
112 	const lock_prdt_t*	prdt)	/*!< in: Predicate */
113 {
114 	ut_ad(lock->type_mode & LOCK_PREDICATE);
115 
116 	memcpy(&(((byte*) &lock[1])[UNIV_WORD_SIZE]), prdt, sizeof *prdt);
117 }
118 
119 /*********************************************************************//**
120 Checks if a predicate lock request for a new lock has to wait for
121 another lock.
122 @return	true if new lock has to wait for lock2 to be released */
123 bool
lock_prdt_has_to_wait(const trx_t * trx,ulint type_mode,lock_prdt_t * prdt,const lock_t * lock2)124 lock_prdt_has_to_wait(
125 /*==================*/
126 	const trx_t*	trx,	/*!< in: trx of new lock */
127 	ulint		type_mode,/*!< in: precise mode of the new lock
128 				to set: LOCK_S or LOCK_X, possibly
129 				ORed to LOCK_PREDICATE or LOCK_PRDT_PAGE,
130 				LOCK_INSERT_INTENTION */
131 	lock_prdt_t*	prdt,	/*!< in: lock predicate to check */
132 	const lock_t*	lock2)	/*!< in: another record lock; NOTE that
133 				it is assumed that this has a lock bit
134 				set on the same record as in the new
135 				lock we are setting */
136 {
137 	lock_prdt_t*	cur_prdt = lock_get_prdt_from_lock(lock2);
138 
139 	ut_ad(trx && lock2);
140 	ut_ad((lock2->type_mode & LOCK_PREDICATE && type_mode & LOCK_PREDICATE)
141 	      || (lock2->type_mode & LOCK_PRDT_PAGE
142 		  && type_mode & LOCK_PRDT_PAGE));
143 
144 	ut_ad(type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE));
145 
146 	if (trx != lock2->trx
147 	    && !lock_mode_compatible(static_cast<lock_mode>(
148 			             LOCK_MODE_MASK & type_mode),
149 				     lock_get_mode(lock2))) {
150 
151 		/* If it is a page lock, then return true (conflict) */
152 		if (type_mode & LOCK_PRDT_PAGE) {
153 			ut_ad(lock2->type_mode & LOCK_PRDT_PAGE);
154 
155 			return(true);
156 		}
157 
158 		/* Predicate lock does not conflicts with non-predicate lock */
159 		if (!(lock2->type_mode & LOCK_PREDICATE)) {
160 			return(FALSE);
161 		}
162 
163 		ut_ad(lock2->type_mode & LOCK_PREDICATE);
164 
165 		if (!(type_mode & LOCK_INSERT_INTENTION)) {
166 			/* PREDICATE locks without LOCK_INSERT_INTENTION flag
167 			do not need to wait for anything. This is because
168 			different users can have conflicting lock types
169 			on predicates. */
170 
171 			return(FALSE);
172 		}
173 
174 		if (lock2->type_mode & LOCK_INSERT_INTENTION) {
175 
176 			/* No lock request needs to wait for an insert
177 			intention lock to be removed. This makes it similar
178 			to GAP lock, that allows conflicting insert intention
179 			locks */
180 			return(FALSE);
181 		}
182 
183 		if (!lock_prdt_consistent(cur_prdt, prdt, 0)) {
184 			return(false);
185 		}
186 
187 		return(TRUE);
188 	}
189 
190 	return(FALSE);
191 }
192 
193 /*********************************************************************//**
194 Checks if a transaction has a GRANTED stronger or equal predicate lock
195 on the page
196 @return	lock or NULL */
197 UNIV_INLINE
198 lock_t*
lock_prdt_has_lock(ulint precise_mode,ulint type_mode,const buf_block_t * block,lock_prdt_t * prdt,const trx_t * trx)199 lock_prdt_has_lock(
200 /*===============*/
201 	ulint			precise_mode,	/*!< in: LOCK_S or LOCK_X */
202 	ulint			type_mode,	/*!< in: LOCK_PREDICATE etc. */
203 	const buf_block_t*	block,		/*!< in: buffer block
204 						containing the record */
205 	lock_prdt_t*		prdt,		/*!< in: The predicate to be
206 						attached to the new lock */
207 	const trx_t*		trx)		/*!< in: transaction */
208 {
209 	lock_t*		lock;
210 
211 	ut_ad(lock_mutex_own());
212 	ut_ad((precise_mode & LOCK_MODE_MASK) == LOCK_S
213 	      || (precise_mode & LOCK_MODE_MASK) == LOCK_X);
214 	ut_ad(!(precise_mode & LOCK_INSERT_INTENTION));
215 
216 	for (lock = lock_rec_get_first(
217 		lock_hash_get(type_mode), block, PRDT_HEAPNO);
218 	     lock != NULL;
219 	     lock = lock_rec_get_next(PRDT_HEAPNO, lock)) {
220 		ut_ad(lock->type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE));
221 
222 		if (lock->trx == trx
223 		    && !(lock->type_mode & LOCK_INSERT_INTENTION)
224 		    && !lock_get_wait(lock)
225 		    && lock_mode_stronger_or_eq(
226 			    lock_get_mode(lock),
227 			    static_cast<lock_mode>(
228 				    precise_mode & LOCK_MODE_MASK))) {
229 			if (lock->type_mode & LOCK_PRDT_PAGE) {
230 				return(lock);
231 			}
232 
233 			ut_ad(lock->type_mode & LOCK_PREDICATE);
234 			lock_prdt_t*	cur_prdt = lock_get_prdt_from_lock(
235 							lock);
236 
237 			/* if the lock predicate operator is the same
238 			as the one to look, and prdicate test is successful,
239 			then we find a lock */
240 			if (cur_prdt->op == prdt->op
241 			    && lock_prdt_consistent(cur_prdt, prdt, 0)) {
242 
243 				return(lock);
244 			}
245 		}
246 	}
247 
248 	return(NULL);
249 }
250 
251 /*********************************************************************//**
252 Checks if some other transaction has a conflicting predicate
253 lock request in the queue, so that we have to wait.
254 @return	lock or NULL */
255 static
256 const lock_t*
lock_prdt_other_has_conflicting(ulint mode,const buf_block_t * block,lock_prdt_t * prdt,const trx_t * trx)257 lock_prdt_other_has_conflicting(
258 /*============================*/
259 	ulint			mode,	/*!< in: LOCK_S or LOCK_X,
260 					possibly ORed to LOCK_PREDICATE or
261 					LOCK_PRDT_PAGE, LOCK_INSERT_INTENTION */
262 	const buf_block_t*	block,	/*!< in: buffer block containing
263 					the record */
264 	lock_prdt_t*		prdt,    /*!< in: Predicates (currently)
265 					the Minimum Bounding Rectangle)
266 					the new lock will be on */
267 	const trx_t*		trx)	/*!< in: our transaction */
268 {
269 	ut_ad(lock_mutex_own());
270 
271 	for (const lock_t* lock = lock_rec_get_first(
272 		lock_hash_get(mode), block, PRDT_HEAPNO);
273 	     lock != NULL;
274 	     lock = lock_rec_get_next_const(PRDT_HEAPNO, lock)) {
275 
276 		if (lock->trx == trx) {
277 			continue;
278 		}
279 
280 		if (lock_prdt_has_to_wait(trx, mode, prdt, lock)) {
281 			return(lock);
282 		}
283 	}
284 
285 	return(NULL);
286 }
287 
288 /*********************************************************************//**
289 Reset the Minimum Bounding Rectangle (to a large area) */
290 static
291 void
lock_prdt_enlarge_mbr(const lock_t * lock,rtr_mbr_t * mbr)292 lock_prdt_enlarge_mbr(
293 /*==================*/
294 	const lock_t*	lock,	/*!< in/out: lock to modify */
295 	rtr_mbr_t*	mbr)    /*!< in: Minimum Bounding Rectangle */
296 {
297 	rtr_mbr_t*	cur_mbr = lock_prdt_get_mbr_from_lock(lock);
298 
299 	if (cur_mbr->xmin > mbr->xmin) {
300 		cur_mbr->xmin = mbr->xmin;
301 	}
302 
303 	if (cur_mbr->ymin > mbr->ymin) {
304 		cur_mbr->ymin = mbr->ymin;
305 	}
306 
307 	if (cur_mbr->xmax < mbr->xmax) {
308 		cur_mbr->xmax = mbr->xmax;
309 	}
310 
311 	if (cur_mbr->ymax < mbr->ymax) {
312 		cur_mbr->ymax = mbr->ymax;
313 	}
314 }
315 
316 /*********************************************************************//**
317 Reset the predicates to a "covering" (larger) predicates */
318 static
319 void
lock_prdt_enlarge_prdt(lock_t * lock,lock_prdt_t * prdt)320 lock_prdt_enlarge_prdt(
321 /*===================*/
322 	lock_t*		lock,	/*!< in/out: lock to modify */
323 	lock_prdt_t*	prdt)	/*!< in: predicate */
324 {
325 	rtr_mbr_t*	mbr = prdt_get_mbr_from_prdt(prdt);
326 
327 	lock_prdt_enlarge_mbr(lock, mbr);
328 }
329 
330 /*********************************************************************//**
331 Check two predicates' MBRs are the same
332 @return	true if they are the same */
333 static
334 bool
lock_prdt_is_same(lock_prdt_t * prdt1,lock_prdt_t * prdt2)335 lock_prdt_is_same(
336 /*==============*/
337 	lock_prdt_t*	prdt1,		/*!< in: MBR with the lock */
338 	lock_prdt_t*	prdt2)		/*!< in: MBR with the lock */
339 {
340 	rtr_mbr_t*	mbr1 = prdt_get_mbr_from_prdt(prdt1);
341 	rtr_mbr_t*	mbr2 = prdt_get_mbr_from_prdt(prdt2);
342 
343 	if (prdt1->op == prdt2->op && MBR_EQUAL_CMP(mbr1, mbr2)) {
344 		return(true);
345 	}
346 
347 	return(false);
348 }
349 
350 /*********************************************************************//**
351 Looks for a similar predicate lock struct by the same trx on the same page.
352 This can be used to save space when a new record lock should be set on a page:
353 no new struct is needed, if a suitable old one is found.
354 @return	lock or NULL */
355 static
356 lock_t*
lock_prdt_find_on_page(ulint type_mode,const buf_block_t * block,lock_prdt_t * prdt,const trx_t * trx)357 lock_prdt_find_on_page(
358 /*===================*/
359 	ulint			type_mode,	/*!< in: lock type_mode field */
360 	const buf_block_t*	block,		/*!< in: buffer block */
361 	lock_prdt_t*		prdt,		/*!< in: MBR with the lock */
362 	const trx_t*		trx)		/*!< in: transaction */
363 {
364 	lock_t*	lock;
365 
366 	ut_ad(lock_mutex_own());
367 
368 	for (lock = lock_rec_get_first_on_page(lock_hash_get(type_mode), block);
369 	     lock != NULL;
370 	     lock = lock_rec_get_next_on_page(lock)) {
371 
372 		if (lock->trx == trx
373 		    && lock->type_mode == type_mode) {
374 			if (lock->type_mode & LOCK_PRDT_PAGE) {
375 				return(lock);
376 			}
377 
378 			ut_ad(lock->type_mode & LOCK_PREDICATE);
379 
380 			if (lock_prdt_is_same(lock_get_prdt_from_lock(lock),
381 					      prdt)) {
382 				return(lock);
383 			}
384 		}
385 	}
386 
387 	return(NULL);
388 }
389 
390 /*********************************************************************//**
391 Adds a predicate lock request in the predicate lock queue.
392 @return	lock where the bit was set */
393 static
394 lock_t*
lock_prdt_add_to_queue(ulint type_mode,const buf_block_t * block,dict_index_t * index,trx_t * trx,lock_prdt_t * prdt,bool caller_owns_trx_mutex)395 lock_prdt_add_to_queue(
396 /*===================*/
397 	ulint			type_mode,/*!< in: lock mode, wait, predicate
398 					etc. flags; type is ignored
399 					and replaced by LOCK_REC */
400 	const buf_block_t*	block,	/*!< in: buffer block containing
401 					the record */
402 	dict_index_t*		index,	/*!< in: index of record */
403 	trx_t*			trx,	/*!< in/out: transaction */
404 	lock_prdt_t*		prdt,	/*!< in: Minimum Bounding Rectangle
405 					the new lock will be on */
406 	bool			caller_owns_trx_mutex)
407 					/*!< in: TRUE if caller owns the
408 					transaction mutex */
409 {
410 	ut_ad(lock_mutex_own());
411 	ut_ad(caller_owns_trx_mutex == trx_mutex_own(trx));
412 	ut_ad(!dict_index_is_clust(index) && !dict_index_is_online_ddl(index));
413 	ut_ad(type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE));
414 
415 #ifdef UNIV_DEBUG
416 	switch (type_mode & LOCK_MODE_MASK) {
417 	case LOCK_X:
418 	case LOCK_S:
419 		break;
420 	default:
421 		ut_error;
422 	}
423 #endif /* UNIV_DEBUG */
424 
425 	type_mode |= LOCK_REC;
426 
427 	/* Look for a waiting lock request on the same record or on a gap */
428 
429 	lock_t*		lock;
430 
431 	for (lock = lock_rec_get_first_on_page(lock_hash_get(type_mode), block);
432 	     lock != NULL;
433 	     lock = lock_rec_get_next_on_page(lock)) {
434 
435 		if (lock_get_wait(lock)
436 		    && lock_rec_get_nth_bit(lock, PRDT_HEAPNO)
437 		    && lock->type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)) {
438 
439 			break;
440 		}
441 	}
442 
443 	if (lock == NULL && !(type_mode & LOCK_WAIT)) {
444 
445 		/* Look for a similar record lock on the same page:
446 		if one is found and there are no waiting lock requests,
447 		we can just set the bit */
448 
449 		lock = lock_prdt_find_on_page(type_mode, block, prdt, trx);
450 
451 		if (lock != NULL) {
452 
453 			if (lock->type_mode & LOCK_PREDICATE) {
454 				lock_prdt_enlarge_prdt(lock, prdt);
455 			}
456 
457 			return(lock);
458 		}
459 	}
460 
461 	RecLock	rec_lock(index, block, PRDT_HEAPNO, type_mode);
462 
463 	return(rec_lock.create(trx, caller_owns_trx_mutex, true, prdt));
464 }
465 
466 /*********************************************************************//**
467 Checks if locks of other transactions prevent an immediate insert of
468 a predicate record.
469 @return	DB_SUCCESS, DB_LOCK_WAIT, DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */
470 dberr_t
lock_prdt_insert_check_and_lock(ulint flags,const rec_t * rec,buf_block_t * block,dict_index_t * index,que_thr_t * thr,mtr_t * mtr,lock_prdt_t * prdt)471 lock_prdt_insert_check_and_lock(
472 /*============================*/
473 	ulint		flags,	/*!< in: if BTR_NO_LOCKING_FLAG bit is
474 				set, does nothing */
475 	const rec_t*	rec,	/*!< in: record after which to insert */
476 	buf_block_t*	block,	/*!< in/out: buffer block of rec */
477 	dict_index_t*	index,	/*!< in: index */
478 	que_thr_t*	thr,	/*!< in: query thread */
479 	mtr_t*		mtr,	/*!< in/out: mini-transaction */
480 	lock_prdt_t*	prdt)	/*!< in: Predicates with Minimum Bound
481 				Rectangle */
482 {
483 	ut_ad(block->frame == page_align(rec));
484 
485 	if (flags & BTR_NO_LOCKING_FLAG) {
486 
487 		return(DB_SUCCESS);
488 	}
489 
490 	ut_ad(!dict_table_is_temporary(index->table));
491 	ut_ad(!dict_index_is_clust(index));
492 
493 	trx_t*	trx = thr_get_trx(thr);
494 
495 	lock_mutex_enter();
496 
497 	/* Because this code is invoked for a running transaction by
498 	the thread that is serving the transaction, it is not necessary
499 	to hold trx->mutex here. */
500 
501 	ut_ad(lock_table_has(trx, index->table, LOCK_IX));
502 
503 	lock_t*		lock;
504 
505 	/* Only need to check locks on prdt_hash */
506 	lock = lock_rec_get_first(lock_sys->prdt_hash, block, PRDT_HEAPNO);
507 
508 	if (lock == NULL) {
509 		lock_mutex_exit();
510 
511 		/* Update the page max trx id field */
512 		page_update_max_trx_id(block, buf_block_get_page_zip(block),
513 				       trx->id, mtr);
514 
515 		return(DB_SUCCESS);
516 	}
517 
518 	ut_ad(lock->type_mode & LOCK_PREDICATE);
519 
520 	dberr_t		err;
521 
522 	/* If another transaction has an explicit lock request which locks
523 	the predicate, waiting or granted, on the successor, the insert
524 	has to wait.
525 
526 	Similar to GAP lock, we do not consider lock from inserts conflicts
527 	with each other */
528 
529 	const ulint	mode = LOCK_X | LOCK_PREDICATE | LOCK_INSERT_INTENTION;
530 
531 	const lock_t*	wait_for = lock_prdt_other_has_conflicting(
532 		mode, block, prdt, trx);
533 
534 	if (wait_for != NULL) {
535 		rtr_mbr_t*	mbr = prdt_get_mbr_from_prdt(prdt);
536 
537 		/* Allocate MBR on the lock heap */
538 		lock_init_prdt_from_mbr(prdt, mbr, 0, trx->lock.lock_heap);
539 
540 		RecLock	rec_lock(thr, index, block, PRDT_HEAPNO, mode);
541 
542 		/* Note that we may get DB_SUCCESS also here! */
543 
544 		trx_mutex_enter(trx);
545 
546 		err = rec_lock.add_to_waitq(wait_for, prdt);
547 
548 		trx_mutex_exit(trx);
549 
550 	} else {
551 		err = DB_SUCCESS;
552 	}
553 
554 	lock_mutex_exit();
555 
556 	switch (err) {
557 	case DB_SUCCESS_LOCKED_REC:
558 		err = DB_SUCCESS;
559 		/* fall through */
560 	case DB_SUCCESS:
561 		/* Update the page max trx id field */
562 		page_update_max_trx_id(block,
563 				       buf_block_get_page_zip(block),
564 				       trx->id, mtr);
565 	default:
566 		/* We only care about the two return values. */
567 		break;
568 	}
569 
570 	return(err);
571 }
572 
573 /**************************************************************//**
574 Check whether any predicate lock in parent needs to propagate to
575 child page after split. */
576 void
lock_prdt_update_parent(buf_block_t * left_block,buf_block_t * right_block,lock_prdt_t * left_prdt,lock_prdt_t * right_prdt,lock_prdt_t * parent_prdt,ulint space,ulint page_no)577 lock_prdt_update_parent(
578 /*====================*/
579         buf_block_t*    left_block,	/*!< in/out: page to be split */
580         buf_block_t*    right_block,	/*!< in/out: the new half page */
581         lock_prdt_t*	left_prdt,	/*!< in: MBR on the old page */
582         lock_prdt_t*	right_prdt,	/*!< in: MBR on the new page */
583 	lock_prdt_t*	parent_prdt,	/*!< in: original parent MBR */
584 	ulint		space,		/*!< in: parent space id */
585 	ulint		page_no)	/*!< in: parent page number */
586 {
587 	lock_t*		lock;
588 
589 	lock_mutex_enter();
590 
591 	/* Get all locks in parent */
592 	for (lock = lock_rec_get_first_on_page_addr(
593 			lock_sys->prdt_hash, space, page_no);
594 	     lock;
595 	     lock = lock_rec_get_next_on_page(lock)) {
596 		lock_prdt_t*	lock_prdt;
597 		ulint		op = PAGE_CUR_DISJOINT;
598 
599 		ut_ad(lock);
600 
601 		if (!(lock->type_mode & LOCK_PREDICATE)
602 		    || (lock->type_mode & LOCK_MODE_MASK) == LOCK_X) {
603 			continue;
604 		}
605 
606 		lock_prdt = lock_get_prdt_from_lock(lock);
607 
608 		/* Check each lock in parent to see if it intersects with
609 		left or right child */
610 		if (!lock_prdt_consistent(lock_prdt, left_prdt, op)
611 		    && !lock_prdt_find_on_page(lock->type_mode, left_block,
612 					       lock_prdt, lock->trx)) {
613 			lock_prdt_add_to_queue(lock->type_mode,
614 					       left_block, lock->index,
615 					       lock->trx, lock_prdt,
616 					       FALSE);
617 		}
618 
619 		if (!lock_prdt_consistent(lock_prdt, right_prdt, op)
620 		    && !lock_prdt_find_on_page(lock->type_mode, right_block,
621 					       lock_prdt, lock->trx)) {
622 			lock_prdt_add_to_queue(lock->type_mode, right_block,
623 					       lock->index, lock->trx,
624 					       lock_prdt, FALSE);
625 		}
626 	}
627 
628 	lock_mutex_exit();
629 }
630 
631 /**************************************************************//**
632 Update predicate lock when page splits */
633 static
634 void
lock_prdt_update_split_low(buf_block_t * block,buf_block_t * new_block,lock_prdt_t * prdt,lock_prdt_t * new_prdt,ulint space,ulint page_no,ulint type_mode)635 lock_prdt_update_split_low(
636 /*=======================*/
637 	buf_block_t*	block,		/*!< in/out: page to be split */
638 	buf_block_t*	new_block,	/*!< in/out: the new half page */
639 	lock_prdt_t*	prdt,		/*!< in: MBR on the old page */
640 	lock_prdt_t*	new_prdt,	/*!< in: MBR on the new page */
641 	ulint		space,		/*!< in: space id */
642 	ulint		page_no,	/*!< in: page number */
643 	ulint		type_mode)	/*!< in: LOCK_PREDICATE or
644 					LOCK_PRDT_PAGE */
645 {
646 	lock_t*		lock;
647 
648 	lock_mutex_enter();
649 
650 	for (lock = lock_rec_get_first_on_page_addr(
651 			lock_hash_get(type_mode), space, page_no);
652 	     lock;
653 	     lock = lock_rec_get_next_on_page(lock)) {
654 		ut_ad(lock);
655 
656 		/* First dealing with Page Lock */
657 		if (lock->type_mode & LOCK_PRDT_PAGE) {
658 			/* Duplicate the lock to new page */
659 			trx_mutex_enter(lock->trx);
660 			lock_prdt_add_to_queue(lock->type_mode,
661 					       new_block,
662 					       lock->index,
663 					       lock->trx, NULL, TRUE);
664 
665 			trx_mutex_exit(lock->trx);
666 			continue;
667 		}
668 
669 		/* Now dealing with Predicate Lock */
670 		lock_prdt_t*	lock_prdt;
671 		ulint		op = PAGE_CUR_DISJOINT;
672 
673 		ut_ad(lock->type_mode & LOCK_PREDICATE);
674 
675 		/* No need to duplicate waiting X locks */
676 		if ((lock->type_mode & LOCK_MODE_MASK) == LOCK_X) {
677 			continue;
678 		}
679 
680 		lock_prdt = lock_get_prdt_from_lock(lock);
681 
682 		if (lock_prdt_consistent(lock_prdt, prdt, op)) {
683 
684 			if (!lock_prdt_consistent(lock_prdt, new_prdt, op)) {
685 				/* Move the lock to new page */
686 				trx_mutex_enter(lock->trx);
687 				lock_prdt_add_to_queue(lock->type_mode,
688 						       new_block,
689 						       lock->index,
690 						       lock->trx, lock_prdt,
691 						       TRUE);
692 				trx_mutex_exit(lock->trx);
693 			}
694 		} else if (!lock_prdt_consistent(lock_prdt, new_prdt, op)) {
695 			/* Duplicate the lock to new page */
696 			trx_mutex_enter(lock->trx);
697 			lock_prdt_add_to_queue(lock->type_mode,
698 					       new_block,
699 					       lock->index,
700 					       lock->trx, lock_prdt, TRUE);
701 
702 			trx_mutex_exit(lock->trx);
703 		}
704 	}
705 
706 	lock_mutex_exit();
707 }
708 
709 /**************************************************************//**
710 Update predicate lock when page splits */
711 void
lock_prdt_update_split(buf_block_t * block,buf_block_t * new_block,lock_prdt_t * prdt,lock_prdt_t * new_prdt,ulint space,ulint page_no)712 lock_prdt_update_split(
713 /*===================*/
714 	buf_block_t*	block,		/*!< in/out: page to be split */
715 	buf_block_t*	new_block,	/*!< in/out: the new half page */
716 	lock_prdt_t*	prdt,		/*!< in: MBR on the old page */
717 	lock_prdt_t*	new_prdt,	/*!< in: MBR on the new page */
718 	ulint		space,		/*!< in: space id */
719 	ulint		page_no)	/*!< in: page number */
720 {
721 	lock_prdt_update_split_low(block, new_block, prdt, new_prdt,
722 				   space, page_no, LOCK_PREDICATE);
723 
724 	lock_prdt_update_split_low(block, new_block, NULL, NULL,
725 				   space, page_no, LOCK_PRDT_PAGE);
726 }
727 
728 /*********************************************************************//**
729 Initiate a Predicate Lock from a MBR */
730 void
lock_init_prdt_from_mbr(lock_prdt_t * prdt,rtr_mbr_t * mbr,ulint mode,mem_heap_t * heap)731 lock_init_prdt_from_mbr(
732 /*====================*/
733 	lock_prdt_t*	prdt,	/*!< in/out: predicate to initialized */
734 	rtr_mbr_t*	mbr,	/*!< in: Minimum Bounding Rectangle */
735 	ulint		mode,	/*!< in: Search mode */
736 	mem_heap_t*	heap)	/*!< in: heap for allocating memory */
737 {
738 	memset(prdt, 0, sizeof(*prdt));
739 
740 	if (heap != NULL) {
741 		prdt->data = mem_heap_alloc(heap, sizeof(*mbr));
742 		ut_memcpy(prdt->data, mbr, sizeof(*mbr));
743 	} else {
744 		prdt->data = static_cast<void*>(mbr);
745 	}
746 
747 	prdt->op = static_cast<uint16>(mode);
748 }
749 
750 /*********************************************************************//**
751 Checks two predicate locks are compatible with each other
752 @return	true if consistent */
753 bool
lock_prdt_consistent(lock_prdt_t * prdt1,lock_prdt_t * prdt2,ulint op)754 lock_prdt_consistent(
755 /*=================*/
756 	lock_prdt_t*	prdt1,	/*!< in: Predicate for the lock */
757 	lock_prdt_t*	prdt2,	/*!< in: Predicate for the lock */
758 	ulint		op)	/*!< in: Predicate comparison operator */
759 {
760 	bool		ret = false;
761 	rtr_mbr_t*	mbr1 = prdt_get_mbr_from_prdt(prdt1);
762 	rtr_mbr_t*	mbr2 = prdt_get_mbr_from_prdt(prdt2);
763 	ulint		action;
764 
765 	if (op) {
766 		action = op;
767 	} else {
768 		if (prdt2->op != 0 && (prdt1->op != prdt2->op)) {
769 			return(false);
770 		}
771 
772 		action = prdt1->op;
773 	}
774 
775 	switch (action) {
776 	case PAGE_CUR_CONTAIN:
777 		ret = MBR_CONTAIN_CMP(mbr1, mbr2);
778 		break;
779 	case PAGE_CUR_DISJOINT:
780 		ret = MBR_DISJOINT_CMP(mbr1, mbr2);
781 		break;
782 	case PAGE_CUR_MBR_EQUAL:
783 		ret = MBR_EQUAL_CMP(mbr1, mbr2);
784 		break;
785 	case PAGE_CUR_INTERSECT:
786 		ret = MBR_INTERSECT_CMP(mbr1, mbr2);
787 		break;
788 	case PAGE_CUR_WITHIN:
789 		ret = MBR_WITHIN_CMP(mbr1, mbr2);
790 		break;
791 	default:
792 		ib::error() << "invalid operator " << action;
793 		ut_error;
794 	}
795 
796 	return(ret);
797 }
798 
799 /*********************************************************************//**
800 Acquire a predicate lock on a block
801 @return	DB_SUCCESS, DB_LOCK_WAIT, DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */
802 dberr_t
lock_prdt_lock(buf_block_t * block,lock_prdt_t * prdt,dict_index_t * index,lock_mode mode,ulint type_mode,que_thr_t * thr,mtr_t * mtr)803 lock_prdt_lock(
804 /*===========*/
805 	buf_block_t*	block,	/*!< in/out: buffer block of rec */
806 	lock_prdt_t*	prdt,	/*!< in: Predicate for the lock */
807 	dict_index_t*	index,	/*!< in: secondary index */
808 	lock_mode	mode,	/*!< in: mode of the lock which
809 				the read cursor should set on
810 				records: LOCK_S or LOCK_X; the
811 				latter is possible in
812 				SELECT FOR UPDATE */
813 	ulint		type_mode,
814 				/*!< in: LOCK_PREDICATE or LOCK_PRDT_PAGE */
815 	que_thr_t*	thr,	/*!< in: query thread
816 				(can be NULL if BTR_NO_LOCKING_FLAG) */
817 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
818 {
819 	trx_t*		trx = thr_get_trx(thr);
820 	dberr_t		err = DB_SUCCESS;
821 	lock_rec_req_status	status = LOCK_REC_SUCCESS;
822 
823 	if (trx->read_only || dict_table_is_temporary(index->table)) {
824 		return(DB_SUCCESS);
825 	}
826 
827 	ut_ad(!dict_index_is_clust(index));
828 	ut_ad(!dict_index_is_online_ddl(index));
829 	ut_ad(type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE));
830 
831 	hash_table_t*	hash = type_mode == LOCK_PREDICATE
832 		? lock_sys->prdt_hash
833 		: lock_sys->prdt_page_hash;
834 
835 	/* Another transaction cannot have an implicit lock on the record,
836 	because when we come here, we already have modified the clustered
837 	index record, and this would not have been possible if another active
838 	transaction had modified this secondary index record. */
839 
840 	lock_mutex_enter();
841 
842 	const ulint	prdt_mode = mode | type_mode;
843 	lock_t*		lock = lock_rec_get_first_on_page(hash, block);
844 
845 	if (lock == NULL) {
846 
847 		RecLock	rec_lock(index, block, PRDT_HEAPNO, prdt_mode);
848 
849 		lock = rec_lock.create(trx, false, true);
850 
851 		status = LOCK_REC_SUCCESS_CREATED;
852 
853 	} else {
854 		trx_mutex_enter(trx);
855 
856 		if (lock_rec_get_next_on_page(lock)
857 		    || lock->trx != trx
858 		    || lock->type_mode != (LOCK_REC | prdt_mode)
859 		    || lock_rec_get_n_bits(lock) == 0
860 		    || ((type_mode & LOCK_PREDICATE)
861 		        && (!lock_prdt_consistent(
862 				lock_get_prdt_from_lock(lock), prdt, 0)))) {
863 
864 			lock = lock_prdt_has_lock(
865 				mode, type_mode, block, prdt, trx);
866 
867 			if (lock == NULL) {
868 
869 				const lock_t*	wait_for;
870 
871 				wait_for = lock_prdt_other_has_conflicting(
872 					prdt_mode, block, prdt, trx);
873 
874 				if (wait_for != NULL) {
875 
876 					RecLock	rec_lock(
877 						thr, index, block, PRDT_HEAPNO,
878 						prdt_mode, prdt);
879 
880 					err = rec_lock.add_to_waitq(wait_for);
881 
882 				} else {
883 
884 					lock_prdt_add_to_queue(
885 						prdt_mode, block, index, trx,
886 						prdt, true);
887 
888 					status = LOCK_REC_SUCCESS;
889 				}
890 			}
891 
892 			trx_mutex_exit(trx);
893 
894 		} else {
895 			trx_mutex_exit(trx);
896 
897 			if (!lock_rec_get_nth_bit(lock, PRDT_HEAPNO)) {
898 				lock_rec_set_nth_bit(lock, PRDT_HEAPNO);
899 				status = LOCK_REC_SUCCESS_CREATED;
900 			}
901 		}
902 	}
903 
904 	lock_mutex_exit();
905 
906 	if (status == LOCK_REC_SUCCESS_CREATED && type_mode == LOCK_PREDICATE) {
907 		/* Append the predicate in the lock record */
908 		lock_prdt_set_prdt(lock, prdt);
909 	}
910 
911 	return(err);
912 }
913 
914 /*********************************************************************//**
915 Acquire a "Page" lock on a block
916 @return	DB_SUCCESS, DB_LOCK_WAIT, DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */
917 dberr_t
lock_place_prdt_page_lock(ulint space,ulint page_no,dict_index_t * index,que_thr_t * thr)918 lock_place_prdt_page_lock(
919 /*======================*/
920 	ulint		space,		/*!< in: space for the page to lock */
921 	ulint		page_no,	/*!< in: page number */
922 	dict_index_t*	index,		/*!< in: secondary index */
923 	que_thr_t*	thr)		/*!< in: query thread */
924 {
925 	ut_ad(thr != NULL);
926 	ut_ad(!srv_read_only_mode);
927 
928 	ut_ad(!dict_index_is_clust(index));
929 	ut_ad(!dict_index_is_online_ddl(index));
930 
931 	/* Another transaction cannot have an implicit lock on the record,
932 	because when we come here, we already have modified the clustered
933 	index record, and this would not have been possible if another active
934 	transaction had modified this secondary index record. */
935 
936 	lock_mutex_enter();
937 
938 	const lock_t*	lock = lock_rec_get_first_on_page_addr(
939 		lock_sys->prdt_page_hash, space, page_no);
940 
941 	const ulint	mode = LOCK_S | LOCK_PRDT_PAGE;
942 	trx_t*		trx = thr_get_trx(thr);
943 
944 	if (lock != NULL) {
945 
946 		trx_mutex_enter(trx);
947 
948 		/* Find a matching record lock owned by this transaction. */
949 
950 		while (lock != NULL && lock->trx != trx) {
951 
952 			lock = lock_rec_get_next_on_page_const(lock);
953 		}
954 
955 		ut_ad(lock == NULL || lock->type_mode == (mode | LOCK_REC));
956 		ut_ad(lock == NULL || lock_rec_get_n_bits(lock) != 0);
957 
958 		trx_mutex_exit(trx);
959 	}
960 
961 	if (lock == NULL) {
962 		RecID	rec_id(space, page_no, PRDT_HEAPNO);
963 		RecLock	rec_lock(index, rec_id, mode);
964 
965 		rec_lock.create(trx, false, true);
966 
967 #ifdef PRDT_DIAG
968 		printf("GIS_DIAGNOSTIC: page lock %d\n", (int) page_no);
969 #endif /* PRDT_DIAG */
970 	}
971 
972 	lock_mutex_exit();
973 
974 	return(DB_SUCCESS);
975 }
976 
977 /** Check whether there are R-tree Page lock on a page
978 @param[in]	trx	trx to test the lock
979 @param[in]	space	space id for the page
980 @param[in]	page_no	page number
981 @return	true if there is none */
982 bool
lock_test_prdt_page_lock(const trx_t * trx,ulint space,ulint page_no)983 lock_test_prdt_page_lock(
984 	const trx_t*    trx,
985 	ulint           space,
986 	ulint           page_no)
987 {
988 	lock_t*		lock;
989 
990 	lock_mutex_enter();
991 
992 	lock = lock_rec_get_first_on_page_addr(
993 		lock_sys->prdt_page_hash, space, page_no);
994 
995 	lock_mutex_exit();
996 
997 	return(lock == NULL || trx == lock->trx);
998 }
999 
1000 /*************************************************************//**
1001 Moves the locks of a page to another page and resets the lock bits of
1002 the donating records. */
1003 void
lock_prdt_rec_move(const buf_block_t * receiver,const buf_block_t * donator)1004 lock_prdt_rec_move(
1005 /*===============*/
1006 	const buf_block_t*	receiver,	/*!< in: buffer block containing
1007 						the receiving record */
1008 	const buf_block_t*	donator)	/*!< in: buffer block containing
1009 						the donating record */
1010 {
1011 	lock_t* lock;
1012 
1013 	if (!lock_sys->prdt_hash) {
1014 		return;
1015 	}
1016 
1017 	lock_mutex_enter();
1018 
1019 	for (lock = lock_rec_get_first(lock_sys->prdt_hash,
1020 				       donator, PRDT_HEAPNO);
1021 	     lock != NULL;
1022 	     lock = lock_rec_get_next(PRDT_HEAPNO, lock)) {
1023 
1024 		const ulint     type_mode = lock->type_mode;
1025 		lock_prdt_t*	lock_prdt = lock_get_prdt_from_lock(lock);
1026 
1027 		lock_rec_trx_wait(lock, PRDT_HEAPNO, type_mode);
1028 
1029 		lock_prdt_add_to_queue(
1030 			type_mode, receiver, lock->index, lock->trx,
1031 			lock_prdt, FALSE);
1032 	}
1033 
1034 	lock_mutex_exit();
1035 }
1036 
1037 /** Removes predicate lock objects set on an index page which is discarded.
1038 @param[in]	block		page to be discarded
1039 @param[in]	lock_hash	lock hash */
1040 void
lock_prdt_page_free_from_discard(const buf_block_t * block,hash_table_t * lock_hash)1041 lock_prdt_page_free_from_discard(
1042 	const buf_block_t*      block,
1043 	hash_table_t*		lock_hash)
1044 {
1045 	lock_t*	lock;
1046 	lock_t*	next_lock;
1047 	ulint	space;
1048 	ulint	page_no;
1049 
1050 	ut_ad(lock_mutex_own());
1051 
1052 	space = block->page.id.space();
1053 	page_no = block->page.id.page_no();
1054 
1055 	lock = lock_rec_get_first_on_page_addr(lock_hash, space, page_no);
1056 
1057 	while (lock != NULL) {
1058 		next_lock = lock_rec_get_next_on_page(lock);
1059 
1060 		lock_rec_discard(lock);
1061 
1062 		lock = next_lock;
1063 	}
1064 }
1065 
1066