1 /*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2  *
3  *  Gearmand client and server library.
4  *
5  *  Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
6  *  Copyright (C) 2008 Brian Aker, Eric Day
7  *  All rights reserved.
8  *
9  *  Redistribution and use in source and binary forms, with or without
10  *  modification, are permitted provided that the following conditions are
11  *  met:
12  *
13  *      * Redistributions of source code must retain the above copyright
14  *  notice, this list of conditions and the following disclaimer.
15  *
16  *      * Redistributions in binary form must reproduce the above
17  *  copyright notice, this list of conditions and the following disclaimer
18  *  in the documentation and/or other materials provided with the
19  *  distribution.
20  *
21  *      * The names of its contributors may not be used to endorse or
22  *  promote products derived from this software without specific prior
23  *  written permission.
24  *
25  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28  *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29  *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30  *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32  *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33  *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34  *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35  *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36  *
37  */
38 
39 
40 
41 /**
42  * @file
43  * @brief Server job definitions
44  */
45 
46 #include "gear_config.h"
47 #include "libgearman-server/common.h"
48 #include <string.h>
49 
50 #include <libgearman-server/queue.h>
51 
52 /*
53  * Private declarations
54  */
55 
56 /**
57  * @addtogroup gearman_server_job_private Private Server Job Functions
58  * @ingroup gearman_server_job
59  * @{
60  */
61 
62 /**
63  * Get a server job structure from the unique ID. If data_size is non-zero,
64  * then unique points to the workload data and not a real unique key.
65  */
_server_job_get_unique(gearman_server_st * server,uint32_t unique_key,gearman_server_function_st * server_function,const char * unique,size_t data_size)66 static gearman_server_job_st * _server_job_get_unique(gearman_server_st *server, uint32_t unique_key,
67                                                       gearman_server_function_st *server_function,
68                                                       const char *unique, size_t data_size)
69 {
70   gearman_server_job_st *server_job;
71 
72   for (server_job= server->unique_hash[unique_key % server->hashtable_buckets];
73        server_job != NULL; server_job= server_job->unique_next)
74   {
75     if (data_size == 0)
76     {
77       if (server_job->function == server_function &&
78           server_job->unique_key == unique_key &&
79           !strcmp(server_job->unique, unique))
80       {
81         return server_job;
82       }
83     }
84     else
85     {
86       if (server_job->function == server_function &&
87           server_job->unique_key == unique_key &&
88           server_job->data_size == data_size &&
89           memcmp(server_job->data, unique, data_size) == 0)
90       {
91         return server_job;
92       }
93     }
94   }
95 
96   return NULL;
97 }
98 
99 /** @} */
100 
101 #pragma GCC diagnostic ignored "-Wold-style-cast"
102 
103 /*
104  * Public definitions
105  */
gearman_server_job_add(gearman_server_st * server,const char * function_name,size_t function_name_size,const char * unique,size_t unique_size,const void * data,size_t data_size,gearman_job_priority_t priority,gearman_server_client_st * server_client,gearmand_error_t * ret_ptr,int64_t when)106 gearman_server_job_st * gearman_server_job_add(gearman_server_st *server,
107                                                const char *function_name, size_t function_name_size,
108                                                const char *unique, size_t unique_size,
109                                                const void *data, size_t data_size,
110                                                gearman_job_priority_t priority,
111                                                gearman_server_client_st *server_client,
112                                                gearmand_error_t *ret_ptr,
113                                                int64_t when)
114 {
115   return gearman_server_job_add_reducer(server,
116                                         function_name, function_name_size,
117                                         unique, unique_size,
118                                         NULL, 0, // reducer
119                                         data, data_size,
120                                         priority, server_client, ret_ptr, when);
121 }
122 
123 gearman_server_job_st *
gearman_server_job_add_reducer(gearman_server_st * server,const char * function_name,size_t function_name_size,const char * unique,size_t unique_size,const char * reducer_name,size_t reducer_size,const void * data,size_t data_size,gearman_job_priority_t priority,gearman_server_client_st * server_client,gearmand_error_t * ret_ptr,int64_t when)124 gearman_server_job_add_reducer(gearman_server_st *server,
125                                const char *function_name, size_t function_name_size,
126                                const char *unique, size_t unique_size,
127                                const char *reducer_name, size_t reducer_size,
128                                const void *data, size_t data_size,
129                                gearman_job_priority_t priority,
130                                gearman_server_client_st *server_client,
131                                gearmand_error_t *ret_ptr,
132                                int64_t when)
133 {
134   gearman_server_function_st *server_function= gearman_server_function_get(server, function_name, function_name_size);
135   if (server_function == NULL)
136   {
137     *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
138     return NULL;
139   }
140 
141   uint32_t key;
142   gearman_server_job_st *server_job;
143   if (unique_size == 0)
144   {
145     server_job= NULL;
146     key= 0;
147   }
148   else
149   {
150     if (unique_size == 1 && *unique ==  '-')
151     {
152       if (data_size == 0)
153       {
154         key= 0;
155         server_job= NULL;
156       }
157       else
158       {
159         /* Look up job via unique data when unique = '-'. */
160         key= _server_job_hash((const char*)data, data_size);
161         server_job= _server_job_get_unique(server, key, server_function, (const char*)data, data_size);
162       }
163     }
164     else
165     {
166       /* Look up job via unique ID first to make sure it's not a duplicate. */
167       key= _server_job_hash(unique, unique_size);
168       server_job= _server_job_get_unique(server, key, server_function, unique, 0);
169     }
170   }
171 
172   if (server_job == NULL)
173   {
174     gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Comparing queue %u to limit %u for priority %u",
175       server_function->job_total, server_function->max_queue_size[priority],
176       priority);
177     if (server_function->max_queue_size[priority] > 0 &&
178         server_function->job_total >= server_function->max_queue_size[priority])
179     {
180       *ret_ptr= GEARMAN_JOB_QUEUE_FULL;
181       return NULL;
182     }
183 
184     server_job= gearman_server_job_create(server);
185     if (server_job == NULL)
186     {
187       *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
188       return NULL;
189     }
190 
191     server_job->priority= priority;
192 
193     server_job->function= server_function;
194     server_function->job_total++;
195 
196     int checked_length;
197     checked_length= snprintf(server_job->job_handle, GEARMAND_JOB_HANDLE_SIZE, "%s:%u",
198                              server->job_handle_prefix, server->job_handle_count);
199 
200     if (checked_length >= GEARMAND_JOB_HANDLE_SIZE || checked_length < 0)
201     {
202       gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "Job handle plus handle count beyond GEARMAND_JOB_HANDLE_SIZE: %s:%u",
203                          server->job_handle_prefix, server->job_handle_count);
204     }
205 
206     server_job->unique_length= unique_size;
207     checked_length= snprintf(server_job->unique, GEARMAN_MAX_UNIQUE_SIZE, "%.*s",
208                              (int)unique_size, unique);
209     if (checked_length >= GEARMAN_MAX_UNIQUE_SIZE || checked_length < 0)
210     {
211       gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "We received a unique beyond GEARMAN_MAX_UNIQUE_SIZE: %.*s", (int)unique_size, unique);
212     }
213 
214     server->job_handle_count++;
215     server_job->data= data;
216     server_job->data_size= data_size;
217 		server_job->when= when;
218 
219     if (reducer_size)
220     {
221       strncpy(server_job->reducer, reducer_name, reducer_size);
222       server_job->reducer[reducer_size]= 0;
223     }
224     else
225     {
226       server_job->reducer[0]= 0;
227     }
228 
229     server_job->unique_key= key;
230     key= key % server->hashtable_buckets;
231     GEARMAN_HASH_ADD(server->unique, key, server_job, unique_);
232 
233     key= _server_job_hash(server_job->job_handle,
234                           strlen(server_job->job_handle));
235     server_job->job_handle_key= key;
236     key= key % server->hashtable_buckets;
237     GEARMAN_HASH__ADD(server->job, key, server_job);
238 
239     if (server->state.queue_startup)
240     {
241       server_job->job_queued= true;
242     }
243     else if (server_client == NULL)
244     {
245       *ret_ptr= gearman_queue_add(server,
246                                   server_job->unique, unique_size,
247                                   function_name,
248                                   function_name_size,
249                                   data, data_size, priority,
250                                   when);
251       if (gearmand_failed(*ret_ptr))
252       {
253         server_job->data= NULL;
254         gearman_server_job_free(server_job);
255         return NULL;
256       }
257 
258       {
259         *ret_ptr= gearman_queue_flush(server);
260         if (*ret_ptr != GEARMAN_SUCCESS)
261         {
262           server_job->data= NULL;
263           gearman_server_job_free(server_job);
264           return NULL;
265         }
266       }
267 
268       server_job->job_queued= true;
269     }
270 
271     *ret_ptr= gearman_server_job_queue(server_job);
272     if (gearmand_failed(*ret_ptr))
273     {
274       if (server_client == NULL)
275       {
276         /* Do our best to remove the job from the queue. */
277         (void)gearman_queue_done(server,
278                                  server_job->unique, unique_size,
279                                  server_job->function->function_name,
280                                  server_job->function->function_name_size);
281       }
282 
283       gearman_server_job_free(server_job);
284       return NULL;
285     }
286   }
287   else
288   {
289     *ret_ptr= GEARMAN_JOB_EXISTS;
290   }
291 
292   if (server_client)
293   {
294     server_client->job= server_job;
295     GEARMAN_LIST_ADD(server_job->client, server_client, job_);
296   }
297 
298   return server_job;
299 }
300 
gearman_server_job_free(gearman_server_job_st * server_job)301 void gearman_server_job_free(gearman_server_job_st *server_job)
302 {
303   if (server_job == NULL)
304   {
305     return;
306   }
307 
308   if (server_job->worker != NULL)
309   {
310     server_job->function->job_running--;
311   }
312 
313   server_job->function->job_total--;
314 
315   if (server_job->data != NULL)
316   {
317     free((void *)(server_job->data));
318     server_job->data= NULL;
319   }
320 
321   while (server_job->client_list != NULL)
322   {
323     gearman_server_client_free(server_job->client_list);
324   }
325 
326   if (server_job->worker != NULL)
327   {
328     GEARMAN_LIST_DEL(server_job->worker->job, server_job, worker_);
329   }
330 
331   uint32_t key= server_job->unique_key % Server->hashtable_buckets;
332   GEARMAN_HASH_DEL(Server->unique, key, server_job, unique_);
333 
334   key= server_job->job_handle_key % Server->hashtable_buckets;
335   GEARMAN_HASH__DEL(Server->job, key, server_job);
336 
337   if (Server->free_job_count < GEARMAN_MAX_FREE_SERVER_JOB)
338   {
339     gearman_server_st *server= Server;
340     GEARMAN_LIST__ADD(server->free_job, server_job);
341   }
342   else
343   {
344     destroy_gearman_server_job_st(server_job);
345   }
346 }
347 
gearman_server_job_queue(gearman_server_job_st * job)348 gearmand_error_t gearman_server_job_queue(gearman_server_job_st *job)
349 {
350   if (job->worker)
351   {
352     job->retries++;
353     if (Server->job_retries != 0 && Server->job_retries == job->retries)
354     {
355       gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
356                          "Dropped job due to max retry count: %s %.*s",
357                          job->job_handle,
358                          (int)job->unique_length, job->unique);
359 
360       gearman_server_client_st *client;
361       for (client= job->client_list; client != NULL; client= client->job_next)
362       {
363         gearmand_error_t ret= gearman_server_io_packet_add(client->con, false,
364                                                            GEARMAN_MAGIC_RESPONSE,
365                                                            GEARMAN_COMMAND_WORK_FAIL,
366                                                            job->job_handle,
367                                                            (size_t)strlen(job->job_handle),
368                                                            NULL);
369         if (gearmand_failed(ret))
370         {
371           return ret;
372         }
373       }
374 
375       /* Remove from persistent queue if one exists. */
376       if (job->job_queued)
377       {
378         gearmand_error_t ret= gearman_queue_done(Server,
379                                                  job->unique, job->unique_length,
380                                                  job->function->function_name,
381                                                  job->function->function_name_size);
382         if (ret != GEARMAN_SUCCESS)
383         {
384           return ret;
385         }
386       }
387 
388       gearman_server_job_free(job);
389       return GEARMAN_SUCCESS;
390     }
391 
392     GEARMAN_LIST_DEL(job->worker->job, job, worker_);
393     job->worker= NULL;
394     job->function->job_running--;
395     job->function_next= NULL;
396     job->numerator= 0;
397     job->denominator= 0;
398   }
399 
400   /* Queue NOOP for possible sleeping workers. */
401   if (job->function->worker_list != NULL)
402   {
403     gearman_server_worker_st *worker= job->function->worker_list;
404     uint32_t noop_sent= 0;
405 
406     do
407     {
408       if (worker->con->is_sleeping && ! (worker->con->is_noop_sent))
409       {
410         gearmand_error_t ret= gearman_server_io_packet_add(worker->con, false,
411                                                            GEARMAN_MAGIC_RESPONSE,
412                                                            GEARMAN_COMMAND_NOOP, NULL);
413         if (gearmand_failed(ret))
414         {
415           gearmand_gerror("gearman_server_io_packet_add", ret);
416           return ret;
417         }
418 
419         worker->con->is_noop_sent= true;
420         noop_sent++;
421       }
422 
423       worker= worker->function_next;
424     }
425     while (worker != job->function->worker_list &&
426            (Server->worker_wakeup == 0 ||
427             noop_sent < Server->worker_wakeup));
428 
429     job->function->worker_list= worker;
430   }
431 
432   /* Queue the job to be run. */
433   if (job->function->job_list[job->priority] == NULL)
434   {
435     job->function->job_list[job->priority]= job;
436   }
437   else
438   {
439     job->function->job_end[job->priority]->function_next= job;
440   }
441 
442   job->function->job_end[job->priority]= job;
443   job->function->job_count++;
444 
445   return GEARMAN_SUCCESS;
446 }
447