1 /* 2 * Copyright (c) 2019, 2020, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.jfr.internal.consumer; 27 28 import java.io.IOException; 29 import java.nio.file.Path; 30 import java.security.AccessControlContext; 31 import java.time.Duration; 32 import java.util.Arrays; 33 import java.util.Collections; 34 import java.util.Comparator; 35 import java.util.Objects; 36 import jdk.jfr.consumer.RecordedEvent; 37 38 /** 39 * Implementation of an event stream that operates against a recording file. 40 * 41 */ 42 public final class EventFileStream extends AbstractEventStream { 43 private final static Comparator<? super RecordedEvent> EVENT_COMPARATOR = JdkJfrConsumer.instance().eventComparator(); 44 45 private final RecordingInput input; 46 47 private ChunkParser currentParser; 48 private RecordedEvent[] cacheSorted; 49 EventFileStream(AccessControlContext acc, Path path)50 public EventFileStream(AccessControlContext acc, Path path) throws IOException { 51 super(acc, null, Collections.emptyList()); 52 Objects.requireNonNull(path); 53 this.input = new RecordingInput(path.toFile(), FileAccess.UNPRIVILEGED); 54 } 55 56 @Override start()57 public void start() { 58 start(0); 59 } 60 61 @Override startAsync()62 public void startAsync() { 63 startAsync(0); 64 } 65 66 @Override close()67 public void close() { 68 setClosed(true); 69 dispatcher().runCloseActions(); 70 try { 71 input.close(); 72 } catch (IOException e) { 73 // ignore 74 } 75 } 76 77 @Override process()78 protected void process() throws IOException { 79 Dispatcher disp = dispatcher(); 80 long start = 0; 81 long end = Long.MAX_VALUE; 82 if (disp.startTime != null) { 83 start = disp.startNanos; 84 } 85 if (disp.endTime != null) { 86 end = disp.endNanos; 87 } 88 89 currentParser = new ChunkParser(input, disp.parserConfiguration); 90 while (!isClosed()) { 91 onMetadata(currentParser); 92 if (currentParser.getStartNanos() > end) { 93 close(); 94 return; 95 } 96 disp = dispatcher(); 97 disp.parserConfiguration.filterStart = start; 98 disp.parserConfiguration.filterEnd = end; 99 currentParser.updateConfiguration(disp.parserConfiguration, true); 100 if (disp.parserConfiguration.isOrdered()) { 101 processOrdered(disp); 102 } else { 103 processUnordered(disp); 104 } 105 currentParser.resetCache(); 106 if (isClosed() || currentParser.isLastChunk()) { 107 return; 108 } 109 currentParser = currentParser.nextChunkParser(); 110 } 111 } 112 processOrdered(Dispatcher c)113 private void processOrdered(Dispatcher c) throws IOException { 114 if (cacheSorted == null) { 115 cacheSorted = new RecordedEvent[10_000]; 116 } 117 RecordedEvent event; 118 int index = 0; 119 while (!currentParser.isChunkFinished()) { 120 while ((event = currentParser.readStreamingEvent()) != null) { 121 if (index == cacheSorted.length) { 122 RecordedEvent[] tmp = cacheSorted; 123 cacheSorted = new RecordedEvent[2 * tmp.length]; 124 System.arraycopy(tmp, 0, cacheSorted, 0, tmp.length); 125 } 126 cacheSorted[index++] = event; 127 } 128 dispatchOrdered(c, index); 129 index = 0; 130 } 131 } 132 dispatchOrdered(Dispatcher c, int index)133 private void dispatchOrdered(Dispatcher c, int index) { 134 onMetadata(currentParser); 135 Arrays.sort(cacheSorted, 0, index, EVENT_COMPARATOR); 136 for (int i = 0; i < index; i++) { 137 c.dispatch(cacheSorted[i]); 138 } 139 onFlush(); 140 } 141 processUnordered(Dispatcher c)142 private void processUnordered(Dispatcher c) throws IOException { 143 onMetadata(currentParser); 144 while (!isClosed()) { 145 RecordedEvent event = currentParser.readStreamingEvent(); 146 if (event == null) { 147 onFlush(); 148 if (currentParser.isChunkFinished()) { 149 return; 150 } 151 continue; 152 } 153 onMetadata(currentParser); 154 c.dispatch(event); 155 } 156 } 157 } 158