1 #include "config.h"
2
3 #ifdef HAVE_UNISTD_H
4 #include <unistd.h>
5 #endif
6
7 #include <ctype.h>
8
9 #include <iostream>
10 #include <string>
11 #include <vector>
12
13 #include "asserts.h"
14 #include "error.h"
15 #include "estring.h"
16 #include "fs.h"
17 #include "rconfig.h"
18 #include "timer.h"
19 #include "logger.h"
20
21 #include "archiver.h"
22
23 //-----------------------------------------------------------------------------
24
25 /** C'tor */
rstat()26 rstat::rstat()
27 {
28 TRY_nomem(m_exit_str[0] = "Success");
29 TRY_nomem(m_exit_str[1] = "Syntax or usage error");
30 TRY_nomem(m_exit_str[2] = "Protocol incompatability error");
31 TRY_nomem(m_exit_str[3] = "Errors selecting I/O files or directories");
32 TRY_nomem(m_exit_str[4] = "Requested action not supported");
33 TRY_nomem(m_exit_str[10] = "Error in socket I/O");
34 TRY_nomem(m_exit_str[11] = "Error in file I/O");
35 TRY_nomem(m_exit_str[12] = "Error in rsync protocol data stream");
36 TRY_nomem(m_exit_str[13] = "Errors with program diagnostics");
37 TRY_nomem(m_exit_str[14] = "Error in IPC code");
38 TRY_nomem(m_exit_str[20] = "Received SIGUSR1 or SIGINT");
39 TRY_nomem(m_exit_str[21] = "Some error returned by waitpid()");
40 TRY_nomem(m_exit_str[22] = "Error allocating core memory buffers");
41 TRY_nomem(m_exit_str[23] = "Partial transfer");
42 TRY_nomem(m_exit_str[24] = "Some files vanished before they could be transferred");
43 TRY_nomem(m_exit_str[25] = "The --max-delete limit stopped deletions");
44 TRY_nomem(m_exit_str[30] = "Timeout in data send/receive");
45 TRY_nomem(m_exit_str[124] = "The command executed by SSH exited with status 255");
46 TRY_nomem(m_exit_str[125] = "The command executed by SSH was killed by a signal");
47 TRY_nomem(m_exit_str[126] = "The command given to SSH cannot be run");
48 TRY_nomem(m_exit_str[127] = "The command given to SSH cannot be found");
49 TRY_nomem(m_exit_str[255] = "An SSH error occured");
50
51 TRY_nomem(m_signal_str[1] = "[HUP]: Hangup");
52 TRY_nomem(m_signal_str[2] = "[INT]: Interrupt ");
53 TRY_nomem(m_signal_str[3] = "[QUIT]: Quit");
54 TRY_nomem(m_signal_str[4] = "[ILL]: Illegal instruction");
55 TRY_nomem(m_signal_str[5] = "[TRAP]: Trace trap");
56 TRY_nomem(m_signal_str[6] = "[IOT]: IOT instruction or hardware fault");
57 TRY_nomem(m_signal_str[7] = "[ABRT]: Abnormal termination");
58 TRY_nomem(m_signal_str[8] = "[EMT]: EMT instruction or hardware fault");
59 TRY_nomem(m_signal_str[9] = "[FPE]: Floating point exception");
60 TRY_nomem(m_signal_str[10] = "[KILL]: Killed");
61 TRY_nomem(m_signal_str[11] = "[BUS]: Bus error");
62 TRY_nomem(m_signal_str[12] = "[SEGV]: Segmentation fault");
63 TRY_nomem(m_signal_str[13] = "[SYS]: Invalid system call or invalid argument to system call");
64 TRY_nomem(m_signal_str[14] = "[PIPE]: Write to pipe with no readers");
65 TRY_nomem(m_signal_str[15] = "[ALRM]: Alarm");
66 TRY_nomem(m_signal_str[16] = "[TERM]: Software termination");
67 TRY_nomem(m_signal_str[17] = "[USR1]: User-defined signal 1");
68 TRY_nomem(m_signal_str[18] = "[USR2]: User-defined signal 2");
69 TRY_nomem(m_signal_str[19] = "[CLD]: Child status change");
70 TRY_nomem(m_signal_str[20] = "[PWR]: Power fail/restart");
71 TRY_nomem(m_signal_str[21] = "[WINCH]: Terminal window size change");
72 TRY_nomem(m_signal_str[22] = "[URG]: Urgent condition");
73 TRY_nomem(m_signal_str[23] = "[POLL]: Pollable event or socket I/O");
74 TRY_nomem(m_signal_str[24] = "[STOP]: Stop");
75 TRY_nomem(m_signal_str[25] = "[TSTP]: Terminal stop character");
76 TRY_nomem(m_signal_str[26] = "[CONT]: Continue stopped process");
77 TRY_nomem(m_signal_str[27] = "[TTIN]: Background tty read");
78 TRY_nomem(m_signal_str[28] = "[TTOU]: Background tty write");
79 TRY_nomem(m_signal_str[29] = "[VTALRM]: Virtual timer expired");
80 TRY_nomem(m_signal_str[30] = "[PROF]: Profiling timer expired");
81 TRY_nomem(m_signal_str[31] = "[XCPU]: Exceeded CPU limit");
82 TRY_nomem(m_signal_str[32] = "[XFSZ]: Exceeded file size limit");
83 TRY_nomem(m_signal_str[33] = "[WAITING]: Process' LWPs are blocked");
84 TRY_nomem(m_signal_str[34] = "[LWP]: Special thread library signal");
85 TRY_nomem(m_signal_str[35] = "[FREEZE]: Special signal used by CPR");
86 TRY_nomem(m_signal_str[36] = "[THAW]: Special signal used by CPR");
87 TRY_nomem(m_signal_str[37] = "[CANCEL]: Thread cancellation");
88 TRY_nomem(m_signal_str[38] = "[LOST]: Resource lost");
89 TRY_nomem(m_signal_str[39] = "[RTMIN]: Highest priority real-time signal");
90 TRY_nomem(m_signal_str[46] = "[RTMAX]: Lowest priority real-time signal");
91
92 TRY_nomem(m_unknown_exit = "(Unknown exit code)");
93 TRY_nomem(m_unknown_signal = "(Unknown signal)");
94 }
95
96 /** Get a verbose string for an exit code */
exit(const int a_int) const97 const std::string& rstat::exit(const int a_int) const
98 {
99 if (m_exit_str.find(a_int) != m_exit_str.end()) {
100 return(m_exit_str.find(a_int)->second);
101 }
102 return(m_unknown_exit);
103 }
104
105 /** Get a verbose string for a signal number */
signal(const int a_int) const106 const std::string& rstat::signal(const int a_int) const
107 {
108 if (m_signal_str.find(a_int) != m_signal_str.end()) {
109 return(m_signal_str.find(a_int)->second);
110 }
111 return(m_unknown_signal);
112 }
113
114 class rstat rsync_estat_str;
115
116 //-----------------------------------------------------------------------------
117
118 /** C'tor
119
120 Set a job to be assiciated with this job archiver and initialize it's
121 processing status to "pending".
122
123 */
job_archiver(const job * a_job)124 job_archiver::job_archiver(const job * a_job)
125 {
126 clear();
127 m_job = a_job;
128 m_status = status_pending;
129 }
130
131 /** Generate a job prefix string
132
133 Create a string to uniquely identify this job to be used in the log file to
134 uniquely identify this job
135
136 */
prefix(void)137 const std::string job_archiver::prefix(void)
138 {
139 estring lstr;
140
141 lstr = "[Job:";
142 lstr += estring((void*)m_job);
143 lstr += "] ";
144
145 return(lstr);
146 }
147
148 /** Generate a job id string */
id(void)149 const std::string job_archiver::id(void)
150 {
151 estring lstr;
152
153 lstr = prefix();
154 lstr += " ";
155 lstr += m_job->generate_job_id();
156
157 return(lstr);
158 }
159
/** Clear the job archiver and return it to its initial state

    End any processes handling this job and return the job archiver to its
    "pending" state.

 */
void job_archiver::clear(void)
{
	// Terminate any running child (and flush its I/O) before resetting.
	end();
	m_child_pid = m_exec.my_pid();
	m_status = status_pending;
	m_success = true;
	m_jr.clear();      // per-job report
	m_jpr.clear();     // per-path report
	m_error_msg.erase();
	m_rsync_timeout_flag = false;
}
177
/** End any processes handling this job

    If any child processes are handling this job, terminate them.  Erase any
    pending I/O for the now defunct child.  Set our processing status to
    "done".

 */
void job_archiver::end(void)
{
	estring lstr;

	// Stop both the job-duration and the I/O-inactivity timers.
	m_timer.stop();
	m_io_timer.stop();
	if (m_exec.child_running()) {
		lstr = prefix();
		lstr += "Terminating child process!\n";
		logger.write(lstr,0,configuration_manager::logging_manager,m_child_pid);
		m_exec.kill_child();
	}
	m_exec.clear();
	// Discard any partially-read child output.
	m_io_out.erase();
	m_io_err.erase();
	m_status = status_done;
}
201
/** Return the processing status of this job archiver */
const job_archiver::archiving_status job_archiver::status(void)
{
	return(m_status);
}
207
208 /** Begin processing
209
210 Attempt to fork a child process to handle this job. If unsuccessful then
211 retry again later. The child then calls mf_do_chores() to handle the actual
212 processing, while the parent updates the job archiver's status from
213 "pending" to "processing" and begins a timer to measure the duration of the
214 job process.
215
216 */
start(void)217 void job_archiver::start(void)
218 {
219 estring lstr;
220 estring path;
221 estring archive_dir;
222
223 m_jr.id(m_job->generate_job_id());
224
225 // Create directories before forking
226 job::paths_type::const_iterator pi;
227 for (pi = m_job->paths.begin(); pi != m_job->paths.end(); ++pi) {
228 archive_dir = m_job->generate_archive_path(*pi);
229 path = archiver.working_archive_path();
230 path += "/";
231 path += archive_dir;
232 path = reform_path(path);
233 if (!exists(path)) {
234 lstr = prefix();
235 lstr += "Creating job archive path: ";
236 lstr += archive_dir;
237 lstr += "\n";
238 logger.write(lstr);
239 mk_dirhier(path);
240 }
241 }
242
243 try {
244 m_exec.fork();
245 }
246 catch(error e) {
247 lstr = prefix();
248 lstr += "Could not fork:\n";
249 logger.write(lstr);
250
251 lstr = e.str(prefix());
252 logger.write(lstr);
253
254 lstr = prefix();
255 lstr += "Will retry job later\n";
256 logger.write(lstr);
257
258 m_status = status_reschedule;
259 }
260 catch(...) {
261 error e = err_unknown;
262
263 lstr = prefix();
264 lstr += "Could not fork:\n";
265 logger.write(lstr);
266
267 lstr = e.str(prefix());
268 logger.write(lstr);
269
270 lstr = prefix();
271 lstr += "Will retry job later\n";
272 logger.write(lstr);
273
274 m_status = status_reschedule;
275 }
276
277 if (m_exec.is_child()) {
278 // wait_for_debugger = true;
279
280 // while (wait_for_debugger);
281
282 m_exec.reroute_stdio();
283 try {
284 mf_do_chores();
285 }
286 catch(error e) {
287 std::cerr << e;
288 m_success = false;
289 }
290 catch(...) {
291 std::cerr << err_unknown;
292 m_success = false;
293 }
294 if (m_success)
295 m_exec.exit(0);
296 else
297 m_exec.exit(1);
298 }
299
300 m_child_pid = m_exec.child_pid();
301
302 lstr = prefix();
303 lstr += "Spawning child process: PID ";
304 lstr += estring(static_cast<unsigned long>(m_exec.child_pid()));
305 lstr += "\n";
306 logger.write(lstr);
307
308 m_status = status_processing;
309 m_timer.start();
310 m_io_timer.start();
311 m_rsync_timeout_flag = false;
312 }
313
/** Parent processor for a job

    Check for I/O from the child.  Check the child's status to see if it is
    still running, has exited with an exit code, or has exited from a signal.
    If the child did not exit normally (i.e. exited from a signal or exited
    with a non-zero exit code) the job is marked as failed.  (The original
    vault-overflow rescheduling logic is kept below, commented out.)

    If the job is finished (whether successful or not), update the job
    archiver's status to "completed".

 */
void job_archiver::process(void)
{
	estring lstr;

	if (m_exec.child_running()) {
		// Process child I/O
		mf_process_child_io(false);
	}
	else {
		// Process remaining child I/O
		mf_process_child_io(true);

		// If child exited with an error, check vault overflow.  If the vault is
		// filling up, then reschedule the job for later retry.
		lstr = prefix();
		if (m_exec.child_signaled()) {
			lstr += "Child exited from signal: ";
			lstr += estring(m_exec.child_signal_no());
			lstr += "\n";
		}
		else if (m_exec.child_exit_code() != 0) {
			lstr += "Child exited abnormally with code: ";
			lstr += estring(m_exec.child_exit_code());
			lstr += "\n";
		}
		else {
			lstr += "Child exited successfully\n";
			// Status is set again below; this early assignment is redundant
			// but harmless.
			m_status = status_completed;
		}
		logger.write(lstr);

		if (m_exec.child_signaled() || !m_exec.child_exited_normally()) {
			/*
			if (vaulter.overflow()) {
				lstr = prefix();
				lstr += "Vault overflow detected, will retry later\n";
				logger.write(lstr);
				m_status = status_reschedule;
			}
			else {
				m_status = status_error;
			}
			*/
			m_status = status_error;
		}
		else {
			m_status = status_completed;
		}

		// Record the total job duration in the log.
		m_timer.stop();
		m_io_timer.stop();
		lstr = prefix();
		lstr += "Finished, duration: ";
		lstr += m_timer.duration();
		lstr += "\n";
		logger.write(lstr);
		// m_status = status_completed;
	}
}
386
/** Return (a copy of) the job report for this job */
single_job_report job_archiver::report(void) const
{
	return(m_jr);
}
392
393 /** Child processor for a job
394
395 For each path in this job:
396 - Create the directory heiararchy for this job in the archive
397 - Until done or until out of retrys:
398 - Choose a hardlink source, if applicable and available
399 - Construct the command line to pass to rsync
400 - Spawn rsync
401 - Process I/O sent back from rsync
402 - Process exit code or signal number returned from rsync
403 - Generate and submit a report to the report manager
404
405 */
mf_do_chores(void)406 void job_archiver::mf_do_chores(void)
407 {
408 /*
409 {
410 bool wait_for_debugger = true;
411
412 std::cerr << "Waiting for debugger to attach..." << std::endl;
413 while (wait_for_debugger);
414 std::cerr << "Debugger attached." << std::endl;
415 }
416 */
417
418 job::paths_type::const_iterator pi;
419
420 for (pi = m_job->paths.begin(); pi != m_job->paths.end(); ++pi) {
421 estring archive_dir;
422 estring path;
423 std::string binary;
424 std::vector<std::string> argv;
425 std::vector<std::string> extra_argv;
426 estring opt;
427 bool hardlink = false;
428 bool multi_hardlink = false;
429 int num_retries = 0;
430 bool done = false;
431 int exit_code = 0;
432 int signal_num = 0;
433 timer t;
434 uint64 files_total = 0;
435 uint64 files_xferd = 0;
436 uint64 size_total = 0;
437 uint64 size_xferd = 0;
438 bool overflow_detected = 0;
439 estring error_msg;
440
441 archive_dir = m_job->generate_archive_path(*pi);
442
443 path = archiver.working_archive_path();
444 path += "/";
445 path += archive_dir;
446 path = reform_path(path);
447
448 if (!exists(path)) {
449 std::string es;
450
451 TRY_nomem(es = "No such directory: \"");
452 TRY_nomem(es += archive_dir);
453 TRY_nomem(es += "\"");
454
455 throw(ERROR(EEXIST,es));
456 }
457
458 if (!writable(path)) {
459 std::string es;
460
461 TRY_nomem(es = "Cannot write to archive directory: \"");
462 TRY_nomem(es += archive_dir);
463 TRY_nomem(es += "\"");
464
465 throw(ERROR(EACCES,es));
466 }
467
468 logger.set_error_logging(false);
469 hardlink = m_job->rsync_hardlink;
470 multi_hardlink = m_job->rsync_multi_hardlink;
471 while ((num_retries <= m_job->rsync_retry_count) && !done) {
472 execute exec;
473 job::excludes_type::const_iterator ei;
474 job::includes_type::const_iterator ii;
475 std::vector<std::string>::const_iterator oi;
476
477 exit_code = 0;
478 signal_num = 0;
479
480 if (m_job->rsync_connection == job::connection_ssh_local) {
481 binary = config.ssh_local_path();
482 }
483 else {
484 binary = config.rsync_local_path();
485 }
486
487 argv.clear();
488 if (m_job->rsync_connection == job::connection_ssh_local) {
489 argv.push_back("-o");
490 argv.push_back("BatchMode=yes");
491 extra_argv = m_job->generate_ssh_options_vector();
492 argv.insert(argv.end(), extra_argv.begin(), extra_argv.end());
493 argv.push_back("-l");
494 argv.push_back(m_job->rsync_remote_user);
495 argv.push_back(m_job->hostname);
496 if (m_job->rsync_remote_path.size() != 0)
497 argv.push_back(m_job->rsync_remote_path);
498 else
499 argv.push_back(config.rsync_local_path());
500 }
501
502 extra_argv = m_job->generate_rsync_options_vector();
503 argv.insert(argv.end(), extra_argv.begin(), extra_argv.end());
504
505
506 if (m_job->rsync_connection != job::connection_local) {
507 opt = "--rsync-path=";
508 if (m_job->rsync_remote_path.size() != 0)
509 opt += m_job->rsync_remote_path;
510 else
511 opt += config.rsync_local_path();
512 argv.push_back(opt);
513 }
514
515 if (hardlink) {
516 subdirectory subdir;
517 std::string youngest;
518 std::string relative_path;
519 bool hardlink_opt = false;
520 bool first_hardlink = false;
521 int linkdest_count = 0;
522
523 subdir.path(vaulter.vault());
524 if (subdir.size() > 0) {
525 subdirectory::const_iterator si;
526
527 sort(subdir.begin(), subdir.end());
528 reverse(subdir.begin(), subdir.end());
529 for (si = subdir.begin(); si != subdir.end(); ++si) {
530 estring ypath;
531
532 if (first_hardlink && !multi_hardlink) {
533 continue;
534 }
535 if (linkdest_count >= m_job->rsync_multi_hardlink_max) {
536 continue;
537 }
538 if (!is_timestamp(*si))
539 continue;
540 if (*si == config.timestamp().str())
541 continue;
542 std::cout
543 << "Considering potential hardlink source: "
544 << *si
545 << std::endl;
546 ypath = vaulter.vault();
547 ypath += "/";
548 ypath += *si;
549 ypath += "/";
550 ypath += archive_dir;
551 if (exists(ypath)) {
552 std::cout
553 << "Using archive as hardlink source: "
554 << *si
555 << std::endl;
556 if (!hardlink_opt) {
557 argv.push_back(std::string("--hard-links"));
558 hardlink_opt = true;
559 }
560
561 relative_path = mk_relative_path(ypath,path);
562
563 opt="--link-dest=";
564 opt += relative_path;
565 argv.push_back(opt);
566
567 first_hardlink = true;
568 linkdest_count++; // At most 20 link-dest options can be used w/ rsync
569 }
570 else {
571 std::cout
572 << "- No such path: "
573 << ypath
574 << std::endl;
575 }
576 }
577 }
578 }
579
580 for (
581 ei = m_job->excludes.begin();
582 ei != m_job->excludes.end();
583 ++ei
584 )
585 {
586 opt = "--exclude-from=";
587 opt += *ei;
588 argv.push_back(opt);
589 }
590
591 for (
592 ii = m_job->includes.begin();
593 ii != m_job->includes.end();
594 ++ii
595 )
596 {
597 opt = "--include-from=";
598 opt += *ei;
599 argv.push_back(opt);
600 }
601
602 argv.push_back(m_job->generate_source_path(*pi));
603
604 argv.push_back(path);
605
606 std::cout << "Command Line:" << std::endl;
607 {
608 uint16 c;
609 std::cout << " Binary: " << binary << std::endl;
610 for (c = 0; c < argv.size(); c++) {
611 std::cout << " Argv[" << c << "] = " << argv[c] << std::endl;
612 }
613 }
614
615 m_error_msg.erase();
616
617 t.start();
618
619 m_io_timer.start();
620 m_rsync_timeout_flag = false;
621 exec.exec(binary, argv);
622
623 mf_process_rsync_io(
624 exec,
625 m_job->rsync_timeout,
626 files_total,
627 files_xferd,
628 size_total,
629 size_xferd,
630 overflow_detected
631 );
632 t.stop();
633
634 signal_num = 0;
635 if (exec.child_signaled()) {
636 std::cout
637 << "Rsync caught signal: ["
638 << exec.child_signal_no()
639 << "] "
640 << rsync_estat_str.signal(exec.child_signal_no())
641 << std::endl;
642 signal_num = exec.child_signal_no();
643 }
644 std::cout
645 << "Rsync exit code: ["
646 << exec.child_exit_code()
647 << "] "
648 << rsync_estat_str.exit(exec.child_exit_code())
649 << std::endl;
650
651 exit_code = exec.child_exit_code();
652 if (!m_rsync_timeout_flag && exec.child_exited_normally() && (exit_code == 0)) {
653 done = true;
654 }
655 else if (!m_rsync_timeout_flag && (m_job->rsync_behavior[exit_code] == rsync_behavior::ok)) {
656 std::cout << "Ignoring rsync failure" << std::endl;
657 done = true;
658 }
659 else if (overflow_detected) {
660 std::cout
661 << "Vault overflow detected"
662 << std::endl;
663 break;
664 }
665 else {
666 ++num_retries;
667 logger.set_error_logging(true);
668 }
669 if (m_rsync_timeout_flag) {
670 TRY_nomem(m_error_msg = "Rsync inactivity timeout");
671 }
672
673 if (m_job->rsync_behavior[exit_code] == rsync_behavior::fail)
674 {
675 std::cout << "Failing, will not attempt to retry" << std::endl;
676 break;
677 }
678 if (m_job->rsync_behavior[exit_code]
679 == rsync_behavior::retry_without_hardlinks)
680 {
681 std::cout << "Retrying without hardlinks..." << std::endl;
682 hardlink = false;
683 }
684
685 if ((!done) && (m_job->rsync_retry_delay > 0)) {
686 std::cout << "Retries left: " << (m_job->rsync_retry_count - num_retries + 1) << std::endl;
687 if (num_retries <= m_job->rsync_retry_count) {
688 std::cout << "Waiting " << m_job->rsync_retry_delay
689 << " minutes before retrying... " << std::endl;
690 sleep( (m_job->rsync_retry_delay) * 60);
691 }
692 }
693 }
694 if (!done) {
695 if (num_retries >= m_job->rsync_retry_count) {
696 std::cout << "Retry count exceeded" << std::endl;
697 m_success = false;
698 }
699 else {
700 std::cout << "Giving up on this path" << std::endl;
701 m_success = false;
702 }
703 }
704 reportio().write_report(
705 m_job->generate_source_path(*pi),
706 t,
707 exit_code,
708 signal_num,
709 m_job->rsync_behavior[exit_code],
710 m_error_msg
711 );
712 }
713 }
714
mf_process_report(const std::string & a_str)715 void job_archiver::mf_process_report(const std::string& a_str)
716 {
717 if (reportio().is_report(a_str)) {
718 m_jpr = reportio().parse(a_str);
719 m_jr.add_report(m_jpr);
720 }
721 }
722
/** Process I/O from the child

    While there is I/O to be read, read and parse it.  When the end of a line
    is reached write that line to the log file.  If a_finalize is true, also
    flush the child I/O buffer strings (any trailing partial line).

    @param a_finalize  true to drain streams to EOF and flush partial lines;
                       false to read only what is currently ready.

 */
void job_archiver::mf_process_child_io(bool a_finalize)
{
	estring lstr;
	bool io_flag = false;

	// --- stdout: accumulate into m_io_out, emit on '\r' or '\n'.
	while (!m_exec.out_eof() && (a_finalize || m_exec.out_ready())) {
		int r;
		const int buffer_size = 128;
		char buffer[buffer_size] = { 0 };

		r = m_exec.out_read(buffer, buffer_size);
		if (r > 0) {
			int c;

			io_flag = true;
			for (c = 0; c < r; ++c) {
				if ((buffer[c] == '\r') || (buffer[c] == '\n')) {
					lstr = prefix();
					lstr += m_io_out;
					lstr += "\n";
					logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
					mf_process_report(lstr);
					m_io_out.erase();
				}
				else {
					m_io_out += buffer[c];
				}
			}
		}
	}
	// Flush any trailing partial stdout line when finalizing.
	if (a_finalize && (m_io_out.size() > 0)) {
		lstr = prefix();
		lstr += m_io_out;
		lstr += "\n";
		logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
		mf_process_report(lstr);
		m_io_out.erase();
	}

	// --- stderr: same treatment, accumulated into m_io_err.
	while (!m_exec.err_eof() && (a_finalize || m_exec.err_ready())) {
		int r;
		const int buffer_size = 128;
		char buffer[buffer_size] = { 0 };

		r = m_exec.err_read(buffer, buffer_size);
		if (r > 0) {
			int c;

			io_flag = true;
			for (c = 0; c < r; ++c) {
				if ((buffer[c] == '\r') || (buffer[c] == '\n')) {
					lstr = prefix();
					lstr += m_io_err;
					lstr += "\n";
					logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
					mf_process_report(lstr);
					m_io_err.erase();
				}
				else {
					m_io_err += buffer[c];
				}
			}
		}
	}
	if (a_finalize && (m_io_err.size() > 0)) {
		lstr = prefix();
		lstr += m_io_err;
		lstr += "\n";
		logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
		mf_process_report(lstr);
		m_io_err.erase();
	}
	// Nothing was ready: back off to avoid busy-polling the child.
	if (!io_flag)
		sleep(config.io_poll_interval());
}
805
806 /** Trim off all non-digit leading and trailing characters from a string */
mf_trim_string(std::string & a_str)807 void job_archiver::mf_trim_string(std::string& a_str)
808 {
809 while ((a_str.size() > 0) && (!isdigit(a_str[0])))
810 a_str.erase(0,1);
811 while ((a_str.size() > 0) && (!isdigit(a_str[a_str.size()-1])))
812 a_str.erase(a_str.size()-1,1);
813 }
814
/** Parse I/O from rsync

    Search for special output from rsync to tell us something about the
    number and size of files and files transferred.  Recognized lines are
    rsync's end-of-run statistics ("Number of files: ...", etc.); the
    numeric value is extracted by trimming non-digit characters and
    converting via estring.  Parse failures are reported to stderr but are
    not fatal.

    @param a_str          A single line of rsync output (no trailing newline).
    @param a_files_total  Out: total files considered, if present on the line.
    @param a_files_xferd  Out: files transferred, if present on the line.
    @param a_size_total   Out: total byte size, if present on the line.
    @param a_size_xferd   Out: transferred byte size, if present on the line.

 */
void job_archiver::mf_parse_rsync_io(
	const std::string a_str,
	uint64& a_files_total,
	uint64& a_files_xferd,
	uint64& a_size_total,
	uint64& a_size_xferd
	)
{
	estring estr;

	if (a_str.find("Number of files: ") == 0) {
		estr = a_str;
		mf_trim_string(estr);
		try {
			// estring-to-uint64 conversion; throws on malformed input.
			a_files_total = estr;
		}
		catch(error e) {
			estring es;

			es = "Could not parse total number of files processed by rsync";
			e.push_back(ERROR_INSTANCE(es));

			// Not sure this is the best way to handle this...
			std::cerr << e;
		}
		catch(...) {
			error e = err_unknown;
			estring es;

			es = "Could not parse total number of files processed by rsync";
			e.push_back(ERROR_INSTANCE(es));

			// Not sure this is the best way to handle this...
			std::cerr << e;
		}
	}
	else if (a_str.find("Number of files transferred: ") == 0) {
		estr = a_str;
		mf_trim_string(estr);
		try {
			a_files_xferd = estr;
		}
		catch(error e) {
			estring es;

			es = "Could not parse total number of files transferred by rsync";
			e.push_back(ERROR_INSTANCE(es));

			// Not sure this is the best way to handle this...
			std::cerr << e;
		}
		catch(...) {
			error e = err_unknown;
			estring es;

			es = "Could not parse total number of files transferred by rsync";
			e.push_back(ERROR_INSTANCE(es));

			// Not sure this is the best way to handle this...
			std::cerr << e;
		}
	}
	else if (a_str.find("Total file size: ") == 0) {
		estr = a_str;
		mf_trim_string(estr);
		try {
			a_size_total = estr;
		}
		catch(error e) {
			estring es;

			es = "Could not parse total size of files processed by rsync";
			e.push_back(ERROR_INSTANCE(es));

			// Not sure this is the best way to handle this...
			std::cerr << e;
		}
		catch(...) {
			error e = err_unknown;
			estring es;

			es = "Could not parse total size of files processed by rsync";
			e.push_back(ERROR_INSTANCE(es));

			// Not sure this is the best way to handle this...
			std::cerr << e;
		}
	}
	else if (a_str.find("Total transferred file size: ") == 0) {
		estr = a_str;
		mf_trim_string(estr);
		try {
			a_size_xferd = estr;
		}
		catch(error e) {
			estring es;

			es = "Could not parse total size of files transferred by rsync";
			e.push_back(ERROR_INSTANCE(es));

			// Not sure this is the best way to handle this...
			std::cerr << e;
		}
		catch(...) {
			error e = err_unknown;
			estring es;

			es = "Could not parse total size of files transferred by rsync";
			e.push_back(ERROR_INSTANCE(es));

			// Not sure this is the best way to handle this...
			std::cerr << e;
		}
	}
}
936
937 /** Process I/O from rsync
938
939 If there is I/O from rsync to be read, read it and then send it through the
940 parser.
941
942 */
mf_process_rsync_io(execute & a_exec,uint16 a_timeout,uint64 & a_files_total,uint64 & a_files_xferd,uint64 & a_size_total,uint64 & a_size_xferd,bool & a_overflow_detected)943 void job_archiver::mf_process_rsync_io(
944 execute& a_exec,
945 uint16 a_timeout,
946 uint64& a_files_total,
947 uint64& a_files_xferd,
948 uint64& a_size_total,
949 uint64& a_size_xferd,
950 bool& a_overflow_detected
951 )
952 {
953 size_t ro;
954 size_t re;
955 estring out;
956 estring err;
957 bool io_flag;
958 char buffer[1024] = { 0 };
959 char *ptr;
960
961 ro = 1;
962 re = 1;
963 while ((ro != 0) || (re != 0) || a_exec.child_running()) {
964 io_flag = false;
965 ro = 0;
966 re = 0;
967
968 if (!a_overflow_detected) {
969 a_overflow_detected = vaulter.overflow();
970 }
971
972 m_error_msg.erase();
973
974 if (a_exec.out_ready()) {
975 ro = read(a_exec.out_fd(), buffer, 1024);
976 if (ro > 0) {
977 io_flag = true;
978 for (ptr = buffer; ptr != buffer+ro; ++ptr) {
979 if ((*ptr != '\r') && (*ptr != '\n')) {
980 out += *ptr;
981 }
982 else {
983 reportio().write_rsync_out(out);
984 out.erase();
985 }
986 }
987 }
988 }
989
990 if (a_exec.err_ready()) {
991 re = read(a_exec.err_fd(), buffer, 1024);
992 if (re > 0) {
993 io_flag = true;
994 for (ptr = buffer; ptr != buffer+re; ++ptr) {
995 if ((*ptr != '\r') && (*ptr != '\n')) {
996 err += *ptr;
997 }
998 else {
999 reportio().write_rsync_err(err);
1000 err.erase();
1001 }
1002 }
1003 }
1004 }
1005
1006 m_io_timer.stop();
1007 // std::cerr << "[DEBUG]: --------------------------" << std::endl;
1008 // std::cerr << "[DEBUG]: m_io_timer.start_value() = " << m_io_timer.start_value() << std::endl;
1009 // std::cerr << "[DEBUG]: m_io_timer.stop_value() = " << m_io_timer.stop_value() << std::endl;
1010 // std::cerr << "[DEBUG]: time(0) = " << time(0) << std::endl;
1011 // std::cerr << "[DEBUG]: m_io_timer.duration() = " << m_io_timer.duration_secs() << std::endl;
1012 // std::cerr << "[DEBUG]: difference = " << (time(0) - m_io_timer.start_value()) << std::endl;
1013 // std::cerr << "[DEBUG]: timeout = " << a_timeout << std::endl;
1014 // std::cerr << "[DEBUG]: io_flag = " << io_flag << std::endl;
1015 // std::cerr << "[DEBUG]: m_rsync_timeout_flag = " << m_rsync_timeout_flag << std::endl;
1016 if (io_flag) {
1017 m_io_timer.stop();
1018 m_io_timer.start();
1019 m_rsync_timeout_flag = false;
1020 }
1021 if (!m_rsync_timeout_flag && (m_io_timer.duration_secs() > a_timeout)) {
1022 std::cerr << "*** Rsync program inactivity timeout" << std::endl;
1023 a_exec.kill_child();
1024 m_rsync_timeout_flag = true;
1025 }
1026
1027 if (!io_flag)
1028 sleep(config.io_poll_interval());
1029
1030 }
1031 if (out.size() > 0) {
1032 std::cerr << out << std::endl;
1033 mf_parse_rsync_io(
1034 out,
1035 a_files_total,
1036 a_files_xferd,
1037 a_size_total,
1038 a_size_xferd
1039 );
1040 out.erase();
1041 }
1042 if (err.size() > 0) {
1043 std::cerr << err << std::endl;
1044 mf_parse_rsync_io(
1045 out,
1046 a_files_total,
1047 a_files_xferd,
1048 a_size_total,
1049 a_size_xferd
1050 );
1051 err.erase();
1052 }
1053 }
1054
1055 //-----------------------------------------------------------------------------
1056
1057 /** C'tor */
archive_manager()1058 archive_manager::archive_manager()
1059 {
1060 if (this != &archiver)
1061 throw(INTERNAL_ERROR(0,"Attempt to alocate multiple archive managers"));
1062
1063 clear();
1064 }
1065
1066 /** Clear the archive manager and clear the job list */
clear(void)1067 void archive_manager::clear(void)
1068 {
1069 m_jobs.clear();
1070 m_initialized = false;
1071 }
1072
1073 /** Initialize the archive manager
1074
1075 Log the archive timestamp, select and prepare a vault.
1076
1077 */
init(void)1078 void archive_manager::init(void)
1079 {
1080 timer t;
1081 estring lstr;
1082
1083 lstr = "Archive Manager - Beginning initialization\n";
1084 logger.write(lstr);
1085 t.start();
1086
1087 lstr = "Timestamp: ";
1088 lstr += config.timestamp().str();
1089 lstr += "\n";
1090 logger.write(lstr);
1091
1092 // Select a vault?
1093 vaulter.select();
1094 lstr = "Vault selected: ";
1095 lstr += vaulter.vault();
1096 lstr += "\n";
1097 logger.write(lstr);
1098
1099 reporter.vault().add_report(
1100 vault_stats_report(estring("Initial Stats:"),filesystem(vaulter.vault()))
1101 );
1102
1103 // Prepare the vault?
1104 vaulter.prepare();
1105
1106 t.stop();
1107 lstr = "Archive Manager - Finished initialization";
1108 lstr += ", duration: ";
1109 lstr += t.duration();
1110 lstr += "\n";
1111 logger.write(lstr);
1112
1113 m_initialized = true;
1114 }
1115
/** Return the initialized status of the archive manager */
const bool archive_manager::initialized(void) const
{
	return(m_initialized);
}
1121
/** Give a status report

    After so many minutes of inactivity write a report to the log file of our
    current status of affairs.  A function-static timer throttles the output:
    the report is emitted only when "timeout" minutes have elapsed since the
    last one.

 */
void archive_manager::mf_log_status(void)
{
	static timer t;	// persists across calls; throttles report frequency
	const timer::value_type timeout = 30;	// minutes between reports
	estring lstr;
	std::vector<job_archiver*>::const_iterator ji;
	uint64 jobs_pending = 0;
	uint64 jobs_processing = 0;
	uint64 jobs_completed =0;
	uint64 jobs_remaining = 0;

	if (!t.is_started())
		t.start();

	// Not enough time has passed since the last report: do nothing.
	t.stop();
	if (t.duration_mins() < timeout)
		return;

	lstr = "\n";
	lstr += "STATUS REPORT:\n";
	lstr += "================================================================\n";
	logger.write(lstr);
	// One line per job: "[<status>]: <job id>", while tallying totals.
	for (ji = m_jobs.begin(); ji != m_jobs.end(); ++ji) {
		lstr = "[";
		switch ((*ji)->status()) {
			case job_archiver::status_pending:
				lstr += "Pending    ";
				++jobs_pending;
				break;
			case job_archiver::status_processing:
				lstr += "Processing ";
				++jobs_processing;
				break;
			case job_archiver::status_reschedule:
				lstr += "Reschedule ";
				++jobs_pending;
				break;
			case job_archiver::status_fatal_error:
				lstr += "Fatal Error";
				++jobs_completed;
				break;
			case job_archiver::status_error:
				lstr += "Error      ";
				++jobs_completed;
				break;
			case job_archiver::status_completed:
				lstr += "Completed  ";
				++jobs_completed;
				break;
			case job_archiver::status_done:
				lstr += "Done       ";
				++jobs_completed;
				break;
			default:
				lstr += "<unknown>  ";
				break;
		}
		lstr += "]: ";
		lstr += (*ji)->id();
		lstr += "\n";
		logger.write(lstr);
	}

	lstr = "---------------------------------------------------------------\n";
	lstr += "    Jobs Pending: ";
	lstr += estring(jobs_pending);
	lstr += "\n";

	lstr += " Jobs Processing: ";
	lstr += estring(jobs_processing);
	lstr += "\n";

	lstr += "  Jobs Completed: ";
	lstr += estring(jobs_completed);
	lstr += "\n";

	lstr += "           Total: ";
	lstr += estring(jobs_pending+jobs_processing+jobs_completed+jobs_remaining);
	lstr += "\n";

	lstr += "Overflow Detected: ";
	if (vaulter.overflow()) {
		lstr += "TRUE";
	}
	else {
		lstr += "false";
	}
	lstr += "\n";

	logger.write(lstr);
	logger.write("\n");
	// Restart the throttle timer for the next report window.
	t.start();
}
1221
1222 /** Archive jobs
1223
1224 Create an archive directory. Generate a to-do list of job archiver objects.
1225 Process the job archiver objects:
1226 - While there are less than rsync-parallel job archivers processing, start
1227 the first available job archiver.
1228 - Check the status of each job and process I/O from jobs underway.
1229 - Remove jobs that failed to start.
1230 - Possibly reschedule failed jobs.
1231 - Remove completed jobs from active processing.
1232 - Call mf_log_status().
1233
1234 */
archive(void)1235 void archive_manager::archive(void)
1236 {
1237 timer t;
1238 estring lstr;
1239 configuration_manager::jobs_type::const_iterator cji;
1240 int num_processing = 0;
1241 std::vector<job_archiver*>::iterator ji;
1242 uint64 num_completed = 0;
1243 bool overflow_detected = false;
1244 estring debug_estr;
1245
1246 if (!initialized())
1247 throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
1248
1249 lstr = "Archive Manager - Begin archiving\n";
1250 logger.write(lstr);
1251 t.start();
1252
1253 // Create archive directory
1254 try {
1255 if (exists(archive_path())) {
1256 lstr = "Found existing archive directory: ";
1257 lstr += archive_path();
1258 lstr += "\n";
1259 logger.write(lstr);
1260
1261 if (!writable(archive_path())) {
1262 std::string es;
1263
1264 TRY_nomem(es = "Cannot write to archive directory: \"");
1265 TRY_nomem(es += archive_path());
1266 TRY_nomem(es += "\"");
1267
1268 throw(ERROR(EACCES,es));
1269 }
1270
1271 rename_file(archive_path(), working_archive_path());
1272 }
1273 else if (exists(working_archive_path())) {
1274 lstr = "Found existing archive directory: ";
1275 lstr += working_archive_path();
1276 lstr += "\n";
1277 logger.write(lstr);
1278
1279 if (!writable(working_archive_path())) {
1280 std::string es;
1281
1282 TRY_nomem(es = "Cannot write to working archive directory: \"");
1283 TRY_nomem(es += working_archive_path());
1284 TRY_nomem(es += "\"");
1285
1286 throw(ERROR(EACCES,es));
1287 }
1288 }
1289 else {
1290 lstr = "Creating archive directory: ";
1291 lstr += working_archive_path();
1292 lstr += "\n";
1293 logger.write(lstr);
1294 mk_dir(working_archive_path());
1295 }
1296 }
1297 catch(error e) {
1298 logger.write("An error has occured: ");
1299 logger.write(e[0].what());
1300 logger.write("\n");
1301 throw(e);
1302 }
1303 catch(...) {
1304 error e = err_unknown;
1305
1306 logger.write("An error has occured: ");
1307 logger.write(e[0].what());
1308 logger.write("\n");
1309 throw(e);
1310 }
1311
1312 // Create a todo list
1313 logger.write("Creating to-do list\n");
1314 for (
1315 cji = config.jobs().begin();
1316 cji != config.jobs().end();
1317 ++cji
1318 )
1319 {
1320 job_archiver* ptr;
1321
1322 ptr = new job_archiver(&(*cji));
1323 if (ptr == 0)
1324 throw(err_nomem);
1325 TRY_nomem(m_jobs.push_back(ptr));
1326 }
1327
1328 // Backup clients
1329 logger.write("Processing jobs...\n");
1330 while (m_jobs.size() > num_completed) {
1331
1332 /*
1333 logger.write("[DEBUG]: ---[ TOP OF LOOP ]---\n");
1334
1335 debug_estr = "[DEBUG]: overflow_detected = ";
1336 debug_estr += estring(overflow_detected);
1337 debug_estr += "\n";
1338 logger.write(debug_estr);
1339
1340 debug_estr = "[DEBUG]: num_processing = ";
1341 debug_estr += estring(num_processing);
1342 debug_estr += "\n";
1343 logger.write(debug_estr);
1344 */
1345
1346 if (!overflow_detected) {
1347 overflow_detected = vaulter.overflow(true);
1348 /*
1349 if (overflow_detected) {
1350 logger.write("[DEBUG]: Variable Change :: ");
1351 logger.write("overflow_detected = true");
1352 logger.write("\n");
1353 }
1354 */
1355 }
1356
1357 // If the vault has exceeded it's highwater mark, wait for the
1358 // currently-processing jobs to terminate, and then attempt to prepare the
1359 // vault.
1360 if (overflow_detected && (num_processing == 0)) {
1361 TRY(vaulter.prepare(true),"Cannot complete archive");
1362 overflow_detected = vaulter.overflow();
1363 /*
1364 if (!overflow_detected) {
1365 logger.write("[DEBUG]: Variable Change :: ");
1366 logger.write("overflow_detected = false");
1367 logger.write("\n");
1368 }
1369 */
1370 }
1371
1372 // For each job in the list...
1373 for (ji = m_jobs.begin(); ji != m_jobs.end(); ++ji)
1374 {
1375 // While we're running less than rsync-parallel jobs, start new ones
1376 if (
1377 !overflow_detected
1378 && (num_processing < config.rsync_parallel())
1379 && ((*ji)->status() == job_archiver::status_pending)
1380 )
1381 {
1382 try {
1383 (*ji)->start();
1384 }
1385 catch(error e) {
1386 if (e.num() == 12) {
1387 lstr = "Error starting job: Out of memory, will retry later\n";
1388 (*ji)->clear();
1389 }
1390 else {
1391 (*ji)->end();
1392 lstr = "Error starting job, aborting\n";
1393 // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
1394 num_processing--;
1395 reporter.jobs().add_report((*ji)->report());
1396 }
1397 logger.write(lstr);
1398 }
1399 catch(...) {
1400 (*ji)->end();
1401 lstr = "*** AN UNKNOWN ERROR HAS OCCURED WHILE STARTING JOB ";
1402 lstr += "-- JOB TERMINATED\n";
1403 logger.write(lstr);
1404 // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
1405 num_processing--;
1406 reporter.jobs().add_report((*ji)->report());
1407 }
1408 // logger.write("[DEBUG]: Variable Change :: num_processing++\n");
1409 num_processing++;
1410 }
1411
1412 // Process jobs
1413 if ((*ji)->status() == job_archiver::status_processing) {
1414 try {
1415 (*ji)->process();
1416 }
1417 catch(error e) {
1418 // TODO: Change 12 to ENOMEM?
1419 if (e.num() == 12) {
1420 lstr = "Error starting job: Out of memory, will retry later\n";
1421 (*ji)->clear();
1422 }
1423 else {
1424 (*ji)->end();
1425 lstr = "Error starting job, aborting\n";
1426 // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
1427 num_processing--;
1428 reporter.jobs().add_report((*ji)->report());
1429 }
1430 logger.write(lstr);
1431 }
1432 catch(...) {
1433 (*ji)->end();
1434 lstr = "*** AN UNKNOWN ERROR HAS OCCURED WHILE PROCESSING JOB ";
1435 lstr += "-- JOB TERMINATED\n";
1436 logger.write(lstr);
1437 // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
1438 num_processing--;
1439 reporter.jobs().add_report((*ji)->report());
1440 }
1441 }
1442
1443 // Remove jobs that could not start from active duty
1444 if ((*ji)->status() == job_archiver::status_reschedule) {
1445 (*ji)->clear();
1446 // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
1447 num_processing--;
1448 }
1449
1450 // If a job exited with an error, and the vault is full, then reschedule
1451 // the job for later
1452 if (
1453 ((*ji)->status() == job_archiver::status_error)
1454 &&
1455 overflow_detected
1456 )
1457 {
1458 lstr = "Vault overflow detected, will retry job later\n";
1459 logger.write(lstr);
1460 (*ji)->clear();
1461 num_processing--;
1462 }
1463
1464 // Remove completed jobs from the processing list
1465 if (
1466 ((*ji)->status() == job_archiver::status_completed)
1467 || ((*ji)->status() == job_archiver::status_fatal_error)
1468 || ((*ji)->status() == job_archiver::status_error)
1469 ) {
1470 (*ji)->end();
1471 // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
1472 num_processing--;
1473 num_completed++;
1474
1475 // logger.write("Adding job report to report manager\n");
1476 reporter.jobs().add_report((*ji)->report());
1477 }
1478 }
1479
1480 mf_log_status();
1481 sleep(1);
1482
1483 // logger.write("[DEBUG]: ---[ BOTTOM OF LOOP ]---\n");
1484 }
1485
1486 t.stop();
1487 lstr = "Archive Manager - Finished archiving, duration: ";
1488 lstr += t.duration();
1489 lstr += "\n";
1490 logger.write(lstr);
1491
1492 lstr = "Archive Manager - Finalizing archive path\n";
1493 logger.write(lstr);
1494 TRY(
1495 rename_file(working_archive_path(), archive_path()),
1496 "Cannot finalize archive"
1497 );
1498
1499 reporter.vault().add_report(
1500 vault_stats_report(estring("Final Stats:"),filesystem(vaulter.vault()))
1501 );
1502 }
1503
1504 /** Return an absolute path to the finished archive directory */
archive_path(void) const1505 const std::string archive_manager::archive_path(void) const
1506 {
1507 estring path;
1508
1509 if (!initialized())
1510 throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
1511
1512 path = vaulter.vault();
1513 path += "/";
1514 path += config.timestamp().str();
1515
1516 return(path);
1517 }
1518
1519 /** Return the absolute path to the unfinished working archive directory */
working_archive_path(void) const1520 const std::string archive_manager::working_archive_path(void) const
1521 {
1522 estring path;
1523
1524 if (!initialized())
1525 throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
1526
1527 path = archive_path();
1528 path += ".incomplete";
1529
1530 return(path);
1531 }
1532
1533 //-----------------------------------------------------------------------------
1534
1535 /** The global archive manager */
1536 archive_manager archiver;
1537
1538