1 #region Copyright notice and license
2 
3 // Copyright 2015-2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #endregion
18 
19 using System;
20 using System.Collections.Generic;
21 using System.Linq;
22 using System.Threading;
23 using System.Threading.Tasks;
24 using Google.Protobuf;
25 using Grpc.Core;
26 using Grpc.Core.Utils;
27 
28 namespace Grpc.Testing
29 {
30     /// <summary>
31     /// Implementation of TestService server
32     /// </summary>
33     public class TestServiceImpl : TestService.TestServiceBase
34     {
EmptyCall(Empty request, ServerCallContext context)35         public override Task<Empty> EmptyCall(Empty request, ServerCallContext context)
36         {
37             return Task.FromResult(new Empty());
38         }
39 
UnaryCall(SimpleRequest request, ServerCallContext context)40         public override async Task<SimpleResponse> UnaryCall(SimpleRequest request, ServerCallContext context)
41         {
42             await EnsureEchoMetadataAsync(context);
43             EnsureEchoStatus(request.ResponseStatus, context);
44 
45             var response = new SimpleResponse { Payload = CreateZerosPayload(request.ResponseSize) };
46             return response;
47         }
48 
StreamingOutputCall(StreamingOutputCallRequest request, IServerStreamWriter<StreamingOutputCallResponse> responseStream, ServerCallContext context)49         public override async Task StreamingOutputCall(StreamingOutputCallRequest request, IServerStreamWriter<StreamingOutputCallResponse> responseStream, ServerCallContext context)
50         {
51             await EnsureEchoMetadataAsync(context);
52             EnsureEchoStatus(request.ResponseStatus, context);
53 
54             foreach (var responseParam in request.ResponseParameters)
55             {
56                 var response = new StreamingOutputCallResponse { Payload = CreateZerosPayload(responseParam.Size) };
57                 await responseStream.WriteAsync(response);
58             }
59         }
60 
StreamingInputCall(IAsyncStreamReader<StreamingInputCallRequest> requestStream, ServerCallContext context)61         public override async Task<StreamingInputCallResponse> StreamingInputCall(IAsyncStreamReader<StreamingInputCallRequest> requestStream, ServerCallContext context)
62         {
63             await EnsureEchoMetadataAsync(context);
64 
65             int sum = 0;
66             await requestStream.ForEachAsync(request =>
67             {
68                 sum += request.Payload.Body.Length;
69                 return TaskUtils.CompletedTask;
70             });
71             return new StreamingInputCallResponse { AggregatedPayloadSize = sum };
72         }
73 
FullDuplexCall(IAsyncStreamReader<StreamingOutputCallRequest> requestStream, IServerStreamWriter<StreamingOutputCallResponse> responseStream, ServerCallContext context)74         public override async Task FullDuplexCall(IAsyncStreamReader<StreamingOutputCallRequest> requestStream, IServerStreamWriter<StreamingOutputCallResponse> responseStream, ServerCallContext context)
75         {
76             await EnsureEchoMetadataAsync(context);
77 
78             await requestStream.ForEachAsync(async request =>
79             {
80                 EnsureEchoStatus(request.ResponseStatus, context);
81                 foreach (var responseParam in request.ResponseParameters)
82                 {
83                     var response = new StreamingOutputCallResponse { Payload = CreateZerosPayload(responseParam.Size) };
84                     await responseStream.WriteAsync(response);
85                 }
86             });
87         }
88 
HalfDuplexCall(IAsyncStreamReader<StreamingOutputCallRequest> requestStream, IServerStreamWriter<StreamingOutputCallResponse> responseStream, ServerCallContext context)89         public override Task HalfDuplexCall(IAsyncStreamReader<StreamingOutputCallRequest> requestStream, IServerStreamWriter<StreamingOutputCallResponse> responseStream, ServerCallContext context)
90         {
91             throw new NotImplementedException();
92         }
93 
CreateZerosPayload(int size)94         private static Payload CreateZerosPayload(int size)
95         {
96             return new Payload { Body = ByteString.CopyFrom(new byte[size]) };
97         }
98 
EnsureEchoMetadataAsync(ServerCallContext context)99         private static async Task EnsureEchoMetadataAsync(ServerCallContext context)
100         {
101             var echoInitialList = context.RequestHeaders.Where((entry) => entry.Key == "x-grpc-test-echo-initial").ToList();
102             if (echoInitialList.Any()) {
103                 var entry = echoInitialList.Single();
104                 await context.WriteResponseHeadersAsync(new Metadata { entry });
105             }
106 
107             var echoTrailingList = context.RequestHeaders.Where((entry) => entry.Key == "x-grpc-test-echo-trailing-bin").ToList();
108             if (echoTrailingList.Any()) {
109                 context.ResponseTrailers.Add(echoTrailingList.Single());
110             }
111         }
112 
EnsureEchoStatus(EchoStatus responseStatus, ServerCallContext context)113         private static void EnsureEchoStatus(EchoStatus responseStatus, ServerCallContext context)
114         {
115             if (responseStatus != null)
116             {
117                 var statusCode = (StatusCode)responseStatus.Code;
118                 context.Status = new Status(statusCode, responseStatus.Message);
119             }
120         }
121     }
122 }
123