//
// Copyright (c) ZeroC, Inc. All rights reserved.
//

namespace Ice
{

    //
    // A dispatch result that provides its own reply stream. Incoming calls
    // getOutputStream with the Current of the request and sends the returned
    // stream as the response.
    //
    public interface MarshaledResult
    {
        OutputStream getOutputStream(Current current);
    }

}

namespace IceInternal
{
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.IO;
    using System.Globalization;
    using System.Threading.Tasks;

    //
    // Server-side state of a single request: reads the request header from the
    // input stream, locates the servant, dispatches to it, and hands the reply
    // (or the marshaled failure) to the ResponseHandler.
    //
    public class Incoming : Ice.Request
    {
        public Incoming(Instance instance, ResponseHandler handler, Ice.ConnectionI connection,
                        Ice.ObjectAdapter adapter, bool response, byte compress, int requestId)
        {
            initialize(instance, handler, connection, adapter, response, compress, requestId);

            _cookie = null;
        }

        //
        // These functions allow this object to be reused, rather than reallocated.
        //
        public void reset(Instance instance, ResponseHandler handler, Ice.ConnectionI connection,
                          Ice.ObjectAdapter adapter, bool response, byte compress, int requestId)
        {
            initialize(instance, handler, connection, adapter, response, compress, requestId);

            Debug.Assert(_cookie == null);

            _inParamPos = -1;
        }

        //
        // Releases the per-request references so that the object can be reused.
        // Returns false (and releases nothing) while a response handler is still
        // set, i.e. while the dispatch has not completed yet.
        //
        public bool reclaim()
        {
            if(_responseHandler != null) // Async dispatch not ready for being reclaimed!
            {
                return false;
            }

            _current = null;
            _servant = null;
            _locator = null;
            _cookie = null;

            // The observer is detached and cleared when the dispatch completes.
            Debug.Assert(_observer == null);

            _os = null;
            _is = null;

            _inParamPos = -1;

            return true;
        }

        public Ice.Current getCurrent()
        {
            return _current;
        }

        //
        // Reads the request header from the stream, finds the servant (directly
        // or through a servant locator) and dispatches the request to it.
        //
        // Exceptions raised while reading the header propagate to the caller;
        // failures from the locator, a missing servant, or the dispatch itself
        // are marshaled and sent through the response handler.
        //
        public void invoke(ServantManager servantManager, Ice.InputStream stream)
        {
            _is = stream;

            int start = _is.pos();

            //
            // Read the current.
            //
            _current.id.ice_readMembers(_is);

            //
            // For compatibility with the old FacetPath: at most one element is allowed.
            //
            string[] facetPath = _is.readStringSeq();
            if(facetPath.Length > 1)
            {
                throw new Ice.MarshalException();
            }
            _current.facet = facetPath.Length == 1 ? facetPath[0] : "";

            _current.operation = _is.readString();
            _current.mode = (Ice.OperationMode)_is.readByte();
            _current.ctx = new Dictionary<string, string>();
            int sz = _is.readSize();
            while(sz-- > 0)
            {
                string first = _is.readString();
                string second = _is.readString();
                _current.ctx[first] = second;
            }

            Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer;
            if(obsv != null)
            {
                // Read the encapsulation size, then rewind so that the encapsulation is still unread.
                int size = _is.readInt();
                _is.pos(_is.pos() - 4);

                _observer = obsv.getDispatchObserver(_current, _is.pos() - start + size);
                if(_observer != null)
                {
                    _observer.attach();
                }
            }

            //
            // Don't put the code above into the try block below. Exceptions
            // in the code above are considered fatal, and must propagate to
            // the caller of this operation.
            //

            if(servantManager != null)
            {
                _servant = servantManager.findServant(_current.id, _current.facet);
                if(_servant == null)
                {
                    _locator = servantManager.findServantLocator(_current.id.category);
                    if(_locator == null && _current.id.category.Length > 0)
                    {
                        _locator = servantManager.findServantLocator("");
                    }

                    if(_locator != null)
                    {
                        try
                        {
                            _servant = _locator.locate(_current, out _cookie);
                        }
                        catch(Exception ex)
                        {
                            skipReadParams(); // Required for batch requests.
                            handleException(ex, false);
                            return;
                        }
                    }
                }
            }

            if(_servant == null)
            {
                try
                {
                    //
                    // The exception is thrown (rather than just created) so that it
                    // carries a stack trace when it is logged.
                    //
                    if(servantManager != null && servantManager.hasServant(_current.id))
                    {
                        throw new Ice.FacetNotExistException(_current.id, _current.facet, _current.operation);
                    }
                    else
                    {
                        throw new Ice.ObjectNotExistException(_current.id, _current.facet, _current.operation);
                    }
                }
                catch(Exception ex)
                {
                    skipReadParams(); // Required for batch requests
                    handleException(ex, false);
                    return;
                }
            }

            try
            {
                Task<Ice.OutputStream> task = _servant.iceDispatch(this, _current);
                if(task == null)
                {
                    // Synchronous dispatch: the response (if any) was cached in _os.
                    completed(null, false);
                }
                else if(task.IsCompleted)
                {
                    _os = task.GetAwaiter().GetResult(); // Get the response
                    completed(null, false);
                }
                else
                {
                    task.ContinueWith((Task<Ice.OutputStream> t) =>
                    {
                        try
                        {
                            _os = t.GetAwaiter().GetResult();
                            completed(null, true); // true = asynchronous
                        }
                        catch(Exception ex)
                        {
                            completed(ex, true); // true = asynchronous
                        }
                    }, TaskContinuationOptions.ExecuteSynchronously);
                }
            }
            catch(Exception ex)
            {
                completed(ex, false);
            }
        }

        //
        // Caches the already-marshaled response. Always returns null.
        //
        public Task<Ice.OutputStream> setResult(Ice.OutputStream os)
        {
            _os = os;
            return null; // Response is cached in the Incoming to not have to create unnecessary Task
        }

        //
        // Caches the stream provided by a marshaled result. Always returns null.
        //
        // NOTE(review): a null result falls back to default(T), which is only
        // usable when T is a value type - presumably T is always a struct;
        // confirm against the callers.
        //
        public Task<Ice.OutputStream> setMarshaledResult<T>(T result) where T : Ice.MarshaledResult
        {
            if(result == null)
            {
                _os = default(T).getOutputStream(_current);
            }
            else
            {
                _os = result.getOutputStream(_current);
            }
            return null; // Response is cached in the Incoming to not have to create unnecessary Task
        }

        //
        // With a null task, marshals default(R) immediately with the write
        // delegate, caches the stream and returns null. Otherwise returns a task
        // that marshals the result of the given task once it completes; a failure
        // of the given task faults the returned task.
        //
        public Task<Ice.OutputStream> setResultTask<R>(Task<R> task, Action<Ice.OutputStream, R> write)
        {
            if(task == null)
            {
                _os = startWriteParams();
                write(_os, default(R));
                endWriteParams(_os);
                return null; // Response is cached in the Incoming to not have to create unnecessary Task
            }

            return task.ContinueWith((Task<R> t) =>
            {
                R result = t.GetAwaiter().GetResult();
                Ice.OutputStream os = startWriteParams();
                write(os, result);
                endWriteParams(os);
                return os;
            }, TaskContinuationOptions.ExecuteSynchronously);
        }

        //
        // Same as above for operations without return values: the response is
        // the one produced by writeEmptyParams.
        //
        public Task<Ice.OutputStream> setResultTask(Task task)
        {
            if(task == null)
            {
                _os = writeEmptyParams();
                return null;
            }

            return task.ContinueWith((Task t) =>
            {
                t.GetAwaiter().GetResult(); // Rethrows the failure of the task, if any.
                return writeEmptyParams();
            }, TaskContinuationOptions.ExecuteSynchronously);
        }

        //
        // Same as above for marshaled results: the response stream is provided
        // by the result itself.
        //
        public Task<Ice.OutputStream> setMarshaledResultTask<T>(Task<T> task) where T : Ice.MarshaledResult
        {
            if(task == null)
            {
                _os = default(T).getOutputStream(_current);
                return null; // Response is cached in the Incoming to not have to create unnecessary Task
            }

            return task.ContinueWith((Task<T> t) =>
            {
                return t.GetAwaiter().GetResult().getOutputStream(_current);
            }, TaskContinuationOptions.ExecuteSynchronously);
        }

        //
        // Finishes the dispatch: notifies the servant locator (if one located
        // the servant), then sends the cached response, the marshaled exception,
        // or no response for oneway requests. Local exceptions raised along the
        // way are reported with invokeException. The observer and the response
        // handler are always released.
        //
        public void completed(Exception exc, bool amd)
        {
            try
            {
                if(_locator != null)
                {
                    Debug.Assert(_servant != null);
                    try
                    {
                        _locator.finished(_current, _servant, _cookie);
                    }
                    catch(Exception ex)
                    {
                        handleException(ex, amd);
                        return;
                    }
                }

                Debug.Assert(_responseHandler != null);

                if(exc != null)
                {
                    handleException(exc, amd);
                }
                else if(_response)
                {
                    sendReply(amd);
                }
                else
                {
                    _responseHandler.sendNoResponse();
                }
            }
            catch(Ice.LocalException ex)
            {
                _responseHandler.invokeException(_current.requestId, ex, 1, amd);
            }
            finally
            {
                if(_observer != null)
                {
                    _observer.detach();
                    _observer = null;
                }
                _responseHandler = null;
            }
        }

        //
        // The first call records the current position of the input stream;
        // subsequent calls rewind the input stream to that position and discard
        // the cached response.
        //
        public void startOver()
        {
            if(_inParamPos == -1)
            {
                //
                // That's the first startOver, so almost nothing to do
                //
                _inParamPos = _is.pos();
            }
            else
            {
                //
                // Let's rewind _is, reset _os
                //
                _is.pos(_inParamPos);
                _os = null;
            }
        }

        public void skipReadParams()
        {
            //
            // Remember the encoding used by the input parameters, we'll
            // encode the response parameters with the same encoding.
            //
            _current.encoding = _is.skipEncapsulation();
        }

        public Ice.InputStream startReadParams()
        {
            //
            // Remember the encoding used by the input parameters, we'll
            // encode the response parameters with the same encoding.
            //
            _current.encoding = _is.startEncapsulation();
            return _is;
        }

        public void endReadParams()
        {
            _is.endEncapsulation();
        }

        public void readEmptyParams()
        {
            _current.encoding = _is.skipEmptyEncapsulation();
        }

        public byte[] readParamEncaps()
        {
            return _is.readEncapsulation(out _current.encoding);
        }

        //
        // Sets the format passed to startEncapsulation when marshaling results
        // and user exceptions.
        //
        public void setFormat(Ice.FormatType format)
        {
            _format = format;
        }

        //
        // Creates a stream that already contains the reply header, the request
        // ID and the replyOK status.
        //
        public static Ice.OutputStream createResponseOutputStream(Ice.Current current)
        {
            var os = new Ice.OutputStream(current.adapter.getCommunicator(), Ice.Util.currentProtocolEncoding);
            os.writeBlob(Protocol.replyHdr);
            os.writeInt(current.requestId);
            os.writeByte(ReplyStatus.replyOK);
            return os;
        }

        //
        // Starts a replyOK response and opens the encapsulation for the out
        // parameters. Throws MarshalException for oneway dispatches.
        //
        public Ice.OutputStream startWriteParams()
        {
            if(!_response)
            {
                throw new Ice.MarshalException("can't marshal out parameters for oneway dispatch");
            }

            Ice.OutputStream os = createReply(ReplyStatus.replyOK);
            os.startEncapsulation(_current.encoding, _format);
            return os;
        }

        public void endWriteParams(Ice.OutputStream os)
        {
            if(_response)
            {
                os.endEncapsulation();
            }
        }

        //
        // Returns a replyOK response with an empty encapsulation, or null for
        // oneway dispatches.
        //
        public Ice.OutputStream writeEmptyParams()
        {
            if(!_response)
            {
                return null;
            }

            Ice.OutputStream os = createReply(ReplyStatus.replyOK);
            os.writeEmptyEncapsulation(_current.encoding);
            return os;
        }

        //
        // Returns a response wrapping the given encapsulation (an empty one if v
        // is null or empty) with the replyOK or replyUserException status
        // depending on ok, or null for oneway dispatches. A user exception is
        // reported to the observer in either case.
        //
        public Ice.OutputStream writeParamEncaps(byte[] v, bool ok)
        {
            if(!ok && _observer != null)
            {
                _observer.userException();
            }

            if(!_response)
            {
                return null;
            }

            Ice.OutputStream os = createReply(ok ? ReplyStatus.replyOK : ReplyStatus.replyUserException);
            if(v == null || v.Length == 0)
            {
                os.writeEmptyEncapsulation(_current.encoding);
            }
            else
            {
                os.writeEncapsulation(v);
            }
            return os;
        }

        //
        // Shared by the constructor and reset.
        //
        private void initialize(Instance instance, ResponseHandler handler, Ice.ConnectionI connection,
                                Ice.ObjectAdapter adapter, bool response, byte compress, int requestId)
        {
            _instance = instance;
            _responseHandler = handler;
            _response = response;
            _compress = compress;

            //
            // Don't recycle the Current object, because servants may keep a reference to it.
            //
            _current = new Ice.Current();
            _current.id = new Ice.Identity();
            _current.adapter = adapter;
            _current.con = connection;
            _current.requestId = requestId;
        }

        //
        // Creates a reply stream containing the reply header, the request ID and
        // the given reply status.
        //
        private Ice.OutputStream createReply(byte replyStatus)
        {
            var os = new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding);
            os.writeBlob(Protocol.replyHdr);
            os.writeInt(_current.requestId);
            os.writeByte(replyStatus);
            return os;
        }

        //
        // Reports the size of the cached response to the observer and sends it.
        //
        private void sendReply(bool amd)
        {
            if(_observer != null)
            {
                _observer.reply(_os.size() - Protocol.headerSize - 4);
            }
            _responseHandler.sendResponse(_current.requestId, _os, _compress, amd);
        }

        //
        // Logs the exception if the Ice.Warn.Dispatch property (default 1) is
        // greater than the given level.
        //
        private void warnIfEnabled(Exception ex, int level)
        {
            if(_instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > level)
            {
                warning(ex);
            }
        }

        //
        // Logs a "dispatch exception" warning with the identity, facet and
        // operation of the request, the remote address when the connection
        // provides IP information, and the exception itself.
        //
        private void warning(Exception ex)
        {
            Debug.Assert(_instance != null);

            using(StringWriter sw = new StringWriter(CultureInfo.CurrentCulture))
            {
                IceUtilInternal.OutputBase output = new IceUtilInternal.OutputBase(sw);
                Ice.ToStringMode toStringMode = _instance.toStringMode();
                output.setUseTab(false);
                output.print("dispatch exception:");
                output.print("\nidentity: " + Ice.Util.identityToString(_current.id, toStringMode));
                output.print("\nfacet: " + IceUtilInternal.StringUtil.escapeString(_current.facet, "", toStringMode));
                output.print("\noperation: " + _current.operation);
                if(_current.con != null)
                {
                    try
                    {
                        for(Ice.ConnectionInfo p = _current.con.getInfo(); p != null; p = p.underlying)
                        {
                            Ice.IPConnectionInfo ipinfo = p as Ice.IPConnectionInfo;
                            if(ipinfo != null)
                            {
                                output.print("\nremote host: " + ipinfo.remoteAddress + " remote port: " + ipinfo.remotePort);
                                break;
                            }
                        }
                    }
                    catch(Ice.LocalException)
                    {
                        // Best effort: the warning is logged without the remote address.
                    }
                }
                output.print("\n");
                output.print(ex.ToString());
                _instance.initializationData().logger.warning(sw.ToString());
            }
        }

        //
        // Marshals the given dispatch failure and sends it (or sends no response
        // for oneway requests), then releases the observer and the response
        // handler. Nothing else is done when the response handler reports that
        // it took care of a system exception itself.
        //
        private void handleException(Exception exc, bool amd)
        {
            Debug.Assert(_responseHandler != null);

            Ice.SystemException systemException = exc as Ice.SystemException;
            if(systemException != null &&
               _responseHandler.systemException(_current.requestId, systemException, amd))
            {
                return;
            }

            //
            // The exception is classified with type tests instead of being
            // re-thrown and caught: throwing an existing exception object again
            // resets its stack trace, which made the logged warnings and the
            // text sent with unknown exceptions point at this method. The order
            // of the tests matters, most specific types come first.
            //
            if(exc is Ice.RequestFailedException)
            {
                handleRequestFailed((Ice.RequestFailedException)exc, amd);
            }
            else if(exc is Ice.UnknownLocalException)
            {
                var ex = (Ice.UnknownLocalException)exc;
                sendUnknownReply(ex, ex.ice_id(), ReplyStatus.replyUnknownLocalException, ex.unknown, amd);
            }
            else if(exc is Ice.UnknownUserException)
            {
                var ex = (Ice.UnknownUserException)exc;
                sendUnknownReply(ex, ex.ice_id(), ReplyStatus.replyUnknownUserException, ex.unknown, amd);
            }
            else if(exc is Ice.UnknownException)
            {
                var ex = (Ice.UnknownException)exc;
                sendUnknownReply(ex, ex.ice_id(), ReplyStatus.replyUnknownException, ex.unknown, amd);
            }
            else if(exc is Ice.UserException)
            {
                handleUserException((Ice.UserException)exc, amd);
            }
            else if(exc is Ice.Exception)
            {
                var ex = (Ice.Exception)exc;
                sendUnknownReply(ex, ex.ice_id(), ReplyStatus.replyUnknownLocalException,
                                 ex.ice_id() + "\n" + ex.StackTrace, amd);
            }
            else
            {
                sendUnknownReply(exc, exc.GetType().FullName, ReplyStatus.replyUnknownException,
                                 exc.ToString(), amd);
            }

            if(_observer != null)
            {
                _observer.detach();
                _observer = null;
            }
            _responseHandler = null;
        }

        //
        // Replies to an object/facet/operation-not-exist failure. Members of the
        // exception that are not set are filled in from the current request.
        //
        private void handleRequestFailed(Ice.RequestFailedException ex, bool amd)
        {
            if(ex.id == null || ex.id.name == null || ex.id.name.Length == 0)
            {
                ex.id = _current.id;
            }

            if(ex.facet == null || ex.facet.Length == 0)
            {
                ex.facet = _current.facet;
            }

            if(ex.operation == null || ex.operation.Length == 0)
            {
                ex.operation = _current.operation;
            }

            // These failures are only logged when Ice.Warn.Dispatch is greater than 1.
            warnIfEnabled(ex, 1);

            if(_observer != null)
            {
                _observer.failed(ex.ice_id());
            }

            if(!_response)
            {
                _responseHandler.sendNoResponse();
                return;
            }

            _os = new Ice.OutputStream(_instance, Ice.Util.currentProtocolEncoding);
            _os.writeBlob(Protocol.replyHdr);
            _os.writeInt(_current.requestId);
            if(ex is Ice.ObjectNotExistException)
            {
                _os.writeByte(ReplyStatus.replyObjectNotExist);
            }
            else if(ex is Ice.FacetNotExistException)
            {
                _os.writeByte(ReplyStatus.replyFacetNotExist);
            }
            else if(ex is Ice.OperationNotExistException)
            {
                _os.writeByte(ReplyStatus.replyOperationNotExist);
            }
            else
            {
                Debug.Assert(false);
            }
            ex.id.ice_writeMembers(_os);

            //
            // For compatibility with the old FacetPath.
            //
            if(ex.facet == null || ex.facet.Length == 0)
            {
                _os.writeStringSeq(null);
            }
            else
            {
                string[] facetPath2 = { ex.facet };
                _os.writeStringSeq(facetPath2);
            }

            _os.writeString(ex.operation);

            sendReply(amd);
        }

        //
        // Replies with the marshaled user exception.
        //
        private void handleUserException(Ice.UserException ex, bool amd)
        {
            if(_observer != null)
            {
                _observer.userException();
            }

            if(!_response)
            {
                _responseHandler.sendNoResponse();
                return;
            }

            _os = createReply(ReplyStatus.replyUserException);
            _os.startEncapsulation(_current.encoding, _format);
            _os.writeException(ex);
            _os.endEncapsulation();

            //
            // The amd flag is forwarded like for every other reply; this branch
            // used to pass false even when the exception came from an
            // asynchronous dispatch.
            //
            sendReply(amd);
        }

        //
        // Logs the failure (unless Ice.Warn.Dispatch is 0), reports it to the
        // observer under failedId and replies with the given status followed by
        // the message string.
        //
        private void sendUnknownReply(Exception ex, string failedId, byte replyStatus, string message, bool amd)
        {
            warnIfEnabled(ex, 0);

            if(_observer != null)
            {
                _observer.failed(failedId);
            }

            if(_response)
            {
                _os = createReply(replyStatus);
                _os.writeString(message);
                sendReply(amd);
            }
            else
            {
                _responseHandler.sendNoResponse();
            }
        }

        private Instance _instance;
        private Ice.Current _current;
        private Ice.Object _servant;
        private Ice.ServantLocator _locator;
        private object _cookie;
        private Ice.Instrumentation.DispatchObserver _observer;
        private ResponseHandler _responseHandler;

        private bool _response;
        private byte _compress;
        private Ice.FormatType _format = Ice.FormatType.DefaultFormat;

        private Ice.OutputStream _os;
        private Ice.InputStream _is;

        // Input stream position recorded by the first startOver, -1 until then.
        private int _inParamPos = -1;

        public Incoming next; // For use by Connection.
    }
}