#include "config.h"

#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#include <ctype.h>
#include <errno.h>
#include <signal.h>

#include <algorithm>
#include <iostream>
#include <string>
#include <vector>

#include "asserts.h"
#include "error.h"
#include "estring.h"
#include "fs.h"
#include "rconfig.h"
#include "timer.h"
#include "logger.h"

#include "archiver.h"
22 
23 //-----------------------------------------------------------------------------
24 
25 /** C'tor */
rstat()26 rstat::rstat()
27 {
28 	TRY_nomem(m_exit_str[0] = "Success");
29 	TRY_nomem(m_exit_str[1] = "Syntax or usage error");
30 	TRY_nomem(m_exit_str[2] = "Protocol incompatability error");
31 	TRY_nomem(m_exit_str[3] = "Errors selecting I/O files or directories");
32 	TRY_nomem(m_exit_str[4] = "Requested action not supported");
33 	TRY_nomem(m_exit_str[10] = "Error in socket I/O");
34 	TRY_nomem(m_exit_str[11] = "Error in file I/O");
35 	TRY_nomem(m_exit_str[12] = "Error in rsync protocol data stream");
36 	TRY_nomem(m_exit_str[13] = "Errors with program diagnostics");
37 	TRY_nomem(m_exit_str[14] = "Error in IPC code");
38 	TRY_nomem(m_exit_str[20] = "Received SIGUSR1 or SIGINT");
39 	TRY_nomem(m_exit_str[21] = "Some error returned by waitpid()");
40 	TRY_nomem(m_exit_str[22] = "Error allocating core memory buffers");
41 	TRY_nomem(m_exit_str[23] = "Partial transfer");
42 	TRY_nomem(m_exit_str[24] = "Some files vanished before they could be transferred");
43 	TRY_nomem(m_exit_str[25] = "The --max-delete limit stopped deletions");
44 	TRY_nomem(m_exit_str[30] = "Timeout in data send/receive");
45 	TRY_nomem(m_exit_str[124] = "The command executed by SSH exited with status 255");
46 	TRY_nomem(m_exit_str[125] = "The command executed by SSH was killed by a signal");
47 	TRY_nomem(m_exit_str[126] = "The command given to SSH cannot be run");
48 	TRY_nomem(m_exit_str[127] = "The command given to SSH cannot be found");
49 	TRY_nomem(m_exit_str[255] = "An SSH error occured");
50 
51 	TRY_nomem(m_signal_str[1] = "[HUP]: Hangup");
52 	TRY_nomem(m_signal_str[2] = "[INT]: Interrupt ");
53 	TRY_nomem(m_signal_str[3] = "[QUIT]: Quit");
54 	TRY_nomem(m_signal_str[4] = "[ILL]: Illegal instruction");
55 	TRY_nomem(m_signal_str[5] = "[TRAP]: Trace trap");
56 	TRY_nomem(m_signal_str[6] = "[IOT]: IOT instruction or hardware fault");
57 	TRY_nomem(m_signal_str[7] = "[ABRT]: Abnormal termination");
58 	TRY_nomem(m_signal_str[8] = "[EMT]: EMT instruction or hardware fault");
59 	TRY_nomem(m_signal_str[9] = "[FPE]: Floating point exception");
60 	TRY_nomem(m_signal_str[10] = "[KILL]: Killed");
61 	TRY_nomem(m_signal_str[11] = "[BUS]: Bus error");
62 	TRY_nomem(m_signal_str[12] = "[SEGV]: Segmentation fault");
63 	TRY_nomem(m_signal_str[13] = "[SYS]: Invalid system call or invalid argument to system call");
64 	TRY_nomem(m_signal_str[14] = "[PIPE]: Write to pipe with no readers");
65 	TRY_nomem(m_signal_str[15] = "[ALRM]: Alarm");
66 	TRY_nomem(m_signal_str[16] = "[TERM]: Software termination");
67 	TRY_nomem(m_signal_str[17] = "[USR1]: User-defined signal 1");
68 	TRY_nomem(m_signal_str[18] = "[USR2]: User-defined signal 2");
69 	TRY_nomem(m_signal_str[19] = "[CLD]: Child status change");
70 	TRY_nomem(m_signal_str[20] = "[PWR]: Power fail/restart");
71 	TRY_nomem(m_signal_str[21] = "[WINCH]: Terminal window size change");
72 	TRY_nomem(m_signal_str[22] = "[URG]: Urgent condition");
73 	TRY_nomem(m_signal_str[23] = "[POLL]: Pollable event or socket I/O");
74 	TRY_nomem(m_signal_str[24] = "[STOP]: Stop");
75 	TRY_nomem(m_signal_str[25] = "[TSTP]: Terminal stop character");
76 	TRY_nomem(m_signal_str[26] = "[CONT]: Continue stopped process");
77 	TRY_nomem(m_signal_str[27] = "[TTIN]: Background tty read");
78 	TRY_nomem(m_signal_str[28] = "[TTOU]: Background tty write");
79 	TRY_nomem(m_signal_str[29] = "[VTALRM]: Virtual timer expired");
80 	TRY_nomem(m_signal_str[30] = "[PROF]: Profiling timer expired");
81 	TRY_nomem(m_signal_str[31] = "[XCPU]: Exceeded CPU limit");
82 	TRY_nomem(m_signal_str[32] = "[XFSZ]: Exceeded file size limit");
83 	TRY_nomem(m_signal_str[33] = "[WAITING]: Process' LWPs are blocked");
84 	TRY_nomem(m_signal_str[34] = "[LWP]: Special thread library signal");
85 	TRY_nomem(m_signal_str[35] = "[FREEZE]: Special signal used by CPR");
86 	TRY_nomem(m_signal_str[36] = "[THAW]: Special signal used by CPR");
87 	TRY_nomem(m_signal_str[37] = "[CANCEL]: Thread cancellation");
88 	TRY_nomem(m_signal_str[38] = "[LOST]: Resource lost");
89 	TRY_nomem(m_signal_str[39] = "[RTMIN]: Highest priority real-time signal");
90 	TRY_nomem(m_signal_str[46] = "[RTMAX]: Lowest priority real-time signal");
91 
92 	TRY_nomem(m_unknown_exit = "(Unknown exit code)");
93 	TRY_nomem(m_unknown_signal = "(Unknown signal)");
94 }
95 
96 /** Get a verbose string for an exit code */
exit(const int a_int) const97 const std::string& rstat::exit(const int a_int) const
98 {
99 	if (m_exit_str.find(a_int) != m_exit_str.end()) {
100 		return(m_exit_str.find(a_int)->second);
101 	}
102 	return(m_unknown_exit);
103 }
104 
105 /** Get a verbose string for a signal number */
signal(const int a_int) const106 const std::string& rstat::signal(const int a_int) const
107 {
108 	if (m_signal_str.find(a_int) != m_signal_str.end()) {
109 		return(m_signal_str.find(a_int)->second);
110 	}
111 	return(m_unknown_signal);
112 }
113 
// Global lookup table of human-readable rsync exit-code and signal
// descriptions; used by job_archiver::mf_do_chores() when reporting how an
// rsync invocation ended.
class rstat rsync_estat_str;
115 
116 //-----------------------------------------------------------------------------
117 
/** C'tor

	Set a job to be associated with this job archiver and initialize its
	processing status to "pending".

 */
job_archiver(const job * a_job)124 job_archiver::job_archiver(const job * a_job)
125 {
126 	clear();
127 	m_job = a_job;
128 	m_status = status_pending;
129 }
130 
131 /** Generate a job prefix string
132 
133 	Create a string to uniquely identify this job to be used in the log file to
134 	uniquely identify this job
135 
136  */
prefix(void)137 const std::string job_archiver::prefix(void)
138 {
139 	estring lstr;
140 
141 	lstr = "[Job:";
142 	lstr += estring((void*)m_job);
143 	lstr += "] ";
144 
145 	return(lstr);
146 }
147 
148 /** Generate a job id string */
id(void)149 const std::string job_archiver::id(void)
150 {
151 	estring lstr;
152 
153 	lstr = prefix();
154 	lstr += " ";
155 	lstr += m_job->generate_job_id();
156 
157 	return(lstr);
158 }
159 
/** Clear the job archiver and return it to its initial state

	End any processes handling this job and return the job archiver to its
	"pending" state.

 */
clear(void)166 void job_archiver::clear(void)
167 {
168 	end();
169 	m_child_pid = m_exec.my_pid();
170 	m_status = status_pending;
171 	m_success = true;
172 	m_jr.clear();
173 	m_jpr.clear();
174 	m_error_msg.erase();
175 	m_rsync_timeout_flag = false;
176 }
177 
178 /** End any processes handling this job
179 
180 	If any child processes are handling this job, terminate them.  Erase any
181 	pending I/O for the now defunct child.  Set our processing status to "done".
182 
183 */
end(void)184 void job_archiver::end(void)
185 {
186 	estring lstr;
187 
188 	m_timer.stop();
189 	m_io_timer.stop();
190 	if (m_exec.child_running()) {
191 		lstr = prefix();
192 		lstr += "Terminating child process!\n";
193 		logger.write(lstr,0,configuration_manager::logging_manager,m_child_pid);
194 		m_exec.kill_child();
195 	}
196 	m_exec.clear();
197 	m_io_out.erase();
198 	m_io_err.erase();
199 	m_status = status_done;
200 }
201 
202 /** Return the processing status of this job archiver */
status(void)203 const job_archiver::archiving_status job_archiver::status(void)
204 {
205 	return(m_status);
206 }
207 
208 /** Begin processing
209 
210 	Attempt to fork a child process to handle this job.  If unsuccessful then
211 	retry again later.  The child then calls mf_do_chores() to handle the actual
212 	processing, while the parent updates the job archiver's status from
213 	"pending" to "processing" and begins a timer to measure the duration of the
214 	job process.
215 
216 */
start(void)217 void job_archiver::start(void)
218 {
219 	estring lstr;
220 	estring path;
221 	estring archive_dir;
222 
223 	m_jr.id(m_job->generate_job_id());
224 
225 	// Create directories before forking
226 	job::paths_type::const_iterator pi;
227 	for (pi = m_job->paths.begin(); pi != m_job->paths.end(); ++pi) {
228 		archive_dir = m_job->generate_archive_path(*pi);
229 		path = archiver.working_archive_path();
230 		path += "/";
231 		path += archive_dir;
232 		path = reform_path(path);
233 		if (!exists(path)) {
234 			lstr = prefix();
235 			lstr += "Creating job archive path: ";
236 			lstr += archive_dir;
237 			lstr += "\n";
238 			logger.write(lstr);
239 			mk_dirhier(path);
240 		}
241 	}
242 
243 	try {
244 		m_exec.fork();
245 	}
246 	catch(error e) {
247 		lstr = prefix();
248 		lstr += "Could not fork:\n";
249 		logger.write(lstr);
250 
251 		lstr = e.str(prefix());
252 		logger.write(lstr);
253 
254 		lstr = prefix();
255 		lstr += "Will retry job later\n";
256 		logger.write(lstr);
257 
258 		m_status = status_reschedule;
259 	}
260 	catch(...) {
261 		error e = err_unknown;
262 
263 		lstr = prefix();
264 		lstr += "Could not fork:\n";
265 		logger.write(lstr);
266 
267 		lstr = e.str(prefix());
268 		logger.write(lstr);
269 
270 		lstr = prefix();
271 		lstr += "Will retry job later\n";
272 		logger.write(lstr);
273 
274 		m_status = status_reschedule;
275 	}
276 
277 	if (m_exec.is_child()) {
278 		// wait_for_debugger = true;
279 
280 		// while (wait_for_debugger);
281 
282 		m_exec.reroute_stdio();
283 		try {
284 			mf_do_chores();
285 		}
286 		catch(error e) {
287 			std::cerr << e;
288 			m_success = false;
289 		}
290 		catch(...) {
291 			std::cerr << err_unknown;
292 			m_success = false;
293 		}
294 		if (m_success)
295 			m_exec.exit(0);
296 		else
297 			m_exec.exit(1);
298 	}
299 
300 	m_child_pid = m_exec.child_pid();
301 
302 	lstr = prefix();
303 	lstr += "Spawning child process: PID ";
304 	lstr += estring(static_cast<unsigned long>(m_exec.child_pid()));
305 	lstr += "\n";
306 	logger.write(lstr);
307 
308 	m_status = status_processing;
309 	m_timer.start();
310 	m_io_timer.start();
311 	m_rsync_timeout_flag = false;
312 }
313 
314 /** Parent processor for a job
315 
	Check for I/O from the child.  Check the child's status to see if it's still
	running, has exited with an exit code, or has exited from a signal.  If the
	child did not exit normally (i.e. exit from a signal or exit with a non-zero
	exit code) then check the vault for overflow.  If the vault has exceeded
	its overflow threshold then that could be the cause for the child's
	failure, in which case we reschedule the child to be processed again later.
	(The vault overflow check is currently disabled, so such a child is simply
	marked as "error".)

	If the job is finished (whether successful or not), update the job
	archiver's status to "completed" (or "error" if the child failed).
325 
326  */
process(void)327 void job_archiver::process(void)
328 {
329 	estring lstr;
330 
331 	if (m_exec.child_running()) {
332 		// Process child I/O
333 		mf_process_child_io(false);
334 	}
335 	else {
336 		// Process remaining child I/O
337 		mf_process_child_io(true);
338 
339 		// If child exited with an error, check vault overflow.  If the vault is
340 		// filling up, then reschedule the job for later retry.
341 		lstr = prefix();
342 		if (m_exec.child_signaled()) {
343 			lstr += "Child exited from signal: ";
344 			lstr += estring(m_exec.child_signal_no());
345 			lstr += "\n";
346 		}
347 		else if (m_exec.child_exit_code() != 0) {
348 			lstr += "Child exited abnormally with code: ";
349 			lstr += estring(m_exec.child_exit_code());
350 			lstr += "\n";
351 		}
352 		else {
353 			lstr += "Child exited successfully\n";
354 			m_status = status_completed;
355 		}
356 		logger.write(lstr);
357 
358 		if (m_exec.child_signaled() || !m_exec.child_exited_normally()) {
359 			/*
360 			if (vaulter.overflow()) {
361 				lstr = prefix();
362 				lstr += "Vault overflow detected, will retry later\n";
363 				logger.write(lstr);
364 				m_status = status_reschedule;
365 			}
366 			else {
367 				m_status = status_error;
368 			}
369 			*/
370 			m_status = status_error;
371 		}
372 		else {
373 			m_status = status_completed;
374 		}
375 
376 		m_timer.stop();
377 		m_io_timer.stop();
378 		lstr = prefix();
379 		lstr += "Finished, duration: ";
380 		lstr += m_timer.duration();
381 		lstr += "\n";
382 		logger.write(lstr);
383 		// m_status = status_completed;
384 	}
385 }
386 
387 /** Return the job report for this job */
report(void) const388 single_job_report job_archiver::report(void) const
389 {
390 	return(m_jr);
391 }
392 
393 /** Child processor for a job
394 
395 	For each path in this job:
	- Create the directory hierarchy for this job in the archive
	- Until done or until out of retries:
398 		- Choose a hardlink source, if applicable and available
399 		- Construct the command line to pass to rsync
400 		- Spawn rsync
401 		- Process I/O sent back from rsync
402 		- Process exit code or signal number returned from rsync
403 	- Generate and submit a report to the report manager
404 
405  */
mf_do_chores(void)406 void job_archiver::mf_do_chores(void)
407 {
408 	/*
409 	{
410 		bool wait_for_debugger = true;
411 
412 		std::cerr << "Waiting for debugger to attach..." << std::endl;
413 		while (wait_for_debugger);
414 		std::cerr << "Debugger attached." << std::endl;
415 	}
416 	*/
417 
418 	job::paths_type::const_iterator pi;
419 
420 	for (pi = m_job->paths.begin(); pi != m_job->paths.end(); ++pi) {
421 		estring archive_dir;
422 		estring path;
423 		std::string binary;
424 		std::vector<std::string> argv;
425 		std::vector<std::string> extra_argv;
426 		estring opt;
427 		bool hardlink = false;
428 		bool multi_hardlink = false;
429 		int num_retries = 0;
430 		bool done = false;
431 		int exit_code = 0;
432 		int signal_num = 0;
433 		timer t;
434 		uint64 files_total = 0;
435 		uint64 files_xferd = 0;
436 		uint64 size_total = 0;
437 		uint64 size_xferd = 0;
438 		bool overflow_detected = 0;
439 		estring error_msg;
440 
441 		archive_dir = m_job->generate_archive_path(*pi);
442 
443 		path = archiver.working_archive_path();
444 		path += "/";
445 		path += archive_dir;
446 		path = reform_path(path);
447 
448 		if (!exists(path)) {
449 			std::string es;
450 
451 			TRY_nomem(es = "No such directory: \"");
452 			TRY_nomem(es += archive_dir);
453 			TRY_nomem(es += "\"");
454 
455 			throw(ERROR(EEXIST,es));
456 		}
457 
458 		if (!writable(path)) {
459 			std::string es;
460 
461 			TRY_nomem(es = "Cannot write to archive directory: \"");
462 			TRY_nomem(es += archive_dir);
463 			TRY_nomem(es += "\"");
464 
465 			throw(ERROR(EACCES,es));
466 		}
467 
468 		logger.set_error_logging(false);
469 		hardlink = m_job->rsync_hardlink;
470 		multi_hardlink = m_job->rsync_multi_hardlink;
471 		while ((num_retries <= m_job->rsync_retry_count) && !done) {
472 			execute exec;
473 			job::excludes_type::const_iterator ei;
474 			job::includes_type::const_iterator ii;
475 			std::vector<std::string>::const_iterator oi;
476 
477 			exit_code = 0;
478 			signal_num = 0;
479 
480 			if (m_job->rsync_connection == job::connection_ssh_local) {
481 				binary = config.ssh_local_path();
482 			}
483 			else {
484 				binary = config.rsync_local_path();
485 			}
486 
487 			argv.clear();
488 			if (m_job->rsync_connection == job::connection_ssh_local) {
489 				argv.push_back("-o");
490 				argv.push_back("BatchMode=yes");
491 				extra_argv = m_job->generate_ssh_options_vector();
492 				argv.insert(argv.end(), extra_argv.begin(), extra_argv.end());
493 				argv.push_back("-l");
494 				argv.push_back(m_job->rsync_remote_user);
495 				argv.push_back(m_job->hostname);
496 				if (m_job->rsync_remote_path.size() != 0)
497 					argv.push_back(m_job->rsync_remote_path);
498 				else
499 					argv.push_back(config.rsync_local_path());
500 			}
501 
502 			extra_argv = m_job->generate_rsync_options_vector();
503 			argv.insert(argv.end(), extra_argv.begin(), extra_argv.end());
504 
505 
506 			if (m_job->rsync_connection != job::connection_local) {
507 				opt = "--rsync-path=";
508 				if (m_job->rsync_remote_path.size() != 0)
509 					opt += m_job->rsync_remote_path;
510 				else
511 					opt += config.rsync_local_path();
512 				argv.push_back(opt);
513 			}
514 
515 			if (hardlink) {
516 				subdirectory subdir;
517 				std::string youngest;
518 				std::string relative_path;
519 				bool hardlink_opt = false;
520 				bool first_hardlink = false;
521 				int linkdest_count = 0;
522 
523 				subdir.path(vaulter.vault());
524 				if (subdir.size() > 0) {
525 					subdirectory::const_iterator si;
526 
527 					sort(subdir.begin(), subdir.end());
528 					reverse(subdir.begin(), subdir.end());
529 					for (si = subdir.begin(); si != subdir.end(); ++si) {
530 						estring ypath;
531 
532 						if (first_hardlink && !multi_hardlink) {
533 							continue;
534 						}
535 						if (linkdest_count >= m_job->rsync_multi_hardlink_max) {
536 							continue;
537 						}
538 						if (!is_timestamp(*si))
539 							continue;
540 						if (*si == config.timestamp().str())
541 							continue;
542 						std::cout
543 							<< "Considering potential hardlink source: "
544 							<< *si
545 							<< std::endl;
546 						ypath = vaulter.vault();
547 						ypath += "/";
548 						ypath += *si;
549 						ypath += "/";
550 						ypath += archive_dir;
551 						if (exists(ypath)) {
552 							std::cout
553 								<< "Using archive as hardlink source: "
554 								<< *si
555 								<< std::endl;
556 							if (!hardlink_opt) {
557 								argv.push_back(std::string("--hard-links"));
558 								hardlink_opt = true;
559 							}
560 
561 							relative_path = mk_relative_path(ypath,path);
562 
563 							opt="--link-dest=";
564 							opt += relative_path;
565 							argv.push_back(opt);
566 
567 							first_hardlink = true;
568 							linkdest_count++; // At most 20 link-dest options can be used w/ rsync
569 						}
570 						else {
571 							std::cout
572 								<< "- No such path: "
573 								<< ypath
574 								<< std::endl;
575 						}
576 					}
577 				}
578 			}
579 
580 			for (
581 				ei = m_job->excludes.begin();
582 				ei != m_job->excludes.end();
583 				++ei
584 				)
585 			{
586 				opt = "--exclude-from=";
587 				opt += *ei;
588 				argv.push_back(opt);
589 			}
590 
591 			for (
592 				ii = m_job->includes.begin();
593 				ii != m_job->includes.end();
594 				++ii
595 				)
596 			{
597 				opt = "--include-from=";
598 				opt += *ei;
599 				argv.push_back(opt);
600 			}
601 
602 			argv.push_back(m_job->generate_source_path(*pi));
603 
604 			argv.push_back(path);
605 
606 			std::cout << "Command Line:" << std::endl;
607 			{
608 				uint16 c;
609 				std::cout << "  Binary: " << binary << std::endl;
610 				for (c = 0; c < argv.size(); c++) {
611 					std::cout << "    Argv[" << c << "] = " << argv[c] << std::endl;
612 				}
613 			}
614 
615 			m_error_msg.erase();
616 
617 			t.start();
618 
619 			m_io_timer.start();
620 			m_rsync_timeout_flag = false;
621 			exec.exec(binary, argv);
622 
623 			mf_process_rsync_io(
624 				exec,
625 				m_job->rsync_timeout,
626 				files_total,
627 				files_xferd,
628 				size_total,
629 				size_xferd,
630 				overflow_detected
631 				);
632 			t.stop();
633 
634 			signal_num = 0;
635 			if (exec.child_signaled()) {
636 				std::cout
637 					<< "Rsync caught signal: ["
638 					<< exec.child_signal_no()
639 					<< "] "
640 					<< rsync_estat_str.signal(exec.child_signal_no())
641 					<< std::endl;
642 				signal_num = exec.child_signal_no();
643 			}
644 			std::cout
645 				<< "Rsync exit code: ["
646 				<< exec.child_exit_code()
647 				<< "] "
648 				<< rsync_estat_str.exit(exec.child_exit_code())
649 				<< std::endl;
650 
651 			exit_code = exec.child_exit_code();
652 			if (!m_rsync_timeout_flag && exec.child_exited_normally() && (exit_code == 0)) {
653 				done = true;
654 			}
655 			else if (!m_rsync_timeout_flag && (m_job->rsync_behavior[exit_code] == rsync_behavior::ok)) {
656 				std::cout << "Ignoring rsync failure" << std::endl;
657 				done = true;
658 			}
659 			else if (overflow_detected) {
660 				std::cout
661 					<< "Vault overflow detected"
662 					<< std::endl;
663 				break;
664 			}
665 			else {
666 				++num_retries;
667 				logger.set_error_logging(true);
668 			}
669 			if (m_rsync_timeout_flag) {
670 				TRY_nomem(m_error_msg = "Rsync inactivity timeout");
671 			}
672 
673 			if (m_job->rsync_behavior[exit_code] == rsync_behavior::fail)
674 			{
675 				std::cout << "Failing, will not attempt to retry" << std::endl;
676 				break;
677 			}
678 			if (m_job->rsync_behavior[exit_code]
679 				== rsync_behavior::retry_without_hardlinks)
680 			{
681 				std::cout << "Retrying without hardlinks..." << std::endl;
682 				hardlink = false;
683 			}
684 
685 			if ((!done) && (m_job->rsync_retry_delay > 0)) {
686 				std::cout << "Retries left: " << (m_job->rsync_retry_count - num_retries + 1) << std::endl;
687 				if (num_retries <= m_job->rsync_retry_count) {
688 					std::cout << "Waiting " << m_job->rsync_retry_delay
689 						<< " minutes before retrying... " << std::endl;
690 					sleep( (m_job->rsync_retry_delay) * 60);
691 				}
692 			}
693 		}
694 		if (!done) {
695 			if (num_retries >= m_job->rsync_retry_count) {
696 				std::cout << "Retry count exceeded" << std::endl;
697 				m_success = false;
698 			}
699 			else {
700 				std::cout << "Giving up on this path" << std::endl;
701 				m_success = false;
702 			}
703 		}
704 		reportio().write_report(
705 			m_job->generate_source_path(*pi),
706 			t,
707 			exit_code,
708 			signal_num,
709 			m_job->rsync_behavior[exit_code],
710 			m_error_msg
711 			);
712 	}
713 }
714 
mf_process_report(const std::string & a_str)715 void job_archiver::mf_process_report(const std::string& a_str)
716 {
717 	if (reportio().is_report(a_str)) {
718 		m_jpr = reportio().parse(a_str);
719 		m_jr.add_report(m_jpr);
720 	}
721 }
722 
723 /** Process I/O from the child
724 
	While there is I/O to be read, read and parse it.  When the end of a line is
	reached write that line to the log file.  If a_finalize is true, then flush
	the child I/O buffer string.
728 
729  */
mf_process_child_io(bool a_finalize)730 void job_archiver::mf_process_child_io(bool a_finalize)
731 {
732 	estring lstr;
733 	bool io_flag = false;
734 
735 	while (!m_exec.out_eof() && (a_finalize || m_exec.out_ready())) {
736 		int r;
737 		const int buffer_size = 128;
738 		char buffer[buffer_size] = { 0 };
739 
740 		r = m_exec.out_read(buffer, buffer_size);
741 		if (r > 0) {
742 			int c;
743 
744 			io_flag = true;
745 			for (c = 0; c < r; ++c) {
746 				if ((buffer[c] == '\r') || (buffer[c] == '\n')) {
747 					lstr = prefix();
748 					lstr += m_io_out;
749 					lstr += "\n";
750 					logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
751 					mf_process_report(lstr);
752 					m_io_out.erase();
753 				}
754 				else {
755 					m_io_out += buffer[c];
756 				}
757 			}
758 		}
759 	}
760 	if (a_finalize && (m_io_out.size() > 0)) {
761 		lstr = prefix();
762 		lstr += m_io_out;
763 		lstr += "\n";
764 		logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
765 		mf_process_report(lstr);
766 		m_io_out.erase();
767 	}
768 
769 	while (!m_exec.err_eof() && (a_finalize || m_exec.err_ready())) {
770 		int r;
771 		const int buffer_size = 128;
772 		char buffer[buffer_size] = { 0 };
773 
774 		r = m_exec.err_read(buffer, buffer_size);
775 		if (r > 0) {
776 			int c;
777 
778 			io_flag = true;
779 			for (c = 0; c < r; ++c) {
780 				if ((buffer[c] == '\r') || (buffer[c] == '\n')) {
781 					lstr = prefix();
782 					lstr += m_io_err;
783 					lstr += "\n";
784 					logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
785 					mf_process_report(lstr);
786 					m_io_err.erase();
787 				}
788 				else {
789 					m_io_err += buffer[c];
790 				}
791 			}
792 		}
793 	}
794 	if (a_finalize && (m_io_err.size() > 0)) {
795 		lstr = prefix();
796 		lstr += m_io_err;
797 		lstr += "\n";
798 		logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
799 		mf_process_report(lstr);
800 		m_io_err.erase();
801 	}
802 	if (!io_flag)
803 		sleep(config.io_poll_interval());
804 }
805 
806 /** Trim off all non-digit leading and trailing characters from a string */
mf_trim_string(std::string & a_str)807 void job_archiver::mf_trim_string(std::string& a_str)
808 {
809 	while ((a_str.size() > 0) && (!isdigit(a_str[0])))
810 		a_str.erase(0,1);
811 	while ((a_str.size() > 0) && (!isdigit(a_str[a_str.size()-1])))
812 		a_str.erase(a_str.size()-1,1);
813 }
814 
815 /** Parse I/O from rsync
816 
817 	Search for special output from rsync to tell us something about the number
	and size of files and files transferred.
819 
820  */
mf_parse_rsync_io(const std::string a_str,uint64 & a_files_total,uint64 & a_files_xferd,uint64 & a_size_total,uint64 & a_size_xferd)821 void job_archiver::mf_parse_rsync_io(
822 	const std::string a_str,
823 	uint64& a_files_total,
824 	uint64& a_files_xferd,
825 	uint64& a_size_total,
826 	uint64& a_size_xferd
827 	)
828 {
829 	estring estr;
830 
831 	if (a_str.find("Number of files: ") == 0) {
832 		estr = a_str;
833 		mf_trim_string(estr);
834 		try {
835 			a_files_total = estr;
836 		}
837 		catch(error e) {
838 			estring es;
839 
840 			es = "Could not parse total number of files processed by rsync";
841 			e.push_back(ERROR_INSTANCE(es));
842 
843 			// Not sure this is the best way to handle this...
844 			std::cerr << e;
845 		}
846 		catch(...) {
847 			error e = err_unknown;
848 			estring es;
849 
850 			es = "Could not parse total number of files processed by rsync";
851 			e.push_back(ERROR_INSTANCE(es));
852 
853 			// Not sure this is the best way to handle this...
854 			std::cerr << e;
855 		}
856 	}
857 	else if (a_str.find("Number of files transferred: ") == 0) {
858 		estr = a_str;
859 		mf_trim_string(estr);
860 		try {
861 			a_files_xferd = estr;
862 		}
863 		catch(error e) {
864 			estring es;
865 
866 			es = "Could not parse total number of files transferred by rsync";
867 			e.push_back(ERROR_INSTANCE(es));
868 
869 			// Not sure this is the best way to handle this...
870 			std::cerr << e;
871 		}
872 		catch(...) {
873 			error e = err_unknown;
874 			estring es;
875 
876 			es = "Could not parse total number of files transferred by rsync";
877 			e.push_back(ERROR_INSTANCE(es));
878 
879 			// Not sure this is the best way to handle this...
880 			std::cerr << e;
881 		}
882 	}
883 	else if (a_str.find("Total file size: ") == 0) {
884 		estr = a_str;
885 		mf_trim_string(estr);
886 		try {
887 			a_size_total = estr;
888 		}
889 		catch(error e) {
890 			estring es;
891 
892 			es = "Could not parse total size of files processed by rsync";
893 			e.push_back(ERROR_INSTANCE(es));
894 
895 			// Not sure this is the best way to handle this...
896 			std::cerr << e;
897 		}
898 		catch(...) {
899 			error e = err_unknown;
900 			estring es;
901 
902 			es = "Could not parse total size of files processed by rsync";
903 			e.push_back(ERROR_INSTANCE(es));
904 
905 			// Not sure this is the best way to handle this...
906 			std::cerr << e;
907 		}
908 	}
909 	else if (a_str.find("Total transferred file size: ") == 0) {
910 		estr = a_str;
911 		mf_trim_string(estr);
912 		try {
913 			a_size_xferd = estr;
914 		}
915 		catch(error e) {
916 			estring es;
917 
918 			es = "Could not parse total size of files transferred by rsync";
919 			e.push_back(ERROR_INSTANCE(es));
920 
921 			// Not sure this is the best way to handle this...
922 			std::cerr << e;
923 		}
924 		catch(...) {
925 			error e = err_unknown;
926 			estring es;
927 
928 			es = "Could not parse total size of files transferred by rsync";
929 			e.push_back(ERROR_INSTANCE(es));
930 
931 			// Not sure this is the best way to handle this...
932 			std::cerr << e;
933 		}
934 	}
935 }
936 
937 /** Process I/O from rsync
938 
939 	If there is I/O from rsync to be read, read it and then send it through the
940 	parser.
941 
942  */
mf_process_rsync_io(execute & a_exec,uint16 a_timeout,uint64 & a_files_total,uint64 & a_files_xferd,uint64 & a_size_total,uint64 & a_size_xferd,bool & a_overflow_detected)943 void job_archiver::mf_process_rsync_io(
944 	execute& a_exec,
945 	uint16 a_timeout,
946 	uint64& a_files_total,
947 	uint64& a_files_xferd,
948 	uint64& a_size_total,
949 	uint64& a_size_xferd,
950 	bool& a_overflow_detected
951 	)
952 {
953 	size_t ro;
954 	size_t re;
955 	estring out;
956 	estring err;
957 	bool io_flag;
958 	char buffer[1024] = { 0 };
959 	char *ptr;
960 
961 	ro = 1;
962 	re = 1;
963 	while ((ro != 0) || (re != 0) || a_exec.child_running()) {
964 		io_flag = false;
965 		ro = 0;
966 		re = 0;
967 
968 		if (!a_overflow_detected) {
969 			a_overflow_detected = vaulter.overflow();
970 		}
971 
972 		m_error_msg.erase();
973 
974 		if (a_exec.out_ready()) {
975 			ro = read(a_exec.out_fd(), buffer, 1024);
976 			if (ro > 0) {
977 				io_flag = true;
978 				for (ptr = buffer; ptr != buffer+ro; ++ptr) {
979 					if ((*ptr != '\r') && (*ptr != '\n')) {
980 						out += *ptr;
981 					}
982 					else {
983 						reportio().write_rsync_out(out);
984 						out.erase();
985 					}
986 				}
987 			}
988 		}
989 
990 		if (a_exec.err_ready()) {
991 			re = read(a_exec.err_fd(), buffer, 1024);
992 			if (re > 0) {
993 				io_flag = true;
994 				for (ptr = buffer; ptr != buffer+re; ++ptr) {
995 					if ((*ptr != '\r') && (*ptr != '\n')) {
996 						err += *ptr;
997 					}
998 					else {
999 						reportio().write_rsync_err(err);
1000 						err.erase();
1001 					}
1002 				}
1003 			}
1004 		}
1005 
1006 		m_io_timer.stop();
1007 // std::cerr << "[DEBUG]: --------------------------" << std::endl;
1008 // std::cerr << "[DEBUG]: m_io_timer.start_value() = " << m_io_timer.start_value() << std::endl;
1009 // std::cerr << "[DEBUG]:  m_io_timer.stop_value() = " << m_io_timer.stop_value() << std::endl;
1010 // std::cerr << "[DEBUG]:                  time(0) = " << time(0) << std::endl;
1011 // std::cerr << "[DEBUG]:    m_io_timer.duration() = " << m_io_timer.duration_secs() << std::endl;
1012 // std::cerr << "[DEBUG]:               difference = " << (time(0) - m_io_timer.start_value()) << std::endl;
1013 // std::cerr << "[DEBUG]:                  timeout = " << a_timeout << std::endl;
1014 // std::cerr << "[DEBUG]: io_flag = " << io_flag << std::endl;
1015 // std::cerr << "[DEBUG]: m_rsync_timeout_flag = " << m_rsync_timeout_flag << std::endl;
1016 		if (io_flag) {
1017 			m_io_timer.stop();
1018 			m_io_timer.start();
1019 			m_rsync_timeout_flag = false;
1020 		}
1021 		if (!m_rsync_timeout_flag && (m_io_timer.duration_secs() > a_timeout)) {
1022 			std::cerr << "*** Rsync program inactivity timeout" << std::endl;
1023 			a_exec.kill_child();
1024 			m_rsync_timeout_flag = true;
1025 		}
1026 
1027 		if (!io_flag)
1028 			sleep(config.io_poll_interval());
1029 
1030 	}
1031 	if (out.size() > 0) {
1032 		std::cerr << out << std::endl;
1033 		mf_parse_rsync_io(
1034 			out,
1035 			a_files_total,
1036 			a_files_xferd,
1037 			a_size_total,
1038 			a_size_xferd
1039 		);
1040 		out.erase();
1041 	}
1042 	if (err.size() > 0) {
1043 		std::cerr << err << std::endl;
1044 		mf_parse_rsync_io(
1045 			out,
1046 			a_files_total,
1047 			a_files_xferd,
1048 			a_size_total,
1049 			a_size_xferd
1050 		);
1051 		err.erase();
1052 	}
1053 }
1054 
1055 //-----------------------------------------------------------------------------
1056 
1057 /** C'tor */
archive_manager()1058 archive_manager::archive_manager()
1059 {
1060 	if (this != &archiver)
1061 		throw(INTERNAL_ERROR(0,"Attempt to alocate multiple archive managers"));
1062 
1063 	clear();
1064 }
1065 
1066 /** Clear the archive manager and clear the job list */
clear(void)1067 void archive_manager::clear(void)
1068 {
1069 	m_jobs.clear();
1070 	m_initialized = false;
1071 }
1072 
1073 /** Initialize the archive manager
1074 
1075 	Log the archive timestamp, select and prepare a vault.
1076 
1077  */
init(void)1078 void archive_manager::init(void)
1079 {
1080 	timer t;
1081 	estring lstr;
1082 
1083 	lstr = "Archive Manager - Beginning initialization\n";
1084 	logger.write(lstr);
1085 	t.start();
1086 
1087 	lstr = "Timestamp: ";
1088 	lstr += config.timestamp().str();
1089 	lstr += "\n";
1090 	logger.write(lstr);
1091 
1092 	// Select a vault?
1093 	vaulter.select();
1094 	lstr = "Vault selected: ";
1095 	lstr += vaulter.vault();
1096 	lstr += "\n";
1097 	logger.write(lstr);
1098 
1099 	reporter.vault().add_report(
1100 		vault_stats_report(estring("Initial Stats:"),filesystem(vaulter.vault()))
1101 		);
1102 
1103 	// Prepare the vault?
1104 	vaulter.prepare();
1105 
1106 	t.stop();
1107 	lstr = "Archive Manager - Finished initialization";
1108 	lstr += ", duration: ";
1109 	lstr += t.duration();
1110 	lstr += "\n";
1111 	logger.write(lstr);
1112 
1113 	m_initialized = true;
1114 }
1115 
1116 /** Return the initialized status of the archive manager */
initialized(void) const1117 const bool archive_manager::initialized(void) const
1118 {
1119 	return(m_initialized);
1120 }
1121 
1122 /** Give a status report
1123 
1124 	After so many minutes of inactivity write a report to the log file of our
1125 	current status of affairs.
1126 
1127  */
mf_log_status(void)1128 void archive_manager::mf_log_status(void)
1129 {
1130 	static timer t;
1131 	const timer::value_type timeout = 30;
1132 	estring lstr;
1133 	std::vector<job_archiver*>::const_iterator ji;
1134 	uint64 jobs_pending = 0;
1135 	uint64 jobs_processing = 0;
1136 	uint64 jobs_completed =0;
1137 	uint64 jobs_remaining = 0;
1138 
1139 	if (!t.is_started())
1140 		t.start();
1141 
1142 	t.stop();
1143 	if (t.duration_mins() < timeout)
1144 		return;
1145 
1146 	lstr  = "\n";
1147 	lstr += "STATUS REPORT:\n";
1148 	lstr += "================================================================\n";
1149 	logger.write(lstr);
1150 	for (ji = m_jobs.begin(); ji != m_jobs.end(); ++ji) {
1151 		lstr = "[";
1152 		switch ((*ji)->status()) {
1153 			case job_archiver::status_pending:
1154 				lstr += "Pending    ";
1155 				++jobs_pending;
1156 				break;
1157 			case job_archiver::status_processing:
1158 				lstr += "Processing ";
1159 				++jobs_processing;
1160 				break;
1161 			case job_archiver::status_reschedule:
1162 				lstr += "Reschedule ";
1163 				++jobs_pending;
1164 				break;
1165 			case job_archiver::status_fatal_error:
1166 				lstr += "Fatal Error";
1167 				++jobs_completed;
1168 				break;
1169 			case job_archiver::status_error:
1170 				lstr += "Error      ";
1171 				++jobs_completed;
1172 				break;
1173 			case job_archiver::status_completed:
1174 				lstr += "Completed  ";
1175 				++jobs_completed;
1176 				break;
1177 			case job_archiver::status_done:
1178 				lstr += "Done       ";
1179 				++jobs_completed;
1180 				break;
1181 			default:
1182 				lstr += "<unknown>  ";
1183 				break;
1184 		}
1185 		lstr += "]: ";
1186 		lstr += (*ji)->id();
1187 		lstr += "\n";
1188 		logger.write(lstr);
1189 	}
1190 
1191 	lstr  = "---------------------------------------------------------------\n";
1192 	lstr += "     Jobs Pending: ";
1193 	lstr += estring(jobs_pending);
1194 	lstr += "\n";
1195 
1196 	lstr += "  Jobs Processing: ";
1197 	lstr += estring(jobs_processing);
1198 	lstr += "\n";
1199 
1200 	lstr += "   Jobs Completed: ";
1201 	lstr += estring(jobs_completed);
1202 	lstr += "\n";
1203 
1204 	lstr += "            Total: ";
1205 	lstr += estring(jobs_pending+jobs_processing+jobs_completed+jobs_remaining);
1206 	lstr += "\n";
1207 
1208 	lstr += "Overflow Detected: ";
1209 	if (vaulter.overflow()) {
1210 		lstr += "TRUE";
1211 	}
1212 	else {
1213 		lstr += "false";
1214 	}
1215 	lstr += "\n";
1216 
1217 	logger.write(lstr);
1218 	logger.write("\n");
1219 	t.start();
1220 }
1221 
1222 /** Archive jobs
1223 
1224 	Create an archive directory.  Generate a to-do list of job archiver objects.
1225 	Process the job archiver objects:
1226 	- While there are less than rsync-parallel job archivers processing, start
1227 		the first available job archiver.
1228 	- Check the status of each job and process I/O from jobs underway.
1229 	- Remove jobs that failed to start.
1230 	- Possibly reschedule failed jobs.
1231 	- Remove completed jobs from active processing.
1232 	- Call mf_log_status().
1233 
1234  */
archive(void)1235 void archive_manager::archive(void)
1236 {
1237 	timer t;
1238 	estring lstr;
1239 	configuration_manager::jobs_type::const_iterator cji;
1240 	int num_processing = 0;
1241 	std::vector<job_archiver*>::iterator ji;
1242 	uint64 num_completed = 0;
1243 	bool overflow_detected = false;
1244 	estring debug_estr;
1245 
1246 	if (!initialized())
1247 		throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
1248 
1249 	lstr = "Archive Manager - Begin archiving\n";
1250 	logger.write(lstr);
1251 	t.start();
1252 
1253 	// Create archive directory
1254 	try {
1255 		if (exists(archive_path())) {
1256 			lstr = "Found existing archive directory: ";
1257 			lstr += archive_path();
1258 			lstr += "\n";
1259 			logger.write(lstr);
1260 
1261 			if (!writable(archive_path())) {
1262 				std::string es;
1263 
1264 				TRY_nomem(es = "Cannot write to archive directory: \"");
1265 				TRY_nomem(es += archive_path());
1266 				TRY_nomem(es += "\"");
1267 
1268 				throw(ERROR(EACCES,es));
1269 			}
1270 
1271 			rename_file(archive_path(), working_archive_path());
1272 		}
1273 		else if (exists(working_archive_path())) {
1274 			lstr = "Found existing archive directory: ";
1275 			lstr += working_archive_path();
1276 			lstr += "\n";
1277 			logger.write(lstr);
1278 
1279 			if (!writable(working_archive_path())) {
1280 				std::string es;
1281 
1282 				TRY_nomem(es = "Cannot write to working archive directory: \"");
1283 				TRY_nomem(es += working_archive_path());
1284 				TRY_nomem(es += "\"");
1285 
1286 				throw(ERROR(EACCES,es));
1287 			}
1288 		}
1289 		else {
1290 			lstr = "Creating archive directory: ";
1291 			lstr += working_archive_path();
1292 			lstr += "\n";
1293 			logger.write(lstr);
1294 			mk_dir(working_archive_path());
1295 		}
1296 	}
1297 	catch(error e) {
1298 		logger.write("An error has occured: ");
1299 		logger.write(e[0].what());
1300 		logger.write("\n");
1301 		throw(e);
1302 	}
1303 	catch(...) {
1304 		error e = err_unknown;
1305 
1306 		logger.write("An error has occured: ");
1307 		logger.write(e[0].what());
1308 		logger.write("\n");
1309 		throw(e);
1310 	}
1311 
1312 	// Create a todo list
1313 	logger.write("Creating to-do list\n");
1314 	for (
1315 		cji = config.jobs().begin();
1316 		cji != config.jobs().end();
1317 		++cji
1318 		)
1319 	{
1320 		job_archiver* ptr;
1321 
1322 		ptr = new job_archiver(&(*cji));
1323 		if (ptr == 0)
1324 			throw(err_nomem);
1325 		TRY_nomem(m_jobs.push_back(ptr));
1326 	}
1327 
1328 	// Backup clients
1329 	logger.write("Processing jobs...\n");
1330 	while (m_jobs.size() > num_completed) {
1331 
1332 		/*
1333 		logger.write("[DEBUG]: ---[ TOP OF LOOP ]---\n");
1334 
1335 		debug_estr = "[DEBUG]: overflow_detected = ";
1336 		debug_estr += estring(overflow_detected);
1337 		debug_estr += "\n";
1338 		logger.write(debug_estr);
1339 
1340 		debug_estr = "[DEBUG]: num_processing = ";
1341 		debug_estr += estring(num_processing);
1342 		debug_estr += "\n";
1343 		logger.write(debug_estr);
1344 		*/
1345 
1346 		if (!overflow_detected) {
1347 			overflow_detected = vaulter.overflow(true);
1348 			/*
1349 			if (overflow_detected) {
1350 				logger.write("[DEBUG]: Variable Change :: ");
1351 				logger.write("overflow_detected = true");
1352 				logger.write("\n");
1353 			}
1354 			*/
1355 		}
1356 
1357 		// If the vault has exceeded it's highwater mark, wait for the
1358 		// currently-processing jobs to terminate, and then attempt to prepare the
1359 		// vault.
1360 		if (overflow_detected && (num_processing == 0)) {
1361 			TRY(vaulter.prepare(true),"Cannot complete archive");
1362 			overflow_detected = vaulter.overflow();
1363 			/*
1364 			if (!overflow_detected) {
1365 				logger.write("[DEBUG]: Variable Change :: ");
1366 				logger.write("overflow_detected = false");
1367 				logger.write("\n");
1368 			}
1369 			*/
1370 		}
1371 
1372 		// For each job in the list...
1373 		for (ji = m_jobs.begin(); ji != m_jobs.end(); ++ji)
1374 		{
1375 			// While we're running less than rsync-parallel jobs, start new ones
1376 			if (
1377 				!overflow_detected
1378 				&& (num_processing < config.rsync_parallel())
1379 				&& ((*ji)->status() == job_archiver::status_pending)
1380 				)
1381 			{
1382 				try {
1383 					(*ji)->start();
1384 				}
1385 				catch(error e) {
1386 					if (e.num() == 12) {
1387 						lstr = "Error starting job: Out of memory, will retry later\n";
1388 						(*ji)->clear();
1389 					}
1390 					else {
1391 						(*ji)->end();
1392 						lstr = "Error starting job, aborting\n";
1393 						// logger.write("[DEBUG]: Variable Change :: num_processing--\n");
1394 						num_processing--;
1395 						reporter.jobs().add_report((*ji)->report());
1396 					}
1397 					logger.write(lstr);
1398 				}
1399 				catch(...) {
1400 					(*ji)->end();
1401 					lstr = "*** AN UNKNOWN ERROR HAS OCCURED WHILE STARTING JOB ";
1402 					lstr += "-- JOB TERMINATED\n";
1403 					logger.write(lstr);
1404 					// logger.write("[DEBUG]: Variable Change :: num_processing--\n");
1405 					num_processing--;
1406 					reporter.jobs().add_report((*ji)->report());
1407 				}
1408 				// logger.write("[DEBUG]: Variable Change :: num_processing++\n");
1409 				num_processing++;
1410 			}
1411 
1412 			// Process jobs
1413 			if ((*ji)->status() == job_archiver::status_processing) {
1414 				try {
1415 					(*ji)->process();
1416 				}
1417 				catch(error e) {
1418 					// TODO: Change 12 to ENOMEM?
1419 					if (e.num() == 12) {
1420 						lstr  = "Error starting job: Out of memory, will retry later\n";
1421 						(*ji)->clear();
1422 					}
1423 					else {
1424 						(*ji)->end();
1425 						lstr = "Error starting job, aborting\n";
1426 						// logger.write("[DEBUG]: Variable Change :: num_processing--\n");
1427 						num_processing--;
1428 						reporter.jobs().add_report((*ji)->report());
1429 					}
1430 					logger.write(lstr);
1431 				}
1432 				catch(...) {
1433 					(*ji)->end();
1434 					lstr = "*** AN UNKNOWN ERROR HAS OCCURED WHILE PROCESSING JOB ";
1435 					lstr += "-- JOB TERMINATED\n";
1436 					logger.write(lstr);
1437 					// logger.write("[DEBUG]: Variable Change :: num_processing--\n");
1438 					num_processing--;
1439 					reporter.jobs().add_report((*ji)->report());
1440 				}
1441 			}
1442 
1443 			// Remove jobs that could not start from active duty
1444 			if ((*ji)->status() == job_archiver::status_reschedule) {
1445 				(*ji)->clear();
1446 				// logger.write("[DEBUG]: Variable Change :: num_processing--\n");
1447 				num_processing--;
1448 			}
1449 
1450 			// If a job exited with an error, and the vault is full, then reschedule
1451 			// the job for later
1452 			if (
1453 					((*ji)->status() == job_archiver::status_error)
1454 				&&
1455 					overflow_detected
1456 				)
1457 			{
1458 				lstr = "Vault overflow detected, will retry job later\n";
1459 				logger.write(lstr);
1460 				(*ji)->clear();
1461 				num_processing--;
1462 			}
1463 
1464 			// Remove completed jobs from the processing list
1465 			if (
1466 				((*ji)->status() == job_archiver::status_completed)
1467 				|| ((*ji)->status() == job_archiver::status_fatal_error)
1468 				|| ((*ji)->status() == job_archiver::status_error)
1469 				) {
1470 				(*ji)->end();
1471 				// logger.write("[DEBUG]: Variable Change :: num_processing--\n");
1472 				num_processing--;
1473 				num_completed++;
1474 
1475 				// logger.write("Adding job report to report manager\n");
1476 				reporter.jobs().add_report((*ji)->report());
1477 			}
1478 		}
1479 
1480 		mf_log_status();
1481 		sleep(1);
1482 
1483 		// logger.write("[DEBUG]: ---[ BOTTOM OF LOOP ]---\n");
1484 	}
1485 
1486 	t.stop();
1487 	lstr = "Archive Manager - Finished archiving, duration: ";
1488 	lstr += t.duration();
1489 	lstr += "\n";
1490 	logger.write(lstr);
1491 
1492 	lstr = "Archive Manager - Finalizing archive path\n";
1493 	logger.write(lstr);
1494 	TRY(
1495 		rename_file(working_archive_path(), archive_path()),
1496 		"Cannot finalize archive"
1497 		);
1498 
1499 	reporter.vault().add_report(
1500 		vault_stats_report(estring("Final Stats:"),filesystem(vaulter.vault()))
1501 		);
1502 }
1503 
1504 /** Return an absolute path to the finished archive directory */
archive_path(void) const1505 const std::string archive_manager::archive_path(void) const
1506 {
1507 	estring path;
1508 
1509 	if (!initialized())
1510 		throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
1511 
1512 	path = vaulter.vault();
1513 	path += "/";
1514 	path += config.timestamp().str();
1515 
1516 	return(path);
1517 }
1518 
1519 /** Return the absolute path to the unfinished working archive directory */
working_archive_path(void) const1520 const std::string archive_manager::working_archive_path(void) const
1521 {
1522 	estring path;
1523 
1524 	if (!initialized())
1525 		throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
1526 
1527 	path = archive_path();
1528 	path += ".incomplete";
1529 
1530 	return(path);
1531 }
1532 
1533 //-----------------------------------------------------------------------------
1534 
/** The global archive manager

	The single archive_manager instance.  The archive_manager constructor
	throws an INTERNAL_ERROR if any other instance is created.
 */
archive_manager archiver;
1537 
1538