1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15require_relative 'active_call'
16require_relative '../version'
17
18# GRPC contains the General RPC module.
19module GRPC
20  # rubocop:disable Metrics/ParameterLists
21
22  # ClientStub represents a client connection to a gRPC server, and can be used
23  # to send requests.
24  class ClientStub
25    include Core::StatusCodes
26    include Core::TimeConsts
27
28    # Default timeout is infinity.
29    DEFAULT_TIMEOUT = INFINITE_FUTURE
30
31    # setup_channel is used by #initialize to constuct a channel from its
32    # arguments.
33    def self.setup_channel(alt_chan, host, creds, channel_args = {})
34      unless alt_chan.nil?
35        fail(TypeError, '!Channel') unless alt_chan.is_a?(Core::Channel)
36        return alt_chan
37      end
38      if channel_args['grpc.primary_user_agent'].nil?
39        channel_args['grpc.primary_user_agent'] = ''
40      else
41        channel_args['grpc.primary_user_agent'] += ' '
42      end
43      channel_args['grpc.primary_user_agent'] += "grpc-ruby/#{VERSION}"
44      unless creds.is_a?(Core::ChannelCredentials) || creds.is_a?(Symbol)
45        fail(TypeError, '!ChannelCredentials or Symbol')
46      end
47      Core::Channel.new(host, channel_args, creds)
48    end
49
50    # Allows users of the stub to modify the propagate mask.
51    #
52    # This is an advanced feature for use when making calls to another gRPC
53    # server whilst running in the handler of an existing one.
54    attr_writer :propagate_mask
55
56    # Creates a new ClientStub.
57    #
58    # Minimally, a stub is created with the just the host of the gRPC service
59    # it wishes to access, e.g.,
60    #
61    #   my_stub = ClientStub.new(example.host.com:50505,
62    #                            :this_channel_is_insecure)
63    #
64    # If a channel_override argument is passed, it will be used as the
65    # underlying channel. Otherwise, the channel_args argument will be used
66    # to construct a new underlying channel.
67    #
68    # There are some specific keyword args that are not used to configure the
69    # channel:
70    #
71    # - :channel_override
72    # when present, this must be a pre-created GRPC::Core::Channel.  If it's
73    # present the host and arbitrary keyword arg areignored, and the RPC
74    # connection uses this channel.
75    #
76    # - :timeout
77    # when present, this is the default timeout used for calls
78    #
79    # @param host [String] the host the stub connects to
80    # @param creds [Core::ChannelCredentials|Symbol] the channel credentials, or
81    #     :this_channel_is_insecure, which explicitly indicates that the client
82    #     should be created with an insecure connection. Note: this argument is
83    #     ignored if the channel_override argument is provided.
84    # @param channel_override [Core::Channel] a pre-created channel
85    # @param timeout [Number] the default timeout to use in requests
86    # @param propagate_mask [Number] A bitwise combination of flags in
87    #     GRPC::Core::PropagateMasks. Indicates how data should be propagated
88    #     from parent server calls to child client calls if this client is being
89    #     used within a gRPC server.
90    # @param channel_args [Hash] the channel arguments. Note: this argument is
91    #     ignored if the channel_override argument is provided.
92    # @param interceptors [Array<GRPC::ClientInterceptor>] An array of
93    #     GRPC::ClientInterceptor objects that will be used for
94    #     intercepting calls before they are executed
95    #     Interceptors are an EXPERIMENTAL API.
96    def initialize(host, creds,
97                   channel_override: nil,
98                   timeout: nil,
99                   propagate_mask: nil,
100                   channel_args: {},
101                   interceptors: [])
102      @ch = ClientStub.setup_channel(channel_override, host, creds,
103                                     channel_args)
104      alt_host = channel_args[Core::Channel::SSL_TARGET]
105      @host = alt_host.nil? ? host : alt_host
106      @propagate_mask = propagate_mask
107      @timeout = timeout.nil? ? DEFAULT_TIMEOUT : timeout
108      @interceptors = InterceptorRegistry.new(interceptors)
109    end
110
111    # request_response sends a request to a GRPC server, and returns the
112    # response.
113    #
114    # == Flow Control ==
115    # This is a blocking call.
116    #
117    # * it does not return until a response is received.
118    #
119    # * the requests is sent only when GRPC core's flow control allows it to
120    #   be sent.
121    #
122    # == Errors ==
123    # An RuntimeError is raised if
124    #
125    # * the server responds with a non-OK status
126    #
127    # * the deadline is exceeded
128    #
129    # == Return Value ==
130    #
131    # If return_op is false, the call returns the response
132    #
133    # If return_op is true, the call returns an Operation, calling execute
134    # on the Operation returns the response.
135    #
136    # @param method [String] the RPC method to call on the GRPC server
137    # @param req [Object] the request sent to the server
138    # @param marshal [Function] f(obj)->string that marshals requests
139    # @param unmarshal [Function] f(string)->obj that unmarshals responses
140    # @param deadline [Time] (optional) the time the request should complete
141    # @param return_op [true|false] return an Operation if true
142    # @param parent [Core::Call] a prior call whose reserved metadata
143    #   will be propagated by this one.
144    # @param credentials [Core::CallCredentials] credentials to use when making
145    #   the call
146    # @param metadata [Hash] metadata to be sent to the server
147    # @return [Object] the response received from the server
148    def request_response(method, req, marshal, unmarshal,
149                         deadline: nil,
150                         return_op: false,
151                         parent: nil,
152                         credentials: nil,
153                         metadata: {})
154      c = new_active_call(method, marshal, unmarshal,
155                          deadline: deadline,
156                          parent: parent,
157                          credentials: credentials)
158      interception_context = @interceptors.build_context
159      intercept_args = {
160        method: method,
161        request: req,
162        call: c.interceptable,
163        metadata: metadata
164      }
165      if return_op
166        # return the operation view of the active_call; define #execute as a
167        # new method for this instance that invokes #request_response.
168        c.merge_metadata_to_send(metadata)
169        op = c.operation
170        op.define_singleton_method(:execute) do
171          interception_context.intercept!(:request_response, intercept_args) do
172            c.request_response(req, metadata: metadata)
173          end
174        end
175        op
176      else
177        interception_context.intercept!(:request_response, intercept_args) do
178          c.request_response(req, metadata: metadata)
179        end
180      end
181    end
182
183    # client_streamer sends a stream of requests to a GRPC server, and
184    # returns a single response.
185    #
186    # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
187    # #each enumeration protocol. In the simplest case, requests will be an
188    # array of marshallable objects; in typical case it will be an Enumerable
189    # that allows dynamic construction of the marshallable objects.
190    #
191    # == Flow Control ==
192    # This is a blocking call.
193    #
194    # * it does not return until a response is received.
195    #
196    # * each requests is sent only when GRPC core's flow control allows it to
197    #   be sent.
198    #
199    # == Errors ==
200    # An RuntimeError is raised if
201    #
202    # * the server responds with a non-OK status
203    #
204    # * the deadline is exceeded
205    #
206    # == Return Value ==
207    #
208    # If return_op is false, the call consumes the requests and returns
209    # the response.
210    #
211    # If return_op is true, the call returns the response.
212    #
213    # @param method [String] the RPC method to call on the GRPC server
214    # @param requests [Object] an Enumerable of requests to send
215    # @param marshal [Function] f(obj)->string that marshals requests
216    # @param unmarshal [Function] f(string)->obj that unmarshals responses
217    # @param deadline [Time] (optional) the time the request should complete
218    # @param return_op [true|false] return an Operation if true
219    # @param parent [Core::Call] a prior call whose reserved metadata
220    #   will be propagated by this one.
221    # @param credentials [Core::CallCredentials] credentials to use when making
222    #   the call
223    # @param metadata [Hash] metadata to be sent to the server
224    # @return [Object|Operation] the response received from the server
225    def client_streamer(method, requests, marshal, unmarshal,
226                        deadline: nil,
227                        return_op: false,
228                        parent: nil,
229                        credentials: nil,
230                        metadata: {})
231      c = new_active_call(method, marshal, unmarshal,
232                          deadline: deadline,
233                          parent: parent,
234                          credentials: credentials)
235      interception_context = @interceptors.build_context
236      intercept_args = {
237        method: method,
238        requests: requests,
239        call: c.interceptable,
240        metadata: metadata
241      }
242      if return_op
243        # return the operation view of the active_call; define #execute as a
244        # new method for this instance that invokes #client_streamer.
245        c.merge_metadata_to_send(metadata)
246        op = c.operation
247        op.define_singleton_method(:execute) do
248          interception_context.intercept!(:client_streamer, intercept_args) do
249            c.client_streamer(requests)
250          end
251        end
252        op
253      else
254        interception_context.intercept!(:client_streamer, intercept_args) do
255          c.client_streamer(requests, metadata: metadata)
256        end
257      end
258    end
259
260    # server_streamer sends one request to the GRPC server, which yields a
261    # stream of responses.
262    #
263    # responses provides an enumerator over the streamed responses, i.e. it
264    # follows Ruby's #each iteration protocol.  The enumerator blocks while
265    # waiting for each response, stops when the server signals that no
266    # further responses will be supplied.  If the implicit block is provided,
267    # it is executed with each response as the argument and no result is
268    # returned.
269    #
270    # == Flow Control ==
271    # This is a blocking call.
272    #
273    # * the request is sent only when GRPC core's flow control allows it to
274    #   be sent.
275    #
276    # * the request will not complete until the server sends the final
277    #   response followed by a status message.
278    #
279    # == Errors ==
280    # An RuntimeError is raised if
281    #
282    # * the server responds with a non-OK status when any response is
283    # * retrieved
284    #
285    # * the deadline is exceeded
286    #
287    # == Return Value ==
288    #
289    # if the return_op is false, the return value is an Enumerator of the
290    # results, unless a block is provided, in which case the block is
291    # executed with each response.
292    #
293    # if return_op is true, the function returns an Operation whose #execute
294    # method runs server streamer call. Again, Operation#execute either
295    # calls the given block with each response or returns an Enumerator of the
296    # responses.
297    #
298    # == Keyword Args ==
299    #
300    # Unspecified keyword arguments are treated as metadata to be sent to the
301    # server.
302    #
303    # @param method [String] the RPC method to call on the GRPC server
304    # @param req [Object] the request sent to the server
305    # @param marshal [Function] f(obj)->string that marshals requests
306    # @param unmarshal [Function] f(string)->obj that unmarshals responses
307    # @param deadline [Time] (optional) the time the request should complete
308    # @param return_op [true|false]return an Operation if true
309    # @param parent [Core::Call] a prior call whose reserved metadata
310    #   will be propagated by this one.
311    # @param credentials [Core::CallCredentials] credentials to use when making
312    #   the call
313    # @param metadata [Hash] metadata to be sent to the server
314    # @param blk [Block] when provided, is executed for each response
315    # @return [Enumerator|Operation|nil] as discussed above
316    def server_streamer(method, req, marshal, unmarshal,
317                        deadline: nil,
318                        return_op: false,
319                        parent: nil,
320                        credentials: nil,
321                        metadata: {},
322                        &blk)
323      c = new_active_call(method, marshal, unmarshal,
324                          deadline: deadline,
325                          parent: parent,
326                          credentials: credentials)
327      interception_context = @interceptors.build_context
328      intercept_args = {
329        method: method,
330        request: req,
331        call: c.interceptable,
332        metadata: metadata
333      }
334      if return_op
335        # return the operation view of the active_call; define #execute
336        # as a new method for this instance that invokes #server_streamer
337        c.merge_metadata_to_send(metadata)
338        op = c.operation
339        op.define_singleton_method(:execute) do
340          interception_context.intercept!(:server_streamer, intercept_args) do
341            c.server_streamer(req, &blk)
342          end
343        end
344        op
345      else
346        interception_context.intercept!(:server_streamer, intercept_args) do
347          c.server_streamer(req, metadata: metadata, &blk)
348        end
349      end
350    end
351
352    # bidi_streamer sends a stream of requests to the GRPC server, and yields
353    # a stream of responses.
354    #
355    # This method takes an Enumerable of requests, and returns and enumerable
356    # of responses.
357    #
358    # == requests ==
359    #
360    # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
361    # #each enumeration protocol. In the simplest case, requests will be an
362    # array of marshallable objects; in typical case it will be an
363    # Enumerable that allows dynamic construction of the marshallable
364    # objects.
365    #
366    # == responses ==
367    #
368    # This is an enumerator of responses.  I.e, its #next method blocks
369    # waiting for the next response.  Also, if at any point the block needs
370    # to consume all the remaining responses, this can be done using #each or
371    # #collect.  Calling #each or #collect should only be done if
372    # the_call#writes_done has been called, otherwise the block will loop
373    # forever.
374    #
375    # == Flow Control ==
376    # This is a blocking call.
377    #
378    # * the call completes when the next call to provided block returns
379    #   false
380    #
381    # * the execution block parameters are two objects for sending and
382    #   receiving responses, each of which blocks waiting for flow control.
383    #   E.g, calles to bidi_call#remote_send will wait until flow control
384    #   allows another write before returning; and obviously calls to
385    #   responses#next block until the next response is available.
386    #
387    # == Termination ==
388    #
389    # As well as sending and receiving messages, the block passed to the
390    # function is also responsible for:
391    #
392    # * calling bidi_call#writes_done to indicate no further reqs will be
393    #   sent.
394    #
395    # * returning false if once the bidi stream is functionally completed.
396    #
397    # Note that response#next will indicate that there are no further
398    # responses by throwing StopIteration, but can only happen either
399    # if bidi_call#writes_done is called.
400    #
401    # To properly terminate the RPC, the responses should be completely iterated
402    # through; one way to do this is to loop on responses#next until no further
403    # responses are available.
404    #
405    # == Errors ==
406    # An RuntimeError is raised if
407    #
408    # * the server responds with a non-OK status when any response is
409    # * retrieved
410    #
411    # * the deadline is exceeded
412    #
413    #
414    # == Return Value ==
415    #
416    # if the return_op is false, the return value is an Enumerator of the
417    # results, unless a block is provided, in which case the block is
418    # executed with each response.
419    #
420    # if return_op is true, the function returns an Operation whose #execute
421    # method runs the Bidi call. Again, Operation#execute either calls a
422    # given block with each response or returns an Enumerator of the
423    # responses.
424    #
425    # @param method [String] the RPC method to call on the GRPC server
426    # @param requests [Object] an Enumerable of requests to send
427    # @param marshal [Function] f(obj)->string that marshals requests
428    # @param unmarshal [Function] f(string)->obj that unmarshals responses
429    # @param deadline [Time] (optional) the time the request should complete
430    # @param return_op [true|false] return an Operation if true
431    # @param parent [Core::Call] a prior call whose reserved metadata
432    #   will be propagated by this one.
433    # @param credentials [Core::CallCredentials] credentials to use when making
434    #   the call
435    # @param metadata [Hash] metadata to be sent to the server
436    # @param blk [Block] when provided, is executed for each response
437    # @return [Enumerator|nil|Operation] as discussed above
438    def bidi_streamer(method, requests, marshal, unmarshal,
439                      deadline: nil,
440                      return_op: false,
441                      parent: nil,
442                      credentials: nil,
443                      metadata: {},
444                      &blk)
445      c = new_active_call(method, marshal, unmarshal,
446                          deadline: deadline,
447                          parent: parent,
448                          credentials: credentials)
449      interception_context = @interceptors.build_context
450      intercept_args = {
451        method: method,
452        requests: requests,
453        call: c.interceptable,
454        metadata: metadata
455      }
456      if return_op
457        # return the operation view of the active_call; define #execute
458        # as a new method for this instance that invokes #bidi_streamer
459        c.merge_metadata_to_send(metadata)
460        op = c.operation
461        op.define_singleton_method(:execute) do
462          interception_context.intercept!(:bidi_streamer, intercept_args) do
463            c.bidi_streamer(requests, &blk)
464          end
465        end
466        op
467      else
468        interception_context.intercept!(:bidi_streamer, intercept_args) do
469          c.bidi_streamer(requests, metadata: metadata, &blk)
470        end
471      end
472    end
473
474    private
475
476    # Creates a new active stub
477    #
478    # @param method [string] the method being called.
479    # @param marshal [Function] f(obj)->string that marshals requests
480    # @param unmarshal [Function] f(string)->obj that unmarshals responses
481    # @param parent [Grpc::Call] a parent call, available when calls are
482    #   made from server
483    # @param credentials [Core::CallCredentials] credentials to use when making
484    #   the call
485    def new_active_call(method, marshal, unmarshal,
486                        deadline: nil,
487                        parent: nil,
488                        credentials: nil)
489      deadline = from_relative_time(@timeout) if deadline.nil?
490      # Provide each new client call with its own completion queue
491      call = @ch.create_call(parent, # parent call
492                             @propagate_mask, # propagation options
493                             method,
494                             nil, # host use nil,
495                             deadline)
496      call.set_credentials! credentials unless credentials.nil?
497      ActiveCall.new(call, marshal, unmarshal, deadline,
498                     started: false)
499    end
500  end
501end
502