1// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package wal
16
17import (
18	"fmt"
19	"os"
20	"path/filepath"
21
22	"go.etcd.io/etcd/pkg/fileutil"
23
24	"go.uber.org/zap"
25)
26
27// filePipeline pipelines allocating disk space
28type filePipeline struct {
29	lg *zap.Logger
30
31	// dir to put files
32	dir string
33	// size of files to make, in bytes
34	size int64
35	// count number of files generated
36	count int
37
38	filec chan *fileutil.LockedFile
39	errc  chan error
40	donec chan struct{}
41}
42
43func newFilePipeline(lg *zap.Logger, dir string, fileSize int64) *filePipeline {
44	if lg == nil {
45		lg = zap.NewNop()
46	}
47	fp := &filePipeline{
48		lg:    lg,
49		dir:   dir,
50		size:  fileSize,
51		filec: make(chan *fileutil.LockedFile),
52		errc:  make(chan error, 1),
53		donec: make(chan struct{}),
54	}
55	go fp.run()
56	return fp
57}
58
59// Open returns a fresh file for writing. Rename the file before calling
60// Open again or there will be file collisions.
61func (fp *filePipeline) Open() (f *fileutil.LockedFile, err error) {
62	select {
63	case f = <-fp.filec:
64	case err = <-fp.errc:
65	}
66	return f, err
67}
68
69func (fp *filePipeline) Close() error {
70	close(fp.donec)
71	return <-fp.errc
72}
73
74func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) {
75	// count % 2 so this file isn't the same as the one last published
76	fpath := filepath.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2))
77	if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
78		return nil, err
79	}
80	if err = fileutil.Preallocate(f.File, fp.size, true); err != nil {
81		fp.lg.Error("failed to preallocate space when creating a new WAL", zap.Int64("size", fp.size), zap.Error(err))
82		f.Close()
83		return nil, err
84	}
85	fp.count++
86	return f, nil
87}
88
89func (fp *filePipeline) run() {
90	defer close(fp.errc)
91	for {
92		f, err := fp.alloc()
93		if err != nil {
94			fp.errc <- err
95			return
96		}
97		select {
98		case fp.filec <- f:
99		case <-fp.donec:
100			os.Remove(f.Name())
101			f.Close()
102			return
103		}
104	}
105}
106