1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <ruby/ruby.h>
20 
21 #include "rb_call.h"
22 #include "rb_grpc_imports.generated.h"
23 
24 #include <grpc/grpc.h>
25 #include <grpc/impl/codegen/compression_types.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/log.h>
28 
29 #include "rb_byte_buffer.h"
30 #include "rb_call_credentials.h"
31 #include "rb_completion_queue.h"
32 #include "rb_grpc.h"
33 
34 /* grpc_rb_cCall is the Call class whose instances proxy grpc_call. */
35 static VALUE grpc_rb_cCall;
36 
37 /* grpc_rb_eCallError is the ruby class of the exception thrown during call
38    operations; */
39 VALUE grpc_rb_eCallError = Qnil;
40 
41 /* grpc_rb_eOutOfTime is the ruby class of the exception thrown to indicate
42    a timeout. */
43 static VALUE grpc_rb_eOutOfTime = Qnil;
44 
45 /* grpc_rb_sBatchResult is struct class used to hold the results of a batch
46  * call. */
47 static VALUE grpc_rb_sBatchResult;
48 
49 /* grpc_rb_cMdAry is the MetadataArray class whose instances proxy
50  * grpc_metadata_array. */
51 VALUE grpc_rb_cMdAry;
52 
53 /* id_credentials is the name of the hidden ivar that preserves the value
54  * of the credentials added to the call */
55 static ID id_credentials;
56 
57 /* id_metadata is name of the attribute used to access the metadata hash
58  * received by the call and subsequently saved on it. */
59 static ID id_metadata;
60 
61 /* id_trailing_metadata is the name of the attribute used to access the trailing
62  * metadata hash received by the call and subsequently saved on it. */
63 static ID id_trailing_metadata;
64 
65 /* id_status is name of the attribute used to access the status object
66  * received by the call and subsequently saved on it. */
67 static ID id_status;
68 
69 /* id_write_flag is name of the attribute used to access the write_flag
70  * saved on the call. */
71 static ID id_write_flag;
72 
73 /* sym_* are the symbol for attributes of grpc_rb_sBatchResult. */
74 static VALUE sym_send_message;
75 static VALUE sym_send_metadata;
76 static VALUE sym_send_close;
77 static VALUE sym_send_status;
78 static VALUE sym_message;
79 static VALUE sym_status;
80 static VALUE sym_cancelled;
81 
82 typedef struct grpc_rb_call {
83   grpc_call* wrapped;
84   grpc_completion_queue* queue;
85 } grpc_rb_call;
86 
/* Releases the core call and the completion queue held by `call`.
   A NULL `wrapped` pointer marks an already-destroyed call, so calling this
   more than once is harmless. */
static void destroy_call(grpc_rb_call* call) {
  if (call->wrapped == NULL) {
    return; /* already destroyed */
  }
  grpc_call_unref(call->wrapped);
  call->wrapped = NULL;
  grpc_rb_completion_queue_destroy(call->queue);
  call->queue = NULL;
}
96 
97 /* Destroys a Call. */
grpc_rb_call_destroy(void * p)98 static void grpc_rb_call_destroy(void* p) {
99   if (p == NULL) {
100     return;
101   }
102   destroy_call((grpc_rb_call*)p);
103   xfree(p);
104 }
105 
106 const rb_data_type_t grpc_rb_md_ary_data_type = {
107     "grpc_metadata_array",
108     {GRPC_RB_GC_NOT_MARKED,
109      GRPC_RB_GC_DONT_FREE,
110      GRPC_RB_MEMSIZE_UNAVAILABLE,
111      {NULL, NULL}},
112     NULL,
113     NULL,
114 #ifdef RUBY_TYPED_FREE_IMMEDIATELY
115     /* it is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because
116      * grpc_rb_call_destroy
117      * touches a hash object.
118      * TODO(yugui) Directly use st_table and call the free function earlier?
119      */
120     0,
121 #endif
122 };
123 
124 /* Describes grpc_call struct for RTypedData */
125 static const rb_data_type_t grpc_call_data_type = {"grpc_call",
126                                                    {GRPC_RB_GC_NOT_MARKED,
127                                                     grpc_rb_call_destroy,
128                                                     GRPC_RB_MEMSIZE_UNAVAILABLE,
129                                                     {NULL, NULL}},
130                                                    NULL,
131                                                    NULL,
132 #ifdef RUBY_TYPED_FREE_IMMEDIATELY
133                                                    RUBY_TYPED_FREE_IMMEDIATELY
134 #endif
135 };
136 
137 /* Error code details is a hash containing text strings describing errors */
138 VALUE rb_error_code_details;
139 
140 /* Obtains the error detail string for given error code */
grpc_call_error_detail_of(grpc_call_error err)141 const char* grpc_call_error_detail_of(grpc_call_error err) {
142   VALUE detail_ref = rb_hash_aref(rb_error_code_details, UINT2NUM(err));
143   const char* detail = "unknown error code!";
144   if (detail_ref != Qnil) {
145     detail = StringValueCStr(detail_ref);
146   }
147   return detail;
148 }
149 
150 /* Called by clients to cancel an RPC on the server.
151    Can be called multiple times, from any thread. */
grpc_rb_call_cancel(VALUE self)152 static VALUE grpc_rb_call_cancel(VALUE self) {
153   grpc_rb_call* call = NULL;
154   grpc_call_error err;
155   if (RTYPEDDATA_DATA(self) == NULL) {
156     // This call has been closed
157     return Qnil;
158   }
159 
160   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
161   err = grpc_call_cancel(call->wrapped, NULL);
162   if (err != GRPC_CALL_OK) {
163     rb_raise(grpc_rb_eCallError, "cancel failed: %s (code=%d)",
164              grpc_call_error_detail_of(err), err);
165   }
166 
167   return Qnil;
168 }
169 
170 /* TODO: expose this as part of the surface API if needed.
171  * This is meant for internal usage by the "write thread" of grpc-ruby
172  * client-side bidi calls. It provides a way for the background write-thread
173  * to propagate failures to the main read-thread and give the user an error
174  * message. */
grpc_rb_call_cancel_with_status(VALUE self,VALUE status_code,VALUE details)175 static VALUE grpc_rb_call_cancel_with_status(VALUE self, VALUE status_code,
176                                              VALUE details) {
177   grpc_rb_call* call = NULL;
178   grpc_call_error err;
179   if (RTYPEDDATA_DATA(self) == NULL) {
180     // This call has been closed
181     return Qnil;
182   }
183 
184   if (TYPE(details) != T_STRING || TYPE(status_code) != T_FIXNUM) {
185     rb_raise(rb_eTypeError,
186              "Bad parameter type error for cancel with status. Want Fixnum, "
187              "String.");
188     return Qnil;
189   }
190 
191   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
192   err = grpc_call_cancel_with_status(call->wrapped, NUM2LONG(status_code),
193                                      StringValueCStr(details), NULL);
194   if (err != GRPC_CALL_OK) {
195     rb_raise(grpc_rb_eCallError, "cancel with status failed: %s (code=%d)",
196              grpc_call_error_detail_of(err), err);
197   }
198 
199   return Qnil;
200 }
201 
202 /* Releases the c-level resources associated with a call
203    Once a call has been closed, no further requests can be
204    processed.
205 */
grpc_rb_call_close(VALUE self)206 static VALUE grpc_rb_call_close(VALUE self) {
207   grpc_rb_call* call = NULL;
208   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
209   if (call != NULL) {
210     destroy_call(call);
211     xfree(RTYPEDDATA_DATA(self));
212     RTYPEDDATA_DATA(self) = NULL;
213   }
214   return Qnil;
215 }
216 
217 /* Called to obtain the peer that this call is connected to. */
grpc_rb_call_get_peer(VALUE self)218 static VALUE grpc_rb_call_get_peer(VALUE self) {
219   VALUE res = Qnil;
220   grpc_rb_call* call = NULL;
221   char* peer = NULL;
222   if (RTYPEDDATA_DATA(self) == NULL) {
223     rb_raise(grpc_rb_eCallError, "Cannot get peer value on closed call");
224     return Qnil;
225   }
226   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
227   peer = grpc_call_get_peer(call->wrapped);
228   res = rb_str_new2(peer);
229   gpr_free(peer);
230 
231   return res;
232 }
233 
234 /* Called to obtain the x509 cert of an authenticated peer. */
grpc_rb_call_get_peer_cert(VALUE self)235 static VALUE grpc_rb_call_get_peer_cert(VALUE self) {
236   grpc_rb_call* call = NULL;
237   VALUE res = Qnil;
238   grpc_auth_context* ctx = NULL;
239   if (RTYPEDDATA_DATA(self) == NULL) {
240     rb_raise(grpc_rb_eCallError, "Cannot get peer cert on closed call");
241     return Qnil;
242   }
243   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
244 
245   ctx = grpc_call_auth_context(call->wrapped);
246 
247   if (!ctx || !grpc_auth_context_peer_is_authenticated(ctx)) {
248     return Qnil;
249   }
250 
251   {
252     grpc_auth_property_iterator it = grpc_auth_context_find_properties_by_name(
253         ctx, GRPC_X509_PEM_CERT_PROPERTY_NAME);
254     const grpc_auth_property* prop = grpc_auth_property_iterator_next(&it);
255     if (prop == NULL) {
256       return Qnil;
257     }
258 
259     res = rb_str_new2(prop->value);
260   }
261 
262   grpc_auth_context_release(ctx);
263 
264   return res;
265 }
266 
267 /*
268   call-seq:
269   status = call.status
270 
271   Gets the status object saved the call.  */
grpc_rb_call_get_status(VALUE self)272 static VALUE grpc_rb_call_get_status(VALUE self) {
273   return rb_ivar_get(self, id_status);
274 }
275 
276 /*
277   call-seq:
278   call.status = status
279 
280   Saves a status object on the call.  */
grpc_rb_call_set_status(VALUE self,VALUE status)281 static VALUE grpc_rb_call_set_status(VALUE self, VALUE status) {
282   if (!NIL_P(status) && rb_obj_class(status) != grpc_rb_sStatus) {
283     rb_raise(rb_eTypeError, "bad status: got:<%s> want: <Struct::Status>",
284              rb_obj_classname(status));
285     return Qnil;
286   }
287 
288   return rb_ivar_set(self, id_status, status);
289 }
290 
291 /*
292   call-seq:
293   metadata = call.metadata
294 
295   Gets the metadata object saved the call.  */
grpc_rb_call_get_metadata(VALUE self)296 static VALUE grpc_rb_call_get_metadata(VALUE self) {
297   return rb_ivar_get(self, id_metadata);
298 }
299 
300 /*
301   call-seq:
302   call.metadata = metadata
303 
304   Saves the metadata hash on the call.  */
grpc_rb_call_set_metadata(VALUE self,VALUE metadata)305 static VALUE grpc_rb_call_set_metadata(VALUE self, VALUE metadata) {
306   if (!NIL_P(metadata) && TYPE(metadata) != T_HASH) {
307     rb_raise(rb_eTypeError, "bad metadata: got:<%s> want: <Hash>",
308              rb_obj_classname(metadata));
309     return Qnil;
310   }
311 
312   return rb_ivar_set(self, id_metadata, metadata);
313 }
314 
315 /*
316   call-seq:
317   trailing_metadata = call.trailing_metadata
318 
319   Gets the trailing metadata object saved on the call */
grpc_rb_call_get_trailing_metadata(VALUE self)320 static VALUE grpc_rb_call_get_trailing_metadata(VALUE self) {
321   return rb_ivar_get(self, id_trailing_metadata);
322 }
323 
324 /*
325   call-seq:
326   call.trailing_metadata = trailing_metadata
327 
328   Saves the trailing metadata hash on the call. */
grpc_rb_call_set_trailing_metadata(VALUE self,VALUE metadata)329 static VALUE grpc_rb_call_set_trailing_metadata(VALUE self, VALUE metadata) {
330   if (!NIL_P(metadata) && TYPE(metadata) != T_HASH) {
331     rb_raise(rb_eTypeError, "bad metadata: got:<%s> want: <Hash>",
332              rb_obj_classname(metadata));
333     return Qnil;
334   }
335 
336   return rb_ivar_set(self, id_trailing_metadata, metadata);
337 }
338 
339 /*
340   call-seq:
341   write_flag = call.write_flag
342 
343   Gets the write_flag value saved the call.  */
grpc_rb_call_get_write_flag(VALUE self)344 static VALUE grpc_rb_call_get_write_flag(VALUE self) {
345   return rb_ivar_get(self, id_write_flag);
346 }
347 
348 /*
349   call-seq:
350   call.write_flag = write_flag
351 
352   Saves the write_flag on the call.  */
grpc_rb_call_set_write_flag(VALUE self,VALUE write_flag)353 static VALUE grpc_rb_call_set_write_flag(VALUE self, VALUE write_flag) {
354   if (!NIL_P(write_flag) && TYPE(write_flag) != T_FIXNUM) {
355     rb_raise(rb_eTypeError, "bad write_flag: got:<%s> want: <Fixnum>",
356              rb_obj_classname(write_flag));
357     return Qnil;
358   }
359 
360   return rb_ivar_set(self, id_write_flag, write_flag);
361 }
362 
363 /*
364   call-seq:
365   call.set_credentials call_credentials
366 
367   Sets credentials on a call */
grpc_rb_call_set_credentials(VALUE self,VALUE credentials)368 static VALUE grpc_rb_call_set_credentials(VALUE self, VALUE credentials) {
369   grpc_rb_call* call = NULL;
370   grpc_call_credentials* creds;
371   grpc_call_error err;
372   if (RTYPEDDATA_DATA(self) == NULL) {
373     rb_raise(grpc_rb_eCallError, "Cannot set credentials of closed call");
374     return Qnil;
375   }
376   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
377   creds = grpc_rb_get_wrapped_call_credentials(credentials);
378   err = grpc_call_set_credentials(call->wrapped, creds);
379   if (err != GRPC_CALL_OK) {
380     rb_raise(grpc_rb_eCallError,
381              "grpc_call_set_credentials failed with %s (code=%d)",
382              grpc_call_error_detail_of(err), err);
383   }
384   /* We need the credentials to be alive for as long as the call is alive,
385      but we don't care about destruction order. */
386   rb_ivar_set(self, id_credentials, credentials);
387   return Qnil;
388 }
389 
390 /* grpc_rb_md_ary_fill_hash_cb is the hash iteration callback used
391    to fill grpc_metadata_array.
392 
393    it's capacity should have been computed via a prior call to
394    grpc_rb_md_ary_capacity_hash_cb
395 */
grpc_rb_md_ary_fill_hash_cb(VALUE key,VALUE val,VALUE md_ary_obj)396 static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
397   grpc_metadata_array* md_ary = NULL;
398   long array_length;
399   long i;
400   grpc_slice key_slice;
401   grpc_slice value_slice;
402   char* tmp_str = NULL;
403 
404   if (TYPE(key) == T_SYMBOL) {
405     key_slice = grpc_slice_from_static_string(rb_id2name(SYM2ID(key)));
406   } else if (TYPE(key) == T_STRING) {
407     key_slice =
408         grpc_slice_from_copied_buffer(RSTRING_PTR(key), RSTRING_LEN(key));
409   } else {
410     rb_raise(rb_eTypeError,
411              "grpc_rb_md_ary_fill_hash_cb: bad type for key parameter");
412     return ST_STOP;
413   }
414 
415   if (!grpc_header_key_is_legal(key_slice)) {
416     tmp_str = grpc_slice_to_c_string(key_slice);
417     rb_raise(rb_eArgError,
418              "'%s' is an invalid header key, must match [a-z0-9-_.]+", tmp_str);
419     return ST_STOP;
420   }
421 
422   /* Construct a metadata object from key and value and add it */
423   TypedData_Get_Struct(md_ary_obj, grpc_metadata_array,
424                        &grpc_rb_md_ary_data_type, md_ary);
425 
426   if (TYPE(val) == T_ARRAY) {
427     array_length = RARRAY_LEN(val);
428     /* If the value is an array, add capacity for each value in the array */
429     for (i = 0; i < array_length; i++) {
430       value_slice = grpc_slice_from_copied_buffer(
431           RSTRING_PTR(rb_ary_entry(val, i)), RSTRING_LEN(rb_ary_entry(val, i)));
432       if (!grpc_is_binary_header(key_slice) &&
433           !grpc_header_nonbin_value_is_legal(value_slice)) {
434         // The value has invalid characters
435         tmp_str = grpc_slice_to_c_string(value_slice);
436         rb_raise(rb_eArgError, "Header value '%s' has invalid characters",
437                  tmp_str);
438         return ST_STOP;
439       }
440       GPR_ASSERT(md_ary->count < md_ary->capacity);
441       md_ary->metadata[md_ary->count].key = key_slice;
442       md_ary->metadata[md_ary->count].value = value_slice;
443       md_ary->count += 1;
444     }
445   } else if (TYPE(val) == T_STRING) {
446     value_slice =
447         grpc_slice_from_copied_buffer(RSTRING_PTR(val), RSTRING_LEN(val));
448     if (!grpc_is_binary_header(key_slice) &&
449         !grpc_header_nonbin_value_is_legal(value_slice)) {
450       // The value has invalid characters
451       tmp_str = grpc_slice_to_c_string(value_slice);
452       rb_raise(rb_eArgError, "Header value '%s' has invalid characters",
453                tmp_str);
454       return ST_STOP;
455     }
456     GPR_ASSERT(md_ary->count < md_ary->capacity);
457     md_ary->metadata[md_ary->count].key = key_slice;
458     md_ary->metadata[md_ary->count].value = value_slice;
459     md_ary->count += 1;
460   } else {
461     rb_raise(rb_eArgError, "Header values must be of type string or array");
462     return ST_STOP;
463   }
464   return ST_CONTINUE;
465 }
466 
467 /* grpc_rb_md_ary_capacity_hash_cb is the hash iteration callback used
468    to pre-compute the capacity a grpc_metadata_array.
469 */
grpc_rb_md_ary_capacity_hash_cb(VALUE key,VALUE val,VALUE md_ary_obj)470 static int grpc_rb_md_ary_capacity_hash_cb(VALUE key, VALUE val,
471                                            VALUE md_ary_obj) {
472   grpc_metadata_array* md_ary = NULL;
473 
474   (void)key;
475 
476   /* Construct a metadata object from key and value and add it */
477   TypedData_Get_Struct(md_ary_obj, grpc_metadata_array,
478                        &grpc_rb_md_ary_data_type, md_ary);
479 
480   if (TYPE(val) == T_ARRAY) {
481     /* If the value is an array, add capacity for each value in the array */
482     md_ary->capacity += RARRAY_LEN(val);
483   } else {
484     md_ary->capacity += 1;
485   }
486 
487   return ST_CONTINUE;
488 }
489 
490 /* grpc_rb_md_ary_convert converts a ruby metadata hash into
491    a grpc_metadata_array.
492    Note that this function may throw exceptions.
493 */
grpc_rb_md_ary_convert(VALUE md_ary_hash,grpc_metadata_array * md_ary)494 void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array* md_ary) {
495   VALUE md_ary_obj = Qnil;
496   if (md_ary_hash == Qnil) {
497     return; /* Do nothing if the expected has value is nil */
498   }
499   if (TYPE(md_ary_hash) != T_HASH) {
500     rb_raise(rb_eTypeError, "md_ary_convert: got <%s>, want <Hash>",
501              rb_obj_classname(md_ary_hash));
502     return;
503   }
504 
505   /* Initialize the array, compute it's capacity, then fill it. */
506   grpc_metadata_array_init(md_ary);
507   md_ary_obj =
508       TypedData_Wrap_Struct(grpc_rb_cMdAry, &grpc_rb_md_ary_data_type, md_ary);
509   rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_capacity_hash_cb, md_ary_obj);
510   md_ary->metadata = gpr_zalloc(md_ary->capacity * sizeof(grpc_metadata));
511   rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_fill_hash_cb, md_ary_obj);
512 }
513 
514 /* Converts a metadata array to a hash. */
grpc_rb_md_ary_to_h(grpc_metadata_array * md_ary)515 VALUE grpc_rb_md_ary_to_h(grpc_metadata_array* md_ary) {
516   VALUE key = Qnil;
517   VALUE new_ary = Qnil;
518   VALUE value = Qnil;
519   VALUE result = rb_hash_new();
520   size_t i;
521 
522   for (i = 0; i < md_ary->count; i++) {
523     key = grpc_rb_slice_to_ruby_string(md_ary->metadata[i].key);
524     value = rb_hash_aref(result, key);
525     if (value == Qnil) {
526       value = grpc_rb_slice_to_ruby_string(md_ary->metadata[i].value);
527       rb_hash_aset(result, key, value);
528     } else if (TYPE(value) == T_ARRAY) {
529       /* Add the string to the returned array */
530       rb_ary_push(value,
531                   grpc_rb_slice_to_ruby_string(md_ary->metadata[i].value));
532     } else {
533       /* Add the current value with this key and the new one to an array */
534       new_ary = rb_ary_new();
535       rb_ary_push(new_ary, value);
536       rb_ary_push(new_ary,
537                   grpc_rb_slice_to_ruby_string(md_ary->metadata[i].value));
538       rb_hash_aset(result, key, new_ary);
539     }
540   }
541   return result;
542 }
543 
544 /* grpc_rb_call_check_op_keys_hash_cb is a hash iteration func that checks
545    each key of an ops hash is valid.
546 */
grpc_rb_call_check_op_keys_hash_cb(VALUE key,VALUE val,VALUE ops_ary)547 static int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val,
548                                               VALUE ops_ary) {
549   (void)val;
550   /* Update the capacity; the value is an array, add capacity for each value in
551    * the array */
552   if (TYPE(key) != T_FIXNUM) {
553     rb_raise(rb_eTypeError, "invalid operation : got <%s>, want <Fixnum>",
554              rb_obj_classname(key));
555     return ST_STOP;
556   }
557   switch (NUM2INT(key)) {
558     case GRPC_OP_SEND_INITIAL_METADATA:
559     case GRPC_OP_SEND_MESSAGE:
560     case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
561     case GRPC_OP_SEND_STATUS_FROM_SERVER:
562     case GRPC_OP_RECV_INITIAL_METADATA:
563     case GRPC_OP_RECV_MESSAGE:
564     case GRPC_OP_RECV_STATUS_ON_CLIENT:
565     case GRPC_OP_RECV_CLOSE_ON_SERVER:
566       rb_ary_push(ops_ary, key);
567       return ST_CONTINUE;
568     default:
569       rb_raise(rb_eTypeError, "invalid operation : bad value %d", NUM2INT(key));
570   };
571   return ST_STOP;
572 }
573 
574 /* grpc_rb_op_update_status_from_server adds the values in a ruby status
575    struct to the 'send_status_from_server' portion of an op.
576 */
grpc_rb_op_update_status_from_server(grpc_op * op,grpc_metadata_array * md_ary,grpc_slice * send_status_details,VALUE status)577 static void grpc_rb_op_update_status_from_server(
578     grpc_op* op, grpc_metadata_array* md_ary, grpc_slice* send_status_details,
579     VALUE status) {
580   VALUE code = rb_struct_aref(status, sym_code);
581   VALUE details = rb_struct_aref(status, sym_details);
582   VALUE metadata_hash = rb_struct_aref(status, sym_metadata);
583 
584   /* TODO: add check to ensure status is the correct struct type */
585   if (TYPE(code) != T_FIXNUM) {
586     rb_raise(rb_eTypeError, "invalid code : got <%s>, want <Fixnum>",
587              rb_obj_classname(code));
588     return;
589   }
590   if (TYPE(details) != T_STRING) {
591     rb_raise(rb_eTypeError, "invalid details : got <%s>, want <String>",
592              rb_obj_classname(code));
593     return;
594   }
595 
596   *send_status_details =
597       grpc_slice_from_copied_buffer(RSTRING_PTR(details), RSTRING_LEN(details));
598 
599   op->data.send_status_from_server.status = NUM2INT(code);
600   op->data.send_status_from_server.status_details = send_status_details;
601   grpc_rb_md_ary_convert(metadata_hash, md_ary);
602   op->data.send_status_from_server.trailing_metadata_count = md_ary->count;
603   op->data.send_status_from_server.trailing_metadata = md_ary->metadata;
604 }
605 
606 /* run_batch_stack holds various values used by the
607  * grpc_rb_call_run_batch function */
608 typedef struct run_batch_stack {
609   /* The batch ops */
610   grpc_op ops[8]; /* 8 is the maximum number of operations */
611   size_t op_num;  /* tracks the last added operation */
612 
613   /* Data being sent */
614   grpc_metadata_array send_metadata;
615   grpc_metadata_array send_trailing_metadata;
616 
617   /* Data being received */
618   grpc_byte_buffer* recv_message;
619   grpc_metadata_array recv_metadata;
620   grpc_metadata_array recv_trailing_metadata;
621   int recv_cancelled;
622   grpc_status_code recv_status;
623   grpc_slice recv_status_details;
624   const char* recv_status_debug_error_string;
625   unsigned write_flag;
626   grpc_slice send_status_details;
627 } run_batch_stack;
628 
629 /* grpc_run_batch_stack_init ensures the run_batch_stack is properly
630  * initialized */
grpc_run_batch_stack_init(run_batch_stack * st,unsigned write_flag)631 static void grpc_run_batch_stack_init(run_batch_stack* st,
632                                       unsigned write_flag) {
633   MEMZERO(st, run_batch_stack, 1);
634   grpc_metadata_array_init(&st->send_metadata);
635   grpc_metadata_array_init(&st->send_trailing_metadata);
636   grpc_metadata_array_init(&st->recv_metadata);
637   grpc_metadata_array_init(&st->recv_trailing_metadata);
638   st->op_num = 0;
639   st->write_flag = write_flag;
640 }
641 
/* Unrefs the key and value slices of every entry of `array`, then destroys
   the array itself. An array without entry storage is only destroyed. */
void grpc_rb_metadata_array_destroy_including_entries(
    grpc_metadata_array* array) {
  if (array->metadata != NULL) {
    grpc_metadata* entry = array->metadata;
    grpc_metadata* const end = entry + array->count;
    for (; entry != end; entry++) {
      grpc_slice_unref(entry->key);
      grpc_slice_unref(entry->value);
    }
  }
  grpc_metadata_array_destroy(array);
}
653 
654 /* grpc_run_batch_stack_cleanup ensures the run_batch_stack is properly
655  * cleaned up */
grpc_run_batch_stack_cleanup(run_batch_stack * st)656 static void grpc_run_batch_stack_cleanup(run_batch_stack* st) {
657   size_t i = 0;
658 
659   grpc_rb_metadata_array_destroy_including_entries(&st->send_metadata);
660   grpc_rb_metadata_array_destroy_including_entries(&st->send_trailing_metadata);
661   grpc_metadata_array_destroy(&st->recv_metadata);
662   grpc_metadata_array_destroy(&st->recv_trailing_metadata);
663 
664   if (GRPC_SLICE_START_PTR(st->send_status_details) != NULL) {
665     grpc_slice_unref(st->send_status_details);
666   }
667 
668   if (GRPC_SLICE_START_PTR(st->recv_status_details) != NULL) {
669     grpc_slice_unref(st->recv_status_details);
670   }
671 
672   if (st->recv_message != NULL) {
673     grpc_byte_buffer_destroy(st->recv_message);
674   }
675 
676   for (i = 0; i < st->op_num; i++) {
677     if (st->ops[i].op == GRPC_OP_SEND_MESSAGE) {
678       grpc_byte_buffer_destroy(st->ops[i].data.send_message.send_message);
679     }
680   }
681 }
682 
683 /* grpc_run_batch_stack_fill_ops fills the run_batch_stack ops array from
684  * ops_hash */
grpc_run_batch_stack_fill_ops(run_batch_stack * st,VALUE ops_hash)685 static void grpc_run_batch_stack_fill_ops(run_batch_stack* st, VALUE ops_hash) {
686   VALUE this_op = Qnil;
687   VALUE this_value = Qnil;
688   VALUE ops_ary = rb_ary_new();
689   size_t i = 0;
690 
691   /* Create a ruby array with just the operation keys */
692   rb_hash_foreach(ops_hash, grpc_rb_call_check_op_keys_hash_cb, ops_ary);
693 
694   /* Fill the ops array */
695   for (i = 0; i < (size_t)RARRAY_LEN(ops_ary); i++) {
696     this_op = rb_ary_entry(ops_ary, i);
697     this_value = rb_hash_aref(ops_hash, this_op);
698     st->ops[st->op_num].flags = 0;
699     switch (NUM2INT(this_op)) {
700       case GRPC_OP_SEND_INITIAL_METADATA:
701         grpc_rb_md_ary_convert(this_value, &st->send_metadata);
702         st->ops[st->op_num].data.send_initial_metadata.count =
703             st->send_metadata.count;
704         st->ops[st->op_num].data.send_initial_metadata.metadata =
705             st->send_metadata.metadata;
706         break;
707       case GRPC_OP_SEND_MESSAGE:
708         st->ops[st->op_num].data.send_message.send_message =
709             grpc_rb_s_to_byte_buffer(RSTRING_PTR(this_value),
710                                      RSTRING_LEN(this_value));
711         st->ops[st->op_num].flags = st->write_flag;
712         break;
713       case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
714         break;
715       case GRPC_OP_SEND_STATUS_FROM_SERVER:
716         grpc_rb_op_update_status_from_server(
717             &st->ops[st->op_num], &st->send_trailing_metadata,
718             &st->send_status_details, this_value);
719         break;
720       case GRPC_OP_RECV_INITIAL_METADATA:
721         st->ops[st->op_num].data.recv_initial_metadata.recv_initial_metadata =
722             &st->recv_metadata;
723         break;
724       case GRPC_OP_RECV_MESSAGE:
725         st->ops[st->op_num].data.recv_message.recv_message = &st->recv_message;
726         break;
727       case GRPC_OP_RECV_STATUS_ON_CLIENT:
728         st->ops[st->op_num].data.recv_status_on_client.trailing_metadata =
729             &st->recv_trailing_metadata;
730         st->ops[st->op_num].data.recv_status_on_client.status =
731             &st->recv_status;
732         st->ops[st->op_num].data.recv_status_on_client.status_details =
733             &st->recv_status_details;
734         st->ops[st->op_num].data.recv_status_on_client.error_string =
735             &st->recv_status_debug_error_string;
736         break;
737       case GRPC_OP_RECV_CLOSE_ON_SERVER:
738         st->ops[st->op_num].data.recv_close_on_server.cancelled =
739             &st->recv_cancelled;
740         break;
741       default:
742         grpc_run_batch_stack_cleanup(st);
743         rb_raise(rb_eTypeError, "invalid operation : bad value %d",
744                  NUM2INT(this_op));
745     };
746     st->ops[st->op_num].op = (grpc_op_type)NUM2INT(this_op);
747     st->ops[st->op_num].reserved = NULL;
748     st->op_num++;
749   }
750 }
751 
752 /* grpc_run_batch_stack_build_result fills constructs a ruby BatchResult struct
753    after the results have run */
grpc_run_batch_stack_build_result(run_batch_stack * st)754 static VALUE grpc_run_batch_stack_build_result(run_batch_stack* st) {
755   size_t i = 0;
756   VALUE result = rb_struct_new(grpc_rb_sBatchResult, Qnil, Qnil, Qnil, Qnil,
757                                Qnil, Qnil, Qnil, Qnil, NULL);
758   for (i = 0; i < st->op_num; i++) {
759     switch (st->ops[i].op) {
760       case GRPC_OP_SEND_INITIAL_METADATA:
761         rb_struct_aset(result, sym_send_metadata, Qtrue);
762         break;
763       case GRPC_OP_SEND_MESSAGE:
764         rb_struct_aset(result, sym_send_message, Qtrue);
765         break;
766       case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
767         rb_struct_aset(result, sym_send_close, Qtrue);
768         break;
769       case GRPC_OP_SEND_STATUS_FROM_SERVER:
770         rb_struct_aset(result, sym_send_status, Qtrue);
771         break;
772       case GRPC_OP_RECV_INITIAL_METADATA:
773         rb_struct_aset(result, sym_metadata,
774                        grpc_rb_md_ary_to_h(&st->recv_metadata));
775       case GRPC_OP_RECV_MESSAGE:
776         rb_struct_aset(result, sym_message,
777                        grpc_rb_byte_buffer_to_s(st->recv_message));
778         break;
779       case GRPC_OP_RECV_STATUS_ON_CLIENT:
780         rb_struct_aset(
781             result, sym_status,
782             rb_struct_new(
783                 grpc_rb_sStatus, UINT2NUM(st->recv_status),
784                 (GRPC_SLICE_START_PTR(st->recv_status_details) == NULL
785                      ? Qnil
786                      : grpc_rb_slice_to_ruby_string(st->recv_status_details)),
787                 grpc_rb_md_ary_to_h(&st->recv_trailing_metadata),
788                 st->recv_status_debug_error_string == NULL
789                     ? Qnil
790                     : rb_str_new_cstr(st->recv_status_debug_error_string),
791                 NULL));
792         gpr_free((void*)st->recv_status_debug_error_string);
793         break;
794       case GRPC_OP_RECV_CLOSE_ON_SERVER:
795         rb_struct_aset(result, sym_send_close, Qtrue);
796         break;
797       default:
798         break;
799     }
800   }
801   return result;
802 }
803 
804 /* call-seq:
805    ops = {
806      GRPC::Core::CallOps::SEND_INITIAL_METADATA => <op_value>,
807      GRPC::Core::CallOps::SEND_MESSAGE => <op_value>,
808      ...
809    }
810    tag = Object.new
811    timeout = 10
812    call.start_batch(tag, timeout, ops)
813 
814    Start a batch of operations defined in the array ops; when complete, post a
815    completion of type 'tag' to the completion queue bound to the call.
816 
817    Also waits for the batch to complete, until timeout is reached.
818    The order of ops specified in the batch has no significance.
819    Only one operation of each type can be active at once in any given
820    batch */
grpc_rb_call_run_batch(VALUE self,VALUE ops_hash)821 static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) {
822   run_batch_stack* st = NULL;
823   grpc_rb_call* call = NULL;
824   grpc_event ev;
825   grpc_call_error err;
826   VALUE result = Qnil;
827   VALUE rb_write_flag = rb_ivar_get(self, id_write_flag);
828   unsigned write_flag = 0;
829   void* tag = (void*)&st;
830 
831   grpc_ruby_fork_guard();
832   if (RTYPEDDATA_DATA(self) == NULL) {
833     rb_raise(grpc_rb_eCallError, "Cannot run batch on closed call");
834     return Qnil;
835   }
836   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
837 
838   /* Validate the ops args, adding them to a ruby array */
839   if (TYPE(ops_hash) != T_HASH) {
840     rb_raise(rb_eTypeError, "call#run_batch: ops hash should be a hash");
841     return Qnil;
842   }
843   if (rb_write_flag != Qnil) {
844     write_flag = NUM2UINT(rb_write_flag);
845   }
846   st = gpr_malloc(sizeof(run_batch_stack));
847   grpc_run_batch_stack_init(st, write_flag);
848   grpc_run_batch_stack_fill_ops(st, ops_hash);
849 
850   /* call grpc_call_start_batch, then wait for it to complete using
851    * pluck_event */
852   err = grpc_call_start_batch(call->wrapped, st->ops, st->op_num, tag, NULL);
853   if (err != GRPC_CALL_OK) {
854     grpc_run_batch_stack_cleanup(st);
855     gpr_free(st);
856     rb_raise(grpc_rb_eCallError,
857              "grpc_call_start_batch failed with %s (code=%d)",
858              grpc_call_error_detail_of(err), err);
859     return Qnil;
860   }
861   ev = rb_completion_queue_pluck(call->queue, tag,
862                                  gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
863   if (!ev.success) {
864     rb_raise(grpc_rb_eCallError, "call#run_batch failed somehow");
865   }
866   /* Build and return the BatchResult struct result,
867      if there is an error, it's reflected in the status */
868   result = grpc_run_batch_stack_build_result(st);
869   grpc_run_batch_stack_cleanup(st);
870   gpr_free(st);
871   return result;
872 }
873 
Init_grpc_write_flags()874 static void Init_grpc_write_flags() {
875   /* Constants representing the write flags in grpc.h */
876   VALUE grpc_rb_mWriteFlags =
877       rb_define_module_under(grpc_rb_mGrpcCore, "WriteFlags");
878   rb_define_const(grpc_rb_mWriteFlags, "BUFFER_HINT",
879                   UINT2NUM(GRPC_WRITE_BUFFER_HINT));
880   rb_define_const(grpc_rb_mWriteFlags, "NO_COMPRESS",
881                   UINT2NUM(GRPC_WRITE_NO_COMPRESS));
882 }
883 
Init_grpc_error_codes()884 static void Init_grpc_error_codes() {
885   /* Constants representing the error codes of grpc_call_error in grpc.h */
886   VALUE grpc_rb_mRpcErrors =
887       rb_define_module_under(grpc_rb_mGrpcCore, "RpcErrors");
888   rb_define_const(grpc_rb_mRpcErrors, "OK", UINT2NUM(GRPC_CALL_OK));
889   rb_define_const(grpc_rb_mRpcErrors, "ERROR", UINT2NUM(GRPC_CALL_ERROR));
890   rb_define_const(grpc_rb_mRpcErrors, "NOT_ON_SERVER",
891                   UINT2NUM(GRPC_CALL_ERROR_NOT_ON_SERVER));
892   rb_define_const(grpc_rb_mRpcErrors, "NOT_ON_CLIENT",
893                   UINT2NUM(GRPC_CALL_ERROR_NOT_ON_CLIENT));
894   rb_define_const(grpc_rb_mRpcErrors, "ALREADY_ACCEPTED",
895                   UINT2NUM(GRPC_CALL_ERROR_ALREADY_ACCEPTED));
896   rb_define_const(grpc_rb_mRpcErrors, "ALREADY_INVOKED",
897                   UINT2NUM(GRPC_CALL_ERROR_ALREADY_INVOKED));
898   rb_define_const(grpc_rb_mRpcErrors, "NOT_INVOKED",
899                   UINT2NUM(GRPC_CALL_ERROR_NOT_INVOKED));
900   rb_define_const(grpc_rb_mRpcErrors, "ALREADY_FINISHED",
901                   UINT2NUM(GRPC_CALL_ERROR_ALREADY_FINISHED));
902   rb_define_const(grpc_rb_mRpcErrors, "TOO_MANY_OPERATIONS",
903                   UINT2NUM(GRPC_CALL_ERROR_TOO_MANY_OPERATIONS));
904   rb_define_const(grpc_rb_mRpcErrors, "INVALID_FLAGS",
905                   UINT2NUM(GRPC_CALL_ERROR_INVALID_FLAGS));
906 
907   /* Hint the GC that this is a global and shouldn't be sweeped. */
908   rb_global_variable(&rb_error_code_details);
909 
910   /* Add the detail strings to a Hash */
911   rb_error_code_details = rb_hash_new();
912   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_OK),
913                rb_str_new2("ok"));
914   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR),
915                rb_str_new2("unknown error"));
916   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_NOT_ON_SERVER),
917                rb_str_new2("not available on a server"));
918   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_NOT_ON_CLIENT),
919                rb_str_new2("not available on a client"));
920   rb_hash_aset(rb_error_code_details,
921                UINT2NUM(GRPC_CALL_ERROR_ALREADY_ACCEPTED),
922                rb_str_new2("call is already accepted"));
923   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_ALREADY_INVOKED),
924                rb_str_new2("call is already invoked"));
925   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_NOT_INVOKED),
926                rb_str_new2("call is not yet invoked"));
927   rb_hash_aset(rb_error_code_details,
928                UINT2NUM(GRPC_CALL_ERROR_ALREADY_FINISHED),
929                rb_str_new2("call is already finished"));
930   rb_hash_aset(rb_error_code_details,
931                UINT2NUM(GRPC_CALL_ERROR_TOO_MANY_OPERATIONS),
932                rb_str_new2("outstanding read or write present"));
933   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_INVALID_FLAGS),
934                rb_str_new2("a bad flag was given"));
935   rb_define_const(grpc_rb_mRpcErrors, "ErrorMessages", rb_error_code_details);
936   rb_obj_freeze(rb_error_code_details);
937 }
938 
Init_grpc_op_codes()939 static void Init_grpc_op_codes() {
940   /* Constants representing operation type codes in grpc.h */
941   VALUE grpc_rb_mCallOps = rb_define_module_under(grpc_rb_mGrpcCore, "CallOps");
942   rb_define_const(grpc_rb_mCallOps, "SEND_INITIAL_METADATA",
943                   UINT2NUM(GRPC_OP_SEND_INITIAL_METADATA));
944   rb_define_const(grpc_rb_mCallOps, "SEND_MESSAGE",
945                   UINT2NUM(GRPC_OP_SEND_MESSAGE));
946   rb_define_const(grpc_rb_mCallOps, "SEND_CLOSE_FROM_CLIENT",
947                   UINT2NUM(GRPC_OP_SEND_CLOSE_FROM_CLIENT));
948   rb_define_const(grpc_rb_mCallOps, "SEND_STATUS_FROM_SERVER",
949                   UINT2NUM(GRPC_OP_SEND_STATUS_FROM_SERVER));
950   rb_define_const(grpc_rb_mCallOps, "RECV_INITIAL_METADATA",
951                   UINT2NUM(GRPC_OP_RECV_INITIAL_METADATA));
952   rb_define_const(grpc_rb_mCallOps, "RECV_MESSAGE",
953                   UINT2NUM(GRPC_OP_RECV_MESSAGE));
954   rb_define_const(grpc_rb_mCallOps, "RECV_STATUS_ON_CLIENT",
955                   UINT2NUM(GRPC_OP_RECV_STATUS_ON_CLIENT));
956   rb_define_const(grpc_rb_mCallOps, "RECV_CLOSE_ON_SERVER",
957                   UINT2NUM(GRPC_OP_RECV_CLOSE_ON_SERVER));
958 }
959 
Init_grpc_metadata_keys()960 static void Init_grpc_metadata_keys() {
961   VALUE grpc_rb_mMetadataKeys =
962       rb_define_module_under(grpc_rb_mGrpcCore, "MetadataKeys");
963   rb_define_const(grpc_rb_mMetadataKeys, "COMPRESSION_REQUEST_ALGORITHM",
964                   rb_str_new2(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY));
965 }
966 
Init_grpc_call()967 void Init_grpc_call() {
968   /* CallError inherits from Exception to signal that it is non-recoverable */
969   grpc_rb_eCallError =
970       rb_define_class_under(grpc_rb_mGrpcCore, "CallError", rb_eException);
971   grpc_rb_eOutOfTime =
972       rb_define_class_under(grpc_rb_mGrpcCore, "OutOfTime", rb_eException);
973   grpc_rb_cCall = rb_define_class_under(grpc_rb_mGrpcCore, "Call", rb_cObject);
974   grpc_rb_cMdAry =
975       rb_define_class_under(grpc_rb_mGrpcCore, "MetadataArray", rb_cObject);
976 
977   /* Prevent allocation or inialization of the Call class */
978   rb_define_alloc_func(grpc_rb_cCall, grpc_rb_cannot_alloc);
979   rb_define_method(grpc_rb_cCall, "initialize", grpc_rb_cannot_init, 0);
980   rb_define_method(grpc_rb_cCall, "initialize_copy", grpc_rb_cannot_init_copy,
981                    1);
982 
983   /* Add ruby analogues of the Call methods. */
984   rb_define_method(grpc_rb_cCall, "run_batch", grpc_rb_call_run_batch, 1);
985   rb_define_method(grpc_rb_cCall, "cancel", grpc_rb_call_cancel, 0);
986   rb_define_method(grpc_rb_cCall, "cancel_with_status",
987                    grpc_rb_call_cancel_with_status, 2);
988   rb_define_method(grpc_rb_cCall, "close", grpc_rb_call_close, 0);
989   rb_define_method(grpc_rb_cCall, "peer", grpc_rb_call_get_peer, 0);
990   rb_define_method(grpc_rb_cCall, "peer_cert", grpc_rb_call_get_peer_cert, 0);
991   rb_define_method(grpc_rb_cCall, "status", grpc_rb_call_get_status, 0);
992   rb_define_method(grpc_rb_cCall, "status=", grpc_rb_call_set_status, 1);
993   rb_define_method(grpc_rb_cCall, "metadata", grpc_rb_call_get_metadata, 0);
994   rb_define_method(grpc_rb_cCall, "metadata=", grpc_rb_call_set_metadata, 1);
995   rb_define_method(grpc_rb_cCall, "trailing_metadata",
996                    grpc_rb_call_get_trailing_metadata, 0);
997   rb_define_method(grpc_rb_cCall,
998                    "trailing_metadata=", grpc_rb_call_set_trailing_metadata, 1);
999   rb_define_method(grpc_rb_cCall, "write_flag", grpc_rb_call_get_write_flag, 0);
1000   rb_define_method(grpc_rb_cCall, "write_flag=", grpc_rb_call_set_write_flag,
1001                    1);
1002   rb_define_method(grpc_rb_cCall, "set_credentials!",
1003                    grpc_rb_call_set_credentials, 1);
1004 
1005   /* Ids used to support call attributes */
1006   id_metadata = rb_intern("metadata");
1007   id_trailing_metadata = rb_intern("trailing_metadata");
1008   id_status = rb_intern("status");
1009   id_write_flag = rb_intern("write_flag");
1010 
1011   /* Ids used by the c wrapping internals. */
1012   id_credentials = rb_intern("__credentials");
1013 
1014   /* Ids used in constructing the batch result. */
1015   sym_send_message = ID2SYM(rb_intern("send_message"));
1016   sym_send_metadata = ID2SYM(rb_intern("send_metadata"));
1017   sym_send_close = ID2SYM(rb_intern("send_close"));
1018   sym_send_status = ID2SYM(rb_intern("send_status"));
1019   sym_message = ID2SYM(rb_intern("message"));
1020   sym_status = ID2SYM(rb_intern("status"));
1021   sym_cancelled = ID2SYM(rb_intern("cancelled"));
1022 
1023   /* The Struct used to return the run_batch result. */
1024   grpc_rb_sBatchResult = rb_struct_define(
1025       "BatchResult", "send_message", "send_metadata", "send_close",
1026       "send_status", "message", "metadata", "status", "cancelled", NULL);
1027 
1028   Init_grpc_error_codes();
1029   Init_grpc_op_codes();
1030   Init_grpc_write_flags();
1031   Init_grpc_metadata_keys();
1032 }
1033 
1034 /* Gets the call from the ruby object */
grpc_rb_get_wrapped_call(VALUE v)1035 grpc_call* grpc_rb_get_wrapped_call(VALUE v) {
1036   grpc_rb_call* call = NULL;
1037   TypedData_Get_Struct(v, grpc_rb_call, &grpc_call_data_type, call);
1038   return call->wrapped;
1039 }
1040 
1041 /* Obtains the wrapped object for a given call */
grpc_rb_wrap_call(grpc_call * c,grpc_completion_queue * q)1042 VALUE grpc_rb_wrap_call(grpc_call* c, grpc_completion_queue* q) {
1043   grpc_rb_call* wrapper;
1044   if (c == NULL || q == NULL) {
1045     return Qnil;
1046   }
1047   wrapper = ALLOC(grpc_rb_call);
1048   wrapper->wrapped = c;
1049   wrapper->queue = q;
1050   return TypedData_Wrap_Struct(grpc_rb_cCall, &grpc_call_data_type, wrapper);
1051 }
1052