1 /**
2 * Copyright (c) 2007-2012, Timothy Stack
3 *
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice, this
10 * list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright notice,
12 * this list of conditions and the following disclaimer in the documentation
13 * and/or other materials provided with the distribution.
14 * * Neither the name of Timothy Stack nor the names of its contributors
15 * may be used to endorse or promote products derived from this software
16 * without specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
19 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21 * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
22 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
24 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
25 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
27 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 *
29 * @file grep_proc.cc
30 */
31
32 #include "config.h"
33
34 #include <string.h>
35 #include <stdio.h>
36 #include <errno.h>
37 #include <unistd.h>
38 #include <fcntl.h>
39 #include <signal.h>
40 #include <sys/wait.h>
41
42 #include "base/opt_util.hh"
43 #include "base/lnav_log.hh"
44 #include "base/string_util.hh"
45 #include "lnav_util.hh"
46 #include "grep_proc.hh"
47 #include "vis_line.hh"
48
49 using namespace std;
50
51 template<typename LineType>
grep_proc(pcre * code,grep_proc_source<LineType> & gps)52 grep_proc<LineType>::grep_proc(pcre *code, grep_proc_source<LineType> &gps)
53 : gp_pcre(code),
54 gp_source(gps)
55 {
56 require(this->invariant());
57
58 gps.register_proc(this);
59 }
60
61 template<typename LineType>
~grep_proc()62 grep_proc<LineType>::~grep_proc()
63 {
64 this->invalidate();
65 }
66
67 template<typename LineType>
handle_match(int line,string & line_value,int off,int * matches,int count)68 void grep_proc<LineType>::handle_match(int line,
69 string &line_value,
70 int off,
71 int *matches,
72 int count)
73 {
74 int lpc;
75
76 if (off == 0) {
77 fprintf(stdout, "%d\n", line);
78 }
79 fprintf(stdout, "[%d:%d]\n", matches[0], matches[1]);
80 for (lpc = 1; lpc < count; lpc++) {
81 fprintf(stdout,
82 "(%d:%d)",
83 matches[lpc * 2],
84 matches[lpc * 2 + 1]);
85 fwrite(&(line_value.c_str()[matches[lpc * 2]]),
86 1,
87 matches[lpc * 2 + 1] -
88 matches[lpc * 2],
89 stdout);
90 fputc('\n', stdout);
91 }
92 }
93
94 template<typename LineType>
start()95 void grep_proc<LineType>::start()
96 {
97 require(this->invariant());
98
99 if (this->gp_child_started || this->gp_queue.empty()) {
100 return;
101 }
102
103 auto_pipe in_pipe(STDIN_FILENO);
104 auto_pipe out_pipe(STDOUT_FILENO);
105 auto_pipe err_pipe(STDERR_FILENO);
106
107 /* Get ahold of some pipes for stdout and stderr. */
108 if (out_pipe.open() < 0) {
109 throw error(errno);
110 }
111
112 if (err_pipe.open() < 0) {
113 throw error(errno);
114 }
115
116 if ((this->gp_child = fork()) < 0) {
117 throw error(errno);
118 }
119
120 in_pipe.after_fork(this->gp_child);
121 out_pipe.after_fork(this->gp_child);
122 err_pipe.after_fork(this->gp_child);
123
124 if (this->gp_child != 0) {
125 log_perror(fcntl(out_pipe.read_end(), F_SETFL, O_NONBLOCK));
126 log_perror(fcntl(out_pipe.read_end(), F_SETFD, 1));
127 this->gp_line_buffer.set_fd(out_pipe.read_end());
128
129 log_perror(fcntl(err_pipe.read_end(), F_SETFL, O_NONBLOCK));
130 log_perror(fcntl(err_pipe.read_end(), F_SETFD, 1));
131 require(this->gp_err_pipe.get() == -1);
132 this->gp_err_pipe = std::move(err_pipe.read_end());
133 this->gp_child_started = true;
134 this->gp_child_queue_size = this->gp_queue.size();
135
136 this->gp_queue.clear();
137 return;
138 }
139
140 /* In the child... */
141
142 /*
143 * Restore the default signal handlers so we don't hang around
144 * forever if there is a problem.
145 */
146 signal(SIGINT, SIG_DFL);
147 signal(SIGTERM, SIG_DFL);
148
149 this->child_init();
150
151 this->child_loop();
152
153 _exit(0);
154 }
155
156 template<typename LineType>
child_loop()157 void grep_proc<LineType>::child_loop()
158 {
159 char outbuf[BUFSIZ * 2];
160 string line_value;
161
162 /* Make sure buffering is on, not sure of the state in the parent. */
163 if (setvbuf(stdout, outbuf, _IOFBF, BUFSIZ * 2) < 0) {
164 perror("setvbuf");
165 }
166 lnav_log_file = make_optional_from_nullable(fopen("/tmp/lnav.grep.err", "a"));
167 line_value.reserve(BUFSIZ * 2);
168 while (!this->gp_queue.empty()) {
169 LineType start_line = this->gp_queue.front().first;
170 LineType stop_line = this->gp_queue.front().second;
171 bool done = false;
172 LineType line;
173
174 this->gp_queue.pop_front();
175 for (line = this->gp_source.grep_initial_line(start_line, this->gp_highest_line);
176 line != -1 && (stop_line == -1 || line < stop_line) && !done;
177 this->gp_source.grep_next_line(line)) {
178 line_value.clear();
179 done = !this->gp_source.grep_value_for_line(line, line_value);
180 if (!done) {
181 pcre_context_static<128> pc;
182 pcre_input pi(line_value);
183
184 while (this->gp_pcre.match(pc, pi)) {
185 pcre_context::iterator pc_iter;
186 pcre_context::capture_t *m;
187
188 if (pi.pi_offset == 0) {
189 fprintf(stdout, "%d\n", (int) line);
190 }
191 m = pc.all();
192 fprintf(stdout, "[%d:%d]\n", m->c_begin, m->c_end);
193 for (pc_iter = pc.begin(); pc_iter != pc.end();
194 pc_iter++) {
195 if (!pc_iter->is_valid()) {
196 continue;
197 }
198 fprintf(stdout,
199 "(%d:%d)",
200 pc_iter->c_begin,
201 pc_iter->c_end);
202
203 /* If the capture was conditional, pcre will return a -1
204 * here.
205 */
206 if (pc_iter->c_begin >= 0) {
207 fwrite(pi.get_substr_start(pc_iter),
208 1,
209 pc_iter->length(),
210 stdout);
211 }
212 fputc('\n', stdout);
213 }
214 fprintf(stdout, "/\n");
215 }
216 }
217
218 if (((line + 1) % 10000) == 0) {
219 /* Periodically flush the buffer so the parent sees progress */
220 this->child_batch();
221 }
222 }
223
224 if (stop_line == -1) {
225 // When scanning to the end of the source, we need to return the
226 // highest line that was seen so that the next request that
227 // continues from the end works properly.
228 fprintf(stdout, "h%d\n", line - 1);
229 }
230 this->child_term();
231 }
232 }
233
234 template<typename LineType>
cleanup()235 void grep_proc<LineType>::cleanup()
236 {
237 if (this->gp_child != -1 && this->gp_child != 0) {
238 int status = 0;
239
240 kill(this->gp_child, SIGTERM);
241 while (waitpid(this->gp_child, &status, 0) < 0 && (errno == EINTR)) {
242 ;
243 }
244 require(!WIFSIGNALED(status) || WTERMSIG(status) != SIGABRT);
245 this->gp_child = -1;
246 this->gp_child_started = false;
247
248 if (this->gp_sink) {
249 for (size_t lpc = 0; lpc < this->gp_child_queue_size; lpc++) {
250 this->gp_sink->grep_end(*this);
251 }
252 }
253 }
254
255 if (this->gp_err_pipe != -1) {
256 this->gp_err_pipe.reset();
257 }
258
259 this->gp_pipe_range.clear();
260 this->gp_line_buffer.reset();
261
262 ensure(this->invariant());
263
264 if (!this->gp_queue.empty()) {
265 this->start();
266 }
267 }
268
269 template<typename LineType>
dispatch_line(char * line)270 void grep_proc<LineType>::dispatch_line(char *line)
271 {
272 int start, end, capture_start;
273
274 require(line != nullptr);
275
276 if (sscanf(line, "h%d", this->gp_highest_line.out()) == 1) {
277 } else if (sscanf(line, "%d", this->gp_last_line.out()) == 1) {
278 /* Starting a new line with matches. */
279 ensure(this->gp_last_line >= 0);
280 }
281 else if (sscanf(line, "[%d:%d]", &start, &end) == 2) {
282 require(start >= 0);
283 require(end >= 0);
284
285 /* Pass the match offsets to the sink delegate. */
286 if (this->gp_sink != nullptr) {
287 this->gp_sink->grep_match(*this, this->gp_last_line, start, end);
288 }
289 }
290 else if (sscanf(line, "(%d:%d)%n", &start, &end, &capture_start) == 2) {
291 require(start == -1 || start >= 0);
292 require(end >= 0);
293
294 /* Pass the captured strings to the sink delegate. */
295 if (this->gp_sink != nullptr) {
296 this->gp_sink->grep_capture(*this,
297 this->gp_last_line,
298 start,
299 end,
300 start < 0 ?
301 nullptr : &line[capture_start]);
302 }
303 }
304 else if (line[0] == '/') {
305 if (this->gp_sink != nullptr) {
306 this->gp_sink->grep_match_end(*this, this->gp_last_line);
307 }
308 }
309 else {
310 log_error("bad line from child -- %s", line);
311 }
312 }
313
314 template<typename LineType>
check_poll_set(const std::vector<struct pollfd> & pollfds)315 void grep_proc<LineType>::check_poll_set(const std::vector<struct pollfd> &pollfds)
316 {
317 require(this->invariant());
318
319 if (this->gp_err_pipe != -1 && pollfd_ready(pollfds, this->gp_err_pipe)) {
320 char buffer[1024 + 1];
321 ssize_t rc;
322
323 rc = read(this->gp_err_pipe, buffer, sizeof(buffer) - 1);
324 if (rc > 0) {
325 static const char *PREFIX = ": ";
326
327 buffer[rc] = '\0';
328 if (strncmp(buffer, PREFIX, strlen(PREFIX)) == 0) {
329 char *lf;
330
331 if ((lf = strchr(buffer, '\n')) != nullptr) {
332 *lf = '\0';
333 }
334 if (this->gp_control != nullptr) {
335 this->gp_control->grep_error(&buffer[strlen(PREFIX)]);
336 }
337 }
338 }
339 else if (rc == 0) {
340 this->gp_err_pipe.reset();
341 }
342 }
343
344 if (this->gp_line_buffer.get_fd() != -1 &&
345 pollfd_ready(pollfds, this->gp_line_buffer.get_fd())) {
346 try {
347 static const int MAX_LOOPS = 100;
348
349 int loop_count = 0;
350 bool drained = false;
351
352 while (loop_count < MAX_LOOPS) {
353 auto load_result = this->gp_line_buffer
354 .load_next_line(this->gp_pipe_range);
355
356 if (load_result.isErr()) {
357 break;
358 }
359
360 auto li = load_result.unwrap();
361
362 if (li.li_file_range.empty()) {
363 drained = true;
364 break;
365 }
366
367 this->gp_pipe_range = li.li_file_range;
368 this->gp_line_buffer.read_range(li.li_file_range).then([this](auto sbr) {
369 auto_mem<char> buf;
370
371 buf = (char *) malloc(sbr.length() + 1);
372 sbr.rtrim(is_line_ending);
373 memcpy(buf, sbr.get_data(), sbr.length());
374 buf[sbr.length()] = '\0';
375 this->dispatch_line(buf);
376 });
377
378 loop_count += 1;
379 }
380
381 if (this->gp_sink != nullptr) {
382 this->gp_sink->grep_end_batch(*this);
383 }
384
385 if (drained && this->gp_line_buffer.is_pipe_closed()) {
386 this->cleanup();
387 }
388 }
389 catch (line_buffer::error & e) {
390 this->cleanup();
391 }
392 }
393
394 ensure(this->invariant());
395 }
396
397 template<typename LineType>
invalidate()398 grep_proc<LineType> &grep_proc<LineType>::invalidate()
399 {
400 if (this->gp_sink) {
401 for (size_t lpc = 0; lpc < this->gp_queue.size(); lpc++) {
402 this->gp_sink->grep_end(*this);
403 }
404 }
405 this->gp_queue.clear();
406 this->cleanup();
407 return *this;
408 }
409
410 template class grep_proc<vis_line_t>;
411