1 /*
2     tmp_file.c - write to and read from a temporary binary file
3     for fast storage plus added compression.
4 
5     Copyright (C) 2017, 2018 Genome Research Ltd.
6 
7     Author: Andrew Whitwham <aw7@sanger.ac.uk>
8 
9 Permission is hereby granted, free of charge, to any person obtaining a copy
10 of this software and associated documentation files (the "Software"), to deal
11 in the Software without restriction, including without limitation the rights
12 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 copies of the Software, and to permit persons to whom the Software is
14 furnished to do so, subject to the following conditions:
15 
16 The above copyright notice and this permission notice shall be included in
17 all copies or substantial portions of the Software.
18 
19 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
22 THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
24 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25 DEALINGS IN THE SOFTWARE
26 */
27 
28 #include <config.h>
29 
30 #include <stdio.h>
31 #include <string.h>
32 #include <unistd.h>
33 #include <stdlib.h>
34 #include <stdarg.h>
35 #include <fcntl.h>
36 #include <errno.h>
37 
38 #ifdef _WIN32
39 #include <windows.h>
40 #endif /* _WIN32 */
41 
42 #include "tmp_file.h"
43 #include "htslib/sam.h"
44 
45 
/*
 * Writes a printf-style formatted message to stderr, but only when the
 * tmp_file_t has its verbose flag set.  Does nothing otherwise.
 */
static void tmp_print_error(tmp_file_t *tmp, const char *fmt, ...) {
    va_list args;

    // quiet mode: swallow the message
    if (!tmp->verbose) {
        return;
    }

    va_start(args, fmt);
    vfprintf(stderr, fmt, args);
    va_end(args);
}
55 
56 
/*
 * Puts a tmp_file_t into its starting state: counters zeroed, sizes set to
 * their defaults, and the LZ4 compression stream, ring buffer and compression
 * buffer allocated.  Pointers that are only filled in later (fp, name, dict,
 * dstream) are set to NULL so that they never hold indeterminate values.
 *
 * If any allocation fails, everything that was allocated here is released
 * again and the pointers are reset to NULL, so nothing is leaked.
 *
 * Returns TMP_SAM_OK on success, TMP_SAM_MEM_ERROR on allocation failure.
 */
static int tmp_file_init(tmp_file_t *tmp, int verbose) {
    // set first so that tmp_print_error below honours the caller's choice
    tmp->verbose = verbose;

    tmp->fp      = NULL;
    tmp->name    = NULL;
    tmp->dict    = NULL;
    tmp->dstream = NULL;

    tmp->data_size      = 0;
    tmp->group_size     = TMP_SAM_GROUP_SIZE;
    tmp->input_size     = 0;
    tmp->read_size      = 0;
    tmp->output_size    = 0;
    tmp->entry_number   = 0;
    tmp->offset         = 0;
    tmp->groups_written = 0;

    tmp->max_data_size    = TMP_SAM_MAX_DATA + sizeof(bam1_t); // arbitrary but growable
    tmp->ring_buffer_size = TMP_SAM_RING_SIZE; // arbitrary (min 64K) but growable
    tmp->comp_buffer_size = LZ4_COMPRESSBOUND(tmp->max_data_size * tmp->group_size);

    tmp->stream      = LZ4_createStream();
    tmp->ring_buffer = malloc(sizeof(uint8_t) * tmp->ring_buffer_size);
    tmp->ring_index  = tmp->ring_buffer;
    tmp->comp_buffer = malloc(tmp->comp_buffer_size);

    if (!tmp->ring_buffer || !tmp->comp_buffer || !tmp->stream) {
        tmp_print_error(tmp, "[tmp_file] Error: unable to allocate compression buffers.\n");

        // release whichever of the three allocations did succeed
        if (tmp->stream) {
            LZ4_freeStream(tmp->stream);
        }

        free(tmp->ring_buffer);
        free(tmp->comp_buffer);

        tmp->stream      = NULL;
        tmp->ring_buffer = NULL;
        tmp->ring_index  = NULL;
        tmp->comp_buffer = NULL;

        return TMP_SAM_MEM_ERROR;
    }

    return TMP_SAM_OK;
}
83 
84 
85 /*
86  * Opens the temp file and initialises memory.
87  * Verbose mode prints out error messages to stderr.
88  * Returns 0 on success, a negative number on failure.
89  */
tmp_file_open_write(tmp_file_t * tmp,char * tmp_name,int verbose)90 int tmp_file_open_write(tmp_file_t *tmp, char *tmp_name, int verbose) {
91     int ret;
92     unsigned int count = 1;
93     const unsigned int max_count = 100000; // more tries than this then something else is wrong
94     int fd;
95 
96     if ((ret = tmp_file_init(tmp, verbose))) {
97         return ret;
98     }
99 
100     // make space to write extended file name
101     if ((tmp->name = malloc(strlen(tmp_name) + 7)) == NULL) {
102         tmp_print_error(tmp, "[tmp_file] Error: unable to allocate memory for %s.\n", tmp_name);
103         return TMP_SAM_MEM_ERROR;
104     }
105 
106     // make sure temp file has a unique name
107     while (count < max_count) {
108         sprintf(tmp->name, "%s.%d", tmp_name, count);
109 
110 
111         #ifdef _WIN32
112         if ((fd = _open(tmp->name, O_RDWR|O_CREAT|O_EXCL|O_BINARY|O_TEMPORARY, 0600)) == -1) {
113         #else
114         if ((fd = open(tmp->name, O_RDWR|O_CREAT|O_EXCL, 0600)) == -1) {
115         #endif /* _WIN32 */
116 
117             if (errno != EEXIST) {
118                 tmp_print_error(tmp, "[tmp_file] Error: unable to create tmp file %s.\n", tmp->name);
119                 return TMP_SAM_FILE_ERROR;
120             }
121 
122             count++;
123             continue;
124         }
125 
126         break;
127     }
128 
129     if (count >= max_count) {
130         tmp_print_error(tmp, "[tmp_file] Error: unable to create unique temp file.\n");
131         return TMP_SAM_FILE_ERROR;
132     }
133 
134     if ((tmp->fp = fdopen(fd, "w+b")) == NULL) {
135         tmp_print_error(tmp, "[tmp_file] Error: unable to open write file %s.\n", tmp->name);
136         return TMP_SAM_FILE_ERROR;
137     }
138 
139     #ifndef _WIN32
140     unlink(tmp->name); // should auto delete when closed on linux
141     #endif
142 
143     return TMP_SAM_OK;
144 }
145 
146 
147 /*
148  * The ring buffer stores precompressionn/post decompression data.  LZ4 requires that
149  * previous data (64K worth) be available for efficient compression.  This function grows
150  * the ring buffer when needed.
151  * Returns 0 on success, a negative number on failure.
152  */
153 static int tmp_file_grow_ring_buffer(tmp_file_t *tmp, size_t new_size) {
154     // save the dictionary so lz4 can continue to function
155     int dict_size = 64 * 1024; // 64K max size
156 
157     if (tmp->groups_written) {
158         // if compression has been done then there is a dictionary to save
159 
160         if (tmp->dict == NULL) {
161 
162             if ((tmp->dict = malloc(sizeof(char) * dict_size)) == NULL) {
163                 tmp_print_error(tmp, "[tmp_file] Error: unable to allocate memory for compression dictionary.\n");
164                 return TMP_SAM_MEM_ERROR;
165             }
166         }
167 
168         if (LZ4_saveDict(tmp->stream, tmp->dict, dict_size) == 0) {
169             tmp_print_error(tmp, "[tmp_file] Error: unable to save compression dictionary.\n");
170             return TMP_SAM_LZ4_ERROR;
171         }
172     }
173 
174     if ((tmp->ring_buffer = realloc(tmp->ring_buffer, sizeof(char) * new_size)) == NULL) {
175         tmp_print_error(tmp, "[tmp_file] Error: unable to reallocate ring buffer.\n");
176         return TMP_SAM_MEM_ERROR;
177     }
178 
179     tmp->ring_buffer_size = new_size;
180 
181     return TMP_SAM_OK;
182 }
183 
184 
185 /*
186  * This does the actual compression and writing to a file.  The file format consists of a
187  * single size_t for the size of the compressed data followed by the data itself.
188  * Returns 0 on success, a negative number on failure.
189  */
190 static int tmp_file_write_to_file(tmp_file_t *tmp) {
191     size_t comp_size;
192 
193     if (tmp->input_size > tmp->max_data_size) {
194         tmp->max_data_size += tmp->input_size + sizeof(bam1_t);
195         tmp->comp_buffer_size = LZ4_COMPRESSBOUND(tmp->max_data_size);
196 
197         if ((tmp->comp_buffer = realloc(tmp->comp_buffer, sizeof(char) * tmp->comp_buffer_size)) == NULL) {
198             tmp_print_error(tmp, "[tmp_file] Error: unable to reallocate compression buffer.\n");
199             return TMP_SAM_MEM_ERROR;
200         }
201 
202         // make sure the ring buffer is big enough to accommodate the new max_data_size
203         if (tmp->ring_buffer_size < tmp->max_data_size * 5) {
204             int ret;
205             if ((ret = tmp_file_grow_ring_buffer(tmp, tmp->max_data_size * 5))) {
206                 return ret;
207             }
208         }
209     }
210 
211     tmp->ring_index = tmp->ring_buffer + tmp->offset;
212 
213     comp_size = LZ4_compress_fast_continue(tmp->stream, (const char *)tmp->ring_index,
214                    tmp->comp_buffer, tmp->input_size, tmp->comp_buffer_size, 1);
215 
216     if (comp_size == 0) {
217         tmp_print_error(tmp, "[tmp_file] Error: compression failed.\n");
218         return TMP_SAM_LZ4_ERROR;
219     }
220 
221     if (fwrite(&comp_size, sizeof(size_t), 1, tmp->fp) < 1) {
222         tmp_print_error(tmp, "[tmp_file] Error: tmp file write size failed.\n");
223         return TMP_SAM_FILE_ERROR;
224     }
225 
226     if (fwrite(tmp->comp_buffer, sizeof(char), comp_size, tmp->fp) < comp_size) {
227         tmp_print_error(tmp, "[tmp_file] Error: tmp file write data failed.\n");
228         return TMP_SAM_FILE_ERROR;
229     }
230 
231     tmp->offset += tmp->input_size;
232 
233     if (tmp->offset >= tmp->ring_buffer_size - tmp->max_data_size)
234         tmp->offset = 0;
235 
236     tmp->input_size = 0;
237     tmp->entry_number = 0;
238     tmp->groups_written++;
239 
240     return TMP_SAM_OK;
241 }
242 
243 
244 /*
245  * Stores an in memory bam structure for writing and if enough are gathered together writes
246  * it to a file.  Multiple alignments compress better that single ones though after a certain number
247  * there is a law of diminishing returns.
248  * Returns 0 on success, a negative number on failure.
249  */
250 int tmp_file_write(tmp_file_t *tmp, bam1_t *inbam) {
251 
252     if ((tmp->offset + tmp->input_size + sizeof(bam1_t) + inbam->l_data) >= tmp->ring_buffer_size) {
253         int ret;
254 
255         if ((ret = tmp_file_grow_ring_buffer(tmp, (tmp->offset + tmp->input_size + sizeof(bam1_t) + inbam->l_data) * 2))) {
256             tmp_print_error(tmp, "[tmp_file] Error: input line too big. (%ld).\n",
257                 (tmp->input_size + inbam->l_data));
258 
259             return ret;
260         }
261     }
262 
263     tmp->ring_index = tmp->ring_buffer + tmp->offset + tmp->input_size;
264 
265     // copy data into the ring buffer
266     memcpy(tmp->ring_index, inbam, sizeof(bam1_t));
267     memcpy(tmp->ring_index + sizeof(bam1_t) , inbam->data, inbam->l_data);
268     tmp->input_size += sizeof(bam1_t) + inbam->l_data;
269     tmp->entry_number++;
270 
271     if (tmp->entry_number == tmp->group_size) {
272         // actually write out the data
273         int ret;
274 
275         if ((ret = tmp_file_write_to_file(tmp))) {
276             return ret;
277         }
278     }
279 
280     return TMP_SAM_OK;
281 }
282 
283 
284 /*
285  * Marks the end of file writing.  Adds a size_t 0 to mark the end of
286  * the file. Companion function to tmp_file_begin_read below.
287  * Returns 0 on success, a negative number on failure.
288  */
289 int tmp_file_end_write(tmp_file_t *tmp) {
290     size_t terminator = 0;
291 
292     if (tmp->entry_number) {
293         int ret;
294 
295         if ((ret = tmp_file_write_to_file(tmp))) {
296             return ret;
297         }
298     }
299 
300     if (fwrite(&terminator, sizeof(size_t), 1, tmp->fp) < 1) {
301         tmp_print_error(tmp, "[tmp_file] Error: tmp file write terminator failed.\n");
302         return TMP_SAM_FILE_ERROR;
303     }
304 
305     fflush(tmp->fp);
306 
307     LZ4_freeStream(tmp->stream);
308 
309     return TMP_SAM_OK;
310 }
311 
312 
313 /*
314  * Prepares the file for reading.
315  * Companion function to tmp_file_end_write above.
316  * Returns 0 on success, a negative number on failure.
317  */
318 int tmp_file_begin_read(tmp_file_t *tmp) {
319 
320     rewind(tmp->fp);
321 
322     tmp->dstream = LZ4_createStreamDecode();
323     tmp->offset  = 0;
324     tmp->entry_number = tmp->group_size;
325 
326     if (!tmp->dstream) {
327         tmp_print_error(tmp, "[tmp_file] Error: unable to allocate compression stream.\n");
328         return TMP_SAM_MEM_ERROR;
329     }
330 
331     return TMP_SAM_OK;
332 }
333 
334 
335 /*
336  * Read the next alignment, either from memory or from a file.
337  * Returns size of entry on success, 0 on end of file or a negative on error.
338  */
339 int tmp_file_read(tmp_file_t *tmp, bam1_t *inbam) {
340     int entry_size;
341     uint8_t *data = inbam->data;
342 
343     /* while tmp_file_read assumes that the same bam1_t variable
344        is being used in each call, this may not be the case. So
345        default to the lowest memory size for safety. */
346     if (tmp->data_size > inbam->m_data) {
347         tmp->data_size = inbam->m_data;
348     }
349 
350     if (tmp->entry_number == tmp->group_size) {
351         // read more data
352         size_t comp_size;
353 
354         if (fread(&comp_size, sizeof(size_t), 1, tmp->fp) == 0 || comp_size == 0) {
355             return TMP_SAM_OK;
356         }
357 
358         if  (tmp->offset >= tmp->ring_buffer_size - tmp->max_data_size)
359             tmp->offset = 0;
360 
361         tmp->ring_index = tmp->ring_buffer + tmp->offset;
362 
363         if (fread(tmp->comp_buffer, sizeof(char), comp_size, tmp->fp) > comp_size) {
364             tmp_print_error(tmp, "[tmp_file] Error: error reading compressed data.\n");
365             return TMP_SAM_FILE_ERROR;
366         }
367 
368         tmp->output_size = LZ4_decompress_safe_continue(tmp->dstream, tmp->comp_buffer,
369                         (char *)tmp->ring_index, comp_size, tmp->max_data_size);
370 
371         if (tmp->output_size == 0) {
372             tmp_print_error(tmp, "[tmp_file] Error: decompression failed.\n");
373             return TMP_SAM_LZ4_ERROR;
374         }
375 
376         tmp->entry_number = 0;
377         tmp->read_size    = 0;
378     }
379 
380     tmp->ring_index = tmp->ring_buffer + tmp->offset;
381     memcpy(inbam, tmp->ring_index, sizeof(bam1_t));
382     inbam->data = data; // put the pointer to real bam data back
383 
384     if ((unsigned int)inbam->l_data > tmp->data_size) {
385         uint8_t *tmp_data;
386         tmp->data_size = inbam->l_data; kroundup32(tmp->data_size);
387 
388         if ((tmp_data = realloc(inbam->data, sizeof(uint8_t) * tmp->data_size)) == NULL) {
389             tmp_print_error(tmp, "[tmp_file] Error: unable to allocate tmp bam data memory.\n");
390             return TMP_SAM_MEM_ERROR;
391         }
392 
393         inbam->data = tmp_data;
394     }
395 
396     inbam->m_data = tmp->data_size; // set to the actual data size
397 
398     entry_size = sizeof(bam1_t);
399 
400     memcpy(inbam->data, tmp->ring_index + entry_size, inbam->l_data);
401     entry_size += inbam->l_data;
402 
403     tmp->offset += entry_size;
404     tmp->read_size += entry_size;
405     tmp->entry_number++;
406 
407     if (tmp->read_size > tmp->output_size) {
408         tmp_print_error(tmp, "[tmp_file] Error: wrong size of data returned RS:%ld OS:%ld EN:%ld GS:%ld.\n",
409             tmp->read_size, tmp->output_size, tmp->entry_number, tmp->group_size);
410         return TMP_SAM_LZ4_ERROR;
411     }
412 
413     if (tmp->read_size == tmp->output_size && tmp->entry_number != tmp->group_size) {
414         // hopefully the last entries in the read file
415         tmp->entry_number = tmp->group_size;
416     }
417 
418     return entry_size;
419 }
420 
421 
422 /*
423  * Frees up memory, closes the file and deletes it.
424  * Returns 0 on success or EOF on failure.
425  */
426 int tmp_file_destroy(tmp_file_t *tmp) {
427     int ret = 0;
428 
429     ret = fclose(tmp->fp);
430 
431     LZ4_freeStreamDecode(tmp->dstream);
432     free(tmp->ring_buffer);
433     free(tmp->comp_buffer);
434     free(tmp->name);
435     free(tmp->dict);
436 
437     return ret;
438 }
439