1 /* master side of the hmmpgmd daemon
2 * MSF, Thu Aug 12, 2010 [Janelia]
3 */
4 #include "p7_config.h"
5
6 #ifdef HMMER_THREADS
7
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include <string.h>
11 #include <errno.h>
12 #include <unistd.h>
13 #include <signal.h>
14 #include <pthread.h>
15 #include <setjmp.h>
16 #include <sys/socket.h>
17 #ifdef HAVE_NETINET_IN_H
18 #include <netinet/in.h> /* On FreeBSD, you need netinet/in.h for struct sockaddr_in */
19 #endif /* On OpenBSD, netinet/in.h is required for (must precede) arpa/inet.h */
20 #include <arpa/inet.h>
21 #include <syslog.h>
22 #include <assert.h>
23 #include <time.h>
24
25 #ifndef HMMER_THREADS
26 #error "Program requires pthreads be enabled."
27 #endif /*HMMER_THREADS*/
28
29 #include "easel.h"
30 #include "esl_alphabet.h"
31 #include "esl_getopts.h"
32 #include "esl_sq.h"
33 #include "esl_sqio.h"
34 #include "esl_stack.h"
35 #include "esl_stopwatch.h"
36 #include "esl_threads.h"
37
38 #include "hmmer.h"
39 #include "hmmpgmd.h"
40 #include "cachedb.h"
41 #include "p7_hmmcache.h"
42
43 #define MAX_WORKERS 64
44 #define MAX_BUFFER 4096
45
46 #define CONF_FILE "/etc/hmmpgmd.conf"
47
/* Results of one search, merged across workers by gather_results() and
 * sent to the client by forward_results().
 */
typedef struct {
  HMMD_SEARCH_STATS  stats;    /* merged statistics; stats.nhits is the number of valid entries in hits */
  HMMD_SEARCH_STATUS status;   /* status header sent ahead of the stats and hits */
  P7_HIT           **hits;     /* merged hit pointers from all completed workers */
  int                nhits;    /* NOTE: despite the name, gather_results() stores the number of completed worker responses here, not the hit count */
  int                db_inx;   /* start index of the block of a worker that did not complete (used by the retry in process_search()) */
  int                db_cnt;   /* size of that block */
  int                errors;   /* number of workers that did not complete */
} SEARCH_RESULTS;
57
/* State for the client-facing side of the master. */
typedef struct {
  int   sock_fd;       /* socket descriptor; in process_ServerCmd()/clientside_loop() this is the client's connection */
  char  ip_addr[64];   /* client address, copied into each queued command */

  ESL_STACK *cmdstack; /* stack of commands that clients want done */
} CLIENTSIDE_ARGS;
64
/* State shared between the master thread and the worker-side threads.
 * The master changes the worker lists and their counters while holding
 * work_mutex.
 */
typedef struct {
  int sock_fd;                    /* set to -1 in master_process(), then configured by setup_workerside_comm(); presumably the worker listening socket — TODO confirm */

  pthread_mutex_t work_mutex;     /* taken around all worker-list updates and waits below */
  pthread_cond_t  start_cond;     /* broadcast by the master after a new command has been assigned */
  pthread_cond_t  complete_cond;  /* the master waits on this until `completed` reaches the expected count */

  int db_version;                 /* initialized to 1 in master_process(); its use is not visible here */
  P7_SEQCACHE *seq_db;            /* cached sequence databases, or NULL if none loaded */
  P7_HMMCACHE *hmm_db;            /* cached profile database, or NULL if none loaded */

  int ready;                      /* number of workers on the head/tail list */
  int failed;                     /* number of terminated workers still on the idle, pending, or ready lists */
  struct worker_s *head;          /* doubly-linked list of workers that receive search work */
  struct worker_s *tail;

  int pend_cnt;                   /* length of the pending list */
  struct worker_s *pending;       /* workers waiting to join; moved onto the ready list by update_workers() */

  int idle_cnt;                   /* length of the idling list */
  struct worker_s *idling;        /* idle workers; they too are sent the shutdown command */

  RANGE_LIST *range_list;         /* (optional) list of ranges searched within the seqdb */

  int completed;                  /* reset to 0 before each command; presumably incremented by workers as they finish — TODO confirm */
} WORKERSIDE_ARGS;
91
/* Per-worker state; each worker sits on one of the lists in WORKERSIDE_ARGS. */
typedef struct worker_s {
  int sock_fd;                    /* presumably the connection to this worker — TODO confirm */
  char ip_addr[64];

  int completed;                  /* nonzero once this worker finished the current command; read and cleared by gather_results() */
  int terminated;                 /* nonzero once the worker has failed; counted in parent->failed */
  HMMD_COMMAND *cmd;              /* command currently assigned to this worker */

  uint32_t srch_inx;              /* start index of the database block assigned to this worker */
  uint32_t srch_cnt;              /* number of database entries in that block */

  HMMD_SEARCH_STATS stats;        /* this worker's statistics for the current search */
  HMMD_SEARCH_STATUS status;
  char *err_buf;                  /* error text; freed by clear_results() */
  P7_HIT **hits;                  /* this worker's hits; the pointers move to the merged results in gather_results() */
  uint32_t allocated_hits;        /* number of entries of hits destroyed by clear_results()/destroy_worker() */
  int total;                      /* reset to 0 before each command; its use is not visible here */

  WORKERSIDE_ARGS *parent;

  struct worker_s *next;
  struct worker_s *prev;
} WORKER_DATA;
115
116
/* Forward declarations of static helpers defined later in this file. */
static void setup_clientside_comm(ESL_GETOPTS *opts, CLIENTSIDE_ARGS *args);
static void setup_workerside_comm(ESL_GETOPTS *opts, WORKERSIDE_ARGS *args);

static void destroy_worker(WORKER_DATA *worker);

static void init_results(SEARCH_RESULTS *results);
static void clear_results(WORKERSIDE_ARGS *comm, SEARCH_RESULTS *results);
static void gather_results(QUEUE_DATA *query, WORKERSIDE_ARGS *comm, SEARCH_RESULTS *results);
static void forward_results(QUEUE_DATA *query, SEARCH_RESULTS *results);
126
127 static void
print_client_msg(int fd,int status,char * format,va_list ap)128 print_client_msg(int fd, int status, char *format, va_list ap)
129 {
130 uint32_t nalloc =0;
131 uint32_t buf_offset = 0;
132 uint8_t *buf = NULL;
133 char ebuf[512];
134
135 HMMD_SEARCH_STATUS s;
136
137 memset(&s, 0, sizeof(HMMD_SEARCH_STATUS));
138
139 s.status = status;
140 s.msg_size = vsnprintf(ebuf, sizeof(ebuf), format, ap) +1; /* +1 because we send the \0 */
141
142 p7_syslog(LOG_ERR, ebuf);
143
144 if(hmmd_search_status_Serialize(&s, &buf, &buf_offset, &nalloc) != eslOK){
145 LOG_FATAL_MSG("Serializing HMMD_SEARCH_STATUS failed", errno);
146 }
147 /* send back an unsuccessful status message */
148
149 if (writen(fd, buf, buf_offset) != buf_offset) {
150 p7_syslog(LOG_ERR,"[%s:%d] - writing (%d) error %d - %s\n", __FILE__, __LINE__, fd, errno, strerror(errno));
151 return;
152 }
153 if (writen(fd, ebuf, s.msg_size) != s.msg_size) {
154 p7_syslog(LOG_ERR,"[%s:%d] - writing (%d) error %d - %s\n", __FILE__, __LINE__, fd, errno, strerror(errno));
155 return;
156 }
157
158 free(buf);
159 }
160
/* client_msg()
 * printf-style front end to print_client_msg(): formats <format> with the
 * trailing arguments and sends the result, with <status>, to the client
 * on <fd>.
 */
static void
client_msg(int fd, int status, char *format, ...)
{
  va_list args;

  va_start(args, format);
  print_client_msg(fd, status, format, args);
  va_end(args);
}
170
/* client_msg_longjmp()
 * Like client_msg(), but after sending the message it unwinds to the
 * setjmp() point saved in <env> with longjmp(*env, 1). Never returns.
 */
static void
client_msg_longjmp(int fd, int status, jmp_buf *env, char *format, ...)
{
  va_list args;

  va_start(args, format);
  print_client_msg(fd, status, format, args);
  va_end(args);

  longjmp(*env, 1); /* does not return */
}
182
183 static int
validate_workers(WORKERSIDE_ARGS * args)184 validate_workers(WORKERSIDE_ARGS *args)
185 {
186 int ready = 0;
187 int failed = 0;
188 int pending = 0;
189 int idling = 0;
190
191 WORKER_DATA *worker = NULL;
192 WORKER_DATA *tail = NULL;
193
194 /* count the idling workers */
195 worker = args->idling;
196 while (worker != NULL) {
197 ++idling;
198 if (worker->terminated) ++failed;
199 worker = worker->next;
200 }
201 assert(idling == args->idle_cnt);
202
203 /* count the pending workers */
204 worker = args->pending;
205 while (worker != NULL) {
206 ++pending;
207 if (worker->terminated) ++failed;
208 worker = worker->next;
209 }
210 assert(pending == args->pend_cnt);
211
212 if (args->head == NULL && args->tail == NULL) {
213 assert(failed == args->failed);
214 assert(ready == 0);
215 return 1;
216 }
217
218 assert(args->head != NULL && args->tail != NULL);
219 assert(args->head->prev == NULL);
220 assert(args->tail->next == NULL);
221
222 /* count the ready workers */
223 worker = args->head;
224 while (worker != NULL) {
225 ++ready;
226 assert(worker->prev == tail);
227 assert(ready <= args->ready);
228 tail = worker;
229 if (worker->terminated) ++failed;
230 worker = worker->next;
231 }
232 assert(ready == args->ready);
233 assert(failed == args->failed);
234 assert(tail == args->tail);
235
236 return 1;
237 }
238
239 static void
update_workers(WORKERSIDE_ARGS * args)240 update_workers(WORKERSIDE_ARGS *args)
241 {
242 WORKER_DATA *worker = NULL;
243
244 assert(validate_workers(args));
245
246 /* if there are any workers waiting to join, add them */
247 while (args->pending != NULL) {
248 worker = args->pending;
249 args->pending = worker->next;
250
251 worker->next = NULL;
252 if (args->head == NULL) {
253 args->head = worker;
254 worker->prev = NULL;
255 } else {
256 args->tail->next = worker;
257 worker->prev = args->tail;
258 }
259 args->tail = worker;
260
261 args->pend_cnt--;
262 args->ready++;
263 }
264
265 /* remove any workers who have failed */
266 worker = args->head;
267 while (args->failed > 0 && worker != NULL) {
268 WORKER_DATA *next = worker->next;
269 if (worker->terminated) {
270 --args->failed;
271 --args->ready;
272 if (args->head == worker && args->tail == worker) {
273 args->head = NULL;
274 args->tail = NULL;
275 } else if (args->head == worker) {
276 args->head = worker->next;
277 worker->next->prev = NULL;
278 } else if (args->tail == worker) {
279 args->tail = worker->prev;
280 worker->prev->next = NULL;
281 } else {
282 worker->next->prev = worker->prev;
283 worker->prev->next = worker->next;
284 }
285 destroy_worker(worker);
286 }
287 worker = next;
288 }
289
290 assert(validate_workers(args));
291 }
292
293 static void
process_search(WORKERSIDE_ARGS * args,QUEUE_DATA * query)294 process_search(WORKERSIDE_ARGS *args, QUEUE_DATA *query)
295 {
296 ESL_STOPWATCH *w = NULL; /* timer used for profiling statistics */
297 WORKER_DATA *worker = NULL;
298 SEARCH_RESULTS results;
299 int n;
300 int cnt;
301 int inx;
302 int ready_workers; /* counter variable used to track the number of workers currently available to receive work; short for "remaining", I imagine */
303 int tries;
304 int i;
305
306
307 memset(&results, 0, sizeof(SEARCH_RESULTS)); /* avoid valgrind bitching about uninit bytes; remove, if we ever serialize structs properly */
308
309 /* figure out the size of the database we are searching */
310 if (query->cmd_type == HMMD_CMD_SEARCH) {
311 if((args->seq_db == NULL)||(args->seq_db->db == NULL)|| (query->dbx >= args->seq_db->db_cnt) || (query->dbx < 0)){
312 // Client is attempting to search a database that does not exist, complain and abort search
313 client_msg(query->sock, eslFAIL, "Specified sequence database has not been loaded into the daemon. \n");
314 return;
315 }
316 else{
317 cnt = args->seq_db->db[query->dbx].count;
318 }
319 } else {
320 if(args->hmm_db == NULL){
321 // Client is attempting to search a database that does not exist, complain and abort search
322 client_msg(query->sock, eslFAIL, "No HMM database has been loaded into the daemon. \n");
323 return;
324 }
325 else{
326 cnt = args->hmm_db->n;
327 }
328 }
329
330 // start timer after we make sure the relevant database exists to make cleanup easier on error
331 w = esl_stopwatch_Create();
332 esl_stopwatch_Start(w);
333 init_results(&results);
334
335 //if range(s) are given, count how many of the seqdb's sequences are within supplied range(s)
336 if (args->range_list) { // can only happen in HMMD_CMD_SEARCH case
337 int range_cnt = 0; // this will now count how many of the seqs in the db are within the range
338 for (i=0; i<cnt; i++) {
339 if ( hmmpgmd_IsWithinRanges(args->seq_db->list[i].idx, args->range_list ) )
340 range_cnt++;
341 }
342 cnt = range_cnt;
343 }
344
345
346 inx = 0;
347 tries = 0;
348 do {
349 /* process any changes to the available workers */
350 if ((n = pthread_mutex_lock (&args->work_mutex)) != 0) LOG_FATAL_MSG("mutex lock", n);
351
352 /* build a list of the currently available workers */
353 update_workers(args);
354
355 /* if there are no workers, report an error */
356 if (args->ready > 0) {
357 ready_workers = args->ready;
358
359 /* update the workers search information */
360 worker = args->head;
361
362 while (worker != NULL) {
363 worker->cmd = query->cmd;
364 worker->completed = 0;
365 worker->total = 0;
366
367 /* assign each worker a portion of the database */
368 worker->srch_inx = inx;
369 if (args->range_list) {
370 // if ranges are given, need to split the db list based on which elements in the list are within the given range(s)
371 int goal = cnt / ready_workers; //how many within-range sequences do I want to ask this worker to handle
372 int curr = 0; //how many within-range sequences have I seen since the start of this full-db range
373 worker->srch_cnt = 0;
374 while (curr < goal) {
375 if ( hmmpgmd_IsWithinRanges (args->seq_db->list[inx].idx, args->range_list ) )
376 curr++;
377 worker->srch_cnt++;
378 inx++;
379 }
380 cnt -= curr;
381 } else {
382 // default - split evenly among workers
383 worker->srch_cnt = cnt / ready_workers;
384 inx += worker->srch_cnt;
385 cnt -= worker->srch_cnt;
386 }
387
388 --ready_workers;
389 worker = worker->next;
390 }
391
392 args->completed = 0;
393
394 /* notify all the worker threads of the new query */
395 if ((n = pthread_cond_broadcast(&args->start_cond)) != 0) LOG_FATAL_MSG("cond broadcast", n);
396 }
397
398 if ((n = pthread_mutex_unlock (&args->work_mutex)) != 0) LOG_FATAL_MSG("mutex unlock", n);
399
400 if (args->ready > 0) {
401 /* Wait for all the workers to complete */
402 if ((n = pthread_mutex_lock (&args->work_mutex)) != 0) LOG_FATAL_MSG("mutex lock", n);
403
404 while (args->completed < args->ready) {
405 if ((n = pthread_cond_wait (&args->complete_cond, &args->work_mutex)) != 0) LOG_FATAL_MSG("cond wait", n);
406 }
407
408 if ((n = pthread_mutex_unlock (&args->work_mutex)) != 0) LOG_FATAL_MSG("mutex unlock", n);
409 }
410
411 /* gather up the results from all the workers */
412 gather_results(query, args, &results);
413
414 /* we can recover from one worker crashing. get the block that worker ran on
415 * and redistribute its load to all the remaining workers.
416 */
417 inx = results.db_inx;
418 cnt = results.db_cnt;
419 ++tries;
420
421 } while (args->ready > 0 && results.errors == 1 && tries < 2);
422
423
424 esl_stopwatch_Stop(w);
425
426 /* copy the search stats */
427 results.stats.elapsed = w->elapsed;
428 results.stats.user = w->user;
429 results.stats.sys = w->sys;
430 results.stats.hit_offsets = NULL; // set this to make sure we allocate memory later
431 /* TODO: check for errors */
432 if (args->ready == 0) {
433 client_msg(query->sock, eslFAIL, "No compute nodes available\n");
434 } else if (args->failed > 0) {
435 client_msg(query->sock, eslFAIL, "Errors running search\n");
436 clear_results(args, &results);
437 } else {
438 forward_results(query, &results);
439 }
440
441 esl_stopwatch_Destroy(w);
442 }
443
444 static void
process_shutdown(WORKERSIDE_ARGS * args,QUEUE_DATA * query)445 process_shutdown(WORKERSIDE_ARGS *args, QUEUE_DATA *query)
446 {
447 int n;
448 int cnt;
449
450 HMMD_COMMAND cmd;
451
452 WORKER_DATA *worker = NULL;
453
454 /* process any changes to the available workers */
455 if ((n = pthread_mutex_lock (&args->work_mutex)) != 0) LOG_FATAL_MSG("mutex lock", n);
456
457 /* build a list of the currently available workers */
458 update_workers(args);
459
460 /* reset all the idle and active workers */
461 cnt = 0;
462
463 /* build a reset command */
464 cmd.hdr.length = 0;
465 cmd.hdr.command = HMMD_CMD_SHUTDOWN;
466
467 /* look for the active workers to shutdown */
468 worker = args->head;
469 while (worker != NULL) {
470 worker->cmd = &cmd;
471 worker->completed = 0;
472 worker->total = 0;
473
474 worker = worker->next;
475 ++cnt;
476 }
477
478 /* look for the idle workers to shutdown */
479 worker = args->idling;
480 while (worker != NULL) {
481 worker->cmd = &cmd;
482 worker->completed = 0;
483 worker->total = 0;
484
485 worker = worker->next;
486 ++cnt;
487 }
488
489 /* check if there are any workers to shutdown */
490 if (cnt > 0) {
491 args->completed = 0;
492
493 /* notify all the worker threads of the new query */
494 if ((n = pthread_cond_broadcast(&args->start_cond)) != 0) LOG_FATAL_MSG("cond broadcast", n);
495 if ((n = pthread_mutex_unlock (&args->work_mutex)) != 0) LOG_FATAL_MSG("mutex unlock", n);
496
497 /* Wait for all the workers to complete */
498 if ((n = pthread_mutex_lock (&args->work_mutex)) != 0) LOG_FATAL_MSG("mutex lock", n);
499
500 while (args->completed < cnt) {
501 if ((n = pthread_cond_wait (&args->complete_cond, &args->work_mutex)) != 0) LOG_FATAL_MSG("cond wait", n);
502 }
503 }
504
505 /* build a list of the currently available workers */
506 update_workers(args);
507
508 if ((n = pthread_mutex_unlock (&args->work_mutex)) != 0) LOG_FATAL_MSG("mutex unlock", n);
509 }
510
511
512 void
master_process(ESL_GETOPTS * go)513 master_process(ESL_GETOPTS *go)
514 {
515 P7_SEQCACHE *seq_db = NULL;
516 P7_HMMCACHE *hmm_db = NULL;
517 ESL_STACK *cmdstack = NULL; /* stack of commands that clients want done */
518 QUEUE_DATA *query = NULL;
519 CLIENTSIDE_ARGS client_comm;
520 WORKERSIDE_ARGS worker_comm;
521 int n;
522 int shutdown;
523 char errbuf[eslERRBUFSIZE];
524 int status = eslOK;
525
526 impl_Init();
527 p7_FLogsumInit(); /* we're going to use table-driven Logsum() approximations at times */
528
529 if (esl_opt_IsUsed(go, "--seqdb")) {
530 char *name = esl_opt_GetString(go, "--seqdb");
531 if ((status = p7_seqcache_Open(name, &seq_db, errbuf)) != eslOK)
532 p7_Fail("Failed to cache %s (%d)", name, status);
533
534 }
535
536 if (esl_opt_IsUsed(go, "--hmmdb")) {
537 char *name = esl_opt_GetString(go, "--hmmdb");
538
539 status = p7_hmmcache_Open(name, &hmm_db, errbuf);
540 if (status == eslENOTFOUND) p7_Fail("Failed to open profile database %s\n %s\n", name, errbuf);
541 else if (status == eslEFORMAT) p7_Fail("Failed to parse profile database %s\n %s\n", name, errbuf);
542 else if (status == eslEINCOMPAT) p7_Fail("Mismatched alphabets in profile db %s\n %s\n", name, errbuf);
543 else if (status != eslOK) p7_Fail("Failed to load profile db %s : code %d\n", name, status);
544
545 p7_hmmcache_SetNumericNames(hmm_db);
546
547 printf("Loaded profile db %s; models: %d memory: %" PRId64 "\n",
548 name, hmm_db->n, (uint64_t) p7_hmmcache_Sizeof(hmm_db));
549 }
550
551 /* if stdout is redirected at the commandline, it causes printf's to be buffered,
552 * which means status logging isn't printed. This line strongly requests unbuffering,
553 * which should be ok, given the low stdout load of hmmpgmd
554 */
555 setvbuf (stdout, NULL, _IONBF, BUFSIZ);
556 printf("Data loaded into memory. Master is ready.\n");
557 setvbuf (stdout, NULL, _IOFBF, BUFSIZ);
558
559 /* initialize the search stack, set it up for interthread communication */
560 cmdstack = esl_stack_PCreate();
561 esl_stack_UseMutex(cmdstack);
562 esl_stack_UseCond(cmdstack);
563
564 /* start the communications with the web clients */
565 client_comm.cmdstack = cmdstack;
566 setup_clientside_comm(go, &client_comm);
567
568 /* initialize the worker structure */
569 if ((n = pthread_mutex_init(&worker_comm.work_mutex, NULL)) != 0) LOG_FATAL_MSG("mutex init", n);
570 if ((n = pthread_cond_init(&worker_comm.start_cond, NULL)) != 0) LOG_FATAL_MSG("cond init", n);
571 if ((n = pthread_cond_init(&worker_comm.complete_cond, NULL)) != 0) LOG_FATAL_MSG("cond init", n);
572
573 worker_comm.sock_fd = -1;
574 worker_comm.head = NULL;
575 worker_comm.tail = NULL;
576 worker_comm.pending = NULL;
577 worker_comm.idling = NULL;
578 worker_comm.seq_db = seq_db;
579 worker_comm.hmm_db = hmm_db;
580 worker_comm.db_version = 1;
581
582 worker_comm.ready = 0;
583 worker_comm.failed = 0;
584 worker_comm.pend_cnt = 0;
585 worker_comm.idle_cnt = 0;
586
587 setup_workerside_comm(go, &worker_comm);
588
589 /* read query hmm/sequence
590 * the PPop() will wait until a client pushes a command to the queue
591 */
592 shutdown = 0;
593 while (!shutdown && esl_stack_PPop(cmdstack, (void **) &query) == eslOK) {
594 printf("Processing command %d from %s\n", query->cmd_type, query->ip_addr);
595 fflush(stdout);
596
597 worker_comm.range_list = NULL;
598
599 switch(query->cmd_type) {
600 case HMMD_CMD_SEARCH:
601 if (esl_opt_IsUsed(query->opts, "--seqdb_ranges")) {
602 ESL_ALLOC(worker_comm.range_list, sizeof(RANGE_LIST));
603 hmmpgmd_GetRanges(worker_comm.range_list, esl_opt_GetString(query->opts, "--seqdb_ranges"));
604 }
605 process_search(&worker_comm, query);
606 break;
607 case HMMD_CMD_SCAN: process_search(&worker_comm, query); break;
608 case HMMD_CMD_SHUTDOWN:
609 process_shutdown(&worker_comm, query);
610 p7_syslog(LOG_ERR,"[%s:%d] - shutting down...\n", __FILE__, __LINE__);
611 shutdown = 1;
612 break;
613 default:
614 p7_syslog(LOG_ERR,"[%s:%d] - unknown command %d from %s\n", __FILE__, __LINE__, query->cmd_type, query->ip_addr);
615 break;
616 }
617
618 free_QueueData(query);
619 }
620
621 esl_stack_ReleaseCond(cmdstack);
622
623 if (hmm_db) p7_hmmcache_Close(hmm_db);
624 if (seq_db) p7_seqcache_Close(seq_db);
625
626 esl_stack_Destroy(cmdstack);
627
628 pthread_mutex_destroy(&worker_comm.work_mutex);
629 pthread_cond_destroy(&worker_comm.start_cond);
630 pthread_cond_destroy(&worker_comm.complete_cond);
631
632
633 if (worker_comm.range_list) {
634 if (worker_comm.range_list->starts) free(worker_comm.range_list->starts);
635 if (worker_comm.range_list->ends) free(worker_comm.range_list->ends);
636 free (worker_comm.range_list);
637 }
638
639 return;
640
641
642 ERROR:
643 p7_Fail("Memory allocation error. Code: %d\n", status);
644
645 }
646
647
648 // Qsort comparison function to sort a list of pointers to P7_HITs
649 static int
hit_sorter2(const void * p1,const void * p2)650 hit_sorter2(const void *p1, const void *p2)
651 {
652 int cmp;
653
654 const P7_HIT *h1 = *((P7_HIT **) p1);
655 const P7_HIT *h2 = *((P7_HIT **) p2);
656
657 cmp = (h1->sortkey < h2->sortkey);
658 cmp -= (h1->sortkey > h2->sortkey);
659
660 return cmp;
661 }
662
663 static void
init_results(SEARCH_RESULTS * results)664 init_results(SEARCH_RESULTS *results)
665 {
666 results->status.status = eslOK;
667 results->status.msg_size = 0;
668
669 results->stats.nhits = 0;
670 results->stats.nreported = 0;
671 results->stats.nincluded = 0;
672
673 results->stats.nmodels = 0;
674 results->stats.nseqs = 0;
675 results->stats.n_past_msv = 0;
676 results->stats.n_past_bias = 0;
677 results->stats.n_past_vit = 0;
678 results->stats.n_past_fwd = 0;
679 results->stats.Z = 0;
680
681 results->hits = NULL;
682 results->stats.hit_offsets = NULL;
683 results->nhits = 0;
684 results->db_inx = 0;
685 results->db_cnt = 0;
686 results->errors = 0;
687 }
688
689 static void
gather_results(QUEUE_DATA * query,WORKERSIDE_ARGS * comm,SEARCH_RESULTS * results)690 gather_results(QUEUE_DATA *query, WORKERSIDE_ARGS *comm, SEARCH_RESULTS *results)
691 {
692 int cnt;
693 int n;
694
695 WORKER_DATA *worker;
696
697 /* allocate spaces to hold all the hits */
698 cnt = results->nhits + MAX_WORKERS;
699 if ((results->hits = realloc(results->hits, sizeof(P7_HIT *) * cnt)) == NULL) LOG_FATAL_MSG("malloc", errno);
700
701 /* lock the workers until we have merged the results */
702 if ((n = pthread_mutex_lock (&comm->work_mutex)) != 0) LOG_FATAL_MSG("mutex lock", n);
703
704 /* count the number of hits */
705 cnt = results->nhits;
706 worker = comm->head;
707 while (worker != NULL) {
708 if (worker->completed) {
709 uint32_t previous_hits = results->stats.nhits;
710
711 results->stats.nhits += worker->stats.nhits;
712 results->stats.nreported += worker->stats.nreported;
713 results->stats.nincluded += worker->stats.nincluded;
714
715 results->stats.n_past_msv += worker->stats.n_past_msv;
716 results->stats.n_past_bias += worker->stats.n_past_bias;
717 results->stats.n_past_vit += worker->stats.n_past_vit;
718 results->stats.n_past_fwd += worker->stats.n_past_fwd;
719
720 results->stats.Z_setby = worker->stats.Z_setby;
721 results->stats.domZ_setby = worker->stats.domZ_setby;
722 results->stats.domZ = worker->stats.domZ;
723 results->stats.Z = worker->stats.Z;
724
725 results->status.msg_size += worker->status.msg_size - sizeof(HMMD_SEARCH_STATS);
726
727 if((results->stats.nhits- previous_hits) >0){ // There are new hits to deal with
728 // Add enough space to the list of hits for all the hits from this worker
729 results->hits = realloc(results->hits, results->stats.nhits * sizeof (P7_HIT *));
730 if(results->hits == NULL){
731 LOG_FATAL_MSG("malloc", n);
732 }
733
734 // copy this worker's hits into the global list
735 for(int i0 = 0, i1 = previous_hits; i1 < results->stats.nhits; i0++, i1++){
736 results->hits[i1] = worker->hits[i0];
737 }
738
739 free(worker->hits); // Free the worker's array of pointers to hits. The hits themselves
740 // will be freed by forward_results()
741
742 worker->hits = NULL;
743 }
744 worker->completed = 0;
745 ++cnt;
746 } else {
747 results->errors++;
748 results->db_inx = worker->srch_inx;
749 results->db_cnt = worker->srch_cnt;
750 }
751
752 worker = worker->next;
753 }
754
755 if ((n = pthread_mutex_unlock (&comm->work_mutex)) != 0) LOG_FATAL_MSG("mutex unlock", n);
756
757 if (query->cmd_type == HMMD_CMD_SEARCH) {
758 results->stats.nmodels = 1;
759 results->stats.nseqs = comm->seq_db->db[query->dbx].K;
760 } else {
761 results->stats.nseqs = 1;
762 results->stats.nmodels = comm->hmm_db->n;
763 }
764
765 if (results->stats.Z_setby == p7_ZSETBY_NTARGETS) {
766 results->stats.Z = (query->cmd_type == HMMD_CMD_SEARCH) ? results->stats.nseqs : results->stats.nmodels;
767 }
768
769 results->nhits = cnt;
770 }
771
772 static void
forward_results(QUEUE_DATA * query,SEARCH_RESULTS * results)773 forward_results(QUEUE_DATA *query, SEARCH_RESULTS *results)
774 {
775 P7_TOPHITS th;
776 P7_PIPELINE *pli = NULL;
777 P7_DOMAIN **dcl = NULL;
778 P7_HIT *hits = NULL;
779 int fd, n;
780 uint8_t **buf, **buf2, **buf3, *buf_ptr, *buf2_ptr, *buf3_ptr;
781 uint32_t nalloc, nalloc2, nalloc3, buf_offset, buf_offset2, buf_offset3;
782 enum p7_pipemodes_e mode;
783
784 // Initialize these pointers-to-pointers that we'll use for sending data
785 buf_ptr = NULL;
786 buf = &(buf_ptr);
787 buf2_ptr = NULL;
788 buf2 = &(buf2_ptr);
789 buf3_ptr = NULL;
790 buf3 = &(buf3_ptr);
791
792 fd = query->sock;
793
794 if (query->cmd_type == HMMD_CMD_SEARCH) mode = p7_SEARCH_SEQS;
795 else mode = p7_SCAN_MODELS;
796
797 /* sort the hits and apply score and E-value thresholds */
798 if (results->nhits > 0) {
799 if(results->stats.hit_offsets != NULL){
800 if ((results->stats.hit_offsets = realloc(results->stats.hit_offsets, results->stats.nhits * sizeof(uint64_t))) == NULL) LOG_FATAL_MSG("malloc", errno);
801 }
802 else{
803 if ((results->stats.hit_offsets = malloc(results->stats.nhits * sizeof(uint64_t))) == NULL) LOG_FATAL_MSG("malloc", errno);
804 }
805
806 // sort the hits
807 qsort(results->hits, results->stats.nhits, sizeof(P7_HIT *), hit_sorter2);
808
809 th.unsrt = NULL;
810 th.N = results->stats.nhits;
811 th.nreported = 0;
812 th.nincluded = 0;
813 th.is_sorted_by_sortkey = 0;
814 th.is_sorted_by_seqidx = 0;
815
816 pli = p7_pipeline_Create(query->opts, 100, 100, FALSE, mode);
817 pli->nmodels = results->stats.nmodels;
818 pli->nseqs = results->stats.nseqs;
819 pli->n_past_msv = results->stats.n_past_msv;
820 pli->n_past_bias = results->stats.n_past_bias;
821 pli->n_past_vit = results->stats.n_past_vit;
822 pli->n_past_fwd = results->stats.n_past_fwd;
823
824 pli->Z = results->stats.Z;
825 pli->domZ = results->stats.domZ;
826 pli->Z_setby = results->stats.Z_setby;
827 pli->domZ_setby = results->stats.domZ_setby;
828
829
830 th.hit = results->hits;
831
832 p7_tophits_Threshold(&th, pli);
833
834 /* after the top hits thresholds are checked, the number of sequences
835 * and domains to be reported can change. */
836 results->stats.nreported = th.nreported;
837 results->stats.nincluded = th.nincluded;
838 results->stats.domZ = pli->domZ;
839 results->stats.Z = pli->Z;
840 }
841
842 /* Build the buffers of serialized results we'll send back to the client.
843 Use three buffers, one for each object, because we need to build them in reverse order.
844 We need to serialize the hits to build the hits_offset array in HMMD_SEARCH_STATS.
845 We need the length of the serialized hits and HMMD_SEARCH_STATS objects to fill out the msg_size
846 field in status, but we want to send status, then stats, then hits */
847
848 nalloc = 0;
849 buf_offset = 0;
850
851 // First, the buffer of hits
852 for(int i =0; i< results->stats.nhits; i++){
853
854 results->stats.hit_offsets[i] = buf_offset;
855 if(p7_hit_Serialize(results->hits[i], buf, &buf_offset, &nalloc) != eslOK){
856 LOG_FATAL_MSG("Serializing P7_HIT failed", errno);
857 }
858
859 }
860 if(results->stats.nhits == 0){
861 results->stats.hit_offsets = NULL;
862 }
863
864 // Second, the buffer with the HMMD_SEARCH_STATS object
865
866 buf_offset2 = 0;
867 nalloc2 = 0;
868 if(p7_hmmd_search_stats_Serialize(&(results->stats), buf2, &buf_offset2, &nalloc2) != eslOK){
869 LOG_FATAL_MSG("Serializing HMMD_SEARCH_STATS failed", errno);
870 }
871
872 results->status.msg_size = buf_offset + buf_offset2; // set size of second message
873
874 // Third, the buffer with the HMMD_SEARCH_STATUS object
875 buf_offset3 = 0;
876 nalloc3 = 0;
877 if(hmmd_search_status_Serialize(&(results->status), buf3, &buf_offset3, &nalloc3) != eslOK){
878 LOG_FATAL_MSG("Serializing HMMD_SEARCH_STATUS failed", errno);
879 }
880
881 // Now, send the buffers in the reverse of the order they were built
882 /* send back a successful status message */
883 n = buf_offset3;
884
885 if (writen(fd, buf3_ptr, n) != n) {
886 p7_syslog(LOG_ERR,"[%s:%d] - writing %s error %d - %s\n", __FILE__, __LINE__, query->ip_addr, errno, strerror(errno));
887 goto CLEAR;
888 }
889
890 // and the stats object
891 n=buf_offset2;
892
893 if (writen(fd, buf2_ptr, n) != n) {
894 p7_syslog(LOG_ERR,"[%s:%d] - writing %s error %d - %s\n", __FILE__, __LINE__, query->ip_addr, errno, strerror(errno));
895 goto CLEAR;
896 }
897 printf("%p\n", results->hits[1]);
898 // and finally the hits
899 n=buf_offset;
900
901 if (writen(fd, buf_ptr, n) != n) {
902 p7_syslog(LOG_ERR,"[%s:%d] - writing %s error %d - %s\n", __FILE__, __LINE__, query->ip_addr, errno, strerror(errno));
903 goto CLEAR;
904 }
905 printf("Results for %s (%d) sent %" PRId64 " bytes\n", query->ip_addr, fd, results->status.msg_size);
906 printf("Hits:%"PRId64 " reported:%" PRId64 " included:%"PRId64 "\n", results->stats.nhits, results->stats.nreported, results->stats.nincluded);
907 fflush(stdout);
908
909 CLEAR:
910 /* free all the data */
911 for(int i = 0; i < results->stats.nhits; i++){
912 p7_hit_Destroy(results->hits[i]);
913 }
914
915 free(results->hits);
916 results->hits = NULL;
917
918 if (pli) p7_pipeline_Destroy(pli);
919 if (hits) free(hits);
920 if (dcl) free(dcl);
921 if(buf_ptr != NULL){
922 free(buf_ptr);
923 }
924 if(buf2_ptr != NULL){
925 free(buf2_ptr);
926 }
927 if(buf3_ptr != NULL){
928 free(buf3_ptr);
929 }
930 if(results->stats.hit_offsets != NULL){
931 free(results->stats.hit_offsets);
932 }
933 init_results(results);
934 return;
935 }
936
937 static void
destroy_worker(WORKER_DATA * worker)938 destroy_worker(WORKER_DATA *worker)
939 {
940 if (worker == NULL) {
941 if (worker->err_buf != NULL) free(worker->err_buf);
942 if (worker->hits != NULL){
943 for(int i = 0; i < worker->allocated_hits; i++){
944 p7_hit_Destroy(worker->hits[i]);
945 }
946 free(worker->hits);
947 }
948 memset(worker, 0, sizeof(WORKER_DATA));
949 free(worker);
950 }
951 }
952
953 static void
clear_results(WORKERSIDE_ARGS * args,SEARCH_RESULTS * results)954 clear_results(WORKERSIDE_ARGS *args, SEARCH_RESULTS *results)
955 {
956 int i;
957 int n;
958 WORKER_DATA *worker;
959
960 /* lock the workers until we have freed the results */
961 if ((n = pthread_mutex_lock (&args->work_mutex)) != 0) LOG_FATAL_MSG("mutex lock", n);
962
963 assert(validate_workers(args));
964
965 /* free all the results */
966 worker = args->head;
967 while (worker != NULL) {
968 if (worker->err_buf != NULL) free(worker->err_buf);
969 if(worker->hits != NULL){
970 for(int i =0; i < worker->allocated_hits; i++){
971 p7_hit_Destroy(worker->hits[i]);
972 }
973 free(worker->hits);
974 worker->hits = NULL;
975 }
976 worker->err_buf = NULL;
977
978 worker->completed = 0;
979 worker = worker->next;
980 }
981
982 if ((n = pthread_mutex_unlock (&args->work_mutex)) != 0) LOG_FATAL_MSG("mutex unlock", n);
983
984 for (i = 0; i < results->nhits; ++i) {
985 if (results->hits[i] != NULL) p7_hit_Destroy(results->hits[i]);
986 results->hits[i] = NULL;
987 }
988
989 if (results->hits != NULL) free(results->hits);
990 init_results(results);
991 }
992
993 static void
process_ServerCmd(char * ptr,CLIENTSIDE_ARGS * data)994 process_ServerCmd(char *ptr, CLIENTSIDE_ARGS *data)
995 {
996 QUEUE_DATA *parms = NULL; /* cmd to queue */
997 HMMD_COMMAND *cmd = NULL; /* parsed cmd to process */
998 int fd = data->sock_fd;
999 ESL_STACK *cmdstack = data->cmdstack;
1000 char *s;
1001 time_t date;
1002 char timestamp[32];
1003
1004 /* skip leading white spaces */
1005 ++ptr;
1006 while (*ptr == ' ' || *ptr == '\t') ++ptr;
1007
1008 /* skip to the end of the line */
1009 s = ptr;
1010 while (*s && (*s != '\n' && *s != '\r')) ++s;
1011 *s = 0;
1012
1013 /* process the different commands */
1014 s = strsep(&ptr, " \t");
1015 if (strcmp(s, "shutdown") == 0)
1016 {
1017 if ((cmd = malloc(sizeof(HMMD_HEADER))) == NULL) LOG_FATAL_MSG("malloc", errno);
1018 memset(cmd, 0, sizeof(HMMD_HEADER)); /* avoid uninit bytes & valgrind bitching. Remove, if we ever serialize structs correctly. */
1019 cmd->hdr.length = 0;
1020 cmd->hdr.command = HMMD_CMD_SHUTDOWN;
1021 }
1022 else
1023 {
1024 client_msg(fd, eslEINVAL, "Unknown command %s\n", s);
1025 return;
1026 }
1027
1028 if ((parms = malloc(sizeof(QUEUE_DATA))) == NULL) LOG_FATAL_MSG("malloc", errno);
1029 memset(parms, 0, sizeof(QUEUE_DATA)); /* avoid valgrind bitches about uninit bytes; remove if structs are serialized properly */
1030
1031 parms->hmm = NULL;
1032 parms->seq = NULL;
1033 parms->abc = NULL;
1034 parms->opts = NULL;
1035 parms->dbx = -1;
1036 parms->cmd = cmd;
1037
1038 strcpy(parms->ip_addr, data->ip_addr);
1039 parms->sock = fd;
1040 parms->cmd_type = cmd->hdr.command;
1041 parms->query_type = 0;
1042
1043 date = time(NULL);
1044 ctime_r(&date, timestamp);
1045 printf("\n%s", timestamp); /* note ctime_r() leaves \n on end of timestamp */
1046 printf("Queuing command %d from %s (%d)\n", cmd->hdr.command, parms->ip_addr, parms->sock);
1047 fflush(stdout);
1048
1049 esl_stack_PPush(cmdstack, parms);
1050 }
1051
1052 static int
clientside_loop(CLIENTSIDE_ARGS * data)1053 clientside_loop(CLIENTSIDE_ARGS *data)
1054 {
1055 int status;
1056
1057 char *ptr;
1058 char *buffer;
1059 char opt_str[MAX_BUFFER];
1060
1061 int dbx;
1062 int buf_size;
1063 int remaining;
1064 int amount;
1065 int eod;
1066 int n;
1067
1068 P7_HMM *hmm = NULL; /* query HMM */
1069 ESL_SQ *seq = NULL; /* query sequence */
1070 ESL_SCOREMATRIX *sco = NULL; /* scoring matrix */
1071 P7_HMMFILE *hfp = NULL;
1072 ESL_ALPHABET *abc = NULL; /* digital alphabet */
1073 ESL_GETOPTS *opts = NULL; /* search specific options */
1074 HMMD_COMMAND *cmd = NULL; /* search cmd to send to workers */
1075
1076 ESL_STACK *cmdstack = data->cmdstack;
1077 QUEUE_DATA *parms;
1078 jmp_buf jmp_env;
1079 time_t date;
1080 char timestamp[32];
1081
1082 buf_size = MAX_BUFFER;
1083 if ((buffer = malloc(buf_size)) == NULL) LOG_FATAL_MSG("malloc", errno);
1084 ptr = buffer;
1085 remaining = buf_size;
1086 amount = 0;
1087
1088 eod = 0;
1089 while (!eod) {
1090 int l;
1091 char *s;
1092
1093 /* Receive message from client */
1094 if ((n = read(data->sock_fd, ptr, remaining)) < 0) {
1095 p7_syslog(LOG_ERR,"[%s:%d] - reading %s error %d - %s\n", __FILE__, __LINE__, data->ip_addr, errno, strerror(errno));
1096 return 1;
1097 }
1098
1099 if (n == 0) return 1;
1100
1101 ptr += n;
1102 amount += n;
1103 remaining -= n;
1104
1105 /* scan backwards till we hit the start of the line */
1106 l = amount;
1107 s = ptr - 1;
1108 while (l-- > 0 && (*s == '\n' || *s == '\r')) --s;
1109 while (l-- > 0 && (*s != '\n' && *s != '\r')) --s;
1110 eod = (amount > 1 && *(s + 1) == '/' && *(s + 2) == '/' );
1111
1112 /* if the buffer is full, make it larger */
1113 if (!eod && remaining == 0) {
1114 if ((buffer = realloc(buffer, buf_size * 2)) == NULL) LOG_FATAL_MSG("realloc", errno);
1115 ptr = buffer + buf_size;
1116 remaining = buf_size;
1117 buf_size *= 2;
1118 }
1119 }
1120
1121 /* zero terminate the buffer */
1122 if (remaining == 0) {
1123 if ((buffer = realloc(buffer, buf_size + 1)) == NULL) LOG_FATAL_MSG("realloc", errno);
1124 ptr = buffer + buf_size;
1125 }
1126 *ptr = 0;
1127
1128 /* skip all leading white spaces */
1129 ptr = buffer;
1130 while (*ptr && isspace(*ptr)) ++ptr;
1131
1132 opt_str[0] = 0;
1133 if (*ptr == '!') {
1134 process_ServerCmd(ptr, data);
1135 free(buffer);
1136 return 0;
1137 } else if (*ptr == '@') {
1138 char *s = ++ptr;
1139
1140 /* skip to the end of the line */
1141 while (*ptr && (*ptr != '\n' && *ptr != '\r')) ++ptr;
1142 *ptr++ = 0;
1143
1144 /* create a commandline string with dummy program name for
1145 * the esl_opt_ProcessSpoof() function to parse.
1146 */
1147 snprintf(opt_str, sizeof(opt_str), "hmmpgmd %s\n", s);
1148
1149 /* skip remaining white spaces */
1150 while (*ptr && isspace(*ptr)) ++ptr;
1151 } else {
1152 client_msg(data->sock_fd, eslEFORMAT, "Missing options string");
1153 free(buffer);
1154 return 0;
1155 }
1156
1157 if (strncmp(ptr, "//", 2) == 0) {
1158 client_msg(data->sock_fd, eslEFORMAT, "Missing search sequence/hmm");
1159 free(buffer);
1160 return 0;
1161 }
1162
1163 if (!setjmp(jmp_env)) {
1164 dbx = 0;
1165
1166 status = process_searchopts(data->sock_fd, opt_str, &opts);
1167 if (status != eslOK) {
1168 client_msg_longjmp(data->sock_fd, status, &jmp_env, "Failed to parse options string: %s", opts->errbuf);
1169 }
1170
1171 /* the options string can handle an optional database */
1172 if (esl_opt_ArgNumber(opts) > 0) {
1173 client_msg_longjmp(data->sock_fd, status, &jmp_env, "Incorrect number of command line arguments.");
1174 }
1175
1176 if (esl_opt_IsUsed(opts, "--seqdb")) {
1177 dbx = esl_opt_GetInteger(opts, "--seqdb");
1178 } else if (esl_opt_IsUsed(opts, "--hmmdb")) {
1179 dbx = esl_opt_GetInteger(opts, "--hmmdb");
1180 } else {
1181 client_msg_longjmp(data->sock_fd, eslEINVAL, &jmp_env, "No search database specified, --seqdb or --hmmdb.");
1182 }
1183
1184
1185 abc = esl_alphabet_Create(eslAMINO);
1186 seq = NULL;
1187 hmm = NULL;
1188
1189 if (*ptr == '>') {
1190 /* try to parse the input buffer as a FASTA sequence */
1191 seq = esl_sq_CreateDigital(abc);
1192 /* try to parse the input buffer as a FASTA sequence */
1193 status = esl_sqio_Parse(ptr, strlen(ptr), seq, eslSQFILE_DAEMON);
1194 if (status != eslOK) client_msg_longjmp(data->sock_fd, status, &jmp_env, "Error parsing FASTA sequence");
1195 if (seq->n < 1) client_msg_longjmp(data->sock_fd, eslEFORMAT, &jmp_env, "Error zero length FASTA sequence");
1196
1197 } else if (strncmp(ptr, "HMM", 3) == 0) {
1198 if (esl_opt_IsUsed(opts, "--hmmdb")) {
1199 client_msg_longjmp(data->sock_fd, status, &jmp_env, "A HMM cannot be used to search a hmm database");
1200 }
1201
1202 /* try to parse the buffer as an hmm */
1203 status = p7_hmmfile_OpenBuffer(ptr, strlen(ptr), &hfp);
1204 if (status != eslOK) client_msg_longjmp(data->sock_fd, status, &jmp_env, "Failed to open query hmm buffer");
1205
1206 status = p7_hmmfile_Read(hfp, &abc, &hmm);
1207 if (status != eslOK) client_msg_longjmp(data->sock_fd, status, &jmp_env, "Error reading query hmm: %s", hfp->errbuf);
1208
1209 p7_hmmfile_Close(hfp);
1210
1211 } else {
1212 /* no idea what we are trying to parse */
1213 client_msg_longjmp(data->sock_fd, eslEFORMAT, &jmp_env, "Unknown query sequence/hmm format");
1214 }
1215 } else {
1216 /* an error occured some where, so try to clean up */
1217 if (opts != NULL) esl_getopts_Destroy(opts);
1218 if (abc != NULL) esl_alphabet_Destroy(abc);
1219 if (hmm != NULL) p7_hmm_Destroy(hmm);
1220 if (seq != NULL) esl_sq_Destroy(seq);
1221 if (sco != NULL) esl_scorematrix_Destroy(sco);
1222
1223 free(buffer);
1224 return 0;
1225 }
1226
1227 if ((parms = malloc(sizeof(QUEUE_DATA))) == NULL) LOG_FATAL_MSG("malloc", errno);
1228
1229 /* build the search structure that will be sent to all the workers */
1230 n = sizeof(HMMD_COMMAND);
1231 n = n + strlen(opt_str) + 1;
1232
1233 if (seq != NULL) {
1234 n = n + strlen(seq->name) + 1;
1235 n = n + strlen(seq->desc) + 1;
1236 n = n + seq->n + 2;
1237 } else {
1238 n = n + sizeof(P7_HMM);
1239 n = n + sizeof(float) * (hmm->M + 1) * p7H_NTRANSITIONS;
1240 n = n + sizeof(float) * (hmm->M + 1) * abc->K;
1241 n = n + sizeof(float) * (hmm->M + 1) * abc->K;
1242 if (hmm->name != NULL) n = n + strlen(hmm->name) + 1;
1243 if (hmm->acc != NULL) n = n + strlen(hmm->acc) + 1;
1244 if (hmm->desc != NULL) n = n + strlen(hmm->desc) + 1;
1245 if (hmm->flags & p7H_RF) n = n + hmm->M + 2;
1246 if (hmm->flags & p7H_MMASK) n = n + hmm->M + 2;
1247 if (hmm->flags & p7H_CONS) n = n + hmm->M + 2;
1248 if (hmm->flags & p7H_CS) n = n + hmm->M + 2;
1249 if (hmm->flags & p7H_CA) n = n + hmm->M + 2;
1250 if (hmm->flags & p7H_MAP) n = n + sizeof(int) * (hmm->M + 1);
1251 }
1252
1253 if ((cmd = malloc(n)) == NULL) LOG_FATAL_MSG("malloc", errno);
1254 memset(cmd, 0, n); /* silence valgrind bitching about uninit bytes; remove if we ever serialize structs properly */
1255 cmd->hdr.length = n - sizeof(HMMD_HEADER);
1256 cmd->hdr.command = (esl_opt_IsUsed(opts, "--seqdb")) ? HMMD_CMD_SEARCH : HMMD_CMD_SCAN;
1257 cmd->srch.db_inx = dbx - 1; /* the program indexes databases 0 .. n-1 */
1258 cmd->srch.opts_length = strlen(opt_str) + 1;
1259
1260 ptr = cmd->srch.data;
1261
1262 memcpy(ptr, opt_str, cmd->srch.opts_length);
1263 ptr += cmd->srch.opts_length;
1264
1265 if (seq != NULL) {
1266 cmd->srch.query_type = HMMD_SEQUENCE;
1267 cmd->srch.query_length = seq->n + 2;
1268
1269 n = strlen(seq->name) + 1;
1270 memcpy(ptr, seq->name, n);
1271 ptr += n;
1272
1273 n = strlen(seq->desc) + 1;
1274 memcpy(ptr, seq->desc, n);
1275 ptr += n;
1276
1277 n = seq->n + 2;
1278 memcpy(ptr, seq->dsq, n);
1279 ptr += n;
1280 } else {
1281 cmd->srch.query_type = HMMD_HMM;
1282 cmd->srch.query_length = hmm->M;
1283
1284 n = sizeof(P7_HMM);
1285 memcpy(ptr, hmm, n);
1286 ptr += n;
1287
1288 n = sizeof(float) * (hmm->M + 1) * p7H_NTRANSITIONS;
1289 memcpy(ptr, *hmm->t, n);
1290 ptr += n;
1291
1292 n = sizeof(float) * (hmm->M + 1) * abc->K;
1293 memcpy(ptr, *hmm->mat, n);
1294 ptr += n;
1295 memcpy(ptr, *hmm->ins, n);
1296 ptr += n;
1297
1298 if (hmm->name) { n = strlen(hmm->name) + 1; memcpy(ptr, hmm->name, n); ptr += n; }
1299 if (hmm->acc) { n = strlen(hmm->acc) + 1; memcpy(ptr, hmm->acc, n); ptr += n; }
1300 if (hmm->desc) { n = strlen(hmm->desc) + 1; memcpy(ptr, hmm->desc, n); ptr += n; }
1301
1302 n = hmm->M + 2;
1303 if (hmm->flags & p7H_RF) { memcpy(ptr, hmm->rf, n); ptr += n; }
1304 if (hmm->flags & p7H_MMASK) { memcpy(ptr, hmm->mm, n); ptr += n; }
1305 if (hmm->flags & p7H_CONS) { memcpy(ptr, hmm->consensus, n); ptr += n; }
1306 if (hmm->flags & p7H_CS) { memcpy(ptr, hmm->cs, n); ptr += n; }
1307 if (hmm->flags & p7H_CA) { memcpy(ptr, hmm->ca, n); ptr += n; }
1308
1309 if (hmm->flags & p7H_MAP) {
1310 n = sizeof(int) * (hmm->M + 1);
1311 memcpy(ptr, hmm->map, n);
1312 ptr += n;
1313 }
1314 }
1315
1316 parms->hmm = hmm;
1317 parms->seq = seq;
1318 parms->abc = abc;
1319 parms->opts = opts;
1320 parms->dbx = dbx - 1;
1321 parms->cmd = cmd;
1322
1323 strcpy(parms->ip_addr, data->ip_addr);
1324 parms->sock = data->sock_fd;
1325 parms->cmd_type = cmd->hdr.command;
1326 parms->query_type = (seq != NULL) ? HMMD_SEQUENCE : HMMD_HMM;
1327
1328 date = time(NULL);
1329 ctime_r(&date, timestamp);
1330 printf("\n%s", timestamp); /* note ctime_r() leaves \n on end of timestamp */
1331
1332 if (parms->seq != NULL) {
1333 printf("Queuing %s %s from %s (%d)\n", (cmd->hdr.command == HMMD_CMD_SEARCH) ? "search" : "scan", parms->seq->name, parms->ip_addr, parms->sock);
1334 } else {
1335 printf("Queuing hmm %s from %s (%d)\n", parms->hmm->name, parms->ip_addr, parms->sock);
1336 }
1337 printf("%s", opt_str); /* note opt_str already has trailing \n */
1338 fflush(stdout);
1339
1340 esl_stack_PPush(cmdstack, parms);
1341
1342 free(buffer);
1343 return 0;
1344 }
1345
1346
1347 /* discard_function()
1348 * function handed to esl_stack_DiscardSelected() to remove
1349 * all commands in the stack that are associated with a
1350 * particular client socket, because we're closing that
1351 * client down. Prototype to this is dictate by the generalized
1352 * interface to esl_stack_DiscardSelected().
1353 */
1354 static int
discard_function(void * elemp,void * args)1355 discard_function(void *elemp, void *args)
1356 {
1357 QUEUE_DATA *elem = (QUEUE_DATA *) elemp;
1358 int fd = * (int *) args;
1359
1360 if (elem->sock == fd)
1361 {
1362 free_QueueData(elem);
1363 return TRUE;
1364 }
1365 return FALSE;
1366 }
1367
1368 static void *
clientside_thread(void * arg)1369 clientside_thread(void *arg)
1370 {
1371 int eof;
1372 CLIENTSIDE_ARGS *data = (CLIENTSIDE_ARGS *)arg;
1373
1374 /* Guarantees that thread resources are deallocated upon return */
1375 pthread_detach(pthread_self());
1376
1377 eof = 0;
1378 while (!eof) {
1379 eof = clientside_loop(data);
1380 }
1381
1382 /* remove any commands in stack associated with this client's socket */
1383 esl_stack_DiscardSelected(data->cmdstack, discard_function, &(data->sock_fd));
1384
1385 printf("Closing %s (%d)\n", data->ip_addr, data->sock_fd);
1386 fflush(stdout);
1387
1388 close(data->sock_fd);
1389 free(data);
1390
1391 pthread_exit(NULL);
1392 }
1393
1394 static void *
client_comm_thread(void * arg)1395 client_comm_thread(void *arg)
1396 {
1397 int n;
1398 int fd;
1399 int addrlen;
1400 pthread_t thread_id;
1401
1402 struct sockaddr_in addr;
1403
1404 CLIENTSIDE_ARGS *targs = NULL;
1405 CLIENTSIDE_ARGS *data = (CLIENTSIDE_ARGS *)arg;
1406
1407 for ( ;; ) {
1408
1409 /* Wait for a client to connect */
1410 n = sizeof(addr);
1411 if ((fd = accept(data->sock_fd, (struct sockaddr *)&addr, (unsigned int *)&n)) < 0) LOG_FATAL_MSG("accept", errno);
1412
1413 if ((targs = malloc(sizeof(CLIENTSIDE_ARGS))) == NULL) LOG_FATAL_MSG("malloc", errno);
1414 targs->cmdstack = data->cmdstack;
1415 targs->sock_fd = fd;
1416
1417 addrlen = sizeof(targs->ip_addr);
1418 strncpy(targs->ip_addr, inet_ntoa(addr.sin_addr), addrlen);
1419 targs->ip_addr[addrlen-1] = 0;
1420
1421 if ((n = pthread_create(&thread_id, NULL, clientside_thread, targs)) != 0) LOG_FATAL_MSG("thread create", n);
1422 }
1423
1424 pthread_exit(NULL);
1425 }
1426
1427 static void
setup_clientside_comm(ESL_GETOPTS * opts,CLIENTSIDE_ARGS * args)1428 setup_clientside_comm(ESL_GETOPTS *opts, CLIENTSIDE_ARGS *args)
1429 {
1430 int n;
1431 int reuse;
1432 int sock_fd;
1433 pthread_t thread_id;
1434
1435 struct linger linger;
1436 struct sockaddr_in addr;
1437
1438 /* Create socket for incoming connections */
1439 if ((sock_fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) LOG_FATAL_MSG("socket", errno);
1440
1441 /* incase the server went down in an ungraceful way, allow the port to be
1442 * reused avoiding the timeout.
1443 */
1444 reuse = 1;
1445 if (setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) LOG_FATAL_MSG("setsockopt", errno);
1446
1447 /* the sockets are never closed, so if the server exits, force the kernel to
1448 * close the socket and clear it so the server can be restarted immediately.
1449 */
1450 linger.l_onoff = 1;
1451 linger.l_linger = 0;
1452 if (setsockopt(sock_fd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)) < 0) LOG_FATAL_MSG("setsockopt", errno);
1453
1454 /* Construct local address structure */
1455 memset(&addr, 0, sizeof(addr));
1456 addr.sin_family = AF_INET;
1457 addr.sin_addr.s_addr = htonl(INADDR_ANY);
1458 addr.sin_port = htons(esl_opt_GetInteger(opts, "--cport"));
1459
1460 /* Bind to the local address */
1461 if (bind(sock_fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) LOG_FATAL_MSG("bind", errno);
1462
1463 /* Mark the socket so it will listen for incoming connections */
1464 if (listen(sock_fd, esl_opt_GetInteger(opts, "--ccncts")) < 0) LOG_FATAL_MSG("listen", errno);
1465 args->sock_fd = sock_fd;
1466
1467 if ((n = pthread_create(&thread_id, NULL, client_comm_thread, (void *)args)) != 0) LOG_FATAL_MSG("socket", n);
1468 }
1469
1470 static void
workerside_loop(WORKERSIDE_ARGS * data,WORKER_DATA * worker)1471 workerside_loop(WORKERSIDE_ARGS *data, WORKER_DATA *worker)
1472 {
1473 ESL_STOPWATCH *w = NULL;
1474 HMMD_SEARCH_STATS *stats = NULL;
1475 HMMD_COMMAND cmd;
1476 int n;
1477 int size;
1478 int total;
1479 char *ptr;
1480 uint8_t *buf; // Buffer to receive bytes into over sockets
1481 uint32_t buf_position; //Index into buffer for deserialize
1482 memset(&cmd, 0, sizeof(HMMD_COMMAND)); /* silence valgrind. if we ever serialize structs properly, remove */
1483 w = esl_stopwatch_Create();
1484
1485 for ( ; ; ) {
1486
1487 /* wait for the next search object */
1488 if ((n = pthread_mutex_lock (&data->work_mutex)) != 0) LOG_FATAL_MSG("mutex lock", n);
1489
1490 /* wait for the master's signal to start the calculations */
1491 while (worker->cmd == NULL) {
1492 if ((n = pthread_cond_wait(&data->start_cond, &data->work_mutex)) != 0) LOG_FATAL_MSG("cond wait", n);
1493 }
1494
1495 if ((n = pthread_mutex_unlock (&data->work_mutex)) != 0) LOG_FATAL_MSG("mutex unlock", n);
1496
1497 if (worker->cmd->hdr.command == HMMD_CMD_SHUTDOWN) {
1498 fd_set rset;
1499 struct timeval tv;
1500
1501 n = MSG_SIZE(worker->cmd);
1502 if (writen(worker->sock_fd, worker->cmd, n) != n) {
1503 p7_syslog(LOG_ERR,"[%s:%d] - writing %s error %d - %s\n", __FILE__, __LINE__, worker->ip_addr, errno, strerror(errno));
1504 break;
1505 }
1506
1507 FD_ZERO(&rset);
1508 FD_SET(worker->sock_fd, &rset);
1509
1510 tv.tv_sec = 2;
1511 tv.tv_usec = 0;
1512
1513 if ((n = select(worker->sock_fd + 1, &rset, NULL, NULL, &tv)) < 0) {
1514 p7_syslog(LOG_ERR,"[%s:%d] - select %s error %d - %s\n", __FILE__, __LINE__, worker->ip_addr, errno, strerror(errno));
1515 } else {
1516 if (n == 0) {
1517 p7_syslog(LOG_ERR,"[%s:%d] - shutdown %s is not responding\n", __FILE__, __LINE__, worker->ip_addr);
1518 } else {
1519 n = sizeof(HMMD_HEADER);
1520 if ((size = readn(worker->sock_fd, &cmd, n)) == -1) {
1521 p7_syslog(LOG_ERR,"[%s:%d] - reading %s error %d - %s\n", __FILE__, __LINE__, worker->ip_addr, errno, strerror(errno));
1522 }
1523 if (cmd.hdr.command == HMMD_CMD_SHUTDOWN) {
1524 p7_syslog(LOG_ERR,"[%s:%d] - shutting down %s\n", __FILE__, __LINE__, worker->ip_addr);
1525 } else {
1526 p7_syslog(LOG_ERR,"[%s:%d] - error shutting down %s - received %d\n", __FILE__, __LINE__, worker->ip_addr, cmd.hdr.command);
1527 }
1528 }
1529 }
1530 break;
1531 }
1532
1533 //printf ("Writing %d bytes to %s [MSG = %d/%d]\n", (int)MSG_SIZE(worker->cmd), worker->ip_addr, worker->cmd->hdr.command, worker->cmd->hdr.length);
1534
1535 esl_stopwatch_Start(w);
1536
1537 /* write search message in two parts */
1538 n = sizeof(HMMD_HEADER) + sizeof(HMMD_SEARCH_CMD);
1539 memcpy(&cmd, worker->cmd, n);
1540 cmd.srch.inx = worker->srch_inx;
1541 cmd.srch.cnt = worker->srch_cnt;
1542 if (writen(worker->sock_fd, &cmd, n) != n) {
1543 p7_syslog(LOG_ERR,"[%s:%d] - writing %s error %d - %s\n", __FILE__, __LINE__, worker->ip_addr, errno, strerror(errno));
1544 break;
1545 }
1546
1547 /* write remaining data, i.e. sequence, options etc. */
1548 ptr = (char *)worker->cmd;
1549 ptr += n;
1550 n = MSG_SIZE(worker->cmd) - n;
1551 if (writen(worker->sock_fd, ptr, n) != n) {
1552 p7_syslog(LOG_ERR,"[%s:%d] - writing %s error %d - %s\n", __FILE__, __LINE__, worker->ip_addr, errno, strerror(errno));
1553 break;
1554 }
1555
1556 total = 0;
1557 worker->total = 0;
1558
1559 n = HMMD_SEARCH_STATUS_SERIAL_SIZE;
1560 buf = malloc(n);
1561 if (buf == NULL){
1562 LOG_FATAL_MSG("malloc", errno);
1563 }
1564
1565 total += n;
1566 if ((size = readn(worker->sock_fd, buf, n)) == -1) {
1567 p7_syslog(LOG_ERR,"[%s:%d] - reading %s error %d - %s\n", __FILE__, __LINE__, worker->ip_addr, errno, strerror(errno));
1568 break;
1569 }
1570
1571 buf_position = 0;
1572 if(hmmd_search_status_Deserialize(buf, &buf_position, &(worker->status)) != eslOK){
1573 LOG_FATAL_MSG("Couldn't deserialize HMMD_SEARCH_STATUS", errno);
1574 }
1575
1576 if (worker->status.status != eslOK) {
1577 n = worker->status.msg_size;
1578 total += n;
1579 if ((worker->err_buf = malloc(n)) == NULL) LOG_FATAL_MSG("malloc", errno);
1580 worker->err_buf[0] = 0;
1581 if ((size = readn(worker->sock_fd, worker->err_buf, n)) == -1) {
1582 p7_syslog(LOG_ERR,"[%s:%d] - reading %s error %d - %s\n", __FILE__, __LINE__, worker->ip_addr, errno, strerror(errno));
1583 break;
1584 }
1585 } else {
1586
1587 // receive the results from the worker
1588 buf = realloc(buf, worker->status.msg_size);
1589 if(buf == NULL){
1590 LOG_FATAL_MSG("malloc", errno);
1591 }
1592
1593 total += worker->status.msg_size;
1594 if ((size = readn(worker->sock_fd, buf, worker->status.msg_size)) == -1) {
1595 p7_syslog(LOG_ERR,"[%s:%d] - reading %s error %d - %s\n", __FILE__, __LINE__, worker->ip_addr, errno, strerror(errno));
1596 break;
1597 }
1598
1599 buf_position = 0; // start at beginning of new buffer of data
1600 // Now, serialize the data structures out of it
1601 if(p7_hmmd_search_stats_Deserialize(buf, &buf_position, &(worker->stats)) != eslOK){
1602 LOG_FATAL_MSG("Couldn't deserialize HMMD_SEARCH_STATS", errno);
1603 }
1604 stats = &worker->stats;
1605 if(stats->nhits > 0){
1606 worker->hits = malloc(stats->nhits * sizeof(P7_HIT *));
1607 if(worker->hits == NULL){
1608 LOG_FATAL_MSG("malloc", errno);
1609 }
1610 worker->allocated_hits = stats->nhits; // Need this if we have to destroy the worker because of an error
1611 /* read in the hits */
1612 for(int i = 0; i < stats->nhits; i++){
1613 worker->hits[i] = p7_hit_Create_empty();
1614 if(worker->hits[i] == NULL){
1615 LOG_FATAL_MSG("malloc", errno);
1616 }
1617 if(p7_hit_Deserialize(buf, &buf_position, worker->hits[i]) != eslOK){
1618 LOG_FATAL_MSG("Couldn't deserialize P7_HIT", errno);
1619 }
1620 }
1621 }
1622 free(buf);
1623 }
1624
1625 /* We've just allocated an array of pointers to P7_HIT objects and a bunch of P7_HIT
1626 objects that we don't free in this function. Here's what happens to them. gather_results() assembles
1627 all of the P7_HIT objects from the different workers into one big list, which it passes to forward_results().
1628 gather_results() frees each worker's array of pointers to P7_HIT objects, and forward_results is responsible for
1629 freeing all of the P7_HIT objects when it's done with them */
1630
1631 esl_stopwatch_Stop(w);
1632
1633 if ((n = pthread_mutex_lock (&data->work_mutex)) != 0) LOG_FATAL_MSG("mutex lock", n);
1634
1635 /* set the state of the worker to completed */
1636 worker->cmd = NULL;
1637 worker->completed = 1;
1638 worker->total = total;
1639 ++data->completed;
1640
1641 /* notify the master that a worker has completed */
1642 if ((n = pthread_cond_broadcast(&data->complete_cond)) != 0) LOG_FATAL_MSG("cond broadcast", n);
1643 if ((n = pthread_mutex_unlock (&data->work_mutex)) != 0) LOG_FATAL_MSG("mutex unlock", n);
1644
1645 printf ("WORKER %s COMPLETED: %.2f sec received %d bytes\n", worker->ip_addr, w->elapsed, total);
1646 fflush(stdout);
1647 }
1648
1649 esl_stopwatch_Destroy(w);
1650
1651 return;
1652 }
1653
1654 static void *
workerside_thread(void * arg)1655 workerside_thread(void *arg)
1656 {
1657 HMMD_COMMAND *cmd = NULL;
1658 WORKER_DATA *worker = (WORKER_DATA *)arg;
1659 WORKERSIDE_ARGS *parent = (WORKERSIDE_ARGS *)worker->parent;
1660 HMMD_HEADER hdr;
1661 int n;
1662 int fd = 0;
1663 int version;
1664 int updated;
1665 int status = eslOK;
1666 char *p;
1667
1668 memset(&hdr, 0, sizeof(HMMD_HEADER)); /* silence valgrind; remove if/when we serialize structs properly */
1669
1670 /* Guarantees that thread resources are deallocated upon return */
1671 pthread_detach(pthread_self());
1672
1673 printf("Handling worker %s (%d)\n", worker->ip_addr, worker->sock_fd);
1674 fflush(stdout);
1675
1676 updated = 0;
1677 while (!updated) {
1678 /* get the database version to load */
1679 if ((n = pthread_mutex_lock (&parent->work_mutex)) != 0) LOG_FATAL_MSG("mutex lock", n);
1680 version = parent->db_version;
1681 if ((n = pthread_mutex_unlock (&parent->work_mutex)) != 0) LOG_FATAL_MSG("mutex unlock", n);
1682
1683 n = sizeof(HMMD_COMMAND);
1684 if (parent->seq_db != NULL) n += strlen(parent->seq_db->name) + 1;
1685 if (parent->hmm_db != NULL) n += strlen(parent->hmm_db->name) + 1;
1686
1687 cmd = malloc(n);
1688 if (cmd == NULL) {
1689 p7_syslog(LOG_ERR,"[%s:%d] - malloc %d - %s\n", __FILE__, __LINE__, errno, strerror(errno));
1690 goto EXIT;
1691 }
1692 memset(cmd, 0, n);
1693
1694 cmd->hdr.length = n - sizeof(HMMD_HEADER);
1695 cmd->hdr.command = HMMD_CMD_INIT;
1696
1697 p = cmd->init.data;
1698
1699 if (parent->seq_db != NULL) {
1700 cmd->init.db_cnt = parent->seq_db->db_cnt;
1701 cmd->init.seq_cnt = parent->seq_db->count;
1702 cmd->init.seqdb_off = p - cmd->init.data;
1703
1704 strncpy(cmd->init.sid, parent->seq_db->id, sizeof(cmd->init.sid));
1705 cmd->init.sid[sizeof(cmd->init.sid)-1] = 0;
1706
1707 strcpy(p, parent->seq_db->name);
1708 p += strlen(parent->seq_db->name) + 1;
1709 }
1710
1711 if (parent->hmm_db != NULL) {
1712 cmd->init.hmm_cnt = 1;
1713 cmd->init.model_cnt = parent->hmm_db->n;
1714 cmd->init.hmmdb_off = p - cmd->init.data;
1715
1716 //strncpy(cmd->init.hid, parent->hmm_db->id, sizeof(cmd->init.hid));
1717 //cmd->init.hid[sizeof(cmd->init.hid)-1] = 0;
1718
1719 strcpy(p, parent->hmm_db->name);
1720 p += strlen(parent->hmm_db->name) + 1;
1721 }
1722
1723 if (writen(worker->sock_fd, cmd, n) != n) {
1724 p7_syslog(LOG_ERR,"[%s:%d] - writing (%d) error %d - %s\n", __FILE__, __LINE__, worker->sock_fd, errno, strerror(errno));
1725 status = eslFAIL;
1726 }
1727
1728 /* process the init command first */
1729 if (readn(worker->sock_fd, &hdr, sizeof(hdr)) == -1) {
1730 p7_syslog(LOG_ERR,"[%s:%d] - reading (%d) error %d - %s\n", __FILE__, __LINE__, worker->sock_fd, errno, strerror(errno));
1731 status = eslFAIL;
1732 }
1733
1734 /* cmd is a HMMD_COMMAND.
1735 * consists of HMMD_HEADER: length, command, status
1736 * and a union of HMMD_INIT_CMD, HMMD_SEARCH_COMMAND, HMMD_INIT_RESET.
1737 * we know which is valid, from hdr.command
1738 * the total malloc size for an HMMD_COMMAND is calculated from the header, using MSG_SIZE(cmd)
1739 */
1740 n = MSG_SIZE(&hdr);
1741 if ((cmd = realloc(cmd, n)) == NULL) {
1742 p7_syslog(LOG_ERR,"[%s:%d] - realloc error %d - %s\n", __FILE__, __LINE__, errno, strerror(errno));
1743 status = eslFAIL;
1744 }
1745 if (readn(worker->sock_fd, &(cmd->init), hdr.length) == -1) {
1746 p7_syslog(LOG_ERR,"[%s:%d] - reading (%d) error %d - %s\n", __FILE__, __LINE__, worker->sock_fd, errno, strerror(errno));
1747 status = eslFAIL;
1748 }
1749
1750 /* validate the database of the worker before adding him to the list */
1751 if (hdr.command != HMMD_CMD_INIT) {
1752 p7_syslog(LOG_ERR,"[%s:%d] - expecting HMMD_CMD_INIT %d\n", __FILE__, __LINE__, hdr.command);
1753 status = eslFAIL;
1754 }
1755 if (cmd->hdr.status != eslOK) {
1756 p7_syslog(LOG_ERR,"[%s:%d] - workers init status failed %d\n", __FILE__, __LINE__, cmd->hdr.status);
1757 status = eslFAIL;
1758 }
1759
1760 worker->next = NULL;
1761 worker->prev = NULL;
1762
1763 /* add the worker to the pending list */
1764 if ((n = pthread_mutex_lock (&parent->work_mutex)) != 0) LOG_FATAL_MSG("mutex lock", n);
1765
1766 assert(validate_workers(parent));
1767
1768 /* make sure the master has not loaded a new database while we were waiting
1769 * for the worker to load and verify the database we started out this. If
1770 * the version has changed, force the worker to reload and verify.
1771 */
1772 if (version == parent->db_version) {
1773 if (status == eslOK) {
1774 worker->next = parent->pending;
1775 parent->pending = worker;
1776 ++parent->pend_cnt;
1777 } else {
1778 worker->next = parent->idling;
1779 parent->idling = worker;
1780 ++parent->idle_cnt;
1781 }
1782 updated = 1;
1783 }
1784
1785 assert(validate_workers(parent));
1786
1787 if ((n = pthread_mutex_unlock (&parent->work_mutex)) != 0) LOG_FATAL_MSG("mutex unlock", n);
1788 }
1789
1790 printf("Pending worker %s (%d)\n", worker->ip_addr, worker->sock_fd);
1791 fflush(stdout);
1792
1793 workerside_loop(parent, worker);
1794
1795 if ((n = pthread_mutex_lock (&parent->work_mutex)) != 0) LOG_FATAL_MSG("mutex lock", n);
1796
1797 fd = worker->sock_fd;
1798
1799 ++parent->failed;
1800 ++parent->completed;
1801
1802 worker->terminated = 1;
1803 worker->total = 0;
1804 worker->sock_fd = -1;
1805
1806 assert(validate_workers(parent));
1807
1808 /* notify the master that a worker has completed */
1809 if ((n = pthread_cond_broadcast(&parent->complete_cond)) != 0) LOG_FATAL_MSG("cond broadcast", n);
1810 if ((n = pthread_mutex_unlock (&parent->work_mutex)) != 0) LOG_FATAL_MSG("mutex unlock", n);
1811
1812 EXIT:
1813 printf("Closing worker %s (%d)\n", worker->ip_addr, fd);
1814 fflush(stdout);
1815
1816 if (cmd != NULL) free(cmd);
1817 close(fd);
1818
1819 pthread_exit(NULL);
1820 }
1821
1822 static void *
worker_comm_thread(void * arg)1823 worker_comm_thread(void *arg)
1824 {
1825 int n;
1826 int fd;
1827 int addrlen;
1828 pthread_t thread_id;
1829
1830 struct sockaddr_in addr;
1831
1832 WORKERSIDE_ARGS *data = (WORKERSIDE_ARGS *)arg;
1833 WORKER_DATA *worker;
1834
1835 for ( ;; ) {
1836
1837 /* Wait for a worker to connect */
1838 n = sizeof(addr);
1839 if ((fd = accept(data->sock_fd, (struct sockaddr *)&addr, (unsigned int *)&n)) < 0) LOG_FATAL_MSG("accept", errno);
1840
1841 if ((worker = malloc(sizeof(WORKER_DATA))) == NULL) LOG_FATAL_MSG("thread create", errno);
1842 memset(worker, 0, sizeof(WORKER_DATA));
1843
1844 worker->parent = data;
1845 worker->sock_fd = fd;
1846 worker->allocated_hits = 0; // These may be redundant because of the memset earlier, but better safe than sorry
1847 worker->hits = NULL;
1848
1849 addrlen = sizeof(worker->ip_addr);
1850 strncpy(worker->ip_addr, inet_ntoa(addr.sin_addr), addrlen);
1851 worker->ip_addr[addrlen-1] = 0;
1852
1853 if ((n = pthread_create(&thread_id, NULL, workerside_thread, worker)) != 0) LOG_FATAL_MSG("thread create", n);
1854 }
1855
1856 pthread_exit(NULL);
1857 }
1858
1859 static void
setup_workerside_comm(ESL_GETOPTS * opts,WORKERSIDE_ARGS * args)1860 setup_workerside_comm(ESL_GETOPTS *opts, WORKERSIDE_ARGS *args)
1861 {
1862 int n;
1863 int reuse;
1864 int sock_fd;
1865 pthread_t thread_id;
1866
1867 struct linger linger;
1868 struct sockaddr_in addr;
1869
1870 /* Create socket for incoming connections */
1871 if ((sock_fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) LOG_FATAL_MSG("socket", errno);
1872
1873 /* incase the server went down in an ungraceful way, allow the port to be
1874 * reused avoiding the timeout.
1875 */
1876 reuse = 1;
1877 if (setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) LOG_FATAL_MSG("setsockopt", errno);
1878
1879 /* the sockets are never closed, so if the server exits, force the kernel to
1880 * close the socket and clear it so the server can be restarted immediately.
1881 */
1882 linger.l_onoff = 1;
1883 linger.l_linger = 0;
1884 if (setsockopt(sock_fd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)) < 0) LOG_FATAL_MSG("setsockopt", errno);
1885
1886 /* Construct local address structure */
1887 memset(&addr, 0, sizeof(addr));
1888 addr.sin_family = AF_INET;
1889 addr.sin_addr.s_addr = htonl(INADDR_ANY);
1890 addr.sin_port = htons(esl_opt_GetInteger(opts, "--wport"));
1891
1892 /* Bind to the local address */
1893 if (bind(sock_fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) LOG_FATAL_MSG("bind", errno);
1894
1895 /* Mark the socket so it will listen for incoming connections */
1896 if (listen(sock_fd, esl_opt_GetInteger(opts, "--wcncts")) < 0) LOG_FATAL_MSG("listen", errno);
1897
1898 args->sock_fd = sock_fd;
1899
1900 if ((n = pthread_create(&thread_id, NULL, worker_comm_thread, (void *)args)) != 0) LOG_FATAL_MSG("thread create", n);
1901 }
1902
1903 #endif /*HMMER_THREADS*/
1904
1905
1906
1907