1package App::Netdisco::JobQueue::PostgreSQL;
2
3use Dancer qw/:moose :syntax :script/;
4use Dancer::Plugin::DBIC 'schema';
5
6use App::Netdisco::Util::Device
7  qw/is_discoverable is_macsuckable is_arpnipable/;
8use App::Netdisco::Backend::Job;
9
10use Module::Load ();
11use Try::Tiny;
12
13use base 'Exporter';
14our @EXPORT = ();
15our @EXPORT_OK = qw/
16  jq_warm_thrusters
17  jq_getsome
18  jq_locked
19  jq_queued
20  jq_lock
21  jq_defer
22  jq_complete
23  jq_log
24  jq_userlog
25  jq_insert
26  jq_delete
27/;
28our %EXPORT_TAGS = ( all => \@EXPORT_OK );
29
# Work out which job actions the ACL helpers forbid for a device.
#
# Takes a device and returns the list of action names that must be
# denied/skipped on that host. The list is empty when no device is given
# or when none of the checks object.
sub _get_denied_actions {
  my ($device) = @_;
  my @denied = ();
  return @denied unless $device;

  # not discoverable: deny discover plus every action configured under
  # the "high" key of the job_prio setting
  unless (is_discoverable($device)) {
    push @denied, 'discover', @{ setting('job_prio')->{high} };
  }

  # not macsuckable: deny both macsuck and nbtstat
  unless (is_macsuckable($device)) {
    push @denied, 'macsuck', 'nbtstat';
  }

  unless (is_arpnipable($device)) {
    push @denied, 'arpnip';
  }

  return @denied;
}
48
# Rebuild this backend's device_skip rows from the current ACL checks.
#
# Every known device is run through _get_denied_actions(); the results are
# then written to the DeviceSkip table inside one transaction. A second
# transaction patches the "layers" column of certain pseudo devices.
sub jq_warm_thrusters {
  my @devices  = schema('netdisco')->resultset('Device')->all;
  my $skiplist = schema('netdisco')->resultset('DeviceSkip');

  # device ip => arrayref of denied actions (devices with no denials omitted)
  my %denied_for = ();
  for my $device (@devices) {
    my @denied = _get_denied_actions($device);
    next unless scalar @denied;
    $denied_for{ $device->ip } = \@denied;
  }

  schema('netdisco')->txn_do(sub {
    # start from a clean slate: empty every actionset of this backend
    my $mine = $skiplist->search(
      { backend => setting('workers')->{'BACKEND'} },
      { for => 'update' },
    );
    $mine->update({ actionset => [] });

    # cap stored deferral counters at one below max_deferrals
    my $ceiling = setting('workers')->{'max_deferrals'} - 1;
    my $over_ceiling = $skiplist->search(
      {
        backend   => setting('workers')->{'BACKEND'},
        deferrals => { '>' => $ceiling },
      },
      { for => 'update' },
    );
    $over_ceiling->update({ deferrals => $ceiling });

    # rows with nothing denied and no deferrals carry no information
    my $empty_rows = $skiplist->search({
      backend   => setting('workers')->{'BACKEND'},
      actionset => { -value => [] },
      deferrals => 0,
    });
    $empty_rows->delete;

    # finally store the freshly computed denials
    for my $ip (keys %denied_for) {
      $skiplist->update_or_create(
        {
          backend   => setting('workers')->{'BACKEND'},
          device    => $ip,
          actionset => $denied_for{$ip},
        },
        { key => 'primary' },
      );
    }
  });

  # fix up the pseudo devices which need layer 3
  # TODO remove this after next release
  schema('netdisco')->txn_do(sub {
    # device_auth entries that use the cli driver ...
    my @cli_auth = grep { exists $_->{driver} and $_->{driver} eq 'cli' }
                        @{ setting('device_auth') };

    # ... and whose "only" value is a plain scalar rather than a reference
    my @single_host = grep { exists $_->{only} and ref($_->{only}) eq ref('') }
                           @cli_auth;

    # look each one up, dropping entries for which no device was found
    my @hosts = grep { defined }
                map  { schema('netdisco')->resultset('Device')->search_for_device($_->{only}) }
                     @single_host;

    # overwrite the sixth character of the layers string with '1'
    for my $host (@hosts) {
      $host->update({ layers => \[q{overlay(layers placing '1' from 6 for 1)}] });
    }
  });
}
96
# Select up to $num_slots candidate jobs for this backend to work on.
#
# Returns a list of App::Netdisco::Backend::Job objects, or an empty list
# when $num_slots is false or not positive.
#
# Side effects: device_skip entries are created/extended for devices whose
# ACL checks deny some actions, and queued duplicates of each returned job
# have their status set to 'error' with a "duplicate of ..." log message.
sub jq_getsome {
  my $num_slots = shift;
  return () unless $num_slots and $num_slots > 0;

  my $jobs = schema('netdisco')->resultset('Admin');
  my @returned = ();

  # bind values are positional; their order presumably mirrors the
  # placeholders of the Virtual::TastyJobs query -- confirm there before
  # reordering anything
  my $tasty = schema('netdisco')->resultset('Virtual::TastyJobs')
    ->search(undef,{ bind => [
      setting('workers')->{'BACKEND'}, setting('job_prio')->{'high'},
      setting('workers')->{'BACKEND'}, setting('workers')->{'max_deferrals'},
      setting('workers')->{'retry_after'}, $num_slots,
    ]});

  while (my $job = $tasty->next) {
    if ($job->device) {
      # need to handle device discovered since backend daemon started
      # and the skiplist was primed. these should be checked against
      # the various acls and have device_skip entry added if needed,
      # and return false if it should have been skipped.
      my @badactions = _get_denied_actions($job->device);
      if (scalar @badactions) {
        schema('netdisco')->resultset('DeviceSkip')->find_or_create({
          backend => setting('workers')->{'BACKEND'}, device => $job->device,
        },{ key => 'device_skip_pkey' })->add_to_actionset(@badactions);

        # will now not be selected in a future _getsome()
        next if scalar grep {$_ eq $job->action} @badactions;
      }
    }

    # remove any duplicate jobs, including possibly this job if there
    # is already an equivalent job running

    # note that the self-removal of a job has an unhelpful log: it is
    # reported as a duplicate of itself! however what's happening is that
    # netdisco has seen another running job with same params (but the query
    # cannot see that ID to use it in the message).

    # what makes two jobs "equivalent": same action, port and subaction,
    # and the same device (or the same device_key when this job has one)
    my %job_properties = (
      action => $job->action,
      port   => $job->port,
      subaction => $job->subaction,
      -or => [
        { device => $job->device },
        ($job->device_key ? ({ device_key => $job->device_key }) : ()),
      ],
    );

    # matches queued jobs that are equivalent to this one and are either
    # (a) a different job, or (b) this very job while an equivalent job is
    # already locked by some backend and not yet stale
    my $gone = $jobs->search({
      status => 'queued',
      -and => [
        %job_properties,
        -or => [{
          job => { '!=' => $job->id },
        },{
          job => $job->id,
          -exists => $jobs->search({
            status => { -like => 'queued-%' },
            started => \[q/> (now() - ?::interval)/, setting('jobs_stale_after')],
            %job_properties,
          })->as_query,
        }],
      ],
    }, {for => 'update'})
        ->update({ status => 'error', log => (sprintf 'duplicate of %s', $job->id) });

    # update() hands back the DBI row count, which is the true-valued string
    # "0E0" when no row matched; %d renders that as a plain 0 in the log
    debug sprintf 'getsome: cancelled %d duplicate(s) of job %s', ($gone || 0), $job->id;
    push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns });
  }

  return @returned;
}
170
# List the jobs currently locked by this backend.
#
# A job counts as locked when its status is "queued-<BACKEND>" and it was
# started more recently than the jobs_stale_after interval. Returns a list
# of App::Netdisco::Backend::Job objects.
sub jq_locked {
  my $locked = schema('netdisco')->resultset('Admin')->search({
    status  => ('queued-'. setting('workers')->{'BACKEND'}),
    started => \[q/> (now() - ?::interval)/, setting('jobs_stale_after')],
  });

  my @returned = ();
  for my $row ($locked->all) {
    push @returned, App::Netdisco::Backend::Job->new({ $row->get_columns });
  }

  return @returned;
}
183
# Return the device column of every job of the given action type whose
# status starts with "queued" (i.e. waiting or already locked) and which
# has a device set.
sub jq_queued {
  my ($job_type) = @_;

  my %wanted = (
    device => { '!=' => undef },
    action => $job_type,
    status => { -like => 'queued%' },
  );

  my $queued = schema('netdisco')->resultset('Admin')->search(\%wanted);
  return $queued->get_column('device')->all;
}
193
# Claim a queued job for this backend.
#
# Sets the job's status to "queued-<BACKEND>" and stamps its start time,
# but only while the row is still in plain 'queued' state. Returns true
# when a row was updated, false otherwise (including on database errors,
# which are logged).
sub jq_lock {
  my ($job) = @_;
  my $happy = false;

  try {
    # select the row for update so the claim is made under a row lock
    my $candidate = schema('netdisco')->resultset('Admin')->search(
      { job => $job->id, status => 'queued' },
      { for => 'update' },
    );

    my $updated = $candidate->update({
      status  => ('queued-'. setting('workers')->{'BACKEND'}),
      started => \"now()",
    });

    if ($updated > 0) {
      $happy = true;
    }
  }
  catch {
    error $_;
  };

  return $happy;
}
215
# Put a job back on the queue and count a deferral against its device.
#
# Returns true when the transaction succeeded, false (after logging the
# error) when it did not.
sub jq_defer {
  my ($job) = @_;
  my $happy = false;

  # the deferral counter is per device, not per action, so repeated
  # failures of one action (say macsuck) will eventually block all other
  # actions (arpnip, etc) on that device as well.

  # a defer only follows an SNMP connect failure, so penalising the whole
  # device seems reasonable (perhaps even desirable).

  try {
    schema('netdisco')->txn_do(sub {
      if ($job->device) {
        my $skip = schema('netdisco')->resultset('DeviceSkip')->find_or_create(
          { backend => setting('workers')->{'BACKEND'}, device => $job->device },
          { key => 'device_skip_pkey' },
        );
        $skip->increment_deferrals;
      }

      # lock the job's row, then mark it as available again
      my $row = schema('netdisco')->resultset('Admin')
        ->find($job->id, {for => 'update'});
      $row->update({ status => 'queued', started => undef });
    });
    $happy = true;
  }
  catch {
    error $_;
  };

  return $happy;
}
248
# Record the final state of a job and reset its device's deferral count.
#
# Returns true when the transaction succeeded, false (after logging the
# error) when it did not.
sub jq_complete {
  my ($job) = @_;
  my $happy = false;

  # SNMP connect failures are handled as deferrals rather than errors, so
  # reaching a complete status (success or failure) means the connect
  # worked. zero the failure counter so occasional glitches are forgotten.

  try {
    schema('netdisco')->txn_do(sub {
      if ($job->device) {
        my $skip = schema('netdisco')->resultset('DeviceSkip')->find_or_create(
          { backend => setting('workers')->{'BACKEND'}, device => $job->device },
          { key => 'device_skip_pkey' },
        );
        $skip->update({ deferrals => 0 });
      }

      # lock the job's row before writing the outcome to it
      my $row = schema('netdisco')->resultset('Admin')
        ->find($job->id, {for => 'update'});

      my @changes = (
        status   => $job->status,
        log      => $job->log,
        started  => $job->started,
        finished => $job->finished,
      );

      # hook jobs also write back their subaction
      push @changes, (subaction => $job->subaction)
        if $job->action eq 'hook';

      # a namespace-restricted job is stored as "action::namespace"
      push @changes, (action => ($job->action .'::'. $job->only_namespace))
        if $job->only_namespace;

      $row->update({ @changes });
    });
    $happy = true;
  }
  catch {
    # use DDP; p $job;
    error $_;
  };

  return $happy;
}
286
# Fetch the most recently entered jobs as plain hashes.
#
# Jobs whose action starts with "hook::" and jobs logged as duplicates are
# left out. At most jobs_qdepth rows (50 when that setting is unset or
# false) are returned, newest first.
sub jq_log {
  my %criteria = (
    'me.action' => { '-not_like' => 'hook::%' },
    # a NULL log must be matched explicitly, NOT LIKE alone would drop it
    -or => [
      { 'me.log' => undef },
      { 'me.log' => { '-not_like' => 'duplicate of %' } },
    ],
  );

  my %attrs = (
    prefetch => 'target',
    order_by => { -desc => [qw/entered device action/] },
    rows     => (setting('jobs_qdepth') || 50),
  );

  my $log = schema('netdisco')->resultset('Admin')->search(\%criteria, \%attrs);
  return $log->with_times->hri->all;
}
300
# Fetch the given user's jobs that finished within the last five seconds,
# leaving out any whose log marks them as a duplicate.
sub jq_userlog {
  my ($user) = @_;

  my %criteria = (
    username => $user,
    log      => { '-not_like' => 'duplicate of %' },
    finished => { '>' => \"(now() - interval '5 seconds')" },
  );

  my $recent = schema('netdisco')->resultset('Admin')->search(\%criteria);
  return $recent->with_times->all;
}
309
# Add one or more jobs to the queue with status 'queued'.
#
# Accepts either a single job spec (hashref) or an arrayref of them.
# Returns true when the insert transaction succeeded, false (after logging
# the error) when it did not.
sub jq_insert {
  my ($jobs) = @_;
  $jobs = [$jobs] unless ref($jobs) eq ref([]);
  my $happy = false;

  try {
    schema('netdisco')->txn_do(sub {
      my @rows = ();

      for my $spec (@$jobs) {
        push @rows, {
          device     => $spec->{device},
          device_key => $spec->{device_key},
          port       => $spec->{port},
          action     => $spec->{action},
          # "extra" takes precedence over "subaction" when it is true
          subaction  => ($spec->{extra} || $spec->{subaction}),
          username   => $spec->{username},
          userip     => $spec->{userip},
          status     => 'queued',
        };
      }

      schema('netdisco')->resultset('Admin')->populate(\@rows);
    });
    $happy = true;
  }
  catch {
    error $_;
  };

  return $happy;
}
338
# Delete jobs from the queue.
#
# With a true job id only that job is removed; asking for a job that does
# not exist (for example one already deleted by someone else) is a no-op.
# With no id, or a false one, every job in the queue is removed.
sub jq_delete {
  my $id = shift;

  if ($id) {
      schema('netdisco')->txn_do(sub {
        my $job = schema('netdisco')->resultset('Admin')->find($id);

        # find() yields undef when there is no such row; calling delete()
        # on that would die with an unhelpful "undefined value" error
        return unless $job;

        $job->delete();
      });
  }
  else {
      schema('netdisco')->txn_do(sub {
        schema('netdisco')->resultset('Admin')->delete();
      });
  }
}
353
354true;
355