1# -*- coding: utf-8 -*-
2# Copyright 2016 Google Inc. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Thread message classes.
16
17Messages are added to the status queue.
18"""
19
20from __future__ import absolute_import
21from __future__ import print_function
22from __future__ import division
23from __future__ import unicode_literals
24
25import os
26import threading
27
28from apitools.base.py.exceptions import Error as apitools_service_error
29from six.moves.http_client import error as six_service_error
30
31
32class StatusMessage(object):
33  """General StatusMessage class.
34
35  All Message classes inherit this StatusMessage class.
36  """
37
38  def __init__(self, message_time, process_id=None, thread_id=None):
39    """Creates a Message.
40
41    Args:
42      message_time: Time that this message was created (since Epoch).
43      process_id: Process ID that produced this message (overridable for
44          testing).
45      thread_id: Thread ID that produced this message (overridable for testing).
46    """
47    self.time = message_time
48    self.process_id = process_id or os.getpid()
49    self.thread_id = thread_id or threading.current_thread().ident
50
51  def __str__(self):
52    """Returns a string with a valid constructor for this message."""
53    return (
54        '%s(%s, process_id=%s, thread_id=%s)' %
55        (self.__class__.__name__, self.time, self.process_id, self.thread_id))
56
57
58class RetryableErrorMessage(StatusMessage):
59  """Message class for retryable errors encountered by the JSON API.
60
61  This class contains information about the retryable error encountered to
62  report to analytics collection and to display in the UI.
63  """
64
65  def __init__(self,
66               exception,
67               message_time,
68               num_retries=0,
69               total_wait_sec=0,
70               process_id=None,
71               thread_id=None):
72    """Creates a RetryableErrorMessage.
73
74    Args:
75      exception: The retryable error that was thrown.
76      message_time: Float representing when message was created (seconds since
77          Epoch).
78      num_retries: The number of retries consumed so far.
79      total_wait_sec: The total amount of time waited so far in retrying.
80      process_id: Process ID that produced this message (overridable for
81          testing).
82      thread_id: Thread ID that produced this message (overridable for testing).
83    """
84    super(RetryableErrorMessage, self).__init__(message_time,
85                                                process_id=process_id,
86                                                thread_id=thread_id)
87
88    self.error_type = exception.__class__.__name__
89    # The socket module error class names aren't descriptive enough, so we
90    # make the error_type more specific. Standard Python uses the module name
91    # 'socket' while PyPy uses '_socket' instead.
92    if exception.__class__.__module__ in ('socket', '_socket'):
93      self.error_type = 'Socket' + exception.__class__.__name__.capitalize()
94
95    if (isinstance(exception, apitools_service_error) or
96        isinstance(exception, six_service_error)):
97      self.is_service_error = True
98    else:
99      self.is_service_error = False
100
101    # The number of retries consumed to display to the user.
102    self.num_retries = num_retries
103
104    # The total amount of time waited on the request to display to the user.
105    self.total_wait_sec = total_wait_sec
106
107  def __str__(self):
108    """Returns a string with a valid constructor for this message."""
109    return ('%s(%s(), num_retries=%s, total_wait_sec=%s, '
110            'time=%s, process_id=%s, thread_id=%s)' %
111            (self.__class__.__name__, self.error_type, self.num_retries,
112             self.total_wait_sec, self.time, self.process_id, self.thread_id))
113
114
115class FinalMessage(StatusMessage):
116  """Creates a FinalMessage.
117
118  A FinalMessage simply indicates that we have finished our operation.
119  """
120
121  def __init__(self, message_time):
122    """Creates a FinalMessage.
123
124    Args:
125      message_time: Float representing when message was created (seconds since
126          Epoch).
127    """
128    super(FinalMessage, self).__init__(message_time)
129
130
131class MetadataMessage(StatusMessage):
132  """Creates a MetadataMessage.
133
134  A MetadataMessage simply indicates that a metadata operation on a given object
135  has been successfully done. The only passed argument is the time when such
136  operation has finished.
137  """
138
139  def __init__(self, message_time):
140    """Creates a MetadataMessage.
141
142    Args:
143      message_time: Float representing when message was created (seconds since
144          Epoch).
145    """
146    super(MetadataMessage, self).__init__(message_time)
147
148
149class FileMessage(StatusMessage):
150  """Marks start or end of an operation for a file, cloud object or component.
151
152  This class contains general information about each file/component. With that,
153  information such as total size and estimated time remaining may be calculated
154  beforehand.
155  """
156
157  # Enum message types
158  FILE_DOWNLOAD = 1
159  FILE_UPLOAD = 2
160  FILE_CLOUD_COPY = 3
161  FILE_LOCAL_COPY = 4
162  FILE_DAISY_COPY = 5
163  FILE_REWRITE = 6
164  FILE_HASH = 7
165  COMPONENT_TO_UPLOAD = 8
166  # EXISTING_COMPONENT describes a component that already exists. The field
167  # finished does not apply quite well for it, but the convention used by the UI
168  # is to process the component alongside FileMessages from other components
169  # when finished==False, and to ignore a FileMessage made for
170  # EXISTING_COMPONENT when finished==True.
171  EXISTING_COMPONENT = 9
172  COMPONENT_TO_DOWNLOAD = 10
173  EXISTING_OBJECT_TO_DELETE = 11
174
175  def __init__(self,
176               src_url,
177               dst_url,
178               message_time,
179               size=None,
180               finished=False,
181               component_num=None,
182               message_type=None,
183               bytes_already_downloaded=None,
184               process_id=None,
185               thread_id=None):
186    """Creates a FileMessage.
187
188    Args:
189      src_url: FileUrl/CloudUrl representing the source file.
190      dst_url: FileUrl/CloudUrl representing the destination file.
191      message_time: Float representing when message was created (seconds since
192          Epoch).
193      size: Total size of this file/component, in bytes.
194      finished: Boolean to indicate whether this is starting or finishing
195          a file/component transfer.
196      component_num: Component number, if dealing with a component.
197      message_type: Type of the file/component.
198      bytes_already_downloaded: Specific field for resuming downloads. When
199          starting a component download, it tells how many bytes were already
200          downloaded.
201      process_id: Process ID that produced this message (overridable for
202          testing).
203      thread_id: Thread ID that produced this message (overridable for testing).
204    """
205
206    super(FileMessage, self).__init__(message_time,
207                                      process_id=process_id,
208                                      thread_id=thread_id)
209    self.src_url = src_url
210    self.dst_url = dst_url
211    self.size = size
212    self.component_num = component_num
213    self.finished = finished
214    self.message_type = message_type
215    self.bytes_already_downloaded = bytes_already_downloaded
216
217  def __str__(self):
218    """Returns a string with a valid constructor for this message."""
219    return ('%s(\'%s\', \'%s\', %s, size=%s, finished=%s, component_num=%s, '
220            'message_type=%s, bytes_already_downloaded=%s, process_id=%s, '
221            'thread_id=%s)' %
222            (self.__class__.__name__, self.src_url, self.dst_url, self.time,
223             self.size, self.finished, self.component_num, self.message_type,
224             self.bytes_already_downloaded, self.process_id, self.thread_id))
225
226
227class ProgressMessage(StatusMessage):
228  """Message class for a file/object/component progress.
229
230  This class contains specific information about the current progress of a file,
231  cloud object or single component.
232  """
233
234  def __init__(self,
235               size,
236               processed_bytes,
237               src_url,
238               message_time,
239               dst_url=None,
240               component_num=None,
241               operation_name=None,
242               process_id=None,
243               thread_id=None):
244    """Creates a ProgressMessage.
245
246    Args:
247      size: Integer for total size of this file/component, in bytes.
248      processed_bytes: Integer for number of bytes already processed from that
249          specific component, which means processed_bytes <= size.
250      src_url: FileUrl/CloudUrl representing the source file.
251      message_time: Float representing when message was created (seconds since
252          Epoch).
253      dst_url: FileUrl/CloudUrl representing the destination file, or None
254          for unary operations like hashing.
255      component_num: Indicates the component number, if any.
256      operation_name: Name of the operation that is being made over that
257          component.
258      process_id: Process ID that produced this message (overridable for
259          testing).
260      thread_id: Thread ID that produced this message (overridable for testing).
261    """
262    super(ProgressMessage, self).__init__(message_time)
263    self.size = size
264    self.processed_bytes = processed_bytes
265    self.component_num = component_num
266    self.src_url = src_url
267    self.dst_url = dst_url
268    self.finished = (size == processed_bytes)
269    self.operation_name = operation_name
270
271  def __str__(self):
272    """Returns a string with a valid constructor for this message."""
273    # For a valid constructor, None should not be quoted.
274    dst_url_string = ('\'%s\'' % self.dst_url) if self.dst_url else None
275    operation_name_string = (
276        '\'%s\'' % self.operation_name) if self.operation_name else None
277    return ('%s(%s, %s, \'%s\', %s, dst_url=%s, component_num=%s, '
278            'operation_name=%s, process_id=%s, thread_id=%s)' %
279            (self.__class__.__name__, self.size, self.processed_bytes,
280             self.src_url, self.time, dst_url_string, self.component_num,
281             operation_name_string, self.process_id, self.thread_id))
282
283
284class SeekAheadMessage(StatusMessage):
285  """Message class for results obtained by SeekAheadThread().
286
287  It estimates the number of objects and total size in case the task_queue
288  cannot hold all tasks at once (only used in large operations).
289  This class contains information about all the objects yet to be processed.
290  """
291
292  def __init__(self, num_objects, size, message_time):
293    """Creates a SeekAheadMessage.
294
295    Args:
296      num_objects: Number of total objects that the SeekAheadThread estimates.
297      size: Total size corresponding to the sum of the size of objects iterated
298          by SeekAheadThread.
299      message_time: Float representing when message was created (seconds since
300          Epoch).
301    """
302    super(SeekAheadMessage, self).__init__(message_time)
303    self.num_objects = num_objects
304    self.size = size
305
306  def __str__(self):
307    """Returns a string with a valid constructor for this message."""
308    return ('%s(%s, %s, %s, process_id=%s, thread_id=%s)' %
309            (self.__class__.__name__, self.num_objects, self.size, self.time,
310             self.process_id, self.thread_id))
311
312
313class ProducerThreadMessage(StatusMessage):
314  """Message class for results obtained by calculations made on ProducerThread.
315
316  It estimates the number of objects and total size currently dealty by
317  task_queue. If the task_queue cannot support all objects at once, the
318  SeekAheadThread will be responsible for sending an accurate message.
319  """
320
321  def __init__(self, num_objects, size, message_time, finished=False):
322    """Creates a SeekAheadMessage.
323
324    Args:
325      num_objects: Number of total objects that the task_queue has.
326      size: Total size corresponding to the sum of the size of objects iterated
327          by the task_queue
328      message_time: Float representing when message was created (seconds since
329          Epoch).
330      finished: Boolean to indicate whether this is the final message from the
331          ProducerThread. The difference is that this message displays
332          the correct total size and number of objects, whereas the
333          previous ones were periodic (on the number of files) updates.
334    """
335    super(ProducerThreadMessage, self).__init__(message_time)
336    self.num_objects = num_objects
337    self.size = size
338    self.finished = finished
339
340  def __str__(self):
341    """Returns a string with a valid constructor for this message."""
342    return ('%s(%s, %s, %s, finished=%s)' %
343            (self.__class__.__name__, self.num_objects, self.size, self.time,
344             self.finished))
345
346
347class PerformanceSummaryMessage(StatusMessage):
348  """Message class to log PerformanceSummary parameters.
349
350  This class acts as a relay between a multiprocess/multithread situation and
351  the global status queue, from which the PerformanceSummary info gets consumed.
352  """
353
354  def __init__(self, message_time, uses_slice):
355    """Creates a PerformanceSummaryMessage.
356
357    Args:
358      message_time: Float representing when message was created (seconds since
359          Epoch).
360      uses_slice: True if the command uses slice parallelism.
361    """
362    super(PerformanceSummaryMessage, self).__init__(message_time,
363                                                    process_id=None,
364                                                    thread_id=None)
365    self.uses_slice = uses_slice
366
367  def __str__(self):
368    """Returns a string with a valid constructor for this message."""
369    return ('%s(%s, %s)' %
370            (self.__class__.__name__, self.time, self.uses_slice))
371