1 #include "autoconfig.h"
2 
3 #ifdef RCL_MONITOR
4 /* Copyright (C) 2006 J.F.Dockes
5  *   This program is free software; you can redistribute it and/or modify
6  *   it under the terms of the GNU General Public License as published by
7  *   the Free Software Foundation; either version 2 of the License, or
8  *   (at your option) any later version.
9  *
10  *   This program is distributed in the hope that it will be useful,
11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *   GNU General Public License for more details.
14  *
15  *   You should have received a copy of the GNU General Public License
16  *   along with this program; if not, write to the
17  *   Free Software Foundation, Inc.,
18  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19  */
20 
/**
 * Recoll real time monitor processing. This file has the code to retrieve
 * events from the event queue and do the database-side processing. It also
 * holds the initialization function.
 */
26 
27 #include <errno.h>
28 #include <fnmatch.h>
29 #include "safeunistd.h"
30 
31 #include <cstring>
32 #include <cstdio>
33 #include <cstdlib>
34 #include <list>
35 #include <vector>
36 #include <thread>
37 #include <mutex>
38 #include <condition_variable>
39 #include <chrono>
40 
41 using std::list;
42 using std::vector;
43 
44 #include "log.h"
45 #include "rclmon.h"
46 #include "log.h"
47 
48 #include "execmd.h"
49 #include "recollindex.h"
50 #include "pathut.h"
51 #ifndef _WIN32
52 #include "x11mon.h"
53 #endif
54 #include "subtreelist.h"
55 
// Unsigned long alias.
// NOTE(review): not referenced in the visible part of this file;
// presumably kept for casts elsewhere - confirm before removing.
typedef unsigned long mttcast;

// Seconds between auxiliary db (stem, spell) updates:
// dfltauxinterval is the built-in default (one hour), auxinterval the
// effective value, which startMonitor() overrides from the
// "monauxinterval" configuration parameter when it is set.
static const int dfltauxinterval = 60 *60;
static int auxinterval = dfltauxinterval;

// Seconds between indexing queue processing: for merging events to
// fast changing files and saving some of the indexing overhead.
// ixinterval is the effective value, which startMonitor() overrides
// from the "monixinterval" configuration parameter when it is set.
static const int dfltixinterval = 30;
static int ixinterval = dfltixinterval;

// The event queue: handed to the receiver thread by startMonitor(),
// and drained by the startMonitor() main loop.
static RclMonEventQueue rclEQ;
68 
69 //
70 // Delayed events: this is a special feature for fast changing files.
71 // A list of pattern/delays can be specified in the configuration so
72 // that they don't get re-indexed before some timeout is elapsed. Such
73 // events are kept on a separate queue (m_dqueue) with an auxiliary
74 // list in time-to-reindex order, while the normal events are on
75 // m_iqueue.
76 
77 // Queue management performance: on a typical recoll system there will
78 // be only a few entries on the event queues and no significant time
79 // will be needed to manage them. Even on a busy system, the time used
80 // would most probably be negligible compared to the actual processing
81 // of the indexing events. So this is just for reference. Let I be the
82 // number of immediate events and D the number of delayed ones, N
83 // stands for either.
84 //
// Periodic timeout polling: the recollindex process periodically (every
// 2 seconds) wakes up to check for exit requests. At this time it also
// checks the queues for new entries (should not happen because the
// producer would normally wake up the consumer threads), or ready
// entries among the delayed ones. At this time it calls the "empty()"
// routine. This has constant time behaviour (checks for stl container
// emptiness and the top entry of the delays list).
92 //
// Adding a new event (pushEvent()): this performs a search for an
// existing event with the same path (O(log(N))), then an insert on the
// appropriate queue (O(log(N))) and an insert on the times list (O(D)).
96 //
97 // Popping an event: this is constant time as it just looks at the
98 // tops of the normal and delayed queues.
99 
100 
101 // Indexing event container: a map indexed by file path for fast
102 // insertion of duplicate events to the same file
103 typedef map<string, RclMonEvent> queue_type;
104 
105 // Entries for delayed events are duplicated (as iterators) on an
106 // auxiliary, sorted by time-to-reindex list. We could get rid of
107 // this, the price would be that the RclEQ.empty() call would have to
108 // walk the whole queue instead of only looking at the first delays
109 // entry.
110 typedef list<queue_type::iterator> delays_type;
111 
// A DelayPat associates a path wildcard pattern with the minimum
// number of seconds between two reindexes of a matching file. The
// values are read from the recoll configuration. A default-constructed
// DelayPat has an empty pattern and a zero delay.
struct DelayPat {
    string pattern;
    int    seconds = 0;
    DelayPat() {}
};
119 
120 /** Private part of RclEQ: things that we don't wish to exist in the interface
121  *  include file.
122  */
123 class RclEQData {
124 public:
125     int        m_opts;
126     // Queue for normal files (unlimited reindex)
127     queue_type m_iqueue;
128     // Queue for delayed reindex files
129     queue_type m_dqueue;
130     // The delays list stores pointers (iterators) to elements on
131     // m_dqueue.  The list is kept in time-to-index order. Elements of
132     // m_dqueue which are also in m_delays can only be deleted while
133     // walking m_delays, so we are certain that the m_dqueue iterators
134     // stored in m_delays remain valid.
135     delays_type m_delays;
136     // Configured intervals for path patterns, read from the configuration.
137     vector<DelayPat> m_delaypats;
138     RclConfig *m_config;
139     bool       m_ok;
140 
141     std::mutex m_mutex;
142     std::condition_variable m_cond;
143 
RclEQData()144     RclEQData()
145         : m_config(0), m_ok(true)
146         {
147         }
148     void readDelayPats(int dfltsecs);
searchDelayPats(const string & path)149     DelayPat searchDelayPats(const string& path)
150         {
151             for (vector<DelayPat>::iterator it = m_delaypats.begin();
152                  it != m_delaypats.end(); it++) {
153                 if (fnmatch(it->pattern.c_str(), path.c_str(), 0) == 0) {
154                     return *it;
155                 }
156             }
157             return DelayPat();
158         }
159     void delayInsert(const queue_type::iterator &qit);
160 };
161 
readDelayPats(int dfltsecs)162 void RclEQData::readDelayPats(int dfltsecs)
163 {
164     if (m_config == 0)
165         return;
166     string patstring;
167     if (!m_config->getConfParam("mondelaypatterns", patstring) ||
168         patstring.empty())
169         return;
170 
171     vector<string> dplist;
172     if (!stringToStrings(patstring, dplist)) {
173         LOGERR("rclEQData: bad pattern list: ["  << (patstring) << "]\n" );
174         return;
175     }
176 
177     for (vector<string>::iterator it = dplist.begin();
178          it != dplist.end(); it++) {
179         string::size_type pos = it->find_last_of(":");
180         DelayPat dp;
181         dp.pattern = it->substr(0, pos);
182         if (pos != string::npos && pos != it->size()-1) {
183             dp.seconds = atoi(it->substr(pos+1).c_str());
184         } else {
185             dp.seconds = dfltsecs;
186         }
187         m_delaypats.push_back(dp);
188         LOGDEB2("rclmon::readDelayPats: add ["  << (dp.pattern) << "] "  << (dp.seconds) << "\n" );
189     }
190 }
191 
192 // Insert event (as queue iterator) into delays list, in time order,
193 // We DO NOT take care of duplicate qits. erase should be called first
194 // when necessary.
delayInsert(const queue_type::iterator & qit)195 void RclEQData::delayInsert(const queue_type::iterator &qit)
196 {
197     MONDEB("RclEQData::delayInsert: minclock " << qit->second.m_minclock <<
198            std::endl);
199     for (delays_type::iterator dit = m_delays.begin();
200          dit != m_delays.end(); dit++) {
201         queue_type::iterator qit1 = *dit;
202         if ((*qit1).second.m_minclock > qit->second.m_minclock) {
203             m_delays.insert(dit, qit);
204             return;
205         }
206     }
207     m_delays.push_back(qit);
208 }
209 
RclMonEventQueue()210 RclMonEventQueue::RclMonEventQueue()
211 {
212     m_data = new RclEQData;
213 }
214 
// Release the private data allocated by the constructor.
RclMonEventQueue::~RclMonEventQueue()
{
    delete m_data;
}
219 
setopts(int opts)220 void RclMonEventQueue::setopts(int opts)
221 {
222     if (m_data)
223         m_data->m_opts = opts;
224 }
225 
226 /** Wait until there is something to process on the queue, or timeout.
227  *  returns a queue lock
228  */
wait(int seconds,bool * top)229 std::unique_lock<std::mutex> RclMonEventQueue::wait(int seconds, bool *top)
230 {
231     std::unique_lock<std::mutex> lock(m_data->m_mutex);
232 
233     MONDEB("RclMonEventQueue::wait, seconds: " << seconds << std::endl);
234     if (!empty()) {
235         MONDEB("RclMonEventQueue:: immediate return\n");
236         return lock;
237     }
238 
239     int err;
240     if (seconds > 0) {
241         if (top)
242             *top = false;
243         if (m_data->m_cond.wait_for(lock, std::chrono::seconds(seconds)) ==
244             std::cv_status::timeout) {
245             *top = true;
246             MONDEB("RclMonEventQueue:: timeout\n");
247             return lock;
248         }
249     } else {
250         m_data->m_cond.wait(lock);
251     }
252     MONDEB("RclMonEventQueue:: non-timeout return\n");
253     return lock;
254 }
255 
setConfig(RclConfig * cnf)256 void RclMonEventQueue::setConfig(RclConfig *cnf)
257 {
258     m_data->m_config = cnf;
259     // Don't use ixinterval here, could be 0 ! Base the default
260     // delayed reindex delay on the default ixinterval delay
261     m_data->readDelayPats(10 * dfltixinterval);
262 }
263 
// Return the configuration pointer stored by setConfig() (null if
// setConfig() was not called).
RclConfig *RclMonEventQueue::getConfig()
{
    return m_data->m_config;
}
268 
ok()269 bool RclMonEventQueue::ok()
270 {
271     if (m_data == 0) {
272         LOGINFO("RclMonEventQueue: not ok: bad state\n" );
273         return false;
274     }
275     if (stopindexing) {
276         LOGINFO("RclMonEventQueue: not ok: stop request\n" );
277         return false;
278     }
279     if (!m_data->m_ok) {
280         LOGINFO("RclMonEventQueue: not ok: queue terminated\n" );
281         return false;
282     }
283     return true;
284 }
285 
setTerminate()286 void RclMonEventQueue::setTerminate()
287 {
288     MONDEB("RclMonEventQueue:: setTerminate\n");
289     std::unique_lock<std::mutex> lock(m_data->m_mutex);
290     m_data->m_ok = false;
291     m_data->m_cond.notify_all();
292 }
293 
294 // Must be called with the queue locked
empty()295 bool RclMonEventQueue::empty()
296 {
297     if (m_data == 0) {
298         MONDEB("RclMonEventQueue::empty(): true (m_data==0)\n");
299         return true;
300     }
301     if (!m_data->m_iqueue.empty()) {
302         MONDEB("RclMonEventQueue::empty(): false (m_iqueue not empty)\n");
303         return true;
304     }
305     if (m_data->m_dqueue.empty()) {
306         MONDEB("RclMonEventQueue::empty(): true (m_Xqueue both empty)\n");
307         return true;
308     }
309     // Only dqueue has events. Have to check the delays (only the
310     // first, earliest one):
311     queue_type::iterator qit = *(m_data->m_delays.begin());
312     if (qit->second.m_minclock > time(0)) {
313         MONDEB("RclMonEventQueue::empty(): true (no delay ready " <<
314                qit->second.m_minclock << ")\n");
315         return true;
316     }
317     MONDEB("RclMonEventQueue::empty(): returning false (delay expired)\n");
318     return false;
319 }
320 
321 // Retrieve indexing event for processing. Returns empty event if
322 // nothing interesting is found
323 // Must be called with the queue locked
pop()324 RclMonEvent RclMonEventQueue::pop()
325 {
326     time_t now = time(0);
327     MONDEB("RclMonEventQueue::pop(), now " << now << std::endl);
328 
329     // Look at the delayed events, get rid of the expired/unactive
330     // ones, possibly return an expired/needidx one.
331     while (!m_data->m_delays.empty()) {
332         delays_type::iterator dit = m_data->m_delays.begin();
333         queue_type::iterator qit = *dit;
334         MONDEB("RclMonEventQueue::pop(): in delays: evt minclock " <<
335                qit->second.m_minclock << std::endl);
336         if (qit->second.m_minclock <= now) {
337             if (qit->second.m_needidx) {
338                 RclMonEvent ev = qit->second;
339                 qit->second.m_minclock = time(0) + qit->second.m_itvsecs;
340                 qit->second.m_needidx = false;
341                 m_data->m_delays.erase(dit);
342                 m_data->delayInsert(qit);
343                 return ev;
344             } else {
345                 // Delay elapsed without new update, get rid of event.
346                 m_data->m_dqueue.erase(qit);
347                 m_data->m_delays.erase(dit);
348             }
349         } else {
350             // This and following events are for later processing, we
351             // are done with the delayed event list.
352             break;
353         }
354     }
355 
356     // Look for non-delayed event
357     if (!m_data->m_iqueue.empty()) {
358         queue_type::iterator qit = m_data->m_iqueue.begin();
359         RclMonEvent ev = qit->second;
360         m_data->m_iqueue.erase(qit);
361         return ev;
362     }
363 
364     return RclMonEvent();
365 }
366 
367 // Add new event (update or delete) to the processing queue.
368 // It seems that a newer event is always correct to override any
369 // older. TBVerified ?
370 // Some conf-designated files, supposedly updated at a high rate get
371 // special processing to limit their reindexing rate.
pushEvent(const RclMonEvent & ev)372 bool RclMonEventQueue::pushEvent(const RclMonEvent &ev)
373 {
374     MONDEB("RclMonEventQueue::pushEvent for " << ev.m_path << std::endl);
375     std::unique_lock<std::mutex> lock(m_data->m_mutex);
376 
377     DelayPat pat = m_data->searchDelayPats(ev.m_path);
378     if (pat.seconds != 0) {
379         // Using delayed reindex queue. Need to take care of minclock and also
380         // insert into the in-minclock-order list
381         queue_type::iterator qit = m_data->m_dqueue.find(ev.m_path);
382         if (qit == m_data->m_dqueue.end()) {
383             // Not there yet, insert new
384             qit =
385                 m_data->m_dqueue.insert(queue_type::value_type(ev.m_path, ev)).first;
386             // Set the time to next index to "now" as it has not been
387             // indexed recently (otherwise it would still be in the
388             // queue), and add the iterator to the delay queue.
389             qit->second.m_minclock = time(0);
390             qit->second.m_needidx = true;
391             qit->second.m_itvsecs = pat.seconds;
392             m_data->delayInsert(qit);
393         } else {
394             // Already in queue. Possibly update type but save minclock
395             // (so no need to touch m_delays). Flag as needing indexing
396             time_t saved_clock = qit->second.m_minclock;
397             qit->second = ev;
398             qit->second.m_minclock = saved_clock;
399             qit->second.m_needidx = true;
400         }
401     } else {
402         // Immediate event: just insert it, erasing any previously
403         // existing entry
404         m_data->m_iqueue[ev.m_path] = ev;
405     }
406 
407     m_data->m_cond.notify_all();
408     return true;
409 }
410 
checkfileanddelete(const string & fname)411 static bool checkfileanddelete(const string& fname)
412 {
413     bool ret;
414     ret = path_exists(fname);
415     unlink(fname.c_str());
416     return ret;
417 }
418 
419 // It's possible to override the normal indexing delay by creating a
420 // file in the config directory (which we then remove). And yes there
421 // is definitely a race condition (we can suppress the delay and file
422 // before the target doc is queued), and we can't be sure that the
423 // delay suppression will be used for the doc the user intended it
424 // for. But this is used for non-critical function and the race
425 // condition should happen reasonably seldom.
426 // We check for the request file in all possible user config dirs
427 // (usually, there is only the main one)
expeditedIndexingRequested(RclConfig * conf)428 static bool expeditedIndexingRequested(RclConfig *conf)
429 {
430     static vector<string> rqfiles;
431     if (rqfiles.empty()) {
432         rqfiles.push_back(path_cat(conf->getConfDir(), "rclmonixnow"));
433         const char *cp;
434         if ((cp = getenv("RECOLL_CONFTOP"))) {
435             rqfiles.push_back(path_cat(cp, "rclmonixnow"));
436         }
437         if ((cp = getenv("RECOLL_CONFMID"))) {
438             rqfiles.push_back(path_cat(cp, "rclmonixnow"));
439         }
440     }
441     bool found  = false;
442     for (vector<string>::const_iterator it = rqfiles.begin();
443          it != rqfiles.end(); it++) {
444         found = found || checkfileanddelete(*it);
445     }
446     return found;
447 }
448 
/** Monitor entry point: start the event receiver thread, then loop
 *  retrieving events from the queue and updating the index, until a
 *  stop condition is met.
 *
 *  The loop ends when the queue reports not ok (see
 *  RclMonEventQueue::ok()), when x11IsAlive() returns false (non-Windows
 *  builds, unless RCLMON_NOX11 is set), or when purgefiles() or
 *  indexfiles() fails.
 *
 *  @param conf the configuration. Read here for "monauxinterval" and
 *         "monixinterval", and passed on to the queue and to the
 *         indexing functions.
 *  @param opts option flags. RCLMON_NOX11 and RCLMON_NOCONFCHECK are
 *         tested here, the value is also stored in the queue.
 *  @return always true when the function returns.
 */
bool startMonitor(RclConfig *conf, int opts)
{
    // Use the configured intervals, or the defaults if they are not set
    if (!conf->getConfParam("monauxinterval", &auxinterval))
        auxinterval = dfltauxinterval;
    if (!conf->getConfParam("monixinterval", &ixinterval))
        ixinterval = dfltixinterval;

    rclEQ.setConfig(conf);
    rclEQ.setopts(opts);

    // Start the receiver thread, which gets a pointer to the queue.
    // It is detached: we never join it (see the comment at the end).
    std::thread treceive(rclMonRcvRun, &rclEQ);
    treceive.detach();

    LOGDEB("start_monitoring: entering main loop\n" );

    // Passed to rclEQ.wait(), the value is not otherwise used below.
    bool timedout;
    // Times of the last auxiliary dbs update, of the last index
    // update, and of the last run of the web files mover script.
    time_t lastauxtime = time(0);
    time_t lastixtime = lastauxtime;
    time_t lastmovetime = 0;
    // Set when the index was updated since the last auxiliary dbs update
    bool didsomething = false;
    // Paths accumulated from the events, until the next index update
    list<string> modified;
    list<string> deleted;

    while (true) {
        time_t now = time(0);
        // Run the web files mover script at most once per ixinterval
        if (now - lastmovetime > ixinterval) {
            lastmovetime = now;
            runWebFilesMoverScript(conf);
        }

        {
            // Wait for event or timeout.
            // Set a relatively short timeout for better monitoring of
            // exit requests.
            // The returned lock keeps the queue locked until the end
            // of this block, as needed by pop().
            std::unique_lock<std::mutex> lock = rclEQ.wait(2, &timedout);

            // x11IsAlive() can't be called from ok() because both
            // threads call it and Xlib is not multithreaded.
#ifndef _WIN32
            bool x11dead = !(opts & RCLMON_NOX11) && !x11IsAlive();
            if (x11dead)
                LOGDEB("RclMonprc: x11 is dead\n" );
#else
            bool x11dead = false;
#endif
            if (!rclEQ.ok() || x11dead) {
                break;
            }

            // Process event queue
            for (;;) {
                // Retrieve event
                RclMonEvent ev = rclEQ.pop();
                // An event with an empty path means that nothing is ready
                if (ev.m_path.empty())
                    break;
                switch (ev.evtype()) {
                case RclMonEvent::RCLEVT_MODIFY:
                case RclMonEvent::RCLEVT_DIRCREATE:
                    LOGDEB0("Monitor: Modify/Check on "  << ev.m_path << "\n");
                    modified.push_back(ev.m_path);
                    break;
                case RclMonEvent::RCLEVT_DELETE:
                    LOGDEB0("Monitor: Delete on "  << (ev.m_path) << "\n" );
                    // If this is for a directory (which the caller
                    // tells us by setting the RCLEVT_ISDIR flag), we
                    // also need to purge the subtree from the db,
                    // because on a directory rename, inotify will
                    // only generate one event for the renamed top,
                    // not the subentries. When subtreelist() succeeds,
                    // the paths it returns are added to the list of
                    // paths to purge.
                    deleted.push_back(ev.m_path);
                    if (ev.evflags() & RclMonEvent::RCLEVT_ISDIR) {
                        vector<string> paths;
                        if (subtreelist(conf, ev.m_path, paths)) {
                            deleted.insert(deleted.end(),
                                           paths.begin(), paths.end());
                        }
                    }
                    break;
                default:
                    LOGDEB("Monitor: got Other on ["  << (ev.m_path) << "]\n" );
                }
            }
        }

        now = time(0);
        // Process. We don't do this every time but let the lists accumulate
        // a little, this saves processing. Start at once if list is big.
        // Note that expeditedIndexingRequested() is evaluated first, on
        // every pass, and removes the request files it finds.
        if (expeditedIndexingRequested(conf) ||
            (now - lastixtime > ixinterval) ||
            (deleted.size() + modified.size() > 20)) {
            lastixtime = now;
            // Used to do the modified list first, but it does seem
            // smarter to make room first...
            if (!deleted.empty()) {
                // Remove the duplicate paths before purging
                deleted.sort();
                deleted.unique();
                if (!purgefiles(conf, deleted))
                    break;
                deleted.clear();
                didsomething = true;
            }
            if (!modified.empty()) {
                // Remove the duplicate paths before indexing
                modified.sort();
                modified.unique();
                if (!indexfiles(conf, modified))
                    break;
                modified.clear();
                didsomething = true;
            }
        }

        // Recreate the auxiliary dbs every hour at most.
        // (More precisely: when the index changed, and at most once
        // every auxinterval seconds).
        now = time(0);
        if (didsomething && now - lastauxtime > auxinterval) {
            lastauxtime = now;
            didsomething = false;
            if (!createAuxDbs(conf)) {
                // We used to bail out on error here. Not anymore,
                // because this is most of the time due to a failure
                // of aspell dictionary generation, which is not
                // critical.
            }
        }

        // Check for a config change
        if (!(opts & RCLMON_NOCONFCHECK) && o_reexec && conf->sourceChanged()) {
            LOGDEB("Rclmonprc: config changed, reexecuting myself\n" );
            // We never want to have a -n option after a config
            // change. -n was added by the reexec after the initial
            // pass even if it was not given on the command line
            o_reexec->removeArg("-n");
            // NOTE(review): presumably does not return on success -
            // confirm. If it does return, the loop just goes on.
            o_reexec->reexec();
        }
    }
    LOGDEB("Rclmonprc: calling queue setTerminate\n" );
    rclEQ.setTerminate();

    // We used to wait for the receiver thread here before returning,
    // but this is not useful and may waste time / risk problems
    // during our limited time window for exiting. To be reviewed if
    // we ever need several monitor invocations in the same process
    // (can't foresee any reason why we'd want to do this).
    LOGDEB("Monitor: returning\n" );
    return true;
}
596 
597 #endif // RCL_MONITOR
598 
599