1# Copyright 2019 The gRPC Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from __future__ import division
16
17import datetime
18from concurrent import futures
19import unittest
20import time
21import threading
22import six
23
24import grpc
25from tests.unit.framework.common import test_constants
26
# Fixed delay the tests sleep for after launching the background thread, so
# that the thread has had time to start the server and enter its blocking
# wait before the test tries to unblock it.
_WAIT_FOR_BLOCKING = datetime.timedelta(seconds=1)
28
29
def _block_on_waiting(server, termination_event, timeout=None):
    """Start `server`, block on its termination, then signal completion.

    Intended as a thread target: the caller observes `termination_event` to
    learn that the blocking wait has returned.

    Args:
      server: Object exposing `start()` and `wait_for_termination(timeout=)`.
      termination_event: Event that is set once the wait has returned.
      timeout: Value forwarded unchanged as the `timeout` keyword argument of
        `wait_for_termination`.
    """
    server.start()
    wait_kwargs = {'timeout': timeout}
    server.wait_for_termination(**wait_kwargs)
    # Reached only when the wait returned normally; if it raised, the event
    # stays clear and the observing test fails on its assertion.
    termination_event.set()
34
35
class ServerWaitForTerminationTest(unittest.TestCase):
    """Checks the ways a blocked `wait_for_termination` call is released.

    Each test runs `_block_on_waiting` on a background thread and then
    verifies that the thread's wait returns (signalled through an event)
    after the server is stopped, finalized, or the wait times out.
    """

    def _start_waiting_thread(self, server, termination_event, timeout=None):
        """Run `_block_on_waiting` on a daemon thread and return the thread.

        Args:
          server: The server the background thread starts and waits on.
          termination_event: Event the thread sets once its wait returns.
          timeout: Forwarded unchanged to `_block_on_waiting`.

        Returns:
          The already-started `threading.Thread`.
        """
        wait_thread = threading.Thread(target=_block_on_waiting,
                                       args=(
                                           server,
                                           termination_event,
                                           timeout,
                                       ))
        # Daemon thread: a wait that never returns must not keep the
        # interpreter alive after the test run finishes.
        wait_thread.daemon = True
        wait_thread.start()
        return wait_thread

    def _assert_unblocked(self, termination_event):
        """Fail unless `termination_event` is set within SHORT_TIMEOUT."""
        termination_event.wait(timeout=test_constants.SHORT_TIMEOUT)
        self.assertTrue(termination_event.is_set())

    def test_unblock_by_invoking_stop(self):
        """Calling `stop` on the server releases the blocked waiter."""
        termination_event = threading.Event()
        server = grpc.server(futures.ThreadPoolExecutor())
        self._start_waiting_thread(server, termination_event)
        # Give the background thread time to start the server and block.
        time.sleep(_WAIT_FOR_BLOCKING.total_seconds())

        server.stop(None)
        self._assert_unblocked(termination_event)

    def test_unblock_by_del(self):
        """Finalizing the server object releases the blocked waiter."""
        termination_event = threading.Event()
        server = grpc.server(futures.ThreadPoolExecutor())
        self._start_waiting_thread(server, termination_event)
        # Give the background thread time to start the server and block.
        time.sleep(_WAIT_FOR_BLOCKING.total_seconds())

        # Call the finalizer explicitly: on Python 2 it would otherwise only
        # run whenever the garbage collector gets around to it.
        server.__del__()
        self._assert_unblocked(termination_event)

    def test_unblock_by_timeout(self):
        """The waiter returns on its own once its timeout elapses."""
        termination_event = threading.Event()
        server = grpc.server(futures.ThreadPoolExecutor())
        # Nothing in this test stops the server, so without this cleanup it
        # would keep running after the test has finished.
        self.addCleanup(server.stop, None)
        # Half of SHORT_TIMEOUT leaves the other half as slack for the
        # assertion below.
        self._start_waiting_thread(server, termination_event,
                                   test_constants.SHORT_TIMEOUT / 2)

        self._assert_unblocked(termination_event)
88
89
if __name__ == '__main__':
    # Executed directly: run every test in this module, reporting each test
    # by name (verbosity level 2).
    unittest.main(verbosity=2)
92