1 /* 2 * The Spread Toolkit. 3 * 4 * The contents of this file are subject to the Spread Open-Source 5 * License, Version 1.0 (the ``License''); you may not use 6 * this file except in compliance with the License. You may obtain a 7 * copy of the License at: 8 * 9 * http://www.spread.org/license/ 10 * 11 * or in the file ``license.txt'' found in this distribution. 12 * 13 * Software distributed under the License is distributed on an AS IS basis, 14 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License 15 * for the specific language governing rights and limitations under the 16 * License. 17 * 18 * The Creators of Spread are: 19 * Yair Amir, Michal Miskin-Amir, Jonathan Stanton. 20 * 21 * Copyright (C) 1993-2004 Spread Concepts LLC <spread@spreadconcepts.com> 22 * 23 * All Rights Reserved. 24 * 25 * Major Contributor(s): 26 * --------------- 27 * Cristina Nita-Rotaru crisn@cs.purdue.edu - group communication security. 28 * Theo Schlossnagle jesus@omniti.com - Perl, skiplists, autoconf. 29 * Dan Schoenblum dansch@cnds.jhu.edu - Java interface. 30 * John Schultz jschultz@cnds.jhu.edu - contribution to process group membership. 31 * 32 */ 33 34 35 36 package spread; 37 38 import java.net.*; 39 import java.io.*; 40 import java.util.*; 41 42 /** 43 * A SpreadConnection object is used to establish a connection to a spread daemon. 44 * To connect to a spread daemon, first create a new SpreadConnection object, and then 45 * call {@link SpreadConnection#connect(InetAddress, int, String, boolean, boolean)}: 46 * <p><blockquote><pre> 47 * SpreadConnection connection = new SpreadConnection(); 48 * connection.connect(null, 0, "name", false, false); 49 * </pre></blockquote><p> 50 * The only methods that can be called before 51 * {@link SpreadConnection#connect(InetAddress, int, String, boolean, boolean)} are the add 52 * ({@link SpreadConnection#add(BasicMessageListener)}, {@link SpreadConnection#add(AdvancedMessageListener)}) 53 * and remove ({@link SpreadConnection#remove(BasicMessageListener)}, {@link SpreadConnection#remove(AdvancedMessageListener)}) 54 * methods. If any other methods are called, a SpreadException is thrown, except for 55 * {@link SpreadConnection#getPrivateGroup()}, which returns null. 56 * <p> 57 * To disconnect from the daemon, call {@link SpreadConnection#disconnect()}: 58 * <p><blockquote><pre> 59 * connection.disconnect(); 60 * </pre></blockquote><p> 61 * To send a message on this connection, call {@link SpreadConnection#multicast(SpreadMessage)}: 62 * <p><blockquote><pre> 63 * connection.multicast(message); 64 * </pre></blockquote><p> 65 * To receive a message sent to this connection, call {@link SpreadConnection#receive()}: 66 * <p><blockquote><pre> 67 * SpreadMessage message = connection.receive(); 68 * </pre></blockquote><p> 69 */ 70 public class SpreadConnection 71 { 72 // The default Spread port. 73 /////////////////////////// 74 private static final int DEFAULT_SPREAD_PORT = 4803; 75 76 // The maximum length of the private name. 77 ////////////////////////////////////////// 78 private static final int MAX_PRIVATE_NAME = 10; 79 80 // The maximum length of a message + group names. 81 ////////////////////////////////////////// 82 private static final int MAX_MESSAGE_LENGTH = 140000; 83 84 // The maximum length of the group name. 85 //////////////////////////////////////// 86 protected static final int MAX_GROUP_NAME = 32; 87 88 // The Spread version. 89 ////////////////////// 90 private static final int SP_MAJOR_VERSION = 3; 91 private static final int SP_MINOR_VERSION = 17; 92 private static final int SP_PATCH_VERSION = 4; 93 94 // The default authentication method 95 //////////////////////////////////// 96 private static final String DEFAULT_AUTH_NAME = "NULL"; 97 98 // The class name of the default authentication method 99 ////////////////////////////////////////////////////// 100 private static final String DEFAULT_AUTHCLASS_NAME = "spread.NULLAuth"; 101 102 // The maximum length of a authentication method name 103 ///////////////////////////////////////////////////// 104 private static final int MAX_AUTH_NAME = 30; 105 106 // The maximum number of authentication methods 107 /////////////////////////////////////////////// 108 private static final int MAX_AUTH_METHODS = 3; 109 110 // Received if a connection attempt was successful. 111 /////////////////////////////////////////////////// 112 private static final int ACCEPT_SESSION = 1; 113 114 // Used to determine endianness. 115 //////////////////////////////// 116 private static final int ENDIAN_TYPE = 0x80000080; 117 118 // Only true if this connection is connected. 119 ///////////////////////////////////////////// 120 private boolean connected; 121 122 // Reading synchro. 123 /////////////////// 124 private Boolean rsynchro; 125 // Writing synchro. 126 /////////////////// 127 private Boolean wsynchro; 128 // Listener list synchro. 129 /////////////////// 130 private Boolean listenersynchro; 131 132 133 // True if calling listeners. 134 ///////////////////////////// 135 private boolean callingListeners; 136 137 // The thread feeding the listeners. 138 //////////////////////////////////// 139 private Listener listener; 140 141 // Basic listeners. 142 /////////////////// 143 protected Vector basicListeners; 144 145 // Advanced listeners. 146 ////////////////////// 147 protected Vector advancedListeners; 148 149 // The daemon's address. 150 //////////////////////// 151 private InetAddress address; 152 153 // The daemon's port. 154 ///////////////////// 155 private int port; 156 157 // Is this a priority connection? 158 ///////////////////////////////// 159 private boolean priority; 160 161 // Getting group membership messages? 162 ///////////////////////////////////// 163 private boolean groupMembership; 164 165 // Name of active choosen Authentication method 166 /////////////////////////////////////////////// 167 private String authName; 168 169 // Name of class for active choosen Authentication method 170 ///////////////////////////////////////////////////////// 171 private String authClassName; 172 173 // Object reference to current authentication class 174 /////////////////////////////////////////////////// 175 private Object authObj; 176 177 // Method reference to authenticate method 178 ////////////////////////////////////////// 179 private java.lang.reflect.Method authMethodAuthenticate; 180 181 // The socket this connection is using. 182 /////////////////////////////////////// 183 private Socket socket; 184 185 // The socket's input stream. 186 ///////////////////////////// 187 private InputStream socketInput; 188 189 // The socket's output stream. 190 ////////////////////////////// 191 private OutputStream socketOutput; 192 193 // The private group. 194 ///////////////////// 195 private SpreadGroup group; 196 197 // Commands buffered during listener callbacks. 198 // The buffer is a list of BUFFER_* constants. 199 // For commands with an argument (all except 200 // for BUFFER_DISCONNECT), the argument follows in the Vector. 201 ////////////////////////////////////////////////////////////// 202 private Vector listenerBuffer; 203 204 // Listener buffer commands. 205 // These are Object's because they need to be added to a Vector. 206 //////////////////////////////////////////////////////////////// 207 private static final Object BUFFER_DISCONNECT = new Object(); 208 private static final Object BUFFER_ADD_BASIC = new Object(); 209 private static final Object BUFFER_ADD_ADVANCED = new Object(); 210 private static final Object BUFFER_REMOVE_BASIC = new Object(); 211 private static final Object BUFFER_REMOVE_ADVANCED = new Object(); 212 213 // Checks if the int is the same-endian type as the local machine. 214 ////////////////////////////////////////////////////////////////// sameEndian(int i)215 private static boolean sameEndian(int i) 216 { 217 return ((i & ENDIAN_TYPE) == 0); 218 } 219 220 // Clears the int's endian type. 221 //////////////////////////////// clearEndian(int i)222 private static int clearEndian(int i) 223 { 224 return (i & ~ENDIAN_TYPE); 225 } 226 227 // Endian-flips the int. 228 //////////////////////// flip(int i)229 protected static int flip(int i) 230 { 231 return (((i >> 24) & 0x000000ff) | ((i >> 8) & 0x0000ff00) | ((i << 8) & 0x00ff0000) | ((i << 24) & 0xff000000)); 232 } 233 234 // Endian-flips the short. 235 ////////////////////////// flip(short s)236 private static short flip(short s) 237 { 238 return ((short)(((s >> 8) & 0x00ff) | ((s << 8 ) & 0xff00))); 239 } 240 241 // Puts a group name into an array of bytes. 242 //////////////////////////////////////////// toBytes(SpreadGroup group, byte buffer[], int bufferIndex)243 private static void toBytes(SpreadGroup group, byte buffer[], int bufferIndex) 244 { 245 // Get the group's name. 246 //////////////////////// 247 byte name[]; 248 try 249 { 250 name = group.toString().getBytes("ISO8859_1"); 251 } 252 catch(UnsupportedEncodingException e) 253 { 254 // Already checked for this exception in connect. 255 ///////////////////////////////////////////////// 256 name = new byte[0]; 257 } 258 259 // Put a cap on the length. 260 /////////////////////////// 261 int len = name.length; 262 if(len > MAX_GROUP_NAME) 263 len = MAX_GROUP_NAME; 264 265 // Copy the name into the buffer. 266 ///////////////////////////////// 267 System.arraycopy(name, 0, buffer, bufferIndex, len); 268 for( ; len < MAX_GROUP_NAME ; len++ ) 269 buffer[bufferIndex + len] = 0; 270 } 271 272 // Puts an int into an array of bytes. 273 ////////////////////////////////////// toBytes(int i, byte buffer[], int bufferIndex)274 private static void toBytes(int i, byte buffer[], int bufferIndex) 275 { 276 buffer[bufferIndex++] = (byte)((i >> 24) & 0xFF); 277 buffer[bufferIndex++] = (byte)((i >> 16) & 0xFF); 278 buffer[bufferIndex++] = (byte)((i >> 8 ) & 0xFF); 279 buffer[bufferIndex++] = (byte)((i ) & 0xFF); 280 } 281 282 // Gets an int from an array of bytes. 283 ////////////////////////////////////// toInt(byte buffer[], int bufferIndex)284 protected static int toInt(byte buffer[], int bufferIndex) 285 { 286 int i0 = (buffer[bufferIndex++] & 0xFF); 287 int i1 = (buffer[bufferIndex++] & 0xFF); 288 int i2 = (buffer[bufferIndex++] & 0xFF); 289 int i3 = (buffer[bufferIndex++] & 0xFF); 290 291 return ((i0 << 24) | (i1 << 16) | (i2 << 8) | (i3)); 292 } 293 294 // Reads from inputsocket until all bytes read or exception raised 295 ////////////////////////////////////////////////////////////////// readBytesFromSocket(byte buffer[], String bufferTypeString)296 private void readBytesFromSocket(byte buffer[], String bufferTypeString) throws SpreadException 297 { 298 int byteIndex; 299 int rcode; 300 try 301 { 302 for(byteIndex = 0 ; byteIndex < buffer.length ; byteIndex += rcode) 303 { 304 rcode = socketInput.read(buffer, byteIndex, buffer.length - byteIndex); 305 if(rcode == -1) 306 { 307 throw new SpreadException("Connection closed while reading " + bufferTypeString); 308 } 309 } 310 } 311 catch(InterruptedIOException e) { 312 throw new SpreadException("readBytesFromSocket(): InterruptedIOException " + e); 313 } 314 catch(IOException e) 315 { 316 throw new SpreadException("readBytesFromSocket(): read() " + e); 317 } 318 319 } 320 321 322 // Gets a string from an array of bytes. 323 //////////////////////////////////////// toGroup(byte buffer[], int bufferIndex)324 protected SpreadGroup toGroup(byte buffer[], int bufferIndex) 325 { 326 try 327 { 328 for(int end = bufferIndex ; end < buffer.length ; end++) 329 { 330 if(buffer[end] == 0) 331 { 332 // Get the group name. 333 ////////////////////// 334 String name = new String(buffer, bufferIndex, end - bufferIndex, "ISO8859_1"); 335 336 // Return the group. 337 //////////////////// 338 return new SpreadGroup(this, name); 339 } 340 } 341 } 342 catch(UnsupportedEncodingException e) 343 { 344 // Already checked for this exception in connect. 345 ///////////////////////////////////////////////// 346 } 347 348 return null; 349 } 350 351 // Set the send and receive buffer sizes. 352 ///////////////////////////////////////// setBufferSizes()353 private void setBufferSizes() throws SpreadException 354 { 355 /* NOT SUPPORTED IN 1.1 356 try 357 { 358 for(int i = 10 ; i <= 200 ; i += 5) 359 { 360 // The size to try. 361 /////////////////// 362 int size = (1024 * i); 363 364 // Set the buffer sizes. 365 //////////////////////// 366 socket.setSendBufferSize(size); 367 socket.setReceiveBufferSize(size); 368 369 // Check the actual sizes. If smaller, then the max was hit. 370 ///////////////////////////////////////////////////////////// 371 if((socket.getSendBufferSize() < size) || (socket.getReceiveBufferSize() < size)) 372 { 373 break; 374 } 375 } 376 } 377 catch(SocketException e) 378 { 379 throw new SpreadException("set/getSend/ReceiveBufferSize(): " + e); 380 } 381 NOT SUPPORTED IN 1.1 */ 382 } 383 384 // Sends the initial connect message. 385 ///////////////////////////////////// sendConnect(String privateName)386 private void sendConnect(String privateName) throws SpreadException 387 { 388 // Check the private name for validity. 389 /////////////////////////////////////// 390 int len = (privateName == null ? 0 : privateName.length()); 391 if(len > MAX_PRIVATE_NAME) 392 { 393 privateName = privateName.substring(0, MAX_PRIVATE_NAME); 394 len = MAX_PRIVATE_NAME; 395 } 396 397 // Allocate the buffer. 398 /////////////////////// 399 byte buffer[] = new byte[len + 5]; 400 401 // Set the version. 402 /////////////////// 403 buffer[0] = (byte)SP_MAJOR_VERSION; 404 buffer[1] = (byte)SP_MINOR_VERSION; 405 buffer[2] = (byte)SP_PATCH_VERSION; 406 407 // Byte used for group membership and priority. 408 /////////////////////////////////////////////// 409 buffer[3] = 0; 410 411 // Group membership. 412 //////////////////// 413 if(groupMembership) 414 { 415 buffer[3] |= 0x01; 416 } 417 418 // Priority. 419 //////////// 420 if(priority) 421 { 422 buffer[3] |= 0x10; 423 } 424 425 // Write the length. 426 //////////////////// 427 buffer[4] = (byte)len; 428 429 if(len > 0) 430 { 431 // Write the private name. 432 ////////////////////////// 433 byte nameBytes[] = privateName.getBytes(); 434 for(int src = 0, dest = 5 ; src < len ; src++, dest++) 435 { 436 buffer[dest] = nameBytes[src]; 437 } 438 } 439 440 // Send the connection message. 441 /////////////////////////////// 442 try 443 { 444 socketOutput.write(buffer); 445 } 446 catch(IOException e) 447 { 448 throw new SpreadException("write(): " + e); 449 } 450 } 451 452 // read the Auth List 453 ///////////////////// readAuthMethods()454 private void readAuthMethods() throws SpreadException 455 { 456 // Read the length. 457 /////////////////// 458 int len; 459 try 460 { 461 len = socketInput.read(); 462 } 463 catch(IOException e) 464 { 465 throw new SpreadException("read(): " + e); 466 } 467 468 // System.out.println("readAuthMethods: len is " + len); 469 // Check for no more data. 470 ////////////////////////// 471 if(len == -1) 472 { 473 throw new SpreadException("Connection closed during connect attempt to read authlen"); 474 } 475 // Check if it was a response code 476 ////////////////////////////////// 477 if( len >= 128 ) 478 { 479 throw new SpreadException("Connection attempt rejected=" + (0xffffff00 | len)); 480 } 481 482 // Read the name. 483 ///////////////// 484 byte buffer[] = new byte[len]; 485 readBytesFromSocket(buffer, "authname"); 486 // System.out.println("readAuthMethods: string is " + new String(buffer)); 487 488 // if(numRead != len) 489 //{ 490 // throw new SpreadException("Connection closed during connect attempt to read authnames"); 491 //} 492 493 // for now we ignore the list. 494 ////////////////////////////// 495 } 496 497 // Sends the choice of auth methods message. 498 ///////////////////////////////////// sendAuthMethod()499 private void sendAuthMethod() throws SpreadException 500 { 501 int len = authName.length(); 502 // Allocate the buffer. 503 /////////////////////// 504 byte buffer[] = new byte[MAX_AUTH_NAME*MAX_AUTH_METHODS]; 505 506 try 507 { 508 System.arraycopy(authName.getBytes("ISO8859_1"), 0, buffer, 0, len); 509 } 510 catch(UnsupportedEncodingException e) 511 { 512 // Already checked for this exception in connect. 513 ///////////////////////////////////////////////// 514 } 515 for( ; len < (MAX_AUTH_NAME*MAX_AUTH_METHODS) ; len++ ) 516 buffer[len] = 0; 517 518 // Send the connection message. 519 /////////////////////////////// 520 try 521 { 522 socketOutput.write(buffer); 523 } 524 catch(IOException e) 525 { 526 throw new SpreadException("write(): " + e); 527 } 528 } 529 530 // 531 ///////////////////////////////////// instantiateAuthMethod()532 private void instantiateAuthMethod() throws SpreadException 533 { 534 Class authclass; 535 536 // System.out.println("Authname is " + authName); 537 // System.out.println("class name is " + authClassName); 538 try { 539 authclass = Class.forName(authClassName); 540 } catch (ClassNotFoundException e) { 541 throw new SpreadException("class " + authClassName + " not found.\n"); 542 } 543 544 try { 545 authObj = authclass.newInstance(); 546 } catch (Exception e) { 547 throw new SpreadException("class " + authClassName + " error getting instance.\n" + e); 548 } 549 try { 550 authMethodAuthenticate = authclass.getMethod("authenticate", new Class[] { }); 551 } catch (NoSuchMethodException e) { 552 System.out.println("Failed to find auth method authenticate()"); 553 System.exit(1); 554 } catch (SecurityException e) { 555 System.out.println("security exception for method authenticate()"); 556 System.exit(1); 557 } 558 559 560 } 561 // Checks for an accept message. 562 //////////////////////////////// checkAccept()563 private void checkAccept() throws SpreadException 564 { 565 // Read the connection response. 566 //////////////////////////////// 567 int accepted; 568 try 569 { 570 accepted = socketInput.read(); 571 } 572 catch(IOException e) 573 { 574 throw new SpreadException("read(): " + e); 575 } 576 577 // Check for no more data. 578 ////////////////////////// 579 if(accepted == -1) 580 { 581 throw new SpreadException("Connection closed during connect attempt"); 582 } 583 584 // Was it accepted? 585 /////////////////// 586 if(accepted != ACCEPT_SESSION) 587 { 588 throw new SpreadException("Connection attempt rejected=" + (0xffffff00 | accepted)); 589 } 590 } 591 592 // Checks the daemon version. 593 ///////////////////////////// checkVersion()594 private void checkVersion() throws SpreadException 595 { 596 // Read the version. 597 //////////////////// 598 int majorVersion; 599 try 600 { 601 majorVersion = socketInput.read(); 602 } 603 catch(IOException e) 604 { 605 throw new SpreadException("read(): " + e); 606 } 607 608 // Read the sub-version. 609 //////////////////////// 610 int minorVersion; 611 try 612 { 613 minorVersion = socketInput.read(); 614 } 615 catch(IOException e) 616 { 617 throw new SpreadException("read(): " + e); 618 } 619 620 // Read the patch-version. 621 //////////////////////// 622 int patchVersion; 623 try 624 { 625 patchVersion = socketInput.read(); 626 } 627 catch(IOException e) 628 { 629 throw new SpreadException("read(): " + e); 630 } 631 632 // Check for no more data. 633 ////////////////////////// 634 if((majorVersion == -1) || (minorVersion == -1) || (patchVersion == -1) ) 635 { 636 throw new SpreadException("Connection closed during connect attempt"); 637 } 638 639 // Check the version. 640 ///////////////////// 641 int version = ( (majorVersion*10000) + (minorVersion*100) + patchVersion ); 642 if(version < 30100) 643 { 644 throw new SpreadException("Old version " + majorVersion + "." + minorVersion + "." + patchVersion + " not supported"); 645 } 646 if((version < 30800) && (priority)) 647 { 648 throw new SpreadException("Old version " + majorVersion + "." + minorVersion + "." + patchVersion + " does not support priority"); 649 } 650 } 651 652 // Get the private group name. 653 ////////////////////////////// readGroup()654 private void readGroup() throws SpreadException 655 { 656 // Read the length. 657 /////////////////// 658 int len; 659 try 660 { 661 len = socketInput.read(); 662 } 663 catch(IOException e) 664 { 665 throw new SpreadException("read(): " + e); 666 } 667 668 // Check for no more data. 669 ////////////////////////// 670 if(len == -1) 671 { 672 throw new SpreadException("Connection closed during connect attempt"); 673 } 674 675 // Read the name. 676 ///////////////// 677 byte buffer[] = new byte[len]; 678 readBytesFromSocket(buffer, "group name"); 679 680 // Store the group. 681 /////////////////// 682 group = new SpreadGroup(this, new String(buffer)); 683 } 684 685 // Constructor. 686 /////////////// 687 /** 688 * Initializes a new SpreadConnection object. To connect to a daemon with this 689 * object, use {@link SpreadConnection#connect(InetAddress, int, String, boolean, boolean)}. 690 * 691 * @see SpreadConnection#connect(InetAddress, int, String, boolean, boolean) 692 */ SpreadConnection()693 public SpreadConnection() 694 { 695 // We're not connected. 696 /////////////////////// 697 connected = false; 698 699 // Init synchros. 700 ///////////////// 701 rsynchro = new Boolean(false); 702 wsynchro = new Boolean(false); 703 listenersynchro = new Boolean(false); 704 // Init listeners. 705 ////////////////// 706 basicListeners = new Vector(); 707 advancedListeners = new Vector(); 708 709 // Init listener command buffer. 710 //////////////////////////////// 711 listenerBuffer = new Vector(); 712 713 // Init default authentication 714 ////////////////////////////// 715 authName = DEFAULT_AUTH_NAME; 716 authClassName = DEFAULT_AUTHCLASS_NAME; 717 } 718 719 /** 720 * Sets the authentication name and class string for the client side authentication method. 721 * An authentication method can only be registered before connect is called. 722 * The authentication method registered will then be used whenever 723 * {@link SpreadConnection#connect(InetAddress, int, String, boolean, boolean)} is called. 724 * 725 * @param authName the short official "name" of the method begin registered. 726 * @param authClassName the complete class name for the method (including package) 727 * @throws SpreadException if the connection is already established 728 */ registerAuthentication( String authName, String authClassName )729 synchronized public void registerAuthentication( String authName, String authClassName ) throws SpreadException 730 { 731 // Check if we're connected. 732 //////////////////////////// 733 if(connected == true) 734 { 735 throw new SpreadException("Already connected."); 736 } 737 738 this.authClassName = authClassName; 739 740 try 741 { 742 this.authName = authName.substring(0, MAX_AUTH_NAME); 743 } 744 catch(IndexOutOfBoundsException e) 745 { 746 // Nothing to shorten. 747 ////////////////////// 748 this.authName = authName; 749 } 750 } 751 752 // Establishes a connection with the spread daemon. 753 /////////////////////////////////////////////////// 754 /** 755 * Establishes a connection to a spread daemon. Groups can be joined, and messages can be 756 * sent or received once the connection has been established. 757 * 758 * @param address the daemon's address, or null to connect to the localhost 759 * @param port the daemon's port, or 0 for the default port (4803) 760 * @param privateName the private name to use for this connection 761 * @param priority if true, this is a priority connection 762 * @param groupMembership if true, membership messages will be received on this connection 763 * @throws SpreadException if the connection cannot be established 764 * @see SpreadConnection#disconnect() 765 */ connect(InetAddress address, int port, String privateName, boolean priority, boolean groupMembership)766 synchronized public void connect(InetAddress address, int port, String privateName, boolean priority, boolean groupMembership) throws SpreadException 767 { 768 // Check if we're connected. 769 //////////////////////////// 770 if(connected == true) 771 { 772 throw new SpreadException("Already connected."); 773 } 774 775 // Is ISO8859_1 encoding supported? 776 /////////////////////////////// 777 try 778 { 779 new String("ASCII/ISO8859_1 encoding test").getBytes("ISO8859_1"); 780 } 781 catch(UnsupportedEncodingException e) 782 { 783 throw new SpreadException("ISO8859_1 encoding is not supported."); 784 } 785 786 // Store member variables. 787 ////////////////////////// 788 this.address = address; 789 this.port = port; 790 this.priority = priority; 791 this.groupMembership = groupMembership; 792 793 // Check if no address was specified. 794 ///////////////////////////////////// 795 if(address == null) 796 { 797 // Use the local host. 798 ////////////////////// 799 try 800 { 801 address = InetAddress.getLocalHost(); 802 } 803 catch(UnknownHostException e) 804 { 805 throw new SpreadException("Error getting local host"); 806 } 807 } 808 809 // Check if no port was specified. 810 ////////////////////////////////// 811 if(port == 0) 812 { 813 // Use the default port. 814 //////////////////////// 815 port = DEFAULT_SPREAD_PORT; 816 } 817 818 // Check if the port is out of range. 819 ///////////////////////////////////// 820 if((port < 0) || (port > (32 * 1024))) 821 { 822 throw new SpreadException("Bad port (" + port + ")."); 823 } 824 825 // Create the socket. 826 ///////////////////// 827 try 828 { 829 socket = new Socket(address, port); 830 } 831 catch(IOException e) 832 { 833 throw new SpreadException("Socket(): " + e); 834 } 835 836 // Set the socket's buffer sizes. 837 ///////////////////////////////// 838 setBufferSizes(); 839 840 // Get the socket's streams. 841 //////////////////////////// 842 try 843 { 844 socketInput = socket.getInputStream(); 845 socketOutput = socket.getOutputStream(); 846 } 847 catch(IOException e) 848 { 849 throw new SpreadException("getInput/OutputStream(): " + e); 850 } 851 852 // Send the connect message. 853 //////////////////////////// 854 sendConnect(privateName); 855 856 // Recv the authentication method list 857 ////////////////////////////////////// 858 readAuthMethods(); 859 860 // Send auth method choice 861 ////////////////////////// 862 sendAuthMethod(); 863 864 // turn string name of auth method into class and instance 865 ////////////////////////////////////////////////////////// 866 try { 867 instantiateAuthMethod(); 868 } catch (SpreadException e) { 869 System.out.println("Failed to create authMethod instance" + e.toString() ); 870 System.exit(1); 871 } 872 // Call authenticate module. This will only return when connection is authenticated 873 //////////////////////////// 874 try { 875 authMethodAuthenticate.invoke( authObj, new Object[] { }); 876 } catch (IllegalAccessException e) { 877 System.out.println("error calling authenticate" + e.toString() ); 878 System.exit(1); 879 } catch (IllegalArgumentException e) { 880 System.out.println("error calling authenticate" + e.toString() ); 881 System.exit(1); 882 } catch (java.lang.reflect.InvocationTargetException e) { 883 Throwable thr = e.getTargetException(); 884 if ( thr.getClass().equals(SpreadException.class) ) 885 { 886 throw new SpreadException("Connection Rejected: Authentication failed"); 887 } 888 } 889 // Check for acceptance. 890 //////////////////////// 891 checkAccept(); 892 893 // Check the version. 894 ///////////////////// 895 checkVersion(); 896 897 // Get the private group name. 898 ////////////////////////////// 899 readGroup(); 900 901 // Connection complete. 902 /////////////////////// 903 connected = true; 904 905 // Are there any listeners. 906 /////////////////////////// 907 if((basicListeners.size() != 0) || (advancedListeners.size() != 0)) 908 { 909 // Start the listener thread. 910 ///////////////////////////// 911 startListener(); 912 } 913 } 914 915 // Disconnects from the spread daemon. 916 ////////////////////////////////////// 917 /** 918 * Disconnects the connection to the daemon. Nothing else should be done with this connection 919 * after disconnecting it. 920 * 921 * @throws SpreadException if there is no connection or there is an error disconnecting 922 * @see SpreadConnection#connect(InetAddress, int, String, boolean, boolean) 923 */ disconnect()924 synchronized public void disconnect() throws SpreadException 925 { 926 // Check if we're connected. 927 //////////////////////////// 928 if(connected == false) 929 { 930 throw new SpreadException("Not connected."); 931 } 932 933 // Are we in a listener callback? 934 ///////////////////////////////// 935 if(callingListeners) 936 { 937 // Add it to the command buffer. 938 //////////////////////////////// 939 listenerBuffer.addElement(BUFFER_DISCONNECT); 940 941 // Don't need to do anything else. 942 ////////////////////////////////// 943 return; 944 } 945 946 // Get a new message. 947 ///////////////////// 948 SpreadMessage killMessage = new SpreadMessage(); 949 950 // Send it to our private group. 951 //////////////////////////////// 952 killMessage.addGroup(group); 953 954 // Set the service type. 955 //////////////////////// 956 killMessage.setServiceType(SpreadMessage.KILL_MESS); 957 958 // Send the message. 959 //////////////////// 960 multicast(killMessage); 961 962 // Check for a listener thread. 963 /////////////////////////////// 964 if(listener != null) 965 { 966 // Stop it. 967 /////////// 968 stopListener(); 969 } 970 971 // Close the socket. 972 //////////////////// 973 try 974 { 975 socket.close(); 976 } 977 catch(IOException e) 978 { 979 throw new SpreadException("close(): " + e); 980 } 981 982 connected = false; 983 } 984 985 // Gets the user's private group. 986 ///////////////////////////////// 987 /** 988 * Gets the private group for this connection. 989 * 990 * @return the SpreadGroup representing this connection's private group, or null if there is no connection 991 */ getPrivateGroup()992 public SpreadGroup getPrivateGroup() 993 { 994 // Check if we're connected. 995 //////////////////////////// 996 if(connected == false) 997 { 998 return null; 999 } 1000 1001 return group; 1002 } 1003 1004 // Receives a new message. 1005 ////////////////////////// 1006 /** 1007 * Receives the next message waiting on this connection. If there are no messages 1008 * waiting, the call will block until a message is ready to be received. 1009 * 1010 * @return the message that has just been received 1011 * @throws SpreadException if there is no connection or there is any error reading a new message 1012 */ receive()1013 public SpreadMessage receive() throws SpreadException, InterruptedIOException 1014 { 1015 1016 synchronized(rsynchro) { 1017 // Check if there are any listeners. 1018 //////////////////////////////////// 1019 if((basicListeners.isEmpty() == false) || (advancedListeners.isEmpty() == false)) 1020 { 1021 // Get out of here. 1022 /////////////////// 1023 throw new SpreadException("Tried to receive while there are listeners"); 1024 } 1025 1026 return internal_receive(); 1027 } 1028 } 1029 1030 // Actually receives a new message 1031 /////////////////////////////////// internal_receive()1032 private SpreadMessage internal_receive() throws SpreadException, InterruptedIOException 1033 { 1034 // Check if we're connected. 1035 //////////////////////////// 1036 if(connected == false) 1037 { 1038 throw new SpreadException("Not connected."); 1039 } 1040 1041 // Read the header. 1042 /////////////////// 1043 byte header[] = new byte[MAX_GROUP_NAME+16]; 1044 int headerIndex; 1045 int rcode; 1046 try 1047 { 1048 for(headerIndex = 0 ; headerIndex < header.length ; headerIndex += rcode) 1049 { 1050 rcode = socketInput.read(header, headerIndex, header.length - headerIndex); 1051 if(rcode == -1) 1052 { 1053 throw new SpreadException("Connection closed while reading header"); 1054 } 1055 } 1056 } 1057 catch(InterruptedIOException e) { 1058 throw e; 1059 } 1060 catch(IOException e) 1061 { 1062 throw new SpreadException("read(): " + e); 1063 } 1064 1065 // Reset header index. 1066 ////////////////////// 1067 headerIndex = 0; 1068 1069 // Get service type. 1070 //////////////////// 1071 int serviceType = toInt(header, headerIndex); 1072 headerIndex += 4; 1073 1074 // Get the sender. 1075 ////////////////// 1076 SpreadGroup sender = toGroup(header, headerIndex); 1077 headerIndex += MAX_GROUP_NAME; 1078 1079 // Get the number of groups. 1080 //////////////////////////// 1081 int numGroups = toInt(header, headerIndex); 1082 headerIndex += 4; 1083 1084 // Get the hint/type. 1085 ///////////////////// 1086 int hint = toInt(header, headerIndex); 1087 headerIndex += 4; 1088 1089 // Get the data length. 1090 /////////////////////// 1091 int dataLen = toInt(header, headerIndex); 1092 headerIndex += 4; 1093 1094 // Does the header need to be flipped? 1095 // (Checking for a daemon server endian-mismatch) 1096 ///////////////////////////////////////////////// 1097 boolean daemonEndianMismatch; 1098 if(sameEndian(serviceType) == false) 1099 { 1100 // Flip them. 1101 ///////////// 1102 serviceType = flip(serviceType); 1103 numGroups = flip(numGroups); 1104 dataLen = flip(dataLen); 1105 1106 // The daemon endian-mismatch. 1107 ////////////////////////////// 1108 daemonEndianMismatch = true; 1109 } 1110 else 1111 { 1112 // The daemon endian-mismatch. 1113 ////////////////////////////// 1114 daemonEndianMismatch = false; 1115 } 1116 1117 // Validate numGroups and dataLen 1118 1119 if ( (numGroups < 0) || (dataLen < 0) ) 1120 { 1121 // drop message 1122 throw new SpreadException("Illegal Message: Message Dropped"); 1123 } 1124 1125 // An endian mismatch. 1126 ////////////////////// 1127 boolean endianMismatch; 1128 1129 // The type. 1130 //////////// 1131 short type; 1132 1133 // Is this a regular message? 1134 ///////////////////////////// 1135 if( SpreadMessage.isRegular(serviceType) || SpreadMessage.isReject(serviceType) ) 1136 { 1137 // Does the hint need to be flipped? 1138 // (Checking for a sender endian-mismatch) 1139 ////////////////////////////////////////// 1140 if(sameEndian(hint) == false) 1141 { 1142 hint = flip(hint); 1143 endianMismatch = true; 1144 } 1145 else 1146 { 1147 endianMismatch = false; 1148 } 1149 1150 // Get the type from the hint. 1151 ////////////////////////////// 1152 hint = clearEndian(hint); 1153 hint >>= 8; 1154 hint &= 0x0000FFFF; 1155 type = (short)hint; 1156 } 1157 else 1158 { 1159 // This is not a regular message. 1160 ///////////////////////////////// 1161 type = -1; 1162 endianMismatch = false; 1163 } 1164 1165 if( SpreadMessage.isReject(serviceType) ) 1166 { 1167 // Read in the old type and or with reject type field. 1168 byte oldtypeBuffer[] = new byte[4]; 1169 try 1170 { 1171 for(int oldtypeIndex = 0 ; oldtypeIndex < oldtypeBuffer.length ; ) 1172 { 1173 rcode = socketInput.read(oldtypeBuffer, oldtypeIndex, oldtypeBuffer.length - oldtypeIndex); 1174 if(rcode == -1) 1175 { 1176 throw new SpreadException("Connection closed while reading groups"); 1177 } 1178 oldtypeIndex += rcode; 1179 } 1180 } 1181 catch(InterruptedIOException e) { 1182 throw e; 1183 } 1184 catch(IOException e) 1185 { 1186 throw new SpreadException("read(): " + e); 1187 } 1188 int oldType = toInt(oldtypeBuffer, 0); 1189 if ( sameEndian(serviceType) == false ) 1190 oldType = flip(oldType); 1191 1192 serviceType = (SpreadMessage.REJECT_MESS | oldType); 1193 } 1194 1195 // Read in the group names. 1196 /////////////////////////// 1197 byte buffer[] = new byte[numGroups * MAX_GROUP_NAME]; 1198 try 1199 { 1200 for(int bufferIndex = 0 ; bufferIndex < buffer.length ; ) 1201 { 1202 rcode = socketInput.read(buffer, bufferIndex, buffer.length - bufferIndex); 1203 if(rcode == -1) 1204 { 1205 throw new SpreadException("Connection closed while reading groups"); 1206 } 1207 bufferIndex += rcode; 1208 } 1209 } 1210 catch(InterruptedIOException e) { 1211 throw e; 1212 } 1213 catch(IOException e) 1214 { 1215 throw new SpreadException("read(): " + e); 1216 } 1217 1218 // Clear the endian type. 1219 ///////////////////////// 1220 serviceType = clearEndian(serviceType); 1221 1222 // Get the groups from the buffer. 1223 ////////////////////////////////// 1224 Vector groups = new Vector(numGroups); 1225 for(int bufferIndex = 0 ; bufferIndex < buffer.length ; bufferIndex += MAX_GROUP_NAME) 1226 { 1227 // Translate the name into a group and add it to the vector. 1228 //////////////////////////////////////////////////////////// 1229 groups.addElement(toGroup(buffer, bufferIndex)); 1230 } 1231 1232 // Read in the data. 1233 //////////////////// 1234 byte data[] = new byte[dataLen]; 1235 try 1236 { 1237 for(int dataIndex = 0 ; dataIndex < dataLen ; ) 1238 { 1239 rcode = socketInput.read(data, dataIndex, dataLen - dataIndex); 1240 if(rcode == -1) 1241 { 1242 throw new SpreadException("Connection close while reading data"); 1243 } 1244 dataIndex += rcode; 1245 } 1246 } 1247 catch(InterruptedIOException e) { 1248 throw e; 1249 } 1250 catch(IOException e) 1251 { 1252 throw new SpreadException("read():" + e); 1253 } 1254 1255 // Is it a membership message? 1256 ////////////////////////////// 1257 MembershipInfo membershipInfo; 1258 if(SpreadMessage.isMembership(serviceType)) 1259 { 1260 // Create a membership info object. 1261 /////////////////////////////////// 1262 membershipInfo = new MembershipInfo(this, serviceType, groups, sender, data, daemonEndianMismatch); 1263 1264 // Is it a regular membership message? 1265 ////////////////////////////////////// 1266 if(membershipInfo.isRegularMembership()) 1267 { 1268 // Find which of these groups is the local connection. 1269 ////////////////////////////////////////////////////// 1270 type = (short)groups.indexOf(group); 1271 } 1272 } 1273 else 1274 { 1275 // There's no membership info. 1276 ////////////////////////////// 1277 membershipInfo = null; 1278 } 1279 1280 // Create the message. 1281 ////////////////////// 1282 return new SpreadMessage(serviceType, groups, sender, data, type, endianMismatch, membershipInfo); 1283 } 1284 1285 // Receives numMessages new messages. 1286 ///////////////////////////////////// 1287 /** 1288 * Receives <code>numMessages</code> messages on the connection and returns them in an array. 1289 * If there are not <code>numMessages</code> messages waiting, the call will block until there are 1290 * enough messages available. 1291 * 1292 * @param numMessages the number of messages to receive 1293 * @return an array of messages 1294 * @throws SpreadException if there is no connection or if there is any error reading the messages 1295 */ receive(int numMessages)1296 public SpreadMessage[] receive(int numMessages) throws SpreadException, InterruptedIOException 1297 { 1298 // Allocate the messages array. 1299 /////////////////////////////// 1300 SpreadMessage[] messages = new SpreadMessage[numMessages]; 1301 synchronized(rsynchro) { 1302 // Check if there are any listeners. 1303 //////////////////////////////////// 1304 if ((basicListeners.isEmpty() == false) || (advancedListeners.isEmpty() == false)) 1305 { 1306 // Get out of here. 1307 /////////////////// 1308 throw new SpreadException("Tried to receive while there are listeners"); 1309 } 1310 1311 // Receive the messages. 1312 //////////////////////// 1313 for(int i = 0 ; i < numMessages ; i++) 1314 { 1315 messages[i] = internal_receive(); 1316 } 1317 1318 } 1319 // Return the array. 1320 //////////////////// 1321 return messages; 1322 } 1323 1324 // Returns true if there are messages waiting. 1325 ////////////////////////////////////////////// 1326 /** 1327 * Returns true if there are any messages waiting on this connection. 1328 * 1329 * @return true if there is at least one message that can be received 1330 * @throws SpreadException if there is no connection or if there is an error checking for messages 1331 */ poll()1332 public boolean poll() throws SpreadException 1333 { 1334 // Check if we're connected. 1335 //////////////////////////// 1336 if(connected == false) 1337 { 1338 throw new SpreadException("Not connected."); 1339 } 1340 1341 // Check if there is anything waiting. 1342 ////////////////////////////////////// 1343 try 1344 { 1345 if(socketInput.available() == 0) 1346 { 1347 // There's nothing to read. 1348 /////////////////////////// 1349 return false; 1350 } 1351 } 1352 catch(IOException e) 1353 { 1354 throw new SpreadException("available(): " + e); 1355 } 1356 1357 // There's something to read. 1358 ///////////////////////////// 1359 return true; 1360 } 1361 1362 // Private function for starting a listener thread. 1363 /////////////////////////////////////////////////// startListener()1364 private void startListener() 1365 { 1366 // Get a new thread. 1367 //////////////////// 1368 listener = new Listener(this); 1369 1370 // Start it. 1371 //////////// 1372 listener.start(); 1373 } 1374 1375 // Adds a new basic message listener. 1376 ///////////////////////////////////// 1377 /** 1378 * Adds the BasicMessageListener to this connection. If there are no other listeners, this call will 1379 * start a thread to listen for new messages. From the time this function is called until 1380 * this listener is removed, {@link BasicMessageListener#messageReceived(SpreadMessage)} will 1381 * be called every time a message is received. 1382 * 1383 * @param listener a BasicMessageListener to add to this connection 1384 * @see SpreadConnection#remove(BasicMessageListener) 1385 */ add(BasicMessageListener listener)1386 public void add(BasicMessageListener listener) 1387 { 1388 synchronized(listenersynchro) { 1389 // Are we in a listener callback? 1390 ///////////////////////////////// 1391 if(callingListeners) 1392 { 1393 // Add it to the command buffer. 1394 //////////////////////////////// 1395 listenerBuffer.addElement(BUFFER_ADD_BASIC); 1396 listenerBuffer.addElement(listener); 1397 1398 // Don't need to do anything else. 1399 ////////////////////////////////// 1400 return; 1401 } 1402 // Add the listener. 1403 //////////////////// 1404 basicListeners.addElement(listener); 1405 1406 // Check if we're connected. 1407 //////////////////////////// 1408 if(connected == true) 1409 { 1410 // Check if the thread is running. 1411 ////////////////////////////////// 1412 if(this.listener == null) 1413 { 1414 // Start the thread. 1415 //////////////////// 1416 startListener(); 1417 } 1418 } 1419 } 1420 } 1421 1422 // Adds a new advanced message listener. 1423 //////////////////////////////////////// 1424 /** 1425 * Adds the AdvancedMessageListener to this connection. If there are no other listeners, this call will 1426 * start a thread to listen for new messages. From the time this function is called until 1427 * this listener is removed, {@link AdvancedMessageListener#regularMessageReceived(SpreadMessage)} will 1428 * be called every time a regular message is received, and 1429 * {@link AdvancedMessageListener#membershipMessageReceived(SpreadMessage)} will be called every time 1430 * a membership message is received. 1431 * 1432 * @param listener an AdvancedMessageListener to add to this connection 1433 * @see SpreadConnection#remove(AdvancedMessageListener) 1434 */ add(AdvancedMessageListener listener)1435 public void add(AdvancedMessageListener listener) 1436 { 1437 synchronized (listenersynchro) { 1438 // Are we in a listener callback? 1439 ///////////////////////////////// 1440 if(callingListeners) 1441 { 1442 // Add it to the command buffer. 1443 //////////////////////////////// 1444 listenerBuffer.addElement(BUFFER_ADD_ADVANCED); 1445 listenerBuffer.addElement(listener); 1446 1447 // Don't need to do anything else. 1448 ////////////////////////////////// 1449 return; 1450 } 1451 1452 // Add the listener. 1453 //////////////////// 1454 advancedListeners.addElement(listener); 1455 1456 // Check if we're connected. 1457 //////////////////////////// 1458 if(connected == true) 1459 { 1460 // Check if the thread is running. 1461 ////////////////////////////////// 1462 if(this.listener == null) 1463 { 1464 // Start the thread. 1465 //////////////////// 1466 startListener(); 1467 } 1468 } 1469 } 1470 } 1471 1472 // Stops the listener thread. 1473 ///////////////////////////// stopListener()1474 private void stopListener() 1475 { 1476 // Set the signal. 1477 ////////////////// 1478 listener.signal = true; 1479 1480 // Wait for the thread to die, to avoid inconsistencies. 1481 //////////////////////////////////////////////////////// 1482 listener.join(); 1483 1484 // Clear the variable. 1485 ////////////////////// 1486 listener = null; 1487 } 1488 1489 // Removes a basic message listener. 1490 //////////////////////////////////// 1491 /** 1492 * Removes the BasicMessageListener from this connection. If this is the only listener on this 1493 * connection, the listener thread will be stopped. 1494 * 1495 * @param listener the listener to remove 1496 * @see SpreadConnection#add(BasicMessageListener) 1497 */ remove(BasicMessageListener listener)1498 public void remove(BasicMessageListener listener) 1499 { 1500 synchronized (listenersynchro) { 1501 // Are we in a listener callback? 1502 ///////////////////////////////// 1503 if(callingListeners) 1504 { 1505 // Add it to the command buffer. 1506 //////////////////////////////// 1507 listenerBuffer.addElement(BUFFER_REMOVE_BASIC); 1508 listenerBuffer.addElement(listener); 1509 1510 // Don't need to do anything else. 1511 ////////////////////////////////// 1512 return; 1513 } 1514 1515 // Remove the listener. 1516 /////////////////////// 1517 basicListeners.removeElement(listener); 1518 1519 // Check if we're connected. 1520 //////////////////////////// 1521 if(connected == true) 1522 { 1523 // Check if there are any more listeners. 1524 ///////////////////////////////////////// 1525 if((basicListeners.size() == 0) && (advancedListeners.size() == 0)) 1526 { 1527 // Stop the listener thread. 1528 //////////////////////////// 1529 stopListener(); 1530 } 1531 } 1532 } 1533 } 1534 1535 // Removes an advanced message listener. 1536 //////////////////////////////////////// 1537 /** 1538 * Removes the AdvancedMessageListener from this connection. If this is the only listener on this 1539 * connection, the listener thread will be stopped. 1540 * 1541 * @param listener the listener to remove 1542 * @see SpreadConnection#add(AdvancedMessageListener) 1543 */ remove(AdvancedMessageListener listener)1544 public void remove(AdvancedMessageListener listener) 1545 { 1546 synchronized (listenersynchro) { 1547 // Are we in a listener callback? 1548 ///////////////////////////////// 1549 if(callingListeners) 1550 { 1551 // Add it to the command buffer. 1552 //////////////////////////////// 1553 listenerBuffer.addElement(BUFFER_REMOVE_ADVANCED); 1554 listenerBuffer.addElement(listener); 1555 1556 // Don't need to do anything else. 1557 ////////////////////////////////// 1558 return; 1559 } 1560 1561 // Remove the listener. 1562 /////////////////////// 1563 advancedListeners.removeElement(listener); 1564 1565 // Check if we're connected. 1566 //////////////////////////// 1567 if(connected == true) 1568 { 1569 // Check if there are any more listeners. 1570 ///////////////////////////////////////// 1571 if((basicListeners.size() == 0) && (advancedListeners.size() == 0)) 1572 { 1573 // Stop the listener thread. 1574 //////////////////////////// 1575 stopListener(); 1576 } 1577 } 1578 } 1579 } 1580 1581 // This is the thread used to handle listener interfaces. 1582 ///////////////////////////////////////////////////////// 1583 private class Listener extends Thread 1584 { 1585 // The connection this thread is listening to. 1586 ////////////////////////////////////////////// 1587 private SpreadConnection connection; 1588 1589 // If true, the connection wants the thread to stop. 1590 //////////////////////////////////////////////////// 1591 protected boolean signal; 1592 1593 // The constructor. 1594 /////////////////// Listener(SpreadConnection connection)1595 public Listener(SpreadConnection connection) 1596 { 1597 // Store local variables. 1598 ///////////////////////// 1599 this.connection = connection; 1600 this.signal = false; 1601 1602 // Be a daemon. 1603 /////////////// 1604 this.setDaemon(true); 1605 } 1606 1607 // The thread's entry point. 1608 //////////////////////////// run()1609 public void run() 1610 { 1611 // An incoming message. 1612 /////////////////////// 1613 SpreadMessage message; 1614 1615 // A basic listener. 1616 //////////////////// 1617 BasicMessageListener basicListener; 1618 1619 // An advanced listener. 1620 //////////////////////// 1621 AdvancedMessageListener advancedListener; 1622 1623 // A buffered command. 1624 ////////////////////// 1625 Object command; 1626 1627 int previous_socket_timeout = 100; 1628 1629 try 1630 { 1631 try { 1632 previous_socket_timeout = connection.socket.getSoTimeout(); 1633 connection.socket.setSoTimeout(100); 1634 } 1635 catch( SocketException e ) { 1636 // just ignore for now 1637 System.out.println("socket error setting timeout" + e.toString() ); 1638 } 1639 while(true) 1640 { 1641 // Get a lock on the connection. 1642 //////////////////////////////// 1643 synchronized(connection) 1644 { 1645 // Do they want us to stop? 1646 /////////////////////////// 1647 if(signal == true) 1648 { 1649 // We're done. 1650 ////////////// 1651 System.out.println("LISTENER: told to exit so returning"); 1652 try { 1653 connection.socket.setSoTimeout(previous_socket_timeout); 1654 } 1655 catch( SocketException e ) { 1656 // just ignore for now 1657 System.out.println("socket error setting timeout" + e.toString() ); 1658 } 1659 1660 return; 1661 } 1662 1663 // Get a message. 1664 // WE WILL BLOCK HERE UNTIL DATA IS AVAILABLE 1665 // or 100 MS expires 1666 ///////////////// 1667 try { 1668 synchronized(rsynchro) { 1669 message = connection.internal_receive(); 1670 } 1671 // Calling listeners. 1672 ///////////////////// 1673 callingListeners = true; 1674 1675 // Tell all the basic listeners. 1676 //////////////////////////////// 1677 for(int i = 0 ; i < basicListeners.size() ; i++) 1678 { 1679 // Get the listener. 1680 //////////////////// 1681 basicListener = (BasicMessageListener)basicListeners.elementAt(i); 1682 1683 // Tell it. 1684 /////////// 1685 basicListener.messageReceived(message); 1686 } 1687 1688 // Tell all the advanced listeners. 1689 /////////////////////////////////// 1690 for(int i = 0 ; i < advancedListeners.size() ; i++) 1691 { 1692 // Get the listener. 1693 //////////////////// 1694 advancedListener = (AdvancedMessageListener)advancedListeners.elementAt(i); 1695 1696 // What type of message is it? 1697 ////////////////////////////// 1698 if(message.isRegular()) 1699 { 1700 // Tell it. 1701 /////////// 1702 advancedListener.regularMessageReceived(message); 1703 } 1704 else 1705 { 1706 // Tell it. 1707 /////////// 1708 advancedListener.membershipMessageReceived(message); 1709 } 1710 } 1711 1712 // Done calling listeners. 1713 ////////////////////////// 1714 callingListeners = false; 1715 1716 1717 } catch( InterruptedIOException e) { 1718 /// Ignore 1719 } 1720 // Execute buffered commands. 1721 ///////////////////////////// 1722 while(listenerBuffer.isEmpty() == false) 1723 { 1724 // Get the first command. 1725 ///////////////////////// 1726 command = listenerBuffer.firstElement(); 1727 1728 // Remove it from the list. 1729 /////////////////////////// 1730 listenerBuffer.removeElementAt(0); 1731 1732 // Check what type of command it is. 1733 //////////////////////////////////// 1734 if(command == BUFFER_DISCONNECT) 1735 { 1736 // Disconnect. 1737 ////////////// 1738 connection.disconnect(); 1739 1740 // Don't execute any more commands. 1741 /////////////////////////////////// 1742 listenerBuffer.removeAllElements(); 1743 } 1744 else if(command == BUFFER_ADD_BASIC) 1745 { 1746 // Get the listener. 1747 //////////////////// 1748 basicListener = (BasicMessageListener)listenerBuffer.firstElement(); 1749 1750 // Add it. 1751 ////////// 1752 connection.add(basicListener); 1753 // Remove the listener from the Vector. 1754 /////////////////////////////////////// 1755 listenerBuffer.removeElementAt(0); 1756 1757 } 1758 else if(command == BUFFER_ADD_ADVANCED) 1759 { 1760 // Get the listener. 1761 //////////////////// 1762 advancedListener = (AdvancedMessageListener)listenerBuffer.firstElement(); 1763 1764 // Add it. 1765 ////////// 1766 connection.add(advancedListener); 1767 // Remove the listener from the Vector. 1768 /////////////////////////////////////// 1769 listenerBuffer.removeElementAt(0); 1770 1771 } 1772 else if(command == BUFFER_REMOVE_BASIC) 1773 { 1774 // Get the listener. 1775 //////////////////// 1776 basicListener = (BasicMessageListener)listenerBuffer.firstElement(); 1777 1778 // Remove it. 1779 ///////////// 1780 connection.remove(basicListener); 1781 1782 // Remove the listener from the Vector. 1783 /////////////////////////////////////// 1784 listenerBuffer.removeElementAt(0); 1785 } 1786 else if(command == BUFFER_REMOVE_ADVANCED) 1787 { 1788 // Get the listener. 1789 //////////////////// 1790 advancedListener = (AdvancedMessageListener)listenerBuffer.firstElement(); 1791 1792 // Remove it. 1793 ///////////// 1794 connection.remove(advancedListener); 1795 // Remove the listener from the Vector. 1796 /////////////////////////////////////// 1797 listenerBuffer.removeElementAt(0); 1798 } 1799 } 1800 } 1801 1802 // There are no messages waiting, take a break. 1803 /////////////////////////////////////////////// 1804 yield(); 1805 } 1806 } 1807 catch(SpreadException e) 1808 { 1809 // Nothing to do but ignore it. 1810 /////////////////////////////// 1811 System.out.println("SpreadException: " + e.toString() ); 1812 } 1813 } 1814 } 1815 1816 // Sends the message. 1817 ///////////////////// 1818 /** 1819 * Multicasts a message. The message will be sent to all the groups specified in 1820 * the message. 1821 * 1822 * @param message the message to multicast 1823 * @throws SpreadException if there is no connection or if there is any error sending the message 1824 */ multicast(SpreadMessage message)1825 public void multicast(SpreadMessage message) throws SpreadException 1826 { 1827 // Check if we're connected. 1828 //////////////////////////// 1829 if(connected == false) 1830 { 1831 throw new SpreadException("Not connected."); 1832 } 1833 1834 // The groups this message is going to. 1835 /////////////////////////////////////// 1836 SpreadGroup groups[] = message.getGroups(); 1837 1838 // The message data. 1839 //////////////////// 1840 byte data[] = message.getData(); 1841 1842 // Calculate the total number of bytes. 1843 /////////////////////////////////////// 1844 int numBytes = 16; // serviceType, numGroups, type/hint, dataLen 1845 numBytes += MAX_GROUP_NAME; // private group 1846 numBytes += (MAX_GROUP_NAME * groups.length); // groups 1847 1848 if (numBytes + data.length > MAX_MESSAGE_LENGTH ) 1849 { 1850 throw new SpreadException("Message is too long for a Spread Message"); 1851 } 1852 // Allocate the send buffer. 1853 //////////////////////////// 1854 byte buffer[] = new byte[numBytes]; 1855 int bufferIndex = 0; 1856 1857 // The service type. 1858 //////////////////// 1859 toBytes(message.getServiceType(), buffer, bufferIndex); 1860 bufferIndex += 4; 1861 1862 // The private group. 1863 ///////////////////// 1864 toBytes(group, buffer, bufferIndex); 1865 bufferIndex += MAX_GROUP_NAME; 1866 1867 // The number of groups. 1868 //////////////////////// 1869 toBytes(groups.length, buffer, bufferIndex); 1870 bufferIndex += 4; 1871 1872 // The service type and hint. 1873 ///////////////////////////// 1874 toBytes(((int)message.getType() << 8) & 0x00FFFF00, buffer, bufferIndex); 1875 bufferIndex += 4; 1876 1877 // The data length. 1878 /////////////////// 1879 toBytes(data.length, buffer, bufferIndex); 1880 bufferIndex += 4; 1881 1882 // The group names. 1883 /////////////////// 1884 for(int i = 0 ; i < groups.length ; i++) 1885 { 1886 toBytes(groups[i], buffer, bufferIndex); 1887 bufferIndex += MAX_GROUP_NAME; 1888 } 1889 1890 // Send it. 1891 /////////// 1892 synchronized (wsynchro) { 1893 try 1894 { 1895 socketOutput.write(buffer); 1896 socketOutput.write(data); 1897 } 1898 catch(IOException e) 1899 { 1900 throw new SpreadException("write(): " + e.toString()); 1901 } 1902 } 1903 } 1904 1905 // Sends the array of messages. 1906 /////////////////////////////// 1907 /** 1908 * Multicasts an array of messages. Each message will be sent to all the groups specified in 1909 * the message. 1910 * 1911 * @param messages the messages to multicast 1912 * @throws SpreadException if there is no connection or if there is any error sending the messages 1913 */ multicast(SpreadMessage messages[])1914 public void multicast(SpreadMessage messages[]) throws SpreadException 1915 { 1916 // Go through the array. 1917 //////////////////////// 1918 for(int i = 0 ; i < messages.length ; i++) 1919 { 1920 // Send this message. 1921 ///////////////////// 1922 multicast(messages[i]); 1923 } 1924 } 1925 } 1926