/*
 * Copyright (c) 2020 SAP SE. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 *
 */

#include "precompiled.hpp"
#include "jvm.h"
#include "runtime/arguments.hpp"
#include "runtime/mutexLocker.hpp"
#include "runtime/os.hpp"
#include "runtime/thread.inline.hpp"
#include "services/heapDumperCompression.hpp"


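// Opens the file at _path for binary writing. Returns NULL on success or a
// static error string if the file could not be created.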
char const* FileWriter::open_writer() {
  assert(_fd < 0, "Must not already be open");

  _fd = os::create_binary_file(_path, _overwrite);

  if (_fd < 0) {
    return os::strerror(errno);
  }

  return NULL;
}

FileWriter::~FileWriter() {
  if (_fd >= 0) {
    os::close(_fd);
    _fd = -1;
  }
}

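// Writes 'size' bytes from 'buf' to the open file. Returns NULL on success
// or an error string if the write failed.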
char const* FileWriter::write_buf(char* buf, ssize_t size) {
  assert(_fd >= 0, "Must be open");
  assert(size > 0, "Must write at least one byte");

  ssize_t n = (ssize_t) os::write(_fd, buf, (uint) size);

  if (n <= 0) {
    return os::strerror(errno);
  }

  return NULL;
}


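// Entry points of the gzip compression code in the zip library
// (ZIP_GZip_InitParams and ZIP_GZip_Fully), resolved lazily on first use.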
typedef char const* (*GzipInitFunc)(size_t, size_t*, size_t*, int);
typedef size_t(*GzipCompressFunc)(char*, size_t, char*, size_t, char*, size_t,
                                  int, char*, char const**);

static GzipInitFunc gzip_init_func;
static GzipCompressFunc gzip_compress_func;

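// Loads the zip library from the VM's dll directory and looks up the given
// symbol, returning NULL if either step fails. Zip_lock serializes the
// dll operations.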
void* GZipCompressor::load_gzip_func(char const* name) {
  char path[JVM_MAXPATHLEN];
  char ebuf[1024];
  void* handle;
  MutexLocker locker(Zip_lock, Monitor::_no_safepoint_check_flag);

  if (os::dll_locate_lib(path, sizeof(path), Arguments::get_dll_dir(), "zip")) {
    handle = os::dll_load(path, ebuf, sizeof ebuf);

    if (handle != NULL) {
      return os::dll_lookup(handle, name);
    }
  }

  return NULL;
}

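// Resolves the gzip entry points if needed and queries the zip library for
// the output and temporary buffer sizes required for the given block size.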
char const* GZipCompressor::init(size_t block_size, size_t* needed_out_size,
                                 size_t* needed_tmp_size) {
  _block_size = block_size;
  _is_first = true;

  if (gzip_compress_func == NULL) {
    gzip_compress_func = (GzipCompressFunc) load_gzip_func("ZIP_GZip_Fully");

    if (gzip_compress_func == NULL) {
      return "Cannot get ZIP_GZip_Fully function";
    }
  }

  if (gzip_init_func == NULL) {
    gzip_init_func = (GzipInitFunc) load_gzip_func("ZIP_GZip_InitParams");

    if (gzip_init_func == NULL) {
      return "Cannot get ZIP_GZip_InitParams function";
    }
  }

  char const* result = gzip_init_func(block_size, needed_out_size,
                                      needed_tmp_size, _level);
  *needed_out_size += 1024; // Add extra space for the comment in the first chunk.

  return result;
}

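// Compresses one block. The first chunk carries the block size as a gzip
// comment, so a reader can pick suitable buffer sizes for decompression.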
char const* GZipCompressor::compress(char* in, size_t in_size, char* out, size_t out_size,
                                     char* tmp, size_t tmp_size, size_t* compressed_size) {
  char const* msg = NULL;

  if (_is_first) {
    char buf[128];
    // Write the block size used as a comment in the first gzip chunk, so the
    // code used to read it later can make a good choice of the buffer sizes it uses.
    jio_snprintf(buf, sizeof(buf), "HPROF BLOCKSIZE=" SIZE_FORMAT, _block_size);
    *compressed_size = gzip_compress_func(in, in_size, out, out_size, tmp, tmp_size, _level,
                                          buf, &msg);
    _is_first = false;
  } else {
    *compressed_size = gzip_compress_func(in, in_size, out, out_size, tmp, tmp_size, _level,
                                          NULL, &msg);
  }

  return msg;
}

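// The work list is a circular doubly-linked list using _head as a sentinel,
// so insertion and removal need no NULL checks.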
WorkList::WorkList() {
  _head._next = &_head;
  _head._prev = &_head;
}

void WorkList::insert(WriteWork* before, WriteWork* work) {
  work->_prev = before;
  work->_next = before->_next;
  before->_next = work;
  work->_next->_prev = work;
}

WriteWork* WorkList::remove(WriteWork* work) {
  if (work != NULL) {
    assert(work->_next != work, "Invalid next");
    assert(work->_prev != work, "Invalid prev");
    work->_prev->_next = work->_next;
    work->_next->_prev = work->_prev;
    work->_next = NULL;
    work->_prev = NULL;
  }

  return work;
}

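// Inserts the work item so the list stays sorted by ascending id.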
void WorkList::add_by_id(WriteWork* work) {
  if (is_empty()) {
    add_first(work);
  } else {
    WriteWork* last_curr = &_head;
    WriteWork* curr = _head._next;

    while (curr->_id < work->_id) {
      last_curr = curr;
      curr = curr->_next;

      if (curr == &_head) {
        add_last(work);
        return;
      }
    }

    insert(last_curr, work);
  }
}


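// The backend hands out fill buffers via get_new_buffer(), queues full
// buffers for compression and writes the results out in id order. Worker
// threads enter via thread_loop(); if none are present, the foreground
// thread does the compression itself.
//
// A minimal usage sketch (the writer/compressor arguments, has_more_data()
// and fill_data() are illustrative, not part of this file):
//
//   CompressionBackend backend(writer, compressor, block_size, max_waste);
//   char* buf = NULL;
//   size_t used = 0, max = 0;
//   backend.get_new_buffer(&buf, &used, &max);   // get the first buffer
//   while (buf != NULL && has_more_data()) {
//     used = fill_data(buf, max);                // fill the buffer
//     backend.get_new_buffer(&buf, &used, &max); // submit, get next buffer
//   }
//   backend.deactivate();                        // flush and shut down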
CompressionBackend::CompressionBackend(AbstractWriter* writer,
     AbstractCompressor* compressor, size_t block_size, size_t max_waste) :
  _active(false),
  _err(NULL),
  _nr_of_threads(0),
  _works_created(0),
  _work_creation_failed(false),
  _id_to_write(0),
  _next_id(0),
  _in_size(block_size),
  _max_waste(max_waste),
  _out_size(0),
  _tmp_size(0),
  _written(0),
  _writer(writer),
  _compressor(compressor),
  _lock(new (std::nothrow) PaddedMonitor(Mutex::leaf, "HProf Compression Backend",
                                         true, Mutex::_safepoint_check_never)) {
  if (_writer == NULL) {
    set_error("Could not allocate writer");
  } else if (_lock == NULL) {
    set_error("Could not allocate lock");
  } else {
    set_error(_writer->open_writer());
  }

  if (_compressor != NULL) {
    set_error(_compressor->init(_in_size, &_out_size, &_tmp_size));
  }

  _current = allocate_work(_in_size, _out_size, _tmp_size);

  if (_current == NULL) {
    set_error("Could not allocate memory for buffer");
  }

  _active = (_err == NULL);
}

CompressionBackend::~CompressionBackend() {
  assert(!_active, "Must not be active by now");
  assert(_nr_of_threads == 0, "Must have no active threads");
  assert(_to_compress.is_empty() && _finished.is_empty(), "Still work to do");

  free_work_list(&_unused);
  free_work(_current);
  assert(_works_created == 0, "All work must have been freed");

  delete _compressor;
  delete _writer;
  delete _lock;
}

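// Flushes the last partially filled buffer, drains the remaining compression
// work and marks the backend inactive.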
void CompressionBackend::deactivate() {
  assert(_active, "Must be active");

  MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag);

  // Make sure we write the last partially filled buffer.
  if ((_current != NULL) && (_current->_in_used > 0)) {
    _current->_id = _next_id++;
    _to_compress.add_last(_current);
    _current = NULL;
    ml.notify_all();
  }

  // Help the worker threads drain the compression work list.
  while (!_to_compress.is_empty()) {
    do_foreground_work();
  }

  _active = false;
  ml.notify_all();
}

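// Worker thread entry point: repeatedly takes a work item, compresses it and
// hands it off for writing, until the backend is deactivated.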
void CompressionBackend::thread_loop() {
  {
    MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag);
    _nr_of_threads++;
  }

  WriteWork* work;
  while ((work = get_work()) != NULL) {
    do_compress(work);
    finish_work(work);
  }

  MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag);
  _nr_of_threads--;
  assert(_nr_of_threads >= 0, "Too many threads finished");
}

void CompressionBackend::set_error(char const* new_error) {
  if ((new_error != NULL) && (_err == NULL)) {
    _err = new_error;
  }
}

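// Allocates a work object together with its input, output and temporary
// buffers. Returns NULL (and remembers the failure) if any allocation fails.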
WriteWork* CompressionBackend::allocate_work(size_t in_size, size_t out_size,
                                             size_t tmp_size) {
  WriteWork* result = (WriteWork*) os::malloc(sizeof(WriteWork), mtInternal);

  if (result == NULL) {
    _work_creation_failed = true;
    return NULL;
  }

  _works_created++;
  result->_in = (char*) os::malloc(in_size, mtInternal);
  result->_in_max = in_size;
  result->_in_used = 0;
  result->_out = NULL;
  result->_tmp = NULL;

  if (result->_in == NULL) {
    goto fail;
  }

  if (out_size > 0) {
    result->_out = (char*) os::malloc(out_size, mtInternal);
    result->_out_used = 0;
    result->_out_max = out_size;

    if (result->_out == NULL) {
      goto fail;
    }
  }

  if (tmp_size > 0) {
    result->_tmp = (char*) os::malloc(tmp_size, mtInternal);
    result->_tmp_max = tmp_size;

    if (result->_tmp == NULL) {
      goto fail;
    }
  }

  return result;

fail:
  free_work(result);
  _work_creation_failed = true;
  return NULL;
}

void CompressionBackend::free_work(WriteWork* work) {
  if (work != NULL) {
    os::free(work->_in);
    os::free(work->_out);
    os::free(work->_tmp);
    os::free(work);
    --_works_created;
  }
}

void CompressionBackend::free_work_list(WorkList* list) {
  while (!list->is_empty()) {
    free_work(list->remove_first());
  }
}

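// Takes one item from the compression queue and compresses and writes it on
// the current thread, releasing the backend lock while doing so.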
void CompressionBackend::do_foreground_work() {
  assert(!_to_compress.is_empty(), "Must have work to do");
  assert(_lock->owned_by_self(), "Must have the lock");

  WriteWork* work = _to_compress.remove_first();
  MutexUnlocker mu(_lock, Mutex::_no_safepoint_check_flag);
  do_compress(work);
  finish_work(work);
}

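// Blocks until compression work is available or the backend has been
// deactivated. Returns NULL when there is nothing left to do.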
WriteWork* CompressionBackend::get_work() {
  MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag);

  while (_active && _to_compress.is_empty()) {
    ml.wait();
  }

  return _to_compress.remove_first();
}

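// Accounts for the *used bytes the caller filled in and hands out a buffer to
// continue with. If at most _max_waste bytes of the current buffer would stay
// unused, it is queued for compression and a fresh buffer is handed out;
// otherwise the caller gets the unused remainder of the current buffer.
// Hands out a NULL buffer once the backend is inactive.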
void CompressionBackend::get_new_buffer(char** buffer, size_t* used, size_t* max) {
  if (_active) {
    MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag);

    if (*used > 0) {
      _current->_in_used += *used;

      // If the unused rest of the buffer is at most _max_waste bytes, queue
      // the buffer for compression. Otherwise hand out the rest of the buffer
      // as the new buffer.
      if (_current->_in_max - _current->_in_used <= _max_waste) {
        _current->_id = _next_id++;
        _to_compress.add_last(_current);
        _current = NULL;
        ml.notify_all();
      } else {
        *buffer = _current->_in + _current->_in_used;
        *used = 0;
        *max = _current->_in_max - _current->_in_used;

        return;
      }
    }

    while ((_current == NULL) && _unused.is_empty() && _active) {
      // Add more work objects if needed.
      if (!_work_creation_failed && (_works_created <= _nr_of_threads)) {
        WriteWork* work = allocate_work(_in_size, _out_size, _tmp_size);

        if (work != NULL) {
          _unused.add_first(work);
        }
      } else if (!_to_compress.is_empty() && (_nr_of_threads == 0)) {
        do_foreground_work();
      } else {
        ml.wait();
      }
    }

    if (_current == NULL) {
      _current = _unused.remove_first();
    }

    if (_current != NULL) {
      _current->_in_used = 0;
      _current->_out_used = 0;
      *buffer = _current->_in;
      *used = 0;
      *max = _current->_in_max;

      return;
    }
  }

  *buffer = NULL;
  *used = 0;
  *max = 0;

  return;
}

void CompressionBackend::do_compress(WriteWork* work) {
  if (_compressor != NULL) {
    char const* msg = _compressor->compress(work->_in, work->_in_used, work->_out,
                                            work->_out_max,
                                            work->_tmp, _tmp_size, &work->_out_used);

    if (msg != NULL) {
      MutexLocker ml(_lock, Mutex::_no_safepoint_check_flag);
      set_error(msg);
    }
  }
}

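// Queues the compressed work item and writes out all finished items that are
// next in id order, so the output stays sequential.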
void CompressionBackend::finish_work(WriteWork* work) {
  MonitorLocker ml(_lock, Mutex::_no_safepoint_check_flag);

  _finished.add_by_id(work);

  // Write out as many finished work items as possible, in id order.
  while (!_finished.is_empty() && (_finished.first()->_id == _id_to_write)) {
    WriteWork* to_write = _finished.remove_first();
    size_t size = _compressor == NULL ? to_write->_in_used : to_write->_out_used;
    char* p = _compressor == NULL ? to_write->_in : to_write->_out;
    char const* msg = NULL;

    if (_err == NULL) {
      _written += size;
      MutexUnlocker mu(_lock, Mutex::_no_safepoint_check_flag);
      msg = _writer->write_buf(p, (ssize_t) size);
    }

    set_error(msg);
    _unused.add_first(to_write);
    _id_to_write++;
  }

  ml.notify_all();
}