1# Copyright 2019 The gRPC Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15from __future__ import division 16 17import datetime 18from concurrent import futures 19import unittest 20import time 21import threading 22import six 23 24import grpc 25from tests.unit.framework.common import test_constants 26 27_WAIT_FOR_BLOCKING = datetime.timedelta(seconds=1) 28 29 30def _block_on_waiting(server, termination_event, timeout=None): 31 server.start() 32 server.wait_for_termination(timeout=timeout) 33 termination_event.set() 34 35 36class ServerWaitForTerminationTest(unittest.TestCase): 37 38 def test_unblock_by_invoking_stop(self): 39 termination_event = threading.Event() 40 server = grpc.server(futures.ThreadPoolExecutor()) 41 42 wait_thread = threading.Thread(target=_block_on_waiting, 43 args=( 44 server, 45 termination_event, 46 )) 47 wait_thread.daemon = True 48 wait_thread.start() 49 time.sleep(_WAIT_FOR_BLOCKING.total_seconds()) 50 51 server.stop(None) 52 termination_event.wait(timeout=test_constants.SHORT_TIMEOUT) 53 self.assertTrue(termination_event.is_set()) 54 55 def test_unblock_by_del(self): 56 termination_event = threading.Event() 57 server = grpc.server(futures.ThreadPoolExecutor()) 58 59 wait_thread = threading.Thread(target=_block_on_waiting, 60 args=( 61 server, 62 termination_event, 63 )) 64 wait_thread.daemon = True 65 wait_thread.start() 66 time.sleep(_WAIT_FOR_BLOCKING.total_seconds()) 67 68 # Invoke manually here, in Python 2 it will be invoked by GC sometime. 69 server.__del__() 70 termination_event.wait(timeout=test_constants.SHORT_TIMEOUT) 71 self.assertTrue(termination_event.is_set()) 72 73 def test_unblock_by_timeout(self): 74 termination_event = threading.Event() 75 server = grpc.server(futures.ThreadPoolExecutor()) 76 77 wait_thread = threading.Thread(target=_block_on_waiting, 78 args=( 79 server, 80 termination_event, 81 test_constants.SHORT_TIMEOUT / 2, 82 )) 83 wait_thread.daemon = True 84 wait_thread.start() 85 86 termination_event.wait(timeout=test_constants.SHORT_TIMEOUT) 87 self.assertTrue(termination_event.is_set()) 88 89 90if __name__ == '__main__': 91 unittest.main(verbosity=2) 92