# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require_relative '../grpc'
require_relative 'active_call'
require_relative 'service'
require 'thread'

# GRPC contains the General RPC module.
module GRPC
  # Pool is a simple thread pool.
  class Pool
    # Default keep alive period is 1s
    DEFAULT_KEEP_ALIVE = 1

    def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE)
      fail 'pool size must be positive' unless size > 0
      @jobs = Queue.new
      @size = size
      @stopped = false
      @stop_mutex = Mutex.new # needs to be held when accessing @stopped
      @stop_cond = ConditionVariable.new
      @workers = []
      @keep_alive = keep_alive

      # Each worker thread has its own queue to push and pull jobs;
      # these queues are put into @ready_workers when that worker is idle.
      @ready_workers = Queue.new
    end
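
    # A minimal usage sketch for this pool (it is an internal helper of
    # RpcServer, so the calls below are for illustration only):
    #
    #   pool = GRPC::Pool.new(2)
    #   pool.start
    #   pool.schedule(1, 2) { |a, b| GRPC.logger.info("sum=#{a + b}") }
    #   pool.stop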

    # Returns the number of jobs waiting
    def jobs_waiting
      @jobs.size
    end

    def ready_for_work?
      # Busy worker threads are either doing work, or have a single job
      # waiting on them. Workers that are idle with no jobs waiting
      # have their "queues" in @ready_workers
      !@ready_workers.empty?
    end

    # Schedules the given block to run on an idle worker thread with the
    # provided args.
    #
    # @param args the args passed to blk when it is called
    # @param blk the block to call
    def schedule(*args, &blk)
      return if blk.nil?
      @stop_mutex.synchronize do
        if @stopped
          GRPC.logger.warn('did not schedule job, already stopped')
          return
        end
        GRPC.logger.info('schedule another job')
        fail 'No worker threads available' if @ready_workers.empty?
        worker_queue = @ready_workers.pop

        fail 'worker already has a task waiting' unless worker_queue.empty?
        worker_queue << [blk, args]
      end
    end

    # Starts running the jobs in the thread pool.
    def start
      @stop_mutex.synchronize do
        fail 'already stopped' if @stopped
      end
      until @workers.size == @size.to_i
        new_worker_queue = Queue.new
        @ready_workers << new_worker_queue
        next_thread = Thread.new(new_worker_queue) do |jobs|
          catch(:exit) do  # allows { throw :exit } to kill a thread
            loop_execute_jobs(jobs)
          end
          remove_current_thread
        end
        @workers << next_thread
      end
    end

    # Stops the jobs in the pool
    def stop
      GRPC.logger.info('stopping, will wait for all the workers to exit')
      @stop_mutex.synchronize do  # wait @keep_alive seconds for workers to stop
        @stopped = true
        loop do
          break unless ready_for_work?
          worker_queue = @ready_workers.pop
          worker_queue << [proc { throw :exit }, []]
        end
        @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
      end
      forcibly_stop_workers
      GRPC.logger.info('stopped, all workers are shutdown')
    end

    protected

    # Forcibly shut down any threads that are still alive.
    def forcibly_stop_workers
      return unless @workers.size > 0
      GRPC.logger.info("forcibly terminating #{@workers.size} worker(s)")
      @workers.each do |t|
        next unless t.alive?
        begin
          t.exit
        rescue StandardError => e
          GRPC.logger.warn('error while terminating a worker')
          GRPC.logger.warn(e)
        end
      end
    end

    # Removes the current thread from @workers, and signals when all the
    # threads are complete.
    def remove_current_thread
      @stop_mutex.synchronize do
        @workers.delete(Thread.current)
        @stop_cond.signal if @workers.size.zero?
      end
    end

    def loop_execute_jobs(worker_queue)
      loop do
        begin
          blk, args = worker_queue.pop
          blk.call(*args)
        rescue StandardError, GRPC::Core::CallError => e
          GRPC.logger.warn('Error in worker thread')
          GRPC.logger.warn(e)
        end
        # there shouldn't be any work given to this thread while it's busy
        fail('received a task while busy') unless worker_queue.empty?
        @stop_mutex.synchronize do
          return if @stopped
          @ready_workers << worker_queue
        end
      end
    end
  end

  # RpcServer hosts a number of services and makes them available on the
  # network.
  class RpcServer
    include Core::CallOps
    include Core::TimeConsts
    extend ::Forwardable

    def_delegators :@server, :add_http2_port

    # Default thread pool size is 30
    DEFAULT_POOL_SIZE = 30

    # Deprecated due to internal changes to the thread pool
    DEFAULT_MAX_WAITING_REQUESTS = 20

    # Default poll period is 1s
    DEFAULT_POLL_PERIOD = 1

    # Signal check period is 0.25s
    SIGNAL_CHECK_PERIOD = 0.25

    # setup_connect_md_proc is used by #initialize to validate the
    # connect_md_proc.
    def self.setup_connect_md_proc(a_proc)
      return nil if a_proc.nil?
      fail(TypeError, '!Proc') unless a_proc.is_a? Proc
      a_proc
    end

    # Creates a new RpcServer.
    #
    # The RPC server is configured using keyword arguments.
    #
    # There are some specific keyword args used to configure the RpcServer
    # instance.
    #
    # * pool_size: the size of the thread pool the server uses to run its
    # threads. The server cannot handle more concurrent requests than the
    # size of the thread pool.
    #
    # * max_waiting_requests: Deprecated due to internal changes to the thread
    # pool. This is still an argument for compatibility but is ignored.
    #
    # * poll_period: The amount of time in seconds to wait for
    # currently-serviced RPCs to finish before cancelling them when shutting
    # down the server.
    #
    # * pool_keep_alive: The amount of time in seconds to wait
    # for currently busy thread-pool threads to finish before
    # forcing an abrupt exit to each thread.
    #
    # * connect_md_proc:
    # when non-nil is a proc for determining metadata to send back to the
    # client on receiving an invocation req.  The proc signature is:
    #   {key: val, ..} func(method_name, {key: val, ...})
    #
    # * server_args:
    # A server arguments hash to be passed down to the underlying core server
    #
    # * interceptors:
    # An array of GRPC::ServerInterceptor objects that will be used for
    # intercepting server handlers to provide extra functionality.
    # Interceptors are an EXPERIMENTAL API.
    #
    def initialize(pool_size: DEFAULT_POOL_SIZE,
                   max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS,
                   poll_period: DEFAULT_POLL_PERIOD,
                   pool_keep_alive: Pool::DEFAULT_KEEP_ALIVE,
                   connect_md_proc: nil,
                   server_args: {},
                   interceptors: [])
      @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
      @max_waiting_requests = max_waiting_requests
      @poll_period = poll_period
      @pool_size = pool_size
      @pool = Pool.new(@pool_size, keep_alive: pool_keep_alive)
      @run_cond = ConditionVariable.new
      @run_mutex = Mutex.new
      # running_state can take 4 values: :not_started, :running, :stopping, and
      # :stopped. State transitions can only proceed in that order.
      @running_state = :not_started
      @server = Core::Server.new(server_args)
      @interceptors = InterceptorRegistry.new(interceptors)
    end
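
    # A hedged sketch of how these keyword args fit together; the port and
    # the EchoService class below are illustrative placeholders, not part of
    # this file:
    #
    #   md_proc = proc { |_method, _md| { 'x-served-by' => 'example' } }
    #   srv = GRPC::RpcServer.new(pool_size: 10,
    #                             poll_period: 2,
    #                             connect_md_proc: md_proc)
    #   srv.add_http2_port('0.0.0.0:50051', :this_port_is_insecure)
    #   srv.handle(EchoService)
    #   srv.run_till_terminated_or_interrupted(['INT', 'TERM'])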

    # stops a running server
    #
    # the call has no impact if the server is already stopped, otherwise
    # the server's current call loop is its last.
    def stop
      # if called via run_till_terminated_or_interrupted,
      #   signal stop_server_thread and don't do anything
      if @stop_server.nil? == false && @stop_server == false
        @stop_server = true
        @stop_server_cv.broadcast
        return
      end
      @run_mutex.synchronize do
        fail 'Cannot stop before starting' if @running_state == :not_started
        return if @running_state != :running
        transition_running_state(:stopping)
        deadline = from_relative_time(@poll_period)
        @server.shutdown_and_notify(deadline)
      end
      @pool.stop
    end

    def running_state
      @run_mutex.synchronize do
        return @running_state
      end
    end

    # Can only be called while holding @run_mutex
    def transition_running_state(target_state)
      state_transitions = {
        not_started: :running,
        running: :stopping,
        stopping: :stopped
      }
      if state_transitions[@running_state] == target_state
        @running_state = target_state
      else
        fail "Bad server state transition: #{@running_state}->#{target_state}"
      end
    end

    def running?
      running_state == :running
    end

    def stopped?
      running_state == :stopped
    end

    # Is called from other threads to wait for #run to start up the server.
    #
    # If #run has not been called yet, this blocks until #run starts the
    # server or the timeout expires.
    #
    # @param timeout [Numeric] number of seconds to wait
    # @return [true, false] true if the server is running, false otherwise
    def wait_till_running(timeout = nil)
      @run_mutex.synchronize do
        @run_cond.wait(@run_mutex, timeout) if @running_state == :not_started
        return @running_state == :running
      end
    end
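
    # Typical pattern (illustrative): run the server on a background thread
    # and use #wait_till_running to block until it is serving:
    #
    #   t = Thread.new { srv.run }
    #   srv.wait_till_running(10)  # true once running, false after 10s
    #   # ... issue RPCs against the server ...
    #   srv.stop
    #   t.join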

    # handle registration of classes
    #
    # service is either a class that includes GRPC::GenericService and whose
    # #new function can be called without argument or any instance of such a
    # class.
    #
    # E.g., after
    #
    # class Divider
    #   include GRPC::GenericService
    #   rpc :div, DivArgs, DivReply   # single request, single response
    #   def initialize(optional_arg='default option') # no args needed
    #     ...
    #   end
    # end
    #
    # srv = GRPC::RpcServer.new(...)
    #
    # # Either of these works
    #
    # srv.handle(Divider)
    #
    # # or
    #
    # srv.handle(Divider.new('replace optional arg'))
    #
    # It raises RuntimeError:
    # - if service is not a valid service class or object
    # - if its handler methods are already registered
    # - if the server is already running
    #
    # @param service [Object|Class] a service class or object as described
    #        above
    def handle(service)
      @run_mutex.synchronize do
        unless @running_state == :not_started
          fail 'cannot add services if the server has been started'
        end
        cls = service.is_a?(Class) ? service : service.class
        assert_valid_service_class(cls)
        add_rpc_descs_for(service)
      end
    end

    # runs the server
    #
    # - if no rpc_descs are registered, this fails immediately, otherwise it
    #   continues running permanently and does not return until program exit.
    #
    # - #running? returns true after this is called, until #stop causes
    #   the server to stop.
    def run
      @run_mutex.synchronize do
        fail 'cannot run without registering services' if rpc_descs.size.zero?
        @pool.start
        @server.start
        transition_running_state(:running)
        @run_cond.broadcast
      end
      loop_handle_server_calls
    end

    alias_method :run_till_terminated, :run

    # runs the server with signal handlers
    # @param signals
    #     List of String and/or Integer values representing signals that
    #     should trigger a graceful shutdown of the server
    # @param wait_interval (optional)
    #     Integer number of seconds that stop_server_thread waits between
    #     checks of stop_server
    def run_till_terminated_or_interrupted(signals, wait_interval = 60)
      @stop_server = false
      @stop_server_mu = Mutex.new
      @stop_server_cv = ConditionVariable.new

      @stop_server_thread = Thread.new do
        loop do
          break if @stop_server
          @stop_server_mu.synchronize do
            @stop_server_cv.wait(@stop_server_mu, wait_interval)
          end
        end

        # stop is surrounded by mutex, should handle multiple calls to stop
        #   correctly
        stop
      end

      valid_signals = Signal.list

      # register signal handlers
      signals.each do |sig|
        # input validation
        target_sig = if sig.class == String
                       # cut out the SIG prefix to see if valid signal
                       sig.upcase.start_with?('SIG') ? sig.upcase[3..-1] : sig.upcase
                     else
                       sig
                     end

        # register signal traps for all valid signals
        if valid_signals.value?(target_sig) || valid_signals.key?(target_sig)
          Signal.trap(target_sig) do
            @stop_server = true
            @stop_server_cv.broadcast
          end
        else
          fail "#{target_sig} not a valid signal"
        end
      end

      run

      @stop_server_thread.join
    end
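
    # Signals may be given as names with or without the 'SIG' prefix, or as
    # Integer signal numbers; for example (illustrative calls):
    #
    #   srv.run_till_terminated_or_interrupted(['INT', 'SIGTERM'])
    #   srv.run_till_terminated_or_interrupted([2, 'TERM'], 30)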

    # Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs
    def available?(an_rpc)
      return an_rpc if @pool.ready_for_work?
      GRPC.logger.warn('no free worker threads currently')
      noop = proc { |x| x }

      # Create a new active call that knows that metadata hasn't been
      # sent yet
      c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
                         metadata_received: true, started: false)
      c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED,
                    'No free threads in thread pool')
      nil
    end

    # Sends UNIMPLEMENTED if the method is not implemented by this server
    def implemented?(an_rpc)
      mth = an_rpc.method.to_sym
      return an_rpc if rpc_descs.key?(mth)
      GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}")
      noop = proc { |x| x }

      # Create a new active call that knows that
      # metadata hasn't been sent yet
      c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
                         metadata_received: true, started: false)
      c.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '')
      nil
    end
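
    # On the client side, these early rejections surface as GRPC::BadStatus
    # errors carrying the matching status code; a rough sketch ('stub' and
    # 'some_rpc' are illustrative names):
    #
    #   begin
    #     stub.some_rpc(req)
    #   rescue GRPC::BadStatus => e
    #     # e.code is RESOURCE_EXHAUSTED or UNIMPLEMENTED in these cases
    #     GRPC.logger.warn("call rejected: #{e.message}")
    #   end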

    # handles calls to the server
    def loop_handle_server_calls
      fail 'not started' if running_state == :not_started
      while running_state == :running
        begin
          an_rpc = @server.request_call
          break if (!an_rpc.nil?) && an_rpc.call.nil?
          active_call = new_active_server_call(an_rpc)
          unless active_call.nil?
            @pool.schedule(active_call) do |ac|
              c, mth = ac
              begin
                rpc_descs[mth].run_server_method(
                  c,
                  rpc_handlers[mth],
                  @interceptors.build_context
                )
              rescue StandardError
                c.send_status(GRPC::Core::StatusCodes::INTERNAL,
                              'Server handler failed')
              end
            end
          end
        rescue Core::CallError, RuntimeError => e
          # these might happen for various reasons.  The correct behavior of
          # the server is to log them and continue, if it's not shutting down.
          if running_state == :running
            GRPC.logger.warn("server call failed: #{e}")
          end
          next
        end
      end
      # @running_state should be :stopping here
      @run_mutex.synchronize do
        transition_running_state(:stopped)
        GRPC.logger.info("stopped: #{self}")
        @server.close
      end
    end

    def new_active_server_call(an_rpc)
      return nil if an_rpc.nil? || an_rpc.call.nil?

      # allow the metadata to be accessed from the call
      an_rpc.call.metadata = an_rpc.metadata  # attaches md to call for handlers
      connect_md = nil
      unless @connect_md_proc.nil?
        connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
      end

      return nil unless available?(an_rpc)
      return nil unless implemented?(an_rpc)

      # Create the ActiveCall. Indicate that metadata hasn't been sent yet.
      GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
      rpc_desc = rpc_descs[an_rpc.method.to_sym]
      c = ActiveCall.new(an_rpc.call,
                         rpc_desc.marshal_proc,
                         rpc_desc.unmarshal_proc(:input),
                         an_rpc.deadline,
                         metadata_received: true,
                         started: false,
                         metadata_to_send: connect_md)
      c.attach_peer_cert(an_rpc.call.peer_cert)
      mth = an_rpc.method.to_sym
      [c, mth]
    end

    protected

    def rpc_descs
      @rpc_descs ||= {}
    end

    def rpc_handlers
      @rpc_handlers ||= {}
    end

    def assert_valid_service_class(cls)
      unless cls.include?(GenericService)
        fail "#{cls} must 'include GenericService'"
      end
      fail "#{cls} should specify some rpc descriptions" if
        cls.rpc_descs.size.zero?
    end

    # This should be called while holding @run_mutex
    def add_rpc_descs_for(service)
      cls = service.is_a?(Class) ? service : service.class
      specs, handlers = (@rpc_descs ||= {}), (@rpc_handlers ||= {})
      cls.rpc_descs.each_pair do |name, spec|
        route = "/#{cls.service_name}/#{name}".to_sym
        fail "already registered: rpc #{route} from #{spec}" if specs.key? route
        specs[route] = spec
        rpc_name = GenericService.underscore(name.to_s).to_sym
        if service.is_a?(Class)
          handlers[route] = cls.new.method(rpc_name)
        else
          handlers[route] = service.method(rpc_name)
        end
        GRPC.logger.info("handling #{route} with #{handlers[route]}")
      end
    end
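
    # For a hypothetical service whose service_name is 'math.Math' and which
    # defines rpc :Div, the registration above would roughly produce:
    #
    #   rpc_descs[:"/math.Math/Div"]    # => the RpcDesc for :Div
    #   rpc_handlers[:"/math.Math/Div"] # => the service's bound #div method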
  end
end