1 /* QQ: TODO - allocate everything from dynarrays !!! (benchmark) */
2 /* QQ: TODO instant duration locks */
3 /* QQ: #warning automatically place S instead of LS if possible */
4 
5 /* Copyright (C) 2006 MySQL AB
6 
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; version 2 of the License.
10 
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15 
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, write to the Free Software
18    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
19 
20 /*
21   Generic Lock Manager
22 
23   Lock manager handles locks on "resources", a resource must be uniquely
24   identified by a 64-bit number. Lock manager itself does not imply
25   anything about the nature of a resource - it can be a row, a table, a
26   database, or just anything.
27 
28   Locks belong to "lock owners". A Lock owner is uniquely identified by a
29   16-bit number. A function loid2lo must be provided by the application
30   that takes such a number as an argument and returns a LOCK_OWNER
31   structure.
32 
33   Lock levels are completely defined by three tables. Lock compatibility
34   matrix specifies which locks can be held at the same time on a resource.
35   Lock combining matrix specifies what lock level has the same behaviour as
36   a pair of two locks of given levels. getlock_result matrix simplifies
37   intention locking and lock escalation for an application, basically it
38   defines which locks are intention locks and which locks are "loose"
39   locks.  It is only used to provide better diagnostics for the
40   application, lock manager itself does not differentiate between normal,
41   intention, and loose locks.
42 
43   Internally lock manager is based on a lock-free hash, see lf_hash.c for
44   details.  All locks are stored in a hash, with a resource id as a search
45   key, so all locks for the same resource will be considered collisions and
46   will be put in a one (lock-free) linked list.  The main lock-handling
47   logic is in the inner loop that searches for a lock in such a linked
48   list - lockfind().
49 
50   This works as follows. Locks generally are added to the end of the list
51   (with one exception, see below). When scanning the list it is always
52   possible to determine what locks are granted (active) and what locks are
53   waiting - first lock is obviously active, the second is active if it's
54   compatible with the first, and so on, a lock is active if it's compatible
55   with all previous locks and all locks before it are also active.
56   To calculate the "compatible with all previous locks" all locks are
57   accumulated in prev_lock variable using lock_combining_matrix.
58 
59   Lock upgrades: when a thread that has a lock on a given resource,
60   requests a new lock on the same resource and the old lock is not enough
61   to satisfy new lock requirements (which is defined by
62   lock_combining_matrix[old_lock][new_lock] != old_lock), a new lock is
63   placed in the list. Depending on other locks it is immediately active or
64   it will wait for other locks. Here's an exception to "locks are added
65   to the end" rule - upgraded locks are added after the last active lock
66   but before all waiting locks. Old lock (the one we upgraded from) is
67   not removed from the list, indeed it may be needed if the new lock was
68   in a savepoint that gets rolled back. So old lock is marked as "ignored"
69   (IGNORE_ME flag). New lock gets an UPGRADED flag.
70 
71   Loose locks add an important exception to the above. Loose locks do not
72   always commute with other locks. In the list IX-LS both locks are active,
73   while in the LS-IX list only the first lock is active. This creates a
74   problem in lock upgrades. If the list was IX-LS and the owner of the
75   first lock wants to place LS lock (which can be immediately granted), the
76   IX lock is upgraded to LSIX and the list becomes IX-LS-LSIX, which,
77   according to the lock compatibility matrix means that the last lock is
78   waiting - of course it all happened because IX and LS were swapped and
79   they don't commute. To work around this there's ACTIVE flag which is set
80   in every lock that never waited (was placed active), and this flag
81   overrides "compatible with all previous locks" rule.
82 
83   When a lock is placed to the end of the list it's either compatible with
84   all locks and all locks are active - new lock becomes active at once, or
85   it conflicts with some of the locks, in this case in the 'blocker'
86   variable a conflicting lock is returned and the calling thread waits on a
87   pthread condition in the LOCK_OWNER structure of the owner of the
88   conflicting lock. Or a new lock is compatible with all locks, but some
89   existing locks are not compatible with each other (example: request IS,
90   when the list is S-IX) - that is not all locks are active. In this case a
91   first waiting lock is returned in the 'blocker' variable, lockman_getlock()
92   notices that a "blocker" does not conflict with the requested lock, and
93   "dereferences" it, to find the lock that it's waiting on.  The calling
94   thread then begins to wait on the same lock.
95 
96   To better support table-row relations where one needs to lock the table
97   with an intention lock before locking the row, extended diagnostics is
98   provided.  When an intention lock (presumably on a table) is granted,
99   lockman_getlock() returns one of GOT_THE_LOCK (no need to lock the row,
100   perhaps the thread already has a normal lock on this table),
101   GOT_THE_LOCK_NEED_TO_LOCK_A_SUBRESOURCE (need to lock the row, as usual),
102   GOT_THE_LOCK_NEED_TO_INSTANT_LOCK_A_SUBRESOURCE (only need to check
103   whether it's possible to lock the row, but no need to lock it - perhaps
104   the thread has a loose lock on this table). This is defined by
105   getlock_result[] table.
106 */
107 
108 #include <my_global.h>
109 #include <my_sys.h>
110 #include <my_bit.h>
111 #include <lf.h>
112 #include "my_cpu.h"
113 #include "lockman.h"
114 
115 /*
116   Lock compatibility matrix.
117 
118   It's asymmetric. Read it as "Somebody has the lock <value in the row
119   label>, can I set the lock <value in the column label> ?"
120 
121   ') Though you can take LS lock while somebody has S lock, it makes no
122   sense - it's simpler to take S lock too.
123 
124   1  - compatible
125   0  - incompatible
126   -1 - "impossible", so that we can assert the impossibility.
127 */
/*
  Indexed as [lock already held][lock requested].
  N row/column is "impossible" (-1) because N is never actually stored
  in a lock list - it is only the neutral start value for accumulation.
*/
static int lock_compatibility_matrix[10][10]=
{ /* N    S   X  IS  IX  SIX LS  LX  SLX LSIX          */
  {  -1,  1,  1,  1,  1,  1,  1,  1,  1,  1 }, /* N    */
  {  -1,  1,  0,  1,  0,  0,  1,  0,  0,  0 }, /* S    */
  {  -1,  0,  0,  0,  0,  0,  0,  0,  0,  0 }, /* X    */
  {  -1,  1,  0,  1,  1,  1,  1,  1,  1,  1 }, /* IS   */
  {  -1,  0,  0,  1,  1,  0,  1,  1,  0,  1 }, /* IX   */
  {  -1,  0,  0,  1,  0,  0,  1,  0,  0,  0 }, /* SIX  */
  {  -1,  1,  0,  1,  0,  0,  1,  0,  0,  0 }, /* LS   */
  {  -1,  0,  0,  0,  0,  0,  0,  0,  0,  0 }, /* LX   */
  {  -1,  0,  0,  0,  0,  0,  0,  0,  0,  0 }, /* SLX  */
  {  -1,  0,  0,  1,  0,  0,  1,  0,  0,  0 }  /* LSIX */
};
141 
142 /*
143   Lock combining matrix.
144 
145   It's symmetric. Read it as "what lock level L is identical to the
146   set of two locks A and B"
147 
148   One should never get N from it, we assert the impossibility
149 */
/*
  lock_combining_matrix[A][B] is the single lock level equivalent to
  holding both A and B. Used to accumulate "all locks seen so far" into
  one value (prev_lock in lockfind()) and to compute the level of an
  upgraded lock. Symmetric; never yields N for real lock pairs.
*/
static enum lockman_lock_type lock_combining_matrix[10][10]=
{/*    N    S   X    IS    IX  SIX    LS    LX   SLX   LSIX         */
  {    N,   S,  X,   IS,   IX, SIX,    S,  SLX, SLX,  SIX}, /* N    */
  {    S,   S,  X,    S,  SIX, SIX,    S,  SLX, SLX,  SIX}, /* S    */
  {    X,   X,  X,    X,    X,   X,    X,    X,   X,    X}, /* X    */
  {   IS,   S,  X,   IS,   IX, SIX,   LS,   LX, SLX, LSIX}, /* IS   */
  {   IX, SIX,  X,   IX,   IX, SIX, LSIX,   LX, SLX, LSIX}, /* IX   */
  {  SIX, SIX,  X,  SIX,  SIX, SIX,  SIX,  SLX, SLX,  SIX}, /* SIX  */
  {   LS,   S,  X,   LS, LSIX, SIX,   LS,   LX, SLX, LSIX}, /* LS   */
  {   LX, SLX,  X,   LX,   LX, SLX,   LX,   LX, SLX,   LX}, /* LX   */
  {  SLX, SLX,  X,  SLX,  SLX, SLX,  SLX,  SLX, SLX,  SLX}, /* SLX  */
  { LSIX, SIX,  X, LSIX, LSIX, SIX, LSIX,   LX, SLX, LSIX}  /* LSIX */
};
163 
/*
  Result codes of lockfind()/lockinsert()/lockdelete().
  All non-zero codes are distinct bits so they can be tested in
  combination via the masks below.
*/
#define REPEAT_ONCE_MORE                0  /* CAS lost a race - rescan */
#define OK_TO_PLACE_THE_LOCK            1  /* no conflict, lock can be active */
#define OK_TO_PLACE_THE_REQUEST         2  /* conflict - insert and wait */
#define ALREADY_HAVE_THE_LOCK           4  /* owner holds a sufficient active lock */
#define ALREADY_HAVE_THE_REQUEST        8  /* owner already waits for this lock */
#define PLACE_NEW_DISABLE_OLD          16  /* upgrade, new lock immediately active */
#define REQUEST_NEW_DISABLE_OLD        32  /* upgrade, new lock must wait */
#define RESOURCE_WAS_UNLOCKED          64  /* no locks at all on the resource */

#define NEED_TO_WAIT (OK_TO_PLACE_THE_REQUEST | ALREADY_HAVE_THE_REQUEST |\
                      REQUEST_NEW_DISABLE_OLD)
#define ALREADY_HAVE (ALREADY_HAVE_THE_LOCK   | ALREADY_HAVE_THE_REQUEST)
#define LOCK_UPGRADE (PLACE_NEW_DISABLE_OLD   | REQUEST_NEW_DISABLE_OLD)
177 
178 
179 /*
180   the return codes for lockman_getlock
181 
182   It's asymmetric. Read it as "I have the lock <value in the row label>,
183   what value should be returned for <value in the column label> ?"
184 
185   0 means impossible combination (assert!)
186 
187   Defines below help to preserve the table structure.
188   I/L/A values are self explanatory
189   x means the combination is possible (assert should not crash)
190     but it cannot happen in row locks, only in table locks (S,X),
191     or lock escalations (LS,LX)
192 */
/* shorthand aliases, #undef'd right after the table to keep them local */
#define I GOT_THE_LOCK_NEED_TO_LOCK_A_SUBRESOURCE
#define L GOT_THE_LOCK_NEED_TO_INSTANT_LOCK_A_SUBRESOURCE
#define A GOT_THE_LOCK
#define x GOT_THE_LOCK
/* getlock_result[lock already held][lock requested] - see comment above */
static enum lockman_getlock_result getlock_result[10][10]=
{/*    N    S   X    IS    IX  SIX    LS    LX   SLX   LSIX         */
  {    0,   0,  0,    0,    0,   0,    0,    0,   0,    0}, /* N    */
  {    0,   x,  0,    A,    0,   0,    x,    0,   0,    0}, /* S    */
  {    0,   x,  x,    A,    A,   0,    x,    x,   0,    0}, /* X    */
  {    0,   0,  0,    I,    0,   0,    0,    0,   0,    0}, /* IS   */
  {    0,   0,  0,    I,    I,   0,    0,    0,   0,    0}, /* IX   */
  {    0,   x,  0,    A,    I,   0,    x,    0,   0,    0}, /* SIX  */
  {    0,   0,  0,    L,    0,   0,    x,    0,   0,    0}, /* LS   */
  {    0,   0,  0,    L,    L,   0,    x,    x,   0,    0}, /* LX   */
  {    0,   x,  0,    A,    L,   0,    x,    x,   0,    0}, /* SLX  */
  {    0,   0,  0,    L,    I,   0,    x,    0,   0,    0}  /* LSIX */
};
#undef I
#undef L
#undef A
#undef x
214 
/*
  One lock (or lock request) - a node in both the per-resource lock-free
  list and the per-owner singly-linked list of all its locks.
*/
typedef struct lockman_lock {
  uint64 resource;         /* id of the locked resource */
  struct lockman_lock  *lonext;  /* next lock of the same LOCK_OWNER */
  intptr volatile link;    /* lf-list next pointer; lowest bit = deleted mark */
  uint32 hashnr;           /* reversed-bits hash of resource (split-ordered list) */
  /* QQ: TODO - remove hashnr from LOCK */
  uint16 loid;             /* id of the owner, resolved via loid2lo() */
  uchar lock;              /* sizeof(uchar) <= sizeof(enum) */
  uchar flags;             /* combination of IGNORE_ME/UPGRADED/ACTIVE */
} LOCK;

/* LOCK::flags bits */
#define IGNORE_ME               1  /* old lock superseded by an upgrade */
#define UPGRADED                2  /* lock created by an upgrade */
#define ACTIVE                  4  /* lock was granted without waiting */
229 
/*
  Position in a lock list, filled by lockfind() and used by its callers
  to insert/delete at the found spot.
*/
typedef struct {
  intptr volatile *prev;        /* pointer to the link we'd CAS to insert */
  LOCK *curr, *next;            /* node at the cursor and its successor */
  LOCK *blocker, *upgrade_from; /* conflicting lock / old lock being upgraded */
} CURSOR;

/* link field encoding: lowest bit marks a logically-deleted node */
#define PTR(V)      (LOCK *)((V) & (~(intptr)1))
#define DELETED(V)  ((V) & 1)
238 
239 /*
240   NOTE
241     cursor is positioned in either case
242     pins[0..3] are used, they are NOT removed on return
243 */
/*
  Scan the (sorted, lock-free) list for the resource of 'node' and decide
  what the caller may do: place the lock, place a request, upgrade, or
  nothing (already have it). See the result-code defines above.
  Physically removes logically-deleted nodes it passes (standard
  lock-free list "helping").
*/
static int lockfind(LOCK * volatile *head, LOCK *node,
                    CURSOR *cursor, LF_PINS *pins)
{
  uint32        hashnr, cur_hashnr;
  uint64        resource, cur_resource;
  intptr        cur_link;
  my_bool       cur_active, compatible, upgrading, prev_active;
  enum lockman_lock_type lock, prev_lock, cur_lock;
  uint16        loid, cur_loid;
  int           cur_flags, flags;

  hashnr= node->hashnr;
  resource= node->resource;
  lock= node->lock;
  loid= node->loid;
  flags= node->flags;

retry:
  /* restart the scan from the head with a clean accumulated state */
  cursor->prev= (intptr *)head;
  prev_lock= N;                 /* neutral element for lock_combining_matrix */
  cur_active= TRUE;
  compatible= TRUE;
  upgrading= FALSE;
  cursor->blocker= cursor->upgrade_from= 0;
  lf_unpin(pins, 3);
  /* pin-then-verify loop: pin 1 holds curr against concurrent free */
  do {
    cursor->curr= PTR(*cursor->prev);
    lf_pin(pins, 1, cursor->curr);
  } while(*cursor->prev != (intptr)cursor->curr && LF_BACKOFF());
  for (;;)
  {
    if (!cursor->curr)
      break;                    /* end of list */
    /* pin-then-verify for the successor (pin 0) */
    do {
      cur_link= cursor->curr->link;
      cursor->next= PTR(cur_link);
      lf_pin(pins, 0, cursor->next);
    } while (cur_link != cursor->curr->link && LF_BACKOFF());
    cur_hashnr= cursor->curr->hashnr;
    cur_resource= cursor->curr->resource;
    cur_lock= cursor->curr->lock;
    cur_loid= cursor->curr->loid;
    cur_flags= cursor->curr->flags;
    if (*cursor->prev != (intptr)cursor->curr)
    {
      /* somebody changed the list under us - rescan */
      (void)LF_BACKOFF();
      goto retry;
    }
    if (!DELETED(cur_link))
    {
      /* list is ordered by (hashnr, resource); stop past our resource */
      if (cur_hashnr > hashnr ||
          (cur_hashnr == hashnr && cur_resource >= resource))
      {
        if (cur_hashnr > hashnr || cur_resource > resource)
          break;
        /* ok, we have a lock for this resource */
        DBUG_ASSERT(lock_compatibility_matrix[prev_lock][cur_lock] >= 0);
        DBUG_ASSERT(lock_compatibility_matrix[cur_lock][lock] >= 0);
        if ((cur_flags & IGNORE_ME) && ! (flags & IGNORE_ME))
        {
          /* an old lock superseded by an upgrade - skip its effect */
          DBUG_ASSERT(cur_active);
          if (cur_loid == loid)
            cursor->upgrade_from= cursor->curr;
        }
        else
        {
          prev_active= cur_active;
          if (cur_flags & ACTIVE)
            DBUG_ASSERT(prev_active == TRUE);  /* ACTIVE overrides the scan */
          else
            cur_active&= lock_compatibility_matrix[prev_lock][cur_lock];
          if (upgrading && !cur_active /*&& !(cur_flags & UPGRADED)*/)
            break;      /* upgraded locks go after the last active lock */
          if (prev_active && !cur_active)
          {
            /* first waiting lock - remember it as the blocker (pin 3) */
            cursor->blocker= cursor->curr;
            lf_pin(pins, 3, cursor->curr);
          }
          if (cur_loid == loid)
          {
            /* we already have a lock on this resource */
            DBUG_ASSERT(lock_combining_matrix[cur_lock][lock] != N);
            DBUG_ASSERT(!upgrading || (flags & IGNORE_ME));
            if (lock_combining_matrix[cur_lock][lock] == cur_lock)
            {
              /* new lock is compatible */
              if (cur_active)
              {
                cursor->blocker= cursor->curr;  /* loose-locks! */
                lf_unpin(pins, 3);             /* loose-locks! */
                return ALREADY_HAVE_THE_LOCK;
              }
              else
                return ALREADY_HAVE_THE_REQUEST;
            }
            /* not compatible, upgrading */
            upgrading= TRUE;
            cursor->upgrade_from= cursor->curr;
          }
          else
          {
            if (!lock_compatibility_matrix[cur_lock][lock])
            {
              /* conflict with another owner's lock */
              compatible= FALSE;
              cursor->blocker= cursor->curr;
              lf_pin(pins, 3, cursor->curr);
            }
          }
          /* fold this lock into the accumulated "all previous locks" level */
          prev_lock= lock_combining_matrix[prev_lock][cur_lock];
          DBUG_ASSERT(prev_lock != N);
        }
      }
      cursor->prev= &(cursor->curr->link);
      lf_pin(pins, 2, cursor->curr);
    }
    else
    {
      /* help to physically remove a logically-deleted node */
      if (my_atomic_casptr((void **)cursor->prev,
                           (void **)(char*) &cursor->curr, cursor->next))
        lf_alloc_free(pins, cursor->curr);
      else
      {
        (void)LF_BACKOFF();
        goto retry;
      }
    }
    cursor->curr= cursor->next;
    lf_pin(pins, 1, cursor->curr);
  }
  /*
    either the end of lock list - no more locks for this resource,
    or upgrading and the end of active lock list
  */
  if (upgrading)
  {
    if (compatible /*&& prev_active*/)
      return PLACE_NEW_DISABLE_OLD;
    else
      return REQUEST_NEW_DISABLE_OLD;
  }
  if (cur_active && compatible)
  {
    /*
      either no locks for this resource or all are compatible.
      ok to place the lock in any case.
    */
    return prev_lock == N ? RESOURCE_WAS_UNLOCKED
                          : OK_TO_PLACE_THE_LOCK;
  }
  /* we have a lock conflict. ok to place a lock request. And wait */
  return OK_TO_PLACE_THE_REQUEST;
}
396 
397 /*
398   NOTE
399     it uses pins[0..3], on return pins 0..2 are removed, pin 3 (blocker) stays
400 */
/*
  Insert 'node' into the lock list at the position found by lockfind(),
  retrying the whole find+CAS on any race. On return *blocker is the
  conflicting (or first waiting) lock, if any.
*/
static int lockinsert(LOCK * volatile *head, LOCK *node, LF_PINS *pins,
                      LOCK **blocker)
{
  CURSOR         cursor;
  int            res;

  do
  {
    res= lockfind(head, node, &cursor, pins);
    DBUG_ASSERT(res != ALREADY_HAVE_THE_REQUEST);
    if (!(res & ALREADY_HAVE))
    {
      if (res & LOCK_UPGRADE)
      {
        /* new lock level = old level combined with the requested one */
        node->flags|= UPGRADED;
        node->lock= lock_combining_matrix[cursor.upgrade_from->lock][node->lock];
      }
      if (!(res & NEED_TO_WAIT))
        node->flags|= ACTIVE;   /* granted without waiting - see ACTIVE rule */
      node->link= (intptr)cursor.curr;
      DBUG_ASSERT(node->link != (intptr)node);
      DBUG_ASSERT(cursor.prev != &node->link);
      /* single-CAS linking of the new node; on failure undo and rescan */
      if (!my_atomic_casptr((void **)cursor.prev,
                            (void **)(char*) &cursor.curr, node))
      {
        res= REPEAT_ONCE_MORE;
        node->flags&= ~ACTIVE;
      }
      if (res & LOCK_UPGRADE)
        cursor.upgrade_from->flags|= IGNORE_ME;
      /*
        QQ: is this OK ? if a reader has already read upgrade_from,
        it may find it conflicting with node :(
        - see the last test from test_lockman_simple()
      */
    }

  } while (res == REPEAT_ONCE_MORE);
  lf_unpin(pins, 0);
  lf_unpin(pins, 1);
  lf_unpin(pins, 2);
  /*
    note that blocker is not necessarily pinned here (when it's == curr).
    this is ok as in such a case it's either a dummy node for
    initialize_bucket() and dummy nodes don't need pinning,
    or it's a lock of the same transaction for lockman_getlock,
    and it cannot be removed by another thread
  */
  *blocker= cursor.blocker;
  return res;
}
452 
453 /*
454   NOTE
455     it uses pins[0..3], on return pins 0..2 are removed, pin 3 (blocker) stays
456 */
/*
  Re-examine the lock list for 'node' without modifying it.
  Returns the lockfind() result code; if 'blocker' is non-NULL it
  receives the conflicting (or first waiting) lock found, if any.
*/
static int lockpeek(LOCK * volatile *head, LOCK *node, LF_PINS *pins,
                    LOCK **blocker)
{
  CURSOR scan;
  int pin_no;
  int result= lockfind(head, node, &scan, pins);

  /* release the scan pins; pin 3 (the blocker) deliberately stays */
  for (pin_no= 0; pin_no < 3; pin_no++)
    lf_unpin(pins, pin_no);

  if (blocker)
    *blocker= scan.blocker;
  return result;
}
472 
473 /*
474   NOTE
475     it uses pins[0..3], on return all pins are removed.
476 
477     One _must_ have the lock (or request) to call this
478 */
/*
  Remove 'node' from the lock list: first mark its link as deleted
  (logical delete), then unlink it (physical delete). If the unlink CAS
  loses, lockfind() is called again purely for its helping side effect.
*/
static int lockdelete(LOCK * volatile *head, LOCK *node, LF_PINS *pins)
{
  CURSOR cursor;
  int res;

  do
  {
    res= lockfind(head, node, &cursor, pins);
    DBUG_ASSERT(res & ALREADY_HAVE);

    if (cursor.upgrade_from)
      cursor.upgrade_from->flags&= ~IGNORE_ME;  /* re-enable the old lock */

    /*
      XXX this does not work with savepoints, as old lock is left ignored.
      It cannot be unignored, as would basically mean moving the lock back
      in the lock chain (from upgraded). And the latter is not allowed -
      because it breaks list scanning. So old ignored lock must be deleted,
      new - same - lock must be installed right after the lock we're deleting,
      then we can delete. Good news is - this is only required when rolling
      back a savepoint.
    */
    /* logical delete: set the low bit of curr->link */
    if (my_atomic_casptr((void **)(char*)&(cursor.curr->link),
                         (void **)(char*)&cursor.next, 1+(char *)cursor.next))
    {
      /* physical delete: unlink curr; if we lose, someone else helps */
      if (my_atomic_casptr((void **)cursor.prev,
                           (void **)(char*)&cursor.curr, cursor.next))
        lf_alloc_free(pins, cursor.curr);
      else
        lockfind(head, node, &cursor, pins);
    }
    else
    {
      /* logical delete lost a race - restore IGNORE_ME and retry */
      res= REPEAT_ONCE_MORE;
      if (cursor.upgrade_from)
        cursor.upgrade_from->flags|= IGNORE_ME;
    }
  } while (res == REPEAT_ONCE_MORE);
  lf_unpin(pins, 0);
  lf_unpin(pins, 1);
  lf_unpin(pins, 2);
  lf_unpin(pins, 3);
  return res;
}
523 
/*
  Initialize a lock manager instance.

  func     maps a 16-bit lock owner id to its LOCK_OWNER structure
  timeout  lock wait timeout, in milliseconds (multiplied by 1e6 to
           nanoseconds in lockman_getlock)
*/
void lockman_init(LOCKMAN *lm, loid_to_lo_func *func, uint timeout)
{
  lm->lock_timeout= timeout;
  lm->loid_to_lo= func;
  lm->count= 0;
  lm->size= 1;                          /* hash starts with a single bucket */
  lf_dynarray_init(&lm->array, sizeof(LOCK **));
  lf_alloc_init(&lm->alloc, sizeof(LOCK), offsetof(LOCK, lonext));
}
533 
/*
  Free all nodes of the lock hash and destroy the underlying allocator
  and dynarray. Not thread-safe: no concurrent access may be in progress.
*/
void lockman_destroy(LOCKMAN *lm)
{
  LOCK *cur, *following;

  for (cur= *(LOCK **)lf_dynarray_lvalue(&lm->array, 0); cur; cur= following)
  {
    following= (LOCK *)cur->link;
    if (cur->hashnr & 1)
      lf_alloc_direct_free(&lm->alloc, cur); /* odd hashnr: real lock node */
    else
      my_free((void *)cur);                  /* even hashnr: dummy bucket node */
  }
  lf_alloc_destroy(&lm->alloc);
  lf_dynarray_destroy(&lm->array);
}
549 
550 /* TODO: optimize it */
551 #define MAX_LOAD 1
552 
/*
  Lazily create the dummy head node for a hash bucket, recursively
  initializing the parent bucket first (split-ordered list scheme).
  The dummy is inserted into the shared list, then published into the
  dynarray slot with a CAS (losing the CAS is fine - the winner's dummy
  is equivalent).
*/
static void initialize_bucket(LOCKMAN *lm, LOCK * volatile *node,
                              uint bucket, LF_PINS *pins)
{
  int res;
  uint parent= my_clear_highest_bit(bucket);
  /* NOTE(review): my_malloc result is not checked - a failed allocation
     would be dereferenced below; confirm intended OOM policy */
  LOCK *dummy= (LOCK *)my_malloc(PSI_INSTRUMENT_ME, sizeof(LOCK), MYF(MY_WME));
  LOCK **tmp= 0, *cur;
  LOCK * volatile *el= lf_dynarray_lvalue(&lm->array, parent);

  if (*el == NULL && bucket)
    initialize_bucket(lm, el, parent, pins);
  /* dummy nodes get even hashnr (no |1), so they sort before real locks */
  dummy->hashnr= my_reverse_bits(bucket);
  dummy->loid= 0;
  dummy->lock= X; /* doesn't matter, in fact */
  dummy->resource= 0;
  dummy->flags= 0;
  res= lockinsert(el, dummy, pins, &cur);
  DBUG_ASSERT(res & (ALREADY_HAVE_THE_LOCK | RESOURCE_WAS_UNLOCKED));
  if (res & ALREADY_HAVE_THE_LOCK)
  {
    /* another thread inserted an identical dummy first - use theirs */
    my_free((void *)dummy);
    dummy= cur;
  }
  my_atomic_casptr((void **)node, (void **)(char*) &tmp, dummy);
}
578 
calc_hash(uint64 resource)579 static inline uint calc_hash(uint64 resource)
580 {
581   const uchar *pos= (uchar *)&resource;
582   ulong nr1= 1, nr2= 4, i;
583   for (i= 0; i < sizeof(resource) ; i++, pos++)
584   {
585     nr1^= (ulong) ((((uint) nr1 & 63)+nr2) * ((uint)*pos)) + (nr1 << 8);
586     nr2+= 3;
587   }
588   return nr1 & INT_MAX32;
589 }
590 
591 /*
592   RETURN
593     see enum lockman_getlock_result
594   NOTE
595     uses pins[0..3], they're removed on return
596 */
/*
  Acquire (or wait for) a lock of level 'lock' on 'resource' for owner
  'lo'. Inserts a LOCK node into the hash; if the insert says we must
  wait, repeatedly waits on the blocking owner's condition variable
  until our request becomes active or the timeout expires.

  RETURN
    see enum lockman_getlock_result (DIDNT_GET_THE_LOCK on timeout)
*/
enum lockman_getlock_result lockman_getlock(LOCKMAN *lm, LOCK_OWNER *lo,
                                            uint64 resource,
                                            enum lockman_lock_type lock)
{
  int res;
  uint csize, bucket, hashnr;
  LOCK *node, * volatile *el, *blocker;
  LF_PINS *pins= lo->pins;
  enum lockman_lock_type old_lock;

  DBUG_ASSERT(lo->loid);
  node= (LOCK *)lf_alloc_new(pins);
  node->flags= 0;
  node->lock= lock;
  node->loid= lo->loid;
  node->resource= resource;
  hashnr= calc_hash(resource);
  bucket= hashnr % lm->size;
  el= lf_dynarray_lvalue(&lm->array, bucket);
  if (*el == NULL)
    initialize_bucket(lm, el, bucket, pins);
  /* real locks get odd hashnr ( | 1 ) so they sort after bucket dummies */
  node->hashnr= my_reverse_bits(hashnr) | 1;
  res= lockinsert(el, node, pins, &blocker);
  if (res & ALREADY_HAVE)
  {
    /* our existing lock covers the request - node wasn't inserted */
    int r;
    old_lock= blocker->lock;
    lf_alloc_free(pins, node);
    r= getlock_result[old_lock][lock];
    DBUG_ASSERT(r);
    return r;
  }
  /* a new value was added to the hash */
  csize= lm->size;
  if ((my_atomic_add32(&lm->count, 1)+1.0) / csize > MAX_LOAD)
    my_atomic_cas32(&lm->size, (int*) &csize, csize*2);
  /* register the lock in the owner's list (used by release_locks) */
  node->lonext= lo->all_locks;
  lo->all_locks= node;
  for ( ; res & NEED_TO_WAIT; res= lockpeek(el, node, pins, &blocker))
  {
    LOCK_OWNER *wait_for_lo;
    ulonglong deadline;
    struct timespec timeout;

    lf_assert_pin(pins, 3); /* blocker must be pinned here */
    wait_for_lo= lm->loid_to_lo(blocker->loid);

    /*
      now, this is tricky. blocker is not necessarily a LOCK
      we're waiting for. If it's compatible with what we want,
      then we're waiting for a lock that blocker is waiting for
      (see two places where blocker is set in lockfind)
      In the latter case, let's "dereference" it
    */
    if (lock_compatibility_matrix[blocker->lock][lock])
    {
      blocker= wait_for_lo->all_locks;
      lf_pin(pins, 3, blocker);
      if (blocker != wait_for_lo->all_locks)
        continue;       /* owner's lock list changed under us - re-peek */
      wait_for_lo= wait_for_lo->waiting_for;
    }

    /*
      note that the blocker transaction may have ended by now,
      its LOCK_OWNER and short id were reused, so 'wait_for_lo' may point
      to an unrelated - albeit valid - LOCK_OWNER
    */
    if (!wait_for_lo)
      continue;

    lo->waiting_for= wait_for_lo;

    /*
      We lock a mutex - it may belong to a wrong LOCK_OWNER, but it must
      belong to _some_ LOCK_OWNER. It means, we can never free() a LOCK_OWNER,
      if there're other active LOCK_OWNERs.
    */
    /* QQ: race condition here */
    pthread_mutex_lock(wait_for_lo->mutex);
    if (DELETED(blocker->link))
    {
      /*
        blocker transaction was ended, or a savepoint that owned
        the lock was rolled back. Either way - the lock was removed
      */
      pthread_mutex_unlock(wait_for_lo->mutex);
      continue;
    }

    /* yuck. waiting */
    deadline= my_hrtime().val*1000 + lm->lock_timeout * 1000000;
    set_timespec_time_nsec(timeout, deadline);
    do
    {
      pthread_cond_timedwait(wait_for_lo->cond, wait_for_lo->mutex, &timeout);
    } while (!DELETED(blocker->link) && my_hrtime().val < deadline/1000);
    pthread_mutex_unlock(wait_for_lo->mutex);
    if (!DELETED(blocker->link))
    {
      /*
        timeout.
        note that we _don't_ release the lock request here.
        Instead we're relying on the caller to abort the transaction,
        and release all locks at once - see lockman_release_locks()
      */
      lf_unpin(pins, 3);
      return DIDNT_GET_THE_LOCK;
    }
  }
  lo->waiting_for= 0;
  lf_assert_unpin(pins, 3); /* unpin should not be needed */
  return getlock_result[lock][lock];
}
711 
712 /*
713   RETURN
714     0 - deleted
715     1 - didn't (not found)
716   NOTE
717     see lockdelete() for pin usage notes
718 */
/*
  Release all locks held (or requested) by owner 'lo', then wake every
  thread that may be waiting on one of them. The owner's mutex is held
  for the whole operation so waiters in lockman_getlock() see a
  consistent delete-then-broadcast sequence.

  RETURN
    0 - deleted
    1 - didn't (not found)
  NOTE
    see lockdelete() for pin usage notes
*/
int lockman_release_locks(LOCKMAN *lm, LOCK_OWNER *lo)
{
  LOCK * volatile *el, *node, *next;
  uint bucket;
  LF_PINS *pins= lo->pins;

  pthread_mutex_lock(lo->mutex);
  for (node= lo->all_locks; node; node= next)
  {
    next= node->lonext;
    bucket= calc_hash(node->resource) % lm->size;
    el= lf_dynarray_lvalue(&lm->array, bucket);
    if (*el == NULL)
      initialize_bucket(lm, el, bucket, pins);
    lockdelete(el, node, pins);
    my_atomic_add32(&lm->count, -1);
  }
  lo->all_locks= 0;
  /* now signal all waiters */
  pthread_cond_broadcast(lo->cond);
  pthread_mutex_unlock(lo->mutex);
  return 0;
}
742 
743 #ifdef MY_LF_EXTRA_DEBUG
/* printable lock level names, indexed by enum lockman_lock_type */
static const char *lock2str[]=
{ "N", "S", "X", "IS", "IX", "SIX", "LS", "LX", "SLX", "LSIX" };
746 /*
747   NOTE
748     the function below is NOT thread-safe !!!
749 */
/*
  Debug helper: dump the whole lock hash (dummies skipped) to stdout.
  NOT thread-safe - walks the list without pinning.
*/
void print_lockhash(LOCKMAN *lm)
{
  LOCK *el= *(LOCK **)lf_dynarray_lvalue(&lm->array, 0);
  printf("hash: size %u count %u\n", lm->size, lm->count);
  while (el)
  {
    intptr next= el->link;
    if (el->hashnr & 1)         /* odd hashnr: a real lock node */
    {
      printf("0x%08lx { resource %lu, loid %u, lock %s",
             (long) el->hashnr, (ulong) el->resource, el->loid,
             lock2str[el->lock]);
      if (el->flags & IGNORE_ME) printf(" IGNORE_ME");
      if (el->flags & UPGRADED) printf(" UPGRADED");
      if (el->flags & ACTIVE) printf(" ACTIVE");
      if (DELETED(next)) printf(" ***DELETED***");
      printf("}\n");
    }
    else
    {
      /* even hashnr: a dummy bucket node */
      /*printf("0x%08x { dummy }\n", el->hashnr);*/
      DBUG_ASSERT(el->resource == 0 && el->loid == 0 && el->lock == X);
    }
    el= PTR(next);              /* mask off the deleted bit */
  }
}
776 #endif
777