1 /* worker side of the hmmer daemon
2 */
3 #include "p7_config.h"
4
5 #ifdef HMMER_THREADS
6
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <errno.h>
11 #include <unistd.h>
12 #include <signal.h>
13 #include <pthread.h>
14 #include <setjmp.h>
15 #include <sys/socket.h>
16 #ifdef HAVE_NETINET_IN_H
17 #include <netinet/in.h> /* On FreeBSD, you need netinet/in.h for struct sockaddr_in */
18 #endif /* On OpenBSD, netinet/in.h is required for (must precede) arpa/inet.h */
19 #include <arpa/inet.h>
20 #include <syslog.h>
21 #include <time.h>
22
23 #ifndef HMMER_THREADS
24 #error "Program requires pthreads be enabled."
25 #endif /*HMMER_THREADS*/
26
27 #include "easel.h"
28 #include "esl_alphabet.h"
29 #include "esl_getopts.h"
30 #include "esl_sq.h"
31 #include "esl_sqio.h"
32 #include "esl_stopwatch.h"
33 #include "esl_threads.h"
34
35 #include "hmmer.h"
36 #include "hmmpgmd.h"
37 #include "cachedb.h"
38 #include "p7_hmmcache.h"
39
40 #define MAX_WORKERS 64
41 #define MAX_BUFFER 4096
42
43 #define CONF_FILE "/etc/hmmpgmd.conf"
44
45 typedef struct {
46 HMMER_SEQ **sq_list; /* list of sequences to process */
47 int sq_cnt; /* number of sequences */
48 int db_Z; /* true number of sequences */
49
50 P7_OPROFILE **om_list; /* list of profiles to process */
51 int om_cnt; /* number of profiles */
52
53 pthread_mutex_t *inx_mutex; /* protect data */
54 int *blk_size; /* sequences per block */
55 int *limit; /* point to decrease block size */
56 int *inx; /* next index to process */
57
58 P7_HMM *hmm; /* query HMM */
59 ESL_SQ *seq; /* query sequence */
60 ESL_ALPHABET *abc; /* digital alphabet */
61 ESL_GETOPTS *opts; /* search specific options */
62
63 RANGE_LIST *range_list; /* (optional) list of ranges searched within the seqdb */
64
65 double elapsed; /* elapsed search time */
66
67 /* Structure created and populated by the individual threads.
68 * The main thread is responsible for freeing up the memory.
69 */
70 P7_PIPELINE *pli; /* work pipeline */
71 P7_TOPHITS *th; /* top hit results */
72 } WORKER_INFO;
73
74 typedef struct {
75 int fd; /* socket connection to server */
76 int ncpus; /* number of cpus to use */
77
78 P7_SEQCACHE *seq_db; /* cached sequence database */
79 P7_HMMCACHE *hmm_db; /* cached hmm database */
80 } WORKER_ENV;
81
82 static void process_InitCmd(HMMD_COMMAND *cmd, WORKER_ENV *env);
83 static void process_SearchCmd(HMMD_COMMAND *cmd, WORKER_ENV *env, QUEUE_DATA *query);
84 static void process_Shutdown(HMMD_COMMAND *cmd, WORKER_ENV *env);
85
86 static QUEUE_DATA *process_QueryCmd(HMMD_COMMAND *cmd, WORKER_ENV *env);
87
88 static int setup_masterside_comm(ESL_GETOPTS *opts);
89
90 static void send_results(int fd, ESL_STOPWATCH *w, P7_TOPHITS *th, P7_PIPELINE *pli);
91
92 #define BLOCK_SIZE 1000
93 static void search_thread(void *arg);
94 static void scan_thread(void *arg);
95
96 static void
print_timings(int i,double elapsed,P7_PIPELINE * pli)97 print_timings(int i, double elapsed, P7_PIPELINE *pli)
98 {
99 char buf1[16];
100 int h, m, s, hs;
101
102 h = (int) (elapsed / 3600.);
103 m = (int) (elapsed / 60.) - h * 60;
104 s = (int) (elapsed) - h * 3600 - m * 60;
105 hs = (int) (elapsed * 100.) - h * 360000 - m * 6000 - s * 100;
106 sprintf(buf1, "%02d:%02d.%02d", m,s,hs);
107
108 fprintf (stdout, "%2d %9" PRId64 " %9" PRId64 " %7" PRId64 " %7" PRId64 " %6" PRId64 " %5" PRId64 " %s\n",
109 i, pli->nseqs, pli->nres, pli->n_past_msv, pli->n_past_bias, pli->n_past_vit, pli->n_past_fwd, buf1);
110 }
111
112 static int
read_Command(HMMD_COMMAND ** ret_cmd,WORKER_ENV * env)113 read_Command(HMMD_COMMAND **ret_cmd, WORKER_ENV *env)
114 {
115 HMMD_HEADER hdr;
116 HMMD_COMMAND *cmd = NULL;
117 int n;
118
119 /* read the command header */
120 if (readn(env->fd, &hdr, sizeof(hdr)) == -1) {
121 if (errno && errno != ECONNREFUSED) LOG_FATAL_MSG("read", errno);
122 return eslEOD;
123 }
124
125 /* read the command data */
126 n = MSG_SIZE(&hdr);
127 if ((cmd = malloc(n)) == NULL) LOG_FATAL_MSG("malloc", errno);
128 memset(cmd, 0, n); /* avoid uninitialized bytes. remove this, if we ever serialize/deserialize structures properly */
129 cmd->hdr.command = hdr.command;
130 cmd->hdr.length = hdr.length;
131 if (hdr.length > 0) {
132 if (readn(env->fd, &cmd->init, hdr.length) == -1) {
133 if (errno && errno != ECONNREFUSED) LOG_FATAL_MSG("read", errno);
134 return eslEOD;
135 }
136 }
137
138 *ret_cmd = cmd;
139 return eslOK;
140 }
141
142 void
worker_process(ESL_GETOPTS * go)143 worker_process(ESL_GETOPTS *go)
144 {
145 HMMD_COMMAND *cmd = NULL; /* see hmmpgmd.h */
146 int shutdown = 0;
147 WORKER_ENV env;
148 int status;
149
150 QUEUE_DATA *query = NULL;
151
152 /* Initializations */
153 impl_Init();
154 p7_FLogsumInit(); /* we're going to use table-driven Logsum() approximations at times */
155
156 env.ncpus = ESL_MIN(esl_opt_GetInteger(go, "--cpu"), esl_threads_GetCPUCount());
157
158 env.hmm_db = NULL;
159 env.seq_db = NULL;
160 env.fd = setup_masterside_comm(go);
161
162 while (!shutdown)
163 {
164 if ((status = read_Command(&cmd, &env)) != eslOK) break;
165
166 switch (cmd->hdr.command) {
167 case HMMD_CMD_INIT: process_InitCmd (cmd, &env); break;
168 case HMMD_CMD_SCAN:
169 {
170 query = process_QueryCmd(cmd, &env);
171 process_SearchCmd(cmd, &env, query);
172 free_QueueData(query);
173 }
174 break;
175 case HMMD_CMD_SEARCH:
176 query = process_QueryCmd(cmd, &env);
177 process_SearchCmd(cmd, &env, query);
178 free_QueueData(query);
179 break;
180 case HMMD_CMD_SHUTDOWN: process_Shutdown (cmd, &env); shutdown = 1; break;
181 default: p7_syslog(LOG_ERR,"[%s:%d] - unknown command %d (%d)\n", __FILE__, __LINE__, cmd->hdr.command, cmd->hdr.length);
182 }
183
184 free(cmd);
185 cmd = NULL;
186 }
187
188 if (env.hmm_db) p7_hmmcache_Close(env.hmm_db);
189 if (env.seq_db) p7_seqcache_Close(env.seq_db);
190 if (env.fd != -1) close(env.fd);
191 return;
192 }
193
194
195 static void
process_SearchCmd(HMMD_COMMAND * cmd,WORKER_ENV * env,QUEUE_DATA * query)196 process_SearchCmd(HMMD_COMMAND *cmd, WORKER_ENV *env, QUEUE_DATA *query)
197 {
198 int i;
199 int cnt;
200 int limit;
201 int status;
202 int blk_size;
203 WORKER_INFO *info = NULL;
204 ESL_ALPHABET *abc;
205 ESL_STOPWATCH *w;
206 ESL_THREADS *threadObj = NULL;
207 pthread_mutex_t inx_mutex;
208 int current_index;
209 time_t date;
210 char timestamp[32];
211
212 w = esl_stopwatch_Create();
213 abc = esl_alphabet_Create(eslAMINO);
214
215 if (pthread_mutex_init(&inx_mutex, NULL) != 0) p7_Fail("mutex init failed");
216 ESL_ALLOC(info, sizeof(*info) * env->ncpus);
217
218 /* Log the current time (at search start) */
219 date = time(NULL);
220 ctime_r(&date, timestamp);
221 printf("\n%s", timestamp); /* note that ctime_r() leaves \n on end of timestamp */
222
223 /* initialize thread data */
224 esl_stopwatch_Start(w);
225
226 info->range_list = NULL;
227 if (esl_opt_IsUsed(query->opts, "--seqdb_ranges")) {
228 ESL_ALLOC(info->range_list, sizeof(RANGE_LIST));
229 hmmpgmd_GetRanges(info->range_list, esl_opt_GetString(query->opts, "--seqdb_ranges"));
230 }
231
232
233 if (query->cmd_type == HMMD_CMD_SEARCH) threadObj = esl_threads_Create(&search_thread);
234 else threadObj = esl_threads_Create(&scan_thread);
235
236 if (query->query_type == HMMD_SEQUENCE) {
237 fprintf(stdout, "Search seq %s [L=%ld]", query->seq->name, (long) query->seq->n);
238 } else {
239 fprintf(stdout, "Search hmm %s [M=%d]", query->hmm->name, query->hmm->M);
240 }
241 fprintf(stdout, " vs %s DB %d [%d - %d]",
242 (query->cmd_type == HMMD_CMD_SEARCH) ? "SEQ" : "HMM",
243 query->dbx, query->inx, query->inx + query->cnt - 1);
244
245 if (info->range_list)
246 fprintf(stdout, " in range(s) %s", esl_opt_GetString(query->opts, "--seqdb_ranges"));
247
248 fprintf(stdout, "\n");
249
250 /* Create processing pipeline and hit list */
251 for (i = 0; i < env->ncpus; ++i) {
252 info[i].abc = query->abc;
253 info[i].hmm = query->hmm;
254 info[i].seq = query->seq;
255 info[i].opts = query->opts;
256
257 info[i].range_list = info[0].range_list;
258
259 info[i].th = NULL;
260 info[i].pli = NULL;
261
262 info[i].inx_mutex = &inx_mutex;
263 info[i].inx = ¤t_index;/* this is confusing trickery - to share a single variable across all threads */
264 info[i].blk_size = &blk_size; /* ditto */
265 info[i].limit = &limit; /* ditto. TODO: come back and clean this up. */
266
267 if (query->cmd_type == HMMD_CMD_SEARCH) {
268 HMMER_SEQ **list = env->seq_db->db[query->dbx].list;
269 info[i].sq_list = &list[query->inx];
270 info[i].sq_cnt = query->cnt;
271 info[i].db_Z = env->seq_db->db[query->dbx].K;
272 info[i].om_list = NULL;
273 info[i].om_cnt = 0;
274 } else {
275 info[i].sq_list = NULL;
276 info[i].sq_cnt = 0;
277 info[i].db_Z = 0;
278 info[i].om_list = &env->hmm_db->list[query->inx];
279 info[i].om_cnt = query->cnt;
280 }
281
282 esl_threads_AddThread(threadObj, &info[i]);
283 }
284
285 /* try block size of 5000. we will need enough sequences for four
286 * blocks per thread or better.
287 */
288 blk_size = 5000;
289 cnt = query->cnt / env->ncpus / blk_size;
290 limit = query->cnt * 2 / 3;
291 if (cnt < 4) {
292 /* try block size of 1000 */
293 blk_size /= 5;
294 cnt = query->cnt / env->ncpus / blk_size;
295 if (cnt < 4) {
296 /* still not enough. just divide it up into one block per thread */
297 blk_size = query->cnt / env->ncpus + 1;
298 limit = query->cnt * 2;
299 }
300 }
301 current_index = 0;
302
303 esl_threads_WaitForStart(threadObj);
304 esl_threads_WaitForFinish(threadObj);
305
306 esl_stopwatch_Stop(w);
307 #if 1
308 fprintf (stdout, " Sequences Residues Elapsed\n");
309 for (i = 0; i < env->ncpus; ++i) {
310 print_timings(i, info[i].elapsed, info[i].pli);
311 }
312 #endif
313 /* merge the results of the search results */
314 for (i = 1; i < env->ncpus; ++i) {
315 p7_tophits_Merge(info[0].th, info[i].th);
316 p7_pipeline_Merge(info[0].pli, info[i].pli);
317 p7_pipeline_Destroy(info[i].pli);
318 p7_tophits_Destroy(info[i].th);
319 }
320
321 print_timings(99, w->elapsed, info[0].pli);
322 send_results(env->fd, w, info[0].th, info[0].pli);
323
324 /* free the last of the pipeline data */
325 p7_pipeline_Destroy(info->pli);
326 p7_tophits_Destroy(info->th);
327
328 esl_threads_Destroy(threadObj);
329
330 pthread_mutex_destroy(&inx_mutex);
331
332 if (info->range_list) {
333 if (info->range_list->starts) free(info->range_list->starts);
334 if (info->range_list->ends) free(info->range_list->ends);
335 free (info->range_list);
336 }
337
338 free(info);
339
340 esl_stopwatch_Destroy(w);
341 esl_alphabet_Destroy(abc);
342
343 return;
344
345 ERROR:
346 LOG_FATAL_MSG("malloc", errno);
347 }
348
349 static QUEUE_DATA *
process_QueryCmd(HMMD_COMMAND * cmd,WORKER_ENV * env)350 process_QueryCmd(HMMD_COMMAND *cmd, WORKER_ENV *env)
351 {
352 int i;
353 int n;
354 int status;
355
356 char *p;
357 char *name;
358 char *desc;
359 ESL_DSQ *dsq;
360
361 QUEUE_DATA *query = NULL;
362
363 if ((query = malloc(sizeof(QUEUE_DATA))) == NULL) LOG_FATAL_MSG("malloc", errno);
364 memset(query, 0, sizeof(QUEUE_DATA)); /* avoid uninitialized bytes. remove this, if we ever serialize/deserialize structures properly */
365
366 printf("CMD: %d %d\n", cmd->hdr.command, cmd->srch.query_type);
367
368 query->cmd_type = cmd->hdr.command;
369 query->query_type = cmd->srch.query_type;
370 query->dbx = cmd->srch.db_inx;
371 query->inx = cmd->srch.inx;
372 query->cnt = cmd->srch.cnt;
373 query->sock = env->fd;
374 query->cmd = NULL;
375
376 p = cmd->srch.data;
377
378 /* process search specific options */
379 status = process_searchopts(env->fd, p, &query->opts);
380 if (status != eslOK) LOG_FATAL_MSG("esl_getopts_Create", status);
381
382 query->hmm = NULL;
383 query->seq = NULL;
384
385 query->abc = esl_alphabet_Create(eslAMINO);
386
387 /* check if we are processing a sequence or hmm */
388 if (cmd->srch.query_type == HMMD_SEQUENCE) {
389 n = cmd->srch.query_length - 2;
390 name = p + cmd->srch.opts_length;
391 desc = name + strlen(name) + 1;
392 dsq = (ESL_DSQ *) (desc + strlen(desc) + 1);
393 query->seq = esl_sq_CreateDigitalFrom(query->abc, name, dsq, n, desc, NULL, NULL);
394 } else {
395 P7_HMM thmm;
396 P7_HMM *hmm = p7_hmm_CreateShell();
397
398 /* allocate memory for the hmm and initialize */
399 p += cmd->srch.opts_length;
400 memcpy(&thmm, p, sizeof(P7_HMM));
401
402 hmm->flags = thmm.flags;
403 p7_hmm_CreateBody(hmm, cmd->srch.query_length, query->abc);
404 p += sizeof(P7_HMM);
405
406 /* initialize fields */
407 hmm->nseq = thmm.nseq;
408 hmm->eff_nseq = thmm.eff_nseq;
409 hmm->max_length = thmm.max_length;
410 hmm->checksum = thmm.checksum;
411 hmm->ctime = NULL;
412 hmm->comlog = NULL;
413
414 for (i = 0; i < p7_NCUTOFFS; i++) hmm->cutoff[i] = thmm.cutoff[i];
415 for (i = 0; i < p7_NEVPARAM; i++) hmm->evparam[i] = thmm.evparam[i];
416 for (i = 0; i < p7_MAXABET; i++) hmm->compo[i] = thmm.compo[i];
417
418 /* fill in the hmm pointers */
419 n = sizeof(float) * (hmm->M + 1) * p7H_NTRANSITIONS;
420 memcpy(*hmm->t, p, n); p += n;
421
422 n = sizeof(float) * (hmm->M + 1) * query->abc->K;
423 memcpy(*hmm->mat, p, n); p += n;
424 memcpy(*hmm->ins, p, n); p += n;
425
426 if (thmm.name) { hmm->name = strdup(p); p += strlen(hmm->name) + 1; }
427 if (thmm.acc) { hmm->acc = strdup(p); p += strlen(hmm->acc) + 1; }
428 if (thmm.desc) { hmm->desc = strdup(p); p += strlen(hmm->desc) + 1; }
429
430 n = hmm->M + 2;
431 if (hmm->flags & p7H_RF) { memcpy(hmm->rf, p, n); p += n; }
432 if (hmm->flags & p7H_MMASK) { memcpy(hmm->mm, p, n); p += n; }
433 if (hmm->flags & p7H_CONS) { memcpy(hmm->consensus, p, n); p += n; }
434 if (hmm->flags & p7H_CS) { memcpy(hmm->cs, p, n); p += n; }
435 if (hmm->flags & p7H_CA) { memcpy(hmm->ca, p, n); p += n; }
436
437 n = sizeof(int) * (hmm->M + 1);
438 if (hmm->flags & p7H_MAP) { memcpy(hmm->map, p, n); p += n; }
439
440 query->hmm = hmm;
441 }
442
443 return query;
444 }
445
446 static void
process_Shutdown(HMMD_COMMAND * cmd,WORKER_ENV * env)447 process_Shutdown(HMMD_COMMAND *cmd, WORKER_ENV *env)
448 {
449 int n;
450
451 n = MSG_SIZE(cmd);
452 cmd->hdr.status = eslOK;
453 if (writen(env->fd, cmd, n) != n) {
454 LOG_FATAL_MSG("write error", errno);
455 }
456 }
457
458 static void
process_InitCmd(HMMD_COMMAND * cmd,WORKER_ENV * env)459 process_InitCmd(HMMD_COMMAND *cmd, WORKER_ENV *env)
460 {
461 char *p;
462 int n;
463 int status;
464
465 if (env->hmm_db != NULL) p7_hmmcache_Close(env->hmm_db);
466 if (env->seq_db != NULL) p7_seqcache_Close(env->seq_db);
467
468 env->hmm_db = NULL;
469 env->seq_db = NULL;
470
471 /* load the sequence database */
472 if (cmd->init.db_cnt != 0) {
473 P7_SEQCACHE *sdb = NULL;
474
475 p = cmd->init.data + cmd->init.seqdb_off;
476 status = p7_seqcache_Open(p, &sdb, NULL);
477 if (status != eslOK) {
478 p7_syslog(LOG_ERR,"[%s:%d] - p7_seqcache_Open %s error %d\n", __FILE__, __LINE__, p, status);
479 LOG_FATAL_MSG("cache seqdb error", status);
480 }
481
482 /* validate the sequence database */
483 cmd->init.sid[MAX_INIT_DESC-1] = 0;
484 if (strcmp (cmd->init.sid, sdb->id) != 0 || cmd->init.db_cnt != sdb->db_cnt || cmd->init.seq_cnt != sdb->count) {
485 p7_syslog(LOG_ERR,"[%s:%d] - seq db %s: integrity error %s - %s\n", __FILE__, __LINE__, p, cmd->init.sid, sdb->id);
486 LOG_FATAL_MSG("database integrity error", 0);
487 }
488
489 env->seq_db = sdb;
490 }
491
492 /* load the hmm database */
493 if (cmd->init.hmm_cnt != 0) {
494 P7_HMMCACHE *hcache = NULL;
495
496 p = cmd->init.data + cmd->init.hmmdb_off;
497
498 status = p7_hmmcache_Open(p, &hcache, NULL);
499 if (status != eslOK) {
500 p7_syslog(LOG_ERR,"[%s:%d] - p7_hmmcache_Open %s error %d\n", __FILE__, __LINE__, p, status);
501 LOG_FATAL_MSG("cache hmmdb error", status);
502 }
503
504 if ( (status = p7_hmmcache_SetNumericNames(hcache)) != eslOK){
505 p7_syslog(LOG_ERR,"[%s:%d] - p7_hmmcache_SetNumericNames %s error %d\n", __FILE__, __LINE__, p, status);
506 LOG_FATAL_MSG("cache hmmdb error", status);
507 }
508
509 /* validate the hmm database */
510 cmd->init.hid[MAX_INIT_DESC-1] = 0;
511 /* TODO: come up with a new pressed format with an id to compare - strcmp (cmd->init.hid, hdb->id) != 0 */
512 if (cmd->init.hmm_cnt != 1 || cmd->init.model_cnt != hcache->n) {
513 p7_syslog(LOG_ERR,"[%s:%d] - hmm db %s: integrity error\n", __FILE__, __LINE__, p);
514 LOG_FATAL_MSG("database integrity error", 0);
515 }
516
517 env->hmm_db = hcache;
518
519 printf("Loaded profile db %s; models: %d memory: %" PRId64 "\n",
520 p, hcache->n, (uint64_t) p7_hmmcache_Sizeof(hcache));
521
522 }
523
524 /* if stdout is redirected at the commandline, it causes printf's to be buffered,
525 * which means status logging isn't printed. This line strongly requests unbuffering,
526 * which should be ok, given the low stdout load of hmmpgmd
527 */
528 setvbuf (stdout, NULL, _IONBF, BUFSIZ);
529 printf("Data loaded into memory. Worker is ready.\n");
530 setvbuf (stdout, NULL, _IOFBF, BUFSIZ);
531
532
533 /* write back to the master that we are on line */
534 n = MSG_SIZE(cmd);
535 cmd->hdr.status = eslOK;
536 if (writen(env->fd, cmd, n) != n) {
537 LOG_FATAL_MSG("write error", errno);
538 }
539 }
540
541
542 static void
search_thread(void * arg)543 search_thread(void *arg)
544 {
545 int i;
546 int count;
547 int seed;
548 int status;
549 int workeridx;
550 WORKER_INFO *info;
551 ESL_THREADS *obj;
552 ESL_SQ dbsq;
553 ESL_STOPWATCH *w = NULL; /* timing stopwatch */
554 P7_BUILDER *bld = NULL; /* HMM construction configuration */
555 P7_BG *bg = NULL; /* null model */
556 P7_PIPELINE *pli = NULL; /* work pipeline */
557 P7_TOPHITS *th = NULL; /* top hit results */
558 P7_PROFILE *gm = NULL; /* generic model */
559 P7_OPROFILE *om = NULL; /* optimized query profile */
560
561 obj = (ESL_THREADS *) arg;
562 esl_threads_Started(obj, &workeridx);
563
564 info = (WORKER_INFO *) esl_threads_GetData(obj, workeridx);
565 w = esl_stopwatch_Create();
566 bg = p7_bg_Create(info->abc);
567 esl_stopwatch_Start(w);
568
569 /* set up the dummy description and accession fields */
570 dbsq.desc = "";
571 dbsq.acc = "";
572
573 /* process a query sequence or hmm */
574 if (info->seq != NULL) {
575 bld = p7_builder_Create(NULL, info->abc);
576 if ((seed = esl_opt_GetInteger(info->opts, "--seed")) > 0) {
577 esl_randomness_Init(bld->r, seed);
578 bld->do_reseeding = TRUE;
579 }
580 bld->EmL = esl_opt_GetInteger(info->opts, "--EmL");
581 bld->EmN = esl_opt_GetInteger(info->opts, "--EmN");
582 bld->EvL = esl_opt_GetInteger(info->opts, "--EvL");
583 bld->EvN = esl_opt_GetInteger(info->opts, "--EvN");
584 bld->EfL = esl_opt_GetInteger(info->opts, "--EfL");
585 bld->EfN = esl_opt_GetInteger(info->opts, "--EfN");
586 bld->Eft = esl_opt_GetReal (info->opts, "--Eft");
587
588 if (esl_opt_IsOn(info->opts, "--mxfile")) status = p7_builder_SetScoreSystem (bld, esl_opt_GetString(info->opts, "--mxfile"), NULL, esl_opt_GetReal(info->opts, "--popen"), esl_opt_GetReal(info->opts, "--pextend"), bg);
589 else status = p7_builder_LoadScoreSystem(bld, esl_opt_GetString(info->opts, "--mx"), esl_opt_GetReal(info->opts, "--popen"), esl_opt_GetReal(info->opts, "--pextend"), bg);
590 if (status != eslOK) {
591 //client_error(info->sock, status, "hmmgpmd: failed to set single query sequence score system: %s", bld->errbuf);
592 fprintf(stderr, "hmmpgmd: failed to set single query sequence score system: %s", bld->errbuf);
593 pthread_exit(NULL);
594 return;
595 }
596 p7_SingleBuilder(bld, info->seq, bg, NULL, NULL, NULL, &om); /* bypass HMM - only need model */
597 p7_builder_Destroy(bld);
598 } else {
599 gm = p7_profile_Create (info->hmm->M, info->abc);
600 om = p7_oprofile_Create(info->hmm->M, info->abc);
601 p7_ProfileConfig(info->hmm, bg, gm, 100, p7_LOCAL);
602 p7_oprofile_Convert(gm, om);
603 }
604
605 /* Create processing pipeline and hit list */
606 th = p7_tophits_Create();
607 pli = p7_pipeline_Create(info->opts, om->M, 100, FALSE, p7_SEARCH_SEQS);
608 p7_pli_NewModel(pli, om, bg);
609
610 if (pli->Z_setby == p7_ZSETBY_NTARGETS) pli->Z = info->db_Z;
611
612 /* loop until all sequences have been processed */
613 count = 1;
614 while (count > 0) {
615 int inx;
616 int blksz;
617 HMMER_SEQ **sq;
618
619 /* grab the next block of sequences */
620 if (pthread_mutex_lock(info->inx_mutex) != 0) p7_Fail("mutex lock failed");
621 inx = *info->inx;
622 blksz = *info->blk_size;
623 if (inx > *info->limit) {
624 blksz /= 5;
625 if (blksz < 1000) {
626 *info->limit = info->sq_cnt * 2;
627 } else {
628 *info->limit = inx + (info->sq_cnt - inx) * 2 / 3;
629 }
630 }
631 *info->blk_size = blksz;
632 *info->inx += blksz;
633 if (pthread_mutex_unlock(info->inx_mutex) != 0) p7_Fail("mutex unlock failed");
634
635 sq = info->sq_list + inx;
636
637 count = info->sq_cnt - inx;
638 if (count > blksz) count = blksz;
639
640 /* Main loop: */
641 for (i = 0; i < count; ++i, ++sq) {
642 if ( !(info->range_list) || hmmpgmd_IsWithinRanges ((*sq)->idx, info->range_list)) {
643 dbsq.name = (*sq)->name;
644 dbsq.dsq = (*sq)->dsq;
645 dbsq.n = (*sq)->n;
646 dbsq.idx = (*sq)->idx;
647 if((*sq)->desc != NULL) dbsq.desc = (*sq)->desc;
648
649 p7_bg_SetLength(bg, dbsq.n);
650 p7_oprofile_ReconfigLength(om, dbsq.n);
651
652 p7_Pipeline(pli, om, bg, &dbsq, NULL, th);
653
654 p7_pipeline_Reuse(pli);
655 }
656 }
657 }
658
659 /* make available the pipeline objects to the main thread */
660 info->th = th;
661 info->pli = pli;
662
663 /* clean up */
664 p7_bg_Destroy(bg);
665 p7_oprofile_Destroy(om);
666
667 if (gm != NULL) p7_profile_Destroy(gm);
668
669 esl_stopwatch_Stop(w);
670 info->elapsed = w->elapsed;
671
672 esl_stopwatch_Destroy(w);
673
674 esl_threads_Finished(obj, workeridx);
675
676 pthread_exit(NULL);
677 return;
678 }
679
680 static void
scan_thread(void * arg)681 scan_thread(void *arg)
682 {
683 int i;
684 int count;
685 int workeridx;
686 WORKER_INFO *info;
687 ESL_THREADS *obj;
688
689 ESL_STOPWATCH *w;
690
691 P7_BG *bg = NULL; /* null model */
692 P7_PIPELINE *pli = NULL; /* work pipeline */
693 P7_TOPHITS *th = NULL; /* top hit results */
694
695 obj = (ESL_THREADS *) arg;
696 esl_threads_Started(obj, &workeridx);
697
698 info = (WORKER_INFO *) esl_threads_GetData(obj, workeridx);
699
700 w = esl_stopwatch_Create();
701 esl_stopwatch_Start(w);
702
703 /* Convert to an optimized model */
704 bg = p7_bg_Create(info->abc);
705
706 /* Create processing pipeline and hit list */
707 th = p7_tophits_Create();
708 pli = p7_pipeline_Create(info->opts, 100, 100, FALSE, p7_SCAN_MODELS);
709
710 p7_pli_NewSeq(pli, info->seq);
711
712 /* loop until all sequences have been processed */
713 count = 1;
714 while (count > 0) {
715 int inx;
716 int blksz;
717 P7_OPROFILE **om;
718
719 /* grab the next block of sequences */
720 if (pthread_mutex_lock(info->inx_mutex) != 0) p7_Fail("mutex lock failed");
721 inx = *info->inx;
722 blksz = *info->blk_size;
723 if (inx > *info->limit) {
724 blksz /= 5;
725 if (blksz < 1000) {
726 *info->limit = info->om_cnt * 2;
727 } else {
728 *info->limit = inx + (info->om_cnt - inx) * 2 / 3;
729 }
730 }
731 *info->blk_size = blksz;
732 *info->inx += blksz;
733 if (pthread_mutex_unlock(info->inx_mutex) != 0) p7_Fail("mutex unlock failed");
734
735 om = info->om_list + inx;
736 count = info->om_cnt - inx;
737 if (count > blksz) count = blksz;
738
739 /* Main loop: */
740 for (i = 0; i < count; ++i, ++om) {
741 p7_pli_NewModel(pli, *om, bg);
742 p7_bg_SetLength(bg, info->seq->n);
743 p7_oprofile_ReconfigLength(*om, info->seq->n);
744
745 p7_Pipeline(pli, *om, bg, info->seq, NULL, th);
746 p7_pipeline_Reuse(pli);
747 }
748 }
749
750 /* make available the pipeline objects to the main thread */
751 info->th = th;
752 info->pli = pli;
753
754 /* clean up */
755 p7_bg_Destroy(bg);
756
757 esl_stopwatch_Stop(w);
758 info->elapsed = w->elapsed;
759
760 esl_stopwatch_Destroy(w);
761
762 esl_threads_Finished(obj, workeridx);
763
764 pthread_exit(NULL);
765 return;
766 }
767
768
769 static void
send_results(int fd,ESL_STOPWATCH * w,P7_TOPHITS * th,P7_PIPELINE * pli)770 send_results(int fd, ESL_STOPWATCH *w, P7_TOPHITS *th, P7_PIPELINE *pli){
771 HMMD_SEARCH_STATS stats;
772 HMMD_SEARCH_STATUS status;
773 uint8_t **buf = NULL; // Buffer for the main results message
774 uint8_t **buf2 = NULL; // Buffer for the initial HMMD_SEARCH_STATUS message
775 uint8_t *buf_ptr = NULL;
776 uint8_t *buf2_ptr = NULL;
777 uint32_t n = 0; // index within buffer of serialized data
778 uint32_t nalloc = 0; // Size of serialized buffer
779
780 // set up handles to buffers
781 buf = &buf_ptr;
782 buf2 = &buf2_ptr;
783
784
785 /* Start filling out the fixed-length structures to be sent */
786 memset(&status, 0, sizeof(HMMD_SEARCH_STATUS)); /* silence valgrind errors - zero out entire structure including its padding */
787 status.status = eslOK;
788 status.msg_size = sizeof(stats);
789
790 /* copy the search stats */
791 stats.elapsed = w->elapsed;
792 stats.user = w->user;
793 stats.sys = w->sys;
794
795 stats.nmodels = pli->nmodels;
796 stats.nseqs = pli->nseqs;
797 stats.n_past_msv = pli->n_past_msv;
798 stats.n_past_bias = pli->n_past_bias;
799 stats.n_past_vit = pli->n_past_vit;
800 stats.n_past_fwd = pli->n_past_fwd;
801
802 stats.Z = pli->Z;
803 stats.domZ = pli->domZ;
804 stats.Z_setby = pli->Z_setby;
805 stats.domZ_setby = pli->domZ_setby;
806
807 stats.nhits = th->N;
808 stats.nreported = th->nreported;
809 stats.nincluded = th->nincluded;
810 stats.hit_offsets = NULL; // This field is only used when sending results back to the client
811 if(p7_hmmd_search_stats_Serialize(&stats, buf, &n, &nalloc) != eslOK){
812 LOG_FATAL_MSG("Serializing HMMD_SEARCH_STATS failed", errno);
813 }
814
815 // and then the hits
816 for(int i =0; i< stats.nhits; i++){
817 if(p7_hit_Serialize(&(th->unsrt[i]), buf, &n, &nalloc) != eslOK){
818 LOG_FATAL_MSG("Serializing P7_HIT failed", errno);
819 }
820 }
821
822 status.msg_size = n; // n will have the number of bytes used to serialize the main data block
823 n = 0;
824 nalloc = 0; // reset these to serialize status object
825
826 // Serialize the search_status object
827 if(hmmd_search_status_Serialize(&status, buf2, &n, &nalloc) != eslOK){
828 LOG_FATAL_MSG("Serializing HMMD_SEARCH_STATUS failed", errno);
829 }
830
831 // Send the status object
832 if (writen(fd, buf2_ptr, n) != n) LOG_FATAL_MSG("write", errno);
833
834 // And the serialized data
835 if (writen(fd, buf_ptr, status.msg_size) != status.msg_size) LOG_FATAL_MSG("write", errno);
836 free(buf_ptr);
837 free(buf2_ptr);
838 printf("Bytes: %" PRId64 " hits: %" PRId64 " sent on socket %d\n", status.msg_size, stats.nhits, fd);
839 fflush(stdout);
840 }
841
842
843 static int
setup_masterside_comm(ESL_GETOPTS * opts)844 setup_masterside_comm(ESL_GETOPTS *opts)
845 {
846 int fd = -1;
847 int cnt;
848 int sec;
849 int connected;
850
851 struct sockaddr_in addr;
852
853 /* create a reliable, stream socket using TCP */
854 if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) LOG_FATAL_MSG("socket", errno);
855
856 /* construct the server address structure */
857 memset(&addr, 0, sizeof(addr));
858 addr.sin_family = AF_INET;
859 addr.sin_port = htons(esl_opt_GetInteger(opts, "--wport"));
860 if ((inet_pton(AF_INET, esl_opt_GetString(opts, "--worker"), &addr.sin_addr)) < 0) LOG_FATAL_MSG("inet pton", errno);
861
862 /* try to connect to the master server */
863 cnt = 0;
864 sec = 1;
865 connected = -1;
866 while (connected < 0) {
867 /* establish the connection to the master server */
868 if ((connected = connect(fd, (struct sockaddr *) &addr, sizeof(addr))) < 0) {
869 if (errno != ECONNREFUSED) LOG_FATAL_MSG("connect", errno);
870
871 /* the master server is not listening. sleep and try again */
872 sleep(sec);
873 if (++cnt > 10) {
874 cnt = 0;
875 if (sec < 64) sec *= 2;
876 }
877 }
878 }
879
880 return fd;
881 }
882
883 #endif /*HMMER_THREADS*/
884
885
886
887