1 /*
2  * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.
8  *
9  * This code is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * version 2 for more details (a copy is included in the LICENSE file that
13  * accompanied this code).
14  *
15  * You should have received a copy of the GNU General Public License version
16  * 2 along with this work; if not, write to the Free Software Foundation,
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18  *
19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20  * or visit www.oracle.com if you need additional information or have any
21  * questions.
22  */
23 
24 package org.reactivestreams;
25 
26 import java.util.concurrent.Flow;
27 import static java.util.Objects.requireNonNull;
28 
29 /**
30  * Bridge between Reactive Streams API and the Java 9 {@link java.util.concurrent.Flow} API.
31  */
32 public final class FlowAdapters {
33     /** Utility class. */
FlowAdapters()34     private FlowAdapters() {
35         throw new IllegalStateException("No instances!");
36     }
37 
38     /**
39      * Converts a Flow Publisher into a Reactive Streams Publisher.
40      * @param <T> the element type
41      * @param flowPublisher the source Flow Publisher to convert
42      * @return the equivalent Reactive Streams Publisher
43      */
44     @SuppressWarnings("unchecked")
toPublisher( Flow.Publisher<? extends T> flowPublisher)45     public static <T> org.reactivestreams.Publisher<T> toPublisher(
46             Flow.Publisher<? extends T> flowPublisher) {
47         requireNonNull(flowPublisher, "flowPublisher");
48         final org.reactivestreams.Publisher<T> publisher;
49         if (flowPublisher instanceof FlowPublisherFromReactive) {
50             publisher = (org.reactivestreams.Publisher<T>)(((FlowPublisherFromReactive<T>)flowPublisher).reactiveStreams);
51         } else if (flowPublisher instanceof org.reactivestreams.Publisher) {
52             publisher = (org.reactivestreams.Publisher<T>)flowPublisher;
53         } else {
54             publisher = new ReactivePublisherFromFlow<T>(flowPublisher);
55         }
56         return publisher;
57     }
58 
59     /**
60      * Converts a Reactive Streams Publisher into a Flow Publisher.
61      * @param <T> the element type
62      * @param reactiveStreamsPublisher the source Reactive Streams Publisher to convert
63      * @return the equivalent Flow Publisher
64      */
65     @SuppressWarnings("unchecked")
toFlowPublisher( org.reactivestreams.Publisher<? extends T> reactiveStreamsPublisher )66     public static <T> Flow.Publisher<T> toFlowPublisher(
67             org.reactivestreams.Publisher<? extends T> reactiveStreamsPublisher
68     ) {
69         requireNonNull(reactiveStreamsPublisher, "reactiveStreamsPublisher");
70         final Flow.Publisher<T> flowPublisher;
71         if (reactiveStreamsPublisher instanceof ReactivePublisherFromFlow) {
72             flowPublisher = (Flow.Publisher<T>)(((ReactivePublisherFromFlow<T>)reactiveStreamsPublisher).flow);
73         } else if (reactiveStreamsPublisher instanceof Flow.Publisher) {
74             flowPublisher = (Flow.Publisher<T>)reactiveStreamsPublisher;
75         } else {
76             flowPublisher = new FlowPublisherFromReactive<T>(reactiveStreamsPublisher);
77         }
78         return flowPublisher;
79     }
80 
81     /**
82      * Converts a Flow Processor into a Reactive Streams Processor.
83      * @param <T> the input value type
84      * @param <U> the output value type
85      * @param flowProcessor the source Flow Processor to convert
86      * @return the equivalent Reactive Streams Processor
87      */
88     @SuppressWarnings("unchecked")
toProcessor( Flow.Processor<? super T, ? extends U> flowProcessor )89     public static <T, U> org.reactivestreams.Processor<T, U> toProcessor(
90             Flow.Processor<? super T, ? extends U> flowProcessor
91     ) {
92         requireNonNull(flowProcessor, "flowProcessor");
93         final org.reactivestreams.Processor<T, U> processor;
94         if (flowProcessor instanceof FlowToReactiveProcessor) {
95             processor = (org.reactivestreams.Processor<T, U>)(((FlowToReactiveProcessor<T, U>)flowProcessor).reactiveStreams);
96         } else if (flowProcessor instanceof org.reactivestreams.Processor) {
97             processor = (org.reactivestreams.Processor<T, U>)flowProcessor;
98         } else {
99             processor = new ReactiveToFlowProcessor<T, U>(flowProcessor);
100         }
101         return processor;
102     }
103 
104     /**
105      * Converts a Reactive Streams Processor into a Flow Processor.
106      * @param <T> the input value type
107      * @param <U> the output value type
108      * @param reactiveStreamsProcessor the source Reactive Streams Processor to convert
109      * @return the equivalent Flow Processor
110      */
111     @SuppressWarnings("unchecked")
toFlowProcessor( org.reactivestreams.Processor<? super T, ? extends U> reactiveStreamsProcessor )112     public static <T, U> Flow.Processor<T, U> toFlowProcessor(
113             org.reactivestreams.Processor<? super T, ? extends U> reactiveStreamsProcessor
114         ) {
115         requireNonNull(reactiveStreamsProcessor, "reactiveStreamsProcessor");
116         final Flow.Processor<T, U> flowProcessor;
117         if (reactiveStreamsProcessor instanceof ReactiveToFlowProcessor) {
118             flowProcessor = (Flow.Processor<T, U>)(((ReactiveToFlowProcessor<T, U>)reactiveStreamsProcessor).flow);
119         } else if (reactiveStreamsProcessor instanceof Flow.Processor) {
120             flowProcessor = (Flow.Processor<T, U>)reactiveStreamsProcessor;
121         } else {
122             flowProcessor = new FlowToReactiveProcessor<T, U>(reactiveStreamsProcessor);
123         }
124         return flowProcessor;
125     }
126 
127     /**
128      * Converts a Reactive Streams Subscriber into a Flow Subscriber.
129      * @param <T> the input and output value type
130      * @param reactiveStreamsSubscriber the Reactive Streams Subscriber instance to convert
131      * @return the equivalent Flow Subscriber
132      */
133     @SuppressWarnings("unchecked")
toFlowSubscriber(org.reactivestreams.Subscriber<T> reactiveStreamsSubscriber)134     public static <T> Flow.Subscriber<T> toFlowSubscriber(org.reactivestreams.Subscriber<T> reactiveStreamsSubscriber) {
135         requireNonNull(reactiveStreamsSubscriber, "reactiveStreamsSubscriber");
136         final Flow.Subscriber<T> flowSubscriber;
137         if (reactiveStreamsSubscriber instanceof ReactiveToFlowSubscriber) {
138             flowSubscriber = (Flow.Subscriber<T>)((ReactiveToFlowSubscriber<T>)reactiveStreamsSubscriber).flow;
139         } else if (reactiveStreamsSubscriber instanceof Flow.Subscriber) {
140             flowSubscriber = (Flow.Subscriber<T>)reactiveStreamsSubscriber;
141         } else {
142             flowSubscriber = new FlowToReactiveSubscriber<T>(reactiveStreamsSubscriber);
143         }
144         return flowSubscriber;
145     }
146 
147     /**
148      * Converts a Flow Subscriber into a Reactive Streams Subscriber.
149      * @param <T> the input and output value type
150      * @param flowSubscriber the Flow Subscriber instance to convert
151      * @return the equivalent Reactive Streams Subscriber
152      */
153     @SuppressWarnings("unchecked")
toSubscriber(Flow.Subscriber<T> flowSubscriber)154     public static <T> org.reactivestreams.Subscriber<T> toSubscriber(Flow.Subscriber<T> flowSubscriber) {
155         requireNonNull(flowSubscriber, "flowSubscriber");
156         final org.reactivestreams.Subscriber<T> subscriber;
157         if (flowSubscriber instanceof FlowToReactiveSubscriber) {
158             subscriber = (org.reactivestreams.Subscriber<T>)((FlowToReactiveSubscriber<T>)flowSubscriber).reactiveStreams;
159         } else if (flowSubscriber instanceof org.reactivestreams.Subscriber) {
160             subscriber = (org.reactivestreams.Subscriber<T>)flowSubscriber;
161         } else {
162             subscriber = new ReactiveToFlowSubscriber<T>(flowSubscriber);
163         }
164         return subscriber;
165     }
166 
167     /**
168      * Wraps a Reactive Streams Subscription and converts the calls to a Flow Subscription.
169      */
170     static final class FlowToReactiveSubscription implements Flow.Subscription {
171         final org.reactivestreams.Subscription reactiveStreams;
172 
FlowToReactiveSubscription(org.reactivestreams.Subscription reactive)173         public FlowToReactiveSubscription(org.reactivestreams.Subscription reactive) {
174             this.reactiveStreams = reactive;
175         }
176 
177         @Override
request(long n)178         public void request(long n) {
179             reactiveStreams.request(n);
180         }
181 
182         @Override
cancel()183         public void cancel() {
184             reactiveStreams.cancel();
185         }
186 
187     }
188 
189     /**
190      * Wraps a Flow Subscription and converts the calls to a Reactive Streams Subscription.
191      */
192     static final class ReactiveToFlowSubscription implements org.reactivestreams.Subscription {
193         final Flow.Subscription flow;
194 
ReactiveToFlowSubscription(Flow.Subscription flow)195         public ReactiveToFlowSubscription(Flow.Subscription flow) {
196             this.flow = flow;
197         }
198 
199         @Override
request(long n)200         public void request(long n) {
201             flow.request(n);
202         }
203 
204         @Override
cancel()205         public void cancel() {
206             flow.cancel();
207         }
208 
209 
210     }
211 
212     /**
213      * Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it.
214      * @param <T> the element type
215      */
216     static final class FlowToReactiveSubscriber<T> implements Flow.Subscriber<T> {
217         final org.reactivestreams.Subscriber<? super T> reactiveStreams;
218 
FlowToReactiveSubscriber(org.reactivestreams.Subscriber<? super T> reactive)219         public FlowToReactiveSubscriber(org.reactivestreams.Subscriber<? super T> reactive) {
220             this.reactiveStreams = reactive;
221         }
222 
223         @Override
onSubscribe(Flow.Subscription subscription)224         public void onSubscribe(Flow.Subscription subscription) {
225             reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription));
226         }
227 
228         @Override
onNext(T item)229         public void onNext(T item) {
230             reactiveStreams.onNext(item);
231         }
232 
233         @Override
onError(Throwable throwable)234         public void onError(Throwable throwable) {
235             reactiveStreams.onError(throwable);
236         }
237 
238         @Override
onComplete()239         public void onComplete() {
240             reactiveStreams.onComplete();
241         }
242 
243     }
244 
245     /**
246      * Wraps a Flow Subscriber and forwards methods of the Reactive Streams Subscriber to it.
247      * @param <T> the element type
248      */
249     static final class ReactiveToFlowSubscriber<T> implements org.reactivestreams.Subscriber<T> {
250         final Flow.Subscriber<? super T> flow;
251 
ReactiveToFlowSubscriber(Flow.Subscriber<? super T> flow)252         public ReactiveToFlowSubscriber(Flow.Subscriber<? super T> flow) {
253             this.flow = flow;
254         }
255 
256         @Override
onSubscribe(org.reactivestreams.Subscription subscription)257         public void onSubscribe(org.reactivestreams.Subscription subscription) {
258             flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription));
259         }
260 
261         @Override
onNext(T item)262         public void onNext(T item) {
263             flow.onNext(item);
264         }
265 
266         @Override
onError(Throwable throwable)267         public void onError(Throwable throwable) {
268             flow.onError(throwable);
269         }
270 
271         @Override
onComplete()272         public void onComplete() {
273             flow.onComplete();
274         }
275 
276     }
277 
278     /**
279      * Wraps a Flow Processor and forwards methods of the Reactive Streams Processor to it.
280      * @param <T> the input type
281      * @param <U> the output type
282      */
283     static final class ReactiveToFlowProcessor<T, U> implements org.reactivestreams.Processor<T, U> {
284         final Flow.Processor<? super T, ? extends U> flow;
285 
ReactiveToFlowProcessor(Flow.Processor<? super T, ? extends U> flow)286         public ReactiveToFlowProcessor(Flow.Processor<? super T, ? extends U> flow) {
287             this.flow = flow;
288         }
289 
290         @Override
onSubscribe(org.reactivestreams.Subscription subscription)291         public void onSubscribe(org.reactivestreams.Subscription subscription) {
292             flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription));
293         }
294 
295         @Override
onNext(T t)296         public void onNext(T t) {
297             flow.onNext(t);
298         }
299 
300         @Override
onError(Throwable t)301         public void onError(Throwable t) {
302             flow.onError(t);
303         }
304 
305         @Override
onComplete()306         public void onComplete() {
307             flow.onComplete();
308         }
309 
310         @Override
subscribe(org.reactivestreams.Subscriber<? super U> s)311         public void subscribe(org.reactivestreams.Subscriber<? super U> s) {
312             flow.subscribe((s == null) ? null : new FlowToReactiveSubscriber<U>(s));
313         }
314     }
315 
316     /**
317      * Wraps a Reactive Streams Processor and forwards methods of the Flow Processor to it.
318      * @param <T> the input type
319      * @param <U> the output type
320      */
321     static final class FlowToReactiveProcessor<T, U> implements Flow.Processor<T, U> {
322         final org.reactivestreams.Processor<? super T, ? extends U> reactiveStreams;
323 
FlowToReactiveProcessor(org.reactivestreams.Processor<? super T, ? extends U> reactive)324         public FlowToReactiveProcessor(org.reactivestreams.Processor<? super T, ? extends U> reactive) {
325             this.reactiveStreams = reactive;
326         }
327 
328         @Override
onSubscribe(Flow.Subscription subscription)329         public void onSubscribe(Flow.Subscription subscription) {
330             reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription));
331         }
332 
333         @Override
onNext(T t)334         public void onNext(T t) {
335             reactiveStreams.onNext(t);
336         }
337 
338         @Override
onError(Throwable t)339         public void onError(Throwable t) {
340             reactiveStreams.onError(t);
341         }
342 
343         @Override
onComplete()344         public void onComplete() {
345             reactiveStreams.onComplete();
346         }
347 
348         @Override
subscribe(Flow.Subscriber<? super U> s)349         public void subscribe(Flow.Subscriber<? super U> s) {
350             reactiveStreams.subscribe((s == null) ? null : new ReactiveToFlowSubscriber<U>(s));
351         }
352     }
353 
354     /**
355      * Reactive Streams Publisher that wraps a Flow Publisher.
356      * @param <T> the element type
357      */
358     static final class ReactivePublisherFromFlow<T> implements org.reactivestreams.Publisher<T> {
359         final Flow.Publisher<? extends T> flow;
360 
ReactivePublisherFromFlow(Flow.Publisher<? extends T> flowPublisher)361         public ReactivePublisherFromFlow(Flow.Publisher<? extends T> flowPublisher) {
362             this.flow = flowPublisher;
363         }
364 
365         @Override
subscribe(org.reactivestreams.Subscriber<? super T> reactive)366         public void subscribe(org.reactivestreams.Subscriber<? super T> reactive) {
367             flow.subscribe((reactive == null) ? null : new FlowToReactiveSubscriber<T>(reactive));
368         }
369     }
370 
371     /**
372      * Flow Publisher that wraps a Reactive Streams Publisher.
373      * @param <T> the element type
374      */
375     static final class FlowPublisherFromReactive<T> implements Flow.Publisher<T> {
376 
377         final org.reactivestreams.Publisher<? extends T> reactiveStreams;
378 
FlowPublisherFromReactive(org.reactivestreams.Publisher<? extends T> reactivePublisher)379         public FlowPublisherFromReactive(org.reactivestreams.Publisher<? extends T> reactivePublisher) {
380             this.reactiveStreams = reactivePublisher;
381         }
382 
383         @Override
subscribe(Flow.Subscriber<? super T> flow)384         public void subscribe(Flow.Subscriber<? super T> flow) {
385             reactiveStreams.subscribe((flow == null) ? null : new ReactiveToFlowSubscriber<T>(flow));
386         }
387     }
388 
389 }
390