1#!/usr/bin/env python
2#############################################################################
3# Copyright (c) 2015-2018 Balabit
4#
5# This program is free software; you can redistribute it and/or modify it
6# under the terms of the GNU General Public License version 2 as published
7# by the Free Software Foundation, or (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12# GNU General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License
15# along with this program; if not, write to the Free Software
16# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17#
18# As an additional exemption you are allowed to compile & link against the
19# OpenSSL libraries as published by the OpenSSL project. See the file
20# COPYING for details.
21#
22#############################################################################
23import logging
24import shutil
25
26from pathlib2 import Path
27
28import src.testcase_parameters.testcase_parameters as tc_parameters
29from src.common.blocking import DEFAULT_TIMEOUT
30from src.common.blocking import wait_until_true
31from src.common.blocking import wait_until_true_custom
32
33logger = logging.getLogger(__name__)
34
35
36def open_file(file_path, mode):
37    # Python 2 compatibility note: open() can work only with string representation of path
38    return open(str(file_path), mode)
39
40
41def copy_file(src_file_path, dst_dir):
42    # Python 2 compatibility note: shutil.copy() can work only with string representation of path
43    shutil.copy(str(src_file_path), str(dst_dir))
44
45
46def copy_shared_file(testcase_parameters, shared_file_name):
47    shared_dir = testcase_parameters.get_shared_dir()
48    copy_file(Path(shared_dir, shared_file_name), testcase_parameters.get_working_dir())
49    return Path(testcase_parameters.get_working_dir(), shared_file_name)
50
51
52def delete_session_file(shared_file_name):
53    shared_file_name = Path(tc_parameters.WORKING_DIR, shared_file_name)
54    shared_file_name.unlink()
55
56
57class File(object):
58    def __init__(self, file_path):
59        self.path = Path(file_path)
60        self.__opened_file = None
61
62    def __del__(self):
63        if self.is_opened():
64            self.close()
65
66    def wait_for_creation(self):
67        file_created = wait_until_true(self.path.exists)
68        if file_created:
69            logger.debug("File has been created: {}".format(self.path))
70        else:
71            raise Exception("File was not created in time: {}".format(self.path))
72        return file_created
73
74    def open(self, mode):
75        self.__opened_file = open_file(self.path, mode)
76        return self.__opened_file
77
78    def close(self):
79        self.__opened_file.close()
80        self.__opened_file = None
81
82    def is_opened(self):
83        return self.__opened_file is not None
84
85    def readline(self):
86        if not self.is_opened():
87            raise Exception("File was not opened before trying to read from it.")
88        return self.__opened_file.readline()
89
90    def read(self):
91        content = ""
92        while True:
93            buffer = self.__opened_file.read()
94            if buffer == "":
95                break
96            content += buffer
97        return content
98
99    def write(self, content):
100        self.__opened_file.write(content)
101        self.__opened_file.flush()
102
103    def wait_for_lines(self, lines, timeout=DEFAULT_TIMEOUT):
104        def find_lines_in_file(lines_to_find, lines_found, f):
105            line_read = f.readline()
106            if not line_read and lines_to_find:
107                return False
108            for line_to_find in lines_to_find:
109                if line_to_find in line_read:
110                    lines_found.append(line_read)
111                    lines_to_find.remove(line_to_find)
112                    break
113            everything_is_found = lines_to_find == []
114            return everything_is_found
115
116        lines_found = []
117        if not wait_until_true_custom(find_lines_in_file, (lines, lines_found, self), timeout=timeout, poll_freq=0):
118            raise Exception("Could not find all lines in {}. Remaining lines to find: {} Lines found: {}".format(self.path, lines, lines_found))
119
120        return lines_found
121
122    def wait_for_number_of_lines(self, number_of_lines, timeout=DEFAULT_TIMEOUT):
123        lines = ["\n"] * number_of_lines
124        try:
125            return self.wait_for_lines(lines, timeout)
126        except Exception:
127            raise Exception("Could not find {} number of lines in {}. Remaining number of lines: {}.".format(number_of_lines, self.path, len(lines)))
128