1import zmq
2from zmq.eventloop import ioloop
3from zmq.eventloop.zmqstream import ZMQStream
4from threading import Thread
5from threading import Condition
6from threading import Lock
7from functools import partial
8import inspect
9import json
10import time
11import tempfile
12import sys
13
14from utils import underscore_to_camelcase
15from utils import camelcase_to_underscore
16from utils import JsonRpc
17import threading
18
19class JobState:
20  # Unknown status
21  UNKNOWN = -1,
22  # Initial state of job, should never be entered.
23  NONE = 0,
24  # Job has been accepted by the server and is being prepared (Writing input files, etc).
25  ACCEPTED = 1
26  # Job is being queued locally, either waiting for local execution or remote submission.
27  QUEUED_LOCAL = 2
28  # Job has been submitted to a remote queuing system.
29  SUBMITTED = 3
30  # Job is pending execution on a remote queuing system.
31  QUEUED_REMOTE = 4
32  # Job is running locally.
33  RUNNING_LOCAL = 5
34  # Job is running remotely.
35  RUNNING_REMOTE = 6
36  # Job has completed.
37  FINISHED = 7
38  # Job has been terminated at a user request.
39  CANCELED = 8
40  # Job has been terminated due to an error.
41  ERROR = 9
42
43class FilePath:
44  def __init__(self):
45    self.path = None
46
47class FileContents:
48  def __init__(self):
49    self.filename = None
50    self.contents = None
51
52class Job:
53  def __init__(self):
54    self.queue = None
55    self.program = None
56    self.description = ''
57    self.input_file = None
58    self.output_directory = None
59    self.local_working_directory = None
60    self.clean_remote_files = False
61    self.retrieve_output = True
62    self.clean_local_working_directory = False
63    self.hide_from_gui = False
64    self.popup_on_state_change = True
65    self.number_of_cores = 1
66    self.max_wall_time = -1
67
68  def job_state(self):
69    return self._job_state
70
71  def molequeue_id(self):
72    return self._mole_queue_id
73
74  def queue_id(self):
75    return self._queue_id
76
77class Queue:
78  def __init__(self):
79    self.name = None;
80    self.programs = [];
81
82class EventLoop(Thread):
83  def __init__(self, io_loop):
84    Thread.__init__(self)
85    self.io_loop = io_loop
86
87  def run(self):
88    self.io_loop.start()
89
90  def stop(self):
91    self.io_loop.stop()
92    self.join()
93
94class MoleQueueException(Exception):
95  """The base class of all MoleQueue exceptions """
96  pass
97
98class JobException(MoleQueueException):
99  def __init__(self, packet_id, code, message):
100    self.packet_id = packet_id
101    self.code = code
102    self.message = message
103
104class JobInformationException(MoleQueueException):
105  def __init__(self, packet_id, data, code, message):
106    self.packet_id = packet_id
107    self.data = data
108    self.code = code
109    self.message = message
110
111class Client:
112
113  def __init__(self):
114    self._current_packet_id = 0
115    self._request_response_map = {}
116    self._new_response_condition = Condition()
117    self._packet_id_lock = Lock()
118    self._notification_callbacks = []
119
120  def connect_to_server(self, server):
121    self.context = zmq.Context()
122    self.socket = self.context.socket(zmq.DEALER)
123
124    tmpdir = tempfile.gettempdir()
125    connection_string  = 'ipc://%s/%s_%s' %  (tmpdir, 'zmq', server)
126    self.socket.connect(connection_string)
127
128    io_loop = ioloop.IOLoop.instance()
129    self.stream = ZMQStream(self.socket, io_loop=io_loop)
130
131    # create partial function that has self as first argument
132    callback = partial(_on_recv, self)
133    self.stream.on_recv(callback)
134    self.event_loop = EventLoop(io_loop)
135    self.event_loop.start()
136
137  def disconnect(self):
138    self.stream.flush()
139    self.event_loop.stop()
140    self.socket.close()
141
142  def register_notification_callback(self, callback):
143    # check a valid function has been past
144    assert callable(callback)
145    self._notification_callbacks.append(callback)
146
147  def request_queue_list_update(self, timeout=None):
148    packet_id = self._next_packet_id()
149
150    jsonrpc = JsonRpc.generate_request(packet_id,
151                                       'listQueues',
152                                       None)
153
154    self._send_request(packet_id, jsonrpc)
155    response = self._wait_for_response(packet_id, timeout)
156
157    # Timeout
158    if response == None:
159      return None
160
161    queues = JsonRpc.json_to_queues(response)
162
163    return queues
164
165  def submit_job(self, request, timeout=None):
166    params = JsonRpc.object_to_json_params(request)
167    packet_id = self._next_packet_id()
168
169    jsonrpc = JsonRpc.generate_request(packet_id,
170                                       'submitJob',
171                                       params)
172
173    self._send_request(packet_id, jsonrpc)
174    response = self._wait_for_response(packet_id, timeout)
175
176    # Timeout
177    if response == None:
178      return None
179
180    # if we an error occurred then throw an exception
181    if 'error' in response:
182      exception = JobException(response['id'],
183                               response['error']['code'],
184                               response['error']['message'])
185      raise exception
186
187    # otherwise return the molequeue id
188    return response['result']['moleQueueId']
189
190  def cancel_job(self):
191    # TODO
192    pass
193
194  def lookup_job(self, molequeue_id, timeout=None):
195
196    params = {'moleQueueId': molequeue_id}
197
198    packet_id = self._next_packet_id()
199    jsonrpc = JsonRpc.generate_request(packet_id,
200                                      'lookupJob',
201                                      params)
202
203    self._send_request(packet_id, jsonrpc)
204    response = self._wait_for_response(packet_id, timeout)
205
206    # Timeout
207    if response == None:
208      return None
209
210    # if we an error occurred then throw an exception
211    if 'error' in response:
212      exception = JobInformationException(response['id'],
213                                          response['error']['data'],
214                                          response['error']['code'],
215                                          response['error']['message'])
216      raise exception
217
218    job = JsonRpc.json_to_job(response)
219
220    return job
221
222
223  def _on_response(self, packet_id, msg):
224    if packet_id in self._request_response_map:
225      self._new_response_condition.acquire()
226      self._request_response_map[packet_id] = msg
227      # notify any threads waiting that their response may have arrived
228      self._new_response_condition.notify_all()
229      self._new_response_condition.release()
230
231  # TODO Convert raw JSON into a Python class
232  def _on_notification(self, msg):
233    for callback in self._notification_callbacks:
234      callback(msg)
235
236  # Testing only method. Kill the server application if allowed.
237  def _send_rpc_kill_request(self, timeout=None):
238    params = {}
239    packet_id = self._next_packet_id()
240    jsonrpc = JsonRpc.generate_request(packet_id, 'rpcKill', params)
241    self._send_request(packet_id, jsonrpc)
242    response = self._wait_for_response(packet_id, timeout)
243
244    # Timeout
245    if response == None:
246      return None
247
248    if 'result' in response and 'success' in response['result'] and response['result']['success'] == True:
249      return True
250    return False
251
252  def _next_packet_id(self):
253    with self._packet_id_lock:
254      self._current_packet_id += 1
255      next = self._current_packet_id
256    return next
257
258  def _send_request(self, packet_id, jsonrpc):
259    # add to request map so we know we are waiting on  response for this packet
260    # id
261    self._request_response_map[packet_id] = None
262    self.stream.send(str(jsonrpc))
263    self.stream.flush()
264
265  def _wait_for_response(self, packet_id, timeout):
266    try:
267      start = time.time()
268      # wait for the response to come in
269      self._new_response_condition.acquire()
270      while self._request_response_map[packet_id] == None:
271        # need to set a wait time otherwise the wait can't be interrupted
272        # See http://bugs.python.org/issue8844
273        wait_time = sys.maxint
274        if timeout != None:
275          wait_time = timeout - (time.time() - start)
276          if wait_time <= 0:
277            break;
278        self._new_response_condition.wait(wait_time)
279
280      response = self._request_response_map.pop(packet_id)
281
282      self._new_response_condition.release()
283
284      return response
285    except KeyboardInterrupt:
286      self.event_loop.stop()
287      raise
288
289def _on_recv(client, msg):
290  jsonrpc = json.loads(msg[0])
291
292  # reply to a request
293  if 'id' in jsonrpc:
294    packet_id = jsonrpc['id']
295    client._on_response(packet_id, jsonrpc)
296  # this is a notification
297  else:
298    client._on_notification(jsonrpc)
299