1# Copyright (C) 2015-2021 Regents of the University of California 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14import logging 15import os 16from typing import Optional 17 18from toil.wdl.wdl_functions import heredoc_wdl 19from toil.wdl.wdl_types import (WDLArrayType, 20 WDLCompoundType, 21 WDLFileType, 22 WDLMapType, 23 WDLPairType, 24 WDLType) 25 26logger = logging.getLogger(__name__) 27 28 29class SynthesizeWDL: 30 """ 31 SynthesizeWDL takes the "workflows_dictionary" and "tasks_dictionary" produced by 32 wdl_analysis.py and uses them to write a native python script for use with Toil. 33 34 A WDL "workflow" section roughly corresponds to the python "main()" function, where 35 functions are wrapped as Toil "jobs", output dependencies specified, and called. 36 37 A WDL "task" section corresponds to a unique python function, which will be wrapped 38 as a Toil "job" and defined outside of the "main()" function that calls it. 39 40 Generally this handles breaking sections into their corresponding Toil counterparts. 41 42 For example: write the imports, then write all functions defining jobs (which have subsections 43 like: write header, define variables, read "File" types into the jobstore, docker call, etc.), 44 then write the main and all of its subsections. 45 """ 46 47 def __init__(self, 48 version: str, 49 tasks_dictionary: dict, 50 workflows_dictionary: dict, 51 output_directory: str, 52 json_dict: dict, 53 docker_user: str, 54 jobstore: Optional[str] = None, 55 destBucket: Optional[str] = None): 56 57 self.version = version 58 self.output_directory = output_directory 59 if not os.path.exists(self.output_directory): 60 try: 61 os.makedirs(self.output_directory) 62 except: 63 raise OSError( 64 'Could not create directory. Insufficient permissions or disk space most likely.') 65 66 self.output_file = os.path.join(self.output_directory, 'toilwdl_compiled.py') 67 68 self.jobstore = jobstore if jobstore else './toilWorkflowRun' 69 70 if docker_user != 'None': 71 self.docker_user = "'" + docker_user + "'" 72 else: 73 self.docker_user = docker_user 74 75 # only json is required; tsv/csv are optional 76 self.json_dict = json_dict 77 78 # holds task skeletons from WDL task objects 79 self.tasks_dictionary = tasks_dictionary 80 # holds workflow structure from WDL workflow objects 81 self.workflows_dictionary = workflows_dictionary 82 83 # keep track of which workflow is being written 84 self.current_workflow = None 85 86 # unique iterator to add to cmd names 87 self.cmd_num = 0 88 89 # deposit WDL outputs into a cloud bucket; optional 90 self.destBucket = destBucket 91 92 def write_modules(self): 93 # string used to write imports to the file 94 module_string = heredoc_wdl(''' 95 from toil.job import Job 96 from toil.common import Toil 97 from toil.lib.docker import apiDockerCall 98 from toil.wdl.wdl_types import WDLType 99 from toil.wdl.wdl_types import WDLStringType 100 from toil.wdl.wdl_types import WDLIntType 101 from toil.wdl.wdl_types import WDLFloatType 102 from toil.wdl.wdl_types import WDLBooleanType 103 from toil.wdl.wdl_types import WDLFileType 104 from toil.wdl.wdl_types import WDLArrayType 105 from toil.wdl.wdl_types import WDLPairType 106 from toil.wdl.wdl_types import WDLMapType 107 from toil.wdl.wdl_types import WDLFile 108 from toil.wdl.wdl_types import WDLPair 109 from toil.wdl.wdl_functions import generate_docker_bashscript_file 110 from toil.wdl.wdl_functions import generate_stdout_file 111 from toil.wdl.wdl_functions import select_first 112 from toil.wdl.wdl_functions import sub 113 from toil.wdl.wdl_functions import size 114 from toil.wdl.wdl_functions import glob 115 from toil.wdl.wdl_functions import process_and_read_file 116 from toil.wdl.wdl_functions import process_infile 117 from toil.wdl.wdl_functions import process_outfile 118 from toil.wdl.wdl_functions import abspath_file 119 from toil.wdl.wdl_functions import combine_dicts 120 from toil.wdl.wdl_functions import parse_memory 121 from toil.wdl.wdl_functions import parse_cores 122 from toil.wdl.wdl_functions import parse_disk 123 from toil.wdl.wdl_functions import read_lines 124 from toil.wdl.wdl_functions import read_tsv 125 from toil.wdl.wdl_functions import read_csv 126 from toil.wdl.wdl_functions import read_json 127 from toil.wdl.wdl_functions import read_map 128 from toil.wdl.wdl_functions import read_int 129 from toil.wdl.wdl_functions import read_string 130 from toil.wdl.wdl_functions import read_float 131 from toil.wdl.wdl_functions import read_boolean 132 from toil.wdl.wdl_functions import write_lines 133 from toil.wdl.wdl_functions import write_tsv 134 from toil.wdl.wdl_functions import write_json 135 from toil.wdl.wdl_functions import write_map 136 from toil.wdl.wdl_functions import defined 137 from toil.wdl.wdl_functions import basename 138 from toil.wdl.wdl_functions import floor 139 from toil.wdl.wdl_functions import ceil 140 from toil.wdl.wdl_functions import wdl_range 141 from toil.wdl.wdl_functions import transpose 142 from toil.wdl.wdl_functions import length 143 from toil.wdl.wdl_functions import wdl_zip 144 from toil.wdl.wdl_functions import cross 145 from toil.wdl.wdl_functions import as_pairs 146 from toil.wdl.wdl_functions import as_map 147 from toil.wdl.wdl_functions import keys 148 from toil.wdl.wdl_functions import collect_by_key 149 from toil.wdl.wdl_functions import flatten 150 import fnmatch 151 import textwrap 152 import subprocess 153 import os 154 import errno 155 import time 156 import shutil 157 import shlex 158 import uuid 159 import logging 160 161 _toil_wdl_internal__current_working_dir = os.getcwd() 162 163 logger = logging.getLogger(__name__) 164 165 166 ''', {'jobstore': self.jobstore})[1:] 167 return module_string 168 169 def write_main(self): 170 """ 171 Writes out a huge string representing the main section of the python 172 compiled toil script. 173 174 Currently looks at and writes 5 sections: 175 1. JSON Variables (includes importing and preparing files as tuples) 176 2. TSV Variables (includes importing and preparing files as tuples) 177 3. CSV Variables (includes importing and preparing files as tuples) 178 4. Wrapping each WDL "task" function as a toil job 179 5. List out children and encapsulated jobs by priority, then start job0. 180 181 This should create variable declarations necessary for function calls. 182 Map file paths appropriately and store them in the toil fileStore so 183 that they are persistent from job to job. Create job wrappers for toil. 184 And finally write out, and run the jobs in order of priority using the 185 addChild and encapsulate commands provided by toil. 186 187 :return: giant string containing the main def for the toil script. 188 """ 189 190 main_section = '' 191 192 # write out the main header 193 main_header = self.write_main_header() 194 main_section = main_section + main_header 195 196 # write toil job wrappers with input vars 197 jobs_to_write = self.write_main_jobwrappers() 198 main_section = main_section + jobs_to_write 199 200 # loop to export all outputs to a cloud bucket 201 if self.destBucket: 202 main_destbucket = self.write_main_destbucket() 203 main_section = main_section + main_destbucket 204 205 return main_section 206 207 def write_main_header(self): 208 main_header = heredoc_wdl(''' 209 if __name__=="__main__": 210 options = Job.Runner.getDefaultOptions("{jobstore}") 211 options.clean = 'always' 212 with Toil(options) as fileStore: 213 ''', {'jobstore': self.jobstore}) 214 return main_header 215 216 def write_main_jobwrappers(self): 217 """ 218 Writes out 'jobs' as wrapped toil objects in preparation for calling. 219 220 :return: A string representing this. 221 """ 222 main_section = '' 223 224 # toil cannot technically start with multiple jobs, so an empty 225 # 'initialize_jobs' function is always called first to get around this 226 main_section = main_section + ' job0 = Job.wrapJobFn(initialize_jobs)\n' 227 228 # declare each job in main as a wrapped toil function in order of priority 229 for wf in self.workflows_dictionary: 230 self.current_workflow = wf 231 for assignment in self.workflows_dictionary[wf]: 232 if assignment.startswith('declaration'): 233 main_section += self.write_main_jobwrappers_declaration(self.workflows_dictionary[wf][assignment]) 234 if assignment.startswith('call'): 235 main_section += ' job0 = job0.encapsulate()\n' 236 main_section += self.write_main_jobwrappers_call(self.workflows_dictionary[wf][assignment]) 237 if assignment.startswith('scatter'): 238 main_section += ' job0 = job0.encapsulate()\n' 239 main_section += self.write_main_jobwrappers_scatter(self.workflows_dictionary[wf][assignment], 240 assignment) 241 if assignment.startswith('if'): 242 main_section += ' if {}:\n'.format(self.workflows_dictionary[wf][assignment]['expression']) 243 main_section += self.write_main_jobwrappers_if(self.workflows_dictionary[wf][assignment]['body']) 244 245 main_section += '\n fileStore.start(job0)\n' 246 247 return main_section 248 249 def write_main_jobwrappers_declaration(self, declaration): 250 251 main_section = '' 252 var_name, var_type, var_expr = declaration 253 254 # check the json file for the expression's value 255 # this is a higher priority and overrides anything written in the .wdl 256 json_expressn = self.json_var(wf=self.current_workflow, var=var_name) 257 if json_expressn is not None: 258 var_expr = json_expressn 259 260 main_section += ' {} = {}.create(\n {})\n' \ 261 .format(var_name, self.write_declaration_type(var_type), var_expr) 262 263 # import filepath into jobstore 264 if self.needs_file_import(var_type) and var_expr: 265 main_section += ' {} = process_infile({}, fileStore)\n'.format(var_name, var_name) 266 267 return main_section 268 269 def write_main_destbucket(self): 270 """ 271 Writes out a loop for exporting outputs to a cloud bucket. 272 273 :return: A string representing this. 274 """ 275 main_section = heredoc_wdl(''' 276 outdir = '{outdir}' 277 onlyfiles = [os.path.join(outdir, f) for f in os.listdir(outdir) if os.path.isfile(os.path.join(outdir, f))] 278 for output_f_path in onlyfiles: 279 output_file = fileStore.writeGlobalFile(output_f_path) 280 preserveThisFilename = os.path.basename(output_f_path) 281 destUrl = '/'.join(s.strip('/') for s in [destBucket, preserveThisFilename]) 282 fileStore.exportFile(output_file, destUrl) 283 ''', {'outdir': self.output_directory}, indent=' ') 284 return main_section 285 286 def fetch_ignoredifs(self, assignments, breaking_assignment): 287 ignore_ifs = [] 288 for assignment in assignments: 289 if assignment.startswith('call'): 290 pass 291 elif assignment.startswith('scatter'): 292 pass 293 elif assignment.startswith('if'): 294 if not self.fetch_ignoredifs_chain(assignments[assignment]['body'], breaking_assignment): 295 ignore_ifs.append(assignment) 296 return ignore_ifs 297 298 def fetch_ignoredifs_chain(self, assignments, breaking_assignment): 299 for assignment in assignments: 300 if assignment.startswith('call'): 301 if assignment == breaking_assignment: 302 return True 303 if assignment.startswith('scatter'): 304 if assignment == breaking_assignment: 305 return True 306 if assignment.startswith('if'): 307 return self.fetch_ignoredifs_chain(assignments[assignment]['body'], breaking_assignment) 308 return False 309 310 def write_main_jobwrappers_if(self, if_statement): 311 # check for empty if statement 312 if not if_statement: 313 return self.indent(' pass') 314 315 main_section = '' 316 for assignment in if_statement: 317 if assignment.startswith('declaration'): 318 main_section += self.write_main_jobwrappers_declaration(if_statement[assignment]) 319 if assignment.startswith('call'): 320 main_section += ' job0 = job0.encapsulate()\n' 321 main_section += self.write_main_jobwrappers_call(if_statement[assignment]) 322 if assignment.startswith('scatter'): 323 main_section += ' job0 = job0.encapsulate()\n' 324 main_section += self.write_main_jobwrappers_scatter(if_statement[assignment], assignment) 325 if assignment.startswith('if'): 326 main_section += ' if {}:\n'.format(if_statement[assignment]['expression']) 327 main_section += self.write_main_jobwrappers_if(if_statement[assignment]['body']) 328 main_section = self.indent(main_section) 329 return main_section 330 331 def write_main_jobwrappers_scatter(self, task, assignment): 332 scatter_inputs = self.fetch_scatter_inputs(assignment) 333 334 main_section = ' {scatter} = job0.addChild({scatter}Cls('.format(scatter=assignment) 335 for var in scatter_inputs: 336 main_section += var + '=' + var + ', ' 337 if main_section.endswith(', '): 338 main_section = main_section[:-2] 339 main_section += '))\n' 340 341 scatter_outputs = self.fetch_scatter_outputs(task) 342 for var in scatter_outputs: 343 main_section += ' {var} = {scatter}.rv("{var}")\n'.format(var=var['task'] + '_' + var['output'], scatter=assignment) 344 345 return main_section 346 347 def fetch_scatter_outputs(self, task): 348 scatteroutputs = [] 349 350 for var in task['body']: 351 # TODO variable support 352 if var.startswith('call'): 353 if 'outputs' in self.tasks_dictionary[task['body'][var]['task']]: 354 for output in self.tasks_dictionary[task['body'][var]['task']]['outputs']: 355 scatteroutputs.append({'task': task['body'][var]['alias'], 'output': output[0]}) 356 return scatteroutputs 357 358 def fetch_scatter_inputs(self, assigned): 359 360 for wf in self.workflows_dictionary: 361 ignored_ifs = self.fetch_ignoredifs(self.workflows_dictionary[wf], assigned) 362 # TODO support additional wfs 363 break 364 365 scatternamespace = [] 366 367 for wf in self.workflows_dictionary: 368 for assignment in self.workflows_dictionary[wf]: 369 if assignment == assigned: 370 return scatternamespace 371 elif assignment.startswith('declaration'): 372 name, _, _ = self.workflows_dictionary[wf][assignment] 373 scatternamespace.append(name) 374 elif assignment.startswith('call'): 375 if 'outputs' in self.tasks_dictionary[self.workflows_dictionary[wf][assignment]['task']]: 376 for output in self.tasks_dictionary[self.workflows_dictionary[wf][assignment]['task']]['outputs']: 377 scatternamespace.append(self.workflows_dictionary[wf][assignment]['alias'] + '_' + output[0]) 378 elif assignment.startswith('scatter'): 379 for var in self.fetch_scatter_outputs(self.workflows_dictionary[wf][assignment]): 380 scatternamespace.append(var['task'] + '_' + var['output']) 381 elif assignment.startswith('if') and assignment not in ignored_ifs: 382 new_list, cont_or_break = self.fetch_scatter_inputs_chain(self.workflows_dictionary[wf][assignment]['body'], 383 assigned, 384 ignored_ifs, 385 inputs_list=[]) 386 scatternamespace += new_list 387 if not cont_or_break: 388 return scatternamespace 389 return scatternamespace 390 391 def fetch_scatter_inputs_chain(self, inputs, assigned, ignored_ifs, inputs_list): 392 for i in inputs: 393 if i == assigned: 394 return inputs_list, False 395 elif i.startswith('call'): 396 if 'outputs' in self.tasks_dictionary[inputs[i]['task']]: 397 for output in self.tasks_dictionary[inputs[i]['task']]['outputs']: 398 inputs_list.append(inputs[i]['alias'] + '_' + output[0]) 399 elif i.startswith('scatter'): 400 for var in self.fetch_scatter_outputs(inputs[i]): 401 inputs_list.append(var['task'] + '_' + var['output']) 402 elif i.startswith('if') and i not in ignored_ifs: 403 inputs_list, cont_or_break = self.fetch_scatter_inputs_chain(inputs[i]['body'], assigned, ignored_ifs, inputs_list) 404 if not cont_or_break: 405 return inputs_list, False 406 return inputs_list, True 407 408 def write_main_jobwrappers_call(self, task): 409 main_section = ' {} = job0.addChild({}Cls('.format(task['alias'], task['task']) 410 for var in task['io']: 411 main_section += var + '=' + task['io'][var] + ', ' 412 if main_section.endswith(', '): 413 main_section = main_section[:-2] 414 main_section += '))\n' 415 416 call_outputs = self.fetch_call_outputs(task) 417 for var in call_outputs: 418 main_section += ' {var} = {task}.rv("{output}")\n'.format(var=var['task'] + '_' + var['output'], 419 task=var['task'], 420 output=var['output']) 421 return main_section 422 423 def fetch_call_outputs(self, task): 424 calloutputs = [] 425 if 'outputs' in self.tasks_dictionary[task['task']]: 426 for output in self.tasks_dictionary[task['task']]['outputs']: 427 calloutputs.append({'task': task['alias'], 'output': output[0]}) 428 return calloutputs 429 430 def write_functions(self): 431 """ 432 Writes out a python function for each WDL "task" object. 433 434 :return: a giant string containing the meat of the job defs. 435 """ 436 437 # toil cannot technically start with multiple jobs, so an empty 438 # 'initialize_jobs' function is always called first to get around this 439 fn_section = 'def initialize_jobs(job):\n' + \ 440 ' job.fileStore.logToMaster("initialize_jobs")\n' 441 442 for job in self.tasks_dictionary: 443 fn_section += self.write_function(job) 444 445 for wf in self.workflows_dictionary: 446 for assignment in self.workflows_dictionary[wf]: 447 if assignment.startswith('scatter'): 448 fn_section += self.write_scatterfunction(self.workflows_dictionary[wf][assignment], assignment) 449 if assignment.startswith('if'): 450 fn_section += self.write_scatterfunctions_within_if(self.workflows_dictionary[wf][assignment]['body']) 451 452 return fn_section 453 454 def write_scatterfunctions_within_if(self, ifstatement): 455 fn_section = '' 456 for assignment in ifstatement: 457 if assignment.startswith('scatter'): 458 fn_section += self.write_scatterfunction(ifstatement[assignment], assignment) 459 if assignment.startswith('if'): 460 fn_section += self.write_scatterfunctions_within_if(ifstatement[assignment]['body']) 461 return fn_section 462 463 def write_scatterfunction(self, job, scattername): 464 """ 465 Writes out a python function for each WDL "scatter" object. 466 """ 467 468 scatter_outputs = self.fetch_scatter_outputs(job) 469 470 # write the function header 471 fn_section = self.write_scatterfunction_header(scattername) 472 473 # write the scatter definitions 474 fn_section += self.write_scatterfunction_lists(scatter_outputs) 475 476 # write 477 fn_section += self.write_scatterfunction_loop(job, scatter_outputs) 478 479 # write the outputs for the task to return 480 fn_section += self.write_scatterfunction_outputreturn(scatter_outputs) 481 482 return fn_section 483 484 def write_scatterfunction_header(self, scattername): 485 """ 486 487 :return: 488 """ 489 scatter_inputs = self.fetch_scatter_inputs(scattername) 490 491 fn_section = '\n\nclass {jobname}Cls(Job):\n'.format(jobname=scattername) 492 fn_section += ' def __init__(self, ' 493 for input in scatter_inputs: 494 fn_section += '{input}=None, '.format(input=input) 495 fn_section += '*args, **kwargs):\n' 496 fn_section += ' Job.__init__(self)\n\n' 497 498 for input in scatter_inputs: 499 fn_section += ' self.id_{input} = {input}\n'.format(input=input) 500 501 fn_section += heredoc_wdl(''' 502 503 def run(self, fileStore): 504 fileStore.logToMaster("{jobname}") 505 tempDir = fileStore.getLocalTempDir() 506 507 try: 508 os.makedirs(os.path.join(tempDir, 'execution')) 509 except OSError as e: 510 if e.errno != errno.EEXIST: 511 raise 512 ''', {'jobname': scattername}, indent=' ')[1:] 513 for input in scatter_inputs: 514 fn_section += ' {input} = self.id_{input}\n'.format(input=input) 515 return fn_section 516 517 def write_scatterfunction_outputreturn(self, scatter_outputs): 518 """ 519 520 :return: 521 """ 522 fn_section = '\n rvDict = {' 523 for var in scatter_outputs: 524 fn_section += '"{var}": {var}, '.format(var=var['task'] + '_' + var['output']) 525 if fn_section.endswith(', '): 526 fn_section = fn_section[:-2] 527 fn_section += '}\n' 528 fn_section += ' return rvDict\n\n' 529 530 return fn_section[:-1] 531 532 def write_scatterfunction_lists(self, scatter_outputs): 533 """ 534 535 :return: 536 """ 537 fn_section = '\n' 538 for var in scatter_outputs: 539 fn_section += ' {var} = []\n'.format(var=var['task'] + '_' + var['output']) 540 541 return fn_section 542 543 def write_scatterfunction_loop(self, job, scatter_outputs): 544 """ 545 546 :return: 547 """ 548 collection = job['collection'] 549 item = job['item'] 550 551 fn_section = ' for {item} in {collection}:\n'.format(item=item, collection=collection) 552 553 previous_dependency = 'self' 554 for statement in job['body']: 555 if statement.startswith('declaration'): 556 # reusing write_main_jobwrappers_declaration() here, but it needs to be indented one more level. 557 fn_section += self.indent( 558 self.write_main_jobwrappers_declaration(job['body'][statement])) 559 elif statement.startswith('call'): 560 fn_section += self.write_scatter_callwrapper(job['body'][statement], previous_dependency) 561 previous_dependency = 'job_' + job['body'][statement]['alias'] 562 elif statement.startswith('scatter'): 563 raise NotImplementedError('nested scatter not implemented.') 564 elif statement.startswith('if'): 565 fn_section += ' if {}:\n'.format(job['body'][statement]['expression']) 566 # reusing write_main_jobwrappers_if() here, but it needs to be indented one more level. 567 fn_section += self.indent(self.write_main_jobwrappers_if(job['body'][statement]['body'])) 568 569 # check for empty scatter section 570 if len(job['body']) == 0: 571 fn_section += ' pass' 572 573 for var in scatter_outputs: 574 fn_section += ' {var}.append({task}.rv("{output}"))\n'.format(var=var['task'] + '_' + var['output'], 575 task='job_' + var['task'], 576 output=var['output']) 577 return fn_section 578 579 def write_scatter_callwrapper(self, job, previous_dependency): 580 fn_section = ' job_{alias} = {pd}.addFollowOn({task}Cls('.format(alias=job['alias'], 581 pd=previous_dependency, 582 task=job['task']) 583 for var in job['io']: 584 fn_section += var + '=' + job['io'][var] + ', ' 585 if fn_section.endswith(', '): 586 fn_section = fn_section[:-2] 587 fn_section += '))\n' 588 return fn_section 589 590 def write_function(self, job): 591 """ 592 Writes out a python function for each WDL "task" object. 593 594 Each python function is a unit of work written out as a string in 595 preparation to being written out to a file. In WDL, each "job" is 596 called a "task". Each WDL task is written out in multiple steps: 597 598 1: Header and inputs (e.g. 'def mapping(self, input1, input2)') 599 2: Log job name (e.g. 'job.fileStore.logToMaster('initialize_jobs')') 600 3: Create temp dir (e.g. 'tempDir = fileStore.getLocalTempDir()') 601 4: import filenames and use readGlobalFile() to get files from the 602 jobStore 603 5: Reformat commandline variables (like converting to ' '.join(files)). 604 6: Commandline call using subprocess.Popen(). 605 7: Write the section returning the outputs. Also logs stats. 606 607 :return: a giant string containing the meat of the job defs for the toil script. 608 """ 609 610 # write the function header 611 fn_section = self.write_function_header(job) 612 613 # write out commandline keywords 614 fn_section += self.write_function_cmdline(job) 615 616 if self.needsdocker(job): 617 # write a bash script to inject into the docker 618 fn_section += self.write_function_bashscriptline(job) 619 # write a call to the docker API 620 fn_section += self.write_function_dockercall(job) 621 else: 622 # write a subprocess call 623 fn_section += self.write_function_subprocesspopen() 624 625 # write the outputs for the definition to return 626 fn_section += self.write_function_outputreturn(job, docker=self.needsdocker(job)) 627 628 return fn_section 629 630 def write_function_header(self, job): 631 """ 632 Writes the header that starts each function, for example, this function 633 can write and return: 634 635 'def write_function_header(self, job, job_declaration_array):' 636 637 :param job: A list such that: 638 (job priority #, job ID #, Job Skeleton Name, Job Alias) 639 :param job_declaration_array: A list of all inputs that job requires. 640 :return: A string representing this. 641 """ 642 fn_section = '\n\nclass {jobname}Cls(Job):\n'.format(jobname=job) 643 fn_section += ' def __init__(self, ' 644 if 'inputs' in self.tasks_dictionary[job]: 645 for i in self.tasks_dictionary[job]['inputs']: 646 var = i[0] 647 vartype = i[1] 648 if vartype == 'String': 649 fn_section += '{input}="", '.format(input=var) 650 else: 651 fn_section += '{input}=None, '.format(input=var) 652 fn_section += '*args, **kwargs):\n' 653 fn_section += ' super({jobname}Cls, self).__init__(*args, **kwargs)\n'.format(jobname=job) 654 655 # TODO: Resolve inherent problems resolving resource requirements 656 # In WDL, "local-disk " + 500 + " HDD" cannot be directly converted to python. 657 # This needs a special handler. 658 if 'runtime' in self.tasks_dictionary[job]: 659 runtime_resources = [] 660 if 'memory' in self.tasks_dictionary[job]['runtime']: 661 runtime_resources.append('memory=memory') 662 memory = self.tasks_dictionary[job]['runtime']['memory'] 663 fn_section += ' memory=parse_memory({})\n'.format(memory) 664 if 'cpu' in self.tasks_dictionary[job]['runtime']: 665 runtime_resources.append('cores=cores') 666 cores = self.tasks_dictionary[job]['runtime']['cpu'] 667 fn_section += ' cores=parse_cores({})\n'.format(cores) 668 if 'disks' in self.tasks_dictionary[job]['runtime']: 669 runtime_resources.append('disk=disk') 670 disk = self.tasks_dictionary[job]['runtime']['disks'] 671 fn_section += ' disk=parse_disk({})\n'.format(disk) 672 runtime_resources = ['self'] + runtime_resources 673 fn_section += ' Job.__init__({})\n\n'.format(', '.join(runtime_resources)) 674 675 if 'inputs' in self.tasks_dictionary[job]: 676 for i in self.tasks_dictionary[job]['inputs']: 677 var = i[0] 678 var_type = i[1] 679 var_expressn = i[2] 680 json_expressn = self.json_var(task=job, var=var) 681 682 # json declarations have priority and can overwrite 683 # whatever is in the wdl file 684 if json_expressn is not None: 685 var_expressn = json_expressn 686 687 if var_expressn is None: 688 # declarations from workflow 689 fn_section += ' self.id_{} = {}\n'.format(var, var) 690 else: 691 # declarations from a WDL or JSON file 692 fn_section += ' self.id_{} = {}.create(\n {})\n'\ 693 .format(var, self.write_declaration_type(var_type), var_expressn) 694 695 fn_section += heredoc_wdl(''' 696 697 def run(self, fileStore): 698 fileStore.logToMaster("{jobname}") 699 tempDir = fileStore.getLocalTempDir() 700 701 _toil_wdl_internal__stdout_file = os.path.join(tempDir, 'stdout') 702 _toil_wdl_internal__stderr_file = os.path.join(tempDir, 'stderr') 703 704 try: 705 os.makedirs(os.path.join(tempDir, 'execution')) 706 except OSError as e: 707 if e.errno != errno.EEXIST: 708 raise 709 ''', {'jobname': job}, indent=' ')[1:] 710 if 'inputs' in self.tasks_dictionary[job]: 711 for i in self.tasks_dictionary[job]['inputs']: 712 var = i[0] 713 var_type = i[1] 714 715 docker_bool = str(self.needsdocker(job)) 716 717 if self.needs_file_import(var_type): 718 args = ', '.join( 719 [ 720 f'abspath_file(self.id_{var}, _toil_wdl_internal__current_working_dir)', 721 'tempDir', 722 'fileStore', 723 f'docker={docker_bool}' 724 ]) 725 fn_section += ' {} = process_and_read_file({})\n'.format(var, args) 726 else: 727 fn_section += ' {} = self.id_{}\n'.format(var, var) 728 729 return fn_section 730 731 def json_var(self, var, task=None, wf=None): 732 """ 733 734 :param var: 735 :param task: 736 :param wf: 737 :return: 738 """ 739 # default to the last workflow in the list 740 if wf is None: 741 for workflow in self.workflows_dictionary: 742 wf = workflow 743 744 for identifier in self.json_dict: 745 # check task declarations 746 if task: 747 if identifier == '{}.{}.{}'.format(wf, task, var): 748 return self.json_dict[identifier] 749 # else check workflow declarations 750 else: 751 if identifier == '{}.{}'.format(wf, var): 752 return self.json_dict[identifier] 753 754 return None 755 756 def needs_file_import(self, var_type: WDLType) -> bool: 757 """ 758 Check if the given type contains a File type. A return value of True 759 means that the value with this type has files to import. 760 """ 761 if isinstance(var_type, WDLFileType): 762 return True 763 764 if isinstance(var_type, WDLCompoundType): 765 if isinstance(var_type, WDLArrayType): 766 return self.needs_file_import(var_type.element) 767 elif isinstance(var_type, WDLPairType): 768 return self.needs_file_import(var_type.left) or self.needs_file_import(var_type.right) 769 elif isinstance(var_type, WDLMapType): 770 return self.needs_file_import(var_type.key) or self.needs_file_import(var_type.value) 771 else: 772 raise NotImplementedError 773 return False 774 775 def write_declaration_type(self, var_type: WDLType): 776 """ 777 Return a string that preserves the construction of the given WDL type 778 so it can be passed into the compiled script. 779 """ 780 section = var_type.__class__.__name__ + '(' # e.g.: 'WDLIntType(' 781 782 if isinstance(var_type, WDLCompoundType): 783 if isinstance(var_type, WDLArrayType): 784 section += self.write_declaration_type(var_type.element) 785 elif isinstance(var_type, WDLPairType): 786 section += self.write_declaration_type(var_type.left) + ', ' 787 section += self.write_declaration_type(var_type.right) 788 elif isinstance(var_type, WDLMapType): 789 section += self.write_declaration_type(var_type.key) + ', ' 790 section += self.write_declaration_type(var_type.value) 791 else: 792 raise ValueError(var_type) 793 794 if var_type.optional: 795 if isinstance(var_type, WDLCompoundType): 796 section += ', ' 797 section += 'optional=True' 798 return section + ')' 799 800 def write_function_bashscriptline(self, job): 801 """ 802 Writes a function to create a bashscript for injection into the docker 803 container. 804 805 :param job_task_reference: The job referenced in WDL's Task section. 806 :param job_alias: The actual job name to be written. 807 :return: A string writing all of this. 808 """ 809 fn_section = " generate_docker_bashscript_file(temp_dir=tempDir, docker_dir=tempDir, globs=[" 810 # TODO: Add glob 811 # if 'outputs' in self.tasks_dictionary[job]: 812 # for output in self.tasks_dictionary[job]['outputs']: 813 # fn_section += '({}), '.format(output[2]) 814 if fn_section.endswith(', '): 815 fn_section = fn_section[:-2] 816 fn_section += "], cmd=cmd, job_name='{}')\n\n".format(str(job)) 817 818 return fn_section 819 820 def write_function_dockercall(self, job): 821 """ 822 Writes a string containing the apiDockerCall() that will run the job. 823 824 :param job_task_reference: The name of the job calling docker. 825 :param docker_image: The corresponding name of the docker image. 826 e.g. "ubuntu:latest" 827 :return: A string containing the apiDockerCall() that will run the job. 828 """ 829 docker_dict = {"docker_image": self.tasks_dictionary[job]['runtime']['docker'], 830 "job_task_reference": job, 831 "docker_user": str(self.docker_user)} 832 docker_template = heredoc_wdl(''' 833 # apiDockerCall() with demux=True returns a tuple of bytes objects (stdout, stderr). 834 _toil_wdl_internal__stdout, _toil_wdl_internal__stderr = \\ 835 apiDockerCall(self, 836 image={docker_image}, 837 working_dir=tempDir, 838 parameters=[os.path.join(tempDir, "{job_task_reference}_script.sh")], 839 entrypoint="/bin/bash", 840 user={docker_user}, 841 stderr=True, 842 demux=True, 843 volumes={{tempDir: {{"bind": tempDir}}}}) 844 with open(os.path.join(_toil_wdl_internal__current_working_dir, '{job_task_reference}.log'), 'wb') as f: 845 if _toil_wdl_internal__stdout: 846 f.write(_toil_wdl_internal__stdout) 847 if _toil_wdl_internal__stderr: 848 f.write(_toil_wdl_internal__stderr) 849 ''', docker_dict, indent=' ')[1:] 850 851 return docker_template 852 853 def write_function_cmdline(self, job): 854 """ 855 Write a series of commandline variables to be concatenated together 856 eventually and either called with subprocess.Popen() or with 857 apiDockerCall() if a docker image is called for. 858 859 :param job: A list such that: 860 (job priority #, job ID #, Job Skeleton Name, Job Alias) 861 :return: A string representing this. 862 """ 863 864 fn_section = '\n' 865 cmd_array = [] 866 if 'raw_commandline' in self.tasks_dictionary[job]: 867 for cmd in self.tasks_dictionary[job]['raw_commandline']: 868 if not cmd.startswith("r'''"): 869 cmd = 'str({i} if not isinstance({i}, WDLFile) else process_and_read_file({i}, tempDir, fileStore)).strip("{nl}")'.format(i=cmd, nl=r"\n") 870 fn_section = fn_section + heredoc_wdl(''' 871 try: 872 # Intended to deal with "optional" inputs that may not exist 873 # TODO: handle this better 874 command{num} = {cmd} 875 except: 876 command{num} = ''\n''', {'cmd': cmd, 'num': self.cmd_num}, indent=' ') 877 cmd_array.append('command' + str(self.cmd_num)) 878 self.cmd_num = self.cmd_num + 1 879 880 if cmd_array: 881 fn_section += '\n cmd = ' 882 for command in cmd_array: 883 fn_section += '{command} + '.format(command=command) 884 if fn_section.endswith(' + '): 885 fn_section = fn_section[:-3] 886 fn_section += '\n cmd = textwrap.dedent(cmd.strip("{nl}"))\n'.format(nl=r"\n") 887 else: 888 # empty command section 889 fn_section += ' cmd = ""' 890 891 return fn_section 892 893 def write_function_subprocesspopen(self): 894 """ 895 Write a subprocess.Popen() call for this function and write it out as a 896 string. 897 898 :param job: A list such that: 899 (job priority #, job ID #, Job Skeleton Name, Job Alias) 900 :return: A string representing this. 901 """ 902 fn_section = heredoc_wdl(''' 903 this_process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 904 _toil_wdl_internal__stdout, _toil_wdl_internal__stderr = this_process.communicate()\n''', indent=' ') 905 906 return fn_section 907 908 def write_function_outputreturn(self, job, docker=False): 909 """ 910 Find the output values that this function needs and write them out as a 911 string. 912 913 :param job: A list such that: 914 (job priority #, job ID #, Job Skeleton Name, Job Alias) 915 :param job_task_reference: The name of the job to look up values for. 916 :return: A string representing this. 917 """ 918 919 fn_section = '' 920 921 fn_section += heredoc_wdl(''' 922 _toil_wdl_internal__stdout_file = generate_stdout_file(_toil_wdl_internal__stdout, 923 tempDir, 924 fileStore=fileStore) 925 _toil_wdl_internal__stderr_file = generate_stdout_file(_toil_wdl_internal__stderr, 926 tempDir, 927 fileStore=fileStore, 928 stderr=True) 929 ''', indent=' ')[1:] 930 931 if 'outputs' in self.tasks_dictionary[job]: 932 return_values = [] 933 for output in self.tasks_dictionary[job]['outputs']: 934 output_name = output[0] 935 output_type = output[1] 936 output_value = output[2] 937 938 if self.needs_file_import(output_type): 939 nonglob_dict = { 940 "output_name": output_name, 941 "output_type": self.write_declaration_type(output_type), 942 "expression": output_value, 943 "out_dir": self.output_directory} 944 945 nonglob_template = heredoc_wdl(''' 946 {output_name} = {output_type}.create( 947 {expression}, output=True) 948 {output_name} = process_outfile({output_name}, fileStore, tempDir, '{out_dir}') 949 ''', nonglob_dict, indent=' ')[1:] 950 fn_section += nonglob_template 951 return_values.append(output_name) 952 else: 953 fn_section += ' {} = {}\n'.format(output_name, output_value) 954 return_values.append(output_name) 955 956 if return_values: 957 fn_section += ' rvDict = {' 958 for return_value in return_values: 959 fn_section += '"{rv}": {rv}, '.format(rv=return_value) 960 if fn_section.endswith(', '): 961 fn_section = fn_section[:-2] 962 if return_values: 963 fn_section = fn_section + '}\n' 964 965 if return_values: 966 fn_section += ' return rvDict\n\n' 967 968 return fn_section 969 970 def indent(self, string2indent: str) -> str: 971 """ 972 Indent the input string by 4 spaces. 973 """ 974 split_string = string2indent.split('\n') 975 return '\n'.join(f' {line}' for line in split_string) 976 977 def needsdocker(self, job): 978 """ 979 980 :param job: 981 :return: 982 """ 983 if 'runtime' in self.tasks_dictionary[job]: 984 if 'docker' in self.tasks_dictionary[job]['runtime']: 985 return True 986 987 return False 988 989 def write_python_file(self, 990 module_section, 991 fn_section, 992 main_section, 993 output_file): 994 """ 995 Just takes three strings and writes them to output_file. 996 997 :param module_section: A string of 'import modules'. 998 :param fn_section: A string of python 'def functions()'. 999 :param main_section: A string declaring toil options and main's header. 1000 :param job_section: A string import files into toil and declaring jobs. 1001 :param output_file: The file to write the compiled toil script to. 1002 """ 1003 with open(output_file, 'w') as file: 1004 file.write(module_section) 1005 file.write(fn_section) 1006 file.write(main_section) 1007