// Licensed to the Apache Software Foundation(ASF) under one
// or more contributor license agreements.See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.DependencyInjection;
using Thrift;
using Thrift.Protocol;
using Thrift.Transport;
using Thrift.Transport.Client;
using tutorial;
using shared;

namespace Client
{
    /// <summary>
    /// Command-line tutorial client. Parses the <c>-tr</c>, <c>-bf</c>, <c>-pr</c> and <c>-mc</c>
    /// options, builds one transport/protocol pair per requested client and runs the
    /// Calculator (and, for the multiplexed protocol, SharedService) calls on each of them.
    /// </summary>
    public class Program
    {
        // File name and password used by GetCertificate() / GetCertPath().
        private const string CertificateFileName = "ThriftTest.pfx";
        private const string CertificatePassword = "ThriftTest";

        private static readonly ServiceCollection ServiceCollection = new ServiceCollection();
        private static ILogger Logger;
        private static readonly TConfiguration Configuration = null;  // new TConfiguration() if needed

        /// <summary>
        /// Writes the usage text to the logger.
        /// </summary>
        private static void DisplayHelp()
        {
            Logger.LogInformation(@"
Usage:
    Client -help
        will diplay help information

    Client -tr:<transport> -bf:<buffering> -pr:<protocol> -mc:<numClients>
        will run client with specified arguments (tcp transport and binary protocol by default) and with 1 client

Options:
    -tr (transport):
        tcp - (default) tcp transport will be used (host - ""localhost"", port - 9090)
        namedpipe - namedpipe transport will be used (pipe address - "".test"")
        http - http transport will be used (address - ""http://localhost:9090"")
        tcptls - tcp tls transport will be used (host - ""localhost"", port - 9090)

    -bf (buffering):
        none - (default) no buffering will be used
        buffered - buffered transport will be used
        framed - framed transport will be used

    -pr (protocol):
        binary - (default) binary protocol will be used
        compact - compact protocol will be used
        json - json protocol will be used
        multiplexed - multiplexed protocol will be used

    -mc (multiple clients):
        <numClients> - number of multiple clients to connect to server (max 100, default 1)

Sample:
    Client -tr:tcp -pr:binary
");
        }

        /// <summary>
        /// Entry point: configures logging, shows the help text when any argument starts with
        /// <c>-help</c>, otherwise runs the client(s) and blocks until they have finished.
        /// </summary>
        /// <param name="args">Command-line arguments; <c>null</c> is treated as an empty array.</param>
        public static void Main(string[] args)
        {
            args = args ?? Array.Empty<string>();

            ServiceCollection.AddLogging(logging => ConfigureLogging(logging));
            using (var serviceProvider = ServiceCollection.BuildServiceProvider())
            {
                Logger = serviceProvider.GetRequiredService<ILoggerFactory>().CreateLogger(nameof(Client));

                if (args.Any(x => x.StartsWith("-help", StringComparison.OrdinalIgnoreCase)))
                {
                    DisplayHelp();
                    return;
                }

                Logger.LogInformation("Starting client...");

                using (var source = new CancellationTokenSource())
                {
                    // Main is synchronous, so this is the single place where we block on the async work.
                    RunAsync(args, source.Token).GetAwaiter().GetResult();
                }
            }
        }

        /// <summary>
        /// Sets the minimum log level to Trace and adds the console and debug log providers.
        /// </summary>
        private static void ConfigureLogging(ILoggingBuilder logging)
        {
            logging.SetMinimumLevel(LogLevel.Trace);
            logging.AddConsole();
            logging.AddDebug();
        }

        /// <summary>
        /// Returns the value part of the first argument that starts with <paramref name="option"/>,
        /// i.e. the text between the first and the second ':' of e.g. <c>-tr:tcp</c>.
        /// </summary>
        /// <returns>The value, or <c>null</c> when the option is absent or has no <c>:value</c> part.</returns>
        private static string GetArgumentValue(string[] args, string option)
        {
            var argument = args.FirstOrDefault(x => x.StartsWith(option, StringComparison.Ordinal));
            if (argument == null)
            {
                return null;
            }

            // An option given without ":value" yields a single part; treat it as "not specified"
            // instead of indexing past the end of the array.
            var parts = argument.Split(':');
            return (parts.Length > 1) ? parts[1] : null;
        }

        /// <summary>
        /// Creates the requested number of transports and protocols, starts one client task per
        /// protocol and asynchronously waits for all of them to complete.
        /// </summary>
        private static async Task RunAsync(string[] args, CancellationToken cancellationToken)
        {
            var numClients = GetNumberOfClients(args);

            Logger.LogInformation($"Selected # of clients: {numClients}");

            var transports = new TTransport[numClients];
            for (int i = 0; i < numClients; i++)
            {
                transports[i] = GetTransport(args);
            }

            Logger.LogInformation($"Selected client transport: {transports[0]}");

            var protocols = new Tuple<Protocol, TProtocol>[numClients];
            for (int i = 0; i < numClients; i++)
            {
                protocols[i] = GetProtocol(args, transports[i]);
            }

            Logger.LogInformation($"Selected client protocol: {protocols[0].Item1}");

            var tasks = new Task[numClients];
            for (int i = 0; i < numClients; i++)
            {
                tasks[i] = RunClientAsync(protocols[i], cancellationToken);
            }

            // Await instead of Task.WaitAll so the calling thread is not blocked inside an async method.
            await Task.WhenAll(tasks);
        }

        /// <summary>
        /// Builds the endpoint transport selected by <c>-tr</c> (tcp when absent or unsupported)
        /// and optionally wraps it in the layered transport selected by <c>-bf</c>.
        /// </summary>
        private static TTransport GetTransport(string[] args)
        {
            // construct endpoint transport
            var transportArg = GetArgumentValue(args, "-tr");
            if (!Enum.TryParse(transportArg, true, out Transport selectedTransport))
            {
                selectedTransport = Transport.Tcp;
            }

            TTransport transport;
            switch (selectedTransport)
            {
                case Transport.Tcp:
                    transport = new TSocketTransport(IPAddress.Loopback, 9090, Configuration);
                    break;

                case Transport.NamedPipe:
                    transport = new TNamedPipeTransport(".test", Configuration);
                    break;

                case Transport.Http:
                    transport = new THttpTransport(new Uri("http://localhost:9090"), Configuration);
                    break;

                case Transport.TcpTls:
                    transport = new TTlsSocketTransport(IPAddress.Loopback, 9090, Configuration,
                        GetCertificate(), CertValidator, LocalCertificateSelectionCallback);
                    break;

                default:
                    // Reached for enum members without an endpoint transport (TcpBuffered, Framed)
                    // and for numeric input; this is user input, so warn instead of asserting.
                    Logger.LogWarning($"Unsupported transport '{transportArg}', using tcp instead");
                    transport = new TSocketTransport(IPAddress.Loopback, 9090, Configuration);
                    break;
            }

            // optionally add layered transport(s)
            // ignoreCase: the help text documents the values in lower case ("buffered", "framed").
            var bufferingArg = GetArgumentValue(args, "-bf");
            if (Enum.TryParse(bufferingArg, true, out Buffering selectedBuffering))
            {
                switch (selectedBuffering)
                {
                    case Buffering.Buffered:
                        transport = new TBufferedTransport(transport);
                        break;

                    case Buffering.Framed:
                        transport = new TFramedTransport(transport);
                        break;

                    default: // layered transport(s) are optional
                        break;
                }
            }

            return transport;
        }

        /// <summary>
        /// Reads the <c>-mc</c> option.
        /// </summary>
        /// <returns>The parsed value when it is within 1..100, otherwise 1.</returns>
        private static int GetNumberOfClients(string[] args)
        {
            var numClients = GetArgumentValue(args, "-mc");

            if (int.TryParse(numClients, out var count) && (0 < count) && (count <= 100))
            {
                return count;
            }

            return 1;
        }

        /// <summary>
        /// Loads the client certificate from the file located by <see cref="GetCertPath"/>,
        /// starting the search at the parent of the current directory.
        /// </summary>
        private static X509Certificate2 GetCertificate()
        {
            // due to files location in net core better to take certs from top folder
            var certFile = GetCertPath(Directory.GetParent(Directory.GetCurrentDirectory()));
            return new X509Certificate2(certFile, CertificatePassword);
        }

        /// <summary>
        /// Searches <paramref name="di"/> and all of its subdirectories for the certificate file;
        /// when it is not found the search is repeated from the parent directory, moving up at
        /// most <paramref name="maxCount"/> levels.
        /// </summary>
        /// <param name="di">Directory to start from; may be <c>null</c>.</param>
        /// <param name="maxCount">Maximum number of parent levels to climb.</param>
        /// <returns>Full path of the first matching file.</returns>
        /// <exception cref="FileNotFoundException">
        /// No file was found within the allowed levels, or the filesystem root was passed.
        /// </exception>
        private static string GetCertPath(DirectoryInfo di, int maxCount = 6)
        {
            var current = di;
            for (var remaining = maxCount; current != null; remaining--)
            {
                var certFile = current
                    .EnumerateFiles(CertificateFileName, SearchOption.AllDirectories)
                    .FirstOrDefault();
                if (certFile != null)
                {
                    return certFile.FullName;
                }

                if (remaining <= 0)
                {
                    break;
                }

                // Parent is null at the filesystem root, which ends the loop.
                current = current.Parent;
            }

            throw new FileNotFoundException("Cannot find file in directories", CertificateFileName);
        }

        /// <summary>
        /// Local certificate selection callback handed to the TLS transport; ignores all
        /// arguments and returns a freshly loaded certificate from <see cref="GetCertificate"/>.
        /// </summary>
        private static X509Certificate LocalCertificateSelectionCallback(object sender,
            string targetHost, X509CertificateCollection localCertificates,
            X509Certificate remoteCertificate, string[] acceptableIssuers)
        {
            return GetCertificate();
        }

        /// <summary>
        /// Remote certificate validation callback handed to the TLS transport.
        /// </summary>
        /// <returns>Always <c>true</c>.</returns>
        private static bool CertValidator(object sender, X509Certificate certificate,
            X509Chain chain, SslPolicyErrors sslPolicyErrors)
        {
            // NOTE(review): accepts every certificate regardless of sslPolicyErrors. That disables
            // server authentication and must not be copied into production code.
            return true;
        }

        /// <summary>
        /// Creates the protocol selected by <c>-pr</c> on top of <paramref name="transport"/>.
        /// </summary>
        /// <returns>
        /// The selected protocol kind (Binary when the option is absent or invalid) together with
        /// the protocol instance. For Multiplexed a binary protocol is returned, which
        /// RunClientAsync wraps per service.
        /// </returns>
        private static Tuple<Protocol, TProtocol> GetProtocol(string[] args, TTransport transport)
        {
            var protocolArg = GetArgumentValue(args, "-pr");

            if (!Enum.TryParse(protocolArg, true, out Protocol selectedProtocol)
                || !Enum.IsDefined(typeof(Protocol), selectedProtocol))
            {
                selectedProtocol = Protocol.Binary;
            }

            TProtocol protocol;
            switch (selectedProtocol)
            {
                case Protocol.Compact:
                    protocol = new TCompactProtocol(transport);
                    break;

                case Protocol.Json:
                    protocol = new TJsonProtocol(transport);
                    break;

                default:
                    // Binary, and also Multiplexed: it returns BinaryProtocol to avoid making wrapped
                    // protocol as public in TProtocolDecorator (in RunClientAsync it will be wrapped
                    // into Multiplexed protocol)
                    protocol = new TBinaryProtocol(transport);
                    break;
            }

            return Tuple.Create(selectedProtocol, protocol);
        }

        /// <summary>
        /// Runs the tutorial calls for one client. Any exception raised by the calls is logged
        /// rather than propagated, and the transport is closed afterwards.
        /// </summary>
        private static async Task RunClientAsync(Tuple<Protocol, TProtocol> protocolTuple, CancellationToken cancellationToken)
        {
            try
            {
                var protocol = protocolTuple.Item2;
                var protocolType = protocolTuple.Item1;

                TBaseClient client = null;

                try
                {
                    if (protocolType != Protocol.Multiplexed)
                    {
                        var calculator = new Calculator.Client(protocol);
                        client = calculator;
                        await ExecuteCalculatorClientOperations(cancellationToken, calculator);
                    }
                    else
                    {
                        // it uses binary protocol there to create Multiplexed protocols
                        var multiplex = new TMultiplexedProtocol(protocol, nameof(Calculator));
                        var calculator = new Calculator.Client(multiplex);
                        client = calculator;
                        await ExecuteCalculatorClientOperations(cancellationToken, calculator);

                        multiplex = new TMultiplexedProtocol(protocol, nameof(SharedService));
                        var sharedService = new SharedService.Client(multiplex);
                        client = sharedService;
                        await ExecuteSharedServiceClientOperations(cancellationToken, sharedService);
                    }
                }
                catch (Exception ex)
                {
                    Logger.LogError($"{client?.ClientId} " + ex);
                }
                finally
                {
                    protocol.Transport.Close();
                }
            }
            catch (TApplicationException x)
            {
                // The inner catch already handles everything thrown by the calls above, so this
                // only sees a TApplicationException escaping from Close() in the finally block.
                Logger.LogError(x.ToString());
            }
        }

        /// <summary>
        /// Opens the transport and performs the Calculator tutorial calls: ping, add, a division
        /// by zero (an InvalidOperation from it is logged, not rethrown), a subtraction,
        /// getStruct and zip.
        /// </summary>
        private static async Task ExecuteCalculatorClientOperations(CancellationToken cancellationToken, Calculator.Client client)
        {
            await client.OpenTransportAsync(cancellationToken);

            // Async version

            Logger.LogInformation($"{client.ClientId} PingAsync()");
            await client.pingAsync(cancellationToken);

            Logger.LogInformation($"{client.ClientId} AddAsync(1,1)");
            var sum = await client.addAsync(1, 1, cancellationToken);
            Logger.LogInformation($"{client.ClientId} AddAsync(1,1)={sum}");

            var work = new Work
            {
                Op = Operation.DIVIDE,
                Num1 = 1,
                Num2 = 0
            };

            try
            {
                Logger.LogInformation($"{client.ClientId} CalculateAsync(1)");
                await client.calculateAsync(1, work, cancellationToken);
                Logger.LogInformation($"{client.ClientId} Whoa we can divide by 0");
            }
            catch (InvalidOperation io)
            {
                Logger.LogInformation($"{client.ClientId} Invalid operation: " + io);
            }

            work.Op = Operation.SUBTRACT;
            work.Num1 = 15;
            work.Num2 = 10;

            try
            {
                Logger.LogInformation($"{client.ClientId} CalculateAsync(1)");
                var diff = await client.calculateAsync(1, work, cancellationToken);
                Logger.LogInformation($"{client.ClientId} 15-10={diff}");
            }
            catch (InvalidOperation io)
            {
                Logger.LogInformation($"{client.ClientId} Invalid operation: " + io);
            }

            Logger.LogInformation($"{client.ClientId} GetStructAsync(1)");
            var log = await client.getStructAsync(1, cancellationToken);
            Logger.LogInformation($"{client.ClientId} Check log: {log.Value}");

            Logger.LogInformation($"{client.ClientId} ZipAsync() with delay 100mc on server side");
            await client.zipAsync(cancellationToken);
        }

        /// <summary>
        /// Opens the transport and performs the SharedService tutorial call (getStruct).
        /// </summary>
        private static async Task ExecuteSharedServiceClientOperations(CancellationToken cancellationToken, SharedService.Client client)
        {
            await client.OpenTransportAsync(cancellationToken);

            // Async version

            Logger.LogInformation($"{client.ClientId} SharedService GetStructAsync(1)");
            var log = await client.getStructAsync(1, cancellationToken);
            Logger.LogInformation($"{client.ClientId} SharedService Value: {log.Value}");
        }

        /// <summary>
        /// Values accepted by <c>-tr</c>. GetTransport creates an endpoint transport for Tcp,
        /// NamedPipe, Http and TcpTls; TcpBuffered and Framed fall back to tcp with a warning.
        /// </summary>
        private enum Transport
        {
            Tcp,
            NamedPipe,
            Http,
            TcpBuffered,
            Framed,
            TcpTls
        }

        /// <summary>
        /// Values accepted by <c>-pr</c>.
        /// </summary>
        private enum Protocol
        {
            Binary,
            Compact,
            Json,
            Multiplexed
        }

        /// <summary>
        /// Values accepted by <c>-bf</c>.
        /// </summary>
        private enum Buffering
        {
            None,
            Buffered,
            Framed
        }
    }
}