1 /*
2    Copyright (c) 2007, 2008, Sun Microsystems, Inc,
3    Copyright (c) 2011, 2012, Monty Program Ab
4 
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License as published by
7    the Free Software Foundation; version 2 of the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
17 
18 #include <my_global.h>
19 #include <wqueue.h>
20 
21 #define STRUCT_PTR(TYPE, MEMBER, a)                                           \
22           (TYPE *) ((char *) (a) - offsetof(TYPE, MEMBER))
23 /*
24   Link a thread into double-linked queue of waiting threads.
25 
26   SYNOPSIS
27     wqueue_link_into_queue()
28       wqueue              pointer to the queue structure
29       thread              pointer to the thread to be added to the queue
30 
31   RETURN VALUE
32     none
33 
34   NOTES.
35     Queue is represented by a circular list of the thread structures
36     The list is double-linked of the type (**prev,*next), accessed by
37     a pointer to the last element.
38 */
39 
wqueue_link_into_queue(WQUEUE * wqueue,struct st_my_thread_var * thread)40 void wqueue_link_into_queue(WQUEUE *wqueue, struct st_my_thread_var *thread)
41 {
42   struct st_my_thread_var *last;
43   if (!(last= wqueue->last_thread))
44   {
45     /* Queue is empty */
46     thread->next= thread;
47     thread->prev= &thread->next;
48   }
49   else
50   {
51     thread->prev= last->next->prev;
52     last->next->prev= &thread->next;
53     thread->next= last->next;
54     last->next= thread;
55   }
56   wqueue->last_thread= thread;
57 }
58 
59 
60 /*
61   Add a thread to single-linked queue of waiting threads
62 
63   SYNOPSIS
64     wqueue_add_to_queue()
65       wqueue              pointer to the queue structure
66       thread              pointer to the thread to be added to the queue
67 
68   RETURN VALUE
69     none
70 
71   NOTES.
72     Queue is represented by a circular list of the thread structures
73     The list is single-linked of the type (*next), accessed by a pointer
74     to the last element.
75 */
76 
wqueue_add_to_queue(WQUEUE * wqueue,struct st_my_thread_var * thread)77 void wqueue_add_to_queue(WQUEUE *wqueue, struct st_my_thread_var *thread)
78 {
79   struct st_my_thread_var *last;
80   if (!(last= wqueue->last_thread))
81     thread->next= thread;
82   else
83   {
84     thread->next= last->next;
85     last->next= thread;
86   }
87 #ifndef DBUG_OFF
88   thread->prev= NULL; /* force segfault if used */
89 #endif
90   wqueue->last_thread= thread;
91 }
92 
93 /*
94   Unlink a thread from double-linked queue of waiting threads
95 
96   SYNOPSIS
97     wqueue_unlink_from_queue()
98       wqueue              pointer to the queue structure
99       thread              pointer to the thread to be removed from the queue
100 
101   RETURN VALUE
102     none
103 
104   NOTES.
105     See NOTES for link_into_queue
106 */
107 
wqueue_unlink_from_queue(WQUEUE * wqueue,struct st_my_thread_var * thread)108 void wqueue_unlink_from_queue(WQUEUE *wqueue, struct st_my_thread_var *thread)
109 {
110   if (thread->next == thread)
111     /* The queue contains only one member */
112     wqueue->last_thread= NULL;
113   else
114   {
115     thread->next->prev= thread->prev;
116     *thread->prev= thread->next;
117     if (wqueue->last_thread == thread)
118       wqueue->last_thread= STRUCT_PTR(struct st_my_thread_var, next,
119                                       thread->prev);
120   }
121   thread->next= NULL;
122 }
123 
124 
125 /*
126   Remove all threads from queue signaling them to proceed
127 
128   SYNOPSIS
129     wqueue_realease_queue()
130       wqueue              pointer to the queue structure
131       thread              pointer to the thread to be added to the queue
132 
133   RETURN VALUE
134     none
135 
136   NOTES.
137     See notes for add_to_queue
138     When removed from the queue each thread is signaled via condition
139     variable thread->suspend.
140 */
141 
wqueue_release_queue(WQUEUE * wqueue)142 void wqueue_release_queue(WQUEUE *wqueue)
143 {
144   struct st_my_thread_var *last= wqueue->last_thread;
145   struct st_my_thread_var *next= last->next;
146   struct st_my_thread_var *thread;
147   do
148   {
149     thread= next;
150     mysql_cond_signal(&thread->suspend);
151     next= thread->next;
152     thread->next= NULL;
153   }
154   while (thread != last);
155   wqueue->last_thread= NULL;
156 }
157 
158 
159 /**
160   @brief Removes all threads waiting for read or first one waiting for write.
161 
162   @param wqueue          pointer to the queue structure
163   @param thread          pointer to the thread to be added to the queue
164 
165   @note This function is applicable only to single linked lists.
166 */
167 
wqueue_release_one_locktype_from_queue(WQUEUE * wqueue)168 void wqueue_release_one_locktype_from_queue(WQUEUE *wqueue)
169 {
170   struct st_my_thread_var *last= wqueue->last_thread;
171   struct st_my_thread_var *next= last->next;
172   struct st_my_thread_var *thread;
173   struct st_my_thread_var *new_list= NULL;
174   uint first_type= next->lock_type;
175   if (first_type == MY_PTHREAD_LOCK_WRITE)
176   {
177     /* release first waiting for write lock */
178     mysql_cond_signal(&next->suspend);
179     if (next == last)
180       wqueue->last_thread= NULL;
181     else
182       last->next= next->next;
183     next->next= NULL;
184     return;
185   }
186   do
187   {
188     thread= next;
189     next= thread->next;
190     if (thread->lock_type == MY_PTHREAD_LOCK_WRITE)
191     {
192       /* skip waiting for write lock */
193       if (new_list)
194       {
195         thread->next= new_list->next;
196         new_list= new_list->next= thread;
197       }
198       else
199         new_list= thread->next= thread;
200     }
201     else
202     {
203       /* release waiting for read lock */
204       mysql_cond_signal(&thread->suspend);
205       thread->next= NULL;
206     }
207   } while (thread != last);
208   wqueue->last_thread= new_list;
209 }
210 
211 
212 /*
213   Add thread and wait
214 
215   SYNOPSIS
216     wqueue_add_and_wait()
217     wqueue               queue to add to
218     thread               thread which is waiting
219     lock                 mutex need for the operation
220 */
221 
wqueue_add_and_wait(WQUEUE * wqueue,struct st_my_thread_var * thread,mysql_mutex_t * lock)222 void wqueue_add_and_wait(WQUEUE *wqueue,
223                          struct st_my_thread_var *thread,
224                          mysql_mutex_t *lock)
225 {
226   DBUG_ENTER("wqueue_add_and_wait");
227   DBUG_PRINT("enter",
228              ("thread: %p cond: %p mutex: %p",
229               thread, &thread->suspend, lock));
230   wqueue_add_to_queue(wqueue, thread);
231   do
232   {
233     DBUG_PRINT("info", ("wait... cond:  %p  mutex:  %p",
234                         &thread->suspend, lock));
235     mysql_cond_wait(&thread->suspend, lock);
236     DBUG_PRINT("info", ("wait done cond: %p mutex: %p next: %p",
237                         &thread->suspend, lock,
238                         thread->next));
239   }
240   while (thread->next);
241   DBUG_VOID_RETURN;
242 }
243