// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 using System;
19 using System.Collections.Generic;
20 using System.IO;
21 using System.Linq;
22 using System.Net.Security;
23 using System.Security.Cryptography.X509Certificates;
24 using System.Threading;
25 using System.Threading.Tasks;
26 using Microsoft.AspNetCore.Builder;
27 using Microsoft.AspNetCore.Hosting;
28 using Microsoft.Extensions.Configuration;
29 using Microsoft.Extensions.DependencyInjection;
30 using Microsoft.Extensions.Logging;
31 using Thrift;
32 using Thrift.Protocols;
33 using Thrift.Server;
34 using Thrift.Transports;
35 using Thrift.Transports.Server;
36 using tutorial;
37 using shared;
38 
39 namespace Server
40 {
41     public class Program
42     {
43         private static readonly ILogger Logger = new LoggerFactory().AddConsole(LogLevel.Trace).AddDebug(LogLevel.Trace).CreateLogger(nameof(Server));
44 
Main(string[] args)45         public static void Main(string[] args)
46         {
47             args = args ?? new string[0];
48 
49             if (args.Any(x => x.StartsWith("-help", StringComparison.OrdinalIgnoreCase)))
50             {
51                 DisplayHelp();
52                 return;
53             }
54 
55             using (var source = new CancellationTokenSource())
56             {
57                 RunAsync(args, source.Token).GetAwaiter().GetResult();
58 
59                 Logger.LogInformation("Press any key to stop...");
60 
61                 Console.ReadLine();
62                 source.Cancel();
63             }
64 
65             Logger.LogInformation("Server stopped");
66         }
67 
DisplayHelp()68         private static void DisplayHelp()
69         {
70             Logger.LogInformation(@"
71 Usage:
72     Server.exe -help
73         will diplay help information
74 
75     Server.exe -tr:<transport> -pr:<protocol>
76         will run server with specified arguments (tcp transport and binary protocol by default)
77 
78 Options:
79     -tr (transport):
80         tcp - (default) tcp transport will be used (host - ""localhost"", port - 9090)
81         tcpbuffered - tcp buffered transport will be used (host - ""localhost"", port - 9090)
82         namedpipe - namedpipe transport will be used (pipe address - "".test"")
83         http - http transport will be used (http address - ""localhost:9090"")
84         tcptls - tcp transport with tls will be used (host - ""localhost"", port - 9090)
85         framed - tcp framed transport will be used (host - ""localhost"", port - 9090)
86 
87     -pr (protocol):
88         binary - (default) binary protocol will be used
89         compact - compact protocol will be used
90         json - json protocol will be used
91         multiplexed - multiplexed protocol will be used
92 
93 Sample:
94     Server.exe -tr:tcp
95 ");
96         }
97 
RunAsync(string[] args, CancellationToken cancellationToken)98         private static async Task RunAsync(string[] args, CancellationToken cancellationToken)
99         {
100             var selectedTransport = GetTransport(args);
101             var selectedProtocol = GetProtocol(args);
102 
103             if (selectedTransport == Transport.Http)
104             {
105                 new HttpServerSample().Run(cancellationToken);
106             }
107             else
108             {
109                 await RunSelectedConfigurationAsync(selectedTransport, selectedProtocol, cancellationToken);
110             }
111         }
112 
GetProtocol(string[] args)113         private static Protocol GetProtocol(string[] args)
114         {
115             var transport = args.FirstOrDefault(x => x.StartsWith("-pr"))?.Split(':')?[1];
116 
117             Enum.TryParse(transport, true, out Protocol selectedProtocol);
118 
119             return selectedProtocol;
120         }
121 
GetTransport(string[] args)122         private static Transport GetTransport(string[] args)
123         {
124             var transport = args.FirstOrDefault(x => x.StartsWith("-tr"))?.Split(':')?[1];
125 
126             Enum.TryParse(transport, true, out Transport selectedTransport);
127 
128             return selectedTransport;
129         }
130 
RunSelectedConfigurationAsync(Transport transport, Protocol protocol, CancellationToken cancellationToken)131         private static async Task RunSelectedConfigurationAsync(Transport transport, Protocol protocol, CancellationToken cancellationToken)
132         {
133             var fabric = new LoggerFactory().AddConsole(LogLevel.Trace).AddDebug(LogLevel.Trace);
134             var handler = new CalculatorAsyncHandler();
135             ITAsyncProcessor processor = null;
136 
137             TServerTransport serverTransport = null;
138 
139             switch (transport)
140             {
141                 case Transport.Tcp:
142                     serverTransport = new TServerSocketTransport(9090);
143                     break;
144                 case Transport.TcpBuffered:
145                     serverTransport = new TServerSocketTransport(port: 9090, clientTimeout: 10000, useBufferedSockets: true);
146                     break;
147                 case Transport.NamedPipe:
148                     serverTransport = new TNamedPipeServerTransport(".test");
149                     break;
150                 case Transport.TcpTls:
151                     serverTransport = new TTlsServerSocketTransport(9090, false, GetCertificate(), ClientCertValidator, LocalCertificateSelectionCallback);
152                     break;
153                 case Transport.Framed:
154                     serverTransport = new TServerFramedTransport(9090);
155                     break;
156             }
157 
158             ITProtocolFactory inputProtocolFactory;
159             ITProtocolFactory outputProtocolFactory;
160 
161             switch (protocol)
162             {
163                 case Protocol.Binary:
164                 {
165                     inputProtocolFactory = new TBinaryProtocol.Factory();
166                     outputProtocolFactory = new TBinaryProtocol.Factory();
167                     processor = new Calculator.AsyncProcessor(handler);
168                 }
169                     break;
170                 case Protocol.Compact:
171                 {
172                     inputProtocolFactory = new TCompactProtocol.Factory();
173                     outputProtocolFactory = new TCompactProtocol.Factory();
174                     processor = new Calculator.AsyncProcessor(handler);
175                 }
176                     break;
177                 case Protocol.Json:
178                 {
179                     inputProtocolFactory = new TJsonProtocol.Factory();
180                     outputProtocolFactory = new TJsonProtocol.Factory();
181                     processor = new Calculator.AsyncProcessor(handler);
182                 }
183                     break;
184                 case Protocol.Multiplexed:
185                 {
186                     inputProtocolFactory = new TBinaryProtocol.Factory();
187                     outputProtocolFactory = new TBinaryProtocol.Factory();
188 
189                     var calcHandler = new CalculatorAsyncHandler();
190                     var calcProcessor = new Calculator.AsyncProcessor(calcHandler);
191 
192                     var sharedServiceHandler = new SharedServiceAsyncHandler();
193                     var sharedServiceProcessor = new SharedService.AsyncProcessor(sharedServiceHandler);
194 
195                     var multiplexedProcessor = new TMultiplexedProcessor();
196                     multiplexedProcessor.RegisterProcessor(nameof(Calculator), calcProcessor);
197                     multiplexedProcessor.RegisterProcessor(nameof(SharedService), sharedServiceProcessor);
198 
199                     processor = multiplexedProcessor;
200                 }
201                     break;
202                 default:
203                     throw new ArgumentOutOfRangeException(nameof(protocol), protocol, null);
204             }
205 
206             try
207             {
208                 Logger.LogInformation(
209                     $"Selected TAsyncServer with {serverTransport} transport, {processor} processor and {inputProtocolFactory} protocol factories");
210 
211                 var server = new AsyncBaseServer(processor, serverTransport, inputProtocolFactory, outputProtocolFactory, fabric);
212 
213                 Logger.LogInformation("Starting the server...");
214                 await server.ServeAsync(cancellationToken);
215             }
216             catch (Exception x)
217             {
218                 Logger.LogInformation(x.ToString());
219             }
220         }
221 
GetCertificate()222         private static X509Certificate2 GetCertificate()
223         {
224             // due to files location in net core better to take certs from top folder
225             var certFile = GetCertPath(Directory.GetParent(Directory.GetCurrentDirectory()));
226             return new X509Certificate2(certFile, "ThriftTest");
227         }
228 
GetCertPath(DirectoryInfo di, int maxCount = 6)229         private static string GetCertPath(DirectoryInfo di, int maxCount = 6)
230         {
231             var topDir = di;
232             var certFile =
233                 topDir.EnumerateFiles("ThriftTest.pfx", SearchOption.AllDirectories)
234                     .FirstOrDefault();
235             if (certFile == null)
236             {
237                 if (maxCount == 0)
238                     throw new FileNotFoundException("Cannot find file in directories");
239                 return GetCertPath(di.Parent, maxCount - 1);
240             }
241 
242             return certFile.FullName;
243         }
244 
LocalCertificateSelectionCallback(object sender, string targetHost, X509CertificateCollection localCertificates, X509Certificate remoteCertificate, string[] acceptableIssuers)245         private static X509Certificate LocalCertificateSelectionCallback(object sender,
246             string targetHost, X509CertificateCollection localCertificates,
247             X509Certificate remoteCertificate, string[] acceptableIssuers)
248         {
249             return GetCertificate();
250         }
251 
ClientCertValidator(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)252         private static bool ClientCertValidator(object sender, X509Certificate certificate,
253             X509Chain chain, SslPolicyErrors sslPolicyErrors)
254         {
255             return true;
256         }
257 
258         private enum Transport
259         {
260             Tcp,
261             TcpBuffered,
262             NamedPipe,
263             Http,
264             TcpTls,
265             Framed
266         }
267 
268         private enum Protocol
269         {
270             Binary,
271             Compact,
272             Json,
273             Multiplexed
274         }
275 
276         public class HttpServerSample
277         {
Run(CancellationToken cancellationToken)278             public void Run(CancellationToken cancellationToken)
279             {
280                 var config = new ConfigurationBuilder()
281                     .AddEnvironmentVariables(prefix: "ASPNETCORE_")
282                     .Build();
283 
284                 var host = new WebHostBuilder()
285                     .UseConfiguration(config)
286                     .UseKestrel()
287                     .UseUrls("http://localhost:9090")
288                     .UseContentRoot(Directory.GetCurrentDirectory())
289                     .UseStartup<Startup>()
290                     .Build();
291 
292                 host.RunAsync(cancellationToken).GetAwaiter().GetResult();
293             }
294 
295             public class Startup
296             {
Startup(IHostingEnvironment env)297                 public Startup(IHostingEnvironment env)
298                 {
299                     var builder = new ConfigurationBuilder()
300                         .SetBasePath(env.ContentRootPath)
301                         .AddEnvironmentVariables();
302 
303                     Configuration = builder.Build();
304                 }
305 
306                 public IConfigurationRoot Configuration { get; }
307 
308                 // This method gets called by the runtime. Use this method to add services to the container.
ConfigureServices(IServiceCollection services)309                 public void ConfigureServices(IServiceCollection services)
310                 {
311                     services.AddTransient<Calculator.IAsync, CalculatorAsyncHandler>();
312                     services.AddTransient<ITAsyncProcessor, Calculator.AsyncProcessor>();
313                     services.AddTransient<THttpServerTransport, THttpServerTransport>();
314                 }
315 
316                 // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)317                 public void Configure(IApplicationBuilder app, IHostingEnvironment env,
318                     ILoggerFactory loggerFactory)
319                 {
320                     app.UseMiddleware<THttpServerTransport>();
321                 }
322             }
323         }
324 
325         public class CalculatorAsyncHandler : Calculator.IAsync
326         {
327             private readonly Dictionary<int, SharedStruct> _log = new Dictionary<int, SharedStruct>();
328 
CalculatorAsyncHandler()329             public CalculatorAsyncHandler()
330             {
331             }
332 
getStructAsync(int key, CancellationToken cancellationToken)333             public async Task<SharedStruct> getStructAsync(int key,
334                 CancellationToken cancellationToken)
335             {
336                 Logger.LogInformation("GetStructAsync({0})", key);
337                 return await Task.FromResult(_log[key]);
338             }
339 
pingAsync(CancellationToken cancellationToken)340             public async Task pingAsync(CancellationToken cancellationToken)
341             {
342                 Logger.LogInformation("PingAsync()");
343                 await Task.CompletedTask;
344             }
345 
addAsync(int num1, int num2, CancellationToken cancellationToken)346             public async Task<int> addAsync(int num1, int num2, CancellationToken cancellationToken)
347             {
348                 Logger.LogInformation($"AddAsync({num1},{num2})");
349                 return await Task.FromResult(num1 + num2);
350             }
351 
calculateAsync(int logid, Work w, CancellationToken cancellationToken)352             public async Task<int> calculateAsync(int logid, Work w, CancellationToken cancellationToken)
353             {
354                 Logger.LogInformation($"CalculateAsync({logid}, [{w.Op},{w.Num1},{w.Num2}])");
355 
356                 var val = 0;
357                 switch (w.Op)
358                 {
359                     case Operation.ADD:
360                         val = w.Num1 + w.Num2;
361                         break;
362 
363                     case Operation.SUBTRACT:
364                         val = w.Num1 - w.Num2;
365                         break;
366 
367                     case Operation.MULTIPLY:
368                         val = w.Num1 * w.Num2;
369                         break;
370 
371                     case Operation.DIVIDE:
372                         if (w.Num2 == 0)
373                         {
374                             var io = new InvalidOperation
375                             {
376                                 WhatOp = (int) w.Op,
377                                 Why = "Cannot divide by 0"
378                             };
379 
380                             throw io;
381                         }
382                         val = w.Num1 / w.Num2;
383                         break;
384 
385                     default:
386                     {
387                         var io = new InvalidOperation
388                         {
389                             WhatOp = (int) w.Op,
390                             Why = "Unknown operation"
391                         };
392 
393                         throw io;
394                     }
395                 }
396 
397                 var entry = new SharedStruct
398                 {
399                     Key = logid,
400                     Value = val.ToString()
401                 };
402 
403                 _log[logid] = entry;
404 
405                 return await Task.FromResult(val);
406             }
407 
zipAsync(CancellationToken cancellationToken)408             public async Task zipAsync(CancellationToken cancellationToken)
409             {
410                 Logger.LogInformation("ZipAsync() with delay 100mc");
411                 await Task.Delay(100, CancellationToken.None);
412             }
413         }
414 
415         public class SharedServiceAsyncHandler : SharedService.IAsync
416         {
getStructAsync(int key, CancellationToken cancellationToken)417             public async Task<SharedStruct> getStructAsync(int key, CancellationToken cancellationToken)
418             {
419                 Logger.LogInformation("GetStructAsync({0})", key);
420                 return await Task.FromResult(new SharedStruct()
421                 {
422                     Key = key,
423                     Value = "GetStructAsync"
424                 });
425             }
426         }
427     }
428 }
429