1 /*
2 * hhblits_mpi.cpp
3 *
4 * Created on: Apr 1, 2014
5 * Author: Markus Meier (markus.meier@mpibpc.mpg.de)
6 */
7
8 #include "hhsearch.h"
9 #include "hhalign.h"
10
11 #include <mpi.h>
12
13 extern "C" {
14 #include <ffindex.h>
15 #include <mpq/mpq.h>
16 }
17
18 #ifdef OPENMP
19 #include <omp.h>
20 #endif
21
22 struct OutputFFIndex {
23 char base[NAMELEN];
24 FILE *data_fh;
25 FILE *index_fh;
26 size_t offset;
27
28 void (*print)(HHblits &, std::stringstream &);
29
closeOutputFFIndex30 void close() {
31 fclose(data_fh);
32 fclose(index_fh);
33 }
34
saveOutputOutputFFIndex35 void saveOutput(HHblits &hhblits, char *name) {
36 std::stringstream out;
37 print(hhblits, out);
38
39 std::string tmp = out.str();
40 ffindex_insert_memory(data_fh, index_fh, &offset,
41 const_cast<char *>(tmp.c_str()), tmp.size(), name);
42
43 fflush(data_fh);
44 fflush(index_fh);
45 }
46 };
47
makeOutputFFIndex(char * par,const int mpi_rank,void (* print)(HHblits &,std::stringstream &),std::vector<OutputFFIndex> & outDatabases)48 void makeOutputFFIndex(char *par, const int mpi_rank,
49 void (*print)(HHblits &, std::stringstream &),
50 std::vector<OutputFFIndex> &outDatabases) {
51 if (*par) {
52 OutputFFIndex db;
53
54 strcpy(db.base, par);
55 db.offset = 0;
56 db.print = print;
57
58 char data_filename_out_rank[NAMELEN];
59 char index_filename_out_rank[NAMELEN];
60
61 snprintf(data_filename_out_rank, FILENAME_MAX, "%s.ffdata.%d", par,
62 mpi_rank);
63 snprintf(index_filename_out_rank, FILENAME_MAX, "%s.ffindex.%d", par,
64 mpi_rank);
65
66 db.data_fh = fopen(data_filename_out_rank, "w+");
67 db.index_fh = fopen(index_filename_out_rank, "w+");
68
69 if (db.data_fh == NULL) {
70 HH_LOG(WARNING) << "Could not open datafile " << data_filename_out_rank << std::endl;
71 return;
72 }
73
74 if (db.index_fh == NULL) {
75 HH_LOG(WARNING) << "Could not open indexfile " << index_filename_out_rank << std::endl;
76 return;
77 }
78
79 outDatabases.push_back(db);
80 }
81 }
82
merge_splits(const char * prefix)83 void merge_splits(const char *prefix) {
84 if (*prefix) {
85 char data_filename[FILENAME_MAX];
86 char index_filename[FILENAME_MAX];
87
88 snprintf(data_filename, FILENAME_MAX, "%s.ffdata", prefix);
89 snprintf(index_filename, FILENAME_MAX, "%s.ffindex", prefix);
90
91 ffmerge_splits(data_filename, index_filename, 1, MPQ_size - 1, true);
92 }
93 }
94
95 struct HHblits_MPQ_Wrapper {
96 char *data;
97 ffindex_index_t *index;
98 HHblits *hhblits;
99 std::vector<OutputFFIndex> *outputDatabases;
100
HHblits_MPQ_WrapperHHblits_MPQ_Wrapper101 HHblits_MPQ_Wrapper(char *data, ffindex_index_t *index, HHblits &hhblits,
102 std::vector<OutputFFIndex> &outputDatabases) {
103 this->data = data;
104 this->index = index;
105 this->hhblits = &hhblits;
106 this->outputDatabases = &outputDatabases;
107 }
108
PayloadHHblits_MPQ_Wrapper109 void Payload(const size_t start, const size_t end) {
110 // Foreach entry in the input file
111 for (size_t entry_index = start; entry_index < end; entry_index++) {
112 ffindex_entry_t *entry = ffindex_get_entry_by_index(index, entry_index);
113 if (entry == NULL) {
114 continue;
115 }
116
117 hhblits->Reset();
118
119 FILE *inf = ffindex_fopen_by_entry(data, entry);
120 hhblits->run(inf, entry->name);
121 fclose(inf);
122
123 for (size_t i = 0; i < outputDatabases->size(); i++) {
124 outputDatabases->operator[](i).saveOutput(*hhblits, entry->name);
125 }
126 }
127 }
128 };
129
payload(void * env,const size_t start,const size_t end)130 void static payload(void *env, const size_t start, const size_t end) {
131 HHblits_MPQ_Wrapper *hhblits_wrapper = (HHblits_MPQ_Wrapper *) env;
132 hhblits_wrapper->Payload(start, end);
133 }
134
main(int argc,const char ** argv)135 int main(int argc, const char **argv) {
136 Parameters par(argc, argv);
137 #ifdef HHSEARCH
138 HHsearch::ProcessAllArguments(par);
139 #elif HHALIGN
140 HHalign::ProcessAllArguments(par);
141 #else
142 HHblits::ProcessAllArguments(par);
143 #endif
144
145 // hhblits_mpi will be parallelized with openmpi, no other parallelization
146 par.threads = 1;
147 #ifdef OPENMP
148 omp_set_num_threads(1);
149 #endif
150
151 std::string data_filename(par.infile);
152 data_filename.append(".ffdata");
153
154 std::string index_filename(par.infile);
155 index_filename.append(".ffindex");
156
157 FFindexDatabase reader(data_filename.c_str(), index_filename.c_str(), false);
158 reader.ensureLinearAccess();
159
160 int mpq_status = MPQ_Init(argc, argv, reader.db_index->n_entries);
161
162 if (mpq_status == MPQ_SUCCESS) {
163 if (MPQ_rank == MPQ_MASTER) {
164 MPQ_Master(1);
165 } else {
166 std::vector<OutputFFIndex> outputDatabases;
167 makeOutputFFIndex(par.outfile, MPQ_rank, &HHblits::writeHHRFile,
168 outputDatabases);
169 makeOutputFFIndex(par.scorefile, MPQ_rank, &HHblits::writeScoresFile,
170 outputDatabases);
171 makeOutputFFIndex(par.pairwisealisfile, MPQ_rank,
172 &HHblits::writePairwiseAlisFile, outputDatabases);
173 makeOutputFFIndex(par.alitabfile, MPQ_rank, &HHblits::writeAlitabFile,
174 outputDatabases);
175 makeOutputFFIndex(par.psifile, MPQ_rank, &HHblits::writePsiFile,
176 outputDatabases);
177 makeOutputFFIndex(par.hhmfile, MPQ_rank, &HHblits::writeHMMFile,
178 outputDatabases);
179 makeOutputFFIndex(par.alnfile, MPQ_rank, &HHblits::writeA3MFile,
180 outputDatabases);
181 makeOutputFFIndex(par.matrices_output_file, MPQ_rank, &HHblits::writeMatricesFile,
182 outputDatabases);
183 makeOutputFFIndex(par.m8file, MPQ_rank, &HHblits::writeM8,
184 outputDatabases);
185
186 std::vector<HHblitsDatabase*> databases;
187 #ifdef HHSEARCH
188 HHsearch::prepareDatabases(par, databases);
189 HHblits app(par, databases);
190 #elif HHalign
191 HHalign app(par);
192 #else
193 HHblits::prepareDatabases(par, databases);
194 HHblits app(par, databases);
195 #endif
196
197 HHblits_MPQ_Wrapper wrapper(reader.db_data, reader.db_index, app, outputDatabases);
198 MPQ_Worker(payload, &wrapper);
199
200 for (size_t i = 0; i < outputDatabases.size(); i++) {
201 outputDatabases[i].close();
202 }
203 for (size_t i = 0; i < databases.size(); i++) {
204 delete databases[i];
205 }
206 }
207
208 MPI_Barrier(MPI_COMM_WORLD);
209
210 if (MPQ_rank == MPQ_MASTER) {
211 merge_splits(par.outfile);
212 merge_splits(par.scorefile);
213 merge_splits(par.pairwisealisfile);
214 merge_splits(par.alitabfile);
215 merge_splits(par.psifile);
216 merge_splits(par.hhmfile);
217 merge_splits(par.alnfile);
218 merge_splits(par.matrices_output_file);
219 merge_splits(par.m8file);
220 }
221 } else {
222 if (mpq_status == MPQ_ERROR_NO_WORKERS) {
223 fprintf(stderr, "MPQ_Init: Needs at least one worker process.\n");
224 exit(EXIT_FAILURE);
225 }
226 }
227
228 MPI_Finalize();
229 return EXIT_SUCCESS;
230 }
231
232