1package promise
2
3import (
4	"errors"
5	"sync"
6)
7
// Future is a write-once container for a value/error pair that is produced
// asynchronously. Readers wait on it with Get; a producer resolves it with Set.
//
// A Future must be created with NewFuture so that the done channel is
// initialized, and it must not be copied because it embeds a sync.Once.
type Future struct {
	// Signalling state.
	done chan struct{} // closed by Set once result and err have been stored
	once sync.Once     // ensures only the first call to Set takes effect

	// Payload, written exactly once before done is closed.
	result interface{}
	err    error
}
14
15func NewFuture() *Future {
16	return &Future{
17		done: make(chan struct{}),
18	}
19}
20
21func (f *Future) Get() (interface{}, error) {
22	<-f.done
23	return f.result, f.err
24}
25
26func (f *Future) Set(v interface{}, err error) {
27	f.once.Do(func() {
28		defer close(f.done)
29		f.result = v
30		f.err = err
31	})
32}
33
34func (f *Future) FanInGet() ([]*FanResult, error) {
35	ret, err := f.Get()
36	if err != nil {
37		return nil, err
38	}
39	if fanRet, ok := ret.([]*FanResult); ok {
40		return fanRet, nil
41	}
42	return nil, errors.New("not a fan in future")
43}
44
// FanResult holds the outcome of a single future: the value and the error
// that its Get call returned.
type FanResult struct {
	// Result is the value returned by the future's Get.
	Result interface{}
	// Err is the error returned by the future's Get.
	Err error
}
49
50func FanIn(futures ...*Future) *Future {
51	f := NewFuture()
52	go func() {
53		fanResults := make([]*FanResult, len(futures))
54		wg := new(sync.WaitGroup)
55		wg.Add(len(futures))
56		for i, f := range futures {
57			go func(idx int) {
58				defer wg.Done()
59				ret, err := f.Get()
60				fanResults[idx] = &FanResult{Result: ret, Err: err}
61			}(i)
62		}
63		wg.Wait()
64		f.Set(fanResults, nil)
65	}()
66	return f
67}
68