# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helpful utilities related to the stream module."""

import collections
import logging
import threading

from grpc.framework.foundation import stream

# Sentinel passed to ThreadSwitchingConsumer._spin to mean "terminate the sink
# without delivering a value" (None cannot be used: it may be a real value).
_NO_VALUE = object()
_LOGGER = logging.getLogger(__name__)


class TransformingConsumer(stream.Consumer):
    """A stream.Consumer that passes a transformation of its input to another."""

    def __init__(self, transformation, downstream):
        """Initializes the consumer.

        Args:
          transformation: A callable taking one consumed value and returning
            the value to hand to the downstream consumer.
          downstream: The consumer that receives the transformed values and
            the termination signal.
        """
        self._transformation = transformation
        self._downstream = downstream

    def consume(self, value):
        """Passes the transformation of value to the downstream consumer."""
        self._downstream.consume(self._transformation(value))

    def terminate(self):
        """Forwards termination to the downstream consumer."""
        self._downstream.terminate()

    def consume_and_terminate(self, value):
        """Passes the transformation of value downstream as the final value."""
        self._downstream.consume_and_terminate(self._transformation(value))


class IterableConsumer(stream.Consumer):
    """A Consumer that when iterated over emits the values it has consumed."""

    def __init__(self):
        self._condition = threading.Condition()
        # FIFO buffer of consumed-but-not-yet-emitted values; a deque makes
        # removal from the front O(1).
        self._values = collections.deque()
        # False once terminate or consume_and_terminate has been called.
        self._active = True

    def consume(self, value):
        """Buffers value for iteration; ignored once terminated."""
        with self._condition:
            if self._active:
                self._values.append(value)
                # One new value can satisfy exactly one blocked reader.
                self._condition.notify()

    def terminate(self):
        """Marks the stream finished and wakes every blocked reader."""
        with self._condition:
            self._active = False
            # Every waiter must observe termination; waking only one would
            # leave any other blocked reader waiting forever.
            self._condition.notify_all()

    def consume_and_terminate(self, value):
        """Buffers a final value and marks the stream finished.

        Has no effect if the stream has already been terminated.
        """
        with self._condition:
            if self._active:
                self._values.append(value)
                self._active = False
                # Wake all readers: one takes the value, the rest must see
                # that the stream is over instead of blocking forever.
                self._condition.notify_all()

    def __iter__(self):
        return self

    def __next__(self):
        return self.next()

    def next(self):
        """Returns the oldest buffered value, blocking until one is available.

        Raises:
          StopIteration: If the stream has been terminated and every buffered
            value has already been emitted.
        """
        with self._condition:
            while self._active and not self._values:
                self._condition.wait()
            if self._values:
                return self._values.popleft()
            else:
                raise StopIteration()


class ThreadSwitchingConsumer(stream.Consumer):
    """A Consumer decorator that affords serialization and asynchrony."""

    def __init__(self, sink, pool):
        """Initializes the consumer.

        Args:
          sink: The consumer to which values and termination are delivered
            from within the pool.
          pool: An object with a submit(callable, *args) method on which the
            delivery loop is scheduled.
        """
        self._lock = threading.Lock()
        self._sink = sink
        self._pool = pool
        # True if self._spin has been submitted to the pool to be called once and
        # that call has not yet returned, False otherwise.
        self._spinning = False
        # FIFO of values that arrived while a _spin call was in flight; a
        # deque makes removal from the front O(1).
        self._values = collections.deque()
        self._active = True

    def _spin(self, sink, value, terminate):
        """Delivers value to sink, then drains everything queued meanwhile.

        Args:
          sink: The consumer to deliver to.
          value: The first value to deliver, or _NO_VALUE to only terminate.
          terminate: Whether this delivery is the final one for the sink.
        """
        while True:
            # The sink is called without holding the lock so that producers
            # calling consume/terminate are never blocked behind it.
            try:
                if value is _NO_VALUE:
                    sink.terminate()
                elif terminate:
                    sink.consume_and_terminate(value)
                else:
                    sink.consume(value)
            except Exception as e:  # pylint:disable=broad-except
                # A failing sink is logged and the loop carries on with the
                # remaining queued work.
                _LOGGER.exception(e)

            with self._lock:
                if terminate:
                    # The terminating call was just made; nothing more to do.
                    self._spinning = False
                    return
                elif self._values:
                    value = self._values.popleft()
                    # If that was the last queued value and the stream has
                    # been terminated, deliver it as the final value.
                    terminate = not self._values and not self._active
                elif not self._active:
                    # Terminated with nothing left queued: terminate the sink.
                    value = _NO_VALUE
                    terminate = True
                else:
                    # Still active but idle; a later call resubmits _spin.
                    self._spinning = False
                    return

    def consume(self, value):
        """Queues value for delivery to the sink; ignored once terminated."""
        with self._lock:
            if self._active:
                if self._spinning:
                    self._values.append(value)
                else:
                    self._pool.submit(self._spin, self._sink, value, False)
                    self._spinning = True

    def terminate(self):
        """Schedules termination of the sink; ignored if already terminated."""
        with self._lock:
            if self._active:
                self._active = False
                # If a _spin call is in flight it will notice _active is False
                # and terminate the sink itself.
                if not self._spinning:
                    self._pool.submit(self._spin, self._sink, _NO_VALUE, True)
                    self._spinning = True

    def consume_and_terminate(self, value):
        """Queues a final value for the sink; ignored if already terminated."""
        with self._lock:
            if self._active:
                self._active = False
                if self._spinning:
                    self._values.append(value)
                else:
                    self._pool.submit(self._spin, self._sink, value, True)
                    self._spinning = True