1package Gearman::Server::Job;
2use strict;
3use Scalar::Util;
4use Sys::Hostname;
5
6use fields (
7            'func',
8            'uniq',
9            'argref',
10            'listeners',  # arrayref of interested Clients
11            'worker',
12            'handle',
13            'status',  # [1, 100]
14            'require_listener',
15            'server',  # Gearman::Server that owns us
16            );
17
18sub new {
19    my Gearman::Server::Job $self = shift;
20    my ($server, $func, $uniq, $argref, $highpri) = @_;
21
22    $self = fields::new($self) unless ref $self;
23
24    # if they specified a uniq, see if we have a dup job running already
25    # to merge with
26    if (length($uniq)) {
27        # a unique value of "-" means "use my args as my unique key"
28        $uniq = $$argref if $uniq eq "-";
29        if (my $job = $server->job_of_unique($func, $uniq)) {
30            # found a match
31            return $job;
32        }
33        # create a new key
34        $server->set_unique_job($func, $uniq => $self);
35    }
36
37    $self->{'server'} = $server;
38    $self->{'func'}   = $func;
39    $self->{'uniq'}   = $uniq;
40    $self->{'argref'} = $argref;
41    $self->{'require_listener'} = 1;
42    $self->{'listeners'} = [];
43    $self->{'handle'}  = $server->new_job_handle;
44
45    $server->enqueue_job($self, $highpri);
46    return $self;
47}
48
49sub add_listener {
50    my Gearman::Server::Job $self = shift;
51    my Gearman::Server::Client $li = shift;
52
53    push @{$self->{listeners}}, $li;
54    Scalar::Util::weaken($self->{listeners}->[-1]);
55}
56
57sub relay_to_listeners {
58    my Gearman::Server::Job $self = shift;
59    foreach my Gearman::Server::Client $c (@{$self->{listeners}}) {
60        next if !$c || $c->{closed};
61        $c->write($_[0]);
62    }
63}
64
65sub relay_to_option_listeners {
66    my Gearman::Server::Job $self = shift;
67    my $option = $_[1];
68    foreach my Gearman::Server::Client $c (@{$self->{listeners}}) {
69        next if !$c || $c->{closed};
70        next unless $c->option($option);
71        $c->write($_[0]);
72    }
73
74}
75
76sub clear_listeners {
77    my Gearman::Server::Job $self = shift;
78    $self->{listeners} = [];
79}
80
81sub listeners {
82    my Gearman::Server::Job $self = shift;
83    return @{$self->{listeners}};
84}
85
86sub uniq {
87    my Gearman::Server::Job $self = shift;
88    return $self->{uniq};
89}
90
91sub note_finished {
92    my Gearman::Server::Job $self = shift;
93    my $success = shift;
94
95    $self->{server}->note_job_finished($self);
96
97    if ($Gearmand::graceful_shutdown) {
98        Gearmand::shutdown_if_calm();
99    }
100}
101
102# accessors:
103sub worker {
104    my Gearman::Server::Job $self = shift;
105    return $self->{'worker'} unless @_;
106    return $self->{'worker'} = shift;
107}
108sub require_listener {
109    my Gearman::Server::Job $self = shift;
110    return $self->{'require_listener'} unless @_;
111    return $self->{'require_listener'} = shift;
112}
113
114# takes arrayref of [numerator,denominator]
115sub status {
116    my Gearman::Server::Job $self = shift;
117    return $self->{'status'} unless @_;
118    return $self->{'status'} = shift;
119}
120
121sub handle {
122    my Gearman::Server::Job $self = shift;
123    return $self->{'handle'};
124}
125
126sub func {
127    my Gearman::Server::Job $self = shift;
128    return $self->{'func'};
129}
130
131sub argref {
132    my Gearman::Server::Job $self = shift;
133    return $self->{'argref'};
134}
135
1361;
137