1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <ruby/ruby.h>
20 
21 #include "rb_call.h"
22 #include "rb_grpc_imports.generated.h"
23 
24 #include <grpc/grpc.h>
25 #include <grpc/impl/codegen/compression_types.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/log.h>
28 
29 #include "rb_byte_buffer.h"
30 #include "rb_call_credentials.h"
31 #include "rb_completion_queue.h"
32 #include "rb_grpc.h"
33 
34 /* grpc_rb_cCall is the Call class whose instances proxy grpc_call. */
35 static VALUE grpc_rb_cCall;
36 
37 /* grpc_rb_eCallError is the ruby class of the exception thrown during call
38    operations; */
39 VALUE grpc_rb_eCallError = Qnil;
40 
41 /* grpc_rb_eOutOfTime is the ruby class of the exception thrown to indicate
42    a timeout. */
43 static VALUE grpc_rb_eOutOfTime = Qnil;
44 
45 /* grpc_rb_sBatchResult is struct class used to hold the results of a batch
46  * call. */
47 static VALUE grpc_rb_sBatchResult;
48 
49 /* grpc_rb_cMdAry is the MetadataArray class whose instances proxy
50  * grpc_metadata_array. */
51 static VALUE grpc_rb_cMdAry;
52 
53 /* id_credentials is the name of the hidden ivar that preserves the value
54  * of the credentials added to the call */
55 static ID id_credentials;
56 
57 /* id_metadata is name of the attribute used to access the metadata hash
58  * received by the call and subsequently saved on it. */
59 static ID id_metadata;
60 
61 /* id_trailing_metadata is the name of the attribute used to access the trailing
62  * metadata hash received by the call and subsequently saved on it. */
63 static ID id_trailing_metadata;
64 
65 /* id_status is name of the attribute used to access the status object
66  * received by the call and subsequently saved on it. */
67 static ID id_status;
68 
69 /* id_write_flag is name of the attribute used to access the write_flag
70  * saved on the call. */
71 static ID id_write_flag;
72 
73 /* sym_* are the symbol for attributes of grpc_rb_sBatchResult. */
74 static VALUE sym_send_message;
75 static VALUE sym_send_metadata;
76 static VALUE sym_send_close;
77 static VALUE sym_send_status;
78 static VALUE sym_message;
79 static VALUE sym_status;
80 static VALUE sym_cancelled;
81 
82 typedef struct grpc_rb_call {
83   grpc_call* wrapped;
84   grpc_completion_queue* queue;
85 } grpc_rb_call;
86 
/* Releases the native resources of a grpc_rb_call: unrefs the wrapped call
 * and destroys its completion queue. Both pointers are cleared afterwards,
 * so calling this again on the same struct does nothing. */
static void destroy_call(grpc_rb_call* call) {
  if (call->wrapped == NULL) {
    /* Already destroyed; make sure we only do the work once. */
    return;
  }
  grpc_call_unref(call->wrapped);
  call->wrapped = NULL;
  grpc_rb_completion_queue_destroy(call->queue);
  call->queue = NULL;
}
96 
97 /* Destroys a Call. */
grpc_rb_call_destroy(void * p)98 static void grpc_rb_call_destroy(void* p) {
99   if (p == NULL) {
100     return;
101   }
102   destroy_call((grpc_rb_call*)p);
103   xfree(p);
104 }
105 
106 static const rb_data_type_t grpc_rb_md_ary_data_type = {
107     "grpc_metadata_array",
108     {GRPC_RB_GC_NOT_MARKED,
109      GRPC_RB_GC_DONT_FREE,
110      GRPC_RB_MEMSIZE_UNAVAILABLE,
111      {NULL, NULL}},
112     NULL,
113     NULL,
114 #ifdef RUBY_TYPED_FREE_IMMEDIATELY
115     /* it is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because
116      * grpc_rb_call_destroy
117      * touches a hash object.
118      * TODO(yugui) Directly use st_table and call the free function earlier?
119      */
120     0,
121 #endif
122 };
123 
124 /* Describes grpc_call struct for RTypedData */
125 static const rb_data_type_t grpc_call_data_type = {"grpc_call",
126                                                    {GRPC_RB_GC_NOT_MARKED,
127                                                     grpc_rb_call_destroy,
128                                                     GRPC_RB_MEMSIZE_UNAVAILABLE,
129                                                     {NULL, NULL}},
130                                                    NULL,
131                                                    NULL,
132 #ifdef RUBY_TYPED_FREE_IMMEDIATELY
133                                                    RUBY_TYPED_FREE_IMMEDIATELY
134 #endif
135 };
136 
137 /* Error code details is a hash containing text strings describing errors */
138 VALUE rb_error_code_details;
139 
140 /* Obtains the error detail string for given error code */
grpc_call_error_detail_of(grpc_call_error err)141 const char* grpc_call_error_detail_of(grpc_call_error err) {
142   VALUE detail_ref = rb_hash_aref(rb_error_code_details, UINT2NUM(err));
143   const char* detail = "unknown error code!";
144   if (detail_ref != Qnil) {
145     detail = StringValueCStr(detail_ref);
146   }
147   return detail;
148 }
149 
150 /* Called by clients to cancel an RPC on the server.
151    Can be called multiple times, from any thread. */
grpc_rb_call_cancel(VALUE self)152 static VALUE grpc_rb_call_cancel(VALUE self) {
153   grpc_rb_call* call = NULL;
154   grpc_call_error err;
155   if (RTYPEDDATA_DATA(self) == NULL) {
156     // This call has been closed
157     return Qnil;
158   }
159 
160   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
161   err = grpc_call_cancel(call->wrapped, NULL);
162   if (err != GRPC_CALL_OK) {
163     rb_raise(grpc_rb_eCallError, "cancel failed: %s (code=%d)",
164              grpc_call_error_detail_of(err), err);
165   }
166 
167   return Qnil;
168 }
169 
170 /* TODO: expose this as part of the surface API if needed.
171  * This is meant for internal usage by the "write thread" of grpc-ruby
172  * client-side bidi calls. It provides a way for the background write-thread
173  * to propagate failures to the main read-thread and give the user an error
174  * message. */
grpc_rb_call_cancel_with_status(VALUE self,VALUE status_code,VALUE details)175 static VALUE grpc_rb_call_cancel_with_status(VALUE self, VALUE status_code,
176                                              VALUE details) {
177   grpc_rb_call* call = NULL;
178   grpc_call_error err;
179   if (RTYPEDDATA_DATA(self) == NULL) {
180     // This call has been closed
181     return Qnil;
182   }
183 
184   if (TYPE(details) != T_STRING || TYPE(status_code) != T_FIXNUM) {
185     rb_raise(rb_eTypeError,
186              "Bad parameter type error for cancel with status. Want Fixnum, "
187              "String.");
188     return Qnil;
189   }
190 
191   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
192   err = grpc_call_cancel_with_status(call->wrapped, NUM2LONG(status_code),
193                                      StringValueCStr(details), NULL);
194   if (err != GRPC_CALL_OK) {
195     rb_raise(grpc_rb_eCallError, "cancel with status failed: %s (code=%d)",
196              grpc_call_error_detail_of(err), err);
197   }
198 
199   return Qnil;
200 }
201 
202 /* Releases the c-level resources associated with a call
203    Once a call has been closed, no further requests can be
204    processed.
205 */
grpc_rb_call_close(VALUE self)206 static VALUE grpc_rb_call_close(VALUE self) {
207   grpc_rb_call* call = NULL;
208   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
209   if (call != NULL) {
210     destroy_call(call);
211     xfree(RTYPEDDATA_DATA(self));
212     RTYPEDDATA_DATA(self) = NULL;
213   }
214   return Qnil;
215 }
216 
217 /* Called to obtain the peer that this call is connected to. */
grpc_rb_call_get_peer(VALUE self)218 static VALUE grpc_rb_call_get_peer(VALUE self) {
219   VALUE res = Qnil;
220   grpc_rb_call* call = NULL;
221   char* peer = NULL;
222   if (RTYPEDDATA_DATA(self) == NULL) {
223     rb_raise(grpc_rb_eCallError, "Cannot get peer value on closed call");
224     return Qnil;
225   }
226   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
227   peer = grpc_call_get_peer(call->wrapped);
228   res = rb_str_new2(peer);
229   gpr_free(peer);
230 
231   return res;
232 }
233 
234 /* Called to obtain the x509 cert of an authenticated peer. */
grpc_rb_call_get_peer_cert(VALUE self)235 static VALUE grpc_rb_call_get_peer_cert(VALUE self) {
236   grpc_rb_call* call = NULL;
237   VALUE res = Qnil;
238   grpc_auth_context* ctx = NULL;
239   if (RTYPEDDATA_DATA(self) == NULL) {
240     rb_raise(grpc_rb_eCallError, "Cannot get peer cert on closed call");
241     return Qnil;
242   }
243   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
244 
245   ctx = grpc_call_auth_context(call->wrapped);
246 
247   if (!ctx || !grpc_auth_context_peer_is_authenticated(ctx)) {
248     return Qnil;
249   }
250 
251   {
252     grpc_auth_property_iterator it = grpc_auth_context_find_properties_by_name(
253         ctx, GRPC_X509_PEM_CERT_PROPERTY_NAME);
254     const grpc_auth_property* prop = grpc_auth_property_iterator_next(&it);
255     if (prop == NULL) {
256       return Qnil;
257     }
258 
259     res = rb_str_new2(prop->value);
260   }
261 
262   grpc_auth_context_release(ctx);
263 
264   return res;
265 }
266 
267 /*
268   call-seq:
269   status = call.status
270 
271   Gets the status object saved the call.  */
grpc_rb_call_get_status(VALUE self)272 static VALUE grpc_rb_call_get_status(VALUE self) {
273   return rb_ivar_get(self, id_status);
274 }
275 
276 /*
277   call-seq:
278   call.status = status
279 
280   Saves a status object on the call.  */
grpc_rb_call_set_status(VALUE self,VALUE status)281 static VALUE grpc_rb_call_set_status(VALUE self, VALUE status) {
282   if (!NIL_P(status) && rb_obj_class(status) != grpc_rb_sStatus) {
283     rb_raise(rb_eTypeError, "bad status: got:<%s> want: <Struct::Status>",
284              rb_obj_classname(status));
285     return Qnil;
286   }
287 
288   return rb_ivar_set(self, id_status, status);
289 }
290 
291 /*
292   call-seq:
293   metadata = call.metadata
294 
295   Gets the metadata object saved the call.  */
grpc_rb_call_get_metadata(VALUE self)296 static VALUE grpc_rb_call_get_metadata(VALUE self) {
297   return rb_ivar_get(self, id_metadata);
298 }
299 
300 /*
301   call-seq:
302   call.metadata = metadata
303 
304   Saves the metadata hash on the call.  */
grpc_rb_call_set_metadata(VALUE self,VALUE metadata)305 static VALUE grpc_rb_call_set_metadata(VALUE self, VALUE metadata) {
306   if (!NIL_P(metadata) && TYPE(metadata) != T_HASH) {
307     rb_raise(rb_eTypeError, "bad metadata: got:<%s> want: <Hash>",
308              rb_obj_classname(metadata));
309     return Qnil;
310   }
311 
312   return rb_ivar_set(self, id_metadata, metadata);
313 }
314 
315 /*
316   call-seq:
317   trailing_metadata = call.trailing_metadata
318 
319   Gets the trailing metadata object saved on the call */
grpc_rb_call_get_trailing_metadata(VALUE self)320 static VALUE grpc_rb_call_get_trailing_metadata(VALUE self) {
321   return rb_ivar_get(self, id_trailing_metadata);
322 }
323 
324 /*
325   call-seq:
326   call.trailing_metadata = trailing_metadata
327 
328   Saves the trailing metadata hash on the call. */
grpc_rb_call_set_trailing_metadata(VALUE self,VALUE metadata)329 static VALUE grpc_rb_call_set_trailing_metadata(VALUE self, VALUE metadata) {
330   if (!NIL_P(metadata) && TYPE(metadata) != T_HASH) {
331     rb_raise(rb_eTypeError, "bad metadata: got:<%s> want: <Hash>",
332              rb_obj_classname(metadata));
333     return Qnil;
334   }
335 
336   return rb_ivar_set(self, id_trailing_metadata, metadata);
337 }
338 
339 /*
340   call-seq:
341   write_flag = call.write_flag
342 
343   Gets the write_flag value saved the call.  */
grpc_rb_call_get_write_flag(VALUE self)344 static VALUE grpc_rb_call_get_write_flag(VALUE self) {
345   return rb_ivar_get(self, id_write_flag);
346 }
347 
348 /*
349   call-seq:
350   call.write_flag = write_flag
351 
352   Saves the write_flag on the call.  */
grpc_rb_call_set_write_flag(VALUE self,VALUE write_flag)353 static VALUE grpc_rb_call_set_write_flag(VALUE self, VALUE write_flag) {
354   if (!NIL_P(write_flag) && TYPE(write_flag) != T_FIXNUM) {
355     rb_raise(rb_eTypeError, "bad write_flag: got:<%s> want: <Fixnum>",
356              rb_obj_classname(write_flag));
357     return Qnil;
358   }
359 
360   return rb_ivar_set(self, id_write_flag, write_flag);
361 }
362 
363 /*
364   call-seq:
365   call.set_credentials call_credentials
366 
367   Sets credentials on a call */
grpc_rb_call_set_credentials(VALUE self,VALUE credentials)368 static VALUE grpc_rb_call_set_credentials(VALUE self, VALUE credentials) {
369   grpc_rb_call* call = NULL;
370   grpc_call_credentials* creds;
371   grpc_call_error err;
372   if (RTYPEDDATA_DATA(self) == NULL) {
373     rb_raise(grpc_rb_eCallError, "Cannot set credentials of closed call");
374     return Qnil;
375   }
376   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
377   creds = grpc_rb_get_wrapped_call_credentials(credentials);
378   err = grpc_call_set_credentials(call->wrapped, creds);
379   if (err != GRPC_CALL_OK) {
380     rb_raise(grpc_rb_eCallError,
381              "grpc_call_set_credentials failed with %s (code=%d)",
382              grpc_call_error_detail_of(err), err);
383   }
384   /* We need the credentials to be alive for as long as the call is alive,
385      but we don't care about destruction order. */
386   rb_ivar_set(self, id_credentials, credentials);
387   return Qnil;
388 }
389 
390 /* grpc_rb_md_ary_fill_hash_cb is the hash iteration callback used
391    to fill grpc_metadata_array.
392 
393    it's capacity should have been computed via a prior call to
394    grpc_rb_md_ary_capacity_hash_cb
395 */
grpc_rb_md_ary_fill_hash_cb(VALUE key,VALUE val,VALUE md_ary_obj)396 static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
397   grpc_metadata_array* md_ary = NULL;
398   long array_length;
399   long i;
400   grpc_slice key_slice;
401   grpc_slice value_slice;
402   char* tmp_str = NULL;
403 
404   if (TYPE(key) == T_SYMBOL) {
405     key_slice = grpc_slice_from_static_string(rb_id2name(SYM2ID(key)));
406   } else if (TYPE(key) == T_STRING) {
407     key_slice =
408         grpc_slice_from_copied_buffer(RSTRING_PTR(key), RSTRING_LEN(key));
409   } else {
410     rb_raise(rb_eTypeError,
411              "grpc_rb_md_ary_fill_hash_cb: bad type for key parameter");
412     return ST_STOP;
413   }
414 
415   if (!grpc_header_key_is_legal(key_slice)) {
416     tmp_str = grpc_slice_to_c_string(key_slice);
417     rb_raise(rb_eArgError,
418              "'%s' is an invalid header key, must match [a-z0-9-_.]+", tmp_str);
419     return ST_STOP;
420   }
421 
422   /* Construct a metadata object from key and value and add it */
423   TypedData_Get_Struct(md_ary_obj, grpc_metadata_array,
424                        &grpc_rb_md_ary_data_type, md_ary);
425 
426   if (TYPE(val) == T_ARRAY) {
427     array_length = RARRAY_LEN(val);
428     /* If the value is an array, add capacity for each value in the array */
429     for (i = 0; i < array_length; i++) {
430       value_slice = grpc_slice_from_copied_buffer(
431           RSTRING_PTR(rb_ary_entry(val, i)), RSTRING_LEN(rb_ary_entry(val, i)));
432       if (!grpc_is_binary_header(key_slice) &&
433           !grpc_header_nonbin_value_is_legal(value_slice)) {
434         // The value has invalid characters
435         tmp_str = grpc_slice_to_c_string(value_slice);
436         rb_raise(rb_eArgError, "Header value '%s' has invalid characters",
437                  tmp_str);
438         return ST_STOP;
439       }
440       GPR_ASSERT(md_ary->count < md_ary->capacity);
441       md_ary->metadata[md_ary->count].key = key_slice;
442       md_ary->metadata[md_ary->count].value = value_slice;
443       md_ary->count += 1;
444     }
445   } else if (TYPE(val) == T_STRING) {
446     value_slice =
447         grpc_slice_from_copied_buffer(RSTRING_PTR(val), RSTRING_LEN(val));
448     if (!grpc_is_binary_header(key_slice) &&
449         !grpc_header_nonbin_value_is_legal(value_slice)) {
450       // The value has invalid characters
451       tmp_str = grpc_slice_to_c_string(value_slice);
452       rb_raise(rb_eArgError, "Header value '%s' has invalid characters",
453                tmp_str);
454       return ST_STOP;
455     }
456     GPR_ASSERT(md_ary->count < md_ary->capacity);
457     md_ary->metadata[md_ary->count].key = key_slice;
458     md_ary->metadata[md_ary->count].value = value_slice;
459     md_ary->count += 1;
460   } else {
461     rb_raise(rb_eArgError, "Header values must be of type string or array");
462     return ST_STOP;
463   }
464   return ST_CONTINUE;
465 }
466 
467 /* grpc_rb_md_ary_capacity_hash_cb is the hash iteration callback used
468    to pre-compute the capacity a grpc_metadata_array.
469 */
grpc_rb_md_ary_capacity_hash_cb(VALUE key,VALUE val,VALUE md_ary_obj)470 static int grpc_rb_md_ary_capacity_hash_cb(VALUE key, VALUE val,
471                                            VALUE md_ary_obj) {
472   grpc_metadata_array* md_ary = NULL;
473 
474   (void)key;
475 
476   /* Construct a metadata object from key and value and add it */
477   TypedData_Get_Struct(md_ary_obj, grpc_metadata_array,
478                        &grpc_rb_md_ary_data_type, md_ary);
479 
480   if (TYPE(val) == T_ARRAY) {
481     /* If the value is an array, add capacity for each value in the array */
482     md_ary->capacity += RARRAY_LEN(val);
483   } else {
484     md_ary->capacity += 1;
485   }
486 
487   return ST_CONTINUE;
488 }
489 
490 /* grpc_rb_md_ary_convert converts a ruby metadata hash into
491    a grpc_metadata_array.
492 */
grpc_rb_md_ary_convert(VALUE md_ary_hash,grpc_metadata_array * md_ary)493 void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array* md_ary) {
494   VALUE md_ary_obj = Qnil;
495   if (md_ary_hash == Qnil) {
496     return; /* Do nothing if the expected has value is nil */
497   }
498   if (TYPE(md_ary_hash) != T_HASH) {
499     rb_raise(rb_eTypeError, "md_ary_convert: got <%s>, want <Hash>",
500              rb_obj_classname(md_ary_hash));
501     return;
502   }
503 
504   /* Initialize the array, compute it's capacity, then fill it. */
505   grpc_metadata_array_init(md_ary);
506   md_ary_obj =
507       TypedData_Wrap_Struct(grpc_rb_cMdAry, &grpc_rb_md_ary_data_type, md_ary);
508   rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_capacity_hash_cb, md_ary_obj);
509   md_ary->metadata = gpr_zalloc(md_ary->capacity * sizeof(grpc_metadata));
510   rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_fill_hash_cb, md_ary_obj);
511 }
512 
513 /* Converts a metadata array to a hash. */
grpc_rb_md_ary_to_h(grpc_metadata_array * md_ary)514 VALUE grpc_rb_md_ary_to_h(grpc_metadata_array* md_ary) {
515   VALUE key = Qnil;
516   VALUE new_ary = Qnil;
517   VALUE value = Qnil;
518   VALUE result = rb_hash_new();
519   size_t i;
520 
521   for (i = 0; i < md_ary->count; i++) {
522     key = grpc_rb_slice_to_ruby_string(md_ary->metadata[i].key);
523     value = rb_hash_aref(result, key);
524     if (value == Qnil) {
525       value = grpc_rb_slice_to_ruby_string(md_ary->metadata[i].value);
526       rb_hash_aset(result, key, value);
527     } else if (TYPE(value) == T_ARRAY) {
528       /* Add the string to the returned array */
529       rb_ary_push(value,
530                   grpc_rb_slice_to_ruby_string(md_ary->metadata[i].value));
531     } else {
532       /* Add the current value with this key and the new one to an array */
533       new_ary = rb_ary_new();
534       rb_ary_push(new_ary, value);
535       rb_ary_push(new_ary,
536                   grpc_rb_slice_to_ruby_string(md_ary->metadata[i].value));
537       rb_hash_aset(result, key, new_ary);
538     }
539   }
540   return result;
541 }
542 
543 /* grpc_rb_call_check_op_keys_hash_cb is a hash iteration func that checks
544    each key of an ops hash is valid.
545 */
grpc_rb_call_check_op_keys_hash_cb(VALUE key,VALUE val,VALUE ops_ary)546 static int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val,
547                                               VALUE ops_ary) {
548   (void)val;
549   /* Update the capacity; the value is an array, add capacity for each value in
550    * the array */
551   if (TYPE(key) != T_FIXNUM) {
552     rb_raise(rb_eTypeError, "invalid operation : got <%s>, want <Fixnum>",
553              rb_obj_classname(key));
554     return ST_STOP;
555   }
556   switch (NUM2INT(key)) {
557     case GRPC_OP_SEND_INITIAL_METADATA:
558     case GRPC_OP_SEND_MESSAGE:
559     case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
560     case GRPC_OP_SEND_STATUS_FROM_SERVER:
561     case GRPC_OP_RECV_INITIAL_METADATA:
562     case GRPC_OP_RECV_MESSAGE:
563     case GRPC_OP_RECV_STATUS_ON_CLIENT:
564     case GRPC_OP_RECV_CLOSE_ON_SERVER:
565       rb_ary_push(ops_ary, key);
566       return ST_CONTINUE;
567     default:
568       rb_raise(rb_eTypeError, "invalid operation : bad value %d", NUM2INT(key));
569   };
570   return ST_STOP;
571 }
572 
573 /* grpc_rb_op_update_status_from_server adds the values in a ruby status
574    struct to the 'send_status_from_server' portion of an op.
575 */
grpc_rb_op_update_status_from_server(grpc_op * op,grpc_metadata_array * md_ary,grpc_slice * send_status_details,VALUE status)576 static void grpc_rb_op_update_status_from_server(
577     grpc_op* op, grpc_metadata_array* md_ary, grpc_slice* send_status_details,
578     VALUE status) {
579   VALUE code = rb_struct_aref(status, sym_code);
580   VALUE details = rb_struct_aref(status, sym_details);
581   VALUE metadata_hash = rb_struct_aref(status, sym_metadata);
582 
583   /* TODO: add check to ensure status is the correct struct type */
584   if (TYPE(code) != T_FIXNUM) {
585     rb_raise(rb_eTypeError, "invalid code : got <%s>, want <Fixnum>",
586              rb_obj_classname(code));
587     return;
588   }
589   if (TYPE(details) != T_STRING) {
590     rb_raise(rb_eTypeError, "invalid details : got <%s>, want <String>",
591              rb_obj_classname(code));
592     return;
593   }
594 
595   *send_status_details =
596       grpc_slice_from_copied_buffer(RSTRING_PTR(details), RSTRING_LEN(details));
597 
598   op->data.send_status_from_server.status = NUM2INT(code);
599   op->data.send_status_from_server.status_details = send_status_details;
600   grpc_rb_md_ary_convert(metadata_hash, md_ary);
601   op->data.send_status_from_server.trailing_metadata_count = md_ary->count;
602   op->data.send_status_from_server.trailing_metadata = md_ary->metadata;
603 }
604 
605 /* run_batch_stack holds various values used by the
606  * grpc_rb_call_run_batch function */
607 typedef struct run_batch_stack {
608   /* The batch ops */
609   grpc_op ops[8]; /* 8 is the maximum number of operations */
610   size_t op_num;  /* tracks the last added operation */
611 
612   /* Data being sent */
613   grpc_metadata_array send_metadata;
614   grpc_metadata_array send_trailing_metadata;
615 
616   /* Data being received */
617   grpc_byte_buffer* recv_message;
618   grpc_metadata_array recv_metadata;
619   grpc_metadata_array recv_trailing_metadata;
620   int recv_cancelled;
621   grpc_status_code recv_status;
622   grpc_slice recv_status_details;
623   const char* recv_status_debug_error_string;
624   unsigned write_flag;
625   grpc_slice send_status_details;
626 } run_batch_stack;
627 
628 /* grpc_run_batch_stack_init ensures the run_batch_stack is properly
629  * initialized */
grpc_run_batch_stack_init(run_batch_stack * st,unsigned write_flag)630 static void grpc_run_batch_stack_init(run_batch_stack* st,
631                                       unsigned write_flag) {
632   MEMZERO(st, run_batch_stack, 1);
633   grpc_metadata_array_init(&st->send_metadata);
634   grpc_metadata_array_init(&st->send_trailing_metadata);
635   grpc_metadata_array_init(&st->recv_metadata);
636   grpc_metadata_array_init(&st->recv_trailing_metadata);
637   st->op_num = 0;
638   st->write_flag = write_flag;
639 }
640 
/* Unrefs the key and value slice of every entry in the array (when it has
 * entry storage) and then destroys the array itself. */
void grpc_rb_metadata_array_destroy_including_entries(
    grpc_metadata_array* array) {
  grpc_metadata* entry = array->metadata;
  size_t remaining;

  if (entry != NULL) {
    for (remaining = array->count; remaining > 0; remaining--, entry++) {
      grpc_slice_unref(entry->key);
      grpc_slice_unref(entry->value);
    }
  }
  grpc_metadata_array_destroy(array);
}
652 
653 /* grpc_run_batch_stack_cleanup ensures the run_batch_stack is properly
654  * cleaned up */
grpc_run_batch_stack_cleanup(run_batch_stack * st)655 static void grpc_run_batch_stack_cleanup(run_batch_stack* st) {
656   size_t i = 0;
657 
658   grpc_rb_metadata_array_destroy_including_entries(&st->send_metadata);
659   grpc_rb_metadata_array_destroy_including_entries(&st->send_trailing_metadata);
660   grpc_metadata_array_destroy(&st->recv_metadata);
661   grpc_metadata_array_destroy(&st->recv_trailing_metadata);
662 
663   if (GRPC_SLICE_START_PTR(st->send_status_details) != NULL) {
664     grpc_slice_unref(st->send_status_details);
665   }
666 
667   if (GRPC_SLICE_START_PTR(st->recv_status_details) != NULL) {
668     grpc_slice_unref(st->recv_status_details);
669   }
670 
671   if (st->recv_message != NULL) {
672     grpc_byte_buffer_destroy(st->recv_message);
673   }
674 
675   for (i = 0; i < st->op_num; i++) {
676     if (st->ops[i].op == GRPC_OP_SEND_MESSAGE) {
677       grpc_byte_buffer_destroy(st->ops[i].data.send_message.send_message);
678     }
679   }
680 }
681 
682 /* grpc_run_batch_stack_fill_ops fills the run_batch_stack ops array from
683  * ops_hash */
grpc_run_batch_stack_fill_ops(run_batch_stack * st,VALUE ops_hash)684 static void grpc_run_batch_stack_fill_ops(run_batch_stack* st, VALUE ops_hash) {
685   VALUE this_op = Qnil;
686   VALUE this_value = Qnil;
687   VALUE ops_ary = rb_ary_new();
688   size_t i = 0;
689 
690   /* Create a ruby array with just the operation keys */
691   rb_hash_foreach(ops_hash, grpc_rb_call_check_op_keys_hash_cb, ops_ary);
692 
693   /* Fill the ops array */
694   for (i = 0; i < (size_t)RARRAY_LEN(ops_ary); i++) {
695     this_op = rb_ary_entry(ops_ary, i);
696     this_value = rb_hash_aref(ops_hash, this_op);
697     st->ops[st->op_num].flags = 0;
698     switch (NUM2INT(this_op)) {
699       case GRPC_OP_SEND_INITIAL_METADATA:
700         grpc_rb_md_ary_convert(this_value, &st->send_metadata);
701         st->ops[st->op_num].data.send_initial_metadata.count =
702             st->send_metadata.count;
703         st->ops[st->op_num].data.send_initial_metadata.metadata =
704             st->send_metadata.metadata;
705         break;
706       case GRPC_OP_SEND_MESSAGE:
707         st->ops[st->op_num].data.send_message.send_message =
708             grpc_rb_s_to_byte_buffer(RSTRING_PTR(this_value),
709                                      RSTRING_LEN(this_value));
710         st->ops[st->op_num].flags = st->write_flag;
711         break;
712       case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
713         break;
714       case GRPC_OP_SEND_STATUS_FROM_SERVER:
715         grpc_rb_op_update_status_from_server(
716             &st->ops[st->op_num], &st->send_trailing_metadata,
717             &st->send_status_details, this_value);
718         break;
719       case GRPC_OP_RECV_INITIAL_METADATA:
720         st->ops[st->op_num].data.recv_initial_metadata.recv_initial_metadata =
721             &st->recv_metadata;
722         break;
723       case GRPC_OP_RECV_MESSAGE:
724         st->ops[st->op_num].data.recv_message.recv_message = &st->recv_message;
725         break;
726       case GRPC_OP_RECV_STATUS_ON_CLIENT:
727         st->ops[st->op_num].data.recv_status_on_client.trailing_metadata =
728             &st->recv_trailing_metadata;
729         st->ops[st->op_num].data.recv_status_on_client.status =
730             &st->recv_status;
731         st->ops[st->op_num].data.recv_status_on_client.status_details =
732             &st->recv_status_details;
733         st->ops[st->op_num].data.recv_status_on_client.error_string =
734             &st->recv_status_debug_error_string;
735         break;
736       case GRPC_OP_RECV_CLOSE_ON_SERVER:
737         st->ops[st->op_num].data.recv_close_on_server.cancelled =
738             &st->recv_cancelled;
739         break;
740       default:
741         grpc_run_batch_stack_cleanup(st);
742         rb_raise(rb_eTypeError, "invalid operation : bad value %d",
743                  NUM2INT(this_op));
744     };
745     st->ops[st->op_num].op = (grpc_op_type)NUM2INT(this_op);
746     st->ops[st->op_num].reserved = NULL;
747     st->op_num++;
748   }
749 }
750 
751 /* grpc_run_batch_stack_build_result fills constructs a ruby BatchResult struct
752    after the results have run */
grpc_run_batch_stack_build_result(run_batch_stack * st)753 static VALUE grpc_run_batch_stack_build_result(run_batch_stack* st) {
754   size_t i = 0;
755   VALUE result = rb_struct_new(grpc_rb_sBatchResult, Qnil, Qnil, Qnil, Qnil,
756                                Qnil, Qnil, Qnil, Qnil, NULL);
757   for (i = 0; i < st->op_num; i++) {
758     switch (st->ops[i].op) {
759       case GRPC_OP_SEND_INITIAL_METADATA:
760         rb_struct_aset(result, sym_send_metadata, Qtrue);
761         break;
762       case GRPC_OP_SEND_MESSAGE:
763         rb_struct_aset(result, sym_send_message, Qtrue);
764         break;
765       case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
766         rb_struct_aset(result, sym_send_close, Qtrue);
767         break;
768       case GRPC_OP_SEND_STATUS_FROM_SERVER:
769         rb_struct_aset(result, sym_send_status, Qtrue);
770         break;
771       case GRPC_OP_RECV_INITIAL_METADATA:
772         rb_struct_aset(result, sym_metadata,
773                        grpc_rb_md_ary_to_h(&st->recv_metadata));
774       case GRPC_OP_RECV_MESSAGE:
775         rb_struct_aset(result, sym_message,
776                        grpc_rb_byte_buffer_to_s(st->recv_message));
777         break;
778       case GRPC_OP_RECV_STATUS_ON_CLIENT:
779         rb_struct_aset(
780             result, sym_status,
781             rb_struct_new(
782                 grpc_rb_sStatus, UINT2NUM(st->recv_status),
783                 (GRPC_SLICE_START_PTR(st->recv_status_details) == NULL
784                      ? Qnil
785                      : grpc_rb_slice_to_ruby_string(st->recv_status_details)),
786                 grpc_rb_md_ary_to_h(&st->recv_trailing_metadata),
787                 st->recv_status_debug_error_string == NULL
788                     ? Qnil
789                     : rb_str_new_cstr(st->recv_status_debug_error_string),
790                 NULL));
791         gpr_free((void*)st->recv_status_debug_error_string);
792         break;
793       case GRPC_OP_RECV_CLOSE_ON_SERVER:
794         rb_struct_aset(result, sym_send_close, Qtrue);
795         break;
796       default:
797         break;
798     }
799   }
800   return result;
801 }
802 
803 /* call-seq:
804    ops = {
805      GRPC::Core::CallOps::SEND_INITIAL_METADATA => <op_value>,
806      GRPC::Core::CallOps::SEND_MESSAGE => <op_value>,
807      ...
808    }
809    tag = Object.new
810    timeout = 10
811    call.start_batch(tag, timeout, ops)
812 
813    Start a batch of operations defined in the array ops; when complete, post a
814    completion of type 'tag' to the completion queue bound to the call.
815 
816    Also waits for the batch to complete, until timeout is reached.
817    The order of ops specified in the batch has no significance.
818    Only one operation of each type can be active at once in any given
819    batch */
grpc_rb_call_run_batch(VALUE self,VALUE ops_hash)820 static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) {
821   run_batch_stack* st = NULL;
822   grpc_rb_call* call = NULL;
823   grpc_event ev;
824   grpc_call_error err;
825   VALUE result = Qnil;
826   VALUE rb_write_flag = rb_ivar_get(self, id_write_flag);
827   unsigned write_flag = 0;
828   void* tag = (void*)&st;
829 
830   grpc_ruby_fork_guard();
831   if (RTYPEDDATA_DATA(self) == NULL) {
832     rb_raise(grpc_rb_eCallError, "Cannot run batch on closed call");
833     return Qnil;
834   }
835   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
836 
837   /* Validate the ops args, adding them to a ruby array */
838   if (TYPE(ops_hash) != T_HASH) {
839     rb_raise(rb_eTypeError, "call#run_batch: ops hash should be a hash");
840     return Qnil;
841   }
842   if (rb_write_flag != Qnil) {
843     write_flag = NUM2UINT(rb_write_flag);
844   }
845   st = gpr_malloc(sizeof(run_batch_stack));
846   grpc_run_batch_stack_init(st, write_flag);
847   grpc_run_batch_stack_fill_ops(st, ops_hash);
848 
849   /* call grpc_call_start_batch, then wait for it to complete using
850    * pluck_event */
851   err = grpc_call_start_batch(call->wrapped, st->ops, st->op_num, tag, NULL);
852   if (err != GRPC_CALL_OK) {
853     grpc_run_batch_stack_cleanup(st);
854     gpr_free(st);
855     rb_raise(grpc_rb_eCallError,
856              "grpc_call_start_batch failed with %s (code=%d)",
857              grpc_call_error_detail_of(err), err);
858     return Qnil;
859   }
860   ev = rb_completion_queue_pluck(call->queue, tag,
861                                  gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
862   if (!ev.success) {
863     rb_raise(grpc_rb_eCallError, "call#run_batch failed somehow");
864   }
865   /* Build and return the BatchResult struct result,
866      if there is an error, it's reflected in the status */
867   result = grpc_run_batch_stack_build_result(st);
868   grpc_run_batch_stack_cleanup(st);
869   gpr_free(st);
870   return result;
871 }
872 
Init_grpc_write_flags()873 static void Init_grpc_write_flags() {
874   /* Constants representing the write flags in grpc.h */
875   VALUE grpc_rb_mWriteFlags =
876       rb_define_module_under(grpc_rb_mGrpcCore, "WriteFlags");
877   rb_define_const(grpc_rb_mWriteFlags, "BUFFER_HINT",
878                   UINT2NUM(GRPC_WRITE_BUFFER_HINT));
879   rb_define_const(grpc_rb_mWriteFlags, "NO_COMPRESS",
880                   UINT2NUM(GRPC_WRITE_NO_COMPRESS));
881 }
882 
Init_grpc_error_codes()883 static void Init_grpc_error_codes() {
884   /* Constants representing the error codes of grpc_call_error in grpc.h */
885   VALUE grpc_rb_mRpcErrors =
886       rb_define_module_under(grpc_rb_mGrpcCore, "RpcErrors");
887   rb_define_const(grpc_rb_mRpcErrors, "OK", UINT2NUM(GRPC_CALL_OK));
888   rb_define_const(grpc_rb_mRpcErrors, "ERROR", UINT2NUM(GRPC_CALL_ERROR));
889   rb_define_const(grpc_rb_mRpcErrors, "NOT_ON_SERVER",
890                   UINT2NUM(GRPC_CALL_ERROR_NOT_ON_SERVER));
891   rb_define_const(grpc_rb_mRpcErrors, "NOT_ON_CLIENT",
892                   UINT2NUM(GRPC_CALL_ERROR_NOT_ON_CLIENT));
893   rb_define_const(grpc_rb_mRpcErrors, "ALREADY_ACCEPTED",
894                   UINT2NUM(GRPC_CALL_ERROR_ALREADY_ACCEPTED));
895   rb_define_const(grpc_rb_mRpcErrors, "ALREADY_INVOKED",
896                   UINT2NUM(GRPC_CALL_ERROR_ALREADY_INVOKED));
897   rb_define_const(grpc_rb_mRpcErrors, "NOT_INVOKED",
898                   UINT2NUM(GRPC_CALL_ERROR_NOT_INVOKED));
899   rb_define_const(grpc_rb_mRpcErrors, "ALREADY_FINISHED",
900                   UINT2NUM(GRPC_CALL_ERROR_ALREADY_FINISHED));
901   rb_define_const(grpc_rb_mRpcErrors, "TOO_MANY_OPERATIONS",
902                   UINT2NUM(GRPC_CALL_ERROR_TOO_MANY_OPERATIONS));
903   rb_define_const(grpc_rb_mRpcErrors, "INVALID_FLAGS",
904                   UINT2NUM(GRPC_CALL_ERROR_INVALID_FLAGS));
905 
906   /* Hint the GC that this is a global and shouldn't be sweeped. */
907   rb_global_variable(&rb_error_code_details);
908 
909   /* Add the detail strings to a Hash */
910   rb_error_code_details = rb_hash_new();
911   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_OK),
912                rb_str_new2("ok"));
913   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR),
914                rb_str_new2("unknown error"));
915   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_NOT_ON_SERVER),
916                rb_str_new2("not available on a server"));
917   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_NOT_ON_CLIENT),
918                rb_str_new2("not available on a client"));
919   rb_hash_aset(rb_error_code_details,
920                UINT2NUM(GRPC_CALL_ERROR_ALREADY_ACCEPTED),
921                rb_str_new2("call is already accepted"));
922   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_ALREADY_INVOKED),
923                rb_str_new2("call is already invoked"));
924   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_NOT_INVOKED),
925                rb_str_new2("call is not yet invoked"));
926   rb_hash_aset(rb_error_code_details,
927                UINT2NUM(GRPC_CALL_ERROR_ALREADY_FINISHED),
928                rb_str_new2("call is already finished"));
929   rb_hash_aset(rb_error_code_details,
930                UINT2NUM(GRPC_CALL_ERROR_TOO_MANY_OPERATIONS),
931                rb_str_new2("outstanding read or write present"));
932   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_INVALID_FLAGS),
933                rb_str_new2("a bad flag was given"));
934   rb_define_const(grpc_rb_mRpcErrors, "ErrorMessages", rb_error_code_details);
935   rb_obj_freeze(rb_error_code_details);
936 }
937 
Init_grpc_op_codes()938 static void Init_grpc_op_codes() {
939   /* Constants representing operation type codes in grpc.h */
940   VALUE grpc_rb_mCallOps = rb_define_module_under(grpc_rb_mGrpcCore, "CallOps");
941   rb_define_const(grpc_rb_mCallOps, "SEND_INITIAL_METADATA",
942                   UINT2NUM(GRPC_OP_SEND_INITIAL_METADATA));
943   rb_define_const(grpc_rb_mCallOps, "SEND_MESSAGE",
944                   UINT2NUM(GRPC_OP_SEND_MESSAGE));
945   rb_define_const(grpc_rb_mCallOps, "SEND_CLOSE_FROM_CLIENT",
946                   UINT2NUM(GRPC_OP_SEND_CLOSE_FROM_CLIENT));
947   rb_define_const(grpc_rb_mCallOps, "SEND_STATUS_FROM_SERVER",
948                   UINT2NUM(GRPC_OP_SEND_STATUS_FROM_SERVER));
949   rb_define_const(grpc_rb_mCallOps, "RECV_INITIAL_METADATA",
950                   UINT2NUM(GRPC_OP_RECV_INITIAL_METADATA));
951   rb_define_const(grpc_rb_mCallOps, "RECV_MESSAGE",
952                   UINT2NUM(GRPC_OP_RECV_MESSAGE));
953   rb_define_const(grpc_rb_mCallOps, "RECV_STATUS_ON_CLIENT",
954                   UINT2NUM(GRPC_OP_RECV_STATUS_ON_CLIENT));
955   rb_define_const(grpc_rb_mCallOps, "RECV_CLOSE_ON_SERVER",
956                   UINT2NUM(GRPC_OP_RECV_CLOSE_ON_SERVER));
957 }
958 
Init_grpc_metadata_keys()959 static void Init_grpc_metadata_keys() {
960   VALUE grpc_rb_mMetadataKeys =
961       rb_define_module_under(grpc_rb_mGrpcCore, "MetadataKeys");
962   rb_define_const(grpc_rb_mMetadataKeys, "COMPRESSION_REQUEST_ALGORITHM",
963                   rb_str_new2(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY));
964 }
965 
Init_grpc_call()966 void Init_grpc_call() {
967   /* CallError inherits from Exception to signal that it is non-recoverable */
968   grpc_rb_eCallError =
969       rb_define_class_under(grpc_rb_mGrpcCore, "CallError", rb_eException);
970   grpc_rb_eOutOfTime =
971       rb_define_class_under(grpc_rb_mGrpcCore, "OutOfTime", rb_eException);
972   grpc_rb_cCall = rb_define_class_under(grpc_rb_mGrpcCore, "Call", rb_cObject);
973   grpc_rb_cMdAry =
974       rb_define_class_under(grpc_rb_mGrpcCore, "MetadataArray", rb_cObject);
975 
976   /* Prevent allocation or inialization of the Call class */
977   rb_define_alloc_func(grpc_rb_cCall, grpc_rb_cannot_alloc);
978   rb_define_method(grpc_rb_cCall, "initialize", grpc_rb_cannot_init, 0);
979   rb_define_method(grpc_rb_cCall, "initialize_copy", grpc_rb_cannot_init_copy,
980                    1);
981 
982   /* Add ruby analogues of the Call methods. */
983   rb_define_method(grpc_rb_cCall, "run_batch", grpc_rb_call_run_batch, 1);
984   rb_define_method(grpc_rb_cCall, "cancel", grpc_rb_call_cancel, 0);
985   rb_define_method(grpc_rb_cCall, "cancel_with_status",
986                    grpc_rb_call_cancel_with_status, 2);
987   rb_define_method(grpc_rb_cCall, "close", grpc_rb_call_close, 0);
988   rb_define_method(grpc_rb_cCall, "peer", grpc_rb_call_get_peer, 0);
989   rb_define_method(grpc_rb_cCall, "peer_cert", grpc_rb_call_get_peer_cert, 0);
990   rb_define_method(grpc_rb_cCall, "status", grpc_rb_call_get_status, 0);
991   rb_define_method(grpc_rb_cCall, "status=", grpc_rb_call_set_status, 1);
992   rb_define_method(grpc_rb_cCall, "metadata", grpc_rb_call_get_metadata, 0);
993   rb_define_method(grpc_rb_cCall, "metadata=", grpc_rb_call_set_metadata, 1);
994   rb_define_method(grpc_rb_cCall, "trailing_metadata",
995                    grpc_rb_call_get_trailing_metadata, 0);
996   rb_define_method(grpc_rb_cCall,
997                    "trailing_metadata=", grpc_rb_call_set_trailing_metadata, 1);
998   rb_define_method(grpc_rb_cCall, "write_flag", grpc_rb_call_get_write_flag, 0);
999   rb_define_method(grpc_rb_cCall, "write_flag=", grpc_rb_call_set_write_flag,
1000                    1);
1001   rb_define_method(grpc_rb_cCall, "set_credentials!",
1002                    grpc_rb_call_set_credentials, 1);
1003 
1004   /* Ids used to support call attributes */
1005   id_metadata = rb_intern("metadata");
1006   id_trailing_metadata = rb_intern("trailing_metadata");
1007   id_status = rb_intern("status");
1008   id_write_flag = rb_intern("write_flag");
1009 
1010   /* Ids used by the c wrapping internals. */
1011   id_credentials = rb_intern("__credentials");
1012 
1013   /* Ids used in constructing the batch result. */
1014   sym_send_message = ID2SYM(rb_intern("send_message"));
1015   sym_send_metadata = ID2SYM(rb_intern("send_metadata"));
1016   sym_send_close = ID2SYM(rb_intern("send_close"));
1017   sym_send_status = ID2SYM(rb_intern("send_status"));
1018   sym_message = ID2SYM(rb_intern("message"));
1019   sym_status = ID2SYM(rb_intern("status"));
1020   sym_cancelled = ID2SYM(rb_intern("cancelled"));
1021 
1022   /* The Struct used to return the run_batch result. */
1023   grpc_rb_sBatchResult = rb_struct_define(
1024       "BatchResult", "send_message", "send_metadata", "send_close",
1025       "send_status", "message", "metadata", "status", "cancelled", NULL);
1026 
1027   Init_grpc_error_codes();
1028   Init_grpc_op_codes();
1029   Init_grpc_write_flags();
1030   Init_grpc_metadata_keys();
1031 }
1032 
1033 /* Gets the call from the ruby object */
grpc_rb_get_wrapped_call(VALUE v)1034 grpc_call* grpc_rb_get_wrapped_call(VALUE v) {
1035   grpc_rb_call* call = NULL;
1036   TypedData_Get_Struct(v, grpc_rb_call, &grpc_call_data_type, call);
1037   return call->wrapped;
1038 }
1039 
1040 /* Obtains the wrapped object for a given call */
grpc_rb_wrap_call(grpc_call * c,grpc_completion_queue * q)1041 VALUE grpc_rb_wrap_call(grpc_call* c, grpc_completion_queue* q) {
1042   grpc_rb_call* wrapper;
1043   if (c == NULL || q == NULL) {
1044     return Qnil;
1045   }
1046   wrapper = ALLOC(grpc_rb_call);
1047   wrapper->wrapped = c;
1048   wrapper->queue = q;
1049   return TypedData_Wrap_Struct(grpc_rb_cCall, &grpc_call_data_type, wrapper);
1050 }
1051