1 /* master side of the hmmpgmd daemon
2  * MSF, Thu Aug 12, 2010 [Janelia]
3  */
4 #include "p7_config.h"
5 
6 #ifdef HMMER_THREADS
7 
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include <string.h>
11 #include <errno.h>
12 #include <unistd.h>
13 #include <signal.h>
14 #include <pthread.h>
15 #include <setjmp.h>
16 #include <sys/socket.h>
17 #ifdef HAVE_NETINET_IN_H
18 #include <netinet/in.h>     /* On FreeBSD, you need netinet/in.h for struct sockaddr_in            */
19 #endif                      /* On OpenBSD, netinet/in.h is required for (must precede) arpa/inet.h */
20 #include <arpa/inet.h>
21 #include <syslog.h>
22 #include <assert.h>
23 #include <time.h>
24 
25 #ifndef HMMER_THREADS
26 #error "Program requires pthreads be enabled."
27 #endif /*HMMER_THREADS*/
28 
29 #include "easel.h"
30 #include "esl_alphabet.h"
31 #include "esl_getopts.h"
32 #include "esl_sq.h"
33 #include "esl_sqio.h"
34 #include "esl_stack.h"
35 #include "esl_stopwatch.h"
36 #include "esl_threads.h"
37 
38 #include "hmmer.h"
39 #include "hmmpgmd.h"
40 #include "cachedb.h"
41 #include "p7_hmmcache.h"
42 
/* Sizing constants and default paths for the master.
 * MAX_WORKERS: the name suggests a cap on connected workers; gather_results()
 * also uses it as slack when sizing the merged hit array. */
#define MAX_WORKERS  (64)
#define MAX_BUFFER   (4096)   /* generic buffer size */

/* Default configuration file location (left unparenthesized so it can take
 * part in string-literal concatenation). */
#define CONF_FILE    "/etc/hmmpgmd.conf"
47 
48 typedef struct {
49   HMMD_SEARCH_STATS   stats;
50   HMMD_SEARCH_STATUS  status;
51   P7_HIT              **hits;
52   int                 nhits;
53   int                 db_inx;
54   int                 db_cnt;
55   int                 errors;
56 } SEARCH_RESULTS;
57 
58 typedef struct {
59   int             sock_fd;
60   char            ip_addr[64];
61 
62   ESL_STACK      *cmdstack;	/* stack of commands that clients want done */
63 } CLIENTSIDE_ARGS;
64 
65 typedef struct {
66   int              sock_fd;
67 
68   pthread_mutex_t  work_mutex;
69   pthread_cond_t   start_cond;
70   pthread_cond_t   complete_cond;
71 
72   int              db_version;
73   P7_SEQCACHE     *seq_db;
74   P7_HMMCACHE     *hmm_db;
75 
76   int              ready;
77   int              failed;
78   struct worker_s *head;
79   struct worker_s *tail;
80 
81   int              pend_cnt;
82   struct worker_s *pending;
83 
84   int              idle_cnt;
85   struct worker_s *idling;
86 
87   RANGE_LIST       *range_list;  /* (optional) list of ranges searched within the seqdb */
88 
89   int              completed;
90 } WORKERSIDE_ARGS;
91 
92 typedef struct worker_s {
93   int                   sock_fd;
94   char                  ip_addr[64];
95 
96   int                   completed;
97   int                   terminated;
98   HMMD_COMMAND         *cmd;
99 
100   uint32_t              srch_inx;
101   uint32_t              srch_cnt;
102 
103   HMMD_SEARCH_STATS     stats;
104   HMMD_SEARCH_STATUS    status;
105   char                 *err_buf;
106   P7_HIT               **hits;
107   uint32_t             allocated_hits;
108   int                   total;
109 
110   WORKERSIDE_ARGS      *parent;
111 
112   struct worker_s      *next;
113   struct worker_s      *prev;
114 } WORKER_DATA;
115 
116 
117 static void setup_clientside_comm(ESL_GETOPTS *opts, CLIENTSIDE_ARGS  *args);
118 static void setup_workerside_comm(ESL_GETOPTS *opts, WORKERSIDE_ARGS  *args);
119 
120 static void destroy_worker(WORKER_DATA *worker);
121 
122 static void init_results(SEARCH_RESULTS *results);
123 static void clear_results(WORKERSIDE_ARGS *comm, SEARCH_RESULTS *results);
124 static void gather_results(QUEUE_DATA *query, WORKERSIDE_ARGS *comm, SEARCH_RESULTS *results);
125 static void forward_results(QUEUE_DATA *query, SEARCH_RESULTS *results);
126 
127 static void
print_client_msg(int fd,int status,char * format,va_list ap)128 print_client_msg(int fd, int status, char *format, va_list ap)
129 {
130   uint32_t nalloc =0;
131   uint32_t buf_offset = 0;
132   uint8_t *buf = NULL;
133   char  ebuf[512];
134 
135   HMMD_SEARCH_STATUS s;
136 
137   memset(&s, 0, sizeof(HMMD_SEARCH_STATUS));
138 
139   s.status   = status;
140   s.msg_size = vsnprintf(ebuf, sizeof(ebuf), format, ap) +1; /* +1 because we send the \0 */
141 
142   p7_syslog(LOG_ERR, ebuf);
143 
144   if(hmmd_search_status_Serialize(&s, &buf, &buf_offset, &nalloc) != eslOK){
145     LOG_FATAL_MSG("Serializing HMMD_SEARCH_STATUS failed", errno);
146   }
147   /* send back an unsuccessful status message */
148 
149   if (writen(fd, buf, buf_offset) != buf_offset) {
150     p7_syslog(LOG_ERR,"[%s:%d] - writing (%d) error %d - %s\n", __FILE__, __LINE__, fd, errno, strerror(errno));
151     return;
152   }
153   if (writen(fd, ebuf, s.msg_size) != s.msg_size)  {
154     p7_syslog(LOG_ERR,"[%s:%d] - writing (%d) error %d - %s\n", __FILE__, __LINE__, fd, errno, strerror(errno));
155     return;
156   }
157 
158   free(buf);
159 }
160 
/* client_msg()
 * printf-style front end to print_client_msg(): formats the message, logs
 * it, and sends it with <status> to the client connected on <fd>.
 */
static void
client_msg(int fd, int status, char *format, ...)
{
  va_list arglist;

  va_start(arglist, format);
  print_client_msg(fd, status, format, arglist);
  va_end(arglist);
}
170 
/* client_msg_longjmp()
 * Format a printf-style message, log it, and send it with <status> to the
 * client on <fd> (via print_client_msg()). Then, instead of returning,
 * longjmp() to <env> with the value 1.
 */
static void
client_msg_longjmp(int fd, int status, jmp_buf *env, char *format, ...)
{
  va_list arglist;

  va_start(arglist, format);
  print_client_msg(fd, status, format, arglist);
  va_end(arglist);

  longjmp(*env, 1);   /* does not return */
}
182 
183 static int
validate_workers(WORKERSIDE_ARGS * args)184 validate_workers(WORKERSIDE_ARGS *args)
185 {
186   int ready    = 0;
187   int failed   = 0;
188   int pending  = 0;
189   int idling   = 0;
190 
191   WORKER_DATA *worker = NULL;
192   WORKER_DATA *tail   = NULL;
193 
194   /* count the idling workers */
195   worker = args->idling;
196   while (worker != NULL) {
197     ++idling;
198     if (worker->terminated) ++failed;
199     worker = worker->next;
200   }
201   assert(idling == args->idle_cnt);
202 
203   /* count the pending workers */
204   worker = args->pending;
205   while (worker != NULL) {
206     ++pending;
207     if (worker->terminated) ++failed;
208     worker = worker->next;
209   }
210   assert(pending == args->pend_cnt);
211 
212   if (args->head == NULL && args->tail == NULL) {
213     assert(failed == args->failed);
214     assert(ready == 0);
215     return 1;
216   }
217 
218   assert(args->head != NULL && args->tail != NULL);
219   assert(args->head->prev == NULL);
220   assert(args->tail->next == NULL);
221 
222   /* count the ready workers */
223   worker = args->head;
224   while (worker != NULL) {
225     ++ready;
226     assert(worker->prev == tail);
227     assert(ready <= args->ready);
228     tail = worker;
229     if (worker->terminated) ++failed;
230     worker = worker->next;
231   }
232   assert(ready  == args->ready);
233   assert(failed == args->failed);
234   assert(tail   == args->tail);
235 
236   return 1;
237 }
238 
239 static void
update_workers(WORKERSIDE_ARGS * args)240 update_workers(WORKERSIDE_ARGS *args)
241 {
242   WORKER_DATA *worker = NULL;
243 
244   assert(validate_workers(args));
245 
246   /* if there are any workers waiting to join, add them */
247   while (args->pending != NULL) {
248     worker = args->pending;
249     args->pending = worker->next;
250 
251     worker->next = NULL;
252     if (args->head == NULL) {
253       args->head = worker;
254       worker->prev = NULL;
255     } else {
256       args->tail->next = worker;
257       worker->prev = args->tail;
258     }
259     args->tail = worker;
260 
261     args->pend_cnt--;
262     args->ready++;
263   }
264 
265   /* remove any workers who have failed */
266   worker = args->head;
267   while (args->failed > 0 && worker != NULL) {
268     WORKER_DATA *next =  worker->next;
269     if (worker->terminated) {
270       --args->failed;
271       --args->ready;
272       if (args->head == worker && args->tail == worker) {
273         args->head = NULL;
274         args->tail = NULL;
275       } else if (args->head == worker) {
276         args->head = worker->next;
277         worker->next->prev = NULL;
278       } else if (args->tail == worker) {
279         args->tail = worker->prev;
280         worker->prev->next = NULL;
281       } else {
282         worker->next->prev = worker->prev;
283         worker->prev->next = worker->next;
284       }
285       destroy_worker(worker);
286     }
287     worker = next;
288   }
289 
290   assert(validate_workers(args));
291 }
292 
293 static void
process_search(WORKERSIDE_ARGS * args,QUEUE_DATA * query)294 process_search(WORKERSIDE_ARGS *args, QUEUE_DATA *query)
295 {
296   ESL_STOPWATCH  *w          = NULL;      /* timer used for profiling statistics             */
297   WORKER_DATA    *worker     = NULL;
298   SEARCH_RESULTS  results;
299   int n;
300   int cnt;
301   int inx;
302   int ready_workers;    /* counter variable used to track the number of workers currently available to receive work; short for "remaining", I imagine */
303   int tries;
304   int i;
305 
306 
307   memset(&results, 0, sizeof(SEARCH_RESULTS)); /* avoid valgrind bitching about uninit bytes; remove, if we ever serialize structs properly */
308 
309   /* figure out the size of the database we are searching */
310   if (query->cmd_type == HMMD_CMD_SEARCH) {
311     if((args->seq_db == NULL)||(args->seq_db->db == NULL)|| (query->dbx >= args->seq_db->db_cnt) || (query->dbx < 0)){
312       // Client is attempting to search a database that does not exist, complain and abort search
313       client_msg(query->sock, eslFAIL, "Specified sequence database has not been loaded into the daemon. \n");
314       return;
315     }
316     else{
317       cnt = args->seq_db->db[query->dbx].count;
318     }
319   } else {
320     if(args->hmm_db == NULL){
321       // Client is attempting to search a database that does not exist, complain and abort search
322       client_msg(query->sock, eslFAIL, "No HMM database has been loaded into the daemon. \n");
323       return;
324     }
325     else{
326      cnt = args->hmm_db->n;
327     }
328   }
329 
330   // start timer after we make sure the relevant database exists to make cleanup easier on error
331   w = esl_stopwatch_Create();
332   esl_stopwatch_Start(w);
333   init_results(&results);
334 
335   //if range(s) are given, count how many of the seqdb's sequences are within supplied range(s)
336   if (args->range_list) { // can only happen in HMMD_CMD_SEARCH case
337     int range_cnt = 0; // this will now count how many of the seqs in the db are within the range
338     for (i=0; i<cnt; i++) {
339       if ( hmmpgmd_IsWithinRanges(args->seq_db->list[i].idx, args->range_list ) )
340         range_cnt++;
341     }
342     cnt = range_cnt;
343   }
344 
345 
346   inx = 0;
347   tries = 0;
348   do {
349     /* process any changes to the available workers */
350     if ((n = pthread_mutex_lock (&args->work_mutex)) != 0) LOG_FATAL_MSG("mutex lock", n);
351 
352     /* build a list of the currently available workers */
353     update_workers(args);
354 
355     /* if there are no workers, report an error */
356     if (args->ready > 0) {
357       ready_workers = args->ready;
358 
359       /* update the workers search information */
360       worker = args->head;
361 
362       while (worker != NULL) {
363         worker->cmd        = query->cmd;
364         worker->completed  = 0;
365         worker->total      = 0;
366 
367         /* assign each worker a portion of the database */
368         worker->srch_inx = inx;
369         if (args->range_list) {
370           // if ranges are given, need to split the db list based on which elements in the list are within the given range(s)
371           int goal = cnt / ready_workers; //how many within-range sequences do I want to ask this worker to handle
372           int curr = 0;                   //how many within-range sequences have I seen since the start of this full-db range
373           worker->srch_cnt = 0;
374           while (curr < goal) {
375             if ( hmmpgmd_IsWithinRanges (args->seq_db->list[inx].idx, args->range_list ) )
376                 curr++;
377             worker->srch_cnt++;
378             inx++;
379           }
380           cnt -= curr;
381         } else {
382           // default - split evenly among workers
383           worker->srch_cnt = cnt / ready_workers;
384           inx += worker->srch_cnt;
385           cnt -= worker->srch_cnt;
386         }
387 
388         --ready_workers;
389         worker            = worker->next;
390       }
391 
392       args->completed = 0;
393 
394       /* notify all the worker threads of the new query */
395       if ((n = pthread_cond_broadcast(&args->start_cond)) != 0) LOG_FATAL_MSG("cond broadcast", n);
396     }
397 
398     if ((n = pthread_mutex_unlock (&args->work_mutex)) != 0)  LOG_FATAL_MSG("mutex unlock", n);
399 
400     if (args->ready > 0) {
401       /* Wait for all the workers to complete */
402       if ((n = pthread_mutex_lock (&args->work_mutex)) != 0) LOG_FATAL_MSG("mutex lock", n);
403 
404       while (args->completed < args->ready) {
405         if ((n = pthread_cond_wait (&args->complete_cond, &args->work_mutex)) != 0) LOG_FATAL_MSG("cond wait", n);
406       }
407 
408       if ((n = pthread_mutex_unlock (&args->work_mutex)) != 0) LOG_FATAL_MSG("mutex unlock", n);
409     }
410 
411     /* gather up the results from all the workers */
412     gather_results(query, args, &results);
413 
414     /* we can recover from one worker crashing.  get the block that worker ran on
415      * and redistribute its load to all the remaining workers.
416      */
417     inx = results.db_inx;
418     cnt = results.db_cnt;
419     ++tries;
420 
421   } while (args->ready > 0 && results.errors == 1 && tries < 2);
422 
423 
424   esl_stopwatch_Stop(w);
425 
426   /* copy the search stats */
427   results.stats.elapsed = w->elapsed;
428   results.stats.user    = w->user;
429   results.stats.sys     = w->sys;
430   results.stats.hit_offsets = NULL; // set this to make sure we allocate memory later
431   /* TODO: check for errors */
432   if (args->ready == 0) {
433     client_msg(query->sock, eslFAIL, "No compute nodes available\n");
434   } else if (args->failed > 0) {
435     client_msg(query->sock, eslFAIL, "Errors running search\n");
436     clear_results(args, &results);
437   } else {
438     forward_results(query, &results);
439   }
440 
441   esl_stopwatch_Destroy(w);
442 }
443 
444 static void
process_shutdown(WORKERSIDE_ARGS * args,QUEUE_DATA * query)445 process_shutdown(WORKERSIDE_ARGS *args, QUEUE_DATA *query)
446 {
447   int n;
448   int cnt;
449 
450   HMMD_COMMAND cmd;
451 
452   WORKER_DATA *worker  = NULL;
453 
454   /* process any changes to the available workers */
455   if ((n = pthread_mutex_lock (&args->work_mutex)) != 0) LOG_FATAL_MSG("mutex lock", n);
456 
457   /* build a list of the currently available workers */
458   update_workers(args);
459 
460   /* reset all the idle and active workers */
461   cnt = 0;
462 
463   /* build a reset command */
464   cmd.hdr.length  = 0;
465   cmd.hdr.command = HMMD_CMD_SHUTDOWN;
466 
467   /* look for the active workers to shutdown */
468   worker = args->head;
469   while (worker != NULL) {
470     worker->cmd        = &cmd;
471     worker->completed  = 0;
472     worker->total      = 0;
473 
474     worker = worker->next;
475     ++cnt;
476   }
477 
478   /* look for the idle workers to shutdown */
479   worker = args->idling;
480   while (worker != NULL) {
481     worker->cmd        = &cmd;
482     worker->completed  = 0;
483     worker->total      = 0;
484 
485     worker = worker->next;
486     ++cnt;
487   }
488 
489   /* check if there are any workers to shutdown */
490   if (cnt > 0) {
491     args->completed = 0;
492 
493     /* notify all the worker threads of the new query */
494     if ((n = pthread_cond_broadcast(&args->start_cond)) != 0) LOG_FATAL_MSG("cond broadcast", n);
495     if ((n = pthread_mutex_unlock (&args->work_mutex)) != 0)  LOG_FATAL_MSG("mutex unlock", n);
496 
497     /* Wait for all the workers to complete */
498     if ((n = pthread_mutex_lock (&args->work_mutex)) != 0) LOG_FATAL_MSG("mutex lock", n);
499 
500     while (args->completed < cnt) {
501       if ((n = pthread_cond_wait (&args->complete_cond, &args->work_mutex)) != 0) LOG_FATAL_MSG("cond wait", n);
502     }
503   }
504 
505   /* build a list of the currently available workers */
506   update_workers(args);
507 
508   if ((n = pthread_mutex_unlock (&args->work_mutex)) != 0) LOG_FATAL_MSG("mutex unlock", n);
509 }
510 
511 
512 void
master_process(ESL_GETOPTS * go)513 master_process(ESL_GETOPTS *go)
514 {
515   P7_SEQCACHE        *seq_db     = NULL;
516   P7_HMMCACHE        *hmm_db     = NULL;
517   ESL_STACK          *cmdstack   = NULL; /* stack of commands that clients want done */
518   QUEUE_DATA         *query      = NULL;
519   CLIENTSIDE_ARGS     client_comm;
520   WORKERSIDE_ARGS     worker_comm;
521   int                 n;
522   int                 shutdown;
523   char                errbuf[eslERRBUFSIZE];
524   int                 status     = eslOK;
525 
526   impl_Init();
527   p7_FLogsumInit();     /* we're going to use table-driven Logsum() approximations at times */
528 
529   if (esl_opt_IsUsed(go, "--seqdb")) {
530     char *name = esl_opt_GetString(go, "--seqdb");
531     if ((status = p7_seqcache_Open(name, &seq_db, errbuf)) != eslOK)
532       p7_Fail("Failed to cache %s (%d)", name, status);
533 
534   }
535 
536   if (esl_opt_IsUsed(go, "--hmmdb")) {
537     char *name = esl_opt_GetString(go, "--hmmdb");
538 
539     status = p7_hmmcache_Open(name, &hmm_db, errbuf);
540     if      (status == eslENOTFOUND) p7_Fail("Failed to open profile database %s\n  %s\n",    name, errbuf);
541     else if (status == eslEFORMAT)   p7_Fail("Failed to parse profile database %s\n  %s\n",   name, errbuf);
542     else if (status == eslEINCOMPAT) p7_Fail("Mismatched alphabets in profile db %s\n  %s\n", name, errbuf);
543     else if (status != eslOK)        p7_Fail("Failed to load profile db %s : code %d\n",      name, status);
544 
545     p7_hmmcache_SetNumericNames(hmm_db);
546 
547     printf("Loaded profile db %s;  models: %d  memory: %" PRId64 "\n",
548 	   name, hmm_db->n, (uint64_t) p7_hmmcache_Sizeof(hmm_db));
549   }
550 
551   /* if stdout is redirected at the commandline, it causes printf's to be buffered,
552    * which means status logging isn't printed. This line strongly requests unbuffering,
553    * which should be ok, given the low stdout load of hmmpgmd
554    */
555   setvbuf (stdout, NULL, _IONBF, BUFSIZ);
556   printf("Data loaded into memory. Master is ready.\n");
557   setvbuf (stdout, NULL, _IOFBF, BUFSIZ);
558 
559   /* initialize the search stack, set it up for interthread communication  */
560   cmdstack = esl_stack_PCreate();
561   esl_stack_UseMutex(cmdstack);
562   esl_stack_UseCond(cmdstack);
563 
564   /* start the communications with the web clients */
565   client_comm.cmdstack = cmdstack;
566   setup_clientside_comm(go, &client_comm);
567 
568   /* initialize the worker structure */
569   if ((n = pthread_mutex_init(&worker_comm.work_mutex, NULL)) != 0)   LOG_FATAL_MSG("mutex init", n);
570   if ((n = pthread_cond_init(&worker_comm.start_cond, NULL)) != 0)    LOG_FATAL_MSG("cond init", n);
571   if ((n = pthread_cond_init(&worker_comm.complete_cond, NULL)) != 0) LOG_FATAL_MSG("cond init", n);
572 
573   worker_comm.sock_fd    = -1;
574   worker_comm.head       = NULL;
575   worker_comm.tail       = NULL;
576   worker_comm.pending    = NULL;
577   worker_comm.idling     = NULL;
578   worker_comm.seq_db     = seq_db;
579   worker_comm.hmm_db     = hmm_db;
580   worker_comm.db_version = 1;
581 
582   worker_comm.ready      = 0;
583   worker_comm.failed     = 0;
584   worker_comm.pend_cnt   = 0;
585   worker_comm.idle_cnt   = 0;
586 
587   setup_workerside_comm(go, &worker_comm);
588 
589   /* read query hmm/sequence
590    * the PPop() will wait until a client pushes a command to the queue
591    */
592   shutdown = 0;
593   while (!shutdown &&  esl_stack_PPop(cmdstack, (void **) &query) == eslOK) {
594     printf("Processing command %d from %s\n", query->cmd_type, query->ip_addr);
595     fflush(stdout);
596 
597     worker_comm.range_list = NULL;
598 
599     switch(query->cmd_type) {
600     case HMMD_CMD_SEARCH:
601       if (esl_opt_IsUsed(query->opts, "--seqdb_ranges")) {
602         ESL_ALLOC(worker_comm.range_list, sizeof(RANGE_LIST));
603         hmmpgmd_GetRanges(worker_comm.range_list, esl_opt_GetString(query->opts, "--seqdb_ranges"));
604       }
605       process_search(&worker_comm, query);
606       break;
607     case HMMD_CMD_SCAN:        process_search(&worker_comm, query); break;
608     case HMMD_CMD_SHUTDOWN:
609       process_shutdown(&worker_comm, query);
610       p7_syslog(LOG_ERR,"[%s:%d] - shutting down...\n", __FILE__, __LINE__);
611       shutdown = 1;
612       break;
613     default:
614       p7_syslog(LOG_ERR,"[%s:%d] - unknown command %d from %s\n", __FILE__, __LINE__, query->cmd_type, query->ip_addr);
615       break;
616     }
617 
618     free_QueueData(query);
619   }
620 
621   esl_stack_ReleaseCond(cmdstack);
622 
623   if (hmm_db) p7_hmmcache_Close(hmm_db);
624   if (seq_db) p7_seqcache_Close(seq_db);
625 
626   esl_stack_Destroy(cmdstack);
627 
628   pthread_mutex_destroy(&worker_comm.work_mutex);
629   pthread_cond_destroy(&worker_comm.start_cond);
630   pthread_cond_destroy(&worker_comm.complete_cond);
631 
632 
633   if (worker_comm.range_list) {
634     if (worker_comm.range_list->starts)  free(worker_comm.range_list->starts);
635     if (worker_comm.range_list->ends)    free(worker_comm.range_list->ends);
636     free (worker_comm.range_list);
637   }
638 
639   return;
640 
641 
642 ERROR:
643   p7_Fail("Memory allocation error. Code: %d\n",    status);
644 
645 }
646 
647 
648 // Qsort comparison function to sort a list of pointers to P7_HITs
649 static int
hit_sorter2(const void * p1,const void * p2)650 hit_sorter2(const void *p1, const void *p2)
651 {
652   int cmp;
653 
654   const P7_HIT *h1 = *((P7_HIT **) p1);
655   const P7_HIT *h2 = *((P7_HIT **) p2);
656 
657   cmp  = (h1->sortkey < h2->sortkey);
658   cmp -= (h1->sortkey > h2->sortkey);
659 
660   return cmp;
661 }
662 
663 static void
init_results(SEARCH_RESULTS * results)664 init_results(SEARCH_RESULTS *results)
665 {
666   results->status.status     = eslOK;
667   results->status.msg_size   = 0;
668 
669   results->stats.nhits       = 0;
670   results->stats.nreported   = 0;
671   results->stats.nincluded   = 0;
672 
673   results->stats.nmodels     = 0;
674   results->stats.nseqs       = 0;
675   results->stats.n_past_msv  = 0;
676   results->stats.n_past_bias = 0;
677   results->stats.n_past_vit  = 0;
678   results->stats.n_past_fwd  = 0;
679   results->stats.Z           = 0;
680 
681   results->hits              = NULL;
682   results->stats.hit_offsets = NULL;
683   results->nhits             = 0;
684   results->db_inx            = 0;
685   results->db_cnt            = 0;
686   results->errors            = 0;
687 }
688 
689 static void
gather_results(QUEUE_DATA * query,WORKERSIDE_ARGS * comm,SEARCH_RESULTS * results)690 gather_results(QUEUE_DATA *query, WORKERSIDE_ARGS *comm, SEARCH_RESULTS *results)
691 {
692   int cnt;
693   int n;
694 
695   WORKER_DATA        *worker;
696 
697   /* allocate spaces to hold all the hits */
698   cnt = results->nhits + MAX_WORKERS;
699   if ((results->hits = realloc(results->hits, sizeof(P7_HIT *) * cnt)) == NULL) LOG_FATAL_MSG("malloc", errno);
700 
701   /* lock the workers until we have merged the results */
702   if ((n = pthread_mutex_lock (&comm->work_mutex)) != 0) LOG_FATAL_MSG("mutex lock", n);
703 
704   /* count the number of hits */
705   cnt = results->nhits;
706   worker = comm->head;
707   while (worker != NULL) {
708     if (worker->completed) {
709       uint32_t previous_hits = results->stats.nhits;
710 
711       results->stats.nhits        += worker->stats.nhits;
712       results->stats.nreported    += worker->stats.nreported;
713       results->stats.nincluded    += worker->stats.nincluded;
714 
715       results->stats.n_past_msv   += worker->stats.n_past_msv;
716       results->stats.n_past_bias  += worker->stats.n_past_bias;
717       results->stats.n_past_vit   += worker->stats.n_past_vit;
718       results->stats.n_past_fwd   += worker->stats.n_past_fwd;
719 
720       results->stats.Z_setby       = worker->stats.Z_setby;
721       results->stats.domZ_setby    = worker->stats.domZ_setby;
722       results->stats.domZ          = worker->stats.domZ;
723       results->stats.Z             = worker->stats.Z;
724 
725       results->status.msg_size    += worker->status.msg_size - sizeof(HMMD_SEARCH_STATS);
726 
727       if((results->stats.nhits- previous_hits) >0){ // There are new hits to deal with
728         // Add enough space to the list of hits for all the hits from this worker
729         results->hits = realloc(results->hits, results->stats.nhits * sizeof (P7_HIT *));
730         if(results->hits == NULL){
731           LOG_FATAL_MSG("malloc", n);
732         }
733 
734         // copy this worker's hits into the global list
735         for(int i0 = 0, i1 = previous_hits; i1 < results->stats.nhits; i0++, i1++){
736           results->hits[i1] = worker->hits[i0];
737         }
738 
739         free(worker->hits); //  Free the worker's array of pointers to hits.  The hits themselves
740         // will be freed by forward_results()
741 
742         worker->hits = NULL;
743       }
744       worker->completed   = 0;
745       ++cnt;
746     } else {
747       results->errors++;
748       results->db_inx            = worker->srch_inx;
749       results->db_cnt            = worker->srch_cnt;
750     }
751 
752     worker = worker->next;
753   }
754 
755   if ((n = pthread_mutex_unlock (&comm->work_mutex)) != 0) LOG_FATAL_MSG("mutex unlock", n);
756 
757   if (query->cmd_type == HMMD_CMD_SEARCH) {
758     results->stats.nmodels = 1;
759     results->stats.nseqs   = comm->seq_db->db[query->dbx].K;
760   } else {
761     results->stats.nseqs   = 1;
762     results->stats.nmodels = comm->hmm_db->n;
763   }
764 
765   if (results->stats.Z_setby == p7_ZSETBY_NTARGETS) {
766     results->stats.Z = (query->cmd_type == HMMD_CMD_SEARCH) ? results->stats.nseqs : results->stats.nmodels;
767   }
768 
769   results->nhits = cnt;
770 }
771 
772 static void
forward_results(QUEUE_DATA * query,SEARCH_RESULTS * results)773 forward_results(QUEUE_DATA *query, SEARCH_RESULTS *results)
774 {
775   P7_TOPHITS         th;
776   P7_PIPELINE        *pli   = NULL;
777   P7_DOMAIN         **dcl   = NULL;
778   P7_HIT             *hits  = NULL;
779   int fd, n;
780   uint8_t **buf, **buf2, **buf3, *buf_ptr, *buf2_ptr, *buf3_ptr;
781   uint32_t nalloc, nalloc2, nalloc3, buf_offset, buf_offset2, buf_offset3;
782   enum p7_pipemodes_e mode;
783 
784   // Initialize these pointers-to-pointers that we'll use for sending data
785   buf_ptr = NULL;
786   buf = &(buf_ptr);
787   buf2_ptr = NULL;
788   buf2 = &(buf2_ptr);
789   buf3_ptr = NULL;
790   buf3 = &(buf3_ptr);
791 
792   fd    = query->sock;
793 
794   if (query->cmd_type == HMMD_CMD_SEARCH) mode = p7_SEARCH_SEQS;
795   else                                    mode = p7_SCAN_MODELS;
796 
797   /* sort the hits and apply score and E-value thresholds */
798   if (results->nhits > 0) {
799     if(results->stats.hit_offsets != NULL){
800       if ((results->stats.hit_offsets = realloc(results->stats.hit_offsets, results->stats.nhits * sizeof(uint64_t))) == NULL) LOG_FATAL_MSG("malloc", errno);
801     }
802     else{
803       if ((results->stats.hit_offsets = malloc(results->stats.nhits * sizeof(uint64_t))) == NULL) LOG_FATAL_MSG("malloc", errno);
804     }
805 
806     // sort the hits
807     qsort(results->hits, results->stats.nhits, sizeof(P7_HIT *), hit_sorter2);
808 
809     th.unsrt     = NULL;
810     th.N         = results->stats.nhits;
811     th.nreported = 0;
812     th.nincluded = 0;
813     th.is_sorted_by_sortkey = 0;
814     th.is_sorted_by_seqidx  = 0;
815 
816     pli = p7_pipeline_Create(query->opts, 100, 100, FALSE, mode);
817     pli->nmodels     = results->stats.nmodels;
818     pli->nseqs       = results->stats.nseqs;
819     pli->n_past_msv  = results->stats.n_past_msv;
820     pli->n_past_bias = results->stats.n_past_bias;
821     pli->n_past_vit  = results->stats.n_past_vit;
822     pli->n_past_fwd  = results->stats.n_past_fwd;
823 
824     pli->Z           = results->stats.Z;
825     pli->domZ        = results->stats.domZ;
826     pli->Z_setby     = results->stats.Z_setby;
827     pli->domZ_setby  = results->stats.domZ_setby;
828 
829 
830     th.hit = results->hits;
831 
832     p7_tophits_Threshold(&th, pli);
833 
834     /* after the top hits thresholds are checked, the number of sequences
835      * and domains to be reported can change. */
836     results->stats.nreported = th.nreported;
837     results->stats.nincluded = th.nincluded;
838     results->stats.domZ      = pli->domZ;
839     results->stats.Z         = pli->Z;
840   }
841 
842   /* Build the buffers of serialized results we'll send back to the client.
843      Use three buffers, one for each object, because we need to build them in reverse order.
844      We need to serialize the hits to build the hits_offset array in HMMD_SEARCH_STATS.
845      We need the length of the serialized hits and HMMD_SEARCH_STATS objects to fill out the msg_size
846      field in status, but we want to send status, then stats, then hits */
847 
848   nalloc = 0;
849   buf_offset = 0;
850 
851   // First, the buffer of hits
852   for(int i =0; i< results->stats.nhits; i++){
853 
854     results->stats.hit_offsets[i] = buf_offset;
855     if(p7_hit_Serialize(results->hits[i], buf, &buf_offset, &nalloc) != eslOK){
856       LOG_FATAL_MSG("Serializing P7_HIT failed", errno);
857     }
858 
859   }
860   if(results->stats.nhits == 0){
861     results->stats.hit_offsets = NULL;
862   }
863 
864   // Second, the buffer with the HMMD_SEARCH_STATS object
865 
866   buf_offset2 = 0;
867   nalloc2 = 0;
868   if(p7_hmmd_search_stats_Serialize(&(results->stats), buf2, &buf_offset2, &nalloc2) != eslOK){
869     LOG_FATAL_MSG("Serializing HMMD_SEARCH_STATS failed", errno);
870   }
871 
872   results->status.msg_size = buf_offset + buf_offset2; // set size of second message
873 
874   // Third, the buffer with the HMMD_SEARCH_STATUS object
875   buf_offset3 = 0;
876   nalloc3 = 0;
877   if(hmmd_search_status_Serialize(&(results->status), buf3, &buf_offset3, &nalloc3) != eslOK){
878     LOG_FATAL_MSG("Serializing HMMD_SEARCH_STATUS failed", errno);
879   }
880 
881   // Now, send the buffers in the reverse of the order they were built
882   /* send back a successful status message */
883   n = buf_offset3;
884 
885   if (writen(fd, buf3_ptr, n) != n) {
886     p7_syslog(LOG_ERR,"[%s:%d] - writing %s error %d - %s\n", __FILE__, __LINE__, query->ip_addr, errno, strerror(errno));
887     goto CLEAR;
888   }
889 
890   // and the stats object
891   n=buf_offset2;
892 
893   if (writen(fd, buf2_ptr, n) != n) {
894     p7_syslog(LOG_ERR,"[%s:%d] - writing %s error %d - %s\n", __FILE__, __LINE__, query->ip_addr, errno, strerror(errno));
895     goto CLEAR;
896   }
897   printf("%p\n", results->hits[1]);
898   // and finally the hits
899   n=buf_offset;
900 
901   if (writen(fd, buf_ptr, n) != n) {
902     p7_syslog(LOG_ERR,"[%s:%d] - writing %s error %d - %s\n", __FILE__, __LINE__, query->ip_addr, errno, strerror(errno));
903     goto CLEAR;
904   }
905   printf("Results for %s (%d) sent %" PRId64 " bytes\n", query->ip_addr, fd, results->status.msg_size);
906   printf("Hits:%"PRId64 "  reported:%" PRId64 "  included:%"PRId64 "\n", results->stats.nhits, results->stats.nreported, results->stats.nincluded);
907   fflush(stdout);
908 
909  CLEAR:
910   /* free all the data */
911   for(int i = 0; i < results->stats.nhits; i++){
912     p7_hit_Destroy(results->hits[i]);
913   }
914 
915   free(results->hits);
916   results->hits = NULL;
917 
918   if (pli)  p7_pipeline_Destroy(pli);
919   if (hits) free(hits);
920   if (dcl)  free(dcl);
921   if(buf_ptr != NULL){
922     free(buf_ptr);
923   }
924   if(buf2_ptr != NULL){
925     free(buf2_ptr);
926   }
927   if(buf3_ptr != NULL){
928     free(buf3_ptr);
929   }
930   if(results->stats.hit_offsets != NULL){
931     free(results->stats.hit_offsets);
932   }
933   init_results(results);
934   return;
935 }
936 
937 static void
destroy_worker(WORKER_DATA * worker)938 destroy_worker(WORKER_DATA *worker)
939 {
940   if (worker == NULL) {
941     if (worker->err_buf  != NULL) free(worker->err_buf);
942     if (worker->hits != NULL){
943       for(int i = 0; i < worker->allocated_hits; i++){
944         p7_hit_Destroy(worker->hits[i]);
945       }
946       free(worker->hits);
947     }
948     memset(worker, 0, sizeof(WORKER_DATA));
949     free(worker);
950   }
951 }
952 
953 static void
clear_results(WORKERSIDE_ARGS * args,SEARCH_RESULTS * results)954 clear_results(WORKERSIDE_ARGS *args, SEARCH_RESULTS *results)
955 {
956   int i;
957   int n;
958   WORKER_DATA *worker;
959 
960   /* lock the workers until we have freed the results */
961   if ((n = pthread_mutex_lock (&args->work_mutex)) != 0) LOG_FATAL_MSG("mutex lock", n);
962 
963   assert(validate_workers(args));
964 
965   /* free all the results */
966   worker = args->head;
967   while (worker != NULL) {
968     if (worker->err_buf  != NULL) free(worker->err_buf);
969     if(worker->hits != NULL){
970       for(int i =0; i < worker->allocated_hits; i++){
971         p7_hit_Destroy(worker->hits[i]);
972       }
973       free(worker->hits);
974       worker->hits = NULL;
975     }
976     worker->err_buf  = NULL;
977 
978     worker->completed = 0;
979     worker = worker->next;
980   }
981 
982   if ((n = pthread_mutex_unlock (&args->work_mutex)) != 0)  LOG_FATAL_MSG("mutex unlock", n);
983 
984   for (i = 0; i < results->nhits; ++i) {
985     if (results->hits[i]  != NULL) p7_hit_Destroy(results->hits[i]);
986     results->hits[i]  = NULL;
987   }
988 
989   if (results->hits != NULL) free(results->hits);
990   init_results(results);
991 }
992 
993 static void
process_ServerCmd(char * ptr,CLIENTSIDE_ARGS * data)994 process_ServerCmd(char *ptr, CLIENTSIDE_ARGS *data)
995 {
996   QUEUE_DATA    *parms    = NULL;     /* cmd to queue           */
997   HMMD_COMMAND  *cmd      = NULL;     /* parsed cmd to process  */
998   int            fd       = data->sock_fd;
999   ESL_STACK     *cmdstack = data->cmdstack;
1000   char          *s;
1001   time_t         date;
1002   char           timestamp[32];
1003 
1004   /* skip leading white spaces */
1005   ++ptr;
1006   while (*ptr == ' ' || *ptr == '\t') ++ptr;
1007 
1008   /* skip to the end of the line */
1009   s = ptr;
1010   while (*s && (*s != '\n' && *s != '\r')) ++s;
1011   *s = 0;
1012 
1013   /* process the different commands */
1014   s = strsep(&ptr, " \t");
1015   if (strcmp(s, "shutdown") == 0)
1016     {
1017       if ((cmd = malloc(sizeof(HMMD_HEADER))) == NULL) LOG_FATAL_MSG("malloc", errno);
1018       memset(cmd, 0, sizeof(HMMD_HEADER)); /* avoid uninit bytes & valgrind bitching. Remove, if we ever serialize structs correctly. */
1019       cmd->hdr.length  = 0;
1020       cmd->hdr.command = HMMD_CMD_SHUTDOWN;
1021     }
1022   else
1023     {
1024       client_msg(fd, eslEINVAL, "Unknown command %s\n", s);
1025       return;
1026     }
1027 
1028   if ((parms = malloc(sizeof(QUEUE_DATA))) == NULL) LOG_FATAL_MSG("malloc", errno);
1029   memset(parms, 0, sizeof(QUEUE_DATA)); /* avoid valgrind bitches about uninit bytes; remove if structs are serialized properly */
1030 
1031   parms->hmm  = NULL;
1032   parms->seq  = NULL;
1033   parms->abc  = NULL;
1034   parms->opts = NULL;
1035   parms->dbx  = -1;
1036   parms->cmd  = cmd;
1037 
1038   strcpy(parms->ip_addr, data->ip_addr);
1039   parms->sock       = fd;
1040   parms->cmd_type   = cmd->hdr.command;
1041   parms->query_type = 0;
1042 
1043   date = time(NULL);
1044   ctime_r(&date, timestamp);
1045   printf("\n%s", timestamp);	/* note ctime_r() leaves \n on end of timestamp */
1046   printf("Queuing command %d from %s (%d)\n", cmd->hdr.command, parms->ip_addr, parms->sock);
1047   fflush(stdout);
1048 
1049   esl_stack_PPush(cmdstack, parms);
1050 }
1051 
1052 static int
clientside_loop(CLIENTSIDE_ARGS * data)1053 clientside_loop(CLIENTSIDE_ARGS *data)
1054 {
1055   int                status;
1056 
1057   char              *ptr;
1058   char              *buffer;
1059   char               opt_str[MAX_BUFFER];
1060 
1061   int                dbx;
1062   int                buf_size;
1063   int                remaining;
1064   int                amount;
1065   int                eod;
1066   int                n;
1067 
1068   P7_HMM            *hmm     = NULL;     /* query HMM                      */
1069   ESL_SQ            *seq     = NULL;     /* query sequence                 */
1070   ESL_SCOREMATRIX   *sco     = NULL;     /* scoring matrix                 */
1071   P7_HMMFILE        *hfp     = NULL;
1072   ESL_ALPHABET      *abc     = NULL;     /* digital alphabet               */
1073   ESL_GETOPTS       *opts    = NULL;     /* search specific options        */
1074   HMMD_COMMAND      *cmd     = NULL;     /* search cmd to send to workers  */
1075 
1076   ESL_STACK         *cmdstack = data->cmdstack;
1077   QUEUE_DATA        *parms;
1078   jmp_buf            jmp_env;
1079   time_t             date;
1080   char               timestamp[32];
1081 
1082   buf_size = MAX_BUFFER;
1083   if ((buffer  = malloc(buf_size))   == NULL) LOG_FATAL_MSG("malloc", errno);
1084   ptr = buffer;
1085   remaining = buf_size;
1086   amount = 0;
1087 
1088   eod = 0;
1089   while (!eod) {
1090     int   l;
1091     char *s;
1092 
1093     /* Receive message from client */
1094     if ((n = read(data->sock_fd, ptr, remaining)) < 0) {
1095       p7_syslog(LOG_ERR,"[%s:%d] - reading %s error %d - %s\n", __FILE__, __LINE__, data->ip_addr, errno, strerror(errno));
1096       return 1;
1097     }
1098 
1099     if (n == 0) return 1;
1100 
1101     ptr += n;
1102     amount += n;
1103     remaining -= n;
1104 
1105     /* scan backwards till we hit the start of the line */
1106     l = amount;
1107     s = ptr - 1;
1108     while (l-- > 0 && (*s == '\n' || *s == '\r')) --s;
1109     while (l-- > 0 && (*s != '\n' && *s != '\r')) --s;
1110     eod = (amount > 1 && *(s + 1) == '/' && *(s + 2) == '/' );
1111 
1112     /* if the buffer is full, make it larger */
1113     if (!eod && remaining == 0) {
1114       if ((buffer = realloc(buffer, buf_size * 2)) == NULL) LOG_FATAL_MSG("realloc", errno);
1115       ptr = buffer + buf_size;
1116       remaining = buf_size;
1117       buf_size *= 2;
1118     }
1119   }
1120 
1121   /* zero terminate the buffer */
1122   if (remaining == 0) {
1123     if ((buffer = realloc(buffer, buf_size + 1)) == NULL) LOG_FATAL_MSG("realloc", errno);
1124     ptr = buffer + buf_size;
1125   }
1126   *ptr = 0;
1127 
1128   /* skip all leading white spaces */
1129   ptr = buffer;
1130   while (*ptr && isspace(*ptr)) ++ptr;
1131 
1132   opt_str[0] = 0;
1133   if (*ptr == '!') {
1134     process_ServerCmd(ptr, data);
1135     free(buffer);
1136     return 0;
1137   } else if (*ptr == '@') {
1138     char *s = ++ptr;
1139 
1140     /* skip to the end of the line */
1141     while (*ptr && (*ptr != '\n' && *ptr != '\r')) ++ptr;
1142     *ptr++ = 0;
1143 
1144     /* create a commandline string with dummy program name for
1145      * the esl_opt_ProcessSpoof() function to parse.
1146      */
1147     snprintf(opt_str, sizeof(opt_str), "hmmpgmd %s\n", s);
1148 
1149     /* skip remaining white spaces */
1150     while (*ptr && isspace(*ptr)) ++ptr;
1151   } else {
1152     client_msg(data->sock_fd, eslEFORMAT, "Missing options string");
1153     free(buffer);
1154     return 0;
1155   }
1156 
1157   if (strncmp(ptr, "//", 2) == 0) {
1158     client_msg(data->sock_fd, eslEFORMAT, "Missing search sequence/hmm");
1159     free(buffer);
1160     return 0;
1161   }
1162 
1163   if (!setjmp(jmp_env)) {
1164     dbx = 0;
1165 
1166     status = process_searchopts(data->sock_fd, opt_str, &opts);
1167     if (status != eslOK) {
1168       client_msg_longjmp(data->sock_fd, status, &jmp_env, "Failed to parse options string: %s", opts->errbuf);
1169     }
1170 
1171     /* the options string can handle an optional database */
1172     if (esl_opt_ArgNumber(opts) > 0) {
1173       client_msg_longjmp(data->sock_fd, status, &jmp_env, "Incorrect number of command line arguments.");
1174     }
1175 
1176     if (esl_opt_IsUsed(opts, "--seqdb")) {
1177       dbx = esl_opt_GetInteger(opts, "--seqdb");
1178     } else if (esl_opt_IsUsed(opts, "--hmmdb")) {
1179       dbx = esl_opt_GetInteger(opts, "--hmmdb");
1180     } else {
1181       client_msg_longjmp(data->sock_fd, eslEINVAL, &jmp_env, "No search database specified, --seqdb or --hmmdb.");
1182     }
1183 
1184 
1185     abc = esl_alphabet_Create(eslAMINO);
1186     seq = NULL;
1187     hmm = NULL;
1188 
1189     if (*ptr == '>') {
1190       /* try to parse the input buffer as a FASTA sequence */
1191       seq = esl_sq_CreateDigital(abc);
1192       /* try to parse the input buffer as a FASTA sequence */
1193       status = esl_sqio_Parse(ptr, strlen(ptr), seq, eslSQFILE_DAEMON);
1194       if (status != eslOK) client_msg_longjmp(data->sock_fd, status, &jmp_env, "Error parsing FASTA sequence");
1195       if (seq->n < 1) client_msg_longjmp(data->sock_fd, eslEFORMAT, &jmp_env, "Error zero length FASTA sequence");
1196 
1197     } else if (strncmp(ptr, "HMM", 3) == 0) {
1198       if (esl_opt_IsUsed(opts, "--hmmdb")) {
1199         client_msg_longjmp(data->sock_fd, status, &jmp_env, "A HMM cannot be used to search a hmm database");
1200       }
1201 
1202       /* try to parse the buffer as an hmm */
1203       status = p7_hmmfile_OpenBuffer(ptr, strlen(ptr), &hfp);
1204       if (status != eslOK) client_msg_longjmp(data->sock_fd, status, &jmp_env, "Failed to open query hmm buffer");
1205 
1206       status = p7_hmmfile_Read(hfp, &abc,  &hmm);
1207       if (status != eslOK) client_msg_longjmp(data->sock_fd, status, &jmp_env, "Error reading query hmm: %s", hfp->errbuf);
1208 
1209       p7_hmmfile_Close(hfp);
1210 
1211     } else {
1212       /* no idea what we are trying to parse */
1213       client_msg_longjmp(data->sock_fd, eslEFORMAT, &jmp_env, "Unknown query sequence/hmm format");
1214     }
1215   } else {
1216     /* an error occured some where, so try to clean up */
1217     if (opts != NULL) esl_getopts_Destroy(opts);
1218     if (abc  != NULL) esl_alphabet_Destroy(abc);
1219     if (hmm  != NULL) p7_hmm_Destroy(hmm);
1220     if (seq  != NULL) esl_sq_Destroy(seq);
1221     if (sco  != NULL) esl_scorematrix_Destroy(sco);
1222 
1223     free(buffer);
1224     return 0;
1225   }
1226 
1227   if ((parms = malloc(sizeof(QUEUE_DATA))) == NULL) LOG_FATAL_MSG("malloc", errno);
1228 
1229   /* build the search structure that will be sent to all the workers */
1230   n = sizeof(HMMD_COMMAND);
1231   n = n + strlen(opt_str) + 1;
1232 
1233   if (seq != NULL) {
1234     n = n + strlen(seq->name) + 1;
1235     n = n + strlen(seq->desc) + 1;
1236     n = n + seq->n + 2;
1237   } else {
1238     n = n + sizeof(P7_HMM);
1239     n = n + sizeof(float) * (hmm->M + 1) * p7H_NTRANSITIONS;
1240     n = n + sizeof(float) * (hmm->M + 1) * abc->K;
1241     n = n + sizeof(float) * (hmm->M + 1) * abc->K;
1242     if (hmm->name   != NULL)    n = n + strlen(hmm->name) + 1;
1243     if (hmm->acc    != NULL)    n = n + strlen(hmm->acc)  + 1;
1244     if (hmm->desc   != NULL)    n = n + strlen(hmm->desc) + 1;
1245     if (hmm->flags & p7H_RF)    n = n + hmm->M + 2;
1246     if (hmm->flags & p7H_MMASK) n = n + hmm->M + 2;
1247     if (hmm->flags & p7H_CONS)  n = n + hmm->M + 2;
1248     if (hmm->flags & p7H_CS)    n = n + hmm->M + 2;
1249     if (hmm->flags & p7H_CA)    n = n + hmm->M + 2;
1250     if (hmm->flags & p7H_MAP)   n = n + sizeof(int) * (hmm->M + 1);
1251   }
1252 
1253   if ((cmd = malloc(n)) == NULL) LOG_FATAL_MSG("malloc", errno);
1254   memset(cmd, 0, n);		/* silence valgrind bitching about uninit bytes; remove if we ever serialize structs properly */
1255   cmd->hdr.length       = n - sizeof(HMMD_HEADER);
1256   cmd->hdr.command      = (esl_opt_IsUsed(opts, "--seqdb")) ? HMMD_CMD_SEARCH : HMMD_CMD_SCAN;
1257   cmd->srch.db_inx      = dbx - 1;   /* the program indexes databases 0 .. n-1 */
1258   cmd->srch.opts_length = strlen(opt_str) + 1;
1259 
1260   ptr = cmd->srch.data;
1261 
1262   memcpy(ptr, opt_str, cmd->srch.opts_length);
1263   ptr += cmd->srch.opts_length;
1264 
1265   if (seq != NULL) {
1266     cmd->srch.query_type   = HMMD_SEQUENCE;
1267     cmd->srch.query_length = seq->n + 2;
1268 
1269     n = strlen(seq->name) + 1;
1270     memcpy(ptr, seq->name, n);
1271     ptr += n;
1272 
1273     n = strlen(seq->desc) + 1;
1274     memcpy(ptr, seq->desc, n);
1275     ptr += n;
1276 
1277     n = seq->n + 2;
1278     memcpy(ptr, seq->dsq, n);
1279     ptr += n;
1280   } else {
1281     cmd->srch.query_type   = HMMD_HMM;
1282     cmd->srch.query_length = hmm->M;
1283 
1284     n = sizeof(P7_HMM);
1285     memcpy(ptr, hmm, n);
1286     ptr += n;
1287 
1288     n = sizeof(float) * (hmm->M + 1) * p7H_NTRANSITIONS;
1289     memcpy(ptr, *hmm->t, n);
1290     ptr += n;
1291 
1292     n = sizeof(float) * (hmm->M + 1) * abc->K;
1293     memcpy(ptr, *hmm->mat, n);
1294     ptr += n;
1295     memcpy(ptr, *hmm->ins, n);
1296     ptr += n;
1297 
1298     if (hmm->name) { n = strlen(hmm->name) + 1;  memcpy(ptr, hmm->name, n);  ptr += n; }
1299     if (hmm->acc)  { n = strlen(hmm->acc)  + 1;  memcpy(ptr, hmm->acc, n);   ptr += n; }
1300     if (hmm->desc) { n = strlen(hmm->desc) + 1;  memcpy(ptr, hmm->desc, n);  ptr += n; }
1301 
1302     n = hmm->M + 2;
1303     if (hmm->flags & p7H_RF)    { memcpy(ptr, hmm->rf,        n); ptr += n; }
1304     if (hmm->flags & p7H_MMASK) { memcpy(ptr, hmm->mm,        n); ptr += n; }
1305     if (hmm->flags & p7H_CONS)  { memcpy(ptr, hmm->consensus, n); ptr += n; }
1306     if (hmm->flags & p7H_CS)    { memcpy(ptr, hmm->cs,        n); ptr += n; }
1307     if (hmm->flags & p7H_CA)    { memcpy(ptr, hmm->ca,        n); ptr += n; }
1308 
1309     if (hmm->flags & p7H_MAP) {
1310       n = sizeof(int) * (hmm->M + 1);
1311       memcpy(ptr, hmm->map, n);
1312       ptr += n;
1313     }
1314   }
1315 
1316   parms->hmm  = hmm;
1317   parms->seq  = seq;
1318   parms->abc  = abc;
1319   parms->opts = opts;
1320   parms->dbx  = dbx - 1;
1321   parms->cmd  = cmd;
1322 
1323   strcpy(parms->ip_addr, data->ip_addr);
1324   parms->sock       = data->sock_fd;
1325   parms->cmd_type   = cmd->hdr.command;
1326   parms->query_type = (seq != NULL) ? HMMD_SEQUENCE : HMMD_HMM;
1327 
1328   date = time(NULL);
1329   ctime_r(&date, timestamp);
1330   printf("\n%s", timestamp);	/* note ctime_r() leaves \n on end of timestamp */
1331 
1332   if (parms->seq != NULL) {
1333     printf("Queuing %s %s from %s (%d)\n", (cmd->hdr.command == HMMD_CMD_SEARCH) ? "search" : "scan", parms->seq->name, parms->ip_addr, parms->sock);
1334   } else {
1335     printf("Queuing hmm %s from %s (%d)\n", parms->hmm->name, parms->ip_addr, parms->sock);
1336   }
1337   printf("%s", opt_str);	/* note opt_str already has trailing \n */
1338   fflush(stdout);
1339 
1340   esl_stack_PPush(cmdstack, parms);
1341 
1342   free(buffer);
1343   return 0;
1344 }
1345 
1346 
1347 /* discard_function()
1348  * function handed to esl_stack_DiscardSelected() to remove
1349  * all commands in the stack that are associated with a
1350  * particular client socket, because we're closing that
1351  * client down. Prototype to this is dictate by the generalized
1352  * interface to esl_stack_DiscardSelected().
1353  */
1354 static int
discard_function(void * elemp,void * args)1355 discard_function(void *elemp, void *args)
1356 {
1357   QUEUE_DATA  *elem = (QUEUE_DATA *) elemp;
1358   int          fd   = * (int *) args;
1359 
1360   if (elem->sock == fd)
1361     {
1362       free_QueueData(elem);
1363       return TRUE;
1364     }
1365   return FALSE;
1366 }
1367 
1368 static void *
clientside_thread(void * arg)1369 clientside_thread(void *arg)
1370 {
1371   int              eof;
1372   CLIENTSIDE_ARGS *data = (CLIENTSIDE_ARGS *)arg;
1373 
1374   /* Guarantees that thread resources are deallocated upon return */
1375   pthread_detach(pthread_self());
1376 
1377   eof = 0;
1378   while (!eof) {
1379     eof = clientside_loop(data);
1380   }
1381 
1382   /* remove any commands in stack associated with this client's socket */
1383   esl_stack_DiscardSelected(data->cmdstack, discard_function, &(data->sock_fd));
1384 
1385   printf("Closing %s (%d)\n", data->ip_addr, data->sock_fd);
1386   fflush(stdout);
1387 
1388   close(data->sock_fd);
1389   free(data);
1390 
1391   pthread_exit(NULL);
1392 }
1393 
1394 static void *
client_comm_thread(void * arg)1395 client_comm_thread(void *arg)
1396 {
1397   int                  n;
1398   int                  fd;
1399   int                  addrlen;
1400   pthread_t            thread_id;
1401 
1402   struct sockaddr_in   addr;
1403 
1404   CLIENTSIDE_ARGS     *targs    = NULL;
1405   CLIENTSIDE_ARGS     *data     = (CLIENTSIDE_ARGS *)arg;
1406 
1407   for ( ;; ) {
1408 
1409     /* Wait for a client to connect */
1410     n = sizeof(addr);
1411     if ((fd = accept(data->sock_fd, (struct sockaddr *)&addr, (unsigned int *)&n)) < 0) LOG_FATAL_MSG("accept", errno);
1412 
1413     if ((targs = malloc(sizeof(CLIENTSIDE_ARGS))) == NULL) LOG_FATAL_MSG("malloc", errno);
1414     targs->cmdstack   = data->cmdstack;
1415     targs->sock_fd    = fd;
1416 
1417     addrlen = sizeof(targs->ip_addr);
1418     strncpy(targs->ip_addr, inet_ntoa(addr.sin_addr), addrlen);
1419     targs->ip_addr[addrlen-1] = 0;
1420 
1421     if ((n = pthread_create(&thread_id, NULL, clientside_thread, targs)) != 0) LOG_FATAL_MSG("thread create", n);
1422   }
1423 
1424   pthread_exit(NULL);
1425 }
1426 
1427 static void
setup_clientside_comm(ESL_GETOPTS * opts,CLIENTSIDE_ARGS * args)1428 setup_clientside_comm(ESL_GETOPTS *opts, CLIENTSIDE_ARGS *args)
1429 {
1430   int                  n;
1431   int                  reuse;
1432   int                  sock_fd;
1433   pthread_t            thread_id;
1434 
1435   struct linger        linger;
1436   struct sockaddr_in   addr;
1437 
1438   /* Create socket for incoming connections */
1439   if ((sock_fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) LOG_FATAL_MSG("socket", errno);
1440 
1441   /* incase the server went down in an ungraceful way, allow the port to be
1442    * reused avoiding the timeout.
1443    */
1444   reuse = 1;
1445   if (setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) LOG_FATAL_MSG("setsockopt", errno);
1446 
1447   /* the sockets are never closed, so if the server exits, force the kernel to
1448    * close the socket and clear it so the server can be restarted immediately.
1449    */
1450   linger.l_onoff = 1;
1451   linger.l_linger = 0;
1452   if (setsockopt(sock_fd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)) < 0) LOG_FATAL_MSG("setsockopt", errno);
1453 
1454   /* Construct local address structure */
1455   memset(&addr, 0, sizeof(addr));
1456   addr.sin_family = AF_INET;
1457   addr.sin_addr.s_addr = htonl(INADDR_ANY);
1458   addr.sin_port = htons(esl_opt_GetInteger(opts, "--cport"));
1459 
1460   /* Bind to the local address */
1461   if (bind(sock_fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) LOG_FATAL_MSG("bind", errno);
1462 
1463   /* Mark the socket so it will listen for incoming connections */
1464   if (listen(sock_fd, esl_opt_GetInteger(opts, "--ccncts")) < 0) LOG_FATAL_MSG("listen", errno);
1465   args->sock_fd = sock_fd;
1466 
1467   if ((n = pthread_create(&thread_id, NULL, client_comm_thread, (void *)args)) != 0) LOG_FATAL_MSG("socket", n);
1468 }
1469 
1470 static void
workerside_loop(WORKERSIDE_ARGS * data,WORKER_DATA * worker)1471 workerside_loop(WORKERSIDE_ARGS *data, WORKER_DATA *worker)
1472 {
1473   ESL_STOPWATCH      *w     = NULL;
1474   HMMD_SEARCH_STATS  *stats = NULL;
1475   HMMD_COMMAND        cmd;
1476   int    n;
1477   int    size;
1478   int    total;
1479   char  *ptr;
1480   uint8_t *buf; // Buffer to receive bytes into over sockets
1481   uint32_t buf_position; //Index into buffer for deserialize
1482   memset(&cmd, 0, sizeof(HMMD_COMMAND)); /* silence valgrind. if we ever serialize structs properly, remove */
1483   w = esl_stopwatch_Create();
1484 
1485   for ( ; ; ) {
1486 
1487     /* wait for the next search object */
1488     if ((n = pthread_mutex_lock (&data->work_mutex)) != 0) LOG_FATAL_MSG("mutex lock", n);
1489 
1490     /* wait for the master's signal to start the calculations */
1491     while (worker->cmd == NULL) {
1492       if ((n = pthread_cond_wait(&data->start_cond, &data->work_mutex)) != 0) LOG_FATAL_MSG("cond wait", n);
1493     }
1494 
1495     if ((n = pthread_mutex_unlock (&data->work_mutex)) != 0) LOG_FATAL_MSG("mutex unlock", n);
1496 
1497     if (worker->cmd->hdr.command == HMMD_CMD_SHUTDOWN) {
1498       fd_set rset;
1499       struct timeval tv;
1500 
1501       n = MSG_SIZE(worker->cmd);
1502       if (writen(worker->sock_fd, worker->cmd, n) != n) {
1503         p7_syslog(LOG_ERR,"[%s:%d] - writing %s error %d - %s\n", __FILE__, __LINE__, worker->ip_addr, errno, strerror(errno));
1504         break;
1505       }
1506 
1507       FD_ZERO(&rset);
1508       FD_SET(worker->sock_fd, &rset);
1509 
1510       tv.tv_sec = 2;
1511       tv.tv_usec = 0;
1512 
1513       if ((n = select(worker->sock_fd + 1, &rset, NULL, NULL, &tv)) < 0) {
1514         p7_syslog(LOG_ERR,"[%s:%d] - select %s error %d - %s\n", __FILE__, __LINE__, worker->ip_addr, errno, strerror(errno));
1515       } else {
1516         if (n == 0) {
1517           p7_syslog(LOG_ERR,"[%s:%d] - shutdown %s is not responding\n", __FILE__, __LINE__, worker->ip_addr);
1518         } else {
1519           n = sizeof(HMMD_HEADER);
1520           if ((size = readn(worker->sock_fd, &cmd, n)) == -1) {
1521             p7_syslog(LOG_ERR,"[%s:%d] - reading %s error %d - %s\n", __FILE__, __LINE__, worker->ip_addr, errno, strerror(errno));
1522           }
1523           if (cmd.hdr.command == HMMD_CMD_SHUTDOWN) {
1524             p7_syslog(LOG_ERR,"[%s:%d] - shutting down %s\n", __FILE__, __LINE__, worker->ip_addr);
1525           } else {
1526             p7_syslog(LOG_ERR,"[%s:%d] - error shutting down %s - received %d\n", __FILE__, __LINE__, worker->ip_addr, cmd.hdr.command);
1527           }
1528         }
1529       }
1530       break;
1531     }
1532 
1533     //printf ("Writing %d bytes to %s [MSG = %d/%d]\n", (int)MSG_SIZE(worker->cmd), worker->ip_addr, worker->cmd->hdr.command, worker->cmd->hdr.length);
1534 
1535     esl_stopwatch_Start(w);
1536 
1537     /* write search message in two parts */
1538     n = sizeof(HMMD_HEADER) + sizeof(HMMD_SEARCH_CMD);
1539     memcpy(&cmd, worker->cmd, n);
1540     cmd.srch.inx = worker->srch_inx;
1541     cmd.srch.cnt = worker->srch_cnt;
1542     if (writen(worker->sock_fd, &cmd, n) != n) {
1543       p7_syslog(LOG_ERR,"[%s:%d] - writing %s error %d - %s\n", __FILE__, __LINE__, worker->ip_addr, errno, strerror(errno));
1544       break;
1545     }
1546 
1547     /* write remaining data, i.e. sequence, options etc. */
1548     ptr = (char *)worker->cmd;
1549     ptr += n;
1550     n = MSG_SIZE(worker->cmd) - n;
1551     if (writen(worker->sock_fd, ptr, n) != n) {
1552       p7_syslog(LOG_ERR,"[%s:%d] - writing %s error %d - %s\n", __FILE__, __LINE__, worker->ip_addr, errno, strerror(errno));
1553       break;
1554     }
1555 
1556     total = 0;
1557     worker->total = 0;
1558 
1559     n = HMMD_SEARCH_STATUS_SERIAL_SIZE;
1560     buf = malloc(n);
1561     if (buf == NULL){
1562       LOG_FATAL_MSG("malloc", errno);
1563     }
1564 
1565     total += n;
1566     if ((size = readn(worker->sock_fd, buf, n)) == -1) {
1567       p7_syslog(LOG_ERR,"[%s:%d] - reading %s error %d - %s\n", __FILE__, __LINE__, worker->ip_addr, errno, strerror(errno));
1568       break;
1569     }
1570 
1571     buf_position = 0;
1572     if(hmmd_search_status_Deserialize(buf, &buf_position, &(worker->status)) != eslOK){
1573        LOG_FATAL_MSG("Couldn't deserialize HMMD_SEARCH_STATUS", errno);
1574     }
1575 
1576     if (worker->status.status != eslOK) {
1577       n = worker->status.msg_size;
1578       total += n;
1579       if ((worker->err_buf = malloc(n)) == NULL) LOG_FATAL_MSG("malloc", errno);
1580       worker->err_buf[0] = 0;
1581       if ((size = readn(worker->sock_fd, worker->err_buf, n)) == -1) {
1582         p7_syslog(LOG_ERR,"[%s:%d] - reading %s error %d - %s\n", __FILE__, __LINE__, worker->ip_addr, errno, strerror(errno));
1583         break;
1584       }
1585     } else {
1586 
1587       // receive the results from the worker
1588       buf = realloc(buf, worker->status.msg_size);
1589       if(buf == NULL){
1590         LOG_FATAL_MSG("malloc", errno);
1591       }
1592 
1593       total += worker->status.msg_size;
1594       if ((size = readn(worker->sock_fd, buf, worker->status.msg_size)) == -1) {
1595         p7_syslog(LOG_ERR,"[%s:%d] - reading %s error %d - %s\n", __FILE__, __LINE__, worker->ip_addr, errno, strerror(errno));
1596         break;
1597       }
1598 
1599       buf_position = 0; // start at beginning of new buffer of data
1600       // Now, serialize the data structures out of it
1601       if(p7_hmmd_search_stats_Deserialize(buf, &buf_position, &(worker->stats)) != eslOK){
1602         LOG_FATAL_MSG("Couldn't deserialize HMMD_SEARCH_STATS", errno);
1603       }
1604       stats = &worker->stats;
1605       if(stats->nhits > 0){
1606         worker->hits = malloc(stats->nhits * sizeof(P7_HIT *));
1607         if(worker->hits == NULL){
1608           LOG_FATAL_MSG("malloc", errno);
1609         }
1610         worker->allocated_hits = stats->nhits;  // Need this if we have to destroy the worker because of an error
1611         /* read in the hits */
1612         for(int i = 0; i < stats->nhits; i++){
1613           worker->hits[i] = p7_hit_Create_empty();
1614           if(worker->hits[i] == NULL){
1615             LOG_FATAL_MSG("malloc", errno);
1616           }
1617           if(p7_hit_Deserialize(buf, &buf_position, worker->hits[i]) != eslOK){
1618             LOG_FATAL_MSG("Couldn't deserialize P7_HIT", errno);
1619           }
1620         }
1621       }
1622       free(buf);
1623     }
1624 
1625     /* We've just allocated an array of pointers to P7_HIT objects and a bunch of P7_HIT
1626       objects that we don't free in this function.  Here's what happens to them.  gather_results() assembles
1627       all of the P7_HIT objects from the different workers into one big list, which it passes to forward_results().
1628       gather_results() frees each worker's array of pointers to P7_HIT objects, and forward_results is responsible for
1629       freeing all of the P7_HIT objects when it's done with them */
1630 
1631     esl_stopwatch_Stop(w);
1632 
1633     if ((n = pthread_mutex_lock (&data->work_mutex)) != 0) LOG_FATAL_MSG("mutex lock", n);
1634 
1635     /* set the state of the worker to completed */
1636     worker->cmd       = NULL;
1637     worker->completed = 1;
1638     worker->total     = total;
1639     ++data->completed;
1640 
1641     /* notify the master that a worker has completed */
1642     if ((n = pthread_cond_broadcast(&data->complete_cond)) != 0) LOG_FATAL_MSG("cond broadcast", n);
1643     if ((n = pthread_mutex_unlock (&data->work_mutex)) != 0) LOG_FATAL_MSG("mutex unlock", n);
1644 
1645     printf ("WORKER %s COMPLETED: %.2f sec received %d bytes\n", worker->ip_addr, w->elapsed, total);
1646     fflush(stdout);
1647   }
1648 
1649   esl_stopwatch_Destroy(w);
1650 
1651   return;
1652 }
1653 
1654 static void *
workerside_thread(void * arg)1655 workerside_thread(void *arg)
1656 {
1657   HMMD_COMMAND     *cmd     = NULL;
1658   WORKER_DATA      *worker  = (WORKER_DATA *)arg;
1659   WORKERSIDE_ARGS  *parent  = (WORKERSIDE_ARGS *)worker->parent;
1660   HMMD_HEADER       hdr;
1661   int               n;
1662   int               fd = 0;
1663   int               version;
1664   int               updated;
1665   int               status = eslOK;
1666   char             *p;
1667 
1668   memset(&hdr, 0, sizeof(HMMD_HEADER)); /* silence valgrind; remove if/when we serialize structs properly */
1669 
1670   /* Guarantees that thread resources are deallocated upon return */
1671   pthread_detach(pthread_self());
1672 
1673   printf("Handling worker %s (%d)\n", worker->ip_addr, worker->sock_fd);
1674   fflush(stdout);
1675 
1676   updated = 0;
1677   while (!updated) {
1678     /* get the database version to load */
1679     if ((n = pthread_mutex_lock (&parent->work_mutex)) != 0) LOG_FATAL_MSG("mutex lock", n);
1680     version = parent->db_version;
1681     if ((n = pthread_mutex_unlock (&parent->work_mutex)) != 0)  LOG_FATAL_MSG("mutex unlock", n);
1682 
1683     n = sizeof(HMMD_COMMAND);
1684     if (parent->seq_db != NULL) n += strlen(parent->seq_db->name) + 1;
1685     if (parent->hmm_db != NULL) n += strlen(parent->hmm_db->name) + 1;
1686 
1687     cmd = malloc(n);
1688     if (cmd == NULL) {
1689       p7_syslog(LOG_ERR,"[%s:%d] - malloc %d - %s\n", __FILE__, __LINE__, errno, strerror(errno));
1690       goto EXIT;
1691     }
1692     memset(cmd, 0, n);
1693 
1694     cmd->hdr.length  = n - sizeof(HMMD_HEADER);
1695     cmd->hdr.command = HMMD_CMD_INIT;
1696 
1697     p = cmd->init.data;
1698 
1699     if (parent->seq_db != NULL) {
1700       cmd->init.db_cnt      = parent->seq_db->db_cnt;
1701       cmd->init.seq_cnt     = parent->seq_db->count;
1702       cmd->init.seqdb_off   = p - cmd->init.data;
1703 
1704       strncpy(cmd->init.sid, parent->seq_db->id, sizeof(cmd->init.sid));
1705       cmd->init.sid[sizeof(cmd->init.sid)-1] = 0;
1706 
1707       strcpy(p, parent->seq_db->name);
1708       p += strlen(parent->seq_db->name) + 1;
1709     }
1710 
1711     if (parent->hmm_db != NULL) {
1712       cmd->init.hmm_cnt     = 1;
1713       cmd->init.model_cnt   = parent->hmm_db->n;
1714       cmd->init.hmmdb_off   = p - cmd->init.data;
1715 
1716       //strncpy(cmd->init.hid, parent->hmm_db->id, sizeof(cmd->init.hid));
1717       //cmd->init.hid[sizeof(cmd->init.hid)-1] = 0;
1718 
1719       strcpy(p, parent->hmm_db->name);
1720       p += strlen(parent->hmm_db->name) + 1;
1721     }
1722 
1723     if (writen(worker->sock_fd, cmd, n) != n) {
1724       p7_syslog(LOG_ERR,"[%s:%d] - writing (%d) error %d - %s\n", __FILE__, __LINE__, worker->sock_fd, errno, strerror(errno));
1725       status = eslFAIL;
1726     }
1727 
1728     /* process the init command first */
1729     if (readn(worker->sock_fd, &hdr, sizeof(hdr)) == -1) {
1730       p7_syslog(LOG_ERR,"[%s:%d] - reading (%d) error %d - %s\n", __FILE__, __LINE__, worker->sock_fd, errno, strerror(errno));
1731       status = eslFAIL;
1732     }
1733 
1734     /* cmd is a HMMD_COMMAND.
1735      *    consists of HMMD_HEADER:  length, command, status
1736      *    and a union of HMMD_INIT_CMD, HMMD_SEARCH_COMMAND, HMMD_INIT_RESET.
1737      *    we know which is valid, from hdr.command
1738      *    the total malloc size for an HMMD_COMMAND is calculated from the header, using MSG_SIZE(cmd)
1739      */
1740     n = MSG_SIZE(&hdr);
1741     if ((cmd = realloc(cmd, n)) == NULL) {
1742       p7_syslog(LOG_ERR,"[%s:%d] - realloc error %d - %s\n", __FILE__, __LINE__, errno, strerror(errno));
1743       status = eslFAIL;
1744     }
1745     if (readn(worker->sock_fd, &(cmd->init), hdr.length) == -1) {
1746       p7_syslog(LOG_ERR,"[%s:%d] - reading (%d) error %d - %s\n", __FILE__, __LINE__, worker->sock_fd, errno, strerror(errno));
1747       status = eslFAIL;
1748     }
1749 
1750     /* validate the database of the worker before adding him to the list */
1751     if (hdr.command != HMMD_CMD_INIT) {
1752       p7_syslog(LOG_ERR,"[%s:%d] - expecting HMMD_CMD_INIT %d\n", __FILE__, __LINE__, hdr.command);
1753       status = eslFAIL;
1754     }
1755     if (cmd->hdr.status != eslOK) {
1756       p7_syslog(LOG_ERR,"[%s:%d] - workers init status failed %d\n", __FILE__, __LINE__, cmd->hdr.status);
1757       status = eslFAIL;
1758     }
1759 
1760     worker->next = NULL;
1761     worker->prev = NULL;
1762 
1763     /* add the worker to the pending list */
1764     if ((n = pthread_mutex_lock (&parent->work_mutex)) != 0) LOG_FATAL_MSG("mutex lock", n);
1765 
1766     assert(validate_workers(parent));
1767 
1768     /* make sure the master has not loaded a new database while we were waiting
1769      * for the worker to load and verify the database we started out this.  If
1770      * the version has changed, force the worker to reload and verify.
1771      */
1772     if (version == parent->db_version) {
1773       if (status == eslOK) {
1774         worker->next    = parent->pending;
1775         parent->pending = worker;
1776         ++parent->pend_cnt;
1777       } else {
1778         worker->next   = parent->idling;
1779         parent->idling = worker;
1780         ++parent->idle_cnt;
1781       }
1782       updated = 1;
1783     }
1784 
1785     assert(validate_workers(parent));
1786 
1787     if ((n = pthread_mutex_unlock (&parent->work_mutex)) != 0)  LOG_FATAL_MSG("mutex unlock", n);
1788   }
1789 
1790   printf("Pending worker %s (%d)\n", worker->ip_addr, worker->sock_fd);
1791   fflush(stdout);
1792 
1793   workerside_loop(parent, worker);
1794 
1795   if ((n = pthread_mutex_lock (&parent->work_mutex)) != 0) LOG_FATAL_MSG("mutex lock", n);
1796 
1797   fd = worker->sock_fd;
1798 
1799   ++parent->failed;
1800   ++parent->completed;
1801 
1802   worker->terminated = 1;
1803   worker->total      = 0;
1804   worker->sock_fd    = -1;
1805 
1806   assert(validate_workers(parent));
1807 
1808   /* notify the master that a worker has completed */
1809   if ((n = pthread_cond_broadcast(&parent->complete_cond)) != 0) LOG_FATAL_MSG("cond broadcast", n);
1810   if ((n = pthread_mutex_unlock (&parent->work_mutex)) != 0) LOG_FATAL_MSG("mutex unlock", n);
1811 
1812  EXIT:
1813   printf("Closing worker %s (%d)\n", worker->ip_addr, fd);
1814   fflush(stdout);
1815 
1816   if (cmd != NULL) free(cmd);
1817   close(fd);
1818 
1819   pthread_exit(NULL);
1820 }
1821 
1822 static void *
worker_comm_thread(void * arg)1823 worker_comm_thread(void *arg)
1824 {
1825   int                  n;
1826   int                  fd;
1827   int                  addrlen;
1828   pthread_t            thread_id;
1829 
1830   struct sockaddr_in   addr;
1831 
1832   WORKERSIDE_ARGS     *data  = (WORKERSIDE_ARGS *)arg;
1833   WORKER_DATA         *worker;
1834 
1835   for ( ;; ) {
1836 
1837     /* Wait for a worker to connect */
1838     n = sizeof(addr);
1839     if ((fd = accept(data->sock_fd, (struct sockaddr *)&addr, (unsigned int *)&n)) < 0) LOG_FATAL_MSG("accept", errno);
1840 
1841     if ((worker = malloc(sizeof(WORKER_DATA))) == NULL) LOG_FATAL_MSG("thread create", errno);
1842     memset(worker, 0, sizeof(WORKER_DATA));
1843 
1844     worker->parent     = data;
1845     worker->sock_fd    = fd;
1846     worker->allocated_hits = 0; // These may be redundant because of the memset earlier, but better safe than sorry
1847     worker->hits = NULL;
1848 
1849     addrlen = sizeof(worker->ip_addr);
1850     strncpy(worker->ip_addr, inet_ntoa(addr.sin_addr), addrlen);
1851     worker->ip_addr[addrlen-1] = 0;
1852 
1853     if ((n = pthread_create(&thread_id, NULL, workerside_thread, worker)) != 0) LOG_FATAL_MSG("thread create", n);
1854   }
1855 
1856   pthread_exit(NULL);
1857 }
1858 
1859 static void
setup_workerside_comm(ESL_GETOPTS * opts,WORKERSIDE_ARGS * args)1860 setup_workerside_comm(ESL_GETOPTS *opts, WORKERSIDE_ARGS *args)
1861 {
1862   int                  n;
1863   int                  reuse;
1864   int                  sock_fd;
1865   pthread_t            thread_id;
1866 
1867   struct linger        linger;
1868   struct sockaddr_in   addr;
1869 
1870   /* Create socket for incoming connections */
1871   if ((sock_fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) LOG_FATAL_MSG("socket", errno);
1872 
1873   /* incase the server went down in an ungraceful way, allow the port to be
1874    * reused avoiding the timeout.
1875    */
1876   reuse = 1;
1877   if (setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) LOG_FATAL_MSG("setsockopt", errno);
1878 
1879   /* the sockets are never closed, so if the server exits, force the kernel to
1880    * close the socket and clear it so the server can be restarted immediately.
1881    */
1882   linger.l_onoff = 1;
1883   linger.l_linger = 0;
1884   if (setsockopt(sock_fd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)) < 0) LOG_FATAL_MSG("setsockopt", errno);
1885 
1886   /* Construct local address structure */
1887   memset(&addr, 0, sizeof(addr));
1888   addr.sin_family = AF_INET;
1889   addr.sin_addr.s_addr = htonl(INADDR_ANY);
1890   addr.sin_port = htons(esl_opt_GetInteger(opts, "--wport"));
1891 
1892   /* Bind to the local address */
1893   if (bind(sock_fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) LOG_FATAL_MSG("bind", errno);
1894 
1895   /* Mark the socket so it will listen for incoming connections */
1896   if (listen(sock_fd, esl_opt_GetInteger(opts, "--wcncts")) < 0) LOG_FATAL_MSG("listen", errno);
1897 
1898   args->sock_fd = sock_fd;
1899 
1900   if ((n = pthread_create(&thread_id, NULL, worker_comm_thread, (void *)args)) != 0) LOG_FATAL_MSG("thread create", n);
1901 }
1902 
1903 #endif /*HMMER_THREADS*/
1904 
1905 
1906 
1907