1 /* 2 * Copyright (c) OSGi Alliance (2017, 2018). All Rights Reserved. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package org.osgi.util.promise; 18 19 import static java.util.Objects.requireNonNull; 20 import static org.osgi.util.promise.PromiseImpl.uncaughtException; 21 22 import java.util.ArrayList; 23 import java.util.Collection; 24 import java.util.List; 25 import java.util.concurrent.BlockingQueue; 26 import java.util.concurrent.Callable; 27 import java.util.concurrent.CancellationException; 28 import java.util.concurrent.ExecutionException; 29 import java.util.concurrent.Executor; 30 import java.util.concurrent.Executors; 31 import java.util.concurrent.Future; 32 import java.util.concurrent.RejectedExecutionHandler; 33 import java.util.concurrent.RunnableScheduledFuture; 34 import java.util.concurrent.ScheduledExecutorService; 35 import java.util.concurrent.ScheduledThreadPoolExecutor; 36 import java.util.concurrent.SynchronousQueue; 37 import java.util.concurrent.ThreadFactory; 38 import java.util.concurrent.ThreadPoolExecutor; 39 import java.util.concurrent.TimeUnit; 40 import java.util.concurrent.atomic.AtomicBoolean; 41 import java.util.concurrent.atomic.AtomicInteger; 42 43 import org.osgi.annotation.versioning.ConsumerType; 44 import org.osgi.util.promise.PromiseImpl.Result; 45 46 /** 47 * Promise factory to create Deferred and Promise objects. 48 * <p> 49 * Instances of this class can be used to create Deferred and Promise objects 50 * which use the executors used to construct this object for any callback or 51 * scheduled operation execution. 52 * 53 * @Immutable 54 * @author $Id: 23eb0cc4714c89abe53904985ec6a6d90d898c8a $ 55 * @since 1.1 56 */ 57 @ConsumerType 58 public class PromiseFactory { 59 /** 60 * The default factory which uses the default callback executor and default 61 * scheduled executor. 62 */ 63 final static PromiseFactory defaultFactory = new PromiseFactory( 64 null, null); 65 66 /** 67 * The executor to use for callbacks. If {@code null}, the default callback 68 * executor is used. 69 */ 70 private final Executor callbackExecutor; 71 /** 72 * The executor to use for scheduled operations. If {@code null}, the 73 * default scheduled executor is used. 74 */ 75 private final ScheduledExecutorService scheduledExecutor; 76 77 private final boolean allowCurrentThread; 78 79 /** 80 * Create a new PromiseFactory with the specified callback executor. 81 * <p> 82 * The default scheduled executor will be used. 83 * 84 * @param callbackExecutor The executor to use for callbacks. {@code null} 85 * can be specified for the default callback executor. 86 */ PromiseFactory(Executor callbackExecutor)87 public PromiseFactory(Executor callbackExecutor) { 88 this(callbackExecutor, null); 89 } 90 91 /** 92 * Create a new PromiseFactory with the specified callback executor and 93 * specified scheduled executor. 94 * 95 * @param callbackExecutor The executor to use for callbacks. {@code null} 96 * can be specified for the default callback executor. 97 * @param scheduledExecutor The scheduled executor for use for scheduled 98 * operations. {@code null} can be specified for the default 99 * scheduled executor. 100 */ PromiseFactory(Executor callbackExecutor, ScheduledExecutorService scheduledExecutor)101 public PromiseFactory(Executor callbackExecutor, 102 ScheduledExecutorService scheduledExecutor) { 103 this.callbackExecutor = callbackExecutor; 104 this.scheduledExecutor = scheduledExecutor; 105 allowCurrentThread = Boolean.parseBoolean(System.getProperty( 106 "org.osgi.util.promise.allowCurrentThread", 107 Boolean.TRUE.toString())); 108 109 } 110 111 /** 112 * Returns the executor to use for callbacks. 113 * 114 * @return The executor to use for callbacks. This will be the default 115 * callback executor if {@code null} was specified for the callback 116 * executor when this PromiseFactory was created. 117 */ executor()118 public Executor executor() { 119 if (callbackExecutor == null) { 120 return DefaultExecutors.callbackExecutor(); 121 } 122 return callbackExecutor; 123 } 124 125 /** 126 * Returns the scheduled executor to use for scheduled operations. 127 * 128 * @return The scheduled executor to use for scheduled operations. This will 129 * be the default scheduled executor if {@code null} was specified 130 * for the scheduled executor when this PromiseFactory was created. 131 */ scheduledExecutor()132 public ScheduledExecutorService scheduledExecutor() { 133 if (scheduledExecutor == null) { 134 return DefaultExecutors.scheduledExecutor(); 135 } 136 return scheduledExecutor; 137 } 138 139 /** 140 * Create a new Deferred with the callback executor and scheduled executor 141 * of this PromiseFactory object. 142 * <p> 143 * Use this method instead of {@link Deferred#Deferred()} to create a new 144 * {@link Deferred} whose associated Promise uses executors other than the 145 * default executors. 146 * 147 * @param <T> The value type associated with the returned Deferred. 148 * @return A new {@link Deferred} with the callback and scheduled executors 149 * of this PromiseFactory object 150 */ deferred()151 public <T> Deferred<T> deferred() { 152 return new Deferred<>(this); 153 } 154 155 /** 156 * Returns a new Promise that has been resolved with the specified value. 157 * <p> 158 * The returned Promise uses the callback executor and scheduled executor of 159 * this PromiseFactory object. 160 * <p> 161 * Use this method instead of {@link Promises#resolved(Object)} to create a 162 * Promise which uses executors other than the default executors. 163 * 164 * @param <T> The value type associated with the returned Promise. 165 * @param value The value of the resolved Promise. 166 * @return A new Promise that has been resolved with the specified value. 167 */ resolved(T value)168 public <T> Promise<T> resolved(T value) { 169 return new ResolvedPromiseImpl<>(value, this); 170 } 171 172 /** 173 * Returns a new Promise that has been resolved with the specified failure. 174 * <p> 175 * The returned Promise uses the callback executor and scheduled executor of 176 * this PromiseFactory object. 177 * <p> 178 * Use this method instead of {@link Promises#failed(Throwable)} to create a 179 * Promise which uses executors other than the default executors. 180 * 181 * @param <T> The value type associated with the returned Promise. 182 * @param failure The failure of the resolved Promise. Must not be 183 * {@code null}. 184 * @return A new Promise that has been resolved with the specified failure. 185 */ failed(Throwable failure)186 public <T> Promise<T> failed(Throwable failure) { 187 return new FailedPromiseImpl<>(failure, this); 188 } 189 190 /** 191 * Returns a new Promise that will hold the result of the specified task. 192 * <p> 193 * The returned Promise uses the callback executor and scheduled executor of 194 * this PromiseFactory object. 195 * <p> 196 * The specified task will be executed on the {@link #executor() callback 197 * executor}. 198 * 199 * @param <T> The value type associated with the returned Promise. 200 * @param task The task whose result will be available from the returned 201 * Promise. 202 * @return A new Promise that will hold the result of the specified task. 203 */ submit(Callable< ? extends T> task)204 public <T> Promise<T> submit(Callable< ? extends T> task) { 205 DeferredPromiseImpl<T> promise = new DeferredPromiseImpl<>(this); 206 Runnable submit = promise.new Submit(task); 207 try { 208 executor().execute(submit); 209 } catch (Exception t) { 210 promise.tryResolve(null, t); 211 } 212 return promise.orDone(); 213 } 214 215 /** 216 * Returns a new Promise that is a latch on the resolution of the specified 217 * Promises. 218 * <p> 219 * The returned Promise uses the callback executor and scheduled executor of 220 * this PromiseFactory object. 221 * <p> 222 * The returned Promise acts as a gate and must be resolved after all of the 223 * specified Promises are resolved. 224 * 225 * @param <T> The value type of the List value associated with the returned 226 * Promise. 227 * @param <S> A subtype of the value type of the List value associated with 228 * the returned Promise. 229 * @param promises The Promises which must be resolved before the returned 230 * Promise must be resolved. Must not be {@code null} and all of 231 * the elements in the collection must not be {@code null}. 232 * @return A Promise that must be successfully resolved with a List of the 233 * values in the order of the specified Promises if all the 234 * specified Promises are successfully resolved. The List in the 235 * returned Promise is the property of the caller and is modifiable. 236 * The returned Promise must be resolved with a failure of 237 * {@link FailedPromisesException} if any of the specified Promises 238 * are resolved with a failure. The failure 239 * {@link FailedPromisesException} must contain all of the specified 240 * Promises which resolved with a failure. 241 */ all( Collection<Promise<S>> promises)242 public <T, S extends T> Promise<List<T>> all( 243 Collection<Promise<S>> promises) { 244 if (promises.isEmpty()) { 245 List<T> value = new ArrayList<>(); 246 return resolved(value); 247 } 248 249 /* make a copy and capture the ordering */ 250 List<Promise<S>> list = new ArrayList<>(promises); 251 252 DeferredPromiseImpl<List<T>> chained = new DeferredPromiseImpl<>(this); 253 All<T,S> all = new All<>(chained, list); 254 for (Promise<S> p : list) { 255 p.onResolve(all); 256 } 257 return chained.orDone(); 258 } 259 260 /** 261 * A callback used to resolve the specified Promise when the specified list 262 * of Promises are resolved for the {@link PromiseFactory#all(Collection)} 263 * method. 264 * 265 * @ThreadSafe 266 */ 267 private static final class All<T, S extends T> implements Runnable { 268 private final DeferredPromiseImpl<List<T>> chained; 269 private final List<Promise<S>> promises; 270 private final AtomicInteger promiseCount; 271 All(DeferredPromiseImpl<List<T>> chained, List<Promise<S>> promises)272 All(DeferredPromiseImpl<List<T>> chained, List<Promise<S>> promises) { 273 this.chained = requireNonNull(chained); 274 this.promises = requireNonNull(promises); 275 this.promiseCount = new AtomicInteger(promises.size()); 276 } 277 278 @Override run()279 public void run() { 280 if (promiseCount.decrementAndGet() != 0) { 281 return; 282 } 283 List<T> value = new ArrayList<>(promises.size()); 284 List<Promise< ? >> failed = new ArrayList<>(promises.size()); 285 Throwable cause = null; 286 for (Promise<S> p : promises) { 287 Result<S> result = PromiseImpl.collect(p); 288 if (result.fail != null) { 289 failed.add(p); 290 if (cause == null) { 291 cause = result.fail; 292 } 293 } else { 294 value.add(result.value); 295 } 296 } 297 if (failed.isEmpty()) { 298 chained.tryResolve(value, null); 299 } else { 300 chained.tryResolve(null, 301 new FailedPromisesException(failed, cause)); 302 } 303 } 304 } 305 306 /** 307 * Returns an Executor implementation that executes tasks immediately on the 308 * thread calling the {@code Executor.execute} method. 309 * 310 * @return An Executor implementation that executes tasks immediately on the 311 * thread calling the {@code Executor.execute} method. 312 */ inlineExecutor()313 public static Executor inlineExecutor() { 314 return new InlineExecutor(); 315 } 316 allowCurrentThread()317 boolean allowCurrentThread() { 318 return allowCurrentThread; 319 } 320 321 /** 322 * An Executor implementation which executes the task immediately on the 323 * thread calling the {@code Executor.execute} method. 324 * 325 * @Immutable 326 */ 327 private static final class InlineExecutor implements Executor { InlineExecutor()328 InlineExecutor() {} 329 330 @Override execute(Runnable callback)331 public void execute(Runnable callback) { 332 callback.run(); 333 } 334 } 335 336 /** 337 * Default executors for Promises. 338 * 339 * @Immutable 340 */ 341 private static final class DefaultExecutors 342 implements ThreadFactory, RejectedExecutionHandler, Runnable { 343 private static final DefaultExecutors callbacks; 344 private static final ScheduledExecutor scheduledExecutor; 345 private static final ThreadPoolExecutor callbackExecutor; 346 static { 347 callbacks = new DefaultExecutors(); 348 scheduledExecutor = new ScheduledExecutor(2, callbacks); 349 callbackExecutor = new ThreadPoolExecutor(0, 64, 60L, 350 TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), 351 callbacks, callbacks); 352 } 353 callbackExecutor()354 static Executor callbackExecutor() { 355 return callbackExecutor; 356 } 357 scheduledExecutor()358 static ScheduledExecutorService scheduledExecutor() { 359 return scheduledExecutor; 360 } 361 362 private final AtomicBoolean shutdownHookInstalled; 363 private final ThreadFactory delegateThreadFactory; 364 DefaultExecutors()365 private DefaultExecutors() { 366 shutdownHookInstalled = new AtomicBoolean(); 367 delegateThreadFactory = Executors.defaultThreadFactory(); 368 } 369 370 /** 371 * Executor threads should not prevent VM from exiting. 372 */ 373 @Override newThread(Runnable r)374 public Thread newThread(Runnable r) { 375 if (shutdownHookInstalled.compareAndSet(false, true)) { 376 Thread shutdownThread = delegateThreadFactory.newThread(this); 377 shutdownThread.setName( 378 "ExecutorShutdownHook," + shutdownThread.getName()); 379 try { 380 Runtime.getRuntime().addShutdownHook(shutdownThread); 381 } catch (IllegalStateException e) { 382 // VM is already shutting down... 383 callbackExecutor.shutdown(); 384 scheduledExecutor.shutdown(); 385 } 386 } 387 Thread t = delegateThreadFactory.newThread(r); 388 t.setName("PromiseFactory," + t.getName()); 389 t.setDaemon(true); 390 return t; 391 } 392 393 /** 394 * Call the callback using the caller's thread because the thread pool 395 * rejected the execution. 396 */ 397 @Override rejectedExecution(Runnable callback, ThreadPoolExecutor executor)398 public void rejectedExecution(Runnable callback, 399 ThreadPoolExecutor executor) { 400 try { 401 callback.run(); 402 } catch (Throwable t) { 403 uncaughtException(t); 404 } 405 } 406 407 /** 408 * Shutdown hook 409 */ 410 @Override run()411 public void run() { 412 // limit new thread creation 413 callbackExecutor.setMaximumPoolSize( 414 Math.max(1, callbackExecutor.getPoolSize())); 415 // Run all delayed callbacks now 416 scheduledExecutor.shutdown(); 417 BlockingQueue<Runnable> queue = scheduledExecutor.getQueue(); 418 if (!queue.isEmpty()) { 419 for (Object r : queue.toArray()) { 420 if (r instanceof RunnableScheduledFuture< ? >) { 421 RunnableScheduledFuture< ? > future = (RunnableScheduledFuture< ? >) r; 422 if ((future.getDelay(TimeUnit.NANOSECONDS) > 0L) 423 && queue.remove(future)) { 424 future.run(); 425 scheduledExecutor.afterExecute(future, null); 426 } 427 } 428 } 429 scheduledExecutor.shutdown(); 430 } 431 try { 432 scheduledExecutor.awaitTermination(20, TimeUnit.SECONDS); 433 } catch (InterruptedException e) { 434 Thread.currentThread().interrupt(); 435 } 436 // Shutdown callback executor 437 callbackExecutor.shutdown(); 438 try { 439 callbackExecutor.awaitTermination(20, TimeUnit.SECONDS); 440 } catch (InterruptedException e) { 441 Thread.currentThread().interrupt(); 442 } 443 } 444 445 /** 446 * ScheduledThreadPoolExecutor for scheduled execution. 447 * 448 * @ThreadSafe 449 */ 450 private static final class ScheduledExecutor 451 extends ScheduledThreadPoolExecutor { ScheduledExecutor(int corePoolSize, ThreadFactory threadFactory)452 ScheduledExecutor(int corePoolSize, ThreadFactory threadFactory) { 453 super(corePoolSize, threadFactory); 454 } 455 456 /** 457 * Handle uncaught exceptions 458 */ 459 @Override afterExecute(Runnable r, Throwable t)460 protected void afterExecute(Runnable r, Throwable t) { 461 super.afterExecute(r, t); 462 if ((t == null) && (r instanceof Future< ? >)) { 463 boolean interrupted = Thread.interrupted(); 464 try { 465 ((Future< ? >) r).get(); 466 } catch (CancellationException e) { 467 // ignore 468 } catch (InterruptedException e) { 469 interrupted = true; 470 } catch (ExecutionException e) { 471 t = e.getCause(); 472 } finally { 473 if (interrupted) { // restore interrupt status 474 Thread.currentThread().interrupt(); 475 } 476 } 477 } 478 if (t != null) { 479 uncaughtException(t); 480 } 481 } 482 } 483 } 484 } 485