1 /*
2  * Copyright (c) 2020 SAP SE. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.
8  *
9  * This code is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * version 2 for more details (a copy is included in the LICENSE file that
13  * accompanied this code).
14  *
15  * You should have received a copy of the GNU General Public License version
16  * 2 along with this work; if not, write to the Free Software Foundation,
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18  *
19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20  * or visit www.oracle.com if you need additional information or have any
21  * questions.
22  *
23  */
24 
25 #include "precompiled.hpp"
26 #include "jvm.h"
27 #include "runtime/arguments.hpp"
28 #include "runtime/mutexLocker.hpp"
29 #include "runtime/os.hpp"
30 #include "runtime/thread.inline.hpp"
31 #include "services/heapDumperCompression.hpp"
32 
33 
open_writer()34 char const* FileWriter::open_writer() {
35   assert(_fd < 0, "Must not already be open");
36 
37   _fd = os::create_binary_file(_path, _overwrite);
38 
39   if (_fd < 0) {
40     return os::strerror(errno);
41   }
42 
43   return NULL;
44 }
45 
~FileWriter()46 FileWriter::~FileWriter() {
47   if (_fd >= 0) {
48     os::close(_fd);
49     _fd = -1;
50   }
51 }
52 
write_buf(char * buf,ssize_t size)53 char const* FileWriter::write_buf(char* buf, ssize_t size) {
54   assert(_fd >= 0, "Must be open");
55   assert(size > 0, "Must write at least one byte");
56 
57   ssize_t n = (ssize_t) os::write(_fd, buf, (uint) size);
58 
59   if (n <= 0) {
60     return os::strerror(errno);
61   }
62 
63   return NULL;
64 }
65 
66 
// Signature of the function looked up as "ZIP_GZip_InitParams". It is called
// with the block size, two out-parameters receiving the needed output and
// temporary buffer sizes, and the compression level. A non-NULL result is
// passed on as an error message.
typedef char const* (*GzipInitFunc)(size_t block_size, size_t* needed_out_size,
                                    size_t* needed_tmp_size, int level);

// Signature of the function looked up as "ZIP_GZip_Fully". It is called with
// the input, output and temporary buffers (each with its size), the
// compression level, an optional comment (NULL if none) and an out-parameter
// for an error message. The result is stored as the compressed size.
typedef size_t (*GzipCompressFunc)(char* in, size_t in_size, char* out, size_t out_size,
                                   char* tmp, size_t tmp_size, int level, char* comment,
                                   char const** msg);

// Both entry points stay NULL until they are resolved on first use.
static GzipInitFunc gzip_init_func;
static GzipCompressFunc gzip_compress_func;
73 
load_gzip_func(char const * name)74 void* GZipCompressor::load_gzip_func(char const* name) {
75   char path[JVM_MAXPATHLEN];
76   char ebuf[1024];
77   void* handle;
78   MutexLocker locker(Zip_lock, Monitor::_no_safepoint_check_flag);
79 
80   if (os::dll_locate_lib(path, sizeof(path), Arguments::get_dll_dir(), "zip")) {
81     handle = os::dll_load(path, ebuf, sizeof ebuf);
82 
83     if (handle != NULL) {
84       return os::dll_lookup(handle, name);
85     }
86   }
87 
88   return NULL;
89 }
90 
init(size_t block_size,size_t * needed_out_size,size_t * needed_tmp_size)91 char const* GZipCompressor::init(size_t block_size, size_t* needed_out_size,
92                                  size_t* needed_tmp_size) {
93   _block_size = block_size;
94   _is_first = true;
95 
96   if (gzip_compress_func == NULL) {
97     gzip_compress_func = (GzipCompressFunc) load_gzip_func("ZIP_GZip_Fully");
98 
99     if (gzip_compress_func == NULL) {
100       return "Cannot get ZIP_GZip_Fully function";
101     }
102   }
103 
104   if (gzip_init_func == NULL) {
105     gzip_init_func = (GzipInitFunc) load_gzip_func("ZIP_GZip_InitParams");
106 
107     if (gzip_init_func == NULL) {
108       return "Cannot get ZIP_GZip_InitParams function";
109     }
110   }
111 
112   char const* result = gzip_init_func(block_size, needed_out_size,
113                                       needed_tmp_size, _level);
114   *needed_out_size += 1024; // Add extra space for the comment in the first chunk.
115 
116   return result;
117 }
118 
compress(char * in,size_t in_size,char * out,size_t out_size,char * tmp,size_t tmp_size,size_t * compressed_size)119 char const* GZipCompressor::compress(char* in, size_t in_size, char* out, size_t out_size,
120                                      char* tmp, size_t tmp_size, size_t* compressed_size) {
121   char const* msg = NULL;
122 
123   if (_is_first) {
124     char buf[128];
125     // Write the block size used as a comment in the first gzip chunk, so the
126     // code used to read it later can make a good choice of the buffer sizes it uses.
127     jio_snprintf(buf, sizeof(buf), "HPROF BLOCKSIZE=" SIZE_FORMAT, _block_size);
128     *compressed_size = gzip_compress_func(in, in_size, out, out_size, tmp, tmp_size, _level,
129                                           buf, &msg);
130     _is_first = false;
131   } else {
132     *compressed_size = gzip_compress_func(in, in_size, out, out_size, tmp, tmp_size, _level,
133                                           NULL, &msg);
134   }
135 
136   return msg;
137 }
138 
WorkList()139 WorkList::WorkList() {
140   _head._next = &_head;
141   _head._prev = &_head;
142 }
143 
insert(WriteWork * before,WriteWork * work)144 void WorkList::insert(WriteWork* before, WriteWork* work) {
145   work->_prev = before;
146   work->_next = before->_next;
147   before->_next = work;
148   work->_next->_prev = work;
149 }
150 
remove(WriteWork * work)151 WriteWork* WorkList::remove(WriteWork* work) {
152   if (work != NULL) {
153     assert(work->_next != work, "Invalid next");
154     assert(work->_prev != work, "Invalid prev");
155     work->_prev->_next = work->_next;;
156     work->_next->_prev = work->_prev;
157     work->_next = NULL;
158     work->_prev = NULL;
159   }
160 
161   return work;
162 }
163 
add_by_id(WriteWork * work)164 void WorkList::add_by_id(WriteWork* work) {
165   if (is_empty()) {
166     add_first(work);
167   } else {
168     WriteWork* last_curr = &_head;
169     WriteWork* curr = _head._next;
170 
171     while (curr->_id < work->_id) {
172       last_curr = curr;
173       curr = curr->_next;
174 
175       if (curr == &_head) {
176         add_last(work);
177         return;
178       }
179     }
180 
181     insert(last_curr, work);
182   }
183 }
184 
185 
186 
CompressionBackend(AbstractWriter * writer,AbstractCompressor * compressor,size_t block_size,size_t max_waste)187 CompressionBackend::CompressionBackend(AbstractWriter* writer,
188      AbstractCompressor* compressor, size_t block_size, size_t max_waste) :
189   _active(false),
190   _err(NULL),
191   _nr_of_threads(0),
192   _works_created(0),
193   _work_creation_failed(false),
194   _id_to_write(0),
195   _next_id(0),
196   _in_size(block_size),
197   _max_waste(max_waste),
198   _out_size(0),
199   _tmp_size(0),
200   _written(0),
201   _writer(writer),
202   _compressor(compressor),
203   _lock(new (std::nothrow) PaddedMonitor(Mutex::leaf, "HProf Compression Backend",
204     true, Mutex::_safepoint_check_never)) {
205   if (_writer == NULL) {
206     set_error("Could not allocate writer");
207   } else if (_lock == NULL) {
208     set_error("Could not allocate lock");
209   } else {
210     set_error(_writer->open_writer());
211   }
212 
213   if (_compressor != NULL) {
214     set_error(_compressor->init(_in_size, &_out_size, &_tmp_size));
215   }
216 
217   _current = allocate_work(_in_size, _out_size, _tmp_size);
218 
219   if (_current == NULL) {
220     set_error("Could not allocate memory for buffer");
221   }
222 
223   _active = (_err == NULL);
224 }
225 
~CompressionBackend()226 CompressionBackend::~CompressionBackend() {
227   assert(!_active, "Must not be active by now");
228   assert(_nr_of_threads == 0, "Must have no active threads");
229   assert(_to_compress.is_empty() && _finished.is_empty(), "Still work to do");
230 
231   free_work_list(&_unused);
232   free_work(_current);
233   assert(_works_created == 0, "All work must have been freed");
234 
235   delete _compressor;
236   delete _writer;
237   delete _lock;
238 }
239 
deactivate()240 void CompressionBackend::deactivate() {
241   assert(_active, "Must be active");
242 
243   MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag);
244 
245   // Make sure we write the last partially filled buffer.
246   if ((_current != NULL) && (_current->_in_used > 0)) {
247     _current->_id = _next_id++;
248     _to_compress.add_last(_current);
249     _current = NULL;
250     ml.notify_all();
251   }
252 
253   // Wait for the threads to drain the compression work list and do some work yourself.
254   while (!_to_compress.is_empty()) {
255     do_foreground_work();
256   }
257 
258   _active = false;
259   ml.notify_all();
260 }
261 
thread_loop()262 void CompressionBackend::thread_loop() {
263   {
264     MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag);
265     _nr_of_threads++;
266   }
267 
268   WriteWork* work;
269   while ((work = get_work()) != NULL) {
270     do_compress(work);
271     finish_work(work);
272   }
273 
274   MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag);
275   _nr_of_threads--;
276   assert(_nr_of_threads >= 0, "Too many threads finished");
277 }
278 
set_error(char const * new_error)279 void CompressionBackend::set_error(char const* new_error) {
280   if ((new_error != NULL) && (_err == NULL)) {
281     _err = new_error;
282   }
283 }
284 
allocate_work(size_t in_size,size_t out_size,size_t tmp_size)285 WriteWork* CompressionBackend::allocate_work(size_t in_size, size_t out_size,
286                                              size_t tmp_size) {
287   WriteWork* result = (WriteWork*) os::malloc(sizeof(WriteWork), mtInternal);
288 
289   if (result == NULL) {
290     _work_creation_failed = true;
291     return NULL;
292   }
293 
294   _works_created++;
295   result->_in = (char*) os::malloc(in_size, mtInternal);
296   result->_in_max = in_size;
297   result->_in_used = 0;
298   result->_out = NULL;
299   result->_tmp = NULL;
300 
301   if (result->_in == NULL) {
302     goto fail;
303   }
304 
305   if (out_size > 0) {
306     result->_out = (char*) os::malloc(out_size, mtInternal);
307     result->_out_used = 0;
308     result->_out_max = out_size;
309 
310     if (result->_out == NULL) {
311       goto fail;
312     }
313   }
314 
315   if (tmp_size > 0) {
316     result->_tmp = (char*) os::malloc(tmp_size, mtInternal);
317     result->_tmp_max = tmp_size;
318 
319     if (result->_tmp == NULL) {
320       goto fail;
321     }
322   }
323 
324   return result;
325 
326 fail:
327   free_work(result);
328   _work_creation_failed = true;
329   return NULL;
330 }
331 
free_work(WriteWork * work)332 void CompressionBackend::free_work(WriteWork* work) {
333   if (work != NULL) {
334     os::free(work->_in);
335     os::free(work->_out);
336     os::free(work->_tmp);
337     os::free(work);
338     --_works_created;
339   }
340 }
341 
free_work_list(WorkList * list)342 void CompressionBackend::free_work_list(WorkList* list) {
343   while (!list->is_empty()) {
344     free_work(list->remove_first());
345   }
346 }
347 
do_foreground_work()348 void CompressionBackend::do_foreground_work() {
349   assert(!_to_compress.is_empty(), "Must have work to do");
350   assert(_lock->owned_by_self(), "Must have the lock");
351 
352   WriteWork* work = _to_compress.remove_first();
353   MutexUnlocker mu(_lock, Mutex::_no_safepoint_check_flag);
354   do_compress(work);
355   finish_work(work);
356 }
357 
get_work()358 WriteWork* CompressionBackend::get_work() {
359   MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag);
360 
361   while (_active && _to_compress.is_empty()) {
362     ml.wait();
363   }
364 
365   return _to_compress.remove_first();
366 }
367 
get_new_buffer(char ** buffer,size_t * used,size_t * max)368 void CompressionBackend::get_new_buffer(char** buffer, size_t* used, size_t* max) {
369   if (_active) {
370     MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag);
371 
372     if (*used > 0) {
373       _current->_in_used += *used;
374 
375       // Check if we do not waste more than _max_waste. If yes, write the buffer.
376       // Otherwise return the rest of the buffer as the new buffer.
377       if (_current->_in_max - _current->_in_used <= _max_waste) {
378         _current->_id = _next_id++;
379         _to_compress.add_last(_current);
380         _current = NULL;
381         ml.notify_all();
382       } else {
383         *buffer = _current->_in + _current->_in_used;
384         *used = 0;
385         *max = _current->_in_max - _current->_in_used;
386 
387         return;
388       }
389     }
390 
391     while ((_current == NULL) && _unused.is_empty() && _active) {
392       // Add more work objects if needed.
393       if (!_work_creation_failed && (_works_created <= _nr_of_threads)) {
394         WriteWork* work = allocate_work(_in_size, _out_size, _tmp_size);
395 
396         if (work != NULL) {
397           _unused.add_first(work);
398         }
399       } else if (!_to_compress.is_empty() && (_nr_of_threads == 0)) {
400         do_foreground_work();
401       } else {
402         ml.wait();
403       }
404     }
405 
406     if (_current == NULL) {
407       _current = _unused.remove_first();
408     }
409 
410     if (_current != NULL) {
411       _current->_in_used = 0;
412       _current->_out_used = 0;
413       *buffer = _current->_in;
414       *used = 0;
415       *max = _current->_in_max;
416 
417       return;
418     }
419   }
420 
421   *buffer = NULL;
422   *used = 0;
423   *max = 0;
424 
425   return;
426 }
427 
do_compress(WriteWork * work)428 void CompressionBackend::do_compress(WriteWork* work) {
429   if (_compressor != NULL) {
430     char const* msg = _compressor->compress(work->_in, work->_in_used, work->_out,
431                                             work->_out_max,
432     work->_tmp, _tmp_size, &work->_out_used);
433 
434     if (msg != NULL) {
435       MutexLocker ml(_lock, Mutex::_no_safepoint_check_flag);
436       set_error(msg);
437     }
438   }
439 }
440 
finish_work(WriteWork * work)441 void CompressionBackend::finish_work(WriteWork* work) {
442   MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag);
443 
444   _finished.add_by_id(work);
445 
446   // Write all finished works as far as we can.
447   while (!_finished.is_empty() && (_finished.first()->_id == _id_to_write)) {
448     WriteWork* to_write = _finished.remove_first();
449     size_t size = _compressor == NULL ? to_write->_in_used : to_write->_out_used;
450     char* p = _compressor == NULL ? to_write->_in : to_write->_out;
451     char const* msg = NULL;
452 
453     if (_err == NULL) {
454       _written += size;
455       MutexUnlocker mu(_lock, Mutex::_no_safepoint_check_flag);
456       msg = _writer->write_buf(p, (ssize_t) size);
457     }
458 
459     set_error(msg);
460     _unused.add_first(to_write);
461     _id_to_write++;
462   }
463 
464   ml.notify_all();
465 }
466