1 // 2 // Mono.Messaging 3 // 4 // Authors: 5 // Michael Barker (mike@middlesoft.co.uk) 6 // 7 // (C) 2008 Michael Barker 8 // 9 10 // 11 // Permission is hereby granted, free of charge, to any person obtaining 12 // a copy of this software and associated documentation files (the 13 // "Software"), to deal in the Software without restriction, including 14 // without limitation the rights to use, copy, modify, merge, publish, 15 // distribute, sublicense, and/or sell copies of the Software, and to 16 // permit persons to whom the Software is furnished to do so, subject to 17 // the following conditions: 18 // 19 // The above copyright notice and this permission notice shall be 20 // included in all copies or substantial portions of the Software. 21 // 22 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 23 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 24 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 25 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE 26 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION 27 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 29 // 30 31 using System; 32 using System.Threading; 33 using System.Collections; 34 35 namespace Mono.Messaging 36 { 37 38 public abstract class MessageQueueBase 39 { 40 protected abstract IMessageQueue Queue { 41 get; 42 } 43 44 public event CompletedEventHandler PeekCompleted; 45 46 public event CompletedEventHandler ReceiveCompleted; 47 BeginPeek()48 public IAsyncResult BeginPeek () 49 { 50 return new PeekAsyncResult (null, Queue, MessagingProviderLocator.InfiniteTimeout, 51 NullAsyncCallback); 52 } 53 BeginPeek(TimeSpan timeout)54 public IAsyncResult BeginPeek (TimeSpan timeout) 55 { 56 return new PeekAsyncResult (null, Queue, timeout, NullAsyncCallback); 57 } 58 BeginPeek(TimeSpan timeout, object stateObject)59 public IAsyncResult BeginPeek (TimeSpan timeout, object stateObject) 60 { 61 return new PeekAsyncResult (stateObject, Queue, 62 timeout, NullAsyncCallback); 63 } 64 BeginPeek(TimeSpan timeout, object stateObject, AsyncCallback callback)65 public IAsyncResult BeginPeek (TimeSpan timeout, 66 object stateObject, 67 AsyncCallback callback) 68 { 69 return new PeekAsyncResult (stateObject, Queue, timeout, callback); 70 } 71 EndPeek(IAsyncResult asyncResult)72 public IMessage EndPeek (IAsyncResult asyncResult) 73 { 74 PeekAsyncResult result = (PeekAsyncResult) asyncResult; 75 return result.Message; 76 } 77 BeginReceive()78 public IAsyncResult BeginReceive () 79 { 80 return new ReceiveAsyncResult (null, Queue, MessagingProviderLocator.InfiniteTimeout, 81 NullAsyncCallback); 82 } 83 BeginReceive(TimeSpan timeout)84 public IAsyncResult BeginReceive (TimeSpan timeout) 85 { 86 return new ReceiveAsyncResult (null, Queue, timeout, NullAsyncCallback); 87 } 88 BeginReceive(TimeSpan timeout, object stateObject)89 public IAsyncResult BeginReceive (TimeSpan timeout, object stateObject) 90 { 91 return new ReceiveAsyncResult (stateObject, Queue, timeout, NullAsyncCallback); 92 } 93 BeginReceive(TimeSpan timeout, object stateObject, AsyncCallback callback)94 public IAsyncResult BeginReceive (TimeSpan timeout, 95 object stateObject, 96 AsyncCallback callback) 97 { 98 return new ReceiveAsyncResult (stateObject, Queue, timeout, callback); 99 } 100 EndReceive(IAsyncResult asyncResult)101 public IMessage EndReceive (IAsyncResult asyncResult) 102 { 103 ReceiveAsyncResult result = (ReceiveAsyncResult) asyncResult; 104 return result.Message; 105 } 106 SendReceiveCompleted(IAsyncResult result)107 public void SendReceiveCompleted (IAsyncResult result) 108 { 109 if (ReceiveCompleted == null) 110 return; 111 112 ReceiveCompleted (this, new CompletedEventArgs (result)); 113 } 114 SendPeekCompleted(IAsyncResult result)115 public void SendPeekCompleted (IAsyncResult result) 116 { 117 if (PeekCompleted == null) 118 return; 119 120 PeekCompleted (this, new CompletedEventArgs (result)); 121 } 122 NullAsyncCallback(IAsyncResult result)123 internal void NullAsyncCallback (IAsyncResult result) 124 { 125 } 126 127 internal class ThreadWaitHandle : WaitHandle { 128 129 private readonly Thread t; 130 ThreadWaitHandle(Thread t)131 public ThreadWaitHandle (Thread t) 132 { 133 this.t = t; 134 } 135 WaitOne()136 public override bool WaitOne () 137 { 138 t.Join (); 139 return true; 140 } 141 WaitOne(Int32 timeout, bool exitContext)142 public override bool WaitOne (Int32 timeout, bool exitContext) 143 { 144 t.Join (timeout); 145 return true; 146 } 147 WaitOne(TimeSpan timeout, bool exitContext)148 public override bool WaitOne (TimeSpan timeout, bool exitContext) 149 { 150 t.Join (timeout); 151 return true; 152 } 153 } 154 155 internal abstract class AsyncResultBase : IAsyncResult { 156 157 private readonly object asyncState; 158 protected readonly WaitHandle asyncWaitHandle; 159 protected volatile bool isCompleted; 160 protected readonly IMessageQueue q; 161 private readonly Thread t; 162 protected IMessage message; 163 protected readonly TimeSpan timeout; 164 protected readonly AsyncCallback callback; 165 protected MonoMessagingException ex = null; 166 AsyncResultBase(object asyncState, IMessageQueue q, TimeSpan timeout, AsyncCallback callback)167 public AsyncResultBase (object asyncState, 168 IMessageQueue q, 169 TimeSpan timeout, 170 AsyncCallback callback) 171 { 172 this.asyncState = asyncState; 173 this.asyncWaitHandle = new Mutex (false); 174 this.q = q; 175 this.timeout = timeout; 176 this.callback = callback; 177 this.t = new Thread(run); 178 t.Start (); 179 asyncWaitHandle = new ThreadWaitHandle(t); 180 } 181 182 public object AsyncState { 183 get { return asyncState; } 184 } 185 186 public WaitHandle AsyncWaitHandle { 187 get { return asyncWaitHandle; } 188 } 189 190 public bool CompletedSynchronously { 191 get { return false; } 192 } 193 194 public bool IsCompleted { 195 get { return isCompleted; } 196 } 197 198 internal IMessage Message { 199 get { 200 if (ex != null) 201 throw new MonoMessagingException ("Asynchronous Wrapped Exception", ex); 202 203 return message; 204 } 205 } 206 GetMessage()207 protected abstract IMessage GetMessage (); 208 SendCompletedEvent(IAsyncResult result)209 protected abstract void SendCompletedEvent (IAsyncResult result); 210 run()211 private void run () 212 { 213 try { 214 message = GetMessage (); 215 isCompleted = true; 216 callback (this); 217 SendCompletedEvent (this); 218 } catch (MonoMessagingException ex) { 219 this.ex = ex; 220 } 221 } 222 } 223 224 internal class ReceiveAsyncResult : AsyncResultBase { 225 ReceiveAsyncResult(object asyncState, IMessageQueue q, TimeSpan timeout, AsyncCallback callback)226 public ReceiveAsyncResult (object asyncState, 227 IMessageQueue q, 228 TimeSpan timeout, 229 AsyncCallback callback) 230 : base (asyncState, q, timeout, callback) 231 { 232 } 233 GetMessage()234 protected override IMessage GetMessage () 235 { 236 if (timeout == MessagingProviderLocator.InfiniteTimeout) 237 return q.Receive (); 238 else 239 return q.Receive (timeout); 240 } 241 SendCompletedEvent(IAsyncResult result)242 protected override void SendCompletedEvent (IAsyncResult result) 243 { 244 q.SendReceiveCompleted (result); 245 } 246 } 247 248 internal class PeekAsyncResult : AsyncResultBase { 249 PeekAsyncResult(object asyncState, IMessageQueue q, TimeSpan timeout, AsyncCallback callback)250 public PeekAsyncResult (object asyncState, 251 IMessageQueue q, 252 TimeSpan timeout, 253 AsyncCallback callback) 254 : base (asyncState, q, timeout, callback) 255 { 256 } 257 SendCompletedEvent(IAsyncResult result)258 protected override void SendCompletedEvent (IAsyncResult result) 259 { 260 Console.WriteLine ("Send Peek Completed"); 261 q.SendPeekCompleted (result); 262 } 263 GetMessage()264 protected override IMessage GetMessage () 265 { 266 if (timeout == MessagingProviderLocator.InfiniteTimeout) 267 return q.Peek (); 268 else 269 return q.Peek (timeout); 270 } 271 } 272 } 273 } 274