1# -*- coding: utf-8 -*-
3# --------------------------------------------------------------------------
4# Copyright (c) Microsoft Corporation. All rights reserved.
5# Licensed under the MIT License. See License.txt in the project root for
6# license information.
7# --------------------------------------------------------------------------
8
9"""
10Low-level classes for managing data transfer.
11"""
12from __future__ import print_function
13
14from collections import namedtuple, Counter
15from concurrent.futures import ThreadPoolExecutor
16import logging
17import multiprocessing
18import signal
19import sys
20import threading
21import time
22import uuid
23import operator
24import os
25
26from .exceptions import DatalakeIncompleteTransferException
27
28logger = logging.getLogger(__name__)
29
30
31class StateManager(object):
32    """
33    Manages state for any hashable object.
34
35    When tracking multiple files and their chunks, each file/chunk can be in
36    any valid state for that particular type.
37
38    At the simplest level, we need to set and retrieve an object's current
39    state, while only allowing valid states to be used. In addition, we also
40    need to give statistics about a group of objects (are all objects in one
41    state? how many objects are in each available state?).
42
43    Parameters
44    ----------
    states: one or more valid states, passed as positional arguments
        Managed objects can only be assigned these states.
47
48    Examples
49    --------
50    >>> StateManager('draft', 'review', 'complete')  # doctest: +SKIP
51    <StateManager: draft=0 review=0 complete=0>
52    >>> mgr = StateManager('off', 'on')
53    >>> mgr['foo'] = 'on'
54    >>> mgr['bar'] = 'off'
55    >>> mgr['quux'] = 'on'
56    >>> mgr  # doctest: +SKIP
57    <StateManager: off=1 on=2>
58    >>> mgr.contains_all('on')
59    False
60    >>> mgr['bar'] = 'on'
61    >>> mgr.contains_all('on')
62    True
63    >>> mgr.contains_none('off')
64    True
65
66    Internal class used by `ADLTransferClient`.
67    """
68    def __init__(self, *states):
69        self._states = {state: set() for state in states}
70        self._objects = {}
71
72    @property
73    def states(self):
74        return list(self._states)
75
76    @property
77    def objects(self):
78        return list(self._objects)
79
80    def __iter__(self):
81        return iter(self._objects.items())
82
83    def __getitem__(self, obj):
84        return self._objects[obj]
85
86    def __setitem__(self, obj, state):
87        if obj in self._objects:
88            self._states[self._objects[obj]].discard(obj)
89        self._states[state].add(obj)
90        self._objects[obj] = state
91
92    def contains_all(self, state):
93        """ Return whether all managed objects are in the given state """
94        objs = self._states[state]
95        return len(objs) > 0 and len(self.objects) - len(objs) == 0
96
97    def contains_none(self, *states):
98        """ Return whether no managed objects are in the given states """
99        return all([len(self._states[state]) == 0 for state in states])
100
101    def __str__(self):
102        status = " ".join(
103            ["%s=%d" % (s, len(self._states[s])) for s in self._states])
104        return "<StateManager: " + status + ">"
105
106    __repr__ = __str__
107
108
109# Named tuples used to serialize client progress
110File = namedtuple('File', 'src dst state length chunks exception')
111Chunk = namedtuple('Chunk', 'name state offset expected actual exception')
112
113
114class ADLTransferClient(object):
115    """
116    Client for transferring data from/to Azure DataLake Store
117
118    This is intended as the underlying class for `ADLDownloader` and
119    `ADLUploader`. If necessary, it can be used directly for additional
120    control.
121
122    Parameters
123    ----------
124    adlfs: ADL filesystem instance
125    name: str
126        Unique ID used for persistence.
127    transfer: callable
128        Function or callable object invoked when transferring chunks. See
129        ``Function Signatures``.
130    merge: callable [None]
131        Function or callable object invoked when merging chunks. For each file
132        containing only one chunk, no merge function will be called, even if
133        provided. If None, then merging is skipped. See
134        ``Function Signatures``.
135    nthreads: int [None]
136        Number of threads to use (minimum is 1). If None, uses the number of
137        cores.
138    chunksize: int [2**28]
139        Number of bytes for a chunk. Large files are split into chunks. Files
140        smaller than this number will always be transferred in a single thread.
141    buffersize: int [2**25]
        Number of bytes for internal buffer. The buffer cannot be bigger than
        a chunk and cannot be smaller than a block.
144    blocksize: int [2**25]
145        Number of bytes for a block. Within each chunk, we write a smaller
146        block for each API call. This block cannot be bigger than a chunk.
147    chunked: bool [True]
148        If set, each transferred chunk is stored in a separate file until
149        chunks are gathered into a single file. Otherwise, each chunk will be
150        written into the same destination file.
151    unique_temporary: bool [True]
152        If set, transferred chunks are written into a unique temporary
153        directory.
154    persist_path: str [None]
155        Path used for persisting a client's state. If None, then `save()`
156        and `load()` will be empty operations.
157    delimiter: byte(s) or None
158        If set, will transfer blocks using delimiters, as well as split
159        files for transferring on that delimiter.
160    parent: ADLDownloader, ADLUploader or None
161        In typical usage, the transfer client is created in the context of an
162        upload or download, which can be persisted between sessions.
    progress_callback: callable [None]
        Callback for progress with signature function(current, total) where
        current is the number of bytes transferred so far, and total is the
        total number of bytes to transfer.
    timeout: int [0]
        Number of seconds to wait before the transfer stops and raises an
        exception if it is still in progress. The default value of 0 means
        no timeout (wait indefinitely).
170
171    Temporary Files
172    ---------------
173
174    When a merge step is available, the client will write chunks to temporary
175    files before merging. The exact temporary file looks like this in
176    pseudo-BNF:
177
178    >>> # {dirname}/{basename}.segments[.{unique_str}]/{basename}_{offset}
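    For instance, with `unique_temporary` set, the chunk at offset 268435456
    of a file destined for `/data/a.csv` would be written to a temporary file
    like the following (the hex suffix is illustrative):

    >>> # /data/a.csv.segments.3f2c9d1e/a.csv_268435456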
179
180    Function Signatures
181    -------------------
182
    To perform the actual work needed by the client, the user passes in two
    callables, `transfer` and (optionally) `merge`. If `merge` is not
    provided, the merge step is skipped.
186
187    The `transfer` callable has the function signature,
188    `fn(adlfs, src, dst, offset, size, buffersize, blocksize, shutdown_event)`.
189    `adlfs` is the ADL filesystem instance. `src` and `dst` refer to the source
190    and destination of the respective file transfer. `offset` is the location
191    in `src` to read `size` bytes from. `buffersize` is the number of bytes
192    used for internal buffering before transfer. `blocksize` is the number of
    bytes in a chunk to write at one time. The callable should return a
    2-tuple of the number of bytes written and an exception instance (or
    None if no error occurred).
195
    The `merge` callable has the function signature
    `fn(adlfs, outfile, files, shutdown_event)`. `adlfs` is the ADL filesystem
    instance. `outfile` is the destination produced by merging `files`. The
    client also passes an `overwrite` keyword argument indicating whether an
    existing destination file may be replaced. The callable should return an
    exception instance, or None if the merge succeeded.
199
    For both callables, `shutdown_event` is optional. It is a
    `threading.Event` that is set when a shutdown is requested; callables
    should check it periodically and stop transferring data once it is set.
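    A minimal sketch of a conforming `transfer` callable (the name and body
    are illustrative only):

    >>> def my_transfer(adlfs, src, dst, offset, size, buffersize,
    ...                 blocksize, shutdown_event=None):
    ...     nbytes, exception = 0, None
    ...     # read `size` bytes of `src` starting at `offset`, writing them to
    ...     # `dst` in `blocksize` pieces and checking `shutdown_event` between
    ...     # writes; count the bytes written in `nbytes`
    ...     return nbytes, exception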
204
205    Internal State
206    --------------
207
208    self._fstates: StateManager
209        This captures the current state of each transferred file.
210    self._files: dict
211        Using a tuple of the file source/destination as the key, this
212        dictionary stores the file metadata and all chunk states. The
213        dictionary key is `(src, dst)` and the value is
214        `dict(length, cstates, exception)`.
215    self._chunks: dict
216        Using a tuple of the chunk name/offset as the key, this dictionary
217        stores the chunk metadata and has a reference to the chunk's parent
218        file. The dictionary key is `(name, offset)` and the value is
219        `dict(parent=(src, dst), expected, actual, exception)`.
220    self._ffutures: dict
221        Using a Future object as the key, this dictionary provides a reverse
222        lookup for the file associated with the given future. The returned
223        value is the file's primary key, `(src, dst)`.
224    self._cfutures: dict
225        Using a Future object as the key, this dictionary provides a reverse
226        lookup for the chunk associated with the given future. The returned
227        value is the chunk's primary key, `(name, offset)`.
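    For example, immediately after a two-chunk file is submitted, one entry
    of `self._chunks` would look roughly like this (values illustrative):

    >>> # (name, 0): dict(parent=(src, dst), expected=2**28, actual=0, exception=None)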
228
229    See Also
230    --------
231    azure.datalake.store.multithread.ADLDownloader
232    azure.datalake.store.multithread.ADLUploader
233    """
234
235    def __init__(self, adlfs, transfer, merge=None, nthreads=None,
236                 chunksize=2**28, blocksize=2**25, chunked=True,
237                 unique_temporary=True, delimiter=None,
238                 parent=None, verbose=False, buffersize=2**25,
239                 progress_callback=None, timeout=0):
240        self._adlfs = adlfs
241        self._parent = parent
242        self._transfer = transfer
243        self._merge = merge
244        self._nthreads = max(1, nthreads or multiprocessing.cpu_count())
245        self._chunksize = chunksize
246        self._buffersize = buffersize
247        self._blocksize = blocksize
248        self._chunked = chunked
249        self._unique_temporary = unique_temporary
250        self._unique_str = uuid.uuid4().hex
        self._progress_callback = progress_callback
252        self._progress_lock = threading.Lock()
253        self._timeout = timeout
254        self.verbose = verbose
255
256        # Internal state tracking files/chunks/futures
257        self._progress_total_bytes = 0
258        self._transfer_total_bytes = 0
259
260        self._files = {}
261        self._chunks = {}
262        self._ffutures = {}
263        self._cfutures = {}
264        self._fstates = StateManager(
265            'pending', 'transferring', 'merging', 'finished', 'cancelled',
266            'errored')
267
268    def submit(self, src, dst, length):
269        """
270        Split a given file into chunks.
271
272        All submitted files/chunks start in the `pending` state until `run()`
273        is called.
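
        A sketch of a typical call (names illustrative; `dst` must be a path
        object exposing `.name` and `.parent`):

        >>> client.submit(src, dst, length=os.path.getsize(src))  # doctest: +SKIP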
274        """
275        cstates = StateManager(
276            'pending', 'running', 'finished', 'cancelled', 'errored')
277
278        # Create unique temporary directory for each file
279        if self._chunked:
280            if self._unique_temporary:
281                filename = "{}.segments.{}".format(dst.name, self._unique_str)
282            else:
283                filename = "{}.segments".format(dst.name)
284            tmpdir = dst.parent/filename
285        else:
286            tmpdir = None
287
288        # TODO: might need xrange support for py2
289        offsets = range(0, length, self._chunksize)
290
291        # in the case of empty files, ensure that the initial offset of 0 is properly added.
292        if not offsets:
293            if not length:
294                offsets = [0]
295            else:
296                raise DatalakeIncompleteTransferException('Could not compute offsets for source: {}, with destination: {} and expected length: {}.'.format(src, dst, length))
297
298        tmpdir_and_offsets = tmpdir and len(offsets) > 1
299        for offset in offsets:
300            if tmpdir_and_offsets:
301                name = tmpdir / "{}_{}".format(dst.name, offset)
302            else:
303                name = dst
304            cstates[(name, offset)] = 'pending'
305            self._chunks[(name, offset)] = {
306                "parent": (src, dst),
307                "expected": min(length - offset, self._chunksize),
308                "actual": 0,
309                "exception": None}
310            logger.debug("Submitted %s, byte offset %d", name, offset)
311
312        self._fstates[(src, dst)] = 'pending'
313        self._files[(src, dst)] = {
314            "length": length,
315            "cstates": cstates,
316            "exception": None}
317        self._transfer_total_bytes += length
318
319    def _start(self, src, dst):
320        key = (src, dst)
321        self._fstates[key] = 'transferring'
322        for obj in self._files[key]['cstates'].objects:
323            name, offset = obj
324            cs = self._files[key]['cstates']
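            # chunks already marked 'finished' (e.g. when resuming a
            # persisted transfer) are not resubmitted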
            if cs[obj] == 'finished':
                continue
327            cs[obj] = 'running'
328            future = self._pool.submit(
329                self._transfer, self._adlfs, src, name, offset,
330                self._chunks[obj]['expected'], self._buffersize,
331                self._blocksize, shutdown_event=self._shutdown_event)
332            self._cfutures[future] = obj
333            future.add_done_callback(self._update)
334
335    @property
336    def active(self):
337        """ Return whether the transfer is active """
338        return not self._fstates.contains_none('pending', 'transferring', 'merging')
339
340    @property
341    def successful(self):
342        """
343        Return whether the transfer completed successfully.
344
345        It will raise AssertionError if the transfer is active.
346        """
347        assert not self.active
348        return self._fstates.contains_all('finished')
349
350    @property
351    def progress(self):
352        """ Return a summary of all transferred file/chunks """
353        files = []
354        for key in self._files:
355            src, dst = key
356            chunks = []
357            for obj in self._files[key]['cstates'].objects:
358                name, offset = obj
359                chunks.append(Chunk(
360                    name=name,
361                    offset=offset,
362                    state=self._files[key]['cstates'][obj],
363                    expected=self._chunks[obj]['expected'],
364                    actual=self._chunks[obj]['actual'],
365                    exception=self._chunks[obj]['exception']))
366            files.append(File(
367                src=src,
368                dst=dst,
369                state=self._fstates[key],
370                length=self._files[key]['length'],
371                chunks=chunks,
372                exception=self._files[key]['exception']))
373        return files
374
375    def _rename_file(self, src, dst, overwrite=False):
376        """ Rename a file from file_name.inprogress to just file_name. Invoked once download completes on a file.
377
378        Internal function used by `download`.
379        """
380        try:
            # do a final check to make sure someone didn't create the destination file while the download was
            # occurring, in case the user did not specify overwrite.
383            if os.path.isfile(dst):
384                if not overwrite:
385                    raise FileExistsError(dst)
386                os.remove(dst)
387            os.rename(src, dst)
388        except Exception as e:
389            logger.error('Rename failed for source file: %r; %r', src, e)
390            raise e
391
392        logger.debug('Renamed %r to %r', src, dst)
393
394    def _update_progress(self, length):
395        if self._progress_callback is not None:
396            with self._progress_lock:
397                self._progress_total_bytes += length
398            self._progress_callback(self._progress_total_bytes, self._transfer_total_bytes)
399
400    def _update(self, future):
401
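        # a chunk-level future completed: record the chunk's outcome and,
        # once every chunk of the parent file has finished, either schedule
        # the merge or mark the whole file as finished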
402        if future in self._cfutures:
403            obj = self._cfutures[future]
404            parent = self._chunks[obj]['parent']
405            cstates = self._files[parent]['cstates']
406            src, dst = parent
407
408            if future.cancelled():
409                cstates[obj] = 'cancelled'
410            elif future.exception():
411                self._chunks[obj]['exception'] = repr(future.exception())
412                cstates[obj] = 'errored'
413            else:
414                nbytes, exception = future.result()
415                self._chunks[obj]['actual'] = nbytes
416                self._chunks[obj]['exception'] = exception
417                if exception:
418                    cstates[obj] = 'errored'
419                elif self._chunks[obj]['expected'] != nbytes:
420                    name, offset = obj
421                    cstates[obj] = 'errored'
422                    exception = DatalakeIncompleteTransferException(
423                        'chunk {}, offset {}: expected {} bytes, transferred {} bytes'.format(
424                            name, offset, self._chunks[obj]['expected'],
425                            self._chunks[obj]['actual']))
426                    self._chunks[obj]['exception'] = exception
427                    logger.error("Incomplete transfer: %s -> %s, %s",
428                                 src, dst, repr(exception))
429                else:
430                    cstates[obj] = 'finished'
431                    self._update_progress(nbytes)
432
433            if cstates.contains_all('finished'):
434                logger.debug("Chunks transferred")
435                if self._merge and len(cstates.objects) > 1:
436                    logger.debug("Merging file: %s", self._fstates[parent])
437                    self._fstates[parent] = 'merging'
438                    merge_future = self._pool.submit(
439                        self._merge, self._adlfs, dst,
440                        [chunk for chunk, _ in sorted(cstates.objects,
441                                                      key=operator.itemgetter(1))],
442                        overwrite=self._parent._overwrite,
443                        shutdown_event=self._shutdown_event)
444                    self._ffutures[merge_future] = parent
445                    merge_future.add_done_callback(self._update)
446                else:
447                    if not self._chunked and str(dst).endswith('.inprogress'):
448                        logger.debug("Renaming file to remove .inprogress: %s", self._fstates[parent])
449                        self._fstates[parent] = 'merging'
450                        self._rename_file(dst, dst.replace('.inprogress',''), overwrite=self._parent._overwrite)
451                        dst = dst.replace('.inprogress', '')
452
453                    self._fstates[parent] = 'finished'
454                    logger.info("Transferred %s -> %s", src, dst)
455            elif cstates.contains_none('running', 'pending'):
456                logger.error("Transfer failed: %s -> %s", src, dst)
457                self._fstates[parent] = 'errored'
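        # a file-level (merge) future completed: record the outcome for the
        # whole file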
458        elif future in self._ffutures:
459            src, dst = self._ffutures[future]
460
461            if future.cancelled():
462                self._fstates[(src, dst)] = 'cancelled'
463            elif future.exception():
464                self._files[(src, dst)]['exception'] = repr(future.exception())
465                self._fstates[(src, dst)] = 'errored'
466            else:
467                exception = future.result()
468                self._files[(src, dst)]['exception'] = exception
469                if exception:
470                    self._fstates[(src, dst)] = 'errored'
471                else:
472                    self._fstates[(src, dst)] = 'finished'
473                    logger.info("Transferred %s -> %s", src, dst)
        else:
            raise ValueError("Illegal state: future {} found in neither file futures {} nor chunk futures {}"
                             .format(future, self._ffutures, self._cfutures))
        # TODO: Re-enable progress saving when a less IO intensive solution is available.
        # See issue: https://github.com/Azure/azure-data-lake-store-python/issues/117
        #self.save()
480        if self.verbose:
481            print('\b' * 200, self.status, end='')
482            sys.stdout.flush()
483
484    @property
485    def status(self):
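        """ Return a dict mapping chunk state to the number of chunks in that state """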
        counts = sum([Counter([chunk.state for chunk in f.chunks])
                      for f in self.progress], Counter())
        return dict(counts)
489
490    def run(self, nthreads=None, monitor=True, before_start=None):
        # resolve the thread count before creating the pool so that a
        # per-run override of nthreads actually takes effect
        self._nthreads = nthreads or self._nthreads
        self._pool = ThreadPoolExecutor(self._nthreads)
        self._shutdown_event = threading.Event()
        self._ffutures = {}
        self._cfutures = {}
496
497        for src, dst in self._files:
498            if before_start:
499                before_start(self._adlfs, src, dst)
500            self._start(src, dst)
501
502        if monitor:
503            self.monitor(timeout=self._timeout)
            has_errors = False
            error_list = []
            for f in self.progress:
                for chunk in f.chunks:
                    if chunk.state == 'finished':
                        continue
                    if chunk.exception:
                        error_string = '{} -> {}, chunk {} {}: {}, {}'.format(
                            f.src, f.dst, chunk.name, chunk.offset,
                            chunk.state, repr(chunk.exception))
                    else:
                        error_string = '{} -> {}, chunk {} {}: {}'.format(
                            f.src, f.dst, chunk.name, chunk.offset,
                            chunk.state)
                    logger.error(error_string)
                    error_list.append(error_string)
                    has_errors = True
524            if has_errors:
                raise DatalakeIncompleteTransferException('One or more exceptions occurred during transfer, resulting in an incomplete transfer. \n\n List of exceptions and errors:\n {}'.format('\n'.join(error_list)))
526
527    def _wait(self, poll=0.1, timeout=0):
528        start = time.time()
529        while self.active:
530            if timeout > 0 and time.time() - start > timeout:
531                break
532            time.sleep(poll)
533
534    def _clear(self):
535        self._cfutures = {}
536        self._ffutures = {}
537        self._pool = None
538
539    def shutdown(self):
540        """
541        Shutdown task threads in an orderly fashion.
542
543        Within the context of this method, we disable Ctrl+C keystroke events
544        until all threads have exited. We re-enable Ctrl+C keystroke events
545        before leaving.
546        """
547        handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
548        try:
549            logger.debug("Shutting down worker threads")
550            self._shutdown_event.set()
551            self._pool.shutdown(wait=True)
552        except Exception as e:
553            logger.error("Unexpected exception occurred during shutdown: %s", repr(e))
554        else:
555            logger.debug("Shutdown complete")
556        finally:
557            signal.signal(signal.SIGINT, handler)
558
559    def monitor(self, poll=0.1, timeout=0):
560        """ Wait for download to happen """
561        try:
562            self._wait(poll, timeout)
563        except KeyboardInterrupt:
564            logger.warning("%s suspended and persisted", self)
565            self.shutdown()
566        self._clear()
567
568        # TODO: Re-enable progress saving when a less IO intensive solution is available.
569        # See issue: https://github.com/Azure/azure-data-lake-store-python/issues/117
570        #self.save()
571
572    def __getstate__(self):
573        dic2 = self.__dict__.copy()
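        # futures, the thread pool, the shutdown event and the progress lock
        # are not picklable, so they are dropped from the persisted state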
574        dic2.pop('_cfutures', None)
575        dic2.pop('_ffutures', None)
576        dic2.pop('_pool', None)
577        dic2.pop('_shutdown_event', None)
578        dic2.pop('_progress_lock', None)
579
580        dic2['_files'] = dic2.get('_files', {}).copy()
581        dic2['_chunks'] = dic2.get('_chunks', {}).copy()
582
583        return dic2
584
585    def save(self, keep=True):
586        if self._parent is not None:
587            self._parent.save(keep=keep)
588