1# -*- coding: utf-8 -*- 2# Copyright 2016 Google Inc. All Rights Reserved. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15"""Thread message classes. 16 17Messages are added to the status queue. 18""" 19 20from __future__ import absolute_import 21from __future__ import print_function 22from __future__ import division 23from __future__ import unicode_literals 24 25import os 26import threading 27 28from apitools.base.py.exceptions import Error as apitools_service_error 29from six.moves.http_client import error as six_service_error 30 31 32class StatusMessage(object): 33 """General StatusMessage class. 34 35 All Message classes inherit this StatusMessage class. 36 """ 37 38 def __init__(self, message_time, process_id=None, thread_id=None): 39 """Creates a Message. 40 41 Args: 42 message_time: Time that this message was created (since Epoch). 43 process_id: Process ID that produced this message (overridable for 44 testing). 45 thread_id: Thread ID that produced this message (overridable for testing). 46 """ 47 self.time = message_time 48 self.process_id = process_id or os.getpid() 49 self.thread_id = thread_id or threading.current_thread().ident 50 51 def __str__(self): 52 """Returns a string with a valid constructor for this message.""" 53 return ( 54 '%s(%s, process_id=%s, thread_id=%s)' % 55 (self.__class__.__name__, self.time, self.process_id, self.thread_id)) 56 57 58class RetryableErrorMessage(StatusMessage): 59 """Message class for retryable errors encountered by the JSON API. 60 61 This class contains information about the retryable error encountered to 62 report to analytics collection and to display in the UI. 63 """ 64 65 def __init__(self, 66 exception, 67 message_time, 68 num_retries=0, 69 total_wait_sec=0, 70 process_id=None, 71 thread_id=None): 72 """Creates a RetryableErrorMessage. 73 74 Args: 75 exception: The retryable error that was thrown. 76 message_time: Float representing when message was created (seconds since 77 Epoch). 78 num_retries: The number of retries consumed so far. 79 total_wait_sec: The total amount of time waited so far in retrying. 80 process_id: Process ID that produced this message (overridable for 81 testing). 82 thread_id: Thread ID that produced this message (overridable for testing). 83 """ 84 super(RetryableErrorMessage, self).__init__(message_time, 85 process_id=process_id, 86 thread_id=thread_id) 87 88 self.error_type = exception.__class__.__name__ 89 # The socket module error class names aren't descriptive enough, so we 90 # make the error_type more specific. Standard Python uses the module name 91 # 'socket' while PyPy uses '_socket' instead. 92 if exception.__class__.__module__ in ('socket', '_socket'): 93 self.error_type = 'Socket' + exception.__class__.__name__.capitalize() 94 95 if (isinstance(exception, apitools_service_error) or 96 isinstance(exception, six_service_error)): 97 self.is_service_error = True 98 else: 99 self.is_service_error = False 100 101 # The number of retries consumed to display to the user. 102 self.num_retries = num_retries 103 104 # The total amount of time waited on the request to display to the user. 105 self.total_wait_sec = total_wait_sec 106 107 def __str__(self): 108 """Returns a string with a valid constructor for this message.""" 109 return ('%s(%s(), num_retries=%s, total_wait_sec=%s, ' 110 'time=%s, process_id=%s, thread_id=%s)' % 111 (self.__class__.__name__, self.error_type, self.num_retries, 112 self.total_wait_sec, self.time, self.process_id, self.thread_id)) 113 114 115class FinalMessage(StatusMessage): 116 """Creates a FinalMessage. 117 118 A FinalMessage simply indicates that we have finished our operation. 119 """ 120 121 def __init__(self, message_time): 122 """Creates a FinalMessage. 123 124 Args: 125 message_time: Float representing when message was created (seconds since 126 Epoch). 127 """ 128 super(FinalMessage, self).__init__(message_time) 129 130 131class MetadataMessage(StatusMessage): 132 """Creates a MetadataMessage. 133 134 A MetadataMessage simply indicates that a metadata operation on a given object 135 has been successfully done. The only passed argument is the time when such 136 operation has finished. 137 """ 138 139 def __init__(self, message_time): 140 """Creates a MetadataMessage. 141 142 Args: 143 message_time: Float representing when message was created (seconds since 144 Epoch). 145 """ 146 super(MetadataMessage, self).__init__(message_time) 147 148 149class FileMessage(StatusMessage): 150 """Marks start or end of an operation for a file, cloud object or component. 151 152 This class contains general information about each file/component. With that, 153 information such as total size and estimated time remaining may be calculated 154 beforehand. 155 """ 156 157 # Enum message types 158 FILE_DOWNLOAD = 1 159 FILE_UPLOAD = 2 160 FILE_CLOUD_COPY = 3 161 FILE_LOCAL_COPY = 4 162 FILE_DAISY_COPY = 5 163 FILE_REWRITE = 6 164 FILE_HASH = 7 165 COMPONENT_TO_UPLOAD = 8 166 # EXISTING_COMPONENT describes a component that already exists. The field 167 # finished does not apply quite well for it, but the convention used by the UI 168 # is to process the component alongside FileMessages from other components 169 # when finished==False, and to ignore a FileMessage made for 170 # EXISTING_COMPONENT when finished==True. 171 EXISTING_COMPONENT = 9 172 COMPONENT_TO_DOWNLOAD = 10 173 EXISTING_OBJECT_TO_DELETE = 11 174 175 def __init__(self, 176 src_url, 177 dst_url, 178 message_time, 179 size=None, 180 finished=False, 181 component_num=None, 182 message_type=None, 183 bytes_already_downloaded=None, 184 process_id=None, 185 thread_id=None): 186 """Creates a FileMessage. 187 188 Args: 189 src_url: FileUrl/CloudUrl representing the source file. 190 dst_url: FileUrl/CloudUrl representing the destination file. 191 message_time: Float representing when message was created (seconds since 192 Epoch). 193 size: Total size of this file/component, in bytes. 194 finished: Boolean to indicate whether this is starting or finishing 195 a file/component transfer. 196 component_num: Component number, if dealing with a component. 197 message_type: Type of the file/component. 198 bytes_already_downloaded: Specific field for resuming downloads. When 199 starting a component download, it tells how many bytes were already 200 downloaded. 201 process_id: Process ID that produced this message (overridable for 202 testing). 203 thread_id: Thread ID that produced this message (overridable for testing). 204 """ 205 206 super(FileMessage, self).__init__(message_time, 207 process_id=process_id, 208 thread_id=thread_id) 209 self.src_url = src_url 210 self.dst_url = dst_url 211 self.size = size 212 self.component_num = component_num 213 self.finished = finished 214 self.message_type = message_type 215 self.bytes_already_downloaded = bytes_already_downloaded 216 217 def __str__(self): 218 """Returns a string with a valid constructor for this message.""" 219 return ('%s(\'%s\', \'%s\', %s, size=%s, finished=%s, component_num=%s, ' 220 'message_type=%s, bytes_already_downloaded=%s, process_id=%s, ' 221 'thread_id=%s)' % 222 (self.__class__.__name__, self.src_url, self.dst_url, self.time, 223 self.size, self.finished, self.component_num, self.message_type, 224 self.bytes_already_downloaded, self.process_id, self.thread_id)) 225 226 227class ProgressMessage(StatusMessage): 228 """Message class for a file/object/component progress. 229 230 This class contains specific information about the current progress of a file, 231 cloud object or single component. 232 """ 233 234 def __init__(self, 235 size, 236 processed_bytes, 237 src_url, 238 message_time, 239 dst_url=None, 240 component_num=None, 241 operation_name=None, 242 process_id=None, 243 thread_id=None): 244 """Creates a ProgressMessage. 245 246 Args: 247 size: Integer for total size of this file/component, in bytes. 248 processed_bytes: Integer for number of bytes already processed from that 249 specific component, which means processed_bytes <= size. 250 src_url: FileUrl/CloudUrl representing the source file. 251 message_time: Float representing when message was created (seconds since 252 Epoch). 253 dst_url: FileUrl/CloudUrl representing the destination file, or None 254 for unary operations like hashing. 255 component_num: Indicates the component number, if any. 256 operation_name: Name of the operation that is being made over that 257 component. 258 process_id: Process ID that produced this message (overridable for 259 testing). 260 thread_id: Thread ID that produced this message (overridable for testing). 261 """ 262 super(ProgressMessage, self).__init__(message_time) 263 self.size = size 264 self.processed_bytes = processed_bytes 265 self.component_num = component_num 266 self.src_url = src_url 267 self.dst_url = dst_url 268 self.finished = (size == processed_bytes) 269 self.operation_name = operation_name 270 271 def __str__(self): 272 """Returns a string with a valid constructor for this message.""" 273 # For a valid constructor, None should not be quoted. 274 dst_url_string = ('\'%s\'' % self.dst_url) if self.dst_url else None 275 operation_name_string = ( 276 '\'%s\'' % self.operation_name) if self.operation_name else None 277 return ('%s(%s, %s, \'%s\', %s, dst_url=%s, component_num=%s, ' 278 'operation_name=%s, process_id=%s, thread_id=%s)' % 279 (self.__class__.__name__, self.size, self.processed_bytes, 280 self.src_url, self.time, dst_url_string, self.component_num, 281 operation_name_string, self.process_id, self.thread_id)) 282 283 284class SeekAheadMessage(StatusMessage): 285 """Message class for results obtained by SeekAheadThread(). 286 287 It estimates the number of objects and total size in case the task_queue 288 cannot hold all tasks at once (only used in large operations). 289 This class contains information about all the objects yet to be processed. 290 """ 291 292 def __init__(self, num_objects, size, message_time): 293 """Creates a SeekAheadMessage. 294 295 Args: 296 num_objects: Number of total objects that the SeekAheadThread estimates. 297 size: Total size corresponding to the sum of the size of objects iterated 298 by SeekAheadThread. 299 message_time: Float representing when message was created (seconds since 300 Epoch). 301 """ 302 super(SeekAheadMessage, self).__init__(message_time) 303 self.num_objects = num_objects 304 self.size = size 305 306 def __str__(self): 307 """Returns a string with a valid constructor for this message.""" 308 return ('%s(%s, %s, %s, process_id=%s, thread_id=%s)' % 309 (self.__class__.__name__, self.num_objects, self.size, self.time, 310 self.process_id, self.thread_id)) 311 312 313class ProducerThreadMessage(StatusMessage): 314 """Message class for results obtained by calculations made on ProducerThread. 315 316 It estimates the number of objects and total size currently dealty by 317 task_queue. If the task_queue cannot support all objects at once, the 318 SeekAheadThread will be responsible for sending an accurate message. 319 """ 320 321 def __init__(self, num_objects, size, message_time, finished=False): 322 """Creates a SeekAheadMessage. 323 324 Args: 325 num_objects: Number of total objects that the task_queue has. 326 size: Total size corresponding to the sum of the size of objects iterated 327 by the task_queue 328 message_time: Float representing when message was created (seconds since 329 Epoch). 330 finished: Boolean to indicate whether this is the final message from the 331 ProducerThread. The difference is that this message displays 332 the correct total size and number of objects, whereas the 333 previous ones were periodic (on the number of files) updates. 334 """ 335 super(ProducerThreadMessage, self).__init__(message_time) 336 self.num_objects = num_objects 337 self.size = size 338 self.finished = finished 339 340 def __str__(self): 341 """Returns a string with a valid constructor for this message.""" 342 return ('%s(%s, %s, %s, finished=%s)' % 343 (self.__class__.__name__, self.num_objects, self.size, self.time, 344 self.finished)) 345 346 347class PerformanceSummaryMessage(StatusMessage): 348 """Message class to log PerformanceSummary parameters. 349 350 This class acts as a relay between a multiprocess/multithread situation and 351 the global status queue, from which the PerformanceSummary info gets consumed. 352 """ 353 354 def __init__(self, message_time, uses_slice): 355 """Creates a PerformanceSummaryMessage. 356 357 Args: 358 message_time: Float representing when message was created (seconds since 359 Epoch). 360 uses_slice: True if the command uses slice parallelism. 361 """ 362 super(PerformanceSummaryMessage, self).__init__(message_time, 363 process_id=None, 364 thread_id=None) 365 self.uses_slice = uses_slice 366 367 def __str__(self): 368 """Returns a string with a valid constructor for this message.""" 369 return ('%s(%s, %s)' % 370 (self.__class__.__name__, self.time, self.uses_slice)) 371