1""" 2PySCeS - Python Simulator for Cellular Systems (http://pysces.sourceforge.net) 3 4Copyright (C) 2004-2022 B.G. Olivier, J.M. Rohwer, J.-H.S Hofmeyr all rights reserved, 5 6Brett G. Olivier (bgoli@users.sourceforge.net) 7Triple-J Group for Molecular Cell Physiology 8Stellenbosch University, South Africa. 9 10Permission to use, modify, and distribute this software is given under the 11terms of the PySceS (BSD style) license. See LICENSE.txt that came with 12this distribution for specifics. 13 14NO WARRANTY IS EXPRESSED OR IMPLIED. USE AT YOUR OWN RISK. 15Brett G. Olivier 16""" 17from __future__ import division, print_function 18from __future__ import absolute_import 19from __future__ import unicode_literals 20 21import os 22import numpy, scipy, itertools, time 23try: 24 input = raw_input # Py2 compatibility 25except NameError: 26 pass 27 28from .KrakenNET import socket, time, cPickle 29from .KrakenNET import SimpleClient, SimpleMultiReadClient, ThreadedClient 30from .KrakenNET import ModelFileServer, TentacleScanner, ServerListLoader 31from .KrakenNET import ServerStatusCheck, HOSTNAME, BLOCK_SIZE, PICKLE_PROTOCOL, STATUS_PORT, PYSCES_PORT, CFSERVE_PORT 32 33print('I AM:', HOSTNAME) 34print('Using BLOCK_SIZE: %s\n' % BLOCK_SIZE) 35 36class KrakenController: 37 kc_status_port = None 38 kc_pysces_port = None 39 kc_block_size = None 40 41 kc_file_name = 'server_list' 42 kc_directory_name = None 43 kc_available_server_list = None 44 kc_busy_server_list = None 45 kc_tentacle_response = None 46 kc_model_server = None 47 kc_tentacle_scanner = None 48 kc_tentacle_monitor = None 49 kc_tentacle_process_map = None 50 51 def __init__(self,status_port=STATUS_PORT, pysces_port=PYSCES_PORT, block_size=BLOCK_SIZE): 52 self.kc_status_port = status_port 53 self.kc_pysces_port = pysces_port 54 self.kc_block_size = block_size 55 self.kc_available_server_list = [] 56 self.kc_busy_server_list = [] 57 self.kc_tentacle_response = {} 58 self.kc_tentacle_process_map = {} 59 60 print(self.kc_status_port, self.kc_pysces_port, self.kc_block_size) 61 62 def initialiseTentacles(self,file_name=None, directory_name=os.getcwd()): 63 if file_name != None: 64 self.kc_file_name = file_name 65 self.kc_available_server_list = ServerListLoader().ReadFile(self.kc_file_name, directory_name) 66 self.kc_tentacle_scanner = TentacleScanner(self.kc_available_server_list) 67 print('Server list:') 68 print(self.kc_available_server_list, '\n') 69 assert len(self.kc_available_server_list) > 0, '\n* No servers in server list file' 70 71 def getActiveTentacles(self): 72 self.kc_tentacle_scanner.scan() 73 self.kc_available_server_list = self.kc_tentacle_scanner.servers_ready 74 self.kc_busy_server_list = self.kc_tentacle_scanner.servers_busy 75 print('Server list:') 76 print(self.kc_available_server_list, '\n') 77 assert len(self.kc_available_server_list) > 0, '\n* No active servers' 78 for t in self.kc_available_server_list: 79 self.kc_tentacle_response.setdefault(t) 80 self.kc_tentacle_process_map.setdefault(t) 81 # maybe wil be needed later for reruns of getActiveTentacles 82 ## if self.kc_tentacle_process_map.has_key[t]: 83 ## dt = self.kc_tentacle_process_map.pop[t] 84 ## del dt 85 86 ## print self.kc_tentacle_response 87 ## print self.kc_tentacle_process_map 88 89 90 def Run(self): 91 pass 92 93 def sendCmdListToAll(self, command_list, pause=0.5): 94 assert type(command_list) == list or type(command_list) == tuple, '\n* Input must be a list or tuple' 95 for server in self.kc_available_server_list: 96 self.sendCmdListToOne(command_list, server) 97 time.sleep(pause) 98 99 def sendCmdListToOne(self, command_list, server): 100 assert type(command_list) == list or type(command_list) == tuple, '\n* Input must be a list or tuple' 101 assert server in self.kc_available_server_list, '\n* Server %s not in active server list' % server 102 clnt = SimpleClient(server, self.kc_pysces_port, self.kc_block_size) 103 clnt.send(command_list) 104 self.kc_tentacle_response[server] = clnt.response 105 ## print self.kc_tentacle_response 106 107 def sendJobToOne(self, command, server): 108 assert server in self.kc_available_server_list, '\n* Server %s not in active server list' % server 109 self.kc_tentacle_process_map[server] = ThreadedClient(command, server, self.kc_pysces_port, self.kc_block_size) 110 111 def sendJobToAll(self, command): 112 for server in self.kc_available_server_list: 113 self.sendJobToOne(command, server) 114 115 def runAllJobs(self): 116 Istarted = [] 117 for s in list(self.kc_tentacle_process_map.keys()): 118 if self.kc_tentacle_process_map[s] != None: 119 if not self.kc_tentacle_process_map[s].isAlive(): 120 self.kc_tentacle_process_map[s].start() 121 Istarted.append(self.kc_tentacle_process_map[s]) 122 for s in Istarted: 123 s.join() 124 del Istarted 125 126 def clearProcessMap(self): 127 for s in list(self.kc_tentacle_process_map.keys()): 128 if self.kc_tentacle_process_map[s] != None: 129 if not self.kc_tentacle_process_map[s].isAlive(): 130 self.kc_tentacle_process_map[s] = None 131 ## print self.kc_tentacle_process_map 132 133 134 def getDataFromOneJob(self, server): 135 assert server in self.kc_tentacle_process_map, '\n* Server %s not in active server list' % server 136 if self.kc_tentacle_process_map[server].response == 'True': 137 print('* \"%s\" reports completing job without error' % server) 138 else: 139 print('* Check result! \"%s\" reports job processing failure' % server) 140 client = SimpleMultiReadClient(server, self.kc_pysces_port, self.kc_block_size) 141 client.send('P_GETDATA') 142 self.kc_tentacle_response[server] = cPickle.loads(client.response) 143 print('\n', server, 'returns', type(self.kc_tentacle_response[server])) 144 print('Data successfully retrieved from: %s \n' % server) 145 146 def getDataFromAllJobs(self, pause=0.2): 147 for s in list(self.kc_tentacle_process_map.keys()): 148 if self.kc_tentacle_process_map[s] != None: 149 self.getDataFromOneJob(s) 150 time.sleep(pause) 151 152 def killTentacles(self): 153 GO = True 154 while GO: 155 inp = input('\nYou have initiated tentacle shutdown ... are you sure? (y/n): ') 156 if inp in ['y','Y','yes']: 157 GO = False 158 for server in self.kc_available_server_list: 159 SimpleClient(server, self.kc_status_port, self.kc_block_size).send(['KILL']) 160 SimpleClient(server, self.kc_pysces_port, self.kc_block_size).send(['KILL']) 161 print('Warning: all servers deleted for this host') 162 self.kc_available_server_list = [] 163 elif inp in ['n','N','no']: 164 GO = False 165 else: 166 print('\nPlease enter y for yes or n for no') 167 168 def chkpsc(self,F): 169 try: 170 if F[-4:] == '.psc': 171 pass 172 else: 173 print('Assuming extension is .psc') 174 F += '.psc' 175 except: 176 print('Chkpsc error') 177 return F 178 179 def startModelServer(self, model_file, model_dir): 180 self.model_file = model_file 181 self.model_dir = model_dir 182 self.model_file = self.chkpsc(self.model_file) 183 mpath = os.path.join(self.model_dir, self.model_file) 184 if os.path.exists(mpath): 185 try: 186 self.kc_model_server = ModelFileServer(CFSERVE_PORT, BLOCK_SIZE, 'model_server') 187 self.kc_model_server.ReadFile(self.model_file, self.model_dir) 188 self.kc_model_server.start() 189 except Exception as ex: 190 print('Server might already be running on this machine') 191 print(ex) 192 else: 193 print('Error: file %s does not exist!\n' % mpath) 194 raise NameError 195 196 def startTentacleMonitor(self, interval=60): 197 self.kc_tentacle_monitor = ServerStatusCheck(self.kc_available_server_list, self.kc_status_port, self.kc_block_size, interval) 198 self.kc_tentacle_monitor.start() 199 200 def ResetServersToReady(self): 201 for server in self.kc_available_server_list: 202 SimpleClient(server, self.kc_status_port, self.kc_block_size).send(['P_RESET_STATUS']) 203 204 205class KrakenScanController(KrakenController): 206 job_list = None 207 JobsWaiting = None 208 result_list = None 209 result_array = None 210 task_id = None 211 working_dir = None 212 Model_File = None 213 Model_Dir = None 214 tentacle_monitor_interval = 120 215 init_commands = None 216 once_off_tentacle_init = None 217 218 def setEnv(self, task_id, working_dir, model_file, model_dir): 219 self.task_id = task_id 220 self.working_dir = working_dir 221 self.job_list = [] 222 self.result_list = [] 223 self.Model_File = model_file 224 self.Model_Dir = model_dir 225 226 def startController(self): 227 ## self.blob = getBasicController() 228 self.initialiseTentacles(directory_name=self.working_dir) 229 self.getActiveTentacles() 230 self.startModelServer(self.Model_File, self.Model_Dir) 231 self.startTentacleMonitor(interval=self.tentacle_monitor_interval) 232 print('Server list: %s\n' % self.kc_available_server_list) 233 234 def setInitCmds(self, lst): 235 assert type(lst) == list, 'This must be a list' 236 self.init_commands = lst 237 238 def setOnceOffTentacleInit(self, lst): 239 assert type(lst) == list, 'This must be a list' 240 self.once_off_tentacle_init = lst 241 242 def setJobList(self, lst): 243 assert type(lst) == list, 'This must be a list' 244 self.job_list = lst 245 246 247 def setScanJobs(self, start, end, intervals, job, log=False): 248 """Splits a range into a number of jobs with intervals""" 249 assert intervals >= 1, '\n* Minimum of 1 interval' 250 if log: 251 kpoints = scipy.logspace(scipy.log10(start), scipy.log10(end), intervals+1) 252 else: 253 kpoints = scipy.linspace(start, end, intervals+1) 254 self.job_list = [] 255 for p in range(len(kpoints)-1): 256 job2 = job % (kpoints[p], kpoints[p+1]) 257 self.job_list.append(job2) 258 print(job2) 259 260 def Run(self, raw_data_dump=True): 261 START = time.time() 262 self.Scheduler3(self.job_list, self.init_commands) 263 MID = time.time() 264 if raw_data_dump: 265 try: 266 self.Dump(self.result_list, self.task_id+'_raw_data') 267 except Exception as ex: 268 print('Raw data not saved', ex) 269 print("Data generation time = %2.2f minutes." % ((MID-START)/60.0)) 270 271 def buildCycler(self, lst): 272 """ 273 creates an infinite looping generator with itertools 274 """ 275 return itertools.cycle(lst) 276 277 def Dump(self, thing, fname): 278 try: 279 F = open(os.path.join(self.working_dir, fname.replace('.bin','')+'.bin'), 'wb') 280 cPickle.dump(thing, F, 2) 281 F.flush() 282 F.close() 283 except Exception as ex: 284 print('Dump exception raised') 285 print(ex) 286 287 def deltaCommand(self): 288 """ 289 This is used to setup conditions for each next-job-on-server 290 """ 291 delta = 'P_NONE' 292 return [delta] 293 294 def Scheduler3(self, job_list, init_list): 295 """ 296 job_list is a list of jobs to run 297 init_list set of `static' command used to initialise each job 298 """ 299 original_job_list = tuple(job_list) 300 TIME_START = time.time() 301 self.result_list = [] 302 self.JobsWaiting = True 303 getServer = self.buildCycler(self.kc_available_server_list) 304 if self.once_off_tentacle_init != None: 305 self.sendCmdListToAll(self.once_off_tentacle_init, pause=0.5) 306 while self.JobsWaiting: 307 deferred_jobs = [] 308 job_servers = [] 309 for job in range(len(job_list)): 310 if job < len(self.kc_available_server_list): 311 print('\nJob %s queued for processing...' % (job+1)) 312 server = next(getServer) 313 job_servers.append(server) 314 print("\nProcessing job %s init list ..." % (job+1)) 315 self.sendCmdListToOne(init_list, server) 316 print("\nProcessing job %s delta command ..." % (job+1)) 317 self.sendCmdListToOne(self.deltaCommand(), server) 318 self.sendJobToOne(job_list[job], server) 319 else: 320 deferred_jobs.append(job_list[job]) 321 print('Job %s deferred and rescheduled.' % (job+1)) 322 print("\nProcessing queued jobs ...") 323 self.runAllJobs() 324 for server in job_servers: 325 self.getDataFromOneJob(server) 326 arr = self.kc_tentacle_response[server] 327 self.result_list.append(arr) 328 print(type(arr)) 329 self.clearProcessMap() 330 if len(deferred_jobs) > 0: 331 print('\nDeferred:', deferred_jobs) 332 job_list = deferred_jobs 333 else: 334 self.JobsWaiting = False 335 336 TIME_END = time.time() 337 338 print('\nNumber of results = %s' % len(self.result_list)) 339 total_states = 0 340 ## for x in range(len(self.result_list)): 341 ## try: 342 ## print '\tResult %s has %s rows and %s columns' % (x+1, self.result_list[x].shape[0], self.result_list[x].shape[1]) 343 ## total_states += self.result_list[x].shape[0] 344 ## except Exception, ex: 345 ## print type(self.result_list[x][0]) 346 ## print '\tResult %s %s: %s' % (x+1, type(self.result_list[x][0]), self.result_list[x][0]) 347 ## print original_job_list[x] 348 ## print ex 349 ## print 'Total time taken to complete %s state scan %s minutes' % (total_states, (TIME_END-TIME_START)/60.0) 350 351 #investigate using vstack with type/range checking 352 def concatenateResults(self): 353 output = None 354 for arr in range(len(self.result_list)): 355 if type(self.result_list[arr]) == numpy.ndarray: 356 if arr == 0: 357 output = self.result_list[arr] 358 else: 359 output = scipy.concatenate((output,self.result_list[arr])) 360 self.result_array = output 361 362 def getResultArray(self): 363 return self.result_array 364 365 def getResultList(self): 366 return self.result_list 367 368 def getAndClearResultArray(self): 369 print('ReSetting result_array and result_list (returning result_array)') 370 tmp = self.result_array 371 self.result_list = [] 372 self.result_array = None 373 return tmp 374 375 def getAndClearResultList(self): 376 print('ReSetting result_array and result_list (returning result_list)') 377 tmp = self.result_list 378 self.result_list = [] 379 self.result_array = None 380 return tmp 381 382 383 384def getBasicController(lvl=3, directory=os.getcwd()): 385 if lvl >= 1: blob = KrakenController() 386 if lvl >= 2: blob.initialiseTentacles(directory_name=directory) 387 if lvl >= 3: blob.getActiveTentacles() 388 return blob 389 390