1 /*
2  * Copyright (C) 2021 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3  *
4  * This file is part of MooseFS.
5  *
6  * MooseFS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, version 2 (only).
9  *
10  * MooseFS is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with MooseFS; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18  * or visit http://www.gnu.org/licenses/gpl-2.0.html
19  */
20 
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24 
25 #include <stdlib.h>
26 #include <string.h>
27 #include <unistd.h>
28 #include <syslog.h>
29 #include <inttypes.h>
30 //#include <fcntl.h>
31 //#include <sys/ioctl.h>
32 #include <limits.h>
33 #include <signal.h>
34 #include <pthread.h>
35 #include <errno.h>
36 
37 #include "main.h"
38 #include "cfg.h"
39 #include "pcqueue.h"
40 #include "lwthread.h"
41 #include "datapack.h"
42 #include "massert.h"
43 
44 #include "mainserv.h"
45 #include "hddspacemgr.h"
46 #include "replicator.h"
47 #include "masterconn.h"
48 
49 #define JHASHSIZE 0x400
50 #define JHASHPOS(id) ((id)&0x3FF)
51 
// Per-job lifecycle states stored in job->jstate (guarded by jobpool->jobslock).
enum {
	JSTATE_DISABLED,	// job was cancelled before execution - worker reports MFS_ERROR_NOTDONE
	JSTATE_ENABLED,		// job is queued and may still be disabled
	JSTATE_INPROGRESS	// a worker has picked the job up - too late to disable
};
57 
// Operation codes passed through the job queue to worker threads.
enum {
	OP_EXIT=0,	// sentinel - worker terminates (see default case in job_worker)
	OP_INVAL,	// always completes with MFS_ERROR_EINVAL
//	OP_MAINSERV,
	OP_CHUNKOP,	// generic chunk operation (create/delete/duplicate/version change/truncate)
//	OP_OPEN,
//	OP_CLOSE,
//	OP_READ,
//	OP_WRITE,
	OP_SERV_READ,	// serve a client read request (mainserv_read)
	OP_SERV_WRITE,	// serve a client write request (mainserv_write)
	OP_REPLICATE,	// replicate a chunk from other chunkservers
	OP_GETBLOCKS,	// query number of used blocks in a chunk
	OP_GETCHECKSUM,	// query chunk checksum
	OP_GETCHECKSUMTAB,	// query per-block checksum table
	OP_CHUNKMOVE,	// move a chunk between folders (disks)
};
75 
76 // for OP_CHUNKOP
// for OP_CHUNKOP - arguments forwarded verbatim to hdd_chunkop()
typedef struct _chunk_op_args {
	uint64_t chunkid,copychunkid;	// source chunk and (optional) duplicate target
	uint32_t version,newversion,copyversion;
	uint32_t length;	// truncate length (operation-dependent)
} chunk_op_args;
82 /*
83 // for OP_OPEN and OP_CLOSE
84 typedef struct _chunk_oc_args {
85 	uint64_t chunkid;
86 	uint32_t version;
87 } chunk_oc_args;
88 
89 // for OP_READ
90 typedef struct _chunk_rd_args {
91 	uint64_t chunkid;
92 	uint32_t version;
93 	uint32_t offset,size;
94 	uint16_t blocknum;
95 	uint8_t *buffer;
96 	uint8_t *crcbuff;
97 } chunk_rd_args;
98 
99 // for OP_WRITE
100 typedef struct _chunk_wr_args {
101 	uint64_t chunkid;
102 	uint32_t version;
103 	uint32_t offset,size;
104 	uint16_t blocknum;
105 	const uint8_t *buffer;
106 	const uint8_t *crcbuff;
107 } chunk_wr_args;
108 */
109 
// for OP_SERV_READ and OP_SERV_WRITE - a client socket plus the raw request packet
typedef struct _chunk_rw_args {
	int sock;	// connected client socket, served by mainserv_read/mainserv_write
	const uint8_t *packet;	// request payload (owned by the caller, not freed here)
	uint32_t length;	// payload length in bytes
} chunk_rw_args;
116 
// for OP_REPLICATE - allocated together with a trailing source list
// (srccnt records of 18 bytes each, appended right after this struct)
typedef struct _chunk_rp_args {
	uint64_t chunkid;
	uint32_t version;
	uint32_t xormasks[4];	// XOR masks for RAID-style reconstruction
	uint8_t srccnt;		// number of source servers in the trailing list
} chunk_rp_args;
124 
// for OP_GETBLOCKS, OP_GETCHECKSUM and OP_GETCHECKSUMTAB
typedef struct _chunk_ij_args {
	uint64_t chunkid;
	uint32_t version;
	void *pointer;	// caller-owned output buffer filled by the hdd_get_* call
} chunk_ij_args;
131 
// for OP_CHUNKMOVE - opaque source/destination folder handles for hdd_move()
typedef struct _chunk_mv_args {
	void *fsrc;
	void *fdst;
} chunk_mv_args;
137 
// One queued/in-flight job; lives in jobpool->jobhash until its status is
// collected by job_pool_check_jobs().  Fields are guarded by jobpool->jobslock.
typedef struct _job {
	uint32_t jobid;	// non-zero id handed back to the submitter
	void (*callback)(uint8_t status,void *extra);	// completion callback (may be NULL)
	void *extra;	// opaque callback argument
	void *args;	// op-specific argument struct, freed when the job is reaped
	uint8_t jstate;	// JSTATE_* value
	struct _job *next;	// hash-bucket chain
} job;
146 
// Worker-pool state shared between the main (poll) thread and worker threads.
typedef struct _jobpool {
	int rpipe,wpipe;	// self-pipe: a worker writes one byte when the status queue becomes non-empty
	int32_t fdpdescpos;	// index of rpipe in the pollfd array (set in job_desc)
	uint32_t workers_max;	// hard limit on worker threads (WORKERS_MAX)
	uint32_t workers_himark;	// busy-worker level that raises HLSTATUS_OVERLOADED (3/4 of max)
	uint32_t workers_lomark;	// busy-worker level below which HLSTATUS_OK is restored (1/2 of max)
	uint32_t workers_max_idle;	// idle workers beyond this count terminate themselves (WORKERS_MAX_IDLE)
	uint32_t workers_avail;	// idle workers (guarded by jobslock)
	uint32_t workers_total;	// live workers (guarded by jobslock)
	uint32_t workers_term_waiting;	// waiters in job_pool_delete blocked until workers_total drops to 0
	pthread_cond_t worker_term_cond;	// signalled by the last worker to exit
	pthread_mutex_t pipelock;	// protects statusqueue and the self-pipe bytes
	pthread_mutex_t jobslock;	// protects jobhash, worker counters and job states
	void *jobqueue;	// pending jobs (pcqueue)
	void *statusqueue;	// finished-job statuses (pcqueue)
	job* jobhash[JHASHSIZE];	// jobid -> job lookup, chained per bucket
	uint32_t nextjobid;	// next id to assign; wraps around, skipping 0
} jobpool;
165 
// Per-thread descriptor; allocated in job_spawn_worker, freed in job_close_worker.
typedef struct _worker {
	pthread_t thread_id;
	jobpool *jp;	// back-pointer to the owning pool
} worker;
170 
171 static jobpool* globalpool = NULL;
172 
173 static uint32_t stats_maxjobscnt = 0;
174 
175 // static uint8_t exiting;
176 
job_stats(uint32_t * maxjobscnt)177 void job_stats(uint32_t *maxjobscnt) {
178 	*maxjobscnt = stats_maxjobscnt;
179 	stats_maxjobscnt = 0;
180 }
181 
/* Queue a finished job's status for the main thread.  When the status
 * queue was empty, one byte is written to the self-pipe so the main
 * poll loop wakes up and collects the results. */
static inline void job_send_status(jobpool *jp,uint32_t jobid,uint8_t status) {
	int wakeup;

	zassert(pthread_mutex_lock(&(jp->pipelock)));
	wakeup = queue_isempty(jp->statusqueue);
	if (wakeup) {	// first pending status - poke the pipe
		eassert(write(jp->wpipe,&status,1)==1);
	}
	queue_put(jp->statusqueue,jobid,status,NULL,1);
	zassert(pthread_mutex_unlock(&(jp->pipelock)));
}
191 
/* Fetch one (jobid,status) pair from the status queue.  When the queue
 * drains, the wake-up byte is consumed from the self-pipe.
 * Returns 1 while more statuses are pending, 0 on the last one. */
static inline int job_receive_status(jobpool *jp,uint32_t *jobid,uint8_t *status) {
	uint32_t qstatus;
	int morepending;

	zassert(pthread_mutex_lock(&(jp->pipelock)));
	queue_get(jp->statusqueue,jobid,&qstatus,NULL,NULL);
	*status = qstatus;
	morepending = queue_isempty(jp->statusqueue)?0:1;
	if (morepending==0) {
		eassert(read(jp->rpipe,&qstatus,1)==1);	// drain the wake-up byte
	}
	zassert(pthread_mutex_unlock(&(jp->pipelock)));
	return morepending;
}
205 
206 void* job_worker(void *arg);
207 
208 static uint32_t lastnotify = 0;
209 
/* Start one additional worker thread for the pool.
 * Caller must hold jp->jobslock (counters are updated here).
 * On thread-creation failure the pool is left unchanged.
 * Fix: the worker descriptor is now freed when lwt_minthread_create
 * fails - the original code returned early and leaked it. */
static inline void job_spawn_worker(jobpool *jp) {
	worker *w;

	w = malloc(sizeof(worker));
	passert(w);
	w->jp = jp;
	if (lwt_minthread_create(&(w->thread_id),0,job_worker,w)<0) {
		free(w);	// thread was not started - nobody else will release this
		return;
	}
	jp->workers_avail++;
	jp->workers_total++;
	// log only every 10th worker to keep syslog noise down
	if (jp->workers_total%10==0 && lastnotify!=jp->workers_total) {
		syslog(LOG_NOTICE,"workers: %"PRIu32"+",jp->workers_total);
		lastnotify = jp->workers_total;
	}
}
227 
/* Tear down one worker (invoked by the worker thread itself, with
 * jp->jobslock held).  Wakes job_pool_delete() when the last worker
 * disappears. */
static inline void job_close_worker(worker *w) {
	jobpool *jp = w->jp;
	uint32_t total;

	jp->workers_avail--;
	jp->workers_total--;
	total = jp->workers_total;
	if (total==0 && jp->workers_term_waiting) {
		zassert(pthread_cond_signal(&(jp->worker_term_cond)));
		jp->workers_term_waiting--;
	}
	pthread_detach(w->thread_id);	// thread exits on its own; nobody joins it
	free(w);
	// log only every 10th worker to keep syslog noise down
	if ((total%10)==0 && lastnotify!=total) {
		syslog(LOG_NOTICE,"workers: %"PRIu32"-",total);
		lastnotify = total;
	}
}
244 
245 #define opargs ((chunk_op_args*)(jptr->args))
246 // #define ocargs ((chunk_oc_args*)(jptr->args))
247 // #define rdargs ((chunk_rd_args*)(jptr->args))
248 // #define wrargs ((chunk_wr_args*)(jptr->args))
249 #define rwargs ((chunk_rw_args*)(jptr->args))
250 #define rpargs ((chunk_rp_args*)(jptr->args))
251 #define ijargs ((chunk_ij_args*)(jptr->args))
252 #define mvargs ((chunk_mv_args*)(jptr->args))
/* Worker-thread main loop: pull jobs from jp->jobqueue, dispatch on the
 * op code, and push the resulting status back via job_send_status().
 * The pool auto-scales: a new worker is spawned when the last idle one
 * becomes busy, and a worker retires when too many sit idle.
 * NOTE(review): jstate is sampled once under jobslock; a job disabled
 * after the sample still runs - presumably acceptable by design. */
void* job_worker(void *arg) {
	worker *w = (worker*)arg;
	jobpool *jp = w->jp;
	job *jptr;
	uint8_t *jptrarg;
	uint8_t status,jstate;
	uint32_t jobid;
	uint32_t op;

//	syslog(LOG_NOTICE,"worker %p started (jobqueue: %p ; jptr:%p ; jptrarg:%p ; status:%p )",(void*)pthread_self(),jp->jobqueue,(void*)&jptr,(void*)&jptrarg,(void*)&status);
	for (;;) {
		// blocks until a job (or the queue-closed sentinel) is available
		queue_get(jp->jobqueue,&jobid,&op,&jptrarg,NULL);
//		syslog(LOG_NOTICE,"job worker got job: %"PRIu32",%"PRIu32,jobid,op);
		jptr = (job*)jptrarg;
		zassert(pthread_mutex_lock(&(jp->jobslock)));
		if (jobid==0 && op==0 && jptrarg==NULL) { // queue has been closed
			job_close_worker(w);
			zassert(pthread_mutex_unlock(&(jp->jobslock)));
			return NULL;
		}
		jp->workers_avail--;
		// keep at least one idle worker ready, up to the configured maximum
		if (jp->workers_avail==0 && jp->workers_total<jp->workers_max) {
			job_spawn_worker(jp);
		}
		if (jptr!=NULL) {
			// snapshot the state and claim the job before releasing the lock
			jstate=jptr->jstate;
			if (jptr->jstate==JSTATE_ENABLED) {
				jptr->jstate=JSTATE_INPROGRESS;
			}
		} else {
			jstate=JSTATE_DISABLED;
		}
		zassert(pthread_mutex_unlock(&(jp->jobslock)));
		switch (op) {
			case OP_INVAL:
				status = MFS_ERROR_EINVAL;
				break;
/*
			case OP_MAINSERV:
				if (jstate==JSTATE_DISABLED) {
					status = MFS_ERROR_NOTDONE;
				} else {
					mainserv_serve(*((int*)(jptr->args)));
					status = MFS_STATUS_OK;
				}
				break;
*/
			case OP_CHUNKOP:
				if (jstate==JSTATE_DISABLED) {
					status = MFS_ERROR_NOTDONE;
				} else {
					status = hdd_chunkop(opargs->chunkid,opargs->version,opargs->newversion,opargs->copychunkid,opargs->copyversion,opargs->length);
				}
				break;
/*
			case OP_OPEN:
				if (jstate==JSTATE_DISABLED) {
					status = MFS_ERROR_NOTDONE;
				} else {
					status = hdd_open(ocargs->chunkid,ocargs->version);
				}
				break;
			case OP_CLOSE:
				if (jstate==JSTATE_DISABLED) {
					status = MFS_ERROR_NOTDONE;
				} else {
					status = hdd_close(ocargs->chunkid);
				}
				break;
			case OP_READ:
				if (jstate==JSTATE_DISABLED) {
					status = MFS_ERROR_NOTDONE;
				} else {
					status = hdd_read(rdargs->chunkid,rdargs->version,rdargs->blocknum,rdargs->buffer,rdargs->offset,rdargs->size,rdargs->crcbuff);
				}
				break;
			case OP_WRITE:
				if (jstate==JSTATE_DISABLED) {
					status = MFS_ERROR_NOTDONE;
				} else {
					status = hdd_write(wrargs->chunkid,wrargs->version,wrargs->blocknum,wrargs->buffer,wrargs->offset,wrargs->size,wrargs->crcbuff);
				}
				break;
*/
			case OP_SERV_READ:
				if (jstate==JSTATE_DISABLED) {
					status = MFS_ERROR_NOTDONE;
				} else {
					status = mainserv_read(rwargs->sock,rwargs->packet,rwargs->length);
				}
				break;
			case OP_SERV_WRITE:
				if (jstate==JSTATE_DISABLED) {
					status = MFS_ERROR_NOTDONE;
				} else {
					status = mainserv_write(rwargs->sock,rwargs->packet,rwargs->length);
				}
				break;
			case OP_REPLICATE:
				if (jstate==JSTATE_DISABLED) {
					status = MFS_ERROR_NOTDONE;
				} else {
					// source list is appended directly after the chunk_rp_args struct
					status = replicate(rpargs->chunkid,rpargs->version,rpargs->xormasks,rpargs->srccnt,((uint8_t*)(jptr->args))+sizeof(chunk_rp_args));
				}
				break;
			case OP_GETBLOCKS:
				if (jstate==JSTATE_DISABLED) {
					status = MFS_ERROR_NOTDONE;
				} else {
					status = hdd_get_blocks(ijargs->chunkid,ijargs->version,ijargs->pointer);
				}
				break;
			case OP_GETCHECKSUM:
				if (jstate==JSTATE_DISABLED) {
					status = MFS_ERROR_NOTDONE;
				} else {
					status = hdd_get_checksum(ijargs->chunkid,ijargs->version,ijargs->pointer);
				}
				break;
			case OP_GETCHECKSUMTAB:
				if (jstate==JSTATE_DISABLED) {
					status = MFS_ERROR_NOTDONE;
				} else {
					status = hdd_get_checksum_tab(ijargs->chunkid,ijargs->version,ijargs->pointer);
				}
				break;
			case OP_CHUNKMOVE:
				if (jstate==JSTATE_DISABLED) {
					status = MFS_ERROR_NOTDONE;
				} else {
					status = hdd_move(mvargs->fsrc,mvargs->fdst);
				}
				break;
			default: // OP_EXIT
//				syslog(LOG_NOTICE,"worker %p exiting (jobqueue: %p)",(void*)pthread_self(),jp->jobqueue);
				zassert(pthread_mutex_lock(&(jp->jobslock)));
				job_close_worker(w);
				zassert(pthread_mutex_unlock(&(jp->jobslock)));
				return NULL;
		}
		job_send_status(jp,jobid,status);
		zassert(pthread_mutex_lock(&(jp->jobslock)));
		jp->workers_avail++;
		// too many idle workers - this one retires
		if (jp->workers_avail > jp->workers_max_idle) {
			job_close_worker(w);
			zassert(pthread_mutex_unlock(&(jp->jobslock)));
			return NULL;
		}
		zassert(pthread_mutex_unlock(&(jp->jobslock)));
	}
}
404 
405 #define JOB_MODE_ALWAYS_DO 0
406 #define JOB_MODE_LIMITED_RETURN 1
407 #define JOB_MODE_LIMITED_QUEUE 2
408 
/* Register a new job and enqueue it for the worker pool.
 * The job is first inserted into the jobid hash (so callers can disable
 * it or change its callback), then queued.  When the pool is saturated
 * (queued + busy > workers_max) and jobmode is not JOB_MODE_ALWAYS_DO:
 *   - JOB_MODE_LIMITED_RETURN: the job is rolled back from the hash and
 *     0 is returned (caller must retry later);
 *   - JOB_MODE_LIMITED_QUEUE: the job immediately completes with
 *     errstatus via the status queue.
 * Ownership of 'args' passes to the pool; it is freed when the job is
 * reaped in job_pool_check_jobs().
 * Returns the non-zero jobid, or 0 when the job was rejected. */
static inline uint32_t job_new(jobpool *jp,uint32_t op,void *args,void (*callback)(uint8_t status,void *extra),void *extra,uint8_t errstatus,uint8_t jobmode) {
//	jobpool* jp = (jobpool*)jpool;
/*
	if (exiting) {
		if (callback) {
			callback(MFS_ERROR_NOTDONE,extra);
		}
		if (args) {
			free(args);
		}
		return 0;
	} else {
*/
		uint32_t jobid;
		uint32_t jhpos;
		uint32_t workers_busy;
		uint32_t limit;
		job **jhandle,*jptr;

		jptr = malloc(sizeof(job));
		passert(jptr);

		zassert(pthread_mutex_lock(&(jp->jobslock)));
		jobid = jp->nextjobid;
		jp->nextjobid++;
		if (jp->nextjobid==0) {	// jobid 0 means "rejected" - skip it on wrap-around
			jp->nextjobid=1;
		}
		jhpos = JHASHPOS(jobid);
		jptr->jobid = jobid;
		jptr->callback = callback;
		jptr->extra = extra;
		jptr->args = args;
		jptr->jstate = JSTATE_ENABLED;
		jptr->next = jp->jobhash[jhpos];
		jp->jobhash[jhpos] = jptr;
		workers_busy = jp->workers_total-jp->workers_avail;
		limit = jp->workers_max;
		zassert(pthread_mutex_unlock(&(jp->jobslock)));
		if (queue_elements(jp->jobqueue)+workers_busy>limit && jobmode!=JOB_MODE_ALWAYS_DO) {
			if (jobmode==JOB_MODE_LIMITED_RETURN) {
				// remove this job from data structures
				zassert(pthread_mutex_lock(&(jp->jobslock)));
				jhandle = jp->jobhash+jhpos;
				while ((jptr = *jhandle)) {
					if (jptr->jobid==jobid) {
						*jhandle = jptr->next;
						if (jptr->args) {
							free(jptr->args);
						}
						free(jptr);
						break;
					} else {
						jhandle = &(jptr->next);
					}
				}
				zassert(pthread_mutex_unlock(&(jp->jobslock)));
				// end return jobid==0
				return 0;
			} else {
				// report errstatus; the job stays in the hash and is reaped normally
				job_send_status(jp,jobid,errstatus);
			}
		} else {
			queue_put(jp->jobqueue,jobid,op,(uint8_t*)jptr,1);
		}
		return jobid;
//	}
}
477 
478 /* interface */
479 
job_pool_new(void)480 void* job_pool_new(void) {
481 	int fd[2];
482 	uint32_t i;
483 	jobpool* jp;
484 
485 	if (pipe(fd)<0) {
486 		return NULL;
487 	}
488 	jp=malloc(sizeof(jobpool));
489 	passert(jp);
490 //	syslog(LOG_WARNING,"new pool of workers (%p:%"PRIu8")",(void*)jp,workers);
491 	jp->rpipe = fd[0];
492 	jp->wpipe = fd[1];
493 	jp->workers_avail = 0;
494 	jp->workers_total = 0;
495 	jp->workers_term_waiting = 0;
496 	zassert(pthread_cond_init(&(jp->worker_term_cond),NULL));
497 	zassert(pthread_mutex_init(&(jp->pipelock),NULL));
498 	zassert(pthread_mutex_init(&(jp->jobslock),NULL));
499 	jp->jobqueue = queue_new(0);
500 //	syslog(LOG_WARNING,"new jobqueue: %p",jp->jobqueue);
501 	jp->statusqueue = queue_new(0);
502 	zassert(pthread_mutex_lock(&(jp->jobslock)));
503 	for (i=0 ; i<JHASHSIZE ; i++) {
504 		jp->jobhash[i]=NULL;
505 	}
506 	jp->nextjobid = 1;
507 	job_spawn_worker(jp);
508 	zassert(pthread_mutex_unlock(&(jp->jobslock)));
509 	return jp;
510 }
511 
job_pool_jobs_count(void)512 static uint32_t job_pool_jobs_count(void) {
513 	jobpool* jp = globalpool;
514 	uint32_t res;
515 	zassert(pthread_mutex_lock(&(jp->jobslock)));
516 	res = (jp->workers_total - jp->workers_avail) + queue_elements(jp->jobqueue);
517 	zassert(pthread_mutex_unlock(&(jp->jobslock)));
518 	return res;
519 }
520 
521 /*
522 void job_pool_disable_and_change_callback_all(void (*callback)(uint8_t status,void *extra)) {
523 	jobpool* jp = globalpool;
524 	uint32_t jhpos;
525 	job *jptr;
526 
527 	zassert(pthread_mutex_lock(&(jp->jobslock)));
528 	for (jhpos = 0 ; jhpos<JHASHSIZE ; jhpos++) {
529 		for (jptr = jp->jobhash[jhpos] ; jptr ; jptr=jptr->next) {
530 			if (jptr->jstate==JSTATE_ENABLED) {
531 				jptr->jstate=JSTATE_DISABLED;
532 			}
533 			jptr->callback=callback;
534 		}
535 	}
536 	zassert(pthread_mutex_unlock(&(jp->jobslock)));
537 }
538 */
539 
job_pool_disable_job(uint32_t jobid)540 void job_pool_disable_job(uint32_t jobid) {
541 	jobpool* jp = globalpool;
542 	uint32_t jhpos = JHASHPOS(jobid);
543 	job *jptr;
544 
545 	zassert(pthread_mutex_lock(&(jp->jobslock)));
546 	for (jptr = jp->jobhash[jhpos] ; jptr ; jptr=jptr->next) {
547 		if (jptr->jobid==jobid) {
548 			if (jptr->jstate==JSTATE_ENABLED) {
549 				jptr->jstate=JSTATE_DISABLED;
550 			}
551 		}
552 	}
553 	zassert(pthread_mutex_unlock(&(jp->jobslock)));
554 }
555 
job_pool_change_callback(uint32_t jobid,void (* callback)(uint8_t status,void * extra),void * extra)556 void job_pool_change_callback(uint32_t jobid,void (*callback)(uint8_t status,void *extra),void *extra) {
557 	jobpool* jp = globalpool;
558 	uint32_t jhpos = JHASHPOS(jobid);
559 	job *jptr;
560 
561 	zassert(pthread_mutex_lock(&(jp->jobslock)));
562 	for (jptr = jp->jobhash[jhpos] ; jptr ; jptr=jptr->next) {
563 		if (jptr->jobid==jobid) {
564 			jptr->callback=callback;
565 			jptr->extra=extra;
566 		}
567 	}
568 	zassert(pthread_mutex_unlock(&(jp->jobslock)));
569 }
570 
/* Drain the status queue: for each finished job, optionally invoke its
 * callback (when cb is non-zero), unlink it from the hash and free it.
 * Runs on the main thread after the self-pipe signalled readability.
 * NOTE(review): job_receive_status() takes pipelock while jobslock is
 * held here - the reverse order never occurs, so no deadlock. */
void job_pool_check_jobs(uint8_t cb) {
	jobpool* jp = globalpool;
	uint32_t jobid,jhpos;
	uint8_t status;
	int notlast;
	job **jhandle,*jptr;

	zassert(pthread_mutex_lock(&(jp->jobslock)));
	do {
		notlast = job_receive_status(jp,&jobid,&status);
		jhpos = JHASHPOS(jobid);
		jhandle = jp->jobhash+jhpos;
		// unlink the matching hash entry and release its resources
		while ((jptr = *jhandle)) {
			if (jptr->jobid==jobid) {
				if (jptr->callback && cb) {
					jptr->callback(status,jptr->extra);
				}
				*jhandle = jptr->next;
				if (jptr->args) {
					free(jptr->args);
				}
				free(jptr);
				break;
			} else {
				jhandle = &(jptr->next);
			}
		}
	} while (notlast);
	zassert(pthread_mutex_unlock(&(jp->jobslock)));
}
601 
/* Shut the pool down: close the job queue (each worker receives the
 * close sentinel and exits), wait until the last worker signals
 * worker_term_cond, reap any leftover statuses, then release queues,
 * locks, pipe fds and the pool itself.
 * Fix: the warning used to say "job queue" although the condition
 * inspects the status queue. */
void job_pool_delete(jobpool* jp) {
	queue_close(jp->jobqueue);
	zassert(pthread_mutex_lock(&(jp->jobslock)));
	while (jp->workers_total>0) {
		jp->workers_term_waiting++;
		zassert(pthread_cond_wait(&(jp->worker_term_cond),&(jp->jobslock)));
	}
	zassert(pthread_mutex_unlock(&(jp->jobslock)));
	if (!queue_isempty(jp->statusqueue)) {
		syslog(LOG_WARNING,"not empty job status queue !!!");
		job_pool_check_jobs(0);	// reap without invoking callbacks
	}
	queue_delete(jp->jobqueue);
	queue_delete(jp->statusqueue);
	zassert(pthread_cond_destroy(&(jp->worker_term_cond)));
	zassert(pthread_mutex_destroy(&(jp->pipelock)));
	zassert(pthread_mutex_destroy(&(jp->jobslock)));
	close(jp->rpipe);
	close(jp->wpipe);
	free(jp);
}
624 
job_inval(void (* callback)(uint8_t status,void * extra),void * extra)625 uint32_t job_inval(void (*callback)(uint8_t status,void *extra),void *extra) {
626 	jobpool* jp = globalpool;
627 	return job_new(jp,OP_INVAL,NULL,callback,extra,MFS_ERROR_EINVAL,JOB_MODE_LIMITED_QUEUE);
628 }
629 
630 /*
631 uint32_t job_mainserv(int sock) {
632 	jobpool* jp = globalpool;
633 	int *args;
634 	args = malloc(sizeof(int));
635 	passert(args);
636 	*args = sock;
637 	return job_new(jp,OP_MAINSERV,args,NULL,NULL);
638 }
639 */
640 
job_chunkop(void (* callback)(uint8_t status,void * extra),void * extra,uint64_t chunkid,uint32_t version,uint32_t newversion,uint64_t copychunkid,uint32_t copyversion,uint32_t length)641 uint32_t job_chunkop(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint32_t newversion,uint64_t copychunkid,uint32_t copyversion,uint32_t length) {
642 	jobpool* jp = globalpool;
643 	chunk_op_args *args;
644 	args = malloc(sizeof(chunk_op_args));
645 	passert(args);
646 	args->chunkid = chunkid;
647 	args->version = version;
648 	args->newversion = newversion;
649 	args->copychunkid = copychunkid;
650 	args->copyversion = copyversion;
651 	args->length = length;
652 	return job_new(jp,OP_CHUNKOP,args,callback,extra,MFS_ERROR_NOTDONE,JOB_MODE_LIMITED_QUEUE);
653 }
654 /*
655 uint32_t job_open(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version) {
656 	jobpool* jp = globalpool;
657 	chunk_oc_args *args;
658 	args = malloc(sizeof(chunk_oc_args));
659 	passert(args);
660 	args->chunkid = chunkid;
661 	args->version = version;
662 	return job_new(jp,OP_OPEN,args,callback,extra);
663 }
664 
665 uint32_t job_close(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid) {
666 	jobpool* jp = globalpool;
667 	chunk_oc_args *args;
668 	args = malloc(sizeof(chunk_oc_args));
669 	passert(args);
670 	args->chunkid = chunkid;
671 	args->version = 0;
672 	return job_new(jp,OP_CLOSE,args,callback,extra);
673 }
674 
675 uint32_t job_read(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint16_t blocknum,uint8_t *buffer,uint32_t offset,uint32_t size,uint8_t *crcbuff) {
676 	jobpool* jp = globalpool;
677 	chunk_rd_args *args;
678 	args = malloc(sizeof(chunk_rd_args));
679 	passert(args);
680 	args->chunkid = chunkid;
681 	args->version = version;
682 	args->blocknum = blocknum;
683 	args->buffer = buffer;
684 	args->offset = offset;
685 	args->size = size;
686 	args->crcbuff = crcbuff;
687 	return job_new(jp,OP_READ,args,callback,extra);
688 }
689 
690 uint32_t job_write(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint16_t blocknum,const uint8_t *buffer,uint32_t offset,uint32_t size,const uint8_t *crcbuff) {
691 	jobpool* jp = globalpool;
692 	chunk_wr_args *args;
693 	args = malloc(sizeof(chunk_wr_args));
694 	passert(args);
695 	args->chunkid = chunkid;
696 	args->version = version;
697 	args->blocknum = blocknum;
698 	args->buffer = buffer;
699 	args->offset = offset;
700 	args->size = size;
701 	args->crcbuff = crcbuff;
702 	return job_new(jp,OP_WRITE,args,callback,extra);
703 }
704 */
705 
job_serv_read(void (* callback)(uint8_t status,void * extra),void * extra,int sock,const uint8_t * packet,uint32_t length)706 uint32_t job_serv_read(void (*callback)(uint8_t status,void *extra),void *extra,int sock,const uint8_t *packet,uint32_t length) {
707 	jobpool* jp = globalpool;
708 	chunk_rw_args *args;
709 	args = malloc(sizeof(chunk_rw_args));
710 	passert(args);
711 	args->sock = sock;
712 	args->packet = packet;
713 	args->length = length;
714 	return job_new(jp,OP_SERV_READ,args,callback,extra,0,JOB_MODE_LIMITED_RETURN);
715 }
716 
job_serv_write(void (* callback)(uint8_t status,void * extra),void * extra,int sock,const uint8_t * packet,uint32_t length)717 uint32_t job_serv_write(void (*callback)(uint8_t status,void *extra),void *extra,int sock,const uint8_t *packet,uint32_t length) {
718 	jobpool* jp = globalpool;
719 	chunk_rw_args *args;
720 	args = malloc(sizeof(chunk_rw_args));
721 	passert(args);
722 	args->sock = sock;
723 	args->packet = packet;
724 	args->length = length;
725 	return job_new(jp,OP_SERV_WRITE,args,callback,extra,0,JOB_MODE_LIMITED_RETURN);
726 }
727 
job_replicate_raid(void (* callback)(uint8_t status,void * extra),void * extra,uint64_t chunkid,uint32_t version,uint8_t srccnt,const uint32_t xormasks[4],const uint8_t * srcs)728 uint32_t job_replicate_raid(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint8_t srccnt,const uint32_t xormasks[4],const uint8_t *srcs) {
729 	jobpool* jp = globalpool;
730 	chunk_rp_args *args;
731 	uint8_t *ptr;
732 	ptr = malloc(sizeof(chunk_rp_args)+srccnt*18);
733 	passert(ptr);
734 	args = (chunk_rp_args*)ptr;
735 	ptr += sizeof(chunk_rp_args);
736 	args->chunkid = chunkid;
737 	args->version = version;
738 	args->srccnt = srccnt;
739 	args->xormasks[0] = xormasks[0];
740 	args->xormasks[1] = xormasks[1];
741 	args->xormasks[2] = xormasks[2];
742 	args->xormasks[3] = xormasks[3];
743 	memcpy(ptr,srcs,srccnt*18);
744 	return job_new(jp,OP_REPLICATE,args,callback,extra,MFS_ERROR_NOTDONE,JOB_MODE_LIMITED_QUEUE);
745 }
746 
job_replicate_simple(void (* callback)(uint8_t status,void * extra),void * extra,uint64_t chunkid,uint32_t version,uint32_t ip,uint16_t port)747 uint32_t job_replicate_simple(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint32_t ip,uint16_t port) {
748 	jobpool* jp = globalpool;
749 	chunk_rp_args *args;
750 	uint8_t *ptr;
751 	ptr = malloc(sizeof(chunk_rp_args)+18);
752 	passert(ptr);
753 	args = (chunk_rp_args*)ptr;
754 	ptr += sizeof(chunk_rp_args);
755 	args->chunkid = chunkid;
756 	args->version = version;
757 	args->srccnt = 1;
758 	args->xormasks[0] = UINT32_C(0x88888888);
759 	args->xormasks[1] = UINT32_C(0x44444444);
760 	args->xormasks[2] = UINT32_C(0x22222222);
761 	args->xormasks[3] = UINT32_C(0x11111111);
762 	put64bit(&ptr,chunkid);
763 	put32bit(&ptr,version);
764 	put32bit(&ptr,ip);
765 	put16bit(&ptr,port);
766 	return job_new(jp,OP_REPLICATE,args,callback,extra,MFS_ERROR_NOTDONE,JOB_MODE_LIMITED_QUEUE);
767 }
768 
job_get_chunk_blocks(void (* callback)(uint8_t status,void * extra),void * extra,uint64_t chunkid,uint32_t version,uint8_t * blocks)769 uint32_t job_get_chunk_blocks(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint8_t *blocks) {
770 	jobpool* jp = globalpool;
771 	chunk_ij_args *args;
772 	args = malloc(sizeof(chunk_ij_args));
773 	passert(args);
774 	args->chunkid = chunkid;
775 	args->version = version;
776 	args->pointer = blocks;
777 	return job_new(jp,OP_GETBLOCKS,args,callback,extra,MFS_ERROR_NOTDONE,JOB_MODE_LIMITED_QUEUE);
778 }
779 
job_get_chunk_checksum(void (* callback)(uint8_t status,void * extra),void * extra,uint64_t chunkid,uint32_t version,uint8_t * checksum)780 uint32_t job_get_chunk_checksum(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint8_t *checksum) {
781 	jobpool* jp = globalpool;
782 	chunk_ij_args *args;
783 	args = malloc(sizeof(chunk_ij_args));
784 	passert(args);
785 	args->chunkid = chunkid;
786 	args->version = version;
787 	args->pointer = checksum;
788 	return job_new(jp,OP_GETCHECKSUM,args,callback,extra,MFS_ERROR_NOTDONE,JOB_MODE_LIMITED_QUEUE);
789 }
790 
job_get_chunk_checksum_tab(void (* callback)(uint8_t status,void * extra),void * extra,uint64_t chunkid,uint32_t version,uint8_t * checksum_tab)791 uint32_t job_get_chunk_checksum_tab(void (*callback)(uint8_t status,void *extra),void *extra,uint64_t chunkid,uint32_t version,uint8_t *checksum_tab) {
792 	jobpool* jp = globalpool;
793 	chunk_ij_args *args;
794 	args = malloc(sizeof(chunk_ij_args));
795 	passert(args);
796 	args->chunkid = chunkid;
797 	args->version = version;
798 	args->pointer = checksum_tab;
799 	return job_new(jp,OP_GETCHECKSUMTAB,args,callback,extra,MFS_ERROR_NOTDONE,JOB_MODE_LIMITED_QUEUE);
800 }
801 
job_chunk_move(void (* callback)(uint8_t status,void * extra),void * extra,void * fsrc,void * fdst)802 uint32_t job_chunk_move(void (*callback)(uint8_t status,void *extra),void *extra,void *fsrc,void *fdst) {
803 	jobpool* jp = globalpool;
804 	chunk_mv_args *args;
805 	args = malloc(sizeof(chunk_mv_args));
806 	passert(args);
807 	args->fsrc = fsrc;
808 	args->fdst = fdst;
809 	return job_new(jp,OP_CHUNKMOVE,args,callback,extra,MFS_ERROR_NOTDONE,JOB_MODE_LIMITED_QUEUE);
810 }
811 
job_desc(struct pollfd * pdesc,uint32_t * ndesc)812 void job_desc(struct pollfd *pdesc,uint32_t *ndesc) {
813 	uint32_t pos = *ndesc;
814 	jobpool* jp = globalpool;
815 
816 	pdesc[pos].fd = jp->rpipe;
817 	pdesc[pos].events = POLLIN;
818 	jp->fdpdescpos = pos;
819 	pos++;
820 
821 	*ndesc = pos;
822 }
823 
job_serve(struct pollfd * pdesc)824 void job_serve(struct pollfd *pdesc) {
825 	jobpool* jp = globalpool;
826 	uint32_t jobscnt;
827 
828 	if (jp->fdpdescpos>=0 && (pdesc[jp->fdpdescpos].revents & POLLIN)) {
829 		job_pool_check_jobs(1);
830 	}
831 
832 	jobscnt = job_pool_jobs_count();
833 	if (jobscnt>=stats_maxjobscnt) {
834 		stats_maxjobscnt=jobscnt;
835 	}
836 }
837 
838 // can be only HLSTATUS_OK or HLSTATUS_OVERLOADED
839 static uint8_t current_hlstatus = HLSTATUS_OK;
840 
job_get_load_and_hlstatus(uint32_t * load,uint8_t * hlstatus)841 void job_get_load_and_hlstatus(uint32_t *load,uint8_t *hlstatus) {
842 	*load = job_pool_jobs_count();
843 	*hlstatus = current_hlstatus;
844 }
845 
job_heavyload_test(void)846 void job_heavyload_test(void) {
847 	jobpool* jp = globalpool;
848 	uint8_t hlstatus;
849 
850 	zassert(pthread_mutex_lock(&(jp->jobslock)));
851 	hlstatus = HLSTATUS_DEFAULT;
852 	if (jp->workers_total - jp->workers_avail > jp->workers_himark) {
853 		hlstatus = HLSTATUS_OVERLOADED;
854 	}
855 	if (jp->workers_total - jp->workers_avail < jp->workers_lomark) {
856 		hlstatus = HLSTATUS_OK;
857 	}
858 	zassert(pthread_mutex_unlock(&(jp->jobslock)));
859 
860 	if (hlstatus!=HLSTATUS_DEFAULT && hlstatus!=current_hlstatus) {
861 		current_hlstatus = hlstatus;
862 		masterconn_reportload();
863 	}
864 }
865 
866 //void job_wantexit(void) {
867 //	exiting = 1;
868 //}
869 
/* The process may exit only when no jobs are running or queued. */
int job_canexit(void) {
	return job_pool_jobs_count()==0 ? 1 : 0;
}
873 
job_term(void)874 void job_term(void) {
875 	job_pool_delete(globalpool);
876 }
877 
job_reload(void)878 void job_reload(void) {
879 	jobpool* jp = globalpool;
880 
881 	zassert(pthread_mutex_lock(&(jp->jobslock)));
882 
883 	jp->workers_max = cfg_getuint32("WORKERS_MAX",250);
884 	jp->workers_himark = (jp->workers_max * 3) / 4;
885 	jp->workers_lomark = (jp->workers_max * 2) / 4;
886 	jp->workers_max_idle = cfg_getuint32("WORKERS_MAX_IDLE",40);
887 
888 	zassert(pthread_mutex_unlock(&(jp->jobslock)));
889 }
890 
job_init(void)891 int job_init(void) {
892 //	globalpool = (jobpool*)malloc(sizeof(jobpool));
893 //	exiting = 0;
894 	globalpool = job_pool_new();
895 
896 	if (globalpool==NULL) {
897 		return -1;
898 	}
899 	job_reload();
900 
901 	main_destruct_register(job_term);
902 //	main_wantexit_register(job_wantexit);
903 	main_canexit_register(job_canexit);
904 	main_reload_register(job_reload);
905 	main_eachloop_register(job_heavyload_test);
906 	main_poll_register(job_desc,job_serve);
907 	return 0;
908 }
909