1// Copyright 2017 The Go Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package gps 6 7import ( 8 "context" 9 "fmt" 10 "io/ioutil" 11 "log" 12 "net/url" 13 "os" 14 "os/signal" 15 "path/filepath" 16 "runtime" 17 "strings" 18 "sync" 19 "sync/atomic" 20 "time" 21 22 "github.com/golang/dep/gps/pkgtree" 23 "github.com/golang/dep/internal/fs" 24 "github.com/nightlyone/lockfile" 25 "github.com/pkg/errors" 26 "github.com/sdboyer/constext" 27) 28 29// Used to compute a friendly filepath from a URL-shaped input. 30var sanitizer = strings.NewReplacer("-", "--", ":", "-", "/", "-", "+", "-") 31 32// A locker is responsible for preventing multiple instances of dep from 33// interfering with one-another. 34// 35// Currently, anything that can either TryLock(), Unlock(), or GetOwner() 36// satisfies that need. 37type locker interface { 38 TryLock() error 39 Unlock() error 40 GetOwner() (*os.Process, error) 41} 42 43// A falselocker adheres to the locker interface and its purpose is to quietly 44// fail to lock when the DEPNOLOCK environment variable is set. 45// 46// This allows dep to run on systems where file locking doesn't work -- 47// particularly those that use union mount type filesystems that don't 48// implement hard links or fnctl() style locking. 49type falseLocker struct{} 50 51// Always returns an error to indicate there's no current ower PID for our 52// lock. 53func (fl falseLocker) GetOwner() (*os.Process, error) { 54 return nil, fmt.Errorf("falseLocker always fails") 55} 56 57// Does nothing and returns a nil error so caller believes locking succeeded. 58func (fl falseLocker) TryLock() error { 59 return nil 60} 61 62// Does nothing and returns a nil error so caller believes unlocking succeeded. 63func (fl falseLocker) Unlock() error { 64 return nil 65} 66 67// A SourceManager is responsible for retrieving, managing, and interrogating 68// source repositories. Its primary purpose is to serve the needs of a Solver, 69// but it is handy for other purposes, as well. 70// 71// gps's built-in SourceManager, SourceMgr, is intended to be generic and 72// sufficient for any purpose. It provides some additional semantics around the 73// methods defined here. 74type SourceManager interface { 75 // SourceExists checks if a repository exists, either upstream or in the 76 // SourceManager's central repository cache. 77 SourceExists(ProjectIdentifier) (bool, error) 78 79 // SyncSourceFor will attempt to bring all local information about a source 80 // fully up to date. 81 SyncSourceFor(ProjectIdentifier) error 82 83 // ListVersions retrieves a list of the available versions for a given 84 // repository name. 85 ListVersions(ProjectIdentifier) ([]PairedVersion, error) 86 87 // RevisionPresentIn indicates whether the provided Version is present in 88 // the given repository. 89 RevisionPresentIn(ProjectIdentifier, Revision) (bool, error) 90 91 // ListPackages parses the tree of the Go packages at or below root of the 92 // provided ProjectIdentifier, at the provided version. 93 ListPackages(ProjectIdentifier, Version) (pkgtree.PackageTree, error) 94 95 // GetManifestAndLock returns manifest and lock information for the provided 96 // root import path. 97 // 98 // gps currently requires that projects be rooted at their repository root, 99 // necessitating that the ProjectIdentifier's ProjectRoot must also be a 100 // repository root. 101 GetManifestAndLock(ProjectIdentifier, Version, ProjectAnalyzer) (Manifest, Lock, error) 102 103 // ExportProject writes out the tree of the provided import path, at the 104 // provided version, to the provided directory. 105 ExportProject(context.Context, ProjectIdentifier, Version, string) error 106 107 // ExportPrunedProject writes out the tree corresponding to the provided 108 // LockedProject, the provided version, to the provided directory, applying 109 // the provided pruning options. 110 // 111 // The first return value is the hex-encoded string representation of the 112 // hash, including colon-separated leaders indicating the version of the 113 // hashing function used, and the prune options that were applied. 114 ExportPrunedProject(context.Context, LockedProject, PruneOptions, string) error 115 116 // DeduceProjectRoot takes an import path and deduces the corresponding 117 // project/source root. 118 DeduceProjectRoot(ip string) (ProjectRoot, error) 119 120 // SourceURLsForPath takes an import path and deduces the set of source URLs 121 // that may refer to a canonical upstream source. 122 // In general, these URLs differ only by protocol (e.g. https vs. ssh), not path 123 SourceURLsForPath(ip string) ([]*url.URL, error) 124 125 // Release lets go of any locks held by the SourceManager. Once called, it 126 // is no longer allowed to call methods of that SourceManager; all 127 // method calls will immediately result in errors. 128 Release() 129 130 // InferConstraint tries to puzzle out what kind of version is given in a string - 131 // semver, a revision, or as a fallback, a plain tag 132 InferConstraint(s string, pi ProjectIdentifier) (Constraint, error) 133} 134 135// A ProjectAnalyzer is responsible for analyzing a given path for Manifest and 136// Lock information. Tools relying on gps must implement one. 137type ProjectAnalyzer interface { 138 // Perform analysis of the filesystem tree rooted at path, with the 139 // root import path importRoot, to determine the project's constraints, as 140 // indicated by a Manifest and Lock. 141 // 142 // Note that an error will typically cause the solver to treat the analyzed 143 // version as unusable. As such, an error should generally only be returned 144 // if the code tree is somehow malformed, but not if the implementor's 145 // expected files containing Manifest and Lock data are merely absent. 146 DeriveManifestAndLock(path string, importRoot ProjectRoot) (Manifest, Lock, error) 147 148 // Info reports this project analyzer's info. 149 Info() ProjectAnalyzerInfo 150} 151 152// ProjectAnalyzerInfo indicates a ProjectAnalyzer's name and version. 153type ProjectAnalyzerInfo struct { 154 Name string 155 Version int 156} 157 158// String returns a string like: "<name>.<decimal version>" 159func (p ProjectAnalyzerInfo) String() string { 160 return fmt.Sprintf("%s.%d", p.Name, p.Version) 161} 162 163// SourceMgr is the default SourceManager for gps. 164// 165// There's no (planned) reason why it would need to be reimplemented by other 166// tools; control via dependency injection is intended to be sufficient. 167type SourceMgr struct { 168 cachedir string // path to root of cache dir 169 lf locker // handle for the sm lock file on disk 170 suprvsr *supervisor // subsystem that supervises running calls/io 171 cancelAll context.CancelFunc // cancel func to kill all running work 172 deduceCoord *deductionCoordinator // subsystem that manages import path deduction 173 srcCoord *sourceCoordinator // subsystem that manages sources 174 sigmut sync.Mutex // mutex protecting signal handling setup/teardown 175 qch chan struct{} // quit chan for signal handler 176 relonce sync.Once // once-er to ensure we only release once 177 releasing int32 // flag indicating release of sm has begun 178} 179 180var _ SourceManager = &SourceMgr{} 181 182// ErrSourceManagerIsReleased is the error returned by any SourceManager method 183// called after the SourceManager has been released, rendering its methods no 184// longer safe to call. 185var ErrSourceManagerIsReleased = fmt.Errorf("this SourceManager has been released, its methods can no longer be called") 186 187// SourceManagerConfig holds configuration information for creating SourceMgrs. 188type SourceManagerConfig struct { 189 CacheAge time.Duration // Maximum valid age of cached data. <=0: Don't cache. 190 Cachedir string // Where to store local instances of upstream sources. 191 Logger *log.Logger // Optional info/warn logger. Discards if nil. 192 DisableLocking bool // True if the SourceManager should NOT use a lock file to protect the Cachedir from multiple processes. 193} 194 195// NewSourceManager produces an instance of gps's built-in SourceManager. 196// 197// The returned SourceManager aggressively caches information wherever possible. 198// If tools need to do preliminary work involving upstream repository analysis 199// prior to invoking a solve run, it is recommended that they create this 200// SourceManager as early as possible and use it to their ends. That way, the 201// solver can benefit from any caches that may have already been warmed. 202// 203// A cacheEpoch is calculated from now()-cacheAge, and older persistent cache data 204// is discarded. When cacheAge is <= 0, the persistent cache is 205// not used. 206// 207// gps's SourceManager is intended to be threadsafe (if it's not, please file a 208// bug!). It should be safe to reuse across concurrent solving runs, even on 209// unrelated projects. 210func NewSourceManager(c SourceManagerConfig) (*SourceMgr, error) { 211 if c.Logger == nil { 212 c.Logger = log.New(ioutil.Discard, "", 0) 213 } 214 215 err := fs.EnsureDir(filepath.Join(c.Cachedir, "sources"), 0777) 216 if err != nil { 217 return nil, err 218 } 219 220 // Fix for #820 221 // 222 // Consult https://godoc.org/github.com/nightlyone/lockfile for the lockfile 223 // behaviour. It's magic. It deals with stale processes, and if there is 224 // a process keeping the lock busy, it will pass back a temporary error that 225 // we can spin on. 226 227 glpath := filepath.Join(c.Cachedir, "sm.lock") 228 229 lockfile, err := func() (locker, error) { 230 if c.DisableLocking { 231 return falseLocker{}, nil 232 } 233 return lockfile.New(glpath) 234 }() 235 236 if err != nil { 237 return nil, CouldNotCreateLockError{ 238 Path: glpath, 239 Err: errors.Wrapf(err, "unable to create lock %s", glpath), 240 } 241 } 242 243 process, err := lockfile.GetOwner() 244 if err == nil { 245 // If we didn't get an error, then the lockfile exists already. We should 246 // check to see if it's us already: 247 if process.Pid == os.Getpid() { 248 return nil, CouldNotCreateLockError{ 249 Path: glpath, 250 Err: fmt.Errorf("lockfile %s already locked by this process", glpath), 251 } 252 } 253 254 // There is a lockfile, but it's owned by someone else. We'll try to lock 255 // it anyway. 256 } 257 258 // If it's a TemporaryError, we retry every second. Otherwise, we fail 259 // permanently. 260 // 261 // TODO: #534 needs to be implemented to provide a better way to log warnings, 262 // but until then we will just use stderr. 263 264 // Implicit Time of 0. 265 var lasttime time.Time 266 err = lockfile.TryLock() 267 for err != nil { 268 nowtime := time.Now() 269 duration := nowtime.Sub(lasttime) 270 271 // The first time this is evaluated, duration will be very large as lasttime is 0. 272 // Unless time travel is invented and someone travels back to the year 1, we should 273 // be ok. 274 if duration > 15*time.Second { 275 fmt.Fprintf(os.Stderr, "waiting for lockfile %s: %s\n", glpath, err.Error()) 276 lasttime = nowtime 277 } 278 279 if t, ok := err.(interface { 280 Temporary() bool 281 }); ok && t.Temporary() { 282 time.Sleep(time.Second * 1) 283 } else { 284 return nil, CouldNotCreateLockError{ 285 Path: glpath, 286 Err: errors.Wrapf(err, "unable to lock %s", glpath), 287 } 288 } 289 err = lockfile.TryLock() 290 } 291 292 ctx, cf := context.WithCancel(context.TODO()) 293 superv := newSupervisor(ctx) 294 deducer := newDeductionCoordinator(superv) 295 296 var sc sourceCache 297 if c.CacheAge > 0 { 298 // Try to open the BoltDB cache from disk. 299 epoch := time.Now().Add(-c.CacheAge).Unix() 300 boltCache, err := newBoltCache(c.Cachedir, epoch, c.Logger) 301 if err != nil { 302 c.Logger.Println(errors.Wrapf(err, "failed to open persistent cache %q", c.Cachedir)) 303 } else { 304 sc = newMultiCache(memoryCache{}, boltCache) 305 } 306 } 307 308 sm := &SourceMgr{ 309 cachedir: c.Cachedir, 310 lf: lockfile, 311 suprvsr: superv, 312 cancelAll: cf, 313 deduceCoord: deducer, 314 srcCoord: newSourceCoordinator(superv, deducer, c.Cachedir, sc, c.Logger), 315 qch: make(chan struct{}), 316 } 317 318 return sm, nil 319} 320 321// Cachedir returns the location of the cache directory. 322func (sm *SourceMgr) Cachedir() string { 323 return sm.cachedir 324} 325 326// UseDefaultSignalHandling sets up typical os.Interrupt signal handling for a 327// SourceMgr. 328func (sm *SourceMgr) UseDefaultSignalHandling() { 329 sigch := make(chan os.Signal, 1) 330 signal.Notify(sigch, os.Interrupt) 331 sm.HandleSignals(sigch) 332} 333 334// HandleSignals sets up logic to handle incoming signals with the goal of 335// shutting down the SourceMgr safely. 336// 337// Calling code must provide the signal channel, and is responsible for calling 338// signal.Notify() on that channel. 339// 340// Successive calls to HandleSignals() will deregister the previous handler and 341// set up a new one. It is not recommended that the same channel be passed 342// multiple times to this method. 343// 344// SetUpSigHandling() will set up a handler that is appropriate for most 345// use cases. 346func (sm *SourceMgr) HandleSignals(sigch chan os.Signal) { 347 sm.sigmut.Lock() 348 // always start by closing the qch, which will lead to any existing signal 349 // handler terminating, and deregistering its sigch. 350 if sm.qch != nil { 351 close(sm.qch) 352 } 353 sm.qch = make(chan struct{}) 354 355 // Run a new goroutine with the input sigch and the fresh qch 356 go func(sch chan os.Signal, qch <-chan struct{}) { 357 defer signal.Stop(sch) 358 select { 359 case <-sch: 360 // Set up a timer to uninstall the signal handler after three 361 // seconds, so that the user can easily force termination with a 362 // second ctrl-c 363 time.AfterFunc(3*time.Second, func() { 364 signal.Stop(sch) 365 }) 366 367 if opc := sm.suprvsr.count(); opc > 0 { 368 fmt.Printf("Signal received: waiting for %v ops to complete...\n", opc) 369 } 370 371 sm.Release() 372 case <-qch: 373 // quit channel triggered - deregister our sigch and return 374 } 375 }(sigch, sm.qch) 376 // Try to ensure handler is blocked in for-select before releasing the mutex 377 runtime.Gosched() 378 379 sm.sigmut.Unlock() 380} 381 382// StopSignalHandling deregisters any signal handler running on this SourceMgr. 383// 384// It's normally not necessary to call this directly; it will be called as 385// needed by Release(). 386func (sm *SourceMgr) StopSignalHandling() { 387 sm.sigmut.Lock() 388 if sm.qch != nil { 389 close(sm.qch) 390 sm.qch = nil 391 runtime.Gosched() 392 } 393 sm.sigmut.Unlock() 394} 395 396// CouldNotCreateLockError describe failure modes in which creating a SourceMgr 397// did not succeed because there was an error while attempting to create the 398// on-disk lock file. 399type CouldNotCreateLockError struct { 400 Path string 401 Err error 402} 403 404func (e CouldNotCreateLockError) Error() string { 405 return e.Err.Error() 406} 407 408// Release lets go of any locks held by the SourceManager. Once called, it is no 409// longer allowed to call methods of that SourceManager; all method calls will 410// immediately result in errors. 411func (sm *SourceMgr) Release() { 412 atomic.StoreInt32(&sm.releasing, 1) 413 414 sm.relonce.Do(func() { 415 // Send the signal to the supervisor to cancel all running calls. 416 sm.cancelAll() 417 sm.suprvsr.wait() 418 419 // Close the source coordinator. 420 sm.srcCoord.close() 421 422 // Close the file handle for the lock file and remove it from disk 423 sm.lf.Unlock() 424 os.Remove(filepath.Join(sm.cachedir, "sm.lock")) 425 426 // Close the qch, if non-nil, so the signal handlers run out. This will 427 // also deregister the sig channel, if any has been set up. 428 if sm.qch != nil { 429 close(sm.qch) 430 } 431 }) 432} 433 434// GetManifestAndLock returns manifest and lock information for the provided 435// ProjectIdentifier, at the provided Version. The work of producing the 436// manifest and lock is delegated to the provided ProjectAnalyzer's 437// DeriveManifestAndLock() method. 438func (sm *SourceMgr) GetManifestAndLock(id ProjectIdentifier, v Version, an ProjectAnalyzer) (Manifest, Lock, error) { 439 if atomic.LoadInt32(&sm.releasing) == 1 { 440 return nil, nil, ErrSourceManagerIsReleased 441 } 442 443 srcg, err := sm.srcCoord.getSourceGatewayFor(context.TODO(), id) 444 if err != nil { 445 return nil, nil, err 446 } 447 448 return srcg.getManifestAndLock(context.TODO(), id.ProjectRoot, v, an) 449} 450 451// ListPackages parses the tree of the Go packages at and below the ProjectRoot 452// of the given ProjectIdentifier, at the given version. 453func (sm *SourceMgr) ListPackages(id ProjectIdentifier, v Version) (pkgtree.PackageTree, error) { 454 if atomic.LoadInt32(&sm.releasing) == 1 { 455 return pkgtree.PackageTree{}, ErrSourceManagerIsReleased 456 } 457 458 srcg, err := sm.srcCoord.getSourceGatewayFor(context.TODO(), id) 459 if err != nil { 460 return pkgtree.PackageTree{}, err 461 } 462 463 return srcg.listPackages(context.TODO(), id.ProjectRoot, v) 464} 465 466// ListVersions retrieves a list of the available versions for a given 467// repository name. 468// 469// The list is not sorted; while it may be returned in the order that the 470// underlying VCS reports version information, no guarantee is made. It is 471// expected that the caller either not care about order, or sort the result 472// themselves. 473// 474// This list is always retrieved from upstream on the first call. Subsequent 475// calls will return a cached version of the first call's results. if upstream 476// is not accessible (network outage, access issues, or the resource actually 477// went away), an error will be returned. 478func (sm *SourceMgr) ListVersions(id ProjectIdentifier) ([]PairedVersion, error) { 479 if atomic.LoadInt32(&sm.releasing) == 1 { 480 return nil, ErrSourceManagerIsReleased 481 } 482 483 srcg, err := sm.srcCoord.getSourceGatewayFor(context.TODO(), id) 484 if err != nil { 485 // TODO(sdboyer) More-er proper-er errors 486 return nil, err 487 } 488 489 return srcg.listVersions(context.TODO()) 490} 491 492// RevisionPresentIn indicates whether the provided Revision is present in the given 493// repository. 494func (sm *SourceMgr) RevisionPresentIn(id ProjectIdentifier, r Revision) (bool, error) { 495 if atomic.LoadInt32(&sm.releasing) == 1 { 496 return false, ErrSourceManagerIsReleased 497 } 498 499 srcg, err := sm.srcCoord.getSourceGatewayFor(context.TODO(), id) 500 if err != nil { 501 // TODO(sdboyer) More-er proper-er errors 502 return false, err 503 } 504 505 return srcg.revisionPresentIn(context.TODO(), r) 506} 507 508// SourceExists checks if a repository exists, either upstream or in the cache, 509// for the provided ProjectIdentifier. 510func (sm *SourceMgr) SourceExists(id ProjectIdentifier) (bool, error) { 511 if atomic.LoadInt32(&sm.releasing) == 1 { 512 return false, ErrSourceManagerIsReleased 513 } 514 515 srcg, err := sm.srcCoord.getSourceGatewayFor(context.TODO(), id) 516 if err != nil { 517 return false, err 518 } 519 520 ctx := context.TODO() 521 if err := srcg.existsInCache(ctx); err == nil { 522 return true, nil 523 } 524 if err := srcg.existsUpstream(ctx); err != nil { 525 return false, err 526 } 527 return true, nil 528} 529 530// SyncSourceFor will ensure that all local caches and information about a 531// source are up to date with any network-acccesible information. 532// 533// The primary use case for this is prefetching. 534func (sm *SourceMgr) SyncSourceFor(id ProjectIdentifier) error { 535 if atomic.LoadInt32(&sm.releasing) == 1 { 536 return ErrSourceManagerIsReleased 537 } 538 539 srcg, err := sm.srcCoord.getSourceGatewayFor(context.TODO(), id) 540 if err != nil { 541 return err 542 } 543 544 return srcg.syncLocal(context.TODO()) 545} 546 547// ExportProject writes out the tree of the provided ProjectIdentifier's 548// ProjectRoot, at the provided version, to the provided directory. 549func (sm *SourceMgr) ExportProject(ctx context.Context, id ProjectIdentifier, v Version, to string) error { 550 if atomic.LoadInt32(&sm.releasing) == 1 { 551 return ErrSourceManagerIsReleased 552 } 553 554 srcg, err := sm.srcCoord.getSourceGatewayFor(ctx, id) 555 if err != nil { 556 return err 557 } 558 559 return srcg.exportVersionTo(ctx, v, to) 560} 561 562// ExportPrunedProject writes out a tree of the provided LockedProject, applying 563// provided pruning rules as appropriate. 564func (sm *SourceMgr) ExportPrunedProject(ctx context.Context, lp LockedProject, prune PruneOptions, to string) error { 565 if atomic.LoadInt32(&sm.releasing) == 1 { 566 return ErrSourceManagerIsReleased 567 } 568 569 srcg, err := sm.srcCoord.getSourceGatewayFor(ctx, lp.Ident()) 570 if err != nil { 571 return err 572 } 573 574 return srcg.exportPrunedVersionTo(ctx, lp, prune, to) 575} 576 577// DeduceProjectRoot takes an import path and deduces the corresponding 578// project/source root. 579// 580// Note that some import paths may require network activity to correctly 581// determine the root of the path, such as, but not limited to, vanity import 582// paths. (A special exception is written for gopkg.in to minimize network 583// activity, as its behavior is well-structured) 584func (sm *SourceMgr) DeduceProjectRoot(ip string) (ProjectRoot, error) { 585 if atomic.LoadInt32(&sm.releasing) == 1 { 586 return "", ErrSourceManagerIsReleased 587 } 588 589 // TODO(sdboyer) refactor deduceRootPath() so that this validation can move 590 // back down below a cache point, rather than executing on every call. 591 if !pathvld.MatchString(ip) { 592 return "", errors.Errorf("%q is not a valid import path", ip) 593 } 594 595 pd, err := sm.deduceCoord.deduceRootPath(context.TODO(), ip) 596 return ProjectRoot(pd.root), err 597} 598 599// InferConstraint tries to puzzle out what kind of version is given in a 600// string. Preference is given first for branches, then semver constraints, then 601// plain tags, and then revisions. 602func (sm *SourceMgr) InferConstraint(s string, pi ProjectIdentifier) (Constraint, error) { 603 if s == "" { 604 return Any(), nil 605 } 606 607 // Lookup the string in the repository 608 var version PairedVersion 609 versions, err := sm.ListVersions(pi) 610 if err != nil { 611 return nil, errors.Wrapf(err, "list versions for %s", pi) // means repo does not exist 612 } 613 SortPairedForUpgrade(versions) 614 for _, v := range versions { 615 if s == v.String() { 616 version = v 617 break 618 } 619 } 620 621 // Branch 622 if version != nil && version.Type() == IsBranch { 623 return version.Unpair(), nil 624 } 625 626 // Semver Constraint 627 c, err := NewSemverConstraintIC(s) 628 if c != nil && err == nil { 629 return c, nil 630 } 631 632 // Tag 633 if version != nil { 634 return version.Unpair(), nil 635 } 636 637 // Revision, possibly abbreviated 638 r, err := sm.disambiguateRevision(context.TODO(), pi, Revision(s)) 639 if err == nil { 640 return r, nil 641 } 642 643 return nil, errors.Errorf("%s is not a valid version for the package %s(%s)", s, pi.ProjectRoot, pi.Source) 644} 645 646// SourceURLsForPath takes an import path and deduces the set of source URLs 647// that may refer to a canonical upstream source. 648// In general, these URLs differ only by protocol (e.g. https vs. ssh), not path 649func (sm *SourceMgr) SourceURLsForPath(ip string) ([]*url.URL, error) { 650 deduced, err := sm.deduceCoord.deduceRootPath(context.TODO(), ip) 651 if err != nil { 652 return nil, err 653 } 654 655 return deduced.mb.possibleURLs(), nil 656} 657 658// disambiguateRevision looks up a revision in the underlying source, spitting 659// it back out in an unabbreviated, disambiguated form. 660// 661// For example, if pi refers to a git-based project, then rev could be an 662// abbreviated git commit hash. disambiguateRevision would return the complete 663// hash. 664func (sm *SourceMgr) disambiguateRevision(ctx context.Context, pi ProjectIdentifier, rev Revision) (Revision, error) { 665 srcg, err := sm.srcCoord.getSourceGatewayFor(context.TODO(), pi) 666 if err != nil { 667 return "", err 668 } 669 return srcg.disambiguateRevision(ctx, rev) 670} 671 672type timeCount struct { 673 count int 674 start time.Time 675} 676 677type durCount struct { 678 count int 679 dur time.Duration 680} 681 682type supervisor struct { 683 ctx context.Context 684 mu sync.Mutex // Guards all maps 685 cond sync.Cond // Wraps mu so callers can wait until all calls end 686 running map[callInfo]timeCount 687 ran map[callType]durCount 688} 689 690func newSupervisor(ctx context.Context) *supervisor { 691 supv := &supervisor{ 692 ctx: ctx, 693 running: make(map[callInfo]timeCount), 694 ran: make(map[callType]durCount), 695 } 696 697 supv.cond = sync.Cond{L: &supv.mu} 698 return supv 699} 700 701// do executes the incoming closure using a conjoined context, and keeps 702// counters to ensure the sourceMgr can't finish Release()ing until after all 703// calls have returned. 704func (sup *supervisor) do(inctx context.Context, name string, typ callType, f func(context.Context) error) error { 705 ci := callInfo{ 706 name: name, 707 typ: typ, 708 } 709 710 octx, err := sup.start(ci) 711 if err != nil { 712 return err 713 } 714 715 cctx, cancelFunc := constext.Cons(inctx, octx) 716 err = f(cctx) 717 sup.done(ci) 718 cancelFunc() 719 return err 720} 721 722func (sup *supervisor) start(ci callInfo) (context.Context, error) { 723 sup.mu.Lock() 724 defer sup.mu.Unlock() 725 if err := sup.ctx.Err(); err != nil { 726 // We've already been canceled; error out. 727 return nil, err 728 } 729 730 if existingInfo, has := sup.running[ci]; has { 731 existingInfo.count++ 732 sup.running[ci] = existingInfo 733 } else { 734 sup.running[ci] = timeCount{ 735 count: 1, 736 start: time.Now(), 737 } 738 } 739 740 return sup.ctx, nil 741} 742 743func (sup *supervisor) count() int { 744 sup.mu.Lock() 745 defer sup.mu.Unlock() 746 return len(sup.running) 747} 748 749func (sup *supervisor) done(ci callInfo) { 750 sup.mu.Lock() 751 752 existingInfo, has := sup.running[ci] 753 if !has { 754 panic(fmt.Sprintf("sourceMgr: tried to complete a call that had not registered via run()")) 755 } 756 757 if existingInfo.count > 1 { 758 // If more than one is pending, don't stop the clock yet. 759 existingInfo.count-- 760 sup.running[ci] = existingInfo 761 } else { 762 // Last one for this particular key; update metrics with info. 763 durCnt := sup.ran[ci.typ] 764 durCnt.count++ 765 durCnt.dur += time.Since(existingInfo.start) 766 sup.ran[ci.typ] = durCnt 767 delete(sup.running, ci) 768 769 if len(sup.running) == 0 { 770 // This is the only place where we signal the cond, as it's the only 771 // time that the number of running calls could become zero. 772 sup.cond.Signal() 773 } 774 } 775 sup.mu.Unlock() 776} 777 778// wait until all active calls have terminated. 779// 780// Assumes something else has already canceled the supervisor via its context. 781func (sup *supervisor) wait() { 782 sup.cond.L.Lock() 783 for len(sup.running) > 0 { 784 sup.cond.Wait() 785 } 786 sup.cond.L.Unlock() 787} 788 789type callType uint 790 791const ( 792 ctHTTPMetadata callType = iota 793 ctListVersions 794 ctGetManifestAndLock 795 ctListPackages 796 ctSourcePing 797 ctSourceInit 798 ctSourceFetch 799 ctExportTree 800 ctValidateLocal 801) 802 803func (ct callType) String() string { 804 switch ct { 805 case ctHTTPMetadata: 806 return "Retrieving go get metadata" 807 case ctListVersions: 808 return "Retrieving latest version list" 809 case ctGetManifestAndLock: 810 return "Reading manifest and lock data" 811 case ctListPackages: 812 return "Parsing PackageTree" 813 case ctSourcePing: 814 return "Checking for upstream existence" 815 case ctSourceInit: 816 return "Initializing local source cache" 817 case ctSourceFetch: 818 return "Fetching latest data into local source cache" 819 case ctExportTree: 820 return "Writing code tree out to disk" 821 default: 822 panic("unknown calltype") 823 } 824} 825 826// callInfo provides metadata about an ongoing call. 827type callInfo struct { 828 name string 829 typ callType 830} 831