// Licensed to the Apache Software Foundation(ASF) under one
// or more contributor license agreements.See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Thrift;
using Thrift.Protocols;
using Thrift.Server;
using Thrift.Transports;
using Thrift.Transports.Server;
using tutorial;
using shared;

namespace Server
{
    /// <summary>
    /// Console entry point of the tutorial server. Parses the <c>-tr</c> (transport) and
    /// <c>-pr</c> (protocol) command-line options and starts either the HTTP sample host or
    /// a Thrift server built from the selected transport and protocol.
    /// </summary>
    public class Program
    {
        /// <summary>Port used by every socket-based transport and by the HTTP sample.</summary>
        private const int Port = 9090;

        /// <summary>File name of the certificate looked up for the TLS transport.</summary>
        private const string CertificateFileName = "ThriftTest.pfx";

        private static readonly ILogger Logger = new LoggerFactory().AddConsole(LogLevel.Trace).AddDebug(LogLevel.Trace).CreateLogger(nameof(Server));

        /// <summary>
        /// Shows the help text when any argument starts with <c>-help</c>; otherwise starts the
        /// server, waits for a line of console input and then cancels the server's token.
        /// </summary>
        /// <param name="args">Command-line arguments; <c>null</c> is treated as empty.</param>
        public static void Main(string[] args)
        {
            args = args ?? Array.Empty<string>();

            if (args.Any(x => x.StartsWith("-help", StringComparison.OrdinalIgnoreCase)))
            {
                DisplayHelp();
                return;
            }

            using (var source = new CancellationTokenSource())
            {
                RunAsync(args, source.Token).GetAwaiter().GetResult();

                Logger.LogInformation("Press any key to stop...");

                // ReadLine returns only once a full line has been entered.
                Console.ReadLine();
                source.Cancel();
            }

            Logger.LogInformation("Server stopped");
        }

        /// <summary>Writes the usage text to the logger.</summary>
        private static void DisplayHelp()
        {
            Logger.LogInformation(@"
Usage:
    Server.exe -help
        will diplay help information

    Server.exe -tr:<transport> -pr:<protocol>
        will run server with specified arguments (tcp transport and binary protocol by default)

Options:
    -tr (transport):
        tcp - (default) tcp transport will be used (host - ""localhost"", port - 9090)
        tcpbuffered - tcp buffered transport will be used (host - ""localhost"", port - 9090)
        namedpipe - namedpipe transport will be used (pipe address - "".test"")
        http - http transport will be used (http address - ""localhost:9090"")
        tcptls - tcp transport with tls will be used (host - ""localhost"", port - 9090)
        framed - tcp framed transport will be used (host - ""localhost"", port - 9090)

    -pr (protocol):
        binary - (default) binary protocol will be used
        compact - compact protocol will be used
        json - json protocol will be used
        multiplexed - multiplexed protocol will be used

Sample:
    Server.exe -tr:tcp
");
        }

        /// <summary>
        /// Resolves the transport and protocol from <paramref name="args"/> and starts the
        /// matching server.
        /// </summary>
        /// <param name="args">Command-line arguments (never <c>null</c> here).</param>
        /// <param name="cancellationToken">Token handed to the started server.</param>
        private static async Task RunAsync(string[] args, CancellationToken cancellationToken)
        {
            var selectedTransport = GetTransport(args);
            var selectedProtocol = GetProtocol(args);

            if (selectedTransport == Transport.Http)
            {
                // Run is synchronous: it blocks here until the web host's RunAsync task completes.
                new HttpServerSample().Run(cancellationToken);
                return;
            }

            await RunSelectedConfigurationAsync(selectedTransport, selectedProtocol, cancellationToken);
        }

        /// <summary>Reads the <c>-pr:&lt;protocol&gt;</c> option; falls back to <see cref="Protocol.Binary"/>.</summary>
        private static Protocol GetProtocol(string[] args)
        {
            return ParseOption<Protocol>(args, "-pr");
        }

        /// <summary>Reads the <c>-tr:&lt;transport&gt;</c> option; falls back to <see cref="Transport.Tcp"/>.</summary>
        private static Transport GetTransport(string[] args)
        {
            return ParseOption<Transport>(args, "-tr");
        }

        /// <summary>
        /// Finds the first argument starting with <paramref name="prefix"/> and parses the text
        /// between its first and second <c>':'</c> as a <typeparamref name="TEnum"/> member
        /// (case-insensitive).
        /// </summary>
        /// <returns>
        /// The parsed member, or <c>default(TEnum)</c> when the option is absent, has no
        /// <c>':'</c>, or does not name a declared member.
        /// </returns>
        private static TEnum ParseOption<TEnum>(string[] args, string prefix) where TEnum : struct
        {
            var argument = args.FirstOrDefault(x => x.StartsWith(prefix, StringComparison.OrdinalIgnoreCase));
            if (argument == null)
            {
                return default(TEnum);
            }

            // "-tr" without ":<value>" yields a single segment; indexing [1] would throw.
            var segments = argument.Split(':');
            if (segments.Length < 2)
            {
                return default(TEnum);
            }

            // TryParse also accepts numeric text (e.g. "42"), so reject values that are not declared members.
            if (Enum.TryParse(segments[1], true, out TEnum parsed) && Enum.IsDefined(typeof(TEnum), parsed))
            {
                return parsed;
            }

            return default(TEnum);
        }

        /// <summary>
        /// Builds an <c>AsyncBaseServer</c> for the given transport and protocol and serves until
        /// <c>ServeAsync</c> completes. Exceptions raised while creating or running the server
        /// are logged, not rethrown.
        /// </summary>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="transport"/> or <paramref name="protocol"/> is not supported here.
        /// </exception>
        private static async Task RunSelectedConfigurationAsync(Transport transport, Protocol protocol, CancellationToken cancellationToken)
        {
            var fabric = new LoggerFactory().AddConsole(LogLevel.Trace).AddDebug(LogLevel.Trace);

            TServerTransport serverTransport = CreateServerTransport(transport);

            // Input and output sides each get their own factory instance.
            ITProtocolFactory inputProtocolFactory = CreateProtocolFactory(protocol);
            ITProtocolFactory outputProtocolFactory = CreateProtocolFactory(protocol);
            ITAsyncProcessor processor = CreateProcessor(protocol);

            try
            {
                Logger.LogInformation(
                    $"Selected TAsyncServer with {serverTransport} transport, {processor} processor and {inputProtocolFactory} protocol factories");

                var server = new AsyncBaseServer(processor, serverTransport, inputProtocolFactory, outputProtocolFactory, fabric);

                Logger.LogInformation("Starting the server...");
                await server.ServeAsync(cancellationToken);
            }
            catch (Exception x)
            {
                Logger.LogError(x, "Server terminated with an error");
            }
        }

        /// <summary>Creates the server transport for every non-HTTP <see cref="Transport"/> value.</summary>
        private static TServerTransport CreateServerTransport(Transport transport)
        {
            switch (transport)
            {
                case Transport.Tcp:
                    return new TServerSocketTransport(Port);
                case Transport.TcpBuffered:
                    return new TServerSocketTransport(port: Port, clientTimeout: 10000, useBufferedSockets: true);
                case Transport.NamedPipe:
                    return new TNamedPipeServerTransport(".test");
                case Transport.TcpTls:
                    return new TTlsServerSocketTransport(Port, false, GetCertificate(), ClientCertValidator, LocalCertificateSelectionCallback);
                case Transport.Framed:
                    return new TServerFramedTransport(Port);
                default:
                    // Transport.Http is served by HttpServerSample and must not reach this point.
                    throw new ArgumentOutOfRangeException(nameof(transport), transport, null);
            }
        }

        /// <summary>Creates a new protocol factory; the multiplexed mode uses the binary protocol.</summary>
        private static ITProtocolFactory CreateProtocolFactory(Protocol protocol)
        {
            switch (protocol)
            {
                case Protocol.Binary:
                case Protocol.Multiplexed:
                    return new TBinaryProtocol.Factory();
                case Protocol.Compact:
                    return new TCompactProtocol.Factory();
                case Protocol.Json:
                    return new TJsonProtocol.Factory();
                default:
                    throw new ArgumentOutOfRangeException(nameof(protocol), protocol, null);
            }
        }

        /// <summary>
        /// Creates the request processor: a multiplexed processor exposing both Calculator and
        /// SharedService for <see cref="Protocol.Multiplexed"/>, a plain Calculator processor otherwise.
        /// </summary>
        private static ITAsyncProcessor CreateProcessor(Protocol protocol)
        {
            if (protocol != Protocol.Multiplexed)
            {
                return new Calculator.AsyncProcessor(new CalculatorAsyncHandler());
            }

            var calcProcessor = new Calculator.AsyncProcessor(new CalculatorAsyncHandler());
            var sharedServiceProcessor = new SharedService.AsyncProcessor(new SharedServiceAsyncHandler());

            var multiplexedProcessor = new TMultiplexedProcessor();
            multiplexedProcessor.RegisterProcessor(nameof(Calculator), calcProcessor);
            multiplexedProcessor.RegisterProcessor(nameof(SharedService), sharedServiceProcessor);

            return multiplexedProcessor;
        }

        /// <summary>Loads the server certificate, searching from the parent of the current directory.</summary>
        /// <exception cref="FileNotFoundException">The certificate file could not be located.</exception>
        private static X509Certificate2 GetCertificate()
        {
            // due to files location in net core better to take certs from top folder
            var certFile = GetCertPath(Directory.GetParent(Directory.GetCurrentDirectory()));
            return new X509Certificate2(certFile, "ThriftTest");
        }

        /// <summary>
        /// Searches <paramref name="di"/> (including all sub-directories) for the certificate
        /// file, then repeats the search from each successive parent directory.
        /// </summary>
        /// <param name="di">Directory to start from; may be <c>null</c>.</param>
        /// <param name="maxCount">Maximum number of parent levels to climb after <paramref name="di"/>.</param>
        /// <returns>Full path of the first matching file.</returns>
        /// <exception cref="FileNotFoundException">
        /// No match was found within <paramref name="maxCount"/> levels or before running out of parents.
        /// </exception>
        private static string GetCertPath(DirectoryInfo di, int maxCount = 6)
        {
            var directory = di;
            var remainingLevels = maxCount;

            // Parent is null at the filesystem root, so the walk must stop there as well.
            while (directory != null)
            {
                var certFile = directory
                    .EnumerateFiles(CertificateFileName, SearchOption.AllDirectories)
                    .FirstOrDefault();
                if (certFile != null)
                {
                    return certFile.FullName;
                }

                if (remainingLevels <= 0)
                {
                    break;
                }

                remainingLevels--;
                directory = directory.Parent;
            }

            throw new FileNotFoundException("Cannot find file in directories", CertificateFileName);
        }

        /// <summary>Certificate selection callback for the TLS transport; always returns the server certificate.</summary>
        private static X509Certificate LocalCertificateSelectionCallback(object sender,
            string targetHost, X509CertificateCollection localCertificates,
            X509Certificate remoteCertificate, string[] acceptableIssuers)
        {
            return GetCertificate();
        }

        /// <summary>Client certificate validation callback for the TLS transport.</summary>
        /// <returns>Always <c>true</c>.</returns>
        private static bool ClientCertValidator(object sender, X509Certificate certificate,
            X509Chain chain, SslPolicyErrors sslPolicyErrors)
        {
            // NOTE(review): accepts every client certificate unconditionally, ignoring
            // sslPolicyErrors - do not reuse as-is where client identity matters.
            return true;
        }

        /// <summary>Transports selectable with <c>-tr</c>; the first member is the default.</summary>
        private enum Transport
        {
            Tcp,
            TcpBuffered,
            NamedPipe,
            Http,
            TcpTls,
            Framed
        }

        /// <summary>Protocols selectable with <c>-pr</c>; the first member is the default.</summary>
        private enum Protocol
        {
            Binary,
            Compact,
            Json,
            Multiplexed
        }

        /// <summary>Hosts the Calculator service over HTTP using Kestrel.</summary>
        public class HttpServerSample
        {
            /// <summary>
            /// Builds the web host listening on localhost and runs it, blocking the caller until
            /// the host's <c>RunAsync</c> task completes.
            /// </summary>
            /// <param name="cancellationToken">Token passed to the host's <c>RunAsync</c>.</param>
            public void Run(CancellationToken cancellationToken)
            {
                var config = new ConfigurationBuilder()
                    .AddEnvironmentVariables(prefix: "ASPNETCORE_")
                    .Build();

                var host = new WebHostBuilder()
                    .UseConfiguration(config)
                    .UseKestrel()
                    .UseUrls($"http://localhost:{Port}")
                    .UseContentRoot(Directory.GetCurrentDirectory())
                    .UseStartup<Startup>()
                    .Build();

                host.RunAsync(cancellationToken).GetAwaiter().GetResult();
            }

            /// <summary>ASP.NET Core startup class wiring the Thrift HTTP transport into the pipeline.</summary>
            public class Startup
            {
                /// <summary>Builds <see cref="Configuration"/> from environment variables.</summary>
                public Startup(IHostingEnvironment env)
                {
                    var builder = new ConfigurationBuilder()
                        .SetBasePath(env.ContentRootPath)
                        .AddEnvironmentVariables();

                    Configuration = builder.Build();
                }

                /// <summary>Configuration built in the constructor.</summary>
                public IConfigurationRoot Configuration { get; }

                // This method gets called by the runtime. Use this method to add services to the container.
                public void ConfigureServices(IServiceCollection services)
                {
                    services.AddTransient<Calculator.IAsync, CalculatorAsyncHandler>();
                    services.AddTransient<ITAsyncProcessor, Calculator.AsyncProcessor>();
                    services.AddTransient<THttpServerTransport, THttpServerTransport>();
                }

                // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
                public void Configure(IApplicationBuilder app, IHostingEnvironment env,
                    ILoggerFactory loggerFactory)
                {
                    app.UseMiddleware<THttpServerTransport>();
                }
            }
        }

        /// <summary>
        /// Calculator service implementation. Each successful <see cref="calculateAsync"/> result
        /// is stored under its log id and can be read back through <see cref="getStructAsync"/>.
        /// </summary>
        public class CalculatorAsyncHandler : Calculator.IAsync
        {
            // Results of calculateAsync keyed by log id.
            private readonly Dictionary<int, SharedStruct> _log = new Dictionary<int, SharedStruct>();

            public CalculatorAsyncHandler()
            {
            }

            /// <summary>Returns the entry stored for <paramref name="key"/>.</summary>
            /// <exception cref="KeyNotFoundException">No entry was stored under <paramref name="key"/>.</exception>
            public async Task<SharedStruct> getStructAsync(int key,
                CancellationToken cancellationToken)
            {
                Logger.LogInformation("GetStructAsync({0})", key);
                return await Task.FromResult(_log[key]);
            }

            /// <summary>Logs the call and completes immediately.</summary>
            public async Task pingAsync(CancellationToken cancellationToken)
            {
                Logger.LogInformation("PingAsync()");
                await Task.CompletedTask;
            }

            /// <summary>Returns <paramref name="num1"/> + <paramref name="num2"/>.</summary>
            public async Task<int> addAsync(int num1, int num2, CancellationToken cancellationToken)
            {
                Logger.LogInformation($"AddAsync({num1},{num2})");
                return await Task.FromResult(num1 + num2);
            }

            /// <summary>
            /// Applies the operation described by <paramref name="w"/>, stores the result under
            /// <paramref name="logid"/> and returns it.
            /// </summary>
            /// <exception cref="InvalidOperation">Division by zero, or an unknown operation.</exception>
            public async Task<int> calculateAsync(int logid, Work w, CancellationToken cancellationToken)
            {
                Logger.LogInformation($"CalculateAsync({logid}, [{w.Op},{w.Num1},{w.Num2}])");

                int val;
                switch (w.Op)
                {
                    case Operation.ADD:
                        val = w.Num1 + w.Num2;
                        break;

                    case Operation.SUBTRACT:
                        val = w.Num1 - w.Num2;
                        break;

                    case Operation.MULTIPLY:
                        val = w.Num1 * w.Num2;
                        break;

                    case Operation.DIVIDE:
                        if (w.Num2 == 0)
                        {
                            throw new InvalidOperation
                            {
                                WhatOp = (int) w.Op,
                                Why = "Cannot divide by 0"
                            };
                        }

                        val = w.Num1 / w.Num2;
                        break;

                    default:
                        throw new InvalidOperation
                        {
                            WhatOp = (int) w.Op,
                            Why = "Unknown operation"
                        };
                }

                // Only successful calculations are recorded; the throwing paths above skip this.
                _log[logid] = new SharedStruct
                {
                    Key = logid,
                    Value = val.ToString()
                };

                return await Task.FromResult(val);
            }

            /// <summary>Logs the call and completes after a 100 millisecond delay.</summary>
            public async Task zipAsync(CancellationToken cancellationToken)
            {
                Logger.LogInformation("ZipAsync() with delay 100mc");

                // NOTE(review): the delay does not observe cancellationToken - confirm this is intended.
                await Task.Delay(100, CancellationToken.None);
            }
        }

        /// <summary>SharedService implementation that answers every lookup with a fixed value.</summary>
        public class SharedServiceAsyncHandler : SharedService.IAsync
        {
            /// <summary>Returns a new struct carrying <paramref name="key"/> and the value "GetStructAsync".</summary>
            public async Task<SharedStruct> getStructAsync(int key, CancellationToken cancellationToken)
            {
                Logger.LogInformation("GetStructAsync({0})", key);
                return await Task.FromResult(new SharedStruct()
                {
                    Key = key,
                    Value = "GetStructAsync"
                });
            }
        }
    }
}