1package Test2::Harness::Util::Queue;
2use strict;
3use warnings;
4
5our $VERSION = '1.000082';
6
7use Carp qw/croak/;
8use Time::HiRes qw/time/;
9use Test2::Harness::Util qw/write_file_atomic/;
10
11use Test2::Harness::Util::File::JSONL();
12
13use Test2::Harness::Util::HashBase qw{
14    -file -qh -ended
15};
16
17sub init {
18    my $self = shift;
19
20    croak "'file' is a required attribute"
21        unless $self->{+FILE};
22}
23
24sub start {
25    my $self = shift;
26    write_file_atomic($self->{+FILE}, "");
27}
28
29sub seek {
30    my $self = shift;
31    my ($pos) = @_;
32
33    $self->{+QH} ||= Test2::Harness::Util::File::JSONL->new(name => $self->{+FILE});
34    $self->{+QH}->seek($pos);
35
36    return $pos;
37}
38
39sub reset {
40    my $self = shift;
41    delete $self->{+QH};
42}
43
44sub poll {
45    my $self = shift;
46    my $max = shift;
47
48    return $self->{+ENDED} if $self->{+ENDED};
49
50    $self->{+QH} ||= Test2::Harness::Util::File::JSONL->new(name => $self->{+FILE});
51    my @out = $self->{+QH}->poll_with_index( $max ? (max => $max) : () );
52
53    $self->{+ENDED} = $out[-1] if @out && !defined($out[-1]->[-1]);
54
55    return @out;
56}
57
58sub end {
59    my $self = shift;
60    $self->_enqueue(undef);
61}
62
63sub enqueue {
64    my $self = shift;
65    my ($task) = @_;
66
67    croak "Invalid task"
68        unless $task && ref($task) eq 'HASH' && values %$task;
69
70    $task->{stamp} ||= time;
71
72    $self->_enqueue($task);
73}
74
75sub _enqueue {
76    my $self = shift;
77    my ($task) = @_;
78
79    my $fh = Test2::Harness::Util::File::JSONL->new(name => $self->{+FILE}, use_write_lock => 1);
80    $fh->write($task);
81}
82
831;
84
85__END__
86
87=pod
88
89=encoding UTF-8
90
91=head1 NAME
92
93Test2::Harness::Util::Queue - Representation of a queue.
94
95=head1 DESCRIPTION
96
97This module represents a queue, stored as a jsonl file.
98
99=head1 SYNOPSIS
100
101    use Test2::Harness::Util::Queue;
102
103    my $queue = Test2::Harness::Util::Queue->new(file => '/path/to/queue.jsonl');
104
105    $queue->start(); # Create the queue
106
107    $queue->enqueue({foo => 'bar', baz => 'bat'});
108    $queue->enqueue({foo => 'bar2', baz => 'bat2'});
109    ...
110
111    $queue->end();
112
113Then in another processs:
114
115    use Test2::Harness::Util::Queue;
116
117    my $queue = Test2::Harness::Util::Queue->new(file => '/path/to/queue.jsonl');
118
119    my @items;
120    while (1) {
121        @items = $queue->poll();
122        while (@items) {
123            my $item = shift @items or last;
124
125            ... process $item
126        }
127
128        # Queue ends with an 'undef' entry
129        last if @items && !defined($items[0]);
130    }
131
132=head1 METHODS
133
134=over 4
135
136=item $path = $queue->file
137
138The filename used for the queue
139
140=back
141
142=head2 READING
143
144=over 4
145
146=item $queue->reset()
147
148Restart reading the queue.
149
150=item @items = $queue->poll()
151
152Get more items from the queue. May need to call it multiple times, specially if
153another process is still writing to the queue.
154
155Returns an empty list if no items are available yet.
156
157Returns 'undef' to terminate the list.
158
159=item $bool = $queue->ended()
160
161Check if the queue has ended.
162
163=back
164
165=head1 WRITING
166
167=over 4
168
169=item $queue->start()
170
171Open the queue file for writing.
172
173=item $queue->enqueue(\%HASHREF)
174
175Add an item to the queue.
176
177=item $queue->end()
178
179Terminate the queue.
180
181=back
182
183=head1 SOURCE
184
185The source code repository for Test2-Harness can be found at
186F<http://github.com/Test-More/Test2-Harness/>.
187
188=head1 MAINTAINERS
189
190=over 4
191
192=item Chad Granum E<lt>exodist@cpan.orgE<gt>
193
194=back
195
196=head1 AUTHORS
197
198=over 4
199
200=item Chad Granum E<lt>exodist@cpan.orgE<gt>
201
202=back
203
204=head1 COPYRIGHT
205
206Copyright 2020 Chad Granum E<lt>exodist7@gmail.comE<gt>.
207
208This program is free software; you can redistribute it and/or
209modify it under the same terms as Perl itself.
210
211See F<http://dev.perl.org/licenses/>
212
213=cut
214