1 2 package org.jgroups.protocols; 3 4 import org.jgroups.*; 5 import org.jgroups.annotations.GuardedBy; 6 import org.jgroups.annotations.Property; 7 import org.jgroups.stack.Protocol; 8 import org.jgroups.util.QueueClosedException; 9 import org.jgroups.util.Util; 10 11 import javax.crypto.*; 12 import javax.crypto.spec.SecretKeySpec; 13 import java.io.DataInputStream; 14 import java.io.DataOutputStream; 15 import java.io.IOException; 16 import java.io.InputStream; 17 import java.security.*; 18 import java.security.cert.CertificateException; 19 import java.security.spec.X509EncodedKeySpec; 20 import java.util.Map; 21 import java.util.Vector; 22 import java.util.WeakHashMap; 23 import java.util.concurrent.BlockingQueue; 24 import java.util.concurrent.LinkedBlockingQueue; 25 import java.util.concurrent.TimeUnit; 26 import java.util.concurrent.locks.Lock; 27 import java.util.concurrent.locks.ReentrantLock; 28 29 /** 30 * ENCRYPT layer. Encrypt and decrypt the group communication in JGroups 31 * 32 * The file can be used in two ways: 33 * <ul> 34 * <li> Option 1. Configured with a secretKey in a keystore so it can be used at 35 * any layer in JGroups without the need for a coordinator, or if you want 36 * protection against passive monitoring but do not want the key exchange 37 * overhead and complexity. In this mode all nodes must be distributed with the 38 * same keystore file. 39 * <li> Option 2. Configured with algorithms and key sizes. The Encrypt Layer in 40 * this mode sould be used between the FRAG and PBCast layers in the stack. The 41 * coordinator then chooses the secretkey which it distributes amongst all the 42 * peers. In this form no keystore exists as the keys are distributed using a 43 * public/private key exchange. View changes that identify a new controller will 44 * result in a new session key being generated and then distributed to all 45 * peers. This overhead can be substantial in a an application with a reasonable 46 * peer churn. 
 * </ul>
 * <p>
 * <p>
 * Each message is identified as encrypted with a specific encryption header
 * which identifies the type of encrypt header and an MD5 digest that identifies
 * the version of the key being used to encrypt/decrypt the messages.
 * <p>
 * <p>
 * <h2>Option 1</h2>
 * <br>
 * This is the simplest option and can be used by simply inserting the
 * Encryption layer at any point in the JGroups stack - it will encrypt all
 * Events of type MSG that have a non-null message buffer. The format of the
 * entry in this form is:<br>
 * &lt;ENCRYPT key_store_name="defaultStore.keystore" store_password="changeit"
 * alias="myKey"/&gt;<br>
 * An example bare-bones.xml file showing the keystore version can be found in
 * the conf directory in a file called EncryptKeyStore.xml - along with a
 * defaultStore.keystore file.<br>
 * In order to use the Encrypt layer in this manner it is necessary to have the
 * secretKey already generated in a keystore file. The directory containing the
 * keystore file must be on the application's classpath. You cannot create a
 * SecretKey keystore file using the keytool application shipped with the JDK. A
 * java file called KeyStoreGenerator is included in the demo package that can
 * be used from the command line (or IDE) to generate a suitable keystore.
 * <p>
 * <p>
 * <h2>Option 2</h2>
 * <br>
 * This option is suited to an application that does not ship with a known key
 * but instead has it generated and distributed by the controller. The secret key
 * is first generated by the Controller (in JGroups terms). When a view change
 * occurs a peer will request the secret key by sending a key request with its
 * own public key. The controller encrypts the secret key with this key and
 * sends it back to the peer, which then decrypts it and installs the key as its
 * own secret key. <br>
 * All encryption and decryption of Messages is done using this key.
When a peer 84 * receives a view change that shows a different keyserver it will repeat this 85 * process - the view change event also trigger the encrypt layer to queue up 86 * and down messages until the new key is installed. The previous keys are 87 * retained so that messages sent before the view change that are queued can be 88 * decrypted if the key is different. <br> 89 * An example EncryptNoKeyStore.xml is included in the conf file as a guide. 90 * <p> 91 * <p> 92 * <br> 93 * Note: the current version does not support the concept of perfect forward 94 * encryption (PFE) which means that if a peer leaves the group the keys are 95 * re-generated preventing the departed peer from decrypting future messages if 96 * it chooses to listen in on the group. This is not included as it really 97 * requires a suitable authentication scheme as well to make this feature useful 98 * as there is nothing to stop the peer rejoining and receiving the new key. A 99 * future release will address this issue. 100 * 101 * @author Steve Woodcock 102 * @author Bela Ban 103 */ 104 public class ENCRYPT extends Protocol { 105 Observer observer; 106 107 interface Observer { up(Event evt)108 void up(Event evt); 109 passUp(Event evt)110 void passUp(Event evt); 111 down(Event evt)112 void down(Event evt); 113 passDown(Event evt)114 void passDown(Event evt); 115 } 116 117 private static final String DEFAULT_SYM_ALGO="AES"; 118 // address info 119 Address local_addr=null; 120 // keyserver address 121 Address keyServerAddr=null; 122 123 //used to see whether we are the key server 124 boolean keyServer=false; 125 126 /* ----------------------------------------- Properties -------------------------------------------------- */ 127 128 // encryption properties in no supplied key mode 129 @Property(name="asym_provider", description="Cryptographic Service Provider. 
Default is Bouncy Castle Provider") 130 String asymProvider=null; 131 132 @Property(name="sym_provider", description="Cryptographic Service Provider. Default is Bouncy Castle Provider") 133 String symProvider=null; 134 135 @Property(name="asym_algorithm", description="Cipher engine transformation for asymmetric algorithm. Default is RSA") 136 String asymAlgorithm="RSA"; 137 138 @Property(name="sym_algorithm", description="Cipher engine transformation for symmetric algorithm. Default is AES") 139 String symAlgorithm=DEFAULT_SYM_ALGO; 140 141 @Property(name="asym_init", description="Initial public/private key length. Default is 512") 142 int asymInit=512; 143 144 @Property(name="sym_init", description="Initial key length for matching symmetric algorithm. Default is 128") 145 int symInit=128; 146 147 // properties for functioning in supplied key mode 148 private boolean suppliedKey=false; 149 150 @Property(name="key_store_name", description="File on classpath that contains keystore repository") 151 String keyStoreName; 152 153 @Property(name="store_password", description="Password used to check the integrity/unlock the keystore. Change the default") 154 private String storePassword="changeit"; //JDK default 155 156 @Property(name="key_password", description="Password for recovering the key. Change the default") 157 private String keyPassword="changeit"; //JDK default 158 159 @Property(name="alias", description="Alias used for recovering the key. 
Change the default") 160 private String alias="mykey"; // JDK default 161 162 // public/private Key 163 KeyPair Kpair; // to store own's public/private Key 164 165 // for client to store server's public Key 166 PublicKey serverPubKey=null; 167 168 // needed because we do simultaneous encode/decode with these ciphers - which 169 // would be a threading issue 170 Cipher symEncodingCipher; 171 172 @GuardedBy("decrypt_lock") 173 Cipher symDecodingCipher; 174 175 /** To synchronize access to symDecodingCipher */ 176 protected final Lock decrypt_lock=new ReentrantLock(); 177 178 // version filed for secret key 179 private String symVersion=null; 180 // dhared secret key to encrypt/decrypt messages 181 SecretKey secretKey=null; 182 183 // map to hold previous keys so we can decrypt some earlier messages if we need to 184 final Map<String,Cipher> keyMap=new WeakHashMap<String,Cipher>(); 185 186 // queues to buffer data while we are swapping shared key 187 // or obtsining key for first time 188 189 private boolean queue_up=true; 190 191 private boolean queue_down=false; 192 193 // queue to hold upcoming messages while key negotiation is happening 194 private BlockingQueue<Event> upMessageQueue=new LinkedBlockingQueue<Event>(); 195 196 // queue to hold downcoming messages while key negotiation is happening 197 private BlockingQueue<Event> downMessageQueue=new LinkedBlockingQueue<Event>(); 198 // decrypting cypher for secret key requests 199 private Cipher asymCipher; 200 201 /** determines whether to encrypt the entire message, or just the buffer */ 202 @Property 203 private boolean encrypt_entire_message=false; 204 ENCRYPT()205 public ENCRYPT() {} 206 setObserver(Observer o)207 public void setObserver(Observer o) { 208 observer=o; 209 } 210 211 /* 212 * GetAlgorithm: Get the algorithm name from "algorithm/mode/padding" 213 * taken m original ENCRYPT file 214 */ getAlgorithm(String s)215 private static String getAlgorithm(String s) { 216 int index=s.indexOf("/"); 217 
if(index == -1) 218 return s; 219 220 return s.substring(0, index); 221 } 222 init()223 public void init() throws Exception { 224 if(keyPassword == null && storePassword != null) { 225 keyPassword=storePassword; 226 227 if(log.isInfoEnabled()) 228 log.info("key_password used is same as store_password"); 229 } 230 if(keyStoreName == null) { 231 initSymKey(); 232 initKeyPair(); 233 } 234 else { 235 initConfiguredKey(); 236 } 237 initSymCiphers(symAlgorithm, getSecretKey()); 238 } 239 240 /** 241 * Initialisation if a supplied key is defined in the properties. This 242 * supplied key must be in a keystore which can be generated using the 243 * keystoreGenerator file in demos. The keystore must be on the classpath to 244 * find it. 245 * 246 * @throws KeyStoreException 247 * @throws Exception 248 * @throws IOException 249 * @throws NoSuchAlgorithmException 250 * @throws CertificateException 251 * @throws UnrecoverableKeyException 252 */ initConfiguredKey()253 private void initConfiguredKey() throws Exception { 254 InputStream inputStream=null; 255 // must not use default keystore type - as does not support secret keys 256 KeyStore store=KeyStore.getInstance("JCEKS"); 257 258 SecretKey tempKey=null; 259 try { 260 // load in keystore using this thread's classloader 261 inputStream=Thread.currentThread() 262 .getContextClassLoader() 263 .getResourceAsStream(keyStoreName); 264 // we can't find a keystore here - 265 if(inputStream == null) { 266 throw new Exception("Unable to load keystore " + keyStoreName 267 + " ensure file is on classpath"); 268 } 269 // we have located a file lets load the keystore 270 try { 271 store.load(inputStream, storePassword.toCharArray()); 272 // loaded keystore - get the key 273 tempKey=(SecretKey)store.getKey(alias, keyPassword.toCharArray()); 274 } 275 catch(IOException e) { 276 throw new Exception("Unable to load keystore " + keyStoreName + ": " + e); 277 } 278 catch(NoSuchAlgorithmException e) { 279 throw new Exception("No Such algorithm " 
+ keyStoreName + ": " + e); 280 } 281 catch(CertificateException e) { 282 throw new Exception("Certificate exception " + keyStoreName + ": " + e); 283 } 284 285 if(tempKey == null) { 286 throw new Exception("Unable to retrieve key '" + alias 287 + "' from keystore " 288 + keyStoreName); 289 } 290 //set the key here 291 setSecretKey(tempKey); 292 293 if(symAlgorithm.equals(DEFAULT_SYM_ALGO)) { 294 symAlgorithm=tempKey.getAlgorithm(); 295 } 296 297 // set the fact we are using a supplied key 298 299 suppliedKey=true; 300 queue_down=false; 301 queue_up=false; 302 } 303 finally { 304 Util.close(inputStream); 305 } 306 307 } 308 309 /** 310 * Used to initialise the symmetric key if none is supplied in a keystore. 311 * 312 * @throws Exception 313 */ initSymKey()314 public void initSymKey() throws Exception { 315 KeyGenerator keyGen=null; 316 // see if we have a provider specified 317 if(symProvider != null && symProvider.trim().length() > 0) { 318 keyGen=KeyGenerator.getInstance(getAlgorithm(symAlgorithm), symProvider); 319 } 320 else { 321 keyGen=KeyGenerator.getInstance(getAlgorithm(symAlgorithm)); 322 } 323 // generate the key using the defined init properties 324 keyGen.init(symInit); 325 secretKey=keyGen.generateKey(); 326 327 setSecretKey(secretKey); 328 329 if(log.isInfoEnabled()) 330 log.info(" Symmetric key generated "); 331 } 332 333 /** 334 * Initialises the Ciphers for both encryption and decryption using the 335 * generated or supplied secret key. 
336 * 337 * @param algorithm 338 * @param secret 339 * @throws Exception 340 */ initSymCiphers(String algorithm, SecretKey secret)341 private void initSymCiphers(String algorithm, SecretKey secret) throws Exception { 342 343 if(log.isInfoEnabled()) 344 log.info(" Initializing symmetric ciphers"); 345 346 if(symProvider != null && symProvider.trim().length() > 0) { 347 symEncodingCipher=Cipher.getInstance(algorithm, symProvider); 348 symDecodingCipher=Cipher.getInstance(algorithm, symProvider); 349 } 350 else { 351 symEncodingCipher=Cipher.getInstance(algorithm); 352 symDecodingCipher=Cipher.getInstance(algorithm); 353 } 354 355 symEncodingCipher.init(Cipher.ENCRYPT_MODE, secret); 356 symDecodingCipher.init(Cipher.DECRYPT_MODE, secret); 357 358 //set the version 359 MessageDigest digest=MessageDigest.getInstance("MD5"); 360 digest.reset(); 361 digest.update(secret.getEncoded()); 362 363 symVersion=new String(digest.digest(), "UTF-8"); 364 if(log.isInfoEnabled()) { 365 log.info(" Initialized symmetric ciphers with secret key (" + symVersion.length() + " bytes)"); 366 /* 367 StringBuilder sb=new StringBuilder(" Initialized symmetric ciphers with secret key (" + symVersion.length() 368 + " bytes) "); 369 char[] arr=symVersion.toCharArray(); 370 for(int i=0;i < arr.length;i++) { 371 char c=arr[i]; 372 sb.append((int)c); 373 } 374 log.info(sb.toString()); 375 */ 376 } 377 } 378 379 /** 380 * Generates the public/private key pair from the init params 381 * 382 * @throws Exception 383 */ initKeyPair()384 public void initKeyPair() throws Exception { 385 // generate keys according to the specified algorithms 386 // generate publicKey and Private Key 387 KeyPairGenerator KpairGen=null; 388 if(asymProvider != null && asymProvider.trim().length() > 0) { 389 KpairGen=KeyPairGenerator.getInstance(getAlgorithm(asymAlgorithm), asymProvider); 390 } 391 else { 392 KpairGen=KeyPairGenerator.getInstance(getAlgorithm(asymAlgorithm)); 393 394 } 395 KpairGen.initialize(asymInit, new 
SecureRandom()); 396 Kpair=KpairGen.generateKeyPair(); 397 398 // set up the Cipher to decrypt secret key responses encrypted with our key 399 400 if(asymProvider != null && asymProvider.trim().length() > 0) 401 asymCipher=Cipher.getInstance(asymAlgorithm, asymProvider); 402 else 403 asymCipher=Cipher.getInstance(asymAlgorithm); 404 405 asymCipher.init(Cipher.DECRYPT_MODE, Kpair.getPrivate()); 406 407 if(log.isInfoEnabled()) 408 log.info(" asym algo initialized"); 409 } 410 411 /** Just remove if you don't need to reset any state */ reset()412 public void reset() {} 413 414 /* (non-Javadoc) 415 * @see org.jgroups.stack.Protocol#up(org.jgroups.Event) 416 */ up(Event evt)417 public Object up(Event evt) { 418 switch(evt.getType()) { 419 case Event.VIEW_CHANGE: 420 View view=(View)evt.getArg(); 421 if(log.isInfoEnabled()) 422 log.info("handling view-change up: " + view); 423 if(!suppliedKey) { 424 handleViewChange(view, false); 425 } 426 break; 427 case Event.TMP_VIEW: 428 view=(View)evt.getArg(); 429 if(log.isInfoEnabled()) 430 log.info("handling tmp-view up: " + view); 431 if(!suppliedKey) { 432 // if a tmp_view then we are trying to become coordinator so 433 // make us keyserver 434 handleViewChange(view, true); 435 } 436 break; 437 // we try and decrypt all messages 438 case Event.MSG: 439 try { 440 handleUpMessage(evt); 441 } 442 catch(Exception e) { 443 log.warn("exception occurred decrypting message", e); 444 } 445 return null; 446 default: 447 break; 448 } 449 return passItUp(evt); 450 } 451 passItUp(Event evt)452 public Object passItUp(Event evt) { 453 if(observer != null) 454 observer.passUp(evt); 455 return up_prot != null? 
up_prot.up(evt) : null; 456 } 457 handleViewChange(View view, boolean makeServer)458 private synchronized void handleViewChange(View view, boolean makeServer) { 459 460 // if view is a bit broken set me as keyserver 461 Vector<Address> members = view.getMembers(); 462 if (members == null || members.isEmpty() || members.get(0) == null) { 463 becomeKeyServer(local_addr); 464 return; 465 } 466 // otherwise get keyserver from view controller 467 Address tmpKeyServer=view.getMembers().get(0); 468 469 //I am new keyserver - either first member of group or old key server is no more and 470 // I have been voted new controller 471 if(makeServer || (tmpKeyServer.equals(local_addr) && (keyServerAddr == null || (!tmpKeyServer.equals(keyServerAddr))))) { 472 becomeKeyServer(tmpKeyServer); 473 // a new keyserver has been set and it is not me 474 } 475 else if(keyServerAddr == null || (!tmpKeyServer.equals(keyServerAddr))) { 476 handleNewKeyServer(tmpKeyServer); 477 } 478 else { 479 if(log.isDebugEnabled()) 480 log.debug("Membership has changed but I do not care"); 481 } 482 } 483 484 /** 485 * Handles becoming server - resetting queue settings and setting keyserver 486 * address to be local address. 487 * 488 * @param tmpKeyServer 489 */ becomeKeyServer(Address tmpKeyServer)490 private void becomeKeyServer(Address tmpKeyServer) { 491 keyServerAddr=tmpKeyServer; 492 keyServer=true; 493 if(log.isInfoEnabled()) 494 log.info("I have become key server " + keyServerAddr); 495 queue_down=false; 496 queue_up=false; 497 } 498 499 /** 500 * Sets up the peer for a new keyserver - this is setting queueing to buffer 501 * messages until we have a new secret key from the key server and sending a 502 * key request to the new keyserver. 
503 * 504 * @param newKeyServer 505 */ handleNewKeyServer(Address newKeyServer)506 private void handleNewKeyServer(Address newKeyServer) { 507 // start queueing until we have new key 508 // to make sure we are not sending with old key 509 queue_up=true; 510 queue_down=true; 511 // set new keyserver address 512 keyServerAddr=newKeyServer; 513 keyServer=false; 514 if(log.isInfoEnabled()) 515 log.info("Sending key request"); 516 517 // create a key request message 518 sendKeyRequest(); 519 } 520 521 /** 522 * @param evt 523 */ handleUpMessage(Event evt)524 private void handleUpMessage(Event evt) throws Exception { 525 Message msg=(Message)evt.getArg(); 526 527 if(msg == null) { 528 if(log.isTraceEnabled()) 529 log.trace("null message - passing straight up"); 530 passItUp(evt); 531 return; 532 } 533 534 if(msg.getLength() == 0 && !encrypt_entire_message) { 535 passItUp(evt); 536 return; 537 } 538 539 EncryptHeader hdr=(EncryptHeader)msg.getHeader(this.id); 540 541 // try and get the encryption header 542 if(hdr == null) { 543 if(log.isTraceEnabled()) 544 log.trace("dropping message as ENCRYPT header is null or has not been recognized, msg will not be passed up, " + "headers are " 545 + msg.printHeaders()); 546 return; 547 } 548 549 if(log.isTraceEnabled()) 550 log.trace("header received " + hdr); 551 552 // if a normal message try and decrypt it 553 if(hdr.getType() == EncryptHeader.ENCRYPT) { 554 // if msg buffer is empty, and we didn't encrypt the entire message, just pass up 555 if(!hdr.encrypt_entire_msg && ((Message)evt.getArg()).getLength() == 0) { 556 if(log.isTraceEnabled()) 557 log.trace("passing up message as it has an empty buffer "); 558 passItUp(evt); 559 return; 560 } 561 562 // if queueing then pass into queue to be dealt with later 563 if(queue_up) { 564 if(log.isTraceEnabled()) 565 log.trace("queueing up message as no session key established: " + evt.getArg()); 566 upMessageQueue.put(evt); 567 } 568 else { 569 // make sure we pass up any queued 
messages first 570 // could be more optimised but this can wait 571 // we only need this if not using supplied key 572 if(!suppliedKey) { 573 drainUpQueue(); 574 } 575 // try and decrypt the message - we need to copy msg as we modify its 576 // buffer (http://jira.jboss.com/jira/browse/JGRP-538) 577 Message tmpMsg=decryptMessage(symDecodingCipher, msg.copy()); 578 if(tmpMsg != null) { 579 if(log.isTraceEnabled()) 580 log.trace("decrypted message " + tmpMsg); 581 passItUp(new Event(Event.MSG, tmpMsg)); 582 } 583 else { 584 log.warn("Unrecognised cipher discarding message"); 585 } 586 } 587 } 588 else { 589 // check if we had some sort of encrypt control 590 // header if using supplied key we should not 591 // process it 592 if(suppliedKey) { 593 if(log.isWarnEnabled()) { 594 log.warn("We received an encrypt header of " + hdr.getType() 595 + " while in configured mode"); 596 } 597 } 598 else { 599 // see what sort of encrypt control message we 600 // have received 601 switch(hdr.getType()) { 602 // if a key request 603 case EncryptHeader.KEY_REQUEST: 604 if(log.isInfoEnabled()) { 605 log.info("received a key request from peer"); 606 } 607 608 //if a key request send response key back 609 try { 610 // extract peer's public key 611 PublicKey tmpKey=generatePubKey(msg.getBuffer()); 612 // send back the secret key we have 613 sendSecretKey(getSecretKey(), tmpKey, msg.getSrc()); 614 } 615 catch(Exception e) { 616 log.warn("unable to reconstitute peer's public key"); 617 } 618 break; 619 case EncryptHeader.SECRETKEY: 620 if(log.isInfoEnabled()) { 621 log.info("received a secretkey response from keyserver"); 622 } 623 624 try { 625 SecretKey tmp=decodeKey(msg.getBuffer()); 626 if(tmp == null) { 627 // unable to understand response 628 // lets try again 629 sendKeyRequest(); 630 } 631 else { 632 // otherwise lets set the reurned key 633 // as the shared key 634 setKeys(tmp, hdr.getVersion()); 635 if(log.isInfoEnabled()) { 636 log.info("Decoded secretkey response"); 637 } 638 
} 639 } 640 catch(Exception e) { 641 log.warn("unable to process received public key"); 642 } 643 break; 644 default: 645 log.warn("Received ignored encrypt header of " + hdr.getType()); 646 break; 647 } 648 } 649 } 650 } 651 652 /** 653 * used to drain the up queue - synchronized so we can call it safely 654 * despite access from potentially two threads at once 655 * 656 * @throws QueueClosedException 657 * @throws Exception 658 */ drainUpQueue()659 private void drainUpQueue() throws Exception { 660 //we do not synchronize here as we only have one up thread so we should never get an issue 661 //synchronized(upLock){ 662 Event tmp=null; 663 while((tmp=upMessageQueue.poll(0L, TimeUnit.MILLISECONDS)) != null) { 664 Message msg=decryptMessage(symDecodingCipher, ((Message)tmp.getArg()).copy()); 665 666 if(msg != null) { 667 if(log.isTraceEnabled()) { 668 log.trace("passing up message from drain " + msg); 669 } 670 passItUp(new Event(Event.MSG, msg)); 671 } 672 else { 673 log.warn("discarding message in queue up drain as cannot decode it"); 674 } 675 } 676 } 677 678 /** 679 * Sets the keys for the app. and drains the queues - the drains could be 680 * called att he same time as the up/down messages calling in to the class 681 * so we may have an extra call to the drain methods but this slight expense 682 * is better than the alternative of waiting until the next message to 683 * trigger the drains which may never happen. 
684 * 685 * @param key 686 * @param version 687 * @throws Exception 688 */ setKeys(SecretKey key, String version)689 private void setKeys(SecretKey key, String version) throws Exception { 690 691 // put the previous key into the map 692 // if the keys are already there then they will overwrite 693 keyMap.put(getSymVersion(), getSymDecodingCipher()); 694 695 setSecretKey(key); 696 initSymCiphers(key.getAlgorithm(), key); 697 setSymVersion(version); 698 699 // drain the up queue 700 log.info("setting queue up to false in setKeys"); 701 queue_up=false; 702 drainUpQueue(); 703 704 queue_down=false; 705 drainDownQueue(); 706 } 707 708 /** 709 * Does the actual work for decrypting - if version does not match current 710 * cipher then tries to use previous cipher 711 * 712 * @param cipher 713 * @param msg 714 * @return 715 * @throws Exception 716 */ decryptMessage(Cipher cipher, Message msg)717 private Message decryptMessage(Cipher cipher, Message msg) throws Exception { 718 EncryptHeader hdr=(EncryptHeader)msg.getHeader(this.id); 719 if(!hdr.getVersion().equals(getSymVersion())) { 720 log.warn("attempting to use stored cipher as message does not uses current encryption version "); 721 cipher=keyMap.get(hdr.getVersion()); 722 if(cipher == null) { 723 log.warn("Unable to find a matching cipher in previous key map"); 724 return null; 725 } 726 else { 727 if(log.isTraceEnabled()) 728 log.trace("decrypting using previous cipher version " + hdr.getVersion()); 729 return _decrypt(cipher, msg, hdr.encrypt_entire_msg); 730 } 731 } 732 733 else { 734 735 // reset buffer with decrypted message 736 return _decrypt(cipher, msg, hdr.encrypt_entire_msg); 737 } 738 } 739 _decrypt(Cipher cipher, Message msg, boolean decrypt_entire_msg)740 private Message _decrypt(Cipher cipher, Message msg, boolean decrypt_entire_msg) throws Exception { 741 byte[] decrypted_msg; 742 743 decrypt_lock.lock(); 744 try { 745 decrypted_msg=cipher.doFinal(msg.getRawBuffer(), msg.getOffset(), msg.getLength()); 
746 } 747 finally { 748 decrypt_lock.unlock(); 749 } 750 751 if(!decrypt_entire_msg) { 752 msg.setBuffer(decrypted_msg); 753 return msg; 754 } 755 756 Message ret=(Message)Util.streamableFromByteBuffer(Message.class, decrypted_msg); 757 if(ret.getDest() == null) 758 ret.setDest(msg.getDest()); 759 if(ret.getSrc() == null) 760 ret.setSrc(msg.getSrc()); 761 return ret; 762 } 763 764 /** 765 * @param secret 766 * @param pubKey 767 * @throws InvalidKeyException 768 * @throws IllegalStateException 769 * @throws IllegalBlockSizeException 770 * @throws BadPaddingException 771 */ sendSecretKey(SecretKey secret, PublicKey pubKey, Address source)772 private void sendSecretKey(SecretKey secret, PublicKey pubKey, Address source) throws InvalidKeyException, 773 IllegalStateException, 774 IllegalBlockSizeException, 775 BadPaddingException, 776 NoSuchPaddingException, 777 NoSuchAlgorithmException { 778 Message newMsg; 779 780 if(log.isDebugEnabled()) 781 log.debug("encoding shared key "); 782 783 // create a cipher with peer's public key 784 Cipher tmp=Cipher.getInstance(asymAlgorithm); 785 tmp.init(Cipher.ENCRYPT_MODE, pubKey); 786 787 //encrypt current secret key 788 byte[] encryptedKey=tmp.doFinal(secret.getEncoded()); 789 790 //SW logout encrypted bytes we are sending so we 791 // can match the clients log to see if they match 792 if(log.isDebugEnabled()) 793 log.debug(" Generated encoded key which only client can decode:" + formatArray(encryptedKey)); 794 795 newMsg=new Message(source, local_addr, encryptedKey); 796 797 newMsg.putHeader(this.id, new EncryptHeader(EncryptHeader.SECRETKEY, getSymVersion())); 798 799 if(log.isDebugEnabled()) 800 log.debug(" Sending version " + getSymVersion() + " encoded key to client"); 801 passItDown(new Event(Event.MSG, newMsg)); 802 } 803 804 805 806 /** 807 * @return Message 808 */ sendKeyRequest()809 private Message sendKeyRequest() { 810 811 // send client's public key to server and request 812 // server's public key 813 Message 
newMsg=new Message(keyServerAddr, local_addr, Kpair.getPublic().getEncoded()); 814 815 newMsg.putHeader(this.id, new EncryptHeader(EncryptHeader.KEY_REQUEST, getSymVersion())); 816 passItDown(new Event(Event.MSG, newMsg)); 817 return newMsg; 818 } 819 820 /* (non-Javadoc) 821 * @see org.jgroups.stack.Protocol#down(org.jgroups.Event) 822 */ down(Event evt)823 public Object down(Event evt) { 824 if(observer != null) 825 observer.down(evt); 826 827 switch(evt.getType()) { 828 829 case Event.MSG: 830 try { 831 if(queue_down) { 832 if(log.isTraceEnabled()) 833 log.trace("queueing down message as no session key established" + evt.getArg()); 834 downMessageQueue.put(evt); // queue messages if we are waiting for a new key 835 } 836 else { 837 // make sure the down queue is drained first to keep ordering 838 if(!suppliedKey) { 839 drainDownQueue(); 840 } 841 sendDown(evt); 842 } 843 844 } 845 catch(Exception e) { 846 log.warn("unable to send down event " + e); 847 } 848 return null; 849 850 case Event.VIEW_CHANGE: 851 View view=(View)evt.getArg(); 852 if(log.isInfoEnabled()) 853 log.info("handling view-change down: " + view); 854 if(!suppliedKey) { 855 handleViewChange(view, false); 856 } 857 break; 858 859 case Event.SET_LOCAL_ADDRESS: 860 local_addr=(Address)evt.getArg(); 861 if(log.isDebugEnabled()) 862 log.debug("set local address to " + local_addr); 863 break; 864 865 case Event.TMP_VIEW: 866 view=(View)evt.getArg(); 867 if(log.isInfoEnabled()) 868 log.info("handling tmp-view down: " + view); 869 if(!suppliedKey) { 870 // if a tmp_view then we are trying to become coordinator so 871 // make us keyserver 872 handleViewChange(view, true); 873 } 874 break; 875 default: 876 break; 877 } 878 return down_prot.down(evt); 879 } 880 passItDown(Event evt)881 public Object passItDown(Event evt) { 882 if(observer != null) 883 observer.passDown(evt); 884 return down_prot != null? 
down_prot.down(evt) : null; 885 } 886 887 /** 888 * @throws Exception 889 * @throws QueueClosedException 890 */ drainDownQueue()891 private void drainDownQueue() throws Exception { 892 // we do not synchronize here as we only have one down thread so we should never get an issue 893 // first lets replay any oustanding events 894 Event tmp=null; 895 while((tmp=downMessageQueue.poll(0L, TimeUnit.MILLISECONDS)) != null) { 896 sendDown(tmp); 897 } 898 } 899 900 /** 901 * @param evt 902 * @throws Exception 903 */ sendDown(Event evt)904 private void sendDown(Event evt) throws Exception { 905 if(evt.getType() != Event.MSG) { 906 return; 907 } 908 909 Message msg=(Message)evt.getArg(); 910 if(msg.getLength() == 0 && !encrypt_entire_message) { 911 passItDown(evt); 912 return; 913 } 914 915 EncryptHeader hdr=new EncryptHeader(EncryptHeader.ENCRYPT, getSymVersion()); 916 hdr.encrypt_entire_msg=this.encrypt_entire_message; 917 918 if(encrypt_entire_message) { 919 byte[] serialized_msg=Util.streamableToByteBuffer(msg); 920 byte[] encrypted_msg=encryptMessage(symEncodingCipher, 921 serialized_msg, 922 0, 923 serialized_msg.length); 924 Message tmp=msg.copy(false); // we need to preserve headers which may already be present 925 tmp.setBuffer(encrypted_msg); 926 if(tmp.getSrc() == null) 927 tmp.setSrc(local_addr); 928 tmp.putHeader(this.id, hdr); 929 passItDown(new Event(Event.MSG, tmp)); 930 return; 931 } 932 933 // put our encrypt header on the message 934 msg.putHeader(this.id, hdr); 935 936 // copy neeeded because same message (object) may be retransmitted -> no double encryption 937 Message msgEncrypted=msg.copy(false); 938 msgEncrypted.setBuffer(encryptMessage(symEncodingCipher, 939 msg.getRawBuffer(), 940 msg.getOffset(), 941 msg.getLength())); 942 passItDown(new Event(Event.MSG, msgEncrypted)); 943 } 944 945 /** 946 * 947 * @param cipher 948 * @param plain 949 * @return 950 * @throws Exception 951 */ encryptMessage(Cipher cipher, byte[] plain, int offset, int length)952 
private synchronized byte[] encryptMessage(Cipher cipher, byte[] plain, int offset, int length) throws Exception { 953 return cipher.doFinal(plain, offset, length); 954 } 955 decodeKey(byte[] encodedKey)956 private SecretKeySpec decodeKey(byte[] encodedKey) throws Exception { 957 // try and decode secrey key sent from keyserver 958 byte[] keyBytes; 959 960 synchronized(this) { 961 keyBytes=asymCipher.doFinal(encodedKey); 962 } 963 964 SecretKeySpec keySpec=null; 965 try { 966 keySpec=new SecretKeySpec(keyBytes, getAlgorithm(symAlgorithm)); 967 968 // test reconstituted key to see if valid 969 Cipher temp=Cipher.getInstance(symAlgorithm); 970 temp.init(Cipher.SECRET_KEY, keySpec); 971 } 972 catch(Exception e) { 973 log.fatal(e.toString()); 974 keySpec=null; 975 } 976 return keySpec; 977 } 978 979 /** 980 * used to reconstitute public key sent in byte form from peer 981 * 982 * @param encodedKey 983 * @return PublicKey 984 */ generatePubKey(byte[] encodedKey)985 private PublicKey generatePubKey(byte[] encodedKey) { 986 PublicKey pubKey=null; 987 try { 988 KeyFactory KeyFac=KeyFactory.getInstance(getAlgorithm(asymAlgorithm)); 989 X509EncodedKeySpec x509KeySpec=new X509EncodedKeySpec(encodedKey); 990 pubKey=KeyFac.generatePublic(x509KeySpec); 991 } 992 catch(Exception e) { 993 e.printStackTrace(); 994 } 995 return pubKey; 996 } 997 998 /* 999 * simple helper method so we can see the format of the byte arrays in a 1000 * readable form could be better to use Base64 but will do for now 1001 */ formatArray(byte[] array)1002 private static String formatArray(byte[] array) { 1003 StringBuilder buf=new StringBuilder(); 1004 for(int i=0;i < array.length;i++) { 1005 buf.append(Integer.toHexString(array[i])); 1006 } 1007 return buf.toString(); 1008 } 1009 1010 /** 1011 * @return Returns the asymInit. 1012 */ getAsymInit()1013 protected int getAsymInit() { 1014 return asymInit; 1015 } 1016 1017 /** 1018 * @return Returns the asymProvider. 
1019 */ getAsymProvider()1020 protected String getAsymProvider() { 1021 return asymProvider; 1022 } 1023 1024 /** 1025 * @return Returns the desKey. 1026 */ getDesKey()1027 protected SecretKey getDesKey() { 1028 return secretKey; 1029 } 1030 1031 /** 1032 * @return Returns the kpair. 1033 */ getKpair()1034 protected KeyPair getKpair() { 1035 return Kpair; 1036 } 1037 1038 /** 1039 * @return Returns the asymCipher. 1040 */ getAsymCipher()1041 protected Cipher getAsymCipher() { 1042 return asymCipher; 1043 } 1044 1045 /** 1046 * @return Returns the serverPubKey. 1047 */ getServerPubKey()1048 protected PublicKey getServerPubKey() { 1049 return serverPubKey; 1050 } 1051 1052 /** 1053 * @return Returns the symAlgorithm. 1054 */ getSymAlgorithm()1055 protected String getSymAlgorithm() { 1056 return symAlgorithm; 1057 } 1058 1059 /** 1060 * @return Returns the symInit. 1061 */ getSymInit()1062 protected int getSymInit() { 1063 return symInit; 1064 } 1065 1066 /** 1067 * @return Returns the symProvider. 1068 */ getSymProvider()1069 protected String getSymProvider() { 1070 return symProvider; 1071 } 1072 1073 /** 1074 * @return Returns the asymAlgorithm. 1075 */ getAsymAlgorithm()1076 protected String getAsymAlgorithm() { 1077 return asymAlgorithm; 1078 } 1079 1080 /** 1081 * @return Returns the symVersion. 1082 */ getSymVersion()1083 private String getSymVersion() { 1084 return symVersion; 1085 } 1086 1087 /** 1088 * @param symVersion 1089 * The symVersion to set. 1090 */ setSymVersion(String symVersion)1091 private void setSymVersion(String symVersion) { 1092 this.symVersion=symVersion; 1093 } 1094 1095 /** 1096 * @return Returns the secretKey. 1097 */ getSecretKey()1098 private SecretKey getSecretKey() { 1099 return secretKey; 1100 } 1101 1102 /** 1103 * @param secretKey 1104 * The secretKey to set. 
1105 */ setSecretKey(SecretKey secretKey)1106 private void setSecretKey(SecretKey secretKey) { 1107 this.secretKey=secretKey; 1108 } 1109 1110 /** 1111 * @return Returns the keyStoreName. 1112 */ getKeyStoreName()1113 protected String getKeyStoreName() { 1114 return keyStoreName; 1115 } 1116 1117 /** 1118 * @return Returns the symDecodingCipher. 1119 */ getSymDecodingCipher()1120 protected Cipher getSymDecodingCipher() { 1121 return symDecodingCipher; 1122 } 1123 1124 /** 1125 * @return Returns the symEncodingCipher. 1126 */ getSymEncodingCipher()1127 protected Cipher getSymEncodingCipher() { 1128 return symEncodingCipher; 1129 } 1130 1131 /** 1132 * @return Returns the local_addr. 1133 */ getLocal_addr()1134 protected Address getLocal_addr() { 1135 return local_addr; 1136 } 1137 1138 /** 1139 * @param local_addr 1140 * The local_addr to set. 1141 */ setLocal_addr(Address local_addr)1142 protected void setLocal_addr(Address local_addr) { 1143 this.local_addr=local_addr; 1144 } 1145 1146 /** 1147 * @return Returns the keyServerAddr. 1148 */ getKeyServerAddr()1149 protected Address getKeyServerAddr() { 1150 return keyServerAddr; 1151 } 1152 1153 /** 1154 * @param keyServerAddr 1155 * The keyServerAddr to set. 
*/
protected void setKeyServerAddr(Address keyServerAddr) {
    this.keyServerAddr=keyServerAddr;
}

/**
 * Header attached to messages handled by this protocol. It carries the kind
 * of message ({@link #type}), the version string of the key in use and a flag
 * telling whether the whole message or only its payload was encrypted.
 */
public static class EncryptHeader extends org.jgroups.Header {
    // values for the type field
    public static final short ENCRYPT=0;
    public static final short KEY_REQUEST=1;
    public static final short SERVER_PUBKEY=2;
    public static final short SECRETKEY=3;
    public static final short SECRETKEY_READY=4;

    short type;
    String version;
    boolean encrypt_entire_msg=false;

    /** Creates an empty header; the fields are filled in by {@link #readFrom}. */
    public EncryptHeader() {}

    /**
     * Creates a header of the given type with an empty version string.
     *
     * @param type one of the type constants of this class
     */
    public EncryptHeader(short type) {
        this(type, "");
    }

    /**
     * Creates a header of the given type for the given key version.
     *
     * @param type    one of the type constants of this class
     * @param version the version string of the key
     */
    public EncryptHeader(short type,String version) {
        this.type=type;
        this.version=version;
    }

    /**
     * Writes type, version and the entire-message flag, in that order.
     *
     * @param out the stream to write to
     * @throws IOException if writing fails
     */
    public void writeTo(DataOutputStream out) throws IOException {
        out.writeShort(type);
        Util.writeString(version, out);
        out.writeBoolean(encrypt_entire_msg);
    }

    /**
     * Reads the fields in the order in which {@link #writeTo} wrote them.
     *
     * @param in the stream to read from
     * @throws IOException if reading fails
     */
    public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
        type=in.readShort();
        version=Util.readString(in);
        encrypt_entire_msg=in.readBoolean();
    }

    /**
     * @return a short description; only the length of the version string is
     *         shown, not its content
     */
    public String toString() {
        String versionInfo=version == null? "n/a" : version.length() + " bytes";
        return "ENCRYPT [type=" + type + " version=\"" + versionInfo + "\"]";
    }

    /**
     * @return the size computed as one short plus two bytes, plus the number
     *         of characters of the version string and two more bytes when a
     *         version is present
     */
    public int size() {
        // NOTE(review): this counts one byte per character of version; that
        // presumably matches Util.writeString only for ASCII content - confirm
        int total=Global.SHORT_SIZE + Global.BYTE_SIZE + Global.BYTE_SIZE;
        return version == null? total : total + version.length() + 2;
    }

    /**
     * @return Returns the type.
     */
    protected short getType() {
        return type;
    }

    /**
     * @return Returns the version.
     */
    protected String getVersion() {
        return version;
    }
}
}