1 /*
2  * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package jdk.internal.net.http.common;
27 
28 import java.util.concurrent.Executor;
29 import java.util.concurrent.atomic.AtomicInteger;
30 
31 import static java.util.Objects.requireNonNull;
32 
/**
 * A scheduler of (repeatable) tasks that MUST be run sequentially.
 *
 * <p> This class can be used as a synchronization aid that assists a number of
 * parties in running a task in a mutually exclusive fashion.
 *
 * <p> To run the task, a party invokes {@code runOrSchedule}. To permanently
 * prevent the task from subsequent runs, the party invokes {@code stop}.
 *
 * <p> The parties can, but do not have to, operate in different threads.
 *
 * <p> The task can be either synchronous (completes when its {@code run}
 * method returns), or asynchronous (completes when its
 * {@code DeferredCompleter} is explicitly completed).
 *
 * <p> The next run of the task will not begin until the previous run has
 * finished.
 *
 * <p> The task may invoke {@code runOrSchedule} itself, which may be a normal
 * situation.
 */
public final class SequentialScheduler {

    /*
       Since the task is fixed and known beforehand, no blocking synchronization
       (locks, queues, etc.) is required. The job can be done solely using
       nonblocking primitives.

       The machinery below addresses two problems:

         1. Running the task in a sequential order (no concurrent runs):

                begin, end, begin, end...

         2. Avoiding indefinite recursion:

                begin
                  end
                    begin
                      end
                        ...

       Problem #1 is solved with a finite state machine with 4 states:

           BEGIN, AGAIN, END, and STOP.

       Problem #2 is solved with a "state modifier" OFFLOAD.

       Parties invoke `runOrSchedule()` to signal the task must run. A party
       that has invoked `runOrSchedule()` either begins the task or exploits the
       party that is either beginning the task or ending it.

       The party that is trying to end the task either ends it or begins it
       again.

       To avoid indefinite recursion, before re-running the task the
       TryEndDeferredCompleter sets the OFFLOAD bit, signalling to its "child"
       TryEndDeferredCompleter that this ("parent") TryEndDeferredCompleter is
       available and the "child" must offload the task on to the "parent". Then
       a race begins. Whichever invocation of TryEndDeferredCompleter.complete
       manages to unset the OFFLOAD bit first does not do the work.

       There is at most 1 thread that is beginning the task and at most 2
       threads that are trying to end it: "parent" and "child". In case of a
       synchronous task "parent" and "child" are the same thread.
     */

    /**
     * An interface to signal the completion of a {@link RestartableTask}.
     *
     * <p> The invocation of {@code complete} completes the task. The invocation
     * of {@code complete} may restart the task, if an attempt has previously
     * been made to run the task while it was already running.
     *
     * @apiNote {@code DeferredCompleter} is useful when a task is not
     * necessarily complete when its {@code run} method returns, but will
     * complete at a later time, and maybe in a different thread. This type
     * exists for readability purposes at use-sites only.
     */
    public abstract static class DeferredCompleter {

        /** Extensible from this (outer) class ONLY. */
        private DeferredCompleter() { }

        /** Completes the task. Must be called once, and once only. */
        public abstract void complete();
    }

    /**
     * A restartable task.
     */
    @FunctionalInterface
    public interface RestartableTask {

        /**
         * The body of the task.
         *
         * @param taskCompleter
         *         A completer that must be invoked once, and only once,
         *         when this task is logically finished
         */
        void run(DeferredCompleter taskCompleter);
    }

    /**
     * A simple and self-contained task that completes once its {@code run}
     * method returns.
     */
    public abstract static class CompleteRestartableTask
        implements RestartableTask
    {
        /**
         * Runs the body and then signals completion. The completer is invoked
         * from a {@code finally} clause, so the task is completed even if the
         * body throws.
         *
         * @param taskCompleter
         *         the completer to invoke once the body has returned or thrown
         */
        @Override
        public final void run(DeferredCompleter taskCompleter) {
            try {
                run();
            } finally {
                taskCompleter.complete();
            }
        }

        /** The body of the task. */
        protected abstract void run();
    }

    /**
     * A task that runs its main loop within a synchronized block to provide
     * memory visibility between runs. Since the main loop can't run concurrently,
     * the lock shouldn't be contended and no deadlock should ever be possible.
     */
    public static final class SynchronizedRestartableTask
            extends CompleteRestartableTask {

        private final Runnable mainLoop;
        private final Object lock = new Object();

        /**
         * Creates a task that runs the given main loop.
         *
         * @param mainLoop
         *         the code to run on each execution of the task
         *
         * @throws NullPointerException
         *         if {@code mainLoop} is {@code null}
         */
        public SynchronizedRestartableTask(Runnable mainLoop) {
            // Fail fast here rather than with an NPE on the first run, which
            // may happen on another thread and far from the faulty call site.
            this.mainLoop = requireNonNull(mainLoop);
        }

        @Override
        protected void run() {
            synchronized (lock) {
                mainLoop.run();
            }
        }
    }

    /* State modifier: a "parent" completer is available to re-run the task */
    private static final int OFFLOAD =  1;
    /* The task is running and at least one more run has been requested */
    private static final int AGAIN   =  2;
    /* The task is running (or is about to be run) */
    private static final int BEGIN   =  4;
    /* The scheduler has been permanently stopped */
    private static final int STOP    =  8;
    /* The task is not running; this is the initial state */
    private static final int END     = 16;

    private final AtomicInteger state = new AtomicInteger(END);
    private final RestartableTask restartableTask;
    private final DeferredCompleter completer;
    private final SchedulableTask schedulableTask;

    /**
     * An auxiliary task that starts the restartable task:
     * {@code restartableTask.run(completer)}.
     */
    private final class SchedulableTask implements Runnable {
        @Override
        public void run() {
            restartableTask.run(completer);
        }
    }

    /**
     * Creates a scheduler for the given task. The task is not run until
     * {@code runOrSchedule} is invoked.
     *
     * @param restartableTask
     *         the task to run
     *
     * @throws NullPointerException
     *         if {@code restartableTask} is {@code null}
     */
    public SequentialScheduler(RestartableTask restartableTask) {
        this.restartableTask = requireNonNull(restartableTask);
        this.completer = new TryEndDeferredCompleter();
        this.schedulableTask = new SchedulableTask();
    }

    /**
     * Runs or schedules the task to be run.
     *
     * @implSpec The recursion which is possible here must be bounded:
     *
     *  <pre>{@code
     *     this.runOrSchedule()
     *         completer.complete()
     *             this.runOrSchedule()
     *                 ...
     * }</pre>
     *
     * @implNote The recursion in this implementation has the maximum
     * depth of 1.
     */
    public void runOrSchedule() {
        runOrSchedule(schedulableTask, null);
    }

    /**
     * Executes or schedules the task to be executed in the provided executor.
     *
     * <p> This method can be used when potentially executing from a calling
     * thread is not desirable.
     *
     * @param executor
     *         An executor in which to execute the task, if the task needs
     *         to be executed.
     *
     * @apiNote The given executor can be {@code null} in which case calling
     * {@code runOrSchedule(null)} is strictly equivalent to calling
     * {@code runOrSchedule()}.
     */
    public void runOrSchedule(Executor executor) {
        runOrSchedule(schedulableTask, executor);
    }

    /**
     * Signals that the task must run. Depending on the current state this
     * either starts the task (in the calling thread if {@code executor} is
     * {@code null}, otherwise through {@code executor}), records that another
     * run is needed, or does nothing.
     *
     * @param task
     *         the runnable that starts the restartable task
     * @param executor
     *         the executor to start the task in, or {@code null} to start it
     *         in the calling thread
     *
     * @throws InternalError
     *         if an unexpected state value is observed
     */
    private void runOrSchedule(SchedulableTask task, Executor executor) {
        while (true) {
            int s = state.get();
            if (s == END) {
                // Nobody is running the task: this party begins it
                if (state.compareAndSet(END, BEGIN)) {
                    break;
                }
            } else if ((s & BEGIN) != 0) {
                // Tries to change the state to AGAIN, preserving OFFLOAD bit
                if (state.compareAndSet(s, AGAIN | (s & OFFLOAD))) {
                    return;
                }
            } else if ((s & AGAIN) != 0 || s == STOP) {
                /* In the case of AGAIN the scheduler does not provide
                   happens-before relationship between actions prior to
                   runOrSchedule() and actions that happen in task.run().
                   The reason is that no volatile write is done in this case,
                   and the call piggybacks on the call that has actually set
                   AGAIN state. */
                return;
            } else {
                // Non-existent state, or the one that cannot be offloaded
                throw new InternalError(String.valueOf(s));
            }
        }
        // NOTE(review): the state is already BEGIN at this point and is not
        // rolled back here if executor.execute(task) throws.
        if (executor == null) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    /** The only concrete {@code DeferredCompleter} implementation. */
    private final class TryEndDeferredCompleter extends DeferredCompleter {

        /**
         * Tries to end the current run of the task. If another run has been
         * requested in the meantime (state AGAIN), the task is re-run from
         * this invocation instead, unless the OFFLOAD bit is set, in which
         * case the bit is cleared and the work is left to the invocation that
         * has set it.
         *
         * @throws IllegalStateException
         *         if the task is observed to be already ended
         * @throws InternalError
         *         if an unexpected state value is observed
         */
        @Override
        public void complete() {
            while (true) {
                int s;
                while (((s = state.get()) & OFFLOAD) != 0) {
                    // Tries to offload ending of the task to the parent
                    if (state.compareAndSet(s, s & ~OFFLOAD)) {
                        return;
                    }
                }
                while (true) {
                    if ((s & OFFLOAD) != 0) {
                        /* OFFLOAD bit can never be observed here. Otherwise
                           it would mean there is another invocation of
                           "complete" that can run the task. */
                        throw new InternalError(String.valueOf(s));
                    }
                    if (s == BEGIN) {
                        // No further run has been requested: end the task
                        if (state.compareAndSet(BEGIN, END)) {
                            return;
                        }
                    } else if (s == AGAIN) {
                        // Another run has been requested: become the "parent"
                        // and re-run the task below
                        if (state.compareAndSet(AGAIN, BEGIN | OFFLOAD)) {
                            break;
                        }
                    } else if (s == STOP) {
                        return;
                    } else if (s == END) {
                        throw new IllegalStateException("Duplicate completion");
                    } else {
                        // Non-existent state
                        throw new InternalError(String.valueOf(s));
                    }
                    s = state.get();
                }
                restartableTask.run(completer);
            }
        }
    }

    /**
     * Tells whether, or not, this scheduler has been permanently stopped.
     *
     * <p> Should be used from inside the task to poll the status of the
     * scheduler, pretty much the same way as it is done for threads:
     * <pre>{@code
     *     if (!Thread.currentThread().isInterrupted()) {
     *         ...
     *     }
     * }</pre>
     *
     * @return {@code true} if this scheduler has been stopped
     */
    public boolean isStopped() {
        return state.get() == STOP;
    }

    /**
     * Stops this scheduler.  Subsequent invocations of {@code runOrSchedule}
     * are effectively no-ops.
     *
     * <p> If the task has already begun, this invocation will not affect it,
     * unless the task itself uses {@code isStopped()} method to check the state
     * of the scheduler.
     */
    public void stop() {
        state.set(STOP);
    }

    /**
     * Returns a new {@code SequentialScheduler} that executes the provided
     * {@code mainLoop} from within a {@link SynchronizedRestartableTask}.
     *
     * @apiNote This is equivalent to calling
     * {@code new SequentialScheduler(new SynchronizedRestartableTask(mainLoop))}
     * The main loop must not perform any blocking operation.
     *
     * @param mainLoop The main loop of the new sequential scheduler
     * @return a new {@code SequentialScheduler} that executes the provided
     * {@code mainLoop} from within a {@link SynchronizedRestartableTask}.
     *
     * @throws NullPointerException
     *         if {@code mainLoop} is {@code null}
     */
    public static SequentialScheduler synchronizedScheduler(Runnable mainLoop) {
        return new SequentialScheduler(new SynchronizedRestartableTask(mainLoop));
    }
}
363