1 /*
2  * Copyright (C) 2020-2021 Ignite Realtime Foundation. All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.jivesoftware.openfire.muc.spi;
17 
18 import org.dom4j.Element;
19 import org.dom4j.QName;
20 import org.jivesoftware.openfire.XMPPServer;
21 import org.jivesoftware.openfire.muc.*;
22 import org.jivesoftware.util.SystemProperty;
23 import org.jivesoftware.util.XMPPDateTimeFormat;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26 import org.xmpp.packet.*;
27 
28 import javax.annotation.Nonnull;
29 import javax.annotation.Nullable;
30 import java.io.Serializable;
31 import java.text.ParseException;
32 import java.util.*;
33 import java.util.concurrent.*;
34 import java.util.concurrent.locks.Lock;
35 
36 // TODO: monitor health of s2s connection, somehow?
37 public class FMUCHandler
38 {
39     private static final Logger Log = LoggerFactory.getLogger( FMUCHandler.class );
40 
41     public static final SystemProperty<Boolean> FMUC_ENABLED = SystemProperty.Builder.ofType(Boolean.class)
42         .setKey("xmpp.muc.room.fmuc.enabled")
43         .setDynamic(true)
44         .setDefaultValue(false)
45         .addListener( isEnabled -> {
46             XMPPServer.getInstance().getMultiUserChatManager().getMultiUserChatServices().forEach(
47                 service -> service.getAllRoomNames().forEach(
48                     name -> {
49                         final Lock lock = service.getChatRoomLock(name);
50                         lock.lock();
51                         try {
52                             final MUCRoom room = service.getChatRoom(name);
53                             room.getFmucHandler().applyConfigurationChanges();
54 
55                             // Ensure that other cluster nodes see the changes applied above.
56                             service.syncChatRoom(room);
57                         } finally {
58                             lock.unlock();
59                         }
60                     }
61                 )
62             );
63         })
64         .build();
65 
    /**
     * Qualified name of the element that denotes FMUC functionality, as specified by XEP-0289.
     */
    public static final QName FMUC = QName.get("fmuc", "http://isode.com/protocol/fmuc");

    /**
     * The room for which this handler operates.
     */
    private final MUCRoom room;

    /**
     * Indicates if FMUC functionality has been started. Updated by {@link #applyConfigurationChanges()} and reset by
     * {@link #stop()}.
     */
    private boolean isStarted;

    /**
     * State that indicates if the local room has been configured to actively start joining a remote room. This is
     * null when no outbound join is configured.
     */
    private OutboundJoinConfiguration outboundJoinConfiguration;

    /**
     * Tracks state while the outbound join is being set up. This is null when no outbound join attempt is in progress.
     */
    private OutboundJoinProgress outboundJoinProgress;

    /**
     * The federation with the remote room that the local room has connected to (in which the remote room has taken the
     * role of 'joined FMUC node' while the local room has taken the role of the 'joining FMUC node'). This is null when
     * no such federation is established.
     */
    private OutboundJoin outboundJoin;

    /**
     * The federations with remote rooms that have connected to the local room (in which the remote rooms have taken the
     * role of 'joining FMUC nodes' while the local room has taken the role of the 'joined FMUC node'). Entries are keyed
     * by a JID (presumably the bare address of the remote room - TODO confirm where entries are added).
     */
    private Map<JID, InboundJoin> inboundJoins = new HashMap<>();
102 
FMUCHandler(@onnull MUCRoom chatroom)103     public FMUCHandler(@Nonnull MUCRoom chatroom) {
104         this.room = chatroom;
105     }
106 
107     /**
108      * Starts federation, which will cause a federation attempt with the outbound ('joined') node, if one is configured,
109      * and there currently are occupants in the room.
110      */
startOutbound()111     public synchronized void startOutbound()
112     {
113         Log.debug( "(room: '{}'): FMUC outbound federation is being started...", room.getJID() );
114 
115         final Collection<MUCRole> occupants = room.getOccupants();
116         if ( occupants.isEmpty() ) {
117             Log.trace("(room: '{}'): No occupants in the room. No need to initiate an FMUC join.", room.getJID());
118         } else {
119             Log.trace("(room: '{}'): {} occupant(s) in the room. Initiating an FMUC join for each of them.", room.getJID(), occupants.size());
120             for ( final MUCRole occupant : occupants ) {
121                 try {
122                     Log.trace("(room: '{}'): Making occupant '{}' join the FMUC room.", room.getJID(), occupant.getUserAddress());
123                     join( occupant, false, true );
124                 } catch ( Exception e ) {
125                     Log.trace("(room: '{}'): An exception occurred while making occupant '{}' join the FMUC room.", room.getJID(), occupant.getUserAddress(), e);
126                 }
127             }
128         }
129         Log.debug( "(room: '{}'): Finished starting FMUC outbound federation.", room.getJID() );
130     }
131 
132     /**
133      * Stops federation, which will cause any joined and joining nodes to be disconnected.
134      */
stop()135     public synchronized void stop()
136     {
137         Log.debug( "(room: '{}'): FMUC federation is being stopped...", room.getJID() );
138 
139         // Keep track of all occupants that are becoming unavailable due to them having joined on remote nodes.
140         final Set<JID> removedRemoteOccupants = new HashSet<>();
141 
142         final Set<JID> removedOccupantsInbound = doStopInbound();
143         final Set<JID> removedOccupantsOutbound = doStopOutbound();
144         removedRemoteOccupants.addAll( removedOccupantsInbound );
145         removedRemoteOccupants.addAll( removedOccupantsOutbound );
146 
147         Log.trace( "(room: '{}'): Done disconnecting inbound and outbound nodes from the node set. Now removing all their ({}) occupants from the room.", room.getJID(), removedRemoteOccupants.size() );
148         makeRemoteOccupantLeaveRoom( removedRemoteOccupants );
149 
150         isStarted = false;
151         Log.debug( "(room: '{}'): Finished stopping FMUC federation.", room.getJID() );
152     }
153 
154     /**
155      * Stops inbound federation, which will cause existing federation with all of the inbound ('joining') nodes, if any
156      * are established, to be teared down.
157      */
stopInbound()158     public synchronized void stopInbound() {
159         final Set<JID> removedOccupants = doStopInbound();
160 
161         Log.trace( "(room: '{}'): Removing all ({}) occupants from the room for remote inbound node(s) that we just disconnected from.", room.getJID(), removedOccupants.size() );
162         makeRemoteOccupantLeaveRoom( removedOccupants );
163 
164         Log.debug( "(room: '{}'): Finished stopping inbound FMUC federation.", room.getJID() );
165     }
166 
167     /**
168      * Stops inbound federation, which will cause existing federation with one specific inbound ('joining') nodes to be
169      * teared down.
170      *
171      * @param peer the address of the remote node (must be a bare JID).
172      */
stopInbound( @onnull JID peer )173     public synchronized void stopInbound( @Nonnull JID peer )
174     {
175         if ( !peer.asBareJID().equals(peer) ) {
176             throw new IllegalArgumentException( "Expected argument 'peer' to be a bare JID, but it was not: " + peer );
177         }
178 
179         final Set<JID> removedOccupants = doStopInbound( peer );
180 
181         Log.trace( "(room: '{}'): Removing all ({}) occupants from the room for remote inbound node '{}' that we just disconnected from.", room.getJID(), removedOccupants.size(), peer );
182         makeRemoteOccupantLeaveRoom( removedOccupants );
183 
184         Log.debug( "(room: '{}'): Finished stopping inbound FMUC federation.", room.getJID() );
185     }
186 
187     /**
188      * The workhorse implementation of stopping inbound federation, that returns a list of JIDs representing the
189      * occupants from the remote, joining nodes that are no longer in the room as a result of stopping the federation. For
190      * these occupants, a presence stanza should be shared with the remainder of the occupants. It is desirable to send
191      * these stanzas only after all any other nodes that are to be disconnected have been disconnected (to prevent
192      * sharing updates with other remote nodes that we might also be disconnecting from. This method does
193      * therefor not send these stanzas. Instead, it returns the addresses that should be send stanzas. This allows
194      * callers to aggregate addresses, and send the the stanzas in one iteration.
195      *
196      * The implementation in this method will inform the remote, joining node that they left the local, joined node by
197      * sending it a 'left' message.
198      *
199      * @return The JIDs of occupants on the remote, joining node that are no longer in the room.
200      */
doStopInbound()201     private synchronized Set<JID> doStopInbound() {
202         return doStopInbound( null );
203     }
204 
205     /**
206      * Identical to {@link #doStopInbound()} but used to disconnect just one node, instead of all of them.
207      *
208      * When null is passed as an argument, all inbound nodes are disconnected (equivalent to a call to {@link #doStopInbound()}).
209      *
210      * @param peer the address of the remote node (must be a bare JID), or null to remove all nodes.
211      * @return The JIDs of occupants on the remote, joining node that are no longer in the room.
212      * @see #doStopInbound()
213      */
doStopInbound( @ullable JID peer )214     private synchronized Set<JID> doStopInbound( @Nullable JID peer ) {
215         if ( peer != null && !peer.asBareJID().equals(peer) ) {
216             throw new IllegalArgumentException( "Expected argument 'peer' to be null or a bare JID, but it was not: " + peer );
217         }
218 
219         final Set<JID> result = new HashSet<>();
220         if ( inboundJoins.isEmpty() ) {
221             Log.trace( "(room: '{}'): No remote MUC joining us. No need to inform joining nodes that they have now left.", room.getJID() );
222         } else {
223             final Iterator<Map.Entry<JID, InboundJoin>> iterator = inboundJoins.entrySet().iterator();
224             while ( iterator.hasNext() )
225             {
226                 final InboundJoin inboundJoin = iterator.next().getValue();
227                 if ( peer != null && !inboundJoin.getPeer().equals( peer )) {
228                     // This is not the peer you're looking for.
229                     continue;
230                 }
231                 iterator.remove(); // Remove inboundJoin so that it's no longer send stanzas, and incoming stanzas are being treated as if from an unconnected entity.
232 
233                 result.addAll( inboundJoin.occupants );
234 
235                 try
236                 {
237                     Log.trace("(room: '{}'): Informing joining node '{}' that it is leaving the FMUC node set.", room.getJID(), inboundJoin.getPeer());
238                     final Presence left = new Presence();
239                     left.setFrom(room.getJID());
240                     left.setTo(inboundJoin.getPeer());
241                     left.getElement().addElement(FMUC).addElement("left");
242                     XMPPServer.getInstance().getPacketRouter().route(left);
243                 }
244                 catch ( Exception e )
245                 {
246                     Log.warn("(room: '{}'): An exception occurred while informing joining node '{}' that it is leaving the FMUC node set.", room.getJID(), inboundJoin.getPeer(), e);
247                 }
248             }
249         }
250         return result;
251     }
252 
253     /**
254      * Stops outbound federation, which will cause existing federation with the outbound ('joined') node, if one is
255      * established, to be teared down.
256      */
stopOutbound()257     public synchronized void stopOutbound()
258     {
259         final Set<JID> removedOccupants = doStopOutbound();
260 
261         Log.trace("(room: '{}'): Removing all ({}) occupants from the room for remote outbound node that we just disconnected from.", room.getJID(), removedOccupants.size());
262         makeRemoteOccupantLeaveRoom( removedOccupants );
263 
264         Log.debug( "(room: '{}'): Finished stopping outbound FMUC federation.", room.getJID() );
265     }
266 
267     /**
268      * The workhorse implementation of stopping outbound federation, that returns a list of JIDs representing the
269      * occupants from the remote, joined node that are no longer in the room as a result of stopping the federation. For
270      * these occupants, a presence stanza should be shared with the remainder of the occupants. It is desirable to send
271      * these stanzas only after all any other nodes that are to be disconnected have been disconnected (to prevent
272      * sharing updates with other remote nodes that we might also be disconnecting from. This method does
273      * therefor not send these stanzas. Instead, it returns the addresses that should be send stanzas. This allows
274      * callers to aggregate addresses, and send the the stanzas in one iteration.
275      *
276      * The implementation in this method will inform the remote, joined node that the local, joining node has left, by
277      * sending it presence stanzas for all occupants that the local, joining node has contributed to the FMUC set. After
278      * the remote, joined, node has received the last stanza, it should conclude that the local, joining node has left
279      * (it should respond with a 'left' message, although this implementation does not depend on that).
280      *
281      * @return The JIDs of occupants on the remote, joined node that are no longer in the room.
282      */
doStopOutbound()283     private synchronized Set<JID> doStopOutbound()
284     {
285         Log.debug("(room: '{}'): Stopping federation with remote node that we joined (if any).", room.getJID());
286         final Set<JID> result = new HashSet<>();
287 
288         if ( outboundJoinProgress == null ) {
289             Log.trace("(room: '{}'): We are not in progress of joining a remote node. No need to abort such an effort.", room.getJID());
290         } else {
291             Log.trace("(room: '{}'): Aborting the ongoing effort of joining remote node '{}'.", room.getJID(), outboundJoinProgress.getPeer());
292             abortOutboundJoinProgress();
293         }
294 
295         if ( outboundJoin == null) {
296             Log.trace("(room: '{}'): We did not join a remote node. No need to inform one that we have left.", room.getJID());
297         } else {
298             Log.trace("(room: '{}'): Informing joined node '{}' that we are leaving the FMUC node set.", room.getJID(), outboundJoin.getPeer());
299 
300             // Remove outboundJoin so that it's no longer send stanzas, and incoming stanzas are being treated as if from an unconnected entity.
301             // We'll need to store some state to be able to process things later though.
302             final JID peer = outboundJoin.getPeer();
303             final Set<PendingCallback> pendingEcho = outboundJoin.pendingEcho;
304             final Set<JID> theirOccupants = outboundJoin.occupants;
305 
306             outboundJoin = null;
307 
308             result.addAll( theirOccupants );
309 
310             // If we're waiting for an echo of a stanza that we've sent to this MUC, that now will no longer arrive. Make sure that we unblock all threads waiting for such an echo.
311             if ( !pendingEcho.isEmpty() )
312             {
313                 Log.trace("(room: '{}'): Completing {} callbacks that were waiting for an echo from peer '{}' that is being disconnected from.", room.getJID(), pendingEcho.size(), peer );
314                 for ( final PendingCallback pendingCallback : pendingEcho )
315                 {
316                     try {
317                         pendingCallback.complete(); // TODO maybe completeExceptionally?
318                     } catch ( Exception e ) {
319                         Log.warn("(room: '{}'): An exception occurred while completing callback pending echo from peer '{}' (that we're disconnecting from).", room.getJID(), peer, e);
320                     }
321                 }
322             }
323 
324             // Find all the occupants that the local node contributed to the FMUC set (those are the occupants that are
325             // not joined through the remote, joined node). Note that these can include occupants that are on other nodes!
326             final Set<MUCRole> occupantsToLeave = new HashSet<>( room.getOccupants() );
327             occupantsToLeave.removeIf( mucRole -> mucRole.getReportedFmucAddress() != null && theirOccupants.contains( mucRole.getReportedFmucAddress() ));
328             Log.trace("(room: '{}'): Identified {} occupants that the local node contributed to the FMUC set.", room.getJID(), occupantsToLeave.size());
329 
330             // Inform the remote, joined node that these are now all gone.
331             for ( final MUCRole occupantToLeave : occupantsToLeave ) {
332                 try
333                 {
334                     Log.trace("(room: '{}'): Informing joined node '{}' that occupant '{}' left the MUC.", room.getJID(), peer, occupantToLeave.getUserAddress());
335 
336                     final Presence leave = occupantToLeave.getPresence().createCopy();
337                     leave.setType(Presence.Type.unavailable);
338                     leave.setTo(new JID(peer.getNode(), peer.getDomain(), occupantToLeave.getNickname()));
339                     leave.setFrom(occupantToLeave.getRoleAddress());
340 
341                     // Change (or add) presence information about roles and affiliations
342                     Element childElement = leave.getChildElement("x", "http://jabber.org/protocol/muc#user");
343                     if ( childElement == null )
344                     {
345                         childElement = leave.addChildElement("x", "http://jabber.org/protocol/muc#user");
346                     }
347                     Element item = childElement.element("item");
348                     if ( item == null )
349                     {
350                         item = childElement.addElement("item");
351                     }
352                     item.addAttribute("role", "none");
353 
354                     final Presence enriched = enrichWithFMUCElement(leave, occupantToLeave.getReportedFmucAddress() != null ? occupantToLeave.getReportedFmucAddress() : occupantToLeave.getUserAddress());
355 
356                     XMPPServer.getInstance().getPacketRouter().route(enriched);
357                 } catch ( Exception e ) {
358                     Log.warn("(room: '{}'): An exception occurred while informing joined node '{}' that occupant '{}' left the MUC.", room.getJID(), peer, occupantToLeave.getUserAddress(), e);
359                 }
360             }
361         }
362 
363         Log.trace("(room: '{}'): Finished stopping federation with remote node that we joined (if any).", room.getJID());
364         return result;
365     }
366 
367     /**
368      * Reads configuration from the room instance that is being services by this handler, and applies relevant changes.
369      */
applyConfigurationChanges()370     public synchronized void applyConfigurationChanges()
371     {
372         if ( isStarted != (room.isFmucEnabled() && FMUC_ENABLED.getValue()) ) {
373             Log.debug( "(room: '{}'): Changing availability of FMUC functionality to {}.", room.getJID(), (room.isFmucEnabled() && FMUC_ENABLED.getValue()) );
374             if ((room.isFmucEnabled() && FMUC_ENABLED.getValue())) {
375                 startOutbound();
376             } else {
377                 stop();
378                 return;
379             }
380         }
381 
382         final OutboundJoinConfiguration desiredConfig;
383         if ( room.getFmucOutboundNode() != null ) {
384             desiredConfig = new OutboundJoinConfiguration( room.getFmucOutboundNode(), room.getFmucOutboundMode() );
385         } else {
386             desiredConfig = null;
387         }
388         Log.debug("(room: '{}'): Changing outbound join configuration. Existing: {}, New: {}", room.getJID(), this.outboundJoinConfiguration, desiredConfig );
389 
390         if ( this.outboundJoinProgress != null ) {
391             if (desiredConfig == null) {
392                 Log.trace( "(room: '{}'): Had, but now no longer has, outbound join configuration. Aborting ongoing federation attempt...", room.getJID() );
393                 abortOutboundJoinProgress();
394             } else if ( this.outboundJoinProgress.getPeer().equals( desiredConfig.getPeer() ) ) {
395                 Log.trace( "(room: '{}'): New configuration matches peer that ongoing federation attempt is made with. Allowing attempt to continue.", room.getJID() );
396             } else {
397                 Log.trace( "(room: '{}'): New configuration targets a different peer that ongoing federation attempt is made with. Aborting attempt.", room.getJID() );
398                 abortOutboundJoinProgress();
399             }
400         }
401 
402         if ( this.outboundJoinConfiguration == null && desiredConfig != null ) {
403             Log.trace( "(room: '{}'): Did not, but now has, outbound join configuration. Starting federation...", room.getJID() );
404             this.outboundJoinConfiguration = desiredConfig;
405             startOutbound();
406         } else if ( this.outboundJoinConfiguration != null && desiredConfig == null ) {
407             Log.trace( "(room: '{}'): Had, but now no longer has, outbound join configuration. Stopping federation...", room.getJID() );
408             this.outboundJoinConfiguration = desiredConfig;
409             stopOutbound();
410         } else if ( this.outboundJoinConfiguration != null && desiredConfig != null ) {
411             if ( outboundJoin == null ) {
412                 if ( this.outboundJoinConfiguration.equals(desiredConfig ) ) {
413                     // no change
414                 } else {
415                     Log.trace( "(room: '{}'): Applying new configuration.", room.getJID() );
416                     this.outboundJoinConfiguration = desiredConfig;
417                     startOutbound();
418                 }
419             } else {
420                 if ( outboundJoin.getConfiguration().equals( desiredConfig ) ) {
421                     Log.trace( "(room: '{}'): New configuration matches configuration of established federation. Not applying any change.", room.getJID() );
422                 } else {
423                     Log.trace( "(room: '{}'): Already had outbound join configuration, now got a different config. Restarting federation...", room.getJID() );
424                     stopOutbound();
425                     this.outboundJoinConfiguration = desiredConfig;
426                     startOutbound();
427                 }
428             }
429         }
430         isStarted = true;
431     }
432 
433     /**
434      * Propagates a stanza to the FMUC set, if FMUC is active for this room.
435      *
436      * Note that when a master-slave mode is active, we need to wait for an echo back, before the message can be
437      * broadcasted locally. This method will return a CompletableFuture object that is completed as soon as processing
438      * can continue. This doesn't necessarily mean that processing/propagating has been completed (eg: when the FMUC
439      * is configured to use master-master mode, a completed Future instance will be returned.
440      *
441      * When FMUC is not active, this method will return a completed Future instance.
442      *
443      * @param stanza the stanza to be propagated through FMUC.
444      * @param sender the role of the sender that is the original author of the stanza.
445      * @return A future object that completes when the stanza can be propagated locally.
446      */
propagate( @onnull Packet stanza, @Nonnull MUCRole sender )447     public synchronized CompletableFuture<?> propagate( @Nonnull Packet stanza, @Nonnull MUCRole sender )
448     {
449         if ( !(room.isFmucEnabled() && FMUC_ENABLED.getValue()) ) {
450             Log.debug( "(room: '{}'): FMUC disabled, skipping FMUC propagation.", room.getJID() );
451             return CompletableFuture.completedFuture(null);
452         }
453 
454         Log.debug( "(room: '{}'): A stanza (type: {}, from: {}) is to be propagated in the FMUC node set.", room.getJID(), stanza.getClass().getSimpleName(), stanza.getFrom() );
455 
456         /* TODO this implementation currently is blocking, and synchronous: inbound propagation only occurs after outbound
457                 propagation has been started. Is this needed? What should happen if outbound propagation fails? When the
458                 mode used is master/slave, then it is reasonable to assume that a failed outbound propagation makes the
459                 message propagation fail completely (and thus should not be propagated to the inbound nodes or locally?
460         */
461         // propagate to joined FMUC node (conditionally blocks, depending on FMUC mode that's in effect).
462         final CompletableFuture<?> propagateToOutbound = propagateOutbound( stanza, sender );
463 
464         // propagate to all joining FMUC nodes (need never block).
465         final CompletableFuture<?> propagateToInbound = propagateInbound( stanza, sender );
466 
467         // Return a Future that completes when all of the Futures constructed above complete.
468         return CompletableFuture.allOf( propagateToOutbound, propagateToInbound );
469     }
470 
471     /**
472      * Makes a user in our XMPP domain join the FMUC room.
473      *
474      * The join event is propagated to all nodes in the FMUC set that the room is part of.
475      *
476      * When 'outbound' federation is desired, but has not yet been established (when this is the first user to join the
477      * room), federation is initiated.
478      *
479      * Depending on the configuration and state of the FMUC set, the join operation is considered 'blocking'. In case of
480      * a FMUC based on master-slave mode, for example, the operation cannot continue until the remote FMUC node has
481      * echo'd back the join. To accommodate this behavior, this method will return a Future instance. The Future will
482      * immediately be marked as 'done' when the operation need not have 'blocking' behavior. Note that in such cases,
483      * the Future being 'done' does explicitly <em>not</em> indicate that the remote FMUC node has received, processed
484      * and/or accepted the event.
485      *
486      * If the local room is not configured to federate with another room, an invocation of this method will do nothing.
487      *
488      * @param mucRole The role in which the user is joining the room.
489      * @return A future object that completes when the stanza can be propagated locally.
490      */
join( @onnull MUCRole mucRole )491     public synchronized Future<?> join( @Nonnull MUCRole mucRole )
492     {
493         return join( mucRole, true, true );
494     }
495 
join(@onnull MUCRole mucRole, final boolean includeInbound, final boolean includeOutbound )496     protected synchronized Future<?> join(@Nonnull MUCRole mucRole, final boolean includeInbound, final boolean includeOutbound )
497     {
498         if ( !(room.isFmucEnabled() && FMUC_ENABLED.getValue()) ) {
499             Log.debug( "(room: '{}'): FMUC disabled, skipping FMUC join.", room.getJID() );
500             return CompletableFuture.completedFuture(null);
501         }
502 
503         Log.debug( "(room: '{}'): user '{}' (as '{}') attempts to join.", room.getJID(), mucRole.getUserAddress(), mucRole.getRoleAddress() );
504 
505         final CompletableFuture<?> propagateToOutbound;
506         if ( !includeOutbound ) {
507             Log.trace( "(room: '{}'): skip propagating to outbound, as instructed.", room.getJID() );
508             propagateToOutbound = CompletableFuture.completedFuture(null);
509         } else {
510             // Do we need to initiate a new outbound federation (are we configured to have an outbound federation, but has one not been started yet?
511             if ( outboundJoinConfiguration != null) {
512                 if ( outboundJoin == null ) {
513                     if ( outboundJoinProgress == null ) {
514                         Log.trace("(room: '{}'): FMUC configuration contains configuration for a remote MUC that needs to be joined: {}", room.getJID(), outboundJoinConfiguration.getPeer() );
515                         // When a new federation is established, there's no need to explicitly propagate the join too - that's implicitly done as part of the initialization of the new federation.
516                         propagateToOutbound = initiateFederationOutbound( mucRole );
517                     } else {
518                         Log.debug("(room: '{}'): Received a FMUC 'join' request for a remote MUC that we're already in process of joining: {}", room.getJID(), outboundJoinConfiguration.getPeer() );
519                         return outboundJoinProgress.addToQueue( generateJoinStanza( mucRole ), mucRole ); // queue a new join stanza to be sent after the ongoing join completes.
520                     }
521                 }
522                 else
523                 {
524                     // TODO Doesn't this imply some kind of problem - why would we be joining a MUC that we've already joined?
525                     Log.warn("(room: '{}'): FMUC configuration contains configuration for a remote MUC: {}. Federation with this MUC has already been established.", room.getJID(), outboundJoin.getPeer() );
526                     // propagate to existing the existing joined FMUC node (be blocking if master/slave mode!)
527                     propagateToOutbound = propagateOutbound( generateJoinStanza( mucRole ), mucRole );
528                 }
529             } else {
530                 // Nothing to do!
531                 Log.trace( "(room: '{}'): FMUC configuration does not contain a remote MUC that needs to be joined.", room.getJID() );
532                 propagateToOutbound = CompletableFuture.completedFuture(null);
533             }
534         }
535 
536         /* TODO this implementation currently is blocking, and synchronous: inbound propagation only occurs after outbound
537                 propagation has been started. Is this needed? What should happen if outbound propagation fails? When the
538                 mode used is master/slave, then it is reasonable to assume that a failed outbound join makes the join fail
539                 (and thus should not be propagated to the inbound nodes. */
540 
541         // propagate to all joining FMUC nodes (need never block).
542         final CompletableFuture<?> propagateToInbound;
543         if ( !includeInbound ) {
544             Log.trace( "(room: '{}'): skip propagating to inbound, as instructed.", room.getJID() );
545             propagateToInbound = CompletableFuture.completedFuture(null);
546         } else {
547             propagateToInbound = propagateInbound( generateJoinStanza( mucRole ), mucRole );
548         }
549 
550         // Return a Future that completes when all of the Futures constructed above complete.
551         return CompletableFuture.allOf( propagateToOutbound, propagateToInbound );
552     }
553 
554     /**
555      * Attempt to establish a federation with a remote MUC room. In this relation 'our' room will take the role of
556      * 'joining FMUC node'.
557      *
558      * @param mucRole Occupant that joined the room, triggering the federation to be initiated.
559      * @return A future object that completes when the join can be propagated locally.
560      */
initiateFederationOutbound( @onnull MUCRole mucRole )561     private CompletableFuture<?> initiateFederationOutbound( @Nonnull MUCRole mucRole )
562     {
563         Log.debug("(room: '{}'): Attempting to establish federation by joining '{}', triggered by user '{}' (as '{}').", room.getJID(), outboundJoinConfiguration.getPeer(), mucRole.getUserAddress(), mucRole.getRoleAddress() );
564 
565         final Presence joinStanza = enrichWithFMUCElement( generateJoinStanza( mucRole ), mucRole );
566         joinStanza.setFrom( new JID(room.getName(), room.getMUCService().getServiceDomain(), mucRole.getNickname() ) );
567         joinStanza.setTo( new JID(outboundJoinConfiguration.getPeer().getNode(), outboundJoinConfiguration.getPeer().getDomain(), mucRole.getNickname() ) );
568 
569         Log.trace("(room: '{}'): Registering a callback to be used when the federation request to '{}' has completed.", room.getJID(), outboundJoinConfiguration.getPeer() );
570         final CompletableFuture<List<Packet>> result = new CompletableFuture<>();
571         final boolean mustBlock = outboundJoinConfiguration.getMode() == FMUCMode.MasterSlave;
572         if ( !mustBlock ) {
573             Log.trace("(room: '{}'): No need to wait for federation to complete before allowing the local user to join the room.", room.getJID() );
574             result.complete( null );
575         } else {
576             Log.trace("(room: '{}'): Federation needs to have been completed before allowing the local user to join the room.", room.getJID() );
577         }
578 
579         outboundJoinProgress = new OutboundJoinProgress(outboundJoinConfiguration.getPeer(), result );
580 
581         Log.trace( "(room: '{}'): Sending FMUC join request: {}", room.getJID(), joinStanza.toXML() );
582         XMPPServer.getInstance().getPacketRouter().route(joinStanza);
583 
584         return result;
585     }
586 
587     /**
588      * Sends a stanza to the joined FMUC node, when the local node has established such an outbound join.
589      *
590      * Note that when a master-slave mode is active, we need to wait for an echo back, before the message can be
591      * broadcasted locally. This method will return a CompletableFuture object that is completed as soon as processing
592      * can continue. This doesn't necessarily mean that processing/propagating has been completed (eg: when the FMUC
593      * is configured to use master-master mode, a completed Future instance will be returned.
594      *
595      * @param stanza The stanza to be sent.
596      * @param sender Representation of the sender of the stanza.
597      * @return A future object that completes when the stanza can be propagated locally.
598      */
propagateOutbound( @onnull Packet stanza, @Nonnull MUCRole sender )599     private CompletableFuture<?> propagateOutbound( @Nonnull Packet stanza, @Nonnull MUCRole sender )
600     {
601         Log.trace("(room: '{}'): Propagate outbound, stanza: {}, sender: {}", room.getJID(), stanza, sender);
602 
603         if ( outboundJoin == null )
604         {
605             if ( outboundJoinProgress != null ) {
606                 Log.trace("(room: '{}'): Remote MUC joining in progress. Queuing outbound propagation until after the join has been established.", room.getJID());
607                 return outboundJoinProgress.addToQueue( stanza, sender );
608             }
609             else
610             {
611                 Log.trace("(room: '{}'): No remote MUC joined. No need to propagate outbound.", room.getJID());
612                 return CompletableFuture.completedFuture(null);
613             }
614         }
615 
616         if ( !outboundJoin.wantsStanzasSentBy( sender ) ) {
617             Log.trace("(room: '{}'): Skipping outbound propagation to peer '{}', as this peer needs not be sent stanzas sent by '{}' (potentially because it's a master-master mode joined FMUC and the sender originates on that node).", room.getJID(), outboundJoin.getPeer(), sender );
618             return CompletableFuture.completedFuture(null);
619         }
620 
621         final CompletableFuture<?> result = new CompletableFuture<>();
622         doPropagateOutbound( stanza, sender, result );
623 
624         return result;
625     }
626 
doPropagateOutbound(@onnull Packet stanza, @Nonnull MUCRole sender, @Nonnull CompletableFuture<?> result )627     private void doPropagateOutbound(@Nonnull Packet stanza, @Nonnull MUCRole sender, @Nonnull CompletableFuture<?> result )
628     {
629         Log.debug("(room: '{}'): Propagating a stanza (type '{}') from user '{}' (as '{}') to the joined FMUC node {}.", room.getJID(), stanza.getClass().getSimpleName(), sender.getUserAddress(), sender.getRoleAddress(), outboundJoin.getPeer() );
630 
631         final Packet enriched = enrichWithFMUCElement( stanza, sender );
632         enriched.setFrom( new JID(room.getName(), room.getMUCService().getServiceDomain(), sender.getNickname() ) );
633         enriched.setTo( new JID(outboundJoin.getPeer().getNode(), outboundJoin.getPeer().getDomain(), sender.getNickname() ) );
634 
635         // When we're in a master-slave mode with the remote FMUC node that we're joining to, we must wait for it
636         // to echo back the presence data, before we can distribute it in the local room.
637         final boolean mustBlock = outboundJoin.getMode() == FMUCMode.MasterSlave;
638         if ( !mustBlock ) {
639             Log.trace("(room: '{}'): No need to wait for an echo back from joined FMUC node {} of the propagation of stanza sent by user '{}' (as '{}').", room.getJID(), outboundJoin.getPeer(), sender.getUserAddress(), sender.getRoleAddress() );
640             result.complete( null );
641         } else {
642             Log.debug("(room: '{}'): An echo back from joined FMUC node {} of the propagation of stanza snet by user '{}' (as '{}') needs to be received before the join event can be propagated locally.", room.getJID(), outboundJoin.getPeer(), sender.getUserAddress(), sender.getRoleAddress() );
643 
644             // register callback to complete this future when echo is received back.
645             outboundJoin.registerEchoCallback( enriched, result );
646         }
647 
648         // Send the outbound stanza.
649         XMPPServer.getInstance().getPacketRouter().route( enriched );
650     }
651 
652     /**
653      * Sends a stanza to all joined FMUC node, when the local node has accepted such inbound joins from remote peers.
654      *
655      * Optionally, the address if one particular peer can be provided to avoid propagation of the stanza to that
656      * particular node. This is to be used to prevent 'echos back' to the originating FMUC node, when appropriate.
657      *
658      * @param stanza The stanza to be sent.
659      * @param sender Representation of the sender of the stanza.
660      * @return A future object that completes when the stanza can be propagated locally.
661      */
propagateInbound( @onnull Packet stanza, @Nonnull MUCRole sender )662     private CompletableFuture<?> propagateInbound( @Nonnull Packet stanza, @Nonnull MUCRole sender )
663     {
664         Log.trace("(room: '{}'): Propagate inbound, stanza: {}, sender: {}", room.getJID(), stanza, sender);
665         if ( inboundJoins.isEmpty() )
666         {
667             Log.trace("(room: '{}'): No remote MUC joining us. No need to propagate inbound.", room.getJID());
668             return CompletableFuture.completedFuture(null);
669         }
670 
671         Log.trace( "(room: '{}'): Propagating a stanza (type '{}') from user '{}' (as '{}') to the all {} joining FMUC nodes.", room.getJID(), stanza.getClass().getSimpleName(), sender.getUserAddress(), sender.getRoleAddress(), inboundJoins.size() );
672 
673         for( final InboundJoin inboundJoin : inboundJoins.values() )
674         {
675             if ( !inboundJoin.wantsStanzasSentBy( sender ) ) {
676                 Log.trace("(room: '{}'): Skipping inbound propagation to peer '{}', as this peer needs not be sent stanzas sent by '{}' (potentially because it's a master-slave mode joined FMUC and the sender originates on that node).", room.getJID(), inboundJoin.getPeer(), sender );
677                 continue;
678             }
679 
680             Log.trace( "(room: '{}'): Propagating a stanza (type '{}') from user '{}' (as '{}') to the joining FMUC node '{}'", room.getJID(), stanza.getClass().getSimpleName(), sender.getUserAddress(), sender.getRoleAddress(), inboundJoin.getPeer() );
681             final Packet enriched = enrichWithFMUCElement( stanza, sender );
682             enriched.setFrom( sender.getRoleAddress());
683             enriched.setTo( inboundJoin.getPeer() );
684             XMPPServer.getInstance().getPacketRouter().route( enriched );
685         }
686         return CompletableFuture.completedFuture(null);
687     }
688 
689     /**
690      * Adds an FMUC child element to the stanza, if such an element does not yet exist.
691      *
692      * This method provides the functionally opposite implementation of {@link #createCopyWithoutFMUC(Packet)}.
693      *
694      * @param stanza The stanza to which an FMUC child element is to be added.
695      * @param sender Representation of the originator of the stanza.
696      * @param <S> Type of stanza
697      * @return A copy of the stanza, with an added FMUC child element.
698      */
enrichWithFMUCElement( @onnull S stanza, @Nonnull MUCRole sender )699     private static <S extends Packet> S enrichWithFMUCElement( @Nonnull S stanza, @Nonnull MUCRole sender )
700     {
701         // Defensive copy - ensure that the original stanza (that might be routed locally) is not modified).
702         final S result = (S) stanza.createCopy();
703 
704         final Element fmuc = result.getElement().element(FMUC);
705         if ( fmuc != null ) {
706             return result;
707         }
708 
709         final JID from;
710         if ( sender.getRoleAddress().getResource() == null ) {
711             // This role represents the room itself as the sender. Rooms do not have a 'user' address.
712             from = sender.getRoleAddress();
713         } else {
714             from = sender.getUserAddress();
715         }
716         result.getElement().addElement( FMUC ).addAttribute( "from", from.toString() );
717 
718         return result;
719     }
720 
721     /**
722      * Adds an FMUC child element to the stanza, if such an element does not yet exist.
723      *
724      * This method provides the functionally opposite implementation of {@link #createCopyWithoutFMUC(Packet)}.
725      *
726      * @param stanza The stanza to which an FMUC child element is to be added.
727      * @param sender Representation of the originator of the stanza.
728      * @param <S> Type of stanza
729      * @return A copy of the stanza, with an added FMUC child element.
730      */
enrichWithFMUCElement( @onnull S stanza, @Nonnull JID sender )731     private static <S extends Packet> S enrichWithFMUCElement( @Nonnull S stanza, @Nonnull JID sender )
732     {
733         // Defensive copy - ensure that the original stanza (that might be routed locally) is not modified).
734         final S result = (S) stanza.createCopy();
735 
736         final Element fmuc = result.getElement().element(FMUC);
737         if ( fmuc != null ) {
738             return result;
739         }
740 
741         result.getElement().addElement( FMUC ).addAttribute( "from", sender.toString() );
742 
743         return result;
744     }
745 
746     /**
747      * Removes FMUC child elements from the stanza, if such an element exists.
748      *
749      * This method provides the functionally opposite implementation of {@link #enrichWithFMUCElement(Packet, MUCRole)}.
750      *
751      * @param stanza The stanza from which an FMUC child element is to be removed.
752      * @param <S> Type of stanza
753      * @return A copy of the stanza, without FMUC child element.
754      */
createCopyWithoutFMUC( S stanza )755     public static <S extends Packet> S createCopyWithoutFMUC( S stanza )
756     {
757         final S result = (S) stanza.createCopy();
758         final Iterator<Element> elementIterator = result.getElement().elementIterator(FMUC);
759         while (elementIterator.hasNext()) {
760             elementIterator.next();
761             elementIterator.remove();
762         }
763         return result;
764     }
765 
766     /**
767      * Creates a stanza that represents a room 'join' in a MUC room.
768      *
769      * @param mucRole Representation of the (local) user that caused the join to be initiated.
770      */
771     // TODO this does not have any FMUC specifics. Must this exist in this class?
generateJoinStanza( @onnull MUCRole mucRole )772     private Presence generateJoinStanza( @Nonnull MUCRole mucRole )
773     {
774         Log.debug( "(room: '{}'): Generating a stanza that represents the joining of local user '{}' (as '{}').", room.getJID(), mucRole.getUserAddress(), mucRole.getRoleAddress() );
775         final Presence joinStanza = new Presence();
776         joinStanza.getElement().addElement(QName.get("x", "http://jabber.org/protocol/muc"));
777         final Element mucUser = joinStanza.getElement().addElement(QName.get("x", "http://jabber.org/protocol/muc#user"));
778         final Element mucUserItem = mucUser.addElement("item");
779         mucUserItem.addAttribute("affiliation", mucRole.getAffiliation().toString());
780         mucUserItem.addAttribute("role", mucRole.getRole().toString());
781 
782         // Don't include the occupant's JID if the room is semi-anon and the new occupant is not a moderator
783         if (!room.canAnyoneDiscoverJID()) {
784             if (MUCRole.Role.moderator == mucRole.getRole()) {
785                 mucUserItem.addAttribute("jid", mucRole.getUserAddress().toString());
786             }
787             else {
788                 mucUserItem.addAttribute("jid", null);
789             }
790         }
791 
792         return joinStanza;
793     }
794 
process( @onnull final Packet stanza )795     public synchronized void process( @Nonnull final Packet stanza )
796     {
797         if ( !(room.isFmucEnabled() && FMUC_ENABLED.getValue()) ) {
798             Log.debug( "(room: '{}'): FMUC disabled, skipping processing of stanza: {}", room.getJID(), stanza.toXML() );
799             if ( stanza instanceof IQ && ((IQ) stanza).isRequest() ) {
800                 final IQ errorResult = IQ.createResultIQ( (IQ) stanza);
801                 errorResult.setError(PacketError.Condition.service_unavailable);
802                 XMPPServer.getInstance().getPacketRouter().route( errorResult );
803             }
804             return;
805         }
806 
807         Log.trace( "(room: '{}'): Processing stanza from '{}': {}", room.getJID(), stanza.getFrom(), stanza.toXML() );
808         final JID remoteMUC = stanza.getFrom().asBareJID();
809 
810         if ( stanza.getElement().element(FMUC) == null ) {
811             throw new IllegalArgumentException( "Unable to process stanza that does not have FMUC data: " + stanza.toXML() );
812         }
813 
814         if ( remoteMUC.getNode() == null ) {
815             throw new IllegalArgumentException( "Unable to process stanza that did not originate from a MUC room (the 'from' address has no node-part):" + stanza.toXML() );
816         }
817 
818         if ( outboundJoinProgress != null && outboundJoinProgress.getPeer().equals( remoteMUC ) && !outboundJoinProgress.isJoinComplete() )
819         {
820             Log.trace("(room: '{}'): Received stanza from '{}' that is identified as outbound FMUC node for which a join is in progress.", room.getJID(), remoteMUC);
821 
822             Log.trace("(room: '{}'): Queueing stanza from '{}' as partial FMUC join response.", room.getJID(), remoteMUC);
823             outboundJoinProgress.addResponse(stanza);
824 
825             if ( outboundJoinProgress.isJoinComplete() )
826             {
827                 Log.debug("(room: '{}'): Received a complete FMUC join response from '{}'.", room.getJID(), remoteMUC);
828                 finishOutboundJoin();
829                 outboundJoinProgress = null;
830             }
831         }
832         else if ( outboundJoin != null && outboundJoin.getPeer().equals( remoteMUC ) )
833         {
834             Log.trace("(room: '{}'): Received stanza from '{}' that is identified as outbound FMUC node.", room.getJID(), remoteMUC);
835             if ( stanza instanceof Presence && stanza.getElement().element(FMUC).element("left") != null ) {
836                 processLeftInstruction( (Presence) stanza );
837             } else {
838                 outboundJoin.evaluateForCallbackCompletion(stanza);
839                 processRegularMUCStanza( stanza );
840             }
841         }
842         else if ( inboundJoins.get( remoteMUC.asBareJID() ) != null )
843         {
844             Log.trace("(room: '{}'): Received stanza from '{}' that is identified as inbound FMUC node.", room.getJID(), remoteMUC);
845             processRegularMUCStanza( stanza );
846         }
847         else
848         {
849             Log.trace("(room: '{}'): Received stanza from '{}' that is not a known FMUC node.", room.getJID(), remoteMUC ); //Treating as inbound FMUC node join request.", room.getJID(), remoteMUC);
850             if ( isFMUCJoinRequest( stanza ) ) {
851                 try
852                 {
853                     checkAcceptingFMUCJoiningNodePreconditions(remoteMUC);
854                     acceptJoiningFMUCNode( (Presence) stanza );
855                 } catch ( final FMUCException e ) {
856                     rejectJoiningFMUCNode( (Presence) stanza, e.getMessage() );
857                 }
858             } else {
859                 Log.debug("(room: '{}'): Unable to process stanza from '{}'. Ignoring: {}", room.getJID(), remoteMUC, stanza.toXML() );
860                 if ( stanza instanceof IQ && ((IQ) stanza).isRequest() ) {
861                     final IQ errorResult = IQ.createResultIQ( (IQ) stanza);
862                     errorResult.setError(PacketError.Condition.service_unavailable);
863                     XMPPServer.getInstance().getPacketRouter().route( errorResult );
864                 }
865             }
866         }
867     }
868 
869     /**
870      * Process a 'left' notification that is sent to us by the remote joined node.
871      *
872      * All occupants of the room will be notified that the occupants that joined through the node that has disconnected
873      * us are no longer available (in a typical scenario, no such local occupants are expected to be in the room, as the
874      * 'left' notification should be trigged by the last occupant having left the room).
875      *
876      * @param stanza The stanza that informed us that the FMUC peer considers us disconnected.
877      */
processLeftInstruction( @onnull final Presence stanza )878     private void processLeftInstruction( @Nonnull final Presence stanza )
879     {
880         if ( stanza.getElement().element(FMUC) == null ) {
881             throw new IllegalArgumentException( "Unable to process stanza that does not have FMUC data: " + stanza.toXML() );
882         }
883 
884         Log.trace("(room: '{}'): FMUC peer '{}' informed us that we left the FMUC set.", room.getJID(), outboundJoin.getPeer() );
885 
886         // This *should* only occur after all of our local users have left the room. For good measure, send out
887         // 'leave' for all occupants from the now disconnected FMUC node anyway.
888         makeRemoteOccupantLeaveRoom( outboundJoin.occupants );
889 
890         outboundJoin = null;
891         outboundJoinProgress = null;
892     }
893 
894     /**
895      * Processes a stanza that is received from another node in the FMUC set, by translating it into 'regular' MUC data.
896      *
897      * The provided input is expected to be a stanza received from another node in the FMUC set. It is stripped from
898      * FMUC data, after which it is distributed to the local users.
899      *
900      * Additionally, it is sent out to all (other) FMUC nodes that are known.
901      *
902      * @param stanza The data to be processed.
903      */
processRegularMUCStanza( @onnull final Packet stanza )904     private void processRegularMUCStanza( @Nonnull final Packet stanza )
905     {
906         final Element fmuc = stanza.getElement().element(FMUC);
907         if (fmuc == null) {
908             throw new IllegalArgumentException( "Provided stanza must have an 'fmuc' child element (but does not).");
909         }
910 
911         final JID remoteMUC = stanza.getFrom().asBareJID();
912         final JID author = new JID( fmuc.attributeValue("from") ); // TODO input validation.
913         final MUCRole senderRole = room.getOccupantByFullJID( author );
914         Log.trace("(room: '{}'): Processing stanza from remote FMUC peer '{}' as regular room traffic. Sender of stanza: {}", room.getJID(), remoteMUC, author );
915 
916         // Distribute. Note that this will distribute both to the local node, as well as to all FMUC nodes in the the FMUC set.
917         if ( stanza instanceof Presence ) {
918             RemoteFMUCNode remoteFMUCNode = inboundJoins.get(remoteMUC);
919             if ( remoteFMUCNode == null && outboundJoin != null && remoteMUC.equals(outboundJoin.getPeer())) {
920                 remoteFMUCNode = outboundJoin;
921             }
922             if ( remoteFMUCNode != null )
923             {
924                 final boolean isLeave = ((Presence) stanza).getType() == Presence.Type.unavailable;
925                 final boolean isJoin = ((Presence) stanza).isAvailable();
926 
927                 if ( isLeave )
928                 {
929                     Log.trace("(room: '{}'): Occupant '{}' left room on remote FMUC peer '{}'", room.getJID(), author, remoteMUC );
930                     makeRemoteOccupantLeaveRoom( (Presence) stanza );
931                     remoteFMUCNode.removeOccupant(author); // Remove occupant only after the leave stanzas have been sent, otherwise the author is (no longer) recognized as an occupant of the particular node when the leave is being processed.
932 
933                     // The joined room confirms that the joining room has left the set by sending a presence stanza from the bare JID
934                     // of the joined room to the bare JID of the joining room with an FMUC payload containing an element 'left'.
935                     if ( remoteFMUCNode instanceof InboundJoin && remoteFMUCNode.occupants.isEmpty() ) {
936                         Log.trace("(room: '{}'): Last occupant that joined on remote FMUC peer '{}' has now left the room. The peer has left the FMUC node set.", room.getJID(), remoteMUC );
937                         final Presence leaveFMUCSet = new Presence();
938                         leaveFMUCSet.setTo( remoteMUC );
939                         leaveFMUCSet.setFrom( room.getJID() );
940                         leaveFMUCSet.getElement().addElement( FMUC ).addElement( "left" );
941                         inboundJoins.remove(remoteMUC);
942 
943                         XMPPServer.getInstance().getPacketRouter().route( leaveFMUCSet );
944                     }
945                 } else if ( isJoin ) {
946                     Log.trace("(room: '{}'): Occupant '{}' joined room on remote FMUC peer '{}'", room.getJID(), author, remoteMUC );
947                     remoteFMUCNode.addOccupant(author);
948                     makeRemoteOccupantJoinRoom( (Presence) stanza );
949                 } else {
950                     // FIXME implement sharing of presence.
951                     Log.error("Processing of presence stanzas received from other FMUC nodes is pending implementation! Ignored stanza: {}", stanza.toXML(), new UnsupportedOperationException());
952                 }
953             }
954             else
955             {
956                 Log.warn( "Unable to process stanza: {}", stanza.toXML() );
957             }
958         } else {
959             // Strip all FMUC data.
960             final Packet stripped = createCopyWithoutFMUC( stanza );
961 
962             // The 'stripped' stanza is going to be distributed locally. Act as if it originates from a local user, instead of the remote FMUC one.
963             final JID from;
964             from = senderRole.getRoleAddress();
965             stripped.setFrom( from );
966             stripped.setTo( room.getJID() );
967 
968             room.send( stripped, senderRole );
969         }
970     }
971 
finishOutboundJoin()972     private void finishOutboundJoin()
973     {
974         if ( outboundJoinProgress == null ) {
975             throw new IllegalStateException( "Cannot finish outbound join from '" + room.getJID() + "' as none is in progress." );
976         }
977         Log.trace("(room: '{}'): Finish setting up the outbound FMUC join with '{}'.", room.getJID(), outboundJoinProgress.getPeer() );
978         if ( !outboundJoinProgress.isJoinComplete() ) {
979             throw new IllegalStateException( "Cannot finish outbound join from '" + room.getJID()+"' to '"+ outboundJoinProgress.getPeer()+"', as it is not complete!" );
980         }
981 
982         List<OutboundJoinProgress.QueuedStanza> queued = outboundJoinProgress.purgeQueue();
983         if ( outboundJoinProgress.isRejected() )
984         {
985             Log.trace("(room: '{}'): Notifying callback waiting for the complete FMUC join response from '{}' with a rejection.", room.getJID(), outboundJoinProgress.getPeer() );
986             final FMUCException rejection = new FMUCException(outboundJoinProgress.getRejectionMessage());
987             outboundJoinProgress.getCallback().completeExceptionally(rejection);
988             queued.forEach( queuedStanza -> queuedStanza.future.completeExceptionally(rejection));
989         }
990         else
991         {
992             Log.trace("(room: '{}'): Synchronizing state of local room with joined FMUC node '{}'.", room.getJID(), outboundJoinProgress.getPeer() );
993             outboundJoin = new OutboundJoin(outboundJoinConfiguration);
994 
995             // Before processing the data in context of the local FMUC room, ensure that the FMUC metadata state is up-to-date.
996             for ( final Packet response : outboundJoinProgress.getResponses() ) {
997                 if ( response instanceof Presence ) {
998                     final JID occupantOnJoinedNode = getFMUCFromJID(response);
999                     outboundJoin.addOccupant( occupantOnJoinedNode );
1000                 }
1001             }
1002 
1003             // Use a room role that can be used to identify the remote fmuc node (to prevent data from being echo'd back)
1004             final MUCRole roomRole = MUCRole.createRoomRole(room);
1005             roomRole.setReportedFmucAddress( outboundJoin.getPeer() );
1006 
1007             // Use received data to augment state of the local room.
1008             for ( final Packet response : outboundJoinProgress.getResponses() ) {
1009                 try
1010                 {
1011                     if ( response instanceof Presence ) {
1012                         makeRemoteOccupantJoinRoom((Presence) response);
1013                     } else if ( response instanceof Message && response.getElement().element("body") != null) {
1014                         addRemoteHistoryToRoom((Message) response);
1015                     } else if ( response instanceof Message && response.getElement().element("subject") != null) {
1016                         applyRemoteSubjectToRoom((Message) response, roomRole);
1017                     }
1018                 } catch ( Exception e ) {
1019                     Log.error( "(room: '{}'): An unexpected exception occurred while processing FMUC join response stanzas.", room.getJID(), e );
1020                 }
1021             }
1022 
1023             Log.trace("(room: '{}'): Notifying callback waiting for the complete FMUC join response from '{}' with success.", room.getJID(), outboundJoinProgress.getPeer() );
1024             outboundJoinProgress.getCallback().complete( null );
1025 
1026             Log.trace("(room: '{}'): Sending {} stanza(s) that were queued, waiting for the complete FMUC join", room.getJID(), queued );
1027             for ( final OutboundJoinProgress.QueuedStanza queuedStanza : queued ) {
1028                 try {
1029                     doPropagateOutbound(queuedStanza.stanza, queuedStanza.sender, queuedStanza.future);
1030                 } catch ( Exception e ) {
1031                     Log.warn( "An exception occurred while trying to process a stanza that was queued pending completion of FMUC join in room " + room.getJID() + ": " + queuedStanza.stanza );
1032                 }
1033             }
1034         }
1035     }
1036 
applyRemoteSubjectToRoom( @onnull final Message message, @Nonnull final MUCRole mucRole )1037     private void applyRemoteSubjectToRoom( @Nonnull final Message message, @Nonnull final MUCRole mucRole )
1038     {
1039         try
1040         {
1041             Log.trace("(room: '{}'): Received subject from joined FMUC node '{}'. Applying it locally.", room.getJID(), mucRole.getReportedFmucAddress() );
1042             room.changeSubject(createCopyWithoutFMUC(message), mucRole);
1043         }
1044         catch ( ForbiddenException e ) {
1045             // This should not be possible, as we're using a role above that should bypass the auth checks that throw this exception!
1046             Log.error( "(room: '{}'): An unexpected exception occurred while processing FMUC join response stanzas.", room.getJID(), e );
1047         }
1048     }
1049 
addRemoteHistoryToRoom( @onnull final Message message )1050     private void addRemoteHistoryToRoom( @Nonnull final Message message )
1051     {
1052         final Element fmuc = message.getElement().element(FMUC);
1053         if ( fmuc == null ) {
1054             throw new IllegalArgumentException( "Argument 'presence' should be an FMUC presence, but it does not appear to be: it is missing the FMUC child element." );
1055         }
1056 
1057         Log.trace("(room: '{}'): Received history from joined FMUC node '{}'. Applying it locally.", room.getJID(), outboundJoinProgress.getPeer() );
1058 
1059         final JID userJID = new JID( fmuc.attributeValue("from"));
1060         final String nickname = message.getFrom().getResource();
1061         Date sentDate;
1062         final Element delay = message.getChildElement("delay","urn:xmpp:delay");
1063         if ( delay != null ) {
1064             final String stamp = delay.attributeValue("stamp");
1065             try
1066             {
1067                 sentDate = new XMPPDateTimeFormat().parseString(stamp);
1068             }
1069             catch ( ParseException e )
1070             {
1071                 Log.warn( "Cannot parse 'stamp' from delay element in message as received in FMUC join: {}", message, e );
1072                 sentDate = null;
1073             }
1074         } else {
1075             sentDate = null;
1076             Log.warn( "Missing delay element in message received in FMUC join: {}", message );
1077         }
1078 
1079         final Message cleanedUpMessage = createCopyWithoutFMUC(message);
1080         room.getRoomHistory().addOldMessage( userJID.toString(), nickname, sentDate, cleanedUpMessage.getSubject(), cleanedUpMessage.getBody(), cleanedUpMessage.toXML() );
1081     }
1082 
1083     /**
1084      * Parses the JID from an FMUC stanza.
1085      *
1086      * More specifically, this method returns the JID representation of the 'from' attribute value of the 'fmuc' child
1087      * element in the stanza. A runtime exception is thrown when no such value exists, or when that value is not a
1088      * valid JID.
1089      *
1090      * @param stanza An FMUC stanza
1091      * @return A JID.
1092      * @throws RuntimeException when no valid JID value is found in the 'from' attribute of the FMUC child element.
1093      */
getFMUCFromJID( @onnull final Packet stanza )1094     public static JID getFMUCFromJID( @Nonnull final Packet stanza )
1095     {
1096         final Element fmuc = stanza.getElement().element(FMUC);
1097         if ( fmuc == null ) {
1098             throw new IllegalArgumentException( "Argument 'stanza' should be an FMUC stanza, but it does not appear to be: it is missing the FMUC child element." );
1099         }
1100 
1101         final String fromValue = fmuc.attributeValue( "from" );
1102 
1103         if ( fromValue == null ) {
1104             throw new IllegalArgumentException( "Argument 'stanza' should be a valid FMUC stanza, but it does not appear to be: it is missing a 'from' attribute value in the FMUC child element." );
1105         }
1106 
1107         final JID userJID = new JID( fromValue );
1108         return userJID;
1109     }
1110 
1111     /**
1112      * Processes a presence stanza that is expected to be an FMUC-flavored 'room join' representation, and adds the
1113      * remote user to the room.
1114      *
1115      * This method will <em>not</em> make modifications to the state of the FMUC node set. It expects those changes to
1116      * be taken care of by the caller.
1117      *
1118      * This method provides the functionally opposite implementation of {@link #makeRemoteOccupantLeaveRoom(Presence)}.
1119      *
1120      * @param presence The stanza representing a user on a federated FMUC node joining the room (cannot be null).
1121      * @see #makeRemoteOccupantLeaveRoom(Presence)
1122      */
makeRemoteOccupantJoinRoom( @onnull final Presence presence )1123     private void makeRemoteOccupantJoinRoom( @Nonnull final Presence presence )
1124     {
1125         // FIXME: better input validation / better problem handling when remote node sends crappy data!
1126         final Element mucUser = presence.getElement().element(QName.get("x","http://jabber.org/protocol/muc#user"));
1127         final Element fmuc = presence.getElement().element(FMUC);
1128         if ( fmuc == null ) {
1129             throw new IllegalArgumentException( "Argument 'presence' should be an FMUC presence, but it does not appear to be: it is missing the FMUC child element." );
1130         }
1131 
1132         final JID remoteMUC = presence.getFrom().asBareJID();
1133         final String nickname = presence.getFrom().getResource();
1134 
1135         Log.debug( "(room: '{}'): Occupant on remote peer '{}' joins the room with nickname '{}'.", room.getJID(), remoteMUC, nickname );
1136 
1137         MUCRole.Role role;
1138         if ( mucUser != null && mucUser.element("item") != null && mucUser.element("item").attributeValue("role") != null ) {
1139             try {
1140                 role = MUCRole.Role.valueOf(mucUser.element("item").attributeValue("role"));
1141             } catch ( IllegalArgumentException e ) {
1142                 Log.info( "Cannot parse role as received in FMUC join, using default role instead: {}", presence, e );
1143                 role = MUCRole.Role.participant;
1144             }
1145         } else {
1146             Log.info( "Cannot parse role as received in FMUC join, using default role instead: {}", presence );
1147             role = MUCRole.Role.participant;
1148         }
1149 
1150         MUCRole.Affiliation affiliation;
1151         if ( mucUser != null && mucUser.element("item") != null && mucUser.element("item").attributeValue("affiliation") != null ) {
1152             try {
1153                 affiliation = MUCRole.Affiliation.valueOf(mucUser.element("item").attributeValue("affiliation"));
1154             } catch ( IllegalArgumentException e ) {
1155                 Log.info( "Cannot parse affiliation as received in FMUC join, using default role instead: {}", presence, e );
1156                 affiliation = MUCRole.Affiliation.none;
1157             }
1158         } else {
1159             Log.info( "Cannot parse affiliation as received in FMUC join, using default role instead: {}", presence );
1160             affiliation = MUCRole.Affiliation.none;
1161         }
1162 
1163         final JID userJID = getFMUCFromJID( presence );
1164 
1165         final MUCRole joinRole = new MUCRole(room, nickname, role, affiliation, userJID, createCopyWithoutFMUC(presence));
1166 
1167         joinRole.setReportedFmucAddress( userJID );
1168 
1169         final boolean clientOnlyJoin = room.alreadyJoinedWithThisNick( userJID, nickname );
1170         if (clientOnlyJoin)
1171         {
1172             Log.warn( "(room: '{}'): Ignoring join of occupant on remote peer '{}' with nickname '{}' as this user is already in the room.", room.getJID(), remoteMUC, nickname );
1173         }
1174         else
1175         {
1176             // Update the (local) room state to now include this occupant.
1177             room.addOccupantRole(joinRole);
1178 
1179             // Send out presence stanzas that signal all other occupants that this occupant has now joined. Unlike a 'regular' join we MUST
1180             // _not_ sent back presence for all other occupants (that has already been covered by the FMUC protocol implementation).
1181             room.sendInitialPresenceToExistingOccupants(joinRole);
1182         }
1183     }
1184 
1185     /**
1186      * Removes a remote user from the room.
1187      *
1188      * This method is intended to be used when a remote node is being disconnected from the FMUC node set, without having
1189      * sent 'leave' presence stanzas for its occupants. This method generates such presence stanzas, and delegates
1190      * further processing to {@link #makeRemoteOccupantLeaveRoom(Presence)}
1191      *
1192      * @param removedRemoteOccupants The occupants to be removed from the room.
1193      */
makeRemoteOccupantLeaveRoom( @onnull Set<JID> removedRemoteOccupants )1194     private void makeRemoteOccupantLeaveRoom( @Nonnull Set<JID> removedRemoteOccupants ) {
1195         for ( final JID removedRemoteOccupant : removedRemoteOccupants )
1196         {
1197             try
1198             {
1199                 Log.trace("(room: '{}'): Removing occupant '{}' that was joined through a (now presumably disconnected) remote node.", room.getJID(), removedRemoteOccupant);
1200                 final MUCRole role = room.getOccupantByFullJID( removedRemoteOccupant );
1201                 if ( role == null ) {
1202                     Log.warn("(room: '{}'): Unable to remove '{}' as it currently is not registered as an occupant of this room.", room.getJID(), removedRemoteOccupant);
1203                     continue;
1204                 }
1205 
1206                 final Presence leave = new Presence();
1207                 leave.setType(Presence.Type.unavailable);
1208                 leave.setTo(role.getRoleAddress());
1209                 leave.setFrom(role.getUserAddress());
1210                 leave.setStatus("FMUC node disconnect");
1211                 final Presence enriched = enrichWithFMUCElement( leave, role.getReportedFmucAddress() );
1212 
1213                 makeRemoteOccupantLeaveRoom( enriched );
1214             }
1215             catch ( Exception e )
1216             {
1217                 Log.warn("(room: '{}'): An exception occurred while removing occupant '{}' from a (now presumably disconnected) remote node.", room.getJID(), removedRemoteOccupant, e);
1218             }
1219         }
1220     }
1221 
1222     /**
1223      * Processes a presence stanza that is expected to be an FMUC-flavored 'leave' representation, and removes the
1224      * remote user from the room.
1225      *
1226      * This method will <em>not</em> make modifications to the state of the FMUC node set. It expects those changes to
1227      * be taken care of by the caller.
1228      *
1229      * This method provides the functionally opposite implementation of {@link #makeRemoteOccupantJoinRoom(Presence)}.
1230      *
1231      * @param presence The stanza representing a user on a federated FMUC node leaving the room (cannot be null).
1232      * @see #makeRemoteOccupantJoinRoom(Presence)
1233      */
makeRemoteOccupantLeaveRoom( @onnull final Presence presence )1234     private void makeRemoteOccupantLeaveRoom( @Nonnull final Presence presence )
1235     {
1236         // FIXME: better input validation / better problem handling when remote node sends crappy data!
1237         final Element fmuc = presence.getElement().element(FMUC);
1238         if ( fmuc == null ) {
1239             throw new IllegalArgumentException( "Argument 'presence' should be an FMUC presence, but it does not appear to be: it is missing the FMUC child element." );
1240         }
1241         final JID userJID = getFMUCFromJID( presence );
1242 
1243         final MUCRole leaveRole = room.getOccupantByFullJID( userJID );
1244         leaveRole.setPresence( createCopyWithoutFMUC(presence) ); // update presence to reflect the 'leave' - this is used later to broadcast to other occupants.
1245 
1246         // Send presence to inform all occupants of the room that the user has left.
1247         room.sendLeavePresenceToExistingOccupants( leaveRole )
1248             // DO NOT use 'thenRunAsync', as that will cause issues with clustering (it uses an executor that overrides the contextClassLoader, causing ClassNotFound exceptions in ClusterExternalizableUtil.
1249             .thenRun( () -> {
1250                 // Update the (local) room state to no longer include this occupant.
1251                 room.removeOccupantRole(leaveRole);
1252             });
1253     }
1254 
1255     /**
1256      * Checks if the entity that attempts to join, which is assumed to represent a remote, joining FMUC node, is allowed
1257      * to join the local ('joined') FMUC node.
1258      *
1259      * @param joiningPeer the address of the remote room that attempts to join the local FMUC node.
1260      * @throws FMUCException when the peer cannot join the local FMUC node.
1261      */
checkAcceptingFMUCJoiningNodePreconditions( @onnull final JID joiningPeer )1262     private void checkAcceptingFMUCJoiningNodePreconditions( @Nonnull final JID joiningPeer ) throws FMUCException
1263     {
1264         if ( !joiningPeer.asBareJID().equals(joiningPeer) ) {
1265             throw new IllegalArgumentException( "Expected argument 'joiningPeer' to be a bare JID, but it was not: " + joiningPeer );
1266         }
1267 
1268         if ( !(room.isFmucEnabled() && FMUC_ENABLED.getValue()) )
1269         {
1270             Log.info( "(room: '{}'): Rejecting join request of remote joining peer '{}': FMUC functionality is not enabled.", room.getJID(), joiningPeer );
1271             throw new FMUCException( "FMUC functionality is not enabled." );
1272         }
1273 
1274         if ( this.outboundJoinConfiguration != null && joiningPeer.equals(this.outboundJoinConfiguration.getPeer()) ) {
1275             Log.info( "(room: '{}'): Rejecting join request of remote joining peer '{}': The local, joined node is set up to federate with the joining node (cannot have circular federation).", room.getJID(), joiningPeer );
1276             throw new FMUCException( "The joined node is set up to federate with the joining node (cannot have circular federation)." );
1277         }
1278 
1279         Log.debug( "(room: '{}'): Accepting join request of remote joining peer '{}'.", room.getJID(), joiningPeer );
1280     }
1281 
1282     /**
1283      * Sends a stanza back to a remote, joining FMUC node that represents rejection of an FMUC join request.
1284      *
1285      * @param joinRequest The request to join that is being rejected
1286      * @param rejectionMessage An optional, human readable message that describes the reason for the rejection.
1287      */
rejectJoiningFMUCNode( @onnull final Presence joinRequest, @Nullable final String rejectionMessage )1288     private void rejectJoiningFMUCNode( @Nonnull final Presence joinRequest, @Nullable final String rejectionMessage )
1289     {
1290         Log.trace("(room: '{}'): Rejecting FMUC join request from '{}'.", room.getJID(), joinRequest.getFrom().asBareJID() );
1291         final Presence rejection = new Presence();
1292         rejection.setTo( joinRequest.getFrom() );
1293         rejection.setFrom( this.room.getJID() );
1294         final Element rejectEl = rejection.addChildElement( FMUC.getName(), FMUC.getNamespaceURI() ).addElement("reject");
1295         if ( rejectionMessage != null && !rejectionMessage.trim().isEmpty() ) {
1296             rejectEl.setText( rejectionMessage );
1297         }
1298         XMPPServer.getInstance().getPacketRouter().route( rejection );
1299     }
1300 
1301     /**
1302      * Sends a stanza back to a remote, joining FMUC node that represents acceptance of a FMUC join request.
1303      *
1304      * @param joinRequest The request to join that is being accepted.
1305      */
acceptJoiningFMUCNode( @onnull final Presence joinRequest )1306     private void acceptJoiningFMUCNode( @Nonnull final Presence joinRequest )
1307     {
1308         Log.trace("(room: '{}'): Accepting FMUC join request from '{}'.", room.getJID(), joinRequest.getFrom().asBareJID() );
1309         final JID joiningPeer = joinRequest.getFrom().asBareJID();
1310         final InboundJoin inboundJoin = new InboundJoin(joiningPeer);
1311         final JID occupant = getFMUCFromJID(joinRequest);
1312         inboundJoin.addOccupant( occupant );
1313         inboundJoins.put(joiningPeer, inboundJoin); // TODO make thread safe.
1314         afterJoinSendOccupants( joiningPeer );
1315         afterJoinSendHistory( joiningPeer );
1316         afterJoinSendSubject( joiningPeer );
1317         makeRemoteOccupantJoinRoom( joinRequest );
1318     }
1319 
afterJoinSendOccupants( @onnull final JID joiningPeer )1320     private void afterJoinSendOccupants( @Nonnull final JID joiningPeer )
1321     {
1322         if ( !joiningPeer.asBareJID().equals(joiningPeer) ) {
1323             throw new IllegalArgumentException( "Expected argument 'joiningPeer' to be a bare JID, but it was not: " + joiningPeer );
1324         }
1325 
1326         Log.trace("(room: '{}'): Sending current occupants to joining node '{}'.", room.getJID(), joiningPeer );
1327 
1328         for ( final MUCRole occupant : room.getOccupants() ) {
1329             if ( occupant.getReportedFmucAddress() != null && occupant.getReportedFmucAddress().asBareJID().equals( joiningPeer ) ) {
1330                 Log.trace("(room: '{}'): Skipping occupant '{}' as that originates from the joining node.", room.getJID(), occupant );
1331                 continue;
1332             }
1333 
1334             // TODO can we use occupant.getPresence() for this?
1335             // TODO do we need to worry about who we're exposing data to?
1336             final Presence presence = new Presence();
1337             presence.setFrom( occupant.getRoleAddress() );
1338             presence.setTo( joiningPeer );
1339             final Presence enriched = enrichWithFMUCElement( presence, occupant );
1340             final Element xitem = enriched.addChildElement( "x", "http://jabber.org/protocol/muc#user" ).addElement( "item" );
1341             xitem.addAttribute( "affiliation", occupant.getAffiliation().toString() );
1342             xitem.addAttribute( "role", occupant.getRole().toString() );
1343             xitem.addAttribute( "jid", occupant.getRoleAddress().toString() );
1344 
1345             XMPPServer.getInstance().getPacketRouter().route( enriched );
1346         }
1347     }
1348 
afterJoinSendHistory( @onnull final JID joiningPeer )1349     private void afterJoinSendHistory( @Nonnull final JID joiningPeer )
1350     {
1351         if ( !joiningPeer.asBareJID().equals(joiningPeer) ) {
1352             throw new IllegalArgumentException( "Expected argument 'joiningPeer' to be a bare JID, but it was not: " + joiningPeer );
1353         }
1354 
1355         // TODO. Can org.jivesoftware.openfire.muc.spi.MUCRoom.sendRoomHistoryAfterJoin be reused to reduce duplicate code and responsibilities?
1356         Log.trace("(room: '{}'): Sending history to joining node '{}'.", room.getJID(), joiningPeer );
1357         final MUCRoomHistory roomHistory = room.getRoomHistory();
1358         final Iterator<Message> history = roomHistory.getMessageHistory();
1359         while (history.hasNext()) {
1360             // The message stanza in the history is the original stanza (with original addressing), which we can leverage
1361             // to obtain the 'real' jid of the sender. Note that this sender need not be in the room any more, so we can't
1362             // depend on having a MUCRole for it.
1363             final Message oldMessage = history.next();
1364 
1365             final JID originalAuthorUserAddress = oldMessage.getFrom();
1366             final JID originalAuthorRoleAddress = new JID( room.getJID().getNode(), room.getJID().getDomain(), originalAuthorUserAddress.getResource() );
1367 
1368             final Message enriched = enrichWithFMUCElement( oldMessage, originalAuthorUserAddress );
1369 
1370             // Correct the addressing of the stanza.
1371             enriched.setFrom( originalAuthorRoleAddress );
1372             enriched.setTo( joiningPeer );
1373 
1374             XMPPServer.getInstance().getPacketRouter().route( enriched );
1375         }
1376     }
1377 
afterJoinSendSubject( @onnull final JID joiningPeer )1378     private void afterJoinSendSubject( @Nonnull final JID joiningPeer )
1379     {
1380         if ( !joiningPeer.asBareJID().equals(joiningPeer) ) {
1381             throw new IllegalArgumentException( "Expected argument 'joiningPeer' to be a bare JID, but it was not: " + joiningPeer );
1382         }
1383 
1384         Log.trace("(room: '{}'): Sending subject to joining node '{}'.", room.getJID(), joiningPeer );
1385 
1386         // TODO can org.jivesoftware.openfire.muc.spi.MUCRoom.sendRoomSubjectAfterJoin be re-used?
1387         final MUCRoomHistory roomHistory = room.getRoomHistory();
1388         Message roomSubject = roomHistory.getChangedSubject();
1389         if ( roomSubject != null ) {
1390             roomSubject = roomSubject.createCopy(); // OF-2163: Prevent accidental modifications to the original.
1391         } else {
1392             roomSubject = new Message();
1393             roomSubject.setFrom(this.room.getJID()); // This might break FMUC, as it does not include the nickname of the author of the subject.
1394             roomSubject.setTo(joiningPeer);
1395             roomSubject.setType(Message.Type.groupchat);
1396             roomSubject.setID(UUID.randomUUID().toString());
1397             final Element subjectEl = roomSubject.getElement().addElement("subject");
1398             if ( room.getSubject() != null && !room.getSubject().isEmpty() )
1399             {
1400                 subjectEl.setText(room.getSubject());
1401             }
1402         }
1403 
1404         final JID originalAuthorUserAddress = roomSubject.getFrom();
1405         final JID originalAuthorRoleAddress = new JID( room.getJID().getNode(), room.getJID().getDomain(), originalAuthorUserAddress.getResource() );
1406 
1407         final Message enriched = enrichWithFMUCElement( roomSubject, originalAuthorUserAddress );
1408 
1409         // Correct the addressing of the stanza.
1410         enriched.setFrom( originalAuthorRoleAddress );
1411         enriched.setTo( joiningPeer );
1412 
1413         XMPPServer.getInstance().getPacketRouter().route( enriched );
1414     }
1415 
isSubject( @onnull final Packet stanza )1416     public static boolean isSubject( @Nonnull final Packet stanza )
1417     {
1418         final Element fmuc = stanza.getElement().element(FMUC);
1419         if (fmuc == null) {
1420             throw new IllegalArgumentException( "Provided stanza must have an 'fmuc' child element (but does not).");
1421         }
1422 
1423         final boolean result = stanza instanceof Message && stanza.getElement().element("subject") != null;
1424         Log.trace( "Stanza from '{}' was determined to {} a stanza containing a MUC subject.", stanza.getFrom(), result ? "be" : "not be" );
1425         return result;
1426     }
1427 
isFMUCReject( @onnull final Packet stanza )1428     public static boolean isFMUCReject( @Nonnull final Packet stanza )
1429     {
1430         final Element fmuc = stanza.getElement().element(FMUC);
1431         if (fmuc == null) {
1432             throw new IllegalArgumentException( "Provided stanza must have an 'fmuc' child element (but does not).");
1433         }
1434 
1435         final boolean result = stanza instanceof Presence && fmuc.element("reject") != null;
1436         Log.trace( "Stanza from '{}' was determined to {} a stanza containing a FMUC join reject.", stanza.getFrom(), result ? "be" : "not be" );
1437         return result;
1438     }
1439 
isFMUCJoinRequest( @onnull Packet stanza )1440     public static boolean isFMUCJoinRequest( @Nonnull Packet stanza )
1441     {
1442         final Element fmuc = stanza.getElement().element(FMUC);
1443         if (fmuc == null) {
1444             throw new IllegalArgumentException( "Provided stanza must have an 'fmuc' child element (but does not).");
1445         }
1446 
1447         final boolean result =
1448             (stanza instanceof Presence) &&
1449                 stanza.getElement().element( QName.get( "x", "http://jabber.org/protocol/muc") ) != null &&
1450                 stanza.getElement().element( QName.get( "x", "http://jabber.org/protocol/muc#user") ) != null;
1451 
1452         Log.trace( "Stanza from '{}' was determined to {} a stanza containing a FMUC join request.", stanza.getFrom(), result ? "be" : "not be" );
1453         return result;
1454     }
1455 
getOutboundJoin()1456     public OutboundJoin getOutboundJoin() {
1457         return outboundJoin;
1458     }
1459 
getOutboundJoinProgress()1460     public OutboundJoinProgress getOutboundJoinProgress() {
1461         return outboundJoinProgress;
1462     }
1463 
getInboundJoins()1464     public Collection<InboundJoin> getInboundJoins() {
1465         return inboundJoins.values();
1466     }
1467 
abortOutboundJoinProgress()1468     public void abortOutboundJoinProgress()
1469     {
1470         if ( outboundJoinProgress != null ) {
1471             outboundJoinProgress.abort();
1472             outboundJoinProgress = null;
1473         }
1474     }
1475 
1476     abstract static class RemoteFMUCNode implements Serializable
1477     {
1478         private final Logger Log;
1479 
1480         /**
1481          * The address of the remote MUC room that is federating with us, in which 'our' MUC room takes the role of
1482          * 'joined FMUC node' while the room that federates with us (who's address is recorded in this field) takes the
1483          * role of 'joining FMUC node'.
1484          */
1485         private final JID peer;
1486 
1487         /**
1488          * The addresses of the occupants of the MUC that are joined through this FMUC node.
1489          */
1490         protected final Set<JID> occupants = new HashSet<>();
1491 
getPeer()1492         public JID getPeer() {
1493             return peer;
1494         }
1495 
RemoteFMUCNode( @onnull final JID peer )1496         RemoteFMUCNode( @Nonnull final JID peer ) {
1497             this.peer = peer.asBareJID();
1498             Log = LoggerFactory.getLogger( this.getClass().getName() + ".[peer: " + peer + "]" );
1499         }
1500 
wantsStanzasSentBy( @onnull final MUCRole sender )1501         public boolean wantsStanzasSentBy( @Nonnull final MUCRole sender ) {
1502             // Only send data if the sender is not an entity on this remote FMUC node, or the remote FMUC node itself.
1503             return sender.getReportedFmucAddress() == null || (!occupants.contains( sender.getReportedFmucAddress() ) && !peer.equals( sender.getReportedFmucAddress()) );
1504         }
1505 
addOccupant( @onnull final JID occupant )1506         public boolean addOccupant( @Nonnull final JID occupant ) {
1507             Log.trace( "Adding remote occupant: '{}'", occupant );
1508             return occupants.add( occupant );
1509         }
1510 
removeOccupant( @onnull final JID occupant )1511         public boolean removeOccupant( @Nonnull final JID occupant ) {
1512             Log.trace( "Removing remote occupant: '{}'", occupant );
1513             return occupants.remove( occupant );
1514         }
1515 
getOccupants()1516         public Set<JID> getOccupants() {
1517             return occupants;
1518         }
1519     }
1520 
1521     public static class InboundJoin extends RemoteFMUCNode
1522     {
InboundJoin( @onnull final JID peer )1523         public InboundJoin( @Nonnull final JID peer )
1524         {
1525             super(peer);
1526         }
1527     }
1528 
1529     public static class OutboundJoinConfiguration
1530     {
1531         private final FMUCMode mode;
1532         private final JID peer;
1533 
OutboundJoinConfiguration( @onnull final JID peer, @Nonnull final FMUCMode mode )1534         public OutboundJoinConfiguration( @Nonnull final JID peer, @Nonnull final FMUCMode mode ) {
1535             this.mode = mode;
1536             this.peer = peer;
1537         }
1538 
getMode()1539         public FMUCMode getMode()
1540         {
1541             return mode;
1542         }
1543 
getPeer()1544         public JID getPeer()
1545         {
1546             return peer;
1547         }
1548 
1549         @Override
equals( final Object o )1550         public boolean equals( final Object o )
1551         {
1552             if ( this == o ) { return true; }
1553             if ( o == null || getClass() != o.getClass() ) { return false; }
1554             final OutboundJoinConfiguration that = (OutboundJoinConfiguration) o;
1555             return mode == that.mode &&
1556                 peer.equals(that.peer);
1557         }
1558 
1559         @Override
hashCode()1560         public int hashCode()
1561         {
1562             return Objects.hash(mode, peer);
1563         }
1564 
1565         @Override
toString()1566         public String toString()
1567         {
1568             return "OutboundJoinConfiguration{" +
1569                 "peer=" + peer +
1570                 ", mode=" + mode +
1571                 '}';
1572         }
1573     }
1574 
1575     public static class OutboundJoin extends RemoteFMUCNode
1576     {
1577         private final FMUCMode mode;
1578 
1579         /**
1580          * A list of stanzas that need to have been echo'd by a remote FMUC node, before they can be processed locally.
1581          * This collection _does not_ include stanzas needed to set up the initial join. This collection is only used
1582          * for subsequent stanzas that are shared in a setting where echo-ing is required (due to the mode of the
1583          * federation being defined as 'master-slave').
1584          */
1585         private final Set<PendingCallback> pendingEcho = new HashSet<>();
1586 
OutboundJoin( @onnull OutboundJoinConfiguration configuration )1587         public OutboundJoin( @Nonnull OutboundJoinConfiguration configuration ) {
1588             super( configuration.getPeer() );
1589             this.mode = configuration.getMode();
1590         }
1591 
OutboundJoin( @onnull final JID peer, @Nonnull final FMUCMode mode )1592         public OutboundJoin( @Nonnull final JID peer, @Nonnull final FMUCMode mode ) {
1593             super(peer);
1594             this.mode = mode;
1595         }
1596 
getMode()1597         public FMUCMode getMode()
1598         {
1599             return mode;
1600         }
1601 
getConfiguration()1602         public OutboundJoinConfiguration getConfiguration() {
1603             return new OutboundJoinConfiguration(getPeer(), getMode() );
1604         }
1605 
1606         @Override
wantsStanzasSentBy( @onnull MUCRole sender )1607         public boolean wantsStanzasSentBy( @Nonnull MUCRole sender ) {
1608             if ( FMUCMode.MasterSlave == mode ) {
1609                 return true; // always wants stanzas - if only because the data needs to be echo'd back.
1610             }
1611 
1612             return super.wantsStanzasSentBy(sender);
1613         }
1614 
evaluateForCallbackCompletion( @onnull Packet stanza )1615         public synchronized void evaluateForCallbackCompletion( @Nonnull Packet stanza )
1616         {
1617             Log.trace( "Evaluating stanza for callback completion..." );
1618             if ( stanza.getElement().element(FMUC) == null ) {
1619                 throw new IllegalArgumentException( "Argument 'stanza' must have an FMUC child element." );
1620             }
1621 
1622             // Ignore if we do not have a callback waiting for this stanza.
1623             final Iterator<PendingCallback> iter = pendingEcho.iterator();
1624             while (iter.hasNext()) {
1625                 final PendingCallback item = iter.next();
1626                 if ( item.isMatch(stanza) ) {
1627                     Log.trace( "Invoking callback, as peer '{}' echo'd back stanza: {}", getPeer(), stanza.toXML() );
1628                     item.complete();
1629                     iter.remove();
1630                 }
1631             }
1632             Log.trace( "Finished evaluating stanza for callback completion." );
1633         }
1634 
registerEchoCallback( @onnull final Packet stanza, @Nonnull final CompletableFuture<?> result )1635         public synchronized void registerEchoCallback( @Nonnull final Packet stanza, @Nonnull final CompletableFuture<?> result )
1636         {
1637             if ( stanza.getElement().element(FMUC) == null ) {
1638                 throw new IllegalArgumentException( "Argument 'stanza' must have an FMUC child element." );
1639             }
1640 
1641             Log.trace( "Registering callback to be invoked when peer '{}' echos back stanza {}", getPeer(), stanza.toXML() );
1642             pendingEcho.add( new PendingCallback( stanza, result ) );
1643         }
1644     }
1645 
1646     public static class OutboundJoinProgress implements Serializable
1647     {
1648         private final Logger Log;
1649 
1650         /**
1651          * The address of the remote MUC room with which we are attempting to federate, in which 'our' MUC room takes the role of
1652          * 'joining FMUC node' while the room that federates with us (who's address is recorded in this field) takes the
1653          * role of 'joined FMUC node'.
1654          */
1655         private final JID peer;
1656 
1657         /**
1658          * The future that is awaiting completion of the join operation.
1659          */
1660         private final CompletableFuture<List<Packet>> callback;
1661 
1662         /**
1663          * A list of stanzas that have been sent from the remote room to the local room as part of the 'join' effort.
1664          *
1665          * This list is expected to contain (presence of) each participant, a message history, and a subject stanza.
1666          */
1667         private final ArrayList<Packet> responses;
1668 
1669         /**
1670          * A list of stanzas to be sent to the remote room as soon as federation has been established.
1671          *
1672          * This list is expected to contain stanzas that were shared with the joined room between the instant that a
1673          * federation attempt was started, and was completed.
1674          */
1675         private final ArrayList<QueuedStanza> queue;
1676 
1677         /**
1678          * The state of the federation join. Null means that the request is pending completion. True means a successful
1679          * join was achieved, while false means that the join request failed or was aborted.
1680          */
1681         private Boolean joinResult;
1682 
OutboundJoinProgress( @onnull final JID peer, @Nonnull final CompletableFuture<List<Packet>> callback )1683         public OutboundJoinProgress( @Nonnull final JID peer, @Nonnull final CompletableFuture<List<Packet>> callback )
1684         {
1685             Log = LoggerFactory.getLogger( this.getClass().getName() + ".[peer: " + peer + "]" );
1686             this.peer = peer.asBareJID();
1687             this.callback = callback;
1688             this.responses = new ArrayList<>();
1689             this.queue = new ArrayList<>();
1690         }
1691 
getPeer()1692         public JID getPeer() {
1693             return peer;
1694         }
1695 
getCallback()1696         public synchronized CompletableFuture<List<Packet>> getCallback() {
1697             return callback;
1698         }
1699 
getResponses()1700         public synchronized ArrayList<Packet> getResponses() {
1701             return responses;
1702         }
1703 
addResponse( @onnull final Packet stanza )1704         synchronized void addResponse( @Nonnull final Packet stanza ) {
1705             this.responses.add( stanza );
1706             if (joinResult == null) {
1707                 if ( isSubject( stanza ) ) {
1708                     joinResult = true;
1709                 }
1710                 if ( isFMUCReject( stanza ) ) {
1711                     joinResult = false;
1712                 }
1713             }
1714         }
1715 
1716         /**
1717          * Adds a stanza to be sent to the remote, joined MUC as soon as federation has been established.
1718          *
1719          * This method is intended to be used only when federation is in progress of being established.
1720          *
1721          * @param stanza The stanza to share
1722          * @param sender The author of the stanza
1723          * @return A future, indicating if local distribution of the stanza needs to wait.
1724          */
addToQueue( @onnull final Packet stanza, @Nonnull final MUCRole sender )1725         public synchronized CompletableFuture<?> addToQueue( @Nonnull final Packet stanza, @Nonnull final MUCRole sender ) {
1726             if( isJoinComplete() ) {
1727                 throw new IllegalStateException( "Queueing a stanza is not expected to occur when federation has already been established." );
1728             }
1729 
1730             final CompletableFuture<?> result = new CompletableFuture<>();
1731             if ( callback.isDone() ) {
1732                 result.complete(null);
1733             }
1734 
1735             Log.trace( "Adding stanza (type {}) from '{}' to queue, to be sent to peer as soon as federation has been established.", stanza.getClass().getSimpleName(), sender.getUserAddress() );
1736             queue.add( new QueuedStanza( stanza, sender, result ) );
1737 
1738             return result;
1739         }
1740 
1741         /**
1742          * Retrieve and clean the list of stanzas that have been queued after federation was initiated, but before it
1743          * was finished.
1744          *
1745          * @return A list of queued stanzas (possibly empty).
1746          */
purgeQueue()1747         public synchronized List<QueuedStanza> purgeQueue() {
1748             Log.trace( "Purging queue (size: {}) of stanzas to be sent to peer as soon as federation has been established.", queue.size() );
1749             final List<QueuedStanza> result = new ArrayList<>( queue );
1750             queue.clear();
1751             return result;
1752         }
1753 
isJoinComplete()1754         public synchronized boolean isJoinComplete() {
1755             return joinResult != null;
1756         }
1757 
isRejected()1758         public synchronized boolean isRejected() {
1759             return joinResult != null && !joinResult;
1760         }
1761 
getRejectionMessage()1762         public synchronized String getRejectionMessage()
1763         {
1764             if (!isRejected()) {
1765                 throw new IllegalStateException( "Cannot get rejection message, as rejection did not occur." );
1766             }
1767 
1768             final Packet stanza = this.responses.get( this.responses.size() -1 );
1769             final Element fmuc = stanza.getElement().element(FMUC);
1770             return fmuc.elementText("reject");
1771         }
1772 
isSuccessful()1773         public synchronized boolean isSuccessful() {
1774             return joinResult != null && joinResult;
1775         }
1776 
abort()1777         synchronized void abort()
1778         {
1779             Log.trace( "Aborting federation attempt." );
1780 
1781             joinResult = false;
1782 
1783             // Messages that are queued to be sent after federation has been established might have threads blocking on that delivery. That now will no longer happen. Make sure that we unblock all threads waiting for such an echo.
1784             if ( !queue.isEmpty() )
1785             {
1786                 Log.trace("Completing {} callbacks for queued stanzas might be waiting for federation to be established.", queue.size() );
1787                 for ( final QueuedStanza pendingCallback : queue )
1788                 {
1789                     try {
1790                         pendingCallback.future.complete( null ); // TODO maybe completeExceptionally?
1791                     } catch ( Exception e ) {
1792                         Log.warn("An exception occurred while completing callback for a queued message.", e);
1793                     }
1794                 }
1795             }
1796             callback.completeExceptionally( new IllegalStateException( "Federation with peer " + peer + " has been aborted.") );
1797         }
1798 
1799         static class QueuedStanza {
1800             final Packet stanza;
1801             final MUCRole sender;
1802             final CompletableFuture<?> future;
1803 
QueuedStanza(final Packet stanza, final MUCRole sender, final CompletableFuture<?> future )1804             QueuedStanza(final Packet stanza, final MUCRole sender, final CompletableFuture<?> future ) {
1805                 this.stanza = stanza;
1806                 this.sender = sender;
1807                 this.future = future;
1808             }
1809         }
1810     }
1811 
1812     /**
1813      * Represents a callback waiting for a stanza to be echo'd back from a remote FMUC node.
1814      */
1815     static class PendingCallback {
1816 
1817         final CompletableFuture<?> callback;
1818         final Class<? extends Packet> type;
1819         final JID remoteFMUCNode;
1820         final List<Element> elements;
1821 
PendingCallback( @onnull S original, @Nonnull CompletableFuture<?> callback )1822         public <S extends Packet> PendingCallback( @Nonnull S original, @Nonnull CompletableFuture<?> callback ) {
1823             if (!hasFMUCElement(original)) {
1824                 throw new IllegalArgumentException( "Provided stanza must be a stanza that is sent to a remote FMUC node, but was not (the FMUC child element is missing): " + original );
1825             }
1826             this.type = getType( original );
1827             this.remoteFMUCNode = original.getTo().asBareJID();
1828             this.elements = original.getElement().elements();
1829             this.callback = callback;
1830         }
1831 
complete()1832         private void complete() {
1833             callback.complete( null );
1834         }
1835 
isMatch( @onnull S stanza )1836         public <S extends Packet> boolean isMatch( @Nonnull S stanza )
1837         {
1838             if (!hasFMUCElement(stanza)) {
1839                 throw new IllegalArgumentException( "Provided stanza must be a stanza that was sent by remote FMUC node, but was not (the FMUC child element is missing): " + stanza );
1840             }
1841 
1842             if (!stanza.getClass().equals(type)) {
1843                 return false;
1844             }
1845 
1846             if (!stanza.getFrom().asBareJID().equals( remoteFMUCNode )) {
1847                 return false;
1848             }
1849 
1850             // All child elements of the echo'd stanza must equal the original.
1851             return elements.equals( stanza.getElement().elements() );
1852         }
1853 
hasFMUCElement( @onnull Packet stanza )1854         protected static boolean hasFMUCElement( @Nonnull Packet stanza ) {
1855             return stanza.getElement().element(FMUC) != null;
1856         }
1857 
getType( @onnull Packet stanza )1858         protected static Class<? extends Packet> getType( @Nonnull Packet stanza ) {
1859             return stanza.getClass();
1860         }
1861     }
1862 }
1863