1 /* 2 * Copyright @ 2019-Present 8x8, Inc 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 package org.jitsi.videobridge.octo; 17 18 import kotlin.*; 19 import kotlin.jvm.functions.*; 20 import org.jetbrains.annotations.*; 21 import org.jitsi.nlj.*; 22 import org.jitsi.nlj.format.*; 23 import org.jitsi.nlj.rtcp.*; 24 import org.jitsi.nlj.rtp.*; 25 import org.jitsi.nlj.transform.node.*; 26 import org.jitsi.nlj.util.*; 27 import org.jitsi.osgi.*; 28 import org.jitsi.rtp.*; 29 import org.jitsi.utils.*; 30 import org.jitsi.utils.event.*; 31 import org.jitsi.utils.logging2.*; 32 import org.jitsi.utils.queue.*; 33 import org.jitsi.videobridge.*; 34 import org.jitsi.videobridge.octo.config.*; 35 import org.jitsi.videobridge.util.*; 36 import org.jitsi.videobridge.xmpp.*; 37 import org.jitsi.xmpp.extensions.colibri.*; 38 import org.jitsi.xmpp.extensions.jingle.*; 39 import org.jitsi_modified.impl.neomedia.rtp.*; 40 import org.json.simple.*; 41 import org.osgi.framework.*; 42 43 import java.net.*; 44 import java.util.*; 45 import java.util.concurrent.*; 46 import java.util.stream.*; 47 48 /** 49 * The single class in the octo package which serves as a link between a 50 * {@link Conference} and its Octo-related functionality. 51 * 52 * @author Boris Grozev 53 */ 54 public class OctoTentacle extends PropertyChangeNotifier implements PotentialPacketHandler 55 { 56 /** 57 * The {@link Logger} used by the {@link OctoTentacle} class and its 58 * instances to print debug information. 59 */ 60 private final Logger logger; 61 62 /** 63 * The conference for this {@link OctoTentacle}. 64 */ 65 private final Conference conference; 66 67 /** 68 * The {@link OctoEndpoints} instance which maintains the list of Octo 69 * endpoints in the conference. 70 */ 71 private final OctoEndpoints octoEndpoints; 72 73 /** 74 * The {@link OctoTransceiver} instance which handles RTP/RTCP processing. 75 */ 76 final OctoTransceiver transceiver; 77 78 /** 79 * The {@link OctoRelay} used to actually send and receive Octo packets. 80 */ 81 private final OctoRelay relay; 82 83 /** 84 * The instance that will request keyframes on behalf of this tentable. Note 85 * that we don't have bridge-to-bridge rtt measurements atm so we use a 86 * default value of 100ms. 87 */ 88 private final KeyframeRequester keyframeRequester; 89 90 /** 91 * The list of remote Octo targets. 92 */ 93 private Set<SocketAddress> targets 94 = Collections.unmodifiableSet(new HashSet<>()); 95 96 /** 97 * Count the number of dropped packets and exceptions. 98 */ 99 public static final CountingErrorHandler queueErrorCounter 100 = new CountingErrorHandler(); 101 102 /** 103 * The queues which pass packets to be sent. 104 */ 105 private Map<String, PacketInfoQueue> outgoingPacketQueues = 106 new ConcurrentHashMap<>(); 107 108 /** 109 * Initializes a new {@link OctoTentacle} instance. 110 * @param conference the conference. 111 */ OctoTentacle(Conference conference)112 public OctoTentacle(Conference conference) 113 { 114 this.conference = conference; 115 this.logger = conference.getLogger().createChildLogger(this.getClass().getName()); 116 octoEndpoints = new OctoEndpoints(conference); 117 transceiver = new OctoTransceiver(this, logger); 118 119 BundleContext bundleContext = conference.getBundleContext(); 120 OctoRelayService octoRelayService 121 = bundleContext == null ? null : 122 ServiceUtils2.getService(bundleContext, OctoRelayService.class); 123 124 125 if (octoRelayService != null) 126 { 127 relay = octoRelayService.getRelay(); 128 keyframeRequester = new KeyframeRequester(transceiver.getStreamInformationStore(), logger); 129 keyframeRequester.attach(new ConsumerNode("octo keyframe relay node") 130 { 131 @Override 132 protected void consume(@NotNull PacketInfo packetInfo) 133 { 134 packetInfo.sent(); 135 relay.sendPacket(packetInfo.getPacket(), targets, 136 conference.getGid(), packetInfo.getEndpointId()); 137 } 138 139 @Override 140 public void trace(@NotNull Function0<Unit> f) 141 { 142 f.invoke(); 143 } 144 }); 145 } 146 else 147 { 148 relay = null; 149 keyframeRequester = null; 150 } 151 } 152 153 /** 154 * Gets the audio level listener. 155 * @return 156 */ getAudioLevelListener()157 AudioLevelListener getAudioLevelListener() 158 { 159 return conference.getAudioLevelListener(); 160 } 161 162 /** 163 * Adds a {@link PayloadType} 164 */ addPayloadType(PayloadType payloadType)165 public void addPayloadType(PayloadType payloadType) 166 { 167 transceiver.addPayloadType(payloadType); 168 } 169 170 /** 171 * Creates a PacketInfoQueue for an endpoint. 172 */ createQueue(String epId)173 private PacketInfoQueue createQueue(String epId) 174 { 175 PacketInfoQueue q = new PacketInfoQueue( 176 "octo-tentacle-outgoing-packet-queue", 177 TaskPools.IO_POOL, 178 this::doSend, 179 OctoConfig.Config.sendQueueSize()); 180 q.setErrorHandler(queueErrorCounter); 181 return q; 182 } 183 184 /** 185 * {@inheritDoc} 186 */ 187 @Override send(PacketInfo packetInfo)188 public void send(PacketInfo packetInfo) 189 { 190 /* We queue packets separately by their *source* endpoint. 191 * This achieves parallelization while guaranteeing that we don't 192 * reorder things that shouldn't be reordered. 193 */ 194 PacketInfoQueue queue = 195 outgoingPacketQueues.computeIfAbsent(packetInfo.getEndpointId(), 196 this::createQueue); 197 198 queue.add(packetInfo); 199 } 200 201 /** 202 * Send Octo packet out. 203 */ doSend(PacketInfo packetInfo)204 private boolean doSend(PacketInfo packetInfo) 205 { 206 Packet packet = packetInfo.getPacket(); 207 if (packet != null) 208 { 209 relay.sendPacket( 210 packet, 211 targets, 212 conference.getGid(), 213 packetInfo.getEndpointId()); 214 } 215 return true; 216 } 217 218 /** 219 * Sets the list of remote relays to send packets to. 220 * @param relays the list of relay IDs, which are converted to addresses 221 * using the logic in {@link OctoRelay}. 222 */ setRelays(Collection<String> relays)223 public void setRelays(Collection<String> relays) 224 { 225 Objects.requireNonNull( 226 relay, 227 "Octo requested but not configured"); 228 229 Set<SocketAddress> socketAddresses = new HashSet<>(); 230 for (String relay : relays) 231 { 232 SocketAddress socketAddress = OctoRelay.relayIdToSocketAddress(relay); 233 if (socketAddress != null) 234 { 235 socketAddresses.add(socketAddress); 236 } 237 } 238 239 setTargets(socketAddresses); 240 } 241 242 /** 243 * {@inheritDoc} 244 */ 245 @Override wants(PacketInfo packetInfo)246 public boolean wants(PacketInfo packetInfo) 247 { 248 // Cthulhu devours everything (as long as it's not coming from 249 // itself, and we have targets). 250 return !(packetInfo instanceof OctoPacketInfo) && !targets.isEmpty(); 251 } 252 253 /** 254 * Sets the list of sources and source groups which describe the RTP streams 255 * we expect to receive from remote Octo relays. 256 * 257 * @param audioSources the list of audio sources. 258 * @param videoSources the list of video sources. 259 * @param videoSourceGroups the list of source groups for video. 260 */ setSources( List<SourcePacketExtension> audioSources, List<SourcePacketExtension> videoSources, List<SourceGroupPacketExtension> videoSourceGroups)261 public void setSources( 262 List<SourcePacketExtension> audioSources, 263 List<SourcePacketExtension> videoSources, 264 List<SourceGroupPacketExtension> videoSourceGroups) 265 { 266 MediaStreamTrackDesc[] tracks = 267 MediaStreamTrackFactory.createMediaStreamTracks( 268 videoSources, videoSourceGroups); 269 transceiver.setMediaStreamTracks(tracks); 270 271 List<SourcePacketExtension> allSources = new LinkedList<>(audioSources); 272 allSources.addAll(videoSources); 273 274 // Jicofo sends an empty "source" when it wants to clear the sources. 275 // This manifests as a failure to find an 'owner', hence we clear the 276 // nulls here. 277 Set<String> endpointIds 278 = allSources.stream() 279 .map(source -> MediaStreamTrackFactory.getOwner(source)) 280 .filter(Objects::nonNull) 281 .collect(Collectors.toSet()); 282 283 octoEndpoints.setEndpoints(endpointIds); 284 285 // We only need to call this if the tracks of any endpoint actually 286 // changed, but that's not easy to detect. It's safe to call it more 287 // often. 288 conference.endpointTracksChanged(null); 289 290 endpointIds.forEach(endpointId -> 291 { 292 Map<MediaType, Set<Long>> endpointSsrcsByMediaType = new HashMap<>(); 293 Set<Long> epAudioSsrcs = audioSources.stream() 294 .filter(source -> endpointId.equals(MediaStreamTrackFactory.getOwner(source))) 295 .filter(Objects::nonNull) 296 .map(SourcePacketExtension::getSSRC) 297 .collect(Collectors.toSet()); 298 endpointSsrcsByMediaType.put(MediaType.AUDIO, epAudioSsrcs); 299 300 Set<Long> epVideoSsrcs = videoSources.stream() 301 .filter(source -> endpointId.equals(MediaStreamTrackFactory.getOwner(source))) 302 .filter(Objects::nonNull) 303 .map(SourcePacketExtension::getSSRC) 304 .collect(Collectors.toSet()); 305 endpointSsrcsByMediaType.put(MediaType.VIDEO, epVideoSsrcs); 306 307 AbstractEndpoint endpoint = conference.getEndpoint(endpointId); 308 if (endpoint instanceof OctoEndpoint) 309 { 310 ((OctoEndpoint) endpoint).setReceiveSsrcs(endpointSsrcsByMediaType); 311 } 312 else 313 { 314 logger.warn("No OctoEndpoint for SSRCs"); 315 } 316 }); 317 } 318 319 /** 320 * Called when a local endpoint is expired. 321 */ endpointExpired(String endpointId)322 public void endpointExpired(String endpointId) 323 { 324 outgoingPacketQueues.remove(endpointId); 325 } 326 327 /** 328 * Handles and RTP packet coming from a remote Octo relay after it has 329 * been parsed and handled by our {@link #transceiver}. 330 * @param packetInfo the packet to handle. 331 */ handleIncomingPacket(PacketInfo packetInfo)332 void handleIncomingPacket(PacketInfo packetInfo) 333 { 334 conference.handleIncomingPacket(packetInfo); 335 } 336 337 /** 338 * Handles a message received from an Octo relay. 339 * @param message 340 */ handleMessage(String message)341 public void handleMessage(String message) 342 { 343 octoEndpoints.messageTransport.onMessage(null /* source */ , message); 344 } 345 346 /** 347 * Sets the list of remote addresses to send Octo packets to. 348 * @param targets the list of addresses. 349 */ setTargets(Set<SocketAddress> targets)350 private void setTargets(Set<SocketAddress> targets) 351 { 352 if (!targets.equals(this.targets)) 353 { 354 this.targets = Collections.unmodifiableSet(targets); 355 356 if (targets.isEmpty()) 357 { 358 relay.removeHandler(conference.getGid(), transceiver); 359 } 360 else 361 { 362 relay.addHandler(conference.getGid(), transceiver); 363 } 364 } 365 } 366 367 /** 368 * Adds an RTP header extension. 369 * @param extensionId 370 * @param rtpExtension 371 */ addRtpExtension(RtpExtension rtpExtension)372 public void addRtpExtension(RtpExtension rtpExtension) 373 { 374 transceiver.addRtpExtension(rtpExtension); 375 } 376 377 /** 378 * Expires the Octo-related parts of a confence. 379 */ expire()380 public void expire() 381 { 382 logger.info("Expiring"); 383 setRelays(new LinkedList<>()); 384 octoEndpoints.setEndpoints(Collections.emptySet()); 385 } 386 387 /** 388 * Sends a data message through the Octo relay. 389 * @param message 390 */ sendMessage(String message)391 public void sendMessage(String message) 392 { 393 relay.sendString( 394 message, 395 targets, 396 conference.getGid(), 397 null); 398 } 399 400 /** 401 * Gets a JSON representation of the parts of this object's state that 402 * are deemed useful for debugging. 403 */ 404 @SuppressWarnings("unchecked") getDebugState()405 public JSONObject getDebugState() 406 { 407 JSONObject debugState = new JSONObject(); 408 debugState.put("octoEndpoints", octoEndpoints.getDebugState()); 409 debugState.put("transceiver", transceiver.getDebugState()); 410 debugState.put("relay", relay.getDebugState()); 411 debugState.put("targets", targets.toString()); 412 413 return debugState; 414 } 415 requestKeyframe(long mediaSsrc)416 public void requestKeyframe(long mediaSsrc) 417 { 418 if (keyframeRequester != null) 419 { 420 keyframeRequester.requestKeyframe(mediaSsrc); 421 } 422 else 423 { 424 logger.warn("Failed to request a keyframe from a foreign endpoint."); 425 } 426 } 427 } 428