// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
6
7package leveldb
8
9import (
10	"fmt"
11	"sync/atomic"
12
13	"github.com/syndtr/goleveldb/leveldb/journal"
14	"github.com/syndtr/goleveldb/leveldb/storage"
15)
16
// Logging.
18
19type dropper struct {
20	s  *session
21	fd storage.FileDesc
22}
23
24func (d dropper) Drop(err error) {
25	if e, ok := err.(*journal.ErrCorrupted); ok {
26		d.s.logf("journal@drop %s-%d S·%s %q", d.fd.Type, d.fd.Num, shortenb(e.Size), e.Reason)
27	} else {
28		d.s.logf("journal@drop %s-%d %q", d.fd.Type, d.fd.Num, err)
29	}
30}
31
32func (s *session) log(v ...interface{})                 { s.stor.Log(fmt.Sprint(v...)) }
33func (s *session) logf(format string, v ...interface{}) { s.stor.Log(fmt.Sprintf(format, v...)) }
34
// File utils.
36
37func (s *session) newTemp() storage.FileDesc {
38	num := atomic.AddInt64(&s.stTempFileNum, 1) - 1
39	return storage.FileDesc{Type: storage.TypeTemp, Num: num}
40}
41
42func (s *session) addFileRef(fd storage.FileDesc, ref int) int {
43	ref += s.fileRef[fd.Num]
44	if ref > 0 {
45		s.fileRef[fd.Num] = ref
46	} else if ref == 0 {
47		delete(s.fileRef, fd.Num)
48	} else {
49		panic(fmt.Sprintf("negative ref: %v", fd))
50	}
51	return ref
52}
53
// Session state.
55
56// Get current version. This will incr version ref, must call
57// version.release (exactly once) after use.
58func (s *session) version() *version {
59	s.vmu.Lock()
60	defer s.vmu.Unlock()
61	s.stVersion.incref()
62	return s.stVersion
63}
64
65func (s *session) tLen(level int) int {
66	s.vmu.Lock()
67	defer s.vmu.Unlock()
68	return s.stVersion.tLen(level)
69}
70
71// Set current version to v.
72func (s *session) setVersion(v *version) {
73	s.vmu.Lock()
74	defer s.vmu.Unlock()
75	// Hold by session. It is important to call this first before releasing
76	// current version, otherwise the still used files might get released.
77	v.incref()
78	if s.stVersion != nil {
79		// Release current version.
80		s.stVersion.releaseNB()
81	}
82	s.stVersion = v
83}
84
85// Get current unused file number.
86func (s *session) nextFileNum() int64 {
87	return atomic.LoadInt64(&s.stNextFileNum)
88}
89
90// Set current unused file number to num.
91func (s *session) setNextFileNum(num int64) {
92	atomic.StoreInt64(&s.stNextFileNum, num)
93}
94
95// Mark file number as used.
96func (s *session) markFileNum(num int64) {
97	nextFileNum := num + 1
98	for {
99		old, x := s.stNextFileNum, nextFileNum
100		if old > x {
101			x = old
102		}
103		if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) {
104			break
105		}
106	}
107}
108
109// Allocate a file number.
110func (s *session) allocFileNum() int64 {
111	return atomic.AddInt64(&s.stNextFileNum, 1) - 1
112}
113
114// Reuse given file number.
115func (s *session) reuseFileNum(num int64) {
116	for {
117		old, x := s.stNextFileNum, num
118		if old != x+1 {
119			x = old
120		}
121		if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) {
122			break
123		}
124	}
125}
126
127// Set compaction ptr at given level; need external synchronization.
128func (s *session) setCompPtr(level int, ik internalKey) {
129	if level >= len(s.stCompPtrs) {
130		newCompPtrs := make([]internalKey, level+1)
131		copy(newCompPtrs, s.stCompPtrs)
132		s.stCompPtrs = newCompPtrs
133	}
134	s.stCompPtrs[level] = append(internalKey{}, ik...)
135}
136
137// Get compaction ptr at given level; need external synchronization.
138func (s *session) getCompPtr(level int) internalKey {
139	if level >= len(s.stCompPtrs) {
140		return nil
141	}
142	return s.stCompPtrs[level]
143}
144
// Manifest related utils.
146
147// Fill given session record obj with current states; need external
148// synchronization.
149func (s *session) fillRecord(r *sessionRecord, snapshot bool) {
150	r.setNextFileNum(s.nextFileNum())
151
152	if snapshot {
153		if !r.has(recJournalNum) {
154			r.setJournalNum(s.stJournalNum)
155		}
156
157		if !r.has(recSeqNum) {
158			r.setSeqNum(s.stSeqNum)
159		}
160
161		for level, ik := range s.stCompPtrs {
162			if ik != nil {
163				r.addCompPtr(level, ik)
164			}
165		}
166
167		r.setComparer(s.icmp.uName())
168	}
169}
170
171// Mark if record has been committed, this will update session state;
172// need external synchronization.
173func (s *session) recordCommited(rec *sessionRecord) {
174	if rec.has(recJournalNum) {
175		s.stJournalNum = rec.journalNum
176	}
177
178	if rec.has(recPrevJournalNum) {
179		s.stPrevJournalNum = rec.prevJournalNum
180	}
181
182	if rec.has(recSeqNum) {
183		s.stSeqNum = rec.seqNum
184	}
185
186	for _, r := range rec.compPtrs {
187		s.setCompPtr(r.level, internalKey(r.ikey))
188	}
189}
190
191// Create a new manifest file; need external synchronization.
192func (s *session) newManifest(rec *sessionRecord, v *version) (err error) {
193	fd := storage.FileDesc{Type: storage.TypeManifest, Num: s.allocFileNum()}
194	writer, err := s.stor.Create(fd)
195	if err != nil {
196		return
197	}
198	jw := journal.NewWriter(writer)
199
200	if v == nil {
201		v = s.version()
202		defer v.release()
203	}
204	if rec == nil {
205		rec = &sessionRecord{}
206	}
207	s.fillRecord(rec, true)
208	v.fillRecord(rec)
209
210	defer func() {
211		if err == nil {
212			s.recordCommited(rec)
213			if s.manifest != nil {
214				s.manifest.Close()
215			}
216			if s.manifestWriter != nil {
217				s.manifestWriter.Close()
218			}
219			if !s.manifestFd.Zero() {
220				s.stor.Remove(s.manifestFd)
221			}
222			s.manifestFd = fd
223			s.manifestWriter = writer
224			s.manifest = jw
225		} else {
226			writer.Close()
227			s.stor.Remove(fd)
228			s.reuseFileNum(fd.Num)
229		}
230	}()
231
232	w, err := jw.Next()
233	if err != nil {
234		return
235	}
236	err = rec.encode(w)
237	if err != nil {
238		return
239	}
240	err = jw.Flush()
241	if err != nil {
242		return
243	}
244	err = s.stor.SetMeta(fd)
245	return
246}
247
248// Flush record to disk.
249func (s *session) flushManifest(rec *sessionRecord) (err error) {
250	s.fillRecord(rec, false)
251	w, err := s.manifest.Next()
252	if err != nil {
253		return
254	}
255	err = rec.encode(w)
256	if err != nil {
257		return
258	}
259	err = s.manifest.Flush()
260	if err != nil {
261		return
262	}
263	if !s.o.GetNoSync() {
264		err = s.manifestWriter.Sync()
265		if err != nil {
266			return
267		}
268	}
269	s.recordCommited(rec)
270	return
271}
272