1 // Licensed to the Apache Software Foundation(ASF) under one
2 // or more contributor license agreements.See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 using System;
19 using System.Collections.Generic;
20 using System.IO;
21 using System.Linq;
22 using System.Net.Security;
23 using System.Security.Cryptography.X509Certificates;
24 using System.Threading;
25 using System.Threading.Tasks;
26 using Microsoft.AspNetCore.Builder;
27 using Microsoft.AspNetCore.Hosting;
28 using Microsoft.Extensions.Configuration;
29 using Microsoft.Extensions.DependencyInjection;
30 using Microsoft.Extensions.Logging;
31 using Thrift;
32 using Thrift.Protocol;
33 using Thrift.Server;
34 using Thrift.Transport;
35 using Thrift.Transport.Server;
36 using tutorial;
37 using shared;
38 using Thrift.Processor;
39 using System.Diagnostics;
40 
41 namespace Server
42 {
43     public class Program
44     {
45         private static ServiceCollection ServiceCollection = new ServiceCollection();
46         private static ILogger Logger;
47         private static readonly TConfiguration Configuration = null;  // new TConfiguration() if  needed
48 
Main(string[] args)49         public static void Main(string[] args)
50         {
51             args = args ?? new string[0];
52 
53             ServiceCollection.AddLogging(logging => ConfigureLogging(logging));
54             using (var serviceProvider = ServiceCollection.BuildServiceProvider())
55             {
56                 Logger = serviceProvider.GetService<ILoggerFactory>().CreateLogger(nameof(Server));
57 
58                 if (args.Any(x => x.StartsWith("-help", StringComparison.OrdinalIgnoreCase)))
59                 {
60                     DisplayHelp();
61                     return;
62                 }
63 
64                 using (var source = new CancellationTokenSource())
65                 {
66                     RunAsync(args, source.Token).GetAwaiter().GetResult();
67 
68                     Logger.LogInformation("Press any key to stop...");
69 
70                     Console.ReadLine();
71                     source.Cancel();
72                 }
73 
74                 Logger.LogInformation("Server stopped");
75             }
76         }
77 
ConfigureLogging(ILoggingBuilder logging)78         private static void ConfigureLogging(ILoggingBuilder logging)
79         {
80             logging.SetMinimumLevel(LogLevel.Trace);
81             logging.AddConsole();
82             logging.AddDebug();
83         }
84 
DisplayHelp()85         private static void DisplayHelp()
86         {
87             Logger.LogInformation(@"
88 Usage:
89     Server -help
90         will diplay help information
91 
92     Server -tr:<transport> -bf:<buffering> -pr:<protocol>
93         will run server with specified arguments (tcp transport, no buffering, and binary protocol by default)
94 
95 Options:
96     -tr (transport):
97         tcp - (default) tcp transport will be used (host - ""localhost"", port - 9090)
98         namedpipe - namedpipe transport will be used (pipe address - "".test"")
99         http - http transport will be used (http address - ""localhost:9090"")
100         tcptls - tcp transport with tls will be used (host - ""localhost"", port - 9090)
101 
102     -bf (buffering):
103         none - (default) no buffering will be used
104         buffered - buffered transport will be used
105         framed - framed transport will be used
106 
107     -pr (protocol):
108         binary - (default) binary protocol will be used
109         compact - compact protocol will be used
110         json - json protocol will be used
111         multiplexed - multiplexed protocol will be used
112 
113 Sample:
114     Server -tr:tcp
115 ");
116         }
117 
RunAsync(string[] args, CancellationToken cancellationToken)118         private static async Task RunAsync(string[] args, CancellationToken cancellationToken)
119         {
120             var selectedTransport = GetTransport(args);
121             var selectedBuffering = GetBuffering(args);
122             var selectedProtocol = GetProtocol(args);
123 
124             if (selectedTransport == Transport.Http)
125             {
126                 new HttpServerSample().Run(cancellationToken);
127             }
128             else
129             {
130                 await RunSelectedConfigurationAsync(selectedTransport, selectedBuffering, selectedProtocol, cancellationToken);
131             }
132         }
133 
GetProtocol(string[] args)134         private static Protocol GetProtocol(string[] args)
135         {
136             var transport = args.FirstOrDefault(x => x.StartsWith("-pr"))?.Split(':')?[1];
137 
138             Enum.TryParse(transport, true, out Protocol selectedProtocol);
139 
140             return selectedProtocol;
141         }
142 
GetBuffering(string[] args)143         private static Buffering GetBuffering(string[] args)
144         {
145             var buffering = args.FirstOrDefault(x => x.StartsWith("-bf"))?.Split(":")?[1];
146 
147             Enum.TryParse<Buffering>(buffering, out var selectedBuffering);
148 
149             return selectedBuffering;
150         }
151 
GetTransport(string[] args)152         private static Transport GetTransport(string[] args)
153         {
154             var transport = args.FirstOrDefault(x => x.StartsWith("-tr"))?.Split(':')?[1];
155 
156             Enum.TryParse(transport, true, out Transport selectedTransport);
157 
158             return selectedTransport;
159         }
160 
RunSelectedConfigurationAsync(Transport transport, Buffering buffering, Protocol protocol, CancellationToken cancellationToken)161         private static async Task RunSelectedConfigurationAsync(Transport transport, Buffering buffering, Protocol protocol, CancellationToken cancellationToken)
162         {
163             var handler = new CalculatorAsyncHandler();
164 
165             TServerTransport serverTransport = null;
166             switch (transport)
167             {
168                 case Transport.Tcp:
169                     serverTransport = new TServerSocketTransport(9090, Configuration);
170                     break;
171                 case Transport.NamedPipe:
172                     serverTransport = new TNamedPipeServerTransport(".test", Configuration);
173                     break;
174                 case Transport.TcpTls:
175                     serverTransport = new TTlsServerSocketTransport(9090, Configuration,
176                         GetCertificate(), ClientCertValidator, LocalCertificateSelectionCallback);
177                     break;
178             }
179 
180             TTransportFactory inputTransportFactory = null;
181             TTransportFactory outputTransportFactory = null;
182             switch (buffering)
183             {
184                 case Buffering.Buffered:
185                     inputTransportFactory = new TBufferedTransport.Factory();
186                     outputTransportFactory = new TBufferedTransport.Factory();
187                     break;
188 
189                 case Buffering.Framed:
190                     inputTransportFactory = new TFramedTransport.Factory();
191                     outputTransportFactory = new TFramedTransport.Factory();
192                     break;
193 
194                 default: // layered transport(s) are optional
195                     Debug.Assert(buffering == Buffering.None, "unhandled case");
196                     break;
197             }
198 
199             TProtocolFactory inputProtocolFactory = null;
200             TProtocolFactory outputProtocolFactory = null;
201             ITAsyncProcessor processor = null;
202             switch (protocol)
203             {
204                 case Protocol.Binary:
205                     inputProtocolFactory = new TBinaryProtocol.Factory();
206                     outputProtocolFactory = new TBinaryProtocol.Factory();
207                     processor = new Calculator.AsyncProcessor(handler);
208                     break;
209 
210                 case Protocol.Compact:
211                     inputProtocolFactory = new TCompactProtocol.Factory();
212                     outputProtocolFactory = new TCompactProtocol.Factory();
213                     processor = new Calculator.AsyncProcessor(handler);
214                     break;
215 
216                 case Protocol.Json:
217                     inputProtocolFactory = new TJsonProtocol.Factory();
218                     outputProtocolFactory = new TJsonProtocol.Factory();
219                     processor = new Calculator.AsyncProcessor(handler);
220                     break;
221 
222                 case Protocol.Multiplexed:
223                     inputProtocolFactory = new TBinaryProtocol.Factory();
224                     outputProtocolFactory = new TBinaryProtocol.Factory();
225 
226                     var calcHandler = new CalculatorAsyncHandler();
227                     var calcProcessor = new Calculator.AsyncProcessor(calcHandler);
228 
229                     var sharedServiceHandler = new SharedServiceAsyncHandler();
230                     var sharedServiceProcessor = new SharedService.AsyncProcessor(sharedServiceHandler);
231 
232                     var multiplexedProcessor = new TMultiplexedProcessor();
233                     multiplexedProcessor.RegisterProcessor(nameof(Calculator), calcProcessor);
234                     multiplexedProcessor.RegisterProcessor(nameof(SharedService), sharedServiceProcessor);
235 
236                     processor = multiplexedProcessor;
237                     break;
238 
239                 default:
240                     throw new ArgumentOutOfRangeException(nameof(protocol), protocol, null);
241             }
242 
243 
244             try
245             {
246                 Logger.LogInformation(
247                     $"Selected TAsyncServer with {serverTransport} transport, {processor} processor and {inputProtocolFactory} protocol factories");
248 
249                 var loggerFactory = ServiceCollection.BuildServiceProvider().GetService<ILoggerFactory>();
250 
251                 var server = new TSimpleAsyncServer(
252                     itProcessorFactory: new TSingletonProcessorFactory(processor),
253                     serverTransport: serverTransport,
254                     inputTransportFactory: inputTransportFactory,
255                     outputTransportFactory: outputTransportFactory,
256                     inputProtocolFactory: inputProtocolFactory,
257                     outputProtocolFactory: outputProtocolFactory,
258                     logger: loggerFactory.CreateLogger<TSimpleAsyncServer>());
259 
260                 Logger.LogInformation("Starting the server...");
261 
262                 await server.ServeAsync(cancellationToken);
263             }
264             catch (Exception x)
265             {
266                 Logger.LogInformation(x.ToString());
267             }
268         }
269 
GetCertificate()270         private static X509Certificate2 GetCertificate()
271         {
272             // due to files location in net core better to take certs from top folder
273             var certFile = GetCertPath(Directory.GetParent(Directory.GetCurrentDirectory()));
274             return new X509Certificate2(certFile, "ThriftTest");
275         }
276 
GetCertPath(DirectoryInfo di, int maxCount = 6)277         private static string GetCertPath(DirectoryInfo di, int maxCount = 6)
278         {
279             var topDir = di;
280             var certFile =
281                 topDir.EnumerateFiles("ThriftTest.pfx", SearchOption.AllDirectories)
282                     .FirstOrDefault();
283             if (certFile == null)
284             {
285                 if (maxCount == 0)
286                     throw new FileNotFoundException("Cannot find file in directories");
287                 return GetCertPath(di.Parent, maxCount - 1);
288             }
289 
290             return certFile.FullName;
291         }
292 
LocalCertificateSelectionCallback(object sender, string targetHost, X509CertificateCollection localCertificates, X509Certificate remoteCertificate, string[] acceptableIssuers)293         private static X509Certificate LocalCertificateSelectionCallback(object sender,
294             string targetHost, X509CertificateCollection localCertificates,
295             X509Certificate remoteCertificate, string[] acceptableIssuers)
296         {
297             return GetCertificate();
298         }
299 
ClientCertValidator(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)300         private static bool ClientCertValidator(object sender, X509Certificate certificate,
301             X509Chain chain, SslPolicyErrors sslPolicyErrors)
302         {
303             return true;
304         }
305 
306         private enum Transport
307         {
308             Tcp,
309             NamedPipe,
310             Http,
311             TcpTls,
312         }
313 
314         private enum Buffering
315         {
316             None,
317             Buffered,
318             Framed,
319         }
320 
321         private enum Protocol
322         {
323             Binary,
324             Compact,
325             Json,
326             Multiplexed
327         }
328 
329         public class HttpServerSample
330         {
Run(CancellationToken cancellationToken)331             public void Run(CancellationToken cancellationToken)
332             {
333                 var config = new ConfigurationBuilder()
334                     .AddEnvironmentVariables(prefix: "ASPNETCORE_")
335                     .Build();
336 
337                 var host = new WebHostBuilder()
338                     .UseConfiguration(config)
339                     .UseKestrel()
340                     .UseUrls("http://localhost:9090")
341                     .UseContentRoot(Directory.GetCurrentDirectory())
342                     .UseStartup<Startup>()
343                     .ConfigureLogging((ctx,logging) => ConfigureLogging(logging))
344                     .Build();
345 
346                 Logger.LogTrace("test");
347                 Logger.LogCritical("test");
348                 host.RunAsync(cancellationToken).GetAwaiter().GetResult();
349             }
350 
351             public class Startup
352             {
Startup(IWebHostEnvironment env)353                 public Startup(IWebHostEnvironment env)
354                 {
355                     var builder = new ConfigurationBuilder()
356                         .SetBasePath(env.ContentRootPath)
357                         .AddEnvironmentVariables();
358 
359                     Configuration = builder.Build();
360                 }
361 
362                 public IConfigurationRoot Configuration { get; }
363 
364                 // This method gets called by the runtime. Use this method to add services to the container.
ConfigureServices(IServiceCollection services)365                 public void ConfigureServices(IServiceCollection services)
366                 {
367                     services.AddTransient<Calculator.IAsync, CalculatorAsyncHandler>();
368                     services.AddTransient<ITAsyncProcessor, Calculator.AsyncProcessor>();
369                     services.AddTransient<THttpServerTransport, THttpServerTransport>();
370                 }
371 
372                 // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
Configure(IApplicationBuilder app, IWebHostEnvironment env, ILoggerFactory loggerFactory)373                 public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILoggerFactory loggerFactory)
374                 {
375                     app.UseMiddleware<THttpServerTransport>();
376                 }
377             }
378         }
379 
380         public class CalculatorAsyncHandler : Calculator.IAsync
381         {
382             private readonly Dictionary<int, SharedStruct> _log = new Dictionary<int, SharedStruct>();
383 
CalculatorAsyncHandler()384             public CalculatorAsyncHandler()
385             {
386             }
387 
getStructAsync(int key, CancellationToken cancellationToken)388             public async Task<SharedStruct> getStructAsync(int key,
389                 CancellationToken cancellationToken)
390             {
391                 Logger.LogInformation("GetStructAsync({0})", key);
392                 return await Task.FromResult(_log[key]);
393             }
394 
pingAsync(CancellationToken cancellationToken)395             public async Task pingAsync(CancellationToken cancellationToken)
396             {
397                 Logger.LogInformation("PingAsync()");
398                 await Task.CompletedTask;
399             }
400 
addAsync(int num1, int num2, CancellationToken cancellationToken)401             public async Task<int> addAsync(int num1, int num2, CancellationToken cancellationToken)
402             {
403                 Logger.LogInformation($"AddAsync({num1},{num2})");
404                 return await Task.FromResult(num1 + num2);
405             }
406 
calculateAsync(int logid, Work w, CancellationToken cancellationToken)407             public async Task<int> calculateAsync(int logid, Work w, CancellationToken cancellationToken)
408             {
409                 Logger.LogInformation($"CalculateAsync({logid}, [{w.Op},{w.Num1},{w.Num2}])");
410 
411                 var val = 0;
412                 switch (w.Op)
413                 {
414                     case Operation.ADD:
415                         val = w.Num1 + w.Num2;
416                         break;
417 
418                     case Operation.SUBTRACT:
419                         val = w.Num1 - w.Num2;
420                         break;
421 
422                     case Operation.MULTIPLY:
423                         val = w.Num1 * w.Num2;
424                         break;
425 
426                     case Operation.DIVIDE:
427                         if (w.Num2 == 0)
428                         {
429                             var io = new InvalidOperation
430                             {
431                                 WhatOp = (int) w.Op,
432                                 Why = "Cannot divide by 0"
433                             };
434 
435                             throw io;
436                         }
437                         val = w.Num1 / w.Num2;
438                         break;
439 
440                     default:
441                     {
442                         var io = new InvalidOperation
443                         {
444                             WhatOp = (int) w.Op,
445                             Why = "Unknown operation"
446                         };
447 
448                         throw io;
449                     }
450                 }
451 
452                 var entry = new SharedStruct
453                 {
454                     Key = logid,
455                     Value = val.ToString()
456                 };
457 
458                 _log[logid] = entry;
459 
460                 return await Task.FromResult(val);
461             }
462 
zipAsync(CancellationToken cancellationToken)463             public async Task zipAsync(CancellationToken cancellationToken)
464             {
465                 Logger.LogInformation("ZipAsync() with delay 100mc");
466                 await Task.Delay(100, CancellationToken.None);
467             }
468         }
469 
470         public class SharedServiceAsyncHandler : SharedService.IAsync
471         {
getStructAsync(int key, CancellationToken cancellationToken)472             public async Task<SharedStruct> getStructAsync(int key, CancellationToken cancellationToken)
473             {
474                 Logger.LogInformation("GetStructAsync({0})", key);
475                 return await Task.FromResult(new SharedStruct()
476                 {
477                     Key = key,
478                     Value = "GetStructAsync"
479                 });
480             }
481         }
482     }
483 }
484