# -*- coding: utf-8 -*-
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

"""
Low-level classes for managing data transfer.
"""
from __future__ import print_function

from collections import namedtuple, Counter
from concurrent.futures import ThreadPoolExecutor
import logging
import multiprocessing
import operator
import os
import signal
import sys
import threading
import time
import uuid

from .exceptions import DatalakeIncompleteTransferException

logger = logging.getLogger(__name__)


class StateManager(object):
    """
    Manages state for any hashable object.

    When tracking multiple files and their chunks, each file/chunk can be in
    any valid state for that particular type.

    At the simplest level, we need to set and retrieve an object's current
    state, while only allowing valid states to be used. In addition, we also
    need statistics about a group of objects (are all objects in one state?
    how many objects are in each available state?).

    Parameters
    ----------
    states: list of valid states
        Managed objects can only use these defined states.

    Examples
    --------
    >>> StateManager('draft', 'review', 'complete')  # doctest: +SKIP
    <StateManager: draft=0 review=0 complete=0>
    >>> mgr = StateManager('off', 'on')
    >>> mgr['foo'] = 'on'
    >>> mgr['bar'] = 'off'
    >>> mgr['quux'] = 'on'
    >>> mgr  # doctest: +SKIP
    <StateManager: off=1 on=2>
    >>> mgr.contains_all('on')
    False
    >>> mgr['bar'] = 'on'
    >>> mgr.contains_all('on')
    True
    >>> mgr.contains_none('off')
    True

    Internal class used by `ADLTransferClient`.
    """
    def __init__(self, *states):
        self._states = {state: set() for state in states}
        self._objects = {}

    @property
    def states(self):
        return list(self._states)

    @property
    def objects(self):
        return list(self._objects)

    def __iter__(self):
        return iter(self._objects.items())

    def __getitem__(self, obj):
        return self._objects[obj]

    def __setitem__(self, obj, state):
        if obj in self._objects:
            # Remove the object from its previous state before reassigning
            self._states[self._objects[obj]].discard(obj)
        self._states[state].add(obj)
        self._objects[obj] = state

    def contains_all(self, state):
        """ Return whether all managed objects are in the given state """
        objs = self._states[state]
        return len(objs) > 0 and len(self.objects) - len(objs) == 0

    def contains_none(self, *states):
        """ Return whether no managed objects are in the given states """
        return all(len(self._states[state]) == 0 for state in states)

    def __str__(self):
        status = " ".join(
            "%s=%d" % (s, len(self._states[s])) for s in self._states)
        return "<StateManager: " + status + ">"

    __repr__ = __str__


# Named tuples used to serialize client progress
File = namedtuple('File', 'src dst state length chunks exception')
Chunk = namedtuple('Chunk', 'name state offset expected actual exception')


class ADLTransferClient(object):
    """
    Client for transferring data from/to Azure DataLake Store

    This is intended as the underlying class for `ADLDownloader` and
    `ADLUploader`. If necessary, it can be used directly for additional
    control.
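
    A hedged sketch of direct use (assumes an authenticated
    `AzureDLFileSystem` instance named `adl`, a conforming `transfer`
    callable `my_transfer`, and an `AzureDLPath` destination `dst_path`;
    when `chunked`, the destination must expose `.name` and `.parent`.
    Names and sizes are illustrative only):

    >>> client = ADLTransferClient(adl, transfer=my_transfer)  # doctest: +SKIP
    >>> client.submit('local/src.bin', dst_path, length=2**20)  # doctest: +SKIP
    >>> client.run()  # doctest: +SKIP
    >>> client.successful  # doctest: +SKIP
    True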

    Parameters
    ----------
    adlfs: ADL filesystem instance
    transfer: callable
        Function or callable object invoked when transferring chunks. See
        ``Function Signatures``.
    merge: callable [None]
        Function or callable object invoked when merging chunks. For each
        file containing only one chunk, no merge function will be called,
        even if provided. If None, then merging is skipped. See
        ``Function Signatures``.
    nthreads: int [None]
        Number of threads to use (minimum is 1). If None, uses the number of
        cores.
    chunksize: int [2**28]
        Number of bytes for a chunk. Large files are split into chunks. Files
        smaller than this number will always be transferred in a single
        thread.
    buffersize: int [2**25]
        Number of bytes for the internal buffer. The buffer cannot be bigger
        than a chunk and cannot be smaller than a block.
    blocksize: int [2**25]
        Number of bytes for a block. Within each chunk, we write a smaller
        block for each API call. This block cannot be bigger than a chunk.
    chunked: bool [True]
        If set, each transferred chunk is stored in a separate file until
        chunks are gathered into a single file. Otherwise, each chunk will be
        written into the same destination file.
    unique_temporary: bool [True]
        If set, transferred chunks are written into a unique temporary
        directory.
    delimiter: byte(s) or None
        If set, blocks are transferred using the delimiter, and files are
        split on the delimiter for transferring.
    parent: ADLDownloader, ADLUploader or None
        In typical usage, the transfer client is created in the context of an
        upload or download, which can be persisted between sessions.
    verbose: bool [False]
        If set, the transfer status is printed to stdout as chunks complete.
    progress_callback: callable [None]
        Callback for progress with signature ``fn(current, total)``, where
        `current` is the number of bytes transferred so far and `total` is
        the total number of bytes to be transferred.
    timeout: int [0]
        Time in seconds before the transfer stops and raises an exception if
        it is still in progress. The default value 0 means no timeout.

    Temporary Files
    ---------------

    When a merge step is available, the client will write chunks to temporary
    files before merging. The exact temporary file looks like this in
    pseudo-BNF:

    >>> # {dirname}/{basename}.segments[.{unique_str}]/{basename}_{offset}

    Function Signatures
    -------------------

    To perform the actual work needed by the client, the user must pass in
    two callables, `transfer` and `merge`. If `merge` is not provided, then
    the merge step will be skipped.

    The `transfer` callable has the function signature,
    `fn(adlfs, src, dst, offset, size, buffersize, blocksize, shutdown_event)`.
    `adlfs` is the ADL filesystem instance. `src` and `dst` refer to the
    source and destination of the respective file transfer. `offset` is the
    location in `src` to read `size` bytes from. `buffersize` is the number
    of bytes used for internal buffering before transfer. `blocksize` is the
    number of bytes in a chunk to write at one time. The callable should
    return a 2-tuple `(nbytes, exception)`: the number of bytes transferred
    and an exception instance, or None on success.

    The `merge` callable has the function signature,
    `fn(adlfs, outfile, files, shutdown_event)`. `adlfs` is the ADL
    filesystem instance. `outfile` is the result of merging `files`. The
    client also passes an `overwrite` keyword argument when merging. The
    callable should return None on success, or an exception describing the
    failure.

    For both callables, `shutdown_event` is optional. In particular,
    `shutdown_event` is a `threading.Event` that is passed to the callable.
    The event will be set when a shutdown is requested. It is good practice
    to listen for this.
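
    A minimal sketch of conforming callables (hypothetical no-op
    implementations; the real ones used by the uploader/downloader live in
    `azure.datalake.store.multithread`):

    >>> def noop_transfer(adlfs, src, dst, offset, size, buffersize,
    ...                   blocksize, shutdown_event=None):
    ...     if shutdown_event is not None and shutdown_event.is_set():
    ...         return 0, None  # abandon work once shutdown is requested
    ...     return size, None   # pretend all `size` bytes were transferred
    >>> def noop_merge(adlfs, outfile, files, overwrite=False,
    ...                shutdown_event=None):
    ...     return None         # returning None signals a successful merge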

    Internal State
    --------------

    self._fstates: StateManager
        This captures the current state of each transferred file.
    self._files: dict
        Using a tuple of the file source/destination as the key, this
        dictionary stores the file metadata and all chunk states. The
        dictionary key is `(src, dst)` and the value is
        `dict(length, cstates, exception)`.
    self._chunks: dict
        Using a tuple of the chunk name/offset as the key, this dictionary
        stores the chunk metadata and has a reference to the chunk's parent
        file. The dictionary key is `(name, offset)` and the value is
        `dict(parent=(src, dst), expected, actual, exception)`.
    self._ffutures: dict
        Using a Future object as the key, this dictionary provides a reverse
        lookup for the file associated with the given future. The returned
        value is the file's primary key, `(src, dst)`.
    self._cfutures: dict
        Using a Future object as the key, this dictionary provides a reverse
        lookup for the chunk associated with the given future. The returned
        value is the chunk's primary key, `(name, offset)`.

    See Also
    --------
    azure.datalake.store.multithread.ADLDownloader
    azure.datalake.store.multithread.ADLUploader
    """

    def __init__(self, adlfs, transfer, merge=None, nthreads=None,
                 chunksize=2**28, blocksize=2**25, chunked=True,
                 unique_temporary=True, delimiter=None,
                 parent=None, verbose=False, buffersize=2**25,
                 progress_callback=None, timeout=0):
        self._adlfs = adlfs
        self._parent = parent
        self._transfer = transfer
        self._merge = merge
        self._nthreads = max(1, nthreads or multiprocessing.cpu_count())
        self._chunksize = chunksize
        self._buffersize = buffersize
        self._blocksize = blocksize
        self._chunked = chunked
        self._unique_temporary = unique_temporary
        self._unique_str = uuid.uuid4().hex
        self._progress_callback = progress_callback
        self._progress_lock = threading.Lock()
        self._timeout = timeout
        self.verbose = verbose

        # Internal state tracking files/chunks/futures
        self._progress_total_bytes = 0
        self._transfer_total_bytes = 0

        self._files = {}
        self._chunks = {}
        self._ffutures = {}
        self._cfutures = {}
        self._fstates = StateManager(
            'pending', 'transferring', 'merging', 'finished', 'cancelled',
            'errored')

    def submit(self, src, dst, length):
        """
        Split a given file into chunks.

        All submitted files/chunks start in the `pending` state until `run()`
        is called.
        """
        cstates = StateManager(
            'pending', 'running', 'finished', 'cancelled', 'errored')

        # Create unique temporary directory for each file
        if self._chunked:
            if self._unique_temporary:
                filename = "{}.segments.{}".format(dst.name, self._unique_str)
            else:
                filename = "{}.segments".format(dst.name)
            tmpdir = dst.parent / filename
        else:
            tmpdir = None

        # TODO: might need xrange support for py2
        offsets = range(0, length, self._chunksize)
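
        # Illustrative example: length=600 MiB with the default 256 MiB
        # chunksize yields offsets [0, 2**28, 2**29], i.e. two full chunks
        # plus one 88 MiB tail chunk.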

        # In the case of empty files, ensure that the initial offset of 0 is
        # properly added.
        if not offsets:
            if not length:
                offsets = [0]
            else:
                raise DatalakeIncompleteTransferException(
                    'Could not compute offsets for source: {}, with '
                    'destination: {} and expected length: {}.'.format(
                        src, dst, length))

        use_tmpdir = tmpdir is not None and len(offsets) > 1
        for offset in offsets:
            if use_tmpdir:
                name = tmpdir / "{}_{}".format(dst.name, offset)
            else:
                name = dst
            cstates[(name, offset)] = 'pending'
            self._chunks[(name, offset)] = {
                "parent": (src, dst),
                "expected": min(length - offset, self._chunksize),
                "actual": 0,
                "exception": None}
            logger.debug("Submitted %s, byte offset %d", name, offset)

        self._fstates[(src, dst)] = 'pending'
        self._files[(src, dst)] = {
            "length": length,
            "cstates": cstates,
            "exception": None}
        self._transfer_total_bytes += length

    def _start(self, src, dst):
        key = (src, dst)
        self._fstates[key] = 'transferring'
        cstates = self._files[key]['cstates']
        for obj in cstates.objects:
            name, offset = obj
            if cstates[obj] == 'finished':
                # Skip chunks already completed in a previous session
                continue
            cstates[obj] = 'running'
            future = self._pool.submit(
                self._transfer, self._adlfs, src, name, offset,
                self._chunks[obj]['expected'], self._buffersize,
                self._blocksize, shutdown_event=self._shutdown_event)
            self._cfutures[future] = obj
            future.add_done_callback(self._update)

    @property
    def active(self):
        """ Return whether the transfer is active """
        return not self._fstates.contains_none(
            'pending', 'transferring', 'merging')

    @property
    def successful(self):
        """
        Return whether the transfer completed successfully.

        Raises AssertionError if the transfer is still active.
        """
        assert not self.active
        return self._fstates.contains_all('finished')

    @property
    def progress(self):
        """ Return a summary of all transferred files/chunks """
        files = []
        for key in self._files:
            src, dst = key
            chunks = []
            for obj in self._files[key]['cstates'].objects:
                name, offset = obj
                chunks.append(Chunk(
                    name=name,
                    offset=offset,
                    state=self._files[key]['cstates'][obj],
                    expected=self._chunks[obj]['expected'],
                    actual=self._chunks[obj]['actual'],
                    exception=self._chunks[obj]['exception']))
            files.append(File(
                src=src,
                dst=dst,
                state=self._fstates[key],
                length=self._files[key]['length'],
                chunks=chunks,
                exception=self._files[key]['exception']))
        return files
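
    # Illustrative `progress` snapshot (hypothetical names and values) for
    # one file split into two chunks, mid-transfer:
    #
    #   [File(src='src.bin', dst='dst.bin', state='transferring',
    #         length=2**29,
    #         chunks=[Chunk(name='dst.bin_0', state='finished', offset=0,
    #                       expected=2**28, actual=2**28, exception=None),
    #                 Chunk(name='dst.bin_268435456', state='running',
    #                       offset=2**28, expected=2**28, actual=0,
    #                       exception=None)],
    #         exception=None)]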

    def _rename_file(self, src, dst, overwrite=False):
        """ Rename a file from file_name.inprogress to just file_name.
        Invoked once download of the file completes.

        Internal function used by `download`.
        """
        try:
            # Do a final check that no one created the destination file
            # while the download was occurring, in case the user did not
            # specify overwrite.
            if os.path.isfile(dst):
                if not overwrite:
                    raise FileExistsError(dst)
                os.remove(dst)
            os.rename(src, dst)
        except Exception as e:
            logger.error('Rename failed for source file: %r; %r', src, e)
            raise e

        logger.debug('Renamed %r to %r', src, dst)

    def _update_progress(self, length):
        if self._progress_callback is not None:
            with self._progress_lock:
                self._progress_total_bytes += length
                self._progress_callback(self._progress_total_bytes,
                                        self._transfer_total_bytes)

    def _update(self, future):
        if future in self._cfutures:
            obj = self._cfutures[future]
            parent = self._chunks[obj]['parent']
            cstates = self._files[parent]['cstates']
            src, dst = parent

            if future.cancelled():
                cstates[obj] = 'cancelled'
            elif future.exception():
                self._chunks[obj]['exception'] = repr(future.exception())
                cstates[obj] = 'errored'
            else:
                nbytes, exception = future.result()
                self._chunks[obj]['actual'] = nbytes
                self._chunks[obj]['exception'] = exception
                if exception:
                    cstates[obj] = 'errored'
                elif self._chunks[obj]['expected'] != nbytes:
                    name, offset = obj
                    cstates[obj] = 'errored'
                    exception = DatalakeIncompleteTransferException(
                        'chunk {}, offset {}: expected {} bytes, '
                        'transferred {} bytes'.format(
                            name, offset, self._chunks[obj]['expected'],
                            self._chunks[obj]['actual']))
                    self._chunks[obj]['exception'] = exception
                    logger.error("Incomplete transfer: %s -> %s, %s",
                                 src, dst, repr(exception))
                else:
                    cstates[obj] = 'finished'
                    self._update_progress(nbytes)

            if cstates.contains_all('finished'):
                logger.debug("Chunks transferred")
                if self._merge and len(cstates.objects) > 1:
                    logger.debug("Merging file: %s", self._fstates[parent])
                    self._fstates[parent] = 'merging'
                    merge_future = self._pool.submit(
                        self._merge, self._adlfs, dst,
                        [chunk for chunk, _ in sorted(
                            cstates.objects, key=operator.itemgetter(1))],
                        overwrite=self._parent._overwrite,
                        shutdown_event=self._shutdown_event)
                    self._ffutures[merge_future] = parent
                    merge_future.add_done_callback(self._update)
                else:
                    if not self._chunked and str(dst).endswith('.inprogress'):
                        logger.debug(
                            "Renaming file to remove .inprogress: %s",
                            self._fstates[parent])
                        self._fstates[parent] = 'merging'
                        self._rename_file(
                            dst, dst.replace('.inprogress', ''),
                            overwrite=self._parent._overwrite)
                        dst = dst.replace('.inprogress', '')

                    self._fstates[parent] = 'finished'
                    logger.info("Transferred %s -> %s", src, dst)
            elif cstates.contains_none('running', 'pending'):
                logger.error("Transfer failed: %s -> %s", src, dst)
                self._fstates[parent] = 'errored'
        elif future in self._ffutures:
            src, dst = self._ffutures[future]

            if future.cancelled():
                self._fstates[(src, dst)] = 'cancelled'
            elif future.exception():
                self._files[(src, dst)]['exception'] = repr(future.exception())
                self._fstates[(src, dst)] = 'errored'
            else:
                exception = future.result()
                self._files[(src, dst)]['exception'] = exception
                if exception:
                    self._fstates[(src, dst)] = 'errored'
                else:
                    self._fstates[(src, dst)] = 'finished'
                    logger.info("Transferred %s -> %s", src, dst)
            # TODO: Re-enable progress saving when a less IO intensive
            # solution is available. See issue:
            # https://github.com/Azure/azure-data-lake-store-python/issues/117
            # self.save()
        else:
            raise ValueError(
                "Illegal state: future {} found in neither file futures {} "
                "nor chunk futures {}".format(
                    future, self._ffutures, self._cfutures))
        if self.verbose:
            print('\b' * 200, self.status, end='')
            sys.stdout.flush()
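
    # Example `status` value mid-transfer (hypothetical counts):
    #   {'finished': 5, 'running': 4, 'pending': 3}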

    @property
    def status(self):
        """ Return a count of chunks in each state, across all files """
        counts = sum(
            (Counter(chunk.state for chunk in f.chunks)
             for f in self.progress), Counter())
        return dict(counts)

    def run(self, nthreads=None, monitor=True, before_start=None):
        # Apply any nthreads override before sizing the thread pool
        self._nthreads = nthreads or self._nthreads
        self._pool = ThreadPoolExecutor(self._nthreads)
        self._shutdown_event = threading.Event()
        self._ffutures = {}
        self._cfutures = {}

        for src, dst in self._files:
            if before_start:
                before_start(self._adlfs, src, dst)
            self._start(src, dst)

        if monitor:
            self.monitor(timeout=self._timeout)
            has_errors = False
            error_list = []
            for f in self.progress:
                for chunk in f.chunks:
                    if chunk.state == 'finished':
                        continue
                    if chunk.exception:
                        error_string = '{} -> {}, chunk {} {}: {}, {}'.format(
                            f.src, f.dst, chunk.name, chunk.offset,
                            chunk.state, repr(chunk.exception))
                    else:
                        error_string = '{} -> {}, chunk {} {}: {}'.format(
                            f.src, f.dst, chunk.name, chunk.offset,
                            chunk.state)
                    logger.error(error_string)
                    error_list.append(error_string)
                    has_errors = True
            if has_errors:
                raise DatalakeIncompleteTransferException(
                    'One or more exceptions occurred during transfer, '
                    'resulting in an incomplete transfer.\n\n'
                    'List of exceptions and errors:\n{}'.format(
                        '\n'.join(error_list)))

    def _wait(self, poll=0.1, timeout=0):
        start = time.time()
        while self.active:
            if timeout > 0 and time.time() - start > timeout:
                break
            time.sleep(poll)

    def _clear(self):
        self._cfutures = {}
        self._ffutures = {}
        self._pool = None

    def shutdown(self):
        """
        Shutdown task threads in an orderly fashion.

        Within the context of this method, we disable Ctrl+C keystroke events
        until all threads have exited. We re-enable Ctrl+C keystroke events
        before leaving.
        """
        handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
        try:
            logger.debug("Shutting down worker threads")
            self._shutdown_event.set()
            self._pool.shutdown(wait=True)
        except Exception as e:
            logger.error("Unexpected exception occurred during shutdown: %s",
                         repr(e))
        else:
            logger.debug("Shutdown complete")
        finally:
            signal.signal(signal.SIGINT, handler)

    def monitor(self, poll=0.1, timeout=0):
        """ Wait for the transfer to complete """
        try:
            self._wait(poll, timeout)
        except KeyboardInterrupt:
            logger.warning("%s suspended and persisted", self)
            self.shutdown()
        self._clear()

        # TODO: Re-enable progress saving when a less IO intensive solution
        # is available. See issue:
        # https://github.com/Azure/azure-data-lake-store-python/issues/117
        # self.save()
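
    # Note: futures, the thread pool, the shutdown event and the progress
    # lock are not picklable, so __getstate__ drops them; run() recreates
    # the pool, the shutdown event and the future maps.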
    def __getstate__(self):
        dic2 = self.__dict__.copy()
        dic2.pop('_cfutures', None)
        dic2.pop('_ffutures', None)
        dic2.pop('_pool', None)
        dic2.pop('_shutdown_event', None)
        dic2.pop('_progress_lock', None)

        dic2['_files'] = dic2.get('_files', {}).copy()
        dic2['_chunks'] = dic2.get('_chunks', {}).copy()

        return dic2

    def save(self, keep=True):
        """ Persist this client's state via its parent, if any """
        if self._parent is not None:
            self._parent.save(keep=keep)