1 /*
2  * Copyright (C) 2016 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3  *
4  * This file is part of MooseFS.
5  *
6  * MooseFS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, version 2 (only).
9  *
10  * MooseFS is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with MooseFS; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18  * or visit http://www.gnu.org/licenses/gpl-2.0.html
19  */
20 
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24 
25 #include <stdlib.h>
26 #include <string.h>
27 #include <unistd.h>
28 #include <syslog.h>
29 #include <inttypes.h>
30 //#include <fcntl.h>
31 //#include <sys/ioctl.h>
32 #include <limits.h>
33 #include <signal.h>
34 #include <pthread.h>
35 #include <errno.h>
36 
37 #include "main.h"
38 #include "cfg.h"
39 #include "pcqueue.h"
40 #include "datapack.h"
41 #include "massert.h"
42 
43 #include "mainserv.h"
44 #include "hddspacemgr.h"
45 #include "replicator.h"
46 
/* Number of buckets in the per-pool job hash table; must stay a power of two. */
#define JHASHSIZE 0x400
/* Bucket index for a job id. The mask is derived from JHASHSIZE so the two can never drift apart. */
#define JHASHPOS(id) ((id)&(JHASHSIZE-1))
49 
/* Job states, stored in job.jstate. */
enum {
	JSTATE_DISABLED = 0,	// job_worker reports ERROR_NOTDONE instead of executing the job
	JSTATE_ENABLED = 1,	// initial state assigned by job_new
	JSTATE_INPROGRESS = 2	// set by job_worker when it picks up an enabled job
};
55 
/*
 * Operation codes passed through the job queue and dispatched in job_worker.
 * Retired codes (OP_MAINSERV, OP_OPEN, OP_CLOSE, OP_READ, OP_WRITE) are no
 * longer part of the enumeration.
 */
enum {
	OP_EXIT = 0,
	OP_INVAL = 1,
	OP_CHUNKOP = 2,
	OP_SERV_READ = 3,
	OP_SERV_WRITE = 4,
	OP_REPLICATE = 5,
	OP_GETBLOCKS = 6,
	OP_GETCHECKSUM = 7,
	OP_GETCHECKSUMTAB = 8
};
72 
/* Arguments of an OP_CHUNKOP job; forwarded field by field to hdd_chunkop. */
typedef struct _chunk_op_args {
	uint64_t chunkid;
	uint64_t copychunkid;
	uint32_t version;
	uint32_t newversion;
	uint32_t copyversion;
	uint32_t length;
} chunk_op_args;
79 /*
80 // for OP_OPEN and OP_CLOSE
81 typedef struct _chunk_oc_args {
82 	uint64_t chunkid;
83 	uint32_t version;
84 } chunk_oc_args;
85 
86 // for OP_READ
87 typedef struct _chunk_rd_args {
88 	uint64_t chunkid;
89 	uint32_t version;
90 	uint32_t offset,size;
91 	uint16_t blocknum;
92 	uint8_t *buffer;
93 	uint8_t *crcbuff;
94 } chunk_rd_args;
95 
96 // for OP_WRITE
97 typedef struct _chunk_wr_args {
98 	uint64_t chunkid;
99 	uint32_t version;
100 	uint32_t offset,size;
101 	uint16_t blocknum;
102 	const uint8_t *buffer;
103 	const uint8_t *crcbuff;
104 } chunk_wr_args;
105 */
106 
/* Arguments of OP_SERV_READ and OP_SERV_WRITE jobs; passed to mainserv_read / mainserv_write. */
typedef struct _chunk_rw_args {
	int sock;		// socket descriptor handed to the mainserv routine
	const uint8_t *packet;	// request packet (not copied by the job layer)
	uint32_t length;	// value passed as the packet length
} chunk_rw_args;
113 
/*
 * Fixed header of an OP_REPLICATE job. The source descriptors (18 bytes each,
 * srccnt of them) are stored directly after this structure in the same
 * allocation, which is why job_worker passes args+sizeof(chunk_rp_args).
 */
typedef struct _chunk_rp_args {
	uint64_t chunkid;
	uint32_t version;
	uint8_t srccnt;
} chunk_rp_args;
120 
/*
 * Arguments shared by OP_GETBLOCKS, OP_GETCHECKSUM and OP_GETCHECKSUMTAB jobs.
 * `pointer` is the caller-supplied output buffer handed to the hdd_get_* call.
 */
typedef struct _chunk_ij_args {
	uint64_t chunkid;
	uint32_t version;
	void *pointer;
} chunk_ij_args;
127 
/* One scheduled job; linked into a jobpool.jobhash bucket until its status is collected. */
typedef struct _job {
	uint32_t jobid;					// identifier returned to the caller by job_new
	void (*callback)(uint8_t status,void *extra);	// completion handler (may be NULL)
	void *extra;					// opaque pointer handed back to callback
	void *args;					// heap-allocated chunk_*_args block, freed with the job
	uint8_t jstate;					// one of JSTATE_*
	struct _job *next;				// next job in the same hash bucket
} job;
136 
137 typedef struct _jobpool {
138 	int rpipe,wpipe;
139 	int32_t fdpdescpos;
140 	uint32_t workers_max;
141 	uint32_t workers_max_idle;
142 	uint32_t workers_avail;
143 	uint32_t workers_total;
144 	uint32_t workers_term_waiting;
145 	pthread_cond_t worker_term_cond;
146 	pthread_mutex_t pipelock;
147 	pthread_mutex_t jobslock;
148 	void *jobqueue;
149 	void *statusqueue;
150 	job* jobhash[JHASHSIZE];
151 	uint32_t nextjobid;
152 } jobpool;
153 
154 typedef struct _worker {
155 	pthread_t thread_id;
156 	jobpool *jp;
157 } worker;
158 
159 static jobpool* globalpool = NULL;
160 
161 static uint32_t stats_maxjobscnt = 0;
162 static uint32_t last_maxjobscnt = 0;
163 
164 static uint8_t exiting;
165 
job_stats(uint32_t * maxjobscnt)166 void job_stats(uint32_t *maxjobscnt) {
167 	*maxjobscnt = last_maxjobscnt = stats_maxjobscnt;
168 	stats_maxjobscnt = 0;
169 }
170 
job_getload(void)171 uint32_t job_getload(void) {
172 	return last_maxjobscnt;
173 }
174 
/*
 * Worker side: publish the result of a finished job.
 * When the status queue is empty, one byte is written to the pipe first so
 * that the descriptor registered by job_desc becomes readable; further
 * statuses queued before the reader drains the queue need no extra byte.
 */
static inline void job_send_status(jobpool *jp,uint32_t jobid,uint8_t status) {
	zassert(pthread_mutex_lock(&(jp->pipelock)));
	if (queue_isempty(jp->statusqueue)) {
		// first pending status - the byte value itself is irrelevant
		eassert(write(jp->wpipe,&status,1)==1);
	}
	queue_put(jp->statusqueue,jobid,status,NULL,1);
	zassert(pthread_mutex_unlock(&(jp->pipelock)));
}
184 
/*
 * Reader side: take one (jobid,status) pair from the status queue.
 * When that was the last queued pair, the wake-up byte is consumed from the
 * pipe as well. Returns 1 when more statuses are pending, 0 otherwise.
 */
static inline int job_receive_status(jobpool *jp,uint32_t *jobid,uint8_t *status) {
	uint32_t qstatus;
	int notlast = 1;

	zassert(pthread_mutex_lock(&(jp->pipelock)));
	queue_get(jp->statusqueue,jobid,&qstatus,NULL,NULL);
	*status = qstatus;
	if (queue_isempty(jp->statusqueue)) {
		eassert(read(jp->rpipe,&qstatus,1)==1);	// drain the wake-up byte
		notlast = 0;
	}
	zassert(pthread_mutex_unlock(&(jp->pipelock)));
	return notlast;
}
198 
/* Thread entry point; declared here because job_spawn_worker refers to it before its definition. */
void* job_worker(void *arg);

/* Worker total most recently reported to syslog; used to avoid repeating the same notice. */
static uint32_t lastnotify = 0;
202 
/*
 * Start one more worker thread for the pool and account for it.
 * Callers in this file hold jp->jobslock around the call.
 * If the thread cannot be created the pool is left unchanged; the worker
 * record is released so the failed attempt does not leak memory.
 * Every time the total reaches a new multiple of 10 a notice is logged.
 */
static inline void job_spawn_worker(jobpool *jp) {
	worker *w;

	w = malloc(sizeof(worker));
	passert(w);
	w->jp = jp;
	if (main_minthread_create(&(w->thread_id),0,job_worker,w)<0) {
		free(w);	// no thread owns the record, so nobody else would free it
		return;
	}
	jp->workers_avail++;
	jp->workers_total++;
	if (jp->workers_total%10==0 && lastnotify!=jp->workers_total) {
		syslog(LOG_NOTICE,"workers: %"PRIu32"+",jp->workers_total);
		lastnotify = jp->workers_total;
	}
}
220 
/*
 * Retire the calling worker: update the pool counters, wake a thread waiting
 * for the pool to drain when this was the last worker, detach the thread and
 * free its record. Callers in this file hold jp->jobslock around the call.
 * A notice is logged whenever the total drops to a new multiple of 10.
 */
static inline void job_close_worker(worker *w) {
	jobpool *jp = w->jp;
	uint32_t total;

	jp->workers_avail--;
	jp->workers_total--;
	total = jp->workers_total;
	if (total==0 && jp->workers_term_waiting) {
		zassert(pthread_cond_signal(&(jp->worker_term_cond)));
		jp->workers_term_waiting--;
	}
	pthread_detach(w->thread_id);
	free(w);
	if (total%10==0 && lastnotify!=total) {
		syslog(LOG_NOTICE,"workers: %"PRIu32"-",total);
		lastnotify = total;
	}
}
237 
/*
 * Typed views of the current job's argument block. They expand to an
 * expression using a local variable named `jptr` (a job*), so they are only
 * usable where such a variable is in scope.
 */
#define JOBARGS_AS(type) ((type*)(jptr->args))
#define opargs JOBARGS_AS(chunk_op_args)
#define rwargs JOBARGS_AS(chunk_rw_args)
#define rpargs JOBARGS_AS(chunk_rp_args)
#define ijargs JOBARGS_AS(chunk_ij_args)
job_worker(void * arg)245 void* job_worker(void *arg) {
246 	worker *w = (worker*)arg;
247 	jobpool *jp = w->jp;
248 	job *jptr;
249 	uint8_t *jptrarg;
250 	uint8_t status,jstate;
251 	uint32_t jobid;
252 	uint32_t op;
253 
254 //	syslog(LOG_NOTICE,"worker %p started (jobqueue: %p ; jptr:%p ; jptrarg:%p ; status:%p )",(void*)pthread_self(),jp->jobqueue,(void*)&jptr,(void*)&jptrarg,(void*)&status);
255 	for (;;) {
256 		queue_get(jp->jobqueue,&jobid,&op,&jptrarg,NULL);
257 //		syslog(LOG_NOTICE,"job worker got job: %"PRIu32",%"PRIu32,jobid,op);
258 		jptr = (job*)jptrarg;
259 		zassert(pthread_mutex_lock(&(jp->jobslock)));
260 		if (jobid==0 && op==0 && jptrarg==NULL) { // queue has been closed
261 			job_close_worker(w);
262 			zassert(pthread_mutex_unlock(&(jp->jobslock)));
263 			return NULL;
264 		}
265 		jp->workers_avail--;
266 		if (jp->workers_avail==0 && jp->workers_total<jp->workers_max) {
267 			job_spawn_worker(jp);
268 		}
269 		if (jptr!=NULL) {
270 			jstate=jptr->jstate;
271 			if (jptr->jstate==JSTATE_ENABLED) {
272 				jptr->jstate=JSTATE_INPROGRESS;
273 			}
274 		} else {
275 			jstate=JSTATE_DISABLED;
276 		}
277 		zassert(pthread_mutex_unlock(&(jp->jobslock)));
278 		switch (op) {
279 			case OP_INVAL:
280 				status = ERROR_EINVAL;
281 				break;
282 /*
283 			case OP_MAINSERV:
284 				if (jstate==JSTATE_DISABLED) {
285 					status = ERROR_NOTDONE;
286 				} else {
287 					mainserv_serve(*((int*)(jptr->args)));
288 					status = STATUS_OK;
289 				}
290 				break;
291 */
292 			case OP_CHUNKOP:
293 				if (jstate==JSTATE_DISABLED) {
294 					status = ERROR_NOTDONE;
295 				} else {
296 					status = hdd_chunkop(opargs->chunkid,opargs->version,opargs->newversion,opargs->copychunkid,opargs->copyversion,opargs->length);
297 				}
298 				break;
299 /*
300 			case OP_OPEN:
301 				if (jstate==JSTATE_DISABLED) {
302 					status = ERROR_NOTDONE;
303 				} else {
304 					status = hdd_open(ocargs->chunkid,ocargs->version);
305 				}
306 				break;
307 			case OP_CLOSE:
308 				if (jstate==JSTATE_DISABLED) {
309 					status = ERROR_NOTDONE;
310 				} else {
311 					status = hdd_close(ocargs->chunkid);
312 				}
313 				break;
314 			case OP_READ:
315 				if (jstate==JSTATE_DISABLED) {
316 					status = ERROR_NOTDONE;
317 				} else {
318 					status = hdd_read(rdargs->chunkid,rdargs->version,rdargs->blocknum,rdargs->buffer,rdargs->offset,rdargs->size,rdargs->crcbuff);
319 				}
320 				break;
321 			case OP_WRITE:
322 				if (jstate==JSTATE_DISABLED) {
323 					status = ERROR_NOTDONE;
324 				} else {
325 					status = hdd_write(wrargs->chunkid,wrargs->version,wrargs->blocknum,wrargs->buffer,wrargs->offset,wrargs->size,wrargs->crcbuff);
326 				}
327 				break;
328 */
329 			case OP_SERV_READ:
330 				if (jstate==JSTATE_DISABLED) {
331 					status = ERROR_NOTDONE;
332 				} else {
333 					status = mainserv_read(rwargs->sock,rwargs->packet,rwargs->length);
334 				}
335 				break;
336 			case OP_SERV_WRITE:
337 				if (jstate==JSTATE_DISABLED) {
338 					status = ERROR_NOTDONE;
339 				} else {
340 					status = mainserv_write(rwargs->sock,rwargs->packet,rwargs->length);
341 				}
342 				break;
343 			case OP_REPLICATE:
344 				if (jstate==JSTATE_DISABLED) {
345 					status = ERROR_NOTDONE;
346 				} else {
347 					status = replicate(rpargs->chunkid,rpargs->version,rpargs->srccnt,((uint8_t*)(jptr->args))+sizeof(chunk_rp_args));
348 				}
349 				break;
350 			case OP_GETBLOCKS:
351 				if (jstate==JSTATE_DISABLED) {
352 					status = ERROR_NOTDONE;
353 				} else {
354 					status = hdd_get_blocks(ijargs->chunkid,ijargs->version,ijargs->pointer);
355 				}
356 				break;
357 			case OP_GETCHECKSUM:
358 				if (jstate==JSTATE_DISABLED) {
359 					status = ERROR_NOTDONE;
360 				} else {
361 					status = hdd_get_checksum(ijargs->chunkid,ijargs->version,ijargs->pointer);
362 				}
363 				break;
364 			case OP_GETCHECKSUMTAB:
365 				if (jstate==JSTATE_DISABLED) {
366 					status = ERROR_NOTDONE;
367 				} else {
368 					status = hdd_get_checksum_tab(ijargs->chunkid,ijargs->version,ijargs->pointer);
369 				}
370 				break;
371 			default: // OP_EXIT
372 //				syslog(LOG_NOTICE,"worker %p exiting (jobqueue: %p)",(void*)pthread_self(),jp->jobqueue);
373 				zassert(pthread_mutex_lock(&(jp->jobslock)));
374 				job_close_worker(w);
375 				zassert(pthread_mutex_unlock(&(jp->jobslock)));
376 				return NULL;
377 		}
378 		job_send_status(jp,jobid,status);
379 		zassert(pthread_mutex_lock(&(jp->jobslock)));
380 		jp->workers_avail++;
381 		if (jp->workers_avail > jp->workers_max_idle) {
382 			job_close_worker(w);
383 			zassert(pthread_mutex_unlock(&(jp->jobslock)));
384 			return NULL;
385 		}
386 		zassert(pthread_mutex_unlock(&(jp->jobslock)));
387 	}
388 }
389 
/*
 * Register a new job and put it on the pool's job queue.
 *
 * Takes ownership of args (freed when the job's status is collected).
 * Returns the id assigned to the job; ids are never 0.
 *
 * After job_wantexit nothing is queued: callback (when given) is invoked
 * immediately with ERROR_NOTDONE, args is released and 0 is returned.
 */
static inline uint32_t job_new(jobpool *jp,uint32_t op,void *args,void (*callback)(uint8_t status,void *extra),void *extra) {
	uint32_t jobid;
	job **bucket;
	job *jptr;

	if (exiting) {
		if (callback) {
			callback(ERROR_NOTDONE,extra);
		}
		free(args);	// free(NULL) is a no-op
		return 0;
	}

	jobid = jp->nextjobid;
	bucket = jp->jobhash + JHASHPOS(jobid);

	jptr = malloc(sizeof(job));
	passert(jptr);
	jptr->jobid = jobid;
	jptr->callback = callback;
	jptr->extra = extra;
	jptr->args = args;
	jptr->jstate = JSTATE_ENABLED;
	// link at the head of the bucket before a worker can see the job
	jptr->next = *bucket;
	*bucket = jptr;
	queue_put(jp->jobqueue,jobid,op,(uint8_t*)jptr,1);

	// advance the id counter, skipping 0 on wrap-around
	jp->nextjobid++;
	if (jp->nextjobid==0) {
		jp->nextjobid=1;
	}
	return jobid;
}
421 
422 /* interface */
423 
job_pool_new(uint32_t jobs)424 void* job_pool_new(uint32_t jobs) {
425 	int fd[2];
426 	uint32_t i;
427 	jobpool* jp;
428 
429 	if (pipe(fd)<0) {
430 		return NULL;
431 	}
432        	jp=malloc(sizeof(jobpool));
433 	passert(jp);
434 //	syslog(LOG_WARNING,"new pool of workers (%p:%"PRIu8")",(void*)jp,workers);
435 	jp->rpipe = fd[0];
436 	jp->wpipe = fd[1];
437 	jp->workers_avail = 0;
438 	jp->workers_total = 0;
439 	jp->workers_term_waiting = 0;
440 	zassert(pthread_cond_init(&(jp->worker_term_cond),NULL));
441 	zassert(pthread_mutex_init(&(jp->pipelock),NULL));
442 	zassert(pthread_mutex_init(&(jp->jobslock),NULL));
443 	jp->jobqueue = queue_new(jobs);
444 //	syslog(LOG_WARNING,"new jobqueue: %p",jp->jobqueue);
445 	jp->statusqueue = queue_new(0);
446 	for (i=0 ; i<JHASHSIZE ; i++) {
447 		jp->jobhash[i]=NULL;
448 	}
449 	jp->nextjobid = 1;
450 	zassert(pthread_mutex_lock(&(jp->jobslock)));
451 	job_spawn_worker(jp);
452 	zassert(pthread_mutex_unlock(&(jp->jobslock)));
453 	return jp;
454 }
455 
job_pool_jobs_count(void)456 uint32_t job_pool_jobs_count(void) {
457 	jobpool* jp = globalpool;
458 	uint32_t res;
459 	zassert(pthread_mutex_lock(&(jp->jobslock)));
460 	res = (jp->workers_total - jp->workers_avail) + queue_elements(jp->jobqueue);
461 	zassert(pthread_mutex_unlock(&(jp->jobslock)));
462 	return res;
463 }
464 
465 /*
466 void job_pool_disable_and_change_callback_all(void (*callback)(uint8_t status,void *extra)) {
467 	jobpool* jp = globalpool;
468 	uint32_t jhpos;
469 	job *jptr;
470 
471 	zassert(pthread_mutex_lock(&(jp->jobslock)));
472 	for (jhpos = 0 ; jhpos<JHASHSIZE ; jhpos++) {
473 		for (jptr = jp->jobhash[jhpos] ; jptr ; jptr=jptr->next) {
474 			if (jptr->jstate==JSTATE_ENABLED) {
475 				jptr->jstate=JSTATE_DISABLED;
476 			}
477 			jptr->callback=callback;
478 		}
479 	}
480 	zassert(pthread_mutex_unlock(&(jp->jobslock)));
481 }
482 */
483 
job_pool_disable_job(uint32_t jobid)484 void job_pool_disable_job(uint32_t jobid) {
485 	jobpool* jp = globalpool;
486 	uint32_t jhpos = JHASHPOS(jobid);
487 	job *jptr;
488 	for (jptr = jp->jobhash[jhpos] ; jptr ; jptr=jptr->next) {
489 		if (jptr->jobid==jobid) {
490 			zassert(pthread_mutex_lock(&(jp->jobslock)));
491 			if (jptr->jstate==JSTATE_ENABLED) {
492 				jptr->jstate=JSTATE_DISABLED;
493 			}
494 			zassert(pthread_mutex_unlock(&(jp->jobslock)));
495 		}
496 	}
497 }
498 
job_pool_change_callback(uint32_t jobid,void (* callback)(uint8_t status,void * extra),void * extra)499 void job_pool_change_callback(uint32_t jobid,void (*callback)(uint8_t status,void *extra),void *extra) {
500 	jobpool* jp = globalpool;
501 	uint32_t jhpos = JHASHPOS(jobid);
502 	job *jptr;
503 	for (jptr = jp->jobhash[jhpos] ; jptr ; jptr=jptr->next) {
504 		if (jptr->jobid==jobid) {
505 			jptr->callback=callback;
506 			jptr->extra=extra;
507 		}
508 	}
509 }
510 
job_pool_check_jobs(uint8_t cb)511 void job_pool_check_jobs(uint8_t cb) {
512 	jobpool* jp = globalpool;
513 	uint32_t jobid,jhpos;
514 	uint8_t status;
515 	int notlast;
516 	job **jhandle,*jptr;
517 	do {
518 		notlast = job_receive_status(jp,&jobid,&status);
519 		jhpos = JHASHPOS(jobid);
520 		jhandle = jp->jobhash+jhpos;
521 		while ((jptr = *jhandle)) {
522 			if (jptr->jobid==jobid) {
523 				if (jptr->callback && cb) {
524 					jptr->callback(status,jptr->extra);
525 				}
526 				*jhandle = jptr->next;
527 				if (jptr->args) {
528 					free(jptr->args);
529 				}
530 				free(jptr);
531 				break;
532 			} else {
533 				jhandle = &(jptr->next);
534 			}
535 		}
536 	} while (notlast);
537 }
538 
/*
 * Shut a pool down and release it: close the job queue so that workers leave,
 * wait until the last worker has closed, discard statuses that were never
 * collected (callbacks are not invoked), then destroy the queues, the
 * synchronization objects, the pipe and the pool itself.
 *
 * NOTE(review): pending statuses are drained through job_pool_check_jobs,
 * which always works on globalpool - this matches jp only because job_term
 * passes globalpool.
 */
void job_pool_delete(jobpool* jp) {
	queue_close(jp->jobqueue);

	zassert(pthread_mutex_lock(&(jp->jobslock)));
	for ( ; jp->workers_total>0 ; ) {
		// job_close_worker signals the condition when the total drops to zero
		jp->workers_term_waiting++;
		zassert(pthread_cond_wait(&(jp->worker_term_cond),&(jp->jobslock)));
	}
	zassert(pthread_mutex_unlock(&(jp->jobslock)));

	if (!queue_isempty(jp->statusqueue)) {
		job_pool_check_jobs(0);
	}

	queue_delete(jp->jobqueue);
	queue_delete(jp->statusqueue);
	zassert(pthread_cond_destroy(&(jp->worker_term_cond)));
	zassert(pthread_mutex_destroy(&(jp->pipelock)));
	zassert(pthread_mutex_destroy(&(jp->jobslock)));
	close(jp->rpipe);
	close(jp->wpipe);
	free(jp);
}
560 
job_inval(void (* callback)(uint8_t status,void * extra),void * extra)561 uint32_t job_inval(void (*callback)(uint8_t status,void *extra),void *extra) {
562 	jobpool* jp = globalpool;
563 	return job_new(jp,OP_INVAL,NULL,callback,extra);
564 }
565 
566 /*
567 uint32_t job_mainserv(int sock) {
568 	jobpool* jp = globalpool;
569 	int *args;
570 	args = malloc(sizeof(int));
571 	passert(args);
572 	*args = sock;
573 	return job_new(jp,OP_MAINSERV,args,NULL,NULL);
574 }
575 */
576 
job_chunkop(void (* callback)(uint8_t status,void * extra),void * extra,uint64_t chunkid,uint32_t version,uint32_t newversion,uint64_t copychunkid,uint32_t copyversion,uint32_t length)577 uint32_t job_chunkop(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint32_t newversion,uint64_t copychunkid,uint32_t copyversion,uint32_t length) {
578 	jobpool* jp = globalpool;
579 	chunk_op_args *args;
580 	args = malloc(sizeof(chunk_op_args));
581 	passert(args);
582 	args->chunkid = chunkid;
583 	args->version = version;
584 	args->newversion = newversion;
585 	args->copychunkid = copychunkid;
586 	args->copyversion = copyversion;
587 	args->length = length;
588 	return job_new(jp,OP_CHUNKOP,args,callback,extra);
589 }
590 /*
591 uint32_t job_open(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version) {
592 	jobpool* jp = globalpool;
593 	chunk_oc_args *args;
594 	args = malloc(sizeof(chunk_oc_args));
595 	passert(args);
596 	args->chunkid = chunkid;
597 	args->version = version;
598 	return job_new(jp,OP_OPEN,args,callback,extra);
599 }
600 
601 uint32_t job_close(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid) {
602 	jobpool* jp = globalpool;
603 	chunk_oc_args *args;
604 	args = malloc(sizeof(chunk_oc_args));
605 	passert(args);
606 	args->chunkid = chunkid;
607 	args->version = 0;
608 	return job_new(jp,OP_CLOSE,args,callback,extra);
609 }
610 
611 uint32_t job_read(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint16_t blocknum,uint8_t *buffer,uint32_t offset,uint32_t size,uint8_t *crcbuff) {
612 	jobpool* jp = globalpool;
613 	chunk_rd_args *args;
614 	args = malloc(sizeof(chunk_rd_args));
615 	passert(args);
616 	args->chunkid = chunkid;
617 	args->version = version;
618 	args->blocknum = blocknum;
619 	args->buffer = buffer;
620 	args->offset = offset;
621 	args->size = size;
622 	args->crcbuff = crcbuff;
623 	return job_new(jp,OP_READ,args,callback,extra);
624 }
625 
626 uint32_t job_write(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint16_t blocknum,const uint8_t *buffer,uint32_t offset,uint32_t size,const uint8_t *crcbuff) {
627 	jobpool* jp = globalpool;
628 	chunk_wr_args *args;
629 	args = malloc(sizeof(chunk_wr_args));
630 	passert(args);
631 	args->chunkid = chunkid;
632 	args->version = version;
633 	args->blocknum = blocknum;
634 	args->buffer = buffer;
635 	args->offset = offset;
636 	args->size = size;
637 	args->crcbuff = crcbuff;
638 	return job_new(jp,OP_WRITE,args,callback,extra);
639 }
640 */
641 
job_serv_read(void (* callback)(uint8_t status,void * extra),void * extra,int sock,const uint8_t * packet,uint32_t length)642 uint32_t job_serv_read(void (*callback)(uint8_t status,void *extra),void *extra,int sock,const uint8_t *packet,uint32_t length) {
643 	jobpool* jp = globalpool;
644 	chunk_rw_args *args;
645 	args = malloc(sizeof(chunk_rw_args));
646 	passert(args);
647 	args->sock = sock;
648 	args->packet = packet;
649 	args->length = length;
650 	return job_new(jp,OP_SERV_READ,args,callback,extra);
651 }
652 
job_serv_write(void (* callback)(uint8_t status,void * extra),void * extra,int sock,const uint8_t * packet,uint32_t length)653 uint32_t job_serv_write(void (*callback)(uint8_t status,void *extra),void *extra,int sock,const uint8_t *packet,uint32_t length) {
654 	jobpool* jp = globalpool;
655 	chunk_rw_args *args;
656 	args = malloc(sizeof(chunk_rw_args));
657 	passert(args);
658 	args->sock = sock;
659 	args->packet = packet;
660 	args->length = length;
661 	return job_new(jp,OP_SERV_WRITE,args,callback,extra);
662 }
663 
job_replicate(void (* callback)(uint8_t status,void * extra),void * extra,uint64_t chunkid,uint32_t version,uint8_t srccnt,const uint8_t * srcs)664 uint32_t job_replicate(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint8_t srccnt,const uint8_t *srcs) {
665 	jobpool* jp = globalpool;
666 	chunk_rp_args *args;
667 	uint8_t *ptr;
668 	ptr = malloc(sizeof(chunk_rp_args)+srccnt*18);
669 	passert(ptr);
670 	args = (chunk_rp_args*)ptr;
671 	ptr += sizeof(chunk_rp_args);
672 	args->chunkid = chunkid;
673 	args->version = version;
674 	args->srccnt = srccnt;
675 	memcpy(ptr,srcs,srccnt*18);
676 	return job_new(jp,OP_REPLICATE,args,callback,extra);
677 }
678 
job_replicate_simple(void (* callback)(uint8_t status,void * extra),void * extra,uint64_t chunkid,uint32_t version,uint32_t ip,uint16_t port)679 uint32_t job_replicate_simple(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint32_t ip,uint16_t port) {
680 	jobpool* jp = globalpool;
681 	chunk_rp_args *args;
682 	uint8_t *ptr;
683 	ptr = malloc(sizeof(chunk_rp_args)+18);
684 	passert(ptr);
685 	args = (chunk_rp_args*)ptr;
686 	ptr += sizeof(chunk_rp_args);
687 	args->chunkid = chunkid;
688 	args->version = version;
689 	args->srccnt = 1;
690 	put64bit(&ptr,chunkid);
691 	put32bit(&ptr,version);
692 	put32bit(&ptr,ip);
693 	put16bit(&ptr,port);
694 	return job_new(jp,OP_REPLICATE,args,callback,extra);
695 }
696 
job_get_chunk_blocks(void (* callback)(uint8_t status,void * extra),void * extra,uint64_t chunkid,uint32_t version,uint8_t * blocks)697 uint32_t job_get_chunk_blocks(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint8_t *blocks) {
698 	jobpool* jp = globalpool;
699 	chunk_ij_args *args;
700 	args = malloc(sizeof(chunk_ij_args));
701 	passert(args);
702 	args->chunkid = chunkid;
703 	args->version = version;
704 	args->pointer = blocks;
705 	return job_new(jp,OP_GETBLOCKS,args,callback,extra);
706 }
707 
job_get_chunk_checksum(void (* callback)(uint8_t status,void * extra),void * extra,uint64_t chunkid,uint32_t version,uint8_t * checksum)708 uint32_t job_get_chunk_checksum(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint8_t *checksum) {
709 	jobpool* jp = globalpool;
710 	chunk_ij_args *args;
711 	args = malloc(sizeof(chunk_ij_args));
712 	passert(args);
713 	args->chunkid = chunkid;
714 	args->version = version;
715 	args->pointer = checksum;
716 	return job_new(jp,OP_GETCHECKSUM,args,callback,extra);
717 }
718 
job_get_chunk_checksum_tab(void (* callback)(uint8_t status,void * extra),void * extra,uint64_t chunkid,uint32_t version,uint8_t * checksum_tab)719 uint32_t job_get_chunk_checksum_tab(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint8_t *checksum_tab) {
720 	jobpool* jp = globalpool;
721 	chunk_ij_args *args;
722 	args = malloc(sizeof(chunk_ij_args));
723 	passert(args);
724 	args->chunkid = chunkid;
725 	args->version = version;
726 	args->pointer = checksum_tab;
727 	return job_new(jp,OP_GETCHECKSUMTAB,args,callback,extra);
728 }
729 
job_desc(struct pollfd * pdesc,uint32_t * ndesc)730 void job_desc(struct pollfd *pdesc,uint32_t *ndesc) {
731 	uint32_t pos = *ndesc;
732 	jobpool* jp = globalpool;
733 
734 	pdesc[pos].fd = jp->rpipe;
735 	pdesc[pos].events = POLLIN;
736 	jp->fdpdescpos = pos;
737 	pos++;
738 
739 	*ndesc = pos;
740 }
741 
job_serve(struct pollfd * pdesc)742 void job_serve(struct pollfd *pdesc) {
743 	jobpool* jp = globalpool;
744 	uint32_t jobscnt;
745 
746 	if (jp->fdpdescpos>=0 && (pdesc[jp->fdpdescpos].revents & POLLIN)) {
747 		job_pool_check_jobs(1);
748 	}
749 
750 	jobscnt = job_pool_jobs_count();
751 	if (jobscnt>=stats_maxjobscnt) {
752 		stats_maxjobscnt=jobscnt;
753 	}
754 }
755 
job_wantexit(void)756 void job_wantexit(void) {
757 	exiting = 1;
758 }
759 
/* Exit-check hook: returns 1 when the pool holds no running or queued jobs, 0 otherwise. */
int job_canexit(void) {
	return job_pool_jobs_count()==0;
}
763 
job_term(void)764 void job_term(void) {
765 	job_pool_delete(globalpool);
766 }
767 
job_reload(void)768 void job_reload(void) {
769 	jobpool* jp = globalpool;
770 
771 	zassert(pthread_mutex_lock(&(jp->jobslock)));
772 
773 	jp->workers_max = cfg_getuint32("WORKERS_MAX",150);
774 	jp->workers_max_idle = cfg_getuint32("WORKERS_MAX_IDLE",40);
775 
776 	zassert(pthread_mutex_unlock(&(jp->jobslock)));
777 }
778 
job_init(void)779 int job_init(void) {
780 //	globalpool = (jobpool*)malloc(sizeof(jobpool));
781 	exiting = 0;
782 	globalpool = job_pool_new(256);
783 
784 	job_reload();
785 
786 	main_destruct_register(job_term);
787 	main_wantexit_register(job_wantexit);
788 	main_canexit_register(job_canexit);
789 	main_reload_register(job_reload);
790 	main_poll_register(job_desc,job_serve);
791 	return 0;
792 }
793