"""Client-side bindings for talking to a MoleQueue server over ZeroMQ.

Requests are JSON-RPC packets sent through a DEALER socket; replies are
received on a background I/O loop thread and handed back to the calling
thread through a condition variable.
"""

from functools import partial
from threading import Condition
from threading import Lock
from threading import Thread
import inspect
import json
import sys
import tempfile
import threading
import time

import zmq
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream

from utils import underscore_to_camelcase
from utils import camelcase_to_underscore
from utils import JsonRpc

# Upper bound (seconds) for a single Condition.wait() call.  A finite value
# is needed so the wait can be interrupted (http://bugs.python.org/issue8844);
# the waiting loop simply re-arms, so the exact value does not matter.
_MAX_WAIT_SECONDS = 3600.0


class JobState:
    """Integer codes describing the life cycle of a job."""

    # Unknown status
    UNKNOWN = -1
    # Initial state of job, should never be entered.
    NONE = 0
    # Job has been accepted by the server and is being prepared (Writing
    # input files, etc).
    ACCEPTED = 1
    # Job is being queued locally, either waiting for local execution or
    # remote submission.
    QUEUED_LOCAL = 2
    # Job has been submitted to a remote queuing system.
    SUBMITTED = 3
    # Job is pending execution on a remote queuing system.
    QUEUED_REMOTE = 4
    # Job is running locally.
    RUNNING_LOCAL = 5
    # Job is running remotely.
    RUNNING_REMOTE = 6
    # Job has completed.
    FINISHED = 7
    # Job has been terminated at a user request.
    CANCELED = 8
    # Job has been terminated due to an error.
    ERROR = 9


class FilePath:
    """Plain holder for a file path."""

    def __init__(self):
        self.path = None


class FileContents:
    """Plain holder for a file name together with its contents."""

    def __init__(self):
        self.filename = None
        self.contents = None


class Job:
    """Description of a job, populated attribute by attribute."""

    def __init__(self):
        self.queue = None
        self.program = None
        self.description = ''
        self.input_file = None
        self.output_directory = None
        self.local_working_directory = None
        self.clean_remote_files = False
        self.retrieve_output = True
        self.clean_local_working_directory = False
        self.hide_from_gui = False
        self.popup_on_state_change = True
        self.number_of_cores = 1
        self.max_wall_time = -1

    # NOTE(review): the three private attributes read below are not set in
    # __init__; presumably they are attached when a job is rebuilt from a
    # server reply (JsonRpc.json_to_job) -- confirm.  Until they are set,
    # these accessors raise AttributeError.

    def job_state(self):
        """Return the value stored in ``_job_state``."""
        return self._job_state

    def molequeue_id(self):
        """Return the value stored in ``_mole_queue_id``."""
        return self._mole_queue_id

    def queue_id(self):
        """Return the value stored in ``_queue_id``."""
        return self._queue_id


class Queue:
    """A named queue and the list of programs associated with it."""

    def __init__(self):
        self.name = None
        self.programs = []


class EventLoop(Thread):
    """Thread that runs the given I/O loop until asked to stop."""

    def __init__(self, io_loop):
        Thread.__init__(self)
        self.io_loop = io_loop

    def run(self):
        """Run the I/O loop; returns once the loop has been stopped."""
        self.io_loop.start()

    def stop(self):
        """Stop the I/O loop and wait for this thread to finish.

        The stop is scheduled through ``add_callback`` so that it executes
        on the loop's own thread; callers of this method are on a different
        thread.
        """
        self.io_loop.add_callback(self.io_loop.stop)
        self.join()


class MoleQueueException(Exception):
    """The base class of all MoleQueue exceptions """
    pass


class JobException(MoleQueueException):
    """Raised by ``Client.submit_job`` when the reply contains an error.

    Attributes:
        packet_id: ``id`` member of the reply.
        code: ``code`` member of the error object.
        message: ``message`` member of the error object.
    """

    def __init__(self, packet_id, code, message):
        # Hand the message to the base class so str(exc) is informative.
        super(JobException, self).__init__(message)
        self.packet_id = packet_id
        self.code = code
        self.message = message


class JobInformationException(MoleQueueException):
    """Raised by ``Client.lookup_job`` when the reply contains an error.

    Attributes:
        packet_id: ``id`` member of the reply.
        data: ``data`` member of the error object (``None`` when absent).
        code: ``code`` member of the error object.
        message: ``message`` member of the error object.
    """

    def __init__(self, packet_id, data, code, message):
        # Hand the message to the base class so str(exc) is informative.
        super(JobInformationException, self).__init__(message)
        self.packet_id = packet_id
        self.data = data
        self.code = code
        self.message = message


class Client:
    """Blocking request/response client with notification callbacks."""

    def __init__(self):
        self._current_packet_id = 0
        # packet id -> reply; the value is None while the reply is pending
        self._request_response_map = {}
        self._new_response_condition = Condition()
        self._packet_id_lock = Lock()
        self._notification_callbacks = []

    def connect_to_server(self, server):
        """Connect to the ipc endpoint named *server* and start receiving.

        The endpoint lives in the system temporary directory.  A background
        ``EventLoop`` thread is started to dispatch incoming messages.
        """
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.DEALER)

        tmpdir = tempfile.gettempdir()
        connection_string = 'ipc://%s/%s_%s' % (tmpdir, 'zmq', server)
        self.socket.connect(connection_string)

        io_loop = ioloop.IOLoop.instance()
        self.stream = ZMQStream(self.socket, io_loop=io_loop)

        # create partial function that has self as first argument
        callback = partial(_on_recv, self)
        self.stream.on_recv(callback)
        self.event_loop = EventLoop(io_loop)
        self.event_loop.start()

    def disconnect(self):
        """Flush the stream, stop the event loop thread, close the socket."""
        self.stream.flush()
        self.event_loop.stop()
        self.socket.close()

    def register_notification_callback(self, callback):
        """Add *callback* to those invoked for every notification."""
        # check a valid function has been passed
        assert callable(callback)
        self._notification_callbacks.append(callback)

    def request_queue_list_update(self, timeout=None):
        """Send a ``listQueues`` request and return the converted reply.

        Returns None if no reply arrives within *timeout* seconds
        (``timeout=None`` waits indefinitely); otherwise the result of
        ``JsonRpc.json_to_queues`` applied to the reply.
        """
        response = self._call('listQueues', None, timeout)

        # Timeout
        if response is None:
            return None

        return JsonRpc.json_to_queues(response)

    def submit_job(self, request, timeout=None):
        """Send a ``submitJob`` request built from *request*.

        Returns the ``moleQueueId`` from the reply, or None if no reply
        arrives within *timeout* seconds.

        Raises:
            JobException: if the reply contains an ``error`` member.
        """
        params = JsonRpc.object_to_json_params(request)
        response = self._call('submitJob', params, timeout)

        # Timeout
        if response is None:
            return None

        # if an error occurred then throw an exception
        if 'error' in response:
            error = response['error']
            raise JobException(response['id'],
                               error['code'],
                               error['message'])

        # otherwise return the molequeue id
        return response['result']['moleQueueId']

    def cancel_job(self):
        # TODO
        pass

    def lookup_job(self, molequeue_id, timeout=None):
        """Send a ``lookupJob`` request for *molequeue_id*.

        Returns the result of ``JsonRpc.json_to_job`` applied to the reply,
        or None if no reply arrives within *timeout* seconds.

        Raises:
            JobInformationException: if the reply contains an ``error``
                member.
        """
        params = {'moleQueueId': molequeue_id}
        response = self._call('lookupJob', params, timeout)

        # Timeout
        if response is None:
            return None

        # if an error occurred then throw an exception
        if 'error' in response:
            error = response['error']
            # 'data' is an optional member of a JSON-RPC error object, so do
            # not fail with KeyError when the server leaves it out.
            raise JobInformationException(response['id'],
                                          error.get('data'),
                                          error['code'],
                                          error['message'])

        return JsonRpc.json_to_job(response)

    def _call(self, method, params, timeout):
        """Send one request and block for its reply (None on timeout)."""
        packet_id = self._next_packet_id()
        jsonrpc = JsonRpc.generate_request(packet_id, method, params)
        self._send_request(packet_id, jsonrpc)
        return self._wait_for_response(packet_id, timeout)

    def _on_response(self, packet_id, msg):
        """Store *msg* as the reply for *packet_id* and wake any waiters.

        Replies whose id is not (or no longer) awaited are dropped.
        """
        with self._new_response_condition:
            # Check under the lock: a waiter that timed out may be removing
            # the entry at this very moment.
            if packet_id not in self._request_response_map:
                return
            self._request_response_map[packet_id] = msg
            # notify any threads waiting that their response may have arrived
            self._new_response_condition.notify_all()

    # TODO Convert raw JSON into a Python class
    def _on_notification(self, msg):
        """Pass *msg* to every registered notification callback."""
        for callback in self._notification_callbacks:
            callback(msg)

    # Testing only method. Kill the server application if allowed.
    def _send_rpc_kill_request(self, timeout=None):
        """Send an ``rpcKill`` request.

        Returns True if the reply reports success, False if it does not,
        and None if no reply arrives within *timeout* seconds.
        """
        response = self._call('rpcKill', {}, timeout)

        # Timeout
        if response is None:
            return None

        if 'result' not in response:
            return False
        result = response['result']
        # Equality (not identity) is kept so that a numeric 1 is still
        # accepted exactly as before.
        return 'success' in result and result['success'] == True  # noqa: E712

    def _next_packet_id(self):
        """Return a fresh packet id; safe to call from several threads."""
        with self._packet_id_lock:
            self._current_packet_id += 1
            return self._current_packet_id

    def _send_request(self, packet_id, jsonrpc):
        """Register *packet_id* as pending, then send and flush *jsonrpc*."""
        # add to request map so we know we are waiting on response for this
        # packet id
        with self._new_response_condition:
            self._request_response_map[packet_id] = None
        self.stream.send(str(jsonrpc))
        self.stream.flush()

    def _wait_for_response(self, packet_id, timeout):
        """Block until the reply for *packet_id* arrives or time runs out.

        Args:
            packet_id: id previously registered by ``_send_request``.
            timeout: maximum number of seconds to wait, or None to wait
                indefinitely.

        Returns:
            The reply, or None on timeout.  The pending entry is removed in
            both cases.

        On KeyboardInterrupt the event loop is stopped and the interrupt is
        re-raised.
        """
        start = time.time()
        try:
            # The with-block guarantees the lock is released before the
            # KeyboardInterrupt handler joins the loop thread.
            with self._new_response_condition:
                while self._request_response_map[packet_id] is None:
                    # need to set a wait time otherwise the wait can't be
                    # interrupted.  See http://bugs.python.org/issue8844
                    wait_time = _MAX_WAIT_SECONDS
                    if timeout is not None:
                        remaining = timeout - (time.time() - start)
                        if remaining <= 0:
                            break
                        wait_time = min(remaining, _MAX_WAIT_SECONDS)
                    self._new_response_condition.wait(wait_time)

                return self._request_response_map.pop(packet_id)
        except KeyboardInterrupt:
            self.event_loop.stop()
            raise


def _on_recv(client, msg):
    """Decode the first frame of *msg* as JSON and dispatch it to *client*.

    Packets carrying an ``id`` member are replies to a request; everything
    else is treated as a notification.
    """
    jsonrpc = json.loads(msg[0])

    # reply to a request
    if 'id' in jsonrpc:
        client._on_response(jsonrpc['id'], jsonrpc)
    # this is a notification
    else:
        client._on_notification(jsonrpc)