1class ParallelRunner
2  def initialize(files, processes, formatter, argv)
3    @files = files
4    @processes = processes
5    @formatter = formatter
6    @argv = argv
7    @last_files = {}
8    @output_files = []
9    @success = true
10  end
11
12  def launch_children
13    @children = @processes.times.map { |i|
14      name = tmp "mspec-multi-#{i}"
15      @output_files << name
16
17      env = {
18        "SPEC_TEMP_DIR" => "rubyspec_temp_#{i}",
19        "MSPEC_MULTI" => i.to_s
20      }
21      command = @argv + ["-fy", "-o", name]
22      $stderr.puts "$ #{command.join(' ')}" if $MSPEC_DEBUG
23      IO.popen([env, *command, close_others: false], "rb+")
24    }
25  end
26
27  def handle(child, message)
28    case message
29    when '.'
30      @formatter.unload
31      send_new_file_or_quit(child)
32    else
33      if message == nil
34        msg = "A child mspec-run process died unexpectedly"
35      else
36        msg = "A child mspec-run process printed unexpected output on STDOUT"
37        while chunk = (child.read_nonblock(4096) rescue nil)
38          message += chunk
39        end
40        message.chomp!('.')
41        msg += ": #{message.inspect}"
42      end
43
44      if last_file = @last_files[child]
45        msg += " while running #{last_file}"
46      end
47
48      @success = false
49      quit(child)
50      abort "\n#{msg}"
51    end
52  end
53
54  def quit(child)
55    begin
56      child.puts "QUIT"
57    rescue Errno::EPIPE
58      # The child process already died
59    end
60    _pid, status = Process.wait2(child.pid)
61    @success &&= status.success?
62    child.close
63    @children.delete(child)
64  end
65
66  def send_new_file_or_quit(child)
67    if @files.empty?
68      quit(child)
69    else
70      file = @files.shift
71      @last_files[child] = file
72      child.puts file
73    end
74  end
75
76  def run
77    MSpec.register_files @files
78    launch_children
79
80    puts @children.map { |child| child.gets }.uniq
81    @formatter.start
82    begin
83      @children.each { |child| send_new_file_or_quit(child) }
84
85      until @children.empty?
86        IO.select(@children)[0].each { |child|
87          handle(child, child.read(1))
88        }
89      end
90    ensure
91      @children.dup.each { |child| quit(child) }
92      @formatter.aggregate_results(@output_files)
93      @formatter.finish
94    end
95
96    @success
97  end
98end
99