// Copyright (C) 2020 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.

package model

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/thejerf/suture/v4"

	"github.com/syncthing/syncthing/lib/config"
	"github.com/syncthing/syncthing/lib/db"
	"github.com/syncthing/syncthing/lib/events"
	"github.com/syncthing/syncthing/lib/protocol"
	"github.com/syncthing/syncthing/lib/svcutil"
)

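// indexHandler sends and receives index information for a single folder over
// a single connection. It runs as a service under the per-connection
// supervisor and can be paused and resumed as the folder is paused and
// resumed.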
type indexHandler struct {
	conn                     protocol.Connection
	downloads                *deviceDownloadState
	folder                   string
	folderIsReceiveEncrypted bool
	prevSequence             int64
	evLogger                 events.Logger
	token                    suture.ServiceToken

	cond   *sync.Cond
	paused bool
	fset   *db.FileSet
	runner service
}

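// newIndexHandler inspects the remote device's cluster config description of
// the folder (startInfo) and decides whether we can resume delta index
// sending from a previous sequence or must start from zero, dropping stale
// remote index data if their index ID has changed.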
func newIndexHandler(conn protocol.Connection, downloads *deviceDownloadState, folder config.FolderConfiguration, fset *db.FileSet, runner service, startInfo *clusterConfigDeviceInfo, evLogger events.Logger) *indexHandler {
	myIndexID := fset.IndexID(protocol.LocalDeviceID)
	mySequence := fset.Sequence(protocol.LocalDeviceID)
	var startSequence int64

	// This is the other side's description of what it knows
	// about us. Let's check whether we can start sending index
	// updates directly or need to send the index from the start.

	if startInfo.local.IndexID == myIndexID {
		// They say they've seen our index ID before, so we can
		// send a delta update only.

		if startInfo.local.MaxSequence > mySequence {
			// Safety check. They claim to have more or newer
			// index data than we have - either we have lost
			// index data, or we reset the index without resetting
			// the IndexID, or something else weird has
			// happened. We send a full index to reset the
			// situation.
			l.Infof("Device %v folder %s is delta index compatible, but seems out of sync with reality", conn.ID().Short(), folder.Description())
			startSequence = 0
		} else {
			l.Debugf("Device %v folder %s is delta index compatible (mlv=%d)", conn.ID().Short(), folder.Description(), startInfo.local.MaxSequence)
			startSequence = startInfo.local.MaxSequence
		}
	} else if startInfo.local.IndexID != 0 {
		// They say they've seen an index ID from us, but it's
		// not the right one. Either they are confused or we
		// must have reset our database since last talking to
		// them. We'll start with a full index transfer.
		l.Infof("Device %v folder %s has mismatching index ID for us (%v != %v)", conn.ID().Short(), folder.Description(), startInfo.local.IndexID, myIndexID)
		startSequence = 0
	} else {
		l.Debugf("Device %v folder %s has no index ID for us", conn.ID().Short(), folder.Description())
	}

	// This is the other side's description of themselves. We
	// check that it matches the IndexID we have on file,
	// otherwise we drop our old index data and expect to get a
	// completely new set.

	theirIndexID := fset.IndexID(conn.ID())
	if startInfo.remote.IndexID == 0 {
		// They're not announcing an index ID. This means they
		// do not support delta indexes and we should clear any
		// information we have from them before accepting their
		// index, which will presumably be a full index.
		l.Debugf("Device %v folder %s does not announce an index ID", conn.ID().Short(), folder.Description())
		fset.Drop(conn.ID())
	} else if startInfo.remote.IndexID != theirIndexID {
		// The index ID we have on file is not what they're
		// announcing. They must have reset their database and
		// will probably send us a full index. We drop any
		// information we have and remember this new index ID
		// instead.
		l.Infof("Device %v folder %s has a new index ID (%v)", conn.ID().Short(), folder.Description(), startInfo.remote.IndexID)
		fset.Drop(conn.ID())
		fset.SetIndexID(conn.ID(), startInfo.remote.IndexID)
	}

	return &indexHandler{
		conn:                     conn,
		downloads:                downloads,
		folder:                   folder.ID,
		folderIsReceiveEncrypted: folder.Type == config.FolderTypeReceiveEncrypted,
		prevSequence:             startSequence,
		evLogger:                 evLogger,

		fset:   fset,
		runner: runner,
		cond:   sync.NewCond(new(sync.Mutex)),
	}
}

// waitForFileset waits for the handler to resume and fetches the current fileset.
func (s *indexHandler) waitForFileset(ctx context.Context) (*db.FileSet, error) {
	s.cond.L.Lock()
	defer s.cond.L.Unlock()

	for s.paused {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		default:
			s.cond.Wait()
		}
	}

	return s.fset, nil
}

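// Serve implements suture.Service. It sends an initial index (or a delta of
// it), then keeps sending index updates whenever the local index for this
// folder advances, until the context is cancelled or sending fails.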
func (s *indexHandler) Serve(ctx context.Context) (err error) {
	l.Debugf("Starting index handler for %s to %s at %s (slv=%d)", s.folder, s.conn.ID(), s.conn, s.prevSequence)
	stop := make(chan struct{})

	defer func() {
		err = svcutil.NoRestartErr(err)
		l.Debugf("Exiting index handler for %s to %s at %s: %v", s.folder, s.conn.ID(), s.conn, err)
		close(stop)
	}()

	// Broadcast the pause cond when the context quits
	go func() {
		select {
		case <-ctx.Done():
			s.cond.Broadcast()
		case <-stop:
		}
	}()

	// We need to send one index, regardless of whether there is something to send or not
	fset, err := s.waitForFileset(ctx)
	if err != nil {
		return err
	}
	err = s.sendIndexTo(ctx, fset)

	// Subscribe to LocalIndexUpdated (we have new information to send) and
	// DeviceDisconnected (it might be us who disconnected, so we should
	// exit).
	sub := s.evLogger.Subscribe(events.LocalIndexUpdated | events.DeviceDisconnected)
	defer sub.Unsubscribe()

	evChan := sub.C()
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()

	for err == nil {
		fset, err = s.waitForFileset(ctx)
		if err != nil {
			return err
		}

		// While we have sent a sequence at least equal to the one
		// currently in the database, wait for the local index to update. The
		// local index may update for other folders than the one we are
		// sending for.
		if fset.Sequence(protocol.LocalDeviceID) <= s.prevSequence {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-evChan:
			case <-ticker.C:
			}
			continue
		}

		err = s.sendIndexTo(ctx, fset)

		// Wait a short amount of time before entering the next loop. If there
		// are continuous changes happening to the local index, this gives us
		// time to batch them up a little.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(250 * time.Millisecond):
		}
	}

	return err
}

// resume might be called because the folder was actually resumed, or just
// because the folder config changed (and thus the runner and potentially fset).
func (s *indexHandler) resume(fset *db.FileSet, runner service) {
	s.cond.L.Lock()
	s.paused = false
	s.fset = fset
	s.runner = runner
	s.cond.Broadcast()
	s.cond.L.Unlock()
}

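// pause marks the handler as paused and drops its references to the fileset
// and folder runner, so that Serve blocks in waitForFileset until resume is
// called.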
func (s *indexHandler) pause() {
	s.cond.L.Lock()
	if s.paused {
		s.evLogger.Log(events.Failure, "index handler got paused while already paused")
	}
	s.paused = true
	s.fset = nil
	s.runner = nil
	s.cond.Broadcast()
	s.cond.L.Unlock()
}

// sendIndexTo sends file infos with a sequence number higher than prevSequence
// and updates prevSequence to the highest sequence number that was sent.
func (s *indexHandler) sendIndexTo(ctx context.Context, fset *db.FileSet) error {
	initial := s.prevSequence == 0
	batch := db.NewFileInfoBatch(nil)
	batch.SetFlushFunc(func(fs []protocol.FileInfo) error {
		l.Debugf("%v: Sending %d files (<%d bytes)", s, len(fs), batch.Size())
		if initial {
			initial = false
			return s.conn.Index(ctx, s.folder, fs)
		}
		return s.conn.IndexUpdate(ctx, s.folder, fs)
	})

	var err error
	var f protocol.FileInfo
	snap, err := fset.Snapshot()
	if err != nil {
		return svcutil.AsFatalErr(err, svcutil.ExitError)
	}
	defer snap.Release()
	previousWasDelete := false
	snap.WithHaveSequence(s.prevSequence+1, func(fi protocol.FileIntf) bool {
		// This is to make sure that renames (which are an add followed by a delete) land in the same batch.
		// Even if the batch is full, we allow a last delete to slip in; we do this by making sure that
		// the batch ends with a non-delete, or that the last item in the batch is already a delete.
		if batch.Full() && (!fi.IsDeleted() || previousWasDelete) {
			if err = batch.Flush(); err != nil {
				return false
			}
		}

		if shouldDebug() {
			if fi.SequenceNo() < s.prevSequence+1 {
				panic(fmt.Sprintln("sequence lower than requested, got:", fi.SequenceNo(), ", asked to start at:", s.prevSequence+1))
			}
		}

		if f.Sequence > 0 && fi.SequenceNo() <= f.Sequence {
			l.Warnln("Non-increasing sequence detected: Checking and repairing the db...")
			// Abort this round of index sending - the next one will pick
			// up from the last successful one with the repaired db.
			defer func() {
				if fixed, dbErr := fset.RepairSequence(); dbErr != nil {
					l.Warnln("Failed repairing sequence entries:", dbErr)
					panic("Failed repairing sequence entries")
				} else {
					s.evLogger.Log(events.Failure, "detected and repaired non-increasing sequence")
					l.Infof("Repaired %v sequence entries in database", fixed)
				}
			}()
			return false
		}

		f = fi.(protocol.FileInfo)

		// If this is a folder receiving encrypted files only, we
		// mustn't ever send locally changed file infos. Those aren't
		// encrypted and thus would be a protocol error at the remote.
		if s.folderIsReceiveEncrypted && fi.IsReceiveOnlyChanged() {
			return true
		}

		f = prepareFileInfoForIndex(f)

		previousWasDelete = f.IsDeleted()

		batch.Append(f)
		return true
	})
	if err != nil {
		return err
	}

	err = batch.Flush()

	// True if there was nothing to be sent
	if f.Sequence == 0 {
		return err
	}

	s.prevSequence = f.Sequence
	return err
}

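// receive applies an incoming Index or IndexUpdate message from the remote
// device to the folder's fileset and schedules a pull. It fails if the folder
// is currently paused.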
func (s *indexHandler) receive(fs []protocol.FileInfo, update bool, op string) error {
	deviceID := s.conn.ID()

	s.cond.L.Lock()
	paused := s.paused
	fset := s.fset
	runner := s.runner
	s.cond.L.Unlock()

	if paused {
		l.Infof("%v for paused folder %q", op, s.folder)
		return fmt.Errorf("%v: %w", s.folder, ErrFolderPaused)
	}

	defer runner.SchedulePull()

	s.downloads.Update(s.folder, makeForgetUpdate(fs))

	if !update {
		fset.Drop(deviceID)
	}
	for i := range fs {
		// The local attributes should never be transmitted over the wire.
		// Make sure they look like they weren't.
		fs[i].LocalFlags = 0
		fs[i].VersionHash = nil
	}
	fset.Update(deviceID, fs)

	seq := fset.Sequence(deviceID)
	s.evLogger.Log(events.RemoteIndexUpdated, map[string]interface{}{
		"device":   deviceID.String(),
		"folder":   s.folder,
		"items":    len(fs),
		"sequence": seq,
		"version":  seq, // legacy for sequence
	})

	return nil
}

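// prepareFileInfoForIndex returns a copy of the file info adjusted for
// sending to a remote device: local-only flags are reflected in the invalid
// bit, the version is cleared for receive-only changed files, and the local
// flags and version hash are stripped.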
func prepareFileInfoForIndex(f protocol.FileInfo) protocol.FileInfo {
	// Mark the file as invalid if any of the local bad stuff flags are set.
	f.RawInvalid = f.IsInvalid()
	// If the file is marked LocalReceive (i.e., changed locally on a
	// receive only folder) we do not want it to ever become the
	// globally best version, invalid or not.
	if f.IsReceiveOnlyChanged() {
		f.Version = protocol.Vector{}
	}
	// never sent externally
	f.LocalFlags = 0
	f.VersionHash = nil
	return f
}

func (s *indexHandler) String() string {
	return fmt.Sprintf("indexHandler@%p for %s to %s at %s", s, s.folder, s.conn.ID().Short(), s.conn)
}

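// indexHandlerRegistry tracks the index handlers for a single connection, one
// per shared folder, plus start info and folder state for folders whose
// handlers cannot be started yet. A rough lifecycle, sketched from the methods
// below (variable names are illustrative; the surrounding model wiring is
// assumed, not shown):
//
//	r := newIndexHandlerRegistry(conn, downloads, closed, parentSup, evLogger)
//	r.RegisterFolderState(folderCfg, fset, runner) // folder (re)configured
//	r.AddIndexInfo(folderCfg.ID, ccDeviceInfo)     // ClusterConfig received
//	err := r.ReceiveIndex(folderCfg.ID, files, update, op)
//	r.Remove(folderCfg.ID)                         // folder no longer shared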
type indexHandlerRegistry struct {
	sup           *suture.Supervisor
	evLogger      events.Logger
	conn          protocol.Connection
	downloads     *deviceDownloadState
	indexHandlers map[string]*indexHandler
	startInfos    map[string]*clusterConfigDeviceInfo
	folderStates  map[string]*indexHandlerFolderState
	mut           sync.Mutex
}

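// indexHandlerFolderState is the latest known configuration, fileset and
// runner for a folder, kept so that an index handler can be started later
// once the corresponding cluster config info arrives.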
type indexHandlerFolderState struct {
	cfg    config.FolderConfiguration
	fset   *db.FileSet
	runner service
}

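// newIndexHandlerRegistry creates a registry with its own supervisor, added
// to parentSup. When the connection's closed channel fires, the registry
// removes itself (and thus all its index handlers) from the parent.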
func newIndexHandlerRegistry(conn protocol.Connection, downloads *deviceDownloadState, closed chan struct{}, parentSup *suture.Supervisor, evLogger events.Logger) *indexHandlerRegistry {
	r := &indexHandlerRegistry{
		conn:          conn,
		downloads:     downloads,
		evLogger:      evLogger,
		indexHandlers: make(map[string]*indexHandler),
		startInfos:    make(map[string]*clusterConfigDeviceInfo),
		folderStates:  make(map[string]*indexHandlerFolderState),
		mut:           sync.Mutex{},
	}
	r.sup = suture.New(r.String(), svcutil.SpecWithDebugLogger(l))
	ourToken := parentSup.Add(r.sup)
	r.sup.Add(svcutil.AsService(func(ctx context.Context) error {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-closed:
			parentSup.Remove(ourToken)
		}
		return nil
	}, fmt.Sprintf("%v/waitForClosed", r)))
	return r
}

func (r *indexHandlerRegistry) String() string {
	return fmt.Sprintf("indexHandlerRegistry/%v", r.conn.ID().Short())
}

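// GetSupervisor returns the supervisor under which this registry's index
// handlers run.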
func (r *indexHandlerRegistry) GetSupervisor() *suture.Supervisor {
	return r.sup
}

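// startLocked stops and removes any existing index handler for the folder,
// then creates and starts a new one. The caller must hold r.mut.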
func (r *indexHandlerRegistry) startLocked(folder config.FolderConfiguration, fset *db.FileSet, runner service, startInfo *clusterConfigDeviceInfo) {
	if is, ok := r.indexHandlers[folder.ID]; ok {
		r.sup.RemoveAndWait(is.token, 0)
		delete(r.indexHandlers, folder.ID)
	}
	delete(r.startInfos, folder.ID)

	is := newIndexHandler(r.conn, r.downloads, folder, fset, runner, startInfo, r.evLogger)
	is.token = r.sup.Add(is)
	r.indexHandlers[folder.ID] = is
}

// AddIndexInfo starts an index handler for the given folder, unless it is
// paused. If it is paused, the given startInfo is stored to start the handler
// once the folder is resumed.
// If an index handler is already running, it will be stopped first.
func (r *indexHandlerRegistry) AddIndexInfo(folder string, startInfo *clusterConfigDeviceInfo) {
	r.mut.Lock()
	defer r.mut.Unlock()

	if is, ok := r.indexHandlers[folder]; ok {
		r.sup.RemoveAndWait(is.token, 0)
		delete(r.indexHandlers, folder)
		l.Debugf("Removed index sender for device %v and folder %v due to added pending", r.conn.ID().Short(), folder)
	}
	folderState, ok := r.folderStates[folder]
	if !ok {
		l.Debugf("Pending index handler for device %v and folder %v", r.conn.ID().Short(), folder)
		r.startInfos[folder] = startInfo
		return
	}
	r.startLocked(folderState.cfg, folderState.fset, folderState.runner, startInfo)
}

// Remove stops a running index handler or removes one pending to be started.
// It is a no-op if the folder isn't known.
func (r *indexHandlerRegistry) Remove(folder string) {
	r.mut.Lock()
	defer r.mut.Unlock()

	l.Debugf("Removing index handler for device %v and folder %v", r.conn.ID().Short(), folder)
	if is, ok := r.indexHandlers[folder]; ok {
		r.sup.RemoveAndWait(is.token, 0)
		delete(r.indexHandlers, folder)
	}
	delete(r.startInfos, folder)
	l.Debugf("Removed index handler for device %v and folder %v", r.conn.ID().Short(), folder)
}

// RemoveAllExcept stops all running index handlers and removes those pending
// to be started, except for the given folders. Folders not known to the
// registry are ignored.
func (r *indexHandlerRegistry) RemoveAllExcept(except map[string]struct{}) {
	r.mut.Lock()
	defer r.mut.Unlock()

	for folder, is := range r.indexHandlers {
		if _, ok := except[folder]; !ok {
			r.sup.RemoveAndWait(is.token, 0)
			delete(r.indexHandlers, folder)
			l.Debugf("Removed index handler for device %v and folder %v (removeAllExcept)", r.conn.ID().Short(), folder)
		}
	}
	for folder := range r.startInfos {
		if _, ok := except[folder]; !ok {
			delete(r.startInfos, folder)
			l.Debugf("Removed pending index handler for device %v and folder %v (removeAllExcept)", r.conn.ID().Short(), folder)
		}
	}
}

// RegisterFolderState must be called whenever something about the folder
// changes. The exception is when the folder is removed entirely; in that case
// call Remove instead. The fset and runner arguments may be nil if the given
// folder is paused.
func (r *indexHandlerRegistry) RegisterFolderState(folder config.FolderConfiguration, fset *db.FileSet, runner service) {
	if !folder.SharedWith(r.conn.ID()) {
		r.Remove(folder.ID)
		return
	}

	r.mut.Lock()
	if folder.Paused {
		r.folderPausedLocked(folder.ID)
	} else {
		r.folderRunningLocked(folder, fset, runner)
	}
	r.mut.Unlock()
}

// folderPausedLocked stops a running index handler.
// It is a no-op if the folder isn't known or has not been started yet.
func (r *indexHandlerRegistry) folderPausedLocked(folder string) {
	l.Debugf("Pausing index handler for device %v and folder %v", r.conn.ID().Short(), folder)
	delete(r.folderStates, folder)
	if is, ok := r.indexHandlers[folder]; ok {
		is.pause()
		l.Debugf("Paused index handler for device %v and folder %v", r.conn.ID().Short(), folder)
	} else {
		l.Debugf("No index handler for device %v and folder %v to pause", r.conn.ID().Short(), folder)
	}
}

// folderRunningLocked resumes a paused index handler, or starts one if its
// start info arrived while the folder was paused.
// It is a no-op if the folder isn't known.
func (r *indexHandlerRegistry) folderRunningLocked(folder config.FolderConfiguration, fset *db.FileSet, runner service) {
	r.folderStates[folder.ID] = &indexHandlerFolderState{
		cfg:    folder,
		fset:   fset,
		runner: runner,
	}

	is, isOk := r.indexHandlers[folder.ID]
	if info, ok := r.startInfos[folder.ID]; ok {
		if isOk {
			r.sup.RemoveAndWait(is.token, 0)
			delete(r.indexHandlers, folder.ID)
			l.Debugf("Removed index handler for device %v and folder %v in resume", r.conn.ID().Short(), folder.ID)
		}
		r.startLocked(folder, fset, runner, info)
		delete(r.startInfos, folder.ID)
		l.Debugf("Started index handler for device %v and folder %v in resume", r.conn.ID().Short(), folder.ID)
	} else if isOk {
		l.Debugf("Resuming index handler for device %v and folder %v", r.conn.ID().Short(), folder.ID)
		is.resume(fset, runner)
	} else {
		l.Debugf("Not resuming index handler for device %v and folder %v as none is paused and there is no start info", r.conn.ID().Short(), folder.ID)
	}
}

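// ReceiveIndex dispatches an incoming Index or IndexUpdate message to the
// folder's index handler, or reports ErrFolderMissing if there is none (the
// folder is unknown or paused).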
func (r *indexHandlerRegistry) ReceiveIndex(folder string, fs []protocol.FileInfo, update bool, op string) error {
	r.mut.Lock()
	defer r.mut.Unlock()
	is, isOk := r.indexHandlers[folder]
	if !isOk {
		l.Infof("%v for nonexistent or paused folder %q", op, folder)
		return ErrFolderMissing
	}
	return is.receive(fs, update, op)
}

// makeForgetUpdate takes an index update and constructs a download progress
// update that causes us to forget any download progress for the files we've
// just been sent.
func makeForgetUpdate(files []protocol.FileInfo) []protocol.FileDownloadProgressUpdate {
	updates := make([]protocol.FileDownloadProgressUpdate, 0, len(files))
	for _, file := range files {
		if file.IsSymlink() || file.IsDirectory() || file.IsDeleted() {
			continue
		}
		updates = append(updates, protocol.FileDownloadProgressUpdate{
			Name:       file.Name,
			Version:    file.Version,
			UpdateType: protocol.FileDownloadProgressUpdateTypeForget,
		})
	}
	return updates
}