1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Gearmand client and server library.
4 *
5 * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2008 Brian Aker, Eric Day
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are
11 * met:
12 *
13 * * Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer.
15 *
16 * * Redistributions in binary form must reproduce the above
17 * copyright notice, this list of conditions and the following disclaimer
18 * in the documentation and/or other materials provided with the
19 * distribution.
20 *
21 * * The names of its contributors may not be used to endorse or
22 * promote products derived from this software without specific prior
23 * written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 *
37 */
38
39 /**
40 * @file
41 * @brief Server Thread Definitions
42 */
43
44 #include "gear_config.h"
45 #include "libgearman-server/common.h"
46
47 #include <libgearman/command.h>
48
49 #ifdef __cplusplus
50 # include <cassert>
51 # include <cerrno>
52 #else
53 # include <assert.h>
54 # include <errno.h>
55 #endif
56
57 /*
58 * Private declarations
59 */
60
61 /**
62 * @addtogroup gearman_server_private Private Server Functions
63 * @ingroup gearman_server
64 * @{
65 */
66
67 /**
68 * Try reading packets for a connection.
69 */
70 static gearmand_error_t _thread_packet_read(gearman_server_con_st *con);
71
72 /**
73 * Flush outgoing packets for a connection.
74 */
75 static gearmand_error_t _thread_packet_flush(gearman_server_con_st *con);
76
77 /**
78 * Start processing thread for the server.
79 */
80 static gearmand_error_t _proc_thread_start(gearman_server_st *server);
81
82 /**
83 * Kill processing thread for the server.
84 */
85 static void _proc_thread_kill(gearman_server_st *server);
86
87 /** @} */
88
89 /*
90 * Public definitions
91 */
92
gearman_server_thread_init(gearman_server_st * server,gearman_server_thread_st * thread,gearman_log_server_fn * log_function,gearmand_thread_st * context,gearmand_event_watch_fn * event_watch)93 bool gearman_server_thread_init(gearman_server_st *server,
94 gearman_server_thread_st *thread,
95 gearman_log_server_fn *log_function,
96 gearmand_thread_st *context,
97 gearmand_event_watch_fn *event_watch)
98 {
99 assert(server);
100 assert(thread);
101 if (server->thread_count == 1)
102 {
103 /* The server is going to be multi-threaded, start processing thread. */
104 if (_proc_thread_start(server) != GEARMAN_SUCCESS)
105 {
106 return false;
107 }
108 }
109
110 thread->con_count= 0;
111 thread->io_count= 0;
112 thread->proc_count= 0;
113 thread->to_be_freed_count= 0;
114 thread->free_con_count= 0;
115 thread->free_packet_count= 0;
116 thread->log_fn= log_function;
117 thread->log_context= context;
118 thread->run_fn= NULL;
119 thread->run_fn_arg= NULL;
120 thread->con_list= NULL;
121 thread->io_list= NULL;
122 thread->proc_list= NULL;
123 thread->free_con_list= NULL;
124 thread->free_packet_list= NULL;
125 thread->to_be_freed_list= NULL;
126
127 int error;
128 if ((error= pthread_mutex_init(&(thread->lock), NULL)))
129 {
130 gearmand_perror(error, "pthread_mutex_init");
131 return false;
132 }
133
134 GEARMAN_LIST__ADD(server->thread, thread);
135
136 thread->gearman= &(thread->gearmand_connection_list_static);
137 thread->gearman->init(event_watch, NULL);
138
139 return true;
140 }
141
gearman_server_thread_free(gearman_server_thread_st * thread)142 void gearman_server_thread_free(gearman_server_thread_st *thread)
143 {
144 _proc_thread_kill(Server);
145
146 while (thread->con_list != NULL)
147 {
148 gearman_server_con_free(thread->con_list);
149 }
150
151 while (thread->free_con_list != NULL)
152 {
153 gearman_server_con_st *con= thread->free_con_list;
154 thread->free_con_list= con->next;
155 delete con;
156 }
157
158 while (thread->free_packet_list != NULL)
159 {
160 gearman_server_packet_st *packet= thread->free_packet_list;
161 thread->free_packet_list= packet->next;
162 destroy_gearman_server_packet_st(packet);
163 }
164
165 if (thread->gearman != NULL)
166 {
167 thread->gearman->list_free();
168 }
169
170 pthread_mutex_destroy(&(thread->lock));
171
172 GEARMAN_LIST__DEL(Server->thread, thread);
173 }
174
gearman_server_thread_set_run(gearman_server_thread_st * thread,gearman_server_thread_run_fn * run_fn,void * run_fn_arg)175 void gearman_server_thread_set_run(gearman_server_thread_st *thread,
176 gearman_server_thread_run_fn *run_fn,
177 void *run_fn_arg)
178 {
179 thread->run_fn= run_fn;
180 thread->run_fn_arg= run_fn_arg;
181 }
182
183 gearmand_con_st *
gearman_server_thread_run(gearman_server_thread_st * thread,gearmand_error_t * ret_ptr)184 gearman_server_thread_run(gearman_server_thread_st *thread,
185 gearmand_error_t *ret_ptr)
186 {
187 /* If we are multi-threaded, we may have packets to flush or connections that
188 should start reading again. */
189 if (Server->flags.threaded)
190 {
191 gearman_server_con_st *server_con;
192
193 while ((server_con= gearman_server_con_to_be_freed_next(thread)) != NULL)
194 {
195 if (server_con->is_dead && server_con->proc_removed)
196 gearman_server_con_free(server_con);
197 else
198 gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "con %llu isn't dead %d or proc removed %d, but is in to_be_freed_list",
199 server_con, server_con->is_dead, server_con->proc_removed);
200 }
201
202 while ((server_con= gearman_server_con_io_next(thread)) != NULL)
203 {
204 if (server_con->is_dead)
205 {
206 gearman_server_con_attempt_free(server_con);
207 continue;
208 }
209
210 if (server_con->ret != GEARMAN_SUCCESS)
211 {
212 *ret_ptr= server_con->ret;
213 return gearman_server_con_data(server_con);
214 }
215
216 /* See if any outgoing packets were queued. */
217 *ret_ptr= _thread_packet_flush(server_con);
218 if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
219 {
220 return gearman_server_con_data(server_con);
221 }
222 }
223 }
224
225 /* Check for new activity on connections. */
226 {
227 gearman_server_con_st *server_con;
228
229 while ((server_con= gearmand_ready(thread->gearman)))
230 {
231 /* Try to read new packets. */
232 if (server_con->con.revents & POLLIN)
233 {
234 *ret_ptr= _thread_packet_read(server_con);
235 if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
236 return gearman_server_con_data(server_con);
237 }
238
239 /* Flush existing outgoing packets. */
240 if (server_con->con.revents & POLLOUT)
241 {
242 *ret_ptr= _thread_packet_flush(server_con);
243 if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
244 {
245 return gearman_server_con_data(server_con);
246 }
247 }
248 }
249 }
250
251 /* Start flushing new outgoing packets if we are single threaded. */
252 if (! (Server->flags.threaded))
253 {
254 gearman_server_con_st *server_con;
255 while ((server_con= gearman_server_con_io_next(thread)))
256 {
257 *ret_ptr= _thread_packet_flush(server_con);
258 if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
259 {
260 return gearman_server_con_data(server_con);
261 }
262 }
263 }
264
265 /* Check for the two shutdown modes. */
266 if (Server->shutdown)
267 {
268 *ret_ptr= GEARMAN_SHUTDOWN;
269 }
270 else if (Server->shutdown_graceful)
271 {
272 if (Server->job_count == 0)
273 {
274 *ret_ptr= GEARMAN_SHUTDOWN;
275 }
276 else
277 {
278 *ret_ptr= GEARMAN_SHUTDOWN_GRACEFUL;
279 }
280 }
281 else
282 {
283 *ret_ptr= GEARMAN_SUCCESS;
284 }
285
286 return NULL;
287 }
288
289 /*
290 * Private definitions
291 */
292
_thread_packet_read(gearman_server_con_st * con)293 static gearmand_error_t _thread_packet_read(gearman_server_con_st *con)
294 {
295 while (1)
296 {
297 if (con->packet == NULL)
298 {
299 if (! (con->packet= gearman_server_packet_create(con->thread, true)))
300 {
301 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
302 }
303 }
304
305 gearmand_error_t ret;
306 if (gearmand_failed(ret= gearman_io_recv(con, true)))
307 {
308 if (ret == GEARMAN_IO_WAIT)
309 {
310 break;
311 }
312
313 gearman_server_packet_free(con->packet, con->thread, true);
314 con->packet= NULL;
315 return ret;
316 }
317
318 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
319 "Received %s %s:%u",
320 gearman_command_info(con->packet->packet.command)->name,
321 con->_host == NULL ? "-" : con->_host,
322 con->_port == NULL ? "-" : con->_port);
323
324 /* We read a complete packet. */
325 if (Server->flags.threaded)
326 {
327 /* Multi-threaded, queue for the processing thread to run. */
328 gearman_server_proc_packet_add(con, con->packet);
329 con->packet= NULL;
330 }
331 else
332 {
333 /* Single threaded, run the command here. */
334 gearmand_error_t rc= gearman_server_run_command(con, &(con->packet->packet));
335 gearmand_packet_free(&(con->packet->packet));
336 gearman_server_packet_free(con->packet, con->thread, true);
337 con->packet= NULL;
338 if (gearmand_failed(rc))
339 {
340 return rc;
341 }
342 }
343 }
344
345 return GEARMAN_SUCCESS;
346 }
347
_thread_packet_flush(gearman_server_con_st * con)348 static gearmand_error_t _thread_packet_flush(gearman_server_con_st *con)
349 {
350 /* Check to see if we've already tried to avoid excessive system calls. */
351 if (con->con.events & POLLOUT)
352 {
353 return GEARMAN_IO_WAIT;
354 }
355
356 while (con->io_packet_list)
357 {
358 gearmand_error_t ret= gearman_io_send(con, &(con->io_packet_list->packet),
359 con->io_packet_list->next == NULL ? true : false);
360 if (gearmand_failed(ret))
361 {
362 return ret;
363 }
364
365 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
366 "Sent %s to %s:%d",
367 gearman_command_info(con->io_packet_list->packet.command)->name,
368 con->_host == NULL ? "-" : con->_host,
369 con->_port == NULL ? "-" : con->_port);
370
371 gearman_server_io_packet_remove(con);
372 }
373
374 /* Clear the POLLOUT flag. */
375 return gearmand_io_set_events(con, POLLIN);
376 }
377
_proc_thread_start(gearman_server_st * server)378 static gearmand_error_t _proc_thread_start(gearman_server_st *server)
379 {
380 int error;
381 if ((error= pthread_mutex_init(&(server->proc_lock), NULL)))
382 {
383 return gearmand_perror(error, "pthread_mutex_init");
384 }
385
386 if ((error= pthread_cond_init(&(server->proc_cond), NULL)))
387 {
388 return gearmand_perror(error, "pthread_cond_init");
389 }
390
391 pthread_attr_t attr;
392 if ((error= pthread_attr_init(&attr)))
393 {
394 return gearmand_perror(error, "pthread_attr_init");
395 }
396
397 if ((error= pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM)))
398 {
399 (void) pthread_attr_destroy(&attr);
400 return gearmand_perror(error, "pthread_attr_setscope");
401 }
402
403 if ((error= pthread_create(&(server->proc_id), &attr, _proc, server)))
404 {
405 (void) pthread_attr_destroy(&attr);
406 return gearmand_perror(error, "pthread_create");
407 }
408
409 if ((error= pthread_attr_destroy(&attr)))
410 {
411 gearmand_perror(error, "pthread_create");
412 }
413
414 server->flags.threaded= true;
415
416 return GEARMAN_SUCCESS;
417 }
418
_proc_thread_kill(gearman_server_st * server)419 static void _proc_thread_kill(gearman_server_st *server)
420 {
421 if (! (server->flags.threaded) || server->proc_shutdown)
422 {
423 return;
424 }
425
426 server->proc_shutdown= true;
427
428 /* Signal proc thread to shutdown. */
429 int error;
430 if ((error= pthread_mutex_lock(&(server->proc_lock))) == 0)
431 {
432 if ((error= pthread_cond_signal(&(server->proc_cond))))
433 {
434 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_cond_signal");
435 }
436
437 if ((error= pthread_mutex_unlock(&(server->proc_lock))))
438 {
439 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_unlock");
440 }
441 }
442 else
443 {
444 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_lock");
445 }
446
447 /* Wait for the proc thread to exit and then cleanup. */
448 if ((error= pthread_join(server->proc_id, NULL)))
449 {
450 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_join");
451 }
452
453 if ((error= pthread_cond_destroy(&(server->proc_cond))))
454 {
455 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_cond_destroy");
456 }
457
458 if ((error= pthread_mutex_destroy(&(server->proc_lock))))
459 {
460 gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_destroy");
461 }
462 }
463