1 /*
2  * Copyright (C) 2021-2022 Ignite Realtime Community. All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.jivesoftware.openfire.muc.spi;
17 
18 import org.jivesoftware.openfire.XMPPServer;
19 import org.jivesoftware.openfire.cluster.NodeID;
20 import org.jivesoftware.openfire.muc.MUCEventListener;
21 import org.jivesoftware.openfire.muc.MUCRole;
22 import org.jivesoftware.openfire.muc.MUCRoom;
23 import org.jivesoftware.openfire.muc.MultiUserChatService;
24 import org.jivesoftware.openfire.muc.cluster.OccupantAddedTask;
25 import org.jivesoftware.openfire.muc.cluster.OccupantKickedForNicknameTask;
26 import org.jivesoftware.openfire.muc.cluster.OccupantRemovedTask;
27 import org.jivesoftware.openfire.muc.cluster.OccupantUpdatedTask;
28 import org.jivesoftware.openfire.muc.cluster.SyncLocalOccupantsAndSendJoinPresenceTask;
29 import org.jivesoftware.util.SystemProperty;
30 import org.jivesoftware.util.TaskEngine;
31 import org.jivesoftware.util.cache.CacheFactory;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34 import org.xmpp.packet.JID;
35 import org.xmpp.packet.Message;
36 
37 import javax.annotation.Nonnull;
38 import javax.annotation.Nullable;
39 import java.time.Instant;
40 import java.time.LocalTime;
41 import java.time.format.DateTimeFormatter;
42 import java.util.*;
43 import java.util.concurrent.ConcurrentHashMap;
44 import java.util.concurrent.ConcurrentMap;
45 import java.util.stream.Collectors;
46 
47 /**
48  * Maintains an in-memory inventory of what XMPP entity (user) is in what chatroom, across the entire XMPP cluster.
49  *
50  * Each instance of this class takes responsibility of maintaining the in-memory representation of occupants of rooms
51  * for exactly one instance of {@link org.jivesoftware.openfire.muc.MultiUserChatService}.
52  *
53  * Some data duplication exists between the data managed by this class, and the data that is managed by the collection
54  * of MUCRoom instances that are part of the same MUC service. The MUCRoom managed data is shared across the cluster
55  * using a clustered data structure (a clustered {@link org.jivesoftware.util.cache.Cache}). The content of such caches
56  * cannot survive certain events related to changes in the composition of the cluster (the local server joining or
57  * leaving the cluster). This introduces problems, as, for example, the occupants that are connected to the local server
58  * should see occupants connected to a cluster node that is now unavailable 'leave' the chatroom. This requires the
59  * local cluster node to retain knowledge, even after the clustered cache has been reset. This implementation therefore
60  * by design makes no use of such clustered caches to exchange data with other cluster nodes. Instead, this
61  * implementation relies on cluster tasks to share data between cluster nodes. To minimize data transfer and data
62  * duplication, the data managed by this implementation is kept to the bare minimum needed to perform post-cluster event
63  * maintenance.
64  *
65  * Apart from the responsibility of maintaining data for post-cluster event maintenance, this implementation adds some
66  * conveniences, that include:
67  * <ul>
68  * <li>A method (that operates on the maintained data) to determine what room names a particular user is in. As this
69  *     implementation already maintains this data, obtaining it from this class is more convenient and more performant
70  *     than obtaining it from the data that is maintained in the clustered data structure (as that is mapped 'by room')</li>
 * <li>A 'last activity' timestamp for users on the local node (to be used to detect idle users)</li>
72  * </ul>
73  *
74  * @author Guus der Kinderen, guus.der.kinderen@gmail.com
75  * @see <a href="https://igniterealtime.atlassian.net/browse/OF-2224">OF-2224</a>
76  */
77 public class OccupantManager implements MUCEventListener
78 {
    private static final Logger Log = LoggerFactory.getLogger(OccupantManager.class);

    /**
     * Controls how the event handlers of this class send their cluster tasks to the other cluster nodes: when the
     * value is true, {@code CacheFactory.doClusterTask} is used, otherwise
     * {@code CacheFactory.doSynchronousClusterTask} is used.
     */
    public static final SystemProperty<Boolean> PROPERTY_USE_NONBLOCKING_CLUSTERTASKS = SystemProperty.Builder.ofType(Boolean.class)
        .setKey("xmpp.muc.occupants.clustertask.non-blocking")
        .setDynamic(true)
        .setDefaultValue(false)
        .build();

    /**
     * Name of the MUC service that this instance is operating for.
     */
    @Nonnull
    private final String serviceName;

    /**
     * The (domain-part of the) JID of the MUC service that this instance is operating for.
     */
    @Nonnull
    private final String serviceDomain;

    /**
     * Lookup table for finding occupants by node.
     */
    @Nonnull
    private final ConcurrentMap<NodeID, Set<Occupant>> occupantsByNode = new ConcurrentHashMap<>();

    /**
     * Lookup table for finding nodes by occupant.
     */
    @Nonnull
    private final ConcurrentMap<Occupant, Set<NodeID>> nodesByOccupant = new ConcurrentHashMap<>();
110 
111     /**
112      * Creates a new instance, specific for the provided MUC service.
113      *
114      * @param service The service for which the new instance will be operating.
115      */
OccupantManager(@onnull final MultiUserChatService service)116     OccupantManager(@Nonnull final MultiUserChatService service)
117     {
118         this.serviceName = service.getServiceName();
119         this.serviceDomain = service.getServiceDomain();
120         Log.debug("Instantiating for service '{}'", serviceName);
121     }
122 
123     /**
124      * Registers disappearance of an existing occupant, and/or appearance of a new occupant, on a specific node.
125      *
126      * This method maintains the three different occupant lookup tables, and keeps them in sync.
127      *
128      * @param oldOccupant An occupant that is to be removed from the registration of the referred node (nullable)
129      * @param newOccupant An occupant that is to be added to the registration of the referred node (nullable)
130      * @param nodeIDToReplaceOccupantFor The id of the node that the old/new occupant need to be (de)registered under. If null then the occupant is (de)registered for each node.
131      */
replaceOccupant(Occupant oldOccupant, Occupant newOccupant, NodeID nodeIDToReplaceOccupantFor)132     private void replaceOccupant(Occupant oldOccupant, Occupant newOccupant, NodeID nodeIDToReplaceOccupantFor) {
133 
134         Set<NodeID> nodeIDsToReplaceOccupantFor = new HashSet<>();
135         if (nodeIDToReplaceOccupantFor == null) {
136             nodeIDsToReplaceOccupantFor = occupantsByNode.keySet(); // all node ids
137         } else {
138             nodeIDsToReplaceOccupantFor.add(nodeIDToReplaceOccupantFor); // just the one
139         }
140 
141         for (NodeID nodeID : nodeIDsToReplaceOccupantFor) {
142             synchronized (nodeID) {
143                 // Step 1: remove old occupant, if there is any
144                 deleteOccupantFromNode(oldOccupant, nodeID);
145 
146                 // Step 2: add new occupant, if there is any
147                 if (newOccupant != null) {
148                     occupantsByNode.computeIfAbsent(nodeID, (n) -> new HashSet<>()).add(newOccupant);
149                     nodesByOccupant.computeIfAbsent(newOccupant, (n) -> new HashSet<>()).add(nodeID);
150                 }
151 
152                 Log.debug("Replaced occupant {} with {} for node {}", oldOccupant, newOccupant, nodeID);
153             }
154         }
155 
156         Log.debug("Occupants remaining after replace: {}", nodesByOccupant);
157     }
158 
deleteOccupantFromNode(Occupant oldOccupant, NodeID nodeID)159     private void deleteOccupantFromNode(Occupant oldOccupant, NodeID nodeID) {
160         if (oldOccupant != null) {
161             if (occupantsByNode.containsKey(nodeID)) {
162                 occupantsByNode.get(nodeID).remove(oldOccupant);
163                 if (occupantsByNode.get(nodeID).isEmpty()) {
164                     // Clean up, don't leave behind empty set
165                     occupantsByNode.remove(nodeID);
166                 }
167             }
168             if (nodesByOccupant.containsKey(oldOccupant)) {
169                 nodesByOccupant.get(oldOccupant).remove(nodeID);
170                 if (nodesByOccupant.get(oldOccupant).isEmpty()) {
171                     // Clean up, don't leave behind empty set
172                     nodesByOccupant.remove(oldOccupant);
173                 }
174             }
175 
176             // When an occupant is being pinged, but removed from the node, cancel the ping.
177             final TimerTask pendingPingTask = oldOccupant.getPendingPingTask();
178             if (pendingPingTask != null) {
179                 Log.debug("Remove pending ping task for {} that is being deleted.", oldOccupant);
180                 TaskEngine.getInstance().cancelScheduledTask(pendingPingTask);
181                 oldOccupant.setPendingPingTask(null);
182             }
183         }
184     }
185 
186     /**
187      * Verifies that a JID relates to the service for which this instance is operating, by comparing its domain part.
188      *
189      * @param jid The JID to check
190      * @return True if the JID relates to the service, otherwise false.
191      */
isForThisService(@onnull final JID jid)192     public boolean isForThisService(@Nonnull final JID jid) {
193         return jid.getDomain().equals(serviceDomain);
194     }
195 
196     /**
197      * When an XMPP entity / user is registered as a new occupant of a room, this event handler will ensure that this
198      * instance of OccupantManager, as well as all instances for the same server on every other cluster node, registers
199      * the relevant data that is needed to perform post-cluster event maintenance.
200      *
201      * @param roomJID the JID of the room where the occupant has joined.
202      * @param userJID The 'real' JID (not room-at-service-slash-nickname) of the XMPP user that joined.
203      * @param nickname nickname of the user in the room.
204      */
205     @Override
occupantJoined(JID roomJID, JID userJID, String nickname)206     public void occupantJoined(JID roomJID, JID userJID, String nickname)
207     {
208         if (!isForThisService(roomJID)) {
209             return;
210         }
211 
212         final OccupantAddedTask task = registerOccupantJoinedLocally(roomJID, userJID, nickname);
213 
214         // On all other cluster nodes
215         if (PROPERTY_USE_NONBLOCKING_CLUSTERTASKS.getValue()) {
216             CacheFactory.doClusterTask(task);
217         } else {
218             CacheFactory.doSynchronousClusterTask(task, false);
219         }
220     }
221 
registerOccupantJoinedLocally(JID roomJID, JID userJID, String nickname)222     public OccupantAddedTask registerOccupantJoinedLocally(JID roomJID, JID userJID, String nickname) {
223         Log.debug("New local occupancy in room '{}' of service '{}': entity '{}' using nickname '{}'", roomJID.getNode(), serviceName, userJID, nickname);
224 
225         final OccupantAddedTask task = new OccupantAddedTask(serviceName, roomJID.getNode(), nickname, userJID, XMPPServer.getInstance().getNodeID());
226         process(task); // On this cluster node.
227 
228         return task;
229     }
230 
231     /**
232      * When an XMPP entity / user that is an occupant of a room changes its nickname, this event handler will ensure that
233      * the relevant data that is maintained in this instance of OccupantManager, as well as all instances for the same
234      * server on every other cluster node, is updated.
235      *
236      * @param roomJID the JID of the room where the occupant has joined.
237      * @param userJID The 'real' JID (not room-at-service-slash-nickname) of the XMPP user that joined.
238      * @param oldNickname nickname of the user in the room prior to the change.
239      * @param newNickname nickname of the user in the room after the change.
240      */
241     @Override
nicknameChanged(JID roomJID, JID userJID, String oldNickname, String newNickname)242     public void nicknameChanged(JID roomJID, JID userJID, String oldNickname, String newNickname)
243     {
244         if (!isForThisService(roomJID)) {
245             return;
246         }
247 
248         Log.debug("Updated local occupancy in room '{}' of service '{}': entity '{}' now nickname '{}' (was: '{}')", roomJID.getNode(), serviceName, newNickname, oldNickname, userJID);
249         final OccupantUpdatedTask task = new OccupantUpdatedTask(serviceName, roomJID.getNode(), oldNickname, newNickname, userJID, XMPPServer.getInstance().getNodeID());
250         process(task); // On this cluster node.
251 
252         // On all other cluster nodes
253         if (PROPERTY_USE_NONBLOCKING_CLUSTERTASKS.getValue()) {
254             CacheFactory.doClusterTask(task);
255         } else {
256             CacheFactory.doSynchronousClusterTask(task, false);
257         }
258     }
259 
260     /**
261      * When an XMPP entity / user is removed as an occupant of a room, this event handler will ensure that this
262      * instance of OccupantManager, as well as all instances for the same server on every other cluster node, updates
263      * the relevant data that it maintains to perform post-cluster event maintenance.
264      *
265      * @param roomJID the JID of the room where the occupant has left.
266      * @param userJID The 'real' JID (not room-at-service-slash-nickname) of the XMPP user that left.
267      * @param nickname nickname that the user used in the room.
268      */
269     @Override
occupantLeft(JID roomJID, JID userJID, String nickname)270     public void occupantLeft(JID roomJID, JID userJID, String nickname)
271     {
272         if (!isForThisService(roomJID)) {
273             return;
274         }
275 
276         Log.debug("Removed local occupancy in room '{}' of service '{}': entity '{}' using nickname '{}'", roomJID.getNode(), serviceName, userJID, nickname);
277         final OccupantRemovedTask task = new OccupantRemovedTask(serviceName, roomJID.getNode(), nickname, userJID, XMPPServer.getInstance().getNodeID());
278         process(task); // On this cluster node.
279 
280         // On all other cluster nodes
281         if (PROPERTY_USE_NONBLOCKING_CLUSTERTASKS.getValue()) {
282             CacheFactory.doClusterTask(task);
283         } else {
284             CacheFactory.doSynchronousClusterTask(task, false);
285         }
286     }
287 
288     /**
289      * When an XMPP entity / user is kicked out of a room because of nickname collision, this event handler will ensure
290      * that this instance of OccupantManager, as well as all instances for the same server on every other cluster node,
291      * updates the relevant data that it maintains to perform post-cluster event maintenance.
292      *
293      * @param roomJID the JID of the room where the occupant is kicked out.
294      * @param nickname nickname that the user used in the room.
295      */
296     @Override
occupantNickKicked(JID roomJID, String nickname)297     public void occupantNickKicked(JID roomJID, String nickname) {
298         Log.debug("Informing nodes about kicking occupant with nick {} from room {} of service {}", nickname, roomJID.getNode(), serviceName);
299         final OccupantKickedForNicknameTask task = new OccupantKickedForNicknameTask(serviceName, roomJID.getNode(), nickname, XMPPServer.getInstance().getNodeID());
300         process(task); // On this cluster node.
301 
302         // On all other cluster nodes
303         if (PROPERTY_USE_NONBLOCKING_CLUSTERTASKS.getValue()) {
304             CacheFactory.doClusterTask(task);
305         } else {
306             CacheFactory.doSynchronousClusterTask(task, false);
307         }
308     }
309 
310     /**
311      * Updates the data maintained by this instance to perform post-cluster event maintenance, based on the data from
312      * the clustered task.
313      *
314      * @param task Cluster task that informs of a new occupant
315      */
process(@onnull final OccupantAddedTask task)316     public void process(@Nonnull final OccupantAddedTask task)
317     {
318 //        LocalTime start = LocalTime.now();
319         final Occupant newOccupant = new Occupant(task.getRoomName(), task.getNickname(), task.getRealJID());
320         replaceOccupant(null, newOccupant, task.getOriginator());
321 
322 //        logOccupantData("A new occupant was added on node " + task.getOriginator(), start, null);
323     }
324 
325     /**
326      * Updates the data maintained by this instance to perform post-cluster event maintenance, based on the data from
327      * the clustered task.
328      *
329      * @param task Cluster task that informs of an update for an existing occupant
330      */
process(@onnull final OccupantUpdatedTask task)331     public void process(@Nonnull final OccupantUpdatedTask task)
332     {
333 //        LocalTime start = LocalTime.now();
334         final Occupant oldOccupant = new Occupant(task.getRoomName(), task.getOldNickname(), task.getRealJID());
335         final Occupant newOccupant = new Occupant(task.getRoomName(), task.getNewNickname(), task.getRealJID());
336         replaceOccupant(oldOccupant, newOccupant, task.getOriginator());
337 
338 //        logOccupantData("An occupant was updated on node " + task.getOriginator(), start, null);
339     }
340 
341     /**
342      * Updates the data maintained by this instance to perform post-cluster event maintenance, based on the data from
343      * the clustered task.
344      *
345      * @param task Cluster task that informs of a removed occupant
346      */
process(@onnull final OccupantRemovedTask task)347     public void process(@Nonnull final OccupantRemovedTask task)
348     {
349         Log.debug("Processing task to remove occupant {} - {}", task.getRealJID(), task.getNickname());
350 
351 //        LocalTime start = LocalTime.now();
352         final Occupant oldOccupant = new Occupant(task.getRoomName(), task.getNickname(), task.getRealJID());
353         replaceOccupant(oldOccupant, null, task.getOriginator());
354 
355         Log.debug("Done processing task to remove occupant {} - {}", task.getRealJID(), task.getNickname());
356 //        logOccupantData("An occupant was removed on node " + task.getOriginator(), start, null);
357     }
358 
359     /**
360      * Updates the data maintained by this instance to perform post-cluster event maintenance, based on the data from
361      * the clustered task.
362      *
363      * @param task Cluster task that informs of an occupant nickname that has been kicked out of a room
364      */
process(@onnull final OccupantKickedForNicknameTask task)365     public void process(@Nonnull final OccupantKickedForNicknameTask task)
366     {
367         Log.debug("Processing task to remove everyone with nick {} from room {}", task.getNickname(), task.getRoomName());
368 
369 //        logOccupantData("Almost processing task to remove everyone with nick " + task.getNickname(), LocalTime.now(), null);
370         final Set<Occupant> occupantsToKick = occupantsByNode.values().stream()
371             .flatMap(Collection::stream)
372             .filter(o -> o.getNickname().equals(task.getNickname()))
373             .filter(o -> o.getRoomName().equals(task.getRoomName()))
374             .collect(Collectors.toSet());
375 
376         occupantsToKick.forEach(o -> replaceOccupant(o, null, null));
377 
378         Log.debug("Done processing task to remove everyone with nick {}", task.getNickname());
379     }
380 
381     /**
382      * Used by other nodes telling us about all of their occupants.
383      *
384      * @param task Cluster task that informs of occupants on a remote node.
385      */
process(@onnull final SyncLocalOccupantsAndSendJoinPresenceTask task)386     public void process(@Nonnull final SyncLocalOccupantsAndSendJoinPresenceTask task) {
387 
388         Set<Occupant> oldOccupants = occupantsByNode.get(task.getOriginator());
389 
390         Log.debug("We received a copy of {} local MUC occupants from node {}. We already had {} occupants in local registration for that node.", task.getOccupants().size(), task.getOriginator(), oldOccupants == null ? 0 : oldOccupants.size());
391 
392         if (oldOccupants != null) {
393             // Use defensive copy to prevent concurrent modification exceptions.
394             oldOccupants = new HashSet<>(oldOccupants);
395         }
396         if (oldOccupants != null) {
397             Log.debug("Removing {} old occupants", oldOccupants.size());
398             oldOccupants.forEach(oldOccupant -> replaceOccupant(oldOccupant, null, task.getOriginator()));
399         }
400         if (task.getOccupants() != null) {
401             Log.debug("Adding {} new occupants", task.getOccupants().size());
402             task.getOccupants().forEach(newOccupant -> replaceOccupant(null, newOccupant, task.getOriginator()));
403         }
404         if (oldOccupants != null) {
405             if (oldOccupants.equals(task.getOccupants())) {
406                 Log.info("We received a copy of local MUC occupants from node {}, but we already had this information. This hints at a possible inefficient sharing of data across the cluster.", task.getOriginator());
407             } else {
408                 Log.warn("We received a copy of local MUC occupants from node {}, but we already had occupants for this node. However, the new data is different from the old data!", task.getOriginator());
409             }
410         }
411 
412         final Set<Occupant> occupantsAfterSync = occupantsByNode.get(task.getOriginator());
413         Log.debug("After processing the sync occupants from node {}, we now have {} occupants in local registration for that node.", task.getOriginator(), occupantsAfterSync == null ? 0 : occupantsAfterSync.size());
414     }
415 
416     /**
417      * Returns the name of all the rooms that a particular XMPP entity (user) is currently an occupant of.
418      *
419      * @param realJID The XMPP address of a user
420      * @return All room names that have the user as an occupant.
421      */
422     @Nonnull
roomNamesForAddress(@onnull final JID realJID)423     public Set<String> roomNamesForAddress(@Nonnull final JID realJID) {
424         return nodesByOccupant.keySet().stream()
425             .filter(occupant -> realJID.equals(occupant.getRealJID()))
426             .map(occupant -> occupant.roomName)
427             .collect(Collectors.toSet());
428     }
429 
430     /**
431      * Returns data that is maintained for occupants of the local cluster node.
432      *
433      * @return all data maintained for the local cluster node.
434      */
435     @Nonnull
getLocalOccupants()436     public Set<Occupant> getLocalOccupants()
437     {
438         return occupantsByNode.getOrDefault(XMPPServer.getInstance().getNodeID(), Collections.emptySet());
439     }
440 
441     /**
442      * Registers activity for a particular user that is assumed to be connected to the local cluster node. This records
443      * a timestamp, that can be used to detect idle-ness.
444      *
445      * @param userJid The address of the user for which to record activity.
446      */
registerActivity(@onnull final JID userJid)447     public void registerActivity(@Nonnull final JID userJid) {
448 
449         // Only tracking it for the local cluster node, as those are the only users for which this node will monitor activity anyway
450         getLocalOccupants().stream()
451             .filter(occupant -> userJid.equals(occupant.getRealJID()))
452             .forEach(o -> o.setLastActive(Instant.now()));
453     }
454 
455     /**
456      * Returns the most recent activity for a particular user that is assumed to be connected to the local cluster node.
457      * This returns a timestamp, that can be used to detect idle-ness.
458      *
459      * @param userJid The address of the user for which to return a timestamp of last activity.
460      * @return A timestamp, or null when there currently is no occupant by that JID on the local node.
461      */
462     @Nullable
lastActivityOnLocalNode(@onnull final JID userJid)463     public Instant lastActivityOnLocalNode(@Nonnull final JID userJid) {
464         return getLocalOccupants().stream()
465             .filter(occupant -> userJid.equals(occupant.getRealJID()))
466             .map(Occupant::getLastActive)
467             .max(java.util.Comparator.naturalOrder())
468             .orElse(null);
469     }
470 
471     /**
472      * Counts all users that are in at least one room.
473      *
474      * @return a user count
475      */
numberOfUniqueUsers()476     public int numberOfUniqueUsers() {
477         return nodesByOccupant.size();
478     }
479 
480     /**
481      * Checks whether the occupant exists, optionally excluding a specific node from evaluation.
482      * @param occupant
483      * @param exclude
484      * @return
485      */
exists(@onnull final Occupant occupant, @Nullable final NodeID exclude)486     public boolean exists(@Nonnull final Occupant occupant, @Nullable final NodeID exclude) {
487         return nodesByOccupant.getOrDefault(occupant, new HashSet<>()).stream()
488             .anyMatch(nodeID -> !nodeID.equals(exclude));
489     }
490 
    /**
     * Checks whether the occupant is registered in the 'nodes by occupant' lookup table.
     *
     * @param occupant The occupant to look for.
     * @return true if the lookup table contains the occupant, otherwise false.
     */
    public boolean exists(@Nonnull final Occupant occupant) {
        return nodesByOccupant.containsKey(occupant);
    }
494 
495     @Nonnull
occupantsForRoomByNode(@onnull final String roomName, @Nonnull final NodeID nodeID)496     public Set<Occupant> occupantsForRoomByNode(@Nonnull final String roomName, @Nonnull final NodeID nodeID) {
497         return occupantsByNode.getOrDefault(nodeID, Collections.emptySet()).stream()
498             .filter(occupant -> occupant.getRoomName().equals(roomName))
499             .collect(Collectors.toSet());
500     }
501 
502     @Nonnull
occupantsForRoomExceptForNode(@onnull final String roomName, @Nonnull final NodeID nodeID)503     public Set<Occupant> occupantsForRoomExceptForNode(@Nonnull final String roomName, @Nonnull final NodeID nodeID) {
504         return occupantsByNode.entrySet().stream().filter(e -> !e.getKey().equals(nodeID))
505             .map(Map.Entry::getValue)
506             .flatMap(Collection::stream)
507             .filter(occupant -> occupant.getRoomName().equals(roomName))
508             .collect(Collectors.toSet());
509     }
510 
511     /**
512      * Removes and returns all data that was maintained for a particular cluster node. It is assumed that this method
513      * is used in reaction to that cluster node having left the cluster.
514      *
515      * @param nodeID Identifier of the cluster node that left.
516      * @return All data that this instance maintained for the cluster node.
517      */
518     @Nonnull
leftCluster(@onnull final NodeID nodeID)519     public Set<Occupant> leftCluster(@Nonnull final NodeID nodeID) {
520         Set<Occupant> occupantsBeingRemoved = occupantsByNode.getOrDefault(nodeID, new HashSet<>());
521 
522         // Defensive copy to prevent modifying the returned set
523         Set<Occupant> returnValue = new HashSet<>(occupantsBeingRemoved);
524 
525         returnValue.forEach(o -> replaceOccupant(o, null, nodeID));
526 
527         return returnValue;
528     }
529 
530     /**
531      * Removes and returns all data that was maintained for cluster nodes other than the local node. It is assumed that
532      * this method is used in reaction to the local cluster node having left the cluster.
533      *
534      * @return All data that this instance maintained for all cluster nodes except the local one.
535      */
536     @Nullable
leftCluster()537     public Set<Occupant> leftCluster() {
538 
539         final NodeID ownNodeID = XMPPServer.getInstance().getNodeID();
540 
541         synchronized (ownNodeID) {
542             final Set<Occupant> occupantsLeftOnThisNode = occupantsByNode.getOrDefault(ownNodeID, new HashSet<>());
543 
544 
545             final Set<Occupant> occupantsRemoved = occupantsByNode.entrySet().stream()
546                 .filter(e -> !e.getKey().equals(ownNodeID))
547                 .flatMap(e -> e.getValue().stream())
548                 .filter(o -> !occupantsLeftOnThisNode.contains(o))
549                 .collect(Collectors.toSet());
550 
551             // Now actually remove what needs to be removed
552             // TODO Somehow perform a lock or synchronize so no other thread can access the lookup tables while we are reorganising
553             occupantsByNode.entrySet().removeIf(e -> !e.getKey().equals(ownNodeID));
554 
555 
556             nodesByOccupant.clear();
557             occupantsLeftOnThisNode.forEach(o -> nodesByOccupant.computeIfAbsent(o, (n) -> new HashSet<>()).add(ownNodeID));
558 
559             Log.debug("Reset occupants because we left the cluster");
560 
561             return occupantsRemoved;
562         }
563     }
564 
    /**
     * Returns an unmodifiable view of the lookup table that maps node ids to the occupants registered for that node.
     * Only the map itself is wrapped: the sets that are its values are the sets stored in the lookup table.
     *
     * @return A read-only view of the 'occupants by node' lookup table.
     */
    @Nonnull
    public Map<NodeID, Set<Occupant>> getOccupantsByNode() {
        return Collections.unmodifiableMap(occupantsByNode);
    }
569 
    /**
     * Returns an unmodifiable view of the lookup table that maps occupants to the ids of the nodes that they are
     * registered for. Only the map itself is wrapped: the sets that are its values are the sets stored in the lookup
     * table.
     *
     * @return A read-only view of the 'nodes by occupant' lookup table.
     */
    @Nonnull
    public Map<Occupant, Set<NodeID>> getNodesByOccupant() {
        return Collections.unmodifiableMap(nodesByOccupant);
    }
574 
    /**
     * Event handler for the creation of a room. This implementation intentionally does nothing.
     *
     * @param roomJID the JID of the room that was created.
     */
    @Override
    public void roomCreated(JID roomJID) {
        // Not used.
    }
579 
580     @Override
roomDestroyed(JID roomJID)581     public void roomDestroyed(JID roomJID) {
582         // When a room is destroyed, remove all registered occupants for that room.
583         getNodesByOccupant().entrySet().stream()
584             .filter(entry -> entry.getKey().getRoomName().equals(roomJID.getNode()))
585             .forEach(entry -> entry.getValue().forEach(nodeID -> replaceOccupant(entry.getKey(), null, nodeID)));
586     }
587 
    /**
     * Event handler for a message that was received in a room. This implementation intentionally does nothing.
     *
     * @param roomJID the JID of the room.
     * @param user the JID of the user that is related to the message.
     * @param nickname the nickname that is related to the message.
     * @param message the message.
     */
    @Override
    public void messageReceived(JID roomJID, JID user, String nickname, Message message) {
        // Not used.
    }
592 
    /**
     * Event handler for a private message. This implementation intentionally does nothing.
     *
     * @param toJID the JID that the message is addressed to.
     * @param fromJID the JID that the message originates from.
     * @param message the message.
     */
    @Override
    public void privateMessageRecieved(JID toJID, JID fromJID, Message message) {
        // Not used.
    }
597 
    /**
     * Event handler for a change of the subject of a room. This implementation intentionally does nothing.
     *
     * @param roomJID the JID of the room.
     * @param user the JID of the user that is related to the change.
     * @param newSubject the new subject of the room.
     */
    @Override
    public void roomSubjectChanged(JID roomJID, JID user, String newSubject) {
        // Not used.
    }
602 
603     /**
604      * Representation of a user that is an occupant of a chatroom.
605      */
606     public static class Occupant {
        // Name of the room that is occupied.
        String roomName;
        // Nickname that the occupant uses in the room.
        String nickname;
        // The 'real' address of the occupant.
        JID realJID;
        // Initialized to the moment of construction.
        Instant lastActive; // Only used on the local cluster node.
        // Set to 'now' whenever a (non-null) pending ping task is registered.
        Instant lastPingRequest; // Only used on the local cluster node.
        // Null when no ping task is pending.
        TimerTask pendingPingTask; // Only used on the local cluster node.
613 
Occupant(String roomName, String nickname, JID realJID)614         public Occupant(String roomName, String nickname, JID realJID) {
615             this.roomName = roomName;
616             this.nickname = nickname;
617             this.realJID = realJID;
618             this.lastActive = Instant.now();
619             this.lastPingRequest = null;
620             this.pendingPingTask = null;
621         }
622 
getRoomName()623         public String getRoomName() {
624             return roomName;
625         }
626 
setRoomName(String roomName)627         public void setRoomName(String roomName) {
628             this.roomName = roomName;
629         }
630 
getNickname()631         public String getNickname() {
632             return nickname;
633         }
634 
setNickname(String nickname)635         public void setNickname(String nickname) {
636             this.nickname = nickname;
637         }
638 
getRealJID()639         public JID getRealJID() {
640             return realJID;
641         }
642 
setRealJID(JID realJID)643         public void setRealJID(JID realJID) {
644             this.realJID = realJID;
645         }
646 
getLastActive()647         public Instant getLastActive() {
648             return lastActive;
649         }
650 
setLastActive(Instant lastActive)651         public void setLastActive(Instant lastActive) {
652             this.lastActive = lastActive;
653         }
654 
655         @Nullable
getLastPingRequest()656         public Instant getLastPingRequest() {
657             return lastPingRequest;
658         }
659 
660         @Nullable
getPendingPingTask()661         public TimerTask getPendingPingTask() {
662             return pendingPingTask;
663         }
664 
setPendingPingTask(@ullable TimerTask pendingPingTask)665         public void setPendingPingTask(@Nullable TimerTask pendingPingTask) {
666             this.pendingPingTask = pendingPingTask;
667             if (pendingPingTask != null) {
668                 this.lastPingRequest = Instant.now();
669             }
670         }
671 
672         @Override
equals(Object o)673         public boolean equals(Object o) {
674             if (this == o) return true;
675             if (o == null || getClass() != o.getClass()) return false;
676             Occupant that = (Occupant) o;
677             return Objects.equals(roomName, that.roomName) && Objects.equals(nickname, that.nickname) && Objects.equals(realJID, that.realJID);
678         }
679 
680         @Override
hashCode()681         public int hashCode() {
682             return Objects.hash(roomName, nickname, realJID);
683         }
684 
685         @Override
toString()686         public String toString() {
687             return "Occupant " +
688                 "'" + nickname + '\'' +
689                 " of room '" + roomName + '\'' +
690                 " (real JID '" + realJID +
691                 "', last active " + lastActive +
692                 ")";
693         }
694     }
695 
696     /**
697      * Logs key data about rooms and occupants.
698      * Use only when needed, it clouds up the log.
699      * @param reasonForLogging Will be logged to provide context for why this is being logged.
700      * @param start Start time of the operation for which this log is created.
701      * @param roomCache The room cache.
702      */
logOccupantData(String reasonForLogging, LocalTime start, Map<String, MUCRoom> roomCache)703     public void logOccupantData(String reasonForLogging, LocalTime start, Map<String, MUCRoom> roomCache) {
704         LocalTime end = LocalTime.now();
705         StackTraceElement[] elements = Thread.currentThread().getStackTrace();
706         List<String> callStack = Arrays.stream(elements).skip(2L).limit(10L).map(StackTraceElement::toString).collect(Collectors.toList());
707         Log.debug("================== OCCUPANT MANAGER DATA ==================");
708         Log.debug("===                         TIME                        ===");
709         Log.debug(
710             "=                   {} - {}                   =",
711             start.withNano(0).format(DateTimeFormatter.ISO_LOCAL_TIME),
712             end.withNano(0).format(DateTimeFormatter.ISO_LOCAL_TIME)
713         );
714         Log.debug("===                        REASON                       ===");
715         Log.debug("= {}", reasonForLogging);
716         Log.debug("===                  OCCUPANTS BY NODE                  ===");
717         Log.debug("=                        COUNT: {}", occupantsByNode.size());
718         occupantsByNode.forEach((key, value) -> {
719             Log.debug("= {}", key);
720             value.forEach(occupant -> Log.debug("= - {}", occupant));
721         });
722         Log.debug("===                  NODES BY OCCUPANT                  ===");
723         Log.debug("=                        COUNT: {}", nodesByOccupant.size());
724         nodesByOccupant.forEach((key, value) -> {
725             Log.debug("= {}", key);
726             value.forEach(nodeID -> Log.debug("= - {}", nodeID));
727         });
728         Log.debug("===                  ROOM CACHE CONTENTS                ===");
729         if (roomCache == null) {
730             Log.debug("===                     NOT SPECIFIED                   ===");
731         } else {
732             roomCache
733                 .values()
734                 .forEach(room -> Log.debug("= " + room.getName() + " ---> " + room
735                     .getOccupants()
736                     .stream()
737                     .map(MUCRole::getNickname)
738                     .collect(Collectors.joining("/"))
739                 ));
740         }
741         Log.debug("===                  TOP OF CALL STACK                  ===");
742         callStack.forEach(se -> Log.debug("= {}", se));
743         Log.debug("================ END OCCUPANT MANAGER DATA ================");
744     }
745 }
746