1 /*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2  *
3  *  Gearmand client and server library.
4  *
5  *  Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
6  *  Copyright (C) 2008 Brian Aker, Eric Day
7  *  All rights reserved.
8  *
9  *  Redistribution and use in source and binary forms, with or without
10  *  modification, are permitted provided that the following conditions are
11  *  met:
12  *
13  *      * Redistributions of source code must retain the above copyright
14  *  notice, this list of conditions and the following disclaimer.
15  *
16  *      * Redistributions in binary form must reproduce the above
17  *  copyright notice, this list of conditions and the following disclaimer
18  *  in the documentation and/or other materials provided with the
19  *  distribution.
20  *
21  *      * The names of its contributors may not be used to endorse or
22  *  promote products derived from this software without specific prior
23  *  written permission.
24  *
25  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28  *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29  *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30  *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32  *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33  *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34  *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35  *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36  *
37  */
38 
39 /**
40  * @file
41  * @brief Server connection definitions
42  */
43 
44 #include "gear_config.h"
45 #include "libgearman-server/common.h"
46 
47 #include <libgearman/command.h>
48 
49 #include <cassert>
50 #include <cerrno>
51 #include <cstring>
52 #include <memory>
53 
54 #ifndef __INTEL_COMPILER
55 #pragma GCC diagnostic ignored "-Wold-style-cast"
56 #endif
57 
58 /*
59  * Public definitions
60  */
61 
62 gearman_server_packet_st *
gearman_server_packet_create(gearman_server_thread_st * thread,bool from_thread)63 gearman_server_packet_create(gearman_server_thread_st *thread,
64                              bool from_thread)
65 {
66   gearman_server_packet_st *server_packet= NULL;
67 
68   if (from_thread && Server->flags.threaded)
69   {
70     if (thread->free_packet_count > 0)
71     {
72       server_packet= thread->free_packet_list;
73       thread->free_packet_list= server_packet->next;
74       thread->free_packet_count--;
75     }
76   }
77   else
78   {
79     if (Server->free_packet_count > 0)
80     {
81       server_packet= Server->free_packet_list;
82       Server->free_packet_list= server_packet->next;
83       Server->free_packet_count--;
84     }
85   }
86 
87   if (server_packet == NULL)
88   {
89     server_packet= new (std::nothrow) gearman_server_packet_st;
90     if (server_packet == NULL)
91     {
92       gearmand_perror(errno, "new() gearman_server_packet_st");
93       return NULL;
94     }
95   }
96 
97   server_packet->next= NULL;
98 
99   return server_packet;
100 }
101 
destroy_gearman_server_packet_st(gearman_server_packet_st * packet)102 void destroy_gearman_server_packet_st(gearman_server_packet_st *packet)
103 {
104   delete packet;
105 }
106 
gearman_server_packet_free(gearman_server_packet_st * packet,gearman_server_thread_st * thread,bool from_thread)107 void gearman_server_packet_free(gearman_server_packet_st *packet,
108                                 gearman_server_thread_st *thread,
109                                 bool from_thread)
110 {
111   if (from_thread and Server->flags.threaded)
112   {
113     if (thread->free_packet_count < GEARMAN_MAX_FREE_SERVER_PACKET)
114     {
115       packet->next= thread->free_packet_list;
116       thread->free_packet_list= packet;
117       thread->free_packet_count++;
118     }
119     else
120     {
121       gearmand_debug("delete() gearman_server_packet_st");
122       delete packet;
123     }
124   }
125   else
126   {
127     if (Server->free_packet_count < GEARMAN_MAX_FREE_SERVER_PACKET)
128     {
129       packet->next= Server->free_packet_list;
130       Server->free_packet_list= packet;
131       Server->free_packet_count++;
132     }
133     else
134     {
135       gearmand_debug("delete() gearman_server_packet_st");
136       delete packet;
137     }
138   }
139 }
140 
gearman_server_io_packet_add(gearman_server_con_st * con,bool take_data,enum gearman_magic_t magic,gearman_command_t command,const void * arg,...)141 gearmand_error_t gearman_server_io_packet_add(gearman_server_con_st *con,
142                                               bool take_data,
143                                               enum gearman_magic_t magic,
144                                               gearman_command_t command,
145                                               const void *arg, ...)
146 {
147   gearman_server_packet_st *server_packet;
148   va_list ap;
149 
150   server_packet= gearman_server_packet_create(con->thread, false);
151   if (server_packet == NULL)
152   {
153     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
154   }
155 
156   server_packet->packet.reset(magic, command);
157 
158   va_start(ap, arg);
159 
160   while (arg)
161   {
162     size_t arg_size= va_arg(ap, size_t);
163 
164     gearmand_error_t ret= gearmand_packet_create(&(server_packet->packet), arg, arg_size);
165     if (gearmand_failed(ret))
166     {
167       va_end(ap);
168       gearmand_packet_free(&(server_packet->packet));
169       gearman_server_packet_free(server_packet, con->thread, false);
170       return ret;
171     }
172 
173     arg= va_arg(ap, void *);
174   }
175 
176   va_end(ap);
177 
178   gearmand_error_t ret= gearmand_packet_pack_header(&(server_packet->packet));
179   if (gearmand_failed(ret))
180   {
181     gearmand_packet_free(&(server_packet->packet));
182     gearman_server_packet_free(server_packet, con->thread, false);
183     return ret;
184   }
185 
186   if (take_data)
187   {
188     server_packet->packet.options.free_data= true;
189   }
190 
191   int error;
192   if ((error= pthread_mutex_lock(&con->thread->lock)) == 0)
193   {
194     GEARMAN_FIFO__ADD(con->io_packet, server_packet);
195     if ((error= pthread_mutex_unlock(&con->thread->lock)))
196     {
197       gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_unlock");
198     }
199   }
200   else
201   {
202     gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_lock");
203   }
204 
205   gearman_server_con_io_add(con);
206 
207   return GEARMAN_SUCCESS;
208 }
209 
gearman_server_io_packet_remove(gearman_server_con_st * con)210 void gearman_server_io_packet_remove(gearman_server_con_st *con)
211 {
212   gearman_server_packet_st *server_packet= con->io_packet_list;
213 
214   gearmand_packet_free(&(server_packet->packet));
215 
216   int error;
217   if ((error= pthread_mutex_lock(&con->thread->lock)) == 0)
218   {
219     GEARMAN_FIFO__DEL(con->io_packet, server_packet);
220     if ((error= pthread_mutex_unlock(&con->thread->lock)))
221     {
222       gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_unlock");
223     }
224   }
225   else
226   {
227     gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_lock");
228   }
229 
230   gearman_server_packet_free(server_packet, con->thread, true);
231 }
232 
gearman_server_proc_packet_add(gearman_server_con_st * con,gearman_server_packet_st * packet)233 void gearman_server_proc_packet_add(gearman_server_con_st *con,
234                                     gearman_server_packet_st *packet)
235 {
236   int error;
237   if ((error= pthread_mutex_lock(&con->thread->lock)) == 0)
238   {
239     GEARMAN_FIFO__ADD(con->proc_packet, packet);
240     if ((error= pthread_mutex_unlock(&con->thread->lock)))
241     {
242       gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_unlock");
243     }
244   }
245   else
246   {
247     gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_lock");
248   }
249 
250   gearman_server_con_proc_add(con);
251 }
252 
253 gearman_server_packet_st *
gearman_server_proc_packet_remove(gearman_server_con_st * con)254 gearman_server_proc_packet_remove(gearman_server_con_st *con)
255 {
256   gearman_server_packet_st *server_packet= con->proc_packet_list;
257 
258   if (server_packet)
259   {
260     int error;
261     if ((error= pthread_mutex_lock(&con->thread->lock)) == 0)
262     {
263       GEARMAN_FIFO__DEL(con->proc_packet, server_packet);
264       if ((error= pthread_mutex_unlock(&con->thread->lock)) != 0)
265       {
266         gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_unlock");
267       }
268     }
269     else
270     {
271       gearmand_log_fatal_perror(GEARMAN_DEFAULT_LOG_PARAM, error, "pthread_mutex_lock");
272     }
273   }
274 
275   return server_packet;
276 }
277 
gearmand_strcommand(gearmand_packet_st * packet)278 const char *gearmand_strcommand(gearmand_packet_st *packet)
279 {
280   assert(packet);
281   return gearman_command_info(packet->command)->name;
282 }
283 
packet_create_arg(gearmand_packet_st * packet,const void * arg,size_t arg_size)284 inline static gearmand_error_t packet_create_arg(gearmand_packet_st *packet,
285                                                  const void *arg, size_t arg_size)
286 {
287   if (packet->argc == gearman_command_info(packet->command)->argc and
288       (not (gearman_command_info(packet->command)->data) or
289        packet->data))
290   {
291     gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "too many arguments for command(%s)", gearman_command_info(packet->command)->name);
292     return GEARMAN_TOO_MANY_ARGS;
293   }
294 
295   if (packet->argc == gearman_command_info(packet->command)->argc)
296   {
297     packet->data= static_cast<const char *>(arg);
298     packet->data_size= arg_size;
299     return GEARMAN_SUCCESS;
300   }
301 
302   if (packet->args_size == 0 and packet->magic != GEARMAN_MAGIC_TEXT)
303   {
304     packet->args_size= GEARMAN_PACKET_HEADER_SIZE;
305   }
306 
307   if ((packet->args_size + arg_size) < GEARMAN_ARGS_BUFFER_SIZE)
308   {
309     packet->args= packet->args_buffer;
310   }
311   else
312   {
313     gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "resizing packet buffer");
314     if (packet->args == packet->args_buffer)
315     {
316       packet->args= (char *)malloc(packet->args_size + arg_size);
317       memcpy(packet->args, packet->args_buffer, packet->args_size);
318     }
319     else
320     {
321       char *new_args= (char *)realloc(packet->args, packet->args_size + arg_size);
322       if (new_args == NULL)
323       {
324         return gearmand_perror(errno, "realloc");
325       }
326       packet->args= new_args;
327     }
328   }
329 
330   memcpy(packet->args + packet->args_size, arg, arg_size);
331   packet->args_size+= arg_size;
332   packet->arg_size[packet->argc]= arg_size;
333   packet->argc++;
334 
335   size_t offset;
336   if (packet->magic == GEARMAN_MAGIC_TEXT)
337   {
338     offset= 0;
339   }
340   else
341   {
342     offset= GEARMAN_PACKET_HEADER_SIZE;
343   }
344 
345   for (uint8_t x= 0; x < packet->argc; ++x)
346   {
347     packet->arg[x]= packet->args + offset;
348     offset+= packet->arg_size[x];
349   }
350 
351   return GEARMAN_SUCCESS;
352 }
353 
354 /** @} */
355 
356 /*
357  * Public Definitions
358  */
359 
360 
reset(enum gearman_magic_t magic_,gearman_command_t command_)361 void gearmand_packet_st::reset(enum gearman_magic_t magic_, gearman_command_t command_)
362 {
363   options.complete= false;
364   options.free_data= false;
365 
366   magic= magic_;
367   command= command_;
368   argc= 0;
369   args_size= 0;
370   data_size= 0;
371 
372   args= NULL;
373   data= NULL;
374 }
375 
gearmand_packet_create(gearmand_packet_st * packet,const void * arg,size_t arg_size)376 gearmand_error_t gearmand_packet_create(gearmand_packet_st *packet,
377                                           const void *arg, size_t arg_size)
378 {
379   return packet_create_arg(packet, arg, arg_size);
380 }
381 
gearmand_packet_free(gearmand_packet_st * packet)382 void gearmand_packet_free(gearmand_packet_st *packet)
383 {
384   if (packet->args != packet->args_buffer && packet->args != NULL)
385   {
386     gearmand_debug("free packet's args");
387     free(packet->args);
388     packet->args= NULL;
389   }
390 
391   if (packet->options.free_data && packet->data != NULL)
392   {
393     gearmand_debug("free() packet's data");
394     free((void *)packet->data); //@todo fix the need for the casting.
395     packet->data= NULL;
396   }
397 }
398 
gearmand_packet_pack_header(gearmand_packet_st * packet)399 gearmand_error_t gearmand_packet_pack_header(gearmand_packet_st *packet)
400 {
401   if (packet->magic == GEARMAN_MAGIC_TEXT)
402   {
403     packet->options.complete= true;
404     return GEARMAN_SUCCESS;
405   }
406 
407   if (packet->args_size == 0)
408   {
409     packet->args= packet->args_buffer;
410     packet->args_size= GEARMAN_PACKET_HEADER_SIZE;
411   }
412 
413   switch (packet->magic)
414   {
415   case GEARMAN_MAGIC_TEXT:
416     break;
417 
418   case GEARMAN_MAGIC_REQUEST:
419     memcpy(packet->args, "\0REQ", 4);
420     break;
421 
422   case GEARMAN_MAGIC_RESPONSE:
423     memcpy(packet->args, "\0RES", 4);
424     break;
425 
426   default:
427     gearmand_error("invalid magic value");
428     return GEARMAN_INVALID_MAGIC;
429   }
430 
431   if (packet->command == GEARMAN_COMMAND_TEXT ||
432       packet->command >= GEARMAN_COMMAND_MAX)
433   {
434     gearmand_error("invalid command value");
435     return GEARMAN_INVALID_COMMAND;
436   }
437 
438   uint32_t tmp= packet->command;
439   tmp= htonl(tmp);
440   memcpy(packet->args + 4, &tmp, 4);
441 
442   uint64_t length_64= packet->args_size + packet->data_size - GEARMAN_PACKET_HEADER_SIZE;
443 
444   // Check for overflow on 32bit(portable?).
445   if (length_64 >= UINT32_MAX || length_64 < packet->data_size)
446   {
447     gearmand_error("data size too too long");
448     return GEARMAN_ARGUMENT_TOO_LARGE;
449   }
450 
451   tmp= (uint32_t)length_64;
452   tmp= htonl(tmp);
453   memcpy(packet->args + 8, &tmp, 4);
454 
455   packet->options.complete= true;
456 
457   return GEARMAN_SUCCESS;
458 }
459