1 /*
2  * Copyright (c) 2019, 2020, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package jdk.jfr.internal.consumer;
27 
28 import java.io.IOException;
29 import java.nio.file.Path;
30 import java.security.AccessControlContext;
31 import java.time.Duration;
32 import java.util.Arrays;
33 import java.util.Collections;
34 import java.util.Comparator;
35 import java.util.Objects;
36 import jdk.jfr.consumer.RecordedEvent;
37 
38 /**
39  * Implementation of an event stream that operates against a recording file.
40  *
41  */
42 public final class EventFileStream extends AbstractEventStream {
43     private final static Comparator<? super RecordedEvent> EVENT_COMPARATOR = JdkJfrConsumer.instance().eventComparator();
44 
45     private final RecordingInput input;
46 
47     private ChunkParser currentParser;
48     private RecordedEvent[] cacheSorted;
49 
EventFileStream(AccessControlContext acc, Path path)50     public EventFileStream(AccessControlContext acc, Path path) throws IOException {
51         super(acc, null, Collections.emptyList());
52         Objects.requireNonNull(path);
53         this.input = new RecordingInput(path.toFile(), FileAccess.UNPRIVILEGED);
54     }
55 
56     @Override
start()57     public void start() {
58         start(0);
59     }
60 
61     @Override
startAsync()62     public void startAsync() {
63         startAsync(0);
64     }
65 
66     @Override
close()67     public void close() {
68         setClosed(true);
69         dispatcher().runCloseActions();
70         try {
71             input.close();
72         } catch (IOException e) {
73             // ignore
74         }
75     }
76 
77     @Override
process()78     protected void process() throws IOException {
79         Dispatcher disp = dispatcher();
80         long start = 0;
81         long end = Long.MAX_VALUE;
82         if (disp.startTime != null) {
83             start = disp.startNanos;
84         }
85         if (disp.endTime != null) {
86             end = disp.endNanos;
87         }
88 
89         currentParser = new ChunkParser(input, disp.parserConfiguration);
90         while (!isClosed()) {
91             onMetadata(currentParser);
92             if (currentParser.getStartNanos() > end) {
93                 close();
94                 return;
95             }
96             disp = dispatcher();
97             disp.parserConfiguration.filterStart = start;
98             disp.parserConfiguration.filterEnd = end;
99             currentParser.updateConfiguration(disp.parserConfiguration, true);
100             if (disp.parserConfiguration.isOrdered()) {
101                 processOrdered(disp);
102             } else {
103                 processUnordered(disp);
104             }
105             currentParser.resetCache();
106             if (isClosed() || currentParser.isLastChunk()) {
107                 return;
108             }
109             currentParser = currentParser.nextChunkParser();
110         }
111     }
112 
processOrdered(Dispatcher c)113     private void processOrdered(Dispatcher c) throws IOException {
114         if (cacheSorted == null) {
115             cacheSorted = new RecordedEvent[10_000];
116         }
117         RecordedEvent event;
118         int index = 0;
119         while (!currentParser.isChunkFinished()) {
120             while ((event = currentParser.readStreamingEvent()) != null) {
121                 if (index == cacheSorted.length) {
122                     RecordedEvent[] tmp = cacheSorted;
123                     cacheSorted = new RecordedEvent[2 * tmp.length];
124                     System.arraycopy(tmp, 0, cacheSorted, 0, tmp.length);
125                 }
126                 cacheSorted[index++] = event;
127             }
128             dispatchOrdered(c, index);
129             index = 0;
130         }
131     }
132 
dispatchOrdered(Dispatcher c, int index)133     private void dispatchOrdered(Dispatcher c, int index) {
134         onMetadata(currentParser);
135         Arrays.sort(cacheSorted, 0, index, EVENT_COMPARATOR);
136         for (int i = 0; i < index; i++) {
137             c.dispatch(cacheSorted[i]);
138         }
139         onFlush();
140     }
141 
processUnordered(Dispatcher c)142     private void processUnordered(Dispatcher c) throws IOException {
143         onMetadata(currentParser);
144         while (!isClosed()) {
145             RecordedEvent event = currentParser.readStreamingEvent();
146             if (event == null) {
147                 onFlush();
148                 if (currentParser.isChunkFinished()) {
149                     return;
150                 }
151                 continue;
152             }
153             onMetadata(currentParser);
154             c.dispatch(event);
155         }
156     }
157 }
158