1 /*
2  * Copyright (c) 2019, 2021, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package jdk.jfr.consumer;
27 
28 import java.io.IOException;
29 import java.nio.file.Path;
30 import java.security.AccessControlContext;
31 import java.security.AccessController;
32 import java.time.Duration;
33 import java.time.Instant;
34 import java.util.Collections;
35 import java.util.Objects;
36 import java.util.function.Consumer;
37 
38 import jdk.jfr.internal.SecuritySupport;
39 import jdk.jfr.internal.Utils;
40 import jdk.jfr.internal.consumer.EventDirectoryStream;
41 import jdk.jfr.internal.consumer.EventFileStream;
42 import jdk.jfr.internal.consumer.FileAccess;
43 
44 /**
45  * Represents a stream of events.
46  * <p>
47  * A stream is a sequence of events and the way to interact with a stream is to
48  * register actions. The {@code EventStream} interface is not to be implemented
49  * and future versions of the JDK may prevent this completely.
50  * <p>
51  * To receive a notification when an event arrives, register an action using the
52  * {@link #onEvent(Consumer)} method. To filter the stream for an event with a
53  * specific name, use {@link #onEvent(String, Consumer)} method.
54  * <p>
55  * By default, the same {@code RecordedEvent} object can be used to
56  * represent two or more distinct events. That object can be delivered
57  * multiple times to the same action as well as to other actions. To use an
58  * event object after the action is completed, the
59  * {@link #setReuse(boolean)} method should be set to {@code false} so a
60  * new object is allocated for each event.
61  * <p>
62  * Events are delivered in batches. To receive a notification when a batch is
63  * complete, register an action using the {@link #onFlush(Runnable)} method.
64  * This is an opportunity to aggregate or push data to external systems while
65  * the Java Virtual Machine (JVM) is preparing the next batch.
66  * <p>
67  * Events within a batch are sorted chronologically by their end time.
68  * Well-ordering of events is only maintained for events available to the JVM at
69  * the point of flush, i.e. for the set of events delivered as a unit in a
70  * single batch. Events delivered in a batch could therefore be out-of-order
71  * compared to events delivered in a previous batch, but never out-of-order with
72  * events within the same batch. If ordering is not a concern, sorting can be
73  * disabled using the {@link #setOrdered(boolean)} method.
74  * <p>
75  * To dispatch events to registered actions, the stream must be started. To
76  * start processing in the current thread, invoke the {@link #start()} method.
77  * To process actions asynchronously in a separate thread, invoke the
78  * {@link #startAsync()} method. To await completion of the stream, use the
79  * awaitTermination {@link #awaitTermination()} or the
80  * {@link #awaitTermination(Duration)} method.
81  * <p>
82  * When a stream ends it is automatically closed. To manually stop processing of
83  * events, close the stream by invoking the {@link #close()} method. A stream
84  * can also be automatically closed in exceptional circumstances, for example if
85  * the JVM that is being monitored exits. To receive a notification in any of
86  * these occasions, use the {@link #onClose(Runnable)} method to register an
87  * action.
88  * <p>
89  * If an unexpected exception occurs in an action, it is possible to catch the
90  * exception in an error handler. An error handler can be registered using the
91  * {@link #onError(Consumer)} method. If no error handler is registered, the
92  * default behavior is to print the exception and its backtrace to the standard
93  * error stream.
94  * <p>
95  * The following example shows how an {@code EventStream} can be used to listen
96  * to events on a JVM running Flight Recorder
97  *
98  * <pre>{@literal
99  * try (var es = EventStream.openRepository()) {
100  *   es.onEvent("jdk.CPULoad", event -> {
101  *     System.out.println("CPU Load " + event.getEndTime());
102  *     System.out.println(" Machine total: " + 100 * event.getFloat("machineTotal") + "%");
103  *     System.out.println(" JVM User: " + 100 * event.getFloat("jvmUser") + "%");
104  *     System.out.println(" JVM System: " + 100 * event.getFloat("jvmSystem") + "%");
105  *     System.out.println();
106  *   });
107  *   es.onEvent("jdk.GarbageCollection", event -> {
108  *     System.out.println("Garbage collection: " + event.getLong("gcId"));
109  *     System.out.println(" Cause: " + event.getString("cause"));
110  *     System.out.println(" Total pause: " + event.getDuration("sumOfPauses"));
111  *     System.out.println(" Longest pause: " + event.getDuration("longestPause"));
112  *     System.out.println();
113  *   });
114  *   es.start();
115  * }
116  * }</pre>
117  * <p>
118  * To start recording together with the stream, see {@link RecordingStream}.
119  *
120  * @since 14
121  */
122 public interface EventStream extends AutoCloseable {
123     /**
124      * Creates a stream from the repository of the current Java Virtual Machine
125      * (JVM).
126      * <p>
127      * By default, the stream starts with the next event flushed by Flight
128      * Recorder.
129      *
130      * @return an event stream, not {@code null}
131      *
132      * @throws IOException if a stream can't be opened, or an I/O error occurs
133      *         when trying to access the repository
134      *
135      * @throws SecurityException if a security manager exists and the caller
136      *         does not have
137      *         {@code FlightRecorderPermission("accessFlightRecorder")}
138      */
139     @SuppressWarnings("removal")
openRepository()140     public static EventStream openRepository() throws IOException {
141         Utils.checkAccessFlightRecorder();
142         return new EventDirectoryStream(
143             AccessController.getContext(),
144             null,
145             SecuritySupport.PRIVILEGED,
146             null,
147             Collections.emptyList(),
148             false
149         );
150     }
151 
152     /**
153      * Creates an event stream from a disk repository.
154      * <p>
155      * By default, the stream starts with the next event flushed by Flight
156      * Recorder.
157      *
158      * @param directory location of the disk repository, not {@code null}
159      *
160      * @return an event stream, not {@code null}
161      *
162      * @throws IOException if a stream can't be opened, or an I/O error occurs
163      *         when trying to access the repository
164      *
165      * @throws SecurityException if a security manager exists and its
166      *         {@code checkRead} method denies read access to the directory, or
167      *         files in the directory.
168      */
openRepository(Path directory)169     public static EventStream openRepository(Path directory) throws IOException {
170         Objects.requireNonNull(directory);
171         @SuppressWarnings("removal")
172         AccessControlContext acc = AccessController.getContext();
173         return new EventDirectoryStream(
174             acc,
175             directory,
176             FileAccess.UNPRIVILEGED,
177             null,
178             Collections.emptyList(),
179             true
180         );
181     }
182 
183     /**
184      * Creates an event stream from a file.
185      * <p>
186      * By default, the stream starts with the first event in the file.
187      *
188      * @param file location of the file, not {@code null}
189      *
190      * @return an event stream, not {@code null}
191      *
192      * @throws IOException if the file can't be opened, or an I/O error occurs
193      *         during reading
194      *
195      * @throws SecurityException if a security manager exists and its
196      *         {@code checkRead} method denies read access to the file
197      */
198     @SuppressWarnings("removal")
openFile(Path file)199     static EventStream openFile(Path file) throws IOException {
200         return new EventFileStream(AccessController.getContext(), file);
201     }
202 
203     /**
204      * Registers an action to perform when new metadata arrives in the stream.
205      *
206      * The event type of an event always arrives sometime before the actual event.
207      * The action must be registered before the stream is started.
208      *
209      * @implSpec The default implementation of this method is empty.
210      *
211      * @param action to perform, not {@code null}
212      *
213      * @throws IllegalStateException if an action is added after the stream has
214      *                               started
215      */
onMetadata(Consumer<MetadataEvent> action)216      default void onMetadata(Consumer<MetadataEvent> action) {
217      }
218 
219     /**
220      * Registers an action to perform on all events in the stream.
221      *
222      * @param action an action to perform on each {@code RecordedEvent}, not
223      *        {@code null}
224      */
onEvent(Consumer<RecordedEvent> action)225     void onEvent(Consumer<RecordedEvent> action);
226 
227     /**
228      * Registers an action to perform on all events matching a name.
229      *
230      * @param eventName the name of the event, not {@code null}
231      *
232      * @param action an action to perform on each {@code RecordedEvent} matching
233      *        the event name, not {@code null}
234      */
onEvent(String eventName, Consumer<RecordedEvent> action)235     void onEvent(String eventName, Consumer<RecordedEvent> action);
236 
237     /**
238      * Registers an action to perform after the stream has been flushed.
239      *
240      * @param action an action to perform after the stream has been
241      *        flushed, not {@code null}
242      */
onFlush(Runnable action)243     void onFlush(Runnable action);
244 
245     /**
246      * Registers an action to perform if an exception occurs.
247      * <p>
248      * If an action is not registered, an exception stack trace is printed to
249      * standard error.
250      * <p>
251      * Registering an action overrides the default behavior. If multiple actions
252      * have been registered, they are performed in the order of registration.
253      * <p>
254      * If this method itself throws an exception, resulting behavior is
255      * undefined.
256      *
257      * @param action an action to perform if an exception occurs, not
258      *        {@code null}
259      */
onError(Consumer<Throwable> action)260     void onError(Consumer<Throwable> action);
261 
262     /**
263      * Registers an action to perform when the stream is closed.
264      * <p>
265      * If the stream is already closed, the action will be performed immediately
266      * in the current thread.
267      *
268      * @param action an action to perform after the stream is closed, not
269      *        {@code null}
270      * @see #close()
271      */
onClose(Runnable action)272     void onClose(Runnable action);
273 
274     /**
275      * Releases all resources associated with this stream.
276      * <p>
277      * If a stream is started, asynchronously or synchronously, it is stopped
278      * immediately or after the next flush. This method does <em>NOT</em>
279      * guarantee that all registered actions are completed before return.
280      * <p>
281      * Closing a previously closed stream has no effect.
282      */
283     @Override
close()284     void close();
285 
286     /**
287      * Unregisters an action.
288      * <p>
289      * If the action has been registered multiple times, all instances are
290      * unregistered.
291      *
292      * @param action the action to unregister, not {@code null}
293      *
294      * @return {@code true} if the action was unregistered, {@code false}
295      *         otherwise
296      *
297      * @see #onEvent(Consumer)
298      * @see #onEvent(String, Consumer)
299      * @see #onFlush(Runnable)
300      * @see #onClose(Runnable)
301      * @see #onError(Consumer)
302      */
remove(Object action)303     boolean remove(Object action);
304 
305     /**
306      * Specifies that the event object in an {@link #onEvent(Consumer)} action
307      * can be reused.
308      * <p>
309      * If reuse is set to {@code true}, an action should not keep a reference
310      * to the event object after the action has completed.
311      *
312      * @param reuse {@code true} if an event object can be reused, {@code false}
313      * otherwise
314      */
setReuse(boolean reuse)315     void setReuse(boolean reuse);
316 
317     /**
318      * Specifies that events arrives in chronological order, sorted by the time
319      * they were committed to the stream.
320      *
321      * @param ordered if event objects arrive in chronological order to
322      *        {@link #onEvent(Consumer)}
323      */
setOrdered(boolean ordered)324     void setOrdered(boolean ordered);
325 
326     /**
327      * Specifies the start time of the stream.
328      * <p>
329      * The start time must be set before starting the stream
330      *
331      * @param startTime the start time, not {@code null}
332      *
333      * @throws IllegalStateException if the stream is already started
334      *
335      * @see #start()
336      * @see #startAsync()
337      */
setStartTime(Instant startTime)338     void setStartTime(Instant startTime);
339 
340     /**
341      * Specifies the end time of the stream.
342      * <p>
343      * The end time must be set before starting the stream.
344      * <p>
345      * At end time, the stream is closed.
346      *
347      * @param endTime the end time, not {@code null}
348      *
349      * @throws IllegalStateException if the stream is already started
350      *
351      * @see #start()
352      * @see #startAsync()
353      */
setEndTime(Instant endTime)354     void setEndTime(Instant endTime);
355 
356     /**
357      * Starts processing of actions.
358      * <p>
359      * Actions are performed in the current thread.
360      * <p>
361      * To stop the stream, use the {@link #close()} method.
362      *
363      * @throws IllegalStateException if the stream is already started or closed
364      */
start()365     void start();
366 
367     /**
368      * Starts asynchronous processing of actions.
369      * <p>
370      * Actions are performed in a single separate thread.
371      * <p>
372      * To stop the stream, use the {@link #close()} method.
373      *
374      * @throws IllegalStateException if the stream is already started or closed
375      */
startAsync()376     void startAsync();
377 
378     /**
379      * Blocks until all actions are completed, or the stream is closed, or the
380      * timeout occurs, or the current thread is interrupted, whichever happens
381      * first.
382      *
383      * @param timeout the maximum time to wait, not {@code null}
384      *
385      * @throws IllegalArgumentException if timeout is negative
386      * @throws InterruptedException if interrupted while waiting
387      *
388      * @see #start()
389      * @see #startAsync()
390      * @see Thread#interrupt()
391      */
awaitTermination(Duration timeout)392     void awaitTermination(Duration timeout) throws InterruptedException;
393 
394     /**
395      * Blocks until all actions are completed, or the stream is closed, or the
396      * current thread is interrupted, whichever happens first.
397      *
398      * @throws InterruptedException if interrupted while waiting
399      *
400      * @see #start()
401      * @see #startAsync()
402      * @see Thread#interrupt()
403      */
awaitTermination()404     void awaitTermination() throws InterruptedException;
405 }
406