1 /* 2 * Copyright (c) 2019, 2021, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.jfr.consumer; 27 28 import java.io.IOException; 29 import java.nio.file.Path; 30 import java.security.AccessControlContext; 31 import java.security.AccessController; 32 import java.time.Duration; 33 import java.time.Instant; 34 import java.util.Collections; 35 import java.util.Objects; 36 import java.util.function.Consumer; 37 38 import jdk.jfr.internal.SecuritySupport; 39 import jdk.jfr.internal.Utils; 40 import jdk.jfr.internal.consumer.EventDirectoryStream; 41 import jdk.jfr.internal.consumer.EventFileStream; 42 import jdk.jfr.internal.consumer.FileAccess; 43 44 /** 45 * Represents a stream of events. 46 * <p> 47 * A stream is a sequence of events and the way to interact with a stream is to 48 * register actions. The {@code EventStream} interface is not to be implemented 49 * and future versions of the JDK may prevent this completely. 50 * <p> 51 * To receive a notification when an event arrives, register an action using the 52 * {@link #onEvent(Consumer)} method. To filter the stream for an event with a 53 * specific name, use {@link #onEvent(String, Consumer)} method. 54 * <p> 55 * By default, the same {@code RecordedEvent} object can be used to 56 * represent two or more distinct events. That object can be delivered 57 * multiple times to the same action as well as to other actions. To use an 58 * event object after the action is completed, the 59 * {@link #setReuse(boolean)} method should be set to {@code false} so a 60 * new object is allocated for each event. 61 * <p> 62 * Events are delivered in batches. To receive a notification when a batch is 63 * complete, register an action using the {@link #onFlush(Runnable)} method. 64 * This is an opportunity to aggregate or push data to external systems while 65 * the Java Virtual Machine (JVM) is preparing the next batch. 66 * <p> 67 * Events within a batch are sorted chronologically by their end time. 68 * Well-ordering of events is only maintained for events available to the JVM at 69 * the point of flush, i.e. for the set of events delivered as a unit in a 70 * single batch. Events delivered in a batch could therefore be out-of-order 71 * compared to events delivered in a previous batch, but never out-of-order with 72 * events within the same batch. If ordering is not a concern, sorting can be 73 * disabled using the {@link #setOrdered(boolean)} method. 74 * <p> 75 * To dispatch events to registered actions, the stream must be started. To 76 * start processing in the current thread, invoke the {@link #start()} method. 77 * To process actions asynchronously in a separate thread, invoke the 78 * {@link #startAsync()} method. To await completion of the stream, use the 79 * awaitTermination {@link #awaitTermination()} or the 80 * {@link #awaitTermination(Duration)} method. 81 * <p> 82 * When a stream ends it is automatically closed. To manually stop processing of 83 * events, close the stream by invoking the {@link #close()} method. A stream 84 * can also be automatically closed in exceptional circumstances, for example if 85 * the JVM that is being monitored exits. To receive a notification in any of 86 * these occasions, use the {@link #onClose(Runnable)} method to register an 87 * action. 88 * <p> 89 * If an unexpected exception occurs in an action, it is possible to catch the 90 * exception in an error handler. An error handler can be registered using the 91 * {@link #onError(Consumer)} method. If no error handler is registered, the 92 * default behavior is to print the exception and its backtrace to the standard 93 * error stream. 94 * <p> 95 * The following example shows how an {@code EventStream} can be used to listen 96 * to events on a JVM running Flight Recorder 97 * 98 * <pre>{@literal 99 * try (var es = EventStream.openRepository()) { 100 * es.onEvent("jdk.CPULoad", event -> { 101 * System.out.println("CPU Load " + event.getEndTime()); 102 * System.out.println(" Machine total: " + 100 * event.getFloat("machineTotal") + "%"); 103 * System.out.println(" JVM User: " + 100 * event.getFloat("jvmUser") + "%"); 104 * System.out.println(" JVM System: " + 100 * event.getFloat("jvmSystem") + "%"); 105 * System.out.println(); 106 * }); 107 * es.onEvent("jdk.GarbageCollection", event -> { 108 * System.out.println("Garbage collection: " + event.getLong("gcId")); 109 * System.out.println(" Cause: " + event.getString("cause")); 110 * System.out.println(" Total pause: " + event.getDuration("sumOfPauses")); 111 * System.out.println(" Longest pause: " + event.getDuration("longestPause")); 112 * System.out.println(); 113 * }); 114 * es.start(); 115 * } 116 * }</pre> 117 * <p> 118 * To start recording together with the stream, see {@link RecordingStream}. 119 * 120 * @since 14 121 */ 122 public interface EventStream extends AutoCloseable { 123 /** 124 * Creates a stream from the repository of the current Java Virtual Machine 125 * (JVM). 126 * <p> 127 * By default, the stream starts with the next event flushed by Flight 128 * Recorder. 129 * 130 * @return an event stream, not {@code null} 131 * 132 * @throws IOException if a stream can't be opened, or an I/O error occurs 133 * when trying to access the repository 134 * 135 * @throws SecurityException if a security manager exists and the caller 136 * does not have 137 * {@code FlightRecorderPermission("accessFlightRecorder")} 138 */ 139 @SuppressWarnings("removal") openRepository()140 public static EventStream openRepository() throws IOException { 141 Utils.checkAccessFlightRecorder(); 142 return new EventDirectoryStream( 143 AccessController.getContext(), 144 null, 145 SecuritySupport.PRIVILEGED, 146 null, 147 Collections.emptyList(), 148 false 149 ); 150 } 151 152 /** 153 * Creates an event stream from a disk repository. 154 * <p> 155 * By default, the stream starts with the next event flushed by Flight 156 * Recorder. 157 * 158 * @param directory location of the disk repository, not {@code null} 159 * 160 * @return an event stream, not {@code null} 161 * 162 * @throws IOException if a stream can't be opened, or an I/O error occurs 163 * when trying to access the repository 164 * 165 * @throws SecurityException if a security manager exists and its 166 * {@code checkRead} method denies read access to the directory, or 167 * files in the directory. 168 */ openRepository(Path directory)169 public static EventStream openRepository(Path directory) throws IOException { 170 Objects.requireNonNull(directory); 171 @SuppressWarnings("removal") 172 AccessControlContext acc = AccessController.getContext(); 173 return new EventDirectoryStream( 174 acc, 175 directory, 176 FileAccess.UNPRIVILEGED, 177 null, 178 Collections.emptyList(), 179 true 180 ); 181 } 182 183 /** 184 * Creates an event stream from a file. 185 * <p> 186 * By default, the stream starts with the first event in the file. 187 * 188 * @param file location of the file, not {@code null} 189 * 190 * @return an event stream, not {@code null} 191 * 192 * @throws IOException if the file can't be opened, or an I/O error occurs 193 * during reading 194 * 195 * @throws SecurityException if a security manager exists and its 196 * {@code checkRead} method denies read access to the file 197 */ 198 @SuppressWarnings("removal") openFile(Path file)199 static EventStream openFile(Path file) throws IOException { 200 return new EventFileStream(AccessController.getContext(), file); 201 } 202 203 /** 204 * Registers an action to perform when new metadata arrives in the stream. 205 * 206 * The event type of an event always arrives sometime before the actual event. 207 * The action must be registered before the stream is started. 208 * 209 * @implSpec The default implementation of this method is empty. 210 * 211 * @param action to perform, not {@code null} 212 * 213 * @throws IllegalStateException if an action is added after the stream has 214 * started 215 */ onMetadata(Consumer<MetadataEvent> action)216 default void onMetadata(Consumer<MetadataEvent> action) { 217 } 218 219 /** 220 * Registers an action to perform on all events in the stream. 221 * 222 * @param action an action to perform on each {@code RecordedEvent}, not 223 * {@code null} 224 */ onEvent(Consumer<RecordedEvent> action)225 void onEvent(Consumer<RecordedEvent> action); 226 227 /** 228 * Registers an action to perform on all events matching a name. 229 * 230 * @param eventName the name of the event, not {@code null} 231 * 232 * @param action an action to perform on each {@code RecordedEvent} matching 233 * the event name, not {@code null} 234 */ onEvent(String eventName, Consumer<RecordedEvent> action)235 void onEvent(String eventName, Consumer<RecordedEvent> action); 236 237 /** 238 * Registers an action to perform after the stream has been flushed. 239 * 240 * @param action an action to perform after the stream has been 241 * flushed, not {@code null} 242 */ onFlush(Runnable action)243 void onFlush(Runnable action); 244 245 /** 246 * Registers an action to perform if an exception occurs. 247 * <p> 248 * If an action is not registered, an exception stack trace is printed to 249 * standard error. 250 * <p> 251 * Registering an action overrides the default behavior. If multiple actions 252 * have been registered, they are performed in the order of registration. 253 * <p> 254 * If this method itself throws an exception, resulting behavior is 255 * undefined. 256 * 257 * @param action an action to perform if an exception occurs, not 258 * {@code null} 259 */ onError(Consumer<Throwable> action)260 void onError(Consumer<Throwable> action); 261 262 /** 263 * Registers an action to perform when the stream is closed. 264 * <p> 265 * If the stream is already closed, the action will be performed immediately 266 * in the current thread. 267 * 268 * @param action an action to perform after the stream is closed, not 269 * {@code null} 270 * @see #close() 271 */ onClose(Runnable action)272 void onClose(Runnable action); 273 274 /** 275 * Releases all resources associated with this stream. 276 * <p> 277 * If a stream is started, asynchronously or synchronously, it is stopped 278 * immediately or after the next flush. This method does <em>NOT</em> 279 * guarantee that all registered actions are completed before return. 280 * <p> 281 * Closing a previously closed stream has no effect. 282 */ 283 @Override close()284 void close(); 285 286 /** 287 * Unregisters an action. 288 * <p> 289 * If the action has been registered multiple times, all instances are 290 * unregistered. 291 * 292 * @param action the action to unregister, not {@code null} 293 * 294 * @return {@code true} if the action was unregistered, {@code false} 295 * otherwise 296 * 297 * @see #onEvent(Consumer) 298 * @see #onEvent(String, Consumer) 299 * @see #onFlush(Runnable) 300 * @see #onClose(Runnable) 301 * @see #onError(Consumer) 302 */ remove(Object action)303 boolean remove(Object action); 304 305 /** 306 * Specifies that the event object in an {@link #onEvent(Consumer)} action 307 * can be reused. 308 * <p> 309 * If reuse is set to {@code true}, an action should not keep a reference 310 * to the event object after the action has completed. 311 * 312 * @param reuse {@code true} if an event object can be reused, {@code false} 313 * otherwise 314 */ setReuse(boolean reuse)315 void setReuse(boolean reuse); 316 317 /** 318 * Specifies that events arrives in chronological order, sorted by the time 319 * they were committed to the stream. 320 * 321 * @param ordered if event objects arrive in chronological order to 322 * {@link #onEvent(Consumer)} 323 */ setOrdered(boolean ordered)324 void setOrdered(boolean ordered); 325 326 /** 327 * Specifies the start time of the stream. 328 * <p> 329 * The start time must be set before starting the stream 330 * 331 * @param startTime the start time, not {@code null} 332 * 333 * @throws IllegalStateException if the stream is already started 334 * 335 * @see #start() 336 * @see #startAsync() 337 */ setStartTime(Instant startTime)338 void setStartTime(Instant startTime); 339 340 /** 341 * Specifies the end time of the stream. 342 * <p> 343 * The end time must be set before starting the stream. 344 * <p> 345 * At end time, the stream is closed. 346 * 347 * @param endTime the end time, not {@code null} 348 * 349 * @throws IllegalStateException if the stream is already started 350 * 351 * @see #start() 352 * @see #startAsync() 353 */ setEndTime(Instant endTime)354 void setEndTime(Instant endTime); 355 356 /** 357 * Starts processing of actions. 358 * <p> 359 * Actions are performed in the current thread. 360 * <p> 361 * To stop the stream, use the {@link #close()} method. 362 * 363 * @throws IllegalStateException if the stream is already started or closed 364 */ start()365 void start(); 366 367 /** 368 * Starts asynchronous processing of actions. 369 * <p> 370 * Actions are performed in a single separate thread. 371 * <p> 372 * To stop the stream, use the {@link #close()} method. 373 * 374 * @throws IllegalStateException if the stream is already started or closed 375 */ startAsync()376 void startAsync(); 377 378 /** 379 * Blocks until all actions are completed, or the stream is closed, or the 380 * timeout occurs, or the current thread is interrupted, whichever happens 381 * first. 382 * 383 * @param timeout the maximum time to wait, not {@code null} 384 * 385 * @throws IllegalArgumentException if timeout is negative 386 * @throws InterruptedException if interrupted while waiting 387 * 388 * @see #start() 389 * @see #startAsync() 390 * @see Thread#interrupt() 391 */ awaitTermination(Duration timeout)392 void awaitTermination(Duration timeout) throws InterruptedException; 393 394 /** 395 * Blocks until all actions are completed, or the stream is closed, or the 396 * current thread is interrupted, whichever happens first. 397 * 398 * @throws InterruptedException if interrupted while waiting 399 * 400 * @see #start() 401 * @see #startAsync() 402 * @see Thread#interrupt() 403 */ awaitTermination()404 void awaitTermination() throws InterruptedException; 405 } 406