1 //
2 // Author: Atsushi Enomoto <atsushi@ximian.com>
3 //
4 // Copyright (C) 2010 Novell, Inc (http://www.novell.com)
5 //
6 // Permission is hereby granted, free of charge, to any person obtaining
7 // a copy of this software and associated documentation files (the
8 // "Software"), to deal in the Software without restriction, including
9 // without limitation the rights to use, copy, modify, merge, publish,
10 // distribute, sublicense, and/or sell copies of the Software, and to
11 // permit persons to whom the Software is furnished to do so, subject to
12 // the following conditions:
13 //
14 // The above copyright notice and this permission notice shall be
15 // included in all copies or substantial portions of the Software.
16 //
17 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24 //
25 using System;
26 using System.Collections.Generic;
27 using System.IO;
28 using System.Linq;
29 using System.Net;
30 using System.Net.NetworkInformation;
31 using System.Net.Sockets;
32 using System.ServiceModel;
33 using System.ServiceModel.Channels;
34 using System.ServiceModel.Discovery;
35 using System.Threading;
36 using System.Xml;
37 
38 namespace System.ServiceModel.Discovery.Udp
39 {
40 	internal class UdpDuplexChannel : ChannelBase, IDuplexChannel
41 	{
42 		// channel factory
UdpDuplexChannel(UdpChannelFactory factory, BindingContext context, EndpointAddress address, Uri via)43 		public UdpDuplexChannel (UdpChannelFactory factory, BindingContext context, EndpointAddress address, Uri via)
44 			: base (factory)
45 		{
46 			if (factory == null)
47 				throw new ArgumentNullException ("factory");
48 			if (context == null)
49 				throw new ArgumentNullException ("context");
50 			if (address == null)
51 				throw new ArgumentNullException ("address");
52 
53 			binding_element = factory.Source;
54 			RemoteAddress = address;
55 			Via = via;
56 			FillMessageEncoder (context);
57 		}
58 
UdpDuplexChannel(UdpChannelListener listener)59 		public UdpDuplexChannel (UdpChannelListener listener)
60 			: base (listener)
61 		{
62 			binding_element = listener.Source;
63 			LocalAddress = new EndpointAddress (listener.Uri);
64 			FillMessageEncoder (listener.Context);
65 		}
66 
67 		MessageEncoder message_encoder;
68 		UdpClient client;
69 		IPAddress multicast_address;
70 		UdpTransportBindingElement binding_element;
71 
72 		// for servers
73 		public EndpointAddress LocalAddress { get; private set; }
74 		// for clients
75 		public EndpointAddress RemoteAddress { get; private set; }
76 
77 		public Uri Via { get; private set; }
78 
FillMessageEncoder(BindingContext ctx)79 		void FillMessageEncoder (BindingContext ctx)
80 		{
81 			var mbe = (MessageEncodingBindingElement) ctx.Binding.Elements.FirstOrDefault (be => be is MessageEncodingBindingElement);
82 			if (mbe == null)
83 				mbe = new TextMessageEncodingBindingElement ();
84 			message_encoder = mbe.CreateMessageEncoderFactory ().Encoder;
85 		}
86 
Send(Message message)87 		public void Send (Message message)
88 		{
89 			Send (message, DefaultSendTimeout);
90 		}
91 
92 		static readonly Random rnd = new Random ();
93 
GetSenderClient(Message message)94 		UdpClient GetSenderClient (Message message)
95 		{
96 			if (RemoteAddress != null)
97 				return client;
98 
99 			var rmp = message.Properties [RemoteEndpointMessageProperty.Name] as RemoteEndpointMessageProperty;
100 			if (rmp == null)
101 				throw new ArgumentException ("This duplex channel from the channel listener cannot send messages without RemoteEndpointMessageProperty");
102 			var cli = new UdpClient ();
103 			cli.Connect (IPAddress.Parse (rmp.Address), rmp.Port);
104 			return cli;
105 		}
106 
Send(Message message, TimeSpan timeout)107 		public void Send (Message message, TimeSpan timeout)
108 		{
109 			if (State != CommunicationState.Opened)
110 				throw new InvalidOperationException ("The UDP channel must be opened before sending a message.");
111 
112 			var cli = GetSenderClient (message);
113 			try {
114 				SendCore (cli, message, timeout);
115 			} finally {
116 				if (cli != client)
117 					cli.Close ();
118 			}
119 		}
120 
SendCore(UdpClient cli, Message message, TimeSpan timeout)121 		void SendCore (UdpClient cli, Message message, TimeSpan timeout)
122 		{
123 			Logger.LogMessage (MessageLogSourceKind.TransportSend, ref message, int.MaxValue);
124 
125 			var ms = new MemoryStream ();
126 			message_encoder.WriteMessage (message, ms);
127 			// It seems .NET sends the same Message a couple of times so that the receivers don't miss it. So, do the same hack.
128 			for (int i = 0; i < 3; i++) {
129 				// FIXME: use MaxAnnouncementDelay. It is fixed now.
130 				Thread.Sleep (rnd.Next (50, 500));
131 				cli.Send (ms.GetBuffer (), (int) ms.Length);
132 			}
133 		}
134 
WaitForMessage(TimeSpan timeout)135 		public bool WaitForMessage (TimeSpan timeout)
136 		{
137 			throw new NotImplementedException ();
138 		}
139 
Receive()140 		public Message Receive ()
141 		{
142 			return Receive (DefaultReceiveTimeout);
143 		}
144 
Receive(TimeSpan timeout)145 		public Message Receive (TimeSpan timeout)
146 		{
147 			Message msg;
148 			if (!TryReceive (timeout, out msg))
149 				throw new TimeoutException ();
150 			return msg;
151 		}
152 
TryReceive(TimeSpan timeout, out Message msg)153 		public bool TryReceive (TimeSpan timeout, out Message msg)
154 		{
155 			DateTime start = DateTime.UtcNow;
156 			ThrowIfDisposedOrNotOpen ();
157 			msg = null;
158 
159 			if (client == null) // could be invoked while being closed.
160 				return false;
161 
162 			byte [] bytes = null;
163 			IPEndPoint ip = new IPEndPoint (IPAddress.Any, 0);
164 			ManualResetEvent wait = new ManualResetEvent (false);
165 			var ar = client.BeginReceive (delegate (IAsyncResult result) {
166 				try {
167 					UdpClient cli = (UdpClient) result.AsyncState;
168 					try {
169 						bytes = cli.EndReceive (result, ref ip);
170 					} catch (ObjectDisposedException) {
171 						if (State == CommunicationState.Opened)
172 							throw;
173 						// Otherwise, called during shutdown. Ignore it.
174 					}
175 				} finally {
176 					wait.Set ();
177 				}
178 			}, client);
179 
180 			if (!ar.IsCompleted && !wait.WaitOne (timeout))
181 				return false;
182 			if (bytes == null || bytes.Length == 0)
183 				return false;
184 
185 			// Clients will send the same message many times, and this receiver has to
186 
187 			// FIXME: give maxSizeOfHeaders
188 			msg = message_encoder.ReadMessage (new MemoryStream (bytes), int.MaxValue);
189 			var id = msg.Headers.MessageId;
190 			if (message_ids.Contains (id))
191 				return TryReceive (timeout - (DateTime.UtcNow - start), out msg);
192 			if (id != null) {
193 				message_ids.Enqueue (id);
194 				if (message_ids.Count >= binding_element.TransportSettings.DuplicateMessageHistoryLength)
195 					message_ids.Dequeue ();
196 			}
197 			msg.Properties.Add ("Via", LocalAddress.Uri);
198 			msg.Properties.Add ("Encoder", message_encoder);
199 			msg.Properties.Add (RemoteEndpointMessageProperty.Name, new RemoteEndpointMessageProperty (ip.Address.ToString (), ip.Port));
200 
201 			Logger.LogMessage (MessageLogSourceKind.TransportReceive, ref msg, binding_element.MaxReceivedMessageSize);
202 
203 			return true;
204 		}
205 
206 		Queue<UniqueId> message_ids = new Queue<UniqueId> ();
207 
OnAbort()208 		protected override void OnAbort ()
209 		{
210 			OnClose (TimeSpan.Zero);
211 		}
212 
213 		Action<TimeSpan> open_delegate, close_delegate;
214 
OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)215 		protected override IAsyncResult OnBeginClose (TimeSpan timeout, AsyncCallback callback, object state)
216 		{
217 			if (close_delegate == null)
218 				close_delegate = new Action<TimeSpan> (OnClose);
219 			return close_delegate.BeginInvoke (timeout, callback, state);
220 		}
221 
OnEndClose(IAsyncResult result)222 		protected override void OnEndClose (IAsyncResult result)
223 		{
224 			close_delegate.EndInvoke (result);
225 		}
226 
OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)227 		protected override IAsyncResult OnBeginOpen (TimeSpan timeout, AsyncCallback callback, object state)
228 		{
229 			if (open_delegate == null)
230 				open_delegate = new Action<TimeSpan> (OnOpen);
231 			return open_delegate.BeginInvoke (timeout, callback, state);
232 		}
233 
OnEndOpen(IAsyncResult result)234 		protected override void OnEndOpen (IAsyncResult result)
235 		{
236 			open_delegate.EndInvoke (result);
237 		}
238 
OnClose(TimeSpan timeout)239 		protected override void OnClose (TimeSpan timeout)
240 		{
241 			if (client != null) {
242 				if (multicast_address != null) {
243 					client.DropMulticastGroup (multicast_address, LocalAddress.Uri.Port);
244 					multicast_address = null;
245 				}
246 				client.Close ();
247 			}
248 			client = null;
249 		}
250 
OnOpen(TimeSpan timeout)251 		protected override void OnOpen (TimeSpan timeout)
252 		{
253 			if (RemoteAddress != null) {
254 				client = new UdpClient ();
255 				var uri = Via ?? RemoteAddress.Uri;
256 				client.Connect (uri.Host, uri.Port);
257 			} else {
258 				var ip = IPAddress.Parse (LocalAddress.Uri.Host);
259 				bool isMulticast = NetworkInterface.GetAllNetworkInterfaces ().Any (nic => nic.SupportsMulticast && nic.GetIPProperties ().MulticastAddresses.Any (mca => mca.Address.Equals (ip)));
260 				int port = LocalAddress.Uri.Port;
261 				if (isMulticast) {
262 					multicast_address = ip;
263 					client = new UdpClient (new IPEndPoint (IPAddress.Any, port));
264 					client.JoinMulticastGroup (ip, binding_element.TransportSettings.TimeToLive);
265 				}
266 				else
267 					client = new UdpClient (new IPEndPoint (ip, port));
268 			}
269 
270 			client.EnableBroadcast = true;
271 
272 			// FIXME: apply UdpTransportSetting here.
273 			var settings = binding_element.TransportSettings;
274 			if (settings.MulticastInterfaceId != null)
275 				client.Client.SetSocketOption (SocketOptionLevel.Udp, SocketOptionName.MulticastInterface, settings.MulticastInterfaceId);
276 		}
277 
278 		Func<TimeSpan,Message> receive_delegate;
279 
BeginReceive(AsyncCallback callback, object state)280 		public IAsyncResult BeginReceive (AsyncCallback callback, object state)
281 		{
282 			return BeginReceive (DefaultReceiveTimeout, callback, state);
283 		}
284 
BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)285 		public IAsyncResult BeginReceive (TimeSpan timeout, AsyncCallback callback, object state)
286 		{
287 			if (receive_delegate == null)
288 				receive_delegate = new Func<TimeSpan,Message> (Receive);
289 			return receive_delegate.BeginInvoke (timeout, callback, state);
290 		}
291 
EndReceive(IAsyncResult result)292 		public Message EndReceive (IAsyncResult result)
293 		{
294 			return receive_delegate.EndInvoke (result);
295 		}
296 
TryReceiveDelegate(TimeSpan timeout, out Message msg)297 		delegate bool TryReceiveDelegate (TimeSpan timeout, out Message msg);
298 		TryReceiveDelegate try_receive_delegate;
299 
BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)300 		public IAsyncResult BeginTryReceive (TimeSpan timeout, AsyncCallback callback, object state)
301 		{
302 			if (try_receive_delegate == null)
303 				try_receive_delegate = new TryReceiveDelegate (TryReceive);
304 			Message dummy;
305 			return try_receive_delegate.BeginInvoke (timeout, out dummy, callback, state);
306 		}
307 
EndTryReceive(IAsyncResult result, out Message msg)308 		public bool EndTryReceive (IAsyncResult result, out Message msg)
309 		{
310 			return try_receive_delegate.EndInvoke (out msg, result);
311 		}
312 
313 		Func<TimeSpan,bool> wait_delegate;
314 
BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)315 		public IAsyncResult BeginWaitForMessage (TimeSpan timeout, AsyncCallback callback, object state)
316 		{
317 			if (wait_delegate == null)
318 				wait_delegate = new Func<TimeSpan,bool> (WaitForMessage);
319 			return wait_delegate.BeginInvoke (timeout, callback, state);
320 		}
321 
EndWaitForMessage(IAsyncResult result)322 		public bool EndWaitForMessage (IAsyncResult result)
323 		{
324 			return wait_delegate.EndInvoke (result);
325 		}
326 
327 		Action<Message,TimeSpan> send_delegate;
328 
BeginSend(Message message, AsyncCallback callback, object state)329 		public IAsyncResult BeginSend (Message message, AsyncCallback callback, object state)
330 		{
331 			return BeginSend (message, DefaultSendTimeout, callback, state);
332 		}
333 
BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)334 		public IAsyncResult BeginSend (Message message, TimeSpan timeout, AsyncCallback callback, object state)
335 		{
336 			if (send_delegate == null)
337 				send_delegate = new Action<Message,TimeSpan> (Send);
338 			return send_delegate.BeginInvoke (message, timeout, callback, state);
339 		}
340 
EndSend(IAsyncResult result)341 		public void EndSend (IAsyncResult result)
342 		{
343 			send_delegate.EndInvoke (result);
344 		}
345 	}
346 }
347