1package App::Netdisco::Backend::Role::Scheduler;
2
3use Dancer qw/:moose :syntax :script/;
4
5use NetAddr::IP;
6use Algorithm::Cron;
7use App::Netdisco::Util::MCE;
8use App::Netdisco::JobQueue qw/jq_insert/;
9
10use Role::Tiny;
11use namespace::clean;
12
13sub worker_begin {
14  my $self = shift;
15  my $wid = $self->wid;
16
17  return debug "sch ($wid): no need for scheduler... skip begin"
18    unless setting('schedule');
19
20  debug "entering Scheduler ($wid) worker_begin()";
21
22  foreach my $action (keys %{ setting('schedule') }) {
23      my $config = setting('schedule')->{$action}
24        or next;
25
26      # accept either single crontab format, or individual time fields
27      $config->{when} = Algorithm::Cron->new(
28        base => 'local',
29        %{
30          (ref {} eq ref $config->{when})
31            ? $config->{when}
32            : {crontab => $config->{when}}
33        }
34      );
35  }
36}
37
38sub worker_body {
39  my $self = shift;
40  my $wid = $self->wid;
41
42  unless (setting('schedule')) {
43      prctl sprintf 'nd2: #%s sched: inactive', $wid;
44      return debug "sch ($wid): no need for scheduler... quitting"
45  }
46
47  while (1) {
48      # sleep until some point in the next minute
49      my $naptime = 60 - (time % 60) + int(rand(45));
50
51      prctl sprintf 'nd2: #%s sched: idle', $wid;
52      debug "sched ($wid): sleeping for $naptime seconds";
53
54      sleep $naptime;
55      prctl sprintf 'nd2: #%s sched: queueing', $wid;
56
57      # NB next_time() returns the next *after* win_start
58      my $win_start = time - (time % 60) - 1;
59      my $win_end   = $win_start + 60;
60
61      # if any job is due, add it to the queue
62      foreach my $action (keys %{ setting('schedule') }) {
63          my $sched = setting('schedule')->{$action} or next;
64          my $real_action = ($sched->{action} || $action);
65
66          # next occurence of job must be in this minute's window
67          debug sprintf "sched ($wid): $real_action: win_start: %s, win_end: %s, next: %s",
68            $win_start, $win_end, $sched->{when}->next_time($win_start);
69          next unless $sched->{when}->next_time($win_start) <= $win_end;
70
71          my $net = NetAddr::IP->new($sched->{device});
72          next if ($sched->{device}
73            and (!$net or $net->num == 0 or $net->addr eq '0.0.0.0'));
74
75          my @hostlist = map { (ref $_) ? $_->addr : undef }
76            (defined $sched->{device} ? ($net->hostenum) : (undef));
77          my @job_specs = ();
78
79          foreach my $host (@hostlist) {
80            push @job_specs, {
81              action => $real_action,
82              device => $host,
83              port   => $sched->{port},
84              subaction => $sched->{extra},
85            };
86          }
87
88          info sprintf 'sched (%s): queueing %s %s jobs',
89            $wid, (scalar @job_specs), $real_action;
90          jq_insert( \@job_specs );
91      }
92  }
93}
94
951;
96