1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. 3 * 4 * Copyright (c) 2000-2017 Oracle and/or its affiliates. All rights reserved. 5 * 6 * The contents of this file are subject to the terms of either the GNU 7 * General Public License Version 2 only ("GPL") or the Common Development 8 * and Distribution License("CDDL") (collectively, the "License"). You 9 * may not use this file except in compliance with the License. You can 10 * obtain a copy of the License at 11 * https://oss.oracle.com/licenses/CDDL+GPL-1.1 12 * or LICENSE.txt. See the License for the specific 13 * language governing permissions and limitations under the License. 14 * 15 * When distributing the software, include this License Header Notice in each 16 * file and include the License file at LICENSE.txt. 17 * 18 * GPL Classpath Exception: 19 * Oracle designates this particular file as subject to the "Classpath" 20 * exception as provided by Oracle in the GPL Version 2 section of the License 21 * file that accompanied this code. 22 * 23 * Modifications: 24 * If applicable, add the following below the License Header, with the fields 25 * enclosed by brackets [] replaced by your own identifying information: 26 * "Portions Copyright [year] [name of copyright owner]" 27 * 28 * Contributor(s): 29 * If you wish your version of this file to be governed by only the CDDL or 30 * only the GPL Version 2, indicate your decision by adding "[Contributor] 31 * elects to include this software in this distribution under the [CDDL or GPL 32 * Version 2] license." If you don't indicate a single choice of license, a 33 * recipient has the option to distribute your version of this file under 34 * either the CDDL, the GPL Version 2 or to extend the choice of license to 35 * its licensees as provided above. However, if you add GPL Version 2 code 36 * and therefore, elected the GPL Version 2 license, then the option applies 37 * only if the new code is made subject to such option by the copyright 38 * holder. 39 */ 40 41 import javax.jms.*; 42 43 /** 44 * The SynchTopicExample class demonstrates the simplest form of the 45 * publish/subscribe model: the producer publishes a message, and the 46 * consumer reads it using a synchronous receive. 47 * <p> 48 * The program contains a SimpleProducer class, a SynchConsumer class, a 49 * main method, and a method that runs the consumer and producer 50 * threads. 51 * <p> 52 * Specify a topic name on the command line when you run the program. 53 * <p> 54 * The program calls methods in the SampleUtilities class. 55 */ 56 public class SynchTopicExample { 57 String topicName = null; 58 int exitResult = 0; 59 60 /** 61 * The SynchConsumer class fetches a single message from a topic using 62 * synchronous message delivery. 63 */ 64 public class SynchConsumer extends Thread { 65 66 /** 67 * Runs the thread. 68 */ run()69 public void run() { 70 ConnectionFactory connectionFactory = null; 71 Connection connection = null; 72 Session session = null; 73 Topic topic = null; 74 MessageConsumer msgConsumer = null; 75 final boolean NOLOCAL = true; 76 TextMessage inMessage = null; 77 TextMessage outMessage = null; 78 MessageProducer msgProducer = null; 79 80 /* 81 * Obtain connection factory. 82 * Create connection. 83 * Create session from connection; false means session is not 84 * transacted. 85 * Obtain topic name. 86 */ 87 try { 88 connectionFactory = 89 SampleUtilities.getConnectionFactory(); 90 connection = 91 connectionFactory.createConnection(); 92 session = connection.createSession(false, 93 Session.AUTO_ACKNOWLEDGE); 94 topic = SampleUtilities.getTopic(topicName, session); 95 } catch (Exception e) { 96 System.out.println("Connection problem: " + e.toString()); 97 if (connection != null) { 98 try { 99 connection.close(); 100 } catch (JMSException ee) {} 101 } 102 System.exit(1); 103 } 104 105 /* 106 * Create consumer, then start message delivery. Consumer is 107 * non-local so that it won't receive the message we publish. 108 * Wait for text message to arrive, then display its contents. 109 * Close connection and exit. 110 */ 111 try { 112 msgConsumer = 113 session.createConsumer(topic, null, NOLOCAL); 114 connection.start(); 115 116 inMessage = (TextMessage) msgConsumer.receive(); 117 System.out.println("CONSUMER THREAD: Reading message: " 118 + inMessage.getText()); 119 120 /* 121 * Notify producer that we received a message and it 122 * can stop broadcasting. 123 */ 124 msgProducer = session.createProducer(topic); 125 outMessage = session.createTextMessage(); 126 outMessage.setText("Done"); 127 msgProducer.send(outMessage); 128 } catch (JMSException e) { 129 System.out.println("Exception occurred: " + e.toString()); 130 exitResult = 1; 131 } finally { 132 if (connection != null) { 133 try { 134 connection.close(); 135 } catch (JMSException e) { 136 exitResult = 1; 137 } 138 } 139 } 140 } 141 } 142 143 /** 144 * The SimpleProducer class publishes a single message to a topic. 145 */ 146 public class SimpleProducer extends Thread { 147 148 /** 149 * Runs the thread. 150 */ run()151 public void run() { 152 ConnectionFactory connectionFactory = null; 153 Connection connection = null; 154 Session session = null; 155 Topic topic = null; 156 MessageConsumer producerControlConsumer = null; 157 final boolean NOLOCAL = true; 158 MessageProducer msgProducer = null; 159 TextMessage sentMessage = null; 160 final String MSG_TEXT = new String("Here is a message "); 161 Message receivedMessage = null; 162 163 /* 164 * Obtain connection factory. 165 * Create connection. 166 * Create session from connection; false means session is not 167 * transacted. 168 * Obtain topic name. 169 */ 170 try { 171 connectionFactory = 172 SampleUtilities.getConnectionFactory(); 173 connection = 174 connectionFactory.createConnection(); 175 session = connection.createSession(false, 176 Session.AUTO_ACKNOWLEDGE); 177 topic = SampleUtilities.getTopic(topicName, session); 178 } catch (Exception e) { 179 System.out.println("Connection problem: " + e.toString()); 180 if (connection != null) { 181 try { 182 connection.close(); 183 } catch (JMSException ee) {} 184 } 185 System.exit(1); 186 } 187 188 /* 189 * Create non-local consumer to receive "Done" message from 190 * another connection; start delivery. 191 * Create producer and text message. 192 * Set message text, display it, and publish message. 193 * Close connection and exit. 194 */ 195 try { 196 producerControlConsumer = 197 session.createConsumer(topic, null, NOLOCAL); 198 connection.start(); 199 200 /* 201 * Publish a message once per second until consumer 202 * reports that it has finished receiving messages. 203 */ 204 msgProducer = session.createProducer(topic); 205 sentMessage = session.createTextMessage(); 206 for (int i = 1; receivedMessage == null; i++) { 207 sentMessage.setText(MSG_TEXT + i); 208 System.out.println("PRODUCER THREAD: Publishing message: " 209 + sentMessage.getText()); 210 msgProducer.send(sentMessage); 211 try { Thread.sleep(1000); } catch (InterruptedException ie){} 212 receivedMessage = producerControlConsumer.receiveNoWait(); 213 } 214 } catch (JMSException e) { 215 System.out.println("Exception occurred: " + e.toString()); 216 exitResult = 1; 217 } finally { 218 if (connection != null) { 219 try { 220 connection.close(); 221 } catch (JMSException e) { 222 exitResult = 1; 223 } 224 } 225 } 226 } 227 } 228 229 /** 230 * Instantiates the consumer and producer classes and starts their 231 * threads. 232 * Calls the join method to wait for the threads to die. 233 * <p> 234 * It is essential to start the consumer before starting the producer. 235 * In the publish/subscribe model, a consumer can ordinarily receive only 236 * messages published while it is active. 237 */ run_threads()238 public void run_threads() { 239 SynchConsumer synchConsumer = new SynchConsumer(); 240 SimpleProducer simpleProducer = new SimpleProducer(); 241 242 synchConsumer.start(); 243 simpleProducer.start(); 244 try { 245 synchConsumer.join(); 246 simpleProducer.join(); 247 } catch (InterruptedException e) {} 248 } 249 250 /** 251 * Reads the topic name from the command line and displays it. The 252 * topic must have been created by the jmsadmin tool. 253 * Calls the run_threads method to execute the program threads. 254 * Exits program. 255 * 256 * @param args the topic used by the example 257 */ main(String[] args)258 public static void main(String[] args) { 259 SynchTopicExample ste = new SynchTopicExample(); 260 261 if (args.length != 1) { 262 System.out.println("Usage: java SynchTopicExample <topic_name>"); 263 System.exit(1); 264 } 265 ste.topicName = new String(args[0]); 266 System.out.println("Topic name is " + ste.topicName); 267 268 ste.run_threads(); 269 SampleUtilities.exit(ste.exitResult); 270 } 271 } 272