1package App::Netdisco::Backend::Role::Manager;
2
3use Dancer qw/:moose :syntax :script/;
4
5use List::Util 'sum';
6use App::Netdisco::Util::MCE;
7
8use App::Netdisco::JobQueue
9  qw/jq_locked jq_getsome jq_lock jq_warm_thrusters/;
10
11use Role::Tiny;
12use namespace::clean;
13
# start-up hook for the Manager worker. unless the 'no_manager' workers
# setting is enabled, it initialises the job queue and then puts any jobs
# returned by jq_locked back onto this node's local queue.
sub worker_begin {
  my ($self) = @_;
  my $wid = $self->wid;

  if (setting('workers')->{'no_manager'}) {
      return debug "mgr ($wid): no need for manager... skip begin";
  }

  debug "entering Manager ($wid) worker_begin()";

  # prime the job queue before any work is gathered
  debug "mgr ($wid): building acl hints (please be patient...)";
  jq_warm_thrusters();

  # pick up the jobs which are already booked to this node
  debug "mgr ($wid): searching for jobs booked to this processing node";
  my @booked = jq_locked();
  my $num_booked = scalar @booked;

  if ($num_booked) {
      info sprintf "mgr (%s): found %s jobs booked to this processing node",
        $wid, $num_booked;

      # all requeued jobs go onto the local queue with the same priority (100)
      $self->{queue}->enqueuep(100, @booked);
  }
}
37
# builds a 'signature' string for a job so that duplicates can be spotted.
# duplicates turn up now and then because the job queue and manager(s) are
# distributed - and skipping them here is kinder to the DB than letting
# jq_lock() deal with them.
#
# the signature is the job's action, port and subaction, plus device_key when
# that is set (otherwise device), joined with chr(28). undefined fields are
# treated as empty strings.
my $memoize = sub {
  my ($job) = @_;
  no warnings 'uninitialized';

  my $device_field = $job->{device_key} ? 'device_key' : 'device';
  my @fields = (qw/action port subaction/, $device_field);

  return join chr(28), map { $job->{$_} } @fields;
};
47
# main loop of the Manager worker. unless the 'no_manager' workers setting is
# enabled (in which case it returns straight away), it loops forever:
#
#   1. works out how many job slots are free - the configured number of task
#      workers minus the number of jobs pending in the local queue
#   2. asks jq_getsome for that many candidate jobs, skipping duplicates
#   3. books each remaining job with jq_lock and copies it to the local queue
#      at the job's own priority
#   4. sleeps for the 'sleep_time' workers setting (1 second if unset or 0)
#
# when no slot is free the fetch/lock pass is skipped for that cycle, so a
# zero or negative slot count is never handed to jq_getsome.
sub worker_body {
  my $self = shift;
  my $wid = $self->wid;

  if (setting('workers')->{'no_manager'}) {
      prctl sprintf 'nd2: #%s mgr: inactive', $wid;
      return debug "mgr ($wid): no need for manager... quitting";
  }

  while (1) {
      prctl sprintf 'nd2: #%s mgr: gathering', $wid;
      my %seen_job = ();

      # pending() can exceed the worker count (e.g. after a backlog has been
      # requeued), which makes this zero or negative
      my $num_slots = ( parse_max_workers( setting('workers')->{tasks} ) || 0 )
                        - $self->{queue}->pending();

      if ($num_slots > 0) {
          debug "mgr ($wid): getting potential jobs for $num_slots workers";

          foreach my $job ( jq_getsome($num_slots) ) {
              # same signature already seen in this batch - skip the duplicate
              next if $seen_job{ $memoize->($job) }++;

              # mark job as running
              next unless jq_lock($job);
              info sprintf "mgr (%s): job %s booked out for this processing node",
                $wid, $job->id;

              # copy job to local queue
              $self->{queue}->enqueuep($job->job_priority, $job);
          }
      }
      else {
          debug "mgr ($wid): no free workers, not getting more jobs";
      }

      #if (scalar grep {$_ > 1} values %seen_job) {
      #  debug 'WARNING: saw duplicate jobs after getsome()';
      #  use DDP; debug p %seen_job;
      #}

      debug "mgr ($wid): sleeping now...";
      prctl sprintf 'nd2: #%s mgr: idle', $wid;
      sleep( setting('workers')->{sleep_time} || 1 );
  }
}
88
891;
90