1 /* QQ: TODO - allocate everything from dynarrays !!! (benchmark) */
2 /* QQ: automatically place S instead of LS if possible */
3 /* Copyright (C) 2006 MySQL AB
4 
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License as published by
7    the Free Software Foundation; version 2 of the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
17 
18 #include <my_base.h>
19 #include <hash.h>
20 #include "tablockman.h"
21 
22 /*
23   Lock Manager for Table Locks
24 
25   The code below handles locks on resources - but it is optimized for a
26   case when a number of resources is not very large, and there are many of
27   locks per resource - that is a resource is likely to be a table or a
28   database, but hardly a row in a table.
29 
30   Locks belong to "lock owners". A Lock Owner is uniquely identified by a
31   16-bit number - loid (lock owner identifier). A function loid_to_tlo must
32   be provided by the application that takes such a number as an argument
33   and returns a TABLE_LOCK_OWNER structure.
34 
35   Lock levels are completely defined by three tables. Lock compatibility
36   matrix specifies which locks can be held at the same time on a resource.
37   Lock combining matrix specifies what lock level has the same behaviour as
38   a pair of two locks of given levels. getlock_result matrix simplifies
39   intention locking and lock escalation for an application, basically it
40   defines which locks are intention locks and which locks are "loose"
41   locks.  It is only used to provide better diagnostics for the
42   application, lock manager itself does not differentiate between normal,
43   intention, and loose locks.
44 
45   The assumptions are: few distinct resources, many locks are held at the
46   same time on one resource. Thus: a lock structure _per resource_ can be
47   rather large; a lock structure _per lock_ does not need to be very small
48   either; we need to optimize for _speed_. Operations we need are: place a
49   lock, check if a particular transaction already has a lock on this
50   resource, check if a conflicting lock exists, if yes - find who owns it.
51 
52   Solution: every resource has a structure with
53   1. Hash of latest (see the lock upgrade section below) granted locks with
54      loid as a key. Thus, checking if a given transaction has a lock on
55      this resource is O(1) operation.
56   2. Doubly-linked lists of all granted locks - one list for every lock
57      type. Thus, checking if a conflicting lock exists is a check whether
58      an appropriate list head pointer is not null, also O(1).
59   3. Every lock has a loid of the owner, thus checking who owns a
60      conflicting lock is also O(1).
61   4. Deque of waiting locks. It's a deque (double-ended queue) not a fifo,
62      because for lock upgrades requests are added to the queue head, not
     tail. This is the only place where it gets O(N) on the number
     of locks - when a transaction wakes up from waiting on a condition,
65      it may need to scan the queue backward to the beginning to find
66      a conflicting lock. It is guaranteed though that "all transactions
67      before it" received the same - or earlier - signal.  In other words a
68      transaction needs to scan all transactions before it that received the
69      signal but didn't have a chance to resume the execution yet, so
70      practically OS scheduler won't let the scan to be O(N).
71 
72   Waiting: if there is a conflicting lock or if wait queue is not empty, a
73   requested lock cannot be granted at once. It is added to the end of the
74   wait queue. If a queue was empty and there is a conflicting lock - the
75   "blocker" transaction is the owner of this lock. If a queue is not empty,
76   an owner of the previous lock in the queue is the "blocker". But if the
77   previous lock is compatible with the request, then the "blocker" is the
78   transaction that the owner of the lock at the end of the queue is waiting
79   for (in other words, our lock is added to the end of the wait queue, and
80   our blocker is the same as of the lock right before us).
81 
82   Lock upgrades: when a thread that has a lock on a given resource,
83   requests a new lock on the same resource and the old lock is not enough
84   to satisfy new lock requirements (which is defined by
85   lock_combining_matrix[old_lock][new_lock] != old_lock), a new lock
86   (defined by lock_combining_matrix as above) is placed. Depending on
87   other granted locks it is immediately granted or it has to wait.  Here the
88   lock is added to the start of the waiting queue, not to the end.  Old
89   lock, is removed from the hash, but not from the doubly-linked lists.
90   (indeed, a transaction checks "do I have a lock on this resource ?" by
91   looking in a hash, and it should find a latest lock, so old locks must be
92   removed; but a transaction checks "are there conflicting locks ?" by
93   checking doubly-linked lists, it doesn't matter if it will find an old
94   lock - if it would be removed, a new lock would be also a conflict).
95   So, a hash contains only "latest" locks - there can be only one latest
96   lock per resource per transaction. But doubly-linked lists contain all
  locks, even "obsolete" ones, because it doesn't hurt. Note that old
98   locks can not be freed early, in particular they stay in the
99   'active_locks' list of a lock owner, because they may be "re-enabled"
100   on a savepoint rollback.
101 
102   To better support table-row relations where one needs to lock the table
103   with an intention lock before locking the row, extended diagnostics is
104   provided.  When an intention lock (presumably on a table) is granted,
105   lockman_getlock() returns one of GOT_THE_LOCK (no need to lock the row,
106   perhaps the thread already has a normal lock on this table),
107   GOT_THE_LOCK_NEED_TO_LOCK_A_SUBRESOURCE (need to lock the row, as usual),
108   GOT_THE_LOCK_NEED_TO_INSTANT_LOCK_A_SUBRESOURCE (only need to check
109   whether it's possible to lock the row, but no need to lock it - perhaps
110   the thread has a loose lock on this table). This is defined by
111   getlock_result[] table.
112 
113   Instant duration locks are not supported. Though they're trivial to add,
114   they are normally only used on rows, not on tables. So, presumably,
115   they are not needed here.
116 
117   Mutexes: there're table mutexes (LOCKED_TABLE::mutex), lock owner mutexes
118   (TABLE_LOCK_OWNER::mutex), and a pool mutex (TABLOCKMAN::pool_mutex).
119   table mutex protects operations on the table lock structures, and lock
120   owner pointers waiting_for and waiting_for_loid.
121   lock owner mutex is only used to wait on lock owner condition
122   (TABLE_LOCK_OWNER::cond), there's no need to protect owner's lock
123   structures, and only lock owner itself may access them.
124   The pool mutex protects a pool of unused locks. Note the locking order:
125   first the table mutex, then the owner mutex or a pool mutex.
126   Table mutex lock cannot be attempted when owner or pool mutex are locked.
127   No mutex lock can be attempted if owner or pool mutex are locked.
128 */
129 
/*
  Lock compatibility matrix.

  It's asymmetric. Read it as "Somebody has the lock <value in the row
  label>, can I set the lock <value in the column label> ?"

  Note: though you can take LS lock while somebody has S lock, it makes no
  sense - it's simpler to take S lock too.

  1  - compatible
  0  - incompatible
  -1 - "impossible", so that we can assert the impossibility.

  The table is indexed as [existing lock type][requested lock type].
  Callers in this file test the value with '!', so only 0 is treated as
  a conflict; -1 fills the N row and the N column.
  When the granted lists in LOCKED_TABLE::active_locks[] are scanned, the
  list at index i corresponds to row i+1 (there is no list for N).
*/
static const int lock_compatibility_matrix[10][10]=
{ /* N    S   X  IS  IX  SIX LS  LX  SLX LSIX          */
  {  -1, -1, -1, -1, -1, -1, -1, -1, -1, -1 }, /* N    */
  {  -1,  1,  0,  1,  0,  0,  1,  0,  0,  0 }, /* S    */
  {  -1,  0,  0,  0,  0,  0,  0,  0,  0,  0 }, /* X    */
  {  -1,  1,  0,  1,  1,  1,  1,  1,  1,  1 }, /* IS   */
  {  -1,  0,  0,  1,  1,  0,  1,  1,  0,  1 }, /* IX   */
  {  -1,  0,  0,  1,  0,  0,  1,  0,  0,  0 }, /* SIX  */
  {  -1,  1,  0,  1,  0,  0,  1,  0,  0,  0 }, /* LS   */
  {  -1,  0,  0,  0,  0,  0,  0,  0,  0,  0 }, /* LX   */
  {  -1,  0,  0,  0,  0,  0,  0,  0,  0,  0 }, /* SLX  */
  {  -1,  0,  0,  1,  0,  0,  1,  0,  0,  0 }  /* LSIX */
};
156 
/*
  Lock combining matrix.

  It's symmetric. Read it as "what lock level L is identical to the
  set of two locks A and B"

  One should never get N from it, we assert the impossibility

  tablockman_getlock() indexes it as [level of the lock already held]
  [requested level]; when the result equals the level already held, the
  old lock is enough and no new lock is placed.
*/
static const enum lockman_lock_type lock_combining_matrix[10][10]=
{/*    N    S   X    IS    IX  SIX    LS    LX   SLX   LSIX         */
  {    N,   N,  N,    N,    N,   N,    N,    N,   N,    N}, /* N    */
  {    N,   S,  X,    S,  SIX, SIX,    S,  SLX, SLX,  SIX}, /* S    */
  {    N,   X,  X,    X,    X,   X,    X,    X,   X,    X}, /* X    */
  {    N,   S,  X,   IS,   IX, SIX,   LS,   LX, SLX, LSIX}, /* IS   */
  {    N, SIX,  X,   IX,   IX, SIX, LSIX,   LX, SLX, LSIX}, /* IX   */
  {    N, SIX,  X,  SIX,  SIX, SIX,  SIX,  SLX, SLX,  SIX}, /* SIX  */
  {    N,   S,  X,   LS, LSIX, SIX,   LS,   LX, SLX, LSIX}, /* LS   */
  {    N, SLX,  X,   LX,   LX, SLX,   LX,   LX, SLX,   LX}, /* LX   */
  {    N, SLX,  X,  SLX,  SLX, SLX,  SLX,  SLX, SLX,  SLX}, /* SLX  */
  {    N, SIX,  X, LSIX, LSIX, SIX, LSIX,   LX, SLX, LSIX}  /* LSIX */
};
178 
/*
  the return codes for tablockman_getlock

  It's asymmetric. Read it as "I have the lock <value in the row label>,
  what value should be returned for <value in the column label> ?"

  0 means impossible combination (assert!) - tablockman_getlock() ends
  with DBUG_ASSERT(res), which catches a zero taken from this table.

  Defines below help to preserve the table structure:
    I - GOT_THE_LOCK_NEED_TO_LOCK_A_SUBRESOURCE
    L - GOT_THE_LOCK_NEED_TO_INSTANT_LOCK_A_SUBRESOURCE
    A - GOT_THE_LOCK
    x - also GOT_THE_LOCK; it marks a combination that is possible
        (assert should not crash) but it cannot happen in row locks,
        only in table locks (S,X), or lock escalations (LS,LX)

  The one-letter macros exist only for the table below and are
  undefined right after it.
*/
#define I GOT_THE_LOCK_NEED_TO_LOCK_A_SUBRESOURCE
#define L GOT_THE_LOCK_NEED_TO_INSTANT_LOCK_A_SUBRESOURCE
#define A GOT_THE_LOCK
#define x GOT_THE_LOCK
static const enum lockman_getlock_result getlock_result[10][10]=
{/*    N    S   X    IS    IX  SIX    LS    LX   SLX   LSIX         */
  {    0,   0,  0,    0,    0,   0,    0,    0,   0,    0}, /* N    */
  {    0,   x,  0,    A,    0,   0,    x,    0,   0,    0}, /* S    */
  {    0,   x,  x,    A,    A,   0,    x,    x,   0,    0}, /* X    */
  {    0,   0,  0,    I,    0,   0,    0,    0,   0,    0}, /* IS   */
  {    0,   0,  0,    I,    I,   0,    0,    0,   0,    0}, /* IX   */
  {    0,   x,  0,    A,    I,   0,    x,    0,   0,    0}, /* SIX  */
  {    0,   0,  0,    L,    0,   0,    x,    0,   0,    0}, /* LS   */
  {    0,   0,  0,    L,    L,   0,    x,    x,   0,    0}, /* LX   */
  {    0,   x,  0,    A,    L,   0,    x,    x,   0,    0}, /* SLX  */
  {    0,   0,  0,    L,    I,   0,    x,    0,   0,    0}  /* LSIX */
};
#undef I
#undef L
#undef A
#undef x
214 
215 /*
216   this structure is optimized for a case when there're many locks
217   on the same resource - e.g. a table
218 */
219 
220 struct st_table_lock {
221   /* QQ: do we need upgraded_from ? */
222   struct st_table_lock *next_in_lo, *upgraded_from, *next, *prev;
223   struct st_locked_table *table;
224   uint16 loid;
225   uchar  lock_type;
226 };
227 
228 static inline
find_by_loid(LOCKED_TABLE * table,uint16 loid)229 TABLE_LOCK *find_by_loid(LOCKED_TABLE *table, uint16 loid)
230 {
231   return (TABLE_LOCK *)my_hash_search(& table->latest_locks,
232                                    (uchar *)& loid, sizeof(loid));
233 }
234 
235 static inline
remove_from_wait_queue(TABLE_LOCK * lock,LOCKED_TABLE * table)236 void remove_from_wait_queue(TABLE_LOCK *lock, LOCKED_TABLE *table)
237 {
238   DBUG_ASSERT(table == lock->table);
239   if (lock->prev)
240   {
241     DBUG_ASSERT(table->wait_queue_out != lock);
242     lock->prev->next= lock->next;
243   }
244   else
245   {
246     DBUG_ASSERT(table->wait_queue_out == lock);
247     table->wait_queue_out= lock->next;
248   }
249   if (lock->next)
250   {
251     DBUG_ASSERT(table->wait_queue_in != lock);
252     lock->next->prev= lock->prev;
253   }
254   else
255   {
256     DBUG_ASSERT(table->wait_queue_in == lock);
257     table->wait_queue_in= lock->prev;
258   }
259 }
260 
261 /*
262   DESCRIPTION
263     tries to lock a resource 'table' with a lock level 'lock'.
264 
265   RETURN
266     see enum lockman_getlock_result
267 */
268 enum lockman_getlock_result
tablockman_getlock(TABLOCKMAN * lm,TABLE_LOCK_OWNER * lo,LOCKED_TABLE * table,enum lockman_lock_type lock)269 tablockman_getlock(TABLOCKMAN *lm, TABLE_LOCK_OWNER *lo,
270                    LOCKED_TABLE *table, enum lockman_lock_type lock)
271 {
272   TABLE_LOCK *old, *new, *blocker, *blocker2;
273   TABLE_LOCK_OWNER *wait_for;
274   struct timespec timeout;
275   enum lockman_lock_type new_lock;
276   enum lockman_getlock_result res;
277   int i;
278 
279   DBUG_ASSERT(lo->waiting_lock == 0);
280   DBUG_ASSERT(lo->waiting_for == 0);
281   DBUG_ASSERT(lo->waiting_for_loid == 0);
282 
283   mysql_mutex_lock(& table->mutex);
284   /* do we already have a lock on this resource ? */
285   old= find_by_loid(table, lo->loid);
286 
287   /* calculate the level of the upgraded lock, if yes */
288   new_lock= old ? lock_combining_matrix[old->lock_type][lock] : lock;
289 
290   /* and check if old lock is enough to satisfy the new request */
291   if (old && new_lock == old->lock_type)
292   {
293     /* yes */
294     res= getlock_result[old->lock_type][lock];
295     goto ret;
296   }
297 
298   /* no, placing a new lock. first - take a free lock structure from the pool */
299   mysql_mutex_lock(& lm->pool_mutex);
300   new= lm->pool;
301   if (new)
302   {
303     lm->pool= new->next;
304     mysql_mutex_unlock(& lm->pool_mutex);
305   }
306   else
307   {
308     mysql_mutex_unlock(& lm->pool_mutex);
309     new= (TABLE_LOCK *)my_malloc(sizeof(*new), MYF(MY_WME));
310     if (unlikely(!new))
311     {
312       res= NO_MEMORY_FOR_LOCK;
313       goto ret;
314     }
315   }
316 
317   new->loid= lo->loid;
318   new->lock_type= new_lock;
319   new->table= table;
320 
321   /* and try to place it */
322   for (new->prev= table->wait_queue_in;;)
323   {
324     wait_for= 0;
325     if (!old)
326     {
327       /* not upgrading - a lock must be added to the _end_ of the wait queue */
328       for (blocker= new->prev; blocker && !wait_for; blocker= blocker->prev)
329       {
330         TABLE_LOCK_OWNER *tmp= lm->loid_to_tlo(blocker->loid);
331 
332         /* find a blocking lock */
333         DBUG_ASSERT(table->wait_queue_out);
334         DBUG_ASSERT(table->wait_queue_in);
335         if (!lock_compatibility_matrix[blocker->lock_type][lock])
336         {
337           /* found! */
338           wait_for= tmp;
339           break;
340         }
341 
342         /*
343           hmm, the lock before doesn't block us, let's look one step further.
344           the condition below means:
345 
346             if we never waited on a condition yet
347             OR
348             the lock before ours (blocker) waits on a lock (blocker2) that is
349                present in the hash AND and conflicts with 'blocker'
350 
351             the condition after OR may fail if 'blocker2' was removed from
352             the hash, its signal woke us up, but 'blocker' itself didn't see
353             the signal yet.
354         */
355         if (!lo->waiting_lock ||
356             ((blocker2= find_by_loid(table, tmp->waiting_for_loid)) &&
357             !lock_compatibility_matrix[blocker2->lock_type]
358                                       [blocker->lock_type]))
359         {
360           /* but it's waiting for a real lock. we'll wait for the same lock */
361           wait_for= tmp->waiting_for;
362           /*
363             We don't really need tmp->waiting_for, as tmp->waiting_for_loid
364             is enough.  waiting_for is just a local cache to avoid calling
365             loid_to_tlo().
366             But it's essensial that tmp->waiting_for pointer can ONLY
367             be dereferenced if find_by_loid() above returns a non-null
368             pointer, because a TABLE_LOCK_OWNER object that it points to
369             may've been freed when we come here after a signal.
370             In particular tmp->waiting_for_loid cannot be replaced
371             with tmp->waiting_for->loid.
372           */
373           DBUG_ASSERT(wait_for == lm->loid_to_tlo(tmp->waiting_for_loid));
374           break;
375         }
376 
377         /*
378           otherwise - a lock it's waiting for doesn't exist.
379           We've no choice but to scan the wait queue backwards, looking
380           for a conflicting lock or a lock waiting for a real lock.
381           QQ is there a way to avoid this scanning ?
382         */
383       }
384     }
385 
386     if (wait_for == 0)
387     {
388       /* checking for compatibility with existing locks */
389       for (blocker= 0, i= 0; i < LOCK_TYPES; i++)
390       {
391         if (table->active_locks[i] && !lock_compatibility_matrix[i+1][lock])
392         {
393           blocker= table->active_locks[i];
394           /* if the first lock in the list is our own - skip it */
395           if (blocker->loid == lo->loid)
396             blocker= blocker->next;
397           if (blocker) /* found a conflicting lock, need to wait */
398             break;
399         }
400       }
401       if (!blocker) /* free to go */
402         break;
403       wait_for= lm->loid_to_tlo(blocker->loid);
404     }
405 
406     /* ok, we're here - the wait is inevitable */
407     lo->waiting_for= wait_for;
408     lo->waiting_for_loid= wait_for->loid;
409     if (!lo->waiting_lock) /* first iteration of the for() loop */
410     {
411       /* lock upgrade or new lock request ? */
412       if (old)
413       {
414         /* upgrade - add the lock to the _start_ of the wait queue */
415         new->prev= 0;
416         if ((new->next= table->wait_queue_out))
417           new->next->prev= new;
418         table->wait_queue_out= new;
419         if (!table->wait_queue_in)
420           table->wait_queue_in= table->wait_queue_out;
421       }
422       else
423       {
424         /* new lock - add the lock to the _end_ of the wait queue */
425         new->next= 0;
426         if ((new->prev= table->wait_queue_in))
427           new->prev->next= new;
428         table->wait_queue_in= new;
429         if (!table->wait_queue_out)
430           table->wait_queue_out= table->wait_queue_in;
431       }
432       lo->waiting_lock= new;
433 
434       set_timespec_nsec(timeout,lm->lock_timeout * 1000000);
435 
436     }
437 
438     /*
439       prepare to wait.
440       we must lock blocker's mutex to wait on blocker's cond.
441       and we must release table's mutex.
442       note that blocker's mutex is locked _before_ table's mutex is released
443     */
444     mysql_mutex_lock(wait_for->mutex);
445     mysql_mutex_unlock(& table->mutex);
446 
447     /* now really wait */
448     i= mysql_cond_timedwait(wait_for->cond, wait_for->mutex, & timeout);
449 
450     mysql_mutex_unlock(wait_for->mutex);
451 
452     if (i == ETIMEDOUT || i == ETIME)
453     {
454       /* we rely on the caller to rollback and release all locks */
455       res= LOCK_TIMEOUT;
456       goto ret2;
457     }
458 
459     mysql_mutex_lock(& table->mutex);
460 
461     /* ... and repeat from the beginning */
462   }
463   /* yeah! we can place the lock now */
464 
465   /* remove the lock from the wait queue, if it was there */
466   if (lo->waiting_lock)
467   {
468     remove_from_wait_queue(new, table);
469     lo->waiting_lock= 0;
470     lo->waiting_for= 0;
471     lo->waiting_for_loid= 0;
472   }
473 
474   /* add it to the list of all locks of this lock owner */
475   new->next_in_lo= lo->active_locks;
476   lo->active_locks= new;
477 
478   /* and to the list of active locks of this lock type */
479   new->prev= 0;
480   if ((new->next= table->active_locks[new_lock-1]))
481     new->next->prev= new;
482   table->active_locks[new_lock-1]= new;
483 
484   /* update the latest_locks hash */
485   if (old)
486     my_hash_delete(& table->latest_locks, (uchar *)old);
487   my_hash_insert(& table->latest_locks, (uchar *)new);
488 
489   new->upgraded_from= old;
490 
491   res= getlock_result[lock][lock];
492 
493 ret:
494   mysql_mutex_unlock(& table->mutex);
495 ret2:
496   DBUG_ASSERT(res);
497   return res;
498 }
499 
500 /*
501   DESCRIPTION
502     release all locks belonging to a transaction.
503     signal waiters to continue
504 */
tablockman_release_locks(TABLOCKMAN * lm,TABLE_LOCK_OWNER * lo)505 void tablockman_release_locks(TABLOCKMAN *lm, TABLE_LOCK_OWNER *lo)
506 {
507   TABLE_LOCK *lock, *local_pool= 0, *local_pool_end;
508 
509   /*
510     instead of adding released locks to a pool one by one, we'll link
511     them in a list and add to a pool in one short action (under a mutex)
512   */
513   local_pool_end= lo->waiting_lock ? lo->waiting_lock : lo->active_locks;
514   if (!local_pool_end)
515     return;
516 
517   /* release a waiting lock, if any */
518   if ((lock= lo->waiting_lock))
519   {
520     DBUG_ASSERT(lock->loid == lo->loid);
521     mysql_mutex_lock(& lock->table->mutex);
522     remove_from_wait_queue(lock, lock->table);
523 
524     /*
525       a special case: if this lock was not the last in the wait queue
526       and it's compatible with the next lock, than the next lock
527       is waiting for our blocker though really it waits for us, indirectly.
528       Signal our blocker to release this next lock (after we removed our
529       lock from the wait queue, of course).
530     */
531     /*
532       An example to clarify the above:
533         trn1> S-lock the table. Granted.
534         trn2> IX-lock the table. Added to the wait queue. trn2 waits on trn1
535         trn3> IS-lock the table.  The queue is not empty, so IS-lock is added
536               to the queue. It's compatible with the waiting IX-lock, so trn3
537               waits for trn2->waiting_for, that is trn1.
538       if trn1 releases the lock it signals trn1->cond and both waiting
539       transactions are awaken. But if trn2 times out, trn3 must be notified
540       too (as IS and S locks are compatible). So trn2 must signal trn1->cond.
541     */
542     if (lock->next &&
543         lock_compatibility_matrix[lock->next->lock_type][lock->lock_type])
544     {
545       mysql_mutex_lock(lo->waiting_for->mutex);
546       mysql_cond_broadcast(lo->waiting_for->cond);
547       mysql_mutex_unlock(lo->waiting_for->mutex);
548     }
549     lo->waiting_for= 0;
550     lo->waiting_for_loid= 0;
551     mysql_mutex_unlock(& lock->table->mutex);
552 
553     lock->next= local_pool;
554     local_pool= lock;
555   }
556 
557   /* now release granted locks */
558   lock= lo->active_locks;
559   while (lock)
560   {
561     TABLE_LOCK *cur= lock;
562     mysql_mutex_t *mutex= & lock->table->mutex;
563     DBUG_ASSERT(cur->loid == lo->loid);
564 
565     DBUG_ASSERT(lock != lock->next_in_lo);
566     lock= lock->next_in_lo;
567 
568     /* TODO ? group locks by table to reduce the number of mutex locks */
569     mysql_mutex_lock(mutex);
570     my_hash_delete(& cur->table->latest_locks, (uchar *)cur);
571 
572     if (cur->prev)
573       cur->prev->next= cur->next;
574     if (cur->next)
575       cur->next->prev= cur->prev;
576     if (cur->table->active_locks[cur->lock_type-1] == cur)
577       cur->table->active_locks[cur->lock_type-1]= cur->next;
578 
579     cur->next= local_pool;
580     local_pool= cur;
581 
582     mysql_mutex_unlock(mutex);
583   }
584 
585   lo->waiting_lock= lo->active_locks= 0;
586 
587   /*
588     okay, all locks released. now signal that we're leaving,
589     in case somebody's waiting for it
590   */
591   mysql_mutex_lock(lo->mutex);
592   mysql_cond_broadcast(lo->cond);
593   mysql_mutex_unlock(lo->mutex);
594 
595   /* and push all freed locks to the lockman's pool */
596   mysql_mutex_lock(& lm->pool_mutex);
597   local_pool_end->next= lm->pool;
598   lm->pool= local_pool;
599   mysql_mutex_unlock(& lm->pool_mutex);
600 }
601 
/*
  Initialize a lock manager.

  SYNOPSIS
    lm        the lock manager to initialize
    func      callback that returns a TABLE_LOCK_OWNER for a loid
    timeout   lock wait timeout, stored in lm->lock_timeout
*/
void tablockman_init(TABLOCKMAN *lm, loid_to_tlo_func *func, uint timeout)
{
  /* the manager starts with an empty pool of unused lock structures */
  lm->pool= 0;
  lm->lock_timeout= timeout;
  lm->loid_to_tlo= func;
  mysql_mutex_init(& lm->pool_mutex, MY_MUTEX_INIT_FAST);

  /* ensure that my_interval_timer() is initialized */
  (void) my_interval_timer();
}
610 
/*
  Destroy a lock manager: free every lock structure that is kept in
  the pool and destroy the pool mutex.
*/
void tablockman_destroy(TABLOCKMAN *lm)
{
  TABLE_LOCK *unused;

  /* pop the pooled structures one by one */
  for (unused= lm->pool; unused; unused= lm->pool)
  {
    lm->pool= unused->next;
    my_free(unused);
  }
  mysql_mutex_destroy(& lm->pool_mutex);
}
621 
622 /*
623   initialize a LOCKED_TABLE structure
624 
625   SYNOPSYS
626     lt                          a LOCKED_TABLE to initialize
627     initial_hash_size           initial size for 'latest_locks' hash
628 */
tablockman_init_locked_table(LOCKED_TABLE * lt,int initial_hash_size)629 void tablockman_init_locked_table(LOCKED_TABLE *lt, int initial_hash_size)
630 {
631   bzero(lt, sizeof(*lt));
632   mysql_mutex_init(& lt->mutex, MY_MUTEX_INIT_FAST);
633   my_hash_init(& lt->latest_locks, & my_charset_bin, initial_hash_size,
634             offsetof(TABLE_LOCK, loid),
635             sizeof(((TABLE_LOCK*)0)->loid), 0, 0, 0);
636 }
637 
/*
  Destroy a LOCKED_TABLE structure: free its 'latest_locks' hash and
  destroy its mutex. The table is expected to have no locks left -
  neither waiting nor granted - which is asserted in debug builds.
*/
void tablockman_destroy_locked_table(LOCKED_TABLE *lt)
{
  int slot= 0;

  /* nobody may be waiting ... */
  DBUG_ASSERT(lt->wait_queue_out == 0);
  DBUG_ASSERT(lt->wait_queue_in == 0);
  /* ... and nothing may be granted */
  DBUG_ASSERT(lt->latest_locks.records == 0);
  while (slot < LOCK_TYPES)
  {
    DBUG_ASSERT(lt->active_locks[slot] == 0);
    slot++;
  }

  my_hash_free(& lt->latest_locks);
  mysql_mutex_destroy(& lt->mutex);
}
651 
#ifdef EXTRA_DEBUG
/* printable names of lock levels, indexed by the lock type value */
static const char *lock2str[LOCK_TYPES+1]= {
  "N", "S", "X", "IS", "IX", "SIX", "LS", "LX", "SLX", "LSIX"
};

/*
  Debugging aid: print one line to stdout that describes the lock
  owner 'lo' - its loid, the lock it waits for (in parentheses) and
  all its granted locks, each as <level>.<address of the table>.
  A trailing '!' is printed if a granted lock is found whose
  next_in_lo points back to itself.
*/
void tablockman_print_tlo(TABLE_LOCK_OWNER *lo)
{
  TABLE_LOCK *waiting= lo->waiting_lock;
  TABLE_LOCK *held= lo->active_locks;

  printf("lo%d>", lo->loid);
  if (waiting)
    printf(" (%s.0x%lx)", lock2str[waiting->lock_type], (ulong)waiting->table);

  /* stop at the end of the list or at a lock that links to itself */
  while (held && held != held->next_in_lo)
  {
    printf(" %s.0x%lx", lock2str[held->lock_type], (ulong)held->table);
    held= held->next_in_lo;
  }
  if (held) /* the walk was cut short by a self-referencing lock */
    printf("!");
  printf("\n");
}
#endif
672 
673