1 /*
2  * Copyright (c) 2000, 2017 Oracle and/or its affiliates. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Distribution License v. 1.0, which is available at
6  * http://www.eclipse.org/org/documents/edl-v10.php.
7  *
8  * SPDX-License-Identifier: BSD-3-Clause
9  */
10 
11 import javax.jms.*;
12 
13 /**
14  * The AckEquivExample class shows how the following two scenarios both ensure
15  * that a message will not be acknowledged until processing of it is complete:
16  * <ul>
17  * <li> Using an asynchronous consumer (message listener) in an
18  * AUTO_ACKNOWLEDGE session
19  * <li> Using a synchronous consumer in a CLIENT_ACKNOWLEDGE session
20  * </ul>
21  * <p>
22  * With a message listener, the automatic acknowledgment happens when the
23  * onMessage method returns -- that is, after message processing has finished.
24  * <p>
25  * With a synchronous receive, the client acknowledges the message after
26  * processing is complete.  (If you use AUTO_ACKNOWLEDGE with a synchronous
27  * receive, the acknowledgement happens immediately after the receive call; if
28  * any subsequent processing steps fail, the message cannot be redelivered.)
29  * <p>
30  * The program contains a SynchProducer class, a SynchConsumer class, an
31  * AsynchSubscriber class with a TextListener class, a MultiplePublisher class,
32  * a main method, and a method that runs the other classes' threads.
33  * <p>
34  * Specify a queue name and a topic name on the command line when you run the
35  * program.  The program also uses a queue named "controlQueue", which should be
36  * created before you run the program.
37  */
38 public class AckEquivExample {
39     final String  CONTROL_QUEUE = "controlQueue";
40     String        queueName = null;
41     String        topicName = null;
42     int           exitResult = 0;
43 
44     /**
45      * The SynchProducer class creates a session in CLIENT_ACKNOWLEDGE mode and
46      * sends a message.
47      */
48     public class SynchProducer extends Thread {
49 
50         /**
51          * Runs the thread.
52          */
run()53         public void run() {
54             ConnectionFactory    connectionFactory = null;
55             Connection           connection = null;
56             Session              session = null;
57             Queue                queue = null;
58             MessageProducer      msgProducer = null;
59             final String         MSG_TEXT =
60                                     new String("Here is a client-acknowledge message");
61             TextMessage          message = null;
62 
63             try {
64                 connectionFactory =
65                     SampleUtilities.getConnectionFactory();
66                 connection =
67                     connectionFactory.createConnection();
68                 session = connection.createSession(false,
69                     Session.CLIENT_ACKNOWLEDGE);
70                 queue = SampleUtilities.getQueue(queueName, session);
71             } catch (Exception e) {
72                 System.out.println("Connection problem: " + e.toString());
73                 if (connection != null) {
74                     try {
75                         connection.close();
76                     } catch (JMSException ee) {}
77                 }
78                 System.exit(1);
79             }
80 
81             /*
82              * Create client-acknowledge producer.
83              * Create and send message.
84              */
85             try {
86                 System.out.println("PRODUCER: Created client-acknowledge session");
87                 msgProducer = session.createProducer(queue);
88                 message = session.createTextMessage();
89                 message.setText(MSG_TEXT);
90                 System.out.println("PRODUCER: Sending message: "
91                     + message.getText());
92                 msgProducer.send(message);
93             } catch (JMSException e) {
94                 System.out.println("Exception occurred: " + e.toString());
95                 exitResult = 1;
96             } finally {
97                 if (connection != null) {
98                     try {
99                         connection.close();
100                     } catch (JMSException e) {
101                         exitResult = 1;
102                     }
103                 }
104             }
105         }
106     }
107 
108     /**
109      * The SynchConsumer class creates a session in CLIENT_ACKNOWLEDGE mode and
110      * receives the message sent by the SynchProducer class.
111      */
112     public class SynchConsumer extends Thread {
113 
114         /**
115          * Runs the thread.
116          */
run()117     	public void run() {
118             ConnectionFactory    connectionFactory = null;
119             Connection           connection = null;
120             Session              session = null;
121             Queue                queue = null;
122             MessageConsumer      msgConsumer = null;
123             TextMessage          message = null;
124 
125             try {
126                 connectionFactory =
127                     SampleUtilities.getConnectionFactory();
128                 connection =
129                     connectionFactory.createConnection();
130                 session = connection.createSession(false,
131                     Session.CLIENT_ACKNOWLEDGE);
132                 queue = SampleUtilities.getQueue(queueName, session);
133             } catch (Exception e) {
134                 System.out.println("Connection problem: " + e.toString());
135                 if (connection != null) {
136                     try {
137                         connection.close();
138                     } catch (JMSException ee) {}
139                 }
140                 System.exit(1);
141             }
142 
143             /*
144              * Create client-acknowledge consumer.
145              * Receive message and process it.
146              * Acknowledge message.
147              */
148              try {
149                 System.out.println("CONSUMER: Created client-acknowledge session");
150                 msgConsumer = session.createConsumer(queue);
151                 connection.start();
152                 message = (TextMessage) msgConsumer.receive();
153                 System.out.println("CONSUMER: Processing message: "
154                     + message.getText());
155                 System.out.println("CONSUMER: Now I'll acknowledge the message");
156                 message.acknowledge();
157             } catch (JMSException e) {
158                 System.out.println("Exception occurred: " + e.toString());
159                 exitResult = 1;
160             } finally {
161                 if (connection != null) {
162                     try {
163                         connection.close();
164                     } catch (JMSException e) {
165                         exitResult = 1;
166                     }
167                 }
168             }
169         }
170     }
171 
172     /**
173      * The AsynchSubscriber class creates a session in AUTO_ACKNOWLEDGE mode
174      * and fetches several messages from a topic asynchronously, using a
175      * message listener, TextListener.
176      * <p>
177      * Each message is acknowledged after the onMessage method completes.
178      */
179     public class AsynchSubscriber extends Thread {
180 
181         /**
182          * The TextListener class implements the MessageListener interface by
183          * defining an onMessage method for the AsynchSubscriber class.
184          */
185         private class TextListener implements MessageListener {
186             final SampleUtilities.DoneLatch  monitor =
187                 new SampleUtilities.DoneLatch();
188 
189             /**
190              * Casts the message to a TextMessage and displays its text.
191              * A non-text message is interpreted as the end of the message
192              * stream, and the message listener sets its monitor state to all
193              * done processing messages.
194              *
195              * @param message	the incoming message
196              */
onMessage(Message message)197             public void onMessage(Message message) {
198                 if (message instanceof TextMessage) {
199                     TextMessage  msg = (TextMessage) message;
200 
201                     try {
202                         System.out.println("CONSUMER: Processing message: "
203                                            + msg.getText());
204                     } catch (JMSException e) {
205                         System.out.println("Exception in onMessage(): "
206                                            + e.toString());
207                     }
208                 } else {
209                     monitor.allDone();
210                 }
211             }
212         }
213 
214         /**
215          * Runs the thread.
216          */
run()217         public void run() {
218             ConnectionFactory    connectionFactory = null;
219             Connection           connection = null;
220             Session              session = null;
221             Topic                topic = null;
222             TopicSubscriber      topicSubscriber = null;
223             TextListener         topicListener = null;
224 
225             try {
226                 connectionFactory =
227                     SampleUtilities.getConnectionFactory();
228                 connection =
229                     connectionFactory.createConnection();
230                 connection.setClientID("AckEquivExample");
231                 session = connection.createSession(false,
232                     Session.AUTO_ACKNOWLEDGE);
233                 System.out.println("CONSUMER: Created auto-acknowledge session");
234                 topic = SampleUtilities.getTopic(topicName, session);
235             } catch (Exception e) {
236                 System.out.println("Connection problem: " + e.toString());
237                 if (connection != null) {
238                     try {
239                         connection.close();
240                     } catch (JMSException ee) {}
241                 }
242     	        System.exit(1);
243             }
244 
245             /*
246              * Create auto-acknowledge subscriber.
247              * Register message listener (TextListener).
248              * Start message delivery.
249              * Send synchronize message to publisher, then wait till all
250              *   messages have arrived.
251              * Listener displays the messages obtained.
252              */
253             try {
254                 topicSubscriber = session.createDurableSubscriber(topic,
255                      "AckEquivExampleSubscription");
256                 topicListener = new TextListener();
257                 topicSubscriber.setMessageListener(topicListener);
258                 connection.start();
259 
260                 // Let publisher know that subscriber is ready.
261                 try {
262                     SampleUtilities.sendSynchronizeMessage("CONSUMER: ",
263                                                             CONTROL_QUEUE);
264                 } catch (Exception e) {
265                     System.out.println("Queue probably missing: " + e.toString());
266                     if (connection != null) {
267                         try {
268                             connection.close();
269                         } catch (JMSException ee) {}
270                     }
271     	            System.exit(1);
272     	        }
273 
274                 /*
275                  * Asynchronously process messages.
276                  * Block until publisher issues a control message indicating
277                  * end of publish stream.
278                  */
279                 topicListener.monitor.waitTillDone();
280                 topicSubscriber.close();
281                 session.unsubscribe("AckEquivExampleSubscription");
282             } catch (JMSException e) {
283                 System.out.println("Exception occurred: " + e.toString());
284                 exitResult = 1;
285             } finally {
286                 if (connection != null) {
287                     try {
288                         connection.close();
289                     } catch (JMSException e) {
290                         exitResult = 1;
291                     }
292                 }
293             }
294         }
295     }
296 
297     /**
298      * The MultiplePublisher class creates a session in AUTO_ACKNOWLEDGE mode
299      * and publishes three messages to a topic.
300      */
301     public class MultiplePublisher extends Thread {
302 
303         /**
304          * Runs the thread.
305          */
run()306         public void run() {
307             ConnectionFactory    connectionFactory = null;
308             Connection           connection = null;
309             Session              session = null;
310             Topic                topic = null;
311             MessageProducer      topicPublisher = null;
312             TextMessage          message = null;
313             final int            NUMMSGS = 3;
314             final String         MSG_TEXT =
315                                      new String("Here is an auto-acknowledge message");
316 
317             try {
318                 connectionFactory =
319                     SampleUtilities.getConnectionFactory();
320                 connection =
321                     connectionFactory.createConnection();
322                 session = connection.createSession(false,
323                     Session.AUTO_ACKNOWLEDGE);
324                 System.out.println("PRODUCER: Created auto-acknowledge session");
325                 topic = SampleUtilities.getTopic(topicName, session);
326             } catch (Exception e) {
327                 System.out.println("Connection problem: " + e.toString());
328                 if (connection != null) {
329                     try {
330                         connection.close();
331                     } catch (JMSException ee) {}
332                 }
333     	        System.exit(1);
334             }
335 
336             /*
337              * After synchronizing with subscriber, create publisher.
338              * Send 3 messages, varying text slightly.
339              * Send end-of-messages message.
340              */
341             try {
342                 /*
343                  * Synchronize with subscriber.  Wait for message indicating
344                  * that subscriber is ready to receive messages.
345                  */
346                 try {
347                     SampleUtilities.receiveSynchronizeMessages("PRODUCER: ",
348                                                               CONTROL_QUEUE, 1);
349                 } catch (Exception e) {
350                     System.out.println("Queue probably missing: " + e.toString());
351                     if (connection != null) {
352                         try {
353                             connection.close();
354                         } catch (JMSException ee) {}
355                     }
356     	            System.exit(1);
357     	        }
358 
359                 topicPublisher = session.createProducer(topic);
360                 message = session.createTextMessage();
361                 for (int i = 0; i < NUMMSGS; i++) {
362                     message.setText(MSG_TEXT + " " + (i + 1));
363                     System.out.println("PRODUCER: Publishing message: "
364                         + message.getText());
365                     topicPublisher.send(message);
366                 }
367 
368                 // Send a non-text control message indicating end of messages.
369                 topicPublisher.send(session.createMessage());
370             } catch (JMSException e) {
371                 System.out.println("Exception occurred: " + e.toString());
372                 exitResult = 1;
373             } finally {
374                 if (connection != null) {
375                     try {
376                         connection.close();
377                     } catch (JMSException e) {
378                         exitResult = 1;
379                     }
380                 }
381             }
382         }
383     }
384 
385     /**
386      * Instantiates the producer, consumer, subscriber, and publisher classes and
387      * starts their threads.
388      * Calls the join method to wait for the threads to die.
389      */
run_threads()390     public void run_threads() {
391         SynchProducer        synchProducer = new SynchProducer();
392         SynchConsumer        synchConsumer = new SynchConsumer();
393         AsynchSubscriber     asynchSubscriber = new AsynchSubscriber();
394         MultiplePublisher    multiplePublisher = new MultiplePublisher();
395 
396         synchProducer.start();
397         synchConsumer.start();
398         try {
399             synchProducer.join();
400             synchConsumer.join();
401         } catch (InterruptedException e) {}
402 
403         asynchSubscriber.start();
404         multiplePublisher.start();
405         try {
406             asynchSubscriber.join();
407             multiplePublisher.join();
408         } catch (InterruptedException e) {}
409     }
410 
411     /**
412      * Reads the queue and topic names from the command line, then calls the
413      * run_threads method to execute the program threads.
414      *
415      * @param args	the topic used by the example
416      */
main(String[] args)417     public static void main(String[] args) {
418         AckEquivExample  aee = new AckEquivExample();
419 
420         if (args.length != 2) {
421     	    System.out.println("Usage: java AckEquivExample <queue_name> <topic_name>");
422     	    System.exit(1);
423     	}
424         aee.queueName = new String(args[0]);
425         aee.topicName = new String(args[1]);
426         System.out.println("Queue name is " + aee.queueName);
427         System.out.println("Topic name is " + aee.topicName);
428 
429     	aee.run_threads();
430     	SampleUtilities.exit(aee.exitResult);
431     }
432 }
433