1package tsm1
2
3import (
4	"encoding/binary"
5	"encoding/json"
6	"errors"
7	"io"
8
9	"github.com/golang/snappy"
10)
11
12var (
13	// ErrNoDigestManifest is returned if an attempt is made to write other parts of a
14	// digest before writing the manifest.
15	ErrNoDigestManifest = errors.New("no digest manifest")
16
17	// ErrDigestAlreadyWritten is returned if the client attempts to write more than
18	// one manifest.
19	ErrDigestAlreadyWritten = errors.New("digest manifest already written")
20)
21
22// DigestWriter allows for writing a digest of a shard.  A digest is a condensed
23// representation of the contents of a shard.  It can be scoped to one or more series
24// keys, ranges of times or sets of files.
25type DigestWriter struct {
26	w               io.WriteCloser
27	sw              *snappy.Writer
28	manifestWritten bool
29}
30
31func NewDigestWriter(w io.WriteCloser) (*DigestWriter, error) {
32	return &DigestWriter{w: w, sw: snappy.NewBufferedWriter(w)}, nil
33}
34
35func (w *DigestWriter) WriteManifest(m *DigestManifest) error {
36	if w.manifestWritten {
37		return ErrDigestAlreadyWritten
38	}
39
40	b, err := json.Marshal(m)
41	if err != nil {
42		return err
43	}
44
45	// Write length of manifest.
46	if err := binary.Write(w.sw, binary.BigEndian, uint32(len(b))); err != nil {
47		return err
48	}
49
50	// Write manifest.
51	if _, err = w.sw.Write(b); err != nil {
52		return err
53	}
54
55	w.manifestWritten = true
56
57	return err
58}
59
60func (w *DigestWriter) WriteTimeSpan(key string, t *DigestTimeSpan) error {
61	if !w.manifestWritten {
62		return ErrNoDigestManifest
63	}
64
65	if err := binary.Write(w.sw, binary.BigEndian, uint16(len(key))); err != nil {
66		return err
67	}
68
69	if _, err := w.sw.Write([]byte(key)); err != nil {
70		return err
71	}
72
73	if err := binary.Write(w.sw, binary.BigEndian, uint32(t.Len())); err != nil {
74		return err
75	}
76
77	for _, tr := range t.Ranges {
78		if err := binary.Write(w.sw, binary.BigEndian, tr.Min); err != nil {
79			return err
80		}
81
82		if err := binary.Write(w.sw, binary.BigEndian, tr.Max); err != nil {
83			return err
84		}
85
86		if err := binary.Write(w.sw, binary.BigEndian, tr.CRC); err != nil {
87			return err
88		}
89
90		if err := binary.Write(w.sw, binary.BigEndian, uint16(tr.N)); err != nil {
91			return err
92		}
93	}
94
95	return nil
96}
97
98func (w *DigestWriter) Flush() error {
99	return w.sw.Flush()
100}
101
102func (w *DigestWriter) Close() error {
103	if err := w.Flush(); err != nil {
104		return err
105	}
106
107	if err := w.sw.Close(); err != nil {
108		return err
109	}
110
111	return w.w.Close()
112}
113
114type DigestTimeSpan struct {
115	Ranges []DigestTimeRange
116}
117
118func (a DigestTimeSpan) Len() int      { return len(a.Ranges) }
119func (a DigestTimeSpan) Swap(i, j int) { a.Ranges[i], a.Ranges[j] = a.Ranges[j], a.Ranges[i] }
120func (a DigestTimeSpan) Less(i, j int) bool {
121	return a.Ranges[i].Min < a.Ranges[j].Min
122}
123
124func (t *DigestTimeSpan) Add(min, max int64, n int, crc uint32) {
125	for _, v := range t.Ranges {
126		if v.Min == min && v.Max == max && v.N == n && v.CRC == crc {
127			return
128		}
129	}
130	t.Ranges = append(t.Ranges, DigestTimeRange{Min: min, Max: max, N: n, CRC: crc})
131}
132
133type DigestTimeRange struct {
134	Min, Max int64
135	N        int
136	CRC      uint32
137}
138