1 /* 2 * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 package org.reactivestreams; 25 26 import java.util.concurrent.Flow; 27 import static java.util.Objects.requireNonNull; 28 29 /** 30 * Bridge between Reactive Streams API and the Java 9 {@link java.util.concurrent.Flow} API. 31 */ 32 public final class FlowAdapters { 33 /** Utility class. */ FlowAdapters()34 private FlowAdapters() { 35 throw new IllegalStateException("No instances!"); 36 } 37 38 /** 39 * Converts a Flow Publisher into a Reactive Streams Publisher. 40 * @param <T> the element type 41 * @param flowPublisher the source Flow Publisher to convert 42 * @return the equivalent Reactive Streams Publisher 43 */ 44 @SuppressWarnings("unchecked") toPublisher( Flow.Publisher<? extends T> flowPublisher)45 public static <T> org.reactivestreams.Publisher<T> toPublisher( 46 Flow.Publisher<? extends T> flowPublisher) { 47 requireNonNull(flowPublisher, "flowPublisher"); 48 final org.reactivestreams.Publisher<T> publisher; 49 if (flowPublisher instanceof FlowPublisherFromReactive) { 50 publisher = (org.reactivestreams.Publisher<T>)(((FlowPublisherFromReactive<T>)flowPublisher).reactiveStreams); 51 } else if (flowPublisher instanceof org.reactivestreams.Publisher) { 52 publisher = (org.reactivestreams.Publisher<T>)flowPublisher; 53 } else { 54 publisher = new ReactivePublisherFromFlow<T>(flowPublisher); 55 } 56 return publisher; 57 } 58 59 /** 60 * Converts a Reactive Streams Publisher into a Flow Publisher. 61 * @param <T> the element type 62 * @param reactiveStreamsPublisher the source Reactive Streams Publisher to convert 63 * @return the equivalent Flow Publisher 64 */ 65 @SuppressWarnings("unchecked") toFlowPublisher( org.reactivestreams.Publisher<? extends T> reactiveStreamsPublisher )66 public static <T> Flow.Publisher<T> toFlowPublisher( 67 org.reactivestreams.Publisher<? extends T> reactiveStreamsPublisher 68 ) { 69 requireNonNull(reactiveStreamsPublisher, "reactiveStreamsPublisher"); 70 final Flow.Publisher<T> flowPublisher; 71 if (reactiveStreamsPublisher instanceof ReactivePublisherFromFlow) { 72 flowPublisher = (Flow.Publisher<T>)(((ReactivePublisherFromFlow<T>)reactiveStreamsPublisher).flow); 73 } else if (reactiveStreamsPublisher instanceof Flow.Publisher) { 74 flowPublisher = (Flow.Publisher<T>)reactiveStreamsPublisher; 75 } else { 76 flowPublisher = new FlowPublisherFromReactive<T>(reactiveStreamsPublisher); 77 } 78 return flowPublisher; 79 } 80 81 /** 82 * Converts a Flow Processor into a Reactive Streams Processor. 83 * @param <T> the input value type 84 * @param <U> the output value type 85 * @param flowProcessor the source Flow Processor to convert 86 * @return the equivalent Reactive Streams Processor 87 */ 88 @SuppressWarnings("unchecked") toProcessor( Flow.Processor<? super T, ? extends U> flowProcessor )89 public static <T, U> org.reactivestreams.Processor<T, U> toProcessor( 90 Flow.Processor<? super T, ? extends U> flowProcessor 91 ) { 92 requireNonNull(flowProcessor, "flowProcessor"); 93 final org.reactivestreams.Processor<T, U> processor; 94 if (flowProcessor instanceof FlowToReactiveProcessor) { 95 processor = (org.reactivestreams.Processor<T, U>)(((FlowToReactiveProcessor<T, U>)flowProcessor).reactiveStreams); 96 } else if (flowProcessor instanceof org.reactivestreams.Processor) { 97 processor = (org.reactivestreams.Processor<T, U>)flowProcessor; 98 } else { 99 processor = new ReactiveToFlowProcessor<T, U>(flowProcessor); 100 } 101 return processor; 102 } 103 104 /** 105 * Converts a Reactive Streams Processor into a Flow Processor. 106 * @param <T> the input value type 107 * @param <U> the output value type 108 * @param reactiveStreamsProcessor the source Reactive Streams Processor to convert 109 * @return the equivalent Flow Processor 110 */ 111 @SuppressWarnings("unchecked") toFlowProcessor( org.reactivestreams.Processor<? super T, ? extends U> reactiveStreamsProcessor )112 public static <T, U> Flow.Processor<T, U> toFlowProcessor( 113 org.reactivestreams.Processor<? super T, ? extends U> reactiveStreamsProcessor 114 ) { 115 requireNonNull(reactiveStreamsProcessor, "reactiveStreamsProcessor"); 116 final Flow.Processor<T, U> flowProcessor; 117 if (reactiveStreamsProcessor instanceof ReactiveToFlowProcessor) { 118 flowProcessor = (Flow.Processor<T, U>)(((ReactiveToFlowProcessor<T, U>)reactiveStreamsProcessor).flow); 119 } else if (reactiveStreamsProcessor instanceof Flow.Processor) { 120 flowProcessor = (Flow.Processor<T, U>)reactiveStreamsProcessor; 121 } else { 122 flowProcessor = new FlowToReactiveProcessor<T, U>(reactiveStreamsProcessor); 123 } 124 return flowProcessor; 125 } 126 127 /** 128 * Converts a Reactive Streams Subscriber into a Flow Subscriber. 129 * @param <T> the input and output value type 130 * @param reactiveStreamsSubscriber the Reactive Streams Subscriber instance to convert 131 * @return the equivalent Flow Subscriber 132 */ 133 @SuppressWarnings("unchecked") toFlowSubscriber(org.reactivestreams.Subscriber<T> reactiveStreamsSubscriber)134 public static <T> Flow.Subscriber<T> toFlowSubscriber(org.reactivestreams.Subscriber<T> reactiveStreamsSubscriber) { 135 requireNonNull(reactiveStreamsSubscriber, "reactiveStreamsSubscriber"); 136 final Flow.Subscriber<T> flowSubscriber; 137 if (reactiveStreamsSubscriber instanceof ReactiveToFlowSubscriber) { 138 flowSubscriber = (Flow.Subscriber<T>)((ReactiveToFlowSubscriber<T>)reactiveStreamsSubscriber).flow; 139 } else if (reactiveStreamsSubscriber instanceof Flow.Subscriber) { 140 flowSubscriber = (Flow.Subscriber<T>)reactiveStreamsSubscriber; 141 } else { 142 flowSubscriber = new FlowToReactiveSubscriber<T>(reactiveStreamsSubscriber); 143 } 144 return flowSubscriber; 145 } 146 147 /** 148 * Converts a Flow Subscriber into a Reactive Streams Subscriber. 149 * @param <T> the input and output value type 150 * @param flowSubscriber the Flow Subscriber instance to convert 151 * @return the equivalent Reactive Streams Subscriber 152 */ 153 @SuppressWarnings("unchecked") toSubscriber(Flow.Subscriber<T> flowSubscriber)154 public static <T> org.reactivestreams.Subscriber<T> toSubscriber(Flow.Subscriber<T> flowSubscriber) { 155 requireNonNull(flowSubscriber, "flowSubscriber"); 156 final org.reactivestreams.Subscriber<T> subscriber; 157 if (flowSubscriber instanceof FlowToReactiveSubscriber) { 158 subscriber = (org.reactivestreams.Subscriber<T>)((FlowToReactiveSubscriber<T>)flowSubscriber).reactiveStreams; 159 } else if (flowSubscriber instanceof org.reactivestreams.Subscriber) { 160 subscriber = (org.reactivestreams.Subscriber<T>)flowSubscriber; 161 } else { 162 subscriber = new ReactiveToFlowSubscriber<T>(flowSubscriber); 163 } 164 return subscriber; 165 } 166 167 /** 168 * Wraps a Reactive Streams Subscription and converts the calls to a Flow Subscription. 169 */ 170 static final class FlowToReactiveSubscription implements Flow.Subscription { 171 final org.reactivestreams.Subscription reactiveStreams; 172 FlowToReactiveSubscription(org.reactivestreams.Subscription reactive)173 public FlowToReactiveSubscription(org.reactivestreams.Subscription reactive) { 174 this.reactiveStreams = reactive; 175 } 176 177 @Override request(long n)178 public void request(long n) { 179 reactiveStreams.request(n); 180 } 181 182 @Override cancel()183 public void cancel() { 184 reactiveStreams.cancel(); 185 } 186 187 } 188 189 /** 190 * Wraps a Flow Subscription and converts the calls to a Reactive Streams Subscription. 191 */ 192 static final class ReactiveToFlowSubscription implements org.reactivestreams.Subscription { 193 final Flow.Subscription flow; 194 ReactiveToFlowSubscription(Flow.Subscription flow)195 public ReactiveToFlowSubscription(Flow.Subscription flow) { 196 this.flow = flow; 197 } 198 199 @Override request(long n)200 public void request(long n) { 201 flow.request(n); 202 } 203 204 @Override cancel()205 public void cancel() { 206 flow.cancel(); 207 } 208 209 210 } 211 212 /** 213 * Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it. 214 * @param <T> the element type 215 */ 216 static final class FlowToReactiveSubscriber<T> implements Flow.Subscriber<T> { 217 final org.reactivestreams.Subscriber<? super T> reactiveStreams; 218 FlowToReactiveSubscriber(org.reactivestreams.Subscriber<? super T> reactive)219 public FlowToReactiveSubscriber(org.reactivestreams.Subscriber<? super T> reactive) { 220 this.reactiveStreams = reactive; 221 } 222 223 @Override onSubscribe(Flow.Subscription subscription)224 public void onSubscribe(Flow.Subscription subscription) { 225 reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription)); 226 } 227 228 @Override onNext(T item)229 public void onNext(T item) { 230 reactiveStreams.onNext(item); 231 } 232 233 @Override onError(Throwable throwable)234 public void onError(Throwable throwable) { 235 reactiveStreams.onError(throwable); 236 } 237 238 @Override onComplete()239 public void onComplete() { 240 reactiveStreams.onComplete(); 241 } 242 243 } 244 245 /** 246 * Wraps a Flow Subscriber and forwards methods of the Reactive Streams Subscriber to it. 247 * @param <T> the element type 248 */ 249 static final class ReactiveToFlowSubscriber<T> implements org.reactivestreams.Subscriber<T> { 250 final Flow.Subscriber<? super T> flow; 251 ReactiveToFlowSubscriber(Flow.Subscriber<? super T> flow)252 public ReactiveToFlowSubscriber(Flow.Subscriber<? super T> flow) { 253 this.flow = flow; 254 } 255 256 @Override onSubscribe(org.reactivestreams.Subscription subscription)257 public void onSubscribe(org.reactivestreams.Subscription subscription) { 258 flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription)); 259 } 260 261 @Override onNext(T item)262 public void onNext(T item) { 263 flow.onNext(item); 264 } 265 266 @Override onError(Throwable throwable)267 public void onError(Throwable throwable) { 268 flow.onError(throwable); 269 } 270 271 @Override onComplete()272 public void onComplete() { 273 flow.onComplete(); 274 } 275 276 } 277 278 /** 279 * Wraps a Flow Processor and forwards methods of the Reactive Streams Processor to it. 280 * @param <T> the input type 281 * @param <U> the output type 282 */ 283 static final class ReactiveToFlowProcessor<T, U> implements org.reactivestreams.Processor<T, U> { 284 final Flow.Processor<? super T, ? extends U> flow; 285 ReactiveToFlowProcessor(Flow.Processor<? super T, ? extends U> flow)286 public ReactiveToFlowProcessor(Flow.Processor<? super T, ? extends U> flow) { 287 this.flow = flow; 288 } 289 290 @Override onSubscribe(org.reactivestreams.Subscription subscription)291 public void onSubscribe(org.reactivestreams.Subscription subscription) { 292 flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription)); 293 } 294 295 @Override onNext(T t)296 public void onNext(T t) { 297 flow.onNext(t); 298 } 299 300 @Override onError(Throwable t)301 public void onError(Throwable t) { 302 flow.onError(t); 303 } 304 305 @Override onComplete()306 public void onComplete() { 307 flow.onComplete(); 308 } 309 310 @Override subscribe(org.reactivestreams.Subscriber<? super U> s)311 public void subscribe(org.reactivestreams.Subscriber<? super U> s) { 312 flow.subscribe((s == null) ? null : new FlowToReactiveSubscriber<U>(s)); 313 } 314 } 315 316 /** 317 * Wraps a Reactive Streams Processor and forwards methods of the Flow Processor to it. 318 * @param <T> the input type 319 * @param <U> the output type 320 */ 321 static final class FlowToReactiveProcessor<T, U> implements Flow.Processor<T, U> { 322 final org.reactivestreams.Processor<? super T, ? extends U> reactiveStreams; 323 FlowToReactiveProcessor(org.reactivestreams.Processor<? super T, ? extends U> reactive)324 public FlowToReactiveProcessor(org.reactivestreams.Processor<? super T, ? extends U> reactive) { 325 this.reactiveStreams = reactive; 326 } 327 328 @Override onSubscribe(Flow.Subscription subscription)329 public void onSubscribe(Flow.Subscription subscription) { 330 reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription)); 331 } 332 333 @Override onNext(T t)334 public void onNext(T t) { 335 reactiveStreams.onNext(t); 336 } 337 338 @Override onError(Throwable t)339 public void onError(Throwable t) { 340 reactiveStreams.onError(t); 341 } 342 343 @Override onComplete()344 public void onComplete() { 345 reactiveStreams.onComplete(); 346 } 347 348 @Override subscribe(Flow.Subscriber<? super U> s)349 public void subscribe(Flow.Subscriber<? super U> s) { 350 reactiveStreams.subscribe((s == null) ? null : new ReactiveToFlowSubscriber<U>(s)); 351 } 352 } 353 354 /** 355 * Reactive Streams Publisher that wraps a Flow Publisher. 356 * @param <T> the element type 357 */ 358 static final class ReactivePublisherFromFlow<T> implements org.reactivestreams.Publisher<T> { 359 final Flow.Publisher<? extends T> flow; 360 ReactivePublisherFromFlow(Flow.Publisher<? extends T> flowPublisher)361 public ReactivePublisherFromFlow(Flow.Publisher<? extends T> flowPublisher) { 362 this.flow = flowPublisher; 363 } 364 365 @Override subscribe(org.reactivestreams.Subscriber<? super T> reactive)366 public void subscribe(org.reactivestreams.Subscriber<? super T> reactive) { 367 flow.subscribe((reactive == null) ? null : new FlowToReactiveSubscriber<T>(reactive)); 368 } 369 } 370 371 /** 372 * Flow Publisher that wraps a Reactive Streams Publisher. 373 * @param <T> the element type 374 */ 375 static final class FlowPublisherFromReactive<T> implements Flow.Publisher<T> { 376 377 final org.reactivestreams.Publisher<? extends T> reactiveStreams; 378 FlowPublisherFromReactive(org.reactivestreams.Publisher<? extends T> reactivePublisher)379 public FlowPublisherFromReactive(org.reactivestreams.Publisher<? extends T> reactivePublisher) { 380 this.reactiveStreams = reactivePublisher; 381 } 382 383 @Override subscribe(Flow.Subscriber<? super T> flow)384 public void subscribe(Flow.Subscriber<? super T> flow) { 385 reactiveStreams.subscribe((flow == null) ? null : new ReactiveToFlowSubscriber<T>(flow)); 386 } 387 } 388 389 } 390