LoadFunctionLibrary ("../all-terms.bf");
LoadFunctionLibrary ("../models/parameters.bf");
LoadFunctionLibrary ("../IOFunctions.bf");

/** @module mpi
    Functions for creating, populating, and manipulating
    MPI job queues. In the absence of an MPI environment
    the jobs are executed serially
*/

namespace mpi {

    // monotonically increasing counter used to tag dispatched jobs
    job_id = 0;

    /** @return the number of MPI nodes available (1 when not running under MPI) */
    function NodeCount () {
        return utility.GetEnvVariable ("MPI_NODE_COUNT");
    }

    /** @return TRUE iff this process is the master (rank 0) node */
    function IsMasterNode () {
        return utility.GetEnvVariable ("MPI_NODE_ID") == 0;
    }

    /** @return the next unused job id (increments the module-level counter) */
    function get_next_job_id () {
        job_id += 1;
        return job_id;
    }

    /** Partition a list of tasks into approximately (+/- 1) equal subtasks
     * @name mpi.PartitionIntoBlocks
     * @param {Dict} object
     *      key -> task specification
     * @return {Dict}
     *      node -> list of task specifications
     */
    lfunction PartitionIntoBlocks (object) {
        if (Type (object) == "AssociativeList") {
            mpi_node_count = utility.GetEnvVariable ("MPI_NODE_COUNT");
            if (mpi_node_count > 1) {
                return_object = {};
                task_count    = utility.Array1D (object);
                task_keys     = utility.Keys (object);
                // base number of tasks per node; the first `roundoff` nodes get one extra
                slice_size    = Max (1, task_count $ mpi_node_count);
                roundoff      = Max (0, task_count - mpi_node_count * slice_size);
                current_index = 0;

                for (n = 0; n < mpi_node_count; n += 1) {
                    node_tasks = {};
                    if (current_index < task_count) {
                        for (i = 0; i < slice_size; i += 1) {
                            node_tasks[task_keys[current_index]] = object [task_keys[current_index]];
                            current_index += 1;
                        }
                        if (n < roundoff) { // distribute the remainder one task at a time
                            node_tasks[task_keys[current_index]] = object [task_keys[current_index]];
                            current_index += 1;
                        }
                    }
                    return_object [n] = node_tasks;
                }
                return return_object;
            }
            // single node: everything goes into one block
            return { "0" : object};
        }
        // not an associative list; pass through unchanged
        return object;
    }


    lfunction CreateQueue (nodesetup) {
        /** create and return an empty FIFO queue for MPI jobs
         * @name mpi.CreateQueue
         * @param {Dict} nodesetup
         *      controls what gets passed to slave nodes
         *      "Headers" -> iterable (matrix/dict) of string paths of header files to load
         *      "Models" -> matrix of model names to make available to slave nodes
         *      "Filters" -> matrix of filter names to make available to slave nodes
         *      "LikelihoodFunctions" -> iterable (matrix/dict) of LikelihoodFunction IDs to export to slave nodes
         * @return {Dict} an "opaque" queue structure
         */

        mpi_node_count = utility.GetEnvVariable ("MPI_NODE_COUNT");

        queue         = {};
        send_to_nodes = "";

        if (mpi_node_count > 1) {

            if (None != nodesetup) {
                if (Abs (nodesetup)) {

                    utility.SetEnvVariable ("LF_NEXUS_EXPORT_EXTRA",
                                            'PRESERVE_SLAVE_NODE_STATE = TRUE; MPI_NEXUS_FILE_RETURN = "None";');

                    // open `send_to_nodes` as a growable string buffer
                    send_to_nodes * 128;

                    // ship each likelihood function to every slave node and wait for acks
                    utility.ForEach (nodesetup[utility.getGlobalValue("terms.mpi.LikelihoodFunctions")], "_value_",
                    '
                        ExecuteCommands ("Export (create_queue.temp, " + _value_ + ")");
                        for (`&k` = 1; `&k` < `mpi_node_count`; `&k` += 1) {
                            MPISend (`&k`, create_queue.temp);
                        }
                        for (`&k` = 1; `&k` < `mpi_node_count`; `&k` += 1) {
                            MPIReceive (-1, ignore, ignore);
                        }
                    ');

                    if (utility.Has (nodesetup, utility.getGlobalValue("terms.mpi.Headers"), None)) {
                        send_to_nodes * "PRESERVE_SLAVE_NODE_STATE = TRUE;\n";
                        send_to_nodes * (Join (";\n",utility.Map (nodesetup[utility.getGlobalValue("terms.mpi.Headers")], "_value_", "'LoadFunctionLibrary(\"' + _value_ +'\")'")) + ";");
                    }

                    if (utility.Has (nodesetup, utility.getGlobalValue("terms.mpi.Functions"), None)) {
                        send_to_nodes * "PRESERVE_SLAVE_NODE_STATE = TRUE;\n";
                        utility.ForEach (nodesetup[utility.getGlobalValue("terms.mpi.Functions")], "_value_",
                        '
                            ExecuteCommands ("Export (_test_id_," + _value_ + ")");
                            `&send_to_nodes` * _test_id_;
                        '
                        );
                    }

                    if (utility.Has (nodesetup, utility.getGlobalValue("terms.mpi.Variables"), None)) {
                        // serialize variable values as assignment statements
                        utility.ForEach (nodesetup[utility.getGlobalValue("terms.mpi.Variables")], "_value_",
                        '
                            `&send_to_nodes` * ("\n" + _value_ + " = " + (parameters.Quote(^_value_)) + ";\n") ;
                        '
                        );
                    }

                    if (utility.Has (nodesetup, utility.getGlobalValue("terms.mpi.DataSetFilters"), None)) {
                        utility.SetEnvVariable ("DATA_FILE_PRINT_FORMAT",9);
                        utility.ForEach (nodesetup[utility.getGlobalValue("terms.mpi.DataSetFilters")], "_value_",
                        '
                            Export (serialized_filter, ^_value_);
                            `&send_to_nodes` * ("\nDataSet __private_" + _value_ + " = ReadFromString (\'" + (serialized_filter&&2) + "\'); DataSetFilter " + _value_ + " = CreateFilter (__private_" + _value_ + ",1);");
                        '
                        );
                    }

                    model_count = utility.Array1D (nodesetup[utility.getGlobalValue("terms.mpi.Models")]);

                    if (model_count) {
                        send_to_nodes * "PRESERVE_SLAVE_NODE_STATE = TRUE;\n";

                        globals_to_export   = {};
                        functions_to_export = {};

                        for (m = 0; m < model_count; m+=1) {
                            model_name         = (nodesetup[utility.getGlobalValue("terms.mpi.Models")])[m];
                            model_globals      = utility.UniqueValues(((^model_name)[utility.getGlobalValue("terms.parameters")])[utility.getGlobalValue("terms.global")]);
                            model_global_count = utility.Array1D (model_globals);
                            for (v = 0; v < model_global_count; v+=1) {
                                globals_to_export [model_globals[v]] = 1;
                            }

                            // collect the model's branch-length getter/setter callbacks, if defined as strings
                            utility.ForEach ({{ utility.getGlobalValue("terms.model.get_branch_length"), utility.getGlobalValue("terms.model.set_branch_length") }}, "_value_",
                            '
                                _test_id_ = (^(`&model_name`))[_value_];
                                if (Type (_test_id_) == "String" && Abs (_test_id_) > 0) {
                                    `&functions_to_export` [_test_id_] = 1;
                                }
                            ');
                        }

                        utility.ForEach (utility.Keys(globals_to_export), "_value_",
                        '
                            `&send_to_nodes` * parameters.ExportParameterDefinition (_value_);
                        '
                        );

                        utility.ForEach (utility.Keys(functions_to_export), "_value_",
                        '
                            ExecuteCommands ("Export (_test_id_," + _value_ + ")");
                            `&send_to_nodes` * _test_id_;
                        '
                        );
                    }

                    // finalize the string buffer
                    send_to_nodes * 0;
                    queue ["cache"] = {};

                }
            }

            if (Abs (send_to_nodes)) {
                for (k = 1; k < mpi_node_count; k += 1) {
                    MPISend (k, send_to_nodes);
                }
                for (k = 1; k < mpi_node_count; k += 1) {
                    MPIReceive (-1, ignore, ignore);
                }
            }

            //assert (0);
            for (k = 1; k < mpi_node_count; k += 1) {
                queue [k] = {utility.getGlobalValue("terms.mpi.job_id") : None, utility.getGlobalValue("terms.mpi.callback") : None, utility.getGlobalValue("terms.mpi.arguments"): None};
            }

            // this will store jobs that were previously sent to each node; avoiding redefinition if possible
        }
        return queue;
    }


    lfunction QueueJob (queue, job, arguments, result_callback) {

        /**
            send the job function with provided arguments to
            the first available node.

            When the job is finished; call the "result_callback" function
        */

        mpi_node_count = utility.GetEnvVariable ("MPI_NODE_COUNT");

        if (mpi_node_count > 1) {
            // find the first idle node, if any
            for (node = 1; node < mpi_node_count; node += 1) {
                if (None == (queue [node])[utility.getGlobalValue("terms.mpi.job_id")]) {
                    break;
                }
            }

            // all nodes busy: block until one finishes and reuse it
            if (node == mpi_node_count) {
                node = aux._handle_receieve (queue);
            }

            complete_function_dump = None;

            // skip re-sending the function body if this node already has this job's code
            if (utility.Has (queue, "cache" , "AssociativeList")) {
                if ((queue["cache"])[node] == job) {
                    complete_function_dump = "";
                    //console.log ("CACHED MPI preamble");
                } else {
                    (queue["cache"])[node] = job;
                }
            }

            if (None == complete_function_dump) {
                complete_function_dump = aux.queue_export_function (job);
            }

            //console.log (complete_function_dump);
            job_id = get_next_job_id();
            //fprintf (stdout, "Sending to node ", node, "\n");
            queue [node] = {utility.getGlobalValue("terms.mpi.job_id") : job_id, utility.getGlobalValue("terms.mpi.callback") : result_callback, utility.getGlobalValue("terms.mpi.arguments") : arguments};
            MPISend (node, complete_function_dump + "; return " + job + '(' + Join (",",utility.Map (arguments,"_value_", "utility.convertToArgumentString (_value_)")) + ')');

        } else {

            //console.log(job);
            //console.log(arguments);
            //console.log(result_callback);
            //exit();
            // no MPI: run the job in-process and report it as coming from node 0
            Call (result_callback, 0, Eval (job + '(' + Join (",",utility.Map (arguments,"_value_", "utility.convertToArgumentString (_value_)")) + ')'), arguments);
        }
    }

    lfunction QueueComplete (queue) {

        mpi_node_count = utility.GetEnvVariable ("MPI_NODE_COUNT");

        if (mpi_node_count > 1) {
            // drain the queue: keep receiving until no node has a pending job
            do {
                for (node = 1; node < mpi_node_count; node += 1) {
                    if (None != (queue [node])[utility.getGlobalValue("terms.mpi.job_id")]) {
                        break;
                    }
                }

                if (node < mpi_node_count) {
                    node = aux._handle_receieve (queue);
                }
            } while (node < mpi_node_count);
        }

        queue = None;
    }

    namespace aux {

        /** Serialize the named function (and its dependencies) into a string */
        function queue_export_function (func_id) {
            Export (complete_function_dump, ^func_id);
            return complete_function_dump;
        }

        /** Block on the next MPI result, dispatch its callback, mark the node idle,
            and return the index of the node that finished */
        lfunction _handle_receieve (queue) {
            MPIReceive (-1,from,result);
            Call ((queue [from])[utility.getGlobalValue("terms.mpi.callback")], from, Eval(result), (queue [from])[utility.getGlobalValue("terms.mpi.arguments")]);
            queue [from] = {utility.getGlobalValue("terms.mpi.job_id") : None, utility.getGlobalValue("terms.mpi.callback") : None};
            return from;
        }
    }


    //------------------------------------------------------------------------------

    lfunction ComputeOnGrid (lf_id, grid, handler, callback) {

        jobs = mpi.PartitionIntoBlocks(grid);

        scores = {};

        queue = mpi.CreateQueue ({^"terms.mpi.LikelihoodFunctions": {{lf_id}},
                                  ^"terms.mpi.Headers" : utility.GetListOfLoadedModules ("libv3/")});


        io.ReportProgressBar("", "Computing LF on a grid");
        // blocks 1..N-1 go to worker nodes; block 0 is computed locally below
        for (i = 1; i < Abs (jobs); i += 1) {
            io.ReportProgressBar("", "Computing LF on a grid " + i + "/" + Abs (jobs));
            mpi.QueueJob (queue, handler, {"0" : lf_id,
                                           "1" : jobs [i],
                                           "2" : &scores}, callback);
        }


        io.ClearProgressBar();
        // the master node handles the first block itself (node id -1)
        Call (callback, -1, Call (handler, lf_id, jobs[0], &scores), {"0" : lf_id, "1" : jobs [0], "2" : &scores});

        mpi.QueueComplete (queue);

        return scores;

    }

    //------------------------------------------------------------------------------

    /** Default ComputeOnGrid callback: merge a worker's results into the shared score dict */
    lfunction ComputeOnGrid.ResultHandler (node, result, arguments) {
        utility.Extend (^(arguments[2]), result);
    }

    //------------------------------------------------------------------------------

    /** Evaluate the likelihood function at each grid point; returns task id -> log-likelihood */
    lfunction ComputeOnGrid.SimpleEvaluator (lf_id, tasks, scores) {
        LFCompute (^lf_id, LF_START_COMPUTE);

        results    = {};
        task_ids   = utility.Keys (tasks);
        task_count = Abs (tasks);
        for (i = 0; i < task_count; i+=1) {
            parameters.SetValues (tasks[task_ids[i]]);
            LFCompute (^lf_id, ll);
            results [task_ids[i]] = ll;

        }
        LFCompute (^lf_id, LF_DONE_COMPUTE);
        return results;
    }

    //------------------------------------------------------------------------------

    /** Evaluate per-site log-likelihoods at each grid point; returns task id -> site log-likelihood matrix */
    lfunction pass2.evaluator (lf_id, tasks, scores) {

        results    = {};
        task_ids   = utility.Keys (tasks);
        task_count = Abs (tasks);
        for (i = 0; i < task_count; i+=1) {
            parameters.SetValues (tasks[task_ids[i]]);
            ConstructCategoryMatrix(site_likelihoods,^lf_id,SITE_LOG_LIKELIHOODS);
            /*if (( (tasks[task_ids[i]]) ["FADE bias"])["MLE"] == 0.0) {
                console.log (tasks[task_ids[i]]);
                console.log (site_likelihoods);
            }*/
            results [task_ids[i]] = site_likelihoods ["Max(_MATRIX_ELEMENT_VALUE_,-1e200)"];

            // to avoid returning -inf
        }


        return results;
    }
}