// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 using System;
19 using System.Collections.Generic;
20 using System.Diagnostics;
21 using System.IO;
22 using System.Linq;
23 using System.Net;
24 using System.Net.Security;
25 using System.Security.Cryptography.X509Certificates;
26 using System.Threading;
27 using System.Threading.Tasks;
28 using Microsoft.Extensions.Logging;
29 using Microsoft.Extensions.DependencyInjection;
30 using Thrift;
31 using Thrift.Protocol;
32 using Thrift.Transport;
33 using Thrift.Transport.Client;
34 using tutorial;
35 using shared;
36 
37 namespace Client
38 {
39     public class Program
40     {
41         private static ServiceCollection ServiceCollection = new ServiceCollection();
42         private static ILogger Logger;
43         private static readonly TConfiguration Configuration = null;  // new TConfiguration() if  needed
44 
DisplayHelp()45         private static void DisplayHelp()
46         {
47             Logger.LogInformation(@"
48 Usage:
49     Client -help
50         will diplay help information
51 
52     Client -tr:<transport> -bf:<buffering> -pr:<protocol> -mc:<numClients>
53         will run client with specified arguments (tcp transport and binary protocol by default) and with 1 client
54 
55 Options:
56     -tr (transport):
57         tcp - (default) tcp transport will be used (host - ""localhost"", port - 9090)
58         namedpipe - namedpipe transport will be used (pipe address - "".test"")
59         http - http transport will be used (address - ""http://localhost:9090"")
60         tcptls - tcp tls transport will be used (host - ""localhost"", port - 9090)
61 
62     -bf (buffering):
63         none - (default) no buffering will be used
64         buffered - buffered transport will be used
65         framed - framed transport will be used
66 
67     -pr (protocol):
68         binary - (default) binary protocol will be used
69         compact - compact protocol will be used
70         json - json protocol will be used
71         multiplexed - multiplexed protocol will be used
72 
73     -mc (multiple clients):
74         <numClients> - number of multiple clients to connect to server (max 100, default 1)
75 
76 Sample:
77     Client -tr:tcp -pr:binary
78 ");
79         }
80 
Main(string[] args)81         public static void Main(string[] args)
82         {
83             args = args ?? new string[0];
84 
85             ServiceCollection.AddLogging(logging => ConfigureLogging(logging));
86             using (var serviceProvider = ServiceCollection.BuildServiceProvider())
87             {
88                 Logger = serviceProvider.GetService<ILoggerFactory>().CreateLogger(nameof(Client));
89 
90                 if (args.Any(x => x.StartsWith("-help", StringComparison.OrdinalIgnoreCase)))
91                 {
92                     DisplayHelp();
93                     return;
94                 }
95 
96                 Logger.LogInformation("Starting client...");
97 
98                 using (var source = new CancellationTokenSource())
99                 {
100                     RunAsync(args, source.Token).GetAwaiter().GetResult();
101                 }
102             }
103         }
104 
ConfigureLogging(ILoggingBuilder logging)105         private static void ConfigureLogging(ILoggingBuilder logging)
106         {
107             logging.SetMinimumLevel(LogLevel.Trace);
108             logging.AddConsole();
109             logging.AddDebug();
110         }
111 
RunAsync(string[] args, CancellationToken cancellationToken)112         private static async Task RunAsync(string[] args, CancellationToken cancellationToken)
113         {
114             var numClients = GetNumberOfClients(args);
115 
116             Logger.LogInformation($"Selected # of clients: {numClients}");
117 
118             var transports = new TTransport[numClients];
119             for (int i = 0; i < numClients; i++)
120             {
121                 var t = GetTransport(args);
122                 transports[i] = t;
123             }
124 
125             Logger.LogInformation($"Selected client transport: {transports[0]}");
126 
127             var protocols = new Tuple<Protocol, TProtocol>[numClients];
128             for (int i = 0; i < numClients; i++)
129             {
130                 var p = GetProtocol(args, transports[i]);
131                 protocols[i] = p;
132             }
133 
134             Logger.LogInformation($"Selected client protocol: {protocols[0].Item1}");
135 
136             var tasks = new Task[numClients];
137             for (int i = 0; i < numClients; i++)
138             {
139                 var task = RunClientAsync(protocols[i], cancellationToken);
140                 tasks[i] = task;
141             }
142 
143             Task.WaitAll(tasks);
144 
145             await Task.CompletedTask;
146         }
147 
GetTransport(string[] args)148         private static TTransport GetTransport(string[] args)
149         {
150             TTransport transport = new TSocketTransport(IPAddress.Loopback, 9090, Configuration);
151 
152             // construct endpoint transport
153             var transportArg = args.FirstOrDefault(x => x.StartsWith("-tr"))?.Split(':')?[1];
154             if (Enum.TryParse(transportArg, true, out Transport selectedTransport))
155             {
156                 switch (selectedTransport)
157                 {
158                     case Transport.Tcp:
159                         transport = new TSocketTransport(IPAddress.Loopback, 9090, Configuration);
160                         break;
161 
162                     case Transport.NamedPipe:
163                         transport = new TNamedPipeTransport(".test", Configuration);
164                         break;
165 
166                     case Transport.Http:
167                         transport = new THttpTransport(new Uri("http://localhost:9090"), Configuration);
168                         break;
169 
170                     case Transport.TcpTls:
171                         transport = new TTlsSocketTransport(IPAddress.Loopback, 9090, Configuration,
172                             GetCertificate(), CertValidator, LocalCertificateSelectionCallback);
173                         break;
174 
175                     default:
176                         Debug.Assert(false, "unhandled case");
177                         break;
178                 }
179             }
180 
181             // optionally add layered transport(s)
182             var bufferingArg = args.FirstOrDefault(x => x.StartsWith("-bf"))?.Split(':')?[1];
183             if (Enum.TryParse<Buffering>(bufferingArg, out var selectedBuffering))
184             {
185                 switch (selectedBuffering)
186                 {
187                     case Buffering.Buffered:
188                         transport = new TBufferedTransport(transport);
189                         break;
190 
191                     case Buffering.Framed:
192                         transport = new TFramedTransport(transport);
193                         break;
194 
195                     default: // layered transport(s) are optional
196                         Debug.Assert(selectedBuffering == Buffering.None, "unhandled case");
197                         break;
198                 }
199             }
200 
201             return transport;
202         }
203 
GetNumberOfClients(string[] args)204         private static int GetNumberOfClients(string[] args)
205         {
206             var numClients = args.FirstOrDefault(x => x.StartsWith("-mc"))?.Split(':')?[1];
207 
208             Logger.LogInformation($"Selected # of clients: {numClients}");
209 
210             int c;
211             if( int.TryParse(numClients, out c) && (0 < c) && (c <= 100))
212 				return c;
213 			else
214 				return 1;
215         }
216 
GetCertificate()217         private static X509Certificate2 GetCertificate()
218         {
219             // due to files location in net core better to take certs from top folder
220             var certFile = GetCertPath(Directory.GetParent(Directory.GetCurrentDirectory()));
221             return new X509Certificate2(certFile, "ThriftTest");
222         }
223 
GetCertPath(DirectoryInfo di, int maxCount = 6)224         private static string GetCertPath(DirectoryInfo di, int maxCount = 6)
225         {
226             var topDir = di;
227             var certFile =
228                 topDir.EnumerateFiles("ThriftTest.pfx", SearchOption.AllDirectories)
229                     .FirstOrDefault();
230             if (certFile == null)
231             {
232                 if (maxCount == 0)
233                     throw new FileNotFoundException("Cannot find file in directories");
234                 return GetCertPath(di.Parent, maxCount - 1);
235             }
236 
237             return certFile.FullName;
238         }
239 
LocalCertificateSelectionCallback(object sender, string targetHost, X509CertificateCollection localCertificates, X509Certificate remoteCertificate, string[] acceptableIssuers)240         private static X509Certificate LocalCertificateSelectionCallback(object sender,
241             string targetHost, X509CertificateCollection localCertificates,
242             X509Certificate remoteCertificate, string[] acceptableIssuers)
243         {
244             return GetCertificate();
245         }
246 
CertValidator(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)247         private static bool CertValidator(object sender, X509Certificate certificate,
248             X509Chain chain, SslPolicyErrors sslPolicyErrors)
249         {
250             return true;
251         }
252 
GetProtocol(string[] args, TTransport transport)253         private static Tuple<Protocol, TProtocol> GetProtocol(string[] args, TTransport transport)
254         {
255             var protocol = args.FirstOrDefault(x => x.StartsWith("-pr"))?.Split(':')?[1];
256 
257             Protocol selectedProtocol;
258             if (Enum.TryParse(protocol, true, out selectedProtocol))
259             {
260                 switch (selectedProtocol)
261                 {
262                     case Protocol.Binary:
263                         return new Tuple<Protocol, TProtocol>(selectedProtocol, new TBinaryProtocol(transport));
264                     case Protocol.Compact:
265                         return new Tuple<Protocol, TProtocol>(selectedProtocol, new TCompactProtocol(transport));
266                     case Protocol.Json:
267                         return new Tuple<Protocol, TProtocol>(selectedProtocol, new TJsonProtocol(transport));
268                     case Protocol.Multiplexed:
269                         // it returns BinaryProtocol to avoid making wrapped protocol as public in TProtocolDecorator (in RunClientAsync it will be wrapped into Multiplexed protocol)
270                         return new Tuple<Protocol, TProtocol>(selectedProtocol, new TBinaryProtocol(transport));
271                     default:
272                         Debug.Assert(false, "unhandled case");
273                         break;
274                 }
275             }
276 
277             return new Tuple<Protocol, TProtocol>(selectedProtocol, new TBinaryProtocol(transport));
278         }
279 
RunClientAsync(Tuple<Protocol, TProtocol> protocolTuple, CancellationToken cancellationToken)280         private static async Task RunClientAsync(Tuple<Protocol, TProtocol> protocolTuple, CancellationToken cancellationToken)
281         {
282             try
283             {
284                 var protocol = protocolTuple.Item2;
285                 var protocolType = protocolTuple.Item1;
286 
287                 TBaseClient client = null;
288 
289                 try
290                 {
291                     if (protocolType != Protocol.Multiplexed)
292                     {
293 
294                         client = new Calculator.Client(protocol);
295                         await ExecuteCalculatorClientOperations(cancellationToken, (Calculator.Client)client);
296                     }
297                     else
298                     {
299                         // it uses binary protocol there  to create Multiplexed protocols
300                         var multiplex = new TMultiplexedProtocol(protocol, nameof(Calculator));
301                         client = new Calculator.Client(multiplex);
302                         await ExecuteCalculatorClientOperations(cancellationToken, (Calculator.Client)client);
303 
304                         multiplex = new TMultiplexedProtocol(protocol, nameof(SharedService));
305                         client = new SharedService.Client(multiplex);
306                         await ExecuteSharedServiceClientOperations(cancellationToken, (SharedService.Client)client);
307                     }
308                 }
309                 catch (Exception ex)
310                 {
311                     Logger.LogError($"{client?.ClientId} " + ex);
312                 }
313                 finally
314                 {
315                     protocol.Transport.Close();
316                 }
317             }
318             catch (TApplicationException x)
319             {
320                 Logger.LogError(x.ToString());
321             }
322         }
323 
ExecuteCalculatorClientOperations(CancellationToken cancellationToken, Calculator.Client client)324         private static async Task ExecuteCalculatorClientOperations(CancellationToken cancellationToken, Calculator.Client client)
325         {
326             await client.OpenTransportAsync(cancellationToken);
327 
328             // Async version
329 
330             Logger.LogInformation($"{client.ClientId} PingAsync()");
331             await client.pingAsync(cancellationToken);
332 
333             Logger.LogInformation($"{client.ClientId} AddAsync(1,1)");
334             var sum = await client.addAsync(1, 1, cancellationToken);
335             Logger.LogInformation($"{client.ClientId} AddAsync(1,1)={sum}");
336 
337             var work = new Work
338             {
339                 Op = Operation.DIVIDE,
340                 Num1 = 1,
341                 Num2 = 0
342             };
343 
344             try
345             {
346                 Logger.LogInformation($"{client.ClientId} CalculateAsync(1)");
347                 await client.calculateAsync(1, work, cancellationToken);
348                 Logger.LogInformation($"{client.ClientId} Whoa we can divide by 0");
349             }
350             catch (InvalidOperation io)
351             {
352                 Logger.LogInformation($"{client.ClientId} Invalid operation: " + io);
353             }
354 
355             work.Op = Operation.SUBTRACT;
356             work.Num1 = 15;
357             work.Num2 = 10;
358 
359             try
360             {
361                 Logger.LogInformation($"{client.ClientId} CalculateAsync(1)");
362                 var diff = await client.calculateAsync(1, work, cancellationToken);
363                 Logger.LogInformation($"{client.ClientId} 15-10={diff}");
364             }
365             catch (InvalidOperation io)
366             {
367                 Logger.LogInformation($"{client.ClientId} Invalid operation: " + io);
368             }
369 
370             Logger.LogInformation($"{client.ClientId} GetStructAsync(1)");
371             var log = await client.getStructAsync(1, cancellationToken);
372             Logger.LogInformation($"{client.ClientId} Check log: {log.Value}");
373 
374             Logger.LogInformation($"{client.ClientId} ZipAsync() with delay 100mc on server side");
375             await client.zipAsync(cancellationToken);
376         }
ExecuteSharedServiceClientOperations(CancellationToken cancellationToken, SharedService.Client client)377         private static async Task ExecuteSharedServiceClientOperations(CancellationToken cancellationToken, SharedService.Client client)
378         {
379             await client.OpenTransportAsync(cancellationToken);
380 
381             // Async version
382 
383             Logger.LogInformation($"{client.ClientId} SharedService GetStructAsync(1)");
384             var log = await client.getStructAsync(1, cancellationToken);
385             Logger.LogInformation($"{client.ClientId} SharedService Value: {log.Value}");
386         }
387 
388 
        // Endpoint transports selectable with the -tr option (parsed case-insensitively in GetTransport).
        private enum Transport
        {
            Tcp,         // TSocketTransport to loopback:9090
            NamedPipe,   // TNamedPipeTransport on pipe ".test"
            Http,        // THttpTransport to http://localhost:9090
            TcpBuffered, // no dedicated case in GetTransport: hits its default (assert) branch
            Framed,      // no dedicated case in GetTransport: hits its default (assert) branch
            TcpTls       // TTlsSocketTransport to loopback:9090
        }
398 
        // Protocols selectable with the -pr option (parsed case-insensitively in GetProtocol).
        private enum Protocol
        {
            Binary,      // TBinaryProtocol
            Compact,     // TCompactProtocol
            Json,        // TJsonProtocol
            Multiplexed  // TBinaryProtocol from GetProtocol, wrapped in TMultiplexedProtocol by RunClientAsync
        }
406 
        // Optional layered transports selectable with the -bf option (see GetTransport).
        private enum Buffering
        {
            None,     // endpoint transport is used as-is
            Buffered, // endpoint transport is wrapped in TBufferedTransport
            Framed    // endpoint transport is wrapped in TFramedTransport
        }
413     }
414 }
415