1package solver
2
3import (
4	"context"
5	"io"
6	"time"
7
8	"github.com/moby/buildkit/client"
9	"github.com/moby/buildkit/util/progress"
10	digest "github.com/opencontainers/go-digest"
11	"github.com/sirupsen/logrus"
12)
13
14func (j *Job) Status(ctx context.Context, ch chan *client.SolveStatus) error {
15	vs := &vertexStream{cache: map[digest.Digest]*client.Vertex{}}
16	pr := j.pr.Reader(ctx)
17	defer func() {
18		if enc := vs.encore(); len(enc) > 0 {
19			ch <- &client.SolveStatus{Vertexes: enc}
20		}
21		close(ch)
22	}()
23
24	for {
25		p, err := pr.Read(ctx)
26		if err != nil {
27			if err == io.EOF {
28				return nil
29			}
30			return err
31		}
32		ss := &client.SolveStatus{}
33		for _, p := range p {
34			switch v := p.Sys.(type) {
35			case client.Vertex:
36				ss.Vertexes = append(ss.Vertexes, vs.append(v)...)
37
38			case progress.Status:
39				vtx, ok := p.Meta("vertex")
40				if !ok {
41					logrus.Warnf("progress %s status without vertex info", p.ID)
42					continue
43				}
44				vs := &client.VertexStatus{
45					ID:        p.ID,
46					Vertex:    vtx.(digest.Digest),
47					Name:      v.Action,
48					Total:     int64(v.Total),
49					Current:   int64(v.Current),
50					Timestamp: p.Timestamp,
51					Started:   v.Started,
52					Completed: v.Completed,
53				}
54				ss.Statuses = append(ss.Statuses, vs)
55			case client.VertexLog:
56				vtx, ok := p.Meta("vertex")
57				if !ok {
58					logrus.Warnf("progress %s log without vertex info", p.ID)
59					continue
60				}
61				v.Vertex = vtx.(digest.Digest)
62				v.Timestamp = p.Timestamp
63				ss.Logs = append(ss.Logs, &v)
64			}
65		}
66		select {
67		case <-ctx.Done():
68			return ctx.Err()
69		case ch <- ss:
70		}
71	}
72}
73
74type vertexStream struct {
75	cache map[digest.Digest]*client.Vertex
76}
77
78func (vs *vertexStream) append(v client.Vertex) []*client.Vertex {
79	var out []*client.Vertex
80	vs.cache[v.Digest] = &v
81	if v.Started != nil {
82		for _, inp := range v.Inputs {
83			if inpv, ok := vs.cache[inp]; ok {
84				if !inpv.Cached && inpv.Completed == nil {
85					inpv.Cached = true
86					inpv.Started = v.Started
87					inpv.Completed = v.Started
88					out = append(out, vs.append(*inpv)...)
89					delete(vs.cache, inp)
90				}
91			}
92		}
93	}
94	vcopy := v
95	return append(out, &vcopy)
96}
97
98func (vs *vertexStream) encore() []*client.Vertex {
99	var out []*client.Vertex
100	for _, v := range vs.cache {
101		if v.Started != nil && v.Completed == nil {
102			now := time.Now()
103			v.Completed = &now
104			v.Error = context.Canceled.Error()
105			out = append(out, v)
106		}
107	}
108	return out
109}
110