1// Copyright 2014 Google Inc. All Rights Reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Package scanner holds code for iterating through the contents of a CT log.
16package scanner
17
18import (
19	"context"
20	"fmt"
21	"sync"
22	"sync/atomic"
23	"time"
24
25	"github.com/golang/glog"
26	ct "github.com/google/certificate-transparency-go"
27	"github.com/google/certificate-transparency-go/client"
28	"github.com/google/certificate-transparency-go/x509"
29)
30
// ScannerOptions holds configuration options for the Scanner.
//
// The embedded FetcherOptions configure how entries are retrieved from the
// log (its StartIndex is also used for progress reporting); the remaining
// fields configure how fetched entries are matched and dispatched to the
// scan callbacks.
type ScannerOptions struct { // nolint:golint
	FetcherOptions

	// Matcher decides which entries are reported to the scan callbacks. It
	// must implement either Matcher (consulted with the parsed certificate
	// or precertificate of each entry) or LeafMatcher (consulted with the
	// raw leaf before any certificate parsing). Entries processed with a
	// matcher of any other dynamic type fail and are counted as unparsable.
	Matcher interface{}

	// PrecertOnly restricts reporting to precertificate entries: X.509
	// certificate entries are skipped without invoking the certificate
	// callback. Matcher still applies to precertificates.
	PrecertOnly bool

	// NumWorkers is the number of concurrent matcher goroutines a scan
	// starts.
	NumWorkers int

	// BufferSize is the capacity of the channel carrying fetched entries to
	// the matcher goroutines on their way to the callbacks (0 means
	// unbuffered).
	BufferSize int
}
49
50// DefaultScannerOptions returns a new ScannerOptions with sensible defaults.
51func DefaultScannerOptions() *ScannerOptions {
52	return &ScannerOptions{
53		FetcherOptions: *DefaultFetcherOptions(),
54		Matcher:        &MatchAll{},
55		PrecertOnly:    false,
56		NumWorkers:     1,
57	}
58}
59
60// Scanner is a tool to scan all the entries in a CT Log.
61type Scanner struct {
62	fetcher *Fetcher
63
64	// Configuration options for this Scanner instance.
65	opts ScannerOptions
66
67	// Counters of the number of certificates scanned and matched.
68	certsProcessed int64
69	certsMatched   int64
70
71	// Counter of the number of precertificates encountered during the scan.
72	precertsSeen int64
73
74	unparsableEntries         int64
75	entriesWithNonFatalErrors int64
76}
77
// entryInfo carries one fetched log entry, together with its position in the
// log, from the fetcher to the matcher goroutines.
type entryInfo struct {
	// The index of the entry containing the LeafInput in the log (the
	// batch's start index plus the entry's offset within the batch).
	index int64
	// The log entry returned by the log server.
	entry ct.LeafEntry
}
85
// isCertErrorFatal reports whether err, an error from parsing a log entry's
// [pre-]certificate (processMatcherEntry passes the error returned by
// RawLogEntry.ToLogEntry), should abort processing of that entry.
//
//   - A nil err is not fatal and has no side effects.
//   - An err that x509.IsFatal classifies as non-fatal is counted in
//     entriesWithNonFatalErrors and logged at verbosity 1 together with the
//     entry type of logEntry (which must therefore be non-nil in this case);
//     false is returned so processing continues.
//   - Any other err is fatal, and true is returned.
func (s *Scanner) isCertErrorFatal(err error, logEntry *ct.LogEntry, index int64) bool {
	if err == nil {
		return false
	}
	if x509.IsFatal(err) {
		return true
	}
	// Non-fatal: record it, but let the caller keep going.
	atomic.AddInt64(&s.entriesWithNonFatalErrors, 1)
	glog.V(1).Infof("Non-fatal error in %v at index %d: %v", logEntry.Leaf.TimestampedEntry.EntryType, index, err)
	return false
}
105
106// Processes the given entry in the specified log.
107func (s *Scanner) processEntry(info entryInfo, foundCert func(*ct.RawLogEntry), foundPrecert func(*ct.RawLogEntry)) error {
108	atomic.AddInt64(&s.certsProcessed, 1)
109
110	switch matcher := s.opts.Matcher.(type) {
111	case Matcher:
112		return s.processMatcherEntry(matcher, info, foundCert, foundPrecert)
113	case LeafMatcher:
114		return s.processMatcherLeafEntry(matcher, info, foundCert, foundPrecert)
115	default:
116		return fmt.Errorf("Unexpected matcher type %T", matcher)
117	}
118}
119
// processMatcherEntry handles one entry using a Matcher, which needs the fully
// parsed [pre-]certificate. The leaf is decoded into a RawLogEntry and then
// parsed. Non-fatal parse errors are tolerated (see isCertErrorFatal), while
// fatal ones are returned.
//
// X.509 entries are skipped when PrecertOnly is set; otherwise a matching
// certificate is counted and passed to foundCert. A matching precertificate
// is counted and passed to foundPrecert. Every precertificate, matching or
// not, increments precertsSeen.
func (s *Scanner) processMatcherEntry(matcher Matcher, info entryInfo, foundCert func(*ct.RawLogEntry), foundPrecert func(*ct.RawLogEntry)) error {
	raw, err := ct.RawLogEntryFromLeaf(info.index, &info.entry)
	if err != nil {
		return fmt.Errorf("failed to build raw log entry %d: %v", info.index, err)
	}

	parsed, err := raw.ToLogEntry()
	if s.isCertErrorFatal(err, parsed, info.index) {
		return fmt.Errorf("failed to parse [pre-]certificate in MerkleTreeLeaf[%d]: %v", info.index, err)
	}

	if cert := parsed.X509Cert; cert != nil {
		if s.opts.PrecertOnly {
			// Only precerts are wanted and this is an X.509 cert.
			return nil
		}
		if matcher.CertificateMatches(cert) {
			atomic.AddInt64(&s.certsMatched, 1)
			foundCert(raw)
		}
		return nil
	}

	if precert := parsed.Precert; precert != nil {
		if matcher.PrecertificateMatches(precert) {
			atomic.AddInt64(&s.certsMatched, 1)
			foundPrecert(raw)
		}
		atomic.AddInt64(&s.precertsSeen, 1)
		return nil
	}

	return fmt.Errorf("saw unknown entry type: %v", parsed.Leaf.TimestampedEntry.EntryType)
}
152
// processMatcherLeafEntry handles one entry using a LeafMatcher, which
// inspects the raw leaf without any certificate parsing. Only entries the
// matcher accepts are decoded and dispatched: X.509 entries go to foundCert
// (unless PrecertOnly is set) and precertificate entries go to foundPrecert.
// Each dispatched entry is counted in certsMatched, as processMatcherEntry
// does, so progress logging reports matches for LeafMatchers too.
//
// NOTE(review): precertsSeen is only incremented for precertificates the
// matcher accepted, whereas processMatcherEntry counts every precertificate.
// Counting all of them here would require decoding non-matching leaves too.
func (s *Scanner) processMatcherLeafEntry(matcher LeafMatcher, info entryInfo, foundCert func(*ct.RawLogEntry), foundPrecert func(*ct.RawLogEntry)) error {
	if !matcher.Matches(&info.entry) {
		return nil
	}

	rawLogEntry, err := ct.RawLogEntryFromLeaf(info.index, &info.entry)
	// Check the error like processMatcherEntry does, while still guarding
	// against a nil entry before it is dereferenced below.
	if err != nil || rawLogEntry == nil {
		return fmt.Errorf("failed to build raw log entry %d: %v", info.index, err)
	}
	switch eType := rawLogEntry.Leaf.TimestampedEntry.EntryType; eType {
	case ct.X509LogEntryType:
		if s.opts.PrecertOnly {
			// Only interested in precerts and this is an X.509 cert, early-out.
			return nil
		}
		atomic.AddInt64(&s.certsMatched, 1)
		foundCert(rawLogEntry)
	case ct.PrecertLogEntryType:
		atomic.AddInt64(&s.certsMatched, 1)
		foundPrecert(rawLogEntry)
		atomic.AddInt64(&s.precertsSeen, 1)
	default:
		return fmt.Errorf("saw unknown entry type: %v", eType)
	}
	return nil
}
177
// matcherJob is the body of a matcher worker goroutine. It processes every
// entry received on entries until that channel is closed, then returns.
// Matches are reported through foundCert/foundPrecert. An entry that fails
// processing is counted in unparsableEntries and logged, and it does not stop
// the worker.
func (s *Scanner) matcherJob(entries <-chan entryInfo, foundCert func(*ct.RawLogEntry), foundPrecert func(*ct.RawLogEntry)) {
	for {
		info, ok := <-entries
		if !ok {
			return
		}
		err := s.processEntry(info, foundCert, foundPrecert)
		if err == nil {
			continue
		}
		atomic.AddInt64(&s.unparsableEntries, 1)
		glog.Errorf("Failed to parse entry at index %d: %s", info.index, err.Error())
	}
}
189
// humanTime renders dur as whole hours, minutes and seconds for log output,
// e.g. "1 hours 2 minutes 3 seconds ".
//
// Hour and minute components that are not positive are omitted. The seconds
// component is emitted when positive, or when nothing else was emitted. The
// result always ends with a trailing space, and sub-second precision is
// truncated.
func humanTime(dur time.Duration) string {
	hours := int(dur / time.Hour)
	minutes := int(dur % time.Hour / time.Minute)
	seconds := int(dur % time.Minute / time.Second)

	leading := []struct {
		count  int
		format string
	}{
		{hours, "%d hours "},
		{minutes, "%d minutes "},
	}

	out := ""
	for _, part := range leading {
		if part.count > 0 {
			out += fmt.Sprintf(part.format, part.count)
		}
	}
	if seconds > 0 || out == "" {
		out += fmt.Sprintf("%d seconds ", seconds)
	}
	return out
}
209
// logThroughput logs scan progress once per second until a value is received
// from stop (or stop is closed).
//
// Each tick samples certsProcessed and certsMatched and records the number of
// entries processed since the previous tick in a sliding window of the last
// wndSize ticks. It then logs the window-averaged throughput and an ETA for the
// remaining entries of a tree of treeSize entries, measured from
// s.opts.StartIndex. When the window shows no progress the ETA is reported as
// "unknown". When nothing has been processed yet the match percentage is
// reported as 0.
func (s *Scanner) logThroughput(treeSize int64, stop <-chan bool) {
	const wndSize = 15
	wnd := make([]int64, wndSize)
	wndTotal := int64(0)

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for slot, filled, prevCnt := 0, 0, int64(0); ; slot = (slot + 1) % wndSize {
		select {
		case <-stop:
			return
		case <-ticker.C:
			certsCnt := atomic.LoadInt64(&s.certsProcessed)
			certsMatched := atomic.LoadInt64(&s.certsMatched)

			// Overwrite the oldest sample in the ring buffer with the number
			// of entries processed during the last tick, keeping the running
			// window total in sync.
			slotValue := certsCnt - prevCnt
			wndTotal += slotValue - wnd[slot]
			wnd[slot], prevCnt = slotValue, certsCnt

			if filled < wndSize {
				filled++
			}

			throughput := float64(wndTotal) / float64(filled)

			// With no progress in the window, dividing by the zero throughput
			// would yield +Inf, and converting that to int gives an
			// implementation-defined (garbage) value.
			remainingString := "unknown"
			if throughput > 0 {
				remainingCerts := treeSize - int64(s.opts.StartIndex) - certsCnt
				remainingSeconds := int(float64(remainingCerts) / throughput)
				remainingString = humanTime(time.Duration(remainingSeconds) * time.Second)
			}

			// Avoid a NaN percentage before the first entry is processed.
			matchedPct := 0.0
			if certsCnt > 0 {
				matchedPct = (100.0 * float64(certsMatched)) / float64(certsCnt)
			}

			glog.V(1).Infof("Processed: %d certs (to index %d), matched %d (%2.2f%%). Throughput (last %ds): %3.2f ETA: %s\n",
				certsCnt, s.opts.StartIndex+certsCnt, certsMatched,
				matchedPct,
				filled, throughput, remainingString)
		}
	}
}
245
// Scan performs a scan against the Log and blocks until it is complete. It is
// a convenience wrapper around ScanLog that discards the returned index and
// reports only the error.
//
// foundCert is called with the RawLogEntry of each matching x509 certificate
// entry, and foundPrecert with the RawLogEntry of each matching precert entry.
// Each RawLogEntry is built from the entry's index in the log as well as its
// contents.
func (s *Scanner) Scan(ctx context.Context, foundCert func(*ct.RawLogEntry), foundPrecert func(*ct.RawLogEntry)) error {
	if _, err := s.ScanLog(ctx, foundCert, foundPrecert); err != nil {
		return err
	}
	return nil
}
256
// ScanLog performs a scan against the Log and blocks until it is complete.
//
// Statistics counters are reset, the fetcher is prepared, and
// s.opts.NumWorkers matcher goroutines consume the fetched entries. A value
// below 1 is treated as 1, because with no workers nothing would drain the
// entry channel and the scan would hang. foundCert and foundPrecert are
// invoked for each matching X.509 certificate and precertificate entry
// respectively. With more than one worker they are called concurrently from
// several goroutines.
//
// On success ScanLog returns the fetcher's EndIndex option as observed after
// the scan. NOTE(review): this was previously documented as "the count of
// scanned entries", but the value is an end index rather than a count relative
// to StartIndex. Confirm what callers expect. On failure it returns -1 and the
// error.
func (s *Scanner) ScanLog(ctx context.Context, foundCert func(*ct.RawLogEntry), foundPrecert func(*ct.RawLogEntry)) (int64, error) {
	glog.V(1).Infof("Starting up Scanner...")
	// Reset statistics from any previous scan; none of this scan's goroutines
	// have been started yet.
	s.certsProcessed = 0
	s.certsMatched = 0
	s.precertsSeen = 0
	s.unparsableEntries = 0
	s.entriesWithNonFatalErrors = 0

	sth, err := s.fetcher.Prepare(ctx)
	if err != nil {
		return -1, err
	}

	startTime := time.Now()
	stop := make(chan bool)
	go s.logThroughput(int64(sth.TreeSize), stop)
	defer func() {
		// The unbuffered send completes only once the progress logger has
		// received the stop signal.
		stop <- true
		close(stop)
	}()

	// With zero workers the fetch callback below would block forever once
	// the channel is full, and a negative capacity would make make() panic.
	numWorkers := s.opts.NumWorkers
	if numWorkers < 1 {
		numWorkers = 1
	}
	bufferSize := s.opts.BufferSize
	if bufferSize < 0 {
		bufferSize = 0
	}

	// Start matcher workers.
	var wg sync.WaitGroup
	entries := make(chan entryInfo, bufferSize)
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			s.matcherJob(entries, foundCert, foundPrecert)
			glog.V(1).Infof("Matcher %d finished", idx)
		}(w)
	}

	// Fan each fetched batch out to the workers as individual entries, each
	// tagged with its absolute index in the log.
	flatten := func(b EntryBatch) {
		for i, e := range b.Entries {
			entries <- entryInfo{index: b.Start + int64(i), entry: e}
		}
	}
	err = s.fetcher.Run(ctx, flatten)
	close(entries) // Causes matcher workers to terminate.
	wg.Wait()      // Wait until they terminate.
	if err != nil {
		return -1, err
	}

	glog.V(1).Infof("Completed %d certs in %s", atomic.LoadInt64(&s.certsProcessed), humanTime(time.Since(startTime)))
	glog.V(1).Infof("Saw %d precerts", atomic.LoadInt64(&s.precertsSeen))
	glog.V(1).Infof("Saw %d unparsable entries", atomic.LoadInt64(&s.unparsableEntries))
	glog.V(1).Infof("Saw %d non-fatal errors", atomic.LoadInt64(&s.entriesWithNonFatalErrors))

	return int64(s.fetcher.opts.EndIndex), nil
}
310
311// NewScanner creates a Scanner instance using client to talk to the log,
312// taking configuration options from opts.
313func NewScanner(client *client.LogClient, opts ScannerOptions) *Scanner {
314	var scanner Scanner
315	scanner.opts = opts
316	scanner.fetcher = NewFetcher(client, &scanner.opts.FetcherOptions)
317
318	// Set a default match-everything regex if none was provided.
319	if opts.Matcher == nil {
320		opts.Matcher = &MatchAll{}
321	}
322	return &scanner
323}
324