1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include "test/core/end2end/end2end_tests.h"
20 
21 #include <stdio.h>
22 #include <string.h>
23 
24 #include <grpc/byte_buffer.h>
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/time.h>
28 #include "test/core/end2end/cq_verifier.h"
29 
tag(intptr_t t)30 static void* tag(intptr_t t) { return (void*)t; }
31 
begin_test(grpc_end2end_test_config config,const char * test_name,grpc_channel_args * client_args,grpc_channel_args * server_args)32 static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
33                                             const char* test_name,
34                                             grpc_channel_args* client_args,
35                                             grpc_channel_args* server_args) {
36   grpc_end2end_test_fixture f;
37   gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
38   f = config.create_fixture(client_args, server_args);
39   config.init_server(&f, server_args);
40   config.init_client(&f, client_args);
41   return f;
42 }
43 
n_seconds_from_now(int n)44 static gpr_timespec n_seconds_from_now(int n) {
45   return grpc_timeout_seconds_to_deadline(n);
46 }
47 
five_seconds_from_now(void)48 static gpr_timespec five_seconds_from_now(void) {
49   return n_seconds_from_now(5);
50 }
51 
drain_cq(grpc_completion_queue * cq)52 static void drain_cq(grpc_completion_queue* cq) {
53   grpc_event ev;
54   do {
55     ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr);
56   } while (ev.type != GRPC_QUEUE_SHUTDOWN);
57 }
58 
shutdown_server(grpc_end2end_test_fixture * f)59 static void shutdown_server(grpc_end2end_test_fixture* f) {
60   if (!f->server) return;
61   grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000));
62   GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000),
63                                          grpc_timeout_seconds_to_deadline(5),
64                                          nullptr)
65                  .type == GRPC_OP_COMPLETE);
66   grpc_server_destroy(f->server);
67   f->server = nullptr;
68 }
69 
shutdown_client(grpc_end2end_test_fixture * f)70 static void shutdown_client(grpc_end2end_test_fixture* f) {
71   if (!f->client) return;
72   grpc_channel_destroy(f->client);
73   f->client = nullptr;
74 }
75 
end_test(grpc_end2end_test_fixture * f)76 static void end_test(grpc_end2end_test_fixture* f) {
77   shutdown_server(f);
78   shutdown_client(f);
79 
80   grpc_completion_queue_shutdown(f->cq);
81   drain_cq(f->cq);
82   grpc_completion_queue_destroy(f->cq);
83   grpc_completion_queue_destroy(f->shutdown_cq);
84 }
85 
simple_request_body(grpc_end2end_test_config,grpc_end2end_test_fixture f)86 static void simple_request_body(grpc_end2end_test_config /*config*/,
87                                 grpc_end2end_test_fixture f) {
88   grpc_call* c;
89   grpc_call* s;
90   cq_verifier* cqv = cq_verifier_create(f.cq);
91   grpc_op ops[6];
92   grpc_op* op;
93   grpc_metadata_array initial_metadata_recv;
94   grpc_metadata_array trailing_metadata_recv;
95   grpc_metadata_array request_metadata_recv;
96   grpc_call_details call_details;
97   grpc_status_code status;
98   grpc_call_error error;
99   grpc_slice details;
100   int was_cancelled = 2;
101 
102   gpr_timespec deadline = five_seconds_from_now();
103   c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
104                                grpc_slice_from_static_string("/foo"), nullptr,
105                                deadline, nullptr);
106   GPR_ASSERT(c);
107 
108   grpc_metadata_array_init(&initial_metadata_recv);
109   grpc_metadata_array_init(&trailing_metadata_recv);
110   grpc_metadata_array_init(&request_metadata_recv);
111   grpc_call_details_init(&call_details);
112 
113   memset(ops, 0, sizeof(ops));
114   op = ops;
115   op->op = GRPC_OP_SEND_INITIAL_METADATA;
116   op->data.send_initial_metadata.count = 0;
117   op->flags = 0;
118   op->reserved = nullptr;
119   op++;
120   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
121   op->flags = 0;
122   op->reserved = nullptr;
123   op++;
124   op->op = GRPC_OP_RECV_INITIAL_METADATA;
125   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
126   op->flags = 0;
127   op->reserved = nullptr;
128   op++;
129   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
130   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
131   op->data.recv_status_on_client.status = &status;
132   op->data.recv_status_on_client.status_details = &details;
133   op->flags = 0;
134   op->reserved = nullptr;
135   op++;
136   error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
137                                 nullptr);
138   GPR_ASSERT(GRPC_CALL_OK == error);
139 
140   error =
141       grpc_server_request_call(f.server, &s, &call_details,
142                                &request_metadata_recv, f.cq, f.cq, tag(101));
143   GPR_ASSERT(GRPC_CALL_OK == error);
144   CQ_EXPECT_COMPLETION(cqv, tag(101), 1);
145   cq_verify(cqv);
146 
147   memset(ops, 0, sizeof(ops));
148   op = ops;
149   op->op = GRPC_OP_SEND_INITIAL_METADATA;
150   op->data.send_initial_metadata.count = 0;
151   op->flags = 0;
152   op->reserved = nullptr;
153   op++;
154   op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
155   op->data.send_status_from_server.trailing_metadata_count = 0;
156   op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED;
157   grpc_slice status_details = grpc_slice_from_static_string("xyz");
158   op->data.send_status_from_server.status_details = &status_details;
159   op->flags = 0;
160   op->reserved = nullptr;
161   op++;
162   op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
163   op->data.recv_close_on_server.cancelled = &was_cancelled;
164   op->flags = 0;
165   op->reserved = nullptr;
166   op++;
167   error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(102),
168                                 nullptr);
169   GPR_ASSERT(GRPC_CALL_OK == error);
170 
171   CQ_EXPECT_COMPLETION(cqv, tag(102), 1);
172   CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
173   cq_verify(cqv);
174 
175   GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);
176   GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
177   GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo"));
178   GPR_ASSERT(was_cancelled == 1);
179 
180   grpc_slice_unref(details);
181   grpc_metadata_array_destroy(&initial_metadata_recv);
182   grpc_metadata_array_destroy(&trailing_metadata_recv);
183   grpc_metadata_array_destroy(&request_metadata_recv);
184   grpc_call_details_destroy(&call_details);
185 
186   grpc_call_unref(c);
187   grpc_call_unref(s);
188 
189   cq_verifier_destroy(cqv);
190 }
191 
test_max_concurrent_streams(grpc_end2end_test_config config)192 static void test_max_concurrent_streams(grpc_end2end_test_config config) {
193   grpc_end2end_test_fixture f;
194   grpc_arg server_arg;
195   grpc_channel_args server_args;
196   grpc_call* c1;
197   grpc_call* c2;
198   grpc_call* s1;
199   grpc_call* s2;
200   int live_call;
201   gpr_timespec deadline;
202   cq_verifier* cqv;
203   grpc_event ev;
204   grpc_call_details call_details;
205   grpc_metadata_array request_metadata_recv;
206   grpc_metadata_array initial_metadata_recv1;
207   grpc_metadata_array trailing_metadata_recv1;
208   grpc_metadata_array initial_metadata_recv2;
209   grpc_metadata_array trailing_metadata_recv2;
210   grpc_status_code status1;
211   grpc_call_error error;
212   grpc_slice details1;
213   grpc_status_code status2;
214   grpc_slice details2;
215   grpc_op ops[6];
216   grpc_op* op;
217   int was_cancelled;
218   int got_client_start;
219   int got_server_start;
220 
221   server_arg.key = const_cast<char*>(GRPC_ARG_MAX_CONCURRENT_STREAMS);
222   server_arg.type = GRPC_ARG_INTEGER;
223   server_arg.value.integer = 1;
224 
225   server_args.num_args = 1;
226   server_args.args = &server_arg;
227 
228   f = begin_test(config, "test_max_concurrent_streams", nullptr, &server_args);
229   cqv = cq_verifier_create(f.cq);
230 
231   grpc_metadata_array_init(&request_metadata_recv);
232   grpc_metadata_array_init(&initial_metadata_recv1);
233   grpc_metadata_array_init(&trailing_metadata_recv1);
234   grpc_metadata_array_init(&initial_metadata_recv2);
235   grpc_metadata_array_init(&trailing_metadata_recv2);
236   grpc_call_details_init(&call_details);
237 
238   /* perform a ping-pong to ensure that settings have had a chance to round
239      trip */
240   simple_request_body(config, f);
241   /* perform another one to make sure that the one stream case still works */
242   simple_request_body(config, f);
243 
244   /* start two requests - ensuring that the second is not accepted until
245      the first completes */
246   deadline = n_seconds_from_now(1000);
247   c1 = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS,
248                                 f.cq, grpc_slice_from_static_string("/alpha"),
249                                 nullptr, deadline, nullptr);
250   GPR_ASSERT(c1);
251   c2 = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS,
252                                 f.cq, grpc_slice_from_static_string("/beta"),
253                                 nullptr, deadline, nullptr);
254   GPR_ASSERT(c2);
255 
256   GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
257                                  f.server, &s1, &call_details,
258                                  &request_metadata_recv, f.cq, f.cq, tag(101)));
259 
260   memset(ops, 0, sizeof(ops));
261   op = ops;
262   op->op = GRPC_OP_SEND_INITIAL_METADATA;
263   op->data.send_initial_metadata.count = 0;
264   op->flags = 0;
265   op->reserved = nullptr;
266   op++;
267   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
268   op->flags = 0;
269   op->reserved = nullptr;
270   op++;
271   error = grpc_call_start_batch(c1, ops, static_cast<size_t>(op - ops),
272                                 tag(301), nullptr);
273   GPR_ASSERT(GRPC_CALL_OK == error);
274 
275   memset(ops, 0, sizeof(ops));
276   op = ops;
277   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
278   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv1;
279   op->data.recv_status_on_client.status = &status1;
280   op->data.recv_status_on_client.status_details = &details1;
281   op->flags = 0;
282   op->reserved = nullptr;
283   op++;
284   op->op = GRPC_OP_RECV_INITIAL_METADATA;
285   op->data.recv_initial_metadata.recv_initial_metadata =
286       &initial_metadata_recv1;
287   op->flags = 0;
288   op->reserved = nullptr;
289   op++;
290   error = grpc_call_start_batch(c1, ops, static_cast<size_t>(op - ops),
291                                 tag(302), nullptr);
292   GPR_ASSERT(GRPC_CALL_OK == error);
293 
294   memset(ops, 0, sizeof(ops));
295   op = ops;
296   op->op = GRPC_OP_SEND_INITIAL_METADATA;
297   op->data.send_initial_metadata.count = 0;
298   op->flags = 0;
299   op->reserved = nullptr;
300   op++;
301   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
302   op->flags = 0;
303   op->reserved = nullptr;
304   op++;
305   error = grpc_call_start_batch(c2, ops, static_cast<size_t>(op - ops),
306                                 tag(401), nullptr);
307   GPR_ASSERT(GRPC_CALL_OK == error);
308 
309   memset(ops, 0, sizeof(ops));
310   op = ops;
311   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
312   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv2;
313   op->data.recv_status_on_client.status = &status2;
314   op->data.recv_status_on_client.status_details = &details2;
315   op->flags = 0;
316   op->reserved = nullptr;
317   op++;
318   op->op = GRPC_OP_RECV_INITIAL_METADATA;
319   op->data.recv_initial_metadata.recv_initial_metadata =
320       &initial_metadata_recv1;
321   op->flags = 0;
322   op->reserved = nullptr;
323   op++;
324   error = grpc_call_start_batch(c2, ops, static_cast<size_t>(op - ops),
325                                 tag(402), nullptr);
326   GPR_ASSERT(GRPC_CALL_OK == error);
327 
328   got_client_start = 0;
329   got_server_start = 0;
330   live_call = -1;
331   while (!got_client_start || !got_server_start) {
332     ev = grpc_completion_queue_next(f.cq, grpc_timeout_seconds_to_deadline(3),
333                                     nullptr);
334     GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
335     GPR_ASSERT(ev.success);
336     if (ev.tag == tag(101)) {
337       GPR_ASSERT(!got_server_start);
338       got_server_start = 1;
339     } else {
340       GPR_ASSERT(!got_client_start);
341       GPR_ASSERT(ev.tag == tag(301) || ev.tag == tag(401));
342       /* The /alpha or /beta calls started above could be invoked (but NOT
343        * both);
344        * check this here */
345       /* We'll get tag 303 or 403, we want 300, 400 */
346       live_call = (static_cast<int>((intptr_t)ev.tag)) - 1;
347       got_client_start = 1;
348     }
349   }
350   GPR_ASSERT(live_call == 300 || live_call == 400);
351 
352   memset(ops, 0, sizeof(ops));
353   op = ops;
354   op->op = GRPC_OP_SEND_INITIAL_METADATA;
355   op->data.send_initial_metadata.count = 0;
356   op->flags = 0;
357   op->reserved = nullptr;
358   op++;
359   op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
360   op->data.recv_close_on_server.cancelled = &was_cancelled;
361   op->flags = 0;
362   op->reserved = nullptr;
363   op++;
364   op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
365   op->data.send_status_from_server.trailing_metadata_count = 0;
366   op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED;
367   grpc_slice status_details = grpc_slice_from_static_string("xyz");
368   op->data.send_status_from_server.status_details = &status_details;
369   op->flags = 0;
370   op->reserved = nullptr;
371   op++;
372   error = grpc_call_start_batch(s1, ops, static_cast<size_t>(op - ops),
373                                 tag(102), nullptr);
374   GPR_ASSERT(GRPC_CALL_OK == error);
375 
376   CQ_EXPECT_COMPLETION(cqv, tag(102), 1);
377   CQ_EXPECT_COMPLETION(cqv, tag(live_call + 2), 1);
378   /* first request is finished, we should be able to start the second */
379   live_call = (live_call == 300) ? 400 : 300;
380   CQ_EXPECT_COMPLETION(cqv, tag(live_call + 1), 1);
381   cq_verify(cqv);
382 
383   grpc_call_details_destroy(&call_details);
384 
385   GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
386                                  f.server, &s2, &call_details,
387                                  &request_metadata_recv, f.cq, f.cq, tag(201)));
388   CQ_EXPECT_COMPLETION(cqv, tag(201), 1);
389   cq_verify(cqv);
390 
391   memset(ops, 0, sizeof(ops));
392   op = ops;
393   op->op = GRPC_OP_SEND_INITIAL_METADATA;
394   op->data.send_initial_metadata.count = 0;
395   op->flags = 0;
396   op->reserved = nullptr;
397   op++;
398   op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
399   op->data.recv_close_on_server.cancelled = &was_cancelled;
400   op->flags = 0;
401   op->reserved = nullptr;
402   op++;
403   op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
404   op->data.send_status_from_server.trailing_metadata_count = 0;
405   op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED;
406   op->data.send_status_from_server.status_details = &status_details;
407   op->flags = 0;
408   op->reserved = nullptr;
409   op++;
410   error = grpc_call_start_batch(s2, ops, static_cast<size_t>(op - ops),
411                                 tag(202), nullptr);
412   GPR_ASSERT(GRPC_CALL_OK == error);
413 
414   CQ_EXPECT_COMPLETION(cqv, tag(live_call + 2), 1);
415   CQ_EXPECT_COMPLETION(cqv, tag(202), 1);
416   cq_verify(cqv);
417 
418   cq_verifier_destroy(cqv);
419 
420   grpc_call_unref(c1);
421   grpc_call_unref(s1);
422   grpc_call_unref(c2);
423   grpc_call_unref(s2);
424 
425   grpc_slice_unref(details1);
426   grpc_slice_unref(details2);
427   grpc_metadata_array_destroy(&initial_metadata_recv1);
428   grpc_metadata_array_destroy(&trailing_metadata_recv1);
429   grpc_metadata_array_destroy(&initial_metadata_recv2);
430   grpc_metadata_array_destroy(&trailing_metadata_recv2);
431   grpc_metadata_array_destroy(&request_metadata_recv);
432   grpc_call_details_destroy(&call_details);
433 
434   end_test(&f);
435   config.tear_down_data(&f);
436 }
437 
test_max_concurrent_streams_with_timeout_on_first(grpc_end2end_test_config config)438 static void test_max_concurrent_streams_with_timeout_on_first(
439     grpc_end2end_test_config config) {
440   grpc_end2end_test_fixture f;
441   grpc_arg server_arg;
442   grpc_channel_args server_args;
443   grpc_call* c1;
444   grpc_call* c2;
445   grpc_call* s1;
446   grpc_call* s2;
447   cq_verifier* cqv;
448   grpc_call_details call_details;
449   grpc_metadata_array request_metadata_recv;
450   grpc_metadata_array initial_metadata_recv1;
451   grpc_metadata_array trailing_metadata_recv1;
452   grpc_metadata_array initial_metadata_recv2;
453   grpc_metadata_array trailing_metadata_recv2;
454   grpc_status_code status1;
455   grpc_call_error error;
456   grpc_slice details1 = grpc_empty_slice();
457   grpc_status_code status2;
458   grpc_slice details2 = grpc_empty_slice();
459   grpc_op ops[6];
460   grpc_op* op;
461   int was_cancelled;
462 
463   server_arg.key = const_cast<char*>(GRPC_ARG_MAX_CONCURRENT_STREAMS);
464   server_arg.type = GRPC_ARG_INTEGER;
465   server_arg.value.integer = 1;
466 
467   server_args.num_args = 1;
468   server_args.args = &server_arg;
469 
470   f = begin_test(config, "test_max_concurrent_streams_with_timeout_on_first",
471                  nullptr, &server_args);
472   cqv = cq_verifier_create(f.cq);
473 
474   grpc_metadata_array_init(&request_metadata_recv);
475   grpc_metadata_array_init(&initial_metadata_recv1);
476   grpc_metadata_array_init(&trailing_metadata_recv1);
477   grpc_metadata_array_init(&initial_metadata_recv2);
478   grpc_metadata_array_init(&trailing_metadata_recv2);
479   grpc_call_details_init(&call_details);
480 
481   /* perform a ping-pong to ensure that settings have had a chance to round
482      trip */
483   simple_request_body(config, f);
484   /* perform another one to make sure that the one stream case still works */
485   simple_request_body(config, f);
486 
487   /* start two requests - ensuring that the second is not accepted until
488      the first completes */
489   c1 = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS,
490                                 f.cq, grpc_slice_from_static_string("/alpha"),
491                                 nullptr, n_seconds_from_now(3), nullptr);
492   GPR_ASSERT(c1);
493   c2 = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS,
494                                 f.cq, grpc_slice_from_static_string("/beta"),
495                                 nullptr, n_seconds_from_now(1000), nullptr);
496   GPR_ASSERT(c2);
497 
498   GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
499                                  f.server, &s1, &call_details,
500                                  &request_metadata_recv, f.cq, f.cq, tag(101)));
501 
502   memset(ops, 0, sizeof(ops));
503   op = ops;
504   op->op = GRPC_OP_SEND_INITIAL_METADATA;
505   op->data.send_initial_metadata.count = 0;
506   op->flags = 0;
507   op->reserved = nullptr;
508   op++;
509   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
510   op->flags = 0;
511   op->reserved = nullptr;
512   op++;
513   error = grpc_call_start_batch(c1, ops, static_cast<size_t>(op - ops),
514                                 tag(301), nullptr);
515   GPR_ASSERT(GRPC_CALL_OK == error);
516 
517   memset(ops, 0, sizeof(ops));
518   op = ops;
519   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
520   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv1;
521   op->data.recv_status_on_client.status = &status1;
522   op->data.recv_status_on_client.status_details = &details1;
523   op->flags = 0;
524   op->reserved = nullptr;
525   op++;
526   op->op = GRPC_OP_RECV_INITIAL_METADATA;
527   op->data.recv_initial_metadata.recv_initial_metadata =
528       &initial_metadata_recv1;
529   op->flags = 0;
530   op->reserved = nullptr;
531   op++;
532   error = grpc_call_start_batch(c1, ops, static_cast<size_t>(op - ops),
533                                 tag(302), nullptr);
534   GPR_ASSERT(GRPC_CALL_OK == error);
535 
536   CQ_EXPECT_COMPLETION(cqv, tag(101), 1);
537   CQ_EXPECT_COMPLETION(cqv, tag(301), 1);
538   cq_verify(cqv);
539 
540   memset(ops, 0, sizeof(ops));
541   op = ops;
542   op->op = GRPC_OP_SEND_INITIAL_METADATA;
543   op->data.send_initial_metadata.count = 0;
544   op->flags = 0;
545   op->reserved = nullptr;
546   op++;
547   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
548   op->flags = 0;
549   op->reserved = nullptr;
550   op++;
551   error = grpc_call_start_batch(c2, ops, static_cast<size_t>(op - ops),
552                                 tag(401), nullptr);
553   GPR_ASSERT(GRPC_CALL_OK == error);
554 
555   memset(ops, 0, sizeof(ops));
556   op = ops;
557   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
558   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv2;
559   op->data.recv_status_on_client.status = &status2;
560   op->data.recv_status_on_client.status_details = &details2;
561   op->flags = 0;
562   op->reserved = nullptr;
563   op++;
564   op->op = GRPC_OP_RECV_INITIAL_METADATA;
565   op->data.recv_initial_metadata.recv_initial_metadata =
566       &initial_metadata_recv2;
567   op->flags = 0;
568   op->reserved = nullptr;
569   op++;
570   error = grpc_call_start_batch(c2, ops, static_cast<size_t>(op - ops),
571                                 tag(402), nullptr);
572   GPR_ASSERT(GRPC_CALL_OK == error);
573 
574   grpc_call_details_destroy(&call_details);
575   grpc_call_details_init(&call_details);
576   GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
577                                  f.server, &s2, &call_details,
578                                  &request_metadata_recv, f.cq, f.cq, tag(201)));
579 
580   CQ_EXPECT_COMPLETION(cqv, tag(302), 1);
581   /* first request is finished, we should be able to start the second */
582   CQ_EXPECT_COMPLETION(cqv, tag(401), 1);
583   CQ_EXPECT_COMPLETION(cqv, tag(201), 1);
584   cq_verify(cqv);
585 
586   memset(ops, 0, sizeof(ops));
587   op = ops;
588   op->op = GRPC_OP_SEND_INITIAL_METADATA;
589   op->data.send_initial_metadata.count = 0;
590   op->flags = 0;
591   op->reserved = nullptr;
592   op++;
593   op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
594   op->data.recv_close_on_server.cancelled = &was_cancelled;
595   op->flags = 0;
596   op->reserved = nullptr;
597   op++;
598   op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
599   op->data.send_status_from_server.trailing_metadata_count = 0;
600   op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED;
601   grpc_slice status_details = grpc_slice_from_static_string("xyz");
602   op->data.send_status_from_server.status_details = &status_details;
603   op->flags = 0;
604   op->reserved = nullptr;
605   op++;
606   error = grpc_call_start_batch(s2, ops, static_cast<size_t>(op - ops),
607                                 tag(202), nullptr);
608   GPR_ASSERT(GRPC_CALL_OK == error);
609 
610   CQ_EXPECT_COMPLETION(cqv, tag(402), 1);
611   CQ_EXPECT_COMPLETION(cqv, tag(202), 1);
612   cq_verify(cqv);
613 
614   cq_verifier_destroy(cqv);
615 
616   grpc_call_unref(c1);
617   grpc_call_unref(s1);
618   grpc_call_unref(c2);
619   grpc_call_unref(s2);
620 
621   grpc_slice_unref(details1);
622   grpc_slice_unref(details2);
623   grpc_metadata_array_destroy(&initial_metadata_recv1);
624   grpc_metadata_array_destroy(&trailing_metadata_recv1);
625   grpc_metadata_array_destroy(&initial_metadata_recv2);
626   grpc_metadata_array_destroy(&trailing_metadata_recv2);
627   grpc_metadata_array_destroy(&request_metadata_recv);
628   grpc_call_details_destroy(&call_details);
629 
630   end_test(&f);
631   config.tear_down_data(&f);
632 }
633 
test_max_concurrent_streams_with_timeout_on_second(grpc_end2end_test_config config)634 static void test_max_concurrent_streams_with_timeout_on_second(
635     grpc_end2end_test_config config) {
636   grpc_end2end_test_fixture f;
637   grpc_arg server_arg;
638   grpc_channel_args server_args;
639   grpc_call* c1;
640   grpc_call* c2;
641   grpc_call* s1;
642   cq_verifier* cqv;
643   grpc_call_details call_details;
644   grpc_metadata_array request_metadata_recv;
645   grpc_metadata_array initial_metadata_recv1;
646   grpc_metadata_array trailing_metadata_recv1;
647   grpc_metadata_array initial_metadata_recv2;
648   grpc_metadata_array trailing_metadata_recv2;
649   grpc_status_code status1;
650   grpc_call_error error;
651   grpc_slice details1 = grpc_empty_slice();
652   grpc_status_code status2;
653   grpc_slice details2 = grpc_empty_slice();
654   grpc_op ops[6];
655   grpc_op* op;
656   int was_cancelled;
657 
658   server_arg.key = const_cast<char*>(GRPC_ARG_MAX_CONCURRENT_STREAMS);
659   server_arg.type = GRPC_ARG_INTEGER;
660   server_arg.value.integer = 1;
661 
662   server_args.num_args = 1;
663   server_args.args = &server_arg;
664 
665   f = begin_test(config, "test_max_concurrent_streams_with_timeout_on_second",
666                  nullptr, &server_args);
667   cqv = cq_verifier_create(f.cq);
668 
669   grpc_metadata_array_init(&request_metadata_recv);
670   grpc_metadata_array_init(&initial_metadata_recv1);
671   grpc_metadata_array_init(&trailing_metadata_recv1);
672   grpc_metadata_array_init(&initial_metadata_recv2);
673   grpc_metadata_array_init(&trailing_metadata_recv2);
674   grpc_call_details_init(&call_details);
675 
676   /* perform a ping-pong to ensure that settings have had a chance to round
677      trip */
678   simple_request_body(config, f);
679   /* perform another one to make sure that the one stream case still works */
680   simple_request_body(config, f);
681 
682   /* start two requests - ensuring that the second is not accepted until
683      the first completes , and the second request will timeout in the
684      concurrent_list */
685   c1 = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS,
686                                 f.cq, grpc_slice_from_static_string("/alpha"),
687                                 nullptr, n_seconds_from_now(1000), nullptr);
688   GPR_ASSERT(c1);
689   c2 = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS,
690                                 f.cq, grpc_slice_from_static_string("/beta"),
691                                 nullptr, n_seconds_from_now(3), nullptr);
692   GPR_ASSERT(c2);
693 
694   GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
695                                  f.server, &s1, &call_details,
696                                  &request_metadata_recv, f.cq, f.cq, tag(101)));
697 
698   memset(ops, 0, sizeof(ops));
699   op = ops;
700   op->op = GRPC_OP_SEND_INITIAL_METADATA;
701   op->data.send_initial_metadata.count = 0;
702   op->flags = 0;
703   op->reserved = nullptr;
704   op++;
705   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
706   op->flags = 0;
707   op->reserved = nullptr;
708   op++;
709   error = grpc_call_start_batch(c1, ops, static_cast<size_t>(op - ops),
710                                 tag(301), nullptr);
711   GPR_ASSERT(GRPC_CALL_OK == error);
712 
713   memset(ops, 0, sizeof(ops));
714   op = ops;
715   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
716   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv1;
717   op->data.recv_status_on_client.status = &status1;
718   op->data.recv_status_on_client.status_details = &details1;
719   op->flags = 0;
720   op->reserved = nullptr;
721   op++;
722   op->op = GRPC_OP_RECV_INITIAL_METADATA;
723   op->data.recv_initial_metadata.recv_initial_metadata =
724       &initial_metadata_recv1;
725   op->flags = 0;
726   op->reserved = nullptr;
727   op++;
728   error = grpc_call_start_batch(c1, ops, static_cast<size_t>(op - ops),
729                                 tag(302), nullptr);
730   GPR_ASSERT(GRPC_CALL_OK == error);
731 
732   CQ_EXPECT_COMPLETION(cqv, tag(101), 1);
733   CQ_EXPECT_COMPLETION(cqv, tag(301), 1);
734   cq_verify(cqv);
735 
736   memset(ops, 0, sizeof(ops));
737   op = ops;
738   op->op = GRPC_OP_SEND_INITIAL_METADATA;
739   op->data.send_initial_metadata.count = 0;
740   op->flags = 0;
741   op->reserved = nullptr;
742   op++;
743   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
744   op->flags = 0;
745   op->reserved = nullptr;
746   op++;
747   error = grpc_call_start_batch(c2, ops, static_cast<size_t>(op - ops),
748                                 tag(401), nullptr);
749   GPR_ASSERT(GRPC_CALL_OK == error);
750 
751   memset(ops, 0, sizeof(ops));
752   op = ops;
753   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
754   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv2;
755   op->data.recv_status_on_client.status = &status2;
756   op->data.recv_status_on_client.status_details = &details2;
757   op->flags = 0;
758   op->reserved = nullptr;
759   op++;
760   op->op = GRPC_OP_RECV_INITIAL_METADATA;
761   op->data.recv_initial_metadata.recv_initial_metadata =
762       &initial_metadata_recv2;
763   op->flags = 0;
764   op->reserved = nullptr;
765   op++;
766   error = grpc_call_start_batch(c2, ops, static_cast<size_t>(op - ops),
767                                 tag(402), nullptr);
768   GPR_ASSERT(GRPC_CALL_OK == error);
769 
770   /* the second request is time out*/
771   CQ_EXPECT_COMPLETION(cqv, tag(401), 0);
772   CQ_EXPECT_COMPLETION(cqv, tag(402), 1);
773   cq_verify(cqv);
774 
775   /* second request is finished because of time out, so destroy the second call
776    */
777   grpc_call_unref(c2);
778 
779   /* now reply the first call */
780   memset(ops, 0, sizeof(ops));
781   op = ops;
782   op->op = GRPC_OP_SEND_INITIAL_METADATA;
783   op->data.send_initial_metadata.count = 0;
784   op->flags = 0;
785   op->reserved = nullptr;
786   op++;
787   op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
788   op->data.recv_close_on_server.cancelled = &was_cancelled;
789   op->flags = 0;
790   op->reserved = nullptr;
791   op++;
792   op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
793   op->data.send_status_from_server.trailing_metadata_count = 0;
794   op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED;
795   grpc_slice status_details = grpc_slice_from_static_string("xyz");
796   op->data.send_status_from_server.status_details = &status_details;
797   op->flags = 0;
798   op->reserved = nullptr;
799   op++;
800   error = grpc_call_start_batch(s1, ops, static_cast<size_t>(op - ops),
801                                 tag(102), nullptr);
802   GPR_ASSERT(GRPC_CALL_OK == error);
803 
804   CQ_EXPECT_COMPLETION(cqv, tag(302), 1);
805   CQ_EXPECT_COMPLETION(cqv, tag(102), 1);
806   cq_verify(cqv);
807 
808   cq_verifier_destroy(cqv);
809 
810   grpc_call_unref(c1);
811   grpc_call_unref(s1);
812 
813   grpc_slice_unref(details1);
814   grpc_slice_unref(details2);
815   grpc_metadata_array_destroy(&initial_metadata_recv1);
816   grpc_metadata_array_destroy(&trailing_metadata_recv1);
817   grpc_metadata_array_destroy(&initial_metadata_recv2);
818   grpc_metadata_array_destroy(&trailing_metadata_recv2);
819   grpc_metadata_array_destroy(&request_metadata_recv);
820   grpc_call_details_destroy(&call_details);
821 
822   end_test(&f);
823   config.tear_down_data(&f);
824 }
825 
max_concurrent_streams(grpc_end2end_test_config config)826 void max_concurrent_streams(grpc_end2end_test_config config) {
827   test_max_concurrent_streams_with_timeout_on_first(config);
828   test_max_concurrent_streams_with_timeout_on_second(config);
829   test_max_concurrent_streams(config);
830 }
831 
max_concurrent_streams_pre_init(void)832 void max_concurrent_streams_pre_init(void) {}
833