1 /*
2  * Copyright (c) 2018, 2021, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package jdk.internal.net.http;
27 
28 import java.nio.ByteBuffer;
29 import java.nio.CharBuffer;
30 import java.nio.charset.CharacterCodingException;
31 import java.nio.charset.Charset;
32 import java.nio.charset.CharsetDecoder;
33 import java.nio.charset.CoderResult;
34 import java.nio.charset.CodingErrorAction;
35 import java.util.List;
36 import java.util.Objects;
37 import java.util.concurrent.CompletableFuture;
38 import java.util.concurrent.CompletionStage;
39 import java.util.concurrent.ConcurrentLinkedDeque;
40 import java.util.concurrent.Flow;
41 import java.util.concurrent.Flow.Subscriber;
42 import java.util.concurrent.Flow.Subscription;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 import java.util.concurrent.atomic.AtomicLong;
45 import java.util.concurrent.atomic.AtomicReference;
46 import java.util.function.Function;
47 import jdk.internal.net.http.common.Demand;
48 import java.net.http.HttpResponse.BodySubscriber;
49 import jdk.internal.net.http.common.MinimalFuture;
50 import jdk.internal.net.http.common.SequentialScheduler;
51 
52 /** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber<String>}. */
53 public final class LineSubscriberAdapter<S extends Subscriber<? super String>,R>
54         implements BodySubscriber<R> {
55     private final CompletableFuture<R> cf = new MinimalFuture<>();
56     private final S subscriber;
57     private final Function<? super S, ? extends R> finisher;
58     private final Charset charset;
59     private final String eol;
60     private final AtomicBoolean subscribed = new AtomicBoolean();
61     private volatile LineSubscription downstream;
62 
LineSubscriberAdapter(S subscriber, Function<? super S, ? extends R> finisher, Charset charset, String eol)63     private LineSubscriberAdapter(S subscriber,
64                                   Function<? super S, ? extends R> finisher,
65                                   Charset charset,
66                                   String eol) {
67         if (eol != null && eol.isEmpty())
68             throw new IllegalArgumentException("empty line separator");
69         this.subscriber = Objects.requireNonNull(subscriber);
70         this.finisher = Objects.requireNonNull(finisher);
71         this.charset = Objects.requireNonNull(charset);
72         this.eol = eol;
73     }
74 
75     @Override
onSubscribe(Subscription subscription)76     public void onSubscribe(Subscription subscription) {
77         Objects.requireNonNull(subscription);
78         if (!subscribed.compareAndSet(false, true)) {
79             subscription.cancel();
80             return;
81         }
82 
83         downstream = LineSubscription.create(subscription,
84                                              charset,
85                                              eol,
86                                              subscriber,
87                                              cf);
88         subscriber.onSubscribe(downstream);
89     }
90 
91     @Override
onNext(List<ByteBuffer> item)92     public void onNext(List<ByteBuffer> item) {
93         Objects.requireNonNull(item);
94         try {
95             downstream.submit(item);
96         } catch (Throwable t) {
97             onError(t);
98         }
99     }
100 
101     @Override
onError(Throwable throwable)102     public void onError(Throwable throwable) {
103         Objects.requireNonNull(throwable);
104         try {
105             downstream.signalError(throwable);
106         } finally {
107             cf.completeExceptionally(throwable);
108         }
109     }
110 
111     @Override
onComplete()112     public void onComplete() {
113         try {
114             downstream.signalComplete();
115         } finally {
116             cf.complete(finisher.apply(subscriber));
117         }
118     }
119 
120     @Override
getBody()121     public CompletionStage<R> getBody() {
122         return cf;
123     }
124 
125     public static <S extends Subscriber<? super String>, R> LineSubscriberAdapter<S, R>
create(S subscriber, Function<? super S, ? extends R> finisher, Charset charset, String eol)126     create(S subscriber, Function<? super S, ? extends R> finisher, Charset charset, String eol)
127     {
128         if (eol != null && eol.isEmpty())
129             throw new IllegalArgumentException("empty line separator");
130         return new LineSubscriberAdapter<>(Objects.requireNonNull(subscriber),
131                 Objects.requireNonNull(finisher),
132                 Objects.requireNonNull(charset),
133                 eol);
134     }
135 
136     static final class LineSubscription implements Flow.Subscription {
137         final Flow.Subscription upstreamSubscription;
138         final CharsetDecoder decoder;
139         final String newline;
140         final Demand downstreamDemand;
141         final ConcurrentLinkedDeque<ByteBuffer> queue;
142         final SequentialScheduler scheduler;
143         final Flow.Subscriber<? super String> upstream;
144         final CompletableFuture<?> cf;
145         private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
146         private final AtomicLong demanded = new AtomicLong();
147         private volatile boolean completed;
148         private volatile boolean cancelled;
149 
150         private final char[] chars = new char[1024];
151         private final ByteBuffer leftover = ByteBuffer.wrap(new byte[64]);
152         private final CharBuffer buffer = CharBuffer.wrap(chars);
153         private final StringBuilder builder = new StringBuilder();
154         private String nextLine;
155 
LineSubscription(Flow.Subscription s, CharsetDecoder dec, String separator, Flow.Subscriber<? super String> subscriber, CompletableFuture<?> completion)156         private LineSubscription(Flow.Subscription s,
157                                  CharsetDecoder dec,
158                                  String separator,
159                                  Flow.Subscriber<? super String> subscriber,
160                                  CompletableFuture<?> completion) {
161             downstreamDemand = new Demand();
162             queue = new ConcurrentLinkedDeque<>();
163             upstreamSubscription = Objects.requireNonNull(s);
164             decoder = Objects.requireNonNull(dec);
165             newline = separator;
166             upstream = Objects.requireNonNull(subscriber);
167             cf = Objects.requireNonNull(completion);
168             scheduler = SequentialScheduler.lockingScheduler(this::loop);
169         }
170 
171         @Override
request(long n)172         public void request(long n) {
173             if (cancelled) return;
174             if (downstreamDemand.increase(n)) {
175                 scheduler.runOrSchedule();
176             }
177         }
178 
179         @Override
cancel()180         public void cancel() {
181             cancelled = true;
182             upstreamSubscription.cancel();
183         }
184 
submit(List<ByteBuffer> list)185         public void submit(List<ByteBuffer> list) {
186             queue.addAll(list);
187             demanded.decrementAndGet();
188             scheduler.runOrSchedule();
189         }
190 
signalComplete()191         public void signalComplete() {
192             completed = true;
193             scheduler.runOrSchedule();
194         }
195 
signalError(Throwable error)196         public void signalError(Throwable error) {
197             if (errorRef.compareAndSet(null,
198                     Objects.requireNonNull(error))) {
199                 scheduler.runOrSchedule();
200             }
201         }
202 
203         // This method looks at whether some bytes where left over (in leftover)
204         // from decoding the previous buffer when the previous buffer was in
205         // underflow. If so, it takes bytes one by one from the new buffer 'in'
206         // and combines them with the leftover bytes until 'in' is exhausted or a
207         // character was produced in 'out', resolving the previous underflow.
208         // Returns true if the buffer is still in underflow, false otherwise.
209         // However, in both situation some chars might have been produced in 'out'.
isUnderFlow(ByteBuffer in, CharBuffer out, boolean endOfInput)210         private boolean isUnderFlow(ByteBuffer in, CharBuffer out, boolean endOfInput)
211                 throws CharacterCodingException {
212             int limit = leftover.position();
213             if (limit == 0) {
214                 // no leftover
215                 return false;
216             } else {
217                 CoderResult res = null;
218                 while (in.hasRemaining()) {
219                     leftover.position(limit);
220                     leftover.limit(++limit);
221                     leftover.put(in.get());
222                     leftover.position(0);
223                     res = decoder.decode(leftover, out,
224                             endOfInput && !in.hasRemaining());
225                     int remaining = leftover.remaining();
226                     if (remaining > 0) {
227                         assert leftover.position() == 0;
228                         leftover.position(remaining);
229                     } else {
230                         leftover.position(0);
231                     }
232                     leftover.limit(leftover.capacity());
233                     if (res.isUnderflow() && remaining > 0 && in.hasRemaining()) {
234                         continue;
235                     }
236                     if (res.isError()) {
237                         res.throwException();
238                     }
239                     assert !res.isOverflow();
240                     return false;
241                 }
242                 return !endOfInput;
243             }
244         }
245 
246         // extract characters from start to end and remove them from
247         // the StringBuilder
take(StringBuilder b, int start, int end)248         private static String take(StringBuilder b, int start, int end) {
249             assert start == 0;
250             String line;
251             if (end == start) return "";
252             line = b.substring(start, end);
253             b.delete(start, end);
254             return line;
255         }
256 
257         // finds end of line, returns -1 if not found, or the position after
258         // the line delimiter if found, removing the delimiter in the process.
endOfLine(StringBuilder b, String eol, boolean endOfInput)259         private static int endOfLine(StringBuilder b, String eol, boolean endOfInput) {
260             int len = b.length();
261             if (eol != null) { // delimiter explicitly specified
262                 int i = b.indexOf(eol);
263                 if (i >= 0) {
264                     // remove the delimiter and returns the position
265                     // of the char after it.
266                     b.delete(i, i + eol.length());
267                     return i;
268                 }
269             } else { // no delimiter specified, behaves as BufferedReader::readLine
270                 boolean crfound = false;
271                 for (int i = 0; i < len; i++) {
272                     char c = b.charAt(i);
273                     if (c == '\n') {
274                         // '\n' or '\r\n' found.
275                         // remove the delimiter and returns the position
276                         // of the char after it.
277                         b.delete(crfound ? i - 1 : i, i + 1);
278                         return crfound ? i - 1 : i;
279                     } else if (crfound) {
280                         // previous char was '\r', c != '\n'
281                         assert i != 0;
282                         // remove the delimiter and returns the position
283                         // of the char after it.
284                         b.delete(i - 1, i);
285                         return i - 1;
286                     }
287                     crfound = c == '\r';
288                 }
289                 if (crfound && endOfInput) {
290                     // remove the delimiter and returns the position
291                     // of the char after it.
292                     b.delete(len - 1, len);
293                     return len - 1;
294                 }
295             }
296             return endOfInput && len > 0 ? len : -1;
297         }
298 
299         // Looks at whether the StringBuilder contains a line.
300         // Returns null if more character are needed.
nextLine(StringBuilder b, String eol, boolean endOfInput)301         private static String nextLine(StringBuilder b, String eol, boolean endOfInput) {
302             int next = endOfLine(b, eol, endOfInput);
303             return (next > -1) ? take(b, 0, next) : null;
304         }
305 
306         // Attempts to read the next line. Returns the next line if
307         // the delimiter was found, null otherwise. The delimiters are
308         // consumed.
nextLine()309         private String nextLine()
310                 throws CharacterCodingException {
311             assert nextLine == null;
312             LINES:
313             while (nextLine == null) {
314                 boolean endOfInput = completed && queue.isEmpty();
315                 nextLine = nextLine(builder, newline,
316                         endOfInput && leftover.position() == 0);
317                 if (nextLine != null) return nextLine;
318                 ByteBuffer b;
319                 BUFFERS:
320                 while ((b = queue.peek()) != null) {
321                     if (!b.hasRemaining()) {
322                         queue.poll();
323                         continue BUFFERS;
324                     }
325                     BYTES:
326                     while (b.hasRemaining()) {
327                         buffer.position(0);
328                         buffer.limit(buffer.capacity());
329                         boolean endofInput = completed && queue.size() <= 1;
330                         if (isUnderFlow(b, buffer, endofInput)) {
331                             assert !b.hasRemaining();
332                             if (buffer.position() > 0) {
333                                 buffer.flip();
334                                 builder.append(buffer);
335                             }
336                             continue BUFFERS;
337                         }
338                         CoderResult res = decoder.decode(b, buffer, endofInput);
339                         if (res.isError()) res.throwException();
340                         if (buffer.position() > 0) {
341                             buffer.flip();
342                             builder.append(buffer);
343                             continue LINES;
344                         }
345                         if (res.isUnderflow() && b.hasRemaining()) {
346                             //System.out.println("underflow: adding " + b.remaining() + " bytes");
347                             leftover.put(b);
348                             assert !b.hasRemaining();
349                             continue BUFFERS;
350                         }
351                     }
352                 }
353 
354                 assert queue.isEmpty();
355                 if (endOfInput) {
356                     // Time to cleanup: there may be some undecoded leftover bytes
357                     // We need to flush them out.
358                     // The decoder has been configured to replace malformed/unmappable
359                     // chars with some replacement, in order to behave like
360                     // InputStreamReader.
361                     leftover.flip();
362                     buffer.position(0);
363                     buffer.limit(buffer.capacity());
364 
365                     // decode() must be called just before flush, even if there
366                     // is nothing to decode. We must do this even if leftover
367                     // has no remaining bytes.
368                     CoderResult res = decoder.decode(leftover, buffer, endOfInput);
369                     if (buffer.position() > 0) {
370                         buffer.flip();
371                         builder.append(buffer);
372                     }
373                     if (res.isError()) res.throwException();
374 
375                     // Now call decoder.flush()
376                     buffer.position(0);
377                     buffer.limit(buffer.capacity());
378                     res = decoder.flush(buffer);
379                     if (buffer.position() > 0) {
380                         buffer.flip();
381                         builder.append(buffer);
382                     }
383                     if (res.isError()) res.throwException();
384 
385                     // It's possible that we reach here twice - just for the
386                     // purpose of checking that no bytes were left over, so
387                     // we reset leftover/decoder to make the function reentrant.
388                     leftover.position(0);
389                     leftover.limit(leftover.capacity());
390                     decoder.reset();
391 
392                     // if some chars were produced then this call will
393                     // return them.
394                     return nextLine = nextLine(builder, newline, endOfInput);
395                 }
396                 return null;
397             }
398             return null;
399         }
400 
401         // The main sequential scheduler loop.
loop()402         private void loop() {
403             try {
404                 while (!cancelled) {
405                     Throwable error = errorRef.get();
406                     if (error != null) {
407                         cancelled = true;
408                         scheduler.stop();
409                         upstream.onError(error);
410                         cf.completeExceptionally(error);
411                         return;
412                     }
413                     if (nextLine == null) nextLine = nextLine();
414                     if (nextLine == null) {
415                         if (completed) {
416                             scheduler.stop();
417                             if (leftover.position() != 0) {
418                                 // Underflow: not all bytes could be
419                                 // decoded, but no more bytes will be coming.
420                                 // This should not happen as we should already
421                                 // have got a MalformedInputException, or
422                                 // replaced the unmappable chars.
423                                 errorRef.compareAndSet(null,
424                                         new IllegalStateException(
425                                                 "premature end of input ("
426                                                         + leftover.position()
427                                                         + " undecoded bytes)"));
428                                 continue;
429                             } else {
430                                 upstream.onComplete();
431                             }
432                             return;
433                         } else if (demanded.get() == 0
434                                 && !downstreamDemand.isFulfilled()) {
435                             long incr = Math.max(1, downstreamDemand.get());
436                             demanded.addAndGet(incr);
437                             upstreamSubscription.request(incr);
438                             continue;
439                         } else return;
440                     }
441                     assert nextLine != null;
442                     assert newline != null && !nextLine.endsWith(newline)
443                             || !nextLine.endsWith("\n") || !nextLine.endsWith("\r");
444                     if (downstreamDemand.tryDecrement()) {
445                         String forward = nextLine;
446                         nextLine = null;
447                         upstream.onNext(forward);
448                     } else return; // no demand: come back later
449                 }
450             } catch (Throwable t) {
451                 try {
452                     upstreamSubscription.cancel();
453                 } finally {
454                     signalError(t);
455                 }
456             }
457         }
458 
create(Flow.Subscription s, Charset charset, String lineSeparator, Flow.Subscriber<? super String> upstream, CompletableFuture<?> cf)459         static LineSubscription create(Flow.Subscription s,
460                                        Charset charset,
461                                        String lineSeparator,
462                                        Flow.Subscriber<? super String> upstream,
463                                        CompletableFuture<?> cf) {
464             return new LineSubscription(Objects.requireNonNull(s),
465                     Objects.requireNonNull(charset).newDecoder()
466                             // use the same decoder configuration than
467                             // java.io.InputStreamReader
468                             .onMalformedInput(CodingErrorAction.REPLACE)
469                             .onUnmappableCharacter(CodingErrorAction.REPLACE),
470                     lineSeparator,
471                     Objects.requireNonNull(upstream),
472                     Objects.requireNonNull(cf));
473         }
474     }
475 }
476 
477