1 /*
2    Copyright (c) 2011, 2013 Monty Program Ab.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
16 
17 
18 #ifndef MY_APC_STANDALONE
19 
20 #include "mariadb.h"
21 #include "sql_class.h"
22 
23 #endif
24 
25 /* For standalone testing of APC system, see unittest/sql/my_apc-t.cc */
26 
27 /*
28   Initialize the target.
29 
30   @note
31   Initialization must be done prior to enabling/disabling the target, or making
32   any call requests to it.
33   Initial state after initialization is 'disabled'.
34 */
init(mysql_mutex_t * target_mutex)35 void Apc_target::init(mysql_mutex_t *target_mutex)
36 {
37   DBUG_ASSERT(!enabled);
38   LOCK_thd_kill_ptr= target_mutex;
39 #ifndef DBUG_OFF
40   n_calls_processed= 0;
41 #endif
42 }
43 
44 
45 /* [internal] Put request qe into the request list */
46 
enqueue_request(Call_request * qe)47 void Apc_target::enqueue_request(Call_request *qe)
48 {
49   mysql_mutex_assert_owner(LOCK_thd_kill_ptr);
50   if (apc_calls)
51   {
52     Call_request *after= apc_calls->prev;
53     qe->next= apc_calls;
54     apc_calls->prev= qe;
55 
56     qe->prev= after;
57     after->next= qe;
58   }
59   else
60   {
61     apc_calls= qe;
62     qe->next= qe->prev= qe;
63   }
64 }
65 
66 
67 /*
68   [internal] Remove request qe from the request queue.
69 
70   The request is not necessarily first in the queue.
71 */
72 
dequeue_request(Call_request * qe)73 void Apc_target::dequeue_request(Call_request *qe)
74 {
75   mysql_mutex_assert_owner(LOCK_thd_kill_ptr);
76   if (apc_calls == qe)
77   {
78     if ((apc_calls= apc_calls->next) == qe)
79     {
80       apc_calls= NULL;
81     }
82   }
83 
84   qe->prev->next= qe->next;
85   qe->next->prev= qe->prev;
86 }
87 
88 #ifdef HAVE_PSI_INTERFACE
89 
90 /* One key for all conds */
91 PSI_cond_key key_show_explain_request_COND;
92 
93 static PSI_cond_info show_explain_psi_conds[]=
94 {
95   { &key_show_explain_request_COND, "show_explain", 0 /* not using PSI_FLAG_GLOBAL*/ }
96 };
97 
init_show_explain_psi_keys(void)98 void init_show_explain_psi_keys(void)
99 {
100   if (PSI_server == NULL)
101     return;
102 
103   PSI_server->register_cond("sql", show_explain_psi_conds,
104                             array_elements(show_explain_psi_conds));
105 }
106 #endif
107 
108 
109 /*
110   Make an APC (Async Procedure Call) to another thread.
111 
112   @detail
113   Make an APC call: schedule it for execution and wait until the target
114   thread has executed it.
115 
116   - The caller is responsible for making sure he's not posting request
117     to the thread he's calling this function from.
118 
119   - The caller must have locked target_mutex. The function will release it.
120 
121   @retval FALSE - Ok, the call has been made
122   @retval TRUE  - Call wasnt made (either the target is in disabled state or
123                     timeout occurred)
124 */
125 
make_apc_call(THD * caller_thd,Apc_call * call,int timeout_sec,bool * timed_out)126 bool Apc_target::make_apc_call(THD *caller_thd, Apc_call *call,
127                                int timeout_sec, bool *timed_out)
128 {
129   bool res= TRUE;
130   *timed_out= FALSE;
131 
132   if (enabled)
133   {
134     /* Create and post the request */
135     Call_request apc_request;
136     apc_request.call= call;
137     apc_request.processed= FALSE;
138     mysql_cond_init(key_show_explain_request_COND, &apc_request.COND_request,
139                     NULL);
140     enqueue_request(&apc_request);
141     apc_request.what="enqueued by make_apc_call";
142 
143     struct timespec abstime;
144     const int timeout= timeout_sec;
145     set_timespec(abstime, timeout);
146 
147     int wait_res= 0;
148     PSI_stage_info old_stage;
149     caller_thd->ENTER_COND(&apc_request.COND_request, LOCK_thd_kill_ptr,
150                            &stage_show_explain, &old_stage);
151     /* todo: how about processing other errors here? */
152     while (!apc_request.processed && (wait_res != ETIMEDOUT))
153     {
154       /* We own LOCK_thd_kill_ptr */
155       wait_res= mysql_cond_timedwait(&apc_request.COND_request,
156                                      LOCK_thd_kill_ptr, &abstime);
157                                       // &apc_request.LOCK_request, &abstime);
158       if (caller_thd->killed)
159         break;
160     }
161 
162     if (!apc_request.processed)
163     {
164       /*
165         The wait has timed out, or this thread was KILLed.
166         Remove the request from the queue (ok to do because we own
167         LOCK_thd_kill_ptr)
168       */
169       apc_request.processed= TRUE;
170       dequeue_request(&apc_request);
171       *timed_out= TRUE;
172       res= TRUE;
173     }
174     else
175     {
176       /* Request was successfully executed and dequeued by the target thread */
177       res= FALSE;
178     }
179     /*
180       exit_cond() will call mysql_mutex_unlock(LOCK_thd_kill_ptr) for us:
181     */
182     caller_thd->EXIT_COND(&old_stage);
183 
184     /* Destroy all APC request data */
185     mysql_cond_destroy(&apc_request.COND_request);
186   }
187   else
188   {
189     mysql_mutex_unlock(LOCK_thd_kill_ptr);
190   }
191   return res;
192 }
193 
194 
195 /*
196   Process all APC requests.
197   This should be called periodically by the APC target thread.
198 */
199 
process_apc_requests()200 void Apc_target::process_apc_requests()
201 {
202   while (1)
203   {
204     Call_request *request;
205 
206     mysql_mutex_lock(LOCK_thd_kill_ptr);
207     if (!(request= get_first_in_queue()))
208     {
209       /* No requests in the queue */
210       mysql_mutex_unlock(LOCK_thd_kill_ptr);
211       break;
212     }
213 
214     /*
215       Remove the request from the queue (we're holding queue lock so we can be
216       sure that request owner won't try to remove it)
217     */
218     request->what="dequeued by process_apc_requests";
219     dequeue_request(request);
220     request->processed= TRUE;
221 
222     request->call->call_in_target_thread();
223     request->what="func called by process_apc_requests";
224 
225 #ifndef DBUG_OFF
226     n_calls_processed++;
227 #endif
228     mysql_cond_signal(&request->COND_request);
229     mysql_mutex_unlock(LOCK_thd_kill_ptr);
230   }
231 }
232 
233