1 /* 2 * Copyright (C) 2005-2008 Jive Software. All rights reserved. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package org.jivesoftware.openfire.net; 18 19 import java.io.BufferedWriter; 20 import java.io.IOException; 21 import java.io.OutputStreamWriter; 22 import java.io.Writer; 23 import java.net.Socket; 24 import java.net.UnknownHostException; 25 import java.nio.channels.Channels; 26 import java.nio.charset.StandardCharsets; 27 import java.security.cert.Certificate; 28 import java.util.Collection; 29 import java.util.Date; 30 import java.util.HashMap; 31 import java.util.Map; 32 import java.util.concurrent.ConcurrentHashMap; 33 import java.util.concurrent.atomic.AtomicBoolean; 34 import java.util.concurrent.atomic.AtomicReference; 35 36 import javax.annotation.Nullable; 37 import javax.net.ssl.SSLPeerUnverifiedException; 38 39 import org.jivesoftware.openfire.*; 40 import org.jivesoftware.openfire.auth.UnauthorizedException; 41 import org.jivesoftware.openfire.session.IncomingServerSession; 42 import org.jivesoftware.openfire.session.LocalSession; 43 import org.jivesoftware.openfire.session.Session; 44 import org.jivesoftware.openfire.spi.ConnectionConfiguration; 45 import org.jivesoftware.openfire.spi.ConnectionManagerImpl; 46 import org.jivesoftware.openfire.spi.ConnectionType; 47 import org.jivesoftware.util.JiveGlobals; 48 import org.jivesoftware.util.LocaleUtils; 49 import org.slf4j.Logger; 50 import org.slf4j.LoggerFactory; 51 import org.xmpp.packet.Packet; 52 53 import com.jcraft.jzlib.JZlib; 54 import com.jcraft.jzlib.ZOutputStream; 55 56 /** 57 * An object to track the state of a XMPP client-server session. 58 * Currently this class contains the socket channel connecting the 59 * client and server. 60 * 61 * @author Iain Shigeoka 62 * @deprecated Old, pre NIO / MINA code. Should not be used as NIO offers better performance. Currently only in use for s2s. 63 */ 64 public class SocketConnection implements Connection { 65 66 private static final Logger Log = LoggerFactory.getLogger(SocketConnection.class); 67 68 private static Map<SocketConnection, String> instances = 69 new ConcurrentHashMap<>(); 70 71 /** 72 * Milliseconds a connection has to be idle to be closed. Timeout is disabled by default. It's 73 * up to the connection's owner to configure the timeout value. Sending stanzas to the client 74 * is not considered as activity. We are only considering the connection active when the 75 * client sends some data or hearbeats (i.e. whitespaces) to the server. 76 * The reason for this is that sending data will fail if the connection is closed. And if 77 * the thread is blocked while sending data (because the socket is closed) then the clean up 78 * thread will close the socket anyway. 79 */ 80 private long idleTimeout = -1; 81 82 final private Map<ConnectionCloseListener, Object> listeners = 83 new HashMap<>(); 84 85 private Socket socket; 86 private SocketReader socketReader; 87 88 private Writer writer; 89 private AtomicBoolean writing = new AtomicBoolean(false); 90 private AtomicReference<State> state = new AtomicReference<State>(State.OPEN); 91 92 /** 93 * Deliverer to use when the connection is closed or was closed when delivering 94 * a packet. 95 */ 96 private PacketDeliverer backupDeliverer; 97 98 private LocalSession session; 99 private boolean secure; 100 private boolean compressed; 101 private org.jivesoftware.util.XMLWriter xmlSerializer; 102 private int majorVersion = 1; 103 private int minorVersion = 0; 104 private String language = null; 105 private TLSStreamHandler tlsStreamHandler; 106 107 private long writeStarted = -1; 108 109 /** 110 * TLS policy currently in use for this connection. 111 */ 112 private TLSPolicy tlsPolicy = TLSPolicy.optional; 113 private boolean usingSelfSignedCertificate; 114 115 /** 116 * Compression policy currently in use for this connection. 117 */ 118 private CompressionPolicy compressionPolicy = CompressionPolicy.disabled; 119 getInstances()120 public static Collection<SocketConnection> getInstances() { 121 return instances.keySet(); 122 } 123 124 /** 125 * Create a new session using the supplied socket. 126 * 127 * @param backupDeliverer the packet deliverer this connection will use when socket is closed. 128 * @param socket the socket to represent. 129 * @param isSecure true if this is a secure connection. 130 * @throws java.io.IOException if there was a socket error while sending the packet. 131 */ SocketConnection(PacketDeliverer backupDeliverer, Socket socket, boolean isSecure)132 public SocketConnection(PacketDeliverer backupDeliverer, Socket socket, boolean isSecure) 133 throws IOException { 134 if (socket == null) { 135 throw new NullPointerException("Socket channel must be non-null"); 136 } 137 138 this.secure = isSecure; 139 this.socket = socket; 140 // DANIELE: Modify socket to use channel 141 if (socket.getChannel() != null) { 142 writer = Channels.newWriter( 143 ServerTrafficCounter.wrapWritableChannel(socket.getChannel()), StandardCharsets.UTF_8.newEncoder(), -1); 144 } 145 else { 146 writer = new BufferedWriter(new OutputStreamWriter( 147 ServerTrafficCounter.wrapOutputStream(socket.getOutputStream()), StandardCharsets.UTF_8)); 148 } 149 this.backupDeliverer = backupDeliverer; 150 xmlSerializer = new XMLSocketWriter(writer, this); 151 152 instances.put(this, ""); 153 154 // Default this sensibly. 155 this.tlsPolicy = this.getConfiguration().getTlsPolicy(); 156 } 157 158 /** 159 * Returns the stream handler responsible for securing the plain connection and providing 160 * the corresponding input and output streams. 161 * 162 * @return the stream handler responsible for securing the plain connection and providing 163 * the corresponding input and output streams. 164 */ getTLSStreamHandler()165 public TLSStreamHandler getTLSStreamHandler() { 166 return tlsStreamHandler; 167 } 168 startTLS(boolean clientMode, boolean directTLS)169 public void startTLS(boolean clientMode, boolean directTLS) throws IOException { 170 if (!secure) { 171 secure = true; 172 173 // Prepare for TLS 174 final ClientAuth clientAuth; 175 if (session instanceof IncomingServerSession) 176 { 177 clientAuth = ClientAuth.needed; 178 } 179 else 180 { 181 clientAuth = ClientAuth.wanted; 182 } 183 tlsStreamHandler = new TLSStreamHandler(socket, getConfiguration(), clientMode); 184 if (!clientMode && !directTLS) { 185 // Indicate the client that the server is ready to negotiate TLS 186 deliverRawText("<proceed xmlns=\"urn:ietf:params:xml:ns:xmpp-tls\"/>"); 187 } 188 // Start handshake 189 tlsStreamHandler.start(); 190 // Use new wrapped writers 191 writer = new BufferedWriter(new OutputStreamWriter(tlsStreamHandler.getOutputStream(), StandardCharsets.UTF_8)); 192 xmlSerializer = new XMLSocketWriter(writer, this); 193 } 194 } 195 196 @Override addCompression()197 public void addCompression() { 198 // WARNING: We do not support adding compression for incoming traffic but not for outgoing traffic. 199 } 200 201 @Override startCompression()202 public void startCompression() { 203 compressed = true; 204 205 try { 206 if (tlsStreamHandler == null) { 207 ZOutputStream out = new ZOutputStream( 208 ServerTrafficCounter.wrapOutputStream(socket.getOutputStream()), 209 JZlib.Z_BEST_COMPRESSION); 210 out.setFlushMode(JZlib.Z_PARTIAL_FLUSH); 211 writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8)); 212 xmlSerializer = new XMLSocketWriter(writer, this); 213 } 214 else { 215 ZOutputStream out = new ZOutputStream(tlsStreamHandler.getOutputStream(), JZlib.Z_BEST_COMPRESSION); 216 out.setFlushMode(JZlib.Z_PARTIAL_FLUSH); 217 writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8)); 218 xmlSerializer = new XMLSocketWriter(writer, this); 219 } 220 } catch (IOException e) { 221 // TODO Would be nice to still be able to throw the exception and not catch it here 222 Log.error("Error while starting compression", e); 223 compressed = false; 224 } 225 } 226 227 @Override getConfiguration()228 public ConnectionConfiguration getConfiguration() 229 { 230 // This is an ugly hack to get backwards compatibility with the pre-MINA era. As this implementation is being 231 // removed (it is marked as deprecated - at the time of writing, it is only used for S2S). The ugly hack: assume 232 // S2S: 233 final ConnectionManagerImpl connectionManager = ((ConnectionManagerImpl) XMPPServer.getInstance().getConnectionManager()); 234 return connectionManager.getListener( ConnectionType.SOCKET_S2S, false ).generateConnectionConfiguration(); 235 } 236 validate()237 public boolean validate() { 238 if (isClosed()) { 239 return false; 240 } 241 boolean allowedToWrite = false; 242 try { 243 requestWriting(); 244 allowedToWrite = true; 245 // Register that we started sending data on the connection 246 writeStarted(); 247 writer.write(" "); 248 writer.flush(); 249 } 250 catch (Exception e) { 251 Log.warn("Closing no longer valid connection" + "\n" + this.toString(), e); 252 close(); 253 } 254 finally { 255 // Register that we finished sending data on the connection 256 writeFinished(); 257 if (allowedToWrite) { 258 releaseWriting(); 259 } 260 } 261 return !isClosed(); 262 } 263 264 @Override init(LocalSession owner)265 public void init(LocalSession owner) { 266 session = owner; 267 } 268 269 @Override reinit(LocalSession owner)270 public void reinit(LocalSession owner) { 271 session = owner; 272 273 // ConnectionCloseListeners are registered with their session instance as a callback object. When re-initializing, 274 // this object needs to be replaced with the new session instance (or otherwise, the old session will be used 275 // during the callback. OF-2014 276 for ( final Map.Entry<ConnectionCloseListener, Object> entry : listeners.entrySet() ) 277 { 278 if ( entry.getValue() instanceof LocalSession ) { 279 entry.setValue( owner ); 280 } 281 } 282 } 283 284 @Override registerCloseListener(ConnectionCloseListener listener, Object handbackMessage)285 public void registerCloseListener(ConnectionCloseListener listener, Object handbackMessage) { 286 if (isClosed()) { 287 listener.onConnectionClose(handbackMessage); 288 } 289 else { 290 listeners.put(listener, handbackMessage); 291 } 292 } 293 294 @Override removeCloseListener(ConnectionCloseListener listener)295 public void removeCloseListener(ConnectionCloseListener listener) { 296 listeners.remove(listener); 297 } 298 299 @Override getAddress()300 public byte[] getAddress() throws UnknownHostException { 301 return socket.getInetAddress().getAddress(); 302 } 303 304 @Override getHostAddress()305 public String getHostAddress() throws UnknownHostException { 306 return socket.getInetAddress().getHostAddress(); 307 } 308 309 @Override getHostName()310 public String getHostName() throws UnknownHostException { 311 return socket.getInetAddress().getHostName(); 312 } 313 314 /** 315 * Returns the port that the connection uses. 316 * 317 * @return the port that the connection uses. 318 */ getPort()319 public int getPort() { 320 return socket.getPort(); 321 } 322 323 /** 324 * Returns the Writer used to send data to the connection. The writer should be 325 * used with caution. In the majority of cases, the {@link #deliver(Packet)} 326 * method should be used to send data instead of using the writer directly. 327 * You must synchronize on the writer before writing data to it to ensure 328 * data consistency: 329 * 330 * <pre> 331 * Writer writer = connection.getWriter(); 332 * synchronized(writer) { 333 * // write data.... 334 * }</pre> 335 * 336 * @return the Writer for this connection. 337 */ getWriter()338 public Writer getWriter() { 339 return writer; 340 } 341 342 @Override isClosed()343 public boolean isClosed() { 344 return state.get() == State.CLOSED; 345 } 346 347 @Override isSecure()348 public boolean isSecure() { 349 return secure; 350 } 351 352 @Override isCompressed()353 public boolean isCompressed() { 354 return compressed; 355 } 356 357 @Override getTlsPolicy()358 public TLSPolicy getTlsPolicy() { 359 return tlsPolicy; 360 } 361 362 /** 363 * Sets whether TLS is mandatory, optional or is disabled. When TLS is mandatory clients 364 * are required to secure their connections or otherwise their connections will be closed. 365 * On the other hand, when TLS is disabled clients are not allowed to secure their connections 366 * using TLS. Their connections will be closed if they try to secure the connection. in this 367 * last case. 368 * 369 * @param tlsPolicy whether TLS is mandatory, optional or is disabled. 370 */ 371 @Override setTlsPolicy(TLSPolicy tlsPolicy)372 public void setTlsPolicy(TLSPolicy tlsPolicy) { 373 this.tlsPolicy = tlsPolicy; 374 } 375 376 @Override getCompressionPolicy()377 public CompressionPolicy getCompressionPolicy() { 378 return compressionPolicy; 379 } 380 381 /** 382 * Sets whether compression is enabled or is disabled. 383 * 384 * @param compressionPolicy whether Compression is enabled or is disabled. 385 */ 386 @Override setCompressionPolicy(CompressionPolicy compressionPolicy)387 public void setCompressionPolicy(CompressionPolicy compressionPolicy) { 388 this.compressionPolicy = compressionPolicy; 389 } 390 getIdleTimeout()391 public long getIdleTimeout() { 392 return idleTimeout; 393 } 394 395 /** 396 * Sets the number of milliseconds a connection has to be idle to be closed. Sending 397 * stanzas to the client is not considered as activity. We are only considering the 398 * connection active when the client sends some data or hearbeats (i.e. whitespaces) 399 * to the server. 400 * 401 * @param timeout the number of milliseconds a connection has to be idle to be closed. 402 */ setIdleTimeout(long timeout)403 public void setIdleTimeout(long timeout) { 404 this.idleTimeout = timeout; 405 } 406 407 @Override getMajorXMPPVersion()408 public int getMajorXMPPVersion() { 409 return majorVersion; 410 } 411 412 @Override getMinorXMPPVersion()413 public int getMinorXMPPVersion() { 414 return minorVersion; 415 } 416 417 /** 418 * Sets the XMPP version information. In most cases, the version should be "1.0". 419 * However, older clients using the "Jabber" protocol do not set a version. In that 420 * case, the version is "0.0". 421 * 422 * @param majorVersion the major version. 423 * @param minorVersion the minor version. 424 */ 425 @Override setXMPPVersion(int majorVersion, int minorVersion)426 public void setXMPPVersion(int majorVersion, int minorVersion) { 427 this.majorVersion = majorVersion; 428 this.minorVersion = minorVersion; 429 } 430 431 @Override getLocalCertificates()432 public Certificate[] getLocalCertificates() { 433 if (tlsStreamHandler != null) { 434 return tlsStreamHandler.getSSLSession().getLocalCertificates(); 435 } 436 return new Certificate[0]; 437 } 438 439 @Override getPeerCertificates()440 public Certificate[] getPeerCertificates() { 441 if (tlsStreamHandler != null) { 442 try { 443 return tlsStreamHandler.getSSLSession().getPeerCertificates(); 444 } catch (SSLPeerUnverifiedException e ) { 445 // Perfectly valid when client-auth is 'want', a problem when it is 'need'. 446 Log.debug( "Peer certificates have not been verified - there are no certificates to return for: {}", tlsStreamHandler.getSSLSession().getPeerHost(), e ); 447 } 448 } 449 return new Certificate[0]; 450 } 451 452 @Override setUsingSelfSignedCertificate(boolean isSelfSigned)453 public void setUsingSelfSignedCertificate(boolean isSelfSigned) { 454 this.usingSelfSignedCertificate = isSelfSigned; 455 } 456 457 @Override isUsingSelfSignedCertificate()458 public boolean isUsingSelfSignedCertificate() { 459 return usingSelfSignedCertificate; 460 } 461 462 @Override 463 @Nullable getPacketDeliverer()464 public PacketDeliverer getPacketDeliverer() { 465 return backupDeliverer; 466 } 467 468 /** 469 * Closes the connection without sending any data (not even a stream end-tag). 470 */ forceClose()471 public void forceClose() { 472 close( true ); 473 } 474 475 /** 476 * Closes the connection after trying to send a stream end tag. 477 */ 478 @Override close()479 public void close() { 480 close( false ); 481 } 482 483 /** 484 * Normal connection close will attempt to write the stream end tag. Otherwise this method 485 * forces the connection closed immediately. This method will be called from {@link SocketSendingTracker} 486 * when sending data over the socket has taken a long time and we need to close the socket, discard 487 * the connection and its session. 488 */ close(boolean force)489 private void close(boolean force) { 490 if (state.compareAndSet(State.OPEN, State.CLOSED)) { 491 492 if (session != null) { 493 session.setStatus(Session.STATUS_CLOSED); 494 } 495 496 if (!force) { 497 boolean allowedToWrite = false; 498 try { 499 requestWriting(); 500 allowedToWrite = true; 501 // Register that we started sending data on the connection 502 writeStarted(); 503 writer.write("</stream:stream>"); 504 writer.flush(); 505 } 506 catch (Exception e) { 507 Log.debug("Failed to deliver stream close tag: " + e.getMessage()); 508 } 509 510 // Register that we finished sending data on the connection 511 writeFinished(); 512 if (allowedToWrite) { 513 releaseWriting(); 514 } 515 } 516 517 closeConnection(); 518 notifyCloseListeners(); 519 listeners.clear(); 520 } 521 } 522 523 @Override systemShutdown()524 public void systemShutdown() { 525 deliverRawText("<stream:error><system-shutdown " + 526 "xmlns='urn:ietf:params:xml:ns:xmpp-streams'/></stream:error>"); 527 close(); 528 } 529 writeStarted()530 void writeStarted() { 531 writeStarted = System.currentTimeMillis(); 532 } 533 writeFinished()534 void writeFinished() { 535 writeStarted = -1; 536 } 537 538 /** 539 * Returns true if the socket was closed due to a bad health. The socket is considered to 540 * be in a bad state if a thread has been writing for a while and the write operation has 541 * not finished in a long time or when the client has not sent a heartbeat for a long time. 542 * In any of both cases the socket will be closed. 543 * 544 * @return true if the socket was closed due to a bad health.s 545 */ checkHealth()546 boolean checkHealth() { 547 // Check that the sending operation is still active 548 long writeTimestamp = writeStarted; 549 if (writeTimestamp > -1 && System.currentTimeMillis() - writeTimestamp > 550 JiveGlobals.getIntProperty("xmpp.session.sending-limit", 60000)) { 551 // Close the socket 552 if (Log.isDebugEnabled()) { 553 Log.debug("Closing connection: " + this + " that started sending data at: " + 554 new Date(writeTimestamp)); 555 } 556 forceClose(); 557 return true; 558 } 559 else { 560 // Check if the connection has been idle. A connection is considered idle if the client 561 // has not been receiving data for a period. Sending data to the client is not 562 // considered as activity. 563 if (idleTimeout > -1 && socketReader != null && 564 System.currentTimeMillis() - socketReader.getLastActive() > idleTimeout) { 565 // Close the socket 566 if (Log.isDebugEnabled()) { 567 Log.debug("Closing connection that has been idle: " + this); 568 } 569 forceClose(); 570 return true; 571 } 572 } 573 return false; 574 } 575 release()576 private void release() { 577 writeStarted = -1; 578 instances.remove(this); 579 } 580 closeConnection()581 private void closeConnection() { 582 release(); 583 try { 584 if (tlsStreamHandler == null) { 585 socket.close(); 586 } 587 else { 588 // Close the channels since we are using TLS (i.e. NIO). If the channels implement 589 // the InterruptibleChannel interface then any other thread that was blocked in 590 // an I/O operation will be interrupted and an exception thrown 591 tlsStreamHandler.close(); 592 } 593 } 594 catch (Exception e) { 595 Log.error(LocaleUtils.getLocalizedString("admin.error.close") 596 + "\n" + this.toString(), e); 597 } 598 } 599 600 @Override deliver(Packet packet)601 public void deliver(Packet packet) throws UnauthorizedException, PacketException { 602 if (isClosed()) { 603 if (backupDeliverer != null) { 604 backupDeliverer.deliver(packet); 605 } else { 606 Log.trace("Discarding packet that was due to be delivered on closed connection {}, for which no backup deliverer was configured.", this); 607 } 608 } 609 else { 610 boolean errorDelivering = false; 611 boolean allowedToWrite = false; 612 try { 613 requestWriting(); 614 allowedToWrite = true; 615 xmlSerializer.write(packet.getElement()); 616 xmlSerializer.flush(); 617 } 618 catch (Exception e) { 619 Log.debug("Error delivering packet" + "\n" + this.toString(), e); 620 errorDelivering = true; 621 } 622 finally { 623 if (allowedToWrite) { 624 releaseWriting(); 625 } 626 } 627 if (errorDelivering) { 628 close(); 629 // Retry sending the packet again through the backup deliverer. 630 if (backupDeliverer != null) { 631 backupDeliverer.deliver(packet); 632 } else { 633 Log.trace("Discarding packet that failed to be delivered to connection {}, for which no backup deliverer was configured.", this); 634 } 635 } 636 else { 637 session.incrementServerPacketCount(); 638 } 639 } 640 } 641 642 @Override deliverRawText(String text)643 public void deliverRawText(String text) { 644 if (!isClosed()) { 645 boolean errorDelivering = false; 646 boolean allowedToWrite = false; 647 try { 648 requestWriting(); 649 allowedToWrite = true; 650 // Register that we started sending data on the connection 651 writeStarted(); 652 writer.write(text); 653 writer.flush(); 654 } 655 catch (Exception e) { 656 Log.debug("Error delivering raw text" + "\n" + this.toString(), e); 657 errorDelivering = true; 658 } 659 finally { 660 // Register that we finished sending data on the connection 661 writeFinished(); 662 if (allowedToWrite) { 663 releaseWriting(); 664 } 665 } 666 if (errorDelivering) { 667 close(); 668 } 669 } 670 } 671 672 /** 673 * Notifies all close listeners that the connection has been closed. 674 * Used by subclasses to properly finish closing the connection. 675 */ notifyCloseListeners()676 private void notifyCloseListeners() { 677 synchronized (listeners) { 678 for (ConnectionCloseListener listener : listeners.keySet()) { 679 try { 680 listener.onConnectionClose(listeners.get(listener)); 681 } 682 catch (Exception e) { 683 Log.error("Error notifying listener: " + listener, e); 684 } 685 } 686 } 687 } 688 requestWriting()689 private void requestWriting() throws Exception { 690 for (;;) { 691 if (writing.compareAndSet(false, true)) { 692 // We are now in writing mode and only we can write to the socket 693 return; 694 } 695 else { 696 // Check health of the socket 697 if (checkHealth()) { 698 // Connection was closed then stop 699 throw new Exception("Probable dead connection was closed"); 700 } 701 else { 702 Thread.sleep(1); 703 } 704 } 705 } 706 } 707 releaseWriting()708 private void releaseWriting() { 709 writing.compareAndSet(true, false); 710 } 711 712 @Override toString()713 public String toString() { 714 return super.toString() + " socket: " + socket + " session: " + session; 715 } 716 setSocketReader(SocketReader socketReader)717 public void setSocketReader(SocketReader socketReader) { 718 this.socketReader = socketReader; 719 } 720 } 721