1package App::Netdisco::Backend::Role::Poller; 2 3use Dancer qw/:moose :syntax :script/; 4 5use Try::Tiny; 6use App::Netdisco::Util::MCE; 7 8use Time::HiRes 'sleep'; 9use App::Netdisco::JobQueue qw/jq_defer jq_complete/; 10 11use Role::Tiny; 12use namespace::clean; 13 14# add dispatch methods for poller tasks 15with 'App::Netdisco::Worker::Runner'; 16 17sub worker_begin { (shift)->{started} = time } 18 19sub worker_body { 20 my $self = shift; 21 my $wid = $self->wid; 22 23 while (1) { 24 prctl sprintf 'nd2: #%s poll: idle', $wid; 25 26 my $job = $self->{queue}->dequeue(1); 27 next unless defined $job; 28 29 try { 30 $job->started(scalar localtime); 31 prctl sprintf 'nd2: #%s poll: #%s: %s', 32 $wid, $job->id, $job->display_name; 33 info sprintf "pol (%s): starting %s job(%s) at %s", 34 $wid, $job->action, $job->id, $job->started; 35 $self->run($job); 36 } 37 catch { 38 $job->status('error'); 39 $job->log("error running job: $_"); 40 $self->sendto('stderr', $job->log ."\n"); 41 }; 42 43 $self->close_job($job); 44 sleep( setting('workers')->{'min_runtime'} || 0 ); 45 $self->exit(0); # recycle worker 46 } 47} 48 49sub close_job { 50 my ($self, $job) = @_; 51 my $now = scalar localtime; 52 53 info sprintf "pol (%s): wrapping up %s job(%s) - status %s at %s", 54 $self->wid, $job->action, $job->id, $job->status, $now; 55 56 try { 57 if ($job->status eq 'defer') { 58 jq_defer($job); 59 } 60 else { 61 $job->finished($now); 62 jq_complete($job); 63 } 64 } 65 catch { $self->sendto('stderr', "error closing job: $_\n") }; 66} 67 681; 69