from __future__ import absolute_import
from __future__ import print_function

from .block.BootBlock import BootBlock
from .block.RootBlock import RootBlock
from .ADFSVolDir import ADFSVolDir
from .ADFSBitmap import ADFSBitmap
from .FileName import FileName
from .RootMetaInfo import RootMetaInfo
from .FSError import *
from .FSString import FSString
from .TimeStamp import TimeStamp
from . import DosType
import amitools.util.ByteSize as ByteSize


class ADFSVolume:
    """A file system volume living on a block device.

    The volume ties together the boot block, the root block, the root
    directory node and the allocation bitmap of one block device and
    offers path based convenience operations on top of them.
    """

    def __init__(self, blkdev):
        """Bind the volume to ``blkdev``; nothing is read or written yet.

        Call :meth:`open` to load an existing volume or :meth:`create`
        to format a new one.
        """
        self.blkdev = blkdev

        # on-disk structures, populated by open()/create()
        self.boot = None
        self.root = None
        self.root_dir = None
        self.bitmap = None

        # state derived from the boot/root block
        self.valid = False
        self.is_ffs = None
        self.is_intl = None
        self.is_dircache = None
        self.is_longname = None
        self.name = None
        self.meta_info = None

    def open(self):
        """Read an existing volume from the block device.

        Raises:
            FSError: ``INVALID_BOOT_BLOCK`` if the boot block is not valid,
                ``INVALID_ROOT_BLOCK`` if the root block is not valid.
        """
        # read boot block
        self.boot = BootBlock(self.blkdev)
        self.boot.read()
        if not self.boot.valid:
            raise FSError(INVALID_BOOT_BLOCK, block=self.boot)

        # get fs flags
        dos_type = self.boot.dos_type
        self.is_ffs = DosType.is_ffs(dos_type)
        self.is_intl = DosType.is_intl(dos_type)
        self.is_dircache = DosType.is_dircache(dos_type)
        self.is_longname = DosType.is_longname(dos_type)

        # read root
        self.root = RootBlock(self.blkdev, self.boot.calc_root_blk)
        self.root.read()
        if not self.root.valid:
            raise FSError(INVALID_ROOT_BLOCK, block=self.root)

        self.name = FSString(self.root.name)
        # build meta info
        self.meta_info = RootMetaInfo(
            self.root.create_ts, self.root.disk_ts, self.root.mod_ts
        )
        # create root dir
        self.root_dir = ADFSVolDir(self, self.root)
        self.root_dir.read()
        # create bitmap
        self.bitmap = ADFSBitmap(self.root)
        self.bitmap.read()
        self.valid = True

    def create(self, name, meta_info=None, dos_type=None, boot_code=None,
               is_ffs=False, is_intl=False, is_dircache=False,
               is_longname=False):
        """Create a fresh, empty volume on the block device.

        Args:
            name: volume name; must be a ``FSString``.
            meta_info: optional ``RootMetaInfo`` with the time stamps to
                store; if omitted all three stamps are set to "now".
            dos_type: explicit DOS type. If omitted it is derived from the
                ``is_*`` flags below, starting at ``DosType.DOS0``.
            boot_code: passed through to the boot block.
            is_ffs, is_intl, is_dircache, is_longname: file system flags,
                only consulted when ``dos_type`` is not given.

        Raises:
            ValueError: if ``name`` is not a ``FSString``.
            FSError: ``INVALID_VOLUME_NAME`` if the name does not validate.
        """
        # determine dos_type
        if dos_type is None:
            dos_type = DosType.DOS0
            # the three variants are mutually exclusive here; longname
            # takes precedence over dircache, which wins over intl
            if is_longname:
                dos_type = DosType.DOS6
            elif is_dircache:
                dos_type |= DosType.DOS_MASK_DIRCACHE
            elif is_intl:
                dos_type |= DosType.DOS_MASK_INTL
            if is_ffs:
                dos_type |= DosType.DOS_MASK_FFS
        # update flags
        self.is_ffs = DosType.is_ffs(dos_type)
        self.is_intl = DosType.is_intl(dos_type)
        self.is_dircache = DosType.is_dircache(dos_type)
        self.is_longname = DosType.is_longname(dos_type)
        # convert and check volume name
        if not isinstance(name, FSString):
            raise ValueError("create's name must be a FSString")
        # Volumes don't support long names
        fn = FileName(name, is_intl=self.is_intl, is_longname=False)
        if not fn.is_valid():
            raise FSError(INVALID_VOLUME_NAME, file_name=name, node=self)
        # create a boot block
        self.boot = BootBlock(self.blkdev)
        self.boot.create(dos_type=dos_type, boot_code=boot_code)
        self.boot.write()
        # create a root block
        self.root = RootBlock(self.blkdev, self.boot.calc_root_blk)
        if meta_info is None:
            meta_info = RootMetaInfo()
            meta_info.set_current_as_create_time()
            meta_info.set_current_as_mod_time()
            meta_info.set_current_as_disk_time()
        create_ts = meta_info.get_create_ts()
        disk_ts = meta_info.get_disk_ts()
        mod_ts = meta_info.get_mod_ts()
        self.meta_info = meta_info
        self.root.create(name.get_ami_str(), create_ts, disk_ts, mod_ts,
                         fstype=dos_type)
        self.name = name
        # create bitmap
        self.bitmap = ADFSBitmap(self.root)
        self.bitmap.create()
        self.bitmap.write()  # writes root block, too
        # create empty root dir
        self.root_dir = ADFSVolDir(self, self.root)
        self.root_dir.read()
        # all ok
        self.valid = True

    def close(self):
        """Close the volume. Currently a no-op."""
        pass

    def get_info(self):
        """Return a list of strings with usage information on the volume.

        The list holds one line each for total, used and free space,
        giving block count, human readable size and byte count; the
        used/free lines additionally carry a percentage.
        """
        res = []
        total = self.get_total_blocks()
        free = self.get_free_blocks()
        used = total - free
        bb = self.blkdev.block_bytes
        btotal = total * bb
        bfree = free * bb
        bused = used * bb
        # percentages are kept in hundredths of a percent and scaled
        # back by 100.0 when formatted
        prc_free = 10000 * free / total
        prc_used = 10000 - prc_free
        res.append("total: %10d %s %12d" % (
            total, ByteSize.to_byte_size_str(btotal), btotal))
        res.append("used: %10d %s %12d %5.2f%%" % (
            used, ByteSize.to_byte_size_str(bused), bused, prc_used / 100.0))
        res.append("free: %10d %s %12d %5.2f%%" % (
            free, ByteSize.to_byte_size_str(bfree), bfree, prc_free / 100.0))
        return res

    # ----- Path Queries -----

    def get_path_name(self, path_name, allow_file=True, allow_dir=True):
        """Get the node for the given path.

        Args:
            path_name: path as ``FSString``.
            allow_file, allow_dir: forwarded to the root directory's
                ``get_path`` lookup.

        Returns:
            The root directory node for a root path alias, otherwise
            whatever ``root_dir.get_path`` returns for the split path.

        Raises:
            ValueError: if ``path_name`` is not a ``FSString``.
            FSError: ``INVALID_FILE_NAME`` if the path does not validate.
        """
        # make sure path name is a FSString
        if not isinstance(path_name, FSString):
            raise ValueError("get_path_name's path must be a FSString")
        # create and check file name
        fn = FileName(path_name, is_intl=self.is_intl,
                      is_longname=self.is_longname)
        if not fn.is_valid():
            raise FSError(INVALID_FILE_NAME, file_name=path_name, node=self)
        # the root node itself?
        if fn.is_root_path_alias():
            return self.root_dir
        # find a sub node
        path = fn.split_path()
        return self.root_dir.get_path(path, allow_file, allow_dir)

    def get_dir_path_name(self, path_name):
        """Get the node for the given path and ensure it is a directory."""
        return self.get_path_name(path_name, allow_file=False)

    def get_file_path_name(self, path_name):
        """Get the node for the given path and ensure it is a file."""
        return self.get_path_name(path_name, allow_dir=False)

    def get_create_path_name(self, path_name, suggest_name=None):
        """Get a parent node and a file name for creating a new entry.

        If ``path_name`` is the root or resolves to a directory, that
        directory is the parent and ``suggest_name`` is the name. Otherwise
        the path is split into directory and base name; the directory part
        (or the root, if there is none) is the parent and the base name
        (or ``suggest_name``, if there is none) is the name.

        Returns:
            ``(parent_node_or_none, file_name_or_none)``

        Raises:
            ValueError: if an argument is not a ``FSString``.
            FSError: ``INVALID_FILE_NAME`` if the path does not validate.
        """
        # make sure input is correct
        if not isinstance(path_name, FSString):
            raise ValueError(
                "get_create_path_name's path_name must be a FSString")
        if suggest_name is not None and not isinstance(suggest_name, FSString):
            raise ValueError(
                "get_create_path_name's suggest_name must be a FSString")
        # is root path?
        fn = FileName(path_name, is_intl=self.is_intl,
                      is_longname=self.is_longname)
        if not fn.is_valid():
            raise FSError(INVALID_FILE_NAME, file_name=path_name, node=self)
        if fn.is_root_path_alias():
            return self.root_dir, suggest_name
        # try to get path_name as a directory
        node = self.get_dir_path_name(path_name)
        if node is not None:
            return node, suggest_name
        # split into dir and file name
        dir_name, base_name = fn.get_dir_and_base_name()
        if dir_name is not None:
            # has a directory -> try to fetch it
            node = self.get_dir_path_name(dir_name)
        else:
            # no dir -> assume root dir
            node = self.root_dir
        if base_name is not None:
            # take given name
            return node, base_name
        # use suggested name
        return node, suggest_name

    # ----- convenience API -----

    def get_volume_name(self):
        """Return the volume name."""
        return self.name

    def get_root_dir(self):
        """Return the root directory node."""
        return self.root_dir

    def get_dos_type(self):
        """Return the DOS type stored in the boot block."""
        return self.boot.dos_type

    def get_boot_code(self):
        """Return the boot code stored in the boot block."""
        return self.boot.boot_code

    def get_free_blocks(self):
        """Return the number of free blocks according to the bitmap."""
        return self.bitmap.get_num_free()

    def get_used_blocks(self):
        """Return the number of blocks that are not free."""
        return self.get_total_blocks() - self.get_free_blocks()

    def get_total_blocks(self):
        """Return the total number of blocks of the block device."""
        return self.blkdev.num_blocks

    def get_meta_info(self):
        """Return the ``RootMetaInfo`` of the volume."""
        return self.meta_info

    def update_disk_time(self):
        """Set the disk time stamp of the volume to the current time."""
        mi = RootMetaInfo()
        mi.set_current_as_disk_time()
        self.change_meta_info(mi)

    def change_meta_info(self, meta_info):
        """Apply the time stamps set in ``meta_info`` to the root block.

        Only stamps that are not ``None`` in ``meta_info`` are taken over.
        The root block is written back only if at least one stamp changed.

        Returns:
            ``True`` if a valid root block is available (even if nothing
            had to be changed), ``False`` otherwise.
        """
        if self.root is None or not self.root.valid:
            return False
        dirty = False
        # update create_ts
        create_ts = meta_info.get_create_ts()
        if create_ts is not None:
            self.root.create_ts = create_ts
            dirty = True
        # update disk_ts
        disk_ts = meta_info.get_disk_ts()
        if disk_ts is not None:
            self.root.disk_ts = disk_ts
            dirty = True
        # update mod_ts
        mod_ts = meta_info.get_mod_ts()
        if mod_ts is not None:
            self.root.mod_ts = mod_ts
            dirty = True
        # update if something changed
        if dirty:
            self.root.write()
            self.meta_info = RootMetaInfo(
                self.root.create_ts, self.root.disk_ts, self.root.mod_ts
            )
        return True

    def change_create_ts(self, create_ts):
        """Change the creation time stamp; see :meth:`change_meta_info`."""
        return self.change_meta_info(RootMetaInfo(create_ts=create_ts))

    def change_disk_ts(self, disk_ts):
        """Change the disk time stamp; see :meth:`change_meta_info`."""
        return self.change_meta_info(RootMetaInfo(disk_ts=disk_ts))

    def change_mod_ts(self, mod_ts):
        """Change the modification time stamp; see :meth:`change_meta_info`."""
        return self.change_meta_info(RootMetaInfo(mod_ts=mod_ts))

    def change_create_ts_by_string(self, create_ts_str):
        """Change the creation time stamp from a string parsed by ``TimeStamp``."""
        t = TimeStamp()
        t.parse(create_ts_str)
        return self.change_meta_info(RootMetaInfo(create_ts=t))

    def change_disk_ts_by_string(self, disk_ts_str):
        """Change the disk time stamp from a string parsed by ``TimeStamp``."""
        t = TimeStamp()
        t.parse(disk_ts_str)
        return self.change_meta_info(RootMetaInfo(disk_ts=t))

    def change_mod_ts_by_string(self, mod_ts_str):
        """Change the modification time stamp from a string parsed by ``TimeStamp``."""
        t = TimeStamp()
        t.parse(mod_ts_str)
        return self.change_meta_info(RootMetaInfo(mod_ts=t))

    def relabel(self, name):
        """Relabel the volume.

        Raises:
            ValueError: if ``name`` is not a ``FSString``.
            FSError: ``INVALID_VOLUME_NAME`` if the name does not validate.
        """
        # make sure its a FSString
        if not isinstance(name, FSString):
            raise ValueError("relabel's name must be a FSString")
        # validate file name (volume names are never long names)
        fn = FileName(name, is_intl=self.is_intl, is_longname=False)
        if not fn.is_valid():
            raise FSError(INVALID_VOLUME_NAME, file_name=name, node=self)
        # update root block
        ami_name = name.get_ami_str()
        self.root.name = ami_name
        self.root.write()
        # store internally
        self.name = name
        self.root_dir.name = name

    def create_dir(self, ami_path):
        """Create a new directory at the given path.

        Raises:
            ValueError: if ``ami_path`` is not a ``FSString``.
            FSError: ``INVALID_FILE_NAME`` if the path does not validate or
                has no base name, ``INVALID_PARENT_DIRECTORY`` if the
                parent directory lookup yields no node.
        """
        # make sure its a FSString
        if not isinstance(ami_path, FSString):
            raise ValueError("create_dir's ami_path must be a FSString")
        # check file path
        fn = FileName(ami_path, is_intl=self.is_intl,
                      is_longname=self.is_longname)
        if not fn.is_valid():
            raise FSError(INVALID_FILE_NAME, file_name=ami_path)
        # split into dir and base name
        dir_name, base_name = fn.get_dir_and_base_name()
        if base_name is None:
            raise FSError(INVALID_FILE_NAME, file_name=ami_path)
        # find parent of dir
        if dir_name is None:
            node = self.root_dir
        else:
            node = self.get_dir_path_name(dir_name)
            if node is None:
                # no parent dir found
                # NOTE(review): assumes dir_name can be concatenated to a
                # str -- confirm against FileName.get_dir_and_base_name
                raise FSError(INVALID_PARENT_DIRECTORY, file_name=ami_path,
                              extra="not found: "+dir_name)
        node.create_dir(base_name)

    def write_file(self, data, ami_path, suggest_name=None, cache=False):
        """Write the given data as a file.

        Args:
            data: file contents, handed to the parent node's ``create_file``.
            ami_path: target path; either the full file path or a directory
                (then ``suggest_name`` supplies the file name).
            suggest_name: file name to use if ``ami_path`` yields none.
            cache: if false, the new node is flushed after creation.

        Raises:
            FSError: ``INVALID_PARENT_DIRECTORY`` if no parent node was
                found, ``INVALID_FILE_NAME`` if no file name is available.
        """
        # get parent node and file_name
        parent_node, file_name = self.get_create_path_name(ami_path,
                                                           suggest_name)
        if parent_node is None:
            raise FSError(INVALID_PARENT_DIRECTORY, file_name=ami_path)
        if file_name is None:
            # report the offending path; file_name itself is None here
            raise FSError(INVALID_FILE_NAME, file_name=ami_path)
        # create file
        node = parent_node.create_file(file_name, data)
        if not cache:
            node.flush()

    def read_file(self, ami_path, cache=False):
        """Read a file and return its data.

        Raises:
            FSError: ``FILE_NOT_FOUND`` if the path lookup yields no node.
        """
        # get node of file
        node = self.get_file_path_name(ami_path)
        if node is None:
            raise FSError(FILE_NOT_FOUND, file_name=ami_path)
        data = node.get_file_data()
        if not cache:
            node.flush()
        return data

    def delete(self, ami_path, wipe=False, all=False):
        """Delete a file or directory at the given path.

        ``wipe`` and ``all`` are forwarded unchanged to the node's
        ``delete`` method.

        Raises:
            FSError: ``FILE_NOT_FOUND`` if the path lookup yields no node.
        """
        node = self.get_path_name(ami_path)
        if node is None:
            raise FSError(FILE_NOT_FOUND, file_name=ami_path)
        node.delete(wipe=wipe, all=all)