1package solver 2 3import ( 4 "context" 5 "io" 6 "time" 7 8 "github.com/moby/buildkit/client" 9 "github.com/moby/buildkit/util/progress" 10 digest "github.com/opencontainers/go-digest" 11 "github.com/sirupsen/logrus" 12) 13 14func (j *Job) Status(ctx context.Context, ch chan *client.SolveStatus) error { 15 vs := &vertexStream{cache: map[digest.Digest]*client.Vertex{}} 16 pr := j.pr.Reader(ctx) 17 defer func() { 18 if enc := vs.encore(); len(enc) > 0 { 19 ch <- &client.SolveStatus{Vertexes: enc} 20 } 21 close(ch) 22 }() 23 24 for { 25 p, err := pr.Read(ctx) 26 if err != nil { 27 if err == io.EOF { 28 return nil 29 } 30 return err 31 } 32 ss := &client.SolveStatus{} 33 for _, p := range p { 34 switch v := p.Sys.(type) { 35 case client.Vertex: 36 ss.Vertexes = append(ss.Vertexes, vs.append(v)...) 37 38 case progress.Status: 39 vtx, ok := p.Meta("vertex") 40 if !ok { 41 logrus.Warnf("progress %s status without vertex info", p.ID) 42 continue 43 } 44 vs := &client.VertexStatus{ 45 ID: p.ID, 46 Vertex: vtx.(digest.Digest), 47 Name: v.Action, 48 Total: int64(v.Total), 49 Current: int64(v.Current), 50 Timestamp: p.Timestamp, 51 Started: v.Started, 52 Completed: v.Completed, 53 } 54 ss.Statuses = append(ss.Statuses, vs) 55 case client.VertexLog: 56 vtx, ok := p.Meta("vertex") 57 if !ok { 58 logrus.Warnf("progress %s log without vertex info", p.ID) 59 continue 60 } 61 v.Vertex = vtx.(digest.Digest) 62 v.Timestamp = p.Timestamp 63 ss.Logs = append(ss.Logs, &v) 64 } 65 } 66 select { 67 case <-ctx.Done(): 68 return ctx.Err() 69 case ch <- ss: 70 } 71 } 72} 73 74type vertexStream struct { 75 cache map[digest.Digest]*client.Vertex 76} 77 78func (vs *vertexStream) append(v client.Vertex) []*client.Vertex { 79 var out []*client.Vertex 80 vs.cache[v.Digest] = &v 81 if v.Started != nil { 82 for _, inp := range v.Inputs { 83 if inpv, ok := vs.cache[inp]; ok { 84 if !inpv.Cached && inpv.Completed == nil { 85 inpv.Cached = true 86 inpv.Started = v.Started 87 inpv.Completed = v.Started 88 out = append(out, vs.append(*inpv)...) 89 delete(vs.cache, inp) 90 } 91 } 92 } 93 } 94 vcopy := v 95 return append(out, &vcopy) 96} 97 98func (vs *vertexStream) encore() []*client.Vertex { 99 var out []*client.Vertex 100 for _, v := range vs.cache { 101 if v.Started != nil && v.Completed == nil { 102 now := time.Now() 103 v.Completed = &now 104 v.Error = context.Canceled.Error() 105 out = append(out, v) 106 } 107 } 108 return out 109} 110