# -*- coding: utf-8 -*- #
# Copyright 2021 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Task for file downloads.

Typically executed in a task iterator:
googlecloudsdk.command_lib.storage.tasks.task_executor.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import os
import threading

from googlecloudsdk.api_lib.storage import api_factory
from googlecloudsdk.api_lib.storage import cloud_api
from googlecloudsdk.command_lib.storage import progress_callbacks
from googlecloudsdk.command_lib.storage import tracker_file_util
from googlecloudsdk.command_lib.storage import util
from googlecloudsdk.command_lib.storage.tasks import task_status
from googlecloudsdk.command_lib.storage.tasks.cp import file_part_task
from googlecloudsdk.core import properties
from googlecloudsdk.core.util import files


def _get_valid_downloaded_byte_count(destination_url, resource):
  """Checks how many bytes of the file have already been downloaded.

  Args:
    destination_url (storage_url.FileUrl): Has path of file being downloaded.
    resource (resource_reference.ObjectResource): Has metadata of the object
      being downloaded.

  Returns:
    Int byte count of the partially-downloaded file. Returns 0 if the file is
    an invalid size, empty, or non-existent.
  """
  if not destination_url.exists():
    return 0
  existing_file_size = os.path.getsize(destination_url.object_name)
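  # A local file at least as large as the source object cannot be a valid
  # partial download, so treat it as invalid and start over from byte 0.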
  return existing_file_size if existing_file_size < resource.size else 0


class FilePartDownloadTask(file_part_task.FilePartTask):
  """Downloads a byte range.

  Private attributes are not normally documented, but these initialization
  parameters need to be more specific than the parent class's.

  Attributes:
    _source_resource (resource_reference.ObjectResource): Must contain the full
      path of the object to download, including bucket. Directories will not be
      accepted. Does not need to contain metadata.
    _destination_resource (resource_reference.FileObjectResource): Must contain
      the local filesystem path to write the downloaded object to. Does not
      need to contain metadata.
    _offset (int): The index of the first byte in the download range.
    _length (int?): The number of bytes in the download range.
    _component_number (int?): If a multipart operation, indicates the
      component number.
    _total_components (int?): If a multipart operation, indicates the total
      number of components.
  """

  def _perform_download(self, digesters, progress_callback, download_strategy,
                        start_byte, end_byte):
    """Prepares file stream, calls API, and validates hash."""
    mode = (
        files.BinaryFileWriterMode.MODIFY
        if start_byte else files.BinaryFileWriterMode.TRUNCATE)
    with files.BinaryFileWriter(
        self._destination_resource.storage_url.object_name,
        create_path=True,
        mode=mode) as download_stream:
      download_stream.seek(start_byte)
      provider = self._source_resource.storage_url.scheme
      # TODO(b/162264437): Support all of download_object's parameters.
      api_factory.get_api(provider).download_object(
          self._source_resource,
          download_stream,
          digesters=digesters,
          download_strategy=download_strategy,
          progress_callback=progress_callback,
          start_byte=start_byte,
          end_byte=end_byte)

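    # Compare the digest computed while streaming against the MD5 recorded in
    # the source object's metadata to catch corrupted downloads.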
    # TODO(b/172048376): Add crc32c, and make this a loop.
    if util.HashAlgorithms.MD5 in digesters:
      calculated_digest = util.get_base64_hash_digest_string(
          digesters[util.HashAlgorithms.MD5])
      util.validate_object_hashes_match(self._source_resource.storage_url,
                                        self._source_resource.md5_hash,
                                        calculated_digest)

  def _perform_one_shot_download(self, digesters, progress_callback):
    """Sets up a basic download based on task attributes."""
    end_byte = self._offset + self._length
    self._perform_download(
        digesters,
        progress_callback,
        cloud_api.DownloadStrategy.ONE_SHOT,
        start_byte=self._offset,
        end_byte=end_byte)

  def _perform_resumable_download(self, digesters, progress_callback):
    """Resumes or starts a download that can be resumed."""
    destination_url = self._destination_resource.storage_url
    existing_file_size = _get_valid_downloaded_byte_count(
        destination_url, self._source_resource)
    if existing_file_size:
      with files.BinaryFileReader(destination_url.object_name) as file_reader:
        # Get hash of partially-downloaded file as start for validation.
        for hash_algorithm in digesters:
          digesters[hash_algorithm] = util.get_hash_from_file_stream(
              file_reader, hash_algorithm)

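    # The tracker file persists download progress so an interrupted transfer
    # can resume from start_byte instead of restarting from scratch.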
    tracker_file_path, start_byte = (
        tracker_file_util.read_or_create_download_tracker_file(
            self._source_resource,
            destination_url,
            existing_file_size=existing_file_size))
    end_byte = self._source_resource.size

    self._perform_download(digesters, progress_callback,
                           cloud_api.DownloadStrategy.RESUMABLE, start_byte,
                           end_byte)

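    # The download completed successfully, so the progress tracker file is no
    # longer needed.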
    tracker_file_util.delete_tracker_file(tracker_file_path)

  def _perform_component_download(self):
    """Component download does not validate hash or delete tracker."""
    # TODO(b/181339817): Implement sliced downloads.
    raise NotImplementedError

  def execute(self, task_status_queue=None):
    """Performs download."""
    if self._source_resource.md5_hash and self._component_number is None:
      # Checks component_number to avoid hashing slices in sliced downloads.
      digesters = {util.HashAlgorithms.MD5: util.get_md5_hash()}
    else:
      digesters = {}

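    # Reports per-file and per-byte progress to the task status queue so
    # overall copy progress can be aggregated and displayed.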
    progress_callback = progress_callbacks.FilesAndBytesProgressCallback(
        status_queue=task_status_queue,
        size=self._source_resource.size,
        source_url=self._source_resource.storage_url,
        destination_url=self._destination_resource.storage_url,
        component_number=self._component_number,
        total_components=self._total_components,
        operation_name=task_status.OperationName.DOWNLOADING,
        process_id=os.getpid(),
        thread_id=threading.get_ident(),
    )

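    # Pick a strategy: component downloads handle one slice of a sliced
    # download, resumable downloads cover objects at or above the resumable
    # threshold, and everything else is fetched in one shot.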
    if self._component_number is not None:
      self._perform_component_download()
    elif (self._source_resource.size and self._source_resource.size >=
          properties.VALUES.storage.resumable_threshold.GetInt()):
      self._perform_resumable_download(digesters, progress_callback)
    else:
      self._perform_one_shot_download(digesters, progress_callback)