1 /* 2 * Copyright (c) 2000, 2017 Oracle and/or its affiliates. All rights reserved. 3 * 4 * This program and the accompanying materials are made available under the 5 * terms of the Eclipse Distribution License v. 1.0, which is available at 6 * http://www.eclipse.org/org/documents/edl-v10.php. 7 * 8 * SPDX-License-Identifier: BSD-3-Clause 9 */ 10 11 import javax.jms.*; 12 13 /** 14 * The AckEquivExample class shows how the following two scenarios both ensure 15 * that a message will not be acknowledged until processing of it is complete: 16 * <ul> 17 * <li> Using an asynchronous consumer (message listener) in an 18 * AUTO_ACKNOWLEDGE session 19 * <li> Using a synchronous consumer in a CLIENT_ACKNOWLEDGE session 20 * </ul> 21 * <p> 22 * With a message listener, the automatic acknowledgment happens when the 23 * onMessage method returns -- that is, after message processing has finished. 24 * <p> 25 * With a synchronous receive, the client acknowledges the message after 26 * processing is complete. (If you use AUTO_ACKNOWLEDGE with a synchronous 27 * receive, the acknowledgement happens immediately after the receive call; if 28 * any subsequent processing steps fail, the message cannot be redelivered.) 29 * <p> 30 * The program contains a SynchProducer class, a SynchConsumer class, an 31 * AsynchSubscriber class with a TextListener class, a MultiplePublisher class, 32 * a main method, and a method that runs the other classes' threads. 33 * <p> 34 * Specify a queue name and a topic name on the command line when you run the 35 * program. The program also uses a queue named "controlQueue", which should be 36 * created before you run the program. 37 */ 38 public class AckEquivExample { 39 final String CONTROL_QUEUE = "controlQueue"; 40 String queueName = null; 41 String topicName = null; 42 int exitResult = 0; 43 44 /** 45 * The SynchProducer class creates a session in CLIENT_ACKNOWLEDGE mode and 46 * sends a message. 47 */ 48 public class SynchProducer extends Thread { 49 50 /** 51 * Runs the thread. 52 */ run()53 public void run() { 54 ConnectionFactory connectionFactory = null; 55 Connection connection = null; 56 Session session = null; 57 Queue queue = null; 58 MessageProducer msgProducer = null; 59 final String MSG_TEXT = 60 new String("Here is a client-acknowledge message"); 61 TextMessage message = null; 62 63 try { 64 connectionFactory = 65 SampleUtilities.getConnectionFactory(); 66 connection = 67 connectionFactory.createConnection(); 68 session = connection.createSession(false, 69 Session.CLIENT_ACKNOWLEDGE); 70 queue = SampleUtilities.getQueue(queueName, session); 71 } catch (Exception e) { 72 System.out.println("Connection problem: " + e.toString()); 73 if (connection != null) { 74 try { 75 connection.close(); 76 } catch (JMSException ee) {} 77 } 78 System.exit(1); 79 } 80 81 /* 82 * Create client-acknowledge producer. 83 * Create and send message. 84 */ 85 try { 86 System.out.println("PRODUCER: Created client-acknowledge session"); 87 msgProducer = session.createProducer(queue); 88 message = session.createTextMessage(); 89 message.setText(MSG_TEXT); 90 System.out.println("PRODUCER: Sending message: " 91 + message.getText()); 92 msgProducer.send(message); 93 } catch (JMSException e) { 94 System.out.println("Exception occurred: " + e.toString()); 95 exitResult = 1; 96 } finally { 97 if (connection != null) { 98 try { 99 connection.close(); 100 } catch (JMSException e) { 101 exitResult = 1; 102 } 103 } 104 } 105 } 106 } 107 108 /** 109 * The SynchConsumer class creates a session in CLIENT_ACKNOWLEDGE mode and 110 * receives the message sent by the SynchProducer class. 111 */ 112 public class SynchConsumer extends Thread { 113 114 /** 115 * Runs the thread. 116 */ run()117 public void run() { 118 ConnectionFactory connectionFactory = null; 119 Connection connection = null; 120 Session session = null; 121 Queue queue = null; 122 MessageConsumer msgConsumer = null; 123 TextMessage message = null; 124 125 try { 126 connectionFactory = 127 SampleUtilities.getConnectionFactory(); 128 connection = 129 connectionFactory.createConnection(); 130 session = connection.createSession(false, 131 Session.CLIENT_ACKNOWLEDGE); 132 queue = SampleUtilities.getQueue(queueName, session); 133 } catch (Exception e) { 134 System.out.println("Connection problem: " + e.toString()); 135 if (connection != null) { 136 try { 137 connection.close(); 138 } catch (JMSException ee) {} 139 } 140 System.exit(1); 141 } 142 143 /* 144 * Create client-acknowledge consumer. 145 * Receive message and process it. 146 * Acknowledge message. 147 */ 148 try { 149 System.out.println("CONSUMER: Created client-acknowledge session"); 150 msgConsumer = session.createConsumer(queue); 151 connection.start(); 152 message = (TextMessage) msgConsumer.receive(); 153 System.out.println("CONSUMER: Processing message: " 154 + message.getText()); 155 System.out.println("CONSUMER: Now I'll acknowledge the message"); 156 message.acknowledge(); 157 } catch (JMSException e) { 158 System.out.println("Exception occurred: " + e.toString()); 159 exitResult = 1; 160 } finally { 161 if (connection != null) { 162 try { 163 connection.close(); 164 } catch (JMSException e) { 165 exitResult = 1; 166 } 167 } 168 } 169 } 170 } 171 172 /** 173 * The AsynchSubscriber class creates a session in AUTO_ACKNOWLEDGE mode 174 * and fetches several messages from a topic asynchronously, using a 175 * message listener, TextListener. 176 * <p> 177 * Each message is acknowledged after the onMessage method completes. 178 */ 179 public class AsynchSubscriber extends Thread { 180 181 /** 182 * The TextListener class implements the MessageListener interface by 183 * defining an onMessage method for the AsynchSubscriber class. 184 */ 185 private class TextListener implements MessageListener { 186 final SampleUtilities.DoneLatch monitor = 187 new SampleUtilities.DoneLatch(); 188 189 /** 190 * Casts the message to a TextMessage and displays its text. 191 * A non-text message is interpreted as the end of the message 192 * stream, and the message listener sets its monitor state to all 193 * done processing messages. 194 * 195 * @param message the incoming message 196 */ onMessage(Message message)197 public void onMessage(Message message) { 198 if (message instanceof TextMessage) { 199 TextMessage msg = (TextMessage) message; 200 201 try { 202 System.out.println("CONSUMER: Processing message: " 203 + msg.getText()); 204 } catch (JMSException e) { 205 System.out.println("Exception in onMessage(): " 206 + e.toString()); 207 } 208 } else { 209 monitor.allDone(); 210 } 211 } 212 } 213 214 /** 215 * Runs the thread. 216 */ run()217 public void run() { 218 ConnectionFactory connectionFactory = null; 219 Connection connection = null; 220 Session session = null; 221 Topic topic = null; 222 TopicSubscriber topicSubscriber = null; 223 TextListener topicListener = null; 224 225 try { 226 connectionFactory = 227 SampleUtilities.getConnectionFactory(); 228 connection = 229 connectionFactory.createConnection(); 230 connection.setClientID("AckEquivExample"); 231 session = connection.createSession(false, 232 Session.AUTO_ACKNOWLEDGE); 233 System.out.println("CONSUMER: Created auto-acknowledge session"); 234 topic = SampleUtilities.getTopic(topicName, session); 235 } catch (Exception e) { 236 System.out.println("Connection problem: " + e.toString()); 237 if (connection != null) { 238 try { 239 connection.close(); 240 } catch (JMSException ee) {} 241 } 242 System.exit(1); 243 } 244 245 /* 246 * Create auto-acknowledge subscriber. 247 * Register message listener (TextListener). 248 * Start message delivery. 249 * Send synchronize message to publisher, then wait till all 250 * messages have arrived. 251 * Listener displays the messages obtained. 252 */ 253 try { 254 topicSubscriber = session.createDurableSubscriber(topic, 255 "AckEquivExampleSubscription"); 256 topicListener = new TextListener(); 257 topicSubscriber.setMessageListener(topicListener); 258 connection.start(); 259 260 // Let publisher know that subscriber is ready. 261 try { 262 SampleUtilities.sendSynchronizeMessage("CONSUMER: ", 263 CONTROL_QUEUE); 264 } catch (Exception e) { 265 System.out.println("Queue probably missing: " + e.toString()); 266 if (connection != null) { 267 try { 268 connection.close(); 269 } catch (JMSException ee) {} 270 } 271 System.exit(1); 272 } 273 274 /* 275 * Asynchronously process messages. 276 * Block until publisher issues a control message indicating 277 * end of publish stream. 278 */ 279 topicListener.monitor.waitTillDone(); 280 topicSubscriber.close(); 281 session.unsubscribe("AckEquivExampleSubscription"); 282 } catch (JMSException e) { 283 System.out.println("Exception occurred: " + e.toString()); 284 exitResult = 1; 285 } finally { 286 if (connection != null) { 287 try { 288 connection.close(); 289 } catch (JMSException e) { 290 exitResult = 1; 291 } 292 } 293 } 294 } 295 } 296 297 /** 298 * The MultiplePublisher class creates a session in AUTO_ACKNOWLEDGE mode 299 * and publishes three messages to a topic. 300 */ 301 public class MultiplePublisher extends Thread { 302 303 /** 304 * Runs the thread. 305 */ run()306 public void run() { 307 ConnectionFactory connectionFactory = null; 308 Connection connection = null; 309 Session session = null; 310 Topic topic = null; 311 MessageProducer topicPublisher = null; 312 TextMessage message = null; 313 final int NUMMSGS = 3; 314 final String MSG_TEXT = 315 new String("Here is an auto-acknowledge message"); 316 317 try { 318 connectionFactory = 319 SampleUtilities.getConnectionFactory(); 320 connection = 321 connectionFactory.createConnection(); 322 session = connection.createSession(false, 323 Session.AUTO_ACKNOWLEDGE); 324 System.out.println("PRODUCER: Created auto-acknowledge session"); 325 topic = SampleUtilities.getTopic(topicName, session); 326 } catch (Exception e) { 327 System.out.println("Connection problem: " + e.toString()); 328 if (connection != null) { 329 try { 330 connection.close(); 331 } catch (JMSException ee) {} 332 } 333 System.exit(1); 334 } 335 336 /* 337 * After synchronizing with subscriber, create publisher. 338 * Send 3 messages, varying text slightly. 339 * Send end-of-messages message. 340 */ 341 try { 342 /* 343 * Synchronize with subscriber. Wait for message indicating 344 * that subscriber is ready to receive messages. 345 */ 346 try { 347 SampleUtilities.receiveSynchronizeMessages("PRODUCER: ", 348 CONTROL_QUEUE, 1); 349 } catch (Exception e) { 350 System.out.println("Queue probably missing: " + e.toString()); 351 if (connection != null) { 352 try { 353 connection.close(); 354 } catch (JMSException ee) {} 355 } 356 System.exit(1); 357 } 358 359 topicPublisher = session.createProducer(topic); 360 message = session.createTextMessage(); 361 for (int i = 0; i < NUMMSGS; i++) { 362 message.setText(MSG_TEXT + " " + (i + 1)); 363 System.out.println("PRODUCER: Publishing message: " 364 + message.getText()); 365 topicPublisher.send(message); 366 } 367 368 // Send a non-text control message indicating end of messages. 369 topicPublisher.send(session.createMessage()); 370 } catch (JMSException e) { 371 System.out.println("Exception occurred: " + e.toString()); 372 exitResult = 1; 373 } finally { 374 if (connection != null) { 375 try { 376 connection.close(); 377 } catch (JMSException e) { 378 exitResult = 1; 379 } 380 } 381 } 382 } 383 } 384 385 /** 386 * Instantiates the producer, consumer, subscriber, and publisher classes and 387 * starts their threads. 388 * Calls the join method to wait for the threads to die. 389 */ run_threads()390 public void run_threads() { 391 SynchProducer synchProducer = new SynchProducer(); 392 SynchConsumer synchConsumer = new SynchConsumer(); 393 AsynchSubscriber asynchSubscriber = new AsynchSubscriber(); 394 MultiplePublisher multiplePublisher = new MultiplePublisher(); 395 396 synchProducer.start(); 397 synchConsumer.start(); 398 try { 399 synchProducer.join(); 400 synchConsumer.join(); 401 } catch (InterruptedException e) {} 402 403 asynchSubscriber.start(); 404 multiplePublisher.start(); 405 try { 406 asynchSubscriber.join(); 407 multiplePublisher.join(); 408 } catch (InterruptedException e) {} 409 } 410 411 /** 412 * Reads the queue and topic names from the command line, then calls the 413 * run_threads method to execute the program threads. 414 * 415 * @param args the topic used by the example 416 */ main(String[] args)417 public static void main(String[] args) { 418 AckEquivExample aee = new AckEquivExample(); 419 420 if (args.length != 2) { 421 System.out.println("Usage: java AckEquivExample <queue_name> <topic_name>"); 422 System.exit(1); 423 } 424 aee.queueName = new String(args[0]); 425 aee.topicName = new String(args[1]); 426 System.out.println("Queue name is " + aee.queueName); 427 System.out.println("Topic name is " + aee.topicName); 428 429 aee.run_threads(); 430 SampleUtilities.exit(aee.exitResult); 431 } 432 } 433