1 /* 2 * Copyright (c) 2002, 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package com.sun.jmx.remote.internal; 26 27 import java.io.IOException; 28 import java.io.NotSerializableException; 29 30 import java.util.ArrayList; 31 import java.util.HashMap; 32 import java.util.List; 33 import java.util.Map; 34 import java.util.concurrent.Executor; 35 36 import java.security.AccessControlContext; 37 import java.security.AccessController; 38 import java.security.PrivilegedAction; 39 import javax.security.auth.Subject; 40 41 import javax.management.Notification; 42 import javax.management.NotificationListener; 43 import javax.management.NotificationFilter; 44 import javax.management.ObjectName; 45 import javax.management.MBeanServerNotification; 46 import javax.management.InstanceNotFoundException; 47 import javax.management.ListenerNotFoundException; 48 49 import javax.management.remote.NotificationResult; 50 import javax.management.remote.TargetedNotification; 51 52 import com.sun.jmx.remote.util.ClassLogger; 53 import com.sun.jmx.remote.util.EnvHelp; 54 import java.lang.reflect.UndeclaredThrowableException; 55 import java.util.concurrent.RejectedExecutionException; 56 57 58 public abstract class ClientNotifForwarder { 59 60 private final AccessControlContext acc; 61 ClientNotifForwarder(Map<String, ?> env)62 public ClientNotifForwarder(Map<String, ?> env) { 63 this(null, env); 64 } 65 66 private static int threadId; 67 68 /* An Executor that allows at most one executing and one pending 69 Runnable. It uses at most one thread -- as soon as there is 70 no pending Runnable the thread can exit. Another thread is 71 created as soon as there is a new pending Runnable. This 72 Executor is adapted for use in a situation where each Runnable 73 usually schedules up another Runnable. On return from the 74 first one, the second one is immediately executed. So this 75 just becomes a complicated way to write a while loop, but with 76 the advantage that you can replace it with another Executor, 77 for instance one that you are using to execute a bunch of other 78 unrelated work. 79 80 You might expect that a java.util.concurrent.ThreadPoolExecutor 81 with corePoolSize=0 and maximumPoolSize=1 would have the same 82 behavior, but it does not. A ThreadPoolExecutor only creates 83 a new thread when a new task is submitted and the number of 84 existing threads is < corePoolSize. This can never happen when 85 corePoolSize=0, so new threads are never created. Surprising, 86 but there you are. 87 */ 88 private static class LinearExecutor implements Executor { execute(Runnable command)89 public synchronized void execute(Runnable command) { 90 if (this.command != null) 91 throw new IllegalArgumentException("More than one command"); 92 this.command = command; 93 if (thread == null) { 94 thread = new Thread( 95 null, 96 ()-> { 97 while (true) { 98 Runnable r; 99 synchronized (LinearExecutor.this) { 100 if (LinearExecutor.this.command == null) { 101 thread = null; 102 return; 103 } else { 104 r = LinearExecutor.this.command; 105 LinearExecutor.this.command = null; 106 } 107 } 108 r.run(); 109 } 110 }, 111 "ClientNotifForwarder-" + ++threadId, 112 0, 113 false 114 ); 115 thread.setDaemon(true); 116 thread.start(); 117 } 118 } 119 120 private Runnable command; 121 private Thread thread; 122 } 123 ClientNotifForwarder(ClassLoader defaultClassLoader, Map<String, ?> env)124 public ClientNotifForwarder(ClassLoader defaultClassLoader, Map<String, ?> env) { 125 maxNotifications = EnvHelp.getMaxFetchNotifNumber(env); 126 timeout = EnvHelp.getFetchTimeout(env); 127 128 /* You can supply an Executor in which the remote call to 129 fetchNotifications will be made. The Executor's execute 130 method reschedules another task, so you must not use 131 an Executor that executes tasks in the caller's thread. */ 132 Executor ex = (Executor) 133 env.get("jmx.remote.x.fetch.notifications.executor"); 134 if (ex == null) 135 ex = new LinearExecutor(); 136 else if (logger.traceOn()) 137 logger.trace("ClientNotifForwarder", "executor is " + ex); 138 139 this.defaultClassLoader = defaultClassLoader; 140 this.executor = ex; 141 this.acc = AccessController.getContext(); 142 } 143 144 /** 145 * Called to fetch notifications from a server. 146 */ fetchNotifs(long clientSequenceNumber, int maxNotifications, long timeout)147 abstract protected NotificationResult fetchNotifs(long clientSequenceNumber, 148 int maxNotifications, 149 long timeout) 150 throws IOException, ClassNotFoundException; 151 addListenerForMBeanRemovedNotif()152 abstract protected Integer addListenerForMBeanRemovedNotif() 153 throws IOException, InstanceNotFoundException; 154 removeListenerForMBeanRemovedNotif(Integer id)155 abstract protected void removeListenerForMBeanRemovedNotif(Integer id) 156 throws IOException, InstanceNotFoundException, 157 ListenerNotFoundException; 158 159 /** 160 * Used to send out a notification about lost notifs 161 */ lostNotifs(String message, long number)162 abstract protected void lostNotifs(String message, long number); 163 164 addNotificationListener(Integer listenerID, ObjectName name, NotificationListener listener, NotificationFilter filter, Object handback, Subject delegationSubject)165 public synchronized void addNotificationListener(Integer listenerID, 166 ObjectName name, 167 NotificationListener listener, 168 NotificationFilter filter, 169 Object handback, 170 Subject delegationSubject) 171 throws IOException, InstanceNotFoundException { 172 173 if (logger.traceOn()) { 174 logger.trace("addNotificationListener", 175 "Add the listener "+listener+" at "+name); 176 } 177 178 infoList.put(listenerID, 179 new ClientListenerInfo(listenerID, 180 name, 181 listener, 182 filter, 183 handback, 184 delegationSubject)); 185 186 187 init(false); 188 } 189 190 public synchronized Integer[] getListenerIds(ObjectName name, NotificationListener listener)191 getListenerIds(ObjectName name, 192 NotificationListener listener) 193 throws ListenerNotFoundException, IOException { 194 195 List<Integer> ids = new ArrayList<Integer>(); 196 List<ClientListenerInfo> values = 197 new ArrayList<ClientListenerInfo>(infoList.values()); 198 for (int i=values.size()-1; i>=0; i--) { 199 ClientListenerInfo li = values.get(i); 200 201 if (li.sameAs(name, listener)) { 202 ids.add(li.getListenerID()); 203 } 204 } 205 206 if (ids.isEmpty()) 207 throw new ListenerNotFoundException("Listener not found"); 208 209 return ids.toArray(new Integer[0]); 210 } 211 212 public synchronized Integer getListenerId(ObjectName name, NotificationListener listener, NotificationFilter filter, Object handback)213 getListenerId(ObjectName name, 214 NotificationListener listener, 215 NotificationFilter filter, 216 Object handback) 217 throws ListenerNotFoundException, IOException { 218 219 Integer id = null; 220 221 List<ClientListenerInfo> values = 222 new ArrayList<ClientListenerInfo>(infoList.values()); 223 for (int i=values.size()-1; i>=0; i--) { 224 ClientListenerInfo li = values.get(i); 225 if (li.sameAs(name, listener, filter, handback)) { 226 id=li.getListenerID(); 227 break; 228 } 229 } 230 231 if (id == null) 232 throw new ListenerNotFoundException("Listener not found"); 233 234 return id; 235 } 236 237 public synchronized Integer[] removeNotificationListener(ObjectName name, NotificationListener listener)238 removeNotificationListener(ObjectName name, 239 NotificationListener listener) 240 throws ListenerNotFoundException, IOException { 241 242 beforeRemove(); 243 244 if (logger.traceOn()) { 245 logger.trace("removeNotificationListener", 246 "Remove the listener "+listener+" from "+name); 247 } 248 Integer[] liIds = getListenerIds(name, listener); 249 for (int i = 0; i < liIds.length; i++) { 250 infoList.remove(liIds[i]); 251 } 252 253 return liIds; 254 } 255 256 public synchronized Integer removeNotificationListener(ObjectName name, NotificationListener listener, NotificationFilter filter, Object handback)257 removeNotificationListener(ObjectName name, 258 NotificationListener listener, 259 NotificationFilter filter, 260 Object handback) 261 throws ListenerNotFoundException, IOException { 262 263 if (logger.traceOn()) { 264 logger.trace("removeNotificationListener", 265 "Remove the listener "+listener+" from "+name); 266 } 267 268 beforeRemove(); 269 Integer liId = getListenerId(name, listener, 270 filter, handback); 271 infoList.remove(liId); 272 273 return liId; 274 } 275 removeNotificationListener(ObjectName name)276 public synchronized Integer[] removeNotificationListener(ObjectName name) { 277 if (logger.traceOn()) { 278 logger.trace("removeNotificationListener", 279 "Remove all listeners registered at "+name); 280 } 281 282 List<Integer> ids = new ArrayList<Integer>(); 283 284 List<ClientListenerInfo> values = 285 new ArrayList<ClientListenerInfo>(infoList.values()); 286 for (int i=values.size()-1; i>=0; i--) { 287 ClientListenerInfo li = values.get(i); 288 if (li.sameAs(name)) { 289 ids.add(li.getListenerID()); 290 291 infoList.remove(li.getListenerID()); 292 } 293 } 294 295 return ids.toArray(new Integer[0]); 296 } 297 298 /* 299 * Called when a connector is doing reconnection. Like <code>postReconnection</code>, 300 * this method is intended to be called only by a client connector: 301 * <code>RMIConnector</code> and <code>ClientIntermediary</code>. 302 * Call this method will set the flag beingReconnection to <code>true</code>, 303 * and the thread used to fetch notifis will be stopped, a new thread can be 304 * created only after the method <code>postReconnection</code> is called. 305 * 306 * It is caller's responsiblity to not re-call this method before calling 307 * <code>postReconnection</code>. 308 */ preReconnection()309 public synchronized ClientListenerInfo[] preReconnection() throws IOException { 310 if (state == TERMINATED || beingReconnected) { // should never 311 throw new IOException("Illegal state."); 312 } 313 314 final ClientListenerInfo[] tmp = 315 infoList.values().toArray(new ClientListenerInfo[0]); 316 317 318 beingReconnected = true; 319 320 infoList.clear(); 321 322 return tmp; 323 } 324 325 /** 326 * Called after reconnection is finished. 327 * This method is intended to be called only by a client connector: 328 * <code>RMIConnector</code> and <code>ClientIntermediary</code>. 329 */ postReconnection(ClientListenerInfo[] listenerInfos)330 public synchronized void postReconnection(ClientListenerInfo[] listenerInfos) 331 throws IOException { 332 333 if (state == TERMINATED) { 334 return; 335 } 336 337 while (state == STOPPING) { 338 try { 339 wait(); 340 } catch (InterruptedException ire) { 341 IOException ioe = new IOException(ire.toString()); 342 EnvHelp.initCause(ioe, ire); 343 throw ioe; 344 } 345 } 346 347 final boolean trace = logger.traceOn(); 348 final int len = listenerInfos.length; 349 350 for (int i=0; i<len; i++) { 351 if (trace) { 352 logger.trace("addNotificationListeners", 353 "Add a listener at "+ 354 listenerInfos[i].getListenerID()); 355 } 356 357 infoList.put(listenerInfos[i].getListenerID(), listenerInfos[i]); 358 } 359 360 beingReconnected = false; 361 notifyAll(); 362 363 if (currentFetchThread == Thread.currentThread() || 364 state == STARTING || state == STARTED) { // doing or waiting reconnection 365 // only update mbeanRemovedNotifID 366 try { 367 mbeanRemovedNotifID = addListenerForMBeanRemovedNotif(); 368 } catch (Exception e) { 369 final String msg = 370 "Failed to register a listener to the mbean " + 371 "server: the client will not do clean when an MBean " + 372 "is unregistered"; 373 if (logger.traceOn()) { 374 logger.trace("init", msg, e); 375 } 376 } 377 } else { 378 while (state == STOPPING) { 379 try { 380 wait(); 381 } catch (InterruptedException ire) { 382 IOException ioe = new IOException(ire.toString()); 383 EnvHelp.initCause(ioe, ire); 384 throw ioe; 385 } 386 } 387 388 if (listenerInfos.length > 0) { // old listeners are re-added 389 init(true); // not update clientSequenceNumber 390 } else if (infoList.size() > 0) { // only new listeners added during reconnection 391 init(false); // need update clientSequenceNumber 392 } 393 } 394 } 395 terminate()396 public synchronized void terminate() { 397 if (state == TERMINATED) { 398 return; 399 } 400 401 if (logger.traceOn()) { 402 logger.trace("terminate", "Terminating..."); 403 } 404 405 if (state == STARTED) { 406 infoList.clear(); 407 } 408 409 setState(TERMINATED); 410 } 411 412 413 // ------------------------------------------------- 414 // private classes 415 // ------------------------------------------------- 416 // 417 418 private class NotifFetcher implements Runnable { 419 420 private volatile boolean alreadyLogged = false; 421 logOnce(String msg, SecurityException x)422 private void logOnce(String msg, SecurityException x) { 423 if (alreadyLogged) return; 424 // Log only once. 425 logger.config("setContextClassLoader",msg); 426 if (x != null) logger.fine("setContextClassLoader", x); 427 alreadyLogged = true; 428 } 429 430 // Set new context class loader, returns previous one. setContextClassLoader(final ClassLoader loader)431 private final ClassLoader setContextClassLoader(final ClassLoader loader) { 432 final AccessControlContext ctxt = ClientNotifForwarder.this.acc; 433 // if ctxt is null, log a config message and throw a 434 // SecurityException. 435 if (ctxt == null) { 436 logOnce("AccessControlContext must not be null.",null); 437 throw new SecurityException("AccessControlContext must not be null"); 438 } 439 return AccessController.doPrivileged( 440 new PrivilegedAction<ClassLoader>() { 441 public ClassLoader run() { 442 try { 443 // get context class loader - may throw 444 // SecurityException - though unlikely. 445 final ClassLoader previous = 446 Thread.currentThread().getContextClassLoader(); 447 448 // if nothing needs to be done, break here... 449 if (loader == previous) return previous; 450 451 // reset context class loader - may throw 452 // SecurityException 453 Thread.currentThread().setContextClassLoader(loader); 454 return previous; 455 } catch (SecurityException x) { 456 logOnce("Permission to set ContextClassLoader missing. " + 457 "Notifications will not be dispatched. " + 458 "Please check your Java policy configuration: " + 459 x, x); 460 throw x; 461 } 462 } 463 }, ctxt); 464 } 465 466 public void run() { 467 final ClassLoader previous; 468 if (defaultClassLoader != null) { 469 previous = setContextClassLoader(defaultClassLoader); 470 } else { 471 previous = null; 472 } 473 try { 474 doRun(); 475 } finally { 476 if (defaultClassLoader != null) { 477 setContextClassLoader(previous); 478 } 479 } 480 } 481 482 private void doRun() { 483 synchronized (ClientNotifForwarder.this) { 484 currentFetchThread = Thread.currentThread(); 485 486 if (state == STARTING) { 487 setState(STARTED); 488 } 489 } 490 491 492 NotificationResult nr = null; 493 if (!shouldStop() && (nr = fetchNotifs()) != null) { 494 // nr == null means got exception 495 496 final TargetedNotification[] notifs = 497 nr.getTargetedNotifications(); 498 final int len = notifs.length; 499 final Map<Integer, ClientListenerInfo> listeners; 500 final Integer myListenerID; 501 502 long missed = 0; 503 504 synchronized(ClientNotifForwarder.this) { 505 // check sequence number. 506 // 507 if (clientSequenceNumber >= 0) { 508 missed = nr.getEarliestSequenceNumber() - 509 clientSequenceNumber; 510 } 511 512 clientSequenceNumber = nr.getNextSequenceNumber(); 513 514 listeners = new HashMap<Integer, ClientListenerInfo>(); 515 516 for (int i = 0 ; i < len ; i++) { 517 final TargetedNotification tn = notifs[i]; 518 final Integer listenerID = tn.getListenerID(); 519 520 // check if an mbean unregistration notif 521 if (!listenerID.equals(mbeanRemovedNotifID)) { 522 final ClientListenerInfo li = infoList.get(listenerID); 523 if (li != null) { 524 listeners.put(listenerID, li); 525 } 526 continue; 527 } 528 final Notification notif = tn.getNotification(); 529 final String unreg = 530 MBeanServerNotification.UNREGISTRATION_NOTIFICATION; 531 if (notif instanceof MBeanServerNotification && 532 notif.getType().equals(unreg)) { 533 534 MBeanServerNotification mbsn = 535 (MBeanServerNotification) notif; 536 ObjectName name = mbsn.getMBeanName(); 537 538 removeNotificationListener(name); 539 } 540 } 541 myListenerID = mbeanRemovedNotifID; 542 } 543 544 if (missed > 0) { 545 final String msg = 546 "May have lost up to " + missed + 547 " notification" + (missed == 1 ? "" : "s"); 548 lostNotifs(msg, missed); 549 logger.trace("NotifFetcher.run", msg); 550 } 551 552 // forward 553 for (int i = 0 ; i < len ; i++) { 554 final TargetedNotification tn = notifs[i]; 555 dispatchNotification(tn,myListenerID,listeners); 556 } 557 } 558 559 synchronized (ClientNotifForwarder.this) { 560 currentFetchThread = null; 561 } 562 563 if (nr == null) { 564 if (logger.traceOn()) { 565 logger.trace("NotifFetcher-run", 566 "Recieved null object as notifs, stops fetching because the " 567 + "notification server is terminated."); 568 } 569 } 570 if (nr == null || shouldStop()) { 571 // tell that the thread is REALLY stopped 572 setState(STOPPED); 573 574 try { 575 removeListenerForMBeanRemovedNotif(mbeanRemovedNotifID); 576 } catch (Exception e) { 577 if (logger.traceOn()) { 578 logger.trace("NotifFetcher-run", 579 "removeListenerForMBeanRemovedNotif", e); 580 } 581 } 582 } else { 583 try { 584 executor.execute(this); 585 } catch (Exception e) { 586 if (isRejectedExecutionException(e)) { 587 // We reached here because the executor was shutdown. 588 // If executor was supplied by client, then it was shutdown 589 // abruptly or JMXConnector was shutdown along with executor 590 // while this thread was suspended at L564. 591 if (!(executor instanceof LinearExecutor)) { 592 // Spawn new executor that will do cleanup if JMXConnector is closed 593 // or keep notif system running otherwise 594 executor = new LinearExecutor(); 595 executor.execute(this); 596 } 597 } else { 598 throw e; 599 } 600 } 601 } 602 } 603 604 private boolean isRejectedExecutionException(Exception e) { 605 Throwable cause = e; 606 while (cause != null) { 607 if (cause instanceof RejectedExecutionException) { 608 return true; 609 } 610 cause = cause.getCause(); 611 } 612 return false; 613 } 614 615 void dispatchNotification(TargetedNotification tn, 616 Integer myListenerID, 617 Map<Integer, ClientListenerInfo> listeners) { 618 final Notification notif = tn.getNotification(); 619 final Integer listenerID = tn.getListenerID(); 620 621 if (listenerID.equals(myListenerID)) return; 622 final ClientListenerInfo li = listeners.get(listenerID); 623 624 if (li == null) { 625 logger.trace("NotifFetcher.dispatch", 626 "Listener ID not in map"); 627 return; 628 } 629 630 NotificationListener l = li.getListener(); 631 Object h = li.getHandback(); 632 try { 633 l.handleNotification(notif, h); 634 } catch (RuntimeException e) { 635 final String msg = 636 "Failed to forward a notification " + 637 "to a listener"; 638 logger.trace("NotifFetcher-run", msg, e); 639 } 640 641 } 642 643 private NotificationResult fetchNotifs() { 644 try { 645 NotificationResult nr = ClientNotifForwarder.this. 646 fetchNotifs(clientSequenceNumber,maxNotifications, 647 timeout); 648 649 if (logger.traceOn()) { 650 logger.trace("NotifFetcher-run", 651 "Got notifications from the server: "+nr); 652 } 653 654 return nr; 655 } catch (ClassNotFoundException | NotSerializableException e) { 656 logger.trace("NotifFetcher.fetchNotifs", e); 657 return fetchOneNotif(); 658 } catch (IOException ioe) { 659 if (!shouldStop()) { 660 logger.error("NotifFetcher-run", 661 "Failed to fetch notification, " + 662 "stopping thread. Error is: " + ioe, ioe); 663 logger.debug("NotifFetcher-run",ioe); 664 } 665 666 // no more fetching 667 return null; 668 } 669 } 670 671 /* Fetch one notification when we suspect that it might be a 672 notification that we can't deserialize (because of a 673 missing class). First we ask for 0 notifications with 0 674 timeout. This allows us to skip sequence numbers for 675 notifications that don't match our filters. Then we ask 676 for one notification. If that produces a 677 ClassNotFoundException, NotSerializableException or 678 UnmarshalException, we increase our sequence number and ask again. 679 Eventually we will either get a successful notification, or a 680 return with 0 notifications. In either case we can return a 681 NotificationResult. This algorithm works (albeit less 682 well) even if the server implementation doesn't optimize a 683 request for 0 notifications to skip sequence numbers for 684 notifications that don't match our filters. 685 686 If we had at least one 687 ClassNotFoundException/NotSerializableException/UnmarshalException, 688 then we must emit a JMXConnectionNotification.LOST_NOTIFS. 689 */ 690 private NotificationResult fetchOneNotif() { 691 ClientNotifForwarder cnf = ClientNotifForwarder.this; 692 693 long startSequenceNumber = clientSequenceNumber; 694 695 int notFoundCount = 0; 696 697 NotificationResult result = null; 698 long firstEarliest = -1; 699 700 while (result == null && !shouldStop()) { 701 NotificationResult nr; 702 703 try { 704 // 0 notifs to update startSequenceNumber 705 nr = cnf.fetchNotifs(startSequenceNumber, 0, 0L); 706 } catch (ClassNotFoundException e) { 707 logger.warning("NotifFetcher.fetchOneNotif", 708 "Impossible exception: " + e); 709 logger.debug("NotifFetcher.fetchOneNotif",e); 710 return null; 711 } catch (IOException e) { 712 if (!shouldStop()) 713 logger.trace("NotifFetcher.fetchOneNotif", e); 714 return null; 715 } 716 717 if (shouldStop() || nr == null) 718 return null; 719 720 startSequenceNumber = nr.getNextSequenceNumber(); 721 if (firstEarliest < 0) 722 firstEarliest = nr.getEarliestSequenceNumber(); 723 724 try { 725 // 1 notif to skip possible missing class 726 result = cnf.fetchNotifs(startSequenceNumber, 1, 0L); 727 } catch (ClassNotFoundException | NotSerializableException e) { 728 logger.warning("NotifFetcher.fetchOneNotif", 729 "Failed to deserialize a notification: "+e.toString()); 730 if (logger.traceOn()) { 731 logger.trace("NotifFetcher.fetchOneNotif", 732 "Failed to deserialize a notification.", e); 733 } 734 735 notFoundCount++; 736 startSequenceNumber++; 737 } catch (Exception e) { 738 if (!shouldStop()) 739 logger.trace("NotifFetcher.fetchOneNotif", e); 740 return null; 741 } 742 } 743 744 if (notFoundCount > 0) { 745 final String msg = 746 "Dropped " + notFoundCount + " notification" + 747 (notFoundCount == 1 ? "" : "s") + 748 " because classes were missing locally or incompatible"; 749 lostNotifs(msg, notFoundCount); 750 // Even if result.getEarliestSequenceNumber() is now greater than 751 // it was initially, meaning some notifs have been dropped 752 // from the buffer, we don't want the caller to see that 753 // because it is then likely to renotify about the lost notifs. 754 // So we put back the first value of earliestSequenceNumber 755 // that we saw. 756 if (result != null) { 757 result = new NotificationResult( 758 firstEarliest, result.getNextSequenceNumber(), 759 result.getTargetedNotifications()); 760 } 761 } 762 763 return result; 764 } 765 766 private boolean shouldStop() { 767 synchronized (ClientNotifForwarder.this) { 768 if (state != STARTED) { 769 return true; 770 } else if (infoList.size() == 0) { 771 // no more listener, stop fetching 772 setState(STOPPING); 773 774 return true; 775 } 776 777 return false; 778 } 779 } 780 } 781 782 783 // ------------------------------------------------- 784 // private methods 785 // ------------------------------------------------- 786 private synchronized void setState(int newState) { 787 if (state == TERMINATED) { 788 return; 789 } 790 791 state = newState; 792 this.notifyAll(); 793 } 794 795 /* 796 * Called to decide whether need to start a thread for fetching notifs. 797 * <P>The parameter reconnected will decide whether to initilize the clientSequenceNumber, 798 * initilaizing the clientSequenceNumber means to ignore all notifications arrived before. 799 * If it is reconnected, we will not initialize in order to get all notifications arrived 800 * during the reconnection. It may cause the newly registered listeners to receive some 801 * notifications arrived before its registray. 802 */ 803 private synchronized void init(boolean reconnected) throws IOException { 804 switch (state) { 805 case STARTED: 806 return; 807 case STARTING: 808 return; 809 case TERMINATED: 810 throw new IOException("The ClientNotifForwarder has been terminated."); 811 case STOPPING: 812 if (beingReconnected == true) { 813 // wait for another thread to do, which is doing reconnection 814 return; 815 } 816 817 while (state == STOPPING) { // make sure only one fetching thread. 818 try { 819 wait(); 820 } catch (InterruptedException ire) { 821 IOException ioe = new IOException(ire.toString()); 822 EnvHelp.initCause(ioe, ire); 823 824 throw ioe; 825 } 826 } 827 828 // re-call this method to check the state again, 829 // the state can be other value like TERMINATED. 830 init(reconnected); 831 832 return; 833 case STOPPED: 834 if (beingReconnected == true) { 835 // wait for another thread to do, which is doing reconnection 836 return; 837 } 838 839 if (logger.traceOn()) { 840 logger.trace("init", "Initializing..."); 841 } 842 843 // init the clientSequenceNumber if not reconnected. 844 if (!reconnected) { 845 try { 846 NotificationResult nr = fetchNotifs(-1, 0, 0); 847 848 if (state != STOPPED) { // JDK-8038940 849 // reconnection must happen during 850 // fetchNotifs(-1, 0, 0), and a new 851 // thread takes over the fetching job 852 return; 853 } 854 855 clientSequenceNumber = nr.getNextSequenceNumber(); 856 } catch (ClassNotFoundException e) { 857 // can't happen 858 logger.warning("init", "Impossible exception: "+ e); 859 logger.debug("init",e); 860 } 861 } 862 863 // for cleaning 864 try { 865 mbeanRemovedNotifID = addListenerForMBeanRemovedNotif(); 866 } catch (Exception e) { 867 final String msg = 868 "Failed to register a listener to the mbean " + 869 "server: the client will not do clean when an MBean " + 870 "is unregistered"; 871 if (logger.traceOn()) { 872 logger.trace("init", msg, e); 873 } 874 } 875 876 setState(STARTING); 877 878 // start fetching 879 executor.execute(new NotifFetcher()); 880 881 return; 882 default: 883 // should not 884 throw new IOException("Unknown state."); 885 } 886 } 887 888 /** 889 * Import: should not remove a listener during reconnection, the reconnection 890 * needs to change the listener list and that will possibly make removal fail. 891 */ 892 private synchronized void beforeRemove() throws IOException { 893 while (beingReconnected) { 894 if (state == TERMINATED) { 895 throw new IOException("Terminated."); 896 } 897 898 try { 899 wait(); 900 } catch (InterruptedException ire) { 901 IOException ioe = new IOException(ire.toString()); 902 EnvHelp.initCause(ioe, ire); 903 904 throw ioe; 905 } 906 } 907 908 if (state == TERMINATED) { 909 throw new IOException("Terminated."); 910 } 911 } 912 913 // ------------------------------------------------- 914 // private variables 915 // ------------------------------------------------- 916 917 private final ClassLoader defaultClassLoader; 918 private Executor executor; 919 920 private final Map<Integer, ClientListenerInfo> infoList = 921 new HashMap<Integer, ClientListenerInfo>(); 922 923 // notif stuff 924 private long clientSequenceNumber = -1; 925 private final int maxNotifications; 926 private final long timeout; 927 private Integer mbeanRemovedNotifID = null; 928 private Thread currentFetchThread; 929 930 // state 931 /** 932 * This state means that a thread is being created for fetching and forwarding notifications. 933 */ 934 private static final int STARTING = 0; 935 936 /** 937 * This state tells that a thread has been started for fetching and forwarding notifications. 938 */ 939 private static final int STARTED = 1; 940 941 /** 942 * This state means that the fetching thread is informed to stop. 943 */ 944 private static final int STOPPING = 2; 945 946 /** 947 * This state means that the fetching thread is already stopped. 948 */ 949 private static final int STOPPED = 3; 950 951 /** 952 * This state means that this object is terminated and no more thread will be created 953 * for fetching notifications. 954 */ 955 private static final int TERMINATED = 4; 956 957 private int state = STOPPED; 958 959 /** 960 * This variable is used to tell whether a connector (RMIConnector or ClientIntermediary) 961 * is doing reconnection. 962 * This variable will be set to true by the method <code>preReconnection</code>, and set 963 * to false by <code>postReconnection</code>. 964 * When beingReconnected == true, no thread will be created for fetching notifications. 965 */ 966 private boolean beingReconnected = false; 967 968 private static final ClassLogger logger = 969 new ClassLogger("javax.management.remote.misc", 970 "ClientNotifForwarder"); 971 } 972