1#!/usr/bin/env python 2############################################################################# 3# Copyright (c) 2015-2018 Balabit 4# 5# This program is free software; you can redistribute it and/or modify it 6# under the terms of the GNU General Public License version 2 as published 7# by the Free Software Foundation, or (at your option) any later version. 8# 9# This program is distributed in the hope that it will be useful, 10# but WITHOUT ANY WARRANTY; without even the implied warranty of 11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12# GNU General Public License for more details. 13# 14# You should have received a copy of the GNU General Public License 15# along with this program; if not, write to the Free Software 16# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA 17# 18# As an additional exemption you are allowed to compile & link against the 19# OpenSSL libraries as published by the OpenSSL project. See the file 20# COPYING for details. 21# 22############################################################################# 23import logging 24import shutil 25 26from pathlib2 import Path 27 28import src.testcase_parameters.testcase_parameters as tc_parameters 29from src.common.blocking import DEFAULT_TIMEOUT 30from src.common.blocking import wait_until_true 31from src.common.blocking import wait_until_true_custom 32 33logger = logging.getLogger(__name__) 34 35 36def open_file(file_path, mode): 37 # Python 2 compatibility note: open() can work only with string representation of path 38 return open(str(file_path), mode) 39 40 41def copy_file(src_file_path, dst_dir): 42 # Python 2 compatibility note: shutil.copy() can work only with string representation of path 43 shutil.copy(str(src_file_path), str(dst_dir)) 44 45 46def copy_shared_file(testcase_parameters, shared_file_name): 47 shared_dir = testcase_parameters.get_shared_dir() 48 copy_file(Path(shared_dir, shared_file_name), testcase_parameters.get_working_dir()) 49 return Path(testcase_parameters.get_working_dir(), shared_file_name) 50 51 52def delete_session_file(shared_file_name): 53 shared_file_name = Path(tc_parameters.WORKING_DIR, shared_file_name) 54 shared_file_name.unlink() 55 56 57class File(object): 58 def __init__(self, file_path): 59 self.path = Path(file_path) 60 self.__opened_file = None 61 62 def __del__(self): 63 if self.is_opened(): 64 self.close() 65 66 def wait_for_creation(self): 67 file_created = wait_until_true(self.path.exists) 68 if file_created: 69 logger.debug("File has been created: {}".format(self.path)) 70 else: 71 raise Exception("File was not created in time: {}".format(self.path)) 72 return file_created 73 74 def open(self, mode): 75 self.__opened_file = open_file(self.path, mode) 76 return self.__opened_file 77 78 def close(self): 79 self.__opened_file.close() 80 self.__opened_file = None 81 82 def is_opened(self): 83 return self.__opened_file is not None 84 85 def readline(self): 86 if not self.is_opened(): 87 raise Exception("File was not opened before trying to read from it.") 88 return self.__opened_file.readline() 89 90 def read(self): 91 content = "" 92 while True: 93 buffer = self.__opened_file.read() 94 if buffer == "": 95 break 96 content += buffer 97 return content 98 99 def write(self, content): 100 self.__opened_file.write(content) 101 self.__opened_file.flush() 102 103 def wait_for_lines(self, lines, timeout=DEFAULT_TIMEOUT): 104 def find_lines_in_file(lines_to_find, lines_found, f): 105 line_read = f.readline() 106 if not line_read and lines_to_find: 107 return False 108 for line_to_find in lines_to_find: 109 if line_to_find in line_read: 110 lines_found.append(line_read) 111 lines_to_find.remove(line_to_find) 112 break 113 everything_is_found = lines_to_find == [] 114 return everything_is_found 115 116 lines_found = [] 117 if not wait_until_true_custom(find_lines_in_file, (lines, lines_found, self), timeout=timeout, poll_freq=0): 118 raise Exception("Could not find all lines in {}. Remaining lines to find: {} Lines found: {}".format(self.path, lines, lines_found)) 119 120 return lines_found 121 122 def wait_for_number_of_lines(self, number_of_lines, timeout=DEFAULT_TIMEOUT): 123 lines = ["\n"] * number_of_lines 124 try: 125 return self.wait_for_lines(lines, timeout) 126 except Exception: 127 raise Exception("Could not find {} number of lines in {}. Remaining number of lines: {}.".format(number_of_lines, self.path, len(lines))) 128