1 
2 package org.jgroups.protocols;
3 
4 import org.jgroups.*;
5 import org.jgroups.annotations.GuardedBy;
6 import org.jgroups.annotations.Property;
7 import org.jgroups.stack.Protocol;
8 import org.jgroups.util.QueueClosedException;
9 import org.jgroups.util.Util;
10 
11 import javax.crypto.*;
12 import javax.crypto.spec.SecretKeySpec;
13 import java.io.DataInputStream;
14 import java.io.DataOutputStream;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.security.*;
18 import java.security.cert.CertificateException;
19 import java.security.spec.X509EncodedKeySpec;
20 import java.util.Map;
21 import java.util.Vector;
22 import java.util.WeakHashMap;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.locks.Lock;
27 import java.util.concurrent.locks.ReentrantLock;
28 
29 /**
30  * ENCRYPT layer. Encrypt and decrypt the group communication in JGroups
31  *
32  * The file can be used in two ways:
33  * <ul>
34  * <li> Option 1. Configured with a secretKey in a keystore so it can be used at
35  * any layer in JGroups without the need for a coordinator, or if you want
36  * protection against passive monitoring but do not want the key exchange
37  * overhead and complexity. In this mode all nodes must be distributed with the
38  * same keystore file.
39  * <li> Option 2. Configured with algorithms and key sizes. The Encrypt Layer in
40  * this mode should be used between the FRAG and PBCast layers in the stack. The
41  * coordinator then chooses the secretkey which it distributes amongst all the
42  * peers. In this form no keystore exists as the keys are distributed using a
43  * public/private key exchange. View changes that identify a new controller will
44  * result in a new session key being generated and then distributed to all
45  * peers. This overhead can be substantial in an application with a reasonable
46  * peer churn.
47  * </ul>
48  * <p>
49  * <p>
50  * Each message is identified as encrypted with a specific encryption header
51  * which identifies the type of encrypt header and an MD5 digest that identifies
52  * the version of the key being used to encrypt/decrypt the messages.
53  * <p>
54  * <p>
55  * <h2>Option 1</h2>
56  * <br>
57  * This is the simplest option and can be used by simply inserting the
58  * Encryption layer at any point in the JGroup stack - it will encrypt all
59  * Events of a type MSG that have a non-null message buffer. The format of the
60  * entry in this form is:<br>
61  * &lt;ENCRYPT key_store_name="defaultStore.keystore" store_password="changeit"
62  * alias="myKey"/&gt;<br>
63  * An example bare-bones.xml file showing the keystore version can be found in
64  * the conf directory in a file called EncryptKeyStore.xml - along with a
65  * defaultStore.keystore file.<br>
66  * In order to use the Encrypt layer in this manner it is necessary to have the
67  * secretKey already generated in a keystore file. The directory containing the
68  * keystore file must be on the application's classpath. You cannot create a
69  * SecretKey keystore file using the keytool application shipped with the JDK. A
70  * java file called KeyStoreGenerator is included in the demo package that can
71  * be used from the command line (or IDE) to generate a suitable keystore.
72  * <p>
73  * <p>
74  * <h2>Option 2</h2>
75  * <br>
76  * This option is suited to an application that does not ship with a known key
77  * but instead it is generated and distributed by the controller. The secret key
78  * is first generated by the Controller (in JGroup terms). When a view change
79  * occurs a peer will request the secret key by sending a key request with its
80  * own public key. The controller encrypts the secret key with this key and
81  * sends it back to the peer who then decrypts it and installs the key as its
82  * own secret key. <br>
83  * All encryption and decryption of Messages is done using this key. When a peer
84  * receives a view change that shows a different keyserver it will repeat this
85  * process - the view change event also triggers the encrypt layer to queue up
86  * and down messages until the new key is installed. The previous keys are
87  * retained so that messages sent before the view change that are queued can be
88  * decrypted if the key is different. <br>
89  * An example EncryptNoKeyStore.xml is included in the conf file as a guide.
90  * <p>
91  * <p>
92  * <br>
93  * Note: the current version does not support the concept of perfect forward
94  * encryption (PFE) which means that if a peer leaves the group the keys are
95  * re-generated preventing the departed peer from decrypting future messages if
96  * it chooses to listen in on the group. This is not included as it really
97  * requires a suitable authentication scheme as well to make this feature useful
98  * as there is nothing to stop the peer rejoining and receiving the new key. A
99  * future release will address this issue.
100  *
101  * @author Steve Woodcock
102  * @author Bela Ban
103  */
104 public class ENCRYPT extends Protocol {
105     Observer observer;
106 
107     interface Observer {
up(Event evt)108         void up(Event evt);
109 
passUp(Event evt)110         void passUp(Event evt);
111 
down(Event evt)112         void down(Event evt);
113 
passDown(Event evt)114         void passDown(Event evt);
115     }
116 
117     private static final String DEFAULT_SYM_ALGO="AES";
118     // address info
119     Address local_addr=null;
120     // keyserver address
121     Address keyServerAddr=null;
122 
123     //used to see whether we are the key server
124     boolean keyServer=false;
125 
126     /* -----------------------------------------    Properties     -------------------------------------------------- */
127 
128     // encryption properties in no supplied key mode
129     @Property(name="asym_provider", description="Cryptographic Service Provider. Default is Bouncy Castle Provider")
130     String asymProvider=null;
131 
132     @Property(name="sym_provider", description="Cryptographic Service Provider. Default is Bouncy Castle Provider")
133     String symProvider=null;
134 
135     @Property(name="asym_algorithm", description="Cipher engine transformation for asymmetric algorithm. Default is RSA")
136     String asymAlgorithm="RSA";
137 
138     @Property(name="sym_algorithm", description="Cipher engine transformation for symmetric algorithm. Default is AES")
139     String symAlgorithm=DEFAULT_SYM_ALGO;
140 
141     @Property(name="asym_init", description="Initial public/private key length. Default is 512")
142     int asymInit=512;
143 
144     @Property(name="sym_init", description="Initial key length for matching symmetric algorithm. Default is 128")
145     int symInit=128;
146 
147     // properties for functioning in supplied key mode
148     private boolean suppliedKey=false;
149 
150     @Property(name="key_store_name", description="File on classpath that contains keystore repository")
151     String keyStoreName;
152 
153     @Property(name="store_password", description="Password used to check the integrity/unlock the keystore. Change the default")
154     private String storePassword="changeit"; //JDK default
155 
156     @Property(name="key_password", description="Password for recovering the key. Change the default")
157     private String keyPassword="changeit"; //JDK default
158 
159     @Property(name="alias", description="Alias used for recovering the key. Change the default")
160     private String alias="mykey"; // JDK default
161 
162     // public/private Key
163     KeyPair Kpair; // to store own's public/private Key
164 
165     //	 for client to store server's public Key
166     PublicKey serverPubKey=null;
167 
168     // needed because we do simultaneous encode/decode with these ciphers - which
169     // would be a threading issue
170     Cipher symEncodingCipher;
171 
172     @GuardedBy("decrypt_lock")
173     Cipher symDecodingCipher;
174 
175     /** To synchronize access to symDecodingCipher */
176     protected final Lock decrypt_lock=new ReentrantLock();
177 
178     // version field for secret key
179     private String symVersion=null;
180     // shared secret key to encrypt/decrypt messages
181     SecretKey secretKey=null;
182 
183     // map to hold previous keys so we can decrypt some earlier messages if we need to
184     final Map<String,Cipher> keyMap=new WeakHashMap<String,Cipher>();
185 
186     // queues to buffer data while we are swapping shared key
187     // or obtaining key for first time
188 
189     private boolean queue_up=true;
190 
191     private boolean queue_down=false;
192 
193     // queue to hold upcoming messages while key negotiation is happening
194     private BlockingQueue<Event> upMessageQueue=new LinkedBlockingQueue<Event>();
195 
196     //	 queue to hold downcoming messages while key negotiation is happening
197     private BlockingQueue<Event> downMessageQueue=new LinkedBlockingQueue<Event>();
198     // decrypting cypher for secret key requests
199     private Cipher asymCipher;
200 
201     /** determines whether to encrypt the entire message, or just the buffer */
202     @Property
203     private boolean encrypt_entire_message=false;
204 
ENCRYPT()205     public ENCRYPT() {}
206 
setObserver(Observer o)207     public void setObserver(Observer o) {
208         observer=o;
209     }
210 
211     /*
212       * GetAlgorithm: Get the algorithm name from "algorithm/mode/padding"
213       *  taken m original ENCRYPT file
214       */
getAlgorithm(String s)215     private static String getAlgorithm(String s) {
216         int index=s.indexOf("/");
217         if(index == -1)
218             return s;
219 
220         return s.substring(0, index);
221     }
222 
init()223     public void init() throws Exception {
224         if(keyPassword == null && storePassword != null) {
225             keyPassword=storePassword;
226 
227             if(log.isInfoEnabled())
228                 log.info("key_password used is same as store_password");
229         }
230         if(keyStoreName == null) {
231             initSymKey();
232             initKeyPair();
233         }
234         else {
235             initConfiguredKey();
236         }
237         initSymCiphers(symAlgorithm, getSecretKey());
238     }
239 
240     /**
241      * Initialisation if a supplied key is defined in the properties. This
242      * supplied key must be in a keystore which can be generated using the
243      * keystoreGenerator file in demos. The keystore must be on the classpath to
244      * find it.
245      *
246      * @throws KeyStoreException
247      * @throws Exception
248      * @throws IOException
249      * @throws NoSuchAlgorithmException
250      * @throws CertificateException
251      * @throws UnrecoverableKeyException
252      */
initConfiguredKey()253     private void initConfiguredKey() throws Exception {
254         InputStream inputStream=null;
255         // must not use default keystore type - as does not support secret keys
256         KeyStore store=KeyStore.getInstance("JCEKS");
257 
258         SecretKey tempKey=null;
259         try {
260             // load in keystore using this thread's classloader
261             inputStream=Thread.currentThread()
262                               .getContextClassLoader()
263                               .getResourceAsStream(keyStoreName);
264             // we can't find a keystore here -
265             if(inputStream == null) {
266                 throw new Exception("Unable to load keystore " + keyStoreName
267                                     + " ensure file is on classpath");
268             }
269             // we have located a file lets load the keystore
270             try {
271                 store.load(inputStream, storePassword.toCharArray());
272                 // loaded keystore - get the key
273                 tempKey=(SecretKey)store.getKey(alias, keyPassword.toCharArray());
274             }
275             catch(IOException e) {
276                 throw new Exception("Unable to load keystore " + keyStoreName + ": " + e);
277             }
278             catch(NoSuchAlgorithmException e) {
279                 throw new Exception("No Such algorithm " + keyStoreName + ": " + e);
280             }
281             catch(CertificateException e) {
282                 throw new Exception("Certificate exception " + keyStoreName + ": " + e);
283             }
284 
285             if(tempKey == null) {
286                 throw new Exception("Unable to retrieve key '" + alias
287                                     + "' from keystore "
288                                     + keyStoreName);
289             }
290             //set the key here
291             setSecretKey(tempKey);
292 
293             if(symAlgorithm.equals(DEFAULT_SYM_ALGO)) {
294                 symAlgorithm=tempKey.getAlgorithm();
295             }
296 
297             // set the fact we are using a supplied key
298 
299             suppliedKey=true;
300             queue_down=false;
301             queue_up=false;
302         }
303         finally {
304             Util.close(inputStream);
305         }
306 
307     }
308 
309     /**
310      * Used to initialise the symmetric key if none is supplied in a keystore.
311      *
312      * @throws Exception
313      */
initSymKey()314     public void initSymKey() throws Exception {
315         KeyGenerator keyGen=null;
316         // see if we have a provider specified
317         if(symProvider != null && symProvider.trim().length() > 0) {
318             keyGen=KeyGenerator.getInstance(getAlgorithm(symAlgorithm), symProvider);
319         }
320         else {
321             keyGen=KeyGenerator.getInstance(getAlgorithm(symAlgorithm));
322         }
323         // generate the key using the defined init properties
324         keyGen.init(symInit);
325         secretKey=keyGen.generateKey();
326 
327         setSecretKey(secretKey);
328 
329         if(log.isInfoEnabled())
330             log.info(" Symmetric key generated ");
331     }
332 
333     /**
334      * Initialises the Ciphers for both encryption and decryption using the
335      * generated or supplied secret key.
336      *
337      * @param algorithm
338      * @param secret
339      * @throws Exception
340      */
initSymCiphers(String algorithm, SecretKey secret)341     private void initSymCiphers(String algorithm, SecretKey secret) throws Exception {
342 
343         if(log.isInfoEnabled())
344             log.info(" Initializing symmetric ciphers");
345 
346         if(symProvider != null && symProvider.trim().length() > 0) {
347             symEncodingCipher=Cipher.getInstance(algorithm, symProvider);
348             symDecodingCipher=Cipher.getInstance(algorithm, symProvider);
349         }
350         else {
351             symEncodingCipher=Cipher.getInstance(algorithm);
352             symDecodingCipher=Cipher.getInstance(algorithm);
353         }
354 
355         symEncodingCipher.init(Cipher.ENCRYPT_MODE, secret);
356         symDecodingCipher.init(Cipher.DECRYPT_MODE, secret);
357 
358         //set the version
359         MessageDigest digest=MessageDigest.getInstance("MD5");
360         digest.reset();
361         digest.update(secret.getEncoded());
362 
363         symVersion=new String(digest.digest(), "UTF-8");
364         if(log.isInfoEnabled()) {
365             log.info(" Initialized symmetric ciphers with secret key (" + symVersion.length() + " bytes)");
366 	    /*
367             StringBuilder sb=new StringBuilder(" Initialized symmetric ciphers with secret key (" + symVersion.length()
368                                                + " bytes) ");
369             char[] arr=symVersion.toCharArray();
370             for(int i=0;i < arr.length;i++) {
371                 char c=arr[i];
372                 sb.append((int)c);
373             }
374             log.info(sb.toString());
375 	    */
376         }
377     }
378 
379     /**
380      * Generates the public/private key pair from the init params
381      *
382      * @throws Exception
383      */
initKeyPair()384     public void initKeyPair() throws Exception {
385         // generate keys according to the specified algorithms
386         // generate publicKey and Private Key
387         KeyPairGenerator KpairGen=null;
388         if(asymProvider != null && asymProvider.trim().length() > 0) {
389             KpairGen=KeyPairGenerator.getInstance(getAlgorithm(asymAlgorithm), asymProvider);
390         }
391         else {
392             KpairGen=KeyPairGenerator.getInstance(getAlgorithm(asymAlgorithm));
393 
394         }
395         KpairGen.initialize(asymInit, new SecureRandom());
396         Kpair=KpairGen.generateKeyPair();
397 
398         // set up the Cipher to decrypt secret key responses encrypted with our key
399 
400         if(asymProvider != null && asymProvider.trim().length() > 0)
401             asymCipher=Cipher.getInstance(asymAlgorithm, asymProvider);
402         else
403             asymCipher=Cipher.getInstance(asymAlgorithm);
404 
405         asymCipher.init(Cipher.DECRYPT_MODE, Kpair.getPrivate());
406 
407         if(log.isInfoEnabled())
408             log.info(" asym algo initialized");
409     }
410 
411     /** Just remove if you don't need to reset any state */
reset()412     public void reset() {}
413 
414     /* (non-Javadoc)
415       * @see org.jgroups.stack.Protocol#up(org.jgroups.Event)
416       */
up(Event evt)417     public Object up(Event evt) {
418         switch(evt.getType()) {
419             case Event.VIEW_CHANGE:
420                 View view=(View)evt.getArg();
421                 if(log.isInfoEnabled())
422                     log.info("handling view-change up: " + view);
423                 if(!suppliedKey) {
424                     handleViewChange(view, false);
425                 }
426                 break;
427             case Event.TMP_VIEW:
428                 view=(View)evt.getArg();
429                 if(log.isInfoEnabled())
430                     log.info("handling tmp-view up: " + view);
431                 if(!suppliedKey) {
432                     // if a tmp_view then we are trying to become coordinator so
433                     // make us keyserver
434                     handleViewChange(view, true);
435                 }
436                 break;
437             // we try and decrypt all messages
438             case Event.MSG:
439                 try {
440                     handleUpMessage(evt);
441                 }
442                 catch(Exception e) {
443                     log.warn("exception occurred decrypting message", e);
444                 }
445                 return null;
446             default:
447                 break;
448         }
449         return passItUp(evt);
450     }
451 
passItUp(Event evt)452     public Object passItUp(Event evt) {
453         if(observer != null)
454             observer.passUp(evt);
455         return up_prot != null? up_prot.up(evt) : null;
456     }
457 
handleViewChange(View view, boolean makeServer)458     private synchronized void handleViewChange(View view, boolean makeServer) {
459 
460         // if view is a bit broken set me as keyserver
461         Vector<Address> members = view.getMembers();
462         if (members == null || members.isEmpty() || members.get(0) == null) {
463             becomeKeyServer(local_addr);
464             return;
465         }
466         // otherwise get keyserver from view controller
467         Address tmpKeyServer=view.getMembers().get(0);
468 
469         //I am new keyserver - either first member of group or old key server is no more and
470         // I have been voted new controller
471         if(makeServer || (tmpKeyServer.equals(local_addr) && (keyServerAddr == null || (!tmpKeyServer.equals(keyServerAddr))))) {
472             becomeKeyServer(tmpKeyServer);
473             // a new keyserver has been set and it is not me
474         }
475         else if(keyServerAddr == null || (!tmpKeyServer.equals(keyServerAddr))) {
476             handleNewKeyServer(tmpKeyServer);
477         }
478         else {
479             if(log.isDebugEnabled())
480                 log.debug("Membership has changed but I do not care");
481         }
482     }
483 
484     /**
485      * Handles becoming server - resetting queue settings and setting keyserver
486      * address to be local address.
487      *
488      * @param tmpKeyServer
489      */
becomeKeyServer(Address tmpKeyServer)490     private void becomeKeyServer(Address tmpKeyServer) {
491         keyServerAddr=tmpKeyServer;
492         keyServer=true;
493         if(log.isInfoEnabled())
494             log.info("I have become key server " + keyServerAddr);
495         queue_down=false;
496         queue_up=false;
497     }
498 
499     /**
500      * Sets up the peer for a new keyserver - this is setting queueing to buffer
501      * messages until we have a new secret key from the key server and sending a
502      * key request to the new keyserver.
503      *
504      * @param newKeyServer
505      */
handleNewKeyServer(Address newKeyServer)506     private void handleNewKeyServer(Address newKeyServer) {
507         // start queueing until we have new key
508         // to make sure we are not sending with old key
509         queue_up=true;
510         queue_down=true;
511         // set new keyserver address
512         keyServerAddr=newKeyServer;
513         keyServer=false;
514         if(log.isInfoEnabled())
515             log.info("Sending key request");
516 
517         // create a key request message
518         sendKeyRequest();
519     }
520 
521     /**
522      * @param evt
523      */
handleUpMessage(Event evt)524     private void handleUpMessage(Event evt) throws Exception {
525         Message msg=(Message)evt.getArg();
526 
527         if(msg == null) {
528             if(log.isTraceEnabled())
529                 log.trace("null message - passing straight up");
530             passItUp(evt);
531             return;
532         }
533 
534         if(msg.getLength() == 0 && !encrypt_entire_message) {
535             passItUp(evt);
536             return;
537         }
538 
539         EncryptHeader hdr=(EncryptHeader)msg.getHeader(this.id);
540 
541         // try and get the encryption header
542         if(hdr == null) {
543             if(log.isTraceEnabled())
544                 log.trace("dropping message as ENCRYPT header is null  or has not been recognized, msg will not be passed up, " + "headers are "
545                           + msg.printHeaders());
546             return;
547         }
548 
549         if(log.isTraceEnabled())
550             log.trace("header received " + hdr);
551 
552         // if a normal message try and decrypt it
553         if(hdr.getType() == EncryptHeader.ENCRYPT) {
554             // if msg buffer is empty, and we didn't encrypt the entire message, just pass up
555             if(!hdr.encrypt_entire_msg && ((Message)evt.getArg()).getLength() == 0) {
556                 if(log.isTraceEnabled())
557                     log.trace("passing up message as it has an empty buffer ");
558                 passItUp(evt);
559                 return;
560             }
561 
562             // if queueing then pass into queue to be dealt with later
563             if(queue_up) {
564                 if(log.isTraceEnabled())
565                     log.trace("queueing up message as no session key established: " + evt.getArg());
566                 upMessageQueue.put(evt);
567             }
568             else {
569                 // make sure we pass up any queued messages first
570                 // could be more optimised but this can wait
571                 // we only need this if not using supplied key
572                 if(!suppliedKey) {
573                     drainUpQueue();
574                 }
575                 // try and decrypt the message - we need to copy msg as we modify its
576                 // buffer (http://jira.jboss.com/jira/browse/JGRP-538)
577                 Message tmpMsg=decryptMessage(symDecodingCipher, msg.copy());
578                 if(tmpMsg != null) {
579                     if(log.isTraceEnabled())
580                         log.trace("decrypted message " + tmpMsg);
581                     passItUp(new Event(Event.MSG, tmpMsg));
582                 }
583                 else {
584                     log.warn("Unrecognised cipher discarding message");
585                 }
586             }
587         }
588         else {
589             // check if we had some sort of encrypt control
590             // header if using supplied key we should not
591             // process it
592             if(suppliedKey) {
593                 if(log.isWarnEnabled()) {
594                     log.warn("We received an encrypt header of " + hdr.getType()
595                              + " while in configured mode");
596                 }
597             }
598             else {
599                 // see what sort of encrypt control message we
600                 // have received
601                 switch(hdr.getType()) {
602                     // if a key request
603                     case EncryptHeader.KEY_REQUEST:
604                         if(log.isInfoEnabled()) {
605                             log.info("received a key request from peer");
606                         }
607 
608                         //if a key request send response key back
609                         try {
610                             // extract peer's public key
611                             PublicKey tmpKey=generatePubKey(msg.getBuffer());
612                             // send back the secret key we have
613                             sendSecretKey(getSecretKey(), tmpKey, msg.getSrc());
614                         }
615                         catch(Exception e) {
616                             log.warn("unable to reconstitute peer's public key");
617                         }
618                         break;
619                     case EncryptHeader.SECRETKEY:
620                         if(log.isInfoEnabled()) {
621                             log.info("received a secretkey response from keyserver");
622                         }
623 
624                         try {
625                             SecretKey tmp=decodeKey(msg.getBuffer());
626                             if(tmp == null) {
627                                 // unable to understand response
628                                 // lets try again
629                                 sendKeyRequest();
630                             }
631                             else {
632                                 // otherwise lets set the reurned key
633                                 // as the shared key
634                                 setKeys(tmp, hdr.getVersion());
635                                 if(log.isInfoEnabled()) {
636                                     log.info("Decoded secretkey response");
637                                 }
638                             }
639                         }
640                         catch(Exception e) {
641                             log.warn("unable to process received public key");
642                         }
643                         break;
644                     default:
645                         log.warn("Received ignored encrypt header of " + hdr.getType());
646                         break;
647                 }
648             }
649         }
650     }
651 
652     /**
653      * used to drain the up queue - synchronized so we can call it safely
654      * despite access from potentially two threads at once
655      *
656      * @throws QueueClosedException
657      * @throws Exception
658      */
drainUpQueue()659     private void drainUpQueue() throws Exception {
660         //we do not synchronize here as we only have one up thread so we should never get an issue
661         //synchronized(upLock){
662         Event tmp=null;
663         while((tmp=upMessageQueue.poll(0L, TimeUnit.MILLISECONDS)) != null) {
664             Message msg=decryptMessage(symDecodingCipher, ((Message)tmp.getArg()).copy());
665 
666             if(msg != null) {
667                 if(log.isTraceEnabled()) {
668                     log.trace("passing up message from drain " + msg);
669                 }
670                 passItUp(new Event(Event.MSG, msg));
671             }
672             else {
673                 log.warn("discarding message in queue up drain as cannot decode it");
674             }
675         }
676     }
677 
678     /**
679      * Sets the keys for the app. and drains the queues - the drains could be
680      * called att he same time as the up/down messages calling in to the class
681      * so we may have an extra call to the drain methods but this slight expense
682      * is better than the alternative of waiting until the next message to
683      * trigger the drains which may never happen.
684      *
685      * @param key
686      * @param version
687      * @throws Exception
688      */
setKeys(SecretKey key, String version)689     private void setKeys(SecretKey key, String version) throws Exception {
690 
691         // put the previous key into the map
692         // if the keys are already there then they will overwrite
693         keyMap.put(getSymVersion(), getSymDecodingCipher());
694 
695         setSecretKey(key);
696         initSymCiphers(key.getAlgorithm(), key);
697         setSymVersion(version);
698 
699         // drain the up queue
700         log.info("setting queue up to false in setKeys");
701         queue_up=false;
702         drainUpQueue();
703 
704         queue_down=false;
705         drainDownQueue();
706     }
707 
708     /**
709      * Does the actual work for decrypting - if version does not match current
710      * cipher then tries to use previous cipher
711      *
712      * @param cipher
713      * @param msg
714      * @return
715      * @throws Exception
716      */
decryptMessage(Cipher cipher, Message msg)717     private Message decryptMessage(Cipher cipher, Message msg) throws Exception {
718         EncryptHeader hdr=(EncryptHeader)msg.getHeader(this.id);
719         if(!hdr.getVersion().equals(getSymVersion())) {
720             log.warn("attempting to use stored cipher as message does not uses current encryption version ");
721             cipher=keyMap.get(hdr.getVersion());
722             if(cipher == null) {
723                 log.warn("Unable to find a matching cipher in previous key map");
724                 return null;
725             }
726             else {
727                 if(log.isTraceEnabled())
728                     log.trace("decrypting using previous cipher version " + hdr.getVersion());
729                 return _decrypt(cipher, msg, hdr.encrypt_entire_msg);
730             }
731         }
732 
733         else {
734 
735             // reset buffer with decrypted message
736             return _decrypt(cipher, msg, hdr.encrypt_entire_msg);
737         }
738     }
739 
_decrypt(Cipher cipher, Message msg, boolean decrypt_entire_msg)740     private Message _decrypt(Cipher cipher, Message msg, boolean decrypt_entire_msg) throws Exception {
741         byte[] decrypted_msg;
742 
743         decrypt_lock.lock();
744         try {
745             decrypted_msg=cipher.doFinal(msg.getRawBuffer(), msg.getOffset(), msg.getLength());
746         }
747         finally {
748             decrypt_lock.unlock();
749         }
750 
751         if(!decrypt_entire_msg) {
752             msg.setBuffer(decrypted_msg);
753             return msg;
754         }
755 
756         Message ret=(Message)Util.streamableFromByteBuffer(Message.class, decrypted_msg);
757         if(ret.getDest() == null)
758             ret.setDest(msg.getDest());
759         if(ret.getSrc() == null)
760             ret.setSrc(msg.getSrc());
761         return ret;
762     }
763 
764     /**
765      * @param secret
766      * @param pubKey
767      * @throws InvalidKeyException
768      * @throws IllegalStateException
769      * @throws IllegalBlockSizeException
770      * @throws BadPaddingException
771      */
sendSecretKey(SecretKey secret, PublicKey pubKey, Address source)772     private void sendSecretKey(SecretKey secret, PublicKey pubKey, Address source) throws InvalidKeyException,
773                                                                                   IllegalStateException,
774                                                                                   IllegalBlockSizeException,
775                                                                                   BadPaddingException,
776                                                                                   NoSuchPaddingException,
777                                                                                   NoSuchAlgorithmException {
778         Message newMsg;
779 
780         if(log.isDebugEnabled())
781             log.debug("encoding shared key ");
782 
783         // create a cipher with peer's public key
784         Cipher tmp=Cipher.getInstance(asymAlgorithm);
785         tmp.init(Cipher.ENCRYPT_MODE, pubKey);
786 
787         //encrypt current secret key
788         byte[] encryptedKey=tmp.doFinal(secret.getEncoded());
789 
790         //SW logout encrypted bytes we are sending so we
791         // can match the clients log to see if they match
792         if(log.isDebugEnabled())
793             log.debug(" Generated encoded key which only client can decode:" + formatArray(encryptedKey));
794 
795         newMsg=new Message(source, local_addr, encryptedKey);
796 
797         newMsg.putHeader(this.id, new EncryptHeader(EncryptHeader.SECRETKEY, getSymVersion()));
798 
799         if(log.isDebugEnabled())
800             log.debug(" Sending version " + getSymVersion() + " encoded key to client");
801         passItDown(new Event(Event.MSG, newMsg));
802     }
803 
804 
805 
806     /**
807      * @return Message
808      */
sendKeyRequest()809     private Message sendKeyRequest() {
810 
811         // send client's public key to server and request
812         // server's public key
813         Message newMsg=new Message(keyServerAddr, local_addr, Kpair.getPublic().getEncoded());
814 
815         newMsg.putHeader(this.id, new EncryptHeader(EncryptHeader.KEY_REQUEST, getSymVersion()));
816         passItDown(new Event(Event.MSG, newMsg));
817         return newMsg;
818     }
819 
820     /* (non-Javadoc)
821       * @see org.jgroups.stack.Protocol#down(org.jgroups.Event)
822       */
down(Event evt)823     public Object down(Event evt) {
824         if(observer != null)
825             observer.down(evt);
826 
827         switch(evt.getType()) {
828 
829             case Event.MSG:
830                 try {
831                     if(queue_down) {
832                         if(log.isTraceEnabled())
833                             log.trace("queueing down message as no session key established" + evt.getArg());
834                         downMessageQueue.put(evt); // queue messages if we are waiting for a new key
835                     }
836                     else {
837                         // make sure the down queue is drained first to keep ordering
838                         if(!suppliedKey) {
839                             drainDownQueue();
840                         }
841                         sendDown(evt);
842                     }
843 
844                 }
845                 catch(Exception e) {
846                     log.warn("unable to send down event " + e);
847                 }
848                 return null;
849 
850             case Event.VIEW_CHANGE:
851                 View view=(View)evt.getArg();
852                 if(log.isInfoEnabled())
853                     log.info("handling view-change down: " + view);
854                 if(!suppliedKey) {
855                     handleViewChange(view, false);
856                 }
857                 break;
858 
859             case Event.SET_LOCAL_ADDRESS:
860                 local_addr=(Address)evt.getArg();
861                 if(log.isDebugEnabled())
862                     log.debug("set local address to " + local_addr);
863                 break;
864 
865             case Event.TMP_VIEW:
866                 view=(View)evt.getArg();
867                 if(log.isInfoEnabled())
868                     log.info("handling tmp-view down: " + view);
869                 if(!suppliedKey) {
870                     // if a tmp_view then we are trying to become coordinator so
871                     // make us keyserver
872                     handleViewChange(view, true);
873                 }
874                 break;
875             default:
876                 break;
877         }
878         return down_prot.down(evt);
879     }
880 
passItDown(Event evt)881     public Object passItDown(Event evt) {
882         if(observer != null)
883             observer.passDown(evt);
884         return down_prot != null? down_prot.down(evt) : null;
885     }
886 
887     /**
888      * @throws Exception
889      * @throws QueueClosedException
890      */
drainDownQueue()891     private void drainDownQueue() throws Exception {
892         //	we do not synchronize here as we only have one down thread so we should never get an issue
893         //  first lets replay any oustanding events
894         Event tmp=null;
895         while((tmp=downMessageQueue.poll(0L, TimeUnit.MILLISECONDS)) != null) {
896             sendDown(tmp);
897         }
898     }
899 
900     /**
901      * @param evt
902      * @throws Exception
903      */
sendDown(Event evt)904     private void sendDown(Event evt) throws Exception {
905         if(evt.getType() != Event.MSG) {
906             return;
907         }
908 
909         Message msg=(Message)evt.getArg();
910         if(msg.getLength() == 0 && !encrypt_entire_message) {
911             passItDown(evt);
912             return;
913         }
914 
915         EncryptHeader hdr=new EncryptHeader(EncryptHeader.ENCRYPT, getSymVersion());
916         hdr.encrypt_entire_msg=this.encrypt_entire_message;
917 
918         if(encrypt_entire_message) {
919             byte[] serialized_msg=Util.streamableToByteBuffer(msg);
920             byte[] encrypted_msg=encryptMessage(symEncodingCipher,
921                                                 serialized_msg,
922                                                 0,
923                                                 serialized_msg.length);
924             Message tmp=msg.copy(false); // we need to preserve headers which may already be present
925             tmp.setBuffer(encrypted_msg);
926             if(tmp.getSrc() == null)
927                 tmp.setSrc(local_addr);
928             tmp.putHeader(this.id, hdr);
929             passItDown(new Event(Event.MSG, tmp));
930             return;
931         }
932 
933         // put our encrypt header on the message
934         msg.putHeader(this.id, hdr);
935 
936         // copy neeeded because same message (object) may be retransmitted -> no double encryption
937         Message msgEncrypted=msg.copy(false);
938         msgEncrypted.setBuffer(encryptMessage(symEncodingCipher,
939                                               msg.getRawBuffer(),
940                                               msg.getOffset(),
941                                               msg.getLength()));
942         passItDown(new Event(Event.MSG, msgEncrypted));
943     }
944 
945     /**
946      *
947      * @param cipher
948      * @param plain
949      * @return
950      * @throws Exception
951      */
encryptMessage(Cipher cipher, byte[] plain, int offset, int length)952     private synchronized byte[] encryptMessage(Cipher cipher, byte[] plain, int offset, int length) throws Exception {
953         return cipher.doFinal(plain, offset, length);
954     }
955 
decodeKey(byte[] encodedKey)956     private SecretKeySpec decodeKey(byte[] encodedKey) throws Exception {
957         // try and decode secrey key sent from keyserver
958         byte[] keyBytes;
959 
960         synchronized(this) {
961             keyBytes=asymCipher.doFinal(encodedKey);
962         }
963 
964         SecretKeySpec keySpec=null;
965         try {
966             keySpec=new SecretKeySpec(keyBytes, getAlgorithm(symAlgorithm));
967 
968             // test reconstituted key to see if valid
969             Cipher temp=Cipher.getInstance(symAlgorithm);
970             temp.init(Cipher.SECRET_KEY, keySpec);
971         }
972         catch(Exception e) {
973             log.fatal(e.toString());
974             keySpec=null;
975         }
976         return keySpec;
977     }
978 
979     /**
980      * used to reconstitute public key sent in byte form from peer
981      *
982      * @param encodedKey
983      * @return PublicKey
984      */
generatePubKey(byte[] encodedKey)985     private PublicKey generatePubKey(byte[] encodedKey) {
986         PublicKey pubKey=null;
987         try {
988             KeyFactory KeyFac=KeyFactory.getInstance(getAlgorithm(asymAlgorithm));
989             X509EncodedKeySpec x509KeySpec=new X509EncodedKeySpec(encodedKey);
990             pubKey=KeyFac.generatePublic(x509KeySpec);
991         }
992         catch(Exception e) {
993             e.printStackTrace();
994         }
995         return pubKey;
996     }
997 
998     /*
999       * simple helper method so we can see the format of the byte arrays in a
1000       * readable form could be better to use Base64 but will do for now
1001       */
formatArray(byte[] array)1002     private static String formatArray(byte[] array) {
1003         StringBuilder buf=new StringBuilder();
1004         for(int i=0;i < array.length;i++) {
1005             buf.append(Integer.toHexString(array[i]));
1006         }
1007         return buf.toString();
1008     }
1009 
1010     /**
1011      * @return Returns the asymInit.
1012      */
getAsymInit()1013     protected int getAsymInit() {
1014         return asymInit;
1015     }
1016 
1017     /**
1018      * @return Returns the asymProvider.
1019      */
getAsymProvider()1020     protected String getAsymProvider() {
1021         return asymProvider;
1022     }
1023 
1024     /**
1025      * @return Returns the desKey.
1026      */
getDesKey()1027     protected SecretKey getDesKey() {
1028         return secretKey;
1029     }
1030 
1031     /**
1032      * @return Returns the kpair.
1033      */
getKpair()1034     protected KeyPair getKpair() {
1035         return Kpair;
1036     }
1037 
1038     /**
1039      * @return Returns the asymCipher.
1040      */
getAsymCipher()1041     protected Cipher getAsymCipher() {
1042         return asymCipher;
1043     }
1044 
1045     /**
1046      * @return Returns the serverPubKey.
1047      */
getServerPubKey()1048     protected PublicKey getServerPubKey() {
1049         return serverPubKey;
1050     }
1051 
1052     /**
1053      * @return Returns the symAlgorithm.
1054      */
getSymAlgorithm()1055     protected String getSymAlgorithm() {
1056         return symAlgorithm;
1057     }
1058 
1059     /**
1060      * @return Returns the symInit.
1061      */
getSymInit()1062     protected int getSymInit() {
1063         return symInit;
1064     }
1065 
1066     /**
1067      * @return Returns the symProvider.
1068      */
getSymProvider()1069     protected String getSymProvider() {
1070         return symProvider;
1071     }
1072 
1073     /**
1074      * @return Returns the asymAlgorithm.
1075      */
getAsymAlgorithm()1076     protected String getAsymAlgorithm() {
1077         return asymAlgorithm;
1078     }
1079 
1080     /**
1081      * @return Returns the symVersion.
1082      */
getSymVersion()1083     private String getSymVersion() {
1084         return symVersion;
1085     }
1086 
    /**
     * Replaces the value of the <code>symVersion</code> field, which is the version
     * string {@link #getSymVersion()} returns.
     *
     * @param symVersion
     *                the new version string to store
     */
    private void setSymVersion(String symVersion) {
        this.symVersion=symVersion;
    }
1094 
1095     /**
1096      * @return Returns the secretKey.
1097      */
getSecretKey()1098     private SecretKey getSecretKey() {
1099         return secretKey;
1100     }
1101 
    /**
     * Replaces the value of the <code>secretKey</code> field, which is the key
     * {@link #getSecretKey()} and {@link #getDesKey()} return.
     *
     * @param secretKey
     *                the new secret key to store
     */
    private void setSecretKey(SecretKey secretKey) {
        this.secretKey=secretKey;
    }
1109 
1110     /**
1111      * @return Returns the keyStoreName.
1112      */
getKeyStoreName()1113     protected String getKeyStoreName() {
1114         return keyStoreName;
1115     }
1116 
1117     /**
1118      * @return Returns the symDecodingCipher.
1119      */
getSymDecodingCipher()1120     protected Cipher getSymDecodingCipher() {
1121         return symDecodingCipher;
1122     }
1123 
1124     /**
1125      * @return Returns the symEncodingCipher.
1126      */
getSymEncodingCipher()1127     protected Cipher getSymEncodingCipher() {
1128         return symEncodingCipher;
1129     }
1130 
1131     /**
1132      * @return Returns the local_addr.
1133      */
getLocal_addr()1134     protected Address getLocal_addr() {
1135         return local_addr;
1136     }
1137 
    /**
     * Replaces the value of the <code>local_addr</code> field, which is used when
     * constructing outgoing key messages and as the fallback source of encrypted messages.
     *
     * @param local_addr
     *                the new local address to store
     */
    protected void setLocal_addr(Address local_addr) {
        this.local_addr=local_addr;
    }
1145 
1146     /**
1147      * @return Returns the keyServerAddr.
1148      */
getKeyServerAddr()1149     protected Address getKeyServerAddr() {
1150         return keyServerAddr;
1151     }
1152 
    /**
     * Replaces the value of the <code>keyServerAddr</code> field, which is the address
     * key request messages are constructed with.
     *
     * @param keyServerAddr
     *                the new key server address to store
     */
    protected void setKeyServerAddr(Address keyServerAddr) {
        this.keyServerAddr=keyServerAddr;
    }
1160 
1161     public static class EncryptHeader extends org.jgroups.Header {
1162         short type;
1163         public static final short ENCRYPT=0;
1164         public static final short KEY_REQUEST=1;
1165         public static final short SERVER_PUBKEY=2;
1166         public static final short SECRETKEY=3;
1167         public static final short SECRETKEY_READY=4;
1168 
1169         String version;
1170         boolean encrypt_entire_msg=false;
1171 
EncryptHeader()1172         public EncryptHeader() {}
1173 
EncryptHeader(short type)1174         public EncryptHeader(short type) {
1175             //this(type, 0l);
1176             this.type=type;
1177             this.version="";
1178         }
1179 
EncryptHeader(short type,String version)1180         public EncryptHeader(short type,String version) {
1181             this.type=type;
1182             this.version=version;
1183         }
1184 
1185 
writeTo(DataOutputStream out)1186         public void writeTo(DataOutputStream out) throws IOException {
1187             out.writeShort(type);
1188             Util.writeString(version, out);
1189             out.writeBoolean(encrypt_entire_msg);
1190         }
1191 
readFrom(DataInputStream in)1192         public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
1193             type=in.readShort();
1194             version=Util.readString(in);
1195             encrypt_entire_msg=in.readBoolean();
1196         }
1197 
toString()1198         public String toString() {
1199             return "ENCRYPT [type=" + type
1200                    + " version=\""
1201                    + (version != null? version.length() + " bytes" : "n/a")
1202                    + "\"]";
1203         }
1204 
size()1205         public int size() {
1206             int retval=Global.SHORT_SIZE + Global.BYTE_SIZE + Global.BYTE_SIZE;
1207             if(version != null)
1208                 retval+=version.length() + 2;
1209             return retval;
1210         }
1211 
1212 
1213         /**
1214          * @return Returns the type.
1215          */
getType()1216         protected short getType() {
1217             return type;
1218         }
1219 
1220         /**
1221          * @return Returns the version.
1222          */
getVersion()1223         protected String getVersion() {
1224             return version;
1225         }
1226     }
1227 }
1228