1// Copyright 2018 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5// Package par implements parallel execution helpers.
6package par
7
8import (
9	"math/rand"
10	"sync"
11	"sync/atomic"
12)
13
// Work manages a set of work items to be executed in parallel, at most once each.
// The items in the set must all be valid map keys.
type Work struct {
	f       func(interface{}) // function to run for each item; set once by Do
	running int               // total number of runners; set once by Do, read by runner

	mu      sync.Mutex           // guards the fields below
	added   map[interface{}]bool // items ever added to the set (whether or not yet run)
	todo    []interface{}        // items added but not yet picked up by a runner
	wait    sync.Cond            // runners wait here when todo is empty; Signal on Add, Broadcast at completion
	waiting int                  // number of runners currently blocked in wait
}
26
27func (w *Work) init() {
28	if w.added == nil {
29		w.added = make(map[interface{}]bool)
30	}
31}
32
33// Add adds item to the work set, if it hasn't already been added.
34func (w *Work) Add(item interface{}) {
35	w.mu.Lock()
36	w.init()
37	if !w.added[item] {
38		w.added[item] = true
39		w.todo = append(w.todo, item)
40		if w.waiting > 0 {
41			w.wait.Signal()
42		}
43	}
44	w.mu.Unlock()
45}
46
47// Do runs f in parallel on items from the work set,
48// with at most n invocations of f running at a time.
49// It returns when everything added to the work set has been processed.
50// At least one item should have been added to the work set
51// before calling Do (or else Do returns immediately),
52// but it is allowed for f(item) to add new items to the set.
53// Do should only be used once on a given Work.
54func (w *Work) Do(n int, f func(item interface{})) {
55	if n < 1 {
56		panic("par.Work.Do: n < 1")
57	}
58	if w.running >= 1 {
59		panic("par.Work.Do: already called Do")
60	}
61
62	w.running = n
63	w.f = f
64	w.wait.L = &w.mu
65
66	for i := 0; i < n-1; i++ {
67		go w.runner()
68	}
69	w.runner()
70}
71
// runner executes work in w until both nothing is left to do
// and all the runners are waiting for work.
// (Then all the runners return.)
func (w *Work) runner() {
	for {
		// Wait for something to do.
		w.mu.Lock()
		for len(w.todo) == 0 {
			// Record that this runner is idle before checking for
			// global completion; the increment/decrement must bracket
			// wait.Wait so that w.waiting is accurate while blocked.
			w.waiting++
			if w.waiting == w.running {
				// All runners are idle and todo is empty, so no f call
				// can add more work: wake everyone so they can observe
				// this state and return too.
				// All done.
				w.wait.Broadcast()
				w.mu.Unlock()
				return
			}
			w.wait.Wait()
			w.waiting--
		}

		// Pick something to do at random,
		// to eliminate pathological contention
		// in case items added at about the same time
		// are most likely to contend.
		i := rand.Intn(len(w.todo))
		item := w.todo[i]
		// Swap-remove: move the last item into slot i and shrink.
		w.todo[i] = w.todo[len(w.todo)-1]
		w.todo = w.todo[:len(w.todo)-1]
		w.mu.Unlock()

		// Run the item without holding the lock; f may call w.Add.
		w.f(item)
	}
}
104
// Cache runs an action once per key and caches the result.
// The zero Cache is ready for use.
type Cache struct {
	m sync.Map // map[interface{}]*cacheEntry, keyed by the caller's key
}
109
// cacheEntry is the per-key state stored in Cache.m.
// It implements double-checked locking: done is read/written atomically,
// and mu serializes the one call to the key's function.
type cacheEntry struct {
	done   uint32      // nonzero once result has been set
	mu     sync.Mutex  // held while computing result
	result interface{} // value returned by f; valid only when done != 0
}
115
116// Do calls the function f if and only if Do is being called for the first time with this key.
117// No call to Do with a given key returns until the one call to f returns.
118// Do returns the value returned by the one call to f.
119func (c *Cache) Do(key interface{}, f func() interface{}) interface{} {
120	entryIface, ok := c.m.Load(key)
121	if !ok {
122		entryIface, _ = c.m.LoadOrStore(key, new(cacheEntry))
123	}
124	e := entryIface.(*cacheEntry)
125	if atomic.LoadUint32(&e.done) == 0 {
126		e.mu.Lock()
127		if atomic.LoadUint32(&e.done) == 0 {
128			e.result = f()
129			atomic.StoreUint32(&e.done, 1)
130		}
131		e.mu.Unlock()
132	}
133	return e.result
134}
135
136// Get returns the cached result associated with key.
137// It returns nil if there is no such result.
138// If the result for key is being computed, Get does not wait for the computation to finish.
139func (c *Cache) Get(key interface{}) interface{} {
140	entryIface, ok := c.m.Load(key)
141	if !ok {
142		return nil
143	}
144	e := entryIface.(*cacheEntry)
145	if atomic.LoadUint32(&e.done) == 0 {
146		return nil
147	}
148	return e.result
149}
150