1package promise 2 3import ( 4 "errors" 5 "sync" 6) 7 8type Future struct { 9 done chan struct{} 10 result interface{} 11 err error 12 once sync.Once 13} 14 15func NewFuture() *Future { 16 return &Future{ 17 done: make(chan struct{}), 18 } 19} 20 21func (f *Future) Get() (interface{}, error) { 22 <-f.done 23 return f.result, f.err 24} 25 26func (f *Future) Set(v interface{}, err error) { 27 f.once.Do(func() { 28 defer close(f.done) 29 f.result = v 30 f.err = err 31 }) 32} 33 34func (f *Future) FanInGet() ([]*FanResult, error) { 35 ret, err := f.Get() 36 if err != nil { 37 return nil, err 38 } 39 if fanRet, ok := ret.([]*FanResult); ok { 40 return fanRet, nil 41 } 42 return nil, errors.New("not a fan in future") 43} 44 45type FanResult struct { 46 Result interface{} 47 Err error 48} 49 50func FanIn(futures ...*Future) *Future { 51 f := NewFuture() 52 go func() { 53 fanResults := make([]*FanResult, len(futures)) 54 wg := new(sync.WaitGroup) 55 wg.Add(len(futures)) 56 for i, f := range futures { 57 go func(idx int) { 58 defer wg.Done() 59 ret, err := f.Get() 60 fanResults[idx] = &FanResult{Result: ret, Err: err} 61 }(i) 62 } 63 wg.Wait() 64 f.Set(fanResults, nil) 65 }() 66 return f 67} 68