1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15require_relative '../grpc'
16
17# GRPC contains the General RPC module.
18module GRPC
19  # RpcDesc is a Descriptor of an RPC method.
20  class RpcDesc < Struct.new(:name, :input, :output, :marshal_method,
21                             :unmarshal_method)
22    include Core::StatusCodes
23
24    # Used to wrap a message class to indicate that it needs to be streamed.
25    class Stream
26      attr_accessor :type
27
28      def initialize(type)
29        @type = type
30      end
31    end
32
33    # @return [Proc] { |instance| marshalled(instance) }
34    def marshal_proc
35      proc { |o| o.class.send(marshal_method, o).to_s }
36    end
37
38    # @param [:input, :output] target determines whether to produce the an
39    #                          unmarshal Proc for the rpc input parameter or
40    #                          its output parameter
41    #
42    # @return [Proc] An unmarshal proc { |marshalled(instance)| instance }
43    def unmarshal_proc(target)
44      fail ArgumentError unless [:input, :output].include?(target)
45      unmarshal_class = send(target)
46      unmarshal_class = unmarshal_class.type if unmarshal_class.is_a? Stream
47      proc { |o| unmarshal_class.send(unmarshal_method, o) }
48    end
49
50    def handle_request_response(active_call, mth, inter_ctx)
51      req = active_call.read_unary_request
52      call = active_call.single_req_view
53
54      inter_ctx.intercept!(
55        :request_response,
56        method: mth,
57        call: call,
58        request: req
59      ) do
60        resp = mth.call(req, call)
61        active_call.server_unary_response(
62          resp,
63          trailing_metadata: active_call.output_metadata
64        )
65      end
66    end
67
68    def handle_client_streamer(active_call, mth, inter_ctx)
69      call = active_call.multi_req_view
70
71      inter_ctx.intercept!(
72        :client_streamer,
73        method: mth,
74        call: call
75      ) do
76        resp = mth.call(call)
77        active_call.server_unary_response(
78          resp,
79          trailing_metadata: active_call.output_metadata
80        )
81      end
82    end
83
84    def handle_server_streamer(active_call, mth, inter_ctx)
85      req = active_call.read_unary_request
86      call = active_call.single_req_view
87
88      inter_ctx.intercept!(
89        :server_streamer,
90        method: mth,
91        call: call,
92        request: req
93      ) do
94        replies = mth.call(req, call)
95        replies.each { |r| active_call.remote_send(r) }
96        send_status(active_call, OK, 'OK', active_call.output_metadata)
97      end
98    end
99
100    ##
101    # @param [GRPC::ActiveCall] active_call
102    # @param [Method] mth
103    # @param [Array<GRPC::InterceptionContext>] inter_ctx
104    #
105    def handle_bidi_streamer(active_call, mth, inter_ctx)
106      active_call.run_server_bidi(mth, inter_ctx)
107      send_status(active_call, OK, 'OK', active_call.output_metadata)
108    end
109
110    ##
111    # @param [GRPC::ActiveCall] active_call The current active call object
112    #   for the request
113    # @param [Method] mth The current RPC method being called
114    # @param [GRPC::InterceptionContext] inter_ctx The interception context
115    #   being executed
116    #
117    def run_server_method(active_call, mth, inter_ctx = InterceptionContext.new)
118      # While a server method is running, it might be cancelled, its deadline
119      # might be reached, the handler could throw an unknown error, or a
120      # well-behaved handler could throw a StatusError.
121      if request_response?
122        handle_request_response(active_call, mth, inter_ctx)
123      elsif client_streamer?
124        handle_client_streamer(active_call, mth, inter_ctx)
125      elsif server_streamer?
126        handle_server_streamer(active_call, mth, inter_ctx)
127      else  # is a bidi_stream
128        handle_bidi_streamer(active_call, mth, inter_ctx)
129      end
130    rescue BadStatus => e
131      # this is raised by handlers that want GRPC to send an application error
132      # code and detail message and some additional app-specific metadata.
133      GRPC.logger.debug("app err:#{active_call}, status:#{e.code}:#{e.details}")
134      send_status(active_call, e.code, e.details, e.metadata)
135    rescue Core::CallError => e
136      # This is raised by GRPC internals but should rarely, if ever happen.
137      # Log it, but don't notify the other endpoint..
138      GRPC.logger.warn("failed call: #{active_call}\n#{e}")
139    rescue Core::OutOfTime
140      # This is raised when active_call#method.call exceeds the deadline
141      # event.  Send a status of deadline exceeded
142      GRPC.logger.warn("late call: #{active_call}")
143      send_status(active_call, DEADLINE_EXCEEDED, 'late')
144    rescue StandardError, NotImplementedError => e
145      # This will usuaally be an unhandled error in the handling code.
146      # Send back a UNKNOWN status to the client
147      #
148      # Note: this intentionally does not map NotImplementedError to
149      # UNIMPLEMENTED because NotImplementedError is intended for low-level
150      # OS interaction (e.g. syscalls) not supported by the current OS.
151      GRPC.logger.warn("failed handler: #{active_call}; sending status:UNKNOWN")
152      GRPC.logger.warn(e)
153      send_status(active_call, UNKNOWN, "#{e.class}: #{e.message}")
154    end
155
156    def assert_arity_matches(mth)
157      # A bidi handler function can optionally be passed a second
158      # call object parameter for access to metadata, cancelling, etc.
159      if bidi_streamer?
160        if mth.arity != 2 && mth.arity != 1
161          fail arity_error(mth, 2, "should be #{mth.name}(req, call) or " \
162            "#{mth.name}(req)")
163        end
164      elsif request_response? || server_streamer?
165        if mth.arity != 2
166          fail arity_error(mth, 2, "should be #{mth.name}(req, call)")
167        end
168      else
169        if mth.arity != 1
170          fail arity_error(mth, 1, "should be #{mth.name}(call)")
171        end
172      end
173    end
174
175    def request_response?
176      !input.is_a?(Stream) && !output.is_a?(Stream)
177    end
178
179    def client_streamer?
180      input.is_a?(Stream) && !output.is_a?(Stream)
181    end
182
183    def server_streamer?
184      !input.is_a?(Stream) && output.is_a?(Stream)
185    end
186
187    def bidi_streamer?
188      input.is_a?(Stream) && output.is_a?(Stream)
189    end
190
191    def arity_error(mth, want, msg)
192      "##{mth.name}: bad arg count; got:#{mth.arity}, want:#{want}, #{msg}"
193    end
194
195    def send_status(active_client, code, details, metadata = {})
196      details = 'Not sure why' if details.nil?
197      GRPC.logger.debug("Sending status  #{code}:#{details}")
198      active_client.send_status(code, details, code == OK, metadata: metadata)
199    rescue StandardError => e
200      GRPC.logger.warn("Could not send status #{code}:#{details}")
201      GRPC.logger.warn(e)
202    end
203  end
204end
205