1 /*
2  * Copyright @ 2019-Present 8x8, Inc
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.jitsi.videobridge.octo;
17 
18 import kotlin.*;
19 import kotlin.jvm.functions.*;
20 import org.jetbrains.annotations.*;
21 import org.jitsi.nlj.*;
22 import org.jitsi.nlj.format.*;
23 import org.jitsi.nlj.rtcp.*;
24 import org.jitsi.nlj.rtp.*;
25 import org.jitsi.nlj.transform.node.*;
26 import org.jitsi.nlj.util.*;
27 import org.jitsi.osgi.*;
28 import org.jitsi.rtp.*;
29 import org.jitsi.utils.*;
30 import org.jitsi.utils.event.*;
31 import org.jitsi.utils.logging2.*;
32 import org.jitsi.utils.queue.*;
33 import org.jitsi.videobridge.*;
34 import org.jitsi.videobridge.octo.config.*;
35 import org.jitsi.videobridge.util.*;
36 import org.jitsi.videobridge.xmpp.*;
37 import org.jitsi.xmpp.extensions.colibri.*;
38 import org.jitsi.xmpp.extensions.jingle.*;
39 import org.jitsi_modified.impl.neomedia.rtp.*;
40 import org.json.simple.*;
41 import org.osgi.framework.*;
42 
43 import java.net.*;
44 import java.util.*;
45 import java.util.concurrent.*;
46 import java.util.stream.*;
47 
48 /**
49  * The single class in the octo package which serves as a link between a
50  * {@link Conference} and its Octo-related functionality.
51  *
52  * @author Boris Grozev
53  */
54 public class OctoTentacle extends PropertyChangeNotifier implements PotentialPacketHandler
55 {
56     /**
57      * The {@link Logger} used by the {@link OctoTentacle} class and its
58      * instances to print debug information.
59      */
60     private final Logger logger;
61 
62     /**
63      * The conference for this {@link OctoTentacle}.
64      */
65     private final Conference conference;
66 
67     /**
68      * The {@link OctoEndpoints} instance which maintains the list of Octo
69      * endpoints in the conference.
70      */
71     private final OctoEndpoints octoEndpoints;
72 
73     /**
74      * The {@link OctoTransceiver} instance which handles RTP/RTCP processing.
75      */
76     final OctoTransceiver transceiver;
77 
78     /**
79      * The {@link OctoRelay} used to actually send and receive Octo packets.
80      */
81     private final OctoRelay relay;
82 
83     /**
84      * The instance that will request keyframes on behalf of this tentable. Note
85      * that we don't have bridge-to-bridge rtt measurements atm so we use a
86      * default value of 100ms.
87      */
88     private final KeyframeRequester keyframeRequester;
89 
90     /**
91      * The list of remote Octo targets.
92      */
93     private Set<SocketAddress> targets
94             = Collections.unmodifiableSet(new HashSet<>());
95 
96     /**
97      * Count the number of dropped packets and exceptions.
98      */
99     public static final CountingErrorHandler queueErrorCounter
100         = new CountingErrorHandler();
101 
102     /**
103      * The queues which pass packets to be sent.
104      */
105     private Map<String, PacketInfoQueue> outgoingPacketQueues =
106         new ConcurrentHashMap<>();
107 
108     /**
109      * Initializes a new {@link OctoTentacle} instance.
110      * @param conference the conference.
111      */
OctoTentacle(Conference conference)112     public OctoTentacle(Conference conference)
113     {
114         this.conference = conference;
115         this.logger = conference.getLogger().createChildLogger(this.getClass().getName());
116         octoEndpoints = new OctoEndpoints(conference);
117         transceiver = new OctoTransceiver(this, logger);
118 
119         BundleContext bundleContext = conference.getBundleContext();
120         OctoRelayService octoRelayService
121             = bundleContext == null ? null :
122                 ServiceUtils2.getService(bundleContext, OctoRelayService.class);
123 
124 
125         if (octoRelayService != null)
126         {
127             relay = octoRelayService.getRelay();
128             keyframeRequester = new KeyframeRequester(transceiver.getStreamInformationStore(), logger);
129             keyframeRequester.attach(new ConsumerNode("octo keyframe relay node")
130             {
131                 @Override
132                 protected void consume(@NotNull PacketInfo packetInfo)
133                 {
134                     packetInfo.sent();
135                     relay.sendPacket(packetInfo.getPacket(), targets,
136                         conference.getGid(), packetInfo.getEndpointId());
137                 }
138 
139                 @Override
140                 public void trace(@NotNull Function0<Unit> f)
141                 {
142                     f.invoke();
143                 }
144             });
145         }
146         else
147         {
148             relay = null;
149             keyframeRequester = null;
150         }
151     }
152 
153     /**
154      * Gets the audio level listener.
155      * @return
156      */
getAudioLevelListener()157     AudioLevelListener getAudioLevelListener()
158     {
159         return conference.getAudioLevelListener();
160     }
161 
162     /**
163      * Adds a {@link PayloadType}
164      */
addPayloadType(PayloadType payloadType)165     public void addPayloadType(PayloadType payloadType)
166     {
167         transceiver.addPayloadType(payloadType);
168     }
169 
170     /**
171      * Creates a PacketInfoQueue for an endpoint.
172      */
createQueue(String epId)173     private PacketInfoQueue createQueue(String epId)
174     {
175         PacketInfoQueue q = new PacketInfoQueue(
176             "octo-tentacle-outgoing-packet-queue",
177             TaskPools.IO_POOL,
178             this::doSend,
179             OctoConfig.Config.sendQueueSize());
180         q.setErrorHandler(queueErrorCounter);
181         return q;
182     }
183 
184     /**
185      * {@inheritDoc}
186      */
187     @Override
send(PacketInfo packetInfo)188     public void send(PacketInfo packetInfo)
189     {
190         /* We queue packets separately by their *source* endpoint.
191          * This achieves parallelization while guaranteeing that we don't
192          * reorder things that shouldn't be reordered.
193          */
194         PacketInfoQueue queue =
195             outgoingPacketQueues.computeIfAbsent(packetInfo.getEndpointId(),
196              this::createQueue);
197 
198         queue.add(packetInfo);
199     }
200 
201     /**
202      * Send Octo packet out.
203      */
doSend(PacketInfo packetInfo)204     private boolean doSend(PacketInfo packetInfo)
205     {
206         Packet packet = packetInfo.getPacket();
207         if (packet != null)
208         {
209             relay.sendPacket(
210                 packet,
211                 targets,
212                 conference.getGid(),
213                 packetInfo.getEndpointId());
214         }
215         return true;
216     }
217 
218     /**
219      * Sets the list of remote relays to send packets to.
220      * @param relays the list of relay IDs, which are converted to addresses
221      * using the logic in {@link OctoRelay}.
222      */
setRelays(Collection<String> relays)223     public void setRelays(Collection<String> relays)
224     {
225         Objects.requireNonNull(
226                 relay,
227                 "Octo requested but not configured");
228 
229         Set<SocketAddress> socketAddresses = new HashSet<>();
230         for (String relay : relays)
231         {
232             SocketAddress socketAddress = OctoRelay.relayIdToSocketAddress(relay);
233             if (socketAddress != null)
234             {
235                 socketAddresses.add(socketAddress);
236             }
237         }
238 
239         setTargets(socketAddresses);
240     }
241 
242     /**
243      * {@inheritDoc}
244      */
245     @Override
wants(PacketInfo packetInfo)246     public boolean wants(PacketInfo packetInfo)
247     {
248         // Cthulhu devours everything (as long as it's not coming from
249         // itself, and we have targets).
250         return !(packetInfo instanceof OctoPacketInfo) && !targets.isEmpty();
251     }
252 
253     /**
254      * Sets the list of sources and source groups which describe the RTP streams
255      * we expect to receive from remote Octo relays.
256      *
257      * @param audioSources the list of audio sources.
258      * @param videoSources the list of video sources.
259      * @param videoSourceGroups the list of source groups for video.
260      */
setSources( List<SourcePacketExtension> audioSources, List<SourcePacketExtension> videoSources, List<SourceGroupPacketExtension> videoSourceGroups)261     public void setSources(
262             List<SourcePacketExtension> audioSources,
263             List<SourcePacketExtension> videoSources,
264             List<SourceGroupPacketExtension> videoSourceGroups)
265     {
266         MediaStreamTrackDesc[] tracks =
267             MediaStreamTrackFactory.createMediaStreamTracks(
268                     videoSources, videoSourceGroups);
269         transceiver.setMediaStreamTracks(tracks);
270 
271         List<SourcePacketExtension> allSources = new LinkedList<>(audioSources);
272         allSources.addAll(videoSources);
273 
274         // Jicofo sends an empty "source" when it wants to clear the sources.
275         // This manifests as a failure to find an 'owner', hence we clear the
276         // nulls here.
277         Set<String> endpointIds
278                 = allSources.stream()
279                     .map(source -> MediaStreamTrackFactory.getOwner(source))
280                     .filter(Objects::nonNull)
281                     .collect(Collectors.toSet());
282 
283         octoEndpoints.setEndpoints(endpointIds);
284 
285         // We only need to call this if the tracks of any endpoint actually
286         // changed, but that's not easy to detect. It's safe to call it more
287         // often.
288         conference.endpointTracksChanged(null);
289 
290         endpointIds.forEach(endpointId ->
291         {
292             Map<MediaType, Set<Long>> endpointSsrcsByMediaType = new HashMap<>();
293             Set<Long> epAudioSsrcs = audioSources.stream()
294                     .filter(source -> endpointId.equals(MediaStreamTrackFactory.getOwner(source)))
295                     .filter(Objects::nonNull)
296                     .map(SourcePacketExtension::getSSRC)
297                     .collect(Collectors.toSet());
298             endpointSsrcsByMediaType.put(MediaType.AUDIO, epAudioSsrcs);
299 
300             Set<Long> epVideoSsrcs = videoSources.stream()
301                     .filter(source -> endpointId.equals(MediaStreamTrackFactory.getOwner(source)))
302                     .filter(Objects::nonNull)
303                     .map(SourcePacketExtension::getSSRC)
304                     .collect(Collectors.toSet());
305             endpointSsrcsByMediaType.put(MediaType.VIDEO, epVideoSsrcs);
306 
307             AbstractEndpoint endpoint = conference.getEndpoint(endpointId);
308             if (endpoint instanceof OctoEndpoint)
309             {
310                 ((OctoEndpoint) endpoint).setReceiveSsrcs(endpointSsrcsByMediaType);
311             }
312             else
313             {
314                 logger.warn("No OctoEndpoint for SSRCs");
315             }
316         });
317     }
318 
319     /**
320      * Called when a local endpoint is expired.
321      */
endpointExpired(String endpointId)322     public void endpointExpired(String endpointId)
323     {
324         outgoingPacketQueues.remove(endpointId);
325     }
326 
327     /**
328      * Handles and RTP packet coming from a remote Octo relay after it has
329      * been parsed and handled by our {@link #transceiver}.
330      * @param packetInfo the packet to handle.
331      */
handleIncomingPacket(PacketInfo packetInfo)332     void handleIncomingPacket(PacketInfo packetInfo)
333     {
334         conference.handleIncomingPacket(packetInfo);
335     }
336 
337     /**
338      * Handles a message received from an Octo relay.
339      * @param message
340      */
handleMessage(String message)341     public void handleMessage(String message)
342     {
343         octoEndpoints.messageTransport.onMessage(null /* source */ , message);
344     }
345 
346     /**
347      * Sets the list of remote addresses to send Octo packets to.
348      * @param targets the list of addresses.
349      */
setTargets(Set<SocketAddress> targets)350     private void setTargets(Set<SocketAddress> targets)
351     {
352         if (!targets.equals(this.targets))
353         {
354             this.targets = Collections.unmodifiableSet(targets);
355 
356             if (targets.isEmpty())
357             {
358                 relay.removeHandler(conference.getGid(), transceiver);
359             }
360             else
361             {
362                 relay.addHandler(conference.getGid(), transceiver);
363             }
364         }
365     }
366 
367     /**
368      * Adds an RTP header extension.
369      * @param extensionId
370      * @param rtpExtension
371      */
addRtpExtension(RtpExtension rtpExtension)372     public void addRtpExtension(RtpExtension rtpExtension)
373     {
374         transceiver.addRtpExtension(rtpExtension);
375     }
376 
377     /**
378      * Expires the Octo-related parts of a confence.
379      */
expire()380     public void expire()
381     {
382         logger.info("Expiring");
383         setRelays(new LinkedList<>());
384         octoEndpoints.setEndpoints(Collections.emptySet());
385     }
386 
387     /**
388      * Sends a data message through the Octo relay.
389      * @param message
390      */
sendMessage(String message)391     public void sendMessage(String message)
392     {
393         relay.sendString(
394                 message,
395                 targets,
396                 conference.getGid(),
397                 null);
398     }
399 
400     /**
401      * Gets a JSON representation of the parts of this object's state that
402      * are deemed useful for debugging.
403      */
404     @SuppressWarnings("unchecked")
getDebugState()405     public JSONObject getDebugState()
406     {
407         JSONObject debugState = new JSONObject();
408         debugState.put("octoEndpoints", octoEndpoints.getDebugState());
409         debugState.put("transceiver", transceiver.getDebugState());
410         debugState.put("relay", relay.getDebugState());
411         debugState.put("targets", targets.toString());
412 
413         return debugState;
414     }
415 
requestKeyframe(long mediaSsrc)416     public void requestKeyframe(long mediaSsrc)
417     {
418         if (keyframeRequester != null)
419         {
420             keyframeRequester.requestKeyframe(mediaSsrc);
421         }
422         else
423         {
424             logger.warn("Failed to request a keyframe from a foreign endpoint.");
425         }
426     }
427 }
428