1// Copyright 2015 CoreOS, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package wal
16
17import (
18	"bufio"
19	"encoding/binary"
20	"hash"
21	"io"
22	"sync"
23
24	"github.com/coreos/etcd/pkg/crc"
25	"github.com/coreos/etcd/wal/walpb"
26)
27
// encoder marshals WAL records and writes them, length-prefixed, to a
// buffered writer while keeping a running checksum over the record data.
type encoder struct {
	// mu is held for the duration of encode and flush.
	mu sync.Mutex

	// bw is the buffered destination for encoded records.
	bw *bufio.Writer

	// crc accumulates the Data of every record passed to encode; its
	// current sum is stamped onto each record before it is marshaled.
	crc hash.Hash32

	// buf is reusable scratch space that records are marshaled into when
	// they fit.
	buf []byte

	// uint64buf is reusable scratch space for the length prefix.
	uint64buf []byte
}
36
37func newEncoder(w io.Writer, prevCrc uint32) *encoder {
38	return &encoder{
39		bw:  bufio.NewWriter(w),
40		crc: crc.New(prevCrc, crcTable),
41		// 1MB buffer
42		buf:       make([]byte, 1024*1024),
43		uint64buf: make([]byte, 8),
44	}
45}
46
47func (e *encoder) encode(rec *walpb.Record) error {
48	e.mu.Lock()
49	defer e.mu.Unlock()
50
51	e.crc.Write(rec.Data)
52	rec.Crc = e.crc.Sum32()
53	var (
54		data []byte
55		err  error
56		n    int
57	)
58
59	if rec.Size() > len(e.buf) {
60		data, err = rec.Marshal()
61		if err != nil {
62			return err
63		}
64	} else {
65		n, err = rec.MarshalTo(e.buf)
66		if err != nil {
67			return err
68		}
69		data = e.buf[:n]
70	}
71	if err = writeInt64(e.bw, int64(len(data)), e.uint64buf); err != nil {
72		return err
73	}
74	_, err = e.bw.Write(data)
75	return err
76}
77
78func (e *encoder) flush() error {
79	e.mu.Lock()
80	defer e.mu.Unlock()
81	return e.bw.Flush()
82}
83
// writeInt64 writes n to w as exactly 8 bytes in little-endian order.
//
// buf is caller-supplied scratch space used to avoid an allocation per call;
// only its first 8 bytes are used and written. If buf is shorter than 8
// bytes a temporary buffer is allocated instead. The error from w.Write, if
// any, is returned.
func writeInt64(w io.Writer, n int64, buf []byte) error {
	const size = 8 // encoded width of a uint64

	if len(buf) < size {
		// Scratch space is unusable; fall back to a fresh buffer rather
		// than panicking inside PutUint64.
		buf = make([]byte, size)
	}
	// Restrict to the encoded bytes so an oversized scratch buffer never
	// leaks trailing bytes into the output stream.
	buf = buf[:size]

	binary.LittleEndian.PutUint64(buf, uint64(n))
	_, err := w.Write(buf)
	return err
}
90