1 /*
2    BAREOS® - Backup Archiving REcovery Open Sourced
3 
4    Copyright (C) 2013-2019 Bareos GmbH & Co. KG
5 
6    This program is Free Software; you can redistribute it and/or
7    modify it under the terms of version three of the GNU Affero General Public
8    License as published by the Free Software Foundation and included
9    in the file LICENSE.
10 
11    This program is distributed in the hope that it will be useful, but
12    WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14    Affero General Public License for more details.
15 
16    You should have received a copy of the GNU Affero General Public License
17    along with this program; if not, write to the Free Software
18    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19    02110-1301, USA.
20 */
21 /**
22  * @file
23  * Storage Daemon statistics gatherer.
24  *
25  * Written by Marco van Wieringen and Philipp Storz, November 2013
26  */
27 
28 #include "include/bareos.h"
29 #include "stored/stored.h"
30 #include "stored/stored_globals.h"
31 #include "stored/jcr_private.h"
32 #include "lib/util.h"
33 #include "include/jcr.h"
34 #include "lib/parse_conf.h"
35 #include "lib/bsock.h"
36 
37 namespace storagedaemon {
38 
/* Final reply sent by StatsCmd() after all pending records were sent. */
static char OKstats[] = "2000 OK statistics\n";
/*
 * Message layout for one device statistics sample. StatsCmd() fills it with
 * the timestamp, the device name and the counters of a device_statistic.
 */
static char DevStats[] =
    "Devicestats [%lld]: Device=%s Read=%llu, Write=%llu, SpoolSize=%llu, "
    "NumWaiting=%ld, NumWriters=%ld, "
    "ReadTime=%lld, WriteTime=%lld, MediaId=%ld, VolBytes=%llu, VolFiles=%llu, "
    "VolBlocks=%llu\n";
/* Message layout for one tape alert record (timestamp, device name, flags). */
static char TapeAlerts[] = "Tapealerts [%lld]: Device=%s TapeAlert=%llu\n";
/*
 * Message layout for one job statistics sample (timestamp, job id, file and
 * byte counters and the name of the device the job was using).
 */
static char JobStats[] =
    "Jobstats [%lld]: JobId=%ld, JobFiles=%lu, JobBytes=%llu, DevName=%s\n";
48 
/* Static globals */
/* Set by StopStatisticsThread() to make the statistics thread leave its loop.
 */
static bool quit = false;
/* Set once StartStatisticsThread() successfully created the thread. */
static bool statistics_initialized = false;
/* Thread id of the statistics thread, valid when statistics_initialized. */
static pthread_t statistics_tid;
/*
 * Taken around modifications of the statistics lists and used as the mutex
 * belonging to wait_for_next_run.
 */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
/*
 * The statistics thread sleeps on this condition between two collection
 * runs; StopStatisticsThread() broadcasts it to end the sleep early.
 */
static pthread_cond_t wait_for_next_run = PTHREAD_COND_INITIALIZER;
55 
/*
 * One sample of the counters of a device, taken by UpdateDeviceStatistics()
 * and kept on device_statistics::statistics until StatsCmd() has sent it.
 */
struct device_statistic {
  dlink link;               /* list linkage used by the owning dlist */
  bool collected{false};    /* set by StatsCmd() once the sample was sent */
  utime_t timestamp{0};     /* time the sample was taken */
  btime_t DevReadTime{0};   /* copied from Device::DevReadTime */
  btime_t DevWriteTime{0};  /* copied from Device::DevWriteTime */
  uint64_t DevWriteBytes{0}; /* copied from Device::DevWriteBytes */
  uint64_t DevReadBytes{0}; /* copied from Device::DevReadBytes */
  uint64_t spool_size{0};   /* copied from Device::spool_size */
  int num_waiting{0};       /* copied from Device::num_waiting */
  int num_writers{0};       /* copied from Device::num_writers */
  DBId_t MediaId{0};        /* copied from Device::VolCatInfo.VolMediaId */
  uint64_t VolCatBytes{0};  /* copied from Device::VolCatInfo.VolCatBytes */
  uint64_t VolCatFiles{0};  /* copied from Device::VolCatInfo.VolCatFiles */
  uint64_t VolCatBlocks{0}; /* copied from Device::VolCatInfo.VolCatBlocks */
};
72 
/*
 * One tape alert record of a device, created by UpdateDeviceTapealert() and
 * kept on device_statistics::tapealerts until StatsCmd() has sent it.
 */
struct device_tapealert {
  dlink link;           /* list linkage used by the owning dlist */
  utime_t timestamp{0}; /* time passed in by the caller of
                           UpdateDeviceTapealert() */
  uint64_t flags{0};    /* tape alert flags as passed in by the caller */
};
78 
/*
 * Per device container, identified by the device name, that holds the
 * statistics samples and tape alerts gathered for that device.
 */
struct device_statistics {
  dlink link;                       /* linkage on the global device list */
  char DevName[MAX_NAME_LENGTH]{};  /* name the entry is looked up by */
  struct device_statistic* cached{nullptr}; /* last appended sample, compared
                                               against the current counters to
                                               skip unchanged samples */
  dlist* statistics{nullptr}; /* list of device_statistic, created on first
                                 use */
  dlist* tapealerts{nullptr}; /* list of device_tapealert, created on first
                                 use */
};
86 
/*
 * One sample of the counters of a job, taken by UpdateJobStatistics() and
 * kept on job_statistics::statistics until StatsCmd() has sent it.
 */
struct job_statistic {
  dlink link;             /* list linkage used by the owning dlist */
  bool collected{false};  /* set by StatsCmd() once the sample was sent */
  utime_t timestamp{0};   /* time the sample was taken */
  uint32_t JobFiles{0};   /* copied from JobControlRecord::JobFiles */
  uint64_t JobBytes{0};   /* copied from JobControlRecord::JobBytes */
  char* DevName{nullptr}; /* strdup()'ed device name, or "unknown" */
};
95 
/*
 * Per job container, identified by the JobId, that holds the statistics
 * samples gathered for that job.
 */
struct job_statistics {
  dlink link;        /* linkage on the global job list */
  uint32_t JobId{0}; /* id the entry is looked up by */
  struct job_statistic* cached{nullptr}; /* last appended sample, compared
                                            against the current counters to
                                            skip unchanged samples */
  dlist* statistics{nullptr}; /* list of job_statistic, created on first use */
};
102 
/*
 * Global lists of struct device_statistics and struct job_statistics.
 * Both are created by setup_statistics() and released again by
 * cleanup_cached_statistics(); they are NULL outside of that window.
 */
static dlist* device_statistics = NULL;
static dlist* job_statistics = NULL;
105 
setup_statistics()106 static inline void setup_statistics()
107 {
108   struct device_statistics* dev_stats = NULL;
109   struct job_statistics* job_stats = NULL;
110 
111   device_statistics = new dlist(dev_stats, &dev_stats->link);
112   job_statistics = new dlist(job_stats, &job_stats->link);
113 }
114 
UpdateDeviceTapealert(const char * devname,uint64_t flags,utime_t now)115 void UpdateDeviceTapealert(const char* devname, uint64_t flags, utime_t now)
116 {
117   bool found = false;
118   struct device_statistics* dev_stats = NULL;
119   struct device_tapealert* tape_alert = NULL;
120 
121   if (!me || !me->collect_dev_stats || !device_statistics) { return; }
122 
123   foreach_dlist (dev_stats, device_statistics) {
124     if (bstrcmp(dev_stats->DevName, devname)) {
125       found = true;
126       break;
127     }
128   }
129 
130   if (!found) {
131     dev_stats =
132         (struct device_statistics*)malloc(sizeof(struct device_statistics));
133     struct device_statistics empty_device_statistics;
134     *dev_stats = empty_device_statistics;
135 
136     bstrncpy(dev_stats->DevName, devname, sizeof(dev_stats->DevName));
137     P(mutex);
138     device_statistics->append(dev_stats);
139     V(mutex);
140   }
141 
142   /*
143    * Add a new tapealert message.
144    */
145   tape_alert =
146       (struct device_tapealert*)malloc(sizeof(struct device_tapealert));
147   struct device_tapealert empty_device_tapealert;
148   *tape_alert = empty_device_tapealert;
149 
150   tape_alert->timestamp = now;
151   tape_alert->flags = flags;
152 
153   if (!dev_stats->tapealerts) {
154     dev_stats->tapealerts = new dlist(tape_alert, &tape_alert->link);
155   }
156 
157   P(mutex);
158   dev_stats->tapealerts->append(tape_alert);
159   V(mutex);
160 
161   Dmsg3(200, "New stats [%lld]: Device %s TapeAlert %llu\n",
162         tape_alert->timestamp, dev_stats->DevName, tape_alert->flags);
163 }
164 
UpdateDeviceStatistics(const char * devname,Device * dev,utime_t now)165 static inline void UpdateDeviceStatistics(const char* devname,
166                                           Device* dev,
167                                           utime_t now)
168 {
169   bool found = false;
170   struct device_statistics* dev_stats = NULL;
171   struct device_statistic* dev_stat = NULL;
172 
173   if (!me || !me->collect_dev_stats || !device_statistics) { return; }
174 
175   /*
176    * See if we already have statistics for this device.
177    */
178   foreach_dlist (dev_stats, device_statistics) {
179     if (bstrcmp(dev_stats->DevName, devname)) {
180       found = true;
181       break;
182     }
183   }
184 
185   /*
186    * If we have statistics and the cached entry is filled it points
187    * to the latest sampled statistics so we compare them with the current
188    * statistics and if nothing changed we just return.
189    */
190   if (found && dev_stats->cached) {
191     dev_stat = dev_stats->cached;
192 
193     if (dev_stat->DevReadBytes == dev->DevReadBytes &&
194         dev_stat->DevWriteBytes == dev->DevWriteBytes &&
195         dev_stat->spool_size == dev->spool_size) {
196       return;
197     }
198   } else if (!found) {
199     dev_stats =
200         (struct device_statistics*)malloc(sizeof(struct device_statistics));
201     struct device_statistics empty_device_statistics;
202     *dev_stats = empty_device_statistics;
203 
204     bstrncpy(dev_stats->DevName, devname, sizeof(dev_stats->DevName));
205     P(mutex);
206     device_statistics->append(dev_stats);
207     V(mutex);
208   }
209 
210   /*
211    * Add a new set of statistics.
212    */
213   dev_stat = (struct device_statistic*)malloc(sizeof(struct device_statistic));
214 
215   struct device_statistic empty_device_statistic;
216   *dev_stat = empty_device_statistic;
217 
218   dev_stat->timestamp = now;
219   dev_stat->DevReadTime = dev->DevReadTime;
220   dev_stat->DevWriteTime = dev->DevWriteTime;
221   dev_stat->DevWriteBytes = dev->DevWriteBytes;
222   dev_stat->DevReadBytes = dev->DevReadBytes;
223   dev_stat->spool_size = dev->spool_size;
224   dev_stat->num_waiting = dev->num_waiting;
225   dev_stat->num_writers = dev->num_writers;
226   dev_stat->MediaId = dev->VolCatInfo.VolMediaId;
227   dev_stat->VolCatBytes = dev->VolCatInfo.VolCatBytes;
228   dev_stat->VolCatFiles = dev->VolCatInfo.VolCatFiles;
229   dev_stat->VolCatBlocks = dev->VolCatInfo.VolCatBlocks;
230 
231 
232   if (!dev_stats->statistics) {
233     dev_stats->statistics = new dlist(dev_stat, &dev_stat->link);
234   }
235 
236   P(mutex);
237   dev_stats->cached = dev_stat;
238   dev_stats->statistics->append(dev_stat);
239   V(mutex);
240 
241   Dmsg5(200,
242         "New stats [%lld]: Device %s Read %llu, Write %llu, Spoolsize %llu,\n",
243         dev_stat->timestamp, dev_stats->DevName, dev_stat->DevReadBytes,
244         dev_stat->DevWriteBytes, dev_stat->spool_size);
245   Dmsg4(200, "NumWaiting %ld, NumWriters %ld, ReadTime=%lld, WriteTime=%lld,\n",
246         dev_stat->num_waiting, dev_stat->num_writers, dev_stat->DevReadTime,
247         dev_stat->DevWriteTime);
248   Dmsg4(200, "MediaId=%ld VolBytes=%llu, VolFiles=%llu, VolBlocks=%llu\n",
249         dev_stat->MediaId, dev_stat->VolCatBytes, dev_stat->VolCatFiles,
250         dev_stat->VolCatBlocks);
251 }
252 
UpdateJobStatistics(JobControlRecord * jcr,utime_t now)253 void UpdateJobStatistics(JobControlRecord* jcr, utime_t now)
254 {
255   bool found = false;
256   struct job_statistics* job_stats = NULL;
257   struct job_statistic* job_stat = NULL;
258 
259   if (!me || !me->collect_job_stats || !job_statistics) { return; }
260 
261   /*
262    * Skip job 0 info
263    */
264   if (!jcr->JobId) { return; }
265 
266   /*
267    * See if we already have statistics for this job.
268    */
269   foreach_dlist (job_stats, job_statistics) {
270     if (job_stats->JobId == jcr->JobId) {
271       found = true;
272       break;
273     }
274   }
275 
276   /*
277    * If we have statistics and the cached entry is filled it points
278    * to the latest sampled statistics so we compare them with the current
279    * statistics and if nothing changed we just return.
280    */
281   if (found && job_stats->cached) {
282     job_stat = job_stats->cached;
283 
284     if (job_stat->JobFiles == jcr->JobFiles &&
285         job_stat->JobBytes == jcr->JobBytes) {
286       return;
287     }
288   } else if (!found) {
289     job_stats = (struct job_statistics*)malloc(sizeof(struct job_statistics));
290     struct job_statistics empty_job_statistics;
291     *job_stats = empty_job_statistics;
292 
293     job_stats->JobId = jcr->JobId;
294     P(mutex);
295     job_statistics->append(job_stats);
296     V(mutex);
297   }
298 
299   /*
300    * Add a new set of statistics.
301    */
302   job_stat = (struct job_statistic*)malloc(sizeof(struct job_statistic));
303   struct job_statistic empty_job_statistic;
304   *job_stat = empty_job_statistic;
305 
306   job_stat->timestamp = now;
307   job_stat->JobFiles = jcr->JobFiles;
308   job_stat->JobBytes = jcr->JobBytes;
309   if (jcr->impl->dcr && jcr->impl->dcr->device) {
310     job_stat->DevName = strdup(jcr->impl->dcr->device->resource_name_);
311   } else {
312     job_stat->DevName = strdup("unknown");
313   }
314 
315   if (!job_stats->statistics) {
316     job_stats->statistics = new dlist(job_stat, &job_stat->link);
317   }
318 
319   P(mutex);
320   job_stats->cached = job_stat;
321   job_stats->statistics->append(job_stat);
322   V(mutex);
323 
324   Dmsg5(
325       200,
326       "New stats [%lld]: JobId %ld, JobFiles %lu, JobBytes %llu, DevName %s\n",
327       job_stat->timestamp, job_stats->JobId, job_stat->JobFiles,
328       job_stat->JobBytes, job_stat->DevName);
329 }
330 
cleanup_cached_statistics()331 static inline void cleanup_cached_statistics()
332 {
333   struct device_statistics* dev_stats;
334   struct job_statistics* job_stats;
335 
336   P(mutex);
337   if (device_statistics) {
338     foreach_dlist (dev_stats, device_statistics) {
339       dev_stats->statistics->destroy();
340       dev_stats->statistics = NULL;
341     }
342 
343     device_statistics->destroy();
344     device_statistics = NULL;
345   }
346 
347   if (job_statistics) {
348     foreach_dlist (job_stats, job_statistics) {
349       job_stats->statistics->destroy();
350       job_stats->statistics = NULL;
351     }
352 
353     job_statistics->destroy();
354     job_statistics = NULL;
355   }
356   V(mutex);
357 }
358 
359 /**
360  * Entry point for a separate statistics thread.
361  */
statistics_thread_runner(void * arg)362 extern "C" void* statistics_thread_runner(void* arg)
363 {
364   utime_t now;
365   struct timeval tv;
366   struct timezone tz;
367   struct timespec timeout;
368   DeviceResource* device;
369   JobControlRecord* jcr;
370 
371   setup_statistics();
372 
373   /*
374    * Do our work as long as we are not signaled to quit.
375    */
376   while (!quit) {
377     now = (utime_t)time(NULL);
378 
379     if (me->collect_dev_stats) {
380       /*
381        * Loop over all defined devices.
382        */
383       foreach_res (device, R_DEVICE) {
384         if (device->collectstats) {
385           Device* dev;
386 
387           dev = device->dev;
388           if (dev && dev->initiated) {
389             UpdateDeviceStatistics(device->resource_name_, dev, now);
390           }
391         }
392       }
393     }
394 
395     if (me->collect_job_stats) {
396       /*
397        * Loop over all running Jobs in the Storage Daemon.
398        */
399       foreach_jcr (jcr) {
400         UpdateJobStatistics(jcr, now);
401       }
402       endeach_jcr(jcr);
403     }
404 
405     /*
406      * Wait for a next run. Normally this waits exactly
407      * me->stats_collect_interval seconds. It can be interrupted when signaled
408      * by the StopStatisticsThread() function.
409      */
410     gettimeofday(&tv, &tz);
411     timeout.tv_nsec = tv.tv_usec * 1000;
412     timeout.tv_sec = tv.tv_sec + me->stats_collect_interval;
413 
414     P(mutex);
415     pthread_cond_timedwait(&wait_for_next_run, &mutex, &timeout);
416     V(mutex);
417   }
418 
419   /*
420    * Cleanup the cached statistics.
421    */
422   cleanup_cached_statistics();
423 
424   return NULL;
425 }
426 
StartStatisticsThread(void)427 int StartStatisticsThread(void)
428 {
429   int status;
430 
431   /*
432    * First see if device and job stats collection is enabled.
433    */
434   if (!me->stats_collect_interval ||
435       (!me->collect_dev_stats && !me->collect_job_stats)) {
436     return 0;
437   }
438 
439   /*
440    * See if only device stats collection is enabled that there is a least
441    * one device of which stats are collected.
442    */
443   if (me->collect_dev_stats && !me->collect_job_stats) {
444     DeviceResource* device;
445     int cnt = 0;
446 
447     foreach_res (device, R_DEVICE) {
448       if (device->collectstats) { cnt++; }
449     }
450 
451     if (cnt == 0) { return 0; }
452   }
453 
454   if ((status = pthread_create(&statistics_tid, NULL, statistics_thread_runner,
455                                NULL)) != 0) {
456     return status;
457   }
458 
459   statistics_initialized = true;
460 
461   return 0;
462 }
463 
StopStatisticsThread()464 void StopStatisticsThread()
465 {
466   if (!statistics_initialized) { return; }
467 
468   quit = true;
469   pthread_cond_broadcast(&wait_for_next_run);
470   if (!pthread_equal(statistics_tid, pthread_self())) {
471     pthread_join(statistics_tid, NULL);
472   }
473 }
474 
StatsCmd(JobControlRecord * jcr)475 bool StatsCmd(JobControlRecord* jcr)
476 {
477   BareosSocket* dir = jcr->dir_bsock;
478   PoolMem msg(PM_MESSAGE);
479   PoolMem dev_tmp(PM_MESSAGE);
480 
481   if (device_statistics) {
482     struct device_statistics* dev_stats;
483 
484     foreach_dlist (dev_stats, device_statistics) {
485       if (dev_stats->statistics) {
486         struct device_statistic *dev_stat, *next_dev_stat;
487 
488         dev_stat = (struct device_statistic*)dev_stats->statistics->first();
489         while (dev_stat) {
490           next_dev_stat =
491               (struct device_statistic*)dev_stats->statistics->next(dev_stat);
492 
493           /*
494            * If the entry was already collected no need to do it again.
495            */
496           if (!dev_stat->collected) {
497             PmStrcpy(dev_tmp, dev_stats->DevName);
498             BashSpaces(dev_tmp);
499             Mmsg(msg, DevStats, dev_stat->timestamp, dev_tmp.c_str(),
500                  dev_stat->DevReadBytes, dev_stat->DevWriteBytes,
501                  dev_stat->spool_size, dev_stat->num_waiting,
502                  dev_stat->num_writers, dev_stat->DevReadTime,
503                  dev_stat->DevWriteTime, dev_stat->MediaId,
504                  dev_stat->VolCatBytes, dev_stat->VolCatFiles,
505                  dev_stat->VolCatBlocks);
506             Dmsg1(100, ">dird: %s", msg.c_str());
507             dir->fsend(msg.c_str());
508           }
509 
510           P(mutex);
511           /*
512            * If this is the last one on the list leave it for comparison.
513            */
514           if (!next_dev_stat) {
515             dev_stat->collected = true;
516           } else {
517             dev_stats->statistics->remove(dev_stat);
518 
519             if (dev_stats->cached == dev_stat) { dev_stats->cached = NULL; }
520           }
521           V(mutex);
522           dev_stat = next_dev_stat;
523         }
524       }
525 
526       if (dev_stats->tapealerts) {
527         struct device_tapealert *tape_alert, *next_tape_alert;
528 
529         tape_alert = (struct device_tapealert*)dev_stats->tapealerts->first();
530         while (tape_alert) {
531           PmStrcpy(dev_tmp, dev_stats->DevName);
532           BashSpaces(dev_tmp);
533           Mmsg(msg, TapeAlerts, tape_alert->timestamp, dev_tmp.c_str(),
534                tape_alert->flags);
535           Dmsg1(100, ">dird: %s", msg.c_str());
536           dir->fsend(msg.c_str());
537 
538           next_tape_alert =
539               (struct device_tapealert*)dev_stats->tapealerts->next(tape_alert);
540           P(mutex);
541           dev_stats->tapealerts->remove(tape_alert);
542           V(mutex);
543           tape_alert = next_tape_alert;
544         }
545       }
546     }
547   }
548 
549   if (job_statistics) {
550     bool found;
551     JobControlRecord* jcr;
552     struct job_statistics *job_stats, *next_job_stats;
553 
554     job_stats = (struct job_statistics*)job_statistics->first();
555     while (job_stats) {
556       if (job_stats->statistics) {
557         struct job_statistic *job_stat, *next_job_stat;
558 
559         job_stat = (struct job_statistic*)job_stats->statistics->first();
560         while (job_stat) {
561           next_job_stat =
562               (struct job_statistic*)job_stats->statistics->next(job_stat);
563 
564           /*
565            * If the entry was already collected no need to do it again.
566            */
567           if (!job_stat->collected) {
568             PmStrcpy(dev_tmp, job_stat->DevName);
569             BashSpaces(dev_tmp);
570             Mmsg(msg, JobStats, job_stat->timestamp, job_stats->JobId,
571                  job_stat->JobFiles, job_stat->JobBytes, dev_tmp.c_str());
572             Dmsg1(100, ">dird: %s", msg.c_str());
573             dir->fsend(msg.c_str());
574           }
575 
576           P(mutex);
577           /*
578            * If this is the last one on the list leave it for comparison.
579            */
580           if (!next_job_stat) {
581             job_stat->collected = true;
582           } else {
583             job_stats->statistics->remove(job_stat);
584             if (job_stats->cached == job_stat) { job_stats->cached = NULL; }
585           }
586           V(mutex);
587           job_stat = next_job_stat;
588         }
589       }
590 
591       /*
592        * If the Job doesn't exist anymore remove it from the job_statistics.
593        */
594       next_job_stats = (struct job_statistics*)job_statistics->next(job_stats);
595 
596       found = false;
597       foreach_jcr (jcr) {
598         if (jcr->JobId == job_stats->JobId) {
599           found = true;
600           break;
601         }
602       }
603       endeach_jcr(jcr);
604 
605       if (!found) {
606         P(mutex);
607         Dmsg1(200, "Removing jobid %d from job_statistics\n", job_stats->JobId);
608         job_statistics->remove(job_stats);
609         V(mutex);
610       }
611 
612       job_stats = next_job_stats;
613     }
614   }
615   dir->fsend(OKstats);
616 
617   return false;
618 }
619 
620 } /* namespace storagedaemon */
621