// Licensed to the Apache Software Foundation(ASF) under one
// or more contributor license agreements.See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Thrift;
using Thrift.Protocol;
using Thrift.Server;
using Thrift.Transport;
using Thrift.Transport.Server;
using tutorial;
using shared;
using Thrift.Processor;
using System.Diagnostics;

namespace Server
{
    /// <summary>
    /// Entry point of the tutorial server. Parses the command line options
    /// (-tr, -bf, -pr), builds the matching transport / buffering / protocol
    /// stack and serves the Calculator (and, when multiplexed, SharedService)
    /// handlers defined below.
    /// </summary>
    public class Program
    {
        private static readonly ServiceCollection ServiceCollection = new ServiceCollection();

        // Logger factory resolved once in Main from the service provider that Main owns and disposes.
        // Reused wherever a typed logger is needed, so no additional provider has to be built.
        private static ILoggerFactory LogFactory;
        private static ILogger Logger;
        private static readonly TConfiguration Configuration = null;  // new TConfiguration() if needed

        /// <summary>
        /// Configures logging, handles the -help switch and otherwise runs the server
        /// with the options given on the command line.
        /// </summary>
        /// <param name="args">Command line arguments; null is treated as an empty array.</param>
        public static void Main(string[] args)
        {
            args = args ?? Array.Empty<string>();

            ServiceCollection.AddLogging(logging => ConfigureLogging(logging));
            using (var serviceProvider = ServiceCollection.BuildServiceProvider())
            {
                LogFactory = serviceProvider.GetService<ILoggerFactory>();
                Logger = LogFactory.CreateLogger(nameof(Server));

                if (args.Any(x => x.StartsWith("-help", StringComparison.OrdinalIgnoreCase)))
                {
                    DisplayHelp();
                    return;
                }

                using (var source = new CancellationTokenSource())
                {
                    // NOTE(review): this blocks until RunAsync completes. If the server's ServeAsync
                    // only returns once the token is cancelled, the prompt below is never reached —
                    // confirm against the Thrift server implementation in use.
                    RunAsync(args, source.Token).GetAwaiter().GetResult();

                    Logger.LogInformation("Press any key to stop...");

                    Console.ReadLine();
                    source.Cancel();
                }

                Logger.LogInformation("Server stopped");
            }
        }

        /// <summary>
        /// Shared logging setup: minimum level Trace, console and debug providers.
        /// Used by both the console host and the HTTP host.
        /// </summary>
        private static void ConfigureLogging(ILoggingBuilder logging)
        {
            logging.SetMinimumLevel(LogLevel.Trace);
            logging.AddConsole();
            logging.AddDebug();
        }

        /// <summary>
        /// Writes the usage text to the logger.
        /// </summary>
        private static void DisplayHelp()
        {
            Logger.LogInformation(@"
Usage:
    Server -help
        will diplay help information

    Server -tr:<transport> -bf:<buffering> -pr:<protocol>
        will run server with specified arguments (tcp transport, no buffering, and binary protocol by default)

Options:
    -tr (transport):
        tcp - (default) tcp transport will be used (host - ""localhost"", port - 9090)
        namedpipe - namedpipe transport will be used (pipe address - "".test"")
        http - http transport will be used (http address - ""localhost:9090"")
        tcptls - tcp transport with tls will be used (host - ""localhost"", port - 9090)

    -bf (buffering):
        none - (default) no buffering will be used
        buffered - buffered transport will be used
        framed - framed transport will be used

    -pr (protocol):
        binary - (default) binary protocol will be used
        compact - compact protocol will be used
        json - json protocol will be used
        multiplexed - multiplexed protocol will be used

Sample:
    Server -tr:tcp
");
        }

        /// <summary>
        /// Resolves the selected options and starts either the HTTP sample host
        /// or a Thrift server with the selected transport, buffering and protocol.
        /// </summary>
        private static async Task RunAsync(string[] args, CancellationToken cancellationToken)
        {
            var selectedTransport = GetTransport(args);
            var selectedBuffering = GetBuffering(args);
            var selectedProtocol = GetProtocol(args);

            if (selectedTransport == Transport.Http)
            {
                new HttpServerSample().Run(cancellationToken);
            }
            else
            {
                await RunSelectedConfigurationAsync(selectedTransport, selectedBuffering, selectedProtocol, cancellationToken);
            }
        }

        /// <summary>
        /// Returns the text between the first and second ':' of the first argument that
        /// starts with <paramref name="option"/>, or null when the option is absent or
        /// carries no ':' separated value.
        /// </summary>
        private static string GetOptionValue(string[] args, string option)
        {
            var argument = args.FirstOrDefault(x => x.StartsWith(option, StringComparison.Ordinal));
            if (argument == null)
            {
                return null;
            }

            // An option given without a value (e.g. "-tr") yields a single segment; treat it as "not set"
            // instead of indexing past the end of the array.
            var segments = argument.Split(':');
            return segments.Length > 1 ? segments[1] : null;
        }

        /// <summary>
        /// Parses the value of <paramref name="option"/> as a member of <typeparamref name="TEnum"/>,
        /// ignoring case. Missing, unparsable or undefined values (e.g. numeric text that maps to
        /// no declared member) fall back to the enum's default member.
        /// </summary>
        private static TEnum ParseOption<TEnum>(string[] args, string option) where TEnum : struct
        {
            var value = GetOptionValue(args, option);

            if (Enum.TryParse(value, true, out TEnum parsed) && Enum.IsDefined(typeof(TEnum), parsed))
            {
                return parsed;
            }

            return default(TEnum);
        }

        /// <summary>Reads the -pr option; defaults to <see cref="Protocol.Binary"/>.</summary>
        private static Protocol GetProtocol(string[] args)
        {
            return ParseOption<Protocol>(args, "-pr");
        }

        /// <summary>Reads the -bf option (case-insensitive); defaults to <see cref="Buffering.None"/>.</summary>
        private static Buffering GetBuffering(string[] args)
        {
            return ParseOption<Buffering>(args, "-bf");
        }

        /// <summary>Reads the -tr option; defaults to <see cref="Transport.Tcp"/>.</summary>
        private static Transport GetTransport(string[] args)
        {
            return ParseOption<Transport>(args, "-tr");
        }

        /// <summary>
        /// Builds a TSimpleAsyncServer for the given transport, buffering and protocol
        /// and serves until <paramref name="cancellationToken"/> is cancelled or the server fails.
        /// Exceptions raised while creating or running the server are logged, not rethrown.
        /// </summary>
        /// <exception cref="ArgumentOutOfRangeException">
        /// The transport or protocol value is not supported by this method
        /// (Transport.Http is handled by <see cref="RunAsync"/>, not here).
        /// </exception>
        private static async Task RunSelectedConfigurationAsync(Transport transport, Buffering buffering, Protocol protocol, CancellationToken cancellationToken)
        {
            var handler = new CalculatorAsyncHandler();

            TServerTransport serverTransport = null;
            switch (transport)
            {
                case Transport.Tcp:
                    serverTransport = new TServerSocketTransport(9090, Configuration);
                    break;
                case Transport.NamedPipe:
                    serverTransport = new TNamedPipeServerTransport(".test", Configuration);
                    break;
                case Transport.TcpTls:
                    serverTransport = new TTlsServerSocketTransport(9090, Configuration,
                        GetCertificate(), ClientCertValidator, LocalCertificateSelectionCallback);
                    break;

                default:
                    // Fail fast instead of handing a null transport to the server.
                    throw new ArgumentOutOfRangeException(nameof(transport), transport, null);
            }

            TTransportFactory inputTransportFactory = null;
            TTransportFactory outputTransportFactory = null;
            switch (buffering)
            {
                case Buffering.Buffered:
                    inputTransportFactory = new TBufferedTransport.Factory();
                    outputTransportFactory = new TBufferedTransport.Factory();
                    break;

                case Buffering.Framed:
                    inputTransportFactory = new TFramedTransport.Factory();
                    outputTransportFactory = new TFramedTransport.Factory();
                    break;

                default: // layered transport(s) are optional
                    Debug.Assert(buffering == Buffering.None, "unhandled case");
                    break;
            }

            TProtocolFactory inputProtocolFactory = null;
            TProtocolFactory outputProtocolFactory = null;
            ITAsyncProcessor processor = null;
            switch (protocol)
            {
                case Protocol.Binary:
                    inputProtocolFactory = new TBinaryProtocol.Factory();
                    outputProtocolFactory = new TBinaryProtocol.Factory();
                    processor = new Calculator.AsyncProcessor(handler);
                    break;

                case Protocol.Compact:
                    inputProtocolFactory = new TCompactProtocol.Factory();
                    outputProtocolFactory = new TCompactProtocol.Factory();
                    processor = new Calculator.AsyncProcessor(handler);
                    break;

                case Protocol.Json:
                    inputProtocolFactory = new TJsonProtocol.Factory();
                    outputProtocolFactory = new TJsonProtocol.Factory();
                    processor = new Calculator.AsyncProcessor(handler);
                    break;

                case Protocol.Multiplexed:
                    // Multiplexing runs on top of the binary protocol and exposes both services by name.
                    inputProtocolFactory = new TBinaryProtocol.Factory();
                    outputProtocolFactory = new TBinaryProtocol.Factory();

                    var calcHandler = new CalculatorAsyncHandler();
                    var calcProcessor = new Calculator.AsyncProcessor(calcHandler);

                    var sharedServiceHandler = new SharedServiceAsyncHandler();
                    var sharedServiceProcessor = new SharedService.AsyncProcessor(sharedServiceHandler);

                    var multiplexedProcessor = new TMultiplexedProcessor();
                    multiplexedProcessor.RegisterProcessor(nameof(Calculator), calcProcessor);
                    multiplexedProcessor.RegisterProcessor(nameof(SharedService), sharedServiceProcessor);

                    processor = multiplexedProcessor;
                    break;

                default:
                    throw new ArgumentOutOfRangeException(nameof(protocol), protocol, null);
            }


            try
            {
                Logger.LogInformation(
                    $"Selected TAsyncServer with {serverTransport} transport, {processor} processor and {inputProtocolFactory} protocol factories");

                // Reuse the factory owned by Main's service provider; building another provider here
                // would create a second, never-disposed container.
                var server = new TSimpleAsyncServer(
                    itProcessorFactory: new TSingletonProcessorFactory(processor),
                    serverTransport: serverTransport,
                    inputTransportFactory: inputTransportFactory,
                    outputTransportFactory: outputTransportFactory,
                    inputProtocolFactory: inputProtocolFactory,
                    outputProtocolFactory: outputProtocolFactory,
                    logger: LogFactory.CreateLogger<TSimpleAsyncServer>());

                Logger.LogInformation("Starting the server...");

                await server.ServeAsync(cancellationToken);
            }
            catch (Exception x)
            {
                // Pass the exception object rather than using its text as the message template.
                Logger.LogError(x, "Server terminated with an exception");
            }
        }

        /// <summary>
        /// Loads the ThriftTest.pfx certificate (password "ThriftTest"), searching from the
        /// parent of the current directory upwards.
        /// </summary>
        /// <exception cref="FileNotFoundException">The certificate file could not be located.</exception>
        private static X509Certificate2 GetCertificate()
        {
            // due to files location in net core better to take certs from top folder
            var certFile = GetCertPath(Directory.GetParent(Directory.GetCurrentDirectory()));
            return new X509Certificate2(certFile, "ThriftTest");
        }

        /// <summary>
        /// Searches <paramref name="di"/> (recursively, including subdirectories) for ThriftTest.pfx
        /// and, when not found, repeats the search in up to <paramref name="maxCount"/> ancestor
        /// directories.
        /// </summary>
        /// <param name="di">Directory to start from; null is treated as "nothing left to search".</param>
        /// <param name="maxCount">Maximum number of parent directories to climb.</param>
        /// <returns>Full path of the first matching file.</returns>
        /// <exception cref="FileNotFoundException">
        /// No match was found before running out of levels or reaching the filesystem root.
        /// </exception>
        private static string GetCertPath(DirectoryInfo di, int maxCount = 6)
        {
            var current = di;
            var remaining = maxCount;

            // Stop on null as well: DirectoryInfo.Parent is null at the root, and
            // Directory.GetParent may return null for the initial directory.
            while (current != null)
            {
                var certFile = current
                    .EnumerateFiles("ThriftTest.pfx", SearchOption.AllDirectories)
                    .FirstOrDefault();
                if (certFile != null)
                {
                    return certFile.FullName;
                }

                if (remaining == 0)
                {
                    break;
                }

                current = current.Parent;
                remaining--;
            }

            throw new FileNotFoundException("Cannot find file in directories");
        }

        /// <summary>
        /// Certificate selection callback for the TLS transport; always returns the
        /// certificate produced by <see cref="GetCertificate"/> and ignores its arguments.
        /// </summary>
        private static X509Certificate LocalCertificateSelectionCallback(object sender,
            string targetHost, X509CertificateCollection localCertificates,
            X509Certificate remoteCertificate, string[] acceptableIssuers)
        {
            return GetCertificate();
        }

        /// <summary>
        /// Client certificate validation callback for the TLS transport.
        /// </summary>
        /// <remarks>
        /// NOTE(review): unconditionally accepts every client certificate, regardless of
        /// <paramref name="sslPolicyErrors"/>. Presumably intentional for a tutorial; do not
        /// reuse as-is where client authentication matters.
        /// </remarks>
        private static bool ClientCertValidator(object sender, X509Certificate certificate,
            X509Chain chain, SslPolicyErrors sslPolicyErrors)
        {
            return true;
        }

        /// <summary>Values accepted by the -tr option; Tcp is the default.</summary>
        private enum Transport
        {
            Tcp,
            NamedPipe,
            Http,
            TcpTls,
        }

        /// <summary>Values accepted by the -bf option; None is the default.</summary>
        private enum Buffering
        {
            None,
            Buffered,
            Framed,
        }

        /// <summary>Values accepted by the -pr option; Binary is the default.</summary>
        private enum Protocol
        {
            Binary,
            Compact,
            Json,
            Multiplexed
        }

        /// <summary>
        /// Hosts the Calculator service over HTTP using Kestrel on http://localhost:9090.
        /// </summary>
        public class HttpServerSample
        {
            /// <summary>
            /// Builds the web host and runs it, blocking the calling thread until the host
            /// stops or <paramref name="cancellationToken"/> is cancelled.
            /// </summary>
            public void Run(CancellationToken cancellationToken)
            {
                var config = new ConfigurationBuilder()
                    .AddEnvironmentVariables(prefix: "ASPNETCORE_")
                    .Build();

                var host = new WebHostBuilder()
                    .UseConfiguration(config)
                    .UseKestrel()
                    .UseUrls("http://localhost:9090")
                    .UseContentRoot(Directory.GetCurrentDirectory())
                    .UseStartup<Startup>()
                    .ConfigureLogging((ctx,logging) => ConfigureLogging(logging))
                    .Build();

                Logger.LogTrace("test");
                Logger.LogCritical("test");
                host.RunAsync(cancellationToken).GetAwaiter().GetResult();
            }

            /// <summary>
            /// ASP.NET Core startup class: registers the Calculator handler, its processor and the
            /// Thrift HTTP transport, and installs that transport as middleware.
            /// </summary>
            public class Startup
            {
                /// <summary>
                /// Builds the configuration from environment variables, rooted at the content root path.
                /// </summary>
                public Startup(IWebHostEnvironment env)
                {
                    var builder = new ConfigurationBuilder()
                        .SetBasePath(env.ContentRootPath)
                        .AddEnvironmentVariables();

                    Configuration = builder.Build();
                }

                public IConfigurationRoot Configuration { get; }

                // This method gets called by the runtime. Use this method to add services to the container.
                public void ConfigureServices(IServiceCollection services)
                {
                    services.AddTransient<Calculator.IAsync, CalculatorAsyncHandler>();
                    services.AddTransient<ITAsyncProcessor, Calculator.AsyncProcessor>();
                    services.AddTransient<THttpServerTransport, THttpServerTransport>();
                }

                // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
                public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILoggerFactory loggerFactory)
                {
                    app.UseMiddleware<THttpServerTransport>();
                }
            }
        }

        /// <summary>
        /// Calculator service implementation. Results of calculateAsync are remembered per log id
        /// and can be read back through getStructAsync.
        /// </summary>
        public class CalculatorAsyncHandler : Calculator.IAsync
        {
            // Results stored by calculateAsync, keyed by log id.
            // NOTE(review): a plain Dictionary is not safe for concurrent writers — confirm whether
            // a single handler instance can be called from several connections at once.
            private readonly Dictionary<int, SharedStruct> _log = new Dictionary<int, SharedStruct>();

            public CalculatorAsyncHandler()
            {
            }

            /// <summary>
            /// Returns the entry stored under <paramref name="key"/> by a previous calculateAsync call.
            /// The dictionary indexer throws KeyNotFoundException when no such entry exists.
            /// </summary>
            public async Task<SharedStruct> getStructAsync(int key,
                CancellationToken cancellationToken)
            {
                Logger.LogInformation("GetStructAsync({0})", key);
                return await Task.FromResult(_log[key]);
            }

            /// <summary>Logs the call and completes immediately.</summary>
            public async Task pingAsync(CancellationToken cancellationToken)
            {
                Logger.LogInformation("PingAsync()");
                await Task.CompletedTask;
            }

            /// <summary>Returns the sum of <paramref name="num1"/> and <paramref name="num2"/>.</summary>
            public async Task<int> addAsync(int num1, int num2, CancellationToken cancellationToken)
            {
                Logger.LogInformation($"AddAsync({num1},{num2})");
                return await Task.FromResult(num1 + num2);
            }

            /// <summary>
            /// Applies the operation described by <paramref name="w"/> to its two operands, stores
            /// the result (as text) under <paramref name="logid"/> and returns it.
            /// </summary>
            /// <exception cref="InvalidOperation">
            /// Division by zero was requested, or the operation is not one of
            /// ADD, SUBTRACT, MULTIPLY, DIVIDE.
            /// </exception>
            public async Task<int> calculateAsync(int logid, Work w, CancellationToken cancellationToken)
            {
                Logger.LogInformation($"CalculateAsync({logid}, [{w.Op},{w.Num1},{w.Num2}])");

                var val = 0;
                switch (w.Op)
                {
                    case Operation.ADD:
                        val = w.Num1 + w.Num2;
                        break;

                    case Operation.SUBTRACT:
                        val = w.Num1 - w.Num2;
                        break;

                    case Operation.MULTIPLY:
                        val = w.Num1 * w.Num2;
                        break;

                    case Operation.DIVIDE:
                        if (w.Num2 == 0)
                        {
                            var io = new InvalidOperation
                            {
                                WhatOp = (int) w.Op,
                                Why = "Cannot divide by 0"
                            };

                            throw io;
                        }
                        val = w.Num1 / w.Num2;
                        break;

                    default:
                    {
                        var io = new InvalidOperation
                        {
                            WhatOp = (int) w.Op,
                            Why = "Unknown operation"
                        };

                        throw io;
                    }
                }

                var entry = new SharedStruct
                {
                    Key = logid,
                    Value = val.ToString()
                };

                // Overwrites any earlier result recorded under the same log id.
                _log[logid] = entry;

                return await Task.FromResult(val);
            }

            /// <summary>
            /// Logs the call and waits 100 ms. The delay is not tied to
            /// <paramref name="cancellationToken"/> (CancellationToken.None is passed).
            /// </summary>
            public async Task zipAsync(CancellationToken cancellationToken)
            {
                Logger.LogInformation("ZipAsync() with delay 100mc");
                await Task.Delay(100, CancellationToken.None);
            }
        }

        /// <summary>
        /// SharedService implementation used by the multiplexed configuration.
        /// </summary>
        public class SharedServiceAsyncHandler : SharedService.IAsync
        {
            /// <summary>
            /// Returns a new SharedStruct whose Key is <paramref name="key"/> and whose Value is
            /// the fixed text "GetStructAsync".
            /// </summary>
            public async Task<SharedStruct> getStructAsync(int key, CancellationToken cancellationToken)
            {
                Logger.LogInformation("GetStructAsync({0})", key);
                return await Task.FromResult(new SharedStruct()
                {
                    Key = key,
                    Value = "GetStructAsync"
                });
            }
        }
    }
}