package Redis::JobQueue::Job;

=head1 NAME

Redis::JobQueue::Job - Object interface for creating and manipulating jobs

=head1 VERSION

This documentation refers to C<Redis::JobQueue::Job> version 1.19

=cut

#-- Pragmas --------------------------------------------------------------------

use 5.010;
use strict;
use warnings;

# ENVIRONMENT ------------------------------------------------------------------

our $VERSION = '1.19';

#-- load the modules -----------------------------------------------------------

use Exporter qw(
    import
);
our @EXPORT_OK  = qw(
    STATUS_CREATED
    STATUS_WORKING
    STATUS_COMPLETED
    STATUS_FAILED
);

#-- load the modules -----------------------------------------------------------

# Modules
use Carp;
use List::Util qw(
    min
);
use Mouse;                                      # automatically turns on strict and warnings
use Mouse::Util::TypeConstraints;
use Params::Util qw(
    _HASH0
    _INSTANCE
);
use Time::HiRes qw();

#-- declarations ---------------------------------------------------------------

=head1 SYNOPSIS

There are several ways to create a C<Redis::JobQueue::Job>
object:

    my $pre_job = {
        id           => '4BE19672-C503-11E1-BF34-28791473A258',
        queue        => 'lovely_queue',
        job          => 'strong_job',
        expire       => 12*60*60,               # 12h
        status       => STATUS_CREATED,
        workload     => \'Some stuff up to 512MB long',
        result       => \'JOB result comes here, up to 512MB long',
    };

    my $job = Redis::JobQueue::Job->new(
        id           => $pre_job->{id},
        queue        => $pre_job->{queue},
        job          => $pre_job->{job},
        expire       => $pre_job->{expire},
        status       => $pre_job->{status},
        workload     => $pre_job->{workload},
        result       => $pre_job->{result},
    );

    $job = Redis::JobQueue::Job->new( $pre_job );

    my $next_job = Redis::JobQueue::Job->new( $job );

Access methods to read and assign the relevant attributes of the object.
For example:

    $job->workload( \'New workload' );
    # or
    $job->workload( 'New workload' );

    my $id = $job->id;
    # 'workload' and 'result' return a reference to the data
    my $result = ${ $job->result };

Returns a list of names of the modified object fields:

    my @modified = $job->modified_attributes;

Resets the modification flag of an attribute. For example:

    $job->clear_modified( qw( status ) );

=head1 DESCRIPTION

The job API is implemented by the C<Redis::JobQueue::Job> class.

The main features of the C<Redis::JobQueue::Job> class are:

=over 3

=item *

Provides an object oriented model of communication.

=item *

Supports data representing various aspects of the job.

=item *

Supports the creation of the job object, automatic tracking of changed
attributes, and the ability to clear the change flags of attributes.

=back

=head1 EXPORT

None by default.

The following constants, defining the standard job statuses, are available for export:

=over

=item C<STATUS_CREATED>

Initial status of the job, showing that it was created.

=cut
use constant STATUS_CREATED     => '__created__';

=item C<STATUS_WORKING>

Job is being executed. Set by the worker function.

=cut
use constant STATUS_WORKING     => '__working__';

=item C<STATUS_COMPLETED>

Job is completed. Set by the worker function.

=cut
use constant STATUS_COMPLETED   => '__completed__';

=item C<STATUS_FAILED>

Job has failed. Set by the worker function.

=cut
use constant STATUS_FAILED      => '__failed__';

=back

The user should set the status to L</STATUS_WORKING>, L</STATUS_COMPLETED>, L</STATUS_FAILED>,
or to a custom status, when processing the job.
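
A minimal sketch of how a worker might move a job through these statuses.
The queue name and the C<'__paused__'> custom status are illustrative
assumptions, not values defined by this module:

    use strict;
    use warnings;
    use Redis::JobQueue::Job qw(
        STATUS_WORKING
        STATUS_COMPLETED
    );

    my $job = Redis::JobQueue::Job->new(
        queue   => 'lovely_queue',
        expire  => 12*60*60,
    );

    $job->status( STATUS_WORKING );     # 'started' is set if it was still 0
    $job->status( '__paused__' );       # hypothetical custom status, any string is accepted
    $job->status( STATUS_COMPLETED );   # 'completed' is set to the current time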

=cut

my $meta = __PACKAGE__->meta;

subtype __PACKAGE__.'::NonNegInt',
    as 'Int',
    where { $_ >= 0 },
    message { ( $_ || '' ).' is not a non-negative integer!' },
;

subtype __PACKAGE__.'::NonNegNum',
    as 'Num',
    where { $_ >= 0 },
    message { ( $_ || '' ).' is not a non-negative number!' },
;

subtype __PACKAGE__.'::Progress',
    as 'Num',
    where { $_ >= 0 and $_ <= 1 },
    message { ( $_ || '' ).' is not a progress number!' },
;

subtype __PACKAGE__.'::WOSpStr',
    as 'Str',
    where { $_ !~ / / },
    message { ( $_ || '' ).' contains spaces!' },
;

subtype __PACKAGE__.'::DataRef',
    as 'ScalarRef'
;

coerce __PACKAGE__.'::DataRef',
    from 'Str',
    via { \$_ },
;

#-- constructor ----------------------------------------------------------------

=head2 CONSTRUCTOR

An error will cause the program to halt if the argument is not valid.

=head3 C<new( id =E<gt> $uuid, ... )>

It generates a Job object and can be called as either a class method or
an object method.

If invoked with the first argument being an object of C<Redis::JobQueue::Job> class
or a reference to a hash, then the new object attribute values are taken from
that object or hash.

C<new> optionally takes arguments. These arguments are in key-value pairs.

This example illustrates a C<new()> call with all the valid arguments:

    $job = Redis::JobQueue::Job->new(
        id          => '4BE19672-C503-11E1-BF34-28791473A258',
                # UUID string, using conventional UUID string format.
                # Do not set it: it is filled in automatically when
                # you create a job.
        queue       => 'lovely_queue',  # The name of the job queue.
                                        # (required)
        job         => 'strong_job',    # The name of the job.
                                        # (optional attribute)
        expire      => 12*60*60,        # Job's time to live in seconds.
                                        # 0 for no expire time.
                                        # (required)
        status      => STATUS_CREATED,  # Current status of the job.
                # Do not set it: the value should be set by the worker.
        workload    => \'Some stuff up to 512MB long',
                # Baseline data for the function of the worker
                # (the function name specified in the 'job').
                # Can be a scalar, an object or a reference to a scalar, hash, or array
        result      => \'JOB result comes here, up to 512MB long',
                # The result of the function of the worker
                # (the function name specified in the 'job').
                # Do not set it: the value should be set by the worker.
    );

Returns the object itself, so method calls can be chained on the result.

The attributes C<workload> and C<result> may contain a large amount of data,
therefore, it is desirable that they be passed as references to the actual
data to improve performance.

Do not use spaces in an C<id> attribute value.

Each attribute has an accessor method, which is
used to assign and fetch the attribute's value.

=cut
around BUILDARGS => sub {
    my $orig  = shift;
    my $class = shift;

    if ( _INSTANCE( $_[0], __PACKAGE__ ) ) {
        my $job = shift;
        return $class->$orig( ( map { ( $_, $job->$_ ) } $job->job_attributes ), @_ );
    } else {
        return $class->$orig( @_ );
    }
};

#-- public attributes ----------------------------------------------------------

=head2 METHODS

An error will cause the program to halt if the argument is not valid.

=head3 C<id>

=head3 C<queue>

=head3 C<job>

=head3 C<expire>

=head3 C<status>

=head3 C<workload>

=head3 C<result>

A family of accessor methods for your data, named after the corresponding
attributes. These methods read and assign the relevant
attributes of the object.

As attributes C<workload> and C<result> may contain a large amount of data
(scalars, references to arrays and hashes, objects):

=over 3

=item *

A read method returns a reference to the data.

=item *

A write method can receive either the data or a reference to the data.

=back

=cut
has 'id'            => (
    is          => 'rw',
    isa         => __PACKAGE__.'::WOSpStr',
    default     => '',
    trigger     => sub { $_[0]->_modified_set( 'id' ) },
);

has 'queue'         => (
    is          => 'rw',
    isa         => 'Maybe[Str]',
    required    => 1,
    trigger     => sub { $_[0]->_modified_set( 'queue' ) },
);

has 'job'           => (
    is          => 'rw',
    isa         => 'Maybe[Str]',
    default     => '',
    trigger     => sub { $_[0]->_modified_set( 'job' ) },
);

has 'status'        => (
    is          => 'rw',
    isa         => 'Str',
    default     => STATUS_CREATED,
    trigger     => sub { $_[0]->_modified_set( 'status', $_[1] ) },
);

has 'expire'        => (
    is          => 'rw',
    isa         => 'Maybe['.__PACKAGE__.'::NonNegInt]',
    required    => 1,
    trigger     => sub { $_[0]->_modified_set( 'expire' ) },
);

for my $name ( qw( workload result ) ) {
    has $name           => (
        is          => 'rw',
        # A reference because attribute can contain a large amount of data
        isa         => __PACKAGE__.'::DataRef | HashRef | ArrayRef | ScalarRef | Object',
        coerce      => 1,
        builder     => '_build_data',       # will throw an error if you pass a bare non-subroutine reference as the default
        trigger     => sub { $_[0]->_modified_set( $name ) },
    );
}

=head3 C<progress>

Optional attribute, the progress of the task,
contains a user-defined value from 0 to 1.

=cut
has 'progress'      => (
    is          => 'rw',
    isa         => __PACKAGE__.'::Progress',
    default     => 0,
    trigger     => sub { $_[0]->_modified_set( 'progress' ) },
);

=head3 C<message>

Optional attribute, a string message with additional user-defined information.
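
For illustration, a worker could report intermediate state through
C<progress> and C<message> as follows. This is only a sketch: the queue name
and the message text are assumptions made for the example:

    use strict;
    use warnings;
    use Redis::JobQueue::Job;

    my $job = Redis::JobQueue::Job->new(
        queue   => 'lovely_queue',
        expire  => 0,                   # 0 for no expire time
    );

    $job->progress( 0.5 );              # must be within 0 .. 1
    $job->message( 'Half way done' );

    # A value outside 0 .. 1 fails the type constraint and dies
    eval { $job->progress( 2 ) };
    print "Rejected: $@" if $@;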

=cut
has 'message'       => (
    is          => 'rw',
    isa         => 'Maybe[Str]',
    default     => '',
    trigger     => sub { $_[0]->_modified_set( 'message' ) },
);

=head3 C<created>

Returns the time of job creation.
Set to the current time (C<Time::HiRes::time>) when the job is created.

If necessary, an alternative value can be set:

    $job->created( time );

=head3 C<updated>

Returns the time of the most recent modification of the job.

Set to the current time (C<Time::HiRes::time>) when the value of any of the following attributes changes:
L</status>, L</workload>, L</result>, L</progress>, L</message>, L</completed>, L</failed>.

Can be updated manually:

    $job->updated( time );

=cut
for my $name ( qw( created updated ) ) {
    has $name           => (
        is          => 'rw',
        isa         => __PACKAGE__.'::NonNegNum',
        default     => sub { Time::HiRes::time },
        trigger     => sub { $_[0]->_modified_set( $name ) },
    );
}

=head3 C<started>

Returns the time that the job started processing.
Set to the current time (C<Time::HiRes::time>) when the L</status> of the job is set to L</STATUS_WORKING>.

If necessary, you can set your own value, for example:

    $job->started( time );

=head3 C<completed>

Returns the time of the task completion.

It is set to 0 when the task is created.

Set to C<Time::HiRes::time> when L</status> is changed to L</STATUS_COMPLETED>.

Can be modified manually:

    $job->completed( time );

Changing the C<completed> attribute sets C<failed> to 0.
The attributes C<completed> and C<failed> are mutually exclusive.

=head3 C<failed>

Returns the time of the task failure.

It is set to 0 when the task is created.

Set to C<Time::HiRes::time> when L</status> is changed to L</STATUS_FAILED>.

Can be modified manually:

    $job->failed( time );

Changing the C<failed> attribute sets C<completed> to 0.
The attributes C<failed> and C<completed> are mutually exclusive.

=cut
for my $name ( qw( started completed failed ) ) {
    has $name           => (
        is          => 'rw',
        isa         => __PACKAGE__.'::NonNegNum',
        default     => 0,
        trigger     => sub { $_[0]->_modified_set( $name ) },
    );
}

#-- private attributes ---------------------------------------------------------

has '_meta_data'    => (
    is          => 'rw',
    isa         => 'HashRef',
    init_arg    => 'meta_data',
    default     => sub { {} },
);

has '__modified'    => (
    is          => 'ro',
    isa         => 'HashRef[Int]',
    lazy        => 1,
    init_arg    => undef,                   # we make it impossible to set this attribute when creating a new object
    builder     => '_build_modified',
);

has '__modified_meta_data' => (
    is          => 'rw',
    isa         => 'HashRef[Int]',
    lazy        => 1,
    init_arg    => undef,                   # we make it impossible to set this attribute when creating a new object
    default     => sub { return {}; },
);

#-- public methods -------------------------------------------------------------

=head3 C<elapsed>

Returns the time in seconds (a floating point number) elapsed from the moment the job started processing (see L</started>)
until the job L</completed> or L</failed>, or until the current time if it has done neither.
Returns C<undef> if the start processing time was set to 0.

=cut
sub elapsed {
    my ( $self ) = @_;

    if ( my $started = $self->started ) {
        return( ( $self->completed || $self->failed || Time::HiRes::time ) - $started );
    } else {
        return( undef );
    }
}

=head3 C<meta_data>

With no arguments, returns a reference to a hash of metadata (additional information related to the job).
For example:

    my $md = $job->meta_data;

The value of an individual metadata item is available by specifying the name of the hash key.
For example:

    my $foo = $job->meta_data( 'foo' );

A separate metadata value can be set as follows:

    $job->meta_data( next => 16 );

A group of metadata values can be specified by a reference to a hash.
Metadata may contain scalars, references to arrays and hashes, objects.
For example:

    $job->meta_data(
        {
            'foo'   => 12,
            'bar'   => [ 13, 14, 15 ],
            'other' => { a => 'b', c => 'd' },
        }
    );

The names of the metadata fields should not match the standard names returned by
L</job_attributes> and must not begin with C<'__'>.
An invalid name causes the program to die (C<confess>).

=cut
my %_attributes = map { ( $_->name eq '_meta_data' ? 'meta_data' : $_->name ) => 1 } grep { substr( $_->name, 0, 2 ) ne '__' } $meta->get_all_attributes;

sub meta_data {
    my ( $self, $key, $val ) = @_;

    return $self->_meta_data
        if !defined $key;

    # metadata can be set with an external hash
    if ( _HASH0( $key ) ) {
        foreach my $field ( keys %$key ) {
            confess 'The name of the metadata field the same as standart job field name'
                if exists $_attributes{ $field } || substr( $field, 0, 2 ) eq '__';
        }
        $self->_meta_data( $key );
        $self->__modified_meta_data( {} );
        $self->__modified_meta_data->{ $_ } = 1
            foreach keys %$key;
        return;
    }

    # getter
    return $self->_meta_data->{ $key }
        if !defined $val;

    # setter
    confess 'The name of the metadata field the same as standart job field name'
        if exists $_attributes{ $key } || substr( $key, 0, 2 ) eq '__';
    $self->_meta_data->{ $key } = $val;
    ++$self->__modified_meta_data->{ $key };

    # job data change
    $self->updated( Time::HiRes::time );
    ++$self->__modified->{ 'updated' };

    return;
}

=head3 C<clear_modified( @fields )>

Resets the modification flag of the specified attributes.
If no attribute names are specified, the flags are reset for all attributes.

=cut
sub clear_modified {
    my ( $self, @fields ) = @_;

    unless ( @fields ) {
        $self->clear_modified( $self->job_attributes );
        my @keys = keys %{ $self->__modified_meta_data };
        $self->clear_modified( @keys )
            if @keys;
        return;
    }

    foreach my $field ( @fields ) {
        if    ( exists $self->__modified->{ $field } ) { $self->__modified->{ $field } = 0 }
        elsif ( exists $self->__modified_meta_data->{ $field } ) { $self->__modified_meta_data->{ $field } = 0 }
    }

    return;
}

=head3 C<modified_attributes>

Returns a list of names of the object attributes that have been modified.

=cut
sub modified_attributes {
    my ( $self ) = @_;

    my @all_modified = (
        grep( { $self->__modified->{ $_ } } $self->job_attributes ),
        grep( { $self->__modified_meta_data->{ $_ } } keys( %{ $self->__modified_meta_data } ) ),
    );

    return @all_modified;
}

=head3 C<job_attributes>

Returns a sorted list of the names of object attributes.
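
As a sketch of how C<job_attributes>, C<clear_modified> and
C<modified_attributes> can be used together (the queue name is an assumption
made for the example):

    use strict;
    use warnings;
    use Redis::JobQueue::Job;

    my $job = Redis::JobQueue::Job->new(
        queue   => 'lovely_queue',
        expire  => 0,                   # 0 for no expire time
    );

    # Names of all job attributes, sorted
    print join( ', ', $job->job_attributes ), "\n";

    $job->clear_modified;               # reset the flags of all attributes
    $job->progress( 0.5 );              # changing 'progress' also refreshes 'updated'

    # Names of the attributes changed since the flags were reset
    print join( ', ', $job->modified_attributes ), "\n";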

=cut
sub job_attributes {
    return( sort keys %_attributes );
}

#-- private methods ------------------------------------------------------------

sub _build_data {
    my $empty_data = q{};
    return \$empty_data;
}

sub _build_modified {
    my ( $self ) = @_;

    # Every attribute of a newly built object is considered modified
    my %modified;
    $modified{ $_ } = 1
        foreach $self->job_attributes;
    return \%modified;
}

sub _modified_set {
    my $self    = shift;
    my $field   = shift;

    # A change to any of these attributes also refreshes 'updated'
    if ( $field =~ /^(status|meta_data|workload|result|progress|message|started|completed|failed)$/ ) {
        $self->updated( Time::HiRes::time );
        ++$self->__modified->{ 'updated' };
    }

    # A status change also sets the corresponding timestamp
    if ( $field eq 'status' ) {
        my $new_status = shift;
        if      ( $new_status eq STATUS_CREATED )   { $self->created( Time::HiRes::time ) }
        elsif   ( $new_status eq STATUS_WORKING )   { $self->started( Time::HiRes::time ) unless $self->started }
        elsif   ( $new_status eq STATUS_COMPLETED ) { $self->completed( Time::HiRes::time ) }
        elsif   ( $new_status eq STATUS_FAILED )    { $self->failed( Time::HiRes::time ) }
    }

    ++$self->__modified->{ $field };

    return;
}

#-- Closes and cleans up -------------------------------------------------------

no Mouse::Util::TypeConstraints;
no Mouse;                                       # keywords are removed from the package
__PACKAGE__->meta->make_immutable();

__END__

=head1 DIAGNOSTICS

An error will cause the program to halt (C<confess>) if an argument
is not valid. Use C<$@> for the analysis of the specific reasons.

=head1 SEE ALSO

The basic operation of the L<Redis::JobQueue|Redis::JobQueue> package modules:

L<Redis::JobQueue|Redis::JobQueue> - Object interface for creating and
executing job queues, as well as monitoring the status and results of jobs.

L<Redis::JobQueue::Job|Redis::JobQueue::Job> - Object interface for creating
and manipulating jobs.

L<Redis::JobQueue::Util|Redis::JobQueue::Util> - String manipulation utilities.

L<Redis|Redis> - Perl binding for Redis database.

=head1 SOURCE CODE

Redis::JobQueue is hosted on GitHub:
L<https://github.com/TrackingSoft/Redis-JobQueue>

=head1 AUTHOR

Sergey Gladkov, E<lt>sgladkov@trackingsoft.comE<gt>

Please use the GitHub project link above to report problems or contact authors.

=head1 CONTRIBUTORS

Alexander Solovey

Jeremy Jordan

Sergiy Zuban

Vlad Marchenko

=head1 COPYRIGHT AND LICENSE

Copyright (C) 2012-2016 by TrackingSoft LLC.

This package is free software; you can redistribute it and/or modify it under
the same terms as Perl itself. See I<perlartistic> at
L<http://dev.perl.org/licenses/artistic.html>.

This program is
distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.

=cut