1package buildkit 2 3import ( 4 "context" 5 "encoding/json" 6 "io" 7 "strings" 8 "sync" 9 "time" 10 11 "github.com/containerd/containerd/content" 12 "github.com/docker/docker/api/types" 13 "github.com/docker/docker/api/types/backend" 14 "github.com/docker/docker/builder" 15 "github.com/docker/docker/daemon/images" 16 "github.com/docker/docker/pkg/jsonmessage" 17 controlapi "github.com/moby/buildkit/api/services/control" 18 "github.com/moby/buildkit/control" 19 "github.com/moby/buildkit/identity" 20 "github.com/moby/buildkit/session" 21 "github.com/moby/buildkit/util/tracing" 22 "github.com/pkg/errors" 23 "golang.org/x/sync/errgroup" 24 grpcmetadata "google.golang.org/grpc/metadata" 25) 26 27// Opt is option struct required for creating the builder 28type Opt struct { 29 SessionManager *session.Manager 30 Root string 31 Dist images.DistributionServices 32} 33 34// Builder can build using BuildKit backend 35type Builder struct { 36 controller *control.Controller 37 reqBodyHandler *reqBodyHandler 38 39 mu sync.Mutex 40 jobs map[string]*buildJob 41} 42 43// New creates a new builder 44func New(opt Opt) (*Builder, error) { 45 reqHandler := newReqBodyHandler(tracing.DefaultTransport) 46 47 c, err := newController(reqHandler, opt) 48 if err != nil { 49 return nil, err 50 } 51 b := &Builder{ 52 controller: c, 53 reqBodyHandler: reqHandler, 54 jobs: map[string]*buildJob{}, 55 } 56 return b, nil 57} 58 59// Cancel cancels a build using ID 60func (b *Builder) Cancel(ctx context.Context, id string) error { 61 b.mu.Lock() 62 if j, ok := b.jobs[id]; ok && j.cancel != nil { 63 j.cancel() 64 } 65 b.mu.Unlock() 66 return nil 67} 68 69// DiskUsage returns a report about space used by build cache 70func (b *Builder) DiskUsage(ctx context.Context) ([]*types.BuildCache, error) { 71 duResp, err := b.controller.DiskUsage(ctx, &controlapi.DiskUsageRequest{}) 72 if err != nil { 73 return nil, err 74 } 75 76 var items []*types.BuildCache 77 for _, r := range duResp.Record { 78 items = append(items, &types.BuildCache{ 79 ID: r.ID, 80 Mutable: r.Mutable, 81 InUse: r.InUse, 82 Size: r.Size_, 83 84 CreatedAt: r.CreatedAt, 85 LastUsedAt: r.LastUsedAt, 86 UsageCount: int(r.UsageCount), 87 Parent: r.Parent, 88 Description: r.Description, 89 }) 90 } 91 return items, nil 92} 93 94// Prune clears all reclaimable build cache 95func (b *Builder) Prune(ctx context.Context) (int64, error) { 96 ch := make(chan *controlapi.UsageRecord) 97 98 eg, ctx := errgroup.WithContext(ctx) 99 100 eg.Go(func() error { 101 defer close(ch) 102 return b.controller.Prune(&controlapi.PruneRequest{}, &pruneProxy{ 103 streamProxy: streamProxy{ctx: ctx}, 104 ch: ch, 105 }) 106 }) 107 108 var size int64 109 eg.Go(func() error { 110 for r := range ch { 111 size += r.Size_ 112 } 113 return nil 114 }) 115 116 if err := eg.Wait(); err != nil { 117 return 0, err 118 } 119 120 return size, nil 121} 122 123// Build executes a build request 124func (b *Builder) Build(ctx context.Context, opt backend.BuildConfig) (*builder.Result, error) { 125 var rc = opt.Source 126 127 if buildID := opt.Options.BuildID; buildID != "" { 128 b.mu.Lock() 129 130 upload := false 131 if strings.HasPrefix(buildID, "upload-request:") { 132 upload = true 133 buildID = strings.TrimPrefix(buildID, "upload-request:") 134 } 135 136 if _, ok := b.jobs[buildID]; !ok { 137 b.jobs[buildID] = newBuildJob() 138 } 139 j := b.jobs[buildID] 140 var cancel func() 141 ctx, cancel = context.WithCancel(ctx) 142 j.cancel = cancel 143 b.mu.Unlock() 144 145 if upload { 146 ctx2, cancel := context.WithTimeout(ctx, 5*time.Second) 147 defer cancel() 148 err := j.SetUpload(ctx2, rc) 149 return nil, err 150 } 151 152 if remoteContext := opt.Options.RemoteContext; remoteContext == "upload-request" { 153 ctx2, cancel := context.WithTimeout(ctx, 5*time.Second) 154 defer cancel() 155 var err error 156 rc, err = j.WaitUpload(ctx2) 157 if err != nil { 158 return nil, err 159 } 160 opt.Options.RemoteContext = "" 161 } 162 163 defer func() { 164 delete(b.jobs, buildID) 165 }() 166 } 167 168 var out builder.Result 169 170 id := identity.NewID() 171 172 frontendAttrs := map[string]string{} 173 174 if opt.Options.Target != "" { 175 frontendAttrs["target"] = opt.Options.Target 176 } 177 178 if opt.Options.Dockerfile != "" && opt.Options.Dockerfile != "." { 179 frontendAttrs["filename"] = opt.Options.Dockerfile 180 } 181 182 if opt.Options.RemoteContext != "" { 183 if opt.Options.RemoteContext != "client-session" { 184 frontendAttrs["context"] = opt.Options.RemoteContext 185 } 186 } else { 187 url, cancel := b.reqBodyHandler.newRequest(rc) 188 defer cancel() 189 frontendAttrs["context"] = url 190 } 191 192 cacheFrom := append([]string{}, opt.Options.CacheFrom...) 193 194 frontendAttrs["cache-from"] = strings.Join(cacheFrom, ",") 195 196 for k, v := range opt.Options.BuildArgs { 197 if v == nil { 198 continue 199 } 200 frontendAttrs["build-arg:"+k] = *v 201 } 202 203 for k, v := range opt.Options.Labels { 204 frontendAttrs["label:"+k] = v 205 } 206 207 if opt.Options.NoCache { 208 frontendAttrs["no-cache"] = "" 209 } 210 211 exporterAttrs := map[string]string{} 212 213 if len(opt.Options.Tags) > 0 { 214 exporterAttrs["name"] = strings.Join(opt.Options.Tags, ",") 215 } 216 217 req := &controlapi.SolveRequest{ 218 Ref: id, 219 Exporter: "moby", 220 ExporterAttrs: exporterAttrs, 221 Frontend: "dockerfile.v0", 222 FrontendAttrs: frontendAttrs, 223 Session: opt.Options.SessionID, 224 } 225 226 eg, ctx := errgroup.WithContext(ctx) 227 228 eg.Go(func() error { 229 resp, err := b.controller.Solve(ctx, req) 230 if err != nil { 231 return err 232 } 233 id, ok := resp.ExporterResponse["containerimage.digest"] 234 if !ok { 235 return errors.Errorf("missing image id") 236 } 237 out.ImageID = id 238 return nil 239 }) 240 241 ch := make(chan *controlapi.StatusResponse) 242 243 eg.Go(func() error { 244 defer close(ch) 245 return b.controller.Status(&controlapi.StatusRequest{ 246 Ref: id, 247 }, &statusProxy{streamProxy: streamProxy{ctx: ctx}, ch: ch}) 248 }) 249 250 eg.Go(func() error { 251 for sr := range ch { 252 dt, err := sr.Marshal() 253 if err != nil { 254 return err 255 } 256 257 auxJSONBytes, err := json.Marshal(dt) 258 if err != nil { 259 return err 260 } 261 auxJSON := new(json.RawMessage) 262 *auxJSON = auxJSONBytes 263 msgJSON, err := json.Marshal(&jsonmessage.JSONMessage{ID: "moby.buildkit.trace", Aux: auxJSON}) 264 if err != nil { 265 return err 266 } 267 msgJSON = append(msgJSON, []byte("\r\n")...) 268 n, err := opt.ProgressWriter.Output.Write(msgJSON) 269 if err != nil { 270 return err 271 } 272 if n != len(msgJSON) { 273 return io.ErrShortWrite 274 } 275 } 276 return nil 277 }) 278 279 if err := eg.Wait(); err != nil { 280 return nil, err 281 } 282 283 return &out, nil 284} 285 286type streamProxy struct { 287 ctx context.Context 288} 289 290func (sp *streamProxy) SetHeader(_ grpcmetadata.MD) error { 291 return nil 292} 293 294func (sp *streamProxy) SendHeader(_ grpcmetadata.MD) error { 295 return nil 296} 297 298func (sp *streamProxy) SetTrailer(_ grpcmetadata.MD) { 299} 300 301func (sp *streamProxy) Context() context.Context { 302 return sp.ctx 303} 304func (sp *streamProxy) RecvMsg(m interface{}) error { 305 return io.EOF 306} 307 308type statusProxy struct { 309 streamProxy 310 ch chan *controlapi.StatusResponse 311} 312 313func (sp *statusProxy) Send(resp *controlapi.StatusResponse) error { 314 return sp.SendMsg(resp) 315} 316func (sp *statusProxy) SendMsg(m interface{}) error { 317 if sr, ok := m.(*controlapi.StatusResponse); ok { 318 sp.ch <- sr 319 } 320 return nil 321} 322 323type pruneProxy struct { 324 streamProxy 325 ch chan *controlapi.UsageRecord 326} 327 328func (sp *pruneProxy) Send(resp *controlapi.UsageRecord) error { 329 return sp.SendMsg(resp) 330} 331func (sp *pruneProxy) SendMsg(m interface{}) error { 332 if sr, ok := m.(*controlapi.UsageRecord); ok { 333 sp.ch <- sr 334 } 335 return nil 336} 337 338type contentStoreNoLabels struct { 339 content.Store 340} 341 342func (c *contentStoreNoLabels) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) { 343 return content.Info{}, nil 344} 345 346type wrapRC struct { 347 io.ReadCloser 348 once sync.Once 349 err error 350 waitCh chan struct{} 351} 352 353func (w *wrapRC) Read(b []byte) (int, error) { 354 n, err := w.ReadCloser.Read(b) 355 if err != nil { 356 e := err 357 if e == io.EOF { 358 e = nil 359 } 360 w.close(e) 361 } 362 return n, err 363} 364 365func (w *wrapRC) Close() error { 366 err := w.ReadCloser.Close() 367 w.close(err) 368 return err 369} 370 371func (w *wrapRC) close(err error) { 372 w.once.Do(func() { 373 w.err = err 374 close(w.waitCh) 375 }) 376} 377 378func (w *wrapRC) wait() error { 379 <-w.waitCh 380 return w.err 381} 382 383type buildJob struct { 384 cancel func() 385 waitCh chan func(io.ReadCloser) error 386} 387 388func newBuildJob() *buildJob { 389 return &buildJob{waitCh: make(chan func(io.ReadCloser) error)} 390} 391 392func (j *buildJob) WaitUpload(ctx context.Context) (io.ReadCloser, error) { 393 done := make(chan struct{}) 394 395 var upload io.ReadCloser 396 fn := func(rc io.ReadCloser) error { 397 w := &wrapRC{ReadCloser: rc, waitCh: make(chan struct{})} 398 upload = w 399 close(done) 400 return w.wait() 401 } 402 403 select { 404 case <-ctx.Done(): 405 return nil, ctx.Err() 406 case j.waitCh <- fn: 407 <-done 408 return upload, nil 409 } 410} 411 412func (j *buildJob) SetUpload(ctx context.Context, rc io.ReadCloser) error { 413 select { 414 case <-ctx.Done(): 415 return ctx.Err() 416 case fn := <-j.waitCh: 417 return fn(rc) 418 } 419} 420