######################################################################
#
# File: b2/sync/action.py
#
# Copyright 2018 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################

from abc import (ABCMeta, abstractmethod)

import logging
import os
import six

from ..download_dest import DownloadDestLocalFile
from ..upload_source import UploadSourceLocalFile
from ..utils import raise_if_shutting_down
from ..raw_api import SRC_LAST_MODIFIED_MILLIS
from .report import SyncFileReporter

logger = logging.getLogger(__name__)


@six.add_metaclass(ABCMeta)
class AbstractAction(object):
    """
    An action to take, such as uploading, downloading, or deleting
    a file.  Multi-threaded tasks create a sequence of Actions, which
    are then run by a pool of threads.

    An action can depend on other actions completing.  An example of
    this is making sure a CreateBucketAction happens before an
    UploadFileAction.
    """

    def run(self, bucket, reporter, dry_run=False):
        """
        Runs the action and reports it.

        Calls raise_if_shutting_down() first.  Unless dry_run is true,
        do_action() is called; do_report() is then called in either case.

        Any exception raised by do_action() or do_report() is logged,
        passed to reporter.error() as text, and re-raised.

        :param bucket: passed through to do_action() and do_report()
        :param reporter: passed through to do_action() and do_report();
                         also receives the error text on failure
        :param dry_run: when true, skip do_action() and only report
        """
        raise_if_shutting_down()
        try:
            if not dry_run:
                self.do_action(bucket, reporter)
            self.do_report(bucket, reporter)
        except Exception as e:
            logger.exception('an exception occurred in a sync action')
            reporter.error(str(self) + ": " + repr(e) + ' ' + str(e))
            raise  # Re-throw so we can identify failed actions

    @abstractmethod
    def get_bytes(self):
        """
        Returns the number of bytes to transfer for this action.
        """

    @abstractmethod
    def do_action(self, bucket, reporter):
        """
        Performs the action, returning only after the action is completed.
        """

    @abstractmethod
    def do_report(self, bucket, reporter):
        """
        Report the action performed.
        """


class B2UploadAction(AbstractAction):
    """
    Uploads one local file to the bucket under a given B2 file name.
    """

    def __init__(self, local_full_path, relative_name, b2_file_name, mod_time_millis, size):
        """
        :param local_full_path: path of the local file to upload
        :param relative_name: name used in the completion report
        :param b2_file_name: name to give the file in the bucket
        :param mod_time_millis: value stored (as a string) in the file info
                                under SRC_LAST_MODIFIED_MILLIS
        :param size: value returned by get_bytes()
        """
        self.local_full_path = local_full_path
        self.relative_name = relative_name
        self.b2_file_name = b2_file_name
        self.mod_time_millis = mod_time_millis
        self.size = size

    def get_bytes(self):
        """
        Returns the size given at construction time.
        """
        return self.size

    def do_action(self, bucket, reporter):
        """
        Uploads the local file via bucket.upload(), with progress sent
        to a SyncFileReporter wrapping the given reporter.
        """
        bucket.upload(
            UploadSourceLocalFile(self.local_full_path),
            self.b2_file_name,
            file_info={SRC_LAST_MODIFIED_MILLIS: str(self.mod_time_millis)},
            progress_listener=SyncFileReporter(reporter)
        )

    def do_report(self, bucket, reporter):
        """
        Prints the 'upload' completion line for this file.
        """
        reporter.print_completion('upload ' + self.relative_name)

    def __str__(self):
        return 'b2_upload(%s, %s, %s)' % (
            self.local_full_path, self.b2_file_name, self.mod_time_millis
        )


class B2HideAction(AbstractAction):
    """
    Hides one file in the bucket by name.
    """

    def __init__(self, relative_name, b2_file_name):
        """
        :param relative_name: name used in the completion report
        :param b2_file_name: name of the file to hide in the bucket
        """
        self.relative_name = relative_name
        self.b2_file_name = b2_file_name

    def get_bytes(self):
        """
        Returns 0; this action reports no bytes to transfer.
        """
        return 0

    def do_action(self, bucket, reporter):
        """
        Calls bucket.hide_file() for the B2 file name.
        """
        bucket.hide_file(self.b2_file_name)

    def do_report(self, bucket, reporter):
        """
        Updates the transfer counters and prints the 'hide' completion line.
        """
        # NOTE(review): presumably (1, 0) means one file, zero bytes --
        # confirm against the reporter's update_transfer() signature.
        reporter.update_transfer(1, 0)
        reporter.print_completion('hide ' + self.relative_name)

    def __str__(self):
        return 'b2_hide(%s)' % (self.b2_file_name,)


class B2DownloadAction(AbstractAction):
    """
    Downloads one file from the bucket, by name, to a local path.
    """

    def __init__(
        self, relative_name, b2_file_name, file_id, local_full_path, mod_time_millis, file_size
    ):
        """
        :param relative_name: name used in the completion report
        :param b2_file_name: name of the file to download from the bucket
        :param file_id: only used in the string form of this action;
                        the download itself is done by name
        :param local_full_path: where the downloaded file ends up
        :param mod_time_millis: only used in the string form of this action
        :param file_size: value returned by get_bytes()
        """
        self.relative_name = relative_name
        self.b2_file_name = b2_file_name
        self.file_id = file_id
        self.local_full_path = local_full_path
        self.mod_time_millis = mod_time_millis
        self.file_size = file_size

    def get_bytes(self):
        """
        Returns the file size given at construction time.
        """
        return self.file_size

    def do_action(self, bucket, reporter):
        """
        Downloads the file to '<local_full_path>.b2.sync.tmp' and then
        moves it to local_full_path, replacing any existing file.

        :raises Exception: if the destination directory does not exist
                           and could not be created
        """
        # Make sure the directory exists.  An empty parent_dir means the
        # path has no directory component (the file goes in the current
        # directory), so there is nothing to create or to check.
        parent_dir = os.path.dirname(self.local_full_path)
        if parent_dir and not os.path.isdir(parent_dir):
            try:
                os.makedirs(parent_dir)
            except OSError:
                # The error is ignored here on purpose: the directory may
                # have been created concurrently.  The check below decides
                # whether this is really a failure.
                pass
            if not os.path.isdir(parent_dir):
                raise Exception('could not create directory %s' % (parent_dir,))

        # Download the file to a .tmp file
        download_path = self.local_full_path + '.b2.sync.tmp'
        download_dest = DownloadDestLocalFile(download_path)
        bucket.download_file_by_name(self.b2_file_name, download_dest, SyncFileReporter(reporter))

        # Move the file into place.  Any existing destination is removed
        # first (os.rename does not overwrite an existing file on Windows);
        # a failure to remove it is ignored, e.g. when there is no such file.
        try:
            os.unlink(self.local_full_path)
        except OSError:
            pass
        os.rename(download_path, self.local_full_path)

    def do_report(self, bucket, reporter):
        """
        Prints the 'dnload' completion line for this file.
        """
        reporter.print_completion('dnload ' + self.relative_name)

    def __str__(self):
        # %s rather than %d for mod_time_millis: run() calls str(self) while
        # handling an error, so formatting must not raise on a non-integer
        # value.  For integers the output is the same.
        return (
            'b2_download(%s, %s, %s, %s)' %
            (self.b2_file_name, self.file_id, self.local_full_path, self.mod_time_millis)
        )


class B2DeleteAction(AbstractAction):
    """
    Deletes one file version from B2, identified by file id and name.
    """

    def __init__(self, relative_name, b2_file_name, file_id, note):
        """
        :param relative_name: name used in the completion report
        :param b2_file_name: name of the file whose version is deleted
        :param file_id: id of the file version to delete
        :param note: text appended to the completion report line
        """
        self.relative_name = relative_name
        self.b2_file_name = b2_file_name
        self.file_id = file_id
        self.note = note

    def get_bytes(self):
        """
        Returns 0; this action reports no bytes to transfer.
        """
        return 0

    def do_action(self, bucket, reporter):
        """
        Calls bucket.api.delete_file_version() with the file id and name.
        """
        bucket.api.delete_file_version(self.file_id, self.b2_file_name)

    def do_report(self, bucket, reporter):
        """
        Updates the transfer counters and prints the 'delete' completion
        line, including the note.
        """
        # NOTE(review): presumably (1, 0) means one file, zero bytes --
        # confirm against the reporter's update_transfer() signature.
        reporter.update_transfer(1, 0)
        reporter.print_completion('delete ' + self.relative_name + ' ' + self.note)

    def __str__(self):
        return 'b2_delete(%s, %s, %s)' % (self.b2_file_name, self.file_id, self.note)


class LocalDeleteAction(AbstractAction):
    """
    Deletes one file from the local file system.
    """

    def __init__(self, relative_name, full_path):
        """
        :param relative_name: name used in the completion report
        :param full_path: path of the local file to delete
        """
        self.relative_name = relative_name
        self.full_path = full_path

    def get_bytes(self):
        """
        Returns 0; this action reports no bytes to transfer.
        """
        return 0

    def do_action(self, bucket, reporter):
        """
        Removes the local file with os.unlink(); the bucket is not used.
        """
        os.unlink(self.full_path)

    def do_report(self, bucket, reporter):
        """
        Updates the transfer counters and prints the 'delete' completion line.
        """
        # NOTE(review): presumably (1, 0) means one file, zero bytes --
        # confirm against the reporter's update_transfer() signature.
        reporter.update_transfer(1, 0)
        reporter.print_completion('delete ' + self.relative_name)

    def __str__(self):
        # One-element tuple, so the path is always formatted as a single value.
        return 'local_delete(%s)' % (self.full_path,)