1 /* 2 * Copyright @ 2015 - Present, 8x8 Inc 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 package org.jitsi.videobridge; 17 18 import edu.umd.cs.findbugs.annotations.*; 19 import org.jetbrains.annotations.*; 20 import org.jetbrains.annotations.Nullable; 21 import org.jitsi.eventadmin.*; 22 import org.jitsi.nlj.*; 23 import org.jitsi.rtp.*; 24 import org.jitsi.rtp.rtcp.rtcpfb.payload_specific_fb.*; 25 import org.jitsi.rtp.rtp.*; 26 import org.jitsi.utils.collections.*; 27 import org.jitsi.utils.event.*; 28 import org.jitsi.utils.logging.DiagnosticContext; 29 import org.jitsi.utils.logging2.*; 30 import org.jitsi.utils.logging2.Logger; 31 import org.jitsi.utils.logging2.LoggerImpl; 32 import org.jitsi.videobridge.octo.*; 33 import org.jitsi.videobridge.shim.*; 34 import org.jitsi.videobridge.util.*; 35 import org.jitsi.xmpp.extensions.colibri.*; 36 import org.json.simple.*; 37 import org.jxmpp.jid.*; 38 import org.jxmpp.jid.parts.*; 39 import org.osgi.framework.*; 40 41 import java.beans.*; 42 import java.io.*; 43 import java.lang.*; 44 import java.lang.SuppressWarnings; 45 import java.util.*; 46 import java.util.concurrent.*; 47 import java.util.concurrent.atomic.*; 48 import java.util.logging.*; 49 50 import static org.jitsi.utils.collections.JMap.entry; 51 import static org.jitsi.videobridge.EndpointMessageBuilder.*; 52 53 /** 54 * Represents a conference in the terms of Jitsi Videobridge. 
55 * 56 * @author Lyubomir Marinov 57 * @author Boris Grozev 58 * @author Hristo Terezov 59 * @author George Politis 60 */ 61 public class Conference 62 extends PropertyChangeNotifier 63 implements PropertyChangeListener, 64 Expireable, 65 AbstractEndpointMessageTransport.EndpointMessageTransportEventHandler 66 { 67 /** 68 * The endpoints participating in this {@link Conference}. Although it's a 69 * {@link ConcurrentHashMap}, writing to it must be protected by 70 * synchronizing on the map itself, because it must be kept in sync with 71 * {@link #endpointsCache}. 72 */ 73 private final ConcurrentHashMap<String, AbstractEndpoint> endpoints 74 = new ConcurrentHashMap<>(); 75 76 /** 77 * A read-only cache of the endpoints in this conference. Note that it 78 * contains only the {@link Endpoint} instances (and not Octo endpoints). 79 * This is because the cache was introduced for performance reasons only 80 * (we iterate over it for each RTP packet) and the Octo endpoints are not 81 * needed. 82 */ 83 private List<Endpoint> endpointsCache = Collections.emptyList(); 84 85 private final Object endpointsCacheLock = new Object(); 86 87 /** 88 * The {@link EventAdmin} instance (to be) used by this {@code Conference} 89 * and all instances (of {@code Content}, {@code Channel}, etc.) created by 90 * it. 91 */ 92 private final EventAdmin eventAdmin; 93 94 /** 95 * The indicator which determines whether {@link #expire()} has been called 96 * on this <tt>Conference</tt>. 97 */ 98 @SuppressFBWarnings( 99 value = "IS2_INCONSISTENT_SYNC", 100 justification = "The value is deemed safe to read without " + 101 "synchronization.") 102 private boolean expired = false; 103 104 /** 105 * The JID of the conference focus who has initialized this instance and 106 * from whom requests to manage this instance must come or they will be 107 * ignored. If <tt>null</tt> value is assigned we don't care who modifies 108 * the conference. 
109 */ 110 private final Jid focus; 111 112 /** 113 * The (unique) identifier/ID of this instance. 114 */ 115 private final String id; 116 117 /** 118 * The "global" id of this conference, set by the controller (e.g. jicofo) 119 * as opposed to the bridge. This defaults to {@code null} unless it is 120 * specified. 121 */ 122 private final String gid; 123 124 /** 125 * The world readable name of this instance if any. 126 */ 127 private Localpart name; 128 129 /** 130 * The time in milliseconds of the last activity related to this 131 * <tt>Conference</tt>. In the time interval between the last activity and 132 * now, this <tt>Conference</tt> is considered inactive. 133 */ 134 @SuppressFBWarnings( 135 value = "IS2_INCONSISTENT_SYNC", 136 justification = "The value is deemed safe to read without " + 137 "synchronization.") 138 private long lastActivityTime; 139 140 /** 141 * If {@link #focus} is <tt>null</tt> the value of the last known focus is 142 * stored in this member. 143 */ 144 private Jid lastKnownFocus; 145 146 /** 147 * The <tt>PropertyChangeListener</tt> which listens to 148 * <tt>PropertyChangeEvent</tt>s on behalf of this instance while 149 * referencing it by a <tt>WeakReference</tt>. 150 */ 151 private final PropertyChangeListener propertyChangeListener 152 = new WeakReferencePropertyChangeListener(this); 153 154 /** 155 * The speech activity (representation) of the <tt>Endpoint</tt>s of this 156 * <tt>Conference</tt>. 157 */ 158 private final ConferenceSpeechActivity speechActivity; 159 160 /** 161 * The audio level listener. 162 */ 163 private final AudioLevelListener audioLevelListener; 164 165 /** 166 * The <tt>Videobridge</tt> which has initialized this <tt>Conference</tt>. 167 */ 168 private final Videobridge videobridge; 169 170 /** 171 * Holds conference statistics. 172 */ 173 private final Statistics statistics = new Statistics(); 174 175 /** 176 * The {@link Logger} to be used by this instance to print debug 177 * information. 
178 */ 179 private final Logger logger; 180 181 182 /** 183 * Whether this conference should be considered when generating statistics. 184 */ 185 private final boolean includeInStatistics; 186 187 /** 188 * The time when this {@link Conference} was created. 189 */ 190 private final long creationTime = System.currentTimeMillis(); 191 192 /** 193 * The {@link ExpireableImpl} which we use to safely expire this conference. 194 */ 195 private final ExpireableImpl expireableImpl; 196 197 /** 198 * The shim which handles Colibri-related logic for this conference. 199 */ 200 private final ConferenceShim shim; 201 202 //TODO not public 203 final public EncodingsManager encodingsManager = new EncodingsManager(); 204 205 /** 206 * This {@link Conference}'s link to Octo. 207 */ 208 private OctoTentacle tentacle; 209 210 /** 211 * Initializes a new <tt>Conference</tt> instance which is to represent a 212 * conference in the terms of Jitsi Videobridge which has a specific 213 * (unique) ID and is managed by a conference focus with a specific JID. 214 * 215 * @param videobridge the <tt>Videobridge</tt> on which the new 216 * <tt>Conference</tt> instance is to be initialized 217 * @param id the (unique) ID of the new instance to be initialized 218 * @param focus the JID of the conference focus who has requested the 219 * initialization of the new instance and from whom further/future requests 220 * to manage the new instance must come or they will be ignored. 221 * Pass <tt>null</tt> to override this safety check. 222 * @param name world readable name of this instance if any. 223 * @param enableLogging whether logging should be enabled for this 224 * {@link Conference} and its sub-components, and whether this conference 225 * should be considered when generating statistics. 226 * @param gid the optional "global" id of the conference. 
227 */ Conference(Videobridge videobridge, String id, Jid focus, Localpart name, boolean enableLogging, String gid)228 public Conference(Videobridge videobridge, 229 String id, 230 Jid focus, 231 Localpart name, 232 boolean enableLogging, 233 String gid) 234 { 235 this.videobridge = Objects.requireNonNull(videobridge, "videobridge"); 236 Level minLevel = enableLogging ? Level.ALL : Level.WARNING; 237 Map<String, String> context = JMap.ofEntries( 238 entry("confId", id) 239 ); 240 if (gid != null) 241 { 242 context.put("gid", gid); 243 } 244 if (name != null) 245 { 246 context.put("conf_name", name.toString()); 247 } 248 logger = new LoggerImpl(Conference.class.getName(), minLevel, new LogContext(context)); 249 this.shim = new ConferenceShim(this, logger); 250 this.id = Objects.requireNonNull(id, "id"); 251 this.gid = gid; 252 this.focus = focus; 253 this.eventAdmin = enableLogging ? videobridge.getEventAdmin() : null; 254 this.includeInStatistics = enableLogging; 255 this.name = name; 256 257 lastKnownFocus = focus; 258 259 speechActivity = new ConferenceSpeechActivity(this); 260 audioLevelListener 261 = (sourceSsrc, level) 262 -> speechActivity.levelChanged(sourceSsrc, (int) level); 263 264 expireableImpl = new ExpireableImpl(logger, this::expire); 265 266 if (enableLogging) 267 { 268 eventAdmin.sendEvent(EventFactory.conferenceCreated(this)); 269 Videobridge.Statistics videobridgeStatistics 270 = videobridge.getStatistics(); 271 videobridgeStatistics.totalConferencesCreated.incrementAndGet(); 272 } 273 274 // We listen to our own events so we have a centralized place to handle 275 // certain things (e.g. anytime the endpoints list changes) 276 addPropertyChangeListener(propertyChangeListener); 277 278 touch(); 279 } 280 281 /** 282 * Creates a new diagnostic context instance that includes the conference 283 * name and the conference creation time. 284 * 285 * @return the new {@link DiagnosticContext} instance. 
286 */ newDiagnosticContext()287 public DiagnosticContext newDiagnosticContext() 288 { 289 290 291 if (name != null) 292 { 293 DiagnosticContext diagnosticContext = new DiagnosticContext(); 294 diagnosticContext.put("conf_name", name.toString()); 295 diagnosticContext.put("conf_creation_time_ms", creationTime); 296 return diagnosticContext; 297 } 298 else 299 { 300 return new NoOpDiagnosticContext(); 301 } 302 } 303 304 /** 305 * Gets the statistics of this {@link Conference}. 306 * 307 * @return the statistics of this {@link Conference}. 308 */ getStatistics()309 public Statistics getStatistics() 310 { 311 return statistics; 312 } 313 314 /** 315 * @return whether this conference should be included in generated 316 * statistics. 317 */ includeInStatistics()318 public boolean includeInStatistics() 319 { 320 return includeInStatistics; 321 } 322 323 /** 324 * Sends a message to a subset of endpoints in the call, primary use 325 * case being a message that has originated from an endpoint (as opposed to 326 * a message originating from the bridge and being sent to all endpoints in 327 * the call, for that see {@link #broadcastMessage(String)}. 328 * 329 * @param msg the message to be sent 330 * @param endpoints the list of <tt>Endpoint</tt>s to which the message will 331 * be sent. 
332 */ sendMessage( String msg, List<AbstractEndpoint> endpoints, boolean sendToOcto)333 public void sendMessage( 334 String msg, 335 List<AbstractEndpoint> endpoints, 336 boolean sendToOcto) 337 { 338 for (AbstractEndpoint endpoint : endpoints) 339 { 340 try 341 { 342 endpoint.sendMessage(msg); 343 } 344 catch (IOException e) 345 { 346 logger.error( 347 "Failed to send message on data channel to: " 348 + endpoint.getID() + ", msg: " + msg, e); 349 } 350 } 351 352 if (sendToOcto && tentacle != null) 353 { 354 tentacle.sendMessage(msg); 355 } 356 } 357 358 /** 359 * Used to send a message to a subset of endpoints in the call, primary use 360 * case being a message that has originated from an endpoint (as opposed to 361 * a message originating from the bridge and being sent to all endpoints in 362 * the call, for that see {@link #broadcastMessage(String)}. 363 * 364 * @param msg the message to be sent 365 * @param endpoints the list of <tt>Endpoint</tt>s to which the message will 366 * be sent. 367 */ sendMessage(String msg, List<AbstractEndpoint> endpoints)368 public void sendMessage(String msg, List<AbstractEndpoint> endpoints) 369 { 370 sendMessage(msg, endpoints, false); 371 } 372 373 /** 374 * Broadcasts a string message to all endpoints of the conference. 375 * 376 * @param msg the message to be broadcast. 377 */ broadcastMessage(String msg, boolean sendToOcto)378 public void broadcastMessage(String msg, boolean sendToOcto) 379 { 380 sendMessage(msg, getEndpoints(), sendToOcto); 381 } 382 383 /** 384 * Broadcasts a string message to all endpoints of the conference. 385 * 386 * @param msg the message to be broadcast. 387 */ broadcastMessage(String msg)388 public void broadcastMessage(String msg) 389 { 390 broadcastMessage(msg, false); 391 } 392 393 /** 394 * Requests a keyframe from the endpoint with the specified id, if the 395 * endpoint is found in the conference. 396 * 397 * @param endpointID the id of the endpoint to request a keyframe from. 
398 */ requestKeyframe(String endpointID, long mediaSsrc)399 public void requestKeyframe(String endpointID, long mediaSsrc) 400 { 401 AbstractEndpoint remoteEndpoint = getEndpoint(endpointID); 402 403 if (remoteEndpoint != null) 404 { 405 remoteEndpoint.requestKeyframe(mediaSsrc); 406 } 407 else if (logger.isDebugEnabled()) 408 { 409 logger.debug( 410 "Cannot request keyframe because the endpoint was not found."); 411 } 412 } 413 /** 414 * Sets the values of the properties of a specific 415 * <tt>ColibriConferenceIQ</tt> to the values of the respective 416 * properties of this instance. Thus, the specified <tt>iq</tt> may be 417 * thought of as a description of this instance. 418 * <p> 419 * <b>Note</b>: The copying of the values is shallow i.e. the 420 * <tt>Content</tt>s of this instance are not described in the specified 421 * <tt>iq</tt>. 422 * </p> 423 * 424 * @param iq the <tt>ColibriConferenceIQ</tt> to set the values of the 425 * properties of this instance on 426 */ describeShallow(ColibriConferenceIQ iq)427 public void describeShallow(ColibriConferenceIQ iq) 428 { 429 iq.setID(getID()); 430 iq.setName(getName()); 431 } 432 433 /** 434 * Notifies this instance that {@link #speechActivity} has identified a 435 * speaker switch event in this multipoint conference and there is now a new 436 * dominant speaker. 437 */ dominantSpeakerChanged()438 void dominantSpeakerChanged() 439 { 440 AbstractEndpoint dominantSpeaker = speechActivity.getDominantEndpoint(); 441 442 if (logger.isInfoEnabled()) 443 { 444 String id 445 = dominantSpeaker == null ? 
"null" : dominantSpeaker.getID(); 446 logger.info("ds_change ds_id=" + id); 447 getVideobridge().getStatistics().totalDominantSpeakerChanges.increment(); 448 } 449 450 speechActivityEndpointsChanged(speechActivity.getEndpointIds()); 451 452 if (dominantSpeaker != null) 453 { 454 broadcastMessage( 455 createDominantSpeakerEndpointChangeEvent( 456 dominantSpeaker.getID())); 457 if (getEndpointCount() > 2) 458 { 459 double senderRtt = getRtt(dominantSpeaker); 460 double maxReceiveRtt = getMaxReceiverRtt(dominantSpeaker.getID()); 461 // We add an additional 10ms delay to reduce the risk of the keyframe arriving 462 // too early 463 double keyframeDelay = maxReceiveRtt - senderRtt + 10; 464 if (logger.isDebugEnabled()) 465 { 466 logger.debug("Scheduling keyframe request from " + dominantSpeaker.getID() + " after a delay" + 467 " of " + keyframeDelay + "ms"); 468 } 469 TaskPools.SCHEDULED_POOL.schedule( 470 (Runnable)dominantSpeaker::requestKeyframe, 471 (long)keyframeDelay, 472 TimeUnit.MILLISECONDS 473 ); 474 } 475 } 476 } 477 getRtt(AbstractEndpoint endpoint)478 private double getRtt(AbstractEndpoint endpoint) 479 { 480 if (endpoint instanceof Endpoint) 481 { 482 Endpoint localDominantSpeaker = (Endpoint)endpoint; 483 return localDominantSpeaker.getRtt(); 484 } 485 else 486 { 487 // Octo endpoint 488 // TODO(brian): we don't currently have a way to get the RTT from this bridge 489 // to a remote endpoint, so we hard-code a value here. Discussed this with 490 // Boris, and we talked about perhaps having OctoEndpoint periodically 491 // send pings to the remote endpoint to calculate its RTT from the perspective 492 // of this bridge. 
493 return 100; 494 } 495 } 496 getMaxReceiverRtt(String excludedEndpointId)497 private double getMaxReceiverRtt(String excludedEndpointId) 498 { 499 return endpointsCache.stream() 500 .filter(ep -> !ep.getID().equalsIgnoreCase(excludedEndpointId)) 501 .map(Endpoint::getRtt) 502 .mapToDouble(Double::valueOf) 503 .max() 504 .orElse(0); 505 } 506 507 /** 508 * Expires this <tt>Conference</tt>, its <tt>Content</tt>s and their 509 * respective <tt>Channel</tt>s. Releases the resources acquired by this 510 * instance throughout its life time and prepares it to be garbage 511 * collected. 512 */ expire()513 public void expire() 514 { 515 synchronized (this) 516 { 517 if (expired) 518 { 519 return; 520 } 521 else 522 { 523 expired = true; 524 } 525 } 526 527 logger.info("Expiring."); 528 EventAdmin eventAdmin = getEventAdmin(); 529 if (eventAdmin != null) 530 { 531 eventAdmin.sendEvent(EventFactory.conferenceExpired(this)); 532 } 533 534 Videobridge videobridge = getVideobridge(); 535 536 try 537 { 538 videobridge.expireConference(this); 539 } 540 finally 541 { 542 if (logger.isDebugEnabled()) 543 { 544 logger.debug("Expiring endpoints."); 545 } 546 getEndpoints().forEach(AbstractEndpoint::expire); 547 speechActivity.expire(); 548 if (tentacle != null) 549 { 550 tentacle.expire(); 551 tentacle = null; 552 } 553 554 if (includeInStatistics) 555 { 556 updateStatisticsOnExpire(); 557 } 558 } 559 } 560 561 /** 562 * Updates the statistics for this conference when it is about to expire. 
563 */ updateStatisticsOnExpire()564 private void updateStatisticsOnExpire() 565 { 566 long durationSeconds 567 = Math.round((System.currentTimeMillis() - creationTime) / 1000d); 568 569 Videobridge.Statistics videobridgeStatistics 570 = getVideobridge().getStatistics(); 571 572 videobridgeStatistics.totalConferencesCompleted 573 .incrementAndGet(); 574 videobridgeStatistics.totalConferenceSeconds.addAndGet( 575 durationSeconds); 576 577 videobridgeStatistics.totalBytesReceived.addAndGet( 578 statistics.totalBytesReceived.get()); 579 videobridgeStatistics.totalBytesSent.addAndGet( 580 statistics.totalBytesSent.get()); 581 videobridgeStatistics.totalPacketsReceived.addAndGet( 582 statistics.totalPacketsReceived.get()); 583 videobridgeStatistics.totalPacketsSent.addAndGet( 584 statistics.totalPacketsSent.get()); 585 586 boolean hasFailed 587 = statistics.hasIceFailedEndpoint 588 && !statistics.hasIceSucceededEndpoint; 589 boolean hasPartiallyFailed 590 = statistics.hasIceFailedEndpoint 591 && statistics.hasIceSucceededEndpoint; 592 593 if (hasPartiallyFailed) 594 { 595 videobridgeStatistics.totalPartiallyFailedConferences 596 .incrementAndGet(); 597 } 598 599 if (hasFailed) 600 { 601 videobridgeStatistics.totalFailedConferences.incrementAndGet(); 602 } 603 604 if (logger.isInfoEnabled()) 605 { 606 StringBuilder sb = new StringBuilder("expire_conf,"); 607 sb.append("duration=").append(durationSeconds) 608 .append(",has_failed=").append(hasFailed) 609 .append(",has_partially_failed=").append(hasPartiallyFailed); 610 logger.info(sb.toString()); 611 } 612 } 613 614 /** 615 * Finds an <tt>Endpoint</tt> of this <tt>Conference</tt> which sends an RTP 616 * stream with a specific SSRC and with a specific <tt>MediaType</tt>. 
617 * 618 * @param receiveSSRC the SSRC of an RTP stream received by this 619 * <tt>Conference</tt> whose sending <tt>Endpoint</tt> is to be found 620 * @return <tt>Endpoint</tt> of this <tt>Conference</tt> which sends an RTP 621 * stream with the specified <tt>ssrc</tt> and with the specified 622 * <tt>mediaType</tt>; otherwise, <tt>null</tt> 623 */ findEndpointByReceiveSSRC(long receiveSSRC)624 AbstractEndpoint findEndpointByReceiveSSRC(long receiveSSRC) 625 { 626 return getEndpoints().stream() 627 .filter(ep -> ep.receivesSsrc(receiveSSRC)) 628 .findFirst() 629 .orElse(null); 630 } 631 632 /** 633 * Returns the OSGi <tt>BundleContext</tt> in which this Conference is 634 * executing. 635 * 636 * @return the OSGi <tt>BundleContext</tt> in which the Conference is 637 * executing. 638 */ getBundleContext()639 public BundleContext getBundleContext() 640 { 641 return getVideobridge().getBundleContext(); 642 } 643 644 /** 645 * Gets an <tt>Endpoint</tt> participating in this <tt>Conference</tt> which 646 * has a specific identifier/ID. 647 * 648 * @param id the identifier/ID of the <tt>Endpoint</tt> which is to be 649 * returned 650 * @return an <tt>Endpoint</tt> participating in this <tt>Conference</tt> 651 * which has the specified <tt>id</tt> or <tt>null</tt> 652 */ 653 @Nullable getEndpoint(@otNull String id)654 public AbstractEndpoint getEndpoint(@NotNull String id) 655 { 656 return endpoints.get( 657 Objects.requireNonNull(id, "id must be non null")); 658 } 659 660 /** 661 * Initializes a new <tt>Endpoint</tt> instance with the specified 662 * <tt>id</tt> and adds it to the list of <tt>Endpoint</tt>s participating 663 * in this <tt>Conference</tt>. 
664 * @param id the identifier/ID of the <tt>Endpoint</tt> which will be 665 * created 666 * @param iceControlling {@code true} if the ICE agent of this endpoint's 667 * transport will be initialized to serve as a controlling ICE agent; 668 * otherwise, {@code false} 669 * @return an <tt>Endpoint</tt> participating in this <tt>Conference</tt> 670 */ 671 @NotNull createLocalEndpoint(String id, boolean iceControlling)672 public Endpoint createLocalEndpoint(String id, boolean iceControlling) 673 throws IOException 674 { 675 final AbstractEndpoint existingEndpoint = getEndpoint(id); 676 if (existingEndpoint instanceof OctoEndpoint) 677 { 678 // It is possible that an Endpoint was migrated from another bridge 679 // in the conference to this one, and the sources lists (which 680 // implicitly signal the Octo endpoints in the conference) haven't 681 // been updated yet. We'll force the Octo endpoint to expire and 682 // we'll continue with the creation of a new local Endpoint for the 683 // participant. 684 existingEndpoint.expire(); 685 } 686 else if (existingEndpoint != null) 687 { 688 throw new IllegalArgumentException("Local endpoint with ID = " 689 + id + "already created"); 690 } 691 692 final Endpoint endpoint = new Endpoint( 693 id, this, logger, iceControlling); 694 // The propertyChangeListener will weakly reference this 695 // Conference and will unregister itself from the endpoint 696 // sooner or later. 697 endpoint.addPropertyChangeListener(propertyChangeListener); 698 699 addEndpoint(endpoint); 700 701 EventAdmin eventAdmin = getEventAdmin(); 702 if (eventAdmin != null) 703 { 704 eventAdmin.sendEvent( 705 EventFactory.endpointCreated(endpoint)); 706 } 707 708 return endpoint; 709 } 710 711 /** 712 * An endpoint was added or removed. 713 */ endpointsChanged()714 private void endpointsChanged() 715 { 716 speechActivity.endpointsChanged(); 717 } 718 719 /** 720 * The media stream tracks of one of the endpoints in this conference 721 * changed. 
722 * 723 * @param endpoint the endpoint, or {@code null} if it was an Octo endpoint. 724 */ endpointTracksChanged(AbstractEndpoint endpoint)725 public void endpointTracksChanged(AbstractEndpoint endpoint) 726 { 727 List<String> endpoints = speechActivity.getEndpointIds(); 728 endpointsCache.forEach((e) -> { 729 if (e != endpoint) 730 { 731 e.speechActivityEndpointsChanged(endpoints); 732 } 733 }); 734 } 735 736 /** 737 * Updates {@link #endpointsCache} with the current contents of 738 * {@link #endpoints}. 739 */ updateEndpointsCache()740 private void updateEndpointsCache() 741 { 742 synchronized (endpointsCacheLock) 743 { 744 ArrayList<Endpoint> 745 endpointsList 746 = new ArrayList<>(endpoints.size()); 747 endpoints.values().forEach(e -> 748 { 749 if (e instanceof Endpoint) 750 { 751 endpointsList.add((Endpoint) e); 752 } 753 }); 754 755 endpointsCache = Collections.unmodifiableList(endpointsList); 756 } 757 } 758 759 /** 760 * Returns the number of local AND remote {@link Endpoint}s in this {@link Conference}. 761 * 762 * @return the number of local AND remote {@link Endpoint}s in this {@link Conference}. 763 */ getEndpointCount()764 public int getEndpointCount() 765 { 766 return endpoints.size(); 767 } 768 769 /** 770 * Returns the number of local {@link Endpoint}s in this {@link Conference}. 771 * 772 * @return the number of local {@link Endpoint}s in this {@link Conference}. 773 */ getLocalEndpointCount()774 public int getLocalEndpointCount() 775 { 776 return getLocalEndpoints().size(); 777 } 778 779 /** 780 * Gets the <tt>Endpoint</tt>s participating in/contributing to this 781 * <tt>Conference</tt>. 
782 * 783 * @return the <tt>Endpoint</tt>s participating in/contributing to this 784 * <tt>Conference</tt> 785 */ getEndpoints()786 public List<AbstractEndpoint> getEndpoints() 787 { 788 return new ArrayList<>(this.endpoints.values()); 789 } 790 791 /** 792 * Gets the list of endpoints which are local to this bridge (as opposed 793 * to being on a remote bridge through Octo). 794 */ getLocalEndpoints()795 public List<Endpoint> getLocalEndpoints() 796 { 797 return endpointsCache; 798 } 799 800 /** 801 * Gets the JID of the conference focus who has initialized this instance 802 * and from whom requests to manage this instance must come or they will be 803 * ignored. 804 * 805 * @return the JID of the conference focus who has initialized this instance 806 * and from whom requests to manage this instance must come or they will be 807 * ignored 808 */ getFocus()809 public final Jid getFocus() 810 { 811 return focus; 812 } 813 814 /** 815 * Gets the (unique) identifier/ID of this instance. 816 * 817 * @return the (unique) identifier/ID of this instance 818 */ getID()819 public final String getID() 820 { 821 return id; 822 } 823 824 /** 825 * Gets the time in milliseconds of the last activity related to this 826 * <tt>Conference</tt>. 827 * 828 * @return the time in milliseconds of the last activity related to this 829 * <tt>Conference</tt> 830 */ getLastActivityTime()831 public long getLastActivityTime() 832 { 833 synchronized (this) 834 { 835 return lastActivityTime; 836 } 837 } 838 839 /** 840 * Returns the JID of the last known focus. 841 * @return the JID of the last known focus. 842 */ getLastKnowFocus()843 public Jid getLastKnowFocus() 844 { 845 return lastKnownFocus; 846 } 847 848 /** 849 * Gets an <tt>Endpoint</tt> participating in this <tt>Conference</tt> which 850 * has a specific identifier/ID. 
851 * 852 * @param id the identifier/ID of the <tt>Endpoint</tt> which is to be 853 * returned 854 * @return an <tt>Endpoint</tt> participating in this <tt>Conference</tt> 855 * or {@code null} if endpoint with specified ID does not exist in a 856 * conference 857 */ 858 @Nullable getLocalEndpoint(String id)859 public Endpoint getLocalEndpoint(String id) 860 { 861 AbstractEndpoint endpoint = getEndpoint(id); 862 if (endpoint instanceof Endpoint) 863 { 864 return (Endpoint) endpoint; 865 } 866 867 return null; 868 } 869 870 /** 871 * Gets the speech activity (representation) of the <tt>Endpoint</tt>s of 872 * this <tt>Conference</tt>. 873 * 874 * @return the speech activity (representation) of the <tt>Endpoint</tt>s of 875 * this <tt>Conference</tt> 876 */ getSpeechActivity()877 public ConferenceSpeechActivity getSpeechActivity() 878 { 879 return speechActivity; 880 } 881 882 /** 883 * Gets the <tt>Videobridge</tt> which has initialized this 884 * <tt>Conference</tt>. 885 * 886 * @return the <tt>Videobridge</tt> which has initialized this 887 * <tt>Conference</tt> 888 */ getVideobridge()889 public final Videobridge getVideobridge() 890 { 891 return videobridge; 892 } 893 894 /** 895 * Gets the indicator which determines whether this <tt>Conference</tt> has 896 * expired. 897 * 898 * @return <tt>true</tt> if this <tt>Conference</tt> has expired; otherwise, 899 * <tt>false</tt> 900 */ isExpired()901 public boolean isExpired() 902 { 903 // this.expired starts as 'false' and only ever changes to 'true', 904 // so there is no need to synchronize while reading. 905 return expired; 906 } 907 908 /** 909 * Notifies this instance that there was a change in the value of a property 910 * of an object in which this instance is interested. 
911 * 912 * @param ev a <tt>PropertyChangeEvent</tt> which specifies the object of 913 * interest, the name of the property and the old and new values of that 914 * property 915 */ 916 @Override 917 @SuppressWarnings("unchecked") propertyChange(PropertyChangeEvent ev)918 public void propertyChange(PropertyChangeEvent ev) 919 { 920 Object source = ev.getSource(); 921 922 if (isExpired()) 923 { 924 // An expired Conference is to be treated like a null Conference 925 // i.e. it does not handle any PropertyChangeEvents. If possible, 926 // make sure that no further PropertyChangeEvents will be delivered 927 // to this Conference. 928 if (source instanceof PropertyChangeNotifier) 929 { 930 ((PropertyChangeNotifier) source).removePropertyChangeListener( 931 propertyChangeListener); 932 } 933 } 934 else if (Endpoint.SELECTED_ENDPOINTS_PROPERTY_NAME 935 .equals(ev.getPropertyName())) 936 { 937 Set<String> oldSelectedEndpoints = (Set<String>)ev.getOldValue(); 938 Set<String> newSelectedEndpoints = (Set<String>)ev.getNewValue(); 939 // Any endpoints in the oldSelectedEndpoints list which AREN'T 940 // in the newSelectedEndpoints list should have their count decremented 941 oldSelectedEndpoints.stream() 942 .filter( 943 oldSelectedEp -> !newSelectedEndpoints.contains(oldSelectedEp)) 944 .map(this::getEndpoint) 945 .filter(Objects::nonNull) 946 .forEach(AbstractEndpoint::decrementSelectedCount); 947 948 // Any endpoints in the newSelectedEndpoints list which AREN'T 949 // in the oldSelectedEndpoints list should have their count incremented 950 newSelectedEndpoints.stream() 951 .filter( 952 newSelectedEp -> !oldSelectedEndpoints.contains(newSelectedEp)) 953 .map(this::getEndpoint) 954 .filter(Objects::nonNull) 955 .forEach(AbstractEndpoint::incrementSelectedCount); 956 } 957 } 958 959 /** 960 * Notifies this conference that one of it's endpoints has expired. 961 * 962 * @param endpoint the <tt>Endpoint</tt> which expired. 
963 */ endpointExpired(AbstractEndpoint endpoint)964 void endpointExpired(AbstractEndpoint endpoint) 965 { 966 final AbstractEndpoint removedEndpoint; 967 String id = endpoint.getID(); 968 removedEndpoint = endpoints.remove(id); 969 if (removedEndpoint != null) 970 { 971 updateEndpointsCache(); 972 } 973 974 if (tentacle != null) 975 { 976 tentacle.endpointExpired(id); 977 } 978 979 if (removedEndpoint != null) 980 { 981 final EventAdmin eventAdmin = getEventAdmin(); 982 if (eventAdmin != null) 983 { 984 eventAdmin.sendEvent( 985 EventFactory.endpointExpired(removedEndpoint)); 986 } 987 endpointsChanged(); 988 } 989 } 990 991 /** 992 * Adds a specific {@link AbstractEndpoint} instance to the list of 993 * endpoints in this conference. 994 * @param endpoint the endpoint to add. 995 */ addEndpoint(AbstractEndpoint endpoint)996 public void addEndpoint(AbstractEndpoint endpoint) 997 { 998 if (endpoint.getConference() != this) 999 { 1000 throw new IllegalArgumentException("Endpoint belong to other " + 1001 "conference = " + endpoint.getConference()); 1002 } 1003 1004 final AbstractEndpoint replacedEndpoint; 1005 replacedEndpoint = endpoints.put(endpoint.getID(), endpoint); 1006 updateEndpointsCache(); 1007 1008 endpointsChanged(); 1009 1010 if (replacedEndpoint != null) 1011 { 1012 logger.info("Endpoint with id " + endpoint.getID() + ": " + 1013 replacedEndpoint + " has been replaced by new " + 1014 "endpoint with same id: " + endpoint); 1015 } 1016 } 1017 1018 /** 1019 * Notifies this {@link Conference} that one of its {@link Endpoint}s 1020 * transport channel has become available. 1021 * 1022 * @param endpoint the {@link Endpoint} whose transport channel has become 1023 * available. 
1024 */ 1025 @Override endpointMessageTransportConnected(@otNull AbstractEndpoint endpoint)1026 public void endpointMessageTransportConnected(@NotNull AbstractEndpoint endpoint) 1027 { 1028 EventAdmin eventAdmin = getEventAdmin(); 1029 1030 if (eventAdmin != null) 1031 { 1032 eventAdmin.postEvent( 1033 EventFactory.endpointMessageTransportReady(endpoint)); 1034 } 1035 1036 if (!isExpired()) 1037 { 1038 AbstractEndpoint dominantSpeaker 1039 = speechActivity.getDominantEndpoint(); 1040 1041 if (dominantSpeaker != null) 1042 { 1043 try 1044 { 1045 endpoint.sendMessage( 1046 createDominantSpeakerEndpointChangeEvent( 1047 dominantSpeaker.getID())); 1048 } 1049 catch (IOException e) 1050 { 1051 logger.error( 1052 "Failed to send dominant speaker update" 1053 + " on data channel to " + endpoint.getID(), 1054 e); 1055 } 1056 } 1057 } 1058 } 1059 1060 /** 1061 * Sets the JID of the last known focus. 1062 * 1063 * @param jid the JID of the last known focus. 1064 */ setLastKnownFocus(Jid jid)1065 public void setLastKnownFocus(Jid jid) 1066 { 1067 lastKnownFocus = jid; 1068 } 1069 1070 /** 1071 * Notifies this instance that the list of ordered endpoints has changed 1072 */ speechActivityEndpointsChanged(List<String> newEndpointIds)1073 void speechActivityEndpointsChanged(List<String> newEndpointIds) 1074 { 1075 endpointsCache.forEach( 1076 e -> e.speechActivityEndpointsChanged(newEndpointIds)); 1077 } 1078 1079 /** 1080 * Sets the time in milliseconds of the last activity related to this 1081 * <tt>Conference</tt> to the current system time. 1082 */ touch()1083 public void touch() 1084 { 1085 long now = System.currentTimeMillis(); 1086 1087 synchronized (this) 1088 { 1089 if (getLastActivityTime() < now) 1090 { 1091 lastActivityTime = now; 1092 } 1093 } 1094 } 1095 1096 /** 1097 * Gets the conference name. 
1098 * 1099 * @return the conference name 1100 */ getName()1101 public Localpart getName() 1102 { 1103 return name; 1104 } 1105 1106 /** 1107 * Returns the <tt>EventAdmin</tt> instance used by this <tt>Conference</tt> 1108 * and all instances (of {@code Content}, {@code Channel}, etc.) created by 1109 * it. 1110 * 1111 * @return the <tt>EventAdmin</tt> instance used by this <tt>Conference</tt> 1112 */ getEventAdmin()1113 public EventAdmin getEventAdmin() 1114 { 1115 return eventAdmin; 1116 } 1117 1118 /** 1119 * @return the {@link Logger} used by this instance. 1120 */ getLogger()1121 public Logger getLogger() 1122 { 1123 return logger; 1124 } 1125 1126 /** 1127 * @return the global ID of the conference, or {@code null} if none has been 1128 * set. 1129 */ getGid()1130 public String getGid() 1131 { 1132 return gid; 1133 } 1134 1135 /** 1136 * {@inheritDoc} 1137 * </p> 1138 * @return {@code true} if this {@link Conference} is ready to be expired. 1139 */ 1140 @Override shouldExpire()1141 public boolean shouldExpire() 1142 { 1143 // Allow a conference to have no endpoints in the first 20 seconds. 1144 return getEndpointCount() == 0 1145 && (System.currentTimeMillis() - creationTime > 20000); 1146 } 1147 1148 /** 1149 * {@inheritDoc} 1150 */ 1151 @Override safeExpire()1152 public void safeExpire() 1153 { 1154 expireableImpl.safeExpire(); 1155 } 1156 1157 /** 1158 * @return this {@link Conference}'s Colibri shim. 1159 */ getShim()1160 public ConferenceShim getShim() 1161 { 1162 return shim; 1163 } 1164 1165 /** 1166 * Broadcasts the packet to all endpoints and tentacles that want it. 1167 * 1168 * @param packetInfo the packet 1169 */ sendOut(PacketInfo packetInfo)1170 private void sendOut(PacketInfo packetInfo) 1171 { 1172 String sourceEndpointId = packetInfo.getEndpointId(); 1173 // We want to avoid calling 'clone' for the last receiver of this packet 1174 // since it's unnecessary. 
To do so, we'll wait before we clone and send 1175 // to an interested handler until after we've determined another handler 1176 // is also interested in the packet. We'll give the last handler the 1177 // original packet (without cloning). 1178 PotentialPacketHandler prevHandler = null; 1179 for (Endpoint endpoint : endpointsCache) 1180 { 1181 if (endpoint.getID().equals(sourceEndpointId)) 1182 { 1183 continue; 1184 } 1185 1186 if (endpoint.wants(packetInfo)) 1187 { 1188 if (prevHandler != null) 1189 { 1190 prevHandler.send(packetInfo.clone()); 1191 } 1192 prevHandler = endpoint; 1193 } 1194 } 1195 if (tentacle != null && tentacle.wants(packetInfo)) 1196 { 1197 if (prevHandler != null) 1198 { 1199 prevHandler.send(packetInfo.clone()); 1200 } 1201 prevHandler = tentacle; 1202 } 1203 1204 if (prevHandler != null) 1205 { 1206 prevHandler.send(packetInfo); 1207 } 1208 else 1209 { 1210 // No one wanted the packet, so the buffer is now free! 1211 ByteBufferPool.returnBuffer(packetInfo.getPacket().getBuffer()); 1212 } 1213 } 1214 1215 /** 1216 * Gets the audio level listener. 1217 */ getAudioLevelListener()1218 public AudioLevelListener getAudioLevelListener() 1219 { 1220 return audioLevelListener; 1221 } 1222 1223 /** 1224 * @return The {@link OctoTentacle} for this conference. 1225 */ getTentacle()1226 public OctoTentacle getTentacle() 1227 { 1228 if (tentacle == null) 1229 { 1230 tentacle = new OctoTentacle(this); 1231 tentacle.addPropertyChangeListener(propertyChangeListener); 1232 } 1233 return tentacle; 1234 } 1235 isOctoEnabled()1236 public boolean isOctoEnabled() 1237 { 1238 return tentacle != null; 1239 } 1240 1241 /** 1242 * Handles an RTP/RTCP packet coming from a specific endpoint. 
1243 * @param packetInfo 1244 */ handleIncomingPacket(PacketInfo packetInfo)1245 public void handleIncomingPacket(PacketInfo packetInfo) 1246 { 1247 Packet packet = packetInfo.getPacket(); 1248 if (packet instanceof RtpPacket) 1249 { 1250 // This is identical to the default 'else' below, but it defined 1251 // because the vast majority of packet will follow this path. 1252 sendOut(packetInfo); 1253 } 1254 else if (packet instanceof RtcpFbPliPacket 1255 || packet instanceof RtcpFbFirPacket) 1256 { 1257 long mediaSsrc = (packet instanceof RtcpFbPliPacket) 1258 ? ((RtcpFbPliPacket) packet).getMediaSourceSsrc() 1259 : ((RtcpFbFirPacket) packet).getMediaSenderSsrc(); 1260 1261 // XXX we could make this faster with a map 1262 AbstractEndpoint targetEndpoint 1263 = findEndpointByReceiveSSRC(mediaSsrc); 1264 1265 PotentialPacketHandler pph = null; 1266 if (targetEndpoint instanceof Endpoint) 1267 { 1268 pph = (Endpoint) targetEndpoint; 1269 } 1270 else if (targetEndpoint instanceof OctoEndpoint) 1271 { 1272 pph = tentacle; 1273 } 1274 1275 // This is not a redundant check. With Octo and 3 or more bridges, 1276 // some PLI or FIR will come from Octo but the target endpoint will 1277 // also be Octo. We need to filter these out. 1278 if (pph == null) 1279 { 1280 if (logger.isDebugEnabled()) 1281 { 1282 logger.debug("Dropping FIR/PLI for media ssrc " + mediaSsrc); 1283 } 1284 } 1285 else if (pph.wants(packetInfo)) 1286 { 1287 pph.send(packetInfo); 1288 } 1289 } 1290 else 1291 { 1292 sendOut(packetInfo); 1293 } 1294 } 1295 1296 /** 1297 * Gets a JSON representation of the parts of this object's state that 1298 * are deemed useful for debugging. 1299 * 1300 * @param full if specified the result will include more details and will 1301 * include the debug state of the endpoint(s). Otherwise just the IDs and 1302 * names of the conference and endpoints are included. 1303 * @param endpointId the ID of the endpoint to include. 
If set to 1304 * {@code null}, all endpoints will be included. 1305 */ 1306 @SuppressWarnings("unchecked") getDebugState(boolean full, String endpointId)1307 public JSONObject getDebugState(boolean full, String endpointId) 1308 { 1309 JSONObject debugState = new JSONObject(); 1310 debugState.put("id", id); 1311 debugState.put("name", name == null ? null : name.toString()); 1312 1313 if (full) 1314 { 1315 debugState.put("gid", gid); 1316 debugState.put("expired", expired); 1317 debugState.put("creationTime", creationTime); 1318 debugState.put("lastActivity", lastActivityTime); 1319 debugState.put("speechActivity", speechActivity.getDebugState()); 1320 debugState.put("includeInStatistics", includeInStatistics); 1321 debugState.put("statistics", statistics.getJson()); 1322 //debugState.put("encodingsManager", encodingsManager.getDebugState()); 1323 OctoTentacle tentacle = this.tentacle; 1324 debugState.put( 1325 "tentacle", 1326 tentacle == null ? null : tentacle.getDebugState()); 1327 } 1328 1329 JSONObject endpoints = new JSONObject(); 1330 debugState.put("endpoints", endpoints); 1331 for (Endpoint e : endpointsCache) 1332 { 1333 if (endpointId == null || endpointId.equals(e.getID())) 1334 { 1335 endpoints.put(e.getID(), 1336 full ? e.getDebugState() : e.getStatsId()); 1337 } 1338 } 1339 return debugState; 1340 } 1341 1342 /** 1343 * Whether this looks like a conference in which the two endpoints are 1344 * using a peer-to-peer connection (i.e. none of them are sending audio 1345 * or video). 1346 * This has false positives when e.g. an endpoint doesn't support p2p 1347 * (firefox) and both are audio/video muted. 1348 */ isP2p()1349 public boolean isP2p() 1350 { 1351 return isInactive() && getEndpointCount() == 2; 1352 } 1353 1354 /** 1355 * Whether the conference is inactive, in the sense that none of its 1356 * endpoints are sending audio or video. 
*/
    public boolean isInactive()
    {
        final boolean anyoneSending = getEndpoints().stream()
            .anyMatch(e -> e.isSendingAudio() || e.isSendingVideo());
        return !anyoneSending;
    }

    /**
     * Holds conference statistics.
     */
    public static class Statistics
    {
        /**
         * The total number of bytes received in RTP packets in channels in
         * this conference. Note that this is only updated when channels
         * expire.
         */
        AtomicLong totalBytesReceived = new AtomicLong();

        /**
         * The total number of bytes sent in RTP packets in channels in this
         * conference. Note that this is only updated when channels expire.
         */
        AtomicLong totalBytesSent = new AtomicLong();

        /**
         * The total number of RTP packets received in channels in this
         * conference. Note that this is only updated when channels expire.
         */
        AtomicLong totalPacketsReceived = new AtomicLong();

        /**
         * The total number of RTP packets sent in channels in this
         * conference. Note that this is only updated when channels expire.
         */
        AtomicLong totalPacketsSent = new AtomicLong();

        /**
         * Whether at least one endpoint in this conference failed ICE.
         */
        boolean hasIceFailedEndpoint = false;

        /**
         * Whether at least one endpoint in this conference completed ICE
         * successfully.
         */
        boolean hasIceSucceededEndpoint = false;

        /**
         * Gets a snapshot of this object's state as JSON.
         *
         * @return a new {@link JSONObject} with the current counter values
         * and ICE flags.
         */
        @SuppressWarnings("unchecked")
        private JSONObject getJson()
        {
            final JSONObject json = new JSONObject();

            // Counters.
            json.put("total_bytes_received", totalBytesReceived.get());
            json.put("total_bytes_sent", totalBytesSent.get());
            json.put("total_packets_received", totalPacketsReceived.get());
            json.put("total_packets_sent", totalPacketsSent.get());

            // ICE outcome flags.
            json.put("has_failed_endpoint", hasIceFailedEndpoint);
            json.put("has_succeeded_endpoint", hasIceSucceededEndpoint);

            return json;
        }
    }

    /**
     * This is a no-op diagnostic context (one that will record nothing) meant
     * to disable logging of time-series for health checks.
     */
    static class NoOpDiagnosticContext
        extends DiagnosticContext
    {
        /**
         * Ignores its arguments and returns a new {@link NoOpTimeSeriesPoint}.
         */
        @Override
        public TimeSeriesPoint makeTimeSeriesPoint(String timeSeriesName, long tsMs)
        {
            return new NoOpTimeSeriesPoint();
        }

        /**
         * Discards the given entry.
         *
         * @return always {@code null}.
         */
        @Override
        public Object put(@NotNull String key, @NotNull Object value)
        {
            return null;
        }
    }

    /**
     * A time-series point whose {@link #put(String, Object)} discards the
     * entries it is given.
     */
    static class NoOpTimeSeriesPoint
        extends DiagnosticContext.TimeSeriesPoint
    {
        /**
         * Creates a point backed by an empty map.
         */
        public NoOpTimeSeriesPoint()
        {
            this(Collections.emptyMap());
        }

        /**
         * Creates a point, passing the given map to the superclass
         * constructor.
         *
         * @param m the map handed to the superclass constructor.
         */
        public NoOpTimeSeriesPoint(Map<String, Object> m)
        {
            super(m);
        }

        /**
         * Discards the given entry.
         *
         * @return always {@code null}.
         */
        @Override
        public Object put(String key, Object value)
        {
            return null;
        }
    }
}