1# encoding: utf-8 2""" 3Job and task components for writing .xml files that the Windows HPC Server 42008 can use to start jobs. 5 6Authors: 7 8* Brian Granger 9* MinRK 10 11""" 12 13#----------------------------------------------------------------------------- 14# Copyright (C) 2008-2011 The IPython Development Team 15# 16# Distributed under the terms of the BSD License. The full license is in 17# the file COPYING, distributed as part of this software. 18#----------------------------------------------------------------------------- 19 20#----------------------------------------------------------------------------- 21# Imports 22#----------------------------------------------------------------------------- 23 24import os 25import re 26import uuid 27 28from xml.etree import ElementTree as ET 29 30from traitlets.config.configurable import Configurable 31from ipython_genutils.py3compat import iteritems 32from traitlets import ( 33 Unicode, Integer, List, Instance, 34 Enum, Bool 35) 36 37#----------------------------------------------------------------------------- 38# Job and Task classes 39#----------------------------------------------------------------------------- 40 41 42def as_str(value): 43 if isinstance(value, str): 44 return value 45 elif isinstance(value, bool): 46 if value: 47 return 'true' 48 else: 49 return 'false' 50 elif isinstance(value, (int, float)): 51 return repr(value) 52 else: 53 return value 54 55 56def indent(elem, level=0): 57 i = "\n" + level*" " 58 if len(elem): 59 if not elem.text or not elem.text.strip(): 60 elem.text = i + " " 61 if not elem.tail or not elem.tail.strip(): 62 elem.tail = i 63 for elem in elem: 64 indent(elem, level+1) 65 if not elem.tail or not elem.tail.strip(): 66 elem.tail = i 67 else: 68 if level and (not elem.tail or not elem.tail.strip()): 69 elem.tail = i 70 71 72def find_username(): 73 domain = os.environ.get('USERDOMAIN') 74 username = os.environ.get('USERNAME','') 75 if domain is None: 76 return username 77 else: 78 return '%s\\%s' % (domain, username) 79 80 81class WinHPCJob(Configurable): 82 83 job_id = Unicode('') 84 job_name = Unicode('MyJob', config=True) 85 min_cores = Integer(1, config=True) 86 max_cores = Integer(1, config=True) 87 min_sockets = Integer(1, config=True) 88 max_sockets = Integer(1, config=True) 89 min_nodes = Integer(1, config=True) 90 max_nodes = Integer(1, config=True) 91 unit_type = Unicode("Core", config=True) 92 auto_calculate_min = Bool(True, config=True) 93 auto_calculate_max = Bool(True, config=True) 94 run_until_canceled = Bool(False, config=True) 95 is_exclusive = Bool(False, config=True) 96 username = Unicode(find_username(), config=True) 97 job_type = Unicode('Batch', config=True) 98 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'), 99 default_value='Highest', config=True) 100 requested_nodes = Unicode('', config=True) 101 project = Unicode('IPython', config=True) 102 xmlns = Unicode('http://schemas.microsoft.com/HPCS2008/scheduler/') 103 version = Unicode("2.000") 104 tasks = List([]) 105 106 @property 107 def owner(self): 108 return self.username 109 110 def _write_attr(self, root, attr, key): 111 s = as_str(getattr(self, attr, '')) 112 if s: 113 root.set(key, s) 114 115 def as_element(self): 116 # We have to add _A_ type things to get the right order than 117 # the MSFT XML parser expects. 118 root = ET.Element('Job') 119 self._write_attr(root, 'version', '_A_Version') 120 self._write_attr(root, 'job_name', '_B_Name') 121 self._write_attr(root, 'unit_type', '_C_UnitType') 122 self._write_attr(root, 'min_cores', '_D_MinCores') 123 self._write_attr(root, 'max_cores', '_E_MaxCores') 124 self._write_attr(root, 'min_sockets', '_F_MinSockets') 125 self._write_attr(root, 'max_sockets', '_G_MaxSockets') 126 self._write_attr(root, 'min_nodes', '_H_MinNodes') 127 self._write_attr(root, 'max_nodes', '_I_MaxNodes') 128 self._write_attr(root, 'run_until_canceled', '_J_RunUntilCanceled') 129 self._write_attr(root, 'is_exclusive', '_K_IsExclusive') 130 self._write_attr(root, 'username', '_L_UserName') 131 self._write_attr(root, 'job_type', '_M_JobType') 132 self._write_attr(root, 'priority', '_N_Priority') 133 self._write_attr(root, 'requested_nodes', '_O_RequestedNodes') 134 self._write_attr(root, 'auto_calculate_max', '_P_AutoCalculateMax') 135 self._write_attr(root, 'auto_calculate_min', '_Q_AutoCalculateMin') 136 self._write_attr(root, 'project', '_R_Project') 137 self._write_attr(root, 'owner', '_S_Owner') 138 self._write_attr(root, 'xmlns', '_T_xmlns') 139 dependencies = ET.SubElement(root, "Dependencies") 140 etasks = ET.SubElement(root, "Tasks") 141 for t in self.tasks: 142 etasks.append(t.as_element()) 143 return root 144 145 def tostring(self): 146 """Return the string representation of the job description XML.""" 147 root = self.as_element() 148 indent(root) 149 txt = ET.tostring(root, encoding="utf-8").decode('utf-8') 150 # Now remove the tokens used to order the attributes. 151 txt = re.sub(r'_[A-Z]_','',txt) 152 txt = '<?xml version="1.0" encoding="utf-8"?>\n' + txt 153 return txt 154 155 def write(self, filename): 156 """Write the XML job description to a file.""" 157 txt = self.tostring() 158 with open(filename, 'w') as f: 159 f.write(txt) 160 161 def add_task(self, task): 162 """Add a task to the job. 163 164 Parameters 165 ---------- 166 task : :class:`WinHPCTask` 167 The task object to add. 168 """ 169 self.tasks.append(task) 170 171 172class WinHPCTask(Configurable): 173 174 task_id = Unicode('') 175 task_name = Unicode('') 176 version = Unicode("2.000") 177 min_cores = Integer(1, config=True) 178 max_cores = Integer(1, config=True) 179 min_sockets = Integer(1, config=True) 180 max_sockets = Integer(1, config=True) 181 min_nodes = Integer(1, config=True) 182 max_nodes = Integer(1, config=True) 183 unit_type = Unicode("Core", config=True) 184 command_line = Unicode('', config=True) 185 work_directory = Unicode('', config=True) 186 is_rerunnaable = Bool(True, config=True) 187 std_out_file_path = Unicode('', config=True) 188 std_err_file_path = Unicode('', config=True) 189 is_parametric = Bool(False, config=True) 190 environment_variables = Instance(dict, args=(), config=True) 191 192 def _write_attr(self, root, attr, key): 193 s = as_str(getattr(self, attr, '')) 194 if s: 195 root.set(key, s) 196 197 def as_element(self): 198 root = ET.Element('Task') 199 self._write_attr(root, 'version', '_A_Version') 200 self._write_attr(root, 'task_name', '_B_Name') 201 self._write_attr(root, 'min_cores', '_C_MinCores') 202 self._write_attr(root, 'max_cores', '_D_MaxCores') 203 self._write_attr(root, 'min_sockets', '_E_MinSockets') 204 self._write_attr(root, 'max_sockets', '_F_MaxSockets') 205 self._write_attr(root, 'min_nodes', '_G_MinNodes') 206 self._write_attr(root, 'max_nodes', '_H_MaxNodes') 207 self._write_attr(root, 'command_line', '_I_CommandLine') 208 self._write_attr(root, 'work_directory', '_J_WorkDirectory') 209 self._write_attr(root, 'is_rerunnaable', '_K_IsRerunnable') 210 self._write_attr(root, 'std_out_file_path', '_L_StdOutFilePath') 211 self._write_attr(root, 'std_err_file_path', '_M_StdErrFilePath') 212 self._write_attr(root, 'is_parametric', '_N_IsParametric') 213 self._write_attr(root, 'unit_type', '_O_UnitType') 214 root.append(self.get_env_vars()) 215 return root 216 217 def get_env_vars(self): 218 env_vars = ET.Element('EnvironmentVariables') 219 for k, v in iteritems(self.environment_variables): 220 variable = ET.SubElement(env_vars, "Variable") 221 name = ET.SubElement(variable, "Name") 222 name.text = k 223 value = ET.SubElement(variable, "Value") 224 value.text = v 225 return env_vars 226 227 228 229# By declaring these, we can configure the controller and engine separately! 230 231class IPControllerJob(WinHPCJob): 232 job_name = Unicode('IPController', config=False) 233 is_exclusive = Bool(False, config=True) 234 username = Unicode(find_username(), config=True) 235 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'), 236 default_value='Highest', config=True) 237 requested_nodes = Unicode('', config=True) 238 project = Unicode('IPython', config=True) 239 240 241class IPEngineSetJob(WinHPCJob): 242 job_name = Unicode('IPEngineSet', config=False) 243 is_exclusive = Bool(False, config=True) 244 username = Unicode(find_username(), config=True) 245 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'), 246 default_value='Highest', config=True) 247 requested_nodes = Unicode('', config=True) 248 project = Unicode('IPython', config=True) 249 250 251class IPControllerTask(WinHPCTask): 252 253 task_name = Unicode('IPController', config=True) 254 controller_cmd = List(['ipcontroller.exe'], config=True) 255 controller_args = List(['--log-level=40'], config=True) 256 # I don't want these to be configurable 257 std_out_file_path = Unicode('', config=False) 258 std_err_file_path = Unicode('', config=False) 259 min_cores = Integer(1, config=False) 260 max_cores = Integer(1, config=False) 261 min_sockets = Integer(1, config=False) 262 max_sockets = Integer(1, config=False) 263 min_nodes = Integer(1, config=False) 264 max_nodes = Integer(1, config=False) 265 unit_type = Unicode("Core", config=False) 266 work_directory = Unicode('', config=False) 267 268 def __init__(self, **kwargs): 269 super(IPControllerTask, self).__init__(**kwargs) 270 the_uuid = uuid.uuid1() 271 self.std_out_file_path = os.path.join('log','ipcontroller-%s.out' % the_uuid) 272 self.std_err_file_path = os.path.join('log','ipcontroller-%s.err' % the_uuid) 273 274 @property 275 def command_line(self): 276 return ' '.join(self.controller_cmd + self.controller_args) 277 278 279class IPEngineTask(WinHPCTask): 280 281 task_name = Unicode('IPEngine', config=True) 282 engine_cmd = List(['ipengine.exe'], config=True) 283 engine_args = List(['--log-level=40'], config=True) 284 # I don't want these to be configurable 285 std_out_file_path = Unicode('', config=False) 286 std_err_file_path = Unicode('', config=False) 287 min_cores = Integer(1, config=False) 288 max_cores = Integer(1, config=False) 289 min_sockets = Integer(1, config=False) 290 max_sockets = Integer(1, config=False) 291 min_nodes = Integer(1, config=False) 292 max_nodes = Integer(1, config=False) 293 unit_type = Unicode("Core", config=False) 294 work_directory = Unicode('', config=False) 295 296 def __init__(self, **kwargs): 297 super(IPEngineTask,self).__init__(**kwargs) 298 the_uuid = uuid.uuid1() 299 self.std_out_file_path = os.path.join('log','ipengine-%s.out' % the_uuid) 300 self.std_err_file_path = os.path.join('log','ipengine-%s.err' % the_uuid) 301 302 @property 303 def command_line(self): 304 return ' '.join(self.engine_cmd + self.engine_args) 305 306 307