# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require_relative '../grpc'
require_relative 'active_call'
require_relative 'service'
require 'forwardable'
require 'thread'

# GRPC contains the General RPC module.
module GRPC
  # Pool is a simple thread pool.
  #
  # Each worker thread owns a private Queue. While a worker is idle its queue
  # sits in @ready_workers; #schedule pops an idle worker's queue and pushes a
  # single [block, args] job onto it. When the job finishes, the worker puts
  # its queue back into @ready_workers unless the pool has been stopped.
  class Pool
    # Default keep alive period is 1s
    DEFAULT_KEEP_ALIVE = 1

    # @param size [Integer] number of worker threads to run; must be positive
    # @param keep_alive [Numeric] seconds #stop waits for workers to exit on
    #   their own before forcibly terminating them
    # @raise [RuntimeError] if size is not positive
    def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE)
      fail 'pool size must be positive' unless size > 0
      @jobs = Queue.new
      @size = size
      @stopped = false
      @stop_mutex = Mutex.new # needs to be held when accessing @stopped
      @stop_cond = ConditionVariable.new
      @workers = []
      @keep_alive = keep_alive

      # Each worker thread has its own queue to push and pull jobs;
      # these queues are put into @ready_workers when that worker is idle
      @ready_workers = Queue.new
    end

    # Returns the number of jobs waiting.
    #
    # NOTE(review): nothing in this class pushes onto @jobs (jobs go straight
    # to per-worker queues), so this appears to always return 0.
    def jobs_waiting
      @jobs.size
    end

    # @return [Boolean] true if at least one worker thread is idle
    def ready_for_work?
      # Busy worker threads are either doing work, or have a single job
      # waiting on them. Workers that are idle with no jobs waiting
      # have their "queues" in @ready_workers
      !@ready_workers.empty?
    end

    # Runs the given block on an idle worker with the provided args.
    #
    # Does nothing when no block is given. If the pool has already been
    # stopped, the job is dropped and a warning is logged.
    #
    # @param args the args passed to blk when it is called
    # @param blk the block to call
    # @raise [RuntimeError] if no worker thread is idle
    def schedule(*args, &blk)
      return if blk.nil?
      @stop_mutex.synchronize do
        if @stopped
          GRPC.logger.warn('did not schedule job, already stopped')
          return
        end
        GRPC.logger.info('schedule another job')
        fail 'No worker threads available' if @ready_workers.empty?
        worker_queue = @ready_workers.pop

        fail 'worker already has a task waiting' unless worker_queue.empty?
        worker_queue << [blk, args]
      end
    end

    # Starts worker threads until the pool reaches its configured size.
    #
    # @raise [RuntimeError] if the pool has already been stopped
    def start
      @stop_mutex.synchronize do
        fail 'already stopped' if @stopped
      end
      until @workers.size == @size.to_i
        new_worker_queue = Queue.new
        @ready_workers << new_worker_queue
        next_thread = Thread.new(new_worker_queue) do |jobs|
          catch(:exit) do # allows { throw :exit } to kill a thread
            loop_execute_jobs(jobs)
          end
          remove_current_thread
        end
        @workers << next_thread
      end
    end

    # Stops the jobs in the pool.
    #
    # Idle workers are sent an exit job; busy workers return after finishing
    # their current job because they observe @stopped. Waits up to
    # @keep_alive seconds for every worker to exit, then forcibly terminates
    # any that are still alive.
    def stop
      GRPC.logger.info('stopping, will wait for all the workers to exit')
      @stop_mutex.synchronize do
        @stopped = true
        loop do
          break unless ready_for_work?
          worker_queue = @ready_workers.pop
          worker_queue << [proc { throw :exit }, []]
        end
        wait_for_workers_to_exit
      end
      forcibly_stop_workers
      GRPC.logger.info('stopped, all workers are shutdown')
    end

    protected

    # Blocks on @stop_cond until @workers is empty or the @keep_alive budget
    # is spent (no limit when @keep_alive is nil). It loops instead of
    # waiting once so that a spurious wakeup cannot end the grace period
    # early. Must be called while holding @stop_mutex.
    def wait_for_workers_to_exit
      deadline = @keep_alive &&
                 Process.clock_gettime(Process::CLOCK_MONOTONIC) + @keep_alive
      until @workers.empty?
        if deadline.nil?
          @stop_cond.wait(@stop_mutex)
        else
          remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
          break if remaining <= 0
          @stop_cond.wait(@stop_mutex, remaining)
        end
      end
    end

    # Forcibly shuts down any worker threads that are still alive.
    #
    # Iterates over a snapshot of @workers taken under @stop_mutex, because
    # workers finishing concurrently delete themselves from @workers.
    def forcibly_stop_workers
      survivors = @stop_mutex.synchronize { @workers.dup }
      return if survivors.empty?
      GRPC.logger.info("forcibly terminating #{survivors.size} worker(s)")
      survivors.each do |t|
        next unless t.alive?
        begin
          t.exit
        rescue StandardError => e
          GRPC.logger.warn('error while terminating a worker')
          GRPC.logger.warn(e)
        end
      end
    end

    # Removes the calling thread from @workers, and signals @stop_cond once
    # no workers remain.
    def remove_current_thread
      @stop_mutex.synchronize do
        @workers.delete(Thread.current)
        @stop_cond.signal if @workers.size.zero?
      end
    end

    # Worker main loop: runs one [block, args] job at a time from this
    # worker's private queue. StandardError and GRPC::Core::CallError raised
    # by a job are logged, not propagated. After each job the worker
    # re-registers itself as idle, or returns if the pool has been stopped.
    #
    # @param worker_queue [Queue] this worker's private job queue
    def loop_execute_jobs(worker_queue)
      loop do
        begin
          blk, args = worker_queue.pop
          blk.call(*args)
        rescue StandardError, GRPC::Core::CallError => e
          GRPC.logger.warn('Error in worker thread')
          GRPC.logger.warn(e)
        end
        # there shouldn't be any work given to this thread while it's busy
        fail('received a task while busy') unless worker_queue.empty?
        @stop_mutex.synchronize do
          return if @stopped
          @ready_workers << worker_queue
        end
      end
    end
  end

  # RpcServer hosts a number of services and makes them available on the
  # network.
  class RpcServer
    include Core::CallOps
    include Core::TimeConsts
    extend ::Forwardable

    def_delegators :@server, :add_http2_port

    # Default thread pool size is 30
    DEFAULT_POOL_SIZE = 30

    # Deprecated due to internal changes to the thread pool
    DEFAULT_MAX_WAITING_REQUESTS = 20

    # Default poll period is 1s
    DEFAULT_POLL_PERIOD = 1

    # Signal check period is 0.25s
    SIGNAL_CHECK_PERIOD = 0.25

    # setup_connect_md_proc is used by #initialize to validate the
    # connect_md_proc.
    #
    # @param a_proc [Proc, nil] the candidate proc
    # @return [Proc, nil] a_proc unchanged, or nil when none was given
    # @raise [TypeError] if a_proc is neither nil nor a Proc
    def self.setup_connect_md_proc(a_proc)
      return nil if a_proc.nil?
      fail(TypeError, '!Proc') unless a_proc.is_a? Proc
      a_proc
    end

    # Creates a new RpcServer.
    #
    # The RPC server is configured using keyword arguments.
    #
    # There are some specific keyword args used to configure the RpcServer
    # instance.
    #
    # * pool_size: the size of the thread pool the server uses to run its
    #   threads. At most this many requests can be processed concurrently.
    #
    # * max_waiting_requests: Deprecated due to internal changes to the thread
    #   pool. This is still an argument for compatibility but is ignored.
    #
    # * poll_period: The amount of time in seconds to wait for
    #   currently-serviced RPCs to finish before cancelling them when shutting
    #   down the server.
    #
    # * pool_keep_alive: The amount of time in seconds to wait
    #   for currently busy thread-pool threads to finish before
    #   forcing an abrupt exit to each thread.
    #
    # * connect_md_proc:
    #   when non-nil is a proc for determining metadata to send back the client
    #   on receiving an invocation req. The proc signature is:
    #   {key: val, ..} func(method_name, {key: val, ...})
    #
    # * server_args:
    #   A server arguments hash to be passed down to the underlying core server
    #
    # * interceptors:
    #   An array of GRPC::ServerInterceptor objects that will be used for
    #   intercepting server handlers to provide extra functionality.
    #   Interceptors are an EXPERIMENTAL API.
    #
    def initialize(pool_size: DEFAULT_POOL_SIZE,
                   max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS,
                   poll_period: DEFAULT_POLL_PERIOD,
                   pool_keep_alive: Pool::DEFAULT_KEEP_ALIVE,
                   connect_md_proc: nil,
                   server_args: {},
                   interceptors: [])
      @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
      @max_waiting_requests = max_waiting_requests
      @poll_period = poll_period
      @pool_size = pool_size
      @pool = Pool.new(@pool_size, keep_alive: pool_keep_alive)
      @run_cond = ConditionVariable.new
      @run_mutex = Mutex.new
      # running_state can take 4 values: :not_started, :running, :stopping, and
      # :stopped. State transitions can only proceed in that order.
      @running_state = :not_started
      @server = Core::Server.new(server_args)
      @interceptors = InterceptorRegistry.new(interceptors)
    end

    # Stops a running server.
    #
    # The call has no effect if the server is already stopping or stopped;
    # otherwise the server's current call loop is its last.
    #
    # @raise [RuntimeError] if the server has not been started
    def stop
      # When run via #run_till_terminated_or_interrupted, @stop_server is
      # false (it is nil otherwise). In that case only wake the watcher
      # thread: it calls #stop again once @stop_server is true, and that
      # call performs the real shutdown below.
      if @stop_server == false
        @stop_server = true
        @stop_server_cv.broadcast
        return
      end
      @run_mutex.synchronize do
        fail 'Cannot stop before starting' if @running_state == :not_started
        return if @running_state != :running
        transition_running_state(:stopping)
        deadline = from_relative_time(@poll_period)
        @server.shutdown_and_notify(deadline)
      end
      @pool.stop
    end

    # @return [Symbol] the current running state, read under @run_mutex
    def running_state
      @run_mutex.synchronize { @running_state }
    end

    # Moves to target_state if it is the single legal successor of the
    # current state (:not_started -> :running -> :stopping -> :stopped).
    # Can only be called while holding @run_mutex.
    #
    # @raise [RuntimeError] on any other transition
    def transition_running_state(target_state)
      state_transitions = {
        not_started: :running,
        running: :stopping,
        stopping: :stopped
      }
      if state_transitions[@running_state] == target_state
        @running_state = target_state
      else
        fail "Bad server state transition: #{@running_state}->#{target_state}"
      end
    end

    def running?
      running_state == :running
    end

    def stopped?
      running_state == :stopped
    end

    # Is called from other threads to wait for #run to start up the server.
    #
    # While the server has not been started this blocks until #run starts
    # it or timeout seconds elapse (with no timeout it waits indefinitely).
    # Once the server has left :not_started it returns immediately.
    #
    # @param timeout [Numeric, nil] maximum number of seconds to wait
    # @return [true, false] true if the server is running, false otherwise
    def wait_till_running(timeout = nil)
      deadline = timeout &&
                 Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
      @run_mutex.synchronize do
        # Loop so that a spurious wakeup does not return early.
        while @running_state == :not_started
          if deadline.nil?
            @run_cond.wait(@run_mutex)
          else
            remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
            break if remaining <= 0
            @run_cond.wait(@run_mutex, remaining)
          end
        end
        @running_state == :running
      end
    end

    # Handles registration of services.
    #
    # service is either a class that includes GRPC::GenericService and whose
    # #new method can be called without arguments, or an instance of such a
    # class.
    #
    # E.g., after
    #
    #   class Divider
    #     include GRPC::GenericService
    #     rpc :div, DivArgs, DivReply    # single request, single response
    #     def initialize(optional_arg='default option') # callable with no args
    #       ...
    #     end
    #   end
    #
    #   srv = GRPC::RpcServer.new(...)
    #
    #   # Either of these works
    #
    #   srv.handle(Divider)
    #
    #   # or
    #
    #   srv.handle(Divider.new('replace optional arg'))
    #
    # When a class is given, a single instance is created with #new and is
    # shared by all of that service's rpc handlers.
    #
    # It raises RuntimeError:
    # - if service is not a valid service class or object
    # - if any of its handler methods are already registered
    # - if the server has already been started
    #
    # @param service [Object|Class] a service class or object as described
    #   above
    def handle(service)
      @run_mutex.synchronize do
        unless @running_state == :not_started
          fail 'cannot add services if the server has been started'
        end
        cls = service.is_a?(Class) ? service : service.class
        assert_valid_service_class(cls)
        add_rpc_descs_for(service)
      end
    end

    # Runs the server.
    #
    # - Raises RuntimeError if no services have been registered. Otherwise
    #   it starts the worker pool and the core server, then serves calls
    #   until #stop is called, returning once the server has stopped.
    #
    # - #running? returns true after this is called, until #stop causes the
    #   server to stop.
    def run
      @run_mutex.synchronize do
        fail 'cannot run without registering services' if rpc_descs.size.zero?
        @pool.start
        @server.start
        transition_running_state(:running)
        @run_cond.broadcast
      end
      loop_handle_server_calls
    end

    alias_method :run_till_terminated, :run

    # Runs the server with signal handlers installed for graceful shutdown.
    #
    # Every signal is validated before any trap is installed or the watcher
    # thread is started. An invalid entry therefore raises without leaving
    # partial state behind.
    #
    # @param signals
    #   List of String, Symbol, Integer or a mix, naming the signals (with or
    #   without the SIG prefix, or by number) that the user would like to send
    #   to the server for graceful shutdown
    # @param wait_interval (optional)
    #   Seconds that the user would like stop_server_thread to wait between
    #   polls of stop_server
    # @raise [RuntimeError] if any entry is not a valid signal
    def run_till_terminated_or_interrupted(signals, wait_interval = 60)
      valid_signals = Signal.list
      target_sigs = signals.map do |sig|
        target_sig =
          if sig.is_a?(String) || sig.is_a?(Symbol)
            # cut out the SIG prefix to see if valid signal
            name = sig.to_s.upcase
            name.start_with?('SIG') ? name[3..-1] : name
          else
            sig
          end
        unless valid_signals.value?(target_sig) || valid_signals.key?(target_sig)
          fail "#{target_sig} not a valid signal"
        end
        target_sig
      end

      @stop_server = false
      @stop_server_mu = Mutex.new
      @stop_server_cv = ConditionVariable.new

      # register signal traps for all requested signals
      target_sigs.each do |target_sig|
        Signal.trap(target_sig) do
          @stop_server = true
          @stop_server_cv.broadcast
        end
      end

      @stop_server_thread = Thread.new do
        loop do
          break if @stop_server
          @stop_server_mu.synchronize do
            @stop_server_cv.wait(@stop_server_mu, wait_interval)
          end
        end

        # stop is surrounded by mutex, should handle multiple calls to stop
        # correctly
        stop
      end

      run

      @stop_server_thread.join
    end

    # Sends RESOURCE_EXHAUSTED if no worker thread is free to take the call.
    #
    # @return an_rpc if a worker is free, otherwise nil
    def available?(an_rpc)
      return an_rpc if @pool.ready_for_work?
      GRPC.logger.warn('no free worker threads currently')
      send_status_without_handler(an_rpc,
                                  GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED,
                                  'No free threads in thread pool')
      nil
    end

    # Sends UNIMPLEMENTED if the method is not implemented by this server.
    #
    # @return an_rpc if a handler is registered for its method, otherwise nil
    def implemented?(an_rpc)
      mth = an_rpc.method.to_sym
      return an_rpc if rpc_descs.key?(mth)
      GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}")
      send_status_without_handler(an_rpc,
                                  GRPC::Core::StatusCodes::UNIMPLEMENTED, '')
      nil
    end

    # Handles calls to the server until it leaves the :running state, then
    # moves it to :stopped and closes the core server.
    #
    # @raise [RuntimeError] if the server has not been started
    def loop_handle_server_calls
      fail 'not started' if running_state == :not_started
      while running_state == :running
        begin
          an_rpc = @server.request_call
          # NOTE(review): an rpc that carries no call presumably means the
          # core server is shutting down; stop serving in that case.
          break if !an_rpc.nil? && an_rpc.call.nil?
          active_call = new_active_server_call(an_rpc)
          unless active_call.nil?
            @pool.schedule(active_call) do |ac|
              c, mth = ac
              begin
                rpc_descs[mth].run_server_method(
                  c,
                  rpc_handlers[mth],
                  @interceptors.build_context
                )
              rescue StandardError => e
                # Log the failure instead of silently discarding it.
                GRPC.logger.warn("handler for #{mth} failed")
                GRPC.logger.warn(e)
                c.send_status(GRPC::Core::StatusCodes::INTERNAL,
                              'Server handler failed')
              end
            end
          end
        rescue Core::CallError, RuntimeError => e
          # these might happen for various reasons. The correct behavior of
          # the server is to log them and continue, if it's not shutting down.
          if running_state == :running
            GRPC.logger.warn("server call failed: #{e}")
          end
          next
        end
      end
      # @running_state should be :stopping here
      @run_mutex.synchronize do
        transition_running_state(:stopped)
        GRPC.logger.info("stopped: #{self}")
        @server.close
      end
    end

    # Builds the ActiveCall for an incoming rpc.
    #
    # Returns nil, without scheduling anything, when an_rpc (or its call) is
    # nil, when no worker is free, or when the method is not registered; the
    # latter two cases have already sent an error status to the client.
    #
    # @return [Array(ActiveCall, Symbol), nil] the call and its method route
    def new_active_server_call(an_rpc)
      return nil if an_rpc.nil? || an_rpc.call.nil?

      # allow the metadata to be accessed from the call
      an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers
      connect_md = nil
      unless @connect_md_proc.nil?
        connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
      end

      return nil unless available?(an_rpc)
      return nil unless implemented?(an_rpc)

      # Create the ActiveCall. Indicate that metadata hasn't been sent yet.
      GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
      mth = an_rpc.method.to_sym
      rpc_desc = rpc_descs[mth]
      c = ActiveCall.new(an_rpc.call,
                         rpc_desc.marshal_proc,
                         rpc_desc.unmarshal_proc(:input),
                         an_rpc.deadline,
                         metadata_received: true,
                         started: false,
                         metadata_to_send: connect_md)
      c.attach_peer_cert(an_rpc.call.peer_cert)
      [c, mth]
    end

    protected

    def rpc_descs
      @rpc_descs ||= {}
    end

    def rpc_handlers
      @rpc_handlers ||= {}
    end

    # Finishes an_rpc with the given status without running any handler.
    # The ActiveCall is created with started: false so it knows no metadata
    # has been sent yet. No messages are exchanged, so identity procs serve
    # as the (un)marshallers.
    def send_status_without_handler(an_rpc, code, details)
      noop = proc { |x| x }
      c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
                         metadata_received: true, started: false)
      c.send_status(code, details)
    end

    # @raise [RuntimeError] unless cls includes GenericService and declares
    #   at least one rpc
    def assert_valid_service_class(cls)
      unless cls.include?(GenericService)
        fail "#{cls} must 'include GenericService'"
      end
      fail "#{cls} should specify some rpc descriptions" if
        cls.rpc_descs.size.zero?
    end

    # Registers the rpc descriptions and bound handler methods of service.
    # This should be called while holding @run_mutex.
    #
    # All routes are checked for duplicates, and all handler methods are
    # resolved, before anything is recorded. A failure therefore leaves the
    # registries unchanged.
    def add_rpc_descs_for(service)
      cls = service.is_a?(Class) ? service : service.class
      specs = rpc_descs
      handlers = rpc_handlers

      routes = cls.rpc_descs.map do |name, spec|
        route = "/#{cls.service_name}/#{name}".to_sym
        fail "already registered: rpc #{route} from #{spec}" if specs.key? route
        [route, name, spec]
      end

      # Instantiate a service class once, so that all of its rpc methods
      # share one object, just as when an instance is passed in.
      instance = service.is_a?(Class) ? cls.new : service
      bound = routes.map do |route, name, spec|
        rpc_name = GenericService.underscore(name.to_s).to_sym
        [route, spec, instance.method(rpc_name)]
      end

      bound.each do |route, spec, handler|
        specs[route] = spec
        handlers[route] = handler
        GRPC.logger.info("handling #{route} with #{handler}")
      end
    end
  end
end