1// Copyright 2016 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package wal 16 17import ( 18 "fmt" 19 "os" 20 "path/filepath" 21 22 "go.etcd.io/etcd/pkg/fileutil" 23 24 "go.uber.org/zap" 25) 26 27// filePipeline pipelines allocating disk space 28type filePipeline struct { 29 lg *zap.Logger 30 31 // dir to put files 32 dir string 33 // size of files to make, in bytes 34 size int64 35 // count number of files generated 36 count int 37 38 filec chan *fileutil.LockedFile 39 errc chan error 40 donec chan struct{} 41} 42 43func newFilePipeline(lg *zap.Logger, dir string, fileSize int64) *filePipeline { 44 if lg == nil { 45 lg = zap.NewNop() 46 } 47 fp := &filePipeline{ 48 lg: lg, 49 dir: dir, 50 size: fileSize, 51 filec: make(chan *fileutil.LockedFile), 52 errc: make(chan error, 1), 53 donec: make(chan struct{}), 54 } 55 go fp.run() 56 return fp 57} 58 59// Open returns a fresh file for writing. Rename the file before calling 60// Open again or there will be file collisions. 61func (fp *filePipeline) Open() (f *fileutil.LockedFile, err error) { 62 select { 63 case f = <-fp.filec: 64 case err = <-fp.errc: 65 } 66 return f, err 67} 68 69func (fp *filePipeline) Close() error { 70 close(fp.donec) 71 return <-fp.errc 72} 73 74func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) { 75 // count % 2 so this file isn't the same as the one last published 76 fpath := filepath.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2)) 77 if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, fileutil.PrivateFileMode); err != nil { 78 return nil, err 79 } 80 if err = fileutil.Preallocate(f.File, fp.size, true); err != nil { 81 fp.lg.Error("failed to preallocate space when creating a new WAL", zap.Int64("size", fp.size), zap.Error(err)) 82 f.Close() 83 return nil, err 84 } 85 fp.count++ 86 return f, nil 87} 88 89func (fp *filePipeline) run() { 90 defer close(fp.errc) 91 for { 92 f, err := fp.alloc() 93 if err != nil { 94 fp.errc <- err 95 return 96 } 97 select { 98 case fp.filec <- f: 99 case <-fp.donec: 100 os.Remove(f.Name()) 101 f.Close() 102 return 103 } 104 } 105} 106