#include "lm/common/compare.hh"
#include "lm/common/model_buffer.hh"
#include "lm/common/ngram.hh"
#include "lm/interpolate/split_worker.hh"
#include "util/stream/chain.hh"
#include "util/stream/multi_stream.hh"
#include "util/stream/sort.hh"

#include <boost/program_options.hpp>
#include <boost/version.hpp>

#include <cstddef>
#include <iostream>
#include <string>
11 
#if defined(_WIN32) || defined(_WIN64)

// Windows doesn't provide <unistd.h>, so define the POSIX standard
// file descriptor numbers we need ourselves.
//
// BUGFIX: these previously read "#define STDIN_FILENO = 0" — the stray
// '=' made the macro expand to "= 0", which cannot compile at any use
// site. A plain object-like macro must be just the value.
#define STDIN_FILENO 0
#define STDOUT_FILENO 1
#else // Huzzah for POSIX!
#include <unistd.h>
#endif
23 
24 /*
25  * This is a simple example program that takes in intermediate
26  * suffix-sorted ngram files and outputs two sets of files: one for backoff
27  * probability values (raw numbers, in suffix order) and one for
28  * probability values (ngram id and probability, in *context* order)
29  */
main(int argc,char * argv[])30 int main(int argc, char *argv[]) {
31   using namespace lm::interpolate;
32 
33   const std::size_t ONE_GB = 1 << 30;
34   const std::size_t SIXTY_FOUR_MB = 1 << 26;
35   const std::size_t NUMBER_OF_BLOCKS = 2;
36 
37   std::string FILE_NAME = "ngrams";
38   std::string CONTEXT_SORTED_FILENAME = "csorted-ngrams";
39   std::string BACKOFF_FILENAME = "backoffs";
40   std::string TMP_DIR = "/tmp/";
41 
42   try {
43     namespace po = boost::program_options;
44     po::options_description options("canhazinterp Pass-3 options");
45 
46     options.add_options()
47       ("help,h", po::bool_switch(), "Show this help message")
48       ("ngrams,n", po::value<std::string>(&FILE_NAME), "ngrams file")
49       ("csortngrams,c", po::value<std::string>(&CONTEXT_SORTED_FILENAME), "context sorted ngrams file")
50       ("backoffs,b", po::value<std::string>(&BACKOFF_FILENAME), "backoffs file")
51       ("tmpdir,t", po::value<std::string>(&TMP_DIR), "tmp dir");
52     po::variables_map vm;
53     po::store(po::parse_command_line(argc, argv, options), vm);
54 
55     // Display help
56     if(vm["help"].as<bool>()) {
57       std::cerr << "Usage: " << options << std::endl;
58       return 1;
59     }
60   }
61   catch(const std::exception &e) {
62 
63     std::cerr << e.what() << std::endl;
64     return 1;
65 
66   }
67 
68   // The basic strategy here is to have three chains:
69   // - The first reads the ngram order inputs using ModelBuffer. Those are
70   //   then stripped of their backoff values and fed into the third chain;
71   //   the backoff values *themselves* are written to the second chain.
72   //
73   // - The second chain takes the backoff values and writes them out to a
74   //   file (one for each order).
75   //
76   // - The third chain takes just the probability values and ngrams and
77   //   writes them out, sorted in context-order, to a file (one for each
78   //   order).
79 
80   // This will be used to read in the binary intermediate files. There is
81   // one file per order (e.g. ngrams.1, ngrams.2, ...)
82   lm::ModelBuffer buffer(FILE_NAME);
83 
84   // Create a separate chains for each ngram order for:
85   // - Input from the intermediate files
86   // - Output to the backoff file
87   // - Output to the (context-sorted) probability file
88   util::stream::Chains ngram_inputs(buffer.Order());
89   util::stream::Chains backoff_chains(buffer.Order());
90   util::stream::Chains prob_chains(buffer.Order());
91   for (std::size_t i = 0; i < buffer.Order(); ++i) {
92     ngram_inputs.push_back(util::stream::ChainConfig(
93         lm::NGram<lm::ProbBackoff>::TotalSize(i + 1), NUMBER_OF_BLOCKS, ONE_GB));
94 
95     backoff_chains.push_back(
96         util::stream::ChainConfig(sizeof(float), NUMBER_OF_BLOCKS, ONE_GB));
97 
98     prob_chains.push_back(util::stream::ChainConfig(
99         sizeof(lm::WordIndex) * (i + 1) + sizeof(float), NUMBER_OF_BLOCKS,
100         ONE_GB));
101   }
102 
103   // This sets the input for each of the ngram order chains to the
104   // appropriate file
105   buffer.Source(ngram_inputs);
106 
107   util::FixedArray<util::scoped_ptr<SplitWorker> > workers(buffer.Order());
108   for (std::size_t i = 0; i < buffer.Order(); ++i) {
109     // Attach a SplitWorker to each of the ngram input chains, writing to the
110     // corresponding order's backoff and probability chains
111     workers.push_back(
112         new SplitWorker(i + 1, backoff_chains[i], prob_chains[i]));
113     ngram_inputs[i] >> boost::ref(*workers.back());
114   }
115 
116   util::stream::SortConfig sort_cfg;
117   sort_cfg.temp_prefix = TMP_DIR;
118   sort_cfg.buffer_size = SIXTY_FOUR_MB;
119   sort_cfg.total_memory = ONE_GB;
120 
121   // This will parallel merge sort the individual order files, putting
122   // them in context-order instead of suffix-order.
123   //
124   // Two new threads will be running, each owned by the chains[i] object.
125   // - The first executes BlockSorter.Run() to sort the n-gram entries
126   // - The second executes WriteAndRecycle.Run() to write each sorted
127   //   block to disk as a temporary file
128   util::stream::Sorts<lm::ContextOrder> sorts(buffer.Order());
129   for (std::size_t i = 0; i < prob_chains.size(); ++i) {
130     sorts.push_back(prob_chains[i], sort_cfg, lm::ContextOrder(i + 1));
131   }
132 
133   // Set the sort output to be on the same chain
134   for (std::size_t i = 0; i < prob_chains.size(); ++i) {
135     // The following call to Chain::Wait()
136     //     joins the threads owned by chains[i].
137     //
138     // As such the following call won't return
139     //     until all threads owned by chains[i] have completed.
140     //
141     // The following call also resets chain[i]
142     //     so that it can be reused
143     //     (including free'ing the memory previously used by the chain)
144     prob_chains[i].Wait();
145 
146     // In an ideal world (without memory restrictions)
147     //     we could merge all of the previously sorted blocks
148     //     by reading them all completely into memory
149     //     and then running merge sort over them.
150     //
151     // In the real world, we have memory restrictions;
152     //     depending on how many blocks we have,
153     //     and how much memory we can use to read from each block
154     //     (sort_config.buffer_size)
155     //     it may be the case that we have insufficient memory
156     //     to read sort_config.buffer_size of data from each block from disk.
157     //
158     // If this occurs, then it will be necessary to perform one or more rounds
159     // of merge sort on disk;
160     //     doing so will reduce the number of blocks that we will eventually
161     //     need to read from
162     //     when performing the final round of merge sort in memory.
163     //
164     // So, the following call determines whether it is necessary
165     //     to perform one or more rounds of merge sort on disk;
166     //     if such on-disk merge sorting is required, such sorting is performed.
167     //
168     // Finally, the following method launches a thread that calls
169     // OwningMergingReader.Run()
170     //     to perform the final round of merge sort in memory.
171     //
172     // Merge sort could have be invoked directly
173     //     so that merge sort memory doesn't coexist with Chain memory.
174     sorts[i].Output(prob_chains[i]);
175   }
176 
177   // Create another model buffer for our output on e.g. csorted-ngrams.1,
178   // csorted-ngrams.2, ...
179   lm::ModelBuffer output_buf(CONTEXT_SORTED_FILENAME, true, false);
180   output_buf.Sink(prob_chains, buffer.Counts());
181 
182   // Create a third model buffer for our backoff output on e.g. backoff.1,
183   // backoff.2, ...
184   lm::ModelBuffer boff_buf(BACKOFF_FILENAME, true, false);
185   boff_buf.Sink(backoff_chains, buffer.Counts());
186 
187   // Joins all threads that chains owns,
188   //    and does a for loop over each chain object in chains,
189   //    calling chain.Wait() on each such chain object
190   ngram_inputs.Wait(true);
191   backoff_chains.Wait(true);
192   prob_chains.Wait(true);
193 
194   return 0;
195 }
196