1# Copyright 2015 Cloudbase Solutions Srl
2# All Rights Reserved.
3#
4#    Licensed under the Apache License, Version 2.0 (the "License"); you may
5#    not use this file except in compliance with the License. You may obtain
6#    a copy of the License at
7#
8#         http://www.apache.org/licenses/LICENSE-2.0
9#
10#    Unless required by applicable law or agreed to in writing, software
11#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13#    License for the specific language governing permissions and limitations
14#    under the License.
15
16import errno
17from unittest import mock
18
19from six.moves import builtins
20
21from os_win import constants
22from os_win import exceptions
23from os_win.tests.unit import test_base
24from os_win.utils.io import namedpipe
25from os_win.utils.winapi import constants as w_const
26
27
28class NamedPipeTestCase(test_base.BaseTestCase):
29    _FAKE_LOG_PATH = 'fake_log_path'
30
31    @mock.patch.object(namedpipe.NamedPipeHandler, '_setup_io_structures')
32    def setUp(self, mock_setup_structures):
33        super(NamedPipeTestCase, self).setUp()
34
35        self._mock_input_queue = mock.Mock()
36        self._mock_output_queue = mock.Mock()
37        self._mock_client_connected = mock.Mock()
38
39        self._ioutils = mock.Mock()
40
41        threading_patcher = mock.patch.object(namedpipe, 'threading')
42        threading_patcher.start()
43        self.addCleanup(threading_patcher.stop)
44
45        self._handler = namedpipe.NamedPipeHandler(
46            mock.sentinel.pipe_name,
47            self._mock_input_queue,
48            self._mock_output_queue,
49            self._mock_client_connected,
50            self._FAKE_LOG_PATH)
51        self._handler._ioutils = self._ioutils
52
53    def _mock_setup_pipe_handler(self):
54        self._handler._log_file_handle = mock.Mock()
55        self._handler._pipe_handle = mock.sentinel.pipe_handle
56        self._r_worker = mock.Mock()
57        self._w_worker = mock.Mock()
58        self._handler._workers = [self._r_worker, self._w_worker]
59        self._handler._r_buffer = mock.Mock()
60        self._handler._w_buffer = mock.Mock()
61        self._handler._r_overlapped = mock.Mock()
62        self._handler._w_overlapped = mock.Mock()
63        self._handler._r_completion_routine = mock.Mock()
64        self._handler._w_completion_routine = mock.Mock()
65
66    @mock.patch.object(builtins, 'open')
67    @mock.patch.object(namedpipe.NamedPipeHandler, '_open_pipe')
68    def test_start_pipe_handler(self, mock_open_pipe, mock_open):
69        self._handler.start()
70
71        mock_open_pipe.assert_called_once_with()
72        mock_open.assert_called_once_with(self._FAKE_LOG_PATH, 'ab', 1)
73        self.assertEqual(mock_open.return_value,
74                         self._handler._log_file_handle)
75
76        thread = namedpipe.threading.Thread
77        thread.assert_has_calls(
78            [mock.call(target=self._handler._read_from_pipe),
79             mock.call().setDaemon(True),
80             mock.call().start(),
81             mock.call(target=self._handler._write_to_pipe),
82             mock.call().setDaemon(True),
83             mock.call().start()])
84
85    @mock.patch.object(namedpipe.NamedPipeHandler, 'stop')
86    @mock.patch.object(namedpipe.NamedPipeHandler, '_open_pipe')
87    def test_start_pipe_handler_exception(self, mock_open_pipe,
88                                          mock_stop_handler):
89        mock_open_pipe.side_effect = Exception
90
91        self.assertRaises(exceptions.OSWinException,
92                          self._handler.start)
93
94        mock_stop_handler.assert_called_once_with()
95
96    @mock.patch.object(namedpipe.NamedPipeHandler, '_cleanup_handles')
97    @mock.patch.object(namedpipe.NamedPipeHandler, '_cancel_io')
98    def _test_stop_pipe_handler(self, mock_cancel_io,
99                                mock_cleanup_handles,
100                                workers_started=True):
101        self._mock_setup_pipe_handler()
102        if not workers_started:
103            handler_workers = []
104            self._handler._workers = handler_workers
105        else:
106            handler_workers = self._handler._workers
107            self._r_worker.is_alive.side_effect = (True, False)
108            self._w_worker.is_alive.return_value = False
109
110        self._handler.stop()
111
112        self._handler._stopped.set.assert_called_once_with()
113        if not workers_started:
114            mock_cleanup_handles.assert_called_once_with()
115        else:
116            self.assertFalse(mock_cleanup_handles.called)
117
118        if workers_started:
119            mock_cancel_io.assert_called_once_with()
120            self._r_worker.join.assert_called_once_with(0.5)
121            self.assertFalse(self._w_worker.join.called)
122
123        self.assertEqual([], self._handler._workers)
124
125    def test_stop_pipe_handler_workers_started(self):
126        self._test_stop_pipe_handler()
127
128    def test_stop_pipe_handler_workers_not_started(self):
129        self._test_stop_pipe_handler(workers_started=False)
130
131    @mock.patch.object(namedpipe.NamedPipeHandler, '_close_pipe')
132    def test_cleanup_handles(self, mock_close_pipe):
133        self._mock_setup_pipe_handler()
134        log_handle = self._handler._log_file_handle
135        r_event = self._handler._r_overlapped.hEvent
136        w_event = self._handler._w_overlapped.hEvent
137
138        self._handler._cleanup_handles()
139
140        mock_close_pipe.assert_called_once_with()
141        log_handle.close.assert_called_once_with()
142        self._ioutils.close_handle.assert_has_calls(
143            [mock.call(r_event), mock.call(w_event)])
144
145        self.assertIsNone(self._handler._log_file_handle)
146        self.assertIsNone(self._handler._r_overlapped.hEvent)
147        self.assertIsNone(self._handler._w_overlapped.hEvent)
148
149    def test_setup_io_structures(self):
150        self._handler._setup_io_structures()
151
152        self.assertEqual(self._ioutils.get_buffer.return_value,
153                         self._handler._r_buffer)
154        self.assertEqual(self._ioutils.get_buffer.return_value,
155                         self._handler._w_buffer)
156        self.assertEqual(
157            self._ioutils.get_new_overlapped_structure.return_value,
158            self._handler._r_overlapped)
159        self.assertEqual(
160            self._ioutils.get_new_overlapped_structure.return_value,
161            self._handler._w_overlapped)
162        self.assertEqual(
163            self._ioutils.get_completion_routine.return_value,
164            self._handler._r_completion_routine)
165        self.assertEqual(
166            self._ioutils.get_completion_routine.return_value,
167            self._handler._w_completion_routine)
168        self.assertIsNone(self._handler._log_file_handle)
169
170        self._ioutils.get_buffer.assert_has_calls(
171            [mock.call(constants.SERIAL_CONSOLE_BUFFER_SIZE)] * 2)
172        self._ioutils.get_completion_routine.assert_has_calls(
173            [mock.call(self._handler._read_callback),
174             mock.call()])
175
176    def test_open_pipe(self):
177        self._handler._open_pipe()
178
179        self._ioutils.wait_named_pipe.assert_called_once_with(
180            mock.sentinel.pipe_name)
181        self._ioutils.open.assert_called_once_with(
182            mock.sentinel.pipe_name,
183            desired_access=(w_const.GENERIC_READ | w_const.GENERIC_WRITE),
184            share_mode=(w_const.FILE_SHARE_READ | w_const.FILE_SHARE_WRITE),
185            creation_disposition=w_const.OPEN_EXISTING,
186            flags_and_attributes=w_const.FILE_FLAG_OVERLAPPED)
187
188        self.assertEqual(self._ioutils.open.return_value,
189                         self._handler._pipe_handle)
190
191    def test_close_pipe(self):
192        self._mock_setup_pipe_handler()
193
194        self._handler._close_pipe()
195
196        self._ioutils.close_handle.assert_called_once_with(
197            mock.sentinel.pipe_handle)
198        self.assertIsNone(self._handler._pipe_handle)
199
200    def test_cancel_io(self):
201        self._mock_setup_pipe_handler()
202
203        self._handler._cancel_io()
204
205        overlapped_structures = [self._handler._r_overlapped,
206                                 self._handler._w_overlapped]
207
208        self._ioutils.cancel_io.assert_has_calls(
209            [mock.call(self._handler._pipe_handle,
210                       overlapped_structure,
211                       ignore_invalid_handle=True)
212             for overlapped_structure in overlapped_structures])
213
214    @mock.patch.object(namedpipe.NamedPipeHandler, '_start_io_worker')
215    def test_read_from_pipe(self, mock_start_worker):
216        self._mock_setup_pipe_handler()
217
218        self._handler._read_from_pipe()
219
220        mock_start_worker.assert_called_once_with(
221            self._ioutils.read,
222            self._handler._r_buffer,
223            self._handler._r_overlapped,
224            self._handler._r_completion_routine)
225
226    @mock.patch.object(namedpipe.NamedPipeHandler, '_start_io_worker')
227    def test_write_to_pipe(self, mock_start_worker):
228        self._mock_setup_pipe_handler()
229
230        self._handler._write_to_pipe()
231
232        mock_start_worker.assert_called_once_with(
233            self._ioutils.write,
234            self._handler._w_buffer,
235            self._handler._w_overlapped,
236            self._handler._w_completion_routine,
237            self._handler._get_data_to_write)
238
239    @mock.patch.object(namedpipe.NamedPipeHandler, '_cleanup_handles')
240    def _test_start_io_worker(self, mock_cleanup_handles,
241                              buff_update_func=None, exception=None):
242        self._handler._stopped.isSet.side_effect = [False, True]
243        self._handler._pipe_handle = mock.sentinel.pipe_handle
244        self._handler.stop = mock.Mock()
245
246        io_func = mock.Mock(side_effect=exception)
247        fake_buffer = 'fake_buffer'
248
249        self._handler._start_io_worker(io_func, fake_buffer,
250                                       mock.sentinel.overlapped_structure,
251                                       mock.sentinel.completion_routine,
252                                       buff_update_func)
253
254        if buff_update_func:
255            num_bytes = buff_update_func()
256        else:
257            num_bytes = len(fake_buffer)
258
259        io_func.assert_called_once_with(mock.sentinel.pipe_handle,
260                                        fake_buffer, num_bytes,
261                                        mock.sentinel.overlapped_structure,
262                                        mock.sentinel.completion_routine)
263
264        if exception:
265            self._handler._stopped.set.assert_called_once_with()
266        mock_cleanup_handles.assert_called_once_with()
267
268    def test_start_io_worker(self):
269        self._test_start_io_worker()
270
271    def test_start_io_worker_with_buffer_update_method(self):
272        self._test_start_io_worker(buff_update_func=mock.Mock())
273
274    def test_start_io_worker_exception(self):
275        self._test_start_io_worker(exception=IOError)
276
277    @mock.patch.object(namedpipe.NamedPipeHandler, '_write_to_log')
278    def test_read_callback(self, mock_write_to_log):
279        self._mock_setup_pipe_handler()
280        fake_data = self._ioutils.get_buffer_data.return_value
281
282        self._handler._read_callback(mock.sentinel.num_bytes)
283
284        self._ioutils.get_buffer_data.assert_called_once_with(
285            self._handler._r_buffer, mock.sentinel.num_bytes)
286        self._mock_output_queue.put.assert_called_once_with(fake_data)
287        mock_write_to_log.assert_called_once_with(fake_data)
288
289    @mock.patch.object(namedpipe, 'time')
290    def test_get_data_to_write(self, mock_time):
291        self._mock_setup_pipe_handler()
292        self._handler._stopped.isSet.side_effect = [False, False]
293        self._mock_client_connected.isSet.side_effect = [False, True]
294        fake_data = 'fake input data'
295        self._mock_input_queue.get.return_value = fake_data
296
297        num_bytes = self._handler._get_data_to_write()
298
299        mock_time.sleep.assert_called_once_with(1)
300        self._ioutils.write_buffer_data.assert_called_once_with(
301            self._handler._w_buffer, fake_data)
302        self.assertEqual(len(fake_data), num_bytes)
303
304    @mock.patch.object(namedpipe.NamedPipeHandler, '_rotate_logs')
305    def _test_write_to_log(self, mock_rotate_logs, size_exceeded=False):
306        self._mock_setup_pipe_handler()
307        self._handler._stopped.isSet.return_value = False
308        fake_handle = self._handler._log_file_handle
309        fake_handle.tell.return_value = (constants.MAX_CONSOLE_LOG_FILE_SIZE
310                                         if size_exceeded else 0)
311        fake_data = 'fake_data'
312
313        self._handler._write_to_log(fake_data)
314
315        if size_exceeded:
316            mock_rotate_logs.assert_called_once_with()
317
318        self._handler._log_file_handle.write.assert_called_once_with(
319            fake_data)
320
321    def test_write_to_log(self):
322        self._test_write_to_log()
323
324    def test_write_to_log_size_exceeded(self):
325        self._test_write_to_log(size_exceeded=True)
326
327    def test_flush_log_file(self):
328        self._handler._log_file_handle = None
329        self._handler.flush_log_file()
330
331        self._handler._log_file_handle = mock.Mock()
332        self._handler.flush_log_file()
333
334        self._handler._log_file_handle.flush.side_effect = ValueError
335        self._handler.flush_log_file()
336
337    @mock.patch.object(namedpipe.NamedPipeHandler, '_retry_if_file_in_use')
338    @mock.patch.object(builtins, 'open')
339    @mock.patch.object(namedpipe, 'os')
340    def test_rotate_logs(self, mock_os, mock_open, mock_exec_retry):
341        fake_archived_log_path = self._FAKE_LOG_PATH + '.1'
342        mock_os.path.exists.return_value = True
343
344        self._mock_setup_pipe_handler()
345        fake_handle = self._handler._log_file_handle
346
347        self._handler._rotate_logs()
348
349        fake_handle.flush.assert_called_once_with()
350        fake_handle.close.assert_called_once_with()
351        mock_os.path.exists.assert_called_once_with(
352            fake_archived_log_path)
353
354        mock_exec_retry.assert_has_calls([mock.call(mock_os.remove,
355                                                    fake_archived_log_path),
356                                          mock.call(mock_os.rename,
357                                                    self._FAKE_LOG_PATH,
358                                                    fake_archived_log_path)])
359
360        mock_open.assert_called_once_with(self._FAKE_LOG_PATH, 'ab', 1)
361        self.assertEqual(mock_open.return_value,
362                         self._handler._log_file_handle)
363
364    @mock.patch.object(namedpipe, 'time')
365    def test_retry_if_file_in_use_exceeded_retries(self, mock_time):
366        class FakeWindowsException(Exception):
367            errno = errno.EACCES
368
369        raise_count = self._handler._MAX_LOG_ROTATE_RETRIES + 1
370        mock_func_side_eff = [FakeWindowsException] * raise_count
371        mock_func = mock.Mock(side_effect=mock_func_side_eff)
372
373        with mock.patch.object(namedpipe, 'WindowsError',
374                               FakeWindowsException, create=True):
375            self.assertRaises(FakeWindowsException,
376                              self._handler._retry_if_file_in_use,
377                              mock_func, mock.sentinel.arg)
378            mock_time.sleep.assert_has_calls(
379                [mock.call(1)] * self._handler._MAX_LOG_ROTATE_RETRIES)
380