1package App::Netdisco::Backend::Role::Poller;
2
3use Dancer qw/:moose :syntax :script/;
4
5use Try::Tiny;
6use App::Netdisco::Util::MCE;
7
8use Time::HiRes 'sleep';
9use App::Netdisco::JobQueue qw/jq_defer jq_complete/;
10
11use Role::Tiny;
12use namespace::clean;
13
14# add dispatch methods for poller tasks
15with 'App::Netdisco::Worker::Runner';
16
17sub worker_begin { (shift)->{started} = time }
18
19sub worker_body {
20  my $self = shift;
21  my $wid = $self->wid;
22
23  while (1) {
24      prctl sprintf 'nd2: #%s poll: idle', $wid;
25
26      my $job = $self->{queue}->dequeue(1);
27      next unless defined $job;
28
29      try {
30          $job->started(scalar localtime);
31          prctl sprintf 'nd2: #%s poll: #%s: %s',
32            $wid, $job->id, $job->display_name;
33          info sprintf "pol (%s): starting %s job(%s) at %s",
34            $wid, $job->action, $job->id, $job->started;
35          $self->run($job);
36      }
37      catch {
38          $job->status('error');
39          $job->log("error running job: $_");
40          $self->sendto('stderr', $job->log ."\n");
41      };
42
43      $self->close_job($job);
44      sleep( setting('workers')->{'min_runtime'} || 0 );
45      $self->exit(0); # recycle worker
46  }
47}
48
49sub close_job {
50  my ($self, $job) = @_;
51  my $now  = scalar localtime;
52
53  info sprintf "pol (%s): wrapping up %s job(%s) - status %s at %s",
54    $self->wid, $job->action, $job->id, $job->status, $now;
55
56  try {
57      if ($job->status eq 'defer') {
58          jq_defer($job);
59      }
60      else {
61          $job->finished($now);
62          jq_complete($job);
63      }
64  }
65  catch { $self->sendto('stderr', "error closing job: $_\n") };
66}
67
681;
69