1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import threading
16
17import grpc
18from grpc_testing import _common
19
20
21class State(_common.ChannelRpcHandler):
22
23    def __init__(self, invocation_metadata, requests, requests_closed):
24        self._condition = threading.Condition()
25        self._invocation_metadata = invocation_metadata
26        self._requests = requests
27        self._requests_closed = requests_closed
28        self._initial_metadata = None
29        self._responses = []
30        self._trailing_metadata = None
31        self._code = None
32        self._details = None
33
34    def initial_metadata(self):
35        with self._condition:
36            while True:
37                if self._initial_metadata is None:
38                    if self._code is None:
39                        self._condition.wait()
40                    else:
41                        return _common.FUSSED_EMPTY_METADATA
42                else:
43                    return self._initial_metadata
44
45    def add_request(self, request):
46        with self._condition:
47            if self._code is None and not self._requests_closed:
48                self._requests.append(request)
49                self._condition.notify_all()
50                return True
51            else:
52                return False
53
54    def close_requests(self):
55        with self._condition:
56            if self._code is None and not self._requests_closed:
57                self._requests_closed = True
58                self._condition.notify_all()
59
60    def take_response(self):
61        with self._condition:
62            while True:
63                if self._code is grpc.StatusCode.OK:
64                    if self._responses:
65                        response = self._responses.pop(0)
66                        return _common.ChannelRpcRead(response, None, None,
67                                                      None)
68                    else:
69                        return _common.ChannelRpcRead(None,
70                                                      self._trailing_metadata,
71                                                      grpc.StatusCode.OK,
72                                                      self._details)
73                elif self._code is None:
74                    if self._responses:
75                        response = self._responses.pop(0)
76                        return _common.ChannelRpcRead(response, None, None,
77                                                      None)
78                    else:
79                        self._condition.wait()
80                else:
81                    return _common.ChannelRpcRead(None, self._trailing_metadata,
82                                                  self._code, self._details)
83
84    def termination(self):
85        with self._condition:
86            while True:
87                if self._code is None:
88                    self._condition.wait()
89                else:
90                    return self._trailing_metadata, self._code, self._details
91
92    def cancel(self, code, details):
93        with self._condition:
94            if self._code is None:
95                if self._initial_metadata is None:
96                    self._initial_metadata = _common.FUSSED_EMPTY_METADATA
97                self._trailing_metadata = _common.FUSSED_EMPTY_METADATA
98                self._code = code
99                self._details = details
100                self._condition.notify_all()
101                return True
102            else:
103                return False
104
105    def take_invocation_metadata(self):
106        with self._condition:
107            if self._invocation_metadata is None:
108                raise ValueError('Expected invocation metadata!')
109            else:
110                invocation_metadata = self._invocation_metadata
111                self._invocation_metadata = None
112                return invocation_metadata
113
114    def take_invocation_metadata_and_request(self):
115        with self._condition:
116            if self._invocation_metadata is None:
117                raise ValueError('Expected invocation metadata!')
118            elif not self._requests:
119                raise ValueError('Expected at least one request!')
120            else:
121                invocation_metadata = self._invocation_metadata
122                self._invocation_metadata = None
123                return invocation_metadata, self._requests.pop(0)
124
125    def send_initial_metadata(self, initial_metadata):
126        with self._condition:
127            self._initial_metadata = _common.fuss_with_metadata(
128                initial_metadata)
129            self._condition.notify_all()
130
131    def take_request(self):
132        with self._condition:
133            while True:
134                if self._requests:
135                    return self._requests.pop(0)
136                else:
137                    self._condition.wait()
138
139    def requests_closed(self):
140        with self._condition:
141            while True:
142                if self._requests_closed:
143                    return
144                else:
145                    self._condition.wait()
146
147    def send_response(self, response):
148        with self._condition:
149            if self._code is None:
150                self._responses.append(response)
151                self._condition.notify_all()
152
153    def terminate_with_response(self, response, trailing_metadata, code,
154                                details):
155        with self._condition:
156            if self._initial_metadata is None:
157                self._initial_metadata = _common.FUSSED_EMPTY_METADATA
158            self._responses.append(response)
159            self._trailing_metadata = _common.fuss_with_metadata(
160                trailing_metadata)
161            self._code = code
162            self._details = details
163            self._condition.notify_all()
164
165    def terminate(self, trailing_metadata, code, details):
166        with self._condition:
167            if self._initial_metadata is None:
168                self._initial_metadata = _common.FUSSED_EMPTY_METADATA
169            self._trailing_metadata = _common.fuss_with_metadata(
170                trailing_metadata)
171            self._code = code
172            self._details = details
173            self._condition.notify_all()
174
175    def cancelled(self):
176        with self._condition:
177            while True:
178                if self._code is grpc.StatusCode.CANCELLED:
179                    return
180                elif self._code is None:
181                    self._condition.wait()
182                else:
183                    raise ValueError('Status code unexpectedly {}!'.format(
184                        self._code))
185
186    def is_active(self):
187        raise NotImplementedError()
188
189    def time_remaining(self):
190        raise NotImplementedError()
191
192    def add_callback(self, callback):
193        raise NotImplementedError()
194