# -*- coding: utf-8 -*-


__license__ = 'GPL v3'
__copyright__ = '2009, John Schember <john@nachtimwald.com>'
__docformat__ = 'restructuredtext en'

import os, shutil, time

from calibre import fsync
from calibre.devices.errors import PathError
from calibre.utils.filenames import case_preserving_open_file


class File:
    """Snapshot of the metadata of a single filesystem entry.

    Attributes are filled in once, at construction time, from ``os.stat``
    and ``os.access``; they are not refreshed afterwards.
    """

    def __init__(self, path):
        """Collect metadata for ``path``.

        :param path: Path of an existing file or directory.
        :raises OSError: Propagated from ``os.stat`` (e.g. missing path).
        """
        stats = os.stat(path)
        self.is_dir = os.path.isdir(path)
        self.is_readonly = not os.access(path, os.W_OK)
        self.ctime = stats.st_ctime
        self.wtime = stats.st_mtime
        self.size = stats.st_size
        # Drop a single trailing separator so that basename() below yields
        # the entry's own name rather than an empty string.
        if path.endswith(os.sep):
            path = path[:-1]
        self.path = path
        self.name = os.path.basename(path)


def check_transfer(infile, dest):
    """Return True if two seekable file objects have identical content.

    Both objects are rewound and read in full, so their positions are left
    at end-of-file and the whole content of each is held in memory.
    """
    infile.seek(0)
    dest.seek(0)
    return infile.read() == dest.read()


class CLI:
    """File-level operations (get/put/list/mkdir/rm/touch) on a device path.

    This class defines none of the state it reads: ``_main_prefix``,
    ``_card_a_prefix``, ``_card_b_prefix``, ``SUPPORTS_SUB_DIRS`` and
    ``delete_books`` must be supplied by the class it is combined with.
    ``lopen`` is not defined or imported in this file either.
    NOTE(review): presumably calibre installs ``lopen`` as a builtin --
    confirm before running this module outside calibre.

    Every method accepts ``end_session`` but this implementation never
    uses it.
    """

    def get_file(self, path, outfile, end_session=True):
        """Copy the file at ``path`` into the writable object ``outfile``.

        :param path: Source path; passed through :meth:`munge_path`.
        :param outfile: File-like object opened for binary writing.
        """
        path = self.munge_path(path)
        with lopen(path, 'rb') as src:
            shutil.copyfileobj(src, outfile)

    def put_file(self, infile, path, replace_file=False, end_session=True):
        """Copy ``infile`` to ``path`` and return the path actually written.

        :param infile: A readable, seekable binary file object, or a path
            to open. Only a file opened here is closed here.
        :param path: Destination; passed through :meth:`munge_path`. If it
            is an existing directory, the file is placed inside it under
            the base name of ``infile.name``.
        :param replace_file: If false, an existing destination is an error.
        :return: The second value returned by ``case_preserving_open_file``.
        :raises PathError: Destination exists and ``replace_file`` is false.
        :raises OSError: The copy failed on both attempts.
        """
        path = self.munge_path(path)
        close = False
        if not hasattr(infile, 'read'):
            infile, close = lopen(infile, 'rb'), True
        # try/finally: a handle opened above must be released even when
        # PathError is raised or the copy fails.
        try:
            infile.seek(0)
            if os.path.isdir(path):
                # Use only the base name: os.path.join() discards ``path``
                # when its second argument is absolute, which would make
                # the destination the source file itself.
                path = os.path.join(path, os.path.basename(infile.name))
            if not replace_file and os.path.exists(path):
                raise PathError('File already exists: ' + path)
            dest, actual_path = case_preserving_open_file(path)
            with dest:
                try:
                    shutil.copyfileobj(infile, dest)
                except OSError:
                    # Retry once from a clean slate after a short pause.
                    print('WARNING: First attempt to send file to device failed')
                    time.sleep(0.2)
                    infile.seek(0)
                    dest.seek(0)
                    dest.truncate()
                    shutil.copyfileobj(infile, dest)
                fsync(dest)
                # Post-copy verification is intentionally disabled:
                # if not check_transfer(infile, dest): raise Exception('Transfer failed')
        finally:
            if close:
                infile.close()
        return actual_path

    def munge_path(self, path):
        """Translate a user-supplied path into a path under a device prefix.

        * A path starting with ``/`` that is not already under the main or
          a (non-empty) card prefix is re-rooted under ``_main_prefix``.
        * A leading ``carda:`` / ``cardb:`` is replaced by the matching
          card prefix minus its last character.
        * Anything else is returned unchanged.

        NOTE(review): ``prefix[:-1]`` presumably strips a trailing
        separator from the card prefix -- confirm against where the
        prefixes are set. A ``carda:``/``cardb:`` path with a missing
        (``None``) card prefix raises ``TypeError``.
        """
        if path.startswith('/') and not (
                path.startswith(self._main_prefix) or
                (self._card_a_prefix and path.startswith(self._card_a_prefix)) or
                (self._card_b_prefix and path.startswith(self._card_b_prefix))):
            path = self._main_prefix + path[1:]
        elif path.startswith('carda:'):
            # Swap only the leading token; str.replace() would also rewrite
            # any later 'carda:' occurring inside the path.
            path = self._card_a_prefix[:-1] + path[len('carda:'):]
        elif path.startswith('cardb:'):
            path = self._card_b_prefix[:-1] + path[len('cardb:'):]
        return path

    def list(self, path, recurse=False, end_session=True, munge=True):
        """List ``path`` as ``[(directory, [File, ...]), ...]``.

        :param path: File or directory to list.
        :param recurse: Also list every sub-directory, depth first.
        :param munge: Pass ``path`` through :meth:`munge_path` first;
            recursive calls disable this because their paths are already
            translated.
        :return: For a regular file, one tuple of its parent directory and
            a single :class:`File`. For a directory, one tuple per listed
            directory, starting with ``path`` itself.
        """
        if munge:
            path = self.munge_path(path)
        if os.path.isfile(path):
            return [(os.path.dirname(path), [File(path)])]
        entries = [File(os.path.join(path, f)) for f in os.listdir(path)]
        dirs = [(path, entries)]
        if recurse:
            for entry in entries:
                if entry.is_dir:
                    dirs.extend(self.list(entry.path, recurse=True, munge=False))
        return dirs

    def mkdir(self, path, end_session=True):
        """Create directory ``path``; a no-op unless ``SUPPORTS_SUB_DIRS``.

        :raises OSError: Propagated from ``os.mkdir``.
        """
        if self.SUPPORTS_SUB_DIRS:
            path = self.munge_path(path)
            os.mkdir(path)

    def rm(self, path, end_session=True):
        """Remove ``path`` by handing it to ``self.delete_books``."""
        path = self.munge_path(path)
        self.delete_books([path])

    def touch(self, path, end_session=True):
        """Create ``path`` as an empty file if missing, then update its times.

        Directories are left untouched.
        """
        path = self.munge_path(path)
        if not os.path.exists(path):
            lopen(path, 'wb').close()
        if not os.path.isdir(path):
            # times=None sets access and modification time to "now".
            os.utime(path, None)