1 /*
2  * Copyright @ 2015 - Present, 8x8 Inc
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.jitsi.videobridge.pubsub;
17 
18 import java.util.*;
19 import java.util.concurrent.*;
20 
21 import org.jitsi.videobridge.stats.*;
22 import org.jitsi.videobridge.xmpp.*;
23 import org.jitsi.utils.logging2.*;
24 import org.jivesoftware.smack.packet.*;
25 import org.jivesoftware.smack.packet.id.*;
26 import org.jivesoftware.smackx.pubsub.*;
27 import org.jivesoftware.smackx.pubsub.packet.*;
28 import org.jivesoftware.smackx.xdata.packet.*;
29 import org.jxmpp.jid.*;
30 import org.osgi.framework.*;
31 
32 /**
33  * Implements some parts of PubSub (XEP-0060: Publish-Subscribe) for the
34  * purposes of a publisher (e.g. statistics transport).
35  *
36  * @author Hristo Terezov
37  * @author Lyubomir Marinov
38  */
39 public class PubSubPublisher
40 {
41     /**
42      * Maps a service name (e.g. "pubsub.example.com") to the
43      * <tt>PubSubPublisher</tt> instance responsible for it.
44      */
45     private static final Map<Jid, PubSubPublisher> instances
46         = new ConcurrentHashMap<>();
47 
48     /**
49      * The <tt>Logger</tt> used by the <tt>PubSubPublisher</tt> class and its
50      * instances to print debug information.
51      */
52     private static final Logger logger
53         = new LoggerImpl(PubSubPublisher.class.getName());
54 
55     /**
56      * The default timeout of the packets in milliseconds.
57      */
58     private static final int PACKET_TIMEOUT = 5000;
59 
60     /**
61      * Gets a <tt>PubSubPublisher</tt> instance for a specific service (name).
62      * If a <tt>PubSubPublisher</tt> instance for the specified
63      * <tt>serviceName</tt> does not exist yet, a new instance is initialized.
64      *
65      * @param serviceName the name of the service
66      * @return the <tt>PubSubPublisher</tt> instance for the specified
67      * <tt>serviceName</tt>
68      */
getPubsubManager(Jid serviceName)69     public static PubSubPublisher getPubsubManager(Jid serviceName)
70     {
71         PubSubPublisher publisher = instances.get(serviceName);
72 
73         if (publisher == null)
74         {
75             publisher = new PubSubPublisher(serviceName);
76             instances.put(serviceName, publisher);
77         }
78 
79         return publisher;
80     }
81 
82     /**
83      * Handles received response IQ packet.
84      *
85      * @param response the IQ packet.
86      */
handleIQResponse(IQ response)87     public static void handleIQResponse(IQ response)
88     {
89         IQ.Type type = response.getType();
90 
91         if (IQ.Type.error.equals(type))
92         {
93             PubSubPublisher publisher = instances.get(response.getFrom());
94 
95             if (publisher != null)
96             {
97                 publisher.handleErrorResponse(response);
98             }
99         }
100         else if (IQ.Type.result.equals(type))
101         {
102             PubSubPublisher publisher = instances.get(response.getFrom());
103 
104             if (publisher != null)
105             {
106                 publisher.handleCreateNodeResponse(response);
107                 publisher.handleConfigureResponse(response);
108                 publisher.handlePublishResponse(response);
109             }
110         }
111     }
112 
113     /**
114      * Releases the resources for the <tt>PubSubPublisher</tt> and removes it
115      * from the list of available instances.
116      *
117      * @param publisher the <tt>PubSubPublisher</tt> release.
118      */
releasePubsubManager(PubSubPublisher publisher)119     public static void releasePubsubManager(PubSubPublisher publisher)
120     {
121         instances.values().remove(publisher);
122         publisher.dispose();
123     }
124 
125     /**
126      * Listeners for response events.
127      */
128     private List<PubSubResponseListener> listeners = new LinkedList<>();
129 
130     /**
131      * List of the accessible PubSub nodes.
132      */
133     private List<String> nodes = new LinkedList<>();
134 
135     /**
136      * Map with the requests for configuring a node.
137      */
138     private Map<String, String> pendingConfigureRequests
139         = new ConcurrentHashMap<>();
140 
141     /**
142      * Map with the requests for node creation.
143      */
144     private Map<String, String> pendingCreateRequests
145         = new ConcurrentHashMap<>();
146 
147     /**
148      * Map with the publish requests.
149      */
150     private Map<String, String> pendingPublishRequests
151         = new ConcurrentHashMap<>();
152 
153     /**
154      * The name of the PubSub service.
155      */
156     private Jid serviceName;
157 
158     /**
159      * Timer for timeout of the requests that we are sending.
160      */
161     private Timer timeoutTimer = new Timer();
162 
163     /**
164      * Initializes a new <tt>PubSubPublisher</tt> instance for a specific
165      * service (name).
166      *
167      * @param serviceName the name of the service.
168      */
PubSubPublisher(Jid serviceName)169     private PubSubPublisher(Jid serviceName)
170     {
171         this.serviceName = serviceName;
172     }
173 
174     /**
175      * Adds a new <tt>PubSubResponseListener</tt> to the list of listeners.
176      *
177      * @param l the listener to add.
178      * @throws NullPointerException if <tt>l</tt> is <tt>null</tt>
179      */
addResponseListener(PubSubResponseListener l)180     public void addResponseListener(PubSubResponseListener l)
181     {
182         if (l == null)
183             throw new NullPointerException("l");
184         else if(!listeners.contains(l))
185             listeners.add(l);
186     }
187 
188     /**
189      * Configures PubSub node.
190      *
191      * @param nodeName the name of the node
192      */
configureNode(String nodeName)193     private void configureNode(String nodeName)
194     {
195         ConfigureForm cfg = new ConfigureForm(DataForm.Type.submit);
196         PubSub pubsub = new PubSub();
197 
198         cfg.setAccessModel(AccessModel.open);
199         cfg.setPersistentItems(false);
200         cfg.setPublishModel(PublishModel.open);
201         pubsub.setTo(serviceName);
202         pubsub.setType(IQ.Type.set);
203 
204         final String packetID = StanzaIdUtil.newStanzaId();
205 
206         pubsub.setStanzaId(packetID);
207         pubsub.addExtension(
208             new FormNode(FormNodeType.CONFIGURE_OWNER, nodeName ,cfg));
209         try
210         {
211             send(pubsub);
212         }
213         catch (Exception e)
214         {
215             logger.error("Error sending configuration form.");
216             fireResponseCreateEvent(PubSubResponseListener.Response.SUCCESS);
217             return;
218         }
219         pendingConfigureRequests.put(packetID, nodeName);
220         timeoutTimer.schedule(
221                 new TimerTask()
222                 {
223                     @Override
224                     public void run()
225                     {
226                         String nodeName
227                             = pendingConfigureRequests.remove(packetID);
228 
229                         if(nodeName != null)
230                         {
231                             logger.error(
232                                     "Timed out a configuration request "
233                                         + "(packetID=: " + packetID
234                                         + " nodeName=" + nodeName + ")");
235                             fireResponseCreateEvent(
236                                     PubSubResponseListener.Response.SUCCESS);
237                         }
238                     }
239                 },
240                 PACKET_TIMEOUT);
241     }
242 
243     /**
244      * Creates a PubSub node.
245      *
246      * @param nodeName the name of the node.
247      * @throws Exception if sending the request fails.
248      */
createNode(String nodeName)249     public void createNode(String nodeName)
250         throws Exception
251     {
252         PubSub request = new PubSub();
253 
254         request.setTo(serviceName);
255         request.setType(IQ.Type.set);
256 
257         final String packetID = StanzaIdUtil.newStanzaId();
258 
259         request.setStanzaId(packetID);
260         request.addExtension(
261                 new NodeExtension(PubSubElementType.CREATE, nodeName));
262 
263         pendingCreateRequests.put(packetID, nodeName);
264 
265         // Send the request before starting the timer, as we have observed
266         // sending to be significantly delayed (possibly waiting for the XMPP
267         // component connection to become ready).
268         send(request);
269 
270         timeoutTimer.schedule(
271                 new TimerTask()
272                 {
273                     @Override
274                     public void run()
275                     {
276                         String nodeName = pendingCreateRequests.remove(packetID);
277                         if (nodeName != null)
278                         {
279                             logger.warn("Timed out a create request with ID "
280                                             + packetID);
281                         }
282                     }
283                 },
284                 PACKET_TIMEOUT);
285     }
286 
287     /**
288      * Releases the resources of this <tt>PubSubPublisher</tt> i.e. prepares it
289      * for garbage collection.
290      */
dispose()291     private void dispose()
292     {
293         timeoutTimer.cancel();
294         timeoutTimer = null;
295 
296         listeners = null;
297         nodes = null;
298         pendingConfigureRequests = null;
299         pendingCreateRequests = null;
300         pendingPublishRequests = null;
301         serviceName = null;
302     }
303 
304     /**
305      * Fires event about the response of creating a node.
306      *
307      * @param type the type of the response
308      */
fireResponseCreateEvent(PubSubResponseListener.Response type)309     private void fireResponseCreateEvent(PubSubResponseListener.Response type)
310     {
311         for(PubSubResponseListener l : listeners)
312             l.onCreateNodeResponse(type);
313     }
314 
315     /**
316      * Fires event about the response of publishing an item to a node.
317      *
318      * @param type the type of the response
319      */
fireResponsePublishEvent( PubSubResponseListener.Response type, IQ iq)320     private void fireResponsePublishEvent(
321             PubSubResponseListener.Response type,
322             IQ iq)
323     {
324         for(PubSubResponseListener l : listeners)
325             l.onPublishResponse(type, iq);
326     }
327 
328     /**
329      * Handles PubSub configuration responses.
330      *
331      * @param response the configuration response.
332      */
handleConfigureResponse(IQ response)333     private void handleConfigureResponse(IQ response)
334     {
335         if(pendingConfigureRequests.remove(response.getStanzaId()) != null)
336             fireResponseCreateEvent(PubSubResponseListener.Response.SUCCESS);
337     }
338 
339     /**
340      * Handles responses about PubSub node creation.
341      *
342      * @param response the response
343      */
handleCreateNodeResponse(IQ response)344     private void handleCreateNodeResponse(IQ response)
345     {
346         String packetID = response.getStanzaId();
347         String nodeName = pendingCreateRequests.remove(packetID);
348 
349         if (nodeName != null)
350         {
351             nodes.add(nodeName);
352             configureNode(nodeName);
353         }
354     }
355 
356     /**
357      * Handles all error responses.
358      *
359      * @param response the response
360      */
handleErrorResponse(IQ response)361     private void handleErrorResponse(IQ response)
362     {
363         XMPPError err = response.getError();
364         String packetID = response.getStanzaId();
365 
366         if(err != null)
367         {
368             XMPPError.Type errType = err.getType();
369             XMPPError.Condition errCondition = err.getCondition();
370 
371             if((XMPPError.Type.CANCEL.equals(errType)
372                         && (XMPPError.Condition.conflict.equals(errCondition)
373                             || XMPPError.Condition.forbidden.equals(
374                                     errCondition)))
375                     /* prosody bug, for backward compatibility */
376                     || (XMPPError.Type.AUTH.equals(errType)
377                         && XMPPError.Condition.forbidden.equals(errCondition)))
378             {
379                 if (XMPPError.Condition.forbidden.equals(errCondition))
380                 {
381                     logger.warn(
382                             "Creating node failed with <forbidden/> error."
383                                 + " Continuing anyway.");
384                 }
385 
386                 String nodeName = pendingCreateRequests.remove(packetID);
387                 logger.info("PubSub node already exists (packetID=" + packetID
388                         + " nodeName=" + nodeName +")");
389 
390                 if (nodeName != null)
391                 {
392                     // The node exists already (<conflict/>) or we are not
393                     // allowed (forbidden/>).
394                     nodes.add(nodeName);
395                     fireResponseCreateEvent(
396                             PubSubResponseListener.Response.SUCCESS);
397                     return;
398                 }
399             }
400         }
401 
402         String nodeName;
403         StringBuilder errMsg = new StringBuilder("Error received");
404 
405         if((nodeName = pendingCreateRequests.remove(packetID)) != null)
406         {
407             fireResponseCreateEvent(PubSubResponseListener.Response.FAIL);
408             errMsg.append(" when creating the node: ");
409         }
410         else if((nodeName = pendingConfigureRequests.remove(packetID)) != null)
411         {
412             fireResponseCreateEvent(PubSubResponseListener.Response.SUCCESS);
413             errMsg.append(" when configuring the node: ");
414         }
415         else if((nodeName = pendingPublishRequests.remove(packetID)) != null)
416         {
417             fireResponsePublishEvent(
418                     PubSubResponseListener.Response.FAIL,
419                     response);
420             errMsg.append(" when publishing to the node: ");
421         }
422         else
423         {
424             nodeName = null;
425         }
426         if (nodeName != null)
427             errMsg.append(nodeName);
428         // Finish the sentence started with "Error received".
429         errMsg.append(".");
430         if(err != null)
431         {
432             errMsg.append(" Message: ").append(err.getDescriptiveText())
433                     .append(". Condition: ").append(err.getCondition())
434                     .append(". For packet with id: ").append(packetID)
435                     .append(".");
436         }
437         logger.error(errMsg);
438     }
439 
440     /**
441      * Handles PubSub publish responses.
442      *
443      * @param response the response
444      */
handlePublishResponse(IQ response)445     private void handlePublishResponse(IQ response)
446     {
447         if (pendingPublishRequests.remove(response.getStanzaId()) != null)
448         {
449             fireResponsePublishEvent(
450                     PubSubResponseListener.Response.SUCCESS,
451                     response);
452         }
453     }
454 
455     /**
456      * Publishes items to a given PubSub node.
457      *
458      * @param nodeName the PubSub node.
459      * @param itemId the ID of the item to be published. If <tt>null</tt> the
460      *               XMPP server will generate random ID by itself.
461      * @param ext the item to be send.
462      * @throws IllegalArgumentException if the node does not exist.
463      * @throws Exception if fail to send the item.
464      */
publish(String nodeName, String itemId, ExtensionElement ext)465     public void publish(String nodeName, String itemId, ExtensionElement ext)
466         throws Exception
467     {
468         if(!nodes.contains(nodeName))
469             throw new IllegalArgumentException("The node doesn't exists");
470 
471         PubSub packet = new PubSub();
472 
473         packet.setTo(serviceName);
474         packet.setType(IQ.Type.set);
475 
476         final String packetID = StanzaIdUtil.newStanzaId();
477 
478         packet.setStanzaId(packetID);
479 
480         PayloadItem<ExtensionElement> item = new PayloadItem<>(itemId, ext);
481 
482         packet.addExtension(new PublishItem<>(nodeName, item));
483         pendingPublishRequests.put(packetID, nodeName);
484         timeoutTimer.schedule(
485                 new TimerTask()
486                 {
487                     @Override
488                     public void run()
489                     {
490                         String nodeName
491                             = pendingPublishRequests.remove(packetID);
492 
493                         if(nodeName != null)
494                         {
495                             logger.error(
496                                     "Timed out a publish request: " + nodeName);
497                         }
498                     }
499                 },
500                 PACKET_TIMEOUT);
501         send(packet);
502     }
503 
504     /**
505      * Removes a <tt>PubSubResponseListener</tt> from the list of listeners.
506      *
507      * @param l the listener to be removed
508      */
removeResponseListener(PubSubResponseListener l)509     public void removeResponseListener(PubSubResponseListener l)
510     {
511         listeners.remove(l);
512     }
513 
514     /**
515      * Sends <tt>IQ</tt> packet.
516      *
517      * @param iq the packet.
518      * @throws Exception if sending fails.
519      */
send(IQ iq)520     private void send(IQ iq)
521         throws Exception
522     {
523         BundleContext bundleContext
524             = StatsManagerBundleActivator.getBundleContext();
525 
526         if (bundleContext != null)
527         {
528             Collection<ComponentImpl> components
529                 = ComponentImpl.getComponents(bundleContext);
530 
531             for (ComponentImpl component : components)
532                 component.send(iq);
533         }
534     }
535 }
536