1 // 2 // Author: Atsushi Enomoto <atsushi@ximian.com> 3 // 4 // Copyright (C) 2010 Novell, Inc (http://www.novell.com) 5 // 6 // Permission is hereby granted, free of charge, to any person obtaining 7 // a copy of this software and associated documentation files (the 8 // "Software"), to deal in the Software without restriction, including 9 // without limitation the rights to use, copy, modify, merge, publish, 10 // distribute, sublicense, and/or sell copies of the Software, and to 11 // permit persons to whom the Software is furnished to do so, subject to 12 // the following conditions: 13 // 14 // The above copyright notice and this permission notice shall be 15 // included in all copies or substantial portions of the Software. 16 // 17 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 18 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 19 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 20 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE 21 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION 22 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 23 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 24 // 25 using System; 26 using System.Collections.Generic; 27 using System.IO; 28 using System.Linq; 29 using System.Net; 30 using System.Net.NetworkInformation; 31 using System.Net.Sockets; 32 using System.ServiceModel; 33 using System.ServiceModel.Channels; 34 using System.ServiceModel.Discovery; 35 using System.Threading; 36 using System.Xml; 37 38 namespace System.ServiceModel.Discovery.Udp 39 { 40 internal class UdpDuplexChannel : ChannelBase, IDuplexChannel 41 { 42 // channel factory UdpDuplexChannel(UdpChannelFactory factory, BindingContext context, EndpointAddress address, Uri via)43 public UdpDuplexChannel (UdpChannelFactory factory, BindingContext context, EndpointAddress address, Uri via) 44 : base (factory) 45 { 46 if (factory == null) 47 throw new ArgumentNullException ("factory"); 48 if (context == null) 49 throw new ArgumentNullException ("context"); 50 if (address == null) 51 throw new ArgumentNullException ("address"); 52 53 binding_element = factory.Source; 54 RemoteAddress = address; 55 Via = via; 56 FillMessageEncoder (context); 57 } 58 UdpDuplexChannel(UdpChannelListener listener)59 public UdpDuplexChannel (UdpChannelListener listener) 60 : base (listener) 61 { 62 binding_element = listener.Source; 63 LocalAddress = new EndpointAddress (listener.Uri); 64 FillMessageEncoder (listener.Context); 65 } 66 67 MessageEncoder message_encoder; 68 UdpClient client; 69 IPAddress multicast_address; 70 UdpTransportBindingElement binding_element; 71 72 // for servers 73 public EndpointAddress LocalAddress { get; private set; } 74 // for clients 75 public EndpointAddress RemoteAddress { get; private set; } 76 77 public Uri Via { get; private set; } 78 FillMessageEncoder(BindingContext ctx)79 void FillMessageEncoder (BindingContext ctx) 80 { 81 var mbe = (MessageEncodingBindingElement) ctx.Binding.Elements.FirstOrDefault (be => be is MessageEncodingBindingElement); 82 if (mbe == null) 83 mbe = new TextMessageEncodingBindingElement (); 84 message_encoder = mbe.CreateMessageEncoderFactory ().Encoder; 85 } 86 Send(Message message)87 public void Send (Message message) 88 { 89 Send (message, DefaultSendTimeout); 90 } 91 92 static readonly Random rnd = new Random (); 93 GetSenderClient(Message message)94 UdpClient GetSenderClient (Message message) 95 { 96 if (RemoteAddress != null) 97 return client; 98 99 var rmp = message.Properties [RemoteEndpointMessageProperty.Name] as RemoteEndpointMessageProperty; 100 if (rmp == null) 101 throw new ArgumentException ("This duplex channel from the channel listener cannot send messages without RemoteEndpointMessageProperty"); 102 var cli = new UdpClient (); 103 cli.Connect (IPAddress.Parse (rmp.Address), rmp.Port); 104 return cli; 105 } 106 Send(Message message, TimeSpan timeout)107 public void Send (Message message, TimeSpan timeout) 108 { 109 if (State != CommunicationState.Opened) 110 throw new InvalidOperationException ("The UDP channel must be opened before sending a message."); 111 112 var cli = GetSenderClient (message); 113 try { 114 SendCore (cli, message, timeout); 115 } finally { 116 if (cli != client) 117 cli.Close (); 118 } 119 } 120 SendCore(UdpClient cli, Message message, TimeSpan timeout)121 void SendCore (UdpClient cli, Message message, TimeSpan timeout) 122 { 123 Logger.LogMessage (MessageLogSourceKind.TransportSend, ref message, int.MaxValue); 124 125 var ms = new MemoryStream (); 126 message_encoder.WriteMessage (message, ms); 127 // It seems .NET sends the same Message a couple of times so that the receivers don't miss it. So, do the same hack. 128 for (int i = 0; i < 3; i++) { 129 // FIXME: use MaxAnnouncementDelay. It is fixed now. 130 Thread.Sleep (rnd.Next (50, 500)); 131 cli.Send (ms.GetBuffer (), (int) ms.Length); 132 } 133 } 134 WaitForMessage(TimeSpan timeout)135 public bool WaitForMessage (TimeSpan timeout) 136 { 137 throw new NotImplementedException (); 138 } 139 Receive()140 public Message Receive () 141 { 142 return Receive (DefaultReceiveTimeout); 143 } 144 Receive(TimeSpan timeout)145 public Message Receive (TimeSpan timeout) 146 { 147 Message msg; 148 if (!TryReceive (timeout, out msg)) 149 throw new TimeoutException (); 150 return msg; 151 } 152 TryReceive(TimeSpan timeout, out Message msg)153 public bool TryReceive (TimeSpan timeout, out Message msg) 154 { 155 DateTime start = DateTime.UtcNow; 156 ThrowIfDisposedOrNotOpen (); 157 msg = null; 158 159 if (client == null) // could be invoked while being closed. 160 return false; 161 162 byte [] bytes = null; 163 IPEndPoint ip = new IPEndPoint (IPAddress.Any, 0); 164 ManualResetEvent wait = new ManualResetEvent (false); 165 var ar = client.BeginReceive (delegate (IAsyncResult result) { 166 try { 167 UdpClient cli = (UdpClient) result.AsyncState; 168 try { 169 bytes = cli.EndReceive (result, ref ip); 170 } catch (ObjectDisposedException) { 171 if (State == CommunicationState.Opened) 172 throw; 173 // Otherwise, called during shutdown. Ignore it. 174 } 175 } finally { 176 wait.Set (); 177 } 178 }, client); 179 180 if (!ar.IsCompleted && !wait.WaitOne (timeout)) 181 return false; 182 if (bytes == null || bytes.Length == 0) 183 return false; 184 185 // Clients will send the same message many times, and this receiver has to 186 187 // FIXME: give maxSizeOfHeaders 188 msg = message_encoder.ReadMessage (new MemoryStream (bytes), int.MaxValue); 189 var id = msg.Headers.MessageId; 190 if (message_ids.Contains (id)) 191 return TryReceive (timeout - (DateTime.UtcNow - start), out msg); 192 if (id != null) { 193 message_ids.Enqueue (id); 194 if (message_ids.Count >= binding_element.TransportSettings.DuplicateMessageHistoryLength) 195 message_ids.Dequeue (); 196 } 197 msg.Properties.Add ("Via", LocalAddress.Uri); 198 msg.Properties.Add ("Encoder", message_encoder); 199 msg.Properties.Add (RemoteEndpointMessageProperty.Name, new RemoteEndpointMessageProperty (ip.Address.ToString (), ip.Port)); 200 201 Logger.LogMessage (MessageLogSourceKind.TransportReceive, ref msg, binding_element.MaxReceivedMessageSize); 202 203 return true; 204 } 205 206 Queue<UniqueId> message_ids = new Queue<UniqueId> (); 207 OnAbort()208 protected override void OnAbort () 209 { 210 OnClose (TimeSpan.Zero); 211 } 212 213 Action<TimeSpan> open_delegate, close_delegate; 214 OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)215 protected override IAsyncResult OnBeginClose (TimeSpan timeout, AsyncCallback callback, object state) 216 { 217 if (close_delegate == null) 218 close_delegate = new Action<TimeSpan> (OnClose); 219 return close_delegate.BeginInvoke (timeout, callback, state); 220 } 221 OnEndClose(IAsyncResult result)222 protected override void OnEndClose (IAsyncResult result) 223 { 224 close_delegate.EndInvoke (result); 225 } 226 OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)227 protected override IAsyncResult OnBeginOpen (TimeSpan timeout, AsyncCallback callback, object state) 228 { 229 if (open_delegate == null) 230 open_delegate = new Action<TimeSpan> (OnOpen); 231 return open_delegate.BeginInvoke (timeout, callback, state); 232 } 233 OnEndOpen(IAsyncResult result)234 protected override void OnEndOpen (IAsyncResult result) 235 { 236 open_delegate.EndInvoke (result); 237 } 238 OnClose(TimeSpan timeout)239 protected override void OnClose (TimeSpan timeout) 240 { 241 if (client != null) { 242 if (multicast_address != null) { 243 client.DropMulticastGroup (multicast_address, LocalAddress.Uri.Port); 244 multicast_address = null; 245 } 246 client.Close (); 247 } 248 client = null; 249 } 250 OnOpen(TimeSpan timeout)251 protected override void OnOpen (TimeSpan timeout) 252 { 253 if (RemoteAddress != null) { 254 client = new UdpClient (); 255 var uri = Via ?? RemoteAddress.Uri; 256 client.Connect (uri.Host, uri.Port); 257 } else { 258 var ip = IPAddress.Parse (LocalAddress.Uri.Host); 259 bool isMulticast = NetworkInterface.GetAllNetworkInterfaces ().Any (nic => nic.SupportsMulticast && nic.GetIPProperties ().MulticastAddresses.Any (mca => mca.Address.Equals (ip))); 260 int port = LocalAddress.Uri.Port; 261 if (isMulticast) { 262 multicast_address = ip; 263 client = new UdpClient (new IPEndPoint (IPAddress.Any, port)); 264 client.JoinMulticastGroup (ip, binding_element.TransportSettings.TimeToLive); 265 } 266 else 267 client = new UdpClient (new IPEndPoint (ip, port)); 268 } 269 270 client.EnableBroadcast = true; 271 272 // FIXME: apply UdpTransportSetting here. 273 var settings = binding_element.TransportSettings; 274 if (settings.MulticastInterfaceId != null) 275 client.Client.SetSocketOption (SocketOptionLevel.Udp, SocketOptionName.MulticastInterface, settings.MulticastInterfaceId); 276 } 277 278 Func<TimeSpan,Message> receive_delegate; 279 BeginReceive(AsyncCallback callback, object state)280 public IAsyncResult BeginReceive (AsyncCallback callback, object state) 281 { 282 return BeginReceive (DefaultReceiveTimeout, callback, state); 283 } 284 BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)285 public IAsyncResult BeginReceive (TimeSpan timeout, AsyncCallback callback, object state) 286 { 287 if (receive_delegate == null) 288 receive_delegate = new Func<TimeSpan,Message> (Receive); 289 return receive_delegate.BeginInvoke (timeout, callback, state); 290 } 291 EndReceive(IAsyncResult result)292 public Message EndReceive (IAsyncResult result) 293 { 294 return receive_delegate.EndInvoke (result); 295 } 296 TryReceiveDelegate(TimeSpan timeout, out Message msg)297 delegate bool TryReceiveDelegate (TimeSpan timeout, out Message msg); 298 TryReceiveDelegate try_receive_delegate; 299 BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)300 public IAsyncResult BeginTryReceive (TimeSpan timeout, AsyncCallback callback, object state) 301 { 302 if (try_receive_delegate == null) 303 try_receive_delegate = new TryReceiveDelegate (TryReceive); 304 Message dummy; 305 return try_receive_delegate.BeginInvoke (timeout, out dummy, callback, state); 306 } 307 EndTryReceive(IAsyncResult result, out Message msg)308 public bool EndTryReceive (IAsyncResult result, out Message msg) 309 { 310 return try_receive_delegate.EndInvoke (out msg, result); 311 } 312 313 Func<TimeSpan,bool> wait_delegate; 314 BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)315 public IAsyncResult BeginWaitForMessage (TimeSpan timeout, AsyncCallback callback, object state) 316 { 317 if (wait_delegate == null) 318 wait_delegate = new Func<TimeSpan,bool> (WaitForMessage); 319 return wait_delegate.BeginInvoke (timeout, callback, state); 320 } 321 EndWaitForMessage(IAsyncResult result)322 public bool EndWaitForMessage (IAsyncResult result) 323 { 324 return wait_delegate.EndInvoke (result); 325 } 326 327 Action<Message,TimeSpan> send_delegate; 328 BeginSend(Message message, AsyncCallback callback, object state)329 public IAsyncResult BeginSend (Message message, AsyncCallback callback, object state) 330 { 331 return BeginSend (message, DefaultSendTimeout, callback, state); 332 } 333 BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)334 public IAsyncResult BeginSend (Message message, TimeSpan timeout, AsyncCallback callback, object state) 335 { 336 if (send_delegate == null) 337 send_delegate = new Action<Message,TimeSpan> (Send); 338 return send_delegate.BeginInvoke (message, timeout, callback, state); 339 } 340 EndSend(IAsyncResult result)341 public void EndSend (IAsyncResult result) 342 { 343 send_delegate.EndInvoke (result); 344 } 345 } 346 } 347