1 /** 2 * Copyright (C) Azureus Software, Inc, All Rights Reserved. 3 * 4 * This program is free software; you can redistribute it and/or 5 * modify it under the terms of the GNU General Public License 6 * as published by the Free Software Foundation; either version 2 7 * of the License, or (at your option) any later version. 8 * This program is distributed in the hope that it will be useful, 9 * but WITHOUT ANY WARRANTY; without even the implied warranty of 10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 * GNU General Public License for more details. 12 * You should have received a copy of the GNU General Public License 13 * along with this program; if not, write to the Free Software 14 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 15 * 16 */ 17 18 package com.aelitis.azureus.core.messenger; 19 20 import java.io.InputStream; 21 import java.io.UnsupportedEncodingException; 22 import java.net.Proxy; 23 import java.net.URL; 24 import java.net.URLEncoder; 25 import java.util.*; 26 27 import org.gudy.azureus2.core3.util.*; 28 import org.gudy.azureus2.core3.util.Timer; 29 import org.gudy.azureus2.plugins.utils.StaticUtilities; 30 import org.gudy.azureus2.plugins.utils.resourcedownloader.ResourceDownloader; 31 import org.gudy.azureus2.plugins.utils.resourcedownloader.ResourceDownloaderException; 32 import org.gudy.azureus2.plugins.utils.resourcedownloader.ResourceDownloaderFactory; 33 34 import com.aelitis.azureus.core.cnetwork.ContentNetwork; 35 import com.aelitis.azureus.core.cnetwork.ContentNetworkManagerFactory; 36 import com.aelitis.azureus.core.proxy.AEProxyFactory; 37 import com.aelitis.azureus.core.proxy.AEProxyFactory.PluginProxy; 38 import com.aelitis.azureus.util.*; 39 40 /** 41 * @author TuxPaper 42 * @created Sep 25, 2006 43 * 44 */ 45 public class PlatformMessenger 46 { 47 private static final boolean DEBUG_URL = System.getProperty( 48 "platform.messenger.debug.url", "0").equals("1"); 49 50 private static final String URL_PLATFORM_MESSAGE = "?service=rpc"; 51 52 private static final String URL_POST_PLATFORM_DATA = "service=rpc"; 53 54 private static final int MAX_POST_LENGTH = 1024 * 512 * 3; // 1.5M 55 56 private static boolean USE_HTTP_POST = true; 57 58 public static String REPLY_EXCEPTION = "exception"; 59 60 public static String REPLY_ACTION = "action"; 61 62 public static String REPLY_RESULT = "response"; 63 64 /** Key: id of queue; Value: Map of queued messages & listeners */ 65 static private Map<String, Map<PlatformMessage, PlatformMessengerListener>> mapQueues = new HashMap<String, Map<PlatformMessage, PlatformMessengerListener>>(); 66 67 private static final String QUEUE_NOAZID = "noazid."; 68 69 private static final String QUEUE_NORMAL = "msg."; 70 71 static private AEMonitor queue_mon = new AEMonitor( 72 "v3.PlatformMessenger.queue"); 73 74 static private Timer timerProcess = new Timer("v3.PlatformMessenger.queue"); 75 76 //static private TimerEvent timerEvent = null; 77 78 static private Map<String, TimerEvent> mapTimerEvents = new HashMap<String, TimerEvent>(); 79 80 static private AEMonitor mon_mapTimerEvents = new AEMonitor("mapTimerEvents"); 81 82 private static boolean initialized; 83 84 private static fakeContext context; 85 86 private static boolean allowMulti = false; 87 88 private static AsyncDispatcher dispatcher = new AsyncDispatcher(5000); 89 90 private static Map<String, Object> mapExtra = new HashMap<String, Object>(); 91 init()92 public static synchronized void init() { 93 if (initialized) { 94 return; 95 } 96 initialized = true; 97 98 // The UI will initialize this 99 context = new fakeContext(); 100 } 101 getClientMessageContext()102 public static ClientMessageContext getClientMessageContext() { 103 if (!initialized) { 104 init(); 105 } 106 return context; 107 } 108 queueMessage(PlatformMessage message, PlatformMessengerListener listener)109 public static void queueMessage(PlatformMessage message, 110 PlatformMessengerListener listener) { 111 queueMessage(message, listener, true); 112 } 113 queueMessage(PlatformMessage message, PlatformMessengerListener listener, boolean addToBottom)114 public static void queueMessage(PlatformMessage message, 115 PlatformMessengerListener listener, boolean addToBottom) { 116 117 if (!initialized) { 118 init(); 119 } 120 121 if (message == null) { 122 debug("fire timerevent"); 123 } 124 queue_mon.enter(); 125 try { 126 long fireBefore; 127 final String queueID; 128 if (message != null) { 129 if (!message.sendAZID()) { 130 queueID = QUEUE_NOAZID; 131 } else { 132 queueID = QUEUE_NORMAL; 133 } 134 135 Map<PlatformMessage, PlatformMessengerListener> mapQueue = mapQueues.get(queueID); 136 if (mapQueue == null) { 137 mapQueue = new LinkedHashMap<PlatformMessage, PlatformMessengerListener>(); 138 mapQueues.put(queueID, mapQueue); 139 } 140 mapQueue.put(message, listener); 141 142 debug("q " + queueID + "(" + mapQueue.size() + ") " 143 + message.toShortString() + ": " + message + " @ " 144 + new Date(message.getFireBefore()) + "; in " 145 + (message.getFireBefore() - SystemTime.getCurrentTime()) + "ms"); 146 147 fireBefore = message.getFireBefore(); 148 } else { 149 queueID = null; 150 fireBefore = SystemTime.getCurrentTime(); 151 } 152 153 if (queueID != null) { 154 try { 155 mon_mapTimerEvents.enter(); 156 157 TimerEvent timerEvent = mapTimerEvents.get(queueID); 158 159 if (timerEvent == null || timerEvent.hasRun() || fireBefore < timerEvent.getWhen()) { 160 if (timerEvent != null) { 161 mapTimerEvents.remove(queueID); 162 timerEvent.cancel(); 163 } 164 165 timerEvent = timerProcess.addEvent(fireBefore, 166 new TimerEventPerformer() { 167 public void perform(TimerEvent event) { 168 try { 169 mon_mapTimerEvents.enter(); 170 171 if ( mapTimerEvents.get(queueID) == event ){ 172 mapTimerEvents.remove( queueID ); 173 } 174 } finally { 175 mon_mapTimerEvents.exit(); 176 } 177 178 Map<PlatformMessage, PlatformMessengerListener> mapQueue = mapQueues.get(queueID); 179 while (mapQueue != null && mapQueue.size() > 0) { 180 processQueue(queueID, mapQueue); 181 } 182 /* 183 Object[] keys = mapQueues.keySet().toArray(); 184 for (int i = 0; i < keys.length; i++) { 185 Map mapQueue = mapQueues.get(keys[i]); 186 while (mapQueue != null && mapQueue.size() > 0) { 187 processQueue(mapQueue); 188 } 189 } 190 */ 191 } 192 }); 193 mapTimerEvents.put(queueID, timerEvent); 194 } 195 if (timerEvent != null) { 196 debug(" next q process for " + queueID + " in " + (timerEvent.getWhen() - SystemTime.getCurrentTime())); 197 } 198 } finally { 199 mon_mapTimerEvents.exit(); 200 } 201 } 202 } finally { 203 queue_mon.exit(); 204 } 205 } 206 207 /** 208 * @param string 209 */ debug(String string)210 public static void debug(String string) { 211 AEDiagnosticsLogger diag_logger = AEDiagnostics.getLogger("v3.PMsgr"); 212 diag_logger.log(string); 213 if (ConstantsVuze.DIAG_TO_STDOUT) { 214 System.out.println(Thread.currentThread().getName() + "|" 215 + System.currentTimeMillis() + "] " + string); 216 } 217 } 218 debug(String string, Throwable e)219 protected static void debug(String string, Throwable e) { 220 debug(string + "\n\t" + e.getClass().getName() + ": " + e.getMessage() 221 + ", " + Debug.getCompressedStackTrace(e, 1, 80)); 222 } 223 224 /** 225 * Sends the message almost immediately, skipping delayauthorization check 226 * @param message 227 * @param listener 228 * 229 * @since 3.0.5.3 230 */ pushMessageNow(PlatformMessage message, PlatformMessengerListener listener)231 public static void pushMessageNow(PlatformMessage message, 232 PlatformMessengerListener listener) { 233 debug("push " + message.toShortString() + ": " + message); 234 235 Map map = new HashMap(1); 236 map.put(message, listener); 237 processQueue(null, map); 238 } 239 240 /** 241 * @param requiresAuthorization 242 * 243 */ processQueue(String queueID, Map mapQueue)244 protected static void processQueue(String queueID, Map mapQueue) { 245 if (!initialized) { 246 init(); 247 } 248 249 final Map mapProcessing = new HashMap(); 250 251 boolean sendAZID = true; 252 long contentNetworkID = ContentNetwork.CONTENT_NETWORK_VUZE; 253 254 // Create urlStem (or post data) 255 boolean isMulti = false; 256 StringBuffer urlStem = new StringBuffer(); 257 long sequenceNo = 0; 258 259 Map<String, Object> mapPayload = new HashMap<String, Object>(); 260 mapPayload.put("azid", ConstantsVuze.AZID); 261 mapPayload.put("azv", Constants.AZUREUS_VERSION); 262 mapPayload.put("mode", FeatureUtils.getPlusMode()); 263 mapPayload.put("noadmode", FeatureUtils.getNoAdsMode()); 264 mapPayload.putAll(mapExtra); 265 List<Map> listCommands = new ArrayList<Map>(); 266 mapPayload.put("commands", listCommands); 267 268 boolean forceProxy = false; 269 270 queue_mon.enter(); 271 try { 272 String lastServer = null; 273 // add one at a time, ensure relay server messages are seperate 274 boolean first = true; 275 for (Iterator iter = mapQueue.keySet().iterator(); iter.hasNext();) { 276 PlatformMessage message = (PlatformMessage) iter.next(); 277 Object value = mapQueue.get(message); 278 279 Map<String, Object> mapCmd = new HashMap<String, Object>(); 280 281 boolean fp = message.isForceProxy(); 282 283 if ( fp ){ 284 forceProxy = true; 285 } 286 287 if (first) { 288 sendAZID = message.sendAZID(); 289 first = false; 290 } 291 292 // build urlStem 293 message.setSequenceNo(sequenceNo); 294 295 if (urlStem.length() > 0) { 296 urlStem.append('&'); 297 } 298 299 String listenerID = message.getListenerID(); 300 String messageID = message.getMessageID(); 301 Map params = message.getParameters(); 302 try { 303 urlStem.append("msg="); 304 urlStem.append(URLEncoder.encode(listenerID, "UTF-8")); 305 urlStem.append(":"); 306 urlStem.append(URLEncoder.encode(message.getOperationID(), 307 "UTF-8")); 308 } catch (UnsupportedEncodingException e) { 309 } 310 311 mapCmd.put("seq-id", sequenceNo); 312 mapCmd.put("listener-id", listenerID); 313 mapCmd.put("op-id", message.getOperationID()); 314 if (params != null) { 315 mapCmd.put("values", params); 316 } 317 listCommands.add(mapCmd); 318 319 // We used to check on MAX_POST_LENGTH, but with the changes that 320 // would require converting the map to JSON on every iteration to get 321 // the length. For now, just limit to 10 322 if (sequenceNo > 10) { 323 debug("breaking up batch at " + sequenceNo 324 + " because max limit would be exceeded"); 325 break; 326 } 327 328 String curServer = messageID + "-" + listenerID; 329 if (lastServer != null && !lastServer.equals(curServer)) { 330 isMulti = true; 331 } 332 lastServer = curServer; 333 334 PlatformMessengerListener listener = (PlatformMessengerListener) mapProcessing.get(message); 335 if (listener != null) { 336 listener.messageSent(message); 337 } 338 sequenceNo++; 339 340 // Adjust lists 341 mapProcessing.put(message, value); 342 343 iter.remove(); 344 345 if (!getAllowMulti() ) { 346 break; 347 } 348 } 349 } finally { 350 queue_mon.exit(); 351 } 352 //debug("about to process " + mapProcessing.size()); 353 354 if (mapProcessing.size() == 0) { 355 return; 356 } 357 358 // Build base RPC url based on listener and server 359 360 // one day all this URL hacking should be moved into the ContentNetwork... 361 362 ContentNetwork cn = ContentNetworkManagerFactory.getSingleton().getContentNetwork( 363 contentNetworkID); 364 if (cn == null) { 365 cn = ConstantsVuze.getDefaultContentNetwork(); 366 } 367 368 String sURL_RPC = ContentNetworkUtils.getUrl(cn, ContentNetwork.SERVICE_RPC) 369 + "&" + urlStem.toString(); 370 371 if ( forceProxy ){ 372 373 sendAZID = false; 374 375 // yah, well there's code in ContentNetworkUtils.getUrl that adds in the azid too :( 376 377 sURL_RPC = sURL_RPC.replaceAll( "([\\?&])azid=.*?&", "$1" ); 378 379 mapPayload.remove( "azid" ); 380 } 381 382 // Build full url and data to send 383 String sURL; 384 String sPostData = null; 385 String sJSONPayload = UrlUtils.encode(JSONUtils.encodeToJSON(mapPayload)); 386 if (USE_HTTP_POST) { 387 sURL = sURL_RPC; 388 389 sPostData = URL_POST_PLATFORM_DATA + "&payload=" + sJSONPayload; 390 sPostData = cn.appendURLSuffix(sPostData, true, sendAZID); 391 392 if (DEBUG_URL) { 393 debug("POST for " + mapProcessing.size() + ": " + sURL + "\n DATA: " 394 + sPostData); 395 } else { 396 debug("POST for " + mapProcessing.size() + ": " + sURL); 397 } 398 } else { 399 sURL = sURL_RPC + URL_PLATFORM_MESSAGE + "&payload=" + sJSONPayload; 400 401 sURL = cn.appendURLSuffix(sURL, false, sendAZID); 402 403 if (DEBUG_URL) { 404 debug("GET: " + sURL); 405 } else { 406 debug("GET: " + sURL_RPC + URL_PLATFORM_MESSAGE); 407 } 408 } 409 410 final String fURL = sURL; 411 final String fPostData = sPostData; 412 final boolean fForceProxy = forceProxy; 413 414 // one at a time to take advantage of keep-alive connections 415 416 dispatcher.dispatch( 417 new AERunnable() 418 { 419 public void 420 runSupport() 421 { 422 try { 423 processQueueAsync(fURL, fPostData, mapProcessing,fForceProxy); 424 } catch (Throwable e) { 425 e.printStackTrace(); 426 if (e instanceof ResourceDownloaderException) { 427 debug("Error while sending message(s) to Platform: " + e.toString()); 428 } else { 429 debug("Error while sending message(s) to Platform", e); 430 } 431 for (Iterator iter = mapProcessing.keySet().iterator(); iter.hasNext();) { 432 PlatformMessage message = (PlatformMessage) iter.next(); 433 PlatformMessengerListener l = (PlatformMessengerListener) mapProcessing.get(message); 434 if (l != null) { 435 try { 436 HashMap map = new HashMap(); 437 map.put("text", e.toString()); 438 map.put("Throwable", e); 439 l.replyReceived(message, REPLY_EXCEPTION, map); 440 } catch (Throwable e2) { 441 debug("Error while sending replyReceived", e2); 442 } 443 } 444 } 445 } 446 } 447 }); 448 449 } 450 451 /** 452 * @param mapProcessing 453 * @param surl 454 * @throws Exception 455 */ processQueueAsync(String sURL, String sData, Map mapProcessing, boolean forceProxy)456 protected static void processQueueAsync(String sURL, String sData, 457 Map mapProcessing, boolean forceProxy) throws Throwable { 458 URL url; 459 url = new URL(sURL); 460 461 Object[] result = downloadURL(url, sData, forceProxy); 462 463 String s = (String)result[0]; 464 List listReplies = (List)result[1]; 465 466 if ( listReplies == null || listReplies.isEmpty()) { 467 debug("Error while sending message(s) to Platform: reply: " + s 468 + "\nurl: " + sURL + "\nPostData: " + sData); 469 for (Iterator iter = mapProcessing.keySet().iterator(); iter.hasNext();) { 470 PlatformMessage message = (PlatformMessage) iter.next(); 471 PlatformMessengerListener l = (PlatformMessengerListener) mapProcessing.get(message); 472 if (l != null) { 473 try { 474 HashMap map = new HashMap(); 475 map.put("text", "result was " + s); 476 l.replyReceived(message, REPLY_EXCEPTION, map); 477 } catch (Throwable e2) { 478 debug("Error while sending replyReceived" + "\nurl: " + sURL 479 + "\nPostData: " + sData, e2); 480 } 481 } 482 } 483 return; 484 } 485 486 Map<Long, Map> mapOrder = new HashMap<Long, Map>(); 487 for (Object reply : listReplies) { 488 if (reply instanceof Map) { 489 mapOrder.put(MapUtils.getMapLong((Map) reply, "seq-id", -1), (Map) reply); 490 } 491 } 492 for (Iterator iter = mapProcessing.keySet().iterator(); iter.hasNext();) { 493 PlatformMessage message = (PlatformMessage) iter.next(); 494 PlatformMessengerListener l = (PlatformMessengerListener) mapProcessing.get(message); 495 if (l == null) { 496 continue; 497 } 498 Map mapReply = mapOrder.get(new Long(message.getSequenceNo())); 499 if (mapReply == null) { 500 debug("No reply for " + message.toShortString()); 501 } 502 String replyType = MapUtils.getMapString(mapReply, "type", "payload"); 503 Map payload; 504 if (replyType.equalsIgnoreCase("payload")) { 505 payload = MapUtils.getMapMap(mapReply, "payload", Collections.EMPTY_MAP); 506 } else { 507 payload = new HashMap(); 508 payload.put("message", MapUtils.getMapString(mapReply, "message", "?")); 509 } 510 511 512 if (mapReply != null) { 513 String reply = JSONUtils.encodeToJSON(payload); 514 debug("Got a reply for " 515 + message.toShortString() + "\n\t" 516 + reply.substring(0, Math.min(8192, reply.length()))); 517 } 518 519 try { 520 l.replyReceived(message, replyType, payload); 521 } catch (Exception e2) { 522 debug("Error while sending replyReceived", e2); 523 } 524 } 525 } 526 527 private static Object[] downloadURL( URL rpc_url, String postData, boolean forceProxy )528 downloadURL( 529 URL rpc_url, 530 String postData, 531 boolean forceProxy ) 532 533 throws Throwable 534 { 535 Throwable error = null; 536 537 if ( !forceProxy ){ 538 539 try{ 540 Object[] result = downloadURLSupport( null, null, rpc_url, postData ); 541 542 if ( result[1] == null ){ 543 544 throw( new Exception( "Request failed" )); 545 546 }else{ 547 548 return( result ); 549 } 550 }catch( Throwable e ){ 551 552 error = e; 553 } 554 } 555 556 try{ 557 PluginProxy plugin_proxy = AEProxyFactory.getPluginProxy( "vuze settings", rpc_url, true ); 558 559 if ( plugin_proxy == null ){ 560 561 if ( error != null ){ 562 563 throw( error ); 564 } 565 566 throw( new Exception( "Proxy unavailable" )); 567 568 }else{ 569 570 URL url = plugin_proxy.getURL(); 571 Proxy proxy = plugin_proxy.getProxy(); 572 573 boolean ok = false; 574 575 try{ 576 String proxy_host = rpc_url.getHost() + (rpc_url.getPort()==-1?"":(":" + rpc_url.getPort())); 577 578 Object[] result = downloadURLSupport( proxy, proxy_host, url, postData ); 579 580 ok = true; 581 582 return( result ); 583 584 }finally{ 585 586 plugin_proxy.setOK( ok ); 587 } 588 } 589 }catch( Throwable f ){ 590 591 throw( error==null?f:error ); 592 } 593 } 594 595 private static Object[] downloadURLSupport( Proxy proxy, String proxy_host, URL url, String postData )596 downloadURLSupport( 597 Proxy proxy, 598 String proxy_host, 599 URL url, 600 String postData ) 601 602 throws Throwable 603 { 604 ResourceDownloaderFactory rdf = StaticUtilities.getResourceDownloaderFactory(); 605 606 ResourceDownloader rd; 607 608 if ( proxy == null ){ 609 610 rd = rdf.create(url, postData); 611 612 }else{ 613 614 rd = rdf.create(url, postData, proxy); 615 } 616 617 if ( proxy_host != null ){ 618 619 rd.setProperty( "URL_HOST", proxy_host ); 620 } 621 622 rd.setProperty( "URL_Connection", "Keep-Alive" ); 623 624 rd = rdf.getRetryDownloader(rd, 3); 625 626 // We could report percentage to listeners, but there's no need to atm 627 // rd.addListener(new ResourceDownloaderListener() { 628 // 629 // public void reportPercentComplete(ResourceDownloader downloader, 630 // int percentage) { 631 // } 632 // 633 // public void reportActivity(ResourceDownloader downloader, String activity) { 634 // } 635 // 636 // public void failed(ResourceDownloader downloader, 637 // ResourceDownloaderException e) { 638 // } 639 // 640 // public boolean completed(ResourceDownloader downloader, InputStream data) { 641 // return true; 642 // } 643 // }); 644 645 InputStream is = rd.download(); 646 647 byte data[]; 648 649 try { 650 int length = is.available(); 651 652 data = new byte[length]; 653 654 is.read(data); 655 656 } finally { 657 658 is.close(); 659 } 660 661 String s = new String( data, "UTF8"); 662 663 Map mapAllReplies = JSONUtils.decodeJSON(s); 664 665 List listReplies = MapUtils.getMapList(mapAllReplies, "replies", null); 666 667 return( new Object[]{ s, listReplies }); 668 } 669 setAllowMulti(boolean allowMulti)670 public static void setAllowMulti(boolean allowMulti) { 671 PlatformMessenger.allowMulti = allowMulti; 672 } 673 getAllowMulti()674 public static boolean getAllowMulti() { 675 return allowMulti; 676 } 677 addExtraParam(String key, Object value)678 public static void addExtraParam(String key, Object value) { 679 synchronized (mapExtra) { 680 mapExtra.put(key, value); 681 } 682 } 683 684 private static class fakeContext 685 extends ClientMessageContextImpl 686 { 687 private long contentNetworkID = ConstantsVuze.DEFAULT_CONTENT_NETWORK_ID; 688 log(String str)689 private void log(String str) { 690 if (System.getProperty("browser.route.all.external.stimuli.for.testing", 691 "false").equalsIgnoreCase("true")) { 692 693 System.err.println(str); 694 } 695 debug(str); 696 } 697 fakeContext()698 public fakeContext() { 699 super("fakeContext", null); 700 } 701 deregisterBrowser()702 public void deregisterBrowser() { 703 log("deregisterBrowser"); 704 } 705 displayBrowserMessage(String message)706 public void displayBrowserMessage(String message) { 707 log("displayBrowserMessage - " + message); 708 } 709 executeInBrowser(String javascript)710 public boolean executeInBrowser(String javascript) { 711 log("executeInBrowser - " + javascript); 712 return false; 713 } 714 getBrowserData(String key)715 public Object getBrowserData(String key) { 716 log("getBrowserData - " + key); 717 return null; 718 } 719 sendBrowserMessage(String key, String op)720 public boolean sendBrowserMessage(String key, String op) { 721 log("sendBrowserMessage - " + key + "/" + op); 722 return false; 723 } 724 sendBrowserMessage(String key, String op, Map params)725 public boolean sendBrowserMessage(String key, String op, Map params) { 726 log("sendBrowserMessage - " + key + "/" + op + "/" + params); 727 return false; 728 } 729 setBrowserData(String key, Object value)730 public void setBrowserData(String key, Object value) { 731 log("setBrowserData - " + key + "/" + value); 732 } 733 sendBrowserMessage(String key, String op, Collection params)734 public boolean sendBrowserMessage(String key, String op, Collection params) { 735 log("sendBrowserMessage - " + key + "/" + op + "/" + params); 736 return false; 737 } 738 setTorrentURLHandler(torrentURLHandler handler)739 public void setTorrentURLHandler(torrentURLHandler handler) { 740 log("setTorrentURLHandler - " + handler); 741 } 742 getContentNetworkID()743 public long getContentNetworkID() { 744 return contentNetworkID; 745 } 746 setContentNetworkID(long contentNetwork)747 public void setContentNetworkID(long contentNetwork) { 748 this.contentNetworkID = contentNetwork; 749 } 750 } 751 } 752