package App::Netdisco::Backend::Role::Manager;

use Dancer qw/:moose :syntax :script/;

use List::Util 'sum';
use App::Netdisco::Util::MCE;

use App::Netdisco::JobQueue
  qw/jq_locked jq_getsome jq_lock jq_warm_thrusters/;

use Role::Tiny;
use namespace::clean;

# Start-up hook for the manager worker.
#
# Unless the 'no_manager' worker setting is true, this initialises the job
# queue via jq_warm_thrusters() and then pushes every job returned by
# jq_locked() onto the local queue with priority 100.
sub worker_begin {
  my ($self) = @_;
  my $wid = $self->wid;

  if (setting('workers')->{'no_manager'}) {
    return debug "mgr ($wid): no need for manager... skip begin";
  }

  debug "entering Manager ($wid) worker_begin()";

  # job queue initialisation
  debug "mgr ($wid): building acl hints (please be patient...)";
  jq_warm_thrusters();

  # requeue jobs locally
  debug "mgr ($wid): searching for jobs booked to this processing node";
  my @booked = jq_locked();

  if (@booked) {
    my $how_many = scalar @booked;
    info sprintf("mgr (%s): found %s jobs booked to this processing node",
      $wid, $how_many);
    $self->{queue}->enqueuep(100, @booked);
  }
}

# Builds a 'signature' string for a job so that duplicates can be spotted.
# Duplicates turn up from time to time due to the distributed nature of the
# job queue and manager(s); skipping them here is also kinder to the DB than
# letting them reach jq_lock().
#
# The signature is the job's action, port and subaction fields followed by
# device_key (when that field is true) or device otherwise, joined with
# chr(28). Undefined fields are tolerated silently.
my $memoize = sub {
  my ($job) = @_;
  no warnings 'uninitialized';

  my $device_field = $job->{device_key} ? 'device_key' : 'device';
  my @fields = (qw/action port subaction/, $device_field);

  return join chr(28), @{$job}{@fields};
};

# Main loop of the manager worker.
#
# When the 'no_manager' worker setting is true the process title is set to
# "inactive" and the sub returns straight away. Otherwise it loops forever:
# work out how many slots are free, ask jq_getsome() for that many candidate
# jobs, lock each distinct one with jq_lock(), copy it to the local queue at
# its own job_priority, then sleep for the 'sleep_time' worker setting
# (falling back to 1 when that is unset or false).
sub worker_body {
  my ($self) = @_;
  my $wid = $self->wid;

  my $manager_disabled = setting('workers')->{'no_manager'};
  if ($manager_disabled) {
    prctl(sprintf('nd2: #%s mgr: inactive', $wid));
    return debug "mgr ($wid): no need for manager... quitting";
  }

  while (1) {
    prctl(sprintf('nd2: #%s mgr: gathering', $wid));

    # signatures already handled during this pass
    my %seen_job;

    # free slots = configured task workers minus jobs still queued locally
    my $num_slots = parse_max_workers(setting('workers')->{tasks})
      - $self->{queue}->pending();
    debug "mgr ($wid): getting potential jobs for $num_slots workers";

    for my $candidate (jq_getsome($num_slots)) {
      my $signature = $memoize->($candidate);
      next if $seen_job{$signature}++;

      # mark job as running
      next unless jq_lock($candidate);
      info sprintf("mgr (%s): job %s booked out for this processing node",
        $wid, $candidate->id);

      # copy job to local queue
      $self->{queue}->enqueuep($candidate->job_priority, $candidate);
    }

    #if (scalar grep {$_ > 1} values %seen_job) {
    #  debug 'WARNING: saw duplicate jobs after getsome()';
    #  use DDP; debug p %seen_job;
    #}

    debug "mgr ($wid): sleeping now...";
    prctl(sprintf('nd2: #%s mgr: idle', $wid));

    my $nap_seconds = setting('workers')->{sleep_time} || 1;
    sleep($nap_seconds);
  }
}

1;