1 #region Copyright notice and license
2 
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #endregion
18 
19 using System;
20 using System.Diagnostics;
21 using System.IO;
22 using System.Runtime.CompilerServices;
23 using System.Runtime.InteropServices;
24 using System.Threading;
25 using System.Threading.Tasks;
26 using Grpc.Core.Internal;
27 using Grpc.Core.Utils;
28 
29 namespace Grpc.Core.Internal
30 {
31     /// <summary>
32     /// Manages server side native call lifecycle.
33     /// </summary>
34     internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest>, IReceivedCloseOnServerCallback, ISendStatusFromServerCompletionCallback
35     {
36         readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
37         readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
38         readonly Server server;
39 
AsyncCallServer(Action<TResponse, SerializationContext> serializer, Func<DeserializationContext, TRequest> deserializer, Server server)40         public AsyncCallServer(Action<TResponse, SerializationContext> serializer, Func<DeserializationContext, TRequest> deserializer, Server server) : base(serializer, deserializer)
41         {
42             this.server = GrpcPreconditions.CheckNotNull(server);
43         }
44 
Initialize(CallSafeHandle call, CompletionQueueSafeHandle completionQueue)45         public void Initialize(CallSafeHandle call, CompletionQueueSafeHandle completionQueue)
46         {
47             call.Initialize(completionQueue);
48 
49             server.AddCallReference(this);
50             InitializeInternal(call);
51         }
52 
53         /// <summary>
54         /// Only for testing purposes.
55         /// </summary>
InitializeForTesting(INativeCall call)56         public void InitializeForTesting(INativeCall call)
57         {
58             server.AddCallReference(this);
59             InitializeInternal(call);
60         }
61 
62         /// <summary>
63         /// Starts a server side call.
64         /// </summary>
ServerSideCallAsync()65         public Task ServerSideCallAsync()
66         {
67             lock (myLock)
68             {
69                 GrpcPreconditions.CheckNotNull(call);
70 
71                 started = true;
72 
73                 call.StartServerSide(ReceiveCloseOnServerCallback);
74                 return finishedServersideTcs.Task;
75             }
76         }
77 
78         /// <summary>
79         /// Sends a streaming response. Only one pending send action is allowed at any given time.
80         /// </summary>
SendMessageAsync(TResponse msg, WriteFlags writeFlags)81         public Task SendMessageAsync(TResponse msg, WriteFlags writeFlags)
82         {
83             return SendMessageInternalAsync(msg, writeFlags);
84         }
85 
86         /// <summary>
87         /// Receives a streaming request. Only one pending read action is allowed at any given time.
88         /// </summary>
ReadMessageAsync()89         public Task<TRequest> ReadMessageAsync()
90         {
91             return ReadMessageInternalAsync();
92         }
93 
94         /// <summary>
95         /// Initiates sending a initial metadata.
96         /// Even though C-core allows sending metadata in parallel to sending messages, we will treat sending metadata as a send message operation
97         /// to make things simpler.
98         /// </summary>
SendInitialMetadataAsync(Metadata headers)99         public Task SendInitialMetadataAsync(Metadata headers)
100         {
101             lock (myLock)
102             {
103                 GrpcPreconditions.CheckNotNull(headers, "metadata");
104 
105                 GrpcPreconditions.CheckState(started);
106                 GrpcPreconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call.");
107                 GrpcPreconditions.CheckState(streamingWritesCounter == 0, "Response headers can only be sent before the first write starts.");
108 
109                 var earlyResult = CheckSendAllowedOrEarlyResult();
110                 if (earlyResult != null)
111                 {
112                     return earlyResult;
113                 }
114 
115                 using (var metadataArray = MetadataArraySafeHandle.Create(headers))
116                 {
117                     call.StartSendInitialMetadata(SendCompletionCallback, metadataArray);
118                 }
119 
120                 this.initialMetadataSent = true;
121                 streamingWriteTcs = new TaskCompletionSource<object>();
122                 return streamingWriteTcs.Task;
123             }
124         }
125 
126         /// <summary>
127         /// Sends call result status, indicating we are done with writes.
128         /// Sending a status different from StatusCode.OK will also implicitly cancel the call.
129         /// </summary>
SendStatusFromServerAsync(Status status, Metadata trailers, ResponseWithFlags? optionalWrite)130         public Task SendStatusFromServerAsync(Status status, Metadata trailers, ResponseWithFlags? optionalWrite)
131         {
132             using (var serializationScope = DefaultSerializationContext.GetInitializedThreadLocalScope())
133             {
134                 var payload = optionalWrite.HasValue ? UnsafeSerialize(optionalWrite.Value.Response, serializationScope.Context) : SliceBufferSafeHandle.NullInstance;
135                 var writeFlags = optionalWrite.HasValue ? optionalWrite.Value.WriteFlags : default(WriteFlags);
136 
137                 lock (myLock)
138                 {
139                     GrpcPreconditions.CheckState(started);
140                     GrpcPreconditions.CheckState(!disposed);
141                     GrpcPreconditions.CheckState(!halfcloseRequested, "Can only send status from server once.");
142 
143                     using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
144                     {
145                         call.StartSendStatusFromServer(SendStatusFromServerCompletionCallback, status, metadataArray, !initialMetadataSent,
146                             payload, writeFlags);
147                     }
148                     halfcloseRequested = true;
149                     initialMetadataSent = true;
150                     sendStatusFromServerTcs = new TaskCompletionSource<object>();
151                     if (optionalWrite.HasValue)
152                     {
153                         streamingWritesCounter++;
154                     }
155                     return sendStatusFromServerTcs.Task;
156                 }
157             }
158         }
159 
160         /// <summary>
161         /// Gets cancellation token that gets cancelled once close completion
162         /// is received and the cancelled flag is set.
163         /// </summary>
164         public CancellationToken CancellationToken
165         {
166             get
167             {
168                 return cancellationTokenSource.Token;
169             }
170         }
171 
172         public string Peer
173         {
174             get
175             {
176                 return call.GetPeer();
177             }
178         }
179 
180         protected override bool IsClient
181         {
182             get { return false; }
183         }
184 
GetRpcExceptionClientOnly()185         protected override Exception GetRpcExceptionClientOnly()
186         {
187             throw new InvalidOperationException("Call be only called for client calls");
188         }
189 
OnAfterReleaseResourcesLocked()190         protected override void OnAfterReleaseResourcesLocked()
191         {
192             server.RemoveCallReference(this);
193         }
194 
CheckSendAllowedOrEarlyResult()195         protected override Task CheckSendAllowedOrEarlyResult()
196         {
197             GrpcPreconditions.CheckState(!halfcloseRequested, "Response stream has already been completed.");
198             GrpcPreconditions.CheckState(!finished, "Already finished.");
199             GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time");
200             GrpcPreconditions.CheckState(!disposed);
201 
202             return null;
203         }
204 
205         /// <summary>
206         /// Handles the server side close completion.
207         /// </summary>
HandleFinishedServerside(bool success, bool cancelled)208         private void HandleFinishedServerside(bool success, bool cancelled)
209         {
210             // NOTE: because this event is a result of batch containing GRPC_OP_RECV_CLOSE_ON_SERVER,
211             // success will be always set to true.
212             bool releasedResources;
213             lock (myLock)
214             {
215                 finished = true;
216                 if (streamingReadTcs == null)
217                 {
218                     // if there's no pending read, readingDone=true will dispose now.
219                     // if there is a pending read, we will dispose once that read finishes.
220                     readingDone = true;
221                     streamingReadTcs = new TaskCompletionSource<TRequest>();
222                     streamingReadTcs.SetResult(default(TRequest));
223                 }
224                 releasedResources = ReleaseResourcesIfPossible();
225             }
226 
227             if (releasedResources)
228             {
229                 OnAfterReleaseResourcesUnlocked();
230             }
231 
232             if (cancelled)
233             {
234                 cancellationTokenSource.Cancel();
235             }
236 
237             finishedServersideTcs.SetResult(null);
238         }
239 
240         IReceivedCloseOnServerCallback ReceiveCloseOnServerCallback => this;
241 
IReceivedCloseOnServerCallback.OnReceivedCloseOnServer(bool success, bool cancelled)242         void IReceivedCloseOnServerCallback.OnReceivedCloseOnServer(bool success, bool cancelled)
243         {
244             HandleFinishedServerside(success, cancelled);
245         }
246 
247         ISendStatusFromServerCompletionCallback SendStatusFromServerCompletionCallback => this;
248 
ISendStatusFromServerCompletionCallback.OnSendStatusFromServerCompletion(bool success)249         void ISendStatusFromServerCompletionCallback.OnSendStatusFromServerCompletion(bool success)
250         {
251             HandleSendStatusFromServerFinished(success);
252         }
253 
254         public struct ResponseWithFlags
255         {
ResponseWithFlagsGrpc.Core.Internal.AsyncCallServer.ResponseWithFlags256             public ResponseWithFlags(TResponse response, WriteFlags writeFlags)
257             {
258                 this.Response = response;
259                 this.WriteFlags = writeFlags;
260             }
261 
262             public TResponse Response { get; }
263             public WriteFlags WriteFlags { get; }
264         }
265     }
266 }
267