1package Gearman::Server::Job; 2use strict; 3use Scalar::Util; 4use Sys::Hostname; 5 6use fields ( 7 'func', 8 'uniq', 9 'argref', 10 'listeners', # arrayref of interested Clients 11 'worker', 12 'handle', 13 'status', # [1, 100] 14 'require_listener', 15 'server', # Gearman::Server that owns us 16 ); 17 18sub new { 19 my Gearman::Server::Job $self = shift; 20 my ($server, $func, $uniq, $argref, $highpri) = @_; 21 22 $self = fields::new($self) unless ref $self; 23 24 # if they specified a uniq, see if we have a dup job running already 25 # to merge with 26 if (length($uniq)) { 27 # a unique value of "-" means "use my args as my unique key" 28 $uniq = $$argref if $uniq eq "-"; 29 if (my $job = $server->job_of_unique($func, $uniq)) { 30 # found a match 31 return $job; 32 } 33 # create a new key 34 $server->set_unique_job($func, $uniq => $self); 35 } 36 37 $self->{'server'} = $server; 38 $self->{'func'} = $func; 39 $self->{'uniq'} = $uniq; 40 $self->{'argref'} = $argref; 41 $self->{'require_listener'} = 1; 42 $self->{'listeners'} = []; 43 $self->{'handle'} = $server->new_job_handle; 44 45 $server->enqueue_job($self, $highpri); 46 return $self; 47} 48 49sub add_listener { 50 my Gearman::Server::Job $self = shift; 51 my Gearman::Server::Client $li = shift; 52 53 push @{$self->{listeners}}, $li; 54 Scalar::Util::weaken($self->{listeners}->[-1]); 55} 56 57sub relay_to_listeners { 58 my Gearman::Server::Job $self = shift; 59 foreach my Gearman::Server::Client $c (@{$self->{listeners}}) { 60 next if !$c || $c->{closed}; 61 $c->write($_[0]); 62 } 63} 64 65sub relay_to_option_listeners { 66 my Gearman::Server::Job $self = shift; 67 my $option = $_[1]; 68 foreach my Gearman::Server::Client $c (@{$self->{listeners}}) { 69 next if !$c || $c->{closed}; 70 next unless $c->option($option); 71 $c->write($_[0]); 72 } 73 74} 75 76sub clear_listeners { 77 my Gearman::Server::Job $self = shift; 78 $self->{listeners} = []; 79} 80 81sub listeners { 82 my Gearman::Server::Job $self = shift; 83 return @{$self->{listeners}}; 84} 85 86sub uniq { 87 my Gearman::Server::Job $self = shift; 88 return $self->{uniq}; 89} 90 91sub note_finished { 92 my Gearman::Server::Job $self = shift; 93 my $success = shift; 94 95 $self->{server}->note_job_finished($self); 96 97 if ($Gearmand::graceful_shutdown) { 98 Gearmand::shutdown_if_calm(); 99 } 100} 101 102# accessors: 103sub worker { 104 my Gearman::Server::Job $self = shift; 105 return $self->{'worker'} unless @_; 106 return $self->{'worker'} = shift; 107} 108sub require_listener { 109 my Gearman::Server::Job $self = shift; 110 return $self->{'require_listener'} unless @_; 111 return $self->{'require_listener'} = shift; 112} 113 114# takes arrayref of [numerator,denominator] 115sub status { 116 my Gearman::Server::Job $self = shift; 117 return $self->{'status'} unless @_; 118 return $self->{'status'} = shift; 119} 120 121sub handle { 122 my Gearman::Server::Job $self = shift; 123 return $self->{'handle'}; 124} 125 126sub func { 127 my Gearman::Server::Job $self = shift; 128 return $self->{'func'}; 129} 130 131sub argref { 132 my Gearman::Server::Job $self = shift; 133 return $self->{'argref'}; 134} 135 1361; 137