1 /*
2 * Copyright (C) 2021 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3 *
4 * This file is part of MooseFS.
5 *
6 * MooseFS is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, version 2 (only).
9 *
10 * MooseFS is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with MooseFS; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18 * or visit http://www.gnu.org/licenses/gpl-2.0.html
19 */
20
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24
25 #include <stdlib.h>
26 #include <string.h>
27 #include <unistd.h>
28 #include <syslog.h>
29 #include <inttypes.h>
30 //#include <fcntl.h>
31 //#include <sys/ioctl.h>
32 #include <limits.h>
33 #include <signal.h>
34 #include <pthread.h>
35 #include <errno.h>
36
37 #include "main.h"
38 #include "cfg.h"
39 #include "pcqueue.h"
40 #include "lwthread.h"
41 #include "datapack.h"
42 #include "massert.h"
43
44 #include "mainserv.h"
45 #include "hddspacemgr.h"
46 #include "replicator.h"
47 #include "masterconn.h"
48
/* Job hash table: JHASHSIZE buckets (a power of two); a job's bucket is the low bits of its id. */
#define JHASHSIZE 0x400
#define JHASHPOS(id) ((id)&(JHASHSIZE-1))

/* Lifecycle of a submitted job (job.jstate). */
enum {
	JSTATE_DISABLED = 0,   /* disabled before a worker took it; the worker reports MFS_ERROR_NOTDONE */
	JSTATE_ENABLED = 1,    /* waiting to be picked up by a worker */
	JSTATE_INPROGRESS = 2  /* a worker has started executing it */
};

/* Operation codes passed through the job queue; job_worker dispatches on them.
   (Codes for the retired MAINSERV/OPEN/CLOSE/READ/WRITE operations no longer exist.) */
enum {
	OP_EXIT = 0,           /* op 0 with job id 0 and no job pointer is also how a closed queue shows up */
	OP_INVAL = 1,
	OP_CHUNKOP = 2,
	OP_SERV_READ = 3,
	OP_SERV_WRITE = 4,
	OP_REPLICATE = 5,
	OP_GETBLOCKS = 6,
	OP_GETCHECKSUM = 7,
	OP_GETCHECKSUMTAB = 8,
	OP_CHUNKMOVE = 9
};
75
/* Arguments of an OP_CHUNKOP job (filled in by job_chunkop, consumed by hdd_chunkop). */
typedef struct _chunk_op_args {
	uint64_t chunkid;
	uint64_t copychunkid;
	uint32_t version;
	uint32_t newversion;
	uint32_t copyversion;
	uint32_t length;
} chunk_op_args;
82 /*
83 // for OP_OPEN and OP_CLOSE
84 typedef struct _chunk_oc_args {
85 uint64_t chunkid;
86 uint32_t version;
87 } chunk_oc_args;
88
89 // for OP_READ
90 typedef struct _chunk_rd_args {
91 uint64_t chunkid;
92 uint32_t version;
93 uint32_t offset,size;
94 uint16_t blocknum;
95 uint8_t *buffer;
96 uint8_t *crcbuff;
97 } chunk_rd_args;
98
99 // for OP_WRITE
100 typedef struct _chunk_wr_args {
101 uint64_t chunkid;
102 uint32_t version;
103 uint32_t offset,size;
104 uint16_t blocknum;
105 const uint8_t *buffer;
106 const uint8_t *crcbuff;
107 } chunk_wr_args;
108 */
109
/* Arguments of OP_SERV_READ / OP_SERV_WRITE jobs (see job_serv_read, job_serv_write). */
typedef struct _chunk_rw_args {
	int sock;
	const uint8_t *packet;
	uint32_t length;
} chunk_rw_args;

/* Arguments of an OP_REPLICATE job. The same allocation continues, right after this header,
   with srccnt packed 18-byte source records (see job_replicate_raid / job_replicate_simple). */
typedef struct _chunk_rp_args {
	uint64_t chunkid;
	uint32_t version;
	uint32_t xormasks[4];
	uint8_t srccnt;
} chunk_rp_args;

/* Arguments of OP_GETBLOCKS, OP_GETCHECKSUM and OP_GETCHECKSUMTAB jobs.
   pointer is the caller-supplied output buffer; it is not freed together with the job. */
typedef struct _chunk_ij_args {
	uint64_t chunkid;
	uint32_t version;
	void *pointer;
} chunk_ij_args;

/* Arguments of an OP_CHUNKMOVE job: opaque handles passed unchanged to hdd_move. */
typedef struct _chunk_mv_args {
	void *fsrc;
	void *fdst;
} chunk_mv_args;
137
138 typedef struct _job {
139 uint32_t jobid;
140 void (*callback)(uint8_t status,void *extra);
141 void *extra;
142 void *args;
143 uint8_t jstate;
144 struct _job *next;
145 } job;
146
147 typedef struct _jobpool {
148 int rpipe,wpipe;
149 int32_t fdpdescpos;
150 uint32_t workers_max;
151 uint32_t workers_himark;
152 uint32_t workers_lomark;
153 uint32_t workers_max_idle;
154 uint32_t workers_avail;
155 uint32_t workers_total;
156 uint32_t workers_term_waiting;
157 pthread_cond_t worker_term_cond;
158 pthread_mutex_t pipelock;
159 pthread_mutex_t jobslock;
160 void *jobqueue;
161 void *statusqueue;
162 job* jobhash[JHASHSIZE];
163 uint32_t nextjobid;
164 } jobpool;
165
166 typedef struct _worker {
167 pthread_t thread_id;
168 jobpool *jp;
169 } worker;
170
171 static jobpool* globalpool = NULL;
172
173 static uint32_t stats_maxjobscnt = 0;
174
175 // static uint8_t exiting;
176
job_stats(uint32_t * maxjobscnt)177 void job_stats(uint32_t *maxjobscnt) {
178 *maxjobscnt = stats_maxjobscnt;
179 stats_maxjobscnt = 0;
180 }
181
/* Post a (jobid,status) result for the main loop. When the status queue was empty, one byte is
   also written to the wake-up pipe so that poll() in the main loop wakes up; job_receive_status
   reads that byte back once the queue has been emptied again. */
static inline void job_send_status(jobpool *jp,uint32_t jobid,uint8_t status) {
	int first_result;

	zassert(pthread_mutex_lock(&(jp->pipelock)));
	first_result = queue_isempty(jp->statusqueue);
	if (first_result) {
		eassert(write(jp->wpipe,&status,1)==1); // any byte will do - it only wakes up poll
	}
	queue_put(jp->statusqueue,jobid,status,NULL,1);
	zassert(pthread_mutex_unlock(&(jp->pipelock)));
}
191
/* Pop one (jobid,status) result from the status queue.
   Returns 1 when more results remain, 0 when this was the last one; in that case the wake-up
   byte written by job_send_status is read from the pipe so it stops signalling readiness.
   Callers only invoke it when a result is pending (queue_get's behaviour on an empty queue
   is not visible here). */
static inline int job_receive_status(jobpool *jp,uint32_t *jobid,uint8_t *status) {
	uint32_t value;
	uint8_t wakeup_byte;
	int more;

	zassert(pthread_mutex_lock(&(jp->pipelock)));
	queue_get(jp->statusqueue,jobid,&value,NULL,NULL);
	*status = value;
	more = queue_isempty(jp->statusqueue) ? 0 : 1;
	if (!more) {
		eassert(read(jp->rpipe,&wakeup_byte,1)==1); // leave the pipe empty
	}
	zassert(pthread_mutex_unlock(&(jp->pipelock)));
	return more;
}
205
/* Worker thread entry point (defined further below). */
void* job_worker(void *arg);

/* Worker count most recently written to syslog; it keeps a multiple of 10 from being
   logged again while the count has not changed. */
static uint32_t lastnotify = 0;
209
/* Start one more worker thread for jp and count it as an idle worker.
 * Callers hold jp->jobslock, since this updates the worker counters.
 * If the thread cannot be created, the pool is left unchanged and the handle is released.
 */
static inline void job_spawn_worker(jobpool *jp) {
	worker *w;

	w = malloc(sizeof(worker));
	passert(w);
	w->jp = jp;
	if (lwt_minthread_create(&(w->thread_id),0,job_worker,w)<0) {
		free(w); // no thread took ownership of the handle - release it instead of leaking it
		return;
	}
	jp->workers_avail++;
	jp->workers_total++;
	if (jp->workers_total%10==0 && lastnotify!=jp->workers_total) {
		syslog(LOG_NOTICE,"workers: %"PRIu32"+",jp->workers_total);
		lastnotify = jp->workers_total;
	}
}
227
/* Retire worker w. It is removed from both counters (so it must currently be counted as idle),
 * job_pool_delete is woken if it is waiting and this was the last worker, the thread is
 * detached and the handle is freed. Runs on the worker's own thread with jp->jobslock held.
 */
static inline void job_close_worker(worker *w) {
	jobpool *jp = w->jp;
	pthread_t tid = w->thread_id;

	jp->workers_avail--;
	jp->workers_total--;
	if (jp->workers_term_waiting && jp->workers_total==0) {
		zassert(pthread_cond_signal(&(jp->worker_term_cond)));
		jp->workers_term_waiting--;
	}
	pthread_detach(tid);
	free(w);
	if ((jp->workers_total%10)==0 && jp->workers_total!=lastnotify) {
		syslog(LOG_NOTICE,"workers: %"PRIu32"-",jp->workers_total);
		lastnotify = jp->workers_total;
	}
}
244
/* Typed views of the current job's args. They expand to an expression on a variable named
   jptr (a job pointer), so they can only be used where such a variable is in scope (job_worker). */
#define opargs ((chunk_op_args*)((jptr)->args))
// #define ocargs ((chunk_oc_args*)((jptr)->args))
// #define rdargs ((chunk_rd_args*)((jptr)->args))
// #define wrargs ((chunk_wr_args*)((jptr)->args))
#define rwargs ((chunk_rw_args*)((jptr)->args))
#define rpargs ((chunk_rp_args*)((jptr)->args))
#define ijargs ((chunk_ij_args*)((jptr)->args))
#define mvargs ((chunk_mv_args*)((jptr)->args))
job_worker(void * arg)253 void* job_worker(void *arg) {
254 worker *w = (worker*)arg;
255 jobpool *jp = w->jp;
256 job *jptr;
257 uint8_t *jptrarg;
258 uint8_t status,jstate;
259 uint32_t jobid;
260 uint32_t op;
261
262 // syslog(LOG_NOTICE,"worker %p started (jobqueue: %p ; jptr:%p ; jptrarg:%p ; status:%p )",(void*)pthread_self(),jp->jobqueue,(void*)&jptr,(void*)&jptrarg,(void*)&status);
263 for (;;) {
264 queue_get(jp->jobqueue,&jobid,&op,&jptrarg,NULL);
265 // syslog(LOG_NOTICE,"job worker got job: %"PRIu32",%"PRIu32,jobid,op);
266 jptr = (job*)jptrarg;
267 zassert(pthread_mutex_lock(&(jp->jobslock)));
268 if (jobid==0 && op==0 && jptrarg==NULL) { // queue has been closed
269 job_close_worker(w);
270 zassert(pthread_mutex_unlock(&(jp->jobslock)));
271 return NULL;
272 }
273 jp->workers_avail--;
274 if (jp->workers_avail==0 && jp->workers_total<jp->workers_max) {
275 job_spawn_worker(jp);
276 }
277 if (jptr!=NULL) {
278 jstate=jptr->jstate;
279 if (jptr->jstate==JSTATE_ENABLED) {
280 jptr->jstate=JSTATE_INPROGRESS;
281 }
282 } else {
283 jstate=JSTATE_DISABLED;
284 }
285 zassert(pthread_mutex_unlock(&(jp->jobslock)));
286 switch (op) {
287 case OP_INVAL:
288 status = MFS_ERROR_EINVAL;
289 break;
290 /*
291 case OP_MAINSERV:
292 if (jstate==JSTATE_DISABLED) {
293 status = MFS_ERROR_NOTDONE;
294 } else {
295 mainserv_serve(*((int*)(jptr->args)));
296 status = MFS_STATUS_OK;
297 }
298 break;
299 */
300 case OP_CHUNKOP:
301 if (jstate==JSTATE_DISABLED) {
302 status = MFS_ERROR_NOTDONE;
303 } else {
304 status = hdd_chunkop(opargs->chunkid,opargs->version,opargs->newversion,opargs->copychunkid,opargs->copyversion,opargs->length);
305 }
306 break;
307 /*
308 case OP_OPEN:
309 if (jstate==JSTATE_DISABLED) {
310 status = MFS_ERROR_NOTDONE;
311 } else {
312 status = hdd_open(ocargs->chunkid,ocargs->version);
313 }
314 break;
315 case OP_CLOSE:
316 if (jstate==JSTATE_DISABLED) {
317 status = MFS_ERROR_NOTDONE;
318 } else {
319 status = hdd_close(ocargs->chunkid);
320 }
321 break;
322 case OP_READ:
323 if (jstate==JSTATE_DISABLED) {
324 status = MFS_ERROR_NOTDONE;
325 } else {
326 status = hdd_read(rdargs->chunkid,rdargs->version,rdargs->blocknum,rdargs->buffer,rdargs->offset,rdargs->size,rdargs->crcbuff);
327 }
328 break;
329 case OP_WRITE:
330 if (jstate==JSTATE_DISABLED) {
331 status = MFS_ERROR_NOTDONE;
332 } else {
333 status = hdd_write(wrargs->chunkid,wrargs->version,wrargs->blocknum,wrargs->buffer,wrargs->offset,wrargs->size,wrargs->crcbuff);
334 }
335 break;
336 */
337 case OP_SERV_READ:
338 if (jstate==JSTATE_DISABLED) {
339 status = MFS_ERROR_NOTDONE;
340 } else {
341 status = mainserv_read(rwargs->sock,rwargs->packet,rwargs->length);
342 }
343 break;
344 case OP_SERV_WRITE:
345 if (jstate==JSTATE_DISABLED) {
346 status = MFS_ERROR_NOTDONE;
347 } else {
348 status = mainserv_write(rwargs->sock,rwargs->packet,rwargs->length);
349 }
350 break;
351 case OP_REPLICATE:
352 if (jstate==JSTATE_DISABLED) {
353 status = MFS_ERROR_NOTDONE;
354 } else {
355 status = replicate(rpargs->chunkid,rpargs->version,rpargs->xormasks,rpargs->srccnt,((uint8_t*)(jptr->args))+sizeof(chunk_rp_args));
356 }
357 break;
358 case OP_GETBLOCKS:
359 if (jstate==JSTATE_DISABLED) {
360 status = MFS_ERROR_NOTDONE;
361 } else {
362 status = hdd_get_blocks(ijargs->chunkid,ijargs->version,ijargs->pointer);
363 }
364 break;
365 case OP_GETCHECKSUM:
366 if (jstate==JSTATE_DISABLED) {
367 status = MFS_ERROR_NOTDONE;
368 } else {
369 status = hdd_get_checksum(ijargs->chunkid,ijargs->version,ijargs->pointer);
370 }
371 break;
372 case OP_GETCHECKSUMTAB:
373 if (jstate==JSTATE_DISABLED) {
374 status = MFS_ERROR_NOTDONE;
375 } else {
376 status = hdd_get_checksum_tab(ijargs->chunkid,ijargs->version,ijargs->pointer);
377 }
378 break;
379 case OP_CHUNKMOVE:
380 if (jstate==JSTATE_DISABLED) {
381 status = MFS_ERROR_NOTDONE;
382 } else {
383 status = hdd_move(mvargs->fsrc,mvargs->fdst);
384 }
385 break;
386 default: // OP_EXIT
387 // syslog(LOG_NOTICE,"worker %p exiting (jobqueue: %p)",(void*)pthread_self(),jp->jobqueue);
388 zassert(pthread_mutex_lock(&(jp->jobslock)));
389 job_close_worker(w);
390 zassert(pthread_mutex_unlock(&(jp->jobslock)));
391 return NULL;
392 }
393 job_send_status(jp,jobid,status);
394 zassert(pthread_mutex_lock(&(jp->jobslock)));
395 jp->workers_avail++;
396 if (jp->workers_avail > jp->workers_max_idle) {
397 job_close_worker(w);
398 zassert(pthread_mutex_unlock(&(jp->jobslock)));
399 return NULL;
400 }
401 zassert(pthread_mutex_unlock(&(jp->jobslock)));
402 }
403 }
404
/* What job_new does when the pool is saturated (queued jobs + busy workers > workers_max). */
enum {
	JOB_MODE_ALWAYS_DO = 0,       /* queue the job anyway */
	JOB_MODE_LIMITED_RETURN = 1,  /* drop the job (freeing its args) and return job id 0 */
	JOB_MODE_LIMITED_QUEUE = 2    /* do not run it; report errstatus through the usual callback path */
};
408
/* Register a new job in pool jp and hand it to the workers.
 *
 * jp        - pool to submit to
 * op        - OP_* code that job_worker will execute
 * args      - malloc'ed argument block for op (may be NULL). Ownership passes to the pool, which
 *             frees it together with the job.
 * callback  - called as callback(status,extra) from job_pool_check_jobs (may be NULL)
 * extra     - opaque value handed back to callback
 * errstatus - status reported instead of running the job when the pool is saturated in
 *             JOB_MODE_LIMITED_QUEUE mode
 * jobmode   - JOB_MODE_* saturation policy
 *
 * "Saturated" means queued jobs + busy workers > workers_max. The worker counters are sampled
 * under jobslock, but the queue length is read after the lock has been released.
 *
 * Returns the new job id, which is never 0. Returns 0 when the job was dropped in
 * JOB_MODE_LIMITED_RETURN mode; in that case args are freed and callback is never called.
 */
static inline uint32_t job_new(jobpool *jp,uint32_t op,void *args,void (*callback)(uint8_t status,void *extra),void *extra,uint8_t errstatus,uint8_t jobmode) {
// jobpool* jp = (jobpool*)jpool;
/*
	if (exiting) {
		if (callback) {
			callback(MFS_ERROR_NOTDONE,extra);
		}
		if (args) {
			free(args);
		}
		return 0;
	} else {
*/
	uint32_t jobid;
	uint32_t jhpos;
	uint32_t workers_busy;
	uint32_t limit;
	job **jhandle,*jptr;

	jptr = malloc(sizeof(job));
	passert(jptr);

	zassert(pthread_mutex_lock(&(jp->jobslock)));
	// allocate the next job id; 0 is skipped so it can mean "no job" to callers
	jobid = jp->nextjobid;
	jp->nextjobid++;
	if (jp->nextjobid==0) {
		jp->nextjobid=1;
	}
	// link the job into its hash bucket before it becomes visible to workers
	jhpos = JHASHPOS(jobid);
	jptr->jobid = jobid;
	jptr->callback = callback;
	jptr->extra = extra;
	jptr->args = args;
	jptr->jstate = JSTATE_ENABLED;
	jptr->next = jp->jobhash[jhpos];
	jp->jobhash[jhpos] = jptr;
	workers_busy = jp->workers_total-jp->workers_avail;
	limit = jp->workers_max;
	zassert(pthread_mutex_unlock(&(jp->jobslock)));
	if (queue_elements(jp->jobqueue)+workers_busy>limit && jobmode!=JOB_MODE_ALWAYS_DO) {
		if (jobmode==JOB_MODE_LIMITED_RETURN) {
			// remove this job from data structures
			zassert(pthread_mutex_lock(&(jp->jobslock)));
			jhandle = jp->jobhash+jhpos;
			while ((jptr = *jhandle)) {
				if (jptr->jobid==jobid) {
					*jhandle = jptr->next;
					if (jptr->args) {
						free(jptr->args);
					}
					free(jptr);
					break;
				} else {
					jhandle = &(jptr->next);
				}
			}
			zassert(pthread_mutex_unlock(&(jp->jobslock)));
			// end return jobid==0
			return 0;
		} else {
			// JOB_MODE_LIMITED_QUEUE: the job stays registered and its callback will receive
			// errstatus from job_pool_check_jobs, exactly as if a worker had produced it
			job_send_status(jp,jobid,errstatus);
		}
	} else {
		queue_put(jp->jobqueue,jobid,op,(uint8_t*)jptr,1);
	}
	return jobid;
//	}
}
477
478 /* interface */
479
job_pool_new(void)480 void* job_pool_new(void) {
481 int fd[2];
482 uint32_t i;
483 jobpool* jp;
484
485 if (pipe(fd)<0) {
486 return NULL;
487 }
488 jp=malloc(sizeof(jobpool));
489 passert(jp);
490 // syslog(LOG_WARNING,"new pool of workers (%p:%"PRIu8")",(void*)jp,workers);
491 jp->rpipe = fd[0];
492 jp->wpipe = fd[1];
493 jp->workers_avail = 0;
494 jp->workers_total = 0;
495 jp->workers_term_waiting = 0;
496 zassert(pthread_cond_init(&(jp->worker_term_cond),NULL));
497 zassert(pthread_mutex_init(&(jp->pipelock),NULL));
498 zassert(pthread_mutex_init(&(jp->jobslock),NULL));
499 jp->jobqueue = queue_new(0);
500 // syslog(LOG_WARNING,"new jobqueue: %p",jp->jobqueue);
501 jp->statusqueue = queue_new(0);
502 zassert(pthread_mutex_lock(&(jp->jobslock)));
503 for (i=0 ; i<JHASHSIZE ; i++) {
504 jp->jobhash[i]=NULL;
505 }
506 jp->nextjobid = 1;
507 job_spawn_worker(jp);
508 zassert(pthread_mutex_unlock(&(jp->jobslock)));
509 return jp;
510 }
511
job_pool_jobs_count(void)512 static uint32_t job_pool_jobs_count(void) {
513 jobpool* jp = globalpool;
514 uint32_t res;
515 zassert(pthread_mutex_lock(&(jp->jobslock)));
516 res = (jp->workers_total - jp->workers_avail) + queue_elements(jp->jobqueue);
517 zassert(pthread_mutex_unlock(&(jp->jobslock)));
518 return res;
519 }
520
521 /*
522 void job_pool_disable_and_change_callback_all(void (*callback)(uint8_t status,void *extra)) {
523 jobpool* jp = globalpool;
524 uint32_t jhpos;
525 job *jptr;
526
527 zassert(pthread_mutex_lock(&(jp->jobslock)));
528 for (jhpos = 0 ; jhpos<JHASHSIZE ; jhpos++) {
529 for (jptr = jp->jobhash[jhpos] ; jptr ; jptr=jptr->next) {
530 if (jptr->jstate==JSTATE_ENABLED) {
531 jptr->jstate=JSTATE_DISABLED;
532 }
533 jptr->callback=callback;
534 }
535 }
536 zassert(pthread_mutex_unlock(&(jp->jobslock)));
537 }
538 */
539
job_pool_disable_job(uint32_t jobid)540 void job_pool_disable_job(uint32_t jobid) {
541 jobpool* jp = globalpool;
542 uint32_t jhpos = JHASHPOS(jobid);
543 job *jptr;
544
545 zassert(pthread_mutex_lock(&(jp->jobslock)));
546 for (jptr = jp->jobhash[jhpos] ; jptr ; jptr=jptr->next) {
547 if (jptr->jobid==jobid) {
548 if (jptr->jstate==JSTATE_ENABLED) {
549 jptr->jstate=JSTATE_DISABLED;
550 }
551 }
552 }
553 zassert(pthread_mutex_unlock(&(jp->jobslock)));
554 }
555
job_pool_change_callback(uint32_t jobid,void (* callback)(uint8_t status,void * extra),void * extra)556 void job_pool_change_callback(uint32_t jobid,void (*callback)(uint8_t status,void *extra),void *extra) {
557 jobpool* jp = globalpool;
558 uint32_t jhpos = JHASHPOS(jobid);
559 job *jptr;
560
561 zassert(pthread_mutex_lock(&(jp->jobslock)));
562 for (jptr = jp->jobhash[jhpos] ; jptr ; jptr=jptr->next) {
563 if (jptr->jobid==jobid) {
564 jptr->callback=callback;
565 jptr->extra=extra;
566 }
567 }
568 zassert(pthread_mutex_unlock(&(jp->jobslock)));
569 }
570
/* Drain every pending result from the global pool's status queue.
 *
 * For each (jobid,status) pair, the matching job is looked up in the job hash. Its callback is
 * invoked with (status,extra) only when cb is non-zero and a callback is set; the job is then
 * unlinked and freed together with its args. Results whose job id is not in the hash are
 * silently discarded.
 *
 * Must only be called when at least one result is pending, because job_receive_status pops
 * unconditionally. job_serve calls it when the wake-up pipe is readable.
 *
 * NOTE(review): jobslock is held while callbacks run. jobslock is a default (non-recursive)
 * mutex, so a callback that re-enters this module (any job_* submitter via job_new,
 * job_pool_disable_job, job_pool_change_callback) would relock it - confirm no callback does.
 */
void job_pool_check_jobs(uint8_t cb) {
	jobpool* jp = globalpool;
	uint32_t jobid,jhpos;
	uint8_t status;
	int notlast;
	job **jhandle,*jptr;

	zassert(pthread_mutex_lock(&(jp->jobslock)));
	do {
		notlast = job_receive_status(jp,&jobid,&status);
		jhpos = JHASHPOS(jobid);
		jhandle = jp->jobhash+jhpos;
		while ((jptr = *jhandle)) {
			if (jptr->jobid==jobid) {
				// the callback runs while the job is still linked into the hash
				if (jptr->callback && cb) {
					jptr->callback(status,jptr->extra);
				}
				*jhandle = jptr->next;
				if (jptr->args) {
					free(jptr->args);
				}
				free(jptr);
				break;
			} else {
				jhandle = &(jptr->next);
			}
		}
	} while (notlast);
	zassert(pthread_mutex_unlock(&(jp->jobslock)));
}
601
/* Shut down pool jp and release the resources it owns.
 *
 * Steps:
 *  1. Close the job queue.
 *  2. Wait on worker_term_cond until every worker thread has exited.
 *  3. Discard any results still in jp's own status queue. Their jobs and args are freed, but
 *     callbacks are NOT invoked.
 *  4. Destroy both queues, the condition variable, the mutexes and the wake-up pipe, then free jp.
 */
void job_pool_delete(jobpool* jp) {
	uint32_t jobid;
	uint8_t status;
	int notlast;
	job **jhandle,*jptr;

	queue_close(jp->jobqueue);
	zassert(pthread_mutex_lock(&(jp->jobslock)));
	if (jp->workers_total>0) {
		// register as a waiter once: job_close_worker signals and decrements the counter when the
		// last worker leaves, so a spurious wakeup simply re-checks workers_total
		jp->workers_term_waiting++;
		while (jp->workers_total>0) {
			zassert(pthread_cond_wait(&(jp->worker_term_cond),&(jp->jobslock)));
		}
	}
	zassert(pthread_mutex_unlock(&(jp->jobslock)));
	if (!queue_isempty(jp->statusqueue)) {
		syslog(LOG_WARNING,"not empty job queue !!!");
		// drain jp's own results; job_pool_check_jobs would always work on globalpool instead
		zassert(pthread_mutex_lock(&(jp->jobslock)));
		do {
			notlast = job_receive_status(jp,&jobid,&status);
			jhandle = jp->jobhash+JHASHPOS(jobid);
			while ((jptr = *jhandle)!=NULL) {
				if (jptr->jobid==jobid) {
					*jhandle = jptr->next;
					free(jptr->args);
					free(jptr);
					break;
				}
				jhandle = &(jptr->next);
			}
		} while (notlast);
		zassert(pthread_mutex_unlock(&(jp->jobslock)));
	}
	queue_delete(jp->jobqueue);
	queue_delete(jp->statusqueue);
	zassert(pthread_cond_destroy(&(jp->worker_term_cond)));
	zassert(pthread_mutex_destroy(&(jp->pipelock)));
	zassert(pthread_mutex_destroy(&(jp->jobslock)));
	close(jp->rpipe);
	close(jp->wpipe);
	free(jp);
}
624
job_inval(void (* callback)(uint8_t status,void * extra),void * extra)625 uint32_t job_inval(void (*callback)(uint8_t status,void *extra),void *extra) {
626 jobpool* jp = globalpool;
627 return job_new(jp,OP_INVAL,NULL,callback,extra,MFS_ERROR_EINVAL,JOB_MODE_LIMITED_QUEUE);
628 }
629
630 /*
631 uint32_t job_mainserv(int sock) {
632 jobpool* jp = globalpool;
633 int *args;
634 args = malloc(sizeof(int));
635 passert(args);
636 *args = sock;
637 return job_new(jp,OP_MAINSERV,args,NULL,NULL);
638 }
639 */
640
job_chunkop(void (* callback)(uint8_t status,void * extra),void * extra,uint64_t chunkid,uint32_t version,uint32_t newversion,uint64_t copychunkid,uint32_t copyversion,uint32_t length)641 uint32_t job_chunkop(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint32_t newversion,uint64_t copychunkid,uint32_t copyversion,uint32_t length) {
642 jobpool* jp = globalpool;
643 chunk_op_args *args;
644 args = malloc(sizeof(chunk_op_args));
645 passert(args);
646 args->chunkid = chunkid;
647 args->version = version;
648 args->newversion = newversion;
649 args->copychunkid = copychunkid;
650 args->copyversion = copyversion;
651 args->length = length;
652 return job_new(jp,OP_CHUNKOP,args,callback,extra,MFS_ERROR_NOTDONE,JOB_MODE_LIMITED_QUEUE);
653 }
654 /*
655 uint32_t job_open(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version) {
656 jobpool* jp = globalpool;
657 chunk_oc_args *args;
658 args = malloc(sizeof(chunk_oc_args));
659 passert(args);
660 args->chunkid = chunkid;
661 args->version = version;
662 return job_new(jp,OP_OPEN,args,callback,extra);
663 }
664
665 uint32_t job_close(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid) {
666 jobpool* jp = globalpool;
667 chunk_oc_args *args;
668 args = malloc(sizeof(chunk_oc_args));
669 passert(args);
670 args->chunkid = chunkid;
671 args->version = 0;
672 return job_new(jp,OP_CLOSE,args,callback,extra);
673 }
674
675 uint32_t job_read(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint16_t blocknum,uint8_t *buffer,uint32_t offset,uint32_t size,uint8_t *crcbuff) {
676 jobpool* jp = globalpool;
677 chunk_rd_args *args;
678 args = malloc(sizeof(chunk_rd_args));
679 passert(args);
680 args->chunkid = chunkid;
681 args->version = version;
682 args->blocknum = blocknum;
683 args->buffer = buffer;
684 args->offset = offset;
685 args->size = size;
686 args->crcbuff = crcbuff;
687 return job_new(jp,OP_READ,args,callback,extra);
688 }
689
690 uint32_t job_write(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint16_t blocknum,const uint8_t *buffer,uint32_t offset,uint32_t size,const uint8_t *crcbuff) {
691 jobpool* jp = globalpool;
692 chunk_wr_args *args;
693 args = malloc(sizeof(chunk_wr_args));
694 passert(args);
695 args->chunkid = chunkid;
696 args->version = version;
697 args->blocknum = blocknum;
698 args->buffer = buffer;
699 args->offset = offset;
700 args->size = size;
701 args->crcbuff = crcbuff;
702 return job_new(jp,OP_WRITE,args,callback,extra);
703 }
704 */
705
job_serv_read(void (* callback)(uint8_t status,void * extra),void * extra,int sock,const uint8_t * packet,uint32_t length)706 uint32_t job_serv_read(void (*callback)(uint8_t status,void *extra),void *extra,int sock,const uint8_t *packet,uint32_t length) {
707 jobpool* jp = globalpool;
708 chunk_rw_args *args;
709 args = malloc(sizeof(chunk_rw_args));
710 passert(args);
711 args->sock = sock;
712 args->packet = packet;
713 args->length = length;
714 return job_new(jp,OP_SERV_READ,args,callback,extra,0,JOB_MODE_LIMITED_RETURN);
715 }
716
job_serv_write(void (* callback)(uint8_t status,void * extra),void * extra,int sock,const uint8_t * packet,uint32_t length)717 uint32_t job_serv_write(void (*callback)(uint8_t status,void *extra),void *extra,int sock,const uint8_t *packet,uint32_t length) {
718 jobpool* jp = globalpool;
719 chunk_rw_args *args;
720 args = malloc(sizeof(chunk_rw_args));
721 passert(args);
722 args->sock = sock;
723 args->packet = packet;
724 args->length = length;
725 return job_new(jp,OP_SERV_WRITE,args,callback,extra,0,JOB_MODE_LIMITED_RETURN);
726 }
727
job_replicate_raid(void (* callback)(uint8_t status,void * extra),void * extra,uint64_t chunkid,uint32_t version,uint8_t srccnt,const uint32_t xormasks[4],const uint8_t * srcs)728 uint32_t job_replicate_raid(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint8_t srccnt,const uint32_t xormasks[4],const uint8_t *srcs) {
729 jobpool* jp = globalpool;
730 chunk_rp_args *args;
731 uint8_t *ptr;
732 ptr = malloc(sizeof(chunk_rp_args)+srccnt*18);
733 passert(ptr);
734 args = (chunk_rp_args*)ptr;
735 ptr += sizeof(chunk_rp_args);
736 args->chunkid = chunkid;
737 args->version = version;
738 args->srccnt = srccnt;
739 args->xormasks[0] = xormasks[0];
740 args->xormasks[1] = xormasks[1];
741 args->xormasks[2] = xormasks[2];
742 args->xormasks[3] = xormasks[3];
743 memcpy(ptr,srcs,srccnt*18);
744 return job_new(jp,OP_REPLICATE,args,callback,extra,MFS_ERROR_NOTDONE,JOB_MODE_LIMITED_QUEUE);
745 }
746
job_replicate_simple(void (* callback)(uint8_t status,void * extra),void * extra,uint64_t chunkid,uint32_t version,uint32_t ip,uint16_t port)747 uint32_t job_replicate_simple(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint32_t ip,uint16_t port) {
748 jobpool* jp = globalpool;
749 chunk_rp_args *args;
750 uint8_t *ptr;
751 ptr = malloc(sizeof(chunk_rp_args)+18);
752 passert(ptr);
753 args = (chunk_rp_args*)ptr;
754 ptr += sizeof(chunk_rp_args);
755 args->chunkid = chunkid;
756 args->version = version;
757 args->srccnt = 1;
758 args->xormasks[0] = UINT32_C(0x88888888);
759 args->xormasks[1] = UINT32_C(0x44444444);
760 args->xormasks[2] = UINT32_C(0x22222222);
761 args->xormasks[3] = UINT32_C(0x11111111);
762 put64bit(&ptr,chunkid);
763 put32bit(&ptr,version);
764 put32bit(&ptr,ip);
765 put16bit(&ptr,port);
766 return job_new(jp,OP_REPLICATE,args,callback,extra,MFS_ERROR_NOTDONE,JOB_MODE_LIMITED_QUEUE);
767 }
768
job_get_chunk_blocks(void (* callback)(uint8_t status,void * extra),void * extra,uint64_t chunkid,uint32_t version,uint8_t * blocks)769 uint32_t job_get_chunk_blocks(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint8_t *blocks) {
770 jobpool* jp = globalpool;
771 chunk_ij_args *args;
772 args = malloc(sizeof(chunk_ij_args));
773 passert(args);
774 args->chunkid = chunkid;
775 args->version = version;
776 args->pointer = blocks;
777 return job_new(jp,OP_GETBLOCKS,args,callback,extra,MFS_ERROR_NOTDONE,JOB_MODE_LIMITED_QUEUE);
778 }
779
job_get_chunk_checksum(void (* callback)(uint8_t status,void * extra),void * extra,uint64_t chunkid,uint32_t version,uint8_t * checksum)780 uint32_t job_get_chunk_checksum(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint8_t *checksum) {
781 jobpool* jp = globalpool;
782 chunk_ij_args *args;
783 args = malloc(sizeof(chunk_ij_args));
784 passert(args);
785 args->chunkid = chunkid;
786 args->version = version;
787 args->pointer = checksum;
788 return job_new(jp,OP_GETCHECKSUM,args,callback,extra,MFS_ERROR_NOTDONE,JOB_MODE_LIMITED_QUEUE);
789 }
790
job_get_chunk_checksum_tab(void (* callback)(uint8_t status,void * extra),void * extra,uint64_t chunkid,uint32_t version,uint8_t * checksum_tab)791 uint32_t job_get_chunk_checksum_tab(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint8_t *checksum_tab) {
792 jobpool* jp = globalpool;
793 chunk_ij_args *args;
794 args = malloc(sizeof(chunk_ij_args));
795 passert(args);
796 args->chunkid = chunkid;
797 args->version = version;
798 args->pointer = checksum_tab;
799 return job_new(jp,OP_GETCHECKSUMTAB,args,callback,extra,MFS_ERROR_NOTDONE,JOB_MODE_LIMITED_QUEUE);
800 }
801
job_chunk_move(void (* callback)(uint8_t status,void * extra),void * extra,void * fsrc,void * fdst)802 uint32_t job_chunk_move(void (*callback)(uint8_t status,void *extra),void *extra,void *fsrc,void *fdst) {
803 jobpool* jp = globalpool;
804 chunk_mv_args *args;
805 args = malloc(sizeof(chunk_mv_args));
806 passert(args);
807 args->fsrc = fsrc;
808 args->fdst = fdst;
809 return job_new(jp,OP_CHUNKMOVE,args,callback,extra,MFS_ERROR_NOTDONE,JOB_MODE_LIMITED_QUEUE);
810 }
811
job_desc(struct pollfd * pdesc,uint32_t * ndesc)812 void job_desc(struct pollfd *pdesc,uint32_t *ndesc) {
813 uint32_t pos = *ndesc;
814 jobpool* jp = globalpool;
815
816 pdesc[pos].fd = jp->rpipe;
817 pdesc[pos].events = POLLIN;
818 jp->fdpdescpos = pos;
819 pos++;
820
821 *ndesc = pos;
822 }
823
job_serve(struct pollfd * pdesc)824 void job_serve(struct pollfd *pdesc) {
825 jobpool* jp = globalpool;
826 uint32_t jobscnt;
827
828 if (jp->fdpdescpos>=0 && (pdesc[jp->fdpdescpos].revents & POLLIN)) {
829 job_pool_check_jobs(1);
830 }
831
832 jobscnt = job_pool_jobs_count();
833 if (jobscnt>=stats_maxjobscnt) {
834 stats_maxjobscnt=jobscnt;
835 }
836 }
837
838 // can be only HLSTATUS_OK or HLSTATUS_OVERLOADED
839 static uint8_t current_hlstatus = HLSTATUS_OK;
840
job_get_load_and_hlstatus(uint32_t * load,uint8_t * hlstatus)841 void job_get_load_and_hlstatus(uint32_t *load,uint8_t *hlstatus) {
842 *load = job_pool_jobs_count();
843 *hlstatus = current_hlstatus;
844 }
845
job_heavyload_test(void)846 void job_heavyload_test(void) {
847 jobpool* jp = globalpool;
848 uint8_t hlstatus;
849
850 zassert(pthread_mutex_lock(&(jp->jobslock)));
851 hlstatus = HLSTATUS_DEFAULT;
852 if (jp->workers_total - jp->workers_avail > jp->workers_himark) {
853 hlstatus = HLSTATUS_OVERLOADED;
854 }
855 if (jp->workers_total - jp->workers_avail < jp->workers_lomark) {
856 hlstatus = HLSTATUS_OK;
857 }
858 zassert(pthread_mutex_unlock(&(jp->jobslock)));
859
860 if (hlstatus!=HLSTATUS_DEFAULT && hlstatus!=current_hlstatus) {
861 current_hlstatus = hlstatus;
862 masterconn_reportload();
863 }
864 }
865
866 //void job_wantexit(void) {
867 // exiting = 1;
868 //}
869
/* Exit hook: the process may exit only once no job is running or queued. */
int job_canexit(void) {
	if (job_pool_jobs_count()==0) {
		return 1;
	}
	return 0;
}
873
job_term(void)874 void job_term(void) {
875 job_pool_delete(globalpool);
876 }
877
job_reload(void)878 void job_reload(void) {
879 jobpool* jp = globalpool;
880
881 zassert(pthread_mutex_lock(&(jp->jobslock)));
882
883 jp->workers_max = cfg_getuint32("WORKERS_MAX",250);
884 jp->workers_himark = (jp->workers_max * 3) / 4;
885 jp->workers_lomark = (jp->workers_max * 2) / 4;
886 jp->workers_max_idle = cfg_getuint32("WORKERS_MAX_IDLE",40);
887
888 zassert(pthread_mutex_unlock(&(jp->jobslock)));
889 }
890
job_init(void)891 int job_init(void) {
892 // globalpool = (jobpool*)malloc(sizeof(jobpool));
893 // exiting = 0;
894 globalpool = job_pool_new();
895
896 if (globalpool==NULL) {
897 return -1;
898 }
899 job_reload();
900
901 main_destruct_register(job_term);
902 // main_wantexit_register(job_wantexit);
903 main_canexit_register(job_canexit);
904 main_reload_register(job_reload);
905 main_eachloop_register(job_heavyload_test);
906 main_poll_register(job_desc,job_serve);
907 return 0;
908 }
909