1 /*
2 tmp_file.c - write to and read from a temporary binary file
3 for fast storage plus added compression.
4
5 Copyright (C) 2017, 2018 Genome Research Ltd.
6
7 Author: Andrew Whitwham <aw7@sanger.ac.uk>
8
9 Permission is hereby granted, free of charge, to any person obtaining a copy
10 of this software and associated documentation files (the "Software"), to deal
11 in the Software without restriction, including without limitation the rights
12 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 copies of the Software, and to permit persons to whom the Software is
14 furnished to do so, subject to the following conditions:
15
16 The above copyright notice and this permission notice shall be included in
17 all copies or substantial portions of the Software.
18
19 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
22 THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
24 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25 DEALINGS IN THE SOFTWARE
26 */
27
28 #include <config.h>
29
30 #include <stdio.h>
31 #include <string.h>
32 #include <unistd.h>
33 #include <stdlib.h>
34 #include <stdarg.h>
35 #include <fcntl.h>
36 #include <errno.h>
37
38 #ifdef _WIN32
39 #include <windows.h>
40 #endif /* _WIN32 */
41
42 #include "tmp_file.h"
43 #include "htslib/sam.h"
44
45
/*
 * Prints a printf-style message to stderr, but only when the temp file
 * was set up with a non-zero verbose flag.  Does nothing otherwise.
 */
static void tmp_print_error(tmp_file_t *tmp, const char *fmt, ...) {
    va_list args;

    // quiet mode: swallow the message
    if (!tmp->verbose) {
        return;
    }

    va_start(args, fmt);
    vfprintf(stderr, fmt, args);
    va_end(args);
}
55
56
/*
 * Puts every field of tmp into its starting state and allocates the LZ4
 * compression stream, the ring buffer and the compressed-data buffer.
 * Pointers that are only filled in later (decode stream, file name, file
 * handle, dictionary) start out as NULL so that clean-up code never sees
 * an uninitialised value.
 * If any allocation fails, whatever was allocated is released again and
 * the corresponding pointers are reset to NULL.
 * Returns TMP_SAM_OK on success, TMP_SAM_MEM_ERROR on allocation failure.
 */
static int tmp_file_init(tmp_file_t *tmp, int verbose) {
    tmp->verbose          = verbose;
    tmp->data_size        = 0;
    tmp->group_size       = TMP_SAM_GROUP_SIZE;
    tmp->input_size       = 0;
    tmp->read_size        = 0;
    tmp->output_size      = 0;
    tmp->entry_number     = 0;
    tmp->offset           = 0;
    tmp->groups_written   = 0;
    tmp->max_data_size    = TMP_SAM_MAX_DATA + sizeof(bam1_t); // arbitrary but growable
    tmp->ring_buffer_size = TMP_SAM_RING_SIZE; // arbitrary (min 64K) but growable
    tmp->comp_buffer_size = LZ4_COMPRESSBOUND(tmp->max_data_size * tmp->group_size);

    // filled in later by the open/read functions
    tmp->dict    = NULL;
    tmp->dstream = NULL;
    tmp->name    = NULL;
    tmp->fp      = NULL;

    tmp->stream      = LZ4_createStream();
    tmp->ring_buffer = malloc(tmp->ring_buffer_size);
    tmp->ring_index  = tmp->ring_buffer;
    tmp->comp_buffer = malloc(tmp->comp_buffer_size);

    if (!tmp->ring_buffer || !tmp->comp_buffer || !tmp->stream) {
        tmp_print_error(tmp, "[tmp_file] Error: unable to allocate compression buffers.\n");

        // do not leak the allocations that did succeed
        if (tmp->stream) {
            LZ4_freeStream(tmp->stream);
        }

        free(tmp->ring_buffer);
        free(tmp->comp_buffer);

        tmp->stream      = NULL;
        tmp->ring_buffer = NULL;
        tmp->ring_index  = NULL;
        tmp->comp_buffer = NULL;

        return TMP_SAM_MEM_ERROR;
    }

    return TMP_SAM_OK;
}
83
84
85 /*
86 * Opens the temp file and initialises memory.
87 * Verbose mode prints out error messages to stderr.
88 * Returns 0 on success, a negative number on failure.
89 */
tmp_file_open_write(tmp_file_t * tmp,char * tmp_name,int verbose)90 int tmp_file_open_write(tmp_file_t *tmp, char *tmp_name, int verbose) {
91 int ret;
92 unsigned int count = 1;
93 const unsigned int max_count = 100000; // more tries than this then something else is wrong
94 int fd;
95
96 if ((ret = tmp_file_init(tmp, verbose))) {
97 return ret;
98 }
99
100 // make space to write extended file name
101 if ((tmp->name = malloc(strlen(tmp_name) + 7)) == NULL) {
102 tmp_print_error(tmp, "[tmp_file] Error: unable to allocate memory for %s.\n", tmp_name);
103 return TMP_SAM_MEM_ERROR;
104 }
105
106 // make sure temp file has a unique name
107 while (count < max_count) {
108 sprintf(tmp->name, "%s.%d", tmp_name, count);
109
110
111 #ifdef _WIN32
112 if ((fd = _open(tmp->name, O_RDWR|O_CREAT|O_EXCL|O_BINARY|O_TEMPORARY, 0600)) == -1) {
113 #else
114 if ((fd = open(tmp->name, O_RDWR|O_CREAT|O_EXCL, 0600)) == -1) {
115 #endif /* _WIN32 */
116
117 if (errno != EEXIST) {
118 tmp_print_error(tmp, "[tmp_file] Error: unable to create tmp file %s.\n", tmp->name);
119 return TMP_SAM_FILE_ERROR;
120 }
121
122 count++;
123 continue;
124 }
125
126 break;
127 }
128
129 if (count >= max_count) {
130 tmp_print_error(tmp, "[tmp_file] Error: unable to create unique temp file.\n");
131 return TMP_SAM_FILE_ERROR;
132 }
133
134 if ((tmp->fp = fdopen(fd, "w+b")) == NULL) {
135 tmp_print_error(tmp, "[tmp_file] Error: unable to open write file %s.\n", tmp->name);
136 return TMP_SAM_FILE_ERROR;
137 }
138
139 #ifndef _WIN32
140 unlink(tmp->name); // should auto delete when closed on linux
141 #endif
142
143 return TMP_SAM_OK;
144 }
145
146
147 /*
148 * The ring buffer stores precompressionn/post decompression data. LZ4 requires that
149 * previous data (64K worth) be available for efficient compression. This function grows
150 * the ring buffer when needed.
151 * Returns 0 on success, a negative number on failure.
152 */
153 static int tmp_file_grow_ring_buffer(tmp_file_t *tmp, size_t new_size) {
154 // save the dictionary so lz4 can continue to function
155 int dict_size = 64 * 1024; // 64K max size
156
157 if (tmp->groups_written) {
158 // if compression has been done then there is a dictionary to save
159
160 if (tmp->dict == NULL) {
161
162 if ((tmp->dict = malloc(sizeof(char) * dict_size)) == NULL) {
163 tmp_print_error(tmp, "[tmp_file] Error: unable to allocate memory for compression dictionary.\n");
164 return TMP_SAM_MEM_ERROR;
165 }
166 }
167
168 if (LZ4_saveDict(tmp->stream, tmp->dict, dict_size) == 0) {
169 tmp_print_error(tmp, "[tmp_file] Error: unable to save compression dictionary.\n");
170 return TMP_SAM_LZ4_ERROR;
171 }
172 }
173
174 if ((tmp->ring_buffer = realloc(tmp->ring_buffer, sizeof(char) * new_size)) == NULL) {
175 tmp_print_error(tmp, "[tmp_file] Error: unable to reallocate ring buffer.\n");
176 return TMP_SAM_MEM_ERROR;
177 }
178
179 tmp->ring_buffer_size = new_size;
180
181 return TMP_SAM_OK;
182 }
183
184
185 /*
186 * This does the actual compression and writing to a file. The file format consists of a
187 * single size_t for the size of the compressed data followed by the data itself.
188 * Returns 0 on success, a negative number on failure.
189 */
190 static int tmp_file_write_to_file(tmp_file_t *tmp) {
191 size_t comp_size;
192
193 if (tmp->input_size > tmp->max_data_size) {
194 tmp->max_data_size += tmp->input_size + sizeof(bam1_t);
195 tmp->comp_buffer_size = LZ4_COMPRESSBOUND(tmp->max_data_size);
196
197 if ((tmp->comp_buffer = realloc(tmp->comp_buffer, sizeof(char) * tmp->comp_buffer_size)) == NULL) {
198 tmp_print_error(tmp, "[tmp_file] Error: unable to reallocate compression buffer.\n");
199 return TMP_SAM_MEM_ERROR;
200 }
201
202 // make sure the ring buffer is big enough to accommodate the new max_data_size
203 if (tmp->ring_buffer_size < tmp->max_data_size * 5) {
204 int ret;
205 if ((ret = tmp_file_grow_ring_buffer(tmp, tmp->max_data_size * 5))) {
206 return ret;
207 }
208 }
209 }
210
211 tmp->ring_index = tmp->ring_buffer + tmp->offset;
212
213 comp_size = LZ4_compress_fast_continue(tmp->stream, (const char *)tmp->ring_index,
214 tmp->comp_buffer, tmp->input_size, tmp->comp_buffer_size, 1);
215
216 if (comp_size == 0) {
217 tmp_print_error(tmp, "[tmp_file] Error: compression failed.\n");
218 return TMP_SAM_LZ4_ERROR;
219 }
220
221 if (fwrite(&comp_size, sizeof(size_t), 1, tmp->fp) < 1) {
222 tmp_print_error(tmp, "[tmp_file] Error: tmp file write size failed.\n");
223 return TMP_SAM_FILE_ERROR;
224 }
225
226 if (fwrite(tmp->comp_buffer, sizeof(char), comp_size, tmp->fp) < comp_size) {
227 tmp_print_error(tmp, "[tmp_file] Error: tmp file write data failed.\n");
228 return TMP_SAM_FILE_ERROR;
229 }
230
231 tmp->offset += tmp->input_size;
232
233 if (tmp->offset >= tmp->ring_buffer_size - tmp->max_data_size)
234 tmp->offset = 0;
235
236 tmp->input_size = 0;
237 tmp->entry_number = 0;
238 tmp->groups_written++;
239
240 return TMP_SAM_OK;
241 }
242
243
244 /*
245 * Stores an in memory bam structure for writing and if enough are gathered together writes
246 * it to a file. Multiple alignments compress better that single ones though after a certain number
247 * there is a law of diminishing returns.
248 * Returns 0 on success, a negative number on failure.
249 */
250 int tmp_file_write(tmp_file_t *tmp, bam1_t *inbam) {
251
252 if ((tmp->offset + tmp->input_size + sizeof(bam1_t) + inbam->l_data) >= tmp->ring_buffer_size) {
253 int ret;
254
255 if ((ret = tmp_file_grow_ring_buffer(tmp, (tmp->offset + tmp->input_size + sizeof(bam1_t) + inbam->l_data) * 2))) {
256 tmp_print_error(tmp, "[tmp_file] Error: input line too big. (%ld).\n",
257 (tmp->input_size + inbam->l_data));
258
259 return ret;
260 }
261 }
262
263 tmp->ring_index = tmp->ring_buffer + tmp->offset + tmp->input_size;
264
265 // copy data into the ring buffer
266 memcpy(tmp->ring_index, inbam, sizeof(bam1_t));
267 memcpy(tmp->ring_index + sizeof(bam1_t) , inbam->data, inbam->l_data);
268 tmp->input_size += sizeof(bam1_t) + inbam->l_data;
269 tmp->entry_number++;
270
271 if (tmp->entry_number == tmp->group_size) {
272 // actually write out the data
273 int ret;
274
275 if ((ret = tmp_file_write_to_file(tmp))) {
276 return ret;
277 }
278 }
279
280 return TMP_SAM_OK;
281 }
282
283
284 /*
285 * Marks the end of file writing. Adds a size_t 0 to mark the end of
286 * the file. Companion function to tmp_file_begin_read below.
287 * Returns 0 on success, a negative number on failure.
288 */
289 int tmp_file_end_write(tmp_file_t *tmp) {
290 size_t terminator = 0;
291
292 if (tmp->entry_number) {
293 int ret;
294
295 if ((ret = tmp_file_write_to_file(tmp))) {
296 return ret;
297 }
298 }
299
300 if (fwrite(&terminator, sizeof(size_t), 1, tmp->fp) < 1) {
301 tmp_print_error(tmp, "[tmp_file] Error: tmp file write terminator failed.\n");
302 return TMP_SAM_FILE_ERROR;
303 }
304
305 fflush(tmp->fp);
306
307 LZ4_freeStream(tmp->stream);
308
309 return TMP_SAM_OK;
310 }
311
312
313 /*
314 * Prepares the file for reading.
315 * Companion function to tmp_file_end_write above.
316 * Returns 0 on success, a negative number on failure.
317 */
318 int tmp_file_begin_read(tmp_file_t *tmp) {
319
320 rewind(tmp->fp);
321
322 tmp->dstream = LZ4_createStreamDecode();
323 tmp->offset = 0;
324 tmp->entry_number = tmp->group_size;
325
326 if (!tmp->dstream) {
327 tmp_print_error(tmp, "[tmp_file] Error: unable to allocate compression stream.\n");
328 return TMP_SAM_MEM_ERROR;
329 }
330
331 return TMP_SAM_OK;
332 }
333
334
335 /*
336 * Read the next alignment, either from memory or from a file.
337 * Returns size of entry on success, 0 on end of file or a negative on error.
338 */
339 int tmp_file_read(tmp_file_t *tmp, bam1_t *inbam) {
340 int entry_size;
341 uint8_t *data = inbam->data;
342
343 /* while tmp_file_read assumes that the same bam1_t variable
344 is being used in each call, this may not be the case. So
345 default to the lowest memory size for safety. */
346 if (tmp->data_size > inbam->m_data) {
347 tmp->data_size = inbam->m_data;
348 }
349
350 if (tmp->entry_number == tmp->group_size) {
351 // read more data
352 size_t comp_size;
353
354 if (fread(&comp_size, sizeof(size_t), 1, tmp->fp) == 0 || comp_size == 0) {
355 return TMP_SAM_OK;
356 }
357
358 if (tmp->offset >= tmp->ring_buffer_size - tmp->max_data_size)
359 tmp->offset = 0;
360
361 tmp->ring_index = tmp->ring_buffer + tmp->offset;
362
363 if (fread(tmp->comp_buffer, sizeof(char), comp_size, tmp->fp) > comp_size) {
364 tmp_print_error(tmp, "[tmp_file] Error: error reading compressed data.\n");
365 return TMP_SAM_FILE_ERROR;
366 }
367
368 tmp->output_size = LZ4_decompress_safe_continue(tmp->dstream, tmp->comp_buffer,
369 (char *)tmp->ring_index, comp_size, tmp->max_data_size);
370
371 if (tmp->output_size == 0) {
372 tmp_print_error(tmp, "[tmp_file] Error: decompression failed.\n");
373 return TMP_SAM_LZ4_ERROR;
374 }
375
376 tmp->entry_number = 0;
377 tmp->read_size = 0;
378 }
379
380 tmp->ring_index = tmp->ring_buffer + tmp->offset;
381 memcpy(inbam, tmp->ring_index, sizeof(bam1_t));
382 inbam->data = data; // put the pointer to real bam data back
383
384 if ((unsigned int)inbam->l_data > tmp->data_size) {
385 uint8_t *tmp_data;
386 tmp->data_size = inbam->l_data; kroundup32(tmp->data_size);
387
388 if ((tmp_data = realloc(inbam->data, sizeof(uint8_t) * tmp->data_size)) == NULL) {
389 tmp_print_error(tmp, "[tmp_file] Error: unable to allocate tmp bam data memory.\n");
390 return TMP_SAM_MEM_ERROR;
391 }
392
393 inbam->data = tmp_data;
394 }
395
396 inbam->m_data = tmp->data_size; // set to the actual data size
397
398 entry_size = sizeof(bam1_t);
399
400 memcpy(inbam->data, tmp->ring_index + entry_size, inbam->l_data);
401 entry_size += inbam->l_data;
402
403 tmp->offset += entry_size;
404 tmp->read_size += entry_size;
405 tmp->entry_number++;
406
407 if (tmp->read_size > tmp->output_size) {
408 tmp_print_error(tmp, "[tmp_file] Error: wrong size of data returned RS:%ld OS:%ld EN:%ld GS:%ld.\n",
409 tmp->read_size, tmp->output_size, tmp->entry_number, tmp->group_size);
410 return TMP_SAM_LZ4_ERROR;
411 }
412
413 if (tmp->read_size == tmp->output_size && tmp->entry_number != tmp->group_size) {
414 // hopefully the last entries in the read file
415 tmp->entry_number = tmp->group_size;
416 }
417
418 return entry_size;
419 }
420
421
422 /*
423 * Frees up memory, closes the file and deletes it.
424 * Returns 0 on success or EOF on failure.
425 */
426 int tmp_file_destroy(tmp_file_t *tmp) {
427 int ret = 0;
428
429 ret = fclose(tmp->fp);
430
431 LZ4_freeStreamDecode(tmp->dstream);
432 free(tmp->ring_buffer);
433 free(tmp->comp_buffer);
434 free(tmp->name);
435 free(tmp->dict);
436
437 return ret;
438 }
439