# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from typing import Optional

from toil.wdl.wdl_functions import heredoc_wdl
from toil.wdl.wdl_types import (WDLArrayType,
                                WDLCompoundType,
                                WDLFileType,
                                WDLMapType,
                                WDLPairType,
                                WDLType)

logger = logging.getLogger(__name__)


class SynthesizeWDL:
    """
    SynthesizeWDL takes the "workflows_dictionary" and "tasks_dictionary" produced by
    wdl_analysis.py and uses them to write a native python script for use with Toil.

    A WDL "workflow" section roughly corresponds to the python "main()" function, where
    functions are wrapped as Toil "jobs", output dependencies specified, and called.

    A WDL "task" section corresponds to a unique python function, which will be wrapped
    as a Toil "job" and defined outside of the "main()" function that calls it.

    Generally this handles breaking sections into their corresponding Toil counterparts.

    For example: write the imports, then write all functions defining jobs (which have subsections
    like: write header, define variables, read "File" types into the jobstore, docker call, etc.),
    then write the main and all of its subsections.
    """
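
    # Illustrative driver (a sketch only; the actual compiler entry point lives
    # elsewhere in toil.wdl and may differ):
    #
    #   synth = SynthesizeWDL(version, tasks, workflows, out_dir, json_dict, docker_user)
    #   module_section = synth.write_modules()
    #   fn_section = synth.write_functions()
    #   main_section = synth.write_main()
    #   synth.write_python_file(module_section, fn_section, main_section, synth.output_file)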

    def __init__(self,
                 version: str,
                 tasks_dictionary: dict,
                 workflows_dictionary: dict,
                 output_directory: str,
                 json_dict: dict,
                 docker_user: str,
                 jobstore: Optional[str] = None,
                 destBucket: Optional[str] = None):

        self.version = version
        self.output_directory = output_directory
        if not os.path.exists(self.output_directory):
            try:
                os.makedirs(self.output_directory)
            except OSError as e:
                raise OSError(
                    f'Could not create output directory {self.output_directory}. '
                    'Check permissions and available disk space.') from e

        self.output_file = os.path.join(self.output_directory, 'toilwdl_compiled.py')

        self.jobstore = jobstore if jobstore else './toilWorkflowRun'

        if docker_user != 'None':
            self.docker_user = "'" + docker_user + "'"
        else:
            self.docker_user = docker_user

        # only json is required; tsv/csv are optional
        self.json_dict = json_dict

        # holds task skeletons from WDL task objects
        self.tasks_dictionary = tasks_dictionary
        # holds workflow structure from WDL workflow objects
        self.workflows_dictionary = workflows_dictionary

        # keep track of which workflow is being written
        self.current_workflow = None

        # unique counter appended to generated command variable names
        self.cmd_num = 0

        # deposit WDL outputs into a cloud bucket; optional
        self.destBucket = destBucket

    def write_modules(self):
        # string used to write imports to the file
        module_string = heredoc_wdl('''
                    from toil.job import Job
                    from toil.common import Toil
                    from toil.lib.docker import apiDockerCall
                    from toil.wdl.wdl_types import WDLType
                    from toil.wdl.wdl_types import WDLStringType
                    from toil.wdl.wdl_types import WDLIntType
                    from toil.wdl.wdl_types import WDLFloatType
                    from toil.wdl.wdl_types import WDLBooleanType
                    from toil.wdl.wdl_types import WDLFileType
                    from toil.wdl.wdl_types import WDLArrayType
                    from toil.wdl.wdl_types import WDLPairType
                    from toil.wdl.wdl_types import WDLMapType
                    from toil.wdl.wdl_types import WDLFile
                    from toil.wdl.wdl_types import WDLPair
                    from toil.wdl.wdl_functions import generate_docker_bashscript_file
                    from toil.wdl.wdl_functions import generate_stdout_file
                    from toil.wdl.wdl_functions import select_first
                    from toil.wdl.wdl_functions import sub
                    from toil.wdl.wdl_functions import size
                    from toil.wdl.wdl_functions import glob
                    from toil.wdl.wdl_functions import process_and_read_file
                    from toil.wdl.wdl_functions import process_infile
                    from toil.wdl.wdl_functions import process_outfile
                    from toil.wdl.wdl_functions import abspath_file
                    from toil.wdl.wdl_functions import combine_dicts
                    from toil.wdl.wdl_functions import parse_memory
                    from toil.wdl.wdl_functions import parse_cores
                    from toil.wdl.wdl_functions import parse_disk
                    from toil.wdl.wdl_functions import read_lines
                    from toil.wdl.wdl_functions import read_tsv
                    from toil.wdl.wdl_functions import read_csv
                    from toil.wdl.wdl_functions import read_json
                    from toil.wdl.wdl_functions import read_map
                    from toil.wdl.wdl_functions import read_int
                    from toil.wdl.wdl_functions import read_string
                    from toil.wdl.wdl_functions import read_float
                    from toil.wdl.wdl_functions import read_boolean
                    from toil.wdl.wdl_functions import write_lines
                    from toil.wdl.wdl_functions import write_tsv
                    from toil.wdl.wdl_functions import write_json
                    from toil.wdl.wdl_functions import write_map
                    from toil.wdl.wdl_functions import defined
                    from toil.wdl.wdl_functions import basename
                    from toil.wdl.wdl_functions import floor
                    from toil.wdl.wdl_functions import ceil
                    from toil.wdl.wdl_functions import wdl_range
                    from toil.wdl.wdl_functions import transpose
                    from toil.wdl.wdl_functions import length
                    from toil.wdl.wdl_functions import wdl_zip
                    from toil.wdl.wdl_functions import cross
                    from toil.wdl.wdl_functions import as_pairs
                    from toil.wdl.wdl_functions import as_map
                    from toil.wdl.wdl_functions import keys
                    from toil.wdl.wdl_functions import collect_by_key
                    from toil.wdl.wdl_functions import flatten
                    import fnmatch
                    import textwrap
                    import subprocess
                    import os
                    import errno
                    import time
                    import shutil
                    import shlex
                    import uuid
                    import logging

                    _toil_wdl_internal__current_working_dir = os.getcwd()

                    logger = logging.getLogger(__name__)


                        ''', {'jobstore': self.jobstore})[1:]
        return module_string

    def write_main(self):
        """
        Writes out a string representing the main section of the compiled toil
        script.

        Currently writes three sections:
        1. The main header (toil options and the 'with Toil(options) as fileStore:' block).
        2. Toil job wrappers for each workflow declaration, call, scatter, and if
           statement, wired together with addChild()/encapsulate() and started
           with fileStore.start(job0).
        3. An optional loop exporting all outputs to a cloud bucket (destBucket).

        This creates the variable declarations necessary for the function calls,
        maps file paths appropriately and stores them in the toil fileStore so
        that they persist from job to job, creates the job wrappers for toil,
        and finally writes out and runs the jobs in order of priority using the
        addChild and encapsulate mechanisms provided by toil.

        :return: a giant string containing the main section of the toil script.
        """

        main_section = ''

        # write out the main header
        main_header = self.write_main_header()
        main_section = main_section + main_header

        # write toil job wrappers with input vars
        jobs_to_write = self.write_main_jobwrappers()
        main_section = main_section + jobs_to_write

        # loop to export all outputs to a cloud bucket
        if self.destBucket:
            main_destbucket = self.write_main_destbucket()
            main_section = main_section + main_destbucket

        return main_section

    def write_main_header(self):
        main_header = heredoc_wdl('''
            if __name__=="__main__":
                options = Job.Runner.getDefaultOptions("{jobstore}")
                options.clean = 'always'
                with Toil(options) as fileStore:
            ''', {'jobstore': self.jobstore})
        return main_header

    def write_main_jobwrappers(self):
        """
        Writes out 'jobs' as wrapped toil objects in preparation for calling.

        :return: A string representing this.
        """
        main_section = ''

        # toil cannot technically start with multiple jobs, so an empty
        # 'initialize_jobs' function is always called first to get around this
        main_section = main_section + '        job0 = Job.wrapJobFn(initialize_jobs)\n'

        # declare each job in main as a wrapped toil function in order of priority
        for wf in self.workflows_dictionary:
            self.current_workflow = wf
            for assignment in self.workflows_dictionary[wf]:
                if assignment.startswith('declaration'):
                    main_section += self.write_main_jobwrappers_declaration(self.workflows_dictionary[wf][assignment])
                if assignment.startswith('call'):
                    main_section += '        job0 = job0.encapsulate()\n'
                    main_section += self.write_main_jobwrappers_call(self.workflows_dictionary[wf][assignment])
                if assignment.startswith('scatter'):
                    main_section += '        job0 = job0.encapsulate()\n'
                    main_section += self.write_main_jobwrappers_scatter(self.workflows_dictionary[wf][assignment],
                                                                        assignment)
                if assignment.startswith('if'):
                    main_section += '        if {}:\n'.format(self.workflows_dictionary[wf][assignment]['expression'])
                    main_section += self.write_main_jobwrappers_if(self.workflows_dictionary[wf][assignment]['body'])

        main_section += '\n        fileStore.start(job0)\n'

        return main_section
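
    # For reference, the generated section has roughly this shape (illustrative only,
    # using a hypothetical call "align" with inputs "ref"/"reads" and an output "sam"):
    #
    #         job0 = Job.wrapJobFn(initialize_jobs)
    #         job0 = job0.encapsulate()
    #         align = job0.addChild(alignCls(ref=ref, reads=reads))
    #         align_sam = align.rv("sam")
    #
    #         fileStore.start(job0)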

    def write_main_jobwrappers_declaration(self, declaration):

        main_section = ''
        var_name, var_type, var_expr = declaration

        # check the json file for the expression's value
        # this is a higher priority and overrides anything written in the .wdl
        json_expressn = self.json_var(wf=self.current_workflow, var=var_name)
        if json_expressn is not None:
            var_expr = json_expressn

        main_section += '        {} = {}.create(\n                {})\n' \
            .format(var_name, self.write_declaration_type(var_type), var_expr)

        # import filepath into jobstore
        if self.needs_file_import(var_type) and var_expr:
            main_section += '        {} = process_infile({}, fileStore)\n'.format(var_name, var_name)

        return main_section
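
    # A generated declaration looks roughly like this (illustrative, hypothetical
    # File declaration named "ref"):
    #
    #         ref = WDLFileType().create(
    #                 "ref.fa")
    #         ref = process_infile(ref, fileStore)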

    def write_main_destbucket(self):
        """
        Writes out a loop for exporting outputs to a cloud bucket.

        :return: A string representing this.
        """
        main_section = heredoc_wdl('''
            outdir = '{outdir}'
            onlyfiles = [os.path.join(outdir, f) for f in os.listdir(outdir) if os.path.isfile(os.path.join(outdir, f))]
            for output_f_path in onlyfiles:
                output_file = fileStore.writeGlobalFile(output_f_path)
                preserveThisFilename = os.path.basename(output_f_path)
                destUrl = '/'.join(s.strip('/') for s in [destBucket, preserveThisFilename])
                fileStore.exportFile(output_file, destUrl)
            ''', {'outdir': self.output_directory}, indent='    ')
        return main_section

    def fetch_ignoredifs(self, assignments, breaking_assignment):
        ignore_ifs = []
        for assignment in assignments:
            if assignment.startswith('call'):
                pass
            elif assignment.startswith('scatter'):
                pass
            elif assignment.startswith('if'):
                if not self.fetch_ignoredifs_chain(assignments[assignment]['body'], breaking_assignment):
                    ignore_ifs.append(assignment)
        return ignore_ifs

    def fetch_ignoredifs_chain(self, assignments, breaking_assignment):
        for assignment in assignments:
            if assignment.startswith('call'):
                if assignment == breaking_assignment:
                    return True
            if assignment.startswith('scatter'):
                if assignment == breaking_assignment:
                    return True
            if assignment.startswith('if'):
                return self.fetch_ignoredifs_chain(assignments[assignment]['body'], breaking_assignment)
        return False

    def write_main_jobwrappers_if(self, if_statement):
        # check for empty if statement
        if not if_statement:
            return self.indent('        pass')

        main_section = ''
        for assignment in if_statement:
            if assignment.startswith('declaration'):
                main_section += self.write_main_jobwrappers_declaration(if_statement[assignment])
            if assignment.startswith('call'):
                main_section += '        job0 = job0.encapsulate()\n'
                main_section += self.write_main_jobwrappers_call(if_statement[assignment])
            if assignment.startswith('scatter'):
                main_section += '        job0 = job0.encapsulate()\n'
                main_section += self.write_main_jobwrappers_scatter(if_statement[assignment], assignment)
            if assignment.startswith('if'):
                main_section += '        if {}:\n'.format(if_statement[assignment]['expression'])
                main_section += self.write_main_jobwrappers_if(if_statement[assignment]['body'])
        main_section = self.indent(main_section)
        return main_section

    def write_main_jobwrappers_scatter(self, task, assignment):
        scatter_inputs = self.fetch_scatter_inputs(assignment)

        main_section = '        {scatter} = job0.addChild({scatter}Cls('.format(scatter=assignment)
        for var in scatter_inputs:
            main_section += var + '=' + var + ', '
        if main_section.endswith(', '):
            main_section = main_section[:-2]
        main_section += '))\n'

        scatter_outputs = self.fetch_scatter_outputs(task)
        for var in scatter_outputs:
            main_section += '        {var} = {scatter}.rv("{var}")\n'.format(var=var['task'] + '_' + var['output'], scatter=assignment)

        return main_section
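
    # A generated scatter wrapper looks roughly like this (illustrative, assuming a
    # hypothetical scatter assignment named "scatter0" whose body calls "align"):
    #
    #         scatter0 = job0.addChild(scatter0Cls(samples=samples))
    #         align_sam = scatter0.rv("align_sam")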

    def fetch_scatter_outputs(self, task):
        scatteroutputs = []

        for var in task['body']:
            # TODO variable support
            if var.startswith('call'):
                if 'outputs' in self.tasks_dictionary[task['body'][var]['task']]:
                    for output in self.tasks_dictionary[task['body'][var]['task']]['outputs']:
                        scatteroutputs.append({'task': task['body'][var]['alias'], 'output': output[0]})
        return scatteroutputs

    def fetch_scatter_inputs(self, assigned):

        for wf in self.workflows_dictionary:
            ignored_ifs = self.fetch_ignoredifs(self.workflows_dictionary[wf], assigned)
            # TODO support additional wfs
            break

        scatternamespace = []

        for wf in self.workflows_dictionary:
            for assignment in self.workflows_dictionary[wf]:
                if assignment == assigned:
                    return scatternamespace
                elif assignment.startswith('declaration'):
                    name, _, _ = self.workflows_dictionary[wf][assignment]
                    scatternamespace.append(name)
                elif assignment.startswith('call'):
                    if 'outputs' in self.tasks_dictionary[self.workflows_dictionary[wf][assignment]['task']]:
                        for output in self.tasks_dictionary[self.workflows_dictionary[wf][assignment]['task']]['outputs']:
                            scatternamespace.append(self.workflows_dictionary[wf][assignment]['alias'] + '_' + output[0])
                elif assignment.startswith('scatter'):
                    for var in self.fetch_scatter_outputs(self.workflows_dictionary[wf][assignment]):
                        scatternamespace.append(var['task'] + '_' + var['output'])
                elif assignment.startswith('if') and assignment not in ignored_ifs:
                    new_list, cont_or_break = self.fetch_scatter_inputs_chain(
                        self.workflows_dictionary[wf][assignment]['body'],
                        assigned,
                        ignored_ifs,
                        inputs_list=[])
                    scatternamespace += new_list
                    if not cont_or_break:
                        return scatternamespace
        return scatternamespace

    def fetch_scatter_inputs_chain(self, inputs, assigned, ignored_ifs, inputs_list):
        for i in inputs:
            if i == assigned:
                return inputs_list, False
            elif i.startswith('call'):
                if 'outputs' in self.tasks_dictionary[inputs[i]['task']]:
                    for output in self.tasks_dictionary[inputs[i]['task']]['outputs']:
                        inputs_list.append(inputs[i]['alias'] + '_' + output[0])
            elif i.startswith('scatter'):
                for var in self.fetch_scatter_outputs(inputs[i]):
                    inputs_list.append(var['task'] + '_' + var['output'])
            elif i.startswith('if') and i not in ignored_ifs:
                inputs_list, cont_or_break = self.fetch_scatter_inputs_chain(inputs[i]['body'], assigned, ignored_ifs, inputs_list)
                if not cont_or_break:
                    return inputs_list, False
        return inputs_list, True

    def write_main_jobwrappers_call(self, task):
        main_section = '        {} = job0.addChild({}Cls('.format(task['alias'], task['task'])
        for var in task['io']:
            main_section += var + '=' + task['io'][var] + ', '
        if main_section.endswith(', '):
            main_section = main_section[:-2]
        main_section += '))\n'

        call_outputs = self.fetch_call_outputs(task)
        for var in call_outputs:
            main_section += '        {var} = {task}.rv("{output}")\n'.format(var=var['task'] + '_' + var['output'],
                                                                             task=var['task'],
                                                                             output=var['output'])
        return main_section

    def fetch_call_outputs(self, task):
        calloutputs = []
        if 'outputs' in self.tasks_dictionary[task['task']]:
            for output in self.tasks_dictionary[task['task']]['outputs']:
                calloutputs.append({'task': task['alias'], 'output': output[0]})
        return calloutputs

    def write_functions(self):
        """
        Writes out a python function for each WDL "task" object.

        :return: a giant string containing the meat of the job defs.
        """

        # toil cannot technically start with multiple jobs, so an empty
        # 'initialize_jobs' function is always called first to get around this
        fn_section = 'def initialize_jobs(job):\n' + \
                     '    job.fileStore.logToMaster("initialize_jobs")\n'

        for job in self.tasks_dictionary:
            fn_section += self.write_function(job)

        for wf in self.workflows_dictionary:
            for assignment in self.workflows_dictionary[wf]:
                if assignment.startswith('scatter'):
                    fn_section += self.write_scatterfunction(self.workflows_dictionary[wf][assignment], assignment)
                if assignment.startswith('if'):
                    fn_section += self.write_scatterfunctions_within_if(self.workflows_dictionary[wf][assignment]['body'])

        return fn_section

    def write_scatterfunctions_within_if(self, ifstatement):
        fn_section = ''
        for assignment in ifstatement:
            if assignment.startswith('scatter'):
                fn_section += self.write_scatterfunction(ifstatement[assignment], assignment)
            if assignment.startswith('if'):
                fn_section += self.write_scatterfunctions_within_if(ifstatement[assignment]['body'])
        return fn_section

    def write_scatterfunction(self, job, scattername):
        """
        Writes out a python function for each WDL "scatter" object.
        """

        scatter_outputs = self.fetch_scatter_outputs(job)

        # write the function header
        fn_section = self.write_scatterfunction_header(scattername)

        # write the scatter definitions
        fn_section += self.write_scatterfunction_lists(scatter_outputs)

        # write the scatter loop itself
        fn_section += self.write_scatterfunction_loop(job, scatter_outputs)

        # write the outputs for the task to return
        fn_section += self.write_scatterfunction_outputreturn(scatter_outputs)

        return fn_section

    def write_scatterfunction_header(self, scattername):
        """
        Writes the class definition, __init__(), and the start of run() for a
        scatter's job class.

        :return: A string representing this.
        """
        scatter_inputs = self.fetch_scatter_inputs(scattername)

        fn_section = '\n\nclass {jobname}Cls(Job):\n'.format(jobname=scattername)
        fn_section += '    def __init__(self, '
        for input in scatter_inputs:
            fn_section += '{input}=None, '.format(input=input)
        fn_section += '*args, **kwargs):\n'
        fn_section += '        Job.__init__(self)\n\n'

        for input in scatter_inputs:
            fn_section += '        self.id_{input} = {input}\n'.format(input=input)

        fn_section += heredoc_wdl('''

                             def run(self, fileStore):
                                 fileStore.logToMaster("{jobname}")
                                 tempDir = fileStore.getLocalTempDir()

                                 try:
                                     os.makedirs(os.path.join(tempDir, 'execution'))
                                 except OSError as e:
                                     if e.errno != errno.EEXIST:
                                         raise
                                 ''', {'jobname': scattername}, indent='    ')[1:]
        for input in scatter_inputs:
            fn_section += '        {input} = self.id_{input}\n'.format(input=input)
        return fn_section

    def write_scatterfunction_outputreturn(self, scatter_outputs):
        """
        Writes the rvDict return section of a scatter's run() method.

        :return: A string representing this.
        """
        fn_section = '\n        rvDict = {'
        for var in scatter_outputs:
            fn_section += '"{var}": {var}, '.format(var=var['task'] + '_' + var['output'])
        if fn_section.endswith(', '):
            fn_section = fn_section[:-2]
        fn_section += '}\n'
        fn_section += '        return rvDict\n\n'

        return fn_section[:-1]

    def write_scatterfunction_lists(self, scatter_outputs):
        """
        Writes the empty lists that collect each output across scatter
        iterations.

        :return: A string representing this.
        """
        fn_section = '\n'
        for var in scatter_outputs:
            fn_section += '        {var} = []\n'.format(var=var['task'] + '_' + var['output'])

        return fn_section

    def write_scatterfunction_loop(self, job, scatter_outputs):
        """
        Writes the for-loop body of a scatter's run() method, wrapping each
        statement in the scatter body and appending each iteration's outputs
        to the collection lists.

        :return: A string representing this.
        """
        collection = job['collection']
        item = job['item']

        fn_section = '        for {item} in {collection}:\n'.format(item=item, collection=collection)

        previous_dependency = 'self'
        for statement in job['body']:
            if statement.startswith('declaration'):
                # reusing write_main_jobwrappers_declaration() here, but it needs to be indented one more level.
                fn_section += self.indent(
                    self.write_main_jobwrappers_declaration(job['body'][statement]))
            elif statement.startswith('call'):
                fn_section += self.write_scatter_callwrapper(job['body'][statement], previous_dependency)
                previous_dependency = 'job_' + job['body'][statement]['alias']
            elif statement.startswith('scatter'):
                raise NotImplementedError('nested scatter not implemented.')
            elif statement.startswith('if'):
                fn_section += '            if {}:\n'.format(job['body'][statement]['expression'])
                # reusing write_main_jobwrappers_if() here, but it needs to be indented one more level.
                fn_section += self.indent(self.write_main_jobwrappers_if(job['body'][statement]['body']))

        # check for empty scatter section
        if len(job['body']) == 0:
            fn_section += '            pass'

        for var in scatter_outputs:
            fn_section += '            {var}.append({task}.rv("{output}"))\n'.format(var=var['task'] + '_' + var['output'],
                                                                                     task='job_' + var['task'],
                                                                                     output=var['output'])
        return fn_section

    def write_scatter_callwrapper(self, job, previous_dependency):
        fn_section = '            job_{alias} = {pd}.addFollowOn({task}Cls('.format(alias=job['alias'],
                                                                                    pd=previous_dependency,
                                                                                    task=job['task'])
        for var in job['io']:
            fn_section += var + '=' + job['io'][var] + ', '
        if fn_section.endswith(', '):
            fn_section = fn_section[:-2]
        fn_section += '))\n'
        return fn_section

    def write_function(self, job):
        """
        Writes out a python function for a single WDL "task" object.

        Each python function is a unit of work written out as a string in
        preparation for being written to a file.  In WDL, each "job" is
        called a "task".  Each WDL task is written out in multiple steps:

        1: Header and inputs (e.g. 'def mapping(self, input1, input2)')
        2: Log job name (e.g. 'job.fileStore.logToMaster('initialize_jobs')')
        3: Create temp dir (e.g. 'tempDir = fileStore.getLocalTempDir()')
        4: import filenames and use readGlobalFile() to get files from the
           jobStore
        5: Reformat commandline variables (like converting to ' '.join(files)).
        6: Commandline call using subprocess.Popen() (or apiDockerCall() if a
           docker image is specified).
        7: Write the section returning the outputs.  Also logs stats.

        :return: a giant string containing the meat of the job defs for the toil script.
        """

        # write the function header
        fn_section = self.write_function_header(job)

        # write out commandline keywords
        fn_section += self.write_function_cmdline(job)

        if self.needsdocker(job):
            # write a bash script to inject into the docker
            fn_section += self.write_function_bashscriptline(job)
            # write a call to the docker API
            fn_section += self.write_function_dockercall(job)
        else:
            # write a subprocess call
            fn_section += self.write_function_subprocesspopen()

        # write the outputs for the definition to return
        fn_section += self.write_function_outputreturn(job, docker=self.needsdocker(job))

        return fn_section

    def write_function_header(self, job):
        """
        Writes the class header, __init__(), and the start of run() for a
        task's job class, for example:

        'class {jobname}Cls(Job):'
        '    def __init__(self, input1=None, input2="", *args, **kwargs):'

        :param job: The name of the WDL task (a key into tasks_dictionary).
        :return: A string representing this.
        """
        fn_section = '\n\nclass {jobname}Cls(Job):\n'.format(jobname=job)
        fn_section += '    def __init__(self, '
        if 'inputs' in self.tasks_dictionary[job]:
            for i in self.tasks_dictionary[job]['inputs']:
                var = i[0]
                vartype = i[1]
                if vartype == 'String':
                    fn_section += '{input}="", '.format(input=var)
                else:
                    fn_section += '{input}=None, '.format(input=var)
        fn_section += '*args, **kwargs):\n'
        fn_section += '        super({jobname}Cls, self).__init__(*args, **kwargs)\n'.format(jobname=job)

        # TODO: Resolve inherent problems resolving resource requirements
        # In WDL, "local-disk " + 500 + " HDD" cannot be directly converted to python.
        # This needs a special handler.
        if 'runtime' in self.tasks_dictionary[job]:
            runtime_resources = []
            if 'memory' in self.tasks_dictionary[job]['runtime']:
                runtime_resources.append('memory=memory')
                memory = self.tasks_dictionary[job]['runtime']['memory']
                fn_section += '        memory=parse_memory({})\n'.format(memory)
            if 'cpu' in self.tasks_dictionary[job]['runtime']:
                runtime_resources.append('cores=cores')
                cores = self.tasks_dictionary[job]['runtime']['cpu']
                fn_section += '        cores=parse_cores({})\n'.format(cores)
            if 'disks' in self.tasks_dictionary[job]['runtime']:
                runtime_resources.append('disk=disk')
                disk = self.tasks_dictionary[job]['runtime']['disks']
                fn_section += '        disk=parse_disk({})\n'.format(disk)
            runtime_resources = ['self'] + runtime_resources
            fn_section += '        Job.__init__({})\n\n'.format(', '.join(runtime_resources))

        if 'inputs' in self.tasks_dictionary[job]:
            for i in self.tasks_dictionary[job]['inputs']:
                var = i[0]
                var_type = i[1]
                var_expressn = i[2]
                json_expressn = self.json_var(task=job, var=var)

                # json declarations have priority and can overwrite
                # whatever is in the wdl file
                if json_expressn is not None:
                    var_expressn = json_expressn

                if var_expressn is None:
                    # declarations from workflow
                    fn_section += '        self.id_{} = {}\n'.format(var, var)
                else:
                    # declarations from a WDL or JSON file
                    fn_section += '        self.id_{} = {}.create(\n                {})\n'\
                        .format(var, self.write_declaration_type(var_type), var_expressn)

        fn_section += heredoc_wdl('''

                             def run(self, fileStore):
                                 fileStore.logToMaster("{jobname}")
                                 tempDir = fileStore.getLocalTempDir()

                                 _toil_wdl_internal__stdout_file = os.path.join(tempDir, 'stdout')
                                 _toil_wdl_internal__stderr_file = os.path.join(tempDir, 'stderr')

                                 try:
                                     os.makedirs(os.path.join(tempDir, 'execution'))
                                 except OSError as e:
                                     if e.errno != errno.EEXIST:
                                         raise
                                 ''', {'jobname': job}, indent='    ')[1:]
        if 'inputs' in self.tasks_dictionary[job]:
            for i in self.tasks_dictionary[job]['inputs']:
                var = i[0]
                var_type = i[1]

                docker_bool = str(self.needsdocker(job))

                if self.needs_file_import(var_type):
                    args = ', '.join(
                        [
                            f'abspath_file(self.id_{var}, _toil_wdl_internal__current_working_dir)',
                            'tempDir',
                            'fileStore',
                            f'docker={docker_bool}'
                        ])
                    fn_section += '        {} = process_and_read_file({})\n'.format(var, args)
                else:
                    fn_section += '        {} = self.id_{}\n'.format(var, var)

        return fn_section
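
    # The generated task class has roughly this shape (illustrative, hypothetical task
    # "align" with a single File input "ref"):
    #
    #   class alignCls(Job):
    #       def __init__(self, ref=None, *args, **kwargs):
    #           super(alignCls, self).__init__(*args, **kwargs)
    #           self.id_ref = ref
    #
    #       def run(self, fileStore):
    #           fileStore.logToMaster("align")
    #           tempDir = fileStore.getLocalTempDir()
    #           ref = process_and_read_file(abspath_file(self.id_ref, ...), tempDir, fileStore, docker=True)
    #           ...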

    def json_var(self, var, task=None, wf=None):
        """
        Looks up the value of a variable in the JSON inputs dictionary.

        :param var: The variable name to look up.
        :param task: The task containing the declaration, if any.
        :param wf: The workflow containing the declaration; defaults to the
                   last workflow in workflows_dictionary.
        :return: The JSON value for the variable, or None if not found.
        """
        # default to the last workflow in the list
        if wf is None:
            for workflow in self.workflows_dictionary:
                wf = workflow

        for identifier in self.json_dict:
            # check task declarations
            if task:
                if identifier == '{}.{}.{}'.format(wf, task, var):
                    return self.json_dict[identifier]
            # else check workflow declarations
            else:
                if identifier == '{}.{}'.format(wf, var):
                    return self.json_dict[identifier]

        return None
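
    # Lookup keys are dotted WDL identifiers (illustrative, hypothetical names): for
    # workflow "wf", task "align", and variable "ref", a task-level value is keyed
    # "wf.align.ref", while a workflow-level declaration is keyed "wf.ref".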

    def needs_file_import(self, var_type: WDLType) -> bool:
        """
        Check if the given type contains a File type. A return value of True
        means that the value with this type has files to import.
        """
        if isinstance(var_type, WDLFileType):
            return True

        if isinstance(var_type, WDLCompoundType):
            if isinstance(var_type, WDLArrayType):
                return self.needs_file_import(var_type.element)
            elif isinstance(var_type, WDLPairType):
                return self.needs_file_import(var_type.left) or self.needs_file_import(var_type.right)
            elif isinstance(var_type, WDLMapType):
                return self.needs_file_import(var_type.key) or self.needs_file_import(var_type.value)
            else:
                raise NotImplementedError
        return False
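
    # Examples (illustrative; constructor signatures assumed from the attribute usage above):
    #   needs_file_import(WDLFileType())                               -> True
    #   needs_file_import(WDLArrayType(WDLFileType()))                 -> True
    #   needs_file_import(WDLPairType(WDLStringType(), WDLIntType()))  -> False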

    def write_declaration_type(self, var_type: WDLType):
        """
        Return a string that preserves the construction of the given WDL type
        so it can be passed into the compiled script.
        """
        section = var_type.__class__.__name__ + '('  # e.g.: 'WDLIntType('

        if isinstance(var_type, WDLCompoundType):
            if isinstance(var_type, WDLArrayType):
                section += self.write_declaration_type(var_type.element)
            elif isinstance(var_type, WDLPairType):
                section += self.write_declaration_type(var_type.left) + ', '
                section += self.write_declaration_type(var_type.right)
            elif isinstance(var_type, WDLMapType):
                section += self.write_declaration_type(var_type.key) + ', '
                section += self.write_declaration_type(var_type.value)
            else:
                raise ValueError(var_type)

        if var_type.optional:
            if isinstance(var_type, WDLCompoundType):
                section += ', '
            section += 'optional=True'
        return section + ')'
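
    # Example (illustrative): an optional Array[File] type is rendered as the string
    # "WDLArrayType(WDLFileType(), optional=True)", which the compiled script can
    # evaluate to rebuild the same type object.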

    def write_function_bashscriptline(self, job):
        """
        Writes a function to create a bashscript for injection into the docker
        container.

        :param job: The name of the WDL task calling docker.
        :return: A string writing all of this.
        """
        fn_section = "        generate_docker_bashscript_file(temp_dir=tempDir, docker_dir=tempDir, globs=["
        # TODO: Add glob
        # if 'outputs' in self.tasks_dictionary[job]:
        #     for output in self.tasks_dictionary[job]['outputs']:
        #         fn_section += '({}), '.format(output[2])
        if fn_section.endswith(', '):
            fn_section = fn_section[:-2]
        fn_section += "], cmd=cmd, job_name='{}')\n\n".format(str(job))

        return fn_section

    def write_function_dockercall(self, job):
        """
        Writes a string containing the apiDockerCall() that will run the job.

        :param job: The name of the WDL task calling docker; its runtime
                    section supplies the docker image (e.g. "ubuntu:latest").
        :return: A string containing the apiDockerCall() that will run the job.
        """
        docker_dict = {"docker_image": self.tasks_dictionary[job]['runtime']['docker'],
                       "job_task_reference": job,
                       "docker_user": str(self.docker_user)}
        docker_template = heredoc_wdl('''
        # apiDockerCall() with demux=True returns a tuple of bytes objects (stdout, stderr).
        _toil_wdl_internal__stdout, _toil_wdl_internal__stderr = \\
            apiDockerCall(self,
                          image={docker_image},
                          working_dir=tempDir,
                          parameters=[os.path.join(tempDir, "{job_task_reference}_script.sh")],
                          entrypoint="/bin/bash",
                          user={docker_user},
                          stderr=True,
                          demux=True,
                          volumes={{tempDir: {{"bind": tempDir}}}})
        with open(os.path.join(_toil_wdl_internal__current_working_dir, '{job_task_reference}.log'), 'wb') as f:
            if _toil_wdl_internal__stdout:
                f.write(_toil_wdl_internal__stdout)
            if _toil_wdl_internal__stderr:
                f.write(_toil_wdl_internal__stderr)
        ''', docker_dict, indent='        ')[1:]

        return docker_template

    def write_function_cmdline(self, job):
        """
        Write a series of commandline variables to be concatenated together
        eventually and either called with subprocess.Popen() or with
        apiDockerCall() if a docker image is called for.

        :param job: The name of the WDL task (a key into tasks_dictionary).
        :return: A string representing this.
        """

        fn_section = '\n'
        cmd_array = []
        if 'raw_commandline' in self.tasks_dictionary[job]:
            for cmd in self.tasks_dictionary[job]['raw_commandline']:
                if not cmd.startswith("r'''"):
                    cmd = 'str({i} if not isinstance({i}, WDLFile) else process_and_read_file({i}, tempDir, fileStore)).strip("{nl}")'.format(i=cmd, nl=r"\n")
                fn_section = fn_section + heredoc_wdl('''
                        try:
                            # Intended to deal with "optional" inputs that may not exist
                            # TODO: handle this better
                            command{num} = {cmd}
                        except:
                            command{num} = ''\n''', {'cmd': cmd, 'num': self.cmd_num}, indent='        ')
                cmd_array.append('command' + str(self.cmd_num))
                self.cmd_num = self.cmd_num + 1

        if cmd_array:
            fn_section += '\n        cmd = '
            for command in cmd_array:
                fn_section += '{command} + '.format(command=command)
            if fn_section.endswith(' + '):
                fn_section = fn_section[:-3]
            fn_section += '\n        cmd = textwrap.dedent(cmd.strip("{nl}"))\n'.format(nl=r"\n")
        else:
            # empty command section
            fn_section += '        cmd = ""'

        return fn_section
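
    # The generated command section has roughly this shape (illustrative, one
    # hypothetical literal chunk plus one interpolated variable):
    #
    #         try:
    #             command0 = r'''samtools index '''
    #         except:
    #             command0 = ''
    #         try:
    #             command1 = str(bam if not isinstance(bam, WDLFile) else process_and_read_file(bam, tempDir, fileStore)).strip("\n")
    #         except:
    #             command1 = ''
    #
    #         cmd = command0 + command1
    #         cmd = textwrap.dedent(cmd.strip("\n"))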

    def write_function_subprocesspopen(self):
        """
        Write a subprocess.Popen() call for this function and write it out as a
        string.

        :return: A string representing this.
        """
        fn_section = heredoc_wdl('''
                this_process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                _toil_wdl_internal__stdout, _toil_wdl_internal__stderr = this_process.communicate()\n''', indent='        ')

        return fn_section

    def write_function_outputreturn(self, job, docker=False):
        """
        Find the output values that this function needs and write them out as a
        string.

        :param job: The name of the WDL task to look up output values for.
        :param docker: Whether the task runs inside a docker container.
        :return: A string representing this.
        """

        fn_section = ''

        fn_section += heredoc_wdl('''
            _toil_wdl_internal__stdout_file = generate_stdout_file(_toil_wdl_internal__stdout,
                                                                   tempDir,
                                                                   fileStore=fileStore)
            _toil_wdl_internal__stderr_file = generate_stdout_file(_toil_wdl_internal__stderr,
                                                                   tempDir,
                                                                   fileStore=fileStore,
                                                                   stderr=True)
        ''', indent='        ')[1:]

        if 'outputs' in self.tasks_dictionary[job]:
            return_values = []
            for output in self.tasks_dictionary[job]['outputs']:
                output_name = output[0]
                output_type = output[1]
                output_value = output[2]

                if self.needs_file_import(output_type):
                    nonglob_dict = {
                        "output_name": output_name,
                        "output_type": self.write_declaration_type(output_type),
                        "expression": output_value,
                        "out_dir": self.output_directory}

                    nonglob_template = heredoc_wdl('''
                        {output_name} = {output_type}.create(
                            {expression}, output=True)
                        {output_name} = process_outfile({output_name}, fileStore, tempDir, '{out_dir}')
                    ''', nonglob_dict, indent='        ')[1:]
                    fn_section += nonglob_template
                    return_values.append(output_name)
                else:
                    fn_section += '        {} = {}\n'.format(output_name, output_value)
                    return_values.append(output_name)

            if return_values:
                fn_section += '        rvDict = {'
            for return_value in return_values:
                fn_section += '"{rv}": {rv}, '.format(rv=return_value)
            if fn_section.endswith(', '):
                fn_section = fn_section[:-2]
            if return_values:
                fn_section = fn_section + '}\n'

            if return_values:
                fn_section += '        return rvDict\n\n'

        return fn_section
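
    # The generated output section has roughly this shape (illustrative, hypothetical
    # File output "out_bam" written to a hypothetical output directory):
    #
    #         out_bam = WDLFileType().create(
    #             'sample.sorted.bam', output=True)
    #         out_bam = process_outfile(out_bam, fileStore, tempDir, '/path/to/outdir')
    #         rvDict = {"out_bam": out_bam}
    #         return rvDict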

    def indent(self, string2indent: str) -> str:
        """
        Indent the input string by 4 spaces.
        """
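        # e.g. indent("a\nb") returns "    a\n    b" (every line, including blank
        # lines, gains four leading spaces)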
        split_string = string2indent.split('\n')
        return '\n'.join(f'    {line}' for line in split_string)

    def needsdocker(self, job):
        """
        Check whether a task specifies a docker image in its runtime section.

        :param job: The name of the WDL task (a key into tasks_dictionary).
        :return: True if the task needs docker; False otherwise.
        """
        if 'runtime' in self.tasks_dictionary[job]:
            if 'docker' in self.tasks_dictionary[job]['runtime']:
                return True

        return False

    def write_python_file(self,
                          module_section,
                          fn_section,
                          main_section,
                          output_file):
        """
        Takes three strings and writes them to output_file.

        :param module_section: A string of 'import modules'.
        :param fn_section: A string of python 'def functions()'.
        :param main_section: A string declaring toil options and main's header.
        :param output_file: The file to write the compiled toil script to.
        """
        with open(output_file, 'w') as file:
            file.write(module_section)
            file.write(fn_section)
            file.write(main_section)