1 /*
2 BAREOS® - Backup Archiving REcovery Open Sourced
3
4 Copyright (C) 2013-2019 Bareos GmbH & Co. KG
5
6 This program is Free Software; you can redistribute it and/or
7 modify it under the terms of version three of the GNU Affero General Public
8 License as published by the Free Software Foundation and included
9 in the file LICENSE.
10
11 This program is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Affero General Public License for more details.
15
16 You should have received a copy of the GNU Affero General Public License
17 along with this program; if not, write to the Free Software
18 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 02110-1301, USA.
20 */
21 /**
22 * @file
23 * Storage Daemon statistics gatherer.
24 *
25 * Written by Marco van Wieringen and Philipp Storz, November 2013
26 */
27
#include "include/bareos.h"
#include "stored/stored.h"
#include "stored/stored_globals.h"
#include "stored/jcr_private.h"
#include "lib/util.h"
#include "include/jcr.h"
#include "lib/parse_conf.h"
#include "lib/bsock.h"

#include <atomic>
36
37 namespace storagedaemon {
38
/*
 * Reply formats used by StatsCmd().
 *
 * The conversion specifiers have to match the promoted type of the value
 * passed for them: num_waiting/num_writers are plain int (%d), JobId and
 * JobFiles are uint32_t (%u). Using %ld/%lu for those reads a full long from
 * the argument list, which is wider than what was passed on LP64 platforms.
 * The text produced for valid values is unchanged.
 *
 * NOTE(review): MediaId is still printed with %ld; DBId_t is declared
 * elsewhere, confirm its width before touching that specifier.
 */
static char OKstats[] = "2000 OK statistics\n";
static char DevStats[] =
    "Devicestats [%lld]: Device=%s Read=%llu, Write=%llu, SpoolSize=%llu, "
    "NumWaiting=%d, NumWriters=%d, "
    "ReadTime=%lld, WriteTime=%lld, MediaId=%ld, VolBytes=%llu, VolFiles=%llu, "
    "VolBlocks=%llu\n";
static char TapeAlerts[] = "Tapealerts [%lld]: Device=%s TapeAlert=%llu\n";
static char JobStats[] =
    "Jobstats [%lld]: JobId=%u, JobFiles=%u, JobBytes=%llu, DevName=%s\n";
48
/*
 * Static globals: control state of the statistics thread.
 */

/*
 * Stop request. Written by StopStatisticsThread() and polled by the loop in
 * statistics_thread_runner() without holding `mutex`, hence atomic.
 */
static std::atomic<bool> quit{false};

/* Set once StartStatisticsThread() has successfully created the thread. */
static bool statistics_initialized = false;

/* Id of the statistics thread, valid when statistics_initialized is set. */
static pthread_t statistics_tid;

/*
 * Taken around modifications of the statistics lists; also the mutex the
 * runner passes to pthread_cond_timedwait() together with wait_for_next_run.
 */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

/* Broadcast by StopStatisticsThread() to end the runner's sleep early. */
static pthread_cond_t wait_for_next_run = PTHREAD_COND_INITIALIZER;
55
/*
 * One sample of a device's counters. Samples are elements of the
 * device_statistics::statistics list and are filled in by
 * UpdateDeviceStatistics() from the Device object.
 */
struct device_statistic {
  dlink link;                /* linkage for the dlist holding the samples */
  bool collected{false};     /* set by StatsCmd() when the sample was reported
                                but is kept as the last entry of the list */
  utime_t timestamp{0};      /* time value passed in when the sample was taken */
  btime_t DevReadTime{0};    /* copy of Device::DevReadTime */
  btime_t DevWriteTime{0};   /* copy of Device::DevWriteTime */
  uint64_t DevWriteBytes{0}; /* copy of Device::DevWriteBytes */
  uint64_t DevReadBytes{0};  /* copy of Device::DevReadBytes */
  uint64_t spool_size{0};    /* copy of Device::spool_size */
  int num_waiting{0};        /* copy of Device::num_waiting */
  int num_writers{0};        /* copy of Device::num_writers */
  DBId_t MediaId{0};         /* copy of Device::VolCatInfo.VolMediaId */
  uint64_t VolCatBytes{0};   /* copy of Device::VolCatInfo.VolCatBytes */
  uint64_t VolCatFiles{0};   /* copy of Device::VolCatInfo.VolCatFiles */
  uint64_t VolCatBlocks{0};  /* copy of Device::VolCatInfo.VolCatBlocks */
};
72
/*
 * One tape alert recorded by UpdateDeviceTapealert(); element of the
 * device_statistics::tapealerts list.
 */
struct device_tapealert {
  dlink link;           /* linkage for the dlist holding the alerts */
  utime_t timestamp{0}; /* time value supplied by the caller */
  uint64_t flags{0};    /* alert flags exactly as supplied by the caller */
};
78
/*
 * Per-device container; element of the global device_statistics list.
 * Entries are looked up by comparing DevName.
 */
struct device_statistics {
  dlink link;                      /* linkage for the global device list */
  char DevName[MAX_NAME_LENGTH]{}; /* device name, copied with bstrncpy() */
  struct device_statistic* cached{nullptr}; /* most recently appended sample,
                                               used to detect unchanged
                                               counters */
  dlist* statistics{nullptr}; /* list of device_statistic, created when the
                                 first sample is added */
  dlist* tapealerts{nullptr}; /* list of device_tapealert, created when the
                                 first alert is added */
};
86
/*
 * One sample of a job's counters; element of the job_statistics::statistics
 * list, filled in by UpdateJobStatistics().
 */
struct job_statistic {
  dlink link;            /* linkage for the dlist holding the samples */
  bool collected{false}; /* set by StatsCmd() when the sample was reported but
                            is kept as the last entry of the list */
  utime_t timestamp{0};  /* time value passed in when the sample was taken */
  uint32_t JobFiles{0};  /* copy of the JobFiles counter of the job */
  uint64_t JobBytes{0};  /* copy of the JobBytes counter of the job */
  char* DevName{nullptr}; /* strdup()ed device name, or "unknown" when the job
                             had no device at sampling time */
};
95
/*
 * Per-job container; element of the global job_statistics list.
 * Entries are looked up by JobId.
 */
struct job_statistics {
  dlink link;        /* linkage for the global job list */
  uint32_t JobId{0}; /* id of the job the samples belong to */
  struct job_statistic* cached{nullptr}; /* most recently appended sample, used
                                            to detect unchanged counters */
  dlist* statistics{nullptr}; /* list of job_statistic, created when the first
                                 sample is added */
};
102
/*
 * Heads of the collected data. Both lists are created by setup_statistics()
 * and released (and reset to NULL) by cleanup_cached_statistics(). Note that
 * these variables share their names with the struct tags above, which is why
 * the element types are always spelled with the `struct` keyword.
 */
static dlist* device_statistics = NULL; /* list of struct device_statistics */
static dlist* job_statistics = NULL;    /* list of struct job_statistics */
105
setup_statistics()106 static inline void setup_statistics()
107 {
108 struct device_statistics* dev_stats = NULL;
109 struct job_statistics* job_stats = NULL;
110
111 device_statistics = new dlist(dev_stats, &dev_stats->link);
112 job_statistics = new dlist(job_stats, &job_stats->link);
113 }
114
UpdateDeviceTapealert(const char * devname,uint64_t flags,utime_t now)115 void UpdateDeviceTapealert(const char* devname, uint64_t flags, utime_t now)
116 {
117 bool found = false;
118 struct device_statistics* dev_stats = NULL;
119 struct device_tapealert* tape_alert = NULL;
120
121 if (!me || !me->collect_dev_stats || !device_statistics) { return; }
122
123 foreach_dlist (dev_stats, device_statistics) {
124 if (bstrcmp(dev_stats->DevName, devname)) {
125 found = true;
126 break;
127 }
128 }
129
130 if (!found) {
131 dev_stats =
132 (struct device_statistics*)malloc(sizeof(struct device_statistics));
133 struct device_statistics empty_device_statistics;
134 *dev_stats = empty_device_statistics;
135
136 bstrncpy(dev_stats->DevName, devname, sizeof(dev_stats->DevName));
137 P(mutex);
138 device_statistics->append(dev_stats);
139 V(mutex);
140 }
141
142 /*
143 * Add a new tapealert message.
144 */
145 tape_alert =
146 (struct device_tapealert*)malloc(sizeof(struct device_tapealert));
147 struct device_tapealert empty_device_tapealert;
148 *tape_alert = empty_device_tapealert;
149
150 tape_alert->timestamp = now;
151 tape_alert->flags = flags;
152
153 if (!dev_stats->tapealerts) {
154 dev_stats->tapealerts = new dlist(tape_alert, &tape_alert->link);
155 }
156
157 P(mutex);
158 dev_stats->tapealerts->append(tape_alert);
159 V(mutex);
160
161 Dmsg3(200, "New stats [%lld]: Device %s TapeAlert %llu\n",
162 tape_alert->timestamp, dev_stats->DevName, tape_alert->flags);
163 }
164
UpdateDeviceStatistics(const char * devname,Device * dev,utime_t now)165 static inline void UpdateDeviceStatistics(const char* devname,
166 Device* dev,
167 utime_t now)
168 {
169 bool found = false;
170 struct device_statistics* dev_stats = NULL;
171 struct device_statistic* dev_stat = NULL;
172
173 if (!me || !me->collect_dev_stats || !device_statistics) { return; }
174
175 /*
176 * See if we already have statistics for this device.
177 */
178 foreach_dlist (dev_stats, device_statistics) {
179 if (bstrcmp(dev_stats->DevName, devname)) {
180 found = true;
181 break;
182 }
183 }
184
185 /*
186 * If we have statistics and the cached entry is filled it points
187 * to the latest sampled statistics so we compare them with the current
188 * statistics and if nothing changed we just return.
189 */
190 if (found && dev_stats->cached) {
191 dev_stat = dev_stats->cached;
192
193 if (dev_stat->DevReadBytes == dev->DevReadBytes &&
194 dev_stat->DevWriteBytes == dev->DevWriteBytes &&
195 dev_stat->spool_size == dev->spool_size) {
196 return;
197 }
198 } else if (!found) {
199 dev_stats =
200 (struct device_statistics*)malloc(sizeof(struct device_statistics));
201 struct device_statistics empty_device_statistics;
202 *dev_stats = empty_device_statistics;
203
204 bstrncpy(dev_stats->DevName, devname, sizeof(dev_stats->DevName));
205 P(mutex);
206 device_statistics->append(dev_stats);
207 V(mutex);
208 }
209
210 /*
211 * Add a new set of statistics.
212 */
213 dev_stat = (struct device_statistic*)malloc(sizeof(struct device_statistic));
214
215 struct device_statistic empty_device_statistic;
216 *dev_stat = empty_device_statistic;
217
218 dev_stat->timestamp = now;
219 dev_stat->DevReadTime = dev->DevReadTime;
220 dev_stat->DevWriteTime = dev->DevWriteTime;
221 dev_stat->DevWriteBytes = dev->DevWriteBytes;
222 dev_stat->DevReadBytes = dev->DevReadBytes;
223 dev_stat->spool_size = dev->spool_size;
224 dev_stat->num_waiting = dev->num_waiting;
225 dev_stat->num_writers = dev->num_writers;
226 dev_stat->MediaId = dev->VolCatInfo.VolMediaId;
227 dev_stat->VolCatBytes = dev->VolCatInfo.VolCatBytes;
228 dev_stat->VolCatFiles = dev->VolCatInfo.VolCatFiles;
229 dev_stat->VolCatBlocks = dev->VolCatInfo.VolCatBlocks;
230
231
232 if (!dev_stats->statistics) {
233 dev_stats->statistics = new dlist(dev_stat, &dev_stat->link);
234 }
235
236 P(mutex);
237 dev_stats->cached = dev_stat;
238 dev_stats->statistics->append(dev_stat);
239 V(mutex);
240
241 Dmsg5(200,
242 "New stats [%lld]: Device %s Read %llu, Write %llu, Spoolsize %llu,\n",
243 dev_stat->timestamp, dev_stats->DevName, dev_stat->DevReadBytes,
244 dev_stat->DevWriteBytes, dev_stat->spool_size);
245 Dmsg4(200, "NumWaiting %ld, NumWriters %ld, ReadTime=%lld, WriteTime=%lld,\n",
246 dev_stat->num_waiting, dev_stat->num_writers, dev_stat->DevReadTime,
247 dev_stat->DevWriteTime);
248 Dmsg4(200, "MediaId=%ld VolBytes=%llu, VolFiles=%llu, VolBlocks=%llu\n",
249 dev_stat->MediaId, dev_stat->VolCatBytes, dev_stat->VolCatFiles,
250 dev_stat->VolCatBlocks);
251 }
252
UpdateJobStatistics(JobControlRecord * jcr,utime_t now)253 void UpdateJobStatistics(JobControlRecord* jcr, utime_t now)
254 {
255 bool found = false;
256 struct job_statistics* job_stats = NULL;
257 struct job_statistic* job_stat = NULL;
258
259 if (!me || !me->collect_job_stats || !job_statistics) { return; }
260
261 /*
262 * Skip job 0 info
263 */
264 if (!jcr->JobId) { return; }
265
266 /*
267 * See if we already have statistics for this job.
268 */
269 foreach_dlist (job_stats, job_statistics) {
270 if (job_stats->JobId == jcr->JobId) {
271 found = true;
272 break;
273 }
274 }
275
276 /*
277 * If we have statistics and the cached entry is filled it points
278 * to the latest sampled statistics so we compare them with the current
279 * statistics and if nothing changed we just return.
280 */
281 if (found && job_stats->cached) {
282 job_stat = job_stats->cached;
283
284 if (job_stat->JobFiles == jcr->JobFiles &&
285 job_stat->JobBytes == jcr->JobBytes) {
286 return;
287 }
288 } else if (!found) {
289 job_stats = (struct job_statistics*)malloc(sizeof(struct job_statistics));
290 struct job_statistics empty_job_statistics;
291 *job_stats = empty_job_statistics;
292
293 job_stats->JobId = jcr->JobId;
294 P(mutex);
295 job_statistics->append(job_stats);
296 V(mutex);
297 }
298
299 /*
300 * Add a new set of statistics.
301 */
302 job_stat = (struct job_statistic*)malloc(sizeof(struct job_statistic));
303 struct job_statistic empty_job_statistic;
304 *job_stat = empty_job_statistic;
305
306 job_stat->timestamp = now;
307 job_stat->JobFiles = jcr->JobFiles;
308 job_stat->JobBytes = jcr->JobBytes;
309 if (jcr->impl->dcr && jcr->impl->dcr->device) {
310 job_stat->DevName = strdup(jcr->impl->dcr->device->resource_name_);
311 } else {
312 job_stat->DevName = strdup("unknown");
313 }
314
315 if (!job_stats->statistics) {
316 job_stats->statistics = new dlist(job_stat, &job_stat->link);
317 }
318
319 P(mutex);
320 job_stats->cached = job_stat;
321 job_stats->statistics->append(job_stat);
322 V(mutex);
323
324 Dmsg5(
325 200,
326 "New stats [%lld]: JobId %ld, JobFiles %lu, JobBytes %llu, DevName %s\n",
327 job_stat->timestamp, job_stats->JobId, job_stat->JobFiles,
328 job_stat->JobBytes, job_stat->DevName);
329 }
330
cleanup_cached_statistics()331 static inline void cleanup_cached_statistics()
332 {
333 struct device_statistics* dev_stats;
334 struct job_statistics* job_stats;
335
336 P(mutex);
337 if (device_statistics) {
338 foreach_dlist (dev_stats, device_statistics) {
339 dev_stats->statistics->destroy();
340 dev_stats->statistics = NULL;
341 }
342
343 device_statistics->destroy();
344 device_statistics = NULL;
345 }
346
347 if (job_statistics) {
348 foreach_dlist (job_stats, job_statistics) {
349 job_stats->statistics->destroy();
350 job_stats->statistics = NULL;
351 }
352
353 job_statistics->destroy();
354 job_statistics = NULL;
355 }
356 V(mutex);
357 }
358
359 /**
360 * Entry point for a separate statistics thread.
361 */
statistics_thread_runner(void * arg)362 extern "C" void* statistics_thread_runner(void* arg)
363 {
364 utime_t now;
365 struct timeval tv;
366 struct timezone tz;
367 struct timespec timeout;
368 DeviceResource* device;
369 JobControlRecord* jcr;
370
371 setup_statistics();
372
373 /*
374 * Do our work as long as we are not signaled to quit.
375 */
376 while (!quit) {
377 now = (utime_t)time(NULL);
378
379 if (me->collect_dev_stats) {
380 /*
381 * Loop over all defined devices.
382 */
383 foreach_res (device, R_DEVICE) {
384 if (device->collectstats) {
385 Device* dev;
386
387 dev = device->dev;
388 if (dev && dev->initiated) {
389 UpdateDeviceStatistics(device->resource_name_, dev, now);
390 }
391 }
392 }
393 }
394
395 if (me->collect_job_stats) {
396 /*
397 * Loop over all running Jobs in the Storage Daemon.
398 */
399 foreach_jcr (jcr) {
400 UpdateJobStatistics(jcr, now);
401 }
402 endeach_jcr(jcr);
403 }
404
405 /*
406 * Wait for a next run. Normally this waits exactly
407 * me->stats_collect_interval seconds. It can be interrupted when signaled
408 * by the StopStatisticsThread() function.
409 */
410 gettimeofday(&tv, &tz);
411 timeout.tv_nsec = tv.tv_usec * 1000;
412 timeout.tv_sec = tv.tv_sec + me->stats_collect_interval;
413
414 P(mutex);
415 pthread_cond_timedwait(&wait_for_next_run, &mutex, &timeout);
416 V(mutex);
417 }
418
419 /*
420 * Cleanup the cached statistics.
421 */
422 cleanup_cached_statistics();
423
424 return NULL;
425 }
426
StartStatisticsThread(void)427 int StartStatisticsThread(void)
428 {
429 int status;
430
431 /*
432 * First see if device and job stats collection is enabled.
433 */
434 if (!me->stats_collect_interval ||
435 (!me->collect_dev_stats && !me->collect_job_stats)) {
436 return 0;
437 }
438
439 /*
440 * See if only device stats collection is enabled that there is a least
441 * one device of which stats are collected.
442 */
443 if (me->collect_dev_stats && !me->collect_job_stats) {
444 DeviceResource* device;
445 int cnt = 0;
446
447 foreach_res (device, R_DEVICE) {
448 if (device->collectstats) { cnt++; }
449 }
450
451 if (cnt == 0) { return 0; }
452 }
453
454 if ((status = pthread_create(&statistics_tid, NULL, statistics_thread_runner,
455 NULL)) != 0) {
456 return status;
457 }
458
459 statistics_initialized = true;
460
461 return 0;
462 }
463
StopStatisticsThread()464 void StopStatisticsThread()
465 {
466 if (!statistics_initialized) { return; }
467
468 quit = true;
469 pthread_cond_broadcast(&wait_for_next_run);
470 if (!pthread_equal(statistics_tid, pthread_self())) {
471 pthread_join(statistics_tid, NULL);
472 }
473 }
474
StatsCmd(JobControlRecord * jcr)475 bool StatsCmd(JobControlRecord* jcr)
476 {
477 BareosSocket* dir = jcr->dir_bsock;
478 PoolMem msg(PM_MESSAGE);
479 PoolMem dev_tmp(PM_MESSAGE);
480
481 if (device_statistics) {
482 struct device_statistics* dev_stats;
483
484 foreach_dlist (dev_stats, device_statistics) {
485 if (dev_stats->statistics) {
486 struct device_statistic *dev_stat, *next_dev_stat;
487
488 dev_stat = (struct device_statistic*)dev_stats->statistics->first();
489 while (dev_stat) {
490 next_dev_stat =
491 (struct device_statistic*)dev_stats->statistics->next(dev_stat);
492
493 /*
494 * If the entry was already collected no need to do it again.
495 */
496 if (!dev_stat->collected) {
497 PmStrcpy(dev_tmp, dev_stats->DevName);
498 BashSpaces(dev_tmp);
499 Mmsg(msg, DevStats, dev_stat->timestamp, dev_tmp.c_str(),
500 dev_stat->DevReadBytes, dev_stat->DevWriteBytes,
501 dev_stat->spool_size, dev_stat->num_waiting,
502 dev_stat->num_writers, dev_stat->DevReadTime,
503 dev_stat->DevWriteTime, dev_stat->MediaId,
504 dev_stat->VolCatBytes, dev_stat->VolCatFiles,
505 dev_stat->VolCatBlocks);
506 Dmsg1(100, ">dird: %s", msg.c_str());
507 dir->fsend(msg.c_str());
508 }
509
510 P(mutex);
511 /*
512 * If this is the last one on the list leave it for comparison.
513 */
514 if (!next_dev_stat) {
515 dev_stat->collected = true;
516 } else {
517 dev_stats->statistics->remove(dev_stat);
518
519 if (dev_stats->cached == dev_stat) { dev_stats->cached = NULL; }
520 }
521 V(mutex);
522 dev_stat = next_dev_stat;
523 }
524 }
525
526 if (dev_stats->tapealerts) {
527 struct device_tapealert *tape_alert, *next_tape_alert;
528
529 tape_alert = (struct device_tapealert*)dev_stats->tapealerts->first();
530 while (tape_alert) {
531 PmStrcpy(dev_tmp, dev_stats->DevName);
532 BashSpaces(dev_tmp);
533 Mmsg(msg, TapeAlerts, tape_alert->timestamp, dev_tmp.c_str(),
534 tape_alert->flags);
535 Dmsg1(100, ">dird: %s", msg.c_str());
536 dir->fsend(msg.c_str());
537
538 next_tape_alert =
539 (struct device_tapealert*)dev_stats->tapealerts->next(tape_alert);
540 P(mutex);
541 dev_stats->tapealerts->remove(tape_alert);
542 V(mutex);
543 tape_alert = next_tape_alert;
544 }
545 }
546 }
547 }
548
549 if (job_statistics) {
550 bool found;
551 JobControlRecord* jcr;
552 struct job_statistics *job_stats, *next_job_stats;
553
554 job_stats = (struct job_statistics*)job_statistics->first();
555 while (job_stats) {
556 if (job_stats->statistics) {
557 struct job_statistic *job_stat, *next_job_stat;
558
559 job_stat = (struct job_statistic*)job_stats->statistics->first();
560 while (job_stat) {
561 next_job_stat =
562 (struct job_statistic*)job_stats->statistics->next(job_stat);
563
564 /*
565 * If the entry was already collected no need to do it again.
566 */
567 if (!job_stat->collected) {
568 PmStrcpy(dev_tmp, job_stat->DevName);
569 BashSpaces(dev_tmp);
570 Mmsg(msg, JobStats, job_stat->timestamp, job_stats->JobId,
571 job_stat->JobFiles, job_stat->JobBytes, dev_tmp.c_str());
572 Dmsg1(100, ">dird: %s", msg.c_str());
573 dir->fsend(msg.c_str());
574 }
575
576 P(mutex);
577 /*
578 * If this is the last one on the list leave it for comparison.
579 */
580 if (!next_job_stat) {
581 job_stat->collected = true;
582 } else {
583 job_stats->statistics->remove(job_stat);
584 if (job_stats->cached == job_stat) { job_stats->cached = NULL; }
585 }
586 V(mutex);
587 job_stat = next_job_stat;
588 }
589 }
590
591 /*
592 * If the Job doesn't exist anymore remove it from the job_statistics.
593 */
594 next_job_stats = (struct job_statistics*)job_statistics->next(job_stats);
595
596 found = false;
597 foreach_jcr (jcr) {
598 if (jcr->JobId == job_stats->JobId) {
599 found = true;
600 break;
601 }
602 }
603 endeach_jcr(jcr);
604
605 if (!found) {
606 P(mutex);
607 Dmsg1(200, "Removing jobid %d from job_statistics\n", job_stats->JobId);
608 job_statistics->remove(job_stats);
609 V(mutex);
610 }
611
612 job_stats = next_job_stats;
613 }
614 }
615 dir->fsend(OKstats);
616
617 return false;
618 }
619
620 } /* namespace storagedaemon */
621