1package drainer
2
3import (
4	"github.com/hashicorp/nomad/nomad/structs"
5)
6
const (
	// defaultMaxIdsPerTxn is the maximum number of IDs that can be included in a
	// single Raft transaction. This is to ensure that the Raft message
	// does not become too large.
	//
	// The value is a 256 KiB (0.25 MiB) budget divided by 36 bytes per ID;
	// integer division rounds it down to 7281 IDs.
	// NOTE(review): 36 is presumably the length of a UUID string — confirm.
	defaultMaxIdsPerTxn = (1024 * 256) / 36 // 0.25 MB of ids.
)
13
// partitionIds splits ids into consecutive batches of at most maxIds entries
// each, preserving order, so that no single batch results in an overly large
// Raft transaction.
//
// The returned batches are sub-slices of ids (the IDs themselves are not
// copied). Each batch has its capacity capped at its length, so appending to
// one batch cannot overwrite the first element of the next. A nil result is
// returned when ids is empty. A non-positive maxIds can never make progress,
// so it is treated as 1.
func partitionIds(maxIds int, ids []string) [][]string {
	total := len(ids)
	if total == 0 {
		return nil
	}

	// A zero batch size would loop forever and a negative one would slice out
	// of range, so fall back to the smallest size that still makes progress.
	if maxIds < 1 {
		maxIds = 1
	}

	// ceil(total / maxIds), computed without an addition that could overflow
	// when maxIds is very large.
	count := total / maxIds
	if total%maxIds != 0 {
		count++
	}

	partitions := make([][]string, 0, count)
	for start := 0; start < total; {
		size := total - start
		if size > maxIds {
			size = maxIds
		}
		end := start + size

		// The three-index slice detaches the batch's spare capacity from the
		// rest of ids.
		partitions = append(partitions, ids[start:end:end])
		start = end
	}

	return partitions
}
32
// transitionTuple groups the desired transitions and evals that belong to one
// batch. partitionAllocDrain builds one tuple per batch it returns.
type transitionTuple struct {
	// Transitions holds the batch's desired transitions. partitionAllocDrain
	// fills it using the same keys as the map it was given.
	Transitions map[string]*structs.DesiredTransition
	// Evals holds the evaluations included in the batch.
	Evals       []*structs.Evaluation
}
38
39// partitionAllocDrain returns a list of alloc transitions and evals to apply
40// in a single raft transaction.This is necessary to ensure that the Raft
41// transaction does not become too large.
42func partitionAllocDrain(maxIds int, transitions map[string]*structs.DesiredTransition,
43	evals []*structs.Evaluation) []*transitionTuple {
44
45	// Determine a stable ordering of the transitioning allocs
46	allocs := make([]string, 0, len(transitions))
47	for id := range transitions {
48		allocs = append(allocs, id)
49	}
50
51	var requests []*transitionTuple
52	submittedEvals, submittedTrans := 0, 0
53	for submittedEvals != len(evals) || submittedTrans != len(transitions) {
54		req := &transitionTuple{
55			Transitions: make(map[string]*structs.DesiredTransition),
56		}
57		requests = append(requests, req)
58		available := maxIds
59
60		// Add the allocs first
61		if remaining := len(allocs) - submittedTrans; remaining > 0 {
62			if remaining <= available {
63				for _, id := range allocs[submittedTrans:] {
64					req.Transitions[id] = transitions[id]
65				}
66				available -= remaining
67				submittedTrans += remaining
68			} else {
69				for _, id := range allocs[submittedTrans : submittedTrans+available] {
70					req.Transitions[id] = transitions[id]
71				}
72				submittedTrans += available
73
74				// Exhausted space so skip adding evals
75				continue
76			}
77
78		}
79
80		// Add the evals
81		if remaining := len(evals) - submittedEvals; remaining > 0 {
82			if remaining <= available {
83				req.Evals = evals[submittedEvals:]
84				submittedEvals += remaining
85			} else {
86				req.Evals = evals[submittedEvals : submittedEvals+available]
87				submittedEvals += available
88			}
89		}
90	}
91
92	return requests
93}
94