1package App::Netdisco::JobQueue::PostgreSQL; 2 3use Dancer qw/:moose :syntax :script/; 4use Dancer::Plugin::DBIC 'schema'; 5 6use App::Netdisco::Util::Device 7 qw/is_discoverable is_macsuckable is_arpnipable/; 8use App::Netdisco::Backend::Job; 9 10use Module::Load (); 11use Try::Tiny; 12 13use base 'Exporter'; 14our @EXPORT = (); 15our @EXPORT_OK = qw/ 16 jq_warm_thrusters 17 jq_getsome 18 jq_locked 19 jq_queued 20 jq_lock 21 jq_defer 22 jq_complete 23 jq_log 24 jq_userlog 25 jq_insert 26 jq_delete 27/; 28our %EXPORT_TAGS = ( all => \@EXPORT_OK ); 29 30# given a device, tests if any of the primary acls applies 31# returns a list of job actions to be denied/skipped on this host. 32sub _get_denied_actions { 33 my $device = shift; 34 my @badactions = (); 35 return @badactions unless $device; 36 37 push @badactions, ('discover', @{ setting('job_prio')->{high} }) 38 if not is_discoverable($device); 39 40 push @badactions, (qw/macsuck nbtstat/) 41 if not is_macsuckable($device); 42 43 push @badactions, 'arpnip' 44 if not is_arpnipable($device); 45 46 return @badactions; 47} 48 49sub jq_warm_thrusters { 50 my @devices = schema('netdisco')->resultset('Device')->all; 51 my $rs = schema('netdisco')->resultset('DeviceSkip'); 52 my %actionset = (); 53 54 foreach my $d (@devices) { 55 my @badactions = _get_denied_actions($d); 56 $actionset{$d->ip} = \@badactions if scalar @badactions; 57 } 58 59 schema('netdisco')->txn_do(sub { 60 $rs->search({ 61 backend => setting('workers')->{'BACKEND'}, 62 }, { for => 'update' }, )->update({ actionset => [] }); 63 64 my $deferrals = setting('workers')->{'max_deferrals'} - 1; 65 $rs->search({ 66 backend => setting('workers')->{'BACKEND'}, 67 deferrals => { '>' => $deferrals }, 68 }, { for => 'update' }, )->update({ deferrals => $deferrals }); 69 70 $rs->search({ 71 backend => setting('workers')->{'BACKEND'}, 72 actionset => { -value => [] }, 73 deferrals => 0, 74 })->delete; 75 76 $rs->update_or_create({ 77 backend => setting('workers')->{'BACKEND'}, 78 device => $_, 79 actionset => $actionset{$_}, 80 }, { key => 'primary' }) for keys %actionset; 81 }); 82 83 # fix up the pseudo devices which need layer 3 84 # TODO remove this after next release 85 schema('netdisco')->txn_do(sub { 86 my @hosts = grep { defined } 87 map { schema('netdisco')->resultset('Device')->search_for_device($_->{only}) } 88 grep { exists $_->{only} and ref '' eq ref $_->{only} } 89 grep { exists $_->{driver} and $_->{driver} eq 'cli' } 90 @{ setting('device_auth') }; 91 92 $_->update({ layers => \[q{overlay(layers placing '1' from 6 for 1)}] }) 93 for @hosts; 94 }); 95} 96 97sub jq_getsome { 98 my $num_slots = shift; 99 return () unless $num_slots and $num_slots > 0; 100 101 my $jobs = schema('netdisco')->resultset('Admin'); 102 my @returned = (); 103 104 my $tasty = schema('netdisco')->resultset('Virtual::TastyJobs') 105 ->search(undef,{ bind => [ 106 setting('workers')->{'BACKEND'}, setting('job_prio')->{'high'}, 107 setting('workers')->{'BACKEND'}, setting('workers')->{'max_deferrals'}, 108 setting('workers')->{'retry_after'}, $num_slots, 109 ]}); 110 111 while (my $job = $tasty->next) { 112 if ($job->device) { 113 # need to handle device discovered since backend daemon started 114 # and the skiplist was primed. these should be checked against 115 # the various acls and have device_skip entry added if needed, 116 # and return false if it should have been skipped. 117 my @badactions = _get_denied_actions($job->device); 118 if (scalar @badactions) { 119 schema('netdisco')->resultset('DeviceSkip')->find_or_create({ 120 backend => setting('workers')->{'BACKEND'}, device => $job->device, 121 },{ key => 'device_skip_pkey' })->add_to_actionset(@badactions); 122 123 # will now not be selected in a future _getsome() 124 next if scalar grep {$_ eq $job->action} @badactions; 125 } 126 } 127 128 # remove any duplicate jobs, incuding possibly this job if there 129 # is already an equivalent job running 130 131 # note that the self-removal of a job has an unhelpful log: it is 132 # reported as a duplicate of itself! however what's happening is that 133 # netdisco has seen another running job with same params (but the query 134 # cannot see that ID to use it in the message). 135 136 my %job_properties = ( 137 action => $job->action, 138 port => $job->port, 139 subaction => $job->subaction, 140 -or => [ 141 { device => $job->device }, 142 ($job->device_key ? ({ device_key => $job->device_key }) : ()), 143 ], 144 ); 145 146 my $gone = $jobs->search({ 147 status => 'queued', 148 -and => [ 149 %job_properties, 150 -or => [{ 151 job => { '!=' => $job->id }, 152 },{ 153 job => $job->id, 154 -exists => $jobs->search({ 155 status => { -like => 'queued-%' }, 156 started => \[q/> (now() - ?::interval)/, setting('jobs_stale_after')], 157 %job_properties, 158 })->as_query, 159 }], 160 ], 161 }, {for => 'update'}) 162 ->update({ status => 'error', log => (sprintf 'duplicate of %s', $job->id) }); 163 164 debug sprintf 'getsome: cancelled %s duplicate(s) of job %s', ($gone || 0), $job->id; 165 push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns }); 166 } 167 168 return @returned; 169} 170 171sub jq_locked { 172 my @returned = (); 173 my $rs = schema('netdisco')->resultset('Admin')->search({ 174 status => ('queued-'. setting('workers')->{'BACKEND'}), 175 started => \[q/> (now() - ?::interval)/, setting('jobs_stale_after')], 176 }); 177 178 while (my $job = $rs->next) { 179 push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns }); 180 } 181 return @returned; 182} 183 184sub jq_queued { 185 my $job_type = shift; 186 187 return schema('netdisco')->resultset('Admin')->search({ 188 device => { '!=' => undef}, 189 action => $job_type, 190 status => { -like => 'queued%' }, 191 })->get_column('device')->all; 192} 193 194sub jq_lock { 195 my $job = shift; 196 my $happy = false; 197 198 # lock db row and update to show job has been picked 199 try { 200 my $updated = schema('netdisco')->resultset('Admin') 201 ->search({ job => $job->id, status => 'queued' }, { for => 'update' }) 202 ->update({ 203 status => ('queued-'. setting('workers')->{'BACKEND'}), 204 started => \"now()", 205 }); 206 207 $happy = true if $updated > 0; 208 } 209 catch { 210 error $_; 211 }; 212 213 return $happy; 214} 215 216sub jq_defer { 217 my $job = shift; 218 my $happy = false; 219 220 # note this taints all actions on the device. for example if both 221 # macsuck and arpnip are allowed, but macsuck fails 10 times, then 222 # arpnip (and every other action) will be prevented on the device. 223 224 # seeing as defer is only triggered by an SNMP connect failure, this 225 # behaviour seems reasonable, to me (or desirable, perhaps). 226 227 try { 228 schema('netdisco')->txn_do(sub { 229 if ($job->device) { 230 schema('netdisco')->resultset('DeviceSkip')->find_or_create({ 231 backend => setting('workers')->{'BACKEND'}, device => $job->device, 232 },{ key => 'device_skip_pkey' })->increment_deferrals; 233 } 234 235 # lock db row and update to show job is available 236 schema('netdisco')->resultset('Admin') 237 ->find($job->id, {for => 'update'}) 238 ->update({ status => 'queued', started => undef }); 239 }); 240 $happy = true; 241 } 242 catch { 243 error $_; 244 }; 245 246 return $happy; 247} 248 249sub jq_complete { 250 my $job = shift; 251 my $happy = false; 252 253 # lock db row and update to show job is done/error 254 255 # now that SNMP connect failures are deferrals and not errors, any complete 256 # status, whether success or failure, indicates an SNMP connect. reset the 257 # connection failures counter to forget about occasional connect glitches. 258 259 try { 260 schema('netdisco')->txn_do(sub { 261 if ($job->device) { 262 schema('netdisco')->resultset('DeviceSkip')->find_or_create({ 263 backend => setting('workers')->{'BACKEND'}, device => $job->device, 264 },{ key => 'device_skip_pkey' })->update({ deferrals => 0 }); 265 } 266 267 schema('netdisco')->resultset('Admin') 268 ->find($job->id, {for => 'update'})->update({ 269 status => $job->status, 270 log => $job->log, 271 started => $job->started, 272 finished => $job->finished, 273 (($job->action eq 'hook') ? (subaction => $job->subaction) : ()), 274 ($job->only_namespace ? (action => ($job->action .'::'. $job->only_namespace)) : ()), 275 }); 276 }); 277 $happy = true; 278 } 279 catch { 280 # use DDP; p $job; 281 error $_; 282 }; 283 284 return $happy; 285} 286 287sub jq_log { 288 return schema('netdisco')->resultset('Admin')->search({ 289 'me.action' => { '-not_like' => 'hook::%' }, 290 -or => [ 291 { 'me.log' => undef }, 292 { 'me.log' => { '-not_like' => 'duplicate of %' } }, 293 ], 294 }, { 295 prefetch => 'target', 296 order_by => { -desc => [qw/entered device action/] }, 297 rows => (setting('jobs_qdepth') || 50), 298 })->with_times->hri->all; 299} 300 301sub jq_userlog { 302 my $user = shift; 303 return schema('netdisco')->resultset('Admin')->search({ 304 username => $user, 305 log => { '-not_like' => 'duplicate of %' }, 306 finished => { '>' => \"(now() - interval '5 seconds')" }, 307 })->with_times->all; 308} 309 310sub jq_insert { 311 my $jobs = shift; 312 $jobs = [$jobs] if ref [] ne ref $jobs; 313 my $happy = false; 314 315 try { 316 schema('netdisco')->txn_do(sub { 317 schema('netdisco')->resultset('Admin')->populate([ 318 map {{ 319 device => $_->{device}, 320 device_key => $_->{device_key}, 321 port => $_->{port}, 322 action => $_->{action}, 323 subaction => ($_->{extra} || $_->{subaction}), 324 username => $_->{username}, 325 userip => $_->{userip}, 326 status => 'queued', 327 }} @$jobs 328 ]); 329 }); 330 $happy = true; 331 } 332 catch { 333 error $_; 334 }; 335 336 return $happy; 337} 338 339sub jq_delete { 340 my $id = shift; 341 342 if ($id) { 343 schema('netdisco')->txn_do(sub { 344 schema('netdisco')->resultset('Admin')->find($id)->delete(); 345 }); 346 } 347 else { 348 schema('netdisco')->txn_do(sub { 349 schema('netdisco')->resultset('Admin')->delete(); 350 }); 351 } 352} 353 354true; 355