1
2 /******************************************************************************
3 *
4 * This file is part of canu, a software program that assembles whole-genome
5 * sequencing reads into contigs.
6 *
7 * This software is based on:
8 * 'Celera Assembler' r4587 (http://wgs-assembler.sourceforge.net)
9 * the 'kmer package' r1994 (http://kmer.sourceforge.net)
10 *
11 * Except as indicated otherwise, this is a 'United States Government Work',
12 * and is released in the public domain.
13 *
14 * File 'README.licenses' in the root directory of this distribution
15 * contains full conditions and disclaimers.
16 */
17
18 #include "runtime.H"
19
20 #include "strings.H"
21
22 #include "sqStore.H"
23 #include "ovStore.H"
24 #include "ovStoreConfig.H"
25
26 #include <vector>
27 #include <algorithm>
28
29
30
31
32 void
assignReadsToSlices(sqStore * seq,uint64 minMemory,uint64 maxMemory)33 ovStoreConfig::assignReadsToSlices(sqStore *seq,
34 uint64 minMemory,
35 uint64 maxMemory) {
36
37 int64 procMax = sysconf(_SC_CHILD_MAX);
38 int64 openMax = sysconf(_SC_OPEN_MAX) - 16;
39
40 //
41 // Load the number of overlaps per read.
42 //
43
44 fprintf(stderr, "\n");
45 fprintf(stderr, "Finding number of overlaps per read and per file.\n");
46 fprintf(stderr, "\n");
47 fprintf(stderr, " Moverlaps\n");
48 fprintf(stderr, "------------ ----------------------------------------\n");
49
50 uint64 *oPF = new uint64 [_numInputs];
51 uint32 *oPR = new uint32 [_maxID + 1];
52 uint64 numOverlaps = 0;
53
54 memset(oPF, 0, sizeof(uint64) * (_numInputs));
55 memset(oPR, 0, sizeof(uint32) * (_maxID + 1));
56
57 for (uint32 ii=0; ii<_numInputs; ii++) {
58 ovFile *inputFile = new ovFile(seq, _inputNames[ii], ovFileFullCounts);
59
60 for (uint32 rr=0; rr<_maxID + 1; rr++) {
61 oPF[ii] += inputFile->getCounts()->numOverlaps(rr) / 2; // Reports counts as if they were
62 oPR[rr] += inputFile->getCounts()->numOverlaps(rr); // already symmetrized.
63 }
64
65 numOverlaps += oPF[ii] * 2;
66
67 delete inputFile;
68
69 fprintf(stderr, "%12.3f %40s\n", oPF[ii] / 1000000.0, _inputNames[ii]);
70 }
71
72 fprintf(stderr, "------------ ----------------------------------------\n");
73 fprintf(stderr, "%12.3f Moverlaps in inputs\n", numOverlaps / 2 / 1000000.0);
74 fprintf(stderr, "%12.3f Moverlaps to sort\n", numOverlaps / 1000000.0);
75 fprintf(stderr, "\n");
76
77 if (numOverlaps == 0)
78 fprintf(stderr, "Found no overlaps to sort.\n");
79
80
81 //
82 // Partition the overlaps into buckets.
83 //
84 // This will pick the smallest memory size that uses fewer than maxFiles buckets. Unreasonable
85 // values can break this - either too low memory or too high allowed open files (an OS limit).
86 //
87
88 uint64 olapsPerSliceMin = (minMemory - OVSTORE_MEMORY_OVERHEAD) / ovOverlapSortSize;
89 uint64 olapsPerSliceMax = (maxMemory - OVSTORE_MEMORY_OVERHEAD) / ovOverlapSortSize;
90
91 // Reset the limits so that the maximum number of overlaps per read can be held in one slice.
92
93 uint64 maxOverlapsPerRead = 0;
94
95 for (uint64 olaps=0, ii=0; ii<_maxID+1; ii++)
96 if (maxOverlapsPerRead < oPR[ii])
97 maxOverlapsPerRead = oPR[ii];
98
99 if ((olapsPerSliceMin < maxOverlapsPerRead) ||
100 (olapsPerSliceMax < maxOverlapsPerRead)) {
101 fprintf(stderr, "WARNING:\n");
102
103 if (olapsPerSliceMin < maxOverlapsPerRead) {
104 fprintf(stderr, "WARNING: Increasing minimum memory to handle " F_U64 " overlaps per read.\n", maxOverlapsPerRead);
105 olapsPerSliceMin = maxOverlapsPerRead;
106 minMemory = maxOverlapsPerRead * ovOverlapSortSize + OVSTORE_MEMORY_OVERHEAD;
107 }
108
109 if (olapsPerSliceMax < maxOverlapsPerRead) {
110 fprintf(stderr, "WARNING: Increasing maximum memory to handle " F_U64 " overlaps per read.\n", maxOverlapsPerRead);
111 olapsPerSliceMax = maxOverlapsPerRead;
112 maxMemory = maxOverlapsPerRead * ovOverlapSortSize + OVSTORE_MEMORY_OVERHEAD;
113 }
114
115 fprintf(stderr, "WARNING:\n");
116 fprintf(stderr, "\n");
117 }
118
119 fprintf(stderr, "Configuring for:\n");
120 fprintf(stderr, " Up to " F_S64 " processes.\n", procMax);
121 fprintf(stderr, " Up to " F_S64 " open files.\n", openMax);
122
123 if (minMemory < maxMemory)
124 fprintf(stderr, " At least %5.2f GB memory -> %4" F_U64P " sort processes with %.2f Moverlaps each.\n",
125 minMemory / 1024.0 / 1024.0 / 1024.0,
126 numOverlaps / olapsPerSliceMin + 1,
127 olapsPerSliceMin / 1000000.0);
128
129 fprintf(stderr, " At most %5.2f GB memory -> %4" F_U64P " sort processes with %.2f Moverlaps each.\n",
130 maxMemory / 1024.0 / 1024.0 / 1024.0,
131 numOverlaps / olapsPerSliceMax + 1,
132 olapsPerSliceMax / 1000000.0);
133 fprintf(stderr, "\n");
134
135
136
137 // More processes -> higher bandwidth utilization, but can also thrash the FS
138 // More memory -> harder to get machines to run on
139 //
140 // So arbitrarily pick a memory size 75% of the maximum.
141
142 uint64 sortMemory = minMemory + 3 * (maxMemory - minMemory) / 4;
143
144 uint64 olapsPerSlice = (sortMemory - OVSTORE_MEMORY_OVERHEAD) / ovOverlapSortSize;
145
146 // With that upper limit on the number of overlaps per slice, count how many slices
147 // we need to make.
148
149 _numSlices = 1;
150
151 uint64 total = 0;
152
153 for (uint64 olaps=0, ii=0; ii<_maxID+1; ii++) {
154 if (olaps + oPR[ii] > olapsPerSlice) {
155 olaps = 0;
156 _numSlices++;
157 }
158
159 olaps += oPR[ii];
160 total += oPR[ii];
161 }
162
163 // Divide those overlaps evenly among the slices, but no smaller than
164 // our minimum (oddly called 'maxOverlapsPerRead').
165
166 olapsPerSlice = (uint64)ceil((double)numOverlaps / (double)_numSlices) + 1;
167
168 if (olapsPerSlice < maxOverlapsPerRead)
169 olapsPerSlice = maxOverlapsPerRead;
170
171 _sortMemory = (olapsPerSlice * ovOverlapSortSize + OVSTORE_MEMORY_OVERHEAD) / 1024.0 / 1024.0 / 1024.0;
172
173 // One more time, just to count the number of slices we're making.
174
175 _numSlices = 1;
176
177 for (uint64 olaps=0, ii=0; ii<_maxID+1; ii++) {
178 if (olaps + oPR[ii] > olapsPerSlice) {
179 olaps = 0;
180 _numSlices++;
181 }
182
183 olaps += oPR[ii];
184 }
185
186 // Assign inputs to each bucketizer. Greedy load balancing.
187
188 // Essentially a free parameter - lower makes bigger buckets and fewer files.
189 _numBuckets = std::min(_numInputs, _numSlices);
190
191 uint64 *olapsPerBucket = new uint64 [_numBuckets];
192
193 for (uint32 ii=0; ii<_numBuckets; ii++)
194 olapsPerBucket[ii] = 0;
195
196 for (uint32 ss=0, ii=0; ii<_numInputs; ii++) {
197 uint32 mb = 0;
198
199 for (uint32 bb=0; bb<_numBuckets; bb++)
200 if (olapsPerBucket[bb] < olapsPerBucket[mb])
201 mb = bb;
202
203 _inputToBucket[ii] = mb;
204 olapsPerBucket[mb] += oPF[ii] * 2;
205 }
206
207 delete [] oPF;
208
209
210 // Report ovb to bucket mapping.
211
212 fprintf(stderr, "------------------------------------------------------------\n");
213 fprintf(stderr, "Will bucketize using " F_U32 " processes.\n", _numBuckets);
214 fprintf(stderr, "\n");
215 fprintf(stderr, " number number of\n");
216 fprintf(stderr, " bucket inputs overlaps\n");
217 fprintf(stderr, "------- ------- ------------\n");
218
219 uint64 totOlaps = 0;
220
221 for (uint32 ii=0; ii<_numBuckets; ii++) {
222 uint32 ni = 0;
223
224 for (uint32 xx=0; xx<_numInputs; xx++) {
225 if (_inputToBucket[xx] == ii)
226 ni++;
227 }
228
229 fprintf(stderr, "%7" F_U32P " %7" F_U32P " %12" F_U64P "\n",
230 ii, ni, olapsPerBucket[ii]);
231
232 totOlaps += olapsPerBucket[ii];
233 }
234
235 delete [] olapsPerBucket;
236
237 fprintf(stderr, "------- ------- ------------\n");
238 fprintf(stderr, " %12" F_U64P "\n", totOlaps);
239 fprintf(stderr, "\n");
240
241 // Assign reads to slices.
242
243 fprintf(stderr, "\n");
244 fprintf(stderr, "------------------------------------------------------------\n");
245 fprintf(stderr, "Will sort using " F_U32 " processes.\n", _numSlices);
246 fprintf(stderr, " Up to %7.2f M overlaps\n", olapsPerSlice / 1000000.0);
247 fprintf(stderr, " %7.2f GB memory\n", _sortMemory);
248 fprintf(stderr, "\n");
249 fprintf(stderr, " number of\n");
250 fprintf(stderr, " slice overlaps read range\n");
251 fprintf(stderr, "------ ------------ ---------------------\n");
252
253 totOlaps = 0;
254
255 {
256 uint32 first = 1;
257 uint64 olaps = 0;
258 uint32 slice = 0;
259
260 for (uint32 ii=0; ii<_maxID+1; ii++) {
261 if (olaps + oPR[ii] > olapsPerSlice) {
262 fprintf(stderr, "%6" F_U32P " %12" F_U64P " %10" F_U32P "-%-10" F_U32P "\n", slice, olaps, first, ii-1);
263 totOlaps += olaps;
264 olaps = 0;
265 slice++;
266 first = ii;
267 }
268
269 olaps += oPR[ii];
270 _readToSlice[ii] = slice;
271 }
272
273 fprintf(stderr, "%6" F_U32P " %12" F_U64P " %10" F_U32P "-%-10" F_U32P "\n", slice, olaps, first, _maxID);
274 totOlaps += olaps;
275
276 if (slice + 1 != _numSlices)
277 fprintf(stderr, "ERROR: expected %u slices, generated %u instead. %lu overlaps per slice allowed.\n",
278 _numSlices, slice, olapsPerSlice);
279 assert(slice + 1 == _numSlices);
280 }
281
282 fprintf(stderr, "------ ------------\n");
283 fprintf(stderr, " %12" F_U64P "\n", totOlaps);
284
285 fprintf(stderr, "\n");
286
287 delete [] oPR;
288 }
289
290
291
292
293 int
main(int argc,char ** argv)294 main(int argc, char **argv) {
295 char *seqName = NULL;
296 uint64 minMemory = (uint64)1 * 1024 * 1024 * 1024;
297 uint64 maxMemory = (uint64)4 * 1024 * 1024 * 1024;
298
299 std::vector<char const *> fileList;
300
301 char const *configOut = NULL;
302 char const *configIn = NULL;
303
304 bool writeNumBuckets = false;
305 bool writeNumSlices = false;
306 bool writeMemory = false;
307 uint32 writeInputs = 0;
308 uint32 writeSlices = 0;
309
310 argc = AS_configure(argc, argv, 1);
311
312 std::vector<char const *> err;
313 for (int32 arg=1; arg < argc; arg++) {
314 if (strcmp(argv[arg], "-S") == 0) {
315 seqName = argv[++arg];
316
317 } else if (strcmp(argv[arg], "-M") == 0) {
318 double lo=0.0, hi=0.0;
319
320 decodeRange<double>(argv[++arg], lo, hi);
321
322 minMemory = (uint64)ceil(lo * 1024.0 * 1024.0 * 1024.0);
323 maxMemory = (uint64)ceil(hi * 1024.0 * 1024.0 * 1024.0);
324
325 } else if (strcmp(argv[arg], "-L") == 0) {
326 AS_UTL_loadFileList(argv[++arg], fileList);
327
328 } else if (strcmp(argv[arg], "-create") == 0) {
329 configOut = argv[++arg];
330
331 } else if (strcmp(argv[arg], "-describe") == 0) {
332 configIn = argv[++arg];
333
334 } else if (strcmp(argv[arg], "-numbuckets") == 0) {
335 writeNumBuckets = true;
336 } else if (strcmp(argv[arg], "-numslices") == 0) {
337 writeNumSlices = true;
338 } else if (strcmp(argv[arg], "-sortmemory") == 0) {
339 writeMemory = true;
340 } else if (strcmp(argv[arg], "-listinputs") == 0) {
341 writeInputs = strtouint32(argv[++arg]);
342 } else if (strcmp(argv[arg], "-listslices") == 0) {
343 writeSlices = strtouint32(argv[++arg]);
344
345 } else if (((argv[arg][0] == '-') && (argv[arg][1] == 0)) ||
346 (fileExists(argv[arg]))) {
347 // Assume it's an input file
348 fileList.push_back(argv[arg]);
349
350 } else {
351 char *s = new char [1024];
352 snprintf(s, 1024, "%s: unknown option '%s'.\n", argv[0], argv[arg]);
353 err.push_back(s);
354 }
355 }
356
357 if ((seqName == NULL) && (configIn == NULL))
358 err.push_back("ERROR: No sequence store (-S) supplied.\n");
359
360 if ((fileList.size() == 0) && (configIn == NULL))
361 err.push_back("ERROR: No input overlap files (-L or last on the command line) supplied.\n");
362
363 if ((configOut != NULL) && (configIn != NULL))
364 err.push_back("ERROR: Can't both -create -describe a config.\n");
365
366 if ((configOut == NULL) && (configIn == NULL))
367 err.push_back("ERROR: Must supply one of -create or -describe.\n");
368
369 if ((minMemory <= OVSTORE_MEMORY_OVERHEAD) ||
370 (maxMemory <= OVSTORE_MEMORY_OVERHEAD + ovOverlapSortSize))
371 err.push_back("ERROR: Memory (-M) must be at least 0.25 GB to account for overhead.\n"); // , OVSTORE_MEMORY_OVERHEAD / 1024.0 / 1024.0 / 1024.0
372
373 if (err.size() > 0) {
374 fprintf(stderr, "usage: %s -S asm.seqStore -create out.config [opts] [-L fileList | *.ovb]\n", argv[0]);
375 fprintf(stderr, " -S asm.seqStore path to seqStore for this assembly\n");
376 fprintf(stderr, "\n");
377 fprintf(stderr, " -L fileList a list of ovb files in 'fileList'\n");
378 fprintf(stderr, "\n");
379 fprintf(stderr, " -M g use up to 'g' gigabytes memory for sorting overlaps\n");
380 fprintf(stderr, " default 4; g-0.25 gb is available for sorting overlaps\n");
381 fprintf(stderr, "\n");
382 fprintf(stderr, " -create config write overlap store configuration to file 'config'\n");
383 fprintf(stderr, "\n");
384 fprintf(stderr, " -describe config write a readable description of the config in 'config' to the screen\n");
385 fprintf(stderr, " -numbuckets write the number of buckets to the screen\n");
386 fprintf(stderr, " -numslices write the number of slices to the screen\n");
387 fprintf(stderr, " -sortmemory write the memory needed (in GB) for a sort job to the screen\n");
388 fprintf(stderr, " -listinputs n write a list of the input ovb files needed for bucketizer job 'n'");
389 fprintf(stderr, " -listslices n write a list of the input slice files needed for sorter job 'n'\n");
390 fprintf(stderr, "\n");
391 fprintf(stderr, "\n");
392 fprintf(stderr, "Sizes and Limits:\n");
393 fprintf(stderr, " ovOverlap " F_S32 " words of " F_S32 " bits each.\n", (int32)ovOverlapNWORDS, (int32)ovOverlapWORDSZ);
394 fprintf(stderr, " ovOverlapSortSize " F_S32 " bits\n", (int32)ovOverlapSortSize * 8);
395 fprintf(stderr, " SC_CHILD_MAX " F_S32 " processes\n", (int32)sysconf(_SC_CHILD_MAX));
396 fprintf(stderr, " SC_OPEN_MAX " F_S32 " files\n", (int32)sysconf(_SC_OPEN_MAX));
397 fprintf(stderr, "\n");
398
399 for (uint32 ii=0; ii<err.size(); ii++)
400 if (err[ii])
401 fputs(err[ii], stderr);
402
403 exit(1);
404 }
405
406 ovStoreConfig *config = NULL;
407
408 // If describing, load and describe.
409
410 if (configIn) {
411 config = new ovStoreConfig(configIn);
412 }
413
414 // Check parameters, reset some of them.
415
416 else {
417 if (minMemory < OVSTORE_MEMORY_OVERHEAD + ovOverlapSortSize) {
418 fprintf(stderr, "Reset minMemory from " F_U64 " to " F_SIZE_T "\n", minMemory, OVSTORE_MEMORY_OVERHEAD + ovOverlapSortSize);
419 minMemory = OVSTORE_MEMORY_OVERHEAD + ovOverlapSortSize;
420 }
421
422 sqStore *seq = new sqStore(seqName);
423 uint32 maxID = seq->sqStore_lastReadID();
424
425 config = new ovStoreConfig(fileList, maxID);
426
427 config->assignReadsToSlices(seq, minMemory, maxMemory);
428 config->writeConfig(configOut);
429
430 delete seq;
431 }
432
433 // If we have a config, report parameters.
434
435 if (config) {
436 uint32 memGB = (uint32)ceil(config->sortMemory() + 0.5); // Adds an extra 0.5 to 1.49 gb.
437
438 if (writeNumBuckets) {
439 fprintf(stdout, F_U32 "\n", config->numBuckets());
440 }
441
442 else if (writeNumSlices) {
443 fprintf(stdout, F_U32 "\n", config->numSlices());
444 }
445
446 else if (writeMemory) {
447 fprintf(stdout, F_U32 "\n", memGB);
448 }
449
450 else if (writeInputs) {
451 for (uint32 ff=0; ff<config->numInputs(writeInputs); ff++)
452 fprintf(stdout, "%s\n", config->getInput(writeInputs, ff));
453 }
454
455 else if (writeSlices) {
456 for (uint32 bb=1; bb<=config->numBuckets(); bb++) {
457 fprintf(stdout, "bucket%04" F_U32P "/slice%04" F_U32P "\n", bb, writeSlices);
458 fprintf(stdout, "bucket%04" F_U32P "/sliceSizes\n", bb);
459 }
460 }
461
462 else {
463 fprintf(stdout, "\n");
464 fprintf(stdout, "Configured for:\n");
465 fprintf(stdout, " numBuckets %8" F_U32P "\n", config->numBuckets());
466 fprintf(stdout, " numSlices %8" F_U32P "\n", config->numSlices());
467 fprintf(stdout, " sortMemory %8" F_U32P " GB (%5.3f GB)\n", memGB, config->sortMemory());
468 }
469 }
470
471 // Cleanup and quit!
472
473 delete config;
474
475 exit(0);
476 }
477