1 /*
2  * hhblits_mpi.cpp
3  *
4  *  Created on: Apr 1, 2014
5  *      Author: Markus Meier (markus.meier@mpibpc.mpg.de)
6  */
7 
8 #include "hhsearch.h"
9 #include "hhalign.h"
10 
11 #include <mpi.h>
12 
13 extern "C" {
14 #include <ffindex.h>
15 #include <mpq/mpq.h>
16 }
17 
18 #ifdef OPENMP
19 #include <omp.h>
20 #endif
21 
22 struct OutputFFIndex {
23     char base[NAMELEN];
24     FILE *data_fh;
25     FILE *index_fh;
26     size_t offset;
27 
28     void (*print)(HHblits &, std::stringstream &);
29 
closeOutputFFIndex30     void close() {
31         fclose(data_fh);
32         fclose(index_fh);
33     }
34 
saveOutputOutputFFIndex35     void saveOutput(HHblits &hhblits, char *name) {
36         std::stringstream out;
37         print(hhblits, out);
38 
39         std::string tmp = out.str();
40         ffindex_insert_memory(data_fh, index_fh, &offset,
41                               const_cast<char *>(tmp.c_str()), tmp.size(), name);
42 
43         fflush(data_fh);
44         fflush(index_fh);
45     }
46 };
47 
makeOutputFFIndex(char * par,const int mpi_rank,void (* print)(HHblits &,std::stringstream &),std::vector<OutputFFIndex> & outDatabases)48 void makeOutputFFIndex(char *par, const int mpi_rank,
49                        void (*print)(HHblits &, std::stringstream &),
50                        std::vector<OutputFFIndex> &outDatabases) {
51     if (*par) {
52         OutputFFIndex db;
53 
54         strcpy(db.base, par);
55         db.offset = 0;
56         db.print = print;
57 
58         char data_filename_out_rank[NAMELEN];
59         char index_filename_out_rank[NAMELEN];
60 
61         snprintf(data_filename_out_rank, FILENAME_MAX, "%s.ffdata.%d", par,
62                  mpi_rank);
63         snprintf(index_filename_out_rank, FILENAME_MAX, "%s.ffindex.%d", par,
64                  mpi_rank);
65 
66         db.data_fh = fopen(data_filename_out_rank, "w+");
67         db.index_fh = fopen(index_filename_out_rank, "w+");
68 
69         if (db.data_fh == NULL) {
70             HH_LOG(WARNING) << "Could not open datafile " << data_filename_out_rank << std::endl;
71             return;
72         }
73 
74         if (db.index_fh == NULL) {
75             HH_LOG(WARNING) << "Could not open indexfile " << index_filename_out_rank << std::endl;
76             return;
77         }
78 
79         outDatabases.push_back(db);
80     }
81 }
82 
merge_splits(const char * prefix)83 void merge_splits(const char *prefix) {
84     if (*prefix) {
85         char data_filename[FILENAME_MAX];
86         char index_filename[FILENAME_MAX];
87 
88         snprintf(data_filename, FILENAME_MAX, "%s.ffdata", prefix);
89         snprintf(index_filename, FILENAME_MAX, "%s.ffindex", prefix);
90 
91         ffmerge_splits(data_filename, index_filename, 1, MPQ_size - 1, true);
92     }
93 }
94 
95 struct HHblits_MPQ_Wrapper {
96     char *data;
97     ffindex_index_t *index;
98     HHblits *hhblits;
99     std::vector<OutputFFIndex> *outputDatabases;
100 
HHblits_MPQ_WrapperHHblits_MPQ_Wrapper101     HHblits_MPQ_Wrapper(char *data, ffindex_index_t *index, HHblits &hhblits,
102                         std::vector<OutputFFIndex> &outputDatabases) {
103         this->data = data;
104         this->index = index;
105         this->hhblits = &hhblits;
106         this->outputDatabases = &outputDatabases;
107     }
108 
PayloadHHblits_MPQ_Wrapper109     void Payload(const size_t start, const size_t end) {
110         // Foreach entry in the input file
111         for (size_t entry_index = start; entry_index < end; entry_index++) {
112             ffindex_entry_t *entry = ffindex_get_entry_by_index(index, entry_index);
113             if (entry == NULL) {
114                 continue;
115             }
116 
117             hhblits->Reset();
118 
119             FILE *inf = ffindex_fopen_by_entry(data, entry);
120             hhblits->run(inf, entry->name);
121             fclose(inf);
122 
123             for (size_t i = 0; i < outputDatabases->size(); i++) {
124                 outputDatabases->operator[](i).saveOutput(*hhblits, entry->name);
125             }
126         }
127     }
128 };
129 
payload(void * env,const size_t start,const size_t end)130 void static payload(void *env, const size_t start, const size_t end) {
131     HHblits_MPQ_Wrapper *hhblits_wrapper = (HHblits_MPQ_Wrapper *) env;
132     hhblits_wrapper->Payload(start, end);
133 }
134 
main(int argc,const char ** argv)135 int main(int argc, const char **argv) {
136     Parameters par(argc, argv);
137 #ifdef HHSEARCH
138     HHsearch::ProcessAllArguments(par);
139 #elif HHALIGN
140     HHalign::ProcessAllArguments(par);
141 #else
142     HHblits::ProcessAllArguments(par);
143 #endif
144 
145     // hhblits_mpi will be parallelized with openmpi, no other parallelization
146     par.threads = 1;
147 #ifdef OPENMP
148     omp_set_num_threads(1);
149 #endif
150 
151     std::string data_filename(par.infile);
152     data_filename.append(".ffdata");
153 
154     std::string index_filename(par.infile);
155     index_filename.append(".ffindex");
156 
157     FFindexDatabase reader(data_filename.c_str(), index_filename.c_str(), false);
158     reader.ensureLinearAccess();
159 
160     int mpq_status = MPQ_Init(argc, argv, reader.db_index->n_entries);
161 
162     if (mpq_status == MPQ_SUCCESS) {
163         if (MPQ_rank == MPQ_MASTER) {
164             MPQ_Master(1);
165         } else {
166             std::vector<OutputFFIndex> outputDatabases;
167             makeOutputFFIndex(par.outfile, MPQ_rank, &HHblits::writeHHRFile,
168                               outputDatabases);
169             makeOutputFFIndex(par.scorefile, MPQ_rank, &HHblits::writeScoresFile,
170                               outputDatabases);
171             makeOutputFFIndex(par.pairwisealisfile, MPQ_rank,
172                               &HHblits::writePairwiseAlisFile, outputDatabases);
173             makeOutputFFIndex(par.alitabfile, MPQ_rank, &HHblits::writeAlitabFile,
174                               outputDatabases);
175             makeOutputFFIndex(par.psifile, MPQ_rank, &HHblits::writePsiFile,
176                               outputDatabases);
177             makeOutputFFIndex(par.hhmfile, MPQ_rank, &HHblits::writeHMMFile,
178                               outputDatabases);
179             makeOutputFFIndex(par.alnfile, MPQ_rank, &HHblits::writeA3MFile,
180                               outputDatabases);
181             makeOutputFFIndex(par.matrices_output_file, MPQ_rank, &HHblits::writeMatricesFile,
182                               outputDatabases);
183             makeOutputFFIndex(par.m8file, MPQ_rank, &HHblits::writeM8,
184                               outputDatabases);
185 
186             std::vector<HHblitsDatabase*> databases;
187 #ifdef HHSEARCH
188             HHsearch::prepareDatabases(par, databases);
189             HHblits app(par, databases);
190 #elif HHalign
191             HHalign app(par);
192 #else
193             HHblits::prepareDatabases(par, databases);
194             HHblits app(par, databases);
195 #endif
196 
197             HHblits_MPQ_Wrapper wrapper(reader.db_data, reader.db_index, app, outputDatabases);
198             MPQ_Worker(payload, &wrapper);
199 
200             for (size_t i = 0; i < outputDatabases.size(); i++) {
201                 outputDatabases[i].close();
202             }
203             for (size_t i = 0; i < databases.size(); i++) {
204                 delete databases[i];
205             }
206         }
207 
208         MPI_Barrier(MPI_COMM_WORLD);
209 
210         if (MPQ_rank == MPQ_MASTER) {
211             merge_splits(par.outfile);
212             merge_splits(par.scorefile);
213             merge_splits(par.pairwisealisfile);
214             merge_splits(par.alitabfile);
215             merge_splits(par.psifile);
216             merge_splits(par.hhmfile);
217             merge_splits(par.alnfile);
218             merge_splits(par.matrices_output_file);
219             merge_splits(par.m8file);
220         }
221     } else {
222         if (mpq_status == MPQ_ERROR_NO_WORKERS) {
223             fprintf(stderr, "MPQ_Init: Needs at least one worker process.\n");
224             exit(EXIT_FAILURE);
225         }
226     }
227 
228     MPI_Finalize();
229     return EXIT_SUCCESS;
230 }
231 
232