1 /*
2  * The Spread Toolkit.
3  *
4  * The contents of this file are subject to the Spread Open-Source
5  * License, Version 1.0 (the ``License''); you may not use
6  * this file except in compliance with the License.  You may obtain a
7  * copy of the License at:
8  *
9  * http://www.spread.org/license/
10  *
11  * or in the file ``license.txt'' found in this distribution.
12  *
13  * Software distributed under the License is distributed on an AS IS basis,
14  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
15  * for the specific language governing rights and limitations under the
16  * License.
17  *
18  * The Creators of Spread are:
19  *  Yair Amir, Michal Miskin-Amir, Jonathan Stanton.
20  *
21  *  Copyright (C) 1993-2004 Spread Concepts LLC <spread@spreadconcepts.com>
22  *
23  *  All Rights Reserved.
24  *
25  * Major Contributor(s):
26  * ---------------
27  *    Cristina Nita-Rotaru crisn@cs.purdue.edu - group communication security.
28  *    Theo Schlossnagle    jesus@omniti.com - Perl, skiplists, autoconf.
29  *    Dan Schoenblum       dansch@cnds.jhu.edu - Java interface.
30  *    John Schultz         jschultz@cnds.jhu.edu - contribution to process group membership.
31  *
32  */
33 
34 
35 
36 package spread;
37 
38 import java.net.*;
39 import java.io.*;
40 import java.util.*;
41 
42 /**
43  * A SpreadConnection object is used to establish a connection to a spread daemon.
44  * To connect to a spread daemon, first create a new SpreadConnection object, and then
45  * call {@link SpreadConnection#connect(InetAddress, int, String, boolean, boolean)}:
46  * <p><blockquote><pre>
47  * SpreadConnection connection = new SpreadConnection();
48  * connection.connect(null, 0, "name", false, false);
49  * </pre></blockquote><p>
50  * The only methods that can be called before
51  * {@link SpreadConnection#connect(InetAddress, int, String, boolean, boolean)} are the add
52  * ({@link SpreadConnection#add(BasicMessageListener)}, {@link SpreadConnection#add(AdvancedMessageListener)})
53  * and remove ({@link SpreadConnection#remove(BasicMessageListener)}, {@link SpreadConnection#remove(AdvancedMessageListener)})
54  * methods.  If any other methods are called, a SpreadException is thrown, except for
55  * {@link SpreadConnection#getPrivateGroup()}, which returns null.
56  * <p>
57  * To disconnect from the daemon, call {@link SpreadConnection#disconnect()}:
58  * <p><blockquote><pre>
59  * connection.disconnect();
60  * </pre></blockquote><p>
61  * To send a message on this connection, call {@link SpreadConnection#multicast(SpreadMessage)}:
62  * <p><blockquote><pre>
63  * connection.multicast(message);
64  * </pre></blockquote><p>
65  * To receive a message sent to this connection, call {@link SpreadConnection#receive()}:
66  * <p><blockquote><pre>
67  * SpreadMessage message = connection.receive();
68  * </pre></blockquote><p>
69  */
70 public class SpreadConnection
71 {
72 	// The default Spread port.
73 	///////////////////////////
74 	private static final int DEFAULT_SPREAD_PORT = 4803;
75 
76 	// The maximum length of the private name.
77 	//////////////////////////////////////////
78 	private static final int MAX_PRIVATE_NAME = 10;
79 
80 	// The maximum length of a message + group names.
81 	//////////////////////////////////////////
82 	private static final int MAX_MESSAGE_LENGTH = 140000;
83 
84 	// The maximum length of the group name.
85 	////////////////////////////////////////
86 	protected static final int MAX_GROUP_NAME = 32;
87 
88 	// The Spread version.
89 	//////////////////////
90         private static final int SP_MAJOR_VERSION = 3;
91 	private static final int SP_MINOR_VERSION = 17;
92 	private static final int SP_PATCH_VERSION = 4;
93 
94         // The default authentication method
95         ////////////////////////////////////
96 	private static final String DEFAULT_AUTH_NAME = "NULL";
97 
98         // The class name of the default authentication method
99         //////////////////////////////////////////////////////
100 	private static final String DEFAULT_AUTHCLASS_NAME = "spread.NULLAuth";
101 
102 	// The maximum length of a authentication method name
103 	/////////////////////////////////////////////////////
104         private static final int MAX_AUTH_NAME = 30;
105 
106 	// The maximum number of authentication methods
107 	///////////////////////////////////////////////
108         private static final int MAX_AUTH_METHODS = 3;
109 
110 	// Received if a connection attempt was successful.
111 	///////////////////////////////////////////////////
112 	private static final int ACCEPT_SESSION = 1;
113 
114 	// Used to determine endianness.
115 	////////////////////////////////
116 	private static final int ENDIAN_TYPE = 0x80000080;
117 
118 	// Only true if this connection is connected.
119 	/////////////////////////////////////////////
120 	private boolean connected;
121 
122 	// Reading synchro.
123 	///////////////////
124 	private Boolean rsynchro;
125 	// Writing synchro.
126 	///////////////////
127 	private Boolean wsynchro;
128 	// Listener list synchro.
129 	///////////////////
130 	private Boolean listenersynchro;
131 
132 
133 	// True if calling listeners.
134 	/////////////////////////////
135 	private boolean callingListeners;
136 
137 	// The thread feeding the listeners.
138 	////////////////////////////////////
139 	private Listener listener;
140 
141 	// Basic listeners.
142 	///////////////////
143 	protected Vector basicListeners;
144 
145 	// Advanced listeners.
146 	//////////////////////
147 	protected Vector advancedListeners;
148 
149 	// The daemon's address.
150 	////////////////////////
151 	private InetAddress address;
152 
153 	// The daemon's port.
154 	/////////////////////
155 	private int port;
156 
157 	// Is this a priority connection?
158 	/////////////////////////////////
159 	private boolean priority;
160 
161 	// Getting group membership messages?
162 	/////////////////////////////////////
163 	private boolean groupMembership;
164 
165 	// Name of active choosen Authentication method
166 	///////////////////////////////////////////////
167 	private String authName;
168 
169 	// Name of class for active choosen Authentication method
170 	/////////////////////////////////////////////////////////
171 	private String authClassName;
172 
173         // Object reference to current authentication class
174         ///////////////////////////////////////////////////
175         private Object authObj;
176 
177 	// Method reference to authenticate method
178 	//////////////////////////////////////////
179 	private java.lang.reflect.Method authMethodAuthenticate;
180 
181 	// The socket this connection is using.
182 	///////////////////////////////////////
183 	private Socket socket;
184 
185 	// The socket's input stream.
186 	/////////////////////////////
187 	private InputStream socketInput;
188 
189 	// The socket's output stream.
190 	//////////////////////////////
191 	private OutputStream socketOutput;
192 
193 	// The private group.
194 	/////////////////////
195 	private SpreadGroup group;
196 
197 	// Commands buffered during listener callbacks.
198 	// The buffer is a list of BUFFER_* constants.
199 	// For commands with an argument (all except
200 	// for BUFFER_DISCONNECT), the argument follows in the Vector.
201 	//////////////////////////////////////////////////////////////
202 	private Vector listenerBuffer;
203 
204 	// Listener buffer commands.
205 	// These are Object's because they need to be added to a Vector.
206 	////////////////////////////////////////////////////////////////
207 	private static final Object BUFFER_DISCONNECT = new Object();
208 	private static final Object BUFFER_ADD_BASIC = new Object();
209 	private static final Object BUFFER_ADD_ADVANCED = new Object();
210 	private static final Object BUFFER_REMOVE_BASIC = new Object();
211 	private static final Object BUFFER_REMOVE_ADVANCED = new Object();
212 
213 	// Checks if the int is the same-endian type as the local machine.
214 	//////////////////////////////////////////////////////////////////
sameEndian(int i)215 	private static boolean sameEndian(int i)
216 	{
217 		return ((i & ENDIAN_TYPE) == 0);
218 	}
219 
220 	// Clears the int's endian type.
221 	////////////////////////////////
clearEndian(int i)222 	private static int clearEndian(int i)
223 	{
224 		return (i & ~ENDIAN_TYPE);
225 	}
226 
227 	// Endian-flips the int.
228 	////////////////////////
flip(int i)229 	protected static int flip(int i)
230 	{
231 		return (((i >> 24) & 0x000000ff) | ((i >>  8) & 0x0000ff00) | ((i <<  8) & 0x00ff0000) | ((i << 24) & 0xff000000));
232 	}
233 
234 	// Endian-flips the short.
235 	//////////////////////////
flip(short s)236 	private static short flip(short s)
237 	{
238 		return ((short)(((s >> 8) & 0x00ff) | ((s << 8 ) & 0xff00)));
239 	}
240 
241 	// Puts a group name into an array of bytes.
242 	////////////////////////////////////////////
toBytes(SpreadGroup group, byte buffer[], int bufferIndex)243 	private static void toBytes(SpreadGroup group, byte buffer[], int bufferIndex)
244 	{
245 		// Get the group's name.
246 		////////////////////////
247 		byte name[];
248 		try
249 		{
250 			name = group.toString().getBytes("ISO8859_1");
251 		}
252 		catch(UnsupportedEncodingException e)
253 		{
254 			// Already checked for this exception in connect.
255 			/////////////////////////////////////////////////
256 			name = new byte[0];
257 		}
258 
259 		// Put a cap on the length.
260 		///////////////////////////
261 		int len = name.length;
262 		if(len > MAX_GROUP_NAME)
263 			len = MAX_GROUP_NAME;
264 
265 		// Copy the name into the buffer.
266 		/////////////////////////////////
267 		System.arraycopy(name, 0, buffer, bufferIndex, len);
268 		for( ; len < MAX_GROUP_NAME ; len++ )
269 			buffer[bufferIndex + len] = 0;
270 	}
271 
272 	// Puts an int into an array of bytes.
273 	//////////////////////////////////////
toBytes(int i, byte buffer[], int bufferIndex)274 	private static void toBytes(int i, byte buffer[], int bufferIndex)
275 	{
276 		buffer[bufferIndex++] = (byte)((i >> 24) & 0xFF);
277 		buffer[bufferIndex++] = (byte)((i >> 16) & 0xFF);
278 		buffer[bufferIndex++] = (byte)((i >> 8 ) & 0xFF);
279 		buffer[bufferIndex++] = (byte)((i      ) & 0xFF);
280 	}
281 
282 	// Gets an int from an array of bytes.
283 	//////////////////////////////////////
toInt(byte buffer[], int bufferIndex)284 	protected static int toInt(byte buffer[], int bufferIndex)
285 	{
286 		int i0 = (buffer[bufferIndex++] & 0xFF);
287 		int i1 = (buffer[bufferIndex++] & 0xFF);
288 		int i2 = (buffer[bufferIndex++] & 0xFF);
289 		int i3 = (buffer[bufferIndex++] & 0xFF);
290 
291 		return ((i0 << 24) | (i1 << 16) | (i2 << 8) | (i3));
292 	}
293 
294     // Reads from inputsocket until all bytes read or exception raised
295     //////////////////////////////////////////////////////////////////
readBytesFromSocket(byte buffer[], String bufferTypeString)296     private void readBytesFromSocket(byte buffer[], String bufferTypeString) throws SpreadException
297     {
298 	int byteIndex;
299 	int rcode;
300 	try
301 	{
302 	    for(byteIndex = 0 ; byteIndex < buffer.length ; byteIndex += rcode)
303 	    {
304 		rcode = socketInput.read(buffer, byteIndex, buffer.length - byteIndex);
305 		if(rcode == -1)
306 		{
307 		    throw new SpreadException("Connection closed while reading " + bufferTypeString);
308 		}
309 	    }
310 	}
311 	catch(InterruptedIOException e) {
312 	    throw new SpreadException("readBytesFromSocket(): InterruptedIOException " + e);
313 	}
314 	catch(IOException e)
315 	{
316 	    throw new SpreadException("readBytesFromSocket(): read() " + e);
317 	}
318 
319     }
320 
321 
322 	// Gets a string from an array of bytes.
323 	////////////////////////////////////////
toGroup(byte buffer[], int bufferIndex)324 	protected SpreadGroup toGroup(byte buffer[], int bufferIndex)
325 	{
326 		try
327 		{
328 			for(int end = bufferIndex ; end < buffer.length ; end++)
329 			{
330 				if(buffer[end] == 0)
331 				{
332 					// Get the group name.
333 					//////////////////////
334 					String name = new String(buffer, bufferIndex, end - bufferIndex, "ISO8859_1");
335 
336 					// Return the group.
337 					////////////////////
338 					return new SpreadGroup(this, name);
339 				}
340 			}
341 		}
342 		catch(UnsupportedEncodingException e)
343 		{
344 			// Already checked for this exception in connect.
345 			/////////////////////////////////////////////////
346 		}
347 
348 		return null;
349 	}
350 
351 	// Set the send and receive buffer sizes.
352 	/////////////////////////////////////////
setBufferSizes()353 	private void setBufferSizes() throws SpreadException
354 	{
355 /* NOT SUPPORTED IN 1.1
356 		try
357 		{
358 			for(int i = 10 ; i <= 200 ; i += 5)
359 			{
360 				// The size to try.
361 				///////////////////
362 				int size = (1024 * i);
363 
364 				// Set the buffer sizes.
365 				////////////////////////
366 				socket.setSendBufferSize(size);
367 				socket.setReceiveBufferSize(size);
368 
369 				// Check the actual sizes.  If smaller, then the max was hit.
370 				/////////////////////////////////////////////////////////////
371 				if((socket.getSendBufferSize() < size) || (socket.getReceiveBufferSize() < size))
372 				{
373 					break;
374 				}
375 			}
376 		}
377 		catch(SocketException e)
378 		{
379 			throw new SpreadException("set/getSend/ReceiveBufferSize(): " + e);
380 		}
381 NOT SUPPORTED IN 1.1	*/
382 	}
383 
384 	// Sends the initial connect message.
385 	/////////////////////////////////////
sendConnect(String privateName)386 	private void sendConnect(String privateName) throws SpreadException
387 	{
388 		// Check the private name for validity.
389 		///////////////////////////////////////
390 		int len = (privateName == null ? 0 : privateName.length());
391 		if(len > MAX_PRIVATE_NAME)
392 		{
393 			privateName = privateName.substring(0, MAX_PRIVATE_NAME);
394 			len = MAX_PRIVATE_NAME;
395 		}
396 
397 		// Allocate the buffer.
398 		///////////////////////
399 		byte buffer[] = new byte[len + 5];
400 
401 		// Set the version.
402 		///////////////////
403 		buffer[0] = (byte)SP_MAJOR_VERSION;
404 		buffer[1] = (byte)SP_MINOR_VERSION;
405 		buffer[2] = (byte)SP_PATCH_VERSION;
406 
407 		// Byte used for group membership and priority.
408 		///////////////////////////////////////////////
409 		buffer[3] = 0;
410 
411 		// Group membership.
412 		////////////////////
413 		if(groupMembership)
414 		{
415 			buffer[3] |= 0x01;
416 		}
417 
418 		// Priority.
419 		////////////
420 		if(priority)
421 		{
422 			buffer[3] |= 0x10;
423 		}
424 
425 		// Write the length.
426 		////////////////////
427 		buffer[4] = (byte)len;
428 
429 		if(len > 0)
430 		{
431 			// Write the private name.
432 			//////////////////////////
433 			byte nameBytes[] = privateName.getBytes();
434 			for(int src = 0, dest = 5 ; src < len ; src++, dest++)
435 			{
436 				buffer[dest] = nameBytes[src];
437 			}
438 		}
439 
440 		// Send the connection message.
441 		///////////////////////////////
442 		try
443 		{
444 			socketOutput.write(buffer);
445 		}
446 		catch(IOException e)
447 		{
448 			throw new SpreadException("write(): " + e);
449 		}
450 	}
451 
452         // read the Auth List
453         /////////////////////
readAuthMethods()454     	private void readAuthMethods() throws SpreadException
455 	{
456 		// Read the length.
457 		///////////////////
458 		int len;
459 		try
460 		{
461 			len = socketInput.read();
462 		}
463 		catch(IOException e)
464 		{
465 			throw new SpreadException("read(): " + e);
466 		}
467 
468 		// System.out.println("readAuthMethods: len is " + len);
469 		// Check for no more data.
470 		//////////////////////////
471 		if(len == -1)
472 		{
473 			throw new SpreadException("Connection closed during connect attempt to read authlen");
474 		}
475 		// Check if it was a response code
476 		//////////////////////////////////
477 		if( len >= 128 )
478 		{
479 			throw new SpreadException("Connection attempt rejected=" + (0xffffff00 | len));
480 		}
481 
482 		// Read the name.
483 		/////////////////
484 		byte buffer[] = new byte[len];
485 		readBytesFromSocket(buffer, "authname");
486 		// System.out.println("readAuthMethods: string is " + new String(buffer));
487 
488 		//		if(numRead != len)
489 		//{
490 		//	throw new SpreadException("Connection closed during connect attempt to read authnames");
491 		//}
492 
493 		// for now we ignore the list.
494 		//////////////////////////////
495 	}
496 
497         // Sends the choice of auth methods  message.
498 	/////////////////////////////////////
sendAuthMethod()499 	private void sendAuthMethod() throws SpreadException
500 	{
501 		int len = authName.length();
502 		// Allocate the buffer.
503 		///////////////////////
504 		byte buffer[] = new byte[MAX_AUTH_NAME*MAX_AUTH_METHODS];
505 
506 		try
507 		{
508 			System.arraycopy(authName.getBytes("ISO8859_1"), 0, buffer, 0, len);
509 		}
510 		catch(UnsupportedEncodingException e)
511 		{
512 			// Already checked for this exception in connect.
513 			/////////////////////////////////////////////////
514 		}
515 		for( ; len < (MAX_AUTH_NAME*MAX_AUTH_METHODS) ; len++ )
516 			buffer[len] = 0;
517 
518 		// Send the connection message.
519 		///////////////////////////////
520 		try
521 		{
522 			socketOutput.write(buffer);
523 		}
524 		catch(IOException e)
525 		{
526 			throw new SpreadException("write(): " + e);
527 		}
528 	}
529 
        // Loads the authentication class, creates an instance, and looks up its authenticate() method.
531 	/////////////////////////////////////
instantiateAuthMethod()532 	private void instantiateAuthMethod() throws SpreadException
533 	{
534 		Class authclass;
535 
536 		//		System.out.println("Authname is " + authName);
537 		//		System.out.println("class name is " + authClassName);
538 		try {
539 			authclass = Class.forName(authClassName);
540 		} catch (ClassNotFoundException e) {
541 			throw new SpreadException("class " + authClassName + " not found.\n");
542 		}
543 
544 		try {
545 			authObj = authclass.newInstance();
546 		} catch (Exception e) {
547 			throw new SpreadException("class " + authClassName + " error getting instance.\n" + e);
548 		}
549 		try {
550 		    authMethodAuthenticate = authclass.getMethod("authenticate", new Class[] { });
551 		} catch (NoSuchMethodException e) {
552 		    System.out.println("Failed to find auth method authenticate()");
553 		    System.exit(1);
554 		} catch (SecurityException e) {
555 		    System.out.println("security exception for method authenticate()");
556 		    System.exit(1);
557 		}
558 
559 
560 	}
561 	// Checks for an accept message.
562 	////////////////////////////////
checkAccept()563 	private void checkAccept() throws SpreadException
564 	{
565 		// Read the connection response.
566 		////////////////////////////////
567 		int accepted;
568 		try
569 		{
570 			accepted = socketInput.read();
571 		}
572 		catch(IOException e)
573 		{
574 			throw new SpreadException("read(): " + e);
575 		}
576 
577 		// Check for no more data.
578 		//////////////////////////
579 		if(accepted == -1)
580 		{
581 			throw new SpreadException("Connection closed during connect attempt");
582 		}
583 
584 		// Was it accepted?
585 		///////////////////
586 		if(accepted != ACCEPT_SESSION)
587 		{
588 			throw new SpreadException("Connection attempt rejected=" + (0xffffff00 | accepted));
589 		}
590 	}
591 
592 	// Checks the daemon version.
593 	/////////////////////////////
checkVersion()594 	private void checkVersion() throws SpreadException
595 	{
596 		// Read the version.
597 		////////////////////
598 		int majorVersion;
599 		try
600 		{
601 			majorVersion = socketInput.read();
602 		}
603 		catch(IOException e)
604 		{
605 			throw new SpreadException("read(): " + e);
606 		}
607 
608 		// Read the sub-version.
609 		////////////////////////
610 		int minorVersion;
611 		try
612 		{
613 			minorVersion = socketInput.read();
614 		}
615 		catch(IOException e)
616 		{
617 			throw new SpreadException("read(): " + e);
618 		}
619 
620 		// Read the patch-version.
621 		////////////////////////
622 		int patchVersion;
623 		try
624 		{
625 			patchVersion = socketInput.read();
626 		}
627 		catch(IOException e)
628 		{
629 			throw new SpreadException("read(): " + e);
630 		}
631 
632 		// Check for no more data.
633 		//////////////////////////
634 		if((majorVersion == -1) || (minorVersion == -1) || (patchVersion == -1) )
635 		{
636 			throw new SpreadException("Connection closed during connect attempt");
637 		}
638 
639 		// Check the version.
640 		/////////////////////
641 		int version = ( (majorVersion*10000) + (minorVersion*100) + patchVersion );
642 		if(version < 30100)
643 		{
644 			throw new SpreadException("Old version " + majorVersion + "." + minorVersion + "." + patchVersion + " not supported");
645 		}
646 		if((version < 30800) && (priority))
647 		{
648 			throw new SpreadException("Old version " + majorVersion + "." + minorVersion + "." + patchVersion + " does not support priority");
649 		}
650 	}
651 
652 	// Get the private group name.
653 	//////////////////////////////
readGroup()654 	private void readGroup() throws SpreadException
655 	{
656 		// Read the length.
657 		///////////////////
658 		int len;
659 		try
660 		{
661 			len = socketInput.read();
662 		}
663 		catch(IOException e)
664 		{
665 			throw new SpreadException("read(): " + e);
666 		}
667 
668 		// Check for no more data.
669 		//////////////////////////
670 		if(len == -1)
671 		{
672 			throw new SpreadException("Connection closed during connect attempt");
673 		}
674 
675 		// Read the name.
676 		/////////////////
677 		byte buffer[] = new byte[len];
678 		readBytesFromSocket(buffer, "group name");
679 
680 		// Store the group.
681 		///////////////////
682 		group = new SpreadGroup(this, new String(buffer));
683 	}
684 
685 	// Constructor.
686 	///////////////
687 	/**
688 	 * Initializes a new SpreadConnection object.  To connect to a daemon with this
689 	 * object, use {@link SpreadConnection#connect(InetAddress, int, String, boolean, boolean)}.
690 	 *
691 	 * @see  SpreadConnection#connect(InetAddress, int, String, boolean, boolean)
692 	 */
SpreadConnection()693 	public SpreadConnection()
694 	{
695 		// We're not connected.
696 		///////////////////////
697 		connected = false;
698 
699 		// Init synchros.
700 		/////////////////
701 		rsynchro = new Boolean(false);
702 		wsynchro = new Boolean(false);
703 		listenersynchro = new Boolean(false);
704 		// Init listeners.
705 		//////////////////
706 		basicListeners = new Vector();
707 		advancedListeners = new Vector();
708 
709 		// Init listener command buffer.
710 		////////////////////////////////
711 		listenerBuffer = new Vector();
712 
713 		// Init default authentication
714 		//////////////////////////////
715 		authName = DEFAULT_AUTH_NAME;
716 		authClassName = DEFAULT_AUTHCLASS_NAME;
717 	}
718 
719         /**
720          * Sets the authentication name and class string for the client side authentication method.
721          * An authentication method can only be registered before connect is called.
722          * The authentication method registered will then be used whenever
723          * {@link SpreadConnection#connect(InetAddress, int, String, boolean, boolean)} is called.
724          *
         * @param  authName  the short official "name" of the method being registered; names longer than 30 characters are truncated
726          * @param  authClassName  the complete class name for the method (including package)
727          * @throws SpreadException if the connection is already established
728          */
registerAuthentication( String authName, String authClassName )729         synchronized public void registerAuthentication( String authName, String authClassName ) throws SpreadException
730         {
731 		// Check if we're connected.
732 		////////////////////////////
733 		if(connected == true)
734 		{
735 			throw new SpreadException("Already connected.");
736 		}
737 
738                 this.authClassName = authClassName;
739 
740 		try
741 		{
742 			this.authName = authName.substring(0, MAX_AUTH_NAME);
743 		}
744 		catch(IndexOutOfBoundsException e)
745 		{
746 			// Nothing to shorten.
747 			//////////////////////
748 			this.authName = authName;
749 		}
750         }
751 
752 	// Establishes a connection with the spread daemon.
753 	///////////////////////////////////////////////////
754 	/**
755 	 * Establishes a connection to a spread daemon.  Groups can be joined, and messages can be
756 	 * sent or received once the connection has been established.
757 	 *
758 	 * @param  address  the daemon's address, or null to connect to the localhost
759 	 * @param  port  the daemon's port, or 0 for the default port (4803)
760 	 * @param  privateName  the private name to use for this connection
761 	 * @param  priority  if true, this is a priority connection
762 	 * @param  groupMembership  if true, membership messages will be received on this connection
763 	 * @throws SpreadException  if the connection cannot be established
764 	 * @see  SpreadConnection#disconnect()
765 	 */
connect(InetAddress address, int port, String privateName, boolean priority, boolean groupMembership)766 	synchronized public void connect(InetAddress address, int port, String privateName, boolean priority, boolean groupMembership) throws SpreadException
767 	{
768 		// Check if we're connected.
769 		////////////////////////////
770 		if(connected == true)
771 		{
772 			throw new SpreadException("Already connected.");
773 		}
774 
775 		// Is ISO8859_1 encoding supported?
776 		///////////////////////////////
777 		try
778 		{
779 			new String("ASCII/ISO8859_1 encoding test").getBytes("ISO8859_1");
780 		}
781 		catch(UnsupportedEncodingException e)
782 		{
783 			throw new SpreadException("ISO8859_1 encoding is not supported.");
784 		}
785 
786 		// Store member variables.
787 		//////////////////////////
788 		this.address = address;
789 		this.port = port;
790 		this.priority = priority;
791 		this.groupMembership = groupMembership;
792 
793 		// Check if no address was specified.
794 		/////////////////////////////////////
795 		if(address == null)
796 		{
797 			// Use the local host.
798 			//////////////////////
799 			try
800 			{
801 				address = InetAddress.getLocalHost();
802 			}
803 			catch(UnknownHostException e)
804 			{
805 				throw new SpreadException("Error getting local host");
806 			}
807 		}
808 
809 		// Check if no port was specified.
810 		//////////////////////////////////
811 		if(port == 0)
812 		{
813 			// Use the default port.
814 			////////////////////////
815 			port = DEFAULT_SPREAD_PORT;
816 		}
817 
818 		// Check if the port is out of range.
819 		/////////////////////////////////////
820 		if((port < 0) || (port > (32 * 1024)))
821 		{
822 			throw new SpreadException("Bad port (" + port + ").");
823 		}
824 
825 		// Create the socket.
826 		/////////////////////
827 		try
828 		{
829 			socket = new Socket(address, port);
830 		}
831 		catch(IOException e)
832 		{
833 			throw new SpreadException("Socket(): " + e);
834 		}
835 
836 		// Set the socket's buffer sizes.
837 		/////////////////////////////////
838 		setBufferSizes();
839 
840 		// Get the socket's streams.
841 		////////////////////////////
842 		try
843 		{
844 			socketInput = socket.getInputStream();
845 			socketOutput = socket.getOutputStream();
846 		}
847 		catch(IOException e)
848 		{
849 			throw new SpreadException("getInput/OutputStream(): " + e);
850 		}
851 
852 		// Send the connect message.
853 		////////////////////////////
854 		sendConnect(privateName);
855 
856 		// Recv the authentication method list
857 		//////////////////////////////////////
858 		readAuthMethods();
859 
860 		// Send auth method choice
861 		//////////////////////////
862 		sendAuthMethod();
863 
864 		// turn string name of auth method into class and instance
865 		//////////////////////////////////////////////////////////
866 		try {
867 		    instantiateAuthMethod();
868 		} catch (SpreadException e) {
869 		    System.out.println("Failed to create authMethod instance" + e.toString() );
870 		    System.exit(1);
871 		}
872 		// Call authenticate module. This will only return when connection is authenticated
873 		////////////////////////////
874 		try {
875 			    authMethodAuthenticate.invoke( authObj, new Object[] { });
876 		    } catch (IllegalAccessException e) {
877 			    System.out.println("error calling authenticate" + e.toString() );
878 			    System.exit(1);
879 		    } catch (IllegalArgumentException e) {
880 			    System.out.println("error calling authenticate" + e.toString() );
881 			    System.exit(1);
882 		    } catch (java.lang.reflect.InvocationTargetException e) {
883 			    Throwable thr = e.getTargetException();
884 			    if ( thr.getClass().equals(SpreadException.class) )
885 			    {
886 				 throw new SpreadException("Connection Rejected: Authentication failed");
887 			    }
888 		}
889 		// Check for acceptance.
890 		////////////////////////
891 		checkAccept();
892 
893 		// Check the version.
894 		/////////////////////
895 		checkVersion();
896 
897 		// Get the private group name.
898 		//////////////////////////////
899 		readGroup();
900 
901 		// Connection complete.
902 		///////////////////////
903 		connected = true;
904 
905 		// Are there any listeners.
906 		///////////////////////////
907 		if((basicListeners.size() != 0) || (advancedListeners.size() != 0))
908 		{
909 			// Start the listener thread.
910 			/////////////////////////////
911 			startListener();
912 		}
913 	}
914 
915 	// Disconnects from the spread daemon.
916 	//////////////////////////////////////
917 	/**
918 	 * Disconnects the connection to the daemon.  Nothing else should be done with this connection
919 	 * after disconnecting it.
920 	 *
921 	 * @throws  SpreadException  if there is no connection or there is an error disconnecting
922 	 * @see  SpreadConnection#connect(InetAddress, int, String, boolean, boolean)
923 	 */
disconnect()924 	synchronized public void disconnect() throws SpreadException
925 	{
926 		// Check if we're connected.
927 		////////////////////////////
928 		if(connected == false)
929 		{
930 			throw new SpreadException("Not connected.");
931 		}
932 
933 		// Are we in a listener callback?
934 		/////////////////////////////////
935 		if(callingListeners)
936 		{
937 			// Add it to the command buffer.
938 			////////////////////////////////
939 			listenerBuffer.addElement(BUFFER_DISCONNECT);
940 
941 			// Don't need to do anything else.
942 			//////////////////////////////////
943 			return;
944 		}
945 
946 		// Get a new message.
947 		/////////////////////
948 		SpreadMessage killMessage = new SpreadMessage();
949 
950 		// Send it to our private group.
951 		////////////////////////////////
952 		killMessage.addGroup(group);
953 
954 		// Set the service type.
955 		////////////////////////
956 		killMessage.setServiceType(SpreadMessage.KILL_MESS);
957 
958 		// Send the message.
959 		////////////////////
960 		multicast(killMessage);
961 
962 		// Check for a listener thread.
963 		///////////////////////////////
964 		if(listener != null)
965 		{
966 			// Stop it.
967 			///////////
968 			stopListener();
969 		}
970 
971 		// Close the socket.
972 		////////////////////
973 		try
974 		{
975 			socket.close();
976 		}
977 		catch(IOException e)
978 		{
979 			throw new SpreadException("close(): " + e);
980 		}
981 
982 		connected = false;
983 	}
984 
985 	// Gets the user's private group.
986 	/////////////////////////////////
987 	/**
988 	 * Gets the private group for this connection.
989 	 *
990 	 * @return  the SpreadGroup representing this connection's private group, or null if there is no connection
991 	 */
getPrivateGroup()992 	public SpreadGroup getPrivateGroup()
993 	{
994 		// Check if we're connected.
995 		////////////////////////////
996 		if(connected == false)
997 		{
998 			return null;
999 		}
1000 
1001 		return group;
1002 	}
1003 
1004 	// Receives a new message.
1005 	//////////////////////////
1006 	/**
1007 	 * Receives the next message waiting on this connection.  If there are no messages
1008 	 * waiting, the call will block until a message is ready to be received.
1009 	 *
1010 	 * @return  the message that has just been received
1011 	 * @throws  SpreadException  if there is no connection or there is any error reading a new message
1012 	 */
receive()1013         public SpreadMessage receive() throws SpreadException, InterruptedIOException
1014 	{
1015 
1016 	    synchronized(rsynchro) {
1017 	        // Check if there are any listeners.
1018 		////////////////////////////////////
1019 	        if((basicListeners.isEmpty() == false) || (advancedListeners.isEmpty() == false))
1020 		{
1021 			// Get out of here.
1022 			///////////////////
1023 		 	throw new SpreadException("Tried to receive while there are listeners");
1024 		}
1025 
1026 		return internal_receive();
1027 	    }
1028 	}
1029 
1030         // Actually receives a new message
1031         ///////////////////////////////////
internal_receive()1032 	private SpreadMessage internal_receive() throws SpreadException, InterruptedIOException
1033 	{
1034 		// Check if we're connected.
1035 		////////////////////////////
1036 		if(connected == false)
1037 		{
1038 			throw new SpreadException("Not connected.");
1039 		}
1040 
1041 		// Read the header.
1042 		///////////////////
1043 		byte header[] = new byte[MAX_GROUP_NAME+16];
1044 		int headerIndex;
1045 		int rcode;
1046 		try
1047 		{
1048 			for(headerIndex = 0 ; headerIndex < header.length ; headerIndex += rcode)
1049 			{
1050 				rcode = socketInput.read(header, headerIndex, header.length - headerIndex);
1051 				if(rcode == -1)
1052 				{
1053 					throw new SpreadException("Connection closed while reading header");
1054 				}
1055 			}
1056 		}
1057 		catch(InterruptedIOException e) {
1058 		    throw e;
1059 		}
1060 		catch(IOException e)
1061 		{
1062 			throw new SpreadException("read(): " + e);
1063 		}
1064 
1065 		// Reset header index.
1066 		//////////////////////
1067 		headerIndex = 0;
1068 
1069 		// Get service type.
1070 		////////////////////
1071 		int serviceType = toInt(header, headerIndex);
1072 		headerIndex += 4;
1073 
1074 		// Get the sender.
1075 		//////////////////
1076 		SpreadGroup sender = toGroup(header, headerIndex);
1077 		headerIndex += MAX_GROUP_NAME;
1078 
1079 		// Get the number of groups.
1080 		////////////////////////////
1081 		int numGroups = toInt(header, headerIndex);
1082 		headerIndex += 4;
1083 
1084 		// Get the hint/type.
1085 		/////////////////////
1086 		int hint = toInt(header, headerIndex);
1087 		headerIndex += 4;
1088 
1089 		// Get the data length.
1090 		///////////////////////
1091 		int dataLen = toInt(header, headerIndex);
1092 		headerIndex += 4;
1093 
1094 		// Does the header need to be flipped?
1095 		// (Checking for a daemon server endian-mismatch)
1096 		/////////////////////////////////////////////////
1097 		boolean daemonEndianMismatch;
1098 		if(sameEndian(serviceType) == false)
1099 		{
1100 			// Flip them.
1101 			/////////////
1102 			serviceType = flip(serviceType);
1103 			numGroups = flip(numGroups);
1104 			dataLen = flip(dataLen);
1105 
1106 			// The daemon endian-mismatch.
1107 			//////////////////////////////
1108 			daemonEndianMismatch = true;
1109 		}
1110 		else
1111 		{
1112 			// The daemon endian-mismatch.
1113 			//////////////////////////////
1114 			daemonEndianMismatch = false;
1115 		}
1116 
1117 		// Validate numGroups and dataLen
1118 
1119 		if ( (numGroups < 0) || (dataLen < 0) )
1120 		{
1121 			// drop message
1122 			throw new SpreadException("Illegal Message: Message Dropped");
1123 		}
1124 
1125 		// An endian mismatch.
1126 		//////////////////////
1127 		boolean endianMismatch;
1128 
1129 		// The type.
1130 		////////////
1131 		short type;
1132 
1133 		// Is this a regular message?
1134 		/////////////////////////////
1135 		if( SpreadMessage.isRegular(serviceType) || SpreadMessage.isReject(serviceType) )
1136 		{
1137 			// Does the hint need to be flipped?
1138 			// (Checking for a sender endian-mismatch)
1139 			//////////////////////////////////////////
1140 			if(sameEndian(hint) == false)
1141 			{
1142 				hint = flip(hint);
1143 				endianMismatch = true;
1144 			}
1145 			else
1146 			{
1147 				endianMismatch = false;
1148 			}
1149 
1150 			// Get the type from the hint.
1151 			//////////////////////////////
1152 			hint = clearEndian(hint);
1153 			hint >>= 8;
1154 			hint &= 0x0000FFFF;
1155 			type = (short)hint;
1156 		}
1157 		else
1158 		{
1159 			// This is not a regular message.
1160 			/////////////////////////////////
1161 			type = -1;
1162 			endianMismatch = false;
1163 		}
1164 
1165                 if( SpreadMessage.isReject(serviceType) )
1166                 {
1167                         // Read in the old type and or with reject type field.
1168                         byte oldtypeBuffer[] = new byte[4];
1169                         try
1170                         {
1171                                 for(int oldtypeIndex = 0 ; oldtypeIndex < oldtypeBuffer.length ; )
1172                                 {
1173                                         rcode = socketInput.read(oldtypeBuffer, oldtypeIndex, oldtypeBuffer.length - oldtypeIndex);
1174                                         if(rcode == -1)
1175                                         {
1176                                                 throw new SpreadException("Connection closed while reading groups");
1177                                         }
1178                                         oldtypeIndex += rcode;
1179                                 }
1180                         }
1181                         catch(InterruptedIOException e) {
1182                                 throw e;
1183                         }
1184                         catch(IOException e)
1185                         {
1186                                 throw new SpreadException("read(): " + e);
1187                         }
1188                         int oldType = toInt(oldtypeBuffer, 0);
1189                         if ( sameEndian(serviceType) == false )
1190                             oldType = flip(oldType);
1191 
1192                         serviceType = (SpreadMessage.REJECT_MESS | oldType);
1193                 }
1194 
1195 		// Read in the group names.
1196 		///////////////////////////
1197 		byte buffer[] = new byte[numGroups * MAX_GROUP_NAME];
1198 		try
1199 		{
1200 			for(int bufferIndex = 0 ; bufferIndex < buffer.length ; )
1201 			{
1202 				rcode = socketInput.read(buffer, bufferIndex, buffer.length - bufferIndex);
1203 				if(rcode == -1)
1204 				{
1205 					throw new SpreadException("Connection closed while reading groups");
1206 				}
1207 				bufferIndex += rcode;
1208 			}
1209 		}
1210 		catch(InterruptedIOException e) {
1211 		    throw e;
1212 		}
1213 		catch(IOException e)
1214 		{
1215 			throw new SpreadException("read(): " + e);
1216 		}
1217 
1218 		// Clear the endian type.
1219 		/////////////////////////
1220 		serviceType = clearEndian(serviceType);
1221 
1222 		// Get the groups from the buffer.
1223 		//////////////////////////////////
1224 		Vector groups = new Vector(numGroups);
1225 		for(int bufferIndex = 0 ; bufferIndex < buffer.length ; bufferIndex += MAX_GROUP_NAME)
1226 		{
1227 			// Translate the name into a group and add it to the vector.
1228 			////////////////////////////////////////////////////////////
1229 			groups.addElement(toGroup(buffer, bufferIndex));
1230 		}
1231 
1232 		// Read in the data.
1233 		////////////////////
1234 		byte data[] = new byte[dataLen];
1235 		try
1236 		{
1237 			for(int dataIndex = 0 ; dataIndex < dataLen ; )
1238 			{
1239 				rcode = socketInput.read(data, dataIndex, dataLen - dataIndex);
1240 				if(rcode == -1)
1241 				{
1242 					throw new SpreadException("Connection close while reading data");
1243 				}
1244 				dataIndex += rcode;
1245 			}
1246 		}
1247 		catch(InterruptedIOException e) {
1248 		    throw e;
1249 		}
1250 		catch(IOException e)
1251 		{
1252 			throw new SpreadException("read():" + e);
1253 		}
1254 
1255 		// Is it a membership message?
1256 		//////////////////////////////
1257 		MembershipInfo membershipInfo;
1258 		if(SpreadMessage.isMembership(serviceType))
1259 		{
1260 			// Create a membership info object.
1261 			///////////////////////////////////
1262 			membershipInfo = new MembershipInfo(this, serviceType, groups, sender, data, daemonEndianMismatch);
1263 
1264 			// Is it a regular membership message?
1265 			//////////////////////////////////////
1266 			if(membershipInfo.isRegularMembership())
1267 			{
1268 				// Find which of these groups is the local connection.
1269 				//////////////////////////////////////////////////////
1270 				type = (short)groups.indexOf(group);
1271 			}
1272 		}
1273 		else
1274 		{
1275 			// There's no membership info.
1276 			//////////////////////////////
1277 			membershipInfo = null;
1278 		}
1279 
1280 		// Create the message.
1281 		//////////////////////
1282 		return new SpreadMessage(serviceType, groups, sender, data, type, endianMismatch, membershipInfo);
1283 	}
1284 
1285 	// Receives numMessages new messages.
1286 	/////////////////////////////////////
1287 	/**
1288 	 * Receives <code>numMessages</code> messages on the connection and returns them in an array.
1289 	 * If there are not <code>numMessages</code> messages waiting, the call will block until there are
1290 	 * enough messages available.
1291 	 *
1292 	 * @param  numMessages  the number of messages to receive
1293 	 * @return an array of messages
1294 	 * @throws  SpreadException  if there is no connection or if there is any error reading the messages
1295 	 */
receive(int numMessages)1296         public SpreadMessage[] receive(int numMessages) throws SpreadException, InterruptedIOException
1297 	{
1298 		// Allocate the messages array.
1299 		///////////////////////////////
1300 		SpreadMessage[] messages = new SpreadMessage[numMessages];
1301 		synchronized(rsynchro) {
1302 		        // Check if there are any listeners.
1303 		        ////////////////////////////////////
1304 		        if ((basicListeners.isEmpty() == false) || (advancedListeners.isEmpty() == false))
1305 			{
1306 			    // Get out of here.
1307 			    ///////////////////
1308 			    throw new SpreadException("Tried to receive while there are listeners");
1309 			}
1310 
1311 			// Receive the messages.
1312 			////////////////////////
1313 			for(int i = 0 ; i < numMessages ; i++)
1314 			{
1315 				messages[i] = internal_receive();
1316 			}
1317 
1318 		}
1319 		// Return the array.
1320 		////////////////////
1321 		return messages;
1322 	}
1323 
1324 	// Returns true if there are messages waiting.
1325 	//////////////////////////////////////////////
1326 	/**
1327 	 * Returns true if there are any messages waiting on this connection.
1328 	 *
1329 	 * @return true if there is at least one message that can be received
1330 	 * @throws  SpreadException  if there is no connection or if there is an error checking for messages
1331 	 */
poll()1332 	public boolean poll() throws SpreadException
1333 	{
1334 		// Check if we're connected.
1335 		////////////////////////////
1336 		if(connected == false)
1337 		{
1338 			throw new SpreadException("Not connected.");
1339 		}
1340 
1341 		// Check if there is anything waiting.
1342 		//////////////////////////////////////
1343 		try
1344 		{
1345 			if(socketInput.available() == 0)
1346 			{
1347 				// There's nothing to read.
1348 				///////////////////////////
1349 				return false;
1350 			}
1351 		}
1352 		catch(IOException e)
1353 		{
1354 			throw new SpreadException("available(): " + e);
1355 		}
1356 
1357 		// There's something to read.
1358 		/////////////////////////////
1359 		return true;
1360 	}
1361 
1362 	// Private function for starting a listener thread.
1363 	///////////////////////////////////////////////////
startListener()1364 	private void startListener()
1365 	{
1366 		// Get a new thread.
1367 		////////////////////
1368 		listener = new Listener(this);
1369 
1370 		// Start it.
1371 		////////////
1372 		listener.start();
1373 	}
1374 
1375 	// Adds a new basic message listener.
1376 	/////////////////////////////////////
1377 	/**
1378 	 * Adds the BasicMessageListener to this connection.  If there are no other listeners, this call will
1379 	 * start a thread to listen for new messages.  From the time this function is called until
1380 	 * this listener is removed, {@link BasicMessageListener#messageReceived(SpreadMessage)} will
1381 	 * be called every time a message is received.
1382 	 *
1383 	 * @param  listener  a BasicMessageListener to add to this connection
1384 	 * @see  SpreadConnection#remove(BasicMessageListener)
1385 	 */
add(BasicMessageListener listener)1386 	public void add(BasicMessageListener listener)
1387 	{
1388 	synchronized(listenersynchro) {
1389 		// Are we in a listener callback?
1390 		/////////////////////////////////
1391 		if(callingListeners)
1392 		{
1393 			// Add it to the command buffer.
1394 			////////////////////////////////
1395 			listenerBuffer.addElement(BUFFER_ADD_BASIC);
1396 			listenerBuffer.addElement(listener);
1397 
1398 			// Don't need to do anything else.
1399 			//////////////////////////////////
1400 			return;
1401 		}
1402 		// Add the listener.
1403 		////////////////////
1404 		basicListeners.addElement(listener);
1405 
1406 		// Check if we're connected.
1407 		////////////////////////////
1408 		if(connected == true)
1409 		{
1410 			// Check if the thread is running.
1411 			//////////////////////////////////
1412 			if(this.listener == null)
1413 			{
1414 				// Start the thread.
1415 				////////////////////
1416 				startListener();
1417 			}
1418 		}
1419 	}
1420 	}
1421 
1422 	// Adds a new advanced message listener.
1423 	////////////////////////////////////////
1424 	/**
1425 	 * Adds the AdvancedMessageListener to this connection.  If there are no other listeners, this call will
1426 	 * start a thread to listen for new messages.  From the time this function is called until
1427 	 * this listener is removed, {@link AdvancedMessageListener#regularMessageReceived(SpreadMessage)} will
1428 	 * be called every time a regular message is received, and
1429 	 * {@link AdvancedMessageListener#membershipMessageReceived(SpreadMessage)} will be called every time
1430 	 * a membership message is received.
1431 	 *
1432 	 * @param  listener an AdvancedMessageListener to add to this connection
1433 	 * @see  SpreadConnection#remove(AdvancedMessageListener)
1434 	 */
add(AdvancedMessageListener listener)1435 	public void add(AdvancedMessageListener listener)
1436 	{
1437 	synchronized (listenersynchro) {
1438 		// Are we in a listener callback?
1439 		/////////////////////////////////
1440 		if(callingListeners)
1441 		{
1442 			// Add it to the command buffer.
1443 			////////////////////////////////
1444 			listenerBuffer.addElement(BUFFER_ADD_ADVANCED);
1445 			listenerBuffer.addElement(listener);
1446 
1447 			// Don't need to do anything else.
1448 			//////////////////////////////////
1449 			return;
1450 		}
1451 
1452 		// Add the listener.
1453 		////////////////////
1454 		advancedListeners.addElement(listener);
1455 
1456 		// Check if we're connected.
1457 		////////////////////////////
1458 		if(connected == true)
1459 		{
1460 			// Check if the thread is running.
1461 			//////////////////////////////////
1462 			if(this.listener == null)
1463 			{
1464 				// Start the thread.
1465 				////////////////////
1466 				startListener();
1467 			}
1468 		}
1469 	}
1470 	}
1471 
1472 	// Stops the listener thread.
1473 	/////////////////////////////
stopListener()1474 	private void stopListener()
1475 	{
1476 		// Set the signal.
1477 		//////////////////
1478 		listener.signal = true;
1479 
1480 		// Wait for the thread to die, to avoid inconsistencies.
1481 		////////////////////////////////////////////////////////
1482 		listener.join();
1483 
1484 		// Clear the variable.
1485 		//////////////////////
1486 		listener = null;
1487 	}
1488 
1489 	// Removes a basic message listener.
1490 	////////////////////////////////////
1491 	/**
1492 	 * Removes the BasicMessageListener from this connection.  If this is the only listener on this
1493 	 * connection, the listener thread will be stopped.
1494 	 *
1495 	 * @param  listener  the listener to remove
1496 	 * @see  SpreadConnection#add(BasicMessageListener)
1497 	 */
remove(BasicMessageListener listener)1498 	public void remove(BasicMessageListener listener)
1499 	{
1500 	synchronized (listenersynchro) {
1501 		// Are we in a listener callback?
1502 		/////////////////////////////////
1503 		if(callingListeners)
1504 		{
1505 			// Add it to the command buffer.
1506 			////////////////////////////////
1507 			listenerBuffer.addElement(BUFFER_REMOVE_BASIC);
1508 			listenerBuffer.addElement(listener);
1509 
1510 			// Don't need to do anything else.
1511 			//////////////////////////////////
1512 			return;
1513 		}
1514 
1515 		// Remove the listener.
1516 		///////////////////////
1517 		basicListeners.removeElement(listener);
1518 
1519 		// Check if we're connected.
1520 		////////////////////////////
1521 		if(connected == true)
1522 		{
1523 			// Check if there are any more listeners.
1524 			/////////////////////////////////////////
1525 			if((basicListeners.size() == 0) && (advancedListeners.size() == 0))
1526 			{
1527 				// Stop the listener thread.
1528 				////////////////////////////
1529 				stopListener();
1530 			}
1531 		}
1532 	}
1533 	}
1534 
1535 	// Removes an advanced message listener.
1536 	////////////////////////////////////////
1537 	/**
1538 	 * Removes the AdvancedMessageListener from this connection.  If this is the only listener on this
1539 	 * connection, the listener thread will be stopped.
1540 	 *
1541 	 * @param  listener  the listener to remove
1542 	 * @see SpreadConnection#add(AdvancedMessageListener)
1543 	 */
remove(AdvancedMessageListener listener)1544 	public void remove(AdvancedMessageListener listener)
1545 	{
1546 	synchronized (listenersynchro) {
1547 		// Are we in a listener callback?
1548 		/////////////////////////////////
1549 		if(callingListeners)
1550 		{
1551 			// Add it to the command buffer.
1552 			////////////////////////////////
1553 			listenerBuffer.addElement(BUFFER_REMOVE_ADVANCED);
1554 			listenerBuffer.addElement(listener);
1555 
1556 			// Don't need to do anything else.
1557 			//////////////////////////////////
1558 			return;
1559 		}
1560 
1561 		// Remove the listener.
1562 		///////////////////////
1563 		advancedListeners.removeElement(listener);
1564 
1565 		// Check if we're connected.
1566 		////////////////////////////
1567 		if(connected == true)
1568 		{
1569 			// Check if there are any more listeners.
1570 			/////////////////////////////////////////
1571 			if((basicListeners.size() == 0) && (advancedListeners.size() == 0))
1572 			{
1573 				// Stop the listener thread.
1574 				////////////////////////////
1575 				stopListener();
1576 			}
1577 		}
1578 	}
1579 	}
1580 
1581 	// This is the thread used to handle listener interfaces.
1582 	/////////////////////////////////////////////////////////
1583 	private class Listener extends Thread
1584 	{
1585 		// The connection this thread is listening to.
1586 		//////////////////////////////////////////////
1587 		private SpreadConnection connection;
1588 
1589 		// If true, the connection wants the thread to stop.
1590 		////////////////////////////////////////////////////
1591 		protected boolean signal;
1592 
1593 		// The constructor.
1594 		///////////////////
Listener(SpreadConnection connection)1595 		public Listener(SpreadConnection connection)
1596 		{
1597 			// Store local variables.
1598 			/////////////////////////
1599 			this.connection = connection;
1600 			this.signal = false;
1601 
1602 			// Be a daemon.
1603 			///////////////
1604 			this.setDaemon(true);
1605 		}
1606 
1607 		// The thread's entry point.
1608 		////////////////////////////
run()1609 		public void run()
1610 		{
1611 			// An incoming message.
1612 			///////////////////////
1613 			SpreadMessage message;
1614 
1615 			// A basic listener.
1616 			////////////////////
1617 			BasicMessageListener basicListener;
1618 
1619 			// An advanced listener.
1620 			////////////////////////
1621 			AdvancedMessageListener advancedListener;
1622 
1623 			// A buffered command.
1624 			//////////////////////
1625 			Object command;
1626 
1627 			int previous_socket_timeout = 100;
1628 
1629 			try
1630 			{
1631 			        try {
1632 				    previous_socket_timeout = connection.socket.getSoTimeout();
1633 				    connection.socket.setSoTimeout(100);
1634 				}
1635 				catch( SocketException e ) {
1636 				    // just ignore for now
1637 				    System.out.println("socket error setting timeout" + e.toString() );
1638 				}
1639 				while(true)
1640 				{
1641 					// Get a lock on the connection.
1642 					////////////////////////////////
1643 					synchronized(connection)
1644 					{
1645 						// Do they want us to stop?
1646 						///////////////////////////
1647 						if(signal == true)
1648 						{
1649 						    // We're done.
1650 						    //////////////
1651 						    System.out.println("LISTENER: told to exit so returning");
1652 						    try {
1653 							connection.socket.setSoTimeout(previous_socket_timeout);
1654 						    }
1655 						    catch( SocketException e ) {
1656 							// just ignore for now
1657 							System.out.println("socket error setting timeout" + e.toString() );
1658 						    }
1659 
1660 						    return;
1661 						}
1662 
1663 						// Get a message.
1664 						// WE WILL BLOCK HERE UNTIL DATA IS AVAILABLE
1665 						// or 100 MS expires
1666 						/////////////////
1667 						try {
1668 						    synchronized(rsynchro) {
1669 							message = connection.internal_receive();
1670 						    }
1671 						    // Calling listeners.
1672 						    /////////////////////
1673 						    callingListeners = true;
1674 
1675 						    // Tell all the basic listeners.
1676 						    ////////////////////////////////
1677 						    for(int i = 0 ; i < basicListeners.size() ; i++)
1678 						    {
1679 							    // Get the listener.
1680 							    ////////////////////
1681 							    basicListener = (BasicMessageListener)basicListeners.elementAt(i);
1682 
1683 							    // Tell it.
1684 							    ///////////
1685 							    basicListener.messageReceived(message);
1686 						    }
1687 
1688 						    // Tell all the advanced listeners.
1689 						    ///////////////////////////////////
1690 						    for(int i = 0 ; i < advancedListeners.size() ; i++)
1691 						    {
1692 							    // Get the listener.
1693 							    ////////////////////
1694 							    advancedListener = (AdvancedMessageListener)advancedListeners.elementAt(i);
1695 
1696 							    // What type of message is it?
1697 							    //////////////////////////////
1698 							    if(message.isRegular())
1699 							    {
1700 								    // Tell it.
1701 								    ///////////
1702 								    advancedListener.regularMessageReceived(message);
1703 							    }
1704 							    else
1705 							    {
1706 								    // Tell it.
1707 								    ///////////
1708 								    advancedListener.membershipMessageReceived(message);
1709 							    }
1710 						    }
1711 
1712 						    // Done calling listeners.
1713 						    //////////////////////////
1714 						    callingListeners = false;
1715 
1716 
1717 						} catch( InterruptedIOException e) {
1718 						    /// Ignore
1719 						}
1720 						// Execute buffered commands.
1721 						/////////////////////////////
1722 						while(listenerBuffer.isEmpty() == false)
1723 						{
1724 							// Get the first command.
1725 							/////////////////////////
1726 							command = listenerBuffer.firstElement();
1727 
1728 							// Remove it from the list.
1729 							///////////////////////////
1730 							listenerBuffer.removeElementAt(0);
1731 
1732 								// Check what type of command it is.
1733 								////////////////////////////////////
1734 							if(command == BUFFER_DISCONNECT)
1735 							{
1736 								// Disconnect.
1737 								//////////////
1738 								connection.disconnect();
1739 
1740 								// Don't execute any more commands.
1741 								///////////////////////////////////
1742 								listenerBuffer.removeAllElements();
1743 							}
1744 							else if(command == BUFFER_ADD_BASIC)
1745 							{
1746 								// Get the listener.
1747 								////////////////////
1748 								basicListener = (BasicMessageListener)listenerBuffer.firstElement();
1749 
1750 								// Add it.
1751 								//////////
1752 								connection.add(basicListener);
1753 								// Remove the listener from the Vector.
1754 								///////////////////////////////////////
1755 								listenerBuffer.removeElementAt(0);
1756 
1757 							}
1758 							else if(command == BUFFER_ADD_ADVANCED)
1759 							{
1760 								// Get the listener.
1761 								////////////////////
1762 								advancedListener = (AdvancedMessageListener)listenerBuffer.firstElement();
1763 
1764 								// Add it.
1765 								//////////
1766 								connection.add(advancedListener);
1767 								// Remove the listener from the Vector.
1768 								///////////////////////////////////////
1769 								listenerBuffer.removeElementAt(0);
1770 
1771 							}
1772 							else if(command == BUFFER_REMOVE_BASIC)
1773 							{
1774 								// Get the listener.
1775 								////////////////////
1776 								basicListener = (BasicMessageListener)listenerBuffer.firstElement();
1777 
1778 								// Remove it.
1779 								/////////////
1780 								connection.remove(basicListener);
1781 
1782 									// Remove the listener from the Vector.
1783 									///////////////////////////////////////
1784 								listenerBuffer.removeElementAt(0);
1785 							}
1786 							else if(command == BUFFER_REMOVE_ADVANCED)
1787 							{
1788 								// Get the listener.
1789 								////////////////////
1790 								advancedListener = (AdvancedMessageListener)listenerBuffer.firstElement();
1791 
1792 								// Remove it.
1793 								/////////////
1794 								connection.remove(advancedListener);
1795 								// Remove the listener from the Vector.
1796 								///////////////////////////////////////
1797 								listenerBuffer.removeElementAt(0);
1798 							}
1799 						}
1800 					}
1801 
1802 					// There are no messages waiting, take a break.
1803 					///////////////////////////////////////////////
1804 					yield();
1805 				}
1806 			}
1807 			catch(SpreadException e)
1808 			{
1809 				// Nothing to do but ignore it.
1810 				///////////////////////////////
1811 			    System.out.println("SpreadException: " + e.toString() );
1812 			}
1813 		}
1814 	}
1815 
1816 	// Sends the message.
1817 	/////////////////////
1818 	/**
1819 	 * Multicasts a message.  The message will be sent to all the groups specified in
1820 	 * the message.
1821 	 *
1822 	 * @param  message  the message to multicast
1823 	 * @throws  SpreadException  if there is no connection or if there is any error sending the message
1824 	 */
multicast(SpreadMessage message)1825 	public void multicast(SpreadMessage message) throws SpreadException
1826 	{
1827 		// Check if we're connected.
1828 		////////////////////////////
1829 		if(connected == false)
1830 		{
1831 			throw new SpreadException("Not connected.");
1832 		}
1833 
1834 		// The groups this message is going to.
1835 		///////////////////////////////////////
1836 		SpreadGroup groups[] = message.getGroups();
1837 
1838 		// The message data.
1839 		////////////////////
1840 		byte data[] = message.getData();
1841 
1842 		// Calculate the total number of bytes.
1843 		///////////////////////////////////////
1844 		int numBytes = 16;  // serviceType, numGroups, type/hint, dataLen
1845 		numBytes += MAX_GROUP_NAME;  // private group
1846 		numBytes += (MAX_GROUP_NAME * groups.length);  // groups
1847 
1848 		if (numBytes + data.length > MAX_MESSAGE_LENGTH )
1849 		{
1850 		    throw new SpreadException("Message is too long for a Spread Message");
1851 		}
1852 		// Allocate the send buffer.
1853 		////////////////////////////
1854 		byte buffer[] = new byte[numBytes];
1855 		int bufferIndex = 0;
1856 
1857 		// The service type.
1858 		////////////////////
1859 		toBytes(message.getServiceType(), buffer, bufferIndex);
1860 		bufferIndex += 4;
1861 
1862 		// The private group.
1863 		/////////////////////
1864 		toBytes(group, buffer, bufferIndex);
1865 		bufferIndex += MAX_GROUP_NAME;
1866 
1867 		// The number of groups.
1868 		////////////////////////
1869 		toBytes(groups.length, buffer, bufferIndex);
1870 		bufferIndex += 4;
1871 
1872 		// The service type and hint.
1873 		/////////////////////////////
1874 		toBytes(((int)message.getType() << 8) & 0x00FFFF00, buffer, bufferIndex);
1875 		bufferIndex += 4;
1876 
1877 		// The data length.
1878 		///////////////////
1879 		toBytes(data.length, buffer, bufferIndex);
1880 		bufferIndex += 4;
1881 
1882 		// The group names.
1883 		///////////////////
1884 		for(int i = 0 ; i < groups.length ; i++)
1885 		{
1886 			toBytes(groups[i], buffer, bufferIndex);
1887 			bufferIndex += MAX_GROUP_NAME;
1888 		}
1889 
1890 		// Send it.
1891 		///////////
1892 	synchronized (wsynchro) {
1893 		try
1894 		{
1895 			socketOutput.write(buffer);
1896 			socketOutput.write(data);
1897 		}
1898 		catch(IOException e)
1899 		{
1900 			throw new SpreadException("write(): " + e.toString());
1901 		}
1902 	}
1903 	}
1904 
1905 	// Sends the array of messages.
1906 	///////////////////////////////
1907 	/**
1908 	 * Multicasts an array of messages.  Each message will be sent to all the groups specified in
1909 	 * the message.
1910 	 *
1911 	 * @param  messages  the messages to multicast
1912 	 * @throws  SpreadException  if there is no connection or if there is any error sending the messages
1913 	 */
multicast(SpreadMessage messages[])1914 	public void multicast(SpreadMessage messages[]) throws SpreadException
1915 	{
1916 		// Go through the array.
1917 		////////////////////////
1918 		for(int i = 0 ; i < messages.length ; i++)
1919 		{
1920 			// Send this message.
1921 			/////////////////////
1922 			multicast(messages[i]);
1923 		}
1924 	}
1925 }
1926