1 /* 2 * Copyright 2002-2012 the original author or authors. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package org.springframework.jms.connection; 18 19 import java.lang.reflect.InvocationHandler; 20 import java.lang.reflect.InvocationTargetException; 21 import java.lang.reflect.Method; 22 import java.lang.reflect.Proxy; 23 import java.util.ArrayList; 24 import java.util.HashMap; 25 import java.util.Iterator; 26 import java.util.LinkedList; 27 import java.util.List; 28 import java.util.Map; 29 import javax.jms.Connection; 30 import javax.jms.ConnectionFactory; 31 import javax.jms.Destination; 32 import javax.jms.JMSException; 33 import javax.jms.MessageConsumer; 34 import javax.jms.MessageProducer; 35 import javax.jms.QueueSession; 36 import javax.jms.Session; 37 import javax.jms.TemporaryQueue; 38 import javax.jms.TemporaryTopic; 39 import javax.jms.Topic; 40 import javax.jms.TopicSession; 41 42 import org.springframework.util.Assert; 43 import org.springframework.util.ObjectUtils; 44 45 /** 46 * {@link SingleConnectionFactory} subclass that adds {@link javax.jms.Session} 47 * caching as well {@link javax.jms.MessageProducer} caching. This ConnectionFactory 48 * also switches the {@link #setReconnectOnException "reconnectOnException" property} 49 * to "true" by default, allowing for automatic recovery of the underlying Connection. 50 * 51 * <p>By default, only one single Session will be cached, with further requested 52 * Sessions being created and disposed on demand. Consider raising the 53 * {@link #setSessionCacheSize "sessionCacheSize" value} in case of a 54 * high-concurrency environment. 55 * 56 * <p><b>NOTE: This ConnectionFactory decorator requires JMS 1.1 or higher.</b> 57 * You may use it through the JMS 1.0.2 API; however, the target JMS driver 58 * needs to be compliant with JMS 1.1. 59 * 60 * <p>When using the JMS 1.0.2 API, this ConnectionFactory will switch 61 * into queue/topic mode according to the JMS API methods used at runtime: 62 * <code>createQueueConnection</code> and <code>createTopicConnection</code> will 63 * lead to queue/topic mode, respectively; generic <code>createConnection</code> 64 * calls will lead to a JMS 1.1 connection which is able to serve both modes. 65 * 66 * <p><b>NOTE: This ConnectionFactory requires explicit closing of all Sessions 67 * obtained from its shared Connection.</b> This is the usual recommendation for 68 * native JMS access code anyway. However, with this ConnectionFactory, its use 69 * is mandatory in order to actually allow for Session reuse. 70 * 71 * <p>Note also that MessageConsumers obtained from a cached Session won't get 72 * closed until the Session will eventually be removed from the pool. This may 73 * lead to semantic side effects in some cases. For a durable subscriber, the 74 * logical <code>Session.close()</code> call will also close the subscription. 75 * Re-registering a durable consumer for the same subscription on the same 76 * Session handle is not supported; close and reobtain a cached Session first. 77 * 78 * @author Juergen Hoeller 79 * @since 2.5.3 80 */ 81 public class CachingConnectionFactory extends SingleConnectionFactory { 82 83 private int sessionCacheSize = 1; 84 85 private boolean cacheProducers = true; 86 87 private boolean cacheConsumers = true; 88 89 private volatile boolean active = true; 90 91 private final Map<Integer, LinkedList<Session>> cachedSessions = 92 new HashMap<Integer, LinkedList<Session>>(); 93 94 95 /** 96 * Create a new CachingConnectionFactory for bean-style usage. 97 * @see #setTargetConnectionFactory 98 */ CachingConnectionFactory()99 public CachingConnectionFactory() { 100 super(); 101 setReconnectOnException(true); 102 } 103 104 /** 105 * Create a new CachingConnectionFactory for the given target 106 * ConnectionFactory. 107 * @param targetConnectionFactory the target ConnectionFactory 108 */ CachingConnectionFactory(ConnectionFactory targetConnectionFactory)109 public CachingConnectionFactory(ConnectionFactory targetConnectionFactory) { 110 super(targetConnectionFactory); 111 setReconnectOnException(true); 112 } 113 114 115 /** 116 * Specify the desired size for the JMS Session cache (per JMS Session type). 117 * <p>This cache size is the maximum limit for the number of cached Sessions 118 * per session acknowledgement type (auto, client, dups_ok, transacted). 119 * As a consequence, the actual number of cached Sessions may be up to 120 * four times as high as the specified value - in the unlikely case 121 * of mixing and matching different acknowledgement types. 122 * <p>Default is 1: caching a single Session, (re-)creating further ones on 123 * demand. Specify a number like 10 if you'd like to raise the number of cached 124 * Sessions; that said, 1 may be sufficient for low-concurrency scenarios. 125 * @see #setCacheProducers 126 */ setSessionCacheSize(int sessionCacheSize)127 public void setSessionCacheSize(int sessionCacheSize) { 128 Assert.isTrue(sessionCacheSize >= 1, "Session cache size must be 1 or higher"); 129 this.sessionCacheSize = sessionCacheSize; 130 } 131 132 /** 133 * Return the desired size for the JMS Session cache (per JMS Session type). 134 */ getSessionCacheSize()135 public int getSessionCacheSize() { 136 return this.sessionCacheSize; 137 } 138 139 /** 140 * Specify whether to cache JMS MessageProducers per JMS Session instance 141 * (more specifically: one MessageProducer per Destination and Session). 142 * <p>Default is "true". Switch this to "false" in order to always 143 * recreate MessageProducers on demand. 144 */ setCacheProducers(boolean cacheProducers)145 public void setCacheProducers(boolean cacheProducers) { 146 this.cacheProducers = cacheProducers; 147 } 148 149 /** 150 * Return whether to cache JMS MessageProducers per JMS Session instance. 151 */ isCacheProducers()152 public boolean isCacheProducers() { 153 return this.cacheProducers; 154 } 155 156 /** 157 * Specify whether to cache JMS MessageConsumers per JMS Session instance 158 * (more specifically: one MessageConsumer per Destination, selector String 159 * and Session). Note that durable subscribers will only be cached until 160 * logical closing of the Session handle. 161 * <p>Default is "true". Switch this to "false" in order to always 162 * recreate MessageConsumers on demand. 163 */ setCacheConsumers(boolean cacheConsumers)164 public void setCacheConsumers(boolean cacheConsumers) { 165 this.cacheConsumers = cacheConsumers; 166 } 167 168 /** 169 * Return whether to cache JMS MessageConsumers per JMS Session instance. 170 */ isCacheConsumers()171 public boolean isCacheConsumers() { 172 return this.cacheConsumers; 173 } 174 175 176 /** 177 * Resets the Session cache as well. 178 */ resetConnection()179 public void resetConnection() { 180 this.active = false; 181 synchronized (this.cachedSessions) { 182 for (LinkedList<Session> sessionList : this.cachedSessions.values()) { 183 synchronized (sessionList) { 184 for (Session session : sessionList) { 185 try { 186 session.close(); 187 } 188 catch (Throwable ex) { 189 logger.trace("Could not close cached JMS Session", ex); 190 } 191 } 192 } 193 } 194 this.cachedSessions.clear(); 195 } 196 this.active = true; 197 198 // Now proceed with actual closing of the shared Connection... 199 super.resetConnection(); 200 } 201 202 /** 203 * Checks for a cached Session for the given mode. 204 */ getSession(Connection con, Integer mode)205 protected Session getSession(Connection con, Integer mode) throws JMSException { 206 LinkedList<Session> sessionList; 207 synchronized (this.cachedSessions) { 208 sessionList = this.cachedSessions.get(mode); 209 if (sessionList == null) { 210 sessionList = new LinkedList<Session>(); 211 this.cachedSessions.put(mode, sessionList); 212 } 213 } 214 Session session = null; 215 synchronized (sessionList) { 216 if (!sessionList.isEmpty()) { 217 session = sessionList.removeFirst(); 218 } 219 } 220 if (session != null) { 221 if (logger.isTraceEnabled()) { 222 logger.trace("Found cached JMS Session for mode " + mode + ": " + 223 (session instanceof SessionProxy ? ((SessionProxy) session).getTargetSession() : session)); 224 } 225 } 226 else { 227 Session targetSession = createSession(con, mode); 228 if (logger.isDebugEnabled()) { 229 logger.debug("Creating cached JMS Session for mode " + mode + ": " + targetSession); 230 } 231 session = getCachedSessionProxy(targetSession, sessionList); 232 } 233 return session; 234 } 235 236 /** 237 * Wrap the given Session with a proxy that delegates every method call to it 238 * but adapts close calls. This is useful for allowing application code to 239 * handle a special framework Session just like an ordinary Session. 240 * @param target the original Session to wrap 241 * @param sessionList the List of cached Sessions that the given Session belongs to 242 * @return the wrapped Session 243 */ getCachedSessionProxy(Session target, LinkedList<Session> sessionList)244 protected Session getCachedSessionProxy(Session target, LinkedList<Session> sessionList) { 245 List<Class> classes = new ArrayList<Class>(3); 246 classes.add(SessionProxy.class); 247 if (target instanceof QueueSession) { 248 classes.add(QueueSession.class); 249 } 250 if (target instanceof TopicSession) { 251 classes.add(TopicSession.class); 252 } 253 return (Session) Proxy.newProxyInstance( 254 SessionProxy.class.getClassLoader(), 255 classes.toArray(new Class[classes.size()]), 256 new CachedSessionInvocationHandler(target, sessionList)); 257 } 258 259 260 /** 261 * Invocation handler for a cached JMS Session proxy. 262 */ 263 private class CachedSessionInvocationHandler implements InvocationHandler { 264 265 private final Session target; 266 267 private final LinkedList<Session> sessionList; 268 269 private final Map<DestinationCacheKey, MessageProducer> cachedProducers = 270 new HashMap<DestinationCacheKey, MessageProducer>(); 271 272 private final Map<ConsumerCacheKey, MessageConsumer> cachedConsumers = 273 new HashMap<ConsumerCacheKey, MessageConsumer>(); 274 275 private boolean transactionOpen = false; 276 CachedSessionInvocationHandler(Session target, LinkedList<Session> sessionList)277 public CachedSessionInvocationHandler(Session target, LinkedList<Session> sessionList) { 278 this.target = target; 279 this.sessionList = sessionList; 280 } 281 invoke(Object proxy, Method method, Object[] args)282 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 283 String methodName = method.getName(); 284 if (methodName.equals("equals")) { 285 // Only consider equal when proxies are identical. 286 return (proxy == args[0]); 287 } 288 else if (methodName.equals("hashCode")) { 289 // Use hashCode of Session proxy. 290 return System.identityHashCode(proxy); 291 } 292 else if (methodName.equals("toString")) { 293 return "Cached JMS Session: " + this.target; 294 } 295 else if (methodName.equals("close")) { 296 // Handle close method: don't pass the call on. 297 if (active) { 298 synchronized (this.sessionList) { 299 if (this.sessionList.size() < getSessionCacheSize()) { 300 logicalClose((Session) proxy); 301 // Remain open in the session list. 302 return null; 303 } 304 } 305 } 306 // If we get here, we're supposed to shut down. 307 physicalClose(); 308 return null; 309 } 310 else if (methodName.equals("getTargetSession")) { 311 // Handle getTargetSession method: return underlying Session. 312 return this.target; 313 } 314 else if (methodName.equals("commit") || methodName.equals("rollback")) { 315 this.transactionOpen = false; 316 } 317 else if (methodName.startsWith("create")) { 318 this.transactionOpen = true; 319 if (isCacheProducers() && (methodName.equals("createProducer") || 320 methodName.equals("createSender") || methodName.equals("createPublisher"))) { 321 // Destination argument being null is ok for a producer 322 return getCachedProducer((Destination) args[0]); 323 } 324 else if (isCacheConsumers()) { 325 // let raw JMS invocation throw an exception if Destination (i.e. args[0]) is null 326 if ((methodName.equals("createConsumer") || methodName.equals("createReceiver") || 327 methodName.equals("createSubscriber"))) { 328 Destination dest = (Destination) args[0]; 329 if (dest != null && !(dest instanceof TemporaryQueue || dest instanceof TemporaryTopic)) { 330 return getCachedConsumer(dest, 331 (args.length > 1 ? (String) args[1] : null), 332 (args.length > 2 && (Boolean) args[2]), 333 null); 334 } 335 } 336 else if (methodName.equals("createDurableSubscriber")) { 337 Destination dest = (Destination) args[0]; 338 if (dest != null) { 339 return getCachedConsumer(dest, 340 (args.length > 2 ? (String) args[2] : null), 341 (args.length > 3 && (Boolean) args[3]), 342 (String) args[1]); 343 } 344 } 345 } 346 } 347 try { 348 return method.invoke(this.target, args); 349 } 350 catch (InvocationTargetException ex) { 351 throw ex.getTargetException(); 352 } 353 } 354 getCachedProducer(Destination dest)355 private MessageProducer getCachedProducer(Destination dest) throws JMSException { 356 DestinationCacheKey cacheKey = (dest != null ? new DestinationCacheKey(dest) : null); 357 MessageProducer producer = this.cachedProducers.get(cacheKey); 358 if (producer != null) { 359 if (logger.isTraceEnabled()) { 360 logger.trace("Found cached JMS MessageProducer for destination [" + dest + "]: " + producer); 361 } 362 } 363 else { 364 producer = this.target.createProducer(dest); 365 if (logger.isDebugEnabled()) { 366 logger.debug("Creating cached JMS MessageProducer for destination [" + dest + "]: " + producer); 367 } 368 this.cachedProducers.put(cacheKey, producer); 369 } 370 return new CachedMessageProducer(producer); 371 } 372 getCachedConsumer( Destination dest, String selector, boolean noLocal, String subscription)373 private MessageConsumer getCachedConsumer( 374 Destination dest, String selector, boolean noLocal, String subscription) throws JMSException { 375 376 ConsumerCacheKey cacheKey = new ConsumerCacheKey(dest, selector, noLocal, subscription); 377 MessageConsumer consumer = this.cachedConsumers.get(cacheKey); 378 if (consumer != null) { 379 if (logger.isTraceEnabled()) { 380 logger.trace("Found cached JMS MessageConsumer for destination [" + dest + "]: " + consumer); 381 } 382 } 383 else { 384 if (dest instanceof Topic) { 385 consumer = (subscription != null ? 386 this.target.createDurableSubscriber((Topic) dest, subscription, selector, noLocal) : 387 this.target.createConsumer(dest, selector, noLocal)); 388 } 389 else { 390 consumer = this.target.createConsumer(dest, selector); 391 } 392 if (logger.isDebugEnabled()) { 393 logger.debug("Creating cached JMS MessageConsumer for destination [" + dest + "]: " + consumer); 394 } 395 this.cachedConsumers.put(cacheKey, consumer); 396 } 397 return new CachedMessageConsumer(consumer); 398 } 399 logicalClose(Session proxy)400 private void logicalClose(Session proxy) throws JMSException { 401 // Preserve rollback-on-close semantics. 402 if (this.transactionOpen && this.target.getTransacted()) { 403 this.transactionOpen = false; 404 this.target.rollback(); 405 } 406 // Physically close durable subscribers at time of Session close call. 407 for (Iterator<Map.Entry<ConsumerCacheKey, MessageConsumer>> it = this.cachedConsumers.entrySet().iterator(); it.hasNext();) { 408 Map.Entry<ConsumerCacheKey, MessageConsumer> entry = it.next(); 409 if (entry.getKey().subscription != null) { 410 entry.getValue().close(); 411 it.remove(); 412 } 413 } 414 // Allow for multiple close calls... 415 boolean returned = false; 416 synchronized (this.sessionList) { 417 if (!this.sessionList.contains(proxy)) { 418 this.sessionList.addLast(proxy); 419 returned = true; 420 } 421 } 422 if (returned && logger.isTraceEnabled()) { 423 logger.trace("Returned cached Session: " + this.target); 424 } 425 } 426 physicalClose()427 private void physicalClose() throws JMSException { 428 if (logger.isDebugEnabled()) { 429 logger.debug("Closing cached Session: " + this.target); 430 } 431 // Explicitly close all MessageProducers and MessageConsumers that 432 // this Session happens to cache... 433 try { 434 for (MessageProducer producer : this.cachedProducers.values()) { 435 producer.close(); 436 } 437 for (MessageConsumer consumer : this.cachedConsumers.values()) { 438 consumer.close(); 439 } 440 } 441 finally { 442 this.cachedProducers.clear(); 443 this.cachedConsumers.clear(); 444 // Now actually close the Session. 445 this.target.close(); 446 } 447 } 448 } 449 450 451 /** 452 * Simple wrapper class around a Destination reference. 453 * Used as the cache key when caching MessageProducer objects. 454 */ 455 private static class DestinationCacheKey { 456 457 private final Destination destination; 458 459 private String destinationString; 460 DestinationCacheKey(Destination destination)461 public DestinationCacheKey(Destination destination) { 462 Assert.notNull(destination, "Destination must not be null"); 463 this.destination = destination; 464 } 465 getDestinationString()466 private String getDestinationString() { 467 if (this.destinationString == null) { 468 this.destinationString = this.destination.toString(); 469 } 470 return this.destinationString; 471 } 472 destinationEquals(DestinationCacheKey otherKey)473 protected boolean destinationEquals(DestinationCacheKey otherKey) { 474 return (this.destination.getClass().equals(otherKey.destination.getClass()) && 475 (this.destination.equals(otherKey.destination) || 476 getDestinationString().equals(otherKey.getDestinationString()))); 477 } 478 equals(Object other)479 public boolean equals(Object other) { 480 // Effectively checking object equality as well as toString equality. 481 // On WebSphere MQ, Destination objects do not implement equals... 482 return (other == this || destinationEquals((DestinationCacheKey) other)); 483 } 484 hashCode()485 public int hashCode() { 486 // Can't use a more specific hashCode since we can't rely on 487 // this.destination.hashCode() actually being the same value 488 // for equivalent destinations... Thanks a lot, WebSphere MQ! 489 return this.destination.getClass().hashCode(); 490 } 491 } 492 493 494 /** 495 * Simple wrapper class around a Destination and other consumer attributes. 496 * Used as the cache key when caching MessageConsumer objects. 497 */ 498 private static class ConsumerCacheKey extends DestinationCacheKey { 499 500 private final String selector; 501 502 private final boolean noLocal; 503 504 private final String subscription; 505 ConsumerCacheKey(Destination destination, String selector, boolean noLocal, String subscription)506 public ConsumerCacheKey(Destination destination, String selector, boolean noLocal, String subscription) { 507 super(destination); 508 this.selector = selector; 509 this.noLocal = noLocal; 510 this.subscription = subscription; 511 } 512 equals(Object other)513 public boolean equals(Object other) { 514 if (other == this) { 515 return true; 516 } 517 ConsumerCacheKey otherKey = (ConsumerCacheKey) other; 518 return (destinationEquals(otherKey) && 519 ObjectUtils.nullSafeEquals(this.selector, otherKey.selector) && 520 this.noLocal == otherKey.noLocal && 521 ObjectUtils.nullSafeEquals(this.subscription, otherKey.subscription)); 522 } 523 } 524 525 } 526