1 /****************************************************************************
2  *
3  *  MODULE:     iostream
4  *
5 
6  *  COPYRIGHT (C) 2007 Laura Toma
7  *
8  *
9 
10  *  Iostream is a library that implements streams, external memory
11  *  sorting on streams, and an external memory priority queue on
12  *  streams. These are the fundamental components used in external
13  *  memory algorithms.
14 
15  * Credits: The library was developed by Laura Toma.  The kernel of
16  * class STREAM is based on the similar class existent in the GPL TPIE
17  * project developed at Duke University. The sorting and priority
18  * queue have been developed by Laura Toma based on communications
19  * with Rajiv Wickremesinghe. The library was developed as part of
20  * porting Terraflow to GRASS in 2001.  PEARL upgrades in 2003 by
21  * Rajiv Wickremesinghe as part of the Terracost project.
22 
23  *
24  *  This program is free software; you can redistribute it and/or modify
25  *  it under the terms of the GNU General Public License as published by
26  *  the Free Software Foundation; either version 2 of the License, or
27  *  (at your option) any later version.
28  *
29 
30  *  This program is distributed in the hope that it will be useful,
31  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
32  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
33  *  General Public License for more details.  *
34  *  **************************************************************************/
35 
36 
37 
38 #ifndef AMI_SORT_IMPL_H
39 #define AMI_SORT_IMPL_H
40 
41 #include "ami_stream.h"
42 #include "mem_stream.h"
43 #include "mm.h"
44 #include "quicksort.h"
45 #include "queue.h"
46 #include "replacementHeap.h"
47 #include "replacementHeapBlock.h"
48 
49 #define SDEBUG if(0)
50 
51 
52 /* if this flag is defined, a run will be split into blocks, each
53    block sorted and then all blocks merged */
54 #define BLOCKED_RUN
55 
56 
57 /* ---------------------------------------------------------------------- */
58 //set run_size, last_run_size and nb_runs depending on how much memory
59 //is available
60 template<class T>
61 static void
initializeRunFormation(AMI_STREAM<T> * instream,size_t & run_size,size_t & last_run_size,unsigned int & nb_runs)62 initializeRunFormation(AMI_STREAM<T> *instream,
63 		       size_t &run_size, size_t &last_run_size,
64 		       unsigned int &nb_runs) {
65 
66   size_t mm_avail = MM_manager.memory_available();
67   off_t strlen;
68 
69 #ifdef BLOCKED_RUN
70   // not in place, can only use half memory
71   mm_avail = mm_avail/2;
72 #endif
73   run_size = mm_avail/sizeof(T);
74 
75 
76   strlen = instream->stream_len();
77   if (strlen == 0) {
78     nb_runs = last_run_size = 0;
79   } else {
80     if (strlen % run_size == 0) {
81       nb_runs = strlen/run_size;
82       last_run_size = run_size;
83     } else {
84       nb_runs = strlen/run_size + 1;
85       last_run_size = strlen % run_size;
86     }
87   }
88 
89   SDEBUG cout << "nb_runs=" << nb_runs
90 	      << ", run_size=" << run_size
91 	      << ", last_run_size=" << last_run_size
92 	      << "\n";
93 }
94 
95 
96 
97 /* ---------------------------------------------------------------------- */
98 /* data is allocated; read run_size elements from stream into data and
99    sort them using quicksort */
100 template<class T, class Compare>
makeRun_Block(AMI_STREAM<T> * instream,T * data,unsigned int run_size,Compare * cmp)101 size_t makeRun_Block(AMI_STREAM<T> *instream, T* data,
102 		   unsigned int run_size, Compare *cmp) {
103   AMI_err err;
104   off_t new_run_size = 0;
105 
106   //read next run from input stream
107   err = instream->read_array(data, run_size, &new_run_size);
108   assert(err == AMI_ERROR_NO_ERROR || err == AMI_ERROR_END_OF_STREAM);
109 
110   //sort it in memory in place
111   quicksort(data, new_run_size, *cmp);
112 
113   return new_run_size;
114 }
115 
116 
117 /* ---------------------------------------------------------------------- */
118 /* data is allocated; read run_size elements from stream into data and
119    sort them using quicksort; instead of reading the whole chunk at
120    once, it reads it in blocks, sorts each block and then merges the
   blocks together. Note: it is not in place! It allocates another
   array of the same size as data, writes the sorted run into it,
   deletes data, and replaces data with outdata */
124 template<class T, class Compare>
makeRun(AMI_STREAM<T> * instream,T * & data,int run_size,Compare * cmp)125 void makeRun(AMI_STREAM<T> *instream, T* &data,
126 	     int run_size, Compare *cmp) {
127 
128   unsigned int nblocks, last_block_size, crt_block_size, block_size;
129 
130 
131   block_size = STREAM_BUFFER_SIZE;
132 
133   if (run_size % block_size == 0) {
134     nblocks = run_size / block_size;
135     last_block_size = block_size;
136   } else {
137     nblocks = run_size / block_size + 1;
138     last_block_size = run_size % block_size;
139   }
140 
141   //create queue of blocks waiting to be merged
142   queue<MEM_STREAM<T> *> *blockList;
143   MEM_STREAM<T>* str;
144   blockList  = new  queue<MEM_STREAM<T> *>(nblocks);
145   for (unsigned int i=0; i < nblocks; i++) {
146     crt_block_size = (i == nblocks-1) ? last_block_size: block_size;
147     makeRun_Block(instream, &(data[i*block_size]), crt_block_size, cmp);
148     str = new MEM_STREAM<T>( &(data[i*block_size]), crt_block_size);
149     blockList->enqueue(str);
150   }
151   assert(blockList->length() == nblocks);
152 
153   //now data consists of sorted blocks: merge them
154   ReplacementHeapBlock<T,Compare> rheap(blockList);
155   SDEBUG rheap.print(cerr);
156   int i = 0;
157   T elt;
158   T* outdata = new T [run_size];
159   while (!rheap.empty()) {
160     elt = rheap.extract_min();
161     outdata[i] = elt;
162     //SDEBUG cerr << "makeRun: written " << elt << endl;
163     i++;
164   }
165   assert( i == run_size &&  blockList->length() == 0);
166   delete blockList;
167 
168   T* tmp = data;
169   delete [] tmp;
170   data = outdata;
171 }
172 
173 
174 
175 /* ---------------------------------------------------------------------- */
176 
177 //partition instream in streams that fit in main memory, sort each
178 //stream, remember its name, make it persistent and store it on
179 //disk. if entire stream fits in memory, sort it and store it and
180 //return it.
181 
182 //assume instream is allocated prior to the call.
183 // set nb_runs and allocate runNames.
184 
185 //The comparison object "cmp", of (user-defined) class represented by
186 //Compare, must have a member function called "compare" which is used
187 //for sorting the input stream.
188 
189 template<class T, class Compare>
190 queue<char*>*
runFormation(AMI_STREAM<T> * instream,Compare * cmp)191 runFormation(AMI_STREAM<T> *instream, Compare *cmp) {
192 
193   size_t run_size,last_run_size, crt_run_size;
194   unsigned int nb_runs;
195   queue<char*>* runList;
196   T* data;
197   AMI_STREAM<T>* str;
198   char* strname;
199 
200   assert(instream && cmp);
201   SDEBUG cout << "runFormation: ";
202   SDEBUG MM_manager.print();
203 
204   /* leave this in for now, in case some file-based implementations do
205      anything funny... -RW */
206   //rewind file
207   instream->seek(0); //should check error xxx
208 
209   //estimate run_size, last_run_size and nb_runs
210   initializeRunFormation(instream, run_size, last_run_size, nb_runs);
211 
212   //create runList (if 0 size, queue uses default)
213   runList = new queue<char*>(nb_runs);
214 
215   /* allocate space for a run */
216   if (nb_runs <= 1) {
217     //don't waste space if input stream is smaller than run_size
218     data = new T[last_run_size];
219   } else {
220     data = new T[run_size];
221   }
222   SDEBUG MM_manager.print();
223 
224   for (size_t i=0; i< nb_runs; i++) {
225   //while(!instream->eof()) {
226     crt_run_size = (i == nb_runs-1) ? last_run_size: run_size;
227 
228     SDEBUG cout << "i=" << i << ":  runsize=" << crt_run_size << ", ";
229 
230     //crt_run_size = makeRun_Block(instream, data, run_size, cmp);
231 #ifdef BLOCKED_RUN
232     makeRun(instream, data, crt_run_size, cmp);
233 #else
234     makeRun_Block(instream, data, crt_run_size, cmp);
235 #endif
236 
237     SDEBUG MM_manager.print();
238 
239     //read next run from input stream
240     //err = instream->read_array(data, crt_run_size);
241     //assert(err == AMI_ERROR_NO_ERROR);
242     //sort it in memory in place
243     //quicksort(data, crt_run_size, *cmp);
244 
245     if(crt_run_size > 0) {
246       //create a new stream to hold this run
247       str = new AMI_STREAM<T>();
248       str->write_array(data, crt_run_size);
249       assert(str->stream_len() == crt_run_size);
250 
251       //remember this run's name
252       str->name(&strname);	/* deleted after we dequeue */
253       runList->enqueue(strname);
254       //delete the stream -- should not keep too many streams open
255       str->persist(PERSIST_PERSISTENT);
256       delete str;
257     }
258 
259   }
260   SDEBUG MM_manager.print();
261   //release the run memory!
262   delete [] data;
263 
264   SDEBUG cout << "runFormation: done.\n";
265   SDEBUG MM_manager.print();
266 
267   return runList;
268 }
269 
270 
271 
272 
273 
274 
275 /* ---------------------------------------------------------------------- */
276 
277 //this is one pass of merge; estimate max possible merge arity <ar>
278 //and merge the first <ar> streams from runList ; create and return
279 //the resulting stream (does not add it to the queue -- the calling
280 //function will do that)
281 
282 //input streams are assumed to be sorted, and are not necessarily of
283 //the same length.
284 
//streamList does not contain streams, but names of streams, which
286 //must be opened in order to be merged
287 
288 //The comparison object "cmp", of (user-defined) class represented by
289 //Compare, must have a member function called "compare" which is used
290 //for sorting the input stream.
291 
292 
293 template<class T, class Compare>
294 AMI_STREAM<T>*
singleMerge(queue<char * > * streamList,Compare * cmp)295 singleMerge(queue<char*>* streamList, Compare *cmp)
296 {
297   AMI_STREAM<T>* mergedStr;
298   size_t mm_avail, blocksize;
299   unsigned int arity, max_arity;
300   T elt;
301 
302   assert(streamList && cmp);
303 
304   SDEBUG cout << "singleMerge: ";
305 
306   //estimate max possible merge arity with available memory (approx M/B)
307   mm_avail = MM_manager.memory_available();
308   //blocksize = getpagesize();
309   //should use AMI function, but there's no stream at this point
310   //now use static mtd -RW 5/05
311   AMI_STREAM<T>::main_memory_usage(&blocksize, MM_STREAM_USAGE_MAXIMUM);
312   max_arity = mm_avail / blocksize;
313   if(max_arity < 2) {
314 	cerr << __FILE__ ":" << __LINE__ << ": OUT OF MEMORY in singleMerge (going over limit)" << endl;
315 	max_arity = 2;
316   } else if(max_arity > MAX_STREAMS_OPEN) {
317 	max_arity = MAX_STREAMS_OPEN;
318   }
319   arity = (streamList->length() < max_arity) ?
320     streamList->length() :  max_arity;
321 
322   SDEBUG cout << "arity=" << arity << " (max_arity=" <<max_arity<< ")\n";
323 
324   /* create the output stream. if this is a complete merge, use finalpath */
325   //create output stream
326   mergedStr = new AMI_STREAM<T>();
327 
328   ReplacementHeap<T,Compare> rheap(arity, streamList);
329   SDEBUG rheap.print(cerr);
330 
331   int i = 0;
332   while (!rheap.empty()) {
333     //mergedStr->write_item( rheap.extract_min() );
334     //xxx should check error here
335     elt = rheap.extract_min();
336     mergedStr->write_item(elt);
337     //SDEBUG cerr << "smerge: written " << elt << endl;
338     i++;
339   }
340 
341   SDEBUG cout << "..done\n";
342 
343   return mergedStr;
344 }
345 
346 
347 
348 
349 /* ---------------------------------------------------------------------- */
350 
351 //merge runs whose names are given by runList; this may entail
352 //multiple passes of singleMerge();
353 
354 //return the resulting output stream
355 
356 //input streams are assumed to be sorted, and are not necessarily of
357 //the same length.
358 
359 //The comparison object "cmp", of (user-defined) class represented by
360 //Compare, must have a member function called "compare" which is used
361 //for sorting the input stream.
362 
363 
364 template<class T, class Compare>
365 AMI_STREAM<T>*
multiMerge(queue<char * > * runList,Compare * cmp)366 multiMerge(queue<char*>* runList, Compare *cmp)
367 {
368   AMI_STREAM<T> * mergedStr= NULL;
369   char* path;
370 
371   assert(runList && runList->length() > 1  && cmp);
372 
373   SDEBUG cout << "multiMerge: " << runList->length() << " runs"  << endl;
374 
375   while (runList->length() > 1) {
376 
377     //merge streams from streamlist into mergedStr
378     mergedStr = singleMerge<T,Compare>(runList, cmp);
379     //i thought the templates are not needed in the call, but seems to
380     //help the compiler..laura
381     assert(mergedStr);
382 
383     //if more runs exist, delete this stream and add it to list
384     if (runList->length() > 0) {
385       mergedStr->name(&path);
386       runList->enqueue(path);
387       mergedStr->persist(PERSIST_PERSISTENT);
388       delete mergedStr;
389     }
390   }
391 
392   assert(runList->length() == 0);
393   assert(mergedStr);
394   return mergedStr;
395 }
396 
397 
398 
399 
400 #endif
401 
402