1/* 2Copyright 2017 The Perkeep Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17// Package union registers the "union" read-only blobserver storage type 18// to read from the given subsets, serving the first responding. 19package union // import "perkeep.org/pkg/blobserver/union" 20 21import ( 22 "context" 23 "errors" 24 "io" 25 "sync" 26 27 "go4.org/jsonconfig" 28 "perkeep.org/pkg/blob" 29 "perkeep.org/pkg/blobserver" 30) 31 32type unionStorage struct { 33 subsets []blobserver.Storage 34} 35 36func newFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (blobserver.Storage, error) { 37 sto := &unionStorage{} 38 39 reads := conf.RequiredList("subsets") 40 if err := conf.Validate(); err != nil { 41 return nil, err 42 } 43 44 for _, s := range reads { 45 rs, err := ld.GetStorage(s) 46 if err != nil { 47 return nil, err 48 } 49 sto.subsets = append(sto.subsets, rs) 50 } 51 52 return sto, nil 53} 54 55// ReceiveBlob would receive the blobs, but now just returns ErrReadonly. 56func (sto *unionStorage) ReceiveBlob(ctx context.Context, br blob.Ref, src io.Reader) (sb blob.SizedRef, err error) { 57 return blob.SizedRef{}, blobserver.ErrReadonly 58} 59 60// RemoveBlobs would remove the given blobs, but now just returns ErrReadonly. 61func (sto *unionStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error { 62 return blobserver.ErrReadonly 63} 64 65// Fetch the blob by trying all configured read Storage concurrently, 66// returning the first successful response, or the first error if there's no match. 67func (sto *unionStorage) Fetch(ctx context.Context, b blob.Ref) (file io.ReadCloser, size uint32, err error) { 68 type result struct { 69 file io.ReadCloser 70 size uint32 71 err error 72 } 73 results := make(chan result, len(sto.subsets)) 74 var wg sync.WaitGroup 75 for _, bs := range sto.subsets { 76 bs := bs 77 wg.Add(1) 78 go func() { 79 defer wg.Done() 80 var res result 81 res.file, res.size, res.err = bs.Fetch(ctx, b) 82 results <- res 83 }() 84 } 85 go func() { 86 wg.Wait() 87 close(results) 88 }() 89 var firstErr error 90 var firstRes result 91 for r := range results { 92 if r.err != nil { 93 if firstErr == nil { 94 firstErr = r.err 95 } 96 continue 97 } 98 if firstRes.file != nil { 99 if r.file != nil { 100 r.file.Close() // don't need, we already have a successful Fetch 101 } 102 continue 103 } 104 105 firstRes = r 106 } 107 if firstRes.file != nil { 108 return firstRes.file, firstRes.size, nil 109 } 110 return nil, 0, firstErr 111} 112 113// StatBlobs on all BlobStatter reads sequentially, returning the first error. 114func (sto *unionStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, f func(blob.SizedRef) error) error { 115 if err := ctx.Err(); err != nil { 116 return err 117 } 118 // need to dedup the blobs 119 maybeDup := make(chan blob.SizedRef) 120 errCh := make(chan error, 1) 121 var wg sync.WaitGroup 122 var any bool 123 for _, s := range sto.subsets { 124 if bs, ok := s.(blobserver.BlobStatter); ok { 125 any = true 126 wg.Add(1) 127 go func() { 128 defer wg.Done() 129 if err := bs.StatBlobs(ctx, blobs, func(sr blob.SizedRef) error { 130 maybeDup <- sr 131 return nil 132 }); err != nil { 133 errCh <- err 134 } 135 }() 136 } 137 } 138 if !any { 139 return errors.New("union: No BlobStatter reader configured") 140 } 141 142 var closeChanOnce sync.Once 143 go func() { 144 wg.Wait() 145 closeChanOnce.Do(func() { close(maybeDup) }) 146 }() 147 148 seen := make(map[blob.Ref]struct{}, len(blobs)) 149 for { 150 select { 151 case <-ctx.Done(): 152 return ctx.Err() 153 case err := <-errCh: 154 closeChanOnce.Do(func() { close(maybeDup) }) 155 return err 156 case sr, ok := <-maybeDup: 157 if !ok { 158 return nil 159 } 160 if _, ok = seen[sr.Ref]; !ok { 161 seen[sr.Ref] = struct{}{} 162 if err := f(sr); err != nil { 163 return err 164 } 165 } 166 } 167 } 168} 169 170// EnumerateBlobs concurrently on the readers, returning one of the errors. 171func (sto *unionStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error { 172 return blobserver.MergedEnumerateStorage(ctx, dest, sto.subsets, after, limit) 173} 174 175func init() { 176 blobserver.RegisterStorageConstructor("union", blobserver.StorageConstructor(newFromConfig)) 177} 178