1 /*
2  * Copyright (c) 2002, 2016, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package com.sun.jmx.remote.internal;
26 
27 import java.io.IOException;
28 import java.io.NotSerializableException;
29 
30 import java.util.ArrayList;
31 import java.util.HashMap;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.concurrent.Executor;
35 
36 import java.security.AccessControlContext;
37 import java.security.AccessController;
38 import java.security.PrivilegedAction;
39 import javax.security.auth.Subject;
40 
41 import javax.management.Notification;
42 import javax.management.NotificationListener;
43 import javax.management.NotificationFilter;
44 import javax.management.ObjectName;
45 import javax.management.MBeanServerNotification;
46 import javax.management.InstanceNotFoundException;
47 import javax.management.ListenerNotFoundException;
48 
49 import javax.management.remote.NotificationResult;
50 import javax.management.remote.TargetedNotification;
51 
52 import com.sun.jmx.remote.util.ClassLogger;
53 import com.sun.jmx.remote.util.EnvHelp;
54 import java.lang.reflect.UndeclaredThrowableException;
55 import java.util.concurrent.RejectedExecutionException;
56 
57 
58 public abstract class ClientNotifForwarder {
59 
60     private final AccessControlContext acc;
61 
ClientNotifForwarder(Map<String, ?> env)62     public ClientNotifForwarder(Map<String, ?> env) {
63         this(null, env);
64     }
65 
66     private static int threadId;
67 
68     /* An Executor that allows at most one executing and one pending
69        Runnable.  It uses at most one thread -- as soon as there is
70        no pending Runnable the thread can exit.  Another thread is
71        created as soon as there is a new pending Runnable.  This
72        Executor is adapted for use in a situation where each Runnable
73        usually schedules up another Runnable.  On return from the
74        first one, the second one is immediately executed.  So this
75        just becomes a complicated way to write a while loop, but with
76        the advantage that you can replace it with another Executor,
77        for instance one that you are using to execute a bunch of other
78        unrelated work.
79 
80        You might expect that a java.util.concurrent.ThreadPoolExecutor
81        with corePoolSize=0 and maximumPoolSize=1 would have the same
82        behavior, but it does not.  A ThreadPoolExecutor only creates
83        a new thread when a new task is submitted and the number of
84        existing threads is < corePoolSize.  This can never happen when
85        corePoolSize=0, so new threads are never created.  Surprising,
86        but there you are.
87     */
88     private static class LinearExecutor implements Executor {
execute(Runnable command)89         public synchronized void execute(Runnable command) {
90             if (this.command != null)
91                 throw new IllegalArgumentException("More than one command");
92             this.command = command;
93             if (thread == null) {
94                 thread = new Thread(
95                     null,
96                     ()-> {
97                         while (true) {
98                             Runnable r;
99                             synchronized (LinearExecutor.this) {
100                                 if (LinearExecutor.this.command == null) {
101                                     thread = null;
102                                     return;
103                                 } else {
104                                     r = LinearExecutor.this.command;
105                                     LinearExecutor.this.command = null;
106                                 }
107                             }
108                             r.run();
109                         }
110                     },
111                     "ClientNotifForwarder-" + ++threadId,
112                     0,
113                     false
114                 );
115                 thread.setDaemon(true);
116                 thread.start();
117             }
118         }
119 
120         private Runnable command;
121         private Thread thread;
122     }
123 
ClientNotifForwarder(ClassLoader defaultClassLoader, Map<String, ?> env)124     public ClientNotifForwarder(ClassLoader defaultClassLoader, Map<String, ?> env) {
125         maxNotifications = EnvHelp.getMaxFetchNotifNumber(env);
126         timeout = EnvHelp.getFetchTimeout(env);
127 
128         /* You can supply an Executor in which the remote call to
129            fetchNotifications will be made.  The Executor's execute
130            method reschedules another task, so you must not use
131            an Executor that executes tasks in the caller's thread.  */
132         Executor ex = (Executor)
133             env.get("jmx.remote.x.fetch.notifications.executor");
134         if (ex == null)
135             ex = new LinearExecutor();
136         else if (logger.traceOn())
137             logger.trace("ClientNotifForwarder", "executor is " + ex);
138 
139         this.defaultClassLoader = defaultClassLoader;
140         this.executor = ex;
141         this.acc = AccessController.getContext();
142     }
143 
144     /**
145      * Called to fetch notifications from a server.
146      */
fetchNotifs(long clientSequenceNumber, int maxNotifications, long timeout)147     abstract protected NotificationResult fetchNotifs(long clientSequenceNumber,
148                                                       int maxNotifications,
149                                                       long timeout)
150             throws IOException, ClassNotFoundException;
151 
addListenerForMBeanRemovedNotif()152     abstract protected Integer addListenerForMBeanRemovedNotif()
153         throws IOException, InstanceNotFoundException;
154 
removeListenerForMBeanRemovedNotif(Integer id)155     abstract protected void removeListenerForMBeanRemovedNotif(Integer id)
156         throws IOException, InstanceNotFoundException,
157                ListenerNotFoundException;
158 
159     /**
160      * Used to send out a notification about lost notifs
161      */
lostNotifs(String message, long number)162     abstract protected void lostNotifs(String message, long number);
163 
164 
addNotificationListener(Integer listenerID, ObjectName name, NotificationListener listener, NotificationFilter filter, Object handback, Subject delegationSubject)165     public synchronized void addNotificationListener(Integer listenerID,
166                                         ObjectName name,
167                                         NotificationListener listener,
168                                         NotificationFilter filter,
169                                         Object handback,
170                                         Subject delegationSubject)
171             throws IOException, InstanceNotFoundException {
172 
173         if (logger.traceOn()) {
174             logger.trace("addNotificationListener",
175                          "Add the listener "+listener+" at "+name);
176         }
177 
178         infoList.put(listenerID,
179                      new ClientListenerInfo(listenerID,
180                                             name,
181                                             listener,
182                                             filter,
183                                             handback,
184                                             delegationSubject));
185 
186 
187         init(false);
188     }
189 
190     public synchronized Integer[]
getListenerIds(ObjectName name, NotificationListener listener)191     getListenerIds(ObjectName name,
192                    NotificationListener listener)
193             throws ListenerNotFoundException, IOException {
194 
195         List<Integer> ids = new ArrayList<Integer>();
196         List<ClientListenerInfo> values =
197                 new ArrayList<ClientListenerInfo>(infoList.values());
198         for (int i=values.size()-1; i>=0; i--) {
199             ClientListenerInfo li = values.get(i);
200 
201             if (li.sameAs(name, listener)) {
202                 ids.add(li.getListenerID());
203             }
204         }
205 
206         if (ids.isEmpty())
207             throw new ListenerNotFoundException("Listener not found");
208 
209         return ids.toArray(new Integer[0]);
210     }
211 
212     public synchronized Integer
getListenerId(ObjectName name, NotificationListener listener, NotificationFilter filter, Object handback)213     getListenerId(ObjectName name,
214                    NotificationListener listener,
215                    NotificationFilter filter,
216                    Object handback)
217             throws ListenerNotFoundException, IOException {
218 
219         Integer id = null;
220 
221         List<ClientListenerInfo> values =
222                 new ArrayList<ClientListenerInfo>(infoList.values());
223         for (int i=values.size()-1; i>=0; i--) {
224             ClientListenerInfo li = values.get(i);
225             if (li.sameAs(name, listener, filter, handback)) {
226                 id=li.getListenerID();
227                 break;
228             }
229         }
230 
231         if (id == null)
232             throw new ListenerNotFoundException("Listener not found");
233 
234         return id;
235     }
236 
237     public synchronized Integer[]
removeNotificationListener(ObjectName name, NotificationListener listener)238         removeNotificationListener(ObjectName name,
239                                    NotificationListener listener)
240         throws ListenerNotFoundException, IOException {
241 
242         beforeRemove();
243 
244         if (logger.traceOn()) {
245             logger.trace("removeNotificationListener",
246                          "Remove the listener "+listener+" from "+name);
247         }
248         Integer[] liIds = getListenerIds(name, listener);
249         for (int i = 0; i < liIds.length; i++) {
250             infoList.remove(liIds[i]);
251         }
252 
253         return liIds;
254     }
255 
256     public synchronized Integer
removeNotificationListener(ObjectName name, NotificationListener listener, NotificationFilter filter, Object handback)257         removeNotificationListener(ObjectName name,
258                                    NotificationListener listener,
259                                    NotificationFilter filter,
260                                    Object handback)
261             throws ListenerNotFoundException, IOException {
262 
263         if (logger.traceOn()) {
264             logger.trace("removeNotificationListener",
265                          "Remove the listener "+listener+" from "+name);
266         }
267 
268         beforeRemove();
269         Integer liId = getListenerId(name, listener,
270                 filter, handback);
271         infoList.remove(liId);
272 
273         return liId;
274     }
275 
removeNotificationListener(ObjectName name)276     public synchronized Integer[] removeNotificationListener(ObjectName name) {
277         if (logger.traceOn()) {
278             logger.trace("removeNotificationListener",
279                          "Remove all listeners registered at "+name);
280         }
281 
282         List<Integer> ids = new ArrayList<Integer>();
283 
284         List<ClientListenerInfo> values =
285                 new ArrayList<ClientListenerInfo>(infoList.values());
286         for (int i=values.size()-1; i>=0; i--) {
287             ClientListenerInfo li = values.get(i);
288             if (li.sameAs(name)) {
289                 ids.add(li.getListenerID());
290 
291                 infoList.remove(li.getListenerID());
292             }
293         }
294 
295         return ids.toArray(new Integer[0]);
296     }
297 
298     /*
299      * Called when a connector is doing reconnection. Like <code>postReconnection</code>,
300      * this method is intended to be called only by a client connector:
301      * <code>RMIConnector</code> and <code>ClientIntermediary</code>.
302      * Call this method will set the flag beingReconnection to <code>true</code>,
303      * and the thread used to fetch notifis will be stopped, a new thread can be
304      * created only after the method <code>postReconnection</code> is called.
305      *
306      * It is caller's responsiblity to not re-call this method before calling
307      * <code>postReconnection</code>.
308      */
preReconnection()309     public synchronized ClientListenerInfo[] preReconnection() throws IOException {
310         if (state == TERMINATED || beingReconnected) { // should never
311             throw new IOException("Illegal state.");
312         }
313 
314         final ClientListenerInfo[] tmp =
315             infoList.values().toArray(new ClientListenerInfo[0]);
316 
317 
318         beingReconnected = true;
319 
320         infoList.clear();
321 
322         return tmp;
323     }
324 
325     /**
326      * Called after reconnection is finished.
327      * This method is intended to be called only by a client connector:
328      * <code>RMIConnector</code> and <code>ClientIntermediary</code>.
329      */
postReconnection(ClientListenerInfo[] listenerInfos)330     public synchronized void postReconnection(ClientListenerInfo[] listenerInfos)
331         throws IOException {
332 
333         if (state == TERMINATED) {
334             return;
335         }
336 
337         while (state == STOPPING) {
338             try {
339                 wait();
340             } catch (InterruptedException ire) {
341                 IOException ioe = new IOException(ire.toString());
342                 EnvHelp.initCause(ioe, ire);
343                 throw ioe;
344             }
345         }
346 
347         final boolean trace = logger.traceOn();
348         final int len   = listenerInfos.length;
349 
350         for (int i=0; i<len; i++) {
351             if (trace) {
352                 logger.trace("addNotificationListeners",
353                              "Add a listener at "+
354                              listenerInfos[i].getListenerID());
355             }
356 
357             infoList.put(listenerInfos[i].getListenerID(), listenerInfos[i]);
358         }
359 
360         beingReconnected = false;
361         notifyAll();
362 
363         if (currentFetchThread == Thread.currentThread() ||
364               state == STARTING || state == STARTED) { // doing or waiting reconnection
365               // only update mbeanRemovedNotifID
366             try {
367                 mbeanRemovedNotifID = addListenerForMBeanRemovedNotif();
368             } catch (Exception e) {
369                 final String msg =
370                     "Failed to register a listener to the mbean " +
371                     "server: the client will not do clean when an MBean " +
372                     "is unregistered";
373                 if (logger.traceOn()) {
374                     logger.trace("init", msg, e);
375                 }
376             }
377         } else {
378               while (state == STOPPING) {
379                   try {
380                       wait();
381                   } catch (InterruptedException ire) {
382                       IOException ioe = new IOException(ire.toString());
383                       EnvHelp.initCause(ioe, ire);
384                       throw ioe;
385                   }
386               }
387 
388               if (listenerInfos.length > 0) { // old listeners are re-added
389                   init(true); // not update clientSequenceNumber
390               } else if (infoList.size() > 0) { // only new listeners added during reconnection
391                   init(false); // need update clientSequenceNumber
392               }
393           }
394     }
395 
terminate()396     public synchronized void terminate() {
397         if (state == TERMINATED) {
398             return;
399         }
400 
401         if (logger.traceOn()) {
402             logger.trace("terminate", "Terminating...");
403         }
404 
405         if (state == STARTED) {
406            infoList.clear();
407         }
408 
409         setState(TERMINATED);
410     }
411 
412 
413     // -------------------------------------------------
414     // private classes
415     // -------------------------------------------------
416     //
417 
418     private class NotifFetcher implements Runnable {
419 
420         private volatile boolean alreadyLogged = false;
421 
logOnce(String msg, SecurityException x)422         private void logOnce(String msg, SecurityException x) {
423             if (alreadyLogged) return;
424             // Log only once.
425             logger.config("setContextClassLoader",msg);
426             if (x != null) logger.fine("setContextClassLoader", x);
427             alreadyLogged = true;
428         }
429 
430         // Set new context class loader, returns previous one.
setContextClassLoader(final ClassLoader loader)431         private final ClassLoader setContextClassLoader(final ClassLoader loader) {
432             final AccessControlContext ctxt = ClientNotifForwarder.this.acc;
433             // if ctxt is null, log a config message and throw a
434             // SecurityException.
435             if (ctxt == null) {
436                 logOnce("AccessControlContext must not be null.",null);
437                 throw new SecurityException("AccessControlContext must not be null");
438             }
439             return AccessController.doPrivileged(
440                 new PrivilegedAction<ClassLoader>() {
441                     public ClassLoader run() {
442                         try {
443                             // get context class loader - may throw
444                             // SecurityException - though unlikely.
445                             final ClassLoader previous =
446                                 Thread.currentThread().getContextClassLoader();
447 
448                             // if nothing needs to be done, break here...
449                             if (loader == previous) return previous;
450 
451                             // reset context class loader - may throw
452                             // SecurityException
453                             Thread.currentThread().setContextClassLoader(loader);
454                             return previous;
455                         } catch (SecurityException x) {
456                             logOnce("Permission to set ContextClassLoader missing. " +
457                                     "Notifications will not be dispatched. " +
458                                     "Please check your Java policy configuration: " +
459                                     x, x);
460                             throw x;
461                         }
462                     }
463                 }, ctxt);
464         }
465 
466         public void run() {
467             final ClassLoader previous;
468             if (defaultClassLoader != null) {
469                 previous = setContextClassLoader(defaultClassLoader);
470             } else {
471                 previous = null;
472             }
473             try {
474                 doRun();
475             } finally {
476                 if (defaultClassLoader != null) {
477                     setContextClassLoader(previous);
478                 }
479             }
480         }
481 
482         private void doRun() {
483             synchronized (ClientNotifForwarder.this) {
484                 currentFetchThread = Thread.currentThread();
485 
486                 if (state == STARTING) {
487                     setState(STARTED);
488                 }
489             }
490 
491 
492             NotificationResult nr = null;
493             if (!shouldStop() && (nr = fetchNotifs()) != null) {
494                 // nr == null means got exception
495 
496                 final TargetedNotification[] notifs =
497                     nr.getTargetedNotifications();
498                 final int len = notifs.length;
499                 final Map<Integer, ClientListenerInfo> listeners;
500                 final Integer myListenerID;
501 
502                 long missed = 0;
503 
504                 synchronized(ClientNotifForwarder.this) {
505                     // check sequence number.
506                     //
507                     if (clientSequenceNumber >= 0) {
508                         missed = nr.getEarliestSequenceNumber() -
509                             clientSequenceNumber;
510                     }
511 
512                     clientSequenceNumber = nr.getNextSequenceNumber();
513 
514                     listeners = new HashMap<Integer, ClientListenerInfo>();
515 
516                     for (int i = 0 ; i < len ; i++) {
517                         final TargetedNotification tn = notifs[i];
518                         final Integer listenerID = tn.getListenerID();
519 
520                         // check if an mbean unregistration notif
521                         if (!listenerID.equals(mbeanRemovedNotifID)) {
522                             final ClientListenerInfo li = infoList.get(listenerID);
523                             if (li != null) {
524                                 listeners.put(listenerID, li);
525                             }
526                             continue;
527                         }
528                         final Notification notif = tn.getNotification();
529                         final String unreg =
530                             MBeanServerNotification.UNREGISTRATION_NOTIFICATION;
531                         if (notif instanceof MBeanServerNotification &&
532                             notif.getType().equals(unreg)) {
533 
534                             MBeanServerNotification mbsn =
535                                 (MBeanServerNotification) notif;
536                             ObjectName name = mbsn.getMBeanName();
537 
538                             removeNotificationListener(name);
539                         }
540                     }
541                     myListenerID = mbeanRemovedNotifID;
542                 }
543 
544                 if (missed > 0) {
545                     final String msg =
546                         "May have lost up to " + missed +
547                         " notification" + (missed == 1 ? "" : "s");
548                     lostNotifs(msg, missed);
549                     logger.trace("NotifFetcher.run", msg);
550                 }
551 
552                 // forward
553                 for (int i = 0 ; i < len ; i++) {
554                     final TargetedNotification tn = notifs[i];
555                     dispatchNotification(tn,myListenerID,listeners);
556                 }
557             }
558 
559             synchronized (ClientNotifForwarder.this) {
560                 currentFetchThread = null;
561             }
562 
563             if (nr == null) {
564                 if (logger.traceOn()) {
565                     logger.trace("NotifFetcher-run",
566                             "Recieved null object as notifs, stops fetching because the "
567                                     + "notification server is terminated.");
568                 }
569             }
570             if (nr == null || shouldStop()) {
571                 // tell that the thread is REALLY stopped
572                 setState(STOPPED);
573 
574                 try {
575                       removeListenerForMBeanRemovedNotif(mbeanRemovedNotifID);
576                 } catch (Exception e) {
577                     if (logger.traceOn()) {
578                         logger.trace("NotifFetcher-run",
579                                 "removeListenerForMBeanRemovedNotif", e);
580                     }
581                 }
582             } else {
583                 try {
584                     executor.execute(this);
585                 } catch (Exception e) {
586                     if (isRejectedExecutionException(e)) {
587                         // We reached here because the executor was shutdown.
588                         // If executor was supplied by client, then it was shutdown
589                         // abruptly or JMXConnector was shutdown along with executor
590                         // while this thread was suspended at L564.
591                         if (!(executor instanceof LinearExecutor)) {
592                             // Spawn new executor that will do cleanup if JMXConnector is closed
593                             // or keep notif system running otherwise
594                             executor = new LinearExecutor();
595                             executor.execute(this);
596                         }
597                     } else {
598                         throw e;
599                     }
600                 }
601             }
602         }
603 
604         private boolean isRejectedExecutionException(Exception e) {
605             Throwable cause = e;
606             while (cause != null) {
607                 if (cause instanceof RejectedExecutionException) {
608                     return true;
609                 }
610                 cause = cause.getCause();
611             }
612             return false;
613         }
614 
615         void dispatchNotification(TargetedNotification tn,
616                                   Integer myListenerID,
617                                   Map<Integer, ClientListenerInfo> listeners) {
618             final Notification notif = tn.getNotification();
619             final Integer listenerID = tn.getListenerID();
620 
621             if (listenerID.equals(myListenerID)) return;
622             final ClientListenerInfo li = listeners.get(listenerID);
623 
624             if (li == null) {
625                 logger.trace("NotifFetcher.dispatch",
626                              "Listener ID not in map");
627                 return;
628             }
629 
630             NotificationListener l = li.getListener();
631             Object h = li.getHandback();
632             try {
633                 l.handleNotification(notif, h);
634             } catch (RuntimeException e) {
635                 final String msg =
636                     "Failed to forward a notification " +
637                     "to a listener";
638                 logger.trace("NotifFetcher-run", msg, e);
639             }
640 
641         }
642 
643         private NotificationResult fetchNotifs() {
644             try {
645                 NotificationResult nr = ClientNotifForwarder.this.
646                     fetchNotifs(clientSequenceNumber,maxNotifications,
647                                 timeout);
648 
649                 if (logger.traceOn()) {
650                     logger.trace("NotifFetcher-run",
651                                  "Got notifications from the server: "+nr);
652                 }
653 
654                 return nr;
655             } catch (ClassNotFoundException | NotSerializableException e) {
656                 logger.trace("NotifFetcher.fetchNotifs", e);
657                 return fetchOneNotif();
658             } catch (IOException ioe) {
659                 if (!shouldStop()) {
660                     logger.error("NotifFetcher-run",
661                                  "Failed to fetch notification, " +
662                                  "stopping thread. Error is: " + ioe, ioe);
663                     logger.debug("NotifFetcher-run",ioe);
664                 }
665 
666                 // no more fetching
667                 return null;
668             }
669         }
670 
671         /* Fetch one notification when we suspect that it might be a
672            notification that we can't deserialize (because of a
673            missing class).  First we ask for 0 notifications with 0
674            timeout.  This allows us to skip sequence numbers for
675            notifications that don't match our filters.  Then we ask
676            for one notification.  If that produces a
677            ClassNotFoundException, NotSerializableException or
678            UnmarshalException, we increase our sequence number and ask again.
679            Eventually we will either get a successful notification, or a
680            return with 0 notifications.  In either case we can return a
681            NotificationResult.  This algorithm works (albeit less
682            well) even if the server implementation doesn't optimize a
683            request for 0 notifications to skip sequence numbers for
684            notifications that don't match our filters.
685 
686            If we had at least one
687            ClassNotFoundException/NotSerializableException/UnmarshalException,
688            then we must emit a JMXConnectionNotification.LOST_NOTIFS.
689         */
690         private NotificationResult fetchOneNotif() {
691             ClientNotifForwarder cnf = ClientNotifForwarder.this;
692 
693             long startSequenceNumber = clientSequenceNumber;
694 
695             int notFoundCount = 0;
696 
697             NotificationResult result = null;
698             long firstEarliest = -1;
699 
700             while (result == null && !shouldStop()) {
701                 NotificationResult nr;
702 
703                 try {
704                     // 0 notifs to update startSequenceNumber
705                     nr = cnf.fetchNotifs(startSequenceNumber, 0, 0L);
706                 } catch (ClassNotFoundException e) {
707                     logger.warning("NotifFetcher.fetchOneNotif",
708                                    "Impossible exception: " + e);
709                     logger.debug("NotifFetcher.fetchOneNotif",e);
710                     return null;
711                 } catch (IOException e) {
712                     if (!shouldStop())
713                         logger.trace("NotifFetcher.fetchOneNotif", e);
714                     return null;
715                 }
716 
717                 if (shouldStop() || nr == null)
718                     return null;
719 
720                 startSequenceNumber = nr.getNextSequenceNumber();
721                 if (firstEarliest < 0)
722                     firstEarliest = nr.getEarliestSequenceNumber();
723 
724                 try {
725                     // 1 notif to skip possible missing class
726                     result = cnf.fetchNotifs(startSequenceNumber, 1, 0L);
727                 } catch (ClassNotFoundException | NotSerializableException e) {
728                     logger.warning("NotifFetcher.fetchOneNotif",
729                                    "Failed to deserialize a notification: "+e.toString());
730                     if (logger.traceOn()) {
731                         logger.trace("NotifFetcher.fetchOneNotif",
732                                      "Failed to deserialize a notification.", e);
733                     }
734 
735                     notFoundCount++;
736                     startSequenceNumber++;
737                 } catch (Exception e) {
738                     if (!shouldStop())
739                         logger.trace("NotifFetcher.fetchOneNotif", e);
740                     return null;
741                 }
742             }
743 
744             if (notFoundCount > 0) {
745                 final String msg =
746                     "Dropped " + notFoundCount + " notification" +
747                     (notFoundCount == 1 ? "" : "s") +
748                     " because classes were missing locally or incompatible";
749                 lostNotifs(msg, notFoundCount);
750                 // Even if result.getEarliestSequenceNumber() is now greater than
751                 // it was initially, meaning some notifs have been dropped
752                 // from the buffer, we don't want the caller to see that
753                 // because it is then likely to renotify about the lost notifs.
754                 // So we put back the first value of earliestSequenceNumber
755                 // that we saw.
756                 if (result != null) {
757                     result = new NotificationResult(
758                             firstEarliest, result.getNextSequenceNumber(),
759                             result.getTargetedNotifications());
760                 }
761             }
762 
763             return result;
764         }
765 
766         private boolean shouldStop() {
767             synchronized (ClientNotifForwarder.this) {
768                 if (state != STARTED) {
769                     return true;
770                 } else if (infoList.size() == 0) {
771                     // no more listener, stop fetching
772                     setState(STOPPING);
773 
774                     return true;
775                 }
776 
777                 return false;
778             }
779         }
780     }
781 
782 
783 // -------------------------------------------------
784 // private methods
785 // -------------------------------------------------
786     private synchronized void setState(int newState) {
787         if (state == TERMINATED) {
788             return;
789         }
790 
791         state = newState;
792         this.notifyAll();
793     }
794 
795     /*
796      * Called to decide whether need to start a thread for fetching notifs.
797      * <P>The parameter reconnected will decide whether to initilize the clientSequenceNumber,
798      * initilaizing the clientSequenceNumber means to ignore all notifications arrived before.
799      * If it is reconnected, we will not initialize in order to get all notifications arrived
800      * during the reconnection. It may cause the newly registered listeners to receive some
801      * notifications arrived before its registray.
802      */
803     private synchronized void init(boolean reconnected) throws IOException {
804         switch (state) {
805         case STARTED:
806             return;
807         case STARTING:
808             return;
809         case TERMINATED:
810             throw new IOException("The ClientNotifForwarder has been terminated.");
811         case STOPPING:
812             if (beingReconnected == true) {
813                 // wait for another thread to do, which is doing reconnection
814                 return;
815             }
816 
817             while (state == STOPPING) { // make sure only one fetching thread.
818                 try {
819                     wait();
820                 } catch (InterruptedException ire) {
821                     IOException ioe = new IOException(ire.toString());
822                     EnvHelp.initCause(ioe, ire);
823 
824                     throw ioe;
825                 }
826             }
827 
828             // re-call this method to check the state again,
829             // the state can be other value like TERMINATED.
830             init(reconnected);
831 
832             return;
833         case STOPPED:
834             if (beingReconnected == true) {
835                 // wait for another thread to do, which is doing reconnection
836                 return;
837             }
838 
839             if (logger.traceOn()) {
840                 logger.trace("init", "Initializing...");
841             }
842 
843             // init the clientSequenceNumber if not reconnected.
844             if (!reconnected) {
845                 try {
846                     NotificationResult nr = fetchNotifs(-1, 0, 0);
847 
848                     if (state != STOPPED) { // JDK-8038940
849                                             // reconnection must happen during
850                                             // fetchNotifs(-1, 0, 0), and a new
851                                             // thread takes over the fetching job
852                         return;
853                     }
854 
855                     clientSequenceNumber = nr.getNextSequenceNumber();
856                 } catch (ClassNotFoundException e) {
857                     // can't happen
858                     logger.warning("init", "Impossible exception: "+ e);
859                     logger.debug("init",e);
860                 }
861             }
862 
863             // for cleaning
864             try {
865                 mbeanRemovedNotifID = addListenerForMBeanRemovedNotif();
866             } catch (Exception e) {
867                 final String msg =
868                     "Failed to register a listener to the mbean " +
869                     "server: the client will not do clean when an MBean " +
870                     "is unregistered";
871                 if (logger.traceOn()) {
872                     logger.trace("init", msg, e);
873                 }
874             }
875 
876             setState(STARTING);
877 
878             // start fetching
879             executor.execute(new NotifFetcher());
880 
881             return;
882         default:
883             // should not
884             throw new IOException("Unknown state.");
885         }
886     }
887 
888     /**
889      * Import: should not remove a listener during reconnection, the reconnection
890      * needs to change the listener list and that will possibly make removal fail.
891      */
892     private synchronized void beforeRemove() throws IOException {
893         while (beingReconnected) {
894             if (state == TERMINATED) {
895                 throw new IOException("Terminated.");
896             }
897 
898             try {
899                 wait();
900             } catch (InterruptedException ire) {
901                 IOException ioe = new IOException(ire.toString());
902                 EnvHelp.initCause(ioe, ire);
903 
904                 throw ioe;
905             }
906         }
907 
908         if (state == TERMINATED) {
909             throw new IOException("Terminated.");
910         }
911     }
912 
913 // -------------------------------------------------
914 // private variables
915 // -------------------------------------------------
916 
917     private final ClassLoader defaultClassLoader;
918     private Executor executor;
919 
920     private final Map<Integer, ClientListenerInfo> infoList =
921             new HashMap<Integer, ClientListenerInfo>();
922 
923     // notif stuff
924     private long clientSequenceNumber = -1;
925     private final int maxNotifications;
926     private final long timeout;
927     private Integer mbeanRemovedNotifID = null;
928     private Thread currentFetchThread;
929 
930     // state
931     /**
932      * This state means that a thread is being created for fetching and forwarding notifications.
933      */
934     private static final int STARTING = 0;
935 
936     /**
937      * This state tells that a thread has been started for fetching and forwarding notifications.
938      */
939     private static final int STARTED = 1;
940 
941     /**
942      * This state means that the fetching thread is informed to stop.
943      */
944     private static final int STOPPING = 2;
945 
946     /**
947      * This state means that the fetching thread is already stopped.
948      */
949     private static final int STOPPED = 3;
950 
951     /**
952      * This state means that this object is terminated and no more thread will be created
953      * for fetching notifications.
954      */
955     private static final int TERMINATED = 4;
956 
957     private int state = STOPPED;
958 
959     /**
960      * This variable is used to tell whether a connector (RMIConnector or ClientIntermediary)
961      * is doing reconnection.
962      * This variable will be set to true by the method <code>preReconnection</code>, and set
963      * to false by <code>postReconnection</code>.
964      * When beingReconnected == true, no thread will be created for fetching notifications.
965      */
966     private boolean beingReconnected = false;
967 
968     private static final ClassLogger logger =
969         new ClassLogger("javax.management.remote.misc",
970                         "ClientNotifForwarder");
971 }
972