# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import threading

import grpc
from grpc_testing import _common


class State(_common.ChannelRpcHandler):
    """Holds the metadata, requests, responses and status of one RPC.

    Every method takes ``self._condition`` before touching state; methods
    that change state call ``notify_all`` so that the blocking accessors
    re-evaluate what they are waiting for.
    """

    def __init__(self, invocation_metadata, requests, requests_closed):
        """Store the starting values for the RPC.

        Args:
          invocation_metadata: Metadata handed out (once) by the
            ``take_invocation_metadata*`` methods.
          requests: The request queue; it is appended to and popped from
            the front in place.
          requests_closed: Whether the request side is already closed.
        """
        self._condition = threading.Condition()
        self._invocation_metadata = invocation_metadata
        self._requests = requests
        self._requests_closed = requests_closed
        self._initial_metadata = None
        self._responses = []
        self._trailing_metadata = None
        # A code of None means the RPC has not terminated yet.
        self._code = None
        self._details = None

    def _fill_in_initial_metadata(self):
        """Default the initial metadata if unset; caller holds the lock."""
        if self._initial_metadata is None:
            self._initial_metadata = _common.FUSSED_EMPTY_METADATA

    def initial_metadata(self):
        """Block until initial metadata is set or the RPC has a status code.

        Returns:
          The stored initial metadata, or ``_common.FUSSED_EMPTY_METADATA``
          if a status code was set while the metadata was still unset.
        """
        with self._condition:
            while self._initial_metadata is None and self._code is None:
                self._condition.wait()
            if self._initial_metadata is None:
                return _common.FUSSED_EMPTY_METADATA
            return self._initial_metadata

    def add_request(self, request):
        """Queue a request unless the RPC terminated or requests are closed.

        Returns:
          True if the request was queued, False otherwise.
        """
        with self._condition:
            if self._code is not None or self._requests_closed:
                return False
            self._requests.append(request)
            self._condition.notify_all()
            return True

    def close_requests(self):
        """Mark the request side closed, if the RPC is live and still open."""
        with self._condition:
            if self._code is not None or self._requests_closed:
                return
            self._requests_closed = True
            self._condition.notify_all()

    def take_response(self):
        """Block until a response or the termination can be reported.

        Returns:
          A ``_common.ChannelRpcRead``. It carries only a response while
          the code is unset or OK and a response is queued; otherwise it
          carries the trailing metadata, code and details. Responses still
          queued when the code is neither unset nor OK are not returned.
        """
        with self._condition:
            # Nothing to report until a response arrives or a code is set.
            while self._code is None and not self._responses:
                self._condition.wait()
            failed = (self._code is not None and
                      self._code is not grpc.StatusCode.OK)
            if self._responses and not failed:
                return _common.ChannelRpcRead(self._responses.pop(0), None,
                                              None, None)
            return _common.ChannelRpcRead(None, self._trailing_metadata,
                                          self._code, self._details)

    def termination(self):
        """Block until a status code is set.

        Returns:
          A ``(trailing_metadata, code, details)`` tuple.
        """
        with self._condition:
            while self._code is None:
                self._condition.wait()
            return self._trailing_metadata, self._code, self._details

    def cancel(self, code, details):
        """Terminate the RPC with the given code unless it already has one.

        Returns:
          True if this call set the status, False if a code was already set.
        """
        with self._condition:
            if self._code is not None:
                return False
            self._fill_in_initial_metadata()
            self._trailing_metadata = _common.FUSSED_EMPTY_METADATA
            self._code = code
            self._details = details
            self._condition.notify_all()
            return True

    def take_invocation_metadata(self):
        """Hand out the invocation metadata and clear the stored copy.

        Raises:
          ValueError: If the invocation metadata is None (for example,
            because it was already taken).
        """
        with self._condition:
            if self._invocation_metadata is None:
                raise ValueError('Expected invocation metadata!')
            taken = self._invocation_metadata
            self._invocation_metadata = None
            return taken

    def take_invocation_metadata_and_request(self):
        """Hand out the invocation metadata together with the first request.

        Does not block: both must already be present.

        Returns:
          An ``(invocation_metadata, request)`` tuple.

        Raises:
          ValueError: If the invocation metadata is None or no request is
            queued.
        """
        with self._condition:
            if self._invocation_metadata is None:
                raise ValueError('Expected invocation metadata!')
            if not self._requests:
                raise ValueError('Expected at least one request!')
            taken = self._invocation_metadata
            self._invocation_metadata = None
            return taken, self._requests.pop(0)

    def send_initial_metadata(self, initial_metadata):
        """Store the initial metadata (via ``_common.fuss_with_metadata``)."""
        with self._condition:
            fussed = _common.fuss_with_metadata(initial_metadata)
            self._initial_metadata = fussed
            self._condition.notify_all()

    def take_request(self):
        """Block until a request is queued, then pop and return the first."""
        with self._condition:
            while not self._requests:
                self._condition.wait()
            return self._requests.pop(0)

    def requests_closed(self):
        """Block until the request side has been marked closed."""
        with self._condition:
            while not self._requests_closed:
                self._condition.wait()

    def send_response(self, response):
        """Queue a response; ignored once a status code has been set."""
        with self._condition:
            if self._code is not None:
                return
            self._responses.append(response)
            self._condition.notify_all()

    def terminate_with_response(self, response, trailing_metadata, code,
                                details):
        """Queue a final response and set the terminal status in one step.

        Existing status values are overwritten without any check.
        """
        with self._condition:
            self._fill_in_initial_metadata()
            self._responses.append(response)
            self._trailing_metadata = _common.fuss_with_metadata(
                trailing_metadata)
            self._code = code
            self._details = details
            self._condition.notify_all()

    def terminate(self, trailing_metadata, code, details):
        """Set the terminal status, overwriting any existing one."""
        with self._condition:
            self._fill_in_initial_metadata()
            self._trailing_metadata = _common.fuss_with_metadata(
                trailing_metadata)
            self._code = code
            self._details = details
            self._condition.notify_all()

    def cancelled(self):
        """Block until a status code is set and require it to be CANCELLED.

        Raises:
          ValueError: If the code that was set is not
            ``grpc.StatusCode.CANCELLED``.
        """
        with self._condition:
            while self._code is None:
                self._condition.wait()
            if self._code is not grpc.StatusCode.CANCELLED:
                raise ValueError('Status code unexpectedly {}!'.format(
                    self._code))

    def is_active(self):
        """Not implemented by this class."""
        raise NotImplementedError()

    def time_remaining(self):
        """Not implemented by this class."""
        raise NotImplementedError()

    def add_callback(self, callback):
        """Not implemented by this class."""
        raise NotImplementedError()