# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Reference implementation for health checking in gRPC Python."""

import asyncio
import collections
from typing import MutableMapping

import grpc
from grpc_health.v1 import health_pb2 as _health_pb2
from grpc_health.v1 import health_pb2_grpc as _health_pb2_grpc


class HealthServicer(_health_pb2_grpc.HealthServicer):
    """An AsyncIO implementation of health checking servicer."""
    # Last known status for every service that has been set.
    _server_status: MutableMapping[
        str, '_health_pb2.HealthCheckResponse.ServingStatus']
    # One condition per service that currently has at least one Watch call.
    _server_watchers: MutableMapping[str, asyncio.Condition]
    # Number of active Watch calls per service; guards condition removal.
    _watcher_counts: MutableMapping[str, int]
    # Once True, calls to set() are ignored.
    _gracefully_shutting_down: bool

    def __init__(self) -> None:
        """Initializes the servicer with the overall server ("") SERVING."""
        self._server_status = {"": _health_pb2.HealthCheckResponse.SERVING}
        self._server_watchers = collections.defaultdict(asyncio.Condition)
        self._watcher_counts = collections.Counter()
        self._gracefully_shutting_down = False

    async def Check(self, request: _health_pb2.HealthCheckRequest,
                    context) -> _health_pb2.HealthCheckResponse:
        """Responds with the current status of the requested service.

        Args:
          request: the HealthCheckRequest naming the service to look up.
          context: the RPC context; used to abort the call with NOT_FOUND when
            no status has been recorded for the service.

        Returns:
          A HealthCheckResponse carrying the recorded status.
        """
        status = self._server_status.get(request.service)

        if status is None:
            await context.abort(grpc.StatusCode.NOT_FOUND)
        else:
            return _health_pb2.HealthCheckResponse(status=status)

    async def Watch(self, request: _health_pb2.HealthCheckRequest,
                    context) -> None:
        """Streams the status of the requested service whenever it changes.

        The current status (SERVICE_UNKNOWN when none is recorded) is written
        first; afterwards a response is written each time the observed status
        differs from the one sent last. The loop only ends when the coroutine
        is interrupted by an exception (for example cancellation).

        Args:
          request: the HealthCheckRequest naming the service to watch.
          context: the RPC context used to write streamed responses.
        """
        service = request.service
        # Register without awaiting in between, so the shared condition and
        # its watcher count can never be observed out of sync.
        condition = self._server_watchers[service]
        self._watcher_counts[service] += 1
        last_status = None
        try:
            async with condition:
                while True:
                    status = self._server_status.get(
                        service,
                        _health_pb2.HealthCheckResponse.SERVICE_UNKNOWN)

                    # NOTE(lidiz) If the observed status is the same, it means
                    # there are missing intermediate statuses. It's considered
                    # acceptable since the peer is only interested in the
                    # eventual status.
                    if status != last_status:
                        # Responds with current health state
                        await context.write(
                            _health_pb2.HealthCheckResponse(status=status))

                    # Records the last sent status
                    last_status = status

                    # Waits for the next health state change
                    await condition.wait()
        finally:
            # Drop the shared condition only when the LAST watcher of this
            # service leaves; removing it earlier would strand the remaining
            # watchers on a condition that _set() can no longer find.
            self._watcher_counts[service] -= 1
            if self._watcher_counts[service] <= 0:
                del self._watcher_counts[service]
                self._server_watchers.pop(service, None)

    async def _set(
            self, service: str,
            status: _health_pb2.HealthCheckResponse.ServingStatus) -> None:
        """Records a status and wakes up the watchers of that service.

        Args:
          service: string, the name of the service.
          status: HealthCheckResponse.status enum value to record.
        """
        # Publish the status before any await: callers' precondition checks
        # and the write then happen atomically with respect to the event
        # loop, and a watcher that starts later always reads the new value.
        self._server_status[service] = status

        # .get() avoids creating a defaultdict entry for unwatched services.
        condition = self._server_watchers.get(service)
        if condition is not None:
            async with condition:
                condition.notify_all()

    async def set(
            self, service: str,
            status: _health_pb2.HealthCheckResponse.ServingStatus) -> None:
        """Sets the status of a service.

        Args:
            service: string, the name of the service.
            status: HealthCheckResponse.status enum value indicating the status of
              the service
        """
        if self._gracefully_shutting_down:
            return
        await self._set(service, status)

    async def enter_graceful_shutdown(self) -> None:
        """Permanently sets the status of all services to NOT_SERVING.

        This should be invoked when the server is entering a graceful shutdown
        period. After this method is invoked, future attempts to set the status
        of a service will be ignored.
        """
        if self._gracefully_shutting_down:
            return
        self._gracefully_shutting_down = True

        # Iterate over a snapshot: _set() awaits, and mutating the mapping
        # while iterating it directly would raise RuntimeError.
        for service in list(self._server_status):
            await self._set(service,
                            _health_pb2.HealthCheckResponse.NOT_SERVING)