1"""
2PySCeS - Python Simulator for Cellular Systems (http://pysces.sourceforge.net)
3
4Copyright (C) 2004-2022 B.G. Olivier, J.M. Rohwer, J.-H.S Hofmeyr all rights reserved,
5
6Brett G. Olivier (bgoli@users.sourceforge.net)
7Triple-J Group for Molecular Cell Physiology
8Stellenbosch University, South Africa.
9
10Permission to use, modify, and distribute this software is given under the
11terms of the PySceS (BSD style) license. See LICENSE.txt that came with
12this distribution for specifics.
13
14NO WARRANTY IS EXPRESSED OR IMPLIED.  USE AT YOUR OWN RISK.
15Brett G. Olivier
16"""
17from __future__ import division, print_function
18from __future__ import absolute_import
19from __future__ import unicode_literals
20
21import os
22import numpy, scipy, itertools, time
23try:
24    input = raw_input  # Py2 compatibility
25except NameError:
26    pass
27
28from .KrakenNET import socket, time, cPickle
29from .KrakenNET import SimpleClient, SimpleMultiReadClient, ThreadedClient
30from .KrakenNET import ModelFileServer, TentacleScanner, ServerListLoader
31from .KrakenNET import ServerStatusCheck, HOSTNAME, BLOCK_SIZE, PICKLE_PROTOCOL, STATUS_PORT, PYSCES_PORT, CFSERVE_PORT
32
33print('I AM:', HOSTNAME)
34print('Using BLOCK_SIZE: %s\n' % BLOCK_SIZE)
35
36class KrakenController:
37    kc_status_port = None
38    kc_pysces_port = None
39    kc_block_size = None
40
41    kc_file_name = 'server_list'
42    kc_directory_name = None
43    kc_available_server_list = None
44    kc_busy_server_list = None
45    kc_tentacle_response = None
46    kc_model_server = None
47    kc_tentacle_scanner = None
48    kc_tentacle_monitor = None
49    kc_tentacle_process_map = None
50
51    def __init__(self,status_port=STATUS_PORT, pysces_port=PYSCES_PORT, block_size=BLOCK_SIZE):
52        self.kc_status_port = status_port
53        self.kc_pysces_port = pysces_port
54        self.kc_block_size = block_size
55        self.kc_available_server_list = []
56        self.kc_busy_server_list = []
57        self.kc_tentacle_response = {}
58        self.kc_tentacle_process_map = {}
59
60        print(self.kc_status_port, self.kc_pysces_port, self.kc_block_size)
61
62    def initialiseTentacles(self,file_name=None, directory_name=os.getcwd()):
63        if file_name != None:
64            self.kc_file_name = file_name
65        self.kc_available_server_list = ServerListLoader().ReadFile(self.kc_file_name, directory_name)
66        self.kc_tentacle_scanner = TentacleScanner(self.kc_available_server_list)
67        print('Server list:')
68        print(self.kc_available_server_list, '\n')
69        assert len(self.kc_available_server_list) > 0, '\n* No servers in server list file'
70
71    def getActiveTentacles(self):
72        self.kc_tentacle_scanner.scan()
73        self.kc_available_server_list = self.kc_tentacle_scanner.servers_ready
74        self.kc_busy_server_list = self.kc_tentacle_scanner.servers_busy
75        print('Server list:')
76        print(self.kc_available_server_list, '\n')
77        assert len(self.kc_available_server_list) > 0, '\n* No active servers'
78        for t in self.kc_available_server_list:
79            self.kc_tentacle_response.setdefault(t)
80            self.kc_tentacle_process_map.setdefault(t)
81            # maybe wil be needed later for reruns of getActiveTentacles
82            ##  if self.kc_tentacle_process_map.has_key[t]:
83                ##  dt = self.kc_tentacle_process_map.pop[t]
84                ##  del dt
85
86        ##  print self.kc_tentacle_response
87        ##  print self.kc_tentacle_process_map
88
89
90    def Run(self):
91        pass
92
93    def sendCmdListToAll(self, command_list, pause=0.5):
94        assert type(command_list) == list or type(command_list) == tuple, '\n* Input must be a list or tuple'
95        for server in self.kc_available_server_list:
96            self.sendCmdListToOne(command_list, server)
97            time.sleep(pause)
98
99    def sendCmdListToOne(self, command_list, server):
100        assert type(command_list) == list or type(command_list) == tuple, '\n* Input must be a list or tuple'
101        assert server in self.kc_available_server_list, '\n* Server %s not in active server list' % server
102        clnt = SimpleClient(server, self.kc_pysces_port, self.kc_block_size)
103        clnt.send(command_list)
104        self.kc_tentacle_response[server] = clnt.response
105        ##  print self.kc_tentacle_response
106
107    def sendJobToOne(self, command, server):
108        assert server in self.kc_available_server_list, '\n* Server %s not in active server list' % server
109        self.kc_tentacle_process_map[server] = ThreadedClient(command, server, self.kc_pysces_port, self.kc_block_size)
110
111    def sendJobToAll(self, command):
112        for server in self.kc_available_server_list:
113            self.sendJobToOne(command, server)
114
115    def runAllJobs(self):
116        Istarted = []
117        for s in list(self.kc_tentacle_process_map.keys()):
118            if self.kc_tentacle_process_map[s] != None:
119                if not self.kc_tentacle_process_map[s].isAlive():
120                    self.kc_tentacle_process_map[s].start()
121                    Istarted.append(self.kc_tentacle_process_map[s])
122        for s in Istarted:
123            s.join()
124        del Istarted
125
126    def clearProcessMap(self):
127        for s in list(self.kc_tentacle_process_map.keys()):
128            if self.kc_tentacle_process_map[s] != None:
129                if not self.kc_tentacle_process_map[s].isAlive():
130                    self.kc_tentacle_process_map[s] = None
131        ##  print self.kc_tentacle_process_map
132
133
134    def getDataFromOneJob(self, server):
135        assert server in self.kc_tentacle_process_map, '\n* Server %s not in active server list' % server
136        if self.kc_tentacle_process_map[server].response == 'True':
137            print('* \"%s\" reports completing job without error' % server)
138        else:
139            print('* Check result! \"%s\" reports job processing failure'  % server)
140        client = SimpleMultiReadClient(server, self.kc_pysces_port, self.kc_block_size)
141        client.send('P_GETDATA')
142        self.kc_tentacle_response[server] = cPickle.loads(client.response)
143        print('\n', server, 'returns', type(self.kc_tentacle_response[server]))
144        print('Data successfully retrieved from: %s \n' % server)
145
146    def getDataFromAllJobs(self, pause=0.2):
147        for s in list(self.kc_tentacle_process_map.keys()):
148            if self.kc_tentacle_process_map[s] != None:
149                self.getDataFromOneJob(s)
150                time.sleep(pause)
151
152    def killTentacles(self):
153        GO = True
154        while GO:
155            inp = input('\nYou have initiated tentacle shutdown ... are you sure? (y/n): ')
156            if inp in ['y','Y','yes']:
157                GO = False
158                for server in self.kc_available_server_list:
159                    SimpleClient(server, self.kc_status_port, self.kc_block_size).send(['KILL'])
160                    SimpleClient(server, self.kc_pysces_port, self.kc_block_size).send(['KILL'])
161                print('Warning: all servers deleted for this host')
162                self.kc_available_server_list = []
163            elif inp in ['n','N','no']:
164                GO = False
165            else:
166                print('\nPlease enter y for yes or n for no')
167
168    def chkpsc(self,F):
169        try:
170            if F[-4:] == '.psc':
171                pass
172            else:
173                print('Assuming extension is .psc')
174                F += '.psc'
175        except:
176            print('Chkpsc error')
177        return F
178
179    def startModelServer(self, model_file, model_dir):
180        self.model_file = model_file
181        self.model_dir = model_dir
182        self.model_file = self.chkpsc(self.model_file)
183        mpath = os.path.join(self.model_dir, self.model_file)
184        if os.path.exists(mpath):
185            try:
186                self.kc_model_server = ModelFileServer(CFSERVE_PORT, BLOCK_SIZE, 'model_server')
187                self.kc_model_server.ReadFile(self.model_file, self.model_dir)
188                self.kc_model_server.start()
189            except Exception as ex:
190                print('Server might already be running on this machine')
191                print(ex)
192        else:
193            print('Error: file %s does not exist!\n' % mpath)
194            raise NameError
195
196    def startTentacleMonitor(self, interval=60):
197        self.kc_tentacle_monitor = ServerStatusCheck(self.kc_available_server_list, self.kc_status_port, self.kc_block_size, interval)
198        self.kc_tentacle_monitor.start()
199
200    def ResetServersToReady(self):
201        for server in self.kc_available_server_list:
202            SimpleClient(server, self.kc_status_port, self.kc_block_size).send(['P_RESET_STATUS'])
203
204
205class KrakenScanController(KrakenController):
206    job_list = None
207    JobsWaiting = None
208    result_list = None
209    result_array = None
210    task_id = None
211    working_dir = None
212    Model_File = None
213    Model_Dir = None
214    tentacle_monitor_interval = 120
215    init_commands = None
216    once_off_tentacle_init = None
217
218    def setEnv(self, task_id, working_dir, model_file, model_dir):
219        self.task_id = task_id
220        self.working_dir = working_dir
221        self.job_list = []
222        self.result_list = []
223        self.Model_File = model_file
224        self.Model_Dir = model_dir
225
226    def startController(self):
227        ##  self.blob = getBasicController()
228        self.initialiseTentacles(directory_name=self.working_dir)
229        self.getActiveTentacles()
230        self.startModelServer(self.Model_File, self.Model_Dir)
231        self.startTentacleMonitor(interval=self.tentacle_monitor_interval)
232        print('Server list: %s\n' % self.kc_available_server_list)
233
234    def setInitCmds(self, lst):
235        assert type(lst) == list, 'This must be a list'
236        self.init_commands = lst
237
238    def setOnceOffTentacleInit(self, lst):
239        assert type(lst) == list, 'This must be a list'
240        self.once_off_tentacle_init = lst
241
242    def setJobList(self, lst):
243        assert type(lst) == list, 'This must be a list'
244        self.job_list = lst
245
246
247    def setScanJobs(self, start, end, intervals, job, log=False):
248        """Splits a range into a number of jobs with intervals"""
249        assert intervals >= 1, '\n* Minimum of 1 interval'
250        if log:
251            kpoints = scipy.logspace(scipy.log10(start), scipy.log10(end), intervals+1)
252        else:
253            kpoints = scipy.linspace(start, end, intervals+1)
254        self.job_list = []
255        for p in range(len(kpoints)-1):
256            job2 = job % (kpoints[p], kpoints[p+1])
257            self.job_list.append(job2)
258            print(job2)
259
260    def Run(self, raw_data_dump=True):
261        START = time.time()
262        self.Scheduler3(self.job_list, self.init_commands)
263        MID = time.time()
264        if raw_data_dump:
265            try:
266                self.Dump(self.result_list, self.task_id+'_raw_data')
267            except Exception as ex:
268                print('Raw data not saved', ex)
269        print("Data generation time = %2.2f minutes." % ((MID-START)/60.0))
270
271    def buildCycler(self, lst):
272        """
273        creates an infinite looping generator with itertools
274        """
275        return itertools.cycle(lst)
276
277    def Dump(self, thing, fname):
278        try:
279            F = open(os.path.join(self.working_dir, fname.replace('.bin','')+'.bin'), 'wb')
280            cPickle.dump(thing, F, 2)
281            F.flush()
282            F.close()
283        except Exception as ex:
284            print('Dump exception raised')
285            print(ex)
286
287    def deltaCommand(self):
288        """
289        This is used to setup conditions for each next-job-on-server
290        """
291        delta = 'P_NONE'
292        return [delta]
293
294    def Scheduler3(self, job_list, init_list):
295        """
296        job_list is a list of jobs to run
297        init_list set of `static' command used to initialise each job
298        """
299        original_job_list = tuple(job_list)
300        TIME_START = time.time()
301        self.result_list = []
302        self.JobsWaiting = True
303        getServer = self.buildCycler(self.kc_available_server_list)
304        if self.once_off_tentacle_init != None:
305            self.sendCmdListToAll(self.once_off_tentacle_init, pause=0.5)
306        while self.JobsWaiting:
307            deferred_jobs = []
308            job_servers = []
309            for job in range(len(job_list)):
310                if job < len(self.kc_available_server_list):
311                    print('\nJob %s queued for processing...' % (job+1))
312                    server = next(getServer)
313                    job_servers.append(server)
314                    print("\nProcessing job %s init list ..." % (job+1))
315                    self.sendCmdListToOne(init_list, server)
316                    print("\nProcessing job %s delta command ..." % (job+1))
317                    self.sendCmdListToOne(self.deltaCommand(), server)
318                    self.sendJobToOne(job_list[job], server)
319                else:
320                    deferred_jobs.append(job_list[job])
321                    print('Job %s deferred and rescheduled.' % (job+1))
322            print("\nProcessing queued jobs ...")
323            self.runAllJobs()
324            for server in job_servers:
325                self.getDataFromOneJob(server)
326                arr = self.kc_tentacle_response[server]
327                self.result_list.append(arr)
328                print(type(arr))
329            self.clearProcessMap()
330            if len(deferred_jobs) > 0:
331                print('\nDeferred:', deferred_jobs)
332                job_list = deferred_jobs
333            else:
334                self.JobsWaiting = False
335
336        TIME_END = time.time()
337
338        print('\nNumber of results = %s' % len(self.result_list))
339        total_states = 0
340        ##  for x in range(len(self.result_list)):
341            ##  try:
342                ##  print '\tResult %s has %s rows and %s columns' % (x+1, self.result_list[x].shape[0], self.result_list[x].shape[1])
343                ##  total_states += self.result_list[x].shape[0]
344            ##  except Exception, ex:
345                ##  print type(self.result_list[x][0])
346                ##  print '\tResult %s %s: %s' % (x+1, type(self.result_list[x][0]), self.result_list[x][0])
347                ##  print original_job_list[x]
348                ##  print ex
349        ##  print 'Total time taken to complete %s state scan %s minutes' % (total_states, (TIME_END-TIME_START)/60.0)
350
351    #investigate using vstack with type/range checking
352    def concatenateResults(self):
353        output = None
354        for arr in range(len(self.result_list)):
355            if type(self.result_list[arr]) == numpy.ndarray:
356                if arr == 0:
357                    output = self.result_list[arr]
358                else:
359                    output = scipy.concatenate((output,self.result_list[arr]))
360        self.result_array = output
361
362    def getResultArray(self):
363        return self.result_array
364
365    def getResultList(self):
366        return self.result_list
367
368    def getAndClearResultArray(self):
369        print('ReSetting result_array and result_list (returning result_array)')
370        tmp = self.result_array
371        self.result_list = []
372        self.result_array = None
373        return tmp
374
375    def getAndClearResultList(self):
376        print('ReSetting result_array and result_list (returning result_list)')
377        tmp = self.result_list
378        self.result_list = []
379        self.result_array = None
380        return tmp
381
382
383
384def getBasicController(lvl=3, directory=os.getcwd()):
385    if lvl >= 1: blob = KrakenController()
386    if lvl >= 2: blob.initialiseTentacles(directory_name=directory)
387    if lvl >= 3: blob.getActiveTentacles()
388    return blob
389
390