1/*
2Copyright 2017 The Perkeep Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8     http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
// Package union registers the "union" read-only blobserver storage type,
// which reads from the given subsets and serves the first successful response.
19package union // import "perkeep.org/pkg/blobserver/union"
20
import (
	"context"
	"errors"
	"io"
	"os"
	"sync"

	"go4.org/jsonconfig"
	"perkeep.org/pkg/blob"
	"perkeep.org/pkg/blobserver"
)
31
32type unionStorage struct {
33	subsets []blobserver.Storage
34}
35
36func newFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (blobserver.Storage, error) {
37	sto := &unionStorage{}
38
39	reads := conf.RequiredList("subsets")
40	if err := conf.Validate(); err != nil {
41		return nil, err
42	}
43
44	for _, s := range reads {
45		rs, err := ld.GetStorage(s)
46		if err != nil {
47			return nil, err
48		}
49		sto.subsets = append(sto.subsets, rs)
50	}
51
52	return sto, nil
53}
54
55// ReceiveBlob would receive the blobs, but now just returns ErrReadonly.
56func (sto *unionStorage) ReceiveBlob(ctx context.Context, br blob.Ref, src io.Reader) (sb blob.SizedRef, err error) {
57	return blob.SizedRef{}, blobserver.ErrReadonly
58}
59
60// RemoveBlobs would remove the given blobs, but now just returns ErrReadonly.
61func (sto *unionStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
62	return blobserver.ErrReadonly
63}
64
65// Fetch the blob by trying all configured read Storage concurrently,
66// returning the first successful response, or the first error if there's no match.
67func (sto *unionStorage) Fetch(ctx context.Context, b blob.Ref) (file io.ReadCloser, size uint32, err error) {
68	type result struct {
69		file io.ReadCloser
70		size uint32
71		err  error
72	}
73	results := make(chan result, len(sto.subsets))
74	var wg sync.WaitGroup
75	for _, bs := range sto.subsets {
76		bs := bs
77		wg.Add(1)
78		go func() {
79			defer wg.Done()
80			var res result
81			res.file, res.size, res.err = bs.Fetch(ctx, b)
82			results <- res
83		}()
84	}
85	go func() {
86		wg.Wait()
87		close(results)
88	}()
89	var firstErr error
90	var firstRes result
91	for r := range results {
92		if r.err != nil {
93			if firstErr == nil {
94				firstErr = r.err
95			}
96			continue
97		}
98		if firstRes.file != nil {
99			if r.file != nil {
100				r.file.Close() // don't need, we already have a successful Fetch
101			}
102			continue
103		}
104
105		firstRes = r
106	}
107	if firstRes.file != nil {
108		return firstRes.file, firstRes.size, nil
109	}
110	return nil, 0, firstErr
111}
112
113// StatBlobs on all BlobStatter reads sequentially, returning the first error.
114func (sto *unionStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, f func(blob.SizedRef) error) error {
115	if err := ctx.Err(); err != nil {
116		return err
117	}
118	// need to dedup the blobs
119	maybeDup := make(chan blob.SizedRef)
120	errCh := make(chan error, 1)
121	var wg sync.WaitGroup
122	var any bool
123	for _, s := range sto.subsets {
124		if bs, ok := s.(blobserver.BlobStatter); ok {
125			any = true
126			wg.Add(1)
127			go func() {
128				defer wg.Done()
129				if err := bs.StatBlobs(ctx, blobs, func(sr blob.SizedRef) error {
130					maybeDup <- sr
131					return nil
132				}); err != nil {
133					errCh <- err
134				}
135			}()
136		}
137	}
138	if !any {
139		return errors.New("union: No BlobStatter reader configured")
140	}
141
142	var closeChanOnce sync.Once
143	go func() {
144		wg.Wait()
145		closeChanOnce.Do(func() { close(maybeDup) })
146	}()
147
148	seen := make(map[blob.Ref]struct{}, len(blobs))
149	for {
150		select {
151		case <-ctx.Done():
152			return ctx.Err()
153		case err := <-errCh:
154			closeChanOnce.Do(func() { close(maybeDup) })
155			return err
156		case sr, ok := <-maybeDup:
157			if !ok {
158				return nil
159			}
160			if _, ok = seen[sr.Ref]; !ok {
161				seen[sr.Ref] = struct{}{}
162				if err := f(sr); err != nil {
163					return err
164				}
165			}
166		}
167	}
168}
169
170// EnumerateBlobs concurrently on the readers, returning one of the errors.
171func (sto *unionStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
172	return blobserver.MergedEnumerateStorage(ctx, dest, sto.subsets, after, limit)
173}
174
175func init() {
176	blobserver.RegisterStorageConstructor("union", blobserver.StorageConstructor(newFromConfig))
177}
178