// Licensed to the Apache Software Foundation(ASF) under one
// or more contributor license agreements.See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Thrift;
using Thrift.Collections;
using Thrift.Processor;
using Thrift.Protocol;
using Thrift.Server;
using Thrift.Transport;
using Thrift.Transport.Server;

#pragma warning disable IDE0063  // using can be simplified, we don't

namespace ThriftTest
{
    /// <summary>Wire protocol selected on the command line.</summary>
    internal enum ProtocolChoice
    {
        Binary,
        Compact,
        Json
    }

    /// <summary>Endpoint transport selected on the command line.</summary>
    internal enum TransportChoice
    {
        Socket,
        TlsSocket,
        NamedPipe
    }

    /// <summary>Layered (wrapping) transport selected on the command line.</summary>
    internal enum BufferChoice
    {
        None,
        Buffered,
        Framed
    }

    /// <summary>
    /// Command line options of the test server, pre-set to their defaults
    /// (plain socket on port 9090, binary protocol, no layered transport).
    /// </summary>
    internal class ServerParam
    {
        internal BufferChoice buffering = BufferChoice.None;
        internal ProtocolChoice protocol = ProtocolChoice.Binary;
        internal TransportChoice transport = TransportChoice.Socket;
        internal int port = 9090;
        internal string pipe = null;

        /// <summary>
        /// Applies the given command line arguments to this instance, in order.
        /// </summary>
        /// <param name="args">Raw command line arguments.</param>
        /// <remarks>
        /// On <c>--help</c> or an unknown argument the option help is printed and parsing
        /// stops; the options applied up to that point are kept and the caller is not
        /// notified. Unsupported server/processor types throw
        /// <see cref="NotImplementedException"/>; a malformed port number throws whatever
        /// <see cref="int.Parse(string, IFormatProvider)"/> throws.
        /// </remarks>
        internal void Parse(List<string> args)
        {
            foreach (var arg in args)
            {
                // Options carrying a value are matched by prefix. Ordinal comparison:
                // option names are identifiers, not linguistic text.
                if (arg.StartsWith("--pipe=", StringComparison.Ordinal))
                {
                    pipe = ValueOf(arg);
                    transport = TransportChoice.NamedPipe;
                    continue;
                }

                if (arg.StartsWith("--port=", StringComparison.Ordinal))
                {
                    // Port numbers are machine-readable input, so parse culture-independently.
                    port = int.Parse(ValueOf(arg), System.Globalization.CultureInfo.InvariantCulture);

                    // A port implies a socket, but must not undo an earlier --ssl.
                    if (transport != TransportChoice.TlsSocket)
                    {
                        transport = TransportChoice.Socket;
                    }
                    continue;
                }

                switch (arg)
                {
                    case "-b":
                    case "--buffered":
                    case "--transport=buffered":
                        buffering = BufferChoice.Buffered;
                        break;

                    case "-f":
                    case "--framed":
                    case "--transport=framed":
                        buffering = BufferChoice.Framed;
                        break;

                    case "--binary":
                    case "--protocol=binary":
                        protocol = ProtocolChoice.Binary;
                        break;

                    case "--compact":
                    case "--protocol=compact":
                        protocol = ProtocolChoice.Compact;
                        break;

                    case "--json":
                    case "--protocol=json":
                        protocol = ProtocolChoice.Json;
                        break;

                    case "--threaded":
                    case "--server-type=threaded":
                    case "--threadpool":
                    case "--server-type=threadpool":
                    case "--prototype":
                    case "--processor=prototype":
                        throw new NotImplementedException(arg);

                    case "--ssl":
                        transport = TransportChoice.TlsSocket;
                        break;

                    case "--help":
                        PrintOptionsHelp();
                        return;

                    default:
                        Console.WriteLine("Invalid argument: {0}", arg);
                        PrintOptionsHelp();
                        return;
                }
            }
        }

        /// <summary>Returns the text following the first '=' of a <c>--name=value</c> argument.</summary>
        private static string ValueOf(string arg)
        {
            return arg.Substring(arg.IndexOf('=') + 1);
        }

        /// <summary>Writes the list of supported server options to the console.</summary>
        internal static void PrintOptionsHelp()
        {
            Console.WriteLine("Server options:");
            Console.WriteLine(" --pipe=<pipe name>");
            Console.WriteLine(" --port=<port number>");
            Console.WriteLine(" --transport=<transport name> one of buffered,framed (defaults to none)");
            Console.WriteLine(" --protocol=<protocol name> one of compact,json (defaults to binary)");
            Console.WriteLine(" --server-type=<type> one of threaded,threadpool (defaults to simple)");
            Console.WriteLine(" --processor=<prototype>");
            Console.WriteLine(" --ssl");
            Console.WriteLine();
        }
    }

    /// <summary>
    /// Test server: hosts <see cref="TestHandlerAsync"/> behind the transport and
    /// protocol selected on the command line.
    /// </summary>
    public class TestServer
    {
        // Source of handler IDs; incremented atomically for every handler instance created.
        public static int _clientID = -1;
        private static readonly TConfiguration Configuration = null;  // or new TConfiguration() if needed

        /// <summary>Logging callback taking a composite format string and its arguments.</summary>
        public delegate void TestLogDelegate(string msg, params object[] values);

        /// <summary>Server event handler that only counts how often it has been called.</summary>
        public class MyServerEventHandler : TServerEventHandler
        {
            // Total number of callbacks received. Updated with Interlocked so that
            // callbacks arriving on different threads do not lose increments.
            public int callCount = 0;

            public Task PreServeAsync(CancellationToken cancellationToken)
            {
                Interlocked.Increment(ref callCount);
                return Task.CompletedTask;
            }

            public Task<object> CreateContextAsync(TProtocol input, TProtocol output, CancellationToken cancellationToken)
            {
                Interlocked.Increment(ref callCount);
                return Task.FromResult<object>(null);
            }

            public Task DeleteContextAsync(object serverContext, TProtocol input, TProtocol output, CancellationToken cancellationToken)
            {
                Interlocked.Increment(ref callCount);
                return Task.CompletedTask;
            }

            public Task ProcessContextAsync(object serverContext, TTransport transport, CancellationToken cancellationToken)
            {
                Interlocked.Increment(ref callCount);
                return Task.CompletedTask;
            }
        }

        /// <summary>
        /// Implementation of the ThriftTest service: logs every call to the console and
        /// echoes its argument (or returns the fixed structures the test suite expects).
        /// </summary>
        /// <remarks>
        /// <see cref="Execute"/> wraps one instance in a singleton processor factory, so
        /// the same instance serves every call. The logging code therefore keeps no
        /// per-call state in fields, and client-supplied data is only ever passed as a
        /// format <em>argument</em>, never as part of the format string.
        /// </remarks>
        public class TestHandlerAsync : ThriftTest.IAsync
        {
            public TServer Server { get; set; }
            private readonly int handlerID;
            private readonly TestLogDelegate logger;

            public TestHandlerAsync()
            {
                handlerID = Interlocked.Increment(ref _clientID);
                logger += TestConsoleLogger;
                logger.Invoke("New TestHandler instance created");
            }

            /// <summary>
            /// Writes one line "handlerNNN:&lt;formatted message&gt;" to the console.
            /// </summary>
            /// <param name="msg">Composite format string; literal braces must be doubled.</param>
            /// <param name="values">Arguments for <paramref name="msg"/>.</param>
            public void TestConsoleLogger(string msg, params object[] values)
            {
                // Local builder: no state is shared between calls.
                var line = new StringBuilder();
                line.AppendFormat("handler{0:D3}:", handlerID);
                line.AppendFormat(msg, values);
                line.AppendLine();
                Console.Write(line.ToString());
            }

            /// <summary>
            /// Logs "method({item, item, ...})". The items are passed as a format argument,
            /// so braces inside client data cannot corrupt the format string.
            /// A null sequence is logged like an empty one.
            /// </summary>
            private void LogCollection(string method, IEnumerable<string> items)
            {
                var joined = string.Join(", ", items ?? Enumerable.Empty<string>());
                // "{{" and "}}" are escaped literal braces around argument {1}.
                logger.Invoke("{0}({{{1}}})", method, joined);
            }

            public Task testVoidAsync(CancellationToken cancellationToken)
            {
                logger.Invoke("testVoid()");
                return Task.CompletedTask;
            }

            public Task<string> testStringAsync(string thing, CancellationToken cancellationToken)
            {
                logger.Invoke("testString({0})", thing);
                return Task.FromResult(thing);
            }

            public Task<bool> testBoolAsync(bool thing, CancellationToken cancellationToken)
            {
                logger.Invoke("testBool({0})", thing);
                return Task.FromResult(thing);
            }

            public Task<sbyte> testByteAsync(sbyte thing, CancellationToken cancellationToken)
            {
                logger.Invoke("testByte({0})", thing);
                return Task.FromResult(thing);
            }

            public Task<int> testI32Async(int thing, CancellationToken cancellationToken)
            {
                logger.Invoke("testI32({0})", thing);
                return Task.FromResult(thing);
            }

            public Task<long> testI64Async(long thing, CancellationToken cancellationToken)
            {
                logger.Invoke("testI64({0})", thing);
                return Task.FromResult(thing);
            }

            public Task<double> testDoubleAsync(double thing, CancellationToken cancellationToken)
            {
                logger.Invoke("testDouble({0})", thing);
                return Task.FromResult(thing);
            }

            public Task<byte[]> testBinaryAsync(byte[] thing, CancellationToken cancellationToken)
            {
                // A null payload is logged as 0 bytes instead of failing in the log statement.
                logger.Invoke("testBinary({0} bytes)", thing?.Length ?? 0);
                return Task.FromResult(thing);
            }

            public Task<Xtruct> testStructAsync(Xtruct thing, CancellationToken cancellationToken)
            {
                // Null-conditional access: a null struct logs empty fields rather than throwing.
                logger.Invoke("testStruct({{\"{0}\", {1}, {2}, {3}}})",
                    thing?.String_thing,
                    thing?.Byte_thing,
                    thing?.I32_thing,
                    thing?.I64_thing);
                return Task.FromResult(thing);
            }

            public Task<Xtruct2> testNestAsync(Xtruct2 nest, CancellationToken cancellationToken)
            {
                var inner = nest?.Struct_thing;
                logger.Invoke("testNest({{{0}, {{\"{1}\", {2}, {3}, {4}, {5}}}}})",
                    nest?.Byte_thing,
                    inner?.String_thing,
                    inner?.Byte_thing,
                    inner?.I32_thing,
                    inner?.I64_thing,
                    nest?.I32_thing);
                return Task.FromResult(nest);
            }

            public Task<Dictionary<int, int>> testMapAsync(Dictionary<int, int> thing, CancellationToken cancellationToken)
            {
                LogCollection("testMap", thing?.Select(pair => string.Format("{0} => {1}", pair.Key, pair.Value)));
                return Task.FromResult(thing);
            }

            public Task<Dictionary<string, string>> testStringMapAsync(Dictionary<string, string> thing, CancellationToken cancellationToken)
            {
                // Keys and values are arbitrary client strings and may contain '{' or '}'.
                LogCollection("testStringMap", thing?.Select(pair => string.Format("{0} => {1}", pair.Key, pair.Value)));
                return Task.FromResult(thing);
            }

            public Task<THashSet<int>> testSetAsync(THashSet<int> thing, CancellationToken cancellationToken)
            {
                LogCollection("testSet", thing?.Select(elem => elem.ToString()));
                return Task.FromResult(thing);
            }

            public Task<List<int>> testListAsync(List<int> thing, CancellationToken cancellationToken)
            {
                LogCollection("testList", thing?.Select(elem => elem.ToString()));
                return Task.FromResult(thing);
            }

            public Task<Numberz> testEnumAsync(Numberz thing, CancellationToken cancellationToken)
            {
                logger.Invoke("testEnum({0})", thing);
                return Task.FromResult(thing);
            }

            public Task<long> testTypedefAsync(long thing, CancellationToken cancellationToken)
            {
                logger.Invoke("testTypedef({0})", thing);
                return Task.FromResult(thing);
            }

            /// <summary>
            /// Returns { 4 => {1=>1, ..., 4=>4}, -4 => {-1=>-1, ..., -4=>-4} } regardless of the argument.
            /// </summary>
            public Task<Dictionary<int, Dictionary<int, int>>> testMapMapAsync(int hello, CancellationToken cancellationToken)
            {
                logger.Invoke("testMapMap({0})", hello);

                var pos = new Dictionary<int, int>();
                var neg = new Dictionary<int, int>();
                for (var i = 1; i < 5; i++)
                {
                    pos[i] = i;
                    neg[-i] = -i;
                }

                var mapmap = new Dictionary<int, Dictionary<int, int>>
                {
                    [4] = pos,
                    [-4] = neg
                };

                return Task.FromResult(mapmap);
            }

            public Task<Dictionary<long, Dictionary<Numberz, Insanity>>> testInsanityAsync(Insanity argument, CancellationToken cancellationToken)
            {
                logger.Invoke("testInsanity()");

                /** from ThriftTest.thrift:
                 * So you think you've got this all worked, out eh?
                 *
                 * Creates a the returned map with these values and prints it out:
                 *   { 1 => { 2 => argument,
                 *            3 => argument,
                 *          },
                 *     2 => { 6 => <empty Insanity struct>, },
                 *   }
                 * @return map<UserId, map<Numberz,Insanity>> - a map with the above values
                 */

                var first_map = new Dictionary<Numberz, Insanity>
                {
                    [Numberz.TWO] = argument,
                    [Numberz.THREE] = argument
                };

                var second_map = new Dictionary<Numberz, Insanity>
                {
                    [Numberz.SIX] = new Insanity()
                };

                var insane = new Dictionary<long, Dictionary<Numberz, Insanity>>
                {
                    [1] = first_map,
                    [2] = second_map
                };

                return Task.FromResult(insane);
            }

            /// <summary>
            /// Returns an Xtruct holding "Hello2" and the first three arguments;
            /// the remaining arguments are ignored.
            /// </summary>
            public Task<Xtruct> testMultiAsync(sbyte arg0, int arg1, long arg2, Dictionary<short, string> arg3, Numberz arg4, long arg5,
                CancellationToken cancellationToken)
            {
                logger.Invoke("testMulti()");

                var hello = new Xtruct
                {
                    String_thing = "Hello2",
                    Byte_thing = arg0,
                    I32_thing = arg1,
                    I64_thing = arg2
                };
                return Task.FromResult(hello);
            }

            /// <summary>
            /// Throws Xception (code 1001) for "Xception", TException for "TException",
            /// and completes normally for anything else.
            /// </summary>
            public Task testExceptionAsync(string arg, CancellationToken cancellationToken)
            {
                logger.Invoke("testException({0})", arg);

                if (arg == "Xception")
                {
                    throw new Xception
                    {
                        ErrorCode = 1001,
                        Message = arg
                    };
                }

                if (arg == "TException")
                {
                    throw new TException();
                }

                return Task.CompletedTask;
            }

            /// <summary>
            /// Throws Xception (1001) or Xception2 (2002) when <paramref name="arg0"/> names
            /// one of them; otherwise returns an Xtruct carrying <paramref name="arg1"/>.
            /// </summary>
            public Task<Xtruct> testMultiExceptionAsync(string arg0, string arg1, CancellationToken cancellationToken)
            {
                logger.Invoke("testMultiException({0}, {1})", arg0, arg1);

                if (arg0 == "Xception")
                {
                    throw new Xception
                    {
                        ErrorCode = 1001,
                        Message = "This is an Xception"
                    };
                }

                if (arg0 == "Xception2")
                {
                    throw new Xception2
                    {
                        ErrorCode = 2002,
                        Struct_thing = new Xtruct { String_thing = "This is an Xception2" }
                    };
                }

                var result = new Xtruct { String_thing = arg1 };
                return Task.FromResult(result);
            }

            /// <summary>Waits the requested number of seconds, then completes.</summary>
            public async Task testOnewayAsync(int secondsToSleep, CancellationToken cancellationToken)
            {
                logger.Invoke("testOneway({0}), sleeping...", secondsToSleep);

                // Await the delay instead of blocking on it, so no thread is held while waiting.
                await Task.Delay(secondsToSleep * 1000, cancellationToken);

                logger.Invoke("testOneway finished");
            }
        }

        /// <summary>
        /// Loads "server.p12" from the first "keys" directory found relative to the
        /// current directory (up to three levels up).
        /// </summary>
        /// <exception cref="FileNotFoundException">None of the candidate paths exists.</exception>
        private static X509Certificate2 GetServerCert()
        {
            const string serverCertName = "server.p12";
            var candidateDirs = new[]
            {
                "../../../keys/",
                "../../keys/",
                "../keys/",
                "keys/",
            };

            var existingPath = candidateDirs
                .Select(dir => Path.GetFullPath(dir + serverCertName))
                .FirstOrDefault(File.Exists);

            if (string.IsNullOrEmpty(existingPath))
            {
                throw new FileNotFoundException($"Cannot find file: {serverCertName}");
            }

            return new X509Certificate2(existingPath, "thrift");
        }

        /// <summary>
        /// Parses the arguments, builds the server from them and serves until the
        /// server returns and a line is read from the console.
        /// </summary>
        /// <param name="args">Command line arguments, see <see cref="ServerParam.PrintOptionsHelp"/>.</param>
        /// <returns>0 on success, 1 if argument parsing or the server failed.</returns>
        public static int Execute(List<string> args)
        {
            using (var loggerFactory = new LoggerFactory()) //.AddConsole().AddDebug();
            {
                var logger = loggerFactory.CreateLogger("Test");

                try
                {
                    var param = new ServerParam();

                    try
                    {
                        param.Parse(args);
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine("*** FAILED ***");
                        Console.WriteLine("Error while parsing arguments");
                        Console.WriteLine(ex.Message + " ST: " + ex.StackTrace);
                        return 1;
                    }

                    // Endpoint transport (mandatory)
                    TServerTransport trans;
                    switch (param.transport)
                    {
                        case TransportChoice.NamedPipe:
                            Debug.Assert(param.pipe != null);
                            trans = new TNamedPipeServerTransport(param.pipe, Configuration);
                            break;

                        case TransportChoice.TlsSocket:
                            var cert = GetServerCert();
                            if (cert == null || !cert.HasPrivateKey)
                            {
                                cert?.Dispose();
                                throw new InvalidOperationException("Certificate doesn't contain private key");
                            }

                            // Test setup only: the validation callback accepts every client certificate.
                            trans = new TTlsServerSocketTransport(param.port, Configuration,
                                cert,
                                (sender, certificate, chain, errors) => true,
                                null, SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12);
                            break;

                        case TransportChoice.Socket:
                        default:
                            trans = new TServerSocketTransport(param.port, Configuration);
                            break;
                    }

                    // Layered transport (optional, null means none)
                    TTransportFactory transFactory;
                    switch (param.buffering)
                    {
                        case BufferChoice.Framed:
                            transFactory = new TFramedTransport.Factory();
                            break;
                        case BufferChoice.Buffered:
                            transFactory = new TBufferedTransport.Factory();
                            break;
                        default:
                            Debug.Assert(param.buffering == BufferChoice.None, "unhandled case");
                            transFactory = null;  // no layered transport
                            break;
                    }

                    // Protocol (binary is also the fallback)
                    TProtocolFactory proto = param.protocol switch
                    {
                        ProtocolChoice.Compact => new TCompactProtocol.Factory(),
                        ProtocolChoice.Json => new TJsonProtocol.Factory(),
                        ProtocolChoice.Binary => new TBinaryProtocol.Factory(),
                        _ => new TBinaryProtocol.Factory(),
                    };

                    // Processor
                    var testHandler = new TestHandlerAsync();
                    var testProcessor = new ThriftTest.AsyncProcessor(testHandler);
                    var processorFactory = new TSingletonProcessorFactory(testProcessor);

                    TServer serverEngine = new TSimpleAsyncServer(processorFactory, trans, transFactory, transFactory, proto, proto, logger);

                    // Server event handler
                    var serverEvents = new MyServerEventHandler();
                    serverEngine.SetEventHandler(serverEvents);

                    // Run it.
                    // Report the endpoint that is actually used: a pipe name may still be set
                    // although a later --port= switched the transport back to a socket.
                    var where = (param.transport == TransportChoice.NamedPipe)
                        ? "on pipe " + param.pipe
                        : "on port " + param.port;
                    Console.WriteLine("Starting the AsyncBaseServer " + where +
                                      " with processor TPrototypeProcessorFactory prototype factory " +
                                      (param.buffering == BufferChoice.Buffered ? " with buffered transport" : "") +
                                      (param.buffering == BufferChoice.Framed ? " with framed transport" : "") +
                                      (param.transport == TransportChoice.TlsSocket ? " with encryption" : "") +
                                      (param.protocol == ProtocolChoice.Compact ? " with compact protocol" : "") +
                                      (param.protocol == ProtocolChoice.Json ? " with json protocol" : "") +
                                      "...");
                    serverEngine.ServeAsync(CancellationToken.None).GetAwaiter().GetResult();
                    Console.ReadLine();
                }
                catch (Exception x)
                {
                    Console.Error.Write(x);
                    return 1;
                }

                Console.WriteLine("done.");
                return 0;
            }
        }
    }

}