1# -*- coding: utf-8 -*- 2"""The functions to perform the sync itself given a pair of file 3listings (remote and local) and a copy of the previous state (db) 4 5Part of the pyosf package 6https://github.com/psychopy/pyosf/ 7 8Released under MIT license 9 10Created on Sun Feb 7 21:31:15 2016 11 12@author: lpzjwp 13""" 14from __future__ import absolute_import, print_function 15from .constants import SHA 16import copy 17import os 18import shutil 19import weakref 20try: 21 from psychopy import logging 22except ImportError: 23 import logging 24from .tools import dict_from_list 25 26""" 27Resolutions table 28search for e.g. code:010 to find the relevant python resolution 29+-------+-------+--------+-------------------------------------------+ 30| index | local | remote | action | 31+-------+-------+--------+-------------------------------------------+ 32| 1 | 1 | 1 | check sha for file changes | 33| 1 | 0 | 0 | remove from index | 34| 1 | 1 | 0 | if loc mod since last sync then recreate | 35| 1 | 0 | 1 | if rem mod since last sync then recreate | 36| | | | | 37| 0 | 1 | 1 | check sha and modified date | 38| 0 | 1 | 0 | create on remote | 39| | | | | 40| 0 | 0 | 1 | create on local | 41+-------+-------+--------+-------------------------------------------+ 42""" 43 44 45class Changes(object): 46 """This is essentially a dictionary of lists 47 """ 48 def __init__(self, proj): 49 self.proj = weakref.ref(proj) 50 # make sure indices are up to date 51 proj.local.rebuild_index() 52 proj.osf.rebuild_index() 53 # create the names of the self attributes 54 # the actual attributes will be created during _set_empty 55 self._change_types = [] 56 # order is determined by the conflict cases (requires most change) 57 for action in ['add', 'mv', 'update', 'del']: 58 for target in ['local', 'remote']: 59 self._change_types.append("{}_{}".format(action, target)) 60 self._set_empty() 61 self.local_index = proj.local.index 62 self.remote_index = proj.osf.index 63 self.last_index = proj.index 64 self.analyze() 65 self._status = 0 66 67 def __str__(self): 68 s = "\t Add\t Del\t Mv\t Update\n" 69 for kind in ['local', 'remote']: 70 s += "{}\t {:n}\t {:n}\t {:n}\t {:n}\n".format( 71 kind, 72 len(getattr(self, "add_{}".format(kind))), 73 len(getattr(self, "del_{}".format(kind))), 74 len(getattr(self, "mv_{}".format(kind))), 75 len(getattr(self, "update_{}".format(kind))), 76 ) 77 return s 78 79 def __len__(self): 80 return len(self.dry_run()) 81 82 def _set_empty(self): 83 for attrib_name in self._change_types: 84 setattr(self, attrib_name, {}) 85 86 def _make_dirs(self, path): 87 """Replaces os.makedirs by keeping tack of what folders we added 88 """ 89 root = path 90 to_add = [] 91 print("DoingMakeDirs for {}".format(path)) 92 # find how low we have to go before valid path found 93 while not os.path.isdir(root): 94 root, needed = os.path.split(root) 95 to_add.insert(0, needed) # insert at beginning to add first 96 if root=='' or needed=='': 97 break # we got to either "/" or "path" 98 # now create those folder rescursively from bottom 99 for this_folder in to_add: 100 root = os.path.join(root, this_folder) 101 os.mkdir(root) 102 self.add_to_index(root) # update the index with this new folder 103 print("AddedFolder {}".format(root)) 104 105 def apply_add_local(self, asset, new_path=None, threaded=False): 106 proj = self.proj() 107 full_path = os.path.join(proj.local.root_path, new_path) 108 109 # handle folders 110 if asset['kind'] == "folder": 111 if os.path.isdir(full_path): 112 self.add_to_index(full_path) # make sure its in index 113 return 1 # the folder may have been created implicitly already 114 else: 115 self._make_dirs(full_path) 116 logging.info("Sync.Changes: created folder: {}" 117 .format(full_path)) 118 return 1 # the folder may have been created implicitly already 119 120 # this is a file 121 container, filename = os.path.split(full_path) 122 if not os.path.isdir(container): 123 self._make_dirs(container) 124 proj.osf.session.download_file(asset['url'], full_path, 125 size=asset['size'], 126 threaded=threaded, changes=self) 127 logging.info("Sync.Changes request: File download: {}" 128 .format(new_path)) 129 return 1 130 131 def apply_add_remote(self, asset, new_path=None, threaded=False): 132 proj = self.proj() 133 if new_path in proj.osf.containers: 134 # this has already handled (e.g. during prev file upload) 135 return 1 136 elif new_path is not None: 137 asset = copy.copy(asset) 138 asset['path'] = new_path 139 if asset['kind'] == 'folder': 140 proj.osf.add_container(asset['path'], kind='folder', changes=self) 141 logging.info("Sync.Changes request: Create folder: {}" 142 .format(new_path)) 143 else: 144 proj.osf.add_file(asset, threaded=threaded, changes=self) 145 logging.info("Sync.Changes request: File upload: {}" 146 .format(new_path)) 147 return 1 148 149 def apply_mv_local(self, asset, new_path, threaded=False): 150 if asset['kind'] == 'folder': 151 return 1 152 153 proj = self.proj() 154 full_path_new = os.path.join(proj.local.root_path, new_path) 155 full_path_old = os.path.join(proj.local.root_path, asset['path']) 156 # check if folder exists: 157 new_folder = os.path.split(full_path_new)[0] 158 if not os.path.isdir(new_folder): 159 self._make_dirs(new_folder) 160 shutil.move(full_path_old, full_path_new) 161 self.rename_in_index(asset, new_path) 162 logging.info("Sync.Changes done: Moved file locally: {} -> {}" 163 .format(asset['full_path'], new_path)) 164 return 1 165 166 def apply_mv_remote(self, asset, new_path, threaded=False): 167 proj = self.proj() 168 new_folder, new_name = os.path.split(new_path) 169 proj.osf.rename_file(asset, new_path, changes=self) 170 logging.info("Sync.Changes request: Move file remote: {} -> {}" 171 .format(asset['path'], new_path)) 172 return 1 173 174 def apply_del_local(self, asset, new_path=None, threaded=False): 175 proj = self.proj() 176 full_path = os.path.join(proj.local.root_path, new_path) 177 if os.path.isfile(full_path): # might have been removed already? 178 os.remove(full_path) 179 if os.path.isdir(full_path): # might have been removed already? 180 os.rmdir(full_path) 181 logging.info("Sync.Changes done: Removed file locally: {}" 182 .format(asset['path'])) 183 self.remove_from_index(asset['path']) 184 return 1 185 186 def apply_del_remote(self, asset, new_path=None, threaded=False): 187 proj = self.proj() 188 proj.osf.del_file(asset, changes=self) 189 logging.info("Sync.Changes request: Remove file remotely: {}" 190 .format(asset['path'])) 191 return 1 192 193 def apply_update_local(self, asset, new_path=None, threaded=False): 194 proj = self.proj() 195 full_path = os.path.join(proj.local.root_path, asset['path']) 196 # remove previous copy of file 197 if os.path.isfile(full_path): # might have been removed already? 198 os.remove(full_path) 199 self.remove_from_index(asset['path']) 200 # then fetch new one from remote 201 proj.osf.session.download_file(asset['url'], full_path, 202 size=asset['size'], 203 threaded=threaded, changes=self) 204 logging.info("Sync.Changes request: Update file locally: {}" 205 .format(asset['path'])) 206 return 1 207 208 def apply_update_remote(self, asset, new_path=None, threaded=False): 209 proj = self.proj() 210 proj.osf.add_file(asset, update=True, 211 threaded=threaded, changes=self) 212 logging.info("Sync.Changes request: Update file remotely: {}" 213 .format(new_path)) 214 return 1 215 216 def _asset_from_path(self, path): 217 """Try to find asset and return it 218 """ 219 root = self.proj().root_path 220 if path.startswith(root): 221 path = path.replace(root, '') 222 while path.startswith('/'): 223 path = path[1:] 224 path = path.replace(self.proj().root_path, '') 225 local_dict = dict_from_list(self.local_index, 'path') 226 last_dict = dict_from_list(self.last_index, 'path') 227 remote_dict = dict_from_list(self.remote_index, 'path') 228 if path in local_dict: 229 return local_dict[path] 230 elif path in last_dict: 231 return last_dict[path] 232 elif path in remote_dict: 233 return remote_dict[path] 234 else: 235 for ky in remote_dict.keys(): 236 print(' - {}, '.format(ky, remote_dict[ky]['path'])) 237 return 0 # fail 238 239 def add_to_index(self, path): 240 """Tries to find the asset from the path to delete it 241 242 Path is ideally a local path (which acts as a key to the asset in 243 the local index) but if it's a URL we'll try to deduce the local path 244 """ 245 asset = self._asset_from_path(path) 246 if asset: 247 self.last_index.append(asset) 248 return 1 # success 249 else: 250 logging.error("Was asked to add {} to index but " 251 "it wasn't found. That could lead to corruption." 252 .format(path)) 253 return 0 # fail 254 255 def remove_from_index(self, path): 256 asset = self._asset_from_path(path) 257 if asset: 258 self.last_index.remove(asset) 259 return 1 # success 260 else: 261 logging.error("Was asked to remove {} from index but " 262 "it wasn't found. That could lead to corruption." 263 .format(path)) 264 return 0 # fail 265 266 267 def rename_in_index(self, asset, new_path): 268 last_dict = dict_from_list(self.last_index, 'path') 269 if asset['path'] in last_dict: 270 new_asset = last_dict[asset['path']] 271 new_asset['path'] = new_path 272 return 1 273 else: 274 logging.error("Was asked to remove {} from index but " 275 "it wasn't in index. That could lead to corruption." 276 .format(asset['path'])) 277 return 0 278 279 280 def apply(self, threaded=False, dry_run=False): 281 """Apply the changes using the given remote.Session object 282 returns a list of strings about what happened (or will happen if 283 dry_run=True) 284 """ 285 proj = self.proj() 286 self._status = 1 287 actions = [] 288 # would it be wise to perform del operations before others? 289 for action_type in self._change_types: 290 action_dict = getattr(self, action_type) 291 path_list = list(action_dict.keys()) # for Python3 convert to list 292 # sort by path 293 if action_type[:3] in ['del', 'mv_']: 294 reverse = True # so folders deleted last 295 else: 296 reverse = False # so folders created first 297 path_list.sort(reverse=reverse) 298 # get the self.apply___() function to be applied 299 func_apply = getattr(self, "apply_{}".format(action_type)) 300 for new_path in path_list: 301 asset = action_dict[new_path] 302 if dry_run: 303 actions.append("{}: {}".format(action_type, new_path)) 304 else: 305 func_apply(asset, new_path, threaded=threaded) 306 if not dry_run: 307 proj.local._needs_rebuild_index = True 308 if threaded: 309 proj.osf.session.apply_changes() # starts the up/downloads 310 else: 311 self.finish_sync() 312 return actions 313 314 def dry_run(self): 315 """Doesn't do anything but returns a list of strings describing the 316 actions 317 """ 318 return self.apply(dry_run=True) 319 320 @property 321 def progress(self): 322 """Returns the progress of changes: 323 0 for not started sync (need apply()) 324 dict during sync {'up':[done, total], 'down':[done, total]} 325 1 for finished 326 """ 327 if self._status in [0, -1]: # not started or had already finished 328 return self._status 329 # otherwise we're partway through sync so check with session 330 prog = self.proj().osf.session.get_progress() 331 if prog == 1: 332 self._status = -1 # was running but now finished 333 else: 334 self._status = 1 # running 335 return prog # probably a dictionary 336 337 def finish_sync(self): 338 """Rebuilds index and saves project file when the sync has finished 339 """ 340 proj = self.proj() 341 # when local/remote updates are complete refresh index based on local 342 proj.local.rebuild_index() 343 # proj.index = proj.local.index 344 self._set_empty() 345 proj.save() 346 if hasattr(logging, 'flush'): # psychopy.logging has control of flush 347 logging.flush() 348 349 def analyze(self): 350 """Take a list of files 351 """ 352 local = self.local_index 353 remote = self.remote_index 354 index = self.last_index 355 # copies of the three asset lists. 356 # Safe to alter these only at top level 357 local_p = dict_from_list(local, 'path') 358 remote_p = dict_from_list(remote, 'path') 359 index_p = dict_from_list(index, 'path') 360 361 # go through the files in the database 362 for path, asset in index_p.items(): 363 # code:1xx all these files existed at last sync 364 if path in remote_p.keys() and path in local_p.keys(): 365 # code:111 366 # Still exists in all. Check for local/remote modifications 367 local_asset = local_p[path] 368 remote_asset = remote_p[path] 369 370 if asset['kind'] == 'folder': 371 # for folders check /contents/ not folder itself 372 logging.debug("Sync.analyze 111a: {} no action" 373 .format(path)) 374 375 elif asset[SHA] == remote_asset[SHA] and \ 376 asset[SHA] == local_asset[SHA]: 377 # all copies match. Go and have a cup of tea. 378 logging.debug("Sync.analyze 111b: {} no action" 379 .format(path)) 380 381 elif asset[SHA] != remote_asset[SHA] and \ 382 asset[SHA] != local_asset[SHA]: 383 # both changed. Conflict! 384 local_time = local_asset['date_modified'] 385 remote_time = remote_asset['date_modified'] 386 local_path, remote_path = conflict_paths(path, local_time, 387 remote_time) 388 # rename the remote and local files with CONFLICT tag 389 self.mv_local[local_path] = local_asset 390 self.mv_remote[remote_path] = remote_asset 391 # and swap the version from the other side 392 self.add_local[remote_path] = remote_asset 393 self.add_remote[local_path] = local_asset 394 logging.info("Sync.analyze 111c: {} conflict" 395 "(changed on local and remote)" 396 .format(path)) 397 398 elif asset[SHA] != remote_asset[SHA]: 399 # changed remotely only 400 # TODO: we know the files differ and we presume the remote 401 # is the newer one. Could check the date_modified? 402 # But if they differed wouldn't that mean a clock err? 403 self.update_local['path'] = remote_asset 404 logging.info("Sync.analyze 111d: {} changed remotely" 405 .format(path)) 406 407 elif asset[SHA] != local_asset[SHA]: 408 # changed locally only 409 # TODO: we know the files differ and we presume the local 410 # is the newer one. Could check the date_modified? 411 # But if they differed wouldn't that mean a clock err? 412 # fetch the links from the remote so we can do an update op 413 local_asset['links'] = remote_asset['links'] 414 self.update_remote['path'] = local_asset 415 logging.info("Sync.analyze 111e: {} changed locally" 416 .format(path)) 417 418 # don't re-analyze 419 del local_p[path] 420 del remote_p[path] 421 422 elif path not in remote_p.keys() and path not in local_p.keys(): 423 # code:100 424 # Was deleted in both. Remove from index 425 logging.debug("Sync.analyze 100: {}" 426 "deleted locally and remotely" 427 .format(path)) 428 self.remove_from_index(path) 429 430 elif path not in local_p.keys(): 431 remote_asset = remote_p[path] 432 # code:101 has been deleted locally but exists remotely 433 if asset['date_modified'] < remote_asset['date_modified']: 434 # deleted locally but changed on remote. Recreate 435 # make new path and get the newer asset info 436 new_path = recreated_path(path) 437 # remote: rename (move) to include "_DELETED" 438 # local: just add the new asset with new path 439 self.add_local[new_path] = remote_asset 440 self.mv_remote[new_path] = remote_asset 441 logging.warn("Sync.analyze 101a: {} conflict " 442 "(deleted locally and changed remotely)" 443 .format(path)) 444 else: 445 # deleted locally unchanged remotely. Delete remotely 446 self.del_remote[asset['path']] = remote_asset 447 logging.info("Sync.analyze 101b: {} " 448 "deleted locally (and unchanged remotely)" 449 .format(path)) 450 del remote_p[path] # remove so we don't re-analyze 451 452 elif path not in remote_p.keys(): 453 # has been deleted remotely but exists locally 454 # code:110 455 local_asset = local_p[path] 456 if asset['date_modified'] < local_asset['date_modified']: 457 # deleted remotely but changed on local. Recreate 458 # make new path and get the newer asset info 459 new_path = recreated_path(path) 460 # remote: rename (move) to include "_DELETED" 461 self.mv_local[new_path] = local_asset 462 # local: just add the new asset with new path 463 self.add_remote[new_path] = local_asset 464 logging.warn("Sync.analyze 110a: {} conflict " 465 "(deleted remotely unchanged locally)" 466 .format(path)) 467 else: 468 # deleted remotely unchanged locally. Delete locally 469 self.del_local[asset['path']] = asset 470 logging.info("Sync.analyze 110b: {} " 471 "deleted remotely (and unchanged locally)" 472 .format(path)) 473 del local_p[path] # remove so we don't re-analyse 474 475 # go through the files in the local 476 for path, local_asset in local_p.items(): 477 # code:01x we know these files aren't in index but are local 478 if path in remote_p.keys(): 479 if local_asset['kind'] == 'folder': # if folder then leave 480 continue 481 # TODO: do we need to handle the case that the user creates a 482 # folder in one place and file in another with same names?! 483 remote_asset = remote_p[path] 484 # code:011 485 if remote_asset[SHA] == local_asset[SHA]: 486 # both copies match but not in index (user uplaoded?) 487 logging.debug("Sync.analyze 011a: {} " 488 "added remotely and locally (identical file)" 489 .format(path)) 490 del remote_p[path] 491 else: 492 # code:010 493 self.add_remote[path] = local_asset 494 logging.info("Sync.analyze 010a: {} added locally" 495 .format(path)) 496 497 # go through the files in the remote 498 for path, remote_asset in remote_p.items(): 499 # code:001 has been created remotely 500 self.add_local[path] = remote_asset 501 logging.info("Sync.analyze 001a: {} added remotely" 502 .format(path)) 503 504 505def recreated_path(path): 506 """If we have to add a file back (that was deleted) then add RECREATED to 507 the name 508 """ 509 root, ext = os.path.splitext(path) 510 return root+"_DELETED"+ext 511 512 513def conflict_paths(path, local_time, server_time): 514 """ 515 """ 516 root, ext = os.path.splitext(path) 517 local = "{}_CONFLICT{}{}".format(root, local_time, ext) 518 server = "{}_CONFLICT{}{}".format(root, server_time, ext) 519 return local, server 520 521 522def _update_path(asset, new_path=None): 523 """Helper function to check whether path *in* the dict matches the key 524 """ 525 if new_path is None: 526 new_path = asset['path'] 527 elif new_path != asset['path']: 528 asset = copy.copy(asset) # update a new copy 529 asset['path'] = new_path 530 return new_path, asset 531