1// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2// All rights reserved.
3//
4// Use of this source code is governed by a BSD-style license that can be
5// found in the LICENSE file.
6
7package leveldb
8
9import (
10	"fmt"
11	"sync/atomic"
12	"time"
13
14	"github.com/syndtr/goleveldb/leveldb/journal"
15	"github.com/syndtr/goleveldb/leveldb/storage"
16)
17
18// Logging.
19
20type dropper struct {
21	s  *session
22	fd storage.FileDesc
23}
24
25func (d dropper) Drop(err error) {
26	if e, ok := err.(*journal.ErrCorrupted); ok {
27		d.s.logf("journal@drop %s-%d S·%s %q", d.fd.Type, d.fd.Num, shortenb(e.Size), e.Reason)
28	} else {
29		d.s.logf("journal@drop %s-%d %q", d.fd.Type, d.fd.Num, err)
30	}
31}
32
33func (s *session) log(v ...interface{})                 { s.stor.Log(fmt.Sprint(v...)) }
34func (s *session) logf(format string, v ...interface{}) { s.stor.Log(fmt.Sprintf(format, v...)) }
35
36// File utils.
37
38func (s *session) newTemp() storage.FileDesc {
39	num := atomic.AddInt64(&s.stTempFileNum, 1) - 1
40	return storage.FileDesc{Type: storage.TypeTemp, Num: num}
41}
42
43// Session state.
44
const (
	// maxCachedNumber represents the maximum number of version tasks
	// that can be cached in the ref loop. refLoop compares it against the
	// distance between the highest referenced version id and the next
	// version id awaiting processing.
	maxCachedNumber = 256

	// maxCachedTime represents the maximum time for ref loop to cache
	// a version task. It is checked against the task's creation time and
	// is also the interval the ref loop timer is reset to.
	maxCachedTime = 5 * time.Minute
)
54
// vDelta indicates the change information between the next version
// and the currently specified version. It is sent to the ref loop over
// the session's deltaCh by setVersion.
type vDelta struct {
	// vid is the id of the version the delta applies on top of; setVersion
	// fills it with the id of the version being replaced.
	vid int64
	// added lists the numbers of the table files added by the change.
	added []int64
	// deleted lists the numbers of the table files deleted by the change.
	deleted []int64
}
62
// vTask defines a version task for either reference or release.
type vTask struct {
	// vid is the id of the version being referenced or released.
	vid int64
	// files holds the table files of the version; the ref loop walks every
	// entry to adjust per-file reference counters.
	// NOTE(review): presumably one tFiles per level - confirm.
	files []tFiles
	// created is compared against maxCachedTime by the ref loop to decide
	// how long the task has been cached.
	created time.Time
}
69
// refLoop maintains the table file reference counters of the session. All
// bookkeeping maps are local to this function, so every change to them is
// made from this single loop.
//
// The loop serves the following channels:
//   - s.refCh:     a version starts being referenced (vTask);
//   - s.deltaCh:   the files added/deleted on top of a version (vDelta);
//   - s.relCh:     a version is released (vTask);
//   - s.abandon:   a version id that processing must skip over;
//   - s.fileRefCh: a request for a copy of the current counters;
//   - s.closeC:    shutdown; s.closeW.Done() is called and the loop returns.
//
// Whenever the counter of a table file drops to zero through a release or a
// delta, the file is handed to s.tops.remove.
//
// It panics on a duplicate reference, on a delta or release for an unknown
// version, and on a negative reference count.
//
// NOTE(review): presumably started in its own goroutine by the session -
// confirm against the caller.
func (s *session) refLoop() {
	var (
		fileRef    = make(map[int64]int)    // Table file reference counter
		ref        = make(map[int64]*vTask) // Current referencing version store
		// Delta received for a version that is still present in ref.
		deltas     = make(map[int64]*vDelta)
		// Versions whose files have been counted in full into fileRef.
		referenced = make(map[int64]struct{})
		released   = make(map[int64]*vDelta)  // Released versions that are waiting for processing
		abandoned  = make(map[int64]struct{}) // Abandoned version id
		// next is the version id processing is currently waiting on; last is
		// the highest version id received on s.refCh so far.
		next, last int64
	)
	// addFileRef adds file reference counter with specified file number and
	// reference value. It returns the resulting count, drops the map entry
	// once the count reaches zero and panics if the count would go negative.
	addFileRef := func(fnum int64, ref int) int {
		ref += fileRef[fnum]
		if ref > 0 {
			fileRef[fnum] = ref
		} else if ref == 0 {
			delete(fileRef, fnum)
		} else {
			panic(fmt.Sprintf("negative ref: %v", fnum))
		}
		return ref
	}
	// skipAbandoned skips useless abandoned version id. It reports whether
	// next was abandoned and forgets the id; the caller advances next.
	skipAbandoned := func() bool {
		if _, exist := abandoned[next]; exist {
			delete(abandoned, next)
			return true
		}
		return false
	}
	// applyDelta applies version change to current file reference: added
	// files gain one reference, deleted files lose one and are removed via
	// s.tops.remove when no reference is left.
	applyDelta := func(d *vDelta) {
		for _, t := range d.added {
			addFileRef(t, 1)
		}
		for _, t := range d.deleted {
			if addFileRef(t, -1) == 0 {
				s.tops.remove(storage.FileDesc{Type: storage.TypeTable, Num: t})
			}
		}
	}

	timer := time.NewTimer(0)
	<-timer.C // discard the initial tick
	defer timer.Stop()

	// processTasks processes version tasks in strict order.
	//
	// If we want to use delta to reduce the cost of file references and dereferences,
	// we must strictly follow the id of the version, otherwise some files that are
	// being referenced will be deleted.
	//
	// In addition, some db operations (such as iterators) may cause a version to be
	// referenced for a long time. In order to prevent such operations from blocking
	// the entire processing queue, we will properly convert some of the version tasks
	// into full file references and releases.
	processTasks := func() {
		// Re-arm the timer so the loop wakes up again after maxCachedTime
		// even when no request arrives.
		timer.Reset(maxCachedTime)
		// Make sure we don't cache too many version tasks.
		for {
			// Skip any abandoned version number to prevent blocking processing.
			if skipAbandoned() {
				next += 1
				continue
			}
			// Don't bother the version that has been released.
			if _, exist := released[next]; exist {
				break
			}
			// Ensure the specified version has been referenced.
			if _, exist := ref[next]; !exist {
				break
			}
			// Keep the task cached while the backlog is small and the task
			// is still young.
			if last-next < maxCachedNumber && time.Since(ref[next].created) < maxCachedTime {
				break
			}
			// Convert version task into full file references and releases mode.
			// Reference version(i+1) first and wait version(i) to release.
			// FileRef(i+1) = FileRef(i) + Delta(i)
			for _, tt := range ref[next].files {
				for _, t := range tt {
					addFileRef(t.fd.Num, 1)
				}
			}
			// Note, if some compactions take a long time, even more than 5 minutes,
			// we may miss the corresponding delta information here.
			// Fortunately it will not affect the correctness of the file reference,
			// and we can apply the delta once we receive it.
			if d := deltas[next]; d != nil {
				applyDelta(d)
			}
			referenced[next] = struct{}{}
			delete(ref, next)
			delete(deltas, next)
			next += 1
		}

		// Use delta information to process all released versions.
		for {
			if skipAbandoned() {
				next += 1
				continue
			}
			if d, exist := released[next]; exist {
				// d is nil when the version was released before any delta
				// for it had been received.
				if d != nil {
					applyDelta(d)
				}
				delete(released, next)
				next += 1
				continue
			}
			return
		}
	}

	for {
		processTasks()

		select {
		case t := <-s.refCh:
			// A version starts being referenced; remember its task.
			if _, exist := ref[t.vid]; exist {
				panic("duplicate reference request")
			}
			ref[t.vid] = t
			if t.vid > last {
				last = t.vid
			}

		case d := <-s.deltaCh:
			if _, exist := ref[d.vid]; !exist {
				if _, exist2 := referenced[d.vid]; !exist2 {
					panic("invalid release request")
				}
				// The reference opt is already expired, apply
				// delta here.
				applyDelta(d)
				continue
			}
			// The version is still cached; keep the delta until the version
			// is converted or released.
			deltas[d.vid] = d

		case t := <-s.relCh:
			// A version converted to full references is released by
			// dereferencing each of its files.
			if _, exist := referenced[t.vid]; exist {
				for _, tt := range t.files {
					for _, t := range tt {
						if addFileRef(t.fd.Num, -1) == 0 {
							s.tops.remove(t.fd)
						}
					}
				}
				delete(referenced, t.vid)
				continue
			}
			if _, exist := ref[t.vid]; !exist {
				panic("invalid release request")
			}
			// Still cached: queue the release (with its delta, possibly nil)
			// for in-order processing by processTasks.
			released[t.vid] = deltas[t.vid]
			delete(deltas, t.vid)
			delete(ref, t.vid)

		case id := <-s.abandon:
			// Ids below next have already been passed and need no marker.
			if id >= next {
				abandoned[id] = struct{}{}
			}

		case <-timer.C:
			// Nothing to do here; the wake-up only triggers processTasks.

		case r := <-s.fileRefCh:
			// Hand out a copy so the requester never shares the live map.
			ref := make(map[int64]int)
			for f, c := range fileRef {
				ref[f] = c
			}
			r <- ref

		case <-s.closeC:
			s.closeW.Done()
			return
		}
	}
}
250
251// Get current version. This will incr version ref, must call
252// version.release (exactly once) after use.
253func (s *session) version() *version {
254	s.vmu.Lock()
255	defer s.vmu.Unlock()
256	s.stVersion.incref()
257	return s.stVersion
258}
259
260func (s *session) tLen(level int) int {
261	s.vmu.Lock()
262	defer s.vmu.Unlock()
263	return s.stVersion.tLen(level)
264}
265
266// Set current version to v.
267func (s *session) setVersion(r *sessionRecord, v *version) {
268	s.vmu.Lock()
269	defer s.vmu.Unlock()
270	// Hold by session. It is important to call this first before releasing
271	// current version, otherwise the still used files might get released.
272	v.incref()
273	if s.stVersion != nil {
274		if r != nil {
275			var (
276				added   = make([]int64, 0, len(r.addedTables))
277				deleted = make([]int64, 0, len(r.deletedTables))
278			)
279			for _, t := range r.addedTables {
280				added = append(added, t.num)
281			}
282			for _, t := range r.deletedTables {
283				deleted = append(deleted, t.num)
284			}
285			select {
286			case s.deltaCh <- &vDelta{vid: s.stVersion.id, added: added, deleted: deleted}:
287			case <-v.s.closeC:
288				s.log("reference loop already exist")
289			}
290		}
291		// Release current version.
292		s.stVersion.releaseNB()
293	}
294	s.stVersion = v
295}
296
297// Get current unused file number.
298func (s *session) nextFileNum() int64 {
299	return atomic.LoadInt64(&s.stNextFileNum)
300}
301
302// Set current unused file number to num.
303func (s *session) setNextFileNum(num int64) {
304	atomic.StoreInt64(&s.stNextFileNum, num)
305}
306
307// Mark file number as used.
308func (s *session) markFileNum(num int64) {
309	nextFileNum := num + 1
310	for {
311		old, x := atomic.LoadInt64(&s.stNextFileNum), nextFileNum
312		if old > x {
313			x = old
314		}
315		if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) {
316			break
317		}
318	}
319}
320
321// Allocate a file number.
322func (s *session) allocFileNum() int64 {
323	return atomic.AddInt64(&s.stNextFileNum, 1) - 1
324}
325
326// Reuse given file number.
327func (s *session) reuseFileNum(num int64) {
328	for {
329		old, x := atomic.LoadInt64(&s.stNextFileNum), num
330		if old != x+1 {
331			x = old
332		}
333		if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) {
334			break
335		}
336	}
337}
338
339// Set compaction ptr at given level; need external synchronization.
340func (s *session) setCompPtr(level int, ik internalKey) {
341	if level >= len(s.stCompPtrs) {
342		newCompPtrs := make([]internalKey, level+1)
343		copy(newCompPtrs, s.stCompPtrs)
344		s.stCompPtrs = newCompPtrs
345	}
346	s.stCompPtrs[level] = append(internalKey{}, ik...)
347}
348
349// Get compaction ptr at given level; need external synchronization.
350func (s *session) getCompPtr(level int) internalKey {
351	if level >= len(s.stCompPtrs) {
352		return nil
353	}
354	return s.stCompPtrs[level]
355}
356
357// Manifest related utils.
358
359// Fill given session record obj with current states; need external
360// synchronization.
361func (s *session) fillRecord(r *sessionRecord, snapshot bool) {
362	r.setNextFileNum(s.nextFileNum())
363
364	if snapshot {
365		if !r.has(recJournalNum) {
366			r.setJournalNum(s.stJournalNum)
367		}
368
369		if !r.has(recSeqNum) {
370			r.setSeqNum(s.stSeqNum)
371		}
372
373		for level, ik := range s.stCompPtrs {
374			if ik != nil {
375				r.addCompPtr(level, ik)
376			}
377		}
378
379		r.setComparer(s.icmp.uName())
380	}
381}
382
383// Mark if record has been committed, this will update session state;
384// need external synchronization.
385func (s *session) recordCommited(rec *sessionRecord) {
386	if rec.has(recJournalNum) {
387		s.stJournalNum = rec.journalNum
388	}
389
390	if rec.has(recPrevJournalNum) {
391		s.stPrevJournalNum = rec.prevJournalNum
392	}
393
394	if rec.has(recSeqNum) {
395		s.stSeqNum = rec.seqNum
396	}
397
398	for _, r := range rec.compPtrs {
399		s.setCompPtr(r.level, internalKey(r.ikey))
400	}
401}
402
403// Create a new manifest file; need external synchronization.
404func (s *session) newManifest(rec *sessionRecord, v *version) (err error) {
405	fd := storage.FileDesc{Type: storage.TypeManifest, Num: s.allocFileNum()}
406	writer, err := s.stor.Create(fd)
407	if err != nil {
408		return
409	}
410	jw := journal.NewWriter(writer)
411
412	if v == nil {
413		v = s.version()
414		defer v.release()
415	}
416	if rec == nil {
417		rec = &sessionRecord{}
418	}
419	s.fillRecord(rec, true)
420	v.fillRecord(rec)
421
422	defer func() {
423		if err == nil {
424			s.recordCommited(rec)
425			if s.manifest != nil {
426				s.manifest.Close()
427			}
428			if s.manifestWriter != nil {
429				s.manifestWriter.Close()
430			}
431			if !s.manifestFd.Zero() {
432				s.stor.Remove(s.manifestFd)
433			}
434			s.manifestFd = fd
435			s.manifestWriter = writer
436			s.manifest = jw
437		} else {
438			writer.Close()
439			s.stor.Remove(fd)
440			s.reuseFileNum(fd.Num)
441		}
442	}()
443
444	w, err := jw.Next()
445	if err != nil {
446		return
447	}
448	err = rec.encode(w)
449	if err != nil {
450		return
451	}
452	err = jw.Flush()
453	if err != nil {
454		return
455	}
456	err = s.stor.SetMeta(fd)
457	return
458}
459
460// Flush record to disk.
461func (s *session) flushManifest(rec *sessionRecord) (err error) {
462	s.fillRecord(rec, false)
463	w, err := s.manifest.Next()
464	if err != nil {
465		return
466	}
467	err = rec.encode(w)
468	if err != nil {
469		return
470	}
471	err = s.manifest.Flush()
472	if err != nil {
473		return
474	}
475	if !s.o.GetNoSync() {
476		err = s.manifestWriter.Sync()
477		if err != nil {
478			return
479		}
480	}
481	s.recordCommited(rec)
482	return
483}
484