1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Helpful utilities related to the stream module."""
15
16import logging
17import threading
18
19from grpc.framework.foundation import stream
20
21_NO_VALUE = object()
22_LOGGER = logging.getLogger(__name__)
23
24
25class TransformingConsumer(stream.Consumer):
26    """A stream.Consumer that passes a transformation of its input to another."""
27
28    def __init__(self, transformation, downstream):
29        self._transformation = transformation
30        self._downstream = downstream
31
32    def consume(self, value):
33        self._downstream.consume(self._transformation(value))
34
35    def terminate(self):
36        self._downstream.terminate()
37
38    def consume_and_terminate(self, value):
39        self._downstream.consume_and_terminate(self._transformation(value))
40
41
42class IterableConsumer(stream.Consumer):
43    """A Consumer that when iterated over emits the values it has consumed."""
44
45    def __init__(self):
46        self._condition = threading.Condition()
47        self._values = []
48        self._active = True
49
50    def consume(self, value):
51        with self._condition:
52            if self._active:
53                self._values.append(value)
54                self._condition.notify()
55
56    def terminate(self):
57        with self._condition:
58            self._active = False
59            self._condition.notify()
60
61    def consume_and_terminate(self, value):
62        with self._condition:
63            if self._active:
64                self._values.append(value)
65                self._active = False
66                self._condition.notify()
67
68    def __iter__(self):
69        return self
70
71    def __next__(self):
72        return self.next()
73
74    def next(self):
75        with self._condition:
76            while self._active and not self._values:
77                self._condition.wait()
78            if self._values:
79                return self._values.pop(0)
80            else:
81                raise StopIteration()
82
83
84class ThreadSwitchingConsumer(stream.Consumer):
85    """A Consumer decorator that affords serialization and asynchrony."""
86
87    def __init__(self, sink, pool):
88        self._lock = threading.Lock()
89        self._sink = sink
90        self._pool = pool
91        # True if self._spin has been submitted to the pool to be called once and
92        # that call has not yet returned, False otherwise.
93        self._spinning = False
94        self._values = []
95        self._active = True
96
97    def _spin(self, sink, value, terminate):
98        while True:
99            try:
100                if value is _NO_VALUE:
101                    sink.terminate()
102                elif terminate:
103                    sink.consume_and_terminate(value)
104                else:
105                    sink.consume(value)
106            except Exception as e:  # pylint:disable=broad-except
107                _LOGGER.exception(e)
108
109            with self._lock:
110                if terminate:
111                    self._spinning = False
112                    return
113                elif self._values:
114                    value = self._values.pop(0)
115                    terminate = not self._values and not self._active
116                elif not self._active:
117                    value = _NO_VALUE
118                    terminate = True
119                else:
120                    self._spinning = False
121                    return
122
123    def consume(self, value):
124        with self._lock:
125            if self._active:
126                if self._spinning:
127                    self._values.append(value)
128                else:
129                    self._pool.submit(self._spin, self._sink, value, False)
130                    self._spinning = True
131
132    def terminate(self):
133        with self._lock:
134            if self._active:
135                self._active = False
136                if not self._spinning:
137                    self._pool.submit(self._spin, self._sink, _NO_VALUE, True)
138                    self._spinning = True
139
140    def consume_and_terminate(self, value):
141        with self._lock:
142            if self._active:
143                self._active = False
144                if self._spinning:
145                    self._values.append(value)
146                else:
147                    self._pool.submit(self._spin, self._sink, value, True)
148                    self._spinning = True
149