1 /* worker side of the hmmer daemon
2  */
3 #include "p7_config.h"
4 
5 #ifdef HMMER_THREADS
6 
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <errno.h>
11 #include <unistd.h>
12 #include <signal.h>
13 #include <pthread.h>
14 #include <setjmp.h>
15 #include <sys/socket.h>
16 #ifdef HAVE_NETINET_IN_H
17 #include <netinet/in.h>     /* On FreeBSD, you need netinet/in.h for struct sockaddr_in            */
18 #endif                      /* On OpenBSD, netinet/in.h is required for (must precede) arpa/inet.h */
19 #include <arpa/inet.h>
20 #include <syslog.h>
21 #include <time.h>
22 
23 #ifndef HMMER_THREADS
24 #error "Program requires pthreads be enabled."
25 #endif /*HMMER_THREADS*/
26 
27 #include "easel.h"
28 #include "esl_alphabet.h"
29 #include "esl_getopts.h"
30 #include "esl_sq.h"
31 #include "esl_sqio.h"
32 #include "esl_stopwatch.h"
33 #include "esl_threads.h"
34 
35 #include "hmmer.h"
36 #include "hmmpgmd.h"
37 #include "hmmpgmd_shard.h"
38 #include "cachedb.h"
39 #include "cachedb_shard.h"
40 #include "p7_hmmcache.h"
41 
42 #define MAX_WORKERS  64
43 #define MAX_BUFFER   4096
44 
45 #define CONF_FILE "/etc/hmmpgmd.conf"
46 
47 typedef struct {
48   HMMER_SEQ       **sq_list;     /* list of sequences to process     */
49   int               sq_cnt;      /* number of sequences              */
50   int               db_Z;        /* true number of sequences         */
51 
52   P7_OPROFILE     **om_list;     /* list of profiles to process      */
53   int               om_cnt;      /* number of profiles               */
54 
55   pthread_mutex_t  *inx_mutex;   /* protect data                     */
56   int              *blk_size;    /* sequences per block              */
57   int              *limit;       /* point to decrease block size     */
58   int              *inx;         /* next index to process            */
59 
60   P7_HMM           *hmm;         /* query HMM                        */
61   ESL_SQ           *seq;         /* query sequence                   */
62   ESL_ALPHABET     *abc;         /* digital alphabet                 */
63   ESL_GETOPTS      *opts;        /* search specific options          */
64 
65   RANGE_LIST       *range_list;  /* (optional) list of ranges searched within the seqdb */
66 
67   double            elapsed;     /* elapsed search time              */
68 
69   /* Structure created and populated by the individual threads.
70    * The main thread is responsible for freeing up the memory.
71    */
72   P7_PIPELINE      *pli;         /* work pipeline                    */
73   P7_TOPHITS       *th;          /* top hit results                  */
74 } WORKER_INFO;
75 
76 typedef struct {
77   int fd;                        /* socket connection to server      */
78   int ncpus;                     /* number of cpus to use            */
79 
80   P7_SEQCACHE *seq_db;           /* cached sequence database         */
81   P7_HMMCACHE *hmm_db;           /* cached hmm database              */
82 } WORKER_ENV;
83 
84 
85 static void
free_QueueData_shard(QUEUE_DATA_SHARD * data)86 free_QueueData_shard(QUEUE_DATA_SHARD *data)
87 {
88   /* free the query data */
89   esl_getopts_Destroy(data->opts);
90 
91   if (data->abc != NULL) esl_alphabet_Destroy(data->abc);
92   if (data->hmm != NULL) p7_hmm_Destroy(data->hmm);
93   if (data->seq != NULL) esl_sq_Destroy(data->seq);
94   if (data->cmd != NULL) free(data->cmd);
95   memset(data, 0, sizeof(*data));
96   free(data);
97 }
98 
99 static void process_InitCmd(HMMD_COMMAND_SHARD *cmd, WORKER_ENV *env);
100 static void process_SearchCmd(HMMD_COMMAND_SHARD *cmd, WORKER_ENV *env, QUEUE_DATA_SHARD *query);
101 static void process_Shutdown(HMMD_COMMAND_SHARD *cmd, WORKER_ENV *env);
102 
103 static QUEUE_DATA_SHARD *process_QueryCmd(HMMD_COMMAND_SHARD *cmd, WORKER_ENV *env);
104 
105 static int  setup_masterside_comm(ESL_GETOPTS *opts);
106 
107 static void send_results(int fd, ESL_STOPWATCH *w, P7_TOPHITS *th, P7_PIPELINE *pli);
108 
109 #define BLOCK_SIZE 1000
110 static void search_thread(void *arg);
111 static void scan_thread(void *arg);
112 
113 static void
print_timings(int i,double elapsed,P7_PIPELINE * pli)114 print_timings(int i, double elapsed, P7_PIPELINE *pli)
115 {
116   char buf1[16];
117   int h, m, s, hs;
118 
119   h  = (int) (elapsed / 3600.);
120   m  = (int) (elapsed / 60.) - h * 60;
121   s  = (int) (elapsed) - h * 3600 - m * 60;
122   hs = (int) (elapsed * 100.) - h * 360000 - m * 6000 - s * 100;
123   sprintf(buf1, "%02d:%02d.%02d", m,s,hs);
124 
125   fprintf (stdout, "%2d %9" PRId64 " %9" PRId64 " %7" PRId64 " %7" PRId64 " %6" PRId64 " %5" PRId64 " %s\n",
126            i, pli->nseqs, pli->nres, pli->n_past_msv, pli->n_past_bias, pli->n_past_vit, pli->n_past_fwd, buf1);
127 }
128 
129 static int
read_Command(HMMD_COMMAND_SHARD ** ret_cmd,WORKER_ENV * env)130 read_Command(HMMD_COMMAND_SHARD **ret_cmd, WORKER_ENV *env)
131 {
132   HMMD_HEADER   hdr;
133   HMMD_COMMAND_SHARD *cmd = NULL;
134   int           n;
135 
136   /* read the command header */
137   if (readn(env->fd, &hdr, sizeof(hdr)) == -1) {
138     if (errno && errno != ECONNREFUSED) LOG_FATAL_MSG("read", errno);
139     return eslEOD;
140   }
141 
142   /* read the command data */
143   n = MSG_SIZE(&hdr);
144   if ((cmd = malloc(n)) == NULL) LOG_FATAL_MSG("malloc", errno);
145   memset(cmd, 0, n);		/* avoid uninitialized bytes. remove this, if we ever serialize/deserialize structures properly */
146   cmd->hdr.command = hdr.command;
147   cmd->hdr.length  = hdr.length;
148   if (hdr.length > 0) {
149     if (readn(env->fd, &cmd->init, hdr.length) == -1) {
150       if (errno && errno != ECONNREFUSED) LOG_FATAL_MSG("read", errno);
151       return eslEOD;
152     }
153   }
154 
155   *ret_cmd = cmd;
156   return eslOK;
157 }
158 
159 void
worker_process_shard(ESL_GETOPTS * go)160 worker_process_shard(ESL_GETOPTS *go)
161 {
162   HMMD_COMMAND_SHARD *cmd      = NULL;  /* see hmmpgmd.h */
163   int           shutdown = 0;
164   WORKER_ENV    env;
165   int           status;
166 
167   QUEUE_DATA_SHARD      *query      = NULL;
168 
169   /* Initializations */
170   impl_Init();
171   p7_FLogsumInit();      /* we're going to use table-driven Logsum() approximations at times */
172 
173   env.ncpus = ESL_MIN(esl_opt_GetInteger(go, "--cpu"),  esl_threads_GetCPUCount());
174 
175   env.hmm_db = NULL;
176   env.seq_db = NULL;
177   env.fd     = setup_masterside_comm(go);
178 
179   while (!shutdown)
180     {
181       if ((status = read_Command(&cmd, &env)) != eslOK) break;
182 
183 
184       switch (cmd->hdr.command) {
185       case HMMD_CMD_INIT: /* printf("Worker received init command\n"); */   process_InitCmd  (cmd, &env);                break;
186       case HMMD_CMD_SCAN:
187 	  {
188  		   query = process_QueryCmd(cmd, &env);
189  		   process_SearchCmd(cmd, &env, query);
190  		   free_QueueData_shard(query);
191 	  }
192 		  break;
193       case HMMD_CMD_SEARCH:
194 		    query = process_QueryCmd(cmd, &env);
195 	     process_SearchCmd(cmd, &env, query);
196         free_QueueData_shard(query);
197          break;
198       case HMMD_CMD_SHUTDOWN:  process_Shutdown (cmd, &env);  shutdown = 1; break;
199       default: p7_syslog(LOG_ERR,"[%s:%d] - unknown command %d (%d)\n", __FILE__, __LINE__, cmd->hdr.command, cmd->hdr.length);
200       }
201 
202       free(cmd);
203       cmd = NULL;
204     }
205 
206   if (env.hmm_db) p7_hmmcache_Close(env.hmm_db);
207   if (env.seq_db) p7_seqcache_Close(env.seq_db);
208   if (env.fd != -1) close(env.fd);
209   return;
210 }
211 
212 
213 static void
process_SearchCmd(HMMD_COMMAND_SHARD * cmd,WORKER_ENV * env,QUEUE_DATA_SHARD * query)214 process_SearchCmd(HMMD_COMMAND_SHARD *cmd, WORKER_ENV *env, QUEUE_DATA_SHARD *query)
215 {
216   int              i;
217   int              cnt;
218   int              limit;
219   int              status;
220   int              blk_size;
221   WORKER_INFO     *info       = NULL;
222   ESL_ALPHABET    *abc;
223   ESL_STOPWATCH   *w;
224   ESL_THREADS     *threadObj  = NULL;
225   pthread_mutex_t  inx_mutex;
226   int              current_index;
227   time_t           date;
228   char             timestamp[32];
229 
230   w = esl_stopwatch_Create();
231   abc = esl_alphabet_Create(eslAMINO);
232 
233   if (pthread_mutex_init(&inx_mutex, NULL) != 0) p7_Fail("mutex init failed");
234   ESL_ALLOC(info, sizeof(*info) * env->ncpus);
235 
236   /* Log the current time (at search start) */
237   date = time(NULL);
238   ctime_r(&date, timestamp);
239   printf("\n%s", timestamp);	/* note that ctime_r() leaves \n on end of timestamp  */
240 
241   /* initialize thread data */
242   esl_stopwatch_Start(w);
243 
244   info->range_list = NULL;
245   if (esl_opt_IsUsed(query->opts, "--seqdb_ranges")) {
246     ESL_ALLOC(info->range_list, sizeof(RANGE_LIST));
247     hmmpgmd_GetRanges(info->range_list, esl_opt_GetString(query->opts, "--seqdb_ranges"));
248  /*   printf("Range-list query detected, with ranges: ");
249     for(int temp = 0; temp < info->range_list->N; temp++){
250       if(temp > 0){
251         printf(", ");
252       }
253       printf("%d..%d", info->range_list->starts[temp], info->range_list->ends[temp]);
254     }
255     printf("\n");  */
256   }
257 
258 
259   if (query->cmd_type == HMMD_CMD_SEARCH) threadObj = esl_threads_Create(&search_thread);
260   else                                    threadObj = esl_threads_Create(&scan_thread);
261 
262   if (query->query_type == HMMD_SEQUENCE) {
263     fprintf(stdout, "Search seq %s  [L=%ld]", query->seq->name, (long) query->seq->n);
264   } else {
265     fprintf(stdout, "Search hmm %s  [M=%d]", query->hmm->name, query->hmm->M);
266   }
267   fprintf(stdout, " vs %s DB %d [%d - %d]",
268           (query->cmd_type == HMMD_CMD_SEARCH) ? "SEQ" : "HMM",
269           query->dbx, query->inx, query->inx + query->cnt - 1);
270 
271   if (info->range_list)
272     fprintf(stdout, " in range(s) %s", esl_opt_GetString(query->opts, "--seqdb_ranges"));
273 
274   fprintf(stdout, "\n");
275 
276   /* Create processing pipeline and hit list */
277   for (i = 0; i < env->ncpus; ++i) {
278     info[i].abc   = query->abc;
279     info[i].hmm   = query->hmm;
280     info[i].seq   = query->seq;
281     info[i].opts  = query->opts;
282 
283     info[i].range_list  = info[0].range_list;
284 
285     info[i].th    = NULL;
286     info[i].pli   = NULL;
287 
288     info[i].inx_mutex = &inx_mutex;
289     info[i].inx       = &current_index;/* this is confusing trickery - to share a single variable across all threads */
290     info[i].blk_size  = &blk_size;     /* ditto */
291     info[i].limit     = &limit;	       /* ditto. TODO: come back and clean this up. */
292 
293     if (query->cmd_type == HMMD_CMD_SEARCH) {
294       HMMER_SEQ **list  = env->seq_db->db[query->dbx].list;
295       info[i].sq_list   = &list[0];  // Always search the whole DB in sharded mode
296       //info[i].sq_cnt    = query->cnt;
297       info[i].sq_cnt = env->seq_db->db[query->dbx].count;
298       info[i].db_Z      = env->seq_db->db[query->dbx].K;
299       info[i].om_list   = NULL;
300       info[i].om_cnt    = 0;
301     } else {
302       info[i].sq_list   = NULL;
303       info[i].sq_cnt    = 0;
304       info[i].db_Z      = 0;
305       info[i].om_list   = &env->hmm_db->list[query->inx];
306       info[i].om_cnt    = query->cnt;
307     }
308 
309     esl_threads_AddThread(threadObj, &info[i]);
310   }
311 
312   /* try block size of 5000.  we will need enough sequences for four
313    * blocks per thread or better.
314    */
315   blk_size = 5000;
316   cnt = query->cnt / env->ncpus / blk_size;
317   limit = query->cnt * 2 / 3;
318   if (cnt < 4) {
319     /* try block size of 1000  */
320     blk_size /= 5;
321     cnt = query->cnt / env->ncpus / blk_size;
322     if (cnt < 4) {
323       /* still not enough.  just divide it up into one block per thread */
324       blk_size = query->cnt / env->ncpus + 1;
325       limit = query->cnt * 2;
326     }
327   }
328   current_index = 0;
329 
330   esl_threads_WaitForStart(threadObj);
331   esl_threads_WaitForFinish(threadObj);
332 
333   esl_stopwatch_Stop(w);
334 #if 1
335   fprintf (stdout, "   Sequences  Residues                              Elapsed\n");
336   for (i = 0; i < env->ncpus; ++i) {
337     print_timings(i, info[i].elapsed, info[i].pli);
338   }
339 #endif
340   /* merge the results of the search results */
341   for (i = 1; i < env->ncpus; ++i) {
342     p7_tophits_Merge(info[0].th, info[i].th);
343     p7_pipeline_Merge(info[0].pli, info[i].pli);
344     p7_pipeline_Destroy(info[i].pli);
345     p7_tophits_Destroy(info[i].th);
346   }
347 
348   print_timings(99, w->elapsed, info[0].pli);
349   send_results(env->fd, w, info[0].th, info[0].pli);
350 
351   /* free the last of the pipeline data */
352   p7_pipeline_Destroy(info->pli);
353   p7_tophits_Destroy(info->th);
354 
355   esl_threads_Destroy(threadObj);
356 
357   pthread_mutex_destroy(&inx_mutex);
358 
359   if (info->range_list) {
360     if (info->range_list->starts)  free(info->range_list->starts);
361     if (info->range_list->ends)    free(info->range_list->ends);
362     free (info->range_list);
363   }
364 
365   free(info);
366 
367   esl_stopwatch_Destroy(w);
368   esl_alphabet_Destroy(abc);
369 
370   return;
371 
372  ERROR:
373   LOG_FATAL_MSG("malloc", errno);
374 }
375 
376 static QUEUE_DATA_SHARD *
process_QueryCmd(HMMD_COMMAND_SHARD * cmd,WORKER_ENV * env)377 process_QueryCmd(HMMD_COMMAND_SHARD *cmd, WORKER_ENV *env)
378 {
379   int                i;
380   int                n;
381   int                status;
382 
383   char              *p;
384   char              *name;
385   char              *desc;
386   ESL_DSQ           *dsq;
387 
388   QUEUE_DATA_SHARD        *query  = NULL;
389 
390   if ((query = malloc(sizeof(QUEUE_DATA_SHARD))) == NULL) LOG_FATAL_MSG("malloc", errno);
391   memset(query, 0, sizeof(QUEUE_DATA_SHARD));	 /* avoid uninitialized bytes. remove this, if we ever serialize/deserialize structures properly */
392 
393   printf("CMD: %d %d\n", cmd->hdr.command, cmd->srch.query_type);
394 
395   query->cmd_type   = cmd->hdr.command;
396   query->query_type = cmd->srch.query_type;
397   query->dbx        = cmd->srch.db_inx;
398   query->inx        = cmd->srch.inx;
399   query->cnt        = cmd->srch.cnt;
400   query->sock       = env->fd;
401   query->cmd        = NULL;
402 
403   p = cmd->srch.data;
404 
405   /* process search specific options */
406   status = process_searchopts(env->fd, p, &query->opts);
407   if (status != eslOK)  LOG_FATAL_MSG("esl_getopts_Create", status);
408 
409   query->hmm = NULL;
410   query->seq = NULL;
411 
412   query->abc = esl_alphabet_Create(eslAMINO);
413 
414   /* check if we are processing a sequence or hmm */
415   if (cmd->srch.query_type == HMMD_SEQUENCE) {
416     n    = cmd->srch.query_length - 2;
417     name = p + cmd->srch.opts_length;
418     desc = name + strlen(name) + 1;
419     dsq  = (ESL_DSQ *) (desc + strlen(desc) + 1);
420     query->seq = esl_sq_CreateDigitalFrom(query->abc, name, dsq, n, desc, NULL, NULL);
421   } else {
422     P7_HMM  thmm;
423     P7_HMM *hmm = p7_hmm_CreateShell();
424 
425     /* allocate memory for the hmm and initialize */
426     p += cmd->srch.opts_length;
427     memcpy(&thmm, p, sizeof(P7_HMM));
428 
429     hmm->flags = thmm.flags;
430     p7_hmm_CreateBody(hmm, cmd->srch.query_length, query->abc);
431     p += sizeof(P7_HMM);
432 
433     /* initialize fields */
434     hmm->nseq       = thmm.nseq;
435     hmm->eff_nseq   = thmm.eff_nseq;
436     hmm->max_length = thmm.max_length;
437     hmm->checksum   = thmm.checksum;
438     hmm->ctime      = NULL;
439     hmm->comlog     = NULL;
440 
441     for (i = 0; i < p7_NCUTOFFS; i++) hmm->cutoff[i]  = thmm.cutoff[i];
442     for (i = 0; i < p7_NEVPARAM; i++) hmm->evparam[i] = thmm.evparam[i];
443     for (i = 0; i < p7_MAXABET;  i++) hmm->compo[i]   = thmm.compo[i];
444 
445     /* fill in the hmm pointers */
446     n = sizeof(float) * (hmm->M + 1) * p7H_NTRANSITIONS;
447     memcpy(*hmm->t, p, n);     p += n;
448 
449     n = sizeof(float) * (hmm->M + 1) * query->abc->K;
450     memcpy(*hmm->mat, p, n);   p += n;
451     memcpy(*hmm->ins, p, n);   p += n;
452 
453     if (thmm.name) { hmm->name = strdup(p); p += strlen(hmm->name) + 1; }
454     if (thmm.acc)  { hmm->acc  = strdup(p); p += strlen(hmm->acc)  + 1; }
455     if (thmm.desc) { hmm->desc = strdup(p); p += strlen(hmm->desc) + 1; }
456 
457     n = hmm->M + 2;
458     if (hmm->flags & p7H_RF)    { memcpy(hmm->rf,        p, n); p += n; }
459     if (hmm->flags & p7H_MMASK) { memcpy(hmm->mm,        p, n); p += n; }
460     if (hmm->flags & p7H_CONS)  { memcpy(hmm->consensus, p, n); p += n; }
461     if (hmm->flags & p7H_CS)    { memcpy(hmm->cs,        p, n); p += n; }
462     if (hmm->flags & p7H_CA)    { memcpy(hmm->ca,        p, n); p += n; }
463 
464     n = sizeof(int) * (hmm->M + 1);
465     if (hmm->flags & p7H_MAP) {  memcpy(hmm->map,       p, n); p += n; }
466 
467     query->hmm = hmm;
468   }
469 
470   return query;
471 }
472 
473 static void
process_Shutdown(HMMD_COMMAND_SHARD * cmd,WORKER_ENV * env)474 process_Shutdown(HMMD_COMMAND_SHARD *cmd, WORKER_ENV  *env)
475 {
476   int            n;
477 
478   n = MSG_SIZE(cmd);
479   cmd->hdr.status = eslOK;
480   if (writen(env->fd, cmd, n) != n) {
481     LOG_FATAL_MSG("write error", errno);
482   }
483 }
484 
485 static void
process_InitCmd(HMMD_COMMAND_SHARD * cmd,WORKER_ENV * env)486 process_InitCmd(HMMD_COMMAND_SHARD *cmd, WORKER_ENV  *env)
487 {
488   char *p;
489   int   n;
490   int   status;
491 
492   if (env->hmm_db != NULL) p7_hmmcache_Close(env->hmm_db);
493   if (env->seq_db != NULL) p7_seqcache_Close(env->seq_db);
494 
495   env->hmm_db = NULL;
496   env->seq_db = NULL;
497 
498   /* load the sequence database */
499   if (cmd->init.db_cnt != 0) {
500     P7_SEQCACHE *sdb = NULL;
501     //printf("This worker assigned shard %d out of %d\n", cmd->init.my_shard, cmd->init.num_shards);
502 
503     p  = cmd->init.data + cmd->init.seqdb_off;
504 //   printf("Opening database file %s\n", p);
505     status = p7_seqcache_Open_shard(p, &sdb, NULL, cmd->init.my_shard, cmd->init.num_shards);
506     if (status != eslOK) {
507       p7_syslog(LOG_ERR,"[%s:%d] - p7_seqcache_Open %s error %d\n", __FILE__, __LINE__, p, status);
508       LOG_FATAL_MSG("cache seqdb error", status);
509     }
510 //    printf("Database opened\n");
511     /* validate the sequence database */
512     cmd->init.sid[MAX_INIT_DESC-1] = 0;
513     if (strcmp (cmd->init.sid, sdb->id) != 0 || cmd->init.db_cnt != sdb->db_cnt /*|| cmd->init.seq_cnt != sdb->count*/) {
514       p7_syslog(LOG_ERR,"[%s:%d] - seq db %s: integrity error %s - %s\n", __FILE__, __LINE__, p, cmd->init.sid, sdb->id);
515       LOG_FATAL_MSG("database integrity error", 0);
516     }
517 //    printf("Database validated\n");
518     env->seq_db = sdb;
519   }
520 
521   /* load the hmm database */
522   if (cmd->init.hmm_cnt != 0) {
523     P7_HMMCACHE *hcache = NULL;
524   //  printf("Opening HMM database %s\n", p);
525     p  = cmd->init.data + cmd->init.hmmdb_off;
526 
527     status = p7_hmmcache_Open(p, &hcache, NULL);
528     if (status != eslOK) {
529       p7_syslog(LOG_ERR,"[%s:%d] - p7_hmmcache_Open %s error %d\n", __FILE__, __LINE__, p, status);
530       LOG_FATAL_MSG("cache hmmdb error", status);
531     }
532 
533     if ( (status = p7_hmmcache_SetNumericNames(hcache)) != eslOK){
534       p7_syslog(LOG_ERR,"[%s:%d] - p7_hmmcache_SetNumericNames %s error %d\n", __FILE__, __LINE__, p, status);
535       LOG_FATAL_MSG("cache hmmdb error", status);
536     }
537 
538     /* validate the hmm database */
539     cmd->init.hid[MAX_INIT_DESC-1] = 0;
540     /* TODO: come up with a new pressed format with an id to compare - strcmp (cmd->init.hid, hdb->id) != 0 */
541     if (cmd->init.hmm_cnt != 1 || cmd->init.model_cnt != hcache->n) {
542       p7_syslog(LOG_ERR,"[%s:%d] - hmm db %s: integrity error\n", __FILE__, __LINE__, p);
543       LOG_FATAL_MSG("database integrity error", 0);
544     }
545 
546     env->hmm_db = hcache;
547 
548     printf("Loaded profile db %s;  models: %d  memory: %" PRId64 "\n",
549          p, hcache->n, (uint64_t) p7_hmmcache_Sizeof(hcache));
550 
551   }
552 
553   /* if stdout is redirected at the commandline, it causes printf's to be buffered,
554    * which means status logging isn't printed. This line strongly requests unbuffering,
555    * which should be ok, given the low stdout load of hmmpgmd
556    */
557   setvbuf (stdout, NULL, _IONBF, BUFSIZ);
558   printf("Data loaded into memory. Worker is ready.\n");
559   setvbuf (stdout, NULL, _IOFBF, BUFSIZ);
560 
561 
562   /* write back to the master that we are on line */
563   n = MSG_SIZE(cmd);
564   cmd->hdr.status = eslOK;
565   if (writen(env->fd, cmd, n) != n) {
566     LOG_FATAL_MSG("write error", errno);
567   }
568 }
569 
570 
571 static void
search_thread(void * arg)572 search_thread(void *arg)
573 {
574   int               i;
575   int               count;
576   int               seed;
577   int               status;
578   int               workeridx;
579   WORKER_INFO      *info;
580   ESL_THREADS      *obj;
581   ESL_SQ            dbsq;
582   ESL_STOPWATCH    *w        = NULL;         /* timing stopwatch               */
583   P7_BUILDER       *bld      = NULL;         /* HMM construction configuration */
584   P7_BG            *bg       = NULL;         /* null model                     */
585   P7_PIPELINE      *pli      = NULL;         /* work pipeline                  */
586   P7_TOPHITS       *th       = NULL;         /* top hit results                */
587   P7_PROFILE       *gm       = NULL;         /* generic model                  */
588   P7_OPROFILE      *om       = NULL;         /* optimized query profile        */
589 
590   obj = (ESL_THREADS *) arg;
591   esl_threads_Started(obj, &workeridx);
592 
593   info = (WORKER_INFO *) esl_threads_GetData(obj, workeridx);
594   w    = esl_stopwatch_Create();
595   bg   = p7_bg_Create(info->abc);
596   esl_stopwatch_Start(w);
597 
598   /* set up the dummy description and accession fields */
599   dbsq.desc = "";
600   dbsq.acc  = "";
601 
602   /* process a query sequence or hmm */
603   if (info->seq != NULL) {
604     bld = p7_builder_Create(NULL, info->abc);
605     if ((seed = esl_opt_GetInteger(info->opts, "--seed")) > 0) {
606       esl_randomness_Init(bld->r, seed);
607       bld->do_reseeding = TRUE;
608     }
609     bld->EmL = esl_opt_GetInteger(info->opts, "--EmL");
610     bld->EmN = esl_opt_GetInteger(info->opts, "--EmN");
611     bld->EvL = esl_opt_GetInteger(info->opts, "--EvL");
612     bld->EvN = esl_opt_GetInteger(info->opts, "--EvN");
613     bld->EfL = esl_opt_GetInteger(info->opts, "--EfL");
614     bld->EfN = esl_opt_GetInteger(info->opts, "--EfN");
615     bld->Eft = esl_opt_GetReal   (info->opts, "--Eft");
616 
617     if (esl_opt_IsOn(info->opts, "--mxfile")) status = p7_builder_SetScoreSystem (bld, esl_opt_GetString(info->opts, "--mxfile"), NULL, esl_opt_GetReal(info->opts, "--popen"), esl_opt_GetReal(info->opts, "--pextend"), bg);
618     else                                      status = p7_builder_LoadScoreSystem(bld, esl_opt_GetString(info->opts, "--mx"),           esl_opt_GetReal(info->opts, "--popen"), esl_opt_GetReal(info->opts, "--pextend"), bg);
619     if (status != eslOK) {
620       //client_error(info->sock, status, "hmmgpmd: failed to set single query sequence score system: %s", bld->errbuf);
621       fprintf(stderr, "hmmpgmd: failed to set single query sequence score system: %s", bld->errbuf);
622       pthread_exit(NULL);
623       return;
624     }
625     p7_SingleBuilder(bld, info->seq, bg, NULL, NULL, NULL, &om); /* bypass HMM - only need model */
626     p7_builder_Destroy(bld);
627   } else {
628     gm = p7_profile_Create (info->hmm->M, info->abc);
629     om = p7_oprofile_Create(info->hmm->M, info->abc);
630     p7_ProfileConfig(info->hmm, bg, gm, 100, p7_LOCAL);
631     p7_oprofile_Convert(gm, om);
632   }
633 
634   /* Create processing pipeline and hit list */
635   th  = p7_tophits_Create();
636   pli = p7_pipeline_Create(info->opts, om->M, 100, FALSE, p7_SEARCH_SEQS);
637   p7_pli_NewModel(pli, om, bg);
638 
639   if (pli->Z_setby == p7_ZSETBY_NTARGETS) pli->Z = info->db_Z;
640   if (info->range_list){
641   //  printf("Worker thread starting range-list search\n");
642   }
643   /* loop until all sequences have been processed */
644   count = 1;
645   while (count > 0) {
646     int          inx;
647     int          blksz;
648     HMMER_SEQ  **sq;
649 
650     /* grab the next block of sequences */
651     if (pthread_mutex_lock(info->inx_mutex) != 0) p7_Fail("mutex lock failed");
652     inx = *info->inx;
653     blksz = *info->blk_size;
654     if (inx > *info->limit) {
655       blksz /= 5;
656       if (blksz < 1000) {
657         *info->limit = info->sq_cnt * 2;
658       } else {
659         *info->limit = inx + (info->sq_cnt - inx) * 2 / 3;
660       }
661     }
662     *info->blk_size = blksz;
663     *info->inx += blksz;
664     if (pthread_mutex_unlock(info->inx_mutex) != 0) p7_Fail("mutex unlock failed");
665 
666     sq = info->sq_list + inx;
667 
668     count = info->sq_cnt - inx;
669     if (count > blksz) count = blksz;
670 
671     /* Main loop: */
672     for (i = 0; i < count; ++i, ++sq) {
673  /*     if( strtol((*sq)->name, NULL, 10) != (*sq)->idx){
674         printf("Sequence with index %d had name %s\n", (*sq)->idx, (*sq)->name);
675       }  */
676       if ( !(info->range_list) || hmmpgmd_IsWithinRanges ((*sq)->idx, info->range_list)) {
677         dbsq.name  = (*sq)->name;
678         dbsq.dsq   = (*sq)->dsq;
679         dbsq.n     = (*sq)->n;
680         dbsq.idx   = (*sq)->idx;
681         if((*sq)->desc != NULL) dbsq.desc  = (*sq)->desc;
682 
683         p7_bg_SetLength(bg, dbsq.n);
684         p7_oprofile_ReconfigLength(om, dbsq.n);
685 
686         p7_Pipeline(pli, om, bg, &dbsq, NULL, th);
687 
688         p7_pipeline_Reuse(pli);
689       }
690     }
691   }
692 
693   /* make available the pipeline objects to the main thread */
694   info->th = th;
695   info->pli = pli;
696 
697   /* clean up */
698   p7_bg_Destroy(bg);
699   p7_oprofile_Destroy(om);
700 
701   if (gm != NULL)  p7_profile_Destroy(gm);
702 
703   esl_stopwatch_Stop(w);
704   info->elapsed = w->elapsed;
705 
706   esl_stopwatch_Destroy(w);
707 
708   esl_threads_Finished(obj, workeridx);
709 
710   pthread_exit(NULL);
711   return;
712 }
713 
714 static void
scan_thread(void * arg)715 scan_thread(void *arg)
716 {
717   int               i;
718   int               count;
719   int               workeridx;
720   WORKER_INFO      *info;
721   ESL_THREADS      *obj;
722 
723   ESL_STOPWATCH    *w;
724 
725   P7_BG            *bg       = NULL;         /* null model                     */
726   P7_PIPELINE      *pli      = NULL;         /* work pipeline                  */
727   P7_TOPHITS       *th       = NULL;         /* top hit results                */
728 
729   obj = (ESL_THREADS *) arg;
730   esl_threads_Started(obj, &workeridx);
731 
732   info = (WORKER_INFO *) esl_threads_GetData(obj, workeridx);
733 
734   w = esl_stopwatch_Create();
735   esl_stopwatch_Start(w);
736 
737   /* Convert to an optimized model */
738   bg = p7_bg_Create(info->abc);
739 
740   /* Create processing pipeline and hit list */
741   th  = p7_tophits_Create();
742   pli = p7_pipeline_Create(info->opts, 100, 100, FALSE, p7_SCAN_MODELS);
743 
744   p7_pli_NewSeq(pli, info->seq);
745 
746   /* loop until all sequences have been processed */
747   count = 1;
748   while (count > 0) {
749     int           inx;
750     int          blksz;
751     P7_OPROFILE **om;
752 
753     /* grab the next block of sequences */
754     if (pthread_mutex_lock(info->inx_mutex) != 0) p7_Fail("mutex lock failed");
755     inx   = *info->inx;
756     blksz = *info->blk_size;
757     if (inx > *info->limit) {
758       blksz /= 5;
759       if (blksz < 1000) {
760         *info->limit = info->om_cnt * 2;
761       } else {
762         *info->limit = inx + (info->om_cnt - inx) * 2 / 3;
763       }
764     }
765     *info->blk_size = blksz;
766     *info->inx += blksz;
767     if (pthread_mutex_unlock(info->inx_mutex) != 0) p7_Fail("mutex unlock failed");
768 
769     om    = info->om_list + inx;
770     count = info->om_cnt - inx;
771     if (count > blksz) count = blksz;
772 
773     /* Main loop: */
774     for (i = 0; i < count; ++i, ++om) {
775       p7_pli_NewModel(pli, *om, bg);
776       p7_bg_SetLength(bg, info->seq->n);
777       p7_oprofile_ReconfigLength(*om, info->seq->n);
778 
779       p7_Pipeline(pli, *om, bg, info->seq, NULL, th);
780       p7_pipeline_Reuse(pli);
781     }
782   }
783 
784   /* make available the pipeline objects to the main thread */
785   info->th = th;
786   info->pli = pli;
787 
788   /* clean up */
789   p7_bg_Destroy(bg);
790 
791   esl_stopwatch_Stop(w);
792   info->elapsed = w->elapsed;
793 
794   esl_stopwatch_Destroy(w);
795 
796   esl_threads_Finished(obj, workeridx);
797 
798   pthread_exit(NULL);
799   return;
800 }
801 
802 
803 static void
send_results(int fd,ESL_STOPWATCH * w,P7_TOPHITS * th,P7_PIPELINE * pli)804 send_results(int fd, ESL_STOPWATCH *w, P7_TOPHITS *th, P7_PIPELINE *pli){
805   HMMD_SEARCH_STATS   stats;
806   HMMD_SEARCH_STATUS  status;
807   uint8_t **buf = NULL; // Buffer for the main results message
808   uint8_t **buf2 = NULL; // Buffer for the initial HMMD_SEARCH_STATUS message
809   uint8_t *buf_ptr = NULL;
810   uint8_t *buf2_ptr = NULL;
811   uint32_t n = 0; // index within buffer of serialized data
812   uint32_t nalloc = 0; // Size of serialized buffer
813 
814   // set up handles to buffers
815   buf = &buf_ptr;
816   buf2 = &buf2_ptr;
817 
818 
819   /* Start filling out the fixed-length structures to be sent */
820   memset(&status, 0, sizeof(HMMD_SEARCH_STATUS)); /* silence valgrind errors - zero out entire structure including its padding */
821   status.status     = eslOK;
822   status.msg_size   = sizeof(stats);
823 
824   /* copy the search stats */
825   stats.elapsed     = w->elapsed;
826   stats.user        = w->user;
827   stats.sys         = w->sys;
828 
829   stats.nmodels     = pli->nmodels;
830   stats.nseqs       = pli->nseqs;
831   stats.n_past_msv  = pli->n_past_msv;
832   stats.n_past_bias = pli->n_past_bias;
833   stats.n_past_vit  = pli->n_past_vit;
834   stats.n_past_fwd  = pli->n_past_fwd;
835 
836   stats.Z           = pli->Z;
837   stats.domZ        = pli->domZ;
838   stats.Z_setby     = pli->Z_setby;
839   stats.domZ_setby  = pli->domZ_setby;
840 
841   stats.nhits       = th->N;
842   stats.nreported   = th->nreported;
843   stats.nincluded   = th->nincluded;
844   stats.hit_offsets = NULL; // This field is only used when sending results back to the client
845 
846   // Build the main buffer of result data, starting with the stats object
847   if(p7_hmmd_search_stats_Serialize(&stats, buf, &n, &nalloc) != eslOK){
848     LOG_FATAL_MSG("Serializing HMMD_SEARCH_STATS failed", errno);
849   }
850 
851   // and then the hits
852   for(int i =0; i< stats.nhits; i++){
853     if(p7_hit_Serialize(&(th->unsrt[i]), buf, &n, &nalloc) != eslOK){
854       LOG_FATAL_MSG("Serializing P7_HIT failed", errno);
855     }
856   }
857 
858   status.msg_size = n; // n will have the number of bytes used to serialize the main data block
859   n = 0;
860   nalloc = 0; // reset these to serialize status object
861 
862   // Serialize the search_status object
863  if(hmmd_search_status_Serialize(&status, buf2, &n, &nalloc) != eslOK){
864     LOG_FATAL_MSG("Serializing HMMD_SEARCH_STATUS failed", errno);
865   }
866 
867   // Send the status object
868   if (writen(fd, buf2_ptr, n) != n) LOG_FATAL_MSG("write", errno);
869 
870   // And the serialized data
871   if (writen(fd, buf_ptr, status.msg_size) != status.msg_size) LOG_FATAL_MSG("write", errno);
872   free(buf_ptr);
873   free(buf2_ptr);
874   printf("Bytes: %" PRId64 "  hits: %" PRId64 "  sent on socket %d\n", status.msg_size, stats.nhits, fd);
875   fflush(stdout);
876 }
877 
878 static int
setup_masterside_comm(ESL_GETOPTS * opts)879 setup_masterside_comm(ESL_GETOPTS *opts)
880 {
881   int    fd = -1;
882   int    cnt;
883   int    sec;
884   int    connected;
885 
886   struct sockaddr_in   addr;
887 
888   /* create a reliable, stream socket using TCP */
889   if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) LOG_FATAL_MSG("socket", errno);
890 
891   /* construct the server address structure */
892   memset(&addr, 0, sizeof(addr));
893   addr.sin_family = AF_INET;
894   addr.sin_port   = htons(esl_opt_GetInteger(opts, "--wport"));
895   if ((inet_pton(AF_INET, esl_opt_GetString(opts, "--worker"), &addr.sin_addr)) < 0) LOG_FATAL_MSG("inet pton", errno);
896 
897   /* try to connect to the master server */
898   cnt = 0;
899   sec = 1;
900   connected = -1;
901   while (connected < 0) {
902     /* establish the connection to the master server */
903     if ((connected = connect(fd, (struct sockaddr *) &addr, sizeof(addr))) < 0) {
904       if (errno != ECONNREFUSED) LOG_FATAL_MSG("connect", errno);
905 
906       /* the master server is not listening.  sleep and try again */
907       sleep(sec);
908       if (++cnt > 10) {
909         cnt = 0;
910         if (sec < 64) sec *= 2;
911       }
912     }
913   }
914 
915   return fd;
916 }
917 
918 #endif /*HMMER_THREADS*/
919 
920 
921 
922