1 /* 2 * Copyright (C) 2021-2022 Ignite Realtime Community. All rights reserved. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 package org.jivesoftware.openfire.muc.spi; 17 18 import org.jivesoftware.openfire.XMPPServer; 19 import org.jivesoftware.openfire.cluster.NodeID; 20 import org.jivesoftware.openfire.muc.MUCEventListener; 21 import org.jivesoftware.openfire.muc.MUCRole; 22 import org.jivesoftware.openfire.muc.MUCRoom; 23 import org.jivesoftware.openfire.muc.MultiUserChatService; 24 import org.jivesoftware.openfire.muc.cluster.OccupantAddedTask; 25 import org.jivesoftware.openfire.muc.cluster.OccupantKickedForNicknameTask; 26 import org.jivesoftware.openfire.muc.cluster.OccupantRemovedTask; 27 import org.jivesoftware.openfire.muc.cluster.OccupantUpdatedTask; 28 import org.jivesoftware.openfire.muc.cluster.SyncLocalOccupantsAndSendJoinPresenceTask; 29 import org.jivesoftware.util.SystemProperty; 30 import org.jivesoftware.util.TaskEngine; 31 import org.jivesoftware.util.cache.CacheFactory; 32 import org.slf4j.Logger; 33 import org.slf4j.LoggerFactory; 34 import org.xmpp.packet.JID; 35 import org.xmpp.packet.Message; 36 37 import javax.annotation.Nonnull; 38 import javax.annotation.Nullable; 39 import java.time.Instant; 40 import java.time.LocalTime; 41 import java.time.format.DateTimeFormatter; 42 import java.util.*; 43 import java.util.concurrent.ConcurrentHashMap; 44 import java.util.concurrent.ConcurrentMap; 45 import java.util.stream.Collectors; 46 47 /** 48 * Maintains an in-memory inventory of what XMPP entity (user) is in what chatroom, across the entire XMPP cluster. 49 * 50 * Each instance of this class takes responsibility of maintaining the in-memory representation of occupants of rooms 51 * for exactly one instance of {@link org.jivesoftware.openfire.muc.MultiUserChatService}. 52 * 53 * Some data duplication exists between the data managed by this class, and the data that is managed by the collection 54 * of MUCRoom instances that are part of the same MUC service. The MUCRoom managed data is shared across the cluster 55 * using a clustered data structure (a clustered {@link org.jivesoftware.util.cache.Cache}). The content of such caches 56 * cannot survive certain events related to changes in the composition of the cluster (the local server joining or 57 * leaving the cluster). This introduces problems, as, for example, the occupants that are connected to the local server 58 * should see occupants connected to a cluster node that is now unavailable 'leave' the chatroom. This requires the 59 * local cluster node to retain knowledge, even after the clustered cache has been reset. This implementation therefore 60 * by design makes no use of such clustered caches to exchange data with other cluster nodes. Instead, this 61 * implementation relies on cluster tasks to share data between cluster nodes. To minimize data transfer and data 62 * duplication, the data managed by this implementation is kept to the bare minimum needed to perform post-cluster event 63 * maintenance. 64 * 65 * Apart from the responsibility of maintaining data for post-cluster event maintenance, this implementation adds some 66 * conveniences, that include: 67 * <ul> 68 * <li>A method (that operates on the maintained data) to determine what room names a particular user is in. As this 69 * implementation already maintains this data, obtaining it from this class is more convenient and more performant 70 * than obtaining it from the data that is maintained in the clustered data structure (as that is mapped 'by room')</li> 71 * <li>A 'last activity' timestamp for users on the local node (to be used to detect idle users</li> 72 * </ul> 73 * 74 * @author Guus der Kinderen, guus.der.kinderen@gmail.com 75 * @see <a href="https://igniterealtime.atlassian.net/browse/OF-2224">OF-2224</a> 76 */ 77 public class OccupantManager implements MUCEventListener 78 { 79 private static final Logger Log = LoggerFactory.getLogger(OccupantManager.class); 80 81 public static final SystemProperty<Boolean> PROPERTY_USE_NONBLOCKING_CLUSTERTASKS = SystemProperty.Builder.ofType(Boolean.class) 82 .setKey("xmpp.muc.occupants.clustertask.non-blocking") 83 .setDynamic(true) 84 .setDefaultValue(false) 85 .build(); 86 87 /**` 88 * Name of the MUC service that this instance is operating for. 89 */ 90 @Nonnull 91 private final String serviceName; 92 93 /**` 94 * The (domain-part of the) JID of the MUC service that this instance is operating for. 95 */ 96 @Nonnull 97 private final String serviceDomain; 98 99 /** 100 * Lookup table for finding occupants by node. 101 */ 102 @Nonnull 103 private final ConcurrentMap<NodeID, Set<Occupant>> occupantsByNode = new ConcurrentHashMap<>(); 104 105 /** 106 * Lookup table for finding nodes by occupant. 107 */ 108 @Nonnull 109 private final ConcurrentMap<Occupant, Set<NodeID>> nodesByOccupant = new ConcurrentHashMap<>(); 110 111 /** 112 * Creates a new instance, specific for the provided MUC service. 113 * 114 * @param service The service for which the new instance will be operating. 115 */ OccupantManager(@onnull final MultiUserChatService service)116 OccupantManager(@Nonnull final MultiUserChatService service) 117 { 118 this.serviceName = service.getServiceName(); 119 this.serviceDomain = service.getServiceDomain(); 120 Log.debug("Instantiating for service '{}'", serviceName); 121 } 122 123 /** 124 * Registers disappearance of an existing occupant, and/or appearance of a new occupant, on a specific node. 125 * 126 * This method maintains the three different occupant lookup tables, and keeps them in sync. 127 * 128 * @param oldOccupant An occupant that is to be removed from the registration of the referred node (nullable) 129 * @param newOccupant An occupant that is to be added to the registration of the referred node (nullable) 130 * @param nodeIDToReplaceOccupantFor The id of the node that the old/new occupant need to be (de)registered under. If null then the occupant is (de)registered for each node. 131 */ replaceOccupant(Occupant oldOccupant, Occupant newOccupant, NodeID nodeIDToReplaceOccupantFor)132 private void replaceOccupant(Occupant oldOccupant, Occupant newOccupant, NodeID nodeIDToReplaceOccupantFor) { 133 134 Set<NodeID> nodeIDsToReplaceOccupantFor = new HashSet<>(); 135 if (nodeIDToReplaceOccupantFor == null) { 136 nodeIDsToReplaceOccupantFor = occupantsByNode.keySet(); // all node ids 137 } else { 138 nodeIDsToReplaceOccupantFor.add(nodeIDToReplaceOccupantFor); // just the one 139 } 140 141 for (NodeID nodeID : nodeIDsToReplaceOccupantFor) { 142 synchronized (nodeID) { 143 // Step 1: remove old occupant, if there is any 144 deleteOccupantFromNode(oldOccupant, nodeID); 145 146 // Step 2: add new occupant, if there is any 147 if (newOccupant != null) { 148 occupantsByNode.computeIfAbsent(nodeID, (n) -> new HashSet<>()).add(newOccupant); 149 nodesByOccupant.computeIfAbsent(newOccupant, (n) -> new HashSet<>()).add(nodeID); 150 } 151 152 Log.debug("Replaced occupant {} with {} for node {}", oldOccupant, newOccupant, nodeID); 153 } 154 } 155 156 Log.debug("Occupants remaining after replace: {}", nodesByOccupant); 157 } 158 deleteOccupantFromNode(Occupant oldOccupant, NodeID nodeID)159 private void deleteOccupantFromNode(Occupant oldOccupant, NodeID nodeID) { 160 if (oldOccupant != null) { 161 if (occupantsByNode.containsKey(nodeID)) { 162 occupantsByNode.get(nodeID).remove(oldOccupant); 163 if (occupantsByNode.get(nodeID).isEmpty()) { 164 // Clean up, don't leave behind empty set 165 occupantsByNode.remove(nodeID); 166 } 167 } 168 if (nodesByOccupant.containsKey(oldOccupant)) { 169 nodesByOccupant.get(oldOccupant).remove(nodeID); 170 if (nodesByOccupant.get(oldOccupant).isEmpty()) { 171 // Clean up, don't leave behind empty set 172 nodesByOccupant.remove(oldOccupant); 173 } 174 } 175 176 // When an occupant is being pinged, but removed from the node, cancel the ping. 177 final TimerTask pendingPingTask = oldOccupant.getPendingPingTask(); 178 if (pendingPingTask != null) { 179 Log.debug("Remove pending ping task for {} that is being deleted.", oldOccupant); 180 TaskEngine.getInstance().cancelScheduledTask(pendingPingTask); 181 oldOccupant.setPendingPingTask(null); 182 } 183 } 184 } 185 186 /** 187 * Verifies that a JID relates to the service for which this instance is operating, by comparing its domain part. 188 * 189 * @param jid The JID to check 190 * @return True if the JID relates to the service, otherwise false. 191 */ isForThisService(@onnull final JID jid)192 public boolean isForThisService(@Nonnull final JID jid) { 193 return jid.getDomain().equals(serviceDomain); 194 } 195 196 /** 197 * When an XMPP entity / user is registered as a new occupant of a room, this event handler will ensure that this 198 * instance of OccupantManager, as well as all instances for the same server on every other cluster node, registers 199 * the relevant data that is needed to perform post-cluster event maintenance. 200 * 201 * @param roomJID the JID of the room where the occupant has joined. 202 * @param userJID The 'real' JID (not room-at-service-slash-nickname) of the XMPP user that joined. 203 * @param nickname nickname of the user in the room. 204 */ 205 @Override occupantJoined(JID roomJID, JID userJID, String nickname)206 public void occupantJoined(JID roomJID, JID userJID, String nickname) 207 { 208 if (!isForThisService(roomJID)) { 209 return; 210 } 211 212 final OccupantAddedTask task = registerOccupantJoinedLocally(roomJID, userJID, nickname); 213 214 // On all other cluster nodes 215 if (PROPERTY_USE_NONBLOCKING_CLUSTERTASKS.getValue()) { 216 CacheFactory.doClusterTask(task); 217 } else { 218 CacheFactory.doSynchronousClusterTask(task, false); 219 } 220 } 221 registerOccupantJoinedLocally(JID roomJID, JID userJID, String nickname)222 public OccupantAddedTask registerOccupantJoinedLocally(JID roomJID, JID userJID, String nickname) { 223 Log.debug("New local occupancy in room '{}' of service '{}': entity '{}' using nickname '{}'", roomJID.getNode(), serviceName, userJID, nickname); 224 225 final OccupantAddedTask task = new OccupantAddedTask(serviceName, roomJID.getNode(), nickname, userJID, XMPPServer.getInstance().getNodeID()); 226 process(task); // On this cluster node. 227 228 return task; 229 } 230 231 /** 232 * When an XMPP entity / user that is an occupant of a room changes its nickname, this event handler will ensure that 233 * the relevant data that is maintained in this instance of OccupantManager, as well as all instances for the same 234 * server on every other cluster node, is updated. 235 * 236 * @param roomJID the JID of the room where the occupant has joined. 237 * @param userJID The 'real' JID (not room-at-service-slash-nickname) of the XMPP user that joined. 238 * @param oldNickname nickname of the user in the room prior to the change. 239 * @param newNickname nickname of the user in the room after the change. 240 */ 241 @Override nicknameChanged(JID roomJID, JID userJID, String oldNickname, String newNickname)242 public void nicknameChanged(JID roomJID, JID userJID, String oldNickname, String newNickname) 243 { 244 if (!isForThisService(roomJID)) { 245 return; 246 } 247 248 Log.debug("Updated local occupancy in room '{}' of service '{}': entity '{}' now nickname '{}' (was: '{}')", roomJID.getNode(), serviceName, newNickname, oldNickname, userJID); 249 final OccupantUpdatedTask task = new OccupantUpdatedTask(serviceName, roomJID.getNode(), oldNickname, newNickname, userJID, XMPPServer.getInstance().getNodeID()); 250 process(task); // On this cluster node. 251 252 // On all other cluster nodes 253 if (PROPERTY_USE_NONBLOCKING_CLUSTERTASKS.getValue()) { 254 CacheFactory.doClusterTask(task); 255 } else { 256 CacheFactory.doSynchronousClusterTask(task, false); 257 } 258 } 259 260 /** 261 * When an XMPP entity / user is removed as an occupant of a room, this event handler will ensure that this 262 * instance of OccupantManager, as well as all instances for the same server on every other cluster node, updates 263 * the relevant data that it maintains to perform post-cluster event maintenance. 264 * 265 * @param roomJID the JID of the room where the occupant has left. 266 * @param userJID The 'real' JID (not room-at-service-slash-nickname) of the XMPP user that left. 267 * @param nickname nickname that the user used in the room. 268 */ 269 @Override occupantLeft(JID roomJID, JID userJID, String nickname)270 public void occupantLeft(JID roomJID, JID userJID, String nickname) 271 { 272 if (!isForThisService(roomJID)) { 273 return; 274 } 275 276 Log.debug("Removed local occupancy in room '{}' of service '{}': entity '{}' using nickname '{}'", roomJID.getNode(), serviceName, userJID, nickname); 277 final OccupantRemovedTask task = new OccupantRemovedTask(serviceName, roomJID.getNode(), nickname, userJID, XMPPServer.getInstance().getNodeID()); 278 process(task); // On this cluster node. 279 280 // On all other cluster nodes 281 if (PROPERTY_USE_NONBLOCKING_CLUSTERTASKS.getValue()) { 282 CacheFactory.doClusterTask(task); 283 } else { 284 CacheFactory.doSynchronousClusterTask(task, false); 285 } 286 } 287 288 /** 289 * When an XMPP entity / user is kicked out of a room because of nickname collision, this event handler will ensure 290 * that this instance of OccupantManager, as well as all instances for the same server on every other cluster node, 291 * updates the relevant data that it maintains to perform post-cluster event maintenance. 292 * 293 * @param roomJID the JID of the room where the occupant is kicked out. 294 * @param nickname nickname that the user used in the room. 295 */ 296 @Override occupantNickKicked(JID roomJID, String nickname)297 public void occupantNickKicked(JID roomJID, String nickname) { 298 Log.debug("Informing nodes about kicking occupant with nick {} from room {} of service {}", nickname, roomJID.getNode(), serviceName); 299 final OccupantKickedForNicknameTask task = new OccupantKickedForNicknameTask(serviceName, roomJID.getNode(), nickname, XMPPServer.getInstance().getNodeID()); 300 process(task); // On this cluster node. 301 302 // On all other cluster nodes 303 if (PROPERTY_USE_NONBLOCKING_CLUSTERTASKS.getValue()) { 304 CacheFactory.doClusterTask(task); 305 } else { 306 CacheFactory.doSynchronousClusterTask(task, false); 307 } 308 } 309 310 /** 311 * Updates the data maintained by this instance to perform post-cluster event maintenance, based on the data from 312 * the clustered task. 313 * 314 * @param task Cluster task that informs of a new occupant 315 */ process(@onnull final OccupantAddedTask task)316 public void process(@Nonnull final OccupantAddedTask task) 317 { 318 // LocalTime start = LocalTime.now(); 319 final Occupant newOccupant = new Occupant(task.getRoomName(), task.getNickname(), task.getRealJID()); 320 replaceOccupant(null, newOccupant, task.getOriginator()); 321 322 // logOccupantData("A new occupant was added on node " + task.getOriginator(), start, null); 323 } 324 325 /** 326 * Updates the data maintained by this instance to perform post-cluster event maintenance, based on the data from 327 * the clustered task. 328 * 329 * @param task Cluster task that informs of an update for an existing occupant 330 */ process(@onnull final OccupantUpdatedTask task)331 public void process(@Nonnull final OccupantUpdatedTask task) 332 { 333 // LocalTime start = LocalTime.now(); 334 final Occupant oldOccupant = new Occupant(task.getRoomName(), task.getOldNickname(), task.getRealJID()); 335 final Occupant newOccupant = new Occupant(task.getRoomName(), task.getNewNickname(), task.getRealJID()); 336 replaceOccupant(oldOccupant, newOccupant, task.getOriginator()); 337 338 // logOccupantData("An occupant was updated on node " + task.getOriginator(), start, null); 339 } 340 341 /** 342 * Updates the data maintained by this instance to perform post-cluster event maintenance, based on the data from 343 * the clustered task. 344 * 345 * @param task Cluster task that informs of a removed occupant 346 */ process(@onnull final OccupantRemovedTask task)347 public void process(@Nonnull final OccupantRemovedTask task) 348 { 349 Log.debug("Processing task to remove occupant {} - {}", task.getRealJID(), task.getNickname()); 350 351 // LocalTime start = LocalTime.now(); 352 final Occupant oldOccupant = new Occupant(task.getRoomName(), task.getNickname(), task.getRealJID()); 353 replaceOccupant(oldOccupant, null, task.getOriginator()); 354 355 Log.debug("Done processing task to remove occupant {} - {}", task.getRealJID(), task.getNickname()); 356 // logOccupantData("An occupant was removed on node " + task.getOriginator(), start, null); 357 } 358 359 /** 360 * Updates the data maintained by this instance to perform post-cluster event maintenance, based on the data from 361 * the clustered task. 362 * 363 * @param task Cluster task that informs of an occupant nickname that has been kicked out of a room 364 */ process(@onnull final OccupantKickedForNicknameTask task)365 public void process(@Nonnull final OccupantKickedForNicknameTask task) 366 { 367 Log.debug("Processing task to remove everyone with nick {} from room {}", task.getNickname(), task.getRoomName()); 368 369 // logOccupantData("Almost processing task to remove everyone with nick " + task.getNickname(), LocalTime.now(), null); 370 final Set<Occupant> occupantsToKick = occupantsByNode.values().stream() 371 .flatMap(Collection::stream) 372 .filter(o -> o.getNickname().equals(task.getNickname())) 373 .filter(o -> o.getRoomName().equals(task.getRoomName())) 374 .collect(Collectors.toSet()); 375 376 occupantsToKick.forEach(o -> replaceOccupant(o, null, null)); 377 378 Log.debug("Done processing task to remove everyone with nick {}", task.getNickname()); 379 } 380 381 /** 382 * Used by other nodes telling us about all of their occupants. 383 * 384 * @param task Cluster task that informs of occupants on a remote node. 385 */ process(@onnull final SyncLocalOccupantsAndSendJoinPresenceTask task)386 public void process(@Nonnull final SyncLocalOccupantsAndSendJoinPresenceTask task) { 387 388 Set<Occupant> oldOccupants = occupantsByNode.get(task.getOriginator()); 389 390 Log.debug("We received a copy of {} local MUC occupants from node {}. We already had {} occupants in local registration for that node.", task.getOccupants().size(), task.getOriginator(), oldOccupants == null ? 0 : oldOccupants.size()); 391 392 if (oldOccupants != null) { 393 // Use defensive copy to prevent concurrent modification exceptions. 394 oldOccupants = new HashSet<>(oldOccupants); 395 } 396 if (oldOccupants != null) { 397 Log.debug("Removing {} old occupants", oldOccupants.size()); 398 oldOccupants.forEach(oldOccupant -> replaceOccupant(oldOccupant, null, task.getOriginator())); 399 } 400 if (task.getOccupants() != null) { 401 Log.debug("Adding {} new occupants", task.getOccupants().size()); 402 task.getOccupants().forEach(newOccupant -> replaceOccupant(null, newOccupant, task.getOriginator())); 403 } 404 if (oldOccupants != null) { 405 if (oldOccupants.equals(task.getOccupants())) { 406 Log.info("We received a copy of local MUC occupants from node {}, but we already had this information. This hints at a possible inefficient sharing of data across the cluster.", task.getOriginator()); 407 } else { 408 Log.warn("We received a copy of local MUC occupants from node {}, but we already had occupants for this node. However, the new data is different from the old data!", task.getOriginator()); 409 } 410 } 411 412 final Set<Occupant> occupantsAfterSync = occupantsByNode.get(task.getOriginator()); 413 Log.debug("After processing the sync occupants from node {}, we now have {} occupants in local registration for that node.", task.getOriginator(), occupantsAfterSync == null ? 0 : occupantsAfterSync.size()); 414 } 415 416 /** 417 * Returns the name of all the rooms that a particular XMPP entity (user) is currently an occupant of. 418 * 419 * @param realJID The XMPP address of a user 420 * @return All room names that have the user as an occupant. 421 */ 422 @Nonnull roomNamesForAddress(@onnull final JID realJID)423 public Set<String> roomNamesForAddress(@Nonnull final JID realJID) { 424 return nodesByOccupant.keySet().stream() 425 .filter(occupant -> realJID.equals(occupant.getRealJID())) 426 .map(occupant -> occupant.roomName) 427 .collect(Collectors.toSet()); 428 } 429 430 /** 431 * Returns data that is maintained for occupants of the local cluster node. 432 * 433 * @return all data maintained for the local cluster node. 434 */ 435 @Nonnull getLocalOccupants()436 public Set<Occupant> getLocalOccupants() 437 { 438 return occupantsByNode.getOrDefault(XMPPServer.getInstance().getNodeID(), Collections.emptySet()); 439 } 440 441 /** 442 * Registers activity for a particular user that is assumed to be connected to the local cluster node. This records 443 * a timestamp, that can be used to detect idle-ness. 444 * 445 * @param userJid The address of the user for which to record activity. 446 */ registerActivity(@onnull final JID userJid)447 public void registerActivity(@Nonnull final JID userJid) { 448 449 // Only tracking it for the local cluster node, as those are the only users for which this node will monitor activity anyway 450 getLocalOccupants().stream() 451 .filter(occupant -> userJid.equals(occupant.getRealJID())) 452 .forEach(o -> o.setLastActive(Instant.now())); 453 } 454 455 /** 456 * Returns the most recent activity for a particular user that is assumed to be connected to the local cluster node. 457 * This returns a timestamp, that can be used to detect idle-ness. 458 * 459 * @param userJid The address of the user for which to return a timestamp of last activity. 460 * @return A timestamp, or null when there currently is no occupant by that JID on the local node. 461 */ 462 @Nullable lastActivityOnLocalNode(@onnull final JID userJid)463 public Instant lastActivityOnLocalNode(@Nonnull final JID userJid) { 464 return getLocalOccupants().stream() 465 .filter(occupant -> userJid.equals(occupant.getRealJID())) 466 .map(Occupant::getLastActive) 467 .max(java.util.Comparator.naturalOrder()) 468 .orElse(null); 469 } 470 471 /** 472 * Counts all users that are in at least one room. 473 * 474 * @return a user count 475 */ numberOfUniqueUsers()476 public int numberOfUniqueUsers() { 477 return nodesByOccupant.size(); 478 } 479 480 /** 481 * Checks whether the occupant exists, optionally excluding a specific node from evaluation. 482 * @param occupant 483 * @param exclude 484 * @return 485 */ exists(@onnull final Occupant occupant, @Nullable final NodeID exclude)486 public boolean exists(@Nonnull final Occupant occupant, @Nullable final NodeID exclude) { 487 return nodesByOccupant.getOrDefault(occupant, new HashSet<>()).stream() 488 .anyMatch(nodeID -> !nodeID.equals(exclude)); 489 } 490 exists(@onnull final Occupant occupant)491 public boolean exists(@Nonnull final Occupant occupant) { 492 return nodesByOccupant.containsKey(occupant); 493 } 494 495 @Nonnull occupantsForRoomByNode(@onnull final String roomName, @Nonnull final NodeID nodeID)496 public Set<Occupant> occupantsForRoomByNode(@Nonnull final String roomName, @Nonnull final NodeID nodeID) { 497 return occupantsByNode.getOrDefault(nodeID, Collections.emptySet()).stream() 498 .filter(occupant -> occupant.getRoomName().equals(roomName)) 499 .collect(Collectors.toSet()); 500 } 501 502 @Nonnull occupantsForRoomExceptForNode(@onnull final String roomName, @Nonnull final NodeID nodeID)503 public Set<Occupant> occupantsForRoomExceptForNode(@Nonnull final String roomName, @Nonnull final NodeID nodeID) { 504 return occupantsByNode.entrySet().stream().filter(e -> !e.getKey().equals(nodeID)) 505 .map(Map.Entry::getValue) 506 .flatMap(Collection::stream) 507 .filter(occupant -> occupant.getRoomName().equals(roomName)) 508 .collect(Collectors.toSet()); 509 } 510 511 /** 512 * Removes and returns all data that was maintained for a particular cluster node. It is assumed that this method 513 * is used in reaction to that cluster node having left the cluster. 514 * 515 * @param nodeID Identifier of the cluster node that left. 516 * @return All data that this instance maintained for the cluster node. 517 */ 518 @Nonnull leftCluster(@onnull final NodeID nodeID)519 public Set<Occupant> leftCluster(@Nonnull final NodeID nodeID) { 520 Set<Occupant> occupantsBeingRemoved = occupantsByNode.getOrDefault(nodeID, new HashSet<>()); 521 522 // Defensive copy to prevent modifying the returned set 523 Set<Occupant> returnValue = new HashSet<>(occupantsBeingRemoved); 524 525 returnValue.forEach(o -> replaceOccupant(o, null, nodeID)); 526 527 return returnValue; 528 } 529 530 /** 531 * Removes and returns all data that was maintained for cluster nodes other than the local node. It is assumed that 532 * this method is used in reaction to the local cluster node having left the cluster. 533 * 534 * @return All data that this instance maintained for all cluster nodes except the local one. 535 */ 536 @Nullable leftCluster()537 public Set<Occupant> leftCluster() { 538 539 final NodeID ownNodeID = XMPPServer.getInstance().getNodeID(); 540 541 synchronized (ownNodeID) { 542 final Set<Occupant> occupantsLeftOnThisNode = occupantsByNode.getOrDefault(ownNodeID, new HashSet<>()); 543 544 545 final Set<Occupant> occupantsRemoved = occupantsByNode.entrySet().stream() 546 .filter(e -> !e.getKey().equals(ownNodeID)) 547 .flatMap(e -> e.getValue().stream()) 548 .filter(o -> !occupantsLeftOnThisNode.contains(o)) 549 .collect(Collectors.toSet()); 550 551 // Now actually remove what needs to be removed 552 // TODO Somehow perform a lock or synchronize so no other thread can access the lookup tables while we are reorganising 553 occupantsByNode.entrySet().removeIf(e -> !e.getKey().equals(ownNodeID)); 554 555 556 nodesByOccupant.clear(); 557 occupantsLeftOnThisNode.forEach(o -> nodesByOccupant.computeIfAbsent(o, (n) -> new HashSet<>()).add(ownNodeID)); 558 559 Log.debug("Reset occupants because we left the cluster"); 560 561 return occupantsRemoved; 562 } 563 } 564 565 @Nonnull getOccupantsByNode()566 public Map<NodeID, Set<Occupant>> getOccupantsByNode() { 567 return Collections.unmodifiableMap(occupantsByNode); 568 } 569 570 @Nonnull getNodesByOccupant()571 public Map<Occupant, Set<NodeID>> getNodesByOccupant() { 572 return Collections.unmodifiableMap(nodesByOccupant); 573 } 574 575 @Override roomCreated(JID roomJID)576 public void roomCreated(JID roomJID) { 577 // Not used. 578 } 579 580 @Override roomDestroyed(JID roomJID)581 public void roomDestroyed(JID roomJID) { 582 // When a room is destroyed, remove all registered occupants for that room. 583 getNodesByOccupant().entrySet().stream() 584 .filter(entry -> entry.getKey().getRoomName().equals(roomJID.getNode())) 585 .forEach(entry -> entry.getValue().forEach(nodeID -> replaceOccupant(entry.getKey(), null, nodeID))); 586 } 587 588 @Override messageReceived(JID roomJID, JID user, String nickname, Message message)589 public void messageReceived(JID roomJID, JID user, String nickname, Message message) { 590 // Not used. 591 } 592 593 @Override privateMessageRecieved(JID toJID, JID fromJID, Message message)594 public void privateMessageRecieved(JID toJID, JID fromJID, Message message) { 595 // Not used. 596 } 597 598 @Override roomSubjectChanged(JID roomJID, JID user, String newSubject)599 public void roomSubjectChanged(JID roomJID, JID user, String newSubject) { 600 // Not used. 601 } 602 603 /** 604 * Representation of a user that is an occupant of a chatroom. 605 */ 606 public static class Occupant { 607 String roomName; 608 String nickname; 609 JID realJID; 610 Instant lastActive; // Only used on the local cluster node. 611 Instant lastPingRequest; // Only used on the local cluster node. 612 TimerTask pendingPingTask; // Only used on the local cluster node. 613 Occupant(String roomName, String nickname, JID realJID)614 public Occupant(String roomName, String nickname, JID realJID) { 615 this.roomName = roomName; 616 this.nickname = nickname; 617 this.realJID = realJID; 618 this.lastActive = Instant.now(); 619 this.lastPingRequest = null; 620 this.pendingPingTask = null; 621 } 622 getRoomName()623 public String getRoomName() { 624 return roomName; 625 } 626 setRoomName(String roomName)627 public void setRoomName(String roomName) { 628 this.roomName = roomName; 629 } 630 getNickname()631 public String getNickname() { 632 return nickname; 633 } 634 setNickname(String nickname)635 public void setNickname(String nickname) { 636 this.nickname = nickname; 637 } 638 getRealJID()639 public JID getRealJID() { 640 return realJID; 641 } 642 setRealJID(JID realJID)643 public void setRealJID(JID realJID) { 644 this.realJID = realJID; 645 } 646 getLastActive()647 public Instant getLastActive() { 648 return lastActive; 649 } 650 setLastActive(Instant lastActive)651 public void setLastActive(Instant lastActive) { 652 this.lastActive = lastActive; 653 } 654 655 @Nullable getLastPingRequest()656 public Instant getLastPingRequest() { 657 return lastPingRequest; 658 } 659 660 @Nullable getPendingPingTask()661 public TimerTask getPendingPingTask() { 662 return pendingPingTask; 663 } 664 setPendingPingTask(@ullable TimerTask pendingPingTask)665 public void setPendingPingTask(@Nullable TimerTask pendingPingTask) { 666 this.pendingPingTask = pendingPingTask; 667 if (pendingPingTask != null) { 668 this.lastPingRequest = Instant.now(); 669 } 670 } 671 672 @Override equals(Object o)673 public boolean equals(Object o) { 674 if (this == o) return true; 675 if (o == null || getClass() != o.getClass()) return false; 676 Occupant that = (Occupant) o; 677 return Objects.equals(roomName, that.roomName) && Objects.equals(nickname, that.nickname) && Objects.equals(realJID, that.realJID); 678 } 679 680 @Override hashCode()681 public int hashCode() { 682 return Objects.hash(roomName, nickname, realJID); 683 } 684 685 @Override toString()686 public String toString() { 687 return "Occupant " + 688 "'" + nickname + '\'' + 689 " of room '" + roomName + '\'' + 690 " (real JID '" + realJID + 691 "', last active " + lastActive + 692 ")"; 693 } 694 } 695 696 /** 697 * Logs key data about rooms and occupants. 698 * Use only when needed, it clouds up the log. 699 * @param reasonForLogging Will be logged to provide context for why this is being logged. 700 * @param start Start time of the operation for which this log is created. 701 * @param roomCache The room cache. 702 */ logOccupantData(String reasonForLogging, LocalTime start, Map<String, MUCRoom> roomCache)703 public void logOccupantData(String reasonForLogging, LocalTime start, Map<String, MUCRoom> roomCache) { 704 LocalTime end = LocalTime.now(); 705 StackTraceElement[] elements = Thread.currentThread().getStackTrace(); 706 List<String> callStack = Arrays.stream(elements).skip(2L).limit(10L).map(StackTraceElement::toString).collect(Collectors.toList()); 707 Log.debug("================== OCCUPANT MANAGER DATA =================="); 708 Log.debug("=== TIME ==="); 709 Log.debug( 710 "= {} - {} =", 711 start.withNano(0).format(DateTimeFormatter.ISO_LOCAL_TIME), 712 end.withNano(0).format(DateTimeFormatter.ISO_LOCAL_TIME) 713 ); 714 Log.debug("=== REASON ==="); 715 Log.debug("= {}", reasonForLogging); 716 Log.debug("=== OCCUPANTS BY NODE ==="); 717 Log.debug("= COUNT: {}", occupantsByNode.size()); 718 occupantsByNode.forEach((key, value) -> { 719 Log.debug("= {}", key); 720 value.forEach(occupant -> Log.debug("= - {}", occupant)); 721 }); 722 Log.debug("=== NODES BY OCCUPANT ==="); 723 Log.debug("= COUNT: {}", nodesByOccupant.size()); 724 nodesByOccupant.forEach((key, value) -> { 725 Log.debug("= {}", key); 726 value.forEach(nodeID -> Log.debug("= - {}", nodeID)); 727 }); 728 Log.debug("=== ROOM CACHE CONTENTS ==="); 729 if (roomCache == null) { 730 Log.debug("=== NOT SPECIFIED ==="); 731 } else { 732 roomCache 733 .values() 734 .forEach(room -> Log.debug("= " + room.getName() + " ---> " + room 735 .getOccupants() 736 .stream() 737 .map(MUCRole::getNickname) 738 .collect(Collectors.joining("/")) 739 )); 740 } 741 Log.debug("=== TOP OF CALL STACK ==="); 742 callStack.forEach(se -> Log.debug("= {}", se)); 743 Log.debug("================ END OCCUPANT MANAGER DATA ================"); 744 } 745 } 746