1 /*
2  * Copyright @ 2015 - Present, 8x8 Inc
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.jitsi.videobridge;
17 
18 import edu.umd.cs.findbugs.annotations.*;
19 import org.jetbrains.annotations.*;
20 import org.jetbrains.annotations.Nullable;
21 import org.jitsi.eventadmin.*;
22 import org.jitsi.nlj.*;
23 import org.jitsi.rtp.*;
24 import org.jitsi.rtp.rtcp.rtcpfb.payload_specific_fb.*;
25 import org.jitsi.rtp.rtp.*;
26 import org.jitsi.utils.collections.*;
27 import org.jitsi.utils.event.*;
28 import org.jitsi.utils.logging.DiagnosticContext;
29 import org.jitsi.utils.logging2.*;
30 import org.jitsi.utils.logging2.Logger;
31 import org.jitsi.utils.logging2.LoggerImpl;
32 import org.jitsi.videobridge.octo.*;
33 import org.jitsi.videobridge.shim.*;
34 import org.jitsi.videobridge.util.*;
35 import org.jitsi.xmpp.extensions.colibri.*;
36 import org.json.simple.*;
37 import org.jxmpp.jid.*;
38 import org.jxmpp.jid.parts.*;
39 import org.osgi.framework.*;
40 
41 import java.beans.*;
42 import java.io.*;
43 import java.lang.*;
44 import java.lang.SuppressWarnings;
45 import java.util.*;
46 import java.util.concurrent.*;
47 import java.util.concurrent.atomic.*;
48 import java.util.logging.*;
49 
50 import static org.jitsi.utils.collections.JMap.entry;
51 import static org.jitsi.videobridge.EndpointMessageBuilder.*;
52 
53 /**
54  * Represents a conference in the terms of Jitsi Videobridge.
55  *
56  * @author Lyubomir Marinov
57  * @author Boris Grozev
58  * @author Hristo Terezov
59  * @author George Politis
60  */
61 public class Conference
62      extends PropertyChangeNotifier
63      implements PropertyChangeListener,
64         Expireable,
65         AbstractEndpointMessageTransport.EndpointMessageTransportEventHandler
66 {
67     /**
68      * The endpoints participating in this {@link Conference}. Although it's a
69      * {@link ConcurrentHashMap}, writing to it must be protected by
70      * synchronizing on the map itself, because it must be kept in sync with
71      * {@link #endpointsCache}.
72      */
73     private final ConcurrentHashMap<String, AbstractEndpoint> endpoints
74             = new ConcurrentHashMap<>();
75 
76     /**
77      * A read-only cache of the endpoints in this conference. Note that it
78      * contains only the {@link Endpoint} instances (and not Octo endpoints).
79      * This is because the cache was introduced for performance reasons only
80      * (we iterate over it for each RTP packet) and the Octo endpoints are not
81      * needed.
82      */
83     private List<Endpoint> endpointsCache = Collections.emptyList();
84 
85     private final Object endpointsCacheLock = new Object();
86 
87     /**
88      * The {@link EventAdmin} instance (to be) used by this {@code Conference}
89      * and all instances (of {@code Content}, {@code Channel}, etc.) created by
90      * it.
91      */
92     private final EventAdmin eventAdmin;
93 
94     /**
95      * The indicator which determines whether {@link #expire()} has been called
96      * on this <tt>Conference</tt>.
97      */
98     @SuppressFBWarnings(
99             value = "IS2_INCONSISTENT_SYNC",
100             justification = "The value is deemed safe to read without " +
101                 "synchronization.")
102     private boolean expired = false;
103 
104     /**
105      * The JID of the conference focus who has initialized this instance and
106      * from whom requests to manage this instance must come or they will be
107      * ignored. If <tt>null</tt> value is assigned we don't care who modifies
108      * the conference.
109      */
110     private final Jid focus;
111 
112     /**
113      * The (unique) identifier/ID of this instance.
114      */
115     private final String id;
116 
117     /**
118      * The "global" id of this conference, set by the controller (e.g. jicofo)
119      * as opposed to the bridge. This defaults to {@code null} unless it is
120      * specified.
121      */
122     private final String gid;
123 
124     /**
125      * The world readable name of this instance if any.
126      */
127     private Localpart name;
128 
129     /**
130      * The time in milliseconds of the last activity related to this
131      * <tt>Conference</tt>. In the time interval between the last activity and
132      * now, this <tt>Conference</tt> is considered inactive.
133      */
134     @SuppressFBWarnings(
135             value = "IS2_INCONSISTENT_SYNC",
136             justification = "The value is deemed safe to read without " +
137                     "synchronization.")
138     private long lastActivityTime;
139 
140     /**
141      * If {@link #focus} is <tt>null</tt> the value of the last known focus is
142      * stored in this member.
143      */
144     private Jid lastKnownFocus;
145 
146     /**
147      * The <tt>PropertyChangeListener</tt> which listens to
148      * <tt>PropertyChangeEvent</tt>s on behalf of this instance while
149      * referencing it by a <tt>WeakReference</tt>.
150      */
151     private final PropertyChangeListener propertyChangeListener
152         = new WeakReferencePropertyChangeListener(this);
153 
154     /**
155      * The speech activity (representation) of the <tt>Endpoint</tt>s of this
156      * <tt>Conference</tt>.
157      */
158     private final ConferenceSpeechActivity speechActivity;
159 
160     /**
161      * The audio level listener.
162      */
163     private final AudioLevelListener audioLevelListener;
164 
165     /**
166      * The <tt>Videobridge</tt> which has initialized this <tt>Conference</tt>.
167      */
168     private final Videobridge videobridge;
169 
170     /**
171      * Holds conference statistics.
172      */
173     private final Statistics statistics = new Statistics();
174 
175     /**
176      * The {@link Logger} to be used by this instance to print debug
177      * information.
178      */
179     private final Logger logger;
180 
181 
182     /**
183      * Whether this conference should be considered when generating statistics.
184      */
185     private final boolean includeInStatistics;
186 
187     /**
188      * The time when this {@link Conference} was created.
189      */
190     private final long creationTime = System.currentTimeMillis();
191 
192     /**
193      * The {@link ExpireableImpl} which we use to safely expire this conference.
194      */
195     private final ExpireableImpl expireableImpl;
196 
197     /**
198      * The shim which handles Colibri-related logic for this conference.
199      */
200     private final ConferenceShim shim;
201 
202     //TODO not public
203     final public EncodingsManager encodingsManager = new EncodingsManager();
204 
205     /**
206      * This {@link Conference}'s link to Octo.
207      */
208     private OctoTentacle tentacle;
209 
210     /**
211      * Initializes a new <tt>Conference</tt> instance which is to represent a
212      * conference in the terms of Jitsi Videobridge which has a specific
213      * (unique) ID and is managed by a conference focus with a specific JID.
214      *
215      * @param videobridge the <tt>Videobridge</tt> on which the new
216      * <tt>Conference</tt> instance is to be initialized
217      * @param id the (unique) ID of the new instance to be initialized
218      * @param focus the JID of the conference focus who has requested the
219      * initialization of the new instance and from whom further/future requests
220      * to manage the new instance must come or they will be ignored.
221      * Pass <tt>null</tt> to override this safety check.
222      * @param name world readable name of this instance if any.
223      * @param enableLogging whether logging should be enabled for this
224      * {@link Conference} and its sub-components, and whether this conference
225      * should be considered when generating statistics.
226      * @param gid the optional "global" id of the conference.
227      */
Conference(Videobridge videobridge, String id, Jid focus, Localpart name, boolean enableLogging, String gid)228     public Conference(Videobridge videobridge,
229                       String id,
230                       Jid focus,
231                       Localpart name,
232                       boolean enableLogging,
233                       String gid)
234     {
235         this.videobridge = Objects.requireNonNull(videobridge, "videobridge");
236         Level minLevel = enableLogging ? Level.ALL : Level.WARNING;
237         Map<String, String> context = JMap.ofEntries(
238             entry("confId", id)
239         );
240         if (gid != null)
241         {
242             context.put("gid", gid);
243         }
244         if (name != null)
245         {
246             context.put("conf_name", name.toString());
247         }
248         logger = new LoggerImpl(Conference.class.getName(), minLevel, new LogContext(context));
249         this.shim = new ConferenceShim(this, logger);
250         this.id = Objects.requireNonNull(id, "id");
251         this.gid = gid;
252         this.focus = focus;
253         this.eventAdmin = enableLogging ? videobridge.getEventAdmin() : null;
254         this.includeInStatistics = enableLogging;
255         this.name = name;
256 
257         lastKnownFocus = focus;
258 
259         speechActivity = new ConferenceSpeechActivity(this);
260         audioLevelListener
261             = (sourceSsrc, level)
262                 -> speechActivity.levelChanged(sourceSsrc, (int) level);
263 
264         expireableImpl = new ExpireableImpl(logger, this::expire);
265 
266         if (enableLogging)
267         {
268             eventAdmin.sendEvent(EventFactory.conferenceCreated(this));
269             Videobridge.Statistics videobridgeStatistics
270                 = videobridge.getStatistics();
271             videobridgeStatistics.totalConferencesCreated.incrementAndGet();
272         }
273 
274         // We listen to our own events so we have a centralized place to handle
275         // certain things (e.g. anytime the endpoints list changes)
276         addPropertyChangeListener(propertyChangeListener);
277 
278         touch();
279     }
280 
281     /**
282      * Creates a new diagnostic context instance that includes the conference
283      * name and the conference creation time.
284      *
285      * @return the new {@link DiagnosticContext} instance.
286      */
newDiagnosticContext()287     public DiagnosticContext newDiagnosticContext()
288     {
289 
290 
291         if (name != null)
292         {
293             DiagnosticContext diagnosticContext = new DiagnosticContext();
294             diagnosticContext.put("conf_name", name.toString());
295             diagnosticContext.put("conf_creation_time_ms", creationTime);
296             return diagnosticContext;
297         }
298         else
299         {
300             return new NoOpDiagnosticContext();
301         }
302     }
303 
304     /**
305      * Gets the statistics of this {@link Conference}.
306      *
307      * @return the statistics of this {@link Conference}.
308      */
getStatistics()309     public Statistics getStatistics()
310     {
311         return statistics;
312     }
313 
314     /**
315      * @return whether this conference should be included in generated
316      * statistics.
317      */
includeInStatistics()318      public boolean includeInStatistics()
319      {
320          return includeInStatistics;
321      }
322 
323     /**
324      * Sends a message to a subset of endpoints in the call, primary use
325      * case being a message that has originated from an endpoint (as opposed to
326      * a message originating from the bridge and being sent to all endpoints in
327      * the call, for that see {@link #broadcastMessage(String)}.
328      *
329      * @param msg the message to be sent
330      * @param endpoints the list of <tt>Endpoint</tt>s to which the message will
331      * be sent.
332      */
sendMessage( String msg, List<AbstractEndpoint> endpoints, boolean sendToOcto)333     public void sendMessage(
334         String msg,
335         List<AbstractEndpoint> endpoints,
336         boolean sendToOcto)
337     {
338         for (AbstractEndpoint endpoint : endpoints)
339         {
340             try
341             {
342                 endpoint.sendMessage(msg);
343             }
344             catch (IOException e)
345             {
346                 logger.error(
347                     "Failed to send message on data channel to: "
348                         + endpoint.getID() + ", msg: " + msg, e);
349             }
350         }
351 
352         if (sendToOcto && tentacle != null)
353         {
354             tentacle.sendMessage(msg);
355         }
356     }
357 
358     /**
359      * Used to send a message to a subset of endpoints in the call, primary use
360      * case being a message that has originated from an endpoint (as opposed to
361      * a message originating from the bridge and being sent to all endpoints in
362      * the call, for that see {@link #broadcastMessage(String)}.
363      *
364      * @param msg the message to be sent
365      * @param endpoints the list of <tt>Endpoint</tt>s to which the message will
366      * be sent.
367      */
sendMessage(String msg, List<AbstractEndpoint> endpoints)368     public void sendMessage(String msg, List<AbstractEndpoint> endpoints)
369     {
370         sendMessage(msg, endpoints, false);
371     }
372 
373     /**
374      * Broadcasts a string message to all endpoints of the conference.
375      *
376      * @param msg the message to be broadcast.
377      */
broadcastMessage(String msg, boolean sendToOcto)378     public void broadcastMessage(String msg, boolean sendToOcto)
379     {
380         sendMessage(msg, getEndpoints(), sendToOcto);
381     }
382 
383     /**
384      * Broadcasts a string message to all endpoints of the conference.
385      *
386      * @param msg the message to be broadcast.
387      */
broadcastMessage(String msg)388     public void broadcastMessage(String msg)
389     {
390         broadcastMessage(msg, false);
391     }
392 
393     /**
394      * Requests a keyframe from the endpoint with the specified id, if the
395      * endpoint is found in the conference.
396      *
397      * @param endpointID the id of the endpoint to request a keyframe from.
398      */
requestKeyframe(String endpointID, long mediaSsrc)399     public void requestKeyframe(String endpointID, long mediaSsrc)
400     {
401         AbstractEndpoint remoteEndpoint = getEndpoint(endpointID);
402 
403         if (remoteEndpoint != null)
404         {
405             remoteEndpoint.requestKeyframe(mediaSsrc);
406         }
407         else if (logger.isDebugEnabled())
408         {
409             logger.debug(
410                 "Cannot request keyframe because the endpoint was not found.");
411         }
412     }
413     /**
414      * Sets the values of the properties of a specific
415      * <tt>ColibriConferenceIQ</tt> to the values of the respective
416      * properties of this instance. Thus, the specified <tt>iq</tt> may be
417      * thought of as a description of this instance.
418      * <p>
419      * <b>Note</b>: The copying of the values is shallow i.e. the
420      * <tt>Content</tt>s of this instance are not described in the specified
421      * <tt>iq</tt>.
422      * </p>
423      *
424      * @param iq the <tt>ColibriConferenceIQ</tt> to set the values of the
425      * properties of this instance on
426      */
describeShallow(ColibriConferenceIQ iq)427     public void describeShallow(ColibriConferenceIQ iq)
428     {
429         iq.setID(getID());
430         iq.setName(getName());
431     }
432 
433     /**
434      * Notifies this instance that {@link #speechActivity} has identified a
435      * speaker switch event in this multipoint conference and there is now a new
436      * dominant speaker.
437      */
dominantSpeakerChanged()438     void dominantSpeakerChanged()
439     {
440         AbstractEndpoint dominantSpeaker = speechActivity.getDominantEndpoint();
441 
442         if (logger.isInfoEnabled())
443         {
444             String id
445                 = dominantSpeaker == null ? "null" : dominantSpeaker.getID();
446             logger.info("ds_change ds_id=" + id);
447             getVideobridge().getStatistics().totalDominantSpeakerChanges.increment();
448         }
449 
450         speechActivityEndpointsChanged(speechActivity.getEndpointIds());
451 
452         if (dominantSpeaker != null)
453         {
454             broadcastMessage(
455                     createDominantSpeakerEndpointChangeEvent(
456                         dominantSpeaker.getID()));
457             if (getEndpointCount() > 2)
458             {
459                 double senderRtt = getRtt(dominantSpeaker);
460                 double maxReceiveRtt = getMaxReceiverRtt(dominantSpeaker.getID());
461                 // We add an additional 10ms delay to reduce the risk of the keyframe arriving
462                 // too early
463                 double keyframeDelay = maxReceiveRtt - senderRtt + 10;
464                 if (logger.isDebugEnabled())
465                 {
466                     logger.debug("Scheduling keyframe request from " + dominantSpeaker.getID() + " after a delay" +
467                             " of " + keyframeDelay + "ms");
468                 }
469                 TaskPools.SCHEDULED_POOL.schedule(
470                         (Runnable)dominantSpeaker::requestKeyframe,
471                         (long)keyframeDelay,
472                         TimeUnit.MILLISECONDS
473                 );
474             }
475         }
476     }
477 
getRtt(AbstractEndpoint endpoint)478     private double getRtt(AbstractEndpoint endpoint)
479     {
480         if (endpoint instanceof Endpoint)
481         {
482             Endpoint localDominantSpeaker = (Endpoint)endpoint;
483             return localDominantSpeaker.getRtt();
484         }
485         else
486         {
487             // Octo endpoint
488             // TODO(brian): we don't currently have a way to get the RTT from this bridge
489             // to a remote endpoint, so we hard-code a value here.  Discussed this with
490             // Boris, and we talked about perhaps having OctoEndpoint periodically
491             // send pings to the remote endpoint to calculate its RTT from the perspective
492             // of this bridge.
493             return 100;
494         }
495     }
496 
getMaxReceiverRtt(String excludedEndpointId)497     private double getMaxReceiverRtt(String excludedEndpointId)
498     {
499         return endpointsCache.stream()
500                 .filter(ep -> !ep.getID().equalsIgnoreCase(excludedEndpointId))
501                 .map(Endpoint::getRtt)
502                 .mapToDouble(Double::valueOf)
503                 .max()
504                 .orElse(0);
505     }
506 
507     /**
508      * Expires this <tt>Conference</tt>, its <tt>Content</tt>s and their
509      * respective <tt>Channel</tt>s. Releases the resources acquired by this
510      * instance throughout its life time and prepares it to be garbage
511      * collected.
512      */
expire()513     public void expire()
514     {
515         synchronized (this)
516         {
517             if (expired)
518             {
519                 return;
520             }
521             else
522             {
523                 expired = true;
524             }
525         }
526 
527         logger.info("Expiring.");
528         EventAdmin eventAdmin = getEventAdmin();
529         if (eventAdmin != null)
530         {
531             eventAdmin.sendEvent(EventFactory.conferenceExpired(this));
532         }
533 
534         Videobridge videobridge = getVideobridge();
535 
536         try
537         {
538             videobridge.expireConference(this);
539         }
540         finally
541         {
542             if (logger.isDebugEnabled())
543             {
544                 logger.debug("Expiring endpoints.");
545             }
546             getEndpoints().forEach(AbstractEndpoint::expire);
547             speechActivity.expire();
548             if (tentacle != null)
549             {
550                 tentacle.expire();
551                 tentacle = null;
552             }
553 
554             if (includeInStatistics)
555             {
556                 updateStatisticsOnExpire();
557             }
558         }
559     }
560 
561     /**
562      * Updates the statistics for this conference when it is about to expire.
563      */
updateStatisticsOnExpire()564     private void updateStatisticsOnExpire()
565     {
566         long durationSeconds
567             = Math.round((System.currentTimeMillis() - creationTime) / 1000d);
568 
569         Videobridge.Statistics videobridgeStatistics
570             = getVideobridge().getStatistics();
571 
572         videobridgeStatistics.totalConferencesCompleted
573             .incrementAndGet();
574         videobridgeStatistics.totalConferenceSeconds.addAndGet(
575             durationSeconds);
576 
577         videobridgeStatistics.totalBytesReceived.addAndGet(
578             statistics.totalBytesReceived.get());
579         videobridgeStatistics.totalBytesSent.addAndGet(
580             statistics.totalBytesSent.get());
581         videobridgeStatistics.totalPacketsReceived.addAndGet(
582             statistics.totalPacketsReceived.get());
583         videobridgeStatistics.totalPacketsSent.addAndGet(
584             statistics.totalPacketsSent.get());
585 
586         boolean hasFailed
587             = statistics.hasIceFailedEndpoint
588                 && !statistics.hasIceSucceededEndpoint;
589         boolean hasPartiallyFailed
590             = statistics.hasIceFailedEndpoint
591                 && statistics.hasIceSucceededEndpoint;
592 
593         if (hasPartiallyFailed)
594         {
595             videobridgeStatistics.totalPartiallyFailedConferences
596                 .incrementAndGet();
597         }
598 
599         if (hasFailed)
600         {
601             videobridgeStatistics.totalFailedConferences.incrementAndGet();
602         }
603 
604         if (logger.isInfoEnabled())
605         {
606             StringBuilder sb = new StringBuilder("expire_conf,");
607             sb.append("duration=").append(durationSeconds)
608                 .append(",has_failed=").append(hasFailed)
609                 .append(",has_partially_failed=").append(hasPartiallyFailed);
610             logger.info(sb.toString());
611         }
612     }
613 
614     /**
615      * Finds an <tt>Endpoint</tt> of this <tt>Conference</tt> which sends an RTP
616      * stream with a specific SSRC and with a specific <tt>MediaType</tt>.
617      *
618      * @param receiveSSRC the SSRC of an RTP stream received by this
619      * <tt>Conference</tt> whose sending <tt>Endpoint</tt> is to be found
620      * @return <tt>Endpoint</tt> of this <tt>Conference</tt> which sends an RTP
621      * stream with the specified <tt>ssrc</tt> and with the specified
622      * <tt>mediaType</tt>; otherwise, <tt>null</tt>
623      */
findEndpointByReceiveSSRC(long receiveSSRC)624     AbstractEndpoint findEndpointByReceiveSSRC(long receiveSSRC)
625     {
626         return getEndpoints().stream()
627                 .filter(ep -> ep.receivesSsrc(receiveSSRC))
628                 .findFirst()
629                 .orElse(null);
630     }
631 
632     /**
633      * Returns the OSGi <tt>BundleContext</tt> in which this Conference is
634      * executing.
635      *
636      * @return the OSGi <tt>BundleContext</tt> in which the Conference is
637      * executing.
638      */
getBundleContext()639     public BundleContext getBundleContext()
640     {
641         return getVideobridge().getBundleContext();
642     }
643 
644     /**
645      * Gets an <tt>Endpoint</tt> participating in this <tt>Conference</tt> which
646      * has a specific identifier/ID.
647      *
648      * @param id the identifier/ID of the <tt>Endpoint</tt> which is to be
649      * returned
650      * @return an <tt>Endpoint</tt> participating in this <tt>Conference</tt>
651      * which has the specified <tt>id</tt> or <tt>null</tt>
652      */
653     @Nullable
getEndpoint(@otNull String id)654     public AbstractEndpoint getEndpoint(@NotNull String id)
655     {
656         return endpoints.get(
657             Objects.requireNonNull(id, "id must be non null"));
658     }
659 
660     /**
661      * Initializes a new <tt>Endpoint</tt> instance with the specified
662      * <tt>id</tt> and adds it to the list of <tt>Endpoint</tt>s participating
663      * in this <tt>Conference</tt>.
664      * @param id the identifier/ID of the <tt>Endpoint</tt> which will be
665      * created
666      * @param iceControlling {@code true} if the ICE agent of this endpoint's
667      * transport will be initialized to serve as a controlling ICE agent;
668      * otherwise, {@code false}
669      * @return an <tt>Endpoint</tt> participating in this <tt>Conference</tt>
670      */
671     @NotNull
createLocalEndpoint(String id, boolean iceControlling)672     public Endpoint createLocalEndpoint(String id, boolean iceControlling)
673         throws IOException
674     {
675         final AbstractEndpoint existingEndpoint = getEndpoint(id);
676         if (existingEndpoint instanceof OctoEndpoint)
677         {
678             // It is possible that an Endpoint was migrated from another bridge
679             // in the conference to this one, and the sources lists (which
680             // implicitly signal the Octo endpoints in the conference) haven't
681             // been updated yet. We'll force the Octo endpoint to expire and
682             // we'll continue with the creation of a new local Endpoint for the
683             // participant.
684             existingEndpoint.expire();
685         }
686         else if (existingEndpoint != null)
687         {
688             throw new IllegalArgumentException("Local endpoint with ID = "
689                 + id + "already created");
690         }
691 
692         final Endpoint endpoint = new Endpoint(
693             id, this, logger, iceControlling);
694         // The propertyChangeListener will weakly reference this
695         // Conference and will unregister itself from the endpoint
696         // sooner or later.
697         endpoint.addPropertyChangeListener(propertyChangeListener);
698 
699         addEndpoint(endpoint);
700 
701         EventAdmin eventAdmin = getEventAdmin();
702         if (eventAdmin != null)
703         {
704             eventAdmin.sendEvent(
705                 EventFactory.endpointCreated(endpoint));
706         }
707 
708         return endpoint;
709     }
710 
711     /**
712      * An endpoint was added or removed.
713      */
endpointsChanged()714     private void endpointsChanged()
715     {
716         speechActivity.endpointsChanged();
717     }
718 
719     /**
720      * The media stream tracks of one of the endpoints in this conference
721      * changed.
722      *
723      * @param endpoint the endpoint, or {@code null} if it was an Octo endpoint.
724      */
endpointTracksChanged(AbstractEndpoint endpoint)725     public void endpointTracksChanged(AbstractEndpoint endpoint)
726     {
727         List<String> endpoints = speechActivity.getEndpointIds();
728         endpointsCache.forEach((e) -> {
729             if (e != endpoint)
730             {
731                 e.speechActivityEndpointsChanged(endpoints);
732             }
733         });
734     }
735 
736     /**
737      * Updates {@link #endpointsCache} with the current contents of
738      * {@link #endpoints}.
739      */
updateEndpointsCache()740     private void updateEndpointsCache()
741     {
742         synchronized (endpointsCacheLock)
743         {
744             ArrayList<Endpoint>
745                     endpointsList
746                     = new ArrayList<>(endpoints.size());
747             endpoints.values().forEach(e ->
748             {
749                 if (e instanceof Endpoint)
750                 {
751                     endpointsList.add((Endpoint) e);
752                 }
753             });
754 
755             endpointsCache = Collections.unmodifiableList(endpointsList);
756         }
757     }
758 
759     /**
760      * Returns the number of local AND remote {@link Endpoint}s in this {@link Conference}.
761      *
762      * @return the number of local AND remote {@link Endpoint}s in this {@link Conference}.
763      */
getEndpointCount()764     public int getEndpointCount()
765     {
766         return endpoints.size();
767     }
768 
769     /**
770      * Returns the number of local {@link Endpoint}s in this {@link Conference}.
771      *
772      * @return the number of local {@link Endpoint}s in this {@link Conference}.
773      */
getLocalEndpointCount()774     public int getLocalEndpointCount()
775     {
776         return getLocalEndpoints().size();
777     }
778 
779     /**
780      * Gets the <tt>Endpoint</tt>s participating in/contributing to this
781      * <tt>Conference</tt>.
782      *
783      * @return the <tt>Endpoint</tt>s participating in/contributing to this
784      * <tt>Conference</tt>
785      */
getEndpoints()786     public List<AbstractEndpoint> getEndpoints()
787     {
788         return new ArrayList<>(this.endpoints.values());
789     }
790 
791     /**
792      * Gets the list of endpoints which are local to this bridge (as opposed
793      * to being on a remote bridge through Octo).
794      */
getLocalEndpoints()795     public List<Endpoint> getLocalEndpoints()
796     {
797         return endpointsCache;
798     }
799 
800     /**
801      * Gets the JID of the conference focus who has initialized this instance
802      * and from whom requests to manage this instance must come or they will be
803      * ignored.
804      *
805      * @return the JID of the conference focus who has initialized this instance
806      * and from whom requests to manage this instance must come or they will be
807      * ignored
808      */
getFocus()809     public final Jid getFocus()
810     {
811         return focus;
812     }
813 
814     /**
815      * Gets the (unique) identifier/ID of this instance.
816      *
817      * @return the (unique) identifier/ID of this instance
818      */
getID()819     public final String getID()
820     {
821         return id;
822     }
823 
824     /**
825      * Gets the time in milliseconds of the last activity related to this
826      * <tt>Conference</tt>.
827      *
828      * @return the time in milliseconds of the last activity related to this
829      * <tt>Conference</tt>
830      */
getLastActivityTime()831     public long getLastActivityTime()
832     {
833         synchronized (this)
834         {
835             return lastActivityTime;
836         }
837     }
838 
839     /**
840      * Returns the JID of the last known focus.
841      * @return the JID of the last known focus.
842      */
getLastKnowFocus()843     public Jid getLastKnowFocus()
844     {
845         return lastKnownFocus;
846     }
847 
848     /**
849      * Gets an <tt>Endpoint</tt> participating in this <tt>Conference</tt> which
850      * has a specific identifier/ID.
851      *
852      * @param id the identifier/ID of the <tt>Endpoint</tt> which is to be
853      * returned
854      * @return an <tt>Endpoint</tt> participating in this <tt>Conference</tt>
855      * or {@code null} if endpoint with specified ID does not exist in a
856      * conference
857      */
858     @Nullable
getLocalEndpoint(String id)859     public Endpoint getLocalEndpoint(String id)
860     {
861         AbstractEndpoint endpoint = getEndpoint(id);
862         if (endpoint instanceof Endpoint)
863         {
864             return (Endpoint) endpoint;
865         }
866 
867         return null;
868     }
869 
870     /**
871      * Gets the speech activity (representation) of the <tt>Endpoint</tt>s of
872      * this <tt>Conference</tt>.
873      *
874      * @return the speech activity (representation) of the <tt>Endpoint</tt>s of
875      * this <tt>Conference</tt>
876      */
getSpeechActivity()877     public ConferenceSpeechActivity getSpeechActivity()
878     {
879         return speechActivity;
880     }
881 
882     /**
883      * Gets the <tt>Videobridge</tt> which has initialized this
884      * <tt>Conference</tt>.
885      *
886      * @return the <tt>Videobridge</tt> which has initialized this
887      * <tt>Conference</tt>
888      */
getVideobridge()889     public final Videobridge getVideobridge()
890     {
891         return videobridge;
892     }
893 
894     /**
895      * Gets the indicator which determines whether this <tt>Conference</tt> has
896      * expired.
897      *
898      * @return <tt>true</tt> if this <tt>Conference</tt> has expired; otherwise,
899      * <tt>false</tt>
900      */
isExpired()901     public boolean isExpired()
902     {
903         // this.expired starts as 'false' and only ever changes to 'true',
904         // so there is no need to synchronize while reading.
905         return expired;
906     }
907 
908     /**
909      * Notifies this instance that there was a change in the value of a property
910      * of an object in which this instance is interested.
911      *
912      * @param ev a <tt>PropertyChangeEvent</tt> which specifies the object of
913      * interest, the name of the property and the old and new values of that
914      * property
915      */
916     @Override
917     @SuppressWarnings("unchecked")
propertyChange(PropertyChangeEvent ev)918     public void propertyChange(PropertyChangeEvent ev)
919     {
920         Object source = ev.getSource();
921 
922         if (isExpired())
923         {
924             // An expired Conference is to be treated like a null Conference
925             // i.e. it does not handle any PropertyChangeEvents. If possible,
926             // make sure that no further PropertyChangeEvents will be delivered
927             // to this Conference.
928             if (source instanceof PropertyChangeNotifier)
929             {
930                 ((PropertyChangeNotifier) source).removePropertyChangeListener(
931                         propertyChangeListener);
932             }
933         }
934         else if (Endpoint.SELECTED_ENDPOINTS_PROPERTY_NAME
935                 .equals(ev.getPropertyName()))
936         {
937             Set<String> oldSelectedEndpoints = (Set<String>)ev.getOldValue();
938             Set<String> newSelectedEndpoints = (Set<String>)ev.getNewValue();
939             // Any endpoints in the oldSelectedEndpoints list which AREN'T
940             // in the newSelectedEndpoints list should have their count decremented
941             oldSelectedEndpoints.stream()
942                 .filter(
943                     oldSelectedEp -> !newSelectedEndpoints.contains(oldSelectedEp))
944                 .map(this::getEndpoint)
945                 .filter(Objects::nonNull)
946                 .forEach(AbstractEndpoint::decrementSelectedCount);
947 
948             // Any endpoints in the newSelectedEndpoints list which AREN'T
949             // in the oldSelectedEndpoints list should have their count incremented
950             newSelectedEndpoints.stream()
951                 .filter(
952                     newSelectedEp -> !oldSelectedEndpoints.contains(newSelectedEp))
953                 .map(this::getEndpoint)
954                 .filter(Objects::nonNull)
955                 .forEach(AbstractEndpoint::incrementSelectedCount);
956         }
957     }
958 
959     /**
960      * Notifies this conference that one of it's endpoints has expired.
961      *
962      * @param endpoint the <tt>Endpoint</tt> which expired.
963      */
endpointExpired(AbstractEndpoint endpoint)964     void endpointExpired(AbstractEndpoint endpoint)
965     {
966         final AbstractEndpoint removedEndpoint;
967         String id = endpoint.getID();
968         removedEndpoint = endpoints.remove(id);
969         if (removedEndpoint != null)
970         {
971             updateEndpointsCache();
972         }
973 
974         if (tentacle != null)
975         {
976             tentacle.endpointExpired(id);
977         }
978 
979         if (removedEndpoint != null)
980         {
981             final EventAdmin eventAdmin = getEventAdmin();
982             if (eventAdmin != null)
983             {
984                 eventAdmin.sendEvent(
985                     EventFactory.endpointExpired(removedEndpoint));
986             }
987             endpointsChanged();
988         }
989     }
990 
991     /**
992      * Adds a specific {@link AbstractEndpoint} instance to the list of
993      * endpoints in this conference.
994      * @param endpoint the endpoint to add.
995      */
addEndpoint(AbstractEndpoint endpoint)996     public void addEndpoint(AbstractEndpoint endpoint)
997     {
998         if (endpoint.getConference() != this)
999         {
1000             throw new IllegalArgumentException("Endpoint belong to other " +
1001                 "conference = " + endpoint.getConference());
1002         }
1003 
1004         final AbstractEndpoint replacedEndpoint;
1005         replacedEndpoint = endpoints.put(endpoint.getID(), endpoint);
1006         updateEndpointsCache();
1007 
1008         endpointsChanged();
1009 
1010         if (replacedEndpoint != null)
1011         {
1012             logger.info("Endpoint with id " + endpoint.getID() + ": " +
1013                 replacedEndpoint + " has been replaced by new " +
1014                 "endpoint with same id: " + endpoint);
1015         }
1016     }
1017 
1018     /**
1019      * Notifies this {@link Conference} that one of its {@link Endpoint}s
1020      * transport channel has become available.
1021      *
1022      * @param endpoint the {@link Endpoint} whose transport channel has become
1023      * available.
1024      */
1025     @Override
endpointMessageTransportConnected(@otNull AbstractEndpoint endpoint)1026     public void endpointMessageTransportConnected(@NotNull AbstractEndpoint endpoint)
1027     {
1028         EventAdmin eventAdmin = getEventAdmin();
1029 
1030         if (eventAdmin != null)
1031         {
1032             eventAdmin.postEvent(
1033                 EventFactory.endpointMessageTransportReady(endpoint));
1034         }
1035 
1036         if (!isExpired())
1037         {
1038             AbstractEndpoint dominantSpeaker
1039                     = speechActivity.getDominantEndpoint();
1040 
1041             if (dominantSpeaker != null)
1042             {
1043                 try
1044                 {
1045                     endpoint.sendMessage(
1046                             createDominantSpeakerEndpointChangeEvent(
1047                                 dominantSpeaker.getID()));
1048                 }
1049                 catch (IOException e)
1050                 {
1051                     logger.error(
1052                             "Failed to send dominant speaker update"
1053                                 + " on data channel to " + endpoint.getID(),
1054                             e);
1055                 }
1056             }
1057         }
1058     }
1059 
1060     /**
1061      * Sets the JID of the last known focus.
1062      *
1063      * @param jid the JID of the last known focus.
1064      */
setLastKnownFocus(Jid jid)1065     public void setLastKnownFocus(Jid jid)
1066     {
1067         lastKnownFocus = jid;
1068     }
1069 
1070     /**
1071      * Notifies this instance that the list of ordered endpoints has changed
1072      */
speechActivityEndpointsChanged(List<String> newEndpointIds)1073     void speechActivityEndpointsChanged(List<String> newEndpointIds)
1074     {
1075         endpointsCache.forEach(
1076                 e ->  e.speechActivityEndpointsChanged(newEndpointIds));
1077     }
1078 
1079     /**
1080      * Sets the time in milliseconds of the last activity related to this
1081      * <tt>Conference</tt> to the current system time.
1082      */
touch()1083     public void touch()
1084     {
1085         long now = System.currentTimeMillis();
1086 
1087         synchronized (this)
1088         {
1089             if (getLastActivityTime() < now)
1090             {
1091                 lastActivityTime = now;
1092             }
1093         }
1094     }
1095 
1096     /**
1097      * Gets the conference name.
1098      *
1099      * @return the conference name
1100      */
getName()1101     public Localpart getName()
1102     {
1103         return name;
1104     }
1105 
1106     /**
1107      * Returns the <tt>EventAdmin</tt> instance used by this <tt>Conference</tt>
1108      * and all instances (of {@code Content}, {@code Channel}, etc.) created by
1109      * it.
1110      *
1111      * @return the <tt>EventAdmin</tt> instance used by this <tt>Conference</tt>
1112      */
getEventAdmin()1113     public EventAdmin getEventAdmin()
1114     {
1115         return eventAdmin;
1116     }
1117 
1118     /**
1119      * @return the {@link Logger} used by this instance.
1120      */
getLogger()1121     public Logger getLogger()
1122     {
1123         return logger;
1124     }
1125 
1126     /**
1127      * @return the global ID of the conference, or {@code null} if none has been
1128      * set.
1129      */
getGid()1130     public String getGid()
1131     {
1132         return gid;
1133     }
1134 
1135     /**
1136      * {@inheritDoc}
1137      * </p>
1138      * @return {@code true} if this {@link Conference} is ready to be expired.
1139      */
1140     @Override
shouldExpire()1141     public boolean shouldExpire()
1142     {
1143         // Allow a conference to have no endpoints in the first 20 seconds.
1144         return getEndpointCount() == 0
1145                 && (System.currentTimeMillis() - creationTime > 20000);
1146     }
1147 
1148     /**
1149      * {@inheritDoc}
1150      */
1151     @Override
safeExpire()1152     public void safeExpire()
1153     {
1154         expireableImpl.safeExpire();
1155     }
1156 
1157     /**
1158      * @return this {@link Conference}'s Colibri shim.
1159      */
getShim()1160     public ConferenceShim getShim()
1161     {
1162         return shim;
1163     }
1164 
1165     /**
1166      * Broadcasts the packet to all endpoints and tentacles that want it.
1167      *
1168      * @param packetInfo the packet
1169      */
sendOut(PacketInfo packetInfo)1170     private void sendOut(PacketInfo packetInfo)
1171     {
1172         String sourceEndpointId = packetInfo.getEndpointId();
1173         // We want to avoid calling 'clone' for the last receiver of this packet
1174         // since it's unnecessary.  To do so, we'll wait before we clone and send
1175         // to an interested handler until after we've determined another handler
1176         // is also interested in the packet.  We'll give the last handler the
1177         // original packet (without cloning).
1178         PotentialPacketHandler prevHandler = null;
1179         for (Endpoint endpoint : endpointsCache)
1180         {
1181             if (endpoint.getID().equals(sourceEndpointId))
1182             {
1183                 continue;
1184             }
1185 
1186             if (endpoint.wants(packetInfo))
1187             {
1188                 if (prevHandler != null)
1189                 {
1190                     prevHandler.send(packetInfo.clone());
1191                 }
1192                 prevHandler = endpoint;
1193             }
1194         }
1195         if (tentacle != null && tentacle.wants(packetInfo))
1196         {
1197             if (prevHandler != null)
1198             {
1199                 prevHandler.send(packetInfo.clone());
1200             }
1201             prevHandler = tentacle;
1202         }
1203 
1204         if (prevHandler != null)
1205         {
1206             prevHandler.send(packetInfo);
1207         }
1208         else
1209         {
1210             // No one wanted the packet, so the buffer is now free!
1211             ByteBufferPool.returnBuffer(packetInfo.getPacket().getBuffer());
1212         }
1213     }
1214 
1215     /**
1216      * Gets the audio level listener.
1217      */
getAudioLevelListener()1218     public AudioLevelListener getAudioLevelListener()
1219     {
1220         return audioLevelListener;
1221     }
1222 
1223     /**
1224      * @return The {@link OctoTentacle} for this conference.
1225      */
getTentacle()1226     public OctoTentacle getTentacle()
1227     {
1228         if (tentacle == null)
1229         {
1230             tentacle = new OctoTentacle(this);
1231             tentacle.addPropertyChangeListener(propertyChangeListener);
1232         }
1233         return tentacle;
1234     }
1235 
isOctoEnabled()1236     public boolean isOctoEnabled()
1237     {
1238         return tentacle != null;
1239     }
1240 
1241     /**
1242      * Handles an RTP/RTCP packet coming from a specific endpoint.
1243      * @param packetInfo
1244      */
handleIncomingPacket(PacketInfo packetInfo)1245     public void handleIncomingPacket(PacketInfo packetInfo)
1246     {
1247         Packet packet = packetInfo.getPacket();
1248         if (packet instanceof RtpPacket)
1249         {
1250             // This is identical to the default 'else' below, but it defined
1251             // because the vast majority of packet will follow this path.
1252             sendOut(packetInfo);
1253         }
1254         else if (packet instanceof RtcpFbPliPacket
1255                 || packet instanceof RtcpFbFirPacket)
1256         {
1257             long mediaSsrc = (packet instanceof RtcpFbPliPacket)
1258                 ? ((RtcpFbPliPacket) packet).getMediaSourceSsrc()
1259                 : ((RtcpFbFirPacket) packet).getMediaSenderSsrc();
1260 
1261             // XXX we could make this faster with a map
1262             AbstractEndpoint targetEndpoint
1263                 = findEndpointByReceiveSSRC(mediaSsrc);
1264 
1265             PotentialPacketHandler pph = null;
1266             if (targetEndpoint instanceof Endpoint)
1267             {
1268                 pph = (Endpoint) targetEndpoint;
1269             }
1270             else if (targetEndpoint instanceof OctoEndpoint)
1271             {
1272                 pph = tentacle;
1273             }
1274 
1275             // This is not a redundant check. With Octo and 3 or more bridges,
1276             // some PLI or FIR will come from Octo but the target endpoint will
1277             // also be Octo. We need to filter these out.
1278             if (pph == null)
1279             {
1280                 if (logger.isDebugEnabled())
1281                 {
1282                     logger.debug("Dropping FIR/PLI for media ssrc " + mediaSsrc);
1283                 }
1284             }
1285             else if (pph.wants(packetInfo))
1286             {
1287                 pph.send(packetInfo);
1288             }
1289         }
1290         else
1291         {
1292             sendOut(packetInfo);
1293         }
1294     }
1295 
1296     /**
1297      * Gets a JSON representation of the parts of this object's state that
1298      * are deemed useful for debugging.
1299      *
1300      * @param full if specified the result will include more details and will
1301      * include the debug state of the endpoint(s). Otherwise just the IDs and
1302      * names of the conference and endpoints are included.
1303      * @param endpointId the ID of the endpoint to include. If set to
1304      * {@code null}, all endpoints will be included.
1305      */
1306     @SuppressWarnings("unchecked")
getDebugState(boolean full, String endpointId)1307     public JSONObject getDebugState(boolean full, String endpointId)
1308     {
1309         JSONObject debugState = new JSONObject();
1310         debugState.put("id", id);
1311         debugState.put("name", name == null ? null : name.toString());
1312 
1313         if (full)
1314         {
1315             debugState.put("gid", gid);
1316             debugState.put("expired", expired);
1317             debugState.put("creationTime", creationTime);
1318             debugState.put("lastActivity", lastActivityTime);
1319             debugState.put("speechActivity", speechActivity.getDebugState());
1320             debugState.put("includeInStatistics", includeInStatistics);
1321             debugState.put("statistics", statistics.getJson());
1322             //debugState.put("encodingsManager", encodingsManager.getDebugState());
1323             OctoTentacle tentacle = this.tentacle;
1324             debugState.put(
1325                     "tentacle",
1326                     tentacle == null ? null : tentacle.getDebugState());
1327         }
1328 
1329         JSONObject endpoints = new JSONObject();
1330         debugState.put("endpoints", endpoints);
1331         for (Endpoint e : endpointsCache)
1332         {
1333             if (endpointId == null || endpointId.equals(e.getID()))
1334             {
1335                 endpoints.put(e.getID(),
1336                         full ? e.getDebugState() : e.getStatsId());
1337             }
1338         }
1339         return debugState;
1340     }
1341 
1342     /**
1343      * Whether this looks like a conference in which the two endpoints are
1344      * using a peer-to-peer connection (i.e. none of them are sending audio
1345      * or video).
1346      * This has false positives when e.g. an endpoint doesn't support p2p
1347      * (firefox) and both are audio/video muted.
1348      */
isP2p()1349     public boolean isP2p()
1350     {
1351         return isInactive() && getEndpointCount() == 2;
1352     }
1353 
1354     /**
1355      * Whether the conference is inactive, in the sense that none of its
1356      * endpoints are sending audio or video.
1357      */
isInactive()1358     public boolean isInactive()
1359     {
1360         return getEndpoints().stream().noneMatch(e -> e.isSendingAudio() || e.isSendingVideo());
1361     }
1362 
1363     /**
1364      * Holds conference statistics.
1365      */
1366     public static class Statistics
1367     {
1368         /**
1369          * The total number of bytes received in RTP packets in channels in this
1370          * conference. Note that this is only updated when channels expire.
1371          */
1372         AtomicLong totalBytesReceived = new AtomicLong();
1373 
1374         /**
1375          * The total number of bytes sent in RTP packets in channels in this
1376          * conference. Note that this is only updated when channels expire.
1377          */
1378         AtomicLong totalBytesSent = new AtomicLong();
1379 
1380         /**
1381          * The total number of RTP packets received in channels in this
1382          * conference. Note that this is only updated when channels expire.
1383          */
1384         AtomicLong totalPacketsReceived = new AtomicLong();
1385 
1386         /**
1387          * The total number of RTP packets received in channels in this
1388          * conference. Note that this is only updated when channels expire.
1389          */
1390         AtomicLong totalPacketsSent = new AtomicLong();
1391 
1392         /**
1393          * Whether at least one endpoint in this conference failed ICE.
1394          */
1395         boolean hasIceFailedEndpoint = false;
1396 
1397         /**
1398          * Whether at least one endpoint in this conference completed ICE
1399          * successfully.
1400          */
1401         boolean hasIceSucceededEndpoint = false;
1402 
1403         /**
1404          * Gets a snapshot of this object's state as JSON.
1405          */
1406         @SuppressWarnings("unchecked")
getJson()1407         private JSONObject getJson()
1408         {
1409             JSONObject jsonObject = new JSONObject();
1410             jsonObject.put("total_bytes_received", totalBytesReceived.get());
1411             jsonObject.put("total_bytes_sent", totalBytesSent.get());
1412             jsonObject.put("total_packets_received", totalPacketsReceived.get());
1413             jsonObject.put("total_packets_sent", totalPacketsSent.get());
1414             jsonObject.put("has_failed_endpoint", hasIceFailedEndpoint);
1415             jsonObject.put("has_succeeded_endpoint", hasIceSucceededEndpoint);
1416             return jsonObject;
1417         }
1418     }
1419 
1420     /**
1421      * This is a no-op diagnostic context (one that will record nothing) meant
1422      * to disable logging of time-series for health checks.
1423      */
1424     static class NoOpDiagnosticContext
1425         extends DiagnosticContext
1426     {
1427         @Override
makeTimeSeriesPoint(String timeSeriesName, long tsMs)1428         public TimeSeriesPoint makeTimeSeriesPoint(String timeSeriesName, long tsMs)
1429         {
1430             return new NoOpTimeSeriesPoint();
1431         }
1432 
1433         @Override
put(@otNull String key, @NotNull Object value)1434         public Object put(@NotNull String key, @NotNull Object value)
1435         {
1436             return null;
1437         }
1438     }
1439 
1440     static class NoOpTimeSeriesPoint
1441         extends DiagnosticContext.TimeSeriesPoint
1442     {
NoOpTimeSeriesPoint()1443         public NoOpTimeSeriesPoint()
1444         {
1445             this(Collections.emptyMap());
1446         }
1447 
NoOpTimeSeriesPoint(Map<String, Object> m)1448         public NoOpTimeSeriesPoint(Map<String, Object> m)
1449         {
1450             super(m);
1451         }
1452 
1453         @Override
put(String key, Object value)1454         public Object put(String key, Object value)
1455         {
1456             return null;
1457         }
1458     }
1459 }
1460