1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <string.h>
4 
5 #include "lib/mlrutil.h"
6 #include "lib/mlr_globals.h"
7 #include "containers/lrec.h"
8 #include "containers/sllv.h"
9 #include "input/lrec_readers.h"
10 #include "mapping/mappers.h"
11 #include "output/lrec_writers.h"
12 
13 static int do_stream_chained_in_place(context_t* pctx, cli_opts_t* popts);
14 static int do_stream_chained_to_stdout(context_t* pctx, sllv_t* pmapper_list, cli_opts_t* popts);
15 
16 static int do_file_chained(char* filename, context_t* pctx,
17 	lrec_reader_t* plrec_reader, sllv_t* pmapper_list, lrec_writer_t* plrec_writer, FILE* output_stream,
18 	cli_opts_t* popts);
19 
20 static sllv_t* chain_map(lrec_t* pinrec, context_t* pctx, sllve_t* pmapper_list_head);
21 
22 static void drive_lrec(lrec_t* pinrec, context_t* pctx, sllve_t* pmapper_list_head, lrec_writer_t* plrec_writer,
23 	FILE* output_stream);
24 
25 typedef void progress_indicator_t(context_t* pctx, long long nr_progress_mod);
26 static void null_progress_indicator(context_t* pctx, long long nr_progress_mod);
27 static void stderr_progress_indicator(context_t* pctx, long long nr_progress_mod);
28 
29 // ----------------------------------------------------------------
// Entry point for record processing. In in-place mode ('mlr -I ...') the mapper chain is
// rebuilt for every input file, so the caller's pmapper_list is not used. Otherwise all
// input is streamed through pmapper_list to stdout. Returns the chosen driver's status.
int do_stream_chained(context_t* pctx, sllv_t* pmapper_list, cli_opts_t* popts) {
	return popts->do_in_place
		? do_stream_chained_in_place(pctx, popts)
		: do_stream_chained_to_stdout(pctx, pmapper_list, popts);
}
37 
38 // ----------------------------------------------------------------
39 // For in-place mode, reconstruct the mappers on each input file. E.g. 'mlr -I head -n 2
40 // foo bar' should do head -n 2 on foo as well as on bar.
41 //
42 // I could have implemented this with a single construction of the mappers and having each
43 // mapper implement a virtual reset() method.  However, having effectively two initalizers
44 // per mapper -- constructor and reset method -- I'd surely miss some logic somewhere.
45 // With in-place mode being a less frequently used code path, this would likely lead to
46 // latent bugs. So while it seems inelegant to re-parse part of the command line over and
47 // over again to reconstruct mappers for each file, this approach leads to greater code
48 // stability.
49 
do_stream_chained_in_place(context_t * pctx,cli_opts_t * popts)50 static int do_stream_chained_in_place(context_t* pctx, cli_opts_t* popts) {
51 	MLR_INTERNAL_CODING_ERROR_IF(popts->filenames == NULL);
52 	MLR_INTERNAL_CODING_ERROR_IF(popts->filenames->length == 0);
53 
54 	int ok = 1;
55 
56 	// Read from each file name in turn
57 	for (sllse_t* pe = popts->filenames->phead; pe != NULL; pe = pe->pnext) {
58 
59 		// Allocate reader, mappers, and writer individually for each file name.
60 		// This way CSV headers appear in each file, head -n 10 puts 10 rows for
61 		// each output file, and so on.
62 		lrec_reader_t* plrec_reader = lrec_reader_alloc_or_die(&popts->reader_opts);
63 		lrec_writer_t* plrec_writer = lrec_writer_alloc_or_die(&popts->writer_opts);
64 
65 		// Note that the command-line parsers can operate destructively on argv,
66 		// e.g. verbs which take comma-delimited field names splitting on commas.
67 		// For this reason we need to duplicate argv on each run. We need to free
68 		// after processing in case mappers have retained pointers into argv.
69 
70 		int argi = popts->mapper_argb;
71 		char** argv_copy = copy_argv(popts->original_argv);
72 		sllv_t* pmapper_list = cli_parse_mappers(argv_copy, &argi, popts->argc, popts);
73 		// Should not have been allowed by the CLI parser:
74 		MLR_INTERNAL_CODING_ERROR_IF(pmapper_list->length < 1);
75 
76 		char* filename = pe->value;
77 		char* tempname = alloc_suffixed_temp_file_name(filename);
78 		FILE* output_stream = fopen(tempname, "wb");
79 		if (output_stream == NULL) {
80 			perror("fopen");
81 			fprintf(stderr, "%s: Could not open \"%s\" for write.\n",
82 				MLR_GLOBALS.bargv0, tempname);
83 			exit(1);
84 		}
85 
86 		pctx->filenum++;
87 		pctx->filename = filename;
88 		pctx->fnr = 0;
89 
90 		ok = do_file_chained(filename, pctx, plrec_reader, pmapper_list, plrec_writer,
91 			output_stream, popts) && ok;
92 
93 		// For in-place mode, there's no breaking from the loop over input files. Just an early
94 		// return from the mapper chain, which has already just happened.
95 		if (pctx->force_eof == TRUE) // e.g. mlr head
96 			pctx->force_eof = FALSE;
97 
98 		// Mappers and writers receive end-of-stream notifications via null input record.
99 		// Do that, now that data from the input file have been exhausted.
100 		drive_lrec(NULL, pctx, pmapper_list->phead, plrec_writer, output_stream);
101 		// Drain the pretty-printer.
102 		plrec_writer->pprocess_func(plrec_writer->pvstate, output_stream, NULL, pctx);
103 
104 		fclose(output_stream);
105 		int rc = rename(tempname, filename);
106 		if (rc != 0) {
107 			perror("rename");
108 			fprintf(stderr, "%s: Could not rename \"%s\" to \"%s\".\n",
109 				MLR_GLOBALS.bargv0, tempname, filename);
110 			exit(1);
111 		}
112 		free(tempname);
113 
114 		plrec_reader->pfree_func(plrec_reader);
115 		plrec_writer->pfree_func(plrec_writer, pctx);
116 
117 		mapper_chain_free(pmapper_list, pctx);
118 
119 		free_argv_copy(argv_copy);
120 	}
121 
122 	return ok;
123 }
124 
125 // ----------------------------------------------------------------
// Ordinary (non-in-place) mode: all inputs share one reader, one writer, and the
// caller-supplied mapper chain, and all output goes to stdout.
//
// Which input is read depends on popts->filenames:
//   NULL        -- nothing is read at all;
//   empty list  -- standard input, opened as "-" and reported as FILENAME "(stdin)";
//   otherwise   -- each named file in order, stopping after any file during which a
//                  mapper set pctx->force_eof (e.g. mlr head).
//
// In every case the mapper chain then receives the end-of-stream signal (a NULL record),
// the writer is drained, and the reader and writer are freed. pmapper_list itself is not
// freed here. Returns the logical AND of the per-input results of do_file_chained.
static int do_stream_chained_to_stdout(context_t* pctx, sllv_t* pmapper_list, cli_opts_t* popts) {
	FILE* output_stream = stdout;
	lrec_reader_t* plrec_reader = lrec_reader_alloc_or_die(&popts->reader_opts);
	lrec_writer_t* plrec_writer = lrec_writer_alloc_or_die(&popts->writer_opts);

	// Should not have been allowed by the CLI parser.
	MLR_INTERNAL_CODING_ERROR_IF(pmapper_list->length < 1);

	int all_ok = 1;
	if (popts->filenames != NULL && popts->filenames->length == 0) {
		// No file names given: read standard input.
		pctx->filenum++;
		pctx->filename = "(stdin)";
		pctx->fnr = 0;
		if (!do_file_chained("-", pctx, plrec_reader, pmapper_list, plrec_writer, output_stream, popts))
			all_ok = 0;
	} else if (popts->filenames != NULL) {
		for (sllse_t* pfile = popts->filenames->phead; pfile != NULL; pfile = pfile->pnext) {
			char* filename = pfile->value;
			pctx->filenum++;
			pctx->filename = filename;
			pctx->fnr = 0;
			if (!do_file_chained(filename, pctx, plrec_reader, pmapper_list, plrec_writer, output_stream, popts))
				all_ok = 0;
			// A mapper asked for early termination (e.g. mlr head): skip remaining files.
			if (pctx->force_eof == TRUE)
				break;
		}
	}
	// With popts->filenames == NULL nothing was read, but end-of-stream is still sent below.

	// Notify mappers and writer of end of stream via a NULL record, then drain the writer.
	drive_lrec(NULL, pctx, pmapper_list->phead, plrec_writer, output_stream);
	plrec_writer->pprocess_func(plrec_writer->pvstate, output_stream, NULL, pctx);

	plrec_reader->pfree_func(plrec_reader);
	plrec_writer->pfree_func(plrec_writer, pctx);

	return all_ok;
}
170 
171 // ----------------------------------------------------------------
// Push every record of one input source through the mapper chain into the writer.
//
// The reader's open function receives filename together with the configured prepipe
// (the stdout driver passes "-" for standard input). For each record read:
//   1. pctx->nr and pctx->fnr are advanced;
//   2. the progress hook runs (a no-op unless popts->nr_progress_mod is nonzero);
//   3. the record is driven through the mapper chain to the writer.
// Reading stops when the reader returns NULL. It also stops once pctx->force_eof is set
// (e.g. by mlr head); the record just read is then freed without being processed.
//
// The end-of-stream signal is NOT sent to the mappers here; callers do that.
// Always returns 1.
static int do_file_chained(char* filename, context_t* pctx,
	lrec_reader_t* plrec_reader, sllv_t* pmapper_list, lrec_writer_t* plrec_writer, FILE* output_stream,
	cli_opts_t* popts)
{
	void* pvhandle = plrec_reader->popen_func(plrec_reader->pvstate, popts->reader_opts.prepipe, filename);

	progress_indicator_t* pindicator;
	if (popts->nr_progress_mod != 0LL)
		pindicator = stderr_progress_indicator;
	else
		pindicator = null_progress_indicator;

	// Start-of-file hook, e.g. expecting CSV headers on input.
	plrec_reader->psof_func(plrec_reader->pvstate, pvhandle);

	lrec_t* pinrec;
	while ((pinrec = plrec_reader->pprocess_func(plrec_reader->pvstate, pvhandle, pctx)) != NULL) {
		if (pctx->force_eof == TRUE) { // e.g. mlr head
			lrec_free(pinrec);
			break;
		}
		pctx->nr++;
		pctx->fnr++;
		pindicator(pctx, popts->nr_progress_mod);
		drive_lrec(pinrec, pctx, pmapper_list->phead, plrec_writer, output_stream);
	}

	plrec_reader->pclose_func(plrec_reader->pvstate, pvhandle, popts->reader_opts.prepipe);
	return 1;
}
203 
204 // ----------------------------------------------------------------
// Run one record (or NULL, meaning end of stream) through the mapper chain that starts at
// pmapper_list_head. Each non-NULL resulting record is handed to the writer, which takes
// ownership of it and frees it. NULL entries in the result list are skipped. Only the
// list container returned by chain_map is freed here.
static void drive_lrec(lrec_t* pinrec, context_t* pctx, sllve_t* pmapper_list_head, lrec_writer_t* plrec_writer,
	FILE* output_stream)
{
	sllv_t* presults = chain_map(pinrec, pctx, pmapper_list_head);
	if (presults == NULL)
		return; // the chain produced no output for this record

	sllve_t* pnode = presults->phead;
	while (pnode != NULL) {
		lrec_t* poutrec = pnode->pvvalue;
		if (poutrec != NULL)
			plrec_writer->pprocess_func(plrec_writer->pvstate, output_stream, poutrec, pctx);
		pnode = pnode->pnext;
	}
	sllv_free(presults); // the list belongs to us; the records now belong to the writer
}
218 
219 // ----------------------------------------------------------------
220 // Map a single input record (maybe null at end of input stream) to zero or
221 // more output records.
222 //
223 // Return: list of lrec_t*. Input: lrec_t* and list of mapper_t*.
224 
chain_map(lrec_t * pinrec,context_t * pctx,sllve_t * pmapper_list_head)225 static sllv_t* chain_map(lrec_t* pinrec, context_t* pctx, sllve_t* pmapper_list_head) {
226 	mapper_t* pmapper = pmapper_list_head->pvvalue;
227 	sllv_t* outrecs = pmapper->pprocess_func(pinrec, pctx, pmapper->pvstate);
228 	if (pmapper_list_head->pnext == NULL) {
229 		return outrecs;
230 	} else if (outrecs == NULL) { // end of input stream
231 		return NULL;
232 	} else {
233 		sllv_t* nextrecs = sllv_alloc();
234 
235 		for (sllve_t* pe = outrecs->phead; pe != NULL; pe = pe->pnext) {
236 			lrec_t* poutrec = pe->pvvalue;
237 			sllv_t* nextrecsi = chain_map(poutrec, pctx, pmapper_list_head->pnext);
238 			sllv_transfer(nextrecs, nextrecsi);
239 			sllv_free(nextrecsi);
240 		}
241 		sllv_free(outrecs);
242 
243 		return nextrecs;
244 	}
245 }
246 
247 // ----------------------------------------------------------------
// Progress hook: on every record whose overall record number NR is a multiple of
// nr_progress_mod, print "NR=... FNR=... FILENAME=..." to stderr. do_file_chained selects
// this hook only when nr_progress_mod is nonzero, so the modulus never divides by zero.
static void stderr_progress_indicator(context_t* pctx, long long nr_progress_mod) {
	if (pctx->nr % nr_progress_mod != 0)
		return;
	fprintf(stderr, "NR=%lld FNR=%lld FILENAME=%s\n", pctx->nr, pctx->fnr, pctx->filename);
}
254 
// Progress hook used when progress reporting is disabled: does nothing.
static void null_progress_indicator(context_t* pctx, long long nr_progress_mod) {
	(void)pctx;            // unused; parameters exist only to match progress_indicator_t
	(void)nr_progress_mod;
}
257