1 /* worker side of the hmmer daemon
2  */
3 #include "p7_config.h"
4 
5 #ifdef HMMER_THREADS
6 
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <errno.h>
11 #include <unistd.h>
12 #include <signal.h>
13 #include <pthread.h>
14 #include <setjmp.h>
15 #include <sys/socket.h>
16 #ifdef HAVE_NETINET_IN_H
17 #include <netinet/in.h>     /* On FreeBSD, you need netinet/in.h for struct sockaddr_in            */
18 #endif                      /* On OpenBSD, netinet/in.h is required for (must precede) arpa/inet.h */
19 #include <arpa/inet.h>
20 #include <syslog.h>
21 #include <time.h>
22 
23 #ifndef HMMER_THREADS
24 #error "Program requires pthreads be enabled."
25 #endif /*HMMER_THREADS*/
26 
27 #include "easel.h"
28 #include "esl_alphabet.h"
29 #include "esl_getopts.h"
30 #include "esl_sq.h"
31 #include "esl_sqio.h"
32 #include "esl_stopwatch.h"
33 #include "esl_threads.h"
34 
35 #include "hmmer.h"
36 #include "hmmpgmd.h"
37 #include "cachedb.h"
38 #include "p7_hmmcache.h"
39 
40 #define MAX_WORKERS  64
41 #define MAX_BUFFER   4096
42 
43 #define CONF_FILE "/etc/hmmpgmd.conf"
44 
45 typedef struct {
46   HMMER_SEQ       **sq_list;     /* list of sequences to process     */
47   int               sq_cnt;      /* number of sequences              */
48   int               db_Z;        /* true number of sequences         */
49 
50   P7_OPROFILE     **om_list;     /* list of profiles to process      */
51   int               om_cnt;      /* number of profiles               */
52 
53   pthread_mutex_t  *inx_mutex;   /* protect data                     */
54   int              *blk_size;    /* sequences per block              */
55   int              *limit;       /* point to decrease block size     */
56   int              *inx;         /* next index to process            */
57 
58   P7_HMM           *hmm;         /* query HMM                        */
59   ESL_SQ           *seq;         /* query sequence                   */
60   ESL_ALPHABET     *abc;         /* digital alphabet                 */
61   ESL_GETOPTS      *opts;        /* search specific options          */
62 
63   RANGE_LIST       *range_list;  /* (optional) list of ranges searched within the seqdb */
64 
65   double            elapsed;     /* elapsed search time              */
66 
67   /* Structure created and populated by the individual threads.
68    * The main thread is responsible for freeing up the memory.
69    */
70   P7_PIPELINE      *pli;         /* work pipeline                    */
71   P7_TOPHITS       *th;          /* top hit results                  */
72 } WORKER_INFO;
73 
74 typedef struct {
75   int fd;                        /* socket connection to server      */
76   int ncpus;                     /* number of cpus to use            */
77 
78   P7_SEQCACHE *seq_db;           /* cached sequence database         */
79   P7_HMMCACHE *hmm_db;           /* cached hmm database              */
80 } WORKER_ENV;
81 
82 static void process_InitCmd(HMMD_COMMAND *cmd, WORKER_ENV *env);
83 static void process_SearchCmd(HMMD_COMMAND *cmd, WORKER_ENV *env, QUEUE_DATA *query);
84 static void process_Shutdown(HMMD_COMMAND *cmd, WORKER_ENV *env);
85 
86 static QUEUE_DATA *process_QueryCmd(HMMD_COMMAND *cmd, WORKER_ENV *env);
87 
88 static int  setup_masterside_comm(ESL_GETOPTS *opts);
89 
90 static void send_results(int fd, ESL_STOPWATCH *w, P7_TOPHITS *th, P7_PIPELINE *pli);
91 
92 #define BLOCK_SIZE 1000
93 static void search_thread(void *arg);
94 static void scan_thread(void *arg);
95 
96 static void
print_timings(int i,double elapsed,P7_PIPELINE * pli)97 print_timings(int i, double elapsed, P7_PIPELINE *pli)
98 {
99   char buf1[16];
100   int h, m, s, hs;
101 
102   h  = (int) (elapsed / 3600.);
103   m  = (int) (elapsed / 60.) - h * 60;
104   s  = (int) (elapsed) - h * 3600 - m * 60;
105   hs = (int) (elapsed * 100.) - h * 360000 - m * 6000 - s * 100;
106   sprintf(buf1, "%02d:%02d.%02d", m,s,hs);
107 
108   fprintf (stdout, "%2d %9" PRId64 " %9" PRId64 " %7" PRId64 " %7" PRId64 " %6" PRId64 " %5" PRId64 " %s\n",
109            i, pli->nseqs, pli->nres, pli->n_past_msv, pli->n_past_bias, pli->n_past_vit, pli->n_past_fwd, buf1);
110 }
111 
112 static int
read_Command(HMMD_COMMAND ** ret_cmd,WORKER_ENV * env)113 read_Command(HMMD_COMMAND **ret_cmd, WORKER_ENV *env)
114 {
115   HMMD_HEADER   hdr;
116   HMMD_COMMAND *cmd = NULL;
117   int           n;
118 
119   /* read the command header */
120   if (readn(env->fd, &hdr, sizeof(hdr)) == -1) {
121     if (errno && errno != ECONNREFUSED) LOG_FATAL_MSG("read", errno);
122     return eslEOD;
123   }
124 
125   /* read the command data */
126   n = MSG_SIZE(&hdr);
127   if ((cmd = malloc(n)) == NULL) LOG_FATAL_MSG("malloc", errno);
128   memset(cmd, 0, n);		/* avoid uninitialized bytes. remove this, if we ever serialize/deserialize structures properly */
129   cmd->hdr.command = hdr.command;
130   cmd->hdr.length  = hdr.length;
131   if (hdr.length > 0) {
132     if (readn(env->fd, &cmd->init, hdr.length) == -1) {
133       if (errno && errno != ECONNREFUSED) LOG_FATAL_MSG("read", errno);
134       return eslEOD;
135     }
136   }
137 
138   *ret_cmd = cmd;
139   return eslOK;
140 }
141 
142 void
worker_process(ESL_GETOPTS * go)143 worker_process(ESL_GETOPTS *go)
144 {
145   HMMD_COMMAND *cmd      = NULL;  /* see hmmpgmd.h */
146   int           shutdown = 0;
147   WORKER_ENV    env;
148   int           status;
149 
150   QUEUE_DATA      *query      = NULL;
151 
152   /* Initializations */
153   impl_Init();
154   p7_FLogsumInit();      /* we're going to use table-driven Logsum() approximations at times */
155 
156   env.ncpus = ESL_MIN(esl_opt_GetInteger(go, "--cpu"),  esl_threads_GetCPUCount());
157 
158   env.hmm_db = NULL;
159   env.seq_db = NULL;
160   env.fd     = setup_masterside_comm(go);
161 
162   while (!shutdown)
163     {
164       if ((status = read_Command(&cmd, &env)) != eslOK) break;
165 
166       switch (cmd->hdr.command) {
167       case HMMD_CMD_INIT:      process_InitCmd  (cmd, &env);                break;
168       case HMMD_CMD_SCAN:
169 	  {
170  		   query = process_QueryCmd(cmd, &env);
171  		   process_SearchCmd(cmd, &env, query);
172  		   free_QueueData(query);
173 	  }
174 		 break;
175       case HMMD_CMD_SEARCH:
176 		   query = process_QueryCmd(cmd, &env);
177 	     process_SearchCmd(cmd, &env, query);
178        free_QueueData(query);
179          break;
180       case HMMD_CMD_SHUTDOWN:  process_Shutdown (cmd, &env);  shutdown = 1; break;
181       default: p7_syslog(LOG_ERR,"[%s:%d] - unknown command %d (%d)\n", __FILE__, __LINE__, cmd->hdr.command, cmd->hdr.length);
182       }
183 
184       free(cmd);
185       cmd = NULL;
186     }
187 
188   if (env.hmm_db) p7_hmmcache_Close(env.hmm_db);
189   if (env.seq_db) p7_seqcache_Close(env.seq_db);
190   if (env.fd != -1) close(env.fd);
191   return;
192 }
193 
194 
195 static void
process_SearchCmd(HMMD_COMMAND * cmd,WORKER_ENV * env,QUEUE_DATA * query)196 process_SearchCmd(HMMD_COMMAND *cmd, WORKER_ENV *env, QUEUE_DATA *query)
197 {
198   int              i;
199   int              cnt;
200   int              limit;
201   int              status;
202   int              blk_size;
203   WORKER_INFO     *info       = NULL;
204   ESL_ALPHABET    *abc;
205   ESL_STOPWATCH   *w;
206   ESL_THREADS     *threadObj  = NULL;
207   pthread_mutex_t  inx_mutex;
208   int              current_index;
209   time_t           date;
210   char             timestamp[32];
211 
212   w = esl_stopwatch_Create();
213   abc = esl_alphabet_Create(eslAMINO);
214 
215   if (pthread_mutex_init(&inx_mutex, NULL) != 0) p7_Fail("mutex init failed");
216   ESL_ALLOC(info, sizeof(*info) * env->ncpus);
217 
218   /* Log the current time (at search start) */
219   date = time(NULL);
220   ctime_r(&date, timestamp);
221   printf("\n%s", timestamp);	/* note that ctime_r() leaves \n on end of timestamp  */
222 
223   /* initialize thread data */
224   esl_stopwatch_Start(w);
225 
226   info->range_list = NULL;
227   if (esl_opt_IsUsed(query->opts, "--seqdb_ranges")) {
228     ESL_ALLOC(info->range_list, sizeof(RANGE_LIST));
229     hmmpgmd_GetRanges(info->range_list, esl_opt_GetString(query->opts, "--seqdb_ranges"));
230   }
231 
232 
233   if (query->cmd_type == HMMD_CMD_SEARCH) threadObj = esl_threads_Create(&search_thread);
234   else                                    threadObj = esl_threads_Create(&scan_thread);
235 
236   if (query->query_type == HMMD_SEQUENCE) {
237     fprintf(stdout, "Search seq %s  [L=%ld]", query->seq->name, (long) query->seq->n);
238   } else {
239     fprintf(stdout, "Search hmm %s  [M=%d]", query->hmm->name, query->hmm->M);
240   }
241   fprintf(stdout, " vs %s DB %d [%d - %d]",
242           (query->cmd_type == HMMD_CMD_SEARCH) ? "SEQ" : "HMM",
243           query->dbx, query->inx, query->inx + query->cnt - 1);
244 
245   if (info->range_list)
246     fprintf(stdout, " in range(s) %s", esl_opt_GetString(query->opts, "--seqdb_ranges"));
247 
248   fprintf(stdout, "\n");
249 
250   /* Create processing pipeline and hit list */
251   for (i = 0; i < env->ncpus; ++i) {
252     info[i].abc   = query->abc;
253     info[i].hmm   = query->hmm;
254     info[i].seq   = query->seq;
255     info[i].opts  = query->opts;
256 
257     info[i].range_list  = info[0].range_list;
258 
259     info[i].th    = NULL;
260     info[i].pli   = NULL;
261 
262     info[i].inx_mutex = &inx_mutex;
263     info[i].inx       = &current_index;/* this is confusing trickery - to share a single variable across all threads */
264     info[i].blk_size  = &blk_size;     /* ditto */
265     info[i].limit     = &limit;	       /* ditto. TODO: come back and clean this up. */
266 
267     if (query->cmd_type == HMMD_CMD_SEARCH) {
268       HMMER_SEQ **list  = env->seq_db->db[query->dbx].list;
269       info[i].sq_list   = &list[query->inx];
270       info[i].sq_cnt    = query->cnt;
271       info[i].db_Z      = env->seq_db->db[query->dbx].K;
272       info[i].om_list   = NULL;
273       info[i].om_cnt    = 0;
274     } else {
275       info[i].sq_list   = NULL;
276       info[i].sq_cnt    = 0;
277       info[i].db_Z      = 0;
278       info[i].om_list   = &env->hmm_db->list[query->inx];
279       info[i].om_cnt    = query->cnt;
280     }
281 
282     esl_threads_AddThread(threadObj, &info[i]);
283   }
284 
285   /* try block size of 5000.  we will need enough sequences for four
286    * blocks per thread or better.
287    */
288   blk_size = 5000;
289   cnt = query->cnt / env->ncpus / blk_size;
290   limit = query->cnt * 2 / 3;
291   if (cnt < 4) {
292     /* try block size of 1000  */
293     blk_size /= 5;
294     cnt = query->cnt / env->ncpus / blk_size;
295     if (cnt < 4) {
296       /* still not enough.  just divide it up into one block per thread */
297       blk_size = query->cnt / env->ncpus + 1;
298       limit = query->cnt * 2;
299     }
300   }
301   current_index = 0;
302 
303   esl_threads_WaitForStart(threadObj);
304   esl_threads_WaitForFinish(threadObj);
305 
306   esl_stopwatch_Stop(w);
307 #if 1
308   fprintf (stdout, "   Sequences  Residues                              Elapsed\n");
309   for (i = 0; i < env->ncpus; ++i) {
310     print_timings(i, info[i].elapsed, info[i].pli);
311   }
312 #endif
313   /* merge the results of the search results */
314   for (i = 1; i < env->ncpus; ++i) {
315     p7_tophits_Merge(info[0].th, info[i].th);
316     p7_pipeline_Merge(info[0].pli, info[i].pli);
317     p7_pipeline_Destroy(info[i].pli);
318     p7_tophits_Destroy(info[i].th);
319   }
320 
321   print_timings(99, w->elapsed, info[0].pli);
322   send_results(env->fd, w, info[0].th, info[0].pli);
323 
324   /* free the last of the pipeline data */
325   p7_pipeline_Destroy(info->pli);
326   p7_tophits_Destroy(info->th);
327 
328   esl_threads_Destroy(threadObj);
329 
330   pthread_mutex_destroy(&inx_mutex);
331 
332   if (info->range_list) {
333     if (info->range_list->starts)  free(info->range_list->starts);
334     if (info->range_list->ends)    free(info->range_list->ends);
335     free (info->range_list);
336   }
337 
338   free(info);
339 
340   esl_stopwatch_Destroy(w);
341   esl_alphabet_Destroy(abc);
342 
343   return;
344 
345  ERROR:
346   LOG_FATAL_MSG("malloc", errno);
347 }
348 
349 static QUEUE_DATA *
process_QueryCmd(HMMD_COMMAND * cmd,WORKER_ENV * env)350 process_QueryCmd(HMMD_COMMAND *cmd, WORKER_ENV *env)
351 {
352   int                i;
353   int                n;
354   int                status;
355 
356   char              *p;
357   char              *name;
358   char              *desc;
359   ESL_DSQ           *dsq;
360 
361   QUEUE_DATA        *query  = NULL;
362 
363   if ((query = malloc(sizeof(QUEUE_DATA))) == NULL) LOG_FATAL_MSG("malloc", errno);
364   memset(query, 0, sizeof(QUEUE_DATA));	 /* avoid uninitialized bytes. remove this, if we ever serialize/deserialize structures properly */
365 
366   printf("CMD: %d %d\n", cmd->hdr.command, cmd->srch.query_type);
367 
368   query->cmd_type   = cmd->hdr.command;
369   query->query_type = cmd->srch.query_type;
370   query->dbx        = cmd->srch.db_inx;
371   query->inx        = cmd->srch.inx;
372   query->cnt        = cmd->srch.cnt;
373   query->sock       = env->fd;
374   query->cmd        = NULL;
375 
376   p = cmd->srch.data;
377 
378   /* process search specific options */
379   status = process_searchopts(env->fd, p, &query->opts);
380   if (status != eslOK)  LOG_FATAL_MSG("esl_getopts_Create", status);
381 
382   query->hmm = NULL;
383   query->seq = NULL;
384 
385   query->abc = esl_alphabet_Create(eslAMINO);
386 
387   /* check if we are processing a sequence or hmm */
388   if (cmd->srch.query_type == HMMD_SEQUENCE) {
389     n    = cmd->srch.query_length - 2;
390     name = p + cmd->srch.opts_length;
391     desc = name + strlen(name) + 1;
392     dsq  = (ESL_DSQ *) (desc + strlen(desc) + 1);
393     query->seq = esl_sq_CreateDigitalFrom(query->abc, name, dsq, n, desc, NULL, NULL);
394   } else {
395     P7_HMM  thmm;
396     P7_HMM *hmm = p7_hmm_CreateShell();
397 
398     /* allocate memory for the hmm and initialize */
399     p += cmd->srch.opts_length;
400     memcpy(&thmm, p, sizeof(P7_HMM));
401 
402     hmm->flags = thmm.flags;
403     p7_hmm_CreateBody(hmm, cmd->srch.query_length, query->abc);
404     p += sizeof(P7_HMM);
405 
406     /* initialize fields */
407     hmm->nseq       = thmm.nseq;
408     hmm->eff_nseq   = thmm.eff_nseq;
409     hmm->max_length = thmm.max_length;
410     hmm->checksum   = thmm.checksum;
411     hmm->ctime      = NULL;
412     hmm->comlog     = NULL;
413 
414     for (i = 0; i < p7_NCUTOFFS; i++) hmm->cutoff[i]  = thmm.cutoff[i];
415     for (i = 0; i < p7_NEVPARAM; i++) hmm->evparam[i] = thmm.evparam[i];
416     for (i = 0; i < p7_MAXABET;  i++) hmm->compo[i]   = thmm.compo[i];
417 
418     /* fill in the hmm pointers */
419     n = sizeof(float) * (hmm->M + 1) * p7H_NTRANSITIONS;
420     memcpy(*hmm->t, p, n);     p += n;
421 
422     n = sizeof(float) * (hmm->M + 1) * query->abc->K;
423     memcpy(*hmm->mat, p, n);   p += n;
424     memcpy(*hmm->ins, p, n);   p += n;
425 
426     if (thmm.name) { hmm->name = strdup(p); p += strlen(hmm->name) + 1; }
427     if (thmm.acc)  { hmm->acc  = strdup(p); p += strlen(hmm->acc)  + 1; }
428     if (thmm.desc) { hmm->desc = strdup(p); p += strlen(hmm->desc) + 1; }
429 
430     n = hmm->M + 2;
431     if (hmm->flags & p7H_RF)    { memcpy(hmm->rf,        p, n); p += n; }
432     if (hmm->flags & p7H_MMASK) { memcpy(hmm->mm,        p, n); p += n; }
433     if (hmm->flags & p7H_CONS)  { memcpy(hmm->consensus, p, n); p += n; }
434     if (hmm->flags & p7H_CS)    { memcpy(hmm->cs,        p, n); p += n; }
435     if (hmm->flags & p7H_CA)    { memcpy(hmm->ca,        p, n); p += n; }
436 
437     n = sizeof(int) * (hmm->M + 1);
438     if (hmm->flags & p7H_MAP) {  memcpy(hmm->map,       p, n); p += n; }
439 
440     query->hmm = hmm;
441   }
442 
443   return query;
444 }
445 
446 static void
process_Shutdown(HMMD_COMMAND * cmd,WORKER_ENV * env)447 process_Shutdown(HMMD_COMMAND *cmd, WORKER_ENV  *env)
448 {
449   int            n;
450 
451   n = MSG_SIZE(cmd);
452   cmd->hdr.status = eslOK;
453   if (writen(env->fd, cmd, n) != n) {
454     LOG_FATAL_MSG("write error", errno);
455   }
456 }
457 
458 static void
process_InitCmd(HMMD_COMMAND * cmd,WORKER_ENV * env)459 process_InitCmd(HMMD_COMMAND *cmd, WORKER_ENV  *env)
460 {
461   char *p;
462   int   n;
463   int   status;
464 
465   if (env->hmm_db != NULL) p7_hmmcache_Close(env->hmm_db);
466   if (env->seq_db != NULL) p7_seqcache_Close(env->seq_db);
467 
468   env->hmm_db = NULL;
469   env->seq_db = NULL;
470 
471   /* load the sequence database */
472   if (cmd->init.db_cnt != 0) {
473     P7_SEQCACHE *sdb = NULL;
474 
475     p  = cmd->init.data + cmd->init.seqdb_off;
476     status = p7_seqcache_Open(p, &sdb, NULL);
477     if (status != eslOK) {
478       p7_syslog(LOG_ERR,"[%s:%d] - p7_seqcache_Open %s error %d\n", __FILE__, __LINE__, p, status);
479       LOG_FATAL_MSG("cache seqdb error", status);
480     }
481 
482     /* validate the sequence database */
483     cmd->init.sid[MAX_INIT_DESC-1] = 0;
484     if (strcmp (cmd->init.sid, sdb->id) != 0 || cmd->init.db_cnt != sdb->db_cnt || cmd->init.seq_cnt != sdb->count) {
485       p7_syslog(LOG_ERR,"[%s:%d] - seq db %s: integrity error %s - %s\n", __FILE__, __LINE__, p, cmd->init.sid, sdb->id);
486       LOG_FATAL_MSG("database integrity error", 0);
487     }
488 
489     env->seq_db = sdb;
490   }
491 
492   /* load the hmm database */
493   if (cmd->init.hmm_cnt != 0) {
494     P7_HMMCACHE *hcache = NULL;
495 
496     p  = cmd->init.data + cmd->init.hmmdb_off;
497 
498     status = p7_hmmcache_Open(p, &hcache, NULL);
499     if (status != eslOK) {
500       p7_syslog(LOG_ERR,"[%s:%d] - p7_hmmcache_Open %s error %d\n", __FILE__, __LINE__, p, status);
501       LOG_FATAL_MSG("cache hmmdb error", status);
502     }
503 
504     if ( (status = p7_hmmcache_SetNumericNames(hcache)) != eslOK){
505       p7_syslog(LOG_ERR,"[%s:%d] - p7_hmmcache_SetNumericNames %s error %d\n", __FILE__, __LINE__, p, status);
506       LOG_FATAL_MSG("cache hmmdb error", status);
507     }
508 
509     /* validate the hmm database */
510     cmd->init.hid[MAX_INIT_DESC-1] = 0;
511     /* TODO: come up with a new pressed format with an id to compare - strcmp (cmd->init.hid, hdb->id) != 0 */
512     if (cmd->init.hmm_cnt != 1 || cmd->init.model_cnt != hcache->n) {
513       p7_syslog(LOG_ERR,"[%s:%d] - hmm db %s: integrity error\n", __FILE__, __LINE__, p);
514       LOG_FATAL_MSG("database integrity error", 0);
515     }
516 
517     env->hmm_db = hcache;
518 
519     printf("Loaded profile db %s;  models: %d  memory: %" PRId64 "\n",
520          p, hcache->n, (uint64_t) p7_hmmcache_Sizeof(hcache));
521 
522   }
523 
524   /* if stdout is redirected at the commandline, it causes printf's to be buffered,
525    * which means status logging isn't printed. This line strongly requests unbuffering,
526    * which should be ok, given the low stdout load of hmmpgmd
527    */
528   setvbuf (stdout, NULL, _IONBF, BUFSIZ);
529   printf("Data loaded into memory. Worker is ready.\n");
530   setvbuf (stdout, NULL, _IOFBF, BUFSIZ);
531 
532 
533   /* write back to the master that we are on line */
534   n = MSG_SIZE(cmd);
535   cmd->hdr.status = eslOK;
536   if (writen(env->fd, cmd, n) != n) {
537     LOG_FATAL_MSG("write error", errno);
538   }
539 }
540 
541 
542 static void
search_thread(void * arg)543 search_thread(void *arg)
544 {
545   int               i;
546   int               count;
547   int               seed;
548   int               status;
549   int               workeridx;
550   WORKER_INFO      *info;
551   ESL_THREADS      *obj;
552   ESL_SQ            dbsq;
553   ESL_STOPWATCH    *w        = NULL;         /* timing stopwatch               */
554   P7_BUILDER       *bld      = NULL;         /* HMM construction configuration */
555   P7_BG            *bg       = NULL;         /* null model                     */
556   P7_PIPELINE      *pli      = NULL;         /* work pipeline                  */
557   P7_TOPHITS       *th       = NULL;         /* top hit results                */
558   P7_PROFILE       *gm       = NULL;         /* generic model                  */
559   P7_OPROFILE      *om       = NULL;         /* optimized query profile        */
560 
561   obj = (ESL_THREADS *) arg;
562   esl_threads_Started(obj, &workeridx);
563 
564   info = (WORKER_INFO *) esl_threads_GetData(obj, workeridx);
565   w    = esl_stopwatch_Create();
566   bg   = p7_bg_Create(info->abc);
567   esl_stopwatch_Start(w);
568 
569   /* set up the dummy description and accession fields */
570   dbsq.desc = "";
571   dbsq.acc  = "";
572 
573   /* process a query sequence or hmm */
574   if (info->seq != NULL) {
575     bld = p7_builder_Create(NULL, info->abc);
576     if ((seed = esl_opt_GetInteger(info->opts, "--seed")) > 0) {
577       esl_randomness_Init(bld->r, seed);
578       bld->do_reseeding = TRUE;
579     }
580     bld->EmL = esl_opt_GetInteger(info->opts, "--EmL");
581     bld->EmN = esl_opt_GetInteger(info->opts, "--EmN");
582     bld->EvL = esl_opt_GetInteger(info->opts, "--EvL");
583     bld->EvN = esl_opt_GetInteger(info->opts, "--EvN");
584     bld->EfL = esl_opt_GetInteger(info->opts, "--EfL");
585     bld->EfN = esl_opt_GetInteger(info->opts, "--EfN");
586     bld->Eft = esl_opt_GetReal   (info->opts, "--Eft");
587 
588     if (esl_opt_IsOn(info->opts, "--mxfile")) status = p7_builder_SetScoreSystem (bld, esl_opt_GetString(info->opts, "--mxfile"), NULL, esl_opt_GetReal(info->opts, "--popen"), esl_opt_GetReal(info->opts, "--pextend"), bg);
589     else                                      status = p7_builder_LoadScoreSystem(bld, esl_opt_GetString(info->opts, "--mx"),           esl_opt_GetReal(info->opts, "--popen"), esl_opt_GetReal(info->opts, "--pextend"), bg);
590     if (status != eslOK) {
591       //client_error(info->sock, status, "hmmgpmd: failed to set single query sequence score system: %s", bld->errbuf);
592       fprintf(stderr, "hmmpgmd: failed to set single query sequence score system: %s", bld->errbuf);
593       pthread_exit(NULL);
594       return;
595     }
596     p7_SingleBuilder(bld, info->seq, bg, NULL, NULL, NULL, &om); /* bypass HMM - only need model */
597     p7_builder_Destroy(bld);
598   } else {
599     gm = p7_profile_Create (info->hmm->M, info->abc);
600     om = p7_oprofile_Create(info->hmm->M, info->abc);
601     p7_ProfileConfig(info->hmm, bg, gm, 100, p7_LOCAL);
602     p7_oprofile_Convert(gm, om);
603   }
604 
605   /* Create processing pipeline and hit list */
606   th  = p7_tophits_Create();
607   pli = p7_pipeline_Create(info->opts, om->M, 100, FALSE, p7_SEARCH_SEQS);
608   p7_pli_NewModel(pli, om, bg);
609 
610   if (pli->Z_setby == p7_ZSETBY_NTARGETS) pli->Z = info->db_Z;
611 
612   /* loop until all sequences have been processed */
613   count = 1;
614   while (count > 0) {
615     int          inx;
616     int          blksz;
617     HMMER_SEQ  **sq;
618 
619     /* grab the next block of sequences */
620     if (pthread_mutex_lock(info->inx_mutex) != 0) p7_Fail("mutex lock failed");
621     inx = *info->inx;
622     blksz = *info->blk_size;
623     if (inx > *info->limit) {
624       blksz /= 5;
625       if (blksz < 1000) {
626         *info->limit = info->sq_cnt * 2;
627       } else {
628         *info->limit = inx + (info->sq_cnt - inx) * 2 / 3;
629       }
630     }
631     *info->blk_size = blksz;
632     *info->inx += blksz;
633     if (pthread_mutex_unlock(info->inx_mutex) != 0) p7_Fail("mutex unlock failed");
634 
635     sq = info->sq_list + inx;
636 
637     count = info->sq_cnt - inx;
638     if (count > blksz) count = blksz;
639 
640     /* Main loop: */
641     for (i = 0; i < count; ++i, ++sq) {
642       if ( !(info->range_list) || hmmpgmd_IsWithinRanges ((*sq)->idx, info->range_list)) {
643         dbsq.name  = (*sq)->name;
644         dbsq.dsq   = (*sq)->dsq;
645         dbsq.n     = (*sq)->n;
646         dbsq.idx   = (*sq)->idx;
647         if((*sq)->desc != NULL) dbsq.desc  = (*sq)->desc;
648 
649         p7_bg_SetLength(bg, dbsq.n);
650         p7_oprofile_ReconfigLength(om, dbsq.n);
651 
652         p7_Pipeline(pli, om, bg, &dbsq, NULL, th);
653 
654         p7_pipeline_Reuse(pli);
655       }
656     }
657   }
658 
659   /* make available the pipeline objects to the main thread */
660   info->th = th;
661   info->pli = pli;
662 
663   /* clean up */
664   p7_bg_Destroy(bg);
665   p7_oprofile_Destroy(om);
666 
667   if (gm != NULL)  p7_profile_Destroy(gm);
668 
669   esl_stopwatch_Stop(w);
670   info->elapsed = w->elapsed;
671 
672   esl_stopwatch_Destroy(w);
673 
674   esl_threads_Finished(obj, workeridx);
675 
676   pthread_exit(NULL);
677   return;
678 }
679 
680 static void
scan_thread(void * arg)681 scan_thread(void *arg)
682 {
683   int               i;
684   int               count;
685   int               workeridx;
686   WORKER_INFO      *info;
687   ESL_THREADS      *obj;
688 
689   ESL_STOPWATCH    *w;
690 
691   P7_BG            *bg       = NULL;         /* null model                     */
692   P7_PIPELINE      *pli      = NULL;         /* work pipeline                  */
693   P7_TOPHITS       *th       = NULL;         /* top hit results                */
694 
695   obj = (ESL_THREADS *) arg;
696   esl_threads_Started(obj, &workeridx);
697 
698   info = (WORKER_INFO *) esl_threads_GetData(obj, workeridx);
699 
700   w = esl_stopwatch_Create();
701   esl_stopwatch_Start(w);
702 
703   /* Convert to an optimized model */
704   bg = p7_bg_Create(info->abc);
705 
706   /* Create processing pipeline and hit list */
707   th  = p7_tophits_Create();
708   pli = p7_pipeline_Create(info->opts, 100, 100, FALSE, p7_SCAN_MODELS);
709 
710   p7_pli_NewSeq(pli, info->seq);
711 
712   /* loop until all sequences have been processed */
713   count = 1;
714   while (count > 0) {
715     int           inx;
716     int          blksz;
717     P7_OPROFILE **om;
718 
719     /* grab the next block of sequences */
720     if (pthread_mutex_lock(info->inx_mutex) != 0) p7_Fail("mutex lock failed");
721     inx   = *info->inx;
722     blksz = *info->blk_size;
723     if (inx > *info->limit) {
724       blksz /= 5;
725       if (blksz < 1000) {
726         *info->limit = info->om_cnt * 2;
727       } else {
728         *info->limit = inx + (info->om_cnt - inx) * 2 / 3;
729       }
730     }
731     *info->blk_size = blksz;
732     *info->inx += blksz;
733     if (pthread_mutex_unlock(info->inx_mutex) != 0) p7_Fail("mutex unlock failed");
734 
735     om    = info->om_list + inx;
736     count = info->om_cnt - inx;
737     if (count > blksz) count = blksz;
738 
739     /* Main loop: */
740     for (i = 0; i < count; ++i, ++om) {
741       p7_pli_NewModel(pli, *om, bg);
742       p7_bg_SetLength(bg, info->seq->n);
743       p7_oprofile_ReconfigLength(*om, info->seq->n);
744 
745       p7_Pipeline(pli, *om, bg, info->seq, NULL, th);
746       p7_pipeline_Reuse(pli);
747     }
748   }
749 
750   /* make available the pipeline objects to the main thread */
751   info->th = th;
752   info->pli = pli;
753 
754   /* clean up */
755   p7_bg_Destroy(bg);
756 
757   esl_stopwatch_Stop(w);
758   info->elapsed = w->elapsed;
759 
760   esl_stopwatch_Destroy(w);
761 
762   esl_threads_Finished(obj, workeridx);
763 
764   pthread_exit(NULL);
765   return;
766 }
767 
768 
769 static void
send_results(int fd,ESL_STOPWATCH * w,P7_TOPHITS * th,P7_PIPELINE * pli)770 send_results(int fd, ESL_STOPWATCH *w, P7_TOPHITS *th, P7_PIPELINE *pli){
771   HMMD_SEARCH_STATS   stats;
772   HMMD_SEARCH_STATUS  status;
773   uint8_t **buf = NULL; // Buffer for the main results message
774   uint8_t **buf2 = NULL; // Buffer for the initial HMMD_SEARCH_STATUS message
775   uint8_t *buf_ptr = NULL;
776   uint8_t *buf2_ptr = NULL;
777   uint32_t n = 0; // index within buffer of serialized data
778   uint32_t nalloc = 0; // Size of serialized buffer
779 
780   // set up handles to buffers
781   buf = &buf_ptr;
782   buf2 = &buf2_ptr;
783 
784 
785   /* Start filling out the fixed-length structures to be sent */
786   memset(&status, 0, sizeof(HMMD_SEARCH_STATUS)); /* silence valgrind errors - zero out entire structure including its padding */
787   status.status     = eslOK;
788   status.msg_size   = sizeof(stats);
789 
790   /* copy the search stats */
791   stats.elapsed     = w->elapsed;
792   stats.user        = w->user;
793   stats.sys         = w->sys;
794 
795   stats.nmodels     = pli->nmodels;
796   stats.nseqs       = pli->nseqs;
797   stats.n_past_msv  = pli->n_past_msv;
798   stats.n_past_bias = pli->n_past_bias;
799   stats.n_past_vit  = pli->n_past_vit;
800   stats.n_past_fwd  = pli->n_past_fwd;
801 
802   stats.Z           = pli->Z;
803   stats.domZ        = pli->domZ;
804   stats.Z_setby     = pli->Z_setby;
805   stats.domZ_setby  = pli->domZ_setby;
806 
807   stats.nhits       = th->N;
808   stats.nreported   = th->nreported;
809   stats.nincluded   = th->nincluded;
810   stats.hit_offsets = NULL; // This field is only used when sending results back to the client
811   if(p7_hmmd_search_stats_Serialize(&stats, buf, &n, &nalloc) != eslOK){
812     LOG_FATAL_MSG("Serializing HMMD_SEARCH_STATS failed", errno);
813   }
814 
815   // and then the hits
816   for(int i =0; i< stats.nhits; i++){
817     if(p7_hit_Serialize(&(th->unsrt[i]), buf, &n, &nalloc) != eslOK){
818       LOG_FATAL_MSG("Serializing P7_HIT failed", errno);
819     }
820   }
821 
822   status.msg_size = n; // n will have the number of bytes used to serialize the main data block
823   n = 0;
824   nalloc = 0; // reset these to serialize status object
825 
826   // Serialize the search_status object
827  if(hmmd_search_status_Serialize(&status, buf2, &n, &nalloc) != eslOK){
828     LOG_FATAL_MSG("Serializing HMMD_SEARCH_STATUS failed", errno);
829   }
830 
831   // Send the status object
832   if (writen(fd, buf2_ptr, n) != n) LOG_FATAL_MSG("write", errno);
833 
834   // And the serialized data
835   if (writen(fd, buf_ptr, status.msg_size) != status.msg_size) LOG_FATAL_MSG("write", errno);
836   free(buf_ptr);
837   free(buf2_ptr);
838   printf("Bytes: %" PRId64 "  hits: %" PRId64 "  sent on socket %d\n", status.msg_size, stats.nhits, fd);
839   fflush(stdout);
840 }
841 
842 
843 static int
setup_masterside_comm(ESL_GETOPTS * opts)844 setup_masterside_comm(ESL_GETOPTS *opts)
845 {
846   int    fd = -1;
847   int    cnt;
848   int    sec;
849   int    connected;
850 
851   struct sockaddr_in   addr;
852 
853   /* create a reliable, stream socket using TCP */
854   if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) LOG_FATAL_MSG("socket", errno);
855 
856   /* construct the server address structure */
857   memset(&addr, 0, sizeof(addr));
858   addr.sin_family = AF_INET;
859   addr.sin_port   = htons(esl_opt_GetInteger(opts, "--wport"));
860   if ((inet_pton(AF_INET, esl_opt_GetString(opts, "--worker"), &addr.sin_addr)) < 0) LOG_FATAL_MSG("inet pton", errno);
861 
862   /* try to connect to the master server */
863   cnt = 0;
864   sec = 1;
865   connected = -1;
866   while (connected < 0) {
867     /* establish the connection to the master server */
868     if ((connected = connect(fd, (struct sockaddr *) &addr, sizeof(addr))) < 0) {
869       if (errno != ECONNREFUSED) LOG_FATAL_MSG("connect", errno);
870 
871       /* the master server is not listening.  sleep and try again */
872       sleep(sec);
873       if (++cnt > 10) {
874         cnt = 0;
875         if (sec < 64) sec *= 2;
876       }
877     }
878   }
879 
880   return fd;
881 }
882 
883 #endif /*HMMER_THREADS*/
884 
885 
886 
887