1package solver 2 3import ( 4 "context" 5 "io" 6 "time" 7 8 "github.com/moby/buildkit/client" 9 "github.com/moby/buildkit/util/progress" 10 digest "github.com/opencontainers/go-digest" 11 "github.com/sirupsen/logrus" 12) 13 14func (j *Job) Status(ctx context.Context, ch chan *client.SolveStatus) error { 15 vs := &vertexStream{cache: map[digest.Digest]*client.Vertex{}, wasCached: make(map[digest.Digest]struct{})} 16 pr := j.pr.Reader(ctx) 17 defer func() { 18 if enc := vs.encore(); len(enc) > 0 { 19 ch <- &client.SolveStatus{Vertexes: enc} 20 } 21 close(ch) 22 }() 23 24 for { 25 p, err := pr.Read(ctx) 26 if err != nil { 27 if err == io.EOF { 28 return nil 29 } 30 return err 31 } 32 ss := &client.SolveStatus{} 33 for _, p := range p { 34 switch v := p.Sys.(type) { 35 case client.Vertex: 36 ss.Vertexes = append(ss.Vertexes, vs.append(v)...) 37 38 case progress.Status: 39 vtx, ok := p.Meta("vertex") 40 if !ok { 41 logrus.Warnf("progress %s status without vertex info", p.ID) 42 continue 43 } 44 vs := &client.VertexStatus{ 45 ID: p.ID, 46 Vertex: vtx.(digest.Digest), 47 Name: v.Action, 48 Total: int64(v.Total), 49 Current: int64(v.Current), 50 Timestamp: p.Timestamp, 51 Started: v.Started, 52 Completed: v.Completed, 53 } 54 ss.Statuses = append(ss.Statuses, vs) 55 case client.VertexLog: 56 vtx, ok := p.Meta("vertex") 57 if !ok { 58 logrus.Warnf("progress %s log without vertex info", p.ID) 59 continue 60 } 61 v.Vertex = vtx.(digest.Digest) 62 v.Timestamp = p.Timestamp 63 ss.Logs = append(ss.Logs, &v) 64 } 65 } 66 select { 67 case <-ctx.Done(): 68 return ctx.Err() 69 case ch <- ss: 70 } 71 } 72} 73 74type vertexStream struct { 75 cache map[digest.Digest]*client.Vertex 76 wasCached map[digest.Digest]struct{} 77} 78 79func (vs *vertexStream) append(v client.Vertex) []*client.Vertex { 80 var out []*client.Vertex 81 vs.cache[v.Digest] = &v 82 if v.Started != nil { 83 for _, inp := range v.Inputs { 84 if inpv, ok := vs.cache[inp]; ok { 85 if !inpv.Cached && inpv.Completed == nil { 86 inpv.Cached = true 87 inpv.Started = v.Started 88 inpv.Completed = v.Started 89 out = append(out, vs.append(*inpv)...) 90 delete(vs.cache, inp) 91 } 92 } 93 } 94 } 95 if v.Cached { 96 vs.markCached(v.Digest) 97 } 98 99 vcopy := v 100 return append(out, &vcopy) 101} 102 103func (vs *vertexStream) markCached(dgst digest.Digest) { 104 if v, ok := vs.cache[dgst]; ok { 105 if _, ok := vs.wasCached[dgst]; !ok { 106 for _, inp := range v.Inputs { 107 vs.markCached(inp) 108 } 109 } 110 vs.wasCached[dgst] = struct{}{} 111 } 112} 113 114func (vs *vertexStream) encore() []*client.Vertex { 115 var out []*client.Vertex 116 for _, v := range vs.cache { 117 if v.Started != nil && v.Completed == nil { 118 now := time.Now() 119 v.Completed = &now 120 if _, ok := vs.wasCached[v.Digest]; !ok && v.Error == "" { 121 v.Error = context.Canceled.Error() 122 } 123 out = append(out, v) 124 } 125 } 126 return out 127} 128