1package Test2::Workflow::Runner;
2use strict;
3use warnings;
4
5our $VERSION = '0.000162';
6
7use Test2::API();
8use Test2::Todo();
9use Test2::AsyncSubtest();
10
11use Test2::Util qw/get_tid CAN_REALLY_FORK/;
12
13use Scalar::Util qw/blessed/;
14use Time::HiRes qw/sleep/;
15use List::Util qw/shuffle min/;
16use Carp qw/confess/;
17
18use Test2::Util::HashBase qw{
19    stack no_fork no_threads max slots pid tid rand subtests filter
20};
21
22use overload(
23    'fallback' => 1,
24    '&{}' => sub {
25        my $self = shift;
26
27        sub {
28            @_ = ($self);
29            goto &run;
30        }
31    },
32);
33
34sub init {
35    my $self = shift;
36
37    $self->{+STACK}    = [];
38    $self->{+SUBTESTS} = [];
39
40    $self->{+PID} = $$;
41    $self->{+TID} = get_tid();
42
43    $self->{+NO_FORK} ||= $ENV{T2_WORKFLOW_NO_FORK} || !CAN_REALLY_FORK();
44
45    my $can_thread = Test2::AsyncSubtest->CAN_REALLY_THREAD();
46    my $should_thread = ($ENV{T2_WORKFLOW_USE_THREADS} || $ENV{T2_DO_THREAD_TESTS}) && !$ENV{T2_WORKFLOW_NO_THREADS};
47    $self->{+NO_THREADS} ||= !($can_thread && $should_thread);
48
49    $self->{+RAND} = 1 unless defined $self->{+RAND};
50
51    my @max = grep {defined $_} $self->{+MAX}, $ENV{T2_WORKFLOW_ASYNC};
52    my $max = @max ? min(@max) : 3;
53    $self->{+MAX} = $max;
54    $self->{+SLOTS} = [] if $max;
55
56    unless(defined($self->{+FILTER})) {
57        if (my $raw = $ENV{T2_WORKFLOW}) {
58            my ($file, $line, $name);
59            if ($raw =~ m/^(.*)\s+(\d+)$/) {
60                ($file, $line) = ($1, $2);
61            }
62            elsif($raw =~ m/^(\d+)$/) {
63                $line = $1;
64            }
65            else {
66                $name = $raw;
67            }
68
69            $self->{+FILTER} = {
70                file => $file,
71                line => $line,
72                name => $name,
73            };
74        }
75    }
76
77    if (my $task = delete $self->{task}) {
78        $self->push_task($task);
79    }
80}
81
82sub is_local {
83    my $self = shift;
84    return 0 unless $self->{+PID} == $$;
85    return 0 unless $self->{+TID} == get_tid();
86    return 1;
87}
88
89sub send_event {
90    my $self = shift;
91    my ($type, %params) = @_;
92
93    my $class;
94    if ($type =~ m/\+(.*)$/) {
95        $class = $1;
96    }
97    else {
98        $class = "Test2::Event::$type";
99    }
100
101    my $hub = Test2::API::test2_stack()->top();
102
103    my $e = $class->new(
104        trace => Test2::Util::Trace->new(
105            frame => [caller(0)],
106            buffered => $hub->buffered,
107            nested   => $hub->nested,
108            hid      => $hub->hid,
109            huuid    => $hub->uuid,
110            #cid      => $self->{+CID},
111            #uuid     => $self->{+UUID},
112        ),
113
114        %params,
115    );
116
117    $hub->send($e);
118}
119
120sub current_subtest {
121    my $self = shift;
122    my $stack = $self->{+STACK} or return undef;
123
124    for my $state (reverse @$stack) {
125        next unless $state->{subtest};
126        return $state->{subtest};
127    }
128
129    return undef;
130}
131
132sub run {
133    my $self = shift;
134
135    my $stack = $self->stack;
136
137    my $c = 0;
138    while (@$stack) {
139        $self->cull;
140
141        my $state  = $stack->[-1];
142        my $task   = $state->{task};
143
144        unless($state->{started}++) {
145            my $skip = $task->skip;
146
147            my $filter;
148            if (my $f = $self->{+FILTER}) {
149                my $in_var = grep { $_->{filter_satisfied} } @$stack;
150
151                $filter = $task->filter($f) unless $in_var;
152                $state->{filter_satisfied} = 1 if $filter->{satisfied};
153            }
154
155            $skip ||= $filter->{skip} if $filter;
156
157            if ($skip) {
158                $state->{ended}++;
159                $self->send_event(
160                    'Skip',
161                    reason         => $skip || $filter,
162                    name           => $task->name,
163                    pass           => 1,
164                    effective_pass => 1,
165                );
166                pop @$stack;
167                next;
168            }
169
170            if ($task->flat) {
171                my $st = $self->current_subtest;
172                my $hub = $st ? $st->hub : Test2::API::test2_stack->top;
173
174                $state->{todo} = Test2::Todo->new(reason => $task->todo, hub => $hub)
175                    if $task->todo;
176
177                $hub->send($_) for @{$task->events};
178            }
179            else {
180                my $st = Test2::AsyncSubtest->new(
181                    name  => $task->name,
182                    frame => $task->frame,
183                );
184                $state->{subtest} = $st;
185
186                $state->{todo} = Test2::Todo->new(reason => $task->todo, hub => $st->hub)
187                    if $task->todo;
188
189                for my $e (@{$task->events}) {
190                    my $hub = $st->hub;
191
192                    $e->trace->{buffered} = $hub->buffered;
193                    $e->trace->{nested}   = $hub->nested;
194                    $e->trace->{hid}      = $hub->hid;
195                    $e->trace->{huuid}    = $hub->uuid;
196
197                    $hub->send($e);
198                }
199
200                my $slot = $self->isolate($state);
201
202                # if we forked/threaded then this state has ended here.
203                if (defined($slot)) {
204                    push @{$self->{+SUBTESTS}} => [$st, $task] unless $st->finished;
205                    $state->{subtest} = undef;
206                    $state->{ended} = 1;
207                }
208            }
209        }
210
211        if ($state->{ended}) {
212            $state->{todo}->end() if $state->{todo};
213            $state->{subtest}->stop() if $state->{subtest};
214
215            return if $state->{in_thread};
216            if(my $guard = delete $state->{in_fork}) {
217                $state->{subtest}->detach;
218                $guard->dismiss;
219                exit 0;
220            }
221
222            pop @$stack;
223            next;
224        }
225
226        if($state->{subtest} && !$state->{subtest_started}++) {
227            push @{$self->{+SUBTESTS}} => [$state->{subtest}, $task];
228            $state->{subtest}->start();
229        }
230
231        if ($task->isa('Test2::Workflow::Task::Action')) {
232            $state->{PID} = $$;
233            my $ok = eval { $task->code->($self); 1 };
234
235            unless ($state->{PID} == $$) {
236                print STDERR "Task '" . $task->name . "' started in pid $state->{PID}, but ended in pid $$, did you forget to exit after forking?\n";
237                exit 255;
238            }
239
240            $task->exception($@) unless $ok;
241            $state->{ended} = 1;
242
243            next;
244        }
245
246        if (!$state->{stage} || $state->{stage} eq 'BEFORE') {
247            $state->{before}  = (defined $state->{before}) ? $state->{before} : 0;
248
249            if (my $add = $task->before->[$state->{before}++]) {
250                if ($add->around) {
251                    $state->{PID} = $$;
252                    my $ok = eval { $add->code->($self); 1 };
253                    my $err = $@;
254                    my $complete = $state->{stage} && $state->{stage} eq 'AFTER';
255
256                    unless ($state->{PID} == $$) {
257                        print STDERR "Task '" . $task->name . "' started in pid $state->{PID}, but ended in pid $$, did you forget to exit after forking?\n";
258                        exit 255;
259                    }
260
261                    unless($ok && $complete) {
262                        $state->{ended} = 1;
263                        $state->{stage} = 'AFTER';
264                        $task->exception($ok ? "'around' task failed to continue into the workflow chain.\n" : $err);
265                    }
266                }
267                else {
268                    $self->push_task($add);
269                }
270            }
271            else {
272                $state->{stage} = 'VARIANT';
273            }
274        }
275        elsif ($state->{stage} eq 'VARIANT') {
276            if (my $v = $task->variant) {
277                $self->push_task($v);
278            }
279            $state->{stage} = 'PRIMARY';
280        }
281        elsif ($state->{stage} eq 'PRIMARY') {
282            unless (defined $state->{order}) {
283                my $rand = defined($task->rand) ? $task->rand : $self->rand;
284                $state->{order} = [0 .. scalar(@{$task->primary}) - 1];
285                @{$state->{order}} = shuffle(@{$state->{order}})
286                    if $rand;
287            }
288            my $num = shift @{$state->{order}};
289            if (defined $num) {
290                $self->push_task($task->primary->[$num]);
291            }
292            else {
293                $state->{stage} = 'AFTER';
294            }
295        }
296        elsif ($state->{stage} eq 'AFTER') {
297            $state->{after}  = (defined $state->{after}) ? $state->{after} : 0;
298            if (my $add = $task->after->[$state->{after}++]) {
299                return if $add->around;
300                $self->push_task($add);
301            }
302            else {
303                $state->{ended} = 1;
304            }
305        }
306    }
307
308    $self->finish;
309}
310
311sub push_task {
312    my $self = shift;
313    my ($task) = @_;
314
315    confess "No Task!" unless $task;
316    confess "Bad Task ($task)!" unless blessed($task) && $task->isa('Test2::Workflow::Task');
317
318    if ($task->isa('Test2::Workflow::Build')) {
319        confess "Can only push a Build instance when initializing the stack"
320            if @{$self->{+STACK}};
321        $task = $task->compile();
322    }
323
324    push @{$self->{+STACK}} => {
325        task => $task,
326        name => $task->name,
327    };
328}
329
330sub add_mock {
331    my $self = shift;
332    my ($mock) = @_;
333    my $stack = $self->{+STACK};
334
335    confess "Nothing on the stack!"
336        unless $stack && @$stack;
337
338    my ($state) = grep { !$_->{task}->scaffold} reverse @$stack;
339    push @{$state->{mocks}} => $mock;
340}
341
342sub isolate {
343    my $self = shift;
344    my ($state) = @_;
345
346    return if $state->{task}->skip;
347
348    my $iso   = $state->{task}->iso;
349    my $async = $state->{task}->async;
350
351    # No need to isolate
352    return undef unless $iso || $async;
353
354    # Cannot isolate
355    unless($self->{+MAX} && $self->is_local) {
356        # async does not NEED to be isolated
357        return undef unless $iso;
358    }
359
360    # Wait for a slot, if max is set to 0 then we will not find a slot, instead
361    # we use '0'.  We need to return a defined value to let the stack know that
362    # the task has ended.
363    my $slot = 0;
364    while($self->{+MAX} && $self->is_local) {
365        $self->cull;
366        for my $s (1 .. $self->{+MAX}) {
367            my $st = $self->{+SLOTS}->[$s];
368            next if $st && !$st->finished;
369            $self->{+SLOTS}->[$s] = undef;
370            $slot = $s;
371            last;
372        }
373        last if $slot;
374        sleep(0.02);
375    }
376
377    my $st = $state->{subtest}
378        or confess "Cannot isolate a task without a subtest";
379
380    if (!$self->no_fork) {
381        my $out = $st->fork;
382        if (blessed($out)) {
383            $state->{in_fork} = $out;
384
385            # drop back out to complete the task.
386            return undef;
387        }
388        else {
389            $self->send_event(
390                'Note',
391                message => "Forked PID $out to run: " . $state->{task}->name,
392            );
393            $state->{pid} = $out;
394        }
395    }
396    elsif (!$self->no_threads) {
397        $state->{in_thread} = 1;
398        my $thr = $st->run_thread(\&run, $self);
399        $state->{thread} = $thr;
400        delete $state->{in_thread};
401        $self->send_event(
402            'Note',
403            message => "Started Thread-ID " . $thr->tid . " to run: " . $state->{task}->name,
404        );
405    }
406    else {
407        $st->finish(skip => "No isolation method available");
408        return 0;
409    }
410
411    if($slot) {
412        $self->{+SLOTS}->[$slot] = $st;
413    }
414    else {
415        $st->finish;
416    }
417
418    return $slot;
419}
420
421sub cull {
422    my $self = shift;
423
424    my $subtests = delete $self->{+SUBTESTS} || return;
425    my @new;
426
427    # Cull subtests in reverse order, Nested subtests end before their parents.
428    for my $set (reverse @$subtests) {
429        my ($st, $task) = @$set;
430        next if $st->finished;
431        if (!$st->active && $st->ready) {
432            $st->finish();
433            next;
434        }
435
436        # Use unshift to preserve order.
437        unshift @new => $set;
438    }
439
440    $self->{+SUBTESTS} = \@new;
441
442    return;
443}
444
445sub finish {
446    my $self = shift;
447    while(@{$self->{+SUBTESTS}}) {
448        $self->cull;
449        sleep(0.02) if @{$self->{+SUBTESTS}};
450    }
451}
452
4531;
454
455__END__
456
457=pod
458
459=encoding UTF-8
460
461=head1 NAME
462
463Test2::Workflow::Runner - Runs the workflows.
464
465=head1 SOURCE
466
467The source code repository for Test2-Workflow can be found at
468F<https://github.com/Test-More/Test2-Suite/>.
469
470=head1 MAINTAINERS
471
472=over 4
473
474=item Chad Granum E<lt>exodist@cpan.orgE<gt>
475
476=back
477
478=head1 AUTHORS
479
480=over 4
481
482=item Chad Granum E<lt>exodist@cpan.orgE<gt>
483
484=back
485
486=head1 COPYRIGHT
487
488Copyright 2018 Chad Granum E<lt>exodist7@gmail.comE<gt>.
489
490This program is free software; you can redistribute it and/or
491modify it under the same terms as Perl itself.
492
493See F<http://dev.perl.org/licenses/>
494
495=cut
496
497