package Test2::Harness::Util::Queue;
use strict;
use warnings;

our $VERSION = '1.000082';

use Carp qw/croak/;
use Time::HiRes qw/time/;
use Test2::Harness::Util qw/write_file_atomic/;

use Test2::Harness::Util::File::JSONL();

# Attributes:
#   file  - path of the jsonl file backing the queue (required)
#   qh    - cached JSONL reader, created on the first seek() or poll()
#   ended - the terminating entry, once poll() has seen it
use Test2::Harness::Util::HashBase qw{
    -file -qh -ended
};

# Constructor hook: refuse to build a queue that has no (true) 'file'.
sub init {
    my ($self) = @_;

    $self->{+FILE} or croak "'file' is a required attribute";
}

# Write an empty string to the queue file via write_file_atomic().
sub start {
    my ($self) = @_;

    return write_file_atomic($self->{+FILE}, "");
}

# Move the reader (creating it if necessary) to $pos. Returns $pos.
# The 'ended' marker is left as it is.
sub seek {
    my ($self, $pos) = @_;

    my $reader = $self->{+QH} ||= Test2::Harness::Util::File::JSONL->new(
        name => $self->{+FILE},
    );
    $reader->seek($pos);

    return $pos;
}

# Discard the cached reader so the next seek()/poll() builds a fresh one.
# Only the reader is dropped; the 'ended' marker is not cleared here.
sub reset {
    my ($self) = @_;

    return delete $self->{+QH};
}

# Fetch the entries that are currently available, optionally limited to $max.
#
# Once the terminating entry has been seen it is remembered, and every later
# call returns just that entry without touching the file again.
sub poll {
    my ($self, $max) = @_;

    if (my $terminator = $self->{+ENDED}) {
        return $terminator;
    }

    my $reader = $self->{+QH} ||= Test2::Harness::Util::File::JSONL->new(
        name => $self->{+FILE},
    );

    # A false $max (undef, 0, '') means "no limit": pass no arguments at all.
    my %limit   = $max ? (max => $max) : ();
    my @entries = $reader->poll_with_index(%limit);

    if (@entries) {
        # Entries are array refs; the item itself is the last element. An
        # undefined item in the final entry marks the end of the queue.
        my $last = $entries[-1];
        $self->{+ENDED} = $last unless defined $last->[-1];
    }

    return @entries;
}

# Terminate the queue by appending an undef item.
sub end {
    my ($self) = @_;

    return $self->_enqueue(undef);
}

# Validate $task (a non-empty, unblessed hash reference), stamp it with the
# current high-resolution time unless it already carries a true 'stamp', and
# append it to the queue. Note that the stamp is stored in the caller's hash.
sub enqueue {
    my ($self, $task) = @_;

    croak "Invalid task"
        if !$task || ref($task) ne 'HASH' || !(values %$task);

    $task->{stamp} = time unless $task->{stamp};

    return $self->_enqueue($task);
}

# Append $task to the queue file through a new JSONL handle created with
# use_write_lock enabled. No validation is done here; end() relies on that to
# write the undef terminator.
sub _enqueue {
    my ($self, $task) = @_;

    return Test2::Harness::Util::File::JSONL->new(
        name           => $self->{+FILE},
        use_write_lock => 1,
    )->write($task);
}

1;

__END__

=pod

=encoding UTF-8

=head1 NAME

Test2::Harness::Util::Queue - Representation of a queue.

=head1 DESCRIPTION

This module represents a queue, stored as a jsonl file.

=head1 SYNOPSIS

    use Test2::Harness::Util::Queue;

    my $queue = Test2::Harness::Util::Queue->new(file => '/path/to/queue.jsonl');

    $queue->start(); # Create the queue

    $queue->enqueue({foo => 'bar', baz => 'bat'});
    $queue->enqueue({foo => 'bar2', baz => 'bat2'});
    ...

    $queue->end();

Then in another process:

    use Test2::Harness::Util::Queue;

    my $queue = Test2::Harness::Util::Queue->new(file => '/path/to/queue.jsonl');

    my @items;
    while (1) {
        @items = $queue->poll();
        while (@items) {
            my $item = shift @items or last;

            ... process $item
        }

        # Queue ends with an 'undef' entry
        last if @items && !defined($items[0]);
    }

=head1 METHODS

=over 4

=item $path = $queue->file

The filename used for the queue.

=back

=head2 READING

=over 4

=item $queue->reset()

Restart reading the queue.

=item @items = $queue->poll()

Get more items from the queue. May need to call it multiple times, especially if
another process is still writing to the queue.

Returns an empty list if no items are available yet.

Returns 'undef' to terminate the list.

Note: each entry is returned as an array reference whose last element is the
queued item, so the terminator arrives as an entry whose last element is
undef. Once the terminator has been read, every later call returns only that
entry.

=item $bool = $queue->ended()

Check if the queue has ended.

=back

=head1 WRITING

=over 4

=item $queue->start()

Open the queue file for writing.

=item $queue->enqueue(\%HASHREF)

Add an item to the queue.

=item $queue->end()

Terminate the queue.

=back

=head1 SOURCE

The source code repository for Test2-Harness can be found at
F<http://github.com/Test-More/Test2-Harness/>.

=head1 MAINTAINERS

=over 4

=item Chad Granum E<lt>exodist@cpan.orgE<gt>

=back

=head1 AUTHORS

=over 4

=item Chad Granum E<lt>exodist@cpan.orgE<gt>

=back

=head1 COPYRIGHT

Copyright 2020 Chad Granum E<lt>exodist7@gmail.comE<gt>.

This program is free software; you can redistribute it and/or
modify it under the same terms as Perl itself.

See F<http://dev.perl.org/licenses/>

=cut