1 use std::collections::{HashMap, HashSet};
2 use std::collections::hash_map::Entry;
3 use std::fs;
4 use std::io;
5 use std::path::Path;
6 
7 use csv;
8 use regex::Regex;
9 
10 use CliResult;
11 use config::{Config, Delimiter};
12 use select::SelectColumns;
13 use util::{self, FilenameTemplate};
14 
/// Usage/help text for the `partition` subcommand. `run` hands this string
/// to `util::get_args` together with the raw command-line arguments.
static USAGE: &'static str = "
Partitions the given CSV data into chunks based on the value of a column

The files are written to the output directory with filenames based on the
values in the partition column and the `--filename` flag.

Usage:
    xsv partition [options] <column> <outdir> [<input>]
    xsv partition --help

partition options:
    --filename <filename>  A filename template to use when constructing
                           the names of the output files.  The string '{}'
                           will be replaced by a value based on the value
                           of the field, but sanitized for shell safety.
                           [default: {}.csv]
    -p, --prefix-length <n>  Truncate the partition column after the
                           specified number of bytes when creating the
                           output file.

Common options:
    -h, --help             Display this message
    -n, --no-headers       When set, the first row will NOT be interpreted
                           as column names. Otherwise, the first row will
                           appear in all chunks as the header row.
    -d, --delimiter <arg>  The field delimiter for reading CSV data.
                           Must be a single character. (default: ,)
";
43 
/// Parsed command-line arguments for the `partition` subcommand.
///
/// Presumably populated from `USAGE` by `util::get_args` (field names follow
/// an `arg_`/`flag_` prefix scheme) -- confirm against `util::get_args`.
#[derive(Clone, Deserialize)]
struct Args {
    /// Column selection used to pick the partition key; `key_column`
    /// requires it to resolve to exactly one column.
    arg_column: SelectColumns,
    /// Optional input, passed as-is to `Config::new` for the reader.
    arg_input: Option<String>,
    /// Output directory; created by `run` and used as the base path for
    /// every generated output file.
    arg_outdir: String,
    /// Filename template handed to `WriterGenerator` for output files.
    flag_filename: FilenameTemplate,
    /// When set, only the first this-many bytes of the key column are used
    /// as the partition key.
    flag_prefix_length: Option<usize>,
    /// Forwarded to the reader configuration's `no_headers` setting.
    flag_no_headers: bool,
    /// Forwarded to the reader configuration's `delimiter` setting.
    flag_delimiter: Option<Delimiter>,
}
54 
/// Entry point for the `partition` subcommand.
///
/// Parses `argv` against `USAGE` via `util::get_args`, creates the output
/// directory (including any missing parent directories), and then
/// partitions the input sequentially. An error from any of these steps is
/// returned to the caller.
pub fn run(argv: &[&str]) -> CliResult<()> {
    let args: Args = util::get_args(USAGE, argv)?;
    fs::create_dir_all(&args.arg_outdir)?;

    // It would be nice to support efficient parallel partitions, but doing
    // so would involve more complicated inter-thread communication, with
    // multiple readers and writers, and some way of passing buffers
    // between them.
    args.sequential_partition()
}
65 
66 impl Args {
67     /// Configuration for our reader.
rconfig(&self) -> Config68     fn rconfig(&self) -> Config {
69         Config::new(&self.arg_input)
70             .delimiter(self.flag_delimiter)
71             .no_headers(self.flag_no_headers)
72             .select(self.arg_column.clone())
73     }
74 
75     /// Get the column to use as a key.
key_column( &self, rconfig: &Config, headers: &csv::ByteRecord, ) -> CliResult<usize>76     fn key_column(
77         &self,
78         rconfig: &Config,
79         headers: &csv::ByteRecord,
80     ) -> CliResult<usize> {
81         let select_cols = rconfig.selection(headers)?;
82         if select_cols.len() == 1 {
83             Ok(select_cols[0])
84         } else {
85             fail!("can only partition on one column")
86         }
87     }
88 
89     /// A basic sequential partition.
sequential_partition(&self) -> CliResult<()>90     fn sequential_partition(&self) -> CliResult<()> {
91         let rconfig = self.rconfig();
92         let mut rdr = rconfig.reader()?;
93         let headers = rdr.byte_headers()?.clone();
94         let key_col = self.key_column(&rconfig, &headers)?;
95         let mut gen = WriterGenerator::new(self.flag_filename.clone());
96 
97         let mut writers: HashMap<Vec<u8>, BoxedWriter> =
98             HashMap::new();
99         let mut row = csv::ByteRecord::new();
100         while rdr.read_byte_record(&mut row)? {
101             // Decide what file to put this in.
102             let column = &row[key_col];
103             let key = match self.flag_prefix_length {
104                 // We exceed --prefix-length, so ignore the extra bytes.
105                 Some(len) if len < column.len() => &column[0..len],
106                 _ => &column[..],
107             };
108             let mut entry = writers.entry(key.to_vec());
109             let wtr = match entry {
110                 Entry::Occupied(ref mut occupied) => occupied.get_mut(),
111                 Entry::Vacant(vacant) => {
112                     // We have a new key, so make a new writer.
113                     let mut wtr = gen.writer(&*self.arg_outdir, key)?;
114                     if !rconfig.no_headers {
115                         wtr.write_record(&headers)?;
116                     }
117                     vacant.insert(wtr)
118                 }
119             };
120             wtr.write_byte_record(&row)?;
121         }
122         Ok(())
123     }
124 }
125 
/// A CSV writer over a boxed `io::Write` trait object.  This is what
/// `WriterGenerator::writer` returns and what the partitioner stores per key.
type BoxedWriter = csv::Writer<Box<io::Write+'static>>;
127 
/// Generates unique filenames based on CSV values.
struct WriterGenerator {
    /// Filename template used to open the writer for each unique value.
    template: FilenameTemplate,
    /// Next numeric suffix to try when a sanitized value collides with one
    /// already handed out; starts at 1 and is shared across all values.
    counter: usize,
    /// Every value returned so far, used to detect collisions.
    used: HashSet<String>,
    /// Matches the characters (`\W`) that are stripped from a key when
    /// sanitizing it.
    non_word_char: Regex,
}
135 
136 impl WriterGenerator {
new(template: FilenameTemplate) -> WriterGenerator137     fn new(template: FilenameTemplate) -> WriterGenerator {
138         WriterGenerator {
139             template: template,
140             counter: 1,
141             used: HashSet::new(),
142             non_word_char: Regex::new(r"\W").unwrap(),
143         }
144     }
145 
146     /// Create a CSV writer for `key`.  Does not add headers.
writer<P>(&mut self, path: P, key: &[u8]) -> io::Result<BoxedWriter> where P: AsRef<Path>147     fn writer<P>(&mut self, path: P, key: &[u8]) -> io::Result<BoxedWriter>
148         where P: AsRef<Path>
149     {
150         let unique_value = self.unique_value(key);
151         self.template.writer(path.as_ref(), &unique_value)
152     }
153 
154     /// Generate a unique value for `key`, suitable for use in a
155     /// "shell-safe" filename.  If you pass `key` twice, you'll get two
156     /// different values.
unique_value(&mut self, key: &[u8]) -> String157     fn unique_value(&mut self, key: &[u8]) -> String {
158         // Sanitize our key.
159         let utf8 = String::from_utf8_lossy(key);
160         let safe = self.non_word_char.replace_all(&*utf8, "").into_owned();
161         let base =
162             if safe.is_empty() {
163                 "empty".to_owned()
164             } else {
165                 safe
166             };
167 
168         // Now check for collisions.
169         if !self.used.contains(&base) {
170             self.used.insert(base.clone());
171             base
172         } else {
173             loop {
174                 let candidate = format!("{}_{}", &base, self.counter);
175                 self.counter = self.counter.checked_add(1).unwrap_or_else(|| {
176                     // We'll run out of other things long before we ever
177                     // reach this, but we'll check just for correctness and
178                     // completeness.
179                     panic!("Cannot generate unique value")
180                 });
181                 if !self.used.contains(&candidate) {
182                     self.used.insert(candidate.clone());
183                     return candidate
184                 }
185             }
186         }
187     }
188 }
189