1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
3  *
4  * Copyright (c) 2000-2017 Oracle and/or its affiliates. All rights reserved.
5  *
6  * The contents of this file are subject to the terms of either the GNU
7  * General Public License Version 2 only ("GPL") or the Common Development
8  * and Distribution License("CDDL") (collectively, the "License").  You
9  * may not use this file except in compliance with the License.  You can
10  * obtain a copy of the License at
11  * https://oss.oracle.com/licenses/CDDL+GPL-1.1
12  * or LICENSE.txt.  See the License for the specific
13  * language governing permissions and limitations under the License.
14  *
15  * When distributing the software, include this License Header Notice in each
16  * file and include the License file at LICENSE.txt.
17  *
18  * GPL Classpath Exception:
19  * Oracle designates this particular file as subject to the "Classpath"
20  * exception as provided by Oracle in the GPL Version 2 section of the License
21  * file that accompanied this code.
22  *
23  * Modifications:
24  * If applicable, add the following below the License Header, with the fields
25  * enclosed by brackets [] replaced by your own identifying information:
26  * "Portions Copyright [year] [name of copyright owner]"
27  *
28  * Contributor(s):
29  * If you wish your version of this file to be governed by only the CDDL or
30  * only the GPL Version 2, indicate your decision by adding "[Contributor]
31  * elects to include this software in this distribution under the [CDDL or GPL
32  * Version 2] license."  If you don't indicate a single choice of license, a
33  * recipient has the option to distribute your version of this file under
34  * either the CDDL, the GPL Version 2 or to extend the choice of license to
35  * its licensees as provided above.  However, if you add GPL Version 2 code
36  * and therefore, elected the GPL Version 2 license, then the option applies
37  * only if the new code is made subject to such option by the copyright
38  * holder.
39  */
40 
41 import javax.jms.*;
42 
43 /**
44  * The SynchTopicExample class demonstrates the simplest form of the
45  * publish/subscribe model: the producer publishes a message, and the
46  * consumer reads it using a synchronous receive.
47  * <p>
48  * The program contains a SimpleProducer class, a SynchConsumer class, a
49  * main method, and a method that runs the consumer and producer
50  * threads.
51  * <p>
52  * Specify a topic name on the command line when you run the program.
53  * <p>
54  * The program calls methods in the SampleUtilities class.
55  */
56 public class SynchTopicExample {
57     String  topicName = null;
58     int     exitResult = 0;
59 
60     /**
61      * The SynchConsumer class fetches a single message from a topic using
62      * synchronous message delivery.
63      */
64     public class SynchConsumer extends Thread {
65 
66         /**
67          * Runs the thread.
68          */
run()69         public void run() {
70             ConnectionFactory    connectionFactory = null;
71             Connection           connection = null;
72             Session              session = null;
73             Topic                topic = null;
74             MessageConsumer      msgConsumer = null;
75             final boolean        NOLOCAL = true;
76             TextMessage          inMessage = null;
77             TextMessage          outMessage = null;
78             MessageProducer      msgProducer = null;
79 
80             /*
81              * Obtain connection factory.
82              * Create connection.
83              * Create session from connection; false means session is not
84              * transacted.
85              * Obtain topic name.
86              */
87             try {
88                 connectionFactory =
89                     SampleUtilities.getConnectionFactory();
90                 connection =
91                     connectionFactory.createConnection();
92                 session = connection.createSession(false,
93                     Session.AUTO_ACKNOWLEDGE);
94                 topic = SampleUtilities.getTopic(topicName, session);
95             } catch (Exception e) {
96                 System.out.println("Connection problem: " + e.toString());
97                 if (connection != null) {
98                     try {
99                         connection.close();
100                     } catch (JMSException ee) {}
101                 }
102     	        System.exit(1);
103             }
104 
105             /*
106              * Create consumer, then start message delivery.  Consumer is
107              * non-local so that it won't receive the message we publish.
108              * Wait for text message to arrive, then display its contents.
109              * Close connection and exit.
110              */
111             try {
112                 msgConsumer =
113                     session.createConsumer(topic, null, NOLOCAL);
114                 connection.start();
115 
116                 inMessage = (TextMessage) msgConsumer.receive();
117                 System.out.println("CONSUMER THREAD: Reading message: "
118                                    + inMessage.getText());
119 
120                 /*
121                  * Notify producer that we received a message and it
122                  * can stop broadcasting.
123                  */
124                 msgProducer = session.createProducer(topic);
125                 outMessage = session.createTextMessage();
126                 outMessage.setText("Done");
127                 msgProducer.send(outMessage);
128             } catch (JMSException e) {
129                 System.out.println("Exception occurred: " + e.toString());
130                 exitResult = 1;
131             } finally {
132                 if (connection != null) {
133                     try {
134                         connection.close();
135                     } catch (JMSException e) {
136                         exitResult = 1;
137                     }
138                 }
139             }
140         }
141     }
142 
143     /**
144      * The SimpleProducer class publishes a single message to a topic.
145      */
146     public class SimpleProducer extends Thread {
147 
148         /**
149          * Runs the thread.
150          */
run()151         public void run() {
152             ConnectionFactory    connectionFactory = null;
153             Connection           connection = null;
154             Session              session = null;
155             Topic                topic = null;
156             MessageConsumer      producerControlConsumer = null;
157             final boolean        NOLOCAL = true;
158             MessageProducer      msgProducer =  null;
159             TextMessage          sentMessage = null;
160             final String         MSG_TEXT = new String("Here is a message ");
161             Message              receivedMessage = null;
162 
163             /*
164              * Obtain connection factory.
165              * Create connection.
166              * Create session from connection; false means session is not
167              * transacted.
168              * Obtain topic name.
169              */
170             try {
171                 connectionFactory =
172                     SampleUtilities.getConnectionFactory();
173                 connection =
174                     connectionFactory.createConnection();
175                 session = connection.createSession(false,
176                     Session.AUTO_ACKNOWLEDGE);
177                 topic = SampleUtilities.getTopic(topicName, session);
178             } catch (Exception e) {
179                 System.out.println("Connection problem: " + e.toString());
180                 if (connection != null) {
181                     try {
182                         connection.close();
183                     } catch (JMSException ee) {}
184                 }
185     	        System.exit(1);
186             }
187 
188             /*
189              * Create non-local consumer to receive "Done" message from
190              * another connection; start delivery.
191              * Create producer and text message.
192              * Set message text, display it, and publish message.
193              * Close connection and exit.
194              */
195             try {
196                 producerControlConsumer =
197                     session.createConsumer(topic, null, NOLOCAL);
198                 connection.start();
199 
200                 /*
201                  * Publish a message once per second until consumer
202                  * reports that it has finished receiving messages.
203                  */
204                 msgProducer = session.createProducer(topic);
205                 sentMessage = session.createTextMessage();
206                 for (int i = 1; receivedMessage == null; i++) {
207                     sentMessage.setText(MSG_TEXT + i);
208                     System.out.println("PRODUCER THREAD: Publishing message: "
209                                        + sentMessage.getText());
210                     msgProducer.send(sentMessage);
211                     try { Thread.sleep(1000); } catch (InterruptedException ie){}
212                     receivedMessage = producerControlConsumer.receiveNoWait();
213                 }
214             } catch (JMSException e) {
215                 System.out.println("Exception occurred: " + e.toString());
216                 exitResult = 1;
217             } finally {
218                 if (connection != null) {
219                     try {
220                         connection.close();
221                     } catch (JMSException e) {
222                         exitResult = 1;
223                     }
224                 }
225             }
226         }
227     }
228 
229     /**
230      * Instantiates the consumer and producer classes and starts their
231      * threads.
232      * Calls the join method to wait for the threads to die.
233      * <p>
234      * It is essential to start the consumer before starting the producer.
235      * In the publish/subscribe model, a consumer can ordinarily receive only
236      * messages published while it is active.
237      */
run_threads()238     public void run_threads() {
239         SynchConsumer  synchConsumer = new SynchConsumer();
240         SimpleProducer  simpleProducer = new SimpleProducer();
241 
242         synchConsumer.start();
243         simpleProducer.start();
244         try {
245             synchConsumer.join();
246             simpleProducer.join();
247         } catch (InterruptedException e) {}
248     }
249 
250     /**
251      * Reads the topic name from the command line and displays it.  The
252      * topic must have been created by the jmsadmin tool.
253      * Calls the run_threads method to execute the program threads.
254      * Exits program.
255      *
256      * @param args	the topic used by the example
257      */
main(String[] args)258     public static void main(String[] args) {
259         SynchTopicExample  ste = new SynchTopicExample();
260 
261         if (args.length != 1) {
262     	    System.out.println("Usage: java SynchTopicExample <topic_name>");
263     	    System.exit(1);
264     	}
265         ste.topicName = new String(args[0]);
266         System.out.println("Topic name is " + ste.topicName);
267 
268     	ste.run_threads();
269     	SampleUtilities.exit(ste.exitResult);
270     }
271 }
272