1 /* 2 * Copyright (C) 2020-2021 Ignite Realtime Foundation. All rights reserved. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 package org.jivesoftware.openfire.muc.spi; 17 18 import org.dom4j.Element; 19 import org.dom4j.QName; 20 import org.jivesoftware.openfire.XMPPServer; 21 import org.jivesoftware.openfire.muc.*; 22 import org.jivesoftware.util.SystemProperty; 23 import org.jivesoftware.util.XMPPDateTimeFormat; 24 import org.slf4j.Logger; 25 import org.slf4j.LoggerFactory; 26 import org.xmpp.packet.*; 27 28 import javax.annotation.Nonnull; 29 import javax.annotation.Nullable; 30 import java.io.Serializable; 31 import java.text.ParseException; 32 import java.util.*; 33 import java.util.concurrent.*; 34 import java.util.concurrent.locks.Lock; 35 36 // TODO: monitor health of s2s connection, somehow? 
37 public class FMUCHandler 38 { 39 private static final Logger Log = LoggerFactory.getLogger( FMUCHandler.class ); 40 41 public static final SystemProperty<Boolean> FMUC_ENABLED = SystemProperty.Builder.ofType(Boolean.class) 42 .setKey("xmpp.muc.room.fmuc.enabled") 43 .setDynamic(true) 44 .setDefaultValue(false) 45 .addListener( isEnabled -> { 46 XMPPServer.getInstance().getMultiUserChatManager().getMultiUserChatServices().forEach( 47 service -> service.getAllRoomNames().forEach( 48 name -> { 49 final Lock lock = service.getChatRoomLock(name); 50 lock.lock(); 51 try { 52 final MUCRoom room = service.getChatRoom(name); 53 room.getFmucHandler().applyConfigurationChanges(); 54 55 // Ensure that other cluster nodes see the changes applied above. 56 service.syncChatRoom(room); 57 } finally { 58 lock.unlock(); 59 } 60 } 61 ) 62 ); 63 }) 64 .build(); 65 66 /** 67 * Qualified name of the element that denotes FMUC functionality, as specified by XEP-0289. 68 */ 69 public static final QName FMUC = QName.get("fmuc", "http://isode.com/protocol/fmuc"); 70 71 /** 72 * The room for which this handler operates. 73 */ 74 private final MUCRoom room; 75 76 /** 77 * Indicates if FMUC functionality has been started. 78 */ 79 private boolean isStarted; 80 81 /** 82 * State that indicates if the local room has been configured to actively start joining a remote room. 83 */ 84 private OutboundJoinConfiguration outboundJoinConfiguration; 85 86 /** 87 * Tracks state while the outbound join is being set up. 88 */ 89 private OutboundJoinProgress outboundJoinProgress; 90 91 /** 92 * The address of the remote rooms that the local room has connected to (in which the remote room has taken the 93 * role of 'joined FMUC node' while the local room has taken the role of the 'joining FMUC node'). 
94 */ 95 private OutboundJoin outboundJoin; 96 97 /** 98 * The addresses of the remote rooms that have connected to the local room (in which the remote rooms have taken the 99 * role of 'joining FMUC nodes' while the local room has taken the role of the 'joined FMUC node'). 100 */ 101 private Map<JID, InboundJoin> inboundJoins = new HashMap<>(); 102 FMUCHandler(@onnull MUCRoom chatroom)103 public FMUCHandler(@Nonnull MUCRoom chatroom) { 104 this.room = chatroom; 105 } 106 107 /** 108 * Starts federation, which will cause a federation attempt with the outbound ('joined') node, if one is configured, 109 * and there currently are occupants in the room. 110 */ startOutbound()111 public synchronized void startOutbound() 112 { 113 Log.debug( "(room: '{}'): FMUC outbound federation is being started...", room.getJID() ); 114 115 final Collection<MUCRole> occupants = room.getOccupants(); 116 if ( occupants.isEmpty() ) { 117 Log.trace("(room: '{}'): No occupants in the room. No need to initiate an FMUC join.", room.getJID()); 118 } else { 119 Log.trace("(room: '{}'): {} occupant(s) in the room. Initiating an FMUC join for each of them.", room.getJID(), occupants.size()); 120 for ( final MUCRole occupant : occupants ) { 121 try { 122 Log.trace("(room: '{}'): Making occupant '{}' join the FMUC room.", room.getJID(), occupant.getUserAddress()); 123 join( occupant, false, true ); 124 } catch ( Exception e ) { 125 Log.trace("(room: '{}'): An exception occurred while making occupant '{}' join the FMUC room.", room.getJID(), occupant.getUserAddress(), e); 126 } 127 } 128 } 129 Log.debug( "(room: '{}'): Finished starting FMUC outbound federation.", room.getJID() ); 130 } 131 132 /** 133 * Stops federation, which will cause any joined and joining nodes to be disconnected. 
134 */ stop()135 public synchronized void stop() 136 { 137 Log.debug( "(room: '{}'): FMUC federation is being stopped...", room.getJID() ); 138 139 // Keep track of all occupants that are becoming unavailable due to them having joined on remote nodes. 140 final Set<JID> removedRemoteOccupants = new HashSet<>(); 141 142 final Set<JID> removedOccupantsInbound = doStopInbound(); 143 final Set<JID> removedOccupantsOutbound = doStopOutbound(); 144 removedRemoteOccupants.addAll( removedOccupantsInbound ); 145 removedRemoteOccupants.addAll( removedOccupantsOutbound ); 146 147 Log.trace( "(room: '{}'): Done disconnecting inbound and outbound nodes from the node set. Now removing all their ({}) occupants from the room.", room.getJID(), removedRemoteOccupants.size() ); 148 makeRemoteOccupantLeaveRoom( removedRemoteOccupants ); 149 150 isStarted = false; 151 Log.debug( "(room: '{}'): Finished stopping FMUC federation.", room.getJID() ); 152 } 153 154 /** 155 * Stops inbound federation, which will cause existing federation with all of the inbound ('joining') nodes, if any 156 * are established, to be teared down. 157 */ stopInbound()158 public synchronized void stopInbound() { 159 final Set<JID> removedOccupants = doStopInbound(); 160 161 Log.trace( "(room: '{}'): Removing all ({}) occupants from the room for remote inbound node(s) that we just disconnected from.", room.getJID(), removedOccupants.size() ); 162 makeRemoteOccupantLeaveRoom( removedOccupants ); 163 164 Log.debug( "(room: '{}'): Finished stopping inbound FMUC federation.", room.getJID() ); 165 } 166 167 /** 168 * Stops inbound federation, which will cause existing federation with one specific inbound ('joining') nodes to be 169 * teared down. 170 * 171 * @param peer the address of the remote node (must be a bare JID). 
172 */ stopInbound( @onnull JID peer )173 public synchronized void stopInbound( @Nonnull JID peer ) 174 { 175 if ( !peer.asBareJID().equals(peer) ) { 176 throw new IllegalArgumentException( "Expected argument 'peer' to be a bare JID, but it was not: " + peer ); 177 } 178 179 final Set<JID> removedOccupants = doStopInbound( peer ); 180 181 Log.trace( "(room: '{}'): Removing all ({}) occupants from the room for remote inbound node '{}' that we just disconnected from.", room.getJID(), removedOccupants.size(), peer ); 182 makeRemoteOccupantLeaveRoom( removedOccupants ); 183 184 Log.debug( "(room: '{}'): Finished stopping inbound FMUC federation.", room.getJID() ); 185 } 186 187 /** 188 * The workhorse implementation of stopping inbound federation, that returns a list of JIDs representing the 189 * occupants from the remote, joining nodes that are no longer in the room as a result of stopping the federation. For 190 * these occupants, a presence stanza should be shared with the remainder of the occupants. It is desirable to send 191 * these stanzas only after all any other nodes that are to be disconnected have been disconnected (to prevent 192 * sharing updates with other remote nodes that we might also be disconnecting from. This method does 193 * therefor not send these stanzas. Instead, it returns the addresses that should be send stanzas. This allows 194 * callers to aggregate addresses, and send the the stanzas in one iteration. 195 * 196 * The implementation in this method will inform the remote, joining node that they left the local, joined node by 197 * sending it a 'left' message. 198 * 199 * @return The JIDs of occupants on the remote, joining node that are no longer in the room. 200 */ doStopInbound()201 private synchronized Set<JID> doStopInbound() { 202 return doStopInbound( null ); 203 } 204 205 /** 206 * Identical to {@link #doStopInbound()} but used to disconnect just one node, instead of all of them. 
207 * 208 * When null is passed as an argument, all inbound nodes are disconnected (equivalent to a call to {@link #doStopInbound()}). 209 * 210 * @param peer the address of the remote node (must be a bare JID), or null to remove all nodes. 211 * @return The JIDs of occupants on the remote, joining node that are no longer in the room. 212 * @see #doStopInbound() 213 */ doStopInbound( @ullable JID peer )214 private synchronized Set<JID> doStopInbound( @Nullable JID peer ) { 215 if ( peer != null && !peer.asBareJID().equals(peer) ) { 216 throw new IllegalArgumentException( "Expected argument 'peer' to be null or a bare JID, but it was not: " + peer ); 217 } 218 219 final Set<JID> result = new HashSet<>(); 220 if ( inboundJoins.isEmpty() ) { 221 Log.trace( "(room: '{}'): No remote MUC joining us. No need to inform joining nodes that they have now left.", room.getJID() ); 222 } else { 223 final Iterator<Map.Entry<JID, InboundJoin>> iterator = inboundJoins.entrySet().iterator(); 224 while ( iterator.hasNext() ) 225 { 226 final InboundJoin inboundJoin = iterator.next().getValue(); 227 if ( peer != null && !inboundJoin.getPeer().equals( peer )) { 228 // This is not the peer you're looking for. 229 continue; 230 } 231 iterator.remove(); // Remove inboundJoin so that it's no longer send stanzas, and incoming stanzas are being treated as if from an unconnected entity. 
232 233 result.addAll( inboundJoin.occupants ); 234 235 try 236 { 237 Log.trace("(room: '{}'): Informing joining node '{}' that it is leaving the FMUC node set.", room.getJID(), inboundJoin.getPeer()); 238 final Presence left = new Presence(); 239 left.setFrom(room.getJID()); 240 left.setTo(inboundJoin.getPeer()); 241 left.getElement().addElement(FMUC).addElement("left"); 242 XMPPServer.getInstance().getPacketRouter().route(left); 243 } 244 catch ( Exception e ) 245 { 246 Log.warn("(room: '{}'): An exception occurred while informing joining node '{}' that it is leaving the FMUC node set.", room.getJID(), inboundJoin.getPeer(), e); 247 } 248 } 249 } 250 return result; 251 } 252 253 /** 254 * Stops outbound federation, which will cause existing federation with the outbound ('joined') node, if one is 255 * established, to be teared down. 256 */ stopOutbound()257 public synchronized void stopOutbound() 258 { 259 final Set<JID> removedOccupants = doStopOutbound(); 260 261 Log.trace("(room: '{}'): Removing all ({}) occupants from the room for remote outbound node that we just disconnected from.", room.getJID(), removedOccupants.size()); 262 makeRemoteOccupantLeaveRoom( removedOccupants ); 263 264 Log.debug( "(room: '{}'): Finished stopping outbound FMUC federation.", room.getJID() ); 265 } 266 267 /** 268 * The workhorse implementation of stopping outbound federation, that returns a list of JIDs representing the 269 * occupants from the remote, joined node that are no longer in the room as a result of stopping the federation. For 270 * these occupants, a presence stanza should be shared with the remainder of the occupants. It is desirable to send 271 * these stanzas only after all any other nodes that are to be disconnected have been disconnected (to prevent 272 * sharing updates with other remote nodes that we might also be disconnecting from. This method does 273 * therefor not send these stanzas. Instead, it returns the addresses that should be send stanzas. 
This allows 274 * callers to aggregate addresses, and send the the stanzas in one iteration. 275 * 276 * The implementation in this method will inform the remote, joined node that the local, joining node has left, by 277 * sending it presence stanzas for all occupants that the local, joining node has contributed to the FMUC set. After 278 * the remote, joined, node has received the last stanza, it should conclude that the local, joining node has left 279 * (it should respond with a 'left' message, although this implementation does not depend on that). 280 * 281 * @return The JIDs of occupants on the remote, joined node that are no longer in the room. 282 */ doStopOutbound()283 private synchronized Set<JID> doStopOutbound() 284 { 285 Log.debug("(room: '{}'): Stopping federation with remote node that we joined (if any).", room.getJID()); 286 final Set<JID> result = new HashSet<>(); 287 288 if ( outboundJoinProgress == null ) { 289 Log.trace("(room: '{}'): We are not in progress of joining a remote node. No need to abort such an effort.", room.getJID()); 290 } else { 291 Log.trace("(room: '{}'): Aborting the ongoing effort of joining remote node '{}'.", room.getJID(), outboundJoinProgress.getPeer()); 292 abortOutboundJoinProgress(); 293 } 294 295 if ( outboundJoin == null) { 296 Log.trace("(room: '{}'): We did not join a remote node. No need to inform one that we have left.", room.getJID()); 297 } else { 298 Log.trace("(room: '{}'): Informing joined node '{}' that we are leaving the FMUC node set.", room.getJID(), outboundJoin.getPeer()); 299 300 // Remove outboundJoin so that it's no longer send stanzas, and incoming stanzas are being treated as if from an unconnected entity. 301 // We'll need to store some state to be able to process things later though. 
302 final JID peer = outboundJoin.getPeer(); 303 final Set<PendingCallback> pendingEcho = outboundJoin.pendingEcho; 304 final Set<JID> theirOccupants = outboundJoin.occupants; 305 306 outboundJoin = null; 307 308 result.addAll( theirOccupants ); 309 310 // If we're waiting for an echo of a stanza that we've sent to this MUC, that now will no longer arrive. Make sure that we unblock all threads waiting for such an echo. 311 if ( !pendingEcho.isEmpty() ) 312 { 313 Log.trace("(room: '{}'): Completing {} callbacks that were waiting for an echo from peer '{}' that is being disconnected from.", room.getJID(), pendingEcho.size(), peer ); 314 for ( final PendingCallback pendingCallback : pendingEcho ) 315 { 316 try { 317 pendingCallback.complete(); // TODO maybe completeExceptionally? 318 } catch ( Exception e ) { 319 Log.warn("(room: '{}'): An exception occurred while completing callback pending echo from peer '{}' (that we're disconnecting from).", room.getJID(), peer, e); 320 } 321 } 322 } 323 324 // Find all the occupants that the local node contributed to the FMUC set (those are the occupants that are 325 // not joined through the remote, joined node). Note that these can include occupants that are on other nodes! 326 final Set<MUCRole> occupantsToLeave = new HashSet<>( room.getOccupants() ); 327 occupantsToLeave.removeIf( mucRole -> mucRole.getReportedFmucAddress() != null && theirOccupants.contains( mucRole.getReportedFmucAddress() )); 328 Log.trace("(room: '{}'): Identified {} occupants that the local node contributed to the FMUC set.", room.getJID(), occupantsToLeave.size()); 329 330 // Inform the remote, joined node that these are now all gone. 
331 for ( final MUCRole occupantToLeave : occupantsToLeave ) { 332 try 333 { 334 Log.trace("(room: '{}'): Informing joined node '{}' that occupant '{}' left the MUC.", room.getJID(), peer, occupantToLeave.getUserAddress()); 335 336 final Presence leave = occupantToLeave.getPresence().createCopy(); 337 leave.setType(Presence.Type.unavailable); 338 leave.setTo(new JID(peer.getNode(), peer.getDomain(), occupantToLeave.getNickname())); 339 leave.setFrom(occupantToLeave.getRoleAddress()); 340 341 // Change (or add) presence information about roles and affiliations 342 Element childElement = leave.getChildElement("x", "http://jabber.org/protocol/muc#user"); 343 if ( childElement == null ) 344 { 345 childElement = leave.addChildElement("x", "http://jabber.org/protocol/muc#user"); 346 } 347 Element item = childElement.element("item"); 348 if ( item == null ) 349 { 350 item = childElement.addElement("item"); 351 } 352 item.addAttribute("role", "none"); 353 354 final Presence enriched = enrichWithFMUCElement(leave, occupantToLeave.getReportedFmucAddress() != null ? occupantToLeave.getReportedFmucAddress() : occupantToLeave.getUserAddress()); 355 356 XMPPServer.getInstance().getPacketRouter().route(enriched); 357 } catch ( Exception e ) { 358 Log.warn("(room: '{}'): An exception occurred while informing joined node '{}' that occupant '{}' left the MUC.", room.getJID(), peer, occupantToLeave.getUserAddress(), e); 359 } 360 } 361 } 362 363 Log.trace("(room: '{}'): Finished stopping federation with remote node that we joined (if any).", room.getJID()); 364 return result; 365 } 366 367 /** 368 * Reads configuration from the room instance that is being services by this handler, and applies relevant changes. 
369 */ applyConfigurationChanges()370 public synchronized void applyConfigurationChanges() 371 { 372 if ( isStarted != (room.isFmucEnabled() && FMUC_ENABLED.getValue()) ) { 373 Log.debug( "(room: '{}'): Changing availability of FMUC functionality to {}.", room.getJID(), (room.isFmucEnabled() && FMUC_ENABLED.getValue()) ); 374 if ((room.isFmucEnabled() && FMUC_ENABLED.getValue())) { 375 startOutbound(); 376 } else { 377 stop(); 378 return; 379 } 380 } 381 382 final OutboundJoinConfiguration desiredConfig; 383 if ( room.getFmucOutboundNode() != null ) { 384 desiredConfig = new OutboundJoinConfiguration( room.getFmucOutboundNode(), room.getFmucOutboundMode() ); 385 } else { 386 desiredConfig = null; 387 } 388 Log.debug("(room: '{}'): Changing outbound join configuration. Existing: {}, New: {}", room.getJID(), this.outboundJoinConfiguration, desiredConfig ); 389 390 if ( this.outboundJoinProgress != null ) { 391 if (desiredConfig == null) { 392 Log.trace( "(room: '{}'): Had, but now no longer has, outbound join configuration. Aborting ongoing federation attempt...", room.getJID() ); 393 abortOutboundJoinProgress(); 394 } else if ( this.outboundJoinProgress.getPeer().equals( desiredConfig.getPeer() ) ) { 395 Log.trace( "(room: '{}'): New configuration matches peer that ongoing federation attempt is made with. Allowing attempt to continue.", room.getJID() ); 396 } else { 397 Log.trace( "(room: '{}'): New configuration targets a different peer that ongoing federation attempt is made with. Aborting attempt.", room.getJID() ); 398 abortOutboundJoinProgress(); 399 } 400 } 401 402 if ( this.outboundJoinConfiguration == null && desiredConfig != null ) { 403 Log.trace( "(room: '{}'): Did not, but now has, outbound join configuration. 
Starting federation...", room.getJID() ); 404 this.outboundJoinConfiguration = desiredConfig; 405 startOutbound(); 406 } else if ( this.outboundJoinConfiguration != null && desiredConfig == null ) { 407 Log.trace( "(room: '{}'): Had, but now no longer has, outbound join configuration. Stopping federation...", room.getJID() ); 408 this.outboundJoinConfiguration = desiredConfig; 409 stopOutbound(); 410 } else if ( this.outboundJoinConfiguration != null && desiredConfig != null ) { 411 if ( outboundJoin == null ) { 412 if ( this.outboundJoinConfiguration.equals(desiredConfig ) ) { 413 // no change 414 } else { 415 Log.trace( "(room: '{}'): Applying new configuration.", room.getJID() ); 416 this.outboundJoinConfiguration = desiredConfig; 417 startOutbound(); 418 } 419 } else { 420 if ( outboundJoin.getConfiguration().equals( desiredConfig ) ) { 421 Log.trace( "(room: '{}'): New configuration matches configuration of established federation. Not applying any change.", room.getJID() ); 422 } else { 423 Log.trace( "(room: '{}'): Already had outbound join configuration, now got a different config. Restarting federation...", room.getJID() ); 424 stopOutbound(); 425 this.outboundJoinConfiguration = desiredConfig; 426 startOutbound(); 427 } 428 } 429 } 430 isStarted = true; 431 } 432 433 /** 434 * Propagates a stanza to the FMUC set, if FMUC is active for this room. 435 * 436 * Note that when a master-slave mode is active, we need to wait for an echo back, before the message can be 437 * broadcasted locally. This method will return a CompletableFuture object that is completed as soon as processing 438 * can continue. This doesn't necessarily mean that processing/propagating has been completed (eg: when the FMUC 439 * is configured to use master-master mode, a completed Future instance will be returned. 440 * 441 * When FMUC is not active, this method will return a completed Future instance. 442 * 443 * @param stanza the stanza to be propagated through FMUC. 
444 * @param sender the role of the sender that is the original author of the stanza. 445 * @return A future object that completes when the stanza can be propagated locally. 446 */ propagate( @onnull Packet stanza, @Nonnull MUCRole sender )447 public synchronized CompletableFuture<?> propagate( @Nonnull Packet stanza, @Nonnull MUCRole sender ) 448 { 449 if ( !(room.isFmucEnabled() && FMUC_ENABLED.getValue()) ) { 450 Log.debug( "(room: '{}'): FMUC disabled, skipping FMUC propagation.", room.getJID() ); 451 return CompletableFuture.completedFuture(null); 452 } 453 454 Log.debug( "(room: '{}'): A stanza (type: {}, from: {}) is to be propagated in the FMUC node set.", room.getJID(), stanza.getClass().getSimpleName(), stanza.getFrom() ); 455 456 /* TODO this implementation currently is blocking, and synchronous: inbound propagation only occurs after outbound 457 propagation has been started. Is this needed? What should happen if outbound propagation fails? When the 458 mode used is master/slave, then it is reasonable to assume that a failed outbound propagation makes the 459 message propagation fail completely (and thus should not be propagated to the inbound nodes or locally? 460 */ 461 // propagate to joined FMUC node (conditionally blocks, depending on FMUC mode that's in effect). 462 final CompletableFuture<?> propagateToOutbound = propagateOutbound( stanza, sender ); 463 464 // propagate to all joining FMUC nodes (need never block). 465 final CompletableFuture<?> propagateToInbound = propagateInbound( stanza, sender ); 466 467 // Return a Future that completes when all of the Futures constructed above complete. 468 return CompletableFuture.allOf( propagateToOutbound, propagateToInbound ); 469 } 470 471 /** 472 * Makes a user in our XMPP domain join the FMUC room. 473 * 474 * The join event is propagated to all nodes in the FMUC set that the room is part of. 
475 * 476 * When 'outbound' federation is desired, but has not yet been established (when this is the first user to join the 477 * room), federation is initiated. 478 * 479 * Depending on the configuration and state of the FMUC set, the join operation is considered 'blocking'. In case of 480 * a FMUC based on master-slave mode, for example, the operation cannot continue until the remote FMUC node has 481 * echo'd back the join. To accommodate this behavior, this method will return a Future instance. The Future will 482 * immediately be marked as 'done' when the operation need not have 'blocking' behavior. Note that in such cases, 483 * the Future being 'done' does explicitly <em>not</em> indicate that the remote FMUC node has received, processed 484 * and/or accepted the event. 485 * 486 * If the local room is not configured to federate with another room, an invocation of this method will do nothing. 487 * 488 * @param mucRole The role in which the user is joining the room. 489 * @return A future object that completes when the stanza can be propagated locally. 
490 */ join( @onnull MUCRole mucRole )491 public synchronized Future<?> join( @Nonnull MUCRole mucRole ) 492 { 493 return join( mucRole, true, true ); 494 } 495 join(@onnull MUCRole mucRole, final boolean includeInbound, final boolean includeOutbound )496 protected synchronized Future<?> join(@Nonnull MUCRole mucRole, final boolean includeInbound, final boolean includeOutbound ) 497 { 498 if ( !(room.isFmucEnabled() && FMUC_ENABLED.getValue()) ) { 499 Log.debug( "(room: '{}'): FMUC disabled, skipping FMUC join.", room.getJID() ); 500 return CompletableFuture.completedFuture(null); 501 } 502 503 Log.debug( "(room: '{}'): user '{}' (as '{}') attempts to join.", room.getJID(), mucRole.getUserAddress(), mucRole.getRoleAddress() ); 504 505 final CompletableFuture<?> propagateToOutbound; 506 if ( !includeOutbound ) { 507 Log.trace( "(room: '{}'): skip propagating to outbound, as instructed.", room.getJID() ); 508 propagateToOutbound = CompletableFuture.completedFuture(null); 509 } else { 510 // Do we need to initiate a new outbound federation (are we configured to have an outbound federation, but has one not been started yet? 511 if ( outboundJoinConfiguration != null) { 512 if ( outboundJoin == null ) { 513 if ( outboundJoinProgress == null ) { 514 Log.trace("(room: '{}'): FMUC configuration contains configuration for a remote MUC that needs to be joined: {}", room.getJID(), outboundJoinConfiguration.getPeer() ); 515 // When a new federation is established, there's no need to explicitly propagate the join too - that's implicitly done as part of the initialization of the new federation. 
516 propagateToOutbound = initiateFederationOutbound( mucRole ); 517 } else { 518 Log.debug("(room: '{}'): Received a FMUC 'join' request for a remote MUC that we're already in process of joining: {}", room.getJID(), outboundJoinConfiguration.getPeer() ); 519 return outboundJoinProgress.addToQueue( generateJoinStanza( mucRole ), mucRole ); // queue a new join stanza to be sent after the ongoing join completes. 520 } 521 } 522 else 523 { 524 // TODO Doesn't this imply some kind of problem - why would we be joining a MUC that we've already joined? 525 Log.warn("(room: '{}'): FMUC configuration contains configuration for a remote MUC: {}. Federation with this MUC has already been established.", room.getJID(), outboundJoin.getPeer() ); 526 // propagate to existing the existing joined FMUC node (be blocking if master/slave mode!) 527 propagateToOutbound = propagateOutbound( generateJoinStanza( mucRole ), mucRole ); 528 } 529 } else { 530 // Nothing to do! 531 Log.trace( "(room: '{}'): FMUC configuration does not contain a remote MUC that needs to be joined.", room.getJID() ); 532 propagateToOutbound = CompletableFuture.completedFuture(null); 533 } 534 } 535 536 /* TODO this implementation currently is blocking, and synchronous: inbound propagation only occurs after outbound 537 propagation has been started. Is this needed? What should happen if outbound propagation fails? When the 538 mode used is master/slave, then it is reasonable to assume that a failed outbound join makes the join fail 539 (and thus should not be propagated to the inbound nodes. */ 540 541 // propagate to all joining FMUC nodes (need never block). 
542 final CompletableFuture<?> propagateToInbound; 543 if ( !includeInbound ) { 544 Log.trace( "(room: '{}'): skip propagating to inbound, as instructed.", room.getJID() ); 545 propagateToInbound = CompletableFuture.completedFuture(null); 546 } else { 547 propagateToInbound = propagateInbound( generateJoinStanza( mucRole ), mucRole ); 548 } 549 550 // Return a Future that completes when all of the Futures constructed above complete. 551 return CompletableFuture.allOf( propagateToOutbound, propagateToInbound ); 552 } 553 554 /** 555 * Attempt to establish a federation with a remote MUC room. In this relation 'our' room will take the role of 556 * 'joining FMUC node'. 557 * 558 * @param mucRole Occupant that joined the room, triggering the federation to be initiated. 559 * @return A future object that completes when the join can be propagated locally. 560 */ initiateFederationOutbound( @onnull MUCRole mucRole )561 private CompletableFuture<?> initiateFederationOutbound( @Nonnull MUCRole mucRole ) 562 { 563 Log.debug("(room: '{}'): Attempting to establish federation by joining '{}', triggered by user '{}' (as '{}').", room.getJID(), outboundJoinConfiguration.getPeer(), mucRole.getUserAddress(), mucRole.getRoleAddress() ); 564 565 final Presence joinStanza = enrichWithFMUCElement( generateJoinStanza( mucRole ), mucRole ); 566 joinStanza.setFrom( new JID(room.getName(), room.getMUCService().getServiceDomain(), mucRole.getNickname() ) ); 567 joinStanza.setTo( new JID(outboundJoinConfiguration.getPeer().getNode(), outboundJoinConfiguration.getPeer().getDomain(), mucRole.getNickname() ) ); 568 569 Log.trace("(room: '{}'): Registering a callback to be used when the federation request to '{}' has completed.", room.getJID(), outboundJoinConfiguration.getPeer() ); 570 final CompletableFuture<List<Packet>> result = new CompletableFuture<>(); 571 final boolean mustBlock = outboundJoinConfiguration.getMode() == FMUCMode.MasterSlave; 572 if ( !mustBlock ) { 573 Log.trace("(room: 
'{}'): No need to wait for federation to complete before allowing the local user to join the room.", room.getJID() ); 574 result.complete( null ); 575 } else { 576 Log.trace("(room: '{}'): Federation needs to have been completed before allowing the local user to join the room.", room.getJID() ); 577 } 578 579 outboundJoinProgress = new OutboundJoinProgress(outboundJoinConfiguration.getPeer(), result ); 580 581 Log.trace( "(room: '{}'): Sending FMUC join request: {}", room.getJID(), joinStanza.toXML() ); 582 XMPPServer.getInstance().getPacketRouter().route(joinStanza); 583 584 return result; 585 } 586 587 /** 588 * Sends a stanza to the joined FMUC node, when the local node has established such an outbound join. 589 * 590 * Note that when a master-slave mode is active, we need to wait for an echo back, before the message can be 591 * broadcasted locally. This method will return a CompletableFuture object that is completed as soon as processing 592 * can continue. This doesn't necessarily mean that processing/propagating has been completed (eg: when the FMUC 593 * is configured to use master-master mode, a completed Future instance will be returned. 594 * 595 * @param stanza The stanza to be sent. 596 * @param sender Representation of the sender of the stanza. 597 * @return A future object that completes when the stanza can be propagated locally. 598 */ propagateOutbound( @onnull Packet stanza, @Nonnull MUCRole sender )599 private CompletableFuture<?> propagateOutbound( @Nonnull Packet stanza, @Nonnull MUCRole sender ) 600 { 601 Log.trace("(room: '{}'): Propagate outbound, stanza: {}, sender: {}", room.getJID(), stanza, sender); 602 603 if ( outboundJoin == null ) 604 { 605 if ( outboundJoinProgress != null ) { 606 Log.trace("(room: '{}'): Remote MUC joining in progress. 
Queuing outbound propagation until after the join has been established.", room.getJID()); 607 return outboundJoinProgress.addToQueue( stanza, sender ); 608 } 609 else 610 { 611 Log.trace("(room: '{}'): No remote MUC joined. No need to propagate outbound.", room.getJID()); 612 return CompletableFuture.completedFuture(null); 613 } 614 } 615 616 if ( !outboundJoin.wantsStanzasSentBy( sender ) ) { 617 Log.trace("(room: '{}'): Skipping outbound propagation to peer '{}', as this peer needs not be sent stanzas sent by '{}' (potentially because it's a master-master mode joined FMUC and the sender originates on that node).", room.getJID(), outboundJoin.getPeer(), sender ); 618 return CompletableFuture.completedFuture(null); 619 } 620 621 final CompletableFuture<?> result = new CompletableFuture<>(); 622 doPropagateOutbound( stanza, sender, result ); 623 624 return result; 625 } 626 doPropagateOutbound(@onnull Packet stanza, @Nonnull MUCRole sender, @Nonnull CompletableFuture<?> result )627 private void doPropagateOutbound(@Nonnull Packet stanza, @Nonnull MUCRole sender, @Nonnull CompletableFuture<?> result ) 628 { 629 Log.debug("(room: '{}'): Propagating a stanza (type '{}') from user '{}' (as '{}') to the joined FMUC node {}.", room.getJID(), stanza.getClass().getSimpleName(), sender.getUserAddress(), sender.getRoleAddress(), outboundJoin.getPeer() ); 630 631 final Packet enriched = enrichWithFMUCElement( stanza, sender ); 632 enriched.setFrom( new JID(room.getName(), room.getMUCService().getServiceDomain(), sender.getNickname() ) ); 633 enriched.setTo( new JID(outboundJoin.getPeer().getNode(), outboundJoin.getPeer().getDomain(), sender.getNickname() ) ); 634 635 // When we're in a master-slave mode with the remote FMUC node that we're joining to, we must wait for it 636 // to echo back the presence data, before we can distribute it in the local room. 
637 final boolean mustBlock = outboundJoin.getMode() == FMUCMode.MasterSlave; 638 if ( !mustBlock ) { 639 Log.trace("(room: '{}'): No need to wait for an echo back from joined FMUC node {} of the propagation of stanza sent by user '{}' (as '{}').", room.getJID(), outboundJoin.getPeer(), sender.getUserAddress(), sender.getRoleAddress() ); 640 result.complete( null ); 641 } else { 642 Log.debug("(room: '{}'): An echo back from joined FMUC node {} of the propagation of stanza snet by user '{}' (as '{}') needs to be received before the join event can be propagated locally.", room.getJID(), outboundJoin.getPeer(), sender.getUserAddress(), sender.getRoleAddress() ); 643 644 // register callback to complete this future when echo is received back. 645 outboundJoin.registerEchoCallback( enriched, result ); 646 } 647 648 // Send the outbound stanza. 649 XMPPServer.getInstance().getPacketRouter().route( enriched ); 650 } 651 652 /** 653 * Sends a stanza to all joined FMUC node, when the local node has accepted such inbound joins from remote peers. 654 * 655 * Optionally, the address if one particular peer can be provided to avoid propagation of the stanza to that 656 * particular node. This is to be used to prevent 'echos back' to the originating FMUC node, when appropriate. 657 * 658 * @param stanza The stanza to be sent. 659 * @param sender Representation of the sender of the stanza. 660 * @return A future object that completes when the stanza can be propagated locally. 661 */ propagateInbound( @onnull Packet stanza, @Nonnull MUCRole sender )662 private CompletableFuture<?> propagateInbound( @Nonnull Packet stanza, @Nonnull MUCRole sender ) 663 { 664 Log.trace("(room: '{}'): Propagate inbound, stanza: {}, sender: {}", room.getJID(), stanza, sender); 665 if ( inboundJoins.isEmpty() ) 666 { 667 Log.trace("(room: '{}'): No remote MUC joining us. 
No need to propagate inbound.", room.getJID()); 668 return CompletableFuture.completedFuture(null); 669 } 670 671 Log.trace( "(room: '{}'): Propagating a stanza (type '{}') from user '{}' (as '{}') to the all {} joining FMUC nodes.", room.getJID(), stanza.getClass().getSimpleName(), sender.getUserAddress(), sender.getRoleAddress(), inboundJoins.size() ); 672 673 for( final InboundJoin inboundJoin : inboundJoins.values() ) 674 { 675 if ( !inboundJoin.wantsStanzasSentBy( sender ) ) { 676 Log.trace("(room: '{}'): Skipping inbound propagation to peer '{}', as this peer needs not be sent stanzas sent by '{}' (potentially because it's a master-slave mode joined FMUC and the sender originates on that node).", room.getJID(), inboundJoin.getPeer(), sender ); 677 continue; 678 } 679 680 Log.trace( "(room: '{}'): Propagating a stanza (type '{}') from user '{}' (as '{}') to the joining FMUC node '{}'", room.getJID(), stanza.getClass().getSimpleName(), sender.getUserAddress(), sender.getRoleAddress(), inboundJoin.getPeer() ); 681 final Packet enriched = enrichWithFMUCElement( stanza, sender ); 682 enriched.setFrom( sender.getRoleAddress()); 683 enriched.setTo( inboundJoin.getPeer() ); 684 XMPPServer.getInstance().getPacketRouter().route( enriched ); 685 } 686 return CompletableFuture.completedFuture(null); 687 } 688 689 /** 690 * Adds an FMUC child element to the stanza, if such an element does not yet exist. 691 * 692 * This method provides the functionally opposite implementation of {@link #createCopyWithoutFMUC(Packet)}. 693 * 694 * @param stanza The stanza to which an FMUC child element is to be added. 695 * @param sender Representation of the originator of the stanza. 696 * @param <S> Type of stanza 697 * @return A copy of the stanza, with an added FMUC child element. 
698 */ enrichWithFMUCElement( @onnull S stanza, @Nonnull MUCRole sender )699 private static <S extends Packet> S enrichWithFMUCElement( @Nonnull S stanza, @Nonnull MUCRole sender ) 700 { 701 // Defensive copy - ensure that the original stanza (that might be routed locally) is not modified). 702 final S result = (S) stanza.createCopy(); 703 704 final Element fmuc = result.getElement().element(FMUC); 705 if ( fmuc != null ) { 706 return result; 707 } 708 709 final JID from; 710 if ( sender.getRoleAddress().getResource() == null ) { 711 // This role represents the room itself as the sender. Rooms do not have a 'user' address. 712 from = sender.getRoleAddress(); 713 } else { 714 from = sender.getUserAddress(); 715 } 716 result.getElement().addElement( FMUC ).addAttribute( "from", from.toString() ); 717 718 return result; 719 } 720 721 /** 722 * Adds an FMUC child element to the stanza, if such an element does not yet exist. 723 * 724 * This method provides the functionally opposite implementation of {@link #createCopyWithoutFMUC(Packet)}. 725 * 726 * @param stanza The stanza to which an FMUC child element is to be added. 727 * @param sender Representation of the originator of the stanza. 728 * @param <S> Type of stanza 729 * @return A copy of the stanza, with an added FMUC child element. 730 */ enrichWithFMUCElement( @onnull S stanza, @Nonnull JID sender )731 private static <S extends Packet> S enrichWithFMUCElement( @Nonnull S stanza, @Nonnull JID sender ) 732 { 733 // Defensive copy - ensure that the original stanza (that might be routed locally) is not modified). 734 final S result = (S) stanza.createCopy(); 735 736 final Element fmuc = result.getElement().element(FMUC); 737 if ( fmuc != null ) { 738 return result; 739 } 740 741 result.getElement().addElement( FMUC ).addAttribute( "from", sender.toString() ); 742 743 return result; 744 } 745 746 /** 747 * Removes FMUC child elements from the stanza, if such an element exists. 
748 * 749 * This method provides the functionally opposite implementation of {@link #enrichWithFMUCElement(Packet, MUCRole)}. 750 * 751 * @param stanza The stanza from which an FMUC child element is to be removed. 752 * @param <S> Type of stanza 753 * @return A copy of the stanza, without FMUC child element. 754 */ createCopyWithoutFMUC( S stanza )755 public static <S extends Packet> S createCopyWithoutFMUC( S stanza ) 756 { 757 final S result = (S) stanza.createCopy(); 758 final Iterator<Element> elementIterator = result.getElement().elementIterator(FMUC); 759 while (elementIterator.hasNext()) { 760 elementIterator.next(); 761 elementIterator.remove(); 762 } 763 return result; 764 } 765 766 /** 767 * Creates a stanza that represents a room 'join' in a MUC room. 768 * 769 * @param mucRole Representation of the (local) user that caused the join to be initiated. 770 */ 771 // TODO this does not have any FMUC specifics. Must this exist in this class? generateJoinStanza( @onnull MUCRole mucRole )772 private Presence generateJoinStanza( @Nonnull MUCRole mucRole ) 773 { 774 Log.debug( "(room: '{}'): Generating a stanza that represents the joining of local user '{}' (as '{}').", room.getJID(), mucRole.getUserAddress(), mucRole.getRoleAddress() ); 775 final Presence joinStanza = new Presence(); 776 joinStanza.getElement().addElement(QName.get("x", "http://jabber.org/protocol/muc")); 777 final Element mucUser = joinStanza.getElement().addElement(QName.get("x", "http://jabber.org/protocol/muc#user")); 778 final Element mucUserItem = mucUser.addElement("item"); 779 mucUserItem.addAttribute("affiliation", mucRole.getAffiliation().toString()); 780 mucUserItem.addAttribute("role", mucRole.getRole().toString()); 781 782 // Don't include the occupant's JID if the room is semi-anon and the new occupant is not a moderator 783 if (!room.canAnyoneDiscoverJID()) { 784 if (MUCRole.Role.moderator == mucRole.getRole()) { 785 mucUserItem.addAttribute("jid", 
mucRole.getUserAddress().toString()); 786 } 787 else { 788 mucUserItem.addAttribute("jid", null); 789 } 790 } 791 792 return joinStanza; 793 } 794 process( @onnull final Packet stanza )795 public synchronized void process( @Nonnull final Packet stanza ) 796 { 797 if ( !(room.isFmucEnabled() && FMUC_ENABLED.getValue()) ) { 798 Log.debug( "(room: '{}'): FMUC disabled, skipping processing of stanza: {}", room.getJID(), stanza.toXML() ); 799 if ( stanza instanceof IQ && ((IQ) stanza).isRequest() ) { 800 final IQ errorResult = IQ.createResultIQ( (IQ) stanza); 801 errorResult.setError(PacketError.Condition.service_unavailable); 802 XMPPServer.getInstance().getPacketRouter().route( errorResult ); 803 } 804 return; 805 } 806 807 Log.trace( "(room: '{}'): Processing stanza from '{}': {}", room.getJID(), stanza.getFrom(), stanza.toXML() ); 808 final JID remoteMUC = stanza.getFrom().asBareJID(); 809 810 if ( stanza.getElement().element(FMUC) == null ) { 811 throw new IllegalArgumentException( "Unable to process stanza that does not have FMUC data: " + stanza.toXML() ); 812 } 813 814 if ( remoteMUC.getNode() == null ) { 815 throw new IllegalArgumentException( "Unable to process stanza that did not originate from a MUC room (the 'from' address has no node-part):" + stanza.toXML() ); 816 } 817 818 if ( outboundJoinProgress != null && outboundJoinProgress.getPeer().equals( remoteMUC ) && !outboundJoinProgress.isJoinComplete() ) 819 { 820 Log.trace("(room: '{}'): Received stanza from '{}' that is identified as outbound FMUC node for which a join is in progress.", room.getJID(), remoteMUC); 821 822 Log.trace("(room: '{}'): Queueing stanza from '{}' as partial FMUC join response.", room.getJID(), remoteMUC); 823 outboundJoinProgress.addResponse(stanza); 824 825 if ( outboundJoinProgress.isJoinComplete() ) 826 { 827 Log.debug("(room: '{}'): Received a complete FMUC join response from '{}'.", room.getJID(), remoteMUC); 828 finishOutboundJoin(); 829 outboundJoinProgress = null; 830 } 
831 } 832 else if ( outboundJoin != null && outboundJoin.getPeer().equals( remoteMUC ) ) 833 { 834 Log.trace("(room: '{}'): Received stanza from '{}' that is identified as outbound FMUC node.", room.getJID(), remoteMUC); 835 if ( stanza instanceof Presence && stanza.getElement().element(FMUC).element("left") != null ) { 836 processLeftInstruction( (Presence) stanza ); 837 } else { 838 outboundJoin.evaluateForCallbackCompletion(stanza); 839 processRegularMUCStanza( stanza ); 840 } 841 } 842 else if ( inboundJoins.get( remoteMUC.asBareJID() ) != null ) 843 { 844 Log.trace("(room: '{}'): Received stanza from '{}' that is identified as inbound FMUC node.", room.getJID(), remoteMUC); 845 processRegularMUCStanza( stanza ); 846 } 847 else 848 { 849 Log.trace("(room: '{}'): Received stanza from '{}' that is not a known FMUC node.", room.getJID(), remoteMUC ); //Treating as inbound FMUC node join request.", room.getJID(), remoteMUC); 850 if ( isFMUCJoinRequest( stanza ) ) { 851 try 852 { 853 checkAcceptingFMUCJoiningNodePreconditions(remoteMUC); 854 acceptJoiningFMUCNode( (Presence) stanza ); 855 } catch ( final FMUCException e ) { 856 rejectJoiningFMUCNode( (Presence) stanza, e.getMessage() ); 857 } 858 } else { 859 Log.debug("(room: '{}'): Unable to process stanza from '{}'. Ignoring: {}", room.getJID(), remoteMUC, stanza.toXML() ); 860 if ( stanza instanceof IQ && ((IQ) stanza).isRequest() ) { 861 final IQ errorResult = IQ.createResultIQ( (IQ) stanza); 862 errorResult.setError(PacketError.Condition.service_unavailable); 863 XMPPServer.getInstance().getPacketRouter().route( errorResult ); 864 } 865 } 866 } 867 } 868 869 /** 870 * Process a 'left' notification that is sent to us by the remote joined node. 
871 * 872 * All occupants of the room will be notified that the occupants that joined through the node that has disconnected 873 * us are no longer available (in a typical scenario, no such local occupants are expected to be in the room, as the 874 * 'left' notification should be trigged by the last occupant having left the room). 875 * 876 * @param stanza The stanza that informed us that the FMUC peer considers us disconnected. 877 */ processLeftInstruction( @onnull final Presence stanza )878 private void processLeftInstruction( @Nonnull final Presence stanza ) 879 { 880 if ( stanza.getElement().element(FMUC) == null ) { 881 throw new IllegalArgumentException( "Unable to process stanza that does not have FMUC data: " + stanza.toXML() ); 882 } 883 884 Log.trace("(room: '{}'): FMUC peer '{}' informed us that we left the FMUC set.", room.getJID(), outboundJoin.getPeer() ); 885 886 // This *should* only occur after all of our local users have left the room. For good measure, send out 887 // 'leave' for all occupants from the now disconnected FMUC node anyway. 888 makeRemoteOccupantLeaveRoom( outboundJoin.occupants ); 889 890 outboundJoin = null; 891 outboundJoinProgress = null; 892 } 893 894 /** 895 * Processes a stanza that is received from another node in the FMUC set, by translating it into 'regular' MUC data. 896 * 897 * The provided input is expected to be a stanza received from another node in the FMUC set. It is stripped from 898 * FMUC data, after which it is distributed to the local users. 899 * 900 * Additionally, it is sent out to all (other) FMUC nodes that are known. 901 * 902 * @param stanza The data to be processed. 
*/
    private void processRegularMUCStanza( @Nonnull final Packet stanza )
    {
        if ( stanza.getElement().element(FMUC) == null ) {
            throw new IllegalArgumentException( "Provided stanza must have an 'fmuc' child element (but does not).");
        }

        final JID remoteMUC = stanza.getFrom().asBareJID();
        // Throws an IllegalArgumentException when the FMUC element does not have a 'from' attribute value.
        final JID author = getFMUCFromJID( stanza );
        final MUCRole senderRole = room.getOccupantByFullJID( author );
        Log.trace("(room: '{}'): Processing stanza from remote FMUC peer '{}' as regular room traffic. Sender of stanza: {}", room.getJID(), remoteMUC, author );

        // Distribute. Note that this will distribute both to the local node, as well as to all FMUC nodes in the FMUC set.
        if ( stanza instanceof Presence ) {
            // Find the node that the stanza originates from: either a node that joined us, or the node that we joined.
            RemoteFMUCNode remoteFMUCNode = inboundJoins.get(remoteMUC);
            if ( remoteFMUCNode == null && outboundJoin != null && remoteMUC.equals(outboundJoin.getPeer())) {
                remoteFMUCNode = outboundJoin;
            }
            if ( remoteFMUCNode != null )
            {
                final boolean isLeave = ((Presence) stanza).getType() == Presence.Type.unavailable;
                final boolean isJoin = ((Presence) stanza).isAvailable();

                if ( isLeave )
                {
                    Log.trace("(room: '{}'): Occupant '{}' left room on remote FMUC peer '{}'", room.getJID(), author, remoteMUC );
                    makeRemoteOccupantLeaveRoom( (Presence) stanza );
                    remoteFMUCNode.removeOccupant(author); // Remove occupant only after the leave stanzas have been sent, otherwise the author is (no longer) recognized as an occupant of the particular node when the leave is being processed.

                    // The joined room confirms that the joining room has left the set by sending a presence stanza from the bare JID
                    // of the joined room to the bare JID of the joining room with an FMUC payload containing an element 'left'.
                    if ( remoteFMUCNode instanceof InboundJoin && remoteFMUCNode.occupants.isEmpty() ) {
                        Log.trace("(room: '{}'): Last occupant that joined on remote FMUC peer '{}' has now left the room. The peer has left the FMUC node set.", room.getJID(), remoteMUC );
                        final Presence leaveFMUCSet = new Presence();
                        leaveFMUCSet.setTo( remoteMUC );
                        leaveFMUCSet.setFrom( room.getJID() );
                        leaveFMUCSet.getElement().addElement( FMUC ).addElement( "left" );
                        inboundJoins.remove(remoteMUC);

                        XMPPServer.getInstance().getPacketRouter().route( leaveFMUCSet );
                    }
                } else if ( isJoin ) {
                    Log.trace("(room: '{}'): Occupant '{}' joined room on remote FMUC peer '{}'", room.getJID(), author, remoteMUC );
                    remoteFMUCNode.addOccupant(author);
                    makeRemoteOccupantJoinRoom( (Presence) stanza );
                } else {
                    // FIXME implement sharing of presence.
                    Log.error("Processing of presence stanzas received from other FMUC nodes is pending implementation! Ignored stanza: {}", stanza.toXML(), new UnsupportedOperationException());
                }
            }
            else
            {
                Log.warn( "Unable to process stanza: {}", stanza.toXML() );
            }
            return;
        }

        // Non-presence stanzas are distributed as if they were sent by a local occupant. That requires the author to
        // be a known occupant of the room. Without one, the stanza cannot be attributed to anyone.
        if ( senderRole == null ) {
            Log.warn( "(room: '{}'): Unable to process stanza from remote FMUC peer '{}', as its author '{}' is not an occupant of this room. Ignoring: {}", room.getJID(), remoteMUC, author, stanza.toXML() );
            return;
        }

        // Strip all FMUC data.
        final Packet stripped = createCopyWithoutFMUC( stanza );

        // The 'stripped' stanza is going to be distributed locally. Act as if it originates from a local user, instead of the remote FMUC one.
        stripped.setFrom( senderRole.getRoleAddress() );
        stripped.setTo( room.getJID() );

        room.send( stripped, senderRole );
    }

    /**
     * Finalizes an outbound join for which a complete response has been received from the remote node.
     *
     * When the remote node rejected the join, the callback of the join and the futures of all stanzas that were queued
     * while the join was in progress are completed exceptionally. Otherwise, the occupants, history and subject that
     * were received are applied to the local room, the callback of the join is completed, and the queued stanzas are
     * propagated to the joined node.
     *
     * This method does not reset the 'join in progress' state: that is left to the caller.
     *
     * @throws IllegalStateException when no outbound join is in progress, or when that join is not yet complete.
     */
    private void finishOutboundJoin()
    {
        if ( outboundJoinProgress == null ) {
            throw new IllegalStateException( "Cannot finish outbound join from '" + room.getJID() + "' as none is in progress." );
        }
        Log.trace("(room: '{}'): Finish setting up the outbound FMUC join with '{}'.", room.getJID(), outboundJoinProgress.getPeer() );
        if ( !outboundJoinProgress.isJoinComplete() ) {
            throw new IllegalStateException( "Cannot finish outbound join from '" + room.getJID()+"' to '"+ outboundJoinProgress.getPeer()+"', as it is not complete!" );
        }

        final List<OutboundJoinProgress.QueuedStanza> queued = outboundJoinProgress.purgeQueue();
        if ( outboundJoinProgress.isRejected() )
        {
            Log.trace("(room: '{}'): Notifying callback waiting for the complete FMUC join response from '{}' with a rejection.", room.getJID(), outboundJoinProgress.getPeer() );
            final FMUCException rejection = new FMUCException(outboundJoinProgress.getRejectionMessage());
            outboundJoinProgress.getCallback().completeExceptionally(rejection);
            queued.forEach( queuedStanza -> queuedStanza.future.completeExceptionally(rejection));
            return;
        }

        Log.trace("(room: '{}'): Synchronizing state of local room with joined FMUC node '{}'.", room.getJID(), outboundJoinProgress.getPeer() );
        outboundJoin = new OutboundJoin(outboundJoinConfiguration);

        // Before processing the data in context of the local FMUC room, ensure that the FMUC metadata state is up-to-date.
        for ( final Packet response : outboundJoinProgress.getResponses() ) {
            if ( response instanceof Presence ) {
                final JID occupantOnJoinedNode = getFMUCFromJID(response);
                outboundJoin.addOccupant( occupantOnJoinedNode );
            }
        }

        // Use a room role that can be used to identify the remote fmuc node (to prevent data from being echo'd back)
        final MUCRole roomRole = MUCRole.createRoomRole(room);
        roomRole.setReportedFmucAddress( outboundJoin.getPeer() );

        // Use received data to augment state of the local room. A failure to process one response must not prevent
        // the others from being processed.
        for ( final Packet response : outboundJoinProgress.getResponses() ) {
            try
            {
                if ( response instanceof Presence ) {
                    makeRemoteOccupantJoinRoom((Presence) response);
                } else if ( response instanceof Message && response.getElement().element("body") != null) {
                    addRemoteHistoryToRoom((Message) response);
                } else if ( response instanceof Message && response.getElement().element("subject") != null) {
                    applyRemoteSubjectToRoom((Message) response, roomRole);
                }
            } catch ( Exception e ) {
                Log.error( "(room: '{}'): An unexpected exception occurred while processing FMUC join response stanzas.", room.getJID(), e );
            }
        }

        Log.trace("(room: '{}'): Notifying callback waiting for the complete FMUC join response from '{}' with success.", room.getJID(), outboundJoinProgress.getPeer() );
        outboundJoinProgress.getCallback().complete( null );

        Log.trace("(room: '{}'): Sending {} stanza(s) that were queued, waiting for the complete FMUC join", room.getJID(), queued.size() );
        for ( final OutboundJoinProgress.QueuedStanza queuedStanza : queued ) {
            try {
                doPropagateOutbound(queuedStanza.stanza, queuedStanza.sender, queuedStanza.future);
            } catch ( Exception e ) {
                Log.warn( "(room: '{}'): An exception occurred while trying to process a stanza that was queued pending completion of FMUC join: {}", room.getJID(), queuedStanza.stanza, e );
                // Do not leave anyone waiting indefinitely for a stanza that will not be propagated. This has no effect
                // when the future has already been completed.
                queuedStanza.future.completeExceptionally( e );
            }
        }
    }

    /**
     * Applies a subject that was received from the joined FMUC node to the local room.
     *
     * @param message The stanza that holds the subject (FMUC data is stripped from it before it is applied).
     * @param mucRole The role on behalf of which the subject is changed.
     */
    private void applyRemoteSubjectToRoom( @Nonnull final Message message, @Nonnull final MUCRole mucRole )
    {
        try
        {
            Log.trace("(room: '{}'): Received subject from joined FMUC node '{}'. Applying it locally.", room.getJID(), mucRole.getReportedFmucAddress() );
            room.changeSubject(createCopyWithoutFMUC(message), mucRole);
        }
        catch ( ForbiddenException e ) {
            // This should not be possible, as we're using a role above that should bypass the auth checks that throw this exception!
            Log.error( "(room: '{}'): An unexpected exception occurred while processing FMUC join response stanzas.", room.getJID(), e );
        }
    }

    /**
     * Adds a message that was received from the joined FMUC node as part of a join response to the history of the
     * local room.
     *
     * The date of the message is taken from its 'delay' child element. When that element is missing, or when its
     * 'stamp' value cannot be parsed, the message is added without a date.
     *
     * @param message The FMUC stanza that holds the historic message.
     * @throws IllegalArgumentException when the message has no FMUC child element, or when that element does not
     *                                  have a 'from' attribute value.
     */
    private void addRemoteHistoryToRoom( @Nonnull final Message message )
    {
        if ( message.getElement().element(FMUC) == null ) {
            throw new IllegalArgumentException( "Argument 'message' should be an FMUC message, but it does not appear to be: it is missing the FMUC child element." );
        }

        Log.trace("(room: '{}'): Received history from joined FMUC node '{}'. Applying it locally.", room.getJID(), outboundJoinProgress.getPeer() );

        final JID userJID = getFMUCFromJID( message );
        final String nickname = message.getFrom().getResource();

        Date sentDate = null;
        final Element delay = message.getChildElement("delay","urn:xmpp:delay");
        if ( delay == null ) {
            Log.warn( "Missing delay element in message received in FMUC join: {}", message );
        } else {
            try
            {
                sentDate = new XMPPDateTimeFormat().parseString(delay.attributeValue("stamp"));
            }
            catch ( ParseException e )
            {
                Log.warn( "Cannot parse 'stamp' from delay element in message as received in FMUC join: {}", message, e );
            }
        }

        final Message cleanedUpMessage = createCopyWithoutFMUC(message);
        room.getRoomHistory().addOldMessage( userJID.toString(), nickname, sentDate, cleanedUpMessage.getSubject(), cleanedUpMessage.getBody(), cleanedUpMessage.toXML() );
    }

    /**
     * Parses
the JID from an FMUC stanza. 1085 * 1086 * More specifically, this method returns the JID representation of the 'from' attribute value of the 'fmuc' child 1087 * element in the stanza. A runtime exception is thrown when no such value exists, or when that value is not a 1088 * valid JID. 1089 * 1090 * @param stanza An FMUC stanza 1091 * @return A JID. 1092 * @throws RuntimeException when no valid JID value is found in the 'from' attribute of the FMUC child element. 1093 */ getFMUCFromJID( @onnull final Packet stanza )1094 public static JID getFMUCFromJID( @Nonnull final Packet stanza ) 1095 { 1096 final Element fmuc = stanza.getElement().element(FMUC); 1097 if ( fmuc == null ) { 1098 throw new IllegalArgumentException( "Argument 'stanza' should be an FMUC stanza, but it does not appear to be: it is missing the FMUC child element." ); 1099 } 1100 1101 final String fromValue = fmuc.attributeValue( "from" ); 1102 1103 if ( fromValue == null ) { 1104 throw new IllegalArgumentException( "Argument 'stanza' should be a valid FMUC stanza, but it does not appear to be: it is missing a 'from' attribute value in the FMUC child element." ); 1105 } 1106 1107 final JID userJID = new JID( fromValue ); 1108 return userJID; 1109 } 1110 1111 /** 1112 * Processes a presence stanza that is expected to be an FMUC-flavored 'room join' representation, and adds the 1113 * remote user to the room. 1114 * 1115 * This method will <em>not</em> make modifications to the state of the FMUC node set. It expects those changes to 1116 * be taken care of by the caller. 1117 * 1118 * This method provides the functionally opposite implementation of {@link #makeRemoteOccupantLeaveRoom(Presence)}. 1119 * 1120 * @param presence The stanza representing a user on a federated FMUC node joining the room (cannot be null). 
1121 * @see #makeRemoteOccupantLeaveRoom(Presence) 1122 */ makeRemoteOccupantJoinRoom( @onnull final Presence presence )1123 private void makeRemoteOccupantJoinRoom( @Nonnull final Presence presence ) 1124 { 1125 // FIXME: better input validation / better problem handling when remote node sends crappy data! 1126 final Element mucUser = presence.getElement().element(QName.get("x","http://jabber.org/protocol/muc#user")); 1127 final Element fmuc = presence.getElement().element(FMUC); 1128 if ( fmuc == null ) { 1129 throw new IllegalArgumentException( "Argument 'presence' should be an FMUC presence, but it does not appear to be: it is missing the FMUC child element." ); 1130 } 1131 1132 final JID remoteMUC = presence.getFrom().asBareJID(); 1133 final String nickname = presence.getFrom().getResource(); 1134 1135 Log.debug( "(room: '{}'): Occupant on remote peer '{}' joins the room with nickname '{}'.", room.getJID(), remoteMUC, nickname ); 1136 1137 MUCRole.Role role; 1138 if ( mucUser != null && mucUser.element("item") != null && mucUser.element("item").attributeValue("role") != null ) { 1139 try { 1140 role = MUCRole.Role.valueOf(mucUser.element("item").attributeValue("role")); 1141 } catch ( IllegalArgumentException e ) { 1142 Log.info( "Cannot parse role as received in FMUC join, using default role instead: {}", presence, e ); 1143 role = MUCRole.Role.participant; 1144 } 1145 } else { 1146 Log.info( "Cannot parse role as received in FMUC join, using default role instead: {}", presence ); 1147 role = MUCRole.Role.participant; 1148 } 1149 1150 MUCRole.Affiliation affiliation; 1151 if ( mucUser != null && mucUser.element("item") != null && mucUser.element("item").attributeValue("affiliation") != null ) { 1152 try { 1153 affiliation = MUCRole.Affiliation.valueOf(mucUser.element("item").attributeValue("affiliation")); 1154 } catch ( IllegalArgumentException e ) { 1155 Log.info( "Cannot parse affiliation as received in FMUC join, using default role instead: {}", presence, 
e ); 1156 affiliation = MUCRole.Affiliation.none; 1157 } 1158 } else { 1159 Log.info( "Cannot parse affiliation as received in FMUC join, using default role instead: {}", presence ); 1160 affiliation = MUCRole.Affiliation.none; 1161 } 1162 1163 final JID userJID = getFMUCFromJID( presence ); 1164 1165 final MUCRole joinRole = new MUCRole(room, nickname, role, affiliation, userJID, createCopyWithoutFMUC(presence)); 1166 1167 joinRole.setReportedFmucAddress( userJID ); 1168 1169 final boolean clientOnlyJoin = room.alreadyJoinedWithThisNick( userJID, nickname ); 1170 if (clientOnlyJoin) 1171 { 1172 Log.warn( "(room: '{}'): Ignoring join of occupant on remote peer '{}' with nickname '{}' as this user is already in the room.", room.getJID(), remoteMUC, nickname ); 1173 } 1174 else 1175 { 1176 // Update the (local) room state to now include this occupant. 1177 room.addOccupantRole(joinRole); 1178 1179 // Send out presence stanzas that signal all other occupants that this occupant has now joined. Unlike a 'regular' join we MUST 1180 // _not_ sent back presence for all other occupants (that has already been covered by the FMUC protocol implementation). 1181 room.sendInitialPresenceToExistingOccupants(joinRole); 1182 } 1183 } 1184 1185 /** 1186 * Removes a remote user from the room. 1187 * 1188 * This method is intended to be used when a remote node is being disconnected from the FMUC node set, without having 1189 * sent 'leave' presence stanzas for its occupants. This method generates such presence stanzas, and delegates 1190 * further processing to {@link #makeRemoteOccupantLeaveRoom(Presence)} 1191 * 1192 * @param removedRemoteOccupants The occupants to be removed from the room. 
1193 */ makeRemoteOccupantLeaveRoom( @onnull Set<JID> removedRemoteOccupants )1194 private void makeRemoteOccupantLeaveRoom( @Nonnull Set<JID> removedRemoteOccupants ) { 1195 for ( final JID removedRemoteOccupant : removedRemoteOccupants ) 1196 { 1197 try 1198 { 1199 Log.trace("(room: '{}'): Removing occupant '{}' that was joined through a (now presumably disconnected) remote node.", room.getJID(), removedRemoteOccupant); 1200 final MUCRole role = room.getOccupantByFullJID( removedRemoteOccupant ); 1201 if ( role == null ) { 1202 Log.warn("(room: '{}'): Unable to remove '{}' as it currently is not registered as an occupant of this room.", room.getJID(), removedRemoteOccupant); 1203 continue; 1204 } 1205 1206 final Presence leave = new Presence(); 1207 leave.setType(Presence.Type.unavailable); 1208 leave.setTo(role.getRoleAddress()); 1209 leave.setFrom(role.getUserAddress()); 1210 leave.setStatus("FMUC node disconnect"); 1211 final Presence enriched = enrichWithFMUCElement( leave, role.getReportedFmucAddress() ); 1212 1213 makeRemoteOccupantLeaveRoom( enriched ); 1214 } 1215 catch ( Exception e ) 1216 { 1217 Log.warn("(room: '{}'): An exception occurred while removing occupant '{}' from a (now presumably disconnected) remote node.", room.getJID(), removedRemoteOccupant, e); 1218 } 1219 } 1220 } 1221 1222 /** 1223 * Processes a presence stanza that is expected to be an FMUC-flavored 'leave' representation, and removes the 1224 * remote user from the room. 1225 * 1226 * This method will <em>not</em> make modifications to the state of the FMUC node set. It expects those changes to 1227 * be taken care of by the caller. 1228 * 1229 * This method provides the functionally opposite implementation of {@link #makeRemoteOccupantJoinRoom(Presence)}. 1230 * 1231 * @param presence The stanza representing a user on a federated FMUC node leaving the room (cannot be null). 
1232 * @see #makeRemoteOccupantJoinRoom(Presence) 1233 */ makeRemoteOccupantLeaveRoom( @onnull final Presence presence )1234 private void makeRemoteOccupantLeaveRoom( @Nonnull final Presence presence ) 1235 { 1236 // FIXME: better input validation / better problem handling when remote node sends crappy data! 1237 final Element fmuc = presence.getElement().element(FMUC); 1238 if ( fmuc == null ) { 1239 throw new IllegalArgumentException( "Argument 'presence' should be an FMUC presence, but it does not appear to be: it is missing the FMUC child element." ); 1240 } 1241 final JID userJID = getFMUCFromJID( presence ); 1242 1243 final MUCRole leaveRole = room.getOccupantByFullJID( userJID ); 1244 leaveRole.setPresence( createCopyWithoutFMUC(presence) ); // update presence to reflect the 'leave' - this is used later to broadcast to other occupants. 1245 1246 // Send presence to inform all occupants of the room that the user has left. 1247 room.sendLeavePresenceToExistingOccupants( leaveRole ) 1248 // DO NOT use 'thenRunAsync', as that will cause issues with clustering (it uses an executor that overrides the contextClassLoader, causing ClassNotFound exceptions in ClusterExternalizableUtil. 1249 .thenRun( () -> { 1250 // Update the (local) room state to no longer include this occupant. 1251 room.removeOccupantRole(leaveRole); 1252 }); 1253 } 1254 1255 /** 1256 * Checks if the entity that attempts to join, which is assumed to represent a remote, joining FMUC node, is allowed 1257 * to join the local ('joined') FMUC node. 1258 * 1259 * @param joiningPeer the address of the remote room that attempts to join the local FMUC node. 1260 * @throws FMUCException when the peer cannot join the local FMUC node. 
1261 */ checkAcceptingFMUCJoiningNodePreconditions( @onnull final JID joiningPeer )1262 private void checkAcceptingFMUCJoiningNodePreconditions( @Nonnull final JID joiningPeer ) throws FMUCException 1263 { 1264 if ( !joiningPeer.asBareJID().equals(joiningPeer) ) { 1265 throw new IllegalArgumentException( "Expected argument 'joiningPeer' to be a bare JID, but it was not: " + joiningPeer ); 1266 } 1267 1268 if ( !(room.isFmucEnabled() && FMUC_ENABLED.getValue()) ) 1269 { 1270 Log.info( "(room: '{}'): Rejecting join request of remote joining peer '{}': FMUC functionality is not enabled.", room.getJID(), joiningPeer ); 1271 throw new FMUCException( "FMUC functionality is not enabled." ); 1272 } 1273 1274 if ( this.outboundJoinConfiguration != null && joiningPeer.equals(this.outboundJoinConfiguration.getPeer()) ) { 1275 Log.info( "(room: '{}'): Rejecting join request of remote joining peer '{}': The local, joined node is set up to federate with the joining node (cannot have circular federation).", room.getJID(), joiningPeer ); 1276 throw new FMUCException( "The joined node is set up to federate with the joining node (cannot have circular federation)." ); 1277 } 1278 1279 Log.debug( "(room: '{}'): Accepting join request of remote joining peer '{}'.", room.getJID(), joiningPeer ); 1280 } 1281 1282 /** 1283 * Sends a stanza back to a remote, joining FMUC node that represents rejection of an FMUC join request. 1284 * 1285 * @param joinRequest The request to join that is being rejected 1286 * @param rejectionMessage An optional, human readable message that describes the reason for the rejection. 
1287 */ rejectJoiningFMUCNode( @onnull final Presence joinRequest, @Nullable final String rejectionMessage )1288 private void rejectJoiningFMUCNode( @Nonnull final Presence joinRequest, @Nullable final String rejectionMessage ) 1289 { 1290 Log.trace("(room: '{}'): Rejecting FMUC join request from '{}'.", room.getJID(), joinRequest.getFrom().asBareJID() ); 1291 final Presence rejection = new Presence(); 1292 rejection.setTo( joinRequest.getFrom() ); 1293 rejection.setFrom( this.room.getJID() ); 1294 final Element rejectEl = rejection.addChildElement( FMUC.getName(), FMUC.getNamespaceURI() ).addElement("reject"); 1295 if ( rejectionMessage != null && !rejectionMessage.trim().isEmpty() ) { 1296 rejectEl.setText( rejectionMessage ); 1297 } 1298 XMPPServer.getInstance().getPacketRouter().route( rejection ); 1299 } 1300 1301 /** 1302 * Sends a stanza back to a remote, joining FMUC node that represents acceptance of a FMUC join request. 1303 * 1304 * @param joinRequest The request to join that is being accepted. 1305 */ acceptJoiningFMUCNode( @onnull final Presence joinRequest )1306 private void acceptJoiningFMUCNode( @Nonnull final Presence joinRequest ) 1307 { 1308 Log.trace("(room: '{}'): Accepting FMUC join request from '{}'.", room.getJID(), joinRequest.getFrom().asBareJID() ); 1309 final JID joiningPeer = joinRequest.getFrom().asBareJID(); 1310 final InboundJoin inboundJoin = new InboundJoin(joiningPeer); 1311 final JID occupant = getFMUCFromJID(joinRequest); 1312 inboundJoin.addOccupant( occupant ); 1313 inboundJoins.put(joiningPeer, inboundJoin); // TODO make thread safe. 
1314 afterJoinSendOccupants( joiningPeer ); 1315 afterJoinSendHistory( joiningPeer ); 1316 afterJoinSendSubject( joiningPeer ); 1317 makeRemoteOccupantJoinRoom( joinRequest ); 1318 } 1319 afterJoinSendOccupants( @onnull final JID joiningPeer )1320 private void afterJoinSendOccupants( @Nonnull final JID joiningPeer ) 1321 { 1322 if ( !joiningPeer.asBareJID().equals(joiningPeer) ) { 1323 throw new IllegalArgumentException( "Expected argument 'joiningPeer' to be a bare JID, but it was not: " + joiningPeer ); 1324 } 1325 1326 Log.trace("(room: '{}'): Sending current occupants to joining node '{}'.", room.getJID(), joiningPeer ); 1327 1328 for ( final MUCRole occupant : room.getOccupants() ) { 1329 if ( occupant.getReportedFmucAddress() != null && occupant.getReportedFmucAddress().asBareJID().equals( joiningPeer ) ) { 1330 Log.trace("(room: '{}'): Skipping occupant '{}' as that originates from the joining node.", room.getJID(), occupant ); 1331 continue; 1332 } 1333 1334 // TODO can we use occupant.getPresence() for this? 1335 // TODO do we need to worry about who we're exposing data to? 
1336 final Presence presence = new Presence(); 1337 presence.setFrom( occupant.getRoleAddress() ); 1338 presence.setTo( joiningPeer ); 1339 final Presence enriched = enrichWithFMUCElement( presence, occupant ); 1340 final Element xitem = enriched.addChildElement( "x", "http://jabber.org/protocol/muc#user" ).addElement( "item" ); 1341 xitem.addAttribute( "affiliation", occupant.getAffiliation().toString() ); 1342 xitem.addAttribute( "role", occupant.getRole().toString() ); 1343 xitem.addAttribute( "jid", occupant.getRoleAddress().toString() ); 1344 1345 XMPPServer.getInstance().getPacketRouter().route( enriched ); 1346 } 1347 } 1348 afterJoinSendHistory( @onnull final JID joiningPeer )1349 private void afterJoinSendHistory( @Nonnull final JID joiningPeer ) 1350 { 1351 if ( !joiningPeer.asBareJID().equals(joiningPeer) ) { 1352 throw new IllegalArgumentException( "Expected argument 'joiningPeer' to be a bare JID, but it was not: " + joiningPeer ); 1353 } 1354 1355 // TODO. Can org.jivesoftware.openfire.muc.spi.MUCRoom.sendRoomHistoryAfterJoin be reused to reduce duplicate code and responsibilities? 1356 Log.trace("(room: '{}'): Sending history to joining node '{}'.", room.getJID(), joiningPeer ); 1357 final MUCRoomHistory roomHistory = room.getRoomHistory(); 1358 final Iterator<Message> history = roomHistory.getMessageHistory(); 1359 while (history.hasNext()) { 1360 // The message stanza in the history is the original stanza (with original addressing), which we can leverage 1361 // to obtain the 'real' jid of the sender. Note that this sender need not be in the room any more, so we can't 1362 // depend on having a MUCRole for it. 
1363 final Message oldMessage = history.next(); 1364 1365 final JID originalAuthorUserAddress = oldMessage.getFrom(); 1366 final JID originalAuthorRoleAddress = new JID( room.getJID().getNode(), room.getJID().getDomain(), originalAuthorUserAddress.getResource() ); 1367 1368 final Message enriched = enrichWithFMUCElement( oldMessage, originalAuthorUserAddress ); 1369 1370 // Correct the addressing of the stanza. 1371 enriched.setFrom( originalAuthorRoleAddress ); 1372 enriched.setTo( joiningPeer ); 1373 1374 XMPPServer.getInstance().getPacketRouter().route( enriched ); 1375 } 1376 } 1377 afterJoinSendSubject( @onnull final JID joiningPeer )1378 private void afterJoinSendSubject( @Nonnull final JID joiningPeer ) 1379 { 1380 if ( !joiningPeer.asBareJID().equals(joiningPeer) ) { 1381 throw new IllegalArgumentException( "Expected argument 'joiningPeer' to be a bare JID, but it was not: " + joiningPeer ); 1382 } 1383 1384 Log.trace("(room: '{}'): Sending subject to joining node '{}'.", room.getJID(), joiningPeer ); 1385 1386 // TODO can org.jivesoftware.openfire.muc.spi.MUCRoom.sendRoomSubjectAfterJoin be re-used? 1387 final MUCRoomHistory roomHistory = room.getRoomHistory(); 1388 Message roomSubject = roomHistory.getChangedSubject(); 1389 if ( roomSubject != null ) { 1390 roomSubject = roomSubject.createCopy(); // OF-2163: Prevent accidental modifications to the original. 1391 } else { 1392 roomSubject = new Message(); 1393 roomSubject.setFrom(this.room.getJID()); // This might break FMUC, as it does not include the nickname of the author of the subject. 
1394 roomSubject.setTo(joiningPeer); 1395 roomSubject.setType(Message.Type.groupchat); 1396 roomSubject.setID(UUID.randomUUID().toString()); 1397 final Element subjectEl = roomSubject.getElement().addElement("subject"); 1398 if ( room.getSubject() != null && !room.getSubject().isEmpty() ) 1399 { 1400 subjectEl.setText(room.getSubject()); 1401 } 1402 } 1403 1404 final JID originalAuthorUserAddress = roomSubject.getFrom(); 1405 final JID originalAuthorRoleAddress = new JID( room.getJID().getNode(), room.getJID().getDomain(), originalAuthorUserAddress.getResource() ); 1406 1407 final Message enriched = enrichWithFMUCElement( roomSubject, originalAuthorUserAddress ); 1408 1409 // Correct the addressing of the stanza. 1410 enriched.setFrom( originalAuthorRoleAddress ); 1411 enriched.setTo( joiningPeer ); 1412 1413 XMPPServer.getInstance().getPacketRouter().route( enriched ); 1414 } 1415 isSubject( @onnull final Packet stanza )1416 public static boolean isSubject( @Nonnull final Packet stanza ) 1417 { 1418 final Element fmuc = stanza.getElement().element(FMUC); 1419 if (fmuc == null) { 1420 throw new IllegalArgumentException( "Provided stanza must have an 'fmuc' child element (but does not)."); 1421 } 1422 1423 final boolean result = stanza instanceof Message && stanza.getElement().element("subject") != null; 1424 Log.trace( "Stanza from '{}' was determined to {} a stanza containing a MUC subject.", stanza.getFrom(), result ? 
"be" : "not be" ); 1425 return result; 1426 } 1427 isFMUCReject( @onnull final Packet stanza )1428 public static boolean isFMUCReject( @Nonnull final Packet stanza ) 1429 { 1430 final Element fmuc = stanza.getElement().element(FMUC); 1431 if (fmuc == null) { 1432 throw new IllegalArgumentException( "Provided stanza must have an 'fmuc' child element (but does not)."); 1433 } 1434 1435 final boolean result = stanza instanceof Presence && fmuc.element("reject") != null; 1436 Log.trace( "Stanza from '{}' was determined to {} a stanza containing a FMUC join reject.", stanza.getFrom(), result ? "be" : "not be" ); 1437 return result; 1438 } 1439 isFMUCJoinRequest( @onnull Packet stanza )1440 public static boolean isFMUCJoinRequest( @Nonnull Packet stanza ) 1441 { 1442 final Element fmuc = stanza.getElement().element(FMUC); 1443 if (fmuc == null) { 1444 throw new IllegalArgumentException( "Provided stanza must have an 'fmuc' child element (but does not)."); 1445 } 1446 1447 final boolean result = 1448 (stanza instanceof Presence) && 1449 stanza.getElement().element( QName.get( "x", "http://jabber.org/protocol/muc") ) != null && 1450 stanza.getElement().element( QName.get( "x", "http://jabber.org/protocol/muc#user") ) != null; 1451 1452 Log.trace( "Stanza from '{}' was determined to {} a stanza containing a FMUC join request.", stanza.getFrom(), result ? 
"be" : "not be" ); 1453 return result; 1454 } 1455 getOutboundJoin()1456 public OutboundJoin getOutboundJoin() { 1457 return outboundJoin; 1458 } 1459 getOutboundJoinProgress()1460 public OutboundJoinProgress getOutboundJoinProgress() { 1461 return outboundJoinProgress; 1462 } 1463 getInboundJoins()1464 public Collection<InboundJoin> getInboundJoins() { 1465 return inboundJoins.values(); 1466 } 1467 abortOutboundJoinProgress()1468 public void abortOutboundJoinProgress() 1469 { 1470 if ( outboundJoinProgress != null ) { 1471 outboundJoinProgress.abort(); 1472 outboundJoinProgress = null; 1473 } 1474 } 1475 1476 abstract static class RemoteFMUCNode implements Serializable 1477 { 1478 private final Logger Log; 1479 1480 /** 1481 * The address of the remote MUC room that is federating with us, in which 'our' MUC room takes the role of 1482 * 'joined FMUC node' while the room that federates with us (who's address is recorded in this field) takes the 1483 * role of 'joining FMUC node'. 1484 */ 1485 private final JID peer; 1486 1487 /** 1488 * The addresses of the occupants of the MUC that are joined through this FMUC node. 1489 */ 1490 protected final Set<JID> occupants = new HashSet<>(); 1491 getPeer()1492 public JID getPeer() { 1493 return peer; 1494 } 1495 RemoteFMUCNode( @onnull final JID peer )1496 RemoteFMUCNode( @Nonnull final JID peer ) { 1497 this.peer = peer.asBareJID(); 1498 Log = LoggerFactory.getLogger( this.getClass().getName() + ".[peer: " + peer + "]" ); 1499 } 1500 wantsStanzasSentBy( @onnull final MUCRole sender )1501 public boolean wantsStanzasSentBy( @Nonnull final MUCRole sender ) { 1502 // Only send data if the sender is not an entity on this remote FMUC node, or the remote FMUC node itself. 
1503 return sender.getReportedFmucAddress() == null || (!occupants.contains( sender.getReportedFmucAddress() ) && !peer.equals( sender.getReportedFmucAddress()) ); 1504 } 1505 addOccupant( @onnull final JID occupant )1506 public boolean addOccupant( @Nonnull final JID occupant ) { 1507 Log.trace( "Adding remote occupant: '{}'", occupant ); 1508 return occupants.add( occupant ); 1509 } 1510 removeOccupant( @onnull final JID occupant )1511 public boolean removeOccupant( @Nonnull final JID occupant ) { 1512 Log.trace( "Removing remote occupant: '{}'", occupant ); 1513 return occupants.remove( occupant ); 1514 } 1515 getOccupants()1516 public Set<JID> getOccupants() { 1517 return occupants; 1518 } 1519 } 1520 1521 public static class InboundJoin extends RemoteFMUCNode 1522 { InboundJoin( @onnull final JID peer )1523 public InboundJoin( @Nonnull final JID peer ) 1524 { 1525 super(peer); 1526 } 1527 } 1528 1529 public static class OutboundJoinConfiguration 1530 { 1531 private final FMUCMode mode; 1532 private final JID peer; 1533 OutboundJoinConfiguration( @onnull final JID peer, @Nonnull final FMUCMode mode )1534 public OutboundJoinConfiguration( @Nonnull final JID peer, @Nonnull final FMUCMode mode ) { 1535 this.mode = mode; 1536 this.peer = peer; 1537 } 1538 getMode()1539 public FMUCMode getMode() 1540 { 1541 return mode; 1542 } 1543 getPeer()1544 public JID getPeer() 1545 { 1546 return peer; 1547 } 1548 1549 @Override equals( final Object o )1550 public boolean equals( final Object o ) 1551 { 1552 if ( this == o ) { return true; } 1553 if ( o == null || getClass() != o.getClass() ) { return false; } 1554 final OutboundJoinConfiguration that = (OutboundJoinConfiguration) o; 1555 return mode == that.mode && 1556 peer.equals(that.peer); 1557 } 1558 1559 @Override hashCode()1560 public int hashCode() 1561 { 1562 return Objects.hash(mode, peer); 1563 } 1564 1565 @Override toString()1566 public String toString() 1567 { 1568 return "OutboundJoinConfiguration{" + 1569 
"peer=" + peer + 1570 ", mode=" + mode + 1571 '}'; 1572 } 1573 } 1574 1575 public static class OutboundJoin extends RemoteFMUCNode 1576 { 1577 private final FMUCMode mode; 1578 1579 /** 1580 * A list of stanzas that need to have been echo'd by a remote FMUC node, before they can be processed locally. 1581 * This collection _does not_ include stanzas needed to set up the initial join. This collection is only used 1582 * for subsequent stanzas that are shared in a setting where echo-ing is required (due to the mode of the 1583 * federation being defined as 'master-slave'). 1584 */ 1585 private final Set<PendingCallback> pendingEcho = new HashSet<>(); 1586 OutboundJoin( @onnull OutboundJoinConfiguration configuration )1587 public OutboundJoin( @Nonnull OutboundJoinConfiguration configuration ) { 1588 super( configuration.getPeer() ); 1589 this.mode = configuration.getMode(); 1590 } 1591 OutboundJoin( @onnull final JID peer, @Nonnull final FMUCMode mode )1592 public OutboundJoin( @Nonnull final JID peer, @Nonnull final FMUCMode mode ) { 1593 super(peer); 1594 this.mode = mode; 1595 } 1596 getMode()1597 public FMUCMode getMode() 1598 { 1599 return mode; 1600 } 1601 getConfiguration()1602 public OutboundJoinConfiguration getConfiguration() { 1603 return new OutboundJoinConfiguration(getPeer(), getMode() ); 1604 } 1605 1606 @Override wantsStanzasSentBy( @onnull MUCRole sender )1607 public boolean wantsStanzasSentBy( @Nonnull MUCRole sender ) { 1608 if ( FMUCMode.MasterSlave == mode ) { 1609 return true; // always wants stanzas - if only because the data needs to be echo'd back. 1610 } 1611 1612 return super.wantsStanzasSentBy(sender); 1613 } 1614 evaluateForCallbackCompletion( @onnull Packet stanza )1615 public synchronized void evaluateForCallbackCompletion( @Nonnull Packet stanza ) 1616 { 1617 Log.trace( "Evaluating stanza for callback completion..." 
); 1618 if ( stanza.getElement().element(FMUC) == null ) { 1619 throw new IllegalArgumentException( "Argument 'stanza' must have an FMUC child element." ); 1620 } 1621 1622 // Ignore if we do not have a callback waiting for this stanza. 1623 final Iterator<PendingCallback> iter = pendingEcho.iterator(); 1624 while (iter.hasNext()) { 1625 final PendingCallback item = iter.next(); 1626 if ( item.isMatch(stanza) ) { 1627 Log.trace( "Invoking callback, as peer '{}' echo'd back stanza: {}", getPeer(), stanza.toXML() ); 1628 item.complete(); 1629 iter.remove(); 1630 } 1631 } 1632 Log.trace( "Finished evaluating stanza for callback completion." ); 1633 } 1634 registerEchoCallback( @onnull final Packet stanza, @Nonnull final CompletableFuture<?> result )1635 public synchronized void registerEchoCallback( @Nonnull final Packet stanza, @Nonnull final CompletableFuture<?> result ) 1636 { 1637 if ( stanza.getElement().element(FMUC) == null ) { 1638 throw new IllegalArgumentException( "Argument 'stanza' must have an FMUC child element." ); 1639 } 1640 1641 Log.trace( "Registering callback to be invoked when peer '{}' echos back stanza {}", getPeer(), stanza.toXML() ); 1642 pendingEcho.add( new PendingCallback( stanza, result ) ); 1643 } 1644 } 1645 1646 public static class OutboundJoinProgress implements Serializable 1647 { 1648 private final Logger Log; 1649 1650 /** 1651 * The address of the remote MUC room with which we are attempting to federate, in which 'our' MUC room takes the role of 1652 * 'joining FMUC node' while the room that federates with us (who's address is recorded in this field) takes the 1653 * role of 'joined FMUC node'. 1654 */ 1655 private final JID peer; 1656 1657 /** 1658 * The future that is awaiting completion of the join operation. 1659 */ 1660 private final CompletableFuture<List<Packet>> callback; 1661 1662 /** 1663 * A list of stanzas that have been sent from the remote room to the local room as part of the 'join' effort. 
1664 * 1665 * This list is expected to contain (presence of) each participant, a message history, and a subject stanza. 1666 */ 1667 private final ArrayList<Packet> responses; 1668 1669 /** 1670 * A list of stanzas to be sent to the remote room as soon as federation has been established. 1671 * 1672 * This list is expected to contain stanzas that were shared with the joined room between the instant that a 1673 * federation attempt was started, and was completed. 1674 */ 1675 private final ArrayList<QueuedStanza> queue; 1676 1677 /** 1678 * The state of the federation join. Null means that the request is pending completion. True means a successful 1679 * join was achieved, while false means that the join request failed or was aborted. 1680 */ 1681 private Boolean joinResult; 1682 OutboundJoinProgress( @onnull final JID peer, @Nonnull final CompletableFuture<List<Packet>> callback )1683 public OutboundJoinProgress( @Nonnull final JID peer, @Nonnull final CompletableFuture<List<Packet>> callback ) 1684 { 1685 Log = LoggerFactory.getLogger( this.getClass().getName() + ".[peer: " + peer + "]" ); 1686 this.peer = peer.asBareJID(); 1687 this.callback = callback; 1688 this.responses = new ArrayList<>(); 1689 this.queue = new ArrayList<>(); 1690 } 1691 getPeer()1692 public JID getPeer() { 1693 return peer; 1694 } 1695 getCallback()1696 public synchronized CompletableFuture<List<Packet>> getCallback() { 1697 return callback; 1698 } 1699 getResponses()1700 public synchronized ArrayList<Packet> getResponses() { 1701 return responses; 1702 } 1703 addResponse( @onnull final Packet stanza )1704 synchronized void addResponse( @Nonnull final Packet stanza ) { 1705 this.responses.add( stanza ); 1706 if (joinResult == null) { 1707 if ( isSubject( stanza ) ) { 1708 joinResult = true; 1709 } 1710 if ( isFMUCReject( stanza ) ) { 1711 joinResult = false; 1712 } 1713 } 1714 } 1715 1716 /** 1717 * Adds a stanza to be sent to the remote, joined MUC as soon as federation has been 
established. 1718 * 1719 * This method is intended to be used only when federation is in progress of being established. 1720 * 1721 * @param stanza The stanza to share 1722 * @param sender The author of the stanza 1723 * @return A future, indicating if local distribution of the stanza needs to wait. 1724 */ addToQueue( @onnull final Packet stanza, @Nonnull final MUCRole sender )1725 public synchronized CompletableFuture<?> addToQueue( @Nonnull final Packet stanza, @Nonnull final MUCRole sender ) { 1726 if( isJoinComplete() ) { 1727 throw new IllegalStateException( "Queueing a stanza is not expected to occur when federation has already been established." ); 1728 } 1729 1730 final CompletableFuture<?> result = new CompletableFuture<>(); 1731 if ( callback.isDone() ) { 1732 result.complete(null); 1733 } 1734 1735 Log.trace( "Adding stanza (type {}) from '{}' to queue, to be sent to peer as soon as federation has been established.", stanza.getClass().getSimpleName(), sender.getUserAddress() ); 1736 queue.add( new QueuedStanza( stanza, sender, result ) ); 1737 1738 return result; 1739 } 1740 1741 /** 1742 * Retrieve and clean the list of stanzas that have been queued after federation was initiated, but before it 1743 * was finished. 1744 * 1745 * @return A list of queued stanzas (possibly empty). 
1746 */ purgeQueue()1747 public synchronized List<QueuedStanza> purgeQueue() { 1748 Log.trace( "Purging queue (size: {}) of stanzas to be sent to peer as soon as federation has been established.", queue.size() ); 1749 final List<QueuedStanza> result = new ArrayList<>( queue ); 1750 queue.clear(); 1751 return result; 1752 } 1753 isJoinComplete()1754 public synchronized boolean isJoinComplete() { 1755 return joinResult != null; 1756 } 1757 isRejected()1758 public synchronized boolean isRejected() { 1759 return joinResult != null && !joinResult; 1760 } 1761 getRejectionMessage()1762 public synchronized String getRejectionMessage() 1763 { 1764 if (!isRejected()) { 1765 throw new IllegalStateException( "Cannot get rejection message, as rejection did not occur." ); 1766 } 1767 1768 final Packet stanza = this.responses.get( this.responses.size() -1 ); 1769 final Element fmuc = stanza.getElement().element(FMUC); 1770 return fmuc.elementText("reject"); 1771 } 1772 isSuccessful()1773 public synchronized boolean isSuccessful() { 1774 return joinResult != null && joinResult; 1775 } 1776 abort()1777 synchronized void abort() 1778 { 1779 Log.trace( "Aborting federation attempt." ); 1780 1781 joinResult = false; 1782 1783 // Messages that are queued to be sent after federation has been established might have threads blocking on that delivery. That now will no longer happen. Make sure that we unblock all threads waiting for such an echo. 1784 if ( !queue.isEmpty() ) 1785 { 1786 Log.trace("Completing {} callbacks for queued stanzas might be waiting for federation to be established.", queue.size() ); 1787 for ( final QueuedStanza pendingCallback : queue ) 1788 { 1789 try { 1790 pendingCallback.future.complete( null ); // TODO maybe completeExceptionally? 
1791 } catch ( Exception e ) { 1792 Log.warn("An exception occurred while completing callback for a queued message.", e); 1793 } 1794 } 1795 } 1796 callback.completeExceptionally( new IllegalStateException( "Federation with peer " + peer + " has been aborted.") ); 1797 } 1798 1799 static class QueuedStanza { 1800 final Packet stanza; 1801 final MUCRole sender; 1802 final CompletableFuture<?> future; 1803 QueuedStanza(final Packet stanza, final MUCRole sender, final CompletableFuture<?> future )1804 QueuedStanza(final Packet stanza, final MUCRole sender, final CompletableFuture<?> future ) { 1805 this.stanza = stanza; 1806 this.sender = sender; 1807 this.future = future; 1808 } 1809 } 1810 } 1811 1812 /** 1813 * Represents a callback waiting for a stanza to be echo'd back from a remote FMUC node. 1814 */ 1815 static class PendingCallback { 1816 1817 final CompletableFuture<?> callback; 1818 final Class<? extends Packet> type; 1819 final JID remoteFMUCNode; 1820 final List<Element> elements; 1821 PendingCallback( @onnull S original, @Nonnull CompletableFuture<?> callback )1822 public <S extends Packet> PendingCallback( @Nonnull S original, @Nonnull CompletableFuture<?> callback ) { 1823 if (!hasFMUCElement(original)) { 1824 throw new IllegalArgumentException( "Provided stanza must be a stanza that is sent to a remote FMUC node, but was not (the FMUC child element is missing): " + original ); 1825 } 1826 this.type = getType( original ); 1827 this.remoteFMUCNode = original.getTo().asBareJID(); 1828 this.elements = original.getElement().elements(); 1829 this.callback = callback; 1830 } 1831 complete()1832 private void complete() { 1833 callback.complete( null ); 1834 } 1835 isMatch( @onnull S stanza )1836 public <S extends Packet> boolean isMatch( @Nonnull S stanza ) 1837 { 1838 if (!hasFMUCElement(stanza)) { 1839 throw new IllegalArgumentException( "Provided stanza must be a stanza that was sent by remote FMUC node, but was not (the FMUC child element is missing): " + 
stanza ); 1840 } 1841 1842 if (!stanza.getClass().equals(type)) { 1843 return false; 1844 } 1845 1846 if (!stanza.getFrom().asBareJID().equals( remoteFMUCNode )) { 1847 return false; 1848 } 1849 1850 // All child elements of the echo'd stanza must equal the original. 1851 return elements.equals( stanza.getElement().elements() ); 1852 } 1853 hasFMUCElement( @onnull Packet stanza )1854 protected static boolean hasFMUCElement( @Nonnull Packet stanza ) { 1855 return stanza.getElement().element(FMUC) != null; 1856 } 1857 getType( @onnull Packet stanza )1858 protected static Class<? extends Packet> getType( @Nonnull Packet stanza ) { 1859 return stanza.getClass(); 1860 } 1861 } 1862 } 1863