1 #include "lm/common/compare.hh"
2 #include "lm/common/model_buffer.hh"
3 #include "lm/common/ngram.hh"
4 #include "util/stream/chain.hh"
5 #include "util/stream/multi_stream.hh"
6 #include "util/stream/sort.hh"
7 #include "lm/interpolate/split_worker.hh"
8
9 #include <boost/program_options.hpp>
10 #include <boost/version.hpp>
11
#if defined(_WIN32) || defined(_WIN64)

// Windows doesn't provide <unistd.h>
//
// So we define what we need here instead.
//
// Note: an object-like macro takes no '='. Everything after the macro name
// is the replacement text, so "#define STDIN_FILENO = 0" would expand to
// "= 0" rather than "0".
#define STDIN_FILENO 0
#define STDOUT_FILENO 1
#else // Huzzah for POSIX!
#include <unistd.h>
#endif
23
24 /*
25 * This is a simple example program that takes in intermediate
26 * suffix-sorted ngram files and outputs two sets of files: one for backoff
27 * probability values (raw numbers, in suffix order) and one for
28 * probability values (ngram id and probability, in *context* order)
29 */
main(int argc,char * argv[])30 int main(int argc, char *argv[]) {
31 using namespace lm::interpolate;
32
33 const std::size_t ONE_GB = 1 << 30;
34 const std::size_t SIXTY_FOUR_MB = 1 << 26;
35 const std::size_t NUMBER_OF_BLOCKS = 2;
36
37 std::string FILE_NAME = "ngrams";
38 std::string CONTEXT_SORTED_FILENAME = "csorted-ngrams";
39 std::string BACKOFF_FILENAME = "backoffs";
40 std::string TMP_DIR = "/tmp/";
41
42 try {
43 namespace po = boost::program_options;
44 po::options_description options("canhazinterp Pass-3 options");
45
46 options.add_options()
47 ("help,h", po::bool_switch(), "Show this help message")
48 ("ngrams,n", po::value<std::string>(&FILE_NAME), "ngrams file")
49 ("csortngrams,c", po::value<std::string>(&CONTEXT_SORTED_FILENAME), "context sorted ngrams file")
50 ("backoffs,b", po::value<std::string>(&BACKOFF_FILENAME), "backoffs file")
51 ("tmpdir,t", po::value<std::string>(&TMP_DIR), "tmp dir");
52 po::variables_map vm;
53 po::store(po::parse_command_line(argc, argv, options), vm);
54
55 // Display help
56 if(vm["help"].as<bool>()) {
57 std::cerr << "Usage: " << options << std::endl;
58 return 1;
59 }
60 }
61 catch(const std::exception &e) {
62
63 std::cerr << e.what() << std::endl;
64 return 1;
65
66 }
67
68 // The basic strategy here is to have three chains:
69 // - The first reads the ngram order inputs using ModelBuffer. Those are
70 // then stripped of their backoff values and fed into the third chain;
71 // the backoff values *themselves* are written to the second chain.
72 //
73 // - The second chain takes the backoff values and writes them out to a
74 // file (one for each order).
75 //
76 // - The third chain takes just the probability values and ngrams and
77 // writes them out, sorted in context-order, to a file (one for each
78 // order).
79
80 // This will be used to read in the binary intermediate files. There is
81 // one file per order (e.g. ngrams.1, ngrams.2, ...)
82 lm::ModelBuffer buffer(FILE_NAME);
83
84 // Create a separate chains for each ngram order for:
85 // - Input from the intermediate files
86 // - Output to the backoff file
87 // - Output to the (context-sorted) probability file
88 util::stream::Chains ngram_inputs(buffer.Order());
89 util::stream::Chains backoff_chains(buffer.Order());
90 util::stream::Chains prob_chains(buffer.Order());
91 for (std::size_t i = 0; i < buffer.Order(); ++i) {
92 ngram_inputs.push_back(util::stream::ChainConfig(
93 lm::NGram<lm::ProbBackoff>::TotalSize(i + 1), NUMBER_OF_BLOCKS, ONE_GB));
94
95 backoff_chains.push_back(
96 util::stream::ChainConfig(sizeof(float), NUMBER_OF_BLOCKS, ONE_GB));
97
98 prob_chains.push_back(util::stream::ChainConfig(
99 sizeof(lm::WordIndex) * (i + 1) + sizeof(float), NUMBER_OF_BLOCKS,
100 ONE_GB));
101 }
102
103 // This sets the input for each of the ngram order chains to the
104 // appropriate file
105 buffer.Source(ngram_inputs);
106
107 util::FixedArray<util::scoped_ptr<SplitWorker> > workers(buffer.Order());
108 for (std::size_t i = 0; i < buffer.Order(); ++i) {
109 // Attach a SplitWorker to each of the ngram input chains, writing to the
110 // corresponding order's backoff and probability chains
111 workers.push_back(
112 new SplitWorker(i + 1, backoff_chains[i], prob_chains[i]));
113 ngram_inputs[i] >> boost::ref(*workers.back());
114 }
115
116 util::stream::SortConfig sort_cfg;
117 sort_cfg.temp_prefix = TMP_DIR;
118 sort_cfg.buffer_size = SIXTY_FOUR_MB;
119 sort_cfg.total_memory = ONE_GB;
120
121 // This will parallel merge sort the individual order files, putting
122 // them in context-order instead of suffix-order.
123 //
124 // Two new threads will be running, each owned by the chains[i] object.
125 // - The first executes BlockSorter.Run() to sort the n-gram entries
126 // - The second executes WriteAndRecycle.Run() to write each sorted
127 // block to disk as a temporary file
128 util::stream::Sorts<lm::ContextOrder> sorts(buffer.Order());
129 for (std::size_t i = 0; i < prob_chains.size(); ++i) {
130 sorts.push_back(prob_chains[i], sort_cfg, lm::ContextOrder(i + 1));
131 }
132
133 // Set the sort output to be on the same chain
134 for (std::size_t i = 0; i < prob_chains.size(); ++i) {
135 // The following call to Chain::Wait()
136 // joins the threads owned by chains[i].
137 //
138 // As such the following call won't return
139 // until all threads owned by chains[i] have completed.
140 //
141 // The following call also resets chain[i]
142 // so that it can be reused
143 // (including free'ing the memory previously used by the chain)
144 prob_chains[i].Wait();
145
146 // In an ideal world (without memory restrictions)
147 // we could merge all of the previously sorted blocks
148 // by reading them all completely into memory
149 // and then running merge sort over them.
150 //
151 // In the real world, we have memory restrictions;
152 // depending on how many blocks we have,
153 // and how much memory we can use to read from each block
154 // (sort_config.buffer_size)
155 // it may be the case that we have insufficient memory
156 // to read sort_config.buffer_size of data from each block from disk.
157 //
158 // If this occurs, then it will be necessary to perform one or more rounds
159 // of merge sort on disk;
160 // doing so will reduce the number of blocks that we will eventually
161 // need to read from
162 // when performing the final round of merge sort in memory.
163 //
164 // So, the following call determines whether it is necessary
165 // to perform one or more rounds of merge sort on disk;
166 // if such on-disk merge sorting is required, such sorting is performed.
167 //
168 // Finally, the following method launches a thread that calls
169 // OwningMergingReader.Run()
170 // to perform the final round of merge sort in memory.
171 //
172 // Merge sort could have be invoked directly
173 // so that merge sort memory doesn't coexist with Chain memory.
174 sorts[i].Output(prob_chains[i]);
175 }
176
177 // Create another model buffer for our output on e.g. csorted-ngrams.1,
178 // csorted-ngrams.2, ...
179 lm::ModelBuffer output_buf(CONTEXT_SORTED_FILENAME, true, false);
180 output_buf.Sink(prob_chains, buffer.Counts());
181
182 // Create a third model buffer for our backoff output on e.g. backoff.1,
183 // backoff.2, ...
184 lm::ModelBuffer boff_buf(BACKOFF_FILENAME, true, false);
185 boff_buf.Sink(backoff_chains, buffer.Counts());
186
187 // Joins all threads that chains owns,
188 // and does a for loop over each chain object in chains,
189 // calling chain.Wait() on each such chain object
190 ngram_inputs.Wait(true);
191 backoff_chains.Wait(true);
192 prob_chains.Wait(true);
193
194 return 0;
195 }
196