1 /*
2  * Copyright (c) 2020, 2021, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package jdk.management.jfr;
27 
28 import java.io.IOException;
29 import java.io.RandomAccessFile;
30 import java.nio.channels.FileChannel;
31 import java.nio.file.Files;
32 import java.nio.file.Path;
33 import java.nio.file.Paths;
34 import java.nio.file.StandardOpenOption;
35 import java.security.AccessControlContext;
36 import java.security.AccessController;
37 import java.time.Duration;
38 import java.time.Instant;
39 import java.util.ArrayList;
40 import java.util.Collections;
41 import java.util.HashMap;
42 import java.util.List;
43 import java.util.Map;
44 import java.util.Objects;
45 import java.util.concurrent.Future;
46 import java.util.function.Consumer;
47 import java.security.AccessControlException;
48 import javax.management.JMX;
49 import javax.management.MBeanServerConnection;
50 import javax.management.ObjectName;
51 
52 import jdk.jfr.Configuration;
53 import jdk.jfr.EventSettings;
54 import jdk.jfr.EventType;
55 import jdk.jfr.Recording;
56 import jdk.jfr.RecordingState;
57 import jdk.jfr.consumer.EventStream;
58 import jdk.jfr.consumer.MetadataEvent;
59 import jdk.jfr.consumer.RecordedEvent;
60 import jdk.jfr.consumer.RecordingStream;
61 import jdk.jfr.internal.management.EventSettingsModifier;
62 import jdk.jfr.internal.management.ManagementSupport;
63 import jdk.management.jfr.DiskRepository.DiskChunk;
64 import jdk.jfr.internal.management.EventByteStream;
65 
66 /**
67  * An implementation of an {@link EventStream} that can serialize events over
68  * the network using an {@link MBeanServerConnection}.
69  * <p>
70  * The following example shows how to record garbage collection pauses and CPU
71  * usage on a remote host and print the events to standard out.
72  *
73  * <pre>
74  *     {@literal
75  *     String host = "com.example";
76  *     int port = 4711;
77  *
78  *     String url = "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi";
79  *
80  *     JMXServiceURL u = new JMXServiceURL(url);
81  *     JMXConnector c = JMXConnectorFactory.connect(u);
82  *     MBeanServerConnection conn = c.getMBeanServerConnection();
83  *
84  *     try (var rs = new RemoteRecordingStream(conn)) {
85  *         rs.enable("jdk.GCPhasePause").withoutThreshold();
86  *         rs.enable("jdk.CPULoad").withPeriod(Duration.ofSeconds(1));
87  *         rs.onEvent("jdk.CPULoad", System.out::println);
88  *         rs.onEvent("jdk.GCPhasePause", System.out::println);
89  *         rs.start();
90  *     }
91  *     }
92  * </pre>
93  *
94  * @since 16
95  */
96 public final class RemoteRecordingStream implements EventStream {
97     private static final String ENABLED = "enabled";
98 
99     static final class RemoteSettings implements EventSettingsModifier {
100 
101         private final FlightRecorderMXBean mbean;
102         private final long recordingId;
103 
RemoteSettings(FlightRecorderMXBean mbean, long recordingId)104         RemoteSettings(FlightRecorderMXBean mbean, long recordingId) {
105             this.mbean = mbean;
106             this.recordingId = recordingId;
107         }
108 
109         @Override
with(String name, String value)110         public void with(String name, String value) {
111             Objects.requireNonNull(name);
112             Objects.requireNonNull(value);
113             // FlightRecorderMXBean implementation always returns
114             // new instance of Map so no need to create new here.
115             Map<String, String> newSettings = getEventSettings();
116             newSettings.put(name, value);
117             mbean.setRecordingSettings(recordingId, newSettings);
118         }
119 
120         @Override
toMap()121         public Map<String, String> toMap() {
122             return getEventSettings();
123         }
124 
getEventSettings()125         private Map<String, String> getEventSettings() {
126             return mbean.getRecordingSettings(recordingId);
127         }
128     }
129 
130     // Reference to stream is released when EventStream::close is called
131     static final class ChunkConsumer implements Consumer<Long> {
132 
133         private final DiskRepository repository;
134 
ChunkConsumer(DiskRepository repository)135         ChunkConsumer(DiskRepository repository) {
136             this.repository = repository;
137         }
138 
139         @Override
accept(Long endNanos)140         public void accept(Long endNanos) {
141             repository.onChunkComplete(endNanos);
142         }
143     }
144 
    // Object name used when creating the FlightRecorderMXBean proxy.
    private static final ObjectName OBJECT_NAME = MBeanUtils.createObjectName();

    // Local directory handed to both the event directory stream and the disk repository.
    final Path path;
    // Proxy for the FlightRecorderMXBean reached through the connection.
    final FlightRecorderMXBean mbean;
    // Id returned by mbean.newRecording() for the recording owned by this stream.
    final long recordingId;
    // Stream that all EventStream operations of this class are delegated to.
    final EventStream stream;
    // Access control context captured when the stream was constructed.
    @SuppressWarnings("removal")
    final AccessControlContext accessControllerContext;
    // Repository created on 'path'; receives chunk-complete notifications and limits.
    final DiskRepository repository;
    // Construction time; used in the recording name and the download thread name.
    final Instant creationTime;
    // Monitor taken by start(), startAsync(), close() and dump().
    final Object lock = new Object();
    // Last value passed to setStartTime(Instant).
    volatile Instant startTime;
    // Last value passed to setEndTime(Instant).
    volatile Instant endTime;
    // Set to true, under 'lock', by the first call to close().
    volatile boolean closed;
    private boolean started; // always guarded by lock
160 
    /**
     * Creates an event stream that operates against a {@link MBeanServerConnection}
     * that has a registered {@link FlightRecorderMXBean}.
     * <p>
     * Event data that is downloaded is stored in a temporary directory that is
     * created by this constructor.
     * <p>
     * To configure event settings, use {@link #setSettings(Map)}.
     *
     * @param connection the {@code MBeanServerConnection} where the
     *                   {@code FlightRecorderMXBean} is registered, not
     *                   {@code null}
     *
     * @throws IOException       if a stream can't be opened, an I/O error occurs
     *                           when trying to access the repository or the
     *                           {@code FlightRecorderMXBean}
     *
     * @throws SecurityException if a security manager exists and its
     *                           {@code checkRead} method denies read access to the
     *                           directory, or files in the directory.
     *
     * @throws NullPointerException if {@code connection} is {@code null}
     */
    public RemoteRecordingStream(MBeanServerConnection connection) throws IOException {
        // The temporary directory is created before delegating; 'true' is the
        // delete flag that is forwarded to the disk repository.
        this(connection, makeTempDirectory(), true);
    }
182 
    /**
     * Creates an event stream that operates against a {@link MBeanServerConnection}
     * that has a registered {@link FlightRecorderMXBean}.
     * <p>
     * To configure event settings, use {@link #setSettings(Map)}.
     *
     * @param connection the {@code MBeanServerConnection} where the
     *                   {@code FlightRecorderMXBean} is registered, not
     *                   {@code null}
     *
     * @param directory  the directory to store event data that is downloaded, not
     *                   {@code null}. The directory must already exist and be
     *                   readable and writable.
     *
     * @throws IOException       if a stream can't be opened, an I/O error occurs
     *                           when trying to access the repository or the
     *                           {@code FlightRecorderMXBean}, or if
     *                           {@code directory} doesn't exist or is not a
     *                           directory
     *
     * @throws SecurityException if a security manager exists and its
     *                           {@code checkRead} method denies read access to the
     *                           directory, or files in the directory.
     *
     * @throws NullPointerException if {@code connection} or {@code directory}
     *                           is {@code null}
     */
    public RemoteRecordingStream(MBeanServerConnection connection, Path directory) throws IOException {
        // 'false' is the delete flag that is forwarded to the disk repository.
        this(connection, directory, false);
    }
207 
208     @SuppressWarnings("removal")
RemoteRecordingStream(MBeanServerConnection connection, Path dir, boolean delete)209     private RemoteRecordingStream(MBeanServerConnection connection, Path dir, boolean delete) throws IOException {
210         Objects.requireNonNull(connection);
211         Objects.requireNonNull(dir);
212         accessControllerContext = AccessController.getContext();
213         // Make sure users can't implement malicious version of a Path object.
214         path = Paths.get(dir.toString());
215         if (!Files.exists(path)) {
216             throw new IOException("Download directory doesn't exist");
217         }
218 
219         if (!Files.isDirectory(path)) {
220             throw new IOException("Download location must be a directory");
221         }
222         checkFileAccess(path);
223         creationTime = Instant.now();
224         mbean = createProxy(connection);
225         recordingId = createRecording();
226         stream = ManagementSupport.newEventDirectoryStream(accessControllerContext, path, configurations(mbean));
227         stream.setStartTime(Instant.MIN);
228         repository = new DiskRepository(path, delete);
229         ManagementSupport.setOnChunkCompleteHandler(stream, new ChunkConsumer(repository));
230     }
231 
configurations(FlightRecorderMXBean mbean)232     private List<Configuration> configurations(FlightRecorderMXBean mbean) {
233         List<ConfigurationInfo> cis = mbean.getConfigurations();
234         List<Configuration> confs = new ArrayList<>(cis.size());
235         for (ConfigurationInfo ci : cis) {
236             confs.add(ManagementSupport.newConfiguration(ci.getName(), ci.getLabel(), ci.getDescription(),
237                     ci.getProvider(), ci.getSettings(), ci.getContents()));
238         }
239         return Collections.unmodifiableList(confs);
240     }
241 
242     @Override
onMetadata(Consumer<MetadataEvent> action)243     public void onMetadata(Consumer<MetadataEvent> action) {
244         stream.onMetadata(action);
245     }
246 
checkFileAccess(Path directory)247     private static void checkFileAccess(Path directory) throws IOException {
248         RandomAccessFile f = null;
249         try {
250             Path testFile = directory.resolve("test-access");
251             f = new RandomAccessFile(testFile.toFile(), "rw");
252             f.write(0);
253             f.seek(0);
254             f.read();
255             f.close();
256             Files.delete(testFile);
257         } catch (Exception e) {
258             closeSilently(f);
259             throw new IOException("Could not read/write/delete in directory" + directory + " :" + e.getMessage());
260         }
261     }
262 
closeSilently(RandomAccessFile f)263     private static void closeSilently(RandomAccessFile f) {
264         if (f == null) {
265             return;
266         }
267         try {
268             f.close();
269         } catch (IOException ioe) {
270             // ignore
271         }
272     }
273 
createProxy(MBeanServerConnection connection)274     private static FlightRecorderMXBean createProxy(MBeanServerConnection connection) throws IOException {
275         try {
276             return JMX.newMXBeanProxy(connection, OBJECT_NAME, FlightRecorderMXBean.class);
277         } catch (Exception e) {
278             throw new IOException("Could not create proxy for FlightRecorderMXBean: " + e.getMessage(), e);
279         }
280     }
281 
createRecording()282     private long createRecording() throws IOException {
283         try {
284             long id = mbean.newRecording();
285             Map<String, String> options = new HashMap<>();
286             options.put("name", EventByteStream.NAME + ": " + creationTime);
287             mbean.setRecordingOptions(id, options);
288             return id;
289         } catch (Exception e) {
290             throw new IOException("Could not create new recording: " + e.getMessage(), e);
291         }
292     }
293 
getRecordingOptions()294     private Map<String, String> getRecordingOptions() throws IOException {
295         try {
296             return mbean.getRecordingOptions(recordingId);
297         } catch (Exception e) {
298             throw new IOException("Could not get recording options: " + e.getMessage(), e);
299         }
300     }
301 
302     /**
303      * Replaces all settings for this recording stream.
304      * <p>
305      * The following example connects to a remote host and stream events using
306      * settings from the "default" configuration.
307      *
308      * <pre>
309      * {
310      *     {@literal
311      *
312      *     String host = "com.example";
313      *     int port = 4711;
314      *
315      *     String url = "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi";
316      *
317      *     JMXServiceURL u = new JMXServiceURL(url);
318      *     JMXConnector c = JMXConnectorFactory.connect(u);
319      *     MBeanServerConnection conn = c.getMBeanServerConnection();
320      *
321      *     try (final var rs = new RemoteRecordingStream(conn)) {
322      *         rs.onMetadata(e -> {
323      *             for (Configuration c : e.getConfigurations()) {
324      *                 if (c.getName().equals("default")) {
325      *                     rs.setSettings(c.getSettings());
326      *                 }
327      *             }
328      *         });
329      *         rs.onEvent(System.out::println);
330      *         rs.start();
331      *     }
332      *
333      * }
334      * </pre>
335      *
336      * @param settings the settings to set, not {@code null}
337      *
338      * @see Recording#setSettings(Map)
339      */
setSettings(Map<String, String> settings)340     public void setSettings(Map<String, String> settings) {
341         Objects.requireNonNull(settings);
342         try {
343             mbean.setRecordingSettings(recordingId, settings);
344         } catch (Exception e) {
345             ManagementSupport.logDebug(e.getMessage());
346             close();
347         }
348     };
349 
350     /**
351      * Disables event with the specified name.
352      * <p>
353      * If multiple events with same name (for example, the same class is loaded in
354      * different class loaders), then all events that match the name are disabled.
355      *
356      * @param name the settings for the event, not {@code null}
357      *
358      * @return an event setting for further configuration, not {@code null}
359      *
360      */
disable(String name)361     public EventSettings disable(String name) {
362         Objects.requireNonNull(name);
363         EventSettings s = ManagementSupport.newEventSettings(new RemoteSettings(mbean, recordingId));
364         try {
365             return s.with(name + "#" + ENABLED, "false");
366         } catch (Exception e) {
367             ManagementSupport.logDebug(e.getMessage());
368             close();
369             return s;
370         }
371     }
372 
373     /**
374      * Enables the event with the specified name.
375      * <p>
376      * If multiple events have the same name (for example, the same class is loaded
377      * in different class loaders), then all events that match the name are enabled.
378      *
379      * @param name the settings for the event, not {@code null}
380      *
381      * @return an event setting for further configuration, not {@code null}
382      *
383      * @see EventType
384      */
enable(String name)385     public EventSettings enable(String name) {
386         Objects.requireNonNull(name);
387         EventSettings s = ManagementSupport.newEventSettings(new RemoteSettings(mbean, recordingId));
388         try {
389             return s.with(name + "#" + ENABLED, "true");
390         } catch (Exception e) {
391             ManagementSupport.logDebug(e.getMessage());
392             close();
393             return s;
394         }
395     }
396 
397     /**
398      * Determines how far back data is kept for the stream.
399      * <p>
400      * To control the amount of recording data stored on disk, the maximum length of
401      * time to retain the data can be specified. Data stored on disk that is older
402      * than the specified length of time is removed by the Java Virtual Machine
403      * (JVM).
404      * <p>
405      * If neither maximum limit or the maximum age is set, the size of the recording
406      * may grow indefinitely if events are not consumed.
407      *
408      * @param maxAge the length of time that data is kept, or {@code null} if
409      *               infinite
410      *
411      * @throws IllegalArgumentException if {@code maxAge} is negative
412      *
413      * @throws IllegalStateException    if the recording is in the {@code CLOSED}
414      *                                  state
415      */
setMaxAge(Duration maxAge)416     public void setMaxAge(Duration maxAge) {
417         Objects.requireNonNull(maxAge);
418         repository.setMaxAge(maxAge);
419     }
420 
421     /**
422      * Determines how much data is kept for the stream.
423      * <p>
424      * To control the amount of recording data that is stored on disk, the maximum
425      * amount of data to retain can be specified. When the maximum limit is
426      * exceeded, the Java Virtual Machine (JVM) removes the oldest chunk to make
427      * room for a more recent chunk.
428      * <p>
429      * If neither maximum limit or the maximum age is set, the size of the recording
430      * may grow indefinitely if events are not consumed.
431      * <p>
432      * The size is measured in bytes.
433      *
434      * @param maxSize the amount of data to retain, {@code 0} if infinite
435      *
436      * @throws IllegalArgumentException if {@code maxSize} is negative
437      *
438      * @throws IllegalStateException    if the recording is in {@code CLOSED} state
439      */
setMaxSize(long maxSize)440     public void setMaxSize(long maxSize) {
441         if (maxSize < 0) {
442             throw new IllegalArgumentException("Max size of recording can't be negative");
443         }
444         repository.setMaxSize(maxSize);
445     }
446 
447     @Override
onEvent(Consumer<RecordedEvent> action)448     public void onEvent(Consumer<RecordedEvent> action) {
449         stream.onEvent(action);
450     }
451 
452     @Override
onEvent(String eventName, Consumer<RecordedEvent> action)453     public void onEvent(String eventName, Consumer<RecordedEvent> action) {
454         stream.onEvent(eventName, action);
455     }
456 
457     @Override
onFlush(Runnable action)458     public void onFlush(Runnable action) {
459         stream.onFlush(action);
460     }
461 
462     @Override
onError(Consumer<Throwable> action)463     public void onError(Consumer<Throwable> action) {
464         stream.onError(action);
465     }
466 
467     @Override
onClose(Runnable action)468     public void onClose(Runnable action) {
469         stream.onClose(action);
470     }
471 
472     @Override
close()473     public void close() {
474         synchronized (lock) { // ensure one closer
475             if (closed) {
476                 return;
477             }
478             closed = true;
479         }
480         ManagementSupport.setOnChunkCompleteHandler(stream, null);
481         stream.close();
482         try {
483             mbean.closeRecording(recordingId);
484         } catch (IOException e) {
485             ManagementSupport.logDebug(e.getMessage());
486         }
487         try {
488             repository.close();
489         } catch (IOException e) {
490             ManagementSupport.logDebug(e.getMessage());
491         }
492     }
493 
494     @Override
remove(Object action)495     public boolean remove(Object action) {
496         return stream.remove(action);
497     }
498 
499     @Override
setReuse(boolean reuse)500     public void setReuse(boolean reuse) {
501         stream.setReuse(reuse);
502     }
503 
504     @Override
setOrdered(boolean ordered)505     public void setOrdered(boolean ordered) {
506         stream.setOrdered(ordered);
507     }
508 
509     @Override
setStartTime(Instant startTime)510     public void setStartTime(Instant startTime) {
511         stream.setStartTime(startTime);
512         this.startTime = startTime;
513     }
514 
515     @Override
setEndTime(Instant endTime)516     public void setEndTime(Instant endTime) {
517         stream.setEndTime(endTime);
518         this.endTime = endTime;
519     }
520 
521     @Override
start()522     public void start() {
523         synchronized (lock) { // ensure one starter
524             ensureStartable();
525             try {
526                 try {
527                     mbean.startRecording(recordingId);
528                 } catch (IllegalStateException ise) {
529                     throw ise;
530                 }
531                 startDownload();
532             } catch (Exception e) {
533                 ManagementSupport.logDebug(e.getMessage());
534                 close();
535                 return;
536             }
537             stream.start();
538             started = true;
539         }
540     }
541 
542     @Override
startAsync()543     public void startAsync() {
544         synchronized (lock) { // ensure one starter
545             ensureStartable();
546             stream.startAsync();
547             try {
548                 mbean.startRecording(recordingId);
549                 startDownload();
550             } catch (Exception e) {
551                 ManagementSupport.logDebug(e.getMessage());
552                 close();
553             }
554             started = true;
555         }
556     }
557 
ensureStartable()558     private void ensureStartable() {
559         if (closed) {
560             throw new IllegalStateException("Event stream is closed");
561         }
562         if (started) {
563             throw new IllegalStateException("Event stream can only be started once");
564         }
565     }
566 
567     /**
568      * Writes recording data to a file.
569      * <p>
570      * The recording stream must be started, but not closed.
571      * <p>
572      * It's highly recommended that a max age or max size is set before
573      * starting the stream. Otherwise, the dump may not contain any events.
574      *
575      * @param destination the location where recording data is written, not
576      *        {@code null}
577      *
578      * @throws IOException if the recording data can't be copied to the specified
579      *         location, or if the stream is closed, or not started.
580      *
581      * @throws SecurityException if a security manager exists and the caller doesn't
582      *         have {@code FilePermission} to write to the destination path
583      *
584      * @see RemoteRecordingStream#setMaxAge(Duration)
585      * @see RemoteRecordingStream#setMaxSize(long)
586      *
587      * @since 17
588      */
dump(Path destination)589     public void dump(Path destination) throws IOException {
590         Objects.requireNonNull(destination);
591         long id = -1;
592         try {
593             FileDump fileDump;
594             synchronized (lock) { // ensure running state while preparing dump
595                 if (closed) {
596                     throw new IOException("Recording stream has been closed, no content to write");
597                 }
598                 if (!started) {
599                     throw new IOException("Recording stream has not been started, no content to write");
600                 }
601                 // Take repository lock to prevent new data to be flushed
602                 // client-side after clone has been created on the server.
603                 synchronized (repository) {
604                     id = mbean.cloneRecording(recordingId, true);
605                     RecordingInfo ri = getRecordingInfo(mbean.getRecordings(), id);
606                     fileDump = repository.newDump(ri.getStopTime());
607                 }
608             }
609             // Write outside lock
610             fileDump.write(destination);
611         } catch (IOException ioe) {
612             throw ioe;
613         } catch (Exception e) {
614             ManagementSupport.logDebug(e.getMessage());
615             close();
616         } finally {
617             if (id != -1) {
618                 try {
619                     mbean.closeRecording(id);
620                 } catch (Exception e) {
621                     ManagementSupport.logDebug(e.getMessage());
622                     close();
623                 }
624             }
625         }
626     }
627 
getRecordingInfo(List<RecordingInfo> infos, long id)628     private RecordingInfo getRecordingInfo(List<RecordingInfo> infos, long id) throws IOException {
629         for (RecordingInfo info : infos) {
630             if (info.getId() == id) {
631                 return info;
632             }
633         }
634         throw new IOException("Unable to find id of dumped recording");
635     }
636 
637     @Override
awaitTermination(Duration timeout)638     public void awaitTermination(Duration timeout) throws InterruptedException {
639         stream.awaitTermination(timeout);
640     }
641 
642     @Override
awaitTermination()643     public void awaitTermination() throws InterruptedException {
644         stream.awaitTermination();
645     }
646 
makeTempDirectory()647     private static Path makeTempDirectory() throws IOException {
648         return Files.createTempDirectory("jfr-streaming");
649     }
650 
startDownload()651     private void startDownload() {
652         String name = "JFR: Download Thread " + creationTime;
653         Thread downLoadThread = new DownLoadThread(this, name);
654         downLoadThread.start();
655     }
656 
isClosed()657     boolean isClosed() {
658         return closed;
659     }
660 }
661