1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Gearmand client and server library.
4 *
5 * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2008 Brian Aker, Eric Day
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are
11 * met:
12 *
13 * * Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer.
15 *
16 * * Redistributions in binary form must reproduce the above
17 * copyright notice, this list of conditions and the following disclaimer
18 * in the documentation and/or other materials provided with the
19 * distribution.
20 *
21 * * The names of its contributors may not be used to endorse or
22 * promote products derived from this software without specific prior
23 * written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 *
37 */
38
39
40
41 /**
42 * @file
43 * @brief Server job definitions
44 */
45
46 #include "gear_config.h"
47 #include "libgearman-server/common.h"
48 #include <string.h>
49
50 #include <libgearman-server/queue.h>
51
52 /*
53 * Private declarations
54 */
55
56 /**
57 * @addtogroup gearman_server_job_private Private Server Job Functions
58 * @ingroup gearman_server_job
59 * @{
60 */
61
62 /**
63 * Get a server job structure from the unique ID. If data_size is non-zero,
64 * then unique points to the workload data and not a real unique key.
65 */
_server_job_get_unique(gearman_server_st * server,uint32_t unique_key,gearman_server_function_st * server_function,const char * unique,size_t data_size)66 static gearman_server_job_st * _server_job_get_unique(gearman_server_st *server, uint32_t unique_key,
67 gearman_server_function_st *server_function,
68 const char *unique, size_t data_size)
69 {
70 gearman_server_job_st *server_job;
71
72 for (server_job= server->unique_hash[unique_key % server->hashtable_buckets];
73 server_job != NULL; server_job= server_job->unique_next)
74 {
75 if (data_size == 0)
76 {
77 if (server_job->function == server_function &&
78 server_job->unique_key == unique_key &&
79 !strcmp(server_job->unique, unique))
80 {
81 return server_job;
82 }
83 }
84 else
85 {
86 if (server_job->function == server_function &&
87 server_job->unique_key == unique_key &&
88 server_job->data_size == data_size &&
89 memcmp(server_job->data, unique, data_size) == 0)
90 {
91 return server_job;
92 }
93 }
94 }
95
96 return NULL;
97 }
98
99 /** @} */
100
101 #pragma GCC diagnostic ignored "-Wold-style-cast"
102
103 /*
104 * Public definitions
105 */
gearman_server_job_add(gearman_server_st * server,const char * function_name,size_t function_name_size,const char * unique,size_t unique_size,const void * data,size_t data_size,gearman_job_priority_t priority,gearman_server_client_st * server_client,gearmand_error_t * ret_ptr,int64_t when)106 gearman_server_job_st * gearman_server_job_add(gearman_server_st *server,
107 const char *function_name, size_t function_name_size,
108 const char *unique, size_t unique_size,
109 const void *data, size_t data_size,
110 gearman_job_priority_t priority,
111 gearman_server_client_st *server_client,
112 gearmand_error_t *ret_ptr,
113 int64_t when)
114 {
115 return gearman_server_job_add_reducer(server,
116 function_name, function_name_size,
117 unique, unique_size,
118 NULL, 0, // reducer
119 data, data_size,
120 priority, server_client, ret_ptr, when);
121 }
122
123 gearman_server_job_st *
gearman_server_job_add_reducer(gearman_server_st * server,const char * function_name,size_t function_name_size,const char * unique,size_t unique_size,const char * reducer_name,size_t reducer_size,const void * data,size_t data_size,gearman_job_priority_t priority,gearman_server_client_st * server_client,gearmand_error_t * ret_ptr,int64_t when)124 gearman_server_job_add_reducer(gearman_server_st *server,
125 const char *function_name, size_t function_name_size,
126 const char *unique, size_t unique_size,
127 const char *reducer_name, size_t reducer_size,
128 const void *data, size_t data_size,
129 gearman_job_priority_t priority,
130 gearman_server_client_st *server_client,
131 gearmand_error_t *ret_ptr,
132 int64_t when)
133 {
134 gearman_server_function_st *server_function= gearman_server_function_get(server, function_name, function_name_size);
135 if (server_function == NULL)
136 {
137 *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
138 return NULL;
139 }
140
141 uint32_t key;
142 gearman_server_job_st *server_job;
143 if (unique_size == 0)
144 {
145 server_job= NULL;
146 key= 0;
147 }
148 else
149 {
150 if (unique_size == 1 && *unique == '-')
151 {
152 if (data_size == 0)
153 {
154 key= 0;
155 server_job= NULL;
156 }
157 else
158 {
159 /* Look up job via unique data when unique = '-'. */
160 key= _server_job_hash((const char*)data, data_size);
161 server_job= _server_job_get_unique(server, key, server_function, (const char*)data, data_size);
162 }
163 }
164 else
165 {
166 /* Look up job via unique ID first to make sure it's not a duplicate. */
167 key= _server_job_hash(unique, unique_size);
168 server_job= _server_job_get_unique(server, key, server_function, unique, 0);
169 }
170 }
171
172 if (server_job == NULL)
173 {
174 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Comparing queue %u to limit %u for priority %u",
175 server_function->job_total, server_function->max_queue_size[priority],
176 priority);
177 if (server_function->max_queue_size[priority] > 0 &&
178 server_function->job_total >= server_function->max_queue_size[priority])
179 {
180 *ret_ptr= GEARMAN_JOB_QUEUE_FULL;
181 return NULL;
182 }
183
184 server_job= gearman_server_job_create(server);
185 if (server_job == NULL)
186 {
187 *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
188 return NULL;
189 }
190
191 server_job->priority= priority;
192
193 server_job->function= server_function;
194 server_function->job_total++;
195
196 int checked_length;
197 checked_length= snprintf(server_job->job_handle, GEARMAND_JOB_HANDLE_SIZE, "%s:%u",
198 server->job_handle_prefix, server->job_handle_count);
199
200 if (checked_length >= GEARMAND_JOB_HANDLE_SIZE || checked_length < 0)
201 {
202 gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "Job handle plus handle count beyond GEARMAND_JOB_HANDLE_SIZE: %s:%u",
203 server->job_handle_prefix, server->job_handle_count);
204 }
205
206 server_job->unique_length= unique_size;
207 checked_length= snprintf(server_job->unique, GEARMAN_MAX_UNIQUE_SIZE, "%.*s",
208 (int)unique_size, unique);
209 if (checked_length >= GEARMAN_MAX_UNIQUE_SIZE || checked_length < 0)
210 {
211 gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "We received a unique beyond GEARMAN_MAX_UNIQUE_SIZE: %.*s", (int)unique_size, unique);
212 }
213
214 server->job_handle_count++;
215 server_job->data= data;
216 server_job->data_size= data_size;
217 server_job->when= when;
218
219 if (reducer_size)
220 {
221 strncpy(server_job->reducer, reducer_name, reducer_size);
222 server_job->reducer[reducer_size]= 0;
223 }
224 else
225 {
226 server_job->reducer[0]= 0;
227 }
228
229 server_job->unique_key= key;
230 key= key % server->hashtable_buckets;
231 GEARMAN_HASH_ADD(server->unique, key, server_job, unique_);
232
233 key= _server_job_hash(server_job->job_handle,
234 strlen(server_job->job_handle));
235 server_job->job_handle_key= key;
236 key= key % server->hashtable_buckets;
237 GEARMAN_HASH__ADD(server->job, key, server_job);
238
239 if (server->state.queue_startup)
240 {
241 server_job->job_queued= true;
242 }
243 else if (server_client == NULL)
244 {
245 *ret_ptr= gearman_queue_add(server,
246 server_job->unique, unique_size,
247 function_name,
248 function_name_size,
249 data, data_size, priority,
250 when);
251 if (gearmand_failed(*ret_ptr))
252 {
253 server_job->data= NULL;
254 gearman_server_job_free(server_job);
255 return NULL;
256 }
257
258 {
259 *ret_ptr= gearman_queue_flush(server);
260 if (*ret_ptr != GEARMAN_SUCCESS)
261 {
262 server_job->data= NULL;
263 gearman_server_job_free(server_job);
264 return NULL;
265 }
266 }
267
268 server_job->job_queued= true;
269 }
270
271 *ret_ptr= gearman_server_job_queue(server_job);
272 if (gearmand_failed(*ret_ptr))
273 {
274 if (server_client == NULL)
275 {
276 /* Do our best to remove the job from the queue. */
277 (void)gearman_queue_done(server,
278 server_job->unique, unique_size,
279 server_job->function->function_name,
280 server_job->function->function_name_size);
281 }
282
283 gearman_server_job_free(server_job);
284 return NULL;
285 }
286 }
287 else
288 {
289 *ret_ptr= GEARMAN_JOB_EXISTS;
290 }
291
292 if (server_client)
293 {
294 server_client->job= server_job;
295 GEARMAN_LIST_ADD(server_job->client, server_client, job_);
296 }
297
298 return server_job;
299 }
300
gearman_server_job_free(gearman_server_job_st * server_job)301 void gearman_server_job_free(gearman_server_job_st *server_job)
302 {
303 if (server_job == NULL)
304 {
305 return;
306 }
307
308 if (server_job->worker != NULL)
309 {
310 server_job->function->job_running--;
311 }
312
313 server_job->function->job_total--;
314
315 if (server_job->data != NULL)
316 {
317 free((void *)(server_job->data));
318 server_job->data= NULL;
319 }
320
321 while (server_job->client_list != NULL)
322 {
323 gearman_server_client_free(server_job->client_list);
324 }
325
326 if (server_job->worker != NULL)
327 {
328 GEARMAN_LIST_DEL(server_job->worker->job, server_job, worker_);
329 }
330
331 uint32_t key= server_job->unique_key % Server->hashtable_buckets;
332 GEARMAN_HASH_DEL(Server->unique, key, server_job, unique_);
333
334 key= server_job->job_handle_key % Server->hashtable_buckets;
335 GEARMAN_HASH__DEL(Server->job, key, server_job);
336
337 if (Server->free_job_count < GEARMAN_MAX_FREE_SERVER_JOB)
338 {
339 gearman_server_st *server= Server;
340 GEARMAN_LIST__ADD(server->free_job, server_job);
341 }
342 else
343 {
344 destroy_gearman_server_job_st(server_job);
345 }
346 }
347
gearman_server_job_queue(gearman_server_job_st * job)348 gearmand_error_t gearman_server_job_queue(gearman_server_job_st *job)
349 {
350 if (job->worker)
351 {
352 job->retries++;
353 if (Server->job_retries != 0 && Server->job_retries == job->retries)
354 {
355 gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
356 "Dropped job due to max retry count: %s %.*s",
357 job->job_handle,
358 (int)job->unique_length, job->unique);
359
360 gearman_server_client_st *client;
361 for (client= job->client_list; client != NULL; client= client->job_next)
362 {
363 gearmand_error_t ret= gearman_server_io_packet_add(client->con, false,
364 GEARMAN_MAGIC_RESPONSE,
365 GEARMAN_COMMAND_WORK_FAIL,
366 job->job_handle,
367 (size_t)strlen(job->job_handle),
368 NULL);
369 if (gearmand_failed(ret))
370 {
371 return ret;
372 }
373 }
374
375 /* Remove from persistent queue if one exists. */
376 if (job->job_queued)
377 {
378 gearmand_error_t ret= gearman_queue_done(Server,
379 job->unique, job->unique_length,
380 job->function->function_name,
381 job->function->function_name_size);
382 if (ret != GEARMAN_SUCCESS)
383 {
384 return ret;
385 }
386 }
387
388 gearman_server_job_free(job);
389 return GEARMAN_SUCCESS;
390 }
391
392 GEARMAN_LIST_DEL(job->worker->job, job, worker_);
393 job->worker= NULL;
394 job->function->job_running--;
395 job->function_next= NULL;
396 job->numerator= 0;
397 job->denominator= 0;
398 }
399
400 /* Queue NOOP for possible sleeping workers. */
401 if (job->function->worker_list != NULL)
402 {
403 gearman_server_worker_st *worker= job->function->worker_list;
404 uint32_t noop_sent= 0;
405
406 do
407 {
408 if (worker->con->is_sleeping && ! (worker->con->is_noop_sent))
409 {
410 gearmand_error_t ret= gearman_server_io_packet_add(worker->con, false,
411 GEARMAN_MAGIC_RESPONSE,
412 GEARMAN_COMMAND_NOOP, NULL);
413 if (gearmand_failed(ret))
414 {
415 gearmand_gerror("gearman_server_io_packet_add", ret);
416 return ret;
417 }
418
419 worker->con->is_noop_sent= true;
420 noop_sent++;
421 }
422
423 worker= worker->function_next;
424 }
425 while (worker != job->function->worker_list &&
426 (Server->worker_wakeup == 0 ||
427 noop_sent < Server->worker_wakeup));
428
429 job->function->worker_list= worker;
430 }
431
432 /* Queue the job to be run. */
433 if (job->function->job_list[job->priority] == NULL)
434 {
435 job->function->job_list[job->priority]= job;
436 }
437 else
438 {
439 job->function->job_end[job->priority]->function_next= job;
440 }
441
442 job->function->job_end[job->priority]= job;
443 job->function->job_count++;
444
445 return GEARMAN_SUCCESS;
446 }
447