1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <string.h>
4
5 #include "lib/mlrutil.h"
6 #include "lib/mlr_globals.h"
7 #include "containers/lrec.h"
8 #include "containers/sllv.h"
9 #include "input/lrec_readers.h"
10 #include "mapping/mappers.h"
11 #include "output/lrec_writers.h"
12
13 static int do_stream_chained_in_place(context_t* pctx, cli_opts_t* popts);
14 static int do_stream_chained_to_stdout(context_t* pctx, sllv_t* pmapper_list, cli_opts_t* popts);
15
16 static int do_file_chained(char* filename, context_t* pctx,
17 lrec_reader_t* plrec_reader, sllv_t* pmapper_list, lrec_writer_t* plrec_writer, FILE* output_stream,
18 cli_opts_t* popts);
19
20 static sllv_t* chain_map(lrec_t* pinrec, context_t* pctx, sllve_t* pmapper_list_head);
21
22 static void drive_lrec(lrec_t* pinrec, context_t* pctx, sllve_t* pmapper_list_head, lrec_writer_t* plrec_writer,
23 FILE* output_stream);
24
25 typedef void progress_indicator_t(context_t* pctx, long long nr_progress_mod);
26 static void null_progress_indicator(context_t* pctx, long long nr_progress_mod);
27 static void stderr_progress_indicator(context_t* pctx, long long nr_progress_mod);
28
29 // ----------------------------------------------------------------
// Entry point for stream processing: dispatches on popts->do_in_place.
//
// In-place mode constructs its own mapper chain for each file, so pmapper_list is
// used only when output goes to stdout. Returns whatever the selected driver returns.
int do_stream_chained(context_t* pctx, sllv_t* pmapper_list, cli_opts_t* popts) {
	if (!popts->do_in_place)
		return do_stream_chained_to_stdout(pctx, pmapper_list, popts);

	return do_stream_chained_in_place(pctx, popts);
}
37
38 // ----------------------------------------------------------------
39 // For in-place mode, reconstruct the mappers on each input file. E.g. 'mlr -I head -n 2
40 // foo bar' should do head -n 2 on foo as well as on bar.
41 //
42 // I could have implemented this with a single construction of the mappers and having each
43 // mapper implement a virtual reset() method. However, having effectively two initializers
44 // per mapper -- constructor and reset method -- I'd surely miss some logic somewhere.
45 // With in-place mode being a less frequently used code path, this would likely lead to
46 // latent bugs. So while it seems inelegant to re-parse part of the command line over and
47 // over again to reconstruct mappers for each file, this approach leads to greater code
48 // stability.
49
// Runs the mapper chain separately over each named file, writing the output to a
// temporary file which is then renamed over the original.
//
// pctx:  running context; filenum, filename, and fnr are updated for each file, and
//        force_eof is cleared after each file so the next file is processed in full.
// popts: parsed command-line options; popts->filenames must be non-null and non-empty.
//
// Returns the logical AND of the per-file do_file_chained results. Exits the process
// with status 1 if the temporary file cannot be opened, fully written, or renamed.
static int do_stream_chained_in_place(context_t* pctx, cli_opts_t* popts) {
	MLR_INTERNAL_CODING_ERROR_IF(popts->filenames == NULL);
	MLR_INTERNAL_CODING_ERROR_IF(popts->filenames->length == 0);

	int ok = 1;

	// Read from each file name in turn
	for (sllse_t* pe = popts->filenames->phead; pe != NULL; pe = pe->pnext) {

		// Allocate reader, mappers, and writer individually for each file name.
		// This way CSV headers appear in each file, head -n 10 puts 10 rows for
		// each output file, and so on.
		lrec_reader_t* plrec_reader = lrec_reader_alloc_or_die(&popts->reader_opts);
		lrec_writer_t* plrec_writer = lrec_writer_alloc_or_die(&popts->writer_opts);

		// Note that the command-line parsers can operate destructively on argv,
		// e.g. verbs which take comma-delimited field names splitting on commas.
		// For this reason we need to duplicate argv on each run. We need to free
		// after processing in case mappers have retained pointers into argv.
		int argi = popts->mapper_argb;
		char** argv_copy = copy_argv(popts->original_argv);
		sllv_t* pmapper_list = cli_parse_mappers(argv_copy, &argi, popts->argc, popts);
		// Should not have been allowed by the CLI parser:
		MLR_INTERNAL_CODING_ERROR_IF(pmapper_list->length < 1);

		char* filename = pe->value;
		char* tempname = alloc_suffixed_temp_file_name(filename);
		FILE* output_stream = fopen(tempname, "wb");
		if (output_stream == NULL) {
			perror("fopen");
			fprintf(stderr, "%s: Could not open \"%s\" for write.\n",
				MLR_GLOBALS.bargv0, tempname);
			exit(1);
		}

		pctx->filenum++;
		pctx->filename = filename;
		pctx->fnr = 0;

		ok = do_file_chained(filename, pctx, plrec_reader, pmapper_list, plrec_writer,
			output_stream, popts) && ok;

		// For in-place mode, there's no breaking from the loop over input files. Just an early
		// return from the mapper chain, which has already just happened.
		if (pctx->force_eof == TRUE) { // e.g. mlr head
			pctx->force_eof = FALSE;
		}

		// Mappers and writers receive end-of-stream notifications via null input record.
		// Do that, now that data from the input file have been exhausted.
		drive_lrec(NULL, pctx, pmapper_list->phead, plrec_writer, output_stream);
		// Drain the pretty-printer.
		plrec_writer->pprocess_func(plrec_writer->pvstate, output_stream, NULL, pctx);

		// Never rename a short or corrupt temp file over the user's data: a failed write
		// (e.g. disk full) shows up either in the stream's error indicator or as a failed
		// final flush inside fclose.
		int write_failed = ferror(output_stream);
		if (fclose(output_stream) != 0) {
			perror("fclose");
			write_failed = 1;
		}
		if (write_failed) {
			fprintf(stderr, "%s: Could not write \"%s\"; \"%s\" is left unmodified.\n",
				MLR_GLOBALS.bargv0, tempname, filename);
			// Best-effort cleanup of the partial output; we are exiting regardless.
			remove(tempname);
			exit(1);
		}

		int rc = rename(tempname, filename);
		if (rc != 0) {
			perror("rename");
			fprintf(stderr, "%s: Could not rename \"%s\" to \"%s\".\n",
				MLR_GLOBALS.bargv0, tempname, filename);
			exit(1);
		}
		free(tempname);

		plrec_reader->pfree_func(plrec_reader);
		plrec_writer->pfree_func(plrec_writer, pctx);

		mapper_chain_free(pmapper_list, pctx);

		free_argv_copy(argv_copy);
	}

	return ok;
}
124
125 // ----------------------------------------------------------------
// Runs one shared mapper chain over all inputs and writes the results to stdout.
//
// pctx:         running context; filenum, filename, and fnr are updated per input.
// pmapper_list: mapper chain to apply; must hold at least one mapper. It is not
//               freed here.
// popts:        parsed command-line options. A null filenames list means no input
//               at all; an empty list means read standard input.
//
// Returns the logical AND of the per-input do_file_chained results (1 if there
// were no inputs).
static int do_stream_chained_to_stdout(context_t* pctx, sllv_t* pmapper_list, cli_opts_t* popts) {
	FILE* out = stdout;

	lrec_reader_t* preader = lrec_reader_alloc_or_die(&popts->reader_opts);
	lrec_writer_t* pwriter = lrec_writer_alloc_or_die(&popts->writer_opts);

	// An empty chain should not have been allowed by the CLI parser.
	MLR_INTERNAL_CODING_ERROR_IF(pmapper_list->length < 1);

	int ok = 1;
	if (popts->filenames != NULL) {
		if (popts->filenames->length == 0) {
			// No file names given: the single input is standard input.
			pctx->filenum++;
			pctx->filename = "(stdin)";
			pctx->fnr = 0;
			int file_ok = do_file_chained("-", pctx, preader, pmapper_list, pwriter, out, popts);
			if (!file_ok)
				ok = 0;
		} else {
			// Process the named files in order, stopping early once a mapper
			// has signaled that no more input is wanted.
			sllse_t* pnode = popts->filenames->phead;
			while (pnode != NULL) {
				char* filename = pnode->value;
				pctx->filenum++;
				pctx->filename = filename;
				pctx->fnr = 0;
				int file_ok = do_file_chained(filename, pctx, preader, pmapper_list, pwriter, out, popts);
				if (!file_ok)
					ok = 0;
				if (pctx->force_eof == TRUE) // e.g. mlr head
					break;
				pnode = pnode->pnext;
			}
		}
	}
	// else: null file-name list, so there is nothing to read.

	// A null input record is the end-of-stream notification for mappers and writers;
	// send it now that all input has been consumed.
	drive_lrec(NULL, pctx, pmapper_list->phead, pwriter, out);

	// Give the writer its own null record so it can drain (e.g. the pretty-printer).
	pwriter->pprocess_func(pwriter->pvstate, out, NULL, pctx);

	preader->pfree_func(preader);
	pwriter->pfree_func(pwriter, pctx);

	return ok;
}
170
171 // ----------------------------------------------------------------
// Reads every record from one input and drives each through the mapper chain.
//
// filename:      name handed to the reader's open function.
// pctx:          running context; nr and fnr are incremented for each record processed.
// plrec_reader:  supplies the open, start-of-file, per-record, and close functions.
// pmapper_list:  mapper chain to apply to each record.
// plrec_writer:  receives the records the chain emits.
// output_stream: stream passed through to the writer.
// popts:         supplies the reader prepipe and the progress-report interval.
//
// Reading stops when the reader returns a null record or when pctx->force_eof is set.
// Always returns 1.
static int do_file_chained(char* filename, context_t* pctx,
	lrec_reader_t* plrec_reader, sllv_t* pmapper_list, lrec_writer_t* plrec_writer, FILE* output_stream,
	cli_opts_t* popts)
{
	void* pvhandle = plrec_reader->popen_func(plrec_reader->pvstate, popts->reader_opts.prepipe, filename);

	// A zero interval means progress reporting is off.
	progress_indicator_t* report_progress;
	if (popts->nr_progress_mod == 0LL) {
		report_progress = null_progress_indicator;
	} else {
		report_progress = stderr_progress_indicator;
	}

	// Start-of-file hook, e.g. expecting CSV headers on input.
	plrec_reader->psof_func(plrec_reader->pvstate, pvhandle);

	lrec_t* prec;
	while ((prec = plrec_reader->pprocess_func(plrec_reader->pvstate, pvhandle, pctx)) != NULL) {
		if (pctx->force_eof == TRUE) { // e.g. mlr head
			// The record was already read but will not be used; release it here.
			lrec_free(prec);
			break;
		}

		pctx->nr++;
		pctx->fnr++;

		report_progress(pctx, popts->nr_progress_mod);

		drive_lrec(prec, pctx, pmapper_list->phead, plrec_writer, output_stream);
	}

	plrec_reader->pclose_func(plrec_reader->pvstate, pvhandle, popts->reader_opts.prepipe);
	return 1;
}
203
204 // ----------------------------------------------------------------
// Sends one input record (null at end of stream) through the mapper chain and passes
// each non-null output record to the writer.
//
// The writer frees the records it is given; this function frees only the list that
// held them.
static void drive_lrec(lrec_t* pinrec, context_t* pctx, sllve_t* pmapper_list_head, lrec_writer_t* plrec_writer,
	FILE* output_stream)
{
	sllv_t* pmapped = chain_map(pinrec, pctx, pmapper_list_head);
	if (pmapped == NULL)
		return;

	sllve_t* pnode = pmapped->phead;
	while (pnode != NULL) {
		lrec_t* prec = pnode->pvvalue; // list payloads are void-star
		if (prec != NULL) {
			plrec_writer->pprocess_func(plrec_writer->pvstate, output_stream, prec, pctx);
		}
		pnode = pnode->pnext;
	}

	sllv_free(pmapped);
}
218
219 // ----------------------------------------------------------------
220 // Map a single input record (maybe null at end of input stream) to zero or
221 // more output records.
222 //
223 // Return: list of lrec_t*. Input: lrec_t* and list of mapper_t*.
224
chain_map(lrec_t * pinrec,context_t * pctx,sllve_t * pmapper_list_head)225 static sllv_t* chain_map(lrec_t* pinrec, context_t* pctx, sllve_t* pmapper_list_head) {
226 mapper_t* pmapper = pmapper_list_head->pvvalue;
227 sllv_t* outrecs = pmapper->pprocess_func(pinrec, pctx, pmapper->pvstate);
228 if (pmapper_list_head->pnext == NULL) {
229 return outrecs;
230 } else if (outrecs == NULL) { // end of input stream
231 return NULL;
232 } else {
233 sllv_t* nextrecs = sllv_alloc();
234
235 for (sllve_t* pe = outrecs->phead; pe != NULL; pe = pe->pnext) {
236 lrec_t* poutrec = pe->pvvalue;
237 sllv_t* nextrecsi = chain_map(poutrec, pctx, pmapper_list_head->pnext);
238 sllv_transfer(nextrecs, nextrecsi);
239 sllv_free(nextrecsi);
240 }
241 sllv_free(outrecs);
242
243 return nextrecs;
244 }
245 }
246
247 // ----------------------------------------------------------------
// Progress callback which prints NR, FNR, and FILENAME to stderr whenever the
// running record count is a multiple of nr_progress_mod.
//
// nr_progress_mod must be nonzero; do_file_chained selects the null indicator
// instead when the interval is zero.
static void stderr_progress_indicator(context_t* pctx, long long nr_progress_mod) {
	if (pctx->nr % nr_progress_mod != 0)
		return;

	fprintf(stderr, "NR=%lld FNR=%lld FILENAME=%s\n", pctx->nr, pctx->fnr, pctx->filename);
}
254
// Progress callback used when progress reporting is disabled: does nothing.
static void null_progress_indicator(context_t* pctx, long long nr_progress_mod) {
	// Both parameters are intentionally unused.
	(void)pctx;
	(void)nr_progress_mod;
}
257