1 /*
2  * Copyright (c) OSGi Alliance (2017, 2018). All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package org.osgi.util.promise;
18 
19 import static java.util.Objects.requireNonNull;
20 import static org.osgi.util.promise.PromiseImpl.uncaughtException;
21 
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.List;
25 import java.util.concurrent.BlockingQueue;
26 import java.util.concurrent.Callable;
27 import java.util.concurrent.CancellationException;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.Executor;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.Future;
32 import java.util.concurrent.RejectedExecutionHandler;
33 import java.util.concurrent.RunnableScheduledFuture;
34 import java.util.concurrent.ScheduledExecutorService;
35 import java.util.concurrent.ScheduledThreadPoolExecutor;
36 import java.util.concurrent.SynchronousQueue;
37 import java.util.concurrent.ThreadFactory;
38 import java.util.concurrent.ThreadPoolExecutor;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicBoolean;
41 import java.util.concurrent.atomic.AtomicInteger;
42 
43 import org.osgi.annotation.versioning.ConsumerType;
44 import org.osgi.util.promise.PromiseImpl.Result;
45 
46 /**
47  * Promise factory to create Deferred and Promise objects.
48  * <p>
49  * Instances of this class can be used to create Deferred and Promise objects
50  * which use the executors used to construct this object for any callback or
51  * scheduled operation execution.
52  *
53  * @Immutable
54  * @author $Id: 23eb0cc4714c89abe53904985ec6a6d90d898c8a $
55  * @since 1.1
56  */
57 @ConsumerType
58 public class PromiseFactory {
59 	/**
60 	 * The default factory which uses the default callback executor and default
61 	 * scheduled executor.
62 	 */
63 	final static PromiseFactory				defaultFactory	= new PromiseFactory(
64 			null, null);
65 
66 	/**
67 	 * The executor to use for callbacks. If {@code null}, the default callback
68 	 * executor is used.
69 	 */
70 	private final Executor					callbackExecutor;
71 	/**
72 	 * The executor to use for scheduled operations. If {@code null}, the
73 	 * default scheduled executor is used.
74 	 */
75 	private final ScheduledExecutorService	scheduledExecutor;
76 
77 	private final boolean					allowCurrentThread;
78 
79 	/**
80 	 * Create a new PromiseFactory with the specified callback executor.
81 	 * <p>
82 	 * The default scheduled executor will be used.
83 	 *
84 	 * @param callbackExecutor The executor to use for callbacks. {@code null}
85 	 *            can be specified for the default callback executor.
86 	 */
PromiseFactory(Executor callbackExecutor)87 	public PromiseFactory(Executor callbackExecutor) {
88 		this(callbackExecutor, null);
89 	}
90 
91 	/**
92 	 * Create a new PromiseFactory with the specified callback executor and
93 	 * specified scheduled executor.
94 	 *
95 	 * @param callbackExecutor The executor to use for callbacks. {@code null}
96 	 *            can be specified for the default callback executor.
97 	 * @param scheduledExecutor The scheduled executor for use for scheduled
98 	 *            operations. {@code null} can be specified for the default
99 	 *            scheduled executor.
100 	 */
PromiseFactory(Executor callbackExecutor, ScheduledExecutorService scheduledExecutor)101 	public PromiseFactory(Executor callbackExecutor,
102 			ScheduledExecutorService scheduledExecutor) {
103 		this.callbackExecutor = callbackExecutor;
104 		this.scheduledExecutor = scheduledExecutor;
105 		allowCurrentThread = Boolean.parseBoolean(System.getProperty(
106 				"org.osgi.util.promise.allowCurrentThread",
107 				Boolean.TRUE.toString()));
108 
109 	}
110 
111 	/**
112 	 * Returns the executor to use for callbacks.
113 	 *
114 	 * @return The executor to use for callbacks. This will be the default
115 	 *         callback executor if {@code null} was specified for the callback
116 	 *         executor when this PromiseFactory was created.
117 	 */
executor()118 	public Executor executor() {
119 		if (callbackExecutor == null) {
120 			return DefaultExecutors.callbackExecutor();
121 		}
122 		return callbackExecutor;
123 	}
124 
125 	/**
126 	 * Returns the scheduled executor to use for scheduled operations.
127 	 *
128 	 * @return The scheduled executor to use for scheduled operations. This will
129 	 *         be the default scheduled executor if {@code null} was specified
130 	 *         for the scheduled executor when this PromiseFactory was created.
131 	 */
scheduledExecutor()132 	public ScheduledExecutorService scheduledExecutor() {
133 		if (scheduledExecutor == null) {
134 			return DefaultExecutors.scheduledExecutor();
135 		}
136 		return scheduledExecutor;
137 	}
138 
139 	/**
140 	 * Create a new Deferred with the callback executor and scheduled executor
141 	 * of this PromiseFactory object.
142 	 * <p>
143 	 * Use this method instead of {@link Deferred#Deferred()} to create a new
144 	 * {@link Deferred} whose associated Promise uses executors other than the
145 	 * default executors.
146 	 *
147 	 * @param <T> The value type associated with the returned Deferred.
148 	 * @return A new {@link Deferred} with the callback and scheduled executors
149 	 *         of this PromiseFactory object
150 	 */
deferred()151 	public <T> Deferred<T> deferred() {
152 		return new Deferred<>(this);
153 	}
154 
155 	/**
156 	 * Returns a new Promise that has been resolved with the specified value.
157 	 * <p>
158 	 * The returned Promise uses the callback executor and scheduled executor of
159 	 * this PromiseFactory object.
160 	 * <p>
161 	 * Use this method instead of {@link Promises#resolved(Object)} to create a
162 	 * Promise which uses executors other than the default executors.
163 	 *
164 	 * @param <T> The value type associated with the returned Promise.
165 	 * @param value The value of the resolved Promise.
166 	 * @return A new Promise that has been resolved with the specified value.
167 	 */
resolved(T value)168 	public <T> Promise<T> resolved(T value) {
169 		return new ResolvedPromiseImpl<>(value, this);
170 	}
171 
172 	/**
173 	 * Returns a new Promise that has been resolved with the specified failure.
174 	 * <p>
175 	 * The returned Promise uses the callback executor and scheduled executor of
176 	 * this PromiseFactory object.
177 	 * <p>
178 	 * Use this method instead of {@link Promises#failed(Throwable)} to create a
179 	 * Promise which uses executors other than the default executors.
180 	 *
181 	 * @param <T> The value type associated with the returned Promise.
182 	 * @param failure The failure of the resolved Promise. Must not be
183 	 *            {@code null}.
184 	 * @return A new Promise that has been resolved with the specified failure.
185 	 */
failed(Throwable failure)186 	public <T> Promise<T> failed(Throwable failure) {
187 		return new FailedPromiseImpl<>(failure, this);
188 	}
189 
190 	/**
191 	 * Returns a new Promise that will hold the result of the specified task.
192 	 * <p>
193 	 * The returned Promise uses the callback executor and scheduled executor of
194 	 * this PromiseFactory object.
195 	 * <p>
196 	 * The specified task will be executed on the {@link #executor() callback
197 	 * executor}.
198 	 *
199 	 * @param <T> The value type associated with the returned Promise.
200 	 * @param task The task whose result will be available from the returned
201 	 *            Promise.
202 	 * @return A new Promise that will hold the result of the specified task.
203 	 */
submit(Callable< ? extends T> task)204 	public <T> Promise<T> submit(Callable< ? extends T> task) {
205 		DeferredPromiseImpl<T> promise = new DeferredPromiseImpl<>(this);
206 		Runnable submit = promise.new Submit(task);
207 		try {
208 			executor().execute(submit);
209 		} catch (Exception t) {
210 			promise.tryResolve(null, t);
211 		}
212 		return promise.orDone();
213 	}
214 
215 	/**
216 	 * Returns a new Promise that is a latch on the resolution of the specified
217 	 * Promises.
218 	 * <p>
219 	 * The returned Promise uses the callback executor and scheduled executor of
220 	 * this PromiseFactory object.
221 	 * <p>
222 	 * The returned Promise acts as a gate and must be resolved after all of the
223 	 * specified Promises are resolved.
224 	 *
225 	 * @param <T> The value type of the List value associated with the returned
226 	 *            Promise.
227 	 * @param <S> A subtype of the value type of the List value associated with
228 	 *            the returned Promise.
229 	 * @param promises The Promises which must be resolved before the returned
230 	 *            Promise must be resolved. Must not be {@code null} and all of
231 	 *            the elements in the collection must not be {@code null}.
232 	 * @return A Promise that must be successfully resolved with a List of the
233 	 *         values in the order of the specified Promises if all the
234 	 *         specified Promises are successfully resolved. The List in the
235 	 *         returned Promise is the property of the caller and is modifiable.
236 	 *         The returned Promise must be resolved with a failure of
237 	 *         {@link FailedPromisesException} if any of the specified Promises
238 	 *         are resolved with a failure. The failure
239 	 *         {@link FailedPromisesException} must contain all of the specified
240 	 *         Promises which resolved with a failure.
241 	 */
all( Collection<Promise<S>> promises)242 	public <T, S extends T> Promise<List<T>> all(
243 			Collection<Promise<S>> promises) {
244 		if (promises.isEmpty()) {
245 			List<T> value = new ArrayList<>();
246 			return resolved(value);
247 		}
248 
249 		/* make a copy and capture the ordering */
250 		List<Promise<S>> list = new ArrayList<>(promises);
251 
252 		DeferredPromiseImpl<List<T>> chained = new DeferredPromiseImpl<>(this);
253 		All<T,S> all = new All<>(chained, list);
254 		for (Promise<S> p : list) {
255 			p.onResolve(all);
256 		}
257 		return chained.orDone();
258 	}
259 
260 	/**
261 	 * A callback used to resolve the specified Promise when the specified list
262 	 * of Promises are resolved for the {@link PromiseFactory#all(Collection)}
263 	 * method.
264 	 *
265 	 * @ThreadSafe
266 	 */
267 	private static final class All<T, S extends T> implements Runnable {
268 		private final DeferredPromiseImpl<List<T>>	chained;
269 		private final List<Promise<S>>				promises;
270 		private final AtomicInteger					promiseCount;
271 
All(DeferredPromiseImpl<List<T>> chained, List<Promise<S>> promises)272 		All(DeferredPromiseImpl<List<T>> chained, List<Promise<S>> promises) {
273 			this.chained = requireNonNull(chained);
274 			this.promises = requireNonNull(promises);
275 			this.promiseCount = new AtomicInteger(promises.size());
276 		}
277 
278 		@Override
run()279 		public void run() {
280 			if (promiseCount.decrementAndGet() != 0) {
281 				return;
282 			}
283 			List<T> value = new ArrayList<>(promises.size());
284 			List<Promise< ? >> failed = new ArrayList<>(promises.size());
285 			Throwable cause = null;
286 			for (Promise<S> p : promises) {
287 				Result<S> result = PromiseImpl.collect(p);
288 				if (result.fail != null) {
289 					failed.add(p);
290 					if (cause == null) {
291 						cause = result.fail;
292 					}
293 				} else {
294 					value.add(result.value);
295 				}
296 			}
297 			if (failed.isEmpty()) {
298 				chained.tryResolve(value, null);
299 			} else {
300 				chained.tryResolve(null,
301 						new FailedPromisesException(failed, cause));
302 			}
303 		}
304 	}
305 
306 	/**
307 	 * Returns an Executor implementation that executes tasks immediately on the
308 	 * thread calling the {@code Executor.execute} method.
309 	 *
310 	 * @return An Executor implementation that executes tasks immediately on the
311 	 *         thread calling the {@code Executor.execute} method.
312 	 */
inlineExecutor()313 	public static Executor inlineExecutor() {
314 		return new InlineExecutor();
315 	}
316 
allowCurrentThread()317 	boolean allowCurrentThread() {
318 		return allowCurrentThread;
319 	}
320 
321 	/**
322 	 * An Executor implementation which executes the task immediately on the
323 	 * thread calling the {@code Executor.execute} method.
324 	 *
325 	 * @Immutable
326 	 */
327 	private static final class InlineExecutor implements Executor {
InlineExecutor()328 		InlineExecutor() {}
329 
330 		@Override
execute(Runnable callback)331 		public void execute(Runnable callback) {
332 			callback.run();
333 		}
334 	}
335 
336 	/**
337 	 * Default executors for Promises.
338 	 *
339 	 * @Immutable
340 	 */
341 	private static final class DefaultExecutors
342 			implements ThreadFactory, RejectedExecutionHandler, Runnable {
343 		private static final DefaultExecutors	callbacks;
344 		private static final ScheduledExecutor	scheduledExecutor;
345 		private static final ThreadPoolExecutor	callbackExecutor;
346 		static {
347 			callbacks = new DefaultExecutors();
348 			scheduledExecutor = new ScheduledExecutor(2, callbacks);
349 			callbackExecutor = new ThreadPoolExecutor(0, 64, 60L,
350 					TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
351 					callbacks, callbacks);
352 		}
353 
callbackExecutor()354 		static Executor callbackExecutor() {
355 			return callbackExecutor;
356 		}
357 
scheduledExecutor()358 		static ScheduledExecutorService scheduledExecutor() {
359 			return scheduledExecutor;
360 		}
361 
362 		private final AtomicBoolean	shutdownHookInstalled;
363 		private final ThreadFactory	delegateThreadFactory;
364 
DefaultExecutors()365 		private DefaultExecutors() {
366 			shutdownHookInstalled = new AtomicBoolean();
367 			delegateThreadFactory = Executors.defaultThreadFactory();
368 		}
369 
370 		/**
371 		 * Executor threads should not prevent VM from exiting.
372 		 */
373 		@Override
newThread(Runnable r)374 		public Thread newThread(Runnable r) {
375 			if (shutdownHookInstalled.compareAndSet(false, true)) {
376 				Thread shutdownThread = delegateThreadFactory.newThread(this);
377 				shutdownThread.setName(
378 						"ExecutorShutdownHook," + shutdownThread.getName());
379 				try {
380 					Runtime.getRuntime().addShutdownHook(shutdownThread);
381 				} catch (IllegalStateException e) {
382 					// VM is already shutting down...
383 					callbackExecutor.shutdown();
384 					scheduledExecutor.shutdown();
385 				}
386 			}
387 			Thread t = delegateThreadFactory.newThread(r);
388 			t.setName("PromiseFactory," + t.getName());
389 			t.setDaemon(true);
390 			return t;
391 		}
392 
393 		/**
394 		 * Call the callback using the caller's thread because the thread pool
395 		 * rejected the execution.
396 		 */
397 		@Override
rejectedExecution(Runnable callback, ThreadPoolExecutor executor)398 		public void rejectedExecution(Runnable callback,
399 				ThreadPoolExecutor executor) {
400 			try {
401 				callback.run();
402 			} catch (Throwable t) {
403 				uncaughtException(t);
404 			}
405 		}
406 
407 		/**
408 		 * Shutdown hook
409 		 */
410 		@Override
run()411 		public void run() {
412 			// limit new thread creation
413 			callbackExecutor.setMaximumPoolSize(
414 					Math.max(1, callbackExecutor.getPoolSize()));
415 			// Run all delayed callbacks now
416 			scheduledExecutor.shutdown();
417 			BlockingQueue<Runnable> queue = scheduledExecutor.getQueue();
418 			if (!queue.isEmpty()) {
419 				for (Object r : queue.toArray()) {
420 					if (r instanceof RunnableScheduledFuture< ? >) {
421 						RunnableScheduledFuture< ? > future = (RunnableScheduledFuture< ? >) r;
422 						if ((future.getDelay(TimeUnit.NANOSECONDS) > 0L)
423 								&& queue.remove(future)) {
424 							future.run();
425 							scheduledExecutor.afterExecute(future, null);
426 						}
427 					}
428 				}
429 				scheduledExecutor.shutdown();
430 			}
431 			try {
432 				scheduledExecutor.awaitTermination(20, TimeUnit.SECONDS);
433 			} catch (InterruptedException e) {
434 				Thread.currentThread().interrupt();
435 			}
436 			// Shutdown callback executor
437 			callbackExecutor.shutdown();
438 			try {
439 				callbackExecutor.awaitTermination(20, TimeUnit.SECONDS);
440 			} catch (InterruptedException e) {
441 				Thread.currentThread().interrupt();
442 			}
443 		}
444 
445 		/**
446 		 * ScheduledThreadPoolExecutor for scheduled execution.
447 		 *
448 		 * @ThreadSafe
449 		 */
450 		private static final class ScheduledExecutor
451 				extends ScheduledThreadPoolExecutor {
ScheduledExecutor(int corePoolSize, ThreadFactory threadFactory)452 			ScheduledExecutor(int corePoolSize, ThreadFactory threadFactory) {
453 				super(corePoolSize, threadFactory);
454 			}
455 
456 			/**
457 			 * Handle uncaught exceptions
458 			 */
459 			@Override
afterExecute(Runnable r, Throwable t)460 			protected void afterExecute(Runnable r, Throwable t) {
461 				super.afterExecute(r, t);
462 				if ((t == null) && (r instanceof Future< ? >)) {
463 					boolean interrupted = Thread.interrupted();
464 					try {
465 						((Future< ? >) r).get();
466 					} catch (CancellationException e) {
467 						// ignore
468 					} catch (InterruptedException e) {
469 						interrupted = true;
470 					} catch (ExecutionException e) {
471 						t = e.getCause();
472 					} finally {
473 						if (interrupted) { // restore interrupt status
474 							Thread.currentThread().interrupt();
475 						}
476 					}
477 				}
478 				if (t != null) {
479 					uncaughtException(t);
480 				}
481 			}
482 		}
483 	}
484 }
485