1package lidar
2
3import (
4	"context"
5	"fmt"
6	"os"
7	"runtime/debug"
8	"strconv"
9	"sync"
10
11	"code.cloudfoundry.org/lager"
12	"code.cloudfoundry.org/lager/lagerctx"
13	"github.com/concourse/concourse/atc/db"
14	"github.com/concourse/concourse/atc/engine"
15	"github.com/concourse/concourse/atc/metric"
16	"github.com/concourse/concourse/tracing"
17)
18
19//go:generate counterfeiter . RateCalculator
20
// RateCalculator supplies the Limiter that the checker waits on before
// starting checks that were not manually triggered.
type RateCalculator interface {
	// RateLimiter returns the Limiter to use, or an error if one could not be
	// produced.
	RateLimiter() (Limiter, error)
}
24
25func NewChecker(
26	logger lager.Logger,
27	checkFactory db.CheckFactory,
28	engine engine.Engine,
29	checkRateCalculator RateCalculator,
30) *checker {
31	return &checker{
32		logger:              logger,
33		checkFactory:        checkFactory,
34		engine:              engine,
35		running:             &sync.Map{},
36		checkRateCalculator: checkRateCalculator,
37	}
38}
39
// checker finds started checks and runs each of them through the engine,
// remembering which ones are currently in flight.
type checker struct {
	logger lager.Logger

	// checkFactory is queried for the started checks on every Run.
	checkFactory db.CheckFactory
	// engine builds the runnable for each check.
	engine engine.Engine
	// checkRateCalculator provides the limiter that paces checks which were
	// not manually triggered.
	checkRateCalculator RateCalculator

	// running is keyed by check ID; an entry is stored when Run claims a check
	// and deleted when the goroutine running that check returns.
	running *sync.Map
}
49
50func (c *checker) Run(ctx context.Context) error {
51	c.logger.Info("start")
52	defer c.logger.Info("end")
53
54	checks, err := c.checkFactory.StartedChecks()
55	if err != nil {
56		c.logger.Error("failed-to-fetch-resource-checks", err)
57		return err
58	}
59
60	metric.Metrics.ChecksQueueSize.Set(int64(len(checks)))
61
62	if len(checks) == 0 {
63		return nil
64	}
65
66	limiter, err := c.checkRateCalculator.RateLimiter()
67	if err != nil {
68		return err
69	}
70
71	for _, ck := range checks {
72		if _, exists := c.running.LoadOrStore(ck.ID(), true); !exists {
73			if !ck.ManuallyTriggered() {
74				err := limiter.Wait(ctx)
75				if err != nil {
76					c.logger.Error("failed-to-wait-for-limiter", err)
77					continue
78				}
79			}
80
81			go func(check db.Check) {
82				loggerData := lager.Data{
83					"check_id": strconv.Itoa(check.ID()),
84				}
85				defer func() {
86					if r := recover(); r != nil {
87						err = fmt.Errorf("panic in checker check run %s: %v", loggerData, r)
88
89						fmt.Fprintf(os.Stderr, "%s\n %s\n", err.Error(), string(debug.Stack()))
90						c.logger.Error("panic-in-checker-check-run", err)
91
92						check.FinishWithError(err)
93					}
94				}()
95
96				spanCtx, span := tracing.StartSpanFollowing(
97					ctx,
98					check,
99					"checker.Run",
100					tracing.Attrs{
101						"team":                     check.TeamName(),
102						"pipeline":                 check.PipelineName(),
103						"check_id":                 strconv.Itoa(check.ID()),
104						"resource_config_scope_id": strconv.Itoa(check.ResourceConfigScopeID()),
105					},
106				)
107				defer span.End()
108				defer c.running.Delete(check.ID())
109
110				c.engine.NewCheck(check).Run(
111					lagerctx.NewContext(
112						spanCtx,
113						c.logger.WithData(loggerData),
114					),
115				)
116			}(ck)
117		}
118	}
119
120	return nil
121}
122