1 /*
2  * Copyright 2002-2012 the original author or authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package org.springframework.jms.connection;
18 
19 import java.lang.reflect.InvocationHandler;
20 import java.lang.reflect.InvocationTargetException;
21 import java.lang.reflect.Method;
22 import java.lang.reflect.Proxy;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.Iterator;
26 import java.util.LinkedList;
27 import java.util.List;
28 import java.util.Map;
29 import javax.jms.Connection;
30 import javax.jms.ConnectionFactory;
31 import javax.jms.Destination;
32 import javax.jms.JMSException;
33 import javax.jms.MessageConsumer;
34 import javax.jms.MessageProducer;
35 import javax.jms.QueueSession;
36 import javax.jms.Session;
37 import javax.jms.TemporaryQueue;
38 import javax.jms.TemporaryTopic;
39 import javax.jms.Topic;
40 import javax.jms.TopicSession;
41 
42 import org.springframework.util.Assert;
43 import org.springframework.util.ObjectUtils;
44 
45 /**
 * {@link SingleConnectionFactory} subclass that adds {@link javax.jms.Session}
 * caching as well as {@link javax.jms.MessageProducer} caching. This ConnectionFactory
48  * also switches the {@link #setReconnectOnException "reconnectOnException" property}
49  * to "true" by default, allowing for automatic recovery of the underlying Connection.
50  *
51  * <p>By default, only one single Session will be cached, with further requested
52  * Sessions being created and disposed on demand. Consider raising the
53  * {@link #setSessionCacheSize "sessionCacheSize" value} in case of a
54  * high-concurrency environment.
55  *
56  * <p><b>NOTE: This ConnectionFactory decorator requires JMS 1.1 or higher.</b>
57  * You may use it through the JMS 1.0.2 API; however, the target JMS driver
58  * needs to be compliant with JMS 1.1.
59  *
60  * <p>When using the JMS 1.0.2 API, this ConnectionFactory will switch
61  * into queue/topic mode according to the JMS API methods used at runtime:
62  * <code>createQueueConnection</code> and <code>createTopicConnection</code> will
63  * lead to queue/topic mode, respectively; generic <code>createConnection</code>
64  * calls will lead to a JMS 1.1 connection which is able to serve both modes.
65  *
66  * <p><b>NOTE: This ConnectionFactory requires explicit closing of all Sessions
67  * obtained from its shared Connection.</b> This is the usual recommendation for
68  * native JMS access code anyway. However, with this ConnectionFactory, its use
69  * is mandatory in order to actually allow for Session reuse.
70  *
71  * <p>Note also that MessageConsumers obtained from a cached Session won't get
 * closed until the Session is eventually removed from the pool. This may
73  * lead to semantic side effects in some cases. For a durable subscriber, the
74  * logical <code>Session.close()</code> call will also close the subscription.
75  * Re-registering a durable consumer for the same subscription on the same
76  * Session handle is not supported; close and reobtain a cached Session first.
77  *
78  * @author Juergen Hoeller
79  * @since 2.5.3
80  */
81 public class CachingConnectionFactory extends SingleConnectionFactory {
82 
	/** Upper bound on the number of idle Sessions kept in each per-mode list; starts at 1. */
	private int sessionCacheSize = 1;

	/** Whether Session proxies hand out cached MessageProducers; starts as true. */
	private boolean cacheProducers = true;

	/** Whether Session proxies hand out cached MessageConsumers; starts as true. */
	private boolean cacheConsumers = true;

	/**
	 * Cleared by {@link #resetConnection()} while it drains the Session cache.
	 * A Session proxy that is closed while this flag is false is physically
	 * closed instead of being handed back to its session list.
	 */
	private volatile boolean active = true;

	/**
	 * Idle Session proxies, keyed by the mode passed to {@link #getSession}.
	 * The map itself and each contained list are used as monitors by the
	 * code that accesses them.
	 */
	private final Map<Integer, LinkedList<Session>> cachedSessions =
			new HashMap<Integer, LinkedList<Session>>();
93 
94 
	/**
	 * Create a new CachingConnectionFactory for bean-style usage.
	 * <p>Switches the "reconnectOnException" property to "true".
	 * @see #setTargetConnectionFactory
	 * @see #setReconnectOnException
	 */
	public CachingConnectionFactory() {
		super();
		setReconnectOnException(true);
	}
103 
	/**
	 * Create a new CachingConnectionFactory for the given target
	 * ConnectionFactory.
	 * <p>Switches the "reconnectOnException" property to "true".
	 * @param targetConnectionFactory the target ConnectionFactory
	 * @see #setReconnectOnException
	 */
	public CachingConnectionFactory(ConnectionFactory targetConnectionFactory) {
		super(targetConnectionFactory);
		setReconnectOnException(true);
	}
113 
114 
	/**
	 * Specify the desired size for the JMS Session cache (per JMS Session type).
	 * <p>This cache size is the maximum limit for the number of cached Sessions
	 * per session acknowledgement type (auto, client, dups_ok, transacted).
	 * As a consequence, the actual number of cached Sessions may be up to
	 * four times as high as the specified value - in the unlikely case
	 * of mixing and matching different acknowledgement types.
	 * <p>Default is 1: caching a single Session, (re-)creating further ones on
	 * demand. Specify a number like 10 if you'd like to raise the number of cached
	 * Sessions; that said, 1 may be sufficient for low-concurrency scenarios.
	 * @param sessionCacheSize the maximum number of cached Sessions per Session
	 * type; must be 1 or higher (enforced through {@code Assert.isTrue})
	 * @see #setCacheProducers
	 */
	public void setSessionCacheSize(int sessionCacheSize) {
		Assert.isTrue(sessionCacheSize >= 1, "Session cache size must be 1 or higher");
		this.sessionCacheSize = sessionCacheSize;
	}
131 
	/**
	 * Return the desired size for the JMS Session cache (per JMS Session type).
	 * @return the configured maximum number of cached Sessions per Session type
	 * @see #setSessionCacheSize
	 */
	public int getSessionCacheSize() {
		return this.sessionCacheSize;
	}
138 
	/**
	 * Specify whether to cache JMS MessageProducers per JMS Session instance
	 * (more specifically: one MessageProducer per Destination and Session).
	 * <p>Default is "true". Switch this to "false" in order to always
	 * recreate MessageProducers on demand.
	 * @param cacheProducers whether MessageProducers should be cached
	 */
	public void setCacheProducers(boolean cacheProducers) {
		this.cacheProducers = cacheProducers;
	}
148 
	/**
	 * Return whether to cache JMS MessageProducers per JMS Session instance.
	 * @return the current value of the "cacheProducers" flag
	 * @see #setCacheProducers
	 */
	public boolean isCacheProducers() {
		return this.cacheProducers;
	}
155 
	/**
	 * Specify whether to cache JMS MessageConsumers per JMS Session instance
	 * (more specifically: one MessageConsumer per Destination, selector String
	 * and Session). Note that durable subscribers will only be cached until
	 * logical closing of the Session handle.
	 * <p>Default is "true". Switch this to "false" in order to always
	 * recreate MessageConsumers on demand.
	 * @param cacheConsumers whether MessageConsumers should be cached
	 */
	public void setCacheConsumers(boolean cacheConsumers) {
		this.cacheConsumers = cacheConsumers;
	}
167 
	/**
	 * Return whether to cache JMS MessageConsumers per JMS Session instance.
	 * @return the current value of the "cacheConsumers" flag
	 * @see #setCacheConsumers
	 */
	public boolean isCacheConsumers() {
		return this.cacheConsumers;
	}
174 
175 
176 	/**
177 	 * Resets the Session cache as well.
178 	 */
resetConnection()179 	public void resetConnection() {
180 		this.active = false;
181 		synchronized (this.cachedSessions) {
182 			for (LinkedList<Session> sessionList : this.cachedSessions.values()) {
183 				synchronized (sessionList) {
184 					for (Session session : sessionList) {
185 						try {
186 							session.close();
187 						}
188 						catch (Throwable ex) {
189 							logger.trace("Could not close cached JMS Session", ex);
190 						}
191 					}
192 				}
193 			}
194 			this.cachedSessions.clear();
195 		}
196 		this.active = true;
197 
198 		// Now proceed with actual closing of the shared Connection...
199 		super.resetConnection();
200 	}
201 
202 	/**
203 	 * Checks for a cached Session for the given mode.
204 	 */
getSession(Connection con, Integer mode)205 	protected Session getSession(Connection con, Integer mode) throws JMSException {
206 		LinkedList<Session> sessionList;
207 		synchronized (this.cachedSessions) {
208 			sessionList = this.cachedSessions.get(mode);
209 			if (sessionList == null) {
210 				sessionList = new LinkedList<Session>();
211 				this.cachedSessions.put(mode, sessionList);
212 			}
213 		}
214 		Session session = null;
215 		synchronized (sessionList) {
216 			if (!sessionList.isEmpty()) {
217 				session = sessionList.removeFirst();
218 			}
219 		}
220 		if (session != null) {
221 			if (logger.isTraceEnabled()) {
222 				logger.trace("Found cached JMS Session for mode " + mode + ": " +
223 						(session instanceof SessionProxy ? ((SessionProxy) session).getTargetSession() : session));
224 			}
225 		}
226 		else {
227 			Session targetSession = createSession(con, mode);
228 			if (logger.isDebugEnabled()) {
229 				logger.debug("Creating cached JMS Session for mode " + mode + ": " + targetSession);
230 			}
231 			session = getCachedSessionProxy(targetSession, sessionList);
232 		}
233 		return session;
234 	}
235 
236 	/**
237 	 * Wrap the given Session with a proxy that delegates every method call to it
238 	 * but adapts close calls. This is useful for allowing application code to
239 	 * handle a special framework Session just like an ordinary Session.
240 	 * @param target the original Session to wrap
241 	 * @param sessionList the List of cached Sessions that the given Session belongs to
242 	 * @return the wrapped Session
243 	 */
getCachedSessionProxy(Session target, LinkedList<Session> sessionList)244 	protected Session getCachedSessionProxy(Session target, LinkedList<Session> sessionList) {
245 		List<Class> classes = new ArrayList<Class>(3);
246 		classes.add(SessionProxy.class);
247 		if (target instanceof QueueSession) {
248 			classes.add(QueueSession.class);
249 		}
250 		if (target instanceof TopicSession) {
251 			classes.add(TopicSession.class);
252 		}
253 		return (Session) Proxy.newProxyInstance(
254 				SessionProxy.class.getClassLoader(),
255 				classes.toArray(new Class[classes.size()]),
256 				new CachedSessionInvocationHandler(target, sessionList));
257 	}
258 
259 
260 	/**
261 	 * Invocation handler for a cached JMS Session proxy.
262 	 */
263 	private class CachedSessionInvocationHandler implements InvocationHandler {
264 
		/** The underlying Session that non-intercepted calls are delegated to. */
		private final Session target;

		/** The idle list that this handler's proxy is handed back to on a logical close. */
		private final LinkedList<Session> sessionList;

		/** Producers created on the target Session; a null Destination maps to a null key. */
		private final Map<DestinationCacheKey, MessageProducer> cachedProducers =
				new HashMap<DestinationCacheKey, MessageProducer>();

		/** Consumers created on the target Session, keyed by destination and consumer attributes. */
		private final Map<ConsumerCacheKey, MessageConsumer> cachedConsumers =
				new HashMap<ConsumerCacheKey, MessageConsumer>();

		/**
		 * Set whenever a "create*" method is invoked and cleared on commit/rollback;
		 * consulted on logical close to decide whether a rollback is needed.
		 */
		private boolean transactionOpen = false;

		/**
		 * Create a handler for the given target Session.
		 * @param target the underlying Session to delegate to
		 * @param sessionList the idle list that the proxy is returned to when closed
		 */
		public CachedSessionInvocationHandler(Session target, LinkedList<Session> sessionList) {
			this.target = target;
			this.sessionList = sessionList;
		}
281 
		/**
		 * Dispatch a call made on the Session proxy.
		 * <p>Methods are matched by name only. {@code equals}, {@code hashCode},
		 * {@code toString}, {@code close} and {@code getTargetSession} are answered
		 * locally. {@code commit}/{@code rollback} and all {@code create*} methods
		 * update the transaction flag; producer and consumer creation may be served
		 * from the per-Session caches. Everything that is not answered locally is
		 * invoked reflectively on the target Session.
		 * @param proxy the Session proxy the method was invoked on
		 * @param method the invoked interface method
		 * @param args the method arguments
		 * @return the locally computed result or the target Session's return value
		 * @throws Throwable the exception thrown by the target method, unwrapped from
		 * its {@code InvocationTargetException}, or any exception raised while closing
		 */
		public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
			String methodName = method.getName();
			if (methodName.equals("equals")) {
				// Only consider equal when proxies are identical.
				return (proxy == args[0]);
			}
			else if (methodName.equals("hashCode")) {
				// Use hashCode of Session proxy.
				return System.identityHashCode(proxy);
			}
			else if (methodName.equals("toString")) {
				return "Cached JMS Session: " + this.target;
			}
			else if (methodName.equals("close")) {
				// Handle close method: don't pass the call on.
				if (active) {
					// The size check and the hand-back happen under the same list lock.
					synchronized (this.sessionList) {
						if (this.sessionList.size() < getSessionCacheSize()) {
							logicalClose((Session) proxy);
							// Remain open in the session list.
							return null;
						}
					}
				}
				// If we get here, we're supposed to shut down.
				// (Either the factory is inactive or the idle list is already full.)
				physicalClose();
				return null;
			}
			else if (methodName.equals("getTargetSession")) {
				// Handle getTargetSession method: return underlying Session.
				return this.target;
			}
			else if (methodName.equals("commit") || methodName.equals("rollback")) {
				// Flag is cleared before the call is passed on to the target below.
				this.transactionOpen = false;
			}
			else if (methodName.startsWith("create")) {
				// Any create* call marks the Session as potentially having open work.
				this.transactionOpen = true;
				if (isCacheProducers() && (methodName.equals("createProducer") ||
						methodName.equals("createSender") || methodName.equals("createPublisher"))) {
					// Destination argument being null is ok for a producer
					return getCachedProducer((Destination) args[0]);
				}
				else if (isCacheConsumers()) {
					// let raw JMS invocation throw an exception if Destination (i.e. args[0]) is null
					if ((methodName.equals("createConsumer") || methodName.equals("createReceiver") ||
							methodName.equals("createSubscriber"))) {
						Destination dest = (Destination) args[0];
						// Consumers on temporary destinations are not cached.
						if (dest != null && !(dest instanceof TemporaryQueue || dest instanceof TemporaryTopic)) {
							// Arguments: destination, optional selector, optional noLocal flag.
							return getCachedConsumer(dest,
									(args.length > 1 ? (String) args[1] : null),
									(args.length > 2 && (Boolean) args[2]),
									null);
						}
					}
					else if (methodName.equals("createDurableSubscriber")) {
						Destination dest = (Destination) args[0];
						if (dest != null) {
							// Arguments: topic, subscription name, optional selector, optional noLocal flag.
							return getCachedConsumer(dest,
									(args.length > 2 ? (String) args[2] : null),
									(args.length > 3 && (Boolean) args[3]),
									(String) args[1]);
						}
					}
				}
			}
			// Not answered locally: delegate to the target Session.
			try {
				return method.invoke(this.target, args);
			}
			catch (InvocationTargetException ex) {
				// Surface the target's own exception rather than the reflection wrapper.
				throw ex.getTargetException();
			}
		}
354 
getCachedProducer(Destination dest)355 		private MessageProducer getCachedProducer(Destination dest) throws JMSException {
356 			DestinationCacheKey cacheKey = (dest != null ? new DestinationCacheKey(dest) : null);
357 			MessageProducer producer = this.cachedProducers.get(cacheKey);
358 			if (producer != null) {
359 				if (logger.isTraceEnabled()) {
360 					logger.trace("Found cached JMS MessageProducer for destination [" + dest + "]: " + producer);
361 				}
362 			}
363 			else {
364 				producer = this.target.createProducer(dest);
365 				if (logger.isDebugEnabled()) {
366 					logger.debug("Creating cached JMS MessageProducer for destination [" + dest + "]: " + producer);
367 				}
368 				this.cachedProducers.put(cacheKey, producer);
369 			}
370 			return new CachedMessageProducer(producer);
371 		}
372 
getCachedConsumer( Destination dest, String selector, boolean noLocal, String subscription)373 		private MessageConsumer getCachedConsumer(
374 				Destination dest, String selector, boolean noLocal, String subscription) throws JMSException {
375 
376 			ConsumerCacheKey cacheKey = new ConsumerCacheKey(dest, selector, noLocal, subscription);
377 			MessageConsumer consumer = this.cachedConsumers.get(cacheKey);
378 			if (consumer != null) {
379 				if (logger.isTraceEnabled()) {
380 					logger.trace("Found cached JMS MessageConsumer for destination [" + dest + "]: " + consumer);
381 				}
382 			}
383 			else {
384 				if (dest instanceof Topic) {
385 					consumer = (subscription != null ?
386 							this.target.createDurableSubscriber((Topic) dest, subscription, selector, noLocal) :
387 							this.target.createConsumer(dest, selector, noLocal));
388 				}
389 				else {
390 					consumer = this.target.createConsumer(dest, selector);
391 				}
392 				if (logger.isDebugEnabled()) {
393 					logger.debug("Creating cached JMS MessageConsumer for destination [" + dest + "]: " + consumer);
394 				}
395 				this.cachedConsumers.put(cacheKey, consumer);
396 			}
397 			return new CachedMessageConsumer(consumer);
398 		}
399 
logicalClose(Session proxy)400 		private void logicalClose(Session proxy) throws JMSException {
401 			// Preserve rollback-on-close semantics.
402 			if (this.transactionOpen && this.target.getTransacted()) {
403 				this.transactionOpen = false;
404 				this.target.rollback();
405 			}
406 			// Physically close durable subscribers at time of Session close call.
407 			for (Iterator<Map.Entry<ConsumerCacheKey, MessageConsumer>> it = this.cachedConsumers.entrySet().iterator(); it.hasNext();) {
408 				Map.Entry<ConsumerCacheKey, MessageConsumer> entry = it.next();
409 				if (entry.getKey().subscription != null) {
410 					entry.getValue().close();
411 					it.remove();
412 				}
413 			}
414 			// Allow for multiple close calls...
415 			boolean returned = false;
416 			synchronized (this.sessionList) {
417 				if (!this.sessionList.contains(proxy)) {
418 					this.sessionList.addLast(proxy);
419 					returned = true;
420 				}
421 			}
422 			if (returned && logger.isTraceEnabled()) {
423 				logger.trace("Returned cached Session: " + this.target);
424 			}
425 		}
426 
physicalClose()427 		private void physicalClose() throws JMSException {
428 			if (logger.isDebugEnabled()) {
429 				logger.debug("Closing cached Session: " + this.target);
430 			}
431 			// Explicitly close all MessageProducers and MessageConsumers that
432 			// this Session happens to cache...
433 			try {
434 				for (MessageProducer producer : this.cachedProducers.values()) {
435 					producer.close();
436 				}
437 				for (MessageConsumer consumer : this.cachedConsumers.values()) {
438 					consumer.close();
439 				}
440 			}
441 			finally {
442 				this.cachedProducers.clear();
443 				this.cachedConsumers.clear();
444 				// Now actually close the Session.
445 				this.target.close();
446 			}
447 		}
448 	}
449 
450 
451 	/**
452 	 * Simple wrapper class around a Destination reference.
453 	 * Used as the cache key when caching MessageProducer objects.
454 	 */
455 	private static class DestinationCacheKey {
456 
457 		private final Destination destination;
458 
459 		private String destinationString;
460 
DestinationCacheKey(Destination destination)461 		public DestinationCacheKey(Destination destination) {
462 			Assert.notNull(destination, "Destination must not be null");
463 			this.destination = destination;
464 		}
465 
getDestinationString()466 		private String getDestinationString() {
467 			if (this.destinationString == null) {
468 				this.destinationString = this.destination.toString();
469 			}
470 			return this.destinationString;
471 		}
472 
destinationEquals(DestinationCacheKey otherKey)473 		protected boolean destinationEquals(DestinationCacheKey otherKey) {
474 			return (this.destination.getClass().equals(otherKey.destination.getClass()) &&
475 					(this.destination.equals(otherKey.destination) ||
476 							getDestinationString().equals(otherKey.getDestinationString())));
477 		}
478 
equals(Object other)479 		public boolean equals(Object other) {
480 			// Effectively checking object equality as well as toString equality.
481 			// On WebSphere MQ, Destination objects do not implement equals...
482 			return (other == this || destinationEquals((DestinationCacheKey) other));
483 		}
484 
hashCode()485 		public int hashCode() {
486 			// Can't use a more specific hashCode since we can't rely on
487 			// this.destination.hashCode() actually being the same value
488 			// for equivalent destinations... Thanks a lot, WebSphere MQ!
489 			return this.destination.getClass().hashCode();
490 		}
491 	}
492 
493 
494 	/**
495 	 * Simple wrapper class around a Destination and other consumer attributes.
496 	 * Used as the cache key when caching MessageConsumer objects.
497 	 */
498 	private static class ConsumerCacheKey extends DestinationCacheKey {
499 
500 		private final String selector;
501 
502 		private final boolean noLocal;
503 
504 		private final String subscription;
505 
ConsumerCacheKey(Destination destination, String selector, boolean noLocal, String subscription)506 		public ConsumerCacheKey(Destination destination, String selector, boolean noLocal, String subscription) {
507 			super(destination);
508 			this.selector = selector;
509 			this.noLocal = noLocal;
510 			this.subscription = subscription;
511 		}
512 
equals(Object other)513 		public boolean equals(Object other) {
514 			if (other == this) {
515 				return true;
516 			}
517 			ConsumerCacheKey otherKey = (ConsumerCacheKey) other;
518 			return (destinationEquals(otherKey) &&
519 					ObjectUtils.nullSafeEquals(this.selector, otherKey.selector) &&
520 					this.noLocal == otherKey.noLocal &&
521 					ObjectUtils.nullSafeEquals(this.subscription, otherKey.subscription));
522 		}
523 	}
524 
525 }
526