1# -*- coding: utf-8 -*- # 2# Copyright 2021 Google LLC. All Rights Reserved. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15"""Task for file downloads. 16 17Typically executed in a task iterator: 18googlecloudsdk.command_lib.storage.tasks.task_executor. 19""" 20 21from __future__ import absolute_import 22from __future__ import division 23from __future__ import unicode_literals 24 25import os 26import threading 27 28from googlecloudsdk.api_lib.storage import api_factory 29from googlecloudsdk.api_lib.storage import cloud_api 30from googlecloudsdk.command_lib.storage import progress_callbacks 31from googlecloudsdk.command_lib.storage import tracker_file_util 32from googlecloudsdk.command_lib.storage import util 33from googlecloudsdk.command_lib.storage.tasks import task_status 34from googlecloudsdk.command_lib.storage.tasks.cp import file_part_task 35from googlecloudsdk.core import properties 36from googlecloudsdk.core.util import files 37 38 39def _get_valid_downloaded_byte_count(destination_url, resource): 40 """Checks to see how many bytes of file have been downloaded. 41 42 Args: 43 destination_url (storage_url.FileUrl): Has path of file being downloaded. 44 resource (resource_reference.ObjectResource): Has metadata of path being 45 downloaded. 46 47 Returns: 48 Int byte count of size of partially-downloaded file. Returns 0 if file is 49 an invalid size, empty, or non-existent. 50 """ 51 if not destination_url.exists(): 52 return 0 53 existing_file_size = os.path.getsize(destination_url.object_name) 54 return existing_file_size if existing_file_size < resource.size else 0 55 56 57class FilePartDownloadTask(file_part_task.FilePartTask): 58 """Downloads a byte range. 59 60 Normally, don't docstring private attributes, but initialization parameters 61 need to be more specific than parent class's. 62 63 Attributes: 64 _source_resource (resource_reference.ObjectResource): Must contain the full 65 path of object to download, including bucket. Directories will not be 66 accepted. Does not need to contain metadata. 67 _destination_resource (resource_reference.FileObjectResource): Must contain 68 local filesystem path to upload object. Does not need to contain metadata. 69 _offset (int): The index of the first byte in the upload range. 70 _length (int?): The number of bytes in the upload range. 71 _component_number (int?): If a multipart operation, indicates the 72 component number. 73 _total_components (int?): If a multipart operation, indicates the total 74 number of components. 75 """ 76 77 def _perform_download(self, digesters, progress_callback, download_strategy, 78 start_byte, end_byte): 79 """Prepares file stream, calls API, and validates hash.""" 80 mode = ( 81 files.BinaryFileWriterMode.MODIFY 82 if start_byte else files.BinaryFileWriterMode.TRUNCATE) 83 with files.BinaryFileWriter( 84 self._destination_resource.storage_url.object_name, 85 create_path=True, 86 mode=mode) as download_stream: 87 download_stream.seek(start_byte) 88 provider = self._source_resource.storage_url.scheme 89 # TODO(b/162264437): Support all of download_object's parameters. 90 api_factory.get_api(provider).download_object( 91 self._source_resource, 92 download_stream, 93 digesters=digesters, 94 download_strategy=download_strategy, 95 progress_callback=progress_callback, 96 start_byte=start_byte, 97 end_byte=end_byte) 98 99 # TODO(b/172048376): Add crc32c, and make this a loop. 100 if util.HashAlgorithms.MD5 in digesters: 101 calculated_digest = util.get_base64_hash_digest_string( 102 digesters[util.HashAlgorithms.MD5]) 103 util.validate_object_hashes_match(self._source_resource.storage_url, 104 self._source_resource.md5_hash, 105 calculated_digest) 106 107 def _perform_one_shot_download(self, digesters, progress_callback): 108 """Sets up a basic download based on task attributes.""" 109 end_byte = self._offset + self._length 110 self._perform_download( 111 digesters, 112 progress_callback, 113 cloud_api.DownloadStrategy.ONE_SHOT, 114 start_byte=self._offset, 115 end_byte=end_byte) 116 117 def _perform_resumable_download(self, digesters, progress_callback): 118 """Resume or start download that can be resumabled.""" 119 destination_url = self._destination_resource.storage_url 120 existing_file_size = _get_valid_downloaded_byte_count( 121 destination_url, self._source_resource) 122 if existing_file_size: 123 with files.BinaryFileReader(destination_url.object_name) as file_reader: 124 # Get hash of partially-downloaded file as start for validation. 125 for hash_algorithm in digesters: 126 digesters[hash_algorithm] = util.get_hash_from_file_stream( 127 file_reader, hash_algorithm) 128 129 tracker_file_path, start_byte = ( 130 tracker_file_util.read_or_create_download_tracker_file( 131 self._source_resource, 132 destination_url, 133 existing_file_size=existing_file_size)) 134 end_byte = self._source_resource.size 135 136 self._perform_download(digesters, progress_callback, 137 cloud_api.DownloadStrategy.RESUMABLE, start_byte, 138 end_byte) 139 140 tracker_file_util.delete_tracker_file(tracker_file_path) 141 142 def _perform_component_download(self): 143 """Component download does not validate hash or delete tracker.""" 144 # TODO(b/181339817): Implement sliced downloads. 145 raise NotImplementedError 146 147 def execute(self, task_status_queue=None): 148 """Performs download.""" 149 if self._source_resource.md5_hash and self._component_number is None: 150 # Checks component_number to avoid hashing slices in sliced downloads. 151 digesters = {util.HashAlgorithms.MD5: util.get_md5_hash()} 152 else: 153 digesters = {} 154 155 progress_callback = progress_callbacks.FilesAndBytesProgressCallback( 156 status_queue=task_status_queue, 157 size=self._source_resource.size, 158 source_url=self._source_resource.storage_url, 159 destination_url=self._destination_resource.storage_url, 160 component_number=self._component_number, 161 total_components=self._total_components, 162 operation_name=task_status.OperationName.DOWNLOADING, 163 process_id=os.getpid(), 164 thread_id=threading.get_ident(), 165 ) 166 167 if self._component_number is not None: 168 self._perform_component_download() 169 elif (self._source_resource.size and self._source_resource.size >= 170 properties.VALUES.storage.resumable_threshold.GetInt()): 171 self._perform_resumable_download(digesters, progress_callback) 172 else: 173 self._perform_one_shot_download(digesters, progress_callback) 174