1 use std::collections::{HashMap, HashSet};
2 use std::collections::hash_map::Entry;
3 use std::fs;
4 use std::io;
5 use std::path::Path;
6
7 use csv;
8 use regex::Regex;
9
10 use CliResult;
11 use config::{Config, Delimiter};
12 use select::SelectColumns;
13 use util::{self, FilenameTemplate};
14
15 static USAGE: &'static str = "
16 Partitions the given CSV data into chunks based on the value of a column
17
18 The files are written to the output directory with filenames based on the
19 values in the partition column and the `--filename` flag.
20
21 Usage:
22 xsv partition [options] <column> <outdir> [<input>]
23 xsv partition --help
24
25 partition options:
26 --filename <filename> A filename template to use when constructing
27 the names of the output files. The string '{}'
28 will be replaced by a value based on the value
29 of the field, but sanitized for shell safety.
30 [default: {}.csv]
31 -p, --prefix-length <n> Truncate the partition column after the
32 specified number of bytes when creating the
33 output file.
34
35 Common options:
36 -h, --help Display this message
37 -n, --no-headers When set, the first row will NOT be interpreted
38 as column names. Otherwise, the first row will
39 appear in all chunks as the header row.
40 -d, --delimiter <arg> The field delimiter for reading CSV data.
41 Must be a single character. (default: ,)
42 ";
43
/// Command-line arguments for `xsv partition`, deserialized from the
/// invocation described by `USAGE`.
#[derive(Clone, Deserialize)]
struct Args {
    /// Selector for the partition column. It must resolve to exactly one
    /// column (enforced by `Args::key_column`).
    arg_column: SelectColumns,
    /// Optional input argument handed to `Config::new` when building the
    /// reader.
    arg_input: Option<String>,
    /// Directory that receives the output files; `run` creates it if needed.
    arg_outdir: String,
    /// `--filename`: template used to name each output file.
    flag_filename: FilenameTemplate,
    /// `--prefix-length`: when set, only this many leading bytes of the
    /// partition value are used as the key.
    flag_prefix_length: Option<usize>,
    /// `--no-headers`: passed to the reader configuration; when set, no
    /// header row is written to the output files.
    flag_no_headers: bool,
    /// `--delimiter`: field delimiter passed to the reader configuration.
    flag_delimiter: Option<Delimiter>,
}
54
run(argv: &[&str]) -> CliResult<()>55 pub fn run(argv: &[&str]) -> CliResult<()> {
56 let args: Args = util::get_args(USAGE, argv)?;
57 fs::create_dir_all(&args.arg_outdir)?;
58
59 // It would be nice to support efficient parallel partitions, but doing
60 // do would involve more complicated inter-thread communication, with
61 // multiple readers and writers, and some way of passing buffers
62 // between them.
63 args.sequential_partition()
64 }
65
66 impl Args {
67 /// Configuration for our reader.
rconfig(&self) -> Config68 fn rconfig(&self) -> Config {
69 Config::new(&self.arg_input)
70 .delimiter(self.flag_delimiter)
71 .no_headers(self.flag_no_headers)
72 .select(self.arg_column.clone())
73 }
74
75 /// Get the column to use as a key.
key_column( &self, rconfig: &Config, headers: &csv::ByteRecord, ) -> CliResult<usize>76 fn key_column(
77 &self,
78 rconfig: &Config,
79 headers: &csv::ByteRecord,
80 ) -> CliResult<usize> {
81 let select_cols = rconfig.selection(headers)?;
82 if select_cols.len() == 1 {
83 Ok(select_cols[0])
84 } else {
85 fail!("can only partition on one column")
86 }
87 }
88
89 /// A basic sequential partition.
sequential_partition(&self) -> CliResult<()>90 fn sequential_partition(&self) -> CliResult<()> {
91 let rconfig = self.rconfig();
92 let mut rdr = rconfig.reader()?;
93 let headers = rdr.byte_headers()?.clone();
94 let key_col = self.key_column(&rconfig, &headers)?;
95 let mut gen = WriterGenerator::new(self.flag_filename.clone());
96
97 let mut writers: HashMap<Vec<u8>, BoxedWriter> =
98 HashMap::new();
99 let mut row = csv::ByteRecord::new();
100 while rdr.read_byte_record(&mut row)? {
101 // Decide what file to put this in.
102 let column = &row[key_col];
103 let key = match self.flag_prefix_length {
104 // We exceed --prefix-length, so ignore the extra bytes.
105 Some(len) if len < column.len() => &column[0..len],
106 _ => &column[..],
107 };
108 let mut entry = writers.entry(key.to_vec());
109 let wtr = match entry {
110 Entry::Occupied(ref mut occupied) => occupied.get_mut(),
111 Entry::Vacant(vacant) => {
112 // We have a new key, so make a new writer.
113 let mut wtr = gen.writer(&*self.arg_outdir, key)?;
114 if !rconfig.no_headers {
115 wtr.write_record(&headers)?;
116 }
117 vacant.insert(wtr)
118 }
119 };
120 wtr.write_byte_record(&row)?;
121 }
122 Ok(())
123 }
124 }
125
/// A CSV writer over a boxed `io::Write` trait object. This is the type
/// returned by `WriterGenerator::writer` and stored once per key by
/// `Args::sequential_partition`.
type BoxedWriter = csv::Writer<Box<io::Write+'static>>;
127
/// Generates unique filenames based on CSV values.
struct WriterGenerator {
    /// Filename template used to open the writer for each generated value.
    template: FilenameTemplate,
    /// Next numeric suffix to try when a sanitized key is already taken.
    /// Starts at 1 and is shared across all keys, so it only ever grows.
    counter: usize,
    /// Every value handed out so far; consulted to guarantee uniqueness.
    used: HashSet<String>,
    /// Matches a single non-word character (`\W`); every match is removed
    /// from a key when it is sanitized.
    non_word_char: Regex,
}
135
136 impl WriterGenerator {
new(template: FilenameTemplate) -> WriterGenerator137 fn new(template: FilenameTemplate) -> WriterGenerator {
138 WriterGenerator {
139 template: template,
140 counter: 1,
141 used: HashSet::new(),
142 non_word_char: Regex::new(r"\W").unwrap(),
143 }
144 }
145
146 /// Create a CSV writer for `key`. Does not add headers.
writer<P>(&mut self, path: P, key: &[u8]) -> io::Result<BoxedWriter> where P: AsRef<Path>147 fn writer<P>(&mut self, path: P, key: &[u8]) -> io::Result<BoxedWriter>
148 where P: AsRef<Path>
149 {
150 let unique_value = self.unique_value(key);
151 self.template.writer(path.as_ref(), &unique_value)
152 }
153
154 /// Generate a unique value for `key`, suitable for use in a
155 /// "shell-safe" filename. If you pass `key` twice, you'll get two
156 /// different values.
unique_value(&mut self, key: &[u8]) -> String157 fn unique_value(&mut self, key: &[u8]) -> String {
158 // Sanitize our key.
159 let utf8 = String::from_utf8_lossy(key);
160 let safe = self.non_word_char.replace_all(&*utf8, "").into_owned();
161 let base =
162 if safe.is_empty() {
163 "empty".to_owned()
164 } else {
165 safe
166 };
167
168 // Now check for collisions.
169 if !self.used.contains(&base) {
170 self.used.insert(base.clone());
171 base
172 } else {
173 loop {
174 let candidate = format!("{}_{}", &base, self.counter);
175 self.counter = self.counter.checked_add(1).unwrap_or_else(|| {
176 // We'll run out of other things long before we ever
177 // reach this, but we'll check just for correctness and
178 // completeness.
179 panic!("Cannot generate unique value")
180 });
181 if !self.used.contains(&candidate) {
182 self.used.insert(candidate.clone());
183 return candidate
184 }
185 }
186 }
187 }
188 }
189