1// Copyright (C) 2014 The Syncthing Authors. 2// 3// This Source Code Form is subject to the terms of the Mozilla Public 4// License, v. 2.0. If a copy of the MPL was not distributed with this file, 5// You can obtain one at https://mozilla.org/MPL/2.0/. 6 7package model 8 9import ( 10 "context" 11 "fmt" 12 "time" 13 14 "github.com/syncthing/syncthing/lib/config" 15 "github.com/syncthing/syncthing/lib/events" 16 "github.com/syncthing/syncthing/lib/protocol" 17 "github.com/syncthing/syncthing/lib/sync" 18) 19 20type ProgressEmitter struct { 21 cfg config.Wrapper 22 registry map[string]map[string]*sharedPullerState // folder: name: puller 23 interval time.Duration 24 minBlocks int 25 sentDownloadStates map[protocol.DeviceID]*sentDownloadState // States representing what we've sent to the other peer via DownloadProgress messages. 26 connections map[protocol.DeviceID]protocol.Connection 27 foldersByConns map[protocol.DeviceID][]string 28 disabled bool 29 evLogger events.Logger 30 mut sync.Mutex 31 32 timer *time.Timer 33} 34 35type progressUpdate struct { 36 conn protocol.Connection 37 folder string 38 updates []protocol.FileDownloadProgressUpdate 39} 40 41func (p progressUpdate) send(ctx context.Context) { 42 p.conn.DownloadProgress(ctx, p.folder, p.updates) 43} 44 45// NewProgressEmitter creates a new progress emitter which emits 46// DownloadProgress events every interval. 47func NewProgressEmitter(cfg config.Wrapper, evLogger events.Logger) *ProgressEmitter { 48 t := &ProgressEmitter{ 49 cfg: cfg, 50 registry: make(map[string]map[string]*sharedPullerState), 51 timer: time.NewTimer(time.Millisecond), 52 sentDownloadStates: make(map[protocol.DeviceID]*sentDownloadState), 53 connections: make(map[protocol.DeviceID]protocol.Connection), 54 foldersByConns: make(map[protocol.DeviceID][]string), 55 evLogger: evLogger, 56 mut: sync.NewMutex(), 57 } 58 59 t.CommitConfiguration(config.Configuration{}, cfg.RawCopy()) 60 61 return t 62} 63 64// serve starts the progress emitter which starts emitting DownloadProgress 65// events as the progress happens. 66func (t *ProgressEmitter) Serve(ctx context.Context) error { 67 t.cfg.Subscribe(t) 68 defer t.cfg.Unsubscribe(t) 69 70 var lastUpdate time.Time 71 var lastCount, newCount int 72 for { 73 select { 74 case <-ctx.Done(): 75 l.Debugln("progress emitter: stopping") 76 return nil 77 case <-t.timer.C: 78 t.mut.Lock() 79 l.Debugln("progress emitter: timer - looking after", len(t.registry)) 80 81 newLastUpdated := lastUpdate 82 newCount = t.lenRegistryLocked() 83 var progressUpdates []progressUpdate 84 for _, pullers := range t.registry { 85 for _, puller := range pullers { 86 if updated := puller.Updated(); updated.After(newLastUpdated) { 87 newLastUpdated = updated 88 } 89 } 90 } 91 92 if !newLastUpdated.Equal(lastUpdate) || newCount != lastCount { 93 lastUpdate = newLastUpdated 94 lastCount = newCount 95 t.sendDownloadProgressEventLocked() 96 progressUpdates = t.computeProgressUpdates() 97 } else { 98 l.Debugln("progress emitter: nothing new") 99 } 100 101 if newCount != 0 { 102 t.timer.Reset(t.interval) 103 } 104 t.mut.Unlock() 105 106 // Do the sending outside of the lock. 107 // If these send block, the whole process of reporting progress to others stops, but that's probably fine. 108 // It's better to stop this component from working under back-pressure than causing other components that 109 // rely on this component to be waiting for locks. 110 // 111 // This might leave remote peers in some funky state where we are unable the fact that we no longer have 112 // something, but there is not much we can do here. 113 for _, update := range progressUpdates { 114 update.send(ctx) 115 } 116 } 117 } 118} 119 120func (t *ProgressEmitter) sendDownloadProgressEventLocked() { 121 output := make(map[string]map[string]*pullerProgress) 122 for folder, pullers := range t.registry { 123 if len(pullers) == 0 { 124 continue 125 } 126 output[folder] = make(map[string]*pullerProgress) 127 for name, puller := range pullers { 128 output[folder][name] = puller.Progress() 129 } 130 } 131 t.evLogger.Log(events.DownloadProgress, output) 132 l.Debugf("progress emitter: emitting %#v", output) 133} 134 135func (t *ProgressEmitter) computeProgressUpdates() []progressUpdate { 136 var progressUpdates []progressUpdate 137 for id, conn := range t.connections { 138 for _, folder := range t.foldersByConns[id] { 139 pullers, ok := t.registry[folder] 140 if !ok { 141 // There's never been any puller registered for this folder yet 142 continue 143 } 144 145 state, ok := t.sentDownloadStates[id] 146 if !ok { 147 state = &sentDownloadState{ 148 folderStates: make(map[string]*sentFolderDownloadState), 149 } 150 t.sentDownloadStates[id] = state 151 } 152 153 activePullers := make([]*sharedPullerState, 0, len(pullers)) 154 for _, puller := range pullers { 155 if puller.folder != folder || puller.file.IsSymlink() || puller.file.IsDirectory() || len(puller.file.Blocks) <= t.minBlocks { 156 continue 157 } 158 activePullers = append(activePullers, puller) 159 } 160 161 // For every new puller that hasn't yet been seen, it will send all the blocks the puller has available 162 // For every existing puller, it will check for new blocks, and send update for the new blocks only 163 // For every puller that we've seen before but is no longer there, we will send a forget message 164 updates := state.update(folder, activePullers) 165 166 if len(updates) > 0 { 167 progressUpdates = append(progressUpdates, progressUpdate{ 168 conn: conn, 169 folder: folder, 170 updates: updates, 171 }) 172 } 173 } 174 } 175 176 // Clean up sentDownloadStates for devices which we are no longer connected to. 177 for id := range t.sentDownloadStates { 178 _, ok := t.connections[id] 179 if !ok { 180 // Null out outstanding entries for device 181 delete(t.sentDownloadStates, id) 182 } 183 } 184 185 // If a folder was unshared from some device, tell it that all temp files 186 // are now gone. 187 for id, state := range t.sentDownloadStates { 188 // For each of the folders that the state is aware of, 189 // try to match it with a shared folder we've discovered above, 190 nextFolder: 191 for _, folder := range state.folders() { 192 for _, existingFolder := range t.foldersByConns[id] { 193 if existingFolder == folder { 194 continue nextFolder 195 } 196 } 197 198 // If we fail to find that folder, we tell the state to forget about it 199 // and return us a list of updates which would clean up the state 200 // on the remote end. 201 state.cleanup(folder) 202 // updates := state.cleanup(folder) 203 // if len(updates) > 0 { 204 // XXX: Don't send this now, as the only way we've unshared a folder 205 // is by breaking the connection and reconnecting, hence sending 206 // forget messages for some random folder currently makes no sense. 207 // deviceConns[id].DownloadProgress(folder, updates, 0, nil) 208 // } 209 } 210 } 211 212 return progressUpdates 213} 214 215// VerifyConfiguration implements the config.Committer interface 216func (t *ProgressEmitter) VerifyConfiguration(from, to config.Configuration) error { 217 return nil 218} 219 220// CommitConfiguration implements the config.Committer interface 221func (t *ProgressEmitter) CommitConfiguration(_, to config.Configuration) bool { 222 t.mut.Lock() 223 defer t.mut.Unlock() 224 225 newInterval := time.Duration(to.Options.ProgressUpdateIntervalS) * time.Second 226 if newInterval > 0 { 227 if t.disabled { 228 t.disabled = false 229 l.Debugln("progress emitter: enabled") 230 } 231 if t.interval != newInterval { 232 t.interval = newInterval 233 l.Debugln("progress emitter: updated interval", t.interval) 234 } 235 } else if !t.disabled { 236 t.clearLocked() 237 t.disabled = true 238 l.Debugln("progress emitter: disabled") 239 } 240 t.minBlocks = to.Options.TempIndexMinBlocks 241 if t.interval < time.Second { 242 // can't happen when we're not disabled, but better safe than sorry. 243 t.interval = time.Second 244 } 245 246 return true 247} 248 249// Register a puller with the emitter which will start broadcasting pullers 250// progress. 251func (t *ProgressEmitter) Register(s *sharedPullerState) { 252 t.mut.Lock() 253 defer t.mut.Unlock() 254 if t.disabled { 255 l.Debugln("progress emitter: disabled, skip registering") 256 return 257 } 258 l.Debugln("progress emitter: registering", s.folder, s.file.Name) 259 if t.emptyLocked() { 260 t.timer.Reset(t.interval) 261 } 262 if _, ok := t.registry[s.folder]; !ok { 263 t.registry[s.folder] = make(map[string]*sharedPullerState) 264 } 265 t.registry[s.folder][s.file.Name] = s 266} 267 268// Deregister a puller which will stop broadcasting pullers state. 269func (t *ProgressEmitter) Deregister(s *sharedPullerState) { 270 t.mut.Lock() 271 defer t.mut.Unlock() 272 273 if t.disabled { 274 l.Debugln("progress emitter: disabled, skip deregistering") 275 return 276 } 277 278 l.Debugln("progress emitter: deregistering", s.folder, s.file.Name) 279 delete(t.registry[s.folder], s.file.Name) 280} 281 282// BytesCompleted returns the number of bytes completed in the given folder. 283func (t *ProgressEmitter) BytesCompleted(folder string) (bytes int64) { 284 t.mut.Lock() 285 defer t.mut.Unlock() 286 287 for _, s := range t.registry[folder] { 288 bytes += s.Progress().BytesDone 289 } 290 l.Debugf("progress emitter: bytes completed for %s: %d", folder, bytes) 291 return 292} 293 294func (t *ProgressEmitter) String() string { 295 return fmt.Sprintf("ProgressEmitter@%p", t) 296} 297 298func (t *ProgressEmitter) lenRegistry() int { 299 t.mut.Lock() 300 defer t.mut.Unlock() 301 return t.lenRegistryLocked() 302} 303 304func (t *ProgressEmitter) lenRegistryLocked() (out int) { 305 for _, pullers := range t.registry { 306 out += len(pullers) 307 } 308 return out 309} 310 311func (t *ProgressEmitter) emptyLocked() bool { 312 for _, pullers := range t.registry { 313 if len(pullers) != 0 { 314 return false 315 } 316 } 317 return true 318} 319 320func (t *ProgressEmitter) temporaryIndexSubscribe(conn protocol.Connection, folders []string) { 321 t.mut.Lock() 322 defer t.mut.Unlock() 323 t.connections[conn.ID()] = conn 324 t.foldersByConns[conn.ID()] = folders 325} 326 327func (t *ProgressEmitter) temporaryIndexUnsubscribe(conn protocol.Connection) { 328 t.mut.Lock() 329 defer t.mut.Unlock() 330 delete(t.connections, conn.ID()) 331 delete(t.foldersByConns, conn.ID()) 332} 333 334func (t *ProgressEmitter) clearLocked() { 335 for id, state := range t.sentDownloadStates { 336 conn, ok := t.connections[id] 337 if !ok { 338 continue 339 } 340 for _, folder := range state.folders() { 341 if updates := state.cleanup(folder); len(updates) > 0 { 342 conn.DownloadProgress(context.Background(), folder, updates) 343 } 344 } 345 } 346 t.registry = make(map[string]map[string]*sharedPullerState) 347 t.sentDownloadStates = make(map[protocol.DeviceID]*sentDownloadState) 348 t.connections = make(map[protocol.DeviceID]protocol.Connection) 349 t.foldersByConns = make(map[protocol.DeviceID][]string) 350} 351