1 /* QQ: TODO - allocate everything from dynarrays !!! (benchmark) */
2 /* QQ: TODO instant duration locks */
3 /* QQ: #warning automatically place S instead of LS if possible */
4
5 /* Copyright (C) 2006 MySQL AB
6
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; version 2 of the License.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, write to the Free Software
18 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
19
20 /*
21 Generic Lock Manager
22
23 Lock manager handles locks on "resources", a resource must be uniquely
24 identified by a 64-bit number. Lock manager itself does not imply
25 anything about the nature of a resource - it can be a row, a table, a
26 database, or just anything.
27
28 Locks belong to "lock owners". A Lock owner is uniquely identified by a
29 16-bit number. A function loid2lo must be provided by the application
30 that takes such a number as an argument and returns a LOCK_OWNER
31 structure.
32
33 Lock levels are completely defined by three tables. Lock compatibility
34 matrix specifies which locks can be held at the same time on a resource.
35 Lock combining matrix specifies what lock level has the same behaviour as
36 a pair of two locks of given levels. getlock_result matrix simplifies
37 intention locking and lock escalation for an application, basically it
38 defines which locks are intention locks and which locks are "loose"
39 locks. It is only used to provide better diagnostics for the
40 application, lock manager itself does not differentiate between normal,
41 intention, and loose locks.
42
43 Internally lock manager is based on a lock-free hash, see lf_hash.c for
44 details. All locks are stored in a hash, with a resource id as a search
45 key, so all locks for the same resource will be considered collisions and
46 will be put in a one (lock-free) linked list. The main lock-handling
47 logic is in the inner loop that searches for a lock in such a linked
48 list - lockfind().
49
50 This works as follows. Locks generally are added to the end of the list
51 (with one exception, see below). When scanning the list it is always
52 possible to determine what locks are granted (active) and what locks are
53 waiting - first lock is obviously active, the second is active if it's
54 compatible with the first, and so on, a lock is active if it's compatible
55 with all previous locks and all locks before it are also active.
56 To calculate the "compatible with all previous locks" all locks are
57 accumulated in prev_lock variable using lock_combining_matrix.
58
59 Lock upgrades: when a thread that has a lock on a given resource,
60 requests a new lock on the same resource and the old lock is not enough
61 to satisfy new lock requirements (which is defined by
62 lock_combining_matrix[old_lock][new_lock] != old_lock), a new lock is
63 placed in the list. Depending on other locks it is immediately active or
64 it will wait for other locks. Here's an exception to "locks are added
65 to the end" rule - upgraded locks are added after the last active lock
66 but before all waiting locks. Old lock (the one we upgraded from) is
67 not removed from the list, indeed it may be needed if the new lock was
68 in a savepoint that gets rolled back. So old lock is marked as "ignored"
69 (IGNORE_ME flag). New lock gets an UPGRADED flag.
70
71 Loose locks add an important exception to the above. Loose locks do not
72 always commute with other locks. In the list IX-LS both locks are active,
73 while in the LS-IX list only the first lock is active. This creates a
74 problem in lock upgrades. If the list was IX-LS and the owner of the
75 first lock wants to place LS lock (which can be immediately granted), the
76 IX lock is upgraded to LSIX and the list becomes IX-LS-LSIX, which,
77 according to the lock compatibility matrix means that the last lock is
78 waiting - of course it all happened because IX and LS were swapped and
79 they don't commute. To work around this there's ACTIVE flag which is set
80 in every lock that never waited (was placed active), and this flag
81 overrides "compatible with all previous locks" rule.
82
83 When a lock is placed to the end of the list it's either compatible with
84 all locks and all locks are active - new lock becomes active at once, or
85 it conflicts with some of the locks, in this case in the 'blocker'
86 variable a conflicting lock is returned and the calling thread waits on a
87 pthread condition in the LOCK_OWNER structure of the owner of the
88 conflicting lock. Or a new lock is compatible with all locks, but some
89 existing locks are not compatible with each other (example: request IS,
90 when the list is S-IX) - that is not all locks are active. In this case a
91 first waiting lock is returned in the 'blocker' variable, lockman_getlock()
92 notices that a "blocker" does not conflict with the requested lock, and
93 "dereferences" it, to find the lock that it's waiting on. The calling
94 thread than begins to wait on the same lock.
95
96 To better support table-row relations where one needs to lock the table
97 with an intention lock before locking the row, extended diagnostics is
98 provided. When an intention lock (presumably on a table) is granted,
99 lockman_getlock() returns one of GOT_THE_LOCK (no need to lock the row,
100 perhaps the thread already has a normal lock on this table),
101 GOT_THE_LOCK_NEED_TO_LOCK_A_SUBRESOURCE (need to lock the row, as usual),
102 GOT_THE_LOCK_NEED_TO_INSTANT_LOCK_A_SUBRESOURCE (only need to check
103 whether it's possible to lock the row, but no need to lock it - perhaps
104 the thread has a loose lock on this table). This is defined by
105 getlock_result[] table.
106 */
107
108 #include <my_global.h>
109 #include <my_sys.h>
110 #include <my_bit.h>
111 #include <lf.h>
112 #include "my_cpu.h"
113 #include "lockman.h"
114
115 /*
116 Lock compatibility matrix.
117
118 It's asymmetric. Read it as "Somebody has the lock <value in the row
119 label>, can I set the lock <value in the column label> ?"
120
121 ') Though you can take LS lock while somebody has S lock, it makes no
122 sense - it's simpler to take S lock too.
123
124 1 - compatible
125 0 - incompatible
126 -1 - "impossible", so that we can assert the impossibility.
127 */
128 static int lock_compatibility_matrix[10][10]=
129 { /* N S X IS IX SIX LS LX SLX LSIX */
130 { -1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }, /* N */
131 { -1, 1, 0, 1, 0, 0, 1, 0, 0, 0 }, /* S */
132 { -1, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, /* X */
133 { -1, 1, 0, 1, 1, 1, 1, 1, 1, 1 }, /* IS */
134 { -1, 0, 0, 1, 1, 0, 1, 1, 0, 1 }, /* IX */
135 { -1, 0, 0, 1, 0, 0, 1, 0, 0, 0 }, /* SIX */
136 { -1, 1, 0, 1, 0, 0, 1, 0, 0, 0 }, /* LS */
137 { -1, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, /* LX */
138 { -1, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, /* SLX */
139 { -1, 0, 0, 1, 0, 0, 1, 0, 0, 0 } /* LSIX */
140 };
141
142 /*
143 Lock combining matrix.
144
145 It's symmetric. Read it as "what lock level L is identical to the
146 set of two locks A and B"
147
148 One should never get N from it, we assert the impossibility
149 */
150 static enum lockman_lock_type lock_combining_matrix[10][10]=
151 {/* N S X IS IX SIX LS LX SLX LSIX */
152 { N, S, X, IS, IX, SIX, S, SLX, SLX, SIX}, /* N */
153 { S, S, X, S, SIX, SIX, S, SLX, SLX, SIX}, /* S */
154 { X, X, X, X, X, X, X, X, X, X}, /* X */
155 { IS, S, X, IS, IX, SIX, LS, LX, SLX, LSIX}, /* IS */
156 { IX, SIX, X, IX, IX, SIX, LSIX, LX, SLX, LSIX}, /* IX */
157 { SIX, SIX, X, SIX, SIX, SIX, SIX, SLX, SLX, SIX}, /* SIX */
158 { LS, S, X, LS, LSIX, SIX, LS, LX, SLX, LSIX}, /* LS */
159 { LX, SLX, X, LX, LX, SLX, LX, LX, SLX, LX}, /* LX */
160 { SLX, SLX, X, SLX, SLX, SLX, SLX, SLX, SLX, SLX}, /* SLX */
161 { LSIX, SIX, X, LSIX, LSIX, SIX, LSIX, LX, SLX, LSIX} /* LSIX */
162 };
163
164 #define REPEAT_ONCE_MORE 0
165 #define OK_TO_PLACE_THE_LOCK 1
166 #define OK_TO_PLACE_THE_REQUEST 2
167 #define ALREADY_HAVE_THE_LOCK 4
168 #define ALREADY_HAVE_THE_REQUEST 8
169 #define PLACE_NEW_DISABLE_OLD 16
170 #define REQUEST_NEW_DISABLE_OLD 32
171 #define RESOURCE_WAS_UNLOCKED 64
172
173 #define NEED_TO_WAIT (OK_TO_PLACE_THE_REQUEST | ALREADY_HAVE_THE_REQUEST |\
174 REQUEST_NEW_DISABLE_OLD)
175 #define ALREADY_HAVE (ALREADY_HAVE_THE_LOCK | ALREADY_HAVE_THE_REQUEST)
176 #define LOCK_UPGRADE (PLACE_NEW_DISABLE_OLD | REQUEST_NEW_DISABLE_OLD)
177
178
179 /*
180 the return codes for lockman_getlock
181
182 It's asymmetric. Read it as "I have the lock <value in the row label>,
183 what value should be returned for <value in the column label> ?"
184
185 0 means impossible combination (assert!)
186
187 Defines below help to preserve the table structure.
188 I/L/A values are self explanatory
189 x means the combination is possible (assert should not crash)
190 but it cannot happen in row locks, only in table locks (S,X),
191 or lock escalations (LS,LX)
192 */
193 #define I GOT_THE_LOCK_NEED_TO_LOCK_A_SUBRESOURCE
194 #define L GOT_THE_LOCK_NEED_TO_INSTANT_LOCK_A_SUBRESOURCE
195 #define A GOT_THE_LOCK
196 #define x GOT_THE_LOCK
197 static enum lockman_getlock_result getlock_result[10][10]=
198 {/* N S X IS IX SIX LS LX SLX LSIX */
199 { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* N */
200 { 0, x, 0, A, 0, 0, x, 0, 0, 0}, /* S */
201 { 0, x, x, A, A, 0, x, x, 0, 0}, /* X */
202 { 0, 0, 0, I, 0, 0, 0, 0, 0, 0}, /* IS */
203 { 0, 0, 0, I, I, 0, 0, 0, 0, 0}, /* IX */
204 { 0, x, 0, A, I, 0, x, 0, 0, 0}, /* SIX */
205 { 0, 0, 0, L, 0, 0, x, 0, 0, 0}, /* LS */
206 { 0, 0, 0, L, L, 0, x, x, 0, 0}, /* LX */
207 { 0, x, 0, A, L, 0, x, x, 0, 0}, /* SLX */
208 { 0, 0, 0, L, I, 0, x, 0, 0, 0} /* LSIX */
209 };
210 #undef I
211 #undef L
212 #undef A
213 #undef x
214
215 typedef struct lockman_lock {
216 uint64 resource;
217 struct lockman_lock *lonext;
218 intptr volatile link;
219 uint32 hashnr;
220 /* QQ: TODO - remove hashnr from LOCK */
221 uint16 loid;
222 uchar lock; /* sizeof(uchar) <= sizeof(enum) */
223 uchar flags;
224 } LOCK;
225
226 #define IGNORE_ME 1
227 #define UPGRADED 2
228 #define ACTIVE 4
229
230 typedef struct {
231 intptr volatile *prev;
232 LOCK *curr, *next;
233 LOCK *blocker, *upgrade_from;
234 } CURSOR;
235
236 #define PTR(V) (LOCK *)((V) & (~(intptr)1))
237 #define DELETED(V) ((V) & 1)
238
239 /*
240 NOTE
241 cursor is positioned in either case
242 pins[0..3] are used, they are NOT removed on return
243 */
lockfind(LOCK * volatile * head,LOCK * node,CURSOR * cursor,LF_PINS * pins)244 static int lockfind(LOCK * volatile *head, LOCK *node,
245 CURSOR *cursor, LF_PINS *pins)
246 {
247 uint32 hashnr, cur_hashnr;
248 uint64 resource, cur_resource;
249 intptr cur_link;
250 my_bool cur_active, compatible, upgrading, prev_active;
251 enum lockman_lock_type lock, prev_lock, cur_lock;
252 uint16 loid, cur_loid;
253 int cur_flags, flags;
254
255 hashnr= node->hashnr;
256 resource= node->resource;
257 lock= node->lock;
258 loid= node->loid;
259 flags= node->flags;
260
261 retry:
262 cursor->prev= (intptr *)head;
263 prev_lock= N;
264 cur_active= TRUE;
265 compatible= TRUE;
266 upgrading= FALSE;
267 cursor->blocker= cursor->upgrade_from= 0;
268 lf_unpin(pins, 3);
269 do {
270 cursor->curr= PTR(*cursor->prev);
271 lf_pin(pins, 1, cursor->curr);
272 } while(*cursor->prev != (intptr)cursor->curr && LF_BACKOFF());
273 for (;;)
274 {
275 if (!cursor->curr)
276 break;
277 do {
278 cur_link= cursor->curr->link;
279 cursor->next= PTR(cur_link);
280 lf_pin(pins, 0, cursor->next);
281 } while (cur_link != cursor->curr->link && LF_BACKOFF());
282 cur_hashnr= cursor->curr->hashnr;
283 cur_resource= cursor->curr->resource;
284 cur_lock= cursor->curr->lock;
285 cur_loid= cursor->curr->loid;
286 cur_flags= cursor->curr->flags;
287 if (*cursor->prev != (intptr)cursor->curr)
288 {
289 (void)LF_BACKOFF();
290 goto retry;
291 }
292 if (!DELETED(cur_link))
293 {
294 if (cur_hashnr > hashnr ||
295 (cur_hashnr == hashnr && cur_resource >= resource))
296 {
297 if (cur_hashnr > hashnr || cur_resource > resource)
298 break;
299 /* ok, we have a lock for this resource */
300 DBUG_ASSERT(lock_compatibility_matrix[prev_lock][cur_lock] >= 0);
301 DBUG_ASSERT(lock_compatibility_matrix[cur_lock][lock] >= 0);
302 if ((cur_flags & IGNORE_ME) && ! (flags & IGNORE_ME))
303 {
304 DBUG_ASSERT(cur_active);
305 if (cur_loid == loid)
306 cursor->upgrade_from= cursor->curr;
307 }
308 else
309 {
310 prev_active= cur_active;
311 if (cur_flags & ACTIVE)
312 DBUG_ASSERT(prev_active == TRUE);
313 else
314 cur_active&= lock_compatibility_matrix[prev_lock][cur_lock];
315 if (upgrading && !cur_active /*&& !(cur_flags & UPGRADED)*/)
316 break;
317 if (prev_active && !cur_active)
318 {
319 cursor->blocker= cursor->curr;
320 lf_pin(pins, 3, cursor->curr);
321 }
322 if (cur_loid == loid)
323 {
324 /* we already have a lock on this resource */
325 DBUG_ASSERT(lock_combining_matrix[cur_lock][lock] != N);
326 DBUG_ASSERT(!upgrading || (flags & IGNORE_ME));
327 if (lock_combining_matrix[cur_lock][lock] == cur_lock)
328 {
329 /* new lock is compatible */
330 if (cur_active)
331 {
332 cursor->blocker= cursor->curr; /* loose-locks! */
333 lf_unpin(pins, 3); /* loose-locks! */
334 return ALREADY_HAVE_THE_LOCK;
335 }
336 else
337 return ALREADY_HAVE_THE_REQUEST;
338 }
339 /* not compatible, upgrading */
340 upgrading= TRUE;
341 cursor->upgrade_from= cursor->curr;
342 }
343 else
344 {
345 if (!lock_compatibility_matrix[cur_lock][lock])
346 {
347 compatible= FALSE;
348 cursor->blocker= cursor->curr;
349 lf_pin(pins, 3, cursor->curr);
350 }
351 }
352 prev_lock= lock_combining_matrix[prev_lock][cur_lock];
353 DBUG_ASSERT(prev_lock != N);
354 }
355 }
356 cursor->prev= &(cursor->curr->link);
357 lf_pin(pins, 2, cursor->curr);
358 }
359 else
360 {
361 if (my_atomic_casptr((void **)cursor->prev,
362 (void **)(char*) &cursor->curr, cursor->next))
363 lf_alloc_free(pins, cursor->curr);
364 else
365 {
366 (void)LF_BACKOFF();
367 goto retry;
368 }
369 }
370 cursor->curr= cursor->next;
371 lf_pin(pins, 1, cursor->curr);
372 }
373 /*
374 either the end of lock list - no more locks for this resource,
375 or upgrading and the end of active lock list
376 */
377 if (upgrading)
378 {
379 if (compatible /*&& prev_active*/)
380 return PLACE_NEW_DISABLE_OLD;
381 else
382 return REQUEST_NEW_DISABLE_OLD;
383 }
384 if (cur_active && compatible)
385 {
386 /*
387 either no locks for this resource or all are compatible.
388 ok to place the lock in any case.
389 */
390 return prev_lock == N ? RESOURCE_WAS_UNLOCKED
391 : OK_TO_PLACE_THE_LOCK;
392 }
393 /* we have a lock conflict. ok to place a lock request. And wait */
394 return OK_TO_PLACE_THE_REQUEST;
395 }
396
397 /*
398 NOTE
399 it uses pins[0..3], on return pins 0..2 are removed, pin 3 (blocker) stays
400 */
lockinsert(LOCK * volatile * head,LOCK * node,LF_PINS * pins,LOCK ** blocker)401 static int lockinsert(LOCK * volatile *head, LOCK *node, LF_PINS *pins,
402 LOCK **blocker)
403 {
404 CURSOR cursor;
405 int res;
406
407 do
408 {
409 res= lockfind(head, node, &cursor, pins);
410 DBUG_ASSERT(res != ALREADY_HAVE_THE_REQUEST);
411 if (!(res & ALREADY_HAVE))
412 {
413 if (res & LOCK_UPGRADE)
414 {
415 node->flags|= UPGRADED;
416 node->lock= lock_combining_matrix[cursor.upgrade_from->lock][node->lock];
417 }
418 if (!(res & NEED_TO_WAIT))
419 node->flags|= ACTIVE;
420 node->link= (intptr)cursor.curr;
421 DBUG_ASSERT(node->link != (intptr)node);
422 DBUG_ASSERT(cursor.prev != &node->link);
423 if (!my_atomic_casptr((void **)cursor.prev,
424 (void **)(char*) &cursor.curr, node))
425 {
426 res= REPEAT_ONCE_MORE;
427 node->flags&= ~ACTIVE;
428 }
429 if (res & LOCK_UPGRADE)
430 cursor.upgrade_from->flags|= IGNORE_ME;
431 /*
432 QQ: is this OK ? if a reader has already read upgrade_from,
433 it may find it conflicting with node :(
434 - see the last test from test_lockman_simple()
435 */
436 }
437
438 } while (res == REPEAT_ONCE_MORE);
439 lf_unpin(pins, 0);
440 lf_unpin(pins, 1);
441 lf_unpin(pins, 2);
442 /*
443 note that blocker is not necessarily pinned here (when it's == curr).
444 this is ok as in such a case it's either a dummy node for
445 initialize_bucket() and dummy nodes don't need pinning,
446 or it's a lock of the same transaction for lockman_getlock,
447 and it cannot be removed by another thread
448 */
449 *blocker= cursor.blocker;
450 return res;
451 }
452
453 /*
454 NOTE
455 it uses pins[0..3], on return pins 0..2 are removed, pin 3 (blocker) stays
456 */
lockpeek(LOCK * volatile * head,LOCK * node,LF_PINS * pins,LOCK ** blocker)457 static int lockpeek(LOCK * volatile *head, LOCK *node, LF_PINS *pins,
458 LOCK **blocker)
459 {
460 CURSOR cursor;
461 int res;
462
463 res= lockfind(head, node, &cursor, pins);
464
465 lf_unpin(pins, 0);
466 lf_unpin(pins, 1);
467 lf_unpin(pins, 2);
468 if (blocker)
469 *blocker= cursor.blocker;
470 return res;
471 }
472
473 /*
474 NOTE
475 it uses pins[0..3], on return all pins are removed.
476
477 One _must_ have the lock (or request) to call this
478 */
lockdelete(LOCK * volatile * head,LOCK * node,LF_PINS * pins)479 static int lockdelete(LOCK * volatile *head, LOCK *node, LF_PINS *pins)
480 {
481 CURSOR cursor;
482 int res;
483
484 do
485 {
486 res= lockfind(head, node, &cursor, pins);
487 DBUG_ASSERT(res & ALREADY_HAVE);
488
489 if (cursor.upgrade_from)
490 cursor.upgrade_from->flags&= ~IGNORE_ME;
491
492 /*
493 XXX this does not work with savepoints, as old lock is left ignored.
494 It cannot be unignored, as would basically mean moving the lock back
495 in the lock chain (from upgraded). And the latter is not allowed -
496 because it breaks list scanning. So old ignored lock must be deleted,
497 new - same - lock must be installed right after the lock we're deleting,
498 then we can delete. Good news is - this is only required when rolling
499 back a savepoint.
500 */
501 if (my_atomic_casptr((void **)(char*)&(cursor.curr->link),
502 (void **)(char*)&cursor.next, 1+(char *)cursor.next))
503 {
504 if (my_atomic_casptr((void **)cursor.prev,
505 (void **)(char*)&cursor.curr, cursor.next))
506 lf_alloc_free(pins, cursor.curr);
507 else
508 lockfind(head, node, &cursor, pins);
509 }
510 else
511 {
512 res= REPEAT_ONCE_MORE;
513 if (cursor.upgrade_from)
514 cursor.upgrade_from->flags|= IGNORE_ME;
515 }
516 } while (res == REPEAT_ONCE_MORE);
517 lf_unpin(pins, 0);
518 lf_unpin(pins, 1);
519 lf_unpin(pins, 2);
520 lf_unpin(pins, 3);
521 return res;
522 }
523
lockman_init(LOCKMAN * lm,loid_to_lo_func * func,uint timeout)524 void lockman_init(LOCKMAN *lm, loid_to_lo_func *func, uint timeout)
525 {
526 lf_alloc_init(&lm->alloc, sizeof(LOCK), offsetof(LOCK, lonext));
527 lf_dynarray_init(&lm->array, sizeof(LOCK **));
528 lm->size= 1;
529 lm->count= 0;
530 lm->loid_to_lo= func;
531 lm->lock_timeout= timeout;
532 }
533
lockman_destroy(LOCKMAN * lm)534 void lockman_destroy(LOCKMAN *lm)
535 {
536 LOCK *el= *(LOCK **)lf_dynarray_lvalue(&lm->array, 0);
537 while (el)
538 {
539 intptr next= el->link;
540 if (el->hashnr & 1)
541 lf_alloc_direct_free(&lm->alloc, el);
542 else
543 my_free((void *)el);
544 el= (LOCK *)next;
545 }
546 lf_alloc_destroy(&lm->alloc);
547 lf_dynarray_destroy(&lm->array);
548 }
549
550 /* TODO: optimize it */
551 #define MAX_LOAD 1
552
initialize_bucket(LOCKMAN * lm,LOCK * volatile * node,uint bucket,LF_PINS * pins)553 static void initialize_bucket(LOCKMAN *lm, LOCK * volatile *node,
554 uint bucket, LF_PINS *pins)
555 {
556 int res;
557 uint parent= my_clear_highest_bit(bucket);
558 LOCK *dummy= (LOCK *)my_malloc(PSI_INSTRUMENT_ME, sizeof(LOCK), MYF(MY_WME));
559 LOCK **tmp= 0, *cur;
560 LOCK * volatile *el= lf_dynarray_lvalue(&lm->array, parent);
561
562 if (*el == NULL && bucket)
563 initialize_bucket(lm, el, parent, pins);
564 dummy->hashnr= my_reverse_bits(bucket);
565 dummy->loid= 0;
566 dummy->lock= X; /* doesn't matter, in fact */
567 dummy->resource= 0;
568 dummy->flags= 0;
569 res= lockinsert(el, dummy, pins, &cur);
570 DBUG_ASSERT(res & (ALREADY_HAVE_THE_LOCK | RESOURCE_WAS_UNLOCKED));
571 if (res & ALREADY_HAVE_THE_LOCK)
572 {
573 my_free((void *)dummy);
574 dummy= cur;
575 }
576 my_atomic_casptr((void **)node, (void **)(char*) &tmp, dummy);
577 }
578
calc_hash(uint64 resource)579 static inline uint calc_hash(uint64 resource)
580 {
581 const uchar *pos= (uchar *)&resource;
582 ulong nr1= 1, nr2= 4, i;
583 for (i= 0; i < sizeof(resource) ; i++, pos++)
584 {
585 nr1^= (ulong) ((((uint) nr1 & 63)+nr2) * ((uint)*pos)) + (nr1 << 8);
586 nr2+= 3;
587 }
588 return nr1 & INT_MAX32;
589 }
590
591 /*
592 RETURN
593 see enum lockman_getlock_result
594 NOTE
595 uses pins[0..3], they're removed on return
596 */
lockman_getlock(LOCKMAN * lm,LOCK_OWNER * lo,uint64 resource,enum lockman_lock_type lock)597 enum lockman_getlock_result lockman_getlock(LOCKMAN *lm, LOCK_OWNER *lo,
598 uint64 resource,
599 enum lockman_lock_type lock)
600 {
601 int res;
602 uint csize, bucket, hashnr;
603 LOCK *node, * volatile *el, *blocker;
604 LF_PINS *pins= lo->pins;
605 enum lockman_lock_type old_lock;
606
607 DBUG_ASSERT(lo->loid);
608 node= (LOCK *)lf_alloc_new(pins);
609 node->flags= 0;
610 node->lock= lock;
611 node->loid= lo->loid;
612 node->resource= resource;
613 hashnr= calc_hash(resource);
614 bucket= hashnr % lm->size;
615 el= lf_dynarray_lvalue(&lm->array, bucket);
616 if (*el == NULL)
617 initialize_bucket(lm, el, bucket, pins);
618 node->hashnr= my_reverse_bits(hashnr) | 1;
619 res= lockinsert(el, node, pins, &blocker);
620 if (res & ALREADY_HAVE)
621 {
622 int r;
623 old_lock= blocker->lock;
624 lf_alloc_free(pins, node);
625 r= getlock_result[old_lock][lock];
626 DBUG_ASSERT(r);
627 return r;
628 }
629 /* a new value was added to the hash */
630 csize= lm->size;
631 if ((my_atomic_add32(&lm->count, 1)+1.0) / csize > MAX_LOAD)
632 my_atomic_cas32(&lm->size, (int*) &csize, csize*2);
633 node->lonext= lo->all_locks;
634 lo->all_locks= node;
635 for ( ; res & NEED_TO_WAIT; res= lockpeek(el, node, pins, &blocker))
636 {
637 LOCK_OWNER *wait_for_lo;
638 ulonglong deadline;
639 struct timespec timeout;
640
641 lf_assert_pin(pins, 3); /* blocker must be pinned here */
642 wait_for_lo= lm->loid_to_lo(blocker->loid);
643
644 /*
645 now, this is tricky. blocker is not necessarily a LOCK
646 we're waiting for. If it's compatible with what we want,
647 then we're waiting for a lock that blocker is waiting for
648 (see two places where blocker is set in lockfind)
649 In the latter case, let's "dereference" it
650 */
651 if (lock_compatibility_matrix[blocker->lock][lock])
652 {
653 blocker= wait_for_lo->all_locks;
654 lf_pin(pins, 3, blocker);
655 if (blocker != wait_for_lo->all_locks)
656 continue;
657 wait_for_lo= wait_for_lo->waiting_for;
658 }
659
660 /*
661 note that the blocker transaction may have ended by now,
662 its LOCK_OWNER and short id were reused, so 'wait_for_lo' may point
663 to an unrelated - albeit valid - LOCK_OWNER
664 */
665 if (!wait_for_lo)
666 continue;
667
668 lo->waiting_for= wait_for_lo;
669
670 /*
671 We lock a mutex - it may belong to a wrong LOCK_OWNER, but it must
672 belong to _some_ LOCK_OWNER. It means, we can never free() a LOCK_OWNER,
673 if there're other active LOCK_OWNERs.
674 */
675 /* QQ: race condition here */
676 pthread_mutex_lock(wait_for_lo->mutex);
677 if (DELETED(blocker->link))
678 {
679 /*
680 blocker transaction was ended, or a savepoint that owned
681 the lock was rolled back. Either way - the lock was removed
682 */
683 pthread_mutex_unlock(wait_for_lo->mutex);
684 continue;
685 }
686
687 /* yuck. waiting */
688 deadline= my_hrtime().val*1000 + lm->lock_timeout * 1000000;
689 set_timespec_time_nsec(timeout, deadline);
690 do
691 {
692 pthread_cond_timedwait(wait_for_lo->cond, wait_for_lo->mutex, &timeout);
693 } while (!DELETED(blocker->link) && my_hrtime().val < deadline/1000);
694 pthread_mutex_unlock(wait_for_lo->mutex);
695 if (!DELETED(blocker->link))
696 {
697 /*
698 timeout.
699 note that we _don't_ release the lock request here.
700 Instead we're relying on the caller to abort the transaction,
701 and release all locks at once - see lockman_release_locks()
702 */
703 lf_unpin(pins, 3);
704 return DIDNT_GET_THE_LOCK;
705 }
706 }
707 lo->waiting_for= 0;
708 lf_assert_unpin(pins, 3); /* unpin should not be needed */
709 return getlock_result[lock][lock];
710 }
711
712 /*
713 RETURN
714 0 - deleted
715 1 - didn't (not found)
716 NOTE
717 see lockdelete() for pin usage notes
718 */
lockman_release_locks(LOCKMAN * lm,LOCK_OWNER * lo)719 int lockman_release_locks(LOCKMAN *lm, LOCK_OWNER *lo)
720 {
721 LOCK * volatile *el, *node, *next;
722 uint bucket;
723 LF_PINS *pins= lo->pins;
724
725 pthread_mutex_lock(lo->mutex);
726 for (node= lo->all_locks; node; node= next)
727 {
728 next= node->lonext;
729 bucket= calc_hash(node->resource) % lm->size;
730 el= lf_dynarray_lvalue(&lm->array, bucket);
731 if (*el == NULL)
732 initialize_bucket(lm, el, bucket, pins);
733 lockdelete(el, node, pins);
734 my_atomic_add32(&lm->count, -1);
735 }
736 lo->all_locks= 0;
737 /* now signal all waiters */
738 pthread_cond_broadcast(lo->cond);
739 pthread_mutex_unlock(lo->mutex);
740 return 0;
741 }
742
743 #ifdef MY_LF_EXTRA_DEBUG
744 static const char *lock2str[]=
745 { "N", "S", "X", "IS", "IX", "SIX", "LS", "LX", "SLX", "LSIX" };
746 /*
747 NOTE
748 the function below is NOT thread-safe !!!
749 */
print_lockhash(LOCKMAN * lm)750 void print_lockhash(LOCKMAN *lm)
751 {
752 LOCK *el= *(LOCK **)lf_dynarray_lvalue(&lm->array, 0);
753 printf("hash: size %u count %u\n", lm->size, lm->count);
754 while (el)
755 {
756 intptr next= el->link;
757 if (el->hashnr & 1)
758 {
759 printf("0x%08lx { resource %lu, loid %u, lock %s",
760 (long) el->hashnr, (ulong) el->resource, el->loid,
761 lock2str[el->lock]);
762 if (el->flags & IGNORE_ME) printf(" IGNORE_ME");
763 if (el->flags & UPGRADED) printf(" UPGRADED");
764 if (el->flags & ACTIVE) printf(" ACTIVE");
765 if (DELETED(next)) printf(" ***DELETED***");
766 printf("}\n");
767 }
768 else
769 {
770 /*printf("0x%08x { dummy }\n", el->hashnr);*/
771 DBUG_ASSERT(el->resource == 0 && el->loid == 0 && el->lock == X);
772 }
773 el= PTR(next);
774 }
775 }
776 #endif
777