1 /****************************************************************************
2 *
3 * MODULE: iostream
4 *
5
6 * COPYRIGHT (C) 2007 Laura Toma
7 *
8 *
9
10 * Iostream is a library that implements streams, external memory
11 * sorting on streams, and an external memory priority queue on
12 * streams. These are the fundamental components used in external
13 * memory algorithms.
14
15 * Credits: The library was developed by Laura Toma. The kernel of
16 * class STREAM is based on the similar class existent in the GPL TPIE
17 * project developed at Duke University. The sorting and priority
18 * queue have been developed by Laura Toma based on communications
19 * with Rajiv Wickremesinghe. The library was developed as part of
20 * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
21 * Rajiv Wickremesinghe as part of the Terracost project.
22
23 *
24 * This program is free software; you can redistribute it and/or modify
25 * it under the terms of the GNU General Public License as published by
26 * the Free Software Foundation; either version 2 of the License, or
27 * (at your option) any later version.
28 *
29
30 * This program is distributed in the hope that it will be useful,
31 * but WITHOUT ANY WARRANTY; without even the implied warranty of
32 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
33 * General Public License for more details. *
34 * **************************************************************************/
35
36
37
38 #ifndef AMI_SORT_IMPL_H
39 #define AMI_SORT_IMPL_H
40
41 #include "ami_stream.h"
42 #include "mem_stream.h"
43 #include "mm.h"
44 #include "quicksort.h"
45 #include "queue.h"
46 #include "replacementHeap.h"
47 #include "replacementHeapBlock.h"
48
49 #define SDEBUG if(0)
50
51
52 /* if this flag is defined, a run will be split into blocks, each
53 block sorted and then all blocks merged */
54 #define BLOCKED_RUN
55
56
57 /* ---------------------------------------------------------------------- */
58 //set run_size, last_run_size and nb_runs depending on how much memory
59 //is available
60 template<class T>
61 static void
initializeRunFormation(AMI_STREAM<T> * instream,size_t & run_size,size_t & last_run_size,unsigned int & nb_runs)62 initializeRunFormation(AMI_STREAM<T> *instream,
63 size_t &run_size, size_t &last_run_size,
64 unsigned int &nb_runs) {
65
66 size_t mm_avail = MM_manager.memory_available();
67 off_t strlen;
68
69 #ifdef BLOCKED_RUN
70 // not in place, can only use half memory
71 mm_avail = mm_avail/2;
72 #endif
73 run_size = mm_avail/sizeof(T);
74
75
76 strlen = instream->stream_len();
77 if (strlen == 0) {
78 nb_runs = last_run_size = 0;
79 } else {
80 if (strlen % run_size == 0) {
81 nb_runs = strlen/run_size;
82 last_run_size = run_size;
83 } else {
84 nb_runs = strlen/run_size + 1;
85 last_run_size = strlen % run_size;
86 }
87 }
88
89 SDEBUG cout << "nb_runs=" << nb_runs
90 << ", run_size=" << run_size
91 << ", last_run_size=" << last_run_size
92 << "\n";
93 }
94
95
96
97 /* ---------------------------------------------------------------------- */
98 /* data is allocated; read run_size elements from stream into data and
99 sort them using quicksort */
100 template<class T, class Compare>
makeRun_Block(AMI_STREAM<T> * instream,T * data,unsigned int run_size,Compare * cmp)101 size_t makeRun_Block(AMI_STREAM<T> *instream, T* data,
102 unsigned int run_size, Compare *cmp) {
103 AMI_err err;
104 off_t new_run_size = 0;
105
106 //read next run from input stream
107 err = instream->read_array(data, run_size, &new_run_size);
108 assert(err == AMI_ERROR_NO_ERROR || err == AMI_ERROR_END_OF_STREAM);
109
110 //sort it in memory in place
111 quicksort(data, new_run_size, *cmp);
112
113 return new_run_size;
114 }
115
116
117 /* ---------------------------------------------------------------------- */
118 /* data is allocated; read run_size elements from stream into data and
119 sort them using quicksort; instead of reading the whole chunk at
120 once, it reads it in blocks, sorts each block and then merges the
121 blocks together. Note: it is not in place! it allocates another
122 array of same size as data, writes the sorted run into it and
123 deteles data, and replaces data with outdata */
124 template<class T, class Compare>
makeRun(AMI_STREAM<T> * instream,T * & data,int run_size,Compare * cmp)125 void makeRun(AMI_STREAM<T> *instream, T* &data,
126 int run_size, Compare *cmp) {
127
128 unsigned int nblocks, last_block_size, crt_block_size, block_size;
129
130
131 block_size = STREAM_BUFFER_SIZE;
132
133 if (run_size % block_size == 0) {
134 nblocks = run_size / block_size;
135 last_block_size = block_size;
136 } else {
137 nblocks = run_size / block_size + 1;
138 last_block_size = run_size % block_size;
139 }
140
141 //create queue of blocks waiting to be merged
142 queue<MEM_STREAM<T> *> *blockList;
143 MEM_STREAM<T>* str;
144 blockList = new queue<MEM_STREAM<T> *>(nblocks);
145 for (unsigned int i=0; i < nblocks; i++) {
146 crt_block_size = (i == nblocks-1) ? last_block_size: block_size;
147 makeRun_Block(instream, &(data[i*block_size]), crt_block_size, cmp);
148 str = new MEM_STREAM<T>( &(data[i*block_size]), crt_block_size);
149 blockList->enqueue(str);
150 }
151 assert(blockList->length() == nblocks);
152
153 //now data consists of sorted blocks: merge them
154 ReplacementHeapBlock<T,Compare> rheap(blockList);
155 SDEBUG rheap.print(cerr);
156 int i = 0;
157 T elt;
158 T* outdata = new T [run_size];
159 while (!rheap.empty()) {
160 elt = rheap.extract_min();
161 outdata[i] = elt;
162 //SDEBUG cerr << "makeRun: written " << elt << endl;
163 i++;
164 }
165 assert( i == run_size && blockList->length() == 0);
166 delete blockList;
167
168 T* tmp = data;
169 delete [] tmp;
170 data = outdata;
171 }
172
173
174
175 /* ---------------------------------------------------------------------- */
176
177 //partition instream in streams that fit in main memory, sort each
178 //stream, remember its name, make it persistent and store it on
179 //disk. if entire stream fits in memory, sort it and store it and
180 //return it.
181
182 //assume instream is allocated prior to the call.
183 // set nb_runs and allocate runNames.
184
185 //The comparison object "cmp", of (user-defined) class represented by
186 //Compare, must have a member function called "compare" which is used
187 //for sorting the input stream.
188
189 template<class T, class Compare>
190 queue<char*>*
runFormation(AMI_STREAM<T> * instream,Compare * cmp)191 runFormation(AMI_STREAM<T> *instream, Compare *cmp) {
192
193 size_t run_size,last_run_size, crt_run_size;
194 unsigned int nb_runs;
195 queue<char*>* runList;
196 T* data;
197 AMI_STREAM<T>* str;
198 char* strname;
199
200 assert(instream && cmp);
201 SDEBUG cout << "runFormation: ";
202 SDEBUG MM_manager.print();
203
204 /* leave this in for now, in case some file-based implementations do
205 anything funny... -RW */
206 //rewind file
207 instream->seek(0); //should check error xxx
208
209 //estimate run_size, last_run_size and nb_runs
210 initializeRunFormation(instream, run_size, last_run_size, nb_runs);
211
212 //create runList (if 0 size, queue uses default)
213 runList = new queue<char*>(nb_runs);
214
215 /* allocate space for a run */
216 if (nb_runs <= 1) {
217 //don't waste space if input stream is smaller than run_size
218 data = new T[last_run_size];
219 } else {
220 data = new T[run_size];
221 }
222 SDEBUG MM_manager.print();
223
224 for (size_t i=0; i< nb_runs; i++) {
225 //while(!instream->eof()) {
226 crt_run_size = (i == nb_runs-1) ? last_run_size: run_size;
227
228 SDEBUG cout << "i=" << i << ": runsize=" << crt_run_size << ", ";
229
230 //crt_run_size = makeRun_Block(instream, data, run_size, cmp);
231 #ifdef BLOCKED_RUN
232 makeRun(instream, data, crt_run_size, cmp);
233 #else
234 makeRun_Block(instream, data, crt_run_size, cmp);
235 #endif
236
237 SDEBUG MM_manager.print();
238
239 //read next run from input stream
240 //err = instream->read_array(data, crt_run_size);
241 //assert(err == AMI_ERROR_NO_ERROR);
242 //sort it in memory in place
243 //quicksort(data, crt_run_size, *cmp);
244
245 if(crt_run_size > 0) {
246 //create a new stream to hold this run
247 str = new AMI_STREAM<T>();
248 str->write_array(data, crt_run_size);
249 assert(str->stream_len() == crt_run_size);
250
251 //remember this run's name
252 str->name(&strname); /* deleted after we dequeue */
253 runList->enqueue(strname);
254 //delete the stream -- should not keep too many streams open
255 str->persist(PERSIST_PERSISTENT);
256 delete str;
257 }
258
259 }
260 SDEBUG MM_manager.print();
261 //release the run memory!
262 delete [] data;
263
264 SDEBUG cout << "runFormation: done.\n";
265 SDEBUG MM_manager.print();
266
267 return runList;
268 }
269
270
271
272
273
274
275 /* ---------------------------------------------------------------------- */
276
277 //this is one pass of merge; estimate max possible merge arity <ar>
278 //and merge the first <ar> streams from runList ; create and return
279 //the resulting stream (does not add it to the queue -- the calling
280 //function will do that)
281
282 //input streams are assumed to be sorted, and are not necessarily of
283 //the same length.
284
285 //streamList does not contains streams, but names of streams, which
286 //must be opened in order to be merged
287
288 //The comparison object "cmp", of (user-defined) class represented by
289 //Compare, must have a member function called "compare" which is used
290 //for sorting the input stream.
291
292
293 template<class T, class Compare>
294 AMI_STREAM<T>*
singleMerge(queue<char * > * streamList,Compare * cmp)295 singleMerge(queue<char*>* streamList, Compare *cmp)
296 {
297 AMI_STREAM<T>* mergedStr;
298 size_t mm_avail, blocksize;
299 unsigned int arity, max_arity;
300 T elt;
301
302 assert(streamList && cmp);
303
304 SDEBUG cout << "singleMerge: ";
305
306 //estimate max possible merge arity with available memory (approx M/B)
307 mm_avail = MM_manager.memory_available();
308 //blocksize = getpagesize();
309 //should use AMI function, but there's no stream at this point
310 //now use static mtd -RW 5/05
311 AMI_STREAM<T>::main_memory_usage(&blocksize, MM_STREAM_USAGE_MAXIMUM);
312 max_arity = mm_avail / blocksize;
313 if(max_arity < 2) {
314 cerr << __FILE__ ":" << __LINE__ << ": OUT OF MEMORY in singleMerge (going over limit)" << endl;
315 max_arity = 2;
316 } else if(max_arity > MAX_STREAMS_OPEN) {
317 max_arity = MAX_STREAMS_OPEN;
318 }
319 arity = (streamList->length() < max_arity) ?
320 streamList->length() : max_arity;
321
322 SDEBUG cout << "arity=" << arity << " (max_arity=" <<max_arity<< ")\n";
323
324 /* create the output stream. if this is a complete merge, use finalpath */
325 //create output stream
326 mergedStr = new AMI_STREAM<T>();
327
328 ReplacementHeap<T,Compare> rheap(arity, streamList);
329 SDEBUG rheap.print(cerr);
330
331 int i = 0;
332 while (!rheap.empty()) {
333 //mergedStr->write_item( rheap.extract_min() );
334 //xxx should check error here
335 elt = rheap.extract_min();
336 mergedStr->write_item(elt);
337 //SDEBUG cerr << "smerge: written " << elt << endl;
338 i++;
339 }
340
341 SDEBUG cout << "..done\n";
342
343 return mergedStr;
344 }
345
346
347
348
349 /* ---------------------------------------------------------------------- */
350
351 //merge runs whose names are given by runList; this may entail
352 //multiple passes of singleMerge();
353
354 //return the resulting output stream
355
356 //input streams are assumed to be sorted, and are not necessarily of
357 //the same length.
358
359 //The comparison object "cmp", of (user-defined) class represented by
360 //Compare, must have a member function called "compare" which is used
361 //for sorting the input stream.
362
363
364 template<class T, class Compare>
365 AMI_STREAM<T>*
multiMerge(queue<char * > * runList,Compare * cmp)366 multiMerge(queue<char*>* runList, Compare *cmp)
367 {
368 AMI_STREAM<T> * mergedStr= NULL;
369 char* path;
370
371 assert(runList && runList->length() > 1 && cmp);
372
373 SDEBUG cout << "multiMerge: " << runList->length() << " runs" << endl;
374
375 while (runList->length() > 1) {
376
377 //merge streams from streamlist into mergedStr
378 mergedStr = singleMerge<T,Compare>(runList, cmp);
379 //i thought the templates are not needed in the call, but seems to
380 //help the compiler..laura
381 assert(mergedStr);
382
383 //if more runs exist, delete this stream and add it to list
384 if (runList->length() > 0) {
385 mergedStr->name(&path);
386 runList->enqueue(path);
387 mergedStr->persist(PERSIST_PERSISTENT);
388 delete mergedStr;
389 }
390 }
391
392 assert(runList->length() == 0);
393 assert(mergedStr);
394 return mergedStr;
395 }
396
397
398
399
400 #endif
401
402