# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4; encoding:utf8 -*-
#
# Copyright 2015 Steve Tynor <steve.tynor@gmail.com>
# Copyright 2016 Thomas Harning Jr <harningt@gmail.com>
#   - mirror/stripe modes
#   - write error modes
#
# This file is part of duplicity.
#
# Duplicity is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# Duplicity is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

from future import standard_library
standard_library.install_aliases()
import os
import os.path
import sys
import urllib.request  # pylint: disable=import-error
import urllib.parse  # pylint: disable=import-error
import urllib.error  # pylint: disable=import-error
import json

import duplicity.backend
from duplicity.errors import BackendException
from duplicity import config
from duplicity import log
from duplicity import util


class MultiBackend(duplicity.backend.Backend):
    u"""Store files across multiple remote stores.  URL is a path to a local
    file containing URLs/other config defining the remote store."""

    # Class-level defaults only.  Mutable containers (stores, affinities)
    # are re-created per instance in __init__ so that two MultiBackend
    # instances never share state (mutating a class-level list/dict from
    # an instance would leak between instances).
    __stores = []
    __affinities = {}

    # Set of known query parameters accepted on the multi:// URL.
    __knownQueryParameters = frozenset([
        u'mode',
        u'onfail',
        u'subpath',
    ])

    # the mode of operation to follow
    # can be one of 'stripe' or 'mirror' currently
    __mode = u'stripe'
    __mode_allowedSet = frozenset([
        u'mirror',
        u'stripe',
    ])

    # the write error handling logic
    # can be one of the following:
    # * continue - default, on failure continues to next source
    # * abort - stop all further operations
    __onfail_mode = u'continue'
    __onfail_mode_allowedSet = frozenset([
        u'abort',
        u'continue',
    ])

    # sub path to dynamically add sub directories to backends
    # will be appended to the url value
    __subpath = u''

    # when we write in stripe mode, we "stripe" via a simple round-robin across
    # remote stores.  It's hard to get too much more sophisticated
    # since we can't rely on the backend to give us any useful meta
    # data (e.g. sizes of files, capacity of the store (quotas)) to do
    # a better job of balancing load across stores.
    __write_cursor = 0

    @staticmethod
    def get_query_params(parsed_url):
        u"""Parse and validate the query string of the multi:// URL.

        Returns a plain dict mapping parameter name -> single value.
        Raises BackendException if the query string is malformed, a
        parameter is repeated, or an unknown parameter is present.
        """
        # Reparse so the query string is available
        reparsed_url = urllib.parse.urlparse(parsed_url.geturl())
        if len(reparsed_url.query) == 0:
            return dict()
        try:
            queryMultiDict = urllib.parse.parse_qs(reparsed_url.query, strict_parsing=True)
        except ValueError as e:
            log.Log(_(u"MultiBackend: Could not parse query string %s: %s ")
                    % (reparsed_url.query, e),
                    log.ERROR)
            raise BackendException(u'Could not parse query string')
        queryDict = dict()
        # Convert the multi-dict to a single dictionary
        # while checking to make sure that no unrecognized values are found
        for name, valueList in list(queryMultiDict.items()):
            if len(valueList) != 1:
                log.Log(_(u"MultiBackend: Invalid query string %s: more than one value for %s")
                        % (reparsed_url.query, name),
                        log.ERROR)
                raise BackendException(u'Invalid query string')
            if name not in MultiBackend.__knownQueryParameters:
                log.Log(_(u"MultiBackend: Invalid query string %s: unknown parameter %s")
                        % (reparsed_url.query, name),
                        log.ERROR)
                raise BackendException(u'Invalid query string')

            queryDict[name] = valueList[0]
        return queryDict

    def __init__(self, parsed_url):
        duplicity.backend.Backend.__init__(self, parsed_url)

        # Fix for shared mutable class attributes: give every instance its
        # own store list and affinity map instead of mutating the class-level
        # containers (which would be shared across all MultiBackend objects).
        self.__stores = []
        self.__affinities = {}

        # Init each of the wrapped stores
        #
        # config file is a json formatted collection of values, one for
        # each backend.  We will 'stripe' data across all the given stores:
        #
        #  'url'  - the URL used for the backend store
        #  'env'  - an optional list of environment variable values to set
        #           during the initialization of the backend
        #
        # Example:
        #
        #  [
        #   {
        #    "url": "abackend://myuser@domain.com/backup",
        #    "env": [
        #      {
        #       "name" : "MYENV",
        #       "value" : "xyz"
        #      },
        #      {
        #       "name" : "FOO",
        #       "value" : "bar"
        #      }
        #     ]
        #   },
        #   {
        #    "url": "file:///path/to/dir"
        #   }
        #  ]

        queryParams = MultiBackend.get_query_params(parsed_url)

        if u'mode' in queryParams:
            self.__mode = queryParams[u'mode']

        if u'onfail' in queryParams:
            self.__onfail_mode = queryParams[u'onfail']

        if self.__mode not in MultiBackend.__mode_allowedSet:
            log.Log(_(u"MultiBackend: illegal value for %s: %s")
                    % (u'mode', self.__mode), log.ERROR)
            raise BackendException(u"MultiBackend: invalid mode value")

        if self.__onfail_mode not in MultiBackend.__onfail_mode_allowedSet:
            log.Log(_(u"MultiBackend: illegal value for %s: %s")
                    % (u'onfail', self.__onfail_mode), log.ERROR)
            raise BackendException(u"MultiBackend: invalid onfail value")

        if u'subpath' in queryParams:
            self.__subpath = queryParams[u'subpath']

        try:
            with open(parsed_url.path) as f:
                configs = json.load(f)
        except IOError as e:
            log.Log(_(u"MultiBackend: Url %s")
                    % (parsed_url.geturl()),
                    log.ERROR)

            log.Log(_(u"MultiBackend: Could not load config file %s: %s ")
                    % (parsed_url.path, e),
                    log.ERROR)
            raise BackendException(u'Could not load config file')

        # NOTE: renamed from 'config' so the loop variable no longer shadows
        # the imported duplicity.config module.
        for backend_config in configs:
            url = backend_config[u'url'] + self.__subpath
            if sys.version_info.major == 2:
                url = url.encode(u'utf-8')
            log.Log(_(u"MultiBackend: use store %s")
                    % (url),
                    log.INFO)
            if u'env' in backend_config:
                for env in backend_config[u'env']:
                    log.Log(_(u"MultiBackend: set env %s = %s")
                            % (env[u'name'], env[u'value']),
                            log.INFO)
                    os.environ[env[u'name']] = env[u'value']

            store = duplicity.backend.get_backend(url)
            self.__stores.append(store)

            # Prefix affinity: route filenames with a matching prefix to
            # this store.  Only valid in mirror mode -- stripe mode assumes
            # every store is eligible for every file.
            if u'prefixes' in backend_config:
                if self.__mode == u'stripe':
                    raise BackendException(u"Multibackend: stripe mode not supported with prefix affinity.")
                for prefix in backend_config[u'prefixes']:
                    log.Log(_(u"Multibackend: register affinity for prefix %s")
                            % prefix, log.INFO)
                    if prefix in self.__affinities:
                        self.__affinities[prefix].append(store)
                    else:
                        self.__affinities[prefix] = [store]

            # store_list = store.list()
            # log.Log(_("MultiBackend: at init, store %s has %s files")
            #         % (url, len(store_list)),
            #        log.INFO)

    def _eligible_stores(self, filename):
        u"""Return the stores eligible to hold `filename`.

        If affinity rules exist and at least one prefix matches, only the
        stores registered for the matching prefixes are returned; otherwise
        every store is eligible.
        """
        if self.__affinities:
            matching_prefixes = [k for k in list(self.__affinities.keys()) if util.fsdecode(filename).startswith(k)]
            matching_stores = {store for prefix in matching_prefixes for store in self.__affinities[prefix]}
            if matching_stores:
                # Distinct stores with matching prefix
                return list(matching_stores)

        # No affinity rule or no matching store for that prefix
        return self.__stores

    def _put(self, source_path, remote_filename):
        u"""Write a file to one store (stripe) or all stores (mirror).

        Raises BackendException if no store accepted the write, or
        immediately on first failure when onfail mode is 'abort'.
        """
        # Store an indication of whether any of these passed
        passed = False

        # Eligible stores for this action
        stores = self._eligible_stores(remote_filename)

        # Mirror mode always starts at zero
        if self.__mode == u'mirror':
            self.__write_cursor = 0

        first = self.__write_cursor
        while True:
            store = stores[self.__write_cursor]
            try:
                # Compute the cursor position for the next write up front so
                # the except-branch can advance past a failing store too.
                # (Renamed from 'next' to avoid shadowing the builtin.)
                next_cursor = self.__write_cursor + 1
                if (next_cursor > len(stores) - 1):
                    next_cursor = 0
                log.Log(_(u"MultiBackend: _put: write to store #%s (%s)")
                        % (self.__write_cursor, store.backend.parsed_url.url_string),
                        log.DEBUG)
                store.put(source_path, remote_filename)
                passed = True
                self.__write_cursor = next_cursor
                # No matter what, if we loop around, break this loop
                if next_cursor == 0:
                    break
                # If in stripe mode, don't continue to the next
                if self.__mode == u'stripe':
                    break
            except Exception as e:
                log.Log(_(u"MultiBackend: failed to write to store #%s (%s), try #%s, Exception: %s")
                        % (self.__write_cursor, store.backend.parsed_url.url_string, next_cursor, e),
                        log.INFO)
                self.__write_cursor = next_cursor

                # If we consider write failure as abort, abort
                if self.__onfail_mode == u'abort':
                    log.Log(_(u"MultiBackend: failed to write %s. Aborting process.")
                            % (source_path),
                            log.ERROR)
                    raise BackendException(u"failed to write")

                # If we've looped around, and none of them passed, fail
                if (self.__write_cursor == first) and not passed:
                    log.Log(_(u"MultiBackend: failed to write %s. Tried all backing stores and none succeeded")
                            % (source_path),
                            log.ERROR)
                    raise BackendException(u"failed to write")

    def _get(self, remote_filename, local_path):
        u"""Fetch `remote_filename` from the first eligible store holding it.

        Raises BackendException if no store has the file.
        """
        # since the backend operations will be retried, we can't
        # simply try to get from the store, if not found, move to the
        # next store (since each failure will be retried n times
        # before finally giving up).  So we need to get the list first
        # before we try to fetch
        # ENHANCEME: maintain a cached list for each store
        stores = self._eligible_stores(remote_filename)

        for s in stores:
            flist = s.list()
            if remote_filename in flist:
                s.get(remote_filename, local_path)
                return
            log.Log(_(u"MultiBackend: failed to get %s to %s from %s")
                    % (remote_filename, local_path, s.backend.parsed_url.url_string),
                    log.INFO)
        log.Log(_(u"MultiBackend: failed to get %s. Tried all backing stores and none succeeded")
                % (remote_filename),
                log.ERROR)
        raise BackendException(u"failed to get")

    def _list(self):
        u"""Return the union of filenames across all stores (deduplicated)."""
        lists = []
        for s in self.__stores:
            # A store with transient errors should not abort the whole list;
            # suppress fatal list errors per-store and surface them as warnings.
            config.are_errors_fatal[u'list'] = (False, [])
            l = s.list()
            log.Notice(_(u"MultiBackend: %s: %d files")
                       % (s.backend.parsed_url.url_string, len(l)))
            if len(l) == 0 and duplicity.backend._last_exception:
                log.Warn(_(u"Exception during list of %s: %s"
                           % (s.backend.parsed_url.url_string,
                              util.uexc(duplicity.backend._last_exception))))
                duplicity.backend._last_exception = None
            lists.append(l)
        # combine the lists into a single flat list w/o duplicates via set:
        result = list({item for sublist in lists for item in sublist})
        log.Log(_(u"MultiBackend: combined list: %s")
                % (result),
                log.DEBUG)
        return result

    def _delete(self, filename):
        u"""Delete `filename` from every eligible store that holds it.

        In stripe mode the first successful delete ends the scan, since only
        one store can hold the file.  Logs an error (without raising) if no
        store had the file.
        """
        # Store an indication on whether any passed
        passed = False

        stores = self._eligible_stores(filename)

        # since the backend operations will be retried, we can't
        # simply try to get from the store, if not found, move to the
        # next store (since each failure will be retried n times
        # before finally giving up).  So we need to get the list first
        # before we try to delete
        # ENHANCEME: maintain a cached list for each store
        for s in stores:
            flist = s.list()
            if filename in flist:
                if hasattr(s.backend, u'_delete_list'):
                    s._do_delete_list([filename, ])
                elif hasattr(s.backend, u'_delete'):
                    s._do_delete(filename)
                passed = True
                # In stripe mode, only one item will have the file
                if self.__mode == u'stripe':
                    return
        if not passed:
            log.Log(_(u"MultiBackend: failed to delete %s. Tried all backing stores and none succeeded")
                    % (filename),
                    log.ERROR)


duplicity.backend.register_backend(u'multi', MultiBackend)