1 /*
2  * JaLingo, http://jalingo.sourceforge.net/
3  *
4  * Copyright (c) 2002-2006 Oleksandr Shyshko
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  */
20 
21 package ja.centre.util.sort.external;
22 
23 import ja.centre.util.assertions.Arguments;
24 import ja.centre.util.sort.ISorter;
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 
28 import java.io.IOException;
29 import java.util.Comparator;
30 
31 public class ExternalSorter<T> implements IExternalSorter<T> {
32     private static final Log LOG = LogFactory.getLog( ExternalSorter.class );
33     private IPersister<T> persister;
34     private ISorter<T> sorter;
35     private Comparator<T> comparator;
36     private int valuesInMemory;
37 
ExternalSorter( IPersister<T> persister, ISorter<T> sorter, Comparator<T> comparator, int valuesInMemory)38     public ExternalSorter( IPersister<T> persister, ISorter<T> sorter, Comparator<T> comparator, int valuesInMemory) {
39         Arguments.assertNotNull( "persister", persister );
40         Arguments.assertNotNull( "sorter", sorter );
41         Arguments.assertNotNull( "comparator", comparator );
42         Arguments.assertPositiveNonZero( "valuesInMemory", valuesInMemory );
43 
44         this.persister = persister;
45         this.sorter = sorter;
46         this.comparator = comparator;
47         this.valuesInMemory = valuesInMemory;
48     }
49 
sort( IReader<T> reader, IWriter<T> writer )50     public void sort( IReader<T> reader, IWriter<T> writer ) throws IOException {
51         Arguments.assertNotNull( "reader", reader );
52         Arguments.assertNotNull( "writer", writer );
53 
54         try {
55             doSort( reader, writer );
56         } finally{
57             reader.close();
58             writer.close();
59         }
60     }
61 
doSort( IReader<T> reader, IWriter<T> writer )62     private void doSort( IReader<T> reader, IWriter<T> writer ) throws IOException {
63         IMultipartWriter<T> multipartWriter = createMultipartWriter( persister, sorter, comparator, valuesInMemory );
64 
65         LOG.info( "Splitting to sorted parts..." );
66         while ( reader.hasNext() ) {
67             multipartWriter.write( reader.next() );
68         }
69         multipartWriter.close();
70 
71         // TODO merge with 10 readers at once (make it parameter - mergeFilesAtOnce)
72         LOG.info( "Copying from splitted parts to destination writer" );
73         IReader<T> multipartReader = new MergingMultipartReader<T>(
74                 multipartWriter.getExternalReaders(), comparator );
75         while ( multipartReader.hasNext() ) {
76             writer.write( multipartReader.next() );
77         }
78 
79         LOG.info( "Closing..." );
80         multipartReader.close();
81         multipartWriter.clean();
82     }
83 
createMultipartWriter( IPersister<T> persister, ISorter<T> sorter, Comparator<T> comparator, int valuesInMemory )84     protected IMultipartWriter<T> createMultipartWriter( IPersister<T> persister, ISorter<T> sorter, Comparator<T> comparator, int valuesInMemory ) {
85         return new FileMultipartWriter<T>( persister, sorter, comparator, valuesInMemory );
86     }
87 }