1 /*
2  * Copyright (C) 2005-2008 Jive Software. All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package org.jivesoftware.openfire.net;
18 
19 import java.io.BufferedWriter;
20 import java.io.IOException;
21 import java.io.OutputStreamWriter;
22 import java.io.Writer;
23 import java.net.Socket;
24 import java.net.UnknownHostException;
25 import java.nio.channels.Channels;
26 import java.nio.charset.StandardCharsets;
27 import java.security.cert.Certificate;
28 import java.util.Collection;
29 import java.util.Date;
30 import java.util.HashMap;
31 import java.util.Map;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.atomic.AtomicBoolean;
34 import java.util.concurrent.atomic.AtomicReference;
35 
36 import javax.annotation.Nullable;
37 import javax.net.ssl.SSLPeerUnverifiedException;
38 
39 import org.jivesoftware.openfire.*;
40 import org.jivesoftware.openfire.auth.UnauthorizedException;
41 import org.jivesoftware.openfire.session.IncomingServerSession;
42 import org.jivesoftware.openfire.session.LocalSession;
43 import org.jivesoftware.openfire.session.Session;
44 import org.jivesoftware.openfire.spi.ConnectionConfiguration;
45 import org.jivesoftware.openfire.spi.ConnectionManagerImpl;
46 import org.jivesoftware.openfire.spi.ConnectionType;
47 import org.jivesoftware.util.JiveGlobals;
48 import org.jivesoftware.util.LocaleUtils;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51 import org.xmpp.packet.Packet;
52 
53 import com.jcraft.jzlib.JZlib;
54 import com.jcraft.jzlib.ZOutputStream;
55 
56 /**
57  * An object to track the state of a XMPP client-server session.
58  * Currently this class contains the socket channel connecting the
59  * client and server.
60  *
61  * @author Iain Shigeoka
62  * @deprecated Old, pre NIO / MINA code. Should not be used as NIO offers better performance. Currently only in use for s2s.
63  */
64 public class SocketConnection implements Connection {
65 
66     private static final Logger Log = LoggerFactory.getLogger(SocketConnection.class);
67 
68     private static Map<SocketConnection, String> instances =
69             new ConcurrentHashMap<>();
70 
71     /**
72      * Milliseconds a connection has to be idle to be closed. Timeout is disabled by default. It's
73      * up to the connection's owner to configure the timeout value. Sending stanzas to the client
74      * is not considered as activity. We are only considering the connection active when the
75      * client sends some data or hearbeats (i.e. whitespaces) to the server.
76      * The reason for this is that sending data will fail if the connection is closed. And if
77      * the thread is blocked while sending data (because the socket is closed) then the clean up
78      * thread will close the socket anyway.
79      */
80     private long idleTimeout = -1;
81 
82     final private Map<ConnectionCloseListener, Object> listeners =
83             new HashMap<>();
84 
85     private Socket socket;
86     private SocketReader socketReader;
87 
88     private Writer writer;
89     private AtomicBoolean writing = new AtomicBoolean(false);
90     private AtomicReference<State> state = new AtomicReference<State>(State.OPEN);
91 
92     /**
93      * Deliverer to use when the connection is closed or was closed when delivering
94      * a packet.
95      */
96     private PacketDeliverer backupDeliverer;
97 
98     private LocalSession session;
99     private boolean secure;
100     private boolean compressed;
101     private org.jivesoftware.util.XMLWriter xmlSerializer;
102     private int majorVersion = 1;
103     private int minorVersion = 0;
104     private String language = null;
105     private TLSStreamHandler tlsStreamHandler;
106 
107     private long writeStarted = -1;
108 
109     /**
110      * TLS policy currently in use for this connection.
111      */
112     private TLSPolicy tlsPolicy = TLSPolicy.optional;
113     private boolean usingSelfSignedCertificate;
114 
115     /**
116      * Compression policy currently in use for this connection.
117      */
118     private CompressionPolicy compressionPolicy = CompressionPolicy.disabled;
119 
getInstances()120     public static Collection<SocketConnection> getInstances() {
121         return instances.keySet();
122     }
123 
124     /**
125      * Create a new session using the supplied socket.
126      *
127      * @param backupDeliverer the packet deliverer this connection will use when socket is closed.
128      * @param socket the socket to represent.
129      * @param isSecure true if this is a secure connection.
130      * @throws java.io.IOException if there was a socket error while sending the packet.
131      */
SocketConnection(PacketDeliverer backupDeliverer, Socket socket, boolean isSecure)132     public SocketConnection(PacketDeliverer backupDeliverer, Socket socket, boolean isSecure)
133             throws IOException {
134         if (socket == null) {
135             throw new NullPointerException("Socket channel must be non-null");
136         }
137 
138         this.secure = isSecure;
139         this.socket = socket;
140         // DANIELE: Modify socket to use channel
141         if (socket.getChannel() != null) {
142             writer = Channels.newWriter(
143                     ServerTrafficCounter.wrapWritableChannel(socket.getChannel()), StandardCharsets.UTF_8.newEncoder(), -1);
144         }
145         else {
146             writer = new BufferedWriter(new OutputStreamWriter(
147                     ServerTrafficCounter.wrapOutputStream(socket.getOutputStream()), StandardCharsets.UTF_8));
148         }
149         this.backupDeliverer = backupDeliverer;
150         xmlSerializer = new XMLSocketWriter(writer, this);
151 
152         instances.put(this, "");
153 
154         // Default this sensibly.
155         this.tlsPolicy = this.getConfiguration().getTlsPolicy();
156     }
157 
158     /**
159      * Returns the stream handler responsible for securing the plain connection and providing
160      * the corresponding input and output streams.
161      *
162      * @return the stream handler responsible for securing the plain connection and providing
163      *         the corresponding input and output streams.
164      */
getTLSStreamHandler()165     public TLSStreamHandler getTLSStreamHandler() {
166         return tlsStreamHandler;
167     }
168 
startTLS(boolean clientMode, boolean directTLS)169     public void startTLS(boolean clientMode, boolean directTLS) throws IOException {
170         if (!secure) {
171             secure = true;
172 
173             // Prepare for TLS
174             final ClientAuth clientAuth;
175             if (session instanceof IncomingServerSession)
176             {
177                 clientAuth = ClientAuth.needed;
178             }
179             else
180             {
181                 clientAuth = ClientAuth.wanted;
182             }
183             tlsStreamHandler = new TLSStreamHandler(socket, getConfiguration(), clientMode);
184             if (!clientMode && !directTLS) {
185                 // Indicate the client that the server is ready to negotiate TLS
186                 deliverRawText("<proceed xmlns=\"urn:ietf:params:xml:ns:xmpp-tls\"/>");
187             }
188             // Start handshake
189             tlsStreamHandler.start();
190             // Use new wrapped writers
191             writer = new BufferedWriter(new OutputStreamWriter(tlsStreamHandler.getOutputStream(), StandardCharsets.UTF_8));
192             xmlSerializer = new XMLSocketWriter(writer, this);
193         }
194     }
195 
196     @Override
addCompression()197     public void addCompression() {
198         // WARNING: We do not support adding compression for incoming traffic but not for outgoing traffic.
199     }
200 
201     @Override
startCompression()202     public void startCompression() {
203         compressed = true;
204 
205         try {
206             if (tlsStreamHandler == null) {
207                 ZOutputStream out = new ZOutputStream(
208                         ServerTrafficCounter.wrapOutputStream(socket.getOutputStream()),
209                         JZlib.Z_BEST_COMPRESSION);
210                 out.setFlushMode(JZlib.Z_PARTIAL_FLUSH);
211                 writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
212                 xmlSerializer = new XMLSocketWriter(writer, this);
213             }
214             else {
215                 ZOutputStream out = new ZOutputStream(tlsStreamHandler.getOutputStream(), JZlib.Z_BEST_COMPRESSION);
216                 out.setFlushMode(JZlib.Z_PARTIAL_FLUSH);
217                 writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
218                 xmlSerializer = new XMLSocketWriter(writer, this);
219             }
220         } catch (IOException e) {
221             // TODO Would be nice to still be able to throw the exception and not catch it here
222             Log.error("Error while starting compression", e);
223             compressed = false;
224         }
225     }
226 
227     @Override
getConfiguration()228     public ConnectionConfiguration getConfiguration()
229     {
230         // This is an ugly hack to get backwards compatibility with the pre-MINA era. As this implementation is being
231         // removed (it is marked as deprecated - at the time of writing, it is only used for S2S). The ugly hack: assume
232         // S2S:
233         final ConnectionManagerImpl connectionManager = ((ConnectionManagerImpl) XMPPServer.getInstance().getConnectionManager());
234         return connectionManager.getListener( ConnectionType.SOCKET_S2S, false ).generateConnectionConfiguration();
235     }
236 
validate()237     public boolean validate() {
238         if (isClosed()) {
239             return false;
240         }
241         boolean allowedToWrite = false;
242         try {
243             requestWriting();
244             allowedToWrite = true;
245             // Register that we started sending data on the connection
246             writeStarted();
247             writer.write(" ");
248             writer.flush();
249         }
250         catch (Exception e) {
251             Log.warn("Closing no longer valid connection" + "\n" + this.toString(), e);
252             close();
253         }
254         finally {
255             // Register that we finished sending data on the connection
256             writeFinished();
257             if (allowedToWrite) {
258                 releaseWriting();
259             }
260         }
261         return !isClosed();
262     }
263 
264     @Override
init(LocalSession owner)265     public void init(LocalSession owner) {
266         session = owner;
267     }
268 
269     @Override
reinit(LocalSession owner)270     public void reinit(LocalSession owner) {
271         session = owner;
272 
273         // ConnectionCloseListeners are registered with their session instance as a callback object. When re-initializing,
274         // this object needs to be replaced with the new session instance (or otherwise, the old session will be used
275         // during the callback. OF-2014
276         for ( final Map.Entry<ConnectionCloseListener, Object> entry : listeners.entrySet() )
277         {
278             if ( entry.getValue() instanceof LocalSession ) {
279                 entry.setValue( owner );
280             }
281         }
282     }
283 
284     @Override
registerCloseListener(ConnectionCloseListener listener, Object handbackMessage)285     public void registerCloseListener(ConnectionCloseListener listener, Object handbackMessage) {
286         if (isClosed()) {
287             listener.onConnectionClose(handbackMessage);
288         }
289         else {
290             listeners.put(listener, handbackMessage);
291         }
292     }
293 
294     @Override
removeCloseListener(ConnectionCloseListener listener)295     public void removeCloseListener(ConnectionCloseListener listener) {
296         listeners.remove(listener);
297     }
298 
299     @Override
getAddress()300     public byte[] getAddress() throws UnknownHostException {
301         return socket.getInetAddress().getAddress();
302     }
303 
304     @Override
getHostAddress()305     public String getHostAddress() throws UnknownHostException {
306         return socket.getInetAddress().getHostAddress();
307     }
308 
309     @Override
getHostName()310     public String getHostName() throws UnknownHostException {
311         return socket.getInetAddress().getHostName();
312     }
313 
314     /**
315      * Returns the port that the connection uses.
316      *
317      * @return the port that the connection uses.
318      */
getPort()319     public int getPort() {
320         return socket.getPort();
321     }
322 
323     /**
324      * Returns the Writer used to send data to the connection. The writer should be
325      * used with caution. In the majority of cases, the {@link #deliver(Packet)}
326      * method should be used to send data instead of using the writer directly.
327      * You must synchronize on the writer before writing data to it to ensure
328      * data consistency:
329      *
330      * <pre>
331      *  Writer writer = connection.getWriter();
332      * synchronized(writer) {
333      *     // write data....
334      * }</pre>
335      *
336      * @return the Writer for this connection.
337      */
getWriter()338     public Writer getWriter() {
339         return writer;
340     }
341 
342     @Override
isClosed()343     public boolean isClosed() {
344         return state.get() == State.CLOSED;
345     }
346 
347     @Override
isSecure()348     public boolean isSecure() {
349         return secure;
350     }
351 
352     @Override
isCompressed()353     public boolean isCompressed() {
354         return compressed;
355     }
356 
357     @Override
getTlsPolicy()358     public TLSPolicy getTlsPolicy() {
359         return tlsPolicy;
360     }
361 
362     /**
363      * Sets whether TLS is mandatory, optional or is disabled. When TLS is mandatory clients
364      * are required to secure their connections or otherwise their connections will be closed.
365      * On the other hand, when TLS is disabled clients are not allowed to secure their connections
366      * using TLS. Their connections will be closed if they try to secure the connection. in this
367      * last case.
368      *
369      * @param tlsPolicy whether TLS is mandatory, optional or is disabled.
370      */
371     @Override
setTlsPolicy(TLSPolicy tlsPolicy)372     public void setTlsPolicy(TLSPolicy tlsPolicy) {
373         this.tlsPolicy = tlsPolicy;
374     }
375 
376     @Override
getCompressionPolicy()377     public CompressionPolicy getCompressionPolicy() {
378         return compressionPolicy;
379     }
380 
381     /**
382      * Sets whether compression is enabled or is disabled.
383      *
384      * @param compressionPolicy whether Compression is enabled or is disabled.
385      */
386     @Override
setCompressionPolicy(CompressionPolicy compressionPolicy)387     public void setCompressionPolicy(CompressionPolicy compressionPolicy) {
388         this.compressionPolicy = compressionPolicy;
389     }
390 
getIdleTimeout()391     public long getIdleTimeout() {
392         return idleTimeout;
393     }
394 
395     /**
396      * Sets the number of milliseconds a connection has to be idle to be closed. Sending
397      * stanzas to the client is not considered as activity. We are only considering the
398      * connection active when the client sends some data or hearbeats (i.e. whitespaces)
399      * to the server.
400      *
401      * @param timeout the number of milliseconds a connection has to be idle to be closed.
402      */
setIdleTimeout(long timeout)403     public void setIdleTimeout(long timeout) {
404         this.idleTimeout = timeout;
405     }
406 
407     @Override
getMajorXMPPVersion()408     public int getMajorXMPPVersion() {
409         return majorVersion;
410     }
411 
412     @Override
getMinorXMPPVersion()413     public int getMinorXMPPVersion() {
414         return minorVersion;
415     }
416 
417     /**
418      * Sets the XMPP version information. In most cases, the version should be "1.0".
419      * However, older clients using the "Jabber" protocol do not set a version. In that
420      * case, the version is "0.0".
421      *
422      * @param majorVersion the major version.
423      * @param minorVersion the minor version.
424      */
425     @Override
setXMPPVersion(int majorVersion, int minorVersion)426     public void setXMPPVersion(int majorVersion, int minorVersion) {
427         this.majorVersion = majorVersion;
428         this.minorVersion = minorVersion;
429     }
430 
431     @Override
getLocalCertificates()432     public Certificate[] getLocalCertificates() {
433         if (tlsStreamHandler != null) {
434             return tlsStreamHandler.getSSLSession().getLocalCertificates();
435         }
436         return new Certificate[0];
437     }
438 
439     @Override
getPeerCertificates()440     public Certificate[] getPeerCertificates() {
441         if (tlsStreamHandler != null) {
442             try {
443                 return tlsStreamHandler.getSSLSession().getPeerCertificates();
444             } catch (SSLPeerUnverifiedException e ) {
445                 // Perfectly valid when client-auth is 'want', a problem when it is 'need'.
446                 Log.debug( "Peer certificates have not been verified - there are no certificates to return for: {}", tlsStreamHandler.getSSLSession().getPeerHost(), e );
447             }
448         }
449         return new Certificate[0];
450     }
451 
452     @Override
setUsingSelfSignedCertificate(boolean isSelfSigned)453     public void setUsingSelfSignedCertificate(boolean isSelfSigned) {
454         this.usingSelfSignedCertificate = isSelfSigned;
455     }
456 
457     @Override
isUsingSelfSignedCertificate()458     public boolean isUsingSelfSignedCertificate() {
459         return usingSelfSignedCertificate;
460     }
461 
462     @Override
463     @Nullable
getPacketDeliverer()464     public PacketDeliverer getPacketDeliverer() {
465         return backupDeliverer;
466     }
467 
468     /**
469      * Closes the connection without sending any data (not even a stream end-tag).
470      */
forceClose()471     public void forceClose() {
472         close( true );
473     }
474 
475     /**
476      * Closes the connection after trying to send a stream end tag.
477      */
478     @Override
close()479     public void close() {
480         close( false );
481     }
482 
483     /**
484      * Normal connection close will attempt to write the stream end tag. Otherwise this method
485      * forces the connection closed immediately. This method will be called from {@link SocketSendingTracker}
486      * when sending data over the socket has taken a long time and we need to close the socket, discard
487      * the connection and its session.
488      */
close(boolean force)489     private void close(boolean force) {
490         if (state.compareAndSet(State.OPEN, State.CLOSED)) {
491 
492             if (session != null) {
493                 session.setStatus(Session.STATUS_CLOSED);
494             }
495 
496             if (!force) {
497                 boolean allowedToWrite = false;
498                 try {
499                     requestWriting();
500                     allowedToWrite = true;
501                     // Register that we started sending data on the connection
502                     writeStarted();
503                     writer.write("</stream:stream>");
504                     writer.flush();
505                 }
506                 catch (Exception e) {
507                     Log.debug("Failed to deliver stream close tag: " + e.getMessage());
508                 }
509 
510                 // Register that we finished sending data on the connection
511                 writeFinished();
512                 if (allowedToWrite) {
513                     releaseWriting();
514                 }
515             }
516 
517             closeConnection();
518             notifyCloseListeners();
519             listeners.clear();
520         }
521     }
522 
523     @Override
systemShutdown()524     public void systemShutdown() {
525         deliverRawText("<stream:error><system-shutdown " +
526                 "xmlns='urn:ietf:params:xml:ns:xmpp-streams'/></stream:error>");
527         close();
528     }
529 
writeStarted()530     void writeStarted() {
531         writeStarted = System.currentTimeMillis();
532     }
533 
writeFinished()534     void writeFinished() {
535         writeStarted = -1;
536     }
537 
538     /**
539      * Returns true if the socket was closed due to a bad health. The socket is considered to
540      * be in a bad state if a thread has been writing for a while and the write operation has
541      * not finished in a long time or when the client has not sent a heartbeat for a long time.
542      * In any of both cases the socket will be closed.
543      *
544      * @return true if the socket was closed due to a bad health.s
545      */
checkHealth()546     boolean checkHealth() {
547         // Check that the sending operation is still active
548         long writeTimestamp = writeStarted;
549         if (writeTimestamp > -1 && System.currentTimeMillis() - writeTimestamp >
550                 JiveGlobals.getIntProperty("xmpp.session.sending-limit", 60000)) {
551             // Close the socket
552             if (Log.isDebugEnabled()) {
553                 Log.debug("Closing connection: " + this + " that started sending data at: " +
554                         new Date(writeTimestamp));
555             }
556             forceClose();
557             return true;
558         }
559         else {
560             // Check if the connection has been idle. A connection is considered idle if the client
561             // has not been receiving data for a period. Sending data to the client is not
562             // considered as activity.
563             if (idleTimeout > -1 && socketReader != null &&
564                     System.currentTimeMillis() - socketReader.getLastActive() > idleTimeout) {
565                 // Close the socket
566                 if (Log.isDebugEnabled()) {
567                     Log.debug("Closing connection that has been idle: " + this);
568                 }
569                 forceClose();
570                 return true;
571             }
572         }
573         return false;
574     }
575 
release()576     private void release() {
577         writeStarted = -1;
578         instances.remove(this);
579     }
580 
closeConnection()581     private void closeConnection() {
582         release();
583         try {
584             if (tlsStreamHandler == null) {
585                 socket.close();
586             }
587             else {
588                 // Close the channels since we are using TLS (i.e. NIO). If the channels implement
589                 // the InterruptibleChannel interface then any other thread that was blocked in
590                 // an I/O operation will be interrupted and an exception thrown
591                 tlsStreamHandler.close();
592             }
593         }
594         catch (Exception e) {
595             Log.error(LocaleUtils.getLocalizedString("admin.error.close")
596                     + "\n" + this.toString(), e);
597         }
598     }
599 
600     @Override
deliver(Packet packet)601     public void deliver(Packet packet) throws UnauthorizedException, PacketException {
602         if (isClosed()) {
603             if (backupDeliverer != null) {
604                 backupDeliverer.deliver(packet);
605             } else {
606                 Log.trace("Discarding packet that was due to be delivered on closed connection {}, for which no backup deliverer was configured.", this);
607             }
608         }
609         else {
610             boolean errorDelivering = false;
611             boolean allowedToWrite = false;
612             try {
613                 requestWriting();
614                 allowedToWrite = true;
615                 xmlSerializer.write(packet.getElement());
616                 xmlSerializer.flush();
617             }
618             catch (Exception e) {
619                 Log.debug("Error delivering packet" + "\n" + this.toString(), e);
620                 errorDelivering = true;
621             }
622             finally {
623                 if (allowedToWrite) {
624                     releaseWriting();
625                 }
626             }
627             if (errorDelivering) {
628                 close();
629                 // Retry sending the packet again through the backup deliverer.
630                 if (backupDeliverer != null) {
631                     backupDeliverer.deliver(packet);
632                 } else {
633                     Log.trace("Discarding packet that failed to be delivered to connection {}, for which no backup deliverer was configured.", this);
634                 }
635             }
636             else {
637                 session.incrementServerPacketCount();
638             }
639         }
640     }
641 
642     @Override
deliverRawText(String text)643     public void deliverRawText(String text) {
644         if (!isClosed()) {
645             boolean errorDelivering = false;
646             boolean allowedToWrite = false;
647             try {
648                 requestWriting();
649                 allowedToWrite = true;
650                 // Register that we started sending data on the connection
651                 writeStarted();
652                 writer.write(text);
653                 writer.flush();
654             }
655             catch (Exception e) {
656                 Log.debug("Error delivering raw text" + "\n" + this.toString(), e);
657                 errorDelivering = true;
658             }
659             finally {
660                 // Register that we finished sending data on the connection
661                 writeFinished();
662                 if (allowedToWrite) {
663                     releaseWriting();
664                 }
665             }
666             if (errorDelivering) {
667                 close();
668             }
669         }
670     }
671 
672     /**
673      * Notifies all close listeners that the connection has been closed.
674      * Used by subclasses to properly finish closing the connection.
675      */
notifyCloseListeners()676     private void notifyCloseListeners() {
677         synchronized (listeners) {
678             for (ConnectionCloseListener listener : listeners.keySet()) {
679                 try {
680                     listener.onConnectionClose(listeners.get(listener));
681                 }
682                 catch (Exception e) {
683                     Log.error("Error notifying listener: " + listener, e);
684                 }
685             }
686         }
687     }
688 
requestWriting()689     private void requestWriting() throws Exception {
690         for (;;) {
691             if (writing.compareAndSet(false, true)) {
692                 // We are now in writing mode and only we can write to the socket
693                 return;
694             }
695             else {
696                 // Check health of the socket
697                 if (checkHealth()) {
698                     // Connection was closed then stop
699                     throw new Exception("Probable dead connection was closed");
700                 }
701                 else {
702                     Thread.sleep(1);
703                 }
704             }
705         }
706     }
707 
releaseWriting()708     private void releaseWriting() {
709         writing.compareAndSet(true, false);
710     }
711 
712     @Override
toString()713     public String toString() {
714         return super.toString() + " socket: " + socket + " session: " + session;
715     }
716 
setSocketReader(SocketReader socketReader)717     public void setSocketReader(SocketReader socketReader) {
718         this.socketReader = socketReader;
719     }
720 }
721