1package tsm1 2 3import ( 4 "encoding/binary" 5 "encoding/json" 6 "errors" 7 "io" 8 9 "github.com/golang/snappy" 10) 11 12var ( 13 // ErrNoDigestManifest is returned if an attempt is made to write other parts of a 14 // digest before writing the manifest. 15 ErrNoDigestManifest = errors.New("no digest manifest") 16 17 // ErrDigestAlreadyWritten is returned if the client attempts to write more than 18 // one manifest. 19 ErrDigestAlreadyWritten = errors.New("digest manifest already written") 20) 21 22// DigestWriter allows for writing a digest of a shard. A digest is a condensed 23// representation of the contents of a shard. It can be scoped to one or more series 24// keys, ranges of times or sets of files. 25type DigestWriter struct { 26 w io.WriteCloser 27 sw *snappy.Writer 28 manifestWritten bool 29} 30 31func NewDigestWriter(w io.WriteCloser) (*DigestWriter, error) { 32 return &DigestWriter{w: w, sw: snappy.NewBufferedWriter(w)}, nil 33} 34 35func (w *DigestWriter) WriteManifest(m *DigestManifest) error { 36 if w.manifestWritten { 37 return ErrDigestAlreadyWritten 38 } 39 40 b, err := json.Marshal(m) 41 if err != nil { 42 return err 43 } 44 45 // Write length of manifest. 46 if err := binary.Write(w.sw, binary.BigEndian, uint32(len(b))); err != nil { 47 return err 48 } 49 50 // Write manifest. 51 if _, err = w.sw.Write(b); err != nil { 52 return err 53 } 54 55 w.manifestWritten = true 56 57 return err 58} 59 60func (w *DigestWriter) WriteTimeSpan(key string, t *DigestTimeSpan) error { 61 if !w.manifestWritten { 62 return ErrNoDigestManifest 63 } 64 65 if err := binary.Write(w.sw, binary.BigEndian, uint16(len(key))); err != nil { 66 return err 67 } 68 69 if _, err := w.sw.Write([]byte(key)); err != nil { 70 return err 71 } 72 73 if err := binary.Write(w.sw, binary.BigEndian, uint32(t.Len())); err != nil { 74 return err 75 } 76 77 for _, tr := range t.Ranges { 78 if err := binary.Write(w.sw, binary.BigEndian, tr.Min); err != nil { 79 return err 80 } 81 82 if err := binary.Write(w.sw, binary.BigEndian, tr.Max); err != nil { 83 return err 84 } 85 86 if err := binary.Write(w.sw, binary.BigEndian, tr.CRC); err != nil { 87 return err 88 } 89 90 if err := binary.Write(w.sw, binary.BigEndian, uint16(tr.N)); err != nil { 91 return err 92 } 93 } 94 95 return nil 96} 97 98func (w *DigestWriter) Flush() error { 99 return w.sw.Flush() 100} 101 102func (w *DigestWriter) Close() error { 103 if err := w.Flush(); err != nil { 104 return err 105 } 106 107 if err := w.sw.Close(); err != nil { 108 return err 109 } 110 111 return w.w.Close() 112} 113 114type DigestTimeSpan struct { 115 Ranges []DigestTimeRange 116} 117 118func (a DigestTimeSpan) Len() int { return len(a.Ranges) } 119func (a DigestTimeSpan) Swap(i, j int) { a.Ranges[i], a.Ranges[j] = a.Ranges[j], a.Ranges[i] } 120func (a DigestTimeSpan) Less(i, j int) bool { 121 return a.Ranges[i].Min < a.Ranges[j].Min 122} 123 124func (t *DigestTimeSpan) Add(min, max int64, n int, crc uint32) { 125 for _, v := range t.Ranges { 126 if v.Min == min && v.Max == max && v.N == n && v.CRC == crc { 127 return 128 } 129 } 130 t.Ranges = append(t.Ranges, DigestTimeRange{Min: min, Max: max, N: n, CRC: crc}) 131} 132 133type DigestTimeRange struct { 134 Min, Max int64 135 N int 136 CRC uint32 137} 138