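/* dup1: 1st duplicate pass, split relation files into 'nslices'
   slices (adapted from check).

   Usage:
   dup1 [-n nslices_log] [-only i] [-lognrels k] [-outfmt fmt] [-ab] [-abhexa]
        -out <dir> -prefix <prefix>
        (file1 ... filen | -filelist <file> [-basepath <path>])
   By default nslices_log = 1 (nslices = 2).

   The relations of the input files are split into 'nslices' slices.
   Slice i is written to files named <dir>/i/<prefix>.NNNN<suffix>, where
   NNNN is a 4-digit hexadecimal counter; a new file is started every
   2^k relations (k is given by -lognrels).

   The output suffix is given by -outfmt; if -outfmt is not given, the
   suffix of the first input file is used. With -ab, ".ab" is inserted
   before the suffix.
   Input can be in gzipped or bzipped format.
*/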
15
16 #include "cado.h" // IWYU pragma: keep
17
18 // IWYU pragma: no_include <bits/types/struct_rusage.h>
19
20 #define MAX_NSLICES_LOG 6
21
#include <errno.h>
#include <inttypes.h>
#include <limits.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef HAVE_MINGW
#include <fcntl.h> /* for _O_BINARY */
#endif
30
31 #include "filter_config.h"
32 #include "filter_io.h" // filter_rels
33 #include "gzip.h" // fopen_maybe_compressed
34 #include "macros.h"
35 #include "portability.h" // strdup // IWYU pragma: keep
36 #include "misc.h" // filelist_clear
37 #include "params.h" // param_list_parse_*
38 #include "timing.h" // timingstats_dict_t
39 #include "verbose.h"
40
41 #define DEFAULT_LOG_MAX_NRELS_PER_FILES 25
42
43 /* Only (a,b) are parsed on input. This flags control whether we copy the
44 * rest of the relation data to the output file, or if we content
45 * ourselves with smaller .ab files */
46 static int only_ab = 0;
47
48 static uint64_t nr_rels_tot[(1 << MAX_NSLICES_LOG)];
49 static unsigned int nslices_log = 1, do_slice[(1 << MAX_NSLICES_LOG)];
50
51
/* State of one output stream that is split over several numbered files.
 * File names have the form <prefix><4 hex digits><suffix>. */
typedef struct {
    const char *prefix;      /* leading part of every output file name */
    const char *suffix;      /* trailing part of every output file name */
    char *filename;          /* name of the current output file, or NULL */
    FILE *file;              /* current output file, or NULL if none is open */
    const char *msg;         /* if not NULL, printed before each new file name */
    unsigned int next_idx;   /* index used to name the next output file */
    size_t lines_per_file;   /* number of lines after which a new file is started */
    size_t lines_left;       /* lines that may still go to the current file */
} split_output_iter_t;
60
61 static split_output_iter_t *
split_iter_init(const char * prefix,const char * suffix,const size_t lines_per_file,const char * msg)62 split_iter_init(const char *prefix, const char *suffix,
63 const size_t lines_per_file, const char *msg)
64 {
65 split_output_iter_t *iter = malloc(sizeof(split_output_iter_t));
66 ASSERT_ALWAYS(iter != NULL);
67 iter->prefix = strdup(prefix);
68 iter->suffix = strdup(suffix);
69 iter->next_idx = 0;
70 iter->filename = NULL;
71 iter->file = NULL;
72 if (msg)
73 iter->msg = strdup(msg);
74 else
75 iter->msg = NULL;
76 ASSERT_ALWAYS(lines_per_file > 0);
77 iter->lines_per_file = lines_per_file;
78 iter->lines_left = 0; /* Force opening of file on next write */
79 return iter;
80 }
81
82 /* used for counting time in different processes */
83 timingstats_dict_t stats;
84
85
86 static void
split_iter_end(split_output_iter_t * iter)87 split_iter_end(split_output_iter_t *iter)
88 {
89 if (iter->file != NULL)
90 fclose_maybe_compressed(iter->file, iter->filename);
91 free(iter->filename);
92 free((void *) iter->prefix);
93 free((void *) iter->suffix);
94 free((void *) iter->msg);
95 free(iter);
96 }
97
98 /* Closes the currently open file, if any, and opens the next one */
99 void
split_iter_open_next_file(split_output_iter_t * iter)100 split_iter_open_next_file(split_output_iter_t *iter)
101 {
102 if (iter->file != NULL) {
103 int rc;
104 #ifdef HAVE_GETRUSAGE
105 struct rusage r[1];
106 rc = fclose_maybe_compressed2(iter->file, iter->filename, r);
107 timingstats_dict_add(stats, iter->prefix, r);
108 #else
109 rc = fclose_maybe_compressed(iter->file, iter->filename);
110 #endif
111 ASSERT_ALWAYS (rc == 0);
112 }
113
114 free (iter->filename);
115 int rc = asprintf(&(iter->filename), "%s%04x%s",
116 iter->prefix, iter->next_idx++, iter->suffix);
117 ASSERT_ALWAYS (rc >= 0);
118 if (iter->msg != NULL)
119 fprintf (stderr, "%s%s\n", iter->msg, iter->filename);
120 iter->file = fopen_maybe_compressed(iter->filename, "w");
121 if (iter->file == NULL) {
122 char *msg;
123 rc = asprintf(&msg, "Could not open file %s for writing", iter->filename);
124 if (rc >= 0) {
125 perror(msg);
126 free(msg);
127 } else {
128 perror("Could not open file for writing");
129 }
130 exit(EXIT_FAILURE);
131 }
132 iter->lines_left = iter->lines_per_file;
133 }
134
135 static void
split_iter_write_next(split_output_iter_t * iter,const char * line)136 split_iter_write_next(split_output_iter_t *iter, const char *line)
137 {
138 if (iter->lines_left == 0)
139 split_iter_open_next_file(iter);
140 if (fputs (line, iter->file) == EOF) {
141 perror("Error writing relation");
142 abort();
143 }
144 iter->lines_left--;
145 }
146
147
148 /* Must be called only when nslices_log > 0 */
149 static inline unsigned int
compute_slice(int64_t a,uint64_t b)150 compute_slice (int64_t a, uint64_t b)
151 {
152 uint64_t h = CA_DUP1 * (uint64_t) a + CB_DUP1 * b;
153 /* Using the low bit of h is not a good idea, since then
154 odd values of i are twice more likely. The second low bit
155 also gives a small bias with RSA768 (but not for random
156 coprime a, b). We use here the nslices_log high bits.
157 */
158 h >>= (64 - nslices_log);
159 return (unsigned int) h;
160 }
161
162 /* Callback function called by prempt_scan_relations */
163
164 static void *
thread_dup1(void * context_data,earlyparsed_relation_ptr rel)165 thread_dup1 (void * context_data, earlyparsed_relation_ptr rel)
166 {
167 unsigned int slice = compute_slice (rel->a, rel->b);
168 split_output_iter_t **outiters = (split_output_iter_t**)context_data;
169
170 if (do_slice[slice])
171 {
172 if (only_ab)
173 {
174 char *p = rel->line;
175 while (*p != ':')
176 p++;
177 *p = '\n';
178 }
179
180 split_output_iter_t *iter = outiters[slice];
181 split_iter_write_next(iter, rel->line);
182 nr_rels_tot[slice]++;
183 }
184 return NULL;
185 }
186
187 /* Special callback function for when nslices = 1 */
188 static void *
thread_dup1_special(void * context_data,earlyparsed_relation_ptr rel)189 thread_dup1_special (void * context_data, earlyparsed_relation_ptr rel)
190 {
191 split_output_iter_t **outiters = (split_output_iter_t**)context_data;
192 if (do_slice[0])
193 {
194 if (only_ab)
195 {
196 char *p = rel->line;
197 while (*p != ':')
198 p++;
199 *p = '\n';
200 }
201
202 split_output_iter_t *iter = outiters[0];
203 split_iter_write_next(iter, rel->line);
204 nr_rels_tot[0]++;
205 }
206 return NULL;
207 }
208
/* Register the documentation string of every command-line parameter of this
 * program in pl, followed by the parameters declared by verbose_decl_usage(). */
static void declare_usage(param_list pl)
{
    static const struct {
        const char *key;
        const char *doc;
    } options[] = {
        { "filelist", "file containing a list of input files" },
        { "basepath", "path added to all file in filelist" },
        { "out", "output directory" },
        { "prefix", "prefix for output files" },
        { "lognrels", "log of number of rels per output file" },
        { "n", "log of number of slices (default: 1)" },
        { "only", "do only slice i (default: all)" },
        { "outfmt", "format of output file (default same as input)" },
        { "ab", "only print a and b in the output" },
        { "abhexa", "read a and b as hexa not decimal" },
        { "force-posix-threads", "force the use of posix threads, do not rely on platform memory semantics" },
        { "path_antebuffer", "path to antebuffer program" },
    };
    const size_t noptions = sizeof options / sizeof options[0];

    /* Declared in table order, which is the order of the original calls. */
    for (size_t k = 0; k < noptions; k++)
        param_list_decl_usage(pl, options[k].key, options[k].doc);

    verbose_decl_usage(pl);
}
227
228 static void
usage(param_list pl,char * argv0)229 usage (param_list pl, char *argv0)
230 {
231 param_list_print_usage(pl, argv0, stderr);
232 exit(EXIT_FAILURE);
233 }
234
235
236 int
main(int argc,char * argv[])237 main (int argc, char * argv[])
238 {
239 char * argv0 = argv[0];
240 unsigned int log_max_nrels_per_files = DEFAULT_LOG_MAX_NRELS_PER_FILES;
241 int only_slice = -1;
242 int abhexa = 0;
243
244 param_list pl;
245 param_list_init(pl);
246 declare_usage(pl);
247 argv++,argc--;
248
249 param_list_configure_switch(pl, "ab", &only_ab);
250 param_list_configure_switch(pl, "abhexa", &abhexa);
251 param_list_configure_switch(pl, "force-posix-threads", &filter_rels_force_posix_threads);
252
253 #ifdef HAVE_MINGW
254 _fmode = _O_BINARY; /* Binary open for all files */
255 #endif
256
257 if (argc == 0)
258 usage (pl, argv0);
259
260 for( ; argc ; ) {
261 if (param_list_update_cmdline(pl, &argc, &argv)) { continue; }
262 /* Since we accept file names freeform, we decide to never abort
263 * on unrecognized options */
264 break;
265 // fprintf (stderr, "Unknown option: %s\n", argv[0]);
266 // abort();
267 }
268 /* print command-line arguments */
269 verbose_interpret_parameters(pl);
270 param_list_print_command_line (stdout, pl);
271 fflush(stdout);
272
273 param_list_parse_uint(pl, "n", &nslices_log);
274 const char *outdir = param_list_lookup_string(pl, "out");
275 param_list_parse_int(pl, "only", &only_slice);
276 param_list_parse_uint(pl, "lognrels", &log_max_nrels_per_files);
277 const char *outfmt = param_list_lookup_string(pl, "outfmt");
278 const char * filelist = param_list_lookup_string(pl, "filelist");
279 const char * basepath = param_list_lookup_string(pl, "basepath");
280 const char * path_antebuffer = param_list_lookup_string(pl, "path_antebuffer");
281 const char *prefix_files = param_list_lookup_string(pl, "prefix");
282
283 if (param_list_warn_unused(pl))
284 {
285 fprintf(stderr, "Error, unused parameters are given\n");
286 usage(pl, argv0);
287 }
288
289 if (nslices_log > MAX_NSLICES_LOG)
290 {
291 fprintf(stderr, "Error, -n is too large\n");
292 usage(pl, argv0);
293 }
294 if (basepath && !filelist)
295 {
296 fprintf(stderr, "Error, -basepath only valid with -filelist\n");
297 usage(pl, argv0);
298 }
299
300 if (!prefix_files)
301 {
302 fprintf(stderr, "Error, missing -prefix command line argument\n");
303 usage(pl, argv0);
304 }
305
306 if (!outdir)
307 {
308 fprintf(stderr, "Error, missing -out command line argument\n");
309 usage(pl, argv0);
310 }
311 if (outfmt && !is_supported_compression_format(outfmt)) {
312 fprintf(stderr, "Error, output compression format unsupported\n");
313 usage(pl, argv0);
314 }
315
316 unsigned int nslices = 1 << nslices_log;
317 if (only_slice < 0) /* split all slices */
318 {
319 for (unsigned int i = 0; i < nslices; i++)
320 do_slice[i] = 1;
321 }
322 else /* split only slide i */
323 {
324 for (unsigned int i = 0; i < nslices; i++)
325 do_slice[i] = (i == (unsigned int) only_slice);
326 }
327
328 if ((filelist != NULL) + (argc != 0) != 1) {
329 fprintf(stderr, "Error, provide either -filelist or freeform file names\n");
330 usage(pl, argv0);
331 }
332
333 set_antebuffer_path (argv0, path_antebuffer);
334 char ** files = filelist ? filelist_from_file(basepath, filelist, 0) : argv;
335
336 // If not output suffix is specified, use suffix of first input file
337 if (!outfmt && files[0] != NULL)
338 get_suffix_from_filename (files[0], &outfmt);
339
340 memset (nr_rels_tot, 0, sizeof(uint64_t) * nslices);
341
342 split_output_iter_t **outiters;
343 outiters = malloc(sizeof(split_output_iter_t *) * nslices);
344 ASSERT_ALWAYS(outiters != NULL);
345 for(unsigned int i = 0 ; i < nslices ; i++)
346 {
347 char *prefix, *suffix, *msg;
348 int rc = asprintf(&prefix, "%s/%d/%s.",
349 outdir, i, prefix_files);
350 ASSERT_ALWAYS(rc >= 0);
351 rc = asprintf(&suffix, only_ab ? ".ab%s" : "%s", outfmt);
352 ASSERT_ALWAYS(rc >= 0);
353 rc = asprintf (&msg, "# Opening output file for slice %d : ", i);
354 ASSERT_ALWAYS(rc >= 0);
355 outiters[i] = split_iter_init(prefix, suffix, 1UL<<log_max_nrels_per_files, msg);
356 free(prefix);
357 free(suffix);
358 free(msg);
359 }
360
361 timingstats_dict_init(stats);
362 if (nslices == 1)
363 filter_rels(files, (filter_rels_callback_t) &thread_dup1_special,
364 (void*)outiters, EARLYPARSE_NEED_LINE |
365 (abhexa ? EARLYPARSE_NEED_AB_HEXA : EARLYPARSE_NEED_AB_DECIMAL),
366 NULL, stats);
367 else
368 filter_rels(files, (filter_rels_callback_t) &thread_dup1, (void*)outiters,
369 EARLYPARSE_NEED_LINE |
370 (abhexa ? EARLYPARSE_NEED_AB_HEXA : EARLYPARSE_NEED_AB_DECIMAL),
371 NULL, stats);
372
373 for(unsigned int i = 0 ; i < nslices ; i++)
374 split_iter_end(outiters[i]);
375
376 for (unsigned int i = 0; i < nslices; i++)
377 fprintf (stderr, "# slice %d received %" PRIu64 " relations\n", i,
378 nr_rels_tot[i]);
379
380 if (filelist) filelist_clear(files);
381
382 free(outiters);
383
384 param_list_clear(pl);
385
386 // double thread_times[2];
387 // thread_seconds_user_sys(thread_times);
388 timingstats_dict_add_mythread(stats, "main");
389 // fprintf(stderr, "Main thread ends after having spent %.2fs+%.2fs on cpu \n", thread_times[0], thread_times[1]);
390 timingstats_dict_disp(stats);
391 timingstats_dict_clear(stats);
392
393 return 0;
394 }
395