1#!/usr/bin/env python
2#
3# Python-bindings store type test script
4#
5# Copyright (C) 2011-2021, Joachim Metz <joachim.metz@gmail.com>
6#
7# Refer to AUTHORS for acknowledgements.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU Lesser General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU Lesser General Public License
20# along with this program.  If not, see <https://www.gnu.org/licenses/>.
21
22import argparse
23import os
24import random
25import sys
26import unittest
27
28import pyvshadow
29
30
31class DataRangeFileObject(object):
32  """File-like object that maps an in-file data range."""
33
34  def __init__(self, path, range_offset, range_size):
35    """Initializes a file-like object.
36
37    Args:
38      path (str): path of the file that contains the data range.
39      range_offset (int): offset where the data range starts.
40      range_size (int): size of the data range starts, or None to indicate
41          the range should continue to the end of the parent file-like object.
42    """
43    super(DataRangeFileObject, self).__init__()
44    self._current_offset = 0
45    self._file_object = open(path, "rb")
46    self._range_offset = range_offset
47    self._range_size = range_size
48
49  def __enter__(self):
50    """Enters a with statement."""
51    return self
52
53  def __exit__(self, unused_type, unused_value, unused_traceback):
54    """Exits a with statement."""
55    return
56
57  def close(self):
58    """Closes the file-like object."""
59    if self._file_object:
60      self._file_object.close()
61      self._file_object = None
62
63  def get_offset(self):
64    """Retrieves the current offset into the file-like object.
65
66    Returns:
67      int: current offset in the data range.
68    """
69    return self._current_offset
70
71  def get_size(self):
72    """Retrieves the size of the file-like object.
73
74    Returns:
75      int: size of the data range.
76    """
77    return self._range_size
78
79  def read(self, size=None):
80    """Reads a byte string from the file-like object at the current offset.
81
82    The function will read a byte string of the specified size or
83    all of the remaining data if no size was specified.
84
85    Args:
86      size (Optional[int]): number of bytes to read, where None is all
87          remaining data.
88
89    Returns:
90      bytes: data read.
91
92    Raises:
93      IOError: if the read failed.
94    """
95    if (self._range_offset < 0 or
96        (self._range_size is not None and self._range_size < 0)):
97      raise IOError("Invalid data range.")
98
99    if self._current_offset < 0:
100      raise IOError(
101          "Invalid current offset: {0:d} value less than zero.".format(
102              self._current_offset))
103
104    if (self._range_size is not None and
105        self._current_offset >= self._range_size):
106      return b""
107
108    if size is None:
109      size = self._range_size
110    if self._range_size is not None and self._current_offset + size > self._range_size:
111      size = self._range_size - self._current_offset
112
113    self._file_object.seek(
114        self._range_offset + self._current_offset, os.SEEK_SET)
115
116    data = self._file_object.read(size)
117
118    self._current_offset += len(data)
119
120    return data
121
122  def seek(self, offset, whence=os.SEEK_SET):
123    """Seeks to an offset within the file-like object.
124
125    Args:
126      offset (int): offset to seek to.
127      whence (Optional(int)): value that indicates whether offset is an absolute
128          or relative position within the file.
129
130    Raises:
131      IOError: if the seek failed.
132    """
133    if self._current_offset < 0:
134      raise IOError(
135          "Invalid current offset: {0:d} value less than zero.".format(
136              self._current_offset))
137
138    if whence == os.SEEK_CUR:
139      offset += self._current_offset
140    elif whence == os.SEEK_END:
141      offset += self._range_size
142    elif whence != os.SEEK_SET:
143      raise IOError("Unsupported whence.")
144    if offset < 0:
145      raise IOError("Invalid offset value less than zero.")
146
147    self._current_offset = offset
148
149
150class StoreTypeTests(unittest.TestCase):
151  """Tests the store type."""
152
153  def test_read_buffer(self):
154    """Tests the read_buffer function."""
155    test_source = unittest.source
156    if not test_source:
157      raise unittest.SkipTest("missing source")
158
159    vshadow_volume = pyvshadow.volume()
160
161    with DataRangeFileObject(
162        test_source, unittest.offset or 0, None) as file_object:
163
164      vshadow_volume.open_file_object(file_object)
165
166      if vshadow_volume.number_of_stores == 0:
167        raise unittest.SkipTest("missing stores")
168
169      vshadow_store = vshadow_volume.get_store(
170          vshadow_volume.number_of_stores - 1)
171      self.assertIsNotNone(vshadow_store)
172
173      size = vshadow_store.get_size()
174
175      if size < 4096:
176        # Test read without maximum size.
177        vshadow_store.seek_offset(0, os.SEEK_SET)
178
179        data = vshadow_store.read_buffer()
180
181        self.assertIsNotNone(data)
182        self.assertEqual(len(data), size)
183
184      # Test read with maximum size.
185      vshadow_store.seek_offset(0, os.SEEK_SET)
186
187      data = vshadow_store.read_buffer(size=4096)
188
189      self.assertIsNotNone(data)
190      self.assertEqual(len(data), min(size, 4096))
191
192      if size > 8:
193        vshadow_store.seek_offset(-8, os.SEEK_END)
194
195        # Read buffer on size boundary.
196        data = vshadow_store.read_buffer(size=4096)
197
198        self.assertIsNotNone(data)
199        self.assertEqual(len(data), 8)
200
201        # Read buffer beyond size boundary.
202        data = vshadow_store.read_buffer(size=4096)
203
204        self.assertIsNotNone(data)
205        self.assertEqual(len(data), 0)
206
207      # Stress test read buffer.
208      vshadow_store.seek_offset(0, os.SEEK_SET)
209
210      remaining_size = size
211
212      for _ in range(1024):
213        read_size = int(random.random() * 4096)
214
215        data = vshadow_store.read_buffer(size=read_size)
216
217        self.assertIsNotNone(data)
218
219        data_size = len(data)
220
221        if read_size > remaining_size:
222          read_size = remaining_size
223
224        self.assertEqual(data_size, read_size)
225
226        remaining_size -= data_size
227
228        if not remaining_size:
229          vshadow_store.seek_offset(0, os.SEEK_SET)
230
231          remaining_size = size
232
233      with self.assertRaises(ValueError):
234        vshadow_store.read_buffer(size=-1)
235
236    vshadow_volume.close()
237
238  def test_read_buffer_at_offset(self):
239    """Tests the read_buffer_at_offset function."""
240    test_source = unittest.source
241    if not test_source:
242      raise unittest.SkipTest("missing source")
243
244    vshadow_volume = pyvshadow.volume()
245
246    with DataRangeFileObject(
247        test_source, unittest.offset or 0, None) as file_object:
248
249      vshadow_volume.open_file_object(file_object)
250
251      if vshadow_volume.number_of_stores == 0:
252        raise unittest.SkipTest("missing stores")
253
254      vshadow_store = vshadow_volume.get_store(
255          vshadow_volume.number_of_stores - 1)
256      self.assertIsNotNone(vshadow_store)
257
258      size = vshadow_store.get_size()
259
260      # Test normal read.
261      data = vshadow_store.read_buffer_at_offset(4096, 0)
262
263      self.assertIsNotNone(data)
264      self.assertEqual(len(data), min(size, 4096))
265
266      if size > 8:
267        # Read buffer on size boundary.
268        data = vshadow_store.read_buffer_at_offset(4096, size - 8)
269
270        self.assertIsNotNone(data)
271        self.assertEqual(len(data), 8)
272
273        # Read buffer beyond size boundary.
274        data = vshadow_store.read_buffer_at_offset(4096, size + 8)
275
276        self.assertIsNotNone(data)
277        self.assertEqual(len(data), 0)
278
279      # Stress test read buffer.
280      for _ in range(1024):
281        random_number = random.random()
282
283        media_offset = int(random_number * size)
284        read_size = int(random_number * 4096)
285
286        data = vshadow_store.read_buffer_at_offset(read_size, media_offset)
287
288        self.assertIsNotNone(data)
289
290        remaining_size = size - media_offset
291
292        data_size = len(data)
293
294        if read_size > remaining_size:
295          read_size = remaining_size
296
297        self.assertEqual(data_size, read_size)
298
299        remaining_size -= data_size
300
301        if not remaining_size:
302          vshadow_store.seek_offset(0, os.SEEK_SET)
303
304      with self.assertRaises(ValueError):
305        vshadow_store.read_buffer_at_offset(-1, 0)
306
307      with self.assertRaises(ValueError):
308        vshadow_store.read_buffer_at_offset(4096, -1)
309
310      vshadow_volume.close()
311
312  def test_seek_offset(self):
313    """Tests the seek_offset function."""
314    test_source = unittest.source
315    if not test_source:
316      raise unittest.SkipTest("missing source")
317
318    vshadow_volume = pyvshadow.volume()
319
320    with DataRangeFileObject(
321        test_source, unittest.offset or 0, None) as file_object:
322
323      vshadow_volume.open_file_object(file_object)
324
325      if vshadow_volume.number_of_stores == 0:
326        raise unittest.SkipTest("missing stores")
327
328      vshadow_store = vshadow_volume.get_store(
329          vshadow_volume.number_of_stores - 1)
330      self.assertIsNotNone(vshadow_store)
331
332      size = vshadow_store.get_size()
333
334      vshadow_store.seek_offset(16, os.SEEK_SET)
335
336      offset = vshadow_store.get_offset()
337      self.assertEqual(offset, 16)
338
339      vshadow_store.seek_offset(16, os.SEEK_CUR)
340
341      offset = vshadow_store.get_offset()
342      self.assertEqual(offset, 32)
343
344      vshadow_store.seek_offset(-16, os.SEEK_CUR)
345
346      offset = vshadow_store.get_offset()
347      self.assertEqual(offset, 16)
348
349      if size > 16:
350        vshadow_store.seek_offset(-16, os.SEEK_END)
351
352        offset = vshadow_store.get_offset()
353        self.assertEqual(offset, size - 16)
354
355      vshadow_store.seek_offset(16, os.SEEK_END)
356
357      offset = vshadow_store.get_offset()
358      self.assertEqual(offset, size + 16)
359
360      # TODO: change IOError into ValueError
361      with self.assertRaises(IOError):
362        vshadow_store.seek_offset(-1, os.SEEK_SET)
363
364      # TODO: change IOError into ValueError
365      with self.assertRaises(IOError):
366        vshadow_store.seek_offset(-32 - size, os.SEEK_CUR)
367
368      # TODO: change IOError into ValueError
369      with self.assertRaises(IOError):
370        vshadow_store.seek_offset(-32 - size, os.SEEK_END)
371
372      # TODO: change IOError into ValueError
373      with self.assertRaises(IOError):
374        vshadow_store.seek_offset(0, -1)
375
376      vshadow_volume.close()
377
378  def test_get_offset(self):
379    """Tests the get_offset function."""
380    test_source = unittest.source
381    if not test_source:
382      raise unittest.SkipTest("missing source")
383
384    vshadow_volume = pyvshadow.volume()
385
386    with DataRangeFileObject(
387        test_source, unittest.offset or 0, None) as file_object:
388
389      vshadow_volume.open_file_object(file_object)
390
391      if vshadow_volume.number_of_stores == 0:
392        raise unittest.SkipTest("missing stores")
393
394      vshadow_store = vshadow_volume.get_store(
395          vshadow_volume.number_of_stores - 1)
396      self.assertIsNotNone(vshadow_store)
397
398      offset = vshadow_store.get_offset()
399      self.assertIsNotNone(offset)
400
401      vshadow_volume.close()
402
403  def test_get_size(self):
404    """Tests the get_size function and size property."""
405    test_source = unittest.source
406    if not test_source:
407      raise unittest.SkipTest("missing source")
408
409    vshadow_volume = pyvshadow.volume()
410
411    with DataRangeFileObject(
412        test_source, unittest.offset or 0, None) as file_object:
413
414      vshadow_volume.open_file_object(file_object)
415
416      if vshadow_volume.number_of_stores == 0:
417        raise unittest.SkipTest("missing stores")
418
419      vshadow_store = vshadow_volume.get_store(
420          vshadow_volume.number_of_stores - 1)
421      self.assertIsNotNone(vshadow_store)
422
423      size = vshadow_store.get_size()
424      self.assertIsNotNone(size)
425
426      self.assertIsNotNone(vshadow_store.size)
427
428      vshadow_volume.close()
429
430  def test_get_volume_size(self):
431    """Tests the get_volume_size function and volume_size property."""
432    test_source = unittest.source
433    if not test_source:
434      raise unittest.SkipTest("missing source")
435
436    vshadow_volume = pyvshadow.volume()
437
438    with DataRangeFileObject(
439        test_source, unittest.offset or 0, None) as file_object:
440
441      vshadow_volume.open_file_object(file_object)
442
443      if vshadow_volume.number_of_stores == 0:
444        raise unittest.SkipTest("missing stores")
445
446      vshadow_store = vshadow_volume.get_store(
447          vshadow_volume.number_of_stores - 1)
448      self.assertIsNotNone(vshadow_store)
449
450      volume_size = vshadow_store.get_volume_size()
451      self.assertIsNotNone(volume_size)
452
453      self.assertIsNotNone(vshadow_store.volume_size)
454
455      vshadow_volume.close()
456
457  def test_get_creation_time(self):
458    """Tests the get_creation_time function and creation_time property."""
459    test_source = unittest.source
460    if not test_source:
461      raise unittest.SkipTest("missing source")
462
463    vshadow_volume = pyvshadow.volume()
464
465    with DataRangeFileObject(
466        test_source, unittest.offset or 0, None) as file_object:
467
468      vshadow_volume.open_file_object(file_object)
469
470      if vshadow_volume.number_of_stores == 0:
471        raise unittest.SkipTest("missing stores")
472
473      vshadow_store = vshadow_volume.get_store(
474          vshadow_volume.number_of_stores - 1)
475      self.assertIsNotNone(vshadow_store)
476
477      creation_time = vshadow_store.get_creation_time()
478      self.assertIsNotNone(creation_time)
479
480      self.assertIsNotNone(vshadow_store.creation_time)
481
482      vshadow_volume.close()
483
484  def test_get_number_of_blocks(self):
485    """Tests the get_number_of_blocks function and number_of_blocks property."""
486    test_source = unittest.source
487    if not test_source:
488      raise unittest.SkipTest("missing source")
489
490    vshadow_volume = pyvshadow.volume()
491
492    with DataRangeFileObject(
493        test_source, unittest.offset or 0, None) as file_object:
494
495      vshadow_volume.open_file_object(file_object)
496
497      if vshadow_volume.number_of_stores == 0:
498        raise unittest.SkipTest("missing stores")
499
500      vshadow_store = vshadow_volume.get_store(
501          vshadow_volume.number_of_stores - 1)
502      self.assertIsNotNone(vshadow_store)
503
504      number_of_blocks = vshadow_store.get_number_of_blocks()
505      self.assertIsNotNone(number_of_blocks)
506
507      self.assertIsNotNone(vshadow_store.number_of_blocks)
508
509      vshadow_volume.close()
510
511
512if __name__ == "__main__":
513  argument_parser = argparse.ArgumentParser()
514
515  argument_parser.add_argument(
516      "-o", "--offset", dest="offset", action="store", default=None,
517      type=int, help="offset of the source file.")
518
519  argument_parser.add_argument(
520      "source", nargs="?", action="store", metavar="PATH",
521      default=None, help="path of the source file.")
522
523  options, unknown_options = argument_parser.parse_known_args()
524  unknown_options.insert(0, sys.argv[0])
525
526  setattr(unittest, "offset", options.offset)
527  setattr(unittest, "source", options.source)
528
529  unittest.main(argv=unknown_options, verbosity=2)
530