1 /* worker side of the hmmer daemon
2 */
3 #include "p7_config.h"
4
5 #ifdef HMMER_THREADS
6
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <errno.h>
11 #include <unistd.h>
12 #include <signal.h>
13 #include <pthread.h>
14 #include <setjmp.h>
15 #include <sys/socket.h>
16 #ifdef HAVE_NETINET_IN_H
17 #include <netinet/in.h> /* On FreeBSD, you need netinet/in.h for struct sockaddr_in */
18 #endif /* On OpenBSD, netinet/in.h is required for (must precede) arpa/inet.h */
19 #include <arpa/inet.h>
20 #include <syslog.h>
21 #include <time.h>
22
23 #ifndef HMMER_THREADS
24 #error "Program requires pthreads be enabled."
25 #endif /*HMMER_THREADS*/
26
27 #include "easel.h"
28 #include "esl_alphabet.h"
29 #include "esl_getopts.h"
30 #include "esl_sq.h"
31 #include "esl_sqio.h"
32 #include "esl_stopwatch.h"
33 #include "esl_threads.h"
34
35 #include "hmmer.h"
36 #include "hmmpgmd.h"
37 #include "hmmpgmd_shard.h"
38 #include "cachedb.h"
39 #include "cachedb_shard.h"
40 #include "p7_hmmcache.h"
41
42 #define MAX_WORKERS 64
43 #define MAX_BUFFER 4096
44
45 #define CONF_FILE "/etc/hmmpgmd.conf"
46
47 typedef struct {
48 HMMER_SEQ **sq_list; /* list of sequences to process */
49 int sq_cnt; /* number of sequences */
50 int db_Z; /* true number of sequences */
51
52 P7_OPROFILE **om_list; /* list of profiles to process */
53 int om_cnt; /* number of profiles */
54
55 pthread_mutex_t *inx_mutex; /* protect data */
56 int *blk_size; /* sequences per block */
57 int *limit; /* point to decrease block size */
58 int *inx; /* next index to process */
59
60 P7_HMM *hmm; /* query HMM */
61 ESL_SQ *seq; /* query sequence */
62 ESL_ALPHABET *abc; /* digital alphabet */
63 ESL_GETOPTS *opts; /* search specific options */
64
65 RANGE_LIST *range_list; /* (optional) list of ranges searched within the seqdb */
66
67 double elapsed; /* elapsed search time */
68
69 /* Structure created and populated by the individual threads.
70 * The main thread is responsible for freeing up the memory.
71 */
72 P7_PIPELINE *pli; /* work pipeline */
73 P7_TOPHITS *th; /* top hit results */
74 } WORKER_INFO;
75
76 typedef struct {
77 int fd; /* socket connection to server */
78 int ncpus; /* number of cpus to use */
79
80 P7_SEQCACHE *seq_db; /* cached sequence database */
81 P7_HMMCACHE *hmm_db; /* cached hmm database */
82 } WORKER_ENV;
83
84
85 static void
free_QueueData_shard(QUEUE_DATA_SHARD * data)86 free_QueueData_shard(QUEUE_DATA_SHARD *data)
87 {
88 /* free the query data */
89 esl_getopts_Destroy(data->opts);
90
91 if (data->abc != NULL) esl_alphabet_Destroy(data->abc);
92 if (data->hmm != NULL) p7_hmm_Destroy(data->hmm);
93 if (data->seq != NULL) esl_sq_Destroy(data->seq);
94 if (data->cmd != NULL) free(data->cmd);
95 memset(data, 0, sizeof(*data));
96 free(data);
97 }
98
99 static void process_InitCmd(HMMD_COMMAND_SHARD *cmd, WORKER_ENV *env);
100 static void process_SearchCmd(HMMD_COMMAND_SHARD *cmd, WORKER_ENV *env, QUEUE_DATA_SHARD *query);
101 static void process_Shutdown(HMMD_COMMAND_SHARD *cmd, WORKER_ENV *env);
102
103 static QUEUE_DATA_SHARD *process_QueryCmd(HMMD_COMMAND_SHARD *cmd, WORKER_ENV *env);
104
105 static int setup_masterside_comm(ESL_GETOPTS *opts);
106
107 static void send_results(int fd, ESL_STOPWATCH *w, P7_TOPHITS *th, P7_PIPELINE *pli);
108
109 #define BLOCK_SIZE 1000
110 static void search_thread(void *arg);
111 static void scan_thread(void *arg);
112
113 static void
print_timings(int i,double elapsed,P7_PIPELINE * pli)114 print_timings(int i, double elapsed, P7_PIPELINE *pli)
115 {
116 char buf1[16];
117 int h, m, s, hs;
118
119 h = (int) (elapsed / 3600.);
120 m = (int) (elapsed / 60.) - h * 60;
121 s = (int) (elapsed) - h * 3600 - m * 60;
122 hs = (int) (elapsed * 100.) - h * 360000 - m * 6000 - s * 100;
123 sprintf(buf1, "%02d:%02d.%02d", m,s,hs);
124
125 fprintf (stdout, "%2d %9" PRId64 " %9" PRId64 " %7" PRId64 " %7" PRId64 " %6" PRId64 " %5" PRId64 " %s\n",
126 i, pli->nseqs, pli->nres, pli->n_past_msv, pli->n_past_bias, pli->n_past_vit, pli->n_past_fwd, buf1);
127 }
128
129 static int
read_Command(HMMD_COMMAND_SHARD ** ret_cmd,WORKER_ENV * env)130 read_Command(HMMD_COMMAND_SHARD **ret_cmd, WORKER_ENV *env)
131 {
132 HMMD_HEADER hdr;
133 HMMD_COMMAND_SHARD *cmd = NULL;
134 int n;
135
136 /* read the command header */
137 if (readn(env->fd, &hdr, sizeof(hdr)) == -1) {
138 if (errno && errno != ECONNREFUSED) LOG_FATAL_MSG("read", errno);
139 return eslEOD;
140 }
141
142 /* read the command data */
143 n = MSG_SIZE(&hdr);
144 if ((cmd = malloc(n)) == NULL) LOG_FATAL_MSG("malloc", errno);
145 memset(cmd, 0, n); /* avoid uninitialized bytes. remove this, if we ever serialize/deserialize structures properly */
146 cmd->hdr.command = hdr.command;
147 cmd->hdr.length = hdr.length;
148 if (hdr.length > 0) {
149 if (readn(env->fd, &cmd->init, hdr.length) == -1) {
150 if (errno && errno != ECONNREFUSED) LOG_FATAL_MSG("read", errno);
151 return eslEOD;
152 }
153 }
154
155 *ret_cmd = cmd;
156 return eslOK;
157 }
158
159 void
worker_process_shard(ESL_GETOPTS * go)160 worker_process_shard(ESL_GETOPTS *go)
161 {
162 HMMD_COMMAND_SHARD *cmd = NULL; /* see hmmpgmd.h */
163 int shutdown = 0;
164 WORKER_ENV env;
165 int status;
166
167 QUEUE_DATA_SHARD *query = NULL;
168
169 /* Initializations */
170 impl_Init();
171 p7_FLogsumInit(); /* we're going to use table-driven Logsum() approximations at times */
172
173 env.ncpus = ESL_MIN(esl_opt_GetInteger(go, "--cpu"), esl_threads_GetCPUCount());
174
175 env.hmm_db = NULL;
176 env.seq_db = NULL;
177 env.fd = setup_masterside_comm(go);
178
179 while (!shutdown)
180 {
181 if ((status = read_Command(&cmd, &env)) != eslOK) break;
182
183
184 switch (cmd->hdr.command) {
185 case HMMD_CMD_INIT: /* printf("Worker received init command\n"); */ process_InitCmd (cmd, &env); break;
186 case HMMD_CMD_SCAN:
187 {
188 query = process_QueryCmd(cmd, &env);
189 process_SearchCmd(cmd, &env, query);
190 free_QueueData_shard(query);
191 }
192 break;
193 case HMMD_CMD_SEARCH:
194 query = process_QueryCmd(cmd, &env);
195 process_SearchCmd(cmd, &env, query);
196 free_QueueData_shard(query);
197 break;
198 case HMMD_CMD_SHUTDOWN: process_Shutdown (cmd, &env); shutdown = 1; break;
199 default: p7_syslog(LOG_ERR,"[%s:%d] - unknown command %d (%d)\n", __FILE__, __LINE__, cmd->hdr.command, cmd->hdr.length);
200 }
201
202 free(cmd);
203 cmd = NULL;
204 }
205
206 if (env.hmm_db) p7_hmmcache_Close(env.hmm_db);
207 if (env.seq_db) p7_seqcache_Close(env.seq_db);
208 if (env.fd != -1) close(env.fd);
209 return;
210 }
211
212
213 static void
process_SearchCmd(HMMD_COMMAND_SHARD * cmd,WORKER_ENV * env,QUEUE_DATA_SHARD * query)214 process_SearchCmd(HMMD_COMMAND_SHARD *cmd, WORKER_ENV *env, QUEUE_DATA_SHARD *query)
215 {
216 int i;
217 int cnt;
218 int limit;
219 int status;
220 int blk_size;
221 WORKER_INFO *info = NULL;
222 ESL_ALPHABET *abc;
223 ESL_STOPWATCH *w;
224 ESL_THREADS *threadObj = NULL;
225 pthread_mutex_t inx_mutex;
226 int current_index;
227 time_t date;
228 char timestamp[32];
229
230 w = esl_stopwatch_Create();
231 abc = esl_alphabet_Create(eslAMINO);
232
233 if (pthread_mutex_init(&inx_mutex, NULL) != 0) p7_Fail("mutex init failed");
234 ESL_ALLOC(info, sizeof(*info) * env->ncpus);
235
236 /* Log the current time (at search start) */
237 date = time(NULL);
238 ctime_r(&date, timestamp);
239 printf("\n%s", timestamp); /* note that ctime_r() leaves \n on end of timestamp */
240
241 /* initialize thread data */
242 esl_stopwatch_Start(w);
243
244 info->range_list = NULL;
245 if (esl_opt_IsUsed(query->opts, "--seqdb_ranges")) {
246 ESL_ALLOC(info->range_list, sizeof(RANGE_LIST));
247 hmmpgmd_GetRanges(info->range_list, esl_opt_GetString(query->opts, "--seqdb_ranges"));
248 /* printf("Range-list query detected, with ranges: ");
249 for(int temp = 0; temp < info->range_list->N; temp++){
250 if(temp > 0){
251 printf(", ");
252 }
253 printf("%d..%d", info->range_list->starts[temp], info->range_list->ends[temp]);
254 }
255 printf("\n"); */
256 }
257
258
259 if (query->cmd_type == HMMD_CMD_SEARCH) threadObj = esl_threads_Create(&search_thread);
260 else threadObj = esl_threads_Create(&scan_thread);
261
262 if (query->query_type == HMMD_SEQUENCE) {
263 fprintf(stdout, "Search seq %s [L=%ld]", query->seq->name, (long) query->seq->n);
264 } else {
265 fprintf(stdout, "Search hmm %s [M=%d]", query->hmm->name, query->hmm->M);
266 }
267 fprintf(stdout, " vs %s DB %d [%d - %d]",
268 (query->cmd_type == HMMD_CMD_SEARCH) ? "SEQ" : "HMM",
269 query->dbx, query->inx, query->inx + query->cnt - 1);
270
271 if (info->range_list)
272 fprintf(stdout, " in range(s) %s", esl_opt_GetString(query->opts, "--seqdb_ranges"));
273
274 fprintf(stdout, "\n");
275
276 /* Create processing pipeline and hit list */
277 for (i = 0; i < env->ncpus; ++i) {
278 info[i].abc = query->abc;
279 info[i].hmm = query->hmm;
280 info[i].seq = query->seq;
281 info[i].opts = query->opts;
282
283 info[i].range_list = info[0].range_list;
284
285 info[i].th = NULL;
286 info[i].pli = NULL;
287
288 info[i].inx_mutex = &inx_mutex;
289 info[i].inx = ¤t_index;/* this is confusing trickery - to share a single variable across all threads */
290 info[i].blk_size = &blk_size; /* ditto */
291 info[i].limit = &limit; /* ditto. TODO: come back and clean this up. */
292
293 if (query->cmd_type == HMMD_CMD_SEARCH) {
294 HMMER_SEQ **list = env->seq_db->db[query->dbx].list;
295 info[i].sq_list = &list[0]; // Always search the whole DB in sharded mode
296 //info[i].sq_cnt = query->cnt;
297 info[i].sq_cnt = env->seq_db->db[query->dbx].count;
298 info[i].db_Z = env->seq_db->db[query->dbx].K;
299 info[i].om_list = NULL;
300 info[i].om_cnt = 0;
301 } else {
302 info[i].sq_list = NULL;
303 info[i].sq_cnt = 0;
304 info[i].db_Z = 0;
305 info[i].om_list = &env->hmm_db->list[query->inx];
306 info[i].om_cnt = query->cnt;
307 }
308
309 esl_threads_AddThread(threadObj, &info[i]);
310 }
311
312 /* try block size of 5000. we will need enough sequences for four
313 * blocks per thread or better.
314 */
315 blk_size = 5000;
316 cnt = query->cnt / env->ncpus / blk_size;
317 limit = query->cnt * 2 / 3;
318 if (cnt < 4) {
319 /* try block size of 1000 */
320 blk_size /= 5;
321 cnt = query->cnt / env->ncpus / blk_size;
322 if (cnt < 4) {
323 /* still not enough. just divide it up into one block per thread */
324 blk_size = query->cnt / env->ncpus + 1;
325 limit = query->cnt * 2;
326 }
327 }
328 current_index = 0;
329
330 esl_threads_WaitForStart(threadObj);
331 esl_threads_WaitForFinish(threadObj);
332
333 esl_stopwatch_Stop(w);
334 #if 1
335 fprintf (stdout, " Sequences Residues Elapsed\n");
336 for (i = 0; i < env->ncpus; ++i) {
337 print_timings(i, info[i].elapsed, info[i].pli);
338 }
339 #endif
340 /* merge the results of the search results */
341 for (i = 1; i < env->ncpus; ++i) {
342 p7_tophits_Merge(info[0].th, info[i].th);
343 p7_pipeline_Merge(info[0].pli, info[i].pli);
344 p7_pipeline_Destroy(info[i].pli);
345 p7_tophits_Destroy(info[i].th);
346 }
347
348 print_timings(99, w->elapsed, info[0].pli);
349 send_results(env->fd, w, info[0].th, info[0].pli);
350
351 /* free the last of the pipeline data */
352 p7_pipeline_Destroy(info->pli);
353 p7_tophits_Destroy(info->th);
354
355 esl_threads_Destroy(threadObj);
356
357 pthread_mutex_destroy(&inx_mutex);
358
359 if (info->range_list) {
360 if (info->range_list->starts) free(info->range_list->starts);
361 if (info->range_list->ends) free(info->range_list->ends);
362 free (info->range_list);
363 }
364
365 free(info);
366
367 esl_stopwatch_Destroy(w);
368 esl_alphabet_Destroy(abc);
369
370 return;
371
372 ERROR:
373 LOG_FATAL_MSG("malloc", errno);
374 }
375
376 static QUEUE_DATA_SHARD *
process_QueryCmd(HMMD_COMMAND_SHARD * cmd,WORKER_ENV * env)377 process_QueryCmd(HMMD_COMMAND_SHARD *cmd, WORKER_ENV *env)
378 {
379 int i;
380 int n;
381 int status;
382
383 char *p;
384 char *name;
385 char *desc;
386 ESL_DSQ *dsq;
387
388 QUEUE_DATA_SHARD *query = NULL;
389
390 if ((query = malloc(sizeof(QUEUE_DATA_SHARD))) == NULL) LOG_FATAL_MSG("malloc", errno);
391 memset(query, 0, sizeof(QUEUE_DATA_SHARD)); /* avoid uninitialized bytes. remove this, if we ever serialize/deserialize structures properly */
392
393 printf("CMD: %d %d\n", cmd->hdr.command, cmd->srch.query_type);
394
395 query->cmd_type = cmd->hdr.command;
396 query->query_type = cmd->srch.query_type;
397 query->dbx = cmd->srch.db_inx;
398 query->inx = cmd->srch.inx;
399 query->cnt = cmd->srch.cnt;
400 query->sock = env->fd;
401 query->cmd = NULL;
402
403 p = cmd->srch.data;
404
405 /* process search specific options */
406 status = process_searchopts(env->fd, p, &query->opts);
407 if (status != eslOK) LOG_FATAL_MSG("esl_getopts_Create", status);
408
409 query->hmm = NULL;
410 query->seq = NULL;
411
412 query->abc = esl_alphabet_Create(eslAMINO);
413
414 /* check if we are processing a sequence or hmm */
415 if (cmd->srch.query_type == HMMD_SEQUENCE) {
416 n = cmd->srch.query_length - 2;
417 name = p + cmd->srch.opts_length;
418 desc = name + strlen(name) + 1;
419 dsq = (ESL_DSQ *) (desc + strlen(desc) + 1);
420 query->seq = esl_sq_CreateDigitalFrom(query->abc, name, dsq, n, desc, NULL, NULL);
421 } else {
422 P7_HMM thmm;
423 P7_HMM *hmm = p7_hmm_CreateShell();
424
425 /* allocate memory for the hmm and initialize */
426 p += cmd->srch.opts_length;
427 memcpy(&thmm, p, sizeof(P7_HMM));
428
429 hmm->flags = thmm.flags;
430 p7_hmm_CreateBody(hmm, cmd->srch.query_length, query->abc);
431 p += sizeof(P7_HMM);
432
433 /* initialize fields */
434 hmm->nseq = thmm.nseq;
435 hmm->eff_nseq = thmm.eff_nseq;
436 hmm->max_length = thmm.max_length;
437 hmm->checksum = thmm.checksum;
438 hmm->ctime = NULL;
439 hmm->comlog = NULL;
440
441 for (i = 0; i < p7_NCUTOFFS; i++) hmm->cutoff[i] = thmm.cutoff[i];
442 for (i = 0; i < p7_NEVPARAM; i++) hmm->evparam[i] = thmm.evparam[i];
443 for (i = 0; i < p7_MAXABET; i++) hmm->compo[i] = thmm.compo[i];
444
445 /* fill in the hmm pointers */
446 n = sizeof(float) * (hmm->M + 1) * p7H_NTRANSITIONS;
447 memcpy(*hmm->t, p, n); p += n;
448
449 n = sizeof(float) * (hmm->M + 1) * query->abc->K;
450 memcpy(*hmm->mat, p, n); p += n;
451 memcpy(*hmm->ins, p, n); p += n;
452
453 if (thmm.name) { hmm->name = strdup(p); p += strlen(hmm->name) + 1; }
454 if (thmm.acc) { hmm->acc = strdup(p); p += strlen(hmm->acc) + 1; }
455 if (thmm.desc) { hmm->desc = strdup(p); p += strlen(hmm->desc) + 1; }
456
457 n = hmm->M + 2;
458 if (hmm->flags & p7H_RF) { memcpy(hmm->rf, p, n); p += n; }
459 if (hmm->flags & p7H_MMASK) { memcpy(hmm->mm, p, n); p += n; }
460 if (hmm->flags & p7H_CONS) { memcpy(hmm->consensus, p, n); p += n; }
461 if (hmm->flags & p7H_CS) { memcpy(hmm->cs, p, n); p += n; }
462 if (hmm->flags & p7H_CA) { memcpy(hmm->ca, p, n); p += n; }
463
464 n = sizeof(int) * (hmm->M + 1);
465 if (hmm->flags & p7H_MAP) { memcpy(hmm->map, p, n); p += n; }
466
467 query->hmm = hmm;
468 }
469
470 return query;
471 }
472
473 static void
process_Shutdown(HMMD_COMMAND_SHARD * cmd,WORKER_ENV * env)474 process_Shutdown(HMMD_COMMAND_SHARD *cmd, WORKER_ENV *env)
475 {
476 int n;
477
478 n = MSG_SIZE(cmd);
479 cmd->hdr.status = eslOK;
480 if (writen(env->fd, cmd, n) != n) {
481 LOG_FATAL_MSG("write error", errno);
482 }
483 }
484
485 static void
process_InitCmd(HMMD_COMMAND_SHARD * cmd,WORKER_ENV * env)486 process_InitCmd(HMMD_COMMAND_SHARD *cmd, WORKER_ENV *env)
487 {
488 char *p;
489 int n;
490 int status;
491
492 if (env->hmm_db != NULL) p7_hmmcache_Close(env->hmm_db);
493 if (env->seq_db != NULL) p7_seqcache_Close(env->seq_db);
494
495 env->hmm_db = NULL;
496 env->seq_db = NULL;
497
498 /* load the sequence database */
499 if (cmd->init.db_cnt != 0) {
500 P7_SEQCACHE *sdb = NULL;
501 //printf("This worker assigned shard %d out of %d\n", cmd->init.my_shard, cmd->init.num_shards);
502
503 p = cmd->init.data + cmd->init.seqdb_off;
504 // printf("Opening database file %s\n", p);
505 status = p7_seqcache_Open_shard(p, &sdb, NULL, cmd->init.my_shard, cmd->init.num_shards);
506 if (status != eslOK) {
507 p7_syslog(LOG_ERR,"[%s:%d] - p7_seqcache_Open %s error %d\n", __FILE__, __LINE__, p, status);
508 LOG_FATAL_MSG("cache seqdb error", status);
509 }
510 // printf("Database opened\n");
511 /* validate the sequence database */
512 cmd->init.sid[MAX_INIT_DESC-1] = 0;
513 if (strcmp (cmd->init.sid, sdb->id) != 0 || cmd->init.db_cnt != sdb->db_cnt /*|| cmd->init.seq_cnt != sdb->count*/) {
514 p7_syslog(LOG_ERR,"[%s:%d] - seq db %s: integrity error %s - %s\n", __FILE__, __LINE__, p, cmd->init.sid, sdb->id);
515 LOG_FATAL_MSG("database integrity error", 0);
516 }
517 // printf("Database validated\n");
518 env->seq_db = sdb;
519 }
520
521 /* load the hmm database */
522 if (cmd->init.hmm_cnt != 0) {
523 P7_HMMCACHE *hcache = NULL;
524 // printf("Opening HMM database %s\n", p);
525 p = cmd->init.data + cmd->init.hmmdb_off;
526
527 status = p7_hmmcache_Open(p, &hcache, NULL);
528 if (status != eslOK) {
529 p7_syslog(LOG_ERR,"[%s:%d] - p7_hmmcache_Open %s error %d\n", __FILE__, __LINE__, p, status);
530 LOG_FATAL_MSG("cache hmmdb error", status);
531 }
532
533 if ( (status = p7_hmmcache_SetNumericNames(hcache)) != eslOK){
534 p7_syslog(LOG_ERR,"[%s:%d] - p7_hmmcache_SetNumericNames %s error %d\n", __FILE__, __LINE__, p, status);
535 LOG_FATAL_MSG("cache hmmdb error", status);
536 }
537
538 /* validate the hmm database */
539 cmd->init.hid[MAX_INIT_DESC-1] = 0;
540 /* TODO: come up with a new pressed format with an id to compare - strcmp (cmd->init.hid, hdb->id) != 0 */
541 if (cmd->init.hmm_cnt != 1 || cmd->init.model_cnt != hcache->n) {
542 p7_syslog(LOG_ERR,"[%s:%d] - hmm db %s: integrity error\n", __FILE__, __LINE__, p);
543 LOG_FATAL_MSG("database integrity error", 0);
544 }
545
546 env->hmm_db = hcache;
547
548 printf("Loaded profile db %s; models: %d memory: %" PRId64 "\n",
549 p, hcache->n, (uint64_t) p7_hmmcache_Sizeof(hcache));
550
551 }
552
553 /* if stdout is redirected at the commandline, it causes printf's to be buffered,
554 * which means status logging isn't printed. This line strongly requests unbuffering,
555 * which should be ok, given the low stdout load of hmmpgmd
556 */
557 setvbuf (stdout, NULL, _IONBF, BUFSIZ);
558 printf("Data loaded into memory. Worker is ready.\n");
559 setvbuf (stdout, NULL, _IOFBF, BUFSIZ);
560
561
562 /* write back to the master that we are on line */
563 n = MSG_SIZE(cmd);
564 cmd->hdr.status = eslOK;
565 if (writen(env->fd, cmd, n) != n) {
566 LOG_FATAL_MSG("write error", errno);
567 }
568 }
569
570
571 static void
search_thread(void * arg)572 search_thread(void *arg)
573 {
574 int i;
575 int count;
576 int seed;
577 int status;
578 int workeridx;
579 WORKER_INFO *info;
580 ESL_THREADS *obj;
581 ESL_SQ dbsq;
582 ESL_STOPWATCH *w = NULL; /* timing stopwatch */
583 P7_BUILDER *bld = NULL; /* HMM construction configuration */
584 P7_BG *bg = NULL; /* null model */
585 P7_PIPELINE *pli = NULL; /* work pipeline */
586 P7_TOPHITS *th = NULL; /* top hit results */
587 P7_PROFILE *gm = NULL; /* generic model */
588 P7_OPROFILE *om = NULL; /* optimized query profile */
589
590 obj = (ESL_THREADS *) arg;
591 esl_threads_Started(obj, &workeridx);
592
593 info = (WORKER_INFO *) esl_threads_GetData(obj, workeridx);
594 w = esl_stopwatch_Create();
595 bg = p7_bg_Create(info->abc);
596 esl_stopwatch_Start(w);
597
598 /* set up the dummy description and accession fields */
599 dbsq.desc = "";
600 dbsq.acc = "";
601
602 /* process a query sequence or hmm */
603 if (info->seq != NULL) {
604 bld = p7_builder_Create(NULL, info->abc);
605 if ((seed = esl_opt_GetInteger(info->opts, "--seed")) > 0) {
606 esl_randomness_Init(bld->r, seed);
607 bld->do_reseeding = TRUE;
608 }
609 bld->EmL = esl_opt_GetInteger(info->opts, "--EmL");
610 bld->EmN = esl_opt_GetInteger(info->opts, "--EmN");
611 bld->EvL = esl_opt_GetInteger(info->opts, "--EvL");
612 bld->EvN = esl_opt_GetInteger(info->opts, "--EvN");
613 bld->EfL = esl_opt_GetInteger(info->opts, "--EfL");
614 bld->EfN = esl_opt_GetInteger(info->opts, "--EfN");
615 bld->Eft = esl_opt_GetReal (info->opts, "--Eft");
616
617 if (esl_opt_IsOn(info->opts, "--mxfile")) status = p7_builder_SetScoreSystem (bld, esl_opt_GetString(info->opts, "--mxfile"), NULL, esl_opt_GetReal(info->opts, "--popen"), esl_opt_GetReal(info->opts, "--pextend"), bg);
618 else status = p7_builder_LoadScoreSystem(bld, esl_opt_GetString(info->opts, "--mx"), esl_opt_GetReal(info->opts, "--popen"), esl_opt_GetReal(info->opts, "--pextend"), bg);
619 if (status != eslOK) {
620 //client_error(info->sock, status, "hmmgpmd: failed to set single query sequence score system: %s", bld->errbuf);
621 fprintf(stderr, "hmmpgmd: failed to set single query sequence score system: %s", bld->errbuf);
622 pthread_exit(NULL);
623 return;
624 }
625 p7_SingleBuilder(bld, info->seq, bg, NULL, NULL, NULL, &om); /* bypass HMM - only need model */
626 p7_builder_Destroy(bld);
627 } else {
628 gm = p7_profile_Create (info->hmm->M, info->abc);
629 om = p7_oprofile_Create(info->hmm->M, info->abc);
630 p7_ProfileConfig(info->hmm, bg, gm, 100, p7_LOCAL);
631 p7_oprofile_Convert(gm, om);
632 }
633
634 /* Create processing pipeline and hit list */
635 th = p7_tophits_Create();
636 pli = p7_pipeline_Create(info->opts, om->M, 100, FALSE, p7_SEARCH_SEQS);
637 p7_pli_NewModel(pli, om, bg);
638
639 if (pli->Z_setby == p7_ZSETBY_NTARGETS) pli->Z = info->db_Z;
640 if (info->range_list){
641 // printf("Worker thread starting range-list search\n");
642 }
643 /* loop until all sequences have been processed */
644 count = 1;
645 while (count > 0) {
646 int inx;
647 int blksz;
648 HMMER_SEQ **sq;
649
650 /* grab the next block of sequences */
651 if (pthread_mutex_lock(info->inx_mutex) != 0) p7_Fail("mutex lock failed");
652 inx = *info->inx;
653 blksz = *info->blk_size;
654 if (inx > *info->limit) {
655 blksz /= 5;
656 if (blksz < 1000) {
657 *info->limit = info->sq_cnt * 2;
658 } else {
659 *info->limit = inx + (info->sq_cnt - inx) * 2 / 3;
660 }
661 }
662 *info->blk_size = blksz;
663 *info->inx += blksz;
664 if (pthread_mutex_unlock(info->inx_mutex) != 0) p7_Fail("mutex unlock failed");
665
666 sq = info->sq_list + inx;
667
668 count = info->sq_cnt - inx;
669 if (count > blksz) count = blksz;
670
671 /* Main loop: */
672 for (i = 0; i < count; ++i, ++sq) {
673 /* if( strtol((*sq)->name, NULL, 10) != (*sq)->idx){
674 printf("Sequence with index %d had name %s\n", (*sq)->idx, (*sq)->name);
675 } */
676 if ( !(info->range_list) || hmmpgmd_IsWithinRanges ((*sq)->idx, info->range_list)) {
677 dbsq.name = (*sq)->name;
678 dbsq.dsq = (*sq)->dsq;
679 dbsq.n = (*sq)->n;
680 dbsq.idx = (*sq)->idx;
681 if((*sq)->desc != NULL) dbsq.desc = (*sq)->desc;
682
683 p7_bg_SetLength(bg, dbsq.n);
684 p7_oprofile_ReconfigLength(om, dbsq.n);
685
686 p7_Pipeline(pli, om, bg, &dbsq, NULL, th);
687
688 p7_pipeline_Reuse(pli);
689 }
690 }
691 }
692
693 /* make available the pipeline objects to the main thread */
694 info->th = th;
695 info->pli = pli;
696
697 /* clean up */
698 p7_bg_Destroy(bg);
699 p7_oprofile_Destroy(om);
700
701 if (gm != NULL) p7_profile_Destroy(gm);
702
703 esl_stopwatch_Stop(w);
704 info->elapsed = w->elapsed;
705
706 esl_stopwatch_Destroy(w);
707
708 esl_threads_Finished(obj, workeridx);
709
710 pthread_exit(NULL);
711 return;
712 }
713
714 static void
scan_thread(void * arg)715 scan_thread(void *arg)
716 {
717 int i;
718 int count;
719 int workeridx;
720 WORKER_INFO *info;
721 ESL_THREADS *obj;
722
723 ESL_STOPWATCH *w;
724
725 P7_BG *bg = NULL; /* null model */
726 P7_PIPELINE *pli = NULL; /* work pipeline */
727 P7_TOPHITS *th = NULL; /* top hit results */
728
729 obj = (ESL_THREADS *) arg;
730 esl_threads_Started(obj, &workeridx);
731
732 info = (WORKER_INFO *) esl_threads_GetData(obj, workeridx);
733
734 w = esl_stopwatch_Create();
735 esl_stopwatch_Start(w);
736
737 /* Convert to an optimized model */
738 bg = p7_bg_Create(info->abc);
739
740 /* Create processing pipeline and hit list */
741 th = p7_tophits_Create();
742 pli = p7_pipeline_Create(info->opts, 100, 100, FALSE, p7_SCAN_MODELS);
743
744 p7_pli_NewSeq(pli, info->seq);
745
746 /* loop until all sequences have been processed */
747 count = 1;
748 while (count > 0) {
749 int inx;
750 int blksz;
751 P7_OPROFILE **om;
752
753 /* grab the next block of sequences */
754 if (pthread_mutex_lock(info->inx_mutex) != 0) p7_Fail("mutex lock failed");
755 inx = *info->inx;
756 blksz = *info->blk_size;
757 if (inx > *info->limit) {
758 blksz /= 5;
759 if (blksz < 1000) {
760 *info->limit = info->om_cnt * 2;
761 } else {
762 *info->limit = inx + (info->om_cnt - inx) * 2 / 3;
763 }
764 }
765 *info->blk_size = blksz;
766 *info->inx += blksz;
767 if (pthread_mutex_unlock(info->inx_mutex) != 0) p7_Fail("mutex unlock failed");
768
769 om = info->om_list + inx;
770 count = info->om_cnt - inx;
771 if (count > blksz) count = blksz;
772
773 /* Main loop: */
774 for (i = 0; i < count; ++i, ++om) {
775 p7_pli_NewModel(pli, *om, bg);
776 p7_bg_SetLength(bg, info->seq->n);
777 p7_oprofile_ReconfigLength(*om, info->seq->n);
778
779 p7_Pipeline(pli, *om, bg, info->seq, NULL, th);
780 p7_pipeline_Reuse(pli);
781 }
782 }
783
784 /* make available the pipeline objects to the main thread */
785 info->th = th;
786 info->pli = pli;
787
788 /* clean up */
789 p7_bg_Destroy(bg);
790
791 esl_stopwatch_Stop(w);
792 info->elapsed = w->elapsed;
793
794 esl_stopwatch_Destroy(w);
795
796 esl_threads_Finished(obj, workeridx);
797
798 pthread_exit(NULL);
799 return;
800 }
801
802
803 static void
send_results(int fd,ESL_STOPWATCH * w,P7_TOPHITS * th,P7_PIPELINE * pli)804 send_results(int fd, ESL_STOPWATCH *w, P7_TOPHITS *th, P7_PIPELINE *pli){
805 HMMD_SEARCH_STATS stats;
806 HMMD_SEARCH_STATUS status;
807 uint8_t **buf = NULL; // Buffer for the main results message
808 uint8_t **buf2 = NULL; // Buffer for the initial HMMD_SEARCH_STATUS message
809 uint8_t *buf_ptr = NULL;
810 uint8_t *buf2_ptr = NULL;
811 uint32_t n = 0; // index within buffer of serialized data
812 uint32_t nalloc = 0; // Size of serialized buffer
813
814 // set up handles to buffers
815 buf = &buf_ptr;
816 buf2 = &buf2_ptr;
817
818
819 /* Start filling out the fixed-length structures to be sent */
820 memset(&status, 0, sizeof(HMMD_SEARCH_STATUS)); /* silence valgrind errors - zero out entire structure including its padding */
821 status.status = eslOK;
822 status.msg_size = sizeof(stats);
823
824 /* copy the search stats */
825 stats.elapsed = w->elapsed;
826 stats.user = w->user;
827 stats.sys = w->sys;
828
829 stats.nmodels = pli->nmodels;
830 stats.nseqs = pli->nseqs;
831 stats.n_past_msv = pli->n_past_msv;
832 stats.n_past_bias = pli->n_past_bias;
833 stats.n_past_vit = pli->n_past_vit;
834 stats.n_past_fwd = pli->n_past_fwd;
835
836 stats.Z = pli->Z;
837 stats.domZ = pli->domZ;
838 stats.Z_setby = pli->Z_setby;
839 stats.domZ_setby = pli->domZ_setby;
840
841 stats.nhits = th->N;
842 stats.nreported = th->nreported;
843 stats.nincluded = th->nincluded;
844 stats.hit_offsets = NULL; // This field is only used when sending results back to the client
845
846 // Build the main buffer of result data, starting with the stats object
847 if(p7_hmmd_search_stats_Serialize(&stats, buf, &n, &nalloc) != eslOK){
848 LOG_FATAL_MSG("Serializing HMMD_SEARCH_STATS failed", errno);
849 }
850
851 // and then the hits
852 for(int i =0; i< stats.nhits; i++){
853 if(p7_hit_Serialize(&(th->unsrt[i]), buf, &n, &nalloc) != eslOK){
854 LOG_FATAL_MSG("Serializing P7_HIT failed", errno);
855 }
856 }
857
858 status.msg_size = n; // n will have the number of bytes used to serialize the main data block
859 n = 0;
860 nalloc = 0; // reset these to serialize status object
861
862 // Serialize the search_status object
863 if(hmmd_search_status_Serialize(&status, buf2, &n, &nalloc) != eslOK){
864 LOG_FATAL_MSG("Serializing HMMD_SEARCH_STATUS failed", errno);
865 }
866
867 // Send the status object
868 if (writen(fd, buf2_ptr, n) != n) LOG_FATAL_MSG("write", errno);
869
870 // And the serialized data
871 if (writen(fd, buf_ptr, status.msg_size) != status.msg_size) LOG_FATAL_MSG("write", errno);
872 free(buf_ptr);
873 free(buf2_ptr);
874 printf("Bytes: %" PRId64 " hits: %" PRId64 " sent on socket %d\n", status.msg_size, stats.nhits, fd);
875 fflush(stdout);
876 }
877
878 static int
setup_masterside_comm(ESL_GETOPTS * opts)879 setup_masterside_comm(ESL_GETOPTS *opts)
880 {
881 int fd = -1;
882 int cnt;
883 int sec;
884 int connected;
885
886 struct sockaddr_in addr;
887
888 /* create a reliable, stream socket using TCP */
889 if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) LOG_FATAL_MSG("socket", errno);
890
891 /* construct the server address structure */
892 memset(&addr, 0, sizeof(addr));
893 addr.sin_family = AF_INET;
894 addr.sin_port = htons(esl_opt_GetInteger(opts, "--wport"));
895 if ((inet_pton(AF_INET, esl_opt_GetString(opts, "--worker"), &addr.sin_addr)) < 0) LOG_FATAL_MSG("inet pton", errno);
896
897 /* try to connect to the master server */
898 cnt = 0;
899 sec = 1;
900 connected = -1;
901 while (connected < 0) {
902 /* establish the connection to the master server */
903 if ((connected = connect(fd, (struct sockaddr *) &addr, sizeof(addr))) < 0) {
904 if (errno != ECONNREFUSED) LOG_FATAL_MSG("connect", errno);
905
906 /* the master server is not listening. sleep and try again */
907 sleep(sec);
908 if (++cnt > 10) {
909 cnt = 0;
910 if (sec < 64) sec *= 2;
911 }
912 }
913 }
914
915 return fd;
916 }
917
918 #endif /*HMMER_THREADS*/
919
920
921
922