LoadFunctionLibrary ("../all-terms.bf");
LoadFunctionLibrary ("../models/parameters.bf");
LoadFunctionLibrary ("../IOFunctions.bf");

/** @module mpi
        Functions for creating, populating, and manipulating
        MPI job queues. In the absence of an MPI environment,
        the jobs are executed serially.
 */



namespace mpi {
    job_id = 0;

    /** Report the number of MPI nodes available (the MPI_NODE_COUNT environment variable) */
    function NodeCount () {
        return utility.GetEnvVariable ("MPI_NODE_COUNT");
    }

    /** Check whether the current process is the master (rank 0) node */
    function IsMasterNode () {
        return utility.GetEnvVariable ("MPI_NODE_ID") == 0;
    }

    /** Generate a unique (per-session) job identifier */
    function get_next_job_id () {
        job_id += 1;
        return job_id;
    }

    /** Partition a collection of tasks into approximately equal-sized (+/- 1 task) blocks, one per MPI node
     * @name mpi.PartitionIntoBlocks
     * @param  {Dict} object
     *      key -> task specification
     * @return {Dict}
     *      node index -> dict of the task specifications assigned to that node
     */

    lfunction PartitionIntoBlocks (object) {
        if (Type (object) == "AssociativeList") {
            mpi_node_count = utility.GetEnvVariable ("MPI_NODE_COUNT");
            if (mpi_node_count > 1) {
                return_object = {};
                task_count = utility.Array1D (object);
                task_keys  = utility.Keys (object);
                slice_size = Max (1, task_count $ mpi_node_count);
                roundoff   = Max (0, task_count - mpi_node_count * slice_size);
                current_index = 0;

                for (n = 0; n < mpi_node_count; n += 1) {
                    node_tasks = {};
                    if (current_index < task_count) {
                        for (i = 0; i < slice_size; i+=1) {
                            node_tasks[task_keys[current_index]] = object [task_keys[current_index]];
                            current_index += 1;
                        }
                        // the first 'roundoff' nodes absorb one extra task each to cover the remainder
                        if (n < roundoff) {
                            node_tasks[task_keys[current_index]] = object [task_keys[current_index]];
                            current_index += 1;
                        }
                    }
                    return_object [n] = node_tasks;
                }
                return return_object;
            }
            // a single node: everything goes to node 0
            return { "0" : object};
        }
        return object;
    }
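
    /*
        Example (illustrative): with MPI_NODE_COUNT == 2, a five-task dictionary with keys
        "a" ... "e" is split as

            blocks = mpi.PartitionIntoBlocks (tasks);
            // block for node 0 -> tasks "a", "b", "c"; block for node 1 -> tasks "d", "e"

        (slice_size = 2, with the one remainder task absorbed by node 0). Without MPI, or on a
        single node, the whole dictionary is returned under key "0".
    */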


    lfunction CreateQueue (nodesetup) {
        /** create and return an empty FIFO queue for MPI jobs
         * @name mpi.CreateQueue
         * @param  {Dict} nodesetup
         *      controls what gets passed to slave nodes; recognized keys (term identifiers):
         *      terms.mpi.Headers -> iterable (matrix/dict) of string paths of header files to load
         *      terms.mpi.Models ->  matrix of model names to make available to slave nodes
         *      terms.mpi.Functions -> iterable of function identifiers to export to slave nodes
         *      terms.mpi.Variables -> iterable of variable identifiers whose current values are exported
         *      terms.mpi.DataSetFilters -> iterable of data set filter identifiers to serialize and recreate
         *      terms.mpi.LikelihoodFunctions -> iterable (matrix/dict) of LikelihoodFunction IDs to export to slave nodes
         * @return {Dict} an "opaque" queue structure
         */

        mpi_node_count = utility.GetEnvVariable ("MPI_NODE_COUNT");

        queue = {};
        send_to_nodes = "";
        if (mpi_node_count > 1) {

            if (None != nodesetup) {
                if (Abs (nodesetup)) {

                    // extra commands appended to likelihood-function NEXUS exports sent to slave nodes
                    utility.SetEnvVariable ("LF_NEXUS_EXPORT_EXTRA",
                                            'PRESERVE_SLAVE_NODE_STATE = TRUE; MPI_NEXUS_FILE_RETURN = "None";');

                    // initialize the string buffer holding the setup code shipped to each node
                    send_to_nodes * 128;

                    // export each likelihood function and broadcast it to all slave nodes immediately
                    utility.ForEach (nodesetup[utility.getGlobalValue("terms.mpi.LikelihoodFunctions")], "_value_",
                                     '
                                        ExecuteCommands ("Export (create_queue.temp, " + _value_ + ")");
                                        for (`&k` = 1; `&k` < `mpi_node_count`; `&k` += 1) {
                                           MPISend (`&k`, create_queue.temp);
                                        }
                                        for (`&k` = 1; `&k` < `mpi_node_count`; `&k` += 1) {
                                            MPIReceive (-1, ignore, ignore);
                                        }

                                     ');

                    // have slave nodes load the same header files
                    if (utility.Has (nodesetup, utility.getGlobalValue("terms.mpi.Headers"), None)) {
                        send_to_nodes * "PRESERVE_SLAVE_NODE_STATE = TRUE;\n";
                        send_to_nodes * (Join (";\n",utility.Map (nodesetup[utility.getGlobalValue("terms.mpi.Headers")], "_value_", "'LoadFunctionLibrary(\"' + _value_ +'\")'")) + ";");
                    }

                    // export user-defined functions by identifier
                    if (utility.Has (nodesetup, utility.getGlobalValue("terms.mpi.Functions"), None)) {
                        send_to_nodes * "PRESERVE_SLAVE_NODE_STATE = TRUE;\n";
                        utility.ForEach (nodesetup[utility.getGlobalValue("terms.mpi.Functions")], "_value_",
                            '
                                ExecuteCommands ("Export (_test_id_," + _value_ + ")");
                                `&send_to_nodes` * _test_id_;
                            '
                        );
                    }

                    // replicate current variable values on the slave nodes
                    if (utility.Has (nodesetup, utility.getGlobalValue("terms.mpi.Variables"), None)) {
                        utility.ForEach (nodesetup[utility.getGlobalValue("terms.mpi.Variables")], "_value_",
                            '
                                `&send_to_nodes` * ("\n" + _value_ + " = " +  (parameters.Quote(^_value_)) + ";\n") ;
                            '
                        );
                    }

                    // serialize data set filters and recreate them on the slave nodes
                    if (utility.Has (nodesetup, utility.getGlobalValue("terms.mpi.DataSetFilters"), None)) {
                        utility.SetEnvVariable ("DATA_FILE_PRINT_FORMAT",9);
                        utility.ForEach (nodesetup[utility.getGlobalValue("terms.mpi.DataSetFilters")], "_value_",
                            '
                                Export (serialized_filter, ^_value_);
                                `&send_to_nodes` * ("\nDataSet __private_" + _value_ + " = ReadFromString (\'" + (serialized_filter&&2)  + "\'); DataSetFilter " + _value_ + " = CreateFilter (__private_" + _value_ + ",1);");
                            '
                        );
                    }

                    model_count = utility.Array1D (nodesetup[utility.getGlobalValue("terms.mpi.Models")]);

                    // for each model, export its global parameter definitions and branch-length handler functions
                    if (model_count) {
                        send_to_nodes * "PRESERVE_SLAVE_NODE_STATE = TRUE;\n";

                        globals_to_export = {};
                        functions_to_export = {};

                        for (m = 0; m < model_count; m+=1) {
                            model_name = (nodesetup[utility.getGlobalValue("terms.mpi.Models")])[m];
                            model_globals = utility.UniqueValues(((^model_name)[utility.getGlobalValue("terms.parameters")])[utility.getGlobalValue("terms.global")]);
                            model_global_count = utility.Array1D (model_globals);
                            for (v = 0; v < model_global_count; v+=1) {
                                globals_to_export [model_globals[v]] = 1;
                            }

                            utility.ForEach ({{ utility.getGlobalValue("terms.model.get_branch_length"), utility.getGlobalValue("terms.model.set_branch_length") }}, "_value_",
                            '
                                _test_id_ = (^(`&model_name`))[_value_];
                                if (Type (_test_id_) == "String" && Abs (_test_id_) > 0) {
                                    `&functions_to_export` [_test_id_] = 1;
                                }
                            ');
                        }

                        utility.ForEach (utility.Keys(globals_to_export), "_value_",
                            '
                                `&send_to_nodes` * parameters.ExportParameterDefinition (_value_);
                            '
                        );

                        utility.ForEach (utility.Keys(functions_to_export), "_value_",
                            '
                                ExecuteCommands ("Export (_test_id_," + _value_ + ")");
                                `&send_to_nodes` * _test_id_;
                            '
                        );
                    }

                    // finalize the string buffer
                    send_to_nodes * 0;
                    queue ["cache"] = {};

                }
            }

            // broadcast the setup code and wait for an acknowledgement from every node
            if (Abs (send_to_nodes)) {

                for (k = 1; k < mpi_node_count; k += 1) {
                   MPISend (k, send_to_nodes);
                }
                for (k = 1; k < mpi_node_count; k += 1) {
                   MPIReceive (-1, ignore, ignore);
                }
            }

            //assert (0);
            // one (initially empty) job slot per slave node
            for (k = 1; k < mpi_node_count; k += 1) {
                queue [k] = {utility.getGlobalValue("terms.mpi.job_id") : None, utility.getGlobalValue("terms.mpi.callback") : None, utility.getGlobalValue("terms.mpi.arguments"): None};
            }

            // the "cache" entry stores the job most recently sent to each node, to avoid redefining it when possible
        }
        return queue;
    }
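
    /*
        Usage sketch (hedged; "my_lf" and "shared_setting" are placeholders for a likelihood
        function and a variable defined by the calling analysis):

            queue = mpi.CreateQueue ({
                utility.getGlobalValue ("terms.mpi.LikelihoodFunctions") : {{"my_lf"}},
                utility.getGlobalValue ("terms.mpi.Headers")             : utility.GetListOfLoadedModules ("libv3/"),
                utility.getGlobalValue ("terms.mpi.Variables")           : {{"shared_setting"}}
            });

        On a single node (or without MPI) the setup payload is never built and an empty
        queue dictionary is returned.
    */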


    lfunction QueueJob (queue, job, arguments, result_callback) {

        /**
            send the job function, with the provided arguments, to
            the first available node.

            When the job is finished, "result_callback" is invoked as
            result_callback (node, result, arguments)

        */

        mpi_node_count = utility.GetEnvVariable ("MPI_NODE_COUNT");

        if (mpi_node_count > 1) {
            // look for an idle node (no job id assigned to its slot)
            for (node = 1; node < mpi_node_count; node += 1) {
                if (None == (queue [node])[utility.getGlobalValue("terms.mpi.job_id")]) {
                    break;
                }
            }

            // all nodes are busy: block until one of them returns a result
            if (node == mpi_node_count) {
                node = aux._handle_receive (queue);
            }

            complete_function_dump = None;

            // if this node already received the same job function, skip re-sending its definition
            if (utility.Has (queue, "cache" , "AssociativeList")) {
                if ((queue["cache"])[node] == job) {
                    complete_function_dump = "";
                    //console.log ("CACHED MPI preamble");
                } else {
                    (queue["cache"])[node] = job;
                }
            }

            if (None == complete_function_dump) {
                complete_function_dump = aux.queue_export_function (job);
            }

            //console.log (complete_function_dump);
            job_id = get_next_job_id();
            //fprintf (stdout, "Sending to node ", node, "\n");
            queue [node] = {utility.getGlobalValue("terms.mpi.job_id") : job_id, utility.getGlobalValue("terms.mpi.callback") : result_callback, utility.getGlobalValue("terms.mpi.arguments") : arguments};
            MPISend (node, complete_function_dump + "; return " + job + '(' + Join (",",utility.Map (arguments,"_value_", "utility.convertToArgumentString (_value_)")) + ')');

        } else {

            // no MPI environment: execute the job in-process and invoke the callback directly
            Call (result_callback, 0, Eval (job + '(' + Join (",",utility.Map (arguments,"_value_", "utility.convertToArgumentString (_value_)")) + ')'), arguments);
        }
    }
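
    /*
        Sketch of queueing a single job (illustrative; "my_worker", "my_collector", and the
        argument names stand for items defined by the calling analysis):

            mpi.QueueJob (queue, "my_worker", {"0" : first_arg, "1" : second_arg}, "my_collector");

        Arguments are passed positionally in key order ("0", "1", ...) and serialized with
        utility.convertToArgumentString before being sent to the node.
    */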

    lfunction QueueComplete (queue) {

        mpi_node_count = utility.GetEnvVariable ("MPI_NODE_COUNT");

        if (mpi_node_count > 1) {
            // keep receiving results until no node has an outstanding job
            do {

                for (node = 1; node < mpi_node_count; node += 1) {
                    if (None != (queue [node])[utility.getGlobalValue("terms.mpi.job_id")]) {
                        break;
                    }
                }

                if (node < mpi_node_count) {
                    node = aux._handle_receive (queue);
                }
            } while (node < mpi_node_count);
        }

        queue = None;
    }
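
    /*
        Typical queue lifecycle (sketch; "my_worker", "my_collector", and "n_tasks" are
        placeholders supplied by the calling analysis):

            queue = mpi.CreateQueue (None);
            for (task = 0; task < n_tasks; task += 1) {
                mpi.QueueJob (queue, "my_worker", {"0" : task}, "my_collector");
            }
            mpi.QueueComplete (queue); // block until every outstanding job has been collected

        Without an MPI environment each QueueJob call runs the worker in-process, so
        QueueComplete has nothing left to wait for.
    */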

    namespace aux {

        /** Export the full definition of a function (by identifier) as an executable string */
        function queue_export_function (func_id) {

            Export (complete_function_dump, ^func_id);
            return complete_function_dump;
        }

        /** Block until any node returns a result; invoke the stored callback and free that node's slot */
        lfunction _handle_receive (queue) {
            MPIReceive (-1,from,result);
            Call ((queue [from])[utility.getGlobalValue("terms.mpi.callback")], from, Eval(result), (queue [from])[utility.getGlobalValue("terms.mpi.arguments")]);
            queue [from] = {utility.getGlobalValue("terms.mpi.job_id") : None, utility.getGlobalValue("terms.mpi.callback") : None};
            return from;
        }
    }
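
    /*
        Callback contract (see _handle_receive above): the callback registered with QueueJob is
        invoked as callback (node, result, arguments), where "result" is the evaluated return
        value of the remote job and "arguments" is the dictionary originally passed to QueueJob;
        ComputeOnGrid.ResultHandler below is a minimal example of this signature.
    */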


    //------------------------------------------------------------------------------

    lfunction ComputeOnGrid (lf_id, grid, handler, callback) {

        jobs = mpi.PartitionIntoBlocks(grid);

        scores = {};

        queue  = mpi.CreateQueue ({^"terms.mpi.LikelihoodFunctions": {{lf_id}},
                                   ^"terms.mpi.Headers" : utility.GetListOfLoadedModules ("libv3/")});


        io.ReportProgressBar("", "Computing LF on a grid");
        // blocks 1 .. N-1 are dispatched to slave nodes; block 0 is evaluated by the master node below
        for (i = 1; i < Abs (jobs); i += 1) {
           io.ReportProgressBar("", "Computing LF on a grid "  + i + "/" + Abs (jobs));
           mpi.QueueJob (queue, handler, {"0" : lf_id,
                                          "1" : jobs [i],
                                          "2" : &scores}, callback);
        }


        io.ClearProgressBar();
        // evaluate block 0 locally and pass it to the same callback (node argument -1)
        Call (callback, -1, Call (handler, lf_id, jobs[0], &scores), {"0" : lf_id, "1" : jobs [0], "2" : &scores});

        mpi.QueueComplete (queue);

        return scores;

    }
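
    /*
        Usage sketch (hedged; "example_lf" is a likelihood function and "grid_points" a
        dictionary of parameter-value assignments, both defined by the calling analysis):

            log_L = mpi.ComputeOnGrid (&example_lf,
                                       grid_points,
                                       "mpi.ComputeOnGrid.SimpleEvaluator",
                                       "mpi.ComputeOnGrid.ResultHandler");

        The returned dictionary maps each key of "grid_points" to the log-likelihood obtained
        after applying that point's parameter values.
    */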

    //------------------------------------------------------------------------------


    /** A ready-made ComputeOnGrid callback: merge a block of per-task results into the shared score dictionary */
    lfunction ComputeOnGrid.ResultHandler (node, result, arguments) {
        utility.Extend (^(arguments[2]), result);
    }

    //------------------------------------------------------------------------------

    /** A ready-made ComputeOnGrid handler: evaluate the likelihood function at each task's parameter values */
    lfunction ComputeOnGrid.SimpleEvaluator (lf_id, tasks, scores) {
        LFCompute (^lf_id, LF_START_COMPUTE);

        results = {};
        task_ids = utility.Keys (tasks);
        task_count = Abs (tasks);
        for (i = 0; i < task_count; i+=1) {
            parameters.SetValues (tasks[task_ids[i]]);
            LFCompute (^lf_id, ll);
            results [task_ids[i]] = ll;

        }
        LFCompute (^lf_id, LF_DONE_COMPUTE);
        return results;
    }

    //------------------------------------------------------------------------------

    /** Grid handler that returns per-site log-likelihoods for each task */
    lfunction pass2.evaluator (lf_id, tasks, scores) {

        results = {};
        task_ids = utility.Keys (tasks);
        task_count = Abs (tasks);
        for (i = 0; i < task_count; i+=1) {
            parameters.SetValues (tasks[task_ids[i]]);
            ConstructCategoryMatrix(site_likelihoods,^lf_id,SITE_LOG_LIKELIHOODS);
            /*if (( (tasks[task_ids[i]]) ["FADE bias"])["MLE"] == 0.0) {
                console.log (tasks[task_ids[i]]);
                console.log (site_likelihoods);
            }*/
            results [task_ids[i]] = site_likelihoods ["Max(_MATRIX_ELEMENT_VALUE_,-1e200)"];

            // floor at -1e200 to avoid returning -inf
        }


        return results;
    }
}