1 
2 /******************************************************************************
3  *
4  *  This file is part of canu, a software program that assembles whole-genome
5  *  sequencing reads into contigs.
6  *
7  *  This software is based on:
8  *    'Celera Assembler' r4587 (http://wgs-assembler.sourceforge.net)
9  *    the 'kmer package' r1994 (http://kmer.sourceforge.net)
10  *
11  *  Except as indicated otherwise, this is a 'United States Government Work',
12  *  and is released in the public domain.
13  *
14  *  File 'README.licenses' in the root directory of this distribution
15  *  contains full conditions and disclaimers.
16  */
17 
18 #include "runtime.H"
19 
20 #include "strings.H"
21 
22 #include "sqStore.H"
23 #include "ovStore.H"
24 #include "ovStoreConfig.H"
25 
26 #include <vector>
27 #include <algorithm>
28 
29 
30 
31 
32 void
assignReadsToSlices(sqStore * seq,uint64 minMemory,uint64 maxMemory)33 ovStoreConfig::assignReadsToSlices(sqStore        *seq,
34                                    uint64          minMemory,
35                                    uint64          maxMemory) {
36 
37   int64    procMax       = sysconf(_SC_CHILD_MAX);
38   int64    openMax       = sysconf(_SC_OPEN_MAX) - 16;
39 
40   //
41   //  Load the number of overlaps per read.
42   //
43 
44   fprintf(stderr, "\n");
45   fprintf(stderr, "Finding number of overlaps per read and per file.\n");
46   fprintf(stderr, "\n");
47   fprintf(stderr, "   Moverlaps\n");
48   fprintf(stderr, "------------ ----------------------------------------\n");
49 
50   uint64             *oPF                = new uint64 [_numInputs];
51   uint32             *oPR                = new uint32 [_maxID + 1];
52   uint64              numOverlaps        = 0;
53 
54   memset(oPF, 0, sizeof(uint64) * (_numInputs));
55   memset(oPR, 0, sizeof(uint32) * (_maxID + 1));
56 
57   for (uint32 ii=0; ii<_numInputs; ii++) {
58     ovFile            *inputFile = new ovFile(seq, _inputNames[ii], ovFileFullCounts);
59 
60     for (uint32 rr=0; rr<_maxID + 1; rr++) {
61       oPF[ii] += inputFile->getCounts()->numOverlaps(rr) / 2;   //  Reports counts as if they were
62       oPR[rr] += inputFile->getCounts()->numOverlaps(rr);       //  already symmetrized.
63     }
64 
65     numOverlaps += oPF[ii] * 2;
66 
67     delete inputFile;
68 
69     fprintf(stderr, "%12.3f %40s\n", oPF[ii] / 1000000.0, _inputNames[ii]);
70   }
71 
72   fprintf(stderr, "------------ ----------------------------------------\n");
73   fprintf(stderr, "%12.3f Moverlaps in inputs\n", numOverlaps / 2 / 1000000.0);
74   fprintf(stderr, "%12.3f Moverlaps to sort\n",   numOverlaps     / 1000000.0);
75   fprintf(stderr, "\n");
76 
77   if (numOverlaps == 0)
78     fprintf(stderr, "Found no overlaps to sort.\n");
79 
80 
81   //
82   //  Partition the overlaps into buckets.
83   //
84   //  This will pick the smallest memory size that uses fewer than maxFiles buckets.  Unreasonable
85   //  values can break this - either too low memory or too high allowed open files (an OS limit).
86   //
87 
88   uint64  olapsPerSliceMin = (minMemory - OVSTORE_MEMORY_OVERHEAD) / ovOverlapSortSize;
89   uint64  olapsPerSliceMax = (maxMemory - OVSTORE_MEMORY_OVERHEAD) / ovOverlapSortSize;
90 
91   //  Reset the limits so that the maximum number of overlaps per read can be held in one slice.
92 
93   uint64  maxOverlapsPerRead = 0;
94 
95   for (uint64 olaps=0, ii=0; ii<_maxID+1; ii++)
96     if (maxOverlapsPerRead < oPR[ii])
97       maxOverlapsPerRead = oPR[ii];
98 
99   if ((olapsPerSliceMin < maxOverlapsPerRead) ||
100       (olapsPerSliceMax < maxOverlapsPerRead)) {
101     fprintf(stderr, "WARNING:\n");
102 
103     if (olapsPerSliceMin < maxOverlapsPerRead) {
104       fprintf(stderr, "WARNING:  Increasing minimum memory to handle " F_U64 " overlaps per read.\n", maxOverlapsPerRead);
105       olapsPerSliceMin = maxOverlapsPerRead;
106       minMemory        = maxOverlapsPerRead * ovOverlapSortSize + OVSTORE_MEMORY_OVERHEAD;
107     }
108 
109     if (olapsPerSliceMax < maxOverlapsPerRead) {
110       fprintf(stderr, "WARNING:  Increasing maximum memory to handle " F_U64 " overlaps per read.\n", maxOverlapsPerRead);
111       olapsPerSliceMax = maxOverlapsPerRead;
112       maxMemory        = maxOverlapsPerRead * ovOverlapSortSize + OVSTORE_MEMORY_OVERHEAD;
113     }
114 
115     fprintf(stderr, "WARNING:\n");
116     fprintf(stderr, "\n");
117   }
118 
119   fprintf(stderr, "Configuring for:\n");
120   fprintf(stderr, "  Up to " F_S64 " processes.\n", procMax);
121   fprintf(stderr, "  Up to " F_S64 " open files.\n", openMax);
122 
123   if (minMemory < maxMemory)
124     fprintf(stderr, "  At least %5.2f GB memory -> %4" F_U64P " sort processes with %.2f Moverlaps each.\n",
125             minMemory / 1024.0 / 1024.0 / 1024.0,
126             numOverlaps / olapsPerSliceMin + 1,
127             olapsPerSliceMin / 1000000.0);
128 
129   fprintf(stderr, "  At most  %5.2f GB memory -> %4" F_U64P " sort processes with %.2f Moverlaps each.\n",
130           maxMemory / 1024.0 / 1024.0 / 1024.0,
131           numOverlaps / olapsPerSliceMax + 1,
132           olapsPerSliceMax / 1000000.0);
133   fprintf(stderr, "\n");
134 
135 
136 
137   //  More processes -> higher bandwidth utilization, but can also thrash the FS
138   //  More memory    -> harder to get machines to run on
139   //
140   //  So arbitrarily pick a memory size 75% of the maximum.
141 
142   uint64  sortMemory       = minMemory + 3 * (maxMemory - minMemory) / 4;
143 
144   uint64  olapsPerSlice    = (sortMemory - OVSTORE_MEMORY_OVERHEAD) / ovOverlapSortSize;
145 
146   //  With that upper limit on the number of overlaps per slice, count how many slices
147   //  we need to make.
148 
149   _numSlices = 1;
150 
151   uint64 total = 0;
152 
153   for (uint64 olaps=0, ii=0; ii<_maxID+1; ii++) {
154     if (olaps + oPR[ii] > olapsPerSlice) {
155       olaps = 0;
156       _numSlices++;
157     }
158 
159     olaps += oPR[ii];
160     total += oPR[ii];
161   }
162 
163   //  Divide those overlaps evenly among the slices, but no smaller than
164   //  our minimum (oddly called 'maxOverlapsPerRead').
165 
166   olapsPerSlice = (uint64)ceil((double)numOverlaps / (double)_numSlices) + 1;
167 
168   if (olapsPerSlice < maxOverlapsPerRead)
169     olapsPerSlice = maxOverlapsPerRead;
170 
171   _sortMemory = (olapsPerSlice * ovOverlapSortSize + OVSTORE_MEMORY_OVERHEAD) / 1024.0 / 1024.0 / 1024.0;
172 
173   //  One more time, just to count the number of slices we're making.
174 
175   _numSlices = 1;
176 
177   for (uint64 olaps=0, ii=0; ii<_maxID+1; ii++) {
178     if (olaps + oPR[ii] > olapsPerSlice) {
179       olaps = 0;
180       _numSlices++;
181     }
182 
183     olaps += oPR[ii];
184   }
185 
186   //  Assign inputs to each bucketizer.  Greedy load balancing.
187 
188   //  Essentially a free parameter - lower makes bigger buckets and fewer files.
189   _numBuckets = std::min(_numInputs, _numSlices);
190 
191   uint64  *olapsPerBucket = new uint64 [_numBuckets];
192 
193   for (uint32 ii=0; ii<_numBuckets; ii++)
194     olapsPerBucket[ii] = 0;
195 
196   for (uint32 ss=0, ii=0; ii<_numInputs; ii++) {
197     uint32  mb = 0;
198 
199     for (uint32 bb=0; bb<_numBuckets; bb++)
200       if (olapsPerBucket[bb] < olapsPerBucket[mb])
201         mb = bb;
202 
203     _inputToBucket[ii]  = mb;
204     olapsPerBucket[mb] += oPF[ii] * 2;
205   }
206 
207   delete [] oPF;
208 
209 
210   //  Report ovb to bucket mapping.
211 
212   fprintf(stderr, "------------------------------------------------------------\n");
213   fprintf(stderr, "Will bucketize using " F_U32 " processes.\n", _numBuckets);
214   fprintf(stderr, "\n");
215   fprintf(stderr, "         number    number of\n");
216   fprintf(stderr, " bucket  inputs     overlaps\n");
217   fprintf(stderr, "------- ------- ------------\n");
218 
219   uint64  totOlaps = 0;
220 
221   for (uint32 ii=0; ii<_numBuckets; ii++) {
222     uint32  ni = 0;
223 
224     for (uint32 xx=0; xx<_numInputs; xx++) {
225       if (_inputToBucket[xx] == ii)
226         ni++;
227     }
228 
229     fprintf(stderr, "%7" F_U32P " %7" F_U32P " %12" F_U64P "\n",
230             ii, ni, olapsPerBucket[ii]);
231 
232     totOlaps += olapsPerBucket[ii];
233   }
234 
235   delete [] olapsPerBucket;
236 
237   fprintf(stderr, "------- ------- ------------\n");
238   fprintf(stderr, "                %12" F_U64P "\n", totOlaps);
239   fprintf(stderr, "\n");
240 
241   //  Assign reads to slices.
242 
243   fprintf(stderr, "\n");
244   fprintf(stderr, "------------------------------------------------------------\n");
245   fprintf(stderr, "Will sort using " F_U32 " processes.\n",  _numSlices);
246   fprintf(stderr, "  Up to %7.2f M overlaps\n", olapsPerSlice / 1000000.0);
247   fprintf(stderr, "        %7.2f GB memory\n", _sortMemory);
248   fprintf(stderr, "\n");
249   fprintf(stderr, "          number of\n");
250   fprintf(stderr, " slice     overlaps       read range\n");
251   fprintf(stderr, "------ ------------ ---------------------\n");
252 
253   totOlaps = 0;
254 
255   {
256     uint32  first = 1;
257     uint64  olaps = 0;
258     uint32  slice = 0;
259 
260     for (uint32 ii=0; ii<_maxID+1; ii++) {
261       if (olaps + oPR[ii] > olapsPerSlice) {
262         fprintf(stderr, "%6" F_U32P " %12" F_U64P " %10" F_U32P "-%-10" F_U32P "\n", slice, olaps, first, ii-1);
263         totOlaps += olaps;
264         olaps = 0;
265         slice++;
266         first = ii;
267       }
268 
269       olaps            += oPR[ii];
270       _readToSlice[ii]  = slice;
271     }
272 
273     fprintf(stderr, "%6" F_U32P " %12" F_U64P " %10" F_U32P "-%-10" F_U32P "\n", slice, olaps, first, _maxID);
274     totOlaps += olaps;
275 
276     if (slice + 1 != _numSlices)
277       fprintf(stderr, "ERROR: expected %u slices, generated %u instead.  %lu overlaps per slice allowed.\n",
278               _numSlices, slice, olapsPerSlice);
279     assert(slice + 1 == _numSlices);
280   }
281 
282   fprintf(stderr, "------ ------------\n");
283   fprintf(stderr, "       %12" F_U64P "\n", totOlaps);
284 
285   fprintf(stderr, "\n");
286 
287   delete [] oPR;
288 }
289 
290 
291 
292 
293 int
main(int argc,char ** argv)294 main(int argc, char **argv) {
295   char                      *seqName         = NULL;
296   uint64                     minMemory       = (uint64)1 * 1024 * 1024 * 1024;
297   uint64                     maxMemory       = (uint64)4 * 1024 * 1024 * 1024;
298 
299   std::vector<char const *>  fileList;
300 
301   char const                *configOut       = NULL;
302   char const                *configIn        = NULL;
303 
304   bool                       writeNumBuckets = false;
305   bool                       writeNumSlices  = false;
306   bool                       writeMemory     = false;
307   uint32                     writeInputs     = 0;
308   uint32                     writeSlices     = 0;
309 
310   argc = AS_configure(argc, argv, 1);
311 
312   std::vector<char const *>  err;
313   for (int32 arg=1; arg < argc; arg++) {
314     if        (strcmp(argv[arg], "-S") == 0) {
315       seqName = argv[++arg];
316 
317     } else if (strcmp(argv[arg], "-M") == 0) {
318       double lo=0.0, hi=0.0;
319 
320       decodeRange<double>(argv[++arg], lo, hi);
321 
322       minMemory = (uint64)ceil(lo * 1024.0 * 1024.0 * 1024.0);
323       maxMemory = (uint64)ceil(hi * 1024.0 * 1024.0 * 1024.0);
324 
325     } else if (strcmp(argv[arg], "-L") == 0) {
326       AS_UTL_loadFileList(argv[++arg], fileList);
327 
328     } else if (strcmp(argv[arg], "-create") == 0) {
329       configOut = argv[++arg];
330 
331     } else if (strcmp(argv[arg], "-describe") == 0) {
332       configIn = argv[++arg];
333 
334     } else if (strcmp(argv[arg], "-numbuckets") == 0) {
335       writeNumBuckets = true;
336     } else if (strcmp(argv[arg], "-numslices") == 0) {
337       writeNumSlices = true;
338     } else if (strcmp(argv[arg], "-sortmemory") == 0) {
339       writeMemory = true;
340     } else if (strcmp(argv[arg], "-listinputs") == 0) {
341       writeInputs = strtouint32(argv[++arg]);
342     } else if (strcmp(argv[arg], "-listslices") == 0) {
343       writeSlices = strtouint32(argv[++arg]);
344 
345     } else if (((argv[arg][0] == '-') && (argv[arg][1] == 0)) ||
346                (fileExists(argv[arg]))) {
347       //  Assume it's an input file
348       fileList.push_back(argv[arg]);
349 
350     } else {
351       char *s = new char [1024];
352       snprintf(s, 1024, "%s: unknown option '%s'.\n", argv[0], argv[arg]);
353       err.push_back(s);
354     }
355   }
356 
357   if ((seqName == NULL) && (configIn == NULL))
358     err.push_back("ERROR: No sequence store (-S) supplied.\n");
359 
360   if ((fileList.size() == 0) && (configIn == NULL))
361     err.push_back("ERROR: No input overlap files (-L or last on the command line) supplied.\n");
362 
363   if ((configOut != NULL) && (configIn != NULL))
364     err.push_back("ERROR: Can't both -create -describe a config.\n");
365 
366   if ((configOut == NULL) && (configIn == NULL))
367     err.push_back("ERROR: Must supply one of -create or -describe.\n");
368 
369   if ((minMemory <= OVSTORE_MEMORY_OVERHEAD) ||
370       (maxMemory <= OVSTORE_MEMORY_OVERHEAD + ovOverlapSortSize))
371     err.push_back("ERROR: Memory (-M) must be at least 0.25 GB to account for overhead.\n");  //  , OVSTORE_MEMORY_OVERHEAD / 1024.0 / 1024.0 / 1024.0
372 
373   if (err.size() > 0) {
374     fprintf(stderr, "usage: %s -S asm.seqStore -create out.config [opts] [-L fileList | *.ovb]\n", argv[0]);
375     fprintf(stderr, "  -S asm.seqStore       path to seqStore for this assembly\n");
376     fprintf(stderr, "\n");
377     fprintf(stderr, "  -L fileList           a list of ovb files in 'fileList'\n");
378     fprintf(stderr, "\n");
379     fprintf(stderr, "  -M g                  use up to 'g' gigabytes memory for sorting overlaps\n");
380     fprintf(stderr, "                          default 4; g-0.25 gb is available for sorting overlaps\n");
381     fprintf(stderr, "\n");
382     fprintf(stderr, "  -create config        write overlap store configuration to file 'config'\n");
383     fprintf(stderr, "\n");
384     fprintf(stderr, "  -describe config      write a readable description of the config in 'config' to the screen\n");
385     fprintf(stderr, "  -numbuckets           write the number of buckets to the screen\n");
386     fprintf(stderr, "  -numslices            write the number of slices to the screen\n");
387     fprintf(stderr, "  -sortmemory           write the memory needed (in GB) for a sort job to the screen\n");
388     fprintf(stderr, "  -listinputs n         write a list of the input ovb files needed for bucketizer job 'n'");
389     fprintf(stderr, "  -listslices n         write a list of the input slice files needed for sorter job 'n'\n");
390     fprintf(stderr, "\n");
391     fprintf(stderr, "\n");
392     fprintf(stderr, "Sizes and Limits:\n");
393     fprintf(stderr, "  ovOverlap             " F_S32 " words of " F_S32 " bits each.\n", (int32)ovOverlapNWORDS, (int32)ovOverlapWORDSZ);
394     fprintf(stderr, "  ovOverlapSortSize     " F_S32 " bits\n",      (int32)ovOverlapSortSize * 8);
395     fprintf(stderr, "  SC_CHILD_MAX          " F_S32 " processes\n", (int32)sysconf(_SC_CHILD_MAX));
396     fprintf(stderr, "  SC_OPEN_MAX           " F_S32 " files\n",     (int32)sysconf(_SC_OPEN_MAX));
397     fprintf(stderr, "\n");
398 
399     for (uint32 ii=0; ii<err.size(); ii++)
400       if (err[ii])
401         fputs(err[ii], stderr);
402 
403     exit(1);
404   }
405 
406   ovStoreConfig  *config = NULL;
407 
408   //  If describing, load and describe.
409 
410   if (configIn) {
411     config = new ovStoreConfig(configIn);
412   }
413 
414   //  Check parameters, reset some of them.
415 
416   else {
417     if (minMemory < OVSTORE_MEMORY_OVERHEAD + ovOverlapSortSize) {
418       fprintf(stderr, "Reset minMemory from " F_U64 " to " F_SIZE_T "\n", minMemory, OVSTORE_MEMORY_OVERHEAD + ovOverlapSortSize);
419       minMemory  = OVSTORE_MEMORY_OVERHEAD + ovOverlapSortSize;
420     }
421 
422     sqStore        *seq    = new sqStore(seqName);
423     uint32          maxID  = seq->sqStore_lastReadID();
424 
425     config = new ovStoreConfig(fileList, maxID);
426 
427     config->assignReadsToSlices(seq, minMemory, maxMemory);
428     config->writeConfig(configOut);
429 
430     delete seq;
431   }
432 
433   //  If we have a config, report parameters.
434 
435   if (config) {
436     uint32  memGB = (uint32)ceil(config->sortMemory() + 0.5);  //  Adds an extra 0.5 to 1.49 gb.
437 
438     if (writeNumBuckets) {
439       fprintf(stdout, F_U32 "\n", config->numBuckets());
440     }
441 
442     else if (writeNumSlices) {
443       fprintf(stdout, F_U32 "\n", config->numSlices());
444     }
445 
446     else if (writeMemory) {
447       fprintf(stdout, F_U32 "\n", memGB);
448     }
449 
450     else if (writeInputs) {
451       for (uint32 ff=0; ff<config->numInputs(writeInputs); ff++)
452         fprintf(stdout, "%s\n", config->getInput(writeInputs, ff));
453     }
454 
455     else if (writeSlices) {
456       for (uint32 bb=1; bb<=config->numBuckets(); bb++) {
457         fprintf(stdout, "bucket%04" F_U32P "/slice%04" F_U32P "\n", bb, writeSlices);
458         fprintf(stdout, "bucket%04" F_U32P "/sliceSizes\n", bb);
459       }
460     }
461 
462     else {
463       fprintf(stdout, "\n");
464       fprintf(stdout, "Configured for:\n");
465       fprintf(stdout, "  numBuckets %8" F_U32P "\n", config->numBuckets());
466       fprintf(stdout, "  numSlices  %8" F_U32P "\n", config->numSlices());
467       fprintf(stdout, "  sortMemory %8" F_U32P " GB (%5.3f GB)\n", memGB, config->sortMemory());
468     }
469   }
470 
471   //  Cleanup and quit!
472 
473   delete config;
474 
475   exit(0);
476 }
477