#!/usr/bin/env python
#
# Python-bindings store type test script
#
# Copyright (C) 2011-2021, Joachim Metz <joachim.metz@gmail.com>
#
# Refer to AUTHORS for acknowledgements.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import argparse
import contextlib
import os
import random
import sys
import unittest

import pyvshadow


class DataRangeFileObject(object):
    """File-like object that maps an in-file data range."""

    def __init__(self, path, range_offset, range_size):
        """Initializes a file-like object.

        Args:
          path (str): path of the file that contains the data range.
          range_offset (int): offset where the data range starts.
          range_size (int): size of the data range, or None to indicate
              the range should continue to the end of the file.
        """
        super(DataRangeFileObject, self).__init__()
        self._current_offset = 0
        self._file_object = open(path, "rb")
        self._range_offset = range_offset

        if range_size is None:
            # Resolve an open-ended range up front so that get_size() and
            # seek(..., os.SEEK_END) always have an integer to work with.
            # The file position moved here is irrelevant: read() always
            # seeks to an absolute position before reading.
            self._file_object.seek(0, os.SEEK_END)
            file_size = self._file_object.tell()
            range_size = max(file_size - range_offset, 0)

        self._range_size = range_size

    def __enter__(self):
        """Enters a with statement."""
        return self

    def __exit__(self, unused_type, unused_value, unused_traceback):
        """Exits a with statement and closes the underlying file."""
        # Nothing is returned so that exceptions raised inside the with
        # statement are never suppressed.
        self.close()

    def close(self):
        """Closes the file-like object.

        Calling close more than once is harmless.
        """
        if self._file_object:
            self._file_object.close()
            self._file_object = None

    def get_offset(self):
        """Retrieves the current offset into the file-like object.

        Returns:
          int: current offset in the data range.
        """
        return self._current_offset

    def get_size(self):
        """Retrieves the size of the file-like object.

        Returns:
          int: size of the data range.
        """
        return self._range_size

    def read(self, size=None):
        """Reads a byte string from the file-like object at the current offset.

        The function will read a byte string of the specified size or
        all of the remaining data if no size was specified.

        Args:
          size (Optional[int]): number of bytes to read, where None or
              a negative value is all remaining data.

        Returns:
          bytes: data read.

        Raises:
          IOError: if the read failed.
        """
        if not self._file_object:
            raise IOError("Not opened.")

        if (self._range_offset < 0 or
                (self._range_size is not None and self._range_size < 0)):
            raise IOError("Invalid data range.")

        if self._current_offset < 0:
            raise IOError(
                "Invalid current offset: {0:d} value less than zero.".format(
                    self._current_offset))

        if self._range_size is not None:
            remaining_size = self._range_size - self._current_offset
            if remaining_size <= 0:
                return b""

            # Never read beyond the end of the data range, also not when
            # the caller asks for "everything" with None or a negative size.
            if size is None or size < 0 or size > remaining_size:
                size = remaining_size

        elif size is None or size < 0:
            # Unbounded range: -1 means "read until end of file" for file
            # objects on both Python 2 and Python 3.
            size = -1

        self._file_object.seek(
            self._range_offset + self._current_offset, os.SEEK_SET)

        data = self._file_object.read(size)

        self._current_offset += len(data)

        return data

    def seek(self, offset, whence=os.SEEK_SET):
        """Seeks to an offset within the file-like object.

        Args:
          offset (int): offset to seek to.
          whence (Optional(int)): value that indicates whether offset is an
              absolute or relative position within the file.

        Raises:
          IOError: if the seek failed.
        """
        if self._current_offset < 0:
            raise IOError(
                "Invalid current offset: {0:d} value less than zero.".format(
                    self._current_offset))

        if whence == os.SEEK_CUR:
            offset += self._current_offset
        elif whence == os.SEEK_END:
            if self._range_size is None:
                # Only reachable when the object was built without
                # __init__; report it as a seek failure, not a TypeError.
                raise IOError("Invalid data range.")
            offset += self._range_size
        elif whence != os.SEEK_SET:
            raise IOError("Unsupported whence.")

        if offset < 0:
            raise IOError("Invalid offset value less than zero.")

        self._current_offset = offset


class StoreTypeTests(unittest.TestCase):
    """Tests the store type."""

    @contextlib.contextmanager
    def _open_last_store(self):
        """Opens the test source and yields the last store of the volume.

        The source path and offset are read from the "source" and "offset"
        attributes that the script entry point sets on the unittest module.

        Yields:
          pyvshadow.store: last store in the volume.

        Raises:
          SkipTest: if no source was provided or the volume has no stores.
        """
        # getattr() keeps the tests skippable when the module is loaded by
        # a test runner that never set the attributes.
        test_source = getattr(unittest, "source", None)
        if not test_source:
            raise unittest.SkipTest("missing source")

        range_offset = getattr(unittest, "offset", None) or 0

        vshadow_volume = pyvshadow.volume()

        with DataRangeFileObject(
                test_source, range_offset, None) as file_object:

            vshadow_volume.open_file_object(file_object)

            # Close the volume before the file object, also when the test
            # is skipped or an assertion fails.
            try:
                if vshadow_volume.number_of_stores == 0:
                    raise unittest.SkipTest("missing stores")

                vshadow_store = vshadow_volume.get_store(
                    vshadow_volume.number_of_stores - 1)
                self.assertIsNotNone(vshadow_store)

                yield vshadow_store

            finally:
                vshadow_volume.close()

    def test_read_buffer(self):
        """Tests the read_buffer function."""
        with self._open_last_store() as vshadow_store:
            size = vshadow_store.get_size()

            if size < 4096:
                # Test read without maximum size.
                vshadow_store.seek_offset(0, os.SEEK_SET)

                data = vshadow_store.read_buffer()

                self.assertIsNotNone(data)
                self.assertEqual(len(data), size)

            # Test read with maximum size.
            vshadow_store.seek_offset(0, os.SEEK_SET)

            data = vshadow_store.read_buffer(size=4096)

            self.assertIsNotNone(data)
            self.assertEqual(len(data), min(size, 4096))

            if size > 8:
                vshadow_store.seek_offset(-8, os.SEEK_END)

                # Read buffer on size boundary.
                data = vshadow_store.read_buffer(size=4096)

                self.assertIsNotNone(data)
                self.assertEqual(len(data), 8)

                # Read buffer beyond size boundary.
                data = vshadow_store.read_buffer(size=4096)

                self.assertIsNotNone(data)
                self.assertEqual(len(data), 0)

            # Stress test read buffer.
            vshadow_store.seek_offset(0, os.SEEK_SET)

            remaining_size = size

            for _ in range(1024):
                read_size = int(random.random() * 4096)

                data = vshadow_store.read_buffer(size=read_size)

                self.assertIsNotNone(data)

                data_size = len(data)

                if read_size > remaining_size:
                    read_size = remaining_size

                self.assertEqual(data_size, read_size)

                remaining_size -= data_size

                if not remaining_size:
                    # Wrap around to the start once the end is reached.
                    vshadow_store.seek_offset(0, os.SEEK_SET)

                    remaining_size = size

            with self.assertRaises(ValueError):
                vshadow_store.read_buffer(size=-1)

    def test_read_buffer_at_offset(self):
        """Tests the read_buffer_at_offset function."""
        with self._open_last_store() as vshadow_store:
            size = vshadow_store.get_size()

            # Test normal read.
            data = vshadow_store.read_buffer_at_offset(4096, 0)

            self.assertIsNotNone(data)
            self.assertEqual(len(data), min(size, 4096))

            if size > 8:
                # Read buffer on size boundary.
                data = vshadow_store.read_buffer_at_offset(4096, size - 8)

                self.assertIsNotNone(data)
                self.assertEqual(len(data), 8)

                # Read buffer beyond size boundary.
                data = vshadow_store.read_buffer_at_offset(4096, size + 8)

                self.assertIsNotNone(data)
                self.assertEqual(len(data), 0)

            # Stress test read buffer.
            for _ in range(1024):
                random_number = random.random()

                media_offset = int(random_number * size)
                read_size = int(random_number * 4096)

                data = vshadow_store.read_buffer_at_offset(
                    read_size, media_offset)

                self.assertIsNotNone(data)

                remaining_size = size - media_offset

                data_size = len(data)

                if read_size > remaining_size:
                    read_size = remaining_size

                self.assertEqual(data_size, read_size)

                remaining_size -= data_size

                if not remaining_size:
                    vshadow_store.seek_offset(0, os.SEEK_SET)

            with self.assertRaises(ValueError):
                vshadow_store.read_buffer_at_offset(-1, 0)

            with self.assertRaises(ValueError):
                vshadow_store.read_buffer_at_offset(4096, -1)

    def test_seek_offset(self):
        """Tests the seek_offset function."""
        with self._open_last_store() as vshadow_store:
            size = vshadow_store.get_size()

            vshadow_store.seek_offset(16, os.SEEK_SET)

            offset = vshadow_store.get_offset()
            self.assertEqual(offset, 16)

            vshadow_store.seek_offset(16, os.SEEK_CUR)

            offset = vshadow_store.get_offset()
            self.assertEqual(offset, 32)

            vshadow_store.seek_offset(-16, os.SEEK_CUR)

            offset = vshadow_store.get_offset()
            self.assertEqual(offset, 16)

            if size > 16:
                vshadow_store.seek_offset(-16, os.SEEK_END)

                offset = vshadow_store.get_offset()
                self.assertEqual(offset, size - 16)

            vshadow_store.seek_offset(16, os.SEEK_END)

            offset = vshadow_store.get_offset()
            self.assertEqual(offset, size + 16)

            # TODO: change IOError into ValueError
            with self.assertRaises(IOError):
                vshadow_store.seek_offset(-1, os.SEEK_SET)

            # TODO: change IOError into ValueError
            with self.assertRaises(IOError):
                vshadow_store.seek_offset(-32 - size, os.SEEK_CUR)

            # TODO: change IOError into ValueError
            with self.assertRaises(IOError):
                vshadow_store.seek_offset(-32 - size, os.SEEK_END)

            # TODO: change IOError into ValueError
            with self.assertRaises(IOError):
                vshadow_store.seek_offset(0, -1)

    def test_get_offset(self):
        """Tests the get_offset function."""
        with self._open_last_store() as vshadow_store:
            offset = vshadow_store.get_offset()
            self.assertIsNotNone(offset)

    def test_get_size(self):
        """Tests the get_size function and size property."""
        with self._open_last_store() as vshadow_store:
            size = vshadow_store.get_size()
            self.assertIsNotNone(size)

            self.assertIsNotNone(vshadow_store.size)

    def test_get_volume_size(self):
        """Tests the get_volume_size function and volume_size property."""
        with self._open_last_store() as vshadow_store:
            volume_size = vshadow_store.get_volume_size()
            self.assertIsNotNone(volume_size)

            self.assertIsNotNone(vshadow_store.volume_size)

    def test_get_creation_time(self):
        """Tests the get_creation_time function and creation_time property."""
        with self._open_last_store() as vshadow_store:
            creation_time = vshadow_store.get_creation_time()
            self.assertIsNotNone(creation_time)

            self.assertIsNotNone(vshadow_store.creation_time)

    def test_get_number_of_blocks(self):
        """Tests the get_number_of_blocks function and number_of_blocks property."""
        with self._open_last_store() as vshadow_store:
            number_of_blocks = vshadow_store.get_number_of_blocks()
            self.assertIsNotNone(number_of_blocks)

            self.assertIsNotNone(vshadow_store.number_of_blocks)


if __name__ == "__main__":
    argument_parser = argparse.ArgumentParser()

    argument_parser.add_argument(
        "-o", "--offset", dest="offset", action="store", default=None,
        type=int, help="offset of the source file.")

    argument_parser.add_argument(
        "source", nargs="?", action="store", metavar="PATH",
        default=None, help="path of the source file.")

    # Arguments the parser does not know are handed on to unittest.main(),
    # which expects the program name as the first element.
    options, unknown_options = argument_parser.parse_known_args()
    unknown_options.insert(0, sys.argv[0])

    # The test cases read their input through attributes on the unittest
    # module.
    setattr(unittest, "offset", options.offset)
    setattr(unittest, "source", options.source)

    unittest.main(argv=unknown_options, verbosity=2)