//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace System.ServiceModel.Channels
{
    using System.Collections.Generic;
    using System.Runtime;
    using System.ServiceModel;

    // IInputChannel implementation layered on InputQueueChannel<Message>.
    // The Try*/WaitFor* operations forward to the base class's dequeue and
    // wait-for-item members; the plain Receive operations are expressed in
    // terms of TryReceive through the static Help* methods at the bottom of
    // the class, which turn a "false" TryReceive result into a TimeoutException.
    class InputChannel : InputQueueChannel<Message>, IInputChannel
    {
        // Assigned once in the constructor and never changed afterwards.
        readonly EndpointAddress localAddress;

        // channelManager is handed to the base class; localAddress is what
        // LocalAddress reports (it may be null - see CreateReceiveTimedOutException).
        public InputChannel(ChannelManagerBase channelManager, EndpointAddress localAddress)
            : base(channelManager)
        {
            this.localAddress = localAddress;
        }

        // The address supplied at construction time.
        public EndpointAddress LocalAddress
        {
            get { return this.localAddress; }
        }

        // Returns this channel when IInputChannel is requested; every other
        // request is answered by the base class.
        public override T GetProperty<T>()
        {
            if (typeof(T) == typeof(IInputChannel))
            {
                return (T)(object)this;
            }

            // A null base result is already default(T), so the base value can be
            // returned as-is without a separate null branch.
            return base.GetProperty<T>();
        }

        // OnOpen does nothing, so the asynchronous open is reported as already completed.
        protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return new CompletedAsyncResult(callback, state);
        }

        protected override void OnEndOpen(IAsyncResult result)
        {
            CompletedAsyncResult.End(result);
        }

        // Nothing to do when opening this channel.
        protected override void OnOpen(TimeSpan timeout)
        {
        }

        // Receives a message using DefaultReceiveTimeout.
        public virtual Message Receive()
        {
            return this.Receive(this.DefaultReceiveTimeout);
        }

        // Receives a message, or throws TimeoutException (via HelpReceive) when
        // TryReceive reports that nothing arrived within the timeout.
        // Throws ArgumentOutOfRangeException when timeout is negative.
        public virtual Message Receive(TimeSpan timeout)
        {
            ThrowIfNegativeTimeout(timeout);
            this.ThrowPending();

            return InputChannel.HelpReceive(this, timeout);
        }

        // Begins an asynchronous receive using DefaultReceiveTimeout.
        public virtual IAsyncResult BeginReceive(AsyncCallback callback, object state)
        {
            return this.BeginReceive(this.DefaultReceiveTimeout, callback, state);
        }

        // Asynchronous counterpart of Receive(TimeSpan); complete it with EndReceive.
        // Throws ArgumentOutOfRangeException when timeout is negative.
        public virtual IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
        {
            ThrowIfNegativeTimeout(timeout);
            this.ThrowPending();

            return InputChannel.HelpBeginReceive(this, timeout, callback, state);
        }

        // Completes an operation started by BeginReceive and returns its message.
        public Message EndReceive(IAsyncResult result)
        {
            return InputChannel.HelpEndReceive(result);
        }

        // Forwards to the base class's Dequeue and returns its result.
        // Throws ArgumentOutOfRangeException when timeout is negative.
        public virtual bool TryReceive(TimeSpan timeout, out Message message)
        {
            ThrowIfNegativeTimeout(timeout);
            this.ThrowPending();

            return base.Dequeue(timeout, out message);
        }

        // Asynchronous counterpart of TryReceive; forwards to the base class's BeginDequeue.
        // Throws ArgumentOutOfRangeException when timeout is negative.
        public virtual IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
        {
            ThrowIfNegativeTimeout(timeout);
            this.ThrowPending();

            return base.BeginDequeue(timeout, callback, state);
        }

        // Completes an operation started by BeginTryReceive.
        public virtual bool EndTryReceive(IAsyncResult result, out Message message)
        {
            return base.EndDequeue(result, out message);
        }

        // Forwards to the base class's WaitForItem and returns its result.
        // Throws ArgumentOutOfRangeException when timeout is negative.
        public bool WaitForMessage(TimeSpan timeout)
        {
            ThrowIfNegativeTimeout(timeout);
            this.ThrowPending();

            return base.WaitForItem(timeout);
        }

        // Asynchronous counterpart of WaitForMessage; forwards to the base class's BeginWaitForItem.
        // Throws ArgumentOutOfRangeException when timeout is negative.
        public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
        {
            ThrowIfNegativeTimeout(timeout);
            this.ThrowPending();

            return base.BeginWaitForItem(timeout, callback, state);
        }

        // Completes an operation started by BeginWaitForMessage.
        public bool EndWaitForMessage(IAsyncResult result)
        {
            return base.EndWaitForItem(result);
        }

        // Single home for the argument check shared by every timeout-taking
        // operation above: a negative timeout is rejected with an
        // ArgumentOutOfRangeException naming the "timeout" parameter.
        static void ThrowIfNegativeTimeout(TimeSpan timeout)
        {
            if (timeout < TimeSpan.Zero)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
                    new ArgumentOutOfRangeException("timeout", timeout, SR.GetString(SR.SFxTimeoutOutOfRange0)));
            }
        }

        #region static Helpers to convert TryReceive to Receive
        // Synchronous adapter: returns the message produced by channel.TryReceive,
        // or throws a TimeoutException when TryReceive returns false.
        internal static Message HelpReceive(IInputChannel channel, TimeSpan timeout)
        {
            Message message;
            if (!channel.TryReceive(timeout, out message))
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(CreateReceiveTimedOutException(channel, timeout));
            }

            return message;
        }

        // Asynchronous adapter: starts channel.BeginTryReceive wrapped in a
        // HelpReceiveAsyncResult. Pair with HelpEndReceive.
        internal static IAsyncResult HelpBeginReceive(IInputChannel channel, TimeSpan timeout, AsyncCallback callback, object state)
        {
            return new HelpReceiveAsyncResult(channel, timeout, callback, state);
        }

        // Completes an operation started by HelpBeginReceive and returns its message.
        internal static Message HelpEndReceive(IAsyncResult result)
        {
            return HelpReceiveAsyncResult.End(result);
        }

        // AsyncResult that drives BeginTryReceive/EndTryReceive on a channel and
        // surfaces a "false" EndTryReceive result as a TimeoutException.
        class HelpReceiveAsyncResult : AsyncResult
        {
            readonly IInputChannel channel;
            readonly TimeSpan timeout;
            static readonly AsyncCallback onReceive = Fx.ThunkCallback(new AsyncCallback(OnReceive));

            // Written through an out argument in HandleReceiveComplete, read by End.
            Message message;

            public HelpReceiveAsyncResult(IInputChannel channel, TimeSpan timeout, AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.channel = channel;
                this.timeout = timeout;
                IAsyncResult result = channel.BeginTryReceive(timeout, onReceive, this);

                // When the inner operation finished synchronously, OnReceive returns
                // without doing anything, so completion is handled here. An exception
                // from HandleReceiveComplete propagates out of the constructor.
                if (result.CompletedSynchronously)
                {
                    HandleReceiveComplete(result);
                    base.Complete(true);
                }
            }

            // Ends the async result and returns the received message.
            public static Message End(IAsyncResult result)
            {
                HelpReceiveAsyncResult thisPtr = AsyncResult.End<HelpReceiveAsyncResult>(result);
                return thisPtr.message;
            }

            // Ends the inner TryReceive, storing the message; throws a
            // TimeoutException when EndTryReceive returns false.
            void HandleReceiveComplete(IAsyncResult result)
            {
                if (!this.channel.EndTryReceive(result, out this.message))
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
                        InputChannel.CreateReceiveTimedOutException(this.channel, this.timeout));
                }
            }

            // Callback for the inner BeginTryReceive. Synchronous completions are
            // left to the constructor; otherwise any non-fatal exception is captured
            // and passed to Complete instead of being thrown on this thread.
            static void OnReceive(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                {
                    return;
                }

                HelpReceiveAsyncResult thisPtr = (HelpReceiveAsyncResult)result.AsyncState;
                Exception completionException = null;
                try
                {
                    thisPtr.HandleReceiveComplete(result);
                }
#pragma warning suppress 56500 // Microsoft, transferring exception to another thread
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }

                    completionException = e;
                }

                thisPtr.Complete(false, completionException);
            }
        }

        // Builds the TimeoutException for a receive that produced no message.
        // The message includes the channel's local address URI when the channel has one.
        static Exception CreateReceiveTimedOutException(IInputChannel channel, TimeSpan timeout)
        {
            if (channel.LocalAddress != null)
            {
                return new TimeoutException(SR.GetString(SR.ReceiveTimedOut, channel.LocalAddress.Uri.AbsoluteUri, timeout));
            }
            else
            {
                return new TimeoutException(SR.GetString(SR.ReceiveTimedOutNoLocalAddress, timeout));
            }
        }
        #endregion
    }
}