1 use std::fs;
2 use std::io;
3 use std::path::Path;
4 
5 use chan;
6 use csv;
7 use threadpool::ThreadPool;
8 
9 use CliResult;
10 use config::{Config, Delimiter};
11 use index::Indexed;
12 use util::{self, FilenameTemplate};
13 
static USAGE: &'static str = "
Splits the given CSV data into chunks.

The files are written to the directory given with the name '{start}.csv',
where {start} is the index of the first record of the chunk (starting at 0).

Usage:
    xsv split [options] <outdir> [<input>]
    xsv split --help

split options:
    -s, --size <arg>       The number of records to write into each chunk.
                           [default: 500]
    -j, --jobs <arg>       The number of splitting jobs to run in parallel.
                           This only works when the given CSV data has
                           an index already created. Note that a file handle
                           is opened for each job.
                           When set to '0', the number of jobs is set to the
                           number of CPUs detected.
                           [default: 0]
    --filename <filename>  A filename template to use when constructing
                           the names of the output files.  The string '{}'
                           will be replaced by the index of the first record
                           of each chunk (starting at 0).
                           [default: {}.csv]

Common options:
    -h, --help             Display this message
    -n, --no-headers       When set, the first row will NOT be interpreted
                           as column names. Otherwise, the first row will
                           appear in all chunks as the header row.
    -d, --delimiter <arg>  The field delimiter for reading CSV data.
                           Must be a single character. (default: ,)
";
48 
/// Command-line arguments for `xsv split`, deserialized from the docopt
/// parse of `USAGE`. Field names follow the docopt convention:
/// `arg_*` for positionals, `flag_*` for options. `Clone` is required so
/// each parallel chunk job can own its own copy (see `parallel_split`).
#[derive(Clone, Deserialize)]
struct Args {
    arg_input: Option<String>, // <input> positional; None when reading stdin (presumably — confirm against Config::new)
    arg_outdir: String, // <outdir>: directory the chunk files are written into
    flag_size: usize, // --size: records per chunk; must be > 0 (validated in run)
    flag_jobs: usize, // --jobs: parallel jobs; 0 means "use CPU count" (see njobs)
    flag_filename: FilenameTemplate, // --filename: template with '{}' for the chunk's start index
    flag_no_headers: bool, // --no-headers: when true, no header row is copied into chunks
    flag_delimiter: Option<Delimiter>, // --delimiter: input field delimiter override
}
59 
run(argv: &[&str]) -> CliResult<()>60 pub fn run(argv: &[&str]) -> CliResult<()> {
61     let args: Args = util::get_args(USAGE, argv)?;
62     if args.flag_size == 0 {
63         return fail!("--size must be greater than 0.");
64     }
65     fs::create_dir_all(&args.arg_outdir)?;
66 
67     match args.rconfig().indexed()? {
68         Some(idx) => args.parallel_split(idx),
69         None => args.sequential_split(),
70     }
71 }
72 
73 impl Args {
sequential_split(&self) -> CliResult<()>74     fn sequential_split(&self) -> CliResult<()> {
75         let rconfig = self.rconfig();
76         let mut rdr = rconfig.reader()?;
77         let headers = rdr.byte_headers()?.clone();
78 
79         let mut wtr = self.new_writer(&headers, 0)?;
80         let mut i = 0;
81         let mut row = csv::ByteRecord::new();
82         while rdr.read_byte_record(&mut row)? {
83             if i > 0 && i % self.flag_size == 0 {
84                 wtr.flush()?;
85                 wtr = self.new_writer(&headers, i)?;
86             }
87             wtr.write_byte_record(&row)?;
88             i += 1;
89         }
90         wtr.flush()?;
91         Ok(())
92     }
93 
parallel_split( &self, idx: Indexed<fs::File, fs::File>, ) -> CliResult<()>94     fn parallel_split(
95         &self,
96         idx: Indexed<fs::File, fs::File>,
97     ) -> CliResult<()> {
98         let nchunks = util::num_of_chunks(
99             idx.count() as usize, self.flag_size);
100         let pool = ThreadPool::new(self.njobs());
101         let wg = chan::WaitGroup::new();
102         for i in 0..nchunks {
103             wg.add(1);
104             let args = self.clone();
105             let wg = wg.clone();
106             pool.execute(move || {
107                 let conf = args.rconfig();
108                 let mut idx = conf.indexed().unwrap().unwrap();
109                 let headers = idx.byte_headers().unwrap().clone();
110                 let mut wtr = args
111                     .new_writer(&headers, i * args.flag_size)
112                     .unwrap();
113 
114                 idx.seek((i * args.flag_size) as u64).unwrap();
115                 for row in idx.byte_records().take(args.flag_size) {
116                     let row = row.unwrap();
117                     wtr.write_byte_record(&row).unwrap();
118                 }
119                 wtr.flush().unwrap();
120                 wg.done();
121             });
122         }
123         wg.wait();
124         Ok(())
125     }
126 
new_writer( &self, headers: &csv::ByteRecord, start: usize, ) -> CliResult<csv::Writer<Box<io::Write+'static>>>127     fn new_writer(
128         &self,
129         headers: &csv::ByteRecord,
130         start: usize,
131     ) -> CliResult<csv::Writer<Box<io::Write+'static>>> {
132         let dir = Path::new(&self.arg_outdir);
133         let path = dir.join(self.flag_filename.filename(&format!("{}", start)));
134         let spath = Some(path.display().to_string());
135         let mut wtr = Config::new(&spath).writer()?;
136         if !self.rconfig().no_headers {
137             wtr.write_record(headers)?;
138         }
139         Ok(wtr)
140     }
141 
rconfig(&self) -> Config142     fn rconfig(&self) -> Config {
143         Config::new(&self.arg_input)
144             .delimiter(self.flag_delimiter)
145             .no_headers(self.flag_no_headers)
146     }
147 
njobs(&self) -> usize148     fn njobs(&self) -> usize {
149         if self.flag_jobs == 0 {
150             util::num_cpus()
151         } else {
152             self.flag_jobs
153         }
154     }
155 }
156