use std::fs;
use std::io;
use std::panic;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
4
5 use chan;
6 use csv;
7 use threadpool::ThreadPool;
8
9 use CliResult;
10 use config::{Config, Delimiter};
11 use index::Indexed;
12 use util::{self, FilenameTemplate};
13
/// Help text for `xsv split`.
///
/// This string is handed to `util::get_args` in `run`, so it is both the
/// user-facing documentation and the definition of the accepted arguments
/// and their defaults. Keep the option layout intact when editing it.
static USAGE: &'static str = "
Splits the given CSV data into chunks.

The files are written to the directory given with the name '{start}.csv',
where {start} is the index of the first record of the chunk (starting at 0).

Usage:
    xsv split [options] <outdir> [<input>]
    xsv split --help

split options:
    -s, --size <arg>       The number of records to write into each chunk.
                           [default: 500]
    -j, --jobs <arg>       The number of splitting jobs to run in parallel.
                           This only works when the given CSV data has
                           an index already created. Note that a file handle
                           is opened for each job.
                           When set to '0', the number of jobs is set to the
                           number of CPUs detected.
                           [default: 0]
    --filename <filename>  A filename template to use when constructing
                           the names of the output files. The string '{}'
                           will be replaced by the index of the first
                           record of the chunk.
                           [default: {}.csv]

Common options:
    -h, --help             Display this message
    -n, --no-headers       When set, the first row will NOT be interpreted
                           as column names. Otherwise, the first row will
                           appear in all chunks as the header row.
    -d, --delimiter <arg>  The field delimiter for reading CSV data.
                           Must be a single character. (default: ,)
";
48
/// Parsed command-line arguments for `xsv split`.
///
/// Populated from `USAGE` by `util::get_args`. The struct is `Clone` because
/// `parallel_split` hands every worker job its own copy.
#[derive(Clone, Deserialize)]
struct Args {
    /// The optional `<input>` argument; passed unchanged to `Config::new`.
    arg_input: Option<String>,
    /// Directory the chunk files are written to; created by `run` if needed.
    arg_outdir: String,
    /// Number of records per chunk (`--size`); `run` rejects 0.
    flag_size: usize,
    /// Requested number of parallel jobs (`--jobs`); 0 means "use the number
    /// of CPUs" (see `njobs`).
    flag_jobs: usize,
    /// Template used to build each output file name (`--filename`).
    flag_filename: FilenameTemplate,
    /// `--no-headers`: when set, the first row is not treated as a header.
    flag_no_headers: bool,
    /// Field delimiter for reading the input (`--delimiter`), if one was given.
    flag_delimiter: Option<Delimiter>,
}
59
run(argv: &[&str]) -> CliResult<()>60 pub fn run(argv: &[&str]) -> CliResult<()> {
61 let args: Args = util::get_args(USAGE, argv)?;
62 if args.flag_size == 0 {
63 return fail!("--size must be greater than 0.");
64 }
65 fs::create_dir_all(&args.arg_outdir)?;
66
67 match args.rconfig().indexed()? {
68 Some(idx) => args.parallel_split(idx),
69 None => args.sequential_split(),
70 }
71 }
72
73 impl Args {
sequential_split(&self) -> CliResult<()>74 fn sequential_split(&self) -> CliResult<()> {
75 let rconfig = self.rconfig();
76 let mut rdr = rconfig.reader()?;
77 let headers = rdr.byte_headers()?.clone();
78
79 let mut wtr = self.new_writer(&headers, 0)?;
80 let mut i = 0;
81 let mut row = csv::ByteRecord::new();
82 while rdr.read_byte_record(&mut row)? {
83 if i > 0 && i % self.flag_size == 0 {
84 wtr.flush()?;
85 wtr = self.new_writer(&headers, i)?;
86 }
87 wtr.write_byte_record(&row)?;
88 i += 1;
89 }
90 wtr.flush()?;
91 Ok(())
92 }
93
parallel_split( &self, idx: Indexed<fs::File, fs::File>, ) -> CliResult<()>94 fn parallel_split(
95 &self,
96 idx: Indexed<fs::File, fs::File>,
97 ) -> CliResult<()> {
98 let nchunks = util::num_of_chunks(
99 idx.count() as usize, self.flag_size);
100 let pool = ThreadPool::new(self.njobs());
101 let wg = chan::WaitGroup::new();
102 for i in 0..nchunks {
103 wg.add(1);
104 let args = self.clone();
105 let wg = wg.clone();
106 pool.execute(move || {
107 let conf = args.rconfig();
108 let mut idx = conf.indexed().unwrap().unwrap();
109 let headers = idx.byte_headers().unwrap().clone();
110 let mut wtr = args
111 .new_writer(&headers, i * args.flag_size)
112 .unwrap();
113
114 idx.seek((i * args.flag_size) as u64).unwrap();
115 for row in idx.byte_records().take(args.flag_size) {
116 let row = row.unwrap();
117 wtr.write_byte_record(&row).unwrap();
118 }
119 wtr.flush().unwrap();
120 wg.done();
121 });
122 }
123 wg.wait();
124 Ok(())
125 }
126
new_writer( &self, headers: &csv::ByteRecord, start: usize, ) -> CliResult<csv::Writer<Box<io::Write+'static>>>127 fn new_writer(
128 &self,
129 headers: &csv::ByteRecord,
130 start: usize,
131 ) -> CliResult<csv::Writer<Box<io::Write+'static>>> {
132 let dir = Path::new(&self.arg_outdir);
133 let path = dir.join(self.flag_filename.filename(&format!("{}", start)));
134 let spath = Some(path.display().to_string());
135 let mut wtr = Config::new(&spath).writer()?;
136 if !self.rconfig().no_headers {
137 wtr.write_record(headers)?;
138 }
139 Ok(wtr)
140 }
141
rconfig(&self) -> Config142 fn rconfig(&self) -> Config {
143 Config::new(&self.arg_input)
144 .delimiter(self.flag_delimiter)
145 .no_headers(self.flag_no_headers)
146 }
147
njobs(&self) -> usize148 fn njobs(&self) -> usize {
149 if self.flag_jobs == 0 {
150 util::num_cpus()
151 } else {
152 self.flag_jobs
153 }
154 }
155 }
156