1package Test2::Workflow::Runner; 2use strict; 3use warnings; 4 5our $VERSION = '0.000162'; 6 7use Test2::API(); 8use Test2::Todo(); 9use Test2::AsyncSubtest(); 10 11use Test2::Util qw/get_tid CAN_REALLY_FORK/; 12 13use Scalar::Util qw/blessed/; 14use Time::HiRes qw/sleep/; 15use List::Util qw/shuffle min/; 16use Carp qw/confess/; 17 18use Test2::Util::HashBase qw{ 19 stack no_fork no_threads max slots pid tid rand subtests filter 20}; 21 22use overload( 23 'fallback' => 1, 24 '&{}' => sub { 25 my $self = shift; 26 27 sub { 28 @_ = ($self); 29 goto &run; 30 } 31 }, 32); 33 34sub init { 35 my $self = shift; 36 37 $self->{+STACK} = []; 38 $self->{+SUBTESTS} = []; 39 40 $self->{+PID} = $$; 41 $self->{+TID} = get_tid(); 42 43 $self->{+NO_FORK} ||= $ENV{T2_WORKFLOW_NO_FORK} || !CAN_REALLY_FORK(); 44 45 my $can_thread = Test2::AsyncSubtest->CAN_REALLY_THREAD(); 46 my $should_thread = ($ENV{T2_WORKFLOW_USE_THREADS} || $ENV{T2_DO_THREAD_TESTS}) && !$ENV{T2_WORKFLOW_NO_THREADS}; 47 $self->{+NO_THREADS} ||= !($can_thread && $should_thread); 48 49 $self->{+RAND} = 1 unless defined $self->{+RAND}; 50 51 my @max = grep {defined $_} $self->{+MAX}, $ENV{T2_WORKFLOW_ASYNC}; 52 my $max = @max ? min(@max) : 3; 53 $self->{+MAX} = $max; 54 $self->{+SLOTS} = [] if $max; 55 56 unless(defined($self->{+FILTER})) { 57 if (my $raw = $ENV{T2_WORKFLOW}) { 58 my ($file, $line, $name); 59 if ($raw =~ m/^(.*)\s+(\d+)$/) { 60 ($file, $line) = ($1, $2); 61 } 62 elsif($raw =~ m/^(\d+)$/) { 63 $line = $1; 64 } 65 else { 66 $name = $raw; 67 } 68 69 $self->{+FILTER} = { 70 file => $file, 71 line => $line, 72 name => $name, 73 }; 74 } 75 } 76 77 if (my $task = delete $self->{task}) { 78 $self->push_task($task); 79 } 80} 81 82sub is_local { 83 my $self = shift; 84 return 0 unless $self->{+PID} == $$; 85 return 0 unless $self->{+TID} == get_tid(); 86 return 1; 87} 88 89sub send_event { 90 my $self = shift; 91 my ($type, %params) = @_; 92 93 my $class; 94 if ($type =~ m/\+(.*)$/) { 95 $class = $1; 96 } 97 else { 98 $class = "Test2::Event::$type"; 99 } 100 101 my $hub = Test2::API::test2_stack()->top(); 102 103 my $e = $class->new( 104 trace => Test2::Util::Trace->new( 105 frame => [caller(0)], 106 buffered => $hub->buffered, 107 nested => $hub->nested, 108 hid => $hub->hid, 109 huuid => $hub->uuid, 110 #cid => $self->{+CID}, 111 #uuid => $self->{+UUID}, 112 ), 113 114 %params, 115 ); 116 117 $hub->send($e); 118} 119 120sub current_subtest { 121 my $self = shift; 122 my $stack = $self->{+STACK} or return undef; 123 124 for my $state (reverse @$stack) { 125 next unless $state->{subtest}; 126 return $state->{subtest}; 127 } 128 129 return undef; 130} 131 132sub run { 133 my $self = shift; 134 135 my $stack = $self->stack; 136 137 my $c = 0; 138 while (@$stack) { 139 $self->cull; 140 141 my $state = $stack->[-1]; 142 my $task = $state->{task}; 143 144 unless($state->{started}++) { 145 my $skip = $task->skip; 146 147 my $filter; 148 if (my $f = $self->{+FILTER}) { 149 my $in_var = grep { $_->{filter_satisfied} } @$stack; 150 151 $filter = $task->filter($f) unless $in_var; 152 $state->{filter_satisfied} = 1 if $filter->{satisfied}; 153 } 154 155 $skip ||= $filter->{skip} if $filter; 156 157 if ($skip) { 158 $state->{ended}++; 159 $self->send_event( 160 'Skip', 161 reason => $skip || $filter, 162 name => $task->name, 163 pass => 1, 164 effective_pass => 1, 165 ); 166 pop @$stack; 167 next; 168 } 169 170 if ($task->flat) { 171 my $st = $self->current_subtest; 172 my $hub = $st ? $st->hub : Test2::API::test2_stack->top; 173 174 $state->{todo} = Test2::Todo->new(reason => $task->todo, hub => $hub) 175 if $task->todo; 176 177 $hub->send($_) for @{$task->events}; 178 } 179 else { 180 my $st = Test2::AsyncSubtest->new( 181 name => $task->name, 182 frame => $task->frame, 183 ); 184 $state->{subtest} = $st; 185 186 $state->{todo} = Test2::Todo->new(reason => $task->todo, hub => $st->hub) 187 if $task->todo; 188 189 for my $e (@{$task->events}) { 190 my $hub = $st->hub; 191 192 $e->trace->{buffered} = $hub->buffered; 193 $e->trace->{nested} = $hub->nested; 194 $e->trace->{hid} = $hub->hid; 195 $e->trace->{huuid} = $hub->uuid; 196 197 $hub->send($e); 198 } 199 200 my $slot = $self->isolate($state); 201 202 # if we forked/threaded then this state has ended here. 203 if (defined($slot)) { 204 push @{$self->{+SUBTESTS}} => [$st, $task] unless $st->finished; 205 $state->{subtest} = undef; 206 $state->{ended} = 1; 207 } 208 } 209 } 210 211 if ($state->{ended}) { 212 $state->{todo}->end() if $state->{todo}; 213 $state->{subtest}->stop() if $state->{subtest}; 214 215 return if $state->{in_thread}; 216 if(my $guard = delete $state->{in_fork}) { 217 $state->{subtest}->detach; 218 $guard->dismiss; 219 exit 0; 220 } 221 222 pop @$stack; 223 next; 224 } 225 226 if($state->{subtest} && !$state->{subtest_started}++) { 227 push @{$self->{+SUBTESTS}} => [$state->{subtest}, $task]; 228 $state->{subtest}->start(); 229 } 230 231 if ($task->isa('Test2::Workflow::Task::Action')) { 232 $state->{PID} = $$; 233 my $ok = eval { $task->code->($self); 1 }; 234 235 unless ($state->{PID} == $$) { 236 print STDERR "Task '" . $task->name . "' started in pid $state->{PID}, but ended in pid $$, did you forget to exit after forking?\n"; 237 exit 255; 238 } 239 240 $task->exception($@) unless $ok; 241 $state->{ended} = 1; 242 243 next; 244 } 245 246 if (!$state->{stage} || $state->{stage} eq 'BEFORE') { 247 $state->{before} = (defined $state->{before}) ? $state->{before} : 0; 248 249 if (my $add = $task->before->[$state->{before}++]) { 250 if ($add->around) { 251 $state->{PID} = $$; 252 my $ok = eval { $add->code->($self); 1 }; 253 my $err = $@; 254 my $complete = $state->{stage} && $state->{stage} eq 'AFTER'; 255 256 unless ($state->{PID} == $$) { 257 print STDERR "Task '" . $task->name . "' started in pid $state->{PID}, but ended in pid $$, did you forget to exit after forking?\n"; 258 exit 255; 259 } 260 261 unless($ok && $complete) { 262 $state->{ended} = 1; 263 $state->{stage} = 'AFTER'; 264 $task->exception($ok ? "'around' task failed to continue into the workflow chain.\n" : $err); 265 } 266 } 267 else { 268 $self->push_task($add); 269 } 270 } 271 else { 272 $state->{stage} = 'VARIANT'; 273 } 274 } 275 elsif ($state->{stage} eq 'VARIANT') { 276 if (my $v = $task->variant) { 277 $self->push_task($v); 278 } 279 $state->{stage} = 'PRIMARY'; 280 } 281 elsif ($state->{stage} eq 'PRIMARY') { 282 unless (defined $state->{order}) { 283 my $rand = defined($task->rand) ? $task->rand : $self->rand; 284 $state->{order} = [0 .. scalar(@{$task->primary}) - 1]; 285 @{$state->{order}} = shuffle(@{$state->{order}}) 286 if $rand; 287 } 288 my $num = shift @{$state->{order}}; 289 if (defined $num) { 290 $self->push_task($task->primary->[$num]); 291 } 292 else { 293 $state->{stage} = 'AFTER'; 294 } 295 } 296 elsif ($state->{stage} eq 'AFTER') { 297 $state->{after} = (defined $state->{after}) ? $state->{after} : 0; 298 if (my $add = $task->after->[$state->{after}++]) { 299 return if $add->around; 300 $self->push_task($add); 301 } 302 else { 303 $state->{ended} = 1; 304 } 305 } 306 } 307 308 $self->finish; 309} 310 311sub push_task { 312 my $self = shift; 313 my ($task) = @_; 314 315 confess "No Task!" unless $task; 316 confess "Bad Task ($task)!" unless blessed($task) && $task->isa('Test2::Workflow::Task'); 317 318 if ($task->isa('Test2::Workflow::Build')) { 319 confess "Can only push a Build instance when initializing the stack" 320 if @{$self->{+STACK}}; 321 $task = $task->compile(); 322 } 323 324 push @{$self->{+STACK}} => { 325 task => $task, 326 name => $task->name, 327 }; 328} 329 330sub add_mock { 331 my $self = shift; 332 my ($mock) = @_; 333 my $stack = $self->{+STACK}; 334 335 confess "Nothing on the stack!" 336 unless $stack && @$stack; 337 338 my ($state) = grep { !$_->{task}->scaffold} reverse @$stack; 339 push @{$state->{mocks}} => $mock; 340} 341 342sub isolate { 343 my $self = shift; 344 my ($state) = @_; 345 346 return if $state->{task}->skip; 347 348 my $iso = $state->{task}->iso; 349 my $async = $state->{task}->async; 350 351 # No need to isolate 352 return undef unless $iso || $async; 353 354 # Cannot isolate 355 unless($self->{+MAX} && $self->is_local) { 356 # async does not NEED to be isolated 357 return undef unless $iso; 358 } 359 360 # Wait for a slot, if max is set to 0 then we will not find a slot, instead 361 # we use '0'. We need to return a defined value to let the stack know that 362 # the task has ended. 363 my $slot = 0; 364 while($self->{+MAX} && $self->is_local) { 365 $self->cull; 366 for my $s (1 .. $self->{+MAX}) { 367 my $st = $self->{+SLOTS}->[$s]; 368 next if $st && !$st->finished; 369 $self->{+SLOTS}->[$s] = undef; 370 $slot = $s; 371 last; 372 } 373 last if $slot; 374 sleep(0.02); 375 } 376 377 my $st = $state->{subtest} 378 or confess "Cannot isolate a task without a subtest"; 379 380 if (!$self->no_fork) { 381 my $out = $st->fork; 382 if (blessed($out)) { 383 $state->{in_fork} = $out; 384 385 # drop back out to complete the task. 386 return undef; 387 } 388 else { 389 $self->send_event( 390 'Note', 391 message => "Forked PID $out to run: " . $state->{task}->name, 392 ); 393 $state->{pid} = $out; 394 } 395 } 396 elsif (!$self->no_threads) { 397 $state->{in_thread} = 1; 398 my $thr = $st->run_thread(\&run, $self); 399 $state->{thread} = $thr; 400 delete $state->{in_thread}; 401 $self->send_event( 402 'Note', 403 message => "Started Thread-ID " . $thr->tid . " to run: " . $state->{task}->name, 404 ); 405 } 406 else { 407 $st->finish(skip => "No isolation method available"); 408 return 0; 409 } 410 411 if($slot) { 412 $self->{+SLOTS}->[$slot] = $st; 413 } 414 else { 415 $st->finish; 416 } 417 418 return $slot; 419} 420 421sub cull { 422 my $self = shift; 423 424 my $subtests = delete $self->{+SUBTESTS} || return; 425 my @new; 426 427 # Cull subtests in reverse order, Nested subtests end before their parents. 428 for my $set (reverse @$subtests) { 429 my ($st, $task) = @$set; 430 next if $st->finished; 431 if (!$st->active && $st->ready) { 432 $st->finish(); 433 next; 434 } 435 436 # Use unshift to preserve order. 437 unshift @new => $set; 438 } 439 440 $self->{+SUBTESTS} = \@new; 441 442 return; 443} 444 445sub finish { 446 my $self = shift; 447 while(@{$self->{+SUBTESTS}}) { 448 $self->cull; 449 sleep(0.02) if @{$self->{+SUBTESTS}}; 450 } 451} 452 4531; 454 455__END__ 456 457=pod 458 459=encoding UTF-8 460 461=head1 NAME 462 463Test2::Workflow::Runner - Runs the workflows. 464 465=head1 SOURCE 466 467The source code repository for Test2-Workflow can be found at 468F<https://github.com/Test-More/Test2-Suite/>. 469 470=head1 MAINTAINERS 471 472=over 4 473 474=item Chad Granum E<lt>exodist@cpan.orgE<gt> 475 476=back 477 478=head1 AUTHORS 479 480=over 4 481 482=item Chad Granum E<lt>exodist@cpan.orgE<gt> 483 484=back 485 486=head1 COPYRIGHT 487 488Copyright 2018 Chad Granum E<lt>exodist7@gmail.comE<gt>. 489 490This program is free software; you can redistribute it and/or 491modify it under the same terms as Perl itself. 492 493See F<http://dev.perl.org/licenses/> 494 495=cut 496 497