1# Copyright 2020 The gRPC Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Reference implementation for health checking in gRPC Python."""
15
16import asyncio
17import collections
18from typing import MutableMapping
19
20import grpc
21from grpc_health.v1 import health_pb2 as _health_pb2
22from grpc_health.v1 import health_pb2_grpc as _health_pb2_grpc
23
24
class HealthServicer(_health_pb2_grpc.HealthServicer):
    """An AsyncIO implementation of health checking servicer.

    The servicer keeps the latest status of every known service and lets
    any number of concurrent ``Watch`` streams observe status changes.
    The empty service name ``""`` is registered as ``SERVING`` when the
    servicer is constructed.
    """
    # Latest status recorded for each service name.
    _server_status: MutableMapping[
        str, '_health_pb2.HealthCheckResponse.ServingStatus']
    # One condition per service that currently has at least one watcher;
    # every watcher of a given service waits on the same condition.
    _server_watchers: MutableMapping[str, asyncio.Condition]
    # Number of active Watch streams per service, used to decide when the
    # shared condition can be discarded.
    _watcher_counts: MutableMapping[str, int]
    # Once True, calls to set() are ignored.
    _gracefully_shutting_down: bool

    def __init__(self) -> None:
        self._server_status = {"": _health_pb2.HealthCheckResponse.SERVING}
        self._server_watchers = collections.defaultdict(asyncio.Condition)
        self._watcher_counts = collections.defaultdict(int)
        self._gracefully_shutting_down = False

    async def Check(self, request: _health_pb2.HealthCheckRequest,
                    context) -> _health_pb2.HealthCheckResponse:
        """Returns the current status of the requested service.

        Args:
          request: the health check request; ``request.service`` names the
            service to look up.
          context: the RPC context, used to abort the call when the service
            has no recorded status.

        Returns:
          A HealthCheckResponse carrying the recorded status. If the service
          is unknown the RPC is aborted with ``NOT_FOUND`` instead.
        """
        status = self._server_status.get(request.service)

        if status is None:
            await context.abort(grpc.StatusCode.NOT_FOUND)
        else:
            return _health_pb2.HealthCheckResponse(status=status)

    async def Watch(self, request: _health_pb2.HealthCheckRequest,
                    context) -> None:
        """Streams the status of a service each time it changes.

        The current status (``SERVICE_UNKNOWN`` if none is recorded) is
        written immediately, then again whenever a later wake-up observes a
        different value. The loop only ends when an exception (for example
        a cancellation) propagates out of it.

        Args:
          request: the health check request; ``request.service`` names the
            service to watch.
          context: the RPC context the responses are written to.
        """
        service = request.service
        condition = self._server_watchers[service]
        # Register this watcher so that the shared condition outlives any
        # individual stream and is only dropped by the last one to leave.
        self._watcher_counts[service] += 1
        last_status = None
        try:
            async with condition:
                while True:
                    status = self._server_status.get(
                        service,
                        _health_pb2.HealthCheckResponse.SERVICE_UNKNOWN)

                    # NOTE(lidiz) If the observed status is the same, it means
                    # there are missing intermediate statuses. It's considered
                    # acceptable since peer only interested in eventual status.
                    if status != last_status:
                        # Responds with current health state
                        await context.write(
                            _health_pb2.HealthCheckResponse(status=status))

                    # Records the last sent status
                    last_status = status

                    # Polling on health state changes
                    await condition.wait()
        finally:
            remaining = self._watcher_counts.get(service, 0) - 1
            if remaining > 0:
                # Other streams still wait on this condition; deleting it
                # here would leave them parked on an object that _set() can
                # no longer find, so they would never be notified again.
                self._watcher_counts[service] = remaining
            else:
                self._watcher_counts.pop(service, None)
                self._server_watchers.pop(service, None)

    async def _set(
            self, service: str,
            status: _health_pb2.HealthCheckResponse.ServingStatus) -> None:
        """Records a status and wakes every watcher of that service.

        Args:
          service: string, the name of the service.
          status: the status value to record for the service.
        """
        # .get() does not trigger the defaultdict factory, so no condition
        # is created for services nobody is watching.
        condition = self._server_watchers.get(service)
        if condition is None:
            self._server_status[service] = status
            return

        async with condition:
            self._server_status[service] = status
            condition.notify_all()

    async def set(
            self, service: str,
            status: _health_pb2.HealthCheckResponse.ServingStatus) -> None:
        """Sets the status of a service.

        The call is ignored once enter_graceful_shutdown() has been invoked.

        Args:
          service: string, the name of the service.
          status: HealthCheckResponse.status enum value indicating the status of
            the service
        """
        if self._gracefully_shutting_down:
            return
        await self._set(service, status)

    async def enter_graceful_shutdown(self) -> None:
        """Permanently sets the status of all services to NOT_SERVING.

        This should be invoked when the server is entering a graceful shutdown
        period. After this method is invoked, future attempts to set the status
        of a service will be ignored.
        """
        if self._gracefully_shutting_down:
            return

        self._gracefully_shutting_down = True
        # Iterate over a snapshot: _set() can suspend, and a set() call that
        # was already in flight may add a key while this loop is awaiting,
        # which would break iteration over the live dict.
        for service in list(self._server_status):
            await self._set(service,
                            _health_pb2.HealthCheckResponse.NOT_SERVING)
114