1 /* 2 * Copyright @ 2015 - Present, 8x8 Inc 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 package org.jitsi.videobridge.pubsub; 17 18 import java.util.*; 19 import java.util.concurrent.*; 20 21 import org.jitsi.videobridge.stats.*; 22 import org.jitsi.videobridge.xmpp.*; 23 import org.jitsi.utils.logging2.*; 24 import org.jivesoftware.smack.packet.*; 25 import org.jivesoftware.smack.packet.id.*; 26 import org.jivesoftware.smackx.pubsub.*; 27 import org.jivesoftware.smackx.pubsub.packet.*; 28 import org.jivesoftware.smackx.xdata.packet.*; 29 import org.jxmpp.jid.*; 30 import org.osgi.framework.*; 31 32 /** 33 * Implements some parts of PubSub (XEP-0060: Publish-Subscribe) for the 34 * purposes of a publisher (e.g. statistics transport). 35 * 36 * @author Hristo Terezov 37 * @author Lyubomir Marinov 38 */ 39 public class PubSubPublisher 40 { 41 /** 42 * Maps a service name (e.g. "pubsub.example.com") to the 43 * <tt>PubSubPublisher</tt> instance responsible for it. 44 */ 45 private static final Map<Jid, PubSubPublisher> instances 46 = new ConcurrentHashMap<>(); 47 48 /** 49 * The <tt>Logger</tt> used by the <tt>PubSubPublisher</tt> class and its 50 * instances to print debug information. 51 */ 52 private static final Logger logger 53 = new LoggerImpl(PubSubPublisher.class.getName()); 54 55 /** 56 * The default timeout of the packets in milliseconds. 57 */ 58 private static final int PACKET_TIMEOUT = 5000; 59 60 /** 61 * Gets a <tt>PubSubPublisher</tt> instance for a specific service (name). 62 * If a <tt>PubSubPublisher</tt> instance for the specified 63 * <tt>serviceName</tt> does not exist yet, a new instance is initialized. 64 * 65 * @param serviceName the name of the service 66 * @return the <tt>PubSubPublisher</tt> instance for the specified 67 * <tt>serviceName</tt> 68 */ getPubsubManager(Jid serviceName)69 public static PubSubPublisher getPubsubManager(Jid serviceName) 70 { 71 PubSubPublisher publisher = instances.get(serviceName); 72 73 if (publisher == null) 74 { 75 publisher = new PubSubPublisher(serviceName); 76 instances.put(serviceName, publisher); 77 } 78 79 return publisher; 80 } 81 82 /** 83 * Handles received response IQ packet. 84 * 85 * @param response the IQ packet. 86 */ handleIQResponse(IQ response)87 public static void handleIQResponse(IQ response) 88 { 89 IQ.Type type = response.getType(); 90 91 if (IQ.Type.error.equals(type)) 92 { 93 PubSubPublisher publisher = instances.get(response.getFrom()); 94 95 if (publisher != null) 96 { 97 publisher.handleErrorResponse(response); 98 } 99 } 100 else if (IQ.Type.result.equals(type)) 101 { 102 PubSubPublisher publisher = instances.get(response.getFrom()); 103 104 if (publisher != null) 105 { 106 publisher.handleCreateNodeResponse(response); 107 publisher.handleConfigureResponse(response); 108 publisher.handlePublishResponse(response); 109 } 110 } 111 } 112 113 /** 114 * Releases the resources for the <tt>PubSubPublisher</tt> and removes it 115 * from the list of available instances. 116 * 117 * @param publisher the <tt>PubSubPublisher</tt> release. 118 */ releasePubsubManager(PubSubPublisher publisher)119 public static void releasePubsubManager(PubSubPublisher publisher) 120 { 121 instances.values().remove(publisher); 122 publisher.dispose(); 123 } 124 125 /** 126 * Listeners for response events. 127 */ 128 private List<PubSubResponseListener> listeners = new LinkedList<>(); 129 130 /** 131 * List of the accessible PubSub nodes. 132 */ 133 private List<String> nodes = new LinkedList<>(); 134 135 /** 136 * Map with the requests for configuring a node. 137 */ 138 private Map<String, String> pendingConfigureRequests 139 = new ConcurrentHashMap<>(); 140 141 /** 142 * Map with the requests for node creation. 143 */ 144 private Map<String, String> pendingCreateRequests 145 = new ConcurrentHashMap<>(); 146 147 /** 148 * Map with the publish requests. 149 */ 150 private Map<String, String> pendingPublishRequests 151 = new ConcurrentHashMap<>(); 152 153 /** 154 * The name of the PubSub service. 155 */ 156 private Jid serviceName; 157 158 /** 159 * Timer for timeout of the requests that we are sending. 160 */ 161 private Timer timeoutTimer = new Timer(); 162 163 /** 164 * Initializes a new <tt>PubSubPublisher</tt> instance for a specific 165 * service (name). 166 * 167 * @param serviceName the name of the service. 168 */ PubSubPublisher(Jid serviceName)169 private PubSubPublisher(Jid serviceName) 170 { 171 this.serviceName = serviceName; 172 } 173 174 /** 175 * Adds a new <tt>PubSubResponseListener</tt> to the list of listeners. 176 * 177 * @param l the listener to add. 178 * @throws NullPointerException if <tt>l</tt> is <tt>null</tt> 179 */ addResponseListener(PubSubResponseListener l)180 public void addResponseListener(PubSubResponseListener l) 181 { 182 if (l == null) 183 throw new NullPointerException("l"); 184 else if(!listeners.contains(l)) 185 listeners.add(l); 186 } 187 188 /** 189 * Configures PubSub node. 190 * 191 * @param nodeName the name of the node 192 */ configureNode(String nodeName)193 private void configureNode(String nodeName) 194 { 195 ConfigureForm cfg = new ConfigureForm(DataForm.Type.submit); 196 PubSub pubsub = new PubSub(); 197 198 cfg.setAccessModel(AccessModel.open); 199 cfg.setPersistentItems(false); 200 cfg.setPublishModel(PublishModel.open); 201 pubsub.setTo(serviceName); 202 pubsub.setType(IQ.Type.set); 203 204 final String packetID = StanzaIdUtil.newStanzaId(); 205 206 pubsub.setStanzaId(packetID); 207 pubsub.addExtension( 208 new FormNode(FormNodeType.CONFIGURE_OWNER, nodeName ,cfg)); 209 try 210 { 211 send(pubsub); 212 } 213 catch (Exception e) 214 { 215 logger.error("Error sending configuration form."); 216 fireResponseCreateEvent(PubSubResponseListener.Response.SUCCESS); 217 return; 218 } 219 pendingConfigureRequests.put(packetID, nodeName); 220 timeoutTimer.schedule( 221 new TimerTask() 222 { 223 @Override 224 public void run() 225 { 226 String nodeName 227 = pendingConfigureRequests.remove(packetID); 228 229 if(nodeName != null) 230 { 231 logger.error( 232 "Timed out a configuration request " 233 + "(packetID=: " + packetID 234 + " nodeName=" + nodeName + ")"); 235 fireResponseCreateEvent( 236 PubSubResponseListener.Response.SUCCESS); 237 } 238 } 239 }, 240 PACKET_TIMEOUT); 241 } 242 243 /** 244 * Creates a PubSub node. 245 * 246 * @param nodeName the name of the node. 247 * @throws Exception if sending the request fails. 248 */ createNode(String nodeName)249 public void createNode(String nodeName) 250 throws Exception 251 { 252 PubSub request = new PubSub(); 253 254 request.setTo(serviceName); 255 request.setType(IQ.Type.set); 256 257 final String packetID = StanzaIdUtil.newStanzaId(); 258 259 request.setStanzaId(packetID); 260 request.addExtension( 261 new NodeExtension(PubSubElementType.CREATE, nodeName)); 262 263 pendingCreateRequests.put(packetID, nodeName); 264 265 // Send the request before starting the timer, as we have observed 266 // sending to be significantly delayed (possibly waiting for the XMPP 267 // component connection to become ready). 268 send(request); 269 270 timeoutTimer.schedule( 271 new TimerTask() 272 { 273 @Override 274 public void run() 275 { 276 String nodeName = pendingCreateRequests.remove(packetID); 277 if (nodeName != null) 278 { 279 logger.warn("Timed out a create request with ID " 280 + packetID); 281 } 282 } 283 }, 284 PACKET_TIMEOUT); 285 } 286 287 /** 288 * Releases the resources of this <tt>PubSubPublisher</tt> i.e. prepares it 289 * for garbage collection. 290 */ dispose()291 private void dispose() 292 { 293 timeoutTimer.cancel(); 294 timeoutTimer = null; 295 296 listeners = null; 297 nodes = null; 298 pendingConfigureRequests = null; 299 pendingCreateRequests = null; 300 pendingPublishRequests = null; 301 serviceName = null; 302 } 303 304 /** 305 * Fires event about the response of creating a node. 306 * 307 * @param type the type of the response 308 */ fireResponseCreateEvent(PubSubResponseListener.Response type)309 private void fireResponseCreateEvent(PubSubResponseListener.Response type) 310 { 311 for(PubSubResponseListener l : listeners) 312 l.onCreateNodeResponse(type); 313 } 314 315 /** 316 * Fires event about the response of publishing an item to a node. 317 * 318 * @param type the type of the response 319 */ fireResponsePublishEvent( PubSubResponseListener.Response type, IQ iq)320 private void fireResponsePublishEvent( 321 PubSubResponseListener.Response type, 322 IQ iq) 323 { 324 for(PubSubResponseListener l : listeners) 325 l.onPublishResponse(type, iq); 326 } 327 328 /** 329 * Handles PubSub configuration responses. 330 * 331 * @param response the configuration response. 332 */ handleConfigureResponse(IQ response)333 private void handleConfigureResponse(IQ response) 334 { 335 if(pendingConfigureRequests.remove(response.getStanzaId()) != null) 336 fireResponseCreateEvent(PubSubResponseListener.Response.SUCCESS); 337 } 338 339 /** 340 * Handles responses about PubSub node creation. 341 * 342 * @param response the response 343 */ handleCreateNodeResponse(IQ response)344 private void handleCreateNodeResponse(IQ response) 345 { 346 String packetID = response.getStanzaId(); 347 String nodeName = pendingCreateRequests.remove(packetID); 348 349 if (nodeName != null) 350 { 351 nodes.add(nodeName); 352 configureNode(nodeName); 353 } 354 } 355 356 /** 357 * Handles all error responses. 358 * 359 * @param response the response 360 */ handleErrorResponse(IQ response)361 private void handleErrorResponse(IQ response) 362 { 363 XMPPError err = response.getError(); 364 String packetID = response.getStanzaId(); 365 366 if(err != null) 367 { 368 XMPPError.Type errType = err.getType(); 369 XMPPError.Condition errCondition = err.getCondition(); 370 371 if((XMPPError.Type.CANCEL.equals(errType) 372 && (XMPPError.Condition.conflict.equals(errCondition) 373 || XMPPError.Condition.forbidden.equals( 374 errCondition))) 375 /* prosody bug, for backward compatibility */ 376 || (XMPPError.Type.AUTH.equals(errType) 377 && XMPPError.Condition.forbidden.equals(errCondition))) 378 { 379 if (XMPPError.Condition.forbidden.equals(errCondition)) 380 { 381 logger.warn( 382 "Creating node failed with <forbidden/> error." 383 + " Continuing anyway."); 384 } 385 386 String nodeName = pendingCreateRequests.remove(packetID); 387 logger.info("PubSub node already exists (packetID=" + packetID 388 + " nodeName=" + nodeName +")"); 389 390 if (nodeName != null) 391 { 392 // The node exists already (<conflict/>) or we are not 393 // allowed (forbidden/>). 394 nodes.add(nodeName); 395 fireResponseCreateEvent( 396 PubSubResponseListener.Response.SUCCESS); 397 return; 398 } 399 } 400 } 401 402 String nodeName; 403 StringBuilder errMsg = new StringBuilder("Error received"); 404 405 if((nodeName = pendingCreateRequests.remove(packetID)) != null) 406 { 407 fireResponseCreateEvent(PubSubResponseListener.Response.FAIL); 408 errMsg.append(" when creating the node: "); 409 } 410 else if((nodeName = pendingConfigureRequests.remove(packetID)) != null) 411 { 412 fireResponseCreateEvent(PubSubResponseListener.Response.SUCCESS); 413 errMsg.append(" when configuring the node: "); 414 } 415 else if((nodeName = pendingPublishRequests.remove(packetID)) != null) 416 { 417 fireResponsePublishEvent( 418 PubSubResponseListener.Response.FAIL, 419 response); 420 errMsg.append(" when publishing to the node: "); 421 } 422 else 423 { 424 nodeName = null; 425 } 426 if (nodeName != null) 427 errMsg.append(nodeName); 428 // Finish the sentence started with "Error received". 429 errMsg.append("."); 430 if(err != null) 431 { 432 errMsg.append(" Message: ").append(err.getDescriptiveText()) 433 .append(". Condition: ").append(err.getCondition()) 434 .append(". For packet with id: ").append(packetID) 435 .append("."); 436 } 437 logger.error(errMsg); 438 } 439 440 /** 441 * Handles PubSub publish responses. 442 * 443 * @param response the response 444 */ handlePublishResponse(IQ response)445 private void handlePublishResponse(IQ response) 446 { 447 if (pendingPublishRequests.remove(response.getStanzaId()) != null) 448 { 449 fireResponsePublishEvent( 450 PubSubResponseListener.Response.SUCCESS, 451 response); 452 } 453 } 454 455 /** 456 * Publishes items to a given PubSub node. 457 * 458 * @param nodeName the PubSub node. 459 * @param itemId the ID of the item to be published. If <tt>null</tt> the 460 * XMPP server will generate random ID by itself. 461 * @param ext the item to be send. 462 * @throws IllegalArgumentException if the node does not exist. 463 * @throws Exception if fail to send the item. 464 */ publish(String nodeName, String itemId, ExtensionElement ext)465 public void publish(String nodeName, String itemId, ExtensionElement ext) 466 throws Exception 467 { 468 if(!nodes.contains(nodeName)) 469 throw new IllegalArgumentException("The node doesn't exists"); 470 471 PubSub packet = new PubSub(); 472 473 packet.setTo(serviceName); 474 packet.setType(IQ.Type.set); 475 476 final String packetID = StanzaIdUtil.newStanzaId(); 477 478 packet.setStanzaId(packetID); 479 480 PayloadItem<ExtensionElement> item = new PayloadItem<>(itemId, ext); 481 482 packet.addExtension(new PublishItem<>(nodeName, item)); 483 pendingPublishRequests.put(packetID, nodeName); 484 timeoutTimer.schedule( 485 new TimerTask() 486 { 487 @Override 488 public void run() 489 { 490 String nodeName 491 = pendingPublishRequests.remove(packetID); 492 493 if(nodeName != null) 494 { 495 logger.error( 496 "Timed out a publish request: " + nodeName); 497 } 498 } 499 }, 500 PACKET_TIMEOUT); 501 send(packet); 502 } 503 504 /** 505 * Removes a <tt>PubSubResponseListener</tt> from the list of listeners. 506 * 507 * @param l the listener to be removed 508 */ removeResponseListener(PubSubResponseListener l)509 public void removeResponseListener(PubSubResponseListener l) 510 { 511 listeners.remove(l); 512 } 513 514 /** 515 * Sends <tt>IQ</tt> packet. 516 * 517 * @param iq the packet. 518 * @throws Exception if sending fails. 519 */ send(IQ iq)520 private void send(IQ iq) 521 throws Exception 522 { 523 BundleContext bundleContext 524 = StatsManagerBundleActivator.getBundleContext(); 525 526 if (bundleContext != null) 527 { 528 Collection<ComponentImpl> components 529 = ComponentImpl.getComponents(bundleContext); 530 531 for (ComponentImpl component : components) 532 component.send(iq); 533 } 534 } 535 } 536