// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.

package model

import (
	"context"
	"fmt"
	"time"

	"github.com/syncthing/syncthing/lib/config"
	"github.com/syncthing/syncthing/lib/events"
	"github.com/syncthing/syncthing/lib/protocol"
	"github.com/syncthing/syncthing/lib/sync"
)

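// ProgressEmitter periodically reports the progress of all registered pullers,
// both as DownloadProgress events on the local event logger and as
// DownloadProgress messages to connected devices that have subscribed to
// temporary indexes.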
type ProgressEmitter struct {
	cfg                config.Wrapper
	registry           map[string]map[string]*sharedPullerState // folder: name: puller
	interval           time.Duration
	minBlocks          int
	sentDownloadStates map[protocol.DeviceID]*sentDownloadState // States representing what we've sent to the other peer via DownloadProgress messages.
	connections        map[protocol.DeviceID]protocol.Connection
	foldersByConns     map[protocol.DeviceID][]string
	disabled           bool
	evLogger           events.Logger
	mut                sync.Mutex

	timer *time.Timer
}

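// progressUpdate describes a batch of download progress updates to be sent to
// a single connection for a single folder.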
type progressUpdate struct {
	conn    protocol.Connection
	folder  string
	updates []protocol.FileDownloadProgressUpdate
}

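// send sends the queued updates as a DownloadProgress message on the
// connection. It may block if the connection is congested.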
func (p progressUpdate) send(ctx context.Context) {
	p.conn.DownloadProgress(ctx, p.folder, p.updates)
}

// NewProgressEmitter creates a new progress emitter which emits
// DownloadProgress events every interval.
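//
// A rough usage sketch (cfg, evLogger, ctx and pullerState are placeholders
// for values available to the caller):
//
//	emitter := NewProgressEmitter(cfg, evLogger)
//	go emitter.Serve(ctx)
//
//	emitter.Register(pullerState)
//	defer emitter.Deregister(pullerState)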
func NewProgressEmitter(cfg config.Wrapper, evLogger events.Logger) *ProgressEmitter {
	t := &ProgressEmitter{
		cfg:                cfg,
		registry:           make(map[string]map[string]*sharedPullerState),
		timer:              time.NewTimer(time.Millisecond),
		sentDownloadStates: make(map[protocol.DeviceID]*sentDownloadState),
		connections:        make(map[protocol.DeviceID]protocol.Connection),
		foldersByConns:     make(map[protocol.DeviceID][]string),
		evLogger:           evLogger,
		mut:                sync.NewMutex(),
	}

	t.CommitConfiguration(config.Configuration{}, cfg.RawCopy())

	return t
}

// Serve starts the progress emitter, which emits DownloadProgress events as
// progress happens.
func (t *ProgressEmitter) Serve(ctx context.Context) error {
	t.cfg.Subscribe(t)
	defer t.cfg.Unsubscribe(t)

	var lastUpdate time.Time
	var lastCount, newCount int
	for {
		select {
		case <-ctx.Done():
			l.Debugln("progress emitter: stopping")
			return nil
		case <-t.timer.C:
			t.mut.Lock()
			l.Debugln("progress emitter: timer - looking after", len(t.registry))

			newLastUpdated := lastUpdate
			newCount = t.lenRegistryLocked()
			var progressUpdates []progressUpdate
			for _, pullers := range t.registry {
				for _, puller := range pullers {
					if updated := puller.Updated(); updated.After(newLastUpdated) {
						newLastUpdated = updated
					}
				}
			}

			if !newLastUpdated.Equal(lastUpdate) || newCount != lastCount {
				lastUpdate = newLastUpdated
				lastCount = newCount
				t.sendDownloadProgressEventLocked()
				progressUpdates = t.computeProgressUpdates()
			} else {
				l.Debugln("progress emitter: nothing new")
			}

			if newCount != 0 {
				t.timer.Reset(t.interval)
			}
			t.mut.Unlock()

			// Do the sending outside of the lock.
			// If these sends block, the whole process of reporting progress to others stops, but that's
			// probably fine. It's better to stop this component from working under back-pressure than to
			// cause other components that rely on it to wait for locks.
			//
			// This might leave remote peers in a funky state where we are unable to communicate that we
			// no longer have something, but there is not much we can do about that here.
			for _, update := range progressUpdates {
				update.send(ctx)
			}
		}
	}
}

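// sendDownloadProgressEventLocked emits a DownloadProgress event describing
// the progress of every registered puller, grouped by folder and file name.
// The caller must hold t.mut.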
func (t *ProgressEmitter) sendDownloadProgressEventLocked() {
	output := make(map[string]map[string]*pullerProgress)
	for folder, pullers := range t.registry {
		if len(pullers) == 0 {
			continue
		}
		output[folder] = make(map[string]*pullerProgress)
		for name, puller := range pullers {
			output[folder][name] = puller.Progress()
		}
	}
	t.evLogger.Log(events.DownloadProgress, output)
	l.Debugf("progress emitter: emitting %#v", output)
}

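// computeProgressUpdates calculates the DownloadProgress updates that need to
// be sent to each connected device, based on what we have already told it, and
// prunes state for devices and folders that are no longer relevant. The caller
// must hold t.mut; the returned updates should be sent outside the lock.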
func (t *ProgressEmitter) computeProgressUpdates() []progressUpdate {
	var progressUpdates []progressUpdate
	for id, conn := range t.connections {
		for _, folder := range t.foldersByConns[id] {
			pullers, ok := t.registry[folder]
			if !ok {
				// There's never been any puller registered for this folder yet
				continue
			}

			state, ok := t.sentDownloadStates[id]
			if !ok {
				state = &sentDownloadState{
					folderStates: make(map[string]*sentFolderDownloadState),
				}
				t.sentDownloadStates[id] = state
			}

			activePullers := make([]*sharedPullerState, 0, len(pullers))
			for _, puller := range pullers {
				if puller.folder != folder || puller.file.IsSymlink() || puller.file.IsDirectory() || len(puller.file.Blocks) <= t.minBlocks {
					continue
				}
				activePullers = append(activePullers, puller)
			}

			// For every new puller that hasn't been seen before, send all the blocks the puller has available.
			// For every existing puller, check for new blocks and send updates for the new blocks only.
			// For every puller that we've seen before but is no longer there, send a forget message.
			updates := state.update(folder, activePullers)

			if len(updates) > 0 {
				progressUpdates = append(progressUpdates, progressUpdate{
					conn:    conn,
					folder:  folder,
					updates: updates,
				})
			}
		}
	}

	// Clean up sentDownloadStates for devices which we are no longer connected to.
	for id := range t.sentDownloadStates {
		_, ok := t.connections[id]
		if !ok {
			// Null out outstanding entries for device
			delete(t.sentDownloadStates, id)
		}
	}

	// If a folder was unshared from some device, tell it that all temp files
	// are now gone.
	for id, state := range t.sentDownloadStates {
		// For each of the folders that the state is aware of, try to match it
		// with a folder that is currently shared with this device.
	nextFolder:
		for _, folder := range state.folders() {
			for _, existingFolder := range t.foldersByConns[id] {
				if existingFolder == folder {
					continue nextFolder
				}
			}

			// If we fail to find that folder, we tell the state to forget about it;
			// cleanup returns a list of updates which would clean up the state
			// on the remote end.
			state.cleanup(folder)
			// updates := state.cleanup(folder)
			// if len(updates) > 0 {
			// XXX: Don't send this now, as the only way we've unshared a folder
			// is by breaking the connection and reconnecting, hence sending
			// forget messages for some random folder currently makes no sense.
			// deviceConns[id].DownloadProgress(folder, updates, 0, nil)
			// }
		}
	}

	return progressUpdates
}

// VerifyConfiguration implements the config.Committer interface
func (t *ProgressEmitter) VerifyConfiguration(from, to config.Configuration) error {
	return nil
}

// CommitConfiguration implements the config.Committer interface
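//
// A ProgressUpdateIntervalS of zero or less disables the emitter; a positive
// value enables it and sets the reporting interval, which is clamped to at
// least one second.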
func (t *ProgressEmitter) CommitConfiguration(_, to config.Configuration) bool {
	t.mut.Lock()
	defer t.mut.Unlock()

	newInterval := time.Duration(to.Options.ProgressUpdateIntervalS) * time.Second
	if newInterval > 0 {
		if t.disabled {
			t.disabled = false
			l.Debugln("progress emitter: enabled")
		}
		if t.interval != newInterval {
			t.interval = newInterval
			l.Debugln("progress emitter: updated interval", t.interval)
		}
	} else if !t.disabled {
		t.clearLocked()
		t.disabled = true
		l.Debugln("progress emitter: disabled")
	}
	t.minBlocks = to.Options.TempIndexMinBlocks
	if t.interval < time.Second {
		// can't happen when we're not disabled, but better safe than sorry.
		t.interval = time.Second
	}

	return true
}

// Register a puller with the emitter, which will start broadcasting the
// puller's progress.
func (t *ProgressEmitter) Register(s *sharedPullerState) {
	t.mut.Lock()
	defer t.mut.Unlock()
	if t.disabled {
		l.Debugln("progress emitter: disabled, skip registering")
		return
	}
	l.Debugln("progress emitter: registering", s.folder, s.file.Name)
	if t.emptyLocked() {
		t.timer.Reset(t.interval)
	}
	if _, ok := t.registry[s.folder]; !ok {
		t.registry[s.folder] = make(map[string]*sharedPullerState)
	}
	t.registry[s.folder][s.file.Name] = s
}

// Deregister a puller, which will stop broadcasting the puller's state.
func (t *ProgressEmitter) Deregister(s *sharedPullerState) {
	t.mut.Lock()
	defer t.mut.Unlock()

	if t.disabled {
		l.Debugln("progress emitter: disabled, skip deregistering")
		return
	}

	l.Debugln("progress emitter: deregistering", s.folder, s.file.Name)
	delete(t.registry[s.folder], s.file.Name)
}

// BytesCompleted returns the number of bytes completed in the given folder.
func (t *ProgressEmitter) BytesCompleted(folder string) (bytes int64) {
	t.mut.Lock()
	defer t.mut.Unlock()

	for _, s := range t.registry[folder] {
		bytes += s.Progress().BytesDone
	}
	l.Debugf("progress emitter: bytes completed for %s: %d", folder, bytes)
	return
}

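// String implements fmt.Stringer and identifies this emitter instance in logs.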
func (t *ProgressEmitter) String() string {
	return fmt.Sprintf("ProgressEmitter@%p", t)
}

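// lenRegistry returns the total number of registered pullers across all
// folders.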
func (t *ProgressEmitter) lenRegistry() int {
	t.mut.Lock()
	defer t.mut.Unlock()
	return t.lenRegistryLocked()
}

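// lenRegistryLocked is like lenRegistry, but the caller must hold t.mut.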
func (t *ProgressEmitter) lenRegistryLocked() (out int) {
	for _, pullers := range t.registry {
		out += len(pullers)
	}
	return out
}

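// emptyLocked reports whether no pullers are currently registered. The caller
// must hold t.mut.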
func (t *ProgressEmitter) emptyLocked() bool {
	for _, pullers := range t.registry {
		if len(pullers) != 0 {
			return false
		}
	}
	return true
}

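// temporaryIndexSubscribe starts tracking the given connection, noting which
// folders it should receive DownloadProgress messages for.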
func (t *ProgressEmitter) temporaryIndexSubscribe(conn protocol.Connection, folders []string) {
	t.mut.Lock()
	defer t.mut.Unlock()
	t.connections[conn.ID()] = conn
	t.foldersByConns[conn.ID()] = folders
}

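// temporaryIndexUnsubscribe stops tracking the given connection.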
func (t *ProgressEmitter) temporaryIndexUnsubscribe(conn protocol.Connection) {
	t.mut.Lock()
	defer t.mut.Unlock()
	delete(t.connections, conn.ID())
	delete(t.foldersByConns, conn.ID())
}

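// clearLocked tells all connected devices to forget any previously sent
// download progress and resets the emitter's internal state. The caller must
// hold t.mut.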
func (t *ProgressEmitter) clearLocked() {
	for id, state := range t.sentDownloadStates {
		conn, ok := t.connections[id]
		if !ok {
			continue
		}
		for _, folder := range state.folders() {
			if updates := state.cleanup(folder); len(updates) > 0 {
				conn.DownloadProgress(context.Background(), folder, updates)
			}
		}
	}
	t.registry = make(map[string]map[string]*sharedPullerState)
	t.sentDownloadStates = make(map[protocol.DeviceID]*sentDownloadState)
	t.connections = make(map[protocol.DeviceID]protocol.Connection)
	t.foldersByConns = make(map[protocol.DeviceID][]string)
}