1 //
2 // Mono.Messaging
3 //
4 // Authors:
5 //	  Michael Barker (mike@middlesoft.co.uk)
6 //
7 // (C) 2008 Michael Barker
8 //
9 
10 //
11 // Permission is hereby granted, free of charge, to any person obtaining
12 // a copy of this software and associated documentation files (the
13 // "Software"), to deal in the Software without restriction, including
14 // without limitation the rights to use, copy, modify, merge, publish,
15 // distribute, sublicense, and/or sell copies of the Software, and to
16 // permit persons to whom the Software is furnished to do so, subject to
17 // the following conditions:
18 //
19 // The above copyright notice and this permission notice shall be
20 // included in all copies or substantial portions of the Software.
21 //
22 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
23 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
24 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
25 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
26 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
27 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 
31 using System;
32 using System.Threading;
33 using System.Collections;
34 
35 namespace Mono.Messaging
36 {
37 
38 	public abstract class MessageQueueBase
39 	{
40 		protected abstract IMessageQueue Queue {
41 			get;
42 		}
43 
44 		public event CompletedEventHandler PeekCompleted;
45 
46 		public event CompletedEventHandler ReceiveCompleted;
47 
BeginPeek()48 		public IAsyncResult BeginPeek ()
49 		{
50 			return new PeekAsyncResult (null, Queue, MessagingProviderLocator.InfiniteTimeout,
51 			                            NullAsyncCallback);
52 		}
53 
BeginPeek(TimeSpan timeout)54 		public IAsyncResult BeginPeek (TimeSpan timeout)
55 		{
56 			return new PeekAsyncResult (null, Queue, timeout, NullAsyncCallback);
57 		}
58 
BeginPeek(TimeSpan timeout, object stateObject)59 		public IAsyncResult BeginPeek (TimeSpan timeout, object stateObject)
60 		{
61 			return new PeekAsyncResult (stateObject, Queue,
62 			                            timeout, NullAsyncCallback);
63 		}
64 
BeginPeek(TimeSpan timeout, object stateObject, AsyncCallback callback)65 		public IAsyncResult BeginPeek (TimeSpan timeout,
66 									  object stateObject,
67 									  AsyncCallback callback)
68 		{
69 			return new PeekAsyncResult (stateObject, Queue, timeout, callback);
70 		}
71 
EndPeek(IAsyncResult asyncResult)72 		public IMessage EndPeek (IAsyncResult asyncResult)
73 		{
74 			PeekAsyncResult result = (PeekAsyncResult) asyncResult;
75 			return result.Message;
76 		}
77 
BeginReceive()78 		public IAsyncResult BeginReceive ()
79 		{
80 			return new ReceiveAsyncResult (null, Queue, MessagingProviderLocator.InfiniteTimeout,
81 			                               NullAsyncCallback);
82 		}
83 
BeginReceive(TimeSpan timeout)84 		public IAsyncResult BeginReceive (TimeSpan timeout)
85 		{
86 			return new ReceiveAsyncResult (null, Queue, timeout, NullAsyncCallback);
87 		}
88 
BeginReceive(TimeSpan timeout, object stateObject)89 		public IAsyncResult BeginReceive (TimeSpan timeout, object stateObject)
90 		{
91 			return new ReceiveAsyncResult (stateObject, Queue, timeout, NullAsyncCallback);
92 		}
93 
BeginReceive(TimeSpan timeout, object stateObject, AsyncCallback callback)94 		public IAsyncResult BeginReceive (TimeSpan timeout,
95 		                                  object stateObject,
96 		                                  AsyncCallback callback)
97 		{
98 			return new ReceiveAsyncResult (stateObject, Queue, timeout, callback);
99 		}
100 
EndReceive(IAsyncResult asyncResult)101 		public IMessage EndReceive (IAsyncResult asyncResult)
102 		{
103 			ReceiveAsyncResult result = (ReceiveAsyncResult) asyncResult;
104 			return result.Message;
105 		}
106 
SendReceiveCompleted(IAsyncResult result)107 		public void SendReceiveCompleted (IAsyncResult result)
108 		{
109 			if (ReceiveCompleted == null)
110 				return;
111 
112 			ReceiveCompleted (this, new CompletedEventArgs (result));
113 		}
114 
SendPeekCompleted(IAsyncResult result)115 		public void SendPeekCompleted (IAsyncResult result)
116 		{
117 			if (PeekCompleted == null)
118 				return;
119 
120 			PeekCompleted (this, new CompletedEventArgs (result));
121 		}
122 
NullAsyncCallback(IAsyncResult result)123 		internal void NullAsyncCallback (IAsyncResult result)
124 		{
125 		}
126 
127 		internal class ThreadWaitHandle : WaitHandle {
128 
129 			private readonly Thread t;
130 
ThreadWaitHandle(Thread t)131 			public ThreadWaitHandle (Thread t)
132 			{
133 				this.t = t;
134 			}
135 
WaitOne()136 			public override bool WaitOne ()
137 			{
138 				t.Join ();
139 				return true;
140 			}
141 
WaitOne(Int32 timeout, bool exitContext)142 			public override bool WaitOne (Int32 timeout, bool exitContext)
143 			{
144 				t.Join (timeout);
145 				return true;
146 			}
147 
WaitOne(TimeSpan timeout, bool exitContext)148 			public override bool WaitOne (TimeSpan timeout, bool exitContext)
149 			{
150 				t.Join (timeout);
151 				return true;
152 			}
153 		}
154 
155 		internal abstract class AsyncResultBase : IAsyncResult {
156 
157 			private readonly object asyncState;
158 			protected readonly WaitHandle asyncWaitHandle;
159 			protected volatile bool isCompleted;
160 			protected readonly IMessageQueue q;
161 			private readonly Thread t;
162 			protected IMessage message;
163 			protected readonly TimeSpan timeout;
164 			protected readonly AsyncCallback callback;
165 			protected MonoMessagingException ex = null;
166 
AsyncResultBase(object asyncState, IMessageQueue q, TimeSpan timeout, AsyncCallback callback)167 			public AsyncResultBase (object asyncState,
168 			                        IMessageQueue q,
169 			                        TimeSpan timeout,
170 			                        AsyncCallback callback)
171 			{
172 				this.asyncState = asyncState;
173 				this.asyncWaitHandle = new Mutex (false);
174 				this.q = q;
175 				this.timeout = timeout;
176 				this.callback = callback;
177 				this.t = new Thread(run);
178 				t.Start ();
179 				asyncWaitHandle = new ThreadWaitHandle(t);
180 			}
181 
182 			public object AsyncState {
183 				get { return asyncState; }
184 			}
185 
186 			public WaitHandle AsyncWaitHandle {
187 				get { return asyncWaitHandle; }
188 			}
189 
190 			public bool CompletedSynchronously {
191 				get { return false; }
192 			}
193 
194 			public bool IsCompleted {
195 				get { return isCompleted; }
196 			}
197 
198 			internal IMessage Message {
199 				get {
200 					if (ex != null)
201 						throw new MonoMessagingException ("Asynchronous Wrapped Exception", ex);
202 
203 					return message;
204 				}
205 			}
206 
GetMessage()207 			protected abstract IMessage GetMessage ();
208 
SendCompletedEvent(IAsyncResult result)209 			protected abstract void SendCompletedEvent (IAsyncResult result);
210 
run()211 			private void run ()
212 			{
213 				try {
214 					message = GetMessage ();
215 					isCompleted = true;
216 					callback (this);
217 					SendCompletedEvent (this);
218 				} catch (MonoMessagingException ex) {
219 					this.ex = ex;
220 				}
221 			}
222 		}
223 
224 		internal class ReceiveAsyncResult : AsyncResultBase {
225 
ReceiveAsyncResult(object asyncState, IMessageQueue q, TimeSpan timeout, AsyncCallback callback)226 			public ReceiveAsyncResult (object asyncState,
227 			                           IMessageQueue q,
228 			                           TimeSpan timeout,
229 			                           AsyncCallback callback)
230 				: base (asyncState, q, timeout, callback)
231 			{
232 			}
233 
GetMessage()234 			protected override IMessage GetMessage ()
235 			{
236 				if (timeout == MessagingProviderLocator.InfiniteTimeout)
237 					return q.Receive ();
238 				else
239 					return q.Receive (timeout);
240 			}
241 
SendCompletedEvent(IAsyncResult result)242 			protected override void SendCompletedEvent (IAsyncResult result)
243 			{
244 				q.SendReceiveCompleted (result);
245 			}
246 		}
247 
248 		internal class PeekAsyncResult : AsyncResultBase {
249 
PeekAsyncResult(object asyncState, IMessageQueue q, TimeSpan timeout, AsyncCallback callback)250 			public PeekAsyncResult (object asyncState,
251 			                        IMessageQueue q,
252 			                        TimeSpan timeout,
253 			                        AsyncCallback callback)
254 				: base (asyncState, q, timeout, callback)
255 			{
256 			}
257 
SendCompletedEvent(IAsyncResult result)258 			protected override void SendCompletedEvent (IAsyncResult result)
259 			{
260 				Console.WriteLine ("Send Peek Completed");
261 				q.SendPeekCompleted (result);
262 			}
263 
GetMessage()264 			protected override IMessage GetMessage ()
265 			{
266 				if (timeout == MessagingProviderLocator.InfiniteTimeout)
267 					return q.Peek ();
268 				else
269 					return q.Peek (timeout);
270 			}
271 		}
272 	}
273 }
274