1 #region Copyright notice and license
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 #endregion
16 
17 using System;
18 using System.Collections.Generic;
19 using System.Threading;
20 using System.Threading.Tasks;
21 
22 using Grpc.Core.Internal;
23 using Grpc.Core.Logging;
24 using Grpc.Core.Utils;
25 
26 namespace Grpc.Core
27 {
28     /// <summary>
29     /// Represents a gRPC channel. Channels are an abstraction of long-lived connections to remote servers.
30     /// More client objects can reuse the same channel. Creating a channel is an expensive operation compared to invoking
31     /// a remote call so in general you should reuse a single channel for as many calls as possible.
32     /// </summary>
33     public class Channel : ChannelBase
34     {
35         static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Channel>();
36 
37         readonly object myLock = new object();
38         readonly AtomicCounter activeCallCounter = new AtomicCounter();
39         readonly CancellationTokenSource shutdownTokenSource = new CancellationTokenSource();
40 
41         readonly GrpcEnvironment environment;
42         readonly CompletionQueueSafeHandle completionQueue;
43         readonly ChannelSafeHandle handle;
44         readonly Dictionary<string, ChannelOption> options;
45 
46         bool shutdownRequested;
47 
48         /// <summary>
49         /// Creates a channel that connects to a specific host.
50         /// Port will default to 80 for an unsecure channel and to 443 for a secure channel.
51         /// </summary>
52         /// <param name="target">Target of the channel.</param>
53         /// <param name="credentials">Credentials to secure the channel.</param>
Channel(string target, ChannelCredentials credentials)54         public Channel(string target, ChannelCredentials credentials) :
55             this(target, credentials, null)
56         {
57         }
58 
59         /// <summary>
60         /// Creates a channel that connects to a specific host.
61         /// Port will default to 80 for an unsecure channel or to 443 for a secure channel.
62         /// </summary>
63         /// <param name="target">Target of the channel.</param>
64         /// <param name="credentials">Credentials to secure the channel.</param>
65         /// <param name="options">Channel options.</param>
Channel(string target, ChannelCredentials credentials, IEnumerable<ChannelOption> options)66         public Channel(string target, ChannelCredentials credentials, IEnumerable<ChannelOption> options) : base(target)
67         {
68             this.options = CreateOptionsDictionary(options);
69             EnsureUserAgentChannelOption(this.options);
70             this.environment = GrpcEnvironment.AddRef();
71 
72             this.completionQueue = this.environment.PickCompletionQueue();
73             using (var nativeChannelArgs = ChannelOptions.CreateChannelArgs(this.options.Values))
74             {
75                 var nativeCredentials = credentials.ToNativeCredentials();
76                 if (nativeCredentials != null)
77                 {
78                     this.handle = ChannelSafeHandle.CreateSecure(nativeCredentials, target, nativeChannelArgs);
79                 }
80                 else
81                 {
82                     this.handle = ChannelSafeHandle.CreateInsecure(target, nativeChannelArgs);
83                 }
84             }
85             GrpcEnvironment.RegisterChannel(this);
86         }
87 
88         /// <summary>
89         /// Creates a channel that connects to a specific host and port.
90         /// </summary>
91         /// <param name="host">The name or IP address of the host.</param>
92         /// <param name="port">The port.</param>
93         /// <param name="credentials">Credentials to secure the channel.</param>
Channel(string host, int port, ChannelCredentials credentials)94         public Channel(string host, int port, ChannelCredentials credentials) :
95             this(host, port, credentials, null)
96         {
97         }
98 
99         /// <summary>
100         /// Creates a channel that connects to a specific host and port.
101         /// </summary>
102         /// <param name="host">The name or IP address of the host.</param>
103         /// <param name="port">The port.</param>
104         /// <param name="credentials">Credentials to secure the channel.</param>
105         /// <param name="options">Channel options.</param>
Channel(string host, int port, ChannelCredentials credentials, IEnumerable<ChannelOption> options)106         public Channel(string host, int port, ChannelCredentials credentials, IEnumerable<ChannelOption> options) :
107             this(string.Format("{0}:{1}", host, port), credentials, options)
108         {
109         }
110 
111         /// <summary>
112         /// Gets current connectivity state of this channel.
113         /// After channel has been shutdown, <c>ChannelState.Shutdown</c> will be returned.
114         /// </summary>
115         public ChannelState State
116         {
117             get
118             {
119                 return GetConnectivityState(false);
120             }
121         }
122 
123         // cached handler for watch connectivity state
124         static readonly BatchCompletionDelegate WatchConnectivityStateHandler = (success, ctx, state) =>
125         {
126             var tcs = (TaskCompletionSource<bool>) state;
127             tcs.SetResult(success);
128         };
129 
130         /// <summary>
131         /// Returned tasks completes once channel state has become different from
132         /// given lastObservedState.
133         /// If deadline is reached or an error occurs, returned task is cancelled.
134         /// </summary>
WaitForStateChangedAsync(ChannelState lastObservedState, DateTime? deadline = null)135         public async Task WaitForStateChangedAsync(ChannelState lastObservedState, DateTime? deadline = null)
136         {
137             var result = await TryWaitForStateChangedAsync(lastObservedState, deadline).ConfigureAwait(false);
138             if (!result)
139             {
140                 throw new TaskCanceledException("Reached deadline.");
141             }
142         }
143 
144         /// <summary>
145         /// Returned tasks completes once channel state has become different from
146         /// given lastObservedState (<c>true</c> is returned) or if the wait has timed out (<c>false</c> is returned).
147         /// </summary>
TryWaitForStateChangedAsync(ChannelState lastObservedState, DateTime? deadline = null)148         public Task<bool> TryWaitForStateChangedAsync(ChannelState lastObservedState, DateTime? deadline = null)
149         {
150             GrpcPreconditions.CheckArgument(lastObservedState != ChannelState.Shutdown,
151                 "Shutdown is a terminal state. No further state changes can occur.");
152             var tcs = new TaskCompletionSource<bool>();
153             var deadlineTimespec = deadline.HasValue ? Timespec.FromDateTime(deadline.Value) : Timespec.InfFuture;
154             lock (myLock)
155             {
156                 if (handle.IsClosed)
157                 {
158                     // If channel has been already shutdown and handle was disposed, we would end up with
159                     // an abandoned completion added to the completion registry. Instead, we make sure we fail early.
160                     throw new ObjectDisposedException(nameof(handle), "Channel handle has already been disposed.");
161                 }
162                 else
163                 {
164                     // pass "tcs" as "state" for WatchConnectivityStateHandler.
165                     handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, WatchConnectivityStateHandler, tcs);
166                 }
167             }
168             return tcs.Task;
169         }
170 
171         /// <summary>Resolved address of the remote endpoint in URI format.</summary>
172         public string ResolvedTarget
173         {
174             get
175             {
176                 return handle.GetTarget();
177             }
178         }
179 
180         /// <summary>
181         /// Returns a token that gets cancelled once <c>ShutdownAsync</c> is invoked.
182         /// </summary>
183         public CancellationToken ShutdownToken
184         {
185             get
186             {
187                 return this.shutdownTokenSource.Token;
188             }
189         }
190 
191         /// <summary>
192         /// Allows explicitly requesting channel to connect without starting an RPC.
193         /// Returned task completes once state Ready was seen. If the deadline is reached,
194         /// or channel enters the Shutdown state, the task is cancelled.
195         /// There is no need to call this explicitly unless your use case requires that.
196         /// Starting an RPC on a new channel will request connection implicitly.
197         /// </summary>
198         /// <param name="deadline">The deadline. <c>null</c> indicates no deadline.</param>
199         public async Task ConnectAsync(DateTime? deadline = null)
200         {
201             var currentState = GetConnectivityState(true);
202             while (currentState != ChannelState.Ready)
203             {
204                 if (currentState == ChannelState.Shutdown)
205                 {
206                     throw new OperationCanceledException("Channel has reached Shutdown state.");
207                 }
208                 await WaitForStateChangedAsync(currentState, deadline).ConfigureAwait(false);
209                 currentState = GetConnectivityState(false);
210             }
211         }
212 
213         /// <summary>Provides implementation of a non-virtual public member.</summary>
ShutdownAsyncCore()214         protected override async Task ShutdownAsyncCore()
215         {
216             lock (myLock)
217             {
218                 GrpcPreconditions.CheckState(!shutdownRequested);
219                 shutdownRequested = true;
220             }
221             GrpcEnvironment.UnregisterChannel(this);
222 
223             shutdownTokenSource.Cancel();
224 
225             var activeCallCount = activeCallCounter.Count;
226             if (activeCallCount > 0)
227             {
228                 Logger.Warning("Channel shutdown was called but there are still {0} active calls for that channel.", activeCallCount);
229             }
230 
231             lock (myLock)
232             {
233                 handle.Dispose();
234             }
235 
236             await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false);
237         }
238 
239         /// <summary>
240         /// Create a new <see cref="CallInvoker"/> for the channel.
241         /// </summary>
242         /// <returns>A new <see cref="CallInvoker"/>.</returns>
CreateCallInvoker()243         public override CallInvoker CreateCallInvoker()
244         {
245             return new DefaultCallInvoker(this);
246         }
247 
248         internal ChannelSafeHandle Handle
249         {
250             get
251             {
252                 return this.handle;
253             }
254         }
255 
256         internal GrpcEnvironment Environment
257         {
258             get
259             {
260                 return this.environment;
261             }
262         }
263 
264         internal CompletionQueueSafeHandle CompletionQueue
265         {
266             get
267             {
268                 return this.completionQueue;
269             }
270         }
271 
AddCallReference(object call)272         internal void AddCallReference(object call)
273         {
274             activeCallCounter.Increment();
275 
276             bool success = false;
277             handle.DangerousAddRef(ref success);
278             GrpcPreconditions.CheckState(success);
279         }
280 
RemoveCallReference(object call)281         internal void RemoveCallReference(object call)
282         {
283             handle.DangerousRelease();
284 
285             activeCallCounter.Decrement();
286         }
287 
288         // for testing only
GetCallReferenceCount()289         internal long GetCallReferenceCount()
290         {
291             return activeCallCounter.Count;
292         }
293 
GetConnectivityState(bool tryToConnect)294         private ChannelState GetConnectivityState(bool tryToConnect)
295         {
296             try
297             {
298                 lock (myLock)
299                 {
300                     return handle.CheckConnectivityState(tryToConnect);
301                 }
302             }
303             catch (ObjectDisposedException)
304             {
305                 return ChannelState.Shutdown;
306             }
307         }
308 
EnsureUserAgentChannelOption(Dictionary<string, ChannelOption> options)309         private static void EnsureUserAgentChannelOption(Dictionary<string, ChannelOption> options)
310         {
311             var key = ChannelOptions.PrimaryUserAgentString;
312             var userAgentString = "";
313 
314             ChannelOption option;
315             if (options.TryGetValue(key, out option))
316             {
317                 // user-provided userAgentString needs to be at the beginning
318                 userAgentString = option.StringValue + " ";
319             };
320 
321             userAgentString += UserAgentStringProvider.DefaultInstance.GrpcCsharpUserAgentString;
322 
323             options[ChannelOptions.PrimaryUserAgentString] = new ChannelOption(key, userAgentString);
324         }
325 
CreateOptionsDictionary(IEnumerable<ChannelOption> options)326         private static Dictionary<string, ChannelOption> CreateOptionsDictionary(IEnumerable<ChannelOption> options)
327         {
328             var dict = new Dictionary<string, ChannelOption>();
329             if (options == null)
330             {
331                 return dict;
332             }
333             foreach (var option in options)
334             {
335                 dict.Add(option.Name, option);
336             }
337             return dict;
338         }
339     }
340 }
341