1 /*
2 * Copyright (C) 2016 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3 *
4 * This file is part of MooseFS.
5 *
6 * MooseFS is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, version 2 (only).
9 *
10 * MooseFS is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with MooseFS; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18 * or visit http://www.gnu.org/licenses/gpl-2.0.html
19 */
20
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24
25 #include <stdlib.h>
26 #include <string.h>
27 #include <unistd.h>
28 #include <syslog.h>
29 #include <inttypes.h>
30 //#include <fcntl.h>
31 //#include <sys/ioctl.h>
32 #include <limits.h>
33 #include <signal.h>
34 #include <pthread.h>
35 #include <errno.h>
36
37 #include "main.h"
38 #include "cfg.h"
39 #include "pcqueue.h"
40 #include "datapack.h"
41 #include "massert.h"
42
43 #include "mainserv.h"
44 #include "hddspacemgr.h"
45 #include "replicator.h"
46
47 #define JHASHSIZE 0x400
48 #define JHASHPOS(id) ((id)&0x3FF)
49
50 enum {
51 JSTATE_DISABLED,
52 JSTATE_ENABLED,
53 JSTATE_INPROGRESS
54 };
55
56 enum {
57 OP_EXIT=0,
58 OP_INVAL,
59 // OP_MAINSERV,
60 OP_CHUNKOP,
61 // OP_OPEN,
62 // OP_CLOSE,
63 // OP_READ,
64 // OP_WRITE,
65 OP_SERV_READ,
66 OP_SERV_WRITE,
67 OP_REPLICATE,
68 OP_GETBLOCKS,
69 OP_GETCHECKSUM,
70 OP_GETCHECKSUMTAB
71 };
72
73 // for OP_CHUNKOP
74 typedef struct _chunk_op_args {
75 uint64_t chunkid,copychunkid;
76 uint32_t version,newversion,copyversion;
77 uint32_t length;
78 } chunk_op_args;
79 /*
80 // for OP_OPEN and OP_CLOSE
81 typedef struct _chunk_oc_args {
82 uint64_t chunkid;
83 uint32_t version;
84 } chunk_oc_args;
85
86 // for OP_READ
87 typedef struct _chunk_rd_args {
88 uint64_t chunkid;
89 uint32_t version;
90 uint32_t offset,size;
91 uint16_t blocknum;
92 uint8_t *buffer;
93 uint8_t *crcbuff;
94 } chunk_rd_args;
95
96 // for OP_WRITE
97 typedef struct _chunk_wr_args {
98 uint64_t chunkid;
99 uint32_t version;
100 uint32_t offset,size;
101 uint16_t blocknum;
102 const uint8_t *buffer;
103 const uint8_t *crcbuff;
104 } chunk_wr_args;
105 */
106
107 // for OP_SERV_READ and OP_SERV_WRITE
108 typedef struct _chunk_rw_args {
109 int sock;
110 const uint8_t *packet;
111 uint32_t length;
112 } chunk_rw_args;
113
114 // for OP_REPLICATE
115 typedef struct _chunk_rp_args {
116 uint64_t chunkid;
117 uint32_t version;
118 uint8_t srccnt;
119 } chunk_rp_args;
120
121 // for OP_GETBLOCKS, OP_GETCHECKSUM and OP_GETCHECKSUMTAB
122 typedef struct _chunk_ij_args {
123 uint64_t chunkid;
124 uint32_t version;
125 void *pointer;
126 } chunk_ij_args;
127
128 typedef struct _job {
129 uint32_t jobid;
130 void (*callback)(uint8_t status,void *extra);
131 void *extra;
132 void *args;
133 uint8_t jstate;
134 struct _job *next;
135 } job;
136
137 typedef struct _jobpool {
138 int rpipe,wpipe;
139 int32_t fdpdescpos;
140 uint32_t workers_max;
141 uint32_t workers_max_idle;
142 uint32_t workers_avail;
143 uint32_t workers_total;
144 uint32_t workers_term_waiting;
145 pthread_cond_t worker_term_cond;
146 pthread_mutex_t pipelock;
147 pthread_mutex_t jobslock;
148 void *jobqueue;
149 void *statusqueue;
150 job* jobhash[JHASHSIZE];
151 uint32_t nextjobid;
152 } jobpool;
153
154 typedef struct _worker {
155 pthread_t thread_id;
156 jobpool *jp;
157 } worker;
158
159 static jobpool* globalpool = NULL;
160
161 static uint32_t stats_maxjobscnt = 0;
162 static uint32_t last_maxjobscnt = 0;
163
164 static uint8_t exiting;
165
job_stats(uint32_t * maxjobscnt)166 void job_stats(uint32_t *maxjobscnt) {
167 *maxjobscnt = last_maxjobscnt = stats_maxjobscnt;
168 stats_maxjobscnt = 0;
169 }
170
job_getload(void)171 uint32_t job_getload(void) {
172 return last_maxjobscnt;
173 }
174
job_send_status(jobpool * jp,uint32_t jobid,uint8_t status)175 static inline void job_send_status(jobpool *jp,uint32_t jobid,uint8_t status) {
176 zassert(pthread_mutex_lock(&(jp->pipelock)));
177 if (queue_isempty(jp->statusqueue)) { // first status
178 eassert(write(jp->wpipe,&status,1)==1); // write anything to wake up select
179 }
180 queue_put(jp->statusqueue,jobid,status,NULL,1);
181 zassert(pthread_mutex_unlock(&(jp->pipelock)));
182 return;
183 }
184
job_receive_status(jobpool * jp,uint32_t * jobid,uint8_t * status)185 static inline int job_receive_status(jobpool *jp,uint32_t *jobid,uint8_t *status) {
186 uint32_t qstatus;
187 zassert(pthread_mutex_lock(&(jp->pipelock)));
188 queue_get(jp->statusqueue,jobid,&qstatus,NULL,NULL);
189 *status = qstatus;
190 if (queue_isempty(jp->statusqueue)) {
191 eassert(read(jp->rpipe,&qstatus,1)==1); // make pipe empty
192 zassert(pthread_mutex_unlock(&(jp->pipelock)));
193 return 0; // last element
194 }
195 zassert(pthread_mutex_unlock(&(jp->pipelock)));
196 return 1; // not last
197 }
198
199 void* job_worker(void *arg);
200
201 static uint32_t lastnotify = 0;
202
job_spawn_worker(jobpool * jp)203 static inline void job_spawn_worker(jobpool *jp) {
204 worker *w;
205
206 w = malloc(sizeof(worker));
207 passert(w);
208 w->jp = jp;
209 if (main_minthread_create(&(w->thread_id),0,job_worker,w)<0) {
210 return;
211 }
212 jp->workers_avail++;
213 jp->workers_total++;
214 if (jp->workers_total%10==0 && lastnotify!=jp->workers_total) {
215 syslog(LOG_NOTICE,"workers: %"PRIu32"+",jp->workers_total);
216 lastnotify = jp->workers_total;
217 }
218 // syslog(LOG_NOTICE,"jobs: spawn worker (total: %"PRIu32")",jp->workers_total);
219 }
220
job_close_worker(worker * w)221 static inline void job_close_worker(worker *w) {
222 jobpool *jp = w->jp;
223 jp->workers_avail--;
224 jp->workers_total--;
225 if (jp->workers_total==0 && jp->workers_term_waiting) {
226 zassert(pthread_cond_signal(&(jp->worker_term_cond)));
227 jp->workers_term_waiting--;
228 }
229 pthread_detach(w->thread_id);
230 free(w);
231 if (jp->workers_total%10==0 && lastnotify!=jp->workers_total) {
232 syslog(LOG_NOTICE,"workers: %"PRIu32"-",jp->workers_total);
233 lastnotify = jp->workers_total;
234 }
235 // syslog(LOG_NOTICE,"jobs: close worker (total: %"PRIu32")",jp->workers_total);
236 }
237
238 #define opargs ((chunk_op_args*)(jptr->args))
239 // #define ocargs ((chunk_oc_args*)(jptr->args))
240 // #define rdargs ((chunk_rd_args*)(jptr->args))
241 // #define wrargs ((chunk_wr_args*)(jptr->args))
242 #define rwargs ((chunk_rw_args*)(jptr->args))
243 #define rpargs ((chunk_rp_args*)(jptr->args))
244 #define ijargs ((chunk_ij_args*)(jptr->args))
job_worker(void * arg)245 void* job_worker(void *arg) {
246 worker *w = (worker*)arg;
247 jobpool *jp = w->jp;
248 job *jptr;
249 uint8_t *jptrarg;
250 uint8_t status,jstate;
251 uint32_t jobid;
252 uint32_t op;
253
254 // syslog(LOG_NOTICE,"worker %p started (jobqueue: %p ; jptr:%p ; jptrarg:%p ; status:%p )",(void*)pthread_self(),jp->jobqueue,(void*)&jptr,(void*)&jptrarg,(void*)&status);
255 for (;;) {
256 queue_get(jp->jobqueue,&jobid,&op,&jptrarg,NULL);
257 // syslog(LOG_NOTICE,"job worker got job: %"PRIu32",%"PRIu32,jobid,op);
258 jptr = (job*)jptrarg;
259 zassert(pthread_mutex_lock(&(jp->jobslock)));
260 if (jobid==0 && op==0 && jptrarg==NULL) { // queue has been closed
261 job_close_worker(w);
262 zassert(pthread_mutex_unlock(&(jp->jobslock)));
263 return NULL;
264 }
265 jp->workers_avail--;
266 if (jp->workers_avail==0 && jp->workers_total<jp->workers_max) {
267 job_spawn_worker(jp);
268 }
269 if (jptr!=NULL) {
270 jstate=jptr->jstate;
271 if (jptr->jstate==JSTATE_ENABLED) {
272 jptr->jstate=JSTATE_INPROGRESS;
273 }
274 } else {
275 jstate=JSTATE_DISABLED;
276 }
277 zassert(pthread_mutex_unlock(&(jp->jobslock)));
278 switch (op) {
279 case OP_INVAL:
280 status = ERROR_EINVAL;
281 break;
282 /*
283 case OP_MAINSERV:
284 if (jstate==JSTATE_DISABLED) {
285 status = ERROR_NOTDONE;
286 } else {
287 mainserv_serve(*((int*)(jptr->args)));
288 status = STATUS_OK;
289 }
290 break;
291 */
292 case OP_CHUNKOP:
293 if (jstate==JSTATE_DISABLED) {
294 status = ERROR_NOTDONE;
295 } else {
296 status = hdd_chunkop(opargs->chunkid,opargs->version,opargs->newversion,opargs->copychunkid,opargs->copyversion,opargs->length);
297 }
298 break;
299 /*
300 case OP_OPEN:
301 if (jstate==JSTATE_DISABLED) {
302 status = ERROR_NOTDONE;
303 } else {
304 status = hdd_open(ocargs->chunkid,ocargs->version);
305 }
306 break;
307 case OP_CLOSE:
308 if (jstate==JSTATE_DISABLED) {
309 status = ERROR_NOTDONE;
310 } else {
311 status = hdd_close(ocargs->chunkid);
312 }
313 break;
314 case OP_READ:
315 if (jstate==JSTATE_DISABLED) {
316 status = ERROR_NOTDONE;
317 } else {
318 status = hdd_read(rdargs->chunkid,rdargs->version,rdargs->blocknum,rdargs->buffer,rdargs->offset,rdargs->size,rdargs->crcbuff);
319 }
320 break;
321 case OP_WRITE:
322 if (jstate==JSTATE_DISABLED) {
323 status = ERROR_NOTDONE;
324 } else {
325 status = hdd_write(wrargs->chunkid,wrargs->version,wrargs->blocknum,wrargs->buffer,wrargs->offset,wrargs->size,wrargs->crcbuff);
326 }
327 break;
328 */
329 case OP_SERV_READ:
330 if (jstate==JSTATE_DISABLED) {
331 status = ERROR_NOTDONE;
332 } else {
333 status = mainserv_read(rwargs->sock,rwargs->packet,rwargs->length);
334 }
335 break;
336 case OP_SERV_WRITE:
337 if (jstate==JSTATE_DISABLED) {
338 status = ERROR_NOTDONE;
339 } else {
340 status = mainserv_write(rwargs->sock,rwargs->packet,rwargs->length);
341 }
342 break;
343 case OP_REPLICATE:
344 if (jstate==JSTATE_DISABLED) {
345 status = ERROR_NOTDONE;
346 } else {
347 status = replicate(rpargs->chunkid,rpargs->version,rpargs->srccnt,((uint8_t*)(jptr->args))+sizeof(chunk_rp_args));
348 }
349 break;
350 case OP_GETBLOCKS:
351 if (jstate==JSTATE_DISABLED) {
352 status = ERROR_NOTDONE;
353 } else {
354 status = hdd_get_blocks(ijargs->chunkid,ijargs->version,ijargs->pointer);
355 }
356 break;
357 case OP_GETCHECKSUM:
358 if (jstate==JSTATE_DISABLED) {
359 status = ERROR_NOTDONE;
360 } else {
361 status = hdd_get_checksum(ijargs->chunkid,ijargs->version,ijargs->pointer);
362 }
363 break;
364 case OP_GETCHECKSUMTAB:
365 if (jstate==JSTATE_DISABLED) {
366 status = ERROR_NOTDONE;
367 } else {
368 status = hdd_get_checksum_tab(ijargs->chunkid,ijargs->version,ijargs->pointer);
369 }
370 break;
371 default: // OP_EXIT
372 // syslog(LOG_NOTICE,"worker %p exiting (jobqueue: %p)",(void*)pthread_self(),jp->jobqueue);
373 zassert(pthread_mutex_lock(&(jp->jobslock)));
374 job_close_worker(w);
375 zassert(pthread_mutex_unlock(&(jp->jobslock)));
376 return NULL;
377 }
378 job_send_status(jp,jobid,status);
379 zassert(pthread_mutex_lock(&(jp->jobslock)));
380 jp->workers_avail++;
381 if (jp->workers_avail > jp->workers_max_idle) {
382 job_close_worker(w);
383 zassert(pthread_mutex_unlock(&(jp->jobslock)));
384 return NULL;
385 }
386 zassert(pthread_mutex_unlock(&(jp->jobslock)));
387 }
388 }
389
job_new(jobpool * jp,uint32_t op,void * args,void (* callback)(uint8_t status,void * extra),void * extra)390 static inline uint32_t job_new(jobpool *jp,uint32_t op,void *args,void (*callback)(uint8_t status,void *extra),void *extra) {
391 // jobpool* jp = (jobpool*)jpool;
392 if (exiting) {
393 if (callback) {
394 callback(ERROR_NOTDONE,extra);
395 }
396 if (args) {
397 free(args);
398 }
399 return 0;
400 } else {
401 uint32_t jobid = jp->nextjobid;
402 uint32_t jhpos = JHASHPOS(jobid);
403 job *jptr;
404 jptr = malloc(sizeof(job));
405 passert(jptr);
406 jptr->jobid = jobid;
407 jptr->callback = callback;
408 jptr->extra = extra;
409 jptr->args = args;
410 jptr->jstate = JSTATE_ENABLED;
411 jptr->next = jp->jobhash[jhpos];
412 jp->jobhash[jhpos] = jptr;
413 queue_put(jp->jobqueue,jobid,op,(uint8_t*)jptr,1);
414 jp->nextjobid++;
415 if (jp->nextjobid==0) {
416 jp->nextjobid=1;
417 }
418 return jobid;
419 }
420 }
421
422 /* interface */
423
job_pool_new(uint32_t jobs)424 void* job_pool_new(uint32_t jobs) {
425 int fd[2];
426 uint32_t i;
427 jobpool* jp;
428
429 if (pipe(fd)<0) {
430 return NULL;
431 }
432 jp=malloc(sizeof(jobpool));
433 passert(jp);
434 // syslog(LOG_WARNING,"new pool of workers (%p:%"PRIu8")",(void*)jp,workers);
435 jp->rpipe = fd[0];
436 jp->wpipe = fd[1];
437 jp->workers_avail = 0;
438 jp->workers_total = 0;
439 jp->workers_term_waiting = 0;
440 zassert(pthread_cond_init(&(jp->worker_term_cond),NULL));
441 zassert(pthread_mutex_init(&(jp->pipelock),NULL));
442 zassert(pthread_mutex_init(&(jp->jobslock),NULL));
443 jp->jobqueue = queue_new(jobs);
444 // syslog(LOG_WARNING,"new jobqueue: %p",jp->jobqueue);
445 jp->statusqueue = queue_new(0);
446 for (i=0 ; i<JHASHSIZE ; i++) {
447 jp->jobhash[i]=NULL;
448 }
449 jp->nextjobid = 1;
450 zassert(pthread_mutex_lock(&(jp->jobslock)));
451 job_spawn_worker(jp);
452 zassert(pthread_mutex_unlock(&(jp->jobslock)));
453 return jp;
454 }
455
job_pool_jobs_count(void)456 uint32_t job_pool_jobs_count(void) {
457 jobpool* jp = globalpool;
458 uint32_t res;
459 zassert(pthread_mutex_lock(&(jp->jobslock)));
460 res = (jp->workers_total - jp->workers_avail) + queue_elements(jp->jobqueue);
461 zassert(pthread_mutex_unlock(&(jp->jobslock)));
462 return res;
463 }
464
465 /*
466 void job_pool_disable_and_change_callback_all(void (*callback)(uint8_t status,void *extra)) {
467 jobpool* jp = globalpool;
468 uint32_t jhpos;
469 job *jptr;
470
471 zassert(pthread_mutex_lock(&(jp->jobslock)));
472 for (jhpos = 0 ; jhpos<JHASHSIZE ; jhpos++) {
473 for (jptr = jp->jobhash[jhpos] ; jptr ; jptr=jptr->next) {
474 if (jptr->jstate==JSTATE_ENABLED) {
475 jptr->jstate=JSTATE_DISABLED;
476 }
477 jptr->callback=callback;
478 }
479 }
480 zassert(pthread_mutex_unlock(&(jp->jobslock)));
481 }
482 */
483
job_pool_disable_job(uint32_t jobid)484 void job_pool_disable_job(uint32_t jobid) {
485 jobpool* jp = globalpool;
486 uint32_t jhpos = JHASHPOS(jobid);
487 job *jptr;
488 for (jptr = jp->jobhash[jhpos] ; jptr ; jptr=jptr->next) {
489 if (jptr->jobid==jobid) {
490 zassert(pthread_mutex_lock(&(jp->jobslock)));
491 if (jptr->jstate==JSTATE_ENABLED) {
492 jptr->jstate=JSTATE_DISABLED;
493 }
494 zassert(pthread_mutex_unlock(&(jp->jobslock)));
495 }
496 }
497 }
498
job_pool_change_callback(uint32_t jobid,void (* callback)(uint8_t status,void * extra),void * extra)499 void job_pool_change_callback(uint32_t jobid,void (*callback)(uint8_t status,void *extra),void *extra) {
500 jobpool* jp = globalpool;
501 uint32_t jhpos = JHASHPOS(jobid);
502 job *jptr;
503 for (jptr = jp->jobhash[jhpos] ; jptr ; jptr=jptr->next) {
504 if (jptr->jobid==jobid) {
505 jptr->callback=callback;
506 jptr->extra=extra;
507 }
508 }
509 }
510
job_pool_check_jobs(uint8_t cb)511 void job_pool_check_jobs(uint8_t cb) {
512 jobpool* jp = globalpool;
513 uint32_t jobid,jhpos;
514 uint8_t status;
515 int notlast;
516 job **jhandle,*jptr;
517 do {
518 notlast = job_receive_status(jp,&jobid,&status);
519 jhpos = JHASHPOS(jobid);
520 jhandle = jp->jobhash+jhpos;
521 while ((jptr = *jhandle)) {
522 if (jptr->jobid==jobid) {
523 if (jptr->callback && cb) {
524 jptr->callback(status,jptr->extra);
525 }
526 *jhandle = jptr->next;
527 if (jptr->args) {
528 free(jptr->args);
529 }
530 free(jptr);
531 break;
532 } else {
533 jhandle = &(jptr->next);
534 }
535 }
536 } while (notlast);
537 }
538
job_pool_delete(jobpool * jp)539 void job_pool_delete(jobpool* jp) {
540 queue_close(jp->jobqueue);
541 zassert(pthread_mutex_lock(&(jp->jobslock)));
542 while (jp->workers_total>0) {
543 jp->workers_term_waiting++;
544 zassert(pthread_cond_wait(&(jp->worker_term_cond),&(jp->jobslock)));
545 }
546 zassert(pthread_mutex_unlock(&(jp->jobslock)));
547 if (!queue_isempty(jp->statusqueue)) {
548 job_pool_check_jobs(0);
549 }
550 // syslog(LOG_NOTICE,"deleting jobqueue: %p",jp->jobqueue);
551 queue_delete(jp->jobqueue);
552 queue_delete(jp->statusqueue);
553 zassert(pthread_cond_destroy(&(jp->worker_term_cond)));
554 zassert(pthread_mutex_destroy(&(jp->pipelock)));
555 zassert(pthread_mutex_destroy(&(jp->jobslock)));
556 close(jp->rpipe);
557 close(jp->wpipe);
558 free(jp);
559 }
560
job_inval(void (* callback)(uint8_t status,void * extra),void * extra)561 uint32_t job_inval(void (*callback)(uint8_t status,void *extra),void *extra) {
562 jobpool* jp = globalpool;
563 return job_new(jp,OP_INVAL,NULL,callback,extra);
564 }
565
566 /*
567 uint32_t job_mainserv(int sock) {
568 jobpool* jp = globalpool;
569 int *args;
570 args = malloc(sizeof(int));
571 passert(args);
572 *args = sock;
573 return job_new(jp,OP_MAINSERV,args,NULL,NULL);
574 }
575 */
576
job_chunkop(void (* callback)(uint8_t status,void * extra),void * extra,uint64_t chunkid,uint32_t version,uint32_t newversion,uint64_t copychunkid,uint32_t copyversion,uint32_t length)577 uint32_t job_chunkop(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint32_t newversion,uint64_t copychunkid,uint32_t copyversion,uint32_t length) {
578 jobpool* jp = globalpool;
579 chunk_op_args *args;
580 args = malloc(sizeof(chunk_op_args));
581 passert(args);
582 args->chunkid = chunkid;
583 args->version = version;
584 args->newversion = newversion;
585 args->copychunkid = copychunkid;
586 args->copyversion = copyversion;
587 args->length = length;
588 return job_new(jp,OP_CHUNKOP,args,callback,extra);
589 }
590 /*
591 uint32_t job_open(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version) {
592 jobpool* jp = globalpool;
593 chunk_oc_args *args;
594 args = malloc(sizeof(chunk_oc_args));
595 passert(args);
596 args->chunkid = chunkid;
597 args->version = version;
598 return job_new(jp,OP_OPEN,args,callback,extra);
599 }
600
601 uint32_t job_close(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid) {
602 jobpool* jp = globalpool;
603 chunk_oc_args *args;
604 args = malloc(sizeof(chunk_oc_args));
605 passert(args);
606 args->chunkid = chunkid;
607 args->version = 0;
608 return job_new(jp,OP_CLOSE,args,callback,extra);
609 }
610
611 uint32_t job_read(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint16_t blocknum,uint8_t *buffer,uint32_t offset,uint32_t size,uint8_t *crcbuff) {
612 jobpool* jp = globalpool;
613 chunk_rd_args *args;
614 args = malloc(sizeof(chunk_rd_args));
615 passert(args);
616 args->chunkid = chunkid;
617 args->version = version;
618 args->blocknum = blocknum;
619 args->buffer = buffer;
620 args->offset = offset;
621 args->size = size;
622 args->crcbuff = crcbuff;
623 return job_new(jp,OP_READ,args,callback,extra);
624 }
625
626 uint32_t job_write(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint16_t blocknum,const uint8_t *buffer,uint32_t offset,uint32_t size,const uint8_t *crcbuff) {
627 jobpool* jp = globalpool;
628 chunk_wr_args *args;
629 args = malloc(sizeof(chunk_wr_args));
630 passert(args);
631 args->chunkid = chunkid;
632 args->version = version;
633 args->blocknum = blocknum;
634 args->buffer = buffer;
635 args->offset = offset;
636 args->size = size;
637 args->crcbuff = crcbuff;
638 return job_new(jp,OP_WRITE,args,callback,extra);
639 }
640 */
641
job_serv_read(void (* callback)(uint8_t status,void * extra),void * extra,int sock,const uint8_t * packet,uint32_t length)642 uint32_t job_serv_read(void (*callback)(uint8_t status,void *extra),void *extra,int sock,const uint8_t *packet,uint32_t length) {
643 jobpool* jp = globalpool;
644 chunk_rw_args *args;
645 args = malloc(sizeof(chunk_rw_args));
646 passert(args);
647 args->sock = sock;
648 args->packet = packet;
649 args->length = length;
650 return job_new(jp,OP_SERV_READ,args,callback,extra);
651 }
652
job_serv_write(void (* callback)(uint8_t status,void * extra),void * extra,int sock,const uint8_t * packet,uint32_t length)653 uint32_t job_serv_write(void (*callback)(uint8_t status,void *extra),void *extra,int sock,const uint8_t *packet,uint32_t length) {
654 jobpool* jp = globalpool;
655 chunk_rw_args *args;
656 args = malloc(sizeof(chunk_rw_args));
657 passert(args);
658 args->sock = sock;
659 args->packet = packet;
660 args->length = length;
661 return job_new(jp,OP_SERV_WRITE,args,callback,extra);
662 }
663
job_replicate(void (* callback)(uint8_t status,void * extra),void * extra,uint64_t chunkid,uint32_t version,uint8_t srccnt,const uint8_t * srcs)664 uint32_t job_replicate(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint8_t srccnt,const uint8_t *srcs) {
665 jobpool* jp = globalpool;
666 chunk_rp_args *args;
667 uint8_t *ptr;
668 ptr = malloc(sizeof(chunk_rp_args)+srccnt*18);
669 passert(ptr);
670 args = (chunk_rp_args*)ptr;
671 ptr += sizeof(chunk_rp_args);
672 args->chunkid = chunkid;
673 args->version = version;
674 args->srccnt = srccnt;
675 memcpy(ptr,srcs,srccnt*18);
676 return job_new(jp,OP_REPLICATE,args,callback,extra);
677 }
678
job_replicate_simple(void (* callback)(uint8_t status,void * extra),void * extra,uint64_t chunkid,uint32_t version,uint32_t ip,uint16_t port)679 uint32_t job_replicate_simple(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint32_t ip,uint16_t port) {
680 jobpool* jp = globalpool;
681 chunk_rp_args *args;
682 uint8_t *ptr;
683 ptr = malloc(sizeof(chunk_rp_args)+18);
684 passert(ptr);
685 args = (chunk_rp_args*)ptr;
686 ptr += sizeof(chunk_rp_args);
687 args->chunkid = chunkid;
688 args->version = version;
689 args->srccnt = 1;
690 put64bit(&ptr,chunkid);
691 put32bit(&ptr,version);
692 put32bit(&ptr,ip);
693 put16bit(&ptr,port);
694 return job_new(jp,OP_REPLICATE,args,callback,extra);
695 }
696
job_get_chunk_blocks(void (* callback)(uint8_t status,void * extra),void * extra,uint64_t chunkid,uint32_t version,uint8_t * blocks)697 uint32_t job_get_chunk_blocks(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint8_t *blocks) {
698 jobpool* jp = globalpool;
699 chunk_ij_args *args;
700 args = malloc(sizeof(chunk_ij_args));
701 passert(args);
702 args->chunkid = chunkid;
703 args->version = version;
704 args->pointer = blocks;
705 return job_new(jp,OP_GETBLOCKS,args,callback,extra);
706 }
707
job_get_chunk_checksum(void (* callback)(uint8_t status,void * extra),void * extra,uint64_t chunkid,uint32_t version,uint8_t * checksum)708 uint32_t job_get_chunk_checksum(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint8_t *checksum) {
709 jobpool* jp = globalpool;
710 chunk_ij_args *args;
711 args = malloc(sizeof(chunk_ij_args));
712 passert(args);
713 args->chunkid = chunkid;
714 args->version = version;
715 args->pointer = checksum;
716 return job_new(jp,OP_GETCHECKSUM,args,callback,extra);
717 }
718
job_get_chunk_checksum_tab(void (* callback)(uint8_t status,void * extra),void * extra,uint64_t chunkid,uint32_t version,uint8_t * checksum_tab)719 uint32_t job_get_chunk_checksum_tab(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint8_t *checksum_tab) {
720 jobpool* jp = globalpool;
721 chunk_ij_args *args;
722 args = malloc(sizeof(chunk_ij_args));
723 passert(args);
724 args->chunkid = chunkid;
725 args->version = version;
726 args->pointer = checksum_tab;
727 return job_new(jp,OP_GETCHECKSUMTAB,args,callback,extra);
728 }
729
job_desc(struct pollfd * pdesc,uint32_t * ndesc)730 void job_desc(struct pollfd *pdesc,uint32_t *ndesc) {
731 uint32_t pos = *ndesc;
732 jobpool* jp = globalpool;
733
734 pdesc[pos].fd = jp->rpipe;
735 pdesc[pos].events = POLLIN;
736 jp->fdpdescpos = pos;
737 pos++;
738
739 *ndesc = pos;
740 }
741
job_serve(struct pollfd * pdesc)742 void job_serve(struct pollfd *pdesc) {
743 jobpool* jp = globalpool;
744 uint32_t jobscnt;
745
746 if (jp->fdpdescpos>=0 && (pdesc[jp->fdpdescpos].revents & POLLIN)) {
747 job_pool_check_jobs(1);
748 }
749
750 jobscnt = job_pool_jobs_count();
751 if (jobscnt>=stats_maxjobscnt) {
752 stats_maxjobscnt=jobscnt;
753 }
754 }
755
job_wantexit(void)756 void job_wantexit(void) {
757 exiting = 1;
758 }
759
job_canexit(void)760 int job_canexit(void) {
761 return (job_pool_jobs_count()>0)?0:1;
762 }
763
job_term(void)764 void job_term(void) {
765 job_pool_delete(globalpool);
766 }
767
job_reload(void)768 void job_reload(void) {
769 jobpool* jp = globalpool;
770
771 zassert(pthread_mutex_lock(&(jp->jobslock)));
772
773 jp->workers_max = cfg_getuint32("WORKERS_MAX",150);
774 jp->workers_max_idle = cfg_getuint32("WORKERS_MAX_IDLE",40);
775
776 zassert(pthread_mutex_unlock(&(jp->jobslock)));
777 }
778
job_init(void)779 int job_init(void) {
780 // globalpool = (jobpool*)malloc(sizeof(jobpool));
781 exiting = 0;
782 globalpool = job_pool_new(256);
783
784 job_reload();
785
786 main_destruct_register(job_term);
787 main_wantexit_register(job_wantexit);
788 main_canexit_register(job_canexit);
789 main_reload_register(job_reload);
790 main_poll_register(job_desc,job_serve);
791 return 0;
792 }
793